Merge lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes into lp:~maas-committers/maas/trunk

Proposed by Mike Pontillo
Status: Merged
Approved by: Mike Pontillo
Approved revision: no longer in the source branch.
Merged at revision: 5344
Proposed branch: lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes
Merge into: lp:~maas-committers/maas/trunk
Prerequisite: lp:~allenap/maas/drive-bys-2016-09-06
Diff against target: 469 lines (+180/-78)
2 files modified
src/provisioningserver/utils/services.py (+95/-33)
src/provisioningserver/utils/tests/test_services.py (+85/-45)
To merge this branch: bzr merge lp:~mpontillo/maas/try-not-to-spawn-loads-of-observation-processes
Reviewer Review Type Date Requested Status
Mike Pontillo (community) Approve
Gavin Panella Pending
Review via email: mp+305505@code.launchpad.net

This proposal supersedes a proposal from 2016-09-09.

Commit message

Fix a bug that caused a large number of observation processes to be spawned in certain situations (especially when observation processes die, or when the interface configuration changes).

 * Add unit test for runaway process issue.
 * Move responsibility for managing the Deferred (used to indicate when the observation process has ended) from the ProcessProtocol implementation to the ProcessProtocolService.
 * Refactor unit tests to account for the new design.
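 * Fix a typo in the service-name lookup ("neighbour_disovery" instead of "neighbour_discovery") that could have caused the observation processes for reconfigured interfaces to fail to spawn.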

Description of the change

I've been testing this end-to-end all day, and LaMont verified (on an earlier version of this branch) that the code is working much better now.

This branch should ship with the next alpha to increase reliability. Without it, a new observation process can be spawned every minute or so, eventually using up all the resources on a MAAS rack.

To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote : Posted in a previous version of this proposal

All looks good. I have lots of small suggestions for simplifying the code or making it more idiomatically Twisted, but I'll propose those in a follow-up branch rather than here, because this code is good to land as it is.

review: Approve
Revision history for this message
Mike Pontillo (mpontillo) wrote : Posted in a previous version of this proposal

Thanks for taking a look, Gavin!

Revision history for this message
Mike Pontillo (mpontillo) wrote : Posted in a previous version of this proposal

Setting this to depend on lp:~allenap/maas/drive-bys-2016-09-06, since there is a merge conflict with that branch.

Revision history for this message
Mike Pontillo (mpontillo) wrote :

Self-approving an already-approved branch.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'src/provisioningserver/utils/services.py'
2--- src/provisioningserver/utils/services.py 2016-09-12 17:50:38 +0000
3+++ src/provisioningserver/utils/services.py 2016-09-12 17:50:38 +0000
4@@ -35,7 +35,10 @@
5 Deferred,
6 maybeDeferred,
7 )
8-from twisted.internet.error import ProcessDone
9+from twisted.internet.error import (
10+ ProcessDone,
11+ ProcessExitedAlready,
12+)
13 from twisted.internet.protocol import ProcessProtocol
14 from twisted.internet.threads import deferToThread
15 from twisted.python import log
16@@ -51,22 +54,28 @@
17
18 def __init__(self, callback=None):
19 super().__init__()
20- self.done = Deferred()
21- self.callback = callback
22- self.outbuf = b''
23- self.errbuf = b''
24+ if callback is None:
25+ self._callback = lambda result: None
26+ else:
27+ self._callback = callback
28+ self._done = lambda result: None
29+ self._outbuf = b''
30+ self._errbuf = b''
31+
32+ def setDoneCallback(self, done):
33+ self._done = done
34
35 def connectionMade(self):
36- self.outbuf = b''
37- self.errbuf = b''
38+ self._outbuf = b''
39+ self._errbuf = b''
40
41 def outReceived(self, data):
42- lines, self.outbuf = self.splitLines(self.outbuf + data)
43+ lines, self._outbuf = self.splitLines(self._outbuf + data)
44 for line in lines:
45 self.outLineReceived(line)
46
47 def errReceived(self, data):
48- lines, self.errbuf = self.splitLines(self.errbuf + data)
49+ lines, self._errbuf = self.splitLines(self._errbuf + data)
50 for line in lines:
51 self.errLineReceived(line)
52
53@@ -94,19 +103,21 @@
54 self.objectReceived(obj)
55
56 def objectReceived(self, obj):
57- self.callback([obj])
58+ self._callback([obj])
59
60 def errLineReceived(self, line):
61 line = line.decode("utf-8")
62 log.msg(line.rstrip())
63
64 def processEnded(self, reason):
65- if len(self.errbuf) != 0:
66- self.errLineReceived(self.errbuf)
67+ if len(self._errbuf) != 0:
68+ self.errLineReceived(self._errbuf)
69+ # If the process finished normally, call the _done callback with
70+ # None. Otherwise, pass the reason through.
71 if reason.check(ProcessDone):
72- self.done.callback(None)
73+ self._done(None)
74 else:
75- self.done.errback(reason)
76+ self._done(reason)
77
78
79 class NeighbourObservationProtocol(JSONPerLineProtocol):
80@@ -126,25 +137,37 @@
81
82 class ProcessProtocolService(TimerService, metaclass=ABCMeta):
83
84- def __init__(self, description, protocol, interval=60.0):
85- assert protocol is not None
86- assert description is not None
87- self.description = description
88- self.protocol = protocol
89+ def __init__(self, interval=60.0, reactor_process=None):
90+ self.deferredProcessEnded = None
91 self.process = None
92+ if reactor_process is not None:
93+ self.reactor_process = reactor_process
94+ else:
95+ self.reactor_process = reactor
96 super().__init__(interval, self.startProcess)
97
98 @deferred
99 def startProcess(self):
100 env = select_c_utf8_bytes_locale()
101- log.msg("%s started." % self.description)
102+ log.msg("%s started." % self.getDescription())
103 args = self.getProcessParameters()
104+ self.deferredProcessEnded = Deferred()
105+ protocol = self.createProcessProtocol()
106+ protocol.setDoneCallback(self.processEnded)
107 assert all(isinstance(arg, bytes) for arg in args), (
108 "Process arguments must all be bytes, got: %s" % repr(args))
109- self.process = reactor.spawnProcess(
110- self.protocol, args[0], args, env=env)
111- return self.protocol.done.addErrback(
112- log.err, "%s failed." % self.description)
113+ self.process = self.reactor_process.spawnProcess(
114+ protocol, args[0], args, env=env)
115+ return self.deferredProcessEnded
116+
117+ def processEnded(self, failure):
118+ if failure is None:
119+ log.msg("%s ended normally." % self.getDescription())
120+ else:
121+ log.err(failure, "%s failed." % self.getDescription())
122+ # We don't need to call the errback, because we don't want the service
123+ # to crash with an unhandled error.
124+ self.deferredProcessEnded.callback(None)
125
126 @abstractmethod
127 def getProcessParameters(self):
128@@ -153,21 +176,51 @@
129 This MUST be overridden in subclasses.
130 """
131
132+ @abstractmethod
133+ def getDescription(self):
134+ """Return the description of this process, suitable to use in verbose
135+ logging.
136+
137+ This MUST be overridden in subclasses.
138+ """
139+
140+ @abstractmethod
141+ def createProcessProtocol(self):
142+ """
143+ Creates and returns the ProcessProtocol that will be used to
144+ communicate with the process.
145+
146+ This MUST be overridden in subclasses.
147+ """
148+
149+ def stopProcess(self):
150+ try:
151+ self.process.signalProcess("INT")
152+ except ProcessExitedAlready:
153+ pass
154+
155 def stopService(self):
156 """Stops the neighbour observation service."""
157 if self.process is not None:
158 self.process.loseConnection()
159+ # We don't care about the result; we just want to make sure the
160+ # process is dead.
161+ # XXX this causes us to log errors such as:
162+ # mDNS resolver process failed.
163+ # reactor.callFromThread(self.stopProcess)
164 return super().stopService()
165
166
167 class NeighbourDiscoveryService(ProcessProtocolService):
168 """Service to spawn the per-interface device discovery subprocess."""
169
170- def __init__(self, ifname: str, callback):
171+ def __init__(self, ifname: str, callback: callable):
172 self.ifname = ifname
173- description = "Neighbour observation process for %s" % ifname
174- protocol = NeighbourObservationProtocol(ifname, callback=callback)
175- super().__init__(description=description, protocol=protocol)
176+ self.callback = callback
177+ super().__init__()
178+
179+ def getDescription(self) -> str:
180+ return "Neighbour observation process for %s" % self.ifname
181
182 def getProcessParameters(self):
183 maas_rack_cmd = get_maas_provision_command().encode("utf-8")
184@@ -177,14 +230,20 @@
185 self.ifname.encode("utf-8")
186 ]
187
188+ def createProcessProtocol(self):
189+ return NeighbourObservationProtocol(
190+ self.ifname, callback=self.callback)
191+
192
193 class MDNSResolverService(ProcessProtocolService):
194 """Service to spawn the per-interface device discovery subprocess."""
195
196 def __init__(self, callback):
197- protocol = JSONPerLineProtocol(callback=callback)
198- super().__init__(
199- description="mDNS resolver process", protocol=protocol)
200+ self.callback = callback
201+ super().__init__()
202+
203+ def getDescription(self):
204+ return "mDNS observation process"
205
206 def getProcessParameters(self):
207 maas_rack_cmd = get_maas_provision_command().encode("utf-8")
208@@ -193,6 +252,9 @@
209 b"observe-mdns",
210 ]
211
212+ def createProcessProtocol(self):
213+ return JSONPerLineProtocol(callback=self.callback)
214+
215
216 class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
217 """Service to monitor network interfaces for configuration changes.
218@@ -379,7 +441,7 @@
219 for ifname in new_interfaces:
220 # Sanity check to ensure the service isn't already started.
221 try:
222- self.getServiceNamed("neighbour_disovery:" + ifname)
223+ self.getServiceNamed("neighbour_discovery:" + ifname)
224 except KeyError:
225 # This is an expected exception. (The call inside the `try`
226 # is only necessary to ensure the service doesn't exist.)
227@@ -389,7 +451,7 @@
228 """Stop monitoring services for the specified set of interfaces."""
229 for ifname in deleted_interfaces:
230 try:
231- service = self.getServiceNamed("neighbour_disovery:" + ifname)
232+ service = self.getServiceNamed("neighbour_discovery:" + ifname)
233 except KeyError:
234 # Service doesn't exist, so no need to stop it.
235 pass
236
237=== modified file 'src/provisioningserver/utils/tests/test_services.py'
238--- src/provisioningserver/utils/tests/test_services.py 2016-09-12 17:50:38 +0000
239+++ src/provisioningserver/utils/tests/test_services.py 2016-09-12 17:50:38 +0000
240@@ -47,6 +47,7 @@
241 from twisted.application.service import MultiService
242 from twisted.internet import reactor
243 from twisted.internet.defer import (
244+ Deferred,
245 DeferredQueue,
246 inlineCallbacks,
247 succeed,
248@@ -56,6 +57,7 @@
249 ProcessExitedAlready,
250 ProcessTerminated,
251 )
252+from twisted.internet.task import Clock
253 from twisted.python import threadable
254 from twisted.python.failure import Failure
255
256@@ -282,12 +284,25 @@
257
258 run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
259
260+ def make_deferred_callback(self):
261+ d = Deferred()
262+
263+ def callback(result):
264+ if result is None:
265+ d.callback(None)
266+ else:
267+ d.errback(result)
268+
269+ return d, callback
270+
271 @inlineCallbacks
272 def test__propagates_exit_errors(self):
273 proto = JSONPerLineProtocol(callback=lambda json: None)
274+ d, callback = self.make_deferred_callback()
275+ proto.setDoneCallback(callback)
276 reactor.spawnProcess(proto, b"false", (b"false",))
277 with ExpectedException(ProcessTerminated, ".* exit code 1"):
278- yield proto.done
279+ yield d
280
281 def test__parses_only_full_lines(self):
282 callback = Mock()
283@@ -357,13 +372,16 @@
284 proto.processEnded(Failure(ProcessDone(0)))
285 self.assertThat(logger.output, Equals(message))
286
287+ @inlineCallbacks
288 def test__propagates_errors_from_command(self):
289- callback = Mock()
290- proto = JSONPerLineProtocol(callback=callback)
291+ d, callback = self.make_deferred_callback()
292+ proto = JSONPerLineProtocol()
293+ proto.setDoneCallback(callback)
294 proto.connectionMade()
295 reason = Failure(ProcessTerminated(1))
296 proto.processEnded(reason)
297- self.assertRaises(ProcessTerminated, extract_result, proto.done)
298+ with ExpectedException(ProcessTerminated):
299+ yield d
300
301
302 class TestNeighbourObservationProtocol(MAASTestCase):
303@@ -381,25 +399,38 @@
304 callback, MockCallsMatch(call([{"interface": ifname}])))
305
306
307-class TrueProcessProtocolService(ProcessProtocolService):
308+class MockProcessProtocolService(ProcessProtocolService):
309+
310+ def __init__(self):
311+ super().__init__()
312+ self._callback = Mock()
313+
314+ def getDescription(self):
315+ return "%s mock ProcessProtocolService" % self.__class__.__name__
316+
317+ def createProcessProtocol(self):
318+ return JSONPerLineProtocol(callback=self._callback)
319+
320+
321+class TrueProcessProtocolService(MockProcessProtocolService):
322
323 def getProcessParameters(self):
324 return [b"/bin/true"]
325
326
327-class FalseProcessProtocolService(ProcessProtocolService):
328+class FalseProcessProtocolService(MockProcessProtocolService):
329
330 def getProcessParameters(self):
331 return [b"/bin/false"]
332
333
334-class CatProcessProtocolService(ProcessProtocolService):
335+class CatProcessProtocolService(MockProcessProtocolService):
336
337 def getProcessParameters(self):
338 return [b"/bin/cat"]
339
340
341-class EchoProcessProtocolService(ProcessProtocolService):
342+class EchoProcessProtocolService(MockProcessProtocolService):
343
344 def getProcessParameters(self):
345 return [b"/bin/echo", b"{}\n"]
346@@ -416,70 +447,59 @@
347
348 def test__base_class_cannot_be_used(self):
349 with ExpectedException(TypeError):
350- ProcessProtocolService(
351- description="Mock process", protocol=Mock())
352+ ProcessProtocolService()
353
354 @inlineCallbacks
355 def test__starts_and_stops_process(self):
356- protocol = MockJSONProtocol()
357- service = CatProcessProtocolService(
358- description="Unit test process", protocol=protocol)
359+ service = CatProcessProtocolService()
360 mock_callback = Mock()
361- protocol.done.addCallback(mock_callback)
362 service.startService()
363+ self.assertThat(service.deferredProcessEnded, Not(IsFiredDeferred()))
364+ service.deferredProcessEnded.addCallback(mock_callback)
365 yield service.stopService()
366+ yield service.deferredProcessEnded
367+ self.assertThat(service.deferredProcessEnded, IsFiredDeferred())
368 self.assertTrue(mock_callback.called)
369 yield pause(0.0)
370- self.assertThat(protocol.done, IsFiredDeferred())
371- self.assertThat(extract_result(protocol.done), Is(None))
372+ self.assertThat(service.deferredProcessEnded, IsFiredDeferred())
373+ self.assertThat(extract_result(service.deferredProcessEnded), Is(None))
374 with ExpectedException(ProcessExitedAlready):
375 service.process.signalProcess("INT")
376
377 @inlineCallbacks
378 def test__handles_normal_process_exit(self):
379- protocol = MockJSONProtocol()
380- service = TrueProcessProtocolService(
381- description="Unit test process", protocol=protocol)
382+ service = TrueProcessProtocolService()
383 mock_callback = Mock()
384- protocol.done.addCallback(mock_callback)
385 service.startService()
386+ service.deferredProcessEnded.addCallback(mock_callback)
387 yield service.stopService()
388+ yield service.deferredProcessEnded
389 self.assertTrue(mock_callback.called)
390 yield pause(0.0)
391- self.assertThat(protocol.done, IsFiredDeferred())
392- self.assertThat(extract_result(protocol.done), Is(None))
393+ self.assertThat(service.deferredProcessEnded, IsFiredDeferred())
394+ self.assertThat(extract_result(service.deferredProcessEnded), Is(None))
395
396 @inlineCallbacks
397 def test__handles_abnormal_process_exit(self):
398- protocol = MockJSONProtocol()
399- service = FalseProcessProtocolService(
400- description="Unit test process", protocol=protocol)
401- mock_errback = Mock()
402- mock_callback = Mock()
403- protocol.done.addCallback(mock_callback)
404- protocol.done.addErrback(mock_errback)
405- service.startService()
406- yield service.stopService()
407- self.assertTrue(mock_errback.called)
408- self.assertFalse(mock_callback.called)
409- yield pause(0.0)
410- self.assertThat(protocol.done, IsFiredDeferred())
411- self.assertThat(extract_result(protocol.done), Equals(None))
412+ service = FalseProcessProtocolService()
413+ with TwistedLoggerFixture() as logger:
414+ service.startService()
415+ d = service.deferredProcessEnded
416+ yield service.stopService()
417+ self.assertThat(d, IsFiredDeferred())
418+ self.assertThat(logger.output, DocTestMatches("... failed..."))
419
420 @inlineCallbacks
421 def test__calls_protocol_callback(self):
422- callback = Mock()
423- protocol = MockJSONProtocol(callback=callback)
424- service = EchoProcessProtocolService(
425- description="Unit test process", protocol=protocol)
426+ service = EchoProcessProtocolService()
427 service.startService()
428 # Wait for the protocol to finish. (the echo process will stop)
429- yield protocol.done
430- self.assertThat(callback, MockCalledOnceWith([{}]))
431+ d = service.deferredProcessEnded
432+ result = yield d
433+ self.assertThat(service._callback, MockCalledOnceWith([{}]))
434 yield service.stopService()
435- yield pause(0.0)
436- self.assertThat(protocol.done, IsFiredDeferred())
437- self.assertThat(extract_result(protocol.done), Equals(None))
438+ self.assertThat(d, IsFiredDeferred())
439+ self.assertThat(result, Equals(None))
440
441
442 class TestNeighbourDiscoveryService(MAASTestCase):
443@@ -496,6 +516,26 @@
444 self.assertTrue(args[1], Equals(b"observe-arp"))
445 self.assertTrue(args[2], Equals(ifname.encode('utf-8')))
446
447+ @inlineCallbacks
448+ def test__restarts_process_after_finishing(self):
449+ ifname = factory.make_name('eth')
450+ service = NeighbourDiscoveryService(ifname, Mock())
451+ mock_process_params = self.patch(service, 'getProcessParameters')
452+ mock_process_params.return_value = [b'/bin/echo', b'{}']
453+ service.clock = Clock()
454+ service.startService()
455+ # Wait for the protocol to finish
456+ service.clock.advance(0.0)
457+ yield service.deferredProcessEnded
458+ self.assertThat(service.deferredProcessEnded, IsFiredDeferred())
459+ # Advance the clock (should start the service again)
460+ interval = service.step
461+ service.clock.advance(interval)
462+ # The Deferred should have been recreated.
463+ self.assertThat(service.deferredProcessEnded, Not(IsFiredDeferred()))
464+ yield service.deferredProcessEnded
465+ service.stopService()
466+
467
468 class TestMDNSResolverService(MAASTestCase):
469 """Tests for `MDNSResolverService`."""