Merge lp:~free.ekanayaka/landscape-client/method-call-defer into lp:~landscape/landscape-client/amp-trunk

Proposed by Sidnei da Silva
Status: Merged
Approved by: Sidnei da Silva
Approved revision: not available
Merge reported by: Free Ekanayaka
Merged at revision: not available
Proposed branch: lp:~free.ekanayaka/landscape-client/method-call-defer
Merge into: lp:~landscape/landscape-client/amp-trunk
Diff against target: 412 lines (+227/-30)
2 files modified
landscape/lib/amp.py (+123/-13)
landscape/lib/tests/test_amp.py (+104/-17)
To merge this branch: bzr merge lp:~free.ekanayaka/landscape-client/method-call-defer
Reviewer Review Type Date Requested Status
Kapil Thangavelu (community) Approve
Duncan McGreggor (community) Approve
Review via email: mp+17758@code.launchpad.net

This proposal supersedes a proposal from 2010-01-20.

To post a comment you must log in.
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Please note that this mechanism is mainly meant for unit-testing. In general our code will not really use this feature or rely on it, because we don't generally wait at all for remote calls that return deferreds. However when unit-testing it is often needed to wait for a remote call to complete, and this branch makes it easy.

Revision history for this message
Duncan McGreggor (oubiwann) wrote :

This is super-cool, Free -- so glad you did this :-)

[1] 1661 tests, all passing -- nice!

[2] This isn't part of this particular change, but there's a bare exception before the deferred check:

| try:
| result = getattr(self.factory.object, method)(*args, **kwargs)
| except Exception, error:
| raise MethodCallError("Remote exception %s" % str(error))

It'd be nice if expected exceptions were handled explicitly here.

[3] You'll want to get Thomas' feedback on this suggestion... but one thing you might want to consider is actually having receive_method_call always return a deferred (with return maybeDeferred(result)). That would make the method much simpler, but it would require changes elsewhere. Regardless, probably enough work to justify being done in a separate ticket/branch ;-)

+1 for merge!

review: Approve
216. By Free Ekanayaka

Use maybeDeferred to simplify receive_method_call logic (Duncan [3])

Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Thanks Duncan! It was awesome to get your Landscape "goodbye review" :)

[1]

Yay, thanks buildbot.

[2]

The thing is that this is a very general method call, so in principle any exception could be raised. However whatever gets raised is wrapped around a MethodCallError, so the remote caller can possibly cope with it.

[3]

Great idea! That indeed simplified the logic quite a bit. Thanks.

Revision history for this message
Kapil Thangavelu (hazmat) wrote :

Nice implementation, thorough test coverage. Looks good to me. +1

review: Approve
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote :

