Merge lp:~hazmat/txzookeeper/managed-watch-and-ephemeral into lp:txzookeeper
Status: | Merged |
---|---|
Approved by: | Jim Baker |
Approved revision: | 56 |
Merged at revision: | 47 |
Proposed branch: | lp:~hazmat/txzookeeper/managed-watch-and-ephemeral |
Merge into: | lp:txzookeeper |
Diff against target: | 1024 lines (+716/-53), 9 files modified:
.bzrignore (+4/-0)
setup.py (+2/-2)
txzookeeper/client.py (+63/-40)
txzookeeper/managed.py (+350/-0)
txzookeeper/retry.py (+10/-9)
txzookeeper/tests/__init__.py (+22/-0)
txzookeeper/tests/test_client.py (+1/-1)
txzookeeper/tests/test_managed.py (+263/-0)
txzookeeper/tests/test_retry.py (+1/-1) |
To merge this branch: | bzr merge lp:~hazmat/txzookeeper/managed-watch-and-ephemeral |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | Pending | ||
Review via email: mp+100715@code.launchpad.net |
Commit message
Description of the change
Managed Connections for txzk
A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.
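As a rough illustration of the behaviour described above, here is a minimal in-memory sketch. It does not use txzookeeper or ZooKeeper at all: `ToySessionClient`, its methods, and the string event values are hypothetical stand-ins, chosen only to show the bookkeeping idea (remember ephemerals client side, recreate them after expiration, then fire each outstanding watch once with a session event).

```python
class ToySessionClient(object):
    """In-memory stand-in for a managed client (hypothetical, not the real API)."""

    def __init__(self):
        self.server_nodes = {}  # path -> data, plays the role of the server
        self.ephemerals = {}    # path -> data, remembered on the client side
        self.watches = []       # one-shot (path, callback) pairs

    def create(self, path, data, ephemeral=False):
        self.server_nodes[path] = data
        if ephemeral:
            # Track the node so it can be recreated after session expiration.
            self.ephemerals[path] = data

    def watch(self, path, callback):
        self.watches.append((path, callback))

    def expire_session(self):
        # The server drops every ephemeral node owned by the expired session.
        for path in self.ephemerals:
            self.server_nodes.pop(path, None)

    def reestablish_session(self):
        # Recreate the tracked ephemerals at their previous paths.
        for path, data in self.ephemerals.items():
            self.server_nodes[path] = data
        # Fire each outstanding watch once with a synthetic session event.
        pending, self.watches = self.watches, []
        for path, callback in pending:
            callback("session", "connected", path)


events = []
client = ToySessionClient()
client.create("/workers/a", "payload", ephemeral=True)
client.watch("/workers", lambda *event: events.append(event))

client.expire_session()
gone = "/workers/a" not in client.server_nodes

client.reestablish_session()
```

After `reestablish_session()` the ephemeral node is back and the watcher has received one session event. In the branch itself the equivalent state lives in `SessionClient._ephemerals` and `WatchManager`.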
Kapil Thangavelu (hazmat) wrote : | # |
Reviewers: mp+100715_
Message:
Please take a look.
Description:
Managed Connections for txzk
A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.
https:/
(do not edit description out of merge proposal)
Please review this at https:/
Affected files:
A .bzrignore
A [revision details]
M setup.py
M txzookeeper/
A txzookeeper/
M txzookeeper/
M txzookeeper/
M txzookeeper/
A txzookeeper/
M txzookeeper/
- 55. By Kapil Thangavelu
remove unesc. expire handler from retry client
William Reade (fwereade) wrote : | # |
This looks very nice indeed, but the vagueness of the "comes at a cost"
comment makes me wonder what I'm missing...
https:/
File txzookeeper/
https:/
txzookeeper/
Can't you just do:
with self._ctx():
?
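The `with self._ctx():` suggestion can be sketched in isolation. This is a simplified, stdlib-only sketch of the pattern, not the code under review: `ToyWatchManager` and `ToyWatch` are hypothetical names. The point is that a `contextlib.contextmanager` with a `try`/`finally` removes the one-shot watch from its manager whether the callback returns or raises.

```python
import contextlib


class ToyWatchManager(object):
    """Keeps the list of outstanding one-shot watches (hypothetical)."""

    def __init__(self):
        self.watches = []

    def add(self, callback):
        watch = ToyWatch(self, callback)
        self.watches.append(watch)
        return watch

    def remove(self, watch):
        try:
            self.watches.remove(watch)
        except ValueError:
            pass  # already removed, nothing to do


class ToyWatch(object):
    def __init__(self, manager, callback):
        self._manager = manager
        self._callback = callback

    @contextlib.contextmanager
    def _ctx(self):
        try:
            yield
        finally:
            # Runs on normal exit and on exceptions alike.
            self._manager.remove(self)

    def __call__(self, *args, **kw):
        with self._ctx():
            return self._callback(*args, **kw)


def failing_callback(event):
    raise ValueError(event)


manager = ToyWatchManager()
ok_watch = manager.add(lambda event: event.upper())
bad_watch = manager.add(failing_callback)

result = ok_watch("changed")
try:
    bad_watch("changed")
    raised = False
except ValueError:
    raised = True
```

Both watches end up removed from `manager.watches`, and the `ValueError` from the failing callback still reaches the caller.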
https:/
txzookeeper/
Ditto
https:/
txzookeeper/
Please expand ;).
https:/
txzookeeper/
to restablish the session.
I'm pretty sure there are 2 es in re-establish, and that the hyphen may
have fallen out of common use but remains IMO more readable. As you
wish.
https:/
txzookeeper/
ephemerals, and trigger watches.
I like the word "restabslish", but ditto.
https:/
File txzookeeper/
https:/
txzookeeper/
(1/3 session time as ping)
s/propogate/
Kapil Thangavelu (hazmat) wrote : | # |
On 2012/04/04 13:23:29, fwereade wrote:
> This looks very nice indeed, but the vagueness of the "comes at a cost"
> comment makes me wonder what I'm missing...
expanded greatly what those costs are.
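One of the costs spelled out in the expanded `SessionClient` docstring concerns ephemeral sequence nodes. The toy model below follows the docstring's description (the node is recreated at its exact previous path) and uses plain dictionaries; the paths, client names, and `lock_holder` helper are hypothetical, and nothing here talks to ZooKeeper.

```python
def lock_holder(children):
    """Return the owner of the lowest-numbered sequence node, if any."""
    if not children:
        return None
    return children[min(children)]


# Two clients queue for a lock; the smallest sequence node holds it.
children = {
    "/locks/lock-0000000001": "client-a",
    "/locks/lock-0000000002": "client-b",
}
holder_before = lock_holder(children)

# client-a's session expires, so the server removes its ephemeral node
# and client-b sees itself as the new holder.
del children["/locks/lock-0000000001"]
holder_after_expiry = lock_holder(children)

# A managed client then recreates client-a's node at the same path,
# which makes client-a look like the holder again.
children["/locks/lock-0000000001"] = "client-a"
holder_after_recreate = lock_holder(children)
```

At the end both clients can believe they hold the lock, which is why the docstring recommends owning a fixed path such as `/locks/lock-holder` with an ephemeral node instead.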
> https:/
> File txzookeeper/
https:/
> txzookeeper/
> Can't you just do:
> with self._ctx():
> ?
much nicer, thanks.
https:/
> txzookeeper/
> Ditto
done.
https:/
> txzookeeper/
> Please expand ;).
done.
https:/
> txzookeeper/
> expiration to restablish the session.
> I'm pretty sure there are 2 es in re-establish, and that the hyphen may
> have fallen out of common use but remains IMO more readable. As you wish.
proper speling established ;-)
https:/
> txzookeeper/
> ephemerals, and trigger watches.
> I like the word "restabslish", but ditto.
https:/
> File txzookeeper/
https:/
> txzookeeper/
> propogate (1/3 session time as ping)
> s/propogate/
done.
Kapil Thangavelu (hazmat) wrote : | # |
Please take a look.
https:/
File txzookeeper/
https:/
txzookeeper/
On 2012/04/04 13:23:29, fwereade wrote:
> Can't you just do:
> with self._ctx():
> ?
thanks, much nicer.
- 56. By Kapil Thangavelu
address review comments
Jim Baker (jimbaker) wrote : | # |
+1, LGTM, just some minors
https:/
File txzookeeper/
https:/
txzookeeper/
Presumably this should be cb_re_establish... but I don't see the
difference here with the easier synonym: restore. Good either way with
me.
https:/
txzookeeper/
restablish.
it's... re-establish
(or are you saying something about being restful? ;) )
https:/
txzookeeper/
it's... re-establish
https:/
txzookeeper/
restablish.
It's... re-established... re-establish
https:/
txzookeeper/
I'm not going to mark any more of these!
https:/
txzookeeper/
(e, e))
Another minor: probably want some consistency in beginning with
uppercase. Not certain why you use log.error here, but log.exception
later for what appears to be a similar case.
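For reference, the practical difference between the two calls in the standard `logging` module is that `log.exception` logs at ERROR level and also appends the current traceback, while `log.error` records only the message. A small stdlib-only sketch follows; the logger names and the `capture` helper are made up for the illustration.

```python
import io
import logging


def capture(method_name):
    """Log a caught ValueError with logger.<method_name>, return the output."""
    stream = io.StringIO()
    logger = logging.getLogger("sketch.%s" % method_name)
    handler = logging.StreamHandler(stream)
    logger.addHandler(handler)
    logger.propagate = False  # keep the output out of any root handlers
    try:
        try:
            raise ValueError("boom")
        except ValueError as e:
            # Same message either way; only the logging method differs.
            getattr(logger, method_name)("Error while connecting %r", e)
    finally:
        logger.removeHandler(handler)
    return stream.getvalue()


error_output = capture("error")
exception_output = capture("exception")
```

`exception_output` contains the message followed by a `Traceback (most recent call last):` section; `error_output` contains the message alone.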
https:/
File txzookeeper/
https:/
txzookeeper/
client event, and clears watches.
no need for comma
Preview Diff
1 | === added file '.bzrignore' |
2 | --- .bzrignore 1970-01-01 00:00:00 +0000 |
3 | +++ .bzrignore 2012-04-04 18:12:22 +0000 |
4 | @@ -0,0 +1,4 @@ |
5 | +.emacs.desktop |
6 | +_trial_temp |
7 | +txzookeeper |
8 | +txzookeeper.egg-info |
9 | |
10 | === modified file 'setup.py' |
11 | --- setup.py 2011-12-12 13:19:28 +0000 |
12 | +++ setup.py 2012-04-04 18:12:22 +0000 |
13 | @@ -45,8 +45,8 @@ |
14 | name="txzookeeper", |
15 | version=version, |
16 | description="Twisted api for Apache Zookeeper", |
17 | - author="Ensemble Developers", |
18 | - author_email="ensemble@lists.ubuntu.com", |
19 | + author="Juju Developers", |
20 | + author_email="juju@lists.ubuntu.com", |
21 | url="https://launchpad.net/txzookeeper", |
22 | license="LGPL", |
23 | packages=find_packages(), |
24 | |
25 | === modified file 'txzookeeper/client.py' |
26 | --- txzookeeper/client.py 2011-12-06 21:21:29 +0000 |
27 | +++ txzookeeper/client.py 2012-04-04 18:12:22 +0000 |
28 | @@ -96,15 +96,20 @@ |
29 | |
30 | @property |
31 | def state_name(self): |
32 | - return STATE_NAME_MAPPING[self.args[2]] |
33 | + return STATE_NAME_MAPPING.get(self.args[2], "unknown") |
34 | |
35 | @property |
36 | def type_name(self): |
37 | - return TYPE_NAME_MAPPING[self.args[1]] |
38 | + return TYPE_NAME_MAPPING.get(self.args[1], "unknown") |
39 | + |
40 | + @property |
41 | + def handle(self): |
42 | + return self.args[3] |
43 | |
44 | def __str__(self): |
45 | - return "<txzookeeper.ConnectionException type: %s state: %s>" % ( |
46 | - self.type_name, self.state_name) |
47 | + return ( |
48 | + "<txzookeeper.ConnectionException handle: %s type: %s state: %s>" |
49 | + % (self.handle, self.type_name, self.state_name)) |
50 | |
51 | |
52 | def is_connection_exception(e): |
53 | @@ -133,11 +138,11 @@ |
54 | |
55 | @property |
56 | def type_name(self): |
57 | - return TYPE_NAME_MAPPING[self.type] |
58 | + return TYPE_NAME_MAPPING.get(self.type, "unknown") |
59 | |
60 | @property |
61 | def state_name(self): |
62 | - return STATE_NAME_MAPPING[self.connection_state] |
63 | + return STATE_NAME_MAPPING.get(self.connection_state, "unknown") |
64 | |
65 | def __repr__(self): |
66 | return "<ClientEvent %s at %r state: %s>" % ( |
67 | @@ -169,6 +174,10 @@ |
68 | self.connected = False |
69 | self.handle = None |
70 | |
71 | + def __repr__(self): |
72 | + return "<txZookeeperClient client: %s handle: %r state: %s>" % ( |
73 | + self.client_id[0], self.handle, self.state) |
74 | + |
75 | def _check_connected(self, d): |
76 | if not self.connected: |
77 | d.errback(NotConnectedException("not connected")) |
78 | @@ -218,7 +227,7 @@ |
79 | d.callback((value, stat)) |
80 | |
81 | callback = self._zk_thread_callback(_cb_get) |
82 | - watcher = self._wrap_watcher(watcher) |
83 | + watcher = self._wrap_watcher(watcher, "get", path) |
84 | result = zookeeper.aget(self.handle, path, watcher, callback) |
85 | self._check_result(result, d) |
86 | return d |
87 | @@ -234,7 +243,7 @@ |
88 | d.callback(children) |
89 | |
90 | callback = self._zk_thread_callback(_cb_get_children) |
91 | - watcher = self._wrap_watcher(watcher) |
92 | + watcher = self._wrap_watcher(watcher, "child", path) |
93 | result = zookeeper.aget_children(self.handle, path, watcher, callback) |
94 | self._check_result(result, d) |
95 | return d |
96 | @@ -251,12 +260,12 @@ |
97 | d.callback(stat) |
98 | |
99 | callback = self._zk_thread_callback(_cb_exists) |
100 | - watcher = self._wrap_watcher(watcher) |
101 | + watcher = self._wrap_watcher(watcher, "exists", path) |
102 | result = zookeeper.aexists(self.handle, path, watcher, callback) |
103 | self._check_result(result, d) |
104 | return d |
105 | |
106 | - def _wrap_watcher(self, watcher): |
107 | + def _wrap_watcher(self, watcher, watch_type, path): |
108 | if watcher is None: |
109 | return watcher |
110 | if not callable(watcher): |
111 | @@ -285,14 +294,19 @@ |
112 | else: |
113 | return watcher(event_type, conn_state, path) |
114 | |
115 | - def _zk_thread_callback(self, func): |
116 | + def _zk_thread_callback(self, func, *f_args, **f_kw): |
117 | """ |
118 | The client library invokes callbacks in a separate thread, we wrap |
119 | any user defined callback so that they are called back in the main |
120 | thread after, zookeeper calls the wrapper. |
121 | """ |
122 | + f_args = list(f_args) |
123 | + |
124 | def wrapper(handle, *args): # pragma: no cover |
125 | - reactor.callFromThread(func, *args) |
126 | + # make a copy the conn watch callback gets invoked multiple times |
127 | + cb_args = list(f_args) |
128 | + cb_args.extend(args) |
129 | + reactor.callFromThread(func, *cb_args, **f_kw) |
130 | return wrapper |
131 | |
132 | @property |
133 | @@ -391,6 +405,7 @@ |
134 | d = defer.Deferred() |
135 | if self._check_result(result, d): |
136 | return d |
137 | + self.handle = None |
138 | d.callback(True) |
139 | return d |
140 | |
141 | @@ -437,7 +452,6 @@ |
142 | else: |
143 | self.handle = zookeeper.init( |
144 | self._servers, callback, self._session_timeout) |
145 | - |
146 | return d |
147 | |
148 | def _cb_connected( |
149 | @@ -460,6 +474,7 @@ |
150 | # If we timed out and then connected, then close the conn. |
151 | if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called: |
152 | self.close() |
153 | + return |
154 | |
155 | # Send session events to the callback, in addition to any |
156 | # duplicate session events that will be sent for extant watches. |
157 | @@ -468,7 +483,10 @@ |
158 | self, ClientEvent(type, state, path)) |
159 | |
160 | return |
161 | - elif state == zookeeper.CONNECTED_STATE: |
162 | + # Connected successfully, or If we're expired on an initial |
163 | + # connect, someone else expired us. |
164 | + elif state in (zookeeper.CONNECTED_STATE, |
165 | + zookeeper.EXPIRED_SESSION_STATE): |
166 | connect_deferred.callback(self) |
167 | return |
168 | |
169 | @@ -488,17 +506,18 @@ |
170 | if self._check_connected(d): |
171 | return d |
172 | |
173 | - def _cb_created(result_code, path): |
174 | - if self._check_result(result_code, d): |
175 | - return |
176 | - d.callback(path) |
177 | - |
178 | - callback = self._zk_thread_callback(_cb_created) |
179 | + callback = self._zk_thread_callback( |
180 | + self._cb_created, d, data, acls, flags) |
181 | result = zookeeper.acreate( |
182 | self.handle, path, data, acls, flags, callback) |
183 | self._check_result(result, d) |
184 | return d |
185 | |
186 | + def _cb_created(self, d, data, acls, flags, result_code, path): |
187 | + if self._check_result(result_code, d): |
188 | + return |
189 | + d.callback(path) |
190 | + |
191 | def delete(self, path, version=-1): |
192 | """ |
193 | Delete the node at the given path. If the current node version on the |
194 | @@ -513,16 +532,16 @@ |
195 | if self._check_connected(d): |
196 | return d |
197 | |
198 | - def _cb_delete(result_code): |
199 | - if self._check_result(result_code, d): |
200 | - return |
201 | - d.callback(result_code) |
202 | - |
203 | - callback = self._zk_thread_callback(_cb_delete) |
204 | + callback = self._zk_thread_callback(self._cb_deleted, d, path) |
205 | result = zookeeper.adelete(self.handle, path, version, callback) |
206 | self._check_result(result, d) |
207 | return d |
208 | |
209 | + def _cb_deleted(self, d, path, result_code): |
210 | + if self._check_result(result_code, d): |
211 | + return |
212 | + d.callback(result_code) |
213 | + |
214 | def exists(self, path): |
215 | """ |
216 | Check that the given node path exists. Returns a deferred that |
217 | @@ -657,17 +676,17 @@ |
218 | if self._check_connected(d): |
219 | return d |
220 | |
221 | - def _cb_set_acl(result_code): |
222 | - if self._check_result(result_code, d): |
223 | - return |
224 | - d.callback(result_code) |
225 | - |
226 | - callback = self._zk_thread_callback(_cb_set_acl) |
227 | + callback = self._zk_thread_callback(self._cb_set_acl, d, path, acls) |
228 | result = zookeeper.aset_acl( |
229 | self.handle, path, version, acls, callback) |
230 | self._check_result(result, d) |
231 | return d |
232 | |
233 | + def _cb_set_acl(self, d, path, acls, result_code): |
234 | + if self._check_result(result_code, d): |
235 | + return |
236 | + d.callback(result_code) |
237 | + |
238 | def set(self, path, data="", version=-1): |
239 | """ |
240 | Sets the data of a node at the given path. If the current node version |
241 | @@ -683,16 +702,16 @@ |
242 | if self._check_connected(d): |
243 | return d |
244 | |
245 | - def _cb_set(result_code, node_stat): |
246 | - if self._check_result(result_code, d): |
247 | - return |
248 | - d.callback(node_stat) |
249 | - |
250 | - callback = self._zk_thread_callback(_cb_set) |
251 | + callback = self._zk_thread_callback(self._cb_set, d, path, data) |
252 | result = zookeeper.aset(self.handle, path, data, version, callback) |
253 | self._check_result(result, d) |
254 | return d |
255 | |
256 | + def _cb_set(self, d, path, data, result_code, node_stat): |
257 | + if self._check_result(result_code, d): |
258 | + return |
259 | + d.callback(node_stat) |
260 | + |
261 | def set_connection_watcher(self, watcher): |
262 | """ |
263 | Sets a permanent global watcher on the connection. This will get |
264 | @@ -702,7 +721,7 @@ |
265 | """ |
266 | if not callable(watcher): |
267 | raise SyntaxError("Invalid Watcher %r" % (watcher)) |
268 | - watcher = self._wrap_watcher(watcher) |
269 | + watcher = self._wrap_watcher(watcher, None, None) |
270 | zookeeper.set_watcher(self.handle, watcher) |
271 | |
272 | def set_session_callback(self, callback): |
273 | @@ -740,9 +759,13 @@ |
274 | """ |
275 | if not callable(callback): |
276 | raise TypeError("Invalid callback %r" % callback) |
277 | + if self._connection_error_callback is not None: |
278 | + raise RuntimeError(( |
279 | + "Connection error handlers can't be changed %s" % |
280 | + self._connection_error_callback)) |
281 | self._connection_error_callback = callback |
282 | |
283 | - def set_determinstic_order(self, boolean): |
284 | + def set_deterministic_order(self, boolean): |
285 | """ |
286 | The zookeeper client will by default randomize the server hosts |
287 | it will connect to unless this is set to True. |
288 | |
289 | === added file 'txzookeeper/managed.py' |
290 | --- txzookeeper/managed.py 1970-01-01 00:00:00 +0000 |
291 | +++ txzookeeper/managed.py 2012-04-04 18:12:22 +0000 |
292 | @@ -0,0 +1,350 @@ |
293 | +from functools import partial |
294 | + |
295 | +import contextlib |
296 | +import logging |
297 | +import zookeeper |
298 | + |
299 | +from twisted.internet.defer import ( |
300 | + inlineCallbacks, DeferredLock, fail, returnValue) |
301 | + |
302 | +from client import ZookeeperClient, ClientEvent, NotConnectedException |
303 | +from retry import RetryClient |
304 | + |
305 | + |
306 | +class StopWatcher(Exception): |
307 | + pass |
308 | + |
309 | + |
310 | +WATCH_KIND_MAP = { |
311 | + "child": "get_children_and_watch", |
312 | + "exists": "exists_and_watch", |
313 | + "get": "get_and_watch" |
314 | + } |
315 | + |
316 | + |
317 | +log = logging.getLogger("txzk.managed") |
318 | + |
319 | + |
320 | +class Watch(object): |
321 | + """ |
322 | + For application driven persistent watches, where the application |
323 | + is manually resetting the watch. |
324 | + """ |
325 | + |
326 | + __slots__ = ("_mgr", "_client", "_path", "_kind", "_callback") |
327 | + |
328 | + def __init__(self, mgr, path, kind, callback): |
329 | + self._mgr = mgr |
330 | + self._path = path |
331 | + self._kind = kind |
332 | + self._callback = callback |
333 | + |
334 | + @property |
335 | + def path(self): |
336 | + return self._path |
337 | + |
338 | + @property |
339 | + def kind(self): |
340 | + return self._kind |
341 | + |
342 | + @contextlib.contextmanager |
343 | + def _ctx(self): |
344 | + mgr = self._mgr |
345 | + del self._mgr |
346 | + try: |
347 | + yield mgr |
348 | + finally: |
349 | + mgr.remove(self) |
350 | + |
351 | + @inlineCallbacks |
352 | + def reset(self): |
353 | + with self._ctx(): |
354 | + yield self._callback( |
355 | + zookeeper.SESSION_EVENT, |
356 | + zookeeper.CONNECTED_STATE, |
357 | + self._path) |
358 | + |
359 | + def __call__(self, *args, **kw): |
360 | + with self._ctx(): |
361 | + return self._callback(*args, **kw) |
362 | + |
363 | + def __str__(self): |
364 | + return "<Watcher %s %s %r>" % (self.kind, self.path, self._callback) |
365 | + |
366 | + |
367 | +class WatchManager(object): |
368 | + |
369 | + watch_class = Watch |
370 | + |
371 | + def __init__(self): |
372 | + self._watches = [] |
373 | + |
374 | + def add(self, path, watch_type, watcher): |
375 | + w = self.watch_class(self, path, watch_type, watcher) |
376 | + self._watches.append(w) |
377 | + return w |
378 | + |
379 | + def remove(self, w): |
380 | + try: |
381 | + self._watches.remove(w) |
382 | + except ValueError: |
383 | + pass |
384 | + |
385 | + def iterkeys(self): |
386 | + for w in self._watches: |
387 | + yield (w.path, w.kind) |
388 | + |
389 | + def clear(self): |
390 | + del self._watches |
391 | + self._watches = [] |
392 | + |
393 | + @inlineCallbacks |
394 | + def reset(self, *ignored): |
395 | + watches = self._watches |
396 | + self._watches = [] |
397 | + |
398 | + for w in watches: |
399 | + try: |
400 | + yield w.reset() |
401 | + except Exception, e: |
402 | + log.error("Error reseting watch %s with session event. %s %r", |
403 | + w, e, e) |
404 | + continue |
405 | + |
406 | + |
407 | +class SessionClient(ZookeeperClient): |
408 | + """A managed client that automatically re-establishes ephemerals and |
409 | + triggers watches after reconnecting post session expiration. |
410 | + |
411 | + This abstracts the client from session expiration handling. It does |
412 | + come at a cost though. |
413 | + |
414 | + There are two application constraints that need to be considered for usage |
415 | + of the SessionClient or ManagedClient. The first is that watch callbacks |
416 | + which examine the event, must be able to handle the synthetic session |
417 | + event which is sent to them when the session is re-established. |
418 | + |
419 | + The second and more problematic is that algorithms/patterns |
420 | + utilizing ephemeral sequence nodes need to be rethought, as the |
421 | + session client will recreate the nodes when reconnecting at their |
422 | + previous paths. Some algorithms (like the std zk lock recipe) rely |
423 | + on properties like the smallest valued ephemeral sequence node in |
424 | + a container to identify the lock holder, with the notion that upon |
425 | + session expiration a new lock/leader will be sought. Sequence |
426 | + ephemeral node recreation in this context is problematic as the |
427 | + node is recreated at the exact previous path. Alternative lock |
428 | + strategies that do work are fairly simple at low volume, such as |
429 | + owning a particular node path (ie. /locks/lock-holder) with an |
430 | + ephemeral. |
431 | + """ |
432 | + |
433 | + def __init__( |
434 | + self, servers=None, session_timeout=None, connect_timeout=4000): |
435 | + """ |
436 | + """ |
437 | + super(SessionClient, self).__init__(servers, session_timeout) |
438 | + self._connect_timeout = connect_timeout |
439 | + self._watches = WatchManager() |
440 | + self._ephemerals = {} |
441 | + self._reconnect_lock = DeferredLock() |
442 | + self.set_connection_error_callback(self._cb_connection_error) |
443 | + |
444 | + @inlineCallbacks |
445 | + def cb_restablish_session(self, e=None): |
446 | + """Called on intercept of session expiration to create new session. |
447 | + |
448 | + This will reconnect to zk, re-establish ephemerals, and |
449 | + trigger watches. |
450 | + """ |
451 | + yield self._reconnect_lock.acquire() |
452 | + |
453 | + try: |
454 | + # If its been explicitly closed, don't restablish. |
455 | + if self.handle is None: |
456 | + return |
457 | + |
458 | + # If its a stale handle, don't restablish |
459 | + try: |
460 | + zookeeper.is_unrecoverable(self.handle) |
461 | + except zookeeper.ZooKeeperException: |
462 | + if e: |
463 | + raise e |
464 | + return |
465 | + |
466 | + # Its already been restablished, don't restablish. |
467 | + if not self.unrecoverable: |
468 | + return |
469 | + elif self.connected: |
470 | + self.close() |
471 | + self.handle = 0 |
472 | + elif isinstance(self.handle, int): |
473 | + self.handle = 0 |
474 | + |
475 | + # Restablish |
476 | + assert self.handle == 0 |
477 | + yield self._cb_restablish_session().addErrback( |
478 | + self._cb_restablish_errback, e) |
479 | + |
480 | + except Exception, e: |
481 | + log.error("error while restablish %r %s" % (e, e)) |
482 | + finally: |
483 | + yield self._reconnect_lock.release() |
484 | + |
485 | + @inlineCallbacks |
486 | + def _cb_restablish_session(self): |
487 | + """Re-establish a new session, and recreate ephemerals and watches. |
488 | + """ |
489 | + while 1: |
490 | + # Reconnect |
491 | + if self.handle is None: |
492 | + returnValue(self.handle) |
493 | + try: |
494 | + yield self.connect(timeout=self._connect_timeout) |
495 | + except zookeeper.ZooKeeperException, e: |
496 | + log.exception("Error while connecting %r %s" % (e, e)) |
497 | + continue |
498 | + else: |
499 | + break |
500 | + |
501 | + items = self._ephemerals.items() |
502 | + self._ephemerals = {} |
503 | + |
504 | + for path, e in items: |
505 | + try: |
506 | + yield self.create( |
507 | + path, e['data'], acls=e['acls'], flags=e['flags']) |
508 | + except zookeeper.NodeExistsException: |
509 | + log.error("Attempt to create ephemeral node failed %r", path) |
510 | + yield self._watches.reset() |
511 | + |
512 | + def _cb_restablish_errback(self, err, failure): |
513 | + """If there's an error re-establishing the session log it. |
514 | + """ |
515 | + log.error("Error while trying to restablish connection %s\n%s" % ( |
516 | + failure.value, failure.getTraceback())) |
517 | + return failure |
518 | + |
519 | + @inlineCallbacks |
520 | + def _cb_connection_error(self, client, error): |
521 | + """Convert session expiration to a transient connection error. |
522 | + |
523 | + Dispatches from api usage error. |
524 | + """ |
525 | + if not isinstance(error, ( |
526 | + zookeeper.SessionExpiredException, |
527 | + NotConnectedException, |
528 | + zookeeper.ClosingException)): |
529 | + raise error |
530 | + yield self._cb_restablish_session() |
531 | + raise zookeeper.ConnectionLossException |
532 | + |
533 | + @inlineCallbacks |
534 | + def _cb_connection_event(self, evt): |
535 | + """ |
536 | + """ |
537 | + |
538 | + # client connected tracker |
539 | + def _check_connected(self, d): |
540 | + """Clients are automatically reconnected.""" |
541 | + if self.connected: |
542 | + return |
543 | + |
544 | + if self.handle is None: |
545 | + d.errback(NotConnectedException("not connected")) |
546 | + return d |
547 | + |
548 | + c_d = self.cb_restablish_session() |
549 | + |
550 | + def after_connected(client): |
551 | + """Return a transient connection failure. |
552 | + |
553 | + The retry client will automatically attempt to retry the operation. |
554 | + """ |
555 | + return fail(zookeeper.ConnectionLossException("Retry")) |
556 | + |
557 | + c_d.addCallback(after_connected) |
558 | + c_d.chainDeferred(d) |
559 | + return d |
560 | + |
561 | + # Dispatch from node watches on session expiration |
562 | + def _watch_session_wrapper(self, watcher, event_type, conn_state, path): |
563 | + """Watch wrapper that diverts session events to a connection callback. |
564 | + """ |
565 | + if (event_type == zookeeper.SESSION_EVENT and |
566 | + conn_state == zookeeper.EXPIRED_SESSION_STATE): |
567 | + d = self.cb_restablish_session() |
568 | + d.addErrback(self._cb_restablish_errback) |
569 | + return d |
570 | + if event_type == zookeeper.SESSION_EVENT: |
571 | + if self._session_event_callback: |
572 | + self._session_event_callback( |
573 | + self, ClientEvent(event_type, conn_state, path)) |
574 | + else: |
575 | + return watcher(event_type, conn_state, path) |
576 | + |
577 | + # Track all watches |
578 | + def _wrap_watcher(self, watcher, watch_type, path): |
579 | + if watcher is None: |
580 | + return watcher |
581 | + if not callable(watcher): |
582 | + raise SyntaxError("invalid watcher") |
583 | + |
584 | + # handle conn watcher, separately. |
585 | + if watch_type is None and path is None: |
586 | + return self._zk_thread_callback( |
587 | + self._watch_session_wrapper, watcher) |
588 | + |
589 | + return self._zk_thread_callback( |
590 | + partial( |
591 | + self._watch_session_wrapper, |
592 | + self._watches.add(path, watch_type, watcher))) |
593 | + |
594 | + # Track ephemerals |
595 | + def _cb_created(self, d, data, acls, flags, result_code, path): |
596 | + if self._check_result(result_code, d): |
597 | + return |
598 | + |
599 | + if flags & zookeeper.EPHEMERAL: |
600 | + self._ephemerals[path] = dict( |
601 | + data=data, acls=acls, flags=flags) |
602 | + |
603 | + d.callback(path) |
604 | + |
605 | + def _cb_deleted(self, d, path, result_code): |
606 | + if self._check_result(result_code, d): |
607 | + return |
608 | + |
609 | + self._ephemerals.pop(path, None) |
610 | + d.callback(result_code) |
611 | + |
612 | + def _cb_set_acl(self, d, path, acls, result_code): |
613 | + if self._check_result(result_code, d): |
614 | + return |
615 | + |
616 | + if path in self._ephemerals: |
617 | + self._ephemerals[path]['acls'] = acls |
618 | + |
619 | + d.callback(result_code) |
620 | + |
621 | + def _cb_set(self, d, path, data, result_code, node_stat): |
622 | + if self._check_result(result_code, d): |
623 | + return |
624 | + |
625 | + if path in self._ephemerals: |
626 | + self._ephemerals[path]['data'] = data |
627 | + |
628 | + d.callback(node_stat) |
629 | + |
630 | + |
631 | +class ManagedClient(RetryClient): |
632 | + |
633 | + def __init__(self, *args, **kw): |
634 | + client = SessionClient(*args, **kw) |
635 | + super(ManagedClient, self).__init__( |
636 | + client, client.cb_restablish_session) |
637 | + |
638 | + |
639 | +def ManagedClient(servers=None, session_timeout=None, connect_timeout=10000): |
640 | + from retry import RetryClient |
641 | + client = SessionClient(servers, session_timeout, connect_timeout) |
642 | + return RetryClient(client) |
643 | |
644 | === modified file 'txzookeeper/retry.py' |
645 | --- txzookeeper/retry.py 2011-11-08 12:15:16 +0000 |
646 | +++ txzookeeper/retry.py 2012-04-04 18:12:22 +0000 |
647 | @@ -64,7 +64,7 @@ |
648 | |
649 | :param session_timeout: The timeout for the session, in milliseconds |
650 | :param max_delay: The max delay for a retry, in seconds. |
651 | - :param session_fraction: The fractinoal amount of a timeout to wait |
652 | + :param session_fraction: The fractional amount of a timeout to wait |
653 | """ |
654 | retry_delay = session_timeout / (float(session_fraction) * 1000) |
655 | return min(retry_delay, max_delay) |
656 | @@ -111,20 +111,20 @@ |
657 | must return a single value (either a deferred or result |
658 | value). |
659 | """ |
660 | - # For clients which aren't connected (session timeout == None) |
661 | - # we raise the errors to the callers |
662 | - session_timeout = client.session_timeout or 0 |
663 | - |
664 | - # If we keep retrying past the 1.5 * session timeout without |
665 | - # success just die, the session expiry is fatal. |
666 | - max_time = session_timeout * 1.5 + time.time() |
667 | - |
668 | while 1: |
669 | try: |
670 | value = yield func(*args, **kw) |
671 | except Exception, e: |
672 | + # For clients which aren't connected (session timeout == None) |
673 | + # we raise the errors to the callers |
674 | + session_timeout = client.session_timeout or 0 |
675 | + |
676 | + # If we keep retrying past the 1.5 * session timeout without |
677 | + # success just die, the session expiry is fatal. |
678 | + max_time = session_timeout * 1.5 + time.time() |
679 | if not check_retryable(client, max_time, e): |
680 | raise |
681 | + |
682 | # Give the connection a chance to auto-heal. |
683 | yield sleep(get_delay(session_timeout)) |
684 | continue |
685 | @@ -172,6 +172,7 @@ |
686 | # Give the connection a chance to auto-heal |
687 | d = sleep(get_delay(session_timeout)) |
688 | d.addCallback(retry_inner) |
689 | + |
690 | return d |
691 | |
692 | def retry_inner(value): |
693 | |
694 | === modified file 'txzookeeper/tests/__init__.py' |
695 | --- txzookeeper/tests/__init__.py 2011-10-22 04:24:02 +0000 |
696 | +++ txzookeeper/tests/__init__.py 2012-04-04 18:12:22 +0000 |
697 | @@ -20,6 +20,8 @@ |
698 | # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
699 | # |
700 | |
701 | +import logging |
702 | +import StringIO |
703 | import sys |
704 | |
705 | import zookeeper |
706 | @@ -72,6 +74,26 @@ |
707 | return None |
708 | return original_value |
709 | |
710 | + def capture_log( |
711 | + self, name="", level=logging.INFO, log_file=None, formatter=None): |
712 | + """Capture log channel to StringIO""" |
713 | + if log_file is None: |
714 | + log_file = StringIO.StringIO() |
715 | + log_handler = logging.StreamHandler(log_file) |
716 | + if formatter: |
717 | + log_handler.setFormatter(formatter) |
718 | + logger = logging.getLogger(name) |
719 | + logger.addHandler(log_handler) |
720 | + old_logger_level = logger.level |
721 | + logger.setLevel(level) |
722 | + |
723 | + @self.addCleanup |
724 | + def reset_logging(): |
725 | + logger.removeHandler(log_handler) |
726 | + logger.setLevel(old_logger_level) |
727 | + |
728 | + return log_file |
729 | + |
730 | |
731 | def egg_test_runner(): |
732 | """ |
733 | |
734 | === modified file 'txzookeeper/tests/test_client.py' |
735 | --- txzookeeper/tests/test_client.py 2011-10-22 04:24:02 +0000 |
736 | +++ txzookeeper/tests/test_client.py 2012-04-04 18:12:22 +0000 |
737 | @@ -1050,7 +1050,7 @@ |
738 | d = self.client.connect() |
739 | |
740 | def verify_session_timeout(client): |
741 | - self.assertEqual(client.session_timeout, 4000) |
742 | + self.assertIn(client.session_timeout, (4000, 10000)) |
743 | |
744 | d.addCallback(verify_session_timeout) |
745 | return d |
746 | |
747 | === added file 'txzookeeper/tests/test_managed.py' |
748 | --- txzookeeper/tests/test_managed.py 1970-01-01 00:00:00 +0000 |
749 | +++ txzookeeper/tests/test_managed.py 2012-04-04 18:12:22 +0000 |
750 | @@ -0,0 +1,263 @@ |
751 | +# |
752 | +# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved |
753 | +# |
754 | +# This file is part of txzookeeper. |
755 | +# |
756 | +# Authors: |
757 | +# Kapil Thangavelu |
758 | +# |
759 | +# txzookeeper is free software: you can redistribute it and/or modify |
760 | +# it under the terms of the GNU Lesser General Public License as published by |
761 | +# the Free Software Foundation, either version 3 of the License, or |
762 | +# (at your option) any later version. |
763 | +# |
764 | +# txzookeeper is distributed in the hope that it will be useful, |
765 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
766 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
767 | +# GNU Lesser General Public License for more details. |
768 | +# |
769 | +# You should have received a copy of the GNU Lesser General Public License |
770 | +# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
771 | +# |
772 | + |
773 | +import zookeeper |
774 | + |
775 | +from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList |
776 | + |
777 | +from txzookeeper.client import ZookeeperClient, ClientEvent |
778 | +from txzookeeper import managed |
779 | +from txzookeeper.tests import ZookeeperTestCase, utils |
780 | +from txzookeeper.tests import test_client |
781 | + |
782 | + |
783 | +class WatchTest(ZookeeperTestCase): |
784 | +    """Watch manager and watch tests""" |
785 | + |
786 | +    def setUp(self): |
787 | +        self.watches = managed.WatchManager() |
788 | + |
789 | +    def tearDown(self): |
790 | +        self.watches.clear() |
791 | +        del self.watches |
792 | + |
793 | +    def test_add_remove(self): |
794 | + |
795 | +        w = self.watches.add("/foobar", "child", lambda x: 1) |
796 | +        self.assertIn( |
797 | +            "<Watcher child /foobar", |
798 | +            str(w)) |
799 | +        self.assertIn(w, self.watches._watches) |
800 | +        self.watches.remove(w) |
801 | +        self.assertNotIn(w, self.watches._watches) |
802 | +        self.watches.remove(w) |
803 | + |
804 | +    @inlineCallbacks |
805 | +    def test_watch_fire_removes(self): |
806 | +        """Firing the watch removes it from the manager. |
807 | +        """ |
808 | +        w = self.watches.add("/foobar", "child", lambda x: 1) |
809 | +        yield w("a") |
810 | +        self.assertNotIn(w, self.watches._watches) |
811 | + |
812 | +    @inlineCallbacks |
813 | +    def test_watch_fire_with_error_removes(self): |
814 | +        """A watch whose callback raises is still removed when fired. |
815 | +        """ |
816 | +        d = Deferred() |
817 | + |
818 | +        @inlineCallbacks |
819 | +        def cb_error(e): |
820 | +            yield d |
821 | +            raise ValueError("a") |
822 | + |
823 | +        w = self.watches.add("/foobar", "child", cb_error) |
824 | +        try: |
825 | +            wd = w("a") |
826 | +            d.callback(True) |
827 | +            yield wd |
828 | +        except ValueError: |
829 | +            pass |
830 | +        self.assertNotIn(w, self.watches._watches) |
831 | + |
832 | +    @inlineCallbacks |
833 | +    def test_reset_with_error(self): |
834 | +        """A callback firing an error on reset is ignored. |
835 | +        """ |
836 | +        output = self.capture_log("txzk.managed") |
837 | +        d = Deferred() |
838 | +        results = [] |
839 | + |
840 | +        @inlineCallbacks |
841 | +        def callback(*args, **kw): |
842 | +            results.append((args, kw)) |
843 | +            yield d |
844 | +            raise ValueError("a") |
845 | + |
846 | +        w = self.watches.add("/foobar", "child", callback) |
847 | +        reset_done = self.watches.reset() |
848 | + |
849 | +        e, _ = results.pop() |
850 | +        self.assertEqual( |
851 | +            str(ClientEvent(*e)), |
852 | +            "<ClientEvent session at '/foobar' state: connected>") |
853 | +        d.callback(True) |
854 | +        yield reset_done |
855 | +        self.assertNotIn(w, self.watches._watches) |
856 | +        self.assertIn("Error reseting watch", output.getvalue()) |
857 | + |
858 | +    @inlineCallbacks |
859 | +    def test_reset(self): |
860 | +        """Reset fires a synthetic client event, and clears watches. |
861 | +        """ |
862 | +        d = Deferred() |
863 | +        results = [] |
864 | + |
865 | +        def callback(*args, **kw): |
866 | +            results.append((args, kw)) |
867 | +            return d |
868 | + |
869 | +        w = self.watches.add("/foobar", "child", callback) |
870 | +        reset_done = self.watches.reset() |
871 | + |
872 | +        e, _ = results.pop() |
873 | +        self.assertEqual( |
874 | +            str(ClientEvent(*e)), |
875 | +            "<ClientEvent session at '/foobar' state: connected>") |
876 | +        d.callback(True) |
877 | +        yield reset_done |
878 | +        self.assertNotIn(w, self.watches._watches) |
879 | + |
880 | + |
881 | +class SessionClientTests(test_client.ClientTests): |
882 | +    """Run through basic operations with SessionClient.""" |
883 | +    timeout = 5 |
884 | + |
885 | +    def setUp(self): |
886 | +        super(SessionClientTests, self).setUp() |
887 | +        self.client = managed.SessionClient("127.0.0.1:2181") |
888 | + |
889 | +    def test_client_use_while_disconnected_returns_failure(self): |
890 | +        # managed session client reconnects here. |
891 | +        return True |
892 | + |
893 | + |
894 | +class SessionClientExpireTests(ZookeeperTestCase): |
895 | +    """Verify expiration behavior.""" |
896 | + |
897 | +    def setUp(self): |
898 | +        super(SessionClientExpireTests, self).setUp() |
899 | +        self.client = managed.ManagedClient("127.0.0.1:2181", 3000) |
900 | +        self.client2 = None |
901 | +        return self.client.connect() |
902 | + |
903 | +    @inlineCallbacks |
904 | +    def tearDown(self): |
905 | +        self.client.close() |
906 | + |
907 | +        self.client2 = ZookeeperClient("127.0.0.1:2181") |
908 | +        yield self.client2.connect() |
909 | +        utils.deleteTree(handle=self.client2.handle) |
910 | +        yield self.client2.close() |
911 | +        super(SessionClientExpireTests, self).tearDown() |
912 | + |
913 | +    @inlineCallbacks |
914 | +    def expire_session(self): |
915 | +        assert self.client.connected |
916 | +        self.client2 = ZookeeperClient(self.client.servers) |
917 | +        yield self.client2.connect(client_id=self.client.client_id) |
918 | +        yield self.client2.close() |
919 | +        # It takes some time to propagate (1/3 session time as ping) |
920 | +        yield self.sleep(2) |
921 | + |
922 | +    @inlineCallbacks |
923 | +    def test_session_expiration_conn(self): |
924 | +        session_id = self.client.client_id[0] |
925 | +        yield self.client.create("/fo-1", "abc") |
926 | +        yield self.expire_session() |
927 | +        yield self.client.exists("/") |
928 | +        self.assertNotEqual(session_id, self.client.client_id[0]) |
929 | + |
930 | +    @inlineCallbacks |
931 | +    def test_session_expiration_conn_watch(self): |
932 | +        session_id = self.client.client_id[0] |
933 | +        yield self.client.create("/fo-1", "abc") |
934 | +        yield self.expire_session() |
935 | +        yield self.client.exists("/") |
936 | +        self.assertNotEqual(session_id, self.client.client_id[0]) |
937 | + |
938 | +    @inlineCallbacks |
939 | +    def test_invoked_watch_gc(self): |
940 | +        c_d, w_d = yield self.client.get_children_and_watch("/") |
941 | +        yield c_d |
942 | +        yield self.client.create("/foo") |
943 | +        yield w_d |
944 | +        yield self.expire_session() |
945 | +        yield self.client.create("/foo2") |
946 | +        # Nothing should blow up |
947 | +        yield self.sleep(0.2) |
948 | + |
949 | +    @inlineCallbacks |
950 | +    def test_ephemeral_and_watch_recreate(self): |
951 | +        # Create some ephemeral nodes |
952 | +        yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
953 | +        yield self.client.create("/fo-2", "def", flags=zookeeper.EPHEMERAL) |
954 | + |
955 | +        # Create some watches |
956 | +        g_d, g_w_d = self.client.get_and_watch("/fo-1") |
957 | +        yield g_d |
958 | + |
959 | +        c_d, c_w_d = self.client.get_children_and_watch("/") |
960 | +        yield c_d |
961 | + |
962 | +        e_d, e_w_d = self.client.get_children_and_watch("/fo-2") |
963 | +        yield e_d |
964 | + |
965 | +        # Expire the session |
966 | +        yield self.expire_session() |
967 | + |
968 | +        # Poof |
969 | + |
970 | +        # Ephemerals back |
971 | +        c, s = yield self.client.get("/fo-1") |
972 | +        self.assertEqual(c, "abc") |
973 | + |
974 | +        c, s = yield self.client.get("/fo-2") |
975 | +        self.assertEqual(c, "def") |
976 | + |
977 | +        # Watches triggered |
978 | +        yield DeferredList( |
979 | +            [g_w_d, c_w_d, e_w_d], |
980 | +            fireOnOneErrback=True, consumeErrors=True) |
981 | + |
982 | +        self.assertEqual( |
983 | +            [str(d.result) for d in (g_w_d, c_w_d, e_w_d)], |
984 | +            ["<ClientEvent session at '/fo-1' state: connected>", |
985 | +             "<ClientEvent session at '/' state: connected>", |
986 | +             "<ClientEvent session at '/fo-2' state: connected>"]) |
987 | + |
988 | +    @inlineCallbacks |
989 | +    def test_ephemeral_content_modification(self): |
990 | +        yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
991 | +        yield self.client.set("/fo-1", "def") |
992 | +        yield self.expire_session() |
993 | +        c, s = yield self.client.get("/fo-1") |
994 | +        self.assertEqual(c, "def") |
995 | + |
996 | +    @inlineCallbacks |
997 | +    def test_ephemeral_acl_modification(self): |
998 | +        yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
999 | +        acl = [test_client.PUBLIC_ACL, |
1000 | +               dict(scheme="digest", |
1001 | +                    id="zebra:moon", |
1002 | +                    perms=zookeeper.PERM_ALL)] |
1003 | +        yield self.client.set_acl("/fo-1", acl) |
1004 | +        yield self.expire_session() |
1005 | +        n_acl, stat = yield self.client.get_acl("/fo-1") |
1006 | +        self.assertEqual(acl, n_acl) |
1007 | + |
1008 | +    @inlineCallbacks |
1009 | +    def test_ephemeral_deletion(self): |
1010 | +        yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
1011 | +        yield self.client.delete("/fo-1") |
1012 | +        yield self.expire_session() |
1013 | +        self.assertFalse((yield self.client.exists("/fo-1"))) |
1014 | |
1015 | === modified file 'txzookeeper/tests/test_retry.py' |
1016 | --- txzookeeper/tests/test_retry.py 2011-11-08 12:15:16 +0000 |
1017 | +++ txzookeeper/tests/test_retry.py 2012-04-04 18:12:22 +0000 |
1018 | @@ -1,5 +1,5 @@ |
1019 | # |
1020 | -# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
1021 | +# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved |
1022 | # |
1023 | # This file is part of txzookeeper. |
1024 | # |
Reviewers: mp+100715_code.launchpad.net,
Message:
Please take a look.
Description:
Managed Connections for txzk
A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.
https://code.launchpad.net/~hazmat/txzookeeper/managed-watch-and-ephemeral/+merge/100715
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/5970080/
Affected files:
A .bzrignore
A [revision details]
M setup.py
M txzookeeper/client.py
A txzookeeper/managed.py
M txzookeeper/retry.py
M txzookeeper/tests/__init__.py
M txzookeeper/tests/test_client.py
A txzookeeper/tests/test_managed.py
M txzookeeper/tests/test_retry.py
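
For readers skimming the description, the bookkeeping it implies (remember ephemerals, replay them after expiration, then notify outstanding watches with a session event) can be sketched roughly as below. This is a toy, dependency-free illustration and not the code in txzookeeper/managed.py: `ToyManagedState` and all of its method names are hypothetical, callbacks are synchronous rather than Deferred-based, and the ordering (ephemerals first, then watches) is an assumption.

```python
import logging

log = logging.getLogger("toy.managed")
# Keep the example quiet; attach a real handler to see callback errors.
log.addHandler(logging.NullHandler())


class ToyManagedState(object):
    """Hypothetical state a managed client might carry across sessions."""

    def __init__(self):
        self.ephemerals = {}  # path -> latest data for ephemeral nodes
        self.watches = []     # (path, callback) pairs that have not fired

    def created_ephemeral(self, path, data):
        self.ephemerals[path] = data

    def changed(self, path, data):
        # Only nodes created as ephemerals are tracked for replay.
        if path in self.ephemerals:
            self.ephemerals[path] = data

    def deleted(self, path):
        self.ephemerals.pop(path, None)

    def add_watch(self, path, callback):
        entry = (path, callback)
        self.watches.append(entry)
        return entry

    def fire(self, entry, event):
        # Watches are one-shot: drop the entry before calling back so an
        # exception in the callback cannot leave it registered.
        if entry in self.watches:
            self.watches.remove(entry)
        return entry[1](event)

    def on_session_expired(self, create):
        """Replay ephemerals via create(path, data), then reset watches."""
        for path, data in sorted(self.ephemerals.items()):
            create(path, data)
        pending, self.watches = self.watches, []
        for path, callback in pending:
            try:
                # Stand-in for a synthetic "session ... connected" event.
                callback(("session", path, "connected"))
            except Exception:
                log.exception("Error resetting watch on %s", path)


state = ToyManagedState()
state.created_ephemeral("/fo-1", "abc")
state.changed("/fo-1", "def")        # the latest content is what gets replayed
state.created_ephemeral("/fo-2", "x")
state.deleted("/fo-2")               # deleted nodes are not recreated

events, recreated = [], []
state.add_watch("/fo-1", events.append)


def failing(event):
    raise ValueError("a")


state.add_watch("/", failing)        # an erroring watcher must not stop the reset
state.on_session_expired(lambda path, data: recreated.append((path, data)))
```

The usage at the bottom loosely mirrors the scenarios in test_managed.py: modified ephemerals come back with their latest content, deleted ones stay deleted, and a failing watch callback is logged and dropped rather than propagated.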