Merge lp:~hazmat/txzookeeper/backoff-retry-managed-sanity into lp:~hazmat/txzookeeper/trunk

Proposed by Kapil Thangavelu
Status: Merged
Merge reported by: Kapil Thangavelu
Merged at revision: not available
Proposed branch: lp:~hazmat/txzookeeper/backoff-retry-managed-sanity
Merge into: lp:~hazmat/txzookeeper/trunk
Diff against target: 3336 lines (+2225/-241)
27 files modified
.bzrignore (+4/-0)
Makefile (+6/-0)
debian/changelog (+18/-0)
setup.py (+6/-3)
txzookeeper/__init__.py (+5/-2)
txzookeeper/client.py (+159/-79)
txzookeeper/lock.py (+6/-3)
txzookeeper/managed.py (+413/-0)
txzookeeper/node.py (+7/-26)
txzookeeper/queue.py (+5/-2)
txzookeeper/retry.py (+359/-0)
txzookeeper/tests/__init__.py (+57/-7)
txzookeeper/tests/common.py (+27/-2)
txzookeeper/tests/proxy.py (+97/-0)
txzookeeper/tests/test_client.py (+63/-77)
txzookeeper/tests/test_conn_failure.py (+194/-0)
txzookeeper/tests/test_lock.py (+4/-1)
txzookeeper/tests/test_managed.py (+309/-0)
txzookeeper/tests/test_node.py (+6/-3)
txzookeeper/tests/test_queue.py (+5/-2)
txzookeeper/tests/test_retry.py (+332/-0)
txzookeeper/tests/test_security.py (+4/-1)
txzookeeper/tests/test_session.py (+111/-20)
txzookeeper/tests/test_utils.py (+4/-1)
txzookeeper/tests/utils.py (+4/-1)
txzookeeper/todo.txt (+1/-9)
txzookeeper/utils.py (+19/-2)
To merge this branch: bzr merge lp:~hazmat/txzookeeper/backoff-retry-managed-sanity
Reviewer: Kapil Thangavelu (Status: Pending)
Review via email: mp+146938@code.launchpad.net

Description of the change

Session management and retry improvements.

Lots of improvements and several bug fixes.

Previously the managed client was returning the underlying session client
instead of the retry client, so retry wasn't enabled for users who caught or
stored the client returned from connect.
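
A minimal sketch of this fix, using hypothetical synchronous classes (FakeSessionClient, RetryWrapper) rather than txzookeeper's Twisted-based RetryClient: the wrapper's connect() returns the wrapper itself, so a caller that keeps the returned object still gets retries.

```python
class FakeSessionClient(object):
    """Hypothetical stand-in for the underlying session client."""

    def __init__(self):
        self.calls = 0

    def connect(self):
        return self

    def get(self, path):
        self.calls += 1
        if self.calls == 1:
            # Simulate a transient connection loss on the first call only.
            raise ConnectionError("connection loss")
        return ("data", path)


class RetryWrapper(object):
    """Hypothetical retry layer wrapping a session client."""

    def __init__(self, client, attempts=3):
        self._client = client
        self._attempts = attempts

    def connect(self):
        self._client.connect()
        # Return the wrapper, not self._client: callers that store the
        # result of connect() then keep the retry behavior.
        return self

    def get(self, path):
        error = None
        for _ in range(self._attempts):
            try:
                return self._client.get(path)
            except ConnectionError as e:
                error = e
        raise error


client = RetryWrapper(FakeSessionClient()).connect()
print(client.get("/config"))  # prints ('data', '/config') after one retry
```

Returning the underlying client from connect() would hand callers an object whose get() raises on the first connection loss.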

The retry code was using seconds for session timeout instead of milliseconds.
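
The unit mismatch is easy to see in a small sketch. The diff below corrects the session_timeout docstring to say milliseconds; the helper here (retry_deadline) is hypothetical and only shows the conversion a retry deadline needs.

```python
import time


def retry_deadline(session_timeout_ms, start=None):
    """Return the wall-clock time (seconds) after which retrying should stop.

    Hypothetical helper. ZooKeeper reports the negotiated session timeout
    in milliseconds, while time.time() is in seconds, so the timeout has to
    be converted before it is added to a timestamp.
    """
    if start is None:
        start = time.time()
    return start + session_timeout_ms / 1000.0


# A 10000 ms session timeout allows 10 seconds of retrying, not 10000.
print(retry_deadline(10000, start=100.0))  # prints 110.0
```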

The retry code wasn't properly accounting for the start time of the retry
when determining the maximum retry period.
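
A sketch of measuring the retry budget from the first attempt. retry_until and FakeClock are hypothetical names, not taken from retry.py; the clock and sleep are injected so the loop runs deterministically without waiting.

```python
def retry_until(func, max_delay, clock, sleep, interval=1.0):
    """Call func until it succeeds or max_delay seconds have passed.

    Hypothetical sketch. The start time is recorded once, before the first
    attempt; measuring from the most recent attempt instead would never
    reach max_delay and the loop could retry forever.
    """
    start = clock()
    while True:
        try:
            return func()
        except ConnectionError:
            if clock() - start >= max_delay:
                raise
            sleep(interval)


class FakeClock(object):
    """Deterministic clock so the loop can be exercised without waiting."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


attempts = []


def always_fails():
    attempts.append(1)
    raise ConnectionError("connection loss")


clock = FakeClock()
try:
    retry_until(always_fails, max_delay=5, clock=clock, sleep=clock.sleep)
except ConnectionError:
    pass
print(len(attempts), clock.now)  # prints 6 5.0
```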

On persistent errors, the retry code wasn't signaling the managed client to
establish a new session.
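
A sketch of why the retry layer has to signal the session owner. managed.py imports is_session_error from retry.py, but how the branch wires that together isn't shown here; SessionExpired, ManagedSketch and retry_with_session_recovery are hypothetical.

```python
class SessionExpired(Exception):
    """Hypothetical stand-in for zookeeper.SessionExpiredException."""


class ManagedSketch(object):
    """Hypothetical managed client that owns, and can replace, its session."""

    def __init__(self):
        self.session_id = 1
        self.expired = True  # pretend the current session has expired

    def new_session(self):
        # A real client would close the old handle and connect again.
        self.session_id += 1
        self.expired = False

    def get(self, path):
        if self.expired:
            raise SessionExpired(path)
        return (path, self.session_id)


def retry_with_session_recovery(managed, op, *args):
    """Run op; on session expiration, ask the managed client for a new
    session and retry once. Retrying against the expired session could
    never succeed, so the retry layer has to signal the session owner."""
    try:
        return op(*args)
    except SessionExpired:
        managed.new_session()
        return op(*args)


managed = ManagedSketch()
print(retry_with_session_recovery(managed, managed.get, "/a"))  # prints ('/a', 2)
```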

The client code wasn't properly cleaning up txzk handles on conn timeouts.
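
The diff below makes the connect timeout close any pending handle before erring back. A hypothetical sketch of that cleanup, with FakeZK standing in for the zookeeper binding's init/close (the real binding raises zookeeper.ZooKeeperException on an invalid handle, modeled here as ValueError):

```python
class FakeZK(object):
    """Hypothetical stand-in for the zookeeper binding's init/close."""

    def __init__(self):
        self.open_handles = set()
        self._last = 0

    def init(self):
        self._last += 1
        self.open_handles.add(self._last)
        return self._last

    def close(self, handle):
        if handle not in self.open_handles:
            raise ValueError("invalid handle %r" % handle)
        self.open_handles.remove(handle)


class ConnectionTimeout(Exception):
    pass


class ClientSketch(object):
    """Hypothetical client showing cleanup when a connect attempt times out."""

    def __init__(self, zk):
        self._zk = zk
        self.handle = None

    def connect(self):
        self.handle = self._zk.init()

    def on_connect_timeout(self):
        # Close the pending handle before reporting the timeout; otherwise
        # every timed-out attempt leaves a live handle reconnecting in the
        # background.
        if self.handle is not None:
            try:
                self._zk.close(self.handle)
            except ValueError:
                pass  # handle was already invalid
            self.handle = None
        raise ConnectionTimeout("could not connect before timeout")


zk = FakeZK()
client = ClientSketch(zk)
for _ in range(3):
    client.connect()
    try:
        client.on_connect_timeout()
    except ConnectionTimeout:
        pass
print(zk.open_handles, client.handle)  # prints set() None
```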

The session establishment now uses the client session events, so even without
activity or active watches the session is re-established.
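
In the diff, ZookeeperClient forwards session events to the callback registered with set_session_callback. A hypothetical sketch of recovering directly from such an event; the string states are placeholders for zookeeper.CONNECTED_STATE and zookeeper.EXPIRED_SESSION_STATE.

```python
CONNECTED = "connected"  # placeholder for zookeeper.CONNECTED_STATE
EXPIRED = "expired"      # placeholder for zookeeper.EXPIRED_SESSION_STATE


class SessionEventRecovery(object):
    """Hypothetical sketch: recover from expiration on the session event."""

    def __init__(self, reconnect):
        self._reconnect = reconnect

    def on_session_event(self, state):
        # React to the session event itself instead of waiting for the next
        # API call or watch to fail, so an idle client still recovers.
        if state == EXPIRED:
            self._reconnect()


reconnects = []
recovery = SessionEventRecovery(lambda: reconnects.append("new session"))
for state in (CONNECTED, EXPIRED, CONNECTED):
    recovery.on_session_event(state)
print(reconnects)  # prints ['new session']
```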

Session re-establishment now backs off on reconnects for up to 6 minutes.
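
The new managed.py defines BACKOFF_INCREMENT = 10 and MAX_BACKOFF = 360 (seconds, hence the 6 minute ceiling). This sketch assumes they are applied as a linear, capped backoff starting at zero; backoff_schedule is hypothetical and the branch's exact policy may differ.

```python
BACKOFF_INCREMENT = 10  # seconds, value from managed.py
MAX_BACKOFF = 360       # seconds (6 minutes), value from managed.py


def backoff_schedule(attempts, increment=BACKOFF_INCREMENT, cap=MAX_BACKOFF):
    """Delay before each reconnect attempt under an assumed linear backoff."""
    delays = []
    delay = 0
    for _ in range(attempts):
        delays.append(delay)
        # Grow linearly, but never wait longer than the cap.
        delay = min(delay + increment, cap)
    return delays


print(backoff_schedule(5))  # prints [0, 10, 20, 30, 40]
```

Capping the delay keeps a long outage from pushing reconnects arbitrarily far apart, while the growth spreads out reconnect attempts against a struggling cluster.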

https://codereview.appspot.com/7307051/

Kapil Thangavelu (hazmat) wrote :

Reviewers: mp+146938_code.launchpad.net,

Message:
Please take a look.

Description:
Session management and retry improvements.

Lots of improvements and several bug fixes.

Previously the managed client was returning the underlying session client
instead of the retry client, so retry wasn't enabled for users who caught or
stored the client returned from connect.

The retry code was using seconds for session timeout instead of
milliseconds.

The retry code wasn't properly accounting for the start time of the retry
when determining the maximum retry period.

On persistent errors, the retry code wasn't signaling the managed client to
establish a new session.

The client code wasn't properly cleaning up txzk handles on conn timeouts.

The session establishment now uses the client session events, so even
without activity or active watches the session is re-established.

Session re-establishment now backs off on reconnects for up to 6 minutes.

https://code.launchpad.net/~hazmat/txzookeeper/backoff-retry-managed-sanity/+merge/146938

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/7307051/

Affected files:
   A .bzrignore
   A [revision details]
   M debian/changelog
   M setup.py
   M txzookeeper/__init__.py
   M txzookeeper/client.py
   M txzookeeper/lock.py
   A txzookeeper/managed.py
   M txzookeeper/node.py
   M txzookeeper/queue.py
   A txzookeeper/retry.py
   M txzookeeper/tests/__init__.py
   M txzookeeper/tests/common.py
   A txzookeeper/tests/proxy.py
   M txzookeeper/tests/test_client.py
   A txzookeeper/tests/test_conn_failure.py
   M txzookeeper/tests/test_lock.py
   A txzookeeper/tests/test_managed.py
   M txzookeeper/tests/test_node.py
   M txzookeeper/tests/test_queue.py
   A txzookeeper/tests/test_retry.py
   M txzookeeper/tests/test_security.py
   M txzookeeper/tests/test_session.py
   M txzookeeper/tests/test_utils.py
   M txzookeeper/tests/utils.py
   M txzookeeper/todo.txt
   M txzookeeper/utils.py

Kapil Thangavelu (hazmat) wrote :
52. By Kapil Thangavelu

much better session recovery and retry handling

Kapil Thangavelu (hazmat) wrote :

For reasons unknown, the diff on this is very wrong; all the revisions other than 52 are already on trunk.

Benjamin Saller (bcsaller) wrote :

Hi,

Thanks for the branch. Initially I had an older version of pyzookeeper in my path and the tests were segfaulting, but after tracking that down I was able to get things working smoothly.

Coverage shows that the session backoff code isn't tested. If I'm reading it properly, self._backoff_seconds is never incremented in the tests, so some of the focus of this branch goes without regression testing.

It looks like retry_error_callback never triggers cb_retry_expired in the tests either.

Nice simplification in node.py.

This looks good but I would like to see a few more tests around the retry path.

53. By Kapil Thangavelu

app errors don't trigger reconnect

Kapil Thangavelu (hazmat) wrote :

The tests for backoff are in test_session, which operates a full zk cluster
and has a test case that verifies backoff has run. There's a new Makefile
with one target, make coverage, which runs through the full test suite. In
testing with juju environments, I discovered another minor issue with
retry.

thanks for the review.

-k

On Wed, Feb 6, 2013 at 7:15 PM, Benjamin Saller <email address hidden> wrote:

> Hi,
>
> Thanks for the branch. Initially I had an older version of pyzookeeper in
> my path and the tests were segfaulting, but after tracking that down I was
> able to get things working smoothly.
>
> Coverage shows that the session backoff code isn't tested, if I'm reading
> it properly self._backoff_seconds is never incremented in the tests and so
> some of the focus of this branch goes without regression testing.
>
> It looks like retry_error_callback never triggers cb_retry_expired in the
> tests either.
>
> Nice simplification in node.py.
>
> This looks good but I would like to see a few more tests around the retry
> path.
>

54. By Kapil Thangavelu

argh.. conditional fail

55. By Kapil Thangavelu

prevent forced reconnect herds

Mikael Uvebrandt (mikael-uvebrandt) wrote :

Would love to see this in a release soon - we've seen this reconnect hammering in production, and it's conceivable that it contributed to killing the zookeeper (3.3.3) cluster in one instance (open files limit hit).

56. By Kapil Thangavelu

incr rev

57. By Kapil Thangavelu

fix typo in reconnect test assert, cleanup herd logic

58. By Kapil Thangavelu

work around libzk bug to make session expiration tests more reliable

Kapil Thangavelu (hazmat) wrote :

Merged to trunk.

Preview Diff

1=== added file '.bzrignore'
2--- .bzrignore 1970-01-01 00:00:00 +0000
3+++ .bzrignore 2013-05-15 13:35:05 +0000
4@@ -0,0 +1,4 @@
5+.emacs.desktop
6+_trial_temp
7+txzookeeper
8+txzookeeper.egg-info
9
10=== added file 'Makefile'
11--- Makefile 1970-01-01 00:00:00 +0000
12+++ Makefile 2013-05-15 13:35:05 +0000
13@@ -0,0 +1,6 @@
14+COVERAGE_FILES=`find txzookeeper -name "*py" | grep -v "tests"`
15+
16+coverage:
17+ python -c "import coverage as c; c.main()" run /usr/bin/trial txzookeeper
18+ python -c "import coverage as c; c.main()" html -d htmlcov $(COVERAGE_FILES)
19+ gnome-open htmlcov/index.html
20\ No newline at end of file
21
22=== modified file 'debian/changelog'
23--- debian/changelog 2011-07-11 21:29:44 +0000
24+++ debian/changelog 2013-05-15 13:35:05 +0000
25@@ -1,3 +1,21 @@
26+txzookeeper (0.9.8-0ubuntu1) raring; urgency=low
27+
28+ * Better reconnect/retry behavior.
29+
30+ -- kapil <kapil@objectrealms.net> Mon, 6 May 2013 08:54:08 -0400
31+
32+txzookeeper (0.9.7-0ubuntu1) quantal; urgency=low
33+
34+ * Path included on exceptions, connection close bugfix
35+
36+ -- kapil <kapil@objectrealms.net> Wed, 19 Sep 2012 08:54:08 -0400
37+
38+txzookeeper (0.9.6-0ubuntu3) quantal; urgency=low
39+
40+ * managed client implementation with auto session expiration behavior
41+
42+ -- kapil <kapil@objectrealms.net> Fri, 22 Jun 2012 03:00:08 -0400
43+
44 txzookeeper (0.8.0-0ubuntu1) natty; urgency=low
45
46 * Much improved session event and connection error handling (and cluster tests)
47
48=== modified file 'setup.py'
49--- setup.py 2011-06-17 23:52:34 +0000
50+++ setup.py 2013-05-15 13:35:05 +0000
51@@ -3,6 +3,9 @@
52 #
53 # This file is part of txzookeeper.
54 #
55+# Authors:
56+# Kapil Thangavelu
57+#
58 # txzookeeper is free software: you can redistribute it and/or modify
59 # it under the terms of the GNU Lesser General Public License as published by
60 # the Free Software Foundation, either version 3 of the License, or
61@@ -42,8 +45,8 @@
62 name="txzookeeper",
63 version=version,
64 description="Twisted api for Apache Zookeeper",
65- author="Ensemble Developers",
66- author_email="ensemble@lists.ubuntu.com",
67+ author="Juju Developers",
68+ author_email="juju@lists.ubuntu.com",
69 url="https://launchpad.net/txzookeeper",
70 license="LGPL",
71 packages=find_packages(),
72@@ -55,6 +58,6 @@
73 "Intended Audience :: Information Technology",
74 "Programming Language :: Python",
75 "Topic :: Database",
76- "License :: OSI Approved :: MIT License",
77+ "License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)"
78 ],
79 )
80
81=== modified file 'txzookeeper/__init__.py'
82--- txzookeeper/__init__.py 2011-07-06 02:10:17 +0000
83+++ txzookeeper/__init__.py 2013-05-15 13:35:05 +0000
84@@ -1,8 +1,11 @@
85 #
86-# Copyright (C) 2010, 2011 Canonical Ltd. All Rights Reserved
87+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
88 #
89 # This file is part of txzookeeper.
90 #
91+# Authors:
92+# Kapil Thangavelu
93+#
94 # txzookeeper is free software: you can redistribute it and/or modify
95 # it under the terms of the GNU Lesser General Public License as published by
96 # the Free Software Foundation, either version 3 of the License, or
97@@ -22,4 +25,4 @@
98 __all__ = ["ZookeeperClient"]
99
100 # Remember to update debian/changelog as well, for the daily build.
101-version = "0.8.0"
102+version = "0.9.8"
103
104=== modified file 'txzookeeper/client.py'
105--- txzookeeper/client.py 2011-06-30 20:16:17 +0000
106+++ txzookeeper/client.py 2013-05-15 13:35:05 +0000
107@@ -1,8 +1,11 @@
108 #
109-# Copyright (C) 2010, 2011 Canonical Ltd. All Rights Reserved
110+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
111 #
112 # This file is part of txzookeeper.
113 #
114+# Authors:
115+# Kapil Thangavelu
116+#
117 # txzookeeper is free software: you can redistribute it and/or modify
118 # it under the terms of the GNU Lesser General Public License as published by
119 # the Free Software Foundation, either version 3 of the License, or
120@@ -33,6 +36,9 @@
121 "scheme": "world",
122 "id": "anyone"}
123
124+# Skip acls for policy objects @ a higher level
125+SKIP_ACLS = object()
126+
127 # Map result codes to exceptions classes.
128 ERROR_MAPPING = {
129 zookeeper.APIERROR: zookeeper.ApiErrorException,
130@@ -67,6 +73,7 @@
131 zookeeper.CONNECTED_STATE: "connected",
132 zookeeper.CONNECTING_STATE: "connecting",
133 zookeeper.EXPIRED_SESSION_STATE: "expired",
134+ None: "unknown",
135 }
136
137 # Mapping of event type to human string.
138@@ -76,7 +83,9 @@
139 zookeeper.CREATED_EVENT: "created",
140 zookeeper.DELETED_EVENT: "deleted",
141 zookeeper.CHANGED_EVENT: "changed",
142- zookeeper.CHILD_EVENT: "child"}
143+ zookeeper.CHILD_EVENT: "child",
144+ None: "unknown",
145+ }
146
147
148 class NotConnectedException(zookeeper.ZooKeeperException):
149@@ -99,9 +108,14 @@
150 def type_name(self):
151 return TYPE_NAME_MAPPING[self.args[1]]
152
153+ @property
154+ def handle(self):
155+ return self.args[3]
156+
157 def __str__(self):
158- return "<txzookeeper.ConnectionException type: %s state: %s>" % (
159- self.type_name, self.state_name)
160+ return (
161+ "<txzookeeper.ConnectionException handle: %s type: %s state: %s>"
162+ % (self.handle, self.type_name, self.state_name))
163
164
165 def is_connection_exception(e):
166@@ -122,7 +136,8 @@
167 """
168
169
170-class ClientEvent(namedtuple("ClientEvent", 'type, connection_state, path')):
171+class ClientEvent(namedtuple("ClientEvent",
172+ 'type, connection_state, path, handle')):
173 """
174 A client event is returned when a watch deferred fires. It denotes
175 some event on the zookeeper client that the watch was requested on.
176@@ -137,8 +152,8 @@
177 return STATE_NAME_MAPPING[self.connection_state]
178
179 def __repr__(self):
180- return "<ClientEvent %s at %r state: %s>" % (
181- self.type_name, self.path, self.state_name)
182+ return "<ClientEvent %s at %r state: %s handle:%s>" % (
183+ self.type_name, self.path, self.state_name, self.handle)
184
185
186 class ZookeeperClient(object):
187@@ -166,12 +181,21 @@
188 self.connected = False
189 self.handle = None
190
191+ def __repr__(self):
192+ if not self.client_id:
193+ session_id = ""
194+ else:
195+ session_id = self.client_id[0]
196+
197+ return "<%s session: %s handle: %r state: %s>" % (
198+ self.__class__.__name__, session_id, self.handle, self.state)
199+
200 def _check_connected(self, d):
201 if not self.connected:
202 d.errback(NotConnectedException("not connected"))
203 return d
204
205- def _check_result(self, result_code, deferred, extra_codes=()):
206+ def _check_result(self, result_code, deferred, extra_codes=(), path=None):
207 """Check an API call or result for errors.
208
209 :param result_code: The api result code.
210@@ -184,18 +208,18 @@
211 error = None
212 if not result_code == zookeeper.OK and not result_code in extra_codes:
213 error_msg = zookeeper.zerror(result_code)
214+ if path is not None:
215+ error_msg += " %s" % path
216 error_class = ERROR_MAPPING.get(
217 result_code, zookeeper.ZooKeeperException)
218 error = error_class(error_msg)
219
220 if is_connection_exception(error):
221- # Mark the client as disconnected.
222- self.connected = False
223 # Route connection errors to a connection level error
224 # handler if specified.
225 if self._connection_error_callback:
226 # The result of the connection error handler is returned
227- # to the api.
228+ # to the api invoker.
229 d = defer.maybeDeferred(
230 self._connection_error_callback,
231 self, error)
232@@ -212,14 +236,14 @@
233 return d
234
235 def _cb_get(result_code, value, stat):
236- if self._check_result(result_code, d):
237+ if self._check_result(result_code, d, path=path):
238 return
239 d.callback((value, stat))
240
241 callback = self._zk_thread_callback(_cb_get)
242- watcher = self._wrap_watcher(watcher)
243+ watcher = self._wrap_watcher(watcher, "get", path)
244 result = zookeeper.aget(self.handle, path, watcher, callback)
245- self._check_result(result, d)
246+ self._check_result(result, d, path=path)
247 return d
248
249 def _get_children(self, path, watcher):
250@@ -228,14 +252,14 @@
251 return d
252
253 def _cb_get_children(result_code, children):
254- if self._check_result(result_code, d):
255+ if self._check_result(result_code, d, path=path):
256 return
257 d.callback(children)
258
259 callback = self._zk_thread_callback(_cb_get_children)
260- watcher = self._wrap_watcher(watcher)
261+ watcher = self._wrap_watcher(watcher, "child", path)
262 result = zookeeper.aget_children(self.handle, path, watcher, callback)
263- self._check_result(result, d)
264+ self._check_result(result, d, path=path)
265 return d
266
267 def _exists(self, path, watcher):
268@@ -245,17 +269,17 @@
269
270 def _cb_exists(result_code, stat):
271 if self._check_result(
272- result_code, d, extra_codes=(zookeeper.NONODE,)):
273+ result_code, d, extra_codes=(zookeeper.NONODE,), path=path):
274 return
275 d.callback(stat)
276
277 callback = self._zk_thread_callback(_cb_exists)
278- watcher = self._wrap_watcher(watcher)
279+ watcher = self._wrap_watcher(watcher, "exists", path)
280 result = zookeeper.aexists(self.handle, path, watcher, callback)
281- self._check_result(result, d)
282+ self._check_result(result, d, path=path)
283 return d
284
285- def _wrap_watcher(self, watcher):
286+ def _wrap_watcher(self, watcher, watch_type, path):
287 if watcher is None:
288 return watcher
289 if not callable(watcher):
290@@ -273,18 +297,31 @@
291 if event_type == zookeeper.SESSION_EVENT:
292 if self._session_event_callback:
293 self._session_event_callback(
294- self, ClientEvent(event_type, conn_state, path))
295+ self, ClientEvent(
296+ event_type, conn_state, path, self.handle))
297+ # We do propagate to watch deferreds, in one case in
298+ # particular, namely if the session is expired, in which
299+ # case the watches are dead, and we send an appropriate
300+ # error.
301+ if conn_state == zookeeper.EXPIRED_SESSION_STATE:
302+ error = zookeeper.SessionExpiredException("Session expired")
303+ return watcher(None, None, None, error=error)
304 else:
305 return watcher(event_type, conn_state, path)
306
307- def _zk_thread_callback(self, func):
308+ def _zk_thread_callback(self, func, *f_args, **f_kw):
309 """
310 The client library invokes callbacks in a separate thread, we wrap
311 any user defined callback so that they are called back in the main
312 thread after, zookeeper calls the wrapper.
313 """
314+ f_args = list(f_args)
315+
316 def wrapper(handle, *args): # pragma: no cover
317- reactor.callFromThread(func, *args)
318+ # make a copy, the conn watch callback gets invoked multiple times
319+ cb_args = list(f_args)
320+ cb_args.extend(args)
321+ reactor.callFromThread(func, *cb_args, **f_kw)
322 return wrapper
323
324 @property
325@@ -298,7 +335,8 @@
326 @property
327 def session_timeout(self):
328 """
329- What's the negotiated session timeout for this connection, in seconds.
330+ The negotiated session timeout for this connection, in milliseconds.
331+
332 If the client is not connected the value is None.
333 """
334 if self.connected:
335@@ -326,7 +364,11 @@
336 """
337 if self.handle is None:
338 return None
339- return zookeeper.client_id(self.handle)
340+ try:
341+ return zookeeper.client_id(self.handle)
342+ # Invalid handle
343+ except zookeeper.ZooKeeperException:
344+ return None
345
346 @property
347 def unrecoverable(self):
348@@ -334,7 +376,11 @@
349 Boolean value representing whether the current connection can be
350 recovered.
351 """
352- return bool(zookeeper.is_unrecoverable(self.handle))
353+ try:
354+ return bool(zookeeper.is_unrecoverable(self.handle))
355+ except zookeeper.ZooKeeperException:
356+ # guard against invalid handles
357+ return True
358
359 def add_auth(self, scheme, identity):
360 """Adds an authentication identity to this connection.
361@@ -368,15 +414,21 @@
362 @param force: boolean, require the connection to be closed now or
363 an exception be raised.
364 """
365- if not self.connected:
366- return
367-
368- result = zookeeper.close(self.handle)
369 self.connected = False
370+
371+ if self.handle is None:
372+ return
373+
374+ try:
375+ result = zookeeper.close(self.handle)
376+ except zookeeper.ZooKeeperException:
377+ self.handle = None
378+ return
379+
380 d = defer.Deferred()
381-
382 if self._check_result(result, d):
383 return d
384+ self.handle = None
385 d.callback(True)
386 return d
387
388@@ -401,6 +453,12 @@
389
390 # Use a scheduled function to ensure a timeout.
391 def _check_timeout():
392+ # Close the handle
393+ try:
394+ if self.handle is not None:
395+ zookeeper.close(self.handle)
396+ except zookeeper.ZooKeeperException:
397+ pass
398 d.errback(
399 ConnectionTimeoutException("could not connect before timeout"))
400
401@@ -416,14 +474,13 @@
402 if servers is not None:
403 self._servers = servers
404
405- # Assemble client id if specified.
406+ # Use client id if specified.
407 if client_id:
408 self.handle = zookeeper.init(
409 self._servers, callback, self._session_timeout, client_id)
410 else:
411 self.handle = zookeeper.init(
412 self._servers, callback, self._session_timeout)
413-
414 return d
415
416 def _cb_connected(
417@@ -446,15 +503,20 @@
418 # If we timed out and then connected, then close the conn.
419 if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called:
420 self.close()
421+ self.handle = -1
422+ return
423
424 # Send session events to the callback, in addition to any
425 # duplicate session events that will be sent for extant watches.
426 if self._session_event_callback:
427 self._session_event_callback(
428- self, ClientEvent(type, state, path))
429+ self, ClientEvent(type, state, path, self.handle))
430
431 return
432- elif state == zookeeper.CONNECTED_STATE:
433+ # Connected successfully, or If we're expired on an initial
434+ # connect, someone else expired us.
435+ elif state in (zookeeper.CONNECTED_STATE,
436+ zookeeper.EXPIRED_SESSION_STATE):
437 connect_deferred.callback(self)
438 return
439
440@@ -470,21 +532,25 @@
441 @params acls: A list of dictionaries specifying permissions.
442 @params flags: Node creation flags (ephemeral, sequence, persistent)
443 """
444+ if acls == SKIP_ACLS:
445+ acls = [ZOO_OPEN_ACL_UNSAFE]
446+
447 d = defer.Deferred()
448 if self._check_connected(d):
449 return d
450
451- def _cb_created(result_code, path):
452- if self._check_result(result_code, d):
453- return
454- d.callback(path)
455-
456- callback = self._zk_thread_callback(_cb_created)
457+ callback = self._zk_thread_callback(
458+ self._cb_created, d, data, acls, flags)
459 result = zookeeper.acreate(
460 self.handle, path, data, acls, flags, callback)
461- self._check_result(result, d)
462+ self._check_result(result, d, path=path)
463 return d
464
465+ def _cb_created(self, d, data, acls, flags, result_code, path):
466+ if self._check_result(result_code, d, path=path):
467+ return
468+ d.callback(path)
469+
470 def delete(self, path, version=-1):
471 """
472 Delete the node at the given path. If the current node version on the
473@@ -496,19 +562,17 @@
474 @param version: the integer version of the node.
475 """
476 d = defer.Deferred()
477- if self._check_connected(d):
478- return d
479-
480- def _cb_delete(result_code):
481- if self._check_result(result_code, d):
482- return
483- d.callback(result_code)
484-
485- callback = self._zk_thread_callback(_cb_delete)
486+
487+ callback = self._zk_thread_callback(self._cb_deleted, d, path)
488 result = zookeeper.adelete(self.handle, path, version, callback)
489- self._check_result(result, d)
490+ self._check_result(result, d, path=path)
491 return d
492
493+ def _cb_deleted(self, d, path, result_code):
494+ if self._check_result(result_code, d, path=path):
495+ return
496+ d.callback(result_code)
497+
498 def exists(self, path):
499 """
500 Check that the given node path exists. Returns a deferred that
501@@ -531,8 +595,12 @@
502 """
503 d = defer.Deferred()
504
505- def watcher(*args):
506- d.callback(ClientEvent(*args))
507+ def watcher(event_type, conn_state, path, error=None):
508+ if error:
509+ d.errback(error)
510+ else:
511+ d.callback(ClientEvent(
512+ event_type, conn_state, path, self.handle))
513 return self._exists(path, watcher), d
514
515 def get(self, path):
516@@ -556,8 +624,12 @@
517 """
518 d = defer.Deferred()
519
520- def watcher(*args):
521- d.callback(ClientEvent(*args))
522+ def watcher(event_type, conn_state, path, error=None):
523+ if error:
524+ d.errback(error)
525+ else:
526+ d.callback(ClientEvent(
527+ event_type, conn_state, path, self.handle))
528 return self._get(path, watcher), d
529
530 def get_children(self, path):
531@@ -580,8 +652,12 @@
532 """
533 d = defer.Deferred()
534
535- def watcher(*args):
536- d.callback(ClientEvent(*args))
537+ def watcher(event_type, conn_state, path, error=None):
538+ if error:
539+ d.errback(error)
540+ else:
541+ d.callback(ClientEvent(
542+ event_type, conn_state, path, self.handle))
543 return self._get_children(path, watcher), d
544
545 def get_acl(self, path):
546@@ -598,13 +674,13 @@
547 return d
548
549 def _cb_get_acl(result_code, acls, stat):
550- if self._check_result(result_code, d):
551+ if self._check_result(result_code, d, path=path):
552 return
553 d.callback((acls, stat))
554
555 callback = self._zk_thread_callback(_cb_get_acl)
556 result = zookeeper.aget_acl(self.handle, path, callback)
557- self._check_result(result, d)
558+ self._check_result(result, d, path=path)
559 return d
560
561 def set_acl(self, path, acls, version=-1):
562@@ -634,17 +710,17 @@
563 if self._check_connected(d):
564 return d
565
566- def _cb_set_acl(result_code):
567- if self._check_result(result_code, d):
568- return
569- d.callback(result_code)
570-
571- callback = self._zk_thread_callback(_cb_set_acl)
572+ callback = self._zk_thread_callback(self._cb_set_acl, d, path, acls)
573 result = zookeeper.aset_acl(
574 self.handle, path, version, acls, callback)
575- self._check_result(result, d)
576+ self._check_result(result, d, path=path)
577 return d
578
579+ def _cb_set_acl(self, d, path, acls, result_code):
580+ if self._check_result(result_code, d, path=path):
581+ return
582+ d.callback(result_code)
583+
584 def set(self, path, data="", version=-1):
585 """
586 Sets the data of a node at the given path. If the current node version
587@@ -660,16 +736,16 @@
588 if self._check_connected(d):
589 return d
590
591- def _cb_set(result_code, node_stat):
592- if self._check_result(result_code, d):
593- return
594- d.callback(node_stat)
595-
596- callback = self._zk_thread_callback(_cb_set)
597+ callback = self._zk_thread_callback(self._cb_set, d, path, data)
598 result = zookeeper.aset(self.handle, path, data, version, callback)
599- self._check_result(result, d)
600+ self._check_result(result, d, path=path)
601 return d
602
603+ def _cb_set(self, d, path, data, result_code, node_stat):
604+ if self._check_result(result_code, d, path=path):
605+ return
606+ d.callback(node_stat)
607+
608 def set_connection_watcher(self, watcher):
609 """
610 Sets a permanent global watcher on the connection. This will get
611@@ -679,7 +755,7 @@
612 """
613 if not callable(watcher):
614 raise SyntaxError("Invalid Watcher %r" % (watcher))
615- watcher = self._wrap_watcher(watcher)
616+ watcher = self._wrap_watcher(watcher, None, None)
617 zookeeper.set_watcher(self.handle, watcher)
618
619 def set_session_callback(self, callback):
620@@ -717,9 +793,13 @@
621 """
622 if not callable(callback):
623 raise TypeError("Invalid callback %r" % callback)
624+ if self._connection_error_callback is not None:
625+ raise RuntimeError((
626+ "Connection error handlers can't be changed %s" %
627+ self._connection_error_callback))
628 self._connection_error_callback = callback
629
630- def set_determinstic_order(self, boolean):
631+ def set_deterministic_order(self, boolean):
632 """
633 The zookeeper client will by default randomize the server hosts
634 it will connect to unless this is set to True.
635@@ -738,11 +818,11 @@
636 return d
637
638 def _cb_sync(result_code, path):
639- if self._check_result(result_code, d):
640+ if self._check_result(result_code, d, path=path):
641 return
642 d.callback(path)
643
644 callback = self._zk_thread_callback(_cb_sync)
645 result = zookeeper.async(self.handle, path, callback)
646- self._check_result(result, d)
647+ self._check_result(result, d, path=path)
648 return d
649
650=== modified file 'txzookeeper/lock.py'
651--- txzookeeper/lock.py 2011-06-17 23:52:34 +0000
652+++ txzookeeper/lock.py 2013-05-15 13:35:05 +0000
653@@ -1,8 +1,11 @@
654 #
655-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
656+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
657 #
658 # This file is part of txzookeeper.
659 #
660+# Authors:
661+# Kapil Thangavelu
662+#
663 # txzookeeper is free software: you can redistribute it and/or modify
664 # it under the terms of the GNU Lesser General Public License as published by
665 # the Free Software Foundation, either version 3 of the License, or
666@@ -18,7 +21,7 @@
667 #
668
669 import zookeeper
670-from twisted.internet.defer import Deferred, fail
671+from twisted.internet.defer import fail
672
673
674 class LockError(Exception):
675@@ -133,7 +136,7 @@
676 """Release the lock."""
677
678 if not self._acquired:
679- error = LockError("Not holding lock %s"%(self.path))
680+ error = LockError("Not holding lock %s" % (self.path))
681 return fail(error)
682
683 d = self._client.delete(self._candidate_path)
684
685=== added file 'txzookeeper/managed.py'
686--- txzookeeper/managed.py 1970-01-01 00:00:00 +0000
687+++ txzookeeper/managed.py 2013-05-15 13:35:05 +0000
688@@ -0,0 +1,413 @@
689+
690+from functools import partial
691+
692+import contextlib
693+import logging
694+import time
695+import zookeeper
696+
697+from twisted.internet.defer import (
698+ inlineCallbacks, DeferredLock, fail, returnValue, Deferred)
699+
700+from client import (
701+ ZookeeperClient, ClientEvent, NotConnectedException,
702+ ConnectionTimeoutException)
703+from retry import RetryClient, is_session_error
704+from utils import sleep
705+
706+
707+class StopWatcher(Exception):
708+ pass
709+
710+BACKOFF_INCREMENT = 10
711+MAX_BACKOFF = 360
712+
713+WATCH_KIND_MAP = {
714+ "child": "get_children_and_watch",
715+ "exists": "exists_and_watch",
716+ "get": "get_and_watch"}
717+
718+
719+log = logging.getLogger("txzk.managed")
720+#log.setLevel(logging.INFO)
721+
722+
723+class Watch(object):
724+ """
725+ For application driven persistent watches, where the application
726+ is manually resetting the watch.
727+ """
728+
729+ __slots__ = ("_mgr", "_client", "_path", "_kind", "_callback")
730+
731+ def __init__(self, mgr, path, kind, callback):
732+ self._mgr = mgr
733+ self._path = path
734+ self._kind = kind
735+ self._callback = callback
736+
737+ @property
738+ def path(self):
739+ return self._path
740+
741+ @property
742+ def kind(self):
743+ return self._kind
744+
745+ @contextlib.contextmanager
746+ def _ctx(self):
747+ mgr = self._mgr
748+ del self._mgr
749+ try:
750+ yield mgr
751+ finally:
752+ mgr.remove(self)
753+
754+ @inlineCallbacks
755+ def reset(self):
756+ with self._ctx():
757+ yield self._callback(
758+ zookeeper.SESSION_EVENT,
759+ zookeeper.CONNECTED_STATE,
760+ self._path)
761+
762+ def __call__(self, *args, **kw):
763+ with self._ctx():
764+ return self._callback(*args, **kw)
765+
766+ def __str__(self):
767+ return "<Watcher %s %s %r>" % (self.kind, self.path, self._callback)
768+
769+
770+class WatchManager(object):
771+
772+ watch_class = Watch
773+
774+ def __init__(self):
775+ self._watches = []
776+
777+ def add(self, path, watch_type, watcher):
778+ w = self.watch_class(self, path, watch_type, watcher)
779+ self._watches.append(w)
780+ return w
781+
782+ def remove(self, w):
783+ try:
784+ self._watches.remove(w)
785+ except ValueError:
786+ pass
787+
788+ def iterkeys(self):
789+ for w in self._watches:
790+ yield (w.path, w.kind)
791+
792+ def clear(self):
793+ del self._watches
794+ self._watches = []
795+
796+ @inlineCallbacks
797+ def reset(self, *ignored):
798+ watches = self._watches
799+ self._watches = []
800+
801+ for w in watches:
802+ try:
803+ yield w.reset()
804+ except Exception, e:
805+ log.error("Error reseting watch %s with session event. %s %r",
806+ w, e, e)
807+ continue
808+
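`Watch` and `WatchManager` implement one-shot watch bookkeeping. A watch is dropped from the manager as soon as it fires. After a new session, `reset` replays every outstanding watch with a synthetic connected session event. Below is a minimal synchronous sketch of that pattern without Twisted. `OneShotWatch`, `WatchRegistry`, `reset_all` and the string event constants are hypothetical stand-ins, not txzookeeper APIs. The real `Watch` also drops its manager reference after firing, so a second invocation fails; the sketch omits that.

```python
# Hypothetical stand-ins for zookeeper.SESSION_EVENT / CONNECTED_STATE.
SESSION_EVENT = "session"
CONNECTED_STATE = "connected"


class OneShotWatch(object):
    """A registered watch that unregisters itself when it fires."""

    def __init__(self, registry, path, kind, callback):
        self.registry = registry
        self.path = path
        self.kind = kind
        self.callback = callback

    def __call__(self, event_type, conn_state, path):
        # A fired zookeeper watch is spent, so forget it before dispatching.
        self.registry.remove(self)
        return self.callback(event_type, conn_state, path)

    def reset(self):
        # The expired session will never deliver the real event, so send a
        # synthetic "connected" session event instead.
        return self(SESSION_EVENT, CONNECTED_STATE, self.path)


class WatchRegistry(object):
    def __init__(self):
        self.watches = []

    def add(self, path, kind, callback):
        w = OneShotWatch(self, path, kind, callback)
        self.watches.append(w)
        return w

    def remove(self, w):
        if w in self.watches:
            self.watches.remove(w)

    def reset_all(self):
        # Swap in a fresh list first, so callbacks that re-arm a watch
        # register into the new list rather than the one being iterated.
        pending, self.watches = self.watches, []
        for w in pending:
            try:
                w.reset()
            except Exception:
                # One failing callback must not block the remaining watches.
                continue
```

Swapping the list out before iterating mirrors `WatchManager.reset`. This swap is what lets a watch callback re-arm itself during the reset without being replayed twice.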
809+
810+class SessionClient(ZookeeperClient):
811+ """A managed client that automatically re-establishes ephemerals and
812+ triggers watches after reconnecting post session expiration.
813+
814+ This abstracts the client from session expiration handling. It does
815+ come at a cost though.
816+
817+ There are two application constraints that need to be considered for usage
818+ of the SessionClient or ManagedClient. The first is that watch callbacks
819+ which examine the event must be able to handle the synthetic session
820+ event which is sent to them when the session is re-established.
821+
822+ The second, and more problematic, is that algorithms/patterns
823+ utilizing ephemeral sequence nodes need to be rethought, as a
824+ session client that recreated those nodes on reconnect would do so at
825+ their previous paths. Some algorithms (like the standard zk lock recipe)
826+ rely on properties like the smallest valued ephemeral sequence node in
827+ a container to identify the lock holder, with the notion that upon
828+ session expiration a new lock/leader will be sought. Sequence
829+ ephemeral node recreation in this context is problematic, as the
830+ node would be recreated at the exact previous path. Alternative lock
831+ strategies that do work are fairly simple at low volume, such as
832+ owning a particular node path (e.g. /locks/lock-holder) with an
833+ ephemeral.
834+
835+ As a result, the session client only tracks and re-establishes
836+ non-sequence ephemeral nodes. For coordination around ephemeral sequence
837+ nodes, it lets callers watch for the establishment of new sessions via
838+ `subscribe_new_session`.
839+ """
840+
841+ def __init__(self, servers=None, session_timeout=None,
842+ connect_timeout=4000):
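843+ """Configure a session client. `session_timeout` is in milliseconds;
844+ `connect_timeout` is passed to `connect` on each reconnect attempt."""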
845+ super(SessionClient, self).__init__(servers, session_timeout)
846+ self._connect_timeout = connect_timeout
847+ self._watches = WatchManager()
848+ self._ephemerals = {}
849+ self._session_notifications = []
850+ self._reconnect_lock = DeferredLock()
851+ self.set_connection_error_callback(self._cb_connection_error)
852+ self.set_session_callback(self._cb_session_event)
853+ self._backoff_seconds = 0
854+ self._last_reconnect = time.time()
855+
856+ def subscribe_new_session(self):
857+ d = Deferred()
858+ self._session_notifications.append(d)
859+ return d
860+
861+ @inlineCallbacks
862+ def cb_restablish_session(self, e=None, forced=False):
863+ """Called on intercept of session expiration to create new session.
864+
865+ This will reconnect to zk, re-establish ephemerals, and
866+ trigger watches.
867+ """
868+ yield self._reconnect_lock.acquire()
869+ log.debug(
870+ "Connection reconnect, lock acquired handle:%d", self.handle)
871+
872+ try:
873+ # If it's been explicitly closed, don't re-establish.
874+ if self.handle is None:
875+ log.debug("No handle, client closed")
876+ return
877+
878+ # Don't allow a herd of forced reconnects within one session timeout.
879+ if forced and (
880+ (time.time() - self._last_reconnect)
881+ < self.session_timeout / 1000.0):
882+ forced = False
883+
884+ if not forced and not self.unrecoverable:
885+ log.debug("Client already connected, allowing retry")
886+ return
887+ elif self.connected or self.handle >= 0:
888+ self.close()
889+ self.handle = -1
890+
891+ # Re-establish
892+ yield self._cb_restablish_session().addErrback(
893+ self._cb_restablish_errback, e)
894+
895+ except Exception, e:
896+ log.error("error while re-establish %r %s" % (e, e))
897+ finally:
898+ log.debug("Reconnect lock released %s", self)
899+ yield self._reconnect_lock.release()
900+
901+ @inlineCallbacks
902+ def _cb_restablish_session(self):
903+ """Re-establish a new session, and recreate ephemerals and watches.
904+ """
905+ # Reconnect
906+ while 1:
907+ log.debug("Reconnect loop - connect timeout %d",
908+ self._connect_timeout)
909+
910+ # If we have some failures, back off
911+ if self._backoff_seconds:
912+ log.debug("Backing off reconnect %d" % self._backoff_seconds)
913+ yield sleep(self._backoff_seconds)
914+
915+ # The client was explicitly closed, abort reconnect.
916+ if self.handle is None:
917+ returnValue(self.handle)
918+ try:
919+ yield self.connect(timeout=self._connect_timeout)
920+ log.info("Restablished connection")
921+ self._last_reconnect = time.time()
922+ except ConnectionTimeoutException:
923+ log.debug("Timeout establishing connection, retrying...")
924+ except zookeeper.ZooKeeperException, e:
925+ log.exception("Error while connecting %r %s" % (e, e))
926+ except Exception, e:
927+ log.info("Reconnect unknown error, aborting: %s", e)
928+ raise
929+ else:
930+ break
931+
932+ if self._backoff_seconds < MAX_BACKOFF:
933+ self._backoff_seconds = min(
934+ self._backoff_seconds + BACKOFF_INCREMENT,
935+ MAX_BACKOFF - 1)
936+
937+ # Recreate ephemerals
938+ items = self._ephemerals.items()
939+ self._ephemerals = {}
940+
941+ for path, e in items:
942+ try:
943+ yield self.create(
944+ path, e['data'], acls=e['acls'], flags=e['flags'])
945+ except zookeeper.NodeExistsException:
946+ log.error("Attempt to create ephemeral node failed %r", path)
947+
948+ # Signal watches
949+ yield self._watches.reset()
950+
951+ # Notify new session observers
952+ notifications = self._session_notifications
953+ self._session_notifications = []
954+
955+ # all good, reset backoff
956+ self._backoff_seconds = 0
957+
958+ for n in notifications:
959+ n.callback(True)
960+
961+ def _cb_restablish_errback(self, err, failure=None):
962+ """If there's an error re-establishing the session, log it.
963+ """
964+ log.error("Error while trying to re-establish connection %s\n%s" % (
965+ err, failure))
966+ return failure
967+
968+ @inlineCallbacks
969+ def _cb_connection_error(self, client, error):
970+ """Convert session expiration to a transient connection error.
971+
972+ Dispatched from API usage errors.
973+ """
974+ if not is_session_error(error):
975+ raise error
976+ log.debug("Connection error detected, delaying retry...")
977+ yield sleep(1)
978+ raise zookeeper.ConnectionLossException
979+
980+ # Dispatched when retries exceed the session timeout maximum
981+ def cb_retry_expired(self, error):
982+ log.debug("Persistent retry error, reconnecting...")
983+ return self.cb_restablish_session(forced=True)
984+
985+ # Dispatch from connection events
986+ def _cb_session_event(self, client, event):
987+ if (event.type == zookeeper.SESSION_EVENT and
988+ event.connection_state == zookeeper.EXPIRED_SESSION_STATE):
989+ log.debug("Client session expired event, restablishing")
990+ self.cb_restablish_session()
991+
992+ # Connection check performed on client operations.
993+ def _check_connected(self, d):
994+ """Clients are automatically reconnected."""
995+ if self.connected:
996+ return
997+
998+ if self.handle is None:
999+ d.errback(NotConnectedException("Connection closed"))
1000+ return d
1001+
1002+ log.info("Detected dead connection, reconnecting...")
1003+ c_d = self.cb_restablish_session()
1004+
1005+ def after_connected(client):
1006+ """Return a transient connection failure.
1007+
1008+ The retry client will automatically attempt to retry the operation.
1009+ """
1010+ log.debug("Reconnected, returning transient error")
1011+ return fail(zookeeper.ConnectionLossException("Retry"))
1012+
1013+ c_d.addCallback(after_connected)
1014+ c_d.chainDeferred(d)
1015+ return d
1016+
1017+ # Dispatch from node watches on session expiration
1018+ def _watch_session_wrapper(self, watcher, event_type, conn_state, path):
1019+ """Watch wrapper that diverts session events to a connection callback.
1020+ """
1021+ if (event_type == zookeeper.SESSION_EVENT and
1022+ conn_state == zookeeper.EXPIRED_SESSION_STATE):
1023+ if self.unrecoverable:
1024+ log.debug("Watch got session expired event, reconnecting...")
1025+ d = self.cb_restablish_session()
1026+ d.addErrback(self._cb_restablish_errback)
1027+ return d
1028+
1029+ if event_type == zookeeper.SESSION_EVENT:
1030+ if self._session_event_callback:
1031+ self._session_event_callback(
1032+ self, ClientEvent(
1033+ event_type, conn_state, path, self.handle))
1034+ else:
1035+ return watcher(event_type, conn_state, path)
1036+
1037+ # Track all watches
1038+ def _wrap_watcher(self, watcher, watch_type, path):
1039+ if watcher is None:
1040+ return watcher
1041+ if not callable(watcher):
1042+ raise SyntaxError("invalid watcher")
1043+
1044+ # Handle the connection watcher separately.
1045+ if watch_type is None and path is None:
1046+ return self._zk_thread_callback(
1047+ self._watch_session_wrapper, watcher)
1048+
1049+ return self._zk_thread_callback(
1050+ partial(
1051+ self._watch_session_wrapper,
1052+ self._watches.add(path, watch_type, watcher)))
1053+
1054+ # Track ephemerals
1055+ def _cb_created(self, d, data, acls, flags, result_code, path):
1056+ if self._check_result(result_code, d, path=path):
1057+ return
1058+
1059+ if (flags & zookeeper.EPHEMERAL) and not (flags & zookeeper.SEQUENCE):
1060+ self._ephemerals[path] = dict(
1061+ data=data, acls=acls, flags=flags)
1062+
1063+ d.callback(path)
1064+
1065+ def _cb_deleted(self, d, path, result_code):
1066+ if self._check_result(result_code, d, path=path):
1067+ return
1068+
1069+ self._ephemerals.pop(path, None)
1070+ d.callback(result_code)
1071+
1072+ def _cb_set_acl(self, d, path, acls, result_code):
1073+ if self._check_result(result_code, d, path=path):
1074+ return
1075+
1076+ if path in self._ephemerals:
1077+ self._ephemerals[path]['acls'] = acls
1078+
1079+ d.callback(result_code)
1080+
1081+ def _cb_set(self, d, path, data, result_code, node_stat):
1082+ if self._check_result(result_code, d, path=path):
1083+ return
1084+
1085+ if path in self._ephemerals:
1086+ self._ephemerals[path]['data'] = data
1087+
1088+ d.callback(node_stat)
1089+
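The `_cb_created`, `_cb_deleted`, `_cb_set_acl` and `_cb_set` overrides keep a record of every non-sequence ephemeral node so it can be replayed into a new session. A minimal, dependency-free sketch of that bookkeeping follows. `EphemeralTracker` and its method names are hypothetical. The flag values 1 and 2 are chosen to match libzookeeper's `ZOO_EPHEMERAL` and `ZOO_SEQUENCE`.

```python
# Chosen to match libzookeeper's ZOO_EPHEMERAL (1) and ZOO_SEQUENCE (2).
EPHEMERAL = 1
SEQUENCE = 2


class EphemeralTracker(object):
    def __init__(self):
        self.nodes = {}

    def created(self, path, data, acls, flags):
        # Sequence ephemerals are skipped: recreating them at their old
        # path would break recipes that rely on fresh sequence numbers.
        if (flags & EPHEMERAL) and not (flags & SEQUENCE):
            self.nodes[path] = dict(data=data, acls=acls, flags=flags)

    def deleted(self, path):
        self.nodes.pop(path, None)

    def acl_set(self, path, acls):
        if path in self.nodes:
            self.nodes[path]["acls"] = acls

    def data_set(self, path, data):
        if path in self.nodes:
            self.nodes[path]["data"] = data

    def recreate(self, create):
        """Replay tracked nodes through `create` once a new session exists."""
        # Clear first; a successful create is expected to re-record the node.
        items, self.nodes = list(self.nodes.items()), {}
        for path, e in items:
            create(path, e["data"], e["acls"], e["flags"])
```

As in `_cb_restablish_session`, the table is emptied before replaying. Each successful create then re-registers its node through the normal creation path.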
1090+
1091+class _ManagedClient(RetryClient):
1092+
1093+ def subscribe_new_session(self):
1094+ return self.client.subscribe_new_session()
1095+
1096+
1097+def ManagedClient(servers=None, session_timeout=None, connect_timeout=10000):
1098+ client = SessionClient(servers, session_timeout, connect_timeout)
1099+ managed_client = _ManagedClient(client)
1100+ managed_client.set_retry_error_callback(client.cb_retry_expired)
1101+ return managed_client
1102
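`_cb_restablish_session` grows the delay between reconnect attempts linearly, by `BACKOFF_INCREMENT` per failed attempt, clamped below `MAX_BACKOFF`. It resets the delay to zero once the session is fully re-established. Both constants are defined elsewhere in `managed.py`, so the values below are hypothetical. `next_backoff` and `reconnect` are illustrative names, and `IOError` stands in for the transient errors the real loop tolerates. Unlike the client, which keeps `_backoff_seconds` on the instance across calls, this sketch starts each `reconnect` call at zero.

```python
# Hypothetical values; the real constants live elsewhere in managed.py.
BACKOFF_INCREMENT = 10
MAX_BACKOFF = 60


def next_backoff(current):
    """Linear backoff step applied after each failed reconnect attempt."""
    if current < MAX_BACKOFF:
        current = min(current + BACKOFF_INCREMENT, MAX_BACKOFF - 1)
    return current


def reconnect(connect, sleep):
    """Retry `connect` until it succeeds; return the number of attempts."""
    backoff = 0
    attempts = 0
    while True:
        if backoff:
            sleep(backoff)
        attempts += 1
        try:
            connect()
        except IOError:
            # Stand-in for connection timeouts and zookeeper errors.
            backoff = next_backoff(backoff)
        else:
            return attempts
```

Because of the `min(..., MAX_BACKOFF - 1)` clamp, the delay actually reached is `MAX_BACKOFF - 1` rather than `MAX_BACKOFF`.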
1103=== modified file 'txzookeeper/node.py'
1104--- txzookeeper/node.py 2011-06-17 23:52:34 +0000
1105+++ txzookeeper/node.py 2013-05-15 13:35:05 +0000
1106@@ -1,8 +1,11 @@
1107 #
1108-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
1109+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
1110 #
1111 # This file is part of txzookeeper.
1112 #
1113+# Authors:
1114+# Kapil Thangavelu
1115+#
1116 # txzookeeper is free software: you can redistribute it and/or modify
1117 # it under the terms of the GNU Lesser General Public License as published by
1118 # the Free Software Foundation, either version 3 of the License, or
1119@@ -127,16 +130,9 @@
1120 Returns a boolean based on the node's existence. Also returns a
1121 deferred that fires when the node is modified/created/added/deleted.
1122 """
1123- node_changed = Deferred()
1124-
1125- def on_node_event((event, state, path)):
1126- return node_changed.callback(
1127- NodeEvent(event, state, self))
1128-
1129 d, w = self._context.exists_and_watch(self.path)
1130- w.addCallback(on_node_event)
1131 d.addCallback(self._on_exists_success)
1132- return d, node_changed
1133+ return d, w
1134
1135 def _on_get_node_error(self, failure):
1136 failure.trap(NoNodeException)
1137@@ -162,17 +158,10 @@
1138 Retrieve the node's data and a deferred that fires when this data
1139 changes.
1140 """
1141- node_changed = Deferred()
1142-
1143- def on_node_change((event, status, path)):
1144- node_changed.callback(
1145- NodeEvent(event, status, self))
1146-
1147 d, w = self._context.get_and_watch(self.path)
1148- w.addCallback(on_node_change)
1149 d.addCallback(self._on_get_node_success)
1150 d.addErrback(self._on_get_node_error)
1151- return d, node_changed
1152+ return d, w
1153
1154 def set_data(self, data):
1155 """Set the node's data."""
1156@@ -233,17 +222,9 @@
1157 a deferred that fires if a child is added or deleted. Optionally
1158 a name prefix may be passed which the child node must abide.
1159 """
1160- children_changed = Deferred()
1161-
1162- def on_child_added_removed((event, status, path)):
1163- # path is the container not the child.
1164- children_changed.callback(
1165- NodeEvent(event, status, self))
1166-
1167 d, w = self._context.get_children_and_watch(self.path)
1168- w.addCallback(on_child_added_removed)
1169 d.addCallback(self._on_get_children_filter_results, prefix)
1170- return d, children_changed
1171+ return d, w
1172
1173 def __cmp__(self, other):
1174 return cmp(self.path, other.path)
1175
1176=== modified file 'txzookeeper/queue.py'
1177--- txzookeeper/queue.py 2011-06-30 17:14:06 +0000
1178+++ txzookeeper/queue.py 2013-05-15 13:35:05 +0000
1179@@ -1,8 +1,11 @@
1180 #
1181-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
1182+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
1183 #
1184 # This file is part of txzookeeper.
1185 #
1186+# Authors:
1187+# Kapil Thangavelu
1188+#
1189 # txzookeeper is free software: you can redistribute it and/or modify
1190 # it under the terms of the GNU Lesser General Public License as published by
1191 # the Free Software Foundation, either version 3 of the License, or
1192@@ -116,7 +119,7 @@
1193
1194 flags = zookeeper.SEQUENCE
1195 if not self._persistent:
1196- flags = flags|zookeeper.EPHEMERAL
1197+ flags = flags | zookeeper.EPHEMERAL
1198
1199 d = self._client.create(
1200 "/".join((self._path, self.prefix)), item, self._acl, flags)
1201
1202=== added file 'txzookeeper/retry.py'
1203--- txzookeeper/retry.py 1970-01-01 00:00:00 +0000
1204+++ txzookeeper/retry.py 2013-05-15 13:35:05 +0000
1205@@ -0,0 +1,359 @@
1206+#
1207+# Copyright (C) 2011 Canonical Ltd. All Rights Reserved
1208+#
1209+# This file is part of txzookeeper.
1210+#
1211+# Authors:
1212+# Kapil Thangavelu
1213+#
1214+# txzookeeper is free software: you can redistribute it and/or modify
1215+# it under the terms of the GNU Lesser General Public License as published by
1216+# the Free Software Foundation, either version 3 of the License, or
1217+# (at your option) any later version.
1218+#
1219+# txzookeeper is distributed in the hope that it will be useful,
1220+# but WITHOUT ANY WARRANTY; without even the implied warranty of
1221+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1222+# GNU Lesser General Public License for more details.
1223+#
1224+# You should have received a copy of the GNU Lesser General Public License
1225+# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
1226+#
1227+
1228+"""
1229+A retry client facade that transparently handles transient connection
1230+errors.
1231+"""
1232+
1233+import time
1234+import logging
1235+import zookeeper
1236+
1237+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
1238+
1239+from txzookeeper.client import NotConnectedException
1240+from txzookeeper.utils import sleep
1241+
1242+__all__ = ["retry", "RetryClient"]
1243+
1244+log = logging.getLogger("txzk.retry")
1245+
1246+
1247+class RetryException(Exception):
1248+ """Explicit retry exception.
1249+ """
1250+
1251+ # Percentage of the session timeout to wait before retrying.
1252+RETRY_FRACTION = 30
1253+
1254+
1255+def is_retryable(e):
1256+ """Determine if an exception signifies a recoverable connection error.
1257+ """
1258+ return isinstance(
1259+ e,
1260+ (zookeeper.ClosingException,
1261+ zookeeper.ConnectionLossException,
1262+ zookeeper.OperationTimeoutException,
1263+ RetryException))
1264+
1265+
1266+def is_session_error(e):
1267+ return isinstance(
1268+ e,
1269+ (zookeeper.SessionExpiredException,
1270+ zookeeper.ConnectionLossException,
1271+ zookeeper.ClosingException,
1272+ NotConnectedException))
1273+
1274+
1275+def _args(args):
1276+ return args and args[0] or "NA"
1277+
1278+
1279+def get_delay(session_timeout, max_delay=5, session_fraction=RETRY_FRACTION):
1280+ """Get retry delay between retrying an operation.
1281+
1282+ Returns either the specified fraction of a session timeout or the
1283+ max delay, whichever is smaller.
1284+
1285+ The goal is to allow the connection time to auto-heal, before
1286+ retrying an operation.
1287+
1288+ :param session_timeout: The timeout for the session, in milliseconds
1289+ :param max_delay: The max delay for a retry, in seconds.
1290+ :param session_fraction: The fractional amount of a timeout to wait
1291+ """
1292+ retry_delay = (session_timeout * (float(session_fraction) / 100)) / 1000
1293+ return min(retry_delay, max_delay)
1294+
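`get_delay` converts a session timeout in milliseconds into a retry delay in seconds. It returns `RETRY_FRACTION` percent of the timeout, capped at `max_delay`. The function below is copied from above so the arithmetic can be checked in isolation. A client without a session (`session_timeout` of `None`, passed as 0 by the callers) gets a zero delay.

```python
RETRY_FRACTION = 30


def get_delay(session_timeout, max_delay=5, session_fraction=RETRY_FRACTION):
    # Same arithmetic as retry.get_delay: milliseconds in, seconds out.
    retry_delay = (session_timeout * (float(session_fraction) / 100)) / 1000
    return min(retry_delay, max_delay)


# 4000 ms * 30% = 1200 ms, so wait about 1.2 s between attempts.
short = get_delay(4000)
# 10000 ms * 30% = 3000 ms, so wait about 3 s.
default = get_delay(10000)
# 20000 ms * 30% = 6 s, which exceeds max_delay and is capped at 5 s.
capped = get_delay(20000)
```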
1295+
1296+def check_error(e):
1297+ """Verify a zookeeper connection error, as opposed to an app error.
1298+ """
1299+ return is_retryable(e) or is_session_error(e)
1300+
1301+
1302+def check_retryable(retry_client, max_time, error):
1303+ """Check an error and a client to see if an operation is retryable.
1304+
1305+ :param retry_client: A txzookeeper client
1306+ :param max_time: The max time (epoch tick) that the op is retryable till.
1307+ :param error: The client operation exception.
1308+ """
1309+
1310+ t = time.time()
1311+
1312+ # Only if the error is known.
1313+ if not is_retryable(error):
1314+ return False
1315+
1316+ # Only if we haven't exceeded the max allotted time.
1317+ if max_time <= t:
1318+ return False
1319+
1320+ # Only if the client hasn't been explicitly closed.
1321+ if not retry_client.connected:
1322+ return False
1323+
1324+ # Only if the client is in a recoverable state.
1325+ if retry_client.unrecoverable:
1326+ return False
1327+
1328+ return True
1329+
1330+
1331+@inlineCallbacks
1332+def retry(client, func, *args, **kw):
1333+ """Invokes a function, retrying it on transient connection errors.
1334+
1335+ If the function execution results in an exception due to a transient
1336+ connection error, the retry wrapper will reinvoke the operation after
1337+ a suitable delay (fractional value of the session timeout).
1338+
1339+ :param client: A ZookeeperClient instance.
1340+ :param func: A callable python object that interacts with
1341+ zookeeper, the callable must utilize the same zookeeper
1342+ connection as passed in the `client` param. The function
1343+ must return a single value (either a deferred or result
1344+ value).
1345+ """
1346+ retry_started = [time.time()]
1347+ retry_error = False
1348+ while 1:
1349+ try:
1350+ value = yield func(*args, **kw)
1351+ except Exception, e:
1352+ # For clients which aren't connected (session timeout == None)
1353+ # we raise the errors to the callers.
1354+ session_timeout = client.session_timeout or 0
1355+
1356+ # The longest we keep retrying is 1.2 * session timeout
1357+ max_time = (session_timeout / 1000.0) * 1.2 + retry_started[0]
1358+
1359+ if not check_retryable(client, max_time, e):
1360+ # Check if it's a persistent client error, and if so use the cb
1361+ # if present to try and reconnect for client errors.
1362+ if (check_error(e)
1363+ and time.time() > max_time
1364+ and callable(client.cb_retry_error)
1365+ and not retry_error):
1366+ log.debug("Retry error %r on %s @ %s",
1367+ e, func.__name__, _args(args))
1368+ retry_error = True
1369+ yield client.cb_retry_error(e)
1370+ retry_started[0] = time.time()
1371+ continue
1372+ raise
1373+
1374+ # Give the connection a chance to auto-heal.
1375+ yield sleep(get_delay(session_timeout))
1376+ log.debug("Retry on %s @ %s", func.__name__, _args(args))
1377+ continue
1378+
1379+ returnValue(value)
1380+
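The control flow of `retry` works as follows. Transient errors are retried until 1.2 times the session timeout has elapsed since the retry started. Then the persistent-error callback is invoked once (the managed client uses it to force a new session), and the retry window restarts. Below is a synchronous sketch of that loop with an injectable clock and sleep, so it runs without Twisted or a server. `retry_call`, `TransientError` and `on_persistent_error` are hypothetical names. The sketch also omits the `connected`/`unrecoverable` checks that `check_retryable` performs.

```python
import time


class TransientError(Exception):
    """Stand-in for ConnectionLoss / OperationTimeout style errors."""


def retry_call(func, session_timeout_ms, on_persistent_error=None,
               clock=time.time, sleep=time.sleep, delay=0.0):
    """Call `func`, retrying transient errors within 1.2x the session timeout.

    Once the window is exhausted, `on_persistent_error` (if given) is
    invoked once and the window restarts; after that, the error propagates.
    """
    started = clock()
    escalated = False
    while True:
        try:
            return func()
        except TransientError:
            # session_timeout_ms is in milliseconds; clock() is in seconds.
            max_time = started + (session_timeout_ms / 1000.0) * 1.2
            if clock() < max_time:
                # Still inside the window: let the connection auto-heal.
                sleep(delay)
                continue
            if on_persistent_error is not None and not escalated:
                escalated = True
                on_persistent_error()
                started = clock()
                continue
            raise
```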
1381+
1382+def retry_watch(client, func, *args, **kw):
1383+ """Constructs a wrapper around a watch callable that retries invocations.
1384+
1385+ If the callable execution results in an exception due to a transient
1386+ connection error, the retry wrapper will reinvoke the operation after
1387+ a suitable delay (fractional value of the session timeout).
1388+
1389+ A watch function must return back a tuple of deferreds
1390+ (value_deferred, watch_deferred). No inline callbacks are
1391+ performed in here to ensure that callers continue to see a
1392+ tuple of results.
1393+
1394+ The client passed to this retry function must be the same as
1395+ the one utilized by the python callable.
1396+
1397+ :param client: A ZookeeperClient instance.
1398+ :param func: A python callable that interacts with zookeeper. If a
1399+ function is passed, a txzookeeper client must be the first
1400+ parameter of this function. The function must return a
1401+ tuple of (value_deferred, watch_deferred)
1402+ """
1403+ # For clients which aren't connected (session timeout == None)
1404+ # we raise the usage errors to the callers
1405+ session_timeout = client.session_timeout or 0
1406+
1407+ # If we keep retrying past 1.2 * session timeout without success,
1408+ # give up; the session expiry is fatal. session_timeout is in ms.
1409+ max_time = (session_timeout / 1000.0) * 1.2 + time.time()
1410+ value_d, watch_d = func(*args, **kw)
1411+
1412+ def retry_delay(f):
1413+ """Errback, verifies an op is retryable, and delays the next retry.
1414+ """
1415+ # Check that operation is retryable.
1416+ if not check_retryable(client, max_time, f.value):
1417+ return f
1418+
1419+ # Give the connection a chance to auto-heal
1420+ d = sleep(get_delay(session_timeout))
1421+ d.addCallback(retry_inner)
1422+
1423+ return d
1424+
1425+ def retry_inner(value):
1426+ """Retry operation invoker.
1427+ """
1428+ # Invoke the function
1429+ retry_value_d, retry_watch_d = func(*args, **kw)
1430+
1431+ # If we need to retry again.
1432+ retry_value_d.addErrback(retry_delay)
1433+
1434+ # Chain the new watch deferred to the old one, presuming the old one is dead
1435+ # if the value deferred errored on a connection error.
1436+ retry_watch_d.chainDeferred(watch_d)
1437+
1438+ # Insert back into the callback chain.
1439+ return retry_value_d
1440+
1441+ # Attach the retry
1442+ value_d.addErrback(retry_delay)
1443+
1444+ return value_d, watch_d
1445+
1446+
1447+def _passproperty(name):
1448+ """Returns a method wrapper that delegates to a client's property.
1449+ """
1450+ def wrapper(retry_client):
1451+ return getattr(retry_client.client, name)
1452+ return property(wrapper)
1453+
1454+
1455+class RetryClient(object):
1456+ """A ZookeeperClient wrapper that transparently performs retries.
1457+
1458+ A zookeeper connection can experience transient connection failures
1459+ on any operation. As long as the session associated with the connection
1460+ is still active on the zookeeper cluster, libzookeeper can reconnect
1461+ automatically to the cluster and session and the client is able to
1462+ retry.
1463+
1464+ Whether a given operation is safe for retry depends on the application
1465+ in question and how it's interacting with zookeeper.
1466+
1467+ In particular coordination around sequence nodes can be
1468+ problematic, as the client has no way of knowing if the operation
1469+ succeeded or not without additional application-specific context.
1470+
1471+ Idempotent operations against the zookeeper tree are generally
1472+ safe to retry.
1473+
1474+ This class provides a simple wrapper around a zookeeper client,
1475+ that will automatically perform retries on operations that
1476+ interact with the zookeeper tree, in the face of transient errors,
1477+ till the session timeout has been reached. All of the attributes
1478+ and methods of a zookeeper client are exposed.
1479+
1480+ All the methods of the client that interact with the zookeeper tree
1481+ are retry enabled.
1482+ """
1483+
1484+ def __init__(self, client):
1485+ self.client = client
1486+ self.client.cb_retry_error = None
1487+
1488+ def set_retry_error_callback(self, callback):
1489+ self.client.cb_retry_error = callback
1490+
1491+ def add_auth(self, *args, **kw):
1492+ return retry(self.client, self.client.add_auth, *args, **kw)
1493+
1494+ def create(self, *args, **kw):
1495+ return retry(self.client, self.client.create, *args, **kw)
1496+
1497+ def delete(self, *args, **kw):
1498+ return retry(self.client, self.client.delete, *args, **kw)
1499+
1500+ def exists(self, *args, **kw):
1501+ return retry(self.client, self.client.exists, *args, **kw)
1502+
1503+ def get(self, *args, **kw):
1504+ return retry(self.client, self.client.get, *args, **kw)
1505+
1506+ def get_acl(self, *args, **kw):
1507+ return retry(self.client, self.client.get_acl, *args, **kw)
1508+
1509+ def get_children(self, *args, **kw):
1510+ return retry(self.client, self.client.get_children, *args, **kw)
1511+
1512+ def set_acl(self, *args, **kw):
1513+ return retry(self.client, self.client.set_acl, *args, **kw)
1514+
1515+ def set(self, *args, **kw):
1516+ return retry(self.client, self.client.set, *args, **kw)
1517+
1518+ def sync(self, *args, **kw):
1519+ return retry(self.client, self.client.sync, *args, **kw)
1520+
1521+ # Watch retries
1522+
1523+ def exists_and_watch(self, *args, **kw):
1524+ return retry_watch(
1525+ self.client, self.client.exists_and_watch, *args, **kw)
1526+
1527+ def get_and_watch(self, *args, **kw):
1528+ return retry_watch(
1529+ self.client, self.client.get_and_watch, *args, **kw)
1530+
1531+ def get_children_and_watch(self, *args, **kw):
1532+ return retry_watch(
1533+ self.client, self.client.get_children_and_watch, *args, **kw)
1534+
1535+ # Passthrough methods
1536+
1537+ def set_connection_watcher(self, *args, **kw):
1538+ return self.client.set_connection_watcher(*args, **kw)
1539+
1540+ def set_connection_error_callback(self, *args, **kw):
1541+ return self.client.set_connection_error_callback(*args, **kw)
1542+
1543+ def set_session_callback(self, *args, **kw):
1544+ return self.client.set_session_callback(*args, **kw)
1545+
1546+ def set_determinstic_order(self, *args, **kw):
1547+ return self.client.set_determinstic_order(*args, **kw)
1548+
1549+ def close(self):
1550+ return self.client.close()
1551+
1552+ @inlineCallbacks
1553+ def connect(self, *args, **kw):
1554+ yield self.client.connect(*args, **kw)
1555+ returnValue(self)
1556+
1557+ # passthrough properties
1558+ state = _passproperty("state")
1559+ client_id = _passproperty("client_id")
1560+ session_timeout = _passproperty("session_timeout")
1561+ servers = _passproperty("servers")
1562+ handle = _passproperty("handle")
1563+ connected = _passproperty("connected")
1564+ unrecoverable = _passproperty("unrecoverable")
1565
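`RetryClient` spells out one method per retried operation and forwards read-only state through `_passproperty`. A more compact, hypothetical variant uses `__getattr__` with an allow-list of retried names. `RetryFacade` and `retry_fn` are illustrative names, and `retry_fn` is assumed to take the target callable followed by its arguments, like `retry` without the client argument.

```python
import functools


def _passproperty(name):
    """A read-only property that forwards to the wrapped client."""
    return property(lambda self: getattr(self.client, name))


class RetryFacade(object):
    """Route a fixed set of tree operations through `retry_fn`."""

    # Only operations listed here are retried; everything else passes through.
    retried = frozenset(["create", "delete", "exists", "get", "set"])

    def __init__(self, client, retry_fn):
        self.client = client
        self._retry = retry_fn

    def __getattr__(self, name):
        # Only called for names not found on the facade itself.
        target = getattr(self.client, name)
        if name in self.retried:
            return functools.partial(self._retry, target)
        return target

    connected = _passproperty("connected")
```

The explicit methods in `RetryClient` trade brevity for a retry surface that is visible at a glance. That visibility matters because, as its docstring notes, only idempotent operations are generally safe to retry.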
1566=== modified file 'txzookeeper/tests/__init__.py'
1567--- txzookeeper/tests/__init__.py 2011-06-30 18:05:14 +0000
1568+++ txzookeeper/tests/__init__.py 2013-05-15 13:35:05 +0000
1569@@ -1,8 +1,11 @@
1570 #
1571-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
1572+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
1573 #
1574 # This file is part of txzookeeper.
1575 #
1576+# Authors:
1577+# Kapil Thangavelu
1578+#
1579 # txzookeeper is free software: you can redistribute it and/or modify
1580 # it under the terms of the GNU Lesser General Public License as published by
1581 # the Free Software Foundation, either version 3 of the License, or
1582@@ -17,9 +20,12 @@
1583 # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
1584 #
1585
1586+import logging
1587+import StringIO
1588 import sys
1589
1590 import zookeeper
1591+from twisted.internet.defer import Deferred
1592 from twisted.trial.unittest import TestCase
1593
1594 from mocker import MockerTestCase
1595@@ -29,20 +35,64 @@
1596
1597 def setUp(self):
1598 super(ZookeeperTestCase, self).setUp()
1599- self.log_file_path = self.makeFile()
1600- self.log_file = open(self.log_file_path, 'w')
1601- #zookeeper.set_log_stream(self.log_file)
1602 zookeeper.set_debug_level(0)
1603
1604 def tearDown(self):
1605 super(ZookeeperTestCase, self).tearDown()
1606- #zookeeper.set_log_stream(sys.stderr) # reset to default
1607- #zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
1608- self.log_file.close()
1609
1610 def get_log(self):
1611 return open(self.log_file_path).read()
1612
1613+ def sleep(self, delay):
1614+ """Non-blocking sleep."""
1615+ from twisted.internet import reactor
1616+ deferred = Deferred()
1617+ reactor.callLater(delay, deferred.callback, None)
1618+ return deferred
1619+
1620+ _missing_attr = object()
1621+
1622+ def patch(self, object, attr, value):
1623+ """Replace an object's attribute, and restore original value later.
1624+
1625+ Returns the original value of the attribute if any or None.
1626+ """
1627+ original_value = getattr(object, attr, self._missing_attr)
1628+
1629+ @self.addCleanup
1630+ def restore_original():
1631+ if original_value is self._missing_attr:
1632+ try:
1633+ delattr(object, attr)
1634+ except AttributeError:
1635+ pass
1636+ else:
1637+ setattr(object, attr, original_value)
1638+ setattr(object, attr, value)
1639+ if original_value is self._missing_attr:
1640+ return None
1641+ return original_value
1642+
1643+ def capture_log(self, name="", level=logging.INFO,
1644+ log_file=None, formatter=None):
1645+ """Capture log channel to StringIO"""
1646+ if log_file is None:
1647+ log_file = StringIO.StringIO()
1648+ log_handler = logging.StreamHandler(log_file)
1649+ if formatter:
1650+ log_handler.setFormatter(formatter)
1651+ logger = logging.getLogger(name)
1652+ logger.addHandler(log_handler)
1653+ old_logger_level = logger.level
1654+ logger.setLevel(level)
1655+
1656+ @self.addCleanup
1657+ def reset_logging():
1658+ logger.removeHandler(log_handler)
1659+ logger.setLevel(old_logger_level)
1660+
1661+ return log_file
1662+
1663
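The new `patch` helper records an attribute's original value, or a sentinel when the attribute is absent. It then registers a cleanup that restores or deletes the attribute. Below is the same technique sketched on the standard library's `unittest.TestCase.addCleanup`. `PatchingTestCase` is a hypothetical name. Cleanups run after `tearDown`, including when the test fails.

```python
import unittest


class PatchingTestCase(unittest.TestCase):
    _missing = object()

    def patch(self, obj, attr, value):
        """Set obj.attr for this test; return the previous value or None."""
        original = getattr(obj, attr, self._missing)

        def restore():
            if original is self._missing:
                # The attribute did not exist before; remove it again.
                if hasattr(obj, attr):
                    delattr(obj, attr)
            else:
                setattr(obj, attr, original)

        self.addCleanup(restore)
        setattr(obj, attr, value)
        return None if original is self._missing else original
```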
1664 def egg_test_runner():
1665 """
1666
1667=== modified file 'txzookeeper/tests/common.py'
1668--- txzookeeper/tests/common.py 2011-06-18 15:35:03 +0000
1669+++ txzookeeper/tests/common.py 2013-05-15 13:35:05 +0000
1670@@ -1,3 +1,26 @@
1671+#
1672+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
1673+#
1674+# This file is part of txzookeeper.
1675+#
1676+# Authors:
1677+# Kapil Thangavelu
1678+#
1679+# txzookeeper is free software: you can redistribute it and/or modify
1680+# it under the terms of the GNU Lesser General Public License as published by
1681+# the Free Software Foundation, either version 3 of the License, or
1682+# (at your option) any later version.
1683+#
1684+# txzookeeper is distributed in the hope that it will be useful,
1685+# but WITHOUT ANY WARRANTY; without even the implied warranty of
1686+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1687+# GNU Lesser General Public License for more details.
1688+#
1689+# You should have received a copy of the GNU Lesser General Public License
1690+# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
1691+#
1692+
1693+
1694 import os
1695 import os.path
1696 import shutil
1697@@ -46,7 +69,7 @@
1698
1699 # various setup steps
1700 if not os.path.exists(self.working_path):
1701- os.mdir(self.working_path)
1702+ os.mkdir(self.working_path)
1703 if not os.path.exists(log_path):
1704 os.mkdir(log_path)
1705 if not os.path.exists(data_path):
1706@@ -54,10 +77,12 @@
1707
1708 with open(config_path, "w") as config:
1709 config.write("""
1710-tickTime=2000
1711+tickTime=1000
1712 dataDir=%s
1713 clientPort=%s
1714 maxClientCnxns=0
1715+maxSessionTimeout=5000
1716+minSessionTimeout=2000
1717 """ % (data_path, self.server_info.client_port))
1718
1719 # setup a replicated setup if peers are specified
1720
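The test server configuration now sets `tickTime=1000`, `minSessionTimeout=2000` and `maxSessionTimeout=5000`. A ZooKeeper server clamps the session timeout a client requests into `[minSessionTimeout, maxSessionTimeout]`. When those bounds are not configured, they default to 2x and 20x `tickTime`. Below is a small sketch of that negotiation rule; the function name is hypothetical. With this configuration, test sessions always negotiate to between 2 and 5 seconds, which presumably keeps session expiry and retry-window tests short.

```python
def negotiated_session_timeout(requested_ms, tick_time=1000,
                               min_timeout=None, max_timeout=None):
    """Clamp a requested session timeout (ms) the way the server does."""
    # Server defaults when minSessionTimeout / maxSessionTimeout are unset.
    if min_timeout is None:
        min_timeout = 2 * tick_time
    if max_timeout is None:
        max_timeout = 20 * tick_time
    return max(min_timeout, min(requested_ms, max_timeout))
```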
1721=== added file 'txzookeeper/tests/proxy.py'
1722--- txzookeeper/tests/proxy.py 1970-01-01 00:00:00 +0000
1723+++ txzookeeper/tests/proxy.py 2013-05-15 13:35:05 +0000
1724@@ -0,0 +1,97 @@
1725+#
1726+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
1727+#
1728+# This file is part of txzookeeper.
1729+#
1730+# Authors:
1731+# Kapil Thangavelu
1732+#
1733+# txzookeeper is free software: you can redistribute it and/or modify
1734+# it under the terms of the GNU Lesser General Public License as published by
1735+# the Free Software Foundation, either version 3 of the License, or
1736+# (at your option) any later version.
1737+#
1738+# txzookeeper is distributed in the hope that it will be useful,
1739+# but WITHOUT ANY WARRANTY; without even the implied warranty of
1740+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
1741+# GNU Lesser General Public License for more details.
1742+#
1743+# You should have received a copy of the GNU Lesser General Public License
1744+# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
1745+#
1746+
1747+from twisted.protocols import portforward
1748+
1749+
1750+class Blockable(object):
1751+
1752+ _blocked = None
1753+
1754+ def set_blocked(self, value):
1755+ value = bool(value)
1756+ self._blocked = value
1757+ if self.transport and not self._blocked:
1758+ self.transport.resumeProducing()
1759+
1760+
1761+class ProxyClient(portforward.ProxyClient, Blockable):
1762+
1763+ def dataReceived(self, data):
1764+ if self._blocked:
1765+ return
1766+ portforward.ProxyClient.dataReceived(self, data)
1767+
1768+ def setServer(self, server):
1769+ server.set_blocked(self._blocked)
1770+ super(ProxyClient, self).setServer(server)
1771+
1772+ def connectionMade(self):
1773+ self.peer.setPeer(self)
1774+ if not self._blocked:
1775+ # The server waits till the client is connected
1776+ self.peer.transport.resumeProducing()
1777+ else:
1778+ self.transport.pauseProducing()
1779+
1780+
1781+class ProxyClientFactory(portforward.ProxyClientFactory):
1782+
1783+ protocol = ProxyClient
1784+
1785+
1786+class ProxyServer(portforward.ProxyServer, Blockable):
1787+
1788+ clientProtocolFactory = ProxyClientFactory
1789+
1790+ def dataReceived(self, data):
1791+ if self._blocked:
1792+ return
1793+ portforward.ProxyServer.dataReceived(self, data)
1794+
1795+
1796+class ProxyFactory(portforward.ProxyFactory):
1797+
1798+ protocol = ProxyServer
1799+ instance = _blocked = False
1800+
1801+ def lose_connection(self):
1802+ """Terminate both ends of the proxy connection."""
1803+ if self.instance:
1804+ self.instance.transport.loseConnection()
1805+ if self.instance.peer:
1806+ self.instance.peer.transport.loseConnection()
1807+
1808+ def set_blocked(self, value):
1809+ self._blocked = bool(value)
1810+ if self.instance:
1811+ self.instance.set_blocked(self._blocked)
1812+ if self.instance.peer:
1813+ self.instance.peer.set_blocked(self._blocked)
1814+
1815+ def buildProtocol(self, addr):
1816+ # Track last protocol used, on reconnect any pauses are disabled.
1817+ self.instance = portforward.ProxyFactory.buildProtocol(self, addr)
1818+ # Propagate the value, else the client will aggressively try to
1819+ # reconnect.
1820+ self.instance.set_blocked(self._blocked)
1821+ return self.instance
1822
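While blocked, the `dataReceived` overrides in the `Blockable` proxy classes throw incoming bytes away. They do not queue them. This is how `test_watch_delivery_failure_resends` simulates a watch notification that the server sends and the client never sees. The sketch below strips that drop-while-blocked behaviour down to plain Python with no Twisted dependency. The `BlockablePipe` class and its `sink` callable are illustrative only and are not part of txzookeeper. The real proxy also calls `resumeProducing()` when it is unblocked.

```python
class BlockablePipe(object):
    """Forward chunks to a sink unless blocked; blocked chunks are dropped."""

    def __init__(self, sink):
        self.sink = sink          # callable that receives forwarded data
        self._blocked = False

    def set_blocked(self, value):
        self._blocked = bool(value)

    def data_received(self, data):
        # Mirrors the proxy's dataReceived: while blocked, discard the
        # data instead of buffering it, so it is lost for good.
        if self._blocked:
            return
        self.sink(data)


received = []
pipe = BlockablePipe(received.append)
pipe.data_received(b"watch-1")
pipe.set_blocked(True)
pipe.data_received(b"watch-2")   # blackholed, never forwarded
pipe.set_blocked(False)
pipe.data_received(b"watch-3")
print(received)  # [b'watch-1', b'watch-3']
```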
1823=== modified file 'txzookeeper/tests/test_client.py'
1824--- txzookeeper/tests/test_client.py 2011-06-30 20:16:17 +0000
1825+++ txzookeeper/tests/test_client.py 2013-05-15 13:35:05 +0000
1826@@ -1,8 +1,11 @@
1827 #
1828-# Copyright (C) 2010, 2011 Canonical Ltd. All Rights Reserved
1829+# Copyright (C) 2010-2011, 2011 Canonical Ltd. All Rights Reserved
1830 #
1831 # This file is part of txzookeeper.
1832 #
1833+# Authors:
1834+# Kapil Thangavelu
1835+#
1836 # txzookeeper is free software: you can redistribute it and/or modify
1837 # it under the terms of the GNU Lesser General Public License as published by
1838 # the Free Software Foundation, either version 3 of the License, or
1839@@ -21,13 +24,13 @@
1840 import base64
1841 import hashlib
1842
1843-from twisted.internet.defer import Deferred
1844+from twisted.internet.defer import Deferred, maybeDeferred
1845 from twisted.internet.base import DelayedCall
1846 from twisted.python.failure import Failure
1847
1848 import zookeeper
1849
1850-from mocker import ANY, MATCH
1851+from mocker import ANY, MATCH, ARGS
1852 from txzookeeper.tests import ZookeeperTestCase, utils
1853 from txzookeeper.client import (
1854 ZookeeperClient, ZOO_OPEN_ACL_UNSAFE, ConnectionTimeoutException,
1855@@ -119,18 +122,55 @@
1856 d.addCallback(check_connected)
1857 return d
1858
1859+ def test_close(self):
1860+ """
1861+ Test that the connection is closed, also for the first
1862+ connection when the zookeeper handle is 0.
1863+ """
1864+
1865+ def _fake_init(*_):
1866+ return 0
1867+
1868+ mock_init = self.mocker.replace("zookeeper.init")
1869+ mock_init(ARGS)
1870+ self.mocker.call(_fake_init)
1871+
1872+ def _fake_close(handle):
1873+ return zookeeper.OK
1874+
1875+ mock_close = self.mocker.replace("zookeeper.close")
1876+ mock_close(0)
1877+ self.mocker.call(_fake_close)
1878+
1879+ self.mocker.replay()
1880+
1881+ # Avoid unclean reactor by letting the callLater go through,
1882+ # but we do not care about the timeout.
1883+ def _silence_timeout(failure):
1884+ failure.trap(ConnectionTimeoutException)
1885+ self.client.connect(timeout=0).addErrback(_silence_timeout)
1886+
1887+ d = maybeDeferred(self.client.close)
1888+
1889+ def _verify(result):
1890+ self.mocker.verify()
1891+ d.addCallback(_verify)
1892+ return d
1893+
1894 def test_client_event_repr(self):
1895 event = ClientEvent(zookeeper.SESSION_EVENT,
1896- zookeeper.EXPIRED_SESSION_STATE, '')
1897- self.assertEqual(repr(event),
1898- "<ClientEvent session at '' state: expired>")
1899+ zookeeper.EXPIRED_SESSION_STATE, '', 0)
1900+ self.assertEqual(
1901+ repr(event),
1902+ "<ClientEvent session at '' state: expired handle:0>")
1903
1904 def test_client_event_attributes(self):
1905- event = ClientEvent(4, 'state', 'path')
1906+ event = ClientEvent(4, 'state', 'path', 0)
1907 self.assertEqual(event.type, 4)
1908 self.assertEqual(event.connection_state, 'state')
1909 self.assertEqual(event.path, 'path')
1910- self.assertEqual(event, (4, 'state', 'path'))
1911+ self.assertEqual(event.handle, 0)
1912+ self.assertEqual(event, (4, 'state', 'path', 0))
1913
1914 def test_client_use_while_disconnected_returns_failure(self):
1915 return self.assertFailure(
1916@@ -401,7 +441,7 @@
1917 """
1918 d = self.client.connect()
1919
1920- def inject_error(result_code, d, extra_codes=None):
1921+ def inject_error(result_code, d, extra_codes=None, path=None):
1922 error = SyntaxError()
1923 d.errback(error)
1924 return error
1925@@ -409,7 +449,8 @@
1926 def check_exists(client):
1927 mock_client = self.mocker.patch(client)
1928 mock_client._check_result(
1929- ANY, DEFERRED_MATCH, extra_codes=(zookeeper.NONODE,))
1930+ ANY, DEFERRED_MATCH, extra_codes=(zookeeper.NONODE,),
1931+ path="/zebra-moon")
1932 self.mocker.call(inject_error)
1933 self.mocker.replay()
1934 return client.exists("/zebra-moon")
1935@@ -506,7 +547,7 @@
1936
1937 def check_exists(path):
1938 exists, watch = self.client.exists_and_watch(
1939- "%s/wooly-mammoth"%node_path)
1940+ "%s/wooly-mammoth" % node_path)
1941 watch.chainDeferred(watcher_deferred)
1942 return exists
1943
1944@@ -517,13 +558,14 @@
1945
1946 def create_node(client):
1947 self.assertEqual(client.connected, True)
1948- return self.client2.create("%s/wooly-mammoth"%node_path, "extinct")
1949+ return self.client2.create(
1950+ "%s/wooly-mammoth" % node_path, "extinct")
1951
1952 def shim(path):
1953 return watcher_deferred
1954
1955 def verify_watch(event):
1956- self.assertEqual(event.path, "%s/wooly-mammoth"%node_path)
1957+ self.assertEqual(event.path, "%s/wooly-mammoth" % node_path)
1958 self.assertEqual(event.type, zookeeper.CREATED_EVENT)
1959
1960 d.addCallback(create_container)
1961@@ -600,7 +642,8 @@
1962
1963 def verify_succeeds(failure):
1964 self.assertTrue(failure)
1965- self.assertEqual(failure.value.args, ("no node",))
1966+ self.assertEqual(
1967+ failure.value.args, ("no node /abcd",))
1968
1969 d.addCallback(delete_node)
1970 d.addCallback(verify_fails)
1971@@ -643,7 +686,8 @@
1972
1973 def verify_succeeds(failure):
1974 self.assertTrue(failure)
1975- self.assertEqual(failure.value.args, ("no node",))
1976+ self.assertEqual(
1977+ failure.value.args, ("no node /xy1",))
1978
1979 d.addCallback(set_node)
1980 d.addCallback(verify_fails)
1981@@ -833,7 +877,7 @@
1982 return self.client.set("/orchard", "bar")
1983
1984 def node_access_failed(failure):
1985- self.assertEqual(failure.value.args, ("not authenticated",))
1986+ self.assertEqual(failure.value.args, ("not authenticated /orchard",))
1987 failed.append(True)
1988 return
1989
1990@@ -1046,7 +1090,7 @@
1991 d = self.client.connect()
1992
1993 def verify_session_timeout(client):
1994- self.assertEqual(client.session_timeout, 4000)
1995+ self.assertIn(client.session_timeout, (4000, 10000))
1996
1997 d.addCallback(verify_session_timeout)
1998 return d
1999@@ -1082,60 +1126,6 @@
2000 d.addErrback(verify_invalid)
2001 return d
2002
2003- def xtest_session_expired_event(self):
2004- """
2005- A client session can be reattached to in a separate connection,
2006- if the a session is expired, using the zookeeper connection will
2007- raise a SessionExpiredException.
2008- """
2009- d = self.client.connect()
2010-
2011- class StopTest(Exception):
2012- pass
2013-
2014- def new_connection_same_connection(client):
2015- self.assertEqual(client.state, zookeeper.CONNECTED_STATE)
2016- return ZookeeperClient("127.0.0.1:2181").connect(
2017- client_id=client.client_id).addErrback(
2018- guard_session_expired, client)
2019-
2020- def guard_session_expired(failure, client):
2021- # On occassion we get a session expired event while connecting.
2022- failure.trap(ConnectionException)
2023- self.assertEqual(failure.value.state_name, "expired")
2024- # Stop the test from proceeding
2025- raise StopTest()
2026-
2027- def close_new_connection(client):
2028- # Verify both connections are using same session
2029- self.assertEqual(self.client.client_id, client.client_id)
2030- self.assertEqual(client.state, zookeeper.CONNECTED_STATE)
2031-
2032- # Closing one connection will close the session
2033- client.close()
2034-
2035- # Continued use of the other client will get a
2036- # disconnect exception.
2037- return self.client.exists("/")
2038-
2039- def verify_original_closed(failure):
2040- if not isinstance(failure, Failure):
2041- self.fail("Test did not raise exception.")
2042- failure.trap(
2043- zookeeper.SessionExpiredException,
2044- zookeeper.ConnectionLossException)
2045-
2046- #print "client close"
2047- self.client.close()
2048- #print "creating new client for teardown"
2049- return self.client.connect()
2050-
2051- d.addCallback(new_connection_same_connection)
2052- d.addCallback(close_new_connection)
2053- d.addBoth(verify_original_closed)
2054-
2055- return d
2056-
2057 def test_connect_with_server(self):
2058 """
2059 A client's servers can be specified in the connect method.
2060@@ -1183,12 +1173,8 @@
2061 connected before then, then an errback is invoked with a timeout
2062 exception.
2063 """
2064- mock_init = self.mocker.replace("zookeeper.init")
2065- mock_init(ANY, ANY, ANY)
2066- self.mocker.result(0)
2067- self.mocker.replay()
2068-
2069- d = self.client.connect(timeout=0.1)
2070+ # Connect to a non standard port with nothing at the remote side.
2071+ d = self.client.connect("127.0.0.1:2182", timeout=0.2)
2072
2073 def verify_timeout(failure):
2074 self.assertTrue(
2075
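The updated `test_client_event_*` tests show that `ClientEvent` now carries a fourth `handle` field. The event still compares equal to a plain tuple, and its repr ends in `handle:N`. Below is a namedtuple-based sketch that reproduces the repr format the tests assert. It is not the library's actual class. The numeric event-type and state values in the name tables are assumed stand-ins for the zookeeper binding's constants.

```python
from collections import namedtuple

# Assumed name tables for illustration only; real code would derive
# these from the zookeeper binding's constants.
EVENT_TYPE_NAMES = {-1: "session", 1: "created", 2: "deleted",
                    3: "changed", 4: "child"}
STATE_NAMES = {-112: "expired", 1: "connecting", 3: "connected"}


class ClientEvent(namedtuple("ClientEvent",
                             "type connection_state path handle")):
    """Event tuple that also records the connection handle."""

    @property
    def type_name(self):
        return EVENT_TYPE_NAMES.get(self.type, str(self.type))

    @property
    def state_name(self):
        return STATE_NAMES.get(self.connection_state,
                               str(self.connection_state))

    def __repr__(self):
        return "<ClientEvent %s at %r state: %s handle:%d>" % (
            self.type_name, self.path, self.state_name, self.handle)


event = ClientEvent(-1, -112, "", 0)
print(repr(event))                  # <ClientEvent session at '' state: expired handle:0>
print(event == (-1, -112, "", 0))   # True
```

Subclassing a namedtuple keeps tuple equality intact, which is what lets `assertEqual(event, (4, 'state', 'path', 0))` pass.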
2076=== added file 'txzookeeper/tests/test_conn_failure.py'
2077--- txzookeeper/tests/test_conn_failure.py 1970-01-01 00:00:00 +0000
2078+++ txzookeeper/tests/test_conn_failure.py 2013-05-15 13:35:05 +0000
2079@@ -0,0 +1,194 @@
2080+#
2081+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
2082+#
2083+# This file is part of txzookeeper.
2084+#
2085+# Authors:
2086+# Kapil Thangavelu
2087+#
2088+# txzookeeper is free software: you can redistribute it and/or modify
2089+# it under the terms of the GNU Lesser General Public License as published by
2090+# the Free Software Foundation, either version 3 of the License, or
2091+# (at your option) any later version.
2092+#
2093+# txzookeeper is distributed in the hope that it will be useful,
2094+# but WITHOUT ANY WARRANTY; without even the implied warranty of
2095+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
2096+# GNU Lesser General Public License for more details.
2097+#
2098+# You should have received a copy of the GNU Lesser General Public License
2099+# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
2100+#
2101+
2102+import zookeeper
2103+
2104+from twisted.internet import reactor
2105+from twisted.internet.defer import inlineCallbacks
2106+
2107+from txzookeeper import ZookeeperClient
2108+from txzookeeper.tests import ZookeeperTestCase, utils
2109+from txzookeeper.tests.proxy import ProxyFactory
2110+
2111+
2112+class WatchDeliveryConnectionFailedTest(ZookeeperTestCase):
2113+ """Watches are still sent on reconnect.
2114+ """
2115+
2116+ def setUp(self):
2117+ super(WatchDeliveryConnectionFailedTest, self).setUp()
2118+
2119+ self.proxy = ProxyFactory("127.0.0.1", 2181)
2120+ self.proxy_port = reactor.listenTCP(0, self.proxy)
2121+ host = self.proxy_port.getHost()
2122+ self.proxied_client = ZookeeperClient(
2123+ "%s:%s" % (host.host, host.port))
2124+ self.direct_client = ZookeeperClient("127.0.0.1:2181", 3000)
2125+ self.session_events = []
2126+
2127+ def session_event_collector(conn, event):
2128+ self.session_events.append(event)
2129+
2130+ self.proxied_client.set_session_callback(session_event_collector)
2131+ return self.direct_client.connect()
2132+
2133+ @inlineCallbacks
2134+ def tearDown(self):
2135+ zookeeper.set_debug_level(0)
2136+ if self.proxied_client.connected:
2137+ yield self.proxied_client.close()
2138+ if not self.direct_client.connected:
2139+ yield self.direct_client.connect()
2140+ utils.deleteTree(handle=self.direct_client.handle)
2141+ yield self.direct_client.close()
2142+ self.proxy.lose_connection()
2143+ yield self.proxy_port.stopListening()
2144+
2145+ def verify_events(self, events, expected):
2146+ """Verify the state of the session events encountered.
2147+ """
2148+ for value, state in zip([e.state_name for e in events], expected):
2149+ self.assertEqual(value, state)
2150+
2151+ @inlineCallbacks
2152+ def test_child_watch_fires_upon_reconnect(self):
2153+ yield self.proxied_client.connect()
2154+
2155+ # Setup tree
2156+ cpath = "/test-tree"
2157+ yield self.direct_client.create(cpath)
2158+
2159+ # Setup watch
2160+ child_d, watch_d = self.proxied_client.get_children_and_watch(cpath)
2161+
2162+ self.assertEqual((yield child_d), [])
2163+
2164+ # Kill the connection and fire the watch
2165+ self.proxy.lose_connection()
2166+ yield self.direct_client.create(
2167+ cpath + "/abc", flags=zookeeper.SEQUENCE)
2168+
2169+ # We should still get the child event.
2170+ yield watch_d
2171+
2172+ # We get two pairs of (connecting, connected) for the conn and watch
2173+ self.assertEqual(len(self.session_events), 4)
2174+ self.verify_events(
2175+ self.session_events,
2176+ ("connecting", "connecting", "connected", "connected"))
2177+
2178+ @inlineCallbacks
2179+ def test_exists_watch_fires_upon_reconnect(self):
2180+ yield self.proxied_client.connect()
2181+ cpath = "/test"
2182+
2183+ # Setup watch
2184+ exists_d, watch_d = self.proxied_client.exists_and_watch(cpath)
2185+
2186+ self.assertEqual((yield exists_d), None)
2187+
2188+ # Kill the connection and fire the watch
2189+ self.proxy.lose_connection()
2190+ yield self.direct_client.create(cpath)
2191+
2192+ # We should still get the exists event.
2193+ yield watch_d
2194+
2195+ # We get two pairs of (connecting, connected) for the conn and watch
2196+ self.assertEqual(len(self.session_events), 4)
2197+ self.verify_events(
2198+ self.session_events,
2199+ ("connecting", "connecting", "connected", "connected"))
2200+
2201+ @inlineCallbacks
2202+ def test_get_watch_fires_upon_reconnect(self):
2203+ yield self.proxied_client.connect()
2204+ # Setup tree
2205+ cpath = "/test"
2206+ yield self.direct_client.create(cpath, "abc")
2207+
2208+ # Setup watch
2209+ get_d, watch_d = self.proxied_client.get_and_watch(cpath)
2210+ content, stat = yield get_d
2211+ self.assertEqual(content, "abc")
2212+
2213+ # Kill the connection and fire the watch
2214+ self.proxy.lose_connection()
2215+ yield self.direct_client.set(cpath, "xyz")
2216+
2217+ # We should still get the data changed event.
2218+ yield watch_d
2219+
2220+ # We also get two pairs of (connecting, connected) for the conn and watch
2221+ self.assertEqual(len(self.session_events), 4)
2222+ self.verify_events(
2223+ self.session_events,
2224+ ("connecting", "connecting", "connected", "connected"))
2225+
2226+ @inlineCallbacks
2227+ def test_watch_delivery_failure_resends(self):
2228+ """Simulate a network failure for the watch delivery
2229+
2230+ The zk server effectively sends the watch delivery to the client,
2231+ but the client never receives it.
2232+ """
2233+ yield self.proxied_client.connect()
2234+ cpath = "/test"
2235+
2236+ # Setup watch
2237+ exists_d, watch_d = self.proxied_client.exists_and_watch(cpath)
2238+
2239+ self.assertEqual((yield exists_d), None)
2240+
2241+ # Block the proxy, fire the watch, and blackhole the data.
2242+ self.proxy.set_blocked(True)
2243+ yield self.direct_client.create(cpath)
2244+ self.proxy.set_blocked(False)
2245+ self.proxy.lose_connection()
2246+
2247+ # We should still get the exists event.
2248+ yield watch_d
2249+
2250+ @inlineCallbacks
2251+ def xtest_binding_bug_session_exception(self):
2252+ """This test triggers an exception in the python-zookeeper binding.
2253+
2254+ File "txzookeeper/client.py", line 491, in create
2255+ self.handle, path, data, acls, flags, callback)
2256+ exceptions.SystemError: error return without exception set
2257+ """
2258+ yield self.proxied_client.connect()
2259+ data_d, watch_d = yield self.proxied_client.exists_and_watch("/")
2260+ self.assertTrue((yield data_d))
2261+ self.proxy.set_blocked(True)
2262+ # Wait for session expiration, on a single server options are limited
2263+ yield self.sleep(15)
2264+ # Unblock the proxy for next connect, and then drop the connection.
2265+ self.proxy.set_blocked(False)
2266+ self.proxy.lose_connection()
2267+ # Wait for a reconnect
2268+ yield self.assertFailure(watch_d, zookeeper.SessionExpiredException)
2269+ # Leads to bindings bug failure
2270+ yield self.assertFailure(
2271+ self.proxied_client.get("/a"),
2272+ zookeeper.SessionExpiredException)
2273+ self.assertEqual(self.session_events[-1].state_name, "expired")
2274
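`verify_events` compares state names through `zip`, which stops at the shorter sequence. On its own it therefore catches only mismatches inside the overlapping prefix, and the tests depend on their separate `len` assertion for the rest. The sketch below is a stricter alternative that compares full sequences, length included. The helper `verify_state_sequence` and the `Event` stand-in type are hypothetical.

```python
from collections import namedtuple

# Minimal stand-in for a session event; only state_name is consulted.
Event = namedtuple("Event", "state_name")


def verify_state_sequence(events, expected):
    """Raise AssertionError unless the state names match exactly."""
    actual = [e.state_name for e in events]
    if actual != list(expected):
        raise AssertionError("expected %r, got %r" % (list(expected), actual))


events = [Event("connecting"), Event("connecting"),
          Event("connected"), Event("connected")]
verify_state_sequence(
    events, ("connecting", "connecting", "connected", "connected"))

# A zip()-based check would silently accept this truncated list.
try:
    verify_state_sequence(events[:2],
                          ("connecting", "connecting", "connected"))
except AssertionError as err:
    print(err)
```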
2275=== modified file 'txzookeeper/tests/test_lock.py'
2276--- txzookeeper/tests/test_lock.py 2011-06-17 23:52:34 +0000
2277+++ txzookeeper/tests/test_lock.py 2013-05-15 13:35:05 +0000
2278@@ -1,8 +1,11 @@
2279 #
2280-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
2281+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
2282 #
2283 # This file is part of txzookeeper.
2284 #
2285+# Authors:
2286+# Kapil Thangavelu
2287+#
2288 # txzookeeper is free software: you can redistribute it and/or modify
2289 # it under the terms of the GNU Lesser General Public License as published by
2290 # the Free Software Foundation, either version 3 of the License, or
2291
2292=== added file 'txzookeeper/tests/test_managed.py'
2293--- txzookeeper/tests/test_managed.py 1970-01-01 00:00:00 +0000
2294+++ txzookeeper/tests/test_managed.py 2013-05-15 13:35:05 +0000
2295@@ -0,0 +1,309 @@
2296+#
2297+# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved
2298+#
2299+# This file is part of txzookeeper.
2300+#
2301+# Authors:
2302+# Kapil Thangavelu
2303+#
2304+# txzookeeper is free software: you can redistribute it and/or modify
2305+# it under the terms of the GNU Lesser General Public License as published by
2306+# the Free Software Foundation, either version 3 of the License, or
2307+# (at your option) any later version.
2308+#
2309+# txzookeeper is distributed in the hope that it will be useful,
2310+# but WITHOUT ANY WARRANTY; without even the implied warranty of
2311+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
2312+# GNU Lesser General Public License for more details.
2313+#
2314+# You should have received a copy of the GNU Lesser General Public License
2315+# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
2316+#
2317+
2318+import logging
2319+import zookeeper
2320+
2321+from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList
2322+
2323+from txzookeeper.client import ZookeeperClient, ClientEvent
2324+from txzookeeper import managed
2325+from txzookeeper.tests import ZookeeperTestCase, utils
2326+from txzookeeper.tests import test_client
2327+
2328+
2329+class WatchTest(ZookeeperTestCase):
2330+ """Watch manager and watch tests"""
2331+
2332+ def setUp(self):
2333+ self.watches = managed.WatchManager()
2334+
2335+ def tearDown(self):
2336+ self.watches.clear()
2337+ del self.watches
2338+
2339+ def test_add_remove(self):
2340+
2341+ w = self.watches.add("/foobar", "child", lambda x: 1)
2342+ self.assertIn(
2343+ "<Watcher child /foobar",
2344+ str(w))
2345+ self.assertIn(w, self.watches._watches)
2346+ self.watches.remove(w)
2347+ self.assertNotIn(w, self.watches._watches)
2348+ self.watches.remove(w)
2349+
2350+ @inlineCallbacks
2351+ def test_watch_fire_removes(self):
2352+ """Firing the watch removes it from the manager.
2353+ """
2354+ w = self.watches.add("/foobar", "child", lambda x: 1)
2355+ yield w("a")
2356+ self.assertNotIn(w, self.watches._watches)
2357+
2358+ @inlineCallbacks
2359+ def test_watch_fire_with_error_removes(self):
2360+ """Firing a watch whose callback errors still removes it from the manager.
2361+ """
2362+ d = Deferred()
2363+
2364+ @inlineCallbacks
2365+ def cb_error(e):
2366+ yield d
2367+ raise ValueError("a")
2368+
2369+ w = self.watches.add("/foobar", "child", cb_error)
2370+ try:
2371+ wd = w("a")
2372+ d.callback(True)
2373+ yield wd
2374+ except ValueError:
2375+ pass
2376+ self.assertNotIn(w, self.watches._watches)
2377+
2378+ @inlineCallbacks
2379+ def test_reset_with_error(self):
2380+ """A callback firing an error on reset is ignored.
2381+ """
2382+ output = self.capture_log("txzk.managed")
2383+ d = Deferred()
2384+ results = []
2385+
2386+ @inlineCallbacks
2387+ def callback(*args, **kw):
2388+ results.append((args, kw))
2389+ yield d
2390+ raise ValueError("a")
2391+
2392+ w = self.watches.add("/foobar", "child", callback)
2393+ reset_done = self.watches.reset()
2394+
2395+ e, _ = results.pop()
2396+ e = list(e)
2397+ e.append(0)
2398+
2399+ self.assertEqual(
2400+ str(ClientEvent(*e)),
2401+ "<ClientEvent session at '/foobar' state: connected handle:0>")
2402+ d.callback(True)
2403+ yield reset_done
2404+ self.assertNotIn(w, self.watches._watches)
2405+ self.assertIn("Error reseting watch", output.getvalue())
2406+
2407+ @inlineCallbacks
2408+ def test_reset(self):
2409+ """Reset fires a synthetic client event, and clears watches.
2410+ """
2411+ d = Deferred()
2412+ results = []
2413+
2414+ def callback(*args, **kw):
2415+ results.append((args, kw))
2416+ return d
2417+
2418+ w = self.watches.add("/foobar", "child", callback)
2419+ reset_done = self.watches.reset()
2420+
2421+ e, _ = results.pop()
2422+ e = list(e)
2423+ e.append(0)
2424+
2425+ self.assertEqual(
2426+ str(ClientEvent(*e)),
2427+ "<ClientEvent session at '/foobar' state: connected handle:0>")
2428+ d.callback(True)
2429+ yield reset_done
2430+ self.assertNotIn(w, self.watches._watches)
2431+
2432+
2433+class SessionClientTests(test_client.ClientTests):
2434+ """Run through basic operations with SessionClient."""
2435+ timeout = 5
2436+
2437+ def setUp(self):
2438+ super(SessionClientTests, self).setUp()
2439+ self.client = managed.SessionClient("127.0.0.1:2181")
2440+
2441+ def test_client_use_while_disconnected_returns_failure(self):
2442+ # managed session client reconnects here.
2443+ return True
2444+
2445+
2446+class SessionClientExpireTests(ZookeeperTestCase):
2447+ """Verify expiration behavior."""
2448+
2449+ def setUp(self):
2450+ super(SessionClientExpireTests, self).setUp()
2451+ self.client = managed.ManagedClient("127.0.0.1:2181", 3000)
2452+ self.client2 = None
2453+ self.output = self.capture_log(level=logging.DEBUG)
2454+ return self.client.connect()
2455+
2456+ @inlineCallbacks
2457+ def tearDown(self):
2458+ self.client.close()
2459+ self.client2 = ZookeeperClient("127.0.0.1:2181")
2460+ yield self.client2.connect()
2461+ utils.deleteTree(handle=self.client2.handle)
2462+ yield self.client2.close()
2463+ super(SessionClientExpireTests, self).tearDown()
2464+
2465+ @inlineCallbacks
2466+ def expire_session(self, wait=True, retry=0):
2467+ assert self.client.connected
2468+ #if wait:
2469+ # d = self.client.subscribe_new_session()
2470+ client_id = self.client.client_id
2471+ self.client2 = ZookeeperClient(self.client.servers)
2472+ yield self.client2.connect(client_id=client_id)
2473+ yield self.client2.close()
2474+
2475+ # It takes some time to propagate (1/3 session time as ping)
2476+ if wait:
2477+ yield self.sleep(2)
2478+ client_new_id = self.client.client_id
2479+ # Crappy workaround to c libzk bug/issue see http://goo.gl/9ei5c
2480+ # Works most of the time.. but bound it when it doesn't. lame!
2481+ if client_id[0] == client_new_id[0] and retry < 10:
2482+ yield self.expire_session(wait, retry+1)
2483+
2484+ @inlineCallbacks
2485+ def test_session_expiration_conn(self):
2486+ d = self.client.subscribe_new_session()
2487+ session_id = self.client.client_id[0]
2488+ yield self.client.create("/fo-1", "abc")
2489+ yield self.expire_session(wait=True)
2490+ yield d
2491+ stat = yield self.client.exists("/")
2492+ self.assertTrue(stat)
2493+ self.assertNotEqual(session_id, self.client.client_id[0])
2494+
2495+ @inlineCallbacks
2496+ def test_session_expiration_notification(self):
2497+ session_id = self.client.client_id[0]
2498+ c_d, w_d = self.client.get_and_watch("/")
2499+ yield c_d
2500+ d = self.client.subscribe_new_session()
2501+ self.assertFalse(d.called)
2502+ yield self.expire_session(wait=True)
2503+ yield d
2504+ yield w_d
2505+ self.assertNotEqual(session_id, self.client.client_id[0])
2506+
2507+ @inlineCallbacks
2508+ def test_invoked_watch_gc(self):
2509+ c_d, w_d = yield self.client.get_children_and_watch("/")
2510+ yield c_d
2511+ yield self.client.create("/foo")
2512+ yield w_d
2513+ yield self.expire_session()
2514+ yield self.client.create("/foo2")
2515+ # Nothing should blow up
2516+ yield self.sleep(0.2)
2517+
2518+ @inlineCallbacks
2519+ def test_app_usage_error_bypass_retry(self):
2520+ """App errors shouldn't trigger a reconnect."""
2521+ output = self.capture_log(level=logging.DEBUG)
2522+ yield self.assertFailure(
2523+ self.client.get("/abc"), zookeeper.NoNodeException)
2524+ self.assertNotIn("Persistent retry error", output.getvalue())
2525+
2526+ @inlineCallbacks
2527+ def test_ephemeral_and_watch_recreate(self):
2528+ # Create some ephemeral nodes
2529+ yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
2530+ yield self.client.create("/fo-2", "def", flags=zookeeper.EPHEMERAL)
2531+
2532+ # Create some watches
2533+ g_d, g_w_d = self.client.get_and_watch("/fo-1")
2534+ yield g_d
2535+
2536+ c_d, c_w_d = self.client.get_children_and_watch("/")
2537+ yield c_d
2538+
2539+ e_d, e_w_d = self.client.get_children_and_watch("/fo-2")
2540+ yield e_d
2541+
2542+ # Expire the session
2543+ yield self.expire_session()
2544+
2545+ # Poof
2546+
2547+ # Ephemerals back
2548+ c, s = yield self.client.get("/fo-1")
2549+ self.assertEqual(c, "abc")
2550+
2551+ c, s = yield self.client.get("/fo-2")
2552+ self.assertEqual(c, "def")
2553+
2554+ # Watches triggered
2555+ yield DeferredList(
2556+ [g_w_d, c_w_d, e_w_d],
2557+ fireOnOneErrback=True, consumeErrors=True)
2558+
2559+ h = self.client.handle
2560+ self.assertEqual(
2561+ [str(d.result) for d in (g_w_d, c_w_d, e_w_d)],
2562+ ["<ClientEvent session at '/fo-1' state: connected handle:%d>" % h,
2563+ "<ClientEvent session at '/' state: connected handle:%d>" % h,
2564+ "<ClientEvent session at '/fo-2' state: connected handle:%d>" % h
2565+ ])
2566+
2567+ @inlineCallbacks
2568+ def test_ephemeral_no_track_sequence_nodes(self):
2569+ """ Ephemeral tracking ignores sequence nodes.
2570+ """
2571+ yield self.client.create("/music", "abc")
2572+ yield self.client.create(
2573+ "/music/u2-", "abc",
2574+ flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
2575+ yield self.expire_session()
2576+ children = yield self.client.get_children("/music")
2577+ self.assertEqual(children, [])
2578+
2579+ @inlineCallbacks
2580+ def test_ephemeral_content_modification(self):
2581+ yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
2582+ yield self.client.set("/fo-1", "def")
2583+ yield self.expire_session()
2584+ c, s = yield self.client.get("/fo-1")
2585+ self.assertEqual(c, "def")
2586+
2587+ @inlineCallbacks
2588+ def test_ephemeral_acl_modification(self):
2589+ yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
2590+ acl = [test_client.PUBLIC_ACL,
2591+ dict(scheme="digest",
2592+ id="zebra:moon",
2593+ perms=zookeeper.PERM_ALL)]
2594+ yield self.client.set_acl("/fo-1", acl)
2595+ yield self.expire_session()
2596+ n_acl, stat = yield self.client.get_acl("/fo-1")
2597+ self.assertEqual(acl, n_acl)
2598+
2599+ @inlineCallbacks
2600+ def test_ephemeral_deletion(self):
2601+ yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
2602+ yield self.client.delete("/fo-1")
2603+ yield self.expire_session()
2604+ self.assertFalse((yield self.client.exists("/fo-1")))
2605
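`WatchTest` defines the watch manager's contract through its tests. `add()` returns a watcher whose `str` names its type and path. Firing a watcher removes it from the manager, and `remove()` is safe to call twice. `reset()` delivers a synthetic "session connected" event to every pending watch, clears the manager, and logs callback errors instead of raising them. The sketch below is synchronous and has no Twisted dependency. It assumes a plain list of watchers and uses stand-in values for the session/connected constants. The real `managed.WatchManager` works with Deferreds, and its internals may differ.

```python
import logging

log = logging.getLogger("watch-sketch")

# Assumed stand-ins for zookeeper.SESSION_EVENT / zookeeper.CONNECTED_STATE.
SESSION_EVENT = -1
CONNECTED_STATE = 3


class Watcher(object):
    def __init__(self, manager, path, watch_type, callback):
        self.manager = manager
        self.path = path
        self.watch_type = watch_type
        self.callback = callback

    def __str__(self):
        return "<Watcher %s %s>" % (self.watch_type, self.path)

    def __call__(self, *args):
        # A watch fires at most once, so drop it before running the
        # callback; it is removed even if the callback raises.
        self.manager.remove(self)
        return self.callback(*args)


class WatchManager(object):
    def __init__(self):
        self._watches = []

    def add(self, path, watch_type, callback):
        watcher = Watcher(self, path, watch_type, callback)
        self._watches.append(watcher)
        return watcher

    def remove(self, watcher):
        # Idempotent: removing an unknown watcher is a no-op.
        if watcher in self._watches:
            self._watches.remove(watcher)

    def reset(self):
        # After a new session, fire each pending watch with a synthetic
        # connected event so callers re-read state and set new watches.
        for watcher in list(self._watches):
            try:
                watcher(SESSION_EVENT, CONNECTED_STATE, watcher.path)
            except Exception:
                log.exception("Error resetting watch %s", watcher)


manager = WatchManager()
seen = []
w = manager.add("/foobar", "child", lambda *event: seen.append(event))
print(str(w))            # <Watcher child /foobar>
manager.reset()
print(seen)              # [(-1, 3, '/foobar')]
print(manager._watches)  # []
```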
2606=== modified file 'txzookeeper/tests/test_node.py'
2607--- txzookeeper/tests/test_node.py 2011-06-30 17:14:06 +0000
2608+++ txzookeeper/tests/test_node.py 2013-05-15 13:35:05 +0000
2609@@ -1,8 +1,11 @@
2610 #
2611-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
2612+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
2613 #
2614 # This file is part of txzookeeper.
2615 #
2616+# Authors:
2617+# Kapil Thangavelu
2618+#
2619 # txzookeeper is free software: you can redistribute it and/or modify
2620 # it under the terms of the GNU Lesser General Public License as published by
2621 # the Free Software Foundation, either version 3 of the License, or
2622@@ -227,8 +230,8 @@
2623 """
2624 node = ZNode("/zoo/elephant", self.client)
2625 exists, watch = yield node.exists_and_watch()
2626-
2627- self.client.create("/zoo/elephant")
2628+ self.assertFalse((yield exists))
2629+ yield self.client.create("/zoo/elephant")
2630 event = yield watch
2631 self.assertEqual(event.type, zookeeper.CREATED_EVENT)
2632 self.assertEqual(event.path, node.path)
2633
2634=== modified file 'txzookeeper/tests/test_queue.py'
2635--- txzookeeper/tests/test_queue.py 2011-06-30 17:14:06 +0000
2636+++ txzookeeper/tests/test_queue.py 2013-05-15 13:35:05 +0000
2637@@ -1,8 +1,11 @@
2638 #
2639-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
2640+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
2641 #
2642 # This file is part of txzookeeper.
2643 #
2644+# Authors:
2645+# Kapil Thangavelu
2646+#
2647 # txzookeeper is free software: you can redistribute it and/or modify
2648 # it under the terms of the GNU Lesser General Public License as published by
2649 # the Free Software Foundation, either version 3 of the License, or
2650@@ -17,11 +20,11 @@
2651 # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
2652 #
2653
2654-
2655 from zookeeper import NoNodeException
2656 from twisted.internet.defer import (
2657 inlineCallbacks, returnValue, DeferredList, Deferred, succeed, fail)
2658
2659+
2660 from txzookeeper import ZookeeperClient
2661 from txzookeeper.client import NotConnectedException
2662 from txzookeeper.queue import Queue, ReliableQueue, SerializedQueue, QueueItem
2663
2664=== added file 'txzookeeper/tests/test_retry.py'
2665--- txzookeeper/tests/test_retry.py 1970-01-01 00:00:00 +0000
2666+++ txzookeeper/tests/test_retry.py 2013-05-15 13:35:05 +0000
2667@@ -0,0 +1,332 @@
2668+#
2669+# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved
2670+#
2671+# This file is part of txzookeeper.
2672+#
2673+# Authors:
2674+# Kapil Thangavelu
2675+#
2676+# txzookeeper is free software: you can redistribute it and/or modify
2677+# it under the terms of the GNU Lesser General Public License as published by
2678+# the Free Software Foundation, either version 3 of the License, or
2679+# (at your option) any later version.
2680+#
2681+# txzookeeper is distributed in the hope that it will be useful,
2682+# but WITHOUT ANY WARRANTY; without even the implied warranty of
2683+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
2684+# GNU Lesser General Public License for more details.
2685+#
2686+# You should have received a copy of the GNU Lesser General Public License
2687+# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
2688+#
2689+
2690+import json
2691+import time
2692+import zookeeper
2693+
2694+from twisted.internet.defer import inlineCallbacks, fail, succeed, Deferred
2695+
2696+from txzookeeper.client import ZookeeperClient
2697+from txzookeeper.retry import (
2698+ RetryClient, retry, retry_watch,
2699+ check_retryable, is_retryable, get_delay, sleep)
2700+
2701+from txzookeeper import retry as retry_module
2702+
2703+
2704+from txzookeeper.utils import retry_change
2705+
2706+from txzookeeper.tests import ZookeeperTestCase, utils
2707+from txzookeeper.tests.proxy import ProxyFactory
2708+from txzookeeper.tests import test_client
2709+
2710+
2711+class RetryCoreTests(ZookeeperTestCase):
2712+ """Test the retry functions in isolation.
2713+ """
2714+ timeout = 5
2715+
2716+ def test_is_retryable(self):
2717+ self.assertEqual(
2718+ is_retryable(zookeeper.SessionExpiredException()), False)
2719+ self.assertEqual(
2720+ is_retryable(zookeeper.ConnectionLossException()), True)
2721+ self.assertEqual(
2722+ is_retryable(zookeeper.OperationTimeoutException()), True)
2723+ self.assertEqual(
2724+ is_retryable(TypeError()), False)
2725+
2726+ def setup_always_retryable(self):
2727+ def check_retry(*args):
2728+ return True
2729+
2730+ self.patch(retry_module, "check_retryable", check_retry)
2731+
2732+ @inlineCallbacks
2733+ def test_sleep(self):
2734+ t = time.time()
2735+ yield sleep(0.5)
2736+ self.assertTrue(time.time() - t > 0.5)
2737+
2738+ @inlineCallbacks
2739+ def test_retry_function(self):
2740+ """The retry wrapper can be used for a function."""
2741+ self.setup_always_retryable()
2742+
2743+ results = [fail(zookeeper.ConnectionLossException()),
2744+ fail(zookeeper.ConnectionLossException()),
2745+ succeed(21)]
2746+
2747+ def original(zebra):
2748+ """Hello World"""
2749+ return results.pop(0)
2750+
2751+ result = yield retry(ZookeeperClient(), original, "magic")
2752+ self.assertEqual(result, 21)
2753+
2754+ @inlineCallbacks
2755+ def test_retry_method(self):
2756+ """The retry wrapper can be used for a method."""
2757+ self.setup_always_retryable()
2758+
2759+ results = [fail(zookeeper.ConnectionLossException()),
2760+ fail(zookeeper.ConnectionLossException()),
2761+ succeed(21)]
2762+
2763+ class Foobar(object):
2764+
2765+ def original(self, zebra):
2766+ """Hello World"""
2767+ return results.pop(0)
2768+
2769+ client = ZookeeperClient()
2770+ foo = Foobar()
2771+ result = yield retry(client, foo.original, "magic")
2772+ self.assertEqual(result, 21)
2773+
2774+ @inlineCallbacks
2775+ def test_retry_watch_function(self):
2776+ self.setup_always_retryable()
2777+
2778+ results = [(fail(zookeeper.ConnectionLossException()), Deferred()),
2779+ (fail(zookeeper.ConnectionLossException()), Deferred()),
2780+ (succeed(21), succeed(22))]
2781+
2782+ def original(zebra):
2783+ """Hello World"""
2784+ return results.pop(0)
2785+
2786+ client = ZookeeperClient()
2787+ value_d, watch_d = retry_watch(client, original, "magic")
2788+ self.assertEqual((yield value_d), 21)
2789+ self.assertEqual((yield watch_d), 22)
2790+
2791+ def test_check_retryable(self):
2792+ unrecoverable_errors = [
2793+ zookeeper.ApiErrorException(),
2794+ zookeeper.NoAuthException(),
2795+ zookeeper.NodeExistsException(),
2796+ zookeeper.SessionExpiredException(),
2797+ ]
2798+
2799+ class _Conn(object):
2800+ def __init__(self, **kw):
2801+ self.__dict__.update(kw)
2802+
2803+ error = zookeeper.ConnectionLossException()
2804+ conn = _Conn(connected=True, unrecoverable=False)
2805+ max_time = time.time() + 10
2806+
2807+ for e in unrecoverable_errors:
2808+ self.assertFalse(check_retryable(_Conn(), max_time, e))
2809+
2810+ self.assertTrue(check_retryable(conn, max_time, error))
2811+ self.assertFalse(check_retryable(conn, time.time() - 10, error))
2812+ self.assertFalse(check_retryable(
2813+ _Conn(connected=False, unrecoverable=False), max_time, error))
2814+ self.assertFalse(check_retryable(
2815+ _Conn(connected=True, unrecoverable=True), max_time, error))
2816+
2817+ def test_get_delay(self):
2818+ # Delay currently set to ~1/3 of session time.
2819+
2820+ # Verify max value is respected
2821+ self.assertEqual(get_delay(25000, 10), 7.5)
2822+ # Verify normal calculation
2823+ self.assertEqual(get_delay(15000, 10, 30), 4.5)
2824+
2825+
2826+class RetryClientTests(test_client.ClientTests):
2827+ """Run the full client test suite against the retry facade.
2828+ """
2829+ def setUp(self):
2830+ super(RetryClientTests, self).setUp()
2831+ self.client = RetryClient(ZookeeperClient("127.0.0.1:2181", 3000))
2832+ self.client2 = None
2833+
2834+ def tearDown(self):
2835+ if self.client.connected:
2836+ utils.deleteTree(handle=self.client.handle)
2837+ self.client.close()
2838+
2839+ if self.client2 and self.client2.connected:
2840+ self.client2.close()
2841+
2842+ super(RetryClientTests, self).tearDown()
2843+
2844+ def test_wb_connect_after_timeout(self):
2845+ """White box tests disabled for retryclient."""
2846+
2847+ def test_wb_reconnect_after_timeout_and_close(self):
2848+ """White box tests disabled for retryclient."""
2849+
2850+ def test_exists_with_error(self):
2851+ """White box tests disabled for retryclient."""
2852+
2853+
2854+class RetryClientConnectionLossTest(ZookeeperTestCase):
2855+
2856+ def setUp(self):
2857+ super(RetryClientConnectionLossTest, self).setUp()
2858+
2859+ from twisted.internet import reactor
2860+ self.proxy = ProxyFactory("127.0.0.1", 2181)
2861+ self.proxy_port = reactor.listenTCP(0, self.proxy)
2862+ host = self.proxy_port.getHost()
2863+ self.proxied_client = RetryClient(ZookeeperClient(
2864+ "%s:%s" % (host.host, host.port)))
2865+ self.direct_client = ZookeeperClient("127.0.0.1:2181", 3000)
2866+ self.session_events = []
2867+
2868+ def session_event_collector(conn, event):
2869+ self.session_events.append(event)
2870+
2871+ self.proxied_client.set_session_callback(session_event_collector)
2872+ return self.direct_client.connect()
2873+
2874+ @inlineCallbacks
2875+ def tearDown(self):
2876+ import zookeeper
2877+ zookeeper.set_debug_level(0)
2878+ if self.proxied_client.connected:
2879+ yield self.proxied_client.close()
2880+ if not self.direct_client.connected:
2881+ yield self.direct_client.connect()
2882+ utils.deleteTree(handle=self.direct_client.handle)
2883+ yield self.direct_client.close()
2884+ self.proxy.lose_connection()
2885+ yield self.proxy_port.stopListening()
2886+
2887+ @inlineCallbacks
2888+ def test_get_children_and_watch(self):
2889+ yield self.proxied_client.connect()
2890+
2891+ # Setup tree
2892+ cpath = "/test-tree"
2893+ yield self.direct_client.create(cpath)
2894+
2895+ # Block the request (drops all packets.)
2896+ self.proxy.set_blocked(True)
2897+ child_d, watch_d = self.proxied_client.get_children_and_watch(cpath)
2898+
2899+ # Unblock and disconnect
2900+ self.proxy.set_blocked(False)
2901+ self.proxy.lose_connection()
2902+
2903+ # Call goes through
2904+ self.assertEqual((yield child_d), [])
2905+ self.assertEqual(len(self.session_events), 2)
2906+
2907+ # And we have reconnect events
2908+ self.assertEqual(self.session_events[-1].state_name, "connected")
2909+
2910+ yield self.direct_client.create(cpath + "/abc")
2911+
2912+ # The original watch is still active
2913+ yield watch_d
2914+
2915+ @inlineCallbacks
2916+ def test_exists_and_watch(self):
2917+ yield self.proxied_client.connect()
2918+
2919+ cpath = "/test-tree"
2920+
2921+ # Block the request
2922+ self.proxy.set_blocked(True)
2923+ exists_d, watch_d = self.proxied_client.exists_and_watch(cpath)
2924+
2925+ # Create the node
2926+ yield self.direct_client.create(cpath)
2927+
2928+ # Unblock and disconnect
2929+ self.proxy.set_blocked(False)
2930+ self.proxy.lose_connection()
2931+
2932+ # The call is retried and sees the latest state.
2933+ self.assertTrue((yield exists_d))
2934+ self.assertEqual(len(self.session_events), 2)
2935+
2936+ # And we have reconnect events
2937+ self.assertEqual(self.session_events[-1].state_name, "connected")
2938+
2939+ yield self.direct_client.delete(cpath)
2940+
2941+ # The original watch is still active
2942+ yield watch_d
2943+
2944+ @inlineCallbacks
2945+ def test_get_and_watch(self):
2946+ yield self.proxied_client.connect()
2947+
2948+ # Setup tree
2949+ cpath = "/test-tree"
2950+ yield self.direct_client.create(cpath)
2951+
2952+ # Block the request (drops all packets.)
2953+ self.proxy.set_blocked(True)
2954+ get_d, watch_d = self.proxied_client.get_and_watch(cpath)
2955+
2956+ # Unblock and disconnect
2957+ self.proxy.set_blocked(False)
2958+ self.proxy.lose_connection()
2959+
2960+ # Call goes through
2961+ content, stat = yield get_d
2962+ self.assertEqual(content, '')
2963+ self.assertEqual(len(self.session_events), 2)
2964+
2965+ # And we have reconnect events
2966+ self.assertEqual(self.session_events[-1].state_name, "connected")
2967+
2968+ yield self.direct_client.delete(cpath)
2969+
2970+ # The original watch is still active
2971+ yield watch_d
2972+
2973+ @inlineCallbacks
2974+ def test_set(self):
2975+ yield self.proxied_client.connect()
2976+
2977+ # Setup tree
2978+ cpath = "/test-tree"
2979+ yield self.direct_client.create(cpath, json.dumps({"a": 1, "c": 2}))
2980+
2981+ def update_node(content, stat):
2982+ data = json.loads(content)
2983+ data["a"] += 1
2984+ data["b"] = 0
2985+ return json.dumps(data)
2986+
2987+ # Block the request (drops all packets.)
2988+ self.proxy.set_blocked(True)
2989+ mod_d = retry_change(self.proxied_client, cpath, update_node)
2990+
2991+ # Unblock and disconnect
2992+ self.proxy.set_blocked(False)
2993+ self.proxy.lose_connection()
2994+
2995+ # Call goes through, contents verified.
2996+ yield mod_d
2997+ content, stat = yield self.direct_client.get(cpath)
2998+ self.assertEqual(json.loads(content),
2999+ {"a": 2, "b": 0, "c": 2})
3000
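The `test_check_retryable` and `test_get_delay` cases above pin down the retry policy. Errors such as `SessionExpiredException` or `NodeExistsException` are never retried. Connection loss and operation timeouts are retried only while the connection is still connected, not marked unrecoverable, and inside its deadline. The delay between attempts is derived from the session timeout, which is given in milliseconds. The plain-Python sketch below reproduces that behavior under stated assumptions. It is not the code in `txzookeeper/retry.py`, which this excerpt does not show. The exception classes, the `Conn` tuple, `should_retry`, `retry_delay`, and its 30% default fraction are hypothetical names and values, chosen so the sketch matches the numbers asserted in `test_get_delay`.

```python
import time
from collections import namedtuple


# Hypothetical stand-ins for the zookeeper exception classes.
class ConnectionLossError(Exception):
    pass


class OperationTimeoutError(Exception):
    pass


class SessionExpiredError(Exception):
    pass


# Only transient connection problems are retried; anything else fails fast.
RETRYABLE_ERRORS = (ConnectionLossError, OperationTimeoutError)

# Hypothetical view of the connection state the policy inspects.
Conn = namedtuple("Conn", "connected unrecoverable")


def is_retryable_error(error):
    """True only for transient, connection-level errors."""
    return isinstance(error, RETRYABLE_ERRORS)


def should_retry(conn, max_time, error, now=None):
    """Retry transient errors on a live, recoverable connection before max_time."""
    if not is_retryable_error(error):
        return False  # conn is never inspected for non-transient errors
    if now is None:
        now = time.time()
    if now > max_time:
        return False
    return conn.connected and not conn.unrecoverable


def retry_delay(session_timeout_ms, max_delay_s, fraction_pct=30):
    """Seconds to wait: a fraction of the session timeout, capped at max_delay_s.

    The session timeout is in milliseconds, so the /1000 conversion is folded
    into the 100000 denominator together with the percentage.
    """
    return min(session_timeout_ms * fraction_pct / 100000.0, max_delay_s)
```

Checking the error class first means the connection is only inspected for transient errors, which matches `test_check_retryable` passing a bare `_Conn()` for the unrecoverable cases.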
3001=== modified file 'txzookeeper/tests/test_security.py'
3002--- txzookeeper/tests/test_security.py 2011-06-30 17:14:06 +0000
3003+++ txzookeeper/tests/test_security.py 2013-05-15 13:35:05 +0000
3004@@ -1,8 +1,11 @@
3005 #
3006-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
3007+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
3008 #
3009 # This file is part of txzookeeper.
3010 #
3011+# Authors:
3012+# Kapil Thangavelu
3013+#
3014 # txzookeeper is free software: you can redistribute it and/or modify
3015 # it under the terms of the GNU Lesser General Public License as published by
3016 # the Free Software Foundation, either version 3 of the License, or
3017
3018=== modified file 'txzookeeper/tests/test_session.py'
3019--- txzookeeper/tests/test_session.py 2011-06-30 20:16:17 +0000
3020+++ txzookeeper/tests/test_session.py 2013-05-15 13:35:05 +0000
3021@@ -1,21 +1,46 @@
3022+#
3023+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
3024+#
3025+# This file is part of txzookeeper.
3026+#
3027+# Authors:
3028+# Kapil Thangavelu
3029+#
3030+# txzookeeper is free software: you can redistribute it and/or modify
3031+# it under the terms of the GNU Lesser General Public License as published by
3032+# the Free Software Foundation, either version 3 of the License, or
3033+# (at your option) any later version.
3034+#
3035+# txzookeeper is distributed in the hope that it will be useful,
3036+# but WITHOUT ANY WARRANTY; without even the implied warranty of
3037+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
3038+# GNU Lesser General Public License for more details.
3039+#
3040+# You should have received a copy of the GNU Lesser General Public License
3041+# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
3042+#
3043
3044 import atexit
3045+import logging
3046 import os
3047
3048 import zookeeper
3049
3050-from twisted.internet import reactor
3051-from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
3052+from twisted.internet.defer import (
3053+ inlineCallbacks, Deferred, DeferredList, returnValue)
3054
3055 from txzookeeper import ZookeeperClient
3056 from txzookeeper.client import NotConnectedException, ConnectionException
3057
3058 from txzookeeper.tests.common import ZookeeperCluster
3059 from txzookeeper.tests import ZookeeperTestCase
3060+from txzookeeper import managed
3061
3062
3063 ZK_HOME = os.environ.get("ZOOKEEPER_PATH")
3064-assert ZK_HOME, "ZOOKEEPER_PATH environment variable must be defined"
3065+assert ZK_HOME, (
3066+ "ZOOKEEPER_PATH environment variable must be defined.\n "
3067+ "For deb package installations this is /usr/share/java")
3068
3069 CLUSTER = ZookeeperCluster(ZK_HOME)
3070 atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
3071@@ -31,18 +56,16 @@
3072 zookeeper.deterministic_conn_order(True)
3073 zookeeper.set_debug_level(0)
3074
3075- def sleep(self, delay):
3076- """Non-blocking sleep."""
3077- deferred = Deferred()
3078- reactor.callLater(delay, deferred.callback, None)
3079- return deferred
3080-
3081 @property
3082 def cluster(self):
3083 return CLUSTER
3084
3085 def tearDown(self):
3086 super(ClientSessionTests, self).tearDown()
3087+ if self.client:
3088+ self.client.close()
3089+ if self.client2:
3090+ self.client2.close()
3091 self.cluster.reset()
3092
3093 @inlineCallbacks
3094@@ -118,11 +141,18 @@
3095 """A callback can be specified for connection errors.
3096
3097 We can specify a callback for connection errors, that
3098- can perform recovery for a disconnected client, restablishing
3099+ can perform recovery for a disconnected client.
3100 """
3101 @inlineCallbacks
3102 def connection_error_handler(connection, error):
3103+ # Moved management of this connection attribute out of the
3104+ # default behavior for a connection exception, to support
3105+ # the retry facade. Under the hood libzk is going to be
3106+ # trying to transparently reconnect
3107+ connection.connected = False
3108+
3109 # On loss of the connection, reconnect the client w/ same session.
3110+ connection.close()
3111 yield connection.connect(
3112 self.cluster[1].address, client_id=connection.client_id)
3113 returnValue(23)
3114@@ -157,7 +187,7 @@
3115
3116 def session_event_callback(connection, e):
3117 session_events.append(e)
3118- if len(session_events) == 4:
3119+ if len(session_events) == 8:
3120 events_received.callback(True)
3121
3122 # Connect to a node in the cluster and establish a watch
3123@@ -165,11 +195,18 @@
3124 self.client.set_session_callback(session_event_callback)
3125 yield self.client.connect()
3126
3127- child_d, watch_d = self.client.get_children_and_watch("/")
3128- yield child_d
3129+ # Setup some watches to verify they are cleaned out on expiration.
3130+ d, e_watch_d = self.client.exists_and_watch("/")
3131+ yield d
3132+
3133+ d, g_watch_d = self.client.get_and_watch("/")
3134+ yield d
3135+
3136+ d, c_watch_d = self.client.get_children_and_watch("/")
3137+ yield d
3138
3139 # Connect a client to the same session on a different node.
3140- self.client2 = ZookeeperClient(self.cluster[0].address)
3141+ self.client2 = ZookeeperClient(self.cluster[1].address)
3142 yield self.client2.connect(client_id=self.client.client_id)
3143
3144 # Close the new client and wait for the event propogation
3145@@ -177,8 +214,11 @@
3146
3147 # It can take some time for this to propagate
3148 yield events_received
3149- self.assertEqual(len(session_events), 4)
3150- self.assertEqual(session_events[-1].state_name, "expired")
3151+ self.assertEqual(len(session_events), 8)
3152+
3153+ # The last four (conn + 3 watches) are all expired
3154+ for evt in session_events[4:]:
3155+ self.assertEqual(evt.state_name, "expired")
3156
3157 # The connection is dead without reconnecting.
3158 yield self.assertFailure(
3159@@ -186,9 +226,17 @@
3160 NotConnectedException, ConnectionException)
3161
3162 self.assertTrue(self.client.unrecoverable)
3163+ yield self.assertFailure(e_watch_d, zookeeper.SessionExpiredException)
3164+ yield self.assertFailure(g_watch_d, zookeeper.SessionExpiredException)
3165+ yield self.assertFailure(c_watch_d, zookeeper.SessionExpiredException)
3166
3167 # If a reconnect attempt is made with a dead session id
3168- #yield self.client.connect(client_id=self.client.client_id)
3169+ client_id = self.client.client_id
3170+ self.client.close() # Free the handle
3171+ yield self.client.connect(client_id=client_id)
3172+ yield self.assertFailure(
3173+ self.client.get_children("/"),
3174+ NotConnectedException, ConnectionException)
3175
3176 test_client_session_expiration_event.timeout = 10
3177
3178@@ -210,13 +258,13 @@
3179 session_events.append(e)
3180
3181 # Connect to a node in the cluster and establish a watch
3182- self.client = ZookeeperClient(self.cluster[2].address)
3183+ self.client = ZookeeperClient(self.cluster[2].address,
3184+ session_timeout=5000)
3185 self.client.set_session_callback(session_event_callback)
3186 yield self.client.connect()
3187
3188 yield self.client.create("/hello", flags=zookeeper.EPHEMERAL)
3189- exists_d, watch_d = self.client.exists_and_watch("/hello")
3190- yield exists_d
3191+ self.assertTrue((yield self.client.exists("/hello")))
3192
3193 # Shutdown the server the client is connected to
3194 self.cluster[2].stop()
3195@@ -243,3 +291,46 @@
3196 # Ephemeral is destroyed when the session closed.
3197 exists = yield self.client.exists("/hello")
3198 self.assertFalse(exists)
3199+
3200+ @inlineCallbacks
3201+ def test_managed_client_backoff(self):
3202+ output = self.capture_log(level=logging.DEBUG)
3203+ self.patch(managed, 'BACKOFF_INCREMENT', 2)
3204+ self.client = yield managed.ManagedClient(
3205+ self.cluster[0].address,
3206+ connect_timeout=4).connect()
3207+ self.client2 = yield ZookeeperClient(self.cluster[1].address).connect()
3208+
3209+ exists_d, watch_d = self.client.exists_and_watch("/hello")
3210+ yield exists_d
3211+
3212+ yield self.client2.create("/hello", "world")
3213+ yield self.client2.close()
3214+
3215+ self.cluster[0].stop()
3216+ yield self.sleep(1)
3217+
3218+ # Try to do something with the connection while its down.
3219+ ops = []
3220+ ops.append(self.client.create('/abc', 'test'))
3221+ ops.append(self.client.get("/hello"))
3222+ ops.append(self.client.get_children("/"))
3223+ ops.append(self.client.set("/hello", "sad"))
3224+
3225+ # Sleep and let the session expire, and ensure we're down long enough
3226+ # for backoff to trigger.
3227+ yield self.sleep(10)
3228+
3229+ # Start the cluster and watch things work
3230+ self.cluster[0].run()
3231+ yield DeferredList(
3232+ ops, fireOnOneErrback=True, consumeErrors=True)
3233+
3234+ yield watch_d
3235+
3236+ # Verify we backed off at least once
3237+ self.assertIn("Backing off reconnect", output.getvalue())
3238+ # Verify we only reconnected once
3239+ self.assertEqual(output.getvalue().count("Restablished connection"), 1)
3240+
3241+ test_managed_client_backoff.timeout = 25
3242
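`test_managed_client_backoff` patches `managed.BACKOFF_INCREMENT` to 2 and keeps the server down for about ten seconds, long enough for the managed client to log "Backing off reconnect" before it reconnects. `managed.py` is not part of this excerpt, so its exact schedule is not visible here. The sketch below assumes a linear backoff that grows by a fixed increment per failed reconnect attempt and is capped at a maximum. `MAX_BACKOFF`, `backoff_delays`, and `reconnect_with_backoff` are hypothetical, and the meaning given to `BACKOFF_INCREMENT` is an assumption, not the module's documented behavior.

```python
import itertools

# Assumed schedule: linear backoff growing by BACKOFF_INCREMENT seconds per
# failed attempt and capped at MAX_BACKOFF seconds (illustrative values).
BACKOFF_INCREMENT = 2
MAX_BACKOFF = 30


def backoff_delays(increment=BACKOFF_INCREMENT, max_backoff=MAX_BACKOFF):
    """Yield 0, increment, 2 * increment, ... never exceeding max_backoff."""
    for attempt in itertools.count():
        yield min(attempt * increment, max_backoff)


def reconnect_with_backoff(connect, sleep, max_attempts,
                           increment=BACKOFF_INCREMENT,
                           max_backoff=MAX_BACKOFF):
    """Call connect() until it succeeds, waiting longer after each failure.

    `connect` raises ConnectionError on failure; `sleep` is injected so the
    schedule can be exercised without real waiting. Re-raises the last
    ConnectionError if every attempt fails.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    delays = backoff_delays(increment, max_backoff)
    last_error = None
    for _ in range(max_attempts):
        delay = next(delays)
        if delay:
            sleep(delay)  # the first attempt happens immediately
        try:
            return connect()
        except ConnectionError as error:
            last_error = error
    raise last_error
```

Injecting `sleep` keeps the schedule testable without real waiting; in Twisted code the wait would instead be a Deferred-returning sleep such as the `sleep` helper this branch adds to `txzookeeper/utils.py`.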
3243=== modified file 'txzookeeper/tests/test_utils.py'
3244--- txzookeeper/tests/test_utils.py 2011-06-30 17:14:06 +0000
3245+++ txzookeeper/tests/test_utils.py 2013-05-15 13:35:05 +0000
3246@@ -1,8 +1,11 @@
3247 #
3248-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
3249+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
3250 #
3251 # This file is part of txzookeeper.
3252 #
3253+# Authors:
3254+# Kapil Thangavelu
3255+#
3256 # txzookeeper is free software: you can redistribute it and/or modify
3257 # it under the terms of the GNU Lesser General Public License as published by
3258 # the Free Software Foundation, either version 3 of the License, or
3259
3260=== modified file 'txzookeeper/tests/utils.py'
3261--- txzookeeper/tests/utils.py 2011-06-30 17:14:06 +0000
3262+++ txzookeeper/tests/utils.py 2013-05-15 13:35:05 +0000
3263@@ -1,8 +1,11 @@
3264 #
3265-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
3266+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
3267 #
3268 # This file is part of txzookeeper.
3269 #
3270+# Authors:
3271+# Kapil Thangavelu
3272+#
3273 # txzookeeper is free software: you can redistribute it and/or modify
3274 # it under the terms of the GNU Lesser General Public License as published by
3275 # the Free Software Foundation, either version 3 of the License, or
3276
3277=== modified file 'txzookeeper/todo.txt'
3278--- txzookeeper/todo.txt 2010-07-12 07:42:22 +0000
3279+++ txzookeeper/todo.txt 2013-05-15 13:35:05 +0000
3280@@ -1,10 +1,2 @@
3281
3282-bugs to file upstream
3283-
3284- - you can set acl on a non existant node.
3285- - memory leak every api invocation. [really? need some measurements here]
3286-
3287- observed while trying xtest_get_children_with_watcher
3288-
3289- - segfault if close during completion.
3290- - getting a watch notification when closing a connection, segfaults.
3291+Documentation
3292
3293=== modified file 'txzookeeper/utils.py'
3294--- txzookeeper/utils.py 2011-06-17 23:52:34 +0000
3295+++ txzookeeper/utils.py 2013-05-15 13:35:05 +0000
3296@@ -1,8 +1,11 @@
3297 #
3298-# Copyright (C) 2010 Canonical Ltd. All Rights Reserved
3299+# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
3300 #
3301 # This file is part of txzookeeper.
3302 #
3303+# Authors:
3304+# Kapil Thangavelu
3305+#
3306 # txzookeeper is free software: you can redistribute it and/or modify
3307 # it under the terms of the GNU Lesser General Public License as published by
3308 # the Free Software Foundation, either version 3 of the License, or
3309@@ -17,8 +20,9 @@
3310 # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
3311 #
3312
3313+
3314 import zookeeper
3315-from twisted.internet.defer import inlineCallbacks
3316+from twisted.internet.defer import inlineCallbacks, Deferred
3317
3318
3319 @inlineCallbacks
3320@@ -65,3 +69,16 @@
3321 zookeeper.NoNodeException,
3322 zookeeper.BadVersionException):
3323 pass
3324+
3325+
3326+def sleep(delay):
3327+ """Non-blocking sleep.
3328+
3329+ :param int delay: time in seconds to sleep.
3330+ :return: a Deferred that fires after the desired delay.
3331+ :rtype: :class:`twisted.internet.defer.Deferred`
3332+ """
3333+ from twisted.internet import reactor
3334+ deferred = Deferred()
3335+ reactor.callLater(delay, deferred.callback, None)
3336+ return deferred
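`retry_change`, exercised by `test_set` in `test_retry.py`, performs a read-modify-write on a node. The visible part of `txzookeeper/utils.py` shows an `except` clause that passes on `NoNodeException` and `BadVersionException`, so the change appears to be attempted again after a conflict. The minimal synchronous sketch below illustrates the version-checked retry at its core, assuming an in-memory node. `BadVersion`, `VersionedStore`, and `retry_change_sketch` are hypothetical. The real helper is Deferred-based and talks to ZooKeeper, and its handling of missing nodes is not reproduced here.

```python
class BadVersion(Exception):
    """Raised when a conditional write sees a stale version."""


class VersionedStore(object):
    """Tiny in-memory stand-in for a ZooKeeper node: content plus version."""

    def __init__(self, content=""):
        self.content = content
        self.version = 0

    def get(self):
        return self.content, self.version

    def set(self, content, version):
        # Version-checked write: only succeeds if nobody changed the node
        # since the caller read `version`.
        if version != self.version:
            raise BadVersion(version)
        self.content = content
        self.version += 1


def retry_change_sketch(store, change_function, max_attempts=10):
    """Read, transform, and conditionally write until no conflict occurs."""
    for _ in range(max_attempts):
        content, version = store.get()
        new_content = change_function(content, version)
        try:
            store.set(new_content, version)
            return new_content
        except BadVersion:
            continue  # someone else wrote first; re-read and try again
    raise BadVersion("gave up after %d attempts" % max_attempts)
```

Because each write is conditional on the version that was read, a concurrent update forces a fresh read and a recomputed change instead of being silently overwritten.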
