Merge lp:~hazmat/txzookeeper/session-event-handling into lp:txzookeeper

Proposed by Kapil Thangavelu
Status: Merged
Approved by: Gustavo Niemeyer
Approved revision: 54
Merged at revision: 41
Proposed branch: lp:~hazmat/txzookeeper/session-event-handling
Merge into: lp:txzookeeper
Diff against target: 1728 lines (+896/-206)
12 files modified
txzookeeper/client.py (+285/-110)
txzookeeper/queue.py (+5/-5)
txzookeeper/tests/__init__.py (+2/-2)
txzookeeper/tests/common.py (+216/-0)
txzookeeper/tests/mocker.py (+1/-0)
txzookeeper/tests/test_client.py (+131/-78)
txzookeeper/tests/test_node.py (+1/-1)
txzookeeper/tests/test_queue.py (+5/-5)
txzookeeper/tests/test_security.py (+2/-2)
txzookeeper/tests/test_session.py (+245/-0)
txzookeeper/tests/test_utils.py (+1/-1)
txzookeeper/tests/utils.py (+2/-2)
To merge this branch: bzr merge lp:~hazmat/txzookeeper/session-event-handling
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Approve
Jim Baker Pending
Review via email: mp+66901@code.launchpad.net

This proposal supersedes a proposal from 2011-06-20.

Description of the change

Session event and connection loss handling for txzookeeper.

  - Session tests using a ZK cluster as a test layer/test resource, cluster state
    is reset between tests.
  - Zookeeper Client session tests for server rotation across multi-node cluster.
  - Client server rotation and session/watch migration tests.
  - Session event handling, sent to user defined callback, else ignored by default.
  - Connection loss handling, sent to user defined callback, else raised at the
    API call point. The callback result if any is returned as the error to be
    returned to the API.

To post a comment you must log in.
Revision history for this message
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal

Please note that this branch has a pre-requisite branch also in review.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote : Posted in a previous version of this proposal

This looks very good. A few comments, and +1 with them considered.

[1]

+ zookeeper.NOTWATCHING_EVENT: "notwatching",
+ zookeeper.SESSION_EVENT: "session",
+ zookeeper.CREATED_EVENT: 'created',

Quotes are inconsistent here.

Also, "not-watching" would be a bit more friendly to the eye.

[2]

+def debug_symbols(sequence):

Left over?

[3]

error = error_class(error_msg)
+
+ if ConnectionException.is_connection_exception(error):

This idiom feels a little weird. A Foo is necessarily a Foo,
so we never have Foo.is_foo. Unless you have some background
for the choice that I miss, I suggest just moving that to a
"is_connection_exception()" top-level function.

[4]

+ # The result of the connection error handler is returned
+ # to the api.

That's a nice design, despite our conversations around the problem of
connection level erroring.

I'm a bit concerned about the session events, though. A session-termination
event may be the only event the deferred ever sees, so if we divert it to
a separate callback, the logic pending on the deferred will be dead forever.

I'm not sure about what we should do here. Have you thought about this?

If the answer is to just be so for now, we should at least add a note to
the proper location in the code mentioning that these deferreds are dead.

[5]

+ d = defer.maybeDeferred(
+ self._connection_error_callback,
+ self, error)

For consistency, either both callbacks should take "self", or neither of them
should.

[6]

+ # XXX/Debug
+ #print
+ #print "Session/Conn Event", ClientEvent(type, state, path)
+

Some left overs.

[7]

+ self._check_result(result, d)
+ if not d.called:
+ d.callback(True)

This is checking if the deferred is being called synchronously, which seems
dangerous as a pattern. Theorethically, the deferred may not have fired by
the time the function returns.

I suggest using the interface defined (considering the suggestions from
the pre-requisite branch):

  if not self._failed(result, d):
      d.callback(True)
  return d

review: Approve
Revision history for this message
Jim Baker (jimbaker) wrote : Posted in a previous version of this proposal

+1, we certainly need this branch, and it looks quite solid.

In addition to Gustavo's comments:

[1]

Some minor grammar suggestions:

+ specified in comma separated fashion, if they are,

specified in comma separated fashion. If they are,

+ # If its a session event pass it to the session callback, else

If it's a session event, ...

+ nodes and watches)/ The connection's client id, is also useful
+ when introspecting the server logs for specific client
+ activity.

nodes and watches. The connection's client id is also...

+ Its used for all connection level events and session events.

It's used...

+ # Send session events to the callback, this in addition to any
+ # duplicate session events that will be sent for extant watches.

callback, in addition

+ """Set a callback to recieve session events.

receive

+ twisted integration using deferreds is not capable of recieving

receiving

+ to be invoked with them instead. The callback recieves a single

receives

+ call is made, by setting a connection level error handler,
+ applications can centralize their handling of connection loss,

Something like

By setting...

+ This is a global setting, across connections.

setting across all connections

+ A client session can be reattached to in a separate connection,
+ if the a session is expired, using the zookeeper connection will
+ raise a SessionExpiredException.

+ # On occassion we get a session expired event while connecting.

occasion

+ # Closing one connection, will close the session

Closing one connection will close the session.

+ A client connected to multiple servers, will transparently
+ migrate amongst them, as individual servers can no longer be
+ reached. A client's session will be maintined.

servers will transparently...

...will be maintained.

+ # get a zookeeper connectionloss exception on occassion.

occasion

+ We can specify a callback for connection errors, that
+ can perform recovery for a disconnected client, restablishing

This seems incomplete; also there should not be a comma before "that"

+ """On connection failure, An application can choose to use a
+ new connection with to reconnect to a different member of the
+ zookeeper cluster, reacquiring the extant session.

an application... with which to reconnect...

Maybe this concludes with "This enables reacquiring the existing
session." (Extant doesn't seem like the right word choice here.)

+ A large obvious caveat to using a new client instance rather
+ than reconnecting the existing client, is that even though the

instance, rather than...

+ # Ephemeral is destroyed when the session closed.

the session is closed.

[2]

+ http://bit.ly/mQrOMY
+ http://bit.ly/irKpfn

Why not just expand these links out? Alternatively use a readable URL
(the underlying nabble URL is modestly readable, I don't know how
permalink either one is, however).

[3]

+ assert callable(callback)

Shouldn't this be throwing a TypeError instead? Also, there is no
correponding assertion (or verification) on
set_connection_error_callback. Lastly, there is no test on this path.

[4]

+ def xtest_session_expired_event(self):

Do you want to include this test?

review: Approve
Revision history for this message
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal

Excerpts from Gustavo Niemeyer's message of Mon Jun 20 18:28:28 UTC 2011:
> Review: Approve
> This looks very good. A few comments, and +1 with them considered.
>

1-3 addressed.

>
> [4]
>
> + # The result of the connection error handler is returned
> + # to the api.
>
> That's a nice design, despite our conversations around the problem of
> connection level erroring.
>
> I'm a bit concerned about the session events, though. A session-termination
> event may be the only event the deferred ever sees, so if we divert it to
> a separate callback, the logic pending on the deferred will be dead forever.
>
> I'm not sure about what we should do here. Have you thought about this?
>
> If the answer is to just be so for now, we should at least add a note to
> the proper location in the code mentioning that these deferreds are dead.
>

Any deferreds attached to an unconnected client are dead. A session 'expired' event
is effectively a connection loss, the extant deferreds are dead. The expired
event is the only fatal session event. There is some testing around this (extant
watches on dead clients).

Alternatively, we could route session exceptions to the extant watcher, but then
we're back to the app code having to add guards to each watch for connection loss,
or lots of unhandled exceptions in deferred stacks. The connection exception
watcher will still fire as its always registered on the connection level watcher,
ie. the one that monitors the initial connection setup.

From a cleanup perspective this might be preferrable, but the numbers of extant
watches are small enough its not clear its really win, and most of that cleanup
will just result in ignorable log errors without connection loss handling on all
watches. The error handling itself and the error handling for a watchis basically
just ignore or propgate the exception to fully clear out the deferred chain with
the connection level error callback handling the actual reconnect scenario.

[5-6] Addressed

> [7]
>
> + self._check_result(result, d)
> + if not d.called:
> + d.callback(True)
>
> This is checking if the deferred is being called synchronously, which seems
> dangerous as a pattern. Theorethically, the deferred may not have fired by
> the time the function returns.
>
>
> I suggest using the interface defined (considering the suggestions from
> the pre-requisite branch):
>
> if not self._failed(result, d):
> d.callback(True)
> return d
>

In this case the txzk api is synchronous (close) and we're checking just checking
if its failed. I've updated the construct to not check the deferred.called status.

Revision history for this message
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal

Excerpts from Jim Baker's message of Tue Jun 21 00:18:33 UTC 2011:
> Review: Approve
> +1, we certainly need this branch, and it looks quite solid.
>
> In addition to Gustavo's comments:
>
> [1]
>
Added most of the grammatical changes.
>
> [2]
>
> + http://bit.ly/mQrOMY
> + http://bit.ly/irKpfn
>
> Why not just expand these links out? Alternatively use a readable URL
> (the underlying nabble URL is modestly readable, I don't know how
> permalink either one is, however).
>

The links are rather ugly by themselves and violate 80 col lines.

>
> [3]
>
> + assert callable(callback)
>
> Shouldn't this be throwing a TypeError instead? Also, there is no
> correponding assertion (or verification) on
> set_connection_error_callback. Lastly, there is no test on this path.

Well the tests definitely exercised it by way of using the functionality of the callback, but i've gone ahead and added the typeerror and invalid callback tests.

>
>
> [4]
>
> + def xtest_session_expired_event(self):
>
> Do you want to include this test?
>

nice catch, i did but it was before the implementation of the session work, i've moved the test to test_session

thanks for the review,

cheers,

Kapil

Revision history for this message
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal

gustavo, even though this branch is approved, i'm going to wait on a final comment/agreement on [4] before merging.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :
Download full text (24.6 KiB)

Deep conversation about error handling in Ensemble, in the context of item [4] above.

We've agreed to merge this as-is, since clearly there's a lot of work to be made for the desired end result.

