Merge ~mpontillo/maas:beaconing--use-hints-for-interface-update into maas:master

Proposed by Mike Pontillo
Status: Merged
Approved by: Mike Pontillo
Approved revision: b10bf14b5bdbfb3bb4348891079f3c175e72e43b
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~mpontillo/maas:beaconing--use-hints-for-interface-update
Merge into: maas:master
Diff against target: 896 lines (+381/-97)
13 files modified
.gitignore (+1/-1)
src/maasserver/models/node.py (+2/-1)
src/maasserver/regiondservices/networks_monitoring.py (+5/-4)
src/maasserver/regiondservices/tests/test_networks_monitoring.py (+2/-1)
src/maasserver/rpc/rackcontrollers.py (+2/-2)
src/maasserver/rpc/regionservice.py (+7/-4)
src/maasserver/rpc/tests/test_rackcontrollers.py (+1/-1)
src/maasserver/rpc/tests/test_regionservice_calls.py (+2/-1)
src/provisioningserver/rackdservices/networks_monitoring_service.py (+16/-12)
src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py (+74/-10)
src/provisioningserver/rpc/region.py (+1/-0)
src/provisioningserver/utils/services.py (+134/-55)
src/provisioningserver/utils/tests/test_services.py (+134/-5)
Reviewer Review Type Date Requested Status
Blake Rouse (community) Approve
Review via email: mp+328815@code.launchpad.net

Commit message

Send topology hints to update_interfaces()

 * Add code to convert topology hints to a compact JSON format.
 * Start beaconing as soon as interfaces are known; don't
   wait for interface monitoring settings. This means
   beaconing will happen on all interfaces, all the time,
   independent of the network discovery settings.
 * Fix bug that caused update_interfaces() to be called
   with create_fabrics=True when it should have been False.
 * Wait for beaconing to complete before updating interfaces.
 * Drive-by fix for an incorrect line in .gitignore, which
   erroneously matched django16_south_maas19.tar.gz.

To post a comment you must log in.
Revision history for this message
Blake Rouse (blake-rouse) wrote :

Looks good. Just one question, which I won't block on: is it the case that a region controller will never run beaconing? If so, why? It seems like it should.

review: Approve
Revision history for this message
Mike Pontillo (mpontillo) wrote :

Thanks for the review. The NetworksMonitoringService always runs beaconing, regardless of whether it's on the rack or the region. (The line you pointed out was in the tests; the tests for the region didn't need much change, since the region service simply calls update_interfaces() rather than going through the RPC layer.)

Revision history for this message
MAAS Lander (maas-lander) wrote :

LANDING
-b beaconing--use-hints-for-interface-update lp:~mpontillo/maas into -b master lp:~maas-committers/maas

STATUS: FAILED BUILD
LOG: http://maas-ci-jenkins.internal:8080/job/maas/job/branch-tester/194/consoleText

Revision history for this message
MAAS Lander (maas-lander) wrote :

LANDING
-b beaconing--use-hints-for-interface-update lp:~mpontillo/maas into -b master lp:~maas-committers/maas

STATUS: FAILED BUILD
LOG: http://maas-ci-jenkins.internal:8080/job/maas/job/branch-tester/198/consoleText

Revision history for this message
Mike Pontillo (mpontillo) wrote :

Fixed a merge conflict in the .gitignore file, which was introduced when another branch landed.

Revision history for this message
MAAS Lander (maas-lander) wrote :

LANDING
-b beaconing--use-hints-for-interface-update lp:~mpontillo/maas into -b master lp:~maas-committers/maas

STATUS: FAILED BUILD
LOG: http://maas-ci-jenkins.internal:8080/job/maas/job/branch-tester/200/consoleText

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/.gitignore b/.gitignore
2index 20d7b98..714a000 100644
3--- a/.gitignore
4+++ b/.gitignore
5@@ -19,6 +19,7 @@
6 /.noseids
7 /.run
8 /.run-e2e
9+/*.orig.tar.gz
10 /bin
11 /build
12 /build_pkg
13@@ -57,4 +58,3 @@ __pycache__
14 *~
15 \#*\#
16 .#*
17-*.tar.gz
18diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py
19index 127669b..389f84c 100644
20--- a/src/maasserver/models/node.py
21+++ b/src/maasserver/models/node.py
22@@ -4657,7 +4657,8 @@ class Controller(Node):
23 @with_connection
24 @synchronised(locks.rack_registration)
25 @transactional
26- def update_interfaces(self, interfaces, create_fabrics=True):
27+ def update_interfaces(
28+ self, interfaces, topology_hints=None, create_fabrics=True):
29 """Update the interfaces attached to the controller.
30
31 :param interfaces: Interfaces dictionary that was parsed from
32diff --git a/src/maasserver/regiondservices/networks_monitoring.py b/src/maasserver/regiondservices/networks_monitoring.py
33index 9427bef..9cf313a 100644
34--- a/src/maasserver/regiondservices/networks_monitoring.py
35+++ b/src/maasserver/regiondservices/networks_monitoring.py
36@@ -23,9 +23,10 @@ class RegionNetworksMonitoringService(NetworksMonitoringService):
37 """Get interface monitoring state from the region."""
38 return deferToDatabase(self.getInterfaceMonitoringStateFromDatabase)
39
40- def recordInterfaces(self, interfaces):
41+ def recordInterfaces(self, interfaces, hints=None):
42 """Record the interfaces information."""
43- return deferToDatabase(self.recordInterfacesIntoDatabase, interfaces)
44+ return deferToDatabase(
45+ self.recordInterfacesIntoDatabase, interfaces, hints)
46
47 def reportNeighbours(self, neighbours):
48 """Record the specified list of neighbours."""
49@@ -42,10 +43,10 @@ class RegionNetworksMonitoringService(NetworksMonitoringService):
50 return region_controller.get_discovery_state()
51
52 @transactional
53- def recordInterfacesIntoDatabase(self, interfaces):
54+ def recordInterfacesIntoDatabase(self, interfaces, hints):
55 """Record the interfaces information."""
56 region_controller = RegionController.objects.get_running_controller()
57- region_controller.update_interfaces(interfaces)
58+ region_controller.update_interfaces(interfaces, hints)
59
60 @transactional
61 def recordNeighboursIntoDatabase(self, neighbours):
62diff --git a/src/maasserver/regiondservices/tests/test_networks_monitoring.py b/src/maasserver/regiondservices/tests/test_networks_monitoring.py
63index 62933ba..e49e34b 100644
64--- a/src/maasserver/regiondservices/tests/test_networks_monitoring.py
65+++ b/src/maasserver/regiondservices/tests/test_networks_monitoring.py
66@@ -52,7 +52,8 @@ class TestRegionNetworksMonitoringService(MAASTransactionServerTestCase):
67 }
68 }
69
70- service = RegionNetworksMonitoringService(reactor)
71+ service = RegionNetworksMonitoringService(
72+ reactor, enable_beaconing=False)
73 service.getInterfaces = lambda: succeed(interfaces)
74
75 with FakeLogger("maas") as logger:
76diff --git a/src/maasserver/rpc/rackcontrollers.py b/src/maasserver/rpc/rackcontrollers.py
77index d7ed7e7..d80477c 100644
78--- a/src/maasserver/rpc/rackcontrollers.py
79+++ b/src/maasserver/rpc/rackcontrollers.py
80@@ -222,10 +222,10 @@ def update_foreign_dhcp(system_id, interface_name, dhcp_ip=None):
81
82 @synchronous
83 @transactional
84-def update_interfaces(system_id, interfaces):
85+def update_interfaces(system_id, interfaces, topology_hints=None):
86 """Update the interface definition on the rack controller."""
87 rack_controller = RackController.objects.get(system_id=system_id)
88- rack_controller.update_interfaces(interfaces)
89+ rack_controller.update_interfaces(interfaces, topology_hints)
90
91
92 @synchronous
93diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py
94index c6fc9d9..340a1be 100644
95--- a/src/maasserver/rpc/regionservice.py
96+++ b/src/maasserver/rpc/regionservice.py
97@@ -404,14 +404,15 @@ class Region(RPCProtocol):
98 return d
99
100 @region.UpdateInterfaces.responder
101- def update_interfaces(self, system_id, interfaces):
102+ def update_interfaces(self, system_id, interfaces, topology_hints=None):
103 """update_interfaces()
104
105 Implementation of
106 :py:class:`~provisioningserver.rpc.region.UpdateInterfaces`.
107 """
108 d = deferToDatabase(
109- rackcontrollers.update_interfaces, system_id, interfaces)
110+ rackcontrollers.update_interfaces, system_id, interfaces,
111+ topology_hints=topology_hints)
112 d.addCallback(lambda args: {})
113 return d
114
115@@ -616,8 +617,10 @@ class RegionServer(Region):
116 def register(
117 self, system_id, hostname, interfaces, url, nodegroup_uuid=None,
118 beacon_support=False, version=None):
119- # If beacons is None, register in legacy mode.
120- create_fabrics = True if beacon_support else False
121+ # Hold off on fabric creation if the remote controller
122+ # supports beacons; it will happen later when UpdateInterfaces is
123+ # called.
124+ create_fabrics = False if beacon_support else True
125 result = yield self._register(
126 system_id, hostname, interfaces, url,
127 nodegroup_uuid=nodegroup_uuid, create_fabrics=create_fabrics,
128diff --git a/src/maasserver/rpc/tests/test_rackcontrollers.py b/src/maasserver/rpc/tests/test_rackcontrollers.py
129index 1f40522..5392b32 100644
130--- a/src/maasserver/rpc/tests/test_rackcontrollers.py
131+++ b/src/maasserver/rpc/tests/test_rackcontrollers.py
132@@ -479,7 +479,7 @@ class TestUpdateInterfaces(MAASServerTestCase):
133 update_interfaces(rack_controller.system_id, sentinel.interfaces)
134 self.assertThat(
135 patched_update_interfaces,
136- MockCalledOnceWith(sentinel.interfaces))
137+ MockCalledOnceWith(sentinel.interfaces, None))
138
139
140 class TestReportNeighbours(MAASServerTestCase):
141diff --git a/src/maasserver/rpc/tests/test_regionservice_calls.py b/src/maasserver/rpc/tests/test_regionservice_calls.py
142index 1d1d497..209ee44 100644
143--- a/src/maasserver/rpc/tests/test_regionservice_calls.py
144+++ b/src/maasserver/rpc/tests/test_regionservice_calls.py
145@@ -1182,7 +1182,8 @@ class TestRegionProtocol_UpdateInterfaces(MAASTransactionServerTestCase):
146 self.assertThat(
147 update_interfaces,
148 MockCalledOnceWith(
149- params['system_id'], params['interfaces']))
150+ params['system_id'], params['interfaces'],
151+ topology_hints=None))
152
153
154 class TestRegionProtocol_ReportNeighbours(MAASTestCase):
155diff --git a/src/provisioningserver/rackdservices/networks_monitoring_service.py b/src/provisioningserver/rackdservices/networks_monitoring_service.py
156index 1de76f2..d8f7e8c 100644
157--- a/src/provisioningserver/rackdservices/networks_monitoring_service.py
158+++ b/src/provisioningserver/rackdservices/networks_monitoring_service.py
159@@ -8,6 +8,7 @@ __all__ = [
160 ]
161
162 from provisioningserver.logger import get_maas_logger
163+from provisioningserver.rpc.exceptions import NoConnectionsAvailable
164 from provisioningserver.rpc.region import (
165 GetDiscoveryState,
166 ReportMDNSEntries,
167@@ -16,6 +17,8 @@ from provisioningserver.rpc.region import (
168 UpdateInterfaces,
169 )
170 from provisioningserver.utils.services import NetworksMonitoringService
171+from provisioningserver.utils.twisted import pause
172+from twisted.internet.defer import inlineCallbacks
173
174
175 maaslog = get_maas_logger("networks.monitor")
176@@ -44,20 +47,21 @@ class RackNetworksMonitoringService(NetworksMonitoringService):
177 d.addCallback(getState)
178 return d
179
180- def recordInterfaces(self, interfaces):
181+ @inlineCallbacks
182+ def recordInterfaces(self, interfaces, hints=None):
183 """Record the interfaces information to the region."""
184- def record(client):
185- # On first run perform a refresh
186+ while self.running:
187+ try:
188+ client = yield self.clientService.getClientNow()
189+ except NoConnectionsAvailable:
190+ yield pause(1.0)
191+ continue
192 if self._recorded is None:
193- return client(RequestRackRefresh, system_id=client.localIdent)
194- else:
195- return client(
196- UpdateInterfaces, system_id=client.localIdent,
197- interfaces=interfaces)
198-
199- d = self.clientService.getClientNow()
200- d.addCallback(record)
201- return d
202+ yield client(RequestRackRefresh, system_id=client.localIdent)
203+ yield client(
204+ UpdateInterfaces, system_id=client.localIdent,
205+ interfaces=interfaces, topology_hints=hints)
206+ break
207
208 def reportNeighbours(self, neighbours):
209 """Report neighbour information to the region."""
210diff --git a/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py b/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py
211index d325125..2404cfa 100644
212--- a/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py
213+++ b/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py
214@@ -5,8 +5,13 @@
215
216 __all__ = []
217
218+from unittest.mock import call
219+
220 from maastesting.factory import factory
221-from maastesting.matchers import MockCalledOnceWith
222+from maastesting.matchers import (
223+ MockCalledOnceWith,
224+ MockCallsMatch,
225+)
226 from maastesting.testcase import (
227 MAASTestCase,
228 MAASTwistedRunTest,
229@@ -17,6 +22,7 @@ from provisioningserver.rackdservices.networks_monitoring_service import (
230 )
231 from provisioningserver.rpc import region
232 from provisioningserver.rpc.testing import MockLiveClusterToRegionRPCFixture
233+from provisioningserver.utils import services as services_module
234 from twisted.internet.defer import (
235 inlineCallbacks,
236 maybeDeferred,
237@@ -27,7 +33,7 @@ from twisted.internet.task import Clock
238
239 class TestRackNetworksMonitoringService(MAASTestCase):
240
241- run_tests_with = MAASTwistedRunTest.make_factory(debug=False, timeout=5)
242+ run_tests_with = MAASTwistedRunTest.make_factory(debug=True, timeout=5)
243
244 @inlineCallbacks
245 def test_runs_refresh_first_time(self):
246@@ -37,10 +43,14 @@ class TestRackNetworksMonitoringService(MAASTestCase):
247
248 rpc_service = services.getServiceNamed('rpc')
249 service = RackNetworksMonitoringService(
250- rpc_service, Clock(), enable_monitoring=False)
251+ rpc_service, Clock(), enable_monitoring=False,
252+ enable_beaconing=False)
253
254- yield service.startService()
255- yield service.stopService()
256+ yield maybeDeferred(service.startService)
257+ # By stopping the interface_monitor first, we assure that the loop
258+ # happens at least once before the service stops completely.
259+ yield maybeDeferred(service.interface_monitor.stopService)
260+ yield maybeDeferred(service.stopService)
261
262 self.assertThat(
263 protocol.RequestRackRefresh, MockCalledOnceWith(
264@@ -64,7 +74,8 @@ class TestRackNetworksMonitoringService(MAASTestCase):
265
266 rpc_service = services.getServiceNamed('rpc')
267 service = RackNetworksMonitoringService(
268- rpc_service, Clock(), enable_monitoring=False)
269+ rpc_service, Clock(), enable_monitoring=False,
270+ enable_beaconing=False)
271 service.getInterfaces = lambda: succeed(interfaces)
272 # Put something in the cache. This tells recordInterfaces that refresh
273 # has already run but the interfaces have changed thus they need to be
274@@ -77,7 +88,57 @@ class TestRackNetworksMonitoringService(MAASTestCase):
275 self.assertThat(
276 protocol.UpdateInterfaces, MockCalledOnceWith(
277 protocol, system_id=rpc_service.getClient().localIdent,
278- interfaces=interfaces))
279+ interfaces=interfaces, topology_hints=None))
280+
281+ @inlineCallbacks
282+ def test_reports_interfaces_with_hints_if_beaconing_enabled(self):
283+ fixture = self.useFixture(MockLiveClusterToRegionRPCFixture())
284+ protocol, connecting = fixture.makeEventLoop(region.UpdateInterfaces)
285+ # Don't actually wait for beaconing to complete.
286+ pause_mock = self.patch(services_module, 'pause')
287+ queue_mcast_mock = self.patch(
288+ services_module.BeaconingSocketProtocol, 'queueMulticastBeaconing')
289+ self.addCleanup((yield connecting))
290+
291+ interfaces = {
292+ "eth0": {
293+ "type": "physical",
294+ "mac_address": factory.make_mac_address(),
295+ "parents": [],
296+ "links": [],
297+ "enabled": True,
298+ }
299+ }
300+
301+ rpc_service = services.getServiceNamed('rpc')
302+ service = RackNetworksMonitoringService(
303+ rpc_service, Clock(), enable_monitoring=False,
304+ enable_beaconing=True)
305+ service.getInterfaces = lambda: succeed(interfaces)
306+ # Put something in the cache. This tells recordInterfaces that refresh
307+ # has already run but the interfaces have changed thus they need to be
308+ # updated.
309+ service._recorded = {}
310+
311+ service.startService()
312+ yield service.stopService()
313+
314+ self.assertThat(
315+ protocol.UpdateInterfaces, MockCalledOnceWith(
316+ protocol, system_id=rpc_service.getClient().localIdent,
317+ interfaces=interfaces, topology_hints=[]))
318+ # The service should have sent out beacons, waited three seconds,
319+ # solicited for more beacons, then waited another three seconds before
320+ # deciding that beaconing is complete.
321+ self.assertThat(pause_mock, MockCallsMatch(call(3.0), call(3.0)))
322+ self.assertThat(
323+ queue_mcast_mock, MockCallsMatch(
324+ # Called when the service starts.
325+ call(solicitation=True),
326+ # Called three seconds later.
327+ call(solicitation=True),
328+ # Not called again when the service shuts down.
329+ ))
330
331 @inlineCallbacks
332 def test_reports_neighbours_to_region(self):
333@@ -87,7 +148,8 @@ class TestRackNetworksMonitoringService(MAASTestCase):
334 self.addCleanup((yield connecting))
335 rpc_service = services.getServiceNamed('rpc')
336 service = RackNetworksMonitoringService(
337- rpc_service, Clock(), enable_monitoring=False)
338+ rpc_service, Clock(), enable_monitoring=False,
339+ enable_beaconing=False)
340 neighbours = [{"ip": factory.make_ip_address()}]
341 yield service.reportNeighbours(neighbours)
342 self.assertThat(
343@@ -103,7 +165,8 @@ class TestRackNetworksMonitoringService(MAASTestCase):
344 self.addCleanup((yield connecting))
345 rpc_service = services.getServiceNamed('rpc')
346 service = RackNetworksMonitoringService(
347- rpc_service, Clock(), enable_monitoring=False)
348+ rpc_service, Clock(), enable_monitoring=False,
349+ enable_beaconing=False)
350 mdns = [
351 {
352 'interface': 'eth0',
353@@ -126,7 +189,8 @@ class TestRackNetworksMonitoringService(MAASTestCase):
354 rpc_service = services.getServiceNamed('rpc')
355 reactor = Clock()
356 service = RackNetworksMonitoringService(
357- rpc_service, reactor, enable_monitoring=False)
358+ rpc_service, reactor, enable_monitoring=False,
359+ enable_beaconing=False)
360 protocol.GetDiscoveryState.return_value = {'interfaces': {}}
361 # Put something in the cache. This tells recordInterfaces that refresh
362 # has already run but the interfaces have changed thus they need to be
363diff --git a/src/provisioningserver/rpc/region.py b/src/provisioningserver/rpc/region.py
364index ab16e4b..58c43ab 100644
365--- a/src/provisioningserver/rpc/region.py
366+++ b/src/provisioningserver/rpc/region.py
367@@ -460,6 +460,7 @@ class UpdateInterfaces(amp.Command):
368 arguments = [
369 (b'system_id', amp.Unicode()),
370 (b'interfaces', StructureAsJSON()),
371+ (b'topology_hints', StructureAsJSON(optional=True)),
372 ]
373 response = []
374 errors = []
375diff --git a/src/provisioningserver/utils/services.py b/src/provisioningserver/utils/services.py
376index 5a43922..cb5c81b 100644
377--- a/src/provisioningserver/utils/services.py
378+++ b/src/provisioningserver/utils/services.py
379@@ -28,7 +28,6 @@ from provisioningserver.logger import (
380 get_maas_logger,
381 LegacyLogger,
382 )
383-from provisioningserver.rpc.exceptions import NoConnectionsAvailable
384 from provisioningserver.utils.beaconing import (
385 age_out_uuid_queue,
386 BEACON_IPV4_MULTICAST,
387@@ -52,7 +51,7 @@ from provisioningserver.utils.shell import select_c_utf8_bytes_locale
388 from provisioningserver.utils.twisted import (
389 callOut,
390 deferred,
391- suppress,
392+ pause,
393 terminateProcess,
394 )
395 from twisted.application.internet import TimerService
396@@ -425,9 +424,15 @@ def join_ipv6_beacon_group(sock, ifindex):
397 socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST) +
398 struct.pack("I", ifindex)
399 )
400- sock.setsockopt(
401- socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP,
402- ipv6_join_sockopt_args)
403+ try:
404+ sock.setsockopt(
405+ socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP,
406+ ipv6_join_sockopt_args)
407+ except OSError:
408+ # Do this on a best-effort basis. We might get an "Address already in
409+ # use" error if the group is already joined, or (for whatever reason)
410+ # it is not possible to join a multicast group using this interface.
411+ pass
412
413
414 def set_ipv6_multicast_loopback(sock, loopback):
415@@ -472,6 +477,7 @@ class BeaconingSocketProtocol(DatagramProtocol):
416 self.topology_hints = OrderedDict()
417 self.listen_port = None
418 self.mcast_requested = False
419+ self.mcast_solicitation = False
420 self.last_solicited_mcast = 0
421 self._join_multicast_groups()
422
423@@ -482,8 +488,9 @@ class BeaconingSocketProtocol(DatagramProtocol):
424 verifyObject(IReactorMulticast, self.reactor)
425 except DoesNotImplement:
426 return
427- self.listen_port = self.reactor.listenMulticast(
428- self.port, self, interface=self.interface, listenMultiple=True)
429+ if self.listen_port is None:
430+ self.listen_port = self.reactor.listenMulticast(
431+ self.port, self, interface=self.interface, listenMultiple=True)
432 sock = self.transport.getHandle()
433 if self.loopback is True:
434 # This is only necessary for testing.
435@@ -512,6 +519,34 @@ class BeaconingSocketProtocol(DatagramProtocol):
436 self.interfaces = interfaces
437 self._join_multicast_groups()
438
439+ def getAllTopologyHints(self):
440+ """Returns the set of unique topology hints."""
441+ # When beaconing runs, hints attached to individual packets might
442+ # come to the same conclusion about the implied fabric connectivity.
443+ # Use a set to prevent the region from processing duplicate hints.
444+ all_hints = set()
445+ for hints in self.topology_hints.values():
446+ all_hints |= hints
447+ return all_hints
448+
449+ def getJSONTopologyHints(self):
450+ """Returns all topology hints as a list of dictionaries.
451+
452+ This method is used for sending data via the RPC layer, so be cautious
453+ when modifying. In addition, keys with no value are filtered out
454+ of the resulting dictionary, so that the hints are smaller on the wire.
455+ """
456+ all_hints = self.getAllTopologyHints()
457+ json_hints = [
458+ {
459+ key: value
460+ for key, value in hint._asdict().items()
461+ if value is not None
462+ }
463+ for hint in all_hints
464+ ]
465+ return json_hints
466+
467 def stopProtocol(self):
468 super().stopProtocol()
469 if self.listen_port is not None:
470@@ -629,27 +664,36 @@ class BeaconingSocketProtocol(DatagramProtocol):
471 reply = create_beacon_payload("advertisement", payload)
472 self.send_beacon(reply, beacon.reply_address)
473 if len(self.interfaces) > 0:
474- self.queueMulticastAdvertisement()
475+ self.queueMulticastBeaconing()
476
477- def dequeueMulticastAdvertisement(self):
478+ def dequeueMulticastBeaconing(self):
479 """
480 Callback to send multicast beacon advertisements.
481
482 See `queueMulticastAdvertisement`, which schedules this method to run.
483 """
484 mtime = time.monotonic()
485+ beacon_type = (
486+ 'solicitation' if self.mcast_solicitation else 'advertisement')
487+ log.msg("Sending multicast beacon %ss." % beacon_type)
488+ self.send_multicast_beacons(self.interfaces, beacon_type)
489 self.last_solicited_mcast = mtime
490 self.mcast_requested = False
491- log.msg("Sending multicast beacon advertisements.")
492- self.send_multicast_beacons(self.interfaces, 'advertisement')
493+ self.mcast_solicitation = False
494
495- def queueMulticastAdvertisement(self):
496+ def queueMulticastBeaconing(self, solicitation=False):
497 """
498 Requests that multicast advertisements be sent out on every interface.
499
500 Ensures that advertisements will not be sent more than once every
501 five seconds.
502+
503+ :param solicitation: If true, sends solicitations rather than
504+ advertisements. Solicitations are used to initiate "full beaconing"
505+ with peers; advertisements do not generate beacon replies.
506 """
507+ if solicitation is True:
508+ self.mcast_solicitation = True
509 if self.mcast_requested:
510 # A multicast advertisement has been requested already.
511 return
512@@ -660,7 +704,7 @@ class BeaconingSocketProtocol(DatagramProtocol):
513 else:
514 timeout = max(mtime - self.last_solicited_mcast, 5)
515 self.mcast_requested = True
516- self.reactor.callLater(timeout, self.dequeueMulticastAdvertisement)
517+ self.reactor.callLater(timeout, self.dequeueMulticastBeaconing)
518
519 def processTopologyHints(self, rx: ReceivedBeacon):
520 """
521@@ -692,9 +736,7 @@ class BeaconingSocketProtocol(DatagramProtocol):
522 self.topology_hints[rx.uuid] = hints
523 # XXX mpontillo 2017-08-07: temporary logging
524 log.msg("New topology hints [%s]:\n%s" % (rx.uuid, pformat(hints)))
525- all_hints = set()
526- for hints in self.topology_hints.values():
527- all_hints |= hints
528+ all_hints = self.getAllTopologyHints()
529 log.msg("Topology hint summary:\n%s" % pformat(all_hints))
530
531 def _add_remote_fabric_hints(self, hints, remote_ifinfo, rx):
532@@ -905,7 +947,8 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
533
534 interval = timedelta(seconds=30).total_seconds()
535
536- def __init__(self, clock=None, enable_monitoring=True):
537+ def __init__(
538+ self, clock=None, enable_monitoring=True, enable_beaconing=True):
539 # Order is very important here. First we set the clock to the passed-in
540 # reactor, so that unit tests can fake out the clock if necessary.
541 # Then we call super(). The superclass will set up the structures
542@@ -916,9 +959,11 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
543 self.clock = clock
544 super().__init__()
545 self.enable_monitoring = enable_monitoring
546+ self.enable_beaconing = enable_beaconing
547 # The last successfully recorded interfaces.
548 self._recorded = None
549 self._monitored = frozenset()
550+ self._beaconing = frozenset()
551 self._monitoring_state = {}
552 self._monitoring_mdns = False
553 self._locked = False
554@@ -935,33 +980,25 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
555 self.interface_monitor.setServiceParent(self)
556 self.beaconing_protocol = None
557
558+ @inlineCallbacks
559 def updateInterfaces(self):
560 """Update interfaces, catching and logging errors.
561
562 This can be overridden by subclasses to conditionally update based on
563 some external configuration.
564 """
565- d = maybeDeferred(self._assumeSoleResponsibility)
566-
567- def update(responsible):
568- if responsible:
569- d = maybeDeferred(self.getInterfaces)
570- d.addCallback(self._updateInterfaces)
571- return d
572-
573- def failed(failure):
574- log.err(
575- failure,
576- "Failed to update and/or record network interface "
577- "configuration: %s" % failure.getErrorMessage())
578-
579- d = d.addCallback(update)
580- # During the update, we might fail to get the interface monitoring
581- # state from the region. We can safely ignore this, as it will be
582- # retried shortly.
583- d.addErrback(suppress, NoConnectionsAvailable)
584- d.addErrback(failed)
585- return d
586+ responsible = self._assumeSoleResponsibility()
587+ if responsible:
588+ interfaces = None
589+ try:
590+ interfaces = yield maybeDeferred(self.getInterfaces)
591+ yield self._updateInterfaces(interfaces)
592+ except BaseException as e:
593+ msg = (
594+ "Failed to update and/or record network interface "
595+ "configuration: %s; interfaces: %r" % (e, interfaces)
596+ )
597+ log.err(None, msg)
598
599 def getInterfaces(self):
600 """Get the current network interfaces configuration.
601@@ -978,7 +1015,7 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
602 """
603
604 @abstractmethod
605- def recordInterfaces(self, interfaces):
606+ def recordInterfaces(self, interfaces, hints=None):
607 """Record the interfaces information.
608
609 This MUST be overridden in subclasses.
610@@ -1050,21 +1087,48 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
611 # If we were monitoring neighbours on any interfaces, we need to
612 # stop the monitoring services.
613 self._configureNetworkDiscovery({})
614+ if self.beaconing_protocol is not None:
615+ self._configureBeaconing({})
616
617+ @inlineCallbacks
618 def _updateInterfaces(self, interfaces):
619 """Record `interfaces` if they've changed."""
620 if interfaces != self._recorded:
621- d = maybeDeferred(self.recordInterfaces, interfaces)
622+ hints = None
623+ if self.enable_beaconing:
624+ self._configureBeaconing(interfaces)
625+ # Wait for beaconing to do its thing.
626+ yield pause(3.0)
627+ # Retry beacon soliciations, in case any packet loss occurred
628+ # the first time.
629+ self.beaconing_protocol.queueMulticastBeaconing(
630+ solicitation=True)
631+ yield pause(3.0)
632+ hints = self.beaconing_protocol.getJSONTopologyHints()
633+ yield maybeDeferred(self.recordInterfaces, interfaces, hints)
634 # Note: _interfacesRecorded() will reconfigure discovery after
635 # recording the interfaces, so there is no need to call
636 # _configureNetworkDiscovery() here.
637- d.addCallback(callOut, self._interfacesRecorded, interfaces)
638- return d
639+ self._interfacesRecorded(interfaces)
640 else:
641 # If the interfaces didn't change, we still need to poll for
642 # monitoring state changes.
643- d = maybeDeferred(self._configureNetworkDiscovery, interfaces)
644- return d
645+ yield maybeDeferred(self._configureNetworkDiscovery, interfaces)
646+
647+ def _getInterfacesForBeaconing(self, interfaces: dict):
648+ """Return the interfaces which will be used for beaconing.
649+
650+ :return: The set of interface names to run beaconing on.
651+ """
652+ # Don't beacon when running the test suite/dev env.
653+ # In addition, if we don't own the lock, we should not be beaconing.
654+ if is_dev_environment() or not self._locked or interfaces is None:
655+ return set()
656+ monitored_interfaces = {
657+ ifname for ifname, ifdata in interfaces.items()
658+ if ifdata['monitored'] is True
659+ }
660+ return monitored_interfaces
661
662 def _getInterfacesForNeighbourDiscovery(
663 self, interfaces: dict, monitoring_state: dict):
664@@ -1227,6 +1291,32 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
665 # doesn't matter for mDNS discovery purposes.)
666 pass
667
668+ def _configureBeaconing(self, interfaces):
669+ beaconing_interfaces = self._getInterfacesForBeaconing(interfaces)
670+ # Calculate the difference between the sets. We need to know which
671+ # interfaces were added and deleted (with respect to the interfaces we
672+ # were already beaconing on).
673+ new_interfaces = beaconing_interfaces.difference(self._beaconing)
674+ deleted_interfaces = self._beaconing.difference(beaconing_interfaces)
675+ if len(new_interfaces) > 0:
676+ log.msg("Starting beaconing for interfaces: %r" % (new_interfaces))
677+ self._startBeaconingServices(new_interfaces)
678+ if len(deleted_interfaces) > 0:
679+ log.msg(
680+ "Stopping beaconing for interfaces: %r" % (deleted_interfaces))
681+ self._stopBeaconingServices(deleted_interfaces)
682+ self._beaconing = beaconing_interfaces
683+ if self.beaconing_protocol is None:
684+ self.beaconing_protocol = BeaconingSocketProtocol(
685+ self.clock, interfaces=interfaces)
686+ else:
687+ self.beaconing_protocol.updateInterfaces(interfaces)
688+ # If the interfaces have changed, perform beaconing again.
689+ # An empty dictionary will be passed in when the service stops, so
690+ # don't bother sending out beacons we won't reply to.
691+ if len(interfaces) > 0:
692+ self.beaconing_protocol.queueMulticastBeaconing(solicitation=True)
693+
694 def _configureNeighbourDiscovery(self, interfaces, monitoring_state):
695 monitored_interfaces = self._getInterfacesForNeighbourDiscovery(
696 interfaces, monitoring_state)
697@@ -1239,26 +1329,15 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
698 log.msg("Starting neighbour discovery for interfaces: %r" % (
699 new_interfaces))
700 self._startNeighbourDiscoveryServices(new_interfaces)
701- # XXX mpontillo 2017-07-12: for testing, just start beaconing
702- # services on all the interfaces enabled for active discovery.
703- self._startBeaconingServices(new_interfaces)
704 if len(deleted_interfaces) > 0:
705 log.msg(
706 "Stopping neighbour discovery for interfaces: %r" % (
707 deleted_interfaces))
708 self._stopNeighbourDiscoveryServices(deleted_interfaces)
709- # XXX mpontillo 2017-07-12: this should be separately configured.
710- # (see similar comment in the 'start' path above.)
711- self._stopBeaconingServices(deleted_interfaces)
712 self._monitored = monitored_interfaces
713
714 def _interfacesRecorded(self, interfaces):
715 """The given `interfaces` were recorded successfully."""
716 self._recorded = interfaces
717- if self.beaconing_protocol is None:
718- self.beaconing_protocol = BeaconingSocketProtocol(
719- self.clock, interfaces=interfaces)
720- else:
721- self.beaconing_protocol.updateInterfaces(interfaces)
722 if self.enable_monitoring is True:
723 self._configureNetworkDiscovery(interfaces)
724diff --git a/src/provisioningserver/utils/tests/test_services.py b/src/provisioningserver/utils/tests/test_services.py
725index acfc60e..36090ea 100644
726--- a/src/provisioningserver/utils/tests/test_services.py
727+++ b/src/provisioningserver/utils/tests/test_services.py
728@@ -76,8 +76,12 @@ from twisted.python.failure import Failure
729 class StubNetworksMonitoringService(NetworksMonitoringService):
730 """Concrete subclass for testing."""
731
732- def __init__(self, enable_monitoring=False, *args, **kwargs):
733- super().__init__(enable_monitoring=enable_monitoring, *args, **kwargs)
734+ def __init__(
735+ self, enable_monitoring=False, enable_beaconing=False,
736+ *args, **kwargs):
737+ super().__init__(
738+ *args, enable_monitoring=enable_monitoring,
739+ enable_beaconing=enable_beaconing, **kwargs)
740 self.iterations = DeferredQueue()
741 self.interfaces = []
742 self.update_interface__calls = 0
743@@ -91,7 +95,7 @@ class StubNetworksMonitoringService(NetworksMonitoringService):
744 d.addBoth(self.iterations.put)
745 return d
746
747- def recordInterfaces(self, interfaces):
748+ def recordInterfaces(self, interfaces, hints=None):
749 self.interfaces.append(interfaces)
750
751 def reportNeighbours(self, neighbours):
752@@ -221,13 +225,13 @@ class TestNetworksMonitoringService(MAASTestCase):
753 # recordInterfaces is called the first time, as expected.
754 recordInterfaces.reset_mock()
755 yield service.updateInterfaces()
756- self.assertThat(recordInterfaces, MockCalledOnceWith({}))
757+ self.assertThat(recordInterfaces, MockCalledOnceWith({}, None))
758
759 # recordInterfaces is called the second time too; the service noted
760 # that it crashed last time and knew to run it again.
761 recordInterfaces.reset_mock()
762 yield service.updateInterfaces()
763- self.assertThat(recordInterfaces, MockCalledOnceWith({}))
764+ self.assertThat(recordInterfaces, MockCalledOnceWith({}, None))
765
766 # recordInterfaces is NOT called the third time; the service noted
767 # that the configuration had not changed.
768@@ -1004,3 +1008,128 @@ class TestBeaconingSocketProtocol(SharedSecretTestCase):
769 }
770 self.assertThat(hints, Equals(expected_hints))
771 yield protocol.stopProtocol()
772+
773+ @inlineCallbacks
774+ def test__getJSONTopologyHints_converts_hints_to_dictionary(self):
775+ # Note: Always use a random port for testing. (port=0)
776+ protocol = BeaconingSocketProtocol(
777+ reactor, port=0, process_incoming=False, loopback=True,
778+ interface="::", debug=True)
779+ # Don't try to send out any replies.
780+ self.patch(services, 'create_beacon_payload')
781+ self.patch(protocol, 'send_beacon')
782+ # Need to generate a real UUID with the current time, so it doesn't
783+ # get aged out.
784+ uuid = str(uuid1())
785+ # Make the protocol think we sent a beacon with this UUID already.
786+ tx_mac = factory.make_mac_address()
787+ fake_tx_beacon = FakeBeaconPayload(
788+ uuid, ifname='eth1', mac=tx_mac, vid=100)
789+ fake_rx_beacon = {
790+ "source_ip": "127.0.0.1",
791+ "source_port": 5240,
792+ "destination_ip": "224.0.0.118",
793+ "interface": "eth0",
794+ "type": "solicitation",
795+ "payload": fake_tx_beacon.payload
796+ }
797+ protocol.beaconReceived(fake_rx_beacon)
798+ all_hints = protocol.getJSONTopologyHints()
799+ expected_hints = [
800+ # Note: since vid=None on the received beacon, we expect that
801+ # the hint won't have a 'vid' field.
802+ dict(
803+ ifname='eth0', hint="on_remote_network",
804+ related_ifname='eth1', related_vid=100,
805+ related_mac=tx_mac),
806+ ]
807+ self.assertThat(all_hints, Equals(expected_hints))
808+ yield protocol.stopProtocol()
809+
810+ @inlineCallbacks
811+ def test__queues_multicast_beacon_soliciations_upon_request(self):
812+ # Note: Always use a random port for testing. (port=0)
813+ clock = Clock()
814+ protocol = BeaconingSocketProtocol(
815+ clock, port=0, process_incoming=False, loopback=True,
816+ interface="::", debug=True)
817+ # Don't try to send out any replies.
818+ self.patch(services, 'create_beacon_payload')
819+ send_mcast_mock = self.patch(protocol, 'send_multicast_beacons')
820+ self.patch(protocol, 'send_beacon')
821+ yield protocol.queueMulticastBeaconing(solicitation=True)
822+ clock.advance(0)
823+ self.assertThat(
824+ send_mcast_mock, MockCalledOnceWith({}, 'solicitation'))
825+
826+ @inlineCallbacks
827+ def test__multicasts_at_most_once_per_five_seconds(self):
828+ # Note: Always use a random port for testing. (port=0)
829+ clock = Clock()
830+ protocol = BeaconingSocketProtocol(
831+ clock, port=0, process_incoming=False, loopback=True,
832+ interface="::", debug=True)
833+ # Don't try to send out any replies.
834+ self.patch(services, 'create_beacon_payload')
835+ monotonic_mock = self.patch(services.time, 'monotonic')
836+ send_mcast_mock = self.patch(protocol, 'send_multicast_beacons')
837+ self.patch(protocol, 'send_beacon')
838+ monotonic_mock.side_effect = [
839+ # Initial queue
840+ 6,
841+ # Initial dequeue
842+ 6,
843+ # Second queue (hasn't yet been 5 seconds)
844+ 10,
845+ # Third queue
846+ 11,
847+ # Second dequeue
848+ 11,
849+ ]
850+ yield protocol.queueMulticastBeaconing()
851+ clock.advance(0)
852+ self.assertThat(
853+ send_mcast_mock, MockCalledOnceWith({}, 'advertisement'))
854+ send_mcast_mock.reset_mock()
855+ yield protocol.queueMulticastBeaconing()
856+ yield protocol.queueMulticastBeaconing(solicitation=True)
857+ clock.advance(4.9)
858+ self.assertThat(
859+ send_mcast_mock, MockNotCalled())
860+ clock.advance(0.1)
861+ self.assertThat(
862+ send_mcast_mock, MockCalledOnceWith({}, 'solicitation'))
863+
864+ @inlineCallbacks
865+ def test__multiple_beacon_requests_coalesced(self):
866+ # Note: Always use a random port for testing. (port=0)
867+ clock = Clock()
868+ protocol = BeaconingSocketProtocol(
869+ clock, port=0, process_incoming=False, loopback=True,
870+ interface="::", debug=True)
871+ # Don't try to send out any replies.
872+ self.patch(services, 'create_beacon_payload')
873+ send_mcast_mock = self.patch(protocol, 'send_multicast_beacons')
874+ self.patch(protocol, 'send_beacon')
875+ yield protocol.queueMulticastBeaconing()
876+ yield protocol.queueMulticastBeaconing()
877+ clock.advance(5)
878+ self.assertThat(
879+ send_mcast_mock, MockCalledOnceWith({}, 'advertisement'))
880+
881+ @inlineCallbacks
882+ def test__solicitation_wins_when_multiple_requests_queued(self):
883+ # Note: Always use a random port for testing. (port=0)
884+ clock = Clock()
885+ protocol = BeaconingSocketProtocol(
886+ clock, port=0, process_incoming=False, loopback=True,
887+ interface="::", debug=True)
888+ # Don't try to send out any replies.
889+ self.patch(services, 'create_beacon_payload')
890+ send_mcast_mock = self.patch(protocol, 'send_multicast_beacons')
891+ self.patch(protocol, 'send_beacon')
892+ yield protocol.queueMulticastBeaconing()
893+ yield protocol.queueMulticastBeaconing(solicitation=True)
894+ clock.advance(5)
895+ self.assertThat(
896+ send_mcast_mock, MockCalledOnceWith({}, 'solicitation'))

Subscribers

People subscribed via source and target branches