Merge lp:~free.ekanayaka/landscape-client/reconnect into lp:~landscape/landscape-client/amp-trunk
Status: | Merged |
---|---|
Merge reported by: | Free Ekanayaka |
Merged at revision: | not available |
Proposed branch: | lp:~free.ekanayaka/landscape-client/reconnect |
Merge into: | lp:~landscape/landscape-client/amp-trunk |
Diff against target: | 688 lines (+486/-46), 3 files modified: landscape/__init__.py (+1/-1), landscape/lib/amp.py (+179/-25), landscape/lib/tests/test_amp.py (+306/-20) |
To merge this branch: | bzr merge lp:~free.ekanayaka/landscape-client/reconnect |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Free Ekanayaka (community) | Approve | ||
Thomas Herve (community) | Needs Fixing | ||
Jamu Kakar (community) | Approve | ||
Review via email: mp+18257@code.launchpad.net |
Commit message
Description of the change
- 221. By Free Ekanayaka
-
Pass a callback/errback pair to add_notifier
Thomas Herve (therve) wrote:
[1] The tests in MethodCallClien
[2]
+ ports.append(
+
+ def assert_
+ self.assertTrue
+ client_
+ protocol.
+ ports[0]
You shouldn't rely on the test to succeed to close the port connection, otherwise it's going to give you spurious errors. You should use addCleanup just after the listenUNIX.
+1 with most of the callLater removed from tests.
- 222. By Free Ekanayaka
-
Make MethodCallClientFactoryTest tests time-independent (therve [1])
- 223. By Free Ekanayaka
-
Fix typos (Jamu [2] to [6])
Free Ekanayaka (free.ekanayaka) wrote:
Jamu, Thomas thanks for the reviews!
@Jamu: Hehe, I didn't hack on Erlang, but I looked at some code recently which was using that pattern and I found it nice, because it feels like a good hint for the reader.
@Therve: I fixed all the tests in MethodCallClien
- 224. By Free Ekanayaka
-
Reactor-less testing for MethodCallClientFactory
- 225. By Free Ekanayaka
-
Fix testing on hardy
Free Ekanayaka (free.ekanayaka) wrote:
I discussed this a little bit with Thomas on IRC and for now we're going to leave the tests time-dependent. Even though this makes them breakable in principle, the rationale is that making them time-independent would require some deeper mocking, making them less "real-world" and meaningful. In practice I don't expect them to break easily; however, if we start to see problems in the buildbots we can think about changing them. Thomas +1'ed for merging on IRC.
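For readers unfamiliar with the trade-off discussed here, the sketch below shows what a time-independent test of the notifier mechanism might look like. `FakeClock` is a hypothetical, much simplified stand-in for `twisted.internet.task.Clock`, and `NotifyingFactory` is a hypothetical reduction of the notifier logic in the diff, not the real `MethodCallClientFactory`. Notifications are scheduled with `callLater(0, ...)` and only fire once the test advances the fake clock by hand.

```python
class FakeClock(object):
    """Hypothetical minimal clock: scheduled calls run only on advance()."""

    def __init__(self):
        self.now = 0.0
        self._calls = []

    def callLater(self, delay, function, *args, **kwargs):
        self._calls.append((self.now + delay, function, args, kwargs))

    def advance(self, seconds):
        self.now += seconds
        due = [call for call in self._calls if call[0] <= self.now]
        self._calls = [call for call in self._calls if call[0] > self.now]
        # Run the due calls in the order of their scheduled time.
        for _, function, args, kwargs in sorted(due, key=lambda c: c[0]):
            function(*args, **kwargs)


class NotifyingFactory(object):
    """Hypothetical reduction of the add_notifier/notify_success logic."""

    def __init__(self, reactor):
        self.reactor = reactor
        self._notifiers = []

    def add_notifier(self, callback, errback=None):
        self._notifiers.append((callback, errback))

    def notify_success(self, *args, **kwargs):
        for callback, _ in self._notifiers:
            self.reactor.callLater(0, callback, *args, **kwargs)


def demo():
    """Return what the notifier saw before and after advancing the clock."""
    clock = FakeClock()
    factory = NotifyingFactory(clock)
    seen = []
    factory.add_notifier(seen.append)
    factory.notify_success("protocol")
    before = list(seen)  # nothing has fired yet
    clock.advance(0)     # deterministically fire the scheduled call
    return before, seen
```

No real time passes in `demo()`, which is what makes such a test deterministic. The cost is that the socket and reactor are no longer exercised, which is the "less real-world" concern raised in the comment above.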
Preview Diff
1 | === modified file 'landscape/__init__.py' |
2 | --- landscape/__init__.py 2009-10-27 12:45:38 +0000 |
3 | +++ landscape/__init__.py 2010-02-12 16:06:15 +0000 |
4 | @@ -1,4 +1,4 @@ |
5 | -DEBIAN_REVISION = "" |
6 | +DEBIAN_REVISION = "-0ubuntu0.9.10.0" |
7 | UPSTREAM_VERSION = "1.4.0" |
8 | VERSION = "%s%s" % (UPSTREAM_VERSION, DEBIAN_REVISION) |
9 | API = "3.2" |
10 | |
11 | === modified file 'landscape/lib/amp.py' |
12 | --- landscape/lib/amp.py 2010-01-25 08:28:36 +0000 |
13 | +++ landscape/lib/amp.py 2010-02-12 16:06:15 +0000 |
14 | @@ -2,7 +2,7 @@ |
15 | |
16 | from uuid import uuid4 |
17 | from twisted.internet.defer import Deferred, maybeDeferred |
18 | -from twisted.internet.protocol import ServerFactory, ClientFactory |
19 | +from twisted.internet.protocol import ServerFactory, ReconnectingClientFactory |
20 | from twisted.protocols.amp import Argument, String, Command, AMP |
21 | from twisted.python.failure import Failure |
22 | |
23 | @@ -212,25 +212,64 @@ |
24 | self.object = object |
25 | |
26 | |
27 | -class MethodCallClientFactory(ClientFactory): |
28 | - """Factory for building L{AMP} connections to L{MethodCall} servers.""" |
29 | +class MethodCallClientFactory(ReconnectingClientFactory): |
30 | + """Factory for building L{AMP} connections to L{MethodCall} servers. |
31 | + |
32 | + If the connection fails or is lost the factory will keep retrying to |
33 | + establish it. |
34 | + |
35 | + @cvar protocol: The factory used to build protocol instances. |
36 | + @cvar factor: The time factor by which the delay between two subsequent |
37 | + connection retries will increase. |
38 | + """ |
39 | |
40 | protocol = MethodCallClientProtocol |
41 | + factor = 1.6180339887498948 |
42 | |
43 | - def __init__(self, reactor, notifier): |
44 | + def __init__(self, reactor): |
45 | """ |
46 | - @param reactor: The reactor used by the created protocols to schedule |
47 | - notifications and timeouts. |
48 | - @param notifier: A function that will be called when the connection is |
49 | - established. It will be passed the protocol instance as argument. |
50 | + @param reactor: The reactor used by the created protocols |
51 | + to schedule notifications and timeouts. |
52 | """ |
53 | self.reactor = reactor |
54 | - self._notifier = notifier |
55 | + self.clock = self.reactor |
56 | + self._notifiers = [] |
57 | + |
58 | + def add_notifier(self, callback, errback=None): |
59 | + """Call the given function on connection, reconnection or give up. |
60 | + |
61 | + @param notifier: A function that will be called when the factory builds |
62 | + a new connected protocol or gives up connecting. It will be passed |
63 | + the new protocol instance as argument, or the connectionf failure. |
64 | + """ |
65 | + self._notifiers.append((callback, errback)) |
66 | + |
67 | + def remove_notifier(self, callback, errback=None): |
68 | + """Remove a notifier.""" |
69 | + self._notifiers.remove((callback, errback)) |
70 | + |
71 | + def notify_success(self, *args, **kwargs): |
72 | + """Notify all registered notifier callbacks.""" |
73 | + for callback, _ in self._notifiers: |
74 | + self.reactor.callLater(0, callback, *args, **kwargs) |
75 | + |
76 | + def notify_failure(self, failure): |
77 | + """Notify all registered notifier errbacks.""" |
78 | + for _, errback in self._notifiers: |
79 | + if errback is not None: |
80 | + self.reactor.callLater(0, errback, failure) |
81 | + |
82 | + def clientConnectionFailed(self, connector, reason): |
83 | + ReconnectingClientFactory.clientConnectionFailed(self, connector, |
84 | + reason) |
85 | + if self.maxRetries is not None and (self.retries > self.maxRetries): |
86 | + self.notify_failure(reason) # Give up |
87 | |
88 | def buildProtocol(self, addr): |
89 | + self.resetDelay() |
90 | protocol = self.protocol() |
91 | protocol.factory = self |
92 | - self.reactor.callLater(0, self._notifier, protocol) |
93 | + self.notify_success(protocol) |
94 | return protocol |
95 | |
96 | |
97 | @@ -242,12 +281,24 @@ |
98 | the remote object exposed by the peer. |
99 | """ |
100 | |
101 | - def __init__(self, protocol): |
102 | + def __init__(self, protocol, retry_on_reconnect=False, timeout=None): |
103 | """ |
104 | @param protocol: A reference to a connected L{AMP} protocol instance, |
105 | which will be used to send L{MethodCall} commands. |
106 | + @param retry_on_reconnect: If C{True}, this L{RemoteObject} will retry |
107 | + to perform again requests that failed due to a lost connection, as |
108 | + soon as a new connection is available. |
109 | + @param timeout: A timeout for failed requests, if the L{RemoteObject} |
110 | + can't perform them again successfully within this number of |
111 | + seconds, they will errback with a L{MethodCallError}. |
112 | """ |
113 | self._protocol = protocol |
114 | + self._factory = protocol.factory |
115 | + self._reactor = protocol.factory.reactor |
116 | + self._retry_on_reconnect = retry_on_reconnect |
117 | + self._timeout = timeout |
118 | + self._pending_requests = {} |
119 | + self._factory.add_notifier(self._handle_reconnect) |
120 | |
121 | def __getattr__(self, method): |
122 | """Return a function sending a L{MethodCall} for the given C{method}. |
123 | @@ -262,10 +313,97 @@ |
124 | result = self._protocol.send_method_call(method=method, |
125 | args=args, |
126 | kwargs=kwargs) |
127 | - return result.addCallback(lambda response: response["result"]) |
128 | + deferred = Deferred() |
129 | + result.addCallback(self._handle_response, deferred) |
130 | + result.addErrback(self._handle_failure, method, args, kwargs, |
131 | + deferred) |
132 | + return deferred |
133 | |
134 | return send_method_call |
135 | |
136 | + def _handle_response(self, response, deferred, call=None): |
137 | + """Handles a successful L{MethodCall} response. |
138 | + |
139 | + @param response: The L{MethodCall} response. |
140 | + @param deferred: The deferred that was returned to the caller. |
141 | + @param call: If not C{None}, the scheduled timeout call associated with |
142 | + the given deferred. |
143 | + """ |
144 | + result = response["result"] |
145 | + if call is not None: |
146 | + call.cancel() # This is a successful retry, cancel the timeout. |
147 | + deferred.callback(result) |
148 | + |
149 | + def _handle_failure(self, failure, method, args, kwargs, deferred, |
150 | + call=None): |
151 | + """Called when a L{MethodCall} command fails. |
152 | + |
153 | + If a failure is due to a connection error and if C{retry_on_reconnect} |
154 | + is C{True}, we will try to perform the requested L{MethodCall} again |
155 | + as soon as a new connection becomes available, giving up after the |
156 | + specified C{timeout}, if any. |
157 | + |
158 | + @param failure: The L{Failure} raised by the requested L{MethodCall}. |
159 | + @param name: The method name associated with the failed L{MethodCall}. |
160 | + @param args: The positional arguments of the failed L{MethodCall}. |
161 | + @param kwargs: The keyword arguments of the failed L{MethodCall}. |
162 | + @param deferred: The deferred that was returned to the caller. |
163 | + @param call: If not C{None}, the scheduled timeout call associated with |
164 | + the given deferred. |
165 | + """ |
166 | + is_method_call_error = failure.type is MethodCallError |
167 | + dont_retry = self._retry_on_reconnect == False |
168 | + |
169 | + if is_method_call_error or dont_retry: |
170 | + # This means either that the connection is working, and a |
171 | + # MethodCall protocol error occured, or that we gave up |
172 | + # trying and raised a timeout. In any case just propagate |
173 | + # the error. |
174 | + if deferred in self._pending_requests: |
175 | + self._pending_requests.pop(deferred) |
176 | + if call: |
177 | + call.cancel() |
178 | + deferred.errback(failure) |
179 | + return |
180 | + |
181 | + if self._timeout and call is None: |
182 | + # This is the first failure for this request, let's schedule a |
183 | + # timeout call. |
184 | + timeout = Failure(MethodCallError("timeout")) |
185 | + call = self._reactor.callLater(self._timeout, |
186 | + self._handle_failure, |
187 | + timeout, method, args, |
188 | + kwargs, deferred=deferred) |
189 | + |
190 | + self._pending_requests[deferred] = (method, args, kwargs, call) |
191 | + |
192 | + def _handle_reconnect(self, protocol): |
193 | + """Handles a reconnection. |
194 | + |
195 | + @param protocol: The newly connected protocol instance. |
196 | + """ |
197 | + self._protocol = protocol |
198 | + if self._retry_on_reconnect: |
199 | + self._retry() |
200 | + |
201 | + def _retry(self): |
202 | + """Try to perform again requests that failed.""" |
203 | + |
204 | + # We need to copy the requests list before iterating over it, because |
205 | + # if we are actually still disconnected, callRemote will return a |
206 | + # failed deferred and the _handle_failure errback will be executed |
207 | + # synchronously during the loop, modifing the requests list itself. |
208 | + requests = self._pending_requests.copy() |
209 | + self._pending_requests.clear() |
210 | + |
211 | + while requests: |
212 | + deferred, (method, args, kwargs, call) = requests.popitem() |
213 | + result = self._protocol.send_method_call(method, args, kwargs) |
214 | + result.addCallback(self._handle_response, |
215 | + deferred=deferred, call=call) |
216 | + result.addErrback(self._handle_failure, method, args, kwargs, |
217 | + deferred=deferred, call=call) |
218 | + |
219 | |
220 | class RemoteObjectCreator(object): |
221 | """Connect to remote objects exposed by a L{MethodCallProtocol}.""" |
222 | @@ -273,32 +411,48 @@ |
223 | factory = MethodCallClientFactory |
224 | remote = RemoteObject |
225 | |
226 | - def __init__(self, reactor, socket_path): |
227 | + def __init__(self, reactor, socket_path, *args, **kwargs): |
228 | """ |
229 | @param reactor: A reactor able to connect to Unix sockets. |
230 | @param socket: The path to the socket we want to connect to. |
231 | + @param args: Arguments to passed to the created L{RemoteObject}. |
232 | + @param kwargs: Keyword arguments for the created L{RemoteObject}. |
233 | """ |
234 | self._socket_path = socket_path |
235 | self._reactor = reactor |
236 | - self._remote = None |
237 | + self._args = args |
238 | + self._kwargs = kwargs |
239 | |
240 | - def connect(self): |
241 | + def connect(self, max_retries=None): |
242 | """Connect to a remote object exposed by a L{MethodCallProtocol}. |
243 | |
244 | This method will connect to the socket provided in the constructor |
245 | and return a L{Deferred} resulting in a connected L{RemoteObject}. |
246 | + |
247 | + @param max_retries: If not C{None} give up try to connect after this |
248 | + amount of times. |
249 | """ |
250 | - deferred = Deferred() |
251 | - factory = self.factory(self._reactor, deferred.callback) |
252 | - self._reactor.connectUNIX(self._socket_path, factory) |
253 | - deferred.addCallback(self._connection_made) |
254 | - return deferred |
255 | - |
256 | - def _connection_made(self, protocol): |
257 | - """Called when the connection has been established""" |
258 | - self._remote = self.remote(protocol) |
259 | - return self._remote |
260 | + self._connected = Deferred() |
261 | + self._factory = self.factory(self._reactor) |
262 | + self._factory.maxRetries = max_retries |
263 | + self._factory.add_notifier(self._success, self._failure) |
264 | + self._reactor.connectUNIX(self._socket_path, self._factory) |
265 | + return self._connected |
266 | + |
267 | + def _success(self, result): |
268 | + """Called when the first connection has been established""" |
269 | + |
270 | + # We did our job, remove our own notifier and let the remote object |
271 | + # handle reconnections. |
272 | + self._factory.remove_notifier(self._success, self._failure) |
273 | + self._remote = self.remote(result, *self._args, **self._kwargs) |
274 | + self._connected.callback(self._remote) |
275 | + |
276 | + def _failure(self, failure): |
277 | + """Called when the first connection has failed""" |
278 | + self._connected.errback(failure) |
279 | |
280 | def disconnect(self): |
281 | """Disconnect the L{RemoteObject} that we have created.""" |
282 | + self._factory.stopTrying() |
283 | self._remote._protocol.transport.loseConnection() |
284 | |
285 | === modified file 'landscape/lib/tests/test_amp.py' |
286 | --- landscape/lib/tests/test_amp.py 2010-01-20 09:20:46 +0000 |
287 | +++ landscape/lib/tests/test_amp.py 2010-02-12 16:06:15 +0000 |
288 | @@ -1,12 +1,15 @@ |
289 | from twisted.internet import reactor |
290 | -from twisted.internet.defer import Deferred |
291 | +from twisted.internet.defer import Deferred, DeferredList |
292 | from twisted.internet.protocol import ClientCreator |
293 | +from twisted.internet.error import ConnectionDone, ConnectError |
294 | +from twisted.internet.task import Clock |
295 | |
296 | from landscape.lib.amp import ( |
297 | MethodCallError, MethodCallServerProtocol, |
298 | MethodCallClientProtocol, MethodCallServerFactory, |
299 | MethodCallClientFactory, RemoteObject, RemoteObjectCreator) |
300 | from landscape.tests.helpers import LandscapeTest |
301 | +from landscape.tests.mocker import KWARGS |
302 | |
303 | |
304 | class WordsException(Exception): |
305 | @@ -81,7 +84,7 @@ |
306 | return deferred |
307 | |
308 | |
309 | -class WordsProtocol(MethodCallServerProtocol): |
310 | +class WordsServerProtocol(MethodCallServerProtocol): |
311 | |
312 | methods = ["empty", |
313 | "motd", |
314 | @@ -96,19 +99,39 @@ |
315 | "google"] |
316 | |
317 | |
318 | +class WordsClientProtocol(MethodCallClientProtocol): |
319 | + |
320 | + timeout = 0.1 |
321 | + |
322 | + |
323 | +class WordsServerFactory(MethodCallServerFactory): |
324 | + |
325 | + protocol = WordsServerProtocol |
326 | + |
327 | + |
328 | +class WordsClientFactory(MethodCallClientFactory): |
329 | + |
330 | + protocol = WordsClientProtocol |
331 | + factor = 0.19 |
332 | + |
333 | + |
334 | +class RemoteWordsCreator(RemoteObjectCreator): |
335 | + |
336 | + factory = WordsClientFactory |
337 | + |
338 | + |
339 | class MethodCallProtocolTest(LandscapeTest): |
340 | |
341 | def setUp(self): |
342 | super(MethodCallProtocolTest, self).setUp() |
343 | socket = self.mktemp() |
344 | - factory = MethodCallServerFactory(Words()) |
345 | - factory.protocol = WordsProtocol |
346 | + factory = WordsServerFactory(Words()) |
347 | self.port = reactor.listenUNIX(socket, factory) |
348 | |
349 | def set_protocol(protocol): |
350 | self.protocol = protocol |
351 | |
352 | - connector = ClientCreator(reactor, MethodCallClientProtocol) |
353 | + connector = ClientCreator(reactor, WordsClientProtocol) |
354 | connected = connector.connectUNIX(socket) |
355 | return connected.addCallback(set_protocol) |
356 | |
357 | @@ -240,17 +263,18 @@ |
358 | def setUp(self): |
359 | super(RemoteObjectTest, self).setUp() |
360 | socket = self.mktemp() |
361 | - server_factory = MethodCallServerFactory(Words()) |
362 | - server_factory.protocol = WordsProtocol |
363 | + server_factory = WordsServerFactory(Words()) |
364 | self.port = reactor.listenUNIX(socket, server_factory) |
365 | |
366 | def set_remote(protocol): |
367 | self.protocol = protocol |
368 | self.words = RemoteObject(protocol) |
369 | + client_factory.stopTrying() |
370 | |
371 | connected = Deferred() |
372 | connected.addCallback(set_remote) |
373 | - client_factory = MethodCallClientFactory(reactor, connected.callback) |
374 | + client_factory = WordsClientFactory(reactor) |
375 | + client_factory.add_notifier(connected.callback) |
376 | reactor.connectUNIX(socket, client_factory) |
377 | return connected |
378 | |
379 | @@ -385,7 +409,6 @@ |
380 | If the peer protocol doesn't send a response for a deferred within |
381 | the given timeout, the method call fails. |
382 | """ |
383 | - self.protocol.timeout = 0.1 |
384 | result = self.words.google("Long query") |
385 | return self.assertFailure(result, MethodCallError) |
386 | |
387 | @@ -407,19 +430,83 @@ |
388 | return result.addCallback(assert_late_response_is_handled) |
389 | |
390 | |
391 | +class MethodCallClientFactoryTest(LandscapeTest): |
392 | + |
393 | + def setUp(self): |
394 | + super(MethodCallClientFactoryTest, self).setUp() |
395 | + self.clock = Clock() |
396 | + self.factory = WordsClientFactory(self.clock) |
397 | + |
398 | + def test_add_notifier(self): |
399 | + """ |
400 | + The L{MethodCallClientFactory.add_notifier} method can be used to |
401 | + add a callback function to be called when a connection is made and |
402 | + a new protocol instance built. |
403 | + """ |
404 | + protocol = self.factory.protocol() |
405 | + self.factory.protocol = lambda: protocol |
406 | + callback = self.mocker.mock() |
407 | + callback(protocol) |
408 | + self.mocker.replay() |
409 | + self.factory.add_notifier(callback) |
410 | + self.factory.buildProtocol(None) |
411 | + self.clock.advance(0) |
412 | + |
413 | + def test_remove_notifier(self): |
414 | + """ |
415 | + The L{MethodCallClientFactory.remove_notifier} method can be used to |
416 | + remove a previously added notifier callback. |
417 | + """ |
418 | + callback = lambda protocol: 1 / 0 |
419 | + self.factory.add_notifier(callback) |
420 | + self.factory.remove_notifier(callback) |
421 | + self.factory.buildProtocol(None) |
422 | + self.clock.advance(0) |
423 | + |
424 | + def test_client_connection_failed(self): |
425 | + """ |
426 | + The L{MethodCallClientFactory} keeps trying to connect if maxRetries |
427 | + is not reached. |
428 | + """ |
429 | + # This is sub-optimal but the ReconnectingFactory in Hardy's Twisted |
430 | + # doesn't support task.Clock |
431 | + self.factory.retry = self.mocker.mock() |
432 | + self.factory.retry(KWARGS) |
433 | + self.mocker.replay() |
434 | + self.assertEquals(self.factory.retries, 0) |
435 | + self.factory.clientConnectionFailed(None, None) |
436 | + |
437 | + def test_client_connection_failed_with_max_retries_reached(self): |
438 | + """ |
439 | + The L{MethodCallClientFactory} stops trying to connect if maxRetries |
440 | + is reached. |
441 | + """ |
442 | + callback = lambda protocol: 1 / 0 |
443 | + errback = self.mocker.mock() |
444 | + errback("failure") |
445 | + self.mocker.replay() |
446 | + |
447 | + self.factory.add_notifier(callback, errback) |
448 | + self.factory.maxRetries = 1 |
449 | + self.factory.retries = self.factory.maxRetries |
450 | + self.factory.clientConnectionFailed(object(), "failure") |
451 | + self.clock.advance(0) |
452 | + |
453 | + |
454 | class RemoteObjectCreatorTest(LandscapeTest): |
455 | |
456 | def setUp(self): |
457 | super(RemoteObjectCreatorTest, self).setUp() |
458 | - socket = self.mktemp() |
459 | - factory = MethodCallServerFactory(Words()) |
460 | - factory.protocol = WordsProtocol |
461 | - self.port = reactor.listenUNIX(socket, factory) |
462 | - |
463 | - def set_remote(remote): |
464 | - self.words = remote |
465 | - |
466 | - self.connector = RemoteObjectCreator(reactor, socket) |
467 | + self.socket = self.mktemp() |
468 | + self.server_factory = WordsServerFactory(Words()) |
469 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
470 | + self.connector = RemoteWordsCreator(reactor, self.socket, |
471 | + retry_on_reconnect=True, |
472 | + timeout=0.7) |
473 | + |
474 | + def set_remote(words): |
475 | + self.words = words |
476 | + |
477 | connected = self.connector.connect() |
478 | return connected.addCallback(set_remote) |
479 | |
480 | @@ -430,7 +517,206 @@ |
481 | |
482 | def test_connect(self): |
483 | """ |
484 | - A L{RemoteObject} can send L{MethodCall}s without arguments and with |
485 | - an empty response. |
486 | + The L{RemoteObject} resulting form the deferred returned by |
487 | + L{RemoteObjectCreator.connect} is properly connected to the |
488 | + remote peer. |
489 | """ |
490 | return self.assertSuccess(self.words.empty()) |
491 | + |
492 | + def test_connect_with_max_retries(self): |
493 | + """ |
494 | + If C{max_retries} is passed to the L{RemoteObjectCreator} method, |
495 | + then it will give up trying to connect after that amout of times. |
496 | + """ |
497 | + self.connector.disconnect() |
498 | + self.port.stopListening() |
499 | + |
500 | + def reconnect(ignored): |
501 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
502 | + return self.connector.connect() |
503 | + |
504 | + result = self.connector.connect(max_retries=0) |
505 | + self.assertFailure(result, ConnectError) |
506 | + return result.addCallback(reconnect) |
507 | + |
508 | + def test_reconnect(self): |
509 | + """ |
510 | + If the connection is lost, the L{RemoteObject} created by the creator |
511 | + will transparently handle the reconnection. |
512 | + """ |
513 | + self.words._protocol.transport.loseConnection() |
514 | + self.port.stopListening() |
515 | + |
516 | + def restart_listening(): |
517 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
518 | + reactor.callLater(0.3, assert_remote) |
519 | + |
520 | + def assert_remote(): |
521 | + result = self.words.empty() |
522 | + result.addCallback(lambda x: reconnected.callback(None)) |
523 | + return result |
524 | + |
525 | + reactor.callLater(0.01, restart_listening) |
526 | + reconnected = Deferred() |
527 | + return reconnected |
528 | + |
529 | + def test_method_call_error(self): |
530 | + """ |
531 | + If a L{MethodCall} fails due to a L{MethodCallError}, the |
532 | + L{RemoteObject} won't try to perform it again. |
533 | + """ |
534 | + return self.assertFailure(self.words.secret(), MethodCallError) |
535 | + |
536 | + def test_retry(self): |
537 | + """ |
538 | + If the connection is lost, the L{RemoteObject} created by the creator |
539 | + will transparently retry to perform the L{MethodCall} requests that |
540 | + failed due to the broken connection. |
541 | + """ |
542 | + self.words._protocol.transport.loseConnection() |
543 | + self.port.stopListening() |
544 | + |
545 | + def restart_listening(): |
546 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
547 | + |
548 | + reactor.callLater(0.1, restart_listening) |
549 | + return self.words.empty() |
550 | + |
551 | + def test_retry_with_method_call_error(self): |
552 | + """ |
553 | + If a retried L{MethodCall} request fails due to a L{MethodCallError}, |
554 | + the L{RemoteObject} will properly propagate the error to the original |
555 | + caller. |
556 | + """ |
557 | + self.words._protocol.transport.loseConnection() |
558 | + self.port.stopListening() |
559 | + |
560 | + def restart_listening(): |
561 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
562 | + |
563 | + def assert_failure(error): |
564 | + self.assertEquals(str(error), "Forbidden method 'secret'") |
565 | + |
566 | + reactor.callLater(0.5, restart_listening) |
567 | + result = self.words.secret() |
568 | + self.assertFailure(result, MethodCallError) |
569 | + return result.addCallback(assert_failure) |
570 | + |
571 | + def test_wb_retry_with_while_still_disconnected(self): |
572 | + """ |
573 | + The L{RemoteObject._retry} method gets called as soon as a new |
574 | + connection is ready. If for whatever reason the connection drops |
575 | + again very quickly, the C{_retry} method will behave as expected. |
576 | + """ |
577 | + self.words._protocol.transport.loseConnection() |
578 | + self.port.stopListening() |
579 | + |
580 | + def handle_reconnect(protocol): |
581 | + # In this precise moment we have a newly connected protocol |
582 | + self.words._protocol = protocol |
583 | + |
584 | + # Pretend that the connection is lost again very quickly |
585 | + protocol.transport.loseConnection() |
586 | + self.port.stopListening() |
587 | + |
588 | + # Force RemoteObject._retry to run using a disconnected protocol |
589 | + reactor.callLater(0, self.words._retry) |
590 | + |
591 | + # Restore the real handler and start listening again very soon |
592 | + self.connector._factory.remove_notifier(handle_reconnect) |
593 | + self.connector._factory.add_notifier(self.words._handle_reconnect) |
594 | + reactor.callLater(0.2, restart_listening) |
595 | + |
596 | + def restart_listening(): |
597 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
598 | + |
599 | + def assert_failure(error): |
600 | + self.assertEquals(str(error), "Forbidden method 'secret'") |
601 | + |
602 | + # Use our own reconnect handler |
603 | + self.connector._factory.remove_notifier(self.words._handle_reconnect) |
604 | + self.connector._factory.add_notifier(handle_reconnect) |
605 | + |
606 | + reactor.callLater(0.2, restart_listening) |
607 | + result = self.words.secret() |
608 | + self.assertFailure(result, MethodCallError) |
609 | + return result.addCallback(assert_failure) |
610 | + |
611 | + def test_retry_with_many_method_calls(self): |
612 | + """ |
613 | + If several L{MethodCall} requests were issued while disconnected, they |
614 | + will be all eventually completed when the connection gets established |
615 | + again. |
616 | + """ |
617 | + self.words._protocol.transport.loseConnection() |
618 | + self.port.stopListening() |
619 | + |
620 | + def restart_listening(): |
621 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
622 | + |
623 | + def assert_guess(response): |
624 | + self.assertEquals(response, "Guessed!") |
625 | + |
626 | + def assert_secret(failure): |
627 | + self.assertEquals(str(failure.value), "Forbidden method 'secret'") |
628 | + |
629 | + def assert_motd(response): |
630 | + self.assertEquals(response, "Words are cool") |
631 | + |
632 | + reactor.callLater(0.1, restart_listening) |
633 | + |
634 | + results = [self.words.guess("word", "cool", value=4), |
635 | + self.words.secret(), |
636 | + self.words.motd()] |
637 | + results[0].addCallback(assert_guess) |
638 | + results[1].addErrback(assert_secret) |
639 | + results[2].addCallback(assert_motd) |
640 | + return DeferredList(results) |
641 | + |
642 | + def test_retry_without_retry_on_reconnect(self): |
643 | + """ |
644 | + If C{retry_on_reconnect} is C{False}, the L{RemoteObject} object won't |
645 | + retry to perform requests which failed because the connection was |
646 | + lost, however requests made after a reconnection will still succeed. |
647 | + """ |
648 | + self.words._protocol.transport.loseConnection() |
649 | + self.port.stopListening() |
650 | + |
651 | + def restart_listening(): |
652 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
653 | + reactor.callLater(0.3, assert_reconnected) |
654 | + |
655 | + def assert_reconnected(): |
656 | + result = self.words.empty() |
657 | + result.addCallback(lambda x: reconnected.callback(None)) |
658 | + return result |
659 | + |
660 | + reactor.callLater(0.1, restart_listening) |
661 | + self.words._retry_on_reconnect = False |
662 | + result = self.words.empty() |
663 | + self.assertFailure(result, ConnectionDone) |
664 | + reconnected = Deferred() |
665 | + return result.addCallback(lambda x: reconnected) |
666 | + |
667 | + def test_retry_with_timeout(self): |
668 | + """ |
669 | + If a C{timeout} is set, the L{RemoteObject} object will errback failed |
670 | + L{MethodCall}s after that amount of seconds, without retrying them when |
671 | + the connection established again. |
672 | + """ |
673 | + self.words._protocol.transport.loseConnection() |
674 | + self.port.stopListening() |
675 | + |
676 | + def restart_listening(): |
677 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
678 | + reactor.callLater(0.1, reconnected.callback, None) |
679 | + |
680 | + def assert_failure(error): |
681 | + self.assertEquals(str(error), "timeout") |
682 | + return reconnected |
683 | + |
684 | + reactor.callLater(0.9, restart_listening) |
685 | + result = self.words.empty() |
686 | + self.assertFailure(result, MethodCallError) |
687 | + reconnected = Deferred() |
688 | + return result.addCallback(assert_failure) |
[1]
+ for callback, _ in self._notifiers:
Been hacking Erlang lately? The _ for an unused variable reminds me
of that. :)
[2]
+ @cvar factor: The time factor by which the delay between two subsequent
+ connection retries will decrease.
I believe you need to s/decrease/ increase/ here, no?
[3]
+ """Call the given function on connection, reconnection or giveup.
s/giveup/give up/
[4]
+ to perfom again requests that failed due to a lost connection, as
s/perfom/perform/
[5]
+ can't perform them again successfully within this amout of seconds,
s/amout/number/
[6]
+ @param failure: The L{Failure} raised by the requested L{MethodCall}
+ @param name: The method name associated with the failed L{MethodCall}
and
+ """Called when the first connection has been established"""
These sentences are missing periods.
Very nice branch. It hurt my brain a bit, but it looks good. :) +1!
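On point [2] of this review, a simplified model of the retry delay may help show why "increase" is the right word for the default `factor`. The sketch assumes the delay is multiplied by `factor` on every attempt and capped at a maximum, and it ignores the random jitter that Twisted's `ReconnectingClientFactory` also applies. The function `retry_delays` and its default values are illustrative and not part of the branch.

```python
def retry_delays(factor, attempts, initial_delay=1.0, max_delay=3600.0):
    """Return the delays (in seconds) before each of the given attempts.

    Simplified model: each delay is the previous one multiplied by
    ``factor``, never exceeding ``max_delay``. Jitter is ignored.
    """
    delays = []
    delay = initial_delay
    for _ in range(attempts):
        delay = min(delay * factor, max_delay)
        delays.append(delay)
    return delays


# With the factor from MethodCallClientFactory the delays grow.
growing = retry_delays(1.6180339887498948, 4)

# With the factor from the test-only WordsClientFactory (0.19) they shrink,
# which keeps the reconnection tests fast.
shrinking = retry_delays(0.19, 3)
```

A factor below 1, as in the test factory, makes every retry come sooner than the previous one, so the docstring wording only holds for factors greater than 1.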