Merge lp:~hazmat/txzookeeper/session-event-handling into lp:txzookeeper

Proposed by Kapil Thangavelu
Status: Merged
Approved by: Gustavo Niemeyer
Approved revision: 54
Merged at revision: 41
Proposed branch: lp:~hazmat/txzookeeper/session-event-handling
Merge into: lp:txzookeeper
Diff against target: 1728 lines (+896/-206)
12 files modified
txzookeeper/client.py (+285/-110)
txzookeeper/queue.py (+5/-5)
txzookeeper/tests/__init__.py (+2/-2)
txzookeeper/tests/common.py (+216/-0)
txzookeeper/tests/mocker.py (+1/-0)
txzookeeper/tests/test_client.py (+131/-78)
txzookeeper/tests/test_node.py (+1/-1)
txzookeeper/tests/test_queue.py (+5/-5)
txzookeeper/tests/test_security.py (+2/-2)
txzookeeper/tests/test_session.py (+245/-0)
txzookeeper/tests/test_utils.py (+1/-1)
txzookeeper/tests/utils.py (+2/-2)
To merge this branch: bzr merge lp:~hazmat/txzookeeper/session-event-handling
Reviewer Review Type Date Requested Status
Gustavo Niemeyer Approve
Jim Baker Pending
Review via email: mp+66901@code.launchpad.net

This proposal supersedes a proposal from 2011-06-20.

Description of the change

Session event and connection loss handling for txzookeeper.

  - Session tests using a ZK cluster as a test layer/test resource, cluster state
    is reset between tests.
  - Zookeeper Client session tests for server rotation across a multi-node cluster.
  - Client server rotation and session/watch migration tests.
  - Session event handling, sent to a user defined callback, else ignored by default.
  - Connection loss handling, sent to a user defined callback, else raised at the
    API call point. The callback's result, if any, is chained back to the API
    call's deferred (see the sketch below).
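
A minimal usage sketch of the two hooks (hypothetical client code, assuming a
local server at 127.0.0.1:2181; set_session_callback and
set_connection_error_callback are the methods added in this branch, and the
function names here are illustrative only):

    from twisted.internet import defer, reactor
    from txzookeeper.client import ZookeeperClient

    def on_session_event(client, event):
        # event is a ClientEvent; without a registered callback these
        # session events are simply ignored by the client.
        print "session event:", event

    def on_connection_error(client, error):
        # Whatever this returns (a value, a deferred, or a raised
        # exception) is chained back to the API call that hit the
        # connection error.
        raise error

    @defer.inlineCallbacks
    def main():
        client = ZookeeperClient("127.0.0.1:2181")
        yield client.connect()
        client.set_session_callback(on_session_event)
        client.set_connection_error_callback(on_connection_error)
        yield client.exists("/")
        yield client.close()
        reactor.stop()

    reactor.callWhenRunning(main)
    reactor.run()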

Revision history for this message
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal

Please note that this branch has a pre-requisite branch also in review.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote : Posted in a previous version of this proposal

This looks very good. A few comments, and +1 with them considered.

[1]

+ zookeeper.NOTWATCHING_EVENT: "notwatching",
+ zookeeper.SESSION_EVENT: "session",
+ zookeeper.CREATED_EVENT: 'created',

Quotes are inconsistent here.

Also, "not-watching" would be a bit more friendly to the eye.

[2]

+def debug_symbols(sequence):

Left over?

[3]

error = error_class(error_msg)
+
+ if ConnectionException.is_connection_exception(error):

This idiom feels a little weird. A Foo is necessarily a Foo,
so we never have Foo.is_foo. Unless you have some background
for the choice that I'm missing, I suggest just moving that to an
"is_connection_exception()" top-level function.

[4]

+ # The result of the connection error handler is returned
+ # to the api.

That's a nice design, despite our conversations around the problem of
connection level erroring.

I'm a bit concerned about the session events, though. A session-termination
event may be the only event the deferred ever sees, so if we divert it to
a separate callback, the logic pending on the deferred will be dead forever.

I'm not sure about what we should do here. Have you thought about this?

If the answer is to just be so for now, we should at least add a note to
the proper location in the code mentioning that these deferreds are dead.

[5]

+ d = defer.maybeDeferred(
+ self._connection_error_callback,
+ self, error)

For consistency, either both callbacks should take "self", or neither of them
should.

[6]

+ # XXX/Debug
+ #print
+ #print "Session/Conn Event", ClientEvent(type, state, path)
+

Some left overs.

[7]

+ self._check_result(result, d)
+ if not d.called:
+ d.callback(True)

This is checking if the deferred is being called synchronously, which seems
dangerous as a pattern. Theoretically, the deferred may not have fired by
the time the function returns.

I suggest using the interface defined (considering the suggestions from
the pre-requisite branch):

  if not self._failed(result, d):
      d.callback(True)
  return d

review: Approve
Revision history for this message
Jim Baker (jimbaker) wrote : Posted in a previous version of this proposal

+1, we certainly need this branch, and it looks quite solid.

In addition to Gustavo's comments:

[1]

Some minor grammar suggestions:

+ specified in comma separated fashion, if they are,

specified in comma separated fashion. If they are,

+ # If its a session event pass it to the session callback, else

If it's a session event, ...

+ nodes and watches)/ The connection's client id, is also useful
+ when introspecting the server logs for specific client
+ activity.

nodes and watches. The connection's client id is also...

+ Its used for all connection level events and session events.

It's used...

+ # Send session events to the callback, this in addition to any
+ # duplicate session events that will be sent for extant watches.

callback, in addition

+ """Set a callback to recieve session events.

receive

+ twisted integration using deferreds is not capable of recieving

receiving

+ to be invoked with them instead. The callback recieves a single

receives

+ call is made, by setting a connection level error handler,
+ applications can centralize their handling of connection loss,

Something like

By setting...

+ This is a global setting, across connections.

setting across all connections

+ A client session can be reattached to in a separate connection,
+ if the a session is expired, using the zookeeper connection will
+ raise a SessionExpiredException.

+ # On occassion we get a session expired event while connecting.

occasion

+ # Closing one connection, will close the session

Closing one connection will close the session.

+ A client connected to multiple servers, will transparently
+ migrate amongst them, as individual servers can no longer be
+ reached. A client's session will be maintined.

servers will transparently...

...will be maintained.

+ # get a zookeeper connectionloss exception on occassion.

occasion

+ We can specify a callback for connection errors, that
+ can perform recovery for a disconnected client, restablishing

This seems incomplete; also there should not be a comma before "that"

+ """On connection failure, An application can choose to use a
+ new connection with to reconnect to a different member of the
+ zookeeper cluster, reacquiring the extant session.

an application... with which to reconnect...

Maybe this concludes with "This enables reacquiring the existing
session." (Extant doesn't seem like the right word choice here.)

+ A large obvious caveat to using a new client instance rather
+ than reconnecting the existing client, is that even though the

instance, rather than...

+ # Ephemeral is destroyed when the session closed.

the session is closed.

[2]

+ http://bit.ly/mQrOMY
+ http://bit.ly/irKpfn

Why not just expand these links out? Alternatively use a readable URL
(the underlying nabble URL is modestly readable; I don't know how
permanent either one's permalink is, however).

[3]

+ assert callable(callback)

Shouldn't this be throwing a TypeError instead? Also, there is no
corresponding assertion (or verification) on
set_connection_error_callback. Lastly, there is no test on this path.

[4]

+ def xtest_session_expired_event(self):

Do you want to include this test?

review: Approve
Revision history for this message
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal

Excerpts from Gustavo Niemeyer's message of Mon Jun 20 18:28:28 UTC 2011:
> Review: Approve
> This looks very good. A few comments, and +1 with them considered.
>

1-3 addressed.

>
> [4]
>
> + # The result of the connection error handler is returned
> + # to the api.
>
> That's a nice design, despite our conversations around the problem of
> connection level erroring.
>
> I'm a bit concerned about the session events, though. A session-termination
> event may be the only event the deferred ever sees, so if we divert it to
> a separate callback, the logic pending on the deferred will be dead forever.
>
> I'm not sure about what we should do here. Have you thought about this?
>
> If the answer is to just be so for now, we should at least add a note to
> the proper location in the code mentioning that these deferreds are dead.
>

Any deferreds attached to an unconnected client are dead. A session 'expired' event
is effectively a connection loss; the extant deferreds are dead. The expired
event is the only fatal session event. There is some testing around this (extant
watches on dead clients).

Alternatively, we could route session exceptions to the extant watcher, but then
we're back to the app code having to add guards to each watch for connection loss,
or lots of unhandled exceptions in deferred stacks. The connection exception
watcher will still fire, as it's always registered on the connection level watcher,
i.e. the one that monitors the initial connection setup.

From a cleanup perspective this might be preferable, but the number of extant
watches is small enough that it's not clear it's really a win, and most of that
cleanup will just result in ignorable log errors without connection loss handling
on all watches. The error handling for a watch is basically to just ignore or
propagate the exception to fully clear out the deferred chain, with the
connection level error callback handling the actual reconnect scenario.
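
As an illustration of that reconnect scenario, a connection error callback
might look roughly like the following hedged sketch (make_reconnect_handler is
a hypothetical helper; it relies on the connect(client_id=...) argument and the
client_id property added in this branch to reacquire the extant session on a
fresh client):

    from txzookeeper.client import ZookeeperClient

    def make_reconnect_handler(servers):
        def on_connection_error(client, error):
            # Reacquire the extant session with a fresh client; ephemeral
            # nodes and server-side watches survive as long as the session
            # itself has not expired.
            replacement = ZookeeperClient(servers)
            d = replacement.connect(client_id=client.client_id)

            def propagate(new_client):
                # A real application would hand new_client back to the
                # code that owns the connection (e.g. an agent), then
                # re-raise so the failed API call still sees the error.
                raise error

            d.addCallback(propagate)
            return d
        return on_connection_error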

[5-6] Addressed

> [7]
>
> + self._check_result(result, d)
> + if not d.called:
> + d.callback(True)
>
> This is checking if the deferred is being called synchronously, which seems
> dangerous as a pattern. Theoretically, the deferred may not have fired by
> the time the function returns.
>
>
> I suggest using the interface defined (considering the suggestions from
> the pre-requisite branch):
>
> if not self._failed(result, d):
> d.callback(True)
> return d
>

In this case the txzk api is synchronous (close) and we're just checking
whether it failed. I've updated the construct to not check the deferred.called status.

Revision history for this message
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal

Excerpts from Jim Baker's message of Tue Jun 21 00:18:33 UTC 2011:
> Review: Approve
> +1, we certainly need this branch, and it looks quite solid.
>
> In addition to Gustavo's comments:
>
> [1]
>
Added most of the grammatical changes.
>
> [2]
>
> + http://bit.ly/mQrOMY
> + http://bit.ly/irKpfn
>
> Why not just expand these links out? Alternatively use a readable URL
> (the underlying nabble URL is modestly readable; I don't know how
> permanent either one's permalink is, however).
>

The expanded links are rather ugly and violate the 80 column line limit.

>
> [3]
>
> + assert callable(callback)
>
> Shouldn't this be throwing a TypeError instead? Also, there is no
> corresponding assertion (or verification) on
> set_connection_error_callback. Lastly, there is no test on this path.

Well, the tests definitely exercised it by way of using the callback's functionality, but I've gone ahead and added the TypeError and invalid callback tests.

>
>
> [4]
>
> + def xtest_session_expired_event(self):
>
> Do you want to include this test?
>

Nice catch. I did, but it was written before the implementation of the session work; I've moved the test to test_session.

thanks for the review,

cheers,

Kapil

Revision history for this message
Kapil Thangavelu (hazmat) wrote : Posted in a previous version of this proposal

Gustavo, even though this branch is approved, I'm going to wait on a final comment/agreement on [4] before merging.

Revision history for this message
Gustavo Niemeyer (niemeyer) wrote :

Deep conversation about error handling in Ensemble, in the context of item [4] above.

We've agreed to merge this as-is, since clearly there's a lot of work still to be done for the desired end result.

Jul 05 17:30:51 <niemeyer> Let me understand this.. are you introducing this logic so that the application we'll use this to crash the agent?
Jul 05 17:30:59 <niemeyer> s/we'll/will
Jul 05 17:31:14 <niemeyer> If that's the case, as we've already agreed, that's ok for now
Jul 05 17:31:15 <hazmat_> that's one approach
Jul 05 17:31:36 <hazmat_> the other is letting the agent stop/start
Jul 05 17:31:47 <niemeyer> I just want to understand what we're doing, though, and what would be the proper way
Jul 05 17:31:53 <hazmat_> i know.. there's dead stuff on the reactor if we start/stop
Jul 05 17:31:58 <niemeyer> Not just that..
Jul 05 17:32:10 <niemeyer> The dead stuff is not my major worry
Jul 05 17:32:21 <niemeyer> The real problem I see is that we simply can't trust any code anymore
Jul 05 17:32:33 <niemeyer> Imagine.. "Oh, I'll loop until file foo is created."
Jul 05 17:32:34 <niemeyer> NOT!
Jul 05 17:32:43 <niemeyer> The loop never dies..
Jul 05 17:32:55 <niemeyer> *anything* we do would be entirely unreliable
Jul 05 17:33:01 <niemeyer> if it's touching a watch
Jul 05 17:33:13 <niemeyer> It's like piecemeal crashing..
Jul 05 17:33:36 <hazmat_> right error handling is application specific
Jul 05 17:34:23 <hazmat_> with the existing watch protocols, its not an issue, except for the case of a hook currently executing (which needs cleanup even in the case of a process shutdown)
Jul 05 17:35:11 <niemeyer> I'm pretty sure I can find cases where it is an issue..
Jul 05 17:35:21 <niemeyer> relation-get ip
Jul 05 17:35:35 <niemeyer> relation-get goes into the agent..
Jul 05 17:35:45 <niemeyer> Never returns again..
Jul 05 17:35:52 <niemeyer> Dead processes
Jul 05 17:36:05 <hazmat_> that's the case i just mentioned
Jul 05 17:36:22 <niemeyer> I see
Jul 05 17:36:34 <niemeyer> I read "watch currently executed" when you said hook
Jul 05 17:36:56 <niemeyer> Anyway.. that's the kind of issue
Jul 05 17:37:07 <niemeyer> There are likely more problematic cases already
Jul 05 17:38:58 <hazmat_> yup.. the interesting case to me, would be something that where we our 'interactively' waiting on a watch, which we don't have, all the existing watch apis are effectively complete on their own (ie. request / response cycles).. if we want to consider arbitrary code.. sure we can construct problem scenarios.. but the existing usage i think is okay..
Jul 05 17:39:00 * hazmat_ brainstorms
Jul 05 17:39:24 <niemeyer> Well, it's already not ok as we have just both spotted
Jul 05 17:40:13 <hazmat_> well the hook scenario is very different then the dead logic waiting issue
Jul 05 17:40:19 <niemeyer> Why?
Jul 05 17:40:33 <hazmat_> it doesn't really care if the session terminates, it just needs an active connection to zk
Jul 05 17:41:06 <niemeyer> Hmmm
Jul 05 17:43:27 <hazmat_> pretty much all of the watch protocols/callbacks are based on examining the current state, and then effecting changes for their area of responsibility to match that current state.. if their restarted, they function t...

review: Approve

Preview Diff

1=== modified file 'txzookeeper/client.py'
2--- txzookeeper/client.py 2011-06-17 23:52:34 +0000
3+++ txzookeeper/client.py 2011-07-05 13:29:55 +0000
4@@ -60,6 +60,24 @@
5 zookeeper.SYSTEMERROR: zookeeper.SystemErrorException,
6 zookeeper.UNIMPLEMENTED: zookeeper.UnimplementedException}
7
8+# Mapping of connection state values to human strings.
9+STATE_NAME_MAPPING = {
10+ zookeeper.ASSOCIATING_STATE: "associating",
11+ zookeeper.AUTH_FAILED_STATE: "auth-failed",
12+ zookeeper.CONNECTED_STATE: "connected",
13+ zookeeper.CONNECTING_STATE: "connecting",
14+ zookeeper.EXPIRED_SESSION_STATE: "expired",
15+}
16+
17+# Mapping of event type to human string.
18+TYPE_NAME_MAPPING = {
19+ zookeeper.NOTWATCHING_EVENT: "not-watching",
20+ zookeeper.SESSION_EVENT: "session",
21+ zookeeper.CREATED_EVENT: "created",
22+ zookeeper.DELETED_EVENT: "deleted",
23+ zookeeper.CHANGED_EVENT: "changed",
24+ zookeeper.CHILD_EVENT: "child"}
25+
26
27 class NotConnectedException(zookeeper.ZooKeeperException):
28 """
29@@ -73,6 +91,29 @@
30 Raised if an error occurs during the client's connection attempt.
31 """
32
33+ @property
34+ def state_name(self):
35+ return STATE_NAME_MAPPING[self.args[2]]
36+
37+ @property
38+ def type_name(self):
39+ return TYPE_NAME_MAPPING[self.args[1]]
40+
41+ def __str__(self):
42+ return "<txzookeeper.ConnectionException type: %s state: %s>" % (
43+ self.type_name, self.state_name)
44+
45+
46+def is_connection_exception(e):
47+ """
48+ For connection errors in response to api calls, a utility method
49+ to determine if the cause is a connection exception.
50+ """
51+ return isinstance(e,
52+ (zookeeper.ClosingException,
53+ zookeeper.ConnectionLossException,
54+ zookeeper.SessionExpiredException))
55+
56
57 class ConnectionTimeoutException(zookeeper.ZooKeeperException):
58 """
59@@ -87,94 +128,131 @@
60 some event on the zookeeper client that the watch was requested on.
61 """
62
63- type_name_map = {
64- -2: "notwatching",
65- -1: "session",
66- 1: 'created',
67- 2: 'deleted',
68- 3: 'changed',
69- 4: 'child'}
70-
71 @property
72 def type_name(self):
73- return self.type_name_map[self.type]
74+ return TYPE_NAME_MAPPING[self.type]
75+
76+ @property
77+ def state_name(self):
78+ return STATE_NAME_MAPPING[self.connection_state]
79
80 def __repr__(self):
81- return "<ClientEvent %s at %r>" % (self.type_name, self.path)
82+ return "<ClientEvent %s at %r state: %s>" % (
83+ self.type_name, self.path, self.state_name)
84
85
86 class ZookeeperClient(object):
87 """Asynchronous twisted client for zookeeper."""
88
89 def __init__(self, servers=None, session_timeout=None):
90+ """
91+ @param servers: A string specifying the servers and their
92+ ports to connect to. Multiple servers can be
93+ specified in comma separated fashion. if they are,
94+ then the client will automatically rotate
95+ among them if a server connection fails. Optionally
96+ a chroot can be specified. A full server spec looks
97+ like host:port/chroot_path
98+
99+ @param session_timeout: The client's zookeeper session timeout can be
100+ hinted. The actual value is negotiated between the
101+ client and server based on their respective
102+ configurations.
103+ """
104 self._servers = servers
105 self._session_timeout = session_timeout
106+ self._session_event_callback = None
107+ self._connection_error_callback = None
108 self.connected = False
109 self.handle = None
110
111- def _check_connected(self):
112+ def _check_connected(self, d):
113 if not self.connected:
114- raise NotConnectedException("not connected")
115-
116- def _check_result(self, result_code, callback=False, extra_codes=()):
117+ d.errback(NotConnectedException("not connected"))
118+ return d
119+
120+ def _check_result(self, result_code, deferred, extra_codes=()):
121+ """Check an API call or result for errors.
122+
123+ :param result_code: The api result code.
124+ :param deferred: The deferred returned the client api consumer.
125+ :param extra_codes: Additional result codes accepted as valid/ok.
126+
127+ If the result code is an error, an appropriate Exception class
128+ is constructed and the errback on the deferred is invoked with it.
129+ """
130 error = None
131 if not result_code == zookeeper.OK and not result_code in extra_codes:
132 error_msg = zookeeper.zerror(result_code)
133 error_class = ERROR_MAPPING.get(
134 result_code, zookeeper.ZooKeeperException)
135 error = error_class(error_msg)
136- if callback:
137- return error
138- raise error
139+
140+ if is_connection_exception(error):
141+ # Mark the client as disconnected.
142+ self.connected = False
143+ # Route connection errors to a connection level error
144+ # handler if specified.
145+ if self._connection_error_callback:
146+ # The result of the connection error handler is returned
147+ # to the api.
148+ d = defer.maybeDeferred(
149+ self._connection_error_callback,
150+ self, error)
151+ d.chainDeferred(deferred)
152+ return True
153+
154+ deferred.errback(error)
155+ return True
156 return None
157
158 def _get(self, path, watcher):
159- self._check_connected()
160 d = defer.Deferred()
161+ if self._check_connected(d):
162+ return d
163
164 def _cb_get(result_code, value, stat):
165- error = self._check_result(result_code, True)
166- if error:
167- return d.errback(error)
168+ if self._check_result(result_code, d):
169+ return
170 d.callback((value, stat))
171
172 callback = self._zk_thread_callback(_cb_get)
173 watcher = self._wrap_watcher(watcher)
174 result = zookeeper.aget(self.handle, path, watcher, callback)
175- self._check_result(result)
176+ self._check_result(result, d)
177 return d
178
179 def _get_children(self, path, watcher):
180- self._check_connected()
181 d = defer.Deferred()
182+ if self._check_connected(d):
183+ return d
184
185 def _cb_get_children(result_code, children):
186- error = self._check_result(result_code, True)
187- if error:
188- return d.errback(error)
189+ if self._check_result(result_code, d):
190+ return
191 d.callback(children)
192
193 callback = self._zk_thread_callback(_cb_get_children)
194 watcher = self._wrap_watcher(watcher)
195 result = zookeeper.aget_children(self.handle, path, watcher, callback)
196- self._check_result(result)
197+ self._check_result(result, d)
198 return d
199
200 def _exists(self, path, watcher):
201- self._check_connected()
202 d = defer.Deferred()
203+ if self._check_connected(d):
204+ return d
205
206 def _cb_exists(result_code, stat):
207- error = self._check_result(
208- result_code, True, extra_codes=(zookeeper.NONODE,))
209- if error:
210- return d.errback(error)
211+ if self._check_result(
212+ result_code, d, extra_codes=(zookeeper.NONODE,)):
213+ return
214 d.callback(stat)
215
216 callback = self._zk_thread_callback(_cb_exists)
217 watcher = self._wrap_watcher(watcher)
218 result = zookeeper.aexists(self.handle, path, watcher, callback)
219- self._check_result(result)
220+ self._check_result(result, d)
221 return d
222
223 def _wrap_watcher(self, watcher):
224@@ -182,7 +260,22 @@
225 return watcher
226 if not callable(watcher):
227 raise SyntaxError("invalid watcher")
228- return self._zk_thread_callback(watcher)
229+ return self._zk_thread_callback(
230+ partial(self._session_event_wrapper, watcher))
231+
232+ def _session_event_wrapper(self, watcher, event_type, conn_state, path):
233+ """Watch wrapper that diverts session events to a connection callback.
234+ """
235+ # If it's a session event pass it to the session callback, else
236+ # ignore it. Session events are sent repeatedly to watchers
237+ # which we have modeled after deferred, which only accept a
238+ # single return value.
239+ if event_type == zookeeper.SESSION_EVENT:
240+ if self._session_event_callback:
241+ self._session_event_callback(
242+ self, ClientEvent(event_type, conn_state, path))
243+ else:
244+ return watcher(event_type, conn_state, path)
245
246 def _zk_thread_callback(self, func):
247 """
248@@ -190,7 +283,6 @@
249 any user defined callback so that they are called back in the main
250 thread after, zookeeper calls the wrapper.
251 """
252-
253 def wrapper(handle, *args): # pragma: no cover
254 reactor.callFromThread(func, *args)
255 return wrapper
256@@ -207,6 +299,7 @@
257 def session_timeout(self):
258 """
259 What's the negotiated session timeout for this connection, in seconds.
260+ If the client is not connected the value is None.
261 """
262 if self.connected:
263 return zookeeper.recv_timeout(self.handle)
264@@ -222,9 +315,14 @@
265
266 @property
267 def client_id(self):
268- """
269- The connection's client id, useful when introspecting the server logs
270- for specific client activity.
271+ """Returns the client id that identifies the server side session.
272+
273+ A client id is a tuple represented by the session id and
274+ session password. It can be used to manually connect to an
275+ extant server session (which contains associated ephemeral
276+ nodes and watches)/ The connection's client id is also useful
277+ when introspecting the server logs for specific client
278+ activity.
279 """
280 if self.handle is None:
281 return None
282@@ -239,34 +337,33 @@
283 return bool(zookeeper.is_unrecoverable(self.handle))
284
285 def add_auth(self, scheme, identity):
286- """
287- Adds an authentication identity to this connection. A connection
288- can use multiple authentication identities at the same time, all
289- are checked when verifying acls on a node.
290+ """Adds an authentication identity to this connection.
291+
292+ A connection can use multiple authentication identities at the
293+ same time, all are checked when verifying acls on a node.
294
295 @param scheme: a string specifying a an authentication scheme
296 valid values include 'digest'.
297- @param identity: a string containingusername and password colon
298- separated for example 'mary:apples'
299+ @param identity: a string containing username and password colon
300+ separated, for example 'mary:apples'
301 """
302- self._check_connected()
303 d = defer.Deferred()
304+ if self._check_connected(d):
305+ return d
306
307 def _cb_authenticated(result_code):
308- error = self._check_result(result_code, True)
309- if error:
310- return d.errback(error)
311+ if self._check_result(result_code, d):
312+ return
313 d.callback(self)
314
315 callback = self._zk_thread_callback(_cb_authenticated)
316 result = zookeeper.add_auth(self.handle, scheme, identity, callback)
317- self._check_result(result)
318+ self._check_result(result, d)
319 return d
320
321 def close(self, force=False):
322 """
323- Close the underlying socket connection and zookeeper server side
324- session.
325+ Close the underlying socket connection and server side session.
326
327 @param force: boolean, require the connection to be closed now or
328 an exception be raised.
329@@ -276,23 +373,31 @@
330
331 result = zookeeper.close(self.handle)
332 self.connected = False
333- self._check_result(result)
334- return result
335-
336- def connect(self, servers=None, timeout=10):
337+ d = defer.Deferred()
338+
339+ if self._check_result(result, d):
340+ return d
341+ d.callback(True)
342+ return d
343+
344+ def connect(self, servers=None, timeout=10, client_id=None):
345 """
346 Establish a connection to the given zookeeper server(s).
347
348 @param servers: A string specifying the servers and their ports to
349- connect to.
350+ connect to. Multiple servers can be specified in
351+ comma separated fashion.
352 @param timeout: How many seconds to wait on a connection to the
353 zookeeper servers.
354+
355+ @param session_id:
356 @returns A deferred that's fired when the connection is established.
357 """
358 d = defer.Deferred()
359
360 if self.connected:
361- raise zookeeper.ZooKeeperException("Already Connected")
362+ return defer.fail(
363+ zookeeper.ZooKeeperException("Already Connected"))
364
365 # Use a scheduled function to ensure a timeout.
366 def _check_timeout():
367@@ -311,31 +416,45 @@
368 if servers is not None:
369 self._servers = servers
370
371- self.handle = zookeeper.init(
372- self._servers, callback, self._session_timeout)
373+ # Assemble client id if specified.
374+ if client_id:
375+ self.handle = zookeeper.init(
376+ self._servers, callback, self._session_timeout, client_id)
377+ else:
378+ self.handle = zookeeper.init(
379+ self._servers, callback, self._session_timeout)
380
381 return d
382
383 def _cb_connected(
384 self, scheduled_timeout, connect_deferred, type, state, path):
385+ """This callback is invoked through the lifecycle of the connection.
386
387+ It's used for all connection level events and session events.
388+ """
389 # Cancel the timeout delayed task if it hasn't fired.
390 if scheduled_timeout.active():
391 scheduled_timeout.cancel()
392
393+ # Update connected boolean
394+ if state == zookeeper.CONNECTED_STATE:
395+ self.connected = True
396+ elif state != zookeeper.CONNECTING_STATE:
397+ self.connected = False
398+
399 if connect_deferred.called:
400 # If we timed out and then connected, then close the conn.
401- if state == zookeeper.CONNECTED_STATE:
402- self.connected = True
403+ if state == zookeeper.CONNECTED_STATE and scheduled_timeout.called:
404 self.close()
405- # If the client is reused across multiple connect/close
406- # cycles, and a previous connect timed out, then a
407- # subsequent connect may trigger the previous connect's
408- # handler notifying of a CONNECTING_STATE, ignore.
409+
410+ # Send session events to the callback, in addition to any
411+ # duplicate session events that will be sent for extant watches.
412+ if self._session_event_callback:
413+ self._session_event_callback(
414+ self, ClientEvent(type, state, path))
415+
416 return
417 elif state == zookeeper.CONNECTED_STATE:
418- # Connection established.
419- self.connected = True
420 connect_deferred.callback(self)
421 return
422
423@@ -351,19 +470,19 @@
424 @params acls: A list of dictionaries specifying permissions.
425 @params flags: Node creation flags (ephemeral, sequence, persistent)
426 """
427- self._check_connected()
428 d = defer.Deferred()
429+ if self._check_connected(d):
430+ return d
431
432 def _cb_created(result_code, path):
433- error = self._check_result(result_code, True)
434- if error:
435- return d.errback(error)
436+ if self._check_result(result_code, d):
437+ return
438 d.callback(path)
439
440 callback = self._zk_thread_callback(_cb_created)
441 result = zookeeper.acreate(
442 self.handle, path, data, acls, flags, callback)
443- self._check_result(result)
444+ self._check_result(result, d)
445 return d
446
447 def delete(self, path, version=-1):
448@@ -376,18 +495,18 @@
449 @param path: the path of the node to be deleted.
450 @param version: the integer version of the node.
451 """
452- self._check_connected()
453 d = defer.Deferred()
454+ if self._check_connected(d):
455+ return d
456
457 def _cb_delete(result_code):
458- error = self._check_result(result_code, True)
459- if error:
460- return d.errback(error)
461+ if self._check_result(result_code, d):
462+ return
463 d.callback(result_code)
464
465 callback = self._zk_thread_callback(_cb_delete)
466 result = zookeeper.adelete(self.handle, path, version, callback)
467- self._check_result(result)
468+ self._check_result(result, d)
469 return d
470
471 def exists(self, path):
472@@ -395,8 +514,9 @@
473 Check that the given node path exists. Returns a deferred that
474 holds the node stat information if the node exists (created,
475 modified, version, etc.), or ``None`` if it does not exist.
476+
477+ @param path: The path of the node whose existence will be checked.
478 """
479-
480 return self._exists(path, None)
481
482 def exists_and_watch(self, path):
483@@ -406,20 +526,22 @@
484 In addition to the deferred method result, this method returns
485 a deferred that is called back when the node is modified or
486 removed (once).
487+
488+ @param path: The path of the node whose existence will be checked.
489 """
490-
491 d = defer.Deferred()
492
493- def callback(*args):
494+ def watcher(*args):
495 d.callback(ClientEvent(*args))
496- return self._exists(path, callback), d
497+ return self._exists(path, watcher), d
498
499 def get(self, path):
500 """
501 Get the node's data for the given node path. Returns a
502 deferred that holds the content of the node.
503+
504+ @param path: The path of the node whose content will be retrieved.
505 """
506-
507 return self._get(path, None)
508
509 def get_and_watch(self, path):
510@@ -429,17 +551,20 @@
511 In addition to the deferred method result, this method returns
512 a deferred that is called back when the node is modified or
513 removed (once).
514+
515+ @param path: The path of the node whose content will be retrieved.
516 """
517-
518 d = defer.Deferred()
519
520- def callback(*args):
521+ def watcher(*args):
522 d.callback(ClientEvent(*args))
523- return self._get(path, callback), d
524+ return self._get(path, watcher), d
525
526 def get_children(self, path):
527 """
528 Get the ids of all children directly under the given path.
529+
530+ @param path: The path of the node whose children will be retrieved.
531 """
532 return self._get_children(path, None)
533
534@@ -450,13 +575,14 @@
535 In addition to the deferred method result, this method returns
536 a deferred that is called back when a change happens on the
537 provided path (once).
538+
539+ @param path: The path of the node whose children will be retrieved.
540 """
541-
542 d = defer.Deferred()
543
544- def callback(*args):
545+ def watcher(*args):
546 d.callback(ClientEvent(*args))
547- return self._get_children(path, callback), d
548+ return self._get_children(path, watcher), d
549
550 def get_acl(self, path):
551 """
552@@ -464,19 +590,21 @@
553
554 Each acl is a dictionary containing keys/values for scheme, id,
555 and perms.
556+
557+ @param path: The path of the node whose acl will be retrieved.
558 """
559- self._check_connected()
560 d = defer.Deferred()
561+ if self._check_connected(d):
562+ return d
563
564 def _cb_get_acl(result_code, acls, stat):
565- error = self._check_result(result_code, True)
566- if error:
567- return d.errback(error)
568+ if self._check_result(result_code, d):
569+ return
570 d.callback((acls, stat))
571
572 callback = self._zk_thread_callback(_cb_get_acl)
573 result = zookeeper.aget_acl(self.handle, path, callback)
574- self._check_result(result)
575+ self._check_result(result, d)
576 return d
577
578 def set_acl(self, path, acls, version=-1):
579@@ -502,19 +630,19 @@
580 doesn't match the version on the server, then a
581 BadVersionException is raised.
582 """
583- self._check_connected()
584 d = defer.Deferred()
585+ if self._check_connected(d):
586+ return d
587
588 def _cb_set_acl(result_code):
589- error = self._check_result(result_code, True)
590- if error:
591- return d.errback(error)
592+ if self._check_result(result_code, d):
593+ return
594 d.callback(result_code)
595
596 callback = self._zk_thread_callback(_cb_set_acl)
597 result = zookeeper.aset_acl(
598 self.handle, path, version, acls, callback)
599- self._check_result(result)
600+ self._check_result(result, d)
601 return d
602
603 def set(self, path, data="", version=-1):
604@@ -528,18 +656,18 @@
605 @param data: The data to store on the node.
606 @param version: Integer version value
607 """
608- self._check_connected()
609 d = defer.Deferred()
610+ if self._check_connected(d):
611+ return d
612
613 def _cb_set(result_code, node_stat):
614- error = self._check_result(result_code, True)
615- if error:
616- return d.errback(error)
617+ if self._check_result(result_code, d):
618+ return
619 d.callback(node_stat)
620
621 callback = self._zk_thread_callback(_cb_set)
622 result = zookeeper.aset(self.handle, path, data, version, callback)
623- self._check_result(result)
624+ self._check_result(result, d)
625 return d
626
627 def set_connection_watcher(self, watcher):
628@@ -549,25 +677,72 @@
629
630 @param: watcher function
631 """
632+ if not callable(watcher):
633+ raise SyntaxError("Invalid Watcher %r" % (watcher))
634 watcher = self._wrap_watcher(watcher)
635 zookeeper.set_watcher(self.handle, watcher)
636
637+ def set_session_callback(self, callback):
638+ """Set a callback to receive session events.
639+
640+ Session events are by default ignored. Interested applications
641+ may choose to set a session event watcher on the connection
642+ to receive session events. Session events are typically broadcast
643+ by the libzookeeper library to all extant watchers, but the
644+ twisted integration using deferreds is not capable of receiving
645+ multiple values (session events and watch events), so this
646+ client implementation instead provides for a user defined callback
647+ to be invoked with them instead. The callback receives a single
648+ parameter, the session event in the form of a ClientEvent instance.
649+
650+ Additional details on session events
651+ ------------------------------------
652+ http://bit.ly/mQrOMY
653+ http://bit.ly/irKpfn
654+ """
655+ if not callable(callback):
656+ raise TypeError("Invalid callback %r" % callback)
657+ self._session_event_callback = callback
658+
659+ def set_connection_error_callback(self, callback):
660+ """Set a callback to receive connection error exceptions.
661+
662+ By default the error will be raised when the client API
663+ call is made. Setting a connection level error handler allows
664+ applications to centralize their handling of connection loss,
665+ instead of having to guard every zk interaction.
666+
667+ The callback receives two parameters, the client instance
668+ and the exception.
669+ """
670+ if not callable(callback):
671+ raise TypeError("Invalid callback %r" % callback)
672+ self._connection_error_callback = callback
673+
674+ def set_determinstic_order(self, boolean):
675+ """
676+ The zookeeper client will by default randomize the server hosts
677+ it will connect to unless this is set to True.
678+
679+ This is a global setting across connections.
680+ """
681+ zookeeper.deterministic_conn_order(bool(boolean))
682+
683 def sync(self, path="/"):
684- """
685- Flushes the zookeeper connection to the leader.
686+ """Flushes the connected zookeeper server with the leader.
687
688 @param path: The root path to flush, all child nodes are also flushed.
689 """
690- self._check_connected()
691 d = defer.Deferred()
692+ if self._check_connected(d):
693+ return d
694
695 def _cb_sync(result_code, path):
696- error = self._check_result(result_code, True)
697- if error:
698- return d.errback(error)
699+ if self._check_result(result_code, d):
700+ return
701 d.callback(path)
702
703 callback = self._zk_thread_callback(_cb_sync)
704 result = zookeeper.async(self.handle, path, callback)
705- self._check_result(result)
706+ self._check_result(result, d)
707 return d
708
709=== modified file 'txzookeeper/queue.py'
710--- txzookeeper/queue.py 2011-06-17 23:52:34 +0000
711+++ txzookeeper/queue.py 2011-07-05 13:29:55 +0000
712@@ -90,7 +90,7 @@
713 def on_queue_items_changed(*args):
714 """Event watcher on queue node child events."""
715 if request.complete or not self._client.connected:
716- return # pragma: no cover
717+ return # pragma: no cover
718
719 if request.processing_children:
720 # If deferred stack is currently processing a set of children
721@@ -273,7 +273,7 @@
722 """
723
724 def _item_processed_callback(self, result_code, item_path):
725- return self._client.delete(item_path+"-processing")
726+ return self._client.delete(item_path + "-processing")
727
728 def _filter_children(self, children, suffix="-processing"):
729 """
730@@ -300,7 +300,7 @@
731
732 def on_node_exists(stat, path):
733 """Reserve the node for consumer processing."""
734- d = self._client.create(path+"-processing",
735+ d = self._client.create(path + "-processing",
736 flags=zookeeper.EPHEMERAL)
737 d.addCallback(on_reservation_success, path)
738 d.addErrback(on_reservation_failed)
739@@ -315,7 +315,7 @@
740
741 def on_get_node_failed(failure, path):
742 """If we can't fetch the node, delete the processing node."""
743- d = self._client.delete(path+"-processing")
744+ d = self._client.delete(path + "-processing")
745
746 # propogate unexpected errors appropriately
747 if not failure.check(zookeeper.NoNodeException):
748@@ -372,7 +372,7 @@
749
750 def __init__(self, path, client, acl=None, persistent=False):
751 super(SerializedQueue, self).__init__(path, client, acl, persistent)
752- self._lock = Lock("%s/%s"%(self.path, "_lock"), client)
753+ self._lock = Lock("%s/%s" % (self.path, "_lock"), client)
754
755 def _item_processed_callback(self, result_code, item_path):
756 return self._lock.release()
757
758=== modified file 'txzookeeper/tests/__init__.py'
759--- txzookeeper/tests/__init__.py 2011-06-17 23:52:34 +0000
760+++ txzookeeper/tests/__init__.py 2011-07-05 13:29:55 +0000
761@@ -36,8 +36,8 @@
762
763 def tearDown(self):
764 super(ZookeeperTestCase, self).tearDown()
765- zookeeper.set_log_stream(sys.stderr) # reset to default
766- zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
767+ #zookeeper.set_log_stream(sys.stderr) # reset to default
768+ #zookeeper.set_debug_level(zookeeper.LOG_LEVEL_DEBUG)
769 self.log_file.close()
770
771 def get_log(self):
772
773=== added file 'txzookeeper/tests/common.py'
774--- txzookeeper/tests/common.py 1970-01-01 00:00:00 +0000
775+++ txzookeeper/tests/common.py 2011-07-05 13:29:55 +0000
776@@ -0,0 +1,216 @@
777+import os
778+import os.path
779+import shutil
780+import subprocess
781+import tempfile
782+
783+from itertools import chain
784+from collections import namedtuple
785+from glob import glob
786+
787+
788+ServerInfo = namedtuple(
789+ "ServerInfo", "server_id client_port election_port leader_port")
790+
791+
792+class ManagedZooKeeper(object):
793+ """Class to manage the running of a ZooKeeper instance for testing.
794+
795+ Note: no attempt is made to probe the ZooKeeper instance is
796+ actually available, or that the selected port is free. In the
797+ future, we may want to do that, especially when run in a
798+ Hudson/Buildbot context, to ensure more test robustness."""
799+
800+ def __init__(self, software_path, server_info, peers=()):
801+ """Define the ZooKeeper test instance.
802+
803+ @param install_path: The path to the install for ZK
804+ @param port: The port to run the managed ZK instance
805+ """
806+ self.install_path = software_path
807+ self.server_info = server_info
808+ self.host = "127.0.0.1"
809+ self.peers = peers
810+ self.working_path = tempfile.mkdtemp()
811+ self._running = False
812+
813+ def run(self):
814+ """Run the ZooKeeper instance under a temporary directory.
815+
816+ Writes ZK log messages to zookeeper.log in the current directory.
817+ """
818+ config_path = os.path.join(self.working_path, "zoo.cfg")
819+ log_path = os.path.join(self.working_path, "log")
820+ log4j_path = os.path.join(self.working_path, "log4j.properties")
821+ data_path = os.path.join(self.working_path, "data")
822+
823+ # various setup steps
824+ if not os.path.exists(self.working_path):
825+ os.mdir(self.working_path)
826+ if not os.path.exists(log_path):
827+ os.mkdir(log_path)
828+ if not os.path.exists(data_path):
829+ os.mkdir(data_path)
830+
831+ with open(config_path, "w") as config:
832+ config.write("""
833+tickTime=2000
834+dataDir=%s
835+clientPort=%s
836+maxClientCnxns=0
837+""" % (data_path, self.server_info.client_port))
838+
839+ # setup a replicated setup if peers are specified
840+ if self.peers:
841+ servers_cfg = []
842+ for p in chain((self.server_info,), self.peers):
843+ servers_cfg.append("server.%s=localhost:%s:%s" % (
844+ p.server_id, p.leader_port, p.election_port))
845+
846+ with open(config_path, "a") as config:
847+ config.write("""
848+initLimit=4
849+syncLimit=2
850+%s
851+""" % ("\n".join(servers_cfg)))
852+
853+ # Write server ids into datadir
854+ with open(os.path.join(data_path, "myid"), "w") as myid_file:
855+ myid_file.write(str(self.server_info.server_id))
856+
857+ with open(log4j_path, "w") as log4j:
858+ log4j.write("""
859+# DEFAULT: console appender only
860+log4j.rootLogger=INFO, ROLLINGFILE
861+log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
862+log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n
863+log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
864+log4j.appender.ROLLINGFILE.Threshold=DEBUG
865+log4j.appender.ROLLINGFILE.File=""" + (
866+ self.working_path + os.sep + "zookeeper.log\n"))
867+
868+ self.process = subprocess.Popen(
869+ args=["java",
870+ "-cp", self.classpath,
871+ "-Dzookeeper.log.dir=%s" % log_path,
872+ "-Dzookeeper.root.logger=INFO,CONSOLE",
873+ "-Dlog4j.configuration=file:%s" % log4j_path,
874+ # "-Dlog4j.debug",
875+ "org.apache.zookeeper.server.quorum.QuorumPeerMain",
876+ config_path],
877+ )
878+ self._running = True
879+
880+ @property
881+ def classpath(self):
882+ """Get the classpath necessary to run ZooKeeper."""
883+
884+ # Two possibilities, as seen in zkEnv.sh:
885+ # Check for a release - top-level zookeeper-*.jar?
886+ jars = glob((os.path.join(
887+ self.install_path, 'zookeeper-*.jar')))
888+ if jars:
889+ # Relase build (`ant package`)
890+ jars.extend(glob(os.path.join(
891+ self.install_path,
892+ "lib/*.jar")))
893+ else:
894+ # Development build (plain `ant`)
895+ jars = glob((os.path.join(
896+ self.install_path, 'build/zookeeper-*.jar')))
897+ jars.extend(glob(os.path.join(
898+ self.install_path,
899+ "build/lib/*.jar")))
900+ return ":".join(jars)
901+
902+ @property
903+ def address(self):
904+ """Get the address of the ZooKeeper instance."""
905+ return "%s:%s" % (self.host, self.client_port)
906+
907+ @property
908+ def running(self):
909+ return self._running
910+
911+ @property
912+ def client_port(self):
913+ return self.server_info.client_port
914+
915+ def reset(self):
916+ """Stop the zookeeper instance, cleaning out its on disk-data."""
917+ self.stop()
918+ shutil.rmtree(os.path.join(self.working_path, "data"))
919+ os.mkdir(os.path.join(self.working_path, "data"))
920+ with open(os.path.join(self.working_path, "data", "myid"), "w") as fh:
921+ fh.write(str(self.server_info.server_id))
922+
923+ def stop(self):
924+ """Stop the Zookeeper instance, retaining on disk state."""
925+ if not self._running:
926+ return
927+ self.process.terminate()
928+ self.process.wait()
929+ self._running = False
930+
931+ def destroy(self):
932+ """Stop the ZooKeeper instance and destroy its on disk-state"""
933+ # called by at exit handler, reimport to avoid cleanup race.
934+ import shutil
935+ self.stop()
936+
937+ shutil.rmtree(self.working_path)
938+
939+
940+class ZookeeperCluster(object):
941+
942+ def __init__(self, install_path, size=3, port_offset=20000):
943+ self._install_path = install_path
944+ self._servers = []
945+
946+ # Calculate ports and peer group
947+ port = port_offset
948+ peers = []
949+
950+ for i in range(size):
951+ port += i * 10
952+ info = ServerInfo(i + 1, port, port + 1, port + 2)
953+ peers.append(info)
954+
955+ # Instantiate Managed ZK Servers
956+ for i in range(size):
957+ server_peers = list(peers)
958+ server_info = server_peers.pop(i)
959+ self._servers.append(
960+ ManagedZooKeeper(
961+ self._install_path, server_info, server_peers))
962+
963+ def __getitem__(self, k):
964+ return self._servers[k]
965+
966+ def __iter__(self):
967+ return iter(self._servers)
968+
969+ def start(self):
970+ # Zookeeper client expresses a preference for either lower ports or
971+ # lexographical ordering of hosts, to ensure that all servers have a
972+ # chance to startup, start them in reverse order.
973+ for server in reversed(list(self)):
974+ server.run()
975+ # Giving the servers a moment to start, decreases the overall time
976+ # required for a client to successfully connect (2s vs. 4s without
977+ # the sleep).
978+ import time
979+ time.sleep(2)
980+
981+ def stop(self):
982+ for server in self:
983+ server.stop()
984+ self._servers = []
985+
986+ def terminate(self):
987+ for server in self:
988+ server.destroy()
989+
990+ def reset(self):
991+ for server in self:
992+ server.reset()
993
994=== modified file 'txzookeeper/tests/mocker.py'
995--- txzookeeper/tests/mocker.py 2011-06-08 21:14:20 +0000
996+++ txzookeeper/tests/mocker.py 2011-07-05 13:29:55 +0000
997@@ -1923,6 +1923,7 @@
998 def run(self, path):
999 return Mock(self.mocker, path)
1000
1001+
1002 def mock_returner_recorder(mocker, event):
1003 """Events that lead to other events must return mock objects."""
1004 parent_path = event.path.parent_path
1005
1006=== modified file 'txzookeeper/tests/test_client.py'
1007--- txzookeeper/tests/test_client.py 2011-06-17 23:52:34 +0000
1008+++ txzookeeper/tests/test_client.py 2011-07-05 13:29:55 +0000
1009@@ -23,19 +23,25 @@
1010
1011 from twisted.internet.defer import Deferred
1012 from twisted.internet.base import DelayedCall
1013+from twisted.python.failure import Failure
1014
1015 import zookeeper
1016
1017+from mocker import ANY, MATCH
1018+from txzookeeper.tests import ZookeeperTestCase, utils
1019 from txzookeeper.client import (
1020 ZookeeperClient, ZOO_OPEN_ACL_UNSAFE, ConnectionTimeoutException,
1021- ConnectionException, ClientEvent)
1022-
1023-from mocker import ANY
1024-from txzookeeper.tests import ZookeeperTestCase, utils
1025+ ConnectionException, NotConnectedException, ClientEvent)
1026
1027 PUBLIC_ACL = ZOO_OPEN_ACL_UNSAFE
1028
1029
1030+def match_deferred(arg):
1031+ return isinstance(arg, Deferred)
1032+
1033+DEFERRED_MATCH = MATCH(match_deferred)
1034+
1035+
1036 class ClientTests(ZookeeperTestCase):
1037
1038 def setUp(self):
1039@@ -47,9 +53,10 @@
1040 if self.client.connected:
1041 utils.deleteTree(handle=self.client.handle)
1042 self.client.close()
1043- del self.client
1044+
1045 if self.client2 and self.client2.connected:
1046 self.client2.close()
1047+
1048 super(ClientTests, self).tearDown()
1049
1050 def test_wb_connect_after_timeout(self):
1051@@ -113,8 +120,10 @@
1052 return d
1053
1054 def test_client_event_repr(self):
1055- event = ClientEvent(4, 'state', 'path')
1056- self.assertEqual(repr(event), "<ClientEvent child at 'path'>")
1057+ event = ClientEvent(zookeeper.SESSION_EVENT,
1058+ zookeeper.EXPIRED_SESSION_STATE, '')
1059+ self.assertEqual(repr(event),
1060+ "<ClientEvent session at '' state: expired>")
1061
1062 def test_client_event_attributes(self):
1063 event = ClientEvent(4, 'state', 'path')
1064@@ -123,6 +132,10 @@
1065 self.assertEqual(event.path, 'path')
1066 self.assertEqual(event, (4, 'state', 'path'))
1067
1068+ def test_client_use_while_disconnected_returns_failure(self):
1069+ return self.assertFailure(
1070+ self.client.exists("/"), NotConnectedException)
1071+
1072 def test_create_ephemeral_node_and_close_connection(self):
1073 """
1074 The client can create transient nodes that are destroyed when the
1075@@ -307,14 +320,17 @@
1076 return self.client.create("/foobar-watched", "rabbit")
1077
1078 def get_node(path):
1079- return self.client.get_and_watch(path)
1080+ data, watch = self.client.get_and_watch(path)
1081+ return data.addCallback(lambda x: (watch,))
1082
1083- def new_connection((data, watch)):
1084+ def new_connection((watch,)):
1085 self.client2 = ZookeeperClient("127.0.0.1:2181")
1086- return self.client2.connect(), watch
1087+ return self.client2.connect().addCallback(
1088+ lambda x, y=None, z=None: (x, watch))
1089
1090 def trigger_watch((client, watch)):
1091 zookeeper.delete(self.client2.handle, "/foobar-watched")
1092+ self.client2.close()
1093 return watch
1094
1095 def verify_watch(event):
1096@@ -385,11 +401,16 @@
1097 """
1098 d = self.client.connect()
1099
1100+ def inject_error(result_code, d, extra_codes=None):
1101+ error = SyntaxError()
1102+ d.errback(error)
1103+ return error
1104+
1105 def check_exists(client):
1106 mock_client = self.mocker.patch(client)
1107 mock_client._check_result(
1108- ANY, True, extra_codes=(zookeeper.NONODE,))
1109- self.mocker.result(SyntaxError())
1110+ ANY, DEFERRED_MATCH, extra_codes=(zookeeper.NONODE,))
1111+ self.mocker.call(inject_error)
1112 self.mocker.replay()
1113 return client.exists("/zebra-moon")
1114
1115@@ -655,20 +676,21 @@
1116 return d
1117
1118 def test_get_children_with_error(self):
1119+ """If the result of an api call is an error, its propgated.
1120+ """
1121 d = self.client.connect()
1122
1123 def get_children(client):
1124- mock_client = self.mocker.patch(self.client)
1125- mock_client._check_result(ANY, True)
1126- self.mocker.result(SyntaxError())
1127- self.mocker.replay()
1128+ # Get the children of a nonexistant node
1129 return client.get_children("/tower")
1130
1131 def verify_failure(failure):
1132- self.assertTrue(isinstance(failure.value, SyntaxError))
1133+ self.assertTrue(isinstance(failure, Failure))
1134+ self.assertTrue(
1135+ isinstance(failure.value, zookeeper.NoNodeException))
1136
1137 d.addCallback(get_children)
1138- d.addErrback(verify_failure)
1139+ d.addBoth(verify_failure)
1140 return d
1141
1142 # seems to be a segfault on this one, must be running latest zk
1143@@ -825,7 +847,7 @@
1144 def verify_node_access(stat):
1145 self.assertEqual(stat['version'], 1)
1146 self.assertEqual(stat['dataLength'], 3)
1147- self.assertTrue(failed) # we should have hit the errback
1148+ self.assertTrue(failed) # we should have hit the errback
1149
1150 d.addCallback(add_auth_one)
1151 d.addCallback(create_node)
1152@@ -905,10 +927,6 @@
1153 acl = dict(scheme="digest", id="a:b", perms=zookeeper.PERM_ALL)
1154
1155 def set_acl(client):
1156- mock_client = self.mocker.patch(client)
1157- mock_client._check_result(ANY, True)
1158- self.mocker.result(zookeeper.NoNodeException())
1159- self.mocker.replay()
1160 return client.set_acl("/zebra-moon22", [acl])
1161
1162 def verify_failure(failure):
1163@@ -946,23 +964,22 @@
1164 """
1165 d = self.client.connect()
1166
1167- def create_node(client):
1168- return client.create("/moose")
1169+ def inject_error(result, d):
1170+ error = zookeeper.ZooKeeperException()
1171+ d.errback(error)
1172+ return error
1173
1174 def get_acl(path):
1175- mock_client = self.mocker.patch(self.client)
1176- mock_client._check_result(ANY, True)
1177- self.mocker.result(zookeeper.ZooKeeperException("foobar"))
1178- self.mocker.replay()
1179- return self.client.get_acl(path)
1180+ # Get the ACL of a nonexistant node
1181+ return self.client.get_acl("/moose")
1182
1183 def verify_failure(failure):
1184+ self.assertTrue(isinstance(failure, Failure))
1185 self.assertTrue(
1186 isinstance(failure.value, zookeeper.ZooKeeperException))
1187
1188- d.addCallback(create_node)
1189 d.addCallback(get_acl)
1190- d.addErrback(verify_failure)
1191+ d.addBoth(verify_failure)
1192 return d
1193
1194 def test_client_id(self):
1195@@ -1006,35 +1023,6 @@
1196 d.addCallback(verify_sync)
1197 return d
1198
1199- def test_sync_error(self):
1200- """
1201- On error the sync callback returns a an exception/failure.
1202- """
1203- d = self.client.connect()
1204-
1205- def create_node(client):
1206- return client.create("/abc")
1207-
1208- def client_sync(path):
1209- mock_client = self.mocker.patch(self.client)
1210- mock_client._check_result(ANY, True)
1211- self.mocker.result(zookeeper.ZooKeeperException("foobar"))
1212- self.mocker.replay()
1213- return self.client.sync(path)
1214-
1215- def verify_failure(failure):
1216- self.assertTrue(
1217- isinstance(failure.value, zookeeper.ZooKeeperException))
1218-
1219- def assert_failed(extra):
1220- self.fail("Should have gone to errback")
1221-
1222- d.addCallback(create_node)
1223- d.addCallback(client_sync)
1224- d.addCallback(assert_failed)
1225- d.addErrback(verify_failure)
1226- return d
1227-
1228 def test_property_servers(self):
1229 """
1230 The servers property of the client, shows which if any servers
1231@@ -1087,13 +1075,67 @@
1232 return client.set_connection_watcher(1)
1233
1234 def verify_invalid(failure):
1235- self.assertEqual(failure.value.args, ("invalid watcher",))
1236+ self.assertEqual(failure.value.args, ("Invalid Watcher 1",))
1237 self.assertTrue(isinstance(failure.value, SyntaxError))
1238
1239 d.addCallback(set_invalid_watcher)
1240 d.addErrback(verify_invalid)
1241 return d
1242
1243+ def xtest_session_expired_event(self):
1244+ """
1245+ A client session can be reattached to in a separate connection,
1246+ if the a session is expired, using the zookeeper connection will
1247+ raise a SessionExpiredException.
1248+ """
1249+ d = self.client.connect()
1250+
1251+ class StopTest(Exception):
1252+ pass
1253+
1254+ def new_connection_same_connection(client):
1255+ self.assertEqual(client.state, zookeeper.CONNECTED_STATE)
1256+ return ZookeeperClient("127.0.0.1:2181").connect(
1257+ client_id=client.client_id).addErrback(
1258+ guard_session_expired, client)
1259+
1260+ def guard_session_expired(failure, client):
1261+ # On occassion we get a session expired event while connecting.
1262+ failure.trap(ConnectionException)
1263+ self.assertEqual(failure.value.state_name, "expired")
1264+ # Stop the test from proceeding
1265+ raise StopTest()
1266+
1267+ def close_new_connection(client):
1268+ # Verify both connections are using same session
1269+ self.assertEqual(self.client.client_id, client.client_id)
1270+ self.assertEqual(client.state, zookeeper.CONNECTED_STATE)
1271+
1272+ # Closing one connection will close the session
1273+ client.close()
1274+
1275+ # Continued use of the other client will get a
1276+ # disconnect exception.
1277+ return self.client.exists("/")
1278+
1279+ def verify_original_closed(failure):
1280+ if not isinstance(failure, Failure):
1281+ self.fail("Test did not raise exception.")
1282+ failure.trap(
1283+ zookeeper.SessionExpiredException,
1284+ zookeeper.ConnectionLossException)
1285+
1286+ #print "client close"
1287+ self.client.close()
1288+ #print "creating new client for teardown"
1289+ return self.client.connect()
1290+
1291+ d.addCallback(new_connection_same_connection)
1292+ d.addCallback(close_new_connection)
1293+ d.addBoth(verify_original_closed)
1294+
1295+ return d
1296+
1297 def test_connect_with_server(self):
1298 """
1299 A client's servers can be specified in the connect method.
1300@@ -1164,14 +1206,14 @@
1301 All of the client apis (with the exception of connect) attempt
1302 to ensure the client is connected before executing an operation.
1303 """
1304- self.assertRaises(
1305- zookeeper.ZooKeeperException, self.client.get_children, "/abc")
1306-
1307- self.assertRaises(
1308- zookeeper.ZooKeeperException, self.client.create, "/abc")
1309-
1310- self.assertRaises(
1311- zookeeper.ZooKeeperException, self.client.set, "/abc", "123")
1312+ self.assertFailure(
1313+ self.client.get_children("/abc"), zookeeper.ZooKeeperException)
1314+
1315+ self.assertFailure(
1316+ self.client.create("/abc"), zookeeper.ZooKeeperException)
1317+
1318+ self.assertFailure(
1319+ self.client.set("/abc", "123"), zookeeper.ZooKeeperException)
1320
1321 def test_connect_multiple_raises(self):
1322 """
1323@@ -1181,8 +1223,9 @@
1324 d = self.client.connect()
1325
1326 def connect_again(client):
1327- self.assertRaises(
1328- zookeeper.ZooKeeperException, client.connect)
1329+ d = client.connect()
1330+ self.failUnlessFailure(d, zookeeper.ZooKeeperException)
1331+ return d
1332
1333 d.addCallback(connect_again)
1334 return d
1335@@ -1199,18 +1242,18 @@
1336 d = self.client.connect()
1337
1338 def verify_failure(client):
1339- self.assertRaises(
1340- zookeeper.ZooKeeperException, client.create, "/abc")
1341+ d = client.create("/abc")
1342+ self.failUnlessFailure(d, zookeeper.ZooKeeperException)
1343
1344 d.addCallback(verify_failure)
1345 return d
1346
1347 def test_connection_watcher(self):
1348 """
1349- A connection watcher can be set that recieves notices on when the
1350- connection state changes. Technically zookeeper would also use this as
1351- a global watcher for node state changes, but zkpython doesn't expose
1352- that api, as its mostly considered legacy.
1353+ A connection watcher can be set that receives notices when
1354+ the connection state changes. Technically zookeeper would also
1355+ use this as a global watcher for node watches, but zkpython
1356+ doesn't expose that api, as it's mostly considered legacy.
1357
1358 its out of scope to simulate a connection level event within unit tests
1359 such as the server restarting.
1360@@ -1243,3 +1286,13 @@
1361 If the client is not connected, closing returns None.
1362 """
1363 self.assertEqual(self.client.close(), None)
1364+
1365+ def test_invalid_connection_error_callback(self):
1366+ self.assertRaises(TypeError,
1367+ self.client.set_connection_error_callback,
1368+ None)
1369+
1370+ def test_invalid_session_callback(self):
1371+ self.assertRaises(TypeError,
1372+ self.client.set_session_callback,
1373+ None)
1374
1375=== modified file 'txzookeeper/tests/test_node.py'
1376--- txzookeeper/tests/test_node.py 2011-06-17 23:52:34 +0000
1377+++ txzookeeper/tests/test_node.py 2011-07-05 13:29:55 +0000
1378@@ -59,7 +59,7 @@
1379 def _make_digest_identity(self, credentials):
1380 user, password = credentials.split(":")
1381 digest = hashlib.new("sha1", credentials).digest()
1382- return "%s:%s"%(user, base64.b64encode(digest))
1383+ return "%s:%s" % (user, base64.b64encode(digest))
1384
1385 def test_node_name_and_path(self):
1386 """
1387
1388=== modified file 'txzookeeper/tests/test_queue.py'
1389--- txzookeeper/tests/test_queue.py 2011-06-17 23:52:34 +0000
1390+++ txzookeeper/tests/test_queue.py 2011-07-05 13:29:55 +0000
1391@@ -175,7 +175,7 @@
1392 watch = Deferred()
1393 self.mocker.result((succeed(["entry-000000"]), watch))
1394
1395- item_path = "%s/%s"%(path, "entry-000000")
1396+ item_path = "%s/%s" % (path, "entry-000000")
1397 mock_client.get(item_path)
1398 self.mocker.result(fail(SyntaxError("x")))
1399 self.mocker.replay()
1400@@ -230,13 +230,13 @@
1401 producer_done = Deferred()
1402
1403 def iteration(i):
1404- if len(items) == (item_count-1):
1405+ if len(items) == (item_count - 1):
1406 return producer_done.callback(None)
1407 items.append(i)
1408 queue.put(str(i))
1409
1410 for i in range(item_count):
1411- reactor.callLater(i*0.05, iteration, i)
1412+ reactor.callLater(i * 0.05, iteration, i)
1413 yield producer_done
1414 returnValue(items)
1415
1416@@ -284,7 +284,7 @@
1417 def producer(start, offset):
1418 client = yield self.open_client()
1419 q = self.queue_factory(path, client)
1420- for i in range(start, start+offset):
1421+ for i in range(start, start + offset):
1422 yield q.put(str(i))
1423 produce_results.append(str(i))
1424
1425@@ -311,7 +311,7 @@
1426 yield DeferredList(
1427 [consumer(8), consumer(8), consumer(4)])
1428
1429- err = set(produce_results)-set(consume_results)
1430+ err = set(produce_results) - set(consume_results)
1431 self.assertFalse(err)
1432
1433 self.assertEqual(len(consume_results), len(produce_results))
1434
1435=== modified file 'txzookeeper/tests/test_security.py'
1436--- txzookeeper/tests/test_security.py 2011-06-17 23:52:34 +0000
1437+++ txzookeeper/tests/test_security.py 2011-07-05 13:29:55 +0000
1438@@ -73,9 +73,9 @@
1439 def connect_users(self, *users):
1440 clients = []
1441 for name in users:
1442- ident_user = getattr(self, "ident_%s"%(name), None)
1443+ ident_user = getattr(self, "ident_%s" % (name), None)
1444 if ident_user is None:
1445- raise AttributeError("Invalid User %s"%(name))
1446+ raise AttributeError("Invalid User %s" % (name))
1447 client = ZookeeperClient("127.0.0.1:2181", 3000)
1448 clients.append(client)
1449 yield self.open_and_authenticate(client, ident_user)
1450
1451=== added file 'txzookeeper/tests/test_session.py'
1452--- txzookeeper/tests/test_session.py 1970-01-01 00:00:00 +0000
1453+++ txzookeeper/tests/test_session.py 2011-07-05 13:29:55 +0000
1454@@ -0,0 +1,245 @@
1455+
1456+import atexit
1457+import os
1458+
1459+import zookeeper
1460+
1461+from twisted.internet import reactor
1462+from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
1463+
1464+from txzookeeper import ZookeeperClient
1465+from txzookeeper.client import NotConnectedException, ConnectionException
1466+
1467+from txzookeeper.tests.common import ZookeeperCluster
1468+from txzookeeper.tests import ZookeeperTestCase
1469+
1470+
1471+ZK_HOME = os.environ.get("ZOOKEEPER_PATH")
1472+assert ZK_HOME, "ZOOKEEPER_PATH environment variable must be defined"
1473+
1474+CLUSTER = ZookeeperCluster(ZK_HOME)
1475+atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
1476+
1477+
1478+class ClientSessionTests(ZookeeperTestCase):
1479+
1480+ def setUp(self):
1481+ super(ClientSessionTests, self).setUp()
1482+ self.cluster.start()
1483+ self.client = None
1484+ self.client2 = None
1485+ zookeeper.deterministic_conn_order(True)
1486+ zookeeper.set_debug_level(0)
1487+
1488+ def sleep(self, delay):
1489+ """Non-blocking sleep."""
1490+ deferred = Deferred()
1491+ reactor.callLater(delay, deferred.callback, None)
1492+ return deferred
1493+
1494+ @property
1495+ def cluster(self):
1496+ return CLUSTER
1497+
1498+ def tearDown(self):
1499+ super(ClientSessionTests, self).tearDown()
1500+ self.cluster.reset()
1501+
1502+ @inlineCallbacks
1503+ def test_client_session_migration(self):
1504+ """A client will automatically rotate servers to ensure a connection.
1505+
1506+ A client connected to multiple servers will transparently
1507+ migrate amongst them, as individual servers can no longer be
1508+ reached. A client's session will be maintained.
1509+ """
1510+ # Connect to the Zookeeper Cluster
1511+ servers = ",".join([s.address for s in self.cluster])
1512+ self.client = ZookeeperClient(servers)
1513+ yield self.client.connect()
1514+ yield self.client.create("/hello", flags=zookeeper.EPHEMERAL)
1515+
1516+ # Shutdown the server the client is connected to
1517+ self.cluster[0].stop()
1518+
1519+ # Wait for the shutdown and cycle, if we don't wait we'll
1520+ # get a zookeeper connectionloss exception on occasion.
1521+ yield self.sleep(0.1)
1522+
1523+ self.assertTrue(self.client.connected)
1524+ exists = yield self.client.exists("/hello")
1525+ self.assertTrue(exists)
1526+
1527+ @inlineCallbacks
1528+ def test_client_watch_migration(self):
1529+ """On server rotation, extant watches are still active.
1530+
1531+ A client connected to multiple servers will transparently
1532+ migrate amongst them, as individual servers can no longer be
1533+ reached. Watch deferreds issued from the same client instance will
1534+ continue to function as the session is maintained.
1535+ """
1536+ session_events = []
1537+
1538+ def session_event_callback(connection, e):
1539+ session_events.append(e)
1540+
1541+ # Connect to the Zookeeper Cluster
1542+ servers = ",".join([s.address for s in self.cluster])
1543+ self.client = ZookeeperClient(servers)
1544+ self.client.set_session_callback(session_event_callback)
1545+ yield self.client.connect()
1546+
1547+ # Setup a watch
1548+ yield self.client.create("/hello")
1549+ exists_d, watch_d = self.client.exists_and_watch("/hello")
1550+ yield exists_d
1551+
1552+ # Shutdown the server the client is connected to
1553+ self.cluster[0].stop()
1554+
1555+ # Wait for the shutdown and cycle, if we don't wait we'll
1556+ # occasionally get a zookeeper connectionloss exception.
1557+ yield self.sleep(0.1)
1558+
1559+ # The session events that would have been ignored are sent
1560+ # to the session event callback.
1561+ self.assertTrue(session_events)
1562+ self.assertTrue(self.client.connected)
1563+
1564+ # If we delete the node, we'll see the watch fire.
1565+ yield self.client.delete("/hello")
1566+ event = yield watch_d
1567+ self.assertEqual(event.type_name, "deleted")
1568+ self.assertEqual(event.path, "/hello")
1569+
1570+ @inlineCallbacks
1571+ def test_connection_error_handler(self):
1572+ """A callback can be specified for connection errors.
1573+
1574+ We can specify a callback for connection errors that can
1575+ recover a disconnected client by re-establishing the connection.
1576+ """
1577+ @inlineCallbacks
1578+ def connection_error_handler(connection, error):
1579+ # On loss of the connection, reconnect the client w/ same session.
1580+ yield connection.connect(
1581+ self.cluster[1].address, client_id=connection.client_id)
1582+ returnValue(23)
1583+
1584+ self.client = ZookeeperClient(self.cluster[0].address)
1585+ self.client.set_connection_error_callback(connection_error_handler)
1586+ yield self.client.connect()
1587+
1588+ yield self.client.create("/hello")
1589+ exists_d, watch_d = self.client.exists_and_watch("/hello")
1590+ yield exists_d
1591+
1592+ # Shutdown the server the client is connected to
1593+ self.cluster[0].stop()
1594+ yield self.sleep(0.1)
1595+
1596+ # Results in a connection loss exception, invoking the error handler.
1597+ result = yield self.client.exists("/hello")
1598+
1599+ # The result of the error handler is returned to the api
1600+ self.assertEqual(result, 23)
1601+
1602+ exists = yield self.client.exists("/hello")
1603+ self.assertTrue(exists)
1604+
1605+ @inlineCallbacks
1606+ def test_client_session_expiration_event(self):
1607+ """A client which receives a session expiration event.
1608+ """
1609+ session_events = []
1610+ events_received = Deferred()
1611+
1612+ def session_event_callback(connection, e):
1613+ session_events.append(e)
1614+ if len(session_events) == 4:
1615+ events_received.callback(True)
1616+
1617+ # Connect to a node in the cluster and establish a watch
1618+ self.client = ZookeeperClient(self.cluster[0].address)
1619+ self.client.set_session_callback(session_event_callback)
1620+ yield self.client.connect()
1621+
1622+ child_d, watch_d = self.client.get_children_and_watch("/")
1623+ yield child_d
1624+
1625+ # Connect a second client to the same session.
1626+ self.client2 = ZookeeperClient(self.cluster[0].address)
1627+ yield self.client2.connect(client_id=self.client.client_id)
1628+
1629+ # Close the new client and wait for the event propagation
1630+ yield self.client2.close()
1631+
1632+ # It can take some time for this to propagate
1633+ yield events_received
1634+ self.assertEqual(len(session_events), 4)
1635+ self.assertEqual(session_events[-1].state_name, "expired")
1636+
1637+ # The connection is dead without reconnecting.
1638+ yield self.assertFailure(
1639+ self.client.exists("/"),
1640+ NotConnectedException, ConnectionException)
1641+
1642+ self.assertTrue(self.client.unrecoverable)
1643+
1644+ # If a reconnect attempt is made with a dead session id
1645+ #yield self.client.connect(client_id=self.client.client_id)
1646+
1647+ test_client_session_expiration_event.timeout = 10
1648+
1649+ @inlineCallbacks
1650+ def test_client_reconnect_session_on_different_server(self):
1651+ """On connection failure, an application can choose to use a
1652+ new connection with which to reconnect to a different member
1653+ of the zookeeper cluster, reacquiring the extant session.
1654+
1655+ A large obvious caveat to using a new client instance rather
1656+ than reconnecting the existing client is that even though the
1657+ session has outstanding watches, the watch callbacks/deferreds
1658+ won't be active unless the client instance used to create them
1659+ is connected.
1660+ """
1661+ session_events = []
1662+
1663+ def session_event_callback(connection, e):
1664+ session_events.append(e)
1665+
1666+ # Connect to a node in the cluster and establish a watch
1667+ self.client = ZookeeperClient(self.cluster[2].address)
1668+ self.client.set_session_callback(session_event_callback)
1669+ yield self.client.connect()
1670+
1671+ yield self.client.create("/hello", flags=zookeeper.EPHEMERAL)
1672+ exists_d, watch_d = self.client.exists_and_watch("/hello")
1673+ yield exists_d
1674+
1675+ # Shutdown the server the client is connected to
1676+ self.cluster[2].stop()
1677+ yield self.sleep(0.1)
1678+
1679+ # Verify we got a session event regarding the down server
1680+ self.assertTrue(session_events)
1681+
1682+ # Open up a new connection to a different server with same session
1683+ self.client2 = ZookeeperClient(self.cluster[0].address)
1684+ yield self.client2.connect(client_id=self.client.client_id)
1685+
1686+ # Close the old disconnected client
1687+ self.client.close()
1688+
1689+ # Verify the ephemeral still exists
1690+ exists = yield self.client2.exists("/hello")
1691+ self.assertTrue(exists)
1692+
1693+ # Destroy the session and reconnect
1694+ self.client2.close()
1695+ yield self.client.connect(self.cluster[0].address)
1696+
1697+ # Ephemeral is destroyed when the session is closed.
1698+ exists = yield self.client.exists("/hello")
1699+ self.assertFalse(exists)
1700
1701=== modified file 'txzookeeper/tests/test_utils.py'
1702--- txzookeeper/tests/test_utils.py 2011-06-17 23:52:34 +0000
1703+++ txzookeeper/tests/test_utils.py 2011-07-05 13:29:55 +0000
1704@@ -34,7 +34,7 @@
1705 def update_function_increment(self, content, stat):
1706 if not content:
1707 return str(0)
1708- return str(int(content)+1)
1709+ return str(int(content) + 1)
1710
1711 def setUp(self):
1712 super(RetryChangeTest, self).setUp()
1713
1714=== modified file 'txzookeeper/tests/utils.py'
1715--- txzookeeper/tests/utils.py 2011-06-17 23:52:34 +0000
1716+++ txzookeeper/tests/utils.py 2011-07-05 13:29:55 +0000
1717@@ -25,9 +25,9 @@
1718 Destroy all the nodes in zookeeper (typically under a chroot for testing)
1719 """
1720 for child in zookeeper.get_children(handle, path):
1721- if child == "zookeeper": # skip the metadata node
1722+ if child == "zookeeper": # skip the metadata node
1723 continue
1724- child_path = "/"+("%s/%s"%(path, child)).strip("/")
1725+ child_path = "/" + ("%s/%s" % (path, child)).strip("/")
1726 try:
1727 deleteTree(child_path, handle)
1728 zookeeper.delete(handle, child_path, -1)
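
For reference, here is a minimal sketch (not part of this branch) of how an
application might wire up the session and connection-error callbacks exercised
by the tests above. The server addresses and the reconnect target are
hypothetical; the callback signatures, and the fact that the error handler's
return value is handed back to the failing API call, follow test_session.py.

from twisted.internet.defer import inlineCallbacks, returnValue
from txzookeeper import ZookeeperClient


def on_session_event(connection, event):
    # Session events (connecting, connected, expired, ...) are delivered
    # here instead of to outstanding watch deferreds.
    pass


@inlineCallbacks
def on_connection_error(connection, error):
    # Reconnect to another server, keeping the same session. Whatever this
    # callback returns is returned to the API call that hit the error.
    yield connection.connect("127.0.0.1:2182", client_id=connection.client_id)
    returnValue(None)


@inlineCallbacks
def example():
    client = ZookeeperClient("127.0.0.1:2181,127.0.0.1:2182")
    client.set_session_callback(on_session_event)
    client.set_connection_error_callback(on_connection_error)
    yield client.connect()
    exists = yield client.exists("/")
    returnValue(exists)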
