Merge lp:~mpontillo/maas/avahi-service-and-rpc into lp:~maas-committers/maas/trunk
- avahi-service-and-rpc
- Merge into trunk
Proposed by
Mike Pontillo
Status: | Merged |
---|---|
Approved by: | Mike Pontillo |
Approved revision: | no longer in the source branch. |
Merged at revision: | 5307 |
Proposed branch: | lp:~mpontillo/maas/avahi-service-and-rpc |
Merge into: | lp:~maas-committers/maas/trunk |
Diff against target: |
889 lines (+480/-74) 13 files modified
.bzrignore (+1/-0) .gitignore (+1/-0) .idea/maas.iml (+8/-0) src/maasserver/models/interface.py (+3/-3) src/maasserver/models/mdns.py (+7/-1) src/maasserver/models/tests/test_interface.py (+7/-7) src/maasserver/regiondservices/networks_monitoring.py (+10/-0) src/maasserver/rpc/rackcontrollers.py (+12/-0) src/maasserver/rpc/regionservice.py (+12/-0) src/provisioningserver/rackdservices/networks_monitoring_service.py (+9/-2) src/provisioningserver/rpc/region.py (+17/-0) src/provisioningserver/utils/services.py (+137/-60) src/provisioningserver/utils/tests/test_services.py (+256/-1) |
To merge this branch: | bzr merge lp:~mpontillo/maas/avahi-service-and-rpc |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Gavin Panella (community) | Approve | ||
Review via email: mp+304431@code.launchpad.net |
Commit message
Add twisted service and RPC so that mDNS entries can be recorded end-to-end in the MAAS database.
Description of the change
To post a comment you must log in.
Revision history for this message
Mike Pontillo (mpontillo) wrote : | # |
Thanks for the review. Some replies below.
Revision history for this message
Gavin Panella (allenap) : | # |
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | === modified file '.bzrignore' |
2 | --- .bzrignore 2016-08-18 17:31:05 +0000 |
3 | +++ .bzrignore 2016-08-31 18:00:29 +0000 |
4 | @@ -6,6 +6,7 @@ |
5 | ./.failed |
6 | ./.hypothesis |
7 | ./.idea |
8 | +./.idea/libraries |
9 | ./.idea/scopes |
10 | ./.idea/workspace.xml |
11 | ./.installed.cfg |
12 | |
13 | === modified file '.gitignore' |
14 | --- .gitignore 2016-08-18 17:31:05 +0000 |
15 | +++ .gitignore 2016-08-31 18:00:29 +0000 |
16 | @@ -6,6 +6,7 @@ |
17 | /.failed |
18 | /.hypothesis |
19 | /.idea |
20 | +/.idea/libraries |
21 | /.idea/scopes |
22 | /.idea/workspace.xml |
23 | /.installed.cfg |
24 | |
25 | === modified file '.idea/maas.iml' |
26 | --- .idea/maas.iml 2016-04-15 16:44:43 +0000 |
27 | +++ .idea/maas.iml 2016-08-31 18:00:29 +0000 |
28 | @@ -1,5 +1,12 @@ |
29 | <?xml version="1.0" encoding="UTF-8"?> |
30 | <module type="PYTHON_MODULE" version="4"> |
31 | + <component name="FacetManager"> |
32 | + <facet type="buildout-python" name="Buildout Support"> |
33 | + <configuration> |
34 | + <option name="script" value="bin/py" /> |
35 | + </configuration> |
36 | + </facet> |
37 | + </component> |
38 | <component name="NewModuleRootManager"> |
39 | <content url="file://$MODULE_DIR$"> |
40 | <sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" /> |
41 | @@ -11,6 +18,7 @@ |
42 | </content> |
43 | <orderEntry type="jdk" jdkName="Python 3.5.1+ (/usr/bin/python3.5)" jdkType="Python SDK" /> |
44 | <orderEntry type="sourceFolder" forTests="false" /> |
45 | + <orderEntry type="library" name="Buildout Eggs" level="project" /> |
46 | </component> |
47 | <component name="TemplatesService"> |
48 | <option name="TEMPLATE_CONFIGURATION" value="Django" /> |
49 | |
50 | === modified file 'src/maasserver/models/interface.py' |
51 | --- src/maasserver/models/interface.py 2016-08-20 12:42:22 +0000 |
52 | +++ src/maasserver/models/interface.py 2016-08-31 18:00:29 +0000 |
53 | @@ -1165,7 +1165,7 @@ |
54 | from maasserver.models.mdns import MDNS |
55 | if self.mdns_discovery_state is False: |
56 | return None |
57 | - ip = avahi_json['ip'] |
58 | + ip = avahi_json['address'] |
59 | hostname = avahi_json['hostname'] |
60 | deleted = MDNS.objects.delete_and_log_obsolete_mdns_entries( |
61 | hostname, ip, interface=self) |
62 | @@ -1177,8 +1177,8 @@ |
63 | # If we deleted a previous mDNS entry, then we have already |
64 | # generated a log statement about this mDNS entry. |
65 | if not deleted: |
66 | - maaslog.info("%s: New mDNS hostname observed for %s: '%s'." % ( |
67 | - self.get_log_string(), ip, hostname)) |
68 | + maaslog.info("%s: New mDNS entry resolved: '%s' on %s." % ( |
69 | + self.get_log_string(), hostname, ip)) |
70 | else: |
71 | binding.count += 1 |
72 | binding.save(update_fields=['count', 'updated']) |
73 | |
74 | === modified file 'src/maasserver/models/mdns.py' |
75 | --- src/maasserver/models/mdns.py 2016-08-15 15:33:20 +0000 |
76 | +++ src/maasserver/models/mdns.py 2016-08-31 18:00:29 +0000 |
77 | @@ -27,6 +27,7 @@ |
78 | get_one, |
79 | UniqueViolation, |
80 | ) |
81 | +from netaddr import IPAddress |
82 | from provisioningserver.logger import get_maas_logger |
83 | |
84 | |
85 | @@ -47,11 +48,16 @@ |
86 | """ |
87 | # Check if this hostname was previously assigned to another IP address. |
88 | deleted = False |
89 | + incoming_ip_version = IPAddress(ip).version |
90 | previous_bindings = self.filter( |
91 | hostname=hostname, interface=interface).exclude(ip=ip) |
92 | + # Check if this hostname was previously assigned to a different IP. |
93 | for binding in previous_bindings: |
94 | + if incoming_ip_version != IPAddress(binding.ip).version: |
95 | + # Don't move hostnames between address families. |
96 | + continue |
97 | maaslog.info("%s: Hostname '%s' moved from %s to %s." % ( |
98 | - interface.get_log_string(), ip, binding.ip, ip)) |
99 | + interface.get_log_string(), hostname, binding.ip, ip)) |
100 | binding.delete() |
101 | deleted = True |
102 | # Check if this IP address had a different hostname assigned. |
103 | |
104 | === modified file 'src/maasserver/models/tests/test_interface.py' |
105 | --- src/maasserver/models/tests/test_interface.py 2016-08-25 17:06:55 +0000 |
106 | +++ src/maasserver/models/tests/test_interface.py 2016-08-31 18:00:29 +0000 |
107 | @@ -951,7 +951,7 @@ |
108 | if hostname is None: |
109 | hostname = factory.make_hostname() |
110 | return { |
111 | - 'ip': ip, |
112 | + 'address': ip, |
113 | 'hostname': hostname, |
114 | } |
115 | |
116 | @@ -983,7 +983,7 @@ |
117 | self.assertThat(MDNS.objects.count(), Equals(1)) |
118 | iface.update_mdns_entry(json) |
119 | mdns_entry = reload_object(mdns_entry) |
120 | - self.assertThat(mdns_entry.ip, Equals(json['ip'])) |
121 | + self.assertThat(mdns_entry.ip, Equals(json['address'])) |
122 | self.assertThat(mdns_entry.hostname, Equals(json['hostname'])) |
123 | # This is the second time we saw this entry. |
124 | self.assertThat(mdns_entry.count, Equals(2)) |
125 | @@ -995,13 +995,13 @@ |
126 | json = self.make_mdns_entry_json() |
127 | iface.update_mdns_entry(json) |
128 | # Have a different IP address claim ownership of the hostname. |
129 | - json['ip'] = factory.make_ip_address(ipv6=False) |
130 | + json['address'] = factory.make_ip_address(ipv6=False) |
131 | iface.update_mdns_entry(json) |
132 | self.assertThat(MDNS.objects.count(), Equals(1)) |
133 | - self.assertThat(list(MDNS.objects.all())[0].ip, Equals(json['ip'])) |
134 | + self.assertThat(MDNS.objects.first().ip, Equals(json['address'])) |
135 | # This is the first time we saw this neighbour, because the original |
136 | # binding was deleted. |
137 | - self.assertThat(list(MDNS.objects.all())[0].count, Equals(1)) |
138 | + self.assertThat(MDNS.objects.count(), Equals(1)) |
139 | |
140 | def test__logs_new_entry(self): |
141 | iface = factory.make_Interface(INTERFACE_TYPE.PHYSICAL) |
142 | @@ -1010,7 +1010,7 @@ |
143 | with FakeLogger("maas.interface") as maaslog: |
144 | iface.update_mdns_entry(json) |
145 | self.assertDocTestMatches( |
146 | - "...: New mDNS hostname observed for...", |
147 | + "...: New mDNS entry resolved...", |
148 | maaslog.output) |
149 | |
150 | def test__logs_moved_entry(self): |
151 | @@ -1019,7 +1019,7 @@ |
152 | json = self.make_mdns_entry_json() |
153 | iface.update_mdns_entry(json) |
154 | # Have a different IP address claim ownership of the hostma,e. |
155 | - json['ip'] = factory.make_ip_address(ipv6=False) |
156 | + json['address'] = factory.make_ip_address(ipv6=False) |
157 | with FakeLogger("maas.mDNS") as maaslog: |
158 | iface.update_mdns_entry(json) |
159 | self.assertDocTestMatches( |
160 | |
161 | === modified file 'src/maasserver/regiondservices/networks_monitoring.py' |
162 | --- src/maasserver/regiondservices/networks_monitoring.py 2016-08-18 14:47:07 +0000 |
163 | +++ src/maasserver/regiondservices/networks_monitoring.py 2016-08-31 18:00:29 +0000 |
164 | @@ -27,6 +27,10 @@ |
165 | """Record the specified list of neighbours.""" |
166 | return deferToDatabase(self.recordNeighboursIntoDatabase, neighbours) |
167 | |
168 | + def reportMDNSEntries(self, mdns): |
169 | + """Record the specified list of mDNS entries.""" |
170 | + return deferToDatabase(self.recordMDNSEntriesIntoDatabase, mdns) |
171 | + |
172 | @transactional |
173 | def recordInterfacesIntoDatabase(self, interfaces): |
174 | """Record the interfaces information.""" |
175 | @@ -38,3 +42,9 @@ |
176 | """Record the interfaces information.""" |
177 | region_controller = RegionController.objects.get_running_controller() |
178 | region_controller.report_neighbours(neighbours) |
179 | + |
180 | + @transactional |
181 | + def recordMDNSEntriesIntoDatabase(self, mdns): |
182 | + """Record the mDNS entries.""" |
183 | + region_controller = RegionController.objects.get_running_controller() |
184 | + region_controller.report_mdns_entries(mdns) |
185 | |
186 | === modified file 'src/maasserver/rpc/rackcontrollers.py' |
187 | --- src/maasserver/rpc/rackcontrollers.py 2016-08-19 13:31:57 +0000 |
188 | +++ src/maasserver/rpc/rackcontrollers.py 2016-08-31 18:00:29 +0000 |
189 | @@ -194,6 +194,18 @@ |
190 | |
191 | @synchronous |
192 | @transactional |
193 | +def report_mdns_entries(system_id, mdns): |
194 | + """Report observed neighbours seen on the rack controller.""" |
195 | + try: |
196 | + rack_controller = RackController.objects.get(system_id=system_id) |
197 | + except RackController.DoesNotExist: |
198 | + raise NoSuchNode.from_system_id(system_id) |
199 | + else: |
200 | + rack_controller.report_mdns_entries(mdns) |
201 | + |
202 | + |
203 | +@synchronous |
204 | +@transactional |
205 | def report_neighbours(system_id, neighbours): |
206 | """Report observed neighbours seen on the rack controller.""" |
207 | try: |
208 | |
209 | === modified file 'src/maasserver/rpc/regionservice.py' |
210 | --- src/maasserver/rpc/regionservice.py 2016-08-19 18:11:11 +0000 |
211 | +++ src/maasserver/rpc/regionservice.py 2016-08-31 18:00:29 +0000 |
212 | @@ -408,6 +408,18 @@ |
213 | d.addCallback(lambda args: {}) |
214 | return d |
215 | |
216 | + @region.ReportMDNSEntries.responder |
217 | + def report_mdns_entries(self, system_id, mdns): |
218 | + """report_neighbours() |
219 | + |
220 | + Implementation of |
221 | + :py:class:`~provisioningserver.rpc.region.ReportNeighbours`. |
222 | + """ |
223 | + d = deferToDatabase( |
224 | + rackcontrollers.report_mdns_entries, system_id, mdns) |
225 | + d.addCallback(lambda args: {}) |
226 | + return d |
227 | + |
228 | @region.ReportNeighbours.responder |
229 | def report_neighbours(self, system_id, neighbours): |
230 | """report_neighbours() |
231 | |
232 | === modified file 'src/provisioningserver/rackdservices/networks_monitoring_service.py' |
233 | --- src/provisioningserver/rackdservices/networks_monitoring_service.py 2016-08-18 14:47:07 +0000 |
234 | +++ src/provisioningserver/rackdservices/networks_monitoring_service.py 2016-08-31 18:00:29 +0000 |
235 | @@ -9,6 +9,7 @@ |
236 | |
237 | from provisioningserver.logger.log import get_maas_logger |
238 | from provisioningserver.rpc.region import ( |
239 | + ReportMDNSEntries, |
240 | ReportNeighbours, |
241 | RequestRackRefresh, |
242 | UpdateInterfaces, |
243 | @@ -27,7 +28,7 @@ |
244 | self.clientService = clientService |
245 | |
246 | def recordInterfaces(self, interfaces): |
247 | - """Record the interfaces information.""" |
248 | + """Record the interfaces information to the region.""" |
249 | client = self.clientService.getClient() |
250 | # On first run perform a refresh |
251 | if self._recorded is None: |
252 | @@ -38,8 +39,14 @@ |
253 | interfaces=interfaces) |
254 | |
255 | def reportNeighbours(self, neighbours): |
256 | - """Report neighbour information.""" |
257 | + """Report neighbour information to the region.""" |
258 | client = self.clientService.getClient() |
259 | return client( |
260 | ReportNeighbours, system_id=client.localIdent, |
261 | neighbours=neighbours) |
262 | + |
263 | + def reportMDNSEntries(self, mdns): |
264 | + """Report mDNS entries to the region.""" |
265 | + client = self.clientService.getClient() |
266 | + return client( |
267 | + ReportMDNSEntries, system_id=client.localIdent, mdns=mdns) |
268 | |
269 | === modified file 'src/provisioningserver/rpc/region.py' |
270 | --- src/provisioningserver/rpc/region.py 2016-08-19 18:11:11 +0000 |
271 | +++ src/provisioningserver/rpc/region.py 2016-08-31 18:00:29 +0000 |
272 | @@ -23,6 +23,7 @@ |
273 | "RegisterRackController", |
274 | "ReportBootImages", |
275 | "ReportForeignDHCPServer", |
276 | + "ReportMDNSEntries", |
277 | "ReportNeighbours", |
278 | "RequestNodeInfoByMACAddress", |
279 | "SendEvent", |
280 | @@ -367,6 +368,22 @@ |
281 | errors = [] |
282 | |
283 | |
284 | +class ReportMDNSEntries(amp.Command): |
285 | + """Called by a rack controller to report observed mDNS entries. |
286 | + |
287 | + :since: 2.1 |
288 | + """ |
289 | + |
290 | + arguments = [ |
291 | + (b'system_id', amp.Unicode()), |
292 | + (b'mdns', StructureAsJSON()), |
293 | + ] |
294 | + response = [] |
295 | + errors = { |
296 | + NoSuchNode: b"NoSuchNode", |
297 | + } |
298 | + |
299 | + |
300 | class ReportNeighbours(amp.Command): |
301 | """Called by a rack controller to report observed neighbor devices. |
302 | |
303 | |
304 | === modified file 'src/provisioningserver/utils/services.py' |
305 | --- src/provisioningserver/utils/services.py 2016-08-19 15:34:46 +0000 |
306 | +++ src/provisioningserver/utils/services.py 2016-08-31 18:00:29 +0000 |
307 | @@ -13,6 +13,7 @@ |
308 | ) |
309 | from datetime import timedelta |
310 | import json |
311 | +from json.decoder import JSONDecodeError |
312 | import os |
313 | |
314 | from provisioningserver.config import is_dev_environment |
315 | @@ -34,35 +35,27 @@ |
316 | Deferred, |
317 | maybeDeferred, |
318 | ) |
319 | -from twisted.internet.error import ( |
320 | - ProcessDone, |
321 | - ProcessExitedAlready, |
322 | -) |
323 | -from twisted.internet.interfaces import ILoggingContext |
324 | +from twisted.internet.error import ProcessDone |
325 | from twisted.internet.protocol import ProcessProtocol |
326 | from twisted.internet.threads import deferToThread |
327 | from twisted.python import log |
328 | -from zope.interface import implementer |
329 | |
330 | |
331 | maaslog = get_maas_logger("networks.monitor") |
332 | |
333 | |
334 | -@implementer(ILoggingContext) |
335 | -class NeighbourObservationProtocol(ProcessProtocol): |
336 | +class JSONPerLineProtocol(ProcessProtocol): |
337 | + """ProcessProtocol which allows easy parsing of a single JSON object per |
338 | + line of text. |
339 | + """ |
340 | |
341 | - def __init__(self, interface, prefix=None, callback=None): |
342 | + def __init__(self, callback=None): |
343 | super().__init__() |
344 | self.done = Deferred() |
345 | - self.interface = interface |
346 | - self.prefix = prefix |
347 | self.callback = callback |
348 | self.outbuf = b'' |
349 | self.errbuf = b'' |
350 | |
351 | - def logPrefix(self): |
352 | - return "-" if self.prefix is None else self.prefix |
353 | - |
354 | def connectionMade(self): |
355 | self.outbuf = b'' |
356 | self.errbuf = b'' |
357 | @@ -93,10 +86,14 @@ |
358 | |
359 | def outLineReceived(self, line): |
360 | line = line.decode("utf-8") |
361 | - obj = json.loads(line) |
362 | - obj['interface'] = self.interface |
363 | - # Report one observation at a time for now. (The API is designed to |
364 | - # support multiple, but that will be done in a future branch.) |
365 | + try: |
366 | + obj = json.loads(line) |
367 | + except JSONDecodeError: |
368 | + log.msg("Failed to parse JSON: %r" % line) |
369 | + else: |
370 | + self.objectReceived(obj) |
371 | + |
372 | + def objectReceived(self, obj): |
373 | self.callback([obj]) |
374 | |
375 | def errLineReceived(self, line): |
376 | @@ -112,52 +109,87 @@ |
377 | self.done.errback(reason) |
378 | |
379 | |
380 | -class NeighbourDiscoveryService(TimerService): |
381 | - """Service to spawn the per-interface device discovery subprocess.""" |
382 | - |
383 | - def __init__(self, ifname): |
384 | - super().__init__(60.0, self.observeInterface, ifname) |
385 | - self._ifname = ifname |
386 | - self._process = None |
387 | +class NeighbourObservationProtocol(JSONPerLineProtocol): |
388 | + |
389 | + def __init__(self, interface, *args, **kwargs): |
390 | + super().__init__(*args, **kwargs) |
391 | + self.interface = interface |
392 | + |
393 | + def objectReceived(self, obj): |
394 | + # The only difference between the JSONPerLineProtocol and the |
395 | + # NeighbourObservationProtocol is that the neighbour observation |
396 | + # protocol needs to insert the interface metadata into the resultant |
397 | + # object before the callback. |
398 | + obj['interface'] = self.interface |
399 | + super().objectReceived(obj) |
400 | + |
401 | + |
402 | +class ProcessProtocolService(TimerService, metaclass=ABCMeta): |
403 | + |
404 | + def __init__(self, description, protocol, interval=60.0): |
405 | + assert protocol is not None |
406 | + assert description is not None |
407 | + self.description = description |
408 | + self.protocol = protocol |
409 | + self.process = None |
410 | + super().__init__(interval, self.startProcess) |
411 | |
412 | @deferred |
413 | - def observeInterface(self, ifname): |
414 | - """Start a network observation process for the specified interface.""" |
415 | - observer_protocol = NeighbourObservationProtocol( |
416 | - ifname, callback=self.parent.reportNeighbours) |
417 | + def startProcess(self): |
418 | env = select_c_utf8_bytes_locale() |
419 | - maas_rack_cmd = get_maas_provision_command().encode("utf-8") |
420 | - self._process = reactor.spawnProcess( |
421 | - observer_protocol, maas_rack_cmd, |
422 | - [maas_rack_cmd, b"observe-arp", ifname.encode("utf-8")], |
423 | - env=env) |
424 | - log.msg("Started neighbour observation on interface: %s" % ifname) |
425 | - return observer_protocol.done.addErrback( |
426 | - log.err, |
427 | - "Neighbour observation process failed for interface: %s" % ( |
428 | - self._ifname |
429 | - )) |
430 | - |
431 | - def stopObservationProcess(self): |
432 | - """Attempts to kill the neighbour observation process gracefully.""" |
433 | - if self._process is not None: |
434 | - try: |
435 | - self._process.signalProcess("INT") |
436 | - except ProcessExitedAlready: |
437 | - pass # Our work here is done. |
438 | - except OSError: |
439 | - log.err( |
440 | - None, |
441 | - "Neighbour observation process for %s failed to " |
442 | - "shut down." % self._ifname) |
443 | - else: |
444 | - return self._process.done |
445 | + log.msg("%s started." % self.description) |
446 | + args = self.getProcessParameters() |
447 | + self.process = reactor.spawnProcess( |
448 | + self.protocol, args[0], args, env=env) |
449 | + return self.protocol.done.addErrback( |
450 | + log.err, "%s failed." % self.description) |
451 | + |
452 | + @abstractmethod |
453 | + def getProcessParameters(self): |
454 | + """Return the parameters for the subprocess to launch. |
455 | + |
456 | + This MUST be overridden in subclasses. |
457 | + """ |
458 | |
459 | def stopService(self): |
460 | """Stops the neighbour observation service.""" |
461 | - d = super().stopService() |
462 | - d.addCallback(callOut, self.stopObservationProcess) |
463 | - return d |
464 | + if self.process is not None: |
465 | + self.process.loseConnection() |
466 | + return super().stopService() |
467 | + |
468 | + |
469 | +class NeighbourDiscoveryService(ProcessProtocolService): |
470 | + """Service to spawn the per-interface device discovery subprocess.""" |
471 | + |
472 | + def __init__(self, ifname: str, callback): |
473 | + self.ifname = ifname |
474 | + description = "Neighbour observation process for %s" % ifname |
475 | + protocol = NeighbourObservationProtocol(ifname, callback=callback) |
476 | + super().__init__(description=description, protocol=protocol) |
477 | + |
478 | + def getProcessParameters(self): |
479 | + maas_rack_cmd = get_maas_provision_command().encode("utf-8") |
480 | + return [ |
481 | + maas_rack_cmd, |
482 | + b"observe-arp", |
483 | + self.ifname.encode("utf-8") |
484 | + ] |
485 | + |
486 | + |
487 | +class MDNSResolverService(ProcessProtocolService): |
488 | + """Service to spawn the per-interface device discovery subprocess.""" |
489 | + |
490 | + def __init__(self, callback): |
491 | + protocol = JSONPerLineProtocol(callback=callback) |
492 | + super().__init__( |
493 | + description="mDNS resolver process", protocol=protocol) |
494 | + |
495 | + def getProcessParameters(self): |
496 | + maas_rack_cmd = get_maas_provision_command().encode("utf-8") |
497 | + return [ |
498 | + maas_rack_cmd, |
499 | + b"observe-mdns", |
500 | + ] |
501 | |
502 | |
503 | class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
504 | @@ -231,6 +263,13 @@ |
505 | This MUST be overridden in subclasses. |
506 | """ |
507 | |
508 | + @abstractmethod |
509 | + def reportMDNSEntries(self, mdns): |
510 | + """Report on new or refreshed neighbours. |
511 | + |
512 | + This MUST be overridden in subclasses. |
513 | + """ |
514 | + |
515 | def stopService(self): |
516 | """Stop the service. |
517 | |
518 | @@ -307,10 +346,32 @@ |
519 | |
520 | def _startNeighbourDiscovery(self, ifname): |
521 | """"Start neighbour discovery service on the specified interface.""" |
522 | - service = NeighbourDiscoveryService(ifname) |
523 | + service = NeighbourDiscoveryService(ifname, self.reportNeighbours) |
524 | service.setName("neighbour_discovery:" + ifname) |
525 | service.setServiceParent(self) |
526 | |
527 | + def _startMDNSDiscoveryService(self): |
528 | + """Start resolving mDNS entries on attached networks.""" |
529 | + try: |
530 | + self.getServiceNamed("mdns_resolver") |
531 | + except KeyError: |
532 | + # This is an expected exception. (The call inside the `try` |
533 | + # is only necessary to ensure the service doesn't exist.) |
534 | + service = MDNSResolverService(self.reportMDNSEntries) |
535 | + service.setName("mdns_resolver") |
536 | + service.setServiceParent(self) |
537 | + |
538 | + def _stopMDNSDiscoveryService(self): |
539 | + """Stop resolving mDNS entries on attached networks.""" |
540 | + try: |
541 | + service = self.getServiceNamed("mdns_resolver") |
542 | + except KeyError: |
543 | + # Service doesn't exist, so no need to stop it. |
544 | + pass |
545 | + else: |
546 | + service.disownServiceParent() |
547 | + maaslog.info("Stopped mDNS resolver service.") |
548 | + |
549 | def _startNeighbourDiscoveryServices(self, new_interfaces): |
550 | """Start monitoring services for the specified set of interfaces.""" |
551 | for ifname in new_interfaces: |
552 | @@ -319,7 +380,7 @@ |
553 | self.getServiceNamed("neighbour_disovery:" + ifname) |
554 | except KeyError: |
555 | # This is an expected exception. (The call inside the `try` |
556 | - # is only necessary to ensure the service doesn't exist. |
557 | + # is only necessary to ensure the service doesn't exist.) |
558 | self._startNeighbourDiscovery(ifname) |
559 | |
560 | def _stopNeighbourDiscoveryServices(self, deleted_interfaces): |
561 | @@ -356,6 +417,22 @@ |
562 | deleted_interfaces = self._monitored.difference(monitored_interfaces) |
563 | self._startNeighbourDiscoveryServices(new_interfaces) |
564 | self._stopNeighbourDiscoveryServices(deleted_interfaces) |
565 | + # Determine if we went from monitoring zero interfaces to monitoring |
566 | + # at least one. If so, we need to start mDNS discovery. |
567 | + # XXX Need to get this setting from the region. |
568 | + if len(self._monitored) == 0 and len(monitored_interfaces) > 0: |
569 | + # We weren't currently monitoring any interfaces, but we have been |
570 | + # requested to monitor at least one. |
571 | + self._startMDNSDiscoveryService() |
572 | + elif len(self._monitored) > 0 and len(monitored_interfaces) == 0: |
573 | + # We are currently monitoring at least one interface, but we have |
574 | + # been requested to stop monitoring them all. |
575 | + self._stopMDNSDiscoveryService() |
576 | + else: |
577 | + # No state change. We either still AREN'T monitoring any |
578 | + # interfaces, or we still ARE monitoring them. (Either way, it |
579 | + # doesn't matter for mDNS discovery purposes.) |
580 | + pass |
581 | self._monitored = monitored_interfaces |
582 | |
583 | def _interfacesRecorded(self, interfaces): |
584 | |
585 | === modified file 'src/provisioningserver/utils/tests/test_services.py' |
586 | --- src/provisioningserver/utils/tests/test_services.py 2016-08-20 12:42:22 +0000 |
587 | +++ src/provisioningserver/utils/tests/test_services.py 2016-08-31 18:00:29 +0000 |
588 | @@ -8,6 +8,7 @@ |
589 | import threading |
590 | from unittest.mock import ( |
591 | call, |
592 | + Mock, |
593 | sentinel, |
594 | ) |
595 | |
596 | @@ -15,6 +16,7 @@ |
597 | from maastesting.matchers import ( |
598 | DocTestMatches, |
599 | HasLength, |
600 | + IsFiredDeferred, |
601 | MockCalledOnceWith, |
602 | MockCallsMatch, |
603 | MockNotCalled, |
604 | @@ -25,12 +27,23 @@ |
605 | ) |
606 | from maastesting.twisted import TwistedLoggerFixture |
607 | from provisioningserver.utils import services |
608 | -from provisioningserver.utils.services import NetworksMonitoringService |
609 | +from provisioningserver.utils.services import ( |
610 | + JSONPerLineProtocol, |
611 | + MDNSResolverService, |
612 | + NeighbourDiscoveryService, |
613 | + NeighbourObservationProtocol, |
614 | + NetworksMonitoringService, |
615 | + ProcessProtocolService, |
616 | +) |
617 | +from provisioningserver.utils.twisted import pause |
618 | +from testtools import ExpectedException |
619 | from testtools.matchers import ( |
620 | Equals, |
621 | + Is, |
622 | IsInstance, |
623 | Not, |
624 | ) |
625 | +from testtools.tests.twistedsupport.test_deferred import extract_result |
626 | from twisted.application.service import MultiService |
627 | from twisted.internet import reactor |
628 | from twisted.internet.defer import ( |
629 | @@ -38,7 +51,13 @@ |
630 | inlineCallbacks, |
631 | succeed, |
632 | ) |
633 | +from twisted.internet.error import ( |
634 | + ProcessDone, |
635 | + ProcessExitedAlready, |
636 | + ProcessTerminated, |
637 | +) |
638 | from twisted.python import threadable |
639 | +from twisted.python.failure import Failure |
640 | |
641 | |
642 | class StubNetworksMonitoringService(NetworksMonitoringService): |
643 | @@ -60,6 +79,9 @@ |
644 | def reportNeighbours(self, neighbours): |
645 | pass |
646 | |
647 | + def reportMDNSEntries(self, neighbours): |
648 | + pass |
649 | + |
650 | |
651 | class TestNetworksMonitoringService(MAASTestCase): |
652 | """Tests of `NetworksMonitoringService`.""" |
653 | @@ -253,3 +275,236 @@ |
654 | yield service.updateInterfaces() |
655 | # ... interfaces ARE recorded. |
656 | self.assertThat(service.interfaces, Not(Equals([]))) |
657 | + |
658 | + |
659 | +class TestJSONPerLineProtocol(MAASTestCase): |
660 | + """Tests for `JSONPerLineProtocol`.""" |
661 | + |
662 | + run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
663 | + |
664 | + @inlineCallbacks |
665 | + def test__propagates_exit_errors(self): |
666 | + proto = JSONPerLineProtocol(callback=lambda json: None) |
667 | + reactor.spawnProcess(proto, b"false", (b"false",)) |
668 | + with ExpectedException(ProcessTerminated, ".* exit code 1"): |
669 | + yield proto.done |
670 | + |
671 | + def test__parses_only_full_lines(self): |
672 | + callback = Mock() |
673 | + proto = JSONPerLineProtocol(callback=callback) |
674 | + proto.connectionMade() |
675 | + # Send an empty JSON dictionary using 3 separate writes. |
676 | + proto.outReceived(b"{") |
677 | + # No callback yet... |
678 | + self.expectThat(callback, MockCallsMatch()) |
679 | + proto.outReceived(b"}") |
680 | + # Still no callback... |
681 | + self.expectThat(callback, MockCallsMatch()) |
682 | + proto.outReceived(b"\n") |
683 | + # After a newline, we expect the JSON to be parsed and the callback |
684 | + # to receive an empty Python dictionary (which corresponds to the JSON |
685 | + # that was sent.) |
686 | + self.expectThat(callback, MockCallsMatch(call([{}]))) |
687 | + |
688 | + def test__ignores_interspersed_zero_length_writes(self): |
689 | + callback = Mock() |
690 | + proto = JSONPerLineProtocol(callback=callback) |
691 | + proto.connectionMade() |
692 | + proto.outReceived(b"") |
693 | + self.expectThat(callback, MockCallsMatch()) |
694 | + proto.outReceived(b"{}\n") |
695 | + self.expectThat(callback, MockCallsMatch(call([{}]))) |
696 | + proto.outReceived(b"") |
697 | + self.expectThat(callback, MockCallsMatch(call([{}]))) |
698 | + proto.outReceived(b"{}\n") |
699 | + self.expectThat(callback, MockCallsMatch(call([{}]), call([{}]))) |
700 | + |
701 | + def test__logs_non_json_output(self): |
702 | + callback = Mock() |
703 | + proto = JSONPerLineProtocol(callback=callback) |
704 | + proto.connectionMade() |
705 | + with TwistedLoggerFixture() as logger: |
706 | + proto.outReceived(b"{\n") |
707 | + self.assertThat( |
708 | + logger.output, DocTestMatches("Failed to parse JSON: ...")) |
709 | + |
710 | + def test__logs_stderr(self): |
711 | + message = factory.make_name("message") |
712 | + callback = Mock() |
713 | + proto = JSONPerLineProtocol(callback=callback) |
714 | + proto.connectionMade() |
715 | + with TwistedLoggerFixture() as logger: |
716 | + proto.errReceived((message + "\n").encode("ascii")) |
717 | + self.assertThat(logger.output, Equals(message)) |
718 | + |
719 | + def test__logs_only_full_lines_from_stderr(self): |
720 | + message = factory.make_name("message") |
721 | + callback = Mock() |
722 | + proto = JSONPerLineProtocol(callback=callback) |
723 | + proto.connectionMade() |
724 | + with TwistedLoggerFixture() as logger: |
725 | + proto.errReceived(message.encode("ascii")) |
726 | + self.assertThat(logger.output, Equals("")) |
727 | + |
728 | + def test__logs_stderr_at_process_end(self): |
729 | + message = factory.make_name("message") |
730 | + callback = Mock() |
731 | + proto = JSONPerLineProtocol(callback=callback) |
732 | + proto.connectionMade() |
733 | + with TwistedLoggerFixture() as logger: |
734 | + proto.errReceived(message.encode("ascii")) |
735 | + self.assertThat(logger.output, Equals("")) |
736 | + proto.processEnded(Failure(ProcessDone(0))) |
737 | + self.assertThat(logger.output, Equals(message)) |
738 | + |
739 | + def test__propagates_errors_from_command(self): |
740 | + callback = Mock() |
741 | + proto = JSONPerLineProtocol(callback=callback) |
742 | + proto.connectionMade() |
743 | + reason = Failure(ProcessTerminated(1)) |
744 | + proto.processEnded(reason) |
745 | + self.assertRaises(ProcessTerminated, extract_result, proto.done) |
746 | + |
747 | + |
748 | +class TestNeighbourObservationProtocol(MAASTestCase): |
749 | + """Tests for `NeighbourObservationProtocol`.""" |
750 | + |
751 | + run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
752 | + |
753 | + def test_adds_interface(self): |
754 | + callback = Mock() |
755 | + ifname = factory.make_name('eth') |
756 | + proto = NeighbourObservationProtocol(ifname, callback=callback) |
757 | + proto.connectionMade() |
758 | + proto.outReceived(b"{}\n") |
759 | + self.expectThat( |
760 | + callback, MockCallsMatch(call([{"interface": ifname}]))) |
761 | + |
762 | + |
763 | +class TrueProcessProtocolService(ProcessProtocolService): |
764 | + |
765 | + def getProcessParameters(self): |
766 | + return ["/bin/true"] |
767 | + |
768 | + |
769 | +class FalseProcessProtocolService(ProcessProtocolService): |
770 | + |
771 | + def getProcessParameters(self): |
772 | + return ["/bin/false"] |
773 | + |
774 | + |
775 | +class CatProcessProtocolService(ProcessProtocolService): |
776 | + |
777 | + def getProcessParameters(self): |
778 | + return ["/bin/cat"] |
779 | + |
780 | + |
781 | +class EchoProcessProtocolService(ProcessProtocolService): |
782 | + |
783 | + def getProcessParameters(self): |
784 | + return ["/bin/echo", "{}\n"] |
785 | + |
786 | + |
787 | +class MockJSONProtocol(JSONPerLineProtocol): |
788 | + pass |
789 | + |
790 | + |
791 | +class TestProcessProtocolService(MAASTestCase): |
792 | + """Tests for `JSONPerLineProtocol`.""" |
793 | + |
794 | + run_tests_with = MAASTwistedRunTest.make_factory(debug=True, timeout=5) |
795 | + |
796 | + def test__base_class_cannot_be_used(self): |
797 | + with ExpectedException(TypeError): |
798 | + ProcessProtocolService( |
799 | + description="Mock process", protocol=Mock()) |
800 | + |
801 | + @inlineCallbacks |
802 | + def test__starts_and_stops_process(self): |
803 | + protocol = MockJSONProtocol() |
804 | + service = CatProcessProtocolService( |
805 | + description="Unit test process", protocol=protocol) |
806 | + mock_callback = Mock() |
807 | + protocol.done.addCallback(mock_callback) |
808 | + service.startService() |
809 | + yield service.stopService() |
810 | + self.assertTrue(mock_callback.called) |
811 | + yield pause(0.0) |
812 | + self.assertThat(protocol.done, IsFiredDeferred()) |
813 | + self.assertThat(extract_result(protocol.done), Is(None)) |
814 | + with ExpectedException(ProcessExitedAlready): |
815 | + service.process.signalProcess("INT") |
816 | + |
817 | + @inlineCallbacks |
818 | + def test__handles_normal_process_exit(self): |
819 | + protocol = MockJSONProtocol() |
820 | + service = TrueProcessProtocolService( |
821 | + description="Unit test process", protocol=protocol) |
822 | + mock_callback = Mock() |
823 | + protocol.done.addCallback(mock_callback) |
824 | + service.startService() |
825 | + yield service.stopService() |
826 | + self.assertTrue(mock_callback.called) |
827 | + yield pause(0.0) |
828 | + self.assertThat(protocol.done, IsFiredDeferred()) |
829 | + self.assertThat(extract_result(protocol.done), Is(None)) |
830 | + |
831 | + @inlineCallbacks |
832 | + def test__handles_abnormal_process_exit(self): |
833 | + protocol = MockJSONProtocol() |
834 | + service = FalseProcessProtocolService( |
835 | + description="Unit test process", protocol=protocol) |
836 | + mock_errback = Mock() |
837 | + mock_callback = Mock() |
838 | + protocol.done.addCallback(mock_callback) |
839 | + protocol.done.addErrback(mock_errback) |
840 | + service.startService() |
841 | + yield service.stopService() |
842 | + self.assertTrue(mock_errback.called) |
843 | + self.assertFalse(mock_callback.called) |
844 | + yield pause(0.0) |
845 | + self.assertThat(protocol.done, IsFiredDeferred()) |
846 | + self.assertThat(extract_result(protocol.done), Equals(None)) |
847 | + |
848 | + @inlineCallbacks |
849 | + def test__calls_protocol_callback(self): |
850 | + callback = Mock() |
851 | + protocol = MockJSONProtocol(callback=callback) |
852 | + service = EchoProcessProtocolService( |
853 | + description="Unit test process", protocol=protocol) |
854 | + service.startService() |
855 | + # Wait for the protocol to finish. (the echo process will stop) |
856 | + yield protocol.done |
857 | + self.assertThat(callback, MockCalledOnceWith([{}])) |
858 | + yield service.stopService() |
859 | + yield pause(0.0) |
860 | + self.assertThat(protocol.done, IsFiredDeferred()) |
861 | + self.assertThat(extract_result(protocol.done), Equals(None)) |
862 | + |
863 | + |
864 | +class TestNeighbourDiscoveryService(MAASTestCase): |
865 | + """Tests for `NeighbourDiscoveryService`.""" |
866 | + |
867 | + run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
868 | + |
869 | + def test__returns_expected_arguments(self): |
870 | + ifname = factory.make_name('eth') |
871 | + service = NeighbourDiscoveryService(ifname, Mock()) |
872 | + args = service.getProcessParameters() |
873 | + self.assertThat(args, HasLength(3)) |
874 | + self.assertTrue(args[0].endswith(b'maas-rack')) |
875 | + self.assertTrue(args[1], Equals(b"observe-arp")) |
876 | + self.assertTrue(args[2], Equals(ifname.encode('utf-8'))) |
877 | + |
878 | + |
879 | +class TestMDNSResolverService(MAASTestCase): |
880 | + """Tests for `MDNSResolverService`.""" |
881 | + |
882 | + run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
883 | + |
884 | + def test__returns_expected_arguments(self): |
885 | + service = MDNSResolverService(Mock()) |
886 | + args = service.getProcessParameters() |
887 | + self.assertThat(args, HasLength(2)) |
888 | + self.assertTrue(args[0].endswith(b"maas-rack")) |
889 | + self.assertTrue(args[1], Equals(b"observe-mdns")) |
Looks good. I've made a few suggestions, that's all.