Merge lp:~free.ekanayaka/landscape-client/method-call-defer into lp:~landscape/landscape-client/amp-trunk

Proposed by Free Ekanayaka
Status: Superseded
Proposed branch: lp:~free.ekanayaka/landscape-client/method-call-defer
Merge into: lp:~landscape/landscape-client/amp-trunk
Diff against target: 402 lines (+221/-24)
2 files modified
landscape/lib/amp.py (+117/-7)
landscape/lib/tests/test_amp.py (+104/-17)
To merge this branch: bzr merge lp:~free.ekanayaka/landscape-client/method-call-defer
Reviewer Review Type Date Requested Status
Landscape Pending
Landscape Pending
Review via email: mp+17726@code.launchpad.net

This proposal has been superseded by a proposal from 2010-01-20.

To post a comment you must log in.
216. By Free Ekanayaka

Use maybeDeferred to simplify receive_method_call logic (Duncan [3])

Unmerged revisions

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'landscape/lib/amp.py'
2--- landscape/lib/amp.py 2010-01-20 08:33:12 +0000
3+++ landscape/lib/amp.py 2010-01-20 09:25:23 +0000
4@@ -1,8 +1,10 @@
5 """Expose the methods of a remote object over AMP."""
6
7+from uuid import uuid4
8 from twisted.internet.defer import Deferred
9 from twisted.internet.protocol import ServerFactory, ClientFactory
10 from twisted.protocols.amp import Argument, String, Command, AMP
11+from twisted.python.failure import Failure
12
13 from landscape.lib.bpickle import loads, dumps, dumps_table
14
15@@ -35,11 +37,21 @@
16 ("args", MethodCallArgument()),
17 ("kwargs", MethodCallArgument())]
18
19- response = [("result", MethodCallArgument())]
20+ response = [("result", MethodCallArgument()),
21+ ("deferred", String(optional=True))]
22
23 errors = {MethodCallError: "METHOD_CALL_ERROR"}
24
25
26+class DeferredResponse(Command):
27+ """Fire a L{Deferred} associated with an outstanding method call result."""
28+
29+ arguments = [("uuid", String()),
30+ ("result", MethodCallArgument(optional=True)),
31+ ("failure", String(optional=True))]
32+ requiresAnswer = False
33+
34+
35 class MethodCallServerProtocol(AMP):
36 """Expose methods of a local object over AMP.
37
38@@ -72,13 +84,105 @@
39 result = getattr(self.factory.object, method)(*args, **kwargs)
40 except Exception, error:
41 raise MethodCallError("Remote exception %s" % str(error))
42+
43+ # If the method returns a Deferred, register a callback that will
44+ # eventually notify the remote peer of its success or failure.
45+ if isinstance(result, Deferred):
46+
47+ # If the Deferred was already fired, we can return its result
48+ if result.called:
49+ if isinstance(result.result, Failure):
50+ failure = str(result.result.value)
51+ result.addErrback(lambda error: None) # Stop propagating
52+ raise MethodCallError(failure)
53+ return {"result": result.result}
54+
55+ uuid = str(uuid4())
56+ result.addBoth(self.send_deferred_response, uuid)
57+ return {"result": None, "deferred": uuid}
58+
59 if not MethodCallArgument.check(result):
60 raise MethodCallError("Non-serializable result")
61 return {"result": result}
62
63+ def send_deferred_response(self, result, uuid):
64+ """Send a L{DeferredResponse} for the deferred with given C{uuid}.
65+
66+ This is called when the result of a L{Deferred} returned by an
67+ object's method becomes available. A L{DeferredResponse} notifying
68+ such result (either success or failure) is sent to the peer.
69+ """
70+ kwargs = {"uuid": uuid}
71+ if isinstance(result, Failure):
72+ kwargs["failure"] = str(result.value)
73+ else:
74+ kwargs["result"] = result
75+ self.callRemote(DeferredResponse, **kwargs)
76+
77
78 class MethodCallClientProtocol(AMP):
79- """Calls methods of a remote object over L{AMP}."""
80+ """Calls methods of a remote object over L{AMP}.
81+
82+ @note: If the remote method returns a deferred, the associated local
83+ deferred returned by L{send_method_call} will result in the same
84+ callback value of the remote deferred.
85+ @cvar timeout: A timeout for remote methods returning L{Deferred}s, if a
86+ response for the deferred is not received within this amount of
87+ seconds, the remote method call will errback with a L{MethodCallError}.
88+ """
89+ timeout = 60
90+
91+ def __init__(self):
92+ AMP.__init__(self)
93+ self._pending_responses = {}
94+
95+ @DeferredResponse.responder
96+ def receive_deferred_response(self, uuid, result, failure):
97+ """Receive the deferred L{MethodCall} response.
98+
99+ @param uuid: The id of the L{MethodCall} we're getting the result of.
100+ @param result: The result of the associated deferred if successful.
101+ @param failure: The failure message of the deferred if it failed.
102+ """
103+ self.fire_pending_response(uuid, result, failure)
104+ return {}
105+
106+ def fire_pending_response(self, uuid, result, failure):
107+ """Fire the L{Deferred} associated with a pending response.
108+
109+ @param uuid: The id of the L{MethodCall} we're getting the result of.
110+ @param result: The result of the associated deferred if successful.
111+ @param failure: The failure message of the deferred if it failed.
112+ """
113+ try:
114+ deferred, call = self._pending_responses.pop(uuid)
115+ except KeyError:
116+ # Late response for a request that has timed out, just ignore it
117+ return
118+ if not call.called:
119+ call.cancel()
120+ if failure is None:
121+ deferred.callback({"result": result})
122+ else:
123+ deferred.errback(MethodCallError(failure))
124+
125+ def handle_response(self, response):
126+ """Handle a L{MethodCall} response.
127+
128+ If the response is tagged as deferred, it will be queued as pending,
129+ and a L{Deferred} is returned, which will be fired as soon as the
130+ final response becomes available, or the timeout is reached.
131+ """
132+ if response["deferred"]:
133+ uuid = response["deferred"]
134+ deferred = Deferred()
135+ call = self.factory.reactor.callLater(self.timeout,
136+ self.fire_pending_response,
137+ uuid, None, "timeout")
138+ self._pending_responses[uuid] = (deferred, call)
139+ return deferred
140+
141+ return response
142
143 def send_method_call(self, method, args=[], kwargs={}):
144 """Send a L{MethodCall} command with the given arguments.
145@@ -87,8 +191,12 @@
146 @param args: The positional arguments to pass to the remote method.
147 @param kwargs: The keyword arguments to pass to the remote method.
148 """
149- return self.callRemote(MethodCall,
150- method=method, args=args, kwargs=kwargs)
151+ result = self.callRemote(MethodCall,
152+ method=method, args=args, kwargs=kwargs)
153+ # The result can be C{None} only if the requested command is a
154+ # DeferredResponse, which has requiresAnswer set to False
155+ if result is not None:
156+ return result.addCallback(self.handle_response)
157
158
159 class MethodCallServerFactory(ServerFactory):
160@@ -111,16 +219,18 @@
161
162 def __init__(self, reactor, notifier):
163 """
164- @param reactor: The reactor used to schedule connection callbacks.
165+ @param reactor: The reactor used by the created protocols to schedule
166+ notifications and timeouts.
167 @param notifier: A function that will be called when the connection is
168 established. It will be passed the protocol instance as argument.
169 """
170- self._reactor = reactor
171+ self.reactor = reactor
172 self._notifier = notifier
173
174 def buildProtocol(self, addr):
175 protocol = self.protocol()
176- self._reactor.callLater(0, self._notifier, protocol)
177+ protocol.factory = self
178+ self.reactor.callLater(0, self._notifier, protocol)
179 return protocol
180
181
182
183=== modified file 'landscape/lib/tests/test_amp.py'
184--- landscape/lib/tests/test_amp.py 2010-01-20 08:33:12 +0000
185+++ landscape/lib/tests/test_amp.py 2010-01-20 09:25:23 +0000
186@@ -1,9 +1,11 @@
187 from twisted.internet import reactor
188+from twisted.internet.defer import Deferred
189 from twisted.internet.protocol import ClientCreator
190
191 from landscape.lib.amp import (
192- MethodCallError, MethodCallServerProtocol, MethodCallClientProtocol,
193- MethodCallServerFactory, RemoteObject, RemoteObjectCreator)
194+ MethodCallError, MethodCallServerProtocol,
195+ MethodCallClientProtocol, MethodCallServerFactory,
196+ MethodCallClientFactory, RemoteObject, RemoteObjectCreator)
197 from landscape.tests.helpers import LandscapeTest
198
199
200@@ -60,6 +62,24 @@
201 def translate(self, word):
202 raise WordsException("Unknown word")
203
204+ def google(self, word):
205+ deferred = Deferred()
206+ if word == "Landscape":
207+ reactor.callLater(0.01, lambda: deferred.callback("Cool!"))
208+ elif word == "Easy query":
209+ deferred.callback("Done!")
210+ elif word == "Weird stuff":
211+ reactor.callLater(0.01, lambda: deferred.errback(Exception("bad")))
212+ elif word == "Censored":
213+ deferred.errback(Exception("very bad"))
214+ elif word == "Long query":
215+ # Do nothing, the deferred won't be fired at all
216+ pass
217+ elif word == "Slowish query":
218+ # Fire the result after a while.
219+ reactor.callLater(0.05, lambda: deferred.callback("Done!"))
220+ return deferred
221+
222
223 class WordsProtocol(MethodCallServerProtocol):
224
225@@ -72,7 +92,8 @@
226 "multiply_alphabetically",
227 "translate",
228 "meaning_of_life",
229- "guess"]
230+ "guess",
231+ "google"]
232
233
234 class MethodCallProtocolTest(LandscapeTest):
235@@ -114,7 +135,8 @@
236 result = self.protocol.send_method_call(method="empty",
237 args=[],
238 kwargs={})
239- return self.assertSuccess(result, {"result": None})
240+ return self.assertSuccess(result, {"result": None,
241+ "deferred": None})
242
243 def test_with_return_value(self):
244 """
245@@ -124,7 +146,8 @@
246 result = self.protocol.send_method_call(method="motd",
247 args=[],
248 kwargs={})
249- return self.assertSuccess(result, {"result": "Words are cool"})
250+ return self.assertSuccess(result, {"result": "Words are cool",
251+ "deferred": None})
252
253 def test_with_one_argument(self):
254 """
255@@ -134,7 +157,8 @@
256 result = self.protocol.send_method_call(method="capitalize",
257 args=["john"],
258 kwargs={})
259- return self.assertSuccess(result, {"result": "John"})
260+ return self.assertSuccess(result, {"result": "John",
261+ "deferred": None})
262
263 def test_with_boolean_return_value(self):
264 """
265@@ -143,7 +167,8 @@
266 result = self.protocol.send_method_call(method="is_short",
267 args=["hi"],
268 kwargs={})
269- return self.assertSuccess(result, {"result": True})
270+ return self.assertSuccess(result, {"result": True,
271+ "deferred": None})
272
273 def test_with_many_arguments(self):
274 """
275@@ -152,7 +177,8 @@
276 result = self.protocol.send_method_call(method="concatenate",
277 args=["You ", "rock"],
278 kwargs={})
279- return self.assertSuccess(result, {"result": "You rock"})
280+ return self.assertSuccess(result, {"result": "You rock",
281+ "deferred": None})
282
283 def test_with_default_arguments(self):
284 """
285@@ -162,7 +188,8 @@
286 result = self.protocol.send_method_call(method="lower_case",
287 args=["OHH"],
288 kwargs={})
289- return self.assertSuccess(result, {"result": "ohh"})
290+ return self.assertSuccess(result, {"result": "ohh",
291+ "deferred": None})
292
293 def test_with_overriden_default_arguments(self):
294 """
295@@ -173,7 +200,8 @@
296 result = self.protocol.send_method_call(method="lower_case",
297 args=["OHH"],
298 kwargs={"index": 2})
299- return self.assertSuccess(result, {"result": "OHh"})
300+ return self.assertSuccess(result, {"result": "OHh",
301+ "deferred": None})
302
303 def test_with_dictionary_arguments(self):
304 """
305@@ -183,7 +211,8 @@
306 "alphabetically",
307 args=[{"foo": 2, "bar": 3}],
308 kwargs={})
309- return self.assertSuccess(result, {"result": "barbarbarfoofoo"})
310+ return self.assertSuccess(result, {"result": "barbarbarfoofoo",
311+ "deferred": None})
312
313 def test_with_non_serializable_return_value(self):
314 """
315@@ -211,17 +240,19 @@
316 def setUp(self):
317 super(RemoteObjectTest, self).setUp()
318 socket = self.mktemp()
319- factory = MethodCallServerFactory(Words())
320- factory.protocol = WordsProtocol
321- self.port = reactor.listenUNIX(socket, factory)
322+ server_factory = MethodCallServerFactory(Words())
323+ server_factory.protocol = WordsProtocol
324+ self.port = reactor.listenUNIX(socket, server_factory)
325
326 def set_remote(protocol):
327 self.protocol = protocol
328 self.words = RemoteObject(protocol)
329
330- connector = ClientCreator(reactor, MethodCallClientProtocol)
331- connected = connector.connectUNIX(socket)
332- return connected.addCallback(set_remote)
333+ connected = Deferred()
334+ connected.addCallback(set_remote)
335+ client_factory = MethodCallClientFactory(reactor, connected.callback)
336+ reactor.connectUNIX(socket, client_factory)
337+ return connected
338
339 def tearDown(self):
340 self.protocol.transport.loseConnection()
341@@ -319,6 +350,62 @@
342 result = self.words.guess("word", "cool", value=4)
343 return self.assertSuccess(result, "Guessed!")
344
345+ def test_with_success_full_deferred(self):
346+ """
347+ If the target object method returns a L{Deferred}, it is handled
348+ transparently.
349+ """
350+ result = self.words.google("Landscape")
351+ return self.assertSuccess(result, "Cool!")
352+
353+ def test_with_failing_deferred(self):
354+ """
355+ If the target object method returns a failing L{Deferred}, a
356+ L{MethodCallError} is raised.
357+ """
358+ result = self.words.google("Weird stuff")
359+ return self.assertFailure(result, MethodCallError)
360+
361+ def test_with_already_callback_deferred(self):
362+ """
363+ The target object method can return an already fired L{Deferred}.
364+ """
365+ result = self.words.google("Easy query")
366+ return self.assertSuccess(result, "Done!")
367+
368+ def test_with_already_errback_deferred(self):
369+ """
370+ The target object method can return an already failed L{Deferred}.
371+ """
372+ result = self.words.google("Censored")
373+ return self.assertFailure(result, MethodCallError)
374+
375+ def test_with_deferred_timeout(self):
376+ """
377+ If the peer protocol doesn't send a response for a deferred within
378+ the given timeout, the method call fails.
379+ """
380+ self.protocol.timeout = 0.1
381+ result = self.words.google("Long query")
382+ return self.assertFailure(result, MethodCallError)
383+
384+ def test_with_late_response(self):
385+ """
386+ If the peer protocol sends a late response for a request that has
387+ already timed out, that response is ignored.
388+ """
389+ self.protocol.timeout = 0.01
390+ result = self.words.google("Slowish query")
391+ self.assertFailure(result, MethodCallError)
392+
393+ def assert_late_response_is_handled(ignored):
394+ deferred = Deferred()
395+ # We wait a bit to be sure that the late response gets delivered
396+ reactor.callLater(0.1, lambda: deferred.callback(None))
397+ return deferred
398+
399+ return result.addCallback(assert_late_response_is_handled)
400+
401
402 class RemoteObjectCreatorTest(LandscapeTest):
403

Subscribers

People subscribed via source and target branches

to all changes: