Merge lp:~hazmat/txzookeeper/distributed-queue into lp:txzookeeper

Proposed by Kapil Thangavelu
Status: Merged
Merged at revision: 24
Proposed branch: lp:~hazmat/txzookeeper/distributed-queue
Merge into: lp:txzookeeper
Diff against target: 989 lines (+901/-13)
7 files modified
txzookeeper/lock.py (+12/-7)
txzookeeper/queue.py (+451/-0)
txzookeeper/tests/__init__.py (+0/-4)
txzookeeper/tests/test_client.py (+27/-0)
txzookeeper/tests/test_lock.py (+17/-0)
txzookeeper/tests/test_queue.py (+392/-0)
txzookeeper/todo.txt (+2/-2)
To merge this branch: bzr merge lp:~hazmat/txzookeeper/distributed-queue
Reviewer: Gustavo Niemeyer (status: Approve)
Review via email: mp+25712@code.launchpad.net

Description of the change

Distributed multi-producer / multi-consumer queue implementation.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

[1]

+ self.path = path
+ self.client = client
+ self.persistent = persistent

I tend to prefer keeping things private, unless they are actually required by external clients, or would be obviously important to know about. This way it's easy to tell what's the "trusted" API clients are supposed to rely on, and also makes it more comfortable when changing the interface (anything private can be removed/renamed/replaced).

How do you feel about this in general?

[2]

+ self, path, client, acl=[ZOO_OPEN_ACL_UNSAFE], persistent=False):

Also a pretty general point which I'm making mostly to synchronize our thinking, rather than as a *required* change here:

I tend to prefer using the style of acl=None in the initialization of default parameters, and then process it internally in the constructor (if acl is None, acl = ... or similar).

While the result is obviously the same here, the main distinction is that it enables code using the function or class to say "I don't have anything special in this parameter.. just do your default.", which gets pretty tricky when the default value is in the keyword argument constructor itself.
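The default-argument style described above can be sketched as follows; the DEFAULT_ACL stand-in and the trimmed constructor are illustrative, not the actual txzookeeper code:

```python
# Minimal sketch of the acl=None pattern under discussion. DEFAULT_ACL is a
# stand-in for ZOO_OPEN_ACL_UNSAFE; the constructor is trimmed to the parts
# relevant to default-argument handling.
DEFAULT_ACL = [{"perms": 0x1f, "scheme": "world", "id": "anyone"}]


class Queue(object):

    def __init__(self, path, client, acl=None, persistent=False):
        self._path = path
        self._client = client
        self._persistent = persistent
        # Resolve the default inside the body: callers can explicitly pass
        # acl=None to mean "just use your default", and no mutable list
        # object is ever shared between instances.
        if acl is None:
            acl = list(DEFAULT_ACL)
        self._acl = acl
```

With a mutable default such as acl=[ZOO_OPEN_ACL_UNSAFE], every instance constructed without an explicit acl would share the same list object, so a mutation through one instance would leak into all the others.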

[3]

+ return self._get(wait=True)
+
+ get_wait = get

Do we need all these alternatives? Part of the beauty of Twisted is that we don't really have to care about what "waiting" means.

I suggest we have a single interface for getting, and it will always return a deferred which will fire once an item is available, no matter what. This will also simplify a bit the logic elsewhere (in _refill, _get_item, etc).
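The single-interface idea in [3] (get() always hands back something that fires once an item is available, whether or not one was queued at call time) can be sketched without ZooKeeper or Twisted; Pending is a toy stand-in for a Deferred and TinyQueue for the distributed queue:

```python
class Pending(object):
    """Toy stand-in for a Deferred: holds a result or pending callbacks."""

    def __init__(self):
        self.result = None
        self.callbacks = []

    def add_callback(self, fn):
        # Fire immediately if a (non-None, for this sketch) result exists,
        # otherwise remember the callback for later.
        if self.result is not None:
            fn(self.result)
        else:
            self.callbacks.append(fn)

    def fire(self, value):
        self.result = value
        for fn in self.callbacks:
            fn(value)


class TinyQueue(object):
    """In-memory queue whose get() always returns a Pending."""

    def __init__(self):
        self.items = []
        self.waiters = []

    def get(self):
        p = Pending()
        if self.items:
            p.fire(self.items.pop(0))  # item ready: fire right away
        else:
            self.waiters.append(p)     # no item: fire on a later put
        return p

    def put(self, item):
        if self.waiters:
            self.waiters.pop(0).fire(item)
        else:
            self.items.append(item)
```

A consumer never has to choose between a blocking and a non-blocking call; it registers a callback and the queue fires it immediately or later, whichever applies.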

[4]

+ d = self.client.create(
+ "/".join((self.path, self.prefix)), item, self._acl, flags)
+ return d

Nice. It feels pretty cool to be able to wait on a put this way.

[5]

+ d = self.client.exists(self.path)
+
+ def on_success(stat):
+ return stat["numChildren"]

Oh, interesting trick! I would imagine that getting the full list would be required, but this is of course a lot better.

[6]

+ Fetch the node data in the queue directory for the given node name. If
+ wait is
(...)
+ # tests. Instead we process our entire our node cache before
+ # proceeding.

Couple of comment details: "is ..." and "our entire our".

[7]

+ Refetch the queue children, setting a watch as needed, and invalidating
+ any previous children entries queue.

It would be nice to have a higher level description of what the algorithm is actually doing. E.g. what is the children entries queue about, what happens when it's empty, or when two different consumers have a partially overlapping queue, how are items consumed, etc.

[8]

In on_queue_items_changed():

+ self._cached_entries = []
+ d = self._refill(wait=wait)

Why is the cache being reset right after we're told changes have happened? Shouldn't this happen once we actually get the new list of children?

[9]

+ d = self._refill(wait=wait)
(...)
+ d = self.client.get_children(
+ self.path, on_queue_items_changed)

It'd be good to have more descriptive names for these variables, since one of th...


25. By Kapil Thangavelu

address some review comments (6, 8, 10)

26. By Kapil Thangavelu

remove method alias get_wait for get, review #3

27. By Kapil Thangavelu

use private variables for internals, introduce read only properties for accessors to public attributes, per review comment #1

28. By Kapil Thangavelu

remove acl mutable default arg, force queue prefix to private

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On Fri, 28 May 2010 16:30:42 -0400, Gustavo Niemeyer
<email address hidden> wrote:

> [1]
>
> + self.path = path
> + self.client = client
> + self.persistent = persistent
>
> I tend to prefer keeping things private, unless they are actually
> required by external clients, or would be obviously important to know
> about. This way it's easy to tell what's the "trusted" API clients are
> supposed to rely on, and also makes it more comfortable when changing
> the interface (anything private can be removed/renamed/replaced).
>
> How do you feel about this in general?

In general i'm fine with it. i have mixed feelings about how private is
spelled in some cases; i've dealt with libraries that take private (double
underscore style) to levels that made clean reuse or debugging difficult. In
this particular case, i'm fine with client being private (single underscore),
and path and persistent being read-only properties, and have made those
changes. In general it's okay as long as it's not a double underscore and
there isn't a legitimate reason for its use by consumers; if there is, i'd
tend to prefer at least read-only property access if direct attribute access
would be dangerous.
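The arrangement described here (private attributes behind read-only property accessors) looks roughly like this trimmed sketch, not the actual class:

```python
class Queue(object):

    def __init__(self, path, client, persistent=False):
        # Internals are private; the trusted API surface is the
        # read-only properties below.
        self._path = path
        self._client = client
        self._persistent = persistent

    @property
    def path(self):
        """Path to the queue (read-only)."""
        return self._path

    @property
    def persistent(self):
        """True if queue items are persistent (read-only)."""
        return self._persistent
```

Assigning to q.path raises AttributeError, which gives exactly the "read-only property access if direct attribute access would be dangerous" behavior.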

>
>
> [2]
>
> + self, path, client, acl=[ZOO_OPEN_ACL_UNSAFE],
> persistent=False):
>
> Also a pretty general point which I'm making mostly to synchronize our
> thinking, rather than as a *required* change here:
>
> I tend to prefer using the style of acl=None in the initialization of
> default parameters, and then process it internally in the constructor
> (if acl is None, acl = ... or similar).
>
> While the result is obviously the same here, the main distinction is
> that it enables code using the function or class to say "I don't have
> anything special in this parameter.. just do your default.", which gets
> pretty tricky when the default value is in the keyword argument
> constructor itself.
>

i'm fine with that; to me it's a minor tradeoff against quick reading of the
function signature, but then again there's a whole class of bugs around
mutable default args. changed.

>
> [3]
>
> + return self._get(wait=True)
> +
> + get_wait = get
>
> Do we need all these alternatives? Part of the beauty of Twisted is
> that we don't really have to care about what "waiting" means.

right, originally i was trying to match the signature provided by the stdlib
queue implementation. i think this alias is an artifact of when i changed the
default behavior of get (it used to raise an Empty error). it's removed now.

>
> I suggest we have a single interface for getting, and it will always
> return a deferred which will fire once an item is available, no matter
> what. This will also simplify a bit the logic elsewhere (in _refill,
> _get_item, etc).
>

hmmm.. i like having the option of both apis and the symmetry with the
Queue.Queue api. alternatively perhaps a timeout option on the get api would
also suffice (also in the Queue.Queue api). the logic simplification from the
method removal is pretty minor, maybe four lines of code total in the methods
mentioned.

>
>
> [6]
>
> + Fetch the node data in the queue...


29. By Kapil Thangavelu

remove get_nowait api

30. By Kapil Thangavelu

use trap for no node error handler, remove unused import queue.empty

31. By Kapil Thangavelu

remove old get_nowait tests.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

[3] I went ahead and removed the get_nowait api as per our discussion. in the future the use case can be addressed with a timeout.

[9] no more scope name conflict on deferreds.

32. By Kapil Thangavelu

refactor distributed queue implementation

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

i refactored the implementation; it no longer uses any cached values, and is more efficient about establishing watches on the queue.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Cool, nice improvements indeed!

I'm going to restart the review again, since it's a brand new implementation. I'll start the sequence on 12 just to avoid conflicts.

[12]

+ def __init__(self, deferred, watcher):
+ self.deferred = deferred
+ self.watcher = watcher
+ self.processing = False
+ self.refetch = False

I'm having a slightly hard time reading the code because of the generality of the names here. E.g. "refetch" what? "watcher" for what? "processing" what? Why is "processing" only turned on inside _get_item?

[13]

+ def on_queue_items_changed(*args):
+ """Event watcher on queue node child events."""
+ if request.complete or not self._client.connected:
+ return
+
+ if request.processing:
+ request.refetch = True
+ else:
+ self._get(request)

I think I might be missing some detail about the implementation, perhaps due to [12].

Let's say that things happen in the following order:

1) User calls get()
2) get() calls _get()
3) _get() hooks _get_item on the result, and the watcher above on changes
4) Result is returned, and _get_item() is queued for being called
5) watcher from (2) fires, and calls _get() again
6) Repeat (3)
7) Repeat (4)

In other words, it looks like _get_item might end up being queued multiple times even before it started running the first time.

[14]

I find it a bit suspect that we're killing the item from the queue even before the user had a chance to know about its existence. This will make the queue somewhat prone to items being eaten by crashes.

[15]

+ # Refetching deferred until we process all the children from
+ # from a get children call.
+ if request.refetch:
+ request.processing = False
+ return self._get(request)

Again, going back to [12], I find the nomenclature a bit non-obvious to follow. Why is it not "processing" anymore on a refetch? Looks like there's a very tight relationship with the get children watcher in this logic, even though that's not being made explicit by names nor comments.

Also, I believe there's an issue here. Let's say things happen in this order:

1) children is [], refetch is False, request.processing is True
2) on_no_node_error is called on a NoNodeException
3) Due to the state, the exception is swallowed and nothing really changes
4) on_queue_items_changed is called
5) Given the state, refetch is made True
6) and... nothing else ever happens?

I think making names more explicit might help avoiding issues like these, since it'll be easier to reason about the logic there and know out of gut feeling when things aren't happening the way they should.

review: Needs Fixing
33. By Kapil Thangavelu

request.processing should be false anytime there are no children, not just on refetch.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On Tue, 08 Jun 2010 10:28:24 -0400, Gustavo Niemeyer
<email address hidden> wrote:

> Review: Needs Fixing
> Cool, nice improvements indeed!
>
> I'm going to restart the review again, since it's a brand new
> implementation. I'll start the sequence on 12 just to avoid conflicts.
>
> [12]
>
> + def __init__(self, deferred, watcher):
> + self.deferred = deferred
> + self.watcher = watcher
> + self.processing = False
> + self.refetch = False
>
> I'm having a slightly hard time reading the code because of the
> generality of the names here. E.g. "refetch" what? "watcher" for what?
> "processing" what? Why is "processing" only turned on inside _get_item?
>

Thanks for the review. Noted on names, i'll update them. fwiw, refetch ->
refetch_children, watcher -> child_watcher, processing -> processing_children

re processing inside of _get_item, i've moved it to _get

> [13]
>
> + def on_queue_items_changed(*args):
> + """Event watcher on queue node child events."""
> + if request.complete or not self._client.connected:
> + return
> +
> + if request.processing:
> + request.refetch = True
> + else:
> + self._get(request)
>
> I think I might be missing some detail about the implementation, perhaps
> due to [12].
>
> Let's say that things happen in the following order:
>
> 1) User calls get()
> 2) get() calls _get()
> 3) _get() hooks _get_item on the result, and the watcher above on changes
> 4) Result is returned, and _get_item() is queued for being called
> 5) watcher from (2) fires, and calls _get() again
> 6) Repeat (3)
> 7) Repeat (4)
>
> In other words, it looks like _get_item might end up being queued
> multiple times even before it started running the first time.

indeed, it's possible. i've updated the code to set processing = True in
_get before the client call.
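The resulting handshake (mark the request as processing before the asynchronous children fetch is issued, so a watch firing in the gap only schedules a refetch instead of starting a second concurrent fetch) can be simulated without ZooKeeper; FlagDemo and its method names are illustrative:

```python
class GetRequest(object):

    def __init__(self):
        self.processing_children = False
        self.refetch_children = False


class FlagDemo(object):
    """Toy model of the watch/processing handshake, no ZooKeeper."""

    def __init__(self):
        self.request = GetRequest()
        self.fetch_count = 0

    def _get(self):
        # The flag goes up *before* the (asynchronous) children fetch is
        # issued, closing the window in which a watch could start a second
        # concurrent fetch.
        self.request.processing_children = True
        self.fetch_count += 1

    def on_queue_items_changed(self):
        if self.request.processing_children:
            # Mid-processing: just note that a refetch is owed.
            self.request.refetch_children = True
        else:
            # Idle, parked on the watch: restart the get immediately.
            self._get()

    def finish_processing(self):
        self.request.processing_children = False
        if self.request.refetch_children:
            self.request.refetch_children = False
            self._get()
```

A watch that fires mid-processing raises the refetch flag exactly once; the pending refetch is then issued when the current batch of children has been consumed.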

> [14]
>
> I find a bit suspect that we're killing the item from the queue even
> before the user had a chance to know about its existence. This will
> make the queue somewhat prone to items being eaten by crashes.

As it is right now, the queue semantics enforce isolation and minimal
communication with the zookeeper server. I noted the requirement regarding
error handling for queue consumers in the module docstring. If we want
reliable message delivery/processing and want the queue to enforce that
semantic for us, then we'll likely need some aggregate/composite node
structures, cooperative semantics, or data introspection/modification on
queue item nodes, all of which will incur additional communication overhead
to enforce that semantic. I think those are effectively different data
structures; a queue isn't necessarily a message queue.

> [15]
>
>
> + # Refetching deferred until we process all the children from
> + # from a get children call.
> + if request.refetch:
> + request.processing = False
> + return self._get(request)
>
> Again, going back to [12], I find the nomenclature a bit non-obvious to
> follow. Why is it not "processing" anymore on a refetch? Looks like...


34. By Kapil Thangavelu

set request processing in _get, such that watches that fire before callbacks are processed correctly.

35. By Kapil Thangavelu

reliable queue implementation

36. By Kapil Thangavelu

serialized queue impl

37. By Kapil Thangavelu

refactor tests to abstract queue class, reset refetch flag when refetching children.

38. By Kapil Thangavelu

additional tests for serialized and reliable queues.

39. By Kapil Thangavelu

add test to validate that unexpected errors propagate to the get deferred.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

[12] I added some doc strings to the get request which document all the variables; if this is insufficient for clarity let me know, and i'll do a round of variable renames.

[14] Per our discussion on reliable processing/consumption of queue items, i've added the reliable queue and serialized queue implementations. A new serialized queue implementation utilizing a lock is something i'll address in a future branch (added bug:592266); the implementation as is suffers from a lot of pointless contention among consumers waking up from watches without any work they can proceed with, but nonetheless it's functional.

40. By Kapil Thangavelu

additional doc string on persistent v. transient reliable queues

41. By Kapil Thangavelu

rename GetRequest variables for clarity.

42. By Kapil Thangavelu

document each branch of the conditional within the queue child watch.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Thanks Kapil. Queue looks great. Some items about the new stuff:

[15]

+ and processed in the order they where placed in the queue. (TODO) An
+ implementation with less contention between consumers might instead utilize
+ a reliable queue with a lock.

Even without the lock, a better implementation would wait for the removal of the specific -processing file which is holding further action back, rather than any change in the queue. E.g. if the item itself is removed, but not the -processing control file, it will fire off, and fail again. Also (and most importantly), any new items added to the queue will also fire the watch and cause the consumer to refetch the full child list only to find out that it's still being processed by someone else, and it can do nothing about the new item appended at the back of the queue.

[16]

+class SerializedQueueTests(QueueTests):
+
+ queue_factory = SerializedQueue

Don't we need something here? :-)

[17]

Queue should probably use QueueItems too (without delete). That said, since it's just a reference implementation and we'll likely need the reliable version, feel free to ignore this.

[18]

It feels like there's potential for more reuse in the _get_item implementation, but I don't think we should wait this for merging. It's just a good refactoring for a boring moment.

review: Needs Fixing
Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Oh, I forgot to mention one thing:

[19]

For rich documentation of parameters, return values, etc, we've been using the epydoc format:

    http://epydoc.sourceforge.net/manual-fields.html

43. By Kapil Thangavelu

doc string cleanup

44. By Kapil Thangavelu

merge trunk

45. By Kapil Thangavelu

if lock acquisition fails, then all instance state associated with the attempt should be cleared, so subsequent acquire attempts can proceed.

46. By Kapil Thangavelu

serialized lock impl aggregating a reliable queue and distributed lock.

47. By Kapil Thangavelu

make queue prefix public, utilize reliable queue tests for serialized queue.

48. By Kapil Thangavelu

extra epydoc docstrings

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On Fri, 11 Jun 2010 10:57:30 -0400, Gustavo Niemeyer
<email address hidden> wrote:

> Review: Needs Fixing
> Thanks Kapil. Queue looks great. Some items about the new stuff:
>
> [15]
>
> + and processed in the order they where placed in the queue. (TODO) An
> + implementation with less contention between consumers might instead
> utilize
> + a reliable queue with a lock.
>
> Even without the lock, a better implementation would wait for the
> removal of the specific -processing file which is holding further action
> back, rather than any change in the queue. E.g. if the item itself is
> removed, but not the -processing control file, it will fire off, and
> fail again. Also (and most importantly), any new items added to the
> queue will also fire the watch and cause the consumer to refetch the
> full child list only to find out that it's still being processed by
> someone else, and it can do nothing about the new item appended at the
> back of the queue.

That would definitely be a better implementation than what was there
previously wrt contention. However it would effectively be reimplementing
parts of the lock logic (i.e. checking the exists return value on the
processing node is equivalent to checking exists on the previous lock
candidate node). I went ahead and reimplemented the serialized queue using a
subclass of reliable queue that utilizes an exclusive lock to serialize
access.

>
> [16]
>
> +class SerializedQueueTests(QueueTests):
> +
> + queue_factory = SerializedQueue
>
> Don't we need something here? :-)

yeah.. i'm not sure what, offhand. The default implementations of all the
queues are ordered. Perhaps multiple clients, one of which gets an item and
then sleeps or closes.

>
> [17]
>
> Queue should probably use QueueItems too (without delete). That said,
> since it's just a reference implementation and we'll likely need the
> reliable version, feel free to ignore this.
>

cool, i'm just going to leave it as is. I tried to be clear in the doc
string that it's mostly meant as an example implementation of the common
zookeeper recipe.

> [18]
>
> It feels like there's potential for more reuse in the _get_item
> implementation, but I don't think we should wait this for merging. It's
> just a good refactoring for a boring moment.
>

definitely, the closures are convenient, but make things a bit more
difficult for clean reuse. i took a stab at this yesterday, but shelved it
for now. Effectively we'd stuff the extra args for the callbacks and errbacks
instead of relying on the closure.

49. By Kapil Thangavelu

module docstring

50. By Kapil Thangavelu

serialized queue behavior test

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

The design of SerializedQueue looks pretty nice. Thank you!

A few additional (last?) comments below:

[20]

In SerializedQueue:

            if name.endswith(suffix):
                children[:] = []

Putting some debugging information in, I see that this logic is actually used, but I'm a bit confused regarding when it's necessary. The tests which reach this logic also make me a bit worried: test_staged_multiproducer_multiconsumer, which theoretically should never see items being processed, due to the lock.

[21]

Something that occurred to me while I was reading the code is that the line:

            request.processing_children = False

May still not get executed on certain exits from the logic in the item processing. Should we guarantee that it will necessarily be reset on any exit, so that we never get an unusable object (with an inconsistent state)?

With these details addressed, +1!

review: Approve
Revision history for this message
Kapil Thangavelu (hazmat) wrote :

> The design of SerializedQueue looks pretty nice. Thank you!
>
> A few additional (last?) comments below:
>
> [20]
>
> In SerializedQueue:
>
> if name.endswith(suffix):
> children[:] = []
>
> Putting some debugging information, I see that this logic is actually used,
> but I'm a bit confused regarding when it's necessary. The tests which reach
> this logic also make me a bit worried:
> test_staged_multiproducer_multiconsumer, which theoretically should be a case
> which should never see items being processed due to the lock.

the lock is only held when fetching an item, i.e. the lock guards access to the queue items. However the consumer might still be processing the item, and until it finishes, the processing node isn't released. it might be a better semantic (less contention) for serialization if the lock were held for the duration of the item processing. i'll make that change.

>
> [21]
>
> Something that occurred to me while I was reading the code is that the line:
>
> request.processing_children = False
>
> May still not get executed on certain exits from the logic in the item
> processing. Should we guarantee that it will necessarily be reset on any
> exit, so that we never get an unusable object (with an inconsistent state)?
>

is there a scenario where this would be the case? afaics all the failure mode exits are covered by error handlers that will set this flag to false.
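Gustavo's [21] asks for a reset-on-any-exit guarantee. In synchronous code that invariant maps onto try/finally (the Deferred-chain analogue would be addBoth); this is only a sketch of the invariant, and it deliberately does not capture the case where a consumer is parked on a watch with no callback pending, which is where the two approaches diverge:

```python
class Request(object):

    def __init__(self):
        self.processing_children = False


def with_processing_flag(request, work):
    """Run `work` with the flag raised, resetting it on any exit.

    try/finally guarantees the reset on both success and failure, the
    synchronous analogue of Deferred.addBoth.
    """
    request.processing_children = True
    try:
        return work()
    finally:
        request.processing_children = False
```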

51. By Kapil Thangavelu

serializable queue consumer holds lock till item is processed.

52. By Kapil Thangavelu

fix a race condition in reliable queue: if two consumers had fetched an item, and one of the consumers finished processing it before the other created a reservation, the slow consumer would return an item that was deleted. Now the cycle switches from get->reserve->return to exists->reserve->get->return.
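The race described in this revision can be replayed with an in-memory stand-in for the znode store; the paths, the "-processing" marker convention, and race_demo are illustrative, not the real ReliableQueue API:

```python
class FakeStore(object):
    """Toy in-memory stand-in for znodes: path -> data."""

    def __init__(self):
        self.nodes = {}

    def exists(self, path):
        return path in self.nodes

    def get(self, path):
        return self.nodes[path]

    def create(self, path, data=""):
        if path in self.nodes:
            raise KeyError(path)  # stand-in for NodeExistsException
        self.nodes[path] = data

    def delete(self, path):
        del self.nodes[path]


def race_demo():
    store = FakeStore()
    store.create("/q/e1", "payload")
    # Slow consumer, old cycle (get -> reserve -> return): reads the data.
    stale = store.get("/q/e1")
    # Fast consumer wins the race: reserves, processes, then deletes both
    # the item and its reservation marker.
    store.create("/q/e1-processing")
    store.delete("/q/e1")
    store.delete("/q/e1-processing")
    # Slow consumer's reservation now succeeds, and the payload of an
    # already-deleted item is handed to the caller.
    store.create("/q/e1-processing")
    old_result = stale
    # New cycle (exists -> reserve -> get -> return): the slow consumer
    # checks existence first and simply skips the vanished item.
    new_result = store.get("/q/e1") if store.exists("/q/e1") else None
    return old_result, new_result
```

The old ordering reads the item body before the reservation exists, so a fully processed item can still be returned; leading with the existence check makes the stale return impossible to reach.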

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On Thu, 08 Jul 2010 12:27:42 -0400, Kapil Thangavelu
<email address hidden> wrote:

>> The design of SerializedQueue looks pretty nice. Thank you!
>>
>> A few additional (last?) comments below:
>>
>> [20]
>>
>> In SerializedQueue:
>>
>> if name.endswith(suffix):
>> children[:] = []
>>
>> Putting some debugging information, I see that this logic is actually
>> used,
>> but I'm a bit confused regarding when it's necessary. The tests which
>> reach
>> this logic also make me a bit worried:
>> test_staged_multiproducer_multiconsumer, which theoretically should be
>> a case
>> which should never see items being processed due to the lock.
>
> the lock is only held when fetching an item. ie. the lock guards access
> to the queue items. However the consumer might still be processing the
> item, and until it does the processing node isn't released. it might be
> a better semantic (less contention) for serialization if the lock was
> held for the duration of the item processing. i'll make that change.
>

This is implemented now; the serialized queue works much more smoothly with
less contention.
I've removed the children[:] filtering logic that was in use previously.

>
>> [21]
>>
>> Something that occurred to me while I was reading the code is that the
>> line:
>>
>> request.processing_children = False
>>
>> May still not get executed on certain exits from the logic in the item
>> processing. Should we guarantee that it will necessarily be reset on
>> any
>> exit, so that we never get an unusable object (with an inconsistent
>> state)?
>>
>
> is there a scenario where this would be the case? afaics all the failure
> mode exists are covered by error handlers that will set this flag to
> false.
>
after our discussion on irc, i took a look at putting the
request.processing_children reset into an errback/callback handler, but it's
not clear that the same semantic would be achieved. One of the scenarios is
that a consumer is waiting on a producer to put items in the queue;
request.processing_children should be false here, but no callbacks would
have fired. i.e. a consumer attempts to fetch an item from a queue, there
are some items in the queue, and the consumer is processing children but
competing with other consumers; if the queue empties before an item is
fetched, the consumer should wait on the watch with processing_children =
False, and with a callback/errback on the get the flag wouldn't be set
appropriately.

cheers,

Kapil

53. By Kapil Thangavelu

propagate unexpected errors when retrieving a node

54. By Kapil Thangavelu

refactor serialized queue to avoid extraneous existence checks and processing nodes.

Preview Diff

=== modified file 'txzookeeper/lock.py'
--- txzookeeper/lock.py 2010-06-10 15:44:40 +0000
+++ txzookeeper/lock.py 2010-07-09 19:52:57 +0000
@@ -51,17 +51,22 @@
             "/".join((self.path, self.prefix)),
             flags=zookeeper.EPHEMERAL|zookeeper.SEQUENCE)
 
-        def on_candidate_create(path):
-            self._candidate_path = path
-            return self._acquire()
-
-        d.addCallback(on_candidate_create)
-
+        d.addCallback(self._on_candidate_create)
+        d.addErrback(self._on_no_queue_error)
         return d
 
-    def _acquire(self):
+    def _on_candidate_create(self, path):
+        self._candidate_path = path
+        return self._acquire()
+
+    def _on_no_queue_error(self, failure):
+        self._candidate_path = None
+        return failure
+
+    def _acquire(self, *args):
         d = self._client.get_children(self.path)
         d.addCallback(self._check_candidate_nodes)
+        d.addErrback(self._on_no_queue_error)
         return d
 
     def _check_candidate_nodes(self, children):
=== added file 'txzookeeper/queue.py'
--- txzookeeper/queue.py 1970-01-01 00:00:00 +0000
+++ txzookeeper/queue.py 2010-07-09 19:52:57 +0000
@@ -0,0 +1,451 @@
1"""
2Several distributed multiprocess queue implementations.
3
4The C{Queue} implementation follows closely the apache zookeeper recipe, it
5provides no guarantees beyond isolation and concurrency of retrieval of items.
6
7The C{ReliableQueue} implementation, provides isolation, and concurrency, as
8well guarantees that if a consumer dies before processing an item, that item is
9made available to another consumer.
10
11The C{SerializedQueue} implementation provides for strict in order processing
12of items within a queue.
13"""
14
15import zookeeper
16
17from twisted.internet.defer import Deferred, fail
18from twisted.python.failure import Failure
19from txzookeeper.lock import Lock
20from txzookeeper.client import ZOO_OPEN_ACL_UNSAFE
21
22
23class Queue(object):
24 """
25 Implementation is based off the apache zookeeper Queue recipe.
26
27 There are some things to keep in mind when using this queue implementation.
28 Its primarily to enforce isolation and concurrent access, however it does
29 not provide for reliable consumption. An error condition in a queue
30 consumer must requeue the item, else its lost, as its removed from
31 zookeeper on retrieval in this implementation. This implementation more
32 closely mirrors the behavior and api of the pythonstandard library Queue,
33 or multiprocessing.Queue ableit with the caveat of only strings for queue
34 items.
35 """
36
37 prefix = "entry-"
38
39 def __init__(self, path, client, acl=None, persistent=False):
40 """
41 @param client: A connected C{ZookeeperClient} instance.
42 @param path: The path to the queue inthe zookeeper hierarchy.
43 @param acl: An acl to be used for queue items.
44 @param persistent: Boolean flag which denotes if items in the queue are
45 persistent.
46 """
47 self._path = path
48 self._client = client
49 self._persistent = persistent
50 if acl is None:
51 acl = [ZOO_OPEN_ACL_UNSAFE]
52 self._acl = acl
53
54 @property
55 def path(self):
56 """Path to the queue."""
57 return self._path
58
59 @property
60 def persistent(self):
61 """If the queue is persistent returns True."""
62 return self._persistent
63
64 def get(self):
65 """
66 Get and remove an item from the queue. If no item is available
67 at the moment, a deferred is return that will fire when an item
68 is available.
69 """
70
71 def on_queue_items_changed(*args):
72 """Event watcher on queue node child events."""
73 if request.complete or not self._client.connected:
74 return # pragma: no cover
75
76 if request.processing_children:
77 # If deferred stack is currently processing a set of children
78 # defer refetching the children till its done.
79 request.refetch_children = True
80 else:
81 # Else the item get request is just waiting for a watch,
82 # restart the get.
83 self._get(request)
84
85 request = GetRequest(Deferred(), on_queue_items_changed)
86 self._get(request)
87 return request.deferred
88
89 def put(self, item):
90 """
91 Put an item into the queue.
92
93 @param item: String data to be put on the queue.
94 """
95 if not isinstance(item, str):
96 return fail(ValueError("queue items must be strings"))
97
98 flags = zookeeper.SEQUENCE
99 if not self._persistent:
100 flags = flags|zookeeper.EPHEMERAL
101
102 d = self._client.create(
103 "/".join((self._path, self.prefix)), item, self._acl, flags)
104 return d
105
106 def qsize(self):
107 """
108 Return the approximate size of the queue. This value is always
109 effectively a snapshot. Returns a deferred returning an integer.
110 """
111 d = self._client.exists(self._path)
112
113 def on_success(stat):
114 return stat["numChildren"]
115
116 d.addCallback(on_success)
117 return d
118
119 def _get(self, request):
120 request.processing_children = True
121 d = self._client.get_children(self._path, request.child_watcher)
122 d.addCallback(self._get_item, request)
123 return d
124
125 def _get_item(self, children, request):
126
127 def fetch_node(name):
128 path = "/".join((self._path, name))
129 d = self._client.get(path)
130 d.addCallback(on_get_node_success)
131 d.addErrback(on_no_node)
132 return d
133
134 def on_get_node_success((data, stat)):
135 d = self._client.delete("/".join((self._path, name)))
136 d.addCallback(on_delete_node_success, data)
137 d.addErrback(on_no_node)
138 return d
139
140 def on_delete_node_success(result_code, data):
141 request.processing_children = False
142 request.callback(data)
143
144 def on_no_node(failure=None):
145 if failure and not failure.check(zookeeper.NoNodeException):
146 request.errback(failure)
147 return
148 if children:
149 name = children.pop(0)
150 return fetch_node(name)
151
152 # Refetching is deferred until we process all the children
153 # from a get children call.
154 request.processing_children = False
155 if request.refetch_children:
156 request.refetch_children = False
157 return self._get(request)
158
159 if not children:
160 return on_no_node()
161
162 children.sort()
163 name = children.pop(0)
164 return fetch_node(name)
165
166
167class GetRequest(object):
168 """
169 An encapsulation of a consumer request to fetch an item from the queue.
170
171 @refetch_children - boolean field, when true signals that children should
172 be refetched after processing the current set of children.
173
174 @child_watcher - The queue child/item watcher.
175
176 @processing_children - Boolean flag, set to true when the last known
177 children of the queue are being processed. If a watch fires while the
178 children are being processed it sets the refetch_children flag to true
179 instead of getting the children immediately.
180
181 @deferred - The deferred representing retrieving an item from the queue.
182 """
183
184 def __init__(self, deferred, watcher):
185 self.deferred = deferred
186 self.child_watcher = watcher
187 self.processing_children = False
188 self.refetch_children = False
189
190 @property
191 def complete(self):
192 return self.deferred.called
193
194 def callback(self, data):
195 self.deferred.callback(data)
196
197 def errback(self, error):
198 self.deferred.errback(error)
199
200
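The interplay of @processing_children and @refetch_children above is the crux of the watch handling: a child watch that fires while a batch of children is still being worked through must only flag that a refetch is needed, not trigger a concurrent one. A minimal stand-in (plain Python, no Twisted or ZooKeeper; the class and method names here are illustrative, not part of txzookeeper) sketches that coalescing:

```python
class WatchCoalescer(object):
    """Illustrative stand-in for GetRequest's flag handling."""

    def __init__(self):
        self.processing_children = False
        self.refetch_children = False
        self.refetch_count = 0  # stands in for calls to Queue._get

    def start_processing(self):
        self.processing_children = True

    def on_child_watch(self):
        # A watch firing mid-processing only records that the children
        # changed; the refetch happens after the current batch is done.
        if self.processing_children:
            self.refetch_children = True
        else:
            self.refetch()

    def finish_processing(self):
        self.processing_children = False
        if self.refetch_children:
            self.refetch_children = False
            self.refetch()

    def refetch(self):
        self.refetch_count += 1


coalescer = WatchCoalescer()
coalescer.start_processing()
coalescer.on_child_watch()  # fires while busy: only sets the flag
coalescer.on_child_watch()  # a second watch coalesces into the same flag
coalescer.finish_processing()
print(coalescer.refetch_count)  # both watches collapse into one refetch
```

However many watches fire during one pass over the children, at most one extra child fetch is issued afterwards.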
201class QueueItem(object):
202 """
203 An encapsulation of a work item put into a queue. The work item data is
204 accessible via the data attribute. When the item has been processed by
205 the consumer, the delete method can be invoked to remove the item
206 permanently from the queue.
207
208 An optional processed callback may be passed to the constructor that
209 will be invoked after the node has been processed.
210 """
211
212 def __init__(self, path, data, client, processed_callback=None):
213 self._path = path
214 self._data = data
215 self._client = client
216 self._processed_callback = processed_callback
217
218 @property
219 def data(self):
220 return self._data
221
222 @property
223 def path(self):
224 return self._path
225
226 def delete(self):
227 """
228 Delete the item node and the item processing node in the queue.
229 Typically invoked by a queue consumer to signal successful processing
230 of the queue item.
231 """
232 d = self._client.delete(self.path)
233
234 if self._processed_callback:
235 d.addCallback(self._processed_callback, self.path)
236 return d
237
238
239class ReliableQueue(Queue):
240 """
241 A distributed queue. It varies from a C{Queue} in that it ensures any
242 item consumed from the queue is explicitly ack'd by the consumer.
243 If the consumer dies after retrieving an item before ack'ing the item.
244 The item will be made available to another consumer. To encapsulate the
245 acking behavior the queue item data is returned in a C{QueueItem} instance,
246 with a delete method that will remove it from the queue after processing.
247
248 Reliable queues may be persistent or transient. If the queue is durable,
249 than any item added to the queue must be processed in order to be removed.
250 If the queue is transient, then any jobs placed in the queue by a client
251 are removed when the client is closed, regardless of whether the job
252 has been processed or not.
253 """
254
255 def _item_processed_callback(self, result_code, item_path):
256 return self._client.delete(item_path+"-processing")
257
258 def _filter_children(self, children, suffix="-processing"):
259 """
260 Filter any children currently being processed, modified in place.
261 """
262 children.sort()
263 for name in list(children):
264 # remove any processing nodes and their associated queue item.
265 if name.endswith(suffix):
266 children.remove(name)
267 item_name = name[:-len(suffix)]
268 if item_name in children:
269 children.remove(item_name)
270
271 def _get_item(self, children, request):
272
273 def check_node(name):
274 """Check the node still exists."""
275 path = "/".join((self._path, name))
276 d = self._client.exists(path)
277 d.addCallback(on_node_exists, path)
278 d.addErrback(on_reservation_failed)
279 return d
280
281 def on_node_exists(stat, path):
282 """Reserve the node for consumer processing."""
283 d = self._client.create(path+"-processing",
284 flags=zookeeper.EPHEMERAL)
285 d.addCallback(on_reservation_success, path)
286 d.addErrback(on_reservation_failed)
287 return d
288
289 def on_reservation_success(processing_path, path):
290 """Fetch the node data to return"""
291 d = self._client.get(path)
292 d.addCallback(on_get_node_success, path)
293 d.addErrback(on_get_node_failed, path)
294 return d
295
296 def on_get_node_failed(failure, path):
297 """If we can't fetch the node, delete the processing node."""
298 d = self._client.delete(path+"-processing")
299
# propagate unexpected errors appropriately
301 if not failure.check(zookeeper.NoNodeException):
302 d.addCallback(lambda x: request.errback(failure))
303 else:
304 d.addCallback(on_reservation_failed)
305 return d
306
307 def on_get_node_success((data, stat), path):
308 """If we got the node, we're done."""
309 request.processing_children = False
310 request.callback(
311 QueueItem(
312 path, data, self._client, self._item_processed_callback))
313
314 def on_reservation_failed(failure=None):
315 """If we can't get the node or reserve, continue processing
316 the children."""
317 if failure and not failure.check(
318 zookeeper.NodeExistsException, zookeeper.NoNodeException):
319 request.processing_children = True
320 request.errback(failure)
321 return
322
323 if children:
324 name = children.pop(0)
325 return check_node(name)
326
327 # If a watch fired while processing children, process it
328 # after the children list is exhausted.
329 request.processing_children = False
330 if request.refetch_children:
331 request.refetch_children = False
332 return self._get(request)
333
334 self._filter_children(children)
335
336 if not children:
337 return on_reservation_failed()
338
339 name = children.pop(0)
340 return check_node(name)
341
342
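The reservation scheme above hinges on _filter_children: an item with a live `-processing` sibling node is invisible to other consumers. Stripped of the ZooKeeper plumbing, the filtering itself is pure list manipulation; a standalone sketch of the same logic (the function name mirrors the method above, but this is not the txzookeeper code path):

```python
def filter_children(children, suffix="-processing"):
    """Drop reservation nodes and the items they reserve, in place."""
    children.sort()
    for name in list(children):
        if name.endswith(suffix):
            # Remove the reservation marker itself...
            children.remove(name)
            item_name = name[:-len(suffix)]
            # ...and the queue item it reserves, if still listed.
            if item_name in children:
                children.remove(item_name)


children = ["entry-0002", "entry-0001", "entry-0001-processing"]
filter_children(children)
print(children)  # entry-0001 is reserved by another consumer
```

Because the `-processing` node is ephemeral, a consumer crash deletes the marker, and on the next child fetch the reserved item passes the filter again and becomes consumable.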
343class SerializedQueue(Queue):
344 """
345 A serialized queue ensures that, even with multiple consumers, items
346 are retrieved and processed in the order they were placed in the queue.
347
348 This implementation aggregates a reliable queue, with a lock to provide
349 for serialized consumer access. The lock is released only when a queue item
350 has been processed.
351 """
352
353 def __init__(self, path, client, acl=None, persistent=False):
354 super(SerializedQueue, self).__init__(path, client, acl, persistent)
355 self._lock = Lock("%s/%s"%(self.path, "_lock"), client)
356
357 def _item_processed_callback(self, result_code, item_path):
358 return self._lock.release()
359
360 def _filter_children(self, children, suffix="-processing"):
361 """
362 Filter the lock from consideration as an item to be processed.
363 """
364 children.sort()
365 for name in list(children):
366 if name.startswith('_'):
367 children.remove(name)
368
369 def _on_lock_directory_does_not_exist(self, failure):
370 """
371 If the lock directory does not exist, go ahead and create it and
372 attempt to acquire the lock.
373 """
374 failure.trap(zookeeper.NoNodeException)
375 d = self._client.create(self._lock.path)
376 d.addBoth(self._on_lock_created_or_exists)
377 return d
378
379 def _on_lock_created_or_exists(self, failure):
380 """
381 The lock node creation will either result in success or node exists
382 error, if a concurrent client created the node first. In either case
383 we proceed with attempting to acquire the lock.
384 """
385 if isinstance(failure, Failure):
386 failure.trap(zookeeper.NodeExistsException)
387 d = self._lock.acquire()
388 return d
389
390 def _on_lock_acquired(self, lock):
391 """
392 After the exclusive queue lock is acquired, we proceed with an attempt
393 to fetch an item from the queue.
394 """
395 d = super(SerializedQueue, self).get()
396 return d
397
398 def get(self):
399 """
400 Get and remove an item from the queue. If no item is available
401 at the moment, a deferred is returned that will fire when an item
402 is available.
403 """
404 d = self._lock.acquire()
405
406 d.addErrback(self._on_lock_directory_does_not_exist)
407 d.addCallback(self._on_lock_acquired)
408 return d
409
410 def _get_item(self, children, request):
411
412 def fetch_node(name):
413 path = "/".join((self._path, name))
414 d = self._client.get(path)
415 d.addCallback(on_node_retrieved, path)
416 d.addErrback(on_reservation_failed)
417 return d
418
419 def on_node_retrieved((data, stat), path):
420 request.processing_children = False
421 request.callback(
422 QueueItem(
423 path, data, self._client, self._item_processed_callback))
424
425 def on_reservation_failed(failure=None):
426 """If we can't get the node or reserve, continue processing
427 the children."""
428 if failure and not failure.check(
429 zookeeper.NodeExistsException, zookeeper.NoNodeException):
430 request.processing_children = True
431 request.errback(failure)
432 return
433
434 if children:
435 name = children.pop(0)
436 return fetch_node(name)
437
438 # If a watch fired while processing children, process it
439 # after the children list is exhausted.
440 request.processing_children = False
441 if request.refetch_children:
442 request.refetch_children = False
443 return self._get(request)
444
445 self._filter_children(children)
446
447 if not children:
448 return on_reservation_failed()
449
450 name = children.pop(0)
451 return fetch_node(name)
452
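Reviewer point [2] above, about acl=[ZOO_OPEN_ACL_UNSAFE] as a default argument, is already followed by SerializedQueue.__init__, which takes acl=None and lets the base class resolve it. A sketch of that convention in isolation (the ACL constant here is a hand-rolled stand-in for the one exported by the zookeeper bindings, and QueueSketch is illustrative, not the real Queue):

```python
# Stand-in for the constant exported by the zookeeper bindings.
ZOO_OPEN_ACL_UNSAFE = {"perms": 0x1f, "scheme": "world", "id": "anyone"}


class QueueSketch(object):
    """Illustrative constructor only; not the txzookeeper Queue."""

    def __init__(self, path, client, acl=None, persistent=False):
        # Resolving the default inside the body lets callers pass
        # acl=None to mean "use the default", which a mutable default
        # of acl=[ZOO_OPEN_ACL_UNSAFE] in the signature would not allow.
        if acl is None:
            acl = [ZOO_OPEN_ACL_UNSAFE]
        self._path = path
        self._client = client
        self._acl = acl
        self._persistent = persistent


queue = QueueSketch("/moon", client=None)
print(queue._acl == [ZOO_OPEN_ACL_UNSAFE])
```

This also sidesteps the classic mutable-default pitfall: a list literal in the signature would be shared across every instance that relied on the default.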
=== modified file 'txzookeeper/tests/__init__.py'
--- txzookeeper/tests/__init__.py 2010-06-10 15:46:02 +0000
+++ txzookeeper/tests/__init__.py 2010-07-09 19:52:57 +0000
@@ -4,10 +4,6 @@
4 from twisted.trial.unittest import TestCase
5 from mocker import MockerTestCase
6
7 #from txzookeeper.client import Wrapper
8
9 #zookeeper = Wrapper(zookeeper)
10
11 class ZookeeperTestCase(TestCase, MockerTestCase):
12
13     def setUp(self):
14
=== modified file 'txzookeeper/tests/test_client.py'
--- txzookeeper/tests/test_client.py 2010-06-01 14:26:45 +0000
+++ txzookeeper/tests/test_client.py 2010-07-09 19:52:57 +0000
@@ -342,6 +342,33 @@
342 d.addCallback(verify_exists)
343 return d
344
345 def test_exists_with_watcher_and_close(self):
346 """
347 Closing a connection with a watch outstanding behaves correctly.
348 """
349 d = self.client.connect()
350 zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
351
352 def node_watcher(event_type, state, path):
353 client = getattr(self, "client", None)
354 if client is not None and client.connected:
355 self.fail("Client should be disconnected")
356
357 def create_node(client):
358 return client.create("/syracuse")
359
360 def check_exists(path):
361 # shouldn't fire till unit test cleanup
362 return self.client.exists(path, node_watcher)
363
364 def verify_exists(result):
365 self.assertTrue(result)
366
367 d.addCallback(create_node)
368 d.addCallback(check_exists)
369 d.addCallback(verify_exists)
370 return d
371
372 def test_exists_with_nonexistant_watcher(self):
373 """
374 The exists method can also be used to set an optional watcher on a
375
=== modified file 'txzookeeper/tests/test_lock.py'
--- txzookeeper/tests/test_lock.py 2010-06-10 15:44:40 +0000
+++ txzookeeper/tests/test_lock.py 2010-07-09 19:52:57 +0000
@@ -1,4 +1,5 @@
1
2 from zookeeper import NoNodeException
3 from mocker import ANY
4 from twisted.internet.defer import (
5     inlineCallbacks, returnValue, Deferred, succeed)
@@ -81,6 +82,22 @@
82 yield self.failUnlessFailure(lock.acquire(), LockError)
83
84 @inlineCallbacks
85 def test_acquire_after_error(self):
86 """
87 Any instance state associated with a failed acquire should be cleared
88 on error, allowing subsequent attempts to succeed.
89 """
90 client = yield self.open_client()
91 path = "/lock-test-acquire-after-error"
92 lock = Lock(path, client)
93 d = lock.acquire()
94 self.failUnlessFailure(d, NoNodeException)
95 yield d
96 yield client.create(path)
97 yield lock.acquire()
98 self.assertEqual(lock.acquired, True)
99
100 @inlineCallbacks
101 def test_error_on_acquire_acquiring(self):
102 """
103 Attempting to acquire the lock while an attempt is already in progress,
104
=== added file 'txzookeeper/tests/test_queue.py'
--- txzookeeper/tests/test_queue.py 1970-01-01 00:00:00 +0000
+++ txzookeeper/tests/test_queue.py 2010-07-09 19:52:57 +0000
@@ -0,0 +1,392 @@
1
2from zookeeper import NoNodeException
3from twisted.internet.defer import (
4 inlineCallbacks, returnValue, DeferredList, Deferred, succeed, fail)
5
6from txzookeeper import ZookeeperClient
7from txzookeeper.client import NotConnectedException
8from txzookeeper.queue import Queue, ReliableQueue, SerializedQueue, QueueItem
9from txzookeeper.tests import ZookeeperTestCase, utils
10
11from mocker import ANY
12
13
14class QueueTests(ZookeeperTestCase):
15
16 queue_factory = Queue
17
18 def setUp(self):
19 super(QueueTests, self).setUp()
20 self.clients = []
21
22 def tearDown(self):
23 cleanup = False
24
25 for client in self.clients:
26 if not cleanup and client.connected:
27 utils.deleteTree(handle=client.handle)
28 cleanup = True
29 if client.connected:
30 client.close()
31 super(QueueTests, self).tearDown()
32
33 def compare_data(self, data, item):
34 if isinstance(item, QueueItem):
35 self.assertEqual(data, item.data)
36 else:
37 self.assertEqual(data, item)
38
39 def consume_item(self, item):
40 if isinstance(item, QueueItem):
41 return item.delete(), item.data
42 return None, item
43
44 @inlineCallbacks
45 def open_client(self, credentials=None):
46 """
47 Open a zookeeper client, optionally authenticating with the
48 credentials if given.
49 """
50 client = ZookeeperClient("127.0.0.1:2181")
51 self.clients.append(client)
52 yield client.connect()
53 if credentials:
54 d = client.add_auth("digest", credentials)
55 # hack to keep auth fast
56 yield client.exists("/")
57 yield d
58 returnValue(client)
59
60 def test_path_property(self):
61 """
62 The queue has a property that can be used to introspect its
63 path in a read-only manner.
64 """
65 q = self.queue_factory("/moon", None)
66 self.assertEqual(q.path, "/moon")
67
68 def test_persistent_property(self):
69 """
70 The queue has a property that can be used to introspect
71 whether or not the queue entries are persistent.
72 """
73 q = self.queue_factory("/moon", None, persistent=True)
74 self.assertEqual(q.persistent, True)
75
76 @inlineCallbacks
77 def test_put_item(self):
78 """
79 An item can be put on the queue, and is stored in a node in
80 the queue's directory.
81 """
82 client = yield self.open_client()
83 path = yield client.create("/queue-test")
84 queue = self.queue_factory(path, client)
85 item = "transform image bluemarble.jpg"
86 yield queue.put(item)
87 children = yield client.get_children(path)
88 self.assertEqual(len(children), 1)
89 data, stat = yield client.get("/".join((path, children[0])))
90 self.compare_data(data, item)
91
92 @inlineCallbacks
93 def test_qsize(self):
94 """
95 The client implements a method which returns an unreliable
96 approximation of the number of items in the queue (mirroring the api
97 of Queue.Queue); it's unreliable only in that the value represents
98 a snapshot of the queue size at the time it was requested,
99 not its current value.
100 """
101 client = yield self.open_client()
102 path = yield client.create("/test-qsize")
103 queue = self.queue_factory(path, client)
104
105 yield queue.put("abc")
106 size = yield queue.qsize()
107 self.assertEqual(size, 1)
108
109 yield queue.put("bcd")
110 size = yield queue.qsize()
111 self.assertEqual(size, 2)
112
113 yield queue.get()
114 size = yield queue.qsize()
115 self.assertEqual(size, 1)
116
117 @inlineCallbacks
118 def test_invalid_put_item(self):
119 """
120 The queue only accepts string items.
121 """
122 client = yield self.open_client()
123 queue = self.queue_factory("/unused", client)
124 yield self.failUnlessFailure(queue.put(123), ValueError)
125
126 @inlineCallbacks
127 def test_get_with_invalid_queue(self):
128 """
129 If the queue hasn't been created, an unknown node exception is raised
130 on get.
131 """
132 client = yield self.open_client()
133 queue = self.queue_factory("/unused", client)
134 yield self.failUnlessFailure(queue.put("abc"), NoNodeException)
135
136 @inlineCallbacks
137 def test_put_with_invalid_queue(self):
138 """
139 If the queue hasn't been created, an unknown node exception is raised
140 on put.
141 """
142 client = yield self.open_client()
143 queue = self.queue_factory("/unused", client)
144 yield self.failUnlessFailure(queue.put("abc"), NoNodeException)
145
146 @inlineCallbacks
147 def test_unexpected_error_during_item_retrieval(self):
148 """
149 If an unexpected error occurs when reserving an item, the error is
150 passed up to the get deferred's errback method.
151 """
152 test_client = yield self.open_client()
153 path = yield test_client.create("/reliable-queue-test")
154
155 # setup the test scenario
156 mock_client = self.mocker.patch(test_client)
157 mock_client.get_children(path, ANY)
158 self.mocker.result(succeed(["entry-000000"]))
159
160 item_path = "%s/%s"%(path, "entry-000000")
161 mock_client.get(item_path)
162 self.mocker.result(fail(SyntaxError("x")))
163 self.mocker.replay()
164
165 # odd behavior, this should return a failure, as above, but it returns
166 # None
167 d = self.queue_factory(path, mock_client).get()
168 assert d
169 self.failUnlessFailure(d, SyntaxError)
170 yield d
171
172 @inlineCallbacks
173 def test_get_and_put(self):
174 """
175 Get can also be used on empty queues and returns a deferred that fires
176 whenever an item has been retrieved from the queue.
177 """
178 client = yield self.open_client()
179 path = yield client.create("/queue-wait-test")
180 data = "zebra moon"
181 queue = self.queue_factory(path, client)
182 d = queue.get()
183
184 @inlineCallbacks
185 def push_item():
186 queue = self.queue_factory(path, client)
187 yield queue.put(data)
188
189 from twisted.internet import reactor
190 reactor.callLater(0.1, push_item)
191
192 item = yield d
193 self.compare_data(data, item)
194
195 @inlineCallbacks
196 def test_interleaved_multiple_consumers_wait(self):
197 """
198 Multiple consumers and a producer adding and removing items on the
199 queue concurrently.
200 """
201 test_client = yield self.open_client()
202 path = yield test_client.create("/multi-consumer-wait-test")
203 results = []
204
205 @inlineCallbacks
206 def producer(item_count):
207 from twisted.internet import reactor
208 client = yield self.open_client()
209 queue = self.queue_factory(path, client)
210
211 items = []
212 producer_done = Deferred()
213
214 def iteration(i):
215 if len(items) == (item_count-1):
216 return producer_done.callback(None)
217 items.append(i)
218 queue.put(str(i))
219
220 for i in range(item_count):
221 reactor.callLater(i*0.05, iteration, i)
222 yield producer_done
223 returnValue(items)
224
225 @inlineCallbacks
226 def consumer(item_count):
227 client = yield self.open_client()
228 queue = self.queue_factory(path, client)
229 for i in range(item_count):
230 try:
231 data = yield queue.get()
232 d, data = self.consume_item(data)
233 if d:
234 yield d
235 except NotConnectedException:
236 # when the test closes, we need to catch this
237 # as one of the producers will likely hang.
238 returnValue(len(results))
239 results.append((client.handle, data))
240
241 returnValue(len(results))
242
243 yield DeferredList(
244 [DeferredList([consumer(3), consumer(2)], fireOnOneCallback=1),
245 producer(6)])
246 # As soon as the producer and either consumer are complete, the test
247 # is done. Thus the only assertion we can make is that the result is
248 # at least the size of the smallest consumer.
249 self.assertTrue(len(results) >= 2)
250
251 @inlineCallbacks
252 def test_staged_multiproducer_multiconsumer(self):
253 """
254 A real world scenario test: a set of producers filling a queue with
255 items, and then a set of concurrent consumers pulling from the queue
256 till it's empty. The consumers use a non blocking get (the deferred
257 raises an exception on empty).
258 """
259 test_client = yield self.open_client()
260 path = yield test_client.create("/multi-prod-cons")
261
262 consume_results = []
263 produce_results = []
264
265 @inlineCallbacks
266 def producer(start, offset):
267 client = yield self.open_client()
268 q = self.queue_factory(path, client)
269 for i in range(start, start+offset):
270 yield q.put(str(i))
271 produce_results.append(str(i))
272
273 @inlineCallbacks
274 def consumer(max):
275 client = yield self.open_client()
276 q = self.queue_factory(path, client)
277 attempts = range(max)
278 for el in attempts:
279 value = yield q.get()
280 d, value = self.consume_item(value)
281 if d:
282 yield d
283 consume_results.append(value)
284 returnValue(True)
285
286 # two producers 20 items total
287 yield DeferredList(
288 [producer(0, 10), producer(10, 10)])
289
290 children = yield test_client.get_children(path)
291 self.assertEqual(len(children), 20)
292
293 yield DeferredList(
294 [consumer(8), consumer(8), consumer(4)])
295
296 err = set(produce_results)-set(consume_results)
297 self.assertFalse(err)
298
299 self.assertEqual(len(consume_results), len(produce_results))
300
301
302class ReliableQueueTests(QueueTests):
303
304 queue_factory = ReliableQueue
305
306 @inlineCallbacks
307 def test_unprocessed_item_reappears(self):
308 """
309 If a queue consumer exits before processing an item, then
310 the item will become visible to other queue consumers.
311 """
312 test_client = yield self.open_client()
313 path = yield test_client.create("/reliable-queue-test")
314
315 data = "rabbit stew"
316 queue = self.queue_factory(path, test_client)
317 yield queue.put(data)
318
319 test_client2 = yield self.open_client()
320 queue2 = self.queue_factory(path, test_client2)
321 item = yield queue2.get()
322 self.compare_data(data, item)
323
324 d = queue.get()
325 yield test_client2.close()
326
327 item = yield d
328 self.compare_data(data, item)
329
330 @inlineCallbacks
331 def test_processed_item_removed(self):
332 """
333 If a client processes an item, then that item is removed from the queue
334 permanently.
335 """
336 test_client = yield self.open_client()
337 path = yield test_client.create("/reliable-queue-test")
338
339 data = "rabbit stew"
340 queue = self.queue_factory(path, test_client)
341 yield queue.put(data)
342 item = yield queue.get()
343 self.compare_data(data, item)
344 yield item.delete()
345 yield test_client.close()
346
347 test_client2 = yield self.open_client()
348 children = yield test_client2.get_children(path)
349 children = [c for c in children if c.startswith(queue.prefix)]
350 self.assertFalse(bool(children))
351
352
353class SerializedQueueTests(ReliableQueueTests):
354
355 queue_factory = SerializedQueue
356
357 @inlineCallbacks
358 def test_serialized_behavior(self):
359 """
360 The serialized queue behavior is such that even with multiple
361 consumers, items are processed in order.
362 """
363 test_client = yield self.open_client()
364 path = yield test_client.create("/serialized-queue-test")
365
366 queue = self.queue_factory(path, test_client, persistent=True)
367
368 yield queue.put("a")
369 yield queue.put("b")
370
371 test_client2 = yield self.open_client()
372 queue2 = self.queue_factory(path, test_client2, persistent=True)
373
374 d = queue2.get()
375
376 def on_get_item_sleep_and_close(item):
377 """Close the connection after we have the item."""
378 from twisted.internet import reactor
379 reactor.callLater(0.1, test_client2.close)
380 return item
381
382 d.addCallback(on_get_item_sleep_and_close)
383
384 # fetch the item from queue2
385 item1 = yield d
386 # fetch the item from queue1, this will not get "b", because client2 is
387 # still processing "a". When client2 closes its connection, client1
388 # will get item "a"
389 item2 = yield queue.get()
390
391 self.compare_data("a", item2)
392 self.assertEqual(item1.data, item2.data)
393
=== modified file 'txzookeeper/todo.txt'
--- txzookeeper/todo.txt 2010-05-08 12:14:56 +0000
+++ txzookeeper/todo.txt 2010-07-09 19:52:57 +0000
@@ -2,10 +2,10 @@
2 bugs to file upstream
3
4  - you can set acl on a non-existent node.
5  - memory leak every api invocation. [really? need some measurements here]
6
7  observed while trying xtest_get_children_with_watcher
8
9  - async get children with watcher seems broken. [772 - fixed upstream]
10  - segfault if close during completion.
11  - getting a watch notification when closing a connection, segfaults.
