Merge lp:~free.ekanayaka/landscape-client/amp-cleanup-2 into lp:~landscape/landscape-client/trunk
- amp-cleanup-2
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Free Ekanayaka | ||||
Approved revision: | 664 | ||||
Merged at revision: | 663 | ||||
Proposed branch: | lp:~free.ekanayaka/landscape-client/amp-cleanup-2 | ||||
Merge into: | lp:~landscape/landscape-client/trunk | ||||
Diff against target: |
663 lines (+226/-197) 4 files modified
landscape/amp.py (+8/-4) landscape/broker/amp.py (+1/-0) landscape/lib/amp.py (+139/-125) landscape/lib/tests/test_amp.py (+78/-68) |
||||
To merge this branch: | bzr merge lp:~free.ekanayaka/landscape-client/amp-cleanup-2 | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Alberto Donato (community) | Approve | ||
Geoff Teale (community) | Approve | ||
Review via email: mp+160119@code.launchpad.net |
Commit message
This branch is a step toward the goal of fully-synchronous tests in the client. In detail:
- Drop the retry_on_reconnect parameter of the RemoteBroker constructor, and
stick it to MethodCallClien
a sort of controller of RemoteBroker. Since user code is supposed to deal
with factories directly, the can control the RemoteBroker's behavior by
modifying the factory.
- Move the MethodCallFactory class after the RemoteBroker class, since we now need
to set the latter as 'remote' class attribute of the former.
- Rename MethodCallFactory to MethodCallClien
a protocol factory for clients, not for servers.
- Implement a new interface for MethodCallClien
for user-code. This includes adding a getRemoteObject method and replacing the
notify_success and notify_failure methods with a single notifyOnConnect (camel case
for consistency with Twisted).
Description of the change
This branch is a step toward the goal of fully-synchronous tests in the client. In detail:
- Drop the retry_on_reconnect parameter of the RemoteBroker constructor, and
stick it to MethodCallClien
a sort of controller of RemoteBroker. Since user code is supposed to deal
with factories directly, the can control the RemoteBroker's behavior by
modifying the factory.
- Move the MethodCallFactory class after the RemoteBroker class, since we now need
to set the latter as 'remote' class attribute of the former.
- Rename MethodCallFactory to MethodCallClien
a protocol factory for clients, not for servers.
- Implement a new interface for MethodCallClien
for user-code. This includes adding a getRemoteObject method and replacing the
notify_success and notify_failure methods with a single notifyOnConnect (camel case
for consistency with Twisted).
- 663. By Free Ekanayaka
-
Fix wording and typos
Free Ekanayaka (free.ekanayaka) wrote : | # |
[1] to [3] all fixed. Thanks!
Alberto Donato (ack) wrote : | # |
Looks good! +1
#1:
def synchronous_
# Transparently flush the connection after a send_method_call
# invokation
Typo, not from this branch: s/invokation/
- 664. By Free Ekanayaka
-
Fix typo
Free Ekanayaka (free.ekanayaka) wrote : | # |
#1 Fixed.
Preview Diff
1 | === modified file 'landscape/amp.py' |
2 | --- landscape/amp.py 2013-04-12 07:49:34 +0000 |
3 | +++ landscape/amp.py 2013-04-23 13:44:28 +0000 |
4 | @@ -51,12 +51,13 @@ |
5 | |
6 | factory = ComponentProtocolFactory |
7 | |
8 | - def __init__(self, reactor, config, *args, **kwargs): |
9 | + def __init__(self, reactor, config, retry_on_reconnect=False): |
10 | self._twisted_reactor = reactor |
11 | + self._retry_on_reconnect = retry_on_reconnect |
12 | socket = os.path.join(config.sockets_path, |
13 | self.component.name + ".sock") |
14 | super(RemoteComponentConnector, self).__init__( |
15 | - self._twisted_reactor._reactor, socket, *args, **kwargs) |
16 | + self._twisted_reactor._reactor, socket) |
17 | |
18 | def connect(self, max_retries=None, factor=None, quiet=False): |
19 | """Connect to the remote Landscape component. |
20 | @@ -73,12 +74,15 @@ |
21 | @param quiet: A boolean indicating whether to log errors. |
22 | """ |
23 | |
24 | - def fire_reconnect(remote): |
25 | + def fire_reconnect(ignored): |
26 | self._twisted_reactor.fire("%s-reconnect" % |
27 | self.component.name) |
28 | |
29 | def connected(remote): |
30 | - self._factory.add_notifier(fire_reconnect) |
31 | + # XXX temporary workaround till the AMP cleanup is completed, we |
32 | + # will use the factory directly then. |
33 | + remote._factory.retryOnReconnect = self._retry_on_reconnect |
34 | + self._factory.notifyOnConnect(fire_reconnect) |
35 | return remote |
36 | |
37 | def log_error(failure): |
38 | |
39 | === modified file 'landscape/broker/amp.py' |
40 | --- landscape/broker/amp.py 2013-03-18 03:46:10 +0000 |
41 | +++ landscape/broker/amp.py 2013-04-23 13:44:28 +0000 |
42 | @@ -101,6 +101,7 @@ |
43 | class BrokerClientProtocolFactory(ComponentProtocolFactory): |
44 | |
45 | protocol = BrokerClientProtocol |
46 | + remote = RemoteBroker |
47 | |
48 | |
49 | class RemoteClient(RemoteObject): |
50 | |
51 | === modified file 'landscape/lib/amp.py' |
52 | --- landscape/lib/amp.py 2013-04-19 11:26:09 +0000 |
53 | +++ landscape/lib/amp.py 2013-04-23 13:44:28 +0000 |
54 | @@ -45,7 +45,7 @@ |
55 | for more details about the Twisted AMP protocol. |
56 | """ |
57 | |
58 | -from twisted.internet.defer import Deferred, maybeDeferred |
59 | +from twisted.internet.defer import Deferred, maybeDeferred, succeed |
60 | from twisted.internet.protocol import ReconnectingClientFactory |
61 | from twisted.python.failure import Failure |
62 | |
63 | @@ -277,7 +277,11 @@ |
64 | |
65 | |
66 | class MethodCallClientProtocol(AMP): |
67 | - """XXX Placeholder""" |
68 | + """Send L{MethodCall} commands over the wire using the AMP protocol.""" |
69 | + |
70 | + def connectionMade(self): |
71 | + """Notify our factory that we're ready to go.""" |
72 | + self.factory.clientConnectionMade(self) |
73 | |
74 | |
75 | class MethodCallProtocol(MethodCallServerProtocol, MethodCallClientProtocol): |
76 | @@ -288,81 +292,6 @@ |
77 | MethodCallClientProtocol.__init__(self) |
78 | |
79 | |
80 | -class MethodCallFactory(ReconnectingClientFactory): |
81 | - """ |
82 | - Factory for L{MethodCallProtocol}s exposing an object or connecting to |
83 | - L{MethodCall} servers. |
84 | - |
85 | - When used to connect, if the connection fails or is lost the factory |
86 | - will keep retrying to establish it. |
87 | - |
88 | - @cvar protocol: The factory used to build protocol instances. |
89 | - @cvar factor: The time factor by which the delay between two subsequent |
90 | - connection retries will increase. |
91 | - @cvar maxDelay: Maximum number of seconds between connection attempts. |
92 | - """ |
93 | - |
94 | - protocol = MethodCallProtocol |
95 | - factor = 1.6180339887498948 |
96 | - maxDelay = 30 |
97 | - |
98 | - def __init__(self, object=None, reactor=None): |
99 | - """ |
100 | - @param object: The object exposed by the L{MethodCallProtocol}s |
101 | - instances created by this factory. |
102 | - @param reactor: The reactor used by the created protocols |
103 | - to schedule notifications and timeouts. |
104 | - """ |
105 | - self.object = object |
106 | - self.reactor = reactor |
107 | - self.clock = self.reactor |
108 | - self.delay = self.initialDelay |
109 | - self._notifiers = [] |
110 | - |
111 | - def add_notifier(self, callback, errback=None): |
112 | - """Call the given function on connection, reconnection or give up. |
113 | - |
114 | - @param notifier: A function that will be called when the factory builds |
115 | - a new connected protocol or gives up connecting. It will be passed |
116 | - the new protocol instance as argument, or the connectionf failure. |
117 | - """ |
118 | - self._notifiers.append((callback, errback)) |
119 | - |
120 | - def remove_notifier(self, callback, errback=None): |
121 | - """Remove a notifier.""" |
122 | - self._notifiers.remove((callback, errback)) |
123 | - |
124 | - def notify_success(self, *args, **kwargs): |
125 | - """Notify all registered notifier callbacks.""" |
126 | - for callback, _ in self._notifiers: |
127 | - self.reactor.callLater(0, callback, *args, **kwargs) |
128 | - |
129 | - def notify_failure(self, failure): |
130 | - """Notify all registered notifier errbacks.""" |
131 | - for _, errback in self._notifiers: |
132 | - if errback is not None: |
133 | - self.reactor.callLater(0, errback, failure) |
134 | - |
135 | - def clientConnectionFailed(self, connector, reason): |
136 | - ReconnectingClientFactory.clientConnectionFailed(self, connector, |
137 | - reason) |
138 | - if self.maxRetries is not None and (self.retries > self.maxRetries): |
139 | - self.notify_failure(reason) # Give up |
140 | - |
141 | - def buildProtocol(self, addr): |
142 | - self.resetDelay() |
143 | - if self.object is not None: |
144 | - # XXX temporary hack to emulate the behavior of this code before |
145 | - # MethodCallReceiver was introduced |
146 | - locator = MethodCallReceiver(self.object, self.protocol.methods) |
147 | - protocol = AMP(locator=locator) |
148 | - protocol.factory = self |
149 | - else: |
150 | - protocol = ReconnectingClientFactory.buildProtocol(self, addr) |
151 | - self.notify_success(protocol) |
152 | - return protocol |
153 | - |
154 | - |
155 | class RemoteObject(object): |
156 | """An object able to transparently call methods on a remote object. |
157 | |
158 | @@ -371,8 +300,7 @@ |
159 | the remote object exposed by the peer. |
160 | """ |
161 | |
162 | - def __init__(self, sender, retry_on_reconnect=False, timeout=None, |
163 | - factory=None): |
164 | + def __init__(self, factory): |
165 | """ |
166 | @param protocol: A reference to a connected L{AMP} protocol instance, |
167 | which will be used to send L{MethodCall} commands. |
168 | @@ -383,16 +311,10 @@ |
169 | can't perform them again successfully within this number of |
170 | seconds, they will errback with a L{MethodCallError}. |
171 | """ |
172 | - self._sender = sender |
173 | + self._sender = None |
174 | + self._pending_requests = {} |
175 | self._factory = factory |
176 | - self._retry_on_reconnect = retry_on_reconnect |
177 | - self._timeout = timeout |
178 | - self._pending_requests = {} |
179 | - if self._factory: |
180 | - # XXX temporary hack to emulate the behavior of this code before |
181 | - # MethodCallReceiver was introduced |
182 | - self._reactor = factory.reactor |
183 | - self._factory.add_notifier(self._handle_reconnect) |
184 | + self._factory.notifyOnConnect(self._handle_connect) |
185 | |
186 | def __getattr__(self, method): |
187 | """Return a function sending a L{MethodCall} for the given C{method}. |
188 | @@ -445,7 +367,7 @@ |
189 | the given deferred. |
190 | """ |
191 | is_method_call_error = failure.type is MethodCallError |
192 | - dont_retry = self._retry_on_reconnect is False |
193 | + dont_retry = self._factory.retryOnReconnect is False |
194 | |
195 | if is_method_call_error or dont_retry: |
196 | # This means either that the connection is working, and a |
197 | @@ -459,23 +381,26 @@ |
198 | deferred.errback(failure) |
199 | return |
200 | |
201 | - if self._timeout and call is None: |
202 | + if self._factory.retryTimeout and call is None: |
203 | # This is the first failure for this request, let's schedule a |
204 | # timeout call. |
205 | timeout = Failure(MethodCallError("timeout")) |
206 | - call = self._reactor.callLater(self._timeout, |
207 | - self._handle_failure, |
208 | - timeout, method, args, |
209 | - kwargs, deferred=deferred) |
210 | + call = self._factory.clock.callLater(self._factory.retryTimeout, |
211 | + self._handle_failure, |
212 | + timeout, method, args, |
213 | + kwargs, deferred=deferred) |
214 | |
215 | self._pending_requests[deferred] = (method, args, kwargs, call) |
216 | |
217 | - def _handle_reconnect(self, protocol): |
218 | + def _handle_connect(self, protocol): |
219 | """Handles a reconnection. |
220 | |
221 | @param protocol: The newly connected protocol instance. |
222 | """ |
223 | - self._sender.protocol = protocol |
224 | + if self._sender is None: |
225 | + self._sender = MethodCallSender(protocol, self._factory.clock) |
226 | + else: |
227 | + self._sender.protocol = protocol |
228 | if self._retry_on_reconnect: |
229 | self._retry() |
230 | |
231 | @@ -498,13 +423,124 @@ |
232 | deferred=deferred, call=call) |
233 | |
234 | |
235 | +class MethodCallClientFactory(ReconnectingClientFactory): |
236 | + """ |
237 | + Factory for L{MethodCallProtocol}s exposing an object or connecting to |
238 | + L{MethodCall} servers. |
239 | + |
240 | + When used to connect, if the connection fails or is lost the factory |
241 | + will keep retrying to establish it. |
242 | + |
243 | + @ivar factor: The time factor by which the delay between two subsequent |
244 | + connection retries will increase. |
245 | + @ivar maxDelay: Maximum number of seconds between connection attempts. |
246 | + @ivar protocol: The factory used to build protocol instances. |
247 | + @ivar remote: The factory used to build remote object instances. |
248 | + @ivar retryOnReconnect: If C{True}, the remote object returned by the |
249 | + C{getRemoteObject} method will retry requests that failed, as a |
250 | + result of a lost connection, as soon as a new connection is available. |
251 | + @param retryTimeout: A timeout for retrying requests, if the remote object |
252 | + can't perform them again successfully within this number of seconds, |
253 | + they will errback with a L{MethodCallError}. |
254 | + """ |
255 | + |
256 | + factor = 1.6180339887498948 |
257 | + maxDelay = 30 |
258 | + |
259 | + protocol = MethodCallClientProtocol |
260 | + remote = RemoteObject |
261 | + |
262 | + retryOnReconnect = False |
263 | + retryTimeout = None |
264 | + |
265 | + def __init__(self, reactor=None, object=None): |
266 | + """ |
267 | + @param object: The object exposed by the L{MethodCallProtocol}s |
268 | + instances created by this factory. |
269 | + @param reactor: The reactor used by the created protocols |
270 | + to schedule notifications and timeouts. |
271 | + """ |
272 | + self.object = object # XXX |
273 | + self.reactor = reactor |
274 | + self.clock = self.reactor |
275 | + self.delay = self.initialDelay |
276 | + self._connects = [] |
277 | + self._requests = [] |
278 | + self._remote = None |
279 | + |
280 | + def getRemoteObject(self): |
281 | + """Get a L{RemoteObject} as soon as the connection is ready. |
282 | + |
283 | + @return: A C{Deferred} firing with a connected L{RemoteObject}. |
284 | + """ |
285 | + if self._remote is not None: |
286 | + return succeed(self._remote) |
287 | + deferred = Deferred() |
288 | + self._requests.append(deferred) |
289 | + return deferred |
290 | + |
291 | + def notifyOnConnect(self, callback): |
292 | + """Invoke the given C{callback} when a connection is re-established.""" |
293 | + self._connects.append(callback) |
294 | + |
295 | + def dontNotifyOnConnect(self, callback): |
296 | + """Remove the given C{callback} from listeners.""" |
297 | + self._connects.remove(callback) |
298 | + |
299 | + def clientConnectionMade(self, protocol): |
300 | + """Called when a newly built protocol gets connected.""" |
301 | + if self._remote is None: |
302 | + # This is the first time we successfully connect |
303 | + self._remote = self.remote(self) |
304 | + |
305 | + for callback in self._connects: |
306 | + callback(protocol) |
307 | + |
308 | + # In all cases fire pending requests |
309 | + self._fire_requests(self._remote) |
310 | + |
311 | + def clientConnectionFailed(self, connector, reason): |
312 | + """Try to connect again or errback pending request.""" |
313 | + ReconnectingClientFactory.clientConnectionFailed(self, connector, |
314 | + reason) |
315 | + if self._callID is None: |
316 | + # The factory won't retry to connect, so notify that we failed |
317 | + self._fire_requests(reason) |
318 | + |
319 | + def buildProtocol(self, addr): |
320 | + self.resetDelay() |
321 | + if self.object is not None: |
322 | + # XXX temporary hack to emulate the behavior of this code before |
323 | + # MethodCallReceiver was introduced |
324 | + locator = MethodCallReceiver(self.object, self.protocol.methods) |
325 | + protocol = AMP(locator=locator) |
326 | + protocol.factory = self |
327 | + else: |
328 | + protocol = ReconnectingClientFactory.buildProtocol(self, addr) |
329 | + return protocol |
330 | + |
331 | + def _fire_requests(self, result): |
332 | + """ |
333 | + Fire all pending L{getRemoteObject} deferreds with the given C{result}. |
334 | + """ |
335 | + requests = self._requests[:] |
336 | + self._requests = [] |
337 | + |
338 | + for deferred in requests: |
339 | + deferred.callback(result) |
340 | + |
341 | + |
342 | +class MethodCallFactory(MethodCallClientFactory): |
343 | + """XXX placeholder""" |
344 | + |
345 | + |
346 | class RemoteObjectConnector(object): |
347 | """Connect to remote objects exposed by a L{MethodCallProtocol}.""" |
348 | |
349 | - factory = MethodCallFactory |
350 | + factory = MethodCallClientFactory |
351 | remote = RemoteObject |
352 | |
353 | - def __init__(self, reactor, socket_path, *args, **kwargs): |
354 | + def __init__(self, reactor, socket_path): |
355 | """ |
356 | @param reactor: A reactor able to connect to Unix sockets. |
357 | @param socket: The path to the socket we want to connect to. |
358 | @@ -513,9 +549,6 @@ |
359 | """ |
360 | self._socket_path = socket_path |
361 | self._reactor = reactor |
362 | - self._args = args |
363 | - self._kwargs = kwargs |
364 | - self._remote = None |
365 | self._factory = None |
366 | |
367 | def connect(self, max_retries=None, factor=None): |
368 | @@ -530,37 +563,18 @@ |
369 | delay between subsequent retries should increase. Smaller values |
370 | result in a faster reconnection attempts pace. |
371 | """ |
372 | - self._connected = Deferred() |
373 | self._factory = self.factory(reactor=self._reactor) |
374 | self._factory.maxRetries = max_retries |
375 | if factor: |
376 | self._factory.factor = factor |
377 | - self._factory.add_notifier(self._success, self._failure) |
378 | self._reactor.connectUNIX(self._socket_path, self._factory) |
379 | - return self._connected |
380 | - |
381 | - def _success(self, result): |
382 | - """Called when the first connection has been established""" |
383 | - |
384 | - # We did our job, remove our own notifier and let the remote object |
385 | - # handle reconnections. |
386 | - self._factory.remove_notifier(self._success, self._failure) |
387 | - sender = MethodCallSender(result, self._reactor) |
388 | - # XXX temporary hack to emulate the behavior of this code before |
389 | - # MethodCallReceiver was introduced |
390 | - self._kwargs["factory"] = self._factory |
391 | - self._remote = self.remote(sender, *self._args, **self._kwargs) |
392 | - self._connected.callback(self._remote) |
393 | - |
394 | - def _failure(self, failure): |
395 | - """Called when the first connection has failed""" |
396 | - self._connected.errback(failure) |
397 | + return self._factory.getRemoteObject() |
398 | |
399 | def disconnect(self): |
400 | """Disconnect the L{RemoteObject} that we have created.""" |
401 | if self._factory: |
402 | self._factory.stopTrying() |
403 | - if self._remote: |
404 | - if self._remote._sender.protocol.transport: |
405 | - self._remote._sender.protocol.transport.loseConnection() |
406 | - self._remote = None |
407 | + remote = self._factory._remote |
408 | + if remote: |
409 | + if remote._sender.protocol.transport: |
410 | + remote._sender.protocol.transport.loseConnection() |
411 | |
412 | === modified file 'landscape/lib/tests/test_amp.py' |
413 | --- landscape/lib/tests/test_amp.py 2013-04-18 09:32:04 +0000 |
414 | +++ landscape/lib/tests/test_amp.py 2013-04-23 13:44:28 +0000 |
415 | @@ -4,12 +4,12 @@ |
416 | from twisted.internet.error import ConnectionDone, ConnectError |
417 | from twisted.internet.task import Clock |
418 | from twisted.protocols.amp import AMP |
419 | +from twisted.python.failure import Failure |
420 | |
421 | from landscape.lib.amp import ( |
422 | - MethodCallError, MethodCallProtocol, MethodCallFactory, RemoteObject, |
423 | + MethodCallError, MethodCallProtocol, MethodCallClientFactory, RemoteObject, |
424 | RemoteObjectConnector, MethodCallReceiver, MethodCallSender) |
425 | from landscape.tests.helpers import LandscapeTest |
426 | -from landscape.tests.mocker import KWARGS |
427 | |
428 | |
429 | class FakeTransport(object): |
430 | @@ -149,11 +149,14 @@ |
431 | METHODS = WordsProtocol.methods |
432 | |
433 | |
434 | -class WordsFactory(MethodCallFactory): |
435 | +class WordsFactory(MethodCallClientFactory): |
436 | |
437 | protocol = WordsProtocol |
438 | factor = 0.19 |
439 | |
440 | + retryOnReconnect = True |
441 | + retryTimeout = 0.7 |
442 | + |
443 | |
444 | class RemoteWordsConnector(RemoteObjectConnector): |
445 | |
446 | @@ -339,18 +342,20 @@ |
447 | server = AMP(locator=MethodCallReceiver(Words(self.clock), METHODS)) |
448 | client = AMP() |
449 | self.connection = FakeConnection(client, server) |
450 | - sender = MethodCallSender(client, self.clock) |
451 | - send_method_call = sender.send_method_call |
452 | + factory = WordsFactory(self.clock) |
453 | + self.remote = RemoteObject(factory) |
454 | + factory.clientConnectionMade(client) |
455 | + |
456 | + send_method_call = self.remote._sender.send_method_call |
457 | |
458 | def synchronous_send_method_call(method, args=[], kwargs={}): |
459 | # Transparently flush the connection after a send_method_call |
460 | - # invokation |
461 | + # invocation |
462 | deferred = send_method_call(method, args=args, kwargs=kwargs) |
463 | self.connection.flush() |
464 | return deferred |
465 | |
466 | - sender.send_method_call = synchronous_send_method_call |
467 | - self.remote = RemoteObject(sender) |
468 | + self.remote._sender.send_method_call = synchronous_send_method_call |
469 | |
470 | def test_method_call_sender_with_forbidden_method(self): |
471 | """ |
472 | @@ -526,12 +531,12 @@ |
473 | failure.trap(MethodCallError) |
474 | |
475 | |
476 | -class MethodCallFactoryTest(LandscapeTest): |
477 | +class MethodCallClientFactoryTest(LandscapeTest): |
478 | |
479 | def setUp(self): |
480 | - super(MethodCallFactoryTest, self).setUp() |
481 | + super(MethodCallClientFactoryTest, self).setUp() |
482 | self.clock = Clock() |
483 | - self.factory = MethodCallFactory(reactor=self.clock) |
484 | + self.factory = MethodCallClientFactory(reactor=self.clock) |
485 | |
486 | def test_max_delay(self): |
487 | """ |
488 | @@ -540,60 +545,67 @@ |
489 | """ |
490 | self.assertEqual(self.factory.maxDelay, 30) |
491 | |
492 | - def test_add_notifier(self): |
493 | - """ |
494 | - The L{MethodCallClientFactory.add_notifier} method can be used to |
495 | - add a callback function to be called when a connection is made and |
496 | - a new protocol instance built. |
497 | - """ |
498 | - protocol = self.factory.protocol() |
499 | - self.factory.protocol = lambda: protocol |
500 | - callback = self.mocker.mock() |
501 | - callback(protocol) |
502 | - self.mocker.replay() |
503 | - self.factory.add_notifier(callback) |
504 | - self.factory.buildProtocol(None) |
505 | - self.clock.advance(0) |
506 | - |
507 | - def test_remove_notifier(self): |
508 | - """ |
509 | - The L{MethodCallClientFactory.remove_notifier} method can be used to |
510 | - remove a previously added notifier callback. |
511 | - """ |
512 | - callback = lambda protocol: 1 / 0 |
513 | - self.factory.add_notifier(callback) |
514 | - self.factory.remove_notifier(callback) |
515 | - self.factory.buildProtocol(None) |
516 | - self.clock.advance(0) |
517 | + def test_connect_notifier(self): |
518 | + """ |
519 | + The C{notifyOnConnect} method supports specifying a callback that |
520 | + will be invoked when a connection has been established. |
521 | + """ |
522 | + protocols = [] |
523 | + self.factory.notifyOnConnect(protocols.append) |
524 | + protocol = self.factory.buildProtocol(None) |
525 | + protocol.connectionMade() |
526 | + self.assertEqual([protocol], protocols) |
527 | + |
528 | + def test_connect_notifier_with_reconnect(self): |
529 | + """ |
530 | + The C{notifyOnConnect} method will also callback when a connection is |
531 | + re-established after it was lost. |
532 | + """ |
533 | + protocols = [] |
534 | + self.factory.notifyOnConnect(protocols.append) |
535 | + protocol1 = self.factory.buildProtocol(None) |
536 | + protocol1.connectionMade() |
537 | + protocol2 = self.factory.buildProtocol(None) |
538 | + protocol2.connectionMade() |
539 | + self.assertEqual([protocol1, protocol2], protocols) |
540 | + |
541 | + def test_get_remote_object(self): |
542 | + """ |
543 | + The C{getRemoteObject} method returns a deferred firing with a |
544 | + connected L{RemoteBroker}. |
545 | + """ |
546 | + deferred = self.factory.getRemoteObject() |
547 | + protocol = self.factory.buildProtocol(None) |
548 | + protocol.connectionMade() |
549 | + self.assertIsInstance(self.successResultOf(deferred), RemoteObject) |
550 | + |
551 | + def test_get_remote_object_failure(self): |
552 | + """ |
553 | + If the factory fails to establish a connection the deferreds returned |
554 | + by C{getRemoteObject} will fail. |
555 | + """ |
556 | + deferred = self.factory.getRemoteObject() |
557 | + self.factory.continueTrying = False # Don't retry |
558 | + self.factory.clientConnectionFailed(None, Failure(ConnectError())) |
559 | + self.failureResultOf(deferred).trap(ConnectError) |
560 | |
561 | def test_client_connection_failed(self): |
562 | """ |
563 | The L{MethodCallClientFactory} keeps trying to connect if maxRetries |
564 | is not reached. |
565 | """ |
566 | - # This is sub-optimal but the ReconnectingFactory in Hardy's Twisted |
567 | - # doesn't support task.Clock |
568 | - self.factory.retry = self.mocker.mock() |
569 | - self.factory.retry(KWARGS) |
570 | - self.mocker.replay() |
571 | + class FakeConnector(object): |
572 | + called = False |
573 | + |
574 | + def connect(self): |
575 | + self.called = True |
576 | + |
577 | + connector = FakeConnector() |
578 | self.assertEqual(self.factory.retries, 0) |
579 | - self.factory.clientConnectionFailed(None, None) |
580 | - |
581 | - def test_client_connection_failed_with_max_retries_reached(self): |
582 | - """ |
583 | - The L{MethodCallClientFactory} stops trying to connect if maxRetries |
584 | - is reached. |
585 | - """ |
586 | - callback = lambda protocol: 1 / 0 |
587 | - errback = self.mocker.mock() |
588 | - errback("failure") |
589 | - self.mocker.replay() |
590 | - |
591 | - self.factory.add_notifier(callback, errback) |
592 | - self.factory.maxRetries = 1 |
593 | - self.factory.retries = self.factory.maxRetries |
594 | - self.factory.clientConnectionFailed(object(), "failure") |
595 | - self.clock.advance(0) |
596 | + self.factory.clientConnectionFailed(connector, None) |
597 | + self.assertEqual(self.factory.retries, 1) |
598 | + self.clock.advance(5) |
599 | + self.assertTrue(connector.called) |
600 | |
601 | |
602 | class RemoteObjectConnectorTest(LandscapeTest): |
603 | @@ -605,9 +617,7 @@ |
604 | self.server_factory.protocol = lambda: ( |
605 | AMP(locator=MethodCallReceiver(Words(reactor), METHODS))) |
606 | self.port = reactor.listenUNIX(self.socket, self.server_factory) |
607 | - self.connector = RemoteWordsConnector(reactor, self.socket, |
608 | - retry_on_reconnect=True, |
609 | - timeout=0.7) |
610 | + self.connector = RemoteWordsConnector(reactor, self.socket) |
611 | |
612 | def set_remote(words): |
613 | self.words = words |
614 | @@ -664,7 +674,6 @@ |
615 | """ |
616 | self.connector.disconnect() |
617 | self.connector.disconnect() |
618 | - self.assertIs(self.connector._remote, None) |
619 | |
620 | def test_disconnect_without_connect(self): |
621 | """ |
622 | @@ -746,8 +755,9 @@ |
623 | """ |
624 | self.words._sender.protocol.transport.loseConnection() |
625 | self.port.stopListening() |
626 | + real_handle_connect = self.words._handle_connect |
627 | |
628 | - def handle_reconnect(protocol): |
629 | + def handle_connect(protocol): |
630 | # In this precise moment we have a newly connected protocol |
631 | self.words._sender.protocol = protocol |
632 | |
633 | @@ -759,8 +769,8 @@ |
634 | reactor.callLater(0, self.words._retry) |
635 | |
636 | # Restore the real handler and start listening again very soon |
637 | - self.connector._factory.remove_notifier(handle_reconnect) |
638 | - self.connector._factory.add_notifier(self.words._handle_reconnect) |
639 | + self.connector._factory.dontNotifyOnConnect(handle_connect) |
640 | + self.connector._factory.notifyOnConnect(real_handle_connect) |
641 | reactor.callLater(0.2, restart_listening) |
642 | |
643 | def restart_listening(): |
644 | @@ -770,8 +780,8 @@ |
645 | self.assertEqual(str(error), "Forbidden method 'secret'") |
646 | |
647 | # Use our own reconnect handler |
648 | - self.connector._factory.remove_notifier(self.words._handle_reconnect) |
649 | - self.connector._factory.add_notifier(handle_reconnect) |
650 | + self.connector._factory.dontNotifyOnConnect(real_handle_connect) |
651 | + self.connector._factory.notifyOnConnect(handle_connect) |
652 | |
653 | reactor.callLater(0.2, restart_listening) |
654 | result = self.words.secret() |
655 | @@ -828,7 +838,7 @@ |
656 | return result |
657 | |
658 | reactor.callLater(0.1, restart_listening) |
659 | - self.words._retry_on_reconnect = False |
660 | + self.words._factory.retryOnReconnect = False |
661 | result = self.words.empty() |
662 | self.assertFailure(result, ConnectionDone) |
663 | reconnected = Deferred() |
+1 Sweet
[1] this text is a little hard to follow:
248 + @ivar retryOnReconnect: If C{True}, the remote object returned by the
249 + C{getRemoteObject} method will retry to perform again requests that
250 + failed due to a lost connection, as soon as a new connection is
251 + available.
I'd rewrite it as:
@ivar retryOnReconnect: If C{True}, the remote object returned
by the C{getRemoteObject} method will retry requests that
failed, as a result of a lost connection, as soon as a new
connection is available.
[2] "a the" should be just "a" or just "the". Either works, I'd go for "a".
517 + """
518 + The C{notifyOnConnect} method supports specifying a callback that
519 + will be invoked when a the connection has been established.
520 + """
[3] Again a little hard to follow:
528 + """
529 + The C{notifyOnConnect} fires the callbacks also then a connection is
530 + re-established after it was lost.
531 + """
Something like this would be better:
The C{notifyOnConnect} will also callback when a connection is
re-established after it was lost.