Merge lp:~hazmat/txzookeeper/managed-watch-and-ephemeral into lp:txzookeeper

Proposed by Kapil Thangavelu
Status: Merged
Approved by: Jim Baker
Approved revision: 56
Merged at revision: 47
Proposed branch: lp:~hazmat/txzookeeper/managed-watch-and-ephemeral
Merge into: lp:txzookeeper
Diff against target: 1024 lines (+716/-53)
9 files modified
.bzrignore (+4/-0)
setup.py (+2/-2)
txzookeeper/client.py (+63/-40)
txzookeeper/managed.py (+350/-0)
txzookeeper/retry.py (+10/-9)
txzookeeper/tests/__init__.py (+22/-0)
txzookeeper/tests/test_client.py (+1/-1)
txzookeeper/tests/test_managed.py (+263/-0)
txzookeeper/tests/test_retry.py (+1/-1)
To merge this branch: bzr merge lp:~hazmat/txzookeeper/managed-watch-and-ephemeral
Reviewer          Review Type  Date Requested  Status
Juju Engineering                               Pending
Review via email: mp+100715@code.launchpad.net

Description of the change

Managed Connections for txzk

A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.

https://codereview.appspot.com/5976074/
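
As a rough illustration of the behaviour described above, the following self-contained sketch models the bookkeeping in plain Python, with no ZooKeeper or Twisted dependency. `FakeSession`, `create`, `watch` and `expire` are hypothetical names invented for this sketch; they are not the txzookeeper API, which works with Deferreds and the real `zookeeper` bindings.

```python
# Hypothetical, in-memory model of a managed session; not the txzookeeper API.
SESSION_EVENT = "session"      # stand-in for zookeeper.SESSION_EVENT
CONNECTED = "connected"        # stand-in for zookeeper.CONNECTED_STATE


class FakeSession(object):
    """Tracks ephemeral nodes and pending watches across expiration."""

    def __init__(self):
        self.nodes = {}        # path -> data, what the "server" holds
        self.ephemerals = {}   # path -> data, remembered on the client side
        self.watches = []      # (path, callback) pairs that have not fired yet

    def create(self, path, data, ephemeral=False):
        self.nodes[path] = data
        if ephemeral:
            self.ephemerals[path] = data

    def watch(self, path, callback):
        self.watches.append((path, callback))

    def expire(self):
        """Simulate session expiration followed by re-establishment."""
        # The server drops every ephemeral node owned by the old session.
        for path in self.ephemerals:
            self.nodes.pop(path, None)
        # Recreate the remembered ephemerals under the new session.
        for path, data in self.ephemerals.items():
            self.nodes[path] = data
        # Fire a synthetic session event on each outstanding watch, once.
        watches, self.watches = self.watches, []
        for path, callback in watches:
            callback(SESSION_EVENT, CONNECTED, path)


events = []
session = FakeSession()
session.create("/workers/w1", "alive", ephemeral=True)
session.watch("/workers", lambda *args: events.append(args))
session.expire()
```

In this model a watch callback has to tolerate a session event as well as the usual node events; a typical response would be to re-read the node and set a new watch.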

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

Reviewers: mp+100715_code.launchpad.net,

Message:
Please take a look.

Description:
Managed Connections for txzk

A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.

https://code.launchpad.net/~hazmat/txzookeeper/managed-watch-and-ephemeral/+merge/100715

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/5970080/

Affected files:
   A .bzrignore
   A [revision details]
   M setup.py
   M txzookeeper/client.py
   A txzookeeper/managed.py
   M txzookeeper/retry.py
   M txzookeeper/tests/__init__.py
   M txzookeeper/tests/test_client.py
   A txzookeeper/tests/test_managed.py
   M txzookeeper/tests/test_retry.py

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

Reviewers: mp+100715_code.launchpad.net,

Message:
Please take a look.

Description:
Managed Connections for txzk

A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.

https://code.launchpad.net/~hazmat/txzookeeper/managed-watch-and-ephemeral/+merge/100715

(do not edit description out of merge proposal)

Please review this at https://codereview.appspot.com/5976074/

Affected files:
   A .bzrignore
   A [revision details]
   M setup.py
   M txzookeeper/client.py
   A txzookeeper/managed.py
   M txzookeeper/retry.py
   M txzookeeper/tests/__init__.py
   M txzookeeper/tests/test_client.py
   A txzookeeper/tests/test_managed.py
   M txzookeeper/tests/test_retry.py

55. By Kapil Thangavelu

remove unesc. expire handler from retry client

Revision history for this message
William Reade (fwereade) wrote :

This looks very nice indeed, but the vagueness of the "comes at a cost"
comment makes me wonder what I'm missing...

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py
File txzookeeper/managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode66
txzookeeper/managed.py:66: mgr # keep the flakes happy
Can't you just do:

   with self._ctx():

?
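
The suggestion relies on the fact that a context manager's cleanup runs whether or not the yielded value is bound with `as`. A minimal sketch, using hypothetical names (`Tracker`, `fire`) rather than the code under review:

```python
import contextlib


class Tracker(object):
    """Hypothetical stand-in for an object that must deregister itself."""

    def __init__(self, registry):
        self._registry = registry
        registry.append(self)

    @contextlib.contextmanager
    def _ctx(self):
        try:
            yield self._registry
        finally:
            # Runs on normal exit and when the body raises.
            if self in self._registry:
                self._registry.remove(self)

    def fire(self, callback):
        # No "as mgr" needed: nothing in the body uses the yielded value,
        # so there is no unused name for pyflakes to complain about.
        with self._ctx():
            return callback()


registry = []
ok = Tracker(registry)
result = ok.fire(lambda: "fired")

failing = Tracker(registry)
raised = False
try:
    failing.fire(lambda: 1 / 0)
except ZeroDivisionError:
    raised = True
```

Both trackers are removed from `registry`, including the one whose callback raised.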

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode70
txzookeeper/managed.py:70: mgr # keep the flakes happy
Ditto

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode122
txzookeeper/managed.py:122: come at a cost though.
Please expand ;).

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode138
txzookeeper/managed.py:138: """Called on intercept of session expiration
to restablish the session.
I'm pretty sure there are 2 es in re-establish, and that the hyphen may
have fallen out of common use but remains IMO more readable. As you
wish.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode140
txzookeeper/managed.py:140: This will reconnect to zk, restabslish
ephemerals, and trigger watches.
I like the word "restabslish", but ditto.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/tests/test_managed.py
File txzookeeper/tests/test_managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/tests/test_managed.py#newcode169
txzookeeper/tests/test_managed.py:169: # It takes some time to propogate
(1/3 session time as ping)
s/propogate/propagate/

https://codereview.appspot.com/5976074/

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

On 2012/04/04 13:23:29, fwereade wrote:
> This looks very nice indeed, but the vagueness of the "comes at a cost" comment
> makes me wonder what I'm missing...

Expanded greatly on what those costs are.

> https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py
> File txzookeeper/managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode66
> txzookeeper/managed.py:66: mgr # keep the flakes happy
> Can't you just do:

> with self._ctx():

> ?

much nicer, thanks.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode70
> txzookeeper/managed.py:70: mgr # keep the flakes happy
> Ditto

done.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode122
> txzookeeper/managed.py:122: come at a cost though.
> Please expand ;).

done.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode138
> txzookeeper/managed.py:138: """Called on intercept of session expiration to
> restablish the session.
> I'm pretty sure there are 2 es in re-establish, and that the hyphen may have
> fallen out of common use but remains IMO more readable. As you wish.

proper speling established ;-)

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode140
> txzookeeper/managed.py:140: This will reconnect to zk, restabslish ephemerals,
> and trigger watches.
> I like the word "restabslish", but ditto.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/tests/test_managed.py
> File txzookeeper/tests/test_managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/tests/test_managed.py#newcode169
> txzookeeper/tests/test_managed.py:169: # It takes some time to propogate (1/3
> session time as ping)
> s/propogate/propagate/

done.

http://codereview.appspot.com/5976074/

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

Please take a look.

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py
File txzookeeper/managed.py (right):

https://codereview.appspot.com/5976074/diff/1/txzookeeper/managed.py#newcode66
txzookeeper/managed.py:66: mgr # keep the flakes happy
On 2012/04/04 13:23:29, fwereade wrote:
> Can't you just do:

> with self._ctx():

> ?

thanks, much nicer.

https://codereview.appspot.com/5976074/

56. By Kapil Thangavelu

address review comments

Revision history for this message
Jim Baker (jimbaker) wrote :

+1, LGTM, just some minors

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py
File txzookeeper/managed.py (right):

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode153
txzookeeper/managed.py:153: def cb_restablish_session(self, e=None):
Presumably this should be cb_re_establish... but I don't see the
difference here with the easier synonym: restore. Good either way with
me.

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode162
txzookeeper/managed.py:162: # If its been explicitly closed, don't
restablish.
it's... re-establish

(or are you saying something about being restful? ;) )

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode166
txzookeeper/managed.py:166: # If its a stale handle, don't restablish
it's... re-establish

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode174
txzookeeper/managed.py:174: # Its already been restablished, don't
restablish.
It's... re-established... re-establish

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode183
txzookeeper/managed.py:183: # Restablish
I'm not going to mark any more of these!

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/managed.py#newcode189
txzookeeper/managed.py:189: log.error("error while restablish %r %s" %
(e, e))
Another minor: probably want some consistency in beginning with
uppercase. Not certain why you use log.error here, but log.exception
later for what appears to be a similar case.
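
For reference, the practical difference between the two calls in the standard `logging` module is that `Logger.exception` logs at ERROR level and appends the current traceback, while `Logger.error` records only the message unless `exc_info` is passed. A small sketch, written for Python 3 (the logger name `txzk.example` is made up for illustration):

```python
import io
import logging

stream = io.StringIO()
log = logging.getLogger("txzk.example")
log.addHandler(logging.StreamHandler(stream))
log.setLevel(logging.ERROR)
log.propagate = False   # keep output out of the root logger

try:
    raise ValueError("boom")
except ValueError as e:
    # Message only, no traceback.
    log.error("Error while re-establishing %r", e)
    error_output = stream.getvalue()
    # Same level, but the active traceback is appended to the record.
    log.exception("Error while connecting %r", e)
    exception_output = stream.getvalue()[len(error_output):]
```

Using `log.exception` in both places would keep the tracebacks; using `log.error` in both would keep the log terse. Either choice is consistent.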

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/tests/test_managed.py
File txzookeeper/tests/test_managed.py (right):

https://codereview.appspot.com/5976074/diff/4001/txzookeeper/tests/test_managed.py#newcode110
txzookeeper/tests/test_managed.py:110: """Reset fires a synthentic
client event, and clears watches.
no need for comma

https://codereview.appspot.com/5976074/

Preview Diff

1=== added file '.bzrignore'
2--- .bzrignore 1970-01-01 00:00:00 +0000
3+++ .bzrignore 2012-04-04 18:12:22 +0000
4@@ -0,0 +1,4 @@
5+.emacs.desktop
6+_trial_temp
7+txzookeeper
8+txzookeeper.egg-info
9
10=== modified file 'setup.py'
11--- setup.py 2011-12-12 13:19:28 +0000
12+++ setup.py 2012-04-04 18:12:22 +0000
13@@ -45,8 +45,8 @@
14 name="txzookeeper",
15 version=version,
16 description="Twisted api for Apache Zookeeper",
17- author="Ensemble Developers",
18- author_email="ensemble@lists.ubuntu.com",
19+ author="Juju Developers",
20+ author_email="juju@lists.ubuntu.com",
21 url="https://launchpad.net/txzookeeper",
22 license="LGPL",
23 packages=find_packages(),
24
25=== modified file 'txzookeeper/client.py'
26--- txzookeeper/client.py 2011-12-06 21:21:29 +0000
27+++ txzookeeper/client.py 2012-04-04 18:12:22 +0000
28@@ -96,15 +96,20 @@
29
30 @property
31 def state_name(self):
32- return STATE_NAME_MAPPING[self.args[2]]
33+ return STATE_NAME_MAPPING.get(self.args[2], "unknown")
34
35 @property
36 def type_name(self):
37- return TYPE_NAME_MAPPING[self.args[1]]
38+ return TYPE_NAME_MAPPING.get(self.args[1], "unknown")
39+
40+ @property
41+ def handle(self):
42+ return self.args[3]
43
44 def __str__(self):
45- return "<txzookeeper.ConnectionException type: %s state: %s>" % (
46- self.type_name, self.state_name)
47+ return (
48+ "<txzookeeper.ConnectionException handle: %s type: %s state: %s>"
49+ % (self.handle, self.type_name, self.state_name))
50
51
52 def is_connection_exception(e):
53@@ -133,11 +138,11 @@
54
55 @property
56 def type_name(self):
57- return TYPE_NAME_MAPPING[self.type]
58+ return TYPE_NAME_MAPPING.get(self.type, "unknown")
59
60 @property
61 def state_name(self):
62- return STATE_NAME_MAPPING[self.connection_state]
63+ return STATE_NAME_MAPPING.get(self.connection_state, "unknown")
64
65 def __repr__(self):
66 return "<ClientEvent %s at %r state: %s>" % (
67@@ -169,6 +174,10 @@
68 self.connected = False
69 self.handle = None
70
71+ def __repr__(self):
72+ return "<txZookeeperClient client: %s handle: %r state: %s>" % (
73+ self.client_id[0], self.handle, self.state)
74+
75 def _check_connected(self, d):
76 if not self.connected:
77 d.errback(NotConnectedException("not connected"))
78@@ -218,7 +227,7 @@
79 d.callback((value, stat))
80
81 callback = self._zk_thread_callback(_cb_get)
82- watcher = self._wrap_watcher(watcher)
83+ watcher = self._wrap_watcher(watcher, "get", path)
84 result = zookeeper.aget(self.handle, path, watcher, callback)
85 self._check_result(result, d)
86 return d
87@@ -234,7 +243,7 @@
88 d.callback(children)
89
90 callback = self._zk_thread_callback(_cb_get_children)
91- watcher = self._wrap_watcher(watcher)
92+ watcher = self._wrap_watcher(watcher, "child", path)
93 result = zookeeper.aget_children(self.handle, path, watcher, callback)
94 self._check_result(result, d)
95 return d
96@@ -251,12 +260,12 @@
97 d.callback(stat)
98
99 callback = self._zk_thread_callback(_cb_exists)
100- watcher = self._wrap_watcher(watcher)
101+ watcher = self._wrap_watcher(watcher, "exists", path)
102 result = zookeeper.aexists(self.handle, path, watcher, callback)
103 self._check_result(result, d)
104 return d
105
106- def _wrap_watcher(self, watcher):
107+ def _wrap_watcher(self, watcher, watch_type, path):
108 if watcher is None:
109 return watcher
110 if not callable(watcher):
111@@ -285,14 +294,19 @@
112 else:
113 return watcher(event_type, conn_state, path)
114
115- def _zk_thread_callback(self, func):
116+ def _zk_thread_callback(self, func, *f_args, **f_kw):
117 """
118 The client library invokes callbacks in a separate thread, we wrap
119 any user defined callback so that they are called back in the main
120 thread after, zookeeper calls the wrapper.
121 """
122+ f_args = list(f_args)
123+
124 def wrapper(handle, *args): # pragma: no cover
125- reactor.callFromThread(func, *args)
126+ # make a copy the conn watch callback gets invoked multiple times
127+ cb_args = list(f_args)
128+ cb_args.extend(args)
129+ reactor.callFromThread(func, *cb_args, **f_kw)
130 return wrapper
131
132 @property
133@@ -391,6 +405,7 @@
134 d = defer.Deferred()
135 if self._check_result(result, d):
136 return d
137+ self.handle = None
138 d.callback(True)
139 return d
140
141@@ -437,7 +452,6 @@
142 else:
143 self.handle = zookeeper.init(
144 self._servers, callback, self._session_timeout)
145-
146 return d
147
148 def _cb_connected(
149@@ -460,6 +474,7 @@
150 # If we timed out and then connected, then close the conn.
151 if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called:
152 self.close()
153+ return
154
155 # Send session events to the callback, in addition to any
156 # duplicate session events that will be sent for extant watches.
157@@ -468,7 +483,10 @@
158 self, ClientEvent(type, state, path))
159
160 return
161- elif state == zookeeper.CONNECTED_STATE:
162+ # Connected successfully, or If we're expired on an initial
163+ # connect, someone else expired us.
164+ elif state in (zookeeper.CONNECTED_STATE,
165+ zookeeper.EXPIRED_SESSION_STATE):
166 connect_deferred.callback(self)
167 return
168
169@@ -488,17 +506,18 @@
170 if self._check_connected(d):
171 return d
172
173- def _cb_created(result_code, path):
174- if self._check_result(result_code, d):
175- return
176- d.callback(path)
177-
178- callback = self._zk_thread_callback(_cb_created)
179+ callback = self._zk_thread_callback(
180+ self._cb_created, d, data, acls, flags)
181 result = zookeeper.acreate(
182 self.handle, path, data, acls, flags, callback)
183 self._check_result(result, d)
184 return d
185
186+ def _cb_created(self, d, data, acls, flags, result_code, path):
187+ if self._check_result(result_code, d):
188+ return
189+ d.callback(path)
190+
191 def delete(self, path, version=-1):
192 """
193 Delete the node at the given path. If the current node version on the
194@@ -513,16 +532,16 @@
195 if self._check_connected(d):
196 return d
197
198- def _cb_delete(result_code):
199- if self._check_result(result_code, d):
200- return
201- d.callback(result_code)
202-
203- callback = self._zk_thread_callback(_cb_delete)
204+ callback = self._zk_thread_callback(self._cb_deleted, d, path)
205 result = zookeeper.adelete(self.handle, path, version, callback)
206 self._check_result(result, d)
207 return d
208
209+ def _cb_deleted(self, d, path, result_code):
210+ if self._check_result(result_code, d):
211+ return
212+ d.callback(result_code)
213+
214 def exists(self, path):
215 """
216 Check that the given node path exists. Returns a deferred that
217@@ -657,17 +676,17 @@
218 if self._check_connected(d):
219 return d
220
221- def _cb_set_acl(result_code):
222- if self._check_result(result_code, d):
223- return
224- d.callback(result_code)
225-
226- callback = self._zk_thread_callback(_cb_set_acl)
227+ callback = self._zk_thread_callback(self._cb_set_acl, d, path, acls)
228 result = zookeeper.aset_acl(
229 self.handle, path, version, acls, callback)
230 self._check_result(result, d)
231 return d
232
233+ def _cb_set_acl(self, d, path, acls, result_code):
234+ if self._check_result(result_code, d):
235+ return
236+ d.callback(result_code)
237+
238 def set(self, path, data="", version=-1):
239 """
240 Sets the data of a node at the given path. If the current node version
241@@ -683,16 +702,16 @@
242 if self._check_connected(d):
243 return d
244
245- def _cb_set(result_code, node_stat):
246- if self._check_result(result_code, d):
247- return
248- d.callback(node_stat)
249-
250- callback = self._zk_thread_callback(_cb_set)
251+ callback = self._zk_thread_callback(self._cb_set, d, path, data)
252 result = zookeeper.aset(self.handle, path, data, version, callback)
253 self._check_result(result, d)
254 return d
255
256+ def _cb_set(self, d, path, data, result_code, node_stat):
257+ if self._check_result(result_code, d):
258+ return
259+ d.callback(node_stat)
260+
261 def set_connection_watcher(self, watcher):
262 """
263 Sets a permanent global watcher on the connection. This will get
264@@ -702,7 +721,7 @@
265 """
266 if not callable(watcher):
267 raise SyntaxError("Invalid Watcher %r" % (watcher))
268- watcher = self._wrap_watcher(watcher)
269+ watcher = self._wrap_watcher(watcher, None, None)
270 zookeeper.set_watcher(self.handle, watcher)
271
272 def set_session_callback(self, callback):
273@@ -740,9 +759,13 @@
274 """
275 if not callable(callback):
276 raise TypeError("Invalid callback %r" % callback)
277+ if self._connection_error_callback is not None:
278+ raise RuntimeError((
279+ "Connection error handlers can't be changed %s" %
280+ self._connection_error_callback))
281 self._connection_error_callback = callback
282
283- def set_determinstic_order(self, boolean):
284+ def set_deterministic_order(self, boolean):
285 """
286 The zookeeper client will by default randomize the server hosts
287 it will connect to unless this is set to True.
288
289=== added file 'txzookeeper/managed.py'
290--- txzookeeper/managed.py 1970-01-01 00:00:00 +0000
291+++ txzookeeper/managed.py 2012-04-04 18:12:22 +0000
292@@ -0,0 +1,350 @@
293+from functools import partial
294+
295+import contextlib
296+import logging
297+import zookeeper
298+
299+from twisted.internet.defer import (
300+ inlineCallbacks, DeferredLock, fail, returnValue)
301+
302+from client import ZookeeperClient, ClientEvent, NotConnectedException
303+from retry import RetryClient
304+
305+
306+class StopWatcher(Exception):
307+ pass
308+
309+
310+WATCH_KIND_MAP = {
311+ "child": "get_children_and_watch",
312+ "exists": "exists_and_watch",
313+ "get": "get_and_watch"
314+ }
315+
316+
317+log = logging.getLogger("txzk.managed")
318+
319+
320+class Watch(object):
321+ """
322+ For application driven persistent watches, where the application
323+ is manually resetting the watch.
324+ """
325+
326+ __slots__ = ("_mgr", "_client", "_path", "_kind", "_callback")
327+
328+ def __init__(self, mgr, path, kind, callback):
329+ self._mgr = mgr
330+ self._path = path
331+ self._kind = kind
332+ self._callback = callback
333+
334+ @property
335+ def path(self):
336+ return self._path
337+
338+ @property
339+ def kind(self):
340+ return self._kind
341+
342+ @contextlib.contextmanager
343+ def _ctx(self):
344+ mgr = self._mgr
345+ del self._mgr
346+ try:
347+ yield mgr
348+ finally:
349+ mgr.remove(self)
350+
351+ @inlineCallbacks
352+ def reset(self):
353+ with self._ctx():
354+ yield self._callback(
355+ zookeeper.SESSION_EVENT,
356+ zookeeper.CONNECTED_STATE,
357+ self._path)
358+
359+ def __call__(self, *args, **kw):
360+ with self._ctx():
361+ return self._callback(*args, **kw)
362+
363+ def __str__(self):
364+ return "<Watcher %s %s %r>" % (self.kind, self.path, self._callback)
365+
366+
367+class WatchManager(object):
368+
369+ watch_class = Watch
370+
371+ def __init__(self):
372+ self._watches = []
373+
374+ def add(self, path, watch_type, watcher):
375+ w = self.watch_class(self, path, watch_type, watcher)
376+ self._watches.append(w)
377+ return w
378+
379+ def remove(self, w):
380+ try:
381+ self._watches.remove(w)
382+ except ValueError:
383+ pass
384+
385+ def iterkeys(self):
386+ for w in self._watches:
387+ yield (w.path, w.kind)
388+
389+ def clear(self):
390+ del self._watches
391+ self._watches = []
392+
393+ @inlineCallbacks
394+ def reset(self, *ignored):
395+ watches = self._watches
396+ self._watches = []
397+
398+ for w in watches:
399+ try:
400+ yield w.reset()
401+ except Exception, e:
402+ log.error("Error reseting watch %s with session event. %s %r",
403+ w, e, e)
404+ continue
405+
406+
407+class SessionClient(ZookeeperClient):
408+ """A managed client that automatically re-establishes ephemerals and
409+ triggers watches after reconnecting post session expiration.
410+
411+ This abstracts the client from session expiration handling. It does
412+ come at a cost though.
413+
414+ There are two application constraints that need to be considered for usage
415+ of the SessionClient or ManagedClient. The first is that watch callbacks
416+ which examine the event, must be able to handle the synthetic session
417+ event which is sent to them when the session is re-established.
418+
419+ The second and more problematic is that algorithms/patterns
420+ utilizing ephemeral sequence nodes need to be rethought, as the
421+ session client will recreate the nodes when reconnecting at their
422+ previous paths. Some algorithms (like the std zk lock recipe) rely
423+ on properties like the smallest valued ephemeral sequence node in
424+ a container to identify the lock holder, with the notion that upon
425+ session expiration a new lock/leader will be sought. Sequence
426+ ephemeral node recreation in this context is problematic as the
427+ node is recreated at the exact previous path. Alternative lock
428+ strategies that do work are fairly simple at low volume, such as
429+ owning a particular node path (ie. /locks/lock-holder) with an
430+ ephemeral.
431+ """
432+
433+ def __init__(
434+ self, servers=None, session_timeout=None, connect_timeout=4000):
435+ """
436+ """
437+ super(SessionClient, self).__init__(servers, session_timeout)
438+ self._connect_timeout = connect_timeout
439+ self._watches = WatchManager()
440+ self._ephemerals = {}
441+ self._reconnect_lock = DeferredLock()
442+ self.set_connection_error_callback(self._cb_connection_error)
443+
444+ @inlineCallbacks
445+ def cb_restablish_session(self, e=None):
446+ """Called on intercept of session expiration to create new session.
447+
448+ This will reconnect to zk, re-establish ephemerals, and
449+ trigger watches.
450+ """
451+ yield self._reconnect_lock.acquire()
452+
453+ try:
454+ # If its been explicitly closed, don't restablish.
455+ if self.handle is None:
456+ return
457+
458+ # If its a stale handle, don't restablish
459+ try:
460+ zookeeper.is_unrecoverable(self.handle)
461+ except zookeeper.ZooKeeperException:
462+ if e:
463+ raise e
464+ return
465+
466+ # Its already been restablished, don't restablish.
467+ if not self.unrecoverable:
468+ return
469+ elif self.connected:
470+ self.close()
471+ self.handle = 0
472+ elif isinstance(self.handle, int):
473+ self.handle = 0
474+
475+ # Restablish
476+ assert self.handle == 0
477+ yield self._cb_restablish_session().addErrback(
478+ self._cb_restablish_errback, e)
479+
480+ except Exception, e:
481+ log.error("error while restablish %r %s" % (e, e))
482+ finally:
483+ yield self._reconnect_lock.release()
484+
485+ @inlineCallbacks
486+ def _cb_restablish_session(self):
487+ """Re-establish a new session, and recreate ephemerals and watches.
488+ """
489+ while 1:
490+ # Reconnect
491+ if self.handle is None:
492+ returnValue(self.handle)
493+ try:
494+ yield self.connect(timeout=self._connect_timeout)
495+ except zookeeper.ZooKeeperException, e:
496+ log.exception("Error while connecting %r %s" % (e, e))
497+ continue
498+ else:
499+ break
500+
501+ items = self._ephemerals.items()
502+ self._ephemerals = {}
503+
504+ for path, e in items:
505+ try:
506+ yield self.create(
507+ path, e['data'], acls=e['acls'], flags=e['flags'])
508+ except zookeeper.NodeExistsException:
509+ log.error("Attempt to create ephemeral node failed %r", path)
510+ yield self._watches.reset()
511+
512+ def _cb_restablish_errback(self, err, failure):
513+ """If there's an error re-establishing the session log it.
514+ """
515+ log.error("Error while trying to restablish connection %s\n%s" % (
516+ failure.value, failure.getTraceback()))
517+ return failure
518+
519+ @inlineCallbacks
520+ def _cb_connection_error(self, client, error):
521+ """Convert session expiration to a transient connection error.
522+
523+ Dispatches from api usage error.
524+ """
525+ if not isinstance(error, (
526+ zookeeper.SessionExpiredException,
527+ NotConnectedException,
528+ zookeeper.ClosingException)):
529+ raise error
530+ yield self._cb_restablish_session()
531+ raise zookeeper.ConnectionLossException
532+
533+ @inlineCallbacks
534+ def _cb_connection_event(self, evt):
535+ """
536+ """
537+
538+ # client connected tracker
539+ def _check_connected(self, d):
540+ """Clients are automatically reconnected."""
541+ if self.connected:
542+ return
543+
544+ if self.handle is None:
545+ d.errback(NotConnectedException("not connected"))
546+ return d
547+
548+ c_d = self.cb_restablish_session()
549+
550+ def after_connected(client):
551+ """Return a transient connection failure.
552+
553+ The retry client will automatically attempt to retry the operation.
554+ """
555+ return fail(zookeeper.ConnectionLossException("Retry"))
556+
557+ c_d.addCallback(after_connected)
558+ c_d.chainDeferred(d)
559+ return d
560+
561+ # Dispatch from node watches on session expiration
562+ def _watch_session_wrapper(self, watcher, event_type, conn_state, path):
563+ """Watch wrapper that diverts session events to a connection callback.
564+ """
565+ if (event_type == zookeeper.SESSION_EVENT and
566+ conn_state == zookeeper.EXPIRED_SESSION_STATE):
567+ d = self.cb_restablish_session()
568+ d.addErrback(self._cb_restablish_errback)
569+ return d
570+ if event_type == zookeeper.SESSION_EVENT:
571+ if self._session_event_callback:
572+ self._session_event_callback(
573+ self, ClientEvent(event_type, conn_state, path))
574+ else:
575+ return watcher(event_type, conn_state, path)
576+
577+ # Track all watches
578+ def _wrap_watcher(self, watcher, watch_type, path):
579+ if watcher is None:
580+ return watcher
581+ if not callable(watcher):
582+ raise SyntaxError("invalid watcher")
583+
584+ # handle conn watcher, separately.
585+ if watch_type is None and path is None:
586+ return self._zk_thread_callback(
587+ self._watch_session_wrapper, watcher)
588+
589+ return self._zk_thread_callback(
590+ partial(
591+ self._watch_session_wrapper,
592+ self._watches.add(path, watch_type, watcher)))
593+
594+ # Track ephemerals
595+ def _cb_created(self, d, data, acls, flags, result_code, path):
596+ if self._check_result(result_code, d):
597+ return
598+
599+ if flags & zookeeper.EPHEMERAL:
600+ self._ephemerals[path] = dict(
601+ data=data, acls=acls, flags=flags)
602+
603+ d.callback(path)
604+
605+ def _cb_deleted(self, d, path, result_code):
606+ if self._check_result(result_code, d):
607+ return
608+
609+ self._ephemerals.pop(path, None)
610+ d.callback(result_code)
611+
612+ def _cb_set_acl(self, d, path, acls, result_code):
613+ if self._check_result(result_code, d):
614+ return
615+
616+ if path in self._ephemerals:
617+ self._ephemerals[path]['acls'] = acls
618+
619+ d.callback(result_code)
620+
621+ def _cb_set(self, d, path, data, result_code, node_stat):
622+ if self._check_result(result_code, d):
623+ return
624+
625+ if path in self._ephemerals:
626+ self._ephemerals[path]['data'] = data
627+
628+ d.callback(node_stat)
629+
630+
631+class ManagedClient(RetryClient):
632+
633+ def __init__(self, *args, **kw):
634+ client = SessionClient(*args, **kw)
635+ super(ManagedClient, self).__init__(
636+ client, client.cb_restablish_session)
637+
638+
639+def ManagedClient(servers=None, session_timeout=None, connect_timeout=10000):
640+ from retry import RetryClient
641+ client = SessionClient(servers, session_timeout, connect_timeout)
642+ return RetryClient(client)
643
644=== modified file 'txzookeeper/retry.py'
645--- txzookeeper/retry.py 2011-11-08 12:15:16 +0000
646+++ txzookeeper/retry.py 2012-04-04 18:12:22 +0000
647@@ -64,7 +64,7 @@
648
649 :param session_timeout: The timeout for the session, in milliseconds
650 :param max_delay: The max delay for a retry, in seconds.
651- :param session_fraction: The fractinoal amount of a timeout to wait
652+ :param session_fraction: The fractional amount of a timeout to wait
653 """
654 retry_delay = session_timeout / (float(session_fraction) * 1000)
655 return min(retry_delay, max_delay)
656@@ -111,20 +111,20 @@
657 must return a single value (either a deferred or result
658 value).
659 """
660- # For clients which aren't connected (session timeout == None)
661- # we raise the errors to the callers
662- session_timeout = client.session_timeout or 0
663-
664- # If we keep retrying past the 1.5 * session timeout without
665- # success just die, the session expiry is fatal.
666- max_time = session_timeout * 1.5 + time.time()
667-
668 while 1:
669 try:
670 value = yield func(*args, **kw)
671 except Exception, e:
672+ # For clients which aren't connected (session timeout == None)
673+ # we raise the errors to the callers
674+ session_timeout = client.session_timeout or 0
675+
676+ # If we keep retrying past the 1.5 * session timeout without
677+ # success just die, the session expiry is fatal.
678+ max_time = session_timeout * 1.5 + time.time()
679 if not check_retryable(client, max_time, e):
680 raise
681+
682 # Give the connection a chance to auto-heal.
683 yield sleep(get_delay(session_timeout))
684 continue
685@@ -172,6 +172,7 @@
686 # Give the connection a chance to auto-heal
687 d = sleep(get_delay(session_timeout))
688 d.addCallback(retry_inner)
689+
690 return d
691
692 def retry_inner(value):
693
694=== modified file 'txzookeeper/tests/__init__.py'
695--- txzookeeper/tests/__init__.py 2011-10-22 04:24:02 +0000
696+++ txzookeeper/tests/__init__.py 2012-04-04 18:12:22 +0000
697@@ -20,6 +20,8 @@
698 # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
699 #
700
701+import logging
702+import StringIO
703 import sys
704
705 import zookeeper
706@@ -72,6 +74,26 @@
707 return None
708 return original_value
709
710+ def capture_log(
711+ self, name="", level=logging.INFO, log_file=None, formatter=None):
712+ """Capture log channel to StringIO"""
713+ if log_file is None:
714+ log_file = StringIO.StringIO()
715+ log_handler = logging.StreamHandler(log_file)
716+ if formatter:
717+ log_handler.setFormatter(formatter)
718+ logger = logging.getLogger(name)
719+ logger.addHandler(log_handler)
720+ old_logger_level = logger.level
721+ logger.setLevel(level)
722+
723+ @self.addCleanup
724+ def reset_logging():
725+ logger.removeHandler(log_handler)
726+ logger.setLevel(old_logger_level)
727+
728+ return log_file
729+
730
731 def egg_test_runner():
732 """
733
734=== modified file 'txzookeeper/tests/test_client.py'
735--- txzookeeper/tests/test_client.py 2011-10-22 04:24:02 +0000
736+++ txzookeeper/tests/test_client.py 2012-04-04 18:12:22 +0000
737@@ -1050,7 +1050,7 @@
738 d = self.client.connect()
739
740 def verify_session_timeout(client):
741- self.assertEqual(client.session_timeout, 4000)
742+ self.assertIn(client.session_timeout, (4000, 10000))
743
744 d.addCallback(verify_session_timeout)
745 return d
746
747=== added file 'txzookeeper/tests/test_managed.py'
748--- txzookeeper/tests/test_managed.py 1970-01-01 00:00:00 +0000
749+++ txzookeeper/tests/test_managed.py 2012-04-04 18:12:22 +0000
750@@ -0,0 +1,263 @@
751+#
752+# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved
753+#
754+# This file is part of txzookeeper.
755+#
756+# Authors:
757+# Kapil Thangavelu
758+#
759+# txzookeeper is free software: you can redistribute it and/or modify
760+# it under the terms of the GNU Lesser General Public License as published by
761+# the Free Software Foundation, either version 3 of the License, or
762+# (at your option) any later version.
763+#
764+# txzookeeper is distributed in the hope that it will be useful,
765+# but WITHOUT ANY WARRANTY; without even the implied warranty of
766+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
767+# GNU Lesser General Public License for more details.
768+#
769+# You should have received a copy of the GNU Lesser General Public License
770+# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>.
771+#
772+
773+import zookeeper
774+
775+from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList
776+
777+from txzookeeper.client import ZookeeperClient, ClientEvent
778+from txzookeeper import managed
779+from txzookeeper.tests import ZookeeperTestCase, utils
780+from txzookeeper.tests import test_client
781+
782+
783+class WatchTest(ZookeeperTestCase):
784+ """Watch manager and watch tests"""
785+
786+ def setUp(self):
787+ self.watches = managed.WatchManager()
788+
789+ def tearDown(self):
790+ self.watches.clear()
791+ del self.watches
792+
793+ def test_add_remove(self):
794+
795+ w = self.watches.add("/foobar", "child", lambda x: 1)
796+ self.assertIn(
797+ "<Watcher child /foobar",
798+ str(w))
799+ self.assertIn(w, self.watches._watches)
800+ self.watches.remove(w)
801+ self.assertNotIn(w, self.watches._watches)
802+ self.watches.remove(w)
803+
804+ @inlineCallbacks
805+ def test_watch_fire_removes(self):
806+ """Firing the watch removes it from the manager.
807+ """
808+ w = self.watches.add("/foobar", "child", lambda x: 1)
809+ yield w("a")
810+ self.assertNotIn(w, self.watches._watches)
811+
812+ @inlineCallbacks
813+ def test_watch_fire_with_error_removes(self):
814+ """Firing a watch whose callback raises still removes it from the manager.
815+ """
816+ d = Deferred()
817+
818+ @inlineCallbacks
819+ def cb_error(e):
820+ yield d
821+ raise ValueError("a")
822+
823+ w = self.watches.add("/foobar", "child", cb_error)
824+ try:
825+ wd = w("a")
826+ d.callback(True)
827+ yield wd
828+ except ValueError:
829+ pass
830+ self.assertNotIn(w, self.watches._watches)
831+
832+ @inlineCallbacks
833+ def test_reset_with_error(self):
834+ """A callback firing an error on reset is ignored.
835+ """
836+ output = self.capture_log("txzk.managed")
837+ d = Deferred()
838+ results = []
839+
840+ @inlineCallbacks
841+ def callback(*args, **kw):
842+ results.append((args, kw))
843+ yield d
844+ raise ValueError("a")
845+
846+ w = self.watches.add("/foobar", "child", callback)
847+ reset_done = self.watches.reset()
848+
849+ e, _ = results.pop()
850+ self.assertEqual(
851+ str(ClientEvent(*e)),
852+ "<ClientEvent session at '/foobar' state: connected>")
853+ d.callback(True)
854+ yield reset_done
855+ self.assertNotIn(w, self.watches._watches)
856+ self.assertIn("Error reseting watch", output.getvalue())
857+
858+ @inlineCallbacks
859+ def test_reset(self):
860+ """Reset fires a synthetic client event, and clears watches.
861+ """
862+ d = Deferred()
863+ results = []
864+
865+ def callback(*args, **kw):
866+ results.append((args, kw))
867+ return d
868+
869+ w = self.watches.add("/foobar", "child", callback)
870+ reset_done = self.watches.reset()
871+
872+ e, _ = results.pop()
873+ self.assertEqual(
874+ str(ClientEvent(*e)),
875+ "<ClientEvent session at '/foobar' state: connected>")
876+ d.callback(True)
877+ yield reset_done
878+ self.assertNotIn(w, self.watches._watches)
879+
880+
881+class SessionClientTests(test_client.ClientTests):
882+ """Run through basic operations with SessionClient."""
883+ timeout = 5
884+
885+ def setUp(self):
886+ super(SessionClientTests, self).setUp()
887+ self.client = managed.SessionClient("127.0.0.1:2181")
888+
889+ def test_client_use_while_disconnected_returns_failure(self):
890+ # managed session client reconnects here.
891+ return True
892+
893+
894+class SessionClientExpireTests(ZookeeperTestCase):
895+ """Verify expiration behavior."""
896+
897+ def setUp(self):
898+ super(SessionClientExpireTests, self).setUp()
899+ self.client = managed.ManagedClient("127.0.0.1:2181", 3000)
900+ self.client2 = None
901+ return self.client.connect()
902+
903+ @inlineCallbacks
904+ def tearDown(self):
905+ self.client.close()
906+
907+ self.client2 = ZookeeperClient("127.0.0.1:2181")
908+ yield self.client2.connect()
909+ utils.deleteTree(handle=self.client2.handle)
910+ yield self.client2.close()
911+ super(SessionClientExpireTests, self).tearDown()
912+
913+ @inlineCallbacks
914+ def expire_session(self):
915+ assert self.client.connected
916+ self.client2 = ZookeeperClient(self.client.servers)
917+ yield self.client2.connect(client_id=self.client.client_id)
918+ yield self.client2.close()
919+ # It takes some time to propagate (1/3 session time as ping)
920+ yield self.sleep(2)
921+
922+ @inlineCallbacks
923+ def test_session_expiration_conn(self):
924+ session_id = self.client.client_id[0]
925+ yield self.client.create("/fo-1", "abc")
926+ yield self.expire_session()
927+ yield self.client.exists("/")
928+ self.assertNotEqual(session_id, self.client.client_id[0])
929+
930+ @inlineCallbacks
931+ def test_session_expiration_conn_watch(self):
932+ session_id = self.client.client_id[0]
933+ yield self.client.create("/fo-1", "abc")
934+ yield self.expire_session()
935+ yield self.client.exists("/")
936+ self.assertNotEqual(session_id, self.client.client_id[0])
937+
938+ @inlineCallbacks
939+ def test_invoked_watch_gc(self):
940+ c_d, w_d = yield self.client.get_children_and_watch("/")
941+ yield c_d
942+ yield self.client.create("/foo")
943+ yield w_d
944+ yield self.expire_session()
945+ yield self.client.create("/foo2")
946+ # Nothing should blow up
947+ yield self.sleep(0.2)
948+
949+ @inlineCallbacks
950+ def test_ephemeral_and_watch_recreate(self):
951+ # Create some ephemeral nodes
952+ yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
953+ yield self.client.create("/fo-2", "def", flags=zookeeper.EPHEMERAL)
954+
955+ # Create some watches
956+ g_d, g_w_d = self.client.get_and_watch("/fo-1")
957+ yield g_d
958+
959+ c_d, c_w_d = self.client.get_children_and_watch("/")
960+ yield c_d
961+
962+ e_d, e_w_d = self.client.get_children_and_watch("/fo-2")
963+ yield e_d
964+
965+ # Expire the session
966+ yield self.expire_session()
967+
968+ # Poof
969+
970+ # Ephemerals back
971+ c, s = yield self.client.get("/fo-1")
972+ self.assertEqual(c, "abc")
973+
974+ c, s = yield self.client.get("/fo-2")
975+ self.assertEqual(c, "def")
976+
977+ # Watches triggered
978+ yield DeferredList(
979+ [g_w_d, c_w_d, e_w_d],
980+ fireOnOneErrback=True, consumeErrors=True)
981+
982+ self.assertEqual(
983+ [str(d.result) for d in (g_w_d, c_w_d, e_w_d)],
984+ ["<ClientEvent session at '/fo-1' state: connected>",
985+ "<ClientEvent session at '/' state: connected>",
986+ "<ClientEvent session at '/fo-2' state: connected>"])
987+
988+ @inlineCallbacks
989+ def test_ephemeral_content_modification(self):
990+ yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
991+ yield self.client.set("/fo-1", "def")
992+ yield self.expire_session()
993+ c, s = yield self.client.get("/fo-1")
994+ self.assertEqual(c, "def")
995+
996+ @inlineCallbacks
997+ def test_ephemeral_acl_modification(self):
998+ yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
999+ acl = [test_client.PUBLIC_ACL,
1000+ dict(scheme="digest",
1001+ id="zebra:moon",
1002+ perms=zookeeper.PERM_ALL)]
1003+ yield self.client.set_acl("/fo-1", acl)
1004+ yield self.expire_session()
1005+ n_acl, stat = yield self.client.get_acl("/fo-1")
1006+ self.assertEqual(acl, n_acl)
1007+
1008+ @inlineCallbacks
1009+ def test_ephemeral_deletion(self):
1010+ yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL)
1011+ yield self.client.delete("/fo-1")
1012+ yield self.expire_session()
1013+ self.assertFalse((yield self.client.exists("/fo-1")))
1014
1015=== modified file 'txzookeeper/tests/test_retry.py'
1016--- txzookeeper/tests/test_retry.py 2011-11-08 12:15:16 +0000
1017+++ txzookeeper/tests/test_retry.py 2012-04-04 18:12:22 +0000
1018@@ -1,5 +1,5 @@
1019 #
1020-# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved
1021+# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved
1022 #
1023 # This file is part of txzookeeper.
1024 #
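
The capture_log helper added to txzookeeper/tests/__init__.py in this diff attaches a StreamHandler to a named logger for the duration of one test and restores the logger afterwards. The following is a minimal standalone sketch of that pattern, assuming Python 3 (io.StringIO in place of StringIO.StringIO) and plain unittest instead of ZookeeperTestCase. The class names LogCaptureTestCase and ExampleTest and the channel name "example.channel" are hypothetical and not part of the branch.

```python
import io
import logging
import unittest


class LogCaptureTestCase(unittest.TestCase):
    """Hypothetical base class mirroring the capture_log helper in the diff."""

    def capture_log(self, name="", level=logging.INFO,
                    log_file=None, formatter=None):
        """Capture a log channel into an in-memory buffer for one test."""
        if log_file is None:
            # Python 3 stand-in for the StringIO.StringIO used in the diff.
            log_file = io.StringIO()
        handler = logging.StreamHandler(log_file)
        if formatter:
            handler.setFormatter(formatter)
        logger = logging.getLogger(name)
        logger.addHandler(handler)
        old_level = logger.level
        logger.setLevel(level)

        def reset_logging():
            # Undo both changes so later tests see the original logger state.
            logger.removeHandler(handler)
            logger.setLevel(old_level)

        self.addCleanup(reset_logging)
        return log_file


class ExampleTest(LogCaptureTestCase):

    def test_error_is_logged(self):
        output = self.capture_log("example.channel")
        logging.getLogger("example.channel").error("something went wrong")
        self.assertIn("something went wrong", output.getvalue())
```

Registering the reset through addCleanup means the handler is removed even when the test body fails, which is why the tests in test_managed.py can call capture_log without a matching tearDown step.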
