Merge lp:~mpontillo/maas/avahi-service-and-rpc into lp:~maas-committers/maas/trunk

Proposed by Mike Pontillo
Status: Merged
Approved by: Mike Pontillo
Approved revision: no longer in the source branch.
Merged at revision: 5307
Proposed branch: lp:~mpontillo/maas/avahi-service-and-rpc
Merge into: lp:~maas-committers/maas/trunk
Diff against target: 889 lines (+480/-74)
13 files modified
.bzrignore (+1/-0)
.gitignore (+1/-0)
.idea/maas.iml (+8/-0)
src/maasserver/models/interface.py (+3/-3)
src/maasserver/models/mdns.py (+7/-1)
src/maasserver/models/tests/test_interface.py (+7/-7)
src/maasserver/regiondservices/networks_monitoring.py (+10/-0)
src/maasserver/rpc/rackcontrollers.py (+12/-0)
src/maasserver/rpc/regionservice.py (+12/-0)
src/provisioningserver/rackdservices/networks_monitoring_service.py (+9/-2)
src/provisioningserver/rpc/region.py (+17/-0)
src/provisioningserver/utils/services.py (+137/-60)
src/provisioningserver/utils/tests/test_services.py (+256/-1)
To merge this branch: bzr merge lp:~mpontillo/maas/avahi-service-and-rpc
Reviewer Review Type Date Requested Status
Gavin Panella (community) Approve
Review via email: mp+304431@code.launchpad.net

Commit message

Add twisted service and RPC so that mDNS entries can be recorded end-to-end in the MAAS database.

To post a comment you must log in.
Revision history for this message
Gavin Panella (allenap) wrote :

Looks good. I've made a few suggestions, that's all.

review: Approve
Revision history for this message
Mike Pontillo (mpontillo) wrote :

Thanks for the review. Some replies below.

