Merge lp:~hazmat/txzookeeper/backoff-retry-managed-sanity into lp:~hazmat/txzookeeper/trunk
Status: | Merged |
---|---|
Merge reported by: | Kapil Thangavelu |
Merged at revision: | not available |
Proposed branch: | lp:~hazmat/txzookeeper/backoff-retry-managed-sanity |
Merge into: | lp:~hazmat/txzookeeper/trunk |
Diff against target: | 3336 lines (+2225/-241) 27 files modified |
- .bzrignore (+4/-0)
- Makefile (+6/-0)
- debian/changelog (+18/-0)
- setup.py (+6/-3)
- txzookeeper/__init__.py (+5/-2)
- txzookeeper/client.py (+159/-79)
- txzookeeper/lock.py (+6/-3)
- txzookeeper/managed.py (+413/-0)
- txzookeeper/node.py (+7/-26)
- txzookeeper/queue.py (+5/-2)
- txzookeeper/retry.py (+359/-0)
- txzookeeper/tests/__init__.py (+57/-7)
- txzookeeper/tests/common.py (+27/-2)
- txzookeeper/tests/proxy.py (+97/-0)
- txzookeeper/tests/test_client.py (+63/-77)
- txzookeeper/tests/test_conn_failure.py (+194/-0)
- txzookeeper/tests/test_lock.py (+4/-1)
- txzookeeper/tests/test_managed.py (+309/-0)
- txzookeeper/tests/test_node.py (+6/-3)
- txzookeeper/tests/test_queue.py (+5/-2)
- txzookeeper/tests/test_retry.py (+332/-0)
- txzookeeper/tests/test_security.py (+4/-1)
- txzookeeper/tests/test_session.py (+111/-20)
- txzookeeper/tests/test_utils.py (+4/-1)
- txzookeeper/tests/utils.py (+4/-1)
- txzookeeper/todo.txt (+1/-9)
- txzookeeper/utils.py (+19/-2)

To merge this branch: | bzr merge lp:~hazmat/txzookeeper/backoff-retry-managed-sanity |
Related bugs: | |

Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Kapil Thangavelu | | | Pending |

Review via email: mp+146938@code.launchpad.net
Commit message
Description of the change
Session management and retry improvements.
Lots of improvements and several bug fixes.
Previously the managed client was returning the underlying session client instead
of the retry client, so retry wasn't enabled for users trying to catch/store the
client from connect.
The retry code was using seconds for session timeout instead of milliseconds.
The retry code wasn't taking account of the start of retry properly when
determining max retry.
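The two retry fixes above (milliseconds rather than seconds, and measuring from the start of the retry) can be illustrated with a small sketch. This is not the code in txzookeeper/retry.py: the function names `retry_deadline` and `may_retry` are hypothetical, and using the session timeout itself as the retry budget is an assumption made only for illustration.

```python
import time


def retry_deadline(started_at, session_timeout_ms):
    """Return the absolute time (in seconds) after which retrying should stop.

    started_at is the time the first attempt began, in seconds.
    session_timeout_ms is the negotiated session timeout in milliseconds,
    so it is converted to seconds before being added.
    Assumption: the retry budget equals the session timeout.
    """
    return started_at + session_timeout_ms / 1000.0


def may_retry(started_at, session_timeout_ms, now=None):
    """Return True while the retry budget, measured from started_at, remains."""
    if now is None:
        now = time.time()
    return now < retry_deadline(started_at, session_timeout_ms)
```

With a 10000 ms session timeout and a retry that began at t=100s, the deadline is t=110s. Treating the timeout as seconds would instead allow retries for nearly three hours.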
The retry on persistent error wasn't signaling to the managed client to
establish a new session.
The client code wasn't properly cleaning up txzk handles on conn timeouts.
The session establishment now uses the client session events so even without
activity or active watches the session is reestablished.
Session reestablishment will now back off on reconnects for up to 6 minutes.
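As a rough illustration of the reconnect backoff, the sketch below uses the two constants that managed.py defines in this diff (`BACKOFF_INCREMENT = 10`, `MAX_BACKOFF = 360`). The units are presumably seconds, since 360 s matches the 6 minute cap described above. The linear growth and the helper names `next_backoff` and `backoff_schedule` are assumptions; the actual policy in managed.py may differ.

```python
BACKOFF_INCREMENT = 10  # value from managed.py in this diff; presumably seconds
MAX_BACKOFF = 360       # value from managed.py in this diff; 6 minutes


def next_backoff(current):
    """Return the next reconnect delay, capped at MAX_BACKOFF.

    Assumption: the delay grows linearly by BACKOFF_INCREMENT per attempt.
    """
    return min(current + BACKOFF_INCREMENT, MAX_BACKOFF)


def backoff_schedule(attempts):
    """Return the list of delays used for the first `attempts` reconnects."""
    delays = []
    delay = 0
    for _ in range(attempts):
        delay = next_backoff(delay)
        delays.append(delay)
    return delays
```

Under these assumptions the first three reconnect delays are 10, 20 and 30, and every delay from the 36th attempt onward stays at 360.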
Kapil Thangavelu (hazmat) wrote : | # |
Please take a look.
- 52. By Kapil Thangavelu
-
much better session recovery and retry handling
Kapil Thangavelu (hazmat) wrote : | # |
Please take a look.
Kapil Thangavelu (hazmat) wrote : | # |
for reasons unknown the diff on this is very wrong; all the revs other than 52 are already on trunk.
Benjamin Saller (bcsaller) wrote : | # |
Hi,
Thanks for the branch. Initially I had an older version of pyzookeeper in my path and the tests were segfaulting, but after tracking that down I was able to get things working smoothly.
Coverage shows that the session backoff code isn't tested, if I'm reading it properly self._backoff_
It looks like retry_error_
Nice simplification in node.py.
This looks good but I would like to see a few more tests around the retry path.
- 53. By Kapil Thangavelu
-
app errors don't trigger reconnect
Kapil Thangavelu (hazmat) wrote : | # |
the tests for backoff are in test_session, which operates a full zk cluster
and has a test case that verifies backoff has run. there's a new makefile
with one target, make coverage, that runs through the full test suite. in
testing with juju environments, i discovered another minor issue with
retry.
thanks for the review.
-k
On Wed, Feb 6, 2013 at 7:15 PM, Benjamin Saller <
<email address hidden>> wrote:
> Hi,
>
> Thanks for the branch. Initially I had an older version of pyzookeeper in
> my path and the tests were segfaulting, but after tracking that down I was
> able to get things working smoothly.
>
> Coverage shows that the session backoff code isn't tested, if I'm reading
> it properly self._backoff_
> some of the focus of this branch goes without regression testing.
>
> It looks like retry_error_
> tests either.
>
> Nice simplification in node.py.
>
> This looks good but I would like to see a few more tests around the retry
> path.
- 54. By Kapil Thangavelu
-
argh.. conditional fail
- 55. By Kapil Thangavelu
-
prevent forced reconnect herds
Mikael Uvebrandt (mikael-uvebrandt) wrote : | # |
Would love to see this in a release soon - we've seen this reconnect hammering in production, and it's conceivable that it contributed to killing the zookeeper (3.3.3) cluster in one instance (open files limit hit).
- 56. By Kapil Thangavelu
-
incr rev
- 57. By Kapil Thangavelu
-
fix typo in reconnect test assert, cleanup herd logic
- 58. By Kapil Thangavelu
-
work around libzk bug to make session expiration tests more reliable
Kapil Thangavelu (hazmat) wrote : | # |
Merged to trunk.
Preview Diff
1 | === added file '.bzrignore' |
2 | --- .bzrignore 1970-01-01 00:00:00 +0000 |
3 | +++ .bzrignore 2013-05-15 13:35:05 +0000 |
4 | @@ -0,0 +1,4 @@ |
5 | +.emacs.desktop |
6 | +_trial_temp |
7 | +txzookeeper |
8 | +txzookeeper.egg-info |
9 | |
10 | === added file 'Makefile' |
11 | --- Makefile 1970-01-01 00:00:00 +0000 |
12 | +++ Makefile 2013-05-15 13:35:05 +0000 |
13 | @@ -0,0 +1,6 @@ |
14 | +COVERAGE_FILES=`find txzookeeper -name "*py" | grep -v "tests"` |
15 | + |
16 | +coverage: |
17 | + python -c "import coverage as c; c.main()" run /usr/bin/trial txzookeeper |
18 | + python -c "import coverage as c; c.main()" html -d htmlcov $(COVERAGE_FILES) |
19 | + gnome-open htmlcov/index.html |
20 | \ No newline at end of file |
21 | |
22 | === modified file 'debian/changelog' |
23 | --- debian/changelog 2011-07-11 21:29:44 +0000 |
24 | +++ debian/changelog 2013-05-15 13:35:05 +0000 |
25 | @@ -1,3 +1,21 @@ |
26 | +txzookeeper (0.9.8-0ubuntu1) raring; urgency=low |
27 | + |
28 | + * Better reconnect/retry behavior. |
29 | + |
30 | + -- kapil <kapil@objectrealms.net> Mon, 6 May 2013 08:54:08 -0400 |
31 | + |
32 | +txzookeeper (0.9.7-0ubuntu1) quantal; urgency=low |
33 | + |
34 | + * Path included on exceptions, connection close bugfix |
35 | + |
36 | + -- kapil <kapil@objectrealms.net> Wed, 19 Sep 2012 08:54:08 -0400 |
37 | + |
38 | +txzookeeper (0.9.6-0ubuntu3) quantal; urgency=low |
39 | + |
40 | + * managed client implementation with auto session expiration behavior |
41 | + |
42 | + -- kapil <kapil@objectrealms.net> Fri, 22 Jun 2012 03:00:08 -0400 |
43 | + |
44 | txzookeeper (0.8.0-0ubuntu1) natty; urgency=low |
45 | |
46 | * Much improved session event and connection error handling (and cluster tests) |
47 | |
48 | === modified file 'setup.py' |
49 | --- setup.py 2011-06-17 23:52:34 +0000 |
50 | +++ setup.py 2013-05-15 13:35:05 +0000 |
51 | @@ -3,6 +3,9 @@ |
52 | # |
53 | # This file is part of txzookeeper. |
54 | # |
55 | +# Authors: |
56 | +# Kapil Thangavelu |
57 | +# |
58 | # txzookeeper is free software: you can redistribute it and/or modify |
59 | # it under the terms of the GNU Lesser General Public License as published by |
60 | # the Free Software Foundation, either version 3 of the License, or |
61 | @@ -42,8 +45,8 @@ |
62 | name="txzookeeper", |
63 | version=version, |
64 | description="Twisted api for Apache Zookeeper", |
65 | - author="Ensemble Developers", |
66 | - author_email="ensemble@lists.ubuntu.com", |
67 | + author="Juju Developers", |
68 | + author_email="juju@lists.ubuntu.com", |
69 | url="https://launchpad.net/txzookeeper", |
70 | license="LGPL", |
71 | packages=find_packages(), |
72 | @@ -55,6 +58,6 @@ |
73 | "Intended Audience :: Information Technology", |
74 | "Programming Language :: Python", |
75 | "Topic :: Database", |
76 | - "License :: OSI Approved :: MIT License", |
77 | + "License :: OSI Approved :: GNU Library or Lesser General Public License (LGPL)" |
78 | ], |
79 | ) |
80 | |
81 | === modified file 'txzookeeper/__init__.py' |
82 | --- txzookeeper/__init__.py 2011-07-06 02:10:17 +0000 |
83 | +++ txzookeeper/__init__.py 2013-05-15 13:35:05 +0000 |
84 | @@ -1,8 +1,11 @@ |
85 | # |
86 | -# Copyright (C) 2010, 2011 Canonical Ltd. All Rights Reserved |
87 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
88 | # |
89 | # This file is part of txzookeeper. |
90 | # |
91 | +# Authors: |
92 | +# Kapil Thangavelu |
93 | +# |
94 | # txzookeeper is free software: you can redistribute it and/or modify |
95 | # it under the terms of the GNU Lesser General Public License as published by |
96 | # the Free Software Foundation, either version 3 of the License, or |
97 | @@ -22,4 +25,4 @@ |
98 | __all__ = ["ZookeeperClient"] |
99 | |
100 | # Remember to update debian/changelog as well, for the daily build. |
101 | -version = "0.8.0" |
102 | +version = "0.9.8" |
103 | |
104 | === modified file 'txzookeeper/client.py' |
105 | --- txzookeeper/client.py 2011-06-30 20:16:17 +0000 |
106 | +++ txzookeeper/client.py 2013-05-15 13:35:05 +0000 |
107 | @@ -1,8 +1,11 @@ |
108 | # |
109 | -# Copyright (C) 2010, 2011 Canonical Ltd. All Rights Reserved |
110 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
111 | # |
112 | # This file is part of txzookeeper. |
113 | # |
114 | +# Authors: |
115 | +# Kapil Thangavelu |
116 | +# |
117 | # txzookeeper is free software: you can redistribute it and/or modify |
118 | # it under the terms of the GNU Lesser General Public License as published by |
119 | # the Free Software Foundation, either version 3 of the License, or |
120 | @@ -33,6 +36,9 @@ |
121 | "scheme": "world", |
122 | "id": "anyone"} |
123 | |
124 | +# Skip acls for policy objects @ a higher level |
125 | +SKIP_ACLS = object() |
126 | + |
127 | # Map result codes to exceptions classes. |
128 | ERROR_MAPPING = { |
129 | zookeeper.APIERROR: zookeeper.ApiErrorException, |
130 | @@ -67,6 +73,7 @@ |
131 | zookeeper.CONNECTED_STATE: "connected", |
132 | zookeeper.CONNECTING_STATE: "connecting", |
133 | zookeeper.EXPIRED_SESSION_STATE: "expired", |
134 | + None: "unknown", |
135 | } |
136 | |
137 | # Mapping of event type to human string. |
138 | @@ -76,7 +83,9 @@ |
139 | zookeeper.CREATED_EVENT: "created", |
140 | zookeeper.DELETED_EVENT: "deleted", |
141 | zookeeper.CHANGED_EVENT: "changed", |
142 | - zookeeper.CHILD_EVENT: "child"} |
143 | + zookeeper.CHILD_EVENT: "child", |
144 | + None: "unknown", |
145 | + } |
146 | |
147 | |
148 | class NotConnectedException(zookeeper.ZooKeeperException): |
149 | @@ -99,9 +108,14 @@ |
150 | def type_name(self): |
151 | return TYPE_NAME_MAPPING[self.args[1]] |
152 | |
153 | + @property |
154 | + def handle(self): |
155 | + return self.args[3] |
156 | + |
157 | def __str__(self): |
158 | - return "<txzookeeper.ConnectionException type: %s state: %s>" % ( |
159 | - self.type_name, self.state_name) |
160 | + return ( |
161 | + "<txzookeeper.ConnectionException handle: %s type: %s state: %s>" |
162 | + % (self.handle, self.type_name, self.state_name)) |
163 | |
164 | |
165 | def is_connection_exception(e): |
166 | @@ -122,7 +136,8 @@ |
167 | """ |
168 | |
169 | |
170 | -class ClientEvent(namedtuple("ClientEvent", 'type, connection_state, path')): |
171 | +class ClientEvent(namedtuple("ClientEvent", |
172 | + 'type, connection_state, path, handle')): |
173 | """ |
174 | A client event is returned when a watch deferred fires. It denotes |
175 | some event on the zookeeper client that the watch was requested on. |
176 | @@ -137,8 +152,8 @@ |
177 | return STATE_NAME_MAPPING[self.connection_state] |
178 | |
179 | def __repr__(self): |
180 | - return "<ClientEvent %s at %r state: %s>" % ( |
181 | - self.type_name, self.path, self.state_name) |
182 | + return "<ClientEvent %s at %r state: %s handle:%s>" % ( |
183 | + self.type_name, self.path, self.state_name, self.handle) |
184 | |
185 | |
186 | class ZookeeperClient(object): |
187 | @@ -166,12 +181,21 @@ |
188 | self.connected = False |
189 | self.handle = None |
190 | |
191 | + def __repr__(self): |
192 | + if not self.client_id: |
193 | + session_id = "" |
194 | + else: |
195 | + session_id = self.client_id[0] |
196 | + |
197 | + return "<%s session: %s handle: %r state: %s>" % ( |
198 | + self.__class__.__name__, session_id, self.handle, self.state) |
199 | + |
200 | def _check_connected(self, d): |
201 | if not self.connected: |
202 | d.errback(NotConnectedException("not connected")) |
203 | return d |
204 | |
205 | - def _check_result(self, result_code, deferred, extra_codes=()): |
206 | + def _check_result(self, result_code, deferred, extra_codes=(), path=None): |
207 | """Check an API call or result for errors. |
208 | |
209 | :param result_code: The api result code. |
210 | @@ -184,18 +208,18 @@ |
211 | error = None |
212 | if not result_code == zookeeper.OK and not result_code in extra_codes: |
213 | error_msg = zookeeper.zerror(result_code) |
214 | + if path is not None: |
215 | + error_msg += " %s" % path |
216 | error_class = ERROR_MAPPING.get( |
217 | result_code, zookeeper.ZooKeeperException) |
218 | error = error_class(error_msg) |
219 | |
220 | if is_connection_exception(error): |
221 | - # Mark the client as disconnected. |
222 | - self.connected = False |
223 | # Route connection errors to a connection level error |
224 | # handler if specified. |
225 | if self._connection_error_callback: |
226 | # The result of the connection error handler is returned |
227 | - # to the api. |
228 | + # to the api invoker. |
229 | d = defer.maybeDeferred( |
230 | self._connection_error_callback, |
231 | self, error) |
232 | @@ -212,14 +236,14 @@ |
233 | return d |
234 | |
235 | def _cb_get(result_code, value, stat): |
236 | - if self._check_result(result_code, d): |
237 | + if self._check_result(result_code, d, path=path): |
238 | return |
239 | d.callback((value, stat)) |
240 | |
241 | callback = self._zk_thread_callback(_cb_get) |
242 | - watcher = self._wrap_watcher(watcher) |
243 | + watcher = self._wrap_watcher(watcher, "get", path) |
244 | result = zookeeper.aget(self.handle, path, watcher, callback) |
245 | - self._check_result(result, d) |
246 | + self._check_result(result, d, path=path) |
247 | return d |
248 | |
249 | def _get_children(self, path, watcher): |
250 | @@ -228,14 +252,14 @@ |
251 | return d |
252 | |
253 | def _cb_get_children(result_code, children): |
254 | - if self._check_result(result_code, d): |
255 | + if self._check_result(result_code, d, path=path): |
256 | return |
257 | d.callback(children) |
258 | |
259 | callback = self._zk_thread_callback(_cb_get_children) |
260 | - watcher = self._wrap_watcher(watcher) |
261 | + watcher = self._wrap_watcher(watcher, "child", path) |
262 | result = zookeeper.aget_children(self.handle, path, watcher, callback) |
263 | - self._check_result(result, d) |
264 | + self._check_result(result, d, path=path) |
265 | return d |
266 | |
267 | def _exists(self, path, watcher): |
268 | @@ -245,17 +269,17 @@ |
269 | |
270 | def _cb_exists(result_code, stat): |
271 | if self._check_result( |
272 | - result_code, d, extra_codes=(zookeeper.NONODE,)): |
273 | + result_code, d, extra_codes=(zookeeper.NONODE,), path=path): |
274 | return |
275 | d.callback(stat) |
276 | |
277 | callback = self._zk_thread_callback(_cb_exists) |
278 | - watcher = self._wrap_watcher(watcher) |
279 | + watcher = self._wrap_watcher(watcher, "exists", path) |
280 | result = zookeeper.aexists(self.handle, path, watcher, callback) |
281 | - self._check_result(result, d) |
282 | + self._check_result(result, d, path=path) |
283 | return d |
284 | |
285 | - def _wrap_watcher(self, watcher): |
286 | + def _wrap_watcher(self, watcher, watch_type, path): |
287 | if watcher is None: |
288 | return watcher |
289 | if not callable(watcher): |
290 | @@ -273,18 +297,31 @@ |
291 | if event_type == zookeeper.SESSION_EVENT: |
292 | if self._session_event_callback: |
293 | self._session_event_callback( |
294 | - self, ClientEvent(event_type, conn_state, path)) |
295 | + self, ClientEvent( |
296 | + event_type, conn_state, path, self.handle)) |
297 | + # We do propagate to watch deferreds, in one case in |
298 | + # particular, namely if the session is expired, in which |
299 | + # case the watches are dead, and we send an appropriate |
300 | + # error. |
301 | + if conn_state == zookeeper.EXPIRED_SESSION_STATE: |
302 | + error = zookeeper.SessionExpiredException("Session expired") |
303 | + return watcher(None, None, None, error=error) |
304 | else: |
305 | return watcher(event_type, conn_state, path) |
306 | |
307 | - def _zk_thread_callback(self, func): |
308 | + def _zk_thread_callback(self, func, *f_args, **f_kw): |
309 | """ |
310 | The client library invokes callbacks in a separate thread, we wrap |
311 | any user defined callback so that they are called back in the main |
312 | thread after, zookeeper calls the wrapper. |
313 | """ |
314 | + f_args = list(f_args) |
315 | + |
316 | def wrapper(handle, *args): # pragma: no cover |
317 | - reactor.callFromThread(func, *args) |
318 | + # make a copy, the conn watch callback gets invoked multiple times |
319 | + cb_args = list(f_args) |
320 | + cb_args.extend(args) |
321 | + reactor.callFromThread(func, *cb_args, **f_kw) |
322 | return wrapper |
323 | |
324 | @property |
325 | @@ -298,7 +335,8 @@ |
326 | @property |
327 | def session_timeout(self): |
328 | """ |
329 | - What's the negotiated session timeout for this connection, in seconds. |
330 | + The negotiated session timeout for this connection, in milliseconds. |
331 | + |
332 | If the client is not connected the value is None. |
333 | """ |
334 | if self.connected: |
335 | @@ -326,7 +364,11 @@ |
336 | """ |
337 | if self.handle is None: |
338 | return None |
339 | - return zookeeper.client_id(self.handle) |
340 | + try: |
341 | + return zookeeper.client_id(self.handle) |
342 | + # Invalid handle |
343 | + except zookeeper.ZooKeeperException: |
344 | + return None |
345 | |
346 | @property |
347 | def unrecoverable(self): |
348 | @@ -334,7 +376,11 @@ |
349 | Boolean value representing whether the current connection can be |
350 | recovered. |
351 | """ |
352 | - return bool(zookeeper.is_unrecoverable(self.handle)) |
353 | + try: |
354 | + return bool(zookeeper.is_unrecoverable(self.handle)) |
355 | + except zookeeper.ZooKeeperException: |
356 | + # guard against invalid handles |
357 | + return True |
358 | |
359 | def add_auth(self, scheme, identity): |
360 | """Adds an authentication identity to this connection. |
361 | @@ -368,15 +414,21 @@ |
362 | @param force: boolean, require the connection to be closed now or |
363 | an exception be raised. |
364 | """ |
365 | - if not self.connected: |
366 | - return |
367 | - |
368 | - result = zookeeper.close(self.handle) |
369 | self.connected = False |
370 | + |
371 | + if self.handle is None: |
372 | + return |
373 | + |
374 | + try: |
375 | + result = zookeeper.close(self.handle) |
376 | + except zookeeper.ZooKeeperException: |
377 | + self.handle = None |
378 | + return |
379 | + |
380 | d = defer.Deferred() |
381 | - |
382 | if self._check_result(result, d): |
383 | return d |
384 | + self.handle = None |
385 | d.callback(True) |
386 | return d |
387 | |
388 | @@ -401,6 +453,12 @@ |
389 | |
390 | # Use a scheduled function to ensure a timeout. |
391 | def _check_timeout(): |
392 | + # Close the handle |
393 | + try: |
394 | + if self.handle is not None: |
395 | + zookeeper.close(self.handle) |
396 | + except zookeeper.ZooKeeperException: |
397 | + pass |
398 | d.errback( |
399 | ConnectionTimeoutException("could not connect before timeout")) |
400 | |
401 | @@ -416,14 +474,13 @@ |
402 | if servers is not None: |
403 | self._servers = servers |
404 | |
405 | - # Assemble client id if specified. |
406 | + # Use client id if specified. |
407 | if client_id: |
408 | self.handle = zookeeper.init( |
409 | self._servers, callback, self._session_timeout, client_id) |
410 | else: |
411 | self.handle = zookeeper.init( |
412 | self._servers, callback, self._session_timeout) |
413 | - |
414 | return d |
415 | |
416 | def _cb_connected( |
417 | @@ -446,15 +503,20 @@ |
418 | # If we timed out and then connected, then close the conn. |
419 | if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called: |
420 | self.close() |
421 | + self.handle = -1 |
422 | + return |
423 | |
424 | # Send session events to the callback, in addition to any |
425 | # duplicate session events that will be sent for extant watches. |
426 | if self._session_event_callback: |
427 | self._session_event_callback( |
428 | - self, ClientEvent(type, state, path)) |
429 | + self, ClientEvent(type, state, path, self.handle)) |
430 | |
431 | return |
432 | - elif state == zookeeper.CONNECTED_STATE: |
433 | + # Connected successfully, or If we're expired on an initial |
434 | + # connect, someone else expired us. |
435 | + elif state in (zookeeper.CONNECTED_STATE, |
436 | + zookeeper.EXPIRED_SESSION_STATE): |
437 | connect_deferred.callback(self) |
438 | return |
439 | |
440 | @@ -470,21 +532,25 @@ |
441 | @params acls: A list of dictionaries specifying permissions. |
442 | @params flags: Node creation flags (ephemeral, sequence, persistent) |
443 | """ |
444 | + if acls == SKIP_ACLS: |
445 | + acls = [ZOO_OPEN_ACL_UNSAFE] |
446 | + |
447 | d = defer.Deferred() |
448 | if self._check_connected(d): |
449 | return d |
450 | |
451 | - def _cb_created(result_code, path): |
452 | - if self._check_result(result_code, d): |
453 | - return |
454 | - d.callback(path) |
455 | - |
456 | - callback = self._zk_thread_callback(_cb_created) |
457 | + callback = self._zk_thread_callback( |
458 | + self._cb_created, d, data, acls, flags) |
459 | result = zookeeper.acreate( |
460 | self.handle, path, data, acls, flags, callback) |
461 | - self._check_result(result, d) |
462 | + self._check_result(result, d, path=path) |
463 | return d |
464 | |
465 | + def _cb_created(self, d, data, acls, flags, result_code, path): |
466 | + if self._check_result(result_code, d, path=path): |
467 | + return |
468 | + d.callback(path) |
469 | + |
470 | def delete(self, path, version=-1): |
471 | """ |
472 | Delete the node at the given path. If the current node version on the |
473 | @@ -496,19 +562,17 @@ |
474 | @param version: the integer version of the node. |
475 | """ |
476 | d = defer.Deferred() |
477 | - if self._check_connected(d): |
478 | - return d |
479 | - |
480 | - def _cb_delete(result_code): |
481 | - if self._check_result(result_code, d): |
482 | - return |
483 | - d.callback(result_code) |
484 | - |
485 | - callback = self._zk_thread_callback(_cb_delete) |
486 | + |
487 | + callback = self._zk_thread_callback(self._cb_deleted, d, path) |
488 | result = zookeeper.adelete(self.handle, path, version, callback) |
489 | - self._check_result(result, d) |
490 | + self._check_result(result, d, path=path) |
491 | return d |
492 | |
493 | + def _cb_deleted(self, d, path, result_code): |
494 | + if self._check_result(result_code, d, path=path): |
495 | + return |
496 | + d.callback(result_code) |
497 | + |
498 | def exists(self, path): |
499 | """ |
500 | Check that the given node path exists. Returns a deferred that |
501 | @@ -531,8 +595,12 @@ |
502 | """ |
503 | d = defer.Deferred() |
504 | |
505 | - def watcher(*args): |
506 | - d.callback(ClientEvent(*args)) |
507 | + def watcher(event_type, conn_state, path, error=None): |
508 | + if error: |
509 | + d.errback(error) |
510 | + else: |
511 | + d.callback(ClientEvent( |
512 | + event_type, conn_state, path, self.handle)) |
513 | return self._exists(path, watcher), d |
514 | |
515 | def get(self, path): |
516 | @@ -556,8 +624,12 @@ |
517 | """ |
518 | d = defer.Deferred() |
519 | |
520 | - def watcher(*args): |
521 | - d.callback(ClientEvent(*args)) |
522 | + def watcher(event_type, conn_state, path, error=None): |
523 | + if error: |
524 | + d.errback(error) |
525 | + else: |
526 | + d.callback(ClientEvent( |
527 | + event_type, conn_state, path, self.handle)) |
528 | return self._get(path, watcher), d |
529 | |
530 | def get_children(self, path): |
531 | @@ -580,8 +652,12 @@ |
532 | """ |
533 | d = defer.Deferred() |
534 | |
535 | - def watcher(*args): |
536 | - d.callback(ClientEvent(*args)) |
537 | + def watcher(event_type, conn_state, path, error=None): |
538 | + if error: |
539 | + d.errback(error) |
540 | + else: |
541 | + d.callback(ClientEvent( |
542 | + event_type, conn_state, path, self.handle)) |
543 | return self._get_children(path, watcher), d |
544 | |
545 | def get_acl(self, path): |
546 | @@ -598,13 +674,13 @@ |
547 | return d |
548 | |
549 | def _cb_get_acl(result_code, acls, stat): |
550 | - if self._check_result(result_code, d): |
551 | + if self._check_result(result_code, d, path=path): |
552 | return |
553 | d.callback((acls, stat)) |
554 | |
555 | callback = self._zk_thread_callback(_cb_get_acl) |
556 | result = zookeeper.aget_acl(self.handle, path, callback) |
557 | - self._check_result(result, d) |
558 | + self._check_result(result, d, path=path) |
559 | return d |
560 | |
561 | def set_acl(self, path, acls, version=-1): |
562 | @@ -634,17 +710,17 @@ |
563 | if self._check_connected(d): |
564 | return d |
565 | |
566 | - def _cb_set_acl(result_code): |
567 | - if self._check_result(result_code, d): |
568 | - return |
569 | - d.callback(result_code) |
570 | - |
571 | - callback = self._zk_thread_callback(_cb_set_acl) |
572 | + callback = self._zk_thread_callback(self._cb_set_acl, d, path, acls) |
573 | result = zookeeper.aset_acl( |
574 | self.handle, path, version, acls, callback) |
575 | - self._check_result(result, d) |
576 | + self._check_result(result, d, path=path) |
577 | return d |
578 | |
579 | + def _cb_set_acl(self, d, path, acls, result_code): |
580 | + if self._check_result(result_code, d, path=path): |
581 | + return |
582 | + d.callback(result_code) |
583 | + |
584 | def set(self, path, data="", version=-1): |
585 | """ |
586 | Sets the data of a node at the given path. If the current node version |
587 | @@ -660,16 +736,16 @@ |
588 | if self._check_connected(d): |
589 | return d |
590 | |
591 | - def _cb_set(result_code, node_stat): |
592 | - if self._check_result(result_code, d): |
593 | - return |
594 | - d.callback(node_stat) |
595 | - |
596 | - callback = self._zk_thread_callback(_cb_set) |
597 | + callback = self._zk_thread_callback(self._cb_set, d, path, data) |
598 | result = zookeeper.aset(self.handle, path, data, version, callback) |
599 | - self._check_result(result, d) |
600 | + self._check_result(result, d, path=path) |
601 | return d |
602 | |
603 | + def _cb_set(self, d, path, data, result_code, node_stat): |
604 | + if self._check_result(result_code, d, path=path): |
605 | + return |
606 | + d.callback(node_stat) |
607 | + |
608 | def set_connection_watcher(self, watcher): |
609 | """ |
610 | Sets a permanent global watcher on the connection. This will get |
611 | @@ -679,7 +755,7 @@ |
612 | """ |
613 | if not callable(watcher): |
614 | raise SyntaxError("Invalid Watcher %r" % (watcher)) |
615 | - watcher = self._wrap_watcher(watcher) |
616 | + watcher = self._wrap_watcher(watcher, None, None) |
617 | zookeeper.set_watcher(self.handle, watcher) |
618 | |
619 | def set_session_callback(self, callback): |
620 | @@ -717,9 +793,13 @@ |
621 | """ |
622 | if not callable(callback): |
623 | raise TypeError("Invalid callback %r" % callback) |
624 | + if self._connection_error_callback is not None: |
625 | + raise RuntimeError(( |
626 | + "Connection error handlers can't be changed %s" % |
627 | + self._connection_error_callback)) |
628 | self._connection_error_callback = callback |
629 | |
630 | - def set_determinstic_order(self, boolean): |
631 | + def set_deterministic_order(self, boolean): |
632 | """ |
633 | The zookeeper client will by default randomize the server hosts |
634 | it will connect to unless this is set to True. |
635 | @@ -738,11 +818,11 @@ |
636 | return d |
637 | |
638 | def _cb_sync(result_code, path): |
639 | - if self._check_result(result_code, d): |
640 | + if self._check_result(result_code, d, path=path): |
641 | return |
642 | d.callback(path) |
643 | |
644 | callback = self._zk_thread_callback(_cb_sync) |
645 | result = zookeeper.async(self.handle, path, callback) |
646 | - self._check_result(result, d) |
647 | + self._check_result(result, d, path=path) |
648 | return d |
649 | |
650 | === modified file 'txzookeeper/lock.py' |
651 | --- txzookeeper/lock.py 2011-06-17 23:52:34 +0000 |
652 | +++ txzookeeper/lock.py 2013-05-15 13:35:05 +0000 |
653 | @@ -1,8 +1,11 @@ |
654 | # |
655 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
656 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
657 | # |
658 | # This file is part of txzookeeper. |
659 | # |
660 | +# Authors: |
661 | +# Kapil Thangavelu |
662 | +# |
663 | # txzookeeper is free software: you can redistribute it and/or modify |
664 | # it under the terms of the GNU Lesser General Public License as published by |
665 | # the Free Software Foundation, either version 3 of the License, or |
666 | @@ -18,7 +21,7 @@ |
667 | # |
668 | |
669 | import zookeeper |
670 | -from twisted.internet.defer import Deferred, fail |
671 | +from twisted.internet.defer import fail |
672 | |
673 | |
674 | class LockError(Exception): |
675 | @@ -133,7 +136,7 @@ |
676 | """Release the lock.""" |
677 | |
678 | if not self._acquired: |
679 | - error = LockError("Not holding lock %s"%(self.path)) |
680 | + error = LockError("Not holding lock %s" % (self.path)) |
681 | return fail(error) |
682 | |
683 | d = self._client.delete(self._candidate_path) |
684 | |
685 | === added file 'txzookeeper/managed.py' |
686 | --- txzookeeper/managed.py 1970-01-01 00:00:00 +0000 |
687 | +++ txzookeeper/managed.py 2013-05-15 13:35:05 +0000 |
688 | @@ -0,0 +1,413 @@ |
689 | + |
690 | +from functools import partial |
691 | + |
692 | +import contextlib |
693 | +import logging |
694 | +import time |
695 | +import zookeeper |
696 | + |
697 | +from twisted.internet.defer import ( |
698 | + inlineCallbacks, DeferredLock, fail, returnValue, Deferred) |
699 | + |
700 | +from client import ( |
701 | + ZookeeperClient, ClientEvent, NotConnectedException, |
702 | + ConnectionTimeoutException) |
703 | +from retry import RetryClient, is_session_error |
704 | +from utils import sleep |
705 | + |
706 | + |
707 | +class StopWatcher(Exception): |
708 | + pass |
709 | + |
710 | +BACKOFF_INCREMENT = 10 |
711 | +MAX_BACKOFF = 360 |
712 | + |
713 | +WATCH_KIND_MAP = { |
714 | + "child": "get_children_and_watch", |
715 | + "exists": "exists_and_watch", |
716 | + "get": "get_and_watch"} |
717 | + |
718 | + |
719 | +log = logging.getLogger("txzk.managed") |
720 | +#log.setLevel(logging.INFO) |
721 | + |
722 | + |
723 | +class Watch(object): |
724 | + """ |
725 | + For application driven persistent watches, where the application |
726 | + is manually resetting the watch. |
727 | + """ |
728 | + |
729 | + __slots__ = ("_mgr", "_client", "_path", "_kind", "_callback") |
730 | + |
731 | + def __init__(self, mgr, path, kind, callback): |
732 | + self._mgr = mgr |
733 | + self._path = path |
734 | + self._kind = kind |
735 | + self._callback = callback |
736 | + |
737 | + @property |
738 | + def path(self): |
739 | + return self._path |
740 | + |
741 | + @property |
742 | + def kind(self): |
743 | + return self._kind |
744 | + |
745 | + @contextlib.contextmanager |
746 | + def _ctx(self): |
747 | + mgr = self._mgr |
748 | + del self._mgr |
749 | + try: |
750 | + yield mgr |
751 | + finally: |
752 | + mgr.remove(self) |
753 | + |
754 | + @inlineCallbacks |
755 | + def reset(self): |
756 | + with self._ctx(): |
757 | + yield self._callback( |
758 | + zookeeper.SESSION_EVENT, |
759 | + zookeeper.CONNECTED_STATE, |
760 | + self._path) |
761 | + |
762 | + def __call__(self, *args, **kw): |
763 | + with self._ctx(): |
764 | + return self._callback(*args, **kw) |
765 | + |
766 | + def __str__(self): |
767 | + return "<Watcher %s %s %r>" % (self.kind, self.path, self._callback) |
768 | + |
769 | + |
770 | +class WatchManager(object): |
771 | + |
772 | + watch_class = Watch |
773 | + |
774 | + def __init__(self): |
775 | + self._watches = [] |
776 | + |
777 | + def add(self, path, watch_type, watcher): |
778 | + w = self.watch_class(self, path, watch_type, watcher) |
779 | + self._watches.append(w) |
780 | + return w |
781 | + |
782 | + def remove(self, w): |
783 | + try: |
784 | + self._watches.remove(w) |
785 | + except ValueError: |
786 | + pass |
787 | + |
788 | + def iterkeys(self): |
789 | + for w in self._watches: |
790 | + yield (w.path, w.kind) |
791 | + |
792 | + def clear(self): |
793 | + del self._watches |
794 | + self._watches = [] |
795 | + |
796 | + @inlineCallbacks |
797 | + def reset(self, *ignored): |
798 | + watches = self._watches |
799 | + self._watches = [] |
800 | + |
801 | + for w in watches: |
802 | + try: |
803 | + yield w.reset() |
804 | + except Exception, e: |
805 | + log.error("Error reseting watch %s with session event. %s %r", |
806 | + w, e, e) |
807 | + continue |
808 | + |
809 | + |
810 | +class SessionClient(ZookeeperClient): |
811 | + """A managed client that automatically re-establishes ephemerals and |
812 | + triggers watches after reconnecting post session expiration. |
813 | + |
814 | + This abstracts the client from session expiration handling. It does |
815 | + come at a cost though. |
816 | + |
817 | + There are two application constraints that need to be considered for usage |
818 | + of the SessionClient or ManagedClient. The first is that watch callbacks |
819 | + which examine the event, must be able to handle the synthetic session |
820 | + event which is sent to them when the session is re-established. |
821 | + |
822 | + The second and more problematic is that algorithms/patterns |
823 | + utilizing ephemeral sequence nodes need to be rethought, as the |
824 | + session client will recreate the nodes when reconnecting at their |
825 | + previous paths. Some algorithms (like the std zk lock recipe) rely |
826 | + on properties like the smallest valued ephemeral sequence node in |
827 | + a container to identify the lock holder, with the notion that upon |
828 | + session expiration a new lock/leader will be sought. Sequence |
829 | + ephemeral node recreation in this context is problematic as the |
830 | + node is recreated at the exact previous path. Alternative lock |
831 | + strategies that do work are fairly simple at low volume, such as |
832 | + owning a particular node path (i.e. /locks/lock-holder) with an |
833 | + ephemeral. |
834 | + |
835 | + As a result the session client only tracks and re-establishes non-sequence |
836 | + ephemeral nodes. For coordination around ephemeral sequence nodes it |
837 | + provides for watching for the establishment of new sessions via |
838 | + `subscribe_new_session` |
839 | + """ |
840 | + |
841 | + def __init__(self, servers=None, session_timeout=None, |
842 | + connect_timeout=4000): |
843 | + """ |
844 | + """ |
845 | + super(SessionClient, self).__init__(servers, session_timeout) |
846 | + self._connect_timeout = connect_timeout |
847 | + self._watches = WatchManager() |
848 | + self._ephemerals = {} |
849 | + self._session_notifications = [] |
850 | + self._reconnect_lock = DeferredLock() |
851 | + self.set_connection_error_callback(self._cb_connection_error) |
852 | + self.set_session_callback(self._cb_session_event) |
853 | + self._backoff_seconds = 0 |
854 | + self._last_reconnect = time.time() |
855 | + |
856 | + def subscribe_new_session(self): |
857 | + d = Deferred() |
858 | + self._session_notifications.append(d) |
859 | + return d |
860 | + |
861 | + @inlineCallbacks |
862 | + def cb_restablish_session(self, e=None, forced=False): |
863 | + """Called on intercept of session expiration to create new session. |
864 | + |
865 | + This will reconnect to zk, re-establish ephemerals, and |
866 | + trigger watches. |
867 | + """ |
868 | + yield self._reconnect_lock.acquire() |
869 | + log.debug( |
870 | + "Connection reconnect, lock acquired handle:%d", self.handle) |
871 | + |
872 | + try: |
873 | + # If it's been explicitly closed, don't re-establish. |
874 | + if self.handle is None: |
875 | + log.debug("No handle, client closed") |
876 | + return |
877 | + |
878 | + # Don't allow forced reconnect herds within a session. |
879 | + if forced and ( |
880 | + (time.time() - self._last_reconnect) |
881 | + < self.session_timeout / 1000.0): |
882 | + forced = False |
883 | + |
884 | + if not forced and not self.unrecoverable: |
885 | + log.debug("Client already connected, allowing retry") |
886 | + return |
887 | + elif self.connected or self.handle >= 0: |
888 | + self.close() |
889 | + self.handle = -1 |
890 | + |
891 | + # Re-establish |
892 | + yield self._cb_restablish_session().addErrback( |
893 | + self._cb_restablish_errback, e) |
894 | + |
895 | + except Exception, e: |
896 | + log.error("error while re-establish %r %s" % (e, e)) |
897 | + finally: |
898 | + log.debug("Reconnect lock released %s", self) |
899 | + yield self._reconnect_lock.release() |
900 | + |
901 | + @inlineCallbacks |
902 | + def _cb_restablish_session(self): |
903 | + """Re-establish a new session, and recreate ephemerals and watches. |
904 | + """ |
905 | + # Reconnect |
906 | + while 1: |
907 | + log.debug("Reconnect loop - connect timeout %d", |
908 | + self._connect_timeout) |
909 | + |
910 | + # If we have some failures, back off |
911 | + if self._backoff_seconds: |
912 | + log.debug("Backing off reconnect %d" % self._backoff_seconds) |
913 | + yield sleep(self._backoff_seconds) |
914 | + |
915 | + # The client was explicitly closed, abort reconnect. |
916 | + if self.handle is None: |
917 | + returnValue(self.handle) |
918 | + try: |
919 | + yield self.connect(timeout=self._connect_timeout) |
920 | + log.info("Restablished connection") |
921 | + self._last_reconnect = time.time() |
922 | + except ConnectionTimeoutException: |
923 | + log.debug("Timeout establishing connection, retrying...") |
924 | + except zookeeper.ZooKeeperException, e: |
925 | + log.exception("Error while connecting %r %s" % (e, e)) |
926 | + except Exception, e: |
927 | + log.info("Reconnect unknown error, aborting: %s", e) |
928 | + raise |
929 | + else: |
930 | + break |
931 | + |
932 | + if self._backoff_seconds < MAX_BACKOFF: |
933 | + self._backoff_seconds = min( |
934 | + self._backoff_seconds + BACKOFF_INCREMENT, |
935 | + MAX_BACKOFF - 1) |
936 | + |
937 | + # Recreate ephemerals |
938 | + items = self._ephemerals.items() |
939 | + self._ephemerals = {} |
940 | + |
941 | + for path, e in items: |
942 | + try: |
943 | + yield self.create( |
944 | + path, e['data'], acls=e['acls'], flags=e['flags']) |
945 | + except zookeeper.NodeExistsException: |
946 | + log.error("Attempt to create ephemeral node failed %r", path) |
947 | + |
948 | + # Signal watches |
949 | + yield self._watches.reset() |
950 | + |
951 | + # Notify new session observers |
952 | + notifications = self._session_notifications |
953 | + self._session_notifications = [] |
954 | + |
955 | + # all good, reset backoff |
956 | + self._backoff_seconds = 0 |
957 | + |
958 | + for n in notifications: |
959 | + n.callback(True) |
960 | + |
961 | + def _cb_restablish_errback(self, err, failure): |
962 | + """If there's an error re-establishing the session log it. |
963 | + """ |
964 | + log.error("Error while trying to re-establish connection %s\n%s" % ( |
965 | + err, failure)) |
966 | + return failure |
967 | + |
968 | + @inlineCallbacks |
969 | + def _cb_connection_error(self, client, error): |
970 | + """Convert session expiration to a transient connection error. |
971 | + |
972 | + Dispatches from api usage error. |
973 | + """ |
974 | + if not is_session_error(error): |
975 | + raise error |
976 | + log.debug("Connection error detected, delaying retry...") |
977 | + yield sleep(1) |
978 | + raise zookeeper.ConnectionLossException |
979 | + |
980 | + # Dispatch from retry when the session maximum is exceeded |
981 | + def cb_retry_expired(self, error): |
982 | + log.debug("Persistent retry error, reconnecting...") |
983 | + return self.cb_restablish_session(forced=True) |
984 | + |
985 | + # Dispatch from connection events |
986 | + def _cb_session_event(self, client, event): |
987 | + if (event.type == zookeeper.SESSION_EVENT and |
988 | + event.connection_state == zookeeper.EXPIRED_SESSION_STATE): |
989 | + log.debug("Client session expired event, restablishing") |
990 | + self.cb_restablish_session() |
991 | + |
992 | + # Client connected tracker on client operations. |
993 | + def _check_connected(self, d): |
994 | + """Clients are automatically reconnected.""" |
995 | + if self.connected: |
996 | + return |
997 | + |
998 | + if self.handle is None: |
999 | + d.errback(NotConnectedException("Connection closed")) |
1000 | + return d |
1001 | + |
1002 | + log.info("Detected dead connection, reconnecting...") |
1003 | + c_d = self.cb_restablish_session() |
1004 | + |
1005 | + def after_connected(client): |
1006 | + """Return a transient connection failure. |
1007 | + |
1008 | + The retry client will automatically attempt to retry the operation. |
1009 | + """ |
1010 | + log.debug("Reconnected, returning transient error") |
1011 | + return fail(zookeeper.ConnectionLossException("Retry")) |
1012 | + |
1013 | + c_d.addCallback(after_connected) |
1014 | + c_d.chainDeferred(d) |
1015 | + return d |
1016 | + |
1017 | + # Dispatch from node watches on session expiration |
1018 | + def _watch_session_wrapper(self, watcher, event_type, conn_state, path): |
1019 | + """Watch wrapper that diverts session events to a connection callback. |
1020 | + """ |
1021 | + if (event_type == zookeeper.SESSION_EVENT and |
1022 | + conn_state == zookeeper.EXPIRED_SESSION_STATE): |
1023 | + if self.unrecoverable: |
1024 | + log.debug("Watch got session expired event, reconnecting...") |
1025 | + d = self.cb_restablish_session() |
1026 | + d.addErrback(self._cb_restablish_errback) |
1027 | + return d |
1028 | + |
1029 | + if event_type == zookeeper.SESSION_EVENT: |
1030 | + if self._session_event_callback: |
1031 | + self._session_event_callback( |
1032 | + self, ClientEvent( |
1033 | + event_type, conn_state, path, self.handle)) |
1034 | + else: |
1035 | + return watcher(event_type, conn_state, path) |
1036 | + |
1037 | + # Track all watches |
1038 | + def _wrap_watcher(self, watcher, watch_type, path): |
1039 | + if watcher is None: |
1040 | + return watcher |
1041 | + if not callable(watcher): |
1042 | + raise SyntaxError("invalid watcher") |
1043 | + |
1044 | + # handle conn watcher, separately. |
1045 | + if watch_type is None and path is None: |
1046 | + return self._zk_thread_callback( |
1047 | + self._watch_session_wrapper, watcher) |
1048 | + |
1049 | + return self._zk_thread_callback( |
1050 | + partial( |
1051 | + self._watch_session_wrapper, |
1052 | + self._watches.add(path, watch_type, watcher))) |
1053 | + |
1054 | + # Track ephemerals |
1055 | + def _cb_created(self, d, data, acls, flags, result_code, path): |
1056 | + if self._check_result(result_code, d, path=path): |
1057 | + return |
1058 | + |
1059 | + if (flags & zookeeper.EPHEMERAL) and not (flags & zookeeper.SEQUENCE): |
1060 | + self._ephemerals[path] = dict( |
1061 | + data=data, acls=acls, flags=flags) |
1062 | + |
1063 | + d.callback(path) |
1064 | + |
1065 | + def _cb_deleted(self, d, path, result_code): |
1066 | + if self._check_result(result_code, d, path=path): |
1067 | + return |
1068 | + |
1069 | + self._ephemerals.pop(path, None) |
1070 | + d.callback(result_code) |
1071 | + |
1072 | + def _cb_set_acl(self, d, path, acls, result_code): |
1073 | + if self._check_result(result_code, d, path=path): |
1074 | + return |
1075 | + |
1076 | + if path in self._ephemerals: |
1077 | + self._ephemerals[path]['acls'] = acls |
1078 | + |
1079 | + d.callback(result_code) |
1080 | + |
1081 | + def _cb_set(self, d, path, data, result_code, node_stat): |
1082 | + if self._check_result(result_code, d, path=path): |
1083 | + return |
1084 | + |
1085 | + if path in self._ephemerals: |
1086 | + self._ephemerals[path]['data'] = data |
1087 | + |
1088 | + d.callback(node_stat) |
1089 | + |
1090 | + |
1091 | +class _ManagedClient(RetryClient): |
1092 | + |
1093 | + def subscribe_new_session(self): |
1094 | + return self.client.subscribe_new_session() |
1095 | + |
1096 | + |
1097 | +def ManagedClient(servers=None, session_timeout=None, connect_timeout=10000): |
1098 | + client = SessionClient(servers, session_timeout, connect_timeout) |
1099 | + managed_client = _ManagedClient(client) |
1100 | + managed_client.set_retry_error_callback(client.cb_retry_expired) |
1101 | + return managed_client |
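A note on the reconnect backoff in `SessionClient._cb_restablish_session`: as read from the diff above, the delay grows by `BACKOFF_INCREMENT` after each failed connect attempt and is clamped at `MAX_BACKOFF - 1`, so it appears never to reach 360 itself. The following is a standalone sketch of that arithmetic only, with no Twisted or ZooKeeper involved. The helper names `next_backoff` and `backoff_schedule` are hypothetical and are not part of this branch.

```python
# Constants copied from txzookeeper/managed.py in this diff.
BACKOFF_INCREMENT = 10
MAX_BACKOFF = 360


def next_backoff(current):
    """Return the backoff (seconds) to use after one more failed connect.

    Mirrors the update at the bottom of the reconnect loop: add the
    increment, but clamp the result to MAX_BACKOFF - 1.
    """
    if current < MAX_BACKOFF:
        return min(current + BACKOFF_INCREMENT, MAX_BACKOFF - 1)
    return current


def backoff_schedule(attempts):
    """List the sleep applied before each of `attempts` connect attempts,
    assuming every attempt fails (hypothetical worst case)."""
    delays = []
    backoff = 0  # the first attempt is made without sleeping
    for _ in range(attempts):
        delays.append(backoff)
        backoff = next_backoff(backoff)
    return delays


if __name__ == "__main__":
    print(backoff_schedule(5))
```

A successful reconnect resets the counter to 0 in the branch, so the schedule above only applies to an uninterrupted run of failures.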
1102 | |
1103 | === modified file 'txzookeeper/node.py' |
1104 | --- txzookeeper/node.py 2011-06-17 23:52:34 +0000 |
1105 | +++ txzookeeper/node.py 2013-05-15 13:35:05 +0000 |
1106 | @@ -1,8 +1,11 @@ |
1107 | # |
1108 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
1109 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
1110 | # |
1111 | # This file is part of txzookeeper. |
1112 | # |
1113 | +# Authors: |
1114 | +# Kapil Thangavelu |
1115 | +# |
1116 | # txzookeeper is free software: you can redistribute it and/or modify |
1117 | # it under the terms of the GNU Lesser General Public License as published by |
1118 | # the Free Software Foundation, either version 3 of the License, or |
1119 | @@ -127,16 +130,9 @@ |
1120 | Returns a boolean based on the node's existence. Also returns a |
1121 | deferred that fires when the node is modified/created/added/deleted. |
1122 | """ |
1123 | - node_changed = Deferred() |
1124 | - |
1125 | - def on_node_event((event, state, path)): |
1126 | - return node_changed.callback( |
1127 | - NodeEvent(event, state, self)) |
1128 | - |
1129 | d, w = self._context.exists_and_watch(self.path) |
1130 | - w.addCallback(on_node_event) |
1131 | d.addCallback(self._on_exists_success) |
1132 | - return d, node_changed |
1133 | + return d, w |
1134 | |
1135 | def _on_get_node_error(self, failure): |
1136 | failure.trap(NoNodeException) |
1137 | @@ -162,17 +158,10 @@ |
1138 | Retrieve the node's data and a deferred that fires when this data |
1139 | changes. |
1140 | """ |
1141 | - node_changed = Deferred() |
1142 | - |
1143 | - def on_node_change((event, status, path)): |
1144 | - node_changed.callback( |
1145 | - NodeEvent(event, status, self)) |
1146 | - |
1147 | d, w = self._context.get_and_watch(self.path) |
1148 | - w.addCallback(on_node_change) |
1149 | d.addCallback(self._on_get_node_success) |
1150 | d.addErrback(self._on_get_node_error) |
1151 | - return d, node_changed |
1152 | + return d, w |
1153 | |
1154 | def set_data(self, data): |
1155 | """Set the node's data.""" |
1156 | @@ -233,17 +222,9 @@ |
1157 | a deferred that fires if a child is added or deleted. Optionally |
1158 | a name prefix may be passed which the child node must abide. |
1159 | """ |
1160 | - children_changed = Deferred() |
1161 | - |
1162 | - def on_child_added_removed((event, status, path)): |
1163 | - # path is the container not the child. |
1164 | - children_changed.callback( |
1165 | - NodeEvent(event, status, self)) |
1166 | - |
1167 | d, w = self._context.get_children_and_watch(self.path) |
1168 | - w.addCallback(on_child_added_removed) |
1169 | d.addCallback(self._on_get_children_filter_results, prefix) |
1170 | - return d, children_changed |
1171 | + return d, w |
1172 | |
1173 | def __cmp__(self, other): |
1174 | return cmp(self.path, other.path) |
1175 | |
1176 | === modified file 'txzookeeper/queue.py' |
1177 | --- txzookeeper/queue.py 2011-06-30 17:14:06 +0000 |
1178 | +++ txzookeeper/queue.py 2013-05-15 13:35:05 +0000 |
1179 | @@ -1,8 +1,11 @@ |
1180 | # |
1181 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
1182 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
1183 | # |
1184 | # This file is part of txzookeeper. |
1185 | # |
1186 | +# Authors: |
1187 | +# Kapil Thangavelu |
1188 | +# |
1189 | # txzookeeper is free software: you can redistribute it and/or modify |
1190 | # it under the terms of the GNU Lesser General Public License as published by |
1191 | # the Free Software Foundation, either version 3 of the License, or |
1192 | @@ -116,7 +119,7 @@ |
1193 | |
1194 | flags = zookeeper.SEQUENCE |
1195 | if not self._persistent: |
1196 | - flags = flags|zookeeper.EPHEMERAL |
1197 | + flags = flags | zookeeper.EPHEMERAL |
1198 | |
1199 | d = self._client.create( |
1200 | "/".join((self._path, self.prefix)), item, self._acl, flags) |
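Worth noting alongside this hunk: non-persistent queue items are created with both the sequence and ephemeral flags, and the `SessionClient._cb_created` condition in `managed.py` only records nodes that are ephemeral and not sequence. A minimal sketch of that predicate follows. The numeric flag values are stand-ins assumed to match the `zookeeper` module's constants, and `is_tracked` is a hypothetical name used only for illustration.

```python
# Stand-in flag values; assumed (not verified here) to match
# zookeeper.EPHEMERAL and zookeeper.SEQUENCE.
EPHEMERAL = 1
SEQUENCE = 2


def is_tracked(flags):
    """Same condition as in _cb_created: ephemeral and not sequence."""
    return bool(flags & EPHEMERAL) and not (flags & SEQUENCE)


if __name__ == "__main__":
    queue_flags = SEQUENCE | EPHEMERAL  # what a non-persistent queue item uses
    print("queue item tracked:", is_tracked(queue_flags))
    print("plain ephemeral tracked:", is_tracked(EPHEMERAL))
```

Under that reading, ephemeral queue items would not be recreated after a session expiry, and an application relying on them would presumably use `subscribe_new_session` to react instead.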
1201 | |
1202 | === added file 'txzookeeper/retry.py' |
1203 | --- txzookeeper/retry.py 1970-01-01 00:00:00 +0000 |
1204 | +++ txzookeeper/retry.py 2013-05-15 13:35:05 +0000 |
1205 | @@ -0,0 +1,359 @@ |
1206 | +# |
1207 | +# Copyright (C) 2011 Canonical Ltd. All Rights Reserved |
1208 | +# |
1209 | +# This file is part of txzookeeper. |
1210 | +# |
1211 | +# Authors: |
1212 | +# Kapil Thangavelu |
1213 | +# |
1214 | +# txzookeeper is free software: you can redistribute it and/or modify |
1215 | +# it under the terms of the GNU Lesser General Public License as published by |
1216 | +# the Free Software Foundation, either version 3 of the License, or |
1217 | +# (at your option) any later version. |
1218 | +# |
1219 | +# txzookeeper is distributed in the hope that it will be useful, |
1220 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
1221 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
1222 | +# GNU Lesser General Public License for more details. |
1223 | +# |
1224 | +# You should have received a copy of the GNU Lesser General Public License |
1225 | +# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
1226 | +# |
1227 | + |
1228 | +""" |
1229 | +A retry client facade that transparently handles transient connection |
1230 | +errors. |
1231 | +""" |
1232 | + |
1233 | +import time |
1234 | +import logging |
1235 | +import zookeeper |
1236 | + |
1237 | +from twisted.internet.defer import inlineCallbacks, returnValue, Deferred |
1238 | + |
1239 | +from txzookeeper.client import NotConnectedException |
1240 | +from txzookeeper.utils import sleep |
1241 | + |
1242 | +__all__ = ["retry", "RetryClient"] |
1243 | + |
1244 | +log = logging.getLogger("txzk.retry") |
1245 | + |
1246 | + |
1247 | +class RetryException(Exception): |
1248 | + """Explicit retry exception. |
1249 | + """ |
1250 | + |
1251 | +# Percentage of the session timeout to wait before retrying. |
1252 | +RETRY_FRACTION = 30 |
1253 | + |
1254 | + |
1255 | +def is_retryable(e): |
1256 | + """Determine if an exception signifies a recoverable connection error. |
1257 | + """ |
1258 | + return isinstance( |
1259 | + e, |
1260 | + (zookeeper.ClosingException, |
1261 | + zookeeper.ConnectionLossException, |
1262 | + zookeeper.OperationTimeoutException, |
1263 | + RetryException)) |
1264 | + |
1265 | + |
1266 | +def is_session_error(e): |
1267 | + return isinstance( |
1268 | + e, |
1269 | + (zookeeper.SessionExpiredException, |
1270 | + zookeeper.ConnectionLossException, |
1271 | + zookeeper.ClosingException, |
1272 | + NotConnectedException)) |
1273 | + |
1274 | + |
1275 | +def _args(args): |
1276 | + return args and args[0] or "NA" |
1277 | + |
1278 | + |
1279 | +def get_delay(session_timeout, max_delay=5, session_fraction=RETRY_FRACTION): |
1280 | + """Get retry delay between retrying an operation. |
1281 | + |
1282 | + Returns either the specified fraction of a session timeout or the |
1283 | + max delay, whichever is smaller. |
1284 | + |
1285 | + The goal is to allow the connection time to auto-heal, before |
1286 | + retrying an operation. |
1287 | + |
1288 | + :param session_timeout: The timeout for the session, in milliseconds |
1289 | + :param max_delay: The max delay for a retry, in seconds. |
1290 | + :param session_fraction: The fractional amount of a timeout to wait |
1291 | + """ |
1292 | + retry_delay = (session_timeout * (float(session_fraction) / 100)) / 1000 |
1293 | + return min(retry_delay, max_delay) |
1294 | + |
1295 | + |
1296 | +def check_error(e): |
1297 | + """Verify a zookeeper connection error, as opposed to an app error. |
1298 | + """ |
1299 | + return is_retryable(e) or is_session_error(e) |
1300 | + |
1301 | + |
1302 | +def check_retryable(retry_client, max_time, error): |
1303 | + """Check an error and a client to see if an operation is retryable. |
1304 | + |
1305 | + :param retry_client: A txzookeeper client |
1306 | + :param max_time: The max time (epoch tick) that the op is retryable till. |
1307 | + :param error: The client operation exception. |
1308 | + """ |
1309 | + |
1310 | + t = time.time() |
1311 | + |
1312 | + # Only if the error is known. |
1313 | + if not is_retryable(error): |
1314 | + return False |
1315 | + |
1316 | + # Only if we haven't exceeded the max allotted time. |
1317 | + if max_time <= t: |
1318 | + return False |
1319 | + |
1320 | + # Only if the client hasn't been explicitly closed. |
1321 | + if not retry_client.connected: |
1322 | + return False |
1323 | + |
1324 | + # Only if the client is in a recoverable state. |
1325 | + if retry_client.unrecoverable: |
1326 | + return False |
1327 | + |
1328 | + return True |
1329 | + |
1330 | + |
1331 | +@inlineCallbacks |
1332 | +def retry(client, func, *args, **kw): |
1333 | + """Constructs a retry wrapper around a function that retries invocations. |
1334 | + |
1335 | + If the function execution results in an exception due to a transient |
1336 | + connection error, the retry wrapper will reinvoke the operation after |
1337 | + a suitable delay (fractional value of the session timeout). |
1338 | + |
1339 | + :param client: A ZookeeperClient instance. |
1340 | + :param func: A callable python object that interacts with |
1341 | + zookeeper, the callable must utilize the same zookeeper |
1342 | + connection as passed in the `client` param. The function |
1343 | + must return a single value (either a deferred or result |
1344 | + value). |
1345 | + """ |
1346 | + retry_started = [time.time()] |
1347 | + retry_error = False |
1348 | + while 1: |
1349 | + try: |
1350 | + value = yield func(*args, **kw) |
1351 | + except Exception, e: |
1352 | + # For clients which aren't connected (session timeout == None) |
1353 | + # we raise the errors to the callers. |
1354 | + session_timeout = client.session_timeout or 0 |
1355 | + |
1356 | + # The longest we keep retrying is 1.2 * session timeout |
1357 | + max_time = (session_timeout / 1000.0) * 1.2 + retry_started[0] |
1358 | + |
1359 | + if not check_retryable(client, max_time, e): |
1360 | + # Check if it's a persistent client error, and if so use the cb |
1361 | + # if present to try and reconnect for client errors. |
1362 | + if (check_error(e) |
1363 | + and time.time() > max_time |
1364 | + and callable(client.cb_retry_error) |
1365 | + and not retry_error): |
1366 | + log.debug("Retry error %r on %s @ %s", |
1367 | + e, func.__name__, _args(args)) |
1368 | + retry_error = True |
1369 | + yield client.cb_retry_error(e) |
1370 | + retry_started[0] = time.time() |
1371 | + continue |
1372 | + raise |
1373 | + |
1374 | + # Give the connection a chance to auto-heal. |
1375 | + yield sleep(get_delay(session_timeout)) |
1376 | + log.debug("Retry on %s @ %s", func.__name__, _args(args)) |
1377 | + continue |
1378 | + |
1379 | + returnValue(value) |
1380 | + |
1381 | + |
1382 | +def retry_watch(client, func, *args, **kw): |
1383 | + """Constructs a wrapper around a watch callable that retries invocations. |
1384 | + |
1385 | + If the callable execution results in an exception due to a transient |
1386 | + connection error, the retry wrapper will reinvoke the operation after |
1387 | + a suitable delay (fractional value of the session timeout). |
1388 | + |
1389 | + A watch function must return back a tuple of deferreds |
1390 | + (value_deferred, watch_deferred). No inline callbacks are |
1391 | + performed in here to ensure that callers continue to see a |
1392 | + tuple of results. |
1393 | + |
1394 | + The client passed to this retry function must be the same as |
1395 | + the one utilized by the python callable. |
1396 | + |
1397 | + :param client: A ZookeeperClient instance. |
1398 | + :param func: A python callable that interacts with zookeeper. If a |
1399 | + function is passed, a txzookeeper client must be the first |
1400 | + parameter of this function. The function must return a |
1401 | + tuple of (value_deferred, watch_deferred) |
1402 | + """ |
1403 | + # For clients which aren't connected (session timeout == None) |
1404 | + # we raise the usage errors to the callers |
1405 | + session_timeout = client.session_timeout or 0 |
1406 | + |
1407 | + # If we keep retrying past 1.2 * session timeout (milliseconds, |
1408 | + # converted to seconds here) without success just die; expiry is fatal. |
1409 | + max_time = (session_timeout / 1000.0) * 1.2 + time.time() |
1410 | + value_d, watch_d = func(*args, **kw) |
1411 | + |
1412 | + def retry_delay(f): |
1413 | + """Errback, verifies an op is retryable, and delays the next retry. |
1414 | + """ |
1415 | + # Check that operation is retryable. |
1416 | + if not check_retryable(client, max_time, f.value): |
1417 | + return f |
1418 | + |
1419 | + # Give the connection a chance to auto-heal |
1420 | + d = sleep(get_delay(session_timeout)) |
1421 | + d.addCallback(retry_inner) |
1422 | + |
1423 | + return d |
1424 | + |
1425 | + def retry_inner(value): |
1426 | + """Retry operation invoker. |
1427 | + """ |
1428 | + # Invoke the function |
1429 | + retry_value_d, retry_watch_d = func(*args, **kw) |
1430 | + |
1431 | + # If we need to retry again. |
1432 | + retry_value_d.addErrback(retry_delay) |
1433 | + |
1434 | + # Chain the new watch deferred to the old, presuming it's DOA |
1435 | + # if the value deferred errored on a connection error. |
1436 | + retry_watch_d.chainDeferred(watch_d) |
1437 | + |
1438 | + # Insert back into the callback chain. |
1439 | + return retry_value_d |
1440 | + |
1441 | + # Attach the retry |
1442 | + value_d.addErrback(retry_delay) |
1443 | + |
1444 | + return value_d, watch_d |
1445 | + |
1446 | + |
1447 | +def _passproperty(name): |
1448 | + """Returns a method wrapper that delegates to a client's property. |
1449 | + """ |
1450 | + def wrapper(retry_client): |
1451 | + return getattr(retry_client.client, name) |
1452 | + return property(wrapper) |
1453 | + |
1454 | + |
1455 | +class RetryClient(object): |
1456 | + """A ZookeeperClient wrapper that transparently performs retries. |
1457 | + |
1458 | + A zookeeper connection can experience transient connection failures |
1459 | + on any operation. As long as the session associated to the connection |
1460 | + is still active on the zookeeper cluster, libzookeeper can reconnect |
1461 | + automatically to the cluster and session and the client is able to |
1462 | + retry. |
1463 | + |
1464 | + Whether a given operation is safe for retry depends on the application |
1465 | + in question and how it's interacting with zookeeper. |
1466 | + |
1467 | + In particular coordination around sequence nodes can be |
1468 | + problematic, as the client has no way of knowing if the operation |
1469 | + succeeded or not without additional application-specific context. |
1470 | + |
1471 | + Idempotent operations against the zookeeper tree are generally |
1472 | + safe to retry. |
1473 | + |
1474 | + This class provides a simple wrapper around a zookeeper client, |
1475 | + that will automatically perform retries on operations that |
1476 | + interact with the zookeeper tree, in the face of transient errors, |
1477 | + till the session timeout has been reached. All of the attributes |
1478 | + and methods of a zookeeper client are exposed. |
1479 | + |
1480 | + All the methods of the client that interact with the zookeeper tree |
1481 | + are retry enabled. |
1482 | + """ |
1483 | + |
1484 | + def __init__(self, client): |
1485 | + self.client = client |
1486 | + self.client.cb_retry_error = None |
1487 | + |
1488 | + def set_retry_error_callback(self, callback): |
1489 | + self.client.cb_retry_error = callback |
1490 | + |
1491 | + def add_auth(self, *args, **kw): |
1492 | + return retry(self.client, self.client.add_auth, *args, **kw) |
1493 | + |
1494 | + def create(self, *args, **kw): |
1495 | + return retry(self.client, self.client.create, *args, **kw) |
1496 | + |
1497 | + def delete(self, *args, **kw): |
1498 | + return retry(self.client, self.client.delete, *args, **kw) |
1499 | + |
1500 | + def exists(self, *args, **kw): |
1501 | + return retry(self.client, self.client.exists, *args, **kw) |
1502 | + |
1503 | + def get(self, *args, **kw): |
1504 | + return retry(self.client, self.client.get, *args, **kw) |
1505 | + |
1506 | + def get_acl(self, *args, **kw): |
1507 | + return retry(self.client, self.client.get_acl, *args, **kw) |
1508 | + |
1509 | + def get_children(self, *args, **kw): |
1510 | + return retry(self.client, self.client.get_children, *args, **kw) |
1511 | + |
1512 | + def set_acl(self, *args, **kw): |
1513 | + return retry(self.client, self.client.set_acl, *args, **kw) |
1514 | + |
1515 | + def set(self, *args, **kw): |
1516 | + return retry(self.client, self.client.set, *args, **kw) |
1517 | + |
1518 | + def sync(self, *args, **kw): |
1519 | + return retry(self.client, self.client.sync, *args, **kw) |
1520 | + |
1521 | + # Watch retries |
1522 | + |
1523 | + def exists_and_watch(self, *args, **kw): |
1524 | + return retry_watch( |
1525 | + self.client, self.client.exists_and_watch, *args, **kw) |
1526 | + |
1527 | + def get_and_watch(self, *args, **kw): |
1528 | + return retry_watch( |
1529 | + self.client, self.client.get_and_watch, *args, **kw) |
1530 | + |
1531 | + def get_children_and_watch(self, *args, **kw): |
1532 | + return retry_watch( |
1533 | + self.client, self.client.get_children_and_watch, *args, **kw) |
1534 | + |
1535 | + # Passthrough methods |
1536 | + |
1537 | + def set_connection_watcher(self, *args, **kw): |
1538 | + return self.client.set_connection_watcher(*args, **kw) |
1539 | + |
1540 | + def set_connection_error_callback(self, *args, **kw): |
1541 | + return self.client.set_connection_error_callback(*args, **kw) |
1542 | + |
1543 | + def set_session_callback(self, *args, **kw): |
1544 | + return self.client.set_session_callback(*args, **kw) |
1545 | + |
1546 | + def set_determinstic_order(self, *args, **kw): |
1547 | + return self.client.set_determinstic_order(*args, **kw) |
1548 | + |
1549 | + def close(self): |
1550 | + return self.client.close() |
1551 | + |
1552 | + @inlineCallbacks |
1553 | + def connect(self, *args, **kw): |
1554 | + yield self.client.connect(*args, **kw) |
1555 | + returnValue(self) |
1556 | + |
1557 | + # passthrough properties |
1558 | + state = _passproperty("state") |
1559 | + client_id = _passproperty("client_id") |
1560 | + session_timeout = _passproperty("session_timeout") |
1561 | + servers = _passproperty("servers") |
1562 | + handle = _passproperty("handle") |
1563 | + connected = _passproperty("connected") |
1564 | + unrecoverable = _passproperty("unrecoverable") |
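To make the units in `retry.py` concrete: `session_timeout` is in milliseconds, while `max_delay` and the values compared against `time.time()` are in seconds. The sketch below reproduces `get_delay` as it appears in the diff and adds a hypothetical helper, `retry_window`, for the "1.2 * session timeout" bound used in `retry()`. It is standalone and only illustrates the arithmetic.

```python
RETRY_FRACTION = 30  # percent of the session timeout, as in retry.py


def get_delay(session_timeout, max_delay=5, session_fraction=RETRY_FRACTION):
    """Delay in seconds between retries (same formula as in the diff).

    session_timeout is in milliseconds; max_delay is in seconds.
    """
    retry_delay = (session_timeout * (float(session_fraction) / 100)) / 1000
    return min(retry_delay, max_delay)


def retry_window(session_timeout):
    """Hypothetical helper: how many seconds an operation stays retryable,
    following the (session_timeout / 1000.0) * 1.2 expression in retry()."""
    return (session_timeout / 1000.0) * 1.2


if __name__ == "__main__":
    for timeout_ms in (2000, 10000, 30000):
        print(timeout_ms, get_delay(timeout_ms), retry_window(timeout_ms))
```

With a 10 second session timeout, this works out to roughly a 3 second pause between attempts within a window of about 12 seconds. Longer timeouts hit the 5 second `max_delay` cap.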
1565 | |
1566 | === modified file 'txzookeeper/tests/__init__.py' |
1567 | --- txzookeeper/tests/__init__.py 2011-06-30 18:05:14 +0000 |
1568 | +++ txzookeeper/tests/__init__.py 2013-05-15 13:35:05 +0000 |
1569 | @@ -1,8 +1,11 @@ |
1570 | # |
1571 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
1572 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
1573 | # |
1574 | # This file is part of txzookeeper. |
1575 | # |
1576 | +# Authors: |
1577 | +# Kapil Thangavelu |
1578 | +# |
1579 | # txzookeeper is free software: you can redistribute it and/or modify |
1580 | # it under the terms of the GNU Lesser General Public License as published by |
1581 | # the Free Software Foundation, either version 3 of the License, or |
1582 | @@ -17,9 +20,12 @@ |
1583 | # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
1584 | # |
1585 | |
1586 | +import logging |
1587 | +import StringIO |
1588 | import sys |
1589 | |
1590 | import zookeeper |
1591 | +from twisted.internet.defer import Deferred |
1592 | from twisted.trial.unittest import TestCase |
1593 | |
1594 | from mocker import MockerTestCase |
1595 | @@ -29,20 +35,64 @@ |
1596 | |
1597 | def setUp(self): |
1598 | super(ZookeeperTestCase, self).setUp() |
1599 | - self.log_file_path = self.makeFile() |
1600 | - self.log_file = open(self.log_file_path, 'w') |
1601 | - #zookeeper.set_log_stream(self.log_file) |
1602 | zookeeper.set_debug_level(0) |
1603 | |
1604 | def tearDown(self): |
1605 | super(ZookeeperTestCase, self).tearDown() |
1606 | - #zookeeper.set_log_stream(sys.stderr) # reset to default |
1607 | - #zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG) |
1608 | - self.log_file.close() |
1609 | |
1610 | def get_log(self): |
1611 | return open(self.log_file_path).read() |
1612 | |
1613 | + def sleep(self, delay): |
1614 | + """Non-blocking sleep.""" |
1615 | + from twisted.internet import reactor |
1616 | + deferred = Deferred() |
1617 | + reactor.callLater(delay, deferred.callback, None) |
1618 | + return deferred |
1619 | + |
1620 | + _missing_attr = object() |
1621 | + |
1622 | + def patch(self, object, attr, value): |
1623 | + """Replace an object's attribute, and restore original value later. |
1624 | + |
1625 | + Returns the original value of the attribute if any or None. |
1626 | + """ |
1627 | + original_value = getattr(object, attr, self._missing_attr) |
1628 | + |
1629 | + @self.addCleanup |
1630 | + def restore_original(): |
1631 | + if original_value is self._missing_attr: |
1632 | + try: |
1633 | + delattr(object, attr) |
1634 | + except AttributeError: |
1635 | + pass |
1636 | + else: |
1637 | + setattr(object, attr, original_value) |
1638 | + setattr(object, attr, value) |
1639 | + if original_value is self._missing_attr: |
1640 | + return None |
1641 | + return original_value |
1642 | + |
1643 | + def capture_log(self, name="", level=logging.INFO, |
1644 | + log_file=None, formatter=None): |
1645 | + """Capture log channel to StringIO""" |
1646 | + if log_file is None: |
1647 | + log_file = StringIO.StringIO() |
1648 | + log_handler = logging.StreamHandler(log_file) |
1649 | + if formatter: |
1650 | + log_handler.setFormatter(formatter) |
1651 | + logger = logging.getLogger(name) |
1652 | + logger.addHandler(log_handler) |
1653 | + old_logger_level = logger.level |
1654 | + logger.setLevel(level) |
1655 | + |
1656 | + @self.addCleanup |
1657 | + def reset_logging(): |
1658 | + logger.removeHandler(log_handler) |
1659 | + logger.setLevel(old_logger_level) |
1660 | + |
1661 | + return log_file |
1662 | + |
1663 | |
1664 | def egg_test_runner(): |
1665 | """ |
1666 | |
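Review note: the `patch` helper added above uses a sentinel object so that "attribute was absent" can be told apart from "attribute was None". A minimal stdlib-only sketch of the same pattern follows. It uses `unittest.TestCase` rather than the trial-based `ZookeeperTestCase`, and `PatchingTestCase` and `Target` are hypothetical names.

```python
import unittest

# Sentinel: distinguishes a missing attribute from one set to None.
_MISSING = object()


class PatchingTestCase(unittest.TestCase):

    def patch(self, obj, attr, value):
        """Replace obj.attr and register a cleanup that restores it."""
        original = getattr(obj, attr, _MISSING)

        def restore():
            if original is _MISSING:
                # The attribute did not exist before, so remove it again.
                if hasattr(obj, attr):
                    delattr(obj, attr)
            else:
                setattr(obj, attr, original)

        self.addCleanup(restore)
        setattr(obj, attr, value)
        return None if original is _MISSING else original


class Target(object):
    """Hypothetical object to patch."""
    existing = 1
```

Cleanups run in reverse order of registration, so nested patches of the same attribute unwind correctly.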
1667 | === modified file 'txzookeeper/tests/common.py' |
1668 | --- txzookeeper/tests/common.py 2011-06-18 15:35:03 +0000 |
1669 | +++ txzookeeper/tests/common.py 2013-05-15 13:35:05 +0000 |
1670 | @@ -1,3 +1,26 @@ |
1671 | +# |
1672 | +# Copyright (C) 2010-2011, 2011 Canonical Ltd. All Rights Reserved |
1673 | +# |
1674 | +# This file is part of txzookeeper. |
1675 | +# |
1676 | +# Authors: |
1677 | +# Kapil Thangavelu |
1678 | +# |
1679 | +# txzookeeper is free software: you can redistribute it and/or modify |
1680 | +# it under the terms of the GNU Lesser General Public License as published by |
1681 | +# the Free Software Foundation, either version 3 of the License, or |
1682 | +# (at your option) any later version. |
1683 | +# |
1684 | +# txzookeeper is distributed in the hope that it will be useful, |
1685 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
1686 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
1687 | +# GNU Lesser General Public License for more details. |
1688 | +# |
1689 | +# You should have received a copy of the GNU Lesser General Public License |
1690 | +# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
1691 | +# |
1692 | + |
1693 | + |
1694 | import os |
1695 | import os.path |
1696 | import shutil |
1697 | @@ -46,7 +69,7 @@ |
1698 | |
1699 | # various setup steps |
1700 | if not os.path.exists(self.working_path): |
1701 | - os.mdir(self.working_path) |
1702 | + os.mkdir(self.working_path) |
1703 | if not os.path.exists(log_path): |
1704 | os.mkdir(log_path) |
1705 | if not os.path.exists(data_path): |
1706 | @@ -54,10 +77,12 @@ |
1707 | |
1708 | with open(config_path, "w") as config: |
1709 | config.write(""" |
1710 | -tickTime=2000 |
1711 | +tickTime=1000 |
1712 | dataDir=%s |
1713 | clientPort=%s |
1714 | maxClientCnxns=0 |
1715 | +maxSessionTimeout=5000 |
1716 | +minSessionTimeout=2000 |
1717 | """ % (data_path, self.server_info.client_port)) |
1718 | |
1719 | # setup a replicated setup if peers are specified |
1720 | |
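Review note: the config change above bounds the session timeout a client can obtain. A ZooKeeper server negotiates the requested timeout into the range given by minSessionTimeout and maxSessionTimeout (both in milliseconds). The function below is a toy model of that clamping, not server code; `negotiated_timeout` is a hypothetical name and its defaults are taken from the test config in this hunk.

```python
def negotiated_timeout(requested_ms, min_ms=2000, max_ms=5000):
    """Clamp a requested session timeout into [min_ms, max_ms].

    Defaults mirror the minSessionTimeout/maxSessionTimeout values
    written to the test server config above.
    """
    return max(min_ms, min(requested_ms, max_ms))
```

With this config a test that asks for a 3000 ms session keeps it, while a 10000 ms request would be cut down to 5000 ms, which keeps session-expiry tests short.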
1721 | === added file 'txzookeeper/tests/proxy.py' |
1722 | --- txzookeeper/tests/proxy.py 1970-01-01 00:00:00 +0000 |
1723 | +++ txzookeeper/tests/proxy.py 2013-05-15 13:35:05 +0000 |
1724 | @@ -0,0 +1,97 @@ |
1725 | +# |
1726 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
1727 | +# |
1728 | +# This file is part of txzookeeper. |
1729 | +# |
1730 | +# Authors: |
1731 | +# Kapil Thangavelu |
1732 | +# |
1733 | +# txzookeeper is free software: you can redistribute it and/or modify |
1734 | +# it under the terms of the GNU Lesser General Public License as published by |
1735 | +# the Free Software Foundation, either version 3 of the License, or |
1736 | +# (at your option) any later version. |
1737 | +# |
1738 | +# txzookeeper is distributed in the hope that it will be useful, |
1739 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
1740 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
1741 | +# GNU Lesser General Public License for more details. |
1742 | +# |
1743 | +# You should have received a copy of the GNU Lesser General Public License |
1744 | +# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
1745 | +# |
1746 | + |
1747 | +from twisted.protocols import portforward |
1748 | + |
1749 | + |
1750 | +class Blockable(object): |
1751 | + |
1752 | + _blocked = None |
1753 | + |
1754 | + def set_blocked(self, value): |
1755 | + value = bool(value) |
1756 | + self._blocked = value |
1757 | + if self.transport and not self._blocked: |
1758 | + self.transport.resumeProducing() |
1759 | + |
1760 | + |
1761 | +class ProxyClient(portforward.ProxyClient, Blockable): |
1762 | + |
1763 | + def dataReceived(self, data): |
1764 | + if self._blocked: |
1765 | + return |
1766 | + portforward.ProxyClient.dataReceived(self, data) |
1767 | + |
1768 | + def setServer(self, server): |
1769 | + server.set_blocked(self._blocked) |
1770 | + super(ProxyClient, self).setServer(server) |
1771 | + |
1772 | + def connectionMade(self): |
1773 | + self.peer.setPeer(self) |
1774 | + if not self._blocked: |
1775 | + # The server waits till the client is connected |
1776 | + self.peer.transport.resumeProducing() |
1777 | + else: |
1778 | + self.transport.pauseProducing() |
1779 | + |
1780 | + |
1781 | +class ProxyClientFactory(portforward.ProxyClientFactory): |
1782 | + |
1783 | + protocol = ProxyClient |
1784 | + |
1785 | + |
1786 | +class ProxyServer(portforward.ProxyServer, Blockable): |
1787 | + |
1788 | + clientProtocolFactory = ProxyClientFactory |
1789 | + |
1790 | + def dataReceived(self, data): |
1791 | + if self._blocked: |
1792 | + return |
1793 | + portforward.ProxyServer.dataReceived(self, data) |
1794 | + |
1795 | + |
1796 | +class ProxyFactory(portforward.ProxyFactory): |
1797 | + |
1798 | + protocol = ProxyServer |
1799 | + instance = _blocked = False |
1800 | + |
1801 | + def lose_connection(self): |
1802 | + """Terminate both ends of the proxy connection.""" |
1803 | + if self.instance: |
1804 | + self.instance.transport.loseConnection() |
1805 | + if self.instance.peer: |
1806 | + self.instance.peer.transport.loseConnection() |
1807 | + |
1808 | + def set_blocked(self, value): |
1809 | + self._blocked = bool(value) |
1810 | + if self.instance: |
1811 | + self.instance.set_blocked(self._blocked) |
1812 | + if self.instance.peer: |
1813 | + self.instance.peer.set_blocked(self._blocked) |
1814 | + |
1815 | + def buildProtocol(self, addr): |
1816 | + # Track last protocol used, on reconnect any pauses are disabled. |
1817 | + self.instance = portforward.ProxyFactory.buildProtocol(self, addr) |
1818 | + # Propagate the value; otherwise the client will aggressively |
1819 | + # try to reconnect. |
1820 | + self.instance.set_blocked(self._blocked) |
1821 | + return self.instance |
1822 | |
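Review note: the proxy's blocking mode discards data rather than queueing it, which is what lets the tests simulate a lost watch notification. A Twisted-free toy model of that behaviour is sketched below. `BlackholePipe` is a hypothetical name and the class only mirrors the `_blocked` check in `dataReceived`, not the transport pause/resume handling.

```python
class BlackholePipe(object):
    """Toy model of a blockable proxy leg: blocked data is dropped."""

    def __init__(self):
        self.blocked = False
        self.delivered = []

    def set_blocked(self, value):
        self.blocked = bool(value)

    def data_received(self, data):
        if self.blocked:
            # Dropped on the floor; it is never replayed after unblocking.
            return
        self.delivered.append(data)


pipe = BlackholePipe()
pipe.data_received(b"before")
pipe.set_blocked(True)
pipe.data_received(b"lost")
pipe.set_blocked(False)
pipe.data_received(b"after")
```

Since dropped bytes are gone for good, anything sent while blocked can only reach the client again if the server resends it after a reconnect.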
1823 | === modified file 'txzookeeper/tests/test_client.py' |
1824 | --- txzookeeper/tests/test_client.py 2011-06-30 20:16:17 +0000 |
1825 | +++ txzookeeper/tests/test_client.py 2013-05-15 13:35:05 +0000 |
1826 | @@ -1,8 +1,11 @@ |
1827 | # |
1828 | -# Copyright (C) 2010, 2011 Canonical Ltd. All Rights Reserved |
1829 | +# Copyright (C) 2010-2011, 2011 Canonical Ltd. All Rights Reserved |
1830 | # |
1831 | # This file is part of txzookeeper. |
1832 | # |
1833 | +# Authors: |
1834 | +# Kapil Thangavelu |
1835 | +# |
1836 | # txzookeeper is free software: you can redistribute it and/or modify |
1837 | # it under the terms of the GNU Lesser General Public License as published by |
1838 | # the Free Software Foundation, either version 3 of the License, or |
1839 | @@ -21,13 +24,13 @@ |
1840 | import base64 |
1841 | import hashlib |
1842 | |
1843 | -from twisted.internet.defer import Deferred |
1844 | +from twisted.internet.defer import Deferred, maybeDeferred |
1845 | from twisted.internet.base import DelayedCall |
1846 | from twisted.python.failure import Failure |
1847 | |
1848 | import zookeeper |
1849 | |
1850 | -from mocker import ANY, MATCH |
1851 | +from mocker import ANY, MATCH, ARGS |
1852 | from txzookeeper.tests import ZookeeperTestCase, utils |
1853 | from txzookeeper.client import ( |
1854 | ZookeeperClient, ZOO_OPEN_ACL_UNSAFE, ConnectionTimeoutException, |
1855 | @@ -119,18 +122,55 @@ |
1856 | d.addCallback(check_connected) |
1857 | return d |
1858 | |
1859 | + def test_close(self): |
1860 | + """ |
1861 | + Test that the connection is closed, also for the first |
1862 | + connection when the zookeeper handle is 0. |
1863 | + """ |
1864 | + |
1865 | + def _fake_init(*_): |
1866 | + return 0 |
1867 | + |
1868 | + mock_init = self.mocker.replace("zookeeper.init") |
1869 | + mock_init(ARGS) |
1870 | + self.mocker.call(_fake_init) |
1871 | + |
1872 | + def _fake_close(handle): |
1873 | + return zookeeper.OK |
1874 | + |
1875 | + mock_close = self.mocker.replace("zookeeper.close") |
1876 | + mock_close(0) |
1877 | + self.mocker.call(_fake_close) |
1878 | + |
1879 | + self.mocker.replay() |
1880 | + |
1881 | + # Avoid unclean reactor by letting the callLater go through, |
1882 | + # but we do not care about the timeout. |
1883 | + def _silence_timeout(failure): |
1884 | + failure.trap(ConnectionTimeoutException) |
1885 | + self.client.connect(timeout=0).addErrback(_silence_timeout) |
1886 | + |
1887 | + d = maybeDeferred(self.client.close) |
1888 | + |
1889 | + def _verify(result): |
1890 | + self.mocker.verify() |
1891 | + d.addCallback(_verify) |
1892 | + return d |
1893 | + |
1894 | def test_client_event_repr(self): |
1895 | event = ClientEvent(zookeeper.SESSION_EVENT, |
1896 | - zookeeper.EXPIRED_SESSION_STATE, '') |
1897 | - self.assertEqual(repr(event), |
1898 | - "<ClientEvent session at '' state: expired>") |
1899 | + zookeeper.EXPIRED_SESSION_STATE, '', 0) |
1900 | + self.assertEqual( |
1901 | + repr(event), |
1902 | + "<ClientEvent session at '' state: expired handle:0>") |
1903 | |
1904 | def test_client_event_attributes(self): |
1905 | - event = ClientEvent(4, 'state', 'path') |
1906 | + event = ClientEvent(4, 'state', 'path', 0) |
1907 | self.assertEqual(event.type, 4) |
1908 | self.assertEqual(event.connection_state, 'state') |
1909 | self.assertEqual(event.path, 'path') |
1910 | - self.assertEqual(event, (4, 'state', 'path')) |
1911 | + self.assertEqual(event.handle, 0) |
1912 | + self.assertEqual(event, (4, 'state', 'path', 0)) |
1913 | |
1914 | def test_client_use_while_disconnected_returns_failure(self): |
1915 | return self.assertFailure( |
1916 | @@ -401,7 +441,7 @@ |
1917 | """ |
1918 | d = self.client.connect() |
1919 | |
1920 | - def inject_error(result_code, d, extra_codes=None): |
1921 | + def inject_error(result_code, d, extra_codes=None, path=None): |
1922 | error = SyntaxError() |
1923 | d.errback(error) |
1924 | return error |
1925 | @@ -409,7 +449,8 @@ |
1926 | def check_exists(client): |
1927 | mock_client = self.mocker.patch(client) |
1928 | mock_client._check_result( |
1929 | - ANY, DEFERRED_MATCH, extra_codes=(zookeeper.NONODE,)) |
1930 | + ANY, DEFERRED_MATCH, extra_codes=(zookeeper.NONODE,), |
1931 | + path="/zebra-moon") |
1932 | self.mocker.call(inject_error) |
1933 | self.mocker.replay() |
1934 | return client.exists("/zebra-moon") |
1935 | @@ -506,7 +547,7 @@ |
1936 | |
1937 | def check_exists(path): |
1938 | exists, watch = self.client.exists_and_watch( |
1939 | - "%s/wooly-mammoth"%node_path) |
1940 | + "%s/wooly-mammoth" % node_path) |
1941 | watch.chainDeferred(watcher_deferred) |
1942 | return exists |
1943 | |
1944 | @@ -517,13 +558,14 @@ |
1945 | |
1946 | def create_node(client): |
1947 | self.assertEqual(client.connected, True) |
1948 | - return self.client2.create("%s/wooly-mammoth"%node_path, "extinct") |
1949 | + return self.client2.create( |
1950 | + "%s/wooly-mammoth" % node_path, "extinct") |
1951 | |
1952 | def shim(path): |
1953 | return watcher_deferred |
1954 | |
1955 | def verify_watch(event): |
1956 | - self.assertEqual(event.path, "%s/wooly-mammoth"%node_path) |
1957 | + self.assertEqual(event.path, "%s/wooly-mammoth" % node_path) |
1958 | self.assertEqual(event.type, zookeeper.CREATED_EVENT) |
1959 | |
1960 | d.addCallback(create_container) |
1961 | @@ -600,7 +642,8 @@ |
1962 | |
1963 | def verify_succeeds(failure): |
1964 | self.assertTrue(failure) |
1965 | - self.assertEqual(failure.value.args, ("no node",)) |
1966 | + self.assertEqual( |
1967 | + failure.value.args, ("no node /abcd",)) |
1968 | |
1969 | d.addCallback(delete_node) |
1970 | d.addCallback(verify_fails) |
1971 | @@ -643,7 +686,8 @@ |
1972 | |
1973 | def verify_succeeds(failure): |
1974 | self.assertTrue(failure) |
1975 | - self.assertEqual(failure.value.args, ("no node",)) |
1976 | + self.assertEqual( |
1977 | + failure.value.args, ("no node /xy1",)) |
1978 | |
1979 | d.addCallback(set_node) |
1980 | d.addCallback(verify_fails) |
1981 | @@ -833,7 +877,7 @@ |
1982 | return self.client.set("/orchard", "bar") |
1983 | |
1984 | def node_access_failed(failure): |
1985 | - self.assertEqual(failure.value.args, ("not authenticated",)) |
1986 | + self.assertEqual(failure.value.args, ("not authenticated /orchard",)) |
1987 | failed.append(True) |
1988 | return |
1989 | |
1990 | @@ -1046,7 +1090,7 @@ |
1991 | d = self.client.connect() |
1992 | |
1993 | def verify_session_timeout(client): |
1994 | - self.assertEqual(client.session_timeout, 4000) |
1995 | + self.assertIn(client.session_timeout, (4000, 10000)) |
1996 | |
1997 | d.addCallback(verify_session_timeout) |
1998 | return d |
1999 | @@ -1082,60 +1126,6 @@ |
2000 | d.addErrback(verify_invalid) |
2001 | return d |
2002 | |
2003 | - def xtest_session_expired_event(self): |
2004 | - """ |
2005 | - A client session can be reattached to in a separate connection, |
2006 | - if the a session is expired, using the zookeeper connection will |
2007 | - raise a SessionExpiredException. |
2008 | - """ |
2009 | - d = self.client.connect() |
2010 | - |
2011 | - class StopTest(Exception): |
2012 | - pass |
2013 | - |
2014 | - def new_connection_same_connection(client): |
2015 | - self.assertEqual(client.state, zookeeper.CONNECTED_STATE) |
2016 | - return ZookeeperClient("127.0.0.1:2181").connect( |
2017 | - client_id=client.client_id).addErrback( |
2018 | - guard_session_expired, client) |
2019 | - |
2020 | - def guard_session_expired(failure, client): |
2021 | - # On occassion we get a session expired event while connecting. |
2022 | - failure.trap(ConnectionException) |
2023 | - self.assertEqual(failure.value.state_name, "expired") |
2024 | - # Stop the test from proceeding |
2025 | - raise StopTest() |
2026 | - |
2027 | - def close_new_connection(client): |
2028 | - # Verify both connections are using same session |
2029 | - self.assertEqual(self.client.client_id, client.client_id) |
2030 | - self.assertEqual(client.state, zookeeper.CONNECTED_STATE) |
2031 | - |
2032 | - # Closing one connection will close the session |
2033 | - client.close() |
2034 | - |
2035 | - # Continued use of the other client will get a |
2036 | - # disconnect exception. |
2037 | - return self.client.exists("/") |
2038 | - |
2039 | - def verify_original_closed(failure): |
2040 | - if not isinstance(failure, Failure): |
2041 | - self.fail("Test did not raise exception.") |
2042 | - failure.trap( |
2043 | - zookeeper.SessionExpiredException, |
2044 | - zookeeper.ConnectionLossException) |
2045 | - |
2046 | - #print "client close" |
2047 | - self.client.close() |
2048 | - #print "creating new client for teardown" |
2049 | - return self.client.connect() |
2050 | - |
2051 | - d.addCallback(new_connection_same_connection) |
2052 | - d.addCallback(close_new_connection) |
2053 | - d.addBoth(verify_original_closed) |
2054 | - |
2055 | - return d |
2056 | - |
2057 | def test_connect_with_server(self): |
2058 | """ |
2059 | A client's servers can be specified in the connect method. |
2060 | @@ -1183,12 +1173,8 @@ |
2061 | connected before then, then an errback is invoked with a timeout |
2062 | exception. |
2063 | """ |
2064 | - mock_init = self.mocker.replace("zookeeper.init") |
2065 | - mock_init(ANY, ANY, ANY) |
2066 | - self.mocker.result(0) |
2067 | - self.mocker.replay() |
2068 | - |
2069 | - d = self.client.connect(timeout=0.1) |
2070 | + # Connect to a non-standard port with nothing at the remote side. |
2071 | + d = self.client.connect("127.0.0.1:2182", timeout=0.2) |
2072 | |
2073 | def verify_timeout(failure): |
2074 | self.assertTrue( |
2075 | |
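Review note: the updated tests compare a `ClientEvent` directly against a four-element tuple, which suggests a tuple-like type with an added `handle` field. The sketch below shows that shape with a namedtuple. `SketchEvent` is a hypothetical name; the real class in client.py is not shown in this part of the diff and also provides a custom repr.

```python
from collections import namedtuple

# Assumed field order, taken from the assertions in
# test_client_event_attributes.
SketchEvent = namedtuple(
    "SketchEvent", ["type", "connection_state", "path", "handle"])

event = SketchEvent(4, "state", "path", 0)
```

Adding a field to a tuple-like event changes its arity, so any caller that unpacks three values or builds the event with three arguments needs updating, as the test changes above do.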
2076 | === added file 'txzookeeper/tests/test_conn_failure.py' |
2077 | --- txzookeeper/tests/test_conn_failure.py 1970-01-01 00:00:00 +0000 |
2078 | +++ txzookeeper/tests/test_conn_failure.py 2013-05-15 13:35:05 +0000 |
2079 | @@ -0,0 +1,194 @@ |
2080 | +# |
2081 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
2082 | +# |
2083 | +# This file is part of txzookeeper. |
2084 | +# |
2085 | +# Authors: |
2086 | +# Kapil Thangavelu |
2087 | +# |
2088 | +# txzookeeper is free software: you can redistribute it and/or modify |
2089 | +# it under the terms of the GNU Lesser General Public License as published by |
2090 | +# the Free Software Foundation, either version 3 of the License, or |
2091 | +# (at your option) any later version. |
2092 | +# |
2093 | +# txzookeeper is distributed in the hope that it will be useful, |
2094 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
2095 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
2096 | +# GNU Lesser General Public License for more details. |
2097 | +# |
2098 | +# You should have received a copy of the GNU Lesser General Public License |
2099 | +# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
2100 | +# |
2101 | + |
2102 | +import zookeeper |
2103 | + |
2104 | +from twisted.internet import reactor |
2105 | +from twisted.internet.defer import inlineCallbacks |
2106 | + |
2107 | +from txzookeeper import ZookeeperClient |
2108 | +from txzookeeper.tests import ZookeeperTestCase, utils |
2109 | +from txzookeeper.tests.proxy import ProxyFactory |
2110 | + |
2111 | + |
2112 | +class WatchDeliveryConnectionFailedTest(ZookeeperTestCase): |
2113 | + """Watches are still sent on reconnect. |
2114 | + """ |
2115 | + |
2116 | + def setUp(self): |
2117 | + super(WatchDeliveryConnectionFailedTest, self).setUp() |
2118 | + |
2119 | + self.proxy = ProxyFactory("127.0.0.1", 2181) |
2120 | + self.proxy_port = reactor.listenTCP(0, self.proxy) |
2121 | + host = self.proxy_port.getHost() |
2122 | + self.proxied_client = ZookeeperClient( |
2123 | + "%s:%s" % (host.host, host.port)) |
2124 | + self.direct_client = ZookeeperClient("127.0.0.1:2181", 3000) |
2125 | + self.session_events = [] |
2126 | + |
2127 | + def session_event_collector(conn, event): |
2128 | + self.session_events.append(event) |
2129 | + |
2130 | + self.proxied_client.set_session_callback(session_event_collector) |
2131 | + return self.direct_client.connect() |
2132 | + |
2133 | + @inlineCallbacks |
2134 | + def tearDown(self): |
2135 | + zookeeper.set_debug_level(0) |
2136 | + if self.proxied_client.connected: |
2137 | + yield self.proxied_client.close() |
2138 | + if not self.direct_client.connected: |
2139 | + yield self.direct_client.connect() |
2140 | + utils.deleteTree(handle=self.direct_client.handle) |
2141 | + yield self.direct_client.close() |
2142 | + self.proxy.lose_connection() |
2143 | + yield self.proxy_port.stopListening() |
2144 | + |
2145 | + def verify_events(self, events, expected): |
2146 | + """Verify the state of the session events encountered. |
2147 | + """ |
2148 | + for value, state in zip([e.state_name for e in events], expected): |
2149 | + self.assertEqual(value, state) |
2150 | + |
2151 | + @inlineCallbacks |
2152 | + def test_child_watch_fires_upon_reconnect(self): |
2153 | + yield self.proxied_client.connect() |
2154 | + |
2155 | + # Setup tree |
2156 | + cpath = "/test-tree" |
2157 | + yield self.direct_client.create(cpath) |
2158 | + |
2159 | + # Setup watch |
2160 | + child_d, watch_d = self.proxied_client.get_children_and_watch(cpath) |
2161 | + |
2162 | + self.assertEqual((yield child_d), []) |
2163 | + |
2164 | + # Kill the connection and fire the watch |
2165 | + self.proxy.lose_connection() |
2166 | + yield self.direct_client.create( |
2167 | + cpath + "/abc", flags=zookeeper.SEQUENCE) |
2168 | + |
2169 | + # We should still get the child event. |
2170 | + yield watch_d |
2171 | + |
2172 | + # We get two pairs of (connecting, connected) for the conn and watch |
2173 | + self.assertEqual(len(self.session_events), 4) |
2174 | + self.verify_events( |
2175 | + self.session_events, |
2176 | + ("connecting", "connecting", "connected", "connected")) |
2177 | + |
2178 | + @inlineCallbacks |
2179 | + def test_exists_watch_fires_upon_reconnect(self): |
2180 | + yield self.proxied_client.connect() |
2181 | + cpath = "/test" |
2182 | + |
2183 | + # Setup watch |
2184 | + exists_d, watch_d = self.proxied_client.exists_and_watch(cpath) |
2185 | + |
2186 | + self.assertEqual((yield exists_d), None) |
2187 | + |
2188 | + # Kill the connection and fire the watch |
2189 | + self.proxy.lose_connection() |
2190 | + yield self.direct_client.create(cpath) |
2191 | + |
2192 | + # We should still get the exists event. |
2193 | + yield watch_d |
2194 | + |
2195 | + # We get two pairs of (connecting, connected) for the conn and watch |
2196 | + self.assertEqual(len(self.session_events), 4) |
2197 | + self.verify_events( |
2198 | + self.session_events, |
2199 | + ("connecting", "connecting", "connected", "connected")) |
2200 | + |
2201 | + @inlineCallbacks |
2202 | + def test_get_watch_fires_upon_reconnect(self): |
2203 | + yield self.proxied_client.connect() |
2204 | + # Setup tree |
2205 | + cpath = "/test" |
2206 | + yield self.direct_client.create(cpath, "abc") |
2207 | + |
2208 | + # Setup watch |
2209 | + get_d, watch_d = self.proxied_client.get_and_watch(cpath) |
2210 | + content, stat = yield get_d |
2211 | + self.assertEqual(content, "abc") |
2212 | + |
2213 | + # Kill the connection and fire the watch |
2214 | + self.proxy.lose_connection() |
2215 | + yield self.direct_client.set(cpath, "xyz") |
2216 | + |
2217 | + # We should still get the data-changed event. |
2218 | + yield watch_d |
2219 | + |
2220 | + # We also get two pairs of (connecting, connected) for the conn and watch |
2221 | + self.assertEqual(len(self.session_events), 4) |
2222 | + self.verify_events( |
2223 | + self.session_events, |
2224 | + ("connecting", "connecting", "connected", "connected")) |
2225 | + |
2226 | + @inlineCallbacks |
2227 | + def test_watch_delivery_failure_resends(self): |
2228 | + """Simulate a network failure for the watch delivery |
2229 | + |
2230 | + The zk server effectively sends the watch delivery to the client, |
2231 | + but the client never recieves it. |
2232 | + """ |
2233 | + yield self.proxied_client.connect() |
2234 | + cpath = "/test" |
2235 | + |
2236 | + # Setup watch |
2237 | + exists_d, watch_d = self.proxied_client.exists_and_watch(cpath) |
2238 | + |
2239 | + self.assertEqual((yield exists_d), None) |
2240 | + |
2241 | + # Pause the connection, fire the watch, and blackhole the data. |
2242 | + self.proxy.set_blocked(True) |
2243 | + yield self.direct_client.create(cpath) |
2244 | + self.proxy.set_blocked(False) |
2245 | + self.proxy.lose_connection() |
2246 | + |
2247 | + # We should still get the exists event. |
2248 | + yield watch_d |
2249 | + |
2250 | + @inlineCallbacks |
2251 | + def xtest_binding_bug_session_exception(self): |
2252 | + """This test triggers an exception in the python-zookeeper binding. |
2253 | + |
2254 | + File "txzookeeper/client.py", line 491, in create |
2255 | + self.handle, path, data, acls, flags, callback) |
2256 | + exceptions.SystemError: error return without exception set |
2257 | + """ |
2258 | + yield self.proxied_client.connect() |
2259 | + data_d, watch_d = yield self.proxied_client.exists_and_watch("/") |
2260 | + self.assertTrue((yield data_d)) |
2261 | + self.proxy.set_blocked(True) |
2262 | + # Wait for session expiration, on a single server options are limited |
2263 | + yield self.sleep(15) |
2264 | + # Unblock the proxy for next connect, and then drop the connection. |
2265 | + self.proxy.set_blocked(False) |
2266 | + self.proxy.lose_connection() |
2267 | + # Wait for a reconnect |
2268 | + yield self.assertFailure(watch_d, zookeeper.SessionExpiredException) |
2269 | + # Leads to bindings bug failure |
2270 | + yield self.assertFailure( |
2271 | + self.proxied_client.get("/a"), |
2272 | + zookeeper.SessionExpiredException) |
2273 | + self.assertEqual(self.session_events[-1].state_name, "expired") |
2274 | |
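Review note: `verify_events` walks `zip(...)`, which stops at the shorter sequence, so on its own it would accept a truncated event list. The tests guard against that with a separate length assertion. A stricter variant that checks order and length in one step could look like the sketch below; `verify_events_strict` is a hypothetical helper, not part of the branch.

```python
def verify_events_strict(observed_states, expected_states):
    """Fail unless both sequences match in order and in length."""
    observed = list(observed_states)
    expected = list(expected_states)
    if observed != expected:
        raise AssertionError(
            "session events %r != expected %r" % (observed, expected))


# zip() silently truncates to the shorter input:
pairs = list(zip(["connecting"], ["connecting", "connected"]))
```

In the test case the first argument would be `[e.state_name for e in self.session_events]`, matching what the existing helper extracts.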
2275 | === modified file 'txzookeeper/tests/test_lock.py' |
2276 | --- txzookeeper/tests/test_lock.py 2011-06-17 23:52:34 +0000 |
2277 | +++ txzookeeper/tests/test_lock.py 2013-05-15 13:35:05 +0000 |
2278 | @@ -1,8 +1,11 @@ |
2279 | # |
2280 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
2281 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
2282 | # |
2283 | # This file is part of txzookeeper. |
2284 | # |
2285 | +# Authors: |
2286 | +# Kapil Thangavelu |
2287 | +# |
2288 | # txzookeeper is free software: you can redistribute it and/or modify |
2289 | # it under the terms of the GNU Lesser General Public License as published by |
2290 | # the Free Software Foundation, either version 3 of the License, or |
2291 | |
2292 | === added file 'txzookeeper/tests/test_managed.py' |
2293 | --- txzookeeper/tests/test_managed.py 1970-01-01 00:00:00 +0000 |
2294 | +++ txzookeeper/tests/test_managed.py 2013-05-15 13:35:05 +0000 |
2295 | @@ -0,0 +1,309 @@ |
2296 | +# |
2297 | +# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved |
2298 | +# |
2299 | +# This file is part of txzookeeper. |
2300 | +# |
2301 | +# Authors: |
2302 | +# Kapil Thangavelu |
2303 | +# |
2304 | +# txzookeeper is free software: you can redistribute it and/or modify |
2305 | +# it under the terms of the GNU Lesser General Public License as published by |
2306 | +# the Free Software Foundation, either version 3 of the License, or |
2307 | +# (at your option) any later version. |
2308 | +# |
2309 | +# txzookeeper is distributed in the hope that it will be useful, |
2310 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
2311 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
2312 | +# GNU Lesser General Public License for more details. |
2313 | +# |
2314 | +# You should have received a copy of the GNU Lesser General Public License |
2315 | +# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
2316 | +# |
2317 | + |
2318 | +import logging |
2319 | +import zookeeper |
2320 | + |
2321 | +from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList |
2322 | + |
2323 | +from txzookeeper.client import ZookeeperClient, ClientEvent |
2324 | +from txzookeeper import managed |
2325 | +from txzookeeper.tests import ZookeeperTestCase, utils |
2326 | +from txzookeeper.tests import test_client |
2327 | + |
2328 | + |
2329 | +class WatchTest(ZookeeperTestCase): |
2330 | + """Watch manager and watch tests""" |
2331 | + |
2332 | + def setUp(self): |
2333 | + self.watches = managed.WatchManager() |
2334 | + |
2335 | + def tearDown(self): |
2336 | + self.watches.clear() |
2337 | + del self.watches |
2338 | + |
2339 | + def test_add_remove(self): |
2340 | + |
2341 | + w = self.watches.add("/foobar", "child", lambda x: 1) |
2342 | + self.assertIn( |
2343 | + "<Watcher child /foobar", |
2344 | + str(w)) |
2345 | + self.assertIn(w, self.watches._watches) |
2346 | + self.watches.remove(w) |
2347 | + self.assertNotIn(w, self.watches._watches) |
2348 | + self.watches.remove(w) |
2349 | + |
2350 | + @inlineCallbacks |
2351 | + def test_watch_fire_removes(self): |
2352 | + """Firing the watch removes it from the manager. |
2353 | + """ |
2354 | + w = self.watches.add("/foobar", "child", lambda x: 1) |
2355 | + yield w("a") |
2356 | + self.assertNotIn(w, self.watches._watches) |
2357 | + |
2358 | + @inlineCallbacks |
2359 | + def test_watch_fire_with_error_removes(self): |
2360 | + """Firing the watch removes it from the manager. |
2361 | + """ |
2362 | + d = Deferred() |
2363 | + |
2364 | + @inlineCallbacks |
2365 | + def cb_error(e): |
2366 | + yield d |
2367 | + raise ValueError("a") |
2368 | + |
2369 | + w = self.watches.add("/foobar", "child", lambda x: 1) |
2370 | + try: |
2371 | + wd = w("a") |
2372 | + d.callback(True) |
2373 | + yield wd |
2374 | + except ValueError: |
2375 | + pass |
2376 | + self.assertNotIn(w, self.watches._watches) |
2377 | + |
2378 | + @inlineCallbacks |
2379 | + def test_reset_with_error(self): |
2380 | + """A callback firing an error on reset is ignored. |
2381 | + """ |
2382 | + output = self.capture_log("txzk.managed") |
2383 | + d = Deferred() |
2384 | + results = [] |
2385 | + |
2386 | + @inlineCallbacks |
2387 | + def callback(*args, **kw): |
2388 | + results.append((args, kw)) |
2389 | + yield d |
2390 | + raise ValueError("a") |
2391 | + |
2392 | + w = self.watches.add("/foobar", "child", callback) |
2393 | + reset_done = self.watches.reset() |
2394 | + |
2395 | + e, _ = results.pop() |
2396 | + e = list(e) |
2397 | + e.append(0) |
2398 | + |
2399 | + self.assertEqual( |
2400 | + str(ClientEvent(*e)), |
2401 | + "<ClientEvent session at '/foobar' state: connected handle:0>") |
2402 | + d.callback(True) |
2403 | + yield reset_done |
2404 | + self.assertNotIn(w, self.watches._watches) |
2405 | + self.assertIn("Error reseting watch", output.getvalue()) |
2406 | + |
2407 | + @inlineCallbacks |
2408 | + def test_reset(self): |
2409 | + """Reset fires a synthetic client event, and clears watches. |
2410 | + """ |
2411 | + d = Deferred() |
2412 | + results = [] |
2413 | + |
2414 | + def callback(*args, **kw): |
2415 | + results.append((args, kw)) |
2416 | + return d |
2417 | + |
2418 | + w = self.watches.add("/foobar", "child", callback) |
2419 | + reset_done = self.watches.reset() |
2420 | + |
2421 | + e, _ = results.pop() |
2422 | + e = list(e) |
2423 | + e.append(0) |
2424 | + |
2425 | + self.assertEqual( |
2426 | + str(ClientEvent(*e)), |
2427 | + "<ClientEvent session at '/foobar' state: connected handle:0>") |
2428 | + d.callback(True) |
2429 | + yield reset_done |
2430 | + self.assertNotIn(w, self.watches._watches) |
2431 | + |
2432 | + |
2433 | +class SessionClientTests(test_client.ClientTests): |
2434 | + """Run through basic operations with SessionClient.""" |
2435 | + timeout = 5 |
2436 | + |
2437 | + def setUp(self): |
2438 | + super(SessionClientTests, self).setUp() |
2439 | + self.client = managed.SessionClient("127.0.0.1:2181") |
2440 | + |
2441 | + def test_client_use_while_disconnected_returns_failure(self): |
2442 | + # managed session client reconnects here. |
2443 | + return True |
2444 | + |
2445 | + |
2446 | +class SessionClientExpireTests(ZookeeperTestCase): |
2447 | + """Verify expiration behavior.""" |
2448 | + |
2449 | + def setUp(self): |
2450 | + super(SessionClientExpireTests, self).setUp() |
2451 | + self.client = managed.ManagedClient("127.0.0.1:2181", 3000) |
2452 | + self.client2 = None |
2453 | + self.output = self.capture_log(level=logging.DEBUG) |
2454 | + return self.client.connect() |
2455 | + |
2456 | + @inlineCallbacks |
2457 | + def tearDown(self): |
2458 | + self.client.close() |
2459 | + self.client2 = ZookeeperClient("127.0.0.1:2181") |
2460 | + yield self.client2.connect() |
2461 | + utils.deleteTree(handle=self.client2.handle) |
2462 | + yield self.client2.close() |
2463 | + super(SessionClientExpireTests, self).tearDown() |
2464 | + |
2465 | + @inlineCallbacks |
2466 | + def expire_session(self, wait=True, retry=0): |
2467 | + assert self.client.connected |
2468 | + #if wait: |
2469 | + # d = self.client.subscribe_new_session() |
2470 | + client_id = self.client.client_id |
2471 | + self.client2 = ZookeeperClient(self.client.servers) |
2472 | + yield self.client2.connect(client_id=client_id) |
2473 | + yield self.client2.close() |
2474 | + |
2475 | + # It takes some time to propagate (1/3 session time as ping) |
2476 | + if wait: |
2477 | + yield self.sleep(2) |
2478 | + client_new_id = self.client.client_id |
2479 | + # Crappy workaround to c libzk bug/issue see http://goo.gl/9ei5c |
2480 | + # Works most of the time.. but bound it when it doesn't. lame! |
2481 | + if client_id[0] == client_new_id[0] and retry < 10: |
2482 | + yield self.expire_session(wait, retry+1) |
2483 | + |
2484 | + @inlineCallbacks |
2485 | + def test_session_expiration_conn(self): |
2486 | + d = self.client.subscribe_new_session() |
2487 | + session_id = self.client.client_id[0] |
2488 | + yield self.client.create("/fo-1", "abc") |
2489 | + yield self.expire_session(wait=True) |
2490 | + yield d |
2491 | + stat = yield self.client.exists("/") |
2492 | + self.assertTrue(stat) |
2493 | + self.assertNotEqual(session_id, self.client.client_id[0]) |
2494 | + |
2495 | + @inlineCallbacks |
2496 | + def test_session_expiration_notification(self): |
2497 | + session_id = self.client.client_id[0] |
2498 | + c_d, w_d = self.client.get_and_watch("/") |
2499 | + yield c_d |
2500 | + d = self.client.subscribe_new_session() |
2501 | + self.assertFalse(d.called) |
2502 | + yield self.expire_session(wait=True) |
2503 | + yield d |
2504 | + yield w_d |
2505 | + self.assertNotEqual(session_id, self.client.client_id[0]) |
2506 | + |
2507 | + @inlineCallbacks |
2508 | + def test_invoked_watch_gc(self): |
2509 | + c_d, w_d = yield self.client.get_children_and_watch("/") |
2510 | + yield c_d |
2511 | + yield self.client.create("/foo") |
2512 | + yield w_d |
2513 | + yield self.expire_session() |
2514 | + yield self.client.create("/foo2") |
2515 | + # Nothing should blow up |
2516 | + yield self.sleep(0.2) |
2517 | + |
2518 | + @inlineCallbacks |
2519 | + def test_app_usage_error_bypass_retry(self): |
2520 | + """App errors shouldn't trigger a reconnect.""" |
2521 | + output = self.capture_log(level=logging.DEBUG) |
2522 | + yield self.assertFailure( |
2523 | + self.client.get("/abc"), zookeeper.NoNodeException) |
2524 | + self.assertNotIn("Persistent retry error", output.getvalue()) |
2525 | + |
2526 | + @inlineCallbacks |
2527 | + def test_ephemeral_and_watch_recreate(self): |
2528 | + # Create some ephemeral nodes |
2529 | + yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
2530 | + yield self.client.create("/fo-2", "def", flags=zookeeper.EPHEMERAL) |
2531 | + |
2532 | + # Create some watches |
2533 | + g_d, g_w_d = self.client.get_and_watch("/fo-1") |
2534 | + yield g_d |
2535 | + |
2536 | + c_d, c_w_d = self.client.get_children_and_watch("/") |
2537 | + yield c_d |
2538 | + |
2539 | + e_d, e_w_d = self.client.get_children_and_watch("/fo-2") |
2540 | + yield e_d |
2541 | + |
2542 | + # Expire the session |
2543 | + yield self.expire_session() |
2544 | + |
2545 | + # Poof |
2546 | + |
2547 | + # Ephemerals back |
2548 | + c, s = yield self.client.get("/fo-1") |
2549 | + self.assertEqual(c, "abc") |
2550 | + |
2551 | + c, s = yield self.client.get("/fo-2") |
2552 | + self.assertEqual(c, "def") |
2553 | + |
2554 | + # Watches triggered |
2555 | + yield DeferredList( |
2556 | + [g_w_d, c_w_d, e_w_d], |
2557 | + fireOnOneErrback=True, consumeErrors=True) |
2558 | + |
2559 | + h = self.client.handle |
2560 | + self.assertEqual( |
2561 | + [str(d.result) for d in (g_w_d, c_w_d, e_w_d)], |
2562 | + ["<ClientEvent session at '/fo-1' state: connected handle:%d>" % h, |
2563 | + "<ClientEvent session at '/' state: connected handle:%d>" % h, |
2564 | + "<ClientEvent session at '/fo-2' state: connected handle:%d>" % h |
2565 | + ]) |
2566 | + |
2567 | + @inlineCallbacks |
2568 | + def test_ephemeral_no_track_sequence_nodes(self): |
2569 | + """ Ephemeral tracking ignores sequence nodes. |
2570 | + """ |
2571 | + yield self.client.create("/music", "abc") |
2572 | + yield self.client.create( |
2573 | + "/music/u2-", "abc", |
2574 | + flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE) |
2575 | + yield self.expire_session() |
2576 | + children = yield self.client.get_children("/music") |
2577 | + self.assertEqual(children, []) |
2578 | + |
2579 | + @inlineCallbacks |
2580 | + def test_ephemeral_content_modification(self): |
2581 | + yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
2582 | + yield self.client.set("/fo-1", "def") |
2583 | + yield self.expire_session() |
2584 | + c, s = yield self.client.get("/fo-1") |
2585 | + self.assertEqual(c, "def") |
2586 | + |
2587 | + @inlineCallbacks |
2588 | + def test_ephemeral_acl_modification(self): |
2589 | + yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
2590 | + acl = [test_client.PUBLIC_ACL, |
2591 | + dict(scheme="digest", |
2592 | + id="zebra:moon", |
2593 | + perms=zookeeper.PERM_ALL)] |
2594 | + yield self.client.set_acl("/fo-1", acl) |
2595 | + yield self.expire_session() |
2596 | + n_acl, stat = yield self.client.get_acl("/fo-1") |
2597 | + self.assertEqual(acl, n_acl) |
2598 | + |
2599 | + @inlineCallbacks |
2600 | + def test_ephemeral_deletion(self): |
2601 | + yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) |
2602 | + yield self.client.delete("/fo-1") |
2603 | + yield self.expire_session() |
2604 | + self.assertFalse((yield self.client.exists("/fo-1"))) |
2605 | |
2606 | === modified file 'txzookeeper/tests/test_node.py' |
2607 | --- txzookeeper/tests/test_node.py 2011-06-30 17:14:06 +0000 |
2608 | +++ txzookeeper/tests/test_node.py 2013-05-15 13:35:05 +0000 |
2609 | @@ -1,8 +1,11 @@ |
2610 | # |
2611 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
2612 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
2613 | # |
2614 | # This file is part of txzookeeper. |
2615 | # |
2616 | +# Authors: |
2617 | +# Kapil Thangavelu |
2618 | +# |
2619 | # txzookeeper is free software: you can redistribute it and/or modify |
2620 | # it under the terms of the GNU Lesser General Public License as published by |
2621 | # the Free Software Foundation, either version 3 of the License, or |
2622 | @@ -227,8 +230,8 @@ |
2623 | """ |
2624 | node = ZNode("/zoo/elephant", self.client) |
2625 | exists, watch = yield node.exists_and_watch() |
2626 | - |
2627 | - self.client.create("/zoo/elephant") |
2628 | + self.assertFalse((yield exists)) |
2629 | + yield self.client.create("/zoo/elephant") |
2630 | event = yield watch |
2631 | self.assertEqual(event.type, zookeeper.CREATED_EVENT) |
2632 | self.assertEqual(event.path, node.path) |
2633 | |
2634 | === modified file 'txzookeeper/tests/test_queue.py' |
2635 | --- txzookeeper/tests/test_queue.py 2011-06-30 17:14:06 +0000 |
2636 | +++ txzookeeper/tests/test_queue.py 2013-05-15 13:35:05 +0000 |
2637 | @@ -1,8 +1,11 @@ |
2638 | # |
2639 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
2640 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
2641 | # |
2642 | # This file is part of txzookeeper. |
2643 | # |
2644 | +# Authors: |
2645 | +# Kapil Thangavelu |
2646 | +# |
2647 | # txzookeeper is free software: you can redistribute it and/or modify |
2648 | # it under the terms of the GNU Lesser General Public License as published by |
2649 | # the Free Software Foundation, either version 3 of the License, or |
2650 | @@ -17,11 +20,11 @@ |
2651 | # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
2652 | # |
2653 | |
2654 | - |
2655 | from zookeeper import NoNodeException |
2656 | from twisted.internet.defer import ( |
2657 | inlineCallbacks, returnValue, DeferredList, Deferred, succeed, fail) |
2658 | |
2659 | + |
2660 | from txzookeeper import ZookeeperClient |
2661 | from txzookeeper.client import NotConnectedException |
2662 | from txzookeeper.queue import Queue, ReliableQueue, SerializedQueue, QueueItem |
2663 | |
2664 | === added file 'txzookeeper/tests/test_retry.py' |
2665 | --- txzookeeper/tests/test_retry.py 1970-01-01 00:00:00 +0000 |
2666 | +++ txzookeeper/tests/test_retry.py 2013-05-15 13:35:05 +0000 |
2667 | @@ -0,0 +1,332 @@ |
2668 | +# |
2669 | +# Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved |
2670 | +# |
2671 | +# This file is part of txzookeeper. |
2672 | +# |
2673 | +# Authors: |
2674 | +# Kapil Thangavelu |
2675 | +# |
2676 | +# txzookeeper is free software: you can redistribute it and/or modify |
2677 | +# it under the terms of the GNU Lesser General Public License as published by |
2678 | +# the Free Software Foundation, either version 3 of the License, or |
2679 | +# (at your option) any later version. |
2680 | +# |
2681 | +# txzookeeper is distributed in the hope that it will be useful, |
2682 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
2683 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
2684 | +# GNU Lesser General Public License for more details. |
2685 | +# |
2686 | +# You should have received a copy of the GNU Lesser General Public License |
2687 | +# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
2688 | +# |
2689 | + |
2690 | +import json |
2691 | +import time |
2692 | +import zookeeper |
2693 | + |
2694 | +from twisted.internet.defer import inlineCallbacks, fail, succeed, Deferred |
2695 | + |
2696 | +from txzookeeper.client import ZookeeperClient |
2697 | +from txzookeeper.retry import ( |
2698 | + RetryClient, retry, retry_watch, |
2699 | + check_retryable, is_retryable, get_delay, sleep) |
2700 | + |
2701 | +from txzookeeper import retry as retry_module |
2702 | + |
2703 | + |
2704 | +from txzookeeper.utils import retry_change |
2705 | + |
2706 | +from txzookeeper.tests import ZookeeperTestCase, utils |
2707 | +from txzookeeper.tests.proxy import ProxyFactory |
2708 | +from txzookeeper.tests import test_client |
2709 | + |
2710 | + |
2711 | +class RetryCoreTests(ZookeeperTestCase): |
2712 | + """Test the retry functions in isolation. |
2713 | + """ |
2714 | + timeout = 5 |
2715 | + |
2716 | + def test_is_retryable(self): |
2717 | + self.assertEqual( |
2718 | + is_retryable(zookeeper.SessionExpiredException()), False) |
2719 | + self.assertEqual( |
2720 | + is_retryable(zookeeper.ConnectionLossException()), True) |
2721 | + self.assertEqual( |
2722 | + is_retryable(zookeeper.OperationTimeoutException()), True) |
2723 | + self.assertEqual( |
2724 | + is_retryable(TypeError()), False) |
2725 | + |
2726 | + def setup_always_retryable(self): |
2727 | + def check_retry(*args): |
2728 | + return True |
2729 | + |
2730 | + self.patch(retry_module, "check_retryable", check_retry) |
2731 | + |
2732 | + @inlineCallbacks |
2733 | + def test_sleep(self): |
2734 | + t = time.time() |
2735 | + yield sleep(0.5) |
2736 | + self.assertTrue(time.time() - t > 0.5) |
2737 | + |
2738 | + @inlineCallbacks |
2739 | + def test_retry_function(self): |
2740 | + """The retry wrapper can be used for a function.""" |
2741 | + self.setup_always_retryable() |
2742 | + |
2743 | + results = [fail(zookeeper.ConnectionLossException()), |
2744 | + fail(zookeeper.ConnectionLossException()), |
2745 | + succeed(21)] |
2746 | + |
2747 | + def original(zebra): |
2748 | + """Hello World""" |
2749 | + return results.pop(0) |
2750 | + |
2751 | + result = yield retry(ZookeeperClient(), original, "magic") |
2752 | + self.assertEqual(result, 21) |
2753 | + |
2754 | + @inlineCallbacks |
2755 | + def test_retry_method(self): |
2756 | + """The retry wrapper can be used for a method.""" |
2757 | + self.setup_always_retryable() |
2758 | + |
2759 | + results = [fail(zookeeper.ConnectionLossException()), |
2760 | + fail(zookeeper.ConnectionLossException()), |
2761 | + succeed(21)] |
2762 | + |
2763 | + class Foobar(object): |
2764 | + |
2765 | + def original(self, zebra): |
2766 | + """Hello World""" |
2767 | + return results.pop(0) |
2768 | + |
2769 | + client = ZookeeperClient() |
2770 | + foo = Foobar() |
2771 | + result = yield retry(client, foo.original, "magic") |
2772 | + self.assertEqual(result, 21) |
2773 | + |
2774 | + @inlineCallbacks |
2775 | + def test_retry_watch_function(self): |
2776 | + self.setup_always_retryable() |
2777 | + |
2778 | + results = [(fail(zookeeper.ConnectionLossException()), Deferred()), |
2779 | + (fail(zookeeper.ConnectionLossException()), Deferred()), |
2780 | + (succeed(21), succeed(22))] |
2781 | + |
2782 | + def original(zebra): |
2783 | + """Hello World""" |
2784 | + return results.pop(0) |
2785 | + |
2786 | + client = ZookeeperClient() |
2787 | + value_d, watch_d = retry_watch(client, original, "magic") |
2788 | + self.assertEqual((yield value_d), 21) |
2789 | + self.assertEqual((yield watch_d), 22) |
2790 | + |
2791 | + def test_check_retryable(self): |
2792 | + unrecoverable_errors = [ |
2793 | + zookeeper.ApiErrorException(), |
2794 | + zookeeper.NoAuthException(), |
2795 | + zookeeper.NodeExistsException(), |
2796 | + zookeeper.SessionExpiredException(), |
2797 | + ] |
2798 | + |
2799 | + class _Conn(object): |
2800 | + def __init__(self, **kw): |
2801 | + self.__dict__.update(kw) |
2802 | + |
2803 | + error = zookeeper.ConnectionLossException() |
2804 | + conn = _Conn(connected=True, unrecoverable=False) |
2805 | + max_time = time.time() + 10 |
2806 | + |
2807 | + for e in unrecoverable_errors: |
2808 | + self.assertFalse(check_retryable(_Conn(), max_time, e)) |
2809 | + |
2810 | + self.assertTrue(check_retryable(conn, max_time, error)) |
2811 | + self.assertFalse(check_retryable(conn, time.time() - 10, error)) |
2812 | + self.assertFalse(check_retryable( |
2813 | + _Conn(connected=False, unrecoverable=False), max_time, error)) |
2814 | + self.assertFalse(check_retryable( |
2815 | + _Conn(connected=True, unrecoverable=True), max_time, error)) |
2816 | + |
2817 | + def test_get_delay(self): |
2818 | + # Delay currently set to ~1/3 of session time. |
2819 | + |
2820 | + # Verify max value is respected |
2821 | + self.assertEqual(get_delay(25000, 10), 7.5) |
2822 | + # Verify normal calculation |
2823 | + self.assertEqual(get_delay(15000, 10, 30), 4.5) |
2824 | + |
2825 | + |
2826 | +class RetryClientTests(test_client.ClientTests): |
2827 | + """Run the full client test suite against the retry facade. |
2828 | + """ |
2829 | + def setUp(self): |
2830 | + super(RetryClientTests, self).setUp() |
2831 | + self.client = RetryClient(ZookeeperClient("127.0.0.1:2181", 3000)) |
2832 | + self.client2 = None |
2833 | + |
2834 | + def tearDown(self): |
2835 | + if self.client.connected: |
2836 | + utils.deleteTree(handle=self.client.handle) |
2837 | + self.client.close() |
2838 | + |
2839 | + if self.client2 and self.client2.connected: |
2840 | + self.client2.close() |
2841 | + |
2842 | + super(RetryClientTests, self).tearDown() |
2843 | + |
2844 | + def test_wb_connect_after_timeout(self): |
2845 | + """white box tests disabled for retryclient.""" |
2846 | + |
2847 | + def test_wb_reconnect_after_timeout_and_close(self): |
2848 | + """white box tests disabled for retryclient.""" |
2849 | + |
2850 | + def test_exists_with_error(self): |
2851 | + """White box tests disabled for retryclient.""" |
2852 | + |
2853 | + |
2854 | +class RetryClientConnectionLossTest(ZookeeperTestCase): |
2855 | + |
2856 | + def setUp(self): |
2857 | + super(RetryClientConnectionLossTest, self).setUp() |
2858 | + |
2859 | + from twisted.internet import reactor |
2860 | + self.proxy = ProxyFactory("127.0.0.1", 2181) |
2861 | + self.proxy_port = reactor.listenTCP(0, self.proxy) |
2862 | + host = self.proxy_port.getHost() |
2863 | + self.proxied_client = RetryClient(ZookeeperClient( |
2864 | + "%s:%s" % (host.host, host.port))) |
2865 | + self.direct_client = ZookeeperClient("127.0.0.1:2181", 3000) |
2866 | + self.session_events = [] |
2867 | + |
2868 | + def session_event_collector(conn, event): |
2869 | + self.session_events.append(event) |
2870 | + |
2871 | + self.proxied_client.set_session_callback(session_event_collector) |
2872 | + return self.direct_client.connect() |
2873 | + |
2874 | + @inlineCallbacks |
2875 | + def tearDown(self): |
2876 | + import zookeeper |
2877 | + zookeeper.set_debug_level(0) |
2878 | + if self.proxied_client.connected: |
2879 | + yield self.proxied_client.close() |
2880 | + if not self.direct_client.connected: |
2881 | + yield self.direct_client.connect() |
2882 | + utils.deleteTree(handle=self.direct_client.handle) |
2883 | + yield self.direct_client.close() |
2884 | + self.proxy.lose_connection() |
2885 | + yield self.proxy_port.stopListening() |
2886 | + |
2887 | + @inlineCallbacks |
2888 | + def test_get_children_and_watch(self): |
2889 | + yield self.proxied_client.connect() |
2890 | + |
2891 | + # Setup tree |
2892 | + cpath = "/test-tree" |
2893 | + yield self.direct_client.create(cpath) |
2894 | + |
2895 | + # Block the request (drops all packets.) |
2896 | + self.proxy.set_blocked(True) |
2897 | + child_d, watch_d = self.proxied_client.get_children_and_watch(cpath) |
2898 | + |
2899 | + # Unblock and disconnect |
2900 | + self.proxy.set_blocked(False) |
2901 | + self.proxy.lose_connection() |
2902 | + |
2903 | + # Call goes through |
2904 | + self.assertEqual((yield child_d), []) |
2905 | + self.assertEqual(len(self.session_events), 2) |
2906 | + |
2907 | + # And we have reconnect events |
2908 | + self.assertEqual(self.session_events[-1].state_name, "connected") |
2909 | + |
2910 | + yield self.direct_client.create(cpath + "/abc") |
2911 | + |
2912 | + # The original watch is still active |
2913 | + yield watch_d |
2914 | + |
2915 | + @inlineCallbacks |
2916 | + def test_exists_and_watch(self): |
2917 | + yield self.proxied_client.connect() |
2918 | + |
2919 | + cpath = "/test-tree" |
2920 | + |
2921 | + # Block the request |
2922 | + self.proxy.set_blocked(True) |
2923 | + exists_d, watch_d = self.proxied_client.exists_and_watch(cpath) |
2924 | + |
2925 | + # Create the node |
2926 | + yield self.direct_client.create(cpath) |
2927 | + |
2928 | + # Unblock and disconnect |
2929 | + self.proxy.set_blocked(False) |
2930 | + self.proxy.lose_connection() |
2931 | + |
2932 | + # Call gets retried, see the latest state |
2933 | + self.assertTrue((yield exists_d)) |
2934 | + self.assertEqual(len(self.session_events), 2) |
2935 | + |
2936 | + # And we have reconnect events |
2937 | + self.assertEqual(self.session_events[-1].state_name, "connected") |
2938 | + |
2939 | + yield self.direct_client.delete(cpath) |
2940 | + |
2941 | + # The original watch is still active |
2942 | + yield watch_d |
2943 | + |
2944 | + @inlineCallbacks |
2945 | + def test_get_and_watch(self): |
2946 | + yield self.proxied_client.connect() |
2947 | + |
2948 | + # Setup tree |
2949 | + cpath = "/test-tree" |
2950 | + yield self.direct_client.create(cpath) |
2951 | + |
2952 | + # Block the request (drops all packets.) |
2953 | + self.proxy.set_blocked(True) |
2954 | + get_d, watch_d = self.proxied_client.get_and_watch(cpath) |
2955 | + |
2956 | + # Unblock and disconnect |
2957 | + self.proxy.set_blocked(False) |
2958 | + self.proxy.lose_connection() |
2959 | + |
2960 | + # Call goes through |
2961 | + content, stat = yield get_d |
2962 | + self.assertEqual(content, '') |
2963 | + self.assertEqual(len(self.session_events), 2) |
2964 | + |
2965 | + # And we have reconnect events |
2966 | + self.assertEqual(self.session_events[-1].state_name, "connected") |
2967 | + |
2968 | + yield self.direct_client.delete(cpath) |
2969 | + |
2970 | + # The original watch is still active |
2971 | + yield watch_d |
2972 | + |
2973 | + @inlineCallbacks |
2974 | + def test_set(self): |
2975 | + yield self.proxied_client.connect() |
2976 | + |
2977 | + # Setup tree |
2978 | + cpath = "/test-tree" |
2979 | + yield self.direct_client.create(cpath, json.dumps({"a": 1, "c": 2})) |
2980 | + |
2981 | + def update_node(content, stat): |
2982 | + data = json.loads(content) |
2983 | + data["a"] += 1 |
2984 | + data["b"] = 0 |
2985 | + return json.dumps(data) |
2986 | + |
2987 | + # Block the request (drops all packets.) |
2988 | + self.proxy.set_blocked(True) |
2989 | + mod_d = retry_change(self.proxied_client, cpath, update_node) |
2990 | + |
2991 | + # Unblock and disconnect |
2992 | + self.proxy.set_blocked(False) |
2993 | + self.proxy.lose_connection() |
2994 | + |
2995 | + # Call goes through, contents verified. |
2996 | + yield mod_d |
2997 | + content, stat = yield self.direct_client.get(cpath) |
2998 | + self.assertEqual(json.loads(content), |
2999 | + {"a": 2, "b": 0, "c": 2}) |
3000 | |
3001 | === modified file 'txzookeeper/tests/test_security.py' |
3002 | --- txzookeeper/tests/test_security.py 2011-06-30 17:14:06 +0000 |
3003 | +++ txzookeeper/tests/test_security.py 2013-05-15 13:35:05 +0000 |
3004 | @@ -1,8 +1,11 @@ |
3005 | # |
3006 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
3007 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
3008 | # |
3009 | # This file is part of txzookeeper. |
3010 | # |
3011 | +# Authors: |
3012 | +# Kapil Thangavelu |
3013 | +# |
3014 | # txzookeeper is free software: you can redistribute it and/or modify |
3015 | # it under the terms of the GNU Lesser General Public License as published by |
3016 | # the Free Software Foundation, either version 3 of the License, or |
3017 | |
3018 | === modified file 'txzookeeper/tests/test_session.py' |
3019 | --- txzookeeper/tests/test_session.py 2011-06-30 20:16:17 +0000 |
3020 | +++ txzookeeper/tests/test_session.py 2013-05-15 13:35:05 +0000 |
3021 | @@ -1,21 +1,46 @@ |
3022 | +# |
3023 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
3024 | +# |
3025 | +# This file is part of txzookeeper. |
3026 | +# |
3027 | +# Authors: |
3028 | +# Kapil Thangavelu |
3029 | +# |
3030 | +# txzookeeper is free software: you can redistribute it and/or modify |
3031 | +# it under the terms of the GNU Lesser General Public License as published by |
3032 | +# the Free Software Foundation, either version 3 of the License, or |
3033 | +# (at your option) any later version. |
3034 | +# |
3035 | +# txzookeeper is distributed in the hope that it will be useful, |
3036 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
3037 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
3038 | +# GNU Lesser General Public License for more details. |
3039 | +# |
3040 | +# You should have received a copy of the GNU Lesser General Public License |
3041 | +# along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
3042 | +# |
3043 | |
3044 | import atexit |
3045 | +import logging |
3046 | import os |
3047 | |
3048 | import zookeeper |
3049 | |
3050 | -from twisted.internet import reactor |
3051 | -from twisted.internet.defer import inlineCallbacks, Deferred, returnValue |
3052 | +from twisted.internet.defer import ( |
3053 | + inlineCallbacks, Deferred, DeferredList, returnValue) |
3054 | |
3055 | from txzookeeper import ZookeeperClient |
3056 | from txzookeeper.client import NotConnectedException, ConnectionException |
3057 | |
3058 | from txzookeeper.tests.common import ZookeeperCluster |
3059 | from txzookeeper.tests import ZookeeperTestCase |
3060 | +from txzookeeper import managed |
3061 | |
3062 | |
3063 | ZK_HOME = os.environ.get("ZOOKEEPER_PATH") |
3064 | -assert ZK_HOME, "ZOOKEEPER_PATH environment variable must be defined" |
3065 | +assert ZK_HOME, ( |
3066 | + "ZOOKEEPER_PATH environment variable must be defined.\n " |
3067 | + "For deb package installations this is /usr/share/java") |
3068 | |
3069 | CLUSTER = ZookeeperCluster(ZK_HOME) |
3070 | atexit.register(lambda cluster: cluster.terminate(), CLUSTER) |
3071 | @@ -31,18 +56,16 @@ |
3072 | zookeeper.deterministic_conn_order(True) |
3073 | zookeeper.set_debug_level(0) |
3074 | |
3075 | - def sleep(self, delay): |
3076 | - """Non-blocking sleep.""" |
3077 | - deferred = Deferred() |
3078 | - reactor.callLater(delay, deferred.callback, None) |
3079 | - return deferred |
3080 | - |
3081 | @property |
3082 | def cluster(self): |
3083 | return CLUSTER |
3084 | |
3085 | def tearDown(self): |
3086 | super(ClientSessionTests, self).tearDown() |
3087 | + if self.client: |
3088 | + self.client.close() |
3089 | + if self.client2: |
3090 | + self.client2.close() |
3091 | self.cluster.reset() |
3092 | |
3093 | @inlineCallbacks |
3094 | @@ -118,11 +141,18 @@ |
3095 | """A callback can be specified for connection errors. |
3096 | |
3097 | We can specify a callback for connection errors, that |
3098 | - can perform recovery for a disconnected client, restablishing |
3099 | + can perform recovery for a disconnected client. |
3100 | """ |
3101 | @inlineCallbacks |
3102 | def connection_error_handler(connection, error): |
3103 | + # Moved management of this connection attribute out of the |
3104 | + # default behavior for a connection exception, to support |
3105 | + # the retry facade. Under the hood libzk is going to be |
3106 | + # trying to transparently reconnect |
3107 | + connection.connected = False |
3108 | + |
3109 | # On loss of the connection, reconnect the client w/ same session. |
3110 | + connection.close() |
3111 | yield connection.connect( |
3112 | self.cluster[1].address, client_id=connection.client_id) |
3113 | returnValue(23) |
3114 | @@ -157,7 +187,7 @@ |
3115 | |
3116 | def session_event_callback(connection, e): |
3117 | session_events.append(e) |
3118 | - if len(session_events) == 4: |
3119 | + if len(session_events) == 8: |
3120 | events_received.callback(True) |
3121 | |
3122 | # Connect to a node in the cluster and establish a watch |
3123 | @@ -165,11 +195,18 @@ |
3124 | self.client.set_session_callback(session_event_callback) |
3125 | yield self.client.connect() |
3126 | |
3127 | - child_d, watch_d = self.client.get_children_and_watch("/") |
3128 | - yield child_d |
3129 | + # Setup some watches to verify they are cleaned out on expiration. |
3130 | + d, e_watch_d = self.client.exists_and_watch("/") |
3131 | + yield d |
3132 | + |
3133 | + d, g_watch_d = self.client.get_and_watch("/") |
3134 | + yield d |
3135 | + |
3136 | + d, c_watch_d = self.client.get_children_and_watch("/") |
3137 | + yield d |
3138 | |
3139 | # Connect a client to the same session on a different node. |
3140 | - self.client2 = ZookeeperClient(self.cluster[0].address) |
3141 | + self.client2 = ZookeeperClient(self.cluster[1].address) |
3142 | yield self.client2.connect(client_id=self.client.client_id) |
3143 | |
3144 | # Close the new client and wait for the event propagation |
3145 | @@ -177,8 +214,11 @@ |
3146 | |
3147 | # It can take some time for this to propagate |
3148 | yield events_received |
3149 | - self.assertEqual(len(session_events), 4) |
3150 | - self.assertEqual(session_events[-1].state_name, "expired") |
3151 | + self.assertEqual(len(session_events), 8) |
3152 | + |
3153 | + # The last four (conn + 3 watches) are all expired |
3154 | + for evt in session_events[4:]: |
3155 | + self.assertEqual(evt.state_name, "expired") |
3156 | |
3157 | # The connection is dead without reconnecting. |
3158 | yield self.assertFailure( |
3159 | @@ -186,9 +226,17 @@ |
3160 | NotConnectedException, ConnectionException) |
3161 | |
3162 | self.assertTrue(self.client.unrecoverable) |
3163 | + yield self.assertFailure(e_watch_d, zookeeper.SessionExpiredException) |
3164 | + yield self.assertFailure(g_watch_d, zookeeper.SessionExpiredException) |
3165 | + yield self.assertFailure(c_watch_d, zookeeper.SessionExpiredException) |
3166 | |
3167 | # If a reconnect attempt is made with a dead session id |
3168 | - #yield self.client.connect(client_id=self.client.client_id) |
3169 | + client_id = self.client.client_id |
3170 | + self.client.close() # Free the handle |
3171 | + yield self.client.connect(client_id=client_id) |
3172 | + yield self.assertFailure( |
3173 | + self.client.get_children("/"), |
3174 | + NotConnectedException, ConnectionException) |
3175 | |
3176 | test_client_session_expiration_event.timeout = 10 |
3177 | |
3178 | @@ -210,13 +258,13 @@ |
3179 | session_events.append(e) |
3180 | |
3181 | # Connect to a node in the cluster and establish a watch |
3182 | - self.client = ZookeeperClient(self.cluster[2].address) |
3183 | + self.client = ZookeeperClient(self.cluster[2].address, |
3184 | + session_timeout=5000) |
3185 | self.client.set_session_callback(session_event_callback) |
3186 | yield self.client.connect() |
3187 | |
3188 | yield self.client.create("/hello", flags=zookeeper.EPHEMERAL) |
3189 | - exists_d, watch_d = self.client.exists_and_watch("/hello") |
3190 | - yield exists_d |
3191 | + self.assertTrue((yield self.client.exists("/hello"))) |
3192 | |
3193 | # Shutdown the server the client is connected to |
3194 | self.cluster[2].stop() |
3195 | @@ -243,3 +291,46 @@ |
3196 | # Ephemeral is destroyed when the session closed. |
3197 | exists = yield self.client.exists("/hello") |
3198 | self.assertFalse(exists) |
3199 | + |
3200 | + @inlineCallbacks |
3201 | + def test_managed_client_backoff(self): |
3202 | + output = self.capture_log(level=logging.DEBUG) |
3203 | + self.patch(managed, 'BACKOFF_INCREMENT', 2) |
3204 | + self.client = yield managed.ManagedClient( |
3205 | + self.cluster[0].address, |
3206 | + connect_timeout=4).connect() |
3207 | + self.client2 = yield ZookeeperClient(self.cluster[1].address).connect() |
3208 | + |
3209 | + exists_d, watch_d = self.client.exists_and_watch("/hello") |
3210 | + yield exists_d |
3211 | + |
3212 | + yield self.client2.create("/hello", "world") |
3213 | + yield self.client2.close() |
3214 | + |
3215 | + self.cluster[0].stop() |
3216 | + yield self.sleep(1) |
3217 | + |
3218 | + # Try to do something with the connection while it's down. |
3219 | + ops = [] |
3220 | + ops.append(self.client.create('/abc', 'test')) |
3221 | + ops.append(self.client.get("/hello")) |
3222 | + ops.append(self.client.get_children("/")) |
3223 | + ops.append(self.client.set("/hello", "sad")) |
3224 | + |
3225 | + # Sleep and let the session expire, and ensure we're down long enough |
3226 | + # for backoff to trigger. |
3227 | + yield self.sleep(10) |
3228 | + |
3229 | + # Start the cluster and watch things work |
3230 | + self.cluster[0].run() |
3231 | + yield DeferredList( |
3232 | + ops, fireOnOneErrback=True, consumeErrors=True) |
3233 | + |
3234 | + yield watch_d |
3235 | + |
3236 | + # Verify we backed off at least once |
3237 | + self.assertIn("Backing off reconnect", output.getvalue()) |
3238 | + # Verify we only reconnected once |
3239 | + self.assertEqual(output.getvalue().count("Restablished connection"), 1) |
3240 | + |
3241 | + test_managed_client_backoff.timeout = 25 |
3242 | |
3243 | === modified file 'txzookeeper/tests/test_utils.py' |
3244 | --- txzookeeper/tests/test_utils.py 2011-06-30 17:14:06 +0000 |
3245 | +++ txzookeeper/tests/test_utils.py 2013-05-15 13:35:05 +0000 |
3246 | @@ -1,8 +1,11 @@ |
3247 | # |
3248 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
3249 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
3250 | # |
3251 | # This file is part of txzookeeper. |
3252 | # |
3253 | +# Authors: |
3254 | +# Kapil Thangavelu |
3255 | +# |
3256 | # txzookeeper is free software: you can redistribute it and/or modify |
3257 | # it under the terms of the GNU Lesser General Public License as published by |
3258 | # the Free Software Foundation, either version 3 of the License, or |
3259 | |
3260 | === modified file 'txzookeeper/tests/utils.py' |
3261 | --- txzookeeper/tests/utils.py 2011-06-30 17:14:06 +0000 |
3262 | +++ txzookeeper/tests/utils.py 2013-05-15 13:35:05 +0000 |
3263 | @@ -1,8 +1,11 @@ |
3264 | # |
3265 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
3266 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
3267 | # |
3268 | # This file is part of txzookeeper. |
3269 | # |
3270 | +# Authors: |
3271 | +# Kapil Thangavelu |
3272 | +# |
3273 | # txzookeeper is free software: you can redistribute it and/or modify |
3274 | # it under the terms of the GNU Lesser General Public License as published by |
3275 | # the Free Software Foundation, either version 3 of the License, or |
3276 | |
3277 | === modified file 'txzookeeper/todo.txt' |
3278 | --- txzookeeper/todo.txt 2010-07-12 07:42:22 +0000 |
3279 | +++ txzookeeper/todo.txt 2013-05-15 13:35:05 +0000 |
3280 | @@ -1,10 +1,2 @@ |
3281 | |
3282 | -bugs to file upstream |
3283 | - |
3284 | - - you can set acl on a non existant node. |
3285 | - - memory leak every api invocation. [really? need some measurements here] |
3286 | - |
3287 | - observed while trying xtest_get_children_with_watcher |
3288 | - |
3289 | - - segfault if close during completion. |
3290 | - - getting a watch notification when closing a connection, segfaults. |
3291 | +Documentation |
3292 | |
3293 | === modified file 'txzookeeper/utils.py' |
3294 | --- txzookeeper/utils.py 2011-06-17 23:52:34 +0000 |
3295 | +++ txzookeeper/utils.py 2013-05-15 13:35:05 +0000 |
3296 | @@ -1,8 +1,11 @@ |
3297 | # |
3298 | -# Copyright (C) 2010 Canonical Ltd. All Rights Reserved |
3299 | +# Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved |
3300 | # |
3301 | # This file is part of txzookeeper. |
3302 | # |
3303 | +# Authors: |
3304 | +# Kapil Thangavelu |
3305 | +# |
3306 | # txzookeeper is free software: you can redistribute it and/or modify |
3307 | # it under the terms of the GNU Lesser General Public License as published by |
3308 | # the Free Software Foundation, either version 3 of the License, or |
3309 | @@ -17,8 +20,9 @@ |
3310 | # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
3311 | # |
3312 | |
3313 | + |
3314 | import zookeeper |
3315 | -from twisted.internet.defer import inlineCallbacks |
3316 | +from twisted.internet.defer import inlineCallbacks, Deferred |
3317 | |
3318 | |
3319 | @inlineCallbacks |
3320 | @@ -65,3 +69,16 @@ |
3321 | zookeeper.NoNodeException, |
3322 | zookeeper.BadVersionException): |
3323 | pass |
3324 | + |
3325 | + |
3326 | +def sleep(delay): |
3327 | + """Non-blocking sleep. |
3328 | + |
3329 | + :param int delay: time in seconds to sleep. |
3330 | + :return: a Deferred that fires after the desired delay. |
3331 | + :rtype: :class:`twisted.internet.defer.Deferred` |
3332 | + """ |
3333 | + from twisted.internet import reactor |
3334 | + deferred = Deferred() |
3335 | + reactor.callLater(delay, deferred.callback, None) |
3336 | + return deferred |
Reviewers: mp+146938_code.launchpad.net,
Message:
Please take a look.
Description:
Session management and retry improvements.
Lots of improvements and several bug fixes.
Previously the managed client was returning the underlying session client instead of the retry client, so retry wasn't enabled for users trying to catch/store the client from connect.
The retry code was using seconds for session timeout instead of milliseconds.
The retry code wasn't properly taking the start of the retry into account when determining max retry.
The retry on persistent error wasn't signaling the managed client to establish a new session.
The client code wasn't properly cleaning up txzk handles on connection timeouts.
Session establishment now uses the client session events, so the session is reestablished even without activity or active watches.
Session reestablishment will now back off on reconnects for up to 6 minutes.
https://code.launchpad.net/~hazmat/txzookeeper/backoff-retry-managed-sanity/+merge/146938
(do not edit description out of merge proposal)
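The two retry timing fixes mentioned in the description (the session timeout is expressed in milliseconds, and the retry window has to be measured from when the retry started) can be illustrated with a minimal sketch. The function names, and the choice to bound the window by the full session timeout, are assumptions made for illustration; this is not the code in retry.py.

```python
import time


def retry_window_seconds(session_timeout_ms):
    """Convert a session timeout given in milliseconds to seconds.

    Hypothetical helper: treating the millisecond value as seconds would
    make the retry window 1000 times longer than intended.
    """
    return session_timeout_ms / 1000.0


def may_retry(retry_started_at, session_timeout_ms, now=None):
    """Return True while the retry window is still open.

    The window is measured from the moment the first attempt started
    (retry_started_at, in seconds since the epoch), not from the most
    recent attempt. Bounding it by the whole session timeout is an
    assumption of this sketch.
    """
    if now is None:
        now = time.time()
    elapsed = now - retry_started_at
    return elapsed < retry_window_seconds(session_timeout_ms)
```

With a 5000 ms session timeout, as used in the session tests, the window in this sketch stays open for 5 seconds after the first attempt.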
Please review this at https://codereview.appspot.com/7307051/
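The reconnect backoff mentioned in the description can be sketched as a capped delay schedule. The test in the diff only shows that managed.BACKOFF_INCREMENT exists and can be patched; the linear growth rule, the default values, and the function names below are assumptions, so read this as one plausible shape rather than what managed.py does.

```python
from itertools import islice


def backoff_delays(increment=10, max_backoff=360):
    """Yield reconnect delays in seconds, growing linearly up to a cap.

    Hypothetical sketch: each failed reconnect adds `increment` seconds
    to the delay until `max_backoff` (6 minutes by default) is reached,
    after which the delay stays constant.
    """
    delay = 0
    while True:
        delay = min(delay + increment, max_backoff)
        yield delay


def first_delays(count, increment=10, max_backoff=360):
    """Return the first `count` delays as a list, for inspection."""
    return list(islice(backoff_delays(increment, max_backoff), count))
```

For example, first_delays(5, increment=100) gives [100, 200, 300, 360, 360]: the schedule grows by the increment and then stays at the cap.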
Affected files: __init__.py client.py managed.py queue.py retry.py tests/__init__.py tests/common.py tests/proxy.py tests/test_client.py tests/test_conn_failure.py tests/test_lock.py tests/test_managed.py tests/test_node.py tests/test_queue.py tests/test_retry.py tests/test_security.py tests/test_session.py tests/test_utils.py tests/utils.py todo.txt utils.py
A .bzrignore
A [revision details]
M debian/changelog
M setup.py
M txzookeeper/
M txzookeeper/
M txzookeeper/lock.py
A txzookeeper/
M txzookeeper/node.py
M txzookeeper/
A txzookeeper/
M txzookeeper/
M txzookeeper/
A txzookeeper/
M txzookeeper/
A txzookeeper/
M txzookeeper/
A txzookeeper/
M txzookeeper/
M txzookeeper/
A txzookeeper/
M txzookeeper/
M txzookeeper/
M txzookeeper/
M txzookeeper/
M txzookeeper/
M txzookeeper/