Merge lp:~free.ekanayaka/landscape-client/amp-cleanup-3 into lp:~landscape/landscape-client/trunk
- amp-cleanup-3
- Merge into trunk
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Free Ekanayaka | ||||
Approved revision: | 664 | ||||
Merged at revision: | 664 | ||||
Proposed branch: | lp:~free.ekanayaka/landscape-client/amp-cleanup-3 | ||||
Merge into: | lp:~landscape/landscape-client/trunk | ||||
Diff against target: |
722 lines (+182/-119) 15 files modified
landscape/amp.py (+76/-21) landscape/broker/amp.py (+24/-35) landscape/broker/server.py (+1/-1) landscape/broker/service.py (+6/-3) landscape/broker/tests/helpers.py (+12/-13) landscape/broker/tests/test_service.py (+0/-1) landscape/lib/amp.py (+34/-9) landscape/lib/tests/test_amp.py (+10/-11) landscape/manager/service.py (+5/-3) landscape/manager/tests/test_service.py (+0/-2) landscape/monitor/service.py (+5/-3) landscape/monitor/tests/test_service.py (+2/-5) landscape/service.py (+3/-2) landscape/tests/helpers.py (+0/-8) landscape/tests/test_watchdog.py (+4/-2) |
||||
To merge this branch: | bzr merge lp:~free.ekanayaka/landscape-client/amp-cleanup-3 | ||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Christopher Armstrong (community) | Approve | ||
Geoff Teale (community) | Approve | ||
Review via email: mp+160374@code.launchpad.net |
Commit message
This is another branch in the a series that will make most client tests synchronous. In detail:
- Add a landscape.
client code around the generic low level landscape.lib.amp machinery. It basically
supports publishing a Python object over a Unix socket using AMP. It is the
server-side equivalent of RemoteComponent
- Make landscape.
landscape.
transport-
landscape.lib.amp should be transport-agnostic, providing plain Twisted factories
and protocols. It is a sort of replace-
- Create sub-classes of ComponentPublisher for each Landscape service that publishes an
object, this replaces the low-level sub-classing of protocols that we add before.
- Add a MethodCallServe
usedfor instantiating an AMP protocol with the appropriate "locator" parameter, that
will responde to L{MethodCall} commands send by clients.
Description of the change
This is another branch in the a series that will make most client tests synchronous. In detail:
- Add a landscape.
client code around the generic low level landscape.lib.amp machinery. It basically
supports publishing a Python object over a Unix socket using AMP. It is the
server-side equivalent of RemoteComponent
- Make landscape.
landscape.
transport-
landscape.lib.amp should be transport-agnostic, providing plain Twisted factories
and protocols. It is a sort of replace-
- Create sub-classes of ComponentPublisher for each Landscape service that publishes an
object, this replaces the low-level sub-classing of protocols that we add before.
- Add a MethodCallServe
usedfor instantiating an AMP protocol with the appropriate "locator" parameter, that
will responde to L{MethodCall} commands send by clients.
There are a few other little things here and there, if you're unsure about them, feel free to ask me.
Christopher Armstrong (radix) wrote : | # |
I'm taking this opportunity to generally get a better understanding of the AMP code, so there are some general comments here not directly related to your change.
[1] (non-branch) RemoteObject.
[2] MethodCallServe
[3] MethodCallServe
[4] Why didn't you delete RemoteObjectCon
[5] Do we usually put @params in class docstrings instead of their __init__? The docstring of RemoteComponent
[6] I think I would rename ComponentPublisher to UNIXComponentPu
[7] (non-branch) I don't really like the subclasses of ComponentPublisher, since all they do is declare which methods are acceptable. It seems like it'd be a lot nicer if the published-object could declare which methods are remote methods, perhaps through a decorator. Then, instead of having a e.g. BrokerServerPub
It's a good refactor. With the docstring comments addressed and a response to [4], I'm happy with merging.
Free Ekanayaka (free.ekanayaka) wrote : | # |
[1]
Yeah, I noticed that, and fixed it in a later amp-cleanup-N branch. Sorry if changes are a bit scattered, but I'm trying to keep diffs small. To have a feel of what it will finally look like, please see:
lp:~free.ekanayaka/landscape-client/amp-cleanup-9
If you don't mind I'll keep this unchanged in this branch, to avoid conflicts later on.
[2]
Fixed.
[3]
Fixed.
[4]
You're right, I didn't delete it simply because it's used in a number of tests in landscape/
[5]
This something I never quite understood myself. In many cases we do put @params in the class doscring for constructors (i.e. __init__), but in other we don't. We should probably set a rule. I think putting them directly under __init__ makes more sense.
[6]
Not quite sure, that would make sense if ComponentPublisher was a generic library, maybe with TCPComponentPub
[7]
Yeah, sub-classing is not that great, however I didn't want to to couple the published-object with the publisher (after all the published object doesn't need to know it's published). Perhaps an alternative solution would be to have convenience functions that replace those sub-classes. I'll try to think to some way to drop those sub-classes.
- 664. By Free Ekanayaka
-
Fix docstrings
Preview Diff
1 | === modified file 'landscape/amp.py' |
2 | --- landscape/amp.py 2013-04-19 08:26:00 +0000 |
3 | +++ landscape/amp.py 2013-04-29 11:43:28 +0000 |
4 | @@ -14,7 +14,8 @@ |
5 | import logging |
6 | |
7 | from landscape.lib.amp import ( |
8 | - MethodCallProtocol, MethodCallFactory, RemoteObjectConnector) |
9 | + MethodCallProtocol, MethodCallFactory, MethodCallServerFactory, |
10 | + RemoteObject) |
11 | |
12 | |
13 | class ComponentProtocol(MethodCallProtocol): |
14 | @@ -34,30 +35,64 @@ |
15 | initialDelay = 0.05 |
16 | |
17 | |
18 | -class RemoteComponentConnector(RemoteObjectConnector): |
19 | +class ComponentPublisher(object): |
20 | + """Publish a Landscape client component using a UNIX socket. |
21 | + |
22 | + Other Landscape client processes can then connect to the socket and invoke |
23 | + methods on the component remotely, using L{MethodCall} commands. |
24 | + |
25 | + @param component: The component to publish. It can be any Python object |
26 | + implementing the methods listed in the C{methods} class variable. |
27 | + @param reactor: The L{TwistedReactor} used to listen to the socket. |
28 | + @param config: The L{Configuration} object used to build the socket path. |
29 | + """ |
30 | + |
31 | + methods = ("ping", "exit") |
32 | + |
33 | + def __init__(self, component, reactor, config): |
34 | + self._reactor = reactor |
35 | + self._config = config |
36 | + self._component = component |
37 | + self._port = None |
38 | + |
39 | + def start(self): |
40 | + """Start accepting connections.""" |
41 | + factory = MethodCallServerFactory(self._component, self.methods) |
42 | + socket_path = _get_socket_path(self._component, self._config) |
43 | + self._port = self._reactor.listen_unix(socket_path, factory) |
44 | + |
45 | + def stop(self): |
46 | + """Stop accepting connections.""" |
47 | + return self._port.stopListening() |
48 | + |
49 | + |
50 | +class RemoteComponentConnector(object): |
51 | """Utility superclass for creating connections with a Landscape component. |
52 | |
53 | @cvar component: The class of the component to connect to, it is expected |
54 | to define a C{name} class attribute, which will be used to find out |
55 | the socket to use. It must be defined by sub-classes. |
56 | + @cvar factory: The factory class to use for building protocols. |
57 | + @cvar remote: The L{RemoteObject} class or sub-class used for building |
58 | + remote objects. |
59 | |
60 | @param reactor: A L{TwistedReactor} object. |
61 | @param config: A L{LandscapeConfiguration}. |
62 | - @param args: Positional arguments for protocol factory constructor. |
63 | - @param kwargs: Keyword arguments for protocol factory constructor. |
64 | + @param retry_on_reconnect: If C{True} the remote object built by this |
65 | + connector will retry L{MethodCall}s that failed due to lost |
66 | + connections. |
67 | |
68 | @see: L{MethodCallClientFactory}. |
69 | """ |
70 | - |
71 | + component = None # Must be defined by sub-classes |
72 | factory = ComponentProtocolFactory |
73 | + remote = RemoteObject |
74 | |
75 | def __init__(self, reactor, config, retry_on_reconnect=False): |
76 | - self._twisted_reactor = reactor |
77 | + self._reactor = reactor |
78 | + self._config = config |
79 | self._retry_on_reconnect = retry_on_reconnect |
80 | - socket = os.path.join(config.sockets_path, |
81 | - self.component.name + ".sock") |
82 | - super(RemoteComponentConnector, self).__init__( |
83 | - self._twisted_reactor._reactor, socket) |
84 | + self._connector = None |
85 | |
86 | def connect(self, max_retries=None, factor=None, quiet=False): |
87 | """Connect to the remote Landscape component. |
88 | @@ -73,28 +108,44 @@ |
89 | result in a faster reconnection attempts pace. |
90 | @param quiet: A boolean indicating whether to log errors. |
91 | """ |
92 | + reactor = self._reactor._reactor |
93 | + factory = self.factory(reactor) |
94 | + factory.retryOnReconnect = self._retry_on_reconnect |
95 | + factory.remote = self.remote |
96 | |
97 | def fire_reconnect(ignored): |
98 | - self._twisted_reactor.fire("%s-reconnect" % |
99 | - self.component.name) |
100 | + self._reactor.fire("%s-reconnect" % self.component.name) |
101 | |
102 | def connected(remote): |
103 | - # XXX temporary workaround till the AMP cleanup is completed, we |
104 | - # will use the factory directly then. |
105 | - remote._factory.retryOnReconnect = self._retry_on_reconnect |
106 | - self._factory.notifyOnConnect(fire_reconnect) |
107 | + factory.notifyOnConnect(fire_reconnect) |
108 | return remote |
109 | |
110 | def log_error(failure): |
111 | logging.error("Error while connecting to %s", self.component.name) |
112 | return failure |
113 | |
114 | - result = super(RemoteComponentConnector, self).connect( |
115 | - max_retries=max_retries, factor=factor) |
116 | + factory.maxRetries = max_retries |
117 | + if factor: |
118 | + factory.factor = factor |
119 | + socket_path = _get_socket_path(self.component, self._config) |
120 | + self._connector = reactor.connectUNIX(socket_path, factory) |
121 | + deferred = factory.getRemoteObject() |
122 | + |
123 | if not quiet: |
124 | - result.addErrback(log_error) |
125 | - result.addCallback(connected) |
126 | - return result |
127 | + deferred.addErrback(log_error) |
128 | + |
129 | + return deferred.addCallback(connected) |
130 | + |
131 | + def disconnect(self): |
132 | + """Disconnect the L{RemoteObject} that we have created.""" |
133 | + if self._connector is not None: |
134 | + factory = self._connector.factory |
135 | + factory.stopTrying() |
136 | + # XXX we should be using self._connector.disconnect() here |
137 | + remote = factory._remote |
138 | + if remote: |
139 | + if remote._sender.protocol.transport: |
140 | + remote._sender.protocol.transport.loseConnection() |
141 | |
142 | |
143 | class RemoteComponentsRegistry(object): |
144 | @@ -121,3 +172,7 @@ |
145 | that can be used to connect to a certain component. |
146 | """ |
147 | cls._by_name[connector_class.component.name] = connector_class |
148 | + |
149 | + |
150 | +def _get_socket_path(component, config): |
151 | + return os.path.join(config.sockets_path, component.name + ".sock") |
152 | |
153 | === modified file 'landscape/broker/amp.py' |
154 | --- landscape/broker/amp.py 2013-04-19 08:26:00 +0000 |
155 | +++ landscape/broker/amp.py 2013-04-29 11:43:28 +0000 |
156 | @@ -2,36 +2,31 @@ |
157 | |
158 | from landscape.lib.amp import RemoteObject, MethodCallArgument |
159 | from landscape.amp import ( |
160 | - ComponentProtocol, ComponentProtocolFactory, RemoteComponentConnector, |
161 | - RemoteComponentsRegistry) |
162 | + RemoteComponentConnector, RemoteComponentsRegistry, ComponentPublisher) |
163 | from landscape.broker.server import BrokerServer |
164 | from landscape.broker.client import BrokerClient |
165 | from landscape.monitor.monitor import Monitor |
166 | from landscape.manager.manager import Manager |
167 | |
168 | |
169 | -class BrokerServerProtocol(ComponentProtocol): |
170 | +class BrokerServerPublisher(ComponentPublisher): |
171 | """ |
172 | Communication protocol between the broker server and its clients. |
173 | """ |
174 | - methods = (ComponentProtocol.methods + |
175 | - ["fire_event", |
176 | - "get_accepted_message_types", |
177 | - "get_server_uuid", |
178 | - "is_message_pending", |
179 | - "register", |
180 | - "register_client", |
181 | - "register_client_accepted_message_type", |
182 | - "reload_configuration", |
183 | - "send_message", |
184 | - "stop_clients", |
185 | - "listen_events", |
186 | - "stop_exchanger"]) |
187 | - |
188 | - |
189 | -class BrokerServerProtocolFactory(ComponentProtocolFactory): |
190 | - |
191 | - protocol = BrokerServerProtocol |
192 | + |
193 | + methods = ComponentPublisher.methods + ( |
194 | + "fire_event", |
195 | + "get_accepted_message_types", |
196 | + "get_server_uuid", |
197 | + "is_message_pending", |
198 | + "register", |
199 | + "register_client", |
200 | + "register_client_accepted_message_type", |
201 | + "reload_configuration", |
202 | + "send_message", |
203 | + "stop_clients", |
204 | + "listen_events", |
205 | + "stop_exchanger") |
206 | |
207 | |
208 | class RemoteBroker(RemoteObject): |
209 | @@ -64,7 +59,6 @@ |
210 | def __init__(self, exchanger, message_store, broker_server): |
211 | self.exchanger = exchanger |
212 | self.message_store = message_store |
213 | - self.protocol = BrokerServerProtocol() |
214 | self.broker_server = broker_server |
215 | |
216 | def __getattr__(self, name): |
217 | @@ -73,7 +67,7 @@ |
218 | that they're encodable with AMP. |
219 | """ |
220 | original = getattr(self.broker_server, name, None) |
221 | - if (name in BrokerServerProtocol.methods |
222 | + if (name in BrokerServerPublisher.methods |
223 | and original is not None |
224 | and callable(original)): |
225 | def method(*args, **kwargs): |
226 | @@ -92,16 +86,13 @@ |
227 | return succeed(None) |
228 | |
229 | |
230 | -class BrokerClientProtocol(ComponentProtocol): |
231 | - """Communication protocol between a client and the broker.""" |
232 | - |
233 | - methods = (ComponentProtocol.methods + ["fire_event", "message"]) |
234 | - |
235 | - |
236 | -class BrokerClientProtocolFactory(ComponentProtocolFactory): |
237 | - |
238 | - protocol = BrokerClientProtocol |
239 | - remote = RemoteBroker |
240 | +class BrokerClientPublisher(ComponentPublisher): |
241 | + """ |
242 | + Communication protocol between the broker server and its clients. |
243 | + """ |
244 | + methods = ComponentPublisher.methods + ( |
245 | + "fire_event", |
246 | + "message") |
247 | |
248 | |
249 | class RemoteClient(RemoteObject): |
250 | @@ -111,7 +102,6 @@ |
251 | class RemoteBrokerConnector(RemoteComponentConnector): |
252 | """Helper to create connections with the L{BrokerServer}.""" |
253 | |
254 | - factory = BrokerClientProtocolFactory |
255 | remote = RemoteBroker |
256 | component = BrokerServer |
257 | |
258 | @@ -119,7 +109,6 @@ |
259 | class RemoteClientConnector(RemoteComponentConnector): |
260 | """Helper to create connections with the L{BrokerServer}.""" |
261 | |
262 | - factory = BrokerServerProtocolFactory |
263 | remote = RemoteClient |
264 | component = BrokerClient |
265 | |
266 | |
267 | === modified file 'landscape/broker/server.py' |
268 | --- landscape/broker/server.py 2013-03-31 22:44:46 +0000 |
269 | +++ landscape/broker/server.py 2013-04-29 11:43:28 +0000 |
270 | @@ -66,7 +66,7 @@ |
271 | """Register a broker client called C{name}. |
272 | |
273 | Various broker clients interact with the broker server, such as the |
274 | - monitor for example, using the L{BrokerServerProtocol} for performing |
275 | + monitor for example, using the L{BrokerServerConnector} for performing |
276 | remote method calls on the L{BrokerServer}. |
277 | |
278 | They establish connectivity with the broker by connecting and |
279 | |
280 | === modified file 'landscape/broker/service.py' |
281 | --- landscape/broker/service.py 2013-03-31 22:44:46 +0000 |
282 | +++ landscape/broker/service.py 2013-04-29 11:43:28 +0000 |
283 | @@ -11,7 +11,7 @@ |
284 | from landscape.broker.ping import Pinger |
285 | from landscape.broker.store import get_default_message_store |
286 | from landscape.broker.server import BrokerServer |
287 | -from landscape.broker.amp import BrokerServerProtocolFactory |
288 | +from landscape.broker.amp import BrokerServerPublisher |
289 | |
290 | |
291 | class BrokerService(LandscapeService): |
292 | @@ -65,21 +65,24 @@ |
293 | self.broker = BrokerServer(self.config, self.reactor, self.exchanger, |
294 | self.registration, self.message_store, |
295 | self.pinger) |
296 | - self.factory = BrokerServerProtocolFactory(object=self.broker) |
297 | + self.publisher = BrokerServerPublisher(self.broker, self.reactor, |
298 | + self.config) |
299 | |
300 | def startService(self): |
301 | """Start the broker. |
302 | |
303 | Create a L{BrokerServer} listening on C{broker_socket_path} for clients |
304 | - connecting with the L{BrokerClientProtocol}, and start the |
305 | + connecting with the L{BrokerServerConnector}, and start the |
306 | L{MessageExchange} and L{Pinger} services. |
307 | """ |
308 | super(BrokerService, self).startService() |
309 | + self.publisher.start() |
310 | self.exchanger.start() |
311 | self.pinger.start() |
312 | |
313 | def stopService(self): |
314 | """Stop the broker.""" |
315 | + self.publisher.stop() |
316 | self.exchanger.stop() |
317 | self.pinger.stop() |
318 | super(BrokerService, self).stopService() |
319 | |
320 | === modified file 'landscape/broker/tests/helpers.py' |
321 | --- landscape/broker/tests/helpers.py 2013-04-12 14:14:47 +0000 |
322 | +++ landscape/broker/tests/helpers.py 2013-04-29 11:43:28 +0000 |
323 | @@ -20,8 +20,7 @@ |
324 | from landscape.broker.config import BrokerConfiguration |
325 | from landscape.broker.server import BrokerServer |
326 | from landscape.broker.amp import ( |
327 | - BrokerServerProtocolFactory, BrokerClientProtocolFactory, |
328 | - RemoteBrokerConnector) |
329 | + BrokerServerPublisher, BrokerClientPublisher, RemoteBrokerConnector) |
330 | from landscape.broker.client import BrokerClient |
331 | |
332 | |
333 | @@ -176,13 +175,14 @@ |
334 | def set_up(self, test_case): |
335 | super(RemoteBrokerHelper, self).set_up(test_case) |
336 | |
337 | - factory = BrokerServerProtocolFactory(object=test_case.broker) |
338 | - socket = os.path.join(test_case.config.sockets_path, |
339 | - BrokerServer.name + ".sock") |
340 | - self._port = test_case.reactor.listen_unix(socket, factory) |
341 | + self._publisher = BrokerServerPublisher(test_case.broker, |
342 | + test_case.reactor, |
343 | + test_case.config) |
344 | self._connector = RemoteBrokerConnector(test_case.reactor, |
345 | test_case.config) |
346 | |
347 | + self._publisher.start() |
348 | + |
349 | def set_remote(remote): |
350 | test_case.remote = remote |
351 | return remote |
352 | @@ -192,7 +192,7 @@ |
353 | |
354 | def tear_down(self, test_case): |
355 | self._connector.disconnect() |
356 | - self._port.stopListening() |
357 | + self._publisher.stop() |
358 | super(RemoteBrokerHelper, self).tear_down(test_case) |
359 | |
360 | |
361 | @@ -246,11 +246,10 @@ |
362 | |
363 | def listen(ignored): |
364 | |
365 | - factory = BrokerClientProtocolFactory(object=test_case.client) |
366 | - socket = os.path.join(test_case.config.sockets_path, |
367 | - test_case.client.name + ".sock") |
368 | - self._client_port = test_case.client_reactor.listen_unix(socket, |
369 | - factory) |
370 | + self._client_publisher = BrokerClientPublisher(test_case.client, |
371 | + test_case.reactor, |
372 | + test_case.config) |
373 | + self._client_publisher.start() |
374 | result = test_case.remote.register_client("client") |
375 | return result.addCallback(set_remote_client) |
376 | |
377 | @@ -259,5 +258,5 @@ |
378 | |
379 | def tear_down(self, test_case): |
380 | self._client_connector.disconnect() |
381 | - self._client_port.stopListening() |
382 | + self._client_publisher.stop() |
383 | super(RemoteClientHelper, self).tear_down(test_case) |
384 | |
385 | === modified file 'landscape/broker/tests/test_service.py' |
386 | --- landscape/broker/tests/test_service.py 2013-03-31 22:44:46 +0000 |
387 | +++ landscape/broker/tests/test_service.py 2013-04-29 11:43:28 +0000 |
388 | @@ -80,5 +80,4 @@ |
389 | connected.addCallback(lambda remote: remote.get_server_uuid()) |
390 | connected.addCallback(lambda x: connector.disconnect()) |
391 | connected.addCallback(lambda x: self.service.stopService()) |
392 | - connected.addCallback(lambda x: self.service.port.stopListening()) |
393 | return connected |
394 | |
395 | === modified file 'landscape/lib/amp.py' |
396 | --- landscape/lib/amp.py 2013-04-23 13:50:29 +0000 |
397 | +++ landscape/lib/amp.py 2013-04-29 11:43:28 +0000 |
398 | @@ -46,7 +46,7 @@ |
399 | """ |
400 | |
401 | from twisted.internet.defer import Deferred, maybeDeferred, succeed |
402 | -from twisted.internet.protocol import ReconnectingClientFactory |
403 | +from twisted.internet.protocol import ServerFactory, ReconnectingClientFactory |
404 | from twisted.python.failure import Failure |
405 | |
406 | from twisted.protocols.amp import ( |
407 | @@ -423,6 +423,28 @@ |
408 | deferred=deferred, call=call) |
409 | |
410 | |
411 | +class MethodCallServerFactory(ServerFactory): |
412 | + """Expose a Python object using L{MethodCall} commands over C{AMP}.""" |
413 | + |
414 | + protocol = AMP |
415 | + |
416 | + def __init__(self, object, methods): |
417 | + """ |
418 | + @param object: The object exposed by the L{MethodCallProtocol}s |
419 | + instances created by this factory. |
420 | + @param methods: A list of the names of the methods that remote peers |
421 | + are allowed to call on the C{object} that we publish. |
422 | + """ |
423 | + self.object = object |
424 | + self.methods = methods |
425 | + |
426 | + def buildProtocol(self, addr): |
427 | + locator = MethodCallReceiver(self.object, self.methods) |
428 | + protocol = self.protocol(locator=locator) |
429 | + protocol.factory = self |
430 | + return protocol |
431 | + |
432 | + |
433 | class MethodCallClientFactory(ReconnectingClientFactory): |
434 | """ |
435 | Factory for L{MethodCallProtocol}s exposing an object or connecting to |
436 | @@ -550,6 +572,7 @@ |
437 | self._socket_path = socket_path |
438 | self._reactor = reactor |
439 | self._factory = None |
440 | + self._connector = None |
441 | |
442 | def connect(self, max_retries=None, factor=None): |
443 | """Connect to a remote object exposed by a L{MethodCallProtocol}. |
444 | @@ -563,18 +586,20 @@ |
445 | delay between subsequent retries should increase. Smaller values |
446 | result in a faster reconnection attempts pace. |
447 | """ |
448 | - self._factory = self.factory(reactor=self._reactor) |
449 | - self._factory.maxRetries = max_retries |
450 | + factory = self.factory(reactor=self._reactor) |
451 | + factory.maxRetries = max_retries |
452 | if factor: |
453 | - self._factory.factor = factor |
454 | - self._reactor.connectUNIX(self._socket_path, self._factory) |
455 | - return self._factory.getRemoteObject() |
456 | + factory.factor = factor |
457 | + self._connector = self._reactor.connectUNIX(self._socket_path, factory) |
458 | + return factory.getRemoteObject() |
459 | |
460 | def disconnect(self): |
461 | """Disconnect the L{RemoteObject} that we have created.""" |
462 | - if self._factory: |
463 | - self._factory.stopTrying() |
464 | - remote = self._factory._remote |
465 | + if self._connector is not None: |
466 | + factory = self._connector.factory |
467 | + factory.stopTrying() |
468 | + # XXX we should be using self._connector.disconnect() here |
469 | + remote = factory._remote |
470 | if remote: |
471 | if remote._sender.protocol.transport: |
472 | remote._sender.protocol.transport.loseConnection() |
473 | |
474 | === modified file 'landscape/lib/tests/test_amp.py' |
475 | --- landscape/lib/tests/test_amp.py 2013-04-23 13:41:26 +0000 |
476 | +++ landscape/lib/tests/test_amp.py 2013-04-29 11:43:28 +0000 |
477 | @@ -1,5 +1,4 @@ |
478 | from twisted.internet import reactor |
479 | -from twisted.internet.protocol import Factory |
480 | from twisted.internet.defer import Deferred, DeferredList |
481 | from twisted.internet.error import ConnectionDone, ConnectError |
482 | from twisted.internet.task import Clock |
483 | @@ -7,8 +6,9 @@ |
484 | from twisted.python.failure import Failure |
485 | |
486 | from landscape.lib.amp import ( |
487 | - MethodCallError, MethodCallProtocol, MethodCallClientFactory, RemoteObject, |
488 | - RemoteObjectConnector, MethodCallReceiver, MethodCallSender) |
489 | + MethodCallError, MethodCallProtocol, MethodCallServerFactory, |
490 | + MethodCallClientFactory, RemoteObject, RemoteObjectConnector, |
491 | + MethodCallReceiver, MethodCallSender) |
492 | from landscape.tests.helpers import LandscapeTest |
493 | |
494 | |
495 | @@ -613,9 +613,7 @@ |
496 | def setUp(self): |
497 | super(RemoteObjectConnectorTest, self).setUp() |
498 | self.socket = self.mktemp() |
499 | - self.server_factory = Factory() |
500 | - self.server_factory.protocol = lambda: ( |
501 | - AMP(locator=MethodCallReceiver(Words(reactor), METHODS))) |
502 | + self.server_factory = MethodCallServerFactory(Words(reactor), METHODS) |
503 | self.port = reactor.listenUNIX(self.socket, self.server_factory) |
504 | self.connector = RemoteWordsConnector(reactor, self.socket) |
505 | |
506 | @@ -662,7 +660,7 @@ |
507 | self.connector.disconnect() |
508 | |
509 | def assert_factor(ignored): |
510 | - self.assertEqual(self.connector._factory.factor, 1.0) |
511 | + self.assertEqual(self.connector._connector.factory.factor, 1.0) |
512 | |
513 | result = self.connector.connect(factor=1.0) |
514 | return result.addCallback(assert_factor) |
515 | @@ -769,8 +767,8 @@ |
516 | reactor.callLater(0, self.words._retry) |
517 | |
518 | # Restore the real handler and start listening again very soon |
519 | - self.connector._factory.dontNotifyOnConnect(handle_connect) |
520 | - self.connector._factory.notifyOnConnect(real_handle_connect) |
521 | + factory.dontNotifyOnConnect(handle_connect) |
522 | + factory.notifyOnConnect(real_handle_connect) |
523 | reactor.callLater(0.2, restart_listening) |
524 | |
525 | def restart_listening(): |
526 | @@ -780,8 +778,9 @@ |
527 | self.assertEqual(str(error), "Forbidden method 'secret'") |
528 | |
529 | # Use our own reconnect handler |
530 | - self.connector._factory.dontNotifyOnConnect(real_handle_connect) |
531 | - self.connector._factory.notifyOnConnect(handle_connect) |
532 | + factory = self.connector._connector.factory |
533 | + factory.dontNotifyOnConnect(real_handle_connect) |
534 | + factory.notifyOnConnect(handle_connect) |
535 | |
536 | reactor.callLater(0.2, restart_listening) |
537 | result = self.words.secret() |
538 | |
539 | === modified file 'landscape/manager/service.py' |
540 | --- landscape/manager/service.py 2010-04-12 12:46:07 +0000 |
541 | +++ landscape/manager/service.py 2013-04-29 11:43:28 +0000 |
542 | @@ -2,8 +2,7 @@ |
543 | |
544 | from landscape.service import LandscapeService, run_landscape_service |
545 | from landscape.manager.config import ManagerConfiguration |
546 | -from landscape.broker.amp import ( |
547 | - BrokerClientProtocolFactory, RemoteBrokerConnector) |
548 | +from landscape.broker.amp import BrokerClientPublisher, RemoteBrokerConnector |
549 | from landscape.manager.manager import Manager |
550 | |
551 | |
552 | @@ -19,7 +18,8 @@ |
553 | super(ManagerService, self).__init__(config) |
554 | self.plugins = self.get_plugins() |
555 | self.manager = Manager(self.reactor, self.config) |
556 | - self.factory = BrokerClientProtocolFactory(object=self.manager) |
557 | + self.publisher = BrokerClientPublisher(self.manager, self.reactor, |
558 | + self.config) |
559 | |
560 | def get_plugins(self): |
561 | """Return instances of all the plugins enabled in the configuration.""" |
562 | @@ -37,6 +37,7 @@ |
563 | - Add all configured plugins, that will in turn register themselves. |
564 | """ |
565 | super(ManagerService, self).startService() |
566 | + self.publisher.start() |
567 | |
568 | def start_plugins(broker): |
569 | self.broker = broker |
570 | @@ -52,6 +53,7 @@ |
571 | def stopService(self): |
572 | """Stop the manager and close the connection with the broker.""" |
573 | self.connector.disconnect() |
574 | + self.publisher.stop() |
575 | super(ManagerService, self).stopService() |
576 | |
577 | |
578 | |
579 | === modified file 'landscape/manager/tests/test_service.py' |
580 | --- landscape/manager/tests/test_service.py 2011-12-02 08:35:25 +0000 |
581 | +++ landscape/manager/tests/test_service.py 2013-04-29 11:43:28 +0000 |
582 | @@ -48,9 +48,7 @@ |
583 | [connector] = self.broker_service.broker.get_connectors() |
584 | connector.disconnect() |
585 | self.service.stopService() |
586 | - self.service.port.stopListening() |
587 | self.broker_service.stopService() |
588 | - self.broker_service.port.stopListening() |
589 | |
590 | def assert_broker_connection(ignored): |
591 | self.assertEqual(len(self.broker_service.broker.get_clients()), 1) |
592 | |
593 | === modified file 'landscape/monitor/service.py' |
594 | --- landscape/monitor/service.py 2010-03-19 08:29:46 +0000 |
595 | +++ landscape/monitor/service.py 2013-04-29 11:43:28 +0000 |
596 | @@ -7,8 +7,7 @@ |
597 | from landscape.service import LandscapeService, run_landscape_service |
598 | from landscape.monitor.config import MonitorConfiguration |
599 | from landscape.monitor.monitor import Monitor |
600 | -from landscape.broker.amp import ( |
601 | - BrokerClientProtocolFactory, RemoteBrokerConnector) |
602 | +from landscape.broker.amp import BrokerClientPublisher, RemoteBrokerConnector |
603 | |
604 | |
605 | class MonitorService(LandscapeService): |
606 | @@ -26,7 +25,8 @@ |
607 | self.plugins = self.get_plugins() |
608 | self.monitor = Monitor(self.reactor, self.config, self.persist, |
609 | persist_filename=self.persist_filename) |
610 | - self.factory = BrokerClientProtocolFactory(object=self.monitor) |
611 | + self.publisher = BrokerClientPublisher(self.monitor, self.reactor, |
612 | + self.config) |
613 | |
614 | def get_plugins(self): |
615 | return [namedClass("landscape.monitor.%s.%s" |
616 | @@ -36,6 +36,7 @@ |
617 | def startService(self): |
618 | """Start the monitor.""" |
619 | super(MonitorService, self).startService() |
620 | + self.publisher.start() |
621 | |
622 | def start_plugins(broker): |
623 | self.broker = broker |
624 | @@ -54,6 +55,7 @@ |
625 | The monitor is flushed to ensure that things like persist databases |
626 | get saved to disk. |
627 | """ |
628 | + self.publisher.stop() |
629 | self.monitor.flush() |
630 | self.connector.disconnect() |
631 | super(MonitorService, self).stopService() |
632 | |
633 | === modified file 'landscape/monitor/tests/test_service.py' |
634 | --- landscape/monitor/tests/test_service.py 2012-03-05 14:11:42 +0000 |
635 | +++ landscape/monitor/tests/test_service.py 2013-04-29 11:43:28 +0000 |
636 | @@ -58,9 +58,7 @@ |
637 | [connector] = self.broker_service.broker.get_connectors() |
638 | connector.disconnect() |
639 | self.service.stopService() |
640 | - self.service.port.stopListening() |
641 | self.broker_service.stopService() |
642 | - self.broker_service.port.stopListening() |
643 | |
644 | def assert_broker_connection(ignored): |
645 | self.assertEqual(len(self.broker_service.broker.get_clients()), 1) |
646 | @@ -81,8 +79,7 @@ |
647 | self.service.monitor.flush() |
648 | self.service.connector = self.mocker.mock() |
649 | self.service.connector.disconnect() |
650 | - self.service.port = self.mocker.mock() |
651 | - self.service.port.stopListening() |
652 | + self.service.publisher = self.mocker.mock() |
653 | + self.service.publisher.stop() |
654 | self.mocker.replay() |
655 | self.service.stopService() |
656 | - self.service.port.stopListening() |
657 | |
658 | === modified file 'landscape/service.py' |
659 | --- landscape/service.py 2013-02-12 16:26:34 +0000 |
660 | +++ landscape/service.py 2013-04-29 11:43:28 +0000 |
661 | @@ -8,7 +8,7 @@ |
662 | from landscape.log import rotate_logs |
663 | from landscape.reactor import TwistedReactor |
664 | from landscape.deployment import get_versioned_persist, init_logging |
665 | -from landscape.amp import ComponentProtocol |
666 | +from landscape.amp import ComponentProtocol, ComponentPublisher |
667 | |
668 | |
669 | class LandscapeService(Service, object): |
670 | @@ -50,7 +50,8 @@ |
671 | Service.startService(self) |
672 | logging.info("%s started with config %s" % ( |
673 | self.service_name.capitalize(), self.config.get_config_filename())) |
674 | - self.port = self.reactor.listen_unix(self.socket, self.factory) |
675 | + if hasattr(self, "factory"): |
676 | + self.port = self.reactor.listen_unix(self.socket, self.factory) |
677 | |
678 | def stopService(self): |
679 | # We don't need to call port.stopListening(), because the reactor |
680 | |
681 | === modified file 'landscape/tests/helpers.py' |
682 | --- landscape/tests/helpers.py 2013-04-15 07:32:54 +0000 |
683 | +++ landscape/tests/helpers.py 2013-04-29 11:43:28 +0000 |
684 | @@ -345,14 +345,6 @@ |
685 | reactor_factory = FakeReactor |
686 | transport_factory = FakeTransport |
687 | |
688 | - def stopService(service): |
689 | - # We need to explictely stop listening to the socket |
690 | - # because the reactor would still have active selectables |
691 | - # at the end of the test otherwise |
692 | - if os.path.exists(service.port.port): |
693 | - service.port.connectionLost(None) |
694 | - super(FakeBrokerService, service).stopService() |
695 | - |
696 | test_case.broker_service = FakeBrokerService(config) |
697 | test_case.remote = FakeRemoteBroker( |
698 | test_case.broker_service.exchanger, |
699 | |
700 | === modified file 'landscape/tests/test_watchdog.py' |
701 | --- landscape/tests/test_watchdog.py 2013-02-12 16:26:34 +0000 |
702 | +++ landscape/tests/test_watchdog.py 2013-04-29 11:43:28 +0000 |
703 | @@ -1453,7 +1453,8 @@ |
704 | |
705 | sys.path = %(path)r |
706 | |
707 | -from landscape.broker.amp import BrokerServerProtocolFactory |
708 | +from landscape.lib.amp import MethodCallServerFactory |
709 | +from landscape.broker.amp import BrokerServerPublisher |
710 | |
711 | class StubBroker(object): |
712 | |
713 | @@ -1464,7 +1465,8 @@ |
714 | reactor.callLater(1, reactor.stop) |
715 | |
716 | stub_broker = StubBroker() |
717 | -factory = BrokerServerProtocolFactory(object=stub_broker) |
718 | +methods = BrokerServerPublisher.methods |
719 | +factory = MethodCallServerFactory(stub_broker, methods) |
720 | reactor.listenUNIX(%(socket)r, factory) |
721 | reactor.run() |
722 | """ |
+1 Looking good