Merge lp:~free.ekanayaka/landscape-client/service-protocol into lp:~landscape/landscape-client/amp-trunk

Proposed by Free Ekanayaka
Status: Merged
Merge reported by: Free Ekanayaka
Merged at revision: not available
Proposed branch: lp:~free.ekanayaka/landscape-client/service-protocol
Merge into: lp:~landscape/landscape-client/amp-trunk
Diff against target: 1773 lines (+1652/-11) (has conflicts)
9 files modified
landscape/__init__.py (+5/-0)
landscape/amp.py (+75/-0)
landscape/broker/amp.py (+21/-8)
landscape/broker/tests/helpers.py (+17/-0)
landscape/broker/tests/test_server.py (+217/-0)
landscape/lib/amp.py (+461/-0)
landscape/lib/tests/test_amp.py (+716/-0)
landscape/reactor.py (+14/-3)
landscape/tests/test_amp.py (+126/-0)
Text conflict in landscape/__init__.py
Text conflict in landscape/broker/amp.py
Text conflict in landscape/broker/tests/helpers.py
Text conflict in landscape/broker/tests/test_server.py
Text conflict in landscape/lib/amp.py
Text conflict in landscape/lib/tests/test_amp.py
To merge this branch: bzr merge lp:~free.ekanayaka/landscape-client/service-protocol
Reviewer Review Type Date Requested Status
Kapil Thangavelu (community) Approve
Kevin McDermott (community) Approve
Review via email: mp+20579@code.launchpad.net
To post a comment you must log in.
223. By Free Ekanayaka

Remove unused max_retries

Revision history for this message
Kevin McDermott (bigkevmcd) wrote :

Looks nice and small, thanks for keeping the branches small :-)

review: Approve
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Thanks Kevin!

@reviewers: For some reason the diff shown below is gigantic, but the actual one is only 282 lines (maybe LP is having trouble with criss-cross merges). However, merging this branch into amp-trunk locally with bzr should be just fine.

224. By Free Ekanayaka

Simplified MethodCall classes by introducing a factory that can be used for both server-side and client-side protocols

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

looks good to me, nice work +1

review: Approve
225. By Free Ekanayaka

Drop the Landscape suffix for the sake of brevity

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'landscape/__init__.py'
2--- landscape/__init__.py 2010-03-03 17:06:50 +0000
3+++ landscape/__init__.py 2010-03-05 12:44:28 +0000
4@@ -1,5 +1,10 @@
5+<<<<<<< TREE
6 DEBIAN_REVISION = ""
7 UPSTREAM_VERSION = "1.4.4"
8+=======
9+DEBIAN_REVISION = "-0ubuntu0.9.10.0"
10+UPSTREAM_VERSION = "1.4.0"
11+>>>>>>> MERGE-SOURCE
12 VERSION = "%s%s" % (UPSTREAM_VERSION, DEBIAN_REVISION)
13
14 # The "server-api" field of outgoing messages will be set to this value, and
15
16=== added file 'landscape/amp.py'
17--- landscape/amp.py 1970-01-01 00:00:00 +0000
18+++ landscape/amp.py 2010-03-05 12:44:28 +0000
19@@ -0,0 +1,75 @@
20+import os
21+import logging
22+
23+from landscape.lib.amp import (
24+ MethodCallProtocol, MethodCallFactory, RemoteObjectCreator)
25+
26+
27+class LandscapeComponentProtocol(MethodCallProtocol):
28+ """Communication protocol between the various Landscape components.
29+
30+ It can be used both as server-side protocol for exposing the methods of a
31+ certain Landscape component, or as client-side protocol for connecting to
32+ another Landscape component we want to call the methods of.
33+ """
34+ methods = ["ping", "exit"]
35+ timeout = 60
36+
37+
38+class LandscapeComponentFactory(MethodCallFactory):
39+
40+ protocol = LandscapeComponentProtocol
41+
42+
43+class RemoteLandscapeComponentCreator(RemoteObjectCreator):
44+ """Utility superclass for creating connections with a Landscape component.
45+
46+ @cvar component: The class of the component to connect to, it is expected
47+ to define a C{name} class attribute, which will be used to find out
48+ the socket to use. It must be defined by sub-classes.
49+ """
50+
51+ factory = LandscapeComponentFactory
52+
53+ def __init__(self, reactor, config, *args, **kwargs):
54+ """
55+ @param reactor: A L{TwistedReactor} object.
56+ @param config: A L{LandscapeConfiguration}.
57+ @param args: Positional arguments for protocol factory constructor.
58+ @param kwargs: Keyword arguments for protocol factory constructor.
59+
60+ @see: L{MethodCallClientFactory}.
61+ """
62+ self._twisted_reactor = reactor
63+ socket = os.path.join(config.data_path, self.component.name + ".sock")
64+ super(RemoteLandscapeComponentCreator, self).__init__(
65+ self._twisted_reactor._reactor, socket, *args, **kwargs)
66+
67+ def connect(self, max_retries=None):
68+ """Connect to the remote Landscape component.
69+
70+ If the connection is lost after having been established, and then
71+ it is established again by the reconnect mechanism, an event will
72+ be fired.
73+
74+ @param max_retries: If given, the connector will keep trying to connect
75+ up to that number of times, if the first connection attempt fails.
76+ """
77+
78+ def fire_reconnect(remote):
79+ self._twisted_reactor.fire("%s-reconnect" %
80+ self.component.name)
81+
82+ def connected(remote):
83+ self._factory.add_notifier(fire_reconnect)
84+ return remote
85+
86+ def log_error(failure):
87+ logging.error("Error while connecting to %s", self.component.name)
88+ return failure
89+
90+ result = super(RemoteLandscapeComponentCreator, self).connect(
91+ max_retries=max_retries)
92+ result.addErrback(log_error)
93+ result.addCallback(connected)
94+ return result
95
96=== modified file 'landscape/broker/amp.py'
97--- landscape/broker/amp.py 2010-02-11 14:29:50 +0000
98+++ landscape/broker/amp.py 2010-03-05 12:44:28 +0000
99@@ -1,8 +1,21 @@
100-class RemoteClient(object):
101- """A connected client utilizing features provided by a L{BrokerServer}."""
102-
103- def __init__(self, name):
104- """
105- @param name: Name of the broker client.
106- """
107- self.name = name
108+<<<<<<< TREE
109+class RemoteClient(object):
110+ """A connected client utilizing features provided by a L{BrokerServer}."""
111+
112+ def __init__(self, name):
113+ """
114+ @param name: Name of the broker client.
115+ """
116+ self.name = name
117+=======
118+class RemoteClient(object):
119+ """A connected client utilizing features provided by a L{BrokerServer}."""
120+
121+ def __init__(self, name):
122+ """
123+ @param name: Name of the broker client.
124+ @param protocol: A L{BrokerServerProtocol} connection with the broker
125+ server.
126+ """
127+ self.name = name
128+>>>>>>> MERGE-SOURCE
129
130=== modified file 'landscape/broker/tests/helpers.py'
131--- landscape/broker/tests/helpers.py 2010-02-11 14:29:50 +0000
132+++ landscape/broker/tests/helpers.py 2010-03-05 12:44:28 +0000
133@@ -99,6 +99,7 @@
134 test_case.config, test_case.identity, test_case.reactor,
135 test_case.exchanger, test_case.pinger, test_case.mstore,
136 fetch_async=fetch_func)
137+<<<<<<< TREE
138
139
140 class BrokerServerHelper(RegistrationHelper):
141@@ -113,3 +114,19 @@
142 test_case.broker = BrokerServer(test_case.config, test_case.reactor,
143 test_case.exchanger, test_case.handler,
144 test_case.mstore)
145+=======
146+
147+
148+class BrokerServerHelper(RegistrationHelper):
149+ """
150+ This helper adds a broker server to the L{RegistrationHelper}. The
151+ following attributes will be set in your test case:
152+ - server: A L{BrokerServer}.
153+ """
154+
155+ def set_up(self, test_case):
156+ super(BrokerServerHelper, self).set_up(test_case)
157+ test_case.broker = BrokerServer(test_case.config, test_case.reactor,
158+ test_case.exchanger, test_case.handler,
159+ test_case.mstore)
160+>>>>>>> MERGE-SOURCE
161
162=== modified file 'landscape/broker/tests/test_server.py'
163--- landscape/broker/tests/test_server.py 2010-03-03 16:17:11 +0000
164+++ landscape/broker/tests/test_server.py 2010-03-05 12:44:28 +0000
165@@ -1,3 +1,4 @@
166+<<<<<<< TREE
167 from twisted.internet.defer import succeed, fail
168
169 from landscape.broker.amp import RemoteClient
170@@ -212,3 +213,219 @@
171 self.reactor.call_on("pre-exit", pre_exit)
172 self.reactor.call_on("post-exit", post_exit)
173 return self.assertSuccess(self.broker.exit())
174+=======
175+from twisted.internet.defer import succeed, fail
176+
177+from landscape.broker.amp import RemoteClient
178+from landscape.tests.helpers import LandscapeTest, DEFAULT_ACCEPTED_TYPES
179+from landscape.broker.tests.helpers import BrokerServerHelper
180+
181+
182+class BrokerServerTest(LandscapeTest):
183+
184+ helpers = [BrokerServerHelper]
185+
186+ def test_ping(self):
187+ """
188+ The L{BrokerServer.ping} simply returns C{True}.
189+ """
190+ self.assertTrue(self.broker.ping())
191+
192+ def test_send_message(self):
193+ """
194+ The L{BrokerServer.send_message} method forwards a message to the
195+ broker's exchanger.
196+ """
197+ message = {"type": "test"}
198+ self.mstore.set_accepted_types(["test"])
199+ self.broker.send_message(message)
200+ self.assertMessages(self.mstore.get_pending_messages(), [message])
201+ self.assertFalse(self.exchanger.is_urgent())
202+
203+ def test_send_message_with_urgent(self):
204+ """
205+ The L{BrokerServer.send_message} can optionally specify the urgency
206+ of the message.
207+ """
208+ message = {"type": "test"}
209+ self.mstore.set_accepted_types(["test"])
210+ self.broker.send_message(message, True)
211+ self.assertMessages(self.mstore.get_pending_messages(), [message])
212+ self.assertTrue(self.exchanger.is_urgent())
213+
214+ def test_is_pending(self):
215+ """
216+ The L{BrokerServer.is_pending} method indicates if a message with
217+ the given id is pending waiting for delivery in the message store.
218+ """
219+ self.assertFalse(self.broker.is_message_pending(123))
220+ message = {"type": "test"}
221+ self.mstore.set_accepted_types(["test"])
222+ message_id = self.broker.send_message(message)
223+ self.assertTrue(self.broker.is_message_pending(message_id))
224+
225+ def test_register_client(self):
226+ """
227+ The L{BrokerServer.register_client} method can be used to register
228+ client components that need to communicate with the server. After
229+ the registration they can be fetched with L{BrokerServer.get_clients}.
230+ """
231+ self.assertEquals(self.broker.get_clients(), [])
232+ self.broker.register_client("test")
233+ [client] = self.broker.get_clients()
234+ self.assertTrue(isinstance(client, RemoteClient))
235+ self.assertEquals(client.name, "test")
236+
237+ def test_stop_clients(self):
238+ """
239+ The L{BrokerServer.stop_clients} method calls the C{exit} method
240+ of each registered client, and returns a deferred resulting in C{None}
241+ if all C{exit} calls were successful.
242+ """
243+ self.broker.register_client("foo")
244+ self.broker.register_client("bar")
245+ for client in self.broker.get_clients():
246+ client.exit = self.mocker.mock()
247+ self.expect(client.exit()).result(succeed(None))
248+ self.mocker.replay()
249+ return self.assertSuccess(self.broker.stop_clients())
250+
251+ def test_stop_clients_with_failure(self):
252+ """
253+ The L{BrokerServer.stop_clients} method calls the C{exit} method
254+ of each registered client, and returns a deferred resulting in C{None}
255+ if all C{exit} calls were successful.
256+ """
257+ self.broker.register_client("foo")
258+ self.broker.register_client("bar")
259+ [client1, client2] = self.broker.get_clients()
260+ client1.exit = self.mocker.mock()
261+ client2.exit = self.mocker.mock()
262+ self.expect(client1.exit()).result(succeed(None))
263+ self.expect(client2.exit()).result(fail(Exception()))
264+ self.mocker.replay()
265+ return self.assertFailure(self.broker.stop_clients(), Exception)
266+
267+ def test_reload_configuration(self):
268+ """
269+ The L{BrokerServer.reload_configuration} method forces the config
270+ file associated with the broker server to be reloaded.
271+ """
272+ open(self.config_filename, "a").write("computer_title = New Title")
273+ result = self.broker.reload_configuration()
274+ result.addCallback(lambda x: self.assertEquals(
275+ self.config.computer_title, "New Title"))
276+ return result
277+
278+ def test_reload_configuration_stops_clients(self):
279+ """
280+ The L{BrokerServer.reload_configuration} method forces the config
281+ file associated with the broker server to be reloaded.
282+ """
283+ self.broker.register_client("foo")
284+ self.broker.register_client("bar")
285+ for client in self.broker.get_clients():
286+ client.exit = self.mocker.mock()
287+ self.expect(client.exit()).result(succeed(None))
288+ self.mocker.replay()
289+ return self.assertSuccess(self.broker.reload_configuration())
290+
291+ def test_register(self):
292+ """
293+ The L{BrokerServer.register} method attempts to register with the
294+ Ladscape server and waits for a C{set-id} message from it.
295+ """
296+ registered = self.broker.register()
297+ # This should callback the deferred.
298+ self.exchanger.handle_message({"type": "set-id", "id": "abc",
299+ "insecure-id": "def"})
300+ return self.assertSuccess(registered)
301+
302+ def test_get_accepted_types_empty(self):
303+ """
304+ The L{BrokerServer.get_accepted_message_types} returns an empty list
305+ if no message types are accepted by the Landscape server.
306+ """
307+ self.mstore.set_accepted_types([])
308+ self.assertEquals(self.broker.get_accepted_message_types(), [])
309+
310+ def test_get_accepted_message_types(self):
311+ """
312+ The L{BrokerServer.get_accepted_message_types} returns the list of
313+ message types accepted by the Landscape server.
314+ """
315+ self.mstore.set_accepted_types(["foo", "bar"])
316+ self.assertEquals(sorted(self.broker.get_accepted_message_types()),
317+ ["bar", "foo"])
318+
319+ def test_get_server_uuid_with_unset_uuid(self):
320+ """
321+ The L{BrokerServer.get_server_uuid} method returns C{None} if the uuid
322+ of the Landscape server we're pointing at is unknown.
323+ """
324+ self.assertEquals(self.broker.get_server_uuid(), None)
325+
326+ def test_get_server_uuid(self):
327+ """
328+ The L{BrokerServer.get_server_uuid} method returns the uuid of the
329+ Landscape server we're pointing at.
330+ """
331+ self.mstore.set_server_uuid("the-uuid")
332+ self.assertEquals(self.broker.get_server_uuid(), "the-uuid")
333+
334+ def test_register_client_accepted_message_type(self):
335+ """
336+ The L{BrokerServer.register_client_accepted_message_type} method can
337+ register new message types accepted by this Landscape client.
338+ """
339+ self.broker.register_client_accepted_message_type("type1")
340+ self.broker.register_client_accepted_message_type("type2")
341+ self.assertEquals(self.exchanger.get_client_accepted_message_types(),
342+ sorted(["type1", "type2"] + DEFAULT_ACCEPTED_TYPES))
343+
344+ def test_exit(self):
345+ """
346+ The L{BrokerServer.exit} method stops all registered clients.
347+ """
348+ self.broker.register_client("foo")
349+ self.broker.register_client("bar")
350+ for client in self.broker.get_clients():
351+ client.exit = self.mocker.mock()
352+ self.expect(client.exit()).result(succeed(None))
353+ self.mocker.replay()
354+ return self.assertSuccess(self.broker.exit())
355+
356+ def test_exit_exits_when_other_daemons_blow_up(self):
357+ """
358+ If a broker client blow up in its exit() methods, exit should ignore
359+ the error and exit anyway.
360+ """
361+ self.broker.register_client("foo")
362+ [client] = self.broker.get_clients()
363+ client.exit = self.mocker.mock()
364+ post_exit = self.mocker.mock()
365+ self.expect(client.exit()).result(fail(ZeroDivisionError()))
366+ post_exit()
367+ self.mocker.replay()
368+ self.reactor.call_on("post-exit", post_exit)
369+ return self.assertSuccess(self.broker.exit())
370+
371+ def test_exit_fires_reactor_events(self):
372+ """
373+ The L{BrokerServer.exit} method fires a C{pre-exit} event before the
374+ clients are stopped and a C{post-exit} event after.
375+ """
376+ self.broker.register_client("foo")
377+ [client] = self.broker.get_clients()
378+ self.mocker.order()
379+ pre_exit = self.mocker.mock()
380+ client.exit = self.mocker.mock()
381+ post_exit = self.mocker.mock()
382+ pre_exit()
383+ self.expect(client.exit()).result(fail(ZeroDivisionError()))
384+ post_exit()
385+ self.mocker.replay()
386+ self.reactor.call_on("pre-exit", pre_exit)
387+ self.reactor.call_on("post-exit", post_exit)
388+ return self.assertSuccess(self.broker.exit())
389+>>>>>>> MERGE-SOURCE
390
391=== modified file 'landscape/lib/amp.py'
392--- landscape/lib/amp.py 2010-02-08 08:10:48 +0000
393+++ landscape/lib/amp.py 2010-03-05 12:44:28 +0000
394@@ -1,3 +1,4 @@
395+<<<<<<< TREE
396 """Expose the methods of a remote object over AMP."""
397
398 from uuid import uuid4
399@@ -456,3 +457,463 @@
400 """Disconnect the L{RemoteObject} that we have created."""
401 self._factory.stopTrying()
402 self._remote._protocol.transport.loseConnection()
403+=======
404+"""Expose the methods of a remote object over AMP."""
405+
406+from uuid import uuid4
407+from twisted.internet.defer import Deferred, maybeDeferred
408+from twisted.internet.protocol import ReconnectingClientFactory
409+from twisted.protocols.amp import Argument, String, Command, AMP
410+from twisted.python.failure import Failure
411+
412+from landscape.lib.bpickle import loads, dumps, dumps_table
413+
414+
415+class MethodCallArgument(Argument):
416+ """A bpickle-compatible argument."""
417+
418+ def toString(self, inObject):
419+ """Serialize an argument."""
420+ return dumps(inObject)
421+
422+ def fromString(self, inString):
423+ """Unserialize an argument."""
424+ return loads(inString)
425+
426+ @classmethod
427+ def check(cls, inObject):
428+ """Check if an argument is serializable."""
429+ return type(inObject) in dumps_table
430+
431+
432+class MethodCallError(Exception):
433+ """Raised when a L{MethodCall} command fails."""
434+
435+
436+class MethodCall(Command):
437+ """Call a method on the object exposed by a L{MethodCallProtocol}."""
438+
439+ arguments = [("method", String()),
440+ ("args", MethodCallArgument()),
441+ ("kwargs", MethodCallArgument())]
442+
443+ response = [("result", MethodCallArgument()),
444+ ("deferred", String(optional=True))]
445+
446+ errors = {MethodCallError: "METHOD_CALL_ERROR"}
447+
448+
449+class DeferredResponse(Command):
450+ """Fire a L{Deferred} associated with an outstanding method call result."""
451+
452+ arguments = [("uuid", String()),
453+ ("result", MethodCallArgument(optional=True)),
454+ ("failure", String(optional=True))]
455+ requiresAnswer = False
456+
457+
458+class MethodCallServerProtocol(AMP):
459+ """Expose methods of a local object over AMP.
460+
461+ The object to be exposed is expected to be the C{object} attribute of our
462+ protocol factory.
463+
464+ @cvar methods: The list of exposed object's methods that can be called with
465+ the protocol. It must be defined by sub-classes.
466+ """
467+
468+ methods = []
469+
470+ @MethodCall.responder
471+ def receive_method_call(self, method, args, kwargs):
472+ """Call an object's method with the given arguments.
473+
474+ If a connected client sends a L{MethodCall} for method C{foo_bar}, then
475+ the actual method C{foo_bar} of the object associated with the protocol
476+ will be called with the given C{args} and C{kwargs} and its return
477+ value delivered back to the client as response to the command.
478+
479+ @param method: The name of the object's method to call.
480+ @param args: The arguments to pass to the method.
481+ @param kwargs: The keywords arguments to pass to the method.
482+ """
483+ if not method in self.methods:
484+ raise MethodCallError("Forbidden method '%s'" % method)
485+
486+ method_func = getattr(self.factory.object, method)
487+ result = maybeDeferred(method_func, *args, **kwargs)
488+
489+ # If the Deferred was already fired, we can return its result
490+ if result.called:
491+ if isinstance(result.result, Failure):
492+ failure = str(result.result.value)
493+ result.addErrback(lambda error: None) # Stop propagating
494+ raise MethodCallError(failure)
495+ return {"result": self._check_result(result.result)}
496+
497+ uuid = str(uuid4())
498+ result.addBoth(self.send_deferred_response, uuid)
499+ return {"result": None, "deferred": uuid}
500+
501+ def _check_result(self, result):
502+ """Check that the C{result} we're about to return is serializable.
503+
504+ @return: The C{result} itself if valid.
505+ @raises: L{MethodCallError} if C{result} is not serializable.
506+ """
507+ if not MethodCallArgument.check(result):
508+ raise MethodCallError("Non-serializable result")
509+ return result
510+
511+ def send_deferred_response(self, result, uuid):
512+ """Send a L{DeferredResponse} for the deferred with given C{uuid}.
513+
514+ This is called when the result of a L{Deferred} returned by an
515+ object's method becomes available. A L{DeferredResponse} notifying
516+ such result (either success or failure) is sent to the peer.
517+ """
518+ kwargs = {"uuid": uuid}
519+ if isinstance(result, Failure):
520+ kwargs["failure"] = str(result.value)
521+ else:
522+ kwargs["result"] = self._check_result(result)
523+ self.callRemote(DeferredResponse, **kwargs)
524+
525+
526+class MethodCallClientProtocol(AMP):
527+ """Calls methods of a remote object over L{AMP}.
528+
529+ @note: If the remote method returns a deferred, the associated local
530+ deferred returned by L{send_method_call} will result in the same
531+ callback value of the remote deferred.
532+ @cvar timeout: A timeout for remote methods returning L{Deferred}s, if a
533+ response for the deferred is not received within this amount of
534+ seconds, the remote method call will errback with a L{MethodCallError}.
535+ """
536+ timeout = 60
537+
538+ def __init__(self):
539+ AMP.__init__(self)
540+ self._pending_responses = {}
541+
542+ @DeferredResponse.responder
543+ def receive_deferred_response(self, uuid, result, failure):
544+ """Receive the deferred L{MethodCall} response.
545+
546+ @param uuid: The id of the L{MethodCall} we're getting the result of.
547+ @param result: The result of the associated deferred if successful.
548+ @param failure: The failure message of the deferred if it failed.
549+ """
550+ self.fire_pending_response(uuid, result, failure)
551+ return {}
552+
553+ def fire_pending_response(self, uuid, result, failure):
554+ """Fire the L{Deferred} associated with a pending response.
555+
556+ @param uuid: The id of the L{MethodCall} we're getting the result of.
557+ @param result: The result of the associated deferred if successful.
558+ @param failure: The failure message of the deferred if it failed.
559+ """
560+ try:
561+ deferred, call = self._pending_responses.pop(uuid)
562+ except KeyError:
563+ # Late response for a request that has timeout, just ignore it
564+ return
565+ if not call.called:
566+ call.cancel()
567+ if failure is None:
568+ deferred.callback({"result": result})
569+ else:
570+ deferred.errback(MethodCallError(failure))
571+
572+ def handle_response(self, response):
573+ """Handle a L{MethodCall} response.
574+
575+ If the response is tagged as deferred, it will be queued as pending,
576+ and a L{Deferred} is returned, which will be fired as soon as the
577+ final response becomes available, or the timeout is reached.
578+ """
579+ if response["deferred"]:
580+ uuid = response["deferred"]
581+ deferred = Deferred()
582+ call = self.factory.reactor.callLater(self.timeout,
583+ self.fire_pending_response,
584+ uuid, None, "timeout")
585+ self._pending_responses[uuid] = (deferred, call)
586+ return deferred
587+
588+ return response
589+
590+ def send_method_call(self, method, args=[], kwargs={}):
591+ """Send a L{MethodCall} command with the given arguments.
592+
593+ @param method: The name of the remote method to invoke.
594+ @param args: The positional arguments to pass to the remote method.
595+ @param kwargs: The keyword arguments to pass to the remote method.
596+ """
597+ result = self.callRemote(MethodCall,
598+ method=method, args=args, kwargs=kwargs)
599+ # The result can be C{None} only if the requested command is a
600+ # DeferredResponse, which has requiresAnswer set to False
601+ if result is not None:
602+ return result.addCallback(self.handle_response)
603+
604+
605+class MethodCallProtocol(MethodCallServerProtocol, MethodCallClientProtocol):
606+ """Can be used both for sending and receiving L{MethodCall}s."""
607+
608+ def __init__(self):
609+ MethodCallServerProtocol.__init__(self)
610+ MethodCallClientProtocol.__init__(self)
611+
612+
613+class MethodCallFactory(ReconnectingClientFactory):
614+ """
615+ Factory for L{MethodCallProtocol}s exposing an object or connecting to
616+ to L{MethodCall} servers.
617+
618+ When used to connect, if the connection fails or is lost the factory
619+ will keep retrying to establish it.
620+
621+ @cvar protocol: The factory used to build protocol instances.
622+ @cvar factor: The time factor by which the delay between two subsequent
623+ connection retries will increase.
624+ """
625+
626+ protocol = MethodCallProtocol
627+ factor = 1.6180339887498948
628+
629+ def __init__(self, object=None, reactor=None):
630+ """
631+ @param object: The object exposed by the L{MethodCallProtocol}s
632+ instances created by this factory.
633+ @param reactor: The reactor used by the created protocols
634+ to schedule notifications and timeouts.
635+ """
636+ self.object = object
637+ self.reactor = reactor
638+ self.clock = self.reactor
639+ self._notifiers = []
640+
641+ def add_notifier(self, callback, errback=None):
642+ """Call the given function on connection, reconnection or give up.
643+
644+ @param notifier: A function that will be called when the factory builds
645+ a new connected protocol or gives up connecting. It will be passed
646+ the new protocol instance as argument, or the connectionf failure.
647+ """
648+ self._notifiers.append((callback, errback))
649+
650+ def remove_notifier(self, callback, errback=None):
651+ """Remove a notifier."""
652+ self._notifiers.remove((callback, errback))
653+
654+ def notify_success(self, *args, **kwargs):
655+ """Notify all registered notifier callbacks."""
656+ for callback, _ in self._notifiers:
657+ self.reactor.callLater(0, callback, *args, **kwargs)
658+
659+ def notify_failure(self, failure):
660+ """Notify all registered notifier errbacks."""
661+ for _, errback in self._notifiers:
662+ if errback is not None:
663+ self.reactor.callLater(0, errback, failure)
664+
665+ def clientConnectionFailed(self, connector, reason):
666+ ReconnectingClientFactory.clientConnectionFailed(self, connector,
667+ reason)
668+ if self.maxRetries is not None and (self.retries > self.maxRetries):
669+ self.notify_failure(reason) # Give up
670+
671+ def buildProtocol(self, addr):
672+ self.resetDelay()
673+ protocol = self.protocol()
674+ protocol.factory = self
675+ self.notify_success(protocol)
676+ return protocol
677+
678+
679+class RemoteObject(object):
680+ """An object able to transparently call methods on a remote object.
681+
682+ Any method call on a L{RemoteObject} instance will return a L{Deferred}
683+ resulting in the return value of the same method call performed on
684+ the remote object exposed by the peer.
685+ """
686+
687+ def __init__(self, protocol, retry_on_reconnect=False, timeout=None):
688+ """
689+ @param protocol: A reference to a connected L{AMP} protocol instance,
690+ which will be used to send L{MethodCall} commands.
691+ @param retry_on_reconnect: If C{True}, this L{RemoteObject} will retry
692+ to perform again requests that failed due to a lost connection, as
693+ soon as a new connection is available.
694+ @param timeout: A timeout for failed requests, if the L{RemoteObject}
695+ can't perform them again successfully within this number of
696+ seconds, they will errback with a L{MethodCallError}.
697+ """
698+ self._protocol = protocol
699+ self._factory = protocol.factory
700+ self._reactor = protocol.factory.reactor
701+ self._retry_on_reconnect = retry_on_reconnect
702+ self._timeout = timeout
703+ self._pending_requests = {}
704+ self._factory.add_notifier(self._handle_reconnect)
705+
706+ def __getattr__(self, method):
707+ """Return a function sending a L{MethodCall} for the given C{method}.
708+
709+ When the created function is called, it sends the an appropriate
710+ L{MethodCall} to the remote peer passing it the arguments and
711+ keyword arguments it was called with, and returning a L{Deferred}
712+ resulting in the L{MethodCall}'s response value.
713+ """
714+
715+ def send_method_call(*args, **kwargs):
716+ result = self._protocol.send_method_call(method=method,
717+ args=args,
718+ kwargs=kwargs)
719+ deferred = Deferred()
720+ result.addCallback(self._handle_response, deferred)
721+ result.addErrback(self._handle_failure, method, args, kwargs,
722+ deferred)
723+ return deferred
724+
725+ return send_method_call
726+
727+ def _handle_response(self, response, deferred, call=None):
728+ """Handles a successful L{MethodCall} response.
729+
730+ @param response: The L{MethodCall} response.
731+ @param deferred: The deferred that was returned to the caller.
732+ @param call: If not C{None}, the scheduled timeout call associated with
733+ the given deferred.
734+ """
735+ result = response["result"]
736+ if call is not None:
737+ call.cancel() # This is a successful retry, cancel the timeout.
738+ deferred.callback(result)
739+
740+ def _handle_failure(self, failure, method, args, kwargs, deferred,
741+ call=None):
742+ """Called when a L{MethodCall} command fails.
743+
744+ If a failure is due to a connection error and if C{retry_on_reconnect}
745+ is C{True}, we will try to perform the requested L{MethodCall} again
746+ as soon as a new connection becomes available, giving up after the
747+ specified C{timeout}, if any.
748+
749+ @param failure: The L{Failure} raised by the requested L{MethodCall}.
750+ @param name: The method name associated with the failed L{MethodCall}.
751+ @param args: The positional arguments of the failed L{MethodCall}.
752+ @param kwargs: The keyword arguments of the failed L{MethodCall}.
753+ @param deferred: The deferred that was returned to the caller.
754+ @param call: If not C{None}, the scheduled timeout call associated with
755+ the given deferred.
756+ """
757+ is_method_call_error = failure.type is MethodCallError
758+ dont_retry = self._retry_on_reconnect == False
759+
760+ if is_method_call_error or dont_retry:
761+ # This means either that the connection is working, and a
762+ # MethodCall protocol error occured, or that we gave up
763+ # trying and raised a timeout. In any case just propagate
764+ # the error.
765+ if deferred in self._pending_requests:
766+ self._pending_requests.pop(deferred)
767+ if call:
768+ call.cancel()
769+ deferred.errback(failure)
770+ return
771+
772+ if self._timeout and call is None:
773+ # This is the first failure for this request, let's schedule a
774+ # timeout call.
775+ timeout = Failure(MethodCallError("timeout"))
776+ call = self._reactor.callLater(self._timeout,
777+ self._handle_failure,
778+ timeout, method, args,
779+ kwargs, deferred=deferred)
780+
781+ self._pending_requests[deferred] = (method, args, kwargs, call)
782+
783+ def _handle_reconnect(self, protocol):
784+ """Handles a reconnection.
785+
786+ @param protocol: The newly connected protocol instance.
787+ """
788+ self._protocol = protocol
789+ if self._retry_on_reconnect:
790+ self._retry()
791+
792+ def _retry(self):
793+ """Try to perform again requests that failed."""
794+
795+ # We need to copy the requests list before iterating over it, because
796+ # if we are actually still disconnected, callRemote will return a
797+ # failed deferred and the _handle_failure errback will be executed
798+ # synchronously during the loop, modifing the requests list itself.
799+ requests = self._pending_requests.copy()
800+ self._pending_requests.clear()
801+
802+ while requests:
803+ deferred, (method, args, kwargs, call) = requests.popitem()
804+ result = self._protocol.send_method_call(method, args, kwargs)
805+ result.addCallback(self._handle_response,
806+ deferred=deferred, call=call)
807+ result.addErrback(self._handle_failure, method, args, kwargs,
808+ deferred=deferred, call=call)
809+
810+
811+class RemoteObjectCreator(object):
812+ """Connect to remote objects exposed by a L{MethodCallProtocol}."""
813+
814+ factory = MethodCallFactory
815+ remote = RemoteObject
816+
817+ def __init__(self, reactor, socket_path, *args, **kwargs):
818+ """
819+ @param reactor: A reactor able to connect to Unix sockets.
820+ @param socket_path: The path to the socket we want to connect to.
821+ @param args: Arguments to be passed to the created L{RemoteObject}.
822+ @param kwargs: Keyword arguments for the created L{RemoteObject}.
823+ """
824+ self._socket_path = socket_path
825+ self._reactor = reactor
826+ self._args = args
827+ self._kwargs = kwargs
828+
829+ def connect(self, max_retries=None):
830+ """Connect to a remote object exposed by a L{MethodCallProtocol}.
831+
832+ This method will connect to the socket provided in the constructor
833+ and return a L{Deferred} resulting in a connected L{RemoteObject}.
834+
835+ @param max_retries: If not C{None}, give up trying to connect after
836+ this many attempts.
837+ """
838+ self._connected = Deferred()
839+ self._factory = self.factory(reactor=self._reactor)
840+ self._factory.maxRetries = max_retries
841+ self._factory.add_notifier(self._success, self._failure)
842+ self._reactor.connectUNIX(self._socket_path, self._factory)
843+ return self._connected
844+
845+ def _success(self, result):
846+ """Called when the first connection has been established"""
847+
848+ # We did our job, remove our own notifier and let the remote object
849+ # handle reconnections.
850+ self._factory.remove_notifier(self._success, self._failure)
851+ self._remote = self.remote(result, *self._args, **self._kwargs)
852+ self._connected.callback(self._remote)
853+
854+ def _failure(self, failure):
855+ """Called when the first connection has failed"""
856+ self._connected.errback(failure)
857+
858+ def disconnect(self):
859+ """Disconnect the L{RemoteObject} that we have created."""
860+ self._factory.stopTrying()
861+ self._remote._protocol.transport.loseConnection()
862+>>>>>>> MERGE-SOURCE
863
864=== modified file 'landscape/lib/tests/test_amp.py'
865--- landscape/lib/tests/test_amp.py 2010-02-12 09:50:07 +0000
866+++ landscape/lib/tests/test_amp.py 2010-03-05 12:44:28 +0000
867@@ -1,3 +1,4 @@
868+<<<<<<< TREE
869 from twisted.internet import reactor
870 from twisted.internet.defer import Deferred, DeferredList
871 from twisted.internet.protocol import ClientCreator
872@@ -720,3 +721,718 @@
873 self.assertFailure(result, MethodCallError)
874 reconnected = Deferred()
875 return result.addCallback(assert_failure)
876+=======
877+from twisted.internet import reactor
878+from twisted.internet.defer import Deferred, DeferredList
879+from twisted.internet.protocol import ClientCreator
880+from twisted.internet.error import ConnectionDone, ConnectError
881+from twisted.internet.task import Clock
882+
883+from landscape.lib.amp import (
884+ MethodCallError, MethodCallProtocol, MethodCallFactory, RemoteObject,
885+ RemoteObjectCreator)
886+from landscape.tests.helpers import LandscapeTest
887+from landscape.tests.mocker import KWARGS
888+
889+
890+class WordsException(Exception):
891+ """Test exception."""
892+
893+
894+class Words(object):
895+ """Test class to be used as target object of a L{MethodCallProtocol}."""
896+
897+ def secret(self):
898+ raise RuntimeError("I'm not supposed to be called!")
899+
900+ def empty(self):
901+ pass
902+
903+ def motd(self):
904+ return "Words are cool"
905+
906+ def capitalize(self, word):
907+ return word.capitalize()
908+
909+ def is_short(self, word):
910+ return len(word) < 4
911+
912+ def concatenate(self, word1, word2):
913+ return word1 + word2
914+
915+ def lower_case(self, word, index=None):
916+ if index is None:
917+ return word.lower()
918+ else:
919+ return word[:index] + word[index:].lower()
920+
921+ def multiply_alphabetically(self, word_times):
922+ result = ""
923+ for word, times in sorted(word_times.iteritems()):
924+ result += word * times
925+ return result
926+
927+ def meaning_of_life(self):
928+
929+ class Complex(object):
930+ pass
931+ return Complex()
932+
933+ def _check(self, word, seed, value=3):
934+ if seed == "cool" and value == 4:
935+ return "Guessed!"
936+
937+ def guess(self, word, *args, **kwargs):
938+ return self._check(word, *args, **kwargs)
939+
940+ def translate(self, word):
941+ raise WordsException("Unknown word")
942+
943+ def google(self, word):
944+ deferred = Deferred()
945+ if word == "Landscape":
946+ reactor.callLater(0.01, lambda: deferred.callback("Cool!"))
947+ elif word == "Easy query":
948+ deferred.callback("Done!")
949+ elif word == "Weird stuff":
950+ reactor.callLater(0.01, lambda: deferred.errback(Exception("bad")))
951+ elif word == "Censored":
952+ deferred.errback(Exception("very bad"))
953+ elif word == "Long query":
954+ # Do nothing, the deferred won't be fired at all
955+ pass
956+ elif word == "Slowish query":
957+ # Fire the result after a while.
958+ reactor.callLater(0.05, lambda: deferred.callback("Done!"))
959+ return deferred
960+
961+
962+class WordsProtocol(MethodCallProtocol):
963+
964+ methods = ["empty",
965+ "motd",
966+ "capitalize",
967+ "is_short",
968+ "concatenate",
969+ "lower_case",
970+ "multiply_alphabetically",
971+ "translate",
972+ "meaning_of_life",
973+ "guess",
974+ "google"]
975+
976+ timeout = 0.1
977+
978+
979+class WordsFactory(MethodCallFactory):
980+
981+ protocol = WordsProtocol
982+ factor = 0.19
983+
984+
985+class RemoteWordsCreator(RemoteObjectCreator):
986+
987+ factory = WordsFactory
988+
989+
990+class MethodCallProtocolTest(LandscapeTest):
991+
992+ def setUp(self):
993+ super(MethodCallProtocolTest, self).setUp()
994+ socket = self.mktemp()
995+ factory = WordsFactory(object=Words())
996+ self.port = reactor.listenUNIX(socket, factory)
997+
998+ def set_protocol(protocol):
999+ self.protocol = protocol
1000+
1001+ connector = ClientCreator(reactor, WordsProtocol)
1002+ connected = connector.connectUNIX(socket)
1003+ return connected.addCallback(set_protocol)
1004+
1005+ def tearDown(self):
1006+ self.protocol.transport.loseConnection()
1007+ self.port.stopListening()
1008+ super(MethodCallProtocolTest, self).tearDown()
1009+
1010+ def test_with_forbidden_method(self):
1011+ """
1012+ If a method is not included in L{MethodCallProtocol.methods} it
1013+ can't be called.
1014+ """
1015+ result = self.protocol.send_method_call(method="secret",
1016+ args=[],
1017+ kwargs={})
1018+ return self.assertFailure(result, MethodCallError)
1019+
1020+ def test_with_no_arguments(self):
1021+ """
1022+ A connected client can issue a L{MethodCall} without arguments and
1023+ with an empty response.
1024+ """
1025+ result = self.protocol.send_method_call(method="empty",
1026+ args=[],
1027+ kwargs={})
1028+ return self.assertSuccess(result, {"result": None,
1029+ "deferred": None})
1030+
1031+ def test_with_return_value(self):
1032+ """
1033+ A connected client can issue a L{MethodCall} targeted to an
1034+ object method with a return value.
1035+ """
1036+ result = self.protocol.send_method_call(method="motd",
1037+ args=[],
1038+ kwargs={})
1039+ return self.assertSuccess(result, {"result": "Words are cool",
1040+ "deferred": None})
1041+
1042+ def test_with_one_argument(self):
1043+ """
1044+ A connected AMP client can issue a L{MethodCall} with one argument and
1045+ a response value.
1046+ """
1047+ result = self.protocol.send_method_call(method="capitalize",
1048+ args=["john"],
1049+ kwargs={})
1050+ return self.assertSuccess(result, {"result": "John",
1051+ "deferred": None})
1052+
1053+ def test_with_boolean_return_value(self):
1054+ """
1055+ The return value of a L{MethodCall} argument can be a boolean.
1056+ """
1057+ result = self.protocol.send_method_call(method="is_short",
1058+ args=["hi"],
1059+ kwargs={})
1060+ return self.assertSuccess(result, {"result": True,
1061+ "deferred": None})
1062+
1063+ def test_with_many_arguments(self):
1064+ """
1065+ A connected client can issue a L{MethodCall} with many arguments.
1066+ """
1067+ result = self.protocol.send_method_call(method="concatenate",
1068+ args=["You ", "rock"],
1069+ kwargs={})
1070+ return self.assertSuccess(result, {"result": "You rock",
1071+ "deferred": None})
1072+
1073+ def test_with_default_arguments(self):
1074+ """
1075+ A connected client can issue a L{MethodCall} for methods having
1076+ default arguments.
1077+ """
1078+ result = self.protocol.send_method_call(method="lower_case",
1079+ args=["OHH"],
1080+ kwargs={})
1081+ return self.assertSuccess(result, {"result": "ohh",
1082+ "deferred": None})
1083+
1084+ def test_with_overriden_default_arguments(self):
1085+ """
1086+ A connected client can issue a L{MethodCall} with keyword arguments
1087+ having default values in the target object. If a value is specified by
1088+ the caller it will be used in place of the default value
1089+ """
1090+ result = self.protocol.send_method_call(method="lower_case",
1091+ args=["OHH"],
1092+ kwargs={"index": 2})
1093+ return self.assertSuccess(result, {"result": "OHh",
1094+ "deferred": None})
1095+
1096+ def test_with_dictionary_arguments(self):
1097+ """
1098+ Method arguments passed to a L{MethodCall} can be dictionaries.
1099+ """
1100+ result = self.protocol.send_method_call(method="multiply_"
1101+ "alphabetically",
1102+ args=[{"foo": 2, "bar": 3}],
1103+ kwargs={})
1104+ return self.assertSuccess(result, {"result": "barbarbarfoofoo",
1105+ "deferred": None})
1106+
1107+ def test_with_non_serializable_return_value(self):
1108+ """
1109+ If the target object method returns an object that can't be serialized,
1110+ the L{MethodCall} fails with a L{MethodCallError}.
1111+ """
1112+ result = self.protocol.send_method_call(method="meaning_of_life",
1113+ args=[],
1114+ kwargs={})
1115+ return self.assertFailure(result, MethodCallError)
1116+
1117+ def test_translate(self):
1118+ """
1119+ If the target object method raises an exception, the remote call fails
1120+ with a L{MethodCallError}.
1121+ """
1122+ result = self.protocol.send_method_call(method="translate",
1123+ args=["hi"],
1124+ kwargs={})
1125+ return self.assertFailure(result, MethodCallError)
1126+
1127+
1128+class RemoteObjectTest(LandscapeTest):
1129+
1130+ def setUp(self):
1131+ super(RemoteObjectTest, self).setUp()
1132+ socket = self.mktemp()
1133+ server_factory = WordsFactory(object=Words())
1134+ self.port = reactor.listenUNIX(socket, server_factory)
1135+
1136+ def set_remote(protocol):
1137+ self.protocol = protocol
1138+ self.words = RemoteObject(protocol)
1139+ client_factory.stopTrying()
1140+
1141+ connected = Deferred()
1142+ connected.addCallback(set_remote)
1143+ client_factory = WordsFactory(reactor=reactor)
1144+ client_factory.add_notifier(connected.callback)
1145+ reactor.connectUNIX(socket, client_factory)
1146+ return connected
1147+
1148+ def tearDown(self):
1149+ self.protocol.transport.loseConnection()
1150+ self.port.stopListening()
1151+ super(RemoteObjectTest, self).tearDown()
1152+
1153+ def test_method_call_sender_with_forbidden_method(self):
1154+ """
1155+ A L{RemoteObject} can't call methods that are not included in
1156+ L{MethodCallProtocol.methods}; the call fails with a L{MethodCallError}.
1157+ """
1158+ result = self.words.secret()
1159+ return self.assertFailure(result, MethodCallError)
1160+
1161+ def test_with_no_arguments(self):
1162+ """
1163+ A L{RemoteObject} can send L{MethodCall}s without arguments and with
1164+ an empty response.
1165+ """
1166+ return self.assertSuccess(self.words.empty())
1167+
1168+ def test_with_return_value(self):
1169+ """
1170+ A L{RemoteObject} can send L{MethodCall}s without arguments and get
1171+ back the value of the command's response.
1172+ """
1173+ result = self.words.motd()
1174+ return self.assertSuccess(result, "Words are cool")
1175+
1176+ def test_with_one_argument(self):
1177+ """
1178+ A L{RemoteObject} can send L{MethodCall}s with one argument and get
1179+ the response value.
1180+ """
1181+ result = self.words.capitalize("john")
1182+ return self.assertSuccess(result, "John")
1183+
1184+ def test_with_one_keyword_argument(self):
1185+ """
1186+ A L{RemoteObject} can send L{MethodCall}s with a named argument.
1187+ """
1188+ result = self.words.capitalize(word="john")
1189+ return self.assertSuccess(result, "John")
1190+
1191+ def test_with_boolean_return_value(self):
1192+ """
1193+ The return value of a L{MethodCall} argument can be a boolean.
1194+ """
1195+ return self.assertSuccess(self.words.is_short("hi"), True)
1196+
1197+ def test_with_many_arguments(self):
1198+ """
1199+ A L{RemoteObject} can send L{MethodCall}s with more than one argument.
1200+ """
1201+ result = self.words.concatenate("You ", "rock")
1202+ return self.assertSuccess(result, "You rock")
1203+
1204+ def test_with_many_keyword_arguments(self):
1205+ """
1206+ A L{RemoteObject} can send L{MethodCall}s with several
1207+ named arguments.
1208+ """
1209+ result = self.words.concatenate(word2="rock", word1="You ")
1210+ return self.assertSuccess(result, "You rock")
1211+
1212+ def test_with_default_arguments(self):
1213+ """
1214+ A L{RemoteObject} can send a L{MethodCall} having an argument with
1215+ a default value.
1216+ """
1217+ result = self.words.lower_case("OHH")
1218+ return self.assertSuccess(result, "ohh")
1219+
1220+ def test_with_overriden_default_arguments(self):
1221+ """
1222+ A L{RemoteObject} can send L{MethodCall}s overriding the default
1223+ value of an argument.
1224+ """
1225+ result = self.words.lower_case("OHH", 2)
1226+ return self.assertSuccess(result, "OHh")
1227+
1228+ def test_with_dictionary_arguments(self):
1229+ """
1230+ A L{RemoteObject} can send L{MethodCall}s for methods requiring
1231+ dictionary arguments.
1232+ """
1233+ result = self.words.multiply_alphabetically({"foo": 2, "bar": 3})
1234+ return self.assertSuccess(result, "barbarbarfoofoo")
1235+
1236+ def test_with_generic_args_and_kwargs(self):
1237+ """
1238+ A L{RemoteObject} behaves well with L{MethodCall}s for methods
1239+ having generic C{*args} and C{**kwargs} arguments.
1240+ """
1241+ result = self.words.guess("word", "cool", value=4)
1242+ return self.assertSuccess(result, "Guessed!")
1243+
1244+ def test_with_success_full_deferred(self):
1245+ """
1246+ If the target object method returns a L{Deferred}, it is handled
1247+ transparently.
1248+ """
1249+ result = self.words.google("Landscape")
1250+ return self.assertSuccess(result, "Cool!")
1251+
1252+ def test_with_failing_deferred(self):
1253+ """
1254+ If the target object method returns a failing L{Deferred}, a
1255+ L{MethodCallError} is raised.
1256+ """
1257+ result = self.words.google("Weird stuff")
1258+ return self.assertFailure(result, MethodCallError)
1259+
1260+ def test_with_already_callback_deferred(self):
1261+ """
1262+ The target object method can return an already fired L{Deferred}.
1263+ """
1264+ result = self.words.google("Easy query")
1265+ return self.assertSuccess(result, "Done!")
1266+
1267+ def test_with_already_errback_deferred(self):
1268+ """
1269+ The target object method can return an already failed L{Deferred}.
1270+ """
1271+ result = self.words.google("Censored")
1272+ return self.assertFailure(result, MethodCallError)
1273+
1274+ def test_with_deferred_timeout(self):
1275+ """
1276+ If the peer protocol doesn't send a response for a deferred within
1277+ the given timeout, the method call fails.
1278+ """
1279+ result = self.words.google("Long query")
1280+ return self.assertFailure(result, MethodCallError)
1281+
1282+ def test_with_late_response(self):
1283+ """
1284+ If the peer protocol sends a late response for a request that has
1285+ already timed out, that response is ignored.
1286+ """
1287+ self.protocol.timeout = 0.01
1288+ result = self.words.google("Slowish query")
1289+ self.assertFailure(result, MethodCallError)
1290+
1291+ def assert_late_response_is_handled(ignored):
1292+ deferred = Deferred()
1293+ # We wait a bit to be sure that the late response gets delivered
1294+ reactor.callLater(0.1, lambda: deferred.callback(None))
1295+ return deferred
1296+
1297+ return result.addCallback(assert_late_response_is_handled)
1298+
1299+
1300+class MethodCallFactoryTest(LandscapeTest):
1301+
1302+ def setUp(self):
1303+ super(MethodCallFactoryTest, self).setUp()
1304+ self.clock = Clock()
1305+ self.factory = WordsFactory(reactor=self.clock)
1306+
1307+ def test_add_notifier(self):
1308+ """
1309+ The L{MethodCallClientFactory.add_notifier} method can be used to
1310+ add a callback function to be called when a connection is made and
1311+ a new protocol instance built.
1312+ """
1313+ protocol = self.factory.protocol()
1314+ self.factory.protocol = lambda: protocol
1315+ callback = self.mocker.mock()
1316+ callback(protocol)
1317+ self.mocker.replay()
1318+ self.factory.add_notifier(callback)
1319+ self.factory.buildProtocol(None)
1320+ self.clock.advance(0)
1321+
1322+ def test_remove_notifier(self):
1323+ """
1324+ The L{MethodCallClientFactory.remove_notifier} method can be used to
1325+ remove a previously added notifier callback.
1326+ """
1327+ callback = lambda protocol: 1 / 0
1328+ self.factory.add_notifier(callback)
1329+ self.factory.remove_notifier(callback)
1330+ self.factory.buildProtocol(None)
1331+ self.clock.advance(0)
1332+
1333+ def test_client_connection_failed(self):
1334+ """
1335+ The L{MethodCallClientFactory} keeps trying to connect if maxRetries
1336+ is not reached.
1337+ """
1338+ # This is sub-optimal but the ReconnectingFactory in Hardy's Twisted
1339+ # doesn't support task.Clock
1340+ self.factory.retry = self.mocker.mock()
1341+ self.factory.retry(KWARGS)
1342+ self.mocker.replay()
1343+ self.assertEquals(self.factory.retries, 0)
1344+ self.factory.clientConnectionFailed(None, None)
1345+
1346+ def test_client_connection_failed_with_max_retries_reached(self):
1347+ """
1348+ The L{MethodCallClientFactory} stops trying to connect if maxRetries
1349+ is reached.
1350+ """
1351+ callback = lambda protocol: 1 / 0
1352+ errback = self.mocker.mock()
1353+ errback("failure")
1354+ self.mocker.replay()
1355+
1356+ self.factory.add_notifier(callback, errback)
1357+ self.factory.maxRetries = 1
1358+ self.factory.retries = self.factory.maxRetries
1359+ self.factory.clientConnectionFailed(object(), "failure")
1360+ self.clock.advance(0)
1361+
1362+
1363+class RemoteObjectCreatorTest(LandscapeTest):
1364+
1365+ def setUp(self):
1366+ super(RemoteObjectCreatorTest, self).setUp()
1367+ self.socket = self.mktemp()
1368+ self.server_factory = WordsFactory(object=Words())
1369+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1370+ self.connector = RemoteWordsCreator(reactor, self.socket,
1371+ retry_on_reconnect=True,
1372+ timeout=0.7)
1373+
1374+ def set_remote(words):
1375+ self.words = words
1376+
1377+ connected = self.connector.connect()
1378+ return connected.addCallback(set_remote)
1379+
1380+ def tearDown(self):
1381+ self.connector.disconnect()
1382+ self.port.stopListening()
1383+ super(RemoteObjectCreatorTest, self).tearDown()
1384+
1385+ def test_connect(self):
1386+ """
1387+ The L{RemoteObject} resulting from the deferred returned by
1388+ L{RemoteObjectCreator.connect} is properly connected to the
1389+ remote peer.
1390+ """
1391+ return self.assertSuccess(self.words.empty())
1392+
1393+ def test_connect_with_max_retries(self):
1394+ """
1395+ If C{max_retries} is passed to the L{RemoteObjectCreator} method,
1396+ then it will give up trying to connect after that number of attempts.
1397+ """
1398+ self.connector.disconnect()
1399+ self.port.stopListening()
1400+
1401+ def reconnect(ignored):
1402+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1403+ return self.connector.connect()
1404+
1405+ result = self.connector.connect(max_retries=0)
1406+ self.assertFailure(result, ConnectError)
1407+ return result.addCallback(reconnect)
1408+
1409+ def test_reconnect(self):
1410+ """
1411+ If the connection is lost, the L{RemoteObject} created by the creator
1412+ will transparently handle the reconnection.
1413+ """
1414+ self.words._protocol.transport.loseConnection()
1415+ self.port.stopListening()
1416+
1417+ def restart_listening():
1418+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1419+ reactor.callLater(0.3, assert_remote)
1420+
1421+ def assert_remote():
1422+ result = self.words.empty()
1423+ result.addCallback(lambda x: reconnected.callback(None))
1424+ return result
1425+
1426+ reactor.callLater(0.01, restart_listening)
1427+ reconnected = Deferred()
1428+ return reconnected
1429+
1430+ def test_method_call_error(self):
1431+ """
1432+ If a L{MethodCall} fails due to a L{MethodCallError}, the
1433+ L{RemoteObject} won't try to perform it again.
1434+ """
1435+ return self.assertFailure(self.words.secret(), MethodCallError)
1436+
1437+ def test_retry(self):
1438+ """
1439+ If the connection is lost, the L{RemoteObject} created by the creator
1440+ will transparently retry to perform the L{MethodCall} requests that
1441+ failed due to the broken connection.
1442+ """
1443+ self.words._protocol.transport.loseConnection()
1444+ self.port.stopListening()
1445+
1446+ def restart_listening():
1447+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1448+
1449+ reactor.callLater(0.1, restart_listening)
1450+ return self.words.empty()
1451+
1452+ def test_retry_with_method_call_error(self):
1453+ """
1454+ If a retried L{MethodCall} request fails due to a L{MethodCallError},
1455+ the L{RemoteObject} will properly propagate the error to the original
1456+ caller.
1457+ """
1458+ self.words._protocol.transport.loseConnection()
1459+ self.port.stopListening()
1460+
1461+ def restart_listening():
1462+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1463+
1464+ def assert_failure(error):
1465+ self.assertEquals(str(error), "Forbidden method 'secret'")
1466+
1467+ reactor.callLater(0.5, restart_listening)
1468+ result = self.words.secret()
1469+ self.assertFailure(result, MethodCallError)
1470+ return result.addCallback(assert_failure)
1471+
1472+ def test_wb_retry_with_while_still_disconnected(self):
1473+ """
1474+ The L{RemoteObject._retry} method gets called as soon as a new
1475+ connection is ready. If for whatever reason the connection drops
1476+ again very quickly, the C{_retry} method will behave as expected.
1477+ """
1478+ self.words._protocol.transport.loseConnection()
1479+ self.port.stopListening()
1480+
1481+ def handle_reconnect(protocol):
1482+ # In this precise moment we have a newly connected protocol
1483+ self.words._protocol = protocol
1484+
1485+ # Pretend that the connection is lost again very quickly
1486+ protocol.transport.loseConnection()
1487+ self.port.stopListening()
1488+
1489+ # Force RemoteObject._retry to run using a disconnected protocol
1490+ reactor.callLater(0, self.words._retry)
1491+
1492+ # Restore the real handler and start listening again very soon
1493+ self.connector._factory.remove_notifier(handle_reconnect)
1494+ self.connector._factory.add_notifier(self.words._handle_reconnect)
1495+ reactor.callLater(0.2, restart_listening)
1496+
1497+ def restart_listening():
1498+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1499+
1500+ def assert_failure(error):
1501+ self.assertEquals(str(error), "Forbidden method 'secret'")
1502+
1503+ # Use our own reconnect handler
1504+ self.connector._factory.remove_notifier(self.words._handle_reconnect)
1505+ self.connector._factory.add_notifier(handle_reconnect)
1506+
1507+ reactor.callLater(0.2, restart_listening)
1508+ result = self.words.secret()
1509+ self.assertFailure(result, MethodCallError)
1510+ return result.addCallback(assert_failure)
1511+
1512+ def test_retry_with_many_method_calls(self):
1513+ """
1514+ If several L{MethodCall} requests were issued while disconnected, they
1515+ will be all eventually completed when the connection gets established
1516+ again.
1517+ """
1518+ self.words._protocol.transport.loseConnection()
1519+ self.port.stopListening()
1520+
1521+ def restart_listening():
1522+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1523+
1524+ def assert_guess(response):
1525+ self.assertEquals(response, "Guessed!")
1526+
1527+ def assert_secret(failure):
1528+ self.assertEquals(str(failure.value), "Forbidden method 'secret'")
1529+
1530+ def assert_motd(response):
1531+ self.assertEquals(response, "Words are cool")
1532+
1533+ reactor.callLater(0.1, restart_listening)
1534+
1535+ results = [self.words.guess("word", "cool", value=4),
1536+ self.words.secret(),
1537+ self.words.motd()]
1538+ results[0].addCallback(assert_guess)
1539+ results[1].addErrback(assert_secret)
1540+ results[2].addCallback(assert_motd)
1541+ return DeferredList(results)
1542+
1543+ def test_retry_without_retry_on_reconnect(self):
1544+ """
1545+ If C{retry_on_reconnect} is C{False}, the L{RemoteObject} object won't
1546+ retry to perform requests which failed because the connection was
1547+ lost, however requests made after a reconnection will still succeed.
1548+ """
1549+ self.words._protocol.transport.loseConnection()
1550+ self.port.stopListening()
1551+
1552+ def restart_listening():
1553+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1554+ reactor.callLater(0.3, assert_reconnected)
1555+
1556+ def assert_reconnected():
1557+ result = self.words.empty()
1558+ result.addCallback(lambda x: reconnected.callback(None))
1559+ return result
1560+
1561+ reactor.callLater(0.1, restart_listening)
1562+ self.words._retry_on_reconnect = False
1563+ result = self.words.empty()
1564+ self.assertFailure(result, ConnectionDone)
1565+ reconnected = Deferred()
1566+ return result.addCallback(lambda x: reconnected)
1567+
1568+ def test_retry_with_timeout(self):
1569+ """
1570+ If a C{timeout} is set, the L{RemoteObject} object will errback failed
1571+ L{MethodCall}s after that amount of seconds, without retrying them when
1572+ the connection is established again.
1573+ """
1574+ self.words._protocol.transport.loseConnection()
1575+ self.port.stopListening()
1576+
1577+ def restart_listening():
1578+ self.port = reactor.listenUNIX(self.socket, self.server_factory)
1579+ reactor.callLater(0.1, reconnected.callback, None)
1580+
1581+ def assert_failure(error):
1582+ self.assertEquals(str(error), "timeout")
1583+ return reconnected
1584+
1585+ reactor.callLater(0.9, restart_listening)
1586+ result = self.words.empty()
1587+ self.assertFailure(result, MethodCallError)
1588+ reconnected = Deferred()
1589+ return result.addCallback(assert_failure)
1590+>>>>>>> MERGE-SOURCE
1591
1592=== modified file 'landscape/package/reporter.py'
1593=== modified file 'landscape/package/tests/test_reporter.py'
1594=== modified file 'landscape/reactor.py'
1595--- landscape/reactor.py 2009-11-02 13:49:26 +0000
1596+++ landscape/reactor.py 2010-03-05 12:44:28 +0000
1597@@ -158,6 +158,13 @@
1598 self.cancel_call(self._run_threaded_callbacks_id)
1599
1600
1601+class UnixReactorMixin(object):
1602+
1603+ def listen_unix(self, *args, **kwargs):
1604+ """Start listening on a Unix socket."""
1605+ return self._reactor.listenUNIX(*args, **kwargs)
1606+
1607+
1608 class ReactorID(object):
1609
1610 def __init__(self, timeout):
1611@@ -215,7 +222,7 @@
1612
1613
1614 class FakeReactor(EventHandlingReactorMixin,
1615- ThreadedCallsReactorMixin):
1616+ ThreadedCallsReactorMixin, UnixReactorMixin):
1617 """
1618 @ivar udp_transports: dict of {port: (protocol, transport)}
1619 @ivar hosts: Dict of {hostname: ip}. Users should populate this
1620@@ -228,6 +235,11 @@
1621 self.udp_transports = {}
1622 self.hosts = {}
1623
1624+ # We need a reference to the Twisted reactor as well to
1625+ # let Landscape services listen to Unix sockets
1626+ from twisted.internet import reactor
1627+ self._reactor = reactor
1628+
1629 def time(self):
1630 return float(self._current_time)
1631
1632@@ -331,9 +343,8 @@
1633 return succeed(hostname)
1634
1635
1636-
1637 class TwistedReactor(EventHandlingReactorMixin,
1638- ThreadedCallsReactorMixin):
1639+ ThreadedCallsReactorMixin, UnixReactorMixin):
1640 """Wrap and add functionalities to the Twisted C{reactor}."""
1641
1642 def __init__(self):
1643
1644=== added file 'landscape/tests/test_amp.py'
1645--- landscape/tests/test_amp.py 1970-01-01 00:00:00 +0000
1646+++ landscape/tests/test_amp.py 2010-03-05 12:44:28 +0000
1647@@ -0,0 +1,126 @@
1648+import os
1649+
1650+from twisted.internet.defer import Deferred
1651+from twisted.internet.error import ConnectError
1652+
1653+from landscape.tests.helpers import LandscapeTest
1654+from landscape.reactor import FakeReactor
1655+from landscape.deployment import Configuration
1656+from landscape.amp import (
1657+ LandscapeComponentFactory, RemoteLandscapeComponentCreator)
1658+
1659+
1660+class TestComponent(object):
1661+
1662+ name = "test"
1663+
1664+
1665+class TestComponentFactory(LandscapeComponentFactory):
1666+
1667+ maxRetries = 0
1668+ initialDelay = 0.01
1669+
1670+
1671+class RemoteTestComponentCreator(RemoteLandscapeComponentCreator):
1672+
1673+ factory = TestComponentFactory
1674+ component = TestComponent
1675+
1676+
1677+class RemoteLandscapeComponentTest(LandscapeTest):
1678+
1679+ def setUp(self):
1680+ super(RemoteLandscapeComponentTest, self).setUp()
1681+ reactor = FakeReactor()
1682+ config = Configuration()
1683+ config.data_path = self.makeDir()
1684+ socket = os.path.join(config.data_path, "test.sock")
1685+ self.component = TestComponent()
1686+ factory = LandscapeComponentFactory(object=self.component)
1687+ self.port = reactor.listen_unix(socket, factory)
1688+
1689+
1690+ self.connector = RemoteTestComponentCreator(reactor, config)
1691+ connected = self.connector.connect()
1692+ connected.addCallback(lambda remote: setattr(self, "remote", remote))
1693+ return connected
1694+
1695+ def tearDown(self):
1696+ self.connector.disconnect()
1697+ self.port.stopListening()
1698+ super(RemoteLandscapeComponentTest, self).tearDown()
1699+
1700+ def test_ping(self):
1701+ """
1702+ The L{LandscapeComponentProtocol} exposes the C{ping} method of a
1703+ remote Landscape component.
1704+ """
1705+ self.component.ping = self.mocker.mock()
1706+ self.expect(self.component.ping()).result(True)
1707+ self.mocker.replay()
1708+ result = self.remote.ping()
1709+ return self.assertSuccess(result, True)
1710+
1711+ def test_exit(self):
1712+ """
1713+ The L{LandscapeComponentProtocol} exposes the C{exit} method of a
1714+ remote Landscape component.
1715+ """
1716+ self.component.exit = self.mocker.mock()
1717+ self.component.exit()
1718+ self.mocker.replay()
1719+ result = self.remote.exit()
1720+ return self.assertSuccess(result)
1721+
1722+
1723+class RemoteLandscapeComponentCreatorTest(LandscapeTest):
1724+
1725+ def setUp(self):
1726+ super(RemoteLandscapeComponentCreatorTest, self).setUp()
1727+ self.reactor = FakeReactor()
1728+ self.config = Configuration()
1729+ self.config.data_path = self.makeDir()
1730+ self.connector = RemoteTestComponentCreator(self.reactor, self.config)
1731+
1732+ def test_connect_logs_errors(self):
1733+ """
1734+ Connection errors are logged.
1735+ """
1736+ self.log_helper.ignore_errors("Error while connecting to test")
1737+
1738+ def assert_log(ignored):
1739+ self.assertIn("Error while connecting to test",
1740+ self.logfile.getvalue())
1741+
1742+ result = self.connector.connect(max_retries=0)
1743+ self.assertFailure(result, ConnectError)
1744+ return result.addCallback(assert_log)
1745+
1746+ def test_reconnect_fires_event(self):
1747+ """
1748+ An event is fired whenever the connection is established again after
1749+ it has been lost.
1750+ """
1751+ socket = os.path.join(self.config.data_path, "test.sock")
1752+ factory = LandscapeComponentFactory()
1753+ ports = []
1754+ ports.append(self.reactor.listen_unix(socket, factory))
1755+
1756+ def listen_again():
1757+ ports.append(self.reactor.listen_unix(socket, factory))
1758+
1759+ def connected(remote):
1760+ remote._protocol.transport.loseConnection()
1761+ ports[0].stopListening()
1762+ self.reactor._reactor.callLater(0.01, listen_again)
1763+
1764+ def reconnected():
1765+ self.connector.disconnect()
1766+ ports[1].stopListening()
1767+ deferred.callback(None)
1768+
1769+ deferred = Deferred()
1770+ self.reactor.call_on("test-reconnect", reconnected)
1771+ result = self.connector.connect()
1772+ result.addCallback(connected)
1773+ return deferred

Subscribers

People subscribed via source and target branches

to all changes: