Merge lp:~free.ekanayaka/landscape-client/amp-cleanup-8 into lp:~landscape/landscape-client/trunk

Proposed by Free Ekanayaka
Status: Merged
Approved by: Free Ekanayaka
Approved revision: 673
Merged at revision: 670
Proposed branch: lp:~free.ekanayaka/landscape-client/amp-cleanup-8
Merge into: lp:~landscape/landscape-client/trunk
Diff against target: 751 lines (+234/-212)
9 files modified
landscape/amp.py (+8/-11)
landscape/broker/amp.py (+5/-5)
landscape/broker/server.py (+2/-2)
landscape/configuration.py (+12/-10)
landscape/lib/amp.py (+24/-32)
landscape/lib/tests/test_amp.py (+107/-126)
landscape/reactor.py (+4/-4)
landscape/tests/helpers.py (+13/-0)
landscape/tests/test_amp.py (+59/-22)
To merge this branch: bzr merge lp:~free.ekanayaka/landscape-client/amp-cleanup-8
Reviewer Review Type Date Requested Status
Geoff Teale (community) Approve
Alberto Donato (community) Approve
Review via email: mp+162068@code.launchpad.net

Commit message

Some more cleanup around the AMP-based machinery:

- landscape.amp.ComponentConnector: the disconnect method now uses Twisted's IConnector.disconnect(), without messing with the lower-level ITransport.loseConnection(). The objects implementing IConnector are the ones returned by reactor.connectUNIX/connectTCP/connectSSH and afaiu they're what is supposed to be used for terminating a connection.

- rename RemoteComponentRegistry to ComponentRegistry, to match ComponentPublisher and ComponentConnector

- landscape.configuration: the changes here are due because we know use IConnector.disconnect() vs ITransport.loseConnection(). Unfortunately the code in this module is factored and tested in a far less than ideal way, eventually we should take time to clean it up.

- landscape.lib.amp.RemoteObject: update the constructor docstring and reduce a bit of logic duplication by adding a new _send_method_call helper method, which is now used by __getattr__ and _retry (this makes _retry flush the fake asynchronous connection used in tests, in the same way __getattr__ does).

- landscape.lib.amp.MethodCallServerFactory: rename the "reactor" init parameter to "clock", since only the IReactorTime interface is needed (in this case instances are normally called "clock" and can be mocked with a twisted.internet.task.Clock instance in tests)

- landscape.lib.tests.test_amp: Add a FakeConnector class implementing IConnector, so we can use it unit-tests. Also, I've converted a few deferred-returning (asynchronous) tests to synchronous equivalent.

- landscape.tests.test_amp: Concerted asynchronous tests to synchronous equivalent.

Description of the change

Some more cleanup around the AMP-based machinery:

- landscape.amp.ComponentConnector: the disconnect method now uses Twisted's IConnector.disconnect(), without messing with the lower-level ITransport.loseConnection(). The objects implementing IConnector are the ones returned by reactor.connectUNIX/connectTCP/connectSSH and afaiu they're what is supposed to be used for terminating a connection.

- rename RemoteComponentRegistry to ComponentRegistry, to match ComponentPublisher and ComponentConnector

- landscape.configuration: the changes here are due because we know use IConnector.disconnect() vs ITransport.loseConnection(). Unfortunately the code in this module is factored and tested in a far less than ideal way, eventually we should take time to clean it up.

- landscape.lib.amp.RemoteObject: update the constructor docstring and reduce a bit of logic duplication by adding a new _send_method_call helper method, which is now used by __getattr__ and _retry (this makes _retry flush the fake asynchronous connection used in tests, in the same way __getattr__ does).

- landscape.lib.amp.MethodCallServerFactory: rename the "reactor" init parameter to "clock", since only the IReactorTime interface is needed (in this case instances are normally called "clock" and can be mocked with a twisted.internet.task.Clock instance in tests)

- landscape.lib.tests.test_amp: Add a FakeConnector class implementing IConnector, so we can use it unit-tests. Also, I've converted a few deferred-returning (asynchronous) tests to synchronous equivalent.

- landscape.tests.test_amp: Concerted asynchronous tests to synchronous equivalent.

To post a comment you must log in.
Revision history for this message
Alberto Donato (ack) wrote :

Great! +1

#1:
+ If C{max_retries} is passed to L{RemoteObjectConnector.connet},
+ then it will give up trying to connect after that amout of times.

typos, "connet" and "amout".

review: Approve
Revision history for this message
Geoff Teale (tealeg) wrote :

+1 Only trivial comments:

[1] "invokation" should be "invocation"
226 + # invokation letting tests simulate a synchronous transport.

[2] "till" -> "until"
439 + # queue, till the call gets a chance to be retried upon reconnection

review: Approve
673. By Free Ekanayaka

Fix typos

Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

#1, [1] and [2] all fixed.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'landscape/amp.py'
2--- landscape/amp.py 2013-04-30 14:01:55 +0000
3+++ landscape/amp.py 2013-05-06 18:17:25 +0000
4@@ -95,6 +95,9 @@
5 factory.initialDelay = factory.delay = 0.05
6 factory.retryOnReconnect = self._retry_on_reconnect
7 factory.remote = self.remote
8+ factory.maxRetries = max_retries
9+ if factor:
10+ factory.factor = factor
11
12 def fire_reconnect(ignored):
13 self._reactor.fire("%s-reconnect" % self.component.name)
14@@ -107,9 +110,6 @@
15 logging.error("Error while connecting to %s", self.component.name)
16 return failure
17
18- factory.maxRetries = max_retries
19- if factor:
20- factory.factor = factor
21 socket_path = _get_socket_path(self.component, self._config)
22 deferred = factory.getRemoteObject()
23 self._connector = self._reactor.connect_unix(socket_path, factory)
24@@ -124,14 +124,11 @@
25 if self._connector is not None:
26 factory = self._connector.factory
27 factory.stopTrying()
28- # XXX we should be using self._connector.disconnect() here
29- remote = factory._remote
30- if remote:
31- if remote._sender.protocol.transport:
32- remote._sender.protocol.transport.loseConnection()
33-
34-
35-class RemoteComponentsRegistry(object):
36+ self._connector.disconnect()
37+ self._connector = None
38+
39+
40+class ComponentsRegistry(object):
41 """
42 A global registry for looking up Landscape component connectors by name.
43 """
44
45=== modified file 'landscape/broker/amp.py'
46--- landscape/broker/amp.py 2013-04-30 14:01:55 +0000
47+++ landscape/broker/amp.py 2013-05-06 18:17:25 +0000
48@@ -2,7 +2,7 @@
49
50 from landscape.lib.amp import RemoteObject, MethodCallArgument
51 from landscape.amp import (
52- ComponentConnector, RemoteComponentsRegistry, ComponentPublisher)
53+ ComponentConnector, ComponentsRegistry, ComponentPublisher)
54 from landscape.broker.server import BrokerServer
55 from landscape.broker.client import BrokerClient
56 from landscape.monitor.monitor import Monitor
57@@ -119,7 +119,7 @@
58
59 component = Manager
60
61-RemoteComponentsRegistry.register(RemoteBrokerConnector)
62-RemoteComponentsRegistry.register(RemoteClientConnector)
63-RemoteComponentsRegistry.register(RemoteMonitorConnector)
64-RemoteComponentsRegistry.register(RemoteManagerConnector)
65+ComponentsRegistry.register(RemoteBrokerConnector)
66+ComponentsRegistry.register(RemoteClientConnector)
67+ComponentsRegistry.register(RemoteMonitorConnector)
68+ComponentsRegistry.register(RemoteManagerConnector)
69
70=== modified file 'landscape/broker/server.py'
71--- landscape/broker/server.py 2013-04-29 11:37:27 +0000
72+++ landscape/broker/server.py 2013-05-06 18:17:25 +0000
73@@ -3,7 +3,7 @@
74 from twisted.internet.defer import Deferred
75
76 from landscape.lib.twisted_util import gather_results
77-from landscape.amp import RemoteComponentsRegistry
78+from landscape.amp import ComponentsRegistry
79 from landscape.manager.manager import FAILED
80
81
82@@ -37,7 +37,7 @@
83 @param message_store: The broker's L{MessageStore}.
84 """
85 name = "broker"
86- connectors_registry = RemoteComponentsRegistry
87+ connectors_registry = ComponentsRegistry
88
89 def __init__(self, config, reactor, exchange, registration,
90 message_store, pinger):
91
92=== modified file 'landscape/configuration.py'
93--- landscape/configuration.py 2013-04-23 11:18:33 +0000
94+++ landscape/configuration.py 2013-05-06 18:17:25 +0000
95@@ -492,7 +492,7 @@
96
97
98 def fetch_base64_ssl_public_certificate(hostname, on_info=print_text,
99- on_error=print_text):
100+ on_error=print_text):
101 """
102 Fetch base64 encoded SSL CA certificate from the discovered landscape
103 server and return that decoded info.
104@@ -622,20 +622,21 @@
105 reactor = TwistedReactor()
106 exit_with_error = []
107
108- def stop(error=None):
109- if not config.ok_no_register and error is not None:
110- exit_with_error.append(error)
111+ def stop(errors):
112+ if not config.ok_no_register:
113+ for error in errors:
114+ if error is not None:
115+ exit_with_error.append(error)
116 connector.disconnect()
117 reactor.stop()
118
119 def failure():
120 on_message("Invalid account name or "
121 "registration key.", error=True)
122- stop(2)
123+ return 2
124
125 def success():
126 on_message("System successfully registered.")
127- stop()
128
129 def exchange_failure():
130 on_message("We were unable to contact the server. "
131@@ -643,7 +644,7 @@
132 "The landscape client will continue to try and contact "
133 "the server periodically.",
134 error=True)
135- stop(2)
136+ return 2
137
138 def handle_registration_errors(failure):
139 # We'll get invalid credentials through the signal.
140@@ -653,7 +654,7 @@
141 def catch_all(failure):
142 on_message(failure.getTraceback(), error=True)
143 on_message("Unknown error occurred.", error=True)
144- stop(2)
145+ return 2
146
147 on_message("Please wait... ", "")
148
149@@ -670,14 +671,15 @@
150 # We consume errors here to ignore errors after the first one.
151 # catch_all will be called for the very first deferred that fails.
152 results = gather_results(deferreds, consume_errors=True)
153- return results.addErrback(catch_all)
154+ results.addErrback(catch_all)
155+ results.addCallback(stop)
156
157 def got_error(failure):
158 on_message("There was an error communicating with the Landscape"
159 " client.", error=True)
160 on_message("This machine will be registered with the provided "
161 "details when the client runs.", error=True)
162- stop(2)
163+ stop([2])
164
165 connector = RemoteBrokerConnector(reactor, config)
166 result = connector.connect(max_retries=max_retries, quiet=True)
167
168=== modified file 'landscape/lib/amp.py'
169--- landscape/lib/amp.py 2013-05-02 08:24:39 +0000
170+++ landscape/lib/amp.py 2013-05-06 18:17:25 +0000
171@@ -318,14 +318,9 @@
172
173 def __init__(self, factory):
174 """
175- @param protocol: A reference to a connected L{AMP} protocol instance,
176- which will be used to send L{MethodCall} commands.
177- @param retry_on_reconnect: If C{True}, this L{RemoteObject} will retry
178- to perform again requests that failed due to a lost connection, as
179- soon as a new connection is available.
180- @param timeout: A timeout for failed requests, if the L{RemoteObject}
181- can't perform them again successfully within this number of
182- seconds, they will errback with a L{MethodCallError}.
183+ @param factory: The L{MethodCallClientFactory} used for connecting to
184+ the other peer. Look there if you need to tweak the behavior of
185+ this L{RemoteObject}.
186 """
187 self._sender = None
188 self._pending_requests = {}
189@@ -340,27 +335,29 @@
190 keyword arguments it was called with, and returning a L{Deferred}
191 resulting in the L{MethodCall}'s response value.
192 """
193-
194 def send_method_call(*args, **kwargs):
195- result = self._sender.send_method_call(method=method,
196- args=args,
197- kwargs=kwargs)
198 deferred = Deferred()
199- result.addCallback(self._handle_result, deferred)
200- result.addErrback(self._handle_failure, method, args, kwargs,
201- deferred)
202-
203- if self._factory.fake_connection is not None:
204- # Transparently flush the connection after a send_method_call
205- # invokation letting tests simulate a synchronous transport.
206- # This is needed because the Twisted's AMP implementation
207- # assume that the transport is asynchronous.
208- self._factory.fake_connection.flush()
209-
210+ self._send_method_call(method, args, kwargs, deferred)
211 return deferred
212
213 return send_method_call
214
215+ def _send_method_call(self, method, args, kwargs, deferred, call=None):
216+ """Send a L{MethodCall} command, adding callbacks to handle retries."""
217+ result = self._sender.send_method_call(method=method,
218+ args=args,
219+ kwargs=kwargs)
220+ result.addCallback(self._handle_result, deferred, call=call)
221+ result.addErrback(self._handle_failure, method, args, kwargs,
222+ deferred, call=call)
223+
224+ if self._factory.fake_connection is not None:
225+ # Transparently flush the connection after a send_method_call
226+ # invocation letting tests simulate a synchronous transport.
227+ # This is needed because the Twisted's AMP implementation
228+ # assume that the transport is asynchronous.
229+ self._factory.fake_connection.flush()
230+
231 def _handle_result(self, result, deferred, call=None):
232 """Handles a successful C{send_method_call} result.
233
234@@ -437,11 +434,7 @@
235
236 while requests:
237 deferred, (method, args, kwargs, call) = requests.popitem()
238- result = self._sender.send_method_call(method, args, kwargs)
239- result.addCallback(self._handle_result,
240- deferred=deferred, call=call)
241- result.addErrback(self._handle_failure, method, args, kwargs,
242- deferred=deferred, call=call)
243+ self._send_method_call(method, args, kwargs, deferred, call=call)
244
245
246 class MethodCallServerFactory(ServerFactory):
247@@ -501,15 +494,14 @@
248 # hack can be removed.
249 fake_connection = None
250
251- def __init__(self, reactor=None):
252+ def __init__(self, clock):
253 """
254 @param object: The object exposed by the L{MethodCallProtocol}s
255 instances created by this factory.
256 @param reactor: The reactor used by the created protocols
257 to schedule notifications and timeouts.
258 """
259- self.reactor = reactor
260- self.clock = self.reactor
261+ self.clock = clock
262 self.delay = self.initialDelay
263 self._connects = []
264 self._requests = []
265@@ -600,7 +592,7 @@
266 delay between subsequent retries should increase. Smaller values
267 result in a faster reconnection attempts pace.
268 """
269- factory = self.factory(reactor=self._reactor)
270+ factory = self.factory(self._reactor)
271 factory.maxRetries = max_retries
272 if factor:
273 factory.factor = factor
274
275=== modified file 'landscape/lib/tests/test_amp.py'
276--- landscape/lib/tests/test_amp.py 2013-05-02 08:24:39 +0000
277+++ landscape/lib/tests/test_amp.py 2013-05-06 18:17:25 +0000
278@@ -22,7 +22,7 @@
279 self.stream.append(data)
280
281 def loseConnection(self):
282- self.connection.disconnect()
283+ raise NotImplemented()
284
285 def getPeer(self):
286 pass
287@@ -37,15 +37,14 @@
288 def __init__(self, client, server):
289 self.client = client
290 self.server = server
291- self.connect()
292
293- def connect(self):
294+ def make(self):
295 self.server.makeConnection(FakeTransport(self))
296 self.client.makeConnection(FakeTransport(self))
297
298- def disconnect(self):
299- reason = Failure(ConnectionDone())
300- connector = self
301+ def lose(self, connector, reason):
302+ self.server.connectionLost(reason)
303+ self.client.connectionLost(reason)
304 self.client.factory.clientConnectionLost(connector, reason)
305
306 def flush(self):
307@@ -53,14 +52,41 @@
308 Notify the server of any data written by the client and viceversa.
309 """
310 while True:
311- if self.client.transport.stream:
312+ if self.client.transport and self.client.transport.stream:
313 self.server.dataReceived(self.client.transport.stream.pop(0))
314- elif self.server.transport.stream:
315+ elif self.server.transport and self.server.transport.stream:
316 self.client.dataReceived(self.server.transport.stream.pop(0))
317 else:
318 break
319
320
321+class FakeConnector(object):
322+ """Make L{FakeConnection}s using the given server and client factories."""
323+
324+ def __init__(self, client, server):
325+ self.client = client
326+ self.server = server
327+ self.connection = None
328+
329+ @property
330+ def factory(self):
331+ return self.client
332+
333+ def connect(self):
334+ self.connection = FakeConnection(self.client.buildProtocol(None),
335+ self.server.buildProtocol(None))
336+
337+ # XXX Let the client factory be aware of this fake connection, so
338+ # it can flush it when needed. This is to workaround AMP not
339+ # supporting synchronous transports
340+ self.client.fake_connection = self.connection
341+
342+ self.connection.make()
343+
344+ def disconnect(self):
345+ self.connection.lose(self, Failure(ConnectionDone()))
346+
347+
348 class WordsException(Exception):
349 """Test exception."""
350
351@@ -185,6 +211,7 @@
352 server = MethodCallServerProtocol(Words(), METHODS)
353 client = MethodCallClientProtocol()
354 self.connection = FakeConnection(client, server)
355+ self.connection.make()
356 self.clock = Clock()
357 self.sender = MethodCallSender(client, self.clock)
358
359@@ -341,12 +368,11 @@
360 def setUp(self):
361 super(RemoteObjectTest, self).setUp()
362 self.clock = Clock()
363- server = MethodCallServerProtocol(Words(self.clock), METHODS)
364- client = MethodCallClientProtocol()
365- client.factory = WordsFactory(self.clock)
366- self.remote = RemoteObject(client.factory)
367- self.connection = FakeConnection(client, server)
368- client.factory.fake_connection = self.connection
369+ self.factory = WordsFactory(self.clock)
370+ server_factory = MethodCallServerFactory(Words(self.clock), METHODS)
371+ self.connector = FakeConnector(self.factory, server_factory)
372+ self.connector.connect()
373+ self.remote = self.successResultOf(self.factory.getRemoteObject())
374
375 def test_method_call_sender_with_forbidden_method(self):
376 """
377@@ -455,7 +481,7 @@
378
379 # Simulate time advancing and the receiver responding
380 self.clock.advance(0.5)
381- self.connection.flush()
382+ self.connector.connection.flush()
383
384 self.assertEqual(["Cool!"], result)
385
386@@ -474,7 +500,7 @@
387
388 # Simulate time advancing and the receiver responding
389 self.clock.advance(0.5)
390- self.connection.flush()
391+ self.connector.connection.flush()
392
393 [failure] = result
394 failure.trap(MethodCallError)
395@@ -521,13 +547,62 @@
396 [failure] = result
397 failure.trap(MethodCallError)
398
399+ def test_method_call_error(self):
400+ """
401+ If a L{MethodCall} fails due to a L{MethodCallError},
402+ the L{RemoteObject} won't try to perform it again, even if the
403+ C{retryOnReconnect} error is set, as a L{MethodCallError} is a
404+ permanent failure that is not likely to ever succeed.
405+ """
406+ self.factory.retryOnReconnect = True
407+ deferred = self.remote.secret()
408+ self.failureResultOf(deferred).trap(MethodCallError)
409+
410+ def test_retry(self):
411+ """
412+ If the connection is lost and C{retryOnReconnect} is C{True} on the
413+ factory, the L{RemoteObject} will transparently retry to perform
414+ the L{MethodCall} requests that failed due to the broken connections.
415+ """
416+ self.connector.disconnect()
417+ deferred = self.remote.capitalize("john")
418+
419+ # The deferred has not fired yet, because it's been put in the pending
420+ # queue, until the call gets a chance to be retried upon reconnection
421+ self.assertFalse(deferred.called)
422+
423+ # Time passes and the factory successfully reconnects
424+ self.clock.advance(1)
425+
426+ # We finally get the result
427+ self.assertEqual("John", self.successResultOf(deferred))
428+
429+ def test_retry_with_method_call_error(self):
430+ """
431+ If a retried L{MethodCall} request fails due to a L{MethodCallError},
432+ the L{RemoteObject} will properly propagate the error to the original
433+ caller.
434+ """
435+ self.connector.disconnect()
436+ deferred = self.remote.secret()
437+
438+ # The deferred has not fired yet, because it's been put in the pending
439+ # queue, until the call gets a chance to be retried upon reconnection
440+ self.assertFalse(deferred.called)
441+
442+ # Time passes and the factory successfully reconnects
443+ self.clock.advance(1)
444+
445+ failure = self.failureResultOf(deferred)
446+ self.assertEqual("Forbidden method 'secret'", str(failure.value))
447+
448
449 class MethodCallClientFactoryTest(LandscapeTest):
450
451 def setUp(self):
452 super(MethodCallClientFactoryTest, self).setUp()
453 self.clock = Clock()
454- self.factory = MethodCallClientFactory(reactor=self.clock)
455+ self.factory = MethodCallClientFactory(self.clock)
456
457 def test_max_delay(self):
458 """
459@@ -598,6 +673,21 @@
460 self.clock.advance(5)
461 self.assertTrue(connector.called)
462
463+ def test_reconnect(self):
464+ """
465+ If the connection is lost, the L{RemoteObject} created by the creator
466+ will transparently handle the reconnection.
467+ """
468+ server_factory = MethodCallServerFactory(Words(self.clock), METHODS)
469+ connector = FakeConnector(self.factory, server_factory)
470+ connector.connect()
471+ remote = self.successResultOf(self.factory.getRemoteObject())
472+
473+ connector.disconnect()
474+ self.clock.advance(5)
475+ deferred = remote.empty()
476+ self.assertIsNone(self.successResultOf(deferred))
477+
478
479 class RemoteObjectConnectorTest(LandscapeTest):
480
481@@ -627,115 +717,6 @@
482 """
483 return self.assertSuccess(self.words.empty())
484
485- def test_connect_with_max_retries(self):
486- """
487- If C{max_retries} is passed to L{RemoteObjectConnector.connet},
488- then it will give up trying to connect after that amout of times.
489- """
490- self.connector.disconnect()
491- self.port.stopListening()
492-
493- def reconnect(ignored):
494- self.port = reactor.listenUNIX(self.socket, self.server_factory)
495- return self.connector.connect()
496-
497- result = self.connector.connect(max_retries=0)
498- self.assertFailure(result, ConnectError)
499- return result.addCallback(reconnect)
500-
501- def test_connect_with_factor(self):
502- """
503- If C{factor} is passed to L{RemoteObjectConnector.connect} method,
504- then the associated protocol factory will be set to that value.
505- """
506- self.connector.disconnect()
507-
508- def assert_factor(ignored):
509- self.assertEqual(self.connector._connector.factory.factor, 1.0)
510-
511- result = self.connector.connect(factor=1.0)
512- return result.addCallback(assert_factor)
513-
514- def test_disconnect(self):
515- """
516- It is possible to call L{RemoteObjectConnector.disconnect} multiple
517- times, even if the connection has been already closed.
518- """
519- self.connector.disconnect()
520- self.connector.disconnect()
521-
522- def test_disconnect_without_connect(self):
523- """
524- It is possible to call L{RemoteObjectConnector.disconnect} even
525- if the connection was never established. In that case the method
526- is effectively a no-op.
527- """
528- connector = RemoteWordsConnector(None, None)
529- connector.disconnect()
530-
531- def test_reconnect(self):
532- """
533- If the connection is lost, the L{RemoteObject} created by the creator
534- will transparently handle the reconnection.
535- """
536- self.words._sender.protocol.transport.loseConnection()
537- self.port.stopListening()
538-
539- def restart_listening():
540- self.port = reactor.listenUNIX(self.socket, self.server_factory)
541- reactor.callLater(0.3, assert_remote)
542-
543- def assert_remote():
544- result = self.words.empty()
545- result.addCallback(lambda x: reconnected.callback(None))
546- return result
547-
548- reactor.callLater(0.01, restart_listening)
549- reconnected = Deferred()
550- return reconnected
551-
552- def test_method_call_error(self):
553- """
554- If a L{MethodCall} fails due to a L{MethodCallError}, the
555- L{RemoteObject} won't try to perform it again.
556- """
557- return self.assertFailure(self.words.secret(), MethodCallError)
558-
559- def test_retry(self):
560- """
561- If the connection is lost, the L{RemoteObject} created by the creator
562- will transparently retry to perform the L{MethodCall} requests that
563- failed due to the broken connection.
564- """
565- self.words._sender.protocol.transport.loseConnection()
566- self.port.stopListening()
567-
568- def restart_listening():
569- self.port = reactor.listenUNIX(self.socket, self.server_factory)
570-
571- reactor.callLater(0.1, restart_listening)
572- return self.words.empty()
573-
574- def test_retry_with_method_call_error(self):
575- """
576- If a retried L{MethodCall} request fails due to a L{MethodCallError},
577- the L{RemoteObject} will properly propagate the error to the original
578- caller.
579- """
580- self.words._sender.protocol.transport.loseConnection()
581- self.port.stopListening()
582-
583- def restart_listening():
584- self.port = reactor.listenUNIX(self.socket, self.server_factory)
585-
586- def assert_failure(error):
587- self.assertEqual(str(error), "Forbidden method 'secret'")
588-
589- reactor.callLater(0.5, restart_listening)
590- result = self.words.secret()
591- self.assertFailure(result, MethodCallError)
592- return result.addCallback(assert_failure)
593-
594 def test_wb_retry_with_while_still_disconnected(self):
595 """
596 The L{RemoteObject._retry} method gets called as soon as a new
597
598=== modified file 'landscape/reactor.py'
599--- landscape/reactor.py 2013-05-02 08:24:39 +0000
600+++ landscape/reactor.py 2013-05-06 18:17:25 +0000
601@@ -354,15 +354,15 @@
602
603 def connect_unix(self, path, factory):
604 server = self._socket_paths.get(path)
605- from landscape.lib.tests.test_amp import FakeConnection
606+ from landscape.lib.tests.test_amp import FakeConnector
607 if server:
608- connection = FakeConnection(factory.buildProtocol(path),
609- server.buildProtocol(path))
610- factory.fake_connection = connection
611+ connector = FakeConnector(factory, server)
612+ connector.connect()
613 else:
614 connector = object() # Fake connector
615 failure = Failure(ConnectError("No such file or directory"))
616 factory.clientConnectionFailed(connector, failure)
617+ return connector
618
619 def run(self):
620 """Continuously advance this reactor until reactor.stop() is called."""
621
622=== modified file 'landscape/tests/helpers.py'
623--- landscape/tests/helpers.py 2013-04-30 14:02:14 +0000
624+++ landscape/tests/helpers.py 2013-05-06 18:17:25 +0000
625@@ -148,6 +148,19 @@
626 else:
627 return result[0]
628
629+ def assertNoResult(self, deferred):
630+ """See C{twisted.trial._synctest._Assertions.assertNoResult}.
631+
632+ This is a copy of the original method, which is available only
633+ since Twisted 12.3.0 (from 2012-12-20).
634+ """
635+ result = []
636+ deferred.addBoth(result.append)
637+ if result:
638+ self.fail(
639+ "No result expected on %r, found %r instead" % (
640+ deferred, result[0]))
641+
642 def assertDeferredSucceeded(self, deferred):
643 self.assertTrue(isinstance(deferred, Deferred))
644 called = []
645
646=== modified file 'landscape/tests/test_amp.py'
647--- landscape/tests/test_amp.py 2013-04-23 11:18:33 +0000
648+++ landscape/tests/test_amp.py 2013-05-06 18:17:25 +0000
649@@ -1,5 +1,6 @@
650 from twisted.internet.defer import Deferred
651 from twisted.internet.error import ConnectError
652+from twisted.internet.task import Clock
653
654 from landscape.tests.helpers import LandscapeTest
655 from landscape.reactor import FakeReactor
656@@ -73,11 +74,25 @@
657 def setUp(self):
658 super(ComponentConnectorTest, self).setUp()
659 self.reactor = FakeReactor()
660+ # XXX this should be dropped once the FakeReactor doesn't use the
661+ # real reactor anymore under the hood.
662+ self.reactor._reactor = Clock()
663 self.config = Configuration()
664 self.config.data_path = self.makeDir()
665 self.makeDir(path=self.config.sockets_path)
666 self.connector = TestComponentConnector(self.reactor, self.config)
667
668+ def test_connect_with_max_retries(self):
669+ """
670+ If C{max_retries} is passed to L{RemoteObjectConnector.connect},
671+ then it will give up trying to connect after that amount of times.
672+ """
673+ self.log_helper.ignore_errors("Error while connecting to test")
674+ deferred = self.connector.connect(max_retries=2)
675+ self.assertNoResult(deferred)
676+ return
677+ self.failureResultOf(deferred).trap(ConnectError)
678+
679 def test_connect_logs_errors(self):
680 """
681 Connection errors are logged.
682@@ -104,25 +119,47 @@
683 An event is fired whenever the connection is established again after
684 it has been lost.
685 """
686- component = TestComponent()
687- publisher = ComponentPublisher(component, self.reactor, self.config)
688- publisher.start()
689-
690- def listen_again():
691- publisher.start()
692-
693- def connected(remote):
694- remote._sender.protocol.transport.loseConnection()
695- publisher.stop()
696- self.reactor._reactor.callLater(0.01, listen_again)
697-
698- def reconnected():
699- self.connector.disconnect()
700- publisher.stop()
701- deferred.callback(None)
702-
703- deferred = Deferred()
704- self.reactor.call_on("test-reconnect", reconnected)
705- result = self.connector.connect()
706- result.addCallback(connected)
707- return deferred
708+ reconnects = []
709+ self.reactor.call_on("test-reconnect", lambda: reconnects.append(True))
710+
711+ component = TestComponent()
712+ publisher = ComponentPublisher(component, self.reactor, self.config)
713+ publisher.start()
714+ deferred = self.connector.connect()
715+ self.successResultOf(deferred)
716+ self.connector._connector.disconnect() # Simulate a disconnection
717+ self.assertEqual([], reconnects)
718+ self.reactor._reactor.advance(10)
719+ self.assertEqual([True], reconnects)
720+
721+ def test_connect_with_factor(self):
722+ """
723+ If C{factor} is passed to the L{ComponentConnector.connect} method,
724+ then the associated protocol factory will be set to that value.
725+ """
726+ component = TestComponent()
727+ publisher = ComponentPublisher(component, self.reactor, self.config)
728+ publisher.start()
729+ deferred = self.connector.connect(factor=1.0)
730+ remote = self.successResultOf(deferred)
731+ self.assertEqual(1.0, remote._factory.factor)
732+
733+ def test_disconnect(self):
734+ """
735+ It is possible to call L{ComponentConnector.disconnect} multiple times,
736+ even if the connection has been already closed.
737+ """
738+ component = TestComponent()
739+ publisher = ComponentPublisher(component, self.reactor, self.config)
740+ publisher.start()
741+ self.connector.connect()
742+ self.connector.disconnect()
743+ self.connector.disconnect()
744+
745+ def test_disconnect_without_connect(self):
746+ """
747+ It is possible to call L{ComponentConnector.disconnect} even if the
748+ connection was never established. In that case the method is
749+ effectively a no-op.
750+ """
751+ self.connector.disconnect()

Subscribers

People subscribed via source and target branches

to all changes: