Merge lp:~free.ekanayaka/landscape-client/reconnect into lp:~landscape/landscape-client/amp-trunk

Proposed by Free Ekanayaka
Status: Merged
Merge reported by: Free Ekanayaka
Merged at revision: not available
Proposed branch: lp:~free.ekanayaka/landscape-client/reconnect
Merge into: lp:~landscape/landscape-client/amp-trunk
Diff against target: 688 lines (+486/-46)
3 files modified
landscape/__init__.py (+1/-1)
landscape/lib/amp.py (+179/-25)
landscape/lib/tests/test_amp.py (+306/-20)
To merge this branch: bzr merge lp:~free.ekanayaka/landscape-client/reconnect
Reviewer Review Type Date Requested Status
Free Ekanayaka (community) Approve
Thomas Herve (community) Needs Fixing
Jamu Kakar (community) Approve
Review via email: mp+18257@code.launchpad.net
221. By Free Ekanayaka

Pass a callback/errback pair to add_notifier

Jamu Kakar (jkakar) wrote :

[1]

+ for callback, _ in self._notifiers:

Been hacking Erlang lately? The _ for an unused variable reminds me
of that. :)

[2]

+ @cvar factor: The time factor by which the delay between two subsequent
+ connection retries will decrease.

I believe you need to s/decrease/increase/ here, no?

[3]

+ """Call the given function on connection, reconnection or giveup.

s/giveup/give up/

[4]

+ to perfom again requests that failed due to a lost connection, as

s/perfom/perform/

[5]

+ can't perform them again successfully within this amout of seconds,

s/amout/number/

[6]

+ @param failure: The L{Failure} raised by the requested L{MethodCall}
+ @param name: The method name associated with the failed L{MethodCall}

and

+ """Called when the first connection has been established"""

These sentences are missing periods.

Very nice branch. It hurt my brain a bit, but it looks good. :) +1!

review: Approve
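
To illustrate point [2] above: the factor class variable multiplies the delay between successive connection attempts, so a value above 1 makes the delay grow. The sketch below only approximates that arithmetic. The helper retry_delays, the 1-second initial delay and the 1-hour cap are assumptions made for illustration, and any jitter the real factory applies is left out.

```python
# Golden-ratio factor, copied from the diff's MethodCallClientFactory.
FACTOR = 1.6180339887498948


def retry_delays(initial_delay, factor, max_delay, attempts):
    """Return the delay before each of the first `attempts` retries.

    Each delay is the previous one multiplied by `factor`, capped at
    `max_delay`. Jitter is deliberately left out of this sketch.
    """
    delays = []
    delay = initial_delay
    for _ in range(attempts):
        delay = min(delay * factor, max_delay)
        delays.append(delay)
    return delays


# Hypothetical values: 1 second initial delay, 1 hour cap.
growing = retry_delays(1.0, FACTOR, 3600.0, 5)

# A factor below 1 (the test suite's WordsClientFactory uses 0.19)
# shrinks the delay instead, which keeps the tests fast.
shrinking = retry_delays(1.0, 0.19, 3600.0, 3)
```

With a factor above 1 every delay is longer than the previous one, which is why the docstring should say "increase".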
Thomas Herve (therve) wrote :

[1] The tests in MethodCallClientFactoryTest are a bit unfortunate, because they are highly dependent on timing. ReconnectingClientFactory has a "clock" attribute, which makes it fairly easy to mock using twisted.internet.task.Clock.

[2]
+ ports.append(reactor.listenUNIX(self.socket, self.server_factory))
+
+ def assert_connection(protocol):
+ self.assertTrue(isinstance(protocol, WordsClientProtocol))
+ client_factory.stopTrying()
+ protocol.transport.loseConnection()
+ ports[0].stopListening()

You shouldn't rely on the test succeeding to close the port; otherwise a failing assertion leaves the port open and gives you spurious errors. Use addCleanup right after the listenUNIX call.

+1 once most of the callLater calls are removed from the tests.
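
The idea behind point [1] can be sketched as follows. FakeClock is a hypothetical stand-in for twisted.internet.task.Clock, written here so the example runs without Twisted; it implements only callLater() and advance(). Notifier is likewise a made-up class that schedules a callback the way notify_success does in the diff.

```python
class FakeClock(object):
    """Hypothetical stand-in for twisted.internet.task.Clock.

    Time moves only when advance() is called, so tests never wait.
    """

    def __init__(self):
        self.now = 0.0
        self._calls = []

    def callLater(self, delay, function, *args, **kwargs):
        self._calls.append((self.now + delay, function, args, kwargs))

    def advance(self, amount):
        self.now += amount
        due = [call for call in self._calls if call[0] <= self.now]
        self._calls = [call for call in self._calls if call[0] > self.now]
        # Run the due calls in order of their scheduled time.
        for _, function, args, kwargs in sorted(due, key=lambda c: c[0]):
            function(*args, **kwargs)


class Notifier(object):
    """Hypothetical object that schedules its callbacks on a clock."""

    def __init__(self, clock):
        self.clock = clock

    def notify(self, callback, value):
        self.clock.callLater(0, callback, value)


clock = FakeClock()
received = []
Notifier(clock).notify(received.append, "protocol")
before = list(received)  # Nothing has run yet.
clock.advance(0)         # Fire everything that is due, without waiting.
after = list(received)
```

Because the test decides when time advances, the outcome no longer depends on how fast the machine or the buildbot happens to be.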

review: Needs Fixing
222. By Free Ekanayaka

Make MethodCallClientFactoryTest tests time-independent (therve [1])

223. By Free Ekanayaka

Fix typos (Jamu [2] to [6])

Free Ekanayaka (free.ekanayaka) wrote :

Jamu, Thomas, thanks for the reviews!

@Jamu: Hehe, I didn't hack on Erlang, but I recently looked at some code that used that pattern and I found it nice, because it feels like a good hint for the reader.

@Therve: I fixed all the tests in MethodCallClientFactoryTest as you suggested. There are still some callLater-full tests in RemoteObjectCreatorTest, but they should be fine.

224. By Free Ekanayaka

Reactor-less testing for MethodCallClientFactory

225. By Free Ekanayaka

Fix testing on hardy

Free Ekanayaka (free.ekanayaka) wrote :

I discussed this a little bit with Thomas on IRC and for now we're going to leave the tests time-dependent. Even though this makes them breakable in principle, the rationale is that making them time-independent would require deeper mocking, which would make them less "real-world" and less meaningful. In practice I don't expect them to break easily; however, if we start to see problems in the buildbots we can think about changing them. Thomas +1'ed for merging on IRC.

review: Approve

Preview Diff

1=== modified file 'landscape/__init__.py'
2--- landscape/__init__.py 2009-10-27 12:45:38 +0000
3+++ landscape/__init__.py 2010-02-12 16:06:15 +0000
4@@ -1,4 +1,4 @@
5-DEBIAN_REVISION = ""
6+DEBIAN_REVISION = "-0ubuntu0.9.10.0"
7 UPSTREAM_VERSION = "1.4.0"
8 VERSION = "%s%s" % (UPSTREAM_VERSION, DEBIAN_REVISION)
9 API = "3.2"
10
11=== modified file 'landscape/lib/amp.py'
12--- landscape/lib/amp.py 2010-01-25 08:28:36 +0000
13+++ landscape/lib/amp.py 2010-02-12 16:06:15 +0000
14@@ -2,7 +2,7 @@
15
16 from uuid import uuid4
17 from twisted.internet.defer import Deferred, maybeDeferred
18-from twisted.internet.protocol import ServerFactory, ClientFactory
19+from twisted.internet.protocol import ServerFactory, ReconnectingClientFactory
20 from twisted.protocols.amp import Argument, String, Command, AMP
21 from twisted.python.failure import Failure
22
23@@ -212,25 +212,64 @@
24 self.object = object
25
26
27-class MethodCallClientFactory(ClientFactory):
28- """Factory for building L{AMP} connections to L{MethodCall} servers."""
29+class MethodCallClientFactory(ReconnectingClientFactory):
30+ """Factory for building L{AMP} connections to L{MethodCall} servers.
31+
32+ If the connection fails or is lost the factory will keep retrying to
33+ establish it.
34+
35+ @cvar protocol: The factory used to build protocol instances.
36+ @cvar factor: The time factor by which the delay between two subsequent
37+ connection retries will increase.
38+ """
39
40 protocol = MethodCallClientProtocol
41+ factor = 1.6180339887498948
42
43- def __init__(self, reactor, notifier):
44+ def __init__(self, reactor):
45 """
46- @param reactor: The reactor used by the created protocols to schedule
47- notifications and timeouts.
48- @param notifier: A function that will be called when the connection is
49- established. It will be passed the protocol instance as argument.
50+ @param reactor: The reactor used by the created protocols
51+ to schedule notifications and timeouts.
52 """
53 self.reactor = reactor
54- self._notifier = notifier
55+ self.clock = self.reactor
56+ self._notifiers = []
57+
58+ def add_notifier(self, callback, errback=None):
59+ """Call the given function on connection, reconnection or give up.
60+
61+ @param notifier: A function that will be called when the factory builds
62+ a new connected protocol or gives up connecting. It will be passed
63+ the new protocol instance as argument, or the connectionf failure.
64+ """
65+ self._notifiers.append((callback, errback))
66+
67+ def remove_notifier(self, callback, errback=None):
68+ """Remove a notifier."""
69+ self._notifiers.remove((callback, errback))
70+
71+ def notify_success(self, *args, **kwargs):
72+ """Notify all registered notifier callbacks."""
73+ for callback, _ in self._notifiers:
74+ self.reactor.callLater(0, callback, *args, **kwargs)
75+
76+ def notify_failure(self, failure):
77+ """Notify all registered notifier errbacks."""
78+ for _, errback in self._notifiers:
79+ if errback is not None:
80+ self.reactor.callLater(0, errback, failure)
81+
82+ def clientConnectionFailed(self, connector, reason):
83+ ReconnectingClientFactory.clientConnectionFailed(self, connector,
84+ reason)
85+ if self.maxRetries is not None and (self.retries > self.maxRetries):
86+ self.notify_failure(reason) # Give up
87
88 def buildProtocol(self, addr):
89+ self.resetDelay()
90 protocol = self.protocol()
91 protocol.factory = self
92- self.reactor.callLater(0, self._notifier, protocol)
93+ self.notify_success(protocol)
94 return protocol
95
96
97@@ -242,12 +281,24 @@
98 the remote object exposed by the peer.
99 """
100
101- def __init__(self, protocol):
102+ def __init__(self, protocol, retry_on_reconnect=False, timeout=None):
103 """
104 @param protocol: A reference to a connected L{AMP} protocol instance,
105 which will be used to send L{MethodCall} commands.
106+ @param retry_on_reconnect: If C{True}, this L{RemoteObject} will retry
107+ to perform again requests that failed due to a lost connection, as
108+ soon as a new connection is available.
109+ @param timeout: A timeout for failed requests, if the L{RemoteObject}
110+ can't perform them again successfully within this number of
111+ seconds, they will errback with a L{MethodCallError}.
112 """
113 self._protocol = protocol
114+ self._factory = protocol.factory
115+ self._reactor = protocol.factory.reactor
116+ self._retry_on_reconnect = retry_on_reconnect
117+ self._timeout = timeout
118+ self._pending_requests = {}
119+ self._factory.add_notifier(self._handle_reconnect)
120
121 def __getattr__(self, method):
122 """Return a function sending a L{MethodCall} for the given C{method}.
123@@ -262,10 +313,97 @@
124 result = self._protocol.send_method_call(method=method,
125 args=args,
126 kwargs=kwargs)
127- return result.addCallback(lambda response: response["result"])
128+ deferred = Deferred()
129+ result.addCallback(self._handle_response, deferred)
130+ result.addErrback(self._handle_failure, method, args, kwargs,
131+ deferred)
132+ return deferred
133
134 return send_method_call
135
136+ def _handle_response(self, response, deferred, call=None):
137+ """Handles a successful L{MethodCall} response.
138+
139+ @param response: The L{MethodCall} response.
140+ @param deferred: The deferred that was returned to the caller.
141+ @param call: If not C{None}, the scheduled timeout call associated with
142+ the given deferred.
143+ """
144+ result = response["result"]
145+ if call is not None:
146+ call.cancel() # This is a successful retry, cancel the timeout.
147+ deferred.callback(result)
148+
149+ def _handle_failure(self, failure, method, args, kwargs, deferred,
150+ call=None):
151+ """Called when a L{MethodCall} command fails.
152+
153+ If a failure is due to a connection error and if C{retry_on_reconnect}
154+ is C{True}, we will try to perform the requested L{MethodCall} again
155+ as soon as a new connection becomes available, giving up after the
156+ specified C{timeout}, if any.
157+
158+ @param failure: The L{Failure} raised by the requested L{MethodCall}.
159+ @param name: The method name associated with the failed L{MethodCall}.
160+ @param args: The positional arguments of the failed L{MethodCall}.
161+ @param kwargs: The keyword arguments of the failed L{MethodCall}.
162+ @param deferred: The deferred that was returned to the caller.
163+ @param call: If not C{None}, the scheduled timeout call associated with
164+ the given deferred.
165+ """
166+ is_method_call_error = failure.type is MethodCallError
167+ dont_retry = self._retry_on_reconnect == False
168+
169+ if is_method_call_error or dont_retry:
170+ # This means either that the connection is working, and a
171+ # MethodCall protocol error occured, or that we gave up
172+ # trying and raised a timeout. In any case just propagate
173+ # the error.
174+ if deferred in self._pending_requests:
175+ self._pending_requests.pop(deferred)
176+ if call:
177+ call.cancel()
178+ deferred.errback(failure)
179+ return
180+
181+ if self._timeout and call is None:
182+ # This is the first failure for this request, let's schedule a
183+ # timeout call.
184+ timeout = Failure(MethodCallError("timeout"))
185+ call = self._reactor.callLater(self._timeout,
186+ self._handle_failure,
187+ timeout, method, args,
188+ kwargs, deferred=deferred)
189+
190+ self._pending_requests[deferred] = (method, args, kwargs, call)
191+
192+ def _handle_reconnect(self, protocol):
193+ """Handles a reconnection.
194+
195+ @param protocol: The newly connected protocol instance.
196+ """
197+ self._protocol = protocol
198+ if self._retry_on_reconnect:
199+ self._retry()
200+
201+ def _retry(self):
202+ """Try to perform again requests that failed."""
203+
204+ # We need to copy the requests list before iterating over it, because
205+ # if we are actually still disconnected, callRemote will return a
206+ # failed deferred and the _handle_failure errback will be executed
207+ # synchronously during the loop, modifing the requests list itself.
208+ requests = self._pending_requests.copy()
209+ self._pending_requests.clear()
210+
211+ while requests:
212+ deferred, (method, args, kwargs, call) = requests.popitem()
213+ result = self._protocol.send_method_call(method, args, kwargs)
214+ result.addCallback(self._handle_response,
215+ deferred=deferred, call=call)
216+ result.addErrback(self._handle_failure, method, args, kwargs,
217+ deferred=deferred, call=call)
218+
219
220 class RemoteObjectCreator(object):
221 """Connect to remote objects exposed by a L{MethodCallProtocol}."""
222@@ -273,32 +411,48 @@
223 factory = MethodCallClientFactory
224 remote = RemoteObject
225
226- def __init__(self, reactor, socket_path):
227+ def __init__(self, reactor, socket_path, *args, **kwargs):
228 """
229 @param reactor: A reactor able to connect to Unix sockets.
230 @param socket: The path to the socket we want to connect to.
231+ @param args: Arguments to passed to the created L{RemoteObject}.
232+ @param kwargs: Keyword arguments for the created L{RemoteObject}.
233 """
234 self._socket_path = socket_path
235 self._reactor = reactor
236- self._remote = None
237+ self._args = args
238+ self._kwargs = kwargs
239
240- def connect(self):
241+ def connect(self, max_retries=None):
242 """Connect to a remote object exposed by a L{MethodCallProtocol}.
243
244 This method will connect to the socket provided in the constructor
245 and return a L{Deferred} resulting in a connected L{RemoteObject}.
246+
247+ @param max_retries: If not C{None} give up try to connect after this
248+ amount of times.
249 """
250- deferred = Deferred()
251- factory = self.factory(self._reactor, deferred.callback)
252- self._reactor.connectUNIX(self._socket_path, factory)
253- deferred.addCallback(self._connection_made)
254- return deferred
255-
256- def _connection_made(self, protocol):
257- """Called when the connection has been established"""
258- self._remote = self.remote(protocol)
259- return self._remote
260+ self._connected = Deferred()
261+ self._factory = self.factory(self._reactor)
262+ self._factory.maxRetries = max_retries
263+ self._factory.add_notifier(self._success, self._failure)
264+ self._reactor.connectUNIX(self._socket_path, self._factory)
265+ return self._connected
266+
267+ def _success(self, result):
268+ """Called when the first connection has been established"""
269+
270+ # We did our job, remove our own notifier and let the remote object
271+ # handle reconnections.
272+ self._factory.remove_notifier(self._success, self._failure)
273+ self._remote = self.remote(result, *self._args, **self._kwargs)
274+ self._connected.callback(self._remote)
275+
276+ def _failure(self, failure):
277+ """Called when the first connection has failed"""
278+ self._connected.errback(failure)
279
280 def disconnect(self):
281 """Disconnect the L{RemoteObject} that we have created."""
282+ self._factory.stopTrying()
283 self._remote._protocol.transport.loseConnection()
284
285=== modified file 'landscape/lib/tests/test_amp.py'
286--- landscape/lib/tests/test_amp.py 2010-01-20 09:20:46 +0000
287+++ landscape/lib/tests/test_amp.py 2010-02-12 16:06:15 +0000
288@@ -1,12 +1,15 @@
289 from twisted.internet import reactor
290-from twisted.internet.defer import Deferred
291+from twisted.internet.defer import Deferred, DeferredList
292 from twisted.internet.protocol import ClientCreator
293+from twisted.internet.error import ConnectionDone, ConnectError
294+from twisted.internet.task import Clock
295
296 from landscape.lib.amp import (
297 MethodCallError, MethodCallServerProtocol,
298 MethodCallClientProtocol, MethodCallServerFactory,
299 MethodCallClientFactory, RemoteObject, RemoteObjectCreator)
300 from landscape.tests.helpers import LandscapeTest
301+from landscape.tests.mocker import KWARGS
302
303
304 class WordsException(Exception):
305@@ -81,7 +84,7 @@
306 return deferred
307
308
309-class WordsProtocol(MethodCallServerProtocol):
310+class WordsServerProtocol(MethodCallServerProtocol):
311
312 methods = ["empty",
313 "motd",
314@@ -96,19 +99,39 @@
315 "google"]
316
317
318+class WordsClientProtocol(MethodCallClientProtocol):
319+
320+ timeout = 0.1
321+
322+
323+class WordsServerFactory(MethodCallServerFactory):
324+
325+ protocol = WordsServerProtocol
326+
327+
328+class WordsClientFactory(MethodCallClientFactory):
329+
330+ protocol = WordsClientProtocol
331+ factor = 0.19
332+
333+
334+class RemoteWordsCreator(RemoteObjectCreator):
335+
336+ factory = WordsClientFactory
337+
338+
339 class MethodCallProtocolTest(LandscapeTest):
340
341 def setUp(self):
342 super(MethodCallProtocolTest, self).setUp()
343 socket = self.mktemp()
344- factory = MethodCallServerFactory(Words())
345- factory.protocol = WordsProtocol
346+ factory = WordsServerFactory(Words())
347 self.port = reactor.listenUNIX(socket, factory)
348
349 def set_protocol(protocol):
350 self.protocol = protocol
351
352- connector = ClientCreator(reactor, MethodCallClientProtocol)
353+ connector = ClientCreator(reactor, WordsClientProtocol)
354 connected = connector.connectUNIX(socket)
355 return connected.addCallback(set_protocol)
356
357@@ -240,17 +263,18 @@
358 def setUp(self):
359 super(RemoteObjectTest, self).setUp()
360 socket = self.mktemp()
361- server_factory = MethodCallServerFactory(Words())
362- server_factory.protocol = WordsProtocol
363+ server_factory = WordsServerFactory(Words())
364 self.port = reactor.listenUNIX(socket, server_factory)
365
366 def set_remote(protocol):
367 self.protocol = protocol
368 self.words = RemoteObject(protocol)
369+ client_factory.stopTrying()
370
371 connected = Deferred()
372 connected.addCallback(set_remote)
373- client_factory = MethodCallClientFactory(reactor, connected.callback)
374+ client_factory = WordsClientFactory(reactor)
375+ client_factory.add_notifier(connected.callback)
376 reactor.connectUNIX(socket, client_factory)
377 return connected
378
379@@ -385,7 +409,6 @@
380 If the peer protocol doesn't send a response for a deferred within
381 the given timeout, the method call fails.
382 """
383- self.protocol.timeout = 0.1
384 result = self.words.google("Long query")
385 return self.assertFailure(result, MethodCallError)
386
387@@ -407,19 +430,83 @@
388 return result.addCallback(assert_late_response_is_handled)
389
390
391+class MethodCallClientFactoryTest(LandscapeTest):
392+
393+ def setUp(self):
394+ super(MethodCallClientFactoryTest, self).setUp()
395+ self.clock = Clock()
396+ self.factory = WordsClientFactory(self.clock)
397+
398+ def test_add_notifier(self):
399+ """
400+ The L{MethodCallClientFactory.add_notifier} method can be used to
401+ add a callback function to be called when a connection is made and
402+ a new protocol instance built.
403+ """
404+ protocol = self.factory.protocol()
405+ self.factory.protocol = lambda: protocol
406+ callback = self.mocker.mock()
407+ callback(protocol)
408+ self.mocker.replay()
409+ self.factory.add_notifier(callback)
410+ self.factory.buildProtocol(None)
411+ self.clock.advance(0)
412+
413+ def test_remove_notifier(self):
414+ """
415+ The L{MethodCallClientFactory.remove_notifier} method can be used to
416+ remove a previously added notifier callback.
417+ """
418+ callback = lambda protocol: 1 / 0
419+ self.factory.add_notifier(callback)
420+ self.factory.remove_notifier(callback)
421+ self.factory.buildProtocol(None)
422+ self.clock.advance(0)
423+
424+ def test_client_connection_failed(self):
425+ """
426+ The L{MethodCallClientFactory} keeps trying to connect if maxRetries
427+ is not reached.
428+ """
429+ # This is sub-optimal but the ReconnectingFactory in Hardy's Twisted
430+ # doesn't support task.Clock
431+ self.factory.retry = self.mocker.mock()
432+ self.factory.retry(KWARGS)
433+ self.mocker.replay()
434+ self.assertEquals(self.factory.retries, 0)
435+ self.factory.clientConnectionFailed(None, None)
436+
437+ def test_client_connection_failed_with_max_retries_reached(self):
438+ """
439+ The L{MethodCallClientFactory} stops trying to connect if maxRetries
440+ is reached.
441+ """
442+ callback = lambda protocol: 1 / 0
443+ errback = self.mocker.mock()
444+ errback("failure")
445+ self.mocker.replay()
446+
447+ self.factory.add_notifier(callback, errback)
448+ self.factory.maxRetries = 1
449+ self.factory.retries = self.factory.maxRetries
450+ self.factory.clientConnectionFailed(object(), "failure")
451+ self.clock.advance(0)
452+
453+
454 class RemoteObjectCreatorTest(LandscapeTest):
455
456 def setUp(self):
457 super(RemoteObjectCreatorTest, self).setUp()
458- socket = self.mktemp()
459- factory = MethodCallServerFactory(Words())
460- factory.protocol = WordsProtocol
461- self.port = reactor.listenUNIX(socket, factory)
462-
463- def set_remote(remote):
464- self.words = remote
465-
466- self.connector = RemoteObjectCreator(reactor, socket)
467+ self.socket = self.mktemp()
468+ self.server_factory = WordsServerFactory(Words())
469+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
470+ self.connector = RemoteWordsCreator(reactor, self.socket,
471+ retry_on_reconnect=True,
472+ timeout=0.7)
473+
474+ def set_remote(words):
475+ self.words = words
476+
477 connected = self.connector.connect()
478 return connected.addCallback(set_remote)
479
480@@ -430,7 +517,206 @@
481
482 def test_connect(self):
483 """
484- A L{RemoteObject} can send L{MethodCall}s without arguments and with
485- an empty response.
486+ The L{RemoteObject} resulting form the deferred returned by
487+ L{RemoteObjectCreator.connect} is properly connected to the
488+ remote peer.
489 """
490 return self.assertSuccess(self.words.empty())
491+
492+ def test_connect_with_max_retries(self):
493+ """
494+ If C{max_retries} is passed to the L{RemoteObjectCreator} method,
495+ then it will give up trying to connect after that amout of times.
496+ """
497+ self.connector.disconnect()
498+ self.port.stopListening()
499+
500+ def reconnect(ignored):
501+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
502+ return self.connector.connect()
503+
504+ result = self.connector.connect(max_retries=0)
505+ self.assertFailure(result, ConnectError)
506+ return result.addCallback(reconnect)
507+
508+ def test_reconnect(self):
509+ """
510+ If the connection is lost, the L{RemoteObject} created by the creator
511+ will transparently handle the reconnection.
512+ """
513+ self.words._protocol.transport.loseConnection()
514+ self.port.stopListening()
515+
516+ def restart_listening():
517+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
518+ reactor.callLater(0.3, assert_remote)
519+
520+ def assert_remote():
521+ result = self.words.empty()
522+ result.addCallback(lambda x: reconnected.callback(None))
523+ return result
524+
525+ reactor.callLater(0.01, restart_listening)
526+ reconnected = Deferred()
527+ return reconnected
528+
529+ def test_method_call_error(self):
530+ """
531+ If a L{MethodCall} fails due to a L{MethodCallError}, the
532+ L{RemoteObject} won't try to perform it again.
533+ """
534+ return self.assertFailure(self.words.secret(), MethodCallError)
535+
536+ def test_retry(self):
537+ """
538+ If the connection is lost, the L{RemoteObject} created by the creator
539+ will transparently retry to perform the L{MethodCall} requests that
540+ failed due to the broken connection.
541+ """
542+ self.words._protocol.transport.loseConnection()
543+ self.port.stopListening()
544+
545+ def restart_listening():
546+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
547+
548+ reactor.callLater(0.1, restart_listening)
549+ return self.words.empty()
550+
551+ def test_retry_with_method_call_error(self):
552+ """
553+ If a retried L{MethodCall} request fails due to a L{MethodCallError},
554+ the L{RemoteObject} will properly propagate the error to the original
555+ caller.
556+ """
557+ self.words._protocol.transport.loseConnection()
558+ self.port.stopListening()
559+
560+ def restart_listening():
561+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
562+
563+ def assert_failure(error):
564+ self.assertEquals(str(error), "Forbidden method 'secret'")
565+
566+ reactor.callLater(0.5, restart_listening)
567+ result = self.words.secret()
568+ self.assertFailure(result, MethodCallError)
569+ return result.addCallback(assert_failure)
570+
571+ def test_wb_retry_with_while_still_disconnected(self):
572+ """
573+ The L{RemoteObject._retry} method gets called as soon as a new
574+ connection is ready. If for whatever reason the connection drops
575+ again very quickly, the C{_retry} method will behave as expected.
576+ """
577+ self.words._protocol.transport.loseConnection()
578+ self.port.stopListening()
579+
580+ def handle_reconnect(protocol):
581+ # In this precise moment we have a newly connected protocol
582+ self.words._protocol = protocol
583+
584+ # Pretend that the connection is lost again very quickly
585+ protocol.transport.loseConnection()
586+ self.port.stopListening()
587+
588+ # Force RemoteObject._retry to run using a disconnected protocol
589+ reactor.callLater(0, self.words._retry)
590+
591+ # Restore the real handler and start listening again very soon
592+ self.connector._factory.remove_notifier(handle_reconnect)
593+ self.connector._factory.add_notifier(self.words._handle_reconnect)
594+ reactor.callLater(0.2, restart_listening)
595+
596+ def restart_listening():
597+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
598+
599+ def assert_failure(error):
600+ self.assertEquals(str(error), "Forbidden method 'secret'")
601+
602+ # Use our own reconnect handler
603+ self.connector._factory.remove_notifier(self.words._handle_reconnect)
604+ self.connector._factory.add_notifier(handle_reconnect)
605+
606+ reactor.callLater(0.2, restart_listening)
607+ result = self.words.secret()
608+ self.assertFailure(result, MethodCallError)
609+ return result.addCallback(assert_failure)
610+
611+ def test_retry_with_many_method_calls(self):
612+ """
613+ If several L{MethodCall} requests were issued while disconnected, they
614+ will be all eventually completed when the connection gets established
615+ again.
616+ """
617+ self.words._protocol.transport.loseConnection()
618+ self.port.stopListening()
619+
620+ def restart_listening():
621+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
622+
623+ def assert_guess(response):
624+ self.assertEquals(response, "Guessed!")
625+
626+ def assert_secret(failure):
627+ self.assertEquals(str(failure.value), "Forbidden method 'secret'")
628+
629+ def assert_motd(response):
630+ self.assertEquals(response, "Words are cool")
631+
632+ reactor.callLater(0.1, restart_listening)
633+
634+ results = [self.words.guess("word", "cool", value=4),
635+ self.words.secret(),
636+ self.words.motd()]
637+ results[0].addCallback(assert_guess)
638+ results[1].addErrback(assert_secret)
639+ results[2].addCallback(assert_motd)
640+ return DeferredList(results)
641+
642+ def test_retry_without_retry_on_reconnect(self):
643+ """
644+ If C{retry_on_reconnect} is C{False}, the L{RemoteObject} object won't
645+ retry to perform requests which failed because the connection was
646+ lost, however requests made after a reconnection will still succeed.
647+ """
648+ self.words._protocol.transport.loseConnection()
649+ self.port.stopListening()
650+
651+ def restart_listening():
652+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
653+ reactor.callLater(0.3, assert_reconnected)
654+
655+ def assert_reconnected():
656+ result = self.words.empty()
657+ result.addCallback(lambda x: reconnected.callback(None))
658+ return result
659+
660+ reactor.callLater(0.1, restart_listening)
661+ self.words._retry_on_reconnect = False
662+ result = self.words.empty()
663+ self.assertFailure(result, ConnectionDone)
664+ reconnected = Deferred()
665+ return result.addCallback(lambda x: reconnected)
666+
667+ def test_retry_with_timeout(self):
668+ """
669+ If a C{timeout} is set, the L{RemoteObject} object will errback failed
670+ L{MethodCall}s after that amount of seconds, without retrying them when
671+ the connection established again.
672+ """
673+ self.words._protocol.transport.loseConnection()
674+ self.port.stopListening()
675+
676+ def restart_listening():
677+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
678+ reactor.callLater(0.1, reconnected.callback, None)
679+
680+ def assert_failure(error):
681+ self.assertEquals(str(error), "timeout")
682+ return reconnected
683+
684+ reactor.callLater(0.9, restart_listening)
685+ result = self.words.empty()
686+ self.assertFailure(result, MethodCallError)
687+ reconnected = Deferred()
688+ return result.addCallback(assert_failure)
