Merge lp:~free.ekanayaka/landscape-client/method-call-defer into lp:~landscape/landscape-client/amp-trunk
- method-call-defer
- Merge into amp-trunk
Proposed by
Free Ekanayaka
Status: | Superseded |
---|---|
Proposed branch: | lp:~free.ekanayaka/landscape-client/method-call-defer |
Merge into: | lp:~landscape/landscape-client/amp-trunk |
Diff against target: |
402 lines (+221/-24) 2 files modified
landscape/lib/amp.py (+117/-7) landscape/lib/tests/test_amp.py (+104/-17) |
To merge this branch: | bzr merge lp:~free.ekanayaka/landscape-client/method-call-defer |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Landscape | Pending | ||
Landscape | Pending | ||
Review via email: mp+17726@code.launchpad.net |
This proposal has been superseded by a proposal from 2010-01-20.
Commit message
Description of the change
To post a comment you must log in.
- 216. By Free Ekanayaka
-
Use maybeDeferred to simplify receive_method_call logic (Duncan [3])
Unmerged revisions
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'landscape/lib/amp.py' |
2 | --- landscape/lib/amp.py 2010-01-20 08:33:12 +0000 |
3 | +++ landscape/lib/amp.py 2010-01-20 09:25:23 +0000 |
4 | @@ -1,8 +1,10 @@ |
5 | """Expose the methods of a remote object over AMP.""" |
6 | |
7 | +from uuid import uuid4 |
8 | from twisted.internet.defer import Deferred |
9 | from twisted.internet.protocol import ServerFactory, ClientFactory |
10 | from twisted.protocols.amp import Argument, String, Command, AMP |
11 | +from twisted.python.failure import Failure |
12 | |
13 | from landscape.lib.bpickle import loads, dumps, dumps_table |
14 | |
15 | @@ -35,11 +37,21 @@ |
16 | ("args", MethodCallArgument()), |
17 | ("kwargs", MethodCallArgument())] |
18 | |
19 | - response = [("result", MethodCallArgument())] |
20 | + response = [("result", MethodCallArgument()), |
21 | + ("deferred", String(optional=True))] |
22 | |
23 | errors = {MethodCallError: "METHOD_CALL_ERROR"} |
24 | |
25 | |
26 | +class DeferredResponse(Command): |
27 | + """Fire a L{Deferred} associated with an outstanding method call result.""" |
28 | + |
29 | + arguments = [("uuid", String()), |
30 | + ("result", MethodCallArgument(optional=True)), |
31 | + ("failure", String(optional=True))] |
32 | + requiresAnswer = False |
33 | + |
34 | + |
35 | class MethodCallServerProtocol(AMP): |
36 | """Expose methods of a local object over AMP. |
37 | |
38 | @@ -72,13 +84,105 @@ |
39 | result = getattr(self.factory.object, method)(*args, **kwargs) |
40 | except Exception, error: |
41 | raise MethodCallError("Remote exception %s" % str(error)) |
42 | + |
43 | + # If the method returns a Deferred, register a callback that will |
44 | + # eventually notify the remote peer of its success or failure. |
45 | + if isinstance(result, Deferred): |
46 | + |
47 | + # If the Deferred was already fired, we can return its result |
48 | + if result.called: |
49 | + if isinstance(result.result, Failure): |
50 | + failure = str(result.result.value) |
51 | + result.addErrback(lambda error: None) # Stop propagating |
52 | + raise MethodCallError(failure) |
53 | + return {"result": result.result} |
54 | + |
55 | + uuid = str(uuid4()) |
56 | + result.addBoth(self.send_deferred_response, uuid) |
57 | + return {"result": None, "deferred": uuid} |
58 | + |
59 | if not MethodCallArgument.check(result): |
60 | raise MethodCallError("Non-serializable result") |
61 | return {"result": result} |
62 | |
63 | + def send_deferred_response(self, result, uuid): |
64 | + """Send a L{DeferredResponse} for the deferred with given C{uuid}. |
65 | + |
66 | + This is called when the result of a L{Deferred} returned by an |
67 | + object's method becomes available. A L{DeferredResponse} notifying |
68 | + such result (either success or failure) is sent to the peer. |
69 | + """ |
70 | + kwargs = {"uuid": uuid} |
71 | + if isinstance(result, Failure): |
72 | + kwargs["failure"] = str(result.value) |
73 | + else: |
74 | + kwargs["result"] = result |
75 | + self.callRemote(DeferredResponse, **kwargs) |
76 | + |
77 | |
78 | class MethodCallClientProtocol(AMP): |
79 | - """Calls methods of a remote object over L{AMP}.""" |
80 | + """Calls methods of a remote object over L{AMP}. |
81 | + |
82 | + @note: If the remote method returns a deferred, the associated local |
83 | + deferred returned by L{send_method_call} will result in the same |
84 | + callback value of the remote deferred. |
85 | + @cvar timeout: A timeout for remote methods returning L{Deferred}s, if a |
86 | + response for the deferred is not received within this amount of |
87 | + seconds, the remote method call will errback with a L{MethodCallError}. |
88 | + """ |
89 | + timeout = 60 |
90 | + |
91 | + def __init__(self): |
92 | + AMP.__init__(self) |
93 | + self._pending_responses = {} |
94 | + |
95 | + @DeferredResponse.responder |
96 | + def receive_deferred_response(self, uuid, result, failure): |
97 | + """Receive the deferred L{MethodCall} response. |
98 | + |
99 | + @param uuid: The id of the L{MethodCall} we're getting the result of. |
100 | + @param result: The result of the associated deferred if successful. |
101 | + @param failure: The failure message of the deferred if it failed. |
102 | + """ |
103 | + self.fire_pending_response(uuid, result, failure) |
104 | + return {} |
105 | + |
106 | + def fire_pending_response(self, uuid, result, failure): |
107 | + """Fire the L{Deferred} associated with a pending response. |
108 | + |
109 | + @param uuid: The id of the L{MethodCall} we're getting the result of. |
110 | + @param result: The result of the associated deferred if successful. |
111 | + @param failure: The failure message of the deferred if it failed. |
112 | + """ |
113 | + try: |
114 | + deferred, call = self._pending_responses.pop(uuid) |
115 | + except KeyError: |
116 | + # Late response for a request that has timeout, just ignore it |
117 | + return |
118 | + if not call.called: |
119 | + call.cancel() |
120 | + if failure is None: |
121 | + deferred.callback({"result": result}) |
122 | + else: |
123 | + deferred.errback(MethodCallError(failure)) |
124 | + |
125 | + def handle_response(self, response): |
126 | + """Handle a L{MethodCall} response. |
127 | + |
128 | + If the response is tagged as deferred, it will be queued as pending, |
129 | + and a L{Deferred} is returned, which will be fired as soon as the |
130 | + final response becomes available, or the timeout is reached. |
131 | + """ |
132 | + if response["deferred"]: |
133 | + uuid = response["deferred"] |
134 | + deferred = Deferred() |
135 | + call = self.factory.reactor.callLater(self.timeout, |
136 | + self.fire_pending_response, |
137 | + uuid, None, "timeout") |
138 | + self._pending_responses[uuid] = (deferred, call) |
139 | + return deferred |
140 | + |
141 | + return response |
142 | |
143 | def send_method_call(self, method, args=[], kwargs={}): |
144 | """Send a L{MethodCall} command with the given arguments. |
145 | @@ -87,8 +191,12 @@ |
146 | @param args: The positional arguments to pass to the remote method. |
147 | @param kwargs: The keyword arguments to pass to the remote method. |
148 | """ |
149 | - return self.callRemote(MethodCall, |
150 | - method=method, args=args, kwargs=kwargs) |
151 | + result = self.callRemote(MethodCall, |
152 | + method=method, args=args, kwargs=kwargs) |
153 | + # The result can be C{None} only if the requested command is a |
154 | + # DeferredResponse, which has requiresAnswer set to False |
155 | + if result is not None: |
156 | + return result.addCallback(self.handle_response) |
157 | |
158 | |
159 | class MethodCallServerFactory(ServerFactory): |
160 | @@ -111,16 +219,18 @@ |
161 | |
162 | def __init__(self, reactor, notifier): |
163 | """ |
164 | - @param reactor: The reactor used to schedule connection callbacks. |
165 | + @param reactor: The reactor used by the created protocols to schedule |
166 | + notifications and timeouts. |
167 | @param notifier: A function that will be called when the connection is |
168 | established. It will be passed the protocol instance as argument. |
169 | """ |
170 | - self._reactor = reactor |
171 | + self.reactor = reactor |
172 | self._notifier = notifier |
173 | |
174 | def buildProtocol(self, addr): |
175 | protocol = self.protocol() |
176 | - self._reactor.callLater(0, self._notifier, protocol) |
177 | + protocol.factory = self |
178 | + self.reactor.callLater(0, self._notifier, protocol) |
179 | return protocol |
180 | |
181 | |
182 | |
183 | === modified file 'landscape/lib/tests/test_amp.py' |
184 | --- landscape/lib/tests/test_amp.py 2010-01-20 08:33:12 +0000 |
185 | +++ landscape/lib/tests/test_amp.py 2010-01-20 09:25:23 +0000 |
186 | @@ -1,9 +1,11 @@ |
187 | from twisted.internet import reactor |
188 | +from twisted.internet.defer import Deferred |
189 | from twisted.internet.protocol import ClientCreator |
190 | |
191 | from landscape.lib.amp import ( |
192 | - MethodCallError, MethodCallServerProtocol, MethodCallClientProtocol, |
193 | - MethodCallServerFactory, RemoteObject, RemoteObjectCreator) |
194 | + MethodCallError, MethodCallServerProtocol, |
195 | + MethodCallClientProtocol, MethodCallServerFactory, |
196 | + MethodCallClientFactory, RemoteObject, RemoteObjectCreator) |
197 | from landscape.tests.helpers import LandscapeTest |
198 | |
199 | |
200 | @@ -60,6 +62,24 @@ |
201 | def translate(self, word): |
202 | raise WordsException("Unknown word") |
203 | |
204 | + def google(self, word): |
205 | + deferred = Deferred() |
206 | + if word == "Landscape": |
207 | + reactor.callLater(0.01, lambda: deferred.callback("Cool!")) |
208 | + elif word == "Easy query": |
209 | + deferred.callback("Done!") |
210 | + elif word == "Weird stuff": |
211 | + reactor.callLater(0.01, lambda: deferred.errback(Exception("bad"))) |
212 | + elif word == "Censored": |
213 | + deferred.errback(Exception("very bad")) |
214 | + elif word == "Long query": |
215 | + # Do nothing, the deferred won't be fired at all |
216 | + pass |
217 | + elif word == "Slowish query": |
218 | + # Fire the result after a while. |
219 | + reactor.callLater(0.05, lambda: deferred.callback("Done!")) |
220 | + return deferred |
221 | + |
222 | |
223 | class WordsProtocol(MethodCallServerProtocol): |
224 | |
225 | @@ -72,7 +92,8 @@ |
226 | "multiply_alphabetically", |
227 | "translate", |
228 | "meaning_of_life", |
229 | - "guess"] |
230 | + "guess", |
231 | + "google"] |
232 | |
233 | |
234 | class MethodCallProtocolTest(LandscapeTest): |
235 | @@ -114,7 +135,8 @@ |
236 | result = self.protocol.send_method_call(method="empty", |
237 | args=[], |
238 | kwargs={}) |
239 | - return self.assertSuccess(result, {"result": None}) |
240 | + return self.assertSuccess(result, {"result": None, |
241 | + "deferred": None}) |
242 | |
243 | def test_with_return_value(self): |
244 | """ |
245 | @@ -124,7 +146,8 @@ |
246 | result = self.protocol.send_method_call(method="motd", |
247 | args=[], |
248 | kwargs={}) |
249 | - return self.assertSuccess(result, {"result": "Words are cool"}) |
250 | + return self.assertSuccess(result, {"result": "Words are cool", |
251 | + "deferred": None}) |
252 | |
253 | def test_with_one_argument(self): |
254 | """ |
255 | @@ -134,7 +157,8 @@ |
256 | result = self.protocol.send_method_call(method="capitalize", |
257 | args=["john"], |
258 | kwargs={}) |
259 | - return self.assertSuccess(result, {"result": "John"}) |
260 | + return self.assertSuccess(result, {"result": "John", |
261 | + "deferred": None}) |
262 | |
263 | def test_with_boolean_return_value(self): |
264 | """ |
265 | @@ -143,7 +167,8 @@ |
266 | result = self.protocol.send_method_call(method="is_short", |
267 | args=["hi"], |
268 | kwargs={}) |
269 | - return self.assertSuccess(result, {"result": True}) |
270 | + return self.assertSuccess(result, {"result": True, |
271 | + "deferred": None}) |
272 | |
273 | def test_with_many_arguments(self): |
274 | """ |
275 | @@ -152,7 +177,8 @@ |
276 | result = self.protocol.send_method_call(method="concatenate", |
277 | args=["You ", "rock"], |
278 | kwargs={}) |
279 | - return self.assertSuccess(result, {"result": "You rock"}) |
280 | + return self.assertSuccess(result, {"result": "You rock", |
281 | + "deferred": None}) |
282 | |
283 | def test_with_default_arguments(self): |
284 | """ |
285 | @@ -162,7 +188,8 @@ |
286 | result = self.protocol.send_method_call(method="lower_case", |
287 | args=["OHH"], |
288 | kwargs={}) |
289 | - return self.assertSuccess(result, {"result": "ohh"}) |
290 | + return self.assertSuccess(result, {"result": "ohh", |
291 | + "deferred": None}) |
292 | |
293 | def test_with_overriden_default_arguments(self): |
294 | """ |
295 | @@ -173,7 +200,8 @@ |
296 | result = self.protocol.send_method_call(method="lower_case", |
297 | args=["OHH"], |
298 | kwargs={"index": 2}) |
299 | - return self.assertSuccess(result, {"result": "OHh"}) |
300 | + return self.assertSuccess(result, {"result": "OHh", |
301 | + "deferred": None}) |
302 | |
303 | def test_with_dictionary_arguments(self): |
304 | """ |
305 | @@ -183,7 +211,8 @@ |
306 | "alphabetically", |
307 | args=[{"foo": 2, "bar": 3}], |
308 | kwargs={}) |
309 | - return self.assertSuccess(result, {"result": "barbarbarfoofoo"}) |
310 | + return self.assertSuccess(result, {"result": "barbarbarfoofoo", |
311 | + "deferred": None}) |
312 | |
313 | def test_with_non_serializable_return_value(self): |
314 | """ |
315 | @@ -211,17 +240,19 @@ |
316 | def setUp(self): |
317 | super(RemoteObjectTest, self).setUp() |
318 | socket = self.mktemp() |
319 | - factory = MethodCallServerFactory(Words()) |
320 | - factory.protocol = WordsProtocol |
321 | - self.port = reactor.listenUNIX(socket, factory) |
322 | + server_factory = MethodCallServerFactory(Words()) |
323 | + server_factory.protocol = WordsProtocol |
324 | + self.port = reactor.listenUNIX(socket, server_factory) |
325 | |
326 | def set_remote(protocol): |
327 | self.protocol = protocol |
328 | self.words = RemoteObject(protocol) |
329 | |
330 | - connector = ClientCreator(reactor, MethodCallClientProtocol) |
331 | - connected = connector.connectUNIX(socket) |
332 | - return connected.addCallback(set_remote) |
333 | + connected = Deferred() |
334 | + connected.addCallback(set_remote) |
335 | + client_factory = MethodCallClientFactory(reactor, connected.callback) |
336 | + reactor.connectUNIX(socket, client_factory) |
337 | + return connected |
338 | |
339 | def tearDown(self): |
340 | self.protocol.transport.loseConnection() |
341 | @@ -319,6 +350,62 @@ |
342 | result = self.words.guess("word", "cool", value=4) |
343 | return self.assertSuccess(result, "Guessed!") |
344 | |
345 | + def test_with_success_full_deferred(self): |
346 | + """ |
347 | + If the target object method returns a L{Deferred}, it is handled |
348 | + transparently. |
349 | + """ |
350 | + result = self.words.google("Landscape") |
351 | + return self.assertSuccess(result, "Cool!") |
352 | + |
353 | + def test_with_failing_deferred(self): |
354 | + """ |
355 | + If the target object method returns a failing L{Deferred}, a |
356 | + L{MethodCallError} is raised. |
357 | + """ |
358 | + result = self.words.google("Weird stuff") |
359 | + return self.assertFailure(result, MethodCallError) |
360 | + |
361 | + def test_with_already_callback_deferred(self): |
362 | + """ |
363 | + The target object method can return an already fired L{Deferred}. |
364 | + """ |
365 | + result = self.words.google("Easy query") |
366 | + return self.assertSuccess(result, "Done!") |
367 | + |
368 | + def test_with_already_errback_deferred(self): |
369 | + """ |
370 | + If the target object method can return an already failed L{Deferred}. |
371 | + """ |
372 | + result = self.words.google("Censored") |
373 | + return self.assertFailure(result, MethodCallError) |
374 | + |
375 | + def test_with_deferred_timeout(self): |
376 | + """ |
377 | + If the peer protocol doesn't send a response for a deferred within |
378 | + the given timeout, the method call fails. |
379 | + """ |
380 | + self.protocol.timeout = 0.1 |
381 | + result = self.words.google("Long query") |
382 | + return self.assertFailure(result, MethodCallError) |
383 | + |
384 | + def test_with_late_response(self): |
385 | + """ |
386 | + If the peer protocol sends a late response for a request that has |
387 | + already timeout, that response is ignored. |
388 | + """ |
389 | + self.protocol.timeout = 0.01 |
390 | + result = self.words.google("Slowish query") |
391 | + self.assertFailure(result, MethodCallError) |
392 | + |
393 | + def assert_late_response_is_handled(ignored): |
394 | + deferred = Deferred() |
395 | + # We wait a bit to be sure that the late response gets delivered |
396 | + reactor.callLater(0.1, lambda: deferred.callback(None)) |
397 | + return deferred |
398 | + |
399 | + return result.addCallback(assert_late_response_is_handled) |
400 | + |
401 | |
402 | class RemoteObjectCreatorTest(LandscapeTest): |
403 |