Thanks Kapil, that's merged in r185 of amp-trunk.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'landscape/lib/amp.py'
2--- landscape/lib/amp.py 2010-01-20 08:33:12 +0000
3+++ landscape/lib/amp.py 2010-01-25 08:30:27 +0000
4@@ -1,8 +1,10 @@
5 """Expose the methods of a remote object over AMP."""
6
7-from twisted.internet.defer import Deferred
8+from uuid import uuid4
9+from twisted.internet.defer import Deferred, maybeDeferred
10 from twisted.internet.protocol import ServerFactory, ClientFactory
11 from twisted.protocols.amp import Argument, String, Command, AMP
12+from twisted.python.failure import Failure
13
14 from landscape.lib.bpickle import loads, dumps, dumps_table
15
16@@ -35,11 +37,21 @@
17 ("args", MethodCallArgument()),
18 ("kwargs", MethodCallArgument())]
19
20- response = [("result", MethodCallArgument())]
21+ response = [("result", MethodCallArgument()),
22+ ("deferred", String(optional=True))]
23
24 errors = {MethodCallError: "METHOD_CALL_ERROR"}
25
26
27+class DeferredResponse(Command):
28+ """Fire a L{Deferred} associated with an outstanding method call result."""
29+
30+ arguments = [("uuid", String()),
31+ ("result", MethodCallArgument(optional=True)),
32+ ("failure", String(optional=True))]
33+ requiresAnswer = False
34+
35+
36 class MethodCallServerProtocol(AMP):
37 """Expose methods of a local object over AMP.
38
39@@ -68,17 +80,109 @@
40 if not method in self.methods:
41 raise MethodCallError("Forbidden method '%s'" % method)
42
43- try:
44- result = getattr(self.factory.object, method)(*args, **kwargs)
45- except Exception, error:
46- raise MethodCallError("Remote exception %s" % str(error))
47+ method_func = getattr(self.factory.object, method)
48+ result = maybeDeferred(method_func, *args, **kwargs)
49+
50+ # If the Deferred was already fired, we can return its result
51+ if result.called:
52+ if isinstance(result.result, Failure):
53+ failure = str(result.result.value)
54+ result.addErrback(lambda error: None) # Stop propagating
55+ raise MethodCallError(failure)
56+ return {"result": self._check_result(result.result)}
57+
58+ uuid = str(uuid4())
59+ result.addBoth(self.send_deferred_response, uuid)
60+ return {"result": None, "deferred": uuid}
61+
62+ def _check_result(self, result):
63+ """Check that the C{result} we're about to return is serializable.
64+
65+ @return: The C{result} itself if valid.
66+ @raises: L{MethodCallError} if C{result} is not serializable.
67+ """
68 if not MethodCallArgument.check(result):
69 raise MethodCallError("Non-serializable result")
70- return {"result": result}
71+ return result
72+
73+ def send_deferred_response(self, result, uuid):
74+ """Send a L{DeferredResponse} for the deferred with given C{uuid}.
75+
76+ This is called when the result of a L{Deferred} returned by an
77+ object's method becomes available. A L{DeferredResponse} notifying
78+ such result (either success or failure) is sent to the peer.
79+ """
80+ kwargs = {"uuid": uuid}
81+ if isinstance(result, Failure):
82+ kwargs["failure"] = str(result.value)
83+ else:
84+ kwargs["result"] = self._check_result(result)
85+ self.callRemote(DeferredResponse, **kwargs)
86
87
88 class MethodCallClientProtocol(AMP):
89- """Calls methods of a remote object over L{AMP}."""
90+ """Calls methods of a remote object over L{AMP}.
91+
92+ @note: If the remote method returns a deferred, the associated local
93+ deferred returned by L{send_method_call} will result in the same
94+ callback value of the remote deferred.
95+ @cvar timeout: A timeout for remote methods returning L{Deferred}s, if a
96+ response for the deferred is not received within this amount of
97+ seconds, the remote method call will errback with a L{MethodCallError}.
98+ """
99+ timeout = 60
100+
101+ def __init__(self):
102+ AMP.__init__(self)
103+ self._pending_responses = {}
104+
105+ @DeferredResponse.responder
106+ def receive_deferred_response(self, uuid, result, failure):
107+ """Receive the deferred L{MethodCall} response.
108+
109+ @param uuid: The id of the L{MethodCall} we're getting the result of.
110+ @param result: The result of the associated deferred if successful.
111+ @param failure: The failure message of the deferred if it failed.
112+ """
113+ self.fire_pending_response(uuid, result, failure)
114+ return {}
115+
116+ def fire_pending_response(self, uuid, result, failure):
117+ """Fire the L{Deferred} associated with a pending response.
118+
119+ @param uuid: The id of the L{MethodCall} we're getting the result of.
120+ @param result: The result of the associated deferred if successful.
121+ @param failure: The failure message of the deferred if it failed.
122+ """
123+ try:
124+ deferred, call = self._pending_responses.pop(uuid)
125+ except KeyError:
126+ # Late response for a request that has timeout, just ignore it
127+ return
128+ if not call.called:
129+ call.cancel()
130+ if failure is None:
131+ deferred.callback({"result": result})
132+ else:
133+ deferred.errback(MethodCallError(failure))
134+
135+ def handle_response(self, response):
136+ """Handle a L{MethodCall} response.
137+
138+ If the response is tagged as deferred, it will be queued as pending,
139+ and a L{Deferred} is returned, which will be fired as soon as the
140+ final response becomes available, or the timeout is reached.
141+ """
142+ if response["deferred"]:
143+ uuid = response["deferred"]
144+ deferred = Deferred()
145+ call = self.factory.reactor.callLater(self.timeout,
146+ self.fire_pending_response,
147+ uuid, None, "timeout")
148+ self._pending_responses[uuid] = (deferred, call)
149+ return deferred
150+
151+ return response
152
153 def send_method_call(self, method, args=[], kwargs={}):
154 """Send a L{MethodCall} command with the given arguments.
155@@ -87,8 +191,12 @@
156 @param args: The positional arguments to pass to the remote method.
157 @param kwargs: The keyword arguments to pass to the remote method.
158 """
159- return self.callRemote(MethodCall,
160- method=method, args=args, kwargs=kwargs)
161+ result = self.callRemote(MethodCall,
162+ method=method, args=args, kwargs=kwargs)
163+ # The result can be C{None} only if the requested command is a
164+ # DeferredResponse, which has requiresAnswer set to False
165+ if result is not None:
166+ return result.addCallback(self.handle_response)
167
168
169 class MethodCallServerFactory(ServerFactory):
170@@ -111,16 +219,18 @@
171
172 def __init__(self, reactor, notifier):
173 """
174- @param reactor: The reactor used to schedule connection callbacks.
175+ @param reactor: The reactor used by the created protocols to schedule
176+ notifications and timeouts.
177 @param notifier: A function that will be called when the connection is
178 established. It will be passed the protocol instance as argument.
179 """
180- self._reactor = reactor
181+ self.reactor = reactor
182 self._notifier = notifier
183
184 def buildProtocol(self, addr):
185 protocol = self.protocol()
186- self._reactor.callLater(0, self._notifier, protocol)
187+ protocol.factory = self
188+ self.reactor.callLater(0, self._notifier, protocol)
189 return protocol
190
191
192
193=== modified file 'landscape/lib/tests/test_amp.py'
194--- landscape/lib/tests/test_amp.py 2010-01-20 08:33:12 +0000
195+++ landscape/lib/tests/test_amp.py 2010-01-25 08:30:27 +0000
196@@ -1,9 +1,11 @@
197 from twisted.internet import reactor
198+from twisted.internet.defer import Deferred
199 from twisted.internet.protocol import ClientCreator
200
201 from landscape.lib.amp import (
202- MethodCallError, MethodCallServerProtocol, MethodCallClientProtocol,
203- MethodCallServerFactory, RemoteObject, RemoteObjectCreator)
204+ MethodCallError, MethodCallServerProtocol,
205+ MethodCallClientProtocol, MethodCallServerFactory,
206+ MethodCallClientFactory, RemoteObject, RemoteObjectCreator)
207 from landscape.tests.helpers import LandscapeTest
208
209
210@@ -60,6 +62,24 @@
211 def translate(self, word):
212 raise WordsException("Unknown word")
213
214+ def google(self, word):
215+ deferred = Deferred()
216+ if word == "Landscape":
217+ reactor.callLater(0.01, lambda: deferred.callback("Cool!"))
218+ elif word == "Easy query":
219+ deferred.callback("Done!")
220+ elif word == "Weird stuff":
221+ reactor.callLater(0.01, lambda: deferred.errback(Exception("bad")))
222+ elif word == "Censored":
223+ deferred.errback(Exception("very bad"))
224+ elif word == "Long query":
225+ # Do nothing, the deferred won't be fired at all
226+ pass
227+ elif word == "Slowish query":
228+ # Fire the result after a while.
229+ reactor.callLater(0.05, lambda: deferred.callback("Done!"))
230+ return deferred
231+
232
233 class WordsProtocol(MethodCallServerProtocol):
234
235@@ -72,7 +92,8 @@
236 "multiply_alphabetically",
237 "translate",
238 "meaning_of_life",
239- "guess"]
240+ "guess",
241+ "google"]
242
243
244 class MethodCallProtocolTest(LandscapeTest):
245@@ -114,7 +135,8 @@
246 result = self.protocol.send_method_call(method="empty",
247 args=[],
248 kwargs={})
249- return self.assertSuccess(result, {"result": None})
250+ return self.assertSuccess(result, {"result": None,
251+ "deferred": None})
252
253 def test_with_return_value(self):
254 """
255@@ -124,7 +146,8 @@
256 result = self.protocol.send_method_call(method="motd",
257 args=[],
258 kwargs={})
259- return self.assertSuccess(result, {"result": "Words are cool"})
260+ return self.assertSuccess(result, {"result": "Words are cool",
261+ "deferred": None})
262
263 def test_with_one_argument(self):
264 """
265@@ -134,7 +157,8 @@
266 result = self.protocol.send_method_call(method="capitalize",
267 args=["john"],
268 kwargs={})
269- return self.assertSuccess(result, {"result": "John"})
270+ return self.assertSuccess(result, {"result": "John",
271+ "deferred": None})
272
273 def test_with_boolean_return_value(self):
274 """
275@@ -143,7 +167,8 @@
276 result = self.protocol.send_method_call(method="is_short",
277 args=["hi"],
278 kwargs={})
279- return self.assertSuccess(result, {"result": True})
280+ return self.assertSuccess(result, {"result": True,
281+ "deferred": None})
282
283 def test_with_many_arguments(self):
284 """
285@@ -152,7 +177,8 @@
286 result = self.protocol.send_method_call(method="concatenate",
287 args=["You ", "rock"],
288 kwargs={})
289- return self.assertSuccess(result, {"result": "You rock"})
290+ return self.assertSuccess(result, {"result": "You rock",
291+ "deferred": None})
292
293 def test_with_default_arguments(self):
294 """
295@@ -162,7 +188,8 @@
296 result = self.protocol.send_method_call(method="lower_case",
297 args=["OHH"],
298 kwargs={})
299- return self.assertSuccess(result, {"result": "ohh"})
300+ return self.assertSuccess(result, {"result": "ohh",
301+ "deferred": None})
302
303 def test_with_overriden_default_arguments(self):
304 """
305@@ -173,7 +200,8 @@
306 result = self.protocol.send_method_call(method="lower_case",
307 args=["OHH"],
308 kwargs={"index": 2})
309- return self.assertSuccess(result, {"result": "OHh"})
310+ return self.assertSuccess(result, {"result": "OHh",
311+ "deferred": None})
312
313 def test_with_dictionary_arguments(self):
314 """
315@@ -183,7 +211,8 @@
316 "alphabetically",
317 args=[{"foo": 2, "bar": 3}],
318 kwargs={})
319- return self.assertSuccess(result, {"result": "barbarbarfoofoo"})
320+ return self.assertSuccess(result, {"result": "barbarbarfoofoo",
321+ "deferred": None})
322
323 def test_with_non_serializable_return_value(self):
324 """
325@@ -211,17 +240,19 @@
326 def setUp(self):
327 super(RemoteObjectTest, self).setUp()
328 socket = self.mktemp()
329- factory = MethodCallServerFactory(Words())
330- factory.protocol = WordsProtocol
331- self.port = reactor.listenUNIX(socket, factory)
332+ server_factory = MethodCallServerFactory(Words())
333+ server_factory.protocol = WordsProtocol
334+ self.port = reactor.listenUNIX(socket, server_factory)
335
336 def set_remote(protocol):
337 self.protocol = protocol
338 self.words = RemoteObject(protocol)
339
340- connector = ClientCreator(reactor, MethodCallClientProtocol)
341- connected = connector.connectUNIX(socket)
342- return connected.addCallback(set_remote)
343+ connected = Deferred()
344+ connected.addCallback(set_remote)
345+ client_factory = MethodCallClientFactory(reactor, connected.callback)
346+ reactor.connectUNIX(socket, client_factory)
347+ return connected
348
349 def tearDown(self):
350 self.protocol.transport.loseConnection()
351@@ -319,6 +350,62 @@
352 result = self.words.guess("word", "cool", value=4)
353 return self.assertSuccess(result, "Guessed!")
354
355+ def test_with_success_full_deferred(self):
356+ """
357+ If the target object method returns a L{Deferred}, it is handled
358+ transparently.
359+ """
360+ result = self.words.google("Landscape")
361+ return self.assertSuccess(result, "Cool!")
362+
363+ def test_with_failing_deferred(self):
364+ """
365+ If the target object method returns a failing L{Deferred}, a
366+ L{MethodCallError} is raised.
367+ """
368+ result = self.words.google("Weird stuff")
369+ return self.assertFailure(result, MethodCallError)
370+
371+ def test_with_already_callback_deferred(self):
372+ """
373+ The target object method can return an already fired L{Deferred}.
374+ """
375+ result = self.words.google("Easy query")
376+ return self.assertSuccess(result, "Done!")
377+
378+ def test_with_already_errback_deferred(self):
379+ """
380+ If the target object method can return an already failed L{Deferred}.
381+ """
382+ result = self.words.google("Censored")
383+ return self.assertFailure(result, MethodCallError)
384+
385+ def test_with_deferred_timeout(self):
386+ """
387+ If the peer protocol doesn't send a response for a deferred within
388+ the given timeout, the method call fails.
389+ """
390+ self.protocol.timeout = 0.1
391+ result = self.words.google("Long query")
392+ return self.assertFailure(result, MethodCallError)
393+
394+ def test_with_late_response(self):
395+ """
396+ If the peer protocol sends a late response for a request that has
397+ already timeout, that response is ignored.
398+ """
399+ self.protocol.timeout = 0.01
400+ result = self.words.google("Slowish query")
401+ self.assertFailure(result, MethodCallError)
402+
403+ def assert_late_response_is_handled(ignored):
404+ deferred = Deferred()
405+ # We wait a bit to be sure that the late response gets delivered
406+ reactor.callLater(0.1, lambda: deferred.callback(None))
407+ return deferred
408+
409+ return result.addCallback(assert_late_response_is_handled)
410+
411
412 class RemoteObjectCreatorTest(LandscapeTest):
413

Subscribers

People subscribed via source and target branches

to all changes: