Merge ~mpontillo/maas:beaconing--use-hints-for-interface-update into maas:master
- Git
- lp:~mpontillo/maas
- beaconing--use-hints-for-interface-update
- Merge into master
Status: | Merged |
---|---|
Approved by: | Mike Pontillo |
Approved revision: | b10bf14b5bdbfb3bb4348891079f3c175e72e43b |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~mpontillo/maas:beaconing--use-hints-for-interface-update |
Merge into: | maas:master |
Diff against target: |
896 lines (+381/-97) 13 files modified
.gitignore (+1/-1) src/maasserver/models/node.py (+2/-1) src/maasserver/regiondservices/networks_monitoring.py (+5/-4) src/maasserver/regiondservices/tests/test_networks_monitoring.py (+2/-1) src/maasserver/rpc/rackcontrollers.py (+2/-2) src/maasserver/rpc/regionservice.py (+7/-4) src/maasserver/rpc/tests/test_rackcontrollers.py (+1/-1) src/maasserver/rpc/tests/test_regionservice_calls.py (+2/-1) src/provisioningserver/rackdservices/networks_monitoring_service.py (+16/-12) src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py (+74/-10) src/provisioningserver/rpc/region.py (+1/-0) src/provisioningserver/utils/services.py (+134/-55) src/provisioningserver/utils/tests/test_services.py (+134/-5) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Blake Rouse (community) | Approve | ||
Review via email: mp+328815@code.launchpad.net |
Commit message
Send topology hints to update_interfaces()
* Add code to convert topology hints to a compact JSON format.
* Start beaconing as soon as interfaces are known; don't
wait for interface monitoring settings. This means
beaconing will happen on all interfaces, all the time.
(Without regard for network discovery settings.)
* Fix bug that caused update_interfaces() to be called
with create_fabrics=True when it should have been False.
* Wait for beaconing to complete before updating interfaces.
* Drive-by fix for incorrect line in .gitignore which
erroneously covered django16_
Description of the change
Mike Pontillo (mpontillo) wrote : | # |
Thanks for the review. The NetworksMonitor
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b beaconing-
STATUS: FAILED BUILD
LOG: http://
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b beaconing-
STATUS: FAILED BUILD
LOG: http://
Mike Pontillo (mpontillo) wrote : | # |
Fixed merge conflict with the .gitignore file, which slipped in when another branch landed.
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b beaconing-
STATUS: FAILED BUILD
LOG: http://
Preview Diff
1 | diff --git a/.gitignore b/.gitignore |
2 | index 20d7b98..714a000 100644 |
3 | --- a/.gitignore |
4 | +++ b/.gitignore |
5 | @@ -19,6 +19,7 @@ |
6 | /.noseids |
7 | /.run |
8 | /.run-e2e |
9 | +/*.orig.tar.gz |
10 | /bin |
11 | /build |
12 | /build_pkg |
13 | @@ -57,4 +58,3 @@ __pycache__ |
14 | *~ |
15 | \#*\# |
16 | .#* |
17 | -*.tar.gz |
18 | diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py |
19 | index 127669b..389f84c 100644 |
20 | --- a/src/maasserver/models/node.py |
21 | +++ b/src/maasserver/models/node.py |
22 | @@ -4657,7 +4657,8 @@ class Controller(Node): |
23 | @with_connection |
24 | @synchronised(locks.rack_registration) |
25 | @transactional |
26 | - def update_interfaces(self, interfaces, create_fabrics=True): |
27 | + def update_interfaces( |
28 | + self, interfaces, topology_hints=None, create_fabrics=True): |
29 | """Update the interfaces attached to the controller. |
30 | |
31 | :param interfaces: Interfaces dictionary that was parsed from |
32 | diff --git a/src/maasserver/regiondservices/networks_monitoring.py b/src/maasserver/regiondservices/networks_monitoring.py |
33 | index 9427bef..9cf313a 100644 |
34 | --- a/src/maasserver/regiondservices/networks_monitoring.py |
35 | +++ b/src/maasserver/regiondservices/networks_monitoring.py |
36 | @@ -23,9 +23,10 @@ class RegionNetworksMonitoringService(NetworksMonitoringService): |
37 | """Get interface monitoring state from the region.""" |
38 | return deferToDatabase(self.getInterfaceMonitoringStateFromDatabase) |
39 | |
40 | - def recordInterfaces(self, interfaces): |
41 | + def recordInterfaces(self, interfaces, hints=None): |
42 | """Record the interfaces information.""" |
43 | - return deferToDatabase(self.recordInterfacesIntoDatabase, interfaces) |
44 | + return deferToDatabase( |
45 | + self.recordInterfacesIntoDatabase, interfaces, hints) |
46 | |
47 | def reportNeighbours(self, neighbours): |
48 | """Record the specified list of neighbours.""" |
49 | @@ -42,10 +43,10 @@ class RegionNetworksMonitoringService(NetworksMonitoringService): |
50 | return region_controller.get_discovery_state() |
51 | |
52 | @transactional |
53 | - def recordInterfacesIntoDatabase(self, interfaces): |
54 | + def recordInterfacesIntoDatabase(self, interfaces, hints): |
55 | """Record the interfaces information.""" |
56 | region_controller = RegionController.objects.get_running_controller() |
57 | - region_controller.update_interfaces(interfaces) |
58 | + region_controller.update_interfaces(interfaces, hints) |
59 | |
60 | @transactional |
61 | def recordNeighboursIntoDatabase(self, neighbours): |
62 | diff --git a/src/maasserver/regiondservices/tests/test_networks_monitoring.py b/src/maasserver/regiondservices/tests/test_networks_monitoring.py |
63 | index 62933ba..e49e34b 100644 |
64 | --- a/src/maasserver/regiondservices/tests/test_networks_monitoring.py |
65 | +++ b/src/maasserver/regiondservices/tests/test_networks_monitoring.py |
66 | @@ -52,7 +52,8 @@ class TestRegionNetworksMonitoringService(MAASTransactionServerTestCase): |
67 | } |
68 | } |
69 | |
70 | - service = RegionNetworksMonitoringService(reactor) |
71 | + service = RegionNetworksMonitoringService( |
72 | + reactor, enable_beaconing=False) |
73 | service.getInterfaces = lambda: succeed(interfaces) |
74 | |
75 | with FakeLogger("maas") as logger: |
76 | diff --git a/src/maasserver/rpc/rackcontrollers.py b/src/maasserver/rpc/rackcontrollers.py |
77 | index d7ed7e7..d80477c 100644 |
78 | --- a/src/maasserver/rpc/rackcontrollers.py |
79 | +++ b/src/maasserver/rpc/rackcontrollers.py |
80 | @@ -222,10 +222,10 @@ def update_foreign_dhcp(system_id, interface_name, dhcp_ip=None): |
81 | |
82 | @synchronous |
83 | @transactional |
84 | -def update_interfaces(system_id, interfaces): |
85 | +def update_interfaces(system_id, interfaces, topology_hints=None): |
86 | """Update the interface definition on the rack controller.""" |
87 | rack_controller = RackController.objects.get(system_id=system_id) |
88 | - rack_controller.update_interfaces(interfaces) |
89 | + rack_controller.update_interfaces(interfaces, topology_hints) |
90 | |
91 | |
92 | @synchronous |
93 | diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py |
94 | index c6fc9d9..340a1be 100644 |
95 | --- a/src/maasserver/rpc/regionservice.py |
96 | +++ b/src/maasserver/rpc/regionservice.py |
97 | @@ -404,14 +404,15 @@ class Region(RPCProtocol): |
98 | return d |
99 | |
100 | @region.UpdateInterfaces.responder |
101 | - def update_interfaces(self, system_id, interfaces): |
102 | + def update_interfaces(self, system_id, interfaces, topology_hints=None): |
103 | """update_interfaces() |
104 | |
105 | Implementation of |
106 | :py:class:`~provisioningserver.rpc.region.UpdateInterfaces`. |
107 | """ |
108 | d = deferToDatabase( |
109 | - rackcontrollers.update_interfaces, system_id, interfaces) |
110 | + rackcontrollers.update_interfaces, system_id, interfaces, |
111 | + topology_hints=topology_hints) |
112 | d.addCallback(lambda args: {}) |
113 | return d |
114 | |
115 | @@ -616,8 +617,10 @@ class RegionServer(Region): |
116 | def register( |
117 | self, system_id, hostname, interfaces, url, nodegroup_uuid=None, |
118 | beacon_support=False, version=None): |
119 | - # If beacons is None, register in legacy mode. |
120 | - create_fabrics = True if beacon_support else False |
121 | + # Hold off on fabric creation if the remote controller |
122 | + # supports beacons; it will happen later when UpdateInterfaces is |
123 | + # called. |
124 | + create_fabrics = False if beacon_support else True |
125 | result = yield self._register( |
126 | system_id, hostname, interfaces, url, |
127 | nodegroup_uuid=nodegroup_uuid, create_fabrics=create_fabrics, |
128 | diff --git a/src/maasserver/rpc/tests/test_rackcontrollers.py b/src/maasserver/rpc/tests/test_rackcontrollers.py |
129 | index 1f40522..5392b32 100644 |
130 | --- a/src/maasserver/rpc/tests/test_rackcontrollers.py |
131 | +++ b/src/maasserver/rpc/tests/test_rackcontrollers.py |
132 | @@ -479,7 +479,7 @@ class TestUpdateInterfaces(MAASServerTestCase): |
133 | update_interfaces(rack_controller.system_id, sentinel.interfaces) |
134 | self.assertThat( |
135 | patched_update_interfaces, |
136 | - MockCalledOnceWith(sentinel.interfaces)) |
137 | + MockCalledOnceWith(sentinel.interfaces, None)) |
138 | |
139 | |
140 | class TestReportNeighbours(MAASServerTestCase): |
141 | diff --git a/src/maasserver/rpc/tests/test_regionservice_calls.py b/src/maasserver/rpc/tests/test_regionservice_calls.py |
142 | index 1d1d497..209ee44 100644 |
143 | --- a/src/maasserver/rpc/tests/test_regionservice_calls.py |
144 | +++ b/src/maasserver/rpc/tests/test_regionservice_calls.py |
145 | @@ -1182,7 +1182,8 @@ class TestRegionProtocol_UpdateInterfaces(MAASTransactionServerTestCase): |
146 | self.assertThat( |
147 | update_interfaces, |
148 | MockCalledOnceWith( |
149 | - params['system_id'], params['interfaces'])) |
150 | + params['system_id'], params['interfaces'], |
151 | + topology_hints=None)) |
152 | |
153 | |
154 | class TestRegionProtocol_ReportNeighbours(MAASTestCase): |
155 | diff --git a/src/provisioningserver/rackdservices/networks_monitoring_service.py b/src/provisioningserver/rackdservices/networks_monitoring_service.py |
156 | index 1de76f2..d8f7e8c 100644 |
157 | --- a/src/provisioningserver/rackdservices/networks_monitoring_service.py |
158 | +++ b/src/provisioningserver/rackdservices/networks_monitoring_service.py |
159 | @@ -8,6 +8,7 @@ __all__ = [ |
160 | ] |
161 | |
162 | from provisioningserver.logger import get_maas_logger |
163 | +from provisioningserver.rpc.exceptions import NoConnectionsAvailable |
164 | from provisioningserver.rpc.region import ( |
165 | GetDiscoveryState, |
166 | ReportMDNSEntries, |
167 | @@ -16,6 +17,8 @@ from provisioningserver.rpc.region import ( |
168 | UpdateInterfaces, |
169 | ) |
170 | from provisioningserver.utils.services import NetworksMonitoringService |
171 | +from provisioningserver.utils.twisted import pause |
172 | +from twisted.internet.defer import inlineCallbacks |
173 | |
174 | |
175 | maaslog = get_maas_logger("networks.monitor") |
176 | @@ -44,20 +47,21 @@ class RackNetworksMonitoringService(NetworksMonitoringService): |
177 | d.addCallback(getState) |
178 | return d |
179 | |
180 | - def recordInterfaces(self, interfaces): |
181 | + @inlineCallbacks |
182 | + def recordInterfaces(self, interfaces, hints=None): |
183 | """Record the interfaces information to the region.""" |
184 | - def record(client): |
185 | - # On first run perform a refresh |
186 | + while self.running: |
187 | + try: |
188 | + client = yield self.clientService.getClientNow() |
189 | + except NoConnectionsAvailable: |
190 | + yield pause(1.0) |
191 | + continue |
192 | if self._recorded is None: |
193 | - return client(RequestRackRefresh, system_id=client.localIdent) |
194 | - else: |
195 | - return client( |
196 | - UpdateInterfaces, system_id=client.localIdent, |
197 | - interfaces=interfaces) |
198 | - |
199 | - d = self.clientService.getClientNow() |
200 | - d.addCallback(record) |
201 | - return d |
202 | + yield client(RequestRackRefresh, system_id=client.localIdent) |
203 | + yield client( |
204 | + UpdateInterfaces, system_id=client.localIdent, |
205 | + interfaces=interfaces, topology_hints=hints) |
206 | + break |
207 | |
208 | def reportNeighbours(self, neighbours): |
209 | """Report neighbour information to the region.""" |
210 | diff --git a/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py b/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py |
211 | index d325125..2404cfa 100644 |
212 | --- a/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py |
213 | +++ b/src/provisioningserver/rackdservices/tests/test_networks_monitoring_service.py |
214 | @@ -5,8 +5,13 @@ |
215 | |
216 | __all__ = [] |
217 | |
218 | +from unittest.mock import call |
219 | + |
220 | from maastesting.factory import factory |
221 | -from maastesting.matchers import MockCalledOnceWith |
222 | +from maastesting.matchers import ( |
223 | + MockCalledOnceWith, |
224 | + MockCallsMatch, |
225 | +) |
226 | from maastesting.testcase import ( |
227 | MAASTestCase, |
228 | MAASTwistedRunTest, |
229 | @@ -17,6 +22,7 @@ from provisioningserver.rackdservices.networks_monitoring_service import ( |
230 | ) |
231 | from provisioningserver.rpc import region |
232 | from provisioningserver.rpc.testing import MockLiveClusterToRegionRPCFixture |
233 | +from provisioningserver.utils import services as services_module |
234 | from twisted.internet.defer import ( |
235 | inlineCallbacks, |
236 | maybeDeferred, |
237 | @@ -27,7 +33,7 @@ from twisted.internet.task import Clock |
238 | |
239 | class TestRackNetworksMonitoringService(MAASTestCase): |
240 | |
241 | - run_tests_with = MAASTwistedRunTest.make_factory(debug=False, timeout=5) |
242 | + run_tests_with = MAASTwistedRunTest.make_factory(debug=True, timeout=5) |
243 | |
244 | @inlineCallbacks |
245 | def test_runs_refresh_first_time(self): |
246 | @@ -37,10 +43,14 @@ class TestRackNetworksMonitoringService(MAASTestCase): |
247 | |
248 | rpc_service = services.getServiceNamed('rpc') |
249 | service = RackNetworksMonitoringService( |
250 | - rpc_service, Clock(), enable_monitoring=False) |
251 | + rpc_service, Clock(), enable_monitoring=False, |
252 | + enable_beaconing=False) |
253 | |
254 | - yield service.startService() |
255 | - yield service.stopService() |
256 | + yield maybeDeferred(service.startService) |
257 | + # By stopping the interface_monitor first, we assure that the loop |
258 | + # happens at least once before the service stops completely. |
259 | + yield maybeDeferred(service.interface_monitor.stopService) |
260 | + yield maybeDeferred(service.stopService) |
261 | |
262 | self.assertThat( |
263 | protocol.RequestRackRefresh, MockCalledOnceWith( |
264 | @@ -64,7 +74,8 @@ class TestRackNetworksMonitoringService(MAASTestCase): |
265 | |
266 | rpc_service = services.getServiceNamed('rpc') |
267 | service = RackNetworksMonitoringService( |
268 | - rpc_service, Clock(), enable_monitoring=False) |
269 | + rpc_service, Clock(), enable_monitoring=False, |
270 | + enable_beaconing=False) |
271 | service.getInterfaces = lambda: succeed(interfaces) |
272 | # Put something in the cache. This tells recordInterfaces that refresh |
273 | # has already run but the interfaces have changed thus they need to be |
274 | @@ -77,7 +88,57 @@ class TestRackNetworksMonitoringService(MAASTestCase): |
275 | self.assertThat( |
276 | protocol.UpdateInterfaces, MockCalledOnceWith( |
277 | protocol, system_id=rpc_service.getClient().localIdent, |
278 | - interfaces=interfaces)) |
279 | + interfaces=interfaces, topology_hints=None)) |
280 | + |
281 | + @inlineCallbacks |
282 | + def test_reports_interfaces_with_hints_if_beaconing_enabled(self): |
283 | + fixture = self.useFixture(MockLiveClusterToRegionRPCFixture()) |
284 | + protocol, connecting = fixture.makeEventLoop(region.UpdateInterfaces) |
285 | + # Don't actually wait for beaconing to complete. |
286 | + pause_mock = self.patch(services_module, 'pause') |
287 | + queue_mcast_mock = self.patch( |
288 | + services_module.BeaconingSocketProtocol, 'queueMulticastBeaconing') |
289 | + self.addCleanup((yield connecting)) |
290 | + |
291 | + interfaces = { |
292 | + "eth0": { |
293 | + "type": "physical", |
294 | + "mac_address": factory.make_mac_address(), |
295 | + "parents": [], |
296 | + "links": [], |
297 | + "enabled": True, |
298 | + } |
299 | + } |
300 | + |
301 | + rpc_service = services.getServiceNamed('rpc') |
302 | + service = RackNetworksMonitoringService( |
303 | + rpc_service, Clock(), enable_monitoring=False, |
304 | + enable_beaconing=True) |
305 | + service.getInterfaces = lambda: succeed(interfaces) |
306 | + # Put something in the cache. This tells recordInterfaces that refresh |
307 | + # has already run but the interfaces have changed thus they need to be |
308 | + # updated. |
309 | + service._recorded = {} |
310 | + |
311 | + service.startService() |
312 | + yield service.stopService() |
313 | + |
314 | + self.assertThat( |
315 | + protocol.UpdateInterfaces, MockCalledOnceWith( |
316 | + protocol, system_id=rpc_service.getClient().localIdent, |
317 | + interfaces=interfaces, topology_hints=[])) |
318 | + # The service should have sent out beacons, waited three seconds, |
319 | + # solicited for more beacons, then waited another three seconds before |
320 | + # deciding that beaconing is complete. |
321 | + self.assertThat(pause_mock, MockCallsMatch(call(3.0), call(3.0))) |
322 | + self.assertThat( |
323 | + queue_mcast_mock, MockCallsMatch( |
324 | + # Called when the service starts. |
325 | + call(solicitation=True), |
326 | + # Called three seconds later. |
327 | + call(solicitation=True), |
328 | + # Not called again when the service shuts down. |
329 | + )) |
330 | |
331 | @inlineCallbacks |
332 | def test_reports_neighbours_to_region(self): |
333 | @@ -87,7 +148,8 @@ class TestRackNetworksMonitoringService(MAASTestCase): |
334 | self.addCleanup((yield connecting)) |
335 | rpc_service = services.getServiceNamed('rpc') |
336 | service = RackNetworksMonitoringService( |
337 | - rpc_service, Clock(), enable_monitoring=False) |
338 | + rpc_service, Clock(), enable_monitoring=False, |
339 | + enable_beaconing=False) |
340 | neighbours = [{"ip": factory.make_ip_address()}] |
341 | yield service.reportNeighbours(neighbours) |
342 | self.assertThat( |
343 | @@ -103,7 +165,8 @@ class TestRackNetworksMonitoringService(MAASTestCase): |
344 | self.addCleanup((yield connecting)) |
345 | rpc_service = services.getServiceNamed('rpc') |
346 | service = RackNetworksMonitoringService( |
347 | - rpc_service, Clock(), enable_monitoring=False) |
348 | + rpc_service, Clock(), enable_monitoring=False, |
349 | + enable_beaconing=False) |
350 | mdns = [ |
351 | { |
352 | 'interface': 'eth0', |
353 | @@ -126,7 +189,8 @@ class TestRackNetworksMonitoringService(MAASTestCase): |
354 | rpc_service = services.getServiceNamed('rpc') |
355 | reactor = Clock() |
356 | service = RackNetworksMonitoringService( |
357 | - rpc_service, reactor, enable_monitoring=False) |
358 | + rpc_service, reactor, enable_monitoring=False, |
359 | + enable_beaconing=False) |
360 | protocol.GetDiscoveryState.return_value = {'interfaces': {}} |
361 | # Put something in the cache. This tells recordInterfaces that refresh |
362 | # has already run but the interfaces have changed thus they need to be |
363 | diff --git a/src/provisioningserver/rpc/region.py b/src/provisioningserver/rpc/region.py |
364 | index ab16e4b..58c43ab 100644 |
365 | --- a/src/provisioningserver/rpc/region.py |
366 | +++ b/src/provisioningserver/rpc/region.py |
367 | @@ -460,6 +460,7 @@ class UpdateInterfaces(amp.Command): |
368 | arguments = [ |
369 | (b'system_id', amp.Unicode()), |
370 | (b'interfaces', StructureAsJSON()), |
371 | + (b'topology_hints', StructureAsJSON(optional=True)), |
372 | ] |
373 | response = [] |
374 | errors = [] |
375 | diff --git a/src/provisioningserver/utils/services.py b/src/provisioningserver/utils/services.py |
376 | index 5a43922..cb5c81b 100644 |
377 | --- a/src/provisioningserver/utils/services.py |
378 | +++ b/src/provisioningserver/utils/services.py |
379 | @@ -28,7 +28,6 @@ from provisioningserver.logger import ( |
380 | get_maas_logger, |
381 | LegacyLogger, |
382 | ) |
383 | -from provisioningserver.rpc.exceptions import NoConnectionsAvailable |
384 | from provisioningserver.utils.beaconing import ( |
385 | age_out_uuid_queue, |
386 | BEACON_IPV4_MULTICAST, |
387 | @@ -52,7 +51,7 @@ from provisioningserver.utils.shell import select_c_utf8_bytes_locale |
388 | from provisioningserver.utils.twisted import ( |
389 | callOut, |
390 | deferred, |
391 | - suppress, |
392 | + pause, |
393 | terminateProcess, |
394 | ) |
395 | from twisted.application.internet import TimerService |
396 | @@ -425,9 +424,15 @@ def join_ipv6_beacon_group(sock, ifindex): |
397 | socket.inet_pton(socket.AF_INET6, BEACON_IPV6_MULTICAST) + |
398 | struct.pack("I", ifindex) |
399 | ) |
400 | - sock.setsockopt( |
401 | - socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, |
402 | - ipv6_join_sockopt_args) |
403 | + try: |
404 | + sock.setsockopt( |
405 | + socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, |
406 | + ipv6_join_sockopt_args) |
407 | + except OSError: |
408 | + # Do this on a best-effort basis. We might get an "Address already in |
409 | + # use" error if the group is already joined, or (for whatever reason) |
410 | + # it is not possible to join a multicast group using this interface. |
411 | + pass |
412 | |
413 | |
414 | def set_ipv6_multicast_loopback(sock, loopback): |
415 | @@ -472,6 +477,7 @@ class BeaconingSocketProtocol(DatagramProtocol): |
416 | self.topology_hints = OrderedDict() |
417 | self.listen_port = None |
418 | self.mcast_requested = False |
419 | + self.mcast_solicitation = False |
420 | self.last_solicited_mcast = 0 |
421 | self._join_multicast_groups() |
422 | |
423 | @@ -482,8 +488,9 @@ class BeaconingSocketProtocol(DatagramProtocol): |
424 | verifyObject(IReactorMulticast, self.reactor) |
425 | except DoesNotImplement: |
426 | return |
427 | - self.listen_port = self.reactor.listenMulticast( |
428 | - self.port, self, interface=self.interface, listenMultiple=True) |
429 | + if self.listen_port is None: |
430 | + self.listen_port = self.reactor.listenMulticast( |
431 | + self.port, self, interface=self.interface, listenMultiple=True) |
432 | sock = self.transport.getHandle() |
433 | if self.loopback is True: |
434 | # This is only necessary for testing. |
435 | @@ -512,6 +519,34 @@ class BeaconingSocketProtocol(DatagramProtocol): |
436 | self.interfaces = interfaces |
437 | self._join_multicast_groups() |
438 | |
439 | + def getAllTopologyHints(self): |
440 | + """Returns the set of unique topology hints.""" |
441 | + # When beaconing runs, hints attached to individual packets might |
442 | + # come to the same conclusion about the implied fabric connectivity. |
443 | + # Use a set to prevent the region from processing duplicate hints. |
444 | + all_hints = set() |
445 | + for hints in self.topology_hints.values(): |
446 | + all_hints |= hints |
447 | + return all_hints |
448 | + |
449 | + def getJSONTopologyHints(self): |
450 | + """Returns all topology hints as a list of dictionaries. |
451 | + |
452 | + This method is used for sending data via the RPC layer, so be cautious |
453 | + when modifying. In addition, keys with no value are filtered out |
454 | + of the resulting dictionary, so that the hints are smaller on the wire. |
455 | + """ |
456 | + all_hints = self.getAllTopologyHints() |
457 | + json_hints = [ |
458 | + { |
459 | + key: value |
460 | + for key, value in hint._asdict().items() |
461 | + if value is not None |
462 | + } |
463 | + for hint in all_hints |
464 | + ] |
465 | + return json_hints |
466 | + |
467 | def stopProtocol(self): |
468 | super().stopProtocol() |
469 | if self.listen_port is not None: |
470 | @@ -629,27 +664,36 @@ class BeaconingSocketProtocol(DatagramProtocol): |
471 | reply = create_beacon_payload("advertisement", payload) |
472 | self.send_beacon(reply, beacon.reply_address) |
473 | if len(self.interfaces) > 0: |
474 | - self.queueMulticastAdvertisement() |
475 | + self.queueMulticastBeaconing() |
476 | |
477 | - def dequeueMulticastAdvertisement(self): |
478 | + def dequeueMulticastBeaconing(self): |
479 | """ |
480 | Callback to send multicast beacon advertisements. |
481 | |
482 | See `queueMulticastAdvertisement`, which schedules this method to run. |
483 | """ |
484 | mtime = time.monotonic() |
485 | + beacon_type = ( |
486 | + 'solicitation' if self.mcast_solicitation else 'advertisement') |
487 | + log.msg("Sending multicast beacon %ss." % beacon_type) |
488 | + self.send_multicast_beacons(self.interfaces, beacon_type) |
489 | self.last_solicited_mcast = mtime |
490 | self.mcast_requested = False |
491 | - log.msg("Sending multicast beacon advertisements.") |
492 | - self.send_multicast_beacons(self.interfaces, 'advertisement') |
493 | + self.mcast_solicitation = False |
494 | |
495 | - def queueMulticastAdvertisement(self): |
496 | + def queueMulticastBeaconing(self, solicitation=False): |
497 | """ |
498 | Requests that multicast advertisements be sent out on every interface. |
499 | |
500 | Ensures that advertisements will not be sent more than once every |
501 | five seconds. |
502 | + |
503 | + :param solicitation: If true, sends solicitations rather than |
504 | + advertisements. Solicitations are used to initiate "full beaconing" |
505 | + with peers; advertisements do not generate beacon replies. |
506 | """ |
507 | + if solicitation is True: |
508 | + self.mcast_solicitation = True |
509 | if self.mcast_requested: |
510 | # A multicast advertisement has been requested already. |
511 | return |
512 | @@ -660,7 +704,7 @@ class BeaconingSocketProtocol(DatagramProtocol): |
513 | else: |
514 | timeout = max(mtime - self.last_solicited_mcast, 5) |
515 | self.mcast_requested = True |
516 | - self.reactor.callLater(timeout, self.dequeueMulticastAdvertisement) |
517 | + self.reactor.callLater(timeout, self.dequeueMulticastBeaconing) |
518 | |
519 | def processTopologyHints(self, rx: ReceivedBeacon): |
520 | """ |
521 | @@ -692,9 +736,7 @@ class BeaconingSocketProtocol(DatagramProtocol): |
522 | self.topology_hints[rx.uuid] = hints |
523 | # XXX mpontillo 2017-08-07: temporary logging |
524 | log.msg("New topology hints [%s]:\n%s" % (rx.uuid, pformat(hints))) |
525 | - all_hints = set() |
526 | - for hints in self.topology_hints.values(): |
527 | - all_hints |= hints |
528 | + all_hints = self.getAllTopologyHints() |
529 | log.msg("Topology hint summary:\n%s" % pformat(all_hints)) |
530 | |
531 | def _add_remote_fabric_hints(self, hints, remote_ifinfo, rx): |
532 | @@ -905,7 +947,8 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
533 | |
534 | interval = timedelta(seconds=30).total_seconds() |
535 | |
536 | - def __init__(self, clock=None, enable_monitoring=True): |
537 | + def __init__( |
538 | + self, clock=None, enable_monitoring=True, enable_beaconing=True): |
539 | # Order is very important here. First we set the clock to the passed-in |
540 | # reactor, so that unit tests can fake out the clock if necessary. |
541 | # Then we call super(). The superclass will set up the structures |
542 | @@ -916,9 +959,11 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
543 | self.clock = clock |
544 | super().__init__() |
545 | self.enable_monitoring = enable_monitoring |
546 | + self.enable_beaconing = enable_beaconing |
547 | # The last successfully recorded interfaces. |
548 | self._recorded = None |
549 | self._monitored = frozenset() |
550 | + self._beaconing = frozenset() |
551 | self._monitoring_state = {} |
552 | self._monitoring_mdns = False |
553 | self._locked = False |
554 | @@ -935,33 +980,25 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
555 | self.interface_monitor.setServiceParent(self) |
556 | self.beaconing_protocol = None |
557 | |
558 | + @inlineCallbacks |
559 | def updateInterfaces(self): |
560 | """Update interfaces, catching and logging errors. |
561 | |
562 | This can be overridden by subclasses to conditionally update based on |
563 | some external configuration. |
564 | """ |
565 | - d = maybeDeferred(self._assumeSoleResponsibility) |
566 | - |
567 | - def update(responsible): |
568 | - if responsible: |
569 | - d = maybeDeferred(self.getInterfaces) |
570 | - d.addCallback(self._updateInterfaces) |
571 | - return d |
572 | - |
573 | - def failed(failure): |
574 | - log.err( |
575 | - failure, |
576 | - "Failed to update and/or record network interface " |
577 | - "configuration: %s" % failure.getErrorMessage()) |
578 | - |
579 | - d = d.addCallback(update) |
580 | - # During the update, we might fail to get the interface monitoring |
581 | - # state from the region. We can safely ignore this, as it will be |
582 | - # retried shortly. |
583 | - d.addErrback(suppress, NoConnectionsAvailable) |
584 | - d.addErrback(failed) |
585 | - return d |
586 | + responsible = self._assumeSoleResponsibility() |
587 | + if responsible: |
588 | + interfaces = None |
589 | + try: |
590 | + interfaces = yield maybeDeferred(self.getInterfaces) |
591 | + yield self._updateInterfaces(interfaces) |
592 | + except BaseException as e: |
593 | + msg = ( |
594 | + "Failed to update and/or record network interface " |
595 | + "configuration: %s; interfaces: %r" % (e, interfaces) |
596 | + ) |
597 | + log.err(None, msg) |
598 | |
599 | def getInterfaces(self): |
600 | """Get the current network interfaces configuration. |
601 | @@ -978,7 +1015,7 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
602 | """ |
603 | |
604 | @abstractmethod |
605 | - def recordInterfaces(self, interfaces): |
606 | + def recordInterfaces(self, interfaces, hints=None): |
607 | """Record the interfaces information. |
608 | |
609 | This MUST be overridden in subclasses. |
610 | @@ -1050,21 +1087,48 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
611 | # If we were monitoring neighbours on any interfaces, we need to |
612 | # stop the monitoring services. |
613 | self._configureNetworkDiscovery({}) |
614 | + if self.beaconing_protocol is not None: |
615 | + self._configureBeaconing({}) |
616 | |
617 | + @inlineCallbacks |
618 | def _updateInterfaces(self, interfaces): |
619 | """Record `interfaces` if they've changed.""" |
620 | if interfaces != self._recorded: |
621 | - d = maybeDeferred(self.recordInterfaces, interfaces) |
622 | + hints = None |
623 | + if self.enable_beaconing: |
624 | + self._configureBeaconing(interfaces) |
625 | + # Wait for beaconing to do its thing. |
626 | + yield pause(3.0) |
627 | + # Retry beacon solicitations, in case any packet loss occurred |
628 | + # the first time. |
629 | + self.beaconing_protocol.queueMulticastBeaconing( |
630 | + solicitation=True) |
631 | + yield pause(3.0) |
632 | + hints = self.beaconing_protocol.getJSONTopologyHints() |
633 | + yield maybeDeferred(self.recordInterfaces, interfaces, hints) |
634 | # Note: _interfacesRecorded() will reconfigure discovery after |
635 | # recording the interfaces, so there is no need to call |
636 | # _configureNetworkDiscovery() here. |
637 | - d.addCallback(callOut, self._interfacesRecorded, interfaces) |
638 | - return d |
639 | + self._interfacesRecorded(interfaces) |
640 | else: |
641 | # If the interfaces didn't change, we still need to poll for |
642 | # monitoring state changes. |
643 | - d = maybeDeferred(self._configureNetworkDiscovery, interfaces) |
644 | - return d |
645 | + yield maybeDeferred(self._configureNetworkDiscovery, interfaces) |
646 | + |
647 | + def _getInterfacesForBeaconing(self, interfaces: dict): |
648 | + """Return the interfaces which will be used for beaconing. |
649 | + |
650 | + :return: The set of interface names to run beaconing on. |
651 | + """ |
652 | + # Don't beacon when running the test suite/dev env. |
653 | + # In addition, if we don't own the lock, we should not be beaconing. |
654 | + if is_dev_environment() or not self._locked or interfaces is None: |
655 | + return set() |
656 | + monitored_interfaces = { |
657 | + ifname for ifname, ifdata in interfaces.items() |
658 | + if ifdata['monitored'] is True |
659 | + } |
660 | + return monitored_interfaces |
661 | |
662 | def _getInterfacesForNeighbourDiscovery( |
663 | self, interfaces: dict, monitoring_state: dict): |
664 | @@ -1227,6 +1291,32 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
665 | # doesn't matter for mDNS discovery purposes.) |
666 | pass |
667 | |
668 | + def _configureBeaconing(self, interfaces): |
669 | + beaconing_interfaces = self._getInterfacesForBeaconing(interfaces) |
670 | + # Calculate the difference between the sets. We need to know which |
671 | + # interfaces were added and deleted (with respect to the interfaces we |
672 | + # were already beaconing on). |
673 | + new_interfaces = beaconing_interfaces.difference(self._beaconing) |
674 | + deleted_interfaces = self._beaconing.difference(beaconing_interfaces) |
675 | + if len(new_interfaces) > 0: |
676 | + log.msg("Starting beaconing for interfaces: %r" % (new_interfaces)) |
677 | + self._startBeaconingServices(new_interfaces) |
678 | + if len(deleted_interfaces) > 0: |
679 | + log.msg( |
680 | + "Stopping beaconing for interfaces: %r" % (deleted_interfaces)) |
681 | + self._stopBeaconingServices(deleted_interfaces) |
682 | + self._beaconing = beaconing_interfaces |
683 | + if self.beaconing_protocol is None: |
684 | + self.beaconing_protocol = BeaconingSocketProtocol( |
685 | + self.clock, interfaces=interfaces) |
686 | + else: |
687 | + self.beaconing_protocol.updateInterfaces(interfaces) |
688 | + # If the interfaces have changed, perform beaconing again. |
689 | + # An empty dictionary will be passed in when the service stops, so |
690 | + # don't bother sending out beacons we won't reply to. |
691 | + if len(interfaces) > 0: |
692 | + self.beaconing_protocol.queueMulticastBeaconing(solicitation=True) |
693 | + |
694 | def _configureNeighbourDiscovery(self, interfaces, monitoring_state): |
695 | monitored_interfaces = self._getInterfacesForNeighbourDiscovery( |
696 | interfaces, monitoring_state) |
697 | @@ -1239,26 +1329,15 @@ class NetworksMonitoringService(MultiService, metaclass=ABCMeta): |
698 | log.msg("Starting neighbour discovery for interfaces: %r" % ( |
699 | new_interfaces)) |
700 | self._startNeighbourDiscoveryServices(new_interfaces) |
701 | - # XXX mpontillo 2017-07-12: for testing, just start beaconing |
702 | - # services on all the interfaces enabled for active discovery. |
703 | - self._startBeaconingServices(new_interfaces) |
704 | if len(deleted_interfaces) > 0: |
705 | log.msg( |
706 | "Stopping neighbour discovery for interfaces: %r" % ( |
707 | deleted_interfaces)) |
708 | self._stopNeighbourDiscoveryServices(deleted_interfaces) |
709 | - # XXX mpontillo 2017-07-12: this should be separately configured. |
710 | - # (see similar comment in the 'start' path above.) |
711 | - self._stopBeaconingServices(deleted_interfaces) |
712 | self._monitored = monitored_interfaces |
713 | |
714 | def _interfacesRecorded(self, interfaces): |
715 | """The given `interfaces` were recorded successfully.""" |
716 | self._recorded = interfaces |
717 | - if self.beaconing_protocol is None: |
718 | - self.beaconing_protocol = BeaconingSocketProtocol( |
719 | - self.clock, interfaces=interfaces) |
720 | - else: |
721 | - self.beaconing_protocol.updateInterfaces(interfaces) |
722 | if self.enable_monitoring is True: |
723 | self._configureNetworkDiscovery(interfaces) |
724 | diff --git a/src/provisioningserver/utils/tests/test_services.py b/src/provisioningserver/utils/tests/test_services.py |
725 | index acfc60e..36090ea 100644 |
726 | --- a/src/provisioningserver/utils/tests/test_services.py |
727 | +++ b/src/provisioningserver/utils/tests/test_services.py |
728 | @@ -76,8 +76,12 @@ from twisted.python.failure import Failure |
729 | class StubNetworksMonitoringService(NetworksMonitoringService): |
730 | """Concrete subclass for testing.""" |
731 | |
732 | - def __init__(self, enable_monitoring=False, *args, **kwargs): |
733 | - super().__init__(enable_monitoring=enable_monitoring, *args, **kwargs) |
734 | + def __init__( |
735 | + self, enable_monitoring=False, enable_beaconing=False, |
736 | + *args, **kwargs): |
737 | + super().__init__( |
738 | + *args, enable_monitoring=enable_monitoring, |
739 | + enable_beaconing=enable_beaconing, **kwargs) |
740 | self.iterations = DeferredQueue() |
741 | self.interfaces = [] |
742 | self.update_interface__calls = 0 |
743 | @@ -91,7 +95,7 @@ class StubNetworksMonitoringService(NetworksMonitoringService): |
744 | d.addBoth(self.iterations.put) |
745 | return d |
746 | |
747 | - def recordInterfaces(self, interfaces): |
748 | + def recordInterfaces(self, interfaces, hints=None): |
749 | self.interfaces.append(interfaces) |
750 | |
751 | def reportNeighbours(self, neighbours): |
752 | @@ -221,13 +225,13 @@ class TestNetworksMonitoringService(MAASTestCase): |
753 | # recordInterfaces is called the first time, as expected. |
754 | recordInterfaces.reset_mock() |
755 | yield service.updateInterfaces() |
756 | - self.assertThat(recordInterfaces, MockCalledOnceWith({})) |
757 | + self.assertThat(recordInterfaces, MockCalledOnceWith({}, None)) |
758 | |
759 | # recordInterfaces is called the second time too; the service noted |
760 | # that it crashed last time and knew to run it again. |
761 | recordInterfaces.reset_mock() |
762 | yield service.updateInterfaces() |
763 | - self.assertThat(recordInterfaces, MockCalledOnceWith({})) |
764 | + self.assertThat(recordInterfaces, MockCalledOnceWith({}, None)) |
765 | |
766 | # recordInterfaces is NOT called the third time; the service noted |
767 | # that the configuration had not changed. |
768 | @@ -1004,3 +1008,128 @@ class TestBeaconingSocketProtocol(SharedSecretTestCase): |
769 | } |
770 | self.assertThat(hints, Equals(expected_hints)) |
771 | yield protocol.stopProtocol() |
772 | + |
773 | + @inlineCallbacks |
774 | + def test__getJSONTopologyHints_converts_hints_to_dictionary(self): |
775 | + # Note: Always use a random port for testing. (port=0) |
776 | + protocol = BeaconingSocketProtocol( |
777 | + reactor, port=0, process_incoming=False, loopback=True, |
778 | + interface="::", debug=True) |
779 | + # Don't try to send out any replies. |
780 | + self.patch(services, 'create_beacon_payload') |
781 | + self.patch(protocol, 'send_beacon') |
782 | + # Need to generate a real UUID with the current time, so it doesn't |
783 | + # get aged out. |
784 | + uuid = str(uuid1()) |
785 | + # Make the protocol think we sent a beacon with this UUID already. |
786 | + tx_mac = factory.make_mac_address() |
787 | + fake_tx_beacon = FakeBeaconPayload( |
788 | + uuid, ifname='eth1', mac=tx_mac, vid=100) |
789 | + fake_rx_beacon = { |
790 | + "source_ip": "127.0.0.1", |
791 | + "source_port": 5240, |
792 | + "destination_ip": "224.0.0.118", |
793 | + "interface": "eth0", |
794 | + "type": "solicitation", |
795 | + "payload": fake_tx_beacon.payload |
796 | + } |
797 | + protocol.beaconReceived(fake_rx_beacon) |
798 | + all_hints = protocol.getJSONTopologyHints() |
799 | + expected_hints = [ |
800 | + # Note: since vid=None on the received beacon, we expect that |
801 | + # the hint won't have a 'vid' field. |
802 | + dict( |
803 | + ifname='eth0', hint="on_remote_network", |
804 | + related_ifname='eth1', related_vid=100, |
805 | + related_mac=tx_mac), |
806 | + ] |
807 | + self.assertThat(all_hints, Equals(expected_hints)) |
808 | + yield protocol.stopProtocol() |
809 | + |
810 | + @inlineCallbacks |
811 | + def test__queues_multicast_beacon_soliciations_upon_request(self): |
812 | + # Note: Always use a random port for testing. (port=0) |
813 | + clock = Clock() |
814 | + protocol = BeaconingSocketProtocol( |
815 | + clock, port=0, process_incoming=False, loopback=True, |
816 | + interface="::", debug=True) |
817 | + # Don't try to send out any replies. |
818 | + self.patch(services, 'create_beacon_payload') |
819 | + send_mcast_mock = self.patch(protocol, 'send_multicast_beacons') |
820 | + self.patch(protocol, 'send_beacon') |
821 | + yield protocol.queueMulticastBeaconing(solicitation=True) |
822 | + clock.advance(0) |
823 | + self.assertThat( |
824 | + send_mcast_mock, MockCalledOnceWith({}, 'solicitation')) |
825 | + |
826 | + @inlineCallbacks |
827 | + def test__multicasts_at_most_once_per_five_seconds(self): |
828 | + # Note: Always use a random port for testing. (port=0) |
829 | + clock = Clock() |
830 | + protocol = BeaconingSocketProtocol( |
831 | + clock, port=0, process_incoming=False, loopback=True, |
832 | + interface="::", debug=True) |
833 | + # Don't try to send out any replies. |
834 | + self.patch(services, 'create_beacon_payload') |
835 | + monotonic_mock = self.patch(services.time, 'monotonic') |
836 | + send_mcast_mock = self.patch(protocol, 'send_multicast_beacons') |
837 | + self.patch(protocol, 'send_beacon') |
838 | + monotonic_mock.side_effect = [ |
839 | + # Initial queue |
840 | + 6, |
841 | + # Initial dequeue |
842 | + 6, |
843 | + # Second queue (hasn't yet been 5 seconds) |
844 | + 10, |
845 | + # Third queue |
846 | + 11, |
847 | + # Second dequeue |
848 | + 11, |
849 | + ] |
850 | + yield protocol.queueMulticastBeaconing() |
851 | + clock.advance(0) |
852 | + self.assertThat( |
853 | + send_mcast_mock, MockCalledOnceWith({}, 'advertisement')) |
854 | + send_mcast_mock.reset_mock() |
855 | + yield protocol.queueMulticastBeaconing() |
856 | + yield protocol.queueMulticastBeaconing(solicitation=True) |
857 | + clock.advance(4.9) |
858 | + self.assertThat( |
859 | + send_mcast_mock, MockNotCalled()) |
860 | + clock.advance(0.1) |
861 | + self.assertThat( |
862 | + send_mcast_mock, MockCalledOnceWith({}, 'solicitation')) |
863 | + |
864 | + @inlineCallbacks |
865 | + def test__multiple_beacon_requests_coalesced(self): |
866 | + # Note: Always use a random port for testing. (port=0) |
867 | + clock = Clock() |
868 | + protocol = BeaconingSocketProtocol( |
869 | + clock, port=0, process_incoming=False, loopback=True, |
870 | + interface="::", debug=True) |
871 | + # Don't try to send out any replies. |
872 | + self.patch(services, 'create_beacon_payload') |
873 | + send_mcast_mock = self.patch(protocol, 'send_multicast_beacons') |
874 | + self.patch(protocol, 'send_beacon') |
875 | + yield protocol.queueMulticastBeaconing() |
876 | + yield protocol.queueMulticastBeaconing() |
877 | + clock.advance(5) |
878 | + self.assertThat( |
879 | + send_mcast_mock, MockCalledOnceWith({}, 'advertisement')) |
880 | + |
881 | + @inlineCallbacks |
882 | + def test__solicitation_wins_when_multiple_requests_queued(self): |
883 | + # Note: Always use a random port for testing. (port=0) |
884 | + clock = Clock() |
885 | + protocol = BeaconingSocketProtocol( |
886 | + clock, port=0, process_incoming=False, loopback=True, |
887 | + interface="::", debug=True) |
888 | + # Don't try to send out any replies. |
889 | + self.patch(services, 'create_beacon_payload') |
890 | + send_mcast_mock = self.patch(protocol, 'send_multicast_beacons') |
891 | + self.patch(protocol, 'send_beacon') |
892 | + yield protocol.queueMulticastBeaconing() |
893 | + yield protocol.queueMulticastBeaconing(solicitation=True) |
894 | + clock.advance(5) |
895 | + self.assertThat( |
896 | + send_mcast_mock, MockCalledOnceWith({}, 'solicitation')) |
Looks good. Just one question, which I am not going to block you on: is it the case that a region controller will never run beaconing? If so, why? It seems like it should.