Revision history for this message
Gavin Panella (allenap) :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file '.bzrignore'
2--- .bzrignore 2016-08-18 17:31:05 +0000
3+++ .bzrignore 2016-08-31 18:00:29 +0000
4@@ -6,6 +6,7 @@
5 ./.failed
6 ./.hypothesis
7 ./.idea
8+./.idea/libraries
9 ./.idea/scopes
10 ./.idea/workspace.xml
11 ./.installed.cfg
12
13=== modified file '.gitignore'
14--- .gitignore 2016-08-18 17:31:05 +0000
15+++ .gitignore 2016-08-31 18:00:29 +0000
16@@ -6,6 +6,7 @@
17 /.failed
18 /.hypothesis
19 /.idea
20+/.idea/libraries
21 /.idea/scopes
22 /.idea/workspace.xml
23 /.installed.cfg
24
25=== modified file '.idea/maas.iml'
26--- .idea/maas.iml 2016-04-15 16:44:43 +0000
27+++ .idea/maas.iml 2016-08-31 18:00:29 +0000
28@@ -1,5 +1,12 @@
29 <?xml version="1.0" encoding="UTF-8"?>
30 <module type="PYTHON_MODULE" version="4">
31+ <component name="FacetManager">
32+ <facet type="buildout-python" name="Buildout Support">
33+ <configuration>
34+ <option name="script" value="bin/py" />
35+ </configuration>
36+ </facet>
37+ </component>
38 <component name="NewModuleRootManager">
39 <content url="file://$MODULE_DIR$">
40 <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
41@@ -11,6 +18,7 @@
42 </content>
43 <orderEntry type="jdk" jdkName="Python 3.5.1+ (/usr/bin/python3.5)" jdkType="Python SDK" />
44 <orderEntry type="sourceFolder" forTests="false" />
45+ <orderEntry type="library" name="Buildout Eggs" level="project" />
46 </component>
47 <component name="TemplatesService">
48 <option name="TEMPLATE_CONFIGURATION" value="Django" />
49
50=== modified file 'src/maasserver/models/interface.py'
51--- src/maasserver/models/interface.py 2016-08-20 12:42:22 +0000
52+++ src/maasserver/models/interface.py 2016-08-31 18:00:29 +0000
53@@ -1165,7 +1165,7 @@
54 from maasserver.models.mdns import MDNS
55 if self.mdns_discovery_state is False:
56 return None
57- ip = avahi_json['ip']
58+ ip = avahi_json['address']
59 hostname = avahi_json['hostname']
60 deleted = MDNS.objects.delete_and_log_obsolete_mdns_entries(
61 hostname, ip, interface=self)
62@@ -1177,8 +1177,8 @@
63 # If we deleted a previous mDNS entry, then we have already
64 # generated a log statement about this mDNS entry.
65 if not deleted:
66- maaslog.info("%s: New mDNS hostname observed for %s: '%s'." % (
67- self.get_log_string(), ip, hostname))
68+ maaslog.info("%s: New mDNS entry resolved: '%s' on %s." % (
69+ self.get_log_string(), hostname, ip))
70 else:
71 binding.count += 1
72 binding.save(update_fields=['count', 'updated'])
73
74=== modified file 'src/maasserver/models/mdns.py'
75--- src/maasserver/models/mdns.py 2016-08-15 15:33:20 +0000
76+++ src/maasserver/models/mdns.py 2016-08-31 18:00:29 +0000
77@@ -27,6 +27,7 @@
78 get_one,
79 UniqueViolation,
80 )
81+from netaddr import IPAddress
82 from provisioningserver.logger import get_maas_logger
83
84
85@@ -47,11 +48,16 @@
86 """
87 # Check if this hostname was previously assigned to another IP address.
88 deleted = False
89+ incoming_ip_version = IPAddress(ip).version
90 previous_bindings = self.filter(
91 hostname=hostname, interface=interface).exclude(ip=ip)
92+ # Check if this hostname was previously assigned to a different IP.
93 for binding in previous_bindings:
94+ if incoming_ip_version != IPAddress(binding.ip).version:
95+ # Don't move hostnames between address families.
96+ continue
97 maaslog.info("%s: Hostname '%s' moved from %s to %s." % (
98- interface.get_log_string(), ip, binding.ip, ip))
99+ interface.get_log_string(), hostname, binding.ip, ip))
100 binding.delete()
101 deleted = True
102 # Check if this IP address had a different hostname assigned.
103
104=== modified file 'src/maasserver/models/tests/test_interface.py'
105--- src/maasserver/models/tests/test_interface.py 2016-08-25 17:06:55 +0000
106+++ src/maasserver/models/tests/test_interface.py 2016-08-31 18:00:29 +0000
107@@ -951,7 +951,7 @@
108 if hostname is None:
109 hostname = factory.make_hostname()
110 return {
111- 'ip': ip,
112+ 'address': ip,
113 'hostname': hostname,
114 }
115
116@@ -983,7 +983,7 @@
117 self.assertThat(MDNS.objects.count(), Equals(1))
118 iface.update_mdns_entry(json)
119 mdns_entry = reload_object(mdns_entry)
120- self.assertThat(mdns_entry.ip, Equals(json['ip']))
121+ self.assertThat(mdns_entry.ip, Equals(json['address']))
122 self.assertThat(mdns_entry.hostname, Equals(json['hostname']))
123 # This is the second time we saw this entry.
124 self.assertThat(mdns_entry.count, Equals(2))
125@@ -995,13 +995,13 @@
126 json = self.make_mdns_entry_json()
127 iface.update_mdns_entry(json)
128 # Have a different IP address claim ownership of the hostname.
129- json['ip'] = factory.make_ip_address(ipv6=False)
130+ json['address'] = factory.make_ip_address(ipv6=False)
131 iface.update_mdns_entry(json)
132 self.assertThat(MDNS.objects.count(), Equals(1))
133- self.assertThat(list(MDNS.objects.all())[0].ip, Equals(json['ip']))
134+ self.assertThat(MDNS.objects.first().ip, Equals(json['address']))
135 # This is the first time we saw this neighbour, because the original
136 # binding was deleted.
137- self.assertThat(list(MDNS.objects.all())[0].count, Equals(1))
138+ self.assertThat(MDNS.objects.count(), Equals(1))
139
140 def test__logs_new_entry(self):
141 iface = factory.make_Interface(INTERFACE_TYPE.PHYSICAL)
142@@ -1010,7 +1010,7 @@
143 with FakeLogger("maas.interface") as maaslog:
144 iface.update_mdns_entry(json)
145 self.assertDocTestMatches(
146- "...: New mDNS hostname observed for...",
147+ "...: New mDNS entry resolved...",
148 maaslog.output)
149
150 def test__logs_moved_entry(self):
151@@ -1019,7 +1019,7 @@
152 json = self.make_mdns_entry_json()
153 iface.update_mdns_entry(json)
154 # Have a different IP address claim ownership of the hostma,e.
155- json['ip'] = factory.make_ip_address(ipv6=False)
156+ json['address'] = factory.make_ip_address(ipv6=False)
157 with FakeLogger("maas.mDNS") as maaslog:
158 iface.update_mdns_entry(json)
159 self.assertDocTestMatches(
160
161=== modified file 'src/maasserver/regiondservices/networks_monitoring.py'
162--- src/maasserver/regiondservices/networks_monitoring.py 2016-08-18 14:47:07 +0000
163+++ src/maasserver/regiondservices/networks_monitoring.py 2016-08-31 18:00:29 +0000
164@@ -27,6 +27,10 @@
165 """Record the specified list of neighbours."""
166 return deferToDatabase(self.recordNeighboursIntoDatabase, neighbours)
167
168+ def reportMDNSEntries(self, mdns):
169+ """Record the specified list of mDNS entries."""
170+ return deferToDatabase(self.recordMDNSEntriesIntoDatabase, mdns)
171+
172 @transactional
173 def recordInterfacesIntoDatabase(self, interfaces):
174 """Record the interfaces information."""
175@@ -38,3 +42,9 @@
176 """Record the interfaces information."""
177 region_controller = RegionController.objects.get_running_controller()
178 region_controller.report_neighbours(neighbours)
179+
180+ @transactional
181+ def recordMDNSEntriesIntoDatabase(self, mdns):
182+ """Record the mDNS entries."""
183+ region_controller = RegionController.objects.get_running_controller()
184+ region_controller.report_mdns_entries(mdns)
185
186=== modified file 'src/maasserver/rpc/rackcontrollers.py'
187--- src/maasserver/rpc/rackcontrollers.py 2016-08-19 13:31:57 +0000
188+++ src/maasserver/rpc/rackcontrollers.py 2016-08-31 18:00:29 +0000
189@@ -194,6 +194,18 @@
190
191 @synchronous
192 @transactional
193+def report_mdns_entries(system_id, mdns):
194+ """Report observed neighbours seen on the rack controller."""
195+ try:
196+ rack_controller = RackController.objects.get(system_id=system_id)
197+ except RackController.DoesNotExist:
198+ raise NoSuchNode.from_system_id(system_id)
199+ else:
200+ rack_controller.report_mdns_entries(mdns)
201+
202+
203+@synchronous
204+@transactional
205 def report_neighbours(system_id, neighbours):
206 """Report observed neighbours seen on the rack controller."""
207 try:
208
209=== modified file 'src/maasserver/rpc/regionservice.py'
210--- src/maasserver/rpc/regionservice.py 2016-08-19 18:11:11 +0000
211+++ src/maasserver/rpc/regionservice.py 2016-08-31 18:00:29 +0000
212@@ -408,6 +408,18 @@
213 d.addCallback(lambda args: {})
214 return d
215
216+ @region.ReportMDNSEntries.responder
217+ def report_mdns_entries(self, system_id, mdns):
218+ """report_neighbours()
219+
220+ Implementation of
221+ :py:class:`~provisioningserver.rpc.region.ReportNeighbours`.
222+ """
223+ d = deferToDatabase(
224+ rackcontrollers.report_mdns_entries, system_id, mdns)
225+ d.addCallback(lambda args: {})
226+ return d
227+
228 @region.ReportNeighbours.responder
229 def report_neighbours(self, system_id, neighbours):
230 """report_neighbours()
231
232=== modified file 'src/provisioningserver/rackdservices/networks_monitoring_service.py'
233--- src/provisioningserver/rackdservices/networks_monitoring_service.py 2016-08-18 14:47:07 +0000
234+++ src/provisioningserver/rackdservices/networks_monitoring_service.py 2016-08-31 18:00:29 +0000
235@@ -9,6 +9,7 @@
236
237 from provisioningserver.logger.log import get_maas_logger
238 from provisioningserver.rpc.region import (
239+ ReportMDNSEntries,
240 ReportNeighbours,
241 RequestRackRefresh,
242 UpdateInterfaces,
243@@ -27,7 +28,7 @@
244 self.clientService = clientService
245
246 def recordInterfaces(self, interfaces):
247- """Record the interfaces information."""
248+ """Record the interfaces information to the region."""
249 client = self.clientService.getClient()
250 # On first run perform a refresh
251 if self._recorded is None:
252@@ -38,8 +39,14 @@
253 interfaces=interfaces)
254
255 def reportNeighbours(self, neighbours):
256- """Report neighbour information."""
257+ """Report neighbour information to the region."""
258 client = self.clientService.getClient()
259 return client(
260 ReportNeighbours, system_id=client.localIdent,
261 neighbours=neighbours)
262+
263+ def reportMDNSEntries(self, mdns):
264+ """Report mDNS entries to the region."""
265+ client = self.clientService.getClient()
266+ return client(
267+ ReportMDNSEntries, system_id=client.localIdent, mdns=mdns)
268
269=== modified file 'src/provisioningserver/rpc/region.py'
270--- src/provisioningserver/rpc/region.py 2016-08-19 18:11:11 +0000
271+++ src/provisioningserver/rpc/region.py 2016-08-31 18:00:29 +0000
272@@ -23,6 +23,7 @@
273 "RegisterRackController",
274 "ReportBootImages",
275 "ReportForeignDHCPServer",
276+ "ReportMDNSEntries",
277 "ReportNeighbours",
278 "RequestNodeInfoByMACAddress",
279 "SendEvent",
280@@ -367,6 +368,22 @@
281 errors = []
282
283
284+class ReportMDNSEntries(amp.Command):
285+ """Called by a rack controller to report observed mDNS entries.
286+
287+ :since: 2.1
288+ """
289+
290+ arguments = [
291+ (b'system_id', amp.Unicode()),
292+ (b'mdns', StructureAsJSON()),
293+ ]
294+ response = []
295+ errors = {
296+ NoSuchNode: b"NoSuchNode",
297+ }
298+
299+
300 class ReportNeighbours(amp.Command):
301 """Called by a rack controller to report observed neighbor devices.
302
303
304=== modified file 'src/provisioningserver/utils/services.py'
305--- src/provisioningserver/utils/services.py 2016-08-19 15:34:46 +0000
306+++ src/provisioningserver/utils/services.py 2016-08-31 18:00:29 +0000
307@@ -13,6 +13,7 @@
308 )
309 from datetime import timedelta
310 import json
311+from json.decoder import JSONDecodeError
312 import os
313
314 from provisioningserver.config import is_dev_environment
315@@ -34,35 +35,27 @@
316 Deferred,
317 maybeDeferred,
318 )
319-from twisted.internet.error import (
320- ProcessDone,
321- ProcessExitedAlready,
322-)
323-from twisted.internet.interfaces import ILoggingContext
324+from twisted.internet.error import ProcessDone
325 from twisted.internet.protocol import ProcessProtocol
326 from twisted.internet.threads import deferToThread
327 from twisted.python import log
328-from zope.interface import implementer
329
330
331 maaslog = get_maas_logger("networks.monitor")
332
333
334-@implementer(ILoggingContext)
335-class NeighbourObservationProtocol(ProcessProtocol):
336+class JSONPerLineProtocol(ProcessProtocol):
337+ """ProcessProtocol which allows easy parsing of a single JSON object per
338+ line of text.
339+ """
340
341- def __init__(self, interface, prefix=None, callback=None):
342+ def __init__(self, callback=None):
343 super().__init__()
344 self.done = Deferred()
345- self.interface = interface
346- self.prefix = prefix
347 self.callback = callback
348 self.outbuf = b''
349 self.errbuf = b''
350
351- def logPrefix(self):
352- return "-" if self.prefix is None else self.prefix
353-
354 def connectionMade(self):
355 self.outbuf = b''
356 self.errbuf = b''
357@@ -93,10 +86,14 @@
358
359 def outLineReceived(self, line):
360 line = line.decode("utf-8")
361- obj = json.loads(line)
362- obj['interface'] = self.interface
363- # Report one observation at a time for now. (The API is designed to
364- # support multiple, but that will be done in a future branch.)
365+ try:
366+ obj = json.loads(line)
367+ except JSONDecodeError:
368+ log.msg("Failed to parse JSON: %r" % line)
369+ else:
370+ self.objectReceived(obj)
371+
372+ def objectReceived(self, obj):
373 self.callback([obj])
374
375 def errLineReceived(self, line):
376@@ -112,52 +109,87 @@
377 self.done.errback(reason)
378
379
380-class NeighbourDiscoveryService(TimerService):
381- """Service to spawn the per-interface device discovery subprocess."""
382-
383- def __init__(self, ifname):
384- super().__init__(60.0, self.observeInterface, ifname)
385- self._ifname = ifname
386- self._process = None
387+class NeighbourObservationProtocol(JSONPerLineProtocol):
388+
389+ def __init__(self, interface, *args, **kwargs):
390+ super().__init__(*args, **kwargs)
391+ self.interface = interface
392+
393+ def objectReceived(self, obj):
394+ # The only difference between the JSONPerLineProtocol and the
395+ # NeighbourObservationProtocol is that the neighbour observation
396+ # protocol needs to insert the interface metadata into the resultant
397+ # object before the callback.
398+ obj['interface'] = self.interface
399+ super().objectReceived(obj)
400+
401+
402+class ProcessProtocolService(TimerService, metaclass=ABCMeta):
403+
404+ def __init__(self, description, protocol, interval=60.0):
405+ assert protocol is not None
406+ assert description is not None
407+ self.description = description
408+ self.protocol = protocol
409+ self.process = None
410+ super().__init__(interval, self.startProcess)
411
412 @deferred
413- def observeInterface(self, ifname):
414- """Start a network observation process for the specified interface."""
415- observer_protocol = NeighbourObservationProtocol(
416- ifname, callback=self.parent.reportNeighbours)
417+ def startProcess(self):
418 env = select_c_utf8_bytes_locale()
419- maas_rack_cmd = get_maas_provision_command().encode("utf-8")
420- self._process = reactor.spawnProcess(
421- observer_protocol, maas_rack_cmd,
422- [maas_rack_cmd, b"observe-arp", ifname.encode("utf-8")],
423- env=env)
424- log.msg("Started neighbour observation on interface: %s" % ifname)
425- return observer_protocol.done.addErrback(
426- log.err,
427- "Neighbour observation process failed for interface: %s" % (
428- self._ifname
429- ))
430-
431- def stopObservationProcess(self):
432- """Attempts to kill the neighbour observation process gracefully."""
433- if self._process is not None:
434- try:
435- self._process.signalProcess("INT")
436- except ProcessExitedAlready:
437- pass # Our work here is done.
438- except OSError:
439- log.err(
440- None,
441- "Neighbour observation process for %s failed to "
442- "shut down." % self._ifname)
443- else:
444- return self._process.done
445+ log.msg("%s started." % self.description)
446+ args = self.getProcessParameters()
447+ self.process = reactor.spawnProcess(
448+ self.protocol, args[0], args, env=env)
449+ return self.protocol.done.addErrback(
450+ log.err, "%s failed." % self.description)
451+
452+ @abstractmethod
453+ def getProcessParameters(self):
454+ """Return the parameters for the subprocess to launch.
455+
456+ This MUST be overridden in subclasses.
457+ """
458
459 def stopService(self):
460 """Stops the neighbour observation service."""
461- d = super().stopService()
462- d.addCallback(callOut, self.stopObservationProcess)
463- return d
464+ if self.process is not None:
465+ self.process.loseConnection()
466+ return super().stopService()
467+
468+
469+class NeighbourDiscoveryService(ProcessProtocolService):
470+ """Service to spawn the per-interface device discovery subprocess."""
471+
472+ def __init__(self, ifname: str, callback):
473+ self.ifname = ifname
474+ description = "Neighbour observation process for %s" % ifname
475+ protocol = NeighbourObservationProtocol(ifname, callback=callback)
476+ super().__init__(description=description, protocol=protocol)
477+
478+ def getProcessParameters(self):
479+ maas_rack_cmd = get_maas_provision_command().encode("utf-8")
480+ return [
481+ maas_rack_cmd,
482+ b"observe-arp",
483+ self.ifname.encode("utf-8")
484+ ]
485+
486+
487+class MDNSResolverService(ProcessProtocolService):
488+ """Service to spawn the per-interface device discovery subprocess."""
489+
490+ def __init__(self, callback):
491+ protocol = JSONPerLineProtocol(callback=callback)
492+ super().__init__(
493+ description="mDNS resolver process", protocol=protocol)
494+
495+ def getProcessParameters(self):
496+ maas_rack_cmd = get_maas_provision_command().encode("utf-8")
497+ return [
498+ maas_rack_cmd,
499+ b"observe-mdns",
500+ ]
501
502
503 class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
504@@ -231,6 +263,13 @@
505 This MUST be overridden in subclasses.
506 """
507
508+ @abstractmethod
509+ def reportMDNSEntries(self, mdns):
510+ """Report on new or refreshed neighbours.
511+
512+ This MUST be overridden in subclasses.
513+ """
514+
515 def stopService(self):
516 """Stop the service.
517
518@@ -307,10 +346,32 @@
519
520 def _startNeighbourDiscovery(self, ifname):
521 """"Start neighbour discovery service on the specified interface."""
522- service = NeighbourDiscoveryService(ifname)
523+ service = NeighbourDiscoveryService(ifname, self.reportNeighbours)
524 service.setName("neighbour_discovery:" + ifname)
525 service.setServiceParent(self)
526
527+ def _startMDNSDiscoveryService(self):
528+ """Start resolving mDNS entries on attached networks."""
529+ try:
530+ self.getServiceNamed("mdns_resolver")
531+ except KeyError:
532+ # This is an expected exception. (The call inside the `try`
533+ # is only necessary to ensure the service doesn't exist.)
534+ service = MDNSResolverService(self.reportMDNSEntries)
535+ service.setName("mdns_resolver")
536+ service.setServiceParent(self)
537+
538+ def _stopMDNSDiscoveryService(self):
539+ """Stop resolving mDNS entries on attached networks."""
540+ try:
541+ service = self.getServiceNamed("mdns_resolver")
542+ except KeyError:
543+ # Service doesn't exist, so no need to stop it.
544+ pass
545+ else:
546+ service.disownServiceParent()
547+ maaslog.info("Stopped mDNS resolver service.")
548+
549 def _startNeighbourDiscoveryServices(self, new_interfaces):
550 """Start monitoring services for the specified set of interfaces."""
551 for ifname in new_interfaces:
552@@ -319,7 +380,7 @@
553 self.getServiceNamed("neighbour_disovery:" + ifname)
554 except KeyError:
555 # This is an expected exception. (The call inside the `try`
556- # is only necessary to ensure the service doesn't exist.
557+ # is only necessary to ensure the service doesn't exist.)
558 self._startNeighbourDiscovery(ifname)
559
560 def _stopNeighbourDiscoveryServices(self, deleted_interfaces):
561@@ -356,6 +417,22 @@
562 deleted_interfaces = self._monitored.difference(monitored_interfaces)
563 self._startNeighbourDiscoveryServices(new_interfaces)
564 self._stopNeighbourDiscoveryServices(deleted_interfaces)
565+ # Determine if we went from monitoring zero interfaces to monitoring
566+ # at least one. If so, we need to start mDNS discovery.
567+ # XXX Need to get this setting from the region.
568+ if len(self._monitored) == 0 and len(monitored_interfaces) > 0:
569+ # We weren't currently monitoring any interfaces, but we have been
570+ # requested to monitor at least one.
571+ self._startMDNSDiscoveryService()
572+ elif len(self._monitored) > 0 and len(monitored_interfaces) == 0:
573+ # We are currently monitoring at least one interface, but we have
574+ # been requested to stop monitoring them all.
575+ self._stopMDNSDiscoveryService()
576+ else:
577+ # No state change. We either still AREN'T monitoring any
578+ # interfaces, or we still ARE monitoring them. (Either way, it
579+ # doesn't matter for mDNS discovery purposes.)
580+ pass
581 self._monitored = monitored_interfaces
582
583 def _interfacesRecorded(self, interfaces):
584
585=== modified file 'src/provisioningserver/utils/tests/test_services.py'
586--- src/provisioningserver/utils/tests/test_services.py 2016-08-20 12:42:22 +0000
587+++ src/provisioningserver/utils/tests/test_services.py 2016-08-31 18:00:29 +0000
588@@ -8,6 +8,7 @@
589 import threading
590 from unittest.mock import (
591 call,
592+ Mock,
593 sentinel,
594 )
595
596@@ -15,6 +16,7 @@
597 from maastesting.matchers import (
598 DocTestMatches,
599 HasLength,
600+ IsFiredDeferred,
601 MockCalledOnceWith,
602 MockCallsMatch,
603 MockNotCalled,
604@@ -25,12 +27,23 @@
605 )
606 from maastesting.twisted import TwistedLoggerFixture
607 from provisioningserver.utils import services
608-from provisioningserver.utils.services import NetworksMonitoringService
609+from provisioningserver.utils.services import (
610+ JSONPerLineProtocol,
611+ MDNSResolverService,
612+ NeighbourDiscoveryService,
613+ NeighbourObservationProtocol,
614+ NetworksMonitoringService,
615+ ProcessProtocolService,
616+)
617+from provisioningserver.utils.twisted import pause
618+from testtools import ExpectedException
619 from testtools.matchers import (
620 Equals,
621+ Is,
622 IsInstance,
623 Not,
624 )
625+from testtools.tests.twistedsupport.test_deferred import extract_result
626 from twisted.application.service import MultiService
627 from twisted.internet import reactor
628 from twisted.internet.defer import (
629@@ -38,7 +51,13 @@
630 inlineCallbacks,
631 succeed,
632 )
633+from twisted.internet.error import (
634+ ProcessDone,
635+ ProcessExitedAlready,
636+ ProcessTerminated,
637+)
638 from twisted.python import threadable
639+from twisted.python.failure import Failure
640
641
642 class StubNetworksMonitoringService(NetworksMonitoringService):
643@@ -60,6 +79,9 @@
644 def reportNeighbours(self, neighbours):
645 pass
646
647+ def reportMDNSEntries(self, neighbours):
648+ pass
649+
650
651 class TestNetworksMonitoringService(MAASTestCase):
652 """Tests of `NetworksMonitoringService`."""
653@@ -253,3 +275,236 @@
654 yield service.updateInterfaces()
655 # ... interfaces ARE recorded.
656 self.assertThat(service.interfaces, Not(Equals([])))
657+
658+
659+class TestJSONPerLineProtocol(MAASTestCase):
660+ """Tests for `JSONPerLineProtocol`."""
661+
662+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
663+
664+ @inlineCallbacks
665+ def test__propagates_exit_errors(self):
666+ proto = JSONPerLineProtocol(callback=lambda json: None)
667+ reactor.spawnProcess(proto, b"false", (b"false",))
668+ with ExpectedException(ProcessTerminated, ".* exit code 1"):
669+ yield proto.done
670+
671+ def test__parses_only_full_lines(self):
672+ callback = Mock()
673+ proto = JSONPerLineProtocol(callback=callback)
674+ proto.connectionMade()
675+ # Send an empty JSON dictionary using 3 separate writes.
676+ proto.outReceived(b"{")
677+ # No callback yet...
678+ self.expectThat(callback, MockCallsMatch())
679+ proto.outReceived(b"}")
680+ # Still no callback...
681+ self.expectThat(callback, MockCallsMatch())
682+ proto.outReceived(b"\n")
683+ # After a newline, we expect the JSON to be parsed and the callback
684+ # to receive an empty Python dictionary (which corresponds to the JSON
685+ # that was sent.)
686+ self.expectThat(callback, MockCallsMatch(call([{}])))
687+
688+ def test__ignores_interspersed_zero_length_writes(self):
689+ callback = Mock()
690+ proto = JSONPerLineProtocol(callback=callback)
691+ proto.connectionMade()
692+ proto.outReceived(b"")
693+ self.expectThat(callback, MockCallsMatch())
694+ proto.outReceived(b"{}\n")
695+ self.expectThat(callback, MockCallsMatch(call([{}])))
696+ proto.outReceived(b"")
697+ self.expectThat(callback, MockCallsMatch(call([{}])))
698+ proto.outReceived(b"{}\n")
699+ self.expectThat(callback, MockCallsMatch(call([{}]), call([{}])))
700+
701+ def test__logs_non_json_output(self):
702+ callback = Mock()
703+ proto = JSONPerLineProtocol(callback=callback)
704+ proto.connectionMade()
705+ with TwistedLoggerFixture() as logger:
706+ proto.outReceived(b"{\n")
707+ self.assertThat(
708+ logger.output, DocTestMatches("Failed to parse JSON: ..."))
709+
710+ def test__logs_stderr(self):
711+ message = factory.make_name("message")
712+ callback = Mock()
713+ proto = JSONPerLineProtocol(callback=callback)
714+ proto.connectionMade()
715+ with TwistedLoggerFixture() as logger:
716+ proto.errReceived((message + "\n").encode("ascii"))
717+ self.assertThat(logger.output, Equals(message))
718+
719+ def test__logs_only_full_lines_from_stderr(self):
720+ message = factory.make_name("message")
721+ callback = Mock()
722+ proto = JSONPerLineProtocol(callback=callback)
723+ proto.connectionMade()
724+ with TwistedLoggerFixture() as logger:
725+ proto.errReceived(message.encode("ascii"))
726+ self.assertThat(logger.output, Equals(""))
727+
728+ def test__logs_stderr_at_process_end(self):
729+ message = factory.make_name("message")
730+ callback = Mock()
731+ proto = JSONPerLineProtocol(callback=callback)
732+ proto.connectionMade()
733+ with TwistedLoggerFixture() as logger:
734+ proto.errReceived(message.encode("ascii"))
735+ self.assertThat(logger.output, Equals(""))
736+ proto.processEnded(Failure(ProcessDone(0)))
737+ self.assertThat(logger.output, Equals(message))
738+
739+ def test__propagates_errors_from_command(self):
740+ callback = Mock()
741+ proto = JSONPerLineProtocol(callback=callback)
742+ proto.connectionMade()
743+ reason = Failure(ProcessTerminated(1))
744+ proto.processEnded(reason)
745+ self.assertRaises(ProcessTerminated, extract_result, proto.done)
746+
747+
748+class TestNeighbourObservationProtocol(MAASTestCase):
749+ """Tests for `NeighbourObservationProtocol`."""
750+
751+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
752+
753+ def test_adds_interface(self):
754+ callback = Mock()
755+ ifname = factory.make_name('eth')
756+ proto = NeighbourObservationProtocol(ifname, callback=callback)
757+ proto.connectionMade()
758+ proto.outReceived(b"{}\n")
759+ self.expectThat(
760+ callback, MockCallsMatch(call([{"interface": ifname}])))
761+
762+
763+class TrueProcessProtocolService(ProcessProtocolService):
764+
765+ def getProcessParameters(self):
766+ return ["/bin/true"]
767+
768+
769+class FalseProcessProtocolService(ProcessProtocolService):
770+
771+ def getProcessParameters(self):
772+ return ["/bin/false"]
773+
774+
775+class CatProcessProtocolService(ProcessProtocolService):
776+
777+ def getProcessParameters(self):
778+ return ["/bin/cat"]
779+
780+
781+class EchoProcessProtocolService(ProcessProtocolService):
782+
783+ def getProcessParameters(self):
784+ return ["/bin/echo", "{}\n"]
785+
786+
787+class MockJSONProtocol(JSONPerLineProtocol):
788+ pass
789+
790+
791+class TestProcessProtocolService(MAASTestCase):
792+ """Tests for `JSONPerLineProtocol`."""
793+
794+ run_tests_with = MAASTwistedRunTest.make_factory(debug=True, timeout=5)
795+
796+ def test__base_class_cannot_be_used(self):
797+ with ExpectedException(TypeError):
798+ ProcessProtocolService(
799+ description="Mock process", protocol=Mock())
800+
801+ @inlineCallbacks
802+ def test__starts_and_stops_process(self):
803+ protocol = MockJSONProtocol()
804+ service = CatProcessProtocolService(
805+ description="Unit test process", protocol=protocol)
806+ mock_callback = Mock()
807+ protocol.done.addCallback(mock_callback)
808+ service.startService()
809+ yield service.stopService()
810+ self.assertTrue(mock_callback.called)
811+ yield pause(0.0)
812+ self.assertThat(protocol.done, IsFiredDeferred())
813+ self.assertThat(extract_result(protocol.done), Is(None))
814+ with ExpectedException(ProcessExitedAlready):
815+ service.process.signalProcess("INT")
816+
817+ @inlineCallbacks
818+ def test__handles_normal_process_exit(self):
819+ protocol = MockJSONProtocol()
820+ service = TrueProcessProtocolService(
821+ description="Unit test process", protocol=protocol)
822+ mock_callback = Mock()
823+ protocol.done.addCallback(mock_callback)
824+ service.startService()
825+ yield service.stopService()
826+ self.assertTrue(mock_callback.called)
827+ yield pause(0.0)
828+ self.assertThat(protocol.done, IsFiredDeferred())
829+ self.assertThat(extract_result(protocol.done), Is(None))
830+
831+ @inlineCallbacks
832+ def test__handles_abnormal_process_exit(self):
833+ protocol = MockJSONProtocol()
834+ service = FalseProcessProtocolService(
835+ description="Unit test process", protocol=protocol)
836+ mock_errback = Mock()
837+ mock_callback = Mock()
838+ protocol.done.addCallback(mock_callback)
839+ protocol.done.addErrback(mock_errback)
840+ service.startService()
841+ yield service.stopService()
842+ self.assertTrue(mock_errback.called)
843+ self.assertFalse(mock_callback.called)
844+ yield pause(0.0)
845+ self.assertThat(protocol.done, IsFiredDeferred())
846+ self.assertThat(extract_result(protocol.done), Equals(None))
847+
848+ @inlineCallbacks
849+ def test__calls_protocol_callback(self):
850+ callback = Mock()
851+ protocol = MockJSONProtocol(callback=callback)
852+ service = EchoProcessProtocolService(
853+ description="Unit test process", protocol=protocol)
854+ service.startService()
855+ # Wait for the protocol to finish. (the echo process will stop)
856+ yield protocol.done
857+ self.assertThat(callback, MockCalledOnceWith([{}]))
858+ yield service.stopService()
859+ yield pause(0.0)
860+ self.assertThat(protocol.done, IsFiredDeferred())
861+ self.assertThat(extract_result(protocol.done), Equals(None))
862+
863+
864+class TestNeighbourDiscoveryService(MAASTestCase):
865+ """Tests for `NeighbourDiscoveryService`."""
866+
867+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
868+
869+ def test__returns_expected_arguments(self):
870+ ifname = factory.make_name('eth')
871+ service = NeighbourDiscoveryService(ifname, Mock())
872+ args = service.getProcessParameters()
873+ self.assertThat(args, HasLength(3))
874+ self.assertTrue(args[0].endswith(b'maas-rack'))
875+ self.assertTrue(args[1], Equals(b"observe-arp"))
876+ self.assertTrue(args[2], Equals(ifname.encode('utf-8')))
877+
878+
879+class TestMDNSResolverService(MAASTestCase):
880+ """Tests for `MDNSResolverService`."""
881+
882+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
883+
884+ def test__returns_expected_arguments(self):
885+ service = MDNSResolverService(Mock())
886+ args = service.getProcessParameters()
887+ self.assertThat(args, HasLength(2))
888+ self.assertTrue(args[0].endswith(b"maas-rack"))
889+ self.assertTrue(args[1], Equals(b"observe-mdns"))