Merge lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes into lp:~maas-committers/maas/trunk

Proposed by Mike Pontillo
Status: Merged
Approved by: Mike Pontillo
Approved revision: no longer in the source branch.
Merged at revision: 5344
Proposed branch: lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes
Merge into: lp:~maas-committers/maas/trunk
Prerequisite: lp:~allenap/maas/drive-bys-2016-09-06
Diff against target: 469 lines (+180/-78)
2 files modified
src/provisioningserver/utils/services.py (+95/-33)
src/provisioningserver/utils/tests/test_services.py (+85/-45)
To merge this branch: bzr merge lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes
Reviewer Review Type Date Requested Status
Mike Pontillo (community) Approve
Gavin Panella Pending
Review via email: mp+305505@code.launchpad.net

This proposal supersedes a proposal from 2016-09-09.

Commit message

Fix a bug that caused a large number of observation processes to spawn in certain situations, especially when observation processes die or when the interface configuration changes.

 * Add unit test for runaway process issue.
 * Move responsibility for managing the Deferred (used to indicate when the service has stopped) from the ProcessProtocol implementation to the ProcessProtocolService.
 * Refactor unit tests to account for the new design.
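 * Fix a typo in the service-name lookups ("neighbour_disovery" instead of "neighbour_discovery") that could have caused the observation processes for reconfigured interfaces to fail to spawn.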

Description of the change

I've been testing this end-to-end all day, and LaMont verified (on an earlier version of this branch) that the code is working much better now.

This branch should ship with the next alpha to increase reliability. Without it, we can spawn processes every minute or so and eventually use up all the resources on a MAAS rack.

To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote : Posted in a previous version of this proposal

All looks good. I have lots of small suggestions for simplifying the code or making it more idiomatically Twisted, but I'll propose those as a follow-on instead of here, because this code is good to land as it is.

review: Approve
Revision history for this message
Mike Pontillo (mpontillo) wrote : Posted in a previous version of this proposal

Thanks for taking a look, Gavin!

Revision history for this message
Mike Pontillo (mpontillo) wrote : Posted in a previous version of this proposal

Setting this to depend on lp:~allenap/maas/drive-bys-2016-09-06 since there is a merge conflict.

Revision history for this message
Mike Pontillo (mpontillo) wrote :

Self-approving already-approved branch.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'src/provisioningserver/utils/services.py'
--- src/provisioningserver/utils/services.py 2016-09-12 17:50:38 +0000
+++ src/provisioningserver/utils/services.py 2016-09-12 17:50:38 +0000
@@ -35,7 +35,10 @@
35 Deferred,35 Deferred,
36 maybeDeferred,36 maybeDeferred,
37)37)
38from twisted.internet.error import ProcessDone38from twisted.internet.error import (
39 ProcessDone,
40 ProcessExitedAlready,
41)
39from twisted.internet.protocol import ProcessProtocol42from twisted.internet.protocol import ProcessProtocol
40from twisted.internet.threads import deferToThread43from twisted.internet.threads import deferToThread
41from twisted.python import log44from twisted.python import log
@@ -51,22 +54,28 @@
5154
52 def __init__(self, callback=None):55 def __init__(self, callback=None):
53 super().__init__()56 super().__init__()
54 self.done = Deferred()57 if callback is None:
55 self.callback = callback58 self._callback = lambda result: None
56 self.outbuf = b''59 else:
57 self.errbuf = b''60 self._callback = callback
61 self._done = lambda result: None
62 self._outbuf = b''
63 self._errbuf = b''
64
65 def setDoneCallback(self, done):
66 self._done = done
5867
59 def connectionMade(self):68 def connectionMade(self):
60 self.outbuf = b''69 self._outbuf = b''
61 self.errbuf = b''70 self._errbuf = b''
6271
63 def outReceived(self, data):72 def outReceived(self, data):
64 lines, self.outbuf = self.splitLines(self.outbuf + data)73 lines, self._outbuf = self.splitLines(self._outbuf + data)
65 for line in lines:74 for line in lines:
66 self.outLineReceived(line)75 self.outLineReceived(line)
6776
68 def errReceived(self, data):77 def errReceived(self, data):
69 lines, self.errbuf = self.splitLines(self.errbuf + data)78 lines, self._errbuf = self.splitLines(self._errbuf + data)
70 for line in lines:79 for line in lines:
71 self.errLineReceived(line)80 self.errLineReceived(line)
7281
@@ -94,19 +103,21 @@
94 self.objectReceived(obj)103 self.objectReceived(obj)
95104
96 def objectReceived(self, obj):105 def objectReceived(self, obj):
97 self.callback([obj])106 self._callback([obj])
98107
99 def errLineReceived(self, line):108 def errLineReceived(self, line):
100 line = line.decode("utf-8")109 line = line.decode("utf-8")
101 log.msg(line.rstrip())110 log.msg(line.rstrip())
102111
103 def processEnded(self, reason):112 def processEnded(self, reason):
104 if len(self.errbuf) != 0:113 if len(self._errbuf) != 0:
105 self.errLineReceived(self.errbuf)114 self.errLineReceived(self._errbuf)
115 # If the process finished normally, call the _done callback with
116 # None. Otherwise, pass the reason through.
106 if reason.check(ProcessDone):117 if reason.check(ProcessDone):
107 self.done.callback(None)118 self._done(None)
108 else:119 else:
109 self.done.errback(reason)120 self._done(reason)
110121
111122
112class NeighbourObservationProtocol(JSONPerLineProtocol):123class NeighbourObservationProtocol(JSONPerLineProtocol):
@@ -126,25 +137,37 @@
126137
127class ProcessProtocolService(TimerService, metaclass=ABCMeta):138class ProcessProtocolService(TimerService, metaclass=ABCMeta):
128139
129 def __init__(self, description, protocol, interval=60.0):140 def __init__(self, interval=60.0, reactor_process=None):
130 assert protocol is not None141 self.deferredProcessEnded = None
131 assert description is not None
132 self.description = description
133 self.protocol = protocol
134 self.process = None142 self.process = None
143 if reactor_process is not None:
144 self.reactor_process = reactor_process
145 else:
146 self.reactor_process = reactor
135 super().__init__(interval, self.startProcess)147 super().__init__(interval, self.startProcess)
136148
137 @deferred149 @deferred
138 def startProcess(self):150 def startProcess(self):
139 env = select_c_utf8_bytes_locale()151 env = select_c_utf8_bytes_locale()
140 log.msg("%s started." % self.description)152 log.msg("%s started." % self.getDescription())
141 args = self.getProcessParameters()153 args = self.getProcessParameters()
154 self.deferredProcessEnded = Deferred()
155 protocol = self.createProcessProtocol()
156 protocol.setDoneCallback(self.processEnded)
142 assert all(isinstance(arg, bytes) for arg in args), (157 assert all(isinstance(arg, bytes) for arg in args), (
143 "Process arguments must all be bytes, got: %s" % repr(args))158 "Process arguments must all be bytes, got: %s" % repr(args))
144 self.process = reactor.spawnProcess(159 self.process = self.reactor_process.spawnProcess(
145 self.protocol, args[0], args, env=env)160 protocol, args[0], args, env=env)
146 return self.protocol.done.addErrback(161 return self.deferredProcessEnded
147 log.err, "%s failed." % self.description)162
163 def processEnded(self, failure):
164 if failure is None:
165 log.msg("%s ended normally." % self.getDescription())
166 else:
167 log.err(failure, "%s failed." % self.getDescription())
168 # We don't need to call the errback, because we don't want the service
169 # to crash with an unhandled error.
170 self.deferredProcessEnded.callback(None)
148171
149 @abstractmethod172 @abstractmethod
150 def getProcessParameters(self):173 def getProcessParameters(self):
@@ -153,21 +176,51 @@
153 This MUST be overridden in subclasses.176 This MUST be overridden in subclasses.
154 """177 """
155178
179 @abstractmethod
180 def getDescription(self):
181 """Return the description of this process, suitable to use in verbose
182 logging.
183
184 This MUST be overridden in subclasses.
185 """
186
187 @abstractmethod
188 def createProcessProtocol(self):
189 """
190 Creates and returns the ProcessProtocol that will be used to
191 communicate with the process.
192
193 This MUST be overridden in subclasses.
194 """
195
196 def stopProcess(self):
197 try:
198 self.process.signalProcess("INT")
199 except ProcessExitedAlready:
200 pass
201
156 def stopService(self):202 def stopService(self):
157 """Stops the neighbour observation service."""203 """Stops the neighbour observation service."""
158 if self.process is not None:204 if self.process is not None:
159 self.process.loseConnection()205 self.process.loseConnection()
206 # We don't care about the result; we just want to make sure the
207 # process is dead.
208 # XXX this causes us to log errors such as:
209 # mDNS resolver process failed.
210 # reactor.callFromThread(self.stopProcess)
160 return super().stopService()211 return super().stopService()
161212
162213
163class NeighbourDiscoveryService(ProcessProtocolService):214class NeighbourDiscoveryService(ProcessProtocolService):
164 """Service to spawn the per-interface device discovery subprocess."""215 """Service to spawn the per-interface device discovery subprocess."""
165216
166 def __init__(self, ifname: str, callback):217 def __init__(self, ifname: str, callback: callable):
167 self.ifname = ifname218 self.ifname = ifname
168 description = "Neighbour observation process for %s" % ifname219 self.callback = callback
169 protocol = NeighbourObservationProtocol(ifname, callback=callback)220 super().__init__()
170 super().__init__(description=description, protocol=protocol)221
222 def getDescription(self) -> str:
223 return "Neighbour observation process for %s" % self.ifname
171224
172 def getProcessParameters(self):225 def getProcessParameters(self):
173 maas_rack_cmd = get_maas_provision_command().encode("utf-8")226 maas_rack_cmd = get_maas_provision_command().encode("utf-8")
@@ -177,14 +230,20 @@
177 self.ifname.encode("utf-8")230 self.ifname.encode("utf-8")
178 ]231 ]
179232
233 def createProcessProtocol(self):
234 return NeighbourObservationProtocol(
235 self.ifname, callback=self.callback)
236
180237
181class MDNSResolverService(ProcessProtocolService):238class MDNSResolverService(ProcessProtocolService):
182 """Service to spawn the per-interface device discovery subprocess."""239 """Service to spawn the per-interface device discovery subprocess."""
183240
184 def __init__(self, callback):241 def __init__(self, callback):
185 protocol = JSONPerLineProtocol(callback=callback)242 self.callback = callback
186 super().__init__(243 super().__init__()
187 description="mDNS resolver process", protocol=protocol)244
245 def getDescription(self):
246 return "mDNS observation process"
188247
189 def getProcessParameters(self):248 def getProcessParameters(self):
190 maas_rack_cmd = get_maas_provision_command().encode("utf-8")249 maas_rack_cmd = get_maas_provision_command().encode("utf-8")
@@ -193,6 +252,9 @@
193 b"observe-mdns",252 b"observe-mdns",
194 ]253 ]
195254
255 def createProcessProtocol(self):
256 return JSONPerLineProtocol(callback=self.callback)
257
196258
197class NetworksMonitoringService(MultiService, metaclass=ABCMeta):259class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
198 """Service to monitor network interfaces for configuration changes.260 """Service to monitor network interfaces for configuration changes.
@@ -379,7 +441,7 @@
379 for ifname in new_interfaces:441 for ifname in new_interfaces:
380 # Sanity check to ensure the service isn't already started.442 # Sanity check to ensure the service isn't already started.
381 try:443 try:
382 self.getServiceNamed("neighbour_disovery:" + ifname)444 self.getServiceNamed("neighbour_discovery:" + ifname)
383 except KeyError:445 except KeyError:
384 # This is an expected exception. (The call inside the `try`446 # This is an expected exception. (The call inside the `try`
385 # is only necessary to ensure the service doesn't exist.)447 # is only necessary to ensure the service doesn't exist.)
@@ -389,7 +451,7 @@
389 """Stop monitoring services for the specified set of interfaces."""451 """Stop monitoring services for the specified set of interfaces."""
390 for ifname in deleted_interfaces:452 for ifname in deleted_interfaces:
391 try:453 try:
392 service = self.getServiceNamed("neighbour_disovery:" + ifname)454 service = self.getServiceNamed("neighbour_discovery:" + ifname)
393 except KeyError:455 except KeyError:
394 # Service doesn't exist, so no need to stop it.456 # Service doesn't exist, so no need to stop it.
395 pass457 pass
396458
=== modified file 'src/provisioningserver/utils/tests/test_services.py'
--- src/provisioningserver/utils/tests/test_services.py 2016-09-12 17:50:38 +0000
+++ src/provisioningserver/utils/tests/test_services.py 2016-09-12 17:50:38 +0000
@@ -47,6 +47,7 @@
47from twisted.application.service import MultiService47from twisted.application.service import MultiService
48from twisted.internet import reactor48from twisted.internet import reactor
49from twisted.internet.defer import (49from twisted.internet.defer import (
50 Deferred,
50 DeferredQueue,51 DeferredQueue,
51 inlineCallbacks,52 inlineCallbacks,
52 succeed,53 succeed,
@@ -56,6 +57,7 @@
56 ProcessExitedAlready,57 ProcessExitedAlready,
57 ProcessTerminated,58 ProcessTerminated,
58)59)
60from twisted.internet.task import Clock
59from twisted.python import threadable61from twisted.python import threadable
60from twisted.python.failure import Failure62from twisted.python.failure import Failure
6163
@@ -282,12 +284,25 @@
282284
283 run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)285 run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
284286
287 def make_deferred_callback(self):
288 d = Deferred()
289
290 def callback(result):
291 if result is None:
292 d.callback(None)
293 else:
294 d.errback(result)
295
296 return d, callback
297
285 @inlineCallbacks298 @inlineCallbacks
286 def test__propagates_exit_errors(self):299 def test__propagates_exit_errors(self):
287 proto = JSONPerLineProtocol(callback=lambda json: None)300 proto = JSONPerLineProtocol(callback=lambda json: None)
301 d, callback = self.make_deferred_callback()
302 proto.setDoneCallback(callback)
288 reactor.spawnProcess(proto, b"false", (b"false",))303 reactor.spawnProcess(proto, b"false", (b"false",))
289 with ExpectedException(ProcessTerminated, ".* exit code 1"):304 with ExpectedException(ProcessTerminated, ".* exit code 1"):
290 yield proto.done305 yield d
291306
292 def test__parses_only_full_lines(self):307 def test__parses_only_full_lines(self):
293 callback = Mock()308 callback = Mock()
@@ -357,13 +372,16 @@
357 proto.processEnded(Failure(ProcessDone(0)))372 proto.processEnded(Failure(ProcessDone(0)))
358 self.assertThat(logger.output, Equals(message))373 self.assertThat(logger.output, Equals(message))
359374
375 @inlineCallbacks
360 def test__propagates_errors_from_command(self):376 def test__propagates_errors_from_command(self):
361 callback = Mock()377 d, callback = self.make_deferred_callback()
362 proto = JSONPerLineProtocol(callback=callback)378 proto = JSONPerLineProtocol()
379 proto.setDoneCallback(callback)
363 proto.connectionMade()380 proto.connectionMade()
364 reason = Failure(ProcessTerminated(1))381 reason = Failure(ProcessTerminated(1))
365 proto.processEnded(reason)382 proto.processEnded(reason)
366 self.assertRaises(ProcessTerminated, extract_result, proto.done)383 with ExpectedException(ProcessTerminated):
384 yield d
367385
368386
369class TestNeighbourObservationProtocol(MAASTestCase):387class TestNeighbourObservationProtocol(MAASTestCase):
@@ -381,25 +399,38 @@
381 callback, MockCallsMatch(call([{"interface": ifname}])))399 callback, MockCallsMatch(call([{"interface": ifname}])))
382400
383401
384class TrueProcessProtocolService(ProcessProtocolService):402class MockProcessProtocolService(ProcessProtocolService):
403
404 def __init__(self):
405 super().__init__()
406 self._callback = Mock()
407
408 def getDescription(self):
409 return "%s mock ProcessProtocolService" % self.__class__.__name__
410
411 def createProcessProtocol(self):
412 return JSONPerLineProtocol(callback=self._callback)
413
414
415class TrueProcessProtocolService(MockProcessProtocolService):
385416
386 def getProcessParameters(self):417 def getProcessParameters(self):
387 return [b"/bin/true"]418 return [b"/bin/true"]
388419
389420
390class FalseProcessProtocolService(ProcessProtocolService):421class FalseProcessProtocolService(MockProcessProtocolService):
391422
392 def getProcessParameters(self):423 def getProcessParameters(self):
393 return [b"/bin/false"]424 return [b"/bin/false"]
394425
395426
396class CatProcessProtocolService(ProcessProtocolService):427class CatProcessProtocolService(MockProcessProtocolService):
397428
398 def getProcessParameters(self):429 def getProcessParameters(self):
399 return [b"/bin/cat"]430 return [b"/bin/cat"]
400431
401432
402class EchoProcessProtocolService(ProcessProtocolService):433class EchoProcessProtocolService(MockProcessProtocolService):
403434
404 def getProcessParameters(self):435 def getProcessParameters(self):
405 return [b"/bin/echo", b"{}\n"]436 return [b"/bin/echo", b"{}\n"]
@@ -416,70 +447,59 @@
416447
417 def test__base_class_cannot_be_used(self):448 def test__base_class_cannot_be_used(self):
418 with ExpectedException(TypeError):449 with ExpectedException(TypeError):
419 ProcessProtocolService(450 ProcessProtocolService()
420 description="Mock process", protocol=Mock())
421451
422 @inlineCallbacks452 @inlineCallbacks
423 def test__starts_and_stops_process(self):453 def test__starts_and_stops_process(self):
424 protocol = MockJSONProtocol()454 service = CatProcessProtocolService()
425 service = CatProcessProtocolService(
426 description="Unit test process", protocol=protocol)
427 mock_callback = Mock()455 mock_callback = Mock()
428 protocol.done.addCallback(mock_callback)
429 service.startService()456 service.startService()
457 self.assertThat(service.deferredProcessEnded, Not(IsFiredDeferred()))
458 service.deferredProcessEnded.addCallback(mock_callback)
430 yield service.stopService()459 yield service.stopService()
460 yield service.deferredProcessEnded
461 self.assertThat(service.deferredProcessEnded, IsFiredDeferred())
431 self.assertTrue(mock_callback.called)462 self.assertTrue(mock_callback.called)
432 yield pause(0.0)463 yield pause(0.0)
433 self.assertThat(protocol.done, IsFiredDeferred())464 self.assertThat(service.deferredProcessEnded, IsFiredDeferred())
434 self.assertThat(extract_result(protocol.done), Is(None))465 self.assertThat(extract_result(service.deferredProcessEnded), Is(None))
435 with ExpectedException(ProcessExitedAlready):466 with ExpectedException(ProcessExitedAlready):
436 service.process.signalProcess("INT")467 service.process.signalProcess("INT")
437468
438 @inlineCallbacks469 @inlineCallbacks
439 def test__handles_normal_process_exit(self):470 def test__handles_normal_process_exit(self):
440 protocol = MockJSONProtocol()471 service = TrueProcessProtocolService()
441 service = TrueProcessProtocolService(
442 description="Unit test process", protocol=protocol)
443 mock_callback = Mock()472 mock_callback = Mock()
444 protocol.done.addCallback(mock_callback)
445 service.startService()473 service.startService()
474 service.deferredProcessEnded.addCallback(mock_callback)
446 yield service.stopService()475 yield service.stopService()
476 yield service.deferredProcessEnded
447 self.assertTrue(mock_callback.called)477 self.assertTrue(mock_callback.called)
448 yield pause(0.0)478 yield pause(0.0)
449 self.assertThat(protocol.done, IsFiredDeferred())479 self.assertThat(service.deferredProcessEnded, IsFiredDeferred())
450 self.assertThat(extract_result(protocol.done), Is(None))480 self.assertThat(extract_result(service.deferredProcessEnded), Is(None))
451481
452 @inlineCallbacks482 @inlineCallbacks
453 def test__handles_abnormal_process_exit(self):483 def test__handles_abnormal_process_exit(self):
454 protocol = MockJSONProtocol()484 service = FalseProcessProtocolService()
455 service = FalseProcessProtocolService(485 with TwistedLoggerFixture() as logger:
456 description="Unit test process", protocol=protocol)486 service.startService()
457 mock_errback = Mock()487 d = service.deferredProcessEnded
458 mock_callback = Mock()488 yield service.stopService()
459 protocol.done.addCallback(mock_callback)489 self.assertThat(d, IsFiredDeferred())
460 protocol.done.addErrback(mock_errback)490 self.assertThat(logger.output, DocTestMatches("... failed..."))
461 service.startService()
462 yield service.stopService()
463 self.assertTrue(mock_errback.called)
464 self.assertFalse(mock_callback.called)
465 yield pause(0.0)
466 self.assertThat(protocol.done, IsFiredDeferred())
467 self.assertThat(extract_result(protocol.done), Equals(None))
468491
469 @inlineCallbacks492 @inlineCallbacks
470 def test__calls_protocol_callback(self):493 def test__calls_protocol_callback(self):
471 callback = Mock()494 service = EchoProcessProtocolService()
472 protocol = MockJSONProtocol(callback=callback)
473 service = EchoProcessProtocolService(
474 description="Unit test process", protocol=protocol)
475 service.startService()495 service.startService()
476 # Wait for the protocol to finish. (the echo process will stop)496 # Wait for the protocol to finish. (the echo process will stop)
477 yield protocol.done497 d = service.deferredProcessEnded
478 self.assertThat(callback, MockCalledOnceWith([{}]))498 result = yield d
499 self.assertThat(service._callback, MockCalledOnceWith([{}]))
479 yield service.stopService()500 yield service.stopService()
480 yield pause(0.0)501 self.assertThat(d, IsFiredDeferred())
481 self.assertThat(protocol.done, IsFiredDeferred())502 self.assertThat(result, Equals(None))
482 self.assertThat(extract_result(protocol.done), Equals(None))
483503
484504
485class TestNeighbourDiscoveryService(MAASTestCase):505class TestNeighbourDiscoveryService(MAASTestCase):
@@ -496,6 +516,26 @@
496 self.assertTrue(args[1], Equals(b"observe-arp"))516 self.assertTrue(args[1], Equals(b"observe-arp"))
497 self.assertTrue(args[2], Equals(ifname.encode('utf-8')))517 self.assertTrue(args[2], Equals(ifname.encode('utf-8')))
498518
519 @inlineCallbacks
520 def test__restarts_process_after_finishing(self):
521 ifname = factory.make_name('eth')
522 service = NeighbourDiscoveryService(ifname, Mock())
523 mock_process_params = self.patch(service, 'getProcessParameters')
524 mock_process_params.return_value = [b'/bin/echo', b'{}']
525 service.clock = Clock()
526 service.startService()
527 # Wait for the protocol to finish
528 service.clock.advance(0.0)
529 yield service.deferredProcessEnded
530 self.assertThat(service.deferredProcessEnded, IsFiredDeferred())
531 # Advance the clock (should start the service again)
532 interval = service.step
533 service.clock.advance(interval)
534 # The Deferred should have been recreated.
535 self.assertThat(service.deferredProcessEnded, Not(IsFiredDeferred()))
536 yield service.deferredProcessEnded
537 service.stopService()
538
499539
500class TestMDNSResolverService(MAASTestCase):540class TestMDNSResolverService(MAASTestCase):
501 """Tests for `MDNSResolverService`."""541 """Tests for `MDNSResolverService`."""