Jul 05 17:30:51 <niemeyer> Let me understand this.. are you introducing this logic so that the application we'll use this to crash the agent?
Jul 05 17:30:59 <niemeyer> s/we'll/will
Jul 05 17:31:14 <niemeyer> If that's the case, as we've already agreed, that's ok for now
Jul 05 17:31:15 <hazmat_> that's one approach
Jul 05 17:31:36 <hazmat_> the other is letting the agent stop/start
Jul 05 17:31:47 <niemeyer> I just want to understand what we're doing, though, and what would be the proper way
Jul 05 17:31:53 <hazmat_> i know.. there's dead stuff on the reactor if we start/stop
Jul 05 17:31:58 <niemeyer> Not just that..
Jul 05 17:32:10 <niemeyer> The dead stuff is not my major worry
Jul 05 17:32:21 <niemeyer> The real problem I see is that we simply can't trust any code anymore
Jul 05 17:32:33 <niemeyer> Imagine.. "Oh, I'll loop until file foo is created."
Jul 05 17:32:34 <niemeyer> NOT!
Jul 05 17:32:43 <niemeyer> The loop never dies..
Jul 05 17:32:55 <niemeyer> *anything* we do would be entirely unreliable
Jul 05 17:33:01 <niemeyer> if it's touching a watch
Jul 05 17:33:13 <niemeyer> It's like piecemeal crashing..
Jul 05 17:33:36 <hazmat_> right error handling is application specific
Jul 05 17:34:23 <hazmat_> with the existing watch protocols, its not an issue, except for the case of a hook currently executing (which needs cleanup even in the case of a process shutdown)
Jul 05 17:35:11 <niemeyer> I'm pretty sure I can find cases where it is an issue..
Jul 05 17:35:21 <niemeyer> relation-get ip
Jul 05 17:35:35 <niemeyer> relation-get goes into the agent..
Jul 05 17:35:45 <niemeyer> Never returns again..
Jul 05 17:35:52 <niemeyer> Dead processes
Jul 05 17:36:05 <hazmat_> that's the case i just mentioned
Jul 05 17:36:22 <niemeyer> I see
Jul 05 17:36:34 <niemeyer> I read "watch currently executed" when you said hook
Jul 05 17:36:56 <niemeyer> Anyway.. that's the kind of issue
Jul 05 17:37:07 <niemeyer> There are likely more problematic cases already
Jul 05 17:38:58 <hazmat_> yup.. the interesting case to me, would be something that where we our 'interactively' waiting on a watch, which we don't have, all the existing watch apis are effectively complete on their own (ie. request / response cycles).. if we want to consider arbitrary code.. sure we can construct problem scenarios.. but the existing usage i think is okay..
Jul 05 17:39:00 * hazmat_ brainstorms
Jul 05 17:39:24 <niemeyer> Well, it's already not ok as we have just both spotted
Jul 05 17:40:13 <hazmat_> well the hook scenario is very different then the dead logic waiting issue
Jul 05 17:40:19 <niemeyer> Why?
Jul 05 17:40:33 <hazmat_> it doesn't really care if the session terminates, it just needs an active connection to zk
Jul 05 17:41:06 <niemeyer> Hmmm
Jul 05 17:43:27 <hazmat_> pretty much all of the watch protocols/callbacks are based on examining the current state, and then effecting changes for their area of responsibility to match that current state.. if their restarted, they function t...

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'txzookeeper/client.py'
--- txzookeeper/client.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/client.py 2011-07-05 13:29:55 +0000
@@ -60,6 +60,24 @@
60 zookeeper.SYSTEMERROR: zookeeper.SystemErrorException,60 zookeeper.SYSTEMERROR: zookeeper.SystemErrorException,
61 zookeeper.UNIMPLEMENTED: zookeeper.UnimplementedException}61 zookeeper.UNIMPLEMENTED: zookeeper.UnimplementedException}
6262
63# Mapping of connection state values to human strings.
64STATE_NAME_MAPPING = {
65 zookeeper.ASSOCIATING_STATE: "associating",
66 zookeeper.AUTH_FAILED_STATE: "auth-failed",
67 zookeeper.CONNECTED_STATE: "connected",
68 zookeeper.CONNECTING_STATE: "connecting",
69 zookeeper.EXPIRED_SESSION_STATE: "expired",
70}
71
72# Mapping of event type to human string.
73TYPE_NAME_MAPPING = {
74 zookeeper.NOTWATCHING_EVENT: "not-watching",
75 zookeeper.SESSION_EVENT: "session",
76 zookeeper.CREATED_EVENT: "created",
77 zookeeper.DELETED_EVENT: "deleted",
78 zookeeper.CHANGED_EVENT: "changed",
79 zookeeper.CHILD_EVENT: "child"}
80
6381
64class NotConnectedException(zookeeper.ZooKeeperException):82class NotConnectedException(zookeeper.ZooKeeperException):
65 """83 """
@@ -73,6 +91,29 @@
73 Raised if an error occurs during the client's connection attempt.91 Raised if an error occurs during the client's connection attempt.
74 """92 """
7593
94 @property
95 def state_name(self):
96 return STATE_NAME_MAPPING[self.args[2]]
97
98 @property
99 def type_name(self):
100 return TYPE_NAME_MAPPING[self.args[1]]
101
102 def __str__(self):
103 return "<txzookeeper.ConnectionException type: %s state: %s>" % (
104 self.type_name, self.state_name)
105
106
107def is_connection_exception(e):
108 """
109 For connection errors in response to api calls, a utility method
110 to determine if the cause is a connection exception.
111 """
112 return isinstance(e,
113 (zookeeper.ClosingException,
114 zookeeper.ConnectionLossException,
115 zookeeper.SessionExpiredException))
116
76117
77class ConnectionTimeoutException(zookeeper.ZooKeeperException):118class ConnectionTimeoutException(zookeeper.ZooKeeperException):
78 """119 """
@@ -87,94 +128,131 @@
87 some event on the zookeeper client that the watch was requested on.128 some event on the zookeeper client that the watch was requested on.
88 """129 """
89130
90 type_name_map = {
91 -2: "notwatching",
92 -1: "session",
93 1: 'created',
94 2: 'deleted',
95 3: 'changed',
96 4: 'child'}
97
98 @property131 @property
99 def type_name(self):132 def type_name(self):
100 return self.type_name_map[self.type]133 return TYPE_NAME_MAPPING[self.type]
134
135 @property
136 def state_name(self):
137 return STATE_NAME_MAPPING[self.connection_state]
101138
102 def __repr__(self):139 def __repr__(self):
103 return "<ClientEvent %s at %r>" % (self.type_name, self.path)140 return "<ClientEvent %s at %r state: %s>" % (
141 self.type_name, self.path, self.state_name)
104142
105143
106class ZookeeperClient(object):144class ZookeeperClient(object):
107 """Asynchronous twisted client for zookeeper."""145 """Asynchronous twisted client for zookeeper."""
108146
109 def __init__(self, servers=None, session_timeout=None):147 def __init__(self, servers=None, session_timeout=None):
148 """
149 @param servers: A string specifying the servers and their
150 ports to connect to. Multiple servers can be
151 specified in comma separated fashion. if they are,
152 then the client will automatically rotate
153 among them if a server connection fails. Optionally
154 a chroot can be specified. A full server spec looks
155 like host:port/chroot_path
156
157 @param session_timeout: The client's zookeeper session timeout can be
158 hinted. The actual value is negotiated between the
159 client and server based on their respective
160 configurations.
161 """
110 self._servers = servers162 self._servers = servers
111 self._session_timeout = session_timeout163 self._session_timeout = session_timeout
164 self._session_event_callback = None
165 self._connection_error_callback = None
112 self.connected = False166 self.connected = False
113 self.handle = None167 self.handle = None
114168
115 def _check_connected(self):169 def _check_connected(self, d):
116 if not self.connected:170 if not self.connected:
117 raise NotConnectedException("not connected")171 d.errback(NotConnectedException("not connected"))
118172 return d
119 def _check_result(self, result_code, callback=False, extra_codes=()):173
174 def _check_result(self, result_code, deferred, extra_codes=()):
175 """Check an API call or result for errors.
176
177 :param result_code: The api result code.
178 :param deferred: The deferred returned the client api consumer.
179 :param extra_codes: Additional result codes accepted as valid/ok.
180
181 If the result code is an error, an appropriate Exception class
182 is constructed and the errback on the deferred is invoked with it.
183 """
120 error = None184 error = None
121 if not result_code == zookeeper.OK and not result_code in extra_codes:185 if not result_code == zookeeper.OK and not result_code in extra_codes:
122 error_msg = zookeeper.zerror(result_code)186 error_msg = zookeeper.zerror(result_code)
123 error_class = ERROR_MAPPING.get(187 error_class = ERROR_MAPPING.get(
124 result_code, zookeeper.ZooKeeperException)188 result_code, zookeeper.ZooKeeperException)
125 error = error_class(error_msg)189 error = error_class(error_msg)
126 if callback:190
127 return error191 if is_connection_exception(error):
128 raise error192 # Mark the client as disconnected.
193 self.connected = False
194 # Route connection errors to a connection level error
195 # handler if specified.
196 if self._connection_error_callback:
197 # The result of the connection error handler is returned
198 # to the api.
199 d = defer.maybeDeferred(
200 self._connection_error_callback,
201 self, error)
202 d.chainDeferred(deferred)
203 return True
204
205 deferred.errback(error)
206 return True
129 return None207 return None
130208
131 def _get(self, path, watcher):209 def _get(self, path, watcher):
132 self._check_connected()
133 d = defer.Deferred()210 d = defer.Deferred()
211 if self._check_connected(d):
212 return d
134213
135 def _cb_get(result_code, value, stat):214 def _cb_get(result_code, value, stat):
136 error = self._check_result(result_code, True)215 if self._check_result(result_code, d):
137 if error:216 return
138 return d.errback(error)
139 d.callback((value, stat))217 d.callback((value, stat))
140218
141 callback = self._zk_thread_callback(_cb_get)219 callback = self._zk_thread_callback(_cb_get)
142 watcher = self._wrap_watcher(watcher)220 watcher = self._wrap_watcher(watcher)
143 result = zookeeper.aget(self.handle, path, watcher, callback)221 result = zookeeper.aget(self.handle, path, watcher, callback)
144 self._check_result(result)222 self._check_result(result, d)
145 return d223 return d
146224
147 def _get_children(self, path, watcher):225 def _get_children(self, path, watcher):
148 self._check_connected()
149 d = defer.Deferred()226 d = defer.Deferred()
227 if self._check_connected(d):
228 return d
150229
151 def _cb_get_children(result_code, children):230 def _cb_get_children(result_code, children):
152 error = self._check_result(result_code, True)231 if self._check_result(result_code, d):
153 if error:232 return
154 return d.errback(error)
155 d.callback(children)233 d.callback(children)
156234
157 callback = self._zk_thread_callback(_cb_get_children)235 callback = self._zk_thread_callback(_cb_get_children)
158 watcher = self._wrap_watcher(watcher)236 watcher = self._wrap_watcher(watcher)
159 result = zookeeper.aget_children(self.handle, path, watcher, callback)237 result = zookeeper.aget_children(self.handle, path, watcher, callback)
160 self._check_result(result)238 self._check_result(result, d)
161 return d239 return d
162240
163 def _exists(self, path, watcher):241 def _exists(self, path, watcher):
164 self._check_connected()
165 d = defer.Deferred()242 d = defer.Deferred()
243 if self._check_connected(d):
244 return d
166245
167 def _cb_exists(result_code, stat):246 def _cb_exists(result_code, stat):
168 error = self._check_result(247 if self._check_result(
169 result_code, True, extra_codes=(zookeeper.NONODE,))248 result_code, d, extra_codes=(zookeeper.NONODE,)):
170 if error:249 return
171 return d.errback(error)
172 d.callback(stat)250 d.callback(stat)
173251
174 callback = self._zk_thread_callback(_cb_exists)252 callback = self._zk_thread_callback(_cb_exists)
175 watcher = self._wrap_watcher(watcher)253 watcher = self._wrap_watcher(watcher)
176 result = zookeeper.aexists(self.handle, path, watcher, callback)254 result = zookeeper.aexists(self.handle, path, watcher, callback)
177 self._check_result(result)255 self._check_result(result, d)
178 return d256 return d
179257
180 def _wrap_watcher(self, watcher):258 def _wrap_watcher(self, watcher):
@@ -182,7 +260,22 @@
182 return watcher260 return watcher
183 if not callable(watcher):261 if not callable(watcher):
184 raise SyntaxError("invalid watcher")262 raise SyntaxError("invalid watcher")
185 return self._zk_thread_callback(watcher)263 return self._zk_thread_callback(
264 partial(self._session_event_wrapper, watcher))
265
266 def _session_event_wrapper(self, watcher, event_type, conn_state, path):
267 """Watch wrapper that diverts session events to a connection callback.
268 """
269 # If it's a session event pass it to the session callback, else
270 # ignore it. Session events are sent repeatedly to watchers
271 # which we have modeled after deferred, which only accept a
272 # single return value.
273 if event_type == zookeeper.SESSION_EVENT:
274 if self._session_event_callback:
275 self._session_event_callback(
276 self, ClientEvent(event_type, conn_state, path))
277 else:
278 return watcher(event_type, conn_state, path)
186279
187 def _zk_thread_callback(self, func):280 def _zk_thread_callback(self, func):
188 """281 """
@@ -190,7 +283,6 @@
190 any user defined callback so that they are called back in the main283 any user defined callback so that they are called back in the main
191 thread after, zookeeper calls the wrapper.284 thread after, zookeeper calls the wrapper.
192 """285 """
193
194 def wrapper(handle, *args): # pragma: no cover286 def wrapper(handle, *args): # pragma: no cover
195 reactor.callFromThread(func, *args)287 reactor.callFromThread(func, *args)
196 return wrapper288 return wrapper
@@ -207,6 +299,7 @@
207 def session_timeout(self):299 def session_timeout(self):
208 """300 """
209 What's the negotiated session timeout for this connection, in seconds.301 What's the negotiated session timeout for this connection, in seconds.
302 If the client is not connected the value is None.
210 """303 """
211 if self.connected:304 if self.connected:
212 return zookeeper.recv_timeout(self.handle)305 return zookeeper.recv_timeout(self.handle)
@@ -222,9 +315,14 @@
222315
223 @property316 @property
224 def client_id(self):317 def client_id(self):
225 """318 """Returns the client id that identifies the server side session.
226 The connection's client id, useful when introspecting the server logs319
227 for specific client activity.320 A client id is a tuple represented by the session id and
321 session password. It can be used to manually connect to an
322 extant server session (which contains associated ephemeral
323 nodes and watches)/ The connection's client id is also useful
324 when introspecting the server logs for specific client
325 activity.
228 """326 """
229 if self.handle is None:327 if self.handle is None:
230 return None328 return None
@@ -239,34 +337,33 @@
239 return bool(zookeeper.is_unrecoverable(self.handle))337 return bool(zookeeper.is_unrecoverable(self.handle))
240338
241 def add_auth(self, scheme, identity):339 def add_auth(self, scheme, identity):
242 """340 """Adds an authentication identity to this connection.
243 Adds an authentication identity to this connection. A connection341
244 can use multiple authentication identities at the same time, all342 A connection can use multiple authentication identities at the
245 are checked when verifying acls on a node.343 same time, all are checked when verifying acls on a node.
246344
247 @param scheme: a string specifying a an authentication scheme345 @param scheme: a string specifying a an authentication scheme
248 valid values include 'digest'.346 valid values include 'digest'.
249 @param identity: a string containingusername and password colon347 @param identity: a string containing username and password colon
250 separated for example 'mary:apples'348 separated, for example 'mary:apples'
251 """349 """
252 self._check_connected()
253 d = defer.Deferred()350 d = defer.Deferred()
351 if self._check_connected(d):
352 return d
254353
255 def _cb_authenticated(result_code):354 def _cb_authenticated(result_code):
256 error = self._check_result(result_code, True)355 if self._check_result(result_code, d):
257 if error:356 return
258 return d.errback(error)
259 d.callback(self)357 d.callback(self)
260358
261 callback = self._zk_thread_callback(_cb_authenticated)359 callback = self._zk_thread_callback(_cb_authenticated)
262 result = zookeeper.add_auth(self.handle, scheme, identity, callback)360 result = zookeeper.add_auth(self.handle, scheme, identity, callback)
263 self._check_result(result)361 self._check_result(result, d)
264 return d362 return d
265363
266 def close(self, force=False):364 def close(self, force=False):
267 """365 """
268 Close the underlying socket connection and zookeeper server side366 Close the underlying socket connection and server side session.
269 session.
270367
271 @param force: boolean, require the connection to be closed now or368 @param force: boolean, require the connection to be closed now or
272 an exception be raised.369 an exception be raised.
@@ -276,23 +373,31 @@
276373
277 result = zookeeper.close(self.handle)374 result = zookeeper.close(self.handle)
278 self.connected = False375 self.connected = False
279 self._check_result(result)376 d = defer.Deferred()
280 return result377
281378 if self._check_result(result, d):
282 def connect(self, servers=None, timeout=10):379 return d
380 d.callback(True)
381 return d
382
383 def connect(self, servers=None, timeout=10, client_id=None):
283 """384 """
284 Establish a connection to the given zookeeper server(s).385 Establish a connection to the given zookeeper server(s).
285386
286 @param servers: A string specifying the servers and their ports to387 @param servers: A string specifying the servers and their ports to
287 connect to.388 connect to. Multiple servers can be specified in
389 comma separated fashion.
288 @param timeout: How many seconds to wait on a connection to the390 @param timeout: How many seconds to wait on a connection to the
289 zookeeper servers.391 zookeeper servers.
392
393 @param session_id:
290 @returns A deferred that's fired when the connection is established.394 @returns A deferred that's fired when the connection is established.
291 """395 """
292 d = defer.Deferred()396 d = defer.Deferred()
293397
294 if self.connected:398 if self.connected:
295 raise zookeeper.ZooKeeperException("Already Connected")399 return defer.fail(
400 zookeeper.ZooKeeperException("Already Connected"))
296401
297 # Use a scheduled function to ensure a timeout.402 # Use a scheduled function to ensure a timeout.
298 def _check_timeout():403 def _check_timeout():
@@ -311,31 +416,45 @@
311 if servers is not None:416 if servers is not None:
312 self._servers = servers417 self._servers = servers
313418
314 self.handle = zookeeper.init(419 # Assemble client id if specified.
315 self._servers, callback, self._session_timeout)420 if client_id:
421 self.handle = zookeeper.init(
422 self._servers, callback, self._session_timeout, client_id)
423 else:
424 self.handle = zookeeper.init(
425 self._servers, callback, self._session_timeout)
316426
317 return d427 return d
318428
319 def _cb_connected(429 def _cb_connected(
320 self, scheduled_timeout, connect_deferred, type, state, path):430 self, scheduled_timeout, connect_deferred, type, state, path):
431 """This callback is invoked through the lifecycle of the connection.
321432
433 It's used for all connection level events and session events.
434 """
322 # Cancel the timeout delayed task if it hasn't fired.435 # Cancel the timeout delayed task if it hasn't fired.
323 if scheduled_timeout.active():436 if scheduled_timeout.active():
324 scheduled_timeout.cancel()437 scheduled_timeout.cancel()
325438
439 # Update connected boolean
440 if state == zookeeper.CONNECTED_STATE:
441 self.connected = True
442 elif state != zookeeper.CONNECTING_STATE:
443 self.connected = False
444
326 if connect_deferred.called:445 if connect_deferred.called:
327 # If we timed out and then connected, then close the conn.446 # If we timed out and then connected, then close the conn.
328 if state == zookeeper.CONNECTED_STATE:447 if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called:
329 self.connected = True
330 self.close()448 self.close()
331 # If the client is reused across multiple connect/close449
332 # cycles, and a previous connect timed out, then a450 # Send session events to the callback, in addition to any
333 # subsequent connect may trigger the previous connect's451 # duplicate session events that will be sent for extant watches.
334 # handler notifying of a CONNECTING_STATE, ignore.452 if self._session_event_callback:
453 self._session_event_callback(
454 self, ClientEvent(type, state, path))
455
335 return456 return
336 elif state == zookeeper.CONNECTED_STATE:457 elif state == zookeeper.CONNECTED_STATE:
337 # Connection established.
338 self.connected = True
339 connect_deferred.callback(self)458 connect_deferred.callback(self)
340 return459 return
341460
@@ -351,19 +470,19 @@
351 @params acls: A list of dictionaries specifying permissions.470 @params acls: A list of dictionaries specifying permissions.
352 @params flags: Node creation flags (ephemeral, sequence, persistent)471 @params flags: Node creation flags (ephemeral, sequence, persistent)
353 """472 """
354 self._check_connected()
355 d = defer.Deferred()473 d = defer.Deferred()
474 if self._check_connected(d):
475 return d
356476
357 def _cb_created(result_code, path):477 def _cb_created(result_code, path):
358 error = self._check_result(result_code, True)478 if self._check_result(result_code, d):
359 if error:479 return
360 return d.errback(error)
361 d.callback(path)480 d.callback(path)
362481
363 callback = self._zk_thread_callback(_cb_created)482 callback = self._zk_thread_callback(_cb_created)
364 result = zookeeper.acreate(483 result = zookeeper.acreate(
365 self.handle, path, data, acls, flags, callback)484 self.handle, path, data, acls, flags, callback)
366 self._check_result(result)485 self._check_result(result, d)
367 return d486 return d
368487
369 def delete(self, path, version=-1):488 def delete(self, path, version=-1):
@@ -376,18 +495,18 @@
376 @param path: the path of the node to be deleted.495 @param path: the path of the node to be deleted.
377 @param version: the integer version of the node.496 @param version: the integer version of the node.
378 """497 """
379 self._check_connected()
380 d = defer.Deferred()498 d = defer.Deferred()
499 if self._check_connected(d):
500 return d
381501
382 def _cb_delete(result_code):502 def _cb_delete(result_code):
383 error = self._check_result(result_code, True)503 if self._check_result(result_code, d):
384 if error:504 return
385 return d.errback(error)
386 d.callback(result_code)505 d.callback(result_code)
387506
388 callback = self._zk_thread_callback(_cb_delete)507 callback = self._zk_thread_callback(_cb_delete)
389 result = zookeeper.adelete(self.handle, path, version, callback)508 result = zookeeper.adelete(self.handle, path, version, callback)
390 self._check_result(result)509 self._check_result(result, d)
391 return d510 return d
392511
393 def exists(self, path):512 def exists(self, path):
@@ -395,8 +514,9 @@
395 Check that the given node path exists. Returns a deferred that514 Check that the given node path exists. Returns a deferred that
396 holds the node stat information if the node exists (created,515 holds the node stat information if the node exists (created,
397 modified, version, etc.), or ``None`` if it does not exist.516 modified, version, etc.), or ``None`` if it does not exist.
517
518 @param path: The path of the node whose existence will be checked.
398 """519 """
399
400 return self._exists(path, None)520 return self._exists(path, None)
401521
402 def exists_and_watch(self, path):522 def exists_and_watch(self, path):
@@ -406,20 +526,22 @@
406 In addition to the deferred method result, this method returns526 In addition to the deferred method result, this method returns
407 a deferred that is called back when the node is modified or527 a deferred that is called back when the node is modified or
408 removed (once).528 removed (once).
529
530 @param path: The path of the node whose existence will be checked.
409 """531 """
410
411 d = defer.Deferred()532 d = defer.Deferred()
412533
413 def callback(*args):534 def watcher(*args):
414 d.callback(ClientEvent(*args))535 d.callback(ClientEvent(*args))
415 return self._exists(path, callback), d536 return self._exists(path, watcher), d
416537
417 def get(self, path):538 def get(self, path):
418 """539 """
419 Get the node's data for the given node path. Returns a540 Get the node's data for the given node path. Returns a
420 deferred that holds the content of the node.541 deferred that holds the content of the node.
542
543 @param path: The path of the node whose content will be retrieved.
421 """544 """
422
423 return self._get(path, None)545 return self._get(path, None)
424546
425 def get_and_watch(self, path):547 def get_and_watch(self, path):
@@ -429,17 +551,20 @@
429 In addition to the deferred method result, this method returns551 In addition to the deferred method result, this method returns
430 a deferred that is called back when the node is modified or552 a deferred that is called back when the node is modified or
431 removed (once).553 removed (once).
554
555 @param path: The path of the node whose content will be retrieved.
432 """556 """
433
434 d = defer.Deferred()557 d = defer.Deferred()
435558
436 def callback(*args):559 def watcher(*args):
437 d.callback(ClientEvent(*args))560 d.callback(ClientEvent(*args))
438 return self._get(path, callback), d561 return self._get(path, watcher), d
439562
440 def get_children(self, path):563 def get_children(self, path):
441 """564 """
442 Get the ids of all children directly under the given path.565 Get the ids of all children directly under the given path.
566
567 @param path: The path of the node whose children will be retrieved.
443 """568 """
444 return self._get_children(path, None)569 return self._get_children(path, None)
445570
@@ -450,13 +575,14 @@
450 In addition to the deferred method result, this method returns575 In addition to the deferred method result, this method returns
451 a deferred that is called back when a change happens on the576 a deferred that is called back when a change happens on the
452 provided path (once).577 provided path (once).
578
579 @param path: The path of the node whose children will be retrieved.
453 """580 """
454
455 d = defer.Deferred()581 d = defer.Deferred()
456582
457 def callback(*args):583 def watcher(*args):
458 d.callback(ClientEvent(*args))584 d.callback(ClientEvent(*args))
459 return self._get_children(path, callback), d585 return self._get_children(path, watcher), d
460586
461 def get_acl(self, path):587 def get_acl(self, path):
462 """588 """
@@ -464,19 +590,21 @@
464590
465 Each acl is a dictionary containing keys/values for scheme, id,591 Each acl is a dictionary containing keys/values for scheme, id,
466 and perms.592 and perms.
593
594 @param path: The path of the node whose acl will be retrieved.
467 """595 """
468 self._check_connected()
469 d = defer.Deferred()596 d = defer.Deferred()
597 if self._check_connected(d):
598 return d
470599
471 def _cb_get_acl(result_code, acls, stat):600 def _cb_get_acl(result_code, acls, stat):
472 error = self._check_result(result_code, True)601 if self._check_result(result_code, d):
473 if error:602 return
474 return d.errback(error)
475 d.callback((acls, stat))603 d.callback((acls, stat))
476604
477 callback = self._zk_thread_callback(_cb_get_acl)605 callback = self._zk_thread_callback(_cb_get_acl)
478 result = zookeeper.aget_acl(self.handle, path, callback)606 result = zookeeper.aget_acl(self.handle, path, callback)
479 self._check_result(result)607 self._check_result(result, d)
480 return d608 return d
481609
482 def set_acl(self, path, acls, version=-1):610 def set_acl(self, path, acls, version=-1):
@@ -502,19 +630,19 @@
502 doesn't match the version on the server, then a630 doesn't match the version on the server, then a
503 BadVersionException is raised.631 BadVersionException is raised.
504 """632 """
505 self._check_connected()
506 d = defer.Deferred()633 d = defer.Deferred()
634 if self._check_connected(d):
635 return d
507636
508 def _cb_set_acl(result_code):637 def _cb_set_acl(result_code):
509 error = self._check_result(result_code, True)638 if self._check_result(result_code, d):
510 if error:639 return
511 return d.errback(error)
512 d.callback(result_code)640 d.callback(result_code)
513641
514 callback = self._zk_thread_callback(_cb_set_acl)642 callback = self._zk_thread_callback(_cb_set_acl)
515 result = zookeeper.aset_acl(643 result = zookeeper.aset_acl(
516 self.handle, path, version, acls, callback)644 self.handle, path, version, acls, callback)
517 self._check_result(result)645 self._check_result(result, d)
518 return d646 return d
519647
520 def set(self, path, data="", version=-1):648 def set(self, path, data="", version=-1):
@@ -528,18 +656,18 @@
528 @param data: The data to store on the node.656 @param data: The data to store on the node.
529 @param version: Integer version value657 @param version: Integer version value
530 """658 """
531 self._check_connected()
532 d = defer.Deferred()659 d = defer.Deferred()
660 if self._check_connected(d):
661 return d
533662
534 def _cb_set(result_code, node_stat):663 def _cb_set(result_code, node_stat):
535 error = self._check_result(result_code, True)664 if self._check_result(result_code, d):
536 if error:665 return
537 return d.errback(error)
538 d.callback(node_stat)666 d.callback(node_stat)
539667
540 callback = self._zk_thread_callback(_cb_set)668 callback = self._zk_thread_callback(_cb_set)
541 result = zookeeper.aset(self.handle, path, data, version, callback)669 result = zookeeper.aset(self.handle, path, data, version, callback)
542 self._check_result(result)670 self._check_result(result, d)
543 return d671 return d
544672
545 def set_connection_watcher(self, watcher):673 def set_connection_watcher(self, watcher):
@@ -549,25 +677,72 @@
549677
550 @param: watcher function678 @param: watcher function
551 """679 """
680 if not callable(watcher):
681 raise SyntaxError("Invalid Watcher %r" % (watcher))
552 watcher = self._wrap_watcher(watcher)682 watcher = self._wrap_watcher(watcher)
553 zookeeper.set_watcher(self.handle, watcher)683 zookeeper.set_watcher(self.handle, watcher)
554684
685 def set_session_callback(self, callback):
686 """Set a callback to receive session events.
687
688 Session events are by default ignored. Interested applications
689 may choose to set a session event watcher on the connection
690 to receive session events. Session events are typically broadcast
691 by the libzookeeper library to all extant watchers, but the
692 twisted integration using deferreds is not capable of receiving
693 multiple values (session events and watch events), so this
694 client implementation instead provides for a user defined callback
695 to be invoked with them instead. The callback receives a single
696 parameter, the session event in the form of a ClientEvent instance.
697
698 Additional details on session events
699 ------------------------------------
700 http://bit.ly/mQrOMY
701 http://bit.ly/irKpfn
702 """
703 if not callable(callback):
704 raise TypeError("Invalid callback %r" % callback)
705 self._session_event_callback = callback
706
707 def set_connection_error_callback(self, callback):
708 """Set a callback to receive connection error exceptions.
709
710 By default the error will be raised when the client API
711 call is made. Setting a connection level error handler allows
712 applications to centralize their handling of connection loss,
713 instead of having to guard every zk interaction.
714
715 The callback receives two parameters, the client instance
716 and the exception.
717 """
718 if not callable(callback):
719 raise TypeError("Invalid callback %r" % callback)
720 self._connection_error_callback = callback
721
722 def set_determinstic_order(self, boolean):
723 """
724 The zookeeper client will by default randomize the server hosts
725 it will connect to unless this is set to True.
726
727 This is a global setting across connections.
728 """
729 zookeeper.deterministic_conn_order(bool(boolean))
730
555 def sync(self, path="/"):731 def sync(self, path="/"):
556 """732 """Flushes the connected zookeeper server with the leader.
557 Flushes the zookeeper connection to the leader.
558733
559 @param path: The root path to flush, all child nodes are also flushed.734 @param path: The root path to flush, all child nodes are also flushed.
560 """735 """
561 self._check_connected()
562 d = defer.Deferred()736 d = defer.Deferred()
737 if self._check_connected(d):
738 return d
563739
564 def _cb_sync(result_code, path):740 def _cb_sync(result_code, path):
565 error = self._check_result(result_code, True)741 if self._check_result(result_code, d):
566 if error:742 return
567 return d.errback(error)
568 d.callback(path)743 d.callback(path)
569744
570 callback = self._zk_thread_callback(_cb_sync)745 callback = self._zk_thread_callback(_cb_sync)
571 result = zookeeper.async(self.handle, path, callback)746 result = zookeeper.async(self.handle, path, callback)
572 self._check_result(result)747 self._check_result(result, d)
573 return d748 return d
574749
=== modified file 'txzookeeper/queue.py'
--- txzookeeper/queue.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/queue.py 2011-07-05 13:29:55 +0000
@@ -90,7 +90,7 @@
90 def on_queue_items_changed(*args):90 def on_queue_items_changed(*args):
91 """Event watcher on queue node child events."""91 """Event watcher on queue node child events."""
92 if request.complete or not self._client.connected:92 if request.complete or not self._client.connected:
93 return # pragma: no cover93 return # pragma: no cover
9494
95 if request.processing_children:95 if request.processing_children:
96 # If deferred stack is currently processing a set of children96 # If deferred stack is currently processing a set of children
@@ -273,7 +273,7 @@
273 """273 """
274274
275 def _item_processed_callback(self, result_code, item_path):275 def _item_processed_callback(self, result_code, item_path):
276 return self._client.delete(item_path+"-processing")276 return self._client.delete(item_path + "-processing")
277277
278 def _filter_children(self, children, suffix="-processing"):278 def _filter_children(self, children, suffix="-processing"):
279 """279 """
@@ -300,7 +300,7 @@
300300
301 def on_node_exists(stat, path):301 def on_node_exists(stat, path):
302 """Reserve the node for consumer processing."""302 """Reserve the node for consumer processing."""
303 d = self._client.create(path+"-processing",303 d = self._client.create(path + "-processing",
304 flags=zookeeper.EPHEMERAL)304 flags=zookeeper.EPHEMERAL)
305 d.addCallback(on_reservation_success, path)305 d.addCallback(on_reservation_success, path)
306 d.addErrback(on_reservation_failed)306 d.addErrback(on_reservation_failed)
@@ -315,7 +315,7 @@
315315
316 def on_get_node_failed(failure, path):316 def on_get_node_failed(failure, path):
317 """If we can't fetch the node, delete the processing node."""317 """If we can't fetch the node, delete the processing node."""
318 d = self._client.delete(path+"-processing")318 d = self._client.delete(path + "-processing")
319319
320 # propogate unexpected errors appropriately320 # propogate unexpected errors appropriately
321 if not failure.check(zookeeper.NoNodeException):321 if not failure.check(zookeeper.NoNodeException):
@@ -372,7 +372,7 @@
372372
373 def __init__(self, path, client, acl=None, persistent=False):373 def __init__(self, path, client, acl=None, persistent=False):
374 super(SerializedQueue, self).__init__(path, client, acl, persistent)374 super(SerializedQueue, self).__init__(path, client, acl, persistent)
375 self._lock = Lock("%s/%s"%(self.path, "_lock"), client)375 self._lock = Lock("%s/%s" % (self.path, "_lock"), client)
376376
377 def _item_processed_callback(self, result_code, item_path):377 def _item_processed_callback(self, result_code, item_path):
378 return self._lock.release()378 return self._lock.release()
379379
=== modified file 'txzookeeper/tests/__init__.py'
--- txzookeeper/tests/__init__.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/tests/__init__.py 2011-07-05 13:29:55 +0000
@@ -36,8 +36,8 @@
3636
37 def tearDown(self):37 def tearDown(self):
38 super(ZookeeperTestCase, self).tearDown()38 super(ZookeeperTestCase, self).tearDown()
39 zookeeper.set_log_stream(sys.stderr) # reset to default39 #zookeeper.set_log_stream(sys.stderr) # reset to default
40 zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)40 #zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
41 self.log_file.close()41 self.log_file.close()
4242
43 def get_log(self):43 def get_log(self):
4444
=== added file 'txzookeeper/tests/common.py'
--- txzookeeper/tests/common.py 1970-01-01 00:00:00 +0000
+++ txzookeeper/tests/common.py 2011-07-05 13:29:55 +0000
@@ -0,0 +1,216 @@
1import os
2import os.path
3import shutil
4import subprocess
5import tempfile
6
7from itertools import chain
8from collections import namedtuple
9from glob import glob
10
11
12ServerInfo = namedtuple(
13 "ServerInfo", "server_id client_port election_port leader_port")
14
15
16class ManagedZooKeeper(object):
17 """Class to manage the running of a ZooKeeper instance for testing.
18
19 Note: no attempt is made to probe the ZooKeeper instance is
20 actually available, or that the selected port is free. In the
21 future, we may want to do that, especially when run in a
22 Hudson/Buildbot context, to ensure more test robustness."""
23
24 def __init__(self, software_path, server_info, peers=()):
25 """Define the ZooKeeper test instance.
26
27 @param install_path: The path to the install for ZK
28 @param port: The port to run the managed ZK instance
29 """
30 self.install_path = software_path
31 self.server_info = server_info
32 self.host = "127.0.0.1"
33 self.peers = peers
34 self.working_path = tempfile.mkdtemp()
35 self._running = False
36
37 def run(self):
38 """Run the ZooKeeper instance under a temporary directory.
39
40 Writes ZK log messages to zookeeper.log in the current directory.
41 """
42 config_path = os.path.join(self.working_path, "zoo.cfg")
43 log_path = os.path.join(self.working_path, "log")
44 log4j_path = os.path.join(self.working_path, "log4j.properties")
45 data_path = os.path.join(self.working_path, "data")
46
47 # various setup steps
48 if not os.path.exists(self.working_path):
49 os.mdir(self.working_path)
50 if not os.path.exists(log_path):
51 os.mkdir(log_path)
52 if not os.path.exists(data_path):
53 os.mkdir(data_path)
54
55 with open(config_path, "w") as config:
56 config.write("""
57tickTime=2000
58dataDir=%s
59clientPort=%s
60maxClientCnxns=0
61""" % (data_path, self.server_info.client_port))
62
63 # setup a replicated setup if peers are specified
64 if self.peers:
65 servers_cfg = []
66 for p in chain((self.server_info,), self.peers):
67 servers_cfg.append("server.%s=localhost:%s:%s" % (
68 p.server_id, p.leader_port, p.election_port))
69
70 with open(config_path, "a") as config:
71 config.write("""
72initLimit=4
73syncLimit=2
74%s
75""" % ("\n".join(servers_cfg)))
76
77 # Write server ids into datadir
78 with open(os.path.join(data_path, "myid"), "w") as myid_file:
79 myid_file.write(str(self.server_info.server_id))
80
81 with open(log4j_path, "w") as log4j:
82 log4j.write("""
83# DEFAULT: console appender only
84log4j.rootLogger=INFO, ROLLINGFILE
85log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
86log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
87log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
88log4j.appender.ROLLINGFILE.Threshold=DEBUG
89log4j.appender.ROLLINGFILE.File=""" + (
90 self.working_path + os.sep + "zookeeper.log\n"))
91
92 self.process = subprocess.Popen(
93 args=["java",
94 "-cp", self.classpath,
95 "-Dzookeeper.log.dir=%s" % log_path,
96 "-Dzookeeper.root.logger=INFO,CONSOLE",
97 "-Dlog4j.configuration=file:%s" % log4j_path,
98 # "-Dlog4j.debug",
99 "org.apache.zookeeper.server.quorum.QuorumPeerMain",
100 config_path],
101 )
102 self._running = True
103
104 @property
105 def classpath(self):
106 """Get the classpath necessary to run ZooKeeper."""
107
108 # Two possibilities, as seen in zkEnv.sh:
109 # Check for a release - top-level zookeeper-*.jar?
110 jars = glob((os.path.join(
111 self.install_path, 'zookeeper-*.jar')))
112 if jars:
113 # Relase build (`ant package`)
114 jars.extend(glob(os.path.join(
115 self.install_path,
116 "lib/*.jar")))
117 else:
118 # Development build (plain `ant`)
119 jars = glob((os.path.join(
120 self.install_path, 'build/zookeeper-*.jar')))
121 jars.extend(glob(os.path.join(
122 self.install_path,
123 "build/lib/*.jar")))
124 return ":".join(jars)
125
126 @property
127 def address(self):
128 """Get the address of the ZooKeeper instance."""
129 return "%s:%s" % (self.host, self.client_port)
130
131 @property
132 def running(self):
133 return self._running
134
135 @property
136 def client_port(self):
137 return self.server_info.client_port
138
139 def reset(self):
140 """Stop the zookeeper instance, cleaning out its on disk-data."""
141 self.stop()
142 shutil.rmtree(os.path.join(self.working_path, "data"))
143 os.mkdir(os.path.join(self.working_path, "data"))
144 with open(os.path.join(self.working_path, "data", "myid"), "w") as fh:
145 fh.write(str(self.server_info.server_id))
146
147 def stop(self):
148 """Stop the Zookeeper instance, retaining on disk state."""
149 if not self._running:
150 return
151 self.process.terminate()
152 self.process.wait()
153 self._running = False
154
155 def destroy(self):
156 """Stop the ZooKeeper instance and destroy its on disk-state"""
157 # called by at exit handler, reimport to avoid cleanup race.
158 import shutil
159 self.stop()
160
161 shutil.rmtree(self.working_path)
162
163
164class ZookeeperCluster(object):
165
166 def __init__(self, install_path, size=3, port_offset=20000):
167 self._install_path = install_path
168 self._servers = []
169
170 # Calculate ports and peer group
171 port = port_offset
172 peers = []
173
174 for i in range(size):
175 port += i * 10
176 info = ServerInfo(i + 1, port, port + 1, port + 2)
177 peers.append(info)
178
179 # Instantiate Managed ZK Servers
180 for i in range(size):
181 server_peers = list(peers)
182 server_info = server_peers.pop(i)
183 self._servers.append(
184 ManagedZooKeeper(
185 self._install_path, server_info, server_peers))
186
187 def __getitem__(self, k):
188 return self._servers[k]
189
190 def __iter__(self):
191 return iter(self._servers)
192
193 def start(self):
194 # Zookeeper client expresses a preference for either lower ports or
195 # lexographical ordering of hosts, to ensure that all servers have a
196 # chance to startup, start them in reverse order.
197 for server in reversed(list(self)):
198 server.run()
199 # Giving the servers a moment to start, decreases the overall time
200 # required for a client to successfully connect (2s vs. 4s without
201 # the sleep).
202 import time
203 time.sleep(2)
204
205 def stop(self):
206 for server in self:
207 server.stop()
208 self._servers = []
209
210 def terminate(self):
211 for server in self:
212 server.destroy()
213
214 def reset(self):
215 for server in self:
216 server.reset()
0217
=== modified file 'txzookeeper/tests/mocker.py'
--- txzookeeper/tests/mocker.py 2011-06-08 21:14:20 +0000
+++ txzookeeper/tests/mocker.py 2011-07-05 13:29:55 +0000
@@ -1923,6 +1923,7 @@
1923 def run(self, path):1923 def run(self, path):
1924 return Mock(self.mocker, path)1924 return Mock(self.mocker, path)
19251925
1926
1926def mock_returner_recorder(mocker, event):1927def mock_returner_recorder(mocker, event):
1927 """Events that lead to other events must return mock objects."""1928 """Events that lead to other events must return mock objects."""
1928 parent_path = event.path.parent_path1929 parent_path = event.path.parent_path
19291930
=== modified file 'txzookeeper/tests/test_client.py'
--- txzookeeper/tests/test_client.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/tests/test_client.py 2011-07-05 13:29:55 +0000
@@ -23,19 +23,25 @@
2323
24from twisted.internet.defer import Deferred24from twisted.internet.defer import Deferred
25from twisted.internet.base import DelayedCall25from twisted.internet.base import DelayedCall
26from twisted.python.failure import Failure
2627
27import zookeeper28import zookeeper
2829
30from mocker import ANY, MATCH
31from txzookeeper.tests import ZookeeperTestCase, utils
29from txzookeeper.client import (32from txzookeeper.client import (
30 ZookeeperClient, ZOO_OPEN_ACL_UNSAFE, ConnectionTimeoutException,33 ZookeeperClient, ZOO_OPEN_ACL_UNSAFE, ConnectionTimeoutException,
31 ConnectionException, ClientEvent)34 ConnectionException, NotConnectedException, ClientEvent)
32
33from mocker import ANY
34from txzookeeper.tests import ZookeeperTestCase, utils
3535
36PUBLIC_ACL = ZOO_OPEN_ACL_UNSAFE36PUBLIC_ACL = ZOO_OPEN_ACL_UNSAFE
3737
3838
39def match_deferred(arg):
40 return isinstance(arg, Deferred)
41
42DEFERRED_MATCH = MATCH(match_deferred)
43
44
39class ClientTests(ZookeeperTestCase):45class ClientTests(ZookeeperTestCase):
4046
41 def setUp(self):47 def setUp(self):
@@ -47,9 +53,10 @@
47 if self.client.connected:53 if self.client.connected:
48 utils.deleteTree(handle=self.client.handle)54 utils.deleteTree(handle=self.client.handle)
49 self.client.close()55 self.client.close()
50 del self.client56
51 if self.client2 and self.client2.connected:57 if self.client2 and self.client2.connected:
52 self.client2.close()58 self.client2.close()
59
53 super(ClientTests, self).tearDown()60 super(ClientTests, self).tearDown()
5461
55 def test_wb_connect_after_timeout(self):62 def test_wb_connect_after_timeout(self):
@@ -113,8 +120,10 @@
113 return d120 return d
114121
115 def test_client_event_repr(self):122 def test_client_event_repr(self):
116 event = ClientEvent(4, 'state', 'path')123 event = ClientEvent(zookeeper.SESSION_EVENT,
117 self.assertEqual(repr(event), "<ClientEvent child at 'path'>")124 zookeeper.EXPIRED_SESSION_STATE, '')
125 self.assertEqual(repr(event),
126 "<ClientEvent session at '' state: expired>")
118127
119 def test_client_event_attributes(self):128 def test_client_event_attributes(self):
120 event = ClientEvent(4, 'state', 'path')129 event = ClientEvent(4, 'state', 'path')
@@ -123,6 +132,10 @@
123 self.assertEqual(event.path, 'path')132 self.assertEqual(event.path, 'path')
124 self.assertEqual(event, (4, 'state', 'path'))133 self.assertEqual(event, (4, 'state', 'path'))
125134
135 def test_client_use_while_disconnected_returns_failure(self):
136 return self.assertFailure(
137 self.client.exists("/"), NotConnectedException)
138
126 def test_create_ephemeral_node_and_close_connection(self):139 def test_create_ephemeral_node_and_close_connection(self):
127 """140 """
128 The client can create transient nodes that are destroyed when the141 The client can create transient nodes that are destroyed when the
@@ -307,14 +320,17 @@
307 return self.client.create("/foobar-watched", "rabbit")320 return self.client.create("/foobar-watched", "rabbit")
308321
309 def get_node(path):322 def get_node(path):
310 return self.client.get_and_watch(path)323 data, watch = self.client.get_and_watch(path)
324 return data.addCallback(lambda x: (watch,))
311325
312 def new_connection((data, watch)):326 def new_connection((watch,)):
313 self.client2 = ZookeeperClient("127.0.0.1:2181")327 self.client2 = ZookeeperClient("127.0.0.1:2181")
314 return self.client2.connect(), watch328 return self.client2.connect().addCallback(
329 lambda x, y=None, z=None: (x, watch))
315330
316 def trigger_watch((client, watch)):331 def trigger_watch((client, watch)):
317 zookeeper.delete(self.client2.handle, "/foobar-watched")332 zookeeper.delete(self.client2.handle, "/foobar-watched")
333 self.client2.close()
318 return watch334 return watch
319335
320 def verify_watch(event):336 def verify_watch(event):
@@ -385,11 +401,16 @@
385 """401 """
386 d = self.client.connect()402 d = self.client.connect()
387403
404 def inject_error(result_code, d, extra_codes=None):
405 error = SyntaxError()
406 d.errback(error)
407 return error
408
388 def check_exists(client):409 def check_exists(client):
389 mock_client = self.mocker.patch(client)410 mock_client = self.mocker.patch(client)
390 mock_client._check_result(411 mock_client._check_result(
391 ANY, True, extra_codes=(zookeeper.NONODE,))412 ANY, DEFERRED_MATCH, extra_codes=(zookeeper.NONODE,))
392 self.mocker.result(SyntaxError())413 self.mocker.call(inject_error)
393 self.mocker.replay()414 self.mocker.replay()
394 return client.exists("/zebra-moon")415 return client.exists("/zebra-moon")
395416
@@ -655,20 +676,21 @@
655 return d676 return d
656677
657 def test_get_children_with_error(self):678 def test_get_children_with_error(self):
679 """If the result of an api call is an error, its propgated.
680 """
658 d = self.client.connect()681 d = self.client.connect()
659682
660 def get_children(client):683 def get_children(client):
661 mock_client = self.mocker.patch(self.client)684 # Get the children of a nonexistant node
662 mock_client._check_result(ANY, True)
663 self.mocker.result(SyntaxError())
664 self.mocker.replay()
665 return client.get_children("/tower")685 return client.get_children("/tower")
666686
667 def verify_failure(failure):687 def verify_failure(failure):
668 self.assertTrue(isinstance(failure.value, SyntaxError))688 self.assertTrue(isinstance(failure, Failure))
689 self.assertTrue(
690 isinstance(failure.value, zookeeper.NoNodeException))
669691
670 d.addCallback(get_children)692 d.addCallback(get_children)
671 d.addErrback(verify_failure)693 d.addBoth(verify_failure)
672 return d694 return d
673695
674 # seems to be a segfault on this one, must be running latest zk696 # seems to be a segfault on this one, must be running latest zk
@@ -825,7 +847,7 @@
825 def verify_node_access(stat):847 def verify_node_access(stat):
826 self.assertEqual(stat['version'], 1)848 self.assertEqual(stat['version'], 1)
827 self.assertEqual(stat['dataLength'], 3)849 self.assertEqual(stat['dataLength'], 3)
828 self.assertTrue(failed) # we should have hit the errback850 self.assertTrue(failed) # we should have hit the errback
829851
830 d.addCallback(add_auth_one)852 d.addCallback(add_auth_one)
831 d.addCallback(create_node)853 d.addCallback(create_node)
@@ -905,10 +927,6 @@
905 acl = dict(scheme="digest", id="a:b", perms=zookeeper.PERM_ALL)927 acl = dict(scheme="digest", id="a:b", perms=zookeeper.PERM_ALL)
906928
907 def set_acl(client):929 def set_acl(client):
908 mock_client = self.mocker.patch(client)
909 mock_client._check_result(ANY, True)
910 self.mocker.result(zookeeper.NoNodeException())
911 self.mocker.replay()
912 return client.set_acl("/zebra-moon22", [acl])930 return client.set_acl("/zebra-moon22", [acl])
913931
914 def verify_failure(failure):932 def verify_failure(failure):
@@ -946,23 +964,22 @@
946 """964 """
947 d = self.client.connect()965 d = self.client.connect()
948966
949 def create_node(client):967 def inject_error(result, d):
950 return client.create("/moose")968 error = zookeeper.ZooKeeperException()
969 d.errback(error)
970 return error
951971
952 def get_acl(path):972 def get_acl(path):
953 mock_client = self.mocker.patch(self.client)973 # Get the ACL of a nonexistant node
954 mock_client._check_result(ANY, True)974 return self.client.get_acl("/moose")
955 self.mocker.result(zookeeper.ZooKeeperException("foobar"))
956 self.mocker.replay()
957 return self.client.get_acl(path)
958975
959 def verify_failure(failure):976 def verify_failure(failure):
977 self.assertTrue(isinstance(failure, Failure))
960 self.assertTrue(978 self.assertTrue(
961 isinstance(failure.value, zookeeper.ZooKeeperException))979 isinstance(failure.value, zookeeper.ZooKeeperException))
962980
963 d.addCallback(create_node)
964 d.addCallback(get_acl)981 d.addCallback(get_acl)
965 d.addErrback(verify_failure)982 d.addBoth(verify_failure)
966 return d983 return d
967984
968 def test_client_id(self):985 def test_client_id(self):
@@ -1006,35 +1023,6 @@
1006 d.addCallback(verify_sync)1023 d.addCallback(verify_sync)
1007 return d1024 return d
10081025
1009 def test_sync_error(self):
1010 """
1011 On error the sync callback returns a an exception/failure.
1012 """
1013 d = self.client.connect()
1014
1015 def create_node(client):
1016 return client.create("/abc")
1017
1018 def client_sync(path):
1019 mock_client = self.mocker.patch(self.client)
1020 mock_client._check_result(ANY, True)
1021 self.mocker.result(zookeeper.ZooKeeperException("foobar"))
1022 self.mocker.replay()
1023 return self.client.sync(path)
1024
1025 def verify_failure(failure):
1026 self.assertTrue(
1027 isinstance(failure.value, zookeeper.ZooKeeperException))
1028
1029 def assert_failed(extra):
1030 self.fail("Should have gone to errback")
1031
1032 d.addCallback(create_node)
1033 d.addCallback(client_sync)
1034 d.addCallback(assert_failed)
1035 d.addErrback(verify_failure)
1036 return d
1037
1038 def test_property_servers(self):1026 def test_property_servers(self):
1039 """1027 """
1040 The servers property of the client, shows which if any servers1028 The servers property of the client, shows which if any servers
@@ -1087,13 +1075,67 @@
1087 return client.set_connection_watcher(1)1075 return client.set_connection_watcher(1)
10881076
1089 def verify_invalid(failure):1077 def verify_invalid(failure):
1090 self.assertEqual(failure.value.args, ("invalid watcher",))1078 self.assertEqual(failure.value.args, ("Invalid Watcher 1",))
1091 self.assertTrue(isinstance(failure.value, SyntaxError))1079 self.assertTrue(isinstance(failure.value, SyntaxError))
10921080
1093 d.addCallback(set_invalid_watcher)1081 d.addCallback(set_invalid_watcher)
1094 d.addErrback(verify_invalid)1082 d.addErrback(verify_invalid)
1095 return d1083 return d
10961084
1085 def xtest_session_expired_event(self):
1086 """
1087 A client session can be reattached to in a separate connection,
1088 if the a session is expired, using the zookeeper connection will
1089 raise a SessionExpiredException.
1090 """
1091 d = self.client.connect()
1092
1093 class StopTest(Exception):
1094 pass
1095
1096 def new_connection_same_connection(client):
1097 self.assertEqual(client.state, zookeeper.CONNECTED_STATE)
1098 return ZookeeperClient("127.0.0.1:2181").connect(
1099 client_id=client.client_id).addErrback(
1100 guard_session_expired, client)
1101
1102 def guard_session_expired(failure, client):
1103 # On occassion we get a session expired event while connecting.
1104 failure.trap(ConnectionException)
1105 self.assertEqual(failure.value.state_name, "expired")
1106 # Stop the test from proceeding
1107 raise StopTest()
1108
1109 def close_new_connection(client):
1110 # Verify both connections are using same session
1111 self.assertEqual(self.client.client_id, client.client_id)
1112 self.assertEqual(client.state, zookeeper.CONNECTED_STATE)
1113
1114 # Closing one connection will close the session
1115 client.close()
1116
1117 # Continued use of the other client will get a
1118 # disconnect exception.
1119 return self.client.exists("/")
1120
1121 def verify_original_closed(failure):
1122 if not isinstance(failure, Failure):
1123 self.fail("Test did not raise exception.")
1124 failure.trap(
1125 zookeeper.SessionExpiredException,
1126 zookeeper.ConnectionLossException)
1127
1128 #print "client close"
1129 self.client.close()
1130 #print "creating new client for teardown"
1131 return self.client.connect()
1132
1133 d.addCallback(new_connection_same_connection)
1134 d.addCallback(close_new_connection)
1135 d.addBoth(verify_original_closed)
1136
1137 return d
1138
1097 def test_connect_with_server(self):1139 def test_connect_with_server(self):
1098 """1140 """
1099 A client's servers can be specified in the connect method.1141 A client's servers can be specified in the connect method.
@@ -1164,14 +1206,14 @@
1164 All of the client apis (with the exception of connect) attempt1206 All of the client apis (with the exception of connect) attempt
1165 to ensure the client is connected before executing an operation.1207 to ensure the client is connected before executing an operation.
1166 """1208 """
1167 self.assertRaises(1209 self.assertFailure(
1168 zookeeper.ZooKeeperException, self.client.get_children, "/abc")1210 self.client.get_children("/abc"), zookeeper.ZooKeeperException)
11691211
1170 self.assertRaises(1212 self.assertFailure(
1171 zookeeper.ZooKeeperException, self.client.create, "/abc")1213 self.client.create("/abc"), zookeeper.ZooKeeperException)
11721214
1173 self.assertRaises(1215 self.assertFailure(
1174 zookeeper.ZooKeeperException, self.client.set, "/abc", "123")1216 self.client.set("/abc", "123"), zookeeper.ZooKeeperException)
11751217
1176 def test_connect_multiple_raises(self):1218 def test_connect_multiple_raises(self):
1177 """1219 """
@@ -1181,8 +1223,9 @@
1181 d = self.client.connect()1223 d = self.client.connect()
11821224
1183 def connect_again(client):1225 def connect_again(client):
1184 self.assertRaises(1226 d = client.connect()
1185 zookeeper.ZooKeeperException, client.connect)1227 self.failUnlessFailure(d, zookeeper.ZooKeeperException)
1228 return d
11861229
1187 d.addCallback(connect_again)1230 d.addCallback(connect_again)
1188 return d1231 return d
@@ -1199,18 +1242,18 @@
1199 d = self.client.connect()1242 d = self.client.connect()
12001243
1201 def verify_failure(client):1244 def verify_failure(client):
1202 self.assertRaises(1245 d = client.create("/abc")
1203 zookeeper.ZooKeeperException, client.create, "/abc")1246 self.failUnlessFailure(d, zookeeper.ZooKeeperException)
12041247
1205 d.addCallback(verify_failure)1248 d.addCallback(verify_failure)
1206 return d1249 return d
12071250
1208 def test_connection_watcher(self):1251 def test_connection_watcher(self):
1209 """1252 """
1210 A connection watcher can be set that recieves notices on when the1253 A connection watcher can be set that receives notices on when
1211 connection state changes. Technically zookeeper would also use this as1254 the connection state changes. Technically zookeeper would also
1212 a global watcher for node state changes, but zkpython doesn't expose1255 use this as a global watcher for node watches, but zkpython
1213 that api, as its mostly considered legacy.1256 doesn't expose that api, as its mostly considered legacy.
12141257
1215 its out of scope to simulate a connection level event within unit tests1258 its out of scope to simulate a connection level event within unit tests
1216 such as the server restarting.1259 such as the server restarting.
@@ -1243,3 +1286,13 @@
1243 If the client is not connected, closing returns None.1286 If the client is not connected, closing returns None.
1244 """1287 """
1245 self.assertEqual(self.client.close(), None)1288 self.assertEqual(self.client.close(), None)
1289
1290 def test_invalid_connection_error_callback(self):
1291 self.assertRaises(TypeError,
1292 self.client.set_connection_error_callback,
1293 None)
1294
1295 def test_invalid_session_callback(self):
1296 self.assertRaises(TypeError,
1297 self.client.set_session_callback,
1298 None)
12461299
=== modified file 'txzookeeper/tests/test_node.py'
--- txzookeeper/tests/test_node.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/tests/test_node.py 2011-07-05 13:29:55 +0000
@@ -59,7 +59,7 @@
59 def _make_digest_identity(self, credentials):59 def _make_digest_identity(self, credentials):
60 user, password = credentials.split(":")60 user, password = credentials.split(":")
61 digest = hashlib.new("sha1", credentials).digest()61 digest = hashlib.new("sha1", credentials).digest()
62 return "%s:%s"%(user, base64.b64encode(digest))62 return "%s:%s" % (user, base64.b64encode(digest))
6363
64 def test_node_name_and_path(self):64 def test_node_name_and_path(self):
65 """65 """
6666
=== modified file 'txzookeeper/tests/test_queue.py'
--- txzookeeper/tests/test_queue.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/tests/test_queue.py 2011-07-05 13:29:55 +0000
@@ -175,7 +175,7 @@
175 watch = Deferred()175 watch = Deferred()
176 self.mocker.result((succeed(["entry-000000"]), watch))176 self.mocker.result((succeed(["entry-000000"]), watch))
177177
178 item_path = "%s/%s"%(path, "entry-000000")178 item_path = "%s/%s" % (path, "entry-000000")
179 mock_client.get(item_path)179 mock_client.get(item_path)
180 self.mocker.result(fail(SyntaxError("x")))180 self.mocker.result(fail(SyntaxError("x")))
181 self.mocker.replay()181 self.mocker.replay()
@@ -230,13 +230,13 @@
230 producer_done = Deferred()230 producer_done = Deferred()
231231
232 def iteration(i):232 def iteration(i):
233 if len(items) == (item_count-1):233 if len(items) == (item_count - 1):
234 return producer_done.callback(None)234 return producer_done.callback(None)
235 items.append(i)235 items.append(i)
236 queue.put(str(i))236 queue.put(str(i))
237237
238 for i in range(item_count):238 for i in range(item_count):
239 reactor.callLater(i*0.05, iteration, i)239 reactor.callLater(i * 0.05, iteration, i)
240 yield producer_done240 yield producer_done
241 returnValue(items)241 returnValue(items)
242242
@@ -284,7 +284,7 @@
284 def producer(start, offset):284 def producer(start, offset):
285 client = yield self.open_client()285 client = yield self.open_client()
286 q = self.queue_factory(path, client)286 q = self.queue_factory(path, client)
287 for i in range(start, start+offset):287 for i in range(start, start + offset):
288 yield q.put(str(i))288 yield q.put(str(i))
289 produce_results.append(str(i))289 produce_results.append(str(i))
290290
@@ -311,7 +311,7 @@
311 yield DeferredList(311 yield DeferredList(
312 [consumer(8), consumer(8), consumer(4)])312 [consumer(8), consumer(8), consumer(4)])
313313
314 err = set(produce_results)-set(consume_results)314 err = set(produce_results) - set(consume_results)
315 self.assertFalse(err)315 self.assertFalse(err)
316316
317 self.assertEqual(len(consume_results), len(produce_results))317 self.assertEqual(len(consume_results), len(produce_results))
318318
=== modified file 'txzookeeper/tests/test_security.py'
--- txzookeeper/tests/test_security.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/tests/test_security.py 2011-07-05 13:29:55 +0000
@@ -73,9 +73,9 @@
73 def connect_users(self, *users):73 def connect_users(self, *users):
74 clients = []74 clients = []
75 for name in users:75 for name in users:
76 ident_user = getattr(self, "ident_%s"%(name), None)76 ident_user = getattr(self, "ident_%s" % (name), None)
77 if ident_user is None:77 if ident_user is None:
78 raise AttributeError("Invalid User %s"%(name))78 raise AttributeError("Invalid User %s" % (name))
79 client = ZookeeperClient("127.0.0.1:2181", 3000)79 client = ZookeeperClient("127.0.0.1:2181", 3000)
80 clients.append(client)80 clients.append(client)
81 yield self.open_and_authenticate(client, ident_user)81 yield self.open_and_authenticate(client, ident_user)
8282
=== added file 'txzookeeper/tests/test_session.py'
--- txzookeeper/tests/test_session.py 1970-01-01 00:00:00 +0000
+++ txzookeeper/tests/test_session.py 2011-07-05 13:29:55 +0000
@@ -0,0 +1,245 @@
1
2import atexit
3import os
4
5import zookeeper
6
7from twisted.internet import reactor
8from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
9
10from txzookeeper import ZookeeperClient
11from txzookeeper.client import NotConnectedException, ConnectionException
12
13from txzookeeper.tests.common import ZookeeperCluster
14from txzookeeper.tests import ZookeeperTestCase
15
16
17ZK_HOME = os.environ.get("ZOOKEEPER_PATH")
18assert ZK_HOME, "ZOOKEEPER_PATH environment variable must be defined"
19
20CLUSTER = ZookeeperCluster(ZK_HOME)
21atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
22
23
24class ClientSessionTests(ZookeeperTestCase):
25
26 def setUp(self):
27 super(ClientSessionTests, self).setUp()
28 self.cluster.start()
29 self.client = None
30 self.client2 = None
31 zookeeper.deterministic_conn_order(True)
32 zookeeper.set_debug_level(0)
33
34 def sleep(self, delay):
35 """Non-blocking sleep."""
36 deferred = Deferred()
37 reactor.callLater(delay, deferred.callback, None)
38 return deferred
39
40 @property
41 def cluster(self):
42 return CLUSTER
43
44 def tearDown(self):
45 super(ClientSessionTests, self).tearDown()
46 self.cluster.reset()
47
48 @inlineCallbacks
49 def test_client_session_migration(self):
50 """A client will automatically rotate servers to ensure a connection.
51
52 A client connected to multiple servers, will transparently
53 migrate amongst them, as individual servers can no longer be
54 reached. A client's session will be maintined.
55 """
56 # Connect to the Zookeeper Cluster
57 servers = ",".join([s.address for s in self.cluster])
58 self.client = ZookeeperClient(servers)
59 yield self.client.connect()
60 yield self.client.create("/hello", flags=zookeeper.EPHEMERAL)
61
62 # Shutdown the server the client is connected to
63 self.cluster[0].stop()
64
65 # Wait for the shutdown and cycle, if we don't wait we'll
66 # get a zookeeper connectionloss exception on occassion.
67 yield self.sleep(0.1)
68
69 self.assertTrue(self.client.connected)
70 exists = yield self.client.exists("/hello")
71 self.assertTrue(exists)
72
73 @inlineCallbacks
74 def test_client_watch_migration(self):
75 """On server rotation, extant watches are still active.
76
77 A client connected to multiple servers, will transparently
78 migrate amongst them, as individual servers can no longer be
79 reached. Watch deferreds issued from the same client instance will
80 continue to function as the session is maintained.
81 """
82 session_events = []
83
84 def session_event_callback(connection, e):
85 session_events.append(e)
86
87 # Connect to the Zookeeper Cluster
88 servers = ",".join([s.address for s in self.cluster])
89 self.client = ZookeeperClient(servers)
90 self.client.set_session_callback(session_event_callback)
91 yield self.client.connect()
92
93 # Setup a watch
94 yield self.client.create("/hello")
95 exists_d, watch_d = self.client.exists_and_watch("/hello")
96 yield exists_d
97
98 # Shutdown the server the client is connected to
99 self.cluster[0].stop()
100
101 # Wait for the shutdown and cycle, if we don't wait we'll
102 # get occasionally get a zookeeper connectionloss exception.
103 yield self.sleep(0.1)
104
105 # The session events that would have been ignored are sent
106 # to the session event callback.
107 self.assertTrue(session_events)
108 self.assertTrue(self.client.connected)
109
110 # If we delete the node, we'll see the watch fire.
111 yield self.client.delete("/hello")
112 event = yield watch_d
113 self.assertEqual(event.type_name, "deleted")
114 self.assertEqual(event.path, "/hello")
115
116 @inlineCallbacks
117 def test_connection_error_handler(self):
118 """A callback can be specified for connection errors.
119
120 We can specify a callback for connection errors, that
121 can perform recovery for a disconnected client, restablishing
122 """
123 @inlineCallbacks
124 def connection_error_handler(connection, error):
125 # On loss of the connection, reconnect the client w/ same session.
126 yield connection.connect(
127 self.cluster[1].address, client_id=connection.client_id)
128 returnValue(23)
129
130 self.client = ZookeeperClient(self.cluster[0].address)
131 self.client.set_connection_error_callback(connection_error_handler)
132 yield self.client.connect()
133
134 yield self.client.create("/hello")
135 exists_d, watch_d = self.client.exists_and_watch("/hello")
136 yield exists_d
137
138 # Shutdown the server the client is connected to
139 self.cluster[0].stop()
140 yield self.sleep(0.1)
141
142 # Results in connection loss exception, and invoking of error handler.
143 result = yield self.client.exists("/hello")
144
145 # The result of the error handler is returned to the api
146 self.assertEqual(result, 23)
147
148 exists = yield self.client.exists("/hello")
149 self.assertTrue(exists)
150
151 @inlineCallbacks
152 def test_client_session_expiration_event(self):
153 """A client which recieves a session expiration event.
154 """
155 session_events = []
156 events_received = Deferred()
157
158 def session_event_callback(connection, e):
159 session_events.append(e)
160 if len(session_events) == 4:
161 events_received.callback(True)
162
163 # Connect to a node in the cluster and establish a watch
164 self.client = ZookeeperClient(self.cluster[0].address)
165 self.client.set_session_callback(session_event_callback)
166 yield self.client.connect()
167
168 child_d, watch_d = self.client.get_children_and_watch("/")
169 yield child_d
170
171 # Connect a client to the same session on a different node.
172 self.client2 = ZookeeperClient(self.cluster[0].address)
173 yield self.client2.connect(client_id=self.client.client_id)
174
175 # Close the new client and wait for the event propogation
176 yield self.client2.close()
177
178 # It can take some time for this to propagate
179 yield events_received
180 self.assertEqual(len(session_events), 4)
181 self.assertEqual(session_events[-1].state_name, "expired")
182
183 # The connection is dead without reconnecting.
184 yield self.assertFailure(
185 self.client.exists("/"),
186 NotConnectedException, ConnectionException)
187
188 self.assertTrue(self.client.unrecoverable)
189
190 # If a reconnect attempt is made with a dead session id
191 #yield self.client.connect(client_id=self.client.client_id)
192
193 test_client_session_expiration_event.timeout = 10
194
195 @inlineCallbacks
196 def test_client_reconnect_session_on_different_server(self):
197 """On connection failure, An application can choose to use a
198 new connection with which to reconnect to a different member
199 of the zookeeper cluster, reacquiring the extant session.
200
201 A large obvious caveat to using a new client instance rather
202 than reconnecting the existing client, is that even though the
203 session has outstanding watches, the watch callbacks/deferreds
204 won't be active unless the client instance used to create them
205 is connected.
206 """
207 session_events = []
208
209 def session_event_callback(connection, e):
210 session_events.append(e)
211
212 # Connect to a node in the cluster and establish a watch
213 self.client = ZookeeperClient(self.cluster[2].address)
214 self.client.set_session_callback(session_event_callback)
215 yield self.client.connect()
216
217 yield self.client.create("/hello", flags=zookeeper.EPHEMERAL)
218 exists_d, watch_d = self.client.exists_and_watch("/hello")
219 yield exists_d
220
221 # Shutdown the server the client is connected to
222 self.cluster[2].stop()
223 yield self.sleep(0.1)
224
225 # Verify we got a session event regarding the down server
226 self.assertTrue(session_events)
227
228 # Open up a new connection to a different server with same session
229 self.client2 = ZookeeperClient(self.cluster[0].address)
230 yield self.client2.connect(client_id=self.client.client_id)
231
232 # Close the old disconnected client
233 self.client.close()
234
235 # Verify the ephemeral still exists
236 exists = yield self.client2.exists("/hello")
237 self.assertTrue(exists)
238
239 # Destroy the session and reconnect
240 self.client2.close()
241 yield self.client.connect(self.cluster[0].address)
242
243 # Ephemeral is destroyed when the session closed.
244 exists = yield self.client.exists("/hello")
245 self.assertFalse(exists)
0246
=== modified file 'txzookeeper/tests/test_utils.py'
--- txzookeeper/tests/test_utils.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/tests/test_utils.py 2011-07-05 13:29:55 +0000
@@ -34,7 +34,7 @@
34 def update_function_increment(self, content, stat):34 def update_function_increment(self, content, stat):
35 if not content:35 if not content:
36 return str(0)36 return str(0)
37 return str(int(content)+1)37 return str(int(content) + 1)
3838
39 def setUp(self):39 def setUp(self):
40 super(RetryChangeTest, self).setUp()40 super(RetryChangeTest, self).setUp()
4141
=== modified file 'txzookeeper/tests/utils.py'
--- txzookeeper/tests/utils.py 2011-06-17 23:52:34 +0000
+++ txzookeeper/tests/utils.py 2011-07-05 13:29:55 +0000
@@ -25,9 +25,9 @@
25 Destroy all the nodes in zookeeper (typically under a chroot for testing)25 Destroy all the nodes in zookeeper (typically under a chroot for testing)
26 """26 """
27 for child in zookeeper.get_children(handle, path):27 for child in zookeeper.get_children(handle, path):
28 if child == "zookeeper": # skip the metadata node28 if child == "zookeeper": # skip the metadata node
29 continue29 continue
30 child_path = "/"+("%s/%s"%(path, child)).strip("/")30 child_path = "/" + ("%s/%s" % (path, child)).strip("/")
31 try:31 try:
32 deleteTree(child_path, handle)32 deleteTree(child_path, handle)
33 zookeeper.delete(handle, child_path, -1)33 zookeeper.delete(handle, child_path, -1)

Subscribers

People subscribed via source and target branches

to all changes: