Merge lp:~hazmat/txzookeeper/session-event-handling into lp:txzookeeper

Proposed by Kapil Thangavelu
Status: Superseded
Proposed branch: lp:~hazmat/txzookeeper/session-event-handling
Merge into: lp:txzookeeper
Prerequisite: lp:~hazmat/txzookeeper/swap-sync-errors-to-failures
Diff against target: 1464 lines (+807/-183)
11 files modified
txzookeeper/client.py (+222/-65)
txzookeeper/queue.py (+5/-5)
txzookeeper/tests/__init__.py (+2/-2)
txzookeeper/tests/common.py (+216/-0)
txzookeeper/tests/test_client.py (+106/-100)
txzookeeper/tests/test_node.py (+1/-1)
txzookeeper/tests/test_queue.py (+5/-5)
txzookeeper/tests/test_security.py (+2/-2)
txzookeeper/tests/test_session.py (+245/-0)
txzookeeper/tests/test_utils.py (+1/-1)
txzookeeper/tests/utils.py (+2/-2)
To merge this branch: bzr merge lp:~hazmat/txzookeeper/session-event-handling
Reviewer Review Type Date Requested Status
Jim Baker (community) Approve
Gustavo Niemeyer Approve
Review via email: mp+65210@code.launchpad.net

This proposal has been superseded by a proposal from 2011-07-05.

Description of the change

Session event and connection loss handling for txzookeeper.

  - Session tests using a ZK cluster as a test layer/test resource, cluster state
    is reset between tests.
  - Zookeeper Client session tests for server rotation across multi-node cluster.
  - Client server rotation and session/watch migration tests.
  - Session event handling, sent to user defined callback, else ignored by default.
  - Connection loss handling, sent to user defined callback, else raised at the
    API call point. The callback result if any is returned as the error to be
    returned to the API.

To post a comment you must log in.
Revision history for this message
Kapil Thangavelu (hazmat) wrote :

Please note that this branch has a pre-requisite branch also in review.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

This looks very good. A few comments, and +1 with them considered.

[1]

+ zookeeper.NOTWATCHING_EVENT: "notwatching",
+ zookeeper.SESSION_EVENT: "session",
+ zookeeper.CREATED_EVENT: 'created',

Quotes are inconsistent here.

Also, "not-watching" would be a bit more friendly to the eye.

[2]

+def debug_symbols(sequence):

Left over?

[3]

error = error_class(error_msg)
+
+ if ConnectionException.is_connection_exception(error):

This idiom feels a little weird. A Foo is necessarily a Foo,
so we never have Foo.is_foo. Unless you have some background
for the choice that I miss, I suggest just moving that to a
"is_connection_exception()" top-level function.

[4]

+ # The result of the connection error handler is returned
+ # to the api.

That's a nice design, despite our conversations around the problem of
connection level erroring.

I'm a bit concerned about the session events, though. A session-termination
event may be the only event the deferred ever sees, so if we divert it to
a separate callback, the logic pending on the deferred will be dead forever.

I'm not sure about what we should do here. Have you thought about this?

If the answer is to just be so for now, we should at least add a note to
the proper location in the code mentioning that these deferreds are dead.

[5]

+ d = defer.maybeDeferred(
+ self._connection_error_callback,
+ self, error)

For consistency, either both callbacks should take "self", or neither of them
should.

[6]

+ # XXX/Debug
+ #print
+ #print "Session/Conn Event", ClientEvent(type, state, path)
+

Some left overs.

[7]

+ self._check_result(result, d)
+ if not d.called:
+ d.callback(True)

This is checking if the deferred is being called synchronously, which seems
dangerous as a pattern. Theorethically, the deferred may not have fired by
the time the function returns.

I suggest using the interface defined (considering the suggestions from
the pre-requisite branch):

  if not self._failed(result, d):
      d.callback(True)
  return d

review: Approve
Revision history for this message
Jim Baker (jimbaker) wrote :

+1, we certainly need this branch, and it looks quite solid.

In addition to Gustavo's comments:

[1]

Some minor grammar suggestions:

+ specified in comma separated fashion, if they are,

specified in comma separated fashion. If they are,

+ # If its a session event pass it to the session callback, else

If it's a session event, ...

+ nodes and watches)/ The connection's client id, is also useful
+ when introspecting the server logs for specific client
+ activity.

nodes and watches. The connection's client id is also...

+ Its used for all connection level events and session events.

It's used...

+ # Send session events to the callback, this in addition to any
+ # duplicate session events that will be sent for extant watches.

callback, in addition

+ """Set a callback to recieve session events.

receive

+ twisted integration using deferreds is not capable of recieving

receiving

+ to be invoked with them instead. The callback recieves a single

receives

+ call is made, by setting a connection level error handler,
+ applications can centralize their handling of connection loss,

Something like

By setting...

+ This is a global setting, across connections.

setting across all connections

+ A client session can be reattached to in a separate connection,
+ if the a session is expired, using the zookeeper connection will
+ raise a SessionExpiredException.

+ # On occassion we get a session expired event while connecting.

occasion

+ # Closing one connection, will close the session

Closing one connection will close the session.

+ A client connected to multiple servers, will transparently
+ migrate amongst them, as individual servers can no longer be
+ reached. A client's session will be maintined.

servers will transparently...

...will be maintained.

+ # get a zookeeper connectionloss exception on occassion.

occasion

+ We can specify a callback for connection errors, that
+ can perform recovery for a disconnected client, restablishing

This seems incomplete; also there should not be a comma before "that"

+ """On connection failure, An application can choose to use a
+ new connection with to reconnect to a different member of the
+ zookeeper cluster, reacquiring the extant session.

an application... with which to reconnect...

Maybe this concludes with "This enables reacquiring the existing
session." (Extant doesn't seem like the right word choice here.)

+ A large obvious caveat to using a new client instance rather
+ than reconnecting the existing client, is that even though the

instance, rather than...

+ # Ephemeral is destroyed when the session closed.

the session is closed.

[2]

+ http://bit.ly/mQrOMY
+ http://bit.ly/irKpfn

Why not just expand these links out? Alternatively use a readable URL
(the underlying nabble URL is modestly readable, I don't know how
permalink either one is, however).

[3]

+ assert callable(callback)

Shouldn't this be throwing a TypeError instead? Also, there is no
correponding assertion (or verification) on
set_connection_error_callback. Lastly, there is no test on this path.

[4]

+ def xtest_session_expired_event(self):

Do you want to include this test?

review: Approve
48. By Kapil Thangavelu

add a test for expired session events, address some review comments [1-3]

49. By Kapil Thangavelu

replace some mock'd error behavior tests with real equivalents where possible, yank the one that wasn't (sync w/ error)

50. By Kapil Thangavelu

client usage while disconnected, will return a failed deferred instead of raising an exception

51. By Kapil Thangavelu

unrecoverable is a client property not method.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

Excerpts from Gustavo Niemeyer's message of Mon Jun 20 18:28:28 UTC 2011:
> Review: Approve
> This looks very good. A few comments, and +1 with them considered.
>

1-3 addressed.

>
> [4]
>
> + # The result of the connection error handler is returned
> + # to the api.
>
> That's a nice design, despite our conversations around the problem of
> connection level erroring.
>
> I'm a bit concerned about the session events, though. A session-termination
> event may be the only event the deferred ever sees, so if we divert it to
> a separate callback, the logic pending on the deferred will be dead forever.
>
> I'm not sure about what we should do here. Have you thought about this?
>
> If the answer is to just be so for now, we should at least add a note to
> the proper location in the code mentioning that these deferreds are dead.
>

Any deferreds attached to an unconnected client are dead. A session 'expired' event
is effectively a connection loss, the extant deferreds are dead. The expired
event is the only fatal session event. There is some testing around this (extant
watches on dead clients).

Alternatively, we could route session exceptions to the extant watcher, but then
we're back to the app code having to add guards to each watch for connection loss,
or lots of unhandled exceptions in deferred stacks. The connection exception
watcher will still fire as its always registered on the connection level watcher,
ie. the one that monitors the initial connection setup.

From a cleanup perspective this might be preferrable, but the numbers of extant
watches are small enough its not clear its really win, and most of that cleanup
will just result in ignorable log errors without connection loss handling on all
watches. The error handling itself and the error handling for a watchis basically
just ignore or propgate the exception to fully clear out the deferred chain with
the connection level error callback handling the actual reconnect scenario.

[5-6] Addressed

> [7]
>
> + self._check_result(result, d)
> + if not d.called:
> + d.callback(True)
>
> This is checking if the deferred is being called synchronously, which seems
> dangerous as a pattern. Theorethically, the deferred may not have fired by
> the time the function returns.
>
>
> I suggest using the interface defined (considering the suggestions from
> the pre-requisite branch):
>
> if not self._failed(result, d):
> d.callback(True)
> return d
>

In this case the txzk api is synchronous (close) and we're checking just checking
if its failed. I've updated the construct to not check the deferred.called status.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

Excerpts from Jim Baker's message of Tue Jun 21 00:18:33 UTC 2011:
> Review: Approve
> +1, we certainly need this branch, and it looks quite solid.
>
> In addition to Gustavo's comments:
>
> [1]
>
Added most of the grammatical changes.
>
> [2]
>
> + http://bit.ly/mQrOMY
> + http://bit.ly/irKpfn
>
> Why not just expand these links out? Alternatively use a readable URL
> (the underlying nabble URL is modestly readable, I don't know how
> permalink either one is, however).
>

The links are rather ugly by themselves and violate 80 col lines.

>
> [3]
>
> + assert callable(callback)
>
> Shouldn't this be throwing a TypeError instead? Also, there is no
> correponding assertion (or verification) on
> set_connection_error_callback. Lastly, there is no test on this path.

Well the tests definitely exercised it by way of using the functionality of the callback, but i've gone ahead and added the typeerror and invalid callback tests.

>
>
> [4]
>
> + def xtest_session_expired_event(self):
>
> Do you want to include this test?
>

nice catch, i did but it was before the implementation of the session work, i've moved the test to test_session

thanks for the review,

cheers,

Kapil

52. By Kapil Thangavelu

merge trunk

53. By Kapil Thangavelu

add a disconnected client test for failures, dead connections can exhibit either notconnected or connection exceptions adjust tests to reflect.

54. By Kapil Thangavelu

address jim's review comments, grammar fixes, additional param valiadation tests

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

gustavo, even though this branch is approved, i'm going to wait on a final comment/agreement on [4] before merging.

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'txzookeeper/client.py'
2--- txzookeeper/client.py 2011-06-30 20:18:33 +0000
3+++ txzookeeper/client.py 2011-06-30 20:18:34 +0000
4@@ -60,6 +60,24 @@
5 zookeeper.SYSTEMERROR: zookeeper.SystemErrorException,
6 zookeeper.UNIMPLEMENTED: zookeeper.UnimplementedException}
7
8+# Mapping of connection state values to human strings.
9+STATE_NAME_MAPPING = {
10+ zookeeper.ASSOCIATING_STATE: "associating",
11+ zookeeper.AUTH_FAILED_STATE: "auth-failed",
12+ zookeeper.CONNECTED_STATE: "connected",
13+ zookeeper.CONNECTING_STATE: "connecting",
14+ zookeeper.EXPIRED_SESSION_STATE: "expired",
15+}
16+
17+# Mapping of event type to human string.
18+TYPE_NAME_MAPPING = {
19+ zookeeper.NOTWATCHING_EVENT: "not-watching",
20+ zookeeper.SESSION_EVENT: "session",
21+ zookeeper.CREATED_EVENT: "created",
22+ zookeeper.DELETED_EVENT: "deleted",
23+ zookeeper.CHANGED_EVENT: "changed",
24+ zookeeper.CHILD_EVENT: "child"}
25+
26
27 class NotConnectedException(zookeeper.ZooKeeperException):
28 """
29@@ -73,6 +91,29 @@
30 Raised if an error occurs during the client's connection attempt.
31 """
32
33+ @property
34+ def state_name(self):
35+ return STATE_NAME_MAPPING[self.args[2]]
36+
37+ @property
38+ def type_name(self):
39+ return TYPE_NAME_MAPPING[self.args[1]]
40+
41+ def __str__(self):
42+ return "<txzookeeper.ConnectionException type: %s state: %s>" % (
43+ self.type_name, self.state_name)
44+
45+
46+def is_connection_exception(e):
47+ """
48+ For connection errors in response to api calls, a utility method
49+ to determine if the cause is a connection exception.
50+ """
51+ return isinstance(e,
52+ (zookeeper.ClosingException,
53+ zookeeper.ConnectionLossException,
54+ zookeeper.SessionExpiredException))
55+
56
57 class ConnectionTimeoutException(zookeeper.ZooKeeperException):
58 """
59@@ -87,20 +128,17 @@
60 some event on the zookeeper client that the watch was requested on.
61 """
62
63- type_name_map = {
64- -2: "notwatching",
65- -1: "session",
66- 1: 'created',
67- 2: 'deleted',
68- 3: 'changed',
69- 4: 'child'}
70-
71 @property
72 def type_name(self):
73- return self.type_name_map[self.type]
74+ return TYPE_NAME_MAPPING[self.type]
75+
76+ @property
77+ def state_name(self):
78+ return STATE_NAME_MAPPING[self.connection_state]
79
80 def __repr__(self):
81- return "<ClientEvent %s at %r>" % (self.type_name, self.path)
82+ return "<ClientEvent %s at %r state: %s>" % (
83+ self.type_name, self.path, self.state_name)
84
85
86 class ZookeeperClient(object):
87@@ -108,11 +146,13 @@
88
89 def __init__(self, servers=None, session_timeout=None):
90 """
91- @param servers: A string specifying the servers and their ports to
92- connect to. Multiple servers can be specified in
93- comma separated fashion. Optionally a chroot
94- can be specified. A full server spec looks like
95- host:port/chroot_path
96+ @param servers: A string specifying the servers and their
97+ ports to connect to. Multiple servers can be
98+ specified in comma separated fashion. if they are,
99+ then the client will automatically rotate
100+ among them if a server connection fails. Optionally
101+ a chroot can be specified. A full server spec looks
102+ like host:port/chroot_path
103
104 @param session_timeout: The client's zookeeper session timeout can be
105 hinted. The actual value is negotiated between the
106@@ -121,29 +161,55 @@
107 """
108 self._servers = servers
109 self._session_timeout = session_timeout
110+ self._session_event_callback = None
111+ self._connection_error_callback = None
112 self.connected = False
113 self.handle = None
114
115- def _check_connected(self):
116+ def _check_connected(self, d):
117 if not self.connected:
118- raise NotConnectedException("not connected")
119-
120- def _check_result(self, result_code, deferred=None, extra_codes=()):
121+ d.errback(NotConnectedException("not connected"))
122+ return d
123+
124+ def _check_result(self, result_code, deferred, extra_codes=()):
125+ """Check an API call or result for errors.
126+
127+ :param result_code: The api result code.
128+ :param deferred: The deferred returned the client api consumer.
129+ :param extra_codes: Additional result codes accepted as valid/ok.
130+
131+ If the result code is an error, an appropriate Exception class
132+ is constructed and the errback on the deferred is invoked with it.
133+ """
134 error = None
135 if not result_code == zookeeper.OK and not result_code in extra_codes:
136 error_msg = zookeeper.zerror(result_code)
137 error_class = ERROR_MAPPING.get(
138 result_code, zookeeper.ZooKeeperException)
139 error = error_class(error_msg)
140- if deferred:
141- deferred.errback(error)
142- return error
143- raise error
144+
145+ if is_connection_exception(error):
146+ # Mark the client as disconnected.
147+ self.connected = False
148+ # Route connection errors to a connection level error
149+ # handler if specified.
150+ if self._connection_error_callback:
151+ # The result of the connection error handler is returned
152+ # to the api.
153+ d = defer.maybeDeferred(
154+ self._connection_error_callback,
155+ self, error)
156+ d.chainDeferred(deferred)
157+ return True
158+
159+ deferred.errback(error)
160+ return True
161 return None
162
163 def _get(self, path, watcher):
164- self._check_connected()
165 d = defer.Deferred()
166+ if self._check_connected(d):
167+ return d
168
169 def _cb_get(result_code, value, stat):
170 if self._check_result(result_code, d):
171@@ -157,8 +223,9 @@
172 return d
173
174 def _get_children(self, path, watcher):
175- self._check_connected()
176 d = defer.Deferred()
177+ if self._check_connected(d):
178+ return d
179
180 def _cb_get_children(result_code, children):
181 if self._check_result(result_code, d):
182@@ -172,8 +239,9 @@
183 return d
184
185 def _exists(self, path, watcher):
186- self._check_connected()
187 d = defer.Deferred()
188+ if self._check_connected(d):
189+ return d
190
191 def _cb_exists(result_code, stat):
192 if self._check_result(
193@@ -190,7 +258,24 @@
194 def _wrap_watcher(self, watcher):
195 if watcher is None:
196 return watcher
197- return self._zk_thread_callback(watcher)
198+ if not callable(watcher):
199+ raise SyntaxError("invalid watcher")
200+ return self._zk_thread_callback(
201+ partial(self._session_event_wrapper, watcher))
202+
203+ def _session_event_wrapper(self, watcher, event_type, conn_state, path):
204+ """Watch wrapper that diverts session events to a connection callback.
205+ """
206+ # If it's a session event pass it to the session callback, else
207+ # ignore it. Session events are sent repeatedly to watchers
208+ # which we have modeled after deferred, which only accept a
209+ # single return value.
210+ if event_type == zookeeper.SESSION_EVENT:
211+ if self._session_event_callback:
212+ self._session_event_callback(
213+ self, ClientEvent(event_type, conn_state, path))
214+ else:
215+ return watcher(event_type, conn_state, path)
216
217 def _zk_thread_callback(self, func):
218 """
219@@ -198,7 +283,6 @@
220 any user defined callback so that they are called back in the main
221 thread after, zookeeper calls the wrapper.
222 """
223-
224 def wrapper(handle, *args): # pragma: no cover
225 reactor.callFromThread(func, *args)
226 return wrapper
227@@ -231,9 +315,14 @@
228
229 @property
230 def client_id(self):
231- """
232- The connection's client id, useful when introspecting the server logs
233- for specific client activity.
234+ """Returns the client id that identifies the server side session.
235+
236+ A client id is a tuple represented by the session id and
237+ session password. It can be used to manually connect to an
238+ extant server session (which contains associated ephemeral
239+ nodes and watches)/ The connection's client id is also useful
240+ when introspecting the server logs for specific client
241+ activity.
242 """
243 if self.handle is None:
244 return None
245@@ -248,21 +337,21 @@
246 return bool(zookeeper.is_unrecoverable(self.handle))
247
248 def add_auth(self, scheme, identity):
249- """
250- Adds an authentication identity to this connection. A connection
251- can use multiple authentication identities at the same time, all
252- are checked when verifying acls on a node.
253+ """Adds an authentication identity to this connection.
254+
255+ A connection can use multiple authentication identities at the
256+ same time, all are checked when verifying acls on a node.
257
258 @param scheme: a string specifying a an authentication scheme
259 valid values include 'digest'.
260- @param identity: a string containingusername and password colon
261- separated for example 'mary:apples'
262+ @param identity: a string containing username and password colon
263+ separated, for example 'mary:apples'
264 """
265- self._check_connected()
266 d = defer.Deferred()
267+ if self._check_connected(d):
268+ return d
269
270 def _cb_authenticated(result_code):
271-
272 if self._check_result(result_code, d):
273 return
274 d.callback(self)
275@@ -274,8 +363,7 @@
276
277 def close(self, force=False):
278 """
279- Close the underlying socket connection and zookeeper server side
280- session.
281+ Close the underlying socket connection and server side session.
282
283 @param force: boolean, require the connection to be closed now or
284 an exception be raised.
285@@ -285,12 +373,14 @@
286
287 result = zookeeper.close(self.handle)
288 self.connected = False
289- error = self._check_result(result)
290- if error:
291- return defer.fail(error)
292- return result
293-
294- def connect(self, servers=None, timeout=10):
295+ d = defer.Deferred()
296+
297+ if self._check_result(result, d):
298+ return d
299+ d.callback(True)
300+ return d
301+
302+ def connect(self, servers=None, timeout=10, client_id=None):
303 """
304 Establish a connection to the given zookeeper server(s).
305
306@@ -299,6 +389,8 @@
307 comma separated fashion.
308 @param timeout: How many seconds to wait on a connection to the
309 zookeeper servers.
310+
311+ @param session_id:
312 @returns A deferred that's fired when the connection is established.
313 """
314 d = defer.Deferred()
315@@ -324,31 +416,45 @@
316 if servers is not None:
317 self._servers = servers
318
319- self.handle = zookeeper.init(
320- self._servers, callback, self._session_timeout)
321+ # Assemble client id if specified.
322+ if client_id:
323+ self.handle = zookeeper.init(
324+ self._servers, callback, self._session_timeout, client_id)
325+ else:
326+ self.handle = zookeeper.init(
327+ self._servers, callback, self._session_timeout)
328
329 return d
330
331 def _cb_connected(
332 self, scheduled_timeout, connect_deferred, type, state, path):
333+ """This callback is invoked through the lifecycle of the connection.
334
335+ It's used for all connection level events and session events.
336+ """
337 # Cancel the timeout delayed task if it hasn't fired.
338 if scheduled_timeout.active():
339 scheduled_timeout.cancel()
340
341+ # Update connected boolean
342+ if state == zookeeper.CONNECTED_STATE:
343+ self.connected = True
344+ elif state != zookeeper.CONNECTING_STATE:
345+ self.connected = False
346+
347 if connect_deferred.called:
348 # If we timed out and then connected, then close the conn.
349- if state == zookeeper.CONNECTED_STATE:
350- self.connected = True
351+ if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called:
352 self.close()
353- # If the client is reused across multiple connect/close
354- # cycles, and a previous connect timed out, then a
355- # subsequent connect may trigger the previous connect's
356- # handler notifying of a CONNECTING_STATE, ignore.
357+
358+ # Send session events to the callback, in addition to any
359+ # duplicate session events that will be sent for extant watches.
360+ if self._session_event_callback:
361+ self._session_event_callback(
362+ self, ClientEvent(type, state, path))
363+
364 return
365 elif state == zookeeper.CONNECTED_STATE:
366- # Connection established.
367- self.connected = True
368 connect_deferred.callback(self)
369 return
370
371@@ -364,8 +470,9 @@
372 @params acls: A list of dictionaries specifying permissions.
373 @params flags: Node creation flags (ephemeral, sequence, persistent)
374 """
375- self._check_connected()
376 d = defer.Deferred()
377+ if self._check_connected(d):
378+ return d
379
380 def _cb_created(result_code, path):
381 if self._check_result(result_code, d):
382@@ -388,8 +495,9 @@
383 @param path: the path of the node to be deleted.
384 @param version: the integer version of the node.
385 """
386- self._check_connected()
387 d = defer.Deferred()
388+ if self._check_connected(d):
389+ return d
390
391 def _cb_delete(result_code):
392 if self._check_result(result_code, d):
393@@ -485,8 +593,9 @@
394
395 @param path: The path of the node whose acl will be retrieved.
396 """
397- self._check_connected()
398 d = defer.Deferred()
399+ if self._check_connected(d):
400+ return d
401
402 def _cb_get_acl(result_code, acls, stat):
403 if self._check_result(result_code, d):
404@@ -521,8 +630,9 @@
405 doesn't match the version on the server, then a
406 BadVersionException is raised.
407 """
408- self._check_connected()
409 d = defer.Deferred()
410+ if self._check_connected(d):
411+ return d
412
413 def _cb_set_acl(result_code):
414 if self._check_result(result_code, d):
415@@ -546,8 +656,9 @@
416 @param data: The data to store on the node.
417 @param version: Integer version value
418 """
419- self._check_connected()
420 d = defer.Deferred()
421+ if self._check_connected(d):
422+ return d
423
424 def _cb_set(result_code, node_stat):
425 if self._check_result(result_code, d):
426@@ -571,14 +682,60 @@
427 watcher = self._wrap_watcher(watcher)
428 zookeeper.set_watcher(self.handle, watcher)
429
430+ def set_session_callback(self, callback):
431+ """Set a callback to receive session events.
432+
433+ Session events are by default ignored. Interested applications
434+ may choose to set a session event watcher on the connection
435+ to receive session events. Session events are typically broadcast
436+ by the libzookeeper library to all extant watchers, but the
437+ twisted integration using deferreds is not capable of receiving
438+ multiple values (session events and watch events), so this
439+ client implementation instead provides for a user defined callback
440+ to be invoked with them instead. The callback receives a single
441+ parameter, the session event in the form of a ClientEvent instance.
442+
443+ Additional details on session events
444+ ------------------------------------
445+ http://bit.ly/mQrOMY
446+ http://bit.ly/irKpfn
447+ """
448+ if not callable(callback):
449+ raise TypeError("Invalid callback %r" % callback)
450+ self._session_event_callback = callback
451+
452+ def set_connection_error_callback(self, callback):
453+ """Set a callback to receive connection error exceptions.
454+
455+ By default the error will be raised when the client API
456+ call is made. Setting a connection level error handler allows
457+ applications to centralize their handling of connection loss,
458+ instead of having to guard every zk interaction.
459+
460+ The callback receives two parameters, the client instance
461+ and the exception.
462+ """
463+ if not callable(callback):
464+ raise TypeError("Invalid callback %r" % callback)
465+ self._connection_error_callback = callback
466+
467+ def set_determinstic_order(self, boolean):
468+ """
469+ The zookeeper client will by default randomize the server hosts
470+ it will connect to unless this is set to True.
471+
472+ This is a global setting across connections.
473+ """
474+ zookeeper.deterministic_conn_order(bool(boolean))
475+
476 def sync(self, path="/"):
477- """
478- Flushes the zookeeper connection to the leader.
479+ """Flushes the connected zookeeper server with the leader.
480
481 @param path: The root path to flush, all child nodes are also flushed.
482 """
483- self._check_connected()
484 d = defer.Deferred()
485+ if self._check_connected(d):
486+ return d
487
488 def _cb_sync(result_code, path):
489 if self._check_result(result_code, d):
490
491=== modified file 'txzookeeper/queue.py'
492--- txzookeeper/queue.py 2011-06-17 23:52:34 +0000
493+++ txzookeeper/queue.py 2011-06-30 20:18:34 +0000
494@@ -90,7 +90,7 @@
495 def on_queue_items_changed(*args):
496 """Event watcher on queue node child events."""
497 if request.complete or not self._client.connected:
498- return # pragma: no cover
499+ return # pragma: no cover
500
501 if request.processing_children:
502 # If deferred stack is currently processing a set of children
503@@ -273,7 +273,7 @@
504 """
505
506 def _item_processed_callback(self, result_code, item_path):
507- return self._client.delete(item_path+"-processing")
508+ return self._client.delete(item_path + "-processing")
509
510 def _filter_children(self, children, suffix="-processing"):
511 """
512@@ -300,7 +300,7 @@
513
514 def on_node_exists(stat, path):
515 """Reserve the node for consumer processing."""
516- d = self._client.create(path+"-processing",
517+ d = self._client.create(path + "-processing",
518 flags=zookeeper.EPHEMERAL)
519 d.addCallback(on_reservation_success, path)
520 d.addErrback(on_reservation_failed)
521@@ -315,7 +315,7 @@
522
523 def on_get_node_failed(failure, path):
524 """If we can't fetch the node, delete the processing node."""
525- d = self._client.delete(path+"-processing")
526+ d = self._client.delete(path + "-processing")
527
528 # propogate unexpected errors appropriately
529 if not failure.check(zookeeper.NoNodeException):
530@@ -372,7 +372,7 @@
531
532 def __init__(self, path, client, acl=None, persistent=False):
533 super(SerializedQueue, self).__init__(path, client, acl, persistent)
534- self._lock = Lock("%s/%s"%(self.path, "_lock"), client)
535+ self._lock = Lock("%s/%s" % (self.path, "_lock"), client)
536
537 def _item_processed_callback(self, result_code, item_path):
538 return self._lock.release()
539
540=== modified file 'txzookeeper/tests/__init__.py'
541--- txzookeeper/tests/__init__.py 2011-06-17 23:52:34 +0000
542+++ txzookeeper/tests/__init__.py 2011-06-30 20:18:34 +0000
543@@ -36,8 +36,8 @@
544
545 def tearDown(self):
546 super(ZookeeperTestCase, self).tearDown()
547- zookeeper.set_log_stream(sys.stderr) # reset to default
548- zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
549+ #zookeeper.set_log_stream(sys.stderr) # reset to default
550+ #zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
551 self.log_file.close()
552
553 def get_log(self):
554
555=== added file 'txzookeeper/tests/common.py'
556--- txzookeeper/tests/common.py 1970-01-01 00:00:00 +0000
557+++ txzookeeper/tests/common.py 2011-06-30 20:18:34 +0000
558@@ -0,0 +1,216 @@
559+import os
560+import os.path
561+import shutil
562+import subprocess
563+import tempfile
564+
565+from itertools import chain
566+from collections import namedtuple
567+from glob import glob
568+
569+
570+ServerInfo = namedtuple(
571+ "ServerInfo", "server_id client_port election_port leader_port")
572+
573+
574+class ManagedZooKeeper(object):
575+ """Class to manage the running of a ZooKeeper instance for testing.
576+
577+ Note: no attempt is made to probe the ZooKeeper instance is
578+ actually available, or that the selected port is free. In the
579+ future, we may want to do that, especially when run in a
580+ Hudson/Buildbot context, to ensure more test robustness."""
581+
582+ def __init__(self, software_path, server_info, peers=()):
583+ """Define the ZooKeeper test instance.
584+
585+ @param install_path: The path to the install for ZK
586+ @param port: The port to run the managed ZK instance
587+ """
588+ self.install_path = software_path
589+ self.server_info = server_info
590+ self.host = "127.0.0.1"
591+ self.peers = peers
592+ self.working_path = tempfile.mkdtemp()
593+ self._running = False
594+
595+ def run(self):
596+ """Run the ZooKeeper instance under a temporary directory.
597+
598+ Writes ZK log messages to zookeeper.log in the current directory.
599+ """
600+ config_path = os.path.join(self.working_path, "zoo.cfg")
601+ log_path = os.path.join(self.working_path, "log")
602+ log4j_path = os.path.join(self.working_path, "log4j.properties")
603+ data_path = os.path.join(self.working_path, "data")
604+
605+ # various setup steps
606+ if not os.path.exists(self.working_path):
607+ os.mdir(self.working_path)
608+ if not os.path.exists(log_path):
609+ os.mkdir(log_path)
610+ if not os.path.exists(data_path):
611+ os.mkdir(data_path)
612+
613+ with open(config_path, "w") as config:
614+ config.write("""
615+tickTime=2000
616+dataDir=%s
617+clientPort=%s
618+maxClientCnxns=0
619+""" % (data_path, self.server_info.client_port))
620+
621+ # setup a replicated setup if peers are specified
622+ if self.peers:
623+ servers_cfg = []
624+ for p in chain((self.server_info,), self.peers):
625+ servers_cfg.append("server.%s=localhost:%s:%s" % (
626+ p.server_id, p.leader_port, p.election_port))
627+
628+ with open(config_path, "a") as config:
629+ config.write("""
630+initLimit=4
631+syncLimit=2
632+%s
633+""" % ("\n".join(servers_cfg)))
634+
635+ # Write server ids into datadir
636+ with open(os.path.join(data_path, "myid"), "w") as myid_file:
637+ myid_file.write(str(self.server_info.server_id))
638+
639+ with open(log4j_path, "w") as log4j:
640+ log4j.write("""
641+# DEFAULT: console appender only
642+log4j.rootLogger=INFO, ROLLINGFILE
643+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
644+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
645+log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
646+log4j.appender.ROLLINGFILE.Threshold=DEBUG
647+log4j.appender.ROLLINGFILE.File=""" + (
648+ self.working_path + os.sep + "zookeeper.log\n"))
649+
650+ self.process = subprocess.Popen(
651+ args=["java",
652+ "-cp", self.classpath,
653+ "-Dzookeeper.log.dir=%s" % log_path,
654+ "-Dzookeeper.root.logger=INFO,CONSOLE",
655+ "-Dlog4j.configuration=file:%s" % log4j_path,
656+ # "-Dlog4j.debug",
657+ "org.apache.zookeeper.server.quorum.QuorumPeerMain",
658+ config_path],
659+ )
660+ self._running = True
661+
662+ @property
663+ def classpath(self):
664+ """Get the classpath necessary to run ZooKeeper."""
665+
666+ # Two possibilities, as seen in zkEnv.sh:
667+ # Check for a release - top-level zookeeper-*.jar?
668+ jars = glob((os.path.join(
669+ self.install_path, 'zookeeper-*.jar')))
670+ if jars:
671+ # Relase build (`ant package`)
672+ jars.extend(glob(os.path.join(
673+ self.install_path,
674+ "lib/*.jar")))
675+ else:
676+ # Development build (plain `ant`)
677+ jars = glob((os.path.join(
678+ self.install_path, 'build/zookeeper-*.jar')))
679+ jars.extend(glob(os.path.join(
680+ self.install_path,
681+ "build/lib/*.jar")))
682+ return ":".join(jars)
683+
684+ @property
685+ def address(self):
686+ """Get the address of the ZooKeeper instance."""
687+ return "%s:%s" % (self.host, self.client_port)
688+
689+ @property
690+ def running(self):
691+ return self._running
692+
693+ @property
694+ def client_port(self):
695+ return self.server_info.client_port
696+
697+ def reset(self):
698+ """Stop the zookeeper instance, cleaning out its on disk-data."""
699+ self.stop()
700+ shutil.rmtree(os.path.join(self.working_path, "data"))
701+ os.mkdir(os.path.join(self.working_path, "data"))
702+ with open(os.path.join(self.working_path, "data", "myid"), "w") as fh:
703+ fh.write(str(self.server_info.server_id))
704+
705+ def stop(self):
706+ """Stop the Zookeeper instance, retaining on disk state."""
707+ if not self._running:
708+ return
709+ self.process.terminate()
710+ self.process.wait()
711+ self._running = False
712+
713+ def destroy(self):
714+ """Stop the ZooKeeper instance and destroy its on disk-state"""
715+ # called by at exit handler, reimport to avoid cleanup race.
716+ import shutil
717+ self.stop()
718+
719+ shutil.rmtree(self.working_path)
720+
721+
722+class ZookeeperCluster(object):
723+
724+ def __init__(self, install_path, size=3, port_offset=20000):
725+ self._install_path = install_path
726+ self._servers = []
727+
728+ # Calculate ports and peer group
729+ port = port_offset
730+ peers = []
731+
732+ for i in range(size):
733+ port += i * 10
734+ info = ServerInfo(i + 1, port, port + 1, port + 2)
735+ peers.append(info)
736+
737+ # Instantiate Managed ZK Servers
738+ for i in range(size):
739+ server_peers = list(peers)
740+ server_info = server_peers.pop(i)
741+ self._servers.append(
742+ ManagedZooKeeper(
743+ self._install_path, server_info, server_peers))
744+
745+ def __getitem__(self, k):
746+ return self._servers[k]
747+
748+ def __iter__(self):
749+ return iter(self._servers)
750+
751+ def start(self):
752+ # Zookeeper client expresses a preference for either lower ports or
753+ # lexographical ordering of hosts, to ensure that all servers have a
754+ # chance to startup, start them in reverse order.
755+ for server in reversed(list(self)):
756+ server.run()
757+ # Giving the servers a moment to start, decreases the overall time
758+ # required for a client to successfully connect (2s vs. 4s without
759+ # the sleep).
760+ import time
761+ time.sleep(2)
762+
763+ def stop(self):
764+ for server in self:
765+ server.stop()
766+ self._servers = []
767+
768+ def terminate(self):
769+ for server in self:
770+ server.destroy()
771+
772+ def reset(self):
773+ for server in self:
774+ server.reset()
775
776=== modified file 'txzookeeper/tests/test_client.py'
777--- txzookeeper/tests/test_client.py 2011-06-30 20:18:33 +0000
778+++ txzookeeper/tests/test_client.py 2011-06-30 20:18:34 +0000
779@@ -23,18 +23,15 @@
780
781 from twisted.internet.defer import Deferred
782 from twisted.internet.base import DelayedCall
783+from twisted.python.failure import Failure
784
785 import zookeeper
786
787 from mocker import ANY, MATCH
788-from twisted.internet.defer import Deferred
789 from txzookeeper.tests import ZookeeperTestCase, utils
790 from txzookeeper.client import (
791 ZookeeperClient, ZOO_OPEN_ACL_UNSAFE, ConnectionTimeoutException,
792- ConnectionException, ClientEvent)
793-
794-from mocker import ANY
795-from txzookeeper.tests import ZookeeperTestCase, utils
796+ ConnectionException, NotConnectedException, ClientEvent)
797
798 PUBLIC_ACL = ZOO_OPEN_ACL_UNSAFE
799
800@@ -56,9 +53,10 @@
801 if self.client.connected:
802 utils.deleteTree(handle=self.client.handle)
803 self.client.close()
804- del self.client
805+
806 if self.client2 and self.client2.connected:
807 self.client2.close()
808+
809 super(ClientTests, self).tearDown()
810
811 def test_wb_connect_after_timeout(self):
812@@ -122,8 +120,10 @@
813 return d
814
815 def test_client_event_repr(self):
816- event = ClientEvent(4, 'state', 'path')
817- self.assertEqual(repr(event), "<ClientEvent child at 'path'>")
818+ event = ClientEvent(zookeeper.SESSION_EVENT,
819+ zookeeper.EXPIRED_SESSION_STATE, '')
820+ self.assertEqual(repr(event),
821+ "<ClientEvent session at '' state: expired>")
822
823 def test_client_event_attributes(self):
824 event = ClientEvent(4, 'state', 'path')
825@@ -132,6 +132,10 @@
826 self.assertEqual(event.path, 'path')
827 self.assertEqual(event, (4, 'state', 'path'))
828
829+ def test_client_use_while_disconnected_returns_failure(self):
830+ return self.assertFailure(
831+ self.client.exists("/"), NotConnectedException)
832+
833 def test_create_ephemeral_node_and_close_connection(self):
834 """
835 The client can create transient nodes that are destroyed when the
836@@ -316,14 +320,17 @@
837 return self.client.create("/foobar-watched", "rabbit")
838
839 def get_node(path):
840- return self.client.get_and_watch(path)
841+ data, watch = self.client.get_and_watch(path)
842+ return data.addCallback(lambda x: (watch,))
843
844- def new_connection((data, watch)):
845+ def new_connection((watch,)):
846 self.client2 = ZookeeperClient("127.0.0.1:2181")
847- return self.client2.connect(), watch
848+ return self.client2.connect().addCallback(
849+ lambda x, y=None, z=None: (x, watch))
850
851 def trigger_watch((client, watch)):
852 zookeeper.delete(self.client2.handle, "/foobar-watched")
853+ self.client2.close()
854 return watch
855
856 def verify_watch(event):
857@@ -669,29 +676,21 @@
858 return d
859
860 def test_get_children_with_error(self):
861+ """If the result of an api call is an error, its propgated.
862+ """
863 d = self.client.connect()
864
865- def inject_error(result_code, d):
866- error = SyntaxError()
867- d.errback(error)
868- return error
869-
870 def get_children(client):
871- mock_client = self.mocker.patch(self.client)
872- # mock once for the api call
873- mock_client._check_result(ANY, DEFERRED_MATCH)
874- self.mocker.result(None)
875- # mock twice for the async result
876- mock_client._check_result(ANY, DEFERRED_MATCH)
877- self.mocker.call(inject_error)
878- self.mocker.replay()
879+ # Get the children of a nonexistant node
880 return client.get_children("/tower")
881
882 def verify_failure(failure):
883- self.assertTrue(isinstance(failure.value, SyntaxError))
884+ self.assertTrue(isinstance(failure, Failure))
885+ self.assertTrue(
886+ isinstance(failure.value, zookeeper.NoNodeException))
887
888 d.addCallback(get_children)
889- d.addErrback(verify_failure)
890+ d.addBoth(verify_failure)
891 return d
892
893 # seems to be a segfault on this one, must be running latest zk
894@@ -848,7 +847,7 @@
895 def verify_node_access(stat):
896 self.assertEqual(stat['version'], 1)
897 self.assertEqual(stat['dataLength'], 3)
898- self.assertTrue(failed) # we should have hit the errback
899+ self.assertTrue(failed) # we should have hit the errback
900
901 d.addCallback(add_auth_one)
902 d.addCallback(create_node)
903@@ -927,20 +926,7 @@
904
905 acl = dict(scheme="digest", id="a:b", perms=zookeeper.PERM_ALL)
906
907- def inject_error(result, d):
908- error = zookeeper.NoNodeException()
909- d.errback(error)
910- return error
911-
912 def set_acl(client):
913- mock_client = self.mocker.patch(client)
914- # mock once for the api call
915- mock_client._check_result(ANY, DEFERRED_MATCH)
916- self.mocker.result(None)
917- # mock twice for the async result
918- mock_client._check_result(ANY, DEFERRED_MATCH)
919- self.mocker.call(inject_error)
920- self.mocker.replay()
921 return client.set_acl("/zebra-moon22", [acl])
922
923 def verify_failure(failure):
924@@ -983,25 +969,17 @@
925 d.errback(error)
926 return error
927
928- def create_node(client):
929- return client.create("/moose")
930-
931 def get_acl(path):
932- mock_client = self.mocker.patch(self.client)
933- mock_client._check_result(ANY, DEFERRED_MATCH)
934- self.mocker.result(None)
935- mock_client._check_result(ANY, DEFERRED_MATCH)
936- self.mocker.call(inject_error)
937- self.mocker.replay()
938- return self.client.get_acl(path)
939+ # Get the ACL of a nonexistant node
940+ return self.client.get_acl("/moose")
941
942 def verify_failure(failure):
943+ self.assertTrue(isinstance(failure, Failure))
944 self.assertTrue(
945 isinstance(failure.value, zookeeper.ZooKeeperException))
946
947- d.addCallback(create_node)
948 d.addCallback(get_acl)
949- d.addErrback(verify_failure)
950+ d.addBoth(verify_failure)
951 return d
952
953 def test_client_id(self):
954@@ -1045,42 +1023,6 @@
955 d.addCallback(verify_sync)
956 return d
957
958- def test_sync_error(self):
959- """
960- On error the sync callback returns a an exception/failure.
961- """
962- d = self.client.connect()
963-
964- def inject_error(result_code, d):
965- error = zookeeper.ZooKeeperException("foobar")
966- d.errback(error)
967- return error
968-
969- def create_node(client):
970- return client.create("/abc")
971-
972- def client_sync(path):
973- mock_client = self.mocker.patch(self.client)
974- mock_client._check_result(ANY, DEFERRED_MATCH)
975- self.mocker.result(None)
976- mock_client._check_result(ANY, DEFERRED_MATCH)
977- self.mocker.call(inject_error)
978- self.mocker.replay()
979- return self.client.sync(path)
980-
981- def verify_failure(failure):
982- self.assertTrue(
983- isinstance(failure.value, zookeeper.ZooKeeperException))
984-
985- def assert_failed(extra):
986- self.fail("Should have gone to errback")
987-
988- d.addCallback(create_node)
989- d.addCallback(client_sync)
990- d.addCallback(assert_failed)
991- d.addErrback(verify_failure)
992- return d
993-
994 def test_property_servers(self):
995 """
996 The servers property of the client, shows which if any servers
997@@ -1140,6 +1082,60 @@
998 d.addErrback(verify_invalid)
999 return d
1000
1001+ def xtest_session_expired_event(self):
1002+ """
1003+ A client session can be reattached to in a separate connection,
1004+ if the a session is expired, using the zookeeper connection will
1005+ raise a SessionExpiredException.
1006+ """
1007+ d = self.client.connect()
1008+
1009+ class StopTest(Exception):
1010+ pass
1011+
1012+ def new_connection_same_connection(client):
1013+ self.assertEqual(client.state, zookeeper.CONNECTED_STATE)
1014+ return ZookeeperClient("127.0.0.1:2181").connect(
1015+ client_id=client.client_id).addErrback(
1016+ guard_session_expired, client)
1017+
1018+ def guard_session_expired(failure, client):
1019+ # On occassion we get a session expired event while connecting.
1020+ failure.trap(ConnectionException)
1021+ self.assertEqual(failure.value.state_name, "expired")
1022+ # Stop the test from proceeding
1023+ raise StopTest()
1024+
1025+ def close_new_connection(client):
1026+ # Verify both connections are using same session
1027+ self.assertEqual(self.client.client_id, client.client_id)
1028+ self.assertEqual(client.state, zookeeper.CONNECTED_STATE)
1029+
1030+ # Closing one connection will close the session
1031+ client.close()
1032+
1033+ # Continued use of the other client will get a
1034+ # disconnect exception.
1035+ return self.client.exists("/")
1036+
1037+ def verify_original_closed(failure):
1038+ if not isinstance(failure, Failure):
1039+ self.fail("Test did not raise exception.")
1040+ failure.trap(
1041+ zookeeper.SessionExpiredException,
1042+ zookeeper.ConnectionLossException)
1043+
1044+ #print "client close"
1045+ self.client.close()
1046+ #print "creating new client for teardown"
1047+ return self.client.connect()
1048+
1049+ d.addCallback(new_connection_same_connection)
1050+ d.addCallback(close_new_connection)
1051+ d.addBoth(verify_original_closed)
1052+
1053+ return d
1054+
1055 def test_connect_with_server(self):
1056 """
1057 A client's servers can be specified in the connect method.
1058@@ -1210,14 +1206,14 @@
1059 All of the client apis (with the exception of connect) attempt
1060 to ensure the client is connected before executing an operation.
1061 """
1062- self.assertRaises(
1063- zookeeper.ZooKeeperException, self.client.get_children, "/abc")
1064-
1065- self.assertRaises(
1066- zookeeper.ZooKeeperException, self.client.create, "/abc")
1067-
1068- self.assertRaises(
1069- zookeeper.ZooKeeperException, self.client.set, "/abc", "123")
1070+ self.assertFailure(
1071+ self.client.get_children("/abc"), zookeeper.ZooKeeperException)
1072+
1073+ self.assertFailure(
1074+ self.client.create("/abc"), zookeeper.ZooKeeperException)
1075+
1076+ self.assertFailure(
1077+ self.client.set("/abc", "123"), zookeeper.ZooKeeperException)
1078
1079 def test_connect_multiple_raises(self):
1080 """
1081@@ -1254,10 +1250,10 @@
1082
1083 def test_connection_watcher(self):
1084 """
1085- A connection watcher can be set that recieves notices on when the
1086- connection state changes. Technically zookeeper would also use this as
1087- a global watcher for node state changes, but zkpython doesn't expose
1088- that api, as its mostly considered legacy.
1089+ A connection watcher can be set that receives notices on when
1090+ the connection state changes. Technically zookeeper would also
1091+ use this as a global watcher for node watches, but zkpython
1092+ doesn't expose that api, as its mostly considered legacy.
1093
1094 its out of scope to simulate a connection level event within unit tests
1095 such as the server restarting.
1096@@ -1290,3 +1286,13 @@
1097 If the client is not connected, closing returns None.
1098 """
1099 self.assertEqual(self.client.close(), None)
1100+
1101+ def test_invalid_connection_error_callback(self):
1102+ self.assertRaises(TypeError,
1103+ self.client.set_connection_error_callback,
1104+ None)
1105+
1106+ def test_invalid_session_callback(self):
1107+ self.assertRaises(TypeError,
1108+ self.client.set_session_callback,
1109+ None)
1110
1111=== modified file 'txzookeeper/tests/test_node.py'
1112--- txzookeeper/tests/test_node.py 2011-06-17 23:52:34 +0000
1113+++ txzookeeper/tests/test_node.py 2011-06-30 20:18:34 +0000
1114@@ -59,7 +59,7 @@
1115 def _make_digest_identity(self, credentials):
1116 user, password = credentials.split(":")
1117 digest = hashlib.new("sha1", credentials).digest()
1118- return "%s:%s"%(user, base64.b64encode(digest))
1119+ return "%s:%s" % (user, base64.b64encode(digest))
1120
1121 def test_node_name_and_path(self):
1122 """
1123
1124=== modified file 'txzookeeper/tests/test_queue.py'
1125--- txzookeeper/tests/test_queue.py 2011-06-17 23:52:34 +0000
1126+++ txzookeeper/tests/test_queue.py 2011-06-30 20:18:34 +0000
1127@@ -175,7 +175,7 @@
1128 watch = Deferred()
1129 self.mocker.result((succeed(["entry-000000"]), watch))
1130
1131- item_path = "%s/%s"%(path, "entry-000000")
1132+ item_path = "%s/%s" % (path, "entry-000000")
1133 mock_client.get(item_path)
1134 self.mocker.result(fail(SyntaxError("x")))
1135 self.mocker.replay()
1136@@ -230,13 +230,13 @@
1137 producer_done = Deferred()
1138
1139 def iteration(i):
1140- if len(items) == (item_count-1):
1141+ if len(items) == (item_count - 1):
1142 return producer_done.callback(None)
1143 items.append(i)
1144 queue.put(str(i))
1145
1146 for i in range(item_count):
1147- reactor.callLater(i*0.05, iteration, i)
1148+ reactor.callLater(i * 0.05, iteration, i)
1149 yield producer_done
1150 returnValue(items)
1151
1152@@ -284,7 +284,7 @@
1153 def producer(start, offset):
1154 client = yield self.open_client()
1155 q = self.queue_factory(path, client)
1156- for i in range(start, start+offset):
1157+ for i in range(start, start + offset):
1158 yield q.put(str(i))
1159 produce_results.append(str(i))
1160
1161@@ -311,7 +311,7 @@
1162 yield DeferredList(
1163 [consumer(8), consumer(8), consumer(4)])
1164
1165- err = set(produce_results)-set(consume_results)
1166+ err = set(produce_results) - set(consume_results)
1167 self.assertFalse(err)
1168
1169 self.assertEqual(len(consume_results), len(produce_results))
1170
1171=== modified file 'txzookeeper/tests/test_security.py'
1172--- txzookeeper/tests/test_security.py 2011-06-17 23:52:34 +0000
1173+++ txzookeeper/tests/test_security.py 2011-06-30 20:18:34 +0000
1174@@ -73,9 +73,9 @@
1175 def connect_users(self, *users):
1176 clients = []
1177 for name in users:
1178- ident_user = getattr(self, "ident_%s"%(name), None)
1179+ ident_user = getattr(self, "ident_%s" % (name), None)
1180 if ident_user is None:
1181- raise AttributeError("Invalid User %s"%(name))
1182+ raise AttributeError("Invalid User %s" % (name))
1183 client = ZookeeperClient("127.0.0.1:2181", 3000)
1184 clients.append(client)
1185 yield self.open_and_authenticate(client, ident_user)
1186
1187=== added file 'txzookeeper/tests/test_session.py'
1188--- txzookeeper/tests/test_session.py 1970-01-01 00:00:00 +0000
1189+++ txzookeeper/tests/test_session.py 2011-06-30 20:18:34 +0000
1190@@ -0,0 +1,245 @@
1191+
1192+import atexit
1193+import os
1194+
1195+import zookeeper
1196+
1197+from twisted.internet import reactor
1198+from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
1199+
1200+from txzookeeper import ZookeeperClient
1201+from txzookeeper.client import NotConnectedException, ConnectionException
1202+
1203+from txzookeeper.tests.common import ZookeeperCluster
1204+from txzookeeper.tests import ZookeeperTestCase
1205+
1206+
1207+ZK_HOME = os.environ.get("ZOOKEEPER_PATH")
1208+assert ZK_HOME, "ZOOKEEPER_PATH environment variable must be defined"
1209+
1210+CLUSTER = ZookeeperCluster(ZK_HOME)
1211+atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
1212+
1213+
1214+class ClientSessionTests(ZookeeperTestCase):
1215+
1216+ def setUp(self):
1217+ super(ClientSessionTests, self).setUp()
1218+ self.cluster.start()
1219+ self.client = None
1220+ self.client2 = None
1221+ zookeeper.deterministic_conn_order(True)
1222+ zookeeper.set_debug_level(0)
1223+
1224+ def sleep(self, delay):
1225+ """Non-blocking sleep."""
1226+ deferred = Deferred()
1227+ reactor.callLater(delay, deferred.callback, None)
1228+ return deferred
1229+
1230+ @property
1231+ def cluster(self):
1232+ return CLUSTER
1233+
1234+ def tearDown(self):
1235+ super(ClientSessionTests, self).tearDown()
1236+ self.cluster.reset()
1237+
1238+ @inlineCallbacks
1239+ def test_client_session_migration(self):
1240+ """A client will automatically rotate servers to ensure a connection.
1241+
1242+ A client connected to multiple servers, will transparently
1243+ migrate amongst them, as individual servers can no longer be
1244+ reached. A client's session will be maintined.
1245+ """
1246+ # Connect to the Zookeeper Cluster
1247+ servers = ",".join([s.address for s in self.cluster])
1248+ self.client = ZookeeperClient(servers)
1249+ yield self.client.connect()
1250+ yield self.client.create("/hello", flags=zookeeper.EPHEMERAL)
1251+
1252+ # Shutdown the server the client is connected to
1253+ self.cluster[0].stop()
1254+
1255+ # Wait for the shutdown and cycle, if we don't wait we'll
1256+ # get a zookeeper connectionloss exception on occassion.
1257+ yield self.sleep(0.1)
1258+
1259+ self.assertTrue(self.client.connected)
1260+ exists = yield self.client.exists("/hello")
1261+ self.assertTrue(exists)
1262+
1263+ @inlineCallbacks
1264+ def test_client_watch_migration(self):
1265+ """On server rotation, extant watches are still active.
1266+
1267+ A client connected to multiple servers, will transparently
1268+ migrate amongst them, as individual servers can no longer be
1269+ reached. Watch deferreds issued from the same client instance will
1270+ continue to function as the session is maintained.
1271+ """
1272+ session_events = []
1273+
1274+ def session_event_callback(connection, e):
1275+ session_events.append(e)
1276+
1277+ # Connect to the Zookeeper Cluster
1278+ servers = ",".join([s.address for s in self.cluster])
1279+ self.client = ZookeeperClient(servers)
1280+ self.client.set_session_callback(session_event_callback)
1281+ yield self.client.connect()
1282+
1283+ # Setup a watch
1284+ yield self.client.create("/hello")
1285+ exists_d, watch_d = self.client.exists_and_watch("/hello")
1286+ yield exists_d
1287+
1288+ # Shutdown the server the client is connected to
1289+ self.cluster[0].stop()
1290+
1291+ # Wait for the shutdown and cycle, if we don't wait we'll
1292+ # get occasionally get a zookeeper connectionloss exception.
1293+ yield self.sleep(0.1)
1294+
1295+ # The session events that would have been ignored are sent
1296+ # to the session event callback.
1297+ self.assertTrue(session_events)
1298+ self.assertTrue(self.client.connected)
1299+
1300+ # If we delete the node, we'll see the watch fire.
1301+ yield self.client.delete("/hello")
1302+ event = yield watch_d
1303+ self.assertEqual(event.type_name, "deleted")
1304+ self.assertEqual(event.path, "/hello")
1305+
1306+ @inlineCallbacks
1307+ def test_connection_error_handler(self):
1308+ """A callback can be specified for connection errors.
1309+
1310+ We can specify a callback for connection errors, that
1311+ can perform recovery for a disconnected client, restablishing
1312+ """
1313+ @inlineCallbacks
1314+ def connection_error_handler(connection, error):
1315+ # On loss of the connection, reconnect the client w/ same session.
1316+ yield connection.connect(
1317+ self.cluster[1].address, client_id=connection.client_id)
1318+ returnValue(23)
1319+
1320+ self.client = ZookeeperClient(self.cluster[0].address)
1321+ self.client.set_connection_error_callback(connection_error_handler)
1322+ yield self.client.connect()
1323+
1324+ yield self.client.create("/hello")
1325+ exists_d, watch_d = self.client.exists_and_watch("/hello")
1326+ yield exists_d
1327+
1328+ # Shutdown the server the client is connected to
1329+ self.cluster[0].stop()
1330+ yield self.sleep(0.1)
1331+
1332+ # Results in connection loss exception, and invoking of error handler.
1333+ result = yield self.client.exists("/hello")
1334+
1335+ # The result of the error handler is returned to the api
1336+ self.assertEqual(result, 23)
1337+
1338+ exists = yield self.client.exists("/hello")
1339+ self.assertTrue(exists)
1340+
1341+ @inlineCallbacks
1342+ def test_client_session_expiration_event(self):
1343+ """A client which recieves a session expiration event.
1344+ """
1345+ session_events = []
1346+ events_received = Deferred()
1347+
1348+ def session_event_callback(connection, e):
1349+ session_events.append(e)
1350+ if len(session_events) == 4:
1351+ events_received.callback(True)
1352+
1353+ # Connect to a node in the cluster and establish a watch
1354+ self.client = ZookeeperClient(self.cluster[0].address)
1355+ self.client.set_session_callback(session_event_callback)
1356+ yield self.client.connect()
1357+
1358+ child_d, watch_d = self.client.get_children_and_watch("/")
1359+ yield child_d
1360+
1361+ # Connect a client to the same session on a different node.
1362+ self.client2 = ZookeeperClient(self.cluster[0].address)
1363+ yield self.client2.connect(client_id=self.client.client_id)
1364+
1365+ # Close the new client and wait for the event propogation
1366+ yield self.client2.close()
1367+
1368+ # It can take some time for this to propagate
1369+ yield events_received
1370+ self.assertEqual(len(session_events), 4)
1371+ self.assertEqual(session_events[-1].state_name, "expired")
1372+
1373+ # The connection is dead without reconnecting.
1374+ yield self.assertFailure(
1375+ self.client.exists("/"),
1376+ NotConnectedException, ConnectionException)
1377+
1378+ self.assertTrue(self.client.unrecoverable)
1379+
1380+ # If a reconnect attempt is made with a dead session id
1381+ #yield self.client.connect(client_id=self.client.client_id)
1382+
1383+ test_client_session_expiration_event.timeout = 10
1384+
1385+ @inlineCallbacks
1386+ def test_client_reconnect_session_on_different_server(self):
1387+ """On connection failure, An application can choose to use a
1388+ new connection with which to reconnect to a different member
1389+ of the zookeeper cluster, reacquiring the extant session.
1390+
1391+ A large obvious caveat to using a new client instance rather
1392+ than reconnecting the existing client, is that even though the
1393+ session has outstanding watches, the watch callbacks/deferreds
1394+ won't be active unless the client instance used to create them
1395+ is connected.
1396+ """
1397+ session_events = []
1398+
1399+ def session_event_callback(connection, e):
1400+ session_events.append(e)
1401+
1402+ # Connect to a node in the cluster and establish a watch
1403+ self.client = ZookeeperClient(self.cluster[2].address)
1404+ self.client.set_session_callback(session_event_callback)
1405+ yield self.client.connect()
1406+
1407+ yield self.client.create("/hello", flags=zookeeper.EPHEMERAL)
1408+ exists_d, watch_d = self.client.exists_and_watch("/hello")
1409+ yield exists_d
1410+
1411+ # Shutdown the server the client is connected to
1412+ self.cluster[2].stop()
1413+ yield self.sleep(0.1)
1414+
1415+ # Verify we got a session event regarding the down server
1416+ self.assertTrue(session_events)
1417+
1418+ # Open up a new connection to a different server with same session
1419+ self.client2 = ZookeeperClient(self.cluster[0].address)
1420+ yield self.client2.connect(client_id=self.client.client_id)
1421+
1422+ # Close the old disconnected client
1423+ self.client.close()
1424+
1425+ # Verify the ephemeral still exists
1426+ exists = yield self.client2.exists("/hello")
1427+ self.assertTrue(exists)
1428+
1429+ # Destroy the session and reconnect
1430+ self.client2.close()
1431+ yield self.client.connect(self.cluster[0].address)
1432+
1433+ # Ephemeral is destroyed when the session closed.
1434+ exists = yield self.client.exists("/hello")
1435+ self.assertFalse(exists)
1436
1437=== modified file 'txzookeeper/tests/test_utils.py'
1438--- txzookeeper/tests/test_utils.py 2011-06-17 23:52:34 +0000
1439+++ txzookeeper/tests/test_utils.py 2011-06-30 20:18:34 +0000
1440@@ -34,7 +34,7 @@
1441 def update_function_increment(self, content, stat):
1442 if not content:
1443 return str(0)
1444- return str(int(content)+1)
1445+ return str(int(content) + 1)
1446
1447 def setUp(self):
1448 super(RetryChangeTest, self).setUp()
1449
1450=== modified file 'txzookeeper/tests/utils.py'
1451--- txzookeeper/tests/utils.py 2011-06-17 23:52:34 +0000
1452+++ txzookeeper/tests/utils.py 2011-06-30 20:18:34 +0000
1453@@ -25,9 +25,9 @@
1454 Destroy all the nodes in zookeeper (typically under a chroot for testing)
1455 """
1456 for child in zookeeper.get_children(handle, path):
1457- if child == "zookeeper": # skip the metadata node
1458+ if child == "zookeeper": # skip the metadata node
1459 continue
1460- child_path = "/"+("%s/%s"%(path, child)).strip("/")
1461+ child_path = "/" + ("%s/%s" % (path, child)).strip("/")
1462 try:
1463 deleteTree(child_path, handle)
1464 zookeeper.delete(handle, child_path, -1)

Subscribers

People subscribed via source and target branches

to all changes: