Merge ~mpontillo/maas:beaconing-service into maas:master
Proposed by Mike Pontillo
Status: | Merged |
---|---|
Approved by: | Mike Pontillo |
Approved revision: | af83e74d10d132c3eaedd29b57be4a2e336c111b |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~mpontillo/maas:beaconing-service |
Merge into: | maas:master |
Diff against target: | 950 lines (+634/-22), 5 files modified |
Related bugs: | |

Files modified:
- src/maastesting/factory.py (+21/-1)
- src/provisioningserver/utils/beaconing.py (+74/-16)
- src/provisioningserver/utils/services.py (+275/-4)
- src/provisioningserver/utils/tests/test_beaconing.py (+111/-1)
- src/provisioningserver/utils/tests/test_services.py (+153/-0)

Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Данило Шеган (community) | Approve | ||
Review via email: mp+326397@code.launchpad.net |
Commit message
Add initial beaconing service and protocol.
* Add ProtocolForObserveBeacons [...]
monitoring process.
* Add BeaconingSocketProtocol [...]
ProtocolForO[...]
layer path is used for unit tests, and in a future test command.)
* Add a way to age out entries in an OrderedDict based upon UUID keys.
(The UUIDs must have a valid 'time' field.)
* Add initial transmit and receive path for beacons.
* If solicitation packets are received, respond to them with
advertisements.
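
The aging-out idea in the commit message can be sketched in isolation. The block below is a minimal illustration, not the code from this branch: it assumes version 1 UUIDs, reuses the epoch offset constant that appears in the diff, and introduces the names `UUID_EPOCH_OFFSET`, `age_out`, `make_uuid_at`, and the `now` parameter purely for the example.

```python
import time
import uuid
from collections import OrderedDict

# 100 ns intervals between the UUID epoch (1582-10-15) and the Unix epoch.
# The same constant appears in the diff; the name is an assumption.
UUID_EPOCH_OFFSET = 0x01b21dd213814000


def uuid_to_timestamp(uuid_str):
    """Return the Unix timestamp (float seconds) embedded in a v1 UUID."""
    return (uuid.UUID(uuid_str).time - UUID_EPOCH_OFFSET) * 100 / 1e9


def make_uuid_at(timestamp, node=0x010000000001, clock_seq=0):
    """Hypothetical helper: build a v1 UUID string for a Unix timestamp."""
    ticks = int(timestamp * 1e7) + UUID_EPOCH_OFFSET
    fields = (
        ticks & 0xffffffff,        # time_low
        (ticks >> 32) & 0xffff,    # time_mid
        (ticks >> 48) & 0x0fff,    # time_hi_version (version set below)
        (clock_seq >> 8) & 0x3f,   # clock_seq_hi_variant
        clock_seq & 0xff,          # clock_seq_low
        node,
    )
    return str(uuid.UUID(fields=fields, version=1))


def age_out(queue, threshold=120.0, now=None):
    """Remove leading entries whose UUID time is too far from `now`.

    Iteration stops at the first entry inside the threshold, so only the
    oldest insertions are examined. Returns the removed keys.
    """
    now = time.time() if now is None else now
    stale = []
    for key in queue:
        if abs(now - uuid_to_timestamp(key)) > threshold:
            stale.append(key)
        else:
            break
    for key in stale:
        del queue[key]
    return stale


if __name__ == "__main__":
    now = time.time()
    queue = OrderedDict()
    queue[make_uuid_at(now - 300)] = "old beacon"
    queue[make_uuid_at(now - 10)] = "fresh beacon"
    age_out(queue, now=now)
    print(list(queue.values()))
```

Because the loop stops at the first entry that is still within the threshold, the cost is proportional to the number of expired entries, which is why an insertion-ordered dictionary is useful here. Using the absolute difference means entries stamped far in the future are dropped as well as old ones.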
Данило Шеган (danilo) wrote:
Looks good, sorry for the delay (it slipped on Friday due to my battles with the lander). A few minor nits inline.
review: Approve
Mike Pontillo (mpontillo) wrote:
Thanks for the review! Some replies below.
- af83e74... by Mike Pontillo: Review fixes.
Preview Diff
1 | diff --git a/src/maastesting/factory.py b/src/maastesting/factory.py |
2 | index b83cffb..d12ac17 100644 |
3 | --- a/src/maastesting/factory.py |
4 | +++ b/src/maastesting/factory.py |
5 | @@ -30,7 +30,10 @@ from unittest import mock |
6 | import urllib.error |
7 | import urllib.parse |
8 | import urllib.request |
9 | -from uuid import uuid1 |
10 | +from uuid import ( |
11 | + UUID, |
12 | + uuid1, |
13 | +) |
14 | |
15 | from distro_info import UbuntuDistroInfo |
16 | from maastesting.fixtures import TempDirectory |
17 | @@ -255,6 +258,23 @@ class Factory: |
18 | def make_UUID(self): |
19 | return str(uuid1()) |
20 | |
21 | + def make_UUID_with_timestamp(self, timestamp, clock_seq=None, node=None): |
22 | + if node is None: |
23 | + node = random.getrandbits(48) | 0x010000000000 |
24 | + if clock_seq is None: |
25 | + clock_seq = random.getrandbits(14) |
26 | + timestamp = int(timestamp * 1e9 / 100) + 0x01b21dd213814000 |
27 | + time_low = timestamp & 0xffffffff |
28 | + time_mid = (timestamp >> 32) & 0xffff |
29 | + time_hi_version = (timestamp >> 48) & 0x0fff |
30 | + clock_seq_low = clock_seq & 0xff |
31 | + clock_seq_hi_variant = (clock_seq >> 8) & 0x3f |
32 | + fields = ( |
33 | + time_low, time_mid, time_hi_version, clock_seq_hi_variant, |
34 | + clock_seq_low, node |
35 | + ) |
36 | + return str(UUID(fields=fields, version=1)) |
37 | + |
38 | def _make_random_network( |
39 | self, slash=None, but_not=EMPTY_SET, disjoint_from=None, |
40 | random_address_factory=None): |
41 | diff --git a/src/provisioningserver/utils/beaconing.py b/src/provisioningserver/utils/beaconing.py |
42 | index e8c2384..5b5d286 100644 |
43 | --- a/src/provisioningserver/utils/beaconing.py |
44 | +++ b/src/provisioningserver/utils/beaconing.py |
45 | @@ -19,13 +19,16 @@ from gzip import ( |
46 | decompress, |
47 | ) |
48 | import json |
49 | +import math |
50 | import os |
51 | import stat |
52 | import struct |
53 | import subprocess |
54 | import sys |
55 | from textwrap import dedent |
56 | +import time |
57 | import uuid |
58 | +from uuid import UUID |
59 | |
60 | from bson import BSON |
61 | from bson.errors import BSONError |
62 | @@ -48,6 +51,9 @@ from provisioningserver.utils.tcpip import ( |
63 | |
64 | |
65 | BEACON_PORT = 5240 |
66 | +BEACON_IPV4_MULTICAST = "224.0.0.118" |
67 | +BEACON_IPV6_MULTICAST = "ff02::15a" |
68 | + |
69 | |
70 | BEACON_TYPES = { |
71 | "solicitation": 1, |
72 | @@ -71,6 +77,61 @@ BeaconPayload = namedtuple('BeaconPayload', ( |
73 | )) |
74 | |
75 | |
76 | +def uuid_to_timestamp(uuid_str): |
77 | + """Given the specified UUID string, returns the timestamp. |
78 | + |
79 | + The timestamp returned should be comparable to what would be returned |
80 | + from `import time; time.time()`. |
81 | + |
82 | + :param uuid_str: a UUID in string format |
83 | + :return: float |
84 | + """ |
85 | + uuid_time = UUID(uuid_str).time |
86 | + # Reverse the algorithm in uuid.py. |
87 | + timestamp = (uuid_time - 0x01b21dd213814000) * 100 / 1e9 |
88 | + return timestamp |
89 | + |
90 | + |
91 | +def age_out_uuid_queue(queue, threshold=120.0): |
92 | + """Ages out an ordered dictionary (using UUID-based keys) based on time. |
93 | + |
94 | + The given threshold (in seconds) indicates how old an entry can be |
95 | + before it will be removed from the queue. |
96 | + |
97 | + :param queue: An `OrderedDict` with UUID strings as keys. |
98 | + :param threshold: The maximum time an entry can remain in the queue. |
99 | + """ |
100 | + removals = [] |
101 | + current_time = time.time() |
102 | + for key in queue: |
103 | + beacon_timestamp = uuid_to_timestamp(key) |
104 | + # Don't leave beacons from the future in the queue if the clock |
105 | + # suddenly changes. (This shouldn't happen, since the Fernet TTL |
106 | + # should not have allowed them through. But just in case.) |
107 | + difference = math.fabs(current_time - beacon_timestamp) |
108 | + # Age out beacons older than the threshold (two minutes by default). |
109 | + if difference > threshold: |
110 | + removals.append(key) |
111 | + else: |
112 | + # If we're already encountering packets that haven't met the age |
113 | + # threshold (and were received more recently, and thus are later |
114 | + # in the queue) then it's time to give up. (Yes, it's possible that |
115 | + # clock skew could be an issue here, but after a couple of minutes, |
116 | + # it won't matter.) |
117 | + break |
118 | + for uuid_to_remove in removals: |
119 | + queue.pop(uuid_to_remove, None) |
120 | + |
121 | + |
122 | +def beacon_to_json(beacon_payload): |
123 | + """Converts the specified beacon into a format suitable for JSON.""" |
124 | + return { |
125 | + "version": beacon_payload.version, |
126 | + "type": beacon_payload.type, |
127 | + "payload": beacon_payload.payload, |
128 | + } |
129 | + |
130 | + |
131 | def create_beacon_payload(beacon_type, payload=None, version=PROTOCOL_VERSION): |
132 | """Creates a beacon payload of the specified type, with the given data. |
133 | |
134 | @@ -106,7 +167,7 @@ def read_beacon_payload(beacon_bytes): |
135 | Decrypts the inner beacon data if necessary. |
136 | |
137 | :param beacon_bytes: beacon payload (bytes). |
138 | - :return: dict |
139 | + :return: BeaconPayload namedtuple |
140 | """ |
141 | if len(beacon_bytes) < BEACON_HEADER_LENGTH_V1: |
142 | raise InvalidBeaconingPacket( |
143 | @@ -131,7 +192,7 @@ def read_beacon_payload(beacon_bytes): |
144 | else: |
145 | try: |
146 | decrypted_data = fernet_decrypt_psk( |
147 | - payload_bytes, raw=True) |
148 | + payload_bytes, ttl=60, raw=True) |
149 | except InvalidToken: |
150 | raise InvalidBeaconingPacket( |
151 | "Failed to decrypt inner payload: check MAAS secret key.") |
152 | @@ -176,18 +237,13 @@ class BeaconingPacket: |
153 | self.valid = None |
154 | self.invalid_reason = None |
155 | self.packet = pkt_bytes |
156 | - self.payload = self.parse() |
157 | + self.data = self.parse() |
158 | |
159 | def parse(self): |
160 | - """Output text-based details about this beaconing packet to the |
161 | - specified file or stream. |
162 | - |
163 | - :param out: An object with `write(str)` and `flush()` methods. |
164 | - """ |
165 | try: |
166 | - payload = read_beacon_payload(self.packet) |
167 | + beacon = read_beacon_payload(self.packet) |
168 | self.valid = True |
169 | - return payload |
170 | + return beacon |
171 | except InvalidBeaconingPacket as ibp: |
172 | self.valid = False |
173 | self.invalid_reason = ibp.invalid_reason |
174 | @@ -213,6 +269,8 @@ def observe_beaconing_packets(input=sys.stdin.buffer, out=sys.stdout): |
175 | try: |
176 | packet = decode_ethernet_udp_packet(packet_bytes, pcap_header) |
177 | beacon = BeaconingPacket(packet.payload) |
178 | + if not beacon.valid: |
179 | + continue |
180 | output_json = { |
181 | "source_mac": format_eui(packet.l2.src_eui), |
182 | "destination_mac": format_eui(packet.l2.dst_eui), |
183 | @@ -220,15 +278,15 @@ def observe_beaconing_packets(input=sys.stdin.buffer, out=sys.stdout): |
184 | "destination_ip": str(packet.l3.dst_ip), |
185 | "source_port": packet.l4.packet.src_port, |
186 | "destination_port": packet.l4.packet.dst_port, |
187 | + "time": pcap_header.timestamp_seconds |
188 | } |
189 | if packet.l2.vid is not None: |
190 | output_json["vid"] = packet.l2.vid |
191 | - if beacon.payload is not None: |
192 | - output_json['payload'] = beacon.payload |
193 | - output_json['time'] = pcap_header.timestamp_seconds |
194 | - out.write(json.dumps(output_json)) |
195 | - out.write('\n') |
196 | - out.flush() |
197 | + if beacon.data is not None: |
198 | + output_json.update(beacon_to_json(beacon.data)) |
199 | + out.write(json.dumps(output_json)) |
200 | + out.write('\n') |
201 | + out.flush() |
202 | except PacketProcessingError as e: |
203 | err.write(e.error) |
204 | err.write("\n") |
205 | diff --git a/src/provisioningserver/utils/services.py b/src/provisioningserver/utils/services.py |
206 | index dd6b399..0ca0394 100644 |
207 | --- a/src/provisioningserver/utils/services.py |
208 | +++ b/src/provisioningserver/utils/services.py |
209 | @@ -11,10 +11,12 @@ from abc import ( |
210 | ABCMeta, |
211 | abstractmethod, |
212 | ) |
213 | +from collections import OrderedDict |
214 | from datetime import timedelta |
215 | import json |
216 | from json.decoder import JSONDecodeError |
217 | import os |
218 | +from pprint import pformat |
219 | import re |
220 | |
221 | from provisioningserver.config import is_dev_environment |
222 | @@ -23,6 +25,14 @@ from provisioningserver.logger import ( |
223 | LegacyLogger, |
224 | ) |
225 | from provisioningserver.rpc.exceptions import NoConnectionsAvailable |
226 | +from provisioningserver.utils.beaconing import ( |
227 | + age_out_uuid_queue, |
228 | + BEACON_IPV4_MULTICAST, |
229 | + BEACON_PORT, |
230 | + beacon_to_json, |
231 | + create_beacon_payload, |
232 | + read_beacon_payload, |
233 | +) |
234 | from provisioningserver.utils.fs import ( |
235 | get_maas_provision_command, |
236 | NamedLock, |
237 | @@ -47,8 +57,14 @@ from twisted.internet.error import ( |
238 | ProcessDone, |
239 | ProcessTerminated, |
240 | ) |
241 | -from twisted.internet.protocol import ProcessProtocol |
242 | +from twisted.internet.interfaces import IReactorMulticast |
243 | +from twisted.internet.protocol import ( |
244 | + DatagramProtocol, |
245 | + ProcessProtocol, |
246 | +) |
247 | from twisted.internet.threads import deferToThread |
248 | +from zope.interface.exceptions import DoesNotImplement |
249 | +from zope.interface.verify import verifyObject |
250 | |
251 | |
252 | maaslog = get_maas_logger("networks.monitor") |
253 | @@ -129,9 +145,6 @@ class ProtocolForObserveARP(JSONPerLineProtocol): |
254 | The difference between `JSONPerLineProtocol` and `ProtocolForObserveARP` |
255 | is that the neighbour observation protocol needs to insert the interface |
256 | metadata into the resultant object before the callback. |
257 | - |
258 | - This also ensures that the spawned process is configured as a process |
259 | - group leader for its own process group. |
260 | """ |
261 | |
262 | def __init__(self, interface, *args, **kwargs): |
263 | @@ -147,6 +160,28 @@ class ProtocolForObserveARP(JSONPerLineProtocol): |
264 | log.msg("observe-arp[%s]:" % self.interface, line) |
265 | |
266 | |
267 | +class ProtocolForObserveBeacons(JSONPerLineProtocol): |
268 | + """Protocol used when spawning `maas-rack observe-beacons`. |
269 | + |
270 | + The difference between `JSONPerLineProtocol` and |
271 | + `ProtocolForObserveBeacons` is that the beacon observation protocol needs |
272 | + to insert the interface metadata into the resultant object before the |
273 | + callback. |
274 | + """ |
275 | + |
276 | + def __init__(self, interface, *args, **kwargs): |
277 | + super().__init__(*args, **kwargs) |
278 | + self.interface = interface |
279 | + |
280 | + def objectReceived(self, obj): |
281 | + obj['interface'] = self.interface |
282 | + super().objectReceived(obj) |
283 | + |
284 | + def errLineReceived(self, line): |
285 | + line = line.decode("utf-8").rstrip() |
286 | + log.msg("observe-beacons[%s]:" % self.interface, line) |
287 | + |
288 | + |
289 | class ProtocolForObserveMDNS(JSONPerLineProtocol): |
290 | """Protocol used when spawning `maas-rack observe-mdns`. |
291 | |
292 | @@ -257,6 +292,30 @@ class NeighbourDiscoveryService(ProcessProtocolService): |
293 | self.ifname, callback=self.callback) |
294 | |
295 | |
296 | +class BeaconingService(ProcessProtocolService): |
297 | + """Service to spawn the per-interface beacon observation subprocess.""" |
298 | + |
299 | + def __init__(self, ifname: str, callback: callable): |
300 | + self.ifname = ifname |
301 | + self.callback = callback |
302 | + super().__init__() |
303 | + |
304 | + def getDescription(self) -> str: |
305 | + return "Beaconing process for %s" % self.ifname |
306 | + |
307 | + def getProcessParameters(self): |
308 | + maas_rack_cmd = get_maas_provision_command().encode("utf-8") |
309 | + return [ |
310 | + maas_rack_cmd, |
311 | + b"observe-beacons", |
312 | + self.ifname.encode("utf-8") |
313 | + ] |
314 | + |
315 | + def createProcessProtocol(self): |
316 | + return ProtocolForObserveBeacons( |
317 | + self.ifname, callback=self.callback) |
318 | + |
319 | + |
320 | class MDNSResolverService(ProcessProtocolService): |
321 | """Service to spawn the per-interface device discovery subprocess.""" |
322 | |
323 | @@ -278,6 +337,173 @@ class MDNSResolverService(ProcessProtocolService): |
324 | return ProtocolForObserveMDNS(callback=self.callback) |
325 | |
326 | |
327 | +class BeaconingSocketProtocol(DatagramProtocol): |
328 | + """Protocol to handle beaconing packets received from the socket layer.""" |
329 | + |
330 | + def __init__( |
331 | + self, reactor, process_incoming=False, debug=True, interface='::', |
332 | + loopback=False, port=BEACON_PORT): |
333 | + super().__init__() |
334 | + self.reactor = reactor |
335 | + self.process_incoming = process_incoming |
336 | + self.debug = debug |
337 | + # These queues keep track of beacons that have recently been sent |
338 | + # or received by the protocol. Ordering is needed here so that we can |
339 | + # later age out the least-recently-added packets without traversing the |
340 | + # entire dictionary. |
341 | + self.tx_queue = OrderedDict() |
342 | + self.rx_queue = OrderedDict() |
343 | + self.listen_port = None |
344 | + try: |
345 | + # Need to ensure that the passed-in reactor is, in fact, a "real" |
346 | + # reactor, and not None, or a mock reactor used in tests. |
347 | + verifyObject(IReactorMulticast, reactor) |
348 | + self.listen_port = reactor.listenMulticast( |
349 | + port, self, interface=interface, listenMultiple=True) |
350 | + self.transport.joinGroup(BEACON_IPV4_MULTICAST) |
351 | + self.transport.setLoopbackMode(loopback) |
352 | + # XXX mpontillo 2017-06-21: Twisted doesn't support IPv6 here yet. |
353 | + # self.transport.joinGroup(BEACON_IPV6_MULTICAST) |
354 | + except DoesNotImplement: |
355 | + pass |
356 | + |
357 | + def stopProtocol(self): |
358 | + super().stopProtocol() |
359 | + if self.listen_port is not None: |
360 | + return self.listen_port.stopListening() |
361 | + return None |
362 | + |
363 | + def send_beacon(self, beacon, destination_address): |
364 | + """Send a beacon to the specified destination. |
365 | + |
366 | + :param beacon: The `BeaconPayload` namedtuple to send. Must have a |
367 | + `payload` ivar containing a 'uuid' element. |
368 | + :param destination_address: The UDP/IP (destination, port) tuple. IPv4 |
369 | + addresses must be in IPv4-mapped IPv6 format. |
370 | + :return: True if the beacon was sent, False otherwise. |
371 | + """ |
372 | + try: |
373 | + self.transport.write(beacon.bytes, destination_address) |
374 | + # If the packet cannot be sent for whatever reason, OSError will |
375 | + # be raised, and we won't record sending a beacon we didn't |
376 | + # actually send. |
377 | + self.tx_queue[beacon.payload['uuid']] = beacon |
378 | + age_out_uuid_queue(self.tx_queue) |
379 | + return True |
380 | + except OSError as e: |
381 | + if self.debug is True: |
382 | + log.msg("Error while sending beacon: %s" % e) |
383 | + return False |
384 | + |
385 | + def beaconReceived(self, beacon_json): |
386 | + """Called whenever a beacon is received. |
387 | + |
388 | + This method is responsible for updating the `tx_queue` and `rx_queue` |
389 | + data structures, and determining if the incoming beacon is meaningful |
390 | + for determining network topology. |
391 | + |
392 | + :param beacon_json: The normalized beacon JSON, which can come either |
393 | + from the external tcpdump-based process, or from the sockets layer |
394 | + (with less information about the received packet). |
395 | + """ |
396 | + rx_uuid = beacon_json.get('payload', {}).get("uuid") |
397 | + if rx_uuid is None: |
398 | + if self.debug is True: |
399 | + log.msg( |
400 | + "Rejecting incoming beacon: no UUID found: \n%s" % ( |
401 | + pformat(beacon_json))) |
402 | + return |
403 | + own_beacon = False |
404 | + if self.tx_queue.get(rx_uuid): |
405 | + own_beacon = True |
406 | + is_dup = self.remember_beacon_and_check_duplicate(rx_uuid, beacon_json) |
407 | + if self.debug is True: |
408 | + log.msg("%s %sreceived:\n%s" % ( |
409 | + "Own beacon" if own_beacon else "Beacon", |
410 | + "(duplicate) " if is_dup else "", |
411 | + beacon_json)) |
412 | + # From what we know so far, we can infer some facts about the network. |
413 | + # (1) If we received our own beacon, that means the interface we sent |
414 | + # the packet out on is on the same fabric as the interface that |
415 | + # received it. |
416 | + # (2) If we receive a duplicate beacon on two different interfaces, |
417 | + # that means those two interfaces are on the same fabric. |
418 | + reply_ip = beacon_json['source_ip'] |
419 | + reply_port = beacon_json['source_port'] |
420 | + if ':' not in reply_ip: |
421 | + # Since we opened an IPv6-compatible socket, need IPv6 syntax |
422 | + # here to send to IPv4 addresses. |
423 | + reply_ip = '::ffff:' + reply_ip |
424 | + reply_address = (reply_ip, reply_port) |
425 | + beacon_type = beacon_json['type'] |
426 | + if beacon_type == "solicitation": |
427 | + receive_interface_info = self.get_receive_interface_info( |
428 | + beacon_json) |
429 | + payload = { |
430 | + "interface": receive_interface_info, |
431 | + "acks": rx_uuid |
432 | + } |
433 | + reply = create_beacon_payload("advertisement", payload) |
434 | + self.send_beacon(reply, reply_address) |
435 | + |
436 | + def remember_beacon_and_check_duplicate(self, rx_uuid, beacon_json): |
437 | + """Records an incoming beacon based on its UUID and JSON. |
438 | + |
439 | + Organizes incoming beacons in the `rx_queue` by creating a list of |
440 | + beacons received [on different interfaces] per UUID. |
441 | + |
442 | + :param rx_uuid: The UUID of the incoming beacon. |
443 | + :param beacon_json: The incoming beacon (in JSON format). |
444 | + :return: True if the beacon was a duplicate, otherwise False. |
445 | + """ |
446 | + duplicate_received = False |
447 | + # Need to age out before doing anything else; we don't want to match |
448 | + # a duplicate packet and then delete it immediately after. |
449 | + age_out_uuid_queue(self.rx_queue) |
450 | + rx_packets_for_uuid = self.rx_queue.get(rx_uuid, []) |
451 | + if len(rx_packets_for_uuid) > 0: |
452 | + duplicate_received = True |
453 | + rx_packets_for_uuid.append(beacon_json) |
454 | + self.rx_queue[rx_uuid] = rx_packets_for_uuid |
455 | + return duplicate_received |
456 | + |
457 | + def get_receive_interface_info(self, context): |
458 | + """Returns a dictionary representing information about the receive |
459 | + interface, given the context of the beacon. The context can be the |
460 | + limited information received from the socket layer, or the extended |
461 | + information from the monitoring process. |
462 | + """ |
463 | + receive_interface_info = { |
464 | + "name": context.get('interface'), |
465 | + "source_ip": context.get('source_ip'), |
466 | + "destination_ip": context.get('destination_ip'), |
467 | + "source_mac": context.get('source_mac'), |
468 | + "destination_mac": context.get('destination_mac'), |
469 | + } |
470 | + if 'vid' in context: |
471 | + receive_interface_info['vid'] = context['vid'] |
472 | + return receive_interface_info |
473 | + |
474 | + def datagramReceived(self, datagram, addr): |
475 | + """Called by Twisted when a UDP datagram is received. |
476 | + |
477 | + Note: In the typical use case, the MAAS server will ignore packets |
478 | + coming into this method. We need to listen to the socket normally, |
479 | + however, so that the underlying network stack will send ICMP |
480 | + destination (port) unreachable replies to anyone trying to send us |
481 | + beacons. However, at other times, (such as while running the test |
482 | + commands), we *will* listen to the socket layer for beacons. |
483 | + """ |
484 | + if self.process_incoming is True: |
485 | + context = { |
486 | + "source_ip": addr[0], |
487 | + "source_port": addr[1] |
488 | + } |
489 | + beacon_json = beacon_to_json(read_beacon_payload(datagram)) |
490 | + beacon_json.update(context) |
491 | + self.beaconReceived(beacon_json) |
492 | + |
493 | + |
494 | class NetworksMonitoringLock(NamedLock): |
495 | """Host scoped lock to ensure only one network monitoring service runs.""" |
496 | |
497 | @@ -324,6 +550,7 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
498 | self.interface_monitor.setName("updateInterfaces") |
499 | self.interface_monitor.clock = self.clock |
500 | self.interface_monitor.setServiceParent(self) |
501 | + self.beaconing_protocol = BeaconingSocketProtocol(clock) |
502 | |
503 | def updateInterfaces(self): |
504 | """Update interfaces, catching and logging errors. |
505 | @@ -388,12 +615,19 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
506 | This MUST be overridden in subclasses. |
507 | """ |
508 | |
509 | + def reportBeacons(self, beacons): |
510 | + """Receives a report of one or more observed beacon packets.""" |
511 | + for beacon in beacons: |
512 | + log.msg("Received beacon: %r" % beacon) |
513 | + self.beaconing_protocol.beaconReceived(beacon) |
514 | + |
515 | def stopService(self): |
516 | """Stop the service. |
517 | |
518 | Ensures that sole responsibility for monitoring networks is released. |
519 | """ |
520 | d = super().stopService() |
521 | + self.beaconing_protocol.stopProtocol() |
522 | d.addBoth(callOut, self._releaseSoleResponsibility) |
523 | return d |
524 | |
525 | @@ -473,6 +707,13 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
526 | service.setName("neighbour_discovery:" + ifname) |
527 | service.setServiceParent(self) |
528 | |
529 | + def _startBeaconing(self, ifname): |
530 | + """Start the beaconing service on the specified interface.""" |
531 | + service = BeaconingService(ifname, self.reportBeacons) |
532 | + service.clock = self.clock |
533 | + service.setName("beaconing:" + ifname) |
534 | + service.setServiceParent(self) |
535 | + |
536 | def _startMDNSDiscoveryService(self): |
537 | """Start resolving mDNS entries on attached networks.""" |
538 | try: |
539 | @@ -520,6 +761,30 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
540 | maaslog.info( |
541 | "Stopped neighbour observation service for %s." % ifname) |
542 | |
543 | + def _startBeaconingServices(self, new_interfaces): |
544 | + """Start monitoring services for the specified set of interfaces.""" |
545 | + for ifname in new_interfaces: |
546 | + # Sanity check to ensure the service isn't already started. |
547 | + try: |
548 | + self.getServiceNamed("beaconing:" + ifname) |
549 | + except KeyError: |
550 | + # This is an expected exception. (The call inside the `try` |
551 | + # is only necessary to ensure the service doesn't exist.) |
552 | + self._startBeaconing(ifname) |
553 | + |
554 | + def _stopBeaconingServices(self, deleted_interfaces): |
555 | + """Stop monitoring services for the specified set of interfaces.""" |
556 | + for ifname in deleted_interfaces: |
557 | + try: |
558 | + service = self.getServiceNamed("beaconing:" + ifname) |
559 | + except KeyError: |
560 | + # Service doesn't exist, so no need to stop it. |
561 | + pass |
562 | + else: |
563 | + service.disownServiceParent() |
564 | + maaslog.info( |
565 | + "Stopped beaconing service for %s." % ifname) |
566 | + |
567 | def _shouldMonitorMDNS(self, monitoring_state): |
568 | # If any interface is configured for mDNS, we must start the monitoring |
569 | # process. (You cannot select interfaces when using `avahi-browse`.) |
570 | @@ -591,11 +856,17 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
571 | log.msg("Starting neighbour discovery for interfaces: %r" % ( |
572 | new_interfaces)) |
573 | self._startNeighbourDiscoveryServices(new_interfaces) |
574 | + # XXX mpontillo 2017-07-12: for testing, just start beaconing |
575 | + # services on all the interfaces enabled for active discovery. |
576 | + self._startBeaconingServices(new_interfaces) |
577 | if len(deleted_interfaces) > 0: |
578 | log.msg( |
579 | "Stopping neighbour discovery for interfaces: %r" % ( |
580 | deleted_interfaces)) |
581 | self._stopNeighbourDiscoveryServices(deleted_interfaces) |
582 | + # XXX mpontillo 2017-07-12: this should be separately configured. |
583 | + # (see similar comment in the 'start' path above.) |
584 | + self._stopBeaconingServices(deleted_interfaces) |
585 | self._monitored = monitored_interfaces |
586 | |
587 | def _interfacesRecorded(self, interfaces): |
588 | diff --git a/src/provisioningserver/utils/tests/test_beaconing.py b/src/provisioningserver/utils/tests/test_beaconing.py |
589 | index 2448280..514b209 100644 |
590 | --- a/src/provisioningserver/utils/tests/test_beaconing.py |
591 | +++ b/src/provisioningserver/utils/tests/test_beaconing.py |
592 | @@ -6,14 +6,20 @@ |
593 | __all__ = [] |
594 | |
595 | from argparse import ArgumentParser |
596 | +from collections import OrderedDict |
597 | from gzip import compress |
598 | import io |
599 | +import math |
600 | import random |
601 | import struct |
602 | import subprocess |
603 | from tempfile import NamedTemporaryFile |
604 | +import time |
605 | from unittest.mock import Mock |
606 | -from uuid import UUID |
607 | +from uuid import ( |
608 | + UUID, |
609 | + uuid1, |
610 | +) |
611 | |
612 | from maastesting.factory import factory |
613 | from maastesting.matchers import MockCalledOnceWith |
614 | @@ -26,23 +32,63 @@ from provisioningserver.tests.test_security import SharedSecretTestCase |
615 | from provisioningserver.utils import beaconing as beaconing_module |
616 | from provisioningserver.utils.beaconing import ( |
617 | add_arguments, |
618 | + age_out_uuid_queue, |
619 | BEACON_HEADER_FORMAT_V1, |
620 | + beacon_to_json, |
621 | BEACON_TYPES, |
622 | BeaconingPacket, |
623 | + BeaconPayload, |
624 | create_beacon_payload, |
625 | InvalidBeaconingPacket, |
626 | read_beacon_payload, |
627 | run, |
628 | + uuid_to_timestamp, |
629 | ) |
630 | from provisioningserver.utils.script import ActionScriptError |
631 | from testtools.matchers import ( |
632 | + Contains, |
633 | Equals, |
634 | + HasLength, |
635 | Is, |
636 | IsInstance, |
637 | + LessThan, |
638 | + Not, |
639 | ) |
640 | from testtools.testcase import ExpectedException |
641 | |
642 | |
643 | +class TestUUIDToTimestamp(MAASTestCase): |
644 | + |
645 | + def test__round_trip_preserves_timestamp(self): |
646 | + expected_timestamp = time.time() |
647 | + uuid = str(uuid1()) |
648 | + actual_timestamp = uuid_to_timestamp(uuid) |
649 | + difference = math.fabs(actual_timestamp - expected_timestamp) |
650 | + # Tolerate a difference of ~3 seconds. We'll age out packets on the |
651 | + # order of minutes, so that should be good enough. |
652 | + self.assertThat(difference, LessThan(3.0)) |
653 | + |
654 | + |
655 | +class TestBeaconToJSON(MAASTestCase): |
656 | + """Tests for `beacon_to_json()` function.""" |
657 | + |
658 | + def test__preserves_version_type_and_payload__discards_bytes(self): |
659 | + test_bytes = factory.make_bytes() |
660 | + test_version = factory.make_string() |
661 | + test_type = factory.make_string() |
662 | + test_payload = { |
663 | + factory.make_string(): factory.make_string() |
664 | + } |
665 | + beacon = BeaconPayload( |
666 | + test_bytes, test_version, test_type, test_payload) |
667 | + beacon_json = beacon_to_json(beacon) |
668 | + self.assertThat(beacon_json["version"], Equals(test_version)) |
669 | + self.assertThat(beacon_json["type"], Equals(test_type)) |
670 | + self.assertThat(beacon_json["payload"], Equals(test_payload)) |
671 | + self.assertThat(beacon_json, Not(Contains("bytes"))) |
672 | + self.assertThat(beacon_json, HasLength(3)) |
673 | + |
674 | + |
675 | class TestCreateBeaconPayload(SharedSecretTestCase): |
676 | |
677 | def test__requires_maas_shared_secret_for_inner_data_payload(self): |
678 | @@ -286,3 +332,67 @@ class TestObserveBeaconsCommand(MAASTestCase): |
679 | os.setpgrp.side_effect = exception_type |
680 | self.assertRaises(exception_type, run, []) |
681 | self.assertThat(os.setpgrp, MockCalledOnceWith()) |
682 | + |
683 | + |
684 | +class TestAgeOutUUIDQueue(MAASTestCase): |
685 | + """Tests for `age_out_uuid_queue()` function.""" |
686 | + |
687 | + def test__does_not_remove_fresh_entries(self): |
688 | + uuid_now = str(uuid1()) |
689 | + queue = OrderedDict() |
690 | + queue[uuid_now] = {} |
691 | + self.assertThat(queue, HasLength(1)) |
692 | + age_out_uuid_queue(queue) |
693 | + self.assertThat(queue, HasLength(1)) |
694 | + |
695 | + def test__keeps_entries_from_the_reasonable_past(self): |
696 | + uuid_from_the_past = factory.make_UUID_with_timestamp( |
697 | + time.time() - 60.0) |
698 | + queue = OrderedDict() |
699 | + queue[uuid_from_the_past] = {} |
700 | + self.assertThat(queue, HasLength(1)) |
701 | + age_out_uuid_queue(queue) |
702 | + self.assertThat(queue, HasLength(1)) |
703 | + |
704 | + def test__keeps_entries_from_the_reasonable_future(self): |
705 | + uuid_from_the_future = factory.make_UUID_with_timestamp( |
706 | + time.time() + 60.0) |
707 | + queue = OrderedDict() |
708 | + queue[uuid_from_the_future] = {} |
709 | + self.assertThat(queue, HasLength(1)) |
710 | + age_out_uuid_queue(queue) |
711 | + self.assertThat(queue, HasLength(1)) |
712 | + |
713 | + def test__removes_entries_from_the_past(self): |
714 | + uuid_from_the_past = factory.make_UUID_with_timestamp( |
715 | + time.time() - 123.0) |
716 | + queue = OrderedDict() |
717 | + queue[uuid_from_the_past] = {} |
718 | + self.assertThat(queue, HasLength(1)) |
719 | + age_out_uuid_queue(queue) |
720 | + self.assertThat(queue, HasLength(0)) |
721 | + |
722 | + def test__removes_entries_from_the_future(self): |
723 | + uuid_from_the_future = factory.make_UUID_with_timestamp( |
724 | + time.time() + 123.0) |
725 | + queue = OrderedDict() |
726 | + queue[uuid_from_the_future] = {} |
727 | + self.assertThat(queue, HasLength(1)) |
728 | + age_out_uuid_queue(queue) |
729 | + self.assertThat(queue, HasLength(0)) |
730 | + |
731 | + def test__removes_entries_from_the_distant_past(self): |
732 | + uuid_from_the_past = '00000000-0000-1000-aaaa-aaaaaaaaaaaa' |
733 | + queue = OrderedDict() |
734 | + queue[uuid_from_the_past] = {} |
735 | + self.assertThat(queue, HasLength(1)) |
736 | + age_out_uuid_queue(queue) |
737 | + self.assertThat(queue, HasLength(0)) |
738 | + |
739 | + def test__removes_entries_from_the_far_future(self): |
740 | + uuid_from_the_future = 'ffffffff-ffff-1fff-0000-000000000000' |
741 | + queue = OrderedDict() |
742 | + queue[uuid_from_the_future] = {} |
743 | + self.assertThat(queue, HasLength(1)) |
744 | + age_out_uuid_queue(queue) |
745 | + self.assertThat(queue, HasLength(0)) |
746 | diff --git a/src/provisioningserver/utils/tests/test_services.py b/src/provisioningserver/utils/tests/test_services.py |
747 | index e287bec..ce8361f 100644 |
748 | --- a/src/provisioningserver/utils/tests/test_services.py |
749 | +++ b/src/provisioningserver/utils/tests/test_services.py |
750 | @@ -6,6 +6,7 @@ |
751 | __all__ = [] |
752 | |
753 | from functools import partial |
754 | +import random |
755 | import threading |
756 | from unittest.mock import ( |
757 | call, |
758 | @@ -27,8 +28,12 @@ from maastesting.testcase import ( |
759 | MAASTwistedRunTest, |
760 | ) |
761 | from maastesting.twisted import TwistedLoggerFixture |
762 | +from provisioningserver.tests.test_security import SharedSecretTestCase |
763 | from provisioningserver.utils import services |
764 | +from provisioningserver.utils.beaconing import create_beacon_payload |
765 | from provisioningserver.utils.services import ( |
766 | + BeaconingService, |
767 | + BeaconingSocketProtocol, |
768 | JSONPerLineProtocol, |
769 | MDNSResolverService, |
770 | NeighbourDiscoveryService, |
771 | @@ -36,6 +41,7 @@ from provisioningserver.utils.services import ( |
772 | NetworksMonitoringService, |
773 | ProcessProtocolService, |
774 | ProtocolForObserveARP, |
775 | + ProtocolForObserveBeacons, |
776 | ) |
777 | from testtools import ExpectedException |
778 | from testtools.matchers import ( |
779 | @@ -47,6 +53,7 @@ from testtools.matchers import ( |
780 | from twisted.application.service import MultiService |
781 | from twisted.internet import reactor |
782 | from twisted.internet.defer import ( |
783 | + Deferred, |
784 | DeferredQueue, |
785 | inlineCallbacks, |
786 | succeed, |
787 | @@ -382,6 +389,21 @@ class TestProtocolForObserveARP(MAASTestCase): |
788 | callback, MockCallsMatch(call([{"interface": ifname}]))) |
789 | |
790 | |
791 | +class TestProtocolForObserveBeacons(MAASTestCase): |
792 | + """Tests for `ProtocolForObserveBeacons`.""" |
793 | + |
794 | + run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
795 | + |
796 | + def test_adds_interface(self): |
797 | + callback = Mock() |
798 | + ifname = factory.make_name('eth') |
799 | + proto = ProtocolForObserveBeacons(ifname, callback=callback) |
800 | + proto.makeConnection(Mock(pid=None)) |
801 | + proto.outReceived(b"{}\n") |
802 | + self.expectThat( |
803 | + callback, MockCallsMatch(call([{"interface": ifname}]))) |
804 | + |
805 | + |
806 | class MockProcessProtocolService(ProcessProtocolService): |
807 | |
808 | def __init__(self): |
809 | @@ -566,6 +588,58 @@ class TestNeighbourDiscoveryService(MAASTestCase): |
810 | % (ifname, ifname))) |
811 | |
812 | |
813 | +class TestBeaconingService(MAASTestCase): |
814 | + """Tests for `BeaconingService`.""" |
815 | + |
816 | + run_tests_with = MAASTwistedRunTest.make_factory(timeout=5) |
817 | + |
818 | + def test__returns_expected_arguments(self): |
819 | + ifname = factory.make_name('eth') |
820 | + service = BeaconingService(ifname, Mock()) |
821 | + args = service.getProcessParameters() |
822 | + self.assertThat(args, HasLength(3)) |
823 | + self.assertTrue(args[0].endswith(b'maas-rack')) |
824 | + self.assertThat(args[1], Equals(b"observe-beacons")) |
825 | + self.assertThat(args[2], Equals(ifname.encode('utf-8'))) |
826 | + |
827 | + @inlineCallbacks |
828 | + def test__restarts_process_after_finishing(self): |
829 | + ifname = factory.make_name('eth') |
830 | + service = BeaconingService(ifname, Mock()) |
831 | + mock_process_params = self.patch(service, 'getProcessParameters') |
832 | + mock_process_params.return_value = [b'/bin/echo', b'{}'] |
833 | + service.clock = Clock() |
834 | + service.startService() |
835 | + # Wait for the protocol to finish |
836 | + service.clock.advance(0.0) |
837 | + yield service._protocol.done |
838 | + # Advance the clock (should start the service again) |
839 | + interval = service.step |
840 | + service.clock.advance(interval) |
841 | + # The Deferred should have been recreated. |
842 | + self.assertThat(service._protocol.done, Not(IsFiredDeferred())) |
843 | + yield service._protocol.done |
844 | + service.stopService() |
845 | + |
846 | + @inlineCallbacks |
847 | + def test__protocol_logs_stderr(self): |
848 | + logger = self.useFixture(TwistedLoggerFixture()) |
849 | + ifname = factory.make_name('eth') |
850 | + service = BeaconingService(ifname, lambda _: None) |
851 | + protocol = service.createProcessProtocol() |
852 | + reactor.spawnProcess(protocol, b"sh", (b"sh", b"-c", b"exec cat >&2")) |
853 | + protocol.transport.write( |
854 | + b"Lines written to stderr are logged\n" |
855 | + b"with a prefix, with no exceptions.\n") |
856 | + protocol.transport.closeStdin() |
857 | + yield protocol.done |
858 | + self.assertThat(logger.output, Equals( |
859 | + "observe-beacons[%s]: Lines written to stderr are logged\n" |
860 | + "---\n" |
861 | + "observe-beacons[%s]: with a prefix, with no exceptions." |
862 | + % (ifname, ifname))) |
863 | + |
864 | + |
865 | class TestMDNSResolverService(MAASTestCase): |
866 | """Tests for `MDNSResolverService`.""" |
867 | |
868 | @@ -594,3 +668,82 @@ class TestMDNSResolverService(MAASTestCase): |
869 | "observe-mdns: Lines written to stderr are logged\n" |
870 | "---\n" |
871 | "observe-mdns: with a prefix, with one exception:")) |
872 | + |
873 | + |
874 | +def wait_for_rx_packets(beacon_protocol, count, deferred=None): |
875 | + """Waits for a BeaconingSocketProtocol to receive `count` packets.""" |
876 | + if deferred is None: |
877 | + deferred = Deferred() |
878 | + if len(beacon_protocol.rx_queue) >= count: |
879 | + deferred.callback(None) |
880 | + else: |
881 | + reactor.callLater( |
882 | + 0.001, wait_for_rx_packets, beacon_protocol, count, |
883 | + deferred=deferred) |
884 | + return deferred |
885 | + |
886 | + |
887 | +class TestBeaconingSocketProtocol(SharedSecretTestCase): |
888 | + """Tests for `BeaconingSocketProtocol`.""" |
889 | + |
890 | + run_tests_with = MAASTwistedRunTest.make_factory(timeout=15) |
891 | + |
892 | + @inlineCallbacks |
893 | + def test__creates_listen_port_when_run_with_IReactorMulticast(self): |
894 | + # Note: Always use a random port for testing. (port=0) |
895 | + protocol = BeaconingSocketProtocol(reactor, port=0) |
896 | + self.assertThat(protocol.listen_port, Not(Is(None))) |
897 | + # This tests that the port gets closed properly; otherwise the test |
898 | + # suite will complain about things left in the reactor. |
899 | + yield protocol.stopProtocol() |
900 | + |
901 | + def test__skips_creating_listen_port_when_run_with_fake_reactor(self): |
902 | + # Note: Always use a random port for testing. (port=0) |
903 | + protocol = BeaconingSocketProtocol(Clock(), port=0) |
904 | + self.assertThat(protocol.listen_port, Is(None)) |
905 | + # No listen port, so stopProtocol() shouldn't return a Deferred. |
906 | + result = protocol.stopProtocol() |
907 | + self.assertThat(result, Is(None)) |
908 | + |
909 | + @inlineCallbacks |
910 | + def test__sends_and_receives_beacons(self): |
911 | + # Note: Always use a random port for testing. (port=0) |
912 | + logger = self.useFixture(TwistedLoggerFixture()) |
913 | + protocol = BeaconingSocketProtocol( |
914 | + reactor, port=0, process_incoming=True, loopback=True, |
915 | + interface="::", debug=True) |
916 | + self.assertThat(protocol.listen_port, Not(Is(None))) |
917 | + listen_port = protocol.listen_port._realPortNumber |
918 | + self.write_secret() |
919 | + beacon = create_beacon_payload("solicitation", {}) |
920 | + rx_uuid = beacon.payload['uuid'] |
921 | + # XXX: We can't test IPv6 here until we either join the group |
922 | + # manually, or Twisted gains support for joining IPv6 groups. |
923 | + destination = random.choice(["::ffff:127.0.0.1", "::1"]) |
924 | + protocol.send_beacon(beacon, (destination, listen_port)) |
925 | + # Since we've instructed the protocol to loop back packets for testing, |
926 | + # it should have sent a multicast solicitation, received it back, sent |
927 | + # an advertisement, then received it back. So we'll wait for two |
928 | + # packets to be received. |
929 | + yield wait_for_rx_packets(protocol, 2) |
930 | + # Grab the beacon we know we transmitted and then received. |
931 | + transmitted = protocol.tx_queue.pop(rx_uuid, None) |
932 | + received = protocol.rx_queue.pop(rx_uuid, None) |
933 | + self.assertThat(transmitted, Equals(beacon)) |
934 | + self.assertThat(received[0]['payload']['uuid'], Equals(rx_uuid)) |
935 | + # Grab the subsequent packets from the queues. |
936 | + transmitted = protocol.tx_queue.popitem()[1] |
937 | + received = protocol.rx_queue.popitem()[1] |
938 | + # We should have received a second packet to ack the first beacon. |
939 | + self.assertThat(received[0]['payload']['acks'], Equals(rx_uuid)) |
940 | + # We should have transmitted an advertisement in response to the |
941 | + # solicitation. |
942 | + self.assertThat(transmitted.type, Equals('advertisement')) |
943 | + # This tests that the port gets closed properly; otherwise the test |
944 | + # suite will complain about things left in the reactor. |
945 | + yield protocol.stopProtocol() |
946 | + # In debug mode, the logger should have printed each packet. |
947 | + self.assertThat( |
948 | + logger.output, |
949 | + DocTestMatches( |
950 | + '...Own beacon received:...Own beacon received:...')) |
I tried to make this branch as small as possible so that it can be reviewed and landed; I've also developed a command to test beaconing, but that is not part of this proposal. That branch can be found here:
https://code.launchpad.net/~mpontillo/maas/+git/maas/+ref/send-beacons-command
For now, MAAS listens for beacons on any interface that is configured for network monitoring (that is, passive ARP observation). This approach may change in the future.
MAAS does not take any action on the received beacons except to (1) reply to any solicitation beacons with advertisement beacons and (2) keep state about beacons received within +/- two minutes of the current system time. This is enough to allow testing, and it sets the stage for MAAS to make some assumptions about fabric connectivity, but those assumptions are not yet acted upon in this branch.
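For readers who want a feel for the "+/- two minutes" window, here is a minimal sketch of how entries keyed by version 1 UUIDs could be aged out. It is not the code in this branch: `age_out_sketch`, `uuid1_to_unix_time`, and the 120-second default are illustrative assumptions. The only facts relied on are that a version 1 UUID embeds a timestamp counted in 100 ns intervals since 1582-10-15, and that the tests in the diff expect both very old and far-future keys to be dropped.

```python
import time
import uuid
from collections import OrderedDict

# Number of 100 ns intervals between the UUID epoch (1582-10-15)
# and the Unix epoch (1970-01-01).
UUID_EPOCH_OFFSET = 0x01B21DD213814000


def uuid1_to_unix_time(uuid_str):
    """Return the Unix timestamp embedded in a version 1 UUID string."""
    return (uuid.UUID(uuid_str).time - UUID_EPOCH_OFFSET) / 1e7


def age_out_sketch(queue, threshold=120.0, now=None):
    """Remove keys whose embedded time is more than `threshold` seconds
    away from `now`, in the past or in the future.

    Hypothetical helper for illustration; the name and the default
    threshold are assumptions, not the branch's API.
    """
    if now is None:
        now = time.time()
    # Collect stale keys first so the dict is not mutated while iterating.
    stale = [
        key for key in queue
        if abs(uuid1_to_unix_time(key) - now) > threshold
    ]
    for key in stale:
        del queue[key]
    return len(stale)


queue = OrderedDict()
queue[str(uuid.uuid1())] = {}                        # fresh entry
queue['00000000-0000-1000-aaaa-aaaaaaaaaaaa'] = {}   # distant past
queue['ffffffff-ffff-1fff-0000-000000000000'] = {}   # far future
removed = age_out_sketch(queue)
print(removed, len(queue))
```

This sketch scans every key, which is simple but linear in the queue size. Because the queue is an `OrderedDict`, an implementation could instead stop at the first fresh entry, assuming entries are inserted roughly in time order.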