Merge ~mpontillo/maas:beaconing--use-hints-for-interface-update into maas:master

Proposed by Mike Pontillo
Status: Merged
Approved by: Mike Pontillo
Approved revision: b10bf14b5bdbfb3bb4348891079f3c175e72e43b
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~mpontillo/maas:beaconing--use-hints-for-interface-update
Merge into: maas:master
Diff against target: 896 lines (+381/-97)
13 files modified
.gitignore (+1/-1)
src/maasserver/models/node.py (+2/-1)
src/maasserver/regiondservices/networks_monitoring.py (+5/-4)
src/maasserver/regiondservices/tests/test_networks_monitoring.py (+2/-1)
src/maasserver/rpc/rackcontrollers.py (+2/-2)
src/maasserver/rpc/regionservice.py (+7/-4)
src/maasserver/rpc/tests/test_rackcontrollers.py (+1/-1)
src/maasserver/rpc/tests/test_regionservice_calls.py (+2/-1)
src/provisioningserver/rackdservices/networks_monitoring_service.py (+16/-12)
src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py (+74/-10)
src/provisioningserver/rpc/region.py (+1/-0)
src/provisioningserver/utils/services.py (+134/-55)
src/provisioningserver/utils/tests/test_services.py (+134/-5)
Reviewer Review Type Date Requested Status
Blake Rouse (community) Approve
Review via email: mp+328815@code.launchpad.net

Commit message

Send topology hints to update_interfaces()

 * Add code to convert topology hints to a compact JSON format.
 * Start beaconing as soon as interfaces are known; don't
   wait for interface monitoring settings. This means
   beaconing will happen on all interfaces, all the time.
   (Without regard for network discovery settings.)
 * Fix bug that caused update_interfaces() to be called
   with create_fabrics=True when it should have been False.
 * Wait for beaconing to complete before updating interfaces.
 * Drive-by fix for incorrect line in .gitignore which
   erroneously covered django16_south_maas19.tar.gz.

To post a comment you must log in.
Revision history for this message
Blake Rouse (blake-rouse) wrote :

Looks good. Just one question that I am not going to block you on. But if its the case that a region controller will never run beaconing? The question is why? Seems like it should.

review: Approve
Revision history for this message
Mike Pontillo (mpontillo) wrote :

Thanks for the review. The NetworksMonitoringService always runs beaconing, regardless of whether it's on the rack or the region. (The line you pointed out was in the tests; the tests for the region didn't need much change, since the region service simply calls update_interfaces() rather than going through the RPC layer.)

Revision history for this message
MAAS Lander (maas-lander) wrote :

LANDING
-b beaconing--use-hints-for-interface-update lp:~mpontillo/maas into -b master lp:~maas-committers/maas

STATUS: FAILED BUILD
LOG: http://maas-ci-jenkins.internal:8080/job/maas/job/branch-tester/194/consoleText

Revision history for this message
MAAS Lander (maas-lander) wrote :

LANDING
-b beaconing--use-hints-for-interface-update lp:~mpontillo/maas into -b master lp:~maas-committers/maas

STATUS: FAILED BUILD
LOG: http://maas-ci-jenkins.internal:8080/job/maas/job/branch-tester/198/consoleText

Revision history for this message
Mike Pontillo (mpontillo) wrote :

Fixed merge conflict with the .gitignore file, which slipped in when another branch landed.

Revision history for this message
MAAS Lander (maas-lander) wrote :

LANDING
-b beaconing--use-hints-for-interface-update lp:~mpontillo/maas into -b master lp:~maas-committers/maas

