Merge lp:~hazmat/txzookeeper/session-event-handling into lp:txzookeeper
- session-event-handling
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Gustavo Niemeyer | ||||
Approved revision: | 54 | ||||
Merged at revision: | 41 | ||||
Proposed branch: | lp:~hazmat/txzookeeper/session-event-handling | ||||
Merge into: | lp:txzookeeper | ||||
Diff against target: |
1728 lines (+896/-206) 12 files modified
txzookeeper/client.py (+285/-110) txzookeeper/queue.py (+5/-5) txzookeeper/tests/__init__.py (+2/-2) txzookeeper/tests/common.py (+216/-0) txzookeeper/tests/mocker.py (+1/-0) txzookeeper/tests/test_client.py (+131/-78) txzookeeper/tests/test_node.py (+1/-1) txzookeeper/tests/test_queue.py (+5/-5) txzookeeper/tests/test_security.py (+2/-2) txzookeeper/tests/test_session.py (+245/-0) txzookeeper/tests/test_utils.py (+1/-1) txzookeeper/tests/utils.py (+2/-2) |
||||
To merge this branch: | bzr merge lp:~hazmat/txzookeeper/session-event-handling | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gustavo Niemeyer | Approve | ||
Jim Baker | Pending | ||
Review via email: mp+66901@code.launchpad.net |
This proposal supersedes a proposal from 2011-06-20.
Commit message
Description of the change
Session event and connection loss handling for txzookeeper.
- Session tests using a ZK cluster as a test layer/test resource, cluster state
is reset between tests.
- Zookeeper Client session tests for server rotation across multi-node cluster.
- Client server rotation and session/watch migration tests.
- Session event handling, sent to user defined callback, else ignored by default.
- Connection loss handling, sent to user defined callback, else raised at the
API call point. The callback result if any is returned as the error to be
returned to the API.
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal | # |
Gustavo Niemeyer (niemeyer) wrote : Posted in a previous version of this proposal | # |
This looks very good. A few comments, and +1 with them considered.
[1]
+ zookeeper.
+ zookeeper.
+ zookeeper.
Quotes are inconsistent here.
Also, "not-watching" would be a bit more friendly to the eye.
[2]
+def debug_symbols(
Left over?
[3]
error = error_class(
+
+ if ConnectionExcep
This idiom feels a little weird. A Foo is necessarily a Foo,
so we never have Foo.is_foo. Unless you have some background
for the choice that I miss, I suggest just moving that to a
"is_connection_
[4]
+ # The result of the connection error handler is returned
+ # to the api.
That's a nice design, despite our conversations around the problem of
connection level erroring.
I'm a bit concerned about the session events, though. A session-termination
event may be the only event the deferred ever sees, so if we divert it to
a separate callback, the logic pending on the deferred will be dead forever.
I'm not sure about what we should do here. Have you thought about this?
If the answer is to just be so for now, we should at least add a note to
the proper location in the code mentioning that these deferreds are dead.
[5]
+ d = defer.maybeDefe
+ self._connectio
+ self, error)
For consistency, either both callbacks should take "self", or neither of them
should.
[6]
+ # XXX/Debug
+ #print
+ #print "Session/Conn Event", ClientEvent(type, state, path)
+
Some left overs.
[7]
+ self._check_
+ if not d.called:
+ d.callback(True)
This is checking if the deferred is being called synchronously, which seems
dangerous as a pattern. Theorethically, the deferred may not have fired by
the time the function returns.
I suggest using the interface defined (considering the suggestions from
the pre-requisite branch):
if not self._failed(
d.
return d
Jim Baker (jimbaker) wrote : Posted in a previous version of this proposal | # |
+1, we certainly need this branch, and it looks quite solid.
In addition to Gustavo's comments:
[1]
Some minor grammar suggestions:
+ specified in comma separated fashion, if they are,
specified in comma separated fashion. If they are,
+ # If its a session event pass it to the session callback, else
If it's a session event, ...
+ nodes and watches)/ The connection's client id, is also useful
+ when introspecting the server logs for specific client
+ activity.
nodes and watches. The connection's client id is also...
+ Its used for all connection level events and session events.
It's used...
+ # Send session events to the callback, this in addition to any
+ # duplicate session events that will be sent for extant watches.
callback, in addition
+ """Set a callback to recieve session events.
receive
+ twisted integration using deferreds is not capable of recieving
receiving
+ to be invoked with them instead. The callback recieves a single
receives
+ call is made, by setting a connection level error handler,
+ applications can centralize their handling of connection loss,
Something like
By setting...
+ This is a global setting, across connections.
setting across all connections
+ A client session can be reattached to in a separate connection,
+ if the a session is expired, using the zookeeper connection will
+ raise a SessionExpiredE
+ # On occassion we get a session expired event while connecting.
occasion
+ # Closing one connection, will close the session
Closing one connection will close the session.
+ A client connected to multiple servers, will transparently
+ migrate amongst them, as individual servers can no longer be
+ reached. A client's session will be maintined.
servers will transparently...
...will be maintained.
+ # get a zookeeper connectionloss exception on occassion.
occasion
+ We can specify a callback for connection errors, that
+ can perform recovery for a disconnected client, restablishing
This seems incomplete; also there should not be a comma before "that"
+ """On connection failure, An application can choose to use a
+ new connection with to reconnect to a different member of the
+ zookeeper cluster, reacquiring the extant session.
an application... with which to reconnect...
Maybe this concludes with "This enables reacquiring the existing
session." (Extant doesn't seem like the right word choice here.)
+ A large obvious caveat to using a new client instance rather
+ than reconnecting the existing client, is that even though the
instance, rather than...
+ # Ephemeral is destroyed when the session closed.
the session is closed.
[2]
+ http://
+ http://
Why not just expand these links out? Alternatively use a readable URL
(the underlying nabble URL is modestly readable, I don't know how
permalink either one is, however).
[3]
+ assert callable(callback)
Shouldn't this be throwing a TypeError instead? Also, there is no
correponding assertion (or verification) on
set_connection_
[4]
+ def xtest_session_
Do you want to include this test?
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal | # |
Excerpts from Gustavo Niemeyer's message of Mon Jun 20 18:28:28 UTC 2011:
> Review: Approve
> This looks very good. A few comments, and +1 with them considered.
>
1-3 addressed.
>
> [4]
>
> + # The result of the connection error handler is returned
> + # to the api.
>
> That's a nice design, despite our conversations around the problem of
> connection level erroring.
>
> I'm a bit concerned about the session events, though. A session-termination
> event may be the only event the deferred ever sees, so if we divert it to
> a separate callback, the logic pending on the deferred will be dead forever.
>
> I'm not sure about what we should do here. Have you thought about this?
>
> If the answer is to just be so for now, we should at least add a note to
> the proper location in the code mentioning that these deferreds are dead.
>
Any deferreds attached to an unconnected client are dead. A session 'expired' event
is effectively a connection loss, the extant deferreds are dead. The expired
event is the only fatal session event. There is some testing around this (extant
watches on dead clients).
Alternatively, we could route session exceptions to the extant watcher, but then
we're back to the app code having to add guards to each watch for connection loss,
or lots of unhandled exceptions in deferred stacks. The connection exception
watcher will still fire as its always registered on the connection level watcher,
ie. the one that monitors the initial connection setup.
From a cleanup perspective this might be preferrable, but the numbers of extant
watches are small enough its not clear its really win, and most of that cleanup
will just result in ignorable log errors without connection loss handling on all
watches. The error handling itself and the error handling for a watchis basically
just ignore or propgate the exception to fully clear out the deferred chain with
the connection level error callback handling the actual reconnect scenario.
[5-6] Addressed
> [7]
>
> + self._check_
> + if not d.called:
> + d.callback(True)
>
> This is checking if the deferred is being called synchronously, which seems
> dangerous as a pattern. Theorethically, the deferred may not have fired by
> the time the function returns.
>
>
> I suggest using the interface defined (considering the suggestions from
> the pre-requisite branch):
>
> if not self._failed(
> d.callback(True)
> return d
>
In this case the txzk api is synchronous (close) and we're checking just checking
if its failed. I've updated the construct to not check the deferred.called status.
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal | # |
Excerpts from Jim Baker's message of Tue Jun 21 00:18:33 UTC 2011:
> Review: Approve
> +1, we certainly need this branch, and it looks quite solid.
>
> In addition to Gustavo's comments:
>
> [1]
>
Added most of the grammatical changes.
>
> [2]
>
> + http://
> + http://
>
> Why not just expand these links out? Alternatively use a readable URL
> (the underlying nabble URL is modestly readable, I don't know how
> permalink either one is, however).
>
The links are rather ugly by themselves and violate 80 col lines.
>
> [3]
>
> + assert callable(callback)
>
> Shouldn't this be throwing a TypeError instead? Also, there is no
> correponding assertion (or verification) on
> set_connection_
Well the tests definitely exercised it by way of using the functionality of the callback, but i've gone ahead and added the typeerror and invalid callback tests.
>
>
> [4]
>
> + def xtest_session_
>
> Do you want to include this test?
>
nice catch, i did but it was before the implementation of the session work, i've moved the test to test_session
thanks for the review,
cheers,
Kapil
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal | # |
gustavo, even though this branch is approved, i'm going to wait on a final comment/agreement on [4] before merging.
Gustavo Niemeyer (niemeyer) wrote : | # |
Deep conversation about error handling in Ensemble, in the context of item [4] above.
We've agreed to merge this as-is, since clearly there's a lot of work to be made for the desired end result.
Jul 05 17:30:51 <niemeyer> Let me understand this.. are you introducing this logic so that the application we'll use this to crash the agent?
Jul 05 17:30:59 <niemeyer> s/we'll/will
Jul 05 17:31:14 <niemeyer> If that's the case, as we've already agreed, that's ok for now
Jul 05 17:31:15 <hazmat_> that's one approach
Jul 05 17:31:36 <hazmat_> the other is letting the agent stop/start
Jul 05 17:31:47 <niemeyer> I just want to understand what we're doing, though, and what would be the proper way
Jul 05 17:31:53 <hazmat_> i know.. there's dead stuff on the reactor if we start/stop
Jul 05 17:31:58 <niemeyer> Not just that..
Jul 05 17:32:10 <niemeyer> The dead stuff is not my major worry
Jul 05 17:32:21 <niemeyer> The real problem I see is that we simply can't trust any code anymore
Jul 05 17:32:33 <niemeyer> Imagine.. "Oh, I'll loop until file foo is created."
Jul 05 17:32:34 <niemeyer> NOT!
Jul 05 17:32:43 <niemeyer> The loop never dies..
Jul 05 17:32:55 <niemeyer> *anything* we do would be entirely unreliable
Jul 05 17:33:01 <niemeyer> if it's touching a watch
Jul 05 17:33:13 <niemeyer> It's like piecemeal crashing..
Jul 05 17:33:36 <hazmat_> right error handling is application specific
Jul 05 17:34:23 <hazmat_> with the existing watch protocols, its not an issue, except for the case of a hook currently executing (which needs cleanup even in the case of a process shutdown)
Jul 05 17:35:11 <niemeyer> I'm pretty sure I can find cases where it is an issue..
Jul 05 17:35:21 <niemeyer> relation-get ip
Jul 05 17:35:35 <niemeyer> relation-get goes into the agent..
Jul 05 17:35:45 <niemeyer> Never returns again..
Jul 05 17:35:52 <niemeyer> Dead processes
Jul 05 17:36:05 <hazmat_> that's the case i just mentioned
Jul 05 17:36:22 <niemeyer> I see
Jul 05 17:36:34 <niemeyer> I read "watch currently executed" when you said hook
Jul 05 17:36:56 <niemeyer> Anyway.. that's the kind of issue
Jul 05 17:37:07 <niemeyer> There are likely more problematic cases already
Jul 05 17:38:58 <hazmat_> yup.. the interesting case to me, would be something that where we our 'interactively' waiting on a watch, which we don't have, all the existing watch apis are effectively complete on their own (ie. request / response cycles).. if we want to consider arbitrary code.. sure we can construct problem scenarios.. but the existing usage i think is okay..
Jul 05 17:39:00 * hazmat_ brainstorms
Jul 05 17:39:24 <niemeyer> Well, it's already not ok as we have just both spotted
Jul 05 17:40:13 <hazmat_> well the hook scenario is very different then the dead logic waiting issue
Jul 05 17:40:19 <niemeyer> Why?
Jul 05 17:40:33 <hazmat_> it doesn't really care if the session terminates, it just needs an active connection to zk
Jul 05 17:41:06 <niemeyer> Hmmm
Jul 05 17:43:27 <hazmat_> pretty much all of the watch protocols/callbacks are based on examining the current state, and then effecting changes for their area of responsibility to match that current state.. if their restarted, they function t...
Preview Diff
1 | === modified file 'txzookeeper/client.py' | |||
2 | --- txzookeeper/client.py 2011-06-17 23:52:34 +0000 | |||
3 | +++ txzookeeper/client.py 2011-07-05 13:29:55 +0000 | |||
4 | @@ -60,6 +60,24 @@ | |||
5 | 60 | zookeeper.SYSTEMERROR: zookeeper.SystemErrorException, | 60 | zookeeper.SYSTEMERROR: zookeeper.SystemErrorException, |
6 | 61 | zookeeper.UNIMPLEMENTED: zookeeper.UnimplementedException} | 61 | zookeeper.UNIMPLEMENTED: zookeeper.UnimplementedException} |
7 | 62 | 62 | ||
8 | 63 | # Mapping of connection state values to human strings. | ||
9 | 64 | STATE_NAME_MAPPING = { | ||
10 | 65 | zookeeper.ASSOCIATING_STATE: "associating", | ||
11 | 66 | zookeeper.AUTH_FAILED_STATE: "auth-failed", | ||
12 | 67 | zookeeper.CONNECTED_STATE: "connected", | ||
13 | 68 | zookeeper.CONNECTING_STATE: "connecting", | ||
14 | 69 | zookeeper.EXPIRED_SESSION_STATE: "expired", | ||
15 | 70 | } | ||
16 | 71 | |||
17 | 72 | # Mapping of event type to human string. | ||
18 | 73 | TYPE_NAME_MAPPING = { | ||
19 | 74 | zookeeper.NOTWATCHING_EVENT: "not-watching", | ||
20 | 75 | zookeeper.SESSION_EVENT: "session", | ||
21 | 76 | zookeeper.CREATED_EVENT: "created", | ||
22 | 77 | zookeeper.DELETED_EVENT: "deleted", | ||
23 | 78 | zookeeper.CHANGED_EVENT: "changed", | ||
24 | 79 | zookeeper.CHILD_EVENT: "child"} | ||
25 | 80 | |||
26 | 63 | 81 | ||
27 | 64 | class NotConnectedException(zookeeper.ZooKeeperException): | 82 | class NotConnectedException(zookeeper.ZooKeeperException): |
28 | 65 | """ | 83 | """ |
29 | @@ -73,6 +91,29 @@ | |||
30 | 73 | Raised if an error occurs during the client's connection attempt. | 91 | Raised if an error occurs during the client's connection attempt. |
31 | 74 | """ | 92 | """ |
32 | 75 | 93 | ||
33 | 94 | @property | ||
34 | 95 | def state_name(self): | ||
35 | 96 | return STATE_NAME_MAPPING[self.args[2]] | ||
36 | 97 | |||
37 | 98 | @property | ||
38 | 99 | def type_name(self): | ||
39 | 100 | return TYPE_NAME_MAPPING[self.args[1]] | ||
40 | 101 | |||
41 | 102 | def __str__(self): | ||
42 | 103 | return "<txzookeeper.ConnectionException type: %s state: %s>" % ( | ||
43 | 104 | self.type_name, self.state_name) | ||
44 | 105 | |||
45 | 106 | |||
46 | 107 | def is_connection_exception(e): | ||
47 | 108 | """ | ||
48 | 109 | For connection errors in response to api calls, a utility method | ||
49 | 110 | to determine if the cause is a connection exception. | ||
50 | 111 | """ | ||
51 | 112 | return isinstance(e, | ||
52 | 113 | (zookeeper.ClosingException, | ||
53 | 114 | zookeeper.ConnectionLossException, | ||
54 | 115 | zookeeper.SessionExpiredException)) | ||
55 | 116 | |||
56 | 76 | 117 | ||
57 | 77 | class ConnectionTimeoutException(zookeeper.ZooKeeperException): | 118 | class ConnectionTimeoutException(zookeeper.ZooKeeperException): |
58 | 78 | """ | 119 | """ |
59 | @@ -87,94 +128,131 @@ | |||
60 | 87 | some event on the zookeeper client that the watch was requested on. | 128 | some event on the zookeeper client that the watch was requested on. |
61 | 88 | """ | 129 | """ |
62 | 89 | 130 | ||
63 | 90 | type_name_map = { | ||
64 | 91 | -2: "notwatching", | ||
65 | 92 | -1: "session", | ||
66 | 93 | 1: 'created', | ||
67 | 94 | 2: 'deleted', | ||
68 | 95 | 3: 'changed', | ||
69 | 96 | 4: 'child'} | ||
70 | 97 | |||
71 | 98 | @property | 131 | @property |
72 | 99 | def type_name(self): | 132 | def type_name(self): |
74 | 100 | return self.type_name_map[self.type] | 133 | return TYPE_NAME_MAPPING[self.type] |
75 | 134 | |||
76 | 135 | @property | ||
77 | 136 | def state_name(self): | ||
78 | 137 | return STATE_NAME_MAPPING[self.connection_state] | ||
79 | 101 | 138 | ||
80 | 102 | def __repr__(self): | 139 | def __repr__(self): |
82 | 103 | return "<ClientEvent %s at %r>" % (self.type_name, self.path) | 140 | return "<ClientEvent %s at %r state: %s>" % ( |
83 | 141 | self.type_name, self.path, self.state_name) | ||
84 | 104 | 142 | ||
85 | 105 | 143 | ||
86 | 106 | class ZookeeperClient(object): | 144 | class ZookeeperClient(object): |
87 | 107 | """Asynchronous twisted client for zookeeper.""" | 145 | """Asynchronous twisted client for zookeeper.""" |
88 | 108 | 146 | ||
89 | 109 | def __init__(self, servers=None, session_timeout=None): | 147 | def __init__(self, servers=None, session_timeout=None): |
90 | 148 | """ | ||
91 | 149 | @param servers: A string specifying the servers and their | ||
92 | 150 | ports to connect to. Multiple servers can be | ||
93 | 151 | specified in comma separated fashion. if they are, | ||
94 | 152 | then the client will automatically rotate | ||
95 | 153 | among them if a server connection fails. Optionally | ||
96 | 154 | a chroot can be specified. A full server spec looks | ||
97 | 155 | like host:port/chroot_path | ||
98 | 156 | |||
99 | 157 | @param session_timeout: The client's zookeeper session timeout can be | ||
100 | 158 | hinted. The actual value is negotiated between the | ||
101 | 159 | client and server based on their respective | ||
102 | 160 | configurations. | ||
103 | 161 | """ | ||
104 | 110 | self._servers = servers | 162 | self._servers = servers |
105 | 111 | self._session_timeout = session_timeout | 163 | self._session_timeout = session_timeout |
106 | 164 | self._session_event_callback = None | ||
107 | 165 | self._connection_error_callback = None | ||
108 | 112 | self.connected = False | 166 | self.connected = False |
109 | 113 | self.handle = None | 167 | self.handle = None |
110 | 114 | 168 | ||
112 | 115 | def _check_connected(self): | 169 | def _check_connected(self, d): |
113 | 116 | if not self.connected: | 170 | if not self.connected: |
117 | 117 | raise NotConnectedException("not connected") | 171 | d.errback(NotConnectedException("not connected")) |
118 | 118 | 172 | return d | |
119 | 119 | def _check_result(self, result_code, callback=False, extra_codes=()): | 173 | |
120 | 174 | def _check_result(self, result_code, deferred, extra_codes=()): | ||
121 | 175 | """Check an API call or result for errors. | ||
122 | 176 | |||
123 | 177 | :param result_code: The api result code. | ||
124 | 178 | :param deferred: The deferred returned the client api consumer. | ||
125 | 179 | :param extra_codes: Additional result codes accepted as valid/ok. | ||
126 | 180 | |||
127 | 181 | If the result code is an error, an appropriate Exception class | ||
128 | 182 | is constructed and the errback on the deferred is invoked with it. | ||
129 | 183 | """ | ||
130 | 120 | error = None | 184 | error = None |
131 | 121 | if not result_code == zookeeper.OK and not result_code in extra_codes: | 185 | if not result_code == zookeeper.OK and not result_code in extra_codes: |
132 | 122 | error_msg = zookeeper.zerror(result_code) | 186 | error_msg = zookeeper.zerror(result_code) |
133 | 123 | error_class = ERROR_MAPPING.get( | 187 | error_class = ERROR_MAPPING.get( |
134 | 124 | result_code, zookeeper.ZooKeeperException) | 188 | result_code, zookeeper.ZooKeeperException) |
135 | 125 | error = error_class(error_msg) | 189 | error = error_class(error_msg) |
139 | 126 | if callback: | 190 | |
140 | 127 | return error | 191 | if is_connection_exception(error): |
141 | 128 | raise error | 192 | # Mark the client as disconnected. |
142 | 193 | self.connected = False | ||
143 | 194 | # Route connection errors to a connection level error | ||
144 | 195 | # handler if specified. | ||
145 | 196 | if self._connection_error_callback: | ||
146 | 197 | # The result of the connection error handler is returned | ||
147 | 198 | # to the api. | ||
148 | 199 | d = defer.maybeDeferred( | ||
149 | 200 | self._connection_error_callback, | ||
150 | 201 | self, error) | ||
151 | 202 | d.chainDeferred(deferred) | ||
152 | 203 | return True | ||
153 | 204 | |||
154 | 205 | deferred.errback(error) | ||
155 | 206 | return True | ||
156 | 129 | return None | 207 | return None |
157 | 130 | 208 | ||
158 | 131 | def _get(self, path, watcher): | 209 | def _get(self, path, watcher): |
159 | 132 | self._check_connected() | ||
160 | 133 | d = defer.Deferred() | 210 | d = defer.Deferred() |
161 | 211 | if self._check_connected(d): | ||
162 | 212 | return d | ||
163 | 134 | 213 | ||
164 | 135 | def _cb_get(result_code, value, stat): | 214 | def _cb_get(result_code, value, stat): |
168 | 136 | error = self._check_result(result_code, True) | 215 | if self._check_result(result_code, d): |
169 | 137 | if error: | 216 | return |
167 | 138 | return d.errback(error) | ||
170 | 139 | d.callback((value, stat)) | 217 | d.callback((value, stat)) |
171 | 140 | 218 | ||
172 | 141 | callback = self._zk_thread_callback(_cb_get) | 219 | callback = self._zk_thread_callback(_cb_get) |
173 | 142 | watcher = self._wrap_watcher(watcher) | 220 | watcher = self._wrap_watcher(watcher) |
174 | 143 | result = zookeeper.aget(self.handle, path, watcher, callback) | 221 | result = zookeeper.aget(self.handle, path, watcher, callback) |
176 | 144 | self._check_result(result) | 222 | self._check_result(result, d) |
177 | 145 | return d | 223 | return d |
178 | 146 | 224 | ||
179 | 147 | def _get_children(self, path, watcher): | 225 | def _get_children(self, path, watcher): |
180 | 148 | self._check_connected() | ||
181 | 149 | d = defer.Deferred() | 226 | d = defer.Deferred() |
182 | 227 | if self._check_connected(d): | ||
183 | 228 | return d | ||
184 | 150 | 229 | ||
185 | 151 | def _cb_get_children(result_code, children): | 230 | def _cb_get_children(result_code, children): |
189 | 152 | error = self._check_result(result_code, True) | 231 | if self._check_result(result_code, d): |
190 | 153 | if error: | 232 | return |
188 | 154 | return d.errback(error) | ||
191 | 155 | d.callback(children) | 233 | d.callback(children) |
192 | 156 | 234 | ||
193 | 157 | callback = self._zk_thread_callback(_cb_get_children) | 235 | callback = self._zk_thread_callback(_cb_get_children) |
194 | 158 | watcher = self._wrap_watcher(watcher) | 236 | watcher = self._wrap_watcher(watcher) |
195 | 159 | result = zookeeper.aget_children(self.handle, path, watcher, callback) | 237 | result = zookeeper.aget_children(self.handle, path, watcher, callback) |
197 | 160 | self._check_result(result) | 238 | self._check_result(result, d) |
198 | 161 | return d | 239 | return d |
199 | 162 | 240 | ||
200 | 163 | def _exists(self, path, watcher): | 241 | def _exists(self, path, watcher): |
201 | 164 | self._check_connected() | ||
202 | 165 | d = defer.Deferred() | 242 | d = defer.Deferred() |
203 | 243 | if self._check_connected(d): | ||
204 | 244 | return d | ||
205 | 166 | 245 | ||
206 | 167 | def _cb_exists(result_code, stat): | 246 | def _cb_exists(result_code, stat): |
211 | 168 | error = self._check_result( | 247 | if self._check_result( |
212 | 169 | result_code, True, extra_codes=(zookeeper.NONODE,)) | 248 | result_code, d, extra_codes=(zookeeper.NONODE,)): |
213 | 170 | if error: | 249 | return |
210 | 171 | return d.errback(error) | ||
214 | 172 | d.callback(stat) | 250 | d.callback(stat) |
215 | 173 | 251 | ||
216 | 174 | callback = self._zk_thread_callback(_cb_exists) | 252 | callback = self._zk_thread_callback(_cb_exists) |
217 | 175 | watcher = self._wrap_watcher(watcher) | 253 | watcher = self._wrap_watcher(watcher) |
218 | 176 | result = zookeeper.aexists(self.handle, path, watcher, callback) | 254 | result = zookeeper.aexists(self.handle, path, watcher, callback) |
220 | 177 | self._check_result(result) | 255 | self._check_result(result, d) |
221 | 178 | return d | 256 | return d |
222 | 179 | 257 | ||
223 | 180 | def _wrap_watcher(self, watcher): | 258 | def _wrap_watcher(self, watcher): |
224 | @@ -182,7 +260,22 @@ | |||
225 | 182 | return watcher | 260 | return watcher |
226 | 183 | if not callable(watcher): | 261 | if not callable(watcher): |
227 | 184 | raise SyntaxError("invalid watcher") | 262 | raise SyntaxError("invalid watcher") |
229 | 185 | return self._zk_thread_callback(watcher) | 263 | return self._zk_thread_callback( |
230 | 264 | partial(self._session_event_wrapper, watcher)) | ||
231 | 265 | |||
232 | 266 | def _session_event_wrapper(self, watcher, event_type, conn_state, path): | ||
233 | 267 | """Watch wrapper that diverts session events to a connection callback. | ||
234 | 268 | """ | ||
235 | 269 | # If it's a session event pass it to the session callback, else | ||
236 | 270 | # ignore it. Session events are sent repeatedly to watchers | ||
237 | 271 | # which we have modeled after deferred, which only accept a | ||
238 | 272 | # single return value. | ||
239 | 273 | if event_type == zookeeper.SESSION_EVENT: | ||
240 | 274 | if self._session_event_callback: | ||
241 | 275 | self._session_event_callback( | ||
242 | 276 | self, ClientEvent(event_type, conn_state, path)) | ||
243 | 277 | else: | ||
244 | 278 | return watcher(event_type, conn_state, path) | ||
245 | 186 | 279 | ||
246 | 187 | def _zk_thread_callback(self, func): | 280 | def _zk_thread_callback(self, func): |
247 | 188 | """ | 281 | """ |
248 | @@ -190,7 +283,6 @@ | |||
249 | 190 | any user defined callback so that they are called back in the main | 283 | any user defined callback so that they are called back in the main |
250 | 191 | thread after, zookeeper calls the wrapper. | 284 | thread after, zookeeper calls the wrapper. |
251 | 192 | """ | 285 | """ |
252 | 193 | |||
253 | 194 | def wrapper(handle, *args): # pragma: no cover | 286 | def wrapper(handle, *args): # pragma: no cover |
254 | 195 | reactor.callFromThread(func, *args) | 287 | reactor.callFromThread(func, *args) |
255 | 196 | return wrapper | 288 | return wrapper |
256 | @@ -207,6 +299,7 @@ | |||
257 | 207 | def session_timeout(self): | 299 | def session_timeout(self): |
258 | 208 | """ | 300 | """ |
259 | 209 | What's the negotiated session timeout for this connection, in seconds. | 301 | What's the negotiated session timeout for this connection, in seconds. |
260 | 302 | If the client is not connected the value is None. | ||
261 | 210 | """ | 303 | """ |
262 | 211 | if self.connected: | 304 | if self.connected: |
263 | 212 | return zookeeper.recv_timeout(self.handle) | 305 | return zookeeper.recv_timeout(self.handle) |
264 | @@ -222,9 +315,14 @@ | |||
265 | 222 | 315 | ||
266 | 223 | @property | 316 | @property |
267 | 224 | def client_id(self): | 317 | def client_id(self): |
271 | 225 | """ | 318 | """Returns the client id that identifies the server side session. |
272 | 226 | The connection's client id, useful when introspecting the server logs | 319 | |
273 | 227 | for specific client activity. | 320 | A client id is a tuple represented by the session id and |
274 | 321 | session password. It can be used to manually connect to an | ||
275 | 322 | extant server session (which contains associated ephemeral | ||
276 | 323 | nodes and watches)/ The connection's client id is also useful | ||
277 | 324 | when introspecting the server logs for specific client | ||
278 | 325 | activity. | ||
279 | 228 | """ | 326 | """ |
280 | 229 | if self.handle is None: | 327 | if self.handle is None: |
281 | 230 | return None | 328 | return None |
282 | @@ -239,34 +337,33 @@ | |||
283 | 239 | return bool(zookeeper.is_unrecoverable(self.handle)) | 337 | return bool(zookeeper.is_unrecoverable(self.handle)) |
284 | 240 | 338 | ||
285 | 241 | def add_auth(self, scheme, identity): | 339 | def add_auth(self, scheme, identity): |
290 | 242 | """ | 340 | """Adds an authentication identity to this connection. |
291 | 243 | Adds an authentication identity to this connection. A connection | 341 | |
292 | 244 | can use multiple authentication identities at the same time, all | 342 | A connection can use multiple authentication identities at the |
293 | 245 | are checked when verifying acls on a node. | 343 | same time, all are checked when verifying acls on a node. |
294 | 246 | 344 | ||
295 | 247 | @param scheme: a string specifying a an authentication scheme | 345 | @param scheme: a string specifying a an authentication scheme |
296 | 248 | valid values include 'digest'. | 346 | valid values include 'digest'. |
299 | 249 | @param identity: a string containingusername and password colon | 347 | @param identity: a string containing username and password colon |
300 | 250 | separated for example 'mary:apples' | 348 | separated, for example 'mary:apples' |
301 | 251 | """ | 349 | """ |
302 | 252 | self._check_connected() | ||
303 | 253 | d = defer.Deferred() | 350 | d = defer.Deferred() |
304 | 351 | if self._check_connected(d): | ||
305 | 352 | return d | ||
306 | 254 | 353 | ||
307 | 255 | def _cb_authenticated(result_code): | 354 | def _cb_authenticated(result_code): |
311 | 256 | error = self._check_result(result_code, True) | 355 | if self._check_result(result_code, d): |
312 | 257 | if error: | 356 | return |
310 | 258 | return d.errback(error) | ||
313 | 259 | d.callback(self) | 357 | d.callback(self) |
314 | 260 | 358 | ||
315 | 261 | callback = self._zk_thread_callback(_cb_authenticated) | 359 | callback = self._zk_thread_callback(_cb_authenticated) |
316 | 262 | result = zookeeper.add_auth(self.handle, scheme, identity, callback) | 360 | result = zookeeper.add_auth(self.handle, scheme, identity, callback) |
318 | 263 | self._check_result(result) | 361 | self._check_result(result, d) |
319 | 264 | return d | 362 | return d |
320 | 265 | 363 | ||
321 | 266 | def close(self, force=False): | 364 | def close(self, force=False): |
322 | 267 | """ | 365 | """ |
325 | 268 | Close the underlying socket connection and zookeeper server side | 366 | Close the underlying socket connection and server side session. |
324 | 269 | session. | ||
326 | 270 | 367 | ||
327 | 271 | @param force: boolean, require the connection to be closed now or | 368 | @param force: boolean, require the connection to be closed now or |
328 | 272 | an exception be raised. | 369 | an exception be raised. |
329 | @@ -276,23 +373,31 @@ | |||
330 | 276 | 373 | ||
331 | 277 | result = zookeeper.close(self.handle) | 374 | result = zookeeper.close(self.handle) |
332 | 278 | self.connected = False | 375 | self.connected = False |
337 | 279 | self._check_result(result) | 376 | d = defer.Deferred() |
338 | 280 | return result | 377 | |
339 | 281 | 378 | if self._check_result(result, d): | |
340 | 282 | def connect(self, servers=None, timeout=10): | 379 | return d |
341 | 380 | d.callback(True) | ||
342 | 381 | return d | ||
343 | 382 | |||
344 | 383 | def connect(self, servers=None, timeout=10, client_id=None): | ||
345 | 283 | """ | 384 | """ |
346 | 284 | Establish a connection to the given zookeeper server(s). | 385 | Establish a connection to the given zookeeper server(s). |
347 | 285 | 386 | ||
348 | 286 | @param servers: A string specifying the servers and their ports to | 387 | @param servers: A string specifying the servers and their ports to |
350 | 287 | connect to. | 388 | connect to. Multiple servers can be specified in |
351 | 389 | comma separated fashion. | ||
352 | 288 | @param timeout: How many seconds to wait on a connection to the | 390 | @param timeout: How many seconds to wait on a connection to the |
353 | 289 | zookeeper servers. | 391 | zookeeper servers. |
354 | 392 | |||
355 | 393 | @param session_id: | ||
356 | 290 | @returns A deferred that's fired when the connection is established. | 394 | @returns A deferred that's fired when the connection is established. |
357 | 291 | """ | 395 | """ |
358 | 292 | d = defer.Deferred() | 396 | d = defer.Deferred() |
359 | 293 | 397 | ||
360 | 294 | if self.connected: | 398 | if self.connected: |
362 | 295 | raise zookeeper.ZooKeeperException("Already Connected") | 399 | return defer.fail( |
363 | 400 | zookeeper.ZooKeeperException("Already Connected")) | ||
364 | 296 | 401 | ||
365 | 297 | # Use a scheduled function to ensure a timeout. | 402 | # Use a scheduled function to ensure a timeout. |
366 | 298 | def _check_timeout(): | 403 | def _check_timeout(): |
367 | @@ -311,31 +416,45 @@ | |||
368 | 311 | if servers is not None: | 416 | if servers is not None: |
369 | 312 | self._servers = servers | 417 | self._servers = servers |
370 | 313 | 418 | ||
373 | 314 | self.handle = zookeeper.init( | 419 | # Assemble client id if specified. |
374 | 315 | self._servers, callback, self._session_timeout) | 420 | if client_id: |
375 | 421 | self.handle = zookeeper.init( | ||
376 | 422 | self._servers, callback, self._session_timeout, client_id) | ||
377 | 423 | else: | ||
378 | 424 | self.handle = zookeeper.init( | ||
379 | 425 | self._servers, callback, self._session_timeout) | ||
380 | 316 | 426 | ||
381 | 317 | return d | 427 | return d |
382 | 318 | 428 | ||
383 | 319 | def _cb_connected( | 429 | def _cb_connected( |
384 | 320 | self, scheduled_timeout, connect_deferred, type, state, path): | 430 | self, scheduled_timeout, connect_deferred, type, state, path): |
385 | 431 | """This callback is invoked through the lifecycle of the connection. | ||
386 | 321 | 432 | ||
387 | 433 | It's used for all connection level events and session events. | ||
388 | 434 | """ | ||
389 | 322 | # Cancel the timeout delayed task if it hasn't fired. | 435 | # Cancel the timeout delayed task if it hasn't fired. |
390 | 323 | if scheduled_timeout.active(): | 436 | if scheduled_timeout.active(): |
391 | 324 | scheduled_timeout.cancel() | 437 | scheduled_timeout.cancel() |
392 | 325 | 438 | ||
393 | 439 | # Update connected boolean | ||
394 | 440 | if state == zookeeper.CONNECTED_STATE: | ||
395 | 441 | self.connected = True | ||
396 | 442 | elif state != zookeeper.CONNECTING_STATE: | ||
397 | 443 | self.connected = False | ||
398 | 444 | |||
399 | 326 | if connect_deferred.called: | 445 | if connect_deferred.called: |
400 | 327 | # If we timed out and then connected, then close the conn. | 446 | # If we timed out and then connected, then close the conn. |
403 | 328 | if state == zookeeper.CONNECTED_STATE: | 447 | if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called: |
402 | 329 | self.connected = True | ||
404 | 330 | self.close() | 448 | self.close() |
409 | 331 | # If the client is reused across multiple connect/close | 449 | |
410 | 332 | # cycles, and a previous connect timed out, then a | 450 | # Send session events to the callback, in addition to any |
411 | 333 | # subsequent connect may trigger the previous connect's | 451 | # duplicate session events that will be sent for extant watches. |
412 | 334 | # handler notifying of a CONNECTING_STATE, ignore. | 452 | if self._session_event_callback: |
413 | 453 | self._session_event_callback( | ||
414 | 454 | self, ClientEvent(type, state, path)) | ||
415 | 455 | |||
416 | 335 | return | 456 | return |
417 | 336 | elif state == zookeeper.CONNECTED_STATE: | 457 | elif state == zookeeper.CONNECTED_STATE: |
418 | 337 | # Connection established. | ||
419 | 338 | self.connected = True | ||
420 | 339 | connect_deferred.callback(self) | 458 | connect_deferred.callback(self) |
421 | 340 | return | 459 | return |
422 | 341 | 460 | ||
423 | @@ -351,19 +470,19 @@ | |||
424 | 351 | @params acls: A list of dictionaries specifying permissions. | 470 | @params acls: A list of dictionaries specifying permissions. |
425 | 352 | @params flags: Node creation flags (ephemeral, sequence, persistent) | 471 | @params flags: Node creation flags (ephemeral, sequence, persistent) |
426 | 353 | """ | 472 | """ |
427 | 354 | self._check_connected() | ||
428 | 355 | d = defer.Deferred() | 473 | d = defer.Deferred() |
429 | 474 | if self._check_connected(d): | ||
430 | 475 | return d | ||
431 | 356 | 476 | ||
432 | 357 | def _cb_created(result_code, path): | 477 | def _cb_created(result_code, path): |
436 | 358 | error = self._check_result(result_code, True) | 478 | if self._check_result(result_code, d): |
437 | 359 | if error: | 479 | return |
435 | 360 | return d.errback(error) | ||
438 | 361 | d.callback(path) | 480 | d.callback(path) |
439 | 362 | 481 | ||
440 | 363 | callback = self._zk_thread_callback(_cb_created) | 482 | callback = self._zk_thread_callback(_cb_created) |
441 | 364 | result = zookeeper.acreate( | 483 | result = zookeeper.acreate( |
442 | 365 | self.handle, path, data, acls, flags, callback) | 484 | self.handle, path, data, acls, flags, callback) |
444 | 366 | self._check_result(result) | 485 | self._check_result(result, d) |
445 | 367 | return d | 486 | return d |
446 | 368 | 487 | ||
447 | 369 | def delete(self, path, version=-1): | 488 | def delete(self, path, version=-1): |
448 | @@ -376,18 +495,18 @@ | |||
449 | 376 | @param path: the path of the node to be deleted. | 495 | @param path: the path of the node to be deleted. |
450 | 377 | @param version: the integer version of the node. | 496 | @param version: the integer version of the node. |
451 | 378 | """ | 497 | """ |
452 | 379 | self._check_connected() | ||
453 | 380 | d = defer.Deferred() | 498 | d = defer.Deferred() |
454 | 499 | if self._check_connected(d): | ||
455 | 500 | return d | ||
456 | 381 | 501 | ||
457 | 382 | def _cb_delete(result_code): | 502 | def _cb_delete(result_code): |
461 | 383 | error = self._check_result(result_code, True) | 503 | if self._check_result(result_code, d): |
462 | 384 | if error: | 504 | return |
460 | 385 | return d.errback(error) | ||
463 | 386 | d.callback(result_code) | 505 | d.callback(result_code) |
464 | 387 | 506 | ||
465 | 388 | callback = self._zk_thread_callback(_cb_delete) | 507 | callback = self._zk_thread_callback(_cb_delete) |
466 | 389 | result = zookeeper.adelete(self.handle, path, version, callback) | 508 | result = zookeeper.adelete(self.handle, path, version, callback) |
468 | 390 | self._check_result(result) | 509 | self._check_result(result, d) |
469 | 391 | return d | 510 | return d |
470 | 392 | 511 | ||
471 | 393 | def exists(self, path): | 512 | def exists(self, path): |
472 | @@ -395,8 +514,9 @@ | |||
473 | 395 | Check that the given node path exists. Returns a deferred that | 514 | Check that the given node path exists. Returns a deferred that |
474 | 396 | holds the node stat information if the node exists (created, | 515 | holds the node stat information if the node exists (created, |
475 | 397 | modified, version, etc.), or ``None`` if it does not exist. | 516 | modified, version, etc.), or ``None`` if it does not exist. |
476 | 517 | |||
477 | 518 | @param path: The path of the node whose existence will be checked. | ||
478 | 398 | """ | 519 | """ |
479 | 399 | |||
480 | 400 | return self._exists(path, None) | 520 | return self._exists(path, None) |
481 | 401 | 521 | ||
482 | 402 | def exists_and_watch(self, path): | 522 | def exists_and_watch(self, path): |
483 | @@ -406,20 +526,22 @@ | |||
484 | 406 | In addition to the deferred method result, this method returns | 526 | In addition to the deferred method result, this method returns |
485 | 407 | a deferred that is called back when the node is modified or | 527 | a deferred that is called back when the node is modified or |
486 | 408 | removed (once). | 528 | removed (once). |
487 | 529 | |||
488 | 530 | @param path: The path of the node whose existence will be checked. | ||
489 | 409 | """ | 531 | """ |
490 | 410 | |||
491 | 411 | d = defer.Deferred() | 532 | d = defer.Deferred() |
492 | 412 | 533 | ||
494 | 413 | def callback(*args): | 534 | def watcher(*args): |
495 | 414 | d.callback(ClientEvent(*args)) | 535 | d.callback(ClientEvent(*args)) |
497 | 415 | return self._exists(path, callback), d | 536 | return self._exists(path, watcher), d |
498 | 416 | 537 | ||
499 | 417 | def get(self, path): | 538 | def get(self, path): |
500 | 418 | """ | 539 | """ |
501 | 419 | Get the node's data for the given node path. Returns a | 540 | Get the node's data for the given node path. Returns a |
502 | 420 | deferred that holds the content of the node. | 541 | deferred that holds the content of the node. |
503 | 542 | |||
504 | 543 | @param path: The path of the node whose content will be retrieved. | ||
505 | 421 | """ | 544 | """ |
506 | 422 | |||
507 | 423 | return self._get(path, None) | 545 | return self._get(path, None) |
508 | 424 | 546 | ||
509 | 425 | def get_and_watch(self, path): | 547 | def get_and_watch(self, path): |
510 | @@ -429,17 +551,20 @@ | |||
511 | 429 | In addition to the deferred method result, this method returns | 551 | In addition to the deferred method result, this method returns |
512 | 430 | a deferred that is called back when the node is modified or | 552 | a deferred that is called back when the node is modified or |
513 | 431 | removed (once). | 553 | removed (once). |
514 | 554 | |||
515 | 555 | @param path: The path of the node whose content will be retrieved. | ||
516 | 432 | """ | 556 | """ |
517 | 433 | |||
518 | 434 | d = defer.Deferred() | 557 | d = defer.Deferred() |
519 | 435 | 558 | ||
521 | 436 | def callback(*args): | 559 | def watcher(*args): |
522 | 437 | d.callback(ClientEvent(*args)) | 560 | d.callback(ClientEvent(*args)) |
524 | 438 | return self._get(path, callback), d | 561 | return self._get(path, watcher), d |
525 | 439 | 562 | ||
526 | 440 | def get_children(self, path): | 563 | def get_children(self, path): |
527 | 441 | """ | 564 | """ |
528 | 442 | Get the ids of all children directly under the given path. | 565 | Get the ids of all children directly under the given path. |
529 | 566 | |||
530 | 567 | @param path: The path of the node whose children will be retrieved. | ||
531 | 443 | """ | 568 | """ |
532 | 444 | return self._get_children(path, None) | 569 | return self._get_children(path, None) |
533 | 445 | 570 | ||
534 | @@ -450,13 +575,14 @@ | |||
535 | 450 | In addition to the deferred method result, this method returns | 575 | In addition to the deferred method result, this method returns |
536 | 451 | a deferred that is called back when a change happens on the | 576 | a deferred that is called back when a change happens on the |
537 | 452 | provided path (once). | 577 | provided path (once). |
538 | 578 | |||
539 | 579 | @param path: The path of the node whose children will be retrieved. | ||
540 | 453 | """ | 580 | """ |
541 | 454 | |||
542 | 455 | d = defer.Deferred() | 581 | d = defer.Deferred() |
543 | 456 | 582 | ||
545 | 457 | def callback(*args): | 583 | def watcher(*args): |
546 | 458 | d.callback(ClientEvent(*args)) | 584 | d.callback(ClientEvent(*args)) |
548 | 459 | return self._get_children(path, callback), d | 585 | return self._get_children(path, watcher), d |
549 | 460 | 586 | ||
550 | 461 | def get_acl(self, path): | 587 | def get_acl(self, path): |
551 | 462 | """ | 588 | """ |
552 | @@ -464,19 +590,21 @@ | |||
553 | 464 | 590 | ||
554 | 465 | Each acl is a dictionary containing keys/values for scheme, id, | 591 | Each acl is a dictionary containing keys/values for scheme, id, |
555 | 466 | and perms. | 592 | and perms. |
556 | 593 | |||
557 | 594 | @param path: The path of the node whose acl will be retrieved. | ||
558 | 467 | """ | 595 | """ |
559 | 468 | self._check_connected() | ||
560 | 469 | d = defer.Deferred() | 596 | d = defer.Deferred() |
561 | 597 | if self._check_connected(d): | ||
562 | 598 | return d | ||
563 | 470 | 599 | ||
564 | 471 | def _cb_get_acl(result_code, acls, stat): | 600 | def _cb_get_acl(result_code, acls, stat): |
568 | 472 | error = self._check_result(result_code, True) | 601 | if self._check_result(result_code, d): |
569 | 473 | if error: | 602 | return |
567 | 474 | return d.errback(error) | ||
570 | 475 | d.callback((acls, stat)) | 603 | d.callback((acls, stat)) |
571 | 476 | 604 | ||
572 | 477 | callback = self._zk_thread_callback(_cb_get_acl) | 605 | callback = self._zk_thread_callback(_cb_get_acl) |
573 | 478 | result = zookeeper.aget_acl(self.handle, path, callback) | 606 | result = zookeeper.aget_acl(self.handle, path, callback) |
575 | 479 | self._check_result(result) | 607 | self._check_result(result, d) |
576 | 480 | return d | 608 | return d |
577 | 481 | 609 | ||
578 | 482 | def set_acl(self, path, acls, version=-1): | 610 | def set_acl(self, path, acls, version=-1): |
579 | @@ -502,19 +630,19 @@ | |||
580 | 502 | doesn't match the version on the server, then a | 630 | doesn't match the version on the server, then a |
581 | 503 | BadVersionException is raised. | 631 | BadVersionException is raised. |
582 | 504 | """ | 632 | """ |
583 | 505 | self._check_connected() | ||
584 | 506 | d = defer.Deferred() | 633 | d = defer.Deferred() |
585 | 634 | if self._check_connected(d): | ||
586 | 635 | return d | ||
587 | 507 | 636 | ||
588 | 508 | def _cb_set_acl(result_code): | 637 | def _cb_set_acl(result_code): |
592 | 509 | error = self._check_result(result_code, True) | 638 | if self._check_result(result_code, d): |
593 | 510 | if error: | 639 | return |
591 | 511 | return d.errback(error) | ||
594 | 512 | d.callback(result_code) | 640 | d.callback(result_code) |
595 | 513 | 641 | ||
596 | 514 | callback = self._zk_thread_callback(_cb_set_acl) | 642 | callback = self._zk_thread_callback(_cb_set_acl) |
597 | 515 | result = zookeeper.aset_acl( | 643 | result = zookeeper.aset_acl( |
598 | 516 | self.handle, path, version, acls, callback) | 644 | self.handle, path, version, acls, callback) |
600 | 517 | self._check_result(result) | 645 | self._check_result(result, d) |
601 | 518 | return d | 646 | return d |
602 | 519 | 647 | ||
603 | 520 | def set(self, path, data="", version=-1): | 648 | def set(self, path, data="", version=-1): |
604 | @@ -528,18 +656,18 @@ | |||
605 | 528 | @param data: The data to store on the node. | 656 | @param data: The data to store on the node. |
606 | 529 | @param version: Integer version value | 657 | @param version: Integer version value |
607 | 530 | """ | 658 | """ |
608 | 531 | self._check_connected() | ||
609 | 532 | d = defer.Deferred() | 659 | d = defer.Deferred() |
610 | 660 | if self._check_connected(d): | ||
611 | 661 | return d | ||
612 | 533 | 662 | ||
613 | 534 | def _cb_set(result_code, node_stat): | 663 | def _cb_set(result_code, node_stat): |
617 | 535 | error = self._check_result(result_code, True) | 664 | if self._check_result(result_code, d): |
618 | 536 | if error: | 665 | return |
616 | 537 | return d.errback(error) | ||
619 | 538 | d.callback(node_stat) | 666 | d.callback(node_stat) |
620 | 539 | 667 | ||
621 | 540 | callback = self._zk_thread_callback(_cb_set) | 668 | callback = self._zk_thread_callback(_cb_set) |
622 | 541 | result = zookeeper.aset(self.handle, path, data, version, callback) | 669 | result = zookeeper.aset(self.handle, path, data, version, callback) |
624 | 542 | self._check_result(result) | 670 | self._check_result(result, d) |
625 | 543 | return d | 671 | return d |
626 | 544 | 672 | ||
627 | 545 | def set_connection_watcher(self, watcher): | 673 | def set_connection_watcher(self, watcher): |
628 | @@ -549,25 +677,72 @@ | |||
629 | 549 | 677 | ||
630 | 550 | @param: watcher function | 678 | @param: watcher function |
631 | 551 | """ | 679 | """ |
632 | 680 | if not callable(watcher): | ||
633 | 681 | raise SyntaxError("Invalid Watcher %r" % (watcher)) | ||
634 | 552 | watcher = self._wrap_watcher(watcher) | 682 | watcher = self._wrap_watcher(watcher) |
635 | 553 | zookeeper.set_watcher(self.handle, watcher) | 683 | zookeeper.set_watcher(self.handle, watcher) |
636 | 554 | 684 | ||
637 | 685 | def set_session_callback(self, callback): | ||
638 | 686 | """Set a callback to receive session events. | ||
639 | 687 | |||
640 | 688 | Session events are by default ignored. Interested applications | ||
641 | 689 | may choose to set a session event watcher on the connection | ||
642 | 690 | to receive session events. Session events are typically broadcast | ||
643 | 691 | by the libzookeeper library to all extant watchers, but the | ||
644 | 692 | twisted integration using deferreds is not capable of receiving | ||
645 | 693 | multiple values (session events and watch events), so this | ||
646 | 694 | client implementation instead provides for a user defined callback | ||
647 | 695 | to be invoked with them instead. The callback receives a single | ||
648 | 696 | parameter, the session event in the form of a ClientEvent instance. | ||
649 | 697 | |||
650 | 698 | Additional details on session events | ||
651 | 699 | ------------------------------------ | ||
652 | 700 | http://bit.ly/mQrOMY | ||
653 | 701 | http://bit.ly/irKpfn | ||
654 | 702 | """ | ||
655 | 703 | if not callable(callback): | ||
656 | 704 | raise TypeError("Invalid callback %r" % callback) | ||
657 | 705 | self._session_event_callback = callback | ||
658 | 706 | |||
659 | 707 | def set_connection_error_callback(self, callback): | ||
660 | 708 | """Set a callback to receive connection error exceptions. | ||
661 | 709 | |||
662 | 710 | By default the error will be raised when the client API | ||
663 | 711 | call is made. Setting a connection level error handler allows | ||
664 | 712 | applications to centralize their handling of connection loss, | ||
665 | 713 | instead of having to guard every zk interaction. | ||
666 | 714 | |||
667 | 715 | The callback receives two parameters, the client instance | ||
668 | 716 | and the exception. | ||
669 | 717 | """ | ||
670 | 718 | if not callable(callback): | ||
671 | 719 | raise TypeError("Invalid callback %r" % callback) | ||
672 | 720 | self._connection_error_callback = callback | ||
673 | 721 | |||
674 | 722 | def set_determinstic_order(self, boolean): | ||
675 | 723 | """ | ||
676 | 724 | The zookeeper client will by default randomize the server hosts | ||
677 | 725 | it will connect to unless this is set to True. | ||
678 | 726 | |||
679 | 727 | This is a global setting across connections. | ||
680 | 728 | """ | ||
681 | 729 | zookeeper.deterministic_conn_order(bool(boolean)) | ||
682 | 730 | |||
683 | 555 | def sync(self, path="/"): | 731 | def sync(self, path="/"): |
686 | 556 | """ | 732 | """Flushes the connected zookeeper server with the leader. |
685 | 557 | Flushes the zookeeper connection to the leader. | ||
687 | 558 | 733 | ||
688 | 559 | @param path: The root path to flush, all child nodes are also flushed. | 734 | @param path: The root path to flush, all child nodes are also flushed. |
689 | 560 | """ | 735 | """ |
690 | 561 | self._check_connected() | ||
691 | 562 | d = defer.Deferred() | 736 | d = defer.Deferred() |
692 | 737 | if self._check_connected(d): | ||
693 | 738 | return d | ||
694 | 563 | 739 | ||
695 | 564 | def _cb_sync(result_code, path): | 740 | def _cb_sync(result_code, path): |
699 | 565 | error = self._check_result(result_code, True) | 741 | if self._check_result(result_code, d): |
700 | 566 | if error: | 742 | return |
698 | 567 | return d.errback(error) | ||
701 | 568 | d.callback(path) | 743 | d.callback(path) |
702 | 569 | 744 | ||
703 | 570 | callback = self._zk_thread_callback(_cb_sync) | 745 | callback = self._zk_thread_callback(_cb_sync) |
704 | 571 | result = zookeeper.async(self.handle, path, callback) | 746 | result = zookeeper.async(self.handle, path, callback) |
706 | 572 | self._check_result(result) | 747 | self._check_result(result, d) |
707 | 573 | return d | 748 | return d |
708 | 574 | 749 | ||
709 | === modified file 'txzookeeper/queue.py' | |||
710 | --- txzookeeper/queue.py 2011-06-17 23:52:34 +0000 | |||
711 | +++ txzookeeper/queue.py 2011-07-05 13:29:55 +0000 | |||
712 | @@ -90,7 +90,7 @@ | |||
713 | 90 | def on_queue_items_changed(*args): | 90 | def on_queue_items_changed(*args): |
714 | 91 | """Event watcher on queue node child events.""" | 91 | """Event watcher on queue node child events.""" |
715 | 92 | if request.complete or not self._client.connected: | 92 | if request.complete or not self._client.connected: |
717 | 93 | return # pragma: no cover | 93 | return # pragma: no cover |
718 | 94 | 94 | ||
719 | 95 | if request.processing_children: | 95 | if request.processing_children: |
720 | 96 | # If deferred stack is currently processing a set of children | 96 | # If deferred stack is currently processing a set of children |
721 | @@ -273,7 +273,7 @@ | |||
722 | 273 | """ | 273 | """ |
723 | 274 | 274 | ||
724 | 275 | def _item_processed_callback(self, result_code, item_path): | 275 | def _item_processed_callback(self, result_code, item_path): |
726 | 276 | return self._client.delete(item_path+"-processing") | 276 | return self._client.delete(item_path + "-processing") |
727 | 277 | 277 | ||
728 | 278 | def _filter_children(self, children, suffix="-processing"): | 278 | def _filter_children(self, children, suffix="-processing"): |
729 | 279 | """ | 279 | """ |
730 | @@ -300,7 +300,7 @@ | |||
731 | 300 | 300 | ||
732 | 301 | def on_node_exists(stat, path): | 301 | def on_node_exists(stat, path): |
733 | 302 | """Reserve the node for consumer processing.""" | 302 | """Reserve the node for consumer processing.""" |
735 | 303 | d = self._client.create(path+"-processing", | 303 | d = self._client.create(path + "-processing", |
736 | 304 | flags=zookeeper.EPHEMERAL) | 304 | flags=zookeeper.EPHEMERAL) |
737 | 305 | d.addCallback(on_reservation_success, path) | 305 | d.addCallback(on_reservation_success, path) |
738 | 306 | d.addErrback(on_reservation_failed) | 306 | d.addErrback(on_reservation_failed) |
739 | @@ -315,7 +315,7 @@ | |||
740 | 315 | 315 | ||
741 | 316 | def on_get_node_failed(failure, path): | 316 | def on_get_node_failed(failure, path): |
742 | 317 | """If we can't fetch the node, delete the processing node.""" | 317 | """If we can't fetch the node, delete the processing node.""" |
744 | 318 | d = self._client.delete(path+"-processing") | 318 | d = self._client.delete(path + "-processing") |
745 | 319 | 319 | ||
746 | 320 | # propogate unexpected errors appropriately | 320 | # propogate unexpected errors appropriately |
747 | 321 | if not failure.check(zookeeper.NoNodeException): | 321 | if not failure.check(zookeeper.NoNodeException): |
748 | @@ -372,7 +372,7 @@ | |||
749 | 372 | 372 | ||
750 | 373 | def __init__(self, path, client, acl=None, persistent=False): | 373 | def __init__(self, path, client, acl=None, persistent=False): |
751 | 374 | super(SerializedQueue, self).__init__(path, client, acl, persistent) | 374 | super(SerializedQueue, self).__init__(path, client, acl, persistent) |
753 | 375 | self._lock = Lock("%s/%s"%(self.path, "_lock"), client) | 375 | self._lock = Lock("%s/%s" % (self.path, "_lock"), client) |
754 | 376 | 376 | ||
755 | 377 | def _item_processed_callback(self, result_code, item_path): | 377 | def _item_processed_callback(self, result_code, item_path): |
756 | 378 | return self._lock.release() | 378 | return self._lock.release() |
757 | 379 | 379 | ||
758 | === modified file 'txzookeeper/tests/__init__.py' | |||
759 | --- txzookeeper/tests/__init__.py 2011-06-17 23:52:34 +0000 | |||
760 | +++ txzookeeper/tests/__init__.py 2011-07-05 13:29:55 +0000 | |||
761 | @@ -36,8 +36,8 @@ | |||
762 | 36 | 36 | ||
763 | 37 | def tearDown(self): | 37 | def tearDown(self): |
764 | 38 | super(ZookeeperTestCase, self).tearDown() | 38 | super(ZookeeperTestCase, self).tearDown() |
767 | 39 | zookeeper.set_log_stream(sys.stderr) # reset to default | 39 | #zookeeper.set_log_stream(sys.stderr) # reset to default |
768 | 40 | zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG) | 40 | #zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG) |
769 | 41 | self.log_file.close() | 41 | self.log_file.close() |
770 | 42 | 42 | ||
771 | 43 | def get_log(self): | 43 | def get_log(self): |
772 | 44 | 44 | ||
773 | === added file 'txzookeeper/tests/common.py' | |||
774 | --- txzookeeper/tests/common.py 1970-01-01 00:00:00 +0000 | |||
775 | +++ txzookeeper/tests/common.py 2011-07-05 13:29:55 +0000 | |||
776 | @@ -0,0 +1,216 @@ | |||
777 | 1 | import os | ||
778 | 2 | import os.path | ||
779 | 3 | import shutil | ||
780 | 4 | import subprocess | ||
781 | 5 | import tempfile | ||
782 | 6 | |||
783 | 7 | from itertools import chain | ||
784 | 8 | from collections import namedtuple | ||
785 | 9 | from glob import glob | ||
786 | 10 | |||
787 | 11 | |||
788 | 12 | ServerInfo = namedtuple( | ||
789 | 13 | "ServerInfo", "server_id client_port election_port leader_port") | ||
790 | 14 | |||
791 | 15 | |||
792 | 16 | class ManagedZooKeeper(object): | ||
793 | 17 | """Class to manage the running of a ZooKeeper instance for testing. | ||
794 | 18 | |||
795 | 19 | Note: no attempt is made to probe the ZooKeeper instance is | ||
796 | 20 | actually available, or that the selected port is free. In the | ||
797 | 21 | future, we may want to do that, especially when run in a | ||
798 | 22 | Hudson/Buildbot context, to ensure more test robustness.""" | ||
799 | 23 | |||
800 | 24 | def __init__(self, software_path, server_info, peers=()): | ||
801 | 25 | """Define the ZooKeeper test instance. | ||
802 | 26 | |||
803 | 27 | @param install_path: The path to the install for ZK | ||
804 | 28 | @param port: The port to run the managed ZK instance | ||
805 | 29 | """ | ||
806 | 30 | self.install_path = software_path | ||
807 | 31 | self.server_info = server_info | ||
808 | 32 | self.host = "127.0.0.1" | ||
809 | 33 | self.peers = peers | ||
810 | 34 | self.working_path = tempfile.mkdtemp() | ||
811 | 35 | self._running = False | ||
812 | 36 | |||
813 | 37 | def run(self): | ||
814 | 38 | """Run the ZooKeeper instance under a temporary directory. | ||
815 | 39 | |||
816 | 40 | Writes ZK log messages to zookeeper.log in the current directory. | ||
817 | 41 | """ | ||
818 | 42 | config_path = os.path.join(self.working_path, "zoo.cfg") | ||
819 | 43 | log_path = os.path.join(self.working_path, "log") | ||
820 | 44 | log4j_path = os.path.join(self.working_path, "log4j.properties") | ||
821 | 45 | data_path = os.path.join(self.working_path, "data") | ||
822 | 46 | |||
823 | 47 | # various setup steps | ||
824 | 48 | if not os.path.exists(self.working_path): | ||
825 | 49 | os.mdir(self.working_path) | ||
826 | 50 | if not os.path.exists(log_path): | ||
827 | 51 | os.mkdir(log_path) | ||
828 | 52 | if not os.path.exists(data_path): | ||
829 | 53 | os.mkdir(data_path) | ||
830 | 54 | |||
831 | 55 | with open(config_path, "w") as config: | ||
832 | 56 | config.write(""" | ||
833 | 57 | tickTime=2000 | ||
834 | 58 | dataDir=%s | ||
835 | 59 | clientPort=%s | ||
836 | 60 | maxClientCnxns=0 | ||
837 | 61 | """ % (data_path, self.server_info.client_port)) | ||
838 | 62 | |||
839 | 63 | # setup a replicated setup if peers are specified | ||
840 | 64 | if self.peers: | ||
841 | 65 | servers_cfg = [] | ||
842 | 66 | for p in chain((self.server_info,), self.peers): | ||
843 | 67 | servers_cfg.append("server.%s=localhost:%s:%s" % ( | ||
844 | 68 | p.server_id, p.leader_port, p.election_port)) | ||
845 | 69 | |||
846 | 70 | with open(config_path, "a") as config: | ||
847 | 71 | config.write(""" | ||
848 | 72 | initLimit=4 | ||
849 | 73 | syncLimit=2 | ||
850 | 74 | %s | ||
851 | 75 | """ % ("\n".join(servers_cfg))) | ||
852 | 76 | |||
853 | 77 | # Write server ids into datadir | ||
854 | 78 | with open(os.path.join(data_path, "myid"), "w") as myid_file: | ||
855 | 79 | myid_file.write(str(self.server_info.server_id)) | ||
856 | 80 | |||
857 | 81 | with open(log4j_path, "w") as log4j: | ||
858 | 82 | log4j.write(""" | ||
859 | 83 | # DEFAULT: console appender only | ||
860 | 84 | log4j.rootLogger=INFO, ROLLINGFILE | ||
861 | 85 | log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout | ||
862 | 86 | log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n | ||
863 | 87 | log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender | ||
864 | 88 | log4j.appender.ROLLINGFILE.Threshold=DEBUG | ||
865 | 89 | log4j.appender.ROLLINGFILE.File=""" + ( | ||
866 | 90 | self.working_path + os.sep + "zookeeper.log\n")) | ||
867 | 91 | |||
868 | 92 | self.process = subprocess.Popen( | ||
869 | 93 | args=["java", | ||
870 | 94 | "-cp", self.classpath, | ||
871 | 95 | "-Dzookeeper.log.dir=%s" % log_path, | ||
872 | 96 | "-Dzookeeper.root.logger=INFO,CONSOLE", | ||
873 | 97 | "-Dlog4j.configuration=file:%s" % log4j_path, | ||
874 | 98 | # "-Dlog4j.debug", | ||
875 | 99 | "org.apache.zookeeper.server.quorum.QuorumPeerMain", | ||
876 | 100 | config_path], | ||
877 | 101 | ) | ||
878 | 102 | self._running = True | ||
879 | 103 | |||
880 | 104 | @property | ||
881 | 105 | def classpath(self): | ||
882 | 106 | """Get the classpath necessary to run ZooKeeper.""" | ||
883 | 107 | |||
884 | 108 | # Two possibilities, as seen in zkEnv.sh: | ||
885 | 109 | # Check for a release - top-level zookeeper-*.jar? | ||
886 | 110 | jars = glob((os.path.join( | ||
887 | 111 | self.install_path, 'zookeeper-*.jar'))) | ||
888 | 112 | if jars: | ||
889 | 113 | # Relase build (`ant package`) | ||
890 | 114 | jars.extend(glob(os.path.join( | ||
891 | 115 | self.install_path, | ||
892 | 116 | "lib/*.jar"))) | ||
893 | 117 | else: | ||
894 | 118 | # Development build (plain `ant`) | ||
895 | 119 | jars = glob((os.path.join( | ||
896 | 120 | self.install_path, 'build/zookeeper-*.jar'))) | ||
897 | 121 | jars.extend(glob(os.path.join( | ||
898 | 122 | self.install_path, | ||
899 | 123 | "build/lib/*.jar"))) | ||
900 | 124 | return ":".join(jars) | ||
901 | 125 | |||
902 | 126 | @property | ||
903 | 127 | def address(self): | ||
904 | 128 | """Get the address of the ZooKeeper instance.""" | ||
905 | 129 | return "%s:%s" % (self.host, self.client_port) | ||
906 | 130 | |||
907 | 131 | @property | ||
908 | 132 | def running(self): | ||
909 | 133 | return self._running | ||
910 | 134 | |||
911 | 135 | @property | ||
912 | 136 | def client_port(self): | ||
913 | 137 | return self.server_info.client_port | ||
914 | 138 | |||
915 | 139 | def reset(self): | ||
916 | 140 | """Stop the zookeeper instance, cleaning out its on disk-data.""" | ||
917 | 141 | self.stop() | ||
918 | 142 | shutil.rmtree(os.path.join(self.working_path, "data")) | ||
919 | 143 | os.mkdir(os.path.join(self.working_path, "data")) | ||
920 | 144 | with open(os.path.join(self.working_path, "data", "myid"), "w") as fh: | ||
921 | 145 | fh.write(str(self.server_info.server_id)) | ||
922 | 146 | |||
923 | 147 | def stop(self): | ||
924 | 148 | """Stop the Zookeeper instance, retaining on disk state.""" | ||
925 | 149 | if not self._running: | ||
926 | 150 | return | ||
927 | 151 | self.process.terminate() | ||
928 | 152 | self.process.wait() | ||
929 | 153 | self._running = False | ||
930 | 154 | |||
931 | 155 | def destroy(self): | ||
932 | 156 | """Stop the ZooKeeper instance and destroy its on disk-state""" | ||
933 | 157 | # called by at exit handler, reimport to avoid cleanup race. | ||
934 | 158 | import shutil | ||
935 | 159 | self.stop() | ||
936 | 160 | |||
937 | 161 | shutil.rmtree(self.working_path) | ||
938 | 162 | |||
939 | 163 | |||
940 | 164 | class ZookeeperCluster(object): | ||
941 | 165 | |||
942 | 166 | def __init__(self, install_path, size=3, port_offset=20000): | ||
943 | 167 | self._install_path = install_path | ||
944 | 168 | self._servers = [] | ||
945 | 169 | |||
946 | 170 | # Calculate ports and peer group | ||
947 | 171 | port = port_offset | ||
948 | 172 | peers = [] | ||
949 | 173 | |||
950 | 174 | for i in range(size): | ||
951 | 175 | port += i * 10 | ||
952 | 176 | info = ServerInfo(i + 1, port, port + 1, port + 2) | ||
953 | 177 | peers.append(info) | ||
954 | 178 | |||
955 | 179 | # Instantiate Managed ZK Servers | ||
956 | 180 | for i in range(size): | ||
957 | 181 | server_peers = list(peers) | ||
958 | 182 | server_info = server_peers.pop(i) | ||
959 | 183 | self._servers.append( | ||
960 | 184 | ManagedZooKeeper( | ||
961 | 185 | self._install_path, server_info, server_peers)) | ||
962 | 186 | |||
963 | 187 | def __getitem__(self, k): | ||
964 | 188 | return self._servers[k] | ||
965 | 189 | |||
966 | 190 | def __iter__(self): | ||
967 | 191 | return iter(self._servers) | ||
968 | 192 | |||
969 | 193 | def start(self): | ||
970 | 194 | # Zookeeper client expresses a preference for either lower ports or | ||
971 | 195 | # lexographical ordering of hosts, to ensure that all servers have a | ||
972 | 196 | # chance to startup, start them in reverse order. | ||
973 | 197 | for server in reversed(list(self)): | ||
974 | 198 | server.run() | ||
975 | 199 | # Giving the servers a moment to start, decreases the overall time | ||
976 | 200 | # required for a client to successfully connect (2s vs. 4s without | ||
977 | 201 | # the sleep). | ||
978 | 202 | import time | ||
979 | 203 | time.sleep(2) | ||
980 | 204 | |||
981 | 205 | def stop(self): | ||
982 | 206 | for server in self: | ||
983 | 207 | server.stop() | ||
984 | 208 | self._servers = [] | ||
985 | 209 | |||
986 | 210 | def terminate(self): | ||
987 | 211 | for server in self: | ||
988 | 212 | server.destroy() | ||
989 | 213 | |||
990 | 214 | def reset(self): | ||
991 | 215 | for server in self: | ||
992 | 216 | server.reset() | ||
993 | 0 | 217 | ||
994 | === modified file 'txzookeeper/tests/mocker.py' | |||
995 | --- txzookeeper/tests/mocker.py 2011-06-08 21:14:20 +0000 | |||
996 | +++ txzookeeper/tests/mocker.py 2011-07-05 13:29:55 +0000 | |||
997 | @@ -1923,6 +1923,7 @@ | |||
998 | 1923 | def run(self, path): | 1923 | def run(self, path): |
999 | 1924 | return Mock(self.mocker, path) | 1924 | return Mock(self.mocker, path) |
1000 | 1925 | 1925 | ||
1001 | 1926 | |||
1002 | 1926 | def mock_returner_recorder(mocker, event): | 1927 | def mock_returner_recorder(mocker, event): |
1003 | 1927 | """Events that lead to other events must return mock objects.""" | 1928 | """Events that lead to other events must return mock objects.""" |
1004 | 1928 | parent_path = event.path.parent_path | 1929 | parent_path = event.path.parent_path |
1005 | 1929 | 1930 | ||
1006 | === modified file 'txzookeeper/tests/test_client.py' | |||
1007 | --- txzookeeper/tests/test_client.py 2011-06-17 23:52:34 +0000 | |||
1008 | +++ txzookeeper/tests/test_client.py 2011-07-05 13:29:55 +0000 | |||
1009 | @@ -23,19 +23,25 @@ | |||
1010 | 23 | 23 | ||
1011 | 24 | from twisted.internet.defer import Deferred | 24 | from twisted.internet.defer import Deferred |
1012 | 25 | from twisted.internet.base import DelayedCall | 25 | from twisted.internet.base import DelayedCall |
1013 | 26 | from twisted.python.failure import Failure | ||
1014 | 26 | 27 | ||
1015 | 27 | import zookeeper | 28 | import zookeeper |
1016 | 28 | 29 | ||
1017 | 30 | from mocker import ANY, MATCH | ||
1018 | 31 | from txzookeeper.tests import ZookeeperTestCase, utils | ||
1019 | 29 | from txzookeeper.client import ( | 32 | from txzookeeper.client import ( |
1020 | 30 | ZookeeperClient, ZOO_OPEN_ACL_UNSAFE, ConnectionTimeoutException, | 33 | ZookeeperClient, ZOO_OPEN_ACL_UNSAFE, ConnectionTimeoutException, |
1025 | 31 | ConnectionException, ClientEvent) | 34 | ConnectionException, NotConnectedException, ClientEvent) |
1022 | 32 | |||
1023 | 33 | from mocker import ANY | ||
1024 | 34 | from txzookeeper.tests import ZookeeperTestCase, utils | ||
1026 | 35 | 35 | ||
1027 | 36 | PUBLIC_ACL = ZOO_OPEN_ACL_UNSAFE | 36 | PUBLIC_ACL = ZOO_OPEN_ACL_UNSAFE |
1028 | 37 | 37 | ||
1029 | 38 | 38 | ||
1030 | 39 | def match_deferred(arg): | ||
1031 | 40 | return isinstance(arg, Deferred) | ||
1032 | 41 | |||
1033 | 42 | DEFERRED_MATCH = MATCH(match_deferred) | ||
1034 | 43 | |||
1035 | 44 | |||
1036 | 39 | class ClientTests(ZookeeperTestCase): | 45 | class ClientTests(ZookeeperTestCase): |
1037 | 40 | 46 | ||
1038 | 41 | def setUp(self): | 47 | def setUp(self): |
1039 | @@ -47,9 +53,10 @@ | |||
1040 | 47 | if self.client.connected: | 53 | if self.client.connected: |
1041 | 48 | utils.deleteTree(handle=self.client.handle) | 54 | utils.deleteTree(handle=self.client.handle) |
1042 | 49 | self.client.close() | 55 | self.client.close() |
1044 | 50 | del self.client | 56 | |
1045 | 51 | if self.client2 and self.client2.connected: | 57 | if self.client2 and self.client2.connected: |
1046 | 52 | self.client2.close() | 58 | self.client2.close() |
1047 | 59 | |||
1048 | 53 | super(ClientTests, self).tearDown() | 60 | super(ClientTests, self).tearDown() |
1049 | 54 | 61 | ||
1050 | 55 | def test_wb_connect_after_timeout(self): | 62 | def test_wb_connect_after_timeout(self): |
1051 | @@ -113,8 +120,10 @@ | |||
1052 | 113 | return d | 120 | return d |
1053 | 114 | 121 | ||
1054 | 115 | def test_client_event_repr(self): | 122 | def test_client_event_repr(self): |
1057 | 116 | event = ClientEvent(4, 'state', 'path') | 123 | event = ClientEvent(zookeeper.SESSION_EVENT, |
1058 | 117 | self.assertEqual(repr(event), "<ClientEvent child at 'path'>") | 124 | zookeeper.EXPIRED_SESSION_STATE, '') |
1059 | 125 | self.assertEqual(repr(event), | ||
1060 | 126 | "<ClientEvent session at '' state: expired>") | ||
1061 | 118 | 127 | ||
1062 | 119 | def test_client_event_attributes(self): | 128 | def test_client_event_attributes(self): |
1063 | 120 | event = ClientEvent(4, 'state', 'path') | 129 | event = ClientEvent(4, 'state', 'path') |
1064 | @@ -123,6 +132,10 @@ | |||
1065 | 123 | self.assertEqual(event.path, 'path') | 132 | self.assertEqual(event.path, 'path') |
1066 | 124 | self.assertEqual(event, (4, 'state', 'path')) | 133 | self.assertEqual(event, (4, 'state', 'path')) |
1067 | 125 | 134 | ||
1068 | 135 | def test_client_use_while_disconnected_returns_failure(self): | ||
1069 | 136 | return self.assertFailure( | ||
1070 | 137 | self.client.exists("/"), NotConnectedException) | ||
1071 | 138 | |||
1072 | 126 | def test_create_ephemeral_node_and_close_connection(self): | 139 | def test_create_ephemeral_node_and_close_connection(self): |
1073 | 127 | """ | 140 | """ |
1074 | 128 | The client can create transient nodes that are destroyed when the | 141 | The client can create transient nodes that are destroyed when the |
1075 | @@ -307,14 +320,17 @@ | |||
1076 | 307 | return self.client.create("/foobar-watched", "rabbit") | 320 | return self.client.create("/foobar-watched", "rabbit") |
1077 | 308 | 321 | ||
1078 | 309 | def get_node(path): | 322 | def get_node(path): |
1080 | 310 | return self.client.get_and_watch(path) | 323 | data, watch = self.client.get_and_watch(path) |
1081 | 324 | return data.addCallback(lambda x: (watch,)) | ||
1082 | 311 | 325 | ||
1084 | 312 | def new_connection((data, watch)): | 326 | def new_connection((watch,)): |
1085 | 313 | self.client2 = ZookeeperClient("127.0.0.1:2181") | 327 | self.client2 = ZookeeperClient("127.0.0.1:2181") |
1087 | 314 | return self.client2.connect(), watch | 328 | return self.client2.connect().addCallback( |
1088 | 329 | lambda x, y=None, z=None: (x, watch)) | ||
1089 | 315 | 330 | ||
1090 | 316 | def trigger_watch((client, watch)): | 331 | def trigger_watch((client, watch)): |
1091 | 317 | zookeeper.delete(self.client2.handle, "/foobar-watched") | 332 | zookeeper.delete(self.client2.handle, "/foobar-watched") |
1092 | 333 | self.client2.close() | ||
1093 | 318 | return watch | 334 | return watch |
1094 | 319 | 335 | ||
1095 | 320 | def verify_watch(event): | 336 | def verify_watch(event): |
1096 | @@ -385,11 +401,16 @@ | |||
1097 | 385 | """ | 401 | """ |
1098 | 386 | d = self.client.connect() | 402 | d = self.client.connect() |
1099 | 387 | 403 | ||
1100 | 404 | def inject_error(result_code, d, extra_codes=None): | ||
1101 | 405 | error = SyntaxError() | ||
1102 | 406 | d.errback(error) | ||
1103 | 407 | return error | ||
1104 | 408 | |||
1105 | 388 | def check_exists(client): | 409 | def check_exists(client): |
1106 | 389 | mock_client = self.mocker.patch(client) | 410 | mock_client = self.mocker.patch(client) |
1107 | 390 | mock_client._check_result( | 411 | mock_client._check_result( |
1110 | 391 | ANY, True, extra_codes=(zookeeper.NONODE,)) | 412 | ANY, DEFERRED_MATCH, extra_codes=(zookeeper.NONODE,)) |
1111 | 392 | self.mocker.result(SyntaxError()) | 413 | self.mocker.call(inject_error) |
1112 | 393 | self.mocker.replay() | 414 | self.mocker.replay() |
1113 | 394 | return client.exists("/zebra-moon") | 415 | return client.exists("/zebra-moon") |
1114 | 395 | 416 | ||
1115 | @@ -655,20 +676,21 @@ | |||
1116 | 655 | return d | 676 | return d |
1117 | 656 | 677 | ||
1118 | 657 | def test_get_children_with_error(self): | 678 | def test_get_children_with_error(self): |
1119 | 679 | """If the result of an api call is an error, its propgated. | ||
1120 | 680 | """ | ||
1121 | 658 | d = self.client.connect() | 681 | d = self.client.connect() |
1122 | 659 | 682 | ||
1123 | 660 | def get_children(client): | 683 | def get_children(client): |
1128 | 661 | mock_client = self.mocker.patch(self.client) | 684 | # Get the children of a nonexistant node |
1125 | 662 | mock_client._check_result(ANY, True) | ||
1126 | 663 | self.mocker.result(SyntaxError()) | ||
1127 | 664 | self.mocker.replay() | ||
1129 | 665 | return client.get_children("/tower") | 685 | return client.get_children("/tower") |
1130 | 666 | 686 | ||
1131 | 667 | def verify_failure(failure): | 687 | def verify_failure(failure): |
1133 | 668 | self.assertTrue(isinstance(failure.value, SyntaxError)) | 688 | self.assertTrue(isinstance(failure, Failure)) |
1134 | 689 | self.assertTrue( | ||
1135 | 690 | isinstance(failure.value, zookeeper.NoNodeException)) | ||
1136 | 669 | 691 | ||
1137 | 670 | d.addCallback(get_children) | 692 | d.addCallback(get_children) |
1139 | 671 | d.addErrback(verify_failure) | 693 | d.addBoth(verify_failure) |
1140 | 672 | return d | 694 | return d |
1141 | 673 | 695 | ||
1142 | 674 | # seems to be a segfault on this one, must be running latest zk | 696 | # seems to be a segfault on this one, must be running latest zk |
1143 | @@ -825,7 +847,7 @@ | |||
1144 | 825 | def verify_node_access(stat): | 847 | def verify_node_access(stat): |
1145 | 826 | self.assertEqual(stat['version'], 1) | 848 | self.assertEqual(stat['version'], 1) |
1146 | 827 | self.assertEqual(stat['dataLength'], 3) | 849 | self.assertEqual(stat['dataLength'], 3) |
1148 | 828 | self.assertTrue(failed) # we should have hit the errback | 850 | self.assertTrue(failed) # we should have hit the errback |
1149 | 829 | 851 | ||
1150 | 830 | d.addCallback(add_auth_one) | 852 | d.addCallback(add_auth_one) |
1151 | 831 | d.addCallback(create_node) | 853 | d.addCallback(create_node) |
1152 | @@ -905,10 +927,6 @@ | |||
1153 | 905 | acl = dict(scheme="digest", id="a:b", perms=zookeeper.PERM_ALL) | 927 | acl = dict(scheme="digest", id="a:b", perms=zookeeper.PERM_ALL) |
1154 | 906 | 928 | ||
1155 | 907 | def set_acl(client): | 929 | def set_acl(client): |
1156 | 908 | mock_client = self.mocker.patch(client) | ||
1157 | 909 | mock_client._check_result(ANY, True) | ||
1158 | 910 | self.mocker.result(zookeeper.NoNodeException()) | ||
1159 | 911 | self.mocker.replay() | ||
1160 | 912 | return client.set_acl("/zebra-moon22", [acl]) | 930 | return client.set_acl("/zebra-moon22", [acl]) |
1161 | 913 | 931 | ||
1162 | 914 | def verify_failure(failure): | 932 | def verify_failure(failure): |
1163 | @@ -946,23 +964,22 @@ | |||
1164 | 946 | """ | 964 | """ |
1165 | 947 | d = self.client.connect() | 965 | d = self.client.connect() |
1166 | 948 | 966 | ||
1169 | 949 | def create_node(client): | 967 | def inject_error(result, d): |
1170 | 950 | return client.create("/moose") | 968 | error = zookeeper.ZooKeeperException() |
1171 | 969 | d.errback(error) | ||
1172 | 970 | return error | ||
1173 | 951 | 971 | ||
1174 | 952 | def get_acl(path): | 972 | def get_acl(path): |
1180 | 953 | mock_client = self.mocker.patch(self.client) | 973 | # Get the ACL of a nonexistant node |
1181 | 954 | mock_client._check_result(ANY, True) | 974 | return self.client.get_acl("/moose") |
1177 | 955 | self.mocker.result(zookeeper.ZooKeeperException("foobar")) | ||
1178 | 956 | self.mocker.replay() | ||
1179 | 957 | return self.client.get_acl(path) | ||
1182 | 958 | 975 | ||
1183 | 959 | def verify_failure(failure): | 976 | def verify_failure(failure): |
1184 | 977 | self.assertTrue(isinstance(failure, Failure)) | ||
1185 | 960 | self.assertTrue( | 978 | self.assertTrue( |
1186 | 961 | isinstance(failure.value, zookeeper.ZooKeeperException)) | 979 | isinstance(failure.value, zookeeper.ZooKeeperException)) |
1187 | 962 | 980 | ||
1188 | 963 | d.addCallback(create_node) | ||
1189 | 964 | d.addCallback(get_acl) | 981 | d.addCallback(get_acl) |
1191 | 965 | d.addErrback(verify_failure) | 982 | d.addBoth(verify_failure) |
1192 | 966 | return d | 983 | return d |
1193 | 967 | 984 | ||
1194 | 968 | def test_client_id(self): | 985 | def test_client_id(self): |
1195 | @@ -1006,35 +1023,6 @@ | |||
1196 | 1006 | d.addCallback(verify_sync) | 1023 | d.addCallback(verify_sync) |
1197 | 1007 | return d | 1024 | return d |
1198 | 1008 | 1025 | ||
1199 | 1009 | def test_sync_error(self): | ||
1200 | 1010 | """ | ||
1201 | 1011 | On error the sync callback returns a an exception/failure. | ||
1202 | 1012 | """ | ||
1203 | 1013 | d = self.client.connect() | ||
1204 | 1014 | |||
1205 | 1015 | def create_node(client): | ||
1206 | 1016 | return client.create("/abc") | ||
1207 | 1017 | |||
1208 | 1018 | def client_sync(path): | ||
1209 | 1019 | mock_client = self.mocker.patch(self.client) | ||
1210 | 1020 | mock_client._check_result(ANY, True) | ||
1211 | 1021 | self.mocker.result(zookeeper.ZooKeeperException("foobar")) | ||
1212 | 1022 | self.mocker.replay() | ||
1213 | 1023 | return self.client.sync(path) | ||
1214 | 1024 | |||
1215 | 1025 | def verify_failure(failure): | ||
1216 | 1026 | self.assertTrue( | ||
1217 | 1027 | isinstance(failure.value, zookeeper.ZooKeeperException)) | ||
1218 | 1028 | |||
1219 | 1029 | def assert_failed(extra): | ||
1220 | 1030 | self.fail("Should have gone to errback") | ||
1221 | 1031 | |||
1222 | 1032 | d.addCallback(create_node) | ||
1223 | 1033 | d.addCallback(client_sync) | ||
1224 | 1034 | d.addCallback(assert_failed) | ||
1225 | 1035 | d.addErrback(verify_failure) | ||
1226 | 1036 | return d | ||
1227 | 1037 | |||
1228 | 1038 | def test_property_servers(self): | 1026 | def test_property_servers(self): |
1229 | 1039 | """ | 1027 | """ |
1230 | 1040 | The servers property of the client, shows which if any servers | 1028 | The servers property of the client, shows which if any servers |
1231 | @@ -1087,13 +1075,67 @@ | |||
1232 | 1087 | return client.set_connection_watcher(1) | 1075 | return client.set_connection_watcher(1) |
1233 | 1088 | 1076 | ||
1234 | 1089 | def verify_invalid(failure): | 1077 | def verify_invalid(failure): |
1236 | 1090 | self.assertEqual(failure.value.args, ("invalid watcher",)) | 1078 | self.assertEqual(failure.value.args, ("Invalid Watcher 1",)) |
1237 | 1091 | self.assertTrue(isinstance(failure.value, SyntaxError)) | 1079 | self.assertTrue(isinstance(failure.value, SyntaxError)) |
1238 | 1092 | 1080 | ||
1239 | 1093 | d.addCallback(set_invalid_watcher) | 1081 | d.addCallback(set_invalid_watcher) |
1240 | 1094 | d.addErrback(verify_invalid) | 1082 | d.addErrback(verify_invalid) |
1241 | 1095 | return d | 1083 | return d |
1242 | 1096 | 1084 | ||
1243 | 1085 | def xtest_session_expired_event(self): | ||
1244 | 1086 | """ | ||
1245 | 1087 | A client session can be reattached to in a separate connection, | ||
1246 | 1088 | if the a session is expired, using the zookeeper connection will | ||
1247 | 1089 | raise a SessionExpiredException. | ||
1248 | 1090 | """ | ||
1249 | 1091 | d = self.client.connect() | ||
1250 | 1092 | |||
1251 | 1093 | class StopTest(Exception): | ||
1252 | 1094 | pass | ||
1253 | 1095 | |||
1254 | 1096 | def new_connection_same_connection(client): | ||
1255 | 1097 | self.assertEqual(client.state, zookeeper.CONNECTED_STATE) | ||
1256 | 1098 | return ZookeeperClient("127.0.0.1:2181").connect( | ||
1257 | 1099 | client_id=client.client_id).addErrback( | ||
1258 | 1100 | guard_session_expired, client) | ||
1259 | 1101 | |||
1260 | 1102 | def guard_session_expired(failure, client): | ||
1261 | 1103 | # On occassion we get a session expired event while connecting. | ||
1262 | 1104 | failure.trap(ConnectionException) | ||
1263 | 1105 | self.assertEqual(failure.value.state_name, "expired") | ||
1264 | 1106 | # Stop the test from proceeding | ||
1265 | 1107 | raise StopTest() | ||
1266 | 1108 | |||
1267 | 1109 | def close_new_connection(client): | ||
1268 | 1110 | # Verify both connections are using same session | ||
1269 | 1111 | self.assertEqual(self.client.client_id, client.client_id) | ||
1270 | 1112 | self.assertEqual(client.state, zookeeper.CONNECTED_STATE) | ||
1271 | 1113 | |||
1272 | 1114 | # Closing one connection will close the session | ||
1273 | 1115 | client.close() | ||
1274 | 1116 | |||
1275 | 1117 | # Continued use of the other client will get a | ||
1276 | 1118 | # disconnect exception. | ||
1277 | 1119 | return self.client.exists("/") | ||
1278 | 1120 | |||
1279 | 1121 | def verify_original_closed(failure): | ||
1280 | 1122 | if not isinstance(failure, Failure): | ||
1281 | 1123 | self.fail("Test did not raise exception.") | ||
1282 | 1124 | failure.trap( | ||
1283 | 1125 | zookeeper.SessionExpiredException, | ||
1284 | 1126 | zookeeper.ConnectionLossException) | ||
1285 | 1127 | |||
1286 | 1128 | #print "client close" | ||
1287 | 1129 | self.client.close() | ||
1288 | 1130 | #print "creating new client for teardown" | ||
1289 | 1131 | return self.client.connect() | ||
1290 | 1132 | |||
1291 | 1133 | d.addCallback(new_connection_same_connection) | ||
1292 | 1134 | d.addCallback(close_new_connection) | ||
1293 | 1135 | d.addBoth(verify_original_closed) | ||
1294 | 1136 | |||
1295 | 1137 | return d | ||
1296 | 1138 | |||
1297 | 1097 | def test_connect_with_server(self): | 1139 | def test_connect_with_server(self): |
1298 | 1098 | """ | 1140 | """ |
1299 | 1099 | A client's servers can be specified in the connect method. | 1141 | A client's servers can be specified in the connect method. |
1300 | @@ -1164,14 +1206,14 @@ | |||
1301 | 1164 | All of the client apis (with the exception of connect) attempt | 1206 | All of the client apis (with the exception of connect) attempt |
1302 | 1165 | to ensure the client is connected before executing an operation. | 1207 | to ensure the client is connected before executing an operation. |
1303 | 1166 | """ | 1208 | """ |
1312 | 1167 | self.assertRaises( | 1209 | self.assertFailure( |
1313 | 1168 | zookeeper.ZooKeeperException, self.client.get_children, "/abc") | 1210 | self.client.get_children("/abc"), zookeeper.ZooKeeperException) |
1314 | 1169 | 1211 | ||
1315 | 1170 | self.assertRaises( | 1212 | self.assertFailure( |
1316 | 1171 | zookeeper.ZooKeeperException, self.client.create, "/abc") | 1213 | self.client.create("/abc"), zookeeper.ZooKeeperException) |
1317 | 1172 | 1214 | ||
1318 | 1173 | self.assertRaises( | 1215 | self.assertFailure( |
1319 | 1174 | zookeeper.ZooKeeperException, self.client.set, "/abc", "123") | 1216 | self.client.set("/abc", "123"), zookeeper.ZooKeeperException) |
1320 | 1175 | 1217 | ||
1321 | 1176 | def test_connect_multiple_raises(self): | 1218 | def test_connect_multiple_raises(self): |
1322 | 1177 | """ | 1219 | """ |
1323 | @@ -1181,8 +1223,9 @@ | |||
1324 | 1181 | d = self.client.connect() | 1223 | d = self.client.connect() |
1325 | 1182 | 1224 | ||
1326 | 1183 | def connect_again(client): | 1225 | def connect_again(client): |
1329 | 1184 | self.assertRaises( | 1226 | d = client.connect() |
1330 | 1185 | zookeeper.ZooKeeperException, client.connect) | 1227 | self.failUnlessFailure(d, zookeeper.ZooKeeperException) |
1331 | 1228 | return d | ||
1332 | 1186 | 1229 | ||
1333 | 1187 | d.addCallback(connect_again) | 1230 | d.addCallback(connect_again) |
1334 | 1188 | return d | 1231 | return d |
1335 | @@ -1199,18 +1242,18 @@ | |||
1336 | 1199 | d = self.client.connect() | 1242 | d = self.client.connect() |
1337 | 1200 | 1243 | ||
1338 | 1201 | def verify_failure(client): | 1244 | def verify_failure(client): |
1341 | 1202 | self.assertRaises( | 1245 | d = client.create("/abc") |
1342 | 1203 | zookeeper.ZooKeeperException, client.create, "/abc") | 1246 | self.failUnlessFailure(d, zookeeper.ZooKeeperException) |
1343 | 1204 | 1247 | ||
1344 | 1205 | d.addCallback(verify_failure) | 1248 | d.addCallback(verify_failure) |
1345 | 1206 | return d | 1249 | return d |
1346 | 1207 | 1250 | ||
1347 | 1208 | def test_connection_watcher(self): | 1251 | def test_connection_watcher(self): |
1348 | 1209 | """ | 1252 | """ |
1353 | 1210 | A connection watcher can be set that recieves notices on when the | 1253 | A connection watcher can be set that receives notices on when |
1354 | 1211 | connection state changes. Technically zookeeper would also use this as | 1254 | the connection state changes. Technically zookeeper would also |
1355 | 1212 | a global watcher for node state changes, but zkpython doesn't expose | 1255 | use this as a global watcher for node watches, but zkpython |
1356 | 1213 | that api, as its mostly considered legacy. | 1256 | doesn't expose that api, as its mostly considered legacy. |
1357 | 1214 | 1257 | ||
1358 | 1215 | its out of scope to simulate a connection level event within unit tests | 1258 | its out of scope to simulate a connection level event within unit tests |
1359 | 1216 | such as the server restarting. | 1259 | such as the server restarting. |
1360 | @@ -1243,3 +1286,13 @@ | |||
1361 | 1243 | If the client is not connected, closing returns None. | 1286 | If the client is not connected, closing returns None. |
1362 | 1244 | """ | 1287 | """ |
1363 | 1245 | self.assertEqual(self.client.close(), None) | 1288 | self.assertEqual(self.client.close(), None) |
1364 | 1289 | |||
1365 | 1290 | def test_invalid_connection_error_callback(self): | ||
1366 | 1291 | self.assertRaises(TypeError, | ||
1367 | 1292 | self.client.set_connection_error_callback, | ||
1368 | 1293 | None) | ||
1369 | 1294 | |||
1370 | 1295 | def test_invalid_session_callback(self): | ||
1371 | 1296 | self.assertRaises(TypeError, | ||
1372 | 1297 | self.client.set_session_callback, | ||
1373 | 1298 | None) | ||
1374 | 1246 | 1299 | ||
1375 | === modified file 'txzookeeper/tests/test_node.py' | |||
1376 | --- txzookeeper/tests/test_node.py 2011-06-17 23:52:34 +0000 | |||
1377 | +++ txzookeeper/tests/test_node.py 2011-07-05 13:29:55 +0000 | |||
1378 | @@ -59,7 +59,7 @@ | |||
1379 | 59 | def _make_digest_identity(self, credentials): | 59 | def _make_digest_identity(self, credentials): |
1380 | 60 | user, password = credentials.split(":") | 60 | user, password = credentials.split(":") |
1381 | 61 | digest = hashlib.new("sha1", credentials).digest() | 61 | digest = hashlib.new("sha1", credentials).digest() |
1383 | 62 | return "%s:%s"%(user, base64.b64encode(digest)) | 62 | return "%s:%s" % (user, base64.b64encode(digest)) |
1384 | 63 | 63 | ||
1385 | 64 | def test_node_name_and_path(self): | 64 | def test_node_name_and_path(self): |
1386 | 65 | """ | 65 | """ |
1387 | 66 | 66 | ||
1388 | === modified file 'txzookeeper/tests/test_queue.py' | |||
1389 | --- txzookeeper/tests/test_queue.py 2011-06-17 23:52:34 +0000 | |||
1390 | +++ txzookeeper/tests/test_queue.py 2011-07-05 13:29:55 +0000 | |||
1391 | @@ -175,7 +175,7 @@ | |||
1392 | 175 | watch = Deferred() | 175 | watch = Deferred() |
1393 | 176 | self.mocker.result((succeed(["entry-000000"]), watch)) | 176 | self.mocker.result((succeed(["entry-000000"]), watch)) |
1394 | 177 | 177 | ||
1396 | 178 | item_path = "%s/%s"%(path, "entry-000000") | 178 | item_path = "%s/%s" % (path, "entry-000000") |
1397 | 179 | mock_client.get(item_path) | 179 | mock_client.get(item_path) |
1398 | 180 | self.mocker.result(fail(SyntaxError("x"))) | 180 | self.mocker.result(fail(SyntaxError("x"))) |
1399 | 181 | self.mocker.replay() | 181 | self.mocker.replay() |
1400 | @@ -230,13 +230,13 @@ | |||
1401 | 230 | producer_done = Deferred() | 230 | producer_done = Deferred() |
1402 | 231 | 231 | ||
1403 | 232 | def iteration(i): | 232 | def iteration(i): |
1405 | 233 | if len(items) == (item_count-1): | 233 | if len(items) == (item_count - 1): |
1406 | 234 | return producer_done.callback(None) | 234 | return producer_done.callback(None) |
1407 | 235 | items.append(i) | 235 | items.append(i) |
1408 | 236 | queue.put(str(i)) | 236 | queue.put(str(i)) |
1409 | 237 | 237 | ||
1410 | 238 | for i in range(item_count): | 238 | for i in range(item_count): |
1412 | 239 | reactor.callLater(i*0.05, iteration, i) | 239 | reactor.callLater(i * 0.05, iteration, i) |
1413 | 240 | yield producer_done | 240 | yield producer_done |
1414 | 241 | returnValue(items) | 241 | returnValue(items) |
1415 | 242 | 242 | ||
1416 | @@ -284,7 +284,7 @@ | |||
1417 | 284 | def producer(start, offset): | 284 | def producer(start, offset): |
1418 | 285 | client = yield self.open_client() | 285 | client = yield self.open_client() |
1419 | 286 | q = self.queue_factory(path, client) | 286 | q = self.queue_factory(path, client) |
1421 | 287 | for i in range(start, start+offset): | 287 | for i in range(start, start + offset): |
1422 | 288 | yield q.put(str(i)) | 288 | yield q.put(str(i)) |
1423 | 289 | produce_results.append(str(i)) | 289 | produce_results.append(str(i)) |
1424 | 290 | 290 | ||
1425 | @@ -311,7 +311,7 @@ | |||
1426 | 311 | yield DeferredList( | 311 | yield DeferredList( |
1427 | 312 | [consumer(8), consumer(8), consumer(4)]) | 312 | [consumer(8), consumer(8), consumer(4)]) |
1428 | 313 | 313 | ||
1430 | 314 | err = set(produce_results)-set(consume_results) | 314 | err = set(produce_results) - set(consume_results) |
1431 | 315 | self.assertFalse(err) | 315 | self.assertFalse(err) |
1432 | 316 | 316 | ||
1433 | 317 | self.assertEqual(len(consume_results), len(produce_results)) | 317 | self.assertEqual(len(consume_results), len(produce_results)) |
1434 | 318 | 318 | ||
1435 | === modified file 'txzookeeper/tests/test_security.py' | |||
1436 | --- txzookeeper/tests/test_security.py 2011-06-17 23:52:34 +0000 | |||
1437 | +++ txzookeeper/tests/test_security.py 2011-07-05 13:29:55 +0000 | |||
1438 | @@ -73,9 +73,9 @@ | |||
1439 | 73 | def connect_users(self, *users): | 73 | def connect_users(self, *users): |
1440 | 74 | clients = [] | 74 | clients = [] |
1441 | 75 | for name in users: | 75 | for name in users: |
1443 | 76 | ident_user = getattr(self, "ident_%s"%(name), None) | 76 | ident_user = getattr(self, "ident_%s" % (name), None) |
1444 | 77 | if ident_user is None: | 77 | if ident_user is None: |
1446 | 78 | raise AttributeError("Invalid User %s"%(name)) | 78 | raise AttributeError("Invalid User %s" % (name)) |
1447 | 79 | client = ZookeeperClient("127.0.0.1:2181", 3000) | 79 | client = ZookeeperClient("127.0.0.1:2181", 3000) |
1448 | 80 | clients.append(client) | 80 | clients.append(client) |
1449 | 81 | yield self.open_and_authenticate(client, ident_user) | 81 | yield self.open_and_authenticate(client, ident_user) |
1450 | 82 | 82 | ||
1451 | === added file 'txzookeeper/tests/test_session.py' | |||
1452 | --- txzookeeper/tests/test_session.py 1970-01-01 00:00:00 +0000 | |||
1453 | +++ txzookeeper/tests/test_session.py 2011-07-05 13:29:55 +0000 | |||
1454 | @@ -0,0 +1,245 @@ | |||
1455 | 1 | |||
1456 | 2 | import atexit | ||
1457 | 3 | import os | ||
1458 | 4 | |||
1459 | 5 | import zookeeper | ||
1460 | 6 | |||
1461 | 7 | from twisted.internet import reactor | ||
1462 | 8 | from twisted.internet.defer import inlineCallbacks, Deferred, returnValue | ||
1463 | 9 | |||
1464 | 10 | from txzookeeper import ZookeeperClient | ||
1465 | 11 | from txzookeeper.client import NotConnectedException, ConnectionException | ||
1466 | 12 | |||
1467 | 13 | from txzookeeper.tests.common import ZookeeperCluster | ||
1468 | 14 | from txzookeeper.tests import ZookeeperTestCase | ||
1469 | 15 | |||
1470 | 16 | |||
1471 | 17 | ZK_HOME = os.environ.get("ZOOKEEPER_PATH") | ||
1472 | 18 | assert ZK_HOME, "ZOOKEEPER_PATH environment variable must be defined" | ||
1473 | 19 | |||
1474 | 20 | CLUSTER = ZookeeperCluster(ZK_HOME) | ||
1475 | 21 | atexit.register(lambda cluster: cluster.terminate(), CLUSTER) | ||
1476 | 22 | |||
1477 | 23 | |||
1478 | 24 | class ClientSessionTests(ZookeeperTestCase): | ||
1479 | 25 | |||
1480 | 26 | def setUp(self): | ||
1481 | 27 | super(ClientSessionTests, self).setUp() | ||
1482 | 28 | self.cluster.start() | ||
1483 | 29 | self.client = None | ||
1484 | 30 | self.client2 = None | ||
1485 | 31 | zookeeper.deterministic_conn_order(True) | ||
1486 | 32 | zookeeper.set_debug_level(0) | ||
1487 | 33 | |||
1488 | 34 | def sleep(self, delay): | ||
1489 | 35 | """Non-blocking sleep.""" | ||
1490 | 36 | deferred = Deferred() | ||
1491 | 37 | reactor.callLater(delay, deferred.callback, None) | ||
1492 | 38 | return deferred | ||
1493 | 39 | |||
1494 | 40 | @property | ||
1495 | 41 | def cluster(self): | ||
1496 | 42 | return CLUSTER | ||
1497 | 43 | |||
1498 | 44 | def tearDown(self): | ||
1499 | 45 | super(ClientSessionTests, self).tearDown() | ||
1500 | 46 | self.cluster.reset() | ||
1501 | 47 | |||
1502 | 48 | @inlineCallbacks | ||
1503 | 49 | def test_client_session_migration(self): | ||
1504 | 50 | """A client will automatically rotate servers to ensure a connection. | ||
1505 | 51 | |||
1506 | 52 | A client connected to multiple servers, will transparently | ||
1507 | 53 | migrate amongst them, as individual servers can no longer be | ||
1508 | 54 | reached. A client's session will be maintined. | ||
1509 | 55 | """ | ||
1510 | 56 | # Connect to the Zookeeper Cluster | ||
1511 | 57 | servers = ",".join([s.address for s in self.cluster]) | ||
1512 | 58 | self.client = ZookeeperClient(servers) | ||
1513 | 59 | yield self.client.connect() | ||
1514 | 60 | yield self.client.create("/hello", flags=zookeeper.EPHEMERAL) | ||
1515 | 61 | |||
1516 | 62 | # Shutdown the server the client is connected to | ||
1517 | 63 | self.cluster[0].stop() | ||
1518 | 64 | |||
1519 | 65 | # Wait for the shutdown and cycle, if we don't wait we'll | ||
1520 | 66 | # get a zookeeper connectionloss exception on occassion. | ||
1521 | 67 | yield self.sleep(0.1) | ||
1522 | 68 | |||
1523 | 69 | self.assertTrue(self.client.connected) | ||
1524 | 70 | exists = yield self.client.exists("/hello") | ||
1525 | 71 | self.assertTrue(exists) | ||
1526 | 72 | |||
1527 | 73 | @inlineCallbacks | ||
1528 | 74 | def test_client_watch_migration(self): | ||
1529 | 75 | """On server rotation, extant watches are still active. | ||
1530 | 76 | |||
1531 | 77 | A client connected to multiple servers, will transparently | ||
1532 | 78 | migrate amongst them, as individual servers can no longer be | ||
1533 | 79 | reached. Watch deferreds issued from the same client instance will | ||
1534 | 80 | continue to function as the session is maintained. | ||
1535 | 81 | """ | ||
1536 | 82 | session_events = [] | ||
1537 | 83 | |||
1538 | 84 | def session_event_callback(connection, e): | ||
1539 | 85 | session_events.append(e) | ||
1540 | 86 | |||
1541 | 87 | # Connect to the Zookeeper Cluster | ||
1542 | 88 | servers = ",".join([s.address for s in self.cluster]) | ||
1543 | 89 | self.client = ZookeeperClient(servers) | ||
1544 | 90 | self.client.set_session_callback(session_event_callback) | ||
1545 | 91 | yield self.client.connect() | ||
1546 | 92 | |||
1547 | 93 | # Setup a watch | ||
1548 | 94 | yield self.client.create("/hello") | ||
1549 | 95 | exists_d, watch_d = self.client.exists_and_watch("/hello") | ||
1550 | 96 | yield exists_d | ||
1551 | 97 | |||
1552 | 98 | # Shutdown the server the client is connected to | ||
1553 | 99 | self.cluster[0].stop() | ||
1554 | 100 | |||
1555 | 101 | # Wait for the shutdown and cycle, if we don't wait we'll | ||
1556 | 102 | # get occasionally get a zookeeper connectionloss exception. | ||
1557 | 103 | yield self.sleep(0.1) | ||
1558 | 104 | |||
1559 | 105 | # The session events that would have been ignored are sent | ||
1560 | 106 | # to the session event callback. | ||
1561 | 107 | self.assertTrue(session_events) | ||
1562 | 108 | self.assertTrue(self.client.connected) | ||
1563 | 109 | |||
1564 | 110 | # If we delete the node, we'll see the watch fire. | ||
1565 | 111 | yield self.client.delete("/hello") | ||
1566 | 112 | event = yield watch_d | ||
1567 | 113 | self.assertEqual(event.type_name, "deleted") | ||
1568 | 114 | self.assertEqual(event.path, "/hello") | ||
1569 | 115 | |||
1570 | 116 | @inlineCallbacks | ||
1571 | 117 | def test_connection_error_handler(self): | ||
1572 | 118 | """A callback can be specified for connection errors. | ||
1573 | 119 | |||
1574 | 120 | We can specify a callback for connection errors, that | ||
1575 | 121 | can perform recovery for a disconnected client, restablishing | ||
1576 | 122 | """ | ||
1577 | 123 | @inlineCallbacks | ||
1578 | 124 | def connection_error_handler(connection, error): | ||
1579 | 125 | # On loss of the connection, reconnect the client w/ same session. | ||
1580 | 126 | yield connection.connect( | ||
1581 | 127 | self.cluster[1].address, client_id=connection.client_id) | ||
1582 | 128 | returnValue(23) | ||
1583 | 129 | |||
1584 | 130 | self.client = ZookeeperClient(self.cluster[0].address) | ||
1585 | 131 | self.client.set_connection_error_callback(connection_error_handler) | ||
1586 | 132 | yield self.client.connect() | ||
1587 | 133 | |||
1588 | 134 | yield self.client.create("/hello") | ||
1589 | 135 | exists_d, watch_d = self.client.exists_and_watch("/hello") | ||
1590 | 136 | yield exists_d | ||
1591 | 137 | |||
1592 | 138 | # Shutdown the server the client is connected to | ||
1593 | 139 | self.cluster[0].stop() | ||
1594 | 140 | yield self.sleep(0.1) | ||
1595 | 141 | |||
1596 | 142 | # Results in connection loss exception, and invoking of error handler. | ||
1597 | 143 | result = yield self.client.exists("/hello") | ||
1598 | 144 | |||
1599 | 145 | # The result of the error handler is returned to the api | ||
1600 | 146 | self.assertEqual(result, 23) | ||
1601 | 147 | |||
1602 | 148 | exists = yield self.client.exists("/hello") | ||
1603 | 149 | self.assertTrue(exists) | ||
1604 | 150 | |||
1605 | 151 | @inlineCallbacks | ||
1606 | 152 | def test_client_session_expiration_event(self): | ||
1607 | 153 | """A client which recieves a session expiration event. | ||
1608 | 154 | """ | ||
1609 | 155 | session_events = [] | ||
1610 | 156 | events_received = Deferred() | ||
1611 | 157 | |||
1612 | 158 | def session_event_callback(connection, e): | ||
1613 | 159 | session_events.append(e) | ||
1614 | 160 | if len(session_events) == 4: | ||
1615 | 161 | events_received.callback(True) | ||
1616 | 162 | |||
1617 | 163 | # Connect to a node in the cluster and establish a watch | ||
1618 | 164 | self.client = ZookeeperClient(self.cluster[0].address) | ||
1619 | 165 | self.client.set_session_callback(session_event_callback) | ||
1620 | 166 | yield self.client.connect() | ||
1621 | 167 | |||
1622 | 168 | child_d, watch_d = self.client.get_children_and_watch("/") | ||
1623 | 169 | yield child_d | ||
1624 | 170 | |||
1625 | 171 | # Connect a client to the same session on a different node. | ||
1626 | 172 | self.client2 = ZookeeperClient(self.cluster[0].address) | ||
1627 | 173 | yield self.client2.connect(client_id=self.client.client_id) | ||
1628 | 174 | |||
1629 | 175 | # Close the new client and wait for the event propogation | ||
1630 | 176 | yield self.client2.close() | ||
1631 | 177 | |||
1632 | 178 | # It can take some time for this to propagate | ||
1633 | 179 | yield events_received | ||
1634 | 180 | self.assertEqual(len(session_events), 4) | ||
1635 | 181 | self.assertEqual(session_events[-1].state_name, "expired") | ||
1636 | 182 | |||
1637 | 183 | # The connection is dead without reconnecting. | ||
1638 | 184 | yield self.assertFailure( | ||
1639 | 185 | self.client.exists("/"), | ||
1640 | 186 | NotConnectedException, ConnectionException) | ||
1641 | 187 | |||
1642 | 188 | self.assertTrue(self.client.unrecoverable) | ||
1643 | 189 | |||
1644 | 190 | # If a reconnect attempt is made with a dead session id | ||
1645 | 191 | #yield self.client.connect(client_id=self.client.client_id) | ||
1646 | 192 | |||
1647 | 193 | test_client_session_expiration_event.timeout = 10 | ||
1648 | 194 | |||
1649 | 195 | @inlineCallbacks | ||
1650 | 196 | def test_client_reconnect_session_on_different_server(self): | ||
1651 | 197 | """On connection failure, An application can choose to use a | ||
1652 | 198 | new connection with which to reconnect to a different member | ||
1653 | 199 | of the zookeeper cluster, reacquiring the extant session. | ||
1654 | 200 | |||
1655 | 201 | A large obvious caveat to using a new client instance rather | ||
1656 | 202 | than reconnecting the existing client, is that even though the | ||
1657 | 203 | session has outstanding watches, the watch callbacks/deferreds | ||
1658 | 204 | won't be active unless the client instance used to create them | ||
1659 | 205 | is connected. | ||
1660 | 206 | """ | ||
1661 | 207 | session_events = [] | ||
1662 | 208 | |||
1663 | 209 | def session_event_callback(connection, e): | ||
1664 | 210 | session_events.append(e) | ||
1665 | 211 | |||
1666 | 212 | # Connect to a node in the cluster and establish a watch | ||
1667 | 213 | self.client = ZookeeperClient(self.cluster[2].address) | ||
1668 | 214 | self.client.set_session_callback(session_event_callback) | ||
1669 | 215 | yield self.client.connect() | ||
1670 | 216 | |||
1671 | 217 | yield self.client.create("/hello", flags=zookeeper.EPHEMERAL) | ||
1672 | 218 | exists_d, watch_d = self.client.exists_and_watch("/hello") | ||
1673 | 219 | yield exists_d | ||
1674 | 220 | |||
1675 | 221 | # Shutdown the server the client is connected to | ||
1676 | 222 | self.cluster[2].stop() | ||
1677 | 223 | yield self.sleep(0.1) | ||
1678 | 224 | |||
1679 | 225 | # Verify we got a session event regarding the down server | ||
1680 | 226 | self.assertTrue(session_events) | ||
1681 | 227 | |||
1682 | 228 | # Open up a new connection to a different server with same session | ||
1683 | 229 | self.client2 = ZookeeperClient(self.cluster[0].address) | ||
1684 | 230 | yield self.client2.connect(client_id=self.client.client_id) | ||
1685 | 231 | |||
1686 | 232 | # Close the old disconnected client | ||
1687 | 233 | self.client.close() | ||
1688 | 234 | |||
1689 | 235 | # Verify the ephemeral still exists | ||
1690 | 236 | exists = yield self.client2.exists("/hello") | ||
1691 | 237 | self.assertTrue(exists) | ||
1692 | 238 | |||
1693 | 239 | # Destroy the session and reconnect | ||
1694 | 240 | self.client2.close() | ||
1695 | 241 | yield self.client.connect(self.cluster[0].address) | ||
1696 | 242 | |||
1697 | 243 | # Ephemeral is destroyed when the session closed. | ||
1698 | 244 | exists = yield self.client.exists("/hello") | ||
1699 | 245 | self.assertFalse(exists) | ||
1700 | 0 | 246 | ||
1701 | === modified file 'txzookeeper/tests/test_utils.py' | |||
1702 | --- txzookeeper/tests/test_utils.py 2011-06-17 23:52:34 +0000 | |||
1703 | +++ txzookeeper/tests/test_utils.py 2011-07-05 13:29:55 +0000 | |||
1704 | @@ -34,7 +34,7 @@ | |||
1705 | 34 | def update_function_increment(self, content, stat): | 34 | def update_function_increment(self, content, stat): |
1706 | 35 | if not content: | 35 | if not content: |
1707 | 36 | return str(0) | 36 | return str(0) |
1709 | 37 | return str(int(content)+1) | 37 | return str(int(content) + 1) |
1710 | 38 | 38 | ||
1711 | 39 | def setUp(self): | 39 | def setUp(self): |
1712 | 40 | super(RetryChangeTest, self).setUp() | 40 | super(RetryChangeTest, self).setUp() |
1713 | 41 | 41 | ||
1714 | === modified file 'txzookeeper/tests/utils.py' | |||
1715 | --- txzookeeper/tests/utils.py 2011-06-17 23:52:34 +0000 | |||
1716 | +++ txzookeeper/tests/utils.py 2011-07-05 13:29:55 +0000 | |||
1717 | @@ -25,9 +25,9 @@ | |||
1718 | 25 | Destroy all the nodes in zookeeper (typically under a chroot for testing) | 25 | Destroy all the nodes in zookeeper (typically under a chroot for testing) |
1719 | 26 | """ | 26 | """ |
1720 | 27 | for child in zookeeper.get_children(handle, path): | 27 | for child in zookeeper.get_children(handle, path): |
1722 | 28 | if child == "zookeeper": # skip the metadata node | 28 | if child == "zookeeper": # skip the metadata node |
1723 | 29 | continue | 29 | continue |
1725 | 30 | child_path = "/"+("%s/%s"%(path, child)).strip("/") | 30 | child_path = "/" + ("%s/%s" % (path, child)).strip("/") |
1726 | 31 | try: | 31 | try: |
1727 | 32 | deleteTree(child_path, handle) | 32 | deleteTree(child_path, handle) |
1728 | 33 | zookeeper.delete(handle, child_path, -1) | 33 | zookeeper.delete(handle, child_path, -1) |
Please note that this branch has a pre-requisite branch also in review.