Merge lp:~free.ekanayaka/landscape-client/method-call-defer into lp:~landscape/landscape-client/amp-trunk

Proposed by Sidnei da Silva
Status: Merged
Approved by: Sidnei da Silva
Approved revision: not available
Merge reported by: Free Ekanayaka
Merged at revision: not available
Proposed branch: lp:~free.ekanayaka/landscape-client/method-call-defer
Merge into: lp:~landscape/landscape-client/amp-trunk
Diff against target: 412 lines (+227/-30)
2 files modified
landscape/lib/amp.py (+123/-13)
landscape/lib/tests/test_amp.py (+104/-17)
To merge this branch: bzr merge lp:~free.ekanayaka/landscape-client/method-call-defer
Reviewer                       Review Type   Date Requested   Status
Kapil Thangavelu (community)                                  Approve
Duncan McGreggor (community)                                  Approve
Review via email: mp+17758@code.launchpad.net

This proposal supersedes a proposal from 2010-01-20.

Free Ekanayaka (free.ekanayaka) wrote :

Please note that this mechanism is mainly meant for unit testing. In general, our code will not use this feature or rely on it, because we generally don't wait for remote calls that return deferreds. However, when unit testing it is often necessary to wait for a remote call to complete, and this branch makes that easy.
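
For readers following the diff further down, the bookkeeping behind "waiting for a remote call" can be modelled without Twisted. The sketch below is only an illustration under stated assumptions: PendingCalls, register, resolve and expire are hypothetical names, concurrent.futures.Future stands in for a Twisted Deferred, and the timeout is triggered by hand instead of by reactor.callLater. It is written for Python 3, whereas the branch itself targets Python 2.

```python
from concurrent.futures import Future
from uuid import uuid4


class PendingCalls:
    """Hypothetical stand-in for the client's table of outstanding calls."""

    def __init__(self):
        self._pending = {}

    def register(self):
        """Record a call whose result arrives later; return (id, future)."""
        call_id = str(uuid4())
        future = Future()
        self._pending[call_id] = future
        return call_id, future

    def resolve(self, call_id, result=None, failure=None):
        """Deliver a result or failure; return False if the id is unknown."""
        future = self._pending.pop(call_id, None)
        if future is None:
            # Late response for a call that already timed out: ignore it.
            return False
        if failure is None:
            future.set_result(result)
        else:
            future.set_exception(RuntimeError(failure))
        return True

    def expire(self, call_id):
        """Simulate the timeout firing before any response was received."""
        return self.resolve(call_id, failure="timeout")


calls = PendingCalls()
call_id, future = calls.register()
calls.resolve(call_id, result="Cool!")
print(future.result())
```

Removing the entry from the table before firing it is what makes a late response harmless: once the timeout has consumed the id, a second delivery finds nothing to fire.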

Duncan McGreggor (oubiwann) wrote :

This is super-cool, Free -- so glad you did this :-)

[1] 1661 tests, all passing -- nice!

[2] This isn't part of this particular change, but there's a bare exception before the deferred check:

| try:
|     result = getattr(self.factory.object, method)(*args, **kwargs)
| except Exception, error:
|     raise MethodCallError("Remote exception %s" % str(error))

It'd be nice if expected exceptions were handled explicitly here.

[3] You'll want to get Thomas' feedback on this suggestion... but one thing you might want to consider is actually having receive_method_call always return a deferred (with return maybeDeferred(result)). That would make the method much simpler, but it would require changes elsewhere. Regardless, probably enough work to justify being done in a separate ticket/branch ;-)
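
A note on point [3]: Twisted's maybeDeferred takes the callable and its arguments, not an already computed result, so that a synchronous exception can be turned into a failed Deferred. The sketch below shows the same idea with the standard library only. It is an analogue, not Twisted code: maybe_future is a hypothetical helper, concurrent.futures.Future stands in for Deferred, and the three sample functions are made up for illustration (Python 3).

```python
from concurrent.futures import Future


def maybe_future(func, *args, **kwargs):
    """Call func and always hand back a Future, whatever func does."""
    try:
        outcome = func(*args, **kwargs)
    except Exception as error:
        failed = Future()
        failed.set_exception(error)
        return failed
    if isinstance(outcome, Future):
        return outcome  # Already asynchronous: pass it through untouched.
    done = Future()
    done.set_result(outcome)
    return done


def plain(word):
    return word.upper()


def broken(word):
    raise ValueError("Unknown word")


def slow(word):
    return Future()  # Never resolved here; stands in for a pending result.


for func in (plain, broken, slow):
    future = maybe_future(func, "landscape")
    print(func.__name__, "done" if future.done() else "pending")
```

With every outcome normalised this way, the caller needs only one check (is it already done or still pending?) instead of separate code paths for return values, exceptions and asynchronous results.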

+1 for merge!

review: Approve
216. By Free Ekanayaka

Use maybeDeferred to simplify receive_method_call logic (Duncan [3])

Free Ekanayaka (free.ekanayaka) wrote :

Thanks Duncan! It was awesome to get your Landscape "goodbye review" :)

[1]

Yay, thanks buildbot.

[2]

The thing is that this is a very general method call, so in principle any exception could be raised. However, whatever gets raised is wrapped in a MethodCallError, so the remote caller can cope with it.
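
The wrapping described in [2] can be sketched as follows. This is a simplified, standalone illustration rather than the code in landscape/lib/amp.py: call_method and the sample Words class are hypothetical, and the syntax is Python 3 (the branch uses the Python 2 form "except Exception, error").

```python
class MethodCallError(Exception):
    """Raised when a remote method call fails."""


class Words:
    """Made-up target object with one working and one failing method."""

    def capitalize(self, word):
        return word.capitalize()

    def translate(self, word):
        raise KeyError("Unknown word")


def call_method(target, method, allowed, *args, **kwargs):
    """Invoke an allowed method, converting any exception to MethodCallError."""
    if method not in allowed:
        raise MethodCallError("Forbidden method '%s'" % method)
    try:
        return getattr(target, method)(*args, **kwargs)
    except Exception as error:
        # The caller only ever has to handle one exception type.
        raise MethodCallError("Remote exception %s" % error) from error


print(call_method(Words(), "capitalize", ["capitalize"], "john"))
```

The trade-off is the one raised in the review: the caller loses the original exception type and sees only the message, which is acceptable when the error has to cross a process boundary anyway.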

[3]

Great idea! That indeed simplified the logic quite a bit. Thanks.

Kapil Thangavelu (hazmat) wrote :

Nice implementation, thorough test coverage. Looks good to me. +1

review: Approve
Free Ekanayaka (free.ekanayaka) wrote :

Thanks Kapil, that's merged in r185 of amp-trunk.

Preview Diff

=== modified file 'landscape/lib/amp.py'
--- landscape/lib/amp.py 2010-01-20 08:33:12 +0000
+++ landscape/lib/amp.py 2010-01-25 08:30:27 +0000
@@ -1,8 +1,10 @@
 """Expose the methods of a remote object over AMP."""
 
-from twisted.internet.defer import Deferred
+from uuid import uuid4
+from twisted.internet.defer import Deferred, maybeDeferred
 from twisted.internet.protocol import ServerFactory, ClientFactory
 from twisted.protocols.amp import Argument, String, Command, AMP
+from twisted.python.failure import Failure
 
 from landscape.lib.bpickle import loads, dumps, dumps_table
 
@@ -35,11 +37,21 @@
                  ("args", MethodCallArgument()),
                  ("kwargs", MethodCallArgument())]
 
-    response = [("result", MethodCallArgument())]
+    response = [("result", MethodCallArgument()),
+                ("deferred", String(optional=True))]
 
     errors = {MethodCallError: "METHOD_CALL_ERROR"}
 
 
+class DeferredResponse(Command):
+    """Fire a L{Deferred} associated with an outstanding method call result."""
+
+    arguments = [("uuid", String()),
+                 ("result", MethodCallArgument(optional=True)),
+                 ("failure", String(optional=True))]
+    requiresAnswer = False
+
+
 class MethodCallServerProtocol(AMP):
     """Expose methods of a local object over AMP.
 
@@ -68,17 +80,109 @@
         if not method in self.methods:
             raise MethodCallError("Forbidden method '%s'" % method)
 
-        try:
-            result = getattr(self.factory.object, method)(*args, **kwargs)
-        except Exception, error:
-            raise MethodCallError("Remote exception %s" % str(error))
+        method_func = getattr(self.factory.object, method)
+        result = maybeDeferred(method_func, *args, **kwargs)
+
+        # If the Deferred was already fired, we can return its result
+        if result.called:
+            if isinstance(result.result, Failure):
+                failure = str(result.result.value)
+                result.addErrback(lambda error: None)  # Stop propagating
+                raise MethodCallError(failure)
+            return {"result": self._check_result(result.result)}
+
+        uuid = str(uuid4())
+        result.addBoth(self.send_deferred_response, uuid)
+        return {"result": None, "deferred": uuid}
+
+    def _check_result(self, result):
+        """Check that the C{result} we're about to return is serializable.
+
+        @return: The C{result} itself if valid.
+        @raises: L{MethodCallError} if C{result} is not serializable.
+        """
         if not MethodCallArgument.check(result):
             raise MethodCallError("Non-serializable result")
-        return {"result": result}
+        return result
+
+    def send_deferred_response(self, result, uuid):
+        """Send a L{DeferredResponse} for the deferred with given C{uuid}.
+
+        This is called when the result of a L{Deferred} returned by an
+        object's method becomes available. A L{DeferredResponse} notifying
+        such result (either success or failure) is sent to the peer.
+        """
+        kwargs = {"uuid": uuid}
+        if isinstance(result, Failure):
+            kwargs["failure"] = str(result.value)
+        else:
+            kwargs["result"] = self._check_result(result)
+        self.callRemote(DeferredResponse, **kwargs)
 
 
 class MethodCallClientProtocol(AMP):
-    """Calls methods of a remote object over L{AMP}."""
+    """Calls methods of a remote object over L{AMP}.
+
+    @note: If the remote method returns a deferred, the associated local
+        deferred returned by L{send_method_call} will result in the same
+        callback value of the remote deferred.
+    @cvar timeout: A timeout for remote methods returning L{Deferred}s, if a
+        response for the deferred is not received within this amount of
+        seconds, the remote method call will errback with a L{MethodCallError}.
+    """
+    timeout = 60
+
+    def __init__(self):
+        AMP.__init__(self)
+        self._pending_responses = {}
+
+    @DeferredResponse.responder
+    def receive_deferred_response(self, uuid, result, failure):
+        """Receive the deferred L{MethodCall} response.
+
+        @param uuid: The id of the L{MethodCall} we're getting the result of.
+        @param result: The result of the associated deferred if successful.
+        @param failure: The failure message of the deferred if it failed.
+        """
+        self.fire_pending_response(uuid, result, failure)
+        return {}
+
+    def fire_pending_response(self, uuid, result, failure):
+        """Fire the L{Deferred} associated with a pending response.
+
+        @param uuid: The id of the L{MethodCall} we're getting the result of.
+        @param result: The result of the associated deferred if successful.
+        @param failure: The failure message of the deferred if it failed.
+        """
+        try:
+            deferred, call = self._pending_responses.pop(uuid)
+        except KeyError:
+            # Late response for a request that has timeout, just ignore it
+            return
+        if not call.called:
+            call.cancel()
+        if failure is None:
+            deferred.callback({"result": result})
+        else:
+            deferred.errback(MethodCallError(failure))
+
+    def handle_response(self, response):
+        """Handle a L{MethodCall} response.
+
+        If the response is tagged as deferred, it will be queued as pending,
+        and a L{Deferred} is returned, which will be fired as soon as the
+        final response becomes available, or the timeout is reached.
+        """
+        if response["deferred"]:
+            uuid = response["deferred"]
+            deferred = Deferred()
+            call = self.factory.reactor.callLater(self.timeout,
+                                                  self.fire_pending_response,
+                                                  uuid, None, "timeout")
+            self._pending_responses[uuid] = (deferred, call)
+            return deferred
+
+        return response
 
     def send_method_call(self, method, args=[], kwargs={}):
         """Send a L{MethodCall} command with the given arguments.
@@ -87,8 +191,12 @@
         @param args: The positional arguments to pass to the remote method.
         @param kwargs: The keyword arguments to pass to the remote method.
         """
-        return self.callRemote(MethodCall,
-                               method=method, args=args, kwargs=kwargs)
+        result = self.callRemote(MethodCall,
+                                 method=method, args=args, kwargs=kwargs)
+        # The result can be C{None} only if the requested command is a
+        # DeferredResponse, which has requiresAnswer set to False
+        if result is not None:
+            return result.addCallback(self.handle_response)
 
 
 class MethodCallServerFactory(ServerFactory):
@@ -111,16 +219,18 @@
 
     def __init__(self, reactor, notifier):
         """
-        @param reactor: The reactor used to schedule connection callbacks.
+        @param reactor: The reactor used by the created protocols to schedule
+            notifications and timeouts.
         @param notifier: A function that will be called when the connection is
             established. It will be passed the protocol instance as argument.
         """
-        self._reactor = reactor
+        self.reactor = reactor
         self._notifier = notifier
 
     def buildProtocol(self, addr):
         protocol = self.protocol()
-        self._reactor.callLater(0, self._notifier, protocol)
+        protocol.factory = self
+        self.reactor.callLater(0, self._notifier, protocol)
         return protocol
 
 

=== modified file 'landscape/lib/tests/test_amp.py'
--- landscape/lib/tests/test_amp.py 2010-01-20 08:33:12 +0000
+++ landscape/lib/tests/test_amp.py 2010-01-25 08:30:27 +0000
@@ -1,9 +1,11 @@
 from twisted.internet import reactor
+from twisted.internet.defer import Deferred
 from twisted.internet.protocol import ClientCreator
 
 from landscape.lib.amp import (
-    MethodCallError, MethodCallServerProtocol, MethodCallClientProtocol,
-    MethodCallServerFactory, RemoteObject, RemoteObjectCreator)
+    MethodCallError, MethodCallServerProtocol,
+    MethodCallClientProtocol, MethodCallServerFactory,
+    MethodCallClientFactory, RemoteObject, RemoteObjectCreator)
 from landscape.tests.helpers import LandscapeTest
 
 
@@ -60,6 +62,24 @@
     def translate(self, word):
         raise WordsException("Unknown word")
 
+    def google(self, word):
+        deferred = Deferred()
+        if word == "Landscape":
+            reactor.callLater(0.01, lambda: deferred.callback("Cool!"))
+        elif word == "Easy query":
+            deferred.callback("Done!")
+        elif word == "Weird stuff":
+            reactor.callLater(0.01, lambda: deferred.errback(Exception("bad")))
+        elif word == "Censored":
+            deferred.errback(Exception("very bad"))
+        elif word == "Long query":
+            # Do nothing, the deferred won't be fired at all
+            pass
+        elif word == "Slowish query":
+            # Fire the result after a while.
+            reactor.callLater(0.05, lambda: deferred.callback("Done!"))
+        return deferred
+
 
 class WordsProtocol(MethodCallServerProtocol):
 
@@ -72,7 +92,8 @@
                "multiply_alphabetically",
                "translate",
                "meaning_of_life",
-               "guess"]
+               "guess",
+               "google"]
 
 
 class MethodCallProtocolTest(LandscapeTest):
@@ -114,7 +135,8 @@
         result = self.protocol.send_method_call(method="empty",
                                                 args=[],
                                                 kwargs={})
-        return self.assertSuccess(result, {"result": None})
+        return self.assertSuccess(result, {"result": None,
+                                           "deferred": None})
 
     def test_with_return_value(self):
         """
@@ -124,7 +146,8 @@
         result = self.protocol.send_method_call(method="motd",
                                                 args=[],
                                                 kwargs={})
-        return self.assertSuccess(result, {"result": "Words are cool"})
+        return self.assertSuccess(result, {"result": "Words are cool",
+                                           "deferred": None})
 
     def test_with_one_argument(self):
         """
@@ -134,7 +157,8 @@
         result = self.protocol.send_method_call(method="capitalize",
                                                 args=["john"],
                                                 kwargs={})
-        return self.assertSuccess(result, {"result": "John"})
+        return self.assertSuccess(result, {"result": "John",
+                                           "deferred": None})
 
     def test_with_boolean_return_value(self):
         """
@@ -143,7 +167,8 @@
         result = self.protocol.send_method_call(method="is_short",
                                                 args=["hi"],
                                                 kwargs={})
-        return self.assertSuccess(result, {"result": True})
+        return self.assertSuccess(result, {"result": True,
+                                           "deferred": None})
 
     def test_with_many_arguments(self):
         """
@@ -152,7 +177,8 @@
         result = self.protocol.send_method_call(method="concatenate",
                                                 args=["You ", "rock"],
                                                 kwargs={})
-        return self.assertSuccess(result, {"result": "You rock"})
+        return self.assertSuccess(result, {"result": "You rock",
+                                           "deferred": None})
 
     def test_with_default_arguments(self):
         """
@@ -162,7 +188,8 @@
         result = self.protocol.send_method_call(method="lower_case",
                                                 args=["OHH"],
                                                 kwargs={})
-        return self.assertSuccess(result, {"result": "ohh"})
+        return self.assertSuccess(result, {"result": "ohh",
+                                           "deferred": None})
 
     def test_with_overriden_default_arguments(self):
         """
@@ -173,7 +200,8 @@
         result = self.protocol.send_method_call(method="lower_case",
                                                 args=["OHH"],
                                                 kwargs={"index": 2})
-        return self.assertSuccess(result, {"result": "OHh"})
+        return self.assertSuccess(result, {"result": "OHh",
+                                           "deferred": None})
 
     def test_with_dictionary_arguments(self):
         """
@@ -183,7 +211,8 @@
                                                        "alphabetically",
                                                 args=[{"foo": 2, "bar": 3}],
                                                 kwargs={})
-        return self.assertSuccess(result, {"result": "barbarbarfoofoo"})
+        return self.assertSuccess(result, {"result": "barbarbarfoofoo",
+                                           "deferred": None})
 
     def test_with_non_serializable_return_value(self):
         """
@@ -211,17 +240,19 @@
     def setUp(self):
         super(RemoteObjectTest, self).setUp()
         socket = self.mktemp()
-        factory = MethodCallServerFactory(Words())
-        factory.protocol = WordsProtocol
-        self.port = reactor.listenUNIX(socket, factory)
+        server_factory = MethodCallServerFactory(Words())
+        server_factory.protocol = WordsProtocol
+        self.port = reactor.listenUNIX(socket, server_factory)
 
         def set_remote(protocol):
             self.protocol = protocol
             self.words = RemoteObject(protocol)
 
-        connector = ClientCreator(reactor, MethodCallClientProtocol)
-        connected = connector.connectUNIX(socket)
-        return connected.addCallback(set_remote)
+        connected = Deferred()
+        connected.addCallback(set_remote)
+        client_factory = MethodCallClientFactory(reactor, connected.callback)
+        reactor.connectUNIX(socket, client_factory)
+        return connected
 
     def tearDown(self):
         self.protocol.transport.loseConnection()
@@ -319,6 +350,62 @@
         result = self.words.guess("word", "cool", value=4)
         return self.assertSuccess(result, "Guessed!")
 
+    def test_with_success_full_deferred(self):
+        """
+        If the target object method returns a L{Deferred}, it is handled
+        transparently.
+        """
+        result = self.words.google("Landscape")
+        return self.assertSuccess(result, "Cool!")
+
+    def test_with_failing_deferred(self):
+        """
+        If the target object method returns a failing L{Deferred}, a
+        L{MethodCallError} is raised.
+        """
+        result = self.words.google("Weird stuff")
+        return self.assertFailure(result, MethodCallError)
+
+    def test_with_already_callback_deferred(self):
+        """
+        The target object method can return an already fired L{Deferred}.
+        """
+        result = self.words.google("Easy query")
+        return self.assertSuccess(result, "Done!")
+
+    def test_with_already_errback_deferred(self):
+        """
+        If the target object method can return an already failed L{Deferred}.
+        """
+        result = self.words.google("Censored")
+        return self.assertFailure(result, MethodCallError)
+
+    def test_with_deferred_timeout(self):
+        """
+        If the peer protocol doesn't send a response for a deferred within
+        the given timeout, the method call fails.
+        """
+        self.protocol.timeout = 0.1
+        result = self.words.google("Long query")
+        return self.assertFailure(result, MethodCallError)
+
+    def test_with_late_response(self):
+        """
+        If the peer protocol sends a late response for a request that has
+        already timeout, that response is ignored.
+        """
+        self.protocol.timeout = 0.01
+        result = self.words.google("Slowish query")
+        self.assertFailure(result, MethodCallError)
+
+        def assert_late_response_is_handled(ignored):
+            deferred = Deferred()
+            # We wait a bit to be sure that the late response gets delivered
+            reactor.callLater(0.1, lambda: deferred.callback(None))
+            return deferred
+
+        return result.addCallback(assert_late_response_is_handled)
+
 
 class RemoteObjectCreatorTest(LandscapeTest):
 