STATUS: FAILED BUILD
LOG: http://maas-ci-jenkins.internal:8080/job/maas/job/branch-tester/200/consoleText

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/.gitignore b/.gitignore
index 20d7b98..714a000 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@
19/.noseids19/.noseids
20/.run20/.run
21/.run-e2e21/.run-e2e
22/*.orig.tar.gz
22/bin23/bin
23/build24/build
24/build_pkg25/build_pkg
@@ -57,4 +58,3 @@ __pycache__
57*~58*~
58\#*\#59\#*\#
59.#*60.#*
60*.tar.gz
diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py
index 127669b..389f84c 100644
--- a/src/maasserver/models/node.py
+++ b/src/maasserver/models/node.py
@@ -4657,7 +4657,8 @@ class Controller(Node):
4657 @with_connection4657 @with_connection
4658 @synchronised(locks.rack_registration)4658 @synchronised(locks.rack_registration)
4659 @transactional4659 @transactional
4660 def update_interfaces(self, interfaces, create_fabrics=True):4660 def update_interfaces(
4661 self, interfaces, topology_hints=None, create_fabrics=True):
4661 """Update the interfaces attached to the controller.4662 """Update the interfaces attached to the controller.
46624663
4663 :param interfaces: Interfaces dictionary that was parsed from4664 :param interfaces: Interfaces dictionary that was parsed from
diff --git a/src/maasserver/regiondservices/networks_monitoring.py b/src/maasserver/regiondservices/networks_monitoring.py
index 9427bef..9cf313a 100644
--- a/src/maasserver/regiondservices/networks_monitoring.py
+++ b/src/maasserver/regiondservices/networks_monitoring.py
@@ -23,9 +23,10 @@ class RegionNetworksMonitoringService(NetworksMonitoringService):
23 """Get interface monitoring state from the region."""23 """Get interface monitoring state from the region."""
24 return deferToDatabase(self.getInterfaceMonitoringStateFromDatabase)24 return deferToDatabase(self.getInterfaceMonitoringStateFromDatabase)
2525
26 def recordInterfaces(self, interfaces):26 def recordInterfaces(self, interfaces, hints=None):
27 """Record the interfaces information."""27 """Record the interfaces information."""
28 return deferToDatabase(self.recordInterfacesIntoDatabase, interfaces)28 return deferToDatabase(
29 self.recordInterfacesIntoDatabase, interfaces, hints)
2930
30 def reportNeighbours(self, neighbours):31 def reportNeighbours(self, neighbours):
31 """Record the specified list of neighbours."""32 """Record the specified list of neighbours."""
@@ -42,10 +43,10 @@ class RegionNetworksMonitoringService(NetworksMonitoringService):
42 return region_controller.get_discovery_state()43 return region_controller.get_discovery_state()
4344
44 @transactional45 @transactional
45 def recordInterfacesIntoDatabase(self, interfaces):46 def recordInterfacesIntoDatabase(self, interfaces, hints):
46 """Record the interfaces information."""47 """Record the interfaces information."""
47 region_controller = RegionController.objects.get_running_controller()48 region_controller = RegionController.objects.get_running_controller()
48 region_controller.update_interfaces(interfaces)49 region_controller.update_interfaces(interfaces, hints)
4950
50 @transactional51 @transactional
51 def recordNeighboursIntoDatabase(self, neighbours):52 def recordNeighboursIntoDatabase(self, neighbours):
diff --git a/src/maasserver/regiondservices/tests/test_networks_monitoring.py b/src/maasserver/regiondservices/tests/test_networks_monitoring.py
index 62933ba..e49e34b 100644
--- a/src/maasserver/regiondservices/tests/test_networks_monitoring.py
+++ b/src/maasserver/regiondservices/tests/test_networks_monitoring.py
@@ -52,7 +52,8 @@ class TestRegionNetworksMonitoringService(MAASTransactionServerTestCase):
52 }52 }
53 }53 }
5454
55 service = RegionNetworksMonitoringService(reactor)55 service = RegionNetworksMonitoringService(
56 reactor, enable_beaconing=False)
56 service.getInterfaces = lambda: succeed(interfaces)57 service.getInterfaces = lambda: succeed(interfaces)
5758
58 with FakeLogger("maas") as logger:59 with FakeLogger("maas") as logger:
diff --git a/src/maasserver/rpc/rackcontrollers.py b/src/maasserver/rpc/rackcontrollers.py
index d7ed7e7..d80477c 100644
--- a/src/maasserver/rpc/rackcontrollers.py
+++ b/src/maasserver/rpc/rackcontrollers.py
@@ -222,10 +222,10 @@ def update_foreign_dhcp(system_id, interface_name, dhcp_ip=None):
222222
223@synchronous223@synchronous
224@transactional224@transactional
225def update_interfaces(system_id, interfaces):225def update_interfaces(system_id, interfaces, topology_hints=None):
226 """Update the interface definition on the rack controller."""226 """Update the interface definition on the rack controller."""
227 rack_controller = RackController.objects.get(system_id=system_id)227 rack_controller = RackController.objects.get(system_id=system_id)
228 rack_controller.update_interfaces(interfaces)228 rack_controller.update_interfaces(interfaces, topology_hints)
229229
230230
231@synchronous231@synchronous
diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py
index c6fc9d9..340a1be 100644
--- a/src/maasserver/rpc/regionservice.py
+++ b/src/maasserver/rpc/regionservice.py
@@ -404,14 +404,15 @@ class Region(RPCProtocol):
404 return d404 return d
405405
406 @region.UpdateInterfaces.responder406 @region.UpdateInterfaces.responder
407 def update_interfaces(self, system_id, interfaces):407 def update_interfaces(self, system_id, interfaces, topology_hints=None):
408 """update_interfaces()408 """update_interfaces()
409409
410 Implementation of410 Implementation of
411 :py:class:`~provisioningserver.rpc.region.UpdateInterfaces`.411 :py:class:`~provisioningserver.rpc.region.UpdateInterfaces`.
412 """412 """
413 d = deferToDatabase(413 d = deferToDatabase(
414 rackcontrollers.update_interfaces, system_id, interfaces)414 rackcontrollers.update_interfaces, system_id, interfaces,
415 topology_hints=topology_hints)
415 d.addCallback(lambda args: {})416 d.addCallback(lambda args: {})
416 return d417 return d
417418
@@ -616,8 +617,10 @@ class RegionServer(Region):
616 def register(617 def register(
617 self, system_id, hostname, interfaces, url, nodegroup_uuid=None,618 self, system_id, hostname, interfaces, url, nodegroup_uuid=None,
618 beacon_support=False, version=None):619 beacon_support=False, version=None):
619 # If beacons is None, register in legacy mode.620 # Hold off on fabric creation if the remote controller
620 create_fabrics = True if beacon_support else False621 # supports beacons; it will happen later when UpdateInterfaces is
622 # called.
623 create_fabrics = False if beacon_support else True
621 result = yield self._register(624 result = yield self._register(
622 system_id, hostname, interfaces, url,625 system_id, hostname, interfaces, url,
623 nodegroup_uuid=nodegroup_uuid, create_fabrics=create_fabrics,626 nodegroup_uuid=nodegroup_uuid, create_fabrics=create_fabrics,
diff --git a/src/maasserver/rpc/tests/test_rackcontrollers.py b/src/maasserver/rpc/tests/test_rackcontrollers.py
index 1f40522..5392b32 100644
--- a/src/maasserver/rpc/tests/test_rackcontrollers.py
+++ b/src/maasserver/rpc/tests/test_rackcontrollers.py
@@ -479,7 +479,7 @@ class TestUpdateInterfaces(MAASServerTestCase):
479 update_interfaces(rack_controller.system_id, sentinel.interfaces)479 update_interfaces(rack_controller.system_id, sentinel.interfaces)
480 self.assertThat(480 self.assertThat(
481 patched_update_interfaces,481 patched_update_interfaces,
482 MockCalledOnceWith(sentinel.interfaces))482 MockCalledOnceWith(sentinel.interfaces, None))
483483
484484
485class TestReportNeighbours(MAASServerTestCase):485class TestReportNeighbours(MAASServerTestCase):
diff --git a/src/maasserver/rpc/tests/test_regionservice_calls.py b/src/maasserver/rpc/tests/test_regionservice_calls.py
index 1d1d497..209ee44 100644
--- a/src/maasserver/rpc/tests/test_regionservice_calls.py
+++ b/src/maasserver/rpc/tests/test_regionservice_calls.py
@@ -1182,7 +1182,8 @@ class TestRegionProtocol_UpdateInterfaces(MAASTransactionServerTestCase):
1182 self.assertThat(1182 self.assertThat(
1183 update_interfaces,1183 update_interfaces,
1184 MockCalledOnceWith(1184 MockCalledOnceWith(
1185 params['system_id'], params['interfaces']))1185 params['system_id'], params['interfaces'],
1186 topology_hints=None))
11861187
11871188
1188class TestRegionProtocol_ReportNeighbours(MAASTestCase):1189class TestRegionProtocol_ReportNeighbours(MAASTestCase):
diff --git a/src/provisioningserver/rackdservices/networks_monitoring_service.py b/src/provisioningserver/rackdservices/networks_monitoring_service.py
index 1de76f2..d8f7e8c 100644
--- a/src/provisioningserver/rackdservices/networks_monitoring_service.py
+++ b/src/provisioningserver/rackdservices/networks_monitoring_service.py
@@ -8,6 +8,7 @@ __all__ = [
8]8]
99
10from provisioningserver.logger import get_maas_logger10from provisioningserver.logger import get_maas_logger
11from provisioningserver.rpc.exceptions import NoConnectionsAvailable
11from provisioningserver.rpc.region import (12from provisioningserver.rpc.region import (
12 GetDiscoveryState,13 GetDiscoveryState,
13 ReportMDNSEntries,14 ReportMDNSEntries,
@@ -16,6 +17,8 @@ from provisioningserver.rpc.region import (
16 UpdateInterfaces,17 UpdateInterfaces,
17)18)
18from provisioningserver.utils.services import NetworksMonitoringService19from provisioningserver.utils.services import NetworksMonitoringService
20from provisioningserver.utils.twisted import pause
21from twisted.internet.defer import inlineCallbacks
1922
2023
21maaslog = get_maas_logger("networks.monitor")24maaslog = get_maas_logger("networks.monitor")
@@ -44,20 +47,21 @@ class RackNetworksMonitoringService(NetworksMonitoringService):
44 d.addCallback(getState)47 d.addCallback(getState)
45 return d48 return d
4649
47 def recordInterfaces(self, interfaces):50 @inlineCallbacks
51 def recordInterfaces(self, interfaces, hints=None):
48 """Record the interfaces information to the region."""52 """Record the interfaces information to the region."""
49 def record(client):53 while self.running:
50 # On first run perform a refresh54 try:
55 client = yield self.clientService.getClientNow()
56 except NoConnectionsAvailable:
57 yield pause(1.0)
58 continue
51 if self._recorded is None:59 if self._recorded is None:
52 return client(RequestRackRefresh, system_id=client.localIdent)60 yield client(RequestRackRefresh, system_id=client.localIdent)
53 else:61 yield client(
54 return client(62 UpdateInterfaces, system_id=client.localIdent,
55 UpdateInterfaces, system_id=client.localIdent,63 interfaces=interfaces, topology_hints=hints)
56 interfaces=interfaces)64 break
57
58 d = self.clientService.getClientNow()
59 d.addCallback(record)
60 return d
6165
62 def reportNeighbours(self, neighbours):66 def reportNeighbours(self, neighbours):
63 """Report neighbour information to the region."""67 """Report neighbour information to the region."""
diff --git a/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py b/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py
index d325125..2404cfa 100644
--- a/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py
+++ b/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py
@@ -5,8 +5,13 @@
55
6__all__ = []6__all__ = []
77
8from unittest.mock import call
9
8from maastesting.factory import factory10from maastesting.factory import factory
9from maastesting.matchers import MockCalledOnceWith11from maastesting.matchers import (
12 MockCalledOnceWith,
13 MockCallsMatch,
14)
10from maastesting.testcase import (15from maastesting.testcase import (
11 MAASTestCase,16 MAASTestCase,
12 MAASTwistedRunTest,17 MAASTwistedRunTest,
@@ -17,6 +22,7 @@ from provisioningserver.rackdservices.networks_monitoring_service import (
17)22)
18from provisioningserver.rpc import region23from provisioningserver.rpc import region
19from provisioningserver.rpc.testing import MockLiveClusterToRegionRPCFixture24from provisioningserver.rpc.testing import MockLiveClusterToRegionRPCFixture
25from provisioningserver.utils import services as services_module
20from twisted.internet.defer import (26from twisted.internet.defer import (
21 inlineCallbacks,27 inlineCallbacks,
22 maybeDeferred,28 maybeDeferred,
@@ -27,7 +33,7 @@ from twisted.internet.task import Clock
2733
28class TestRackNetworksMonitoringService(MAASTestCase):34class TestRackNetworksMonitoringService(MAASTestCase):
2935
30 run_tests_with = MAASTwistedRunTest.make_factory(debug=False, timeout=5)36 run_tests_with = MAASTwistedRunTest.make_factory(debug=True, timeout=5)
3137
32 @inlineCallbacks38 @inlineCallbacks
33 def test_runs_refresh_first_time(self):39 def test_runs_refresh_first_time(self):
@@ -37,10 +43,14 @@ class TestRackNetworksMonitoringService(MAASTestCase):
3743
38 rpc_service = services.getServiceNamed('rpc')44 rpc_service = services.getServiceNamed('rpc')
39 service = RackNetworksMonitoringService(45 service = RackNetworksMonitoringService(
40 rpc_service, Clock(), enable_monitoring=False)46 rpc_service, Clock(), enable_monitoring=False,
47 enable_beaconing=False)
4148
42 yield service.startService()49 yield maybeDeferred(service.startService)
43 yield service.stopService()50 # By stopping the interface_monitor first, we assure that the loop
51 # happens at least once before the service stops completely.
52 yield maybeDeferred(service.interface_monitor.stopService)
53 yield maybeDeferred(service.stopService)
4454
45 self.assertThat(55 self.assertThat(
46 protocol.RequestRackRefresh, MockCalledOnceWith(56 protocol.RequestRackRefresh, MockCalledOnceWith(
@@ -64,7 +74,8 @@ class TestRackNetworksMonitoringService(MAASTestCase):
6474
65 rpc_service = services.getServiceNamed('rpc')75 rpc_service = services.getServiceNamed('rpc')
66 service = RackNetworksMonitoringService(76 service = RackNetworksMonitoringService(
67 rpc_service, Clock(), enable_monitoring=False)77 rpc_service, Clock(), enable_monitoring=False,
78 enable_beaconing=False)
68 service.getInterfaces = lambda: succeed(interfaces)79 service.getInterfaces = lambda: succeed(interfaces)
69 # Put something in the cache. This tells recordInterfaces that refresh80 # Put something in the cache. This tells recordInterfaces that refresh
70 # has already run but the interfaces have changed thus they need to be81 # has already run but the interfaces have changed thus they need to be
@@ -77,7 +88,57 @@ class TestRackNetworksMonitoringService(MAASTestCase):
77 self.assertThat(88 self.assertThat(
78 protocol.UpdateInterfaces, MockCalledOnceWith(89 protocol.UpdateInterfaces, MockCalledOnceWith(
79 protocol, system_id=rpc_service.getClient().localIdent,90 protocol, system_id=rpc_service.getClient().localIdent,
80 interfaces=interfaces))91 interfaces=interfaces, topology_hints=None))
92
93 @inlineCallbacks
94 def test_reports_interfaces_with_hints_if_beaconing_enabled(self):
95 fixture = self.useFixture(MockLiveClusterToRegionRPCFixture())
96 protocol, connecting = fixture.makeEventLoop(region.UpdateInterfaces)
97 # Don't actually wait for beaconing to complete.
98 pause_mock = self.patch(services_module, 'pause')
99 queue_mcast_mock = self.patch(
100 services_module.BeaconingSocketProtocol, 'queueMulticastBeaconing')
101 self.addCleanup((yield connecting))
102
103 interfaces = {
104 "eth0": {
105 "type": "physical",
106 "mac_address": factory.make_mac_address(),
107 "parents": [],
108 "links": [],
109 "enabled": True,
110 }
111 }
112
113 rpc_service = services.getServiceNamed('rpc')
114 service = RackNetworksMonitoringService(
115 rpc_service, Clock(), enable_monitoring=False,
116 enable_beaconing=True)
117 service.getInterfaces = lambda: succeed(interfaces)
118 # Put something in the cache. This tells recordInterfaces that refresh
119 # has already run but the interfaces have changed thus they need to be
120 # updated.
121 service._recorded = {}
122
123 service.startService()
124 yield service.stopService()
125
126 self.assertThat(
127 protocol.UpdateInterfaces, MockCalledOnceWith(
128 protocol, system_id=rpc_service.getClient().localIdent,
129 interfaces=interfaces, topology_hints=[]))
130 # The service should have sent out beacons, waited three seconds,
131 # solicited for more beacons, then waited another three seconds before
132 # deciding that beaconing is complete.
133 self.assertThat(pause_mock, MockCallsMatch(call(3.0), call(3.0)))
134 self.assertThat(
135 queue_mcast_mock, MockCallsMatch(
136 # Called when the service starts.
137 call(solicitation=True),
138 # Called three seconds later.
139 call(solicitation=True),
140 # Not called again when the service shuts down.
141 ))
81142
82 @inlineCallbacks143 @inlineCallbacks
83 def test_reports_neighbours_to_region(self):144 def test_reports_neighbours_to_region(self):
@@ -87,7 +148,8 @@ class TestRackNetworksMonitoringService(MAASTestCase):
87 self.addCleanup((yield connecting))148 self.addCleanup((yield connecting))
88 rpc_service = services.getServiceNamed('rpc')149 rpc_service = services.getServiceNamed('rpc')
89 service = RackNetworksMonitoringService(150 service = RackNetworksMonitoringService(
90 rpc_service, Clock(), enable_monitoring=False)151 rpc_service, Clock(), enable_monitoring=False,
152 enable_beaconing=False)
91 neighbours = [{"ip": factory.make_ip_address()}]153 neighbours = [{"ip": factory.make_ip_address()}]
92 yield service.reportNeighbours(neighbours)154 yield service.reportNeighbours(neighbours)
93 self.assertThat(155 self.assertThat(
@@ -103,7 +165,8 @@ class TestRackNetworksMonitoringService(MAASTestCase):
103 self.addCleanup((yield connecting))165 self.addCleanup((yield connecting))
104 rpc_service = services.getServiceNamed('rpc')166 rpc_service = services.getServiceNamed('rpc')
105 service = RackNetworksMonitoringService(167 service = RackNetworksMonitoringService(
106 rpc_service, Clock(), enable_monitoring=False)168 rpc_service, Clock(), enable_monitoring=False,
169 enable_beaconing=False)
107 mdns = [170 mdns = [
108 {171 {
109 'interface': 'eth0',172 'interface': 'eth0',
@@ -126,7 +189,8 @@ class TestRackNetworksMonitoringService(MAASTestCase):
126 rpc_service = services.getServiceNamed('rpc')189 rpc_service = services.getServiceNamed('rpc')
127 reactor = Clock()190 reactor = Clock()
128 service = RackNetworksMonitoringService(191 service = RackNetworksMonitoringService(
129 rpc_service, reactor, enable_monitoring=False)192 rpc_service, reactor, enable_monitoring=False,
193 enable_beaconing=False)
130 protocol.GetDiscoveryState.return_value = {'interfaces': {}}194 protocol.GetDiscoveryState.return_value = {'interfaces': {}}
131 # Put something in the cache. This tells recordInterfaces that refresh195 # Put something in the cache. This tells recordInterfaces that refresh
132 # has already run but the interfaces have changed thus they need to be196 # has already run but the interfaces have changed thus they need to be
diff --git a/src/provisioningserver/rpc/region.py b/src/provisioningserver/rpc/region.py
index ab16e4b..58c43ab 100644
--- a/src/provisioningserver/rpc/region.py
+++ b/src/provisioningserver/rpc/region.py
@@ -460,6 +460,7 @@ class UpdateInterfaces(amp.Command):
460 arguments = [460 arguments = [
461 (b'system_id', amp.Unicode()),461 (b'system_id', amp.Unicode()),
462 (b'interfaces', StructureAsJSON()),462 (b'interfaces', StructureAsJSON()),
463 (b'topology_hints', StructureAsJSON(optional=True)),
463 ]464 ]
464 response = []465 response = []
465 errors = []466 errors = []
diff --git a/src/provisioningserver/utils/services.py b/src/provisioningserver/utils/services.py
index 5a43922..cb5c81b 100644
--- a/src/provisioningserver/utils/services.py
+++ b/src/provisioningserver/utils/services.py
@@ -28,7 +28,6 @@ from provisioningserver.logger import (
28 get_maas_logger,28 get_maas_logger,
29 LegacyLogger,29 LegacyLogger,
30)30)
31from provisioningserver.rpc.exceptions import NoConnectionsAvailable
32from provisioningserver.utils.beaconing import (31from provisioningserver.utils.beaconing import (
33 age_out_uuid_queue,32 age_out_uuid_queue,
34 BEACON_IPV4_MULTICAST,33 BEACON_IPV4_MULTICAST,
@@ -52,7 +51,7 @@ from provisioningserver.utils.shell import select_c_utf8_bytes_locale
52from provisioningserver.utils.twisted import (51from provisioningserver.utils.twisted import (
53 callOut,52 callOut,
54 deferred,53 deferred,
55 suppress,54 pause,
56 terminateProcess,55 terminateProcess,
57)56)
58from twisted.application.internet import TimerService57from twisted.application.internet import TimerService
@@ -425,9 +424,15 @@ def join_ipv6_beacon_group(sock, ifindex):
425 socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST) +424 socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST) +
426 struct.pack("I", ifindex)425 struct.pack("I", ifindex)
427 )426 )
428 sock.setsockopt(427 try:
429 socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP,428 sock.setsockopt(
430 ipv6_join_sockopt_args)429 socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP,
430 ipv6_join_sockopt_args)
431 except OSError:
432 # Do this on a best-effort basis. We might get an "Address already in
433 # use" error if the group is already joined, or (for whatever reason)
434 # it is not possible to join a multicast group using this interface.
435 pass
431436
432437
433def set_ipv6_multicast_loopback(sock, loopback):438def set_ipv6_multicast_loopback(sock, loopback):
@@ -472,6 +477,7 @@ class BeaconingSocketProtocol(DatagramProtocol):
472 self.topology_hints = OrderedDict()477 self.topology_hints = OrderedDict()
473 self.listen_port = None478 self.listen_port = None
474 self.mcast_requested = False479 self.mcast_requested = False
480 self.mcast_solicitation = False
475 self.last_solicited_mcast = 0481 self.last_solicited_mcast = 0
476 self._join_multicast_groups()482 self._join_multicast_groups()
477483
@@ -482,8 +488,9 @@ class BeaconingSocketProtocol(DatagramProtocol):
482 verifyObject(IReactorMulticast, self.reactor)488 verifyObject(IReactorMulticast, self.reactor)
483 except DoesNotImplement:489 except DoesNotImplement:
484 return490 return
485 self.listen_port = self.reactor.listenMulticast(491 if self.listen_port is None:
486 self.port, self, interface=self.interface, listenMultiple=True)492 self.listen_port = self.reactor.listenMulticast(
493 self.port, self, interface=self.interface, listenMultiple=True)
487 sock = self.transport.getHandle()494 sock = self.transport.getHandle()
488 if self.loopback is True:495 if self.loopback is True:
489 # This is only necessary for testing.496 # This is only necessary for testing.
@@ -512,6 +519,34 @@ class BeaconingSocketProtocol(DatagramProtocol):
512 self.interfaces = interfaces519 self.interfaces = interfaces
513 self._join_multicast_groups()520 self._join_multicast_groups()
514521
522 def getAllTopologyHints(self):
523 """Returns the set of unique topology hints."""
524 # When beaconing runs, hints attached to individual packets might
525 # come to the same conclusion about the implied fabric connectivity.
526 # Use a set to prevent the region from processing duplicate hints.
527 all_hints = set()
528 for hints in self.topology_hints.values():
529 all_hints |= hints
530 return all_hints
531
532 def getJSONTopologyHints(self):
533 """Returns all topology hints as a list of dictionaries.
534
535 This method is used for sending data via the RPC layer, so be cautious
536 when modifying. In addition, keys with no value are filtered out
537 of the resulting dictionary, so that the hints are smaller on the wire.
538 """
539 all_hints = self.getAllTopologyHints()
540 json_hints = [
541 {
542 key: value
543 for key, value in hint._asdict().items()
544 if value is not None
545 }
546 for hint in all_hints
547 ]
548 return json_hints
549
515 def stopProtocol(self):550 def stopProtocol(self):
516 super().stopProtocol()551 super().stopProtocol()
517 if self.listen_port is not None:552 if self.listen_port is not None:
@@ -629,27 +664,36 @@ class BeaconingSocketProtocol(DatagramProtocol):
629 reply = create_beacon_payload("advertisement", payload)664 reply = create_beacon_payload("advertisement", payload)
630 self.send_beacon(reply, beacon.reply_address)665 self.send_beacon(reply, beacon.reply_address)
631 if len(self.interfaces) > 0:666 if len(self.interfaces) > 0:
632 self.queueMulticastAdvertisement()667 self.queueMulticastBeaconing()
633668
634 def dequeueMulticastAdvertisement(self):669 def dequeueMulticastBeaconing(self):
635 """670 """
636 Callback to send multicast beacon advertisements.671 Callback to send multicast beacon advertisements.
637672
638 See `queueMulticastAdvertisement`, which schedules this method to run.673 See `queueMulticastAdvertisement`, which schedules this method to run.
639 """674 """
640 mtime = time.monotonic()675 mtime = time.monotonic()
676 beacon_type = (
677 'solicitation' if self.mcast_solicitation else 'advertisement')
678 log.msg("Sending multicast beacon %ss." % beacon_type)
679 self.send_multicast_beacons(self.interfaces, beacon_type)
641 self.last_solicited_mcast = mtime680 self.last_solicited_mcast = mtime
642 self.mcast_requested = False681 self.mcast_requested = False
643 log.msg("Sending multicast beacon advertisements.")682 self.mcast_solicitation = False
644 self.send_multicast_beacons(self.interfaces, 'advertisement')
645683
646 def queueMulticastAdvertisement(self):684 def queueMulticastBeaconing(self, solicitation=False):
647 """685 """
648 Requests that multicast advertisements be sent out on every interface.686 Requests that multicast advertisements be sent out on every interface.
649687
650 Ensures that advertisements will not be sent more than once every688 Ensures that advertisements will not be sent more than once every
651 five seconds.689 five seconds.
690
691 :param solicitation: If true, sends solicitations rather than
692 advertisements. Solicitations are used to initiate "full beaconing"
693 with peers; advertisements do not generate beacon replies.
652 """694 """
695 if solicitation is True:
696 self.mcast_solicitation = True
653 if self.mcast_requested:697 if self.mcast_requested:
654 # A multicast advertisement has been requested already.698 # A multicast advertisement has been requested already.
655 return699 return
@@ -660,7 +704,7 @@ class BeaconingSocketProtocol(DatagramProtocol):
660 else:704 else:
661 timeout = max(mtime - self.last_solicited_mcast, 5)705 timeout = max(mtime - self.last_solicited_mcast, 5)
662 self.mcast_requested = True706 self.mcast_requested = True
663 self.reactor.callLater(timeout, self.dequeueMulticastAdvertisement)707 self.reactor.callLater(timeout, self.dequeueMulticastBeaconing)
664708
665 def processTopologyHints(self, rx: ReceivedBeacon):709 def processTopologyHints(self, rx: ReceivedBeacon):
666 """710 """
@@ -692,9 +736,7 @@ class BeaconingSocketProtocol(DatagramProtocol):
692 self.topology_hints[rx.uuid] = hints736 self.topology_hints[rx.uuid] = hints
693 # XXX mpontillo 2017-08-07: temporary logging737 # XXX mpontillo 2017-08-07: temporary logging
694 log.msg("New topology hints [%s]:\n%s" % (rx.uuid, pformat(hints)))738 log.msg("New topology hints [%s]:\n%s" % (rx.uuid, pformat(hints)))
695 all_hints = set()739 all_hints = self.getAllTopologyHints()
696 for hints in self.topology_hints.values():
697 all_hints |= hints
698 log.msg("Topology hint summary:\n%s" % pformat(all_hints))740 log.msg("Topology hint summary:\n%s" % pformat(all_hints))
699741
700 def _add_remote_fabric_hints(self, hints, remote_ifinfo, rx):742 def _add_remote_fabric_hints(self, hints, remote_ifinfo, rx):
@@ -905,7 +947,8 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
905947
906 interval = timedelta(seconds=30).total_seconds()948 interval = timedelta(seconds=30).total_seconds()
907949
908 def __init__(self, clock=None, enable_monitoring=True):950 def __init__(
951 self, clock=None, enable_monitoring=True, enable_beaconing=True):
909 # Order is very important here. First we set the clock to the passed-in952 # Order is very important here. First we set the clock to the passed-in
910 # reactor, so that unit tests can fake out the clock if necessary.953 # reactor, so that unit tests can fake out the clock if necessary.
911 # Then we call super(). The superclass will set up the structures954 # Then we call super(). The superclass will set up the structures
@@ -916,9 +959,11 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
916 self.clock = clock959 self.clock = clock
917 super().__init__()960 super().__init__()
918 self.enable_monitoring = enable_monitoring961 self.enable_monitoring = enable_monitoring
962 self.enable_beaconing = enable_beaconing
919 # The last successfully recorded interfaces.963 # The last successfully recorded interfaces.
920 self._recorded = None964 self._recorded = None
921 self._monitored = frozenset()965 self._monitored = frozenset()
966 self._beaconing = frozenset()
922 self._monitoring_state = {}967 self._monitoring_state = {}
923 self._monitoring_mdns = False968 self._monitoring_mdns = False
924 self._locked = False969 self._locked = False
@@ -935,33 +980,25 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
935 self.interface_monitor.setServiceParent(self)980 self.interface_monitor.setServiceParent(self)
936 self.beaconing_protocol = None981 self.beaconing_protocol = None
937982
983 @inlineCallbacks
938 def updateInterfaces(self):984 def updateInterfaces(self):
939 """Update interfaces, catching and logging errors.985 """Update interfaces, catching and logging errors.
940986
941 This can be overridden by subclasses to conditionally update based on987 This can be overridden by subclasses to conditionally update based on
942 some external configuration.988 some external configuration.
943 """989 """
944 d = maybeDeferred(self._assumeSoleResponsibility)990 responsible = self._assumeSoleResponsibility()
945991 if responsible:
946 def update(responsible):992 interfaces = None
947 if responsible:993 try:
948 d = maybeDeferred(self.getInterfaces)994 interfaces = yield maybeDeferred(self.getInterfaces)
949 d.addCallback(self._updateInterfaces)995 yield self._updateInterfaces(interfaces)
950 return d996 except BaseException as e:
951997 msg = (
952 def failed(failure):998 "Failed to update and/or record network interface "
953 log.err(999 "configuration: %s; interfaces: %r" % (e, interfaces)
954 failure,1000 )
955 "Failed to update and/or record network interface "1001 log.err(None, msg)
956 "configuration: %s" % failure.getErrorMessage())
957
958 d = d.addCallback(update)
959 # During the update, we might fail to get the interface monitoring
960 # state from the region. We can safely ignore this, as it will be
961 # retried shortly.
962 d.addErrback(suppress, NoConnectionsAvailable)
963 d.addErrback(failed)
964 return d
9651002
966 def getInterfaces(self):1003 def getInterfaces(self):
967 """Get the current network interfaces configuration.1004 """Get the current network interfaces configuration.
@@ -978,7 +1015,7 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
978 """1015 """
9791016
980 @abstractmethod1017 @abstractmethod
981 def recordInterfaces(self, interfaces):1018 def recordInterfaces(self, interfaces, hints=None):
982 """Record the interfaces information.1019 """Record the interfaces information.
9831020
984 This MUST be overridden in subclasses.1021 This MUST be overridden in subclasses.
@@ -1050,21 +1087,48 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
1050 # If we were monitoring neighbours on any interfaces, we need to1087 # If we were monitoring neighbours on any interfaces, we need to
1051 # stop the monitoring services.1088 # stop the monitoring services.
1052 self._configureNetworkDiscovery({})1089 self._configureNetworkDiscovery({})
1090 if self.beaconing_protocol is not None:
1091 self._configureBeaconing({})
10531092
1093 @inlineCallbacks
1054 def _updateInterfaces(self, interfaces):1094 def _updateInterfaces(self, interfaces):
1055 """Record `interfaces` if they've changed."""1095 """Record `interfaces` if they've changed."""
1056 if interfaces != self._recorded:1096 if interfaces != self._recorded:
1057 d = maybeDeferred(self.recordInterfaces, interfaces)1097 hints = None
1098 if self.enable_beaconing:
1099 self._configureBeaconing(interfaces)
1100 # Wait for beaconing to do its thing.
1101 yield pause(3.0)
1102 # Retry beacon soliciations, in case any packet loss occurred
1103 # the first time.
1104 self.beaconing_protocol.queueMulticastBeaconing(
1105 solicitation=True)
1106 yield pause(3.0)
1107 hints = self.beaconing_protocol.getJSONTopologyHints()
1108 yield maybeDeferred(self.recordInterfaces, interfaces, hints)
1058 # Note: _interfacesRecorded() will reconfigure discovery after1109 # Note: _interfacesRecorded() will reconfigure discovery after
1059 # recording the interfaces, so there is no need to call1110 # recording the interfaces, so there is no need to call
1060 # _configureNetworkDiscovery() here.1111 # _configureNetworkDiscovery() here.
1061 d.addCallback(callOut, self._interfacesRecorded, interfaces)1112 self._interfacesRecorded(interfaces)
1062 return d
1063 else:1113 else:
1064 # If the interfaces didn't change, we still need to poll for1114 # If the interfaces didn't change, we still need to poll for
1065 # monitoring state changes.1115 # monitoring state changes.
1066 d = maybeDeferred(self._configureNetworkDiscovery, interfaces)1116 yield maybeDeferred(self._configureNetworkDiscovery, interfaces)
1067 return d1117
1118 def _getInterfacesForBeaconing(self, interfaces: dict):
1119 """Return the interfaces which will be used for beaconing.
1120
1121 :return: The set of interface names to run beaconing on.
1122 """
1123 # Don't beacon when running the test suite/dev env.
1124 # In addition, if we don't own the lock, we should not be beaconing.
1125 if is_dev_environment() or not self._locked or interfaces is None:
1126 return set()
1127 monitored_interfaces = {
1128 ifname for ifname, ifdata in interfaces.items()
1129 if ifdata['monitored'] is True
1130 }
1131 return monitored_interfaces
10681132
1069 def _getInterfacesForNeighbourDiscovery(1133 def _getInterfacesForNeighbourDiscovery(
1070 self, interfaces: dict, monitoring_state: dict):1134 self, interfaces: dict, monitoring_state: dict):
@@ -1227,6 +1291,32 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
1227 # doesn't matter for mDNS discovery purposes.)1291 # doesn't matter for mDNS discovery purposes.)
1228 pass1292 pass
12291293
1294 def _configureBeaconing(self, interfaces):
1295 beaconing_interfaces = self._getInterfacesForBeaconing(interfaces)
1296 # Calculate the difference between the sets. We need to know which
1297 # interfaces were added and deleted (with respect to the interfaces we
1298 # were already beaconing on).
1299 new_interfaces = beaconing_interfaces.difference(self._beaconing)
1300 deleted_interfaces = self._beaconing.difference(beaconing_interfaces)
1301 if len(new_interfaces) > 0:
1302 log.msg("Starting beaconing for interfaces: %r" % (new_interfaces))
1303 self._startBeaconingServices(new_interfaces)
1304 if len(deleted_interfaces) > 0:
1305 log.msg(
1306 "Stopping beaconing for interfaces: %r" % (deleted_interfaces))
1307 self._stopBeaconingServices(deleted_interfaces)
1308 self._beaconing = beaconing_interfaces
1309 if self.beaconing_protocol is None:
1310 self.beaconing_protocol = BeaconingSocketProtocol(
1311 self.clock, interfaces=interfaces)
1312 else:
1313 self.beaconing_protocol.updateInterfaces(interfaces)
1314 # If the interfaces have changed, perform beaconing again.
1315 # An empty dictionary will be passed in when the service stops, so
1316 # don't bother sending out beacons we won't reply to.
1317 if len(interfaces) > 0:
1318 self.beaconing_protocol.queueMulticastBeaconing(solicitation=True)
1319
1230 def _configureNeighbourDiscovery(self, interfaces, monitoring_state):1320 def _configureNeighbourDiscovery(self, interfaces, monitoring_state):
1231 monitored_interfaces = self._getInterfacesForNeighbourDiscovery(1321 monitored_interfaces = self._getInterfacesForNeighbourDiscovery(
1232 interfaces, monitoring_state)1322 interfaces, monitoring_state)
@@ -1239,26 +1329,15 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta):
1239 log.msg("Starting neighbour discovery for interfaces: %r" % (1329 log.msg("Starting neighbour discovery for interfaces: %r" % (
1240 new_interfaces))1330 new_interfaces))
1241 self._startNeighbourDiscoveryServices(new_interfaces)1331 self._startNeighbourDiscoveryServices(new_interfaces)
1242 # XXX mpontillo 2017-07-12: for testing, just start beaconing
1243 # services on all the interfaces enabled for active discovery.
1244 self._startBeaconingServices(new_interfaces)
1245 if len(deleted_interfaces) > 0:1332 if len(deleted_interfaces) > 0:
1246 log.msg(1333 log.msg(
1247 "Stopping neighbour discovery for interfaces: %r" % (1334 "Stopping neighbour discovery for interfaces: %r" % (
1248 deleted_interfaces))1335 deleted_interfaces))
1249 self._stopNeighbourDiscoveryServices(deleted_interfaces)1336 self._stopNeighbourDiscoveryServices(deleted_interfaces)
1250 # XXX mpontillo 2017-07-12: this should be separately configured.
1251 # (see similar comment in the 'start' path above.)
1252 self._stopBeaconingServices(deleted_interfaces)
1253 self._monitored = monitored_interfaces1337 self._monitored = monitored_interfaces
12541338
1255 def _interfacesRecorded(self, interfaces):1339 def _interfacesRecorded(self, interfaces):
1256 """The given `interfaces` were recorded successfully."""1340 """The given `interfaces` were recorded successfully."""
1257 self._recorded = interfaces1341 self._recorded = interfaces
1258 if self.beaconing_protocol is None:
1259 self.beaconing_protocol = BeaconingSocketProtocol(
1260 self.clock, interfaces=interfaces)
1261 else:
1262 self.beaconing_protocol.updateInterfaces(interfaces)
1263 if self.enable_monitoring is True:1342 if self.enable_monitoring is True:
1264 self._configureNetworkDiscovery(interfaces)1343 self._configureNetworkDiscovery(interfaces)
diff --git a/src/provisioningserver/utils/tests/test_services.py b/src/provisioningserver/utils/tests/test_services.py
index acfc60e..36090ea 100644
--- a/src/provisioningserver/utils/tests/test_services.py
+++ b/src/provisioningserver/utils/tests/test_services.py
@@ -76,8 +76,12 @@ from twisted.python.failure import Failure
76class StubNetworksMonitoringService(NetworksMonitoringService):76class StubNetworksMonitoringService(NetworksMonitoringService):
77 """Concrete subclass for testing."""77 """Concrete subclass for testing."""
7878
79 def __init__(self, enable_monitoring=False, *args, **kwargs):79 def __init__(
80 super().__init__(enable_monitoring=enable_monitoring, *args, **kwargs)80 self, enable_monitoring=False, enable_beaconing=False,
81 *args, **kwargs):
82 super().__init__(
83 *args, enable_monitoring=enable_monitoring,
84 enable_beaconing=enable_beaconing, **kwargs)
81 self.iterations = DeferredQueue()85 self.iterations = DeferredQueue()
82 self.interfaces = []86 self.interfaces = []
83 self.update_interface__calls = 087 self.update_interface__calls = 0
@@ -91,7 +95,7 @@ class StubNetworksMonitoringService(NetworksMonitoringService):
91 d.addBoth(self.iterations.put)95 d.addBoth(self.iterations.put)
92 return d96 return d
9397
94 def recordInterfaces(self, interfaces):98 def recordInterfaces(self, interfaces, hints=None):
95 self.interfaces.append(interfaces)99 self.interfaces.append(interfaces)
96100
97 def reportNeighbours(self, neighbours):101 def reportNeighbours(self, neighbours):
@@ -221,13 +225,13 @@ class TestNetworksMonitoringService(MAASTestCase):
221 # recordInterfaces is called the first time, as expected.225 # recordInterfaces is called the first time, as expected.
222 recordInterfaces.reset_mock()226 recordInterfaces.reset_mock()
223 yield service.updateInterfaces()227 yield service.updateInterfaces()
224 self.assertThat(recordInterfaces, MockCalledOnceWith({}))228 self.assertThat(recordInterfaces, MockCalledOnceWith({}, None))
225229
226 # recordInterfaces is called the second time too; the service noted230 # recordInterfaces is called the second time too; the service noted
227 # that it crashed last time and knew to run it again.231 # that it crashed last time and knew to run it again.
228 recordInterfaces.reset_mock()232 recordInterfaces.reset_mock()
229 yield service.updateInterfaces()233 yield service.updateInterfaces()
230 self.assertThat(recordInterfaces, MockCalledOnceWith({}))234 self.assertThat(recordInterfaces, MockCalledOnceWith({}, None))
231235
232 # recordInterfaces is NOT called the third time; the service noted236 # recordInterfaces is NOT called the third time; the service noted
233 # that the configuration had not changed.237 # that the configuration had not changed.
@@ -1004,3 +1008,128 @@ class TestBeaconingSocketProtocol(SharedSecretTestCase):
1004 }1008 }
1005 self.assertThat(hints, Equals(expected_hints))1009 self.assertThat(hints, Equals(expected_hints))
1006 yield protocol.stopProtocol()1010 yield protocol.stopProtocol()
1011
1012 @inlineCallbacks
1013 def test__getJSONTopologyHints_converts_hints_to_dictionary(self):
1014 # Note: Always use a random port for testing. (port=0)
1015 protocol = BeaconingSocketProtocol(
1016 reactor, port=0, process_incoming=False, loopback=True,
1017 interface="::", debug=True)
1018 # Don't try to send out any replies.
1019 self.patch(services, 'create_beacon_payload')
1020 self.patch(protocol, 'send_beacon')
1021 # Need to generate a real UUID with the current time, so it doesn't
1022 # get aged out.
1023 uuid = str(uuid1())
1024 # Make the protocol think we sent a beacon with this UUID already.
1025 tx_mac = factory.make_mac_address()
1026 fake_tx_beacon = FakeBeaconPayload(
1027 uuid, ifname='eth1', mac=tx_mac, vid=100)
1028 fake_rx_beacon = {
1029 "source_ip": "127.0.0.1",
1030 "source_port": 5240,
1031 "destination_ip": "224.0.0.118",
1032 "interface": "eth0",
1033 "type": "solicitation",
1034 "payload": fake_tx_beacon.payload
1035 }
1036 protocol.beaconReceived(fake_rx_beacon)
1037 all_hints = protocol.getJSONTopologyHints()
1038 expected_hints = [
1039 # Note: since vid=None on the received beacon, we expect that
1040 # the hint won't have a 'vid' field.
1041 dict(
1042 ifname='eth0', hint="on_remote_network",
1043 related_ifname='eth1', related_vid=100,
1044 related_mac=tx_mac),
1045 ]
1046 self.assertThat(all_hints, Equals(expected_hints))
1047 yield protocol.stopProtocol()
1048
1049 @inlineCallbacks
1050 def test__queues_multicast_beacon_soliciations_upon_request(self):
1051 # Note: Always use a random port for testing. (port=0)
1052 clock = Clock()
1053 protocol = BeaconingSocketProtocol(
1054 clock, port=0, process_incoming=False, loopback=True,
1055 interface="::", debug=True)
1056 # Don't try to send out any replies.
1057 self.patch(services, 'create_beacon_payload')
1058 send_mcast_mock = self.patch(protocol, 'send_multicast_beacons')
1059 self.patch(protocol, 'send_beacon')
1060 yield protocol.queueMulticastBeaconing(solicitation=True)
1061 clock.advance(0)
1062 self.assertThat(
1063 send_mcast_mock, MockCalledOnceWith({}, 'solicitation'))
1064
1065 @inlineCallbacks
1066 def test__multicasts_at_most_once_per_five_seconds(self):
1067 # Note: Always use a random port for testing. (port=0)
1068 clock = Clock()
1069 protocol = BeaconingSocketProtocol(
1070 clock, port=0, process_incoming=False, loopback=True,
1071 interface="::", debug=True)
1072 # Don't try to send out any replies.
1073 self.patch(services, 'create_beacon_payload')
1074 monotonic_mock = self.patch(services.time, 'monotonic')
1075 send_mcast_mock = self.patch(protocol, 'send_multicast_beacons')
1076 self.patch(protocol, 'send_beacon')
1077 monotonic_mock.side_effect = [
1078 # Initial queue
1079 6,
1080 # Initial dequeue
1081 6,
1082 # Second queue (hasn't yet been 5 seconds)
1083 10,
1084 # Third queue
1085 11,
1086 # Second dequeue
1087 11,
1088 ]
1089 yield protocol.queueMulticastBeaconing()
1090 clock.advance(0)
1091 self.assertThat(
1092 send_mcast_mock, MockCalledOnceWith({}, 'advertisement'))
1093 send_mcast_mock.reset_mock()
1094 yield protocol.queueMulticastBeaconing()
1095 yield protocol.queueMulticastBeaconing(solicitation=True)
1096 clock.advance(4.9)
1097 self.assertThat(
1098 send_mcast_mock, MockNotCalled())
1099 clock.advance(0.1)
1100 self.assertThat(
1101 send_mcast_mock, MockCalledOnceWith({}, 'solicitation'))
1102
1103 @inlineCallbacks
1104 def test__multiple_beacon_requests_coalesced(self):
1105 # Note: Always use a random port for testing. (port=0)
1106 clock = Clock()
1107 protocol = BeaconingSocketProtocol(
1108 clock, port=0, process_incoming=False, loopback=True,
1109 interface="::", debug=True)
1110 # Don't try to send out any replies.
1111 self.patch(services, 'create_beacon_payload')
1112 send_mcast_mock = self.patch(protocol, 'send_multicast_beacons')
1113 self.patch(protocol, 'send_beacon')
1114 yield protocol.queueMulticastBeaconing()
1115 yield protocol.queueMulticastBeaconing()
1116 clock.advance(5)
1117 self.assertThat(
1118 send_mcast_mock, MockCalledOnceWith({}, 'advertisement'))
1119
1120 @inlineCallbacks
1121 def test__solicitation_wins_when_multiple_requests_queued(self):
1122 # Note: Always use a random port for testing. (port=0)
1123 clock = Clock()
1124 protocol = BeaconingSocketProtocol(
1125 clock, port=0, process_incoming=False, loopback=True,
1126 interface="::", debug=True)
1127 # Don't try to send out any replies.
1128 self.patch(services, 'create_beacon_payload')
1129 send_mcast_mock = self.patch(protocol, 'send_multicast_beacons')
1130 self.patch(protocol, 'send_beacon')
1131 yield protocol.queueMulticastBeaconing()
1132 yield protocol.queueMulticastBeaconing(solicitation=True)
1133 clock.advance(5)
1134 self.assertThat(
1135 send_mcast_mock, MockCalledOnceWith({}, 'solicitation'))

Subscribers

People subscribed via source and target branches