Merge lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes into lp:~maas-committers/maas/trunk
- try-not-to-spawn-loads-of-observation-processes
- Merge into trunk
Status: | Merged |
---|---|
Approved by: | Mike Pontillo |
Approved revision: | no longer in the source branch. |
Merged at revision: | 5344 |
Proposed branch: | lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes |
Merge into: | lp:~maas-committers/maas/trunk |
Prerequisite: | lp:~allenap/maas/drive-bys-2016-09-06 |
Diff against target: |
469 lines (+180/-78) 2 files modified
src/provisioningserver/utils/services.py (+95/-33) src/provisioningserver/utils/tests/test_services.py (+85/-45) |
To merge this branch: | bzr merge lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Mike Pontillo (community) | Approve | ||
Gavin Panella | Pending | ||
Review via email: mp+305505@code.launchpad.net |
This proposal supersedes a proposal from 2016-09-09.
Commit message
Fix bug that caused a large number of observation processes to spawn in certain situations. (Especially when observation processes die, or interface configuration changes.)
* Add unit test for runaway process issue.
* Move responsibility for managing Deferred (used to indicate when the service has stopped) from the ProcessProtocol implementation to the ProcessProtocol
* Refactor unit tests to account for the new design.
* Fix a typo that could have caused reconfigured interfaces to fail to spawn.
Description of the change
I've been testing this end-to-end all day, and LaMont verified (on an earlier version of this branch) that the code is working much better now.
This branch should ship with the next alpha to increase reliability. Without it, we can spawn processes every minute or so and eventually use up all the resources on a MAAS rack.
Gavin Panella (allenap) wrote : Posted in a previous version of this proposal | # |
Mike Pontillo (mpontillo) wrote : Posted in a previous version of this proposal | # |
Thanks for taking a look, Gavin!
Mike Pontillo (mpontillo) wrote : Posted in a previous version of this proposal | # |
Setting this to depend on lp:~allenap/maas/drive-bys-2016-09-06 since there is a merge conflict.
Mike Pontillo (mpontillo) wrote : | # |
Self-approving already-approved branch.
Preview Diff
1 | === modified file 'src/provisioningserver/utils/services.py' | |||
2 | --- src/provisioningserver/utils/services.py 2016-09-12 17:50:38 +0000 | |||
3 | +++ src/provisioningserver/utils/services.py 2016-09-12 17:50:38 +0000 | |||
4 | @@ -35,7 +35,10 @@ | |||
5 | 35 | Deferred, | 35 | Deferred, |
6 | 36 | maybeDeferred, | 36 | maybeDeferred, |
7 | 37 | ) | 37 | ) |
9 | 38 | from twisted.internet.error import ProcessDone | 38 | from twisted.internet.error import ( |
10 | 39 | ProcessDone, | ||
11 | 40 | ProcessExitedAlready, | ||
12 | 41 | ) | ||
13 | 39 | from twisted.internet.protocol import ProcessProtocol | 42 | from twisted.internet.protocol import ProcessProtocol |
14 | 40 | from twisted.internet.threads import deferToThread | 43 | from twisted.internet.threads import deferToThread |
15 | 41 | from twisted.python import log | 44 | from twisted.python import log |
16 | @@ -51,22 +54,28 @@ | |||
17 | 51 | 54 | ||
18 | 52 | def __init__(self, callback=None): | 55 | def __init__(self, callback=None): |
19 | 53 | super().__init__() | 56 | super().__init__() |
24 | 54 | self.done = Deferred() | 57 | if callback is None: |
25 | 55 | self.callback = callback | 58 | self._callback = lambda result: None |
26 | 56 | self.outbuf = b'' | 59 | else: |
27 | 57 | self.errbuf = b'' | 60 | self._callback = callback |
28 | 61 | self._done = lambda result: None | ||
29 | 62 | self._outbuf = b'' | ||
30 | 63 | self._errbuf = b'' | ||
31 | 64 | |||
32 | 65 | def setDoneCallback(self, done): | ||
33 | 66 | self._done = done | ||
34 | 58 | 67 | ||
35 | 59 | def connectionMade(self): | 68 | def connectionMade(self): |
38 | 60 | self.outbuf = b'' | 69 | self._outbuf = b'' |
39 | 61 | self.errbuf = b'' | 70 | self._errbuf = b'' |
40 | 62 | 71 | ||
41 | 63 | def outReceived(self, data): | 72 | def outReceived(self, data): |
43 | 64 | lines, self.outbuf = self.splitLines(self.outbuf + data) | 73 | lines, self._outbuf = self.splitLines(self._outbuf + data) |
44 | 65 | for line in lines: | 74 | for line in lines: |
45 | 66 | self.outLineReceived(line) | 75 | self.outLineReceived(line) |
46 | 67 | 76 | ||
47 | 68 | def errReceived(self, data): | 77 | def errReceived(self, data): |
49 | 69 | lines, self.errbuf = self.splitLines(self.errbuf + data) | 78 | lines, self._errbuf = self.splitLines(self._errbuf + data) |
50 | 70 | for line in lines: | 79 | for line in lines: |
51 | 71 | self.errLineReceived(line) | 80 | self.errLineReceived(line) |
52 | 72 | 81 | ||
53 | @@ -94,19 +103,21 @@ | |||
54 | 94 | self.objectReceived(obj) | 103 | self.objectReceived(obj) |
55 | 95 | 104 | ||
56 | 96 | def objectReceived(self, obj): | 105 | def objectReceived(self, obj): |
58 | 97 | self.callback([obj]) | 106 | self._callback([obj]) |
59 | 98 | 107 | ||
60 | 99 | def errLineReceived(self, line): | 108 | def errLineReceived(self, line): |
61 | 100 | line = line.decode("utf-8") | 109 | line = line.decode("utf-8") |
62 | 101 | log.msg(line.rstrip()) | 110 | log.msg(line.rstrip()) |
63 | 102 | 111 | ||
64 | 103 | def processEnded(self, reason): | 112 | def processEnded(self, reason): |
67 | 104 | if len(self.errbuf) != 0: | 113 | if len(self._errbuf) != 0: |
68 | 105 | self.errLineReceived(self.errbuf) | 114 | self.errLineReceived(self._errbuf) |
69 | 115 | # If the process finished normally, call the _done callback with | ||
70 | 116 | # None. Otherwise, pass the reason through. | ||
71 | 106 | if reason.check(ProcessDone): | 117 | if reason.check(ProcessDone): |
73 | 107 | self.done.callback(None) | 118 | self._done(None) |
74 | 108 | else: | 119 | else: |
76 | 109 | self.done.errback(reason) | 120 | self._done(reason) |
77 | 110 | 121 | ||
78 | 111 | 122 | ||
79 | 112 | class NeighbourObservationProtocol(JSONPerLineProtocol): | 123 | class NeighbourObservationProtocol(JSONPerLineProtocol): |
80 | @@ -126,25 +137,37 @@ | |||
81 | 126 | 137 | ||
82 | 127 | class ProcessProtocolService(TimerService, metaclass=ABCMeta): | 138 | class ProcessProtocolService(TimerService, metaclass=ABCMeta): |
83 | 128 | 139 | ||
89 | 129 | def __init__(self, description, protocol, interval=60.0): | 140 | def __init__(self, interval=60.0, reactor_process=None): |
90 | 130 | assert protocol is not None | 141 | self.deferredProcessEnded = None |
86 | 131 | assert description is not None | ||
87 | 132 | self.description = description | ||
88 | 133 | self.protocol = protocol | ||
91 | 134 | self.process = None | 142 | self.process = None |
92 | 143 | if reactor_process is not None: | ||
93 | 144 | self.reactor_process = reactor_process | ||
94 | 145 | else: | ||
95 | 146 | self.reactor_process = reactor | ||
96 | 135 | super().__init__(interval, self.startProcess) | 147 | super().__init__(interval, self.startProcess) |
97 | 136 | 148 | ||
98 | 137 | @deferred | 149 | @deferred |
99 | 138 | def startProcess(self): | 150 | def startProcess(self): |
100 | 139 | env = select_c_utf8_bytes_locale() | 151 | env = select_c_utf8_bytes_locale() |
102 | 140 | log.msg("%s started." % self.description) | 152 | log.msg("%s started." % self.getDescription()) |
103 | 141 | args = self.getProcessParameters() | 153 | args = self.getProcessParameters() |
104 | 154 | self.deferredProcessEnded = Deferred() | ||
105 | 155 | protocol = self.createProcessProtocol() | ||
106 | 156 | protocol.setDoneCallback(self.processEnded) | ||
107 | 142 | assert all(isinstance(arg, bytes) for arg in args), ( | 157 | assert all(isinstance(arg, bytes) for arg in args), ( |
108 | 143 | "Process arguments must all be bytes, got: %s" % repr(args)) | 158 | "Process arguments must all be bytes, got: %s" % repr(args)) |
113 | 144 | self.process = reactor.spawnProcess( | 159 | self.process = self.reactor_process.spawnProcess( |
114 | 145 | self.protocol, args[0], args, env=env) | 160 | protocol, args[0], args, env=env) |
115 | 146 | return self.protocol.done.addErrback( | 161 | return self.deferredProcessEnded |
116 | 147 | log.err, "%s failed." % self.description) | 162 | |
117 | 163 | def processEnded(self, failure): | ||
118 | 164 | if failure is None: | ||
119 | 165 | log.msg("%s ended normally." % self.getDescription()) | ||
120 | 166 | else: | ||
121 | 167 | log.err(failure, "%s failed." % self.getDescription()) | ||
122 | 168 | # We don't need to call the errback, because we don't want the service | ||
123 | 169 | # to crash with an unhandled error. | ||
124 | 170 | self.deferredProcessEnded.callback(None) | ||
125 | 148 | 171 | ||
126 | 149 | @abstractmethod | 172 | @abstractmethod |
127 | 150 | def getProcessParameters(self): | 173 | def getProcessParameters(self): |
128 | @@ -153,21 +176,51 @@ | |||
129 | 153 | This MUST be overridden in subclasses. | 176 | This MUST be overridden in subclasses. |
130 | 154 | """ | 177 | """ |
131 | 155 | 178 | ||
132 | 179 | @abstractmethod | ||
133 | 180 | def getDescription(self): | ||
134 | 181 | """Return the description of this process, suitable to use in verbose | ||
135 | 182 | logging. | ||
136 | 183 | |||
137 | 184 | This MUST be overridden in subclasses. | ||
138 | 185 | """ | ||
139 | 186 | |||
140 | 187 | @abstractmethod | ||
141 | 188 | def createProcessProtocol(self): | ||
142 | 189 | """ | ||
143 | 190 | Creates and returns the ProcessProtocol that will be used to | ||
144 | 191 | communicate with the process. | ||
145 | 192 | |||
146 | 193 | This MUST be overridden in subclasses. | ||
147 | 194 | """ | ||
148 | 195 | |||
149 | 196 | def stopProcess(self): | ||
150 | 197 | try: | ||
151 | 198 | self.process.signalProcess("INT") | ||
152 | 199 | except ProcessExitedAlready: | ||
153 | 200 | pass | ||
154 | 201 | |||
155 | 156 | def stopService(self): | 202 | def stopService(self): |
156 | 157 | """Stops the neighbour observation service.""" | 203 | """Stops the neighbour observation service.""" |
157 | 158 | if self.process is not None: | 204 | if self.process is not None: |
158 | 159 | self.process.loseConnection() | 205 | self.process.loseConnection() |
159 | 206 | # We don't care about the result; we just want to make sure the | ||
160 | 207 | # process is dead. | ||
161 | 208 | # XXX this causes us to log errors such as: | ||
162 | 209 | # mDNS resolver process failed. | ||
163 | 210 | # reactor.callFromThread(self.stopProcess) | ||
164 | 160 | return super().stopService() | 211 | return super().stopService() |
165 | 161 | 212 | ||
166 | 162 | 213 | ||
167 | 163 | class NeighbourDiscoveryService(ProcessProtocolService): | 214 | class NeighbourDiscoveryService(ProcessProtocolService): |
168 | 164 | """Service to spawn the per-interface device discovery subprocess.""" | 215 | """Service to spawn the per-interface device discovery subprocess.""" |
169 | 165 | 216 | ||
171 | 166 | def __init__(self, ifname: str, callback): | 217 | def __init__(self, ifname: str, callback: callable): |
172 | 167 | self.ifname = ifname | 218 | self.ifname = ifname |
176 | 168 | description = "Neighbour observation process for %s" % ifname | 219 | self.callback = callback |
177 | 169 | protocol = NeighbourObservationProtocol(ifname, callback=callback) | 220 | super().__init__() |
178 | 170 | super().__init__(description=description, protocol=protocol) | 221 | |
179 | 222 | def getDescription(self) -> str: | ||
180 | 223 | return "Neighbour observation process for %s" % self.ifname | ||
181 | 171 | 224 | ||
182 | 172 | def getProcessParameters(self): | 225 | def getProcessParameters(self): |
183 | 173 | maas_rack_cmd = get_maas_provision_command().encode("utf-8") | 226 | maas_rack_cmd = get_maas_provision_command().encode("utf-8") |
184 | @@ -177,14 +230,20 @@ | |||
185 | 177 | self.ifname.encode("utf-8") | 230 | self.ifname.encode("utf-8") |
186 | 178 | ] | 231 | ] |
187 | 179 | 232 | ||
188 | 233 | def createProcessProtocol(self): | ||
189 | 234 | return NeighbourObservationProtocol( | ||
190 | 235 | self.ifname, callback=self.callback) | ||
191 | 236 | |||
192 | 180 | 237 | ||
193 | 181 | class MDNSResolverService(ProcessProtocolService): | 238 | class MDNSResolverService(ProcessProtocolService): |
194 | 182 | """Service to spawn the per-interface device discovery subprocess.""" | 239 | """Service to spawn the per-interface device discovery subprocess.""" |
195 | 183 | 240 | ||
196 | 184 | def __init__(self, callback): | 241 | def __init__(self, callback): |
200 | 185 | protocol = JSONPerLineProtocol(callback=callback) | 242 | self.callback = callback |
201 | 186 | super().__init__( | 243 | super().__init__() |
202 | 187 | description="mDNS resolver process", protocol=protocol) | 244 | |
203 | 245 | def getDescription(self): | ||
204 | 246 | return "mDNS observation process" | ||
205 | 188 | 247 | ||
206 | 189 | def getProcessParameters(self): | 248 | def getProcessParameters(self): |
207 | 190 | maas_rack_cmd = get_maas_provision_command().encode("utf-8") | 249 | maas_rack_cmd = get_maas_provision_command().encode("utf-8") |
208 | @@ -193,6 +252,9 @@ | |||
209 | 193 | b"observe-mdns", | 252 | b"observe-mdns", |
210 | 194 | ] | 253 | ] |
211 | 195 | 254 | ||
212 | 255 | def createProcessProtocol(self): | ||
213 | 256 | return JSONPerLineProtocol(callback=self.callback) | ||
214 | 257 | |||
215 | 196 | 258 | ||
216 | 197 | class NetworksMonitoringService(MultiService, metaclass=ABCMeta): | 259 | class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
217 | 198 | """Service to monitor network interfaces for configuration changes. | 260 | """Service to monitor network interfaces for configuration changes. |
218 | @@ -379,7 +441,7 @@ | |||
219 | 379 | for ifname in new_interfaces: | 441 | for ifname in new_interfaces: |
220 | 380 | # Sanity check to ensure the service isn't already started. | 442 | # Sanity check to ensure the service isn't already started. |
221 | 381 | try: | 443 | try: |
223 | 382 | self.getServiceNamed("neighbour_disovery:" + ifname) | 444 | self.getServiceNamed("neighbour_discovery:" + ifname) |
224 | 383 | except KeyError: | 445 | except KeyError: |
225 | 384 | # This is an expected exception. (The call inside the `try` | 446 | # This is an expected exception. (The call inside the `try` |
226 | 385 | # is only necessary to ensure the service doesn't exist.) | 447 | # is only necessary to ensure the service doesn't exist.) |
227 | @@ -389,7 +451,7 @@ | |||
228 | 389 | """Stop monitoring services for the specified set of interfaces.""" | 451 | """Stop monitoring services for the specified set of interfaces.""" |
229 | 390 | for ifname in deleted_interfaces: | 452 | for ifname in deleted_interfaces: |
230 | 391 | try: | 453 | try: |
232 | 392 | service = self.getServiceNamed("neighbour_disovery:" + ifname) | 454 | service = self.getServiceNamed("neighbour_discovery:" + ifname) |
233 | 393 | except KeyError: | 455 | except KeyError: |
234 | 394 | # Service doesn't exist, so no need to stop it. | 456 | # Service doesn't exist, so no need to stop it. |
235 | 395 | pass | 457 | pass |
236 | 396 | 458 | ||
237 | === modified file 'src/provisioningserver/utils/tests/test_services.py' | |||
238 | --- src/provisioningserver/utils/tests/test_services.py 2016-09-12 17:50:38 +0000 | |||
239 | +++ src/provisioningserver/utils/tests/test_services.py 2016-09-12 17:50:38 +0000 | |||
240 | @@ -47,6 +47,7 @@ | |||
241 | 47 | from twisted.application.service import MultiService | 47 | from twisted.application.service import MultiService |
242 | 48 | from twisted.internet import reactor | 48 | from twisted.internet import reactor |
243 | 49 | from twisted.internet.defer import ( | 49 | from twisted.internet.defer import ( |
244 | 50 | Deferred, | ||
245 | 50 | DeferredQueue, | 51 | DeferredQueue, |
246 | 51 | inlineCallbacks, | 52 | inlineCallbacks, |
247 | 52 | succeed, | 53 | succeed, |
248 | @@ -56,6 +57,7 @@ | |||
249 | 56 | ProcessExitedAlready, | 57 | ProcessExitedAlready, |
250 | 57 | ProcessTerminated, | 58 | ProcessTerminated, |
251 | 58 | ) | 59 | ) |
252 | 60 | from twisted.internet.task import Clock | ||
253 | 59 | from twisted.python import threadable | 61 | from twisted.python import threadable |
254 | 60 | from twisted.python.failure import Failure | 62 | from twisted.python.failure import Failure |
255 | 61 | 63 | ||
256 | @@ -282,12 +284,25 @@ | |||
257 | 282 | 284 | ||
258 | 283 | run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) | 285 | run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
259 | 284 | 286 | ||
260 | 287 | def make_deferred_callback(self): | ||
261 | 288 | d = Deferred() | ||
262 | 289 | |||
263 | 290 | def callback(result): | ||
264 | 291 | if result is None: | ||
265 | 292 | d.callback(None) | ||
266 | 293 | else: | ||
267 | 294 | d.errback(result) | ||
268 | 295 | |||
269 | 296 | return d, callback | ||
270 | 297 | |||
271 | 285 | @inlineCallbacks | 298 | @inlineCallbacks |
272 | 286 | def test__propagates_exit_errors(self): | 299 | def test__propagates_exit_errors(self): |
273 | 287 | proto = JSONPerLineProtocol(callback=lambda json: None) | 300 | proto = JSONPerLineProtocol(callback=lambda json: None) |
274 | 301 | d, callback = self.make_deferred_callback() | ||
275 | 302 | proto.setDoneCallback(callback) | ||
276 | 288 | reactor.spawnProcess(proto, b"false", (b"false",)) | 303 | reactor.spawnProcess(proto, b"false", (b"false",)) |
277 | 289 | with ExpectedException(ProcessTerminated, ".* exit code 1"): | 304 | with ExpectedException(ProcessTerminated, ".* exit code 1"): |
279 | 290 | yield proto.done | 305 | yield d |
280 | 291 | 306 | ||
281 | 292 | def test__parses_only_full_lines(self): | 307 | def test__parses_only_full_lines(self): |
282 | 293 | callback = Mock() | 308 | callback = Mock() |
283 | @@ -357,13 +372,16 @@ | |||
284 | 357 | proto.processEnded(Failure(ProcessDone(0))) | 372 | proto.processEnded(Failure(ProcessDone(0))) |
285 | 358 | self.assertThat(logger.output, Equals(message)) | 373 | self.assertThat(logger.output, Equals(message)) |
286 | 359 | 374 | ||
287 | 375 | @inlineCallbacks | ||
288 | 360 | def test__propagates_errors_from_command(self): | 376 | def test__propagates_errors_from_command(self): |
291 | 361 | callback = Mock() | 377 | d, callback = self.make_deferred_callback() |
292 | 362 | proto = JSONPerLineProtocol(callback=callback) | 378 | proto = JSONPerLineProtocol() |
293 | 379 | proto.setDoneCallback(callback) | ||
294 | 363 | proto.connectionMade() | 380 | proto.connectionMade() |
295 | 364 | reason = Failure(ProcessTerminated(1)) | 381 | reason = Failure(ProcessTerminated(1)) |
296 | 365 | proto.processEnded(reason) | 382 | proto.processEnded(reason) |
298 | 366 | self.assertRaises(ProcessTerminated, extract_result, proto.done) | 383 | with ExpectedException(ProcessTerminated): |
299 | 384 | yield d | ||
300 | 367 | 385 | ||
301 | 368 | 386 | ||
302 | 369 | class TestNeighbourObservationProtocol(MAASTestCase): | 387 | class TestNeighbourObservationProtocol(MAASTestCase): |
303 | @@ -381,25 +399,38 @@ | |||
304 | 381 | callback, MockCallsMatch(call([{"interface": ifname}]))) | 399 | callback, MockCallsMatch(call([{"interface": ifname}]))) |
305 | 382 | 400 | ||
306 | 383 | 401 | ||
308 | 384 | class TrueProcessProtocolService(ProcessProtocolService): | 402 | class MockProcessProtocolService(ProcessProtocolService): |
309 | 403 | |||
310 | 404 | def __init__(self): | ||
311 | 405 | super().__init__() | ||
312 | 406 | self._callback = Mock() | ||
313 | 407 | |||
314 | 408 | def getDescription(self): | ||
315 | 409 | return "%s mock ProcessProtocolService" % self.__class__.__name__ | ||
316 | 410 | |||
317 | 411 | def createProcessProtocol(self): | ||
318 | 412 | return JSONPerLineProtocol(callback=self._callback) | ||
319 | 413 | |||
320 | 414 | |||
321 | 415 | class TrueProcessProtocolService(MockProcessProtocolService): | ||
322 | 385 | 416 | ||
323 | 386 | def getProcessParameters(self): | 417 | def getProcessParameters(self): |
324 | 387 | return [b"/bin/true"] | 418 | return [b"/bin/true"] |
325 | 388 | 419 | ||
326 | 389 | 420 | ||
328 | 390 | class FalseProcessProtocolService(ProcessProtocolService): | 421 | class FalseProcessProtocolService(MockProcessProtocolService): |
329 | 391 | 422 | ||
330 | 392 | def getProcessParameters(self): | 423 | def getProcessParameters(self): |
331 | 393 | return [b"/bin/false"] | 424 | return [b"/bin/false"] |
332 | 394 | 425 | ||
333 | 395 | 426 | ||
335 | 396 | class CatProcessProtocolService(ProcessProtocolService): | 427 | class CatProcessProtocolService(MockProcessProtocolService): |
336 | 397 | 428 | ||
337 | 398 | def getProcessParameters(self): | 429 | def getProcessParameters(self): |
338 | 399 | return [b"/bin/cat"] | 430 | return [b"/bin/cat"] |
339 | 400 | 431 | ||
340 | 401 | 432 | ||
342 | 402 | class EchoProcessProtocolService(ProcessProtocolService): | 433 | class EchoProcessProtocolService(MockProcessProtocolService): |
343 | 403 | 434 | ||
344 | 404 | def getProcessParameters(self): | 435 | def getProcessParameters(self): |
345 | 405 | return [b"/bin/echo", b"{}\n"] | 436 | return [b"/bin/echo", b"{}\n"] |
346 | @@ -416,70 +447,59 @@ | |||
347 | 416 | 447 | ||
348 | 417 | def test__base_class_cannot_be_used(self): | 448 | def test__base_class_cannot_be_used(self): |
349 | 418 | with ExpectedException(TypeError): | 449 | with ExpectedException(TypeError): |
352 | 419 | ProcessProtocolService( | 450 | ProcessProtocolService() |
351 | 420 | description="Mock process", protocol=Mock()) | ||
353 | 421 | 451 | ||
354 | 422 | @inlineCallbacks | 452 | @inlineCallbacks |
355 | 423 | def test__starts_and_stops_process(self): | 453 | def test__starts_and_stops_process(self): |
359 | 424 | protocol = MockJSONProtocol() | 454 | service = CatProcessProtocolService() |
357 | 425 | service = CatProcessProtocolService( | ||
358 | 426 | description="Unit test process", protocol=protocol) | ||
360 | 427 | mock_callback = Mock() | 455 | mock_callback = Mock() |
361 | 428 | protocol.done.addCallback(mock_callback) | ||
362 | 429 | service.startService() | 456 | service.startService() |
363 | 457 | self.assertThat(service.deferredProcessEnded, Not(IsFiredDeferred())) | ||
364 | 458 | service.deferredProcessEnded.addCallback(mock_callback) | ||
365 | 430 | yield service.stopService() | 459 | yield service.stopService() |
366 | 460 | yield service.deferredProcessEnded | ||
367 | 461 | self.assertThat(service.deferredProcessEnded, IsFiredDeferred()) | ||
368 | 431 | self.assertTrue(mock_callback.called) | 462 | self.assertTrue(mock_callback.called) |
369 | 432 | yield pause(0.0) | 463 | yield pause(0.0) |
372 | 433 | self.assertThat(protocol.done, IsFiredDeferred()) | 464 | self.assertThat(service.deferredProcessEnded, IsFiredDeferred()) |
373 | 434 | self.assertThat(extract_result(protocol.done), Is(None)) | 465 | self.assertThat(extract_result(service.deferredProcessEnded), Is(None)) |
374 | 435 | with ExpectedException(ProcessExitedAlready): | 466 | with ExpectedException(ProcessExitedAlready): |
375 | 436 | service.process.signalProcess("INT") | 467 | service.process.signalProcess("INT") |
376 | 437 | 468 | ||
377 | 438 | @inlineCallbacks | 469 | @inlineCallbacks |
378 | 439 | def test__handles_normal_process_exit(self): | 470 | def test__handles_normal_process_exit(self): |
382 | 440 | protocol = MockJSONProtocol() | 471 | service = TrueProcessProtocolService() |
380 | 441 | service = TrueProcessProtocolService( | ||
381 | 442 | description="Unit test process", protocol=protocol) | ||
383 | 443 | mock_callback = Mock() | 472 | mock_callback = Mock() |
384 | 444 | protocol.done.addCallback(mock_callback) | ||
385 | 445 | service.startService() | 473 | service.startService() |
386 | 474 | service.deferredProcessEnded.addCallback(mock_callback) | ||
387 | 446 | yield service.stopService() | 475 | yield service.stopService() |
388 | 476 | yield service.deferredProcessEnded | ||
389 | 447 | self.assertTrue(mock_callback.called) | 477 | self.assertTrue(mock_callback.called) |
390 | 448 | yield pause(0.0) | 478 | yield pause(0.0) |
393 | 449 | self.assertThat(protocol.done, IsFiredDeferred()) | 479 | self.assertThat(service.deferredProcessEnded, IsFiredDeferred()) |
394 | 450 | self.assertThat(extract_result(protocol.done), Is(None)) | 480 | self.assertThat(extract_result(service.deferredProcessEnded), Is(None)) |
395 | 451 | 481 | ||
396 | 452 | @inlineCallbacks | 482 | @inlineCallbacks |
397 | 453 | def test__handles_abnormal_process_exit(self): | 483 | def test__handles_abnormal_process_exit(self): |
412 | 454 | protocol = MockJSONProtocol() | 484 | service = FalseProcessProtocolService() |
413 | 455 | service = FalseProcessProtocolService( | 485 | with TwistedLoggerFixture() as logger: |
414 | 456 | description="Unit test process", protocol=protocol) | 486 | service.startService() |
415 | 457 | mock_errback = Mock() | 487 | d = service.deferredProcessEnded |
416 | 458 | mock_callback = Mock() | 488 | yield service.stopService() |
417 | 459 | protocol.done.addCallback(mock_callback) | 489 | self.assertThat(d, IsFiredDeferred()) |
418 | 460 | protocol.done.addErrback(mock_errback) | 490 | self.assertThat(logger.output, DocTestMatches("... failed...")) |
405 | 461 | service.startService() | ||
406 | 462 | yield service.stopService() | ||
407 | 463 | self.assertTrue(mock_errback.called) | ||
408 | 464 | self.assertFalse(mock_callback.called) | ||
409 | 465 | yield pause(0.0) | ||
410 | 466 | self.assertThat(protocol.done, IsFiredDeferred()) | ||
411 | 467 | self.assertThat(extract_result(protocol.done), Equals(None)) | ||
419 | 468 | 491 | ||
420 | 469 | @inlineCallbacks | 492 | @inlineCallbacks |
421 | 470 | def test__calls_protocol_callback(self): | 493 | def test__calls_protocol_callback(self): |
426 | 471 | callback = Mock() | 494 | service = EchoProcessProtocolService() |
423 | 472 | protocol = MockJSONProtocol(callback=callback) | ||
424 | 473 | service = EchoProcessProtocolService( | ||
425 | 474 | description="Unit test process", protocol=protocol) | ||
427 | 475 | service.startService() | 495 | service.startService() |
428 | 476 | # Wait for the protocol to finish. (the echo process will stop) | 496 | # Wait for the protocol to finish. (the echo process will stop) |
431 | 477 | yield protocol.done | 497 | d = service.deferredProcessEnded |
432 | 478 | self.assertThat(callback, MockCalledOnceWith([{}])) | 498 | result = yield d |
433 | 499 | self.assertThat(service._callback, MockCalledOnceWith([{}])) | ||
434 | 479 | yield service.stopService() | 500 | yield service.stopService() |
438 | 480 | yield pause(0.0) | 501 | self.assertThat(d, IsFiredDeferred()) |
439 | 481 | self.assertThat(protocol.done, IsFiredDeferred()) | 502 | self.assertThat(result, Equals(None)) |
437 | 482 | self.assertThat(extract_result(protocol.done), Equals(None)) | ||
440 | 483 | 503 | ||
441 | 484 | 504 | ||
442 | 485 | class TestNeighbourDiscoveryService(MAASTestCase): | 505 | class TestNeighbourDiscoveryService(MAASTestCase): |
443 | @@ -496,6 +516,26 @@ | |||
444 | 496 | self.assertTrue(args[1], Equals(b"observe-arp")) | 516 | self.assertTrue(args[1], Equals(b"observe-arp")) |
445 | 497 | self.assertTrue(args[2], Equals(ifname.encode('utf-8'))) | 517 | self.assertTrue(args[2], Equals(ifname.encode('utf-8'))) |
446 | 498 | 518 | ||
447 | 519 | @inlineCallbacks | ||
448 | 520 | def test__restarts_process_after_finishing(self): | ||
449 | 521 | ifname = factory.make_name('eth') | ||
450 | 522 | service = NeighbourDiscoveryService(ifname, Mock()) | ||
451 | 523 | mock_process_params = self.patch(service, 'getProcessParameters') | ||
452 | 524 | mock_process_params.return_value = [b'/bin/echo', b'{}'] | ||
453 | 525 | service.clock = Clock() | ||
454 | 526 | service.startService() | ||
455 | 527 | # Wait for the protocol to finish | ||
456 | 528 | service.clock.advance(0.0) | ||
457 | 529 | yield service.deferredProcessEnded | ||
458 | 530 | self.assertThat(service.deferredProcessEnded, IsFiredDeferred()) | ||
459 | 531 | # Advance the clock (should start the service again) | ||
460 | 532 | interval = service.step | ||
461 | 533 | service.clock.advance(interval) | ||
462 | 534 | # The Deferred should have been recreated. | ||
463 | 535 | self.assertThat(service.deferredProcessEnded, Not(IsFiredDeferred())) | ||
464 | 536 | yield service.deferredProcessEnded | ||
465 | 537 | service.stopService() | ||
466 | 538 | |||
467 | 499 | 539 | ||
468 | 500 | class TestMDNSResolverService(MAASTestCase): | 540 | class TestMDNSResolverService(MAASTestCase): |
469 | 501 | """Tests for `MDNSResolverService`.""" | 541 | """Tests for `MDNSResolverService`.""" |
All looks good. I have lots of small suggestions for simplifying the code, or to be more idiomatically Twisted, but I'll propose those as a follow-on instead of here, because this code is good to land as it is.