Merge ~mpontillo/maas:beaconing--use-hints-for-interface-update into maas:master
- Git
- lp:~mpontillo/maas
- beaconing--use-hints-for-interface-update
- Merge into master
Status: | Merged |
---|---|
Approved by: | Mike Pontillo |
Approved revision: | b10bf14b5bdbfb3bb4348891079f3c175e72e43b |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~mpontillo/maas:beaconing--use-hints-for-interface-update |
Merge into: | maas:master |
Diff against target: |
896 lines (+381/-97) 13 files modified
.gitignore (+1/-1) src/maasserver/models/node.py (+2/-1) src/maasserver/regiondservices/networks_monitoring.py (+5/-4) src/maasserver/regiondservices/tests/test_networks_monitoring.py (+2/-1) src/maasserver/rpc/rackcontrollers.py (+2/-2) src/maasserver/rpc/regionservice.py (+7/-4) src/maasserver/rpc/tests/test_rackcontrollers.py (+1/-1) src/maasserver/rpc/tests/test_regionservice_calls.py (+2/-1) src/provisioningserver/rackdservices/networks_monitoring_service.py (+16/-12) src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py (+74/-10) src/provisioningserver/rpc/region.py (+1/-0) src/provisioningserver/utils/services.py (+134/-55) src/provisioningserver/utils/tests/test_services.py (+134/-5) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Blake Rouse (community) | Approve | ||
Review via email: mp+328815@code.launchpad.net |
Commit message
Send topology hints to update_interfaces()
* Add code to convert topology hints to a compact JSON format.
* Start beaconing as soon as interfaces are known; don't
wait for interface monitoring settings. This means
beaconing will happen on all interfaces, all the time,
without regard for network discovery settings.
* Fix bug that caused update_interfaces() to be called
with create_fabrics=True when it should have been False.
* Wait for beaconing to complete before updating interfaces.
* Drive-by fix for an incorrect line in .gitignore, which
erroneously covered django16_
Description of the change
Mike Pontillo (mpontillo) wrote : | # |
Thanks for the review. The NetworksMonitor
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b beaconing-
STATUS: FAILED BUILD
LOG: http://
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b beaconing-
STATUS: FAILED BUILD
LOG: http://
Mike Pontillo (mpontillo) wrote : | # |
Fixed merge conflict with the .gitignore file, which slipped in when another branch landed.
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b beaconing-
STATUS: FAILED BUILD
LOG: http://
Preview Diff
1 | diff --git a/.gitignore b/.gitignore | |||
2 | index 20d7b98..714a000 100644 | |||
3 | --- a/.gitignore | |||
4 | +++ b/.gitignore | |||
5 | @@ -19,6 +19,7 @@ | |||
6 | 19 | /.noseids | 19 | /.noseids |
7 | 20 | /.run | 20 | /.run |
8 | 21 | /.run-e2e | 21 | /.run-e2e |
9 | 22 | /*.orig.tar.gz | ||
10 | 22 | /bin | 23 | /bin |
11 | 23 | /build | 24 | /build |
12 | 24 | /build_pkg | 25 | /build_pkg |
13 | @@ -57,4 +58,3 @@ __pycache__ | |||
14 | 57 | *~ | 58 | *~ |
15 | 58 | \#*\# | 59 | \#*\# |
16 | 59 | .#* | 60 | .#* |
17 | 60 | *.tar.gz | ||
18 | diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py | |||
19 | index 127669b..389f84c 100644 | |||
20 | --- a/src/maasserver/models/node.py | |||
21 | +++ b/src/maasserver/models/node.py | |||
22 | @@ -4657,7 +4657,8 @@ class Controller(Node): | |||
23 | 4657 | @with_connection | 4657 | @with_connection |
24 | 4658 | @synchronised(locks.rack_registration) | 4658 | @synchronised(locks.rack_registration) |
25 | 4659 | @transactional | 4659 | @transactional |
27 | 4660 | def update_interfaces(self, interfaces, create_fabrics=True): | 4660 | def update_interfaces( |
28 | 4661 | self, interfaces, topology_hints=None, create_fabrics=True): | ||
29 | 4661 | """Update the interfaces attached to the controller. | 4662 | """Update the interfaces attached to the controller. |
30 | 4662 | 4663 | ||
31 | 4663 | :param interfaces: Interfaces dictionary that was parsed from | 4664 | :param interfaces: Interfaces dictionary that was parsed from |
32 | diff --git a/src/maasserver/regiondservices/networks_monitoring.py b/src/maasserver/regiondservices/networks_monitoring.py | |||
33 | index 9427bef..9cf313a 100644 | |||
34 | --- a/src/maasserver/regiondservices/networks_monitoring.py | |||
35 | +++ b/src/maasserver/regiondservices/networks_monitoring.py | |||
36 | @@ -23,9 +23,10 @@ class RegionNetworksMonitoringService(NetworksMonitoringService): | |||
37 | 23 | """Get interface monitoring state from the region.""" | 23 | """Get interface monitoring state from the region.""" |
38 | 24 | return deferToDatabase(self.getInterfaceMonitoringStateFromDatabase) | 24 | return deferToDatabase(self.getInterfaceMonitoringStateFromDatabase) |
39 | 25 | 25 | ||
41 | 26 | def recordInterfaces(self, interfaces): | 26 | def recordInterfaces(self, interfaces, hints=None): |
42 | 27 | """Record the interfaces information.""" | 27 | """Record the interfaces information.""" |
44 | 28 | return deferToDatabase(self.recordInterfacesIntoDatabase, interfaces) | 28 | return deferToDatabase( |
45 | 29 | self.recordInterfacesIntoDatabase, interfaces, hints) | ||
46 | 29 | 30 | ||
47 | 30 | def reportNeighbours(self, neighbours): | 31 | def reportNeighbours(self, neighbours): |
48 | 31 | """Record the specified list of neighbours.""" | 32 | """Record the specified list of neighbours.""" |
49 | @@ -42,10 +43,10 @@ class RegionNetworksMonitoringService(NetworksMonitoringService): | |||
50 | 42 | return region_controller.get_discovery_state() | 43 | return region_controller.get_discovery_state() |
51 | 43 | 44 | ||
52 | 44 | @transactional | 45 | @transactional |
54 | 45 | def recordInterfacesIntoDatabase(self, interfaces): | 46 | def recordInterfacesIntoDatabase(self, interfaces, hints): |
55 | 46 | """Record the interfaces information.""" | 47 | """Record the interfaces information.""" |
56 | 47 | region_controller = RegionController.objects.get_running_controller() | 48 | region_controller = RegionController.objects.get_running_controller() |
58 | 48 | region_controller.update_interfaces(interfaces) | 49 | region_controller.update_interfaces(interfaces, hints) |
59 | 49 | 50 | ||
60 | 50 | @transactional | 51 | @transactional |
61 | 51 | def recordNeighboursIntoDatabase(self, neighbours): | 52 | def recordNeighboursIntoDatabase(self, neighbours): |
62 | diff --git a/src/maasserver/regiondservices/tests/test_networks_monitoring.py b/src/maasserver/regiondservices/tests/test_networks_monitoring.py | |||
63 | index 62933ba..e49e34b 100644 | |||
64 | --- a/src/maasserver/regiondservices/tests/test_networks_monitoring.py | |||
65 | +++ b/src/maasserver/regiondservices/tests/test_networks_monitoring.py | |||
66 | @@ -52,7 +52,8 @@ class TestRegionNetworksMonitoringService(MAASTransactionServerTestCase): | |||
67 | 52 | } | 52 | } |
68 | 53 | } | 53 | } |
69 | 54 | 54 | ||
71 | 55 | service = RegionNetworksMonitoringService(reactor) | 55 | service = RegionNetworksMonitoringService( |
72 | 56 | reactor, enable_beaconing=False) | ||
73 | 56 | service.getInterfaces = lambda: succeed(interfaces) | 57 | service.getInterfaces = lambda: succeed(interfaces) |
74 | 57 | 58 | ||
75 | 58 | with FakeLogger("maas") as logger: | 59 | with FakeLogger("maas") as logger: |
76 | diff --git a/src/maasserver/rpc/rackcontrollers.py b/src/maasserver/rpc/rackcontrollers.py | |||
77 | index d7ed7e7..d80477c 100644 | |||
78 | --- a/src/maasserver/rpc/rackcontrollers.py | |||
79 | +++ b/src/maasserver/rpc/rackcontrollers.py | |||
80 | @@ -222,10 +222,10 @@ def update_foreign_dhcp(system_id, interface_name, dhcp_ip=None): | |||
81 | 222 | 222 | ||
82 | 223 | @synchronous | 223 | @synchronous |
83 | 224 | @transactional | 224 | @transactional |
85 | 225 | def update_interfaces(system_id, interfaces): | 225 | def update_interfaces(system_id, interfaces, topology_hints=None): |
86 | 226 | """Update the interface definition on the rack controller.""" | 226 | """Update the interface definition on the rack controller.""" |
87 | 227 | rack_controller = RackController.objects.get(system_id=system_id) | 227 | rack_controller = RackController.objects.get(system_id=system_id) |
89 | 228 | rack_controller.update_interfaces(interfaces) | 228 | rack_controller.update_interfaces(interfaces, topology_hints) |
90 | 229 | 229 | ||
91 | 230 | 230 | ||
92 | 231 | @synchronous | 231 | @synchronous |
93 | diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py | |||
94 | index c6fc9d9..340a1be 100644 | |||
95 | --- a/src/maasserver/rpc/regionservice.py | |||
96 | +++ b/src/maasserver/rpc/regionservice.py | |||
97 | @@ -404,14 +404,15 @@ class Region(RPCProtocol): | |||
98 | 404 | return d | 404 | return d |
99 | 405 | 405 | ||
100 | 406 | @region.UpdateInterfaces.responder | 406 | @region.UpdateInterfaces.responder |
102 | 407 | def update_interfaces(self, system_id, interfaces): | 407 | def update_interfaces(self, system_id, interfaces, topology_hints=None): |
103 | 408 | """update_interfaces() | 408 | """update_interfaces() |
104 | 409 | 409 | ||
105 | 410 | Implementation of | 410 | Implementation of |
106 | 411 | :py:class:`~provisioningserver.rpc.region.UpdateInterfaces`. | 411 | :py:class:`~provisioningserver.rpc.region.UpdateInterfaces`. |
107 | 412 | """ | 412 | """ |
108 | 413 | d = deferToDatabase( | 413 | d = deferToDatabase( |
110 | 414 | rackcontrollers.update_interfaces, system_id, interfaces) | 414 | rackcontrollers.update_interfaces, system_id, interfaces, |
111 | 415 | topology_hints=topology_hints) | ||
112 | 415 | d.addCallback(lambda args: {}) | 416 | d.addCallback(lambda args: {}) |
113 | 416 | return d | 417 | return d |
114 | 417 | 418 | ||
115 | @@ -616,8 +617,10 @@ class RegionServer(Region): | |||
116 | 616 | def register( | 617 | def register( |
117 | 617 | self, system_id, hostname, interfaces, url, nodegroup_uuid=None, | 618 | self, system_id, hostname, interfaces, url, nodegroup_uuid=None, |
118 | 618 | beacon_support=False, version=None): | 619 | beacon_support=False, version=None): |
121 | 619 | # If beacons is None, register in legacy mode. | 620 | # Hold off on fabric creation if the remote controller |
122 | 620 | create_fabrics = True if beacon_support else False | 621 | # supports beacons; it will happen later when UpdateInterfaces is |
123 | 622 | # called. | ||
124 | 623 | create_fabrics = False if beacon_support else True | ||
125 | 621 | result = yield self._register( | 624 | result = yield self._register( |
126 | 622 | system_id, hostname, interfaces, url, | 625 | system_id, hostname, interfaces, url, |
127 | 623 | nodegroup_uuid=nodegroup_uuid, create_fabrics=create_fabrics, | 626 | nodegroup_uuid=nodegroup_uuid, create_fabrics=create_fabrics, |
128 | diff --git a/src/maasserver/rpc/tests/test_rackcontrollers.py b/src/maasserver/rpc/tests/test_rackcontrollers.py | |||
129 | index 1f40522..5392b32 100644 | |||
130 | --- a/src/maasserver/rpc/tests/test_rackcontrollers.py | |||
131 | +++ b/src/maasserver/rpc/tests/test_rackcontrollers.py | |||
132 | @@ -479,7 +479,7 @@ class TestUpdateInterfaces(MAASServerTestCase): | |||
133 | 479 | update_interfaces(rack_controller.system_id, sentinel.interfaces) | 479 | update_interfaces(rack_controller.system_id, sentinel.interfaces) |
134 | 480 | self.assertThat( | 480 | self.assertThat( |
135 | 481 | patched_update_interfaces, | 481 | patched_update_interfaces, |
137 | 482 | MockCalledOnceWith(sentinel.interfaces)) | 482 | MockCalledOnceWith(sentinel.interfaces, None)) |
138 | 483 | 483 | ||
139 | 484 | 484 | ||
140 | 485 | class TestReportNeighbours(MAASServerTestCase): | 485 | class TestReportNeighbours(MAASServerTestCase): |
141 | diff --git a/src/maasserver/rpc/tests/test_regionservice_calls.py b/src/maasserver/rpc/tests/test_regionservice_calls.py | |||
142 | index 1d1d497..209ee44 100644 | |||
143 | --- a/src/maasserver/rpc/tests/test_regionservice_calls.py | |||
144 | +++ b/src/maasserver/rpc/tests/test_regionservice_calls.py | |||
145 | @@ -1182,7 +1182,8 @@ class TestRegionProtocol_UpdateInterfaces(MAASTransactionServerTestCase): | |||
146 | 1182 | self.assertThat( | 1182 | self.assertThat( |
147 | 1183 | update_interfaces, | 1183 | update_interfaces, |
148 | 1184 | MockCalledOnceWith( | 1184 | MockCalledOnceWith( |
150 | 1185 | params['system_id'], params['interfaces'])) | 1185 | params['system_id'], params['interfaces'], |
151 | 1186 | topology_hints=None)) | ||
152 | 1186 | 1187 | ||
153 | 1187 | 1188 | ||
154 | 1188 | class TestRegionProtocol_ReportNeighbours(MAASTestCase): | 1189 | class TestRegionProtocol_ReportNeighbours(MAASTestCase): |
155 | diff --git a/src/provisioningserver/rackdservices/networks_monitoring_service.py b/src/provisioningserver/rackdservices/networks_monitoring_service.py | |||
156 | index 1de76f2..d8f7e8c 100644 | |||
157 | --- a/src/provisioningserver/rackdservices/networks_monitoring_service.py | |||
158 | +++ b/src/provisioningserver/rackdservices/networks_monitoring_service.py | |||
159 | @@ -8,6 +8,7 @@ __all__ = [ | |||
160 | 8 | ] | 8 | ] |
161 | 9 | 9 | ||
162 | 10 | from provisioningserver.logger import get_maas_logger | 10 | from provisioningserver.logger import get_maas_logger |
163 | 11 | from provisioningserver.rpc.exceptions import NoConnectionsAvailable | ||
164 | 11 | from provisioningserver.rpc.region import ( | 12 | from provisioningserver.rpc.region import ( |
165 | 12 | GetDiscoveryState, | 13 | GetDiscoveryState, |
166 | 13 | ReportMDNSEntries, | 14 | ReportMDNSEntries, |
167 | @@ -16,6 +17,8 @@ from provisioningserver.rpc.region import ( | |||
168 | 16 | UpdateInterfaces, | 17 | UpdateInterfaces, |
169 | 17 | ) | 18 | ) |
170 | 18 | from provisioningserver.utils.services import NetworksMonitoringService | 19 | from provisioningserver.utils.services import NetworksMonitoringService |
171 | 20 | from provisioningserver.utils.twisted import pause | ||
172 | 21 | from twisted.internet.defer import inlineCallbacks | ||
173 | 19 | 22 | ||
174 | 20 | 23 | ||
175 | 21 | maaslog = get_maas_logger("networks.monitor") | 24 | maaslog = get_maas_logger("networks.monitor") |
176 | @@ -44,20 +47,21 @@ class RackNetworksMonitoringService(NetworksMonitoringService): | |||
177 | 44 | d.addCallback(getState) | 47 | d.addCallback(getState) |
178 | 45 | return d | 48 | return d |
179 | 46 | 49 | ||
181 | 47 | def recordInterfaces(self, interfaces): | 50 | @inlineCallbacks |
182 | 51 | def recordInterfaces(self, interfaces, hints=None): | ||
183 | 48 | """Record the interfaces information to the region.""" | 52 | """Record the interfaces information to the region.""" |
186 | 49 | def record(client): | 53 | while self.running: |
187 | 50 | # On first run perform a refresh | 54 | try: |
188 | 55 | client = yield self.clientService.getClientNow() | ||
189 | 56 | except NoConnectionsAvailable: | ||
190 | 57 | yield pause(1.0) | ||
191 | 58 | continue | ||
192 | 51 | if self._recorded is None: | 59 | if self._recorded is None: |
202 | 52 | return client(RequestRackRefresh, system_id=client.localIdent) | 60 | yield client(RequestRackRefresh, system_id=client.localIdent) |
203 | 53 | else: | 61 | yield client( |
204 | 54 | return client( | 62 | UpdateInterfaces, system_id=client.localIdent, |
205 | 55 | UpdateInterfaces, system_id=client.localIdent, | 63 | interfaces=interfaces, topology_hints=hints) |
206 | 56 | interfaces=interfaces) | 64 | break |
198 | 57 | |||
199 | 58 | d = self.clientService.getClientNow() | ||
200 | 59 | d.addCallback(record) | ||
201 | 60 | return d | ||
207 | 61 | 65 | ||
208 | 62 | def reportNeighbours(self, neighbours): | 66 | def reportNeighbours(self, neighbours): |
209 | 63 | """Report neighbour information to the region.""" | 67 | """Report neighbour information to the region.""" |
210 | diff --git a/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py b/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py | |||
211 | index d325125..2404cfa 100644 | |||
212 | --- a/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py | |||
213 | +++ b/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py | |||
214 | @@ -5,8 +5,13 @@ | |||
215 | 5 | 5 | ||
216 | 6 | __all__ = [] | 6 | __all__ = [] |
217 | 7 | 7 | ||
218 | 8 | from unittest.mock import call | ||
219 | 9 | |||
220 | 8 | from maastesting.factory import factory | 10 | from maastesting.factory import factory |
222 | 9 | from maastesting.matchers import MockCalledOnceWith | 11 | from maastesting.matchers import ( |
223 | 12 | MockCalledOnceWith, | ||
224 | 13 | MockCallsMatch, | ||
225 | 14 | ) | ||
226 | 10 | from maastesting.testcase import ( | 15 | from maastesting.testcase import ( |
227 | 11 | MAASTestCase, | 16 | MAASTestCase, |
228 | 12 | MAASTwistedRunTest, | 17 | MAASTwistedRunTest, |
229 | @@ -17,6 +22,7 @@ from provisioningserver.rackdservices.networks_monitoring_service import ( | |||
230 | 17 | ) | 22 | ) |
231 | 18 | from provisioningserver.rpc import region | 23 | from provisioningserver.rpc import region |
232 | 19 | from provisioningserver.rpc.testing import MockLiveClusterToRegionRPCFixture | 24 | from provisioningserver.rpc.testing import MockLiveClusterToRegionRPCFixture |
233 | 25 | from provisioningserver.utils import services as services_module | ||
234 | 20 | from twisted.internet.defer import ( | 26 | from twisted.internet.defer import ( |
235 | 21 | inlineCallbacks, | 27 | inlineCallbacks, |
236 | 22 | maybeDeferred, | 28 | maybeDeferred, |
237 | @@ -27,7 +33,7 @@ from twisted.internet.task import Clock | |||
238 | 27 | 33 | ||
239 | 28 | class TestRackNetworksMonitoringService(MAASTestCase): | 34 | class TestRackNetworksMonitoringService(MAASTestCase): |
240 | 29 | 35 | ||
242 | 30 | run_tests_with = MAASTwistedRunTest.make_factory(debug=False, timeout=5) | 36 | run_tests_with = MAASTwistedRunTest.make_factory(debug=True, timeout=5) |
243 | 31 | 37 | ||
244 | 32 | @inlineCallbacks | 38 | @inlineCallbacks |
245 | 33 | def test_runs_refresh_first_time(self): | 39 | def test_runs_refresh_first_time(self): |
246 | @@ -37,10 +43,14 @@ class TestRackNetworksMonitoringService(MAASTestCase): | |||
247 | 37 | 43 | ||
248 | 38 | rpc_service = services.getServiceNamed('rpc') | 44 | rpc_service = services.getServiceNamed('rpc') |
249 | 39 | service = RackNetworksMonitoringService( | 45 | service = RackNetworksMonitoringService( |
251 | 40 | rpc_service, Clock(), enable_monitoring=False) | 46 | rpc_service, Clock(), enable_monitoring=False, |
252 | 47 | enable_beaconing=False) | ||
253 | 41 | 48 | ||
256 | 42 | yield service.startService() | 49 | yield maybeDeferred(service.startService) |
257 | 43 | yield service.stopService() | 50 | # By stopping the interface_monitor first, we assure that the loop |
258 | 51 | # happens at least once before the service stops completely. | ||
259 | 52 | yield maybeDeferred(service.interface_monitor.stopService) | ||
260 | 53 | yield maybeDeferred(service.stopService) | ||
261 | 44 | 54 | ||
262 | 45 | self.assertThat( | 55 | self.assertThat( |
263 | 46 | protocol.RequestRackRefresh, MockCalledOnceWith( | 56 | protocol.RequestRackRefresh, MockCalledOnceWith( |
264 | @@ -64,7 +74,8 @@ class TestRackNetworksMonitoringService(MAASTestCase): | |||
265 | 64 | 74 | ||
266 | 65 | rpc_service = services.getServiceNamed('rpc') | 75 | rpc_service = services.getServiceNamed('rpc') |
267 | 66 | service = RackNetworksMonitoringService( | 76 | service = RackNetworksMonitoringService( |
269 | 67 | rpc_service, Clock(), enable_monitoring=False) | 77 | rpc_service, Clock(), enable_monitoring=False, |
270 | 78 | enable_beaconing=False) | ||
271 | 68 | service.getInterfaces = lambda: succeed(interfaces) | 79 | service.getInterfaces = lambda: succeed(interfaces) |
272 | 69 | # Put something in the cache. This tells recordInterfaces that refresh | 80 | # Put something in the cache. This tells recordInterfaces that refresh |
273 | 70 | # has already run but the interfaces have changed thus they need to be | 81 | # has already run but the interfaces have changed thus they need to be |
274 | @@ -77,7 +88,57 @@ class TestRackNetworksMonitoringService(MAASTestCase): | |||
275 | 77 | self.assertThat( | 88 | self.assertThat( |
276 | 78 | protocol.UpdateInterfaces, MockCalledOnceWith( | 89 | protocol.UpdateInterfaces, MockCalledOnceWith( |
277 | 79 | protocol, system_id=rpc_service.getClient().localIdent, | 90 | protocol, system_id=rpc_service.getClient().localIdent, |
279 | 80 | interfaces=interfaces)) | 91 | interfaces=interfaces, topology_hints=None)) |
280 | 92 | |||
281 | 93 | @inlineCallbacks | ||
282 | 94 | def test_reports_interfaces_with_hints_if_beaconing_enabled(self): | ||
283 | 95 | fixture = self.useFixture(MockLiveClusterToRegionRPCFixture()) | ||
284 | 96 | protocol, connecting = fixture.makeEventLoop(region.UpdateInterfaces) | ||
285 | 97 | # Don't actually wait for beaconing to complete. | ||
286 | 98 | pause_mock = self.patch(services_module, 'pause') | ||
287 | 99 | queue_mcast_mock = self.patch( | ||
288 | 100 | services_module.BeaconingSocketProtocol, 'queueMulticastBeaconing') | ||
289 | 101 | self.addCleanup((yield connecting)) | ||
290 | 102 | |||
291 | 103 | interfaces = { | ||
292 | 104 | "eth0": { | ||
293 | 105 | "type": "physical", | ||
294 | 106 | "mac_address": factory.make_mac_address(), | ||
295 | 107 | "parents": [], | ||
296 | 108 | "links": [], | ||
297 | 109 | "enabled": True, | ||
298 | 110 | } | ||
299 | 111 | } | ||
300 | 112 | |||
301 | 113 | rpc_service = services.getServiceNamed('rpc') | ||
302 | 114 | service = RackNetworksMonitoringService( | ||
303 | 115 | rpc_service, Clock(), enable_monitoring=False, | ||
304 | 116 | enable_beaconing=True) | ||
305 | 117 | service.getInterfaces = lambda: succeed(interfaces) | ||
306 | 118 | # Put something in the cache. This tells recordInterfaces that refresh | ||
307 | 119 | # has already run but the interfaces have changed thus they need to be | ||
308 | 120 | # updated. | ||
309 | 121 | service._recorded = {} | ||
310 | 122 | |||
311 | 123 | service.startService() | ||
312 | 124 | yield service.stopService() | ||
313 | 125 | |||
314 | 126 | self.assertThat( | ||
315 | 127 | protocol.UpdateInterfaces, MockCalledOnceWith( | ||
316 | 128 | protocol, system_id=rpc_service.getClient().localIdent, | ||
317 | 129 | interfaces=interfaces, topology_hints=[])) | ||
318 | 130 | # The service should have sent out beacons, waited three seconds, | ||
319 | 131 | # solicited for more beacons, then waited another three seconds before | ||
320 | 132 | # deciding that beaconing is complete. | ||
321 | 133 | self.assertThat(pause_mock, MockCallsMatch(call(3.0), call(3.0))) | ||
322 | 134 | self.assertThat( | ||
323 | 135 | queue_mcast_mock, MockCallsMatch( | ||
324 | 136 | # Called when the service starts. | ||
325 | 137 | call(solicitation=True), | ||
326 | 138 | # Called three seconds later. | ||
327 | 139 | call(solicitation=True), | ||
328 | 140 | # Not called again when the service shuts down. | ||
329 | 141 | )) | ||
330 | 81 | 142 | ||
331 | 82 | @inlineCallbacks | 143 | @inlineCallbacks |
332 | 83 | def test_reports_neighbours_to_region(self): | 144 | def test_reports_neighbours_to_region(self): |
333 | @@ -87,7 +148,8 @@ class TestRackNetworksMonitoringService(MAASTestCase): | |||
334 | 87 | self.addCleanup((yield connecting)) | 148 | self.addCleanup((yield connecting)) |
335 | 88 | rpc_service = services.getServiceNamed('rpc') | 149 | rpc_service = services.getServiceNamed('rpc') |
336 | 89 | service = RackNetworksMonitoringService( | 150 | service = RackNetworksMonitoringService( |
338 | 90 | rpc_service, Clock(), enable_monitoring=False) | 151 | rpc_service, Clock(), enable_monitoring=False, |
339 | 152 | enable_beaconing=False) | ||
340 | 91 | neighbours = [{"ip": factory.make_ip_address()}] | 153 | neighbours = [{"ip": factory.make_ip_address()}] |
341 | 92 | yield service.reportNeighbours(neighbours) | 154 | yield service.reportNeighbours(neighbours) |
342 | 93 | self.assertThat( | 155 | self.assertThat( |
343 | @@ -103,7 +165,8 @@ class TestRackNetworksMonitoringService(MAASTestCase): | |||
344 | 103 | self.addCleanup((yield connecting)) | 165 | self.addCleanup((yield connecting)) |
345 | 104 | rpc_service = services.getServiceNamed('rpc') | 166 | rpc_service = services.getServiceNamed('rpc') |
346 | 105 | service = RackNetworksMonitoringService( | 167 | service = RackNetworksMonitoringService( |
348 | 106 | rpc_service, Clock(), enable_monitoring=False) | 168 | rpc_service, Clock(), enable_monitoring=False, |
349 | 169 | enable_beaconing=False) | ||
350 | 107 | mdns = [ | 170 | mdns = [ |
351 | 108 | { | 171 | { |
352 | 109 | 'interface': 'eth0', | 172 | 'interface': 'eth0', |
353 | @@ -126,7 +189,8 @@ class TestRackNetworksMonitoringService(MAASTestCase): | |||
354 | 126 | rpc_service = services.getServiceNamed('rpc') | 189 | rpc_service = services.getServiceNamed('rpc') |
355 | 127 | reactor = Clock() | 190 | reactor = Clock() |
356 | 128 | service = RackNetworksMonitoringService( | 191 | service = RackNetworksMonitoringService( |
358 | 129 | rpc_service, reactor, enable_monitoring=False) | 192 | rpc_service, reactor, enable_monitoring=False, |
359 | 193 | enable_beaconing=False) | ||
360 | 130 | protocol.GetDiscoveryState.return_value = {'interfaces': {}} | 194 | protocol.GetDiscoveryState.return_value = {'interfaces': {}} |
361 | 131 | # Put something in the cache. This tells recordInterfaces that refresh | 195 | # Put something in the cache. This tells recordInterfaces that refresh |
362 | 132 | # has already run but the interfaces have changed thus they need to be | 196 | # has already run but the interfaces have changed thus they need to be |
363 | diff --git a/src/provisioningserver/rpc/region.py b/src/provisioningserver/rpc/region.py | |||
364 | index ab16e4b..58c43ab 100644 | |||
365 | --- a/src/provisioningserver/rpc/region.py | |||
366 | +++ b/src/provisioningserver/rpc/region.py | |||
367 | @@ -460,6 +460,7 @@ class UpdateInterfaces(amp.Command): | |||
368 | 460 | arguments = [ | 460 | arguments = [ |
369 | 461 | (b'system_id', amp.Unicode()), | 461 | (b'system_id', amp.Unicode()), |
370 | 462 | (b'interfaces', StructureAsJSON()), | 462 | (b'interfaces', StructureAsJSON()), |
371 | 463 | (b'topology_hints', StructureAsJSON(optional=True)), | ||
372 | 463 | ] | 464 | ] |
373 | 464 | response = [] | 465 | response = [] |
374 | 465 | errors = [] | 466 | errors = [] |
375 | diff --git a/src/provisioningserver/utils/services.py b/src/provisioningserver/utils/services.py | |||
376 | index 5a43922..cb5c81b 100644 | |||
377 | --- a/src/provisioningserver/utils/services.py | |||
378 | +++ b/src/provisioningserver/utils/services.py | |||
379 | @@ -28,7 +28,6 @@ from provisioningserver.logger import ( | |||
380 | 28 | get_maas_logger, | 28 | get_maas_logger, |
381 | 29 | LegacyLogger, | 29 | LegacyLogger, |
382 | 30 | ) | 30 | ) |
383 | 31 | from provisioningserver.rpc.exceptions import NoConnectionsAvailable | ||
384 | 32 | from provisioningserver.utils.beaconing import ( | 31 | from provisioningserver.utils.beaconing import ( |
385 | 33 | age_out_uuid_queue, | 32 | age_out_uuid_queue, |
386 | 34 | BEACON_IPV4_MULTICAST, | 33 | BEACON_IPV4_MULTICAST, |
387 | @@ -52,7 +51,7 @@ from provisioningserver.utils.shell import select_c_utf8_bytes_locale | |||
388 | 52 | from provisioningserver.utils.twisted import ( | 51 | from provisioningserver.utils.twisted import ( |
389 | 53 | callOut, | 52 | callOut, |
390 | 54 | deferred, | 53 | deferred, |
392 | 55 | suppress, | 54 | pause, |
393 | 56 | terminateProcess, | 55 | terminateProcess, |
394 | 57 | ) | 56 | ) |
395 | 58 | from twisted.application.internet import TimerService | 57 | from twisted.application.internet import TimerService |
396 | @@ -425,9 +424,15 @@ def join_ipv6_beacon_group(sock, ifindex): | |||
397 | 425 | socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST) + | 424 | socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST) + |
398 | 426 | struct.pack("I", ifindex) | 425 | struct.pack("I", ifindex) |
399 | 427 | ) | 426 | ) |
403 | 428 | sock.setsockopt( | 427 | try: |
404 | 429 | socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, | 428 | sock.setsockopt( |
405 | 430 | ipv6_join_sockopt_args) | 429 | socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, |
406 | 430 | ipv6_join_sockopt_args) | ||
407 | 431 | except OSError: | ||
408 | 432 | # Do this on a best-effort basis. We might get an "Address already in | ||
409 | 433 | # use" error if the group is already joined, or (for whatever reason) | ||
410 | 434 | # it is not possible to join a multicast group using this interface. | ||
411 | 435 | pass | ||
412 | 431 | 436 | ||
413 | 432 | 437 | ||
414 | 433 | def set_ipv6_multicast_loopback(sock, loopback): | 438 | def set_ipv6_multicast_loopback(sock, loopback): |
415 | @@ -472,6 +477,7 @@ class BeaconingSocketProtocol(DatagramProtocol): | |||
416 | 472 | self.topology_hints = OrderedDict() | 477 | self.topology_hints = OrderedDict() |
417 | 473 | self.listen_port = None | 478 | self.listen_port = None |
418 | 474 | self.mcast_requested = False | 479 | self.mcast_requested = False |
419 | 480 | self.mcast_solicitation = False | ||
420 | 475 | self.last_solicited_mcast = 0 | 481 | self.last_solicited_mcast = 0 |
421 | 476 | self._join_multicast_groups() | 482 | self._join_multicast_groups() |
422 | 477 | 483 | ||
423 | @@ -482,8 +488,9 @@ class BeaconingSocketProtocol(DatagramProtocol): | |||
424 | 482 | verifyObject(IReactorMulticast, self.reactor) | 488 | verifyObject(IReactorMulticast, self.reactor) |
425 | 483 | except DoesNotImplement: | 489 | except DoesNotImplement: |
426 | 484 | return | 490 | return |
429 | 485 | self.listen_port = self.reactor.listenMulticast( | 491 | if self.listen_port is None: |
430 | 486 | self.port, self, interface=self.interface, listenMultiple=True) | 492 | self.listen_port = self.reactor.listenMulticast( |
431 | 493 | self.port, self, interface=self.interface, listenMultiple=True) | ||
432 | 487 | sock = self.transport.getHandle() | 494 | sock = self.transport.getHandle() |
433 | 488 | if self.loopback is True: | 495 | if self.loopback is True: |
434 | 489 | # This is only necessary for testing. | 496 | # This is only necessary for testing. |
435 | @@ -512,6 +519,34 @@ class BeaconingSocketProtocol(DatagramProtocol): | |||
436 | 512 | self.interfaces = interfaces | 519 | self.interfaces = interfaces |
437 | 513 | self._join_multicast_groups() | 520 | self._join_multicast_groups() |
438 | 514 | 521 | ||
439 | 522 | def getAllTopologyHints(self): | ||
440 | 523 | """Returns the set of unique topology hints.""" | ||
441 | 524 | # When beaconing runs, hints attached to individual packets might | ||
442 | 525 | # come to the same conclusion about the implied fabric connectivity. | ||
443 | 526 | # Use a set to prevent the region from processing duplicate hints. | ||
444 | 527 | all_hints = set() | ||
445 | 528 | for hints in self.topology_hints.values(): | ||
446 | 529 | all_hints |= hints | ||
447 | 530 | return all_hints | ||
448 | 531 | |||
449 | 532 | def getJSONTopologyHints(self): | ||
450 | 533 | """Returns all topology hints as a list of dictionaries. | ||
451 | 534 | |||
452 | 535 | This method is used for sending data via the RPC layer, so be cautious | ||
453 | 536 | when modifying. In addition, keys with no value are filtered out | ||
454 | 537 | of the resulting dictionary, so that the hints are smaller on the wire. | ||
455 | 538 | """ | ||
456 | 539 | all_hints = self.getAllTopologyHints() | ||
457 | 540 | json_hints = [ | ||
458 | 541 | { | ||
459 | 542 | key: value | ||
460 | 543 | for key, value in hint._asdict().items() | ||
461 | 544 | if value is not None | ||
462 | 545 | } | ||
463 | 546 | for hint in all_hints | ||
464 | 547 | ] | ||
465 | 548 | return json_hints | ||
466 | 549 | |||
467 | 515 | def stopProtocol(self): | 550 | def stopProtocol(self): |
468 | 516 | super().stopProtocol() | 551 | super().stopProtocol() |
469 | 517 | if self.listen_port is not None: | 552 | if self.listen_port is not None: |
470 | @@ -629,27 +664,36 @@ class BeaconingSocketProtocol(DatagramProtocol): | |||
471 | 629 | reply = create_beacon_payload("advertisement", payload) | 664 | reply = create_beacon_payload("advertisement", payload) |
472 | 630 | self.send_beacon(reply, beacon.reply_address) | 665 | self.send_beacon(reply, beacon.reply_address) |
473 | 631 | if len(self.interfaces) > 0: | 666 | if len(self.interfaces) > 0: |
475 | 632 | self.queueMulticastAdvertisement() | 667 | self.queueMulticastBeaconing() |
476 | 633 | 668 | ||
478 | 634 | def dequeueMulticastAdvertisement(self): | 669 | def dequeueMulticastBeaconing(self): |
479 | 635 | """ | 670 | """ |
480 | 636 | Callback to send multicast beacon advertisements. | 671 | Callback to send multicast beacon advertisements. |
481 | 637 | 672 | ||
482 | 638 | See `queueMulticastAdvertisement`, which schedules this method to run. | 673 | See `queueMulticastAdvertisement`, which schedules this method to run. |
483 | 639 | """ | 674 | """ |
484 | 640 | mtime = time.monotonic() | 675 | mtime = time.monotonic() |
485 | 676 | beacon_type = ( | ||
486 | 677 | 'solicitation' if self.mcast_solicitation else 'advertisement') | ||
487 | 678 | log.msg("Sending multicast beacon %ss." % beacon_type) | ||
488 | 679 | self.send_multicast_beacons(self.interfaces, beacon_type) | ||
489 | 641 | self.last_solicited_mcast = mtime | 680 | self.last_solicited_mcast = mtime |
490 | 642 | self.mcast_requested = False | 681 | self.mcast_requested = False |
493 | 643 | log.msg("Sending multicast beacon advertisements.") | 682 | self.mcast_solicitation = False |
492 | 644 | self.send_multicast_beacons(self.interfaces, 'advertisement') | ||
494 | 645 | 683 | ||
496 | 646 | def queueMulticastAdvertisement(self): | 684 | def queueMulticastBeaconing(self, solicitation=False): |
497 | 647 | """ | 685 | """ |
498 | 648 | Requests that multicast advertisements be sent out on every interface. | 686 | Requests that multicast advertisements be sent out on every interface. |
499 | 649 | 687 | ||
500 | 650 | Ensures that advertisements will not be sent more than once every | 688 | Ensures that advertisements will not be sent more than once every |
501 | 651 | five seconds. | 689 | five seconds. |
502 | 690 | |||
503 | 691 | :param solicitation: If true, sends solicitations rather than | ||
504 | 692 | advertisements. Solicitations are used to initiate "full beaconing" | ||
505 | 693 | with peers; advertisements do not generate beacon replies. | ||
506 | 652 | """ | 694 | """ |
507 | 695 | if solicitation is True: | ||
508 | 696 | self.mcast_solicitation = True | ||
509 | 653 | if self.mcast_requested: | 697 | if self.mcast_requested: |
510 | 654 | # A multicast advertisement has been requested already. | 698 | # A multicast advertisement has been requested already. |
511 | 655 | return | 699 | return |
512 | @@ -660,7 +704,7 @@ class BeaconingSocketProtocol(DatagramProtocol): | |||
513 | 660 | else: | 704 | else: |
514 | 661 | timeout = max(mtime - self.last_solicited_mcast, 5) | 705 | timeout = max(mtime - self.last_solicited_mcast, 5) |
515 | 662 | self.mcast_requested = True | 706 | self.mcast_requested = True |
517 | 663 | self.reactor.callLater(timeout, self.dequeueMulticastAdvertisement) | 707 | self.reactor.callLater(timeout, self.dequeueMulticastBeaconing) |
518 | 664 | 708 | ||
519 | 665 | def processTopologyHints(self, rx: ReceivedBeacon): | 709 | def processTopologyHints(self, rx: ReceivedBeacon): |
520 | 666 | """ | 710 | """ |
521 | @@ -692,9 +736,7 @@ class BeaconingSocketProtocol(DatagramProtocol): | |||
522 | 692 | self.topology_hints[rx.uuid] = hints | 736 | self.topology_hints[rx.uuid] = hints |
523 | 693 | # XXX mpontillo 2017-08-07: temporary logging | 737 | # XXX mpontillo 2017-08-07: temporary logging |
524 | 694 | log.msg("New topology hints [%s]:\n%s" % (rx.uuid, pformat(hints))) | 738 | log.msg("New topology hints [%s]:\n%s" % (rx.uuid, pformat(hints))) |
528 | 695 | all_hints = set() | 739 | all_hints = self.getAllTopologyHints() |
526 | 696 | for hints in self.topology_hints.values(): | ||
527 | 697 | all_hints |= hints | ||
529 | 698 | log.msg("Topology hint summary:\n%s" % pformat(all_hints)) | 740 | log.msg("Topology hint summary:\n%s" % pformat(all_hints)) |
530 | 699 | 741 | ||
531 | 700 | def _add_remote_fabric_hints(self, hints, remote_ifinfo, rx): | 742 | def _add_remote_fabric_hints(self, hints, remote_ifinfo, rx): |
532 | @@ -905,7 +947,8 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): | |||
533 | 905 | 947 | ||
534 | 906 | interval = timedelta(seconds=30).total_seconds() | 948 | interval = timedelta(seconds=30).total_seconds() |
535 | 907 | 949 | ||
537 | 908 | def __init__(self, clock=None, enable_monitoring=True): | 950 | def __init__( |
538 | 951 | self, clock=None, enable_monitoring=True, enable_beaconing=True): | ||
539 | 909 | # Order is very important here. First we set the clock to the passed-in | 952 | # Order is very important here. First we set the clock to the passed-in |
540 | 910 | # reactor, so that unit tests can fake out the clock if necessary. | 953 | # reactor, so that unit tests can fake out the clock if necessary. |
541 | 911 | # Then we call super(). The superclass will set up the structures | 954 | # Then we call super(). The superclass will set up the structures |
542 | @@ -916,9 +959,11 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): | |||
543 | 916 | self.clock = clock | 959 | self.clock = clock |
544 | 917 | super().__init__() | 960 | super().__init__() |
545 | 918 | self.enable_monitoring = enable_monitoring | 961 | self.enable_monitoring = enable_monitoring |
546 | 962 | self.enable_beaconing = enable_beaconing | ||
547 | 919 | # The last successfully recorded interfaces. | 963 | # The last successfully recorded interfaces. |
548 | 920 | self._recorded = None | 964 | self._recorded = None |
549 | 921 | self._monitored = frozenset() | 965 | self._monitored = frozenset() |
550 | 966 | self._beaconing = frozenset() | ||
551 | 922 | self._monitoring_state = {} | 967 | self._monitoring_state = {} |
552 | 923 | self._monitoring_mdns = False | 968 | self._monitoring_mdns = False |
553 | 924 | self._locked = False | 969 | self._locked = False |
554 | @@ -935,33 +980,25 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): | |||
555 | 935 | self.interface_monitor.setServiceParent(self) | 980 | self.interface_monitor.setServiceParent(self) |
556 | 936 | self.beaconing_protocol = None | 981 | self.beaconing_protocol = None |
557 | 937 | 982 | ||
558 | 983 | @inlineCallbacks | ||
559 | 938 | def updateInterfaces(self): | 984 | def updateInterfaces(self): |
560 | 939 | """Update interfaces, catching and logging errors. | 985 | """Update interfaces, catching and logging errors. |
561 | 940 | 986 | ||
562 | 941 | This can be overridden by subclasses to conditionally update based on | 987 | This can be overridden by subclasses to conditionally update based on |
563 | 942 | some external configuration. | 988 | some external configuration. |
564 | 943 | """ | 989 | """ |
586 | 944 | d = maybeDeferred(self._assumeSoleResponsibility) | 990 | responsible = self._assumeSoleResponsibility() |
587 | 945 | 991 | if responsible: | |
588 | 946 | def update(responsible): | 992 | interfaces = None |
589 | 947 | if responsible: | 993 | try: |
590 | 948 | d = maybeDeferred(self.getInterfaces) | 994 | interfaces = yield maybeDeferred(self.getInterfaces) |
591 | 949 | d.addCallback(self._updateInterfaces) | 995 | yield self._updateInterfaces(interfaces) |
592 | 950 | return d | 996 | except BaseException as e: |
593 | 951 | 997 | msg = ( | |
594 | 952 | def failed(failure): | 998 | "Failed to update and/or record network interface " |
595 | 953 | log.err( | 999 | "configuration: %s; interfaces: %r" % (e, interfaces) |
596 | 954 | failure, | 1000 | ) |
597 | 955 | "Failed to update and/or record network interface " | 1001 | log.err(None, msg) |
577 | 956 | "configuration: %s" % failure.getErrorMessage()) | ||
578 | 957 | |||
579 | 958 | d = d.addCallback(update) | ||
580 | 959 | # During the update, we might fail to get the interface monitoring | ||
581 | 960 | # state from the region. We can safely ignore this, as it will be | ||
582 | 961 | # retried shortly. | ||
583 | 962 | d.addErrback(suppress, NoConnectionsAvailable) | ||
584 | 963 | d.addErrback(failed) | ||
585 | 964 | return d | ||
598 | 965 | 1002 | ||
599 | 966 | def getInterfaces(self): | 1003 | def getInterfaces(self): |
600 | 967 | """Get the current network interfaces configuration. | 1004 | """Get the current network interfaces configuration. |
601 | @@ -978,7 +1015,7 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): | |||
602 | 978 | """ | 1015 | """ |
603 | 979 | 1016 | ||
604 | 980 | @abstractmethod | 1017 | @abstractmethod |
606 | 981 | def recordInterfaces(self, interfaces): | 1018 | def recordInterfaces(self, interfaces, hints=None): |
607 | 982 | """Record the interfaces information. | 1019 | """Record the interfaces information. |
608 | 983 | 1020 | ||
609 | 984 | This MUST be overridden in subclasses. | 1021 | This MUST be overridden in subclasses. |
610 | @@ -1050,21 +1087,48 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): | |||
611 | 1050 | # If we were monitoring neighbours on any interfaces, we need to | 1087 | # If we were monitoring neighbours on any interfaces, we need to |
612 | 1051 | # stop the monitoring services. | 1088 | # stop the monitoring services. |
613 | 1052 | self._configureNetworkDiscovery({}) | 1089 | self._configureNetworkDiscovery({}) |
614 | 1090 | if self.beaconing_protocol is not None: | ||
615 | 1091 | self._configureBeaconing({}) | ||
616 | 1053 | 1092 | ||
617 | 1093 | @inlineCallbacks | ||
618 | 1054 | def _updateInterfaces(self, interfaces): | 1094 | def _updateInterfaces(self, interfaces): |
619 | 1055 | """Record `interfaces` if they've changed.""" | 1095 | """Record `interfaces` if they've changed.""" |
620 | 1056 | if interfaces != self._recorded: | 1096 | if interfaces != self._recorded: |
622 | 1057 | d = maybeDeferred(self.recordInterfaces, interfaces) | 1097 | hints = None |
623 | 1098 | if self.enable_beaconing: | ||
624 | 1099 | self._configureBeaconing(interfaces) | ||
625 | 1100 | # Wait for beaconing to do its thing. | ||
626 | 1101 | yield pause(3.0) | ||
627 | 1102 | # Retry beacon soliciations, in case any packet loss occurred | ||
628 | 1103 | # the first time. | ||
629 | 1104 | self.beaconing_protocol.queueMulticastBeaconing( | ||
630 | 1105 | solicitation=True) | ||
631 | 1106 | yield pause(3.0) | ||
632 | 1107 | hints = self.beaconing_protocol.getJSONTopologyHints() | ||
633 | 1108 | yield maybeDeferred(self.recordInterfaces, interfaces, hints) | ||
634 | 1058 | # Note: _interfacesRecorded() will reconfigure discovery after | 1109 | # Note: _interfacesRecorded() will reconfigure discovery after |
635 | 1059 | # recording the interfaces, so there is no need to call | 1110 | # recording the interfaces, so there is no need to call |
636 | 1060 | # _configureNetworkDiscovery() here. | 1111 | # _configureNetworkDiscovery() here. |
639 | 1061 | d.addCallback(callOut, self._interfacesRecorded, interfaces) | 1112 | self._interfacesRecorded(interfaces) |
638 | 1062 | return d | ||
640 | 1063 | else: | 1113 | else: |
641 | 1064 | # If the interfaces didn't change, we still need to poll for | 1114 | # If the interfaces didn't change, we still need to poll for |
642 | 1065 | # monitoring state changes. | 1115 | # monitoring state changes. |
645 | 1066 | d = maybeDeferred(self._configureNetworkDiscovery, interfaces) | 1116 | yield maybeDeferred(self._configureNetworkDiscovery, interfaces) |
646 | 1067 | return d | 1117 | |
647 | 1118 | def _getInterfacesForBeaconing(self, interfaces: dict): | ||
648 | 1119 | """Return the interfaces which will be used for beaconing. | ||
649 | 1120 | |||
650 | 1121 | :return: The set of interface names to run beaconing on. | ||
651 | 1122 | """ | ||
652 | 1123 | # Don't beacon when running the test suite/dev env. | ||
653 | 1124 | # In addition, if we don't own the lock, we should not be beaconing. | ||
654 | 1125 | if is_dev_environment() or not self._locked or interfaces is None: | ||
655 | 1126 | return set() | ||
656 | 1127 | monitored_interfaces = { | ||
657 | 1128 | ifname for ifname, ifdata in interfaces.items() | ||
658 | 1129 | if ifdata['monitored'] is True | ||
659 | 1130 | } | ||
660 | 1131 | return monitored_interfaces | ||
661 | 1068 | 1132 | ||
662 | 1069 | def _getInterfacesForNeighbourDiscovery( | 1133 | def _getInterfacesForNeighbourDiscovery( |
663 | 1070 | self, interfaces: dict, monitoring_state: dict): | 1134 | self, interfaces: dict, monitoring_state: dict): |
664 | @@ -1227,6 +1291,32 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): | |||
665 | 1227 | # doesn't matter for mDNS discovery purposes.) | 1291 | # doesn't matter for mDNS discovery purposes.) |
666 | 1228 | pass | 1292 | pass |
667 | 1229 | 1293 | ||
668 | 1294 | def _configureBeaconing(self, interfaces): | ||
669 | 1295 | beaconing_interfaces = self._getInterfacesForBeaconing(interfaces) | ||
670 | 1296 | # Calculate the difference between the sets. We need to know which | ||
671 | 1297 | # interfaces were added and deleted (with respect to the interfaces we | ||
672 | 1298 | # were already beaconing on). | ||
673 | 1299 | new_interfaces = beaconing_interfaces.difference(self._beaconing) | ||
674 | 1300 | deleted_interfaces = self._beaconing.difference(beaconing_interfaces) | ||
675 | 1301 | if len(new_interfaces) > 0: | ||
676 | 1302 | log.msg("Starting beaconing for interfaces: %r" % (new_interfaces)) | ||
677 | 1303 | self._startBeaconingServices(new_interfaces) | ||
678 | 1304 | if len(deleted_interfaces) > 0: | ||
679 | 1305 | log.msg( | ||
680 | 1306 | "Stopping beaconing for interfaces: %r" % (deleted_interfaces)) | ||
681 | 1307 | self._stopBeaconingServices(deleted_interfaces) | ||
682 | 1308 | self._beaconing = beaconing_interfaces | ||
683 | 1309 | if self.beaconing_protocol is None: | ||
684 | 1310 | self.beaconing_protocol = BeaconingSocketProtocol( | ||
685 | 1311 | self.clock, interfaces=interfaces) | ||
686 | 1312 | else: | ||
687 | 1313 | self.beaconing_protocol.updateInterfaces(interfaces) | ||
688 | 1314 | # If the interfaces have changed, perform beaconing again. | ||
689 | 1315 | # An empty dictionary will be passed in when the service stops, so | ||
690 | 1316 | # don't bother sending out beacons we won't reply to. | ||
691 | 1317 | if len(interfaces) > 0: | ||
692 | 1318 | self.beaconing_protocol.queueMulticastBeaconing(solicitation=True) | ||
693 | 1319 | |||
694 | 1230 | def _configureNeighbourDiscovery(self, interfaces, monitoring_state): | 1320 | def _configureNeighbourDiscovery(self, interfaces, monitoring_state): |
695 | 1231 | monitored_interfaces = self._getInterfacesForNeighbourDiscovery( | 1321 | monitored_interfaces = self._getInterfacesForNeighbourDiscovery( |
696 | 1232 | interfaces, monitoring_state) | 1322 | interfaces, monitoring_state) |
697 | @@ -1239,26 +1329,15 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): | |||
698 | 1239 | log.msg("Starting neighbour discovery for interfaces: %r" % ( | 1329 | log.msg("Starting neighbour discovery for interfaces: %r" % ( |
699 | 1240 | new_interfaces)) | 1330 | new_interfaces)) |
700 | 1241 | self._startNeighbourDiscoveryServices(new_interfaces) | 1331 | self._startNeighbourDiscoveryServices(new_interfaces) |
701 | 1242 | # XXX mpontillo 2017-07-12: for testing, just start beaconing | ||
702 | 1243 | # services on all the interfaces enabled for active discovery. | ||
703 | 1244 | self._startBeaconingServices(new_interfaces) | ||
704 | 1245 | if len(deleted_interfaces) > 0: | 1332 | if len(deleted_interfaces) > 0: |
705 | 1246 | log.msg( | 1333 | log.msg( |
706 | 1247 | "Stopping neighbour discovery for interfaces: %r" % ( | 1334 | "Stopping neighbour discovery for interfaces: %r" % ( |
707 | 1248 | deleted_interfaces)) | 1335 | deleted_interfaces)) |
708 | 1249 | self._stopNeighbourDiscoveryServices(deleted_interfaces) | 1336 | self._stopNeighbourDiscoveryServices(deleted_interfaces) |
709 | 1250 | # XXX mpontillo 2017-07-12: this should be separately configured. | ||
710 | 1251 | # (see similar comment in the 'start' path above.) | ||
711 | 1252 | self._stopBeaconingServices(deleted_interfaces) | ||
712 | 1253 | self._monitored = monitored_interfaces | 1337 | self._monitored = monitored_interfaces |
713 | 1254 | 1338 | ||
714 | 1255 | def _interfacesRecorded(self, interfaces): | 1339 | def _interfacesRecorded(self, interfaces): |
715 | 1256 | """The given `interfaces` were recorded successfully.""" | 1340 | """The given `interfaces` were recorded successfully.""" |
716 | 1257 | self._recorded = interfaces | 1341 | self._recorded = interfaces |
717 | 1258 | if self.beaconing_protocol is None: | ||
718 | 1259 | self.beaconing_protocol = BeaconingSocketProtocol( | ||
719 | 1260 | self.clock, interfaces=interfaces) | ||
720 | 1261 | else: | ||
721 | 1262 | self.beaconing_protocol.updateInterfaces(interfaces) | ||
722 | 1263 | if self.enable_monitoring is True: | 1342 | if self.enable_monitoring is True: |
723 | 1264 | self._configureNetworkDiscovery(interfaces) | 1343 | self._configureNetworkDiscovery(interfaces) |
724 | diff --git a/src/provisioningserver/utils/tests/test_services.py b/src/provisioningserver/utils/tests/test_services.py | |||
725 | index acfc60e..36090ea 100644 | |||
726 | --- a/src/provisioningserver/utils/tests/test_services.py | |||
727 | +++ b/src/provisioningserver/utils/tests/test_services.py | |||
728 | @@ -76,8 +76,12 @@ from twisted.python.failure import Failure | |||
729 | 76 | class StubNetworksMonitoringService(NetworksMonitoringService): | 76 | class StubNetworksMonitoringService(NetworksMonitoringService): |
730 | 77 | """Concrete subclass for testing.""" | 77 | """Concrete subclass for testing.""" |
731 | 78 | 78 | ||
734 | 79 | def __init__(self, enable_monitoring=False, *args, **kwargs): | 79 | def __init__( |
735 | 80 | super().__init__(enable_monitoring=enable_monitoring, *args, **kwargs) | 80 | self, enable_monitoring=False, enable_beaconing=False, |
736 | 81 | *args, **kwargs): | ||
737 | 82 | super().__init__( | ||
738 | 83 | *args, enable_monitoring=enable_monitoring, | ||
739 | 84 | enable_beaconing=enable_beaconing, **kwargs) | ||
740 | 81 | self.iterations = DeferredQueue() | 85 | self.iterations = DeferredQueue() |
741 | 82 | self.interfaces = [] | 86 | self.interfaces = [] |
742 | 83 | self.update_interface__calls = 0 | 87 | self.update_interface__calls = 0 |
743 | @@ -91,7 +95,7 @@ class StubNetworksMonitoringService(NetworksMonitoringService): | |||
744 | 91 | d.addBoth(self.iterations.put) | 95 | d.addBoth(self.iterations.put) |
745 | 92 | return d | 96 | return d |
746 | 93 | 97 | ||
748 | 94 | def recordInterfaces(self, interfaces): | 98 | def recordInterfaces(self, interfaces, hints=None): |
749 | 95 | self.interfaces.append(interfaces) | 99 | self.interfaces.append(interfaces) |
750 | 96 | 100 | ||
751 | 97 | def reportNeighbours(self, neighbours): | 101 | def reportNeighbours(self, neighbours): |
752 | @@ -221,13 +225,13 @@ class TestNetworksMonitoringService(MAASTestCase): | |||
753 | 221 | # recordInterfaces is called the first time, as expected. | 225 | # recordInterfaces is called the first time, as expected. |
754 | 222 | recordInterfaces.reset_mock() | 226 | recordInterfaces.reset_mock() |
755 | 223 | yield service.updateInterfaces() | 227 | yield service.updateInterfaces() |
757 | 224 | self.assertThat(recordInterfaces, MockCalledOnceWith({})) | 228 | self.assertThat(recordInterfaces, MockCalledOnceWith({}, None)) |
758 | 225 | 229 | ||
759 | 226 | # recordInterfaces is called the second time too; the service noted | 230 | # recordInterfaces is called the second time too; the service noted |
760 | 227 | # that it crashed last time and knew to run it again. | 231 | # that it crashed last time and knew to run it again. |
761 | 228 | recordInterfaces.reset_mock() | 232 | recordInterfaces.reset_mock() |
762 | 229 | yield service.updateInterfaces() | 233 | yield service.updateInterfaces() |
764 | 230 | self.assertThat(recordInterfaces, MockCalledOnceWith({})) | 234 | self.assertThat(recordInterfaces, MockCalledOnceWith({}, None)) |
765 | 231 | 235 | ||
766 | 232 | # recordInterfaces is NOT called the third time; the service noted | 236 | # recordInterfaces is NOT called the third time; the service noted |
767 | 233 | # that the configuration had not changed. | 237 | # that the configuration had not changed. |
768 | @@ -1004,3 +1008,128 @@ class TestBeaconingSocketProtocol(SharedSecretTestCase): | |||
769 | 1004 | } | 1008 | } |
770 | 1005 | self.assertThat(hints, Equals(expected_hints)) | 1009 | self.assertThat(hints, Equals(expected_hints)) |
771 | 1006 | yield protocol.stopProtocol() | 1010 | yield protocol.stopProtocol() |
772 | 1011 | |||
773 | 1012 | @inlineCallbacks | ||
774 | 1013 | def test__getJSONTopologyHints_converts_hints_to_dictionary(self): | ||
775 | 1014 | # Note: Always use a random port for testing. (port=0) | ||
776 | 1015 | protocol = BeaconingSocketProtocol( | ||
777 | 1016 | reactor, port=0, process_incoming=False, loopback=True, | ||
778 | 1017 | interface="::", debug=True) | ||
779 | 1018 | # Don't try to send out any replies. | ||
780 | 1019 | self.patch(services, 'create_beacon_payload') | ||
781 | 1020 | self.patch(protocol, 'send_beacon') | ||
782 | 1021 | # Need to generate a real UUID with the current time, so it doesn't | ||
783 | 1022 | # get aged out. | ||
784 | 1023 | uuid = str(uuid1()) | ||
785 | 1024 | # Make the protocol think we sent a beacon with this UUID already. | ||
786 | 1025 | tx_mac = factory.make_mac_address() | ||
787 | 1026 | fake_tx_beacon = FakeBeaconPayload( | ||
788 | 1027 | uuid, ifname='eth1', mac=tx_mac, vid=100) | ||
789 | 1028 | fake_rx_beacon = { | ||
790 | 1029 | "source_ip": "127.0.0.1", | ||
791 | 1030 | "source_port": 5240, | ||
792 | 1031 | "destination_ip": "224.0.0.118", | ||
793 | 1032 | "interface": "eth0", | ||
794 | 1033 | "type": "solicitation", | ||
795 | 1034 | "payload": fake_tx_beacon.payload | ||
796 | 1035 | } | ||
797 | 1036 | protocol.beaconReceived(fake_rx_beacon) | ||
798 | 1037 | all_hints = protocol.getJSONTopologyHints() | ||
799 | 1038 | expected_hints = [ | ||
800 | 1039 | # Note: since vid=None on the received beacon, we expect that | ||
801 | 1040 | # the hint won't have a 'vid' field. | ||
802 | 1041 | dict( | ||
803 | 1042 | ifname='eth0', hint="on_remote_network", | ||
804 | 1043 | related_ifname='eth1', related_vid=100, | ||
805 | 1044 | related_mac=tx_mac), | ||
806 | 1045 | ] | ||
807 | 1046 | self.assertThat(all_hints, Equals(expected_hints)) | ||
808 | 1047 | yield protocol.stopProtocol() | ||
809 | 1048 | |||
810 | 1049 | @inlineCallbacks | ||
811 | 1050 | def test__queues_multicast_beacon_soliciations_upon_request(self): | ||
812 | 1051 | # Note: Always use a random port for testing. (port=0) | ||
813 | 1052 | clock = Clock() | ||
814 | 1053 | protocol = BeaconingSocketProtocol( | ||
815 | 1054 | clock, port=0, process_incoming=False, loopback=True, | ||
816 | 1055 | interface="::", debug=True) | ||
817 | 1056 | # Don't try to send out any replies. | ||
818 | 1057 | self.patch(services, 'create_beacon_payload') | ||
819 | 1058 | send_mcast_mock = self.patch(protocol, 'send_multicast_beacons') | ||
820 | 1059 | self.patch(protocol, 'send_beacon') | ||
821 | 1060 | yield protocol.queueMulticastBeaconing(solicitation=True) | ||
822 | 1061 | clock.advance(0) | ||
823 | 1062 | self.assertThat( | ||
824 | 1063 | send_mcast_mock, MockCalledOnceWith({}, 'solicitation')) | ||
825 | 1064 | |||
826 | 1065 | @inlineCallbacks | ||
827 | 1066 | def test__multicasts_at_most_once_per_five_seconds(self): | ||
828 | 1067 | # Note: Always use a random port for testing. (port=0) | ||
829 | 1068 | clock = Clock() | ||
830 | 1069 | protocol = BeaconingSocketProtocol( | ||
831 | 1070 | clock, port=0, process_incoming=False, loopback=True, | ||
832 | 1071 | interface="::", debug=True) | ||
833 | 1072 | # Don't try to send out any replies. | ||
834 | 1073 | self.patch(services, 'create_beacon_payload') | ||
835 | 1074 | monotonic_mock = self.patch(services.time, 'monotonic') | ||
836 | 1075 | send_mcast_mock = self.patch(protocol, 'send_multicast_beacons') | ||
837 | 1076 | self.patch(protocol, 'send_beacon') | ||
838 | 1077 | monotonic_mock.side_effect = [ | ||
839 | 1078 | # Initial queue | ||
840 | 1079 | 6, | ||
841 | 1080 | # Initial dequeue | ||
842 | 1081 | 6, | ||
843 | 1082 | # Second queue (hasn't yet been 5 seconds) | ||
844 | 1083 | 10, | ||
845 | 1084 | # Third queue | ||
846 | 1085 | 11, | ||
847 | 1086 | # Second dequeue | ||
848 | 1087 | 11, | ||
849 | 1088 | ] | ||
850 | 1089 | yield protocol.queueMulticastBeaconing() | ||
851 | 1090 | clock.advance(0) | ||
852 | 1091 | self.assertThat( | ||
853 | 1092 | send_mcast_mock, MockCalledOnceWith({}, 'advertisement')) | ||
854 | 1093 | send_mcast_mock.reset_mock() | ||
855 | 1094 | yield protocol.queueMulticastBeaconing() | ||
856 | 1095 | yield protocol.queueMulticastBeaconing(solicitation=True) | ||
857 | 1096 | clock.advance(4.9) | ||
858 | 1097 | self.assertThat( | ||
859 | 1098 | send_mcast_mock, MockNotCalled()) | ||
860 | 1099 | clock.advance(0.1) | ||
861 | 1100 | self.assertThat( | ||
862 | 1101 | send_mcast_mock, MockCalledOnceWith({}, 'solicitation')) | ||
863 | 1102 | |||
864 | 1103 | @inlineCallbacks | ||
865 | 1104 | def test__multiple_beacon_requests_coalesced(self): | ||
866 | 1105 | # Note: Always use a random port for testing. (port=0) | ||
867 | 1106 | clock = Clock() | ||
868 | 1107 | protocol = BeaconingSocketProtocol( | ||
869 | 1108 | clock, port=0, process_incoming=False, loopback=True, | ||
870 | 1109 | interface="::", debug=True) | ||
871 | 1110 | # Don't try to send out any replies. | ||
872 | 1111 | self.patch(services, 'create_beacon_payload') | ||
873 | 1112 | send_mcast_mock = self.patch(protocol, 'send_multicast_beacons') | ||
874 | 1113 | self.patch(protocol, 'send_beacon') | ||
875 | 1114 | yield protocol.queueMulticastBeaconing() | ||
876 | 1115 | yield protocol.queueMulticastBeaconing() | ||
877 | 1116 | clock.advance(5) | ||
878 | 1117 | self.assertThat( | ||
879 | 1118 | send_mcast_mock, MockCalledOnceWith({}, 'advertisement')) | ||
880 | 1119 | |||
881 | 1120 | @inlineCallbacks | ||
882 | 1121 | def test__solicitation_wins_when_multiple_requests_queued(self): | ||
883 | 1122 | # Note: Always use a random port for testing. (port=0) | ||
884 | 1123 | clock = Clock() | ||
885 | 1124 | protocol = BeaconingSocketProtocol( | ||
886 | 1125 | clock, port=0, process_incoming=False, loopback=True, | ||
887 | 1126 | interface="::", debug=True) | ||
888 | 1127 | # Don't try to send out any replies. | ||
889 | 1128 | self.patch(services, 'create_beacon_payload') | ||
890 | 1129 | send_mcast_mock = self.patch(protocol, 'send_multicast_beacons') | ||
891 | 1130 | self.patch(protocol, 'send_beacon') | ||
892 | 1131 | yield protocol.queueMulticastBeaconing() | ||
893 | 1132 | yield protocol.queueMulticastBeaconing(solicitation=True) | ||
894 | 1133 | clock.advance(5) | ||
895 | 1134 | self.assertThat( | ||
896 | 1135 | send_mcast_mock, MockCalledOnceWith({}, 'solicitation')) |
Looks good. Just one question, which I am not going to block you on: is it the case that a region controller will never run beaconing? If so, why? It seems like it should.