[1]
+ self.path = path
+ self.client = client
+ self.persistent = persistent

I tend to prefer keeping things private unless they are actually required by external clients or would obviously be important to know about. This makes it easy to tell which parts form the "trusted" API that clients are supposed to rely on, and it also makes changing the interface more comfortable (anything private can be removed, renamed, or replaced). How do you feel about this in general?

[2]
+ self, path, client, acl=[ZOO_OPEN_ACL_UNSAFE], persistent=False):

This is also a pretty general point, which I'm making mostly to synchronize our thinking rather than as a *required* change here: I tend to prefer declaring defaults as acl=None and resolving them inside the constructor (if acl is None: acl = ..., or similar). While the result is obviously the same here, the main distinction is that it lets code using the function or class say "I don't have anything special for this parameter, just use your default.", which gets pretty tricky when the default value lives in the keyword argument of the signature itself.

[3]
+ return self._get(wait=True)
+
+ get_wait = get

Do we need all these alternatives? Part of the beauty of Twisted is that we don't really have to care about what "waiting" means. I suggest we have a single interface for getting, which always returns a deferred that fires once an item is available, no matter what. This would also simplify the logic elsewhere a bit (in _refill, _get_item, etc.).

[4]
+ d = self.client.create(
+     "/".join((self.path, self.prefix)), item, self._acl, flags)
+ return d

Nice. It feels pretty cool to be able to wait on a put this way.

[5]
+ d = self.client.exists(self.path)
+
+ def on_success(stat):
+     return stat["numChildren"]

Oh, interesting trick! I would have imagined that getting the full list of children was required, but this is of course a lot better.

[6]
+ Fetch the node data in the queue directory for the given node name.
+ If wait is (...)
+ # tests. Instead we process our entire our node cache before
+ # proceeding.

A couple of details in the comments: "is ..." and "our entire our".

[7]
+ Refetch the queue children, setting a watch as needed, and invalidating
+ any previous children entries queue.

It would be nice to have a higher-level description of what the algorithm is actually doing: for example, what the children entries queue is for, what happens when it's empty or when two different consumers have partially overlapping queues, how items are consumed, etc.

[8]
In on_queue_items_changed():

+ self._cached_entries = []
+ d = self._refill(wait=wait)

Why is the cache being reset right after we're told that changes have happened? Shouldn't this happen only once we actually get the new list of children?

[9]
+ d = self._refill(wait=wait)
(...)
+ d = self.client.get_children(
+     self.path, on_queue_items_changed)

It'd be good to have more descriptive names for these variables, since one of them shadows the other. Note, for instance:

+ d.addCallback(notify_waiting)
+
+ d = self.client.get_children(

The "d" above isn't the "d" below, even though they share a scope and are so close together.

[10]
+ if isinstance(failure.value, zookeeper.NoNodeException):

Failure has a trap() method which is handy for these cases: failure.trap(zookeeper.NoNodeException) returns normally if the failure wraps that exception and re-raises the failure otherwise.

[11]
This is a gut feeling, but it feels a bit like there are too many entry points into the same caching logic. There are four places testing _cached_entries for emptiness, and at least four other places modifying it for various reasons, in different functions. It'd be really good for us if we could streamline and simplify this into a single place that pops items and a single place that refills the cache. If you don't feel like working on this, I can have a look at refactoring it to see if that's feasible, once these branches go in and we have a stable base again.
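For point [2], a minimal sketch of the acl=None idiom, assuming a stripped-down Queue constructor. ZOO_OPEN_ACL_UNSAFE is redefined locally as a stand-in for the real constant, and make_queue is a hypothetical wrapper; the attributes are kept private in the spirit of point [1].

```python
# Stand-in for the real ZOO_OPEN_ACL_UNSAFE constant (assumption for this sketch).
ZOO_OPEN_ACL_UNSAFE = {"perms": 0x1F, "scheme": "world", "id": "anyone"}


class Queue(object):
    """Simplified sketch: only the constructor is shown."""

    def __init__(self, path, client, acl=None, persistent=False):
        # None means "no preference"; the real default is resolved here,
        # so callers can pass None explicitly to get the default behaviour.
        if acl is None:
            acl = [ZOO_OPEN_ACL_UNSAFE]
        self._path = path
        self._client = client
        self._acl = acl
        self._persistent = persistent


def make_queue(path, client, acl=None):
    # Hypothetical wrapper: it forwards its own optional argument as-is,
    # without having to know what Queue's default actually is.
    return Queue(path, client, acl=acl)
```

With this, make_queue never duplicates Queue's default, and each instance gets a fresh list rather than sharing the single list object that a default like acl=[ZOO_OPEN_ACL_UNSAFE] creates once, at function definition time.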
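For points [8] and [11], a sketch of what a single pop path and a single refill path could look like. It is synchronous and deliberately leaves out deferreds and watches. ChildCache, fetch_children (standing in for client.get_children()) and claim (standing in for fetching and deleting a child, returning None if another consumer already took it, as with partially overlapping queues from [7]) are hypothetical names, not the branch's API.

```python
class ChildCache(object):
    """Cache of queue children with one refill path and one pop path."""

    def __init__(self, fetch_children):
        # fetch_children: hypothetical callable returning a list of child names.
        self._fetch_children = fetch_children
        self._cached_entries = []

    def _refill(self):
        # The only place that replaces the cache. It is swapped out once the
        # new list of children is in hand, never reset beforehand.
        children = self._fetch_children()
        # Assumption: children are sequential nodes, whose zero-padded
        # suffixes make lexicographic order match queue order.
        self._cached_entries = sorted(children)

    def _pop_entry(self):
        # The only place that pops; it refills on demand when empty.
        if not self._cached_entries:
            self._refill()
        if not self._cached_entries:
            return None  # the queue is empty right now
        return self._cached_entries.pop(0)

    def get(self, claim):
        # claim(name) is hypothetical: it returns the item data, or None if
        # another consumer already took that child.
        while True:
            name = self._pop_entry()
            if name is None:
                return None
            data = claim(name)
            if data is not None:
                return data
```

In a Twisted version, _refill would chain on the get_children() deferred, and get() could return a deferred that the watch callback fires when the cache is empty, which would give the single getting interface suggested in [3].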