Merge lp:~hazmat/txzookeeper/managed-watch-and-ephemeral into lp:txzookeeper
Status: | Merged |
---|---|
Approved by: | Jim Baker |
Approved revision: | 56 |
Merged at revision: | 47 |
Proposed branch: | lp:~hazmat/txzookeeper/managed-watch-and-ephemeral |
Merge into: | lp:txzookeeper |
Diff against target: | 1024 lines (+716/-53), 9 files modified:
.bzrignore (+4/-0)
setup.py (+2/-2)
txzookeeper/client.py (+63/-40)
txzookeeper/managed.py (+350/-0)
txzookeeper/retry.py (+10/-9)
txzookeeper/tests/__init__.py (+22/-0)
txzookeeper/tests/test_client.py (+1/-1)
txzookeeper/tests/test_managed.py (+263/-0)
txzookeeper/tests/test_retry.py (+1/-1) |
To merge this branch: | bzr merge lp:~hazmat/txzookeeper/managed-watch-and-ephemeral |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Juju Engineering | | | Pending |
Review via email: mp+100715@code.launchpad.net |
Commit message
Description of the change
Managed Connections for txzk
A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.
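The bookkeeping behind this behaviour can be sketched without a ZooKeeper server. What follows is a minimal, illustrative model and not the branch's actual code: `FakeServer`, `ManagedSketch`, `on_session_reestablished`, and the `EPHEMERAL = 1` constant are stand-ins invented for this example, and the real implementation in txzookeeper/managed.py works with Deferreds and the `zookeeper` bindings.

```python
EPHEMERAL = 1  # assumed stand-in for zookeeper.EPHEMERAL


class FakeServer(object):
    """Hypothetical in-memory server serving a single client session."""

    def __init__(self):
        self.nodes = {}

    def create(self, path, data, flags=0):
        self.nodes[path] = (data, flags)
        return path

    def delete(self, path):
        self.nodes.pop(path, None)

    def expire_session(self):
        # A real server drops every ephemeral node owned by the session.
        for path in [p for p, (_, f) in self.nodes.items() if f & EPHEMERAL]:
            del self.nodes[path]


class ManagedSketch(object):
    """Remembers ephemerals and watches so they can be replayed."""

    def __init__(self, server):
        self.server = server
        self.ephemerals = {}
        self.watches = []

    def create(self, path, data, flags=0):
        self.server.create(path, data, flags)
        if flags & EPHEMERAL:
            # Record enough to recreate the node in a new session.
            self.ephemerals[path] = dict(data=data, flags=flags)
        return path

    def delete(self, path):
        self.server.delete(path)
        self.ephemerals.pop(path, None)

    def watch(self, path, callback):
        self.watches.append((path, callback))

    def on_session_reestablished(self):
        # Recreate ephemerals at their previous paths.
        items, self.ephemerals = list(self.ephemerals.items()), {}
        for path, e in items:
            self.create(path, e["data"], e["flags"])
        # Fire a synthetic session event on every extant watcher.
        watches, self.watches = self.watches, []
        for path, callback in watches:
            callback("session", "connected", path)


server = FakeServer()
client = ManagedSketch(server)
client.create("/config", "x")
client.create("/workers/w1", "alive", EPHEMERAL)
events = []
client.watch("/config", lambda *args: events.append(args))

server.expire_session()
client.on_session_reestablished()
```

After the simulated expiry and recovery, `/workers/w1` exists again and the watcher on `/config` has received one synthetic session event, which is why watch callbacks that inspect the event need to tolerate it.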
Kapil Thangavelu (hazmat) wrote : | # |
Reviewers: mp+100715_
Message:
Please take a look.
Description:
Managed Connections for txzk
A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.
https:/
(do not edit description out of merge proposal)
Please review this at https:/
Affected files:
A .bzrignore
A [revision details]
M setup.py
M txzookeeper/
A txzookeeper/
M txzookeeper/
M txzookeeper/
M txzookeeper/
A txzookeeper/
M txzookeeper/
- 55. By Kapil Thangavelu
-
remove unesc. expire handler from retry client
William Reade (fwereade) wrote : | # |
This looks very nice indeed, but the vagueness of the "comes at a cost"
comment makes me wonder what I'm missing...
https:/
File txzookeeper/
https:/
txzookeeper/
Can't you just do:
with self._ctx():
?
https:/
txzookeeper/
Ditto
https:/
txzookeeper/
Please expand ;).
https:/
txzookeeper/
to restablish the session.
I'm pretty sure there are 2 es in re-establish, and that the hyphen may
have fallen out of common use but remains IMO more readable. As you
wish.
https:/
txzookeeper/
ephemerals, and trigger watches.
I like the word "restabslish", but ditto.
https:/
File txzookeeper/
https:/
txzookeeper/
(1/3 session time as ping)
s/propogate/
Kapil Thangavelu (hazmat) wrote : | # |
On 2012/04/04 13:23:29, fwereade wrote:
> This looks very nice indeed, but the vagueness of the "comes at a cost" comment
> makes me wonder what I'm missing...
expanded greatly what those costs are.
> https:/
> File txzookeeper/
https:/
> txzookeeper/
> Can't you just do:
> with self._ctx():
> ?
much nicer, thanks.
https:/
> txzookeeper/
> Ditto
done.
https:/
> txzookeeper/
> Please expand ;).
done.
https:/
> txzookeeper/
> expiration to
> restablish the session.
> I'm pretty sure there are 2 es in re-establish, and that the hyphen may have
> fallen out of common use but remains IMO more readable. As you wish.
proper speling established ;-)
https:/
> txzookeeper/
> ephemerals,
> and trigger watches.
> I like the word "restabslish", but ditto.
https:/
> File txzookeeper/
https:/
> txzookeeper/
> propogate (1/3
> session time as ping)
> s/propogate/
done.
Kapil Thangavelu (hazmat) wrote : | # |
Please take a look.
https:/
File txzookeeper/
https:/
txzookeeper/
On 2012/04/04 13:23:29, fwereade wrote:
> Can't you just do:
> with self._ctx():
> ?
thanks, much nicer.
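For readers without the diff at hand, the `with self._ctx():` form agreed on here relies on `contextlib.contextmanager` to guarantee cleanup even when the wrapped callback raises. A minimal sketch of the pattern, using hypothetical `Registry` and `OneShot` classes in place of the branch's `WatchManager` and `Watch`:

```python
import contextlib


class Registry(object):
    """Hypothetical holder of one-shot callbacks."""

    def __init__(self):
        self.items = []

    def add(self, callback):
        item = OneShot(self, callback)
        self.items.append(item)
        return item

    def remove(self, item):
        if item in self.items:
            self.items.remove(item)


class OneShot(object):
    """Callable that unregisters itself once it has been invoked."""

    def __init__(self, registry, callback):
        self._registry = registry
        self._callback = callback

    @contextlib.contextmanager
    def _ctx(self):
        try:
            yield
        finally:
            # Runs on normal return and on exception alike.
            self._registry.remove(self)

    def __call__(self, *args):
        with self._ctx():
            return self._callback(*args)


def boom(*args):
    raise ValueError("bad callback")


reg = Registry()
doubled = reg.add(lambda x: x * 2)
result = doubled(21)

failing = reg.add(boom)
try:
    failing()
except ValueError:
    raised = True
else:
    raised = False
```

The point of the design is that the try/finally lives in one place, so every caller gets the same unregister-on-exit behaviour without repeating it.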
- 56. By Kapil Thangavelu
-
address review comments
Jim Baker (jimbaker) wrote : | # |
+1, LGTM, just some minors
https:/
File txzookeeper/
https:/
txzookeeper/
Presumably this should be cb_re_establish... but I don't see the
difference here with the easier synonym: restore. Good either way with
me.
https:/
txzookeeper/
restablish.
it's... re-establish
(or are you saying something about being restful? ;) )
https:/
txzookeeper/
it's... re-establish
https:/
txzookeeper/
restablish.
It's... re-established... re-establish
https:/
txzookeeper/
I'm not going to mark any more of these!
https:/
txzookeeper/
(e, e))
Another minor: probably want some consistency in beginning with
uppercase. Not certain why you use log.error here, but log.exception
later for what appears to be a similar case.
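On the second point, the practical difference is that `log.exception` logs at ERROR level and also appends the active traceback, so it is only meaningful inside an `except` block, while `log.error` records just the formatted message. A small standard-library illustration (the logger name `demo` and the helper `capture` are made up for this example and are not part of the branch):

```python
import io
import logging


def capture(use_exception):
    """Log one caught error and return the text the handler produced."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    log = logging.getLogger("demo")
    log.addHandler(handler)
    log.propagate = False
    try:
        try:
            raise ValueError("boom")
        except ValueError as e:
            if use_exception:
                # Message plus the traceback of the exception being handled.
                log.exception("Error while connecting %r", e)
            else:
                # Message only.
                log.error("Error while connecting %r", e)
    finally:
        log.removeHandler(handler)
    return stream.getvalue()


plain = capture(False)
full = capture(True)
```

Here `full` contains a `Traceback` section and `plain` does not, which is the usual reason to pick one call over the other.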
https:/
File txzookeeper/
https:/
txzookeeper/
client event, and clears watches.
no need for comma
Preview Diff
1 | === added file '.bzrignore' | |||
2 | --- .bzrignore 1970-01-01 00:00:00 +0000 | |||
3 | +++ .bzrignore 2012-04-04 18:12:22 +0000 | |||
4 | @@ -0,0 +1,4 @@ | |||
5 | 1 | .emacs.desktop | ||
6 | 2 | _trial_temp | ||
7 | 3 | txzookeeper | ||
8 | 4 | txzookeeper.egg-info | ||
9 | 0 | 5 | ||
10 | === modified file 'setup.py' | |||
11 | --- setup.py 2011-12-12 13:19:28 +0000 | |||
12 | +++ setup.py 2012-04-04 18:12:22 +0000 | |||
13 | @@ -45,8 +45,8 @@ | |||
14 | 45 | name="txzookeeper", | 45 | name="txzookeeper", |
15 | 46 | version=version, | 46 | version=version, |
16 | 47 | description="Twisted api for Apache Zookeeper", | 47 | description="Twisted api for Apache Zookeeper", |
19 | 48 | author="Ensemble Developers", | 48 | author="Juju Developers", |
20 | 49 | author_email="ensemble@lists.ubuntu.com", | 49 | author_email="juju@lists.ubuntu.com", |
21 | 50 | url="https://launchpad.net/txzookeeper", | 50 | url="https://launchpad.net/txzookeeper", |
22 | 51 | license="LGPL", | 51 | license="LGPL", |
23 | 52 | packages=find_packages(), | 52 | packages=find_packages(), |
24 | 53 | 53 | ||
25 | === modified file 'txzookeeper/client.py' | |||
26 | --- txzookeeper/client.py 2011-12-06 21:21:29 +0000 | |||
27 | +++ txzookeeper/client.py 2012-04-04 18:12:22 +0000 | |||
28 | @@ -96,15 +96,20 @@ | |||
29 | 96 | 96 | ||
30 | 97 | @property | 97 | @property |
31 | 98 | def state_name(self): | 98 | def state_name(self): |
33 | 99 | return STATE_NAME_MAPPING[self.args[2]] | 99 | return STATE_NAME_MAPPING.get(self.args[2], "unknown") |
34 | 100 | 100 | ||
35 | 101 | @property | 101 | @property |
36 | 102 | def type_name(self): | 102 | def type_name(self): |
38 | 103 | return TYPE_NAME_MAPPING[self.args[1]] | 103 | return TYPE_NAME_MAPPING.get(self.args[1], "unknown") |
39 | 104 | |||
40 | 105 | @property | ||
41 | 106 | def handle(self): | ||
42 | 107 | return self.args[3] | ||
43 | 104 | 108 | ||
44 | 105 | def __str__(self): | 109 | def __str__(self): |
47 | 106 | return "<txzookeeper.ConnectionException type: %s state: %s>" % ( | 110 | return ( |
48 | 107 | self.type_name, self.state_name) | 111 | "<txzookeeper.ConnectionException handle: %s type: %s state: %s>" |
49 | 112 | % (self.handle, self.type_name, self.state_name)) | ||
50 | 108 | 113 | ||
51 | 109 | 114 | ||
52 | 110 | def is_connection_exception(e): | 115 | def is_connection_exception(e): |
53 | @@ -133,11 +138,11 @@ | |||
54 | 133 | 138 | ||
55 | 134 | @property | 139 | @property |
56 | 135 | def type_name(self): | 140 | def type_name(self): |
58 | 136 | return TYPE_NAME_MAPPING[self.type] | 141 | return TYPE_NAME_MAPPING.get(self.type, "unknown") |
59 | 137 | 142 | ||
60 | 138 | @property | 143 | @property |
61 | 139 | def state_name(self): | 144 | def state_name(self): |
63 | 140 | return STATE_NAME_MAPPING[self.connection_state] | 145 | return STATE_NAME_MAPPING.get(self.connection_state, "unknown") |
64 | 141 | 146 | ||
65 | 142 | def __repr__(self): | 147 | def __repr__(self): |
66 | 143 | return "<ClientEvent %s at %r state: %s>" % ( | 148 | return "<ClientEvent %s at %r state: %s>" % ( |
67 | @@ -169,6 +174,10 @@ | |||
68 | 169 | self.connected = False | 174 | self.connected = False |
69 | 170 | self.handle = None | 175 | self.handle = None |
70 | 171 | 176 | ||
71 | 177 | def __repr__(self): | ||
72 | 178 | return "<txZookeeperClient client: %s handle: %r state: %s>" % ( | ||
73 | 179 | self.client_id[0], self.handle, self.state) | ||
74 | 180 | |||
75 | 172 | def _check_connected(self, d): | 181 | def _check_connected(self, d): |
76 | 173 | if not self.connected: | 182 | if not self.connected: |
77 | 174 | d.errback(NotConnectedException("not connected")) | 183 | d.errback(NotConnectedException("not connected")) |
78 | @@ -218,7 +227,7 @@ | |||
79 | 218 | d.callback((value, stat)) | 227 | d.callback((value, stat)) |
80 | 219 | 228 | ||
81 | 220 | callback = self._zk_thread_callback(_cb_get) | 229 | callback = self._zk_thread_callback(_cb_get) |
83 | 221 | watcher = self._wrap_watcher(watcher) | 230 | watcher = self._wrap_watcher(watcher, "get", path) |
84 | 222 | result = zookeeper.aget(self.handle, path, watcher, callback) | 231 | result = zookeeper.aget(self.handle, path, watcher, callback) |
85 | 223 | self._check_result(result, d) | 232 | self._check_result(result, d) |
86 | 224 | return d | 233 | return d |
87 | @@ -234,7 +243,7 @@ | |||
88 | 234 | d.callback(children) | 243 | d.callback(children) |
89 | 235 | 244 | ||
90 | 236 | callback = self._zk_thread_callback(_cb_get_children) | 245 | callback = self._zk_thread_callback(_cb_get_children) |
92 | 237 | watcher = self._wrap_watcher(watcher) | 246 | watcher = self._wrap_watcher(watcher, "child", path) |
93 | 238 | result = zookeeper.aget_children(self.handle, path, watcher, callback) | 247 | result = zookeeper.aget_children(self.handle, path, watcher, callback) |
94 | 239 | self._check_result(result, d) | 248 | self._check_result(result, d) |
95 | 240 | return d | 249 | return d |
96 | @@ -251,12 +260,12 @@ | |||
97 | 251 | d.callback(stat) | 260 | d.callback(stat) |
98 | 252 | 261 | ||
99 | 253 | callback = self._zk_thread_callback(_cb_exists) | 262 | callback = self._zk_thread_callback(_cb_exists) |
101 | 254 | watcher = self._wrap_watcher(watcher) | 263 | watcher = self._wrap_watcher(watcher, "exists", path) |
102 | 255 | result = zookeeper.aexists(self.handle, path, watcher, callback) | 264 | result = zookeeper.aexists(self.handle, path, watcher, callback) |
103 | 256 | self._check_result(result, d) | 265 | self._check_result(result, d) |
104 | 257 | return d | 266 | return d |
105 | 258 | 267 | ||
107 | 259 | def _wrap_watcher(self, watcher): | 268 | def _wrap_watcher(self, watcher, watch_type, path): |
108 | 260 | if watcher is None: | 269 | if watcher is None: |
109 | 261 | return watcher | 270 | return watcher |
110 | 262 | if not callable(watcher): | 271 | if not callable(watcher): |
111 | @@ -285,14 +294,19 @@ | |||
112 | 285 | else: | 294 | else: |
113 | 286 | return watcher(event_type, conn_state, path) | 295 | return watcher(event_type, conn_state, path) |
114 | 287 | 296 | ||
116 | 288 | def _zk_thread_callback(self, func): | 297 | def _zk_thread_callback(self, func, *f_args, **f_kw): |
117 | 289 | """ | 298 | """ |
118 | 290 | The client library invokes callbacks in a separate thread, we wrap | 299 | The client library invokes callbacks in a separate thread, we wrap |
119 | 291 | any user defined callback so that they are called back in the main | 300 | any user defined callback so that they are called back in the main |
120 | 292 | thread after, zookeeper calls the wrapper. | 301 | thread after, zookeeper calls the wrapper. |
121 | 293 | """ | 302 | """ |
122 | 303 | f_args = list(f_args) | ||
123 | 304 | |||
124 | 294 | def wrapper(handle, *args): # pragma: no cover | 305 | def wrapper(handle, *args): # pragma: no cover |
126 | 295 | reactor.callFromThread(func, *args) | 306 | # make a copy the conn watch callback gets invoked multiple times |
127 | 307 | cb_args = list(f_args) | ||
128 | 308 | cb_args.extend(args) | ||
129 | 309 | reactor.callFromThread(func, *cb_args, **f_kw) | ||
130 | 296 | return wrapper | 310 | return wrapper |
131 | 297 | 311 | ||
132 | 298 | @property | 312 | @property |
133 | @@ -391,6 +405,7 @@ | |||
134 | 391 | d = defer.Deferred() | 405 | d = defer.Deferred() |
135 | 392 | if self._check_result(result, d): | 406 | if self._check_result(result, d): |
136 | 393 | return d | 407 | return d |
137 | 408 | self.handle = None | ||
138 | 394 | d.callback(True) | 409 | d.callback(True) |
139 | 395 | return d | 410 | return d |
140 | 396 | 411 | ||
141 | @@ -437,7 +452,6 @@ | |||
142 | 437 | else: | 452 | else: |
143 | 438 | self.handle = zookeeper.init( | 453 | self.handle = zookeeper.init( |
144 | 439 | self._servers, callback, self._session_timeout) | 454 | self._servers, callback, self._session_timeout) |
145 | 440 | |||
146 | 441 | return d | 455 | return d |
147 | 442 | 456 | ||
148 | 443 | def _cb_connected( | 457 | def _cb_connected( |
149 | @@ -460,6 +474,7 @@ | |||
150 | 460 | # If we timed out and then connected, then close the conn. | 474 | # If we timed out and then connected, then close the conn. |
151 | 461 | if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called: | 475 | if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called: |
152 | 462 | self.close() | 476 | self.close() |
153 | 477 | return | ||
154 | 463 | 478 | ||
155 | 464 | # Send session events to the callback, in addition to any | 479 | # Send session events to the callback, in addition to any |
156 | 465 | # duplicate session events that will be sent for extant watches. | 480 | # duplicate session events that will be sent for extant watches. |
157 | @@ -468,7 +483,10 @@ | |||
158 | 468 | self, ClientEvent(type, state, path)) | 483 | self, ClientEvent(type, state, path)) |
159 | 469 | 484 | ||
160 | 470 | return | 485 | return |
162 | 471 | elif state == zookeeper.CONNECTED_STATE: | 486 | # Connected successfully, or If we're expired on an initial |
163 | 487 | # connect, someone else expired us. | ||
164 | 488 | elif state in (zookeeper.CONNECTED_STATE, | ||
165 | 489 | zookeeper.EXPIRED_SESSION_STATE): | ||
166 | 472 | connect_deferred.callback(self) | 490 | connect_deferred.callback(self) |
167 | 473 | return | 491 | return |
168 | 474 | 492 | ||
169 | @@ -488,17 +506,18 @@ | |||
170 | 488 | if self._check_connected(d): | 506 | if self._check_connected(d): |
171 | 489 | return d | 507 | return d |
172 | 490 | 508 | ||
179 | 491 | def _cb_created(result_code, path): | 509 | callback = self._zk_thread_callback( |
180 | 492 | if self._check_result(result_code, d): | 510 | self._cb_created, d, data, acls, flags) |
175 | 493 | return | ||
176 | 494 | d.callback(path) | ||
177 | 495 | |||
178 | 496 | callback = self._zk_thread_callback(_cb_created) | ||
181 | 497 | result = zookeeper.acreate( | 511 | result = zookeeper.acreate( |
182 | 498 | self.handle, path, data, acls, flags, callback) | 512 | self.handle, path, data, acls, flags, callback) |
183 | 499 | self._check_result(result, d) | 513 | self._check_result(result, d) |
184 | 500 | return d | 514 | return d |
185 | 501 | 515 | ||
186 | 516 | def _cb_created(self, d, data, acls, flags, result_code, path): | ||
187 | 517 | if self._check_result(result_code, d): | ||
188 | 518 | return | ||
189 | 519 | d.callback(path) | ||
190 | 520 | |||
191 | 502 | def delete(self, path, version=-1): | 521 | def delete(self, path, version=-1): |
192 | 503 | """ | 522 | """ |
193 | 504 | Delete the node at the given path. If the current node version on the | 523 | Delete the node at the given path. If the current node version on the |
194 | @@ -513,16 +532,16 @@ | |||
195 | 513 | if self._check_connected(d): | 532 | if self._check_connected(d): |
196 | 514 | return d | 533 | return d |
197 | 515 | 534 | ||
204 | 516 | def _cb_delete(result_code): | 535 | callback = self._zk_thread_callback(self._cb_deleted, d, path) |
199 | 517 | if self._check_result(result_code, d): | ||
200 | 518 | return | ||
201 | 519 | d.callback(result_code) | ||
202 | 520 | |||
203 | 521 | callback = self._zk_thread_callback(_cb_delete) | ||
205 | 522 | result = zookeeper.adelete(self.handle, path, version, callback) | 536 | result = zookeeper.adelete(self.handle, path, version, callback) |
206 | 523 | self._check_result(result, d) | 537 | self._check_result(result, d) |
207 | 524 | return d | 538 | return d |
208 | 525 | 539 | ||
209 | 540 | def _cb_deleted(self, d, path, result_code): | ||
210 | 541 | if self._check_result(result_code, d): | ||
211 | 542 | return | ||
212 | 543 | d.callback(result_code) | ||
213 | 544 | |||
214 | 526 | def exists(self, path): | 545 | def exists(self, path): |
215 | 527 | """ | 546 | """ |
216 | 528 | Check that the given node path exists. Returns a deferred that | 547 | Check that the given node path exists. Returns a deferred that |
217 | @@ -657,17 +676,17 @@ | |||
218 | 657 | if self._check_connected(d): | 676 | if self._check_connected(d): |
219 | 658 | return d | 677 | return d |
220 | 659 | 678 | ||
227 | 660 | def _cb_set_acl(result_code): | 679 | callback = self._zk_thread_callback(self._cb_set_acl, d, path, acls) |
222 | 661 | if self._check_result(result_code, d): | ||
223 | 662 | return | ||
224 | 663 | d.callback(result_code) | ||
225 | 664 | |||
226 | 665 | callback = self._zk_thread_callback(_cb_set_acl) | ||
228 | 666 | result = zookeeper.aset_acl( | 680 | result = zookeeper.aset_acl( |
229 | 667 | self.handle, path, version, acls, callback) | 681 | self.handle, path, version, acls, callback) |
230 | 668 | self._check_result(result, d) | 682 | self._check_result(result, d) |
231 | 669 | return d | 683 | return d |
232 | 670 | 684 | ||
233 | 685 | def _cb_set_acl(self, d, path, acls, result_code): | ||
234 | 686 | if self._check_result(result_code, d): | ||
235 | 687 | return | ||
236 | 688 | d.callback(result_code) | ||
237 | 689 | |||
238 | 671 | def set(self, path, data="", version=-1): | 690 | def set(self, path, data="", version=-1): |
239 | 672 | """ | 691 | """ |
240 | 673 | Sets the data of a node at the given path. If the current node version | 692 | Sets the data of a node at the given path. If the current node version |
241 | @@ -683,16 +702,16 @@ | |||
242 | 683 | if self._check_connected(d): | 702 | if self._check_connected(d): |
243 | 684 | return d | 703 | return d |
244 | 685 | 704 | ||
251 | 686 | def _cb_set(result_code, node_stat): | 705 | callback = self._zk_thread_callback(self._cb_set, d, path, data) |
246 | 687 | if self._check_result(result_code, d): | ||
247 | 688 | return | ||
248 | 689 | d.callback(node_stat) | ||
249 | 690 | |||
250 | 691 | callback = self._zk_thread_callback(_cb_set) | ||
252 | 692 | result = zookeeper.aset(self.handle, path, data, version, callback) | 706 | result = zookeeper.aset(self.handle, path, data, version, callback) |
253 | 693 | self._check_result(result, d) | 707 | self._check_result(result, d) |
254 | 694 | return d | 708 | return d |
255 | 695 | 709 | ||
256 | 710 | def _cb_set(self, d, path, data, result_code, node_stat): | ||
257 | 711 | if self._check_result(result_code, d): | ||
258 | 712 | return | ||
259 | 713 | d.callback(node_stat) | ||
260 | 714 | |||
261 | 696 | def set_connection_watcher(self, watcher): | 715 | def set_connection_watcher(self, watcher): |
262 | 697 | """ | 716 | """ |
263 | 698 | Sets a permanent global watcher on the connection. This will get | 717 | Sets a permanent global watcher on the connection. This will get |
264 | @@ -702,7 +721,7 @@ | |||
265 | 702 | """ | 721 | """ |
266 | 703 | if not callable(watcher): | 722 | if not callable(watcher): |
267 | 704 | raise SyntaxError("Invalid Watcher %r" % (watcher)) | 723 | raise SyntaxError("Invalid Watcher %r" % (watcher)) |
269 | 705 | watcher = self._wrap_watcher(watcher) | 724 | watcher = self._wrap_watcher(watcher, None, None) |
270 | 706 | zookeeper.set_watcher(self.handle, watcher) | 725 | zookeeper.set_watcher(self.handle, watcher) |
271 | 707 | 726 | ||
272 | 708 | def set_session_callback(self, callback): | 727 | def set_session_callback(self, callback): |
273 | @@ -740,9 +759,13 @@ | |||
274 | 740 | """ | 759 | """ |
275 | 741 | if not callable(callback): | 760 | if not callable(callback): |
276 | 742 | raise TypeError("Invalid callback %r" % callback) | 761 | raise TypeError("Invalid callback %r" % callback) |
277 | 762 | if self._connection_error_callback is not None: | ||
278 | 763 | raise RuntimeError(( | ||
279 | 764 | "Connection error handlers can't be changed %s" % | ||
280 | 765 | self._connection_error_callback)) | ||
281 | 743 | self._connection_error_callback = callback | 766 | self._connection_error_callback = callback |
282 | 744 | 767 | ||
284 | 745 | def set_determinstic_order(self, boolean): | 768 | def set_deterministic_order(self, boolean): |
285 | 746 | """ | 769 | """ |
286 | 747 | The zookeeper client will by default randomize the server hosts | 770 | The zookeeper client will by default randomize the server hosts |
287 | 748 | it will connect to unless this is set to True. | 771 | it will connect to unless this is set to True. |
288 | 749 | 772 | ||
289 | === added file 'txzookeeper/managed.py' | |||
290 | --- txzookeeper/managed.py 1970-01-01 00:00:00 +0000 | |||
291 | +++ txzookeeper/managed.py 2012-04-04 18:12:22 +0000 | |||
292 | @@ -0,0 +1,350 @@ | |||
293 | 1 | from functools import partial | ||
294 | 2 | |||
295 | 3 | import contextlib | ||
296 | 4 | import logging | ||
297 | 5 | import zookeeper | ||
298 | 6 | |||
299 | 7 | from twisted.internet.defer import ( | ||
300 | 8 | inlineCallbacks, DeferredLock, fail, returnValue) | ||
301 | 9 | |||
302 | 10 | from client import ZookeeperClient, ClientEvent, NotConnectedException | ||
303 | 11 | from retry import RetryClient | ||
304 | 12 | |||
305 | 13 | |||
306 | 14 | class StopWatcher(Exception): | ||
307 | 15 | pass | ||
308 | 16 | |||
309 | 17 | |||
310 | 18 | WATCH_KIND_MAP = { | ||
311 | 19 | "child": "get_children_and_watch", | ||
312 | 20 | "exists": "exists_and_watch", | ||
313 | 21 | "get": "get_and_watch" | ||
314 | 22 | } | ||
315 | 23 | |||
316 | 24 | |||
317 | 25 | log = logging.getLogger("txzk.managed") | ||
318 | 26 | |||
319 | 27 | |||
320 | 28 | class Watch(object): | ||
321 | 29 | """ | ||
322 | 30 | For application driven persistent watches, where the application | ||
323 | 31 | is manually resetting the watch. | ||
324 | 32 | """ | ||
325 | 33 | |||
326 | 34 | __slots__ = ("_mgr", "_client", "_path", "_kind", "_callback") | ||
327 | 35 | |||
328 | 36 | def __init__(self, mgr, path, kind, callback): | ||
329 | 37 | self._mgr = mgr | ||
330 | 38 | self._path = path | ||
331 | 39 | self._kind = kind | ||
332 | 40 | self._callback = callback | ||
333 | 41 | |||
334 | 42 | @property | ||
335 | 43 | def path(self): | ||
336 | 44 | return self._path | ||
337 | 45 | |||
338 | 46 | @property | ||
339 | 47 | def kind(self): | ||
340 | 48 | return self._kind | ||
341 | 49 | |||
342 | 50 | @contextlib.contextmanager | ||
343 | 51 | def _ctx(self): | ||
344 | 52 | mgr = self._mgr | ||
345 | 53 | del self._mgr | ||
346 | 54 | try: | ||
347 | 55 | yield mgr | ||
348 | 56 | finally: | ||
349 | 57 | mgr.remove(self) | ||
350 | 58 | |||
351 | 59 | @inlineCallbacks | ||
352 | 60 | def reset(self): | ||
353 | 61 | with self._ctx(): | ||
354 | 62 | yield self._callback( | ||
355 | 63 | zookeeper.SESSION_EVENT, | ||
356 | 64 | zookeeper.CONNECTED_STATE, | ||
357 | 65 | self._path) | ||
358 | 66 | |||
359 | 67 | def __call__(self, *args, **kw): | ||
360 | 68 | with self._ctx(): | ||
361 | 69 | return self._callback(*args, **kw) | ||
362 | 70 | |||
363 | 71 | def __str__(self): | ||
364 | 72 | return "<Watcher %s %s %r>" % (self.kind, self.path, self._callback) | ||
365 | 73 | |||
366 | 74 | |||
367 | 75 | class WatchManager(object): | ||
368 | 76 | |||
369 | 77 | watch_class = Watch | ||
370 | 78 | |||
371 | 79 | def __init__(self): | ||
372 | 80 | self._watches = [] | ||
373 | 81 | |||
374 | 82 | def add(self, path, watch_type, watcher): | ||
375 | 83 | w = self.watch_class(self, path, watch_type, watcher) | ||
376 | 84 | self._watches.append(w) | ||
377 | 85 | return w | ||
378 | 86 | |||
379 | 87 | def remove(self, w): | ||
380 | 88 | try: | ||
381 | 89 | self._watches.remove(w) | ||
382 | 90 | except ValueError: | ||
383 | 91 | pass | ||
384 | 92 | |||
385 | 93 | def iterkeys(self): | ||
386 | 94 | for w in self._watches: | ||
387 | 95 | yield (w.path, w.kind) | ||
388 | 96 | |||
389 | 97 | def clear(self): | ||
390 | 98 | del self._watches | ||
391 | 99 | self._watches = [] | ||
392 | 100 | |||
393 | 101 | @inlineCallbacks | ||
394 | 102 | def reset(self, *ignored): | ||
395 | 103 | watches = self._watches | ||
396 | 104 | self._watches = [] | ||
397 | 105 | |||
398 | 106 | for w in watches: | ||
399 | 107 | try: | ||
400 | 108 | yield w.reset() | ||
401 | 109 | except Exception, e: | ||
402 | 110 | log.error("Error reseting watch %s with session event. %s %r", | ||
403 | 111 | w, e, e) | ||
404 | 112 | continue | ||
405 | 113 | |||
406 | 114 | |||
407 | 115 | class SessionClient(ZookeeperClient): | ||
408 | 116 | """A managed client that automatically re-establishes ephemerals and | ||
409 | 117 | triggers watches after reconnecting post session expiration. | ||
410 | 118 | |||
411 | 119 | This abstracts the client from session expiration handling. It does | ||
412 | 120 | come at a cost though. | ||
413 | 121 | |||
414 | 122 | There are two application constraints that need to be considered for usage | ||
415 | 123 | of the SessionClient or ManagedClient. The first is that watch callbacks | ||
416 | 124 | which examine the event, must be able to handle the synthetic session | ||
417 | 125 | event which is sent to them when the session is re-established. | ||
418 | 126 | |||
419 | 127 | The second and more problematic is that algorithms/patterns | ||
420 | 128 | utilizing ephemeral sequence nodes need to be rethought, as the | ||
421 | 129 | session client will recreate the nodes when reconnecting at their | ||
422 | 130 | previous paths. Some algorithms (like the std zk lock recipe) rely | ||
423 | 131 | on properties like the smallest valued ephemeral sequence node in | ||
424 | 132 | a container to identify the lock holder, with the notion that upon | ||
425 | 133 | session expiration a new lock/leader will be sought. Sequence | ||
426 | 134 | ephemeral node recreation in this context is problematic as the | ||
427 | 135 | node is recreated at the exact previous path. Alternative lock | ||
428 | 136 | strategies that do work are fairly simple at low volume, such as | ||
429 | 137 | owning a particular node path (ie. /locks/lock-holder) with an | ||
430 | 138 | ephemeral. | ||
431 | 139 | """ | ||
432 | 140 | |||
433 | 141 | def __init__( | ||
434 | 142 | self, servers=None, session_timeout=None, connect_timeout=4000): | ||
435 | 143 | """ | ||
436 | 144 | """ | ||
437 | 145 | super(SessionClient, self).__init__(servers, session_timeout) | ||
438 | 146 | self._connect_timeout = connect_timeout | ||
439 | 147 | self._watches = WatchManager() | ||
440 | 148 | self._ephemerals = {} | ||
441 | 149 | self._reconnect_lock = DeferredLock() | ||
442 | 150 | self.set_connection_error_callback(self._cb_connection_error) | ||
443 | 151 | |||
444 | 152 | @inlineCallbacks | ||
445 | 153 | def cb_restablish_session(self, e=None): | ||
446 | 154 | """Called on intercept of session expiration to create new session. | ||
447 | 155 | |||
448 | 156 | This will reconnect to zk, re-establish ephemerals, and | ||
449 | 157 | trigger watches. | ||
450 | 158 | """ | ||
451 | 159 | yield self._reconnect_lock.acquire() | ||
452 | 160 | |||
453 | 161 | try: | ||
454 | 162 | # If its been explicitly closed, don't restablish. | ||
455 | 163 | if self.handle is None: | ||
456 | 164 | return | ||
457 | 165 | |||
458 | 166 | # If its a stale handle, don't restablish | ||
459 | 167 | try: | ||
460 | 168 | zookeeper.is_unrecoverable(self.handle) | ||
461 | 169 | except zookeeper.ZooKeeperException: | ||
462 | 170 | if e: | ||
463 | 171 | raise e | ||
464 | 172 | return | ||
465 | 173 | |||
466 | 174 | # Its already been restablished, don't restablish. | ||
467 | 175 | if not self.unrecoverable: | ||
468 | 176 | return | ||
469 | 177 | elif self.connected: | ||
470 | 178 | self.close() | ||
471 | 179 | self.handle = 0 | ||
472 | 180 | elif isinstance(self.handle, int): | ||
473 | 181 | self.handle = 0 | ||
474 | 182 | |||
475 | 183 | # Restablish | ||
476 | 184 | assert self.handle == 0 | ||
477 | 185 | yield self._cb_restablish_session().addErrback( | ||
478 | 186 | self._cb_restablish_errback, e) | ||
479 | 187 | |||
480 | 188 | except Exception, e: | ||
481 | 189 | log.error("error while restablish %r %s" % (e, e)) | ||
482 | 190 | finally: | ||
483 | 191 | yield self._reconnect_lock.release() | ||
484 | 192 | |||
485 | 193 | @inlineCallbacks | ||
486 | 194 | def _cb_restablish_session(self): | ||
487 | 195 | """Re-establish a new session, and recreate ephemerals and watches. | ||
488 | 196 | """ | ||
489 | 197 | while 1: | ||
490 | 198 | # Reconnect | ||
491 | 199 | if self.handle is None: | ||
492 | 200 | returnValue(self.handle) | ||
493 | 201 | try: | ||
494 | 202 | yield self.connect(timeout=self._connect_timeout) | ||
495 | 203 | except zookeeper.ZooKeeperException, e: | ||
496 | 204 | log.exception("Error while connecting %r %s" % (e, e)) | ||
497 | 205 | continue | ||
498 | 206 | else: | ||
499 | 207 | break | ||
500 | 208 | |||
501 | 209 | items = self._ephemerals.items() | ||
502 | 210 | self._ephemerals = {} | ||
503 | 211 | |||
504 | 212 | for path, e in items: | ||
505 | 213 | try: | ||
506 | 214 | yield self.create( | ||
507 | 215 | path, e['data'], acls=e['acls'], flags=e['flags']) | ||
508 | 216 | except zookeeper.NodeExistsException: | ||
509 | 217 | log.error("Attempt to create ephemeral node failed %r", path) | ||
510 | 218 | yield self._watches.reset() | ||
511 | 219 | |||
512 | 220 | def _cb_restablish_errback(self, err, failure): | ||
513 | 221 | """If there's an error re-establishing the session log it. | ||
514 | 222 | """ | ||
515 | 223 | log.error("Error while trying to restablish connection %s\n%s" % ( | ||
516 | 224 | failure.value, failure.getTraceback())) | ||
517 | 225 | return failure | ||
518 | 226 | |||
519 | 227 | @inlineCallbacks | ||
520 | 228 | def _cb_connection_error(self, client, error): | ||
521 | 229 | """Convert session expiration to a transient connection error. | ||
522 | 230 | |||
523 | 231 | Dispatches from api usage error. | ||
524 | 232 | """ | ||
525 | 233 | if not isinstance(error, ( | ||
526 | 234 | zookeeper.SessionExpiredException, | ||
527 | 235 | NotConnectedException, | ||
528 | 236 | zookeeper.ClosingException)): | ||
529 | 237 | raise error | ||
530 | 238 | yield self._cb_restablish_session() | ||
531 | 239 | raise zookeeper.ConnectionLossException | ||
532 | 240 | |||
533 | 241 | @inlineCallbacks | ||
534 | 242 | def _cb_connection_event(self, evt): | ||
535 | 243 | """ | ||
536 | 244 | """ | ||
537 | 245 | |||
538 | 246 | # client connected tracker | ||
539 | 247 | def _check_connected(self, d): | ||
540 | 248 | """Clients are automatically reconnected.""" | ||
541 | 249 | if self.connected: | ||
542 | 250 | return | ||
543 | 251 | |||
544 | 252 | if self.handle is None: | ||
545 | 253 | d.errback(NotConnectedException("not connected")) | ||
546 | 254 | return d | ||
547 | 255 | |||
548 | 256 | c_d = self.cb_restablish_session() | ||
549 | 257 | |||
550 | 258 | def after_connected(client): | ||
551 | 259 | """Return a transient connection failure. | ||
552 | 260 | |||
553 | 261 | The retry client will automatically attempt to retry the operation. | ||
554 | 262 | """ | ||
555 | 263 | return fail(zookeeper.ConnectionLossException("Retry")) | ||
556 | 264 | |||
557 | 265 | c_d.addCallback(after_connected) | ||
558 | 266 | c_d.chainDeferred(d) | ||
559 | 267 | return d | ||
560 | 268 | |||
561 | 269 | # Dispatch from node watches on session expiration | ||
562 | 270 | def _watch_session_wrapper(self, watcher, event_type, conn_state, path): | ||
563 | 271 | """Watch wrapper that diverts session events to a connection callback. | ||
564 | 272 | """ | ||
565 | 273 | if (event_type == zookeeper.SESSION_EVENT and | ||
566 | 274 | conn_state == zookeeper.EXPIRED_SESSION_STATE): | ||
567 | 275 | d = self.cb_restablish_session() | ||
568 | 276 | d.addErrback(self._cb_restablish_errback) | ||
569 | 277 | return d | ||
570 | 278 | if event_type == zookeeper.SESSION_EVENT: | ||
571 | 279 | if self._session_event_callback: | ||
572 | 280 | self._session_event_callback( | ||
573 | 281 | self, ClientEvent(event_type, conn_state, path)) | ||
574 | 282 | else: | ||
575 | 283 | return watcher(event_type, conn_state, path) | ||
576 | 284 | |||
577 | 285 | # Track all watches | ||
578 | 286 | def _wrap_watcher(self, watcher, watch_type, path): | ||
579 | 287 | if watcher is None: | ||
580 | 288 | return watcher | ||
581 | 289 | if not callable(watcher): | ||
582 | 290 | raise SyntaxError("invalid watcher") | ||
583 | 291 | |||
584 | 292 | # handle conn watcher, separately. | ||
585 | 293 | if watch_type is None and path is None: | ||
586 | 294 | return self._zk_thread_callback( | ||
587 | 295 | self._watch_session_wrapper, watcher) | ||
588 | 296 | |||
589 | 297 | return self._zk_thread_callback( | ||
590 | 298 | partial( | ||
591 | 299 | self._watch_session_wrapper, | ||
592 | 300 | self._watches.add(path, watch_type, watcher))) | ||
593 | 301 | |||
594 | 302 | # Track ephemerals | ||
595 | 303 | def _cb_created(self, d, data, acls, flags, result_code, path): | ||
596 | 304 | if self._check_result(result_code, d): | ||
597 | 305 | return | ||
598 | 306 | |||
599 | 307 | if flags & zookeeper.EPHEMERAL: | ||
600 | 308 | self._ephemerals[path] = dict( | ||
601 | 309 | data=data, acls=acls, flags=flags) | ||
602 | 310 | |||
603 | 311 | d.callback(path) | ||
604 | 312 | |||
605 | 313 | def _cb_deleted(self, d, path, result_code): | ||
606 | 314 | if self._check_result(result_code, d): | ||
607 | 315 | return | ||
608 | 316 | |||
609 | 317 | self._ephemerals.pop(path, None) | ||
610 | 318 | d.callback(result_code) | ||
611 | 319 | |||
612 | 320 | def _cb_set_acl(self, d, path, acls, result_code): | ||
613 | 321 | if self._check_result(result_code, d): | ||
614 | 322 | return | ||
615 | 323 | |||
616 | 324 | if path in self._ephemerals: | ||
617 | 325 | self._ephemerals[path]['acls'] = acls | ||
618 | 326 | |||
619 | 327 | d.callback(result_code) | ||
620 | 328 | |||
621 | 329 | def _cb_set(self, d, path, data, result_code, node_stat): | ||
622 | 330 | if self._check_result(result_code, d): | ||
623 | 331 | return | ||
624 | 332 | |||
625 | 333 | if path in self._ephemerals: | ||
626 | 334 | self._ephemerals[path]['data'] = data | ||
627 | 335 | |||
628 | 336 | d.callback(node_stat) | ||
629 | 337 | |||
630 | 338 | |||
631 | 339 | class ManagedClient(RetryClient): | ||
632 | 340 | |||
633 | 341 | def __init__(self, *args, **kw): | ||
634 | 342 | client = SessionClient(*args, **kw) | ||
635 | 343 | super(ManagedClient, self).__init__( | ||
636 | 344 | client, client.cb_restablish_session) | ||
637 | 345 | |||
638 | 346 | |||
639 | 347 | def ManagedClient(servers=None, session_timeout=None, connect_timeout=10000): | ||
640 | 348 | from retry import RetryClient | ||
641 | 349 | client = SessionClient(servers, session_timeout, connect_timeout) | ||
642 | 350 | return RetryClient(client) | ||
643 | 0 | 351 | ||
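A note on the ephemeral bookkeeping in the managed.py hunk above: the `_cb_created`, `_cb_set`, `_cb_set_acl` and `_cb_deleted` callbacks keep `self._ephemerals` in step with every successful operation, so the nodes can be recreated with their latest data and ACLs after a session expires. The sketch below isolates that idea with no ZooKeeper dependency. It is illustrative only, not the branch's implementation: `EphemeralRegistry`, its method names, the `replay` helper and the local `EPHEMERAL` constant are all hypothetical stand-ins.

```python
EPHEMERAL = 1  # assumed stand-in for zookeeper.EPHEMERAL, so the sketch runs alone


class EphemeralRegistry(object):
    """Remember ephemeral nodes so they can be recreated on a new session."""

    def __init__(self):
        self._ephemerals = {}

    def created(self, path, data, acls, flags):
        # Only ephemeral nodes vanish with the session, so only they are tracked.
        if flags & EPHEMERAL:
            self._ephemerals[path] = dict(data=data, acls=acls, flags=flags)

    def data_set(self, path, data):
        if path in self._ephemerals:
            self._ephemerals[path]["data"] = data

    def acl_set(self, path, acls):
        if path in self._ephemerals:
            self._ephemerals[path]["acls"] = acls

    def deleted(self, path):
        # Deleting an untracked path is harmless.
        self._ephemerals.pop(path, None)

    def replay(self, create):
        """Call create(path, data, acls, flags) for every tracked node."""
        for path, node in sorted(self._ephemerals.items()):
            create(path, node["data"], node["acls"], node["flags"])
```

With a registry like this, a node that was created, modified and not deleted is replayed with its most recent content, which is the behaviour that `test_ephemeral_content_modification` and `test_ephemeral_deletion` check against a real server.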
644 | === modified file 'txzookeeper/retry.py' | |||
645 | --- txzookeeper/retry.py 2011-11-08 12:15:16 +0000 | |||
646 | +++ txzookeeper/retry.py 2012-04-04 18:12:22 +0000 | |||
647 | @@ -64,7 +64,7 @@ | |||
648 | 64 | 64 | ||
649 | 65 | :param session_timeout: The timeout for the session, in milliseconds | 65 | :param session_timeout: The timeout for the session, in milliseconds |
650 | 66 | :param max_delay: The max delay for a retry, in seconds. | 66 | :param max_delay: The max delay for a retry, in seconds. |
652 | 67 | :param session_fraction: The fractinoal amount of a timeout to wait | 67 | :param session_fraction: The fractional amount of a timeout to wait |
653 | 68 | """ | 68 | """ |
654 | 69 | retry_delay = session_timeout / (float(session_fraction) * 1000) | 69 | retry_delay = session_timeout / (float(session_fraction) * 1000) |
655 | 70 | return min(retry_delay, max_delay) | 70 | return min(retry_delay, max_delay) |
656 | @@ -111,20 +111,20 @@ | |||
657 | 111 | must return a single value (either a deferred or result | 111 | must return a single value (either a deferred or result |
658 | 112 | value). | 112 | value). |
659 | 113 | """ | 113 | """ |
660 | 114 | # For clients which aren't connected (session timeout == None) | ||
661 | 115 | # we raise the errors to the callers | ||
662 | 116 | session_timeout = client.session_timeout or 0 | ||
663 | 117 | |||
664 | 118 | # If we keep retrying past the 1.5 * session timeout without | ||
665 | 119 | # success just die, the session expiry is fatal. | ||
666 | 120 | max_time = session_timeout * 1.5 + time.time() | ||
667 | 121 | |||
668 | 122 | while 1: | 114 | while 1: |
669 | 123 | try: | 115 | try: |
670 | 124 | value = yield func(*args, **kw) | 116 | value = yield func(*args, **kw) |
671 | 125 | except Exception, e: | 117 | except Exception, e: |
672 | 118 | # For clients which aren't connected (session timeout == None) | ||
673 | 119 | # we raise the errors to the callers | ||
674 | 120 | session_timeout = client.session_timeout or 0 | ||
675 | 121 | |||
676 | 122 | # If we keep retrying past the 1.5 * session timeout without | ||
677 | 123 | # success just die, the session expiry is fatal. | ||
678 | 124 | max_time = session_timeout * 1.5 + time.time() | ||
679 | 126 | if not check_retryable(client, max_time, e): | 125 | if not check_retryable(client, max_time, e): |
680 | 127 | raise | 126 | raise |
681 | 127 | |||
682 | 128 | # Give the connection a chance to auto-heal. | 128 | # Give the connection a chance to auto-heal. |
683 | 129 | yield sleep(get_delay(session_timeout)) | 129 | yield sleep(get_delay(session_timeout)) |
684 | 130 | continue | 130 | continue |
685 | @@ -172,6 +172,7 @@ | |||
686 | 172 | # Give the connection a chance to auto-heal | 172 | # Give the connection a chance to auto-heal |
687 | 173 | d = sleep(get_delay(session_timeout)) | 173 | d = sleep(get_delay(session_timeout)) |
688 | 174 | d.addCallback(retry_inner) | 174 | d.addCallback(retry_inner) |
689 | 175 | |||
690 | 175 | return d | 176 | return d |
691 | 176 | 177 | ||
692 | 177 | def retry_inner(value): | 178 | def retry_inner(value): |
693 | 178 | 179 | ||
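For readers checking the retry timing, the delay arithmetic from the retry.py hunk above can be tried in isolation. The function body is copied from the diff. The signature and its default values (`max_delay=5`, `session_fraction=30`) are not visible in this hunk and are assumptions made for illustration.

```python
def get_delay(session_timeout, max_delay=5, session_fraction=30):
    """Return the pause before a retry, in seconds.

    session_timeout is in milliseconds. The default values are assumed,
    since the hunk does not show the function signature.
    """
    retry_delay = session_timeout / (float(session_fraction) * 1000)
    return min(retry_delay, max_delay)


if __name__ == "__main__":
    for timeout_ms in (3000, 10000, 600000):
        print(timeout_ms, get_delay(timeout_ms))
```

Under these assumed defaults a 3000 ms session timeout gives a pause of 0.1 s, while a 600000 ms timeout would give 20 s and is capped at `max_delay`.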
694 | === modified file 'txzookeeper/tests/__init__.py' | |||
695 | --- txzookeeper/tests/__init__.py 2011-10-22 04:24:02 +0000 | |||
696 | +++ txzookeeper/tests/__init__.py 2012-04-04 18:12:22 +0000 | |||
697 | @@ -20,6 +20,8 @@ | |||
698 | 20 | # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. | 20 | # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. |
699 | 21 | # | 21 | # |
700 | 22 | 22 | ||
701 | 23 | import logging | ||
702 | 24 | import StringIO | ||
703 | 23 | import sys | 25 | import sys |
704 | 24 | 26 | ||
705 | 25 | import zookeeper | 27 | import zookeeper |
706 | @@ -72,6 +74,26 @@ | |||
707 | 72 | return None | 74 | return None |
708 | 73 | return original_value | 75 | return original_value |
709 | 74 | 76 | ||
710 | 77 | def capture_log( | ||
711 | 78 | self, name="", level=logging.INFO, log_file=None, formatter=None): | ||
712 | 79 | """Capture log channel to StringIO""" | ||
713 | 80 | if log_file is None: | ||
714 | 81 | log_file = StringIO.StringIO() | ||
715 | 82 | log_handler = logging.StreamHandler(log_file) | ||
716 | 83 | if formatter: | ||
717 | 84 | log_handler.setFormatter(formatter) | ||
718 | 85 | logger = logging.getLogger(name) | ||
719 | 86 | logger.addHandler(log_handler) | ||
720 | 87 | old_logger_level = logger.level | ||
721 | 88 | logger.setLevel(level) | ||
722 | 89 | |||
723 | 90 | @self.addCleanup | ||
724 | 91 | def reset_logging(): | ||
725 | 92 | logger.removeHandler(log_handler) | ||
726 | 93 | logger.setLevel(old_logger_level) | ||
727 | 94 | |||
728 | 95 | return log_file | ||
729 | 96 | |||
730 | 75 | 97 | ||
731 | 76 | def egg_test_runner(): | 98 | def egg_test_runner(): |
732 | 77 | """ | 99 | """ |
733 | 78 | 100 | ||
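The `capture_log` helper added above attaches a stream handler to a named logger and relies on `addCleanup` to detach it after each test. The following is a minimal, self-contained sketch of the same pattern written for Python 3 (`io.StringIO` in place of the `StringIO` module used in the diff). `LogCaptureMixin`, `ExampleTest` and the `example.channel` logger name are hypothetical names chosen for the illustration.

```python
import io
import logging
import unittest


class LogCaptureMixin(object):
    """Test helper that captures a log channel into a StringIO buffer."""

    def capture_log(self, name="", level=logging.INFO,
                    log_file=None, formatter=None):
        if log_file is None:
            log_file = io.StringIO()
        handler = logging.StreamHandler(log_file)
        if formatter:
            handler.setFormatter(formatter)
        logger = logging.getLogger(name)
        logger.addHandler(handler)
        old_level = logger.level
        logger.setLevel(level)

        def reset_logging():
            # Restore the logger so later tests see no leftover handler.
            logger.removeHandler(handler)
            logger.setLevel(old_level)

        self.addCleanup(reset_logging)
        return log_file


class ExampleTest(LogCaptureMixin, unittest.TestCase):

    def test_message_is_captured(self):
        output = self.capture_log("example.channel")
        logging.getLogger("example.channel").info("watch reset for %s", "/foobar")
        self.assertIn("watch reset for /foobar", output.getvalue())
```

Registering the cleanup inside the helper keeps each test to a single line of setup, which is how `test_reset_with_error` uses it.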
734 | === modified file 'txzookeeper/tests/test_client.py' | |||
735 | --- txzookeeper/tests/test_client.py 2011-10-22 04:24:02 +0000 | |||
736 | +++ txzookeeper/tests/test_client.py 2012-04-04 18:12:22 +0000 | |||
737 | @@ -1050,7 +1050,7 @@ | |||
738 | 1050 | d = self.client.connect() | 1050 | d = self.client.connect() |
739 | 1051 | 1051 | ||
740 | 1052 | def verify_session_timeout(client): | 1052 | def verify_session_timeout(client): |
742 | 1053 | self.assertEqual(client.session_timeout, 4000) | 1053 | self.assertIn(client.session_timeout, (4000, 10000)) |
743 | 1054 | 1054 | ||
744 | 1055 | d.addCallback(verify_session_timeout) | 1055 | d.addCallback(verify_session_timeout) |
745 | 1056 | return d | 1056 | return d |
746 | 1057 | 1057 | ||
747 | === added file 'txzookeeper/tests/test_managed.py' | |||
748 | --- txzookeeper/tests/test_managed.py 1970-01-01 00:00:00 +0000 | |||
749 | +++ txzookeeper/tests/test_managed.py 2012-04-04 18:12:22 +0000 | |||
750 | @@ -0,0 +1,263 @@ | |||
751 | 1 | # | ||
752 | 2 | # Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved | ||
753 | 3 | # | ||
754 | 4 | # This file is part of txzookeeper. | ||
755 | 5 | # | ||
756 | 6 | # Authors: | ||
757 | 7 | # Kapil Thangavelu | ||
758 | 8 | # | ||
759 | 9 | # txzookeeper is free software: you can redistribute it and/or modify | ||
760 | 10 | # it under the terms of the GNU Lesser General Public License as published by | ||
761 | 11 | # the Free Software Foundation, either version 3 of the License, or | ||
762 | 12 | # (at your option) any later version. | ||
763 | 13 | # | ||
764 | 14 | # txzookeeper is distributed in the hope that it will be useful, | ||
765 | 15 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
766 | 16 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
767 | 17 | # GNU Lesser General Public License for more details. | ||
768 | 18 | # | ||
769 | 19 | # You should have received a copy of the GNU Lesser General Public License | ||
770 | 20 | # along with txzookeeper. If not, see <http://www.gnu.org/licenses/>. | ||
771 | 21 | # | ||
772 | 22 | |||
773 | 23 | import zookeeper | ||
774 | 24 | |||
775 | 25 | from twisted.internet.defer import inlineCallbacks, Deferred, DeferredList | ||
776 | 26 | |||
777 | 27 | from txzookeeper.client import ZookeeperClient, ClientEvent | ||
778 | 28 | from txzookeeper import managed | ||
779 | 29 | from txzookeeper.tests import ZookeeperTestCase, utils | ||
780 | 30 | from txzookeeper.tests import test_client | ||
781 | 31 | |||
782 | 32 | |||
783 | 33 | class WatchTest(ZookeeperTestCase): | ||
784 | 34 | """Watch manager and watch tests""" | ||
785 | 35 | |||
786 | 36 | def setUp(self): | ||
787 | 37 | self.watches = managed.WatchManager() | ||
788 | 38 | |||
789 | 39 | def tearDown(self): | ||
790 | 40 | self.watches.clear() | ||
791 | 41 | del self.watches | ||
792 | 42 | |||
793 | 43 | def test_add_remove(self): | ||
794 | 44 | |||
795 | 45 | w = self.watches.add("/foobar", "child", lambda x: 1) | ||
796 | 46 | self.assertIn( | ||
797 | 47 | "<Watcher child /foobar", | ||
798 | 48 | str(w)) | ||
799 | 49 | self.assertIn(w, self.watches._watches) | ||
800 | 50 | self.watches.remove(w) | ||
801 | 51 | self.assertNotIn(w, self.watches._watches) | ||
802 | 52 | self.watches.remove(w) | ||
803 | 53 | |||
804 | 54 | @inlineCallbacks | ||
805 | 55 | def test_watch_fire_removes(self): | ||
806 | 56 | """Firing the watch removes it from the manager. | ||
807 | 57 | """ | ||
808 | 58 | w = self.watches.add("/foobar", "child", lambda x: 1) | ||
809 | 59 | yield w("a") | ||
810 | 60 | self.assertNotIn(w, self.watches._watches) | ||
811 | 61 | |||
812 | 62 | @inlineCallbacks | ||
813 | 63 | def test_watch_fire_with_error_removes(self): | ||
814 | 64 | """Firing a watch whose callback raises still removes it from the manager. | ||
815 | 65 | """ | ||
816 | 66 | d = Deferred() | ||
817 | 67 | |||
818 | 68 | @inlineCallbacks | ||
819 | 69 | def cb_error(e): | ||
820 | 70 | yield d | ||
821 | 71 | raise ValueError("a") | ||
822 | 72 | |||
823 | 73 | w = self.watches.add("/foobar", "child", lambda x: 1) | ||
824 | 74 | try: | ||
825 | 75 | wd = w("a") | ||
826 | 76 | d.callback(True) | ||
827 | 77 | yield wd | ||
828 | 78 | except ValueError: | ||
829 | 79 | pass | ||
830 | 80 | self.assertNotIn(w, self.watches._watches) | ||
831 | 81 | |||
832 | 82 | @inlineCallbacks | ||
833 | 83 | def test_reset_with_error(self): | ||
834 | 84 | """A callback firing an error on reset is ignored. | ||
835 | 85 | """ | ||
836 | 86 | output = self.capture_log("txzk.managed") | ||
837 | 87 | d = Deferred() | ||
838 | 88 | results = [] | ||
839 | 89 | |||
840 | 90 | @inlineCallbacks | ||
841 | 91 | def callback(*args, **kw): | ||
842 | 92 | results.append((args, kw)) | ||
843 | 93 | yield d | ||
844 | 94 | raise ValueError("a") | ||
845 | 95 | |||
846 | 96 | w = self.watches.add("/foobar", "child", callback) | ||
847 | 97 | reset_done = self.watches.reset() | ||
848 | 98 | |||
849 | 99 | e, _ = results.pop() | ||
850 | 100 | self.assertEqual( | ||
851 | 101 | str(ClientEvent(*e)), | ||
852 | 102 | "<ClientEvent session at '/foobar' state: connected>") | ||
853 | 103 | d.callback(True) | ||
854 | 104 | yield reset_done | ||
855 | 105 | self.assertNotIn(w, self.watches._watches) | ||
856 | 106 | self.assertIn("Error reseting watch", output.getvalue()) | ||
857 | 107 | |||
858 | 108 | @inlineCallbacks | ||
859 | 109 | def test_reset(self): | ||
860 | 110 | """Reset fires a synthetic client event, and clears watches. | ||
861 | 111 | """ | ||
862 | 112 | d = Deferred() | ||
863 | 113 | results = [] | ||
864 | 114 | |||
865 | 115 | def callback(*args, **kw): | ||
866 | 116 | results.append((args, kw)) | ||
867 | 117 | return d | ||
868 | 118 | |||
869 | 119 | w = self.watches.add("/foobar", "child", callback) | ||
870 | 120 | reset_done = self.watches.reset() | ||
871 | 121 | |||
872 | 122 | e, _ = results.pop() | ||
873 | 123 | self.assertEqual( | ||
874 | 124 | str(ClientEvent(*e)), | ||
875 | 125 | "<ClientEvent session at '/foobar' state: connected>") | ||
876 | 126 | d.callback(True) | ||
877 | 127 | yield reset_done | ||
878 | 128 | self.assertNotIn(w, self.watches._watches) | ||
879 | 129 | |||
880 | 130 | |||
881 | 131 | class SessionClientTests(test_client.ClientTests): | ||
882 | 132 | """Run through basic operations with SessionClient.""" | ||
883 | 133 | timeout = 5 | ||
884 | 134 | |||
885 | 135 | def setUp(self): | ||
886 | 136 | super(SessionClientTests, self).setUp() | ||
887 | 137 | self.client = managed.SessionClient("127.0.0.1:2181") | ||
888 | 138 | |||
889 | 139 | def test_client_use_while_disconnected_returns_failure(self): | ||
890 | 140 | # managed session client reconnects here. | ||
891 | 141 | return True | ||
892 | 142 | |||
893 | 143 | |||
894 | 144 | class SessionClientExpireTests(ZookeeperTestCase): | ||
895 | 145 | """Verify expiration behavior.""" | ||
896 | 146 | |||
897 | 147 | def setUp(self): | ||
898 | 148 | super(SessionClientExpireTests, self).setUp() | ||
899 | 149 | self.client = managed.ManagedClient("127.0.0.1:2181", 3000) | ||
900 | 150 | self.client2 = None | ||
901 | 151 | return self.client.connect() | ||
902 | 152 | |||
903 | 153 | @inlineCallbacks | ||
904 | 154 | def tearDown(self): | ||
905 | 155 | self.client.close() | ||
906 | 156 | |||
907 | 157 | self.client2 = ZookeeperClient("127.0.0.1:2181") | ||
908 | 158 | yield self.client2.connect() | ||
909 | 159 | utils.deleteTree(handle=self.client2.handle) | ||
910 | 160 | yield self.client2.close() | ||
911 | 161 | super(SessionClientExpireTests, self).tearDown() | ||
912 | 162 | |||
913 | 163 | @inlineCallbacks | ||
914 | 164 | def expire_session(self): | ||
915 | 165 | assert self.client.connected | ||
916 | 166 | self.client2 = ZookeeperClient(self.client.servers) | ||
917 | 167 | yield self.client2.connect(client_id=self.client.client_id) | ||
918 | 168 | yield self.client2.close() | ||
919 | 169 | # It takes some time to propagate (1/3 session time as ping) | ||
920 | 170 | yield self.sleep(2) | ||
921 | 171 | |||
922 | 172 | @inlineCallbacks | ||
923 | 173 | def test_session_expiration_conn(self): | ||
924 | 174 | session_id = self.client.client_id[0] | ||
925 | 175 | yield self.client.create("/fo-1", "abc") | ||
926 | 176 | yield self.expire_session() | ||
927 | 177 | yield self.client.exists("/") | ||
928 | 178 | self.assertNotEqual(session_id, self.client.client_id[0]) | ||
929 | 179 | |||
930 | 180 | @inlineCallbacks | ||
931 | 181 | def test_session_expiration_conn_watch(self): | ||
932 | 182 | session_id = self.client.client_id[0] | ||
933 | 183 | yield self.client.create("/fo-1", "abc") | ||
934 | 184 | yield self.expire_session() | ||
935 | 185 | yield self.client.exists("/") | ||
936 | 186 | self.assertNotEqual(session_id, self.client.client_id[0]) | ||
937 | 187 | |||
938 | 188 | @inlineCallbacks | ||
939 | 189 | def test_invoked_watch_gc(self): | ||
940 | 190 | c_d, w_d = yield self.client.get_children_and_watch("/") | ||
941 | 191 | yield c_d | ||
942 | 192 | yield self.client.create("/foo") | ||
943 | 193 | yield w_d | ||
944 | 194 | yield self.expire_session() | ||
945 | 195 | yield self.client.create("/foo2") | ||
946 | 196 | # Nothing should blow up | ||
947 | 197 | yield self.sleep(0.2) | ||
948 | 198 | |||
949 | 199 | @inlineCallbacks | ||
950 | 200 | def test_ephemeral_and_watch_recreate(self): | ||
951 | 201 | # Create some ephemeral nodes | ||
952 | 202 | yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) | ||
953 | 203 | yield self.client.create("/fo-2", "def", flags=zookeeper.EPHEMERAL) | ||
954 | 204 | |||
955 | 205 | # Create some watches | ||
956 | 206 | g_d, g_w_d = self.client.get_and_watch("/fo-1") | ||
957 | 207 | yield g_d | ||
958 | 208 | |||
959 | 209 | c_d, c_w_d = self.client.get_children_and_watch("/") | ||
960 | 210 | yield g_d | ||
961 | 211 | |||
962 | 212 | e_d, e_w_d = self.client.get_children_and_watch("/fo-2") | ||
963 | 213 | yield e_d | ||
964 | 214 | |||
965 | 215 | # Expire the session | ||
966 | 216 | yield self.expire_session() | ||
967 | 217 | |||
968 | 218 | # Poof | ||
969 | 219 | |||
970 | 220 | # Ephemerals back | ||
971 | 221 | c, s = yield self.client.get("/fo-1") | ||
972 | 222 | self.assertEqual(c, "abc") | ||
973 | 223 | |||
974 | 224 | c, s = yield self.client.get("/fo-2") | ||
975 | 225 | self.assertEqual(c, "def") | ||
976 | 226 | |||
977 | 227 | # Watches triggered | ||
978 | 228 | yield DeferredList( | ||
979 | 229 | [g_w_d, c_w_d, e_w_d], | ||
980 | 230 | fireOnOneErrback=True, consumeErrors=True) | ||
981 | 231 | |||
982 | 232 | self.assertEqual( | ||
983 | 233 | [str(d.result) for d in (g_w_d, c_w_d, e_w_d)], | ||
984 | 234 | ["<ClientEvent session at '/fo-1' state: connected>", | ||
985 | 235 | "<ClientEvent session at '/' state: connected>", | ||
986 | 236 | "<ClientEvent session at '/fo-2' state: connected>"]) | ||
987 | 237 | |||
988 | 238 | @inlineCallbacks | ||
989 | 239 | def test_ephemeral_content_modification(self): | ||
990 | 240 | yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) | ||
991 | 241 | yield self.client.set("/fo-1", "def") | ||
992 | 242 | yield self.expire_session() | ||
993 | 243 | c, s = yield self.client.get("/fo-1") | ||
994 | 244 | self.assertEqual(c, "def") | ||
995 | 245 | |||
996 | 246 | @inlineCallbacks | ||
997 | 247 | def test_ephemeral_acl_modification(self): | ||
998 | 248 | yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) | ||
999 | 249 | acl = [test_client.PUBLIC_ACL, | ||
1000 | 250 | dict(scheme="digest", | ||
1001 | 251 | id="zebra:moon", | ||
1002 | 252 | perms=zookeeper.PERM_ALL)] | ||
1003 | 253 | yield self.client.set_acl("/fo-1", acl) | ||
1004 | 254 | yield self.expire_session() | ||
1005 | 255 | n_acl, stat = yield self.client.get_acl("/fo-1") | ||
1006 | 256 | self.assertEqual(acl, n_acl) | ||
1007 | 257 | |||
1008 | 258 | @inlineCallbacks | ||
1009 | 259 | def test_ephemeral_deletion(self): | ||
1010 | 260 | yield self.client.create("/fo-1", "abc", flags=zookeeper.EPHEMERAL) | ||
1011 | 261 | yield self.client.delete("/fo-1") | ||
1012 | 262 | yield self.expire_session() | ||
1013 | 263 | self.assertFalse((yield self.client.exists("/fo-1"))) | ||
1014 | 0 | 264 | ||
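The `WatchTest` cases above pin down the contract of the watch manager: a watch is removed when it fires, removing it twice is harmless, and `reset()` fires a synthetic "session connected" event at every outstanding watch, logs callback errors, and leaves the manager empty. The `WatchManager` implementation itself is outside this part of the diff, so the following is only a synchronous sketch of that contract and not the branch's code. It ignores Deferreds entirely, and the two constants are assumed stand-ins for `zookeeper.SESSION_EVENT` and `zookeeper.CONNECTED_STATE`.

```python
import logging

log = logging.getLogger("example.managed")

SESSION_EVENT = -1    # assumed stand-in for zookeeper.SESSION_EVENT
CONNECTED_STATE = 3   # assumed stand-in for zookeeper.CONNECTED_STATE


class Watcher(object):
    """One-shot wrapper around a user callback."""

    def __init__(self, manager, path, watch_type, callback):
        self._manager = manager
        self._callback = callback
        self.path = path
        self.watch_type = watch_type

    def __call__(self, *args):
        # ZooKeeper watches fire once, so forget the watch before dispatching.
        self._manager.remove(self)
        return self._callback(*args)

    def __str__(self):
        return "<Watcher %s %s>" % (self.watch_type, self.path)


class WatchManager(object):

    def __init__(self):
        self._watches = []

    def add(self, path, watch_type, callback):
        watcher = Watcher(self, path, watch_type, callback)
        self._watches.append(watcher)
        return watcher

    def remove(self, watcher):
        try:
            self._watches.remove(watcher)
        except ValueError:
            pass  # already fired or removed

    def reset(self):
        """Fire a synthetic session event at every outstanding watch."""
        pending, self._watches = self._watches, []
        for watcher in pending:
            try:
                watcher(SESSION_EVENT, CONNECTED_STATE, watcher.path)
            except Exception:
                log.exception("Error resetting watch %s", watcher)
```

Swapping the list out before dispatching means a callback that registers a new watch during `reset()` does not have that new watch fired in the same pass.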
1015 | === modified file 'txzookeeper/tests/test_retry.py' | |||
1016 | --- txzookeeper/tests/test_retry.py 2011-11-08 12:15:16 +0000 | |||
1017 | +++ txzookeeper/tests/test_retry.py 2012-04-04 18:12:22 +0000 | |||
1018 | @@ -1,5 +1,5 @@ | |||
1019 | 1 | # | 1 | # |
1021 | 2 | # Copyright (C) 2010-2011 Canonical Ltd. All Rights Reserved | 2 | # Copyright (C) 2010-2012 Canonical Ltd. All Rights Reserved |
1022 | 3 | # | 3 | # |
1023 | 4 | # This file is part of txzookeeper. | 4 | # This file is part of txzookeeper. |
1024 | 5 | # | 5 | # |
Reviewers: mp+100715_code.launchpad.net,
Message:
Please take a look.
Description:
Managed Connections for txzk
A managed connection automatically handles transient connection failures
and session expiration. On session expiration it recreates ephemeral nodes
and fires watch events on extant watchers.
https://code.launchpad.net/~hazmat/txzookeeper/managed-watch-and-ephemeral/+merge/100715
(do not edit description out of merge proposal)
Please review this at https://codereview.appspot.com/5970080/
Affected files:
A .bzrignore
A [revision details]
M setup.py
M txzookeeper/client.py
A txzookeeper/managed.py
M txzookeeper/retry.py
M txzookeeper/tests/__init__.py
M txzookeeper/tests/test_client.py
A txzookeeper/tests/test_managed.py
M txzookeeper/tests/test_retry.py