Merge lp:~free.ekanayaka/landscape-client/method-call-defer into lp:~landscape/landscape-client/amp-trunk
- method-call-defer
- Merge into amp-trunk
Status: | Merged |
---|---|
Approved by: | Sidnei da Silva |
Approved revision: | not available |
Merge reported by: | Free Ekanayaka |
Merged at revision: | not available |
Proposed branch: | lp:~free.ekanayaka/landscape-client/method-call-defer |
Merge into: | lp:~landscape/landscape-client/amp-trunk |
Diff against target: |
412 lines (+227/-30) 2 files modified
landscape/lib/amp.py (+123/-13) landscape/lib/tests/test_amp.py (+104/-17) |
To merge this branch: | bzr merge lp:~free.ekanayaka/landscape-client/method-call-defer |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Kapil Thangavelu (community) | Approve | ||
Duncan McGreggor (community) | Approve | ||
Review via email: mp+17758@code.launchpad.net |
This proposal supersedes a proposal from 2010-01-20.
Commit message
Description of the change
Free Ekanayaka (free.ekanayaka) wrote : | # |
Duncan McGreggor (oubiwann) wrote : | # |
This is super-cool, Free -- so glad you did this :-)
[1] 1661 tests, all passing -- nice!
[2] This isn't part of this particular change, but there's a bare exception before the deferred check:
| try:
| result = getattr(
| except Exception, error:
| raise MethodCallError
It'd be nice if expected exceptions were handled explicitly here.
[3] You'll want to get Thomas' feedback on this suggestion... but one thing you might want to consider is actually having receive_method_call always return a deferred (with return maybeDeferred(
+1 for merge!
- 216. By Free Ekanayaka
-
Use maybeDeferred to simplify receive_method_call logic (Duncan [3])
Free Ekanayaka (free.ekanayaka) wrote : | # |
Thanks Duncan! It was awesome to get your Landscape "goodbye review" :)
[1]
Yay, thanks buildbot.
[2]
The thing is that this is a very general method call, so in principle any exception could be raised. However whatever gets raised is wrapped around a MethodCallError, so the remote caller can possibly cope with it.
[3]
Great idea! That indeed simplified the logic quite a bit. Thanks.
Kapil Thangavelu (hazmat) wrote : | # |
Nice implementation, thorough test coverage. Looks good to me. +1
Free Ekanayaka (free.ekanayaka) wrote : | # |
Thanks Kapil, that's merged in r185 of amp-trunk.
Preview Diff
1 | === modified file 'landscape/lib/amp.py' |
2 | --- landscape/lib/amp.py 2010-01-20 08:33:12 +0000 |
3 | +++ landscape/lib/amp.py 2010-01-25 08:30:27 +0000 |
4 | @@ -1,8 +1,10 @@ |
5 | """Expose the methods of a remote object over AMP.""" |
6 | |
7 | -from twisted.internet.defer import Deferred |
8 | +from uuid import uuid4 |
9 | +from twisted.internet.defer import Deferred, maybeDeferred |
10 | from twisted.internet.protocol import ServerFactory, ClientFactory |
11 | from twisted.protocols.amp import Argument, String, Command, AMP |
12 | +from twisted.python.failure import Failure |
13 | |
14 | from landscape.lib.bpickle import loads, dumps, dumps_table |
15 | |
16 | @@ -35,11 +37,21 @@ |
17 | ("args", MethodCallArgument()), |
18 | ("kwargs", MethodCallArgument())] |
19 | |
20 | - response = [("result", MethodCallArgument())] |
21 | + response = [("result", MethodCallArgument()), |
22 | + ("deferred", String(optional=True))] |
23 | |
24 | errors = {MethodCallError: "METHOD_CALL_ERROR"} |
25 | |
26 | |
27 | +class DeferredResponse(Command): |
28 | + """Fire a L{Deferred} associated with an outstanding method call result.""" |
29 | + |
30 | + arguments = [("uuid", String()), |
31 | + ("result", MethodCallArgument(optional=True)), |
32 | + ("failure", String(optional=True))] |
33 | + requiresAnswer = False |
34 | + |
35 | + |
36 | class MethodCallServerProtocol(AMP): |
37 | """Expose methods of a local object over AMP. |
38 | |
39 | @@ -68,17 +80,109 @@ |
40 | if not method in self.methods: |
41 | raise MethodCallError("Forbidden method '%s'" % method) |
42 | |
43 | - try: |
44 | - result = getattr(self.factory.object, method)(*args, **kwargs) |
45 | - except Exception, error: |
46 | - raise MethodCallError("Remote exception %s" % str(error)) |
47 | + method_func = getattr(self.factory.object, method) |
48 | + result = maybeDeferred(method_func, *args, **kwargs) |
49 | + |
50 | + # If the Deferred was already fired, we can return its result |
51 | + if result.called: |
52 | + if isinstance(result.result, Failure): |
53 | + failure = str(result.result.value) |
54 | + result.addErrback(lambda error: None) # Stop propagating |
55 | + raise MethodCallError(failure) |
56 | + return {"result": self._check_result(result.result)} |
57 | + |
58 | + uuid = str(uuid4()) |
59 | + result.addBoth(self.send_deferred_response, uuid) |
60 | + return {"result": None, "deferred": uuid} |
61 | + |
62 | + def _check_result(self, result): |
63 | + """Check that the C{result} we're about to return is serializable. |
64 | + |
65 | + @return: The C{result} itself if valid. |
66 | + @raises: L{MethodCallError} if C{result} is not serializable. |
67 | + """ |
68 | if not MethodCallArgument.check(result): |
69 | raise MethodCallError("Non-serializable result") |
70 | - return {"result": result} |
71 | + return result |
72 | + |
73 | + def send_deferred_response(self, result, uuid): |
74 | + """Send a L{DeferredResponse} for the deferred with given C{uuid}. |
75 | + |
76 | + This is called when the result of a L{Deferred} returned by an |
77 | + object's method becomes available. A L{DeferredResponse} notifying |
78 | + such result (either success or failure) is sent to the peer. |
79 | + """ |
80 | + kwargs = {"uuid": uuid} |
81 | + if isinstance(result, Failure): |
82 | + kwargs["failure"] = str(result.value) |
83 | + else: |
84 | + kwargs["result"] = self._check_result(result) |
85 | + self.callRemote(DeferredResponse, **kwargs) |
86 | |
87 | |
88 | class MethodCallClientProtocol(AMP): |
89 | - """Calls methods of a remote object over L{AMP}.""" |
90 | + """Calls methods of a remote object over L{AMP}. |
91 | + |
92 | + @note: If the remote method returns a deferred, the associated local |
93 | + deferred returned by L{send_method_call} will result in the same |
94 | + callback value of the remote deferred. |
95 | + @cvar timeout: A timeout for remote methods returning L{Deferred}s, if a |
96 | + response for the deferred is not received within this amount of |
97 | + seconds, the remote method call will errback with a L{MethodCallError}. |
98 | + """ |
99 | + timeout = 60 |
100 | + |
101 | + def __init__(self): |
102 | + AMP.__init__(self) |
103 | + self._pending_responses = {} |
104 | + |
105 | + @DeferredResponse.responder |
106 | + def receive_deferred_response(self, uuid, result, failure): |
107 | + """Receive the deferred L{MethodCall} response. |
108 | + |
109 | + @param uuid: The id of the L{MethodCall} we're getting the result of. |
110 | + @param result: The result of the associated deferred if successful. |
111 | + @param failure: The failure message of the deferred if it failed. |
112 | + """ |
113 | + self.fire_pending_response(uuid, result, failure) |
114 | + return {} |
115 | + |
116 | + def fire_pending_response(self, uuid, result, failure): |
117 | + """Fire the L{Deferred} associated with a pending response. |
118 | + |
119 | + @param uuid: The id of the L{MethodCall} we're getting the result of. |
120 | + @param result: The result of the associated deferred if successful. |
121 | + @param failure: The failure message of the deferred if it failed. |
122 | + """ |
123 | + try: |
124 | + deferred, call = self._pending_responses.pop(uuid) |
125 | + except KeyError: |
126 | + # Late response for a request that has timeout, just ignore it |
127 | + return |
128 | + if not call.called: |
129 | + call.cancel() |
130 | + if failure is None: |
131 | + deferred.callback({"result": result}) |
132 | + else: |
133 | + deferred.errback(MethodCallError(failure)) |
134 | + |
135 | + def handle_response(self, response): |
136 | + """Handle a L{MethodCall} response. |
137 | + |
138 | + If the response is tagged as deferred, it will be queued as pending, |
139 | + and a L{Deferred} is returned, which will be fired as soon as the |
140 | + final response becomes available, or the timeout is reached. |
141 | + """ |
142 | + if response["deferred"]: |
143 | + uuid = response["deferred"] |
144 | + deferred = Deferred() |
145 | + call = self.factory.reactor.callLater(self.timeout, |
146 | + self.fire_pending_response, |
147 | + uuid, None, "timeout") |
148 | + self._pending_responses[uuid] = (deferred, call) |
149 | + return deferred |
150 | + |
151 | + return response |
152 | |
153 | def send_method_call(self, method, args=[], kwargs={}): |
154 | """Send a L{MethodCall} command with the given arguments. |
155 | @@ -87,8 +191,12 @@ |
156 | @param args: The positional arguments to pass to the remote method. |
157 | @param kwargs: The keyword arguments to pass to the remote method. |
158 | """ |
159 | - return self.callRemote(MethodCall, |
160 | - method=method, args=args, kwargs=kwargs) |
161 | + result = self.callRemote(MethodCall, |
162 | + method=method, args=args, kwargs=kwargs) |
163 | + # The result can be C{None} only if the requested command is a |
164 | + # DeferredResponse, which has requiresAnswer set to False |
165 | + if result is not None: |
166 | + return result.addCallback(self.handle_response) |
167 | |
168 | |
169 | class MethodCallServerFactory(ServerFactory): |
170 | @@ -111,16 +219,18 @@ |
171 | |
172 | def __init__(self, reactor, notifier): |
173 | """ |
174 | - @param reactor: The reactor used to schedule connection callbacks. |
175 | + @param reactor: The reactor used by the created protocols to schedule |
176 | + notifications and timeouts. |
177 | @param notifier: A function that will be called when the connection is |
178 | established. It will be passed the protocol instance as argument. |
179 | """ |
180 | - self._reactor = reactor |
181 | + self.reactor = reactor |
182 | self._notifier = notifier |
183 | |
184 | def buildProtocol(self, addr): |
185 | protocol = self.protocol() |
186 | - self._reactor.callLater(0, self._notifier, protocol) |
187 | + protocol.factory = self |
188 | + self.reactor.callLater(0, self._notifier, protocol) |
189 | return protocol |
190 | |
191 | |
192 | |
193 | === modified file 'landscape/lib/tests/test_amp.py' |
194 | --- landscape/lib/tests/test_amp.py 2010-01-20 08:33:12 +0000 |
195 | +++ landscape/lib/tests/test_amp.py 2010-01-25 08:30:27 +0000 |
196 | @@ -1,9 +1,11 @@ |
197 | from twisted.internet import reactor |
198 | +from twisted.internet.defer import Deferred |
199 | from twisted.internet.protocol import ClientCreator |
200 | |
201 | from landscape.lib.amp import ( |
202 | - MethodCallError, MethodCallServerProtocol, MethodCallClientProtocol, |
203 | - MethodCallServerFactory, RemoteObject, RemoteObjectCreator) |
204 | + MethodCallError, MethodCallServerProtocol, |
205 | + MethodCallClientProtocol, MethodCallServerFactory, |
206 | + MethodCallClientFactory, RemoteObject, RemoteObjectCreator) |
207 | from landscape.tests.helpers import LandscapeTest |
208 | |
209 | |
210 | @@ -60,6 +62,24 @@ |
211 | def translate(self, word): |
212 | raise WordsException("Unknown word") |
213 | |
214 | + def google(self, word): |
215 | + deferred = Deferred() |
216 | + if word == "Landscape": |
217 | + reactor.callLater(0.01, lambda: deferred.callback("Cool!")) |
218 | + elif word == "Easy query": |
219 | + deferred.callback("Done!") |
220 | + elif word == "Weird stuff": |
221 | + reactor.callLater(0.01, lambda: deferred.errback(Exception("bad"))) |
222 | + elif word == "Censored": |
223 | + deferred.errback(Exception("very bad")) |
224 | + elif word == "Long query": |
225 | + # Do nothing, the deferred won't be fired at all |
226 | + pass |
227 | + elif word == "Slowish query": |
228 | + # Fire the result after a while. |
229 | + reactor.callLater(0.05, lambda: deferred.callback("Done!")) |
230 | + return deferred |
231 | + |
232 | |
233 | class WordsProtocol(MethodCallServerProtocol): |
234 | |
235 | @@ -72,7 +92,8 @@ |
236 | "multiply_alphabetically", |
237 | "translate", |
238 | "meaning_of_life", |
239 | - "guess"] |
240 | + "guess", |
241 | + "google"] |
242 | |
243 | |
244 | class MethodCallProtocolTest(LandscapeTest): |
245 | @@ -114,7 +135,8 @@ |
246 | result = self.protocol.send_method_call(method="empty", |
247 | args=[], |
248 | kwargs={}) |
249 | - return self.assertSuccess(result, {"result": None}) |
250 | + return self.assertSuccess(result, {"result": None, |
251 | + "deferred": None}) |
252 | |
253 | def test_with_return_value(self): |
254 | """ |
255 | @@ -124,7 +146,8 @@ |
256 | result = self.protocol.send_method_call(method="motd", |
257 | args=[], |
258 | kwargs={}) |
259 | - return self.assertSuccess(result, {"result": "Words are cool"}) |
260 | + return self.assertSuccess(result, {"result": "Words are cool", |
261 | + "deferred": None}) |
262 | |
263 | def test_with_one_argument(self): |
264 | """ |
265 | @@ -134,7 +157,8 @@ |
266 | result = self.protocol.send_method_call(method="capitalize", |
267 | args=["john"], |
268 | kwargs={}) |
269 | - return self.assertSuccess(result, {"result": "John"}) |
270 | + return self.assertSuccess(result, {"result": "John", |
271 | + "deferred": None}) |
272 | |
273 | def test_with_boolean_return_value(self): |
274 | """ |
275 | @@ -143,7 +167,8 @@ |
276 | result = self.protocol.send_method_call(method="is_short", |
277 | args=["hi"], |
278 | kwargs={}) |
279 | - return self.assertSuccess(result, {"result": True}) |
280 | + return self.assertSuccess(result, {"result": True, |
281 | + "deferred": None}) |
282 | |
283 | def test_with_many_arguments(self): |
284 | """ |
285 | @@ -152,7 +177,8 @@ |
286 | result = self.protocol.send_method_call(method="concatenate", |
287 | args=["You ", "rock"], |
288 | kwargs={}) |
289 | - return self.assertSuccess(result, {"result": "You rock"}) |
290 | + return self.assertSuccess(result, {"result": "You rock", |
291 | + "deferred": None}) |
292 | |
293 | def test_with_default_arguments(self): |
294 | """ |
295 | @@ -162,7 +188,8 @@ |
296 | result = self.protocol.send_method_call(method="lower_case", |
297 | args=["OHH"], |
298 | kwargs={}) |
299 | - return self.assertSuccess(result, {"result": "ohh"}) |
300 | + return self.assertSuccess(result, {"result": "ohh", |
301 | + "deferred": None}) |
302 | |
303 | def test_with_overriden_default_arguments(self): |
304 | """ |
305 | @@ -173,7 +200,8 @@ |
306 | result = self.protocol.send_method_call(method="lower_case", |
307 | args=["OHH"], |
308 | kwargs={"index": 2}) |
309 | - return self.assertSuccess(result, {"result": "OHh"}) |
310 | + return self.assertSuccess(result, {"result": "OHh", |
311 | + "deferred": None}) |
312 | |
313 | def test_with_dictionary_arguments(self): |
314 | """ |
315 | @@ -183,7 +211,8 @@ |
316 | "alphabetically", |
317 | args=[{"foo": 2, "bar": 3}], |
318 | kwargs={}) |
319 | - return self.assertSuccess(result, {"result": "barbarbarfoofoo"}) |
320 | + return self.assertSuccess(result, {"result": "barbarbarfoofoo", |
321 | + "deferred": None}) |
322 | |
323 | def test_with_non_serializable_return_value(self): |
324 | """ |
325 | @@ -211,17 +240,19 @@ |
326 | def setUp(self): |
327 | super(RemoteObjectTest, self).setUp() |
328 | socket = self.mktemp() |
329 | - factory = MethodCallServerFactory(Words()) |
330 | - factory.protocol = WordsProtocol |
331 | - self.port = reactor.listenUNIX(socket, factory) |
332 | + server_factory = MethodCallServerFactory(Words()) |
333 | + server_factory.protocol = WordsProtocol |
334 | + self.port = reactor.listenUNIX(socket, server_factory) |
335 | |
336 | def set_remote(protocol): |
337 | self.protocol = protocol |
338 | self.words = RemoteObject(protocol) |
339 | |
340 | - connector = ClientCreator(reactor, MethodCallClientProtocol) |
341 | - connected = connector.connectUNIX(socket) |
342 | - return connected.addCallback(set_remote) |
343 | + connected = Deferred() |
344 | + connected.addCallback(set_remote) |
345 | + client_factory = MethodCallClientFactory(reactor, connected.callback) |
346 | + reactor.connectUNIX(socket, client_factory) |
347 | + return connected |
348 | |
349 | def tearDown(self): |
350 | self.protocol.transport.loseConnection() |
351 | @@ -319,6 +350,62 @@ |
352 | result = self.words.guess("word", "cool", value=4) |
353 | return self.assertSuccess(result, "Guessed!") |
354 | |
355 | + def test_with_success_full_deferred(self): |
356 | + """ |
357 | + If the target object method returns a L{Deferred}, it is handled |
358 | + transparently. |
359 | + """ |
360 | + result = self.words.google("Landscape") |
361 | + return self.assertSuccess(result, "Cool!") |
362 | + |
363 | + def test_with_failing_deferred(self): |
364 | + """ |
365 | + If the target object method returns a failing L{Deferred}, a |
366 | + L{MethodCallError} is raised. |
367 | + """ |
368 | + result = self.words.google("Weird stuff") |
369 | + return self.assertFailure(result, MethodCallError) |
370 | + |
371 | + def test_with_already_callback_deferred(self): |
372 | + """ |
373 | + The target object method can return an already fired L{Deferred}. |
374 | + """ |
375 | + result = self.words.google("Easy query") |
376 | + return self.assertSuccess(result, "Done!") |
377 | + |
378 | + def test_with_already_errback_deferred(self): |
379 | + """ |
380 | + If the target object method can return an already failed L{Deferred}. |
381 | + """ |
382 | + result = self.words.google("Censored") |
383 | + return self.assertFailure(result, MethodCallError) |
384 | + |
385 | + def test_with_deferred_timeout(self): |
386 | + """ |
387 | + If the peer protocol doesn't send a response for a deferred within |
388 | + the given timeout, the method call fails. |
389 | + """ |
390 | + self.protocol.timeout = 0.1 |
391 | + result = self.words.google("Long query") |
392 | + return self.assertFailure(result, MethodCallError) |
393 | + |
394 | + def test_with_late_response(self): |
395 | + """ |
396 | + If the peer protocol sends a late response for a request that has |
397 | + already timeout, that response is ignored. |
398 | + """ |
399 | + self.protocol.timeout = 0.01 |
400 | + result = self.words.google("Slowish query") |
401 | + self.assertFailure(result, MethodCallError) |
402 | + |
403 | + def assert_late_response_is_handled(ignored): |
404 | + deferred = Deferred() |
405 | + # We wait a bit to be sure that the late response gets delivered |
406 | + reactor.callLater(0.1, lambda: deferred.callback(None)) |
407 | + return deferred |
408 | + |
409 | + return result.addCallback(assert_late_response_is_handled) |
410 | + |
411 | |
412 | class RemoteObjectCreatorTest(LandscapeTest): |
413 |
Please note that this mechanism is mainly meant for unit-testing. In general our code will not really use this feature or rely on it, because we don't generally wait at all for remote calls that return deferreds. However when unit-testing it is often needed to wait for a remote call to complete, and this branch makes it easy.