Merge lp:~free.ekanayaka/landscape-client/service-protocol into lp:~landscape/landscape-client/amp-trunk
- service-protocol
- Merge into amp-trunk
Proposed by
Free Ekanayaka
Status: | Merged |
---|---|
Merge reported by: | Free Ekanayaka |
Merged at revision: | not available |
Proposed branch: | lp:~free.ekanayaka/landscape-client/service-protocol |
Merge into: | lp:~landscape/landscape-client/amp-trunk |
Diff against target: |
1773 lines (+1652/-11) (has conflicts) 9 files modified
landscape/__init__.py (+5/-0) landscape/amp.py (+75/-0) landscape/broker/amp.py (+21/-8) landscape/broker/tests/helpers.py (+17/-0) landscape/broker/tests/test_server.py (+217/-0) landscape/lib/amp.py (+461/-0) landscape/lib/tests/test_amp.py (+716/-0) landscape/reactor.py (+14/-3) landscape/tests/test_amp.py (+126/-0) Text conflict in landscape/__init__.py Text conflict in landscape/broker/amp.py Text conflict in landscape/broker/tests/helpers.py Text conflict in landscape/broker/tests/test_server.py Text conflict in landscape/lib/amp.py Text conflict in landscape/lib/tests/test_amp.py |
To merge this branch: | bzr merge lp:~free.ekanayaka/landscape-client/service-protocol |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Kapil Thangavelu (community) | Approve | ||
Kevin McDermott (community) | Approve | ||
Review via email: mp+20579@code.launchpad.net |
Commit message
Description of the change
To post a comment you must log in.
- 223. By Free Ekanayaka
-
Remove unused max_retries
Revision history for this message
Free Ekanayaka (free.ekanayaka) wrote : | # |
Thanks Kevin!
@reviewers: For some reason the diff shown below is gigantic, but the actual one is only 282 lines (maybe LP is having trouble with criss-cross merges); however, merging this branch into amp-trunk locally with bzr should be just fine.
- 224. By Free Ekanayaka
-
Simplified MethodCall classes by introducing a factory that can be used for both server-side and client-side protocols
Revision history for this message
Kapil Thangavelu (hazmat) wrote : | # |
looks good to me, nice work +1
review:
Approve
- 225. By Free Ekanayaka
-
Drop the Landscape suffix for the sake of brevity
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file 'landscape/__init__.py' |
2 | --- landscape/__init__.py 2010-03-03 17:06:50 +0000 |
3 | +++ landscape/__init__.py 2010-03-05 12:44:28 +0000 |
4 | @@ -1,5 +1,10 @@ |
5 | +<<<<<<< TREE |
6 | DEBIAN_REVISION = "" |
7 | UPSTREAM_VERSION = "1.4.4" |
8 | +======= |
9 | +DEBIAN_REVISION = "-0ubuntu0.9.10.0" |
10 | +UPSTREAM_VERSION = "1.4.0" |
11 | +>>>>>>> MERGE-SOURCE |
12 | VERSION = "%s%s" % (UPSTREAM_VERSION, DEBIAN_REVISION) |
13 | |
14 | # The "server-api" field of outgoing messages will be set to this value, and |
15 | |
16 | === added file 'landscape/amp.py' |
17 | --- landscape/amp.py 1970-01-01 00:00:00 +0000 |
18 | +++ landscape/amp.py 2010-03-05 12:44:28 +0000 |
19 | @@ -0,0 +1,75 @@ |
20 | +import os |
21 | +import logging |
22 | + |
23 | +from landscape.lib.amp import ( |
24 | + MethodCallProtocol, MethodCallFactory, RemoteObjectCreator) |
25 | + |
26 | + |
27 | +class LandscapeComponentProtocol(MethodCallProtocol): |
28 | + """Communication protocol between the various Landscape components. |
29 | + |
30 | + It can be used both as server-side protocol for exposing the methods of a |
31 | + certain Landscape component, or as client-side protocol for connecting to |
32 | + another Landscape component we want to call the methods of. |
33 | + """ |
34 | + methods = ["ping", "exit"] |
35 | + timeout = 60 |
36 | + |
37 | + |
38 | +class LandscapeComponentFactory(MethodCallFactory): |
39 | + |
40 | + protocol = LandscapeComponentProtocol |
41 | + |
42 | + |
43 | +class RemoteLandscapeComponentCreator(RemoteObjectCreator): |
44 | + """Utility superclass for creating connections with a Landscape component. |
45 | + |
46 | + @cvar component: The class of the component to connect to, it is expected |
47 | + to define a C{name} class attribute, which will be used to find out |
48 | + the socket to use. It must be defined by sub-classes. |
49 | + """ |
50 | + |
51 | + factory = LandscapeComponentFactory |
52 | + |
53 | + def __init__(self, reactor, config, *args, **kwargs): |
54 | + """ |
55 | + @param reactor: A L{TwistedReactor} object. |
56 | + @param config: A L{LandscapeConfiguration}. |
57 | + @param args: Positional arguments for protocol factory constructor. |
58 | + @param kwargs: Keyword arguments for protocol factory constructor. |
59 | + |
60 | + @see: L{MethodCallClientFactory}. |
61 | + """ |
62 | + self._twisted_reactor = reactor |
63 | + socket = os.path.join(config.data_path, self.component.name + ".sock") |
64 | + super(RemoteLandscapeComponentCreator, self).__init__( |
65 | + self._twisted_reactor._reactor, socket, *args, **kwargs) |
66 | + |
67 | + def connect(self, max_retries=None): |
68 | + """Connect to the remote Landscape component. |
69 | + |
70 | + If the connection is lost after having been established, and then |
71 | + it is established again by the reconnect mechanism, an event will |
72 | + be fired. |
73 | + |
74 | + @param max_retries: If given, the connector will keep trying to connect |
75 | + up to that number of times, if the first connection attempt fails. |
76 | + """ |
77 | + |
78 | + def fire_reconnect(remote): |
79 | + self._twisted_reactor.fire("%s-reconnect" % |
80 | + self.component.name) |
81 | + |
82 | + def connected(remote): |
83 | + self._factory.add_notifier(fire_reconnect) |
84 | + return remote |
85 | + |
86 | + def log_error(failure): |
87 | + logging.error("Error while connecting to %s", self.component.name) |
88 | + return failure |
89 | + |
90 | + result = super(RemoteLandscapeComponentCreator, self).connect( |
91 | + max_retries=max_retries) |
92 | + result.addErrback(log_error) |
93 | + result.addCallback(connected) |
94 | + return result |
95 | |
96 | === modified file 'landscape/broker/amp.py' |
97 | --- landscape/broker/amp.py 2010-02-11 14:29:50 +0000 |
98 | +++ landscape/broker/amp.py 2010-03-05 12:44:28 +0000 |
99 | @@ -1,8 +1,21 @@ |
100 | -class RemoteClient(object): |
101 | - """A connected client utilizing features provided by a L{BrokerServer}.""" |
102 | - |
103 | - def __init__(self, name): |
104 | - """ |
105 | - @param name: Name of the broker client. |
106 | - """ |
107 | - self.name = name |
108 | +<<<<<<< TREE |
109 | +class RemoteClient(object): |
110 | + """A connected client utilizing features provided by a L{BrokerServer}.""" |
111 | + |
112 | + def __init__(self, name): |
113 | + """ |
114 | + @param name: Name of the broker client. |
115 | + """ |
116 | + self.name = name |
117 | +======= |
118 | +class RemoteClient(object): |
119 | + """A connected client utilizing features provided by a L{BrokerServer}.""" |
120 | + |
121 | + def __init__(self, name): |
122 | + """ |
123 | + @param name: Name of the broker client. |
124 | + @param protocol: A L{BrokerServerProtocol} connection with the broker |
125 | + server. |
126 | + """ |
127 | + self.name = name |
128 | +>>>>>>> MERGE-SOURCE |
129 | |
130 | === modified file 'landscape/broker/tests/helpers.py' |
131 | --- landscape/broker/tests/helpers.py 2010-02-11 14:29:50 +0000 |
132 | +++ landscape/broker/tests/helpers.py 2010-03-05 12:44:28 +0000 |
133 | @@ -99,6 +99,7 @@ |
134 | test_case.config, test_case.identity, test_case.reactor, |
135 | test_case.exchanger, test_case.pinger, test_case.mstore, |
136 | fetch_async=fetch_func) |
137 | +<<<<<<< TREE |
138 | |
139 | |
140 | class BrokerServerHelper(RegistrationHelper): |
141 | @@ -113,3 +114,19 @@ |
142 | test_case.broker = BrokerServer(test_case.config, test_case.reactor, |
143 | test_case.exchanger, test_case.handler, |
144 | test_case.mstore) |
145 | +======= |
146 | + |
147 | + |
148 | +class BrokerServerHelper(RegistrationHelper): |
149 | + """ |
150 | + This helper adds a broker server to the L{RegistrationHelper}. The |
151 | + following attributes will be set in your test case: |
152 | + - server: A L{BrokerServer}. |
153 | + """ |
154 | + |
155 | + def set_up(self, test_case): |
156 | + super(BrokerServerHelper, self).set_up(test_case) |
157 | + test_case.broker = BrokerServer(test_case.config, test_case.reactor, |
158 | + test_case.exchanger, test_case.handler, |
159 | + test_case.mstore) |
160 | +>>>>>>> MERGE-SOURCE |
161 | |
162 | === modified file 'landscape/broker/tests/test_server.py' |
163 | --- landscape/broker/tests/test_server.py 2010-03-03 16:17:11 +0000 |
164 | +++ landscape/broker/tests/test_server.py 2010-03-05 12:44:28 +0000 |
165 | @@ -1,3 +1,4 @@ |
166 | +<<<<<<< TREE |
167 | from twisted.internet.defer import succeed, fail |
168 | |
169 | from landscape.broker.amp import RemoteClient |
170 | @@ -212,3 +213,219 @@ |
171 | self.reactor.call_on("pre-exit", pre_exit) |
172 | self.reactor.call_on("post-exit", post_exit) |
173 | return self.assertSuccess(self.broker.exit()) |
174 | +======= |
175 | +from twisted.internet.defer import succeed, fail |
176 | + |
177 | +from landscape.broker.amp import RemoteClient |
178 | +from landscape.tests.helpers import LandscapeTest, DEFAULT_ACCEPTED_TYPES |
179 | +from landscape.broker.tests.helpers import BrokerServerHelper |
180 | + |
181 | + |
182 | +class BrokerServerTest(LandscapeTest): |
183 | + |
184 | + helpers = [BrokerServerHelper] |
185 | + |
186 | + def test_ping(self): |
187 | + """ |
188 | + The L{BrokerServer.ping} simply returns C{True}. |
189 | + """ |
190 | + self.assertTrue(self.broker.ping()) |
191 | + |
192 | + def test_send_message(self): |
193 | + """ |
194 | + The L{BrokerServer.send_message} method forwards a message to the |
195 | + broker's exchanger. |
196 | + """ |
197 | + message = {"type": "test"} |
198 | + self.mstore.set_accepted_types(["test"]) |
199 | + self.broker.send_message(message) |
200 | + self.assertMessages(self.mstore.get_pending_messages(), [message]) |
201 | + self.assertFalse(self.exchanger.is_urgent()) |
202 | + |
203 | + def test_send_message_with_urgent(self): |
204 | + """ |
205 | + The L{BrokerServer.send_message} can optionally specify the urgency |
206 | + of the message. |
207 | + """ |
208 | + message = {"type": "test"} |
209 | + self.mstore.set_accepted_types(["test"]) |
210 | + self.broker.send_message(message, True) |
211 | + self.assertMessages(self.mstore.get_pending_messages(), [message]) |
212 | + self.assertTrue(self.exchanger.is_urgent()) |
213 | + |
214 | + def test_is_pending(self): |
215 | + """ |
216 | + The L{BrokerServer.is_pending} method indicates if a message with |
217 | + the given id is pending waiting for delivery in the message store. |
218 | + """ |
219 | + self.assertFalse(self.broker.is_message_pending(123)) |
220 | + message = {"type": "test"} |
221 | + self.mstore.set_accepted_types(["test"]) |
222 | + message_id = self.broker.send_message(message) |
223 | + self.assertTrue(self.broker.is_message_pending(message_id)) |
224 | + |
225 | + def test_register_client(self): |
226 | + """ |
227 | + The L{BrokerServer.register_client} method can be used to register |
228 | + client components that need to communicate with the server. After |
229 | + the registration they can be fetched with L{BrokerServer.get_clients}. |
230 | + """ |
231 | + self.assertEquals(self.broker.get_clients(), []) |
232 | + self.broker.register_client("test") |
233 | + [client] = self.broker.get_clients() |
234 | + self.assertTrue(isinstance(client, RemoteClient)) |
235 | + self.assertEquals(client.name, "test") |
236 | + |
237 | + def test_stop_clients(self): |
238 | + """ |
239 | + The L{BrokerServer.stop_clients} method calls the C{exit} method |
240 | + of each registered client, and returns a deferred resulting in C{None} |
241 | + if all C{exit} calls were successful. |
242 | + """ |
243 | + self.broker.register_client("foo") |
244 | + self.broker.register_client("bar") |
245 | + for client in self.broker.get_clients(): |
246 | + client.exit = self.mocker.mock() |
247 | + self.expect(client.exit()).result(succeed(None)) |
248 | + self.mocker.replay() |
249 | + return self.assertSuccess(self.broker.stop_clients()) |
250 | + |
251 | + def test_stop_clients_with_failure(self): |
252 | + """ |
253 | + The L{BrokerServer.stop_clients} method calls the C{exit} method |
254 | + of each registered client, and returns a deferred resulting in C{None} |
255 | + if all C{exit} calls were successful. |
256 | + """ |
257 | + self.broker.register_client("foo") |
258 | + self.broker.register_client("bar") |
259 | + [client1, client2] = self.broker.get_clients() |
260 | + client1.exit = self.mocker.mock() |
261 | + client2.exit = self.mocker.mock() |
262 | + self.expect(client1.exit()).result(succeed(None)) |
263 | + self.expect(client2.exit()).result(fail(Exception())) |
264 | + self.mocker.replay() |
265 | + return self.assertFailure(self.broker.stop_clients(), Exception) |
266 | + |
267 | + def test_reload_configuration(self): |
268 | + """ |
269 | + The L{BrokerServer.reload_configuration} method forces the config |
270 | + file associated with the broker server to be reloaded. |
271 | + """ |
272 | + open(self.config_filename, "a").write("computer_title = New Title") |
273 | + result = self.broker.reload_configuration() |
274 | + result.addCallback(lambda x: self.assertEquals( |
275 | + self.config.computer_title, "New Title")) |
276 | + return result |
277 | + |
278 | + def test_reload_configuration_stops_clients(self): |
279 | + """ |
280 | + The L{BrokerServer.reload_configuration} method forces the config |
281 | + file associated with the broker server to be reloaded. |
282 | + """ |
283 | + self.broker.register_client("foo") |
284 | + self.broker.register_client("bar") |
285 | + for client in self.broker.get_clients(): |
286 | + client.exit = self.mocker.mock() |
287 | + self.expect(client.exit()).result(succeed(None)) |
288 | + self.mocker.replay() |
289 | + return self.assertSuccess(self.broker.reload_configuration()) |
290 | + |
291 | + def test_register(self): |
292 | + """ |
293 | + The L{BrokerServer.register} method attempts to register with the |
294 | + Ladscape server and waits for a C{set-id} message from it. |
295 | + """ |
296 | + registered = self.broker.register() |
297 | + # This should callback the deferred. |
298 | + self.exchanger.handle_message({"type": "set-id", "id": "abc", |
299 | + "insecure-id": "def"}) |
300 | + return self.assertSuccess(registered) |
301 | + |
302 | + def test_get_accepted_types_empty(self): |
303 | + """ |
304 | + The L{BrokerServer.get_accepted_message_types} returns an empty list |
305 | + if no message types are accepted by the Landscape server. |
306 | + """ |
307 | + self.mstore.set_accepted_types([]) |
308 | + self.assertEquals(self.broker.get_accepted_message_types(), []) |
309 | + |
310 | + def test_get_accepted_message_types(self): |
311 | + """ |
312 | + The L{BrokerServer.get_accepted_message_types} returns the list of |
313 | + message types accepted by the Landscape server. |
314 | + """ |
315 | + self.mstore.set_accepted_types(["foo", "bar"]) |
316 | + self.assertEquals(sorted(self.broker.get_accepted_message_types()), |
317 | + ["bar", "foo"]) |
318 | + |
319 | + def test_get_server_uuid_with_unset_uuid(self): |
320 | + """ |
321 | + The L{BrokerServer.get_server_uuid} method returns C{None} if the uuid |
322 | + of the Landscape server we're pointing at is unknown. |
323 | + """ |
324 | + self.assertEquals(self.broker.get_server_uuid(), None) |
325 | + |
326 | + def test_get_server_uuid(self): |
327 | + """ |
328 | + The L{BrokerServer.get_server_uuid} method returns the uuid of the |
329 | + Landscape server we're pointing at. |
330 | + """ |
331 | + self.mstore.set_server_uuid("the-uuid") |
332 | + self.assertEquals(self.broker.get_server_uuid(), "the-uuid") |
333 | + |
334 | + def test_register_client_accepted_message_type(self): |
335 | + """ |
336 | + The L{BrokerServer.register_client_accepted_message_type} method can |
337 | + register new message types accepted by this Landscape client. |
338 | + """ |
339 | + self.broker.register_client_accepted_message_type("type1") |
340 | + self.broker.register_client_accepted_message_type("type2") |
341 | + self.assertEquals(self.exchanger.get_client_accepted_message_types(), |
342 | + sorted(["type1", "type2"] + DEFAULT_ACCEPTED_TYPES)) |
343 | + |
344 | + def test_exit(self): |
345 | + """ |
346 | + The L{BrokerServer.exit} method stops all registered clients. |
347 | + """ |
348 | + self.broker.register_client("foo") |
349 | + self.broker.register_client("bar") |
350 | + for client in self.broker.get_clients(): |
351 | + client.exit = self.mocker.mock() |
352 | + self.expect(client.exit()).result(succeed(None)) |
353 | + self.mocker.replay() |
354 | + return self.assertSuccess(self.broker.exit()) |
355 | + |
356 | + def test_exit_exits_when_other_daemons_blow_up(self): |
357 | + """ |
358 | + If a broker client blow up in its exit() methods, exit should ignore |
359 | + the error and exit anyway. |
360 | + """ |
361 | + self.broker.register_client("foo") |
362 | + [client] = self.broker.get_clients() |
363 | + client.exit = self.mocker.mock() |
364 | + post_exit = self.mocker.mock() |
365 | + self.expect(client.exit()).result(fail(ZeroDivisionError())) |
366 | + post_exit() |
367 | + self.mocker.replay() |
368 | + self.reactor.call_on("post-exit", post_exit) |
369 | + return self.assertSuccess(self.broker.exit()) |
370 | + |
371 | + def test_exit_fires_reactor_events(self): |
372 | + """ |
373 | + The L{BrokerServer.exit} method fires a C{pre-exit} event before the |
374 | + clients are stopped and a C{post-exit} event after. |
375 | + """ |
376 | + self.broker.register_client("foo") |
377 | + [client] = self.broker.get_clients() |
378 | + self.mocker.order() |
379 | + pre_exit = self.mocker.mock() |
380 | + client.exit = self.mocker.mock() |
381 | + post_exit = self.mocker.mock() |
382 | + pre_exit() |
383 | + self.expect(client.exit()).result(fail(ZeroDivisionError())) |
384 | + post_exit() |
385 | + self.mocker.replay() |
386 | + self.reactor.call_on("pre-exit", pre_exit) |
387 | + self.reactor.call_on("post-exit", post_exit) |
388 | + return self.assertSuccess(self.broker.exit()) |
389 | +>>>>>>> MERGE-SOURCE |
390 | |
391 | === modified file 'landscape/lib/amp.py' |
392 | --- landscape/lib/amp.py 2010-02-08 08:10:48 +0000 |
393 | +++ landscape/lib/amp.py 2010-03-05 12:44:28 +0000 |
394 | @@ -1,3 +1,4 @@ |
395 | +<<<<<<< TREE |
396 | """Expose the methods of a remote object over AMP.""" |
397 | |
398 | from uuid import uuid4 |
399 | @@ -456,3 +457,463 @@ |
400 | """Disconnect the L{RemoteObject} that we have created.""" |
401 | self._factory.stopTrying() |
402 | self._remote._protocol.transport.loseConnection() |
403 | +======= |
404 | +"""Expose the methods of a remote object over AMP.""" |
405 | + |
406 | +from uuid import uuid4 |
407 | +from twisted.internet.defer import Deferred, maybeDeferred |
408 | +from twisted.internet.protocol import ReconnectingClientFactory |
409 | +from twisted.protocols.amp import Argument, String, Command, AMP |
410 | +from twisted.python.failure import Failure |
411 | + |
412 | +from landscape.lib.bpickle import loads, dumps, dumps_table |
413 | + |
414 | + |
415 | +class MethodCallArgument(Argument): |
416 | + """A bpickle-compatible argument.""" |
417 | + |
418 | + def toString(self, inObject): |
419 | + """Serialize an argument.""" |
420 | + return dumps(inObject) |
421 | + |
422 | + def fromString(self, inString): |
423 | + """Unserialize an argument.""" |
424 | + return loads(inString) |
425 | + |
426 | + @classmethod |
427 | + def check(cls, inObject): |
428 | + """Check if an argument is serializable.""" |
429 | + return type(inObject) in dumps_table |
430 | + |
431 | + |
432 | +class MethodCallError(Exception): |
433 | + """Raised when a L{MethodCall} command fails.""" |
434 | + |
435 | + |
436 | +class MethodCall(Command): |
437 | + """Call a method on the object exposed by a L{MethodCallProtocol}.""" |
438 | + |
439 | + arguments = [("method", String()), |
440 | + ("args", MethodCallArgument()), |
441 | + ("kwargs", MethodCallArgument())] |
442 | + |
443 | + response = [("result", MethodCallArgument()), |
444 | + ("deferred", String(optional=True))] |
445 | + |
446 | + errors = {MethodCallError: "METHOD_CALL_ERROR"} |
447 | + |
448 | + |
449 | +class DeferredResponse(Command): |
450 | + """Fire a L{Deferred} associated with an outstanding method call result.""" |
451 | + |
452 | + arguments = [("uuid", String()), |
453 | + ("result", MethodCallArgument(optional=True)), |
454 | + ("failure", String(optional=True))] |
455 | + requiresAnswer = False |
456 | + |
457 | + |
458 | +class MethodCallServerProtocol(AMP): |
459 | + """Expose methods of a local object over AMP. |
460 | + |
461 | + The object to be exposed is expected to be the C{object} attribute of our |
462 | + protocol factory. |
463 | + |
464 | + @cvar methods: The list of exposed object's methods that can be called with |
465 | + the protocol. It must be defined by sub-classes. |
466 | + """ |
467 | + |
468 | + methods = [] |
469 | + |
470 | + @MethodCall.responder |
471 | + def receive_method_call(self, method, args, kwargs): |
472 | + """Call an object's method with the given arguments. |
473 | + |
474 | + If a connected client sends a L{MethodCall} for method C{foo_bar}, then |
475 | + the actual method C{foo_bar} of the object associated with the protocol |
476 | + will be called with the given C{args} and C{kwargs} and its return |
477 | + value delivered back to the client as response to the command. |
478 | + |
479 | + @param method: The name of the object's method to call. |
480 | + @param args: The arguments to pass to the method. |
481 | + @param kwargs: The keywords arguments to pass to the method. |
482 | + """ |
483 | + if not method in self.methods: |
484 | + raise MethodCallError("Forbidden method '%s'" % method) |
485 | + |
486 | + method_func = getattr(self.factory.object, method) |
487 | + result = maybeDeferred(method_func, *args, **kwargs) |
488 | + |
489 | + # If the Deferred was already fired, we can return its result |
490 | + if result.called: |
491 | + if isinstance(result.result, Failure): |
492 | + failure = str(result.result.value) |
493 | + result.addErrback(lambda error: None) # Stop propagating |
494 | + raise MethodCallError(failure) |
495 | + return {"result": self._check_result(result.result)} |
496 | + |
497 | + uuid = str(uuid4()) |
498 | + result.addBoth(self.send_deferred_response, uuid) |
499 | + return {"result": None, "deferred": uuid} |
500 | + |
501 | + def _check_result(self, result): |
502 | + """Check that the C{result} we're about to return is serializable. |
503 | + |
504 | + @return: The C{result} itself if valid. |
505 | + @raises: L{MethodCallError} if C{result} is not serializable. |
506 | + """ |
507 | + if not MethodCallArgument.check(result): |
508 | + raise MethodCallError("Non-serializable result") |
509 | + return result |
510 | + |
511 | + def send_deferred_response(self, result, uuid): |
512 | + """Send a L{DeferredResponse} for the deferred with given C{uuid}. |
513 | + |
514 | + This is called when the result of a L{Deferred} returned by an |
515 | + object's method becomes available. A L{DeferredResponse} notifying |
516 | + such result (either success or failure) is sent to the peer. |
517 | + """ |
518 | + kwargs = {"uuid": uuid} |
519 | + if isinstance(result, Failure): |
520 | + kwargs["failure"] = str(result.value) |
521 | + else: |
522 | + kwargs["result"] = self._check_result(result) |
523 | + self.callRemote(DeferredResponse, **kwargs) |
524 | + |
525 | + |
526 | +class MethodCallClientProtocol(AMP): |
527 | + """Calls methods of a remote object over L{AMP}. |
528 | + |
529 | + @note: If the remote method returns a deferred, the associated local |
530 | + deferred returned by L{send_method_call} will result in the same |
531 | + callback value of the remote deferred. |
532 | + @cvar timeout: A timeout for remote methods returning L{Deferred}s, if a |
533 | + response for the deferred is not received within this amount of |
534 | + seconds, the remote method call will errback with a L{MethodCallError}. |
535 | + """ |
536 | + timeout = 60 |
537 | + |
538 | + def __init__(self): |
539 | + AMP.__init__(self) |
540 | + self._pending_responses = {} |
541 | + |
542 | + @DeferredResponse.responder |
543 | + def receive_deferred_response(self, uuid, result, failure): |
544 | + """Receive the deferred L{MethodCall} response. |
545 | + |
546 | + @param uuid: The id of the L{MethodCall} we're getting the result of. |
547 | + @param result: The result of the associated deferred if successful. |
548 | + @param failure: The failure message of the deferred if it failed. |
549 | + """ |
550 | + self.fire_pending_response(uuid, result, failure) |
551 | + return {} |
552 | + |
553 | + def fire_pending_response(self, uuid, result, failure): |
554 | + """Fire the L{Deferred} associated with a pending response. |
555 | + |
556 | + @param uuid: The id of the L{MethodCall} we're getting the result of. |
557 | + @param result: The result of the associated deferred if successful. |
558 | + @param failure: The failure message of the deferred if it failed. |
559 | + """ |
560 | + try: |
561 | + deferred, call = self._pending_responses.pop(uuid) |
562 | + except KeyError: |
563 | + # Late response for a request that has timeout, just ignore it |
564 | + return |
565 | + if not call.called: |
566 | + call.cancel() |
567 | + if failure is None: |
568 | + deferred.callback({"result": result}) |
569 | + else: |
570 | + deferred.errback(MethodCallError(failure)) |
571 | + |
572 | + def handle_response(self, response): |
573 | + """Handle a L{MethodCall} response. |
574 | + |
575 | + If the response is tagged as deferred, it will be queued as pending, |
576 | + and a L{Deferred} is returned, which will be fired as soon as the |
577 | + final response becomes available, or the timeout is reached. |
578 | + """ |
579 | + if response["deferred"]: |
580 | + uuid = response["deferred"] |
581 | + deferred = Deferred() |
582 | + call = self.factory.reactor.callLater(self.timeout, |
583 | + self.fire_pending_response, |
584 | + uuid, None, "timeout") |
585 | + self._pending_responses[uuid] = (deferred, call) |
586 | + return deferred |
587 | + |
588 | + return response |
589 | + |
590 | + def send_method_call(self, method, args=[], kwargs={}): |
591 | + """Send a L{MethodCall} command with the given arguments. |
592 | + |
593 | + @param method: The name of the remote method to invoke. |
594 | + @param args: The positional arguments to pass to the remote method. |
595 | + @param kwargs: The keyword arguments to pass to the remote method. |
596 | + """ |
597 | + result = self.callRemote(MethodCall, |
598 | + method=method, args=args, kwargs=kwargs) |
599 | + # The result can be C{None} only if the requested command is a |
600 | + # DeferredResponse, which has requiresAnswer set to False |
601 | + if result is not None: |
602 | + return result.addCallback(self.handle_response) |
603 | + |
604 | + |
605 | +class MethodCallProtocol(MethodCallServerProtocol, MethodCallClientProtocol): |
606 | + """Can be used both for sending and receiving L{MethodCall}s.""" |
607 | + |
608 | + def __init__(self): |
609 | + MethodCallServerProtocol.__init__(self) |
610 | + MethodCallClientProtocol.__init__(self) |
611 | + |
612 | + |
613 | +class MethodCallFactory(ReconnectingClientFactory): |
614 | + """ |
615 | + Factory for L{MethodCallProtocol}s exposing an object or connecting to |
616 | + to L{MethodCall} servers. |
617 | + |
618 | + When used to connect, if the connection fails or is lost the factory |
619 | + will keep retrying to establish it. |
620 | + |
621 | + @cvar protocol: The factory used to build protocol instances. |
622 | + @cvar factor: The time factor by which the delay between two subsequent |
623 | + connection retries will increase. |
624 | + """ |
625 | + |
626 | + protocol = MethodCallProtocol |
627 | + factor = 1.6180339887498948 |
628 | + |
629 | + def __init__(self, object=None, reactor=None): |
630 | + """ |
631 | + @param object: The object exposed by the L{MethodCallProtocol}s |
632 | + instances created by this factory. |
633 | + @param reactor: The reactor used by the created protocols |
634 | + to schedule notifications and timeouts. |
635 | + """ |
636 | + self.object = object |
637 | + self.reactor = reactor |
638 | + self.clock = self.reactor |
639 | + self._notifiers = [] |
640 | + |
641 | + def add_notifier(self, callback, errback=None): |
642 | + """Call the given function on connection, reconnection or give up. |
643 | + |
644 | + @param notifier: A function that will be called when the factory builds |
645 | + a new connected protocol or gives up connecting. It will be passed |
646 | + the new protocol instance as argument, or the connectionf failure. |
647 | + """ |
648 | + self._notifiers.append((callback, errback)) |
649 | + |
650 | + def remove_notifier(self, callback, errback=None): |
651 | + """Remove a notifier.""" |
652 | + self._notifiers.remove((callback, errback)) |
653 | + |
654 | + def notify_success(self, *args, **kwargs): |
655 | + """Notify all registered notifier callbacks.""" |
656 | + for callback, _ in self._notifiers: |
657 | + self.reactor.callLater(0, callback, *args, **kwargs) |
658 | + |
659 | + def notify_failure(self, failure): |
660 | + """Notify all registered notifier errbacks.""" |
661 | + for _, errback in self._notifiers: |
662 | + if errback is not None: |
663 | + self.reactor.callLater(0, errback, failure) |
664 | + |
665 | + def clientConnectionFailed(self, connector, reason): |
666 | + ReconnectingClientFactory.clientConnectionFailed(self, connector, |
667 | + reason) |
668 | + if self.maxRetries is not None and (self.retries > self.maxRetries): |
669 | + self.notify_failure(reason) # Give up |
670 | + |
671 | + def buildProtocol(self, addr): |
672 | + self.resetDelay() |
673 | + protocol = self.protocol() |
674 | + protocol.factory = self |
675 | + self.notify_success(protocol) |
676 | + return protocol |
677 | + |
678 | + |
679 | +class RemoteObject(object): |
680 | + """An object able to transparently call methods on a remote object. |
681 | + |
682 | + Any method call on a L{RemoteObject} instance will return a L{Deferred} |
683 | + resulting in the return value of the same method call performed on |
684 | + the remote object exposed by the peer. |
685 | + """ |
686 | + |
687 | + def __init__(self, protocol, retry_on_reconnect=False, timeout=None): |
688 | + """ |
689 | + @param protocol: A reference to a connected L{AMP} protocol instance, |
690 | + which will be used to send L{MethodCall} commands. |
691 | + @param retry_on_reconnect: If C{True}, this L{RemoteObject} will retry |
692 | + to perform again requests that failed due to a lost connection, as |
693 | + soon as a new connection is available. |
694 | + @param timeout: A timeout for failed requests, if the L{RemoteObject} |
695 | + can't perform them again successfully within this number of |
696 | + seconds, they will errback with a L{MethodCallError}. |
697 | + """ |
698 | + self._protocol = protocol |
699 | + self._factory = protocol.factory |
700 | + self._reactor = protocol.factory.reactor |
701 | + self._retry_on_reconnect = retry_on_reconnect |
702 | + self._timeout = timeout |
703 | + self._pending_requests = {} |
704 | + self._factory.add_notifier(self._handle_reconnect) |
705 | + |
706 | + def __getattr__(self, method): |
707 | + """Return a function sending a L{MethodCall} for the given C{method}. |
708 | + |
709 | + When the created function is called, it sends the an appropriate |
710 | + L{MethodCall} to the remote peer passing it the arguments and |
711 | + keyword arguments it was called with, and returning a L{Deferred} |
712 | + resulting in the L{MethodCall}'s response value. |
713 | + """ |
714 | + |
715 | + def send_method_call(*args, **kwargs): |
716 | + result = self._protocol.send_method_call(method=method, |
717 | + args=args, |
718 | + kwargs=kwargs) |
719 | + deferred = Deferred() |
720 | + result.addCallback(self._handle_response, deferred) |
721 | + result.addErrback(self._handle_failure, method, args, kwargs, |
722 | + deferred) |
723 | + return deferred |
724 | + |
725 | + return send_method_call |
726 | + |
727 | + def _handle_response(self, response, deferred, call=None): |
728 | + """Handles a successful L{MethodCall} response. |
729 | + |
730 | + @param response: The L{MethodCall} response. |
731 | + @param deferred: The deferred that was returned to the caller. |
732 | + @param call: If not C{None}, the scheduled timeout call associated with |
733 | + the given deferred. |
734 | + """ |
735 | + result = response["result"] |
736 | + if call is not None: |
737 | + call.cancel() # This is a successful retry, cancel the timeout. |
738 | + deferred.callback(result) |
739 | + |
740 | + def _handle_failure(self, failure, method, args, kwargs, deferred, |
741 | + call=None): |
742 | + """Called when a L{MethodCall} command fails. |
743 | + |
744 | + If a failure is due to a connection error and if C{retry_on_reconnect} |
745 | + is C{True}, we will try to perform the requested L{MethodCall} again |
746 | + as soon as a new connection becomes available, giving up after the |
747 | + specified C{timeout}, if any. |
748 | + |
749 | + @param failure: The L{Failure} raised by the requested L{MethodCall}. |
750 | + @param method: The method name associated with the failed L{MethodCall}. |
751 | + @param args: The positional arguments of the failed L{MethodCall}. |
752 | + @param kwargs: The keyword arguments of the failed L{MethodCall}. |
753 | + @param deferred: The deferred that was returned to the caller. |
754 | + @param call: If not C{None}, the scheduled timeout call associated with |
755 | + the given deferred. |
756 | + """ |
757 | + is_method_call_error = failure.type is MethodCallError |
758 | + dont_retry = self._retry_on_reconnect == False |
759 | + |
760 | + if is_method_call_error or dont_retry: |
761 | + # This means either that the connection is working, and a |
762 | + # MethodCall protocol error occurred, or that we gave up |
763 | + # trying and raised a timeout. In any case just propagate |
764 | + # the error. |
765 | + if deferred in self._pending_requests: |
766 | + self._pending_requests.pop(deferred) |
767 | + if call: |
768 | + call.cancel() |
769 | + deferred.errback(failure) |
770 | + return |
771 | + |
772 | + if self._timeout and call is None: |
773 | + # This is the first failure for this request, let's schedule a |
774 | + # timeout call. |
775 | + timeout = Failure(MethodCallError("timeout")) |
776 | + call = self._reactor.callLater(self._timeout, |
777 | + self._handle_failure, |
778 | + timeout, method, args, |
779 | + kwargs, deferred=deferred) |
780 | + |
781 | + self._pending_requests[deferred] = (method, args, kwargs, call) |
782 | + |
783 | + def _handle_reconnect(self, protocol): |
784 | + """Handles a reconnection. |
785 | + |
786 | + @param protocol: The newly connected protocol instance. |
787 | + """ |
788 | + self._protocol = protocol |
789 | + if self._retry_on_reconnect: |
790 | + self._retry() |
791 | + |
792 | + def _retry(self): |
793 | + """Try to perform again requests that failed.""" |
794 | + |
795 | + # We need to copy the requests list before iterating over it, because |
796 | + # if we are actually still disconnected, callRemote will return a |
797 | + # failed deferred and the _handle_failure errback will be executed |
798 | + # synchronously during the loop, modifying the requests list itself. |
799 | + requests = self._pending_requests.copy() |
800 | + self._pending_requests.clear() |
801 | + |
802 | + while requests: |
803 | + deferred, (method, args, kwargs, call) = requests.popitem() |
804 | + result = self._protocol.send_method_call(method, args, kwargs) |
805 | + result.addCallback(self._handle_response, |
806 | + deferred=deferred, call=call) |
807 | + result.addErrback(self._handle_failure, method, args, kwargs, |
808 | + deferred=deferred, call=call) |
809 | + |
810 | + |
811 | +class RemoteObjectCreator(object): |
812 | + """Connect to remote objects exposed by a L{MethodCallProtocol}.""" |
813 | + |
814 | + factory = MethodCallFactory |
815 | + remote = RemoteObject |
816 | + |
817 | + def __init__(self, reactor, socket_path, *args, **kwargs): |
818 | + """ |
819 | + @param reactor: A reactor able to connect to Unix sockets. |
820 | + @param socket_path: The path to the socket we want to connect to. |
821 | + @param args: Arguments to be passed to the created L{RemoteObject}. |
822 | + @param kwargs: Keyword arguments for the created L{RemoteObject}. |
823 | + """ |
824 | + self._socket_path = socket_path |
825 | + self._reactor = reactor |
826 | + self._args = args |
827 | + self._kwargs = kwargs |
828 | + |
829 | + def connect(self, max_retries=None): |
830 | + """Connect to a remote object exposed by a L{MethodCallProtocol}. |
831 | + |
832 | + This method will connect to the socket provided in the constructor |
833 | + and return a L{Deferred} resulting in a connected L{RemoteObject}. |
834 | + |
835 | + @param max_retries: If not C{None}, give up trying to connect after |
836 | + this number of retries. |
837 | + """ |
838 | + self._connected = Deferred() |
839 | + self._factory = self.factory(reactor=self._reactor) |
840 | + self._factory.maxRetries = max_retries |
841 | + self._factory.add_notifier(self._success, self._failure) |
842 | + self._reactor.connectUNIX(self._socket_path, self._factory) |
843 | + return self._connected |
844 | + |
845 | + def _success(self, result): |
846 | + """Called when the first connection has been established""" |
847 | + |
848 | + # We did our job, remove our own notifier and let the remote object |
849 | + # handle reconnections. |
850 | + self._factory.remove_notifier(self._success, self._failure) |
851 | + self._remote = self.remote(result, *self._args, **self._kwargs) |
852 | + self._connected.callback(self._remote) |
853 | + |
854 | + def _failure(self, failure): |
855 | + """Called when the first connection has failed""" |
856 | + self._connected.errback(failure) |
857 | + |
858 | + def disconnect(self): |
859 | + """Disconnect the L{RemoteObject} that we have created.""" |
860 | + self._factory.stopTrying() |
861 | + self._remote._protocol.transport.loseConnection() |
862 | +>>>>>>> MERGE-SOURCE |
863 | |
864 | === modified file 'landscape/lib/tests/test_amp.py' |
865 | --- landscape/lib/tests/test_amp.py 2010-02-12 09:50:07 +0000 |
866 | +++ landscape/lib/tests/test_amp.py 2010-03-05 12:44:28 +0000 |
867 | @@ -1,3 +1,4 @@ |
868 | +<<<<<<< TREE |
869 | from twisted.internet import reactor |
870 | from twisted.internet.defer import Deferred, DeferredList |
871 | from twisted.internet.protocol import ClientCreator |
872 | @@ -720,3 +721,718 @@ |
873 | self.assertFailure(result, MethodCallError) |
874 | reconnected = Deferred() |
875 | return result.addCallback(assert_failure) |
876 | +======= |
877 | +from twisted.internet import reactor |
878 | +from twisted.internet.defer import Deferred, DeferredList |
879 | +from twisted.internet.protocol import ClientCreator |
880 | +from twisted.internet.error import ConnectionDone, ConnectError |
881 | +from twisted.internet.task import Clock |
882 | + |
883 | +from landscape.lib.amp import ( |
884 | + MethodCallError, MethodCallProtocol, MethodCallFactory, RemoteObject, |
885 | + RemoteObjectCreator) |
886 | +from landscape.tests.helpers import LandscapeTest |
887 | +from landscape.tests.mocker import KWARGS |
888 | + |
889 | + |
890 | +class WordsException(Exception): |
891 | + """Test exception.""" |
892 | + |
893 | + |
894 | +class Words(object): |
895 | + """Test class to be used as target object of a L{MethodCallProtocol}.""" |
896 | + |
897 | + def secret(self): |
898 | + raise RuntimeError("I'm not supposed to be called!") |
899 | + |
900 | + def empty(self): |
901 | + pass |
902 | + |
903 | + def motd(self): |
904 | + return "Words are cool" |
905 | + |
906 | + def capitalize(self, word): |
907 | + return word.capitalize() |
908 | + |
909 | + def is_short(self, word): |
910 | + return len(word) < 4 |
911 | + |
912 | + def concatenate(self, word1, word2): |
913 | + return word1 + word2 |
914 | + |
915 | + def lower_case(self, word, index=None): |
916 | + if index is None: |
917 | + return word.lower() |
918 | + else: |
919 | + return word[:index] + word[index:].lower() |
920 | + |
921 | + def multiply_alphabetically(self, word_times): |
922 | + result = "" |
923 | + for word, times in sorted(word_times.iteritems()): |
924 | + result += word * times |
925 | + return result |
926 | + |
927 | + def meaning_of_life(self): |
928 | + |
929 | + class Complex(object): |
930 | + pass |
931 | + return Complex() |
932 | + |
933 | + def _check(self, word, seed, value=3): |
934 | + if seed == "cool" and value == 4: |
935 | + return "Guessed!" |
936 | + |
937 | + def guess(self, word, *args, **kwargs): |
938 | + return self._check(word, *args, **kwargs) |
939 | + |
940 | + def translate(self, word): |
941 | + raise WordsException("Unknown word") |
942 | + |
943 | + def google(self, word): |
944 | + deferred = Deferred() |
945 | + if word == "Landscape": |
946 | + reactor.callLater(0.01, lambda: deferred.callback("Cool!")) |
947 | + elif word == "Easy query": |
948 | + deferred.callback("Done!") |
949 | + elif word == "Weird stuff": |
950 | + reactor.callLater(0.01, lambda: deferred.errback(Exception("bad"))) |
951 | + elif word == "Censored": |
952 | + deferred.errback(Exception("very bad")) |
953 | + elif word == "Long query": |
954 | + # Do nothing, the deferred won't be fired at all |
955 | + pass |
956 | + elif word == "Slowish query": |
957 | + # Fire the result after a while. |
958 | + reactor.callLater(0.05, lambda: deferred.callback("Done!")) |
959 | + return deferred |
960 | + |
961 | + |
962 | +class WordsProtocol(MethodCallProtocol): |
963 | + |
964 | + methods = ["empty", |
965 | + "motd", |
966 | + "capitalize", |
967 | + "is_short", |
968 | + "concatenate", |
969 | + "lower_case", |
970 | + "multiply_alphabetically", |
971 | + "translate", |
972 | + "meaning_of_life", |
973 | + "guess", |
974 | + "google"] |
975 | + |
976 | + timeout = 0.1 |
977 | + |
978 | + |
979 | +class WordsFactory(MethodCallFactory): |
980 | + |
981 | + protocol = WordsProtocol |
982 | + factor = 0.19 |
983 | + |
984 | + |
985 | +class RemoteWordsCreator(RemoteObjectCreator): |
986 | + |
987 | + factory = WordsFactory |
988 | + |
989 | + |
990 | +class MethodCallProtocolTest(LandscapeTest): |
991 | + |
992 | + def setUp(self): |
993 | + super(MethodCallProtocolTest, self).setUp() |
994 | + socket = self.mktemp() |
995 | + factory = WordsFactory(object=Words()) |
996 | + self.port = reactor.listenUNIX(socket, factory) |
997 | + |
998 | + def set_protocol(protocol): |
999 | + self.protocol = protocol |
1000 | + |
1001 | + connector = ClientCreator(reactor, WordsProtocol) |
1002 | + connected = connector.connectUNIX(socket) |
1003 | + return connected.addCallback(set_protocol) |
1004 | + |
1005 | + def tearDown(self): |
1006 | + self.protocol.transport.loseConnection() |
1007 | + self.port.stopListening() |
1008 | + super(MethodCallProtocolTest, self).tearDown() |
1009 | + |
1010 | + def test_with_forbidden_method(self): |
1011 | + """ |
1012 | + If a method is not included in L{MethodCallProtocol.methods} it |
1013 | + can't be called. |
1014 | + """ |
1015 | + result = self.protocol.send_method_call(method="secret", |
1016 | + args=[], |
1017 | + kwargs={}) |
1018 | + return self.assertFailure(result, MethodCallError) |
1019 | + |
1020 | + def test_with_no_arguments(self): |
1021 | + """ |
1022 | + A connected client can issue a L{MethodCall} without arguments and |
1023 | + with an empty response. |
1024 | + """ |
1025 | + result = self.protocol.send_method_call(method="empty", |
1026 | + args=[], |
1027 | + kwargs={}) |
1028 | + return self.assertSuccess(result, {"result": None, |
1029 | + "deferred": None}) |
1030 | + |
1031 | + def test_with_return_value(self): |
1032 | + """ |
1033 | + A connected client can issue a L{MethodCall} targeted to an |
1034 | + object method with a return value. |
1035 | + """ |
1036 | + result = self.protocol.send_method_call(method="motd", |
1037 | + args=[], |
1038 | + kwargs={}) |
1039 | + return self.assertSuccess(result, {"result": "Words are cool", |
1040 | + "deferred": None}) |
1041 | + |
1042 | + def test_with_one_argument(self): |
1043 | + """ |
1044 | + A connected AMP client can issue a L{MethodCall} with one argument and |
1045 | + a response value. |
1046 | + """ |
1047 | + result = self.protocol.send_method_call(method="capitalize", |
1048 | + args=["john"], |
1049 | + kwargs={}) |
1050 | + return self.assertSuccess(result, {"result": "John", |
1051 | + "deferred": None}) |
1052 | + |
1053 | + def test_with_boolean_return_value(self): |
1054 | + """ |
1055 | + The return value of a L{MethodCall} argument can be a boolean. |
1056 | + """ |
1057 | + result = self.protocol.send_method_call(method="is_short", |
1058 | + args=["hi"], |
1059 | + kwargs={}) |
1060 | + return self.assertSuccess(result, {"result": True, |
1061 | + "deferred": None}) |
1062 | + |
1063 | + def test_with_many_arguments(self): |
1064 | + """ |
1065 | + A connected client can issue a L{MethodCall} with many arguments. |
1066 | + """ |
1067 | + result = self.protocol.send_method_call(method="concatenate", |
1068 | + args=["You ", "rock"], |
1069 | + kwargs={}) |
1070 | + return self.assertSuccess(result, {"result": "You rock", |
1071 | + "deferred": None}) |
1072 | + |
1073 | + def test_with_default_arguments(self): |
1074 | + """ |
1075 | + A connected client can issue a L{MethodCall} for methods having |
1076 | + default arguments. |
1077 | + """ |
1078 | + result = self.protocol.send_method_call(method="lower_case", |
1079 | + args=["OHH"], |
1080 | + kwargs={}) |
1081 | + return self.assertSuccess(result, {"result": "ohh", |
1082 | + "deferred": None}) |
1083 | + |
1084 | + def test_with_overriden_default_arguments(self): |
1085 | + """ |
1086 | + A connected client can issue a L{MethodCall} with keyword arguments |
1087 | + having default values in the target object. If a value is specified by |
1088 | + the caller it will be used in place of the default value |
1089 | + """ |
1090 | + result = self.protocol.send_method_call(method="lower_case", |
1091 | + args=["OHH"], |
1092 | + kwargs={"index": 2}) |
1093 | + return self.assertSuccess(result, {"result": "OHh", |
1094 | + "deferred": None}) |
1095 | + |
1096 | + def test_with_dictionary_arguments(self): |
1097 | + """ |
1098 | + Method arguments passed to a L{MethodCall} can be dictionaries. |
1099 | + """ |
1100 | + result = self.protocol.send_method_call(method="multiply_" |
1101 | + "alphabetically", |
1102 | + args=[{"foo": 2, "bar": 3}], |
1103 | + kwargs={}) |
1104 | + return self.assertSuccess(result, {"result": "barbarbarfoofoo", |
1105 | + "deferred": None}) |
1106 | + |
1107 | + def test_with_non_serializable_return_value(self): |
1108 | + """ |
1109 | + If the target object method returns an object that can't be serialized, |
1110 | + the L{MethodCall} fails with a L{MethodCallError}. |
1111 | + """ |
1112 | + result = self.protocol.send_method_call(method="meaning_of_life", |
1113 | + args=[], |
1114 | + kwargs={}) |
1115 | + return self.assertFailure(result, MethodCallError) |
1116 | + |
1117 | + def test_translate(self): |
1118 | + """ |
1119 | + If the target object method raises an exception, the remote call fails |
1120 | + with a L{MethodCallError}. |
1121 | + """ |
1122 | + result = self.protocol.send_method_call(method="translate", |
1123 | + args=["hi"], |
1124 | + kwargs={}) |
1125 | + return self.assertFailure(result, MethodCallError) |
1126 | + |
1127 | + |
1128 | +class RemoteObjectTest(LandscapeTest): |
1129 | + |
1130 | + def setUp(self): |
1131 | + super(RemoteObjectTest, self).setUp() |
1132 | + socket = self.mktemp() |
1133 | + server_factory = WordsFactory(object=Words()) |
1134 | + self.port = reactor.listenUNIX(socket, server_factory) |
1135 | + |
1136 | + def set_remote(protocol): |
1137 | + self.protocol = protocol |
1138 | + self.words = RemoteObject(protocol) |
1139 | + client_factory.stopTrying() |
1140 | + |
1141 | + connected = Deferred() |
1142 | + connected.addCallback(set_remote) |
1143 | + client_factory = WordsFactory(reactor=reactor) |
1144 | + client_factory.add_notifier(connected.callback) |
1145 | + reactor.connectUNIX(socket, client_factory) |
1146 | + return connected |
1147 | + |
1148 | + def tearDown(self): |
1149 | + self.protocol.transport.loseConnection() |
1150 | + self.port.stopListening() |
1151 | + super(RemoteObjectTest, self).tearDown() |
1152 | + |
1153 | + def test_method_call_sender_with_forbidden_method(self): |
1154 | + """ |
1155 | + A L{RemoteObject} can't call methods that are not included in |
1156 | + L{MethodCallProtocol.methods}: the call fails with L{MethodCallError}. |
1157 | + """ |
1158 | + result = self.words.secret() |
1159 | + return self.assertFailure(result, MethodCallError) |
1160 | + |
1161 | + def test_with_no_arguments(self): |
1162 | + """ |
1163 | + A L{RemoteObject} can send L{MethodCall}s without arguments and with |
1164 | + an empty response. |
1165 | + """ |
1166 | + return self.assertSuccess(self.words.empty()) |
1167 | + |
1168 | + def test_with_return_value(self): |
1169 | + """ |
1170 | + A L{RemoteObject} can send L{MethodCall}s without arguments and get |
1171 | + back the value of the command's response. |
1172 | + """ |
1173 | + result = self.words.motd() |
1174 | + return self.assertSuccess(result, "Words are cool") |
1175 | + |
1176 | + def test_with_one_argument(self): |
1177 | + """ |
1178 | + A L{RemoteObject} can send L{MethodCall}s with one argument and get |
1179 | + the response value. |
1180 | + """ |
1181 | + result = self.words.capitalize("john") |
1182 | + return self.assertSuccess(result, "John") |
1183 | + |
1184 | + def test_with_one_keyword_argument(self): |
1185 | + """ |
1186 | + A L{RemoteObject} can send L{MethodCall}s with a named argument. |
1187 | + """ |
1188 | + result = self.words.capitalize(word="john") |
1189 | + return self.assertSuccess(result, "John") |
1190 | + |
1191 | + def test_with_boolean_return_value(self): |
1192 | + """ |
1193 | + The return value of a L{MethodCall} argument can be a boolean. |
1194 | + """ |
1195 | + return self.assertSuccess(self.words.is_short("hi"), True) |
1196 | + |
1197 | + def test_with_many_arguments(self): |
1198 | + """ |
1199 | + A L{RemoteObject} can send L{MethodCall}s with more than one argument. |
1200 | + """ |
1201 | + result = self.words.concatenate("You ", "rock") |
1202 | + return self.assertSuccess(result, "You rock") |
1203 | + |
1204 | + def test_with_many_keyword_arguments(self): |
1205 | + """ |
1206 | + A L{RemoteObject} can send L{MethodCall}s with several |
1207 | + named arguments. |
1208 | + """ |
1209 | + result = self.words.concatenate(word2="rock", word1="You ") |
1210 | + return self.assertSuccess(result, "You rock") |
1211 | + |
1212 | + def test_with_default_arguments(self): |
1213 | + """ |
1214 | + A L{RemoteObject} can send a L{MethodCall} having an argument with |
1215 | + a default value. |
1216 | + """ |
1217 | + result = self.words.lower_case("OHH") |
1218 | + return self.assertSuccess(result, "ohh") |
1219 | + |
1220 | + def test_with_overriden_default_arguments(self): |
1221 | + """ |
1222 | + A L{RemoteObject} can send L{MethodCall}s overriding the default |
1223 | + value of an argument. |
1224 | + """ |
1225 | + result = self.words.lower_case("OHH", 2) |
1226 | + return self.assertSuccess(result, "OHh") |
1227 | + |
1228 | + def test_with_dictionary_arguments(self): |
1229 | + """ |
1230 | + A L{RemoteObject} can send L{MethodCall}s for methods requiring |
1231 | + dictionary arguments. |
1232 | + """ |
1233 | + result = self.words.multiply_alphabetically({"foo": 2, "bar": 3}) |
1234 | + return self.assertSuccess(result, "barbarbarfoofoo") |
1235 | + |
1236 | + def test_with_generic_args_and_kwargs(self): |
1237 | + """ |
1238 | + A L{RemoteObject} behaves well with L{MethodCall}s for methods |
1239 | + having generic C{*args} and C{**kwargs} arguments. |
1240 | + """ |
1241 | + result = self.words.guess("word", "cool", value=4) |
1242 | + return self.assertSuccess(result, "Guessed!") |
1243 | + |
1244 | + def test_with_success_full_deferred(self): |
1245 | + """ |
1246 | + If the target object method returns a L{Deferred}, it is handled |
1247 | + transparently. |
1248 | + """ |
1249 | + result = self.words.google("Landscape") |
1250 | + return self.assertSuccess(result, "Cool!") |
1251 | + |
1252 | + def test_with_failing_deferred(self): |
1253 | + """ |
1254 | + If the target object method returns a failing L{Deferred}, a |
1255 | + L{MethodCallError} is raised. |
1256 | + """ |
1257 | + result = self.words.google("Weird stuff") |
1258 | + return self.assertFailure(result, MethodCallError) |
1259 | + |
1260 | + def test_with_already_callback_deferred(self): |
1261 | + """ |
1262 | + The target object method can return an already fired L{Deferred}. |
1263 | + """ |
1264 | + result = self.words.google("Easy query") |
1265 | + return self.assertSuccess(result, "Done!") |
1266 | + |
1267 | + def test_with_already_errback_deferred(self): |
1268 | + """ |
1269 | + The target object method can return an already failed L{Deferred}. |
1270 | + """ |
1271 | + result = self.words.google("Censored") |
1272 | + return self.assertFailure(result, MethodCallError) |
1273 | + |
1274 | + def test_with_deferred_timeout(self): |
1275 | + """ |
1276 | + If the peer protocol doesn't send a response for a deferred within |
1277 | + the given timeout, the method call fails. |
1278 | + """ |
1279 | + result = self.words.google("Long query") |
1280 | + return self.assertFailure(result, MethodCallError) |
1281 | + |
1282 | + def test_with_late_response(self): |
1283 | + """ |
1284 | + If the peer protocol sends a late response for a request that has |
1285 | + already timed out, that response is ignored. |
1286 | + """ |
1287 | + self.protocol.timeout = 0.01 |
1288 | + result = self.words.google("Slowish query") |
1289 | + self.assertFailure(result, MethodCallError) |
1290 | + |
1291 | + def assert_late_response_is_handled(ignored): |
1292 | + deferred = Deferred() |
1293 | + # We wait a bit to be sure that the late response gets delivered |
1294 | + reactor.callLater(0.1, lambda: deferred.callback(None)) |
1295 | + return deferred |
1296 | + |
1297 | + return result.addCallback(assert_late_response_is_handled) |
1298 | + |
1299 | + |
1300 | +class MethodCallFactoryTest(LandscapeTest): |
1301 | + |
1302 | + def setUp(self): |
1303 | + super(MethodCallFactoryTest, self).setUp() |
1304 | + self.clock = Clock() |
1305 | + self.factory = WordsFactory(reactor=self.clock) |
1306 | + |
1307 | + def test_add_notifier(self): |
1308 | + """ |
1309 | + The L{MethodCallFactory.add_notifier} method can be used to |
1310 | + add a callback function to be called when a connection is made and |
1311 | + a new protocol instance built. |
1312 | + """ |
1313 | + protocol = self.factory.protocol() |
1314 | + self.factory.protocol = lambda: protocol |
1315 | + callback = self.mocker.mock() |
1316 | + callback(protocol) |
1317 | + self.mocker.replay() |
1318 | + self.factory.add_notifier(callback) |
1319 | + self.factory.buildProtocol(None) |
1320 | + self.clock.advance(0) |
1321 | + |
1322 | + def test_remove_notifier(self): |
1323 | + """ |
1324 | + The L{MethodCallFactory.remove_notifier} method can be used to |
1325 | + remove a previously added notifier callback. |
1326 | + """ |
1327 | + callback = lambda protocol: 1 / 0 |
1328 | + self.factory.add_notifier(callback) |
1329 | + self.factory.remove_notifier(callback) |
1330 | + self.factory.buildProtocol(None) |
1331 | + self.clock.advance(0) |
1332 | + |
1333 | + def test_client_connection_failed(self): |
1334 | + """ |
1335 | + The L{MethodCallFactory} keeps trying to connect if maxRetries |
1336 | + is not reached. |
1337 | + """ |
1338 | + # This is sub-optimal but the ReconnectingFactory in Hardy's Twisted |
1339 | + # doesn't support task.Clock |
1340 | + self.factory.retry = self.mocker.mock() |
1341 | + self.factory.retry(KWARGS) |
1342 | + self.mocker.replay() |
1343 | + self.assertEquals(self.factory.retries, 0) |
1344 | + self.factory.clientConnectionFailed(None, None) |
1345 | + |
1346 | + def test_client_connection_failed_with_max_retries_reached(self): |
1347 | + """ |
1348 | + The L{MethodCallFactory} stops trying to connect if maxRetries |
1349 | + is reached. |
1350 | + """ |
1351 | + callback = lambda protocol: 1 / 0 |
1352 | + errback = self.mocker.mock() |
1353 | + errback("failure") |
1354 | + self.mocker.replay() |
1355 | + |
1356 | + self.factory.add_notifier(callback, errback) |
1357 | + self.factory.maxRetries = 1 |
1358 | + self.factory.retries = self.factory.maxRetries |
1359 | + self.factory.clientConnectionFailed(object(), "failure") |
1360 | + self.clock.advance(0) |
1361 | + |
1362 | + |
1363 | +class RemoteObjectCreatorTest(LandscapeTest): |
1364 | + |
1365 | + def setUp(self): |
1366 | + super(RemoteObjectCreatorTest, self).setUp() |
1367 | + self.socket = self.mktemp() |
1368 | + self.server_factory = WordsFactory(object=Words()) |
1369 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1370 | + self.connector = RemoteWordsCreator(reactor, self.socket, |
1371 | + retry_on_reconnect=True, |
1372 | + timeout=0.7) |
1373 | + |
1374 | + def set_remote(words): |
1375 | + self.words = words |
1376 | + |
1377 | + connected = self.connector.connect() |
1378 | + return connected.addCallback(set_remote) |
1379 | + |
1380 | + def tearDown(self): |
1381 | + self.connector.disconnect() |
1382 | + self.port.stopListening() |
1383 | + super(RemoteObjectCreatorTest, self).tearDown() |
1384 | + |
1385 | + def test_connect(self): |
1386 | + """ |
1387 | + The L{RemoteObject} resulting from the deferred returned by |
1388 | + L{RemoteObjectCreator.connect} is properly connected to the |
1389 | + remote peer. |
1390 | + """ |
1391 | + return self.assertSuccess(self.words.empty()) |
1392 | + |
1393 | + def test_connect_with_max_retries(self): |
1394 | + """ |
1395 | + If C{max_retries} is passed to the L{RemoteObjectCreator.connect} method, |
1396 | + then it will give up trying to connect after that number of retries. |
1397 | + """ |
1398 | + self.connector.disconnect() |
1399 | + self.port.stopListening() |
1400 | + |
1401 | + def reconnect(ignored): |
1402 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1403 | + return self.connector.connect() |
1404 | + |
1405 | + result = self.connector.connect(max_retries=0) |
1406 | + self.assertFailure(result, ConnectError) |
1407 | + return result.addCallback(reconnect) |
1408 | + |
1409 | + def test_reconnect(self): |
1410 | + """ |
1411 | + If the connection is lost, the L{RemoteObject} created by the creator |
1412 | + will transparently handle the reconnection. |
1413 | + """ |
1414 | + self.words._protocol.transport.loseConnection() |
1415 | + self.port.stopListening() |
1416 | + |
1417 | + def restart_listening(): |
1418 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1419 | + reactor.callLater(0.3, assert_remote) |
1420 | + |
1421 | + def assert_remote(): |
1422 | + result = self.words.empty() |
1423 | + result.addCallback(lambda x: reconnected.callback(None)) |
1424 | + return result |
1425 | + |
1426 | + reactor.callLater(0.01, restart_listening) |
1427 | + reconnected = Deferred() |
1428 | + return reconnected |
1429 | + |
1430 | + def test_method_call_error(self): |
1431 | + """ |
1432 | + If a L{MethodCall} fails due to a L{MethodCallError}, the |
1433 | + L{RemoteObject} won't try to perform it again. |
1434 | + """ |
1435 | + return self.assertFailure(self.words.secret(), MethodCallError) |
1436 | + |
1437 | + def test_retry(self): |
1438 | + """ |
1439 | + If the connection is lost, the L{RemoteObject} created by the creator |
1440 | + will transparently retry to perform the L{MethodCall} requests that |
1441 | + failed due to the broken connection. |
1442 | + """ |
1443 | + self.words._protocol.transport.loseConnection() |
1444 | + self.port.stopListening() |
1445 | + |
1446 | + def restart_listening(): |
1447 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1448 | + |
1449 | + reactor.callLater(0.1, restart_listening) |
1450 | + return self.words.empty() |
1451 | + |
1452 | + def test_retry_with_method_call_error(self): |
1453 | + """ |
1454 | + If a retried L{MethodCall} request fails due to a L{MethodCallError}, |
1455 | + the L{RemoteObject} will properly propagate the error to the original |
1456 | + caller. |
1457 | + """ |
1458 | + self.words._protocol.transport.loseConnection() |
1459 | + self.port.stopListening() |
1460 | + |
1461 | + def restart_listening(): |
1462 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1463 | + |
1464 | + def assert_failure(error): |
1465 | + self.assertEquals(str(error), "Forbidden method 'secret'") |
1466 | + |
1467 | + reactor.callLater(0.5, restart_listening) |
1468 | + result = self.words.secret() |
1469 | + self.assertFailure(result, MethodCallError) |
1470 | + return result.addCallback(assert_failure) |
1471 | + |
1472 | + def test_wb_retry_with_while_still_disconnected(self): |
1473 | + """ |
1474 | + The L{RemoteObject._retry} method gets called as soon as a new |
1475 | + connection is ready. If for whatever reason the connection drops |
1476 | + again very quickly, the C{_retry} method will behave as expected. |
1477 | + """ |
1478 | + self.words._protocol.transport.loseConnection() |
1479 | + self.port.stopListening() |
1480 | + |
1481 | + def handle_reconnect(protocol): |
1482 | + # In this precise moment we have a newly connected protocol |
1483 | + self.words._protocol = protocol |
1484 | + |
1485 | + # Pretend that the connection is lost again very quickly |
1486 | + protocol.transport.loseConnection() |
1487 | + self.port.stopListening() |
1488 | + |
1489 | + # Force RemoteObject._retry to run using a disconnected protocol |
1490 | + reactor.callLater(0, self.words._retry) |
1491 | + |
1492 | + # Restore the real handler and start listening again very soon |
1493 | + self.connector._factory.remove_notifier(handle_reconnect) |
1494 | + self.connector._factory.add_notifier(self.words._handle_reconnect) |
1495 | + reactor.callLater(0.2, restart_listening) |
1496 | + |
1497 | + def restart_listening(): |
1498 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1499 | + |
1500 | + def assert_failure(error): |
1501 | + self.assertEquals(str(error), "Forbidden method 'secret'") |
1502 | + |
1503 | + # Use our own reconnect handler |
1504 | + self.connector._factory.remove_notifier(self.words._handle_reconnect) |
1505 | + self.connector._factory.add_notifier(handle_reconnect) |
1506 | + |
1507 | + reactor.callLater(0.2, restart_listening) |
1508 | + result = self.words.secret() |
1509 | + self.assertFailure(result, MethodCallError) |
1510 | + return result.addCallback(assert_failure) |
1511 | + |
1512 | + def test_retry_with_many_method_calls(self): |
1513 | + """ |
1514 | + If several L{MethodCall} requests were issued while disconnected, they |
1515 | + will be all eventually completed when the connection gets established |
1516 | + again. |
1517 | + """ |
1518 | + self.words._protocol.transport.loseConnection() |
1519 | + self.port.stopListening() |
1520 | + |
1521 | + def restart_listening(): |
1522 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1523 | + |
1524 | + def assert_guess(response): |
1525 | + self.assertEquals(response, "Guessed!") |
1526 | + |
1527 | + def assert_secret(failure): |
1528 | + self.assertEquals(str(failure.value), "Forbidden method 'secret'") |
1529 | + |
1530 | + def assert_motd(response): |
1531 | + self.assertEquals(response, "Words are cool") |
1532 | + |
1533 | + reactor.callLater(0.1, restart_listening) |
1534 | + |
1535 | + results = [self.words.guess("word", "cool", value=4), |
1536 | + self.words.secret(), |
1537 | + self.words.motd()] |
1538 | + results[0].addCallback(assert_guess) |
1539 | + results[1].addErrback(assert_secret) |
1540 | + results[2].addCallback(assert_motd) |
1541 | + return DeferredList(results) |
1542 | + |
1543 | + def test_retry_without_retry_on_reconnect(self): |
1544 | + """ |
1545 | + If C{retry_on_reconnect} is C{False}, the L{RemoteObject} object won't |
1546 | + retry to perform requests which failed because the connection was |
1547 | + lost, however requests made after a reconnection will still succeed. |
1548 | + """ |
1549 | + self.words._protocol.transport.loseConnection() |
1550 | + self.port.stopListening() |
1551 | + |
1552 | + def restart_listening(): |
1553 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1554 | + reactor.callLater(0.3, assert_reconnected) |
1555 | + |
1556 | + def assert_reconnected(): |
1557 | + result = self.words.empty() |
1558 | + result.addCallback(lambda x: reconnected.callback(None)) |
1559 | + return result |
1560 | + |
1561 | + reactor.callLater(0.1, restart_listening) |
1562 | + self.words._retry_on_reconnect = False |
1563 | + result = self.words.empty() |
1564 | + self.assertFailure(result, ConnectionDone) |
1565 | + reconnected = Deferred() |
1566 | + return result.addCallback(lambda x: reconnected) |
1567 | + |
1568 | + def test_retry_with_timeout(self): |
1569 | + """ |
1570 | + If a C{timeout} is set, the L{RemoteObject} object will errback failed |
1571 | + L{MethodCall}s after that amount of seconds, without retrying them when |
1572 | + the connection is established again. |
1573 | + """ |
1574 | + self.words._protocol.transport.loseConnection() |
1575 | + self.port.stopListening() |
1576 | + |
1577 | + def restart_listening(): |
1578 | + self.port = reactor.listenUNIX(self.socket, self.server_factory) |
1579 | + reactor.callLater(0.1, reconnected.callback, None) |
1580 | + |
1581 | + def assert_failure(error): |
1582 | + self.assertEquals(str(error), "timeout") |
1583 | + return reconnected |
1584 | + |
1585 | + reactor.callLater(0.9, restart_listening) |
1586 | + result = self.words.empty() |
1587 | + self.assertFailure(result, MethodCallError) |
1588 | + reconnected = Deferred() |
1589 | + return result.addCallback(assert_failure) |
1590 | +>>>>>>> MERGE-SOURCE |
1591 | |
1592 | === modified file 'landscape/package/reporter.py' |
1593 | === modified file 'landscape/package/tests/test_reporter.py' |
1594 | === modified file 'landscape/reactor.py' |
1595 | --- landscape/reactor.py 2009-11-02 13:49:26 +0000 |
1596 | +++ landscape/reactor.py 2010-03-05 12:44:28 +0000 |
1597 | @@ -158,6 +158,13 @@ |
1598 | self.cancel_call(self._run_threaded_callbacks_id) |
1599 | |
1600 | |
1601 | +class UnixReactorMixin(object): |
1602 | + |
1603 | + def listen_unix(self, *args, **kwargs): |
1604 | + """Start listening on a Unix socket.""" |
1605 | + return self._reactor.listenUNIX(*args, **kwargs) |
1606 | + |
1607 | + |
1608 | class ReactorID(object): |
1609 | |
1610 | def __init__(self, timeout): |
1611 | @@ -215,7 +222,7 @@ |
1612 | |
1613 | |
1614 | class FakeReactor(EventHandlingReactorMixin, |
1615 | - ThreadedCallsReactorMixin): |
1616 | + ThreadedCallsReactorMixin, UnixReactorMixin): |
1617 | """ |
1618 | @ivar udp_transports: dict of {port: (protocol, transport)} |
1619 | @ivar hosts: Dict of {hostname: ip}. Users should populate this |
1620 | @@ -228,6 +235,11 @@ |
1621 | self.udp_transports = {} |
1622 | self.hosts = {} |
1623 | |
1624 | + # We need a reference to the Twisted reactor as well to |
1625 | + # let Landscape services listen to Unix sockets |
1626 | + from twisted.internet import reactor |
1627 | + self._reactor = reactor |
1628 | + |
1629 | def time(self): |
1630 | return float(self._current_time) |
1631 | |
1632 | @@ -331,9 +343,8 @@ |
1633 | return succeed(hostname) |
1634 | |
1635 | |
1636 | - |
1637 | class TwistedReactor(EventHandlingReactorMixin, |
1638 | - ThreadedCallsReactorMixin): |
1639 | + ThreadedCallsReactorMixin, UnixReactorMixin): |
1640 | """Wrap and add functionalities to the Twisted C{reactor}.""" |
1641 | |
1642 | def __init__(self): |
1643 | |
1644 | === added file 'landscape/tests/test_amp.py' |
1645 | --- landscape/tests/test_amp.py 1970-01-01 00:00:00 +0000 |
1646 | +++ landscape/tests/test_amp.py 2010-03-05 12:44:28 +0000 |
1647 | @@ -0,0 +1,126 @@ |
1648 | +import os |
1649 | + |
1650 | +from twisted.internet.defer import Deferred |
1651 | +from twisted.internet.error import ConnectError |
1652 | + |
1653 | +from landscape.tests.helpers import LandscapeTest |
1654 | +from landscape.reactor import FakeReactor |
1655 | +from landscape.deployment import Configuration |
1656 | +from landscape.amp import ( |
1657 | + LandscapeComponentFactory, RemoteLandscapeComponentCreator) |
1658 | + |
1659 | + |
1660 | +class TestComponent(object): |
1661 | + |
1662 | + name = "test" |
1663 | + |
1664 | + |
1665 | +class TestComponentFactory(LandscapeComponentFactory): |
1666 | + |
1667 | + maxRetries = 0 |
1668 | + initialDelay = 0.01 |
1669 | + |
1670 | + |
1671 | +class RemoteTestComponentCreator(RemoteLandscapeComponentCreator): |
1672 | + |
1673 | + factory = TestComponentFactory |
1674 | + component = TestComponent |
1675 | + |
1676 | + |
1677 | +class RemoteLandscapeComponentTest(LandscapeTest): |
1678 | + |
1679 | + def setUp(self): |
1680 | + super(RemoteLandscapeComponentTest, self).setUp() |
1681 | + reactor = FakeReactor() |
1682 | + config = Configuration() |
1683 | + config.data_path = self.makeDir() |
1684 | + socket = os.path.join(config.data_path, "test.sock") |
1685 | + self.component = TestComponent() |
1686 | + factory = LandscapeComponentFactory(object=self.component) |
1687 | + self.port = reactor.listen_unix(socket, factory) |
1688 | + |
1689 | + |
1690 | + self.connector = RemoteTestComponentCreator(reactor, config) |
1691 | + connected = self.connector.connect() |
1692 | + connected.addCallback(lambda remote: setattr(self, "remote", remote)) |
1693 | + return connected |
1694 | + |
1695 | + def tearDown(self): |
1696 | + self.connector.disconnect() |
1697 | + self.port.stopListening() |
1698 | + super(RemoteLandscapeComponentTest, self).tearDown() |
1699 | + |
1700 | + def test_ping(self): |
1701 | + """ |
1702 | + The L{LandscapeComponentProtocol} exposes the C{ping} method of a |
1703 | + remote Landscape component. |
1704 | + """ |
1705 | + self.component.ping = self.mocker.mock() |
1706 | + self.expect(self.component.ping()).result(True) |
1707 | + self.mocker.replay() |
1708 | + result = self.remote.ping() |
1709 | + return self.assertSuccess(result, True) |
1710 | + |
1711 | + def test_exit(self): |
1712 | + """ |
1713 | + The L{LandscapeComponentProtocol} exposes the C{exit} method of a |
1714 | + remote Landscape component. |
1715 | + """ |
1716 | + self.component.exit = self.mocker.mock() |
1717 | + self.component.exit() |
1718 | + self.mocker.replay() |
1719 | + result = self.remote.exit() |
1720 | + return self.assertSuccess(result) |
1721 | + |
1722 | + |
1723 | +class RemoteLandscapeComponentCreatorTest(LandscapeTest): |
1724 | + |
1725 | + def setUp(self): |
1726 | + super(RemoteLandscapeComponentCreatorTest, self).setUp() |
1727 | + self.reactor = FakeReactor() |
1728 | + self.config = Configuration() |
1729 | + self.config.data_path = self.makeDir() |
1730 | + self.connector = RemoteTestComponentCreator(self.reactor, self.config) |
1731 | + |
1732 | + def test_connect_logs_errors(self): |
1733 | + """ |
1734 | + Connection errors are logged. |
1735 | + """ |
1736 | + self.log_helper.ignore_errors("Error while connecting to test") |
1737 | + |
1738 | + def assert_log(ignored): |
1739 | + self.assertIn("Error while connecting to test", |
1740 | + self.logfile.getvalue()) |
1741 | + |
1742 | + result = self.connector.connect(max_retries=0) |
1743 | + self.assertFailure(result, ConnectError) |
1744 | + return result.addCallback(assert_log) |
1745 | + |
1746 | + def test_reconnect_fires_event(self): |
1747 | + """ |
1748 | + An event is fired whenever the connection is established again after |
1749 | + it has been lost. |
1750 | + """ |
1751 | + socket = os.path.join(self.config.data_path, "test.sock") |
1752 | + factory = LandscapeComponentFactory() |
1753 | + ports = [] |
1754 | + ports.append(self.reactor.listen_unix(socket, factory)) |
1755 | + |
1756 | + def listen_again(): |
1757 | + ports.append(self.reactor.listen_unix(socket, factory)) |
1758 | + |
1759 | + def connected(remote): |
1760 | + remote._protocol.transport.loseConnection() |
1761 | + ports[0].stopListening() |
1762 | + self.reactor._reactor.callLater(0.01, listen_again) |
1763 | + |
1764 | + def reconnected(): |
1765 | + self.connector.disconnect() |
1766 | + ports[1].stopListening() |
1767 | + deferred.callback(None) |
1768 | + |
1769 | + deferred = Deferred() |
1770 | + self.reactor.call_on("test-reconnect", reconnected) |
1771 | + result = self.connector.connect() |
1772 | + result.addCallback(connected) |
1773 | + return deferred |
Looks nice and small, thanks for keeping the branches small :-)