Merge lp:~hazmat/txzookeeper/managed-watch-and-ephemeral into lp:txzookeeper

Proposed by Kapil Thangavelu
Status: Merged
Approved by: Jim Baker
Approved revision: 56
Merged at revision: 47
Proposed branch: lp:~hazmat/txzookeeper/managed-watch-and-ephemeral
Merge into: lp:txzookeeper
Diff against target: 1024 lines (+716/-53)
9 files modified
.bzrignore (+4/-0)
setup.py (+2/-2)
txzookeeper/client.py (+63/-40)
txzookeeper/managed.py (+350/-0)
txzookeeper/retry.py (+10/-9)
txzookeeper/tests/__init__.py (+22/-0)
txzookeeper/tests/test_client.py (+1/-1)
txzookeeper/tests/test_managed.py (+263/-0)
txzookeeper/tests/test_retry.py (+1/-1)
To merge this branch: bzr merge lp:~hazmat/txzookeeper/managed-watch-and-ephemeral
Reviewer           Review Type   Date Requested   Status
Juju Engineering                                  Pending
Review via email: mp+100715@code.launchpad.net

Description of the change

Managed Connections for txzk

A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.

https://codereview.appspot.com/5976074/
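
As a rough illustration of the behaviour described above, the following self-contained sketch models the bookkeeping against a stand-in server. FakeServer, TrackingClient, expire_session and the EPHEMERAL value are hypothetical names invented for this example; none of them is part of txzookeeper, and the real client is asynchronous (Twisted deferreds) rather than synchronous. It is written for Python 3.

```python
# Toy model only: FakeServer and TrackingClient are hypothetical stand-ins,
# not txzookeeper classes. The real client is asynchronous.

EPHEMERAL = 1  # stand-in flag value for this sketch


class FakeServer:
    """Holds nodes; ephemeral nodes vanish when the session expires."""

    def __init__(self):
        self.nodes = {}

    def create(self, path, data, flags):
        self.nodes[path] = (data, flags)

    def expire_session(self):
        # Drop every ephemeral node, as a server does on session expiration.
        self.nodes = {p: v for p, v in self.nodes.items()
                      if not v[1] & EPHEMERAL}


class TrackingClient:
    """Remembers ephemerals and watchers so they can be replayed."""

    def __init__(self, server):
        self.server = server
        self.ephemerals = {}
        self.watchers = []

    def create(self, path, data="", flags=0):
        self.server.create(path, data, flags)
        if flags & EPHEMERAL:
            self.ephemerals[path] = (data, flags)

    def watch(self, path, callback):
        self.watchers.append((path, callback))

    def on_session_expired(self):
        # Recreate ephemerals, then fire (and clear) the extant watchers.
        for path, (data, flags) in list(self.ephemerals.items()):
            self.server.create(path, data, flags)
        watchers, self.watchers = self.watchers, []
        for path, callback in watchers:
            callback("session", path)


server = FakeServer()
client = TrackingClient(server)
client.create("/workers/a", "alive", flags=EPHEMERAL)
client.create("/config", "persistent")
fired = []
client.watch("/config", lambda kind, path: fired.append((kind, path)))

server.expire_session()       # the ephemeral node disappears
client.on_session_expired()   # the client restores it and fires watches
```

The point of the sketch is only the ordering: ephemerals are recreated first, and watchers are notified afterwards so that callbacks observe the restored nodes.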

Kapil Thangavelu (hazmat) wrote :

Reviewers: mp+100715_code.launchpad.net,

Message:
Please take a look.

Description:
Managed Connections for txzk

A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.

https://code.launchpad.net/~hazmat/txzookeeper/managed-watch-and-ephemeral/+merge/100715

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/5970080/

Affected files:
   A .bzrignore
   A [revision details]
   M setup.py
   M txzookeeper/client.py
   A txzookeeper/managed.py
   M txzookeeper/retry.py
   M txzookeeper/tests/__init__.py
   M txzookeeper/tests/test_client.py
   A txzookeeper/tests/test_managed.py
   M txzookeeper/tests/test_retry.py

Kapil Thangavelu (hazmat) wrote :

Reviewers: mp+100715_code.launchpad.net,

Message:
Please take a look.

Description:
Managed Connections for txzk

A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.

https://code.launchpad.net/~hazmat/txzookeeper/managed-watch-and-ephemeral/+merge/100715

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/5976074/

Affected files:
   A .bzrignore
   A [revision details]
   M setup.py
   M txzookeeper/client.py
   A txzookeeper/managed.py
   M txzookeeper/retry.py
   M txzookeeper/tests/__init__.py
   M txzookeeper/tests/test_client.py
   A txzookeeper/tests/test_managed.py
   M txzookeeper/tests/test_retry.py

55. By Kapil Thangavelu

remove unesc. expire handler from retry client

William Reade (fwereade) wrote :

This looks very nice indeed, but the vagueness of the "comes at a cost"
comment makes me wonder what I'm missing...

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py
File txzookeeper/managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode66
txzookeeper/managed.py:66: mgr # keep the flakes happy
Can't you just do:

   with self._ctx():

?
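
A minimal sketch of the suggestion, assuming a context manager whose yielded value is not needed by the body: dropping the "as mgr" binding also removes the need for the dummy reference that kept pyflakes quiet. Registry and OneShot are hypothetical names for this example, not classes from the branch, and the code is written for Python 3.

```python
import contextlib


class Registry:
    """Hypothetical stand-in for the watch manager in this sketch."""

    def __init__(self):
        self.items = []

    def remove(self, item):
        if item in self.items:
            self.items.remove(item)


class OneShot:
    """Callable that unregisters itself after being invoked once."""

    def __init__(self, registry, callback):
        self._registry = registry
        self._callback = callback
        registry.items.append(self)

    @contextlib.contextmanager
    def _ctx(self):
        try:
            yield self._registry
        finally:
            # Runs whether or not the body raised.
            self._registry.remove(self)

    def __call__(self, *args):
        # No "as mgr" needed: the yielded value is simply unused.
        with self._ctx():
            return self._callback(*args)


registry = Registry()
shot = OneShot(registry, lambda x: x * 2)
result = shot(21)
```

Returning from inside the with block still runs the finally clause, so the object is removed from the registry before the caller sees the result.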

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode70
txzookeeper/managed.py:70: mgr # keep the flakes happy
Ditto

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode122
txzookeeper/managed.py:122: come at a cost though.
Please expand ;).

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode138
txzookeeper/managed.py:138: """Called on intercept of session expiration
to restablish the session.
I'm pretty sure there are 2 es in re-establish, and that the hyphen may
have fallen out of common use but remains IMO more readable. As you
wish.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode140
txzookeeper/managed.py:140: This will reconnect to zk, restabslish
ephemerals, and trigger watches.
I like the word "restabslish", but ditto.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/tests/test_managed.py
File txzookeeper/tests/test_managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/tests/test_managed.py#newcode169
txzookeeper/tests/test_managed.py:169: # It takes some time to propogate
(1/3 session time as ping)
s/propogate/propagate/

https://codereview.appspot.com/5976074/

Kapil Thangavelu (hazmat) wrote :

On 2012/04/04 13:23:29, fwereade wrote:
> This looks very nice indeed, but the vagueness of the "comes at a cost" comment
> makes me wonder what I'm missing...

expanded greatly what those costs are.
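
One of the costs named in the expanded docstring is that watch callbacks which inspect the event must tolerate the synthetic session event delivered when the session is re-established. A hedged sketch of such a callback follows; the constants are string stand-ins chosen for this example (real code would compare against the values exposed by the zookeeper bindings), and make_watcher is a hypothetical helper, not part of txzookeeper. Python 3.

```python
# Stand-in constants for this sketch; real code would use the values
# exposed by the zookeeper bindings (e.g. zookeeper.SESSION_EVENT).
SESSION_EVENT = "session"
CHANGED_EVENT = "changed"
CONNECTED_STATE = "connected"


def make_watcher(on_change, on_session_restored):
    """Build a watch callback that handles both real and synthetic events."""

    def watcher(event_type, conn_state, path):
        if event_type == SESSION_EVENT:
            # Synthetic event: the node may or may not have changed while
            # the session was down, so re-read state and re-arm the watch.
            return on_session_restored(path)
        return on_change(event_type, path)

    return watcher


log = []
watcher = make_watcher(
    on_change=lambda event_type, path: log.append(("change", path)),
    on_session_restored=lambda path: log.append(("resync", path)),
)

watcher(CHANGED_EVENT, CONNECTED_STATE, "/services/db")
watcher(SESSION_EVENT, CONNECTED_STATE, "/services/db")
```

Treating the session event as "state unknown, resynchronise" keeps the callback correct whether or not the node actually changed during the outage.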

> https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py
> File txzookeeper/managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode66
> txzookeeper/managed.py:66: mgr # keep the flakes happy
> Can't you just do:

> with self._ctx():

> ?

much nicer, thanks.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode70
> txzookeeper/managed.py:70: mgr # keep the flakes happy
> Ditto

done.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode122
> txzookeeper/managed.py:122: come at a cost though.
> Please expand ;).

done.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode138
> txzookeeper/managed.py:138: """Called on intercept of session expiration to
> restablish the session.
> I'm pretty sure there are 2 es in re-establish, and that the hyphen may have
> fallen out of common use but remains IMO more readable. As you wish.

proper speling established ;-)

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode140
> txzookeeper/managed.py:140: This will reconnect to zk, restabslish ephemerals,
> and trigger watches.
> I like the word "restabslish", but ditto.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/tests/test_managed.py
> File txzookeeper/tests/test_managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/tests/test_managed.py#newcode169
> txzookeeper/tests/test_managed.py:169: # It takes some time to propogate (1/3
> session time as ping)
> s/propogate/propagate/

done.

http://codereview.appspot.com/5976074/

Kapil Thangavelu (hazmat) wrote :

Please take a look.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py
File txzookeeper/managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode66
txzookeeper/managed.py:66: mgr # keep the flakes happy
On 2012/04/04 13:23:29, fwereade wrote:
> Can't you just do:

> with self._ctx():

> ?

thanks, much nicer.

https://codereview.appspot.com/5976074/

56. By Kapil Thangavelu

address review comments

Jim Baker (jimbaker) wrote :

+1, LGTM, just some minors

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py
File txzookeeper/managed.py (right):

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode153
txzookeeper/managed.py:153: def cb_restablish_session(self, e=None):
Presumably this should be cb_re_establish... but I don't see the
difference here with the easier synonym: restore. Good either way with
me.

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode162
txzookeeper/managed.py:162: # If its been explicitly closed, don't
restablish.
it's... re-establish

(or are you saying something about being restful? ;) )

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode166
txzookeeper/managed.py:166: # If its a stale handle, don't restablish
it's... re-establish

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode174
txzookeeper/managed.py:174: # Its already been restablished, don't
restablish.
It's... re-established... re-establish

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode183
txzookeeper/managed.py:183: # Restablish
I'm not going to mark any more of these!

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode189
txzookeeper/managed.py:189: log.error("error while restablish %r %s" %
(e, e))
Another minor: probably want some consistency in beginning with
uppercase. Not certain why you use log.error here, but log.exception
later for what appears to be a similar case.
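
For reference, the practical difference between the two calls in the standard library logging module is that log.exception logs at ERROR level and also appends the current traceback, while log.error records only the message. The sketch below demonstrates this with an in-memory stream; the logger name and messages are made up for the example. Python 3.

```python
import io
import logging

stream = io.StringIO()
log = logging.getLogger("example.managed")  # hypothetical logger name
log.setLevel(logging.ERROR)
log.propagate = False
log.addHandler(logging.StreamHandler(stream))

try:
    raise ValueError("boom")
except ValueError as e:
    # Message only, no traceback.
    log.error("Error while re-establishing %r", e)
    error_output = stream.getvalue()

    # Same level, but the active exception's traceback is appended.
    log.exception("Error while connecting %r", e)
    exception_output = stream.getvalue()[len(error_output):]
```

Using log.exception in both handlers would therefore keep the output consistent and preserve the traceback in each case.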

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/tests/test_managed.py
File txzookeeper/tests/test_managed.py (right):

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/tests/test_managed.py#newcode110
txzookeeper/tests/test_managed.py:110: """Reset fires a synthentic
client event, and clears watches.
no need for comma

https://codereview.appspot.com/5976074/

Preview Diff

=== added file '.bzrignore'
--- .bzrignore 1970-01-01 00:00:00 +0000
+++ .bzrignore 2012-04-04 18:12:22 +0000
@@ -0,0 +1,4 @@
+.emacs.desktop
+_trial_temp
+txzookeeper
+txzookeeper.egg-info
=== modified file 'setup.py'
--- setup.py 2011-12-12 13:19:28 +0000
+++ setup.py 2012-04-04 18:12:22 +0000
@@ -45,8 +45,8 @@
     name="txzookeeper",
     version=version,
     description="Twisted api for Apache Zookeeper",
-    author="Ensemble Developers",
-    author_email="ensemble@lists.ubuntu.com",
+    author="Juju Developers",
+    author_email="juju@lists.ubuntu.com",
     url="https://launchpad.net/txzookeeper",
     license="LGPL",
     packages=find_packages(),
=== modified file 'txzookeeper/client.py'
--- txzookeeper/client.py 2011-12-06 21:21:29 +0000
+++ txzookeeper/client.py 2012-04-04 18:12:22 +0000
@@ -96,15 +96,20 @@
 
     @property
     def state_name(self):
-        return STATE_NAME_MAPPING[self.args[2]]
+        return STATE_NAME_MAPPING.get(self.args[2], "unknown")
 
     @property
     def type_name(self):
-        return TYPE_NAME_MAPPING[self.args[1]]
+        return TYPE_NAME_MAPPING.get(self.args[1], "unknown")
+
+    @property
+    def handle(self):
+        return self.args[3]
 
     def __str__(self):
-        return "<txzookeeper.ConnectionException type: %s state: %s>" % (
-            self.type_name, self.state_name)
+        return (
+            "<txzookeeper.ConnectionException handle: %s type: %s state: %s>"
+            % (self.handle, self.type_name, self.state_name))
 
 
 def is_connection_exception(e):
@@ -133,11 +138,11 @@
 
     @property
     def type_name(self):
-        return TYPE_NAME_MAPPING[self.type]
+        return TYPE_NAME_MAPPING.get(self.type, "unknown")
 
     @property
     def state_name(self):
-        return STATE_NAME_MAPPING[self.connection_state]
+        return STATE_NAME_MAPPING.get(self.connection_state, "unknown")
 
     def __repr__(self):
         return "<ClientEvent %s at %r state: %s>" % (
@@ -169,6 +174,10 @@
         self.connected = False
         self.handle = None
 
+    def __repr__(self):
+        return "<txZookeeperClient client: %s handle: %r state: %s>" % (
+            self.client_id[0], self.handle, self.state)
+
     def _check_connected(self, d):
         if not self.connected:
             d.errback(NotConnectedException("not connected"))
@@ -218,7 +227,7 @@
             d.callback((value, stat))
 
         callback = self._zk_thread_callback(_cb_get)
-        watcher = self._wrap_watcher(watcher)
+        watcher = self._wrap_watcher(watcher, "get", path)
         result = zookeeper.aget(self.handle, path, watcher, callback)
         self._check_result(result, d)
         return d
@@ -234,7 +243,7 @@
             d.callback(children)
 
         callback = self._zk_thread_callback(_cb_get_children)
-        watcher = self._wrap_watcher(watcher)
+        watcher = self._wrap_watcher(watcher, "child", path)
         result = zookeeper.aget_children(self.handle, path, watcher, callback)
         self._check_result(result, d)
         return d
@@ -251,12 +260,12 @@
             d.callback(stat)
 
         callback = self._zk_thread_callback(_cb_exists)
-        watcher = self._wrap_watcher(watcher)
+        watcher = self._wrap_watcher(watcher, "exists", path)
         result = zookeeper.aexists(self.handle, path, watcher, callback)
         self._check_result(result, d)
         return d
 
-    def _wrap_watcher(self, watcher):
+    def _wrap_watcher(self, watcher, watch_type, path):
         if watcher is None:
             return watcher
         if not callable(watcher):
@@ -285,14 +294,19 @@
         else:
             return watcher(event_type, conn_state, path)
 
-    def _zk_thread_callback(self, func):
+    def _zk_thread_callback(self, func, *f_args, **f_kw):
         """
         The client library invokes callbacks in a separate thread, we wrap
         any user defined callback so that they are called back in the main
         thread after, zookeeper calls the wrapper.
         """
+        f_args = list(f_args)
+
         def wrapper(handle, *args): # pragma: no cover
-            reactor.callFromThread(func, *args)
+            # make a copy the conn watch callback gets invoked multiple times
+            cb_args = list(f_args)
+            cb_args.extend(args)
+            reactor.callFromThread(func, *cb_args, **f_kw)
         return wrapper
 
     @property
@@ -391,6 +405,7 @@
         d = defer.Deferred()
         if self._check_result(result, d):
             return d
+        self.handle = None
         d.callback(True)
         return d
 
@@ -437,7 +452,6 @@
         else:
             self.handle = zookeeper.init(
                 self._servers, callback, self._session_timeout)
-
         return d
 
     def _cb_connected(
@@ -460,6 +474,7 @@
         # If we timed out and then connected, then close the conn.
         if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called:
             self.close()
+            return
 
         # Send session events to the callback, in addition to any
         # duplicate session events that will be sent for extant watches.
@@ -468,7 +483,10 @@
                         self, ClientEvent(type, state, path))
 
             return
-        elif state == zookeeper.CONNECTED_STATE:
+        # Connected successfully, or If we're expired on an initial
+        # connect, someone else expired us.
+        elif state in (zookeeper.CONNECTED_STATE,
+                       zookeeper.EXPIRED_SESSION_STATE):
             connect_deferred.callback(self)
             return
 
@@ -488,17 +506,18 @@
         if self._check_connected(d):
             return d
 
-        def _cb_created(result_code, path):
-            if self._check_result(result_code, d):
-                return
-            d.callback(path)
-
-        callback = self._zk_thread_callback(_cb_created)
+        callback = self._zk_thread_callback(
+            self._cb_created, d, data, acls, flags)
         result = zookeeper.acreate(
             self.handle, path, data, acls, flags, callback)
         self._check_result(result, d)
         return d
 
+    def _cb_created(self, d, data, acls, flags, result_code, path):
+        if self._check_result(result_code, d):
+            return
+        d.callback(path)
+
     def delete(self, path, version=-1):
         """
         Delete the node at the given path. If the current node version on the
@@ -513,16 +532,16 @@
         if self._check_connected(d):
             return d
 
-        def _cb_delete(result_code):
-            if self._check_result(result_code, d):
-                return
-            d.callback(result_code)
-
-        callback = self._zk_thread_callback(_cb_delete)
+        callback = self._zk_thread_callback(self._cb_deleted, d, path)
         result = zookeeper.adelete(self.handle, path, version, callback)
         self._check_result(result, d)
         return d
 
+    def _cb_deleted(self, d, path, result_code):
+        if self._check_result(result_code, d):
+            return
+        d.callback(result_code)
+
     def exists(self, path):
         """
         Check that the given node path exists. Returns a deferred that
@@ -657,17 +676,17 @@
         if self._check_connected(d):
             return d
 
-        def _cb_set_acl(result_code):
-            if self._check_result(result_code, d):
-                return
-            d.callback(result_code)
-
-        callback = self._zk_thread_callback(_cb_set_acl)
+        callback = self._zk_thread_callback(self._cb_set_acl, d, path, acls)
         result = zookeeper.aset_acl(
             self.handle, path, version, acls, callback)
         self._check_result(result, d)
         return d
 
+    def _cb_set_acl(self, d, path, acls, result_code):
+        if self._check_result(result_code, d):
+            return
+        d.callback(result_code)
+
     def set(self, path, data="", version=-1):
         """
         Sets the data of a node at the given path. If the current node version
@@ -683,16 +702,16 @@
         if self._check_connected(d):
             return d
 
-        def _cb_set(result_code, node_stat):
-            if self._check_result(result_code, d):
-                return
-            d.callback(node_stat)
-
-        callback = self._zk_thread_callback(_cb_set)
+        callback = self._zk_thread_callback(self._cb_set, d, path, data)
         result = zookeeper.aset(self.handle, path, data, version, callback)
         self._check_result(result, d)
         return d
 
+    def _cb_set(self, d, path, data, result_code, node_stat):
+        if self._check_result(result_code, d):
+            return
+        d.callback(node_stat)
+
     def set_connection_watcher(self, watcher):
         """
         Sets a permanent global watcher on the connection. This will get
@@ -702,7 +721,7 @@
         """
         if not callable(watcher):
             raise SyntaxError("Invalid Watcher %r" % (watcher))
-        watcher = self._wrap_watcher(watcher)
+        watcher = self._wrap_watcher(watcher, None, None)
         zookeeper.set_watcher(self.handle, watcher)
 
     def set_session_callback(self, callback):
@@ -740,9 +759,13 @@
         """
         if not callable(callback):
             raise TypeError("Invalid callback %r" % callback)
+        if self._connection_error_callback is not None:
+            raise RuntimeError((
+                "Connection error handlers can't be changed %s" %
+                self._connection_error_callback))
         self._connection_error_callback = callback
 
-    def set_determinstic_order(self, boolean):
+    def set_deterministic_order(self, boolean):
         """
         The zookeeper client will by default randomize the server hosts
         it will connect to unless this is set to True.
=== added file 'txzookeeper/managed.py'
--- txzookeeper/managed.py 1970-01-01 00:00:00 +0000
+++ txzookeeper/managed.py 2012-04-04 18:12:22 +0000
@@ -0,0 +1,350 @@
1from functools import partial
2
3import contextlib
4import logging
5import zookeeper
6
7from twisted.internet.defer import (
8 inlineCallbacks, DeferredLock, fail, returnValue)
9
10from client import ZookeeperClient, ClientEvent, NotConnectedException
11from retry import RetryClient
12
13
14class StopWatcher(Exception):
15 pass
16
17
18WATCH_KIND_MAP = {
19 "child": "get_children_and_watch",
20 "exists": "exists_and_watch",
21 "get": "get_and_watch"
22 }
23
24
25log = logging.getLogger("txzk.managed")
26
27
28class Watch(object):
29 """
30 For application driven persistent watches, where the application
31 is manually resetting the watch.
32 """
33
34 __slots__ = ("_mgr", "_client", "_path", "_kind", "_callback")
35
36 def __init__(self, mgr, path, kind, callback):
37 self._mgr = mgr
38 self._path = path
39 self._kind = kind
40 self._callback = callback
41
42 @property
43 def path(self):
44 return self._path
45
46 @property
47 def kind(self):
48 return self._kind
49
50 @contextlib.contextmanager
51 def _ctx(self):
52 mgr = self._mgr
53 del self._mgr
54 try:
55 yield mgr
56 finally:
57 mgr.remove(self)
58
59 @inlineCallbacks
60 def reset(self):
61 with self._ctx():
62 yield self._callback(
63 zookeeper.SESSION_EVENT,
64 zookeeper.CONNECTED_STATE,
65 self._path)
66
67 def __call__(self, *args, **kw):
68 with self._ctx():
69 return self._callback(*args, **kw)
70
71 def __str__(self):
72 return "<Watcher %s %s %r>" % (self.kind, self.path, self._callback)
73
74
75class WatchManager(object):
76
77 watch_class = Watch
78
79 def __init__(self):
80 self._watches = []
81
82 def add(self, path, watch_type, watcher):
83 w = self.watch_class(self, path, watch_type, watcher)
84 self._watches.append(w)
85 return w
86
87 def remove(self, w):
88 try:
89 self._watches.remove(w)
90 except ValueError:
91 pass
92
93 def iterkeys(self):
94 for w in self._watches:
95 yield (w.path, w.kind)
96
97 def clear(self):
98 del self._watches
99 self._watches = []
100
101 @inlineCallbacks
102 def reset(self, *ignored):
103 watches = self._watches
104 self._watches = []
105
106 for w in watches:
107 try:
108 yield w.reset()
109 except Exception, e:
110 log.error("Error reseting watch %s with session event. %s %r",
111 w, e, e)
112 continue
113
114
115class SessionClient(ZookeeperClient):
116 """A managed client that automatically re-establishes ephemerals and
117 triggers watches after reconnecting post session expiration.
118
119 This abstracts the client from session expiration handling. It does
120 come at a cost though.
121
122 There are two application constraints that need to be considered for usage
123 of the SessionClient or ManagedClient. The first is that watch callbacks
124 which examine the event, must be able to handle the synthetic session
125 event which is sent to them when the session is re-established.
126
127 The second and more problematic is that algorithms/patterns
128 utilizing ephemeral sequence nodes need to be rethought, as the
129 session client will recreate the nodes when reconnecting at their
130 previous paths. Some algorithms (like the std zk lock recipe) rely
131 on properties like the smallest valued ephemeral sequence node in
132 a container to identify the lock holder, with the notion that upon
133 session expiration a new lock/leader will be sought. Sequence
134 ephemeral node recreation in this context is problematic as the
135 node is recreated at the exact previous path. Alternative lock
136 strategies that do work are fairly simple at low volume, such as
137 owning a particular node path (ie. /locks/lock-holder) with an
138 ephemeral.
139 """
140
141 def __init__(
142 self, servers=None, session_timeout=None, connect_timeout=4000):
143 """
144 """
145 super(SessionClient, self).__init__(servers, session_timeout)
146 self._connect_timeout = connect_timeout
147 self._watches = WatchManager()
148 self._ephemerals = {}
149 self._reconnect_lock = DeferredLock()
150 self.set_connection_error_callback(self._cb_connection_error)
151
152 @inlineCallbacks
153 def cb_restablish_session(self, e=None):
154 """Called on intercept of session expiration to create new session.
155
156 This will reconnect to zk, re-establish ephemerals, and
157 trigger watches.
158 """
159 yield self._reconnect_lock.acquire()
160
161 try:
162 # If its been explicitly closed, don't restablish.
163 if self.handle is None:
164 return
165
166 # If its a stale handle, don't restablish
167 try:
168 zookeeper.is_unrecoverable(self.handle)
169 except zookeeper.ZooKeeperException:
170 if e:
171 raise e
172 return
173
174 # Its already been restablished, don't restablish.
175 if not self.unrecoverable:
176 return
177 elif self.connected:
178 self.close()
179 self.handle = 0
180 elif isinstance(self.handle, int):
181 self.handle = 0
182
183 # Restablish
184 assert self.handle == 0
185 yield self._cb_restablish_session().addErrback(
186 self._cb_restablish_errback, e)
187
188 except Exception, e:
189 log.error("error while restablish %r %s" % (e, e))
190 finally:
191 yield self._reconnect_lock.release()
192
193 @inlineCallbacks
194 def _cb_restablish_session(self):
195 """Re-establish a new session, and recreate ephemerals and watches.
196 """
197 while 1:
198 # Reconnect
199 if self.handle is None:
200 returnValue(self.handle)
201 try:
202 yield self.connect(timeout=self._connect_timeout)
203 except zookeeper.ZooKeeperException, e:
204 log.exception("Error while connecting %r %s" % (e, e))
205 continue
206 else:
207 break
208
209 items = self._ephemerals.items()
210 self._ephemerals = {}
211
212 for path, e in items:
213 try:
214 yield self.create(
215 path, e['data'], acls=e['acls'], flags=e['flags'])
216 except zookeeper.NodeExistsException:
217 log.error("Attempt to create ephemeral node failed %r", path)
218 yield self._watches.reset()
219
220 def _cb_restablish_errback(self, err, failure):
221 """If there's an error re-establishing the session log it.
222 """
223 log.error("Error while trying to restablish connection %s\n%s" % (
224 failure.value, failure.getTraceback()))
225 return failure
226
227 @inlineCallbacks
228 def _cb_connection_error(self, client, error):
229 """Convert session expiration to a transient connection error.
230
231 Dispatches from api usage error.
232 """
233 if not isinstance(error, (
234 zookeeper.SessionExpiredException,
235 NotConnectedException,
236 zookeeper.ClosingException)):
237 raise error
238 yield self._cb_restablish_session()
239 raise zookeeper.ConnectionLossException
240
241 @inlineCallbacks
242 def _cb_connection_event(self, evt):
243 """
244 """
245
246 # client connected tracker
247 def _check_connected(self, d):
248 """Clients are automatically reconnected."""
249 if self.connected:
250 return
251
252 if self.handle is None:
253 d.errback(NotConnectedException("not connected"))
254 return d
255
256 c_d = self.cb_restablish_session()
257
258 def after_connected(client):
259 """Return a transient connection failure.
260
261 The retry client will automatically attempt to retry the operation.
262 """
263 return fail(zookeeper.ConnectionLossException("Retry"))
264
265 c_d.addCallback(after_connected)
266 c_d.chainDeferred(d)
267 return d
268
269 # Dispatch from node watches on session expiration
270 def _watch_session_wrapper(self, watcher, event_type, conn_state, path):
271 """Watch wrapper that diverts session events to a connection callback.
272 """
273 if (event_type == zookeeper.SESSION_EVENT and
274 conn_state == zookeeper.EXPIRED_SESSION_STATE):
275 d = self.cb_restablish_session()
276 d.addErrback(self._cb_restablish_errback)
277 return d
278 if event_type == zookeeper.SESSION_EVENT:
279 if self._session_event_callback:
280 self._session_event_callback(
281 self, ClientEvent(event_type, conn_state, path))
282 else:
283 return watcher(event_type, conn_state, path)
284
285 # Track all watches
286 def _wrap_watcher(self, watcher, watch_type, path):
287 if watcher is None:
288 return watcher
289 if not callable(watcher):
290 raise SyntaxError("invalid watcher")
291
292 # handle conn watcher, separately.
293 if watch_type is None and path is None:
294 return self._zk_thread_callback(
295 self._watch_session_wrapper, watcher)
296
297 return self._zk_thread_callback(
298 partial(
299 self._watch_session_wrapper,
300 self._watches.add(path, watch_type, watcher)))
301
302 # Track ephemerals
303 def _cb_created(self, d, data, acls, flags, result_code, path):
304 if self._check_result(result_code, d):
305 return
306
307 if flags & zookeeper.EPHEMERAL:
308 self._ephemerals[path] = dict(
309 data=data, acls=acls, flags=flags)
310
311 d.callback(path)
312
313 def _cb_deleted(self, d, path, result_code):
314 if self._check_result(result_code, d):
315 return
316
317 self._ephemerals.pop(path, None)
318 d.callback(result_code)
319
320 def _cb_set_acl(self, d, path, acls, result_code):
321 if self._check_result(result_code, d):
322 return
323
324 if path in self._ephemerals:
325 self._ephemerals[path]['acls'] = acls
326
327 d.callback(result_code)
328
329 def _cb_set(self, d, path, data, result_code, node_stat):
330 if self._check_result(result_code, d):
331 return
332
333 if path in self._ephemerals:
334 self._ephemerals[path]['data'] = data
335
336 d.callback(node_stat)
337
338
339class ManagedClient(RetryClient):
340
341 def __init__(self, *args, **kw):
342 client = SessionClient(*args, **kw)
343 super(ManagedClient, self).__init__(
344 client, client.cb_restablish_session)
345
346
347def ManagedClient(servers=None, session_timeout=None, connect_timeout=10000):
348 from retry import RetryClient
349 client = SessionClient(servers, session_timeout, connect_timeout)
350 return RetryClient(client)
=== modified file 'txzookeeper/retry.py'
--- txzookeeper/retry.py 2011-11-08 12:15:16 +0000
+++ txzookeeper/retry.py 2012-04-04 18:12:22 +0000
@@ -64,7 +64,7 @@
 
     :param session_timeout: The timeout for the session, in milliseconds
     :param max_delay: The max delay for a retry, in seconds.
-    :param session_fraction: The fractinoal amount of a timeout to wait
+    :param session_fraction: The fractional amount of a timeout to wait
     """
     retry_delay = session_timeout / (float(session_fraction) * 1000)
     return min(retry_delay, max_delay)
@@ -111,20 +111,20 @@
     must return a single value (either a deferred or result
     value).
     """
-    # For clients which aren't connected (session timeout == None)
-    # we raise the errors to the callers
-    session_timeout = client.session_timeout or 0
-
-    # If we keep retrying past the 1.5 * session timeout without
-    # success just die, the session expiry is fatal.
-    max_time = session_timeout * 1.5 + time.time()
-
     while 1:
         try:
             value = yield func(*args, **kw)
         except Exception, e:
+            # For clients which aren't connected (session timeout == None)
+            # we raise the errors to the callers
+            session_timeout = client.session_timeout or 0
+
+            # If we keep retrying past the 1.5 * session timeout without
+            # success just die, the session expiry is fatal.
+            max_time = session_timeout * 1.5 + time.time()
             if not check_retryable(client, max_time, e):
                 raise
+
             # Give the connection a chance to auto-heal.
             yield sleep(get_delay(session_timeout))
             continue
@@ -172,6 +172,7 @@
         # Give the connection a chance to auto-heal
         d = sleep(get_delay(session_timeout))
         d.addCallback(retry_inner)
+
         return d
 
     def retry_inner(value):
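Review note: the retry delay in the first `retry.py` hunk divides a session timeout given in milliseconds by `session_fraction * 1000`, which yields seconds, and then caps the result at `max_delay`. A small worked sketch of that arithmetic follows. The body is copied from the hunk, but the signature is not visible in this diff, so the default values `max_delay=5` and `session_fraction=30.0` are assumptions made for illustration.

```python
def get_delay(session_timeout, max_delay=5, session_fraction=30.0):
    """Return the retry delay in seconds.

    session_timeout is in milliseconds. The defaults are assumed
    for this sketch and are not taken from the diff.
    """
    retry_delay = session_timeout / (float(session_fraction) * 1000)
    return min(retry_delay, max_delay)


# A 3000 ms session waits 3000 / 30000 seconds between retries.
short_session = get_delay(3000)

# A very long session is capped at max_delay.
long_session = get_delay(600000)

# An unconnected client (session timeout treated as 0) does not wait.
no_session = get_delay(0)
```

With these assumed defaults, the 3000 ms session used by `SessionClientExpireTests` would retry roughly every tenth of a second.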
=== modified file 'txzookeeper/tests/__init__.py'
--- txzookeeper/tests/__init__.py 2011-10-22 04:24:02 +0000
+++ txzookeeper/tests/__init__.py 2012-04-04 18:12:22 +0000
@@ -20,6 +20,8 @@
 # along with txzookeeper.  If not, see <http://www.gnu.org/licenses/>.
 #
 
+import logging
+import StringIO
 import sys
 
 import zookeeper
@@ -72,6 +74,26 @@
             return None
         return original_value
 
+    def capture_log(
+        self, name="", level=logging.INFO, log_file=None, formatter=None):
+        """Capture log channel to StringIO"""
+        if log_file is None:
+            log_file = StringIO.StringIO()
+        log_handler = logging.StreamHandler(log_file)
+        if formatter:
+            log_handler.setFormatter(formatter)
+        logger = logging.getLogger(name)
+        logger.addHandler(log_handler)
+        old_logger_level = logger.level
+        logger.setLevel(level)
+
+        @self.addCleanup
+        def reset_logging():
+            logger.removeHandler(log_handler)
+            logger.setLevel(old_logger_level)
+
+        return log_file
+
 
 def egg_test_runner():
     """
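Review note: `capture_log` attaches a `StreamHandler` backed by an in-memory buffer to a named logger and registers a cleanup that detaches it again. The sketch below shows the same pattern on a plain `unittest.TestCase` so it can run without the txzookeeper test base class. It uses `io.StringIO` rather than the Python 2 `StringIO` module, and the class name `LogCaptureExample` and the logger name `txzk.example` are hypothetical.

```python
import io
import logging
import unittest


class LogCaptureExample(unittest.TestCase):
    """Standalone illustration of the capture-and-restore pattern."""

    def capture_log(self, name="", level=logging.INFO):
        log_file = io.StringIO()
        handler = logging.StreamHandler(log_file)
        logger = logging.getLogger(name)
        old_level = logger.level
        logger.addHandler(handler)
        logger.setLevel(level)

        def reset_logging():
            # Undo both changes so later tests see the original logger state.
            logger.removeHandler(handler)
            logger.setLevel(old_level)

        self.addCleanup(reset_logging)
        return log_file

    def test_capture(self):
        output = self.capture_log("txzk.example")
        logging.getLogger("txzk.example").info("watch reset for %s", "/foobar")
        self.assertIn("/foobar", output.getvalue())
```

Registering the cleanup inside the helper keeps each test to a single call, which is how `test_reset_with_error` uses it.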
=== modified file 'txzookeeper/tests/test_client.py'
--- txzookeeper/tests/test_client.py 2011-10-22 04:24:02 +0000
+++ txzookeeper/tests/test_client.py 2012-04-04 18:12:22 +0000
@@ -1050,7 +1050,7 @@
         d = self.client.connect()
 
         def verify_session_timeout(client):
-            self.assertEqual(client.session_timeout, 4000)
+            self.assertIn(client.session_timeout, (4000, 10000))
 
         d.addCallback(verify_session_timeout)
         return d
=== added file 'txzookeeper/tests/test_managed.py'
--- txzookeeper/tests/test_managed.py 1970-01-01 00:00:00 +0000
+++ txzookeeper/tests/test_managed.py 2012-04-04 18:12:22 +0000
@@ -0,0 +1,263 @@
1#
2# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved
3#
4# This file is part of txzookeeper.
5#
6# Authors:
7# Kapil Thangavelu
8#
9# txzookeeper is free software: you can redistribute it and/or modify
10# it under the terms of the GNU Lesser General Public License as published by
11# the Free Software Foundation, either version 3 of the License, or
12# (at your option) any later version.
13#
14# txzookeeper is distributed in the hope that it will be useful,
15# but WITHOUT ANY WARRANTY; without even the implied warranty of
16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17# GNU Lesser General Public License for more details.
18#
19# You should have received a copy of the GNU Lesser General Public License
20# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
21#
22
23import zookeeper
24
25from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList
26
27from txzookeeper.client import ZookeeperClient, ClientEvent
28from txzookeeper import managed
29from txzookeeper.tests import ZookeeperTestCase, utils
30from txzookeeper.tests import test_client
31
32
33class WatchTest(ZookeeperTestCase):
34 """Watch manager and watch tests"""
35
36 def setUp(self):
37 self.watches = managed.WatchManager()
38
39 def tearDown(self):
40 self.watches.clear()
41 del self.watches
42
43 def test_add_remove(self):
44
45 w = self.watches.add("/foobar", "child", lambda x: 1)
46 self.assertIn(
47 "<Watcher child /foobar",
48 str(w))
49 self.assertIn(w, self.watches._watches)
50 self.watches.remove(w)
51 self.assertNotIn(w, self.watches._watches)
52 self.watches.remove(w)
53
54 @inlineCallbacks
55 def test_watch_fire_removes(self):
56 """Firing the watch removes it from the manager.
57 """
58 w = self.watches.add("/foobar", "child", lambda x: 1)
59 yield w("a")
60 self.assertNotIn(w, self.watches._watches)
61
62 @inlineCallbacks
63 def test_watch_fire_with_error_removes(self):
64 """Firing the watch removes it even when the callback raises.
65 """
66 d = Deferred()
67
68 @inlineCallbacks
69 def cb_error(e):
70 yield d
71 raise ValueError("a")
72
73 w = self.watches.add("/foobar", "child", cb_error)
74 try:
75 wd = w("a")
76 d.callback(True)
77 yield wd
78 except ValueError:
79 pass
80 self.assertNotIn(w, self.watches._watches)
81
82 @inlineCallbacks
83 def test_reset_with_error(self):
84 """A callback firing an error on reset is ignored.
85 """
86 output = self.capture_log("txzk.managed")
87 d = Deferred()
88 results = []
89
90 @inlineCallbacks
91 def callback(*args, **kw):
92 results.append((args, kw))
93 yield d
94 raise ValueError("a")
95
96 w = self.watches.add("/foobar", "child", callback)
97 reset_done = self.watches.reset()
98
99 e, _ = results.pop()
100 self.assertEqual(
101 str(ClientEvent(*e)),
102 "<ClientEvent session at '/foobar' state: connected>")
103 d.callback(True)
104 yield reset_done
105 self.assertNotIn(w, self.watches._watches)
106 self.assertIn("Error reseting watch", output.getvalue())
107
108 @inlineCallbacks
109 def test_reset(self):
110 """Reset fires a synthetic client event, and clears watches.
111 """
112 d = Deferred()
113 results = []
114
115 def callback(*args, **kw):
116 results.append((args, kw))
117 return d
118
119 w = self.watches.add("/foobar", "child", callback)
120 reset_done = self.watches.reset()
121
122 e, _ = results.pop()
123 self.assertEqual(
124 str(ClientEvent(*e)),
125 "<ClientEvent session at '/foobar' state: connected>")
126 d.callback(True)
127 yield reset_done
128 self.assertNotIn(w, self.watches._watches)
129
130
131class SessionClientTests(test_client.ClientTests):
132 """Run through basic operations with SessionClient."""
133 timeout = 5
134
135 def setUp(self):
136 super(SessionClientTests, self).setUp()
137 self.client = managed.SessionClient("127.0.0.1:2181")
138
139 def test_client_use_while_disconnected_returns_failure(self):
140 # managed session client reconnects here.
141 return True
142
143
144class SessionClientExpireTests(ZookeeperTestCase):
145 """Verify expiration behavior."""
146
147 def setUp(self):
148 super(SessionClientExpireTests, self).setUp()
149 self.client = managed.ManagedClient("127.0.0.1:2181", 3000)
150 self.client2 = None
151 return self.client.connect()
152
153 @inlineCallbacks
154 def tearDown(self):
155 self.client.close()
156
157 self.client2 = ZookeeperClient("127.0.0.1:2181")
158 yield self.client2.connect()
159 utils.deleteTree(handle=self.client2.handle)
160 yield self.client2.close()
161 super(SessionClientExpireTests, self).tearDown()
162
163 @inlineCallbacks
164 def expire_session(self):
165 assert self.client.connected
166 self.client2 = ZookeeperClient(self.client.servers)
167 yield self.client2.connect(client_id=self.client.client_id)
168 yield self.client2.close()
169 # It takes some time to propagate (1/3 session time as ping)
170 yield self.sleep(2)
171
172 @inlineCallbacks
173 def test_session_expiration_conn(self):
174 session_id = self.client.client_id[0]
175 yield self.client.create("/fo-1", "abc")
176 yield self.expire_session()
177 yield self.client.exists("/")
178 self.assertNotEqual(session_id, self.client.client_id[0])
179
180 @inlineCallbacks
181 def test_session_expiration_conn_watch(self):
182 session_id = self.client.client_id[0]
183 yield self.client.create("/fo-1", "abc")
184 yield self.expire_session()
185 yield self.client.exists("/")
186 self.assertNotEqual(session_id, self.client.client_id[0])
187
188 @inlineCallbacks
189 def test_invoked_watch_gc(self):
190 c_d, w_d = yield self.client.get_children_and_watch("/")
191 yield c_d
192 yield self.client.create("/foo")
193 yield w_d
194 yield self.expire_session()
195 yield self.client.create("/foo2")
196 # Nothing should blow up
197 yield self.sleep(0.2)
198
199 @inlineCallbacks
200 def test_ephemeral_and_watch_recreate(self):
201 # Create some ephemeral nodes
202 yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
203 yield self.client.create("/fo-2", "def", flags=zookeeper.EPHEMERAL)
204
205 # Create some watches
206 g_d, g_w_d = self.client.get_and_watch("/fo-1")
207 yield g_d
208
209 c_d, c_w_d = self.client.get_children_and_watch("/")
210 yield c_d
211
212 e_d, e_w_d = self.client.get_children_and_watch("/fo-2")
213 yield e_d
214
215 # Expire the session
216 yield self.expire_session()
217
218 # Poof
219
220 # Ephemerals back
221 c, s = yield self.client.get("/fo-1")
222 self.assertEqual(c, "abc")
223
224 c, s = yield self.client.get("/fo-2")
225 self.assertEqual(c, "def")
226
227 # Watches triggered
228 yield DeferredList(
229 [g_w_d, c_w_d, e_w_d],
230 fireOnOneErrback=True, consumeErrors=True)
231
232 self.assertEqual(
233 [str(d.result) for d in (g_w_d, c_w_d, e_w_d)],
234 ["<ClientEvent session at '/fo-1' state: connected>",
235 "<ClientEvent session at '/' state: connected>",
236 "<ClientEvent session at '/fo-2' state: connected>"])
237
238 @inlineCallbacks
239 def test_ephemeral_content_modification(self):
240 yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
241 yield self.client.set("/fo-1", "def")
242 yield self.expire_session()
243 c, s = yield self.client.get("/fo-1")
244 self.assertEqual(c, "def")
245
246 @inlineCallbacks
247 def test_ephemeral_acl_modification(self):
248 yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
249 acl = [test_client.PUBLIC_ACL,
250 dict(scheme="digest",
251 id="zebra:moon",
252 perms=zookeeper.PERM_ALL)]
253 yield self.client.set_acl("/fo-1", acl)
254 yield self.expire_session()
255 n_acl, stat = yield self.client.get_acl("/fo-1")
256 self.assertEqual(acl, n_acl)
257
258 @inlineCallbacks
259 def test_ephemeral_deletion(self):
260 yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
261 yield self.client.delete("/fo-1")
262 yield self.expire_session()
263 self.assertFalse((yield self.client.exists("/fo-1")))
0264
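Review note: the `WatchTest` cases above describe the reset contract: every outstanding watch receives a synthetic session event for its path, the manager ends up empty, and an error raised by one callback does not stop the others. The sketch below illustrates that contract with synchronous callbacks only. It is not the `WatchManager` from `managed.py`, which works with Deferreds. `OneShotWatches` and `SessionEvent` are hypothetical names, with `SessionEvent` standing in for `ClientEvent`.

```python
from collections import namedtuple

# Hypothetical stand-in for txzookeeper.client.ClientEvent.
SessionEvent = namedtuple("SessionEvent", "type_name state_name path")


class OneShotWatches(object):
    """Simplified, synchronous illustration of the reset contract."""

    def __init__(self):
        self._watches = []

    def add(self, path, watch_type, callback):
        entry = (path, watch_type, callback)
        self._watches.append(entry)
        return entry

    def remove(self, entry):
        # Removing an entry twice is harmless, as in test_add_remove.
        if entry in self._watches:
            self._watches.remove(entry)

    def reset(self):
        # Swap the list out first so callbacks that add new watches
        # do not have them fired or cleared by this reset.
        pending, self._watches = self._watches, []
        errors = []
        for path, _watch_type, callback in pending:
            try:
                callback(SessionEvent("session", "connected", path))
            except Exception as error:
                # Collect the failure and keep notifying the rest.
                errors.append((path, error))
        return errors
```

Returning the collected errors is a choice made for this sketch; the test above expects the real manager to log them instead.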
=== modified file 'txzookeeper/tests/test_retry.py'
--- txzookeeper/tests/test_retry.py 2011-11-08 12:15:16 +0000
+++ txzookeeper/tests/test_retry.py 2012-04-04 18:12:22 +0000
@@ -1,5 +1,5 @@
 #
-# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
+# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved
 #
 # This file is part of txzookeeper.
 #
