Merge ~blake-rouse/maas:ha-fixes-2.2 into maas:2.2
- Git
- lp:~blake-rouse/maas
- ha-fixes-2.2
- Merge into 2.2
Proposed by
Blake Rouse
Status: | Merged | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Merge reported by: | Blake Rouse | ||||||||||||
Merged at revision: | d873fad043e7015d3594d6ec3272cc0f778e52b8 | ||||||||||||
Proposed branch: | ~blake-rouse/maas:ha-fixes-2.2 | ||||||||||||
Merge into: | maas:2.2 | ||||||||||||
Diff against target: |
874 lines (+509/-34) 13 files modified
src/maasserver/locks.py (+9/-6) src/maasserver/models/node.py (+12/-2) src/maasserver/models/tests/test_node.py (+1/-1) src/maasserver/rpc/rackcontrollers.py (+1/-1) src/maasserver/rpc/regionservice.py (+19/-3) src/maasserver/rpc/tests/test_rackcontrollers.py (+2/-2) src/maasserver/rpc/tests/test_regionservice.py (+37/-0) src/maasserver/utils/orm.py (+11/-0) src/maasserver/utils/tests/test_orm.py (+16/-0) src/provisioningserver/rackdservices/tests/test_tftp.py (+223/-0) src/provisioningserver/rackdservices/tftp.py (+41/-1) src/provisioningserver/rpc/clusterservice.py (+89/-17) src/provisioningserver/rpc/tests/test_clusterservice.py (+48/-1) |
||||||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Blake Rouse (community) | Approve | ||
Review via email: mp+332862@code.launchpad.net |
Description of the change
To post a comment you must log in.
Revision history for this message
Blake Rouse (blake-rouse) wrote : | # |
Revision history for this message
Blake Rouse (blake-rouse) wrote : | # |
Last CI failed because of bad import, fixed and new CI run:
Revision history for this message
Blake Rouse (blake-rouse) wrote : | # |
Passed CI
Self-approving backport.
review:
Approve
Revision history for this message
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b ha-fixes-2.2 lp:~blake-rouse/maas into -b 2.2 lp:~maas-committers/maas
STATUS: FAILED BUILD
LOG: http://
Revision history for this message
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b ha-fixes-2.2 lp:~blake-rouse/maas into -b 2.2 lp:~maas-committers/maas
STATUS: FAILED BUILD
LOG: http://
Revision history for this message
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b ha-fixes-2.2 lp:~blake-rouse/maas into -b 2.2 lp:~maas-committers/maas
STATUS: FAILED MERGE
LOG: http://
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/src/maasserver/locks.py b/src/maasserver/locks.py |
2 | index b482f5c..f68539e 100644 |
3 | --- a/src/maasserver/locks.py |
4 | +++ b/src/maasserver/locks.py |
5 | @@ -9,7 +9,6 @@ __all__ = [ |
6 | "eventloop", |
7 | "import_images", |
8 | "node_acquire", |
9 | - "rack_registration", |
10 | "security", |
11 | "startup", |
12 | ] |
13 | @@ -19,7 +18,11 @@ from maasserver.utils.dblocks import ( |
14 | DatabaseXactLock, |
15 | ) |
16 | |
17 | -# Lock around starting-up a MAAS region. |
18 | +# Lock around starting-up a MAAS region and connection of rack controllers. |
19 | +# This can be a problem where a region controller and a rack controller try |
20 | +# to create their node objects at the same time. Rack registration also |
21 | +# involves populating fabrics, VLANs, and other information that may overlap |
22 | +# between rack controllers. |
23 | startup = DatabaseLock(1) |
24 | |
25 | # Lock around performing critical security-related operations, like |
26 | @@ -41,10 +44,10 @@ node_acquire = DatabaseXactLock(7) |
27 | # Lock to help with concurrent allocation of IP addresses. |
28 | address_allocation = DatabaseLock(8) |
29 | |
30 | -# Lock to prevent concurrent registration of rack controllers. This can be a |
31 | -# problem because registration involves populating fabrics, VLANs, and other |
32 | -# information that may overlap between rack controller. |
33 | -rack_registration = DatabaseLock(9) |
34 | +# Lock used to be used just for rack registration. Because of lp:1705594 this |
35 | +# was consolidated into the startup lock with the region controller. |
36 | +# DO NOT USE '9' AGAIN, it is reserved so it doesn't break upgrades. |
37 | +# rack_registration = DatabaseLock(9) |
38 | |
39 | # Lock to prevent concurrent network scanning. |
40 | try_active_discovery = DatabaseLock(10).TRY |
41 | diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py |
42 | index f6ee6cc..e0c95fe 100644 |
43 | --- a/src/maasserver/models/node.py |
44 | +++ b/src/maasserver/models/node.py |
45 | @@ -54,7 +54,10 @@ from django.db.models import ( |
46 | ) |
47 | from django.db.models.query import QuerySet |
48 | from django.shortcuts import get_object_or_404 |
49 | -from maasserver import DefaultMeta |
50 | +from maasserver import ( |
51 | + DefaultMeta, |
52 | + locks, |
53 | +) |
54 | from maasserver.clusterrpc.pods import decompose_machine |
55 | from maasserver.clusterrpc.power import ( |
56 | power_cycle, |
57 | @@ -143,6 +146,7 @@ from maasserver.storage_layouts import ( |
58 | StorageLayoutError, |
59 | StorageLayoutMissingBootDiskError, |
60 | ) |
61 | +from maasserver.utils import synchronised |
62 | from maasserver.utils.dns import validate_hostname |
63 | from maasserver.utils.mac import get_vendor_for_mac |
64 | from maasserver.utils.orm import ( |
65 | @@ -151,6 +155,7 @@ from maasserver.utils.orm import ( |
66 | post_commit, |
67 | post_commit_do, |
68 | transactional, |
69 | + with_connection, |
70 | ) |
71 | from maasserver.utils.threads import ( |
72 | callOutToDatabase, |
73 | @@ -208,6 +213,7 @@ from provisioningserver.utils.twisted import ( |
74 | asynchronous, |
75 | callOut, |
76 | deferWithTimeout, |
77 | + synchronous, |
78 | ) |
79 | from twisted.internet.defer import ( |
80 | Deferred, |
81 | @@ -4595,6 +4601,10 @@ class Controller(Node): |
82 | for interface in interfaces |
83 | } |
84 | |
85 | + @synchronous |
86 | + @with_connection |
87 | + @synchronised(locks.startup) |
88 | + @transactional |
89 | def update_interfaces(self, interfaces): |
90 | """Update the interfaces attached to the controller. |
91 | |
92 | @@ -4872,7 +4882,7 @@ class RackController(Controller): |
93 | Service.objects.update_service_for( |
94 | self, "rackd", SERVICE_STATUS.DEGRADED, |
95 | "{:.0%} connected to region controllers.".format( |
96 | - percentage)) |
97 | + 1.0 - percentage)) |
98 | |
99 | def get_image_sync_status(self, boot_images=None): |
100 | """Return the status of the boot image import process.""" |
101 | diff --git a/src/maasserver/models/tests/test_node.py b/src/maasserver/models/tests/test_node.py |
102 | index 0467bba..e8ece11 100644 |
103 | --- a/src/maasserver/models/tests/test_node.py |
104 | +++ b/src/maasserver/models/tests/test_node.py |
105 | @@ -9495,7 +9495,7 @@ class TestRackController(MAASTransactionServerTestCase): |
106 | MatchesStructure.byEquality( |
107 | status=SERVICE_STATUS.DEGRADED, status_info=( |
108 | "{:.0%} connected to region controllers.".format( |
109 | - percentage)))) |
110 | + 1.0 - percentage)))) |
111 | |
112 | fake_images = [ |
113 | { |
114 | diff --git a/src/maasserver/rpc/rackcontrollers.py b/src/maasserver/rpc/rackcontrollers.py |
115 | index 9c49532..7c82548 100644 |
116 | --- a/src/maasserver/rpc/rackcontrollers.py |
117 | +++ b/src/maasserver/rpc/rackcontrollers.py |
118 | @@ -72,7 +72,7 @@ def handle_upgrade(rack_controller, nodegroup_uuid): |
119 | |
120 | @synchronous |
121 | @with_connection |
122 | -@synchronised(locks.rack_registration) |
123 | +@synchronised(locks.startup) |
124 | @transactional |
125 | def register( |
126 | system_id=None, hostname='', interfaces=None, |
127 | diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py |
128 | index 3ddf50c..534002b 100644 |
129 | --- a/src/maasserver/rpc/regionservice.py |
130 | +++ b/src/maasserver/rpc/regionservice.py |
131 | @@ -1214,15 +1214,31 @@ class RegionAdvertising: |
132 | Each tuple corresponds to somewhere an event-loop is listening |
133 | within the whole region. The `name` is the event-loop name. |
134 | """ |
135 | + # Each regiond might be running a local bridge that duplicates the |
136 | + # same IP address across region controllers. Each region controller |
137 | + # must output a unique set of IP addresses, to prevent the rack |
138 | + # controller from connecting to a different region controller than |
139 | + # the rack controller was expecting to be connecting to. |
140 | + def _unique_to_region(address, region, regions): |
141 | + for region_obj in regions: |
142 | + if region_obj != region: |
143 | + for process in region_obj.processes.all(): |
144 | + for endpoint in process.endpoints.all(): |
145 | + if endpoint.address == address: |
146 | + return False |
147 | + return True |
148 | + |
149 | regions = RegionController.objects.all() |
150 | regions = regions.prefetch_related("processes", "processes__endpoints") |
151 | all_endpoints = [] |
152 | for region_obj in regions: |
153 | for process in region_obj.processes.all(): |
154 | for endpoint in process.endpoints.all(): |
155 | - all_endpoints.append(( |
156 | - "%s:pid=%d" % (region_obj.hostname, process.pid), |
157 | - endpoint.address, endpoint.port)) |
158 | + if _unique_to_region( |
159 | + endpoint.address, region_obj, regions): |
160 | + all_endpoints.append(( |
161 | + "%s:pid=%d" % (region_obj.hostname, process.pid), |
162 | + endpoint.address, endpoint.port)) |
163 | return all_endpoints |
164 | |
165 | @classmethod |
166 | diff --git a/src/maasserver/rpc/tests/test_rackcontrollers.py b/src/maasserver/rpc/tests/test_rackcontrollers.py |
167 | index 8c62f80..e5b075b 100644 |
168 | --- a/src/maasserver/rpc/tests/test_rackcontrollers.py |
169 | +++ b/src/maasserver/rpc/tests/test_rackcontrollers.py |
170 | @@ -288,11 +288,11 @@ class TestRegisterRackController(MAASServerTestCase): |
171 | for name, interface in interfaces.items() |
172 | ))) |
173 | |
174 | - def test_registers_with_rack_registration_lock_held(self): |
175 | + def test_registers_with_startup_lock_held(self): |
176 | lock_status = [] |
177 | |
178 | def record_lock_status(*args): |
179 | - lock_status.append(locks.rack_registration.is_locked()) |
180 | + lock_status.append(locks.startup.is_locked()) |
181 | return None # Simulate that no rack found. |
182 | |
183 | find = self.patch(rackcontrollers, "find") |
184 | diff --git a/src/maasserver/rpc/tests/test_regionservice.py b/src/maasserver/rpc/tests/test_regionservice.py |
185 | index 0a8016a..12678c5 100644 |
186 | --- a/src/maasserver/rpc/tests/test_regionservice.py |
187 | +++ b/src/maasserver/rpc/tests/test_regionservice.py |
188 | @@ -33,6 +33,7 @@ from maasserver.models import ( |
189 | RackController, |
190 | RegionController, |
191 | RegionControllerProcess, |
192 | + RegionControllerProcessEndpoint, |
193 | RegionRackRPCConnection, |
194 | Service as ServiceModel, |
195 | timestampedmodel, |
196 | @@ -1604,6 +1605,42 @@ class TestRegionAdvertising(MAASServerTestCase): |
197 | ] |
198 | self.assertItemsEqual(expected, advertising.dump()) |
199 | |
200 | + def test_dump_doesnt_duplicate_ips_across_region_controllers(self): |
201 | + duplicate_address = factory.make_ipv4_address() |
202 | + addresses = { |
203 | + (factory.make_ipv4_address(), factory.pick_port()), |
204 | + (factory.make_ipv4_address(), factory.pick_port()), |
205 | + } |
206 | + advertising = RegionAdvertising.promote() |
207 | + advertising.update( |
208 | + addresses.union({(duplicate_address, factory.pick_port()), })) |
209 | + |
210 | + other_addresses = { |
211 | + (factory.make_ipv4_address(), factory.pick_port()), |
212 | + (factory.make_ipv4_address(), factory.pick_port()), |
213 | + } |
214 | + other_region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER) |
215 | + other_process = RegionControllerProcess.objects.create( |
216 | + region=other_region, pid=randint(1, 1000)) |
217 | + for address, port in other_addresses: |
218 | + RegionControllerProcessEndpoint.objects.create( |
219 | + process=other_process, address=address, port=port) |
220 | + RegionControllerProcessEndpoint.objects.create( |
221 | + process=other_process, |
222 | + address=duplicate_address, |
223 | + port=factory.pick_port()) |
224 | + |
225 | + expected = [ |
226 | + ("%s:pid=%d" % (gethostname(), os.getpid()), addr, port) |
227 | + for (addr, port) in addresses |
228 | + ] + [ |
229 | + ("%s:pid=%d" % ( |
230 | + other_region.hostname, other_process.pid), |
231 | + addr, port) |
232 | + for (addr, port) in other_addresses |
233 | + ] |
234 | + self.assertItemsEqual(expected, advertising.dump()) |
235 | + |
236 | def test__adds_connection_and_removes_connection(self): |
237 | advertising = RegionAdvertising.promote() |
238 | process = advertising.getRegionProcess() |
239 | diff --git a/src/maasserver/utils/orm.py b/src/maasserver/utils/orm.py |
240 | index 2ec0590..40f8a3f 100644 |
241 | --- a/src/maasserver/utils/orm.py |
242 | +++ b/src/maasserver/utils/orm.py |
243 | @@ -616,14 +616,25 @@ def connected(): |
244 | |
245 | If there is not yet a connection to the database, this will connect on |
246 | entry and disconnect on exit. Preexisting connections will be left alone. |
247 | + |
248 | + If the preexisting connection is not usable it is closed and a new |
249 | + connection is made. |
250 | """ |
251 | if connection.connection is None: |
252 | + connection.close_if_unusable_or_obsolete() |
253 | connection.ensure_connection() |
254 | try: |
255 | yield |
256 | finally: |
257 | connection.close() |
258 | + elif connection.is_usable(): |
259 | + yield |
260 | else: |
261 | + # Connection is not usable, so we disconnect and reconnect. Since |
262 | + # the connection was previously connected we do not disconnect this |
263 | + # new connection. |
264 | + connection.close_if_unusable_or_obsolete() |
265 | + connection.ensure_connection() |
266 | yield |
267 | |
268 | |
269 | diff --git a/src/maasserver/utils/tests/test_orm.py b/src/maasserver/utils/tests/test_orm.py |
270 | index 064f1b0..94ee817 100644 |
271 | --- a/src/maasserver/utils/tests/test_orm.py |
272 | +++ b/src/maasserver/utils/tests/test_orm.py |
273 | @@ -1004,6 +1004,22 @@ class TestConnected(MAASTransactionServerTestCase): |
274 | self.assertThat(connection.connection, Is(preexisting_connection)) |
275 | self.assertThat(connection.connection, Is(preexisting_connection)) |
276 | |
277 | + def test__disconnects_and_reconnects_if_not_usable(self): |
278 | + connection.ensure_connection() |
279 | + preexisting_connection = connection.connection |
280 | + |
281 | + connection.errors_occurred = True |
282 | + self.patch(connection, "is_usable").return_value = False |
283 | + |
284 | + self.assertThat(connection.connection, Not(Is(None))) |
285 | + with orm.connected(): |
286 | + self.assertThat( |
287 | + connection.connection, Not(Is(preexisting_connection))) |
288 | + self.assertThat(connection.connection, Not(Is(None))) |
289 | + |
290 | + self.assertThat(connection.connection, Not(Is(preexisting_connection))) |
291 | + self.assertThat(connection.connection, Not(Is(None))) |
292 | + |
293 | |
294 | class TestWithConnection(MAASTransactionServerTestCase): |
295 | """Tests for the `orm.with_connection` decorator.""" |
296 | diff --git a/src/provisioningserver/rackdservices/tests/test_tftp.py b/src/provisioningserver/rackdservices/tests/test_tftp.py |
297 | index 1082d6a..14510d9 100644 |
298 | --- a/src/provisioningserver/rackdservices/tests/test_tftp.py |
299 | +++ b/src/provisioningserver/rackdservices/tests/test_tftp.py |
300 | @@ -379,6 +379,229 @@ class TestTFTPBackend(MAASTestCase): |
301 | ) |
302 | |
303 | @inlineCallbacks |
304 | + def test_get_boot_method_reader_uses_same_client(self): |
305 | + # Fake configuration parameters, as discovered from the file path. |
306 | + fake_params = {"mac": factory.make_mac_address("-")} |
307 | + # Fake kernel configuration parameters, as returned from the RPC call. |
308 | + fake_kernel_params = make_kernel_parameters() |
309 | + fake_params = fake_kernel_params._asdict() |
310 | + |
311 | + # Stub the output of list_boot_images so the label is set in the |
312 | + # kernel parameters. |
313 | + boot_image = { |
314 | + "osystem": fake_params["osystem"], |
315 | + "release": fake_params["release"], |
316 | + "architecture": fake_params["arch"], |
317 | + "subarchitecture": fake_params["subarch"], |
318 | + "purpose": fake_params["purpose"], |
319 | + "supported_subarches": "", |
320 | + "label": fake_params["label"], |
321 | + } |
322 | + self.patch(tftp_module, "list_boot_images").return_value = [boot_image] |
323 | + del fake_params["label"] |
324 | + |
325 | + # Stub RPC call to return the fake configuration parameters. |
326 | + clients = [] |
327 | + for _ in range(10): |
328 | + client = Mock() |
329 | + client.localIdent = factory.make_name("system_id") |
330 | + client.side_effect = lambda *args, **kwargs: ( |
331 | + succeed(dict(fake_params))) |
332 | + clients.append(client) |
333 | + client_service = Mock() |
334 | + client_service.getClientNow.side_effect = [ |
335 | + succeed(client) |
336 | + for client in clients |
337 | + ] |
338 | + client_service.getAllClients.return_value = clients |
339 | + |
340 | + # get_boot_method_reader() takes a dict() of parameters and returns an |
341 | + # `IReader` of a PXE configuration, rendered by |
342 | + # `PXEBootMethod.get_reader`. |
343 | + backend = TFTPBackend( |
344 | + self.make_dir(), client_service) |
345 | + |
346 | + # Stub get_reader to return the render parameters. |
347 | + method = PXEBootMethod() |
348 | + fake_render_result = factory.make_name("render").encode("utf-8") |
349 | + render_patch = self.patch(method, "get_reader") |
350 | + render_patch.return_value = BytesReader(fake_render_result) |
351 | + |
352 | + # Get the reader once. |
353 | + remote_ip = factory.make_ipv4_address() |
354 | + params_with_ip = dict(fake_params) |
355 | + params_with_ip['remote_ip'] = remote_ip |
356 | + reader = yield backend.get_boot_method_reader(method, params_with_ip) |
357 | + self.addCleanup(reader.finish) |
358 | + |
359 | + # Get the reader twice. |
360 | + params_with_ip = dict(fake_params) |
361 | + params_with_ip['remote_ip'] = remote_ip |
362 | + reader = yield backend.get_boot_method_reader(method, params_with_ip) |
363 | + self.addCleanup(reader.finish) |
364 | + |
365 | + # Only one client is saved. |
366 | + self.assertEquals(clients[0], backend.client_to_remote[remote_ip]) |
367 | + |
368 | + # Only the first client should have been called twice, and all the |
369 | + # other clients should not have been called. |
370 | + self.assertEquals(2, clients[0].call_count) |
371 | + for idx in range(1, 10): |
372 | + self.assertThat(clients[idx], MockNotCalled()) |
373 | + |
374 | + @inlineCallbacks |
375 | + def test_get_boot_method_reader_uses_different_clients(self): |
376 | + # Fake configuration parameters, as discovered from the file path. |
377 | + fake_params = {"mac": factory.make_mac_address("-")} |
378 | + # Fake kernel configuration parameters, as returned from the RPC call. |
379 | + fake_kernel_params = make_kernel_parameters() |
380 | + fake_params = fake_kernel_params._asdict() |
381 | + |
382 | + # Stub the output of list_boot_images so the label is set in the |
383 | + # kernel parameters. |
384 | + boot_image = { |
385 | + "osystem": fake_params["osystem"], |
386 | + "release": fake_params["release"], |
387 | + "architecture": fake_params["arch"], |
388 | + "subarchitecture": fake_params["subarch"], |
389 | + "purpose": fake_params["purpose"], |
390 | + "supported_subarches": "", |
391 | + "label": fake_params["label"], |
392 | + } |
393 | + self.patch(tftp_module, "list_boot_images").return_value = [boot_image] |
394 | + del fake_params["label"] |
395 | + |
396 | + # Stub RPC call to return the fake configuration parameters. |
397 | + clients = [] |
398 | + for _ in range(10): |
399 | + client = Mock() |
400 | + client.localIdent = factory.make_name("system_id") |
401 | + client.side_effect = lambda *args, **kwargs: ( |
402 | + succeed(dict(fake_params))) |
403 | + clients.append(client) |
404 | + client_service = Mock() |
405 | + client_service.getClientNow.side_effect = [ |
406 | + succeed(client) |
407 | + for client in clients |
408 | + ] |
409 | + client_service.getAllClients.return_value = clients |
410 | + |
411 | + # get_boot_method_reader() takes a dict() of parameters and returns an |
412 | + # `IReader` of a PXE configuration, rendered by |
413 | + # `PXEBootMethod.get_reader`. |
414 | + backend = TFTPBackend( |
415 | + self.make_dir(), client_service) |
416 | + |
417 | + # Stub get_reader to return the render parameters. |
418 | + method = PXEBootMethod() |
419 | + fake_render_result = factory.make_name("render").encode("utf-8") |
420 | + render_patch = self.patch(method, "get_reader") |
421 | + render_patch.return_value = BytesReader(fake_render_result) |
422 | + |
423 | + # Get the reader once. |
424 | + remote_ip_one = factory.make_ipv4_address() |
425 | + params_with_ip = dict(fake_params) |
426 | + params_with_ip['remote_ip'] = remote_ip_one |
427 | + reader = yield backend.get_boot_method_reader(method, params_with_ip) |
428 | + self.addCleanup(reader.finish) |
429 | + |
430 | + # Get the reader twice. |
431 | + remote_ip_two = factory.make_ipv4_address() |
432 | + params_with_ip = dict(fake_params) |
433 | + params_with_ip['remote_ip'] = remote_ip_two |
434 | + reader = yield backend.get_boot_method_reader(method, params_with_ip) |
435 | + self.addCleanup(reader.finish) |
436 | + |
437 | + # Both clients are now saved. |
438 | + self.assertEquals(clients[0], backend.client_to_remote[remote_ip_one]) |
439 | + self.assertEquals(clients[1], backend.client_to_remote[remote_ip_two]) |
440 | + |
441 | + # Only the first and second client should have been called once, and |
442 | + # all the other clients should not have been called. |
443 | + self.assertEquals(1, clients[0].call_count) |
444 | + self.assertEquals(1, clients[1].call_count) |
445 | + for idx in range(2, 10): |
446 | + self.assertThat(clients[idx], MockNotCalled()) |
447 | + |
448 | + @inlineCallbacks |
449 | + def test_get_boot_method_reader_grabs_new_client_on_lost_conn(self): |
450 | + # Fake configuration parameters, as discovered from the file path. |
451 | + fake_params = {"mac": factory.make_mac_address("-")} |
452 | + # Fake kernel configuration parameters, as returned from the RPC call. |
453 | + fake_kernel_params = make_kernel_parameters() |
454 | + fake_params = fake_kernel_params._asdict() |
455 | + |
456 | + # Stub the output of list_boot_images so the label is set in the |
457 | + # kernel parameters. |
458 | + boot_image = { |
459 | + "osystem": fake_params["osystem"], |
460 | + "release": fake_params["release"], |
461 | + "architecture": fake_params["arch"], |
462 | + "subarchitecture": fake_params["subarch"], |
463 | + "purpose": fake_params["purpose"], |
464 | + "supported_subarches": "", |
465 | + "label": fake_params["label"], |
466 | + } |
467 | + self.patch(tftp_module, "list_boot_images").return_value = [boot_image] |
468 | + del fake_params["label"] |
469 | + |
470 | + # Stub RPC call to return the fake configuration parameters. |
471 | + clients = [] |
472 | + for _ in range(10): |
473 | + client = Mock() |
474 | + client.localIdent = factory.make_name("system_id") |
475 | + client.side_effect = lambda *args, **kwargs: ( |
476 | + succeed(dict(fake_params))) |
477 | + clients.append(client) |
478 | + client_service = Mock() |
479 | + client_service.getClientNow.side_effect = [ |
480 | + succeed(client) |
481 | + for client in clients |
482 | + ] |
483 | + client_service.getAllClients.side_effect = [ |
484 | + clients[1:], |
485 | + clients[2:], |
486 | + ] |
487 | + |
488 | + # get_boot_method_reader() takes a dict() of parameters and returns an |
489 | + # `IReader` of a PXE configuration, rendered by |
490 | + # `PXEBootMethod.get_reader`. |
491 | + backend = TFTPBackend( |
492 | + self.make_dir(), client_service) |
493 | + |
494 | + # Stub get_reader to return the render parameters. |
495 | + method = PXEBootMethod() |
496 | + fake_render_result = factory.make_name("render").encode("utf-8") |
497 | + render_patch = self.patch(method, "get_reader") |
498 | + render_patch.return_value = BytesReader(fake_render_result) |
499 | + |
500 | + # Get the reader once. |
501 | + remote_ip = factory.make_ipv4_address() |
502 | + params_with_ip = dict(fake_params) |
503 | + params_with_ip['remote_ip'] = remote_ip |
504 | + reader = yield backend.get_boot_method_reader(method, params_with_ip) |
505 | + self.addCleanup(reader.finish) |
506 | + |
507 | + # The first client is now saved. |
508 | + self.assertEquals(clients[0], backend.client_to_remote[remote_ip]) |
509 | + |
510 | + # Get the reader twice. |
511 | + params_with_ip = dict(fake_params) |
512 | + params_with_ip['remote_ip'] = remote_ip |
513 | + reader = yield backend.get_boot_method_reader(method, params_with_ip) |
514 | + self.addCleanup(reader.finish) |
515 | + |
516 | + # The second client is now saved. |
517 | + self.assertEquals(clients[1], backend.client_to_remote[remote_ip]) |
518 | + |
519 | + # Only the first and second client should have been called once, and |
520 | + # all the other clients should not have been called. |
521 | + self.assertEquals(1, clients[0].call_count) |
522 | + self.assertEquals(1, clients[1].call_count) |
523 | + for idx in range(2, 10): |
524 | + self.assertThat(clients[idx], MockNotCalled()) |
525 | + |
526 | + @inlineCallbacks |
527 | def test_get_boot_method_reader_returns_rendered_params(self): |
528 | # Fake configuration parameters, as discovered from the file path. |
529 | fake_params = {"mac": factory.make_mac_address("-")} |
530 | diff --git a/src/provisioningserver/rackdservices/tftp.py b/src/provisioningserver/rackdservices/tftp.py |
531 | index b56d4b3..5dc1547 100644 |
532 | --- a/src/provisioningserver/rackdservices/tftp.py |
533 | +++ b/src/provisioningserver/rackdservices/tftp.py |
534 | @@ -66,6 +66,7 @@ from twisted.internet.defer import ( |
535 | inlineCallbacks, |
536 | maybeDeferred, |
537 | returnValue, |
538 | + succeed, |
539 | ) |
540 | from twisted.internet.task import deferLater |
541 | from twisted.python.filepath import FilePath |
542 | @@ -163,9 +164,48 @@ class TFTPBackend(FilesystemSynchronousBackend): |
543 | base_path = FilePath(base_path) |
544 | super(TFTPBackend, self).__init__( |
545 | base_path, can_read=True, can_write=False) |
546 | + self.client_to_remote = {} |
547 | self.client_service = client_service |
548 | self.fetcher = RPCFetcher() |
549 | |
550 | + def _get_new_client_for_remote(self, remote_ip): |
551 | + """Return a new client for the `remote_ip`. |
552 | + |
553 | + Don't use directly; only called from `get_client_for`. |
554 | + """ |
555 | + def store_client(client): |
556 | + self.client_to_remote[remote_ip] = client |
557 | + return client |
558 | + |
559 | + d = self.client_service.getClientNow() |
560 | + d.addCallback(store_client) |
561 | + return d |
562 | + |
563 | + def get_client_for(self, params): |
564 | + """Always gets the same client based on `params`. |
565 | + |
566 | + This is done so that all TFTP requests from the same remote client go |
567 | + to the same regiond process. `RPCFetcher` only de-duplicates on the client |
568 | + and arguments, so if the client is not the same the duplicate effort |
569 | + is not consolidated. |
570 | + """ |
571 | + remote_ip = params.get('remote_ip') |
572 | + if remote_ip: |
573 | + client = self.client_to_remote.get(remote_ip, None) |
574 | + if client is None: |
575 | + # Get a new client for the remote_ip. |
576 | + return self._get_new_client_for_remote(remote_ip) |
577 | + else: |
578 | + # Check that the existing client is still valid. |
579 | + clients = self.client_service.getAllClients() |
580 | + if client in clients: |
581 | + return succeed(client) |
582 | + else: |
583 | + del self.client_to_remote[remote_ip] |
584 | + return self._get_new_client_for_remote(remote_ip) |
585 | + else: |
586 | + return self.client_service.getClientNow() |
587 | + |
588 | @inlineCallbacks |
589 | @typed |
590 | def get_boot_method(self, file_name: TFTPPath): |
591 | @@ -243,7 +283,7 @@ class TFTPBackend(FilesystemSynchronousBackend): |
592 | d.addCallback(lambda data: KernelParameters(**data)) |
593 | return d |
594 | |
595 | - d = self.client_service.getClientNow() |
596 | + d = self.get_client_for(params) |
597 | d.addCallback(fetch, params) |
598 | return d |
599 | |
600 | diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py |
601 | index 26e2172..a4ab2ba 100644 |
602 | --- a/src/provisioningserver/rpc/clusterservice.py |
603 | +++ b/src/provisioningserver/rpc/clusterservice.py |
604 | @@ -106,6 +106,7 @@ from twisted import web |
605 | from twisted.application.internet import TimerService |
606 | from twisted.internet import reactor |
607 | from twisted.internet.defer import ( |
608 | + DeferredList, |
609 | inlineCallbacks, |
610 | maybeDeferred, |
611 | returnValue, |
612 | @@ -784,6 +785,8 @@ class ClusterClient(Cluster): |
613 | log.msg("Event-loop '%s' authenticated." % self.ident) |
614 | registered = yield self.registerRackWithRegion() |
615 | if registered: |
616 | + if self.eventloop in self.service.try_connections: |
617 | + del self.service.try_connections[self.eventloop] |
618 | self.service.connections[self.eventloop] = self |
619 | self.ready.set(self.eventloop) |
620 | else: |
621 | @@ -935,6 +938,8 @@ class ClusterClientService(TimerService, object): |
622 | super(ClusterClientService, self).__init__( |
623 | self._calculate_interval(None, None), self._tryUpdate) |
624 | self.connections = {} |
625 | + self.try_connections = {} |
626 | + self._previous_work = (None, None) |
627 | self.clock = reactor |
628 | |
629 | # XXX jtv 2014-09-23, bug=1372767: Fix |
630 | @@ -1172,6 +1177,48 @@ class ClusterClientService(TimerService, object): |
631 | ] |
632 | for name, addresses in eventloops.items() |
633 | } |
634 | + |
635 | + drop, connect = self._calculate_work(eventloops) |
636 | + |
637 | + # Log fully connected only once. If that state changes then log |
638 | + # it again. This prevents flooding the log with the same message when |
639 | + # the state of the connections has not changed. |
640 | + prev_work, self._previous_work = self._previous_work, (drop, connect) |
641 | + if len(drop) == 0 and len(connect) == 0: |
642 | + if prev_work != (drop, connect) and len(eventloops) > 0: |
643 | + controllers = { |
644 | + eventloop.split(':')[0] |
645 | + for eventloop, _ in eventloops.items() |
646 | + } |
647 | + log.msg( |
648 | + "Fully connected to all %d event-loops on all %d " |
649 | + "region controllers (%s)." % ( |
650 | + len(eventloops), len(controllers), |
651 | + ', '.join(sorted(controllers)))) |
652 | + |
653 | + # Drop all connections at once, as they are no longer required. |
654 | + if len(drop) > 0: |
655 | + log.msg("Dropping connections to event-loops: %s" % ( |
656 | + ', '.join(drop.keys()))) |
657 | + yield DeferredList([ |
658 | + maybeDeferred(self._drop_connection, connection) |
659 | + for eventloop, connections in drop.items() |
660 | + for connection in connections |
661 | + ], consumeErrors=True) |
662 | + |
663 | + # Make all the new connections to each endpoint at the same time. |
664 | + if len(connect) > 0: |
665 | + log.msg("Making connections to event-loops: %s" % ( |
666 | + ', '.join(connect.keys()))) |
667 | + yield DeferredList([ |
668 | + self._make_connections(eventloop, addresses) |
669 | + for eventloop, addresses in connect.items() |
670 | + ], consumeErrors=True) |
671 | + |
672 | + def _calculate_work(self, eventloops): |
673 | + """Calculate the work that needs to be performed for reconnection.""" |
674 | + drop, connect = {}, {} |
675 | + |
676 | # Drop connections to event-loops that no longer include one of |
677 | # this cluster's established connections among its advertised |
678 | # endpoints. This is most likely to have happened because of |
679 | @@ -1183,23 +1230,20 @@ class ClusterClientService(TimerService, object): |
680 | if eventloop in self.connections: |
681 | connection = self.connections[eventloop] |
682 | if connection.address not in addresses: |
683 | - yield self._drop_connection(connection) |
684 | + drop[eventloop] = [connection] |
685 | + if eventloop in self.try_connections: |
686 | + connection = self.try_connections[eventloop] |
687 | + if connection.address not in addresses: |
688 | + drop[eventloop] = [connection] |
689 | + |
690 | # Create new connections to event-loops that the cluster does |
691 | - # not yet have a connection to. Try each advertised endpoint |
692 | - # (address) in turn until one of them bites. |
693 | + # not yet have a connection to. |
694 | for eventloop, addresses in eventloops.items(): |
695 | - if eventloop not in self.connections: |
696 | - for address in addresses: |
697 | - try: |
698 | - yield self._make_connection(eventloop, address) |
699 | - except ConnectError as error: |
700 | - host, port = address |
701 | - log.msg("Event-loop %s (%s:%d): %s" % ( |
702 | - eventloop, host, port, error)) |
703 | - except: |
704 | - log.err(None, "Failure making new RPC connection.") |
705 | - else: |
706 | - break |
707 | + if ((eventloop not in self.connections and |
708 | + eventloop not in self.try_connections) or |
709 | + eventloop in drop): |
710 | + connect[eventloop] = addresses |
711 | + |
712 | # Remove connections to event-loops that are no longer |
713 | # advertised by the RPC info view. Most likely this means that |
714 | # the process in which the event-loop is no longer running, but |
715 | @@ -1208,7 +1252,32 @@ class ClusterClientService(TimerService, object): |
716 | for eventloop in self.connections: |
717 | if eventloop not in eventloops: |
718 | connection = self.connections[eventloop] |
719 | - yield self._drop_connection(connection) |
720 | + drop[eventloop] = [connection] |
721 | + for eventloop in self.try_connections: |
722 | + if eventloop not in eventloops: |
723 | + connection = self.try_connections[eventloop] |
724 | + drop[eventloop] = [connection] |
725 | + |
726 | + return drop, connect |
727 | + |
728 | + @inlineCallbacks |
729 | + def _make_connections(self, eventloop, addresses): |
730 | + """Connect to `eventloop` using all `addresses`.""" |
731 | + for address in addresses: |
732 | + try: |
733 | + connection = yield self._make_connection(eventloop, address) |
734 | + except ConnectError as error: |
735 | + host, port = address |
736 | + log.msg("Event-loop %s (%s:%d): %s" % ( |
737 | + eventloop, host, port, error)) |
738 | + except: |
739 | + host, port = address |
740 | + log.err(None, ( |
741 | + "Failure with event-loop %s (%s:%d)" % ( |
742 | + eventloop, host, port))) |
743 | + else: |
744 | + self.try_connections[eventloop] = connection |
745 | + break |
746 | |
747 | def _make_connection(self, eventloop, address): |
748 | """Connect to `eventloop` at `address`.""" |
749 | @@ -1227,6 +1296,9 @@ class ClusterClientService(TimerService, object): |
750 | If this is the last connection that was keeping rackd connected to |
751 | a regiond then dhcpd and dhcpd6 services will be turned off. |
752 | """ |
753 | + if eventloop in self.try_connections: |
754 | + if self.try_connections[eventloop] is connection: |
755 | + del self.try_connections[eventloop] |
756 | if eventloop in self.connections: |
757 | if self.connections[eventloop] is connection: |
758 | del self.connections[eventloop] |
759 | @@ -1242,7 +1314,7 @@ class ClusterClientService(TimerService, object): |
760 | dhcp_v6.off() |
761 | stopping_services.append("dhcpd6") |
762 | if len(stopping_services) > 0: |
763 | - maaslog.error( |
764 | + log.msg( |
765 | "Lost all connections to region controllers. " |
766 | "Stopping service(s) %s." % ",".join(stopping_services)) |
767 | service_monitor.ensureServices() |
768 | diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py |
769 | index e12a107..f8177f4 100644 |
770 | --- a/src/provisioningserver/rpc/tests/test_clusterservice.py |
771 | +++ b/src/provisioningserver/rpc/tests/test_clusterservice.py |
772 | @@ -613,7 +613,9 @@ class TestClusterClientService(MAASTestCase): |
773 | @inlineCallbacks |
774 | def test__update_connections_initially(self): |
775 | service = ClusterClientService(Clock()) |
776 | + mock_client = Mock() |
777 | _make_connection = self.patch(service, "_make_connection") |
778 | + _make_connection.side_effect = lambda *args: succeed(mock_client) |
779 | _drop_connection = self.patch(service, "_drop_connection") |
780 | |
781 | info = json.loads(self.example_rpc_info_view_response.decode("ascii")) |
782 | @@ -627,10 +629,40 @@ class TestClusterClientService(MAASTestCase): |
783 | self.assertItemsEqual( |
784 | _make_connection_expected, |
785 | _make_connection.call_args_list) |
786 | + self.assertEquals({ |
787 | + "host1:pid=1001": mock_client, |
788 | + "host1:pid=2002": mock_client, |
789 | + "host2:pid=3003": mock_client, |
790 | + }, service.try_connections) |
791 | |
792 | self.assertEqual([], _drop_connection.mock_calls) |
793 | |
794 | @inlineCallbacks |
795 | + def test__update_connections_logs_fully_connected(self): |
796 | + service = ClusterClientService(Clock()) |
797 | + eventloops = { |
798 | + "region1:123": [("::ffff:127.0.0.1", 1234)], |
799 | + "region1:124": [("::ffff:127.0.0.1", 1235)], |
800 | + "region2:123": [("::ffff:127.0.0.2", 1234)], |
801 | + "region2:124": [("::ffff:127.0.0.2", 1235)], |
802 | + } |
803 | + for eventloop, addresses in eventloops.items(): |
804 | + for address in addresses: |
805 | + client = Mock() |
806 | + client.address = address |
807 | + service.connections[eventloop] = client |
808 | + |
809 | + logger = self.useFixture(TwistedLoggerFixture()) |
810 | + |
811 | + yield service._update_connections(eventloops) |
812 | + # Second call should not add it to the log. |
813 | + yield service._update_connections(eventloops) |
814 | + |
815 | + self.assertEqual( |
816 | + "Fully connected to all 4 event-loops on all 2 region " |
817 | + "controllers (region1, region2).", logger.dump()) |
818 | + |
819 | + @inlineCallbacks |
820 | def test__update_connections_connect_error_is_logged_tersely(self): |
821 | service = ClusterClientService(Clock()) |
822 | _make_connection = self.patch(service, "_make_connection") |
823 | @@ -646,6 +678,8 @@ class TestClusterClientService(MAASTestCase): |
824 | MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234))) |
825 | |
826 | self.assertEqual( |
827 | + "Making connections to event-loops: an-event-loop\n" |
828 | + "---\n" |
829 | "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection " |
830 | "was refused by other side.", logger.dump()) |
831 | |
832 | @@ -666,7 +700,9 @@ class TestClusterClientService(MAASTestCase): |
833 | |
834 | self.assertDocTestMatches( |
835 | """\ |
836 | - Failure making new RPC connection. |
837 | + Making connections to event-loops: an-event-loop |
838 | + --- |
839 | + Failure with event-loop an-event-loop (::ffff:127.0.0.1:1234) |
840 | Traceback (most recent call last): |
841 | ... |
842 | builtins.RuntimeError: Something went wrong. |
843 | @@ -774,6 +810,15 @@ class TestClusterClientService(MAASTestCase): |
844 | connection.transport.loseConnection, |
845 | MockCalledOnceWith()) |
846 | |
847 | + def test__remove_connection_removes_from_try_connections(self): |
848 | + service = make_inert_client_service() |
849 | + service.startService() |
850 | + endpoint = Mock() |
851 | + connection = Mock() |
852 | + service.try_connections[endpoint] = connection |
853 | + service.remove_connection(endpoint, connection) |
854 | + self.assertThat(service.try_connections, Equals({})) |
855 | + |
856 | def test__remove_connection_removes_from_connections(self): |
857 | service = make_inert_client_service() |
858 | service.startService() |
859 | @@ -1040,6 +1085,7 @@ class TestClusterClient(MAASTestCase): |
860 | |
861 | def test_connecting(self): |
862 | client = self.make_running_client() |
863 | + client.service.try_connections[client.eventloop] = client |
864 | self.patch_authenticate_for_success(client) |
865 | self.patch_register_for_success(client) |
866 | self.assertEqual(client.service.connections, {}) |
867 | @@ -1053,6 +1099,7 @@ class TestClusterClient(MAASTestCase): |
868 | self.assertTrue(extract_result(wait_for_authenticated)) |
869 | # ready has been set with the name of the event-loop. |
870 | self.assertEqual(client.eventloop, extract_result(wait_for_ready)) |
871 | + self.assertEqual(client.service.try_connections, {}) |
872 | self.assertEqual( |
873 | client.service.connections, |
874 | {client.eventloop: client}) |
Currently running in the CI to verify a good backport.
http:// 162.213. 35.104: 8080/job/ maas-git- manual/ 78/console