Merge ~mpontillo/maas:beaconing-service into maas:master

Proposed by Mike Pontillo
Status: Merged
Approved by: Mike Pontillo
Approved revision: af83e74d10d132c3eaedd29b57be4a2e336c111b
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~mpontillo/maas:beaconing-service
Merge into: maas:master
Diff against target: 950 lines (+634/-22)
5 files modified
src/maastesting/factory.py (+21/-1)
src/provisioningserver/utils/beaconing.py (+74/-16)
src/provisioningserver/utils/services.py (+275/-4)
src/provisioningserver/utils/tests/test_beaconing.py (+111/-1)
src/provisioningserver/utils/tests/test_services.py (+153/-0)
Reviewer Review Type Date Requested Status
Данило Шеган (community) Approve
Review via email: mp+326397@code.launchpad.net

Commit message

Add initial beaconing service and protocol.

 * Add ProtocolForObserveBeacons to read input from each beacon
   monitoring process.
 * Add BeaconingSocketProtocol to receive input from either the
   ProtocolForObserveBeacons or the socket layer. (The socket
   layer path is used for unit tests, and in a future test command.)
 * Add a way to age out entries in an OrderedDict based upon UUID keys.
   (The UUIDs must have a valid 'time' field.)
 * Add initial transmit and receive path for beacons.
 * If solicitation packets are received, respond to them with
   advertisements.

To post a comment you must log in.
Revision history for this message
Mike Pontillo (mpontillo) wrote :

I tried to make this branch as small as possible so that it can be reviewed and landed; I've also developed a command to test beaconing, but that is not part of this proposal. That branch can be found here:

https://code.launchpad.net/~mpontillo/maas/+git/maas/+ref/send-beacons-command

For now, MAAS listens for beacons on any interface that is configured for network monitoring (that is, passive ARP observation). This approach may change in the future.

MAAS does not take any action on the received beacons except to (1) reply to any solicitation beacons with advertisement beacons and (2) keep state about beacons received within +/- two minutes of the current system time. This is enough to allow testing, sets the stage for MAAS to make some assumptions about fabric connectivity. But those assumptions are not yet acted upon in this branch.

Revision history for this message
Данило Шеган (danilo) wrote :

Looks good, sorry for the delay (it slipped on Friday due to my battles with the lander). A few minor nits inline.

review: Approve
Revision history for this message
Mike Pontillo (mpontillo) wrote :

Thanks for the review! Some replies below.

~mpontillo/maas:beaconing-service updated
af83e74... by Mike Pontillo

Review fixes.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/maastesting/factory.py b/src/maastesting/factory.py
2index b83cffb..d12ac17 100644
3--- a/src/maastesting/factory.py
4+++ b/src/maastesting/factory.py
5@@ -30,7 +30,10 @@ from unittest import mock
6 import urllib.error
7 import urllib.parse
8 import urllib.request
9-from uuid import uuid1
10+from uuid import (
11+ UUID,
12+ uuid1,
13+)
14
15 from distro_info import UbuntuDistroInfo
16 from maastesting.fixtures import TempDirectory
17@@ -255,6 +258,23 @@ class Factory:
18 def make_UUID(self):
19 return str(uuid1())
20
21+ def make_UUID_with_timestamp(self, timestamp, clock_seq=None, node=None):
22+ if node is None:
23+ node = random.getrandbits(48) | 0x010000000000
24+ if clock_seq is None:
25+ clock_seq = random.getrandbits(14)
26+ timestamp = int(timestamp * 1e9 / 100) + 0x01b21dd213814000
27+ time_low = timestamp & 0xffffffff
28+ time_mid = (timestamp >> 32) & 0xffff
29+ time_hi_version = (timestamp >> 48) & 0x0fff
30+ clock_seq_low = clock_seq & 0xff
31+ clock_seq_hi_variant = (clock_seq >> 8) & 0x3f
32+ fields = (
33+ time_low, time_mid, time_hi_version, clock_seq_hi_variant,
34+ clock_seq_low, node
35+ )
36+ return str(UUID(fields=fields, version=1))
37+
38 def _make_random_network(
39 self, slash=None, but_not=EMPTY_SET, disjoint_from=None,
40 random_address_factory=None):
41diff --git a/src/provisioningserver/utils/beaconing.py b/src/provisioningserver/utils/beaconing.py
42index e8c2384..5b5d286 100644
43--- a/src/provisioningserver/utils/beaconing.py
44+++ b/src/provisioningserver/utils/beaconing.py
45@@ -19,13 +19,16 @@ from gzip import (
46 decompress,
47 )
48 import json
49+import math
50 import os
51 import stat
52 import struct
53 import subprocess
54 import sys
55 from textwrap import dedent
56+import time
57 import uuid
58+from uuid import UUID
59
60 from bson import BSON
61 from bson.errors import BSONError
62@@ -48,6 +51,9 @@ from provisioningserver.utils.tcpip import (
63
64
65 BEACON_PORT = 5240
66+BEACON_IPV4_MULTICAST = "224.0.0.118"
67+BEACON_IPV6_MULTICAST = "ff02::15a"
68+
69
70 BEACON_TYPES = {
71 "solicitation": 1,
72@@ -71,6 +77,61 @@ BeaconPayload = namedtuple('BeaconPayload', (
73 ))
74
75
76+def uuid_to_timestamp(uuid_str):
77+ """Given the specified UUID string, returns the timestamp.
78+
79+ The timestamp returned should be comparable to what would be returned
80+ from `import time; time.time()`.
81+
82+ :param uuid_str: a UUID in string format
83+ :return: float
84+ """
85+ uuid_time = UUID(uuid_str).time
86+ # Reverse the algorithm in uuid.py.
87+ timestamp = (uuid_time - 0x01b21dd213814000) * 100 / 1e9
88+ return timestamp
89+
90+
91+def age_out_uuid_queue(queue, threshold=120.0):
92+ """Ages out a ordered dictionary (using UUID-based keys) based on time.
93+
94+ The given threshold (in seconds) indicates how old an entry can be
95+ before it will be removed from the queue.
96+
97+ :param queue: An `OrderedDict` with UUID strings as keys.
98+ :param threshold: The maximum time an entry can remain in the queue.
99+ """
100+ removals = []
101+ current_time = time.time()
102+ for key in queue:
103+ beacon_timestamp = uuid_to_timestamp(key)
104+ # Don't leave beacons from the future in the queue if the clock
105+ # suddenly changes. (This shouldn't happen, since the Fernet TTL
106+ # should not have allowed them through. But just in case.)
107+ difference = math.fabs(current_time - beacon_timestamp)
108+ # Age out beacons greater than two minutes old.
109+ if difference > threshold:
110+ removals.append(key)
111+ else:
112+ # If we're already encountering packets that haven't met the age
113+ # threshold (and were received more recently, and thus are later
114+ # in the queue) then it's time to give up. (Yes, it's possible that
115+ # clock skew could be an issue here, but after a couple of minutes,
116+ # it won't matter.)
117+ break
118+ for uuid_to_remove in removals:
119+ queue.pop(uuid_to_remove, None)
120+
121+
122+def beacon_to_json(beacon_payload):
123+ """Converts the specified beacon into a format suitable for JSON."""
124+ return {
125+ "version": beacon_payload.version,
126+ "type": beacon_payload.type,
127+ "payload": beacon_payload.payload,
128+ }
129+
130+
131 def create_beacon_payload(beacon_type, payload=None, version=PROTOCOL_VERSION):
132 """Creates a beacon payload of the specified type, with the given data.
133
134@@ -106,7 +167,7 @@ def read_beacon_payload(beacon_bytes):
135 Decrypts the inner beacon data if necessary.
136
137 :param beacon_bytes: beacon payload (bytes).
138- :return: dict
139+ :return: BeaconPayload namedtuple
140 """
141 if len(beacon_bytes) < BEACON_HEADER_LENGTH_V1:
142 raise InvalidBeaconingPacket(
143@@ -131,7 +192,7 @@ def read_beacon_payload(beacon_bytes):
144 else:
145 try:
146 decrypted_data = fernet_decrypt_psk(
147- payload_bytes, raw=True)
148+ payload_bytes, ttl=60, raw=True)
149 except InvalidToken:
150 raise InvalidBeaconingPacket(
151 "Failed to decrypt inner payload: check MAAS secret key.")
152@@ -176,18 +237,13 @@ class BeaconingPacket:
153 self.valid = None
154 self.invalid_reason = None
155 self.packet = pkt_bytes
156- self.payload = self.parse()
157+ self.data = self.parse()
158
159 def parse(self):
160- """Output text-based details about this beaconing packet to the
161- specified file or stream.
162-
163- :param out: An object with `write(str)` and `flush()` methods.
164- """
165 try:
166- payload = read_beacon_payload(self.packet)
167+ beacon = read_beacon_payload(self.packet)
168 self.valid = True
169- return payload
170+ return beacon
171 except InvalidBeaconingPacket as ibp:
172 self.valid = False
173 self.invalid_reason = ibp.invalid_reason
174@@ -213,6 +269,8 @@ def observe_beaconing_packets(input=sys.stdin.buffer, out=sys.stdout):
175 try:
176 packet = decode_ethernet_udp_packet(packet_bytes, pcap_header)
177 beacon = BeaconingPacket(packet.payload)
178+ if not beacon.valid:
179+ continue
180 output_json = {
181 "source_mac": format_eui(packet.l2.src_eui),
182 "destination_mac": format_eui(packet.l2.dst_eui),
183@@ -220,15 +278,15 @@ def observe_beaconing_packets(input=sys.stdin.buffer, out=sys.stdout):
184 "destination_ip": str(packet.l3.dst_ip),
185 "source_port": packet.l4.packet.src_port,
186 "destination_port": packet.l4.packet.dst_port,
187+ "time": pcap_header.timestamp_seconds
188 }
189 if packet.l2.vid is not None:
190 output_json["vid"] = packet.l2.vid
191- if beacon.payload is not None:
192- output_json['payload'] = beacon.payload
193- output_json['time'] = pcap_header.timestamp_seconds
194- out.write(json.dumps(output_json))
195- out.write('\n')
196- out.flush()
197+ if beacon.data is not None:
198+ output_json.update(beacon_to_json(beacon.data))
199+ out.write(json.dumps(output_json))
200+ out.write('\n')
201+ out.flush()
202 except PacketProcessingError as e:
203 err.write(e.error)
204 err.write("\n")
205diff --git a/src/provisioningserver/utils/services.py b/src/provisioningserver/utils/services.py
206index dd6b399..0ca0394 100644
207--- a/src/provisioningserver/utils/services.py
208+++ b/src/provisioningserver/utils/services.py
209@@ -11,10 +11,12 @@ from abc import (
210 ABCMeta,
211 abstractmethod,
212 )
213+from collections import OrderedDict
214 from datetime import timedelta
215 import json
216 from json.decoder import JSONDecodeError
217 import os
218+from pprint import pformat
219 import re
220
221 from provisioningserver.config import is_dev_environment
222@@ -23,6 +25,14 @@ from provisioningserver.logger import (
223 LegacyLogger,
224 )
225 from provisioningserver.rpc.exceptions import NoConnectionsAvailable
226+from provisioningserver.utils.beaconing import (
227+ age_out_uuid_queue,
228+ BEACON_IPV4_MULTICAST,
229+ BEACON_PORT,
230+ beacon_to_json,
231+ create_beacon_payload,
232+ read_beacon_payload,
233+)
234 from provisioningserver.utils.fs import (
235 get_maas_provision_command,
236 NamedLock,
237@@ -47,8 +57,14 @@ from twisted.internet.error import (
238 ProcessDone,
239 ProcessTerminated,
240 )
241-from twisted.internet.protocol import ProcessProtocol
242+from twisted.internet.interfaces import IReactorMulticast
243+from twisted.internet.protocol import (
244+ DatagramProtocol,
245+ ProcessProtocol,
246+)
247 from twisted.internet.threads import deferToThread
248+from zope.interface.exceptions import DoesNotImplement
249+from zope.interface.verify import verifyObject
250
251
252 maaslog = get_maas_logger("networks.monitor")
253@@ -129,9 +145,6 @@ class ProtocolForObserveARP(JSONPerLineProtocol):
254 The difference between `JSONPerLineProtocol` and `ProtocolForObserveARP`
255 is that the neighbour observation protocol needs to insert the interface
256 metadata into the resultant object before the callback.
257-
258- This also ensures that the spawned process is configured as a process
259- group leader for its own process group.
260 """
261
262 def __init__(self, interface, *args, **kwargs):
263@@ -147,6 +160,28 @@ class ProtocolForObserveARP(JSONPerLineProtocol):
264 log.msg("observe-arp[%s]:" % self.interface, line)
265
266
267+class ProtocolForObserveBeacons(JSONPerLineProtocol):
268+ """Protocol used when spawning `maas-rack observe-beacons`.
269+
270+ The difference between `JSONPerLineProtocol` and
271+ `ProtocolForObserveBeacons` is that the beacon observation protocol needs
272+ to insert the interface metadata into the resultant object before the
273+ callback.
274+ """
275+
276+ def __init__(self, interface, *args, **kwargs):
277+ super().__init__(*args, **kwargs)
278+ self.interface = interface
279+
280+ def objectReceived(self, obj):
281+ obj['interface'] = self.interface
282+ super().objectReceived(obj)
283+
284+ def errLineReceived(self, line):
285+ line = line.decode("utf-8").rstrip()
286+ log.msg("observe-beacons[%s]:" % self.interface, line)
287+
288+
289 class ProtocolForObserveMDNS(JSONPerLineProtocol):
290 """Protocol used when spawning `maas-rack observe-mdns`.
291
292@@ -257,6 +292,30 @@ class NeighbourDiscoveryService(ProcessProtocolService):
293 self.ifname, callback=self.callback)
294
295
296+class BeaconingService(ProcessProtocolService):
297+ """Service to spawn the per-interface device discovery subprocess."""
298+
299+ def __init__(self, ifname: str, callback: callable):
300+ self.ifname = ifname
301+ self.callback = callback
302+ super().__init__()
303+
304+ def getDescription(self) -> str:
305+ return "Beaconing process for %s" % self.ifname
306+
307+ def getProcessParameters(self):
308+ maas_rack_cmd = get_maas_provision_command().encode("utf-8")
309+ return [
310+ maas_rack_cmd,
311+ b"observe-beacons",
312+ self.ifname.encode("utf-8")
313+ ]
314+
315+ def createProcessProtocol(self):
316+ return ProtocolForObserveBeacons(
317+ self.ifname, callback=self.callback)
318+
319+
320 class MDNSResolverService(ProcessProtocolService):
321 """Service to spawn the per-interface device discovery subprocess."""
322
323@@ -278,6 +337,173 @@ class MDNSResolverService(ProcessProtocolService):
324 return ProtocolForObserveMDNS(callback=self.callback)
325
326
327+class BeaconingSocketProtocol(DatagramProtocol):
328+ """Protocol to handle beaconing packets received from the socket layer."""
329+
330+ def __init__(
331+ self, reactor, process_incoming=False, debug=True, interface='::',
332+ loopback=False, port=BEACON_PORT):
333+ super().__init__()
334+ self.reactor = reactor
335+ self.process_incoming = process_incoming
336+ self.debug = debug
337+ # These queues keep track of beacons that have recently been sent
338+ # or received by the protocol. Ordering is needed here so that we can
339+ # later age out the least-recently-added packets without traversing the
340+ # entire dictionary.
341+ self.tx_queue = OrderedDict()
342+ self.rx_queue = OrderedDict()
343+ self.listen_port = None
344+ try:
345+ # Need to ensure that the passed-in reactor is, in fact, a "real"
346+ # reactor, and not None, or a mock reactor used in tests.
347+ verifyObject(IReactorMulticast, reactor)
348+ self.listen_port = reactor.listenMulticast(
349+ port, self, interface=interface, listenMultiple=True)
350+ self.transport.joinGroup(BEACON_IPV4_MULTICAST)
351+ self.transport.setLoopbackMode(loopback)
352+ # XXX mpontillo 2017-06-21: Twisted doesn't support IPv6 here yet.
353+ # self.transport.joinGroup(BEACON_IPV6_MULTICAST)
354+ except DoesNotImplement:
355+ pass
356+
357+ def stopProtocol(self):
358+ super().stopProtocol()
359+ if self.listen_port is not None:
360+ return self.listen_port.stopListening()
361+ return None
362+
363+ def send_beacon(self, beacon, destination_address):
364+ """Send a beacon to the specified destination.
365+
366+ :param beacon: The `BeaconPayload` namedtuple to send. Must have a
367+ `payload` ivar containing a 'uuid' element.
368+ :param destination_address: The UDP/IP (destination, port) tuple. IPv4
369+ addresses must be in IPv4-mapped IPv6 format.
370+ :return: True if the beacon was sent, False otherwise.
371+ """
372+ try:
373+ self.transport.write(beacon.bytes, destination_address)
374+ # If the packet cannot be sent for whatever reason, OSError will
375+ # be raised, and we won't record sending a beacon we didn't
376+ # actually send.
377+ self.tx_queue[beacon.payload['uuid']] = beacon
378+ age_out_uuid_queue(self.tx_queue)
379+ return True
380+ except OSError as e:
381+ if self.debug is True:
382+ log.msg("Error while sending beacon: %s" % e)
383+ return False
384+
385+ def beaconReceived(self, beacon_json):
386+ """Called whenever a beacon is received.
387+
388+ This method is responsible for updating the `tx_queue` and `rx_queue`
389+ data structures, and determining if the incoming beacon is meaningful
390+ for determining network topology.
391+
392+ :param beacon_json: The normalized beacon JSON, which can come either
393+ from the external tcpdump-based process, or from the sockets layer
394+ (with less information about the received packet).
395+ """
396+ rx_uuid = beacon_json.get('payload', {}).get("uuid")
397+ if rx_uuid is None:
398+ if self.debug is True:
399+ log.msg(
400+ "Rejecting incoming beacon: no UUID found: \n%s" % (
401+ pformat(beacon_json)))
402+ return
403+ own_beacon = False
404+ if self.tx_queue.get(rx_uuid):
405+ own_beacon = True
406+ is_dup = self.remember_beacon_and_check_duplicate(rx_uuid, beacon_json)
407+ if self.debug is True:
408+ log.msg("%s %sreceived:\n%s" % (
409+ "Own beacon" if own_beacon else "Beacon",
410+ "(duplicate) " if is_dup else "",
411+ beacon_json))
412+ # From what we know so far, we can infer some facts about the network.
413+ # (1) If we received our own beacon, that means the interface we sent
414+ # the packet out on is on the same fabric as the interface that
415+ # received it.
416+ # (2) If we receive a duplicate beacon on two different interfaces,
417+ # that means those two interfaces are on the same fabric.
418+ reply_ip = beacon_json['source_ip']
419+ reply_port = beacon_json['source_port']
420+ if ':' not in reply_ip:
421+ # Since we opened an IPv6-compatible socket, need IPv6 syntax
422+ # here to send to IPv4 addresses.
423+ reply_ip = '::ffff:' + reply_ip
424+ reply_address = (reply_ip, reply_port)
425+ beacon_type = beacon_json['type']
426+ if beacon_type == "solicitation":
427+ receive_interface_info = self.get_receive_interface_info(
428+ beacon_json)
429+ payload = {
430+ "interface": receive_interface_info,
431+ "acks": rx_uuid
432+ }
433+ reply = create_beacon_payload("advertisement", payload)
434+ self.send_beacon(reply, reply_address)
435+
436+ def remember_beacon_and_check_duplicate(self, rx_uuid, beacon_json):
437+ """Records an incoming beacon based on its UUID and JSON.
438+
439+ Organizes incoming beacons in the `rx_queue` by creating a list of
440+ beacons received [on different interfaces] per UUID.
441+
442+ :param rx_uuid: The UUID of the incoming beacon.
443+ :param beacon_json: The incoming beacon (in JSON format).
444+ :return: True if the beacon was a duplicate, otherwise False.
445+ """
446+ duplicate_received = False
447+ # Need to age out before doing anything else; we don't want to match
448+ # a duplicate packet and then delete it immediately after.
449+ age_out_uuid_queue(self.rx_queue)
450+ rx_packets_for_uuid = self.rx_queue.get(rx_uuid, [])
451+ if len(rx_packets_for_uuid) > 0:
452+ duplicate_received = True
453+ rx_packets_for_uuid.append(beacon_json)
454+ self.rx_queue[rx_uuid] = rx_packets_for_uuid
455+ return duplicate_received
456+
457+ def get_receive_interface_info(self, context):
458+ """Returns a dictionary representing information about the receive
459+ interface, given the context of the beacon. The context can be the
460+ limited information received from the socket layer, or the extended
461+ information from the monitoring process.
462+ """
463+ receive_interface_info = {
464+ "name": context.get('interface'),
465+ "source_ip": context.get('source_ip'),
466+ "destination_ip": context.get('destination_ip'),
467+ "source_mac": context.get('source_mac'),
468+ "destination_mac": context.get('destination_mac'),
469+ }
470+ if 'vid' in context:
471+ receive_interface_info['vid'] = context['vid']
472+ return receive_interface_info
473+
474+ def datagramReceived(self, datagram, addr):
475+ """Called by Twisted when a UDP datagram is received.
476+
477+ Note: In the typical use case, the MAAS server will ignore packets
478+ coming into this method. We need to listen to the socket normally,
479+ however, so that the underlying network stack will send ICMP
480+ destination (port) unreachable replies to anyone trying to send us
481+ beacons. However, at other times, (such as while running the test
482+ commands), we *will* listen to the socket layer for beacons.
483+ """
484+ if self.process_incoming is True:
485+ context = {
486+ "source_ip": addr[0],
487+ "source_port": addr[1]
488+ }
489+ beacon_json = beacon_to_json(read_beacon_payload(datagram))
490+ beacon_json.update(context)
491+ self.beaconReceived(beacon_json)
492+
493+
494 class NetworksMonitoringLock(NamedLock):
495 """Host scoped lock to ensure only one network monitoring service runs."""
496
497@@ -324,6 +550,7 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
498 self.interface_monitor.setName("updateInterfaces")
499 self.interface_monitor.clock = self.clock
500 self.interface_monitor.setServiceParent(self)
501+ self.beaconing_protocol = BeaconingSocketProtocol(clock)
502
503 def updateInterfaces(self):
504 """Update interfaces, catching and logging errors.
505@@ -388,12 +615,19 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
506 This MUST be overridden in subclasses.
507 """
508
509+ def reportBeacons(self, beacons):
510+ """Receives a report of an observed beacon packet."""
511+ for beacon in beacons:
512+ log.msg("Received beacon: %r" % beacon)
513+ self.beaconing_protocol.beaconReceived(beacon)
514+
515 def stopService(self):
516 """Stop the service.
517
518 Ensures that sole responsibility for monitoring networks is released.
519 """
520 d = super().stopService()
521+ self.beaconing_protocol.stopProtocol()
522 d.addBoth(callOut, self._releaseSoleResponsibility)
523 return d
524
525@@ -473,6 +707,13 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
526 service.setName("neighbour_discovery:" + ifname)
527 service.setServiceParent(self)
528
529+ def _startBeaconing(self, ifname):
530+ """"Start neighbour discovery service on the specified interface."""
531+ service = BeaconingService(ifname, self.reportBeacons)
532+ service.clock = self.clock
533+ service.setName("beaconing:" + ifname)
534+ service.setServiceParent(self)
535+
536 def _startMDNSDiscoveryService(self):
537 """Start resolving mDNS entries on attached networks."""
538 try:
539@@ -520,6 +761,30 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
540 maaslog.info(
541 "Stopped neighbour observation service for %s." % ifname)
542
543+ def _startBeaconingServices(self, new_interfaces):
544+ """Start monitoring services for the specified set of interfaces."""
545+ for ifname in new_interfaces:
546+ # Sanity check to ensure the service isn't already started.
547+ try:
548+ self.getServiceNamed("beaconing:" + ifname)
549+ except KeyError:
550+ # This is an expected exception. (The call inside the `try`
551+ # is only necessary to ensure the service doesn't exist.)
552+ self._startBeaconing(ifname)
553+
554+ def _stopBeaconingServices(self, deleted_interfaces):
555+ """Stop monitoring services for the specified set of interfaces."""
556+ for ifname in deleted_interfaces:
557+ try:
558+ service = self.getServiceNamed("beaconing:" + ifname)
559+ except KeyError:
560+ # Service doesn't exist, so no need to stop it.
561+ pass
562+ else:
563+ service.disownServiceParent()
564+ maaslog.info(
565+ "Stopped beaconing service for %s." % ifname)
566+
567 def _shouldMonitorMDNS(self, monitoring_state):
568 # If any interface is configured for mDNS, we must start the monitoring
569 # process. (You cannot select interfaces when using `avahi-browse`.)
570@@ -591,11 +856,17 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
571 log.msg("Starting neighbour discovery for interfaces: %r" % (
572 new_interfaces))
573 self._startNeighbourDiscoveryServices(new_interfaces)
574+ # XXX mpontillo 2017-07-12: for testing, just start beaconing
575+ # services on all the interfaces enabled for active discovery.
576+ self._startBeaconingServices(new_interfaces)
577 if len(deleted_interfaces) > 0:
578 log.msg(
579 "Stopping neighbour discovery for interfaces: %r" % (
580 deleted_interfaces))
581 self._stopNeighbourDiscoveryServices(deleted_interfaces)
582+ # XXX mpontillo 2017-07-12: this should be separately configured.
583+ # (see similar comment in the 'start' path above.)
584+ self._stopBeaconingServices(deleted_interfaces)
585 self._monitored = monitored_interfaces
586
587 def _interfacesRecorded(self, interfaces):
588diff --git a/src/provisioningserver/utils/tests/test_beaconing.py b/src/provisioningserver/utils/tests/test_beaconing.py
589index 2448280..514b209 100644
590--- a/src/provisioningserver/utils/tests/test_beaconing.py
591+++ b/src/provisioningserver/utils/tests/test_beaconing.py
592@@ -6,14 +6,20 @@
593 __all__ = []
594
595 from argparse import ArgumentParser
596+from collections import OrderedDict
597 from gzip import compress
598 import io
599+import math
600 import random
601 import struct
602 import subprocess
603 from tempfile import NamedTemporaryFile
604+import time
605 from unittest.mock import Mock
606-from uuid import UUID
607+from uuid import (
608+ UUID,
609+ uuid1,
610+)
611
612 from maastesting.factory import factory
613 from maastesting.matchers import MockCalledOnceWith
614@@ -26,23 +32,63 @@ from provisioningserver.tests.test_security import SharedSecretTestCase
615 from provisioningserver.utils import beaconing as beaconing_module
616 from provisioningserver.utils.beaconing import (
617 add_arguments,
618+ age_out_uuid_queue,
619 BEACON_HEADER_FORMAT_V1,
620+ beacon_to_json,
621 BEACON_TYPES,
622 BeaconingPacket,
623+ BeaconPayload,
624 create_beacon_payload,
625 InvalidBeaconingPacket,
626 read_beacon_payload,
627 run,
628+ uuid_to_timestamp,
629 )
630 from provisioningserver.utils.script import ActionScriptError
631 from testtools.matchers import (
632+ Contains,
633 Equals,
634+ HasLength,
635 Is,
636 IsInstance,
637+ LessThan,
638+ Not,
639 )
640 from testtools.testcase import ExpectedException
641
642
643+class TestUUIDToTimestamp(MAASTestCase):
644+
645+ def test__round_trip_preserves_timestamp(self):
646+ expected_timestamp = time.time()
647+ uuid = str(uuid1())
648+ actual_timestamp = uuid_to_timestamp(uuid)
649+ difference = math.fabs(actual_timestamp - expected_timestamp)
650+ # Tolerate a difference of ~3 seconds. We'll age out packets on the
651+ # order of minutes, so that should be good enough.
652+ self.assertThat(difference, LessThan(3.0))
653+
654+
655+class TestBeaconToJSON(MAASTestCase):
656+ """Tests for `beacon_to_json()` function."""
657+
658+ def test__preserves_version_type_and_payload__discards_bytes(self):
659+ test_bytes = factory.make_bytes()
660+ test_version = factory.make_string()
661+ test_type = factory.make_string()
662+ test_payload = {
663+ factory.make_string(): factory.make_string()
664+ }
665+ beacon = BeaconPayload(
666+ test_bytes, test_version, test_type, test_payload)
667+ beacon_json = beacon_to_json(beacon)
668+ self.assertThat(beacon_json["version"], Equals(test_version))
669+ self.assertThat(beacon_json["type"], Equals(test_type))
670+ self.assertThat(beacon_json["payload"], Equals(test_payload))
671+ self.assertThat(beacon_json, Not(Contains("bytes")))
672+ self.assertThat(beacon_json, HasLength(3))
673+
674+
675 class TestCreateBeaconPayload(SharedSecretTestCase):
676
677 def test__requires_maas_shared_secret_for_inner_data_payload(self):
678@@ -286,3 +332,67 @@ class TestObserveBeaconsCommand(MAASTestCase):
679 os.setpgrp.side_effect = exception_type
680 self.assertRaises(exception_type, run, [])
681 self.assertThat(os.setpgrp, MockCalledOnceWith())
682+
683+
684+class TestAgeOutUUIDQueue(MAASTestCase):
685+ """Tests for `age_out_uuid_queue()` function."""
686+
687+ def test__does_not_remove_fresh_entries(self):
688+ uuid_now = str(uuid1())
689+ queue = OrderedDict()
690+ queue[uuid_now] = {}
691+ self.assertThat(queue, HasLength(1))
692+ age_out_uuid_queue(queue)
693+ self.assertThat(queue, HasLength(1))
694+
695+ def test__keeps_entries_from_the_reasonable_past(self):
696+ uuid_from_the_past = factory.make_UUID_with_timestamp(
697+ time.time() - 60.0)
698+ queue = OrderedDict()
699+ queue[uuid_from_the_past] = {}
700+ self.assertThat(queue, HasLength(1))
701+ age_out_uuid_queue(queue)
702+ self.assertThat(queue, HasLength(1))
703+
704+ def test__keeps_entries_from_the_reasonable_future(self):
705+ uuid_from_the_future = factory.make_UUID_with_timestamp(
706+ time.time() + 60.0)
707+ queue = OrderedDict()
708+ queue[uuid_from_the_future] = {}
709+ self.assertThat(queue, HasLength(1))
710+ age_out_uuid_queue(queue)
711+ self.assertThat(queue, HasLength(1))
712+
713+ def test__removes_entries_from_the_past(self):
714+ uuid_from_the_past = factory.make_UUID_with_timestamp(
715+ time.time() - 123.0)
716+ queue = OrderedDict()
717+ queue[uuid_from_the_past] = {}
718+ self.assertThat(queue, HasLength(1))
719+ age_out_uuid_queue(queue)
720+ self.assertThat(queue, HasLength(0))
721+
722+ def test__removes_entries_from_the_future(self):
723+ uuid_from_the_future = factory.make_UUID_with_timestamp(
724+ time.time() + 123.0)
725+ queue = OrderedDict()
726+ queue[uuid_from_the_future] = {}
727+ self.assertThat(queue, HasLength(1))
728+ age_out_uuid_queue(queue)
729+ self.assertThat(queue, HasLength(0))
730+
731+ def test__removes_entries_from_the_distant_past(self):
732+ uuid_from_the_past = '00000000-0000-1000-aaaa-aaaaaaaaaaaa'
733+ queue = OrderedDict()
734+ queue[uuid_from_the_past] = {}
735+ self.assertThat(queue, HasLength(1))
736+ age_out_uuid_queue(queue)
737+ self.assertThat(queue, HasLength(0))
738+
739+ def test__removes_entries_from_the_far_future(self):
740+ uuid_from_the_future = 'ffffffff-ffff-1fff-0000-000000000000'
741+ queue = OrderedDict()
742+ queue[uuid_from_the_future] = {}
743+ self.assertThat(queue, HasLength(1))
744+ age_out_uuid_queue(queue)
745+ self.assertThat(queue, HasLength(0))
746diff --git a/src/provisioningserver/utils/tests/test_services.py b/src/provisioningserver/utils/tests/test_services.py
747index e287bec..ce8361f 100644
748--- a/src/provisioningserver/utils/tests/test_services.py
749+++ b/src/provisioningserver/utils/tests/test_services.py
750@@ -6,6 +6,7 @@
751 __all__ = []
752
753 from functools import partial
754+import random
755 import threading
756 from unittest.mock import (
757 call,
758@@ -27,8 +28,12 @@ from maastesting.testcase import (
759 MAASTwistedRunTest,
760 )
761 from maastesting.twisted import TwistedLoggerFixture
762+from provisioningserver.tests.test_security import SharedSecretTestCase
763 from provisioningserver.utils import services
764+from provisioningserver.utils.beaconing import create_beacon_payload
765 from provisioningserver.utils.services import (
766+ BeaconingService,
767+ BeaconingSocketProtocol,
768 JSONPerLineProtocol,
769 MDNSResolverService,
770 NeighbourDiscoveryService,
771@@ -36,6 +41,7 @@ from provisioningserver.utils.services import (
772 NetworksMonitoringService,
773 ProcessProtocolService,
774 ProtocolForObserveARP,
775+ ProtocolForObserveBeacons,
776 )
777 from testtools import ExpectedException
778 from testtools.matchers import (
779@@ -47,6 +53,7 @@ from testtools.matchers import (
780 from twisted.application.service import MultiService
781 from twisted.internet import reactor
782 from twisted.internet.defer import (
783+ Deferred,
784 DeferredQueue,
785 inlineCallbacks,
786 succeed,
787@@ -382,6 +389,21 @@ class TestProtocolForObserveARP(MAASTestCase):
788 callback, MockCallsMatch(call([{"interface": ifname}])))
789
790
791+class TestProtocolForObserveBeacons(MAASTestCase):
792+ """Tests for `ProtocolForObserveBeacons`."""
793+
794+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
795+
796+ def test_adds_interface(self):
797+ callback = Mock()
798+ ifname = factory.make_name('eth')
799+ proto = ProtocolForObserveBeacons(ifname, callback=callback)
800+ proto.makeConnection(Mock(pid=None))
801+ proto.outReceived(b"{}\n")
802+ self.expectThat(
803+ callback, MockCallsMatch(call([{"interface": ifname}])))
804+
805+
806 class MockProcessProtocolService(ProcessProtocolService):
807
808 def __init__(self):
809@@ -566,6 +588,58 @@ class TestNeighbourDiscoveryService(MAASTestCase):
810 % (ifname, ifname)))
811
812
813+class TestBeaconingService(MAASTestCase):
814+ """Tests for `BeaconingService`."""
815+
816+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=5)
817+
818+ def test__returns_expected_arguments(self):
819+ ifname = factory.make_name('eth')
820+ service = BeaconingService(ifname, Mock())
821+ args = service.getProcessParameters()
822+ self.assertThat(args, HasLength(3))
823+ self.assertTrue(args[0].endswith(b'maas-rack'))
824+ self.assertTrue(args[1], Equals(b"observe-beacons"))
825+ self.assertTrue(args[2], Equals(ifname.encode('utf-8')))
826+
827+ @inlineCallbacks
828+ def test__restarts_process_after_finishing(self):
829+ ifname = factory.make_name('eth')
830+ service = BeaconingService(ifname, Mock())
831+ mock_process_params = self.patch(service, 'getProcessParameters')
832+ mock_process_params.return_value = [b'/bin/echo', b'{}']
833+ service.clock = Clock()
834+ service.startService()
835+ # Wait for the protocol to finish
836+ service.clock.advance(0.0)
837+ yield service._protocol.done
838+ # Advance the clock (should start the service again)
839+ interval = service.step
840+ service.clock.advance(interval)
841+ # The Deferred should have been recreated.
842+ self.assertThat(service._protocol.done, Not(IsFiredDeferred()))
843+ yield service._protocol.done
844+ service.stopService()
845+
846+ @inlineCallbacks
847+ def test__protocol_logs_stderr(self):
848+ logger = self.useFixture(TwistedLoggerFixture())
849+ ifname = factory.make_name('eth')
850+ service = BeaconingService(ifname, lambda _: None)
851+ protocol = service.createProcessProtocol()
852+ reactor.spawnProcess(protocol, b"sh", (b"sh", b"-c", b"exec cat >&2"))
853+ protocol.transport.write(
854+ b"Lines written to stderr are logged\n"
855+ b"with a prefix, with no exceptions.\n")
856+ protocol.transport.closeStdin()
857+ yield protocol.done
858+ self.assertThat(logger.output, Equals(
859+ "observe-beacons[%s]: Lines written to stderr are logged\n"
860+ "---\n"
861+ "observe-beacons[%s]: with a prefix, with no exceptions."
862+ % (ifname, ifname)))
863+
864+
865 class TestMDNSResolverService(MAASTestCase):
866 """Tests for `MDNSResolverService`."""
867
868@@ -594,3 +668,82 @@ class TestMDNSResolverService(MAASTestCase):
869 "observe-mdns: Lines written to stderr are logged\n"
870 "---\n"
871 "observe-mdns: with a prefix, with one exception:"))
872+
873+
874+def wait_for_rx_packets(beacon_protocol, count, deferred=None):
875+ """Waits for a BeaconingSocketProtocol to transmit `count` packets."""
876+ if deferred is None:
877+ deferred = Deferred()
878+ if len(beacon_protocol.rx_queue) >= count:
879+ deferred.callback(None)
880+ else:
881+ reactor.callLater(
882+ 0.001, wait_for_rx_packets, beacon_protocol, count,
883+ deferred=deferred)
884+ return deferred
885+
886+
887+class TestBeaconingSocketProtocol(SharedSecretTestCase):
888+ """Tests for `BeaconingSocketProtocol`."""
889+
890+ run_tests_with = MAASTwistedRunTest.make_factory(timeout=15)
891+
892+ @inlineCallbacks
893+ def test__creates_listen_port_when_run_with_IReactorMulticast(self):
894+ # Note: Always use a random port for testing. (port=0)
895+ protocol = BeaconingSocketProtocol(reactor, port=0)
896+ self.assertThat(protocol.listen_port, Not(Is(None)))
897+ # This tests that the post gets closed properly; otherwise the test
898+ # suite will complain about things left in the reactor.
899+ yield protocol.stopProtocol()
900+
901+ def test__skips_creating_listen_port_when_run_with_fake_reactor(self):
902+ # Note: Always use a random port for testing. (port=0)
903+ protocol = BeaconingSocketProtocol(Clock(), port=0)
904+ self.assertThat(protocol.listen_port, Is(None))
905+ # No listen port, so stopProtocol() shouldn't return a Deferred.
906+ result = protocol.stopProtocol()
907+ self.assertThat(result, Is(None))
908+
909+ @inlineCallbacks
910+ def test__sends_and_receives_beacons(self):
911+ # Note: Always use a random port for testing. (port=0)
912+ logger = self.useFixture(TwistedLoggerFixture())
913+ protocol = BeaconingSocketProtocol(
914+ reactor, port=0, process_incoming=True, loopback=True,
915+ interface="::", debug=True)
916+ self.assertThat(protocol.listen_port, Not(Is(None)))
917+ listen_port = protocol.listen_port._realPortNumber
918+ self.write_secret()
919+ beacon = create_beacon_payload("solicitation", {})
920+ rx_uuid = beacon.payload['uuid']
921+ # XXX: We can't test IPv6 here until we either join the group
922+ # manually, or Twisted gains support for joining IPv6 groups.
923+ destination = random.choice(["::ffff:127.0.0.1", "::1"])
924+ protocol.send_beacon(beacon, (destination, listen_port))
925+ # Since we've instructed the protocol to loop back packets for testing,
926+ # it should have sent a multicast solicitation, received it back, sent
927+ # an advertisement, then received it back. So we'll wait for two
928+ # packets to be sent.
929+ yield wait_for_rx_packets(protocol, 2)
930+ # Grab the beacon we know we transmitted and then received.
931+ transmitted = protocol.tx_queue.pop(rx_uuid, None)
932+ received = protocol.rx_queue.pop(rx_uuid, None)
933+ self.assertThat(transmitted, Equals(beacon))
934+ self.assertThat(received[0]['payload']['uuid'], Equals(rx_uuid))
935+ # Grab the subsequent packets from the queues.
936+ transmitted = protocol.tx_queue.popitem()[1]
937+ received = protocol.rx_queue.popitem()[1]
938+ # We should have received a second packet to ack the first beacon.
939+ self.assertThat(received[0]['payload']['acks'], Equals(rx_uuid))
940+ # We should have transmitted an advertisement in response to the
941+ # solicitation.
942+ self.assertThat(transmitted.type, Equals('advertisement'))
943+ # This tests that the post gets closed properly; otherwise the test
944+ # suite will complain about things left in the reactor.
945+ yield protocol.stopProtocol()
946+ # In debug mode, the logger should have printed each packet.
947+ self.assertThat(
948+ logger.output,
949+ DocTestMatches(
950+ '...Own beacon received:...Own beacon received:...'))

Subscribers

People subscribed via source and target branches