Merge lp:~hazmat/txzookeeper/distributed-queue into lp:txzookeeper

Proposed by Kapil Thangavelu
Status: Merged
Merged at revision: 24
Proposed branch: lp:~hazmat/txzookeeper/distributed-queue
Merge into: lp:txzookeeper
Diff against target: 989 lines (+901/-13)
7 files modified
txzookeeper/lock.py (+12/-7)
txzookeeper/queue.py (+451/-0)
txzookeeper/tests/__init__.py (+0/-4)
txzookeeper/tests/test_client.py (+27/-0)
txzookeeper/tests/test_lock.py (+17/-0)
txzookeeper/tests/test_queue.py (+392/-0)
txzookeeper/todo.txt (+2/-2)
To merge this branch: bzr merge lp:~hazmat/txzookeeper/distributed-queue
Reviewer: Gustavo Niemeyer (status: Approve)
Review via email: mp+25712@code.launchpad.net

Description of the change

Distributed multi-producer / multi-consumer queue implementation.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

[1]

+ self.path = path
+ self.client = client
+ self.persistent = persistent

I tend to prefer keeping things private, unless they are actually required by external clients, or would be obviously important to know about. This way it's easy to tell what's the "trusted" API clients are supposed to rely on, and also makes it more comfortable when changing the interface (anything private can be removed/renamed/replaced).

How do you feel about this in general?
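A minimal sketch of the convention the branch settled on later in this thread (revision 27): internals live behind a single underscore, and external clients get read-only properties. The names mirror the queue's constructor, but the class body here is illustrative, not the real Queue.

```python
class Queue(object):
    """Illustrative sketch of the single-underscore/read-only-property style."""

    def __init__(self, path, client, persistent=False):
        self._path = path              # private; free to rename/replace later
        self._client = client
        self._persistent = persistent

    @property
    def path(self):
        """Path to the queue (read-only)."""
        return self._path

    @property
    def persistent(self):
        """True if queue items are persistent (read-only)."""
        return self._persistent
```

With this layout, anything underscored is clearly outside the trusted API, and attempts to assign to the public accessors fail rather than silently corrupting internal state.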

[2]

+ self, path, client, acl=[ZOO_OPEN_ACL_UNSAFE], persistent=False):

Also a pretty general point which I'm making mostly to synchronize our thinking, rather than as a *required* change here:

I tend to prefer using the style of acl=None in the initialization of default parameters, and then process it internally in the constructor (if acl is None, acl = ... or similar).

While the result is obviously the same here, the main distinction is that it enables code using the function or class to say "I don't have anything special in this parameter.. just do your default.", which gets pretty tricky when the default value is in the keyword argument constructor itself.
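A short sketch of the acl=None style described above. `_DEFAULT_ACL` is a stand-in for `[ZOO_OPEN_ACL_UNSAFE]`; the point is the pattern, not the value.

```python
# Stand-in for [ZOO_OPEN_ACL_UNSAFE], for illustration only.
_DEFAULT_ACL = [{"perms": 0x1f, "scheme": "world", "id": "anyone"}]


class Queue(object):

    def __init__(self, path, client, acl=None, persistent=False):
        # Resolve the default inside the constructor: callers can pass
        # None to mean "just do your default", and the mutable default
        # list is never shared between instances.
        if acl is None:
            acl = list(_DEFAULT_ACL)
        self._path = path
        self._client = client
        self._acl = acl
        self._persistent = persistent
```

This also sidesteps the classic mutable-default-argument bug: a list in the signature would be created once and shared by every instance that relies on the default.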

[3]

+ return self._get(wait=True)
+
+ get_wait = get

Do we need all these alternatives? Part of the beauty of Twisted is that we don't really have to care about what "waiting" means.

I suggest we have a single interface for getting, and it will always return a deferred which will fire once an item is available, no matter what. This will also simplify a bit the logic elsewhere (in _refill, _get_item, etc).
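The single-interface idea can be illustrated with a toy in-memory stand-in. `PendingGet` below is a hypothetical minimal substitute for a Twisted Deferred, and `ToyQueue` is not the real queue; the point is only that get() always returns something that fires once an item is available, whether or not one was already there.

```python
class PendingGet(object):
    """Toy stand-in for a Deferred (illustration only)."""

    def __init__(self):
        self._callbacks = []
        self.result = None
        self.called = False

    def addCallback(self, fn):
        # Fire immediately if a result already arrived, else queue.
        if self.called:
            fn(self.result)
        else:
            self._callbacks.append(fn)

    def callback(self, value):
        self.called = True
        self.result = value
        for fn in self._callbacks:
            fn(value)


class ToyQueue(object):
    """In-memory sketch of the single get() interface."""

    def __init__(self):
        self._items = []
        self._waiting = []

    def get(self):
        # Always return a pending result; never block, never raise Empty.
        d = PendingGet()
        if self._items:
            d.callback(self._items.pop(0))
        else:
            self._waiting.append(d)
        return d

    def put(self, item):
        # Hand the item straight to a waiting consumer if there is one.
        if self._waiting:
            self._waiting.pop(0).callback(item)
        else:
            self._items.append(item)
```

The caller's code is identical in both the "item ready" and "item pending" cases, which is exactly why the wait/nowait split is unnecessary in an event-driven API.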

[4]

+ d = self.client.create(
+ "/".join((self.path, self.prefix)), item, self._acl, flags)
+ return d

Nice. It feels pretty cool to be able to wait on a put this way.

[5]

+ d = self.client.exists(self.path)
+
+ def on_success(stat):
+ return stat["numChildren"]

Oh, interesting trick! I would imagine that getting the full list would be required, but this is of course a lot better.

[6]

+ Fetch the node data in the queue directory for the given node name. If
+ wait is
(...)
+ # tests. Instead we process our entire our node cache before
+ # proceeding.

Couple of comment details: "is ..." and "our entire our".

[7]

+ Refetch the queue children, setting a watch as needed, and invalidating
+ any previous children entries queue.

It would be nice to have a higher level description of what the algorithm is actually doing. E.g. what is the children entries queue about, what happens when it's empty, or when two different consumers have a partially overlapping queue, how are items consumed, etc.

[8]

In on_queue_items_changed():

+ self._cached_entries = []
+ d = self._refill(wait=wait)

Why is the cache being reset right after we're told changes have happened? Shouldn't this happen once we actually get the new list of children?

[9]

+ d = self._refill(wait=wait)
(...)
+ d = self.client.get_children(
+ self.path, on_queue_items_changed)

It'd be good to have more descriptive names for these variables, since one of th...


25. By Kapil Thangavelu

address some review comments (6, 8, 10)

26. By Kapil Thangavelu

remove method alias get_wait for get, review #3

27. By Kapil Thangavelu

use private variables for internals, introduce read only properties for accessors to public attributes, per review comment #1

28. By Kapil Thangavelu

remove acl mutable default arg, force queue prefix to private

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On Fri, 28 May 2010 16:30:42 -0400, Gustavo Niemeyer
<email address hidden> wrote:

> [1]
>
> + self.path = path
> + self.client = client
> + self.persistent = persistent
>
> I tend to prefer keeping things private, unless they are actually
> required by external clients, or would be obviously important to know
> about. This way it's easy to tell what's the "trusted" API clients are
> supposed to rely on, and also makes it more comfortable when changing
> the interface (anything private can be removed/renamed/replaced).
>
> How do you feel about this in general?

In general I'm fine with it. I have mixed feelings about how private is spelled
in some cases; I've dealt with libraries that take private (double-underscore
style) to levels that made clean reuse or debugging difficult. In this
particular case, I'm fine with client being private (single underscore), and
with path and persistent being read-only properties, and I have made those
changes. In general it's okay as long as it's not a double underscore and there
isn't a legitimate reason for its use by a consumer; if there is, I'd tend to
prefer at least read-only property access when direct attribute access would be
dangerous.

>
>
> [2]
>
> + self, path, client, acl=[ZOO_OPEN_ACL_UNSAFE],
> persistent=False):
>
> Also a pretty general point which I'm making mostly to synchronize our
> thinking, rather than as a *required* change here:
>
> I tend to prefer using the style of acl=None in the initialization of
> default parameters, and then process it internally in the constructor
> (if acl is None, acl = ... or similar).
>
> While the result is obviously the same here, the main distinction is
> that it enables code using the function or class to say "I don't have
> anything special in this parameter.. just do your default.", which gets
> pretty tricky when the default value is in the keyword argument
> constructor itself.
>

I'm fine with that. To me it's a minor tradeoff against quick reading of the
function signature, but then again there's a whole class of bugs around
mutable default args. Changed.

>
> [3]
>
> + return self._get(wait=True)
> +
> + get_wait = get
>
> Do we need all these alternatives? Part of the beauty of Twisted is
> that we don't really have to care about what "waiting" means.

Right, originally I was trying to match the signature provided by the Queue.Queue
implementation. I think this alias is an artifact of when I changed the
default behavior of get (it used to raise an Empty error). It's removed now.

>
> I suggest we have a single interface for getting, and it will always
> return a deferred which will fire once an item is available, no matter
> what. This will also simplify a bit the logic elsewhere (in _refill,
> _get_item, etc).
>

Hmmm... I like having the option of both APIs and the consistency with the
Queue.Queue API. Alternatively, perhaps a timeout option on the get API
would also suffice (also in the Queue.Queue API). The logic simplification
from removing the method is pretty minor, maybe four lines of code total
in the methods mentioned.

>
>
> [6]
>
> + Fetch the node data in the queue...


29. By Kapil Thangavelu

remove get_nowait api

30. By Kapil Thangavelu

use trap for no node error handler, remove unused import queue.empty

31. By Kapil Thangavelu

remove old get_nowait tests.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

[3] I went ahead and removed the get_nowait api as per our discussion. In the future, the use case can be addressed with a timeout.

[9] no more scope name conflict on deferreds.

32. By Kapil Thangavelu

refactor distributed queue implementation

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

I refactored the implementation; it no longer uses any cached values, and it is more efficient about establishing watches on the queue.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Cool, nice improvements indeed!

I'm going to restart the review, since it's a brand new implementation. I'll start the sequence at 12 just to avoid conflicts.

[12]

+ def __init__(self, deferred, watcher):
+ self.deferred = deferred
+ self.watcher = watcher
+ self.processing = False
+ self.refetch = False

I'm having a slightly hard time reading the code because of the generality of the names here. E.g. "refetch" what? "watcher" for what? "processing" what? Why is "processing" only turned on inside _get_item?

[13]

+ def on_queue_items_changed(*args):
+ """Event watcher on queue node child events."""
+ if request.complete or not self._client.connected:
+ return
+
+ if request.processing:
+ request.refetch = True
+ else:
+ self._get(request)

I think I might be missing some detail about the implementation, perhaps due to [12].

Let's say that things happen in the following order:

1) User calls get()
2) get() calls _get()
3) _get() hooks _get_item on the result, and the watcher above on changes
4) Result is returned, and _get_item() is queued for being called
5) watcher from (2) fires, and calls _get() again
6) Repeat (3)
7) Repeat (4)

In other words, it looks like _get_item might end up being queued multiple times even before it started running the first time.
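The guard adopted in response to this point (and visible in the final Preview Diff) can be sketched in isolation: processing_children is raised in _get, before the get_children call, so a watch that fires while a fetch is in flight only records a refetch instead of queuing _get_item a second time. `FakeQueue` below is a stand-in for the real queue.

```python
class GetRequest(object):
    """Pared-down request state, mirroring the fields in the branch."""

    def __init__(self):
        self.processing_children = False
        self.refetch_children = False
        self.complete = False


class FakeQueue(object):
    """Stand-in for the queue; records _get calls instead of hitting ZK."""

    def __init__(self):
        self.get_calls = 0

    def _get(self, request):
        # Mark the request busy *before* the (simulated) client call, so
        # an early watch cannot re-enter this path.
        self.get_calls += 1
        request.processing_children = True


def on_queue_items_changed(queue, request):
    """The child-watch logic under discussion."""
    if request.complete:
        return
    if request.processing_children:
        # A fetch is already in flight; defer to a single refetch.
        request.refetch_children = True
    else:
        queue._get(request)
```

Replaying the 1-7 scenario above against this sketch, step 5's watch finds processing_children already True and only flips refetch_children, so _get_item is never queued twice for one request.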

[14]

I find it a bit suspect that we're killing the item from the queue even before the user has had a chance to know about its existence. This will make the queue somewhat prone to items being eaten by crashes.

[15]

+ # Refetching deferred until we process all the children from
+ # from a get children call.
+ if request.refetch:
+ request.processing = False
+ return self._get(request)

Again, going back to [12], I find the nomenclature a bit non-obvious to follow. Why is it not "processing" anymore on a refetch? Looks like there's a very tight relationship with the get children watcher in this logic, even though that's not being made explicit by names nor comments.

Also, I believe there's an issue here. Let's say things happen in this order:

1) children is [], refetch is False, request.processing is True
2) on_no_node_error is called on a NoNodeException
3) Due to the state, the exception is swallowed and nothing really changes
4) on_queue_items_changed is called
5) Given the state, refetch is made True
6) and... nothing else ever happens?

I think making names more explicit might help avoiding issues like these, since it'll be easier to reason about the logic there and know out of gut feeling when things aren't happening the way they should.

review: Needs Fixing
33. By Kapil Thangavelu

request.processing should be false anytime there are no children, not just on refetch.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On Tue, 08 Jun 2010 10:28:24 -0400, Gustavo Niemeyer
<email address hidden> wrote:

> Review: Needs Fixing
> Cool, nice improvements indeed!
>
> I'm going to restart the review again, since it's a brand new
> implementation. I'll start the sequence on 12 just to avoid conflicts.
>
> [12]
>
> + def __init__(self, deferred, watcher):
> + self.deferred = deferred
> + self.watcher = watcher
> + self.processing = False
> + self.refetch = False
>
> I'm having a slightly hard time reading the code because of the
> generality of the names here. E.g. "refetch" what? "watcher" for what?
> "processing" what? Why is "processing" only turned on inside _get_item?
>

Thanks for the review. Noted on the names; I'll update them. FWIW: refetch ->
refetch_children, watcher -> child_watcher, processing -> processing_children.

Re processing inside of _get_item: I've moved it to _get.

> [13]
>
> + def on_queue_items_changed(*args):
> + """Event watcher on queue node child events."""
> + if request.complete or not self._client.connected:
> + return
> +
> + if request.processing:
> + request.refetch = True
> + else:
> + self._get(request)
>
> I think I might be missing some detail about the implementation, perhaps
> due to [12].
>
> Let's say that things happen in the following order:
>
> 1) User calls get()
> 2) get() calls _get()
> 3) _get() hooks _get_item on the result, and the watcher above on changes
> 4) Result is returned, and _get_item() is queued for being called
> 5) watcher from (2) fires, and calls _get() again
> 6) Repeat (3)
> 7) Repeat (4)
>
> In other words, it looks like _get_item might end up being queued
> multiple times even before it started running the first time.

Indeed, it's possible. I've updated the code to set processing = True in
_get before the client call.

> [14]
>
> I find a bit suspect that we're killing the item from the queue even
> before the user had a chance to know about its existence. This will
> make the queue somewhat prone to items being eaten by crashes.

As it is right now, the queue semantics enforce isolation and minimal
communication with the zookeeper server. I noted the requirement regarding
error handling for queue consumers in the module docstring. If we want
reliable message delivery/processing and want the queue to enforce that
semantic for us, then we'll likely need some aggregate/composite node
structures, cooperative semantics, or data introspection/modification on
queue item nodes, all of which will incur additional communication
overhead to enforce that semantic. I think those are effectively different
data structures; a queue isn't necessarily a message queue.

> [15]
>
>
> + # Refetching deferred until we process all the children from
> + # from a get children call.
> + if request.refetch:
> + request.processing = False
> + return self._get(request)
>
> Again, going back to [12], I find the nomenclature a bit non-obvious to
> follow. Why is it not "processing" anymore on a refetch? Looks like...


34. By Kapil Thangavelu

set request processing in _get, such that watches that fire before callbacks are processed correctly.

35. By Kapil Thangavelu

reliable queue implementation

36. By Kapil Thangavelu

serialized queue impl

37. By Kapil Thangavelu

refactor tests to abstract queue class, reset refetch flag when refetching children.

38. By Kapil Thangavelu

additional tests for serialized and reliable queues.

39. By Kapil Thangavelu

add test to validate that unexpected errors propagate to the get deferred.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

[12] I added some docstrings to the get request which document all the variables; if this is insufficient for clarity, let me know and I'll do a round of variable renames.

[14] Per our discussion on reliable processing/consumption of queue items, I've added the reliable queue and serialized queue implementations. A new serialized queue implementation utilizing a lock is something I'll address in a future branch (added bug:592266); the implementation as is suffers from a lot of pointless contention among consumers waking up from watches without any work they can proceed with, but it is nonetheless functional.

40. By Kapil Thangavelu

additional doc string on persistent v. transient reliable queues

41. By Kapil Thangavelu

rename GetRequest variables for clarity.

42. By Kapil Thangavelu

document each branch of the conditional within the queue child watch.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Thanks Kapil. Queue looks great. Some items about the new stuff:

[15]

+ and processed in the order they where placed in the queue. (TODO) An
+ implementation with less contention between consumers might instead utilize
+ a reliable queue with a lock.

Even without the lock, a better implementation would wait for the removal of the specific -processing file which is holding further action back, rather than any change in the queue. E.g. if the item itself is removed, but not the -processing control file, it will fire off, and fail again. Also (and most importantly), any new items added to the queue will also fire the watch and cause the consumer to refetch the full child list only to find out that it's still being processed by someone else, and it can do nothing about the new item appended at the back of the queue.

[16]

+class SerializedQueueTests(QueueTests):
+
+ queue_factory = SerializedQueue

Don't we need something here? :-)

[17]

Queue should probably use QueueItems too (without delete). That said, since it's just a reference implementation and we'll likely need the reliable version, feel free to ignore this.

[18]

It feels like there's potential for more reuse in the _get_item implementation, but I don't think we should hold up the merge for this. It's just a good refactoring for a boring moment.

review: Needs Fixing
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Oh, I forgot to mention one thing:

[19]

For rich documentation of parameters, return values, etc, we've been using the epydoc format:

    http://epydoc.sourceforge.net/manual-fields.html
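A small example of those epydoc fields in use. The helper function below is hypothetical (its body just echoes its arguments); only the docstring format is the point, using the @param/@return/@raise fields from the manual linked above.

```python
def put(queue_path, item):
    """
    Put an item into the queue (hypothetical helper, for illustration).

    @param queue_path: Path of the queue node in the zookeeper hierarchy.
    @param item: String data to be put on the queue.
    @return: The C{(queue_path, item)} pair, standing in for a deferred
        here so the example stays self-contained.
    @raise ValueError: If C{item} is not a string.
    """
    if not isinstance(item, str):
        raise ValueError("queue items must be strings")
    return (queue_path, item)
```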

43. By Kapil Thangavelu

doc string cleanup

44. By Kapil Thangavelu

merge trunk

45. By Kapil Thangavelu

if lock acquisition fails, then all instance state associated with the attempt should be cleared, so subsequent acquire attempts can proceed.

46. By Kapil Thangavelu

serialized lock impl aggregating a reliable queue and distributed lock.

47. By Kapil Thangavelu

make queue prefix public, utilize reliable queue tests for serialized queue.

48. By Kapil Thangavelu

extra epydoc docstrings

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On Fri, 11 Jun 2010 10:57:30 -0400, Gustavo Niemeyer
<email address hidden> wrote:

> Review: Needs Fixing
> Thanks Kapil. Queue looks great. Some items about the new stuff:
>
> [15]
>
> + and processed in the order they where placed in the queue. (TODO) An
> + implementation with less contention between consumers might instead
> utilize
> + a reliable queue with a lock.
>
> Even without the lock, a better implementation would wait for the
> removal of the specific -processing file which is holding further action
> back, rather than any change in the queue. E.g. if the item itself is
> removed, but not the -processing control file, it will fire off, and
> fail again. Also (and most importantly), any new items added to the
> queue will also fire the watch and cause the consumer to refetch the
> full child list only to find out that it's still being processed by
> someone else, and it can do nothing about the new item appended at the
> back of the queue.

That would definitely be a better implementation than what was there
previously with regard to contention. However, it would effectively be
reimplementing parts of the lock logic (i.e. things like checking the exists
return value on the processing node is equivalent to checking exists on
the previous lock candidate node). I went ahead and reimplemented the
serialized queue using a subclass of reliable queue that utilizes an
exclusive lock to serialize access.

>
> [16]
>
> +class SerializedQueueTests(QueueTests):
> +
> + queue_factory = SerializedQueue
>
> Don't we need something here? :-)

Yeah... I'm not sure what, offhand. The default implementations of all the
queues are ordered. Perhaps multiple clients, one of which gets an item and
then sleeps or closes.

>
> [17]
>
> Queue should probably use QueueItems too (without delete). That said,
> since it's just a reference implementation and we'll likely need the
> reliable version, feel free to ignore this.
>

Cool, I'm just going to leave it as is. I tried to be clear in the
docstring that it's mostly meant as an example implementation of the common
zookeeper recipe.

> [18]
>
> It feels like there's potential for more reuse in the _get_item
> implementation, but I don't think we should wait this for merging. It's
> just a good refactoring for a boring moment.
>

Definitely; the closures are convenient, but they make things a bit more
difficult for clean reuse. I took a stab at this yesterday, but shelved it
for now. Effectively we'd stuff the extra args into the callbacks and
errbacks instead of relying on the closure.

49. By Kapil Thangavelu

module docstring

50. By Kapil Thangavelu

serialized queue behavior test

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

The design of SerializedQueue looks pretty nice. Thank you!

A few additional (last?) comments below:

[20]

In SerializedQueue:

            if name.endswith(suffix):
                children[:] = []

Putting in some debugging information, I see that this logic is actually used, but I'm a bit confused regarding when it's necessary. The tests which reach this logic also make me a bit worried: test_staged_multiproducer_multiconsumer, which theoretically should never see items being processed, due to the lock.

[21]

Something that occurred to me while I was reading the code is that the line:

            request.processing_children = False

May still not get executed on certain exits from the logic in the item processing. Should we guarantee that it will necessarily be reset on any exit, so that we never get an unusable object (with an inconsistent state)?

With these details addressed, +1!

review: Approve
Revision history for this message
Kapil Thangavelu (hazmat) wrote :

> The design of SerializedQueue looks pretty nice. Thank you!
>
> A few additional (last?) comments below:
>
> [20]
>
> In SerializedQueue:
>
> if name.endswith(suffix):
> children[:] = []
>
> Putting some debugging information, I see that this logic is actually used,
> but I'm a bit confused regarding when it's necessary. The tests which reach
> this logic also make me a bit worried:
> test_staged_multiproducer_multiconsumer, which theoretically should be a case
> which should never see items being processed due to the lock.

The lock is only held when fetching an item, i.e. the lock guards access to the queue items. However, the consumer might still be processing the item, and until it's done the processing node isn't released. It might be a better semantic (less contention) for serialization if the lock were held for the duration of the item processing. I'll make that change.

>
> [21]
>
> Something that occurred to me while I was reading the code is that the line:
>
> request.processing_children = False
>
> May still not get executed on certain exists from the logic in the item
> processing. Should we guarantee that it will necessarily be reset on any
> exit, so that we never get an unusable object (with an inconsistent state)?
>

Is there a scenario where this would be the case? AFAICS all the failure mode exits are covered by error handlers that will set this flag to false.

51. By Kapil Thangavelu

serializable queue consumer holds lock till item is processed.

52. By Kapil Thangavelu

fix a race condition in reliable queue: if two consumers had fetched an item, and one of the consumers finished processing it before the other created a reservation, the slow consumer would return an item that was deleted. Now the cycle switches from get->reserve->return to exists->reserve->get->return.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On Thu, 08 Jul 2010 12:27:42 -0400, Kapil Thangavelu
<email address hidden> wrote:

>> The design of SerializedQueue looks pretty nice. Thank you!
>>
>> A few additional (last?) comments below:
>>
>> [20]
>>
>> In SerializedQueue:
>>
>> if name.endswith(suffix):
>> children[:] = []
>>
>> Putting some debugging information, I see that this logic is actually
>> used,
>> but I'm a bit confused regarding when it's necessary. The tests which
>> reach
>> this logic also make me a bit worried:
>> test_staged_multiproducer_multiconsumer, which theoretically should be
>> a case
>> which should never see items being processed due to the lock.
>
> the lock is only held when fetching an item. ie. the lock guards access
> to the queue items. However the consumer might still be processing the
> item, and until it does the processing node isn't released. it might be
> a better semantic (less contention) for serialization if the lock was
> held for the duration of the item processing. i'll make that change.
>

This is implemented now; the serialized queue works much more smoothly now
with less contention.
I've removed the children[:] filtering logic that was in use previously.

>
>> [21]
>>
>> Something that occurred to me while I was reading the code is that the
>> line:
>>
>> request.processing_children = False
>>
>> May still not get executed on certain exists from the logic in the item
>> processing. Should we guarantee that it will necessarily be reset on
>> any
>> exit, so that we never get an unusable object (with an inconsistent
>> state)?
>>
>
> is there a scenario where this would be the case? afaics all the failure
> mode exists are covered by error handlers that will set this flag to
> false.
>
After our discussion on IRC, I took a look at putting the
request.processing_children reset into an errback/callback handler, but it's
not clear that the same semantic would be achieved. In one of the scenarios,
a consumer is waiting on a producer to put items in the queue;
request.processing_children should be false here, but no callbacks would
have been fired. I.e., a consumer attempts to fetch an item from a queue,
there are some items in the queue, and the consumer is processing children
but competing with other consumers; if the queue empties before an item is
fetched, the consumer should wait on the watch with processing_children =
False, and with a callback/errback on the get, the flag wouldn't be set
appropriately.

cheers,

Kapil

53. By Kapil Thangavelu

propagate unexpected errors when retrieving a node

54. By Kapil Thangavelu

refactor serialized queue to avoid extraneous existence checks and processing nodes.

Preview Diff

1=== modified file 'txzookeeper/lock.py'
2--- txzookeeper/lock.py 2010-06-10 15:44:40 +0000
3+++ txzookeeper/lock.py 2010-07-09 19:52:57 +0000
4@@ -51,17 +51,22 @@
5 "/".join((self.path, self.prefix)),
6 flags=zookeeper.EPHEMERAL|zookeeper.SEQUENCE)
7
8- def on_candidate_create(path):
9- self._candidate_path = path
10- return self._acquire()
11-
12- d.addCallback(on_candidate_create)
13-
14+ d.addCallback(self._on_candidate_create)
15+ d.addErrback(self._on_no_queue_error)
16 return d
17
18- def _acquire(self):
19+ def _on_candidate_create(self, path):
20+ self._candidate_path = path
21+ return self._acquire()
22+
23+ def _on_no_queue_error(self, failure):
24+ self._candidate_path = None
25+ return failure
26+
27+ def _acquire(self, *args):
28 d = self._client.get_children(self.path)
29 d.addCallback(self._check_candidate_nodes)
30+ d.addErrback(self._on_no_queue_error)
31 return d
32
33 def _check_candidate_nodes(self, children):
34
35=== added file 'txzookeeper/queue.py'
36--- txzookeeper/queue.py 1970-01-01 00:00:00 +0000
37+++ txzookeeper/queue.py 2010-07-09 19:52:57 +0000
38@@ -0,0 +1,451 @@
39+"""
40+Several distributed multiprocess queue implementations.
41+
42+The C{Queue} implementation closely follows the apache zookeeper recipe; it
43+provides no guarantees beyond isolation and concurrency of retrieval of items.
44+
45+The C{ReliableQueue} implementation provides isolation and concurrency, as
46+well as guarantees that if a consumer dies before processing an item, that item
47+made available to another consumer.
48+
49+The C{SerializedQueue} implementation provides for strict in order processing
50+of items within a queue.
51+"""
52+
53+import zookeeper
54+
55+from twisted.internet.defer import Deferred, fail
56+from twisted.python.failure import Failure
57+from txzookeeper.lock import Lock
58+from txzookeeper.client import ZOO_OPEN_ACL_UNSAFE
59+
60+
61+class Queue(object):
62+ """
63+ Implementation is based off the apache zookeeper Queue recipe.
64+
65+ There are some things to keep in mind when using this queue implementation.
66+ Its primary purpose is to enforce isolation and concurrent access;
67+ however, it does not provide for reliable consumption. An error
68+ condition in a queue consumer must requeue the item, else it's lost, as
69+ it's removed from zookeeper on retrieval in this implementation. This
70+ implementation closely mirrors the behavior and api of the python
71+ standard library Queue, or multiprocessing.Queue, albeit with the
72+ caveat of only strings for queue items.
73+ """
74+
75+ prefix = "entry-"
76+
77+ def __init__(self, path, client, acl=None, persistent=False):
78+ """
79+ @param client: A connected C{ZookeeperClient} instance.
80+ @param path: The path to the queue in the zookeeper hierarchy.
81+ @param acl: An acl to be used for queue items.
82+ @param persistent: Boolean flag which denotes if items in the queue are
83+ persistent.
84+ """
85+ self._path = path
86+ self._client = client
87+ self._persistent = persistent
88+ if acl is None:
89+ acl = [ZOO_OPEN_ACL_UNSAFE]
90+ self._acl = acl
91+
92+ @property
93+ def path(self):
94+ """Path to the queue."""
95+ return self._path
96+
97+ @property
98+ def persistent(self):
99+ """If the queue is persistent returns True."""
100+ return self._persistent
101+
102+ def get(self):
103+ """
104+ Get and remove an item from the queue. If no item is available
105+ at the moment, a deferred is returned that will fire when an item
106+ is available.
107+ """
108+
109+ def on_queue_items_changed(*args):
110+ """Event watcher on queue node child events."""
111+ if request.complete or not self._client.connected:
112+ return # pragma: no cover
113+
114+ if request.processing_children:
115+ # If deferred stack is currently processing a set of children
116+ # defer refetching the children till its done.
117+ request.refetch_children = True
118+ else:
119+ # Else the item get request is just waiting for a watch,
120+ # restart the get.
121+ self._get(request)
122+
123+ request = GetRequest(Deferred(), on_queue_items_changed)
124+ self._get(request)
125+ return request.deferred
126+
127+ def put(self, item):
128+ """
129+ Put an item into the queue.
130+
131+ @param item: String data to be put on the queue.
132+ """
133+ if not isinstance(item, str):
134+ return fail(ValueError("queue items must be strings"))
135+
136+ flags = zookeeper.SEQUENCE
137+ if not self._persistent:
138+ flags = flags|zookeeper.EPHEMERAL
139+
140+ d = self._client.create(
141+ "/".join((self._path, self.prefix)), item, self._acl, flags)
142+ return d
143+
144+ def qsize(self):
145+ """
146+ Return the approximate size of the queue. This value is always
147+ effectively a snapshot. Returns a deferred returning an integer.
148+ """
149+ d = self._client.exists(self._path)
150+
151+ def on_success(stat):
152+ return stat["numChildren"]
153+
154+ d.addCallback(on_success)
155+ return d
156+
157+ def _get(self, request):
158+ request.processing_children = True
159+ d = self._client.get_children(self._path, request.child_watcher)
160+ d.addCallback(self._get_item, request)
161+ return d
162+
163+ def _get_item(self, children, request):
164+
165+ def fetch_node(name):
166+ path = "/".join((self._path, name))
167+ d = self._client.get(path)
168+ d.addCallback(on_get_node_success)
169+ d.addErrback(on_no_node)
170+ return d
171+
172+ def on_get_node_success((data, stat)):
173+ d = self._client.delete("/".join((self._path, name)))
174+ d.addCallback(on_delete_node_success, data)
175+ d.addErrback(on_no_node)
176+ return d
177+
178+ def on_delete_node_success(result_code, data):
179+ request.processing_children = False
180+ request.callback(data)
181+
182+ def on_no_node(failure=None):
183+ if failure and not failure.check(zookeeper.NoNodeException):
184+ request.errback(failure)
185+ return
186+ if children:
187+ name = children.pop(0)
188+ return fetch_node(name)
189+
190+ # Refetching is deferred until we process all the children
191+ # from a get_children call.
192+ request.processing_children = False
193+ if request.refetch_children:
194+ request.refetch_children = False
195+ return self._get(request)
196+
197+ if not children:
198+ return on_no_node()
199+
200+ children.sort()
201+ name = children.pop(0)
202+ return fetch_node(name)
203+
204+
205+class GetRequest(object):
206+ """
207+ An encapsulation of a consumer request to fetch an item from the queue.
208+
209+ @refetch_children - boolean field, when true signals that children should
210+ be refetched after processing the current set of children.
211+
212+ @child_watcher - The queue child/item watcher.
213+
214+ @processing_children - Boolean flag, set to true when the last known
215+ children of the queue are being processed. If a watch fires while the
216+ children are being processed it sets the refetch_children flag to true
217+ instead of getting the children immediately.
218+
219+ @deferred - The deferred representing retrieving an item from the queue.
220+ """
221+
222+ def __init__(self, deferred, watcher):
223+ self.deferred = deferred
224+ self.child_watcher = watcher
225+ self.processing_children = False
226+ self.refetch_children = False
227+
228+ @property
229+ def complete(self):
230+ return self.deferred.called
231+
232+ def callback(self, data):
233+ self.deferred.callback(data)
234+
235+ def errback(self, error):
236+ self.deferred.errback(error)
237+
238+
239+class QueueItem(object):
240+ """
241+ An encapsulation of a work item put into a queue. The work item data is
242+ accessible via the data attribute. When the item has been processed by
243+ the consumer, the delete method can be invoked to remove the item
244+ permanently from the queue.
245+
246+ An optional processed callback may be passed to the constructor that will
247+ be invoked after the node has been processed.
248+ """
249+
250+ def __init__(self, path, data, client, processed_callback=None):
251+ self._path = path
252+ self._data = data
253+ self._client = client
254+ self._processed_callback = processed_callback
255+
256+ @property
257+ def data(self):
258+ return self._data
259+
260+ @property
261+ def path(self):
262+ return self._path
263+
264+ def delete(self):
265+ """
266+ Delete the item node and the item processing node in the queue.
267+ Typically invoked by a queue consumer, to signal successful processing
268+ of the queue item.
269+ """
270+ d = self._client.delete(self.path)
271+
272+ if self._processed_callback:
273+ d.addCallback(self._processed_callback, self.path)
274+ return d
275+
276+
277+class ReliableQueue(Queue):
278+ """
279+ A distributed queue. It varies from a C{Queue} in that it ensures any
280+ item consumed from the queue is explicitly acked by the consumer. If
281+ the consumer dies after retrieving an item but before acking it, the
282+ item will be made available to another consumer. To encapsulate the
283+ acking behavior the queue item data is returned in a C{QueueItem} instance,
284+ with a delete method that will remove it from the queue after processing.
285+
286+ Reliable queues may be persistent or transient. If the queue is persistent,
287+ then any item added to the queue must be processed in order to be removed.
288+ If the queue is transient, then any jobs placed in the queue by a client
289+ are removed when the client is closed, regardless of whether the job
290+ has been processed or not.
291+ """
292+
293+ def _item_processed_callback(self, result_code, item_path):
294+ return self._client.delete(item_path+"-processing")
295+
296+ def _filter_children(self, children, suffix="-processing"):
297+ """
298+ Filter out any children currently being processed; modifies the list in place.
299+ """
300+ children.sort()
301+ for name in list(children):
302+ # remove any processing nodes and their associated queue item.
303+ if name.endswith(suffix):
304+ children.remove(name)
305+ item_name = name[:-len(suffix)]
306+ if item_name in children:
307+ children.remove(item_name)
308+
309+ def _get_item(self, children, request):
310+
311+ def check_node(name):
312+ """Check the node still exists."""
313+ path = "/".join((self._path, name))
314+ d = self._client.exists(path)
315+ d.addCallback(on_node_exists, path)
316+ d.addErrback(on_reservation_failed)
317+ return d
318+
319+ def on_node_exists(stat, path):
320+ """Reserve the node for consumer processing."""
321+ d = self._client.create(path+"-processing",
322+ flags=zookeeper.EPHEMERAL)
323+ d.addCallback(on_reservation_success, path)
324+ d.addErrback(on_reservation_failed)
325+ return d
326+
327+ def on_reservation_success(processing_path, path):
328+ """Fetch the node data to return"""
329+ d = self._client.get(path)
330+ d.addCallback(on_get_node_success, path)
331+ d.addErrback(on_get_node_failed, path)
332+ return d
333+
334+ def on_get_node_failed(failure, path):
335+ """If we can't fetch the node, delete the processing node."""
336+ d = self._client.delete(path+"-processing")
337+
338+ # propagate unexpected errors appropriately
339+ if not failure.check(zookeeper.NoNodeException):
340+ d.addCallback(lambda x: request.errback(failure))
341+ else:
342+ d.addCallback(on_reservation_failed)
343+ return d
344+
345+ def on_get_node_success((data, stat), path):
346+ """If we got the node, we're done."""
347+ request.processing_children = False
348+ request.callback(
349+ QueueItem(
350+ path, data, self._client, self._item_processed_callback))
351+
352+ def on_reservation_failed(failure=None):
353+ """If we can't get the node or reserve, continue processing
354+ the children."""
355+ if failure and not failure.check(
356+ zookeeper.NodeExistsException, zookeeper.NoNodeException):
357+ request.processing_children = True
358+ request.errback(failure)
359+ return
360+
361+ if children:
362+ name = children.pop(0)
363+ return check_node(name)
364+
365+ # If a watch fired while processing children, process it
366+ # after the children list is exhausted.
367+ request.processing_children = False
368+ if request.refetch_children:
369+ request.refetch_children = False
370+ return self._get(request)
371+
372+ self._filter_children(children)
373+
374+ if not children:
375+ return on_reservation_failed()
376+
377+ name = children.pop(0)
378+ return check_node(name)
379+
380+
381+class SerializedQueue(Queue):
382+ """
383+ A serialized queue ensures that, even with multiple consumers, items are
384+ retrieved and processed in the order they were placed in the queue.
385+
386+ This implementation aggregates a reliable queue, with a lock to provide
387+ for serialized consumer access. The lock is released only when a queue item
388+ has been processed.
389+ """
390+
391+ def __init__(self, path, client, acl=None, persistent=False):
392+ super(SerializedQueue, self).__init__(path, client, acl, persistent)
393+ self._lock = Lock("%s/%s"%(self.path, "_lock"), client)
394+
395+ def _item_processed_callback(self, result_code, item_path):
396+ return self._lock.release()
397+
398+ def _filter_children(self, children, suffix="-processing"):
399+ """
400+ Filter the lock from consideration as an item to be processed.
401+ """
402+ children.sort()
403+ for name in list(children):
404+ if name.startswith('_'):
405+ children.remove(name)
406+
407+ def _on_lock_directory_does_not_exist(self, failure):
408+ """
409+ If the lock directory does not exist, go ahead and create it and
410+ attempt to acquire the lock.
411+ """
412+ failure.trap(zookeeper.NoNodeException)
413+ d = self._client.create(self._lock.path)
414+ d.addBoth(self._on_lock_created_or_exists)
415+ return d
416+
417+ def _on_lock_created_or_exists(self, failure):
418+ """
419+ The lock node creation will either result in success or node exists
420+ error, if a concurrent client created the node first. In either case
421+ we proceed with attempting to acquire the lock.
422+ """
423+ if isinstance(failure, Failure):
424+ failure.trap(zookeeper.NodeExistsException)
425+ d = self._lock.acquire()
426+ return d
427+
428+ def _on_lock_acquired(self, lock):
429+ """
430+ After the exclusive queue lock is acquired, we proceed with an attempt
431+ to fetch an item from the queue.
432+ """
433+ d = super(SerializedQueue, self).get()
434+ return d
435+
436+ def get(self):
437+ """
438+ Get and remove an item from the queue. If no item is available
439+ at the moment, a deferred is returned that will fire when an item
440+ is available.
441+ """
442+ d = self._lock.acquire()
443+
444+ d.addErrback(self._on_lock_directory_does_not_exist)
445+ d.addCallback(self._on_lock_acquired)
446+ return d
447+
448+ def _get_item(self, children, request):
449+
450+ def fetch_node(name):
451+ path = "/".join((self._path, name))
452+ d = self._client.get(path)
453+ d.addCallback(on_node_retrieved, path)
454+ d.addErrback(on_reservation_failed)
455+ return d
456+
457+ def on_node_retrieved((data, stat), path):
458+ request.processing_children = False
459+ request.callback(
460+ QueueItem(
461+ path, data, self._client, self._item_processed_callback))
462+
463+ def on_reservation_failed(failure=None):
464+ """If we can't get the node or reserve, continue processing
465+ the children."""
466+ if failure and not failure.check(
467+ zookeeper.NodeExistsException, zookeeper.NoNodeException):
468+ request.processing_children = True
469+ request.errback(failure)
470+ return
471+
472+ if children:
473+ name = children.pop(0)
474+ return fetch_node(name)
475+
476+ # If a watch fired while processing children, process it
477+ # after the children list is exhausted.
478+ request.processing_children = False
479+ if request.refetch_children:
480+ request.refetch_children = False
481+ return self._get(request)
482+
483+ self._filter_children(children)
484+
485+ if not children:
486+ return on_reservation_failed()
487+
488+ name = children.pop(0)
489+ return fetch_node(name)
490
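
A hypothetical standalone sketch (not part of the branch, runs without ZooKeeper) of two details the queue code above relies on: ZooKeeper SEQUENCE nodes carry zero-padded numeric suffixes, so a plain lexicographic sort of children yields FIFO order; and ReliableQueue marks an in-flight item with an ephemeral "<name>-processing" sibling, which `_filter_children` strips along with the item it reserves:

```python
def filter_children(children, suffix="-processing"):
    """Drop reservation markers and the items they reserve, in place."""
    children.sort()
    for name in list(children):
        if name.endswith(suffix):
            children.remove(name)
            item_name = name[:-len(suffix)]
            if item_name in children:
                children.remove(item_name)
    return children

# Zero-padded sequence suffixes sort correctly as plain strings, and the
# reserved entry-0000000001 is hidden from other consumers.
children = ["entry-0000000002", "entry-0000000010",
            "entry-0000000001", "entry-0000000001-processing"]
print(filter_children(children))
# -> ['entry-0000000002', 'entry-0000000010']
```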
491=== modified file 'txzookeeper/tests/__init__.py'
492--- txzookeeper/tests/__init__.py 2010-06-10 15:46:02 +0000
493+++ txzookeeper/tests/__init__.py 2010-07-09 19:52:57 +0000
494@@ -4,10 +4,6 @@
495 from twisted.trial.unittest import TestCase
496 from mocker import MockerTestCase
497
498-#from txzookeeper.client import Wrapper
499-
500-#zookeeper = Wrapper(zookeeper)
501-
502 class ZookeeperTestCase(TestCase, MockerTestCase):
503
504 def setUp(self):
505
506=== modified file 'txzookeeper/tests/test_client.py'
507--- txzookeeper/tests/test_client.py 2010-06-01 14:26:45 +0000
508+++ txzookeeper/tests/test_client.py 2010-07-09 19:52:57 +0000
509@@ -342,6 +342,33 @@
510 d.addCallback(verify_exists)
511 return d
512
513+ def test_exists_with_watcher_and_close(self):
514+ """
515+ Closing a connection with a watch outstanding behaves correctly.
516+ """
517+ d = self.client.connect()
518+ zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
519+
520+ def node_watcher(event_type, state, path):
521+ client = getattr(self, "client", None)
522+ if client is not None and client.connected:
523+ self.fail("Client should be disconnected")
524+
525+ def create_node(client):
526+ return client.create("/syracuse")
527+
528+ def check_exists(path):
529+ # shouldn't fire till unit test cleanup
530+ return self.client.exists(path, node_watcher)
531+
532+ def verify_exists(result):
533+ self.assertTrue(result)
534+
535+ d.addCallback(create_node)
536+ d.addCallback(check_exists)
537+ d.addCallback(verify_exists)
538+ return d
539+
540 def test_exists_with_nonexistant_watcher(self):
541 """
542 The exists method can also be used to set an optional watcher on a
543
544=== modified file 'txzookeeper/tests/test_lock.py'
545--- txzookeeper/tests/test_lock.py 2010-06-10 15:44:40 +0000
546+++ txzookeeper/tests/test_lock.py 2010-07-09 19:52:57 +0000
547@@ -1,4 +1,5 @@
548
549+from zookeeper import NoNodeException
550 from mocker import ANY
551 from twisted.internet.defer import (
552 inlineCallbacks, returnValue, Deferred, succeed)
553@@ -81,6 +82,22 @@
554 yield self.failUnlessFailure(lock.acquire(), LockError)
555
556 @inlineCallbacks
557+ def test_acquire_after_error(self):
558+ """
559+ Any instance state associated with a failed acquire should be cleared
560+ on error, allowing subsequent attempts to succeed.
561+ """
562+ client = yield self.open_client()
563+ path = "/lock-test-acquire-after-error"
564+ lock = Lock(path, client)
565+ d = lock.acquire()
566+ self.failUnlessFailure(d, NoNodeException)
567+ yield d
568+ yield client.create(path)
569+ yield lock.acquire()
570+ self.assertEqual(lock.acquired, True)
571+
572+ @inlineCallbacks
573 def test_error_on_acquire_acquiring(self):
574 """
575 Attempting to acquire the lock while an attempt is already in progress,
576
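
A hypothetical synchronous sketch (all names invented; not part of this branch) of the create-then-acquire pattern SerializedQueue.get() implements with deferreds: a missing lock directory triggers creation on demand, and losing the creation race to a concurrent client is tolerated before retrying the acquire:

```python
class NoNodeException(Exception):
    """Stands in for zookeeper.NoNodeException."""

class NodeExistsException(Exception):
    """Stands in for zookeeper.NodeExistsException."""

class FakeLock(object):
    """A minimal in-memory stand-in for txzookeeper's Lock."""

    def __init__(self):
        self.exists = False    # does the lock directory exist?
        self.acquired = False

    def acquire(self):
        if not self.exists:
            raise NoNodeException()
        self.acquired = True
        return self

def acquire_with_retry(lock, create_node):
    try:
        return lock.acquire()
    except NoNodeException:
        pass                       # lock directory missing: create it
    try:
        create_node()              # may race with another client
    except NodeExistsException:
        pass                       # another client won the race; fine
    return lock.acquire()

lock = FakeLock()
acquire_with_retry(lock, lambda: setattr(lock, "exists", True))
print(lock.acquired)  # -> True
```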
577=== added file 'txzookeeper/tests/test_queue.py'
578--- txzookeeper/tests/test_queue.py 1970-01-01 00:00:00 +0000
579+++ txzookeeper/tests/test_queue.py 2010-07-09 19:52:57 +0000
580@@ -0,0 +1,392 @@
581+
582+from zookeeper import NoNodeException
583+from twisted.internet.defer import (
584+ inlineCallbacks, returnValue, DeferredList, Deferred, succeed, fail)
585+
586+from txzookeeper import ZookeeperClient
587+from txzookeeper.client import NotConnectedException
588+from txzookeeper.queue import Queue, ReliableQueue, SerializedQueue, QueueItem
589+from txzookeeper.tests import ZookeeperTestCase, utils
590+
591+from mocker import ANY
592+
593+
594+class QueueTests(ZookeeperTestCase):
595+
596+ queue_factory = Queue
597+
598+ def setUp(self):
599+ super(QueueTests, self).setUp()
600+ self.clients = []
601+
602+ def tearDown(self):
603+ cleanup = False
604+
605+ for client in self.clients:
606+ if not cleanup and client.connected:
607+ utils.deleteTree(handle=client.handle)
608+ cleanup = True
609+ if client.connected:
610+ client.close()
611+ super(QueueTests, self).tearDown()
612+
613+ def compare_data(self, data, item):
614+ if isinstance(item, QueueItem):
615+ self.assertEqual(data, item.data)
616+ else:
617+ self.assertEqual(data, item)
618+
619+ def consume_item(self, item):
620+ if isinstance(item, QueueItem):
621+ return item.delete(), item.data
622+ return None, item
623+
624+ @inlineCallbacks
625+ def open_client(self, credentials=None):
626+ """
627+ Open a zookeeper client, optionally authenticating with the
628+ credentials if given.
629+ """
630+ client = ZookeeperClient("127.0.0.1:2181")
631+ self.clients.append(client)
632+ yield client.connect()
633+ if credentials:
634+ d = client.add_auth("digest", credentials)
635+ # hack to keep auth fast
636+ yield client.exists("/")
637+ yield d
638+ returnValue(client)
639+
640+ def test_path_property(self):
641+ """
642+ The queue has a property that can be used to introspect its
643+ path in read only manner.
644+ """
645+ q = self.queue_factory("/moon", None)
646+ self.assertEqual(q.path, "/moon")
647+
648+ def test_persistent_property(self):
649+ """
650+ The queue has a property that can be used to introspect
651+ whether or not the queue entries are persistent.
652+ """
653+ q = self.queue_factory("/moon", None, persistent=True)
654+ self.assertEqual(q.persistent, True)
655+
656+ @inlineCallbacks
657+ def test_put_item(self):
658+ """
659+ An item can be put on the queue, and is stored in a node in
660+ queue's directory.
661+ """
662+ client = yield self.open_client()
663+ path = yield client.create("/queue-test")
664+ queue = self.queue_factory(path, client)
665+ item = "transform image bluemarble.jpg"
666+ yield queue.put(item)
667+ children = yield client.get_children(path)
668+ self.assertEqual(len(children), 1)
669+ data, stat = yield client.get("/".join((path, children[0])))
670+ self.compare_data(data, item)
671+
672+ @inlineCallbacks
673+ def test_qsize(self):
674+ """
675+ The client implements a method which returns an unreliable
676+ approximation of the number of items in the queue (mirroring the
677+ api of Queue.Queue); it's unreliable only in that the value
678+ represents a snapshot of the queue size at the time it was
679+ requested, not its current value.
680+ """
681+ client = yield self.open_client()
682+ path = yield client.create("/test-qsize")
683+ queue = self.queue_factory(path, client)
684+
685+ yield queue.put("abc")
686+ size = yield queue.qsize()
687+ self.assertTrue(size >= 1)
688+
689+ yield queue.put("bcd")
690+ size = yield queue.qsize()
691+ self.assertTrue(size >= 2)
692+
693+ yield queue.get()
694+ size = yield queue.qsize()
695+ self.assertTrue(size >= 1)
696+
697+ @inlineCallbacks
698+ def test_invalid_put_item(self):
699+ """
700+ The queue only accepts string items.
701+ """
702+ client = yield self.open_client()
703+ queue = self.queue_factory("/unused", client)
704+ self.failUnlessFailure(queue.put(123), ValueError)
705+
706+ @inlineCallbacks
707+ def test_get_with_invalid_queue(self):
708+ """
709+ If the queue hasn't been created, a no-node exception is raised
710+ on get.
711+ """
712+ client = yield self.open_client()
713+ queue = self.queue_factory("/unused", client)
714+ yield self.failUnlessFailure(queue.get(), NoNodeException)
715+
716+ @inlineCallbacks
717+ def test_put_with_invalid_queue(self):
718+ """
719+ If the queue hasn't been created an unknown node exception is raised
720+ on put.
721+ """
722+ client = yield self.open_client()
723+ queue = self.queue_factory("/unused", client)
724+ yield self.failUnlessFailure(queue.put("abc"), NoNodeException)
725+
726+ @inlineCallbacks
727+ def test_unexpected_error_during_item_retrieval(self):
728+ """
729+ If an unexpected error occurs when reserving an item, the error is
730+ passed up to the get deferred's errback method.
731+ """
732+ test_client = yield self.open_client()
733+ path = yield test_client.create("/reliable-queue-test")
734+
735+ # setup the test scenario
736+ mock_client = self.mocker.patch(test_client)
737+ mock_client.get_children(path, ANY)
738+ self.mocker.result(succeed(["entry-000000"]))
739+
740+ item_path = "%s/%s"%(path, "entry-000000")
741+ mock_client.get(item_path)
742+ self.mocker.result(fail(SyntaxError("x")))
743+ self.mocker.replay()
744+
745+ # odd behavior, this should return a failure, as above, but it returns
746+ # None
747+ d = self.queue_factory(path, mock_client).get()
748+ assert d
749+ self.failUnlessFailure(d, SyntaxError)
750+ yield d
751+
752+ @inlineCallbacks
753+ def test_get_and_put(self):
754+ """
755+ Get can also be used on empty queues and returns a deferred that fires
756+ whenever an item has been retrieved from the queue.
757+ """
758+ client = yield self.open_client()
759+ path = yield client.create("/queue-wait-test")
760+ data = "zebra moon"
761+ queue = self.queue_factory(path, client)
762+ d = queue.get()
763+
764+ @inlineCallbacks
765+ def push_item():
766+ queue = self.queue_factory(path, client)
767+ yield queue.put(data)
768+
769+ from twisted.internet import reactor
770+ reactor.callLater(0.1, push_item)
771+
772+ item = yield d
773+ self.compare_data(data, item)
774+
775+ @inlineCallbacks
776+ def test_interleaved_multiple_consumers_wait(self):
777+ """
778+ Multiple consumers and a producer adding and removing items on the
779+ queue concurrently.
780+ """
781+ test_client = yield self.open_client()
782+ path = yield test_client.create("/multi-consumer-wait-test")
783+ results = []
784+
785+ @inlineCallbacks
786+ def producer(item_count):
787+ from twisted.internet import reactor
788+ client = yield self.open_client()
789+ queue = self.queue_factory(path, client)
790+
791+ items = []
792+ producer_done = Deferred()
793+
794+ def iteration(i):
795+ if len(items) == (item_count-1):
796+ return producer_done.callback(None)
797+ items.append(i)
798+ queue.put(str(i))
799+
800+ for i in range(item_count):
801+ reactor.callLater(i*0.05, iteration, i)
802+ yield producer_done
803+ returnValue(items)
804+
805+ @inlineCallbacks
806+ def consumer(item_count):
807+ client = yield self.open_client()
808+ queue = self.queue_factory(path, client)
809+ for i in range(item_count):
810+ try:
811+ data = yield queue.get()
812+ d, data = self.consume_item(data)
813+ if d:
814+ yield d
815+ except NotConnectedException:
816+ # when the test closes, we need to catch this
817+ # as one of the consumers will likely hang.
818+ returnValue(len(results))
819+ results.append((client.handle, data))
820+
821+ returnValue(len(results))
822+
823+ yield DeferredList(
824+ [DeferredList([consumer(3), consumer(2)], fireOnOneCallback=1),
825+ producer(6)])
826+ # as soon as the producer and either consumer is complete, the test
827+ # is done. Thus the only assertion we can make is that the result is
828+ # at least the size of the smallest consumer.
829+ self.assertTrue(len(results) >= 2)
830+
831+ @inlineCallbacks
832+ def test_staged_multiproducer_multiconsumer(self):
833+ """
834+ A real world scenario test: a set of producers filling a queue with
835+ items, and then a set of concurrent consumers pulling from the queue
836+ till it's empty. The consumers use a non-blocking get (the deferred
837+ raises an exception on empty).
838+ """
839+ test_client = yield self.open_client()
840+ path = yield test_client.create("/multi-prod-cons")
841+
842+ consume_results = []
843+ produce_results = []
844+
845+ @inlineCallbacks
846+ def producer(start, offset):
847+ client = yield self.open_client()
848+ q = self.queue_factory(path, client)
849+ for i in range(start, start+offset):
850+ yield q.put(str(i))
851+ produce_results.append(str(i))
852+
853+ @inlineCallbacks
854+ def consumer(max):
855+ client = yield self.open_client()
856+ q = self.queue_factory(path, client)
857+ attempts = range(max)
858+ for el in attempts:
859+ value = yield q.get()
860+ d, value = self.consume_item(value)
861+ if d:
862+ yield d
863+ consume_results.append(value)
864+ returnValue(True)
865+
866+ # two producers 20 items total
867+ yield DeferredList(
868+ [producer(0, 10), producer(10, 10)])
869+
870+ children = yield test_client.get_children(path)
871+ self.assertEqual(len(children), 20)
872+
873+ yield DeferredList(
874+ [consumer(8), consumer(8), consumer(4)])
875+
876+ err = set(produce_results)-set(consume_results)
877+ self.assertFalse(err)
878+
879+ self.assertEqual(len(consume_results), len(produce_results))
880+
881+
882+class ReliableQueueTests(QueueTests):
883+
884+ queue_factory = ReliableQueue
885+
886+ @inlineCallbacks
887+ def test_unprocessed_item_reappears(self):
888+ """
889+ If a queue consumer exits before processing an item, then
890+ the item will become visible to other queue consumers.
891+ """
892+ test_client = yield self.open_client()
893+ path = yield test_client.create("/reliable-queue-test")
894+
895+ data = "rabbit stew"
896+ queue = self.queue_factory(path, test_client)
897+ yield queue.put(data)
898+
899+ test_client2 = yield self.open_client()
900+ queue2 = self.queue_factory(path, test_client2)
901+ item = yield queue2.get()
902+ self.compare_data(data, item)
903+
904+ d = queue.get()
905+ yield test_client2.close()
906+
907+ item = yield d
908+ self.compare_data(data, item)
909+
910+ @inlineCallbacks
911+ def test_processed_item_removed(self):
912+ """
913+ If a client processes an item, then that item is removed from the queue
914+ permanently.
915+ """
916+ test_client = yield self.open_client()
917+ path = yield test_client.create("/reliable-queue-test")
918+
919+ data = "rabbit stew"
920+ queue = self.queue_factory(path, test_client)
921+ yield queue.put(data)
922+ item = yield queue.get()
923+ self.compare_data(data, item)
924+ yield item.delete()
925+ yield test_client.close()
926+
927+ test_client2 = yield self.open_client()
928+ children = yield test_client2.get_children(path)
929+ children = [c for c in children if c.startswith(queue.prefix)]
930+ self.assertFalse(bool(children))
931+
932+
933+class SerializedQueueTests(ReliableQueueTests):
934+
935+ queue_factory = SerializedQueue
936+
937+ @inlineCallbacks
938+ def test_serialized_behavior(self):
939+ """
940+ The serialized queue behavior is such that even with multiple
941+ consumers, items are processed in order.
942+ """
943+ test_client = yield self.open_client()
944+ path = yield test_client.create("/serialized-queue-test")
945+
946+ queue = self.queue_factory(path, test_client, persistent=True)
947+
948+ yield queue.put("a")
949+ yield queue.put("b")
950+
951+ test_client2 = yield self.open_client()
952+ queue2 = self.queue_factory(path, test_client2, persistent=True)
953+
954+ d = queue2.get()
955+
956+ def on_get_item_sleep_and_close(item):
957+ """Close the connection after we have the item."""
958+ from twisted.internet import reactor
959+ reactor.callLater(0.1, test_client2.close)
960+ return item
961+
962+ d.addCallback(on_get_item_sleep_and_close)
963+
964+ # fetch the item from queue2
965+ item1 = yield d
966+ # fetch the item from queue1, this will not get "b", because client2 is
967+ # still processing "a". When client2 closes its connection, client1
968+ # will get item "a"
969+ item2 = yield queue.get()
970+
971+ self.compare_data("a", item2)
972+ self.assertEqual(item1.data, item2.data)
973
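
A hypothetical standalone sketch (names mirror GetRequest above, but this is illustrative only) of the watch-coalescing behavior the consumers in these tests depend on: while a cached children list is being worked through, a firing child watch only sets `refetch_children`; the refetch happens once, after the list drains, so any number of watch events collapses into a single additional get_children call:

```python
class GetState(object):
    """Minimal model of GetRequest's processing/refetch flags."""

    def __init__(self):
        self.processing_children = False
        self.refetch_children = False
        self.refetches = 0

    def watch_fired(self):
        # Mid-processing, only flag the need to refetch later.
        if self.processing_children:
            self.refetch_children = True
        else:
            self.refetch()

    def children_exhausted(self):
        # The cached list is drained; honor any deferred watch events once.
        self.processing_children = False
        if self.refetch_children:
            self.refetch_children = False
            self.refetch()

    def refetch(self):
        # Stands in for a get_children call on the queue node.
        self.refetches += 1
        self.processing_children = True

state = GetState()
state.refetch()             # initial get_children
state.watch_fired()         # fires mid-processing: only sets the flag
state.watch_fired()         # a second event coalesces into the same flag
state.children_exhausted()  # one refetch covers both events
print(state.refetches)      # -> 2
```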
974=== modified file 'txzookeeper/todo.txt'
975--- txzookeeper/todo.txt 2010-05-08 12:14:56 +0000
976+++ txzookeeper/todo.txt 2010-07-09 19:52:57 +0000
977@@ -2,10 +2,10 @@
978 bugs to file upstream
979
980 - you can set acl on a non existant node.
981- - memory leak every api invocation.
982+ - memory leak every api invocation. [really? need some measurements here]
983
984 observed while trying xtest_get_children_with_watcher
985
986- - async get children with watcher seems broken.
987+ - async get children with watcher seems broken. [772 - fixed upstream]
988 - segfault if close during completion.
989 - getting a watch notification when closing a connection, segfaults.

Subscribers

People subscribed via source and target branches

to all changes: