Merge ~blake-rouse/maas:ha-fixes-2.2 into maas:2.2

Proposed by Blake Rouse
Status: Merged
Merge reported by: Blake Rouse
Merged at revision: d873fad043e7015d3594d6ec3272cc0f778e52b8
Proposed branch: ~blake-rouse/maas:ha-fixes-2.2
Merge into: maas:2.2
Diff against target: 874 lines (+509/-34)
13 files modified
src/maasserver/locks.py (+9/-6)
src/maasserver/models/node.py (+12/-2)
src/maasserver/models/tests/test_node.py (+1/-1)
src/maasserver/rpc/rackcontrollers.py (+1/-1)
src/maasserver/rpc/regionservice.py (+19/-3)
src/maasserver/rpc/tests/test_rackcontrollers.py (+2/-2)
src/maasserver/rpc/tests/test_regionservice.py (+37/-0)
src/maasserver/utils/orm.py (+11/-0)
src/maasserver/utils/tests/test_orm.py (+16/-0)
src/provisioningserver/rackdservices/tests/test_tftp.py (+223/-0)
src/provisioningserver/rackdservices/tftp.py (+41/-1)
src/provisioningserver/rpc/clusterservice.py (+89/-17)
src/provisioningserver/rpc/tests/test_clusterservice.py (+48/-1)
Reviewer Review Type Date Requested Status
Blake Rouse (community) Approve
Review via email: mp+332862@code.launchpad.net

Commit message

Fixes LP: #1705594, #1724677, #1727073.

Backports of ad6874821711208d0e420ee2168862b59cf16cef, 98d01e01e0064fa62e1cda75363805c50398220d, d388d7ad9e5660973cddd81e09cc52ec4e8e2595, b2f6b7759e6345b1a81f9e9b1b3b6fedfefd84ce from master.

To post a comment you must log in.
Revision history for this message
Blake Rouse (blake-rouse) wrote :

Currently running in the CI to verify a good backport.

http://162.213.35.104:8080/job/maas-git-manual/78/console

Revision history for this message
Blake Rouse (blake-rouse) wrote :

Last CI failed because of bad import, fixed and new CI run:

http://162.213.35.104:8080/job/maas-git-manual/79/console

Revision history for this message
Blake Rouse (blake-rouse) wrote :

Passed CI

Self-approving backport.

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :
Revision history for this message
MAAS Lander (maas-lander) wrote :
Revision history for this message
MAAS Lander (maas-lander) wrote :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/maasserver/locks.py b/src/maasserver/locks.py
2index b482f5c..f68539e 100644
3--- a/src/maasserver/locks.py
4+++ b/src/maasserver/locks.py
5@@ -9,7 +9,6 @@ __all__ = [
6 "eventloop",
7 "import_images",
8 "node_acquire",
9- "rack_registration",
10 "security",
11 "startup",
12 ]
13@@ -19,7 +18,11 @@ from maasserver.utils.dblocks import (
14 DatabaseXactLock,
15 )
16
17-# Lock around starting-up a MAAS region.
18+# Lock around starting-up a MAAS region and connection of rack controllers.
19+# This can be a problem where a region controller and a rack controller try
 20+# to create their node objects at the same time. Rack registration also
21+# involves populating fabrics, VLANs, and other information that may overlap
 22+# between rack controllers.
23 startup = DatabaseLock(1)
24
25 # Lock around performing critical security-related operations, like
26@@ -41,10 +44,10 @@ node_acquire = DatabaseXactLock(7)
27 # Lock to help with concurrent allocation of IP addresses.
28 address_allocation = DatabaseLock(8)
29
30-# Lock to prevent concurrent registration of rack controllers. This can be a
31-# problem because registration involves populating fabrics, VLANs, and other
32-# information that may overlap between rack controller.
33-rack_registration = DatabaseLock(9)
 34+# This lock used to be used just for rack registration. Because of lp:1705594
 35+# it was consolidated into the startup lock with the region controller.
36+# DO NOT USE '9' AGAIN, it is reserved so it doesn't break upgrades.
37+# rack_registration = DatabaseLock(9)
38
39 # Lock to prevent concurrent network scanning.
40 try_active_discovery = DatabaseLock(10).TRY
41diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py
42index f6ee6cc..e0c95fe 100644
43--- a/src/maasserver/models/node.py
44+++ b/src/maasserver/models/node.py
45@@ -54,7 +54,10 @@ from django.db.models import (
46 )
47 from django.db.models.query import QuerySet
48 from django.shortcuts import get_object_or_404
49-from maasserver import DefaultMeta
50+from maasserver import (
51+ DefaultMeta,
52+ locks,
53+)
54 from maasserver.clusterrpc.pods import decompose_machine
55 from maasserver.clusterrpc.power import (
56 power_cycle,
57@@ -143,6 +146,7 @@ from maasserver.storage_layouts import (
58 StorageLayoutError,
59 StorageLayoutMissingBootDiskError,
60 )
61+from maasserver.utils import synchronised
62 from maasserver.utils.dns import validate_hostname
63 from maasserver.utils.mac import get_vendor_for_mac
64 from maasserver.utils.orm import (
65@@ -151,6 +155,7 @@ from maasserver.utils.orm import (
66 post_commit,
67 post_commit_do,
68 transactional,
69+ with_connection,
70 )
71 from maasserver.utils.threads import (
72 callOutToDatabase,
73@@ -208,6 +213,7 @@ from provisioningserver.utils.twisted import (
74 asynchronous,
75 callOut,
76 deferWithTimeout,
77+ synchronous,
78 )
79 from twisted.internet.defer import (
80 Deferred,
81@@ -4595,6 +4601,10 @@ class Controller(Node):
82 for interface in interfaces
83 }
84
85+ @synchronous
86+ @with_connection
87+ @synchronised(locks.startup)
88+ @transactional
89 def update_interfaces(self, interfaces):
90 """Update the interfaces attached to the controller.
91
92@@ -4872,7 +4882,7 @@ class RackController(Controller):
93 Service.objects.update_service_for(
94 self, "rackd", SERVICE_STATUS.DEGRADED,
95 "{:.0%} connected to region controllers.".format(
96- percentage))
97+ 1.0 - percentage))
98
99 def get_image_sync_status(self, boot_images=None):
100 """Return the status of the boot image import process."""
101diff --git a/src/maasserver/models/tests/test_node.py b/src/maasserver/models/tests/test_node.py
102index 0467bba..e8ece11 100644
103--- a/src/maasserver/models/tests/test_node.py
104+++ b/src/maasserver/models/tests/test_node.py
105@@ -9495,7 +9495,7 @@ class TestRackController(MAASTransactionServerTestCase):
106 MatchesStructure.byEquality(
107 status=SERVICE_STATUS.DEGRADED, status_info=(
108 "{:.0%} connected to region controllers.".format(
109- percentage))))
110+ 1.0 - percentage))))
111
112 fake_images = [
113 {
114diff --git a/src/maasserver/rpc/rackcontrollers.py b/src/maasserver/rpc/rackcontrollers.py
115index 9c49532..7c82548 100644
116--- a/src/maasserver/rpc/rackcontrollers.py
117+++ b/src/maasserver/rpc/rackcontrollers.py
118@@ -72,7 +72,7 @@ def handle_upgrade(rack_controller, nodegroup_uuid):
119
120 @synchronous
121 @with_connection
122-@synchronised(locks.rack_registration)
123+@synchronised(locks.startup)
124 @transactional
125 def register(
126 system_id=None, hostname='', interfaces=None,
127diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py
128index 3ddf50c..534002b 100644
129--- a/src/maasserver/rpc/regionservice.py
130+++ b/src/maasserver/rpc/regionservice.py
131@@ -1214,15 +1214,31 @@ class RegionAdvertising:
132 Each tuple corresponds to somewhere an event-loop is listening
133 within the whole region. The `name` is the event-loop name.
134 """
135+ # Each regiond might be running a local bridge that duplicates the
136+ # same IP address across region controllers. Each region controller
 137+# must output a set of unique IP addresses, to prevent the rack
 138+# controller from connecting to a different region controller than
139+ # the rack controller was expecting to be connecting to.
140+ def _unique_to_region(address, region, regions):
141+ for region_obj in regions:
142+ if region_obj != region:
143+ for process in region_obj.processes.all():
144+ for endpoint in process.endpoints.all():
145+ if endpoint.address == address:
146+ return False
147+ return True
148+
149 regions = RegionController.objects.all()
150 regions = regions.prefetch_related("processes", "processes__endpoints")
151 all_endpoints = []
152 for region_obj in regions:
153 for process in region_obj.processes.all():
154 for endpoint in process.endpoints.all():
155- all_endpoints.append((
156- "%s:pid=%d" % (region_obj.hostname, process.pid),
157- endpoint.address, endpoint.port))
158+ if _unique_to_region(
159+ endpoint.address, region_obj, regions):
160+ all_endpoints.append((
161+ "%s:pid=%d" % (region_obj.hostname, process.pid),
162+ endpoint.address, endpoint.port))
163 return all_endpoints
164
165 @classmethod
166diff --git a/src/maasserver/rpc/tests/test_rackcontrollers.py b/src/maasserver/rpc/tests/test_rackcontrollers.py
167index 8c62f80..e5b075b 100644
168--- a/src/maasserver/rpc/tests/test_rackcontrollers.py
169+++ b/src/maasserver/rpc/tests/test_rackcontrollers.py
170@@ -288,11 +288,11 @@ class TestRegisterRackController(MAASServerTestCase):
171 for name, interface in interfaces.items()
172 )))
173
174- def test_registers_with_rack_registration_lock_held(self):
175+ def test_registers_with_startup_lock_held(self):
176 lock_status = []
177
178 def record_lock_status(*args):
179- lock_status.append(locks.rack_registration.is_locked())
180+ lock_status.append(locks.startup.is_locked())
181 return None # Simulate that no rack found.
182
183 find = self.patch(rackcontrollers, "find")
184diff --git a/src/maasserver/rpc/tests/test_regionservice.py b/src/maasserver/rpc/tests/test_regionservice.py
185index 0a8016a..12678c5 100644
186--- a/src/maasserver/rpc/tests/test_regionservice.py
187+++ b/src/maasserver/rpc/tests/test_regionservice.py
188@@ -33,6 +33,7 @@ from maasserver.models import (
189 RackController,
190 RegionController,
191 RegionControllerProcess,
192+ RegionControllerProcessEndpoint,
193 RegionRackRPCConnection,
194 Service as ServiceModel,
195 timestampedmodel,
196@@ -1604,6 +1605,42 @@ class TestRegionAdvertising(MAASServerTestCase):
197 ]
198 self.assertItemsEqual(expected, advertising.dump())
199
200+ def test_dump_doesnt_duplicate_ips_across_region_controllers(self):
201+ duplicate_address = factory.make_ipv4_address()
202+ addresses = {
203+ (factory.make_ipv4_address(), factory.pick_port()),
204+ (factory.make_ipv4_address(), factory.pick_port()),
205+ }
206+ advertising = RegionAdvertising.promote()
207+ advertising.update(
208+ addresses.union({(duplicate_address, factory.pick_port()), }))
209+
210+ other_addresses = {
211+ (factory.make_ipv4_address(), factory.pick_port()),
212+ (factory.make_ipv4_address(), factory.pick_port()),
213+ }
214+ other_region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER)
215+ other_process = RegionControllerProcess.objects.create(
216+ region=other_region, pid=randint(1, 1000))
217+ for address, port in other_addresses:
218+ RegionControllerProcessEndpoint.objects.create(
219+ process=other_process, address=address, port=port)
220+ RegionControllerProcessEndpoint.objects.create(
221+ process=other_process,
222+ address=duplicate_address,
223+ port=factory.pick_port())
224+
225+ expected = [
226+ ("%s:pid=%d" % (gethostname(), os.getpid()), addr, port)
227+ for (addr, port) in addresses
228+ ] + [
229+ ("%s:pid=%d" % (
230+ other_region.hostname, other_process.pid),
231+ addr, port)
232+ for (addr, port) in other_addresses
233+ ]
234+ self.assertItemsEqual(expected, advertising.dump())
235+
236 def test__adds_connection_and_removes_connection(self):
237 advertising = RegionAdvertising.promote()
238 process = advertising.getRegionProcess()
239diff --git a/src/maasserver/utils/orm.py b/src/maasserver/utils/orm.py
240index 2ec0590..40f8a3f 100644
241--- a/src/maasserver/utils/orm.py
242+++ b/src/maasserver/utils/orm.py
243@@ -616,14 +616,25 @@ def connected():
244
245 If there is not yet a connection to the database, this will connect on
246 entry and disconnect on exit. Preexisting connections will be left alone.
247+
248+ If the preexisting connection is not usable it is closed and a new
249+ connection is made.
250 """
251 if connection.connection is None:
252+ connection.close_if_unusable_or_obsolete()
253 connection.ensure_connection()
254 try:
255 yield
256 finally:
257 connection.close()
258+ elif connection.is_usable():
259+ yield
260 else:
261+ # Connection is not usable, so we disconnect and reconnect. Since
262+ # the connection was previously connected we do not disconnect this
263+ # new connection.
264+ connection.close_if_unusable_or_obsolete()
265+ connection.ensure_connection()
266 yield
267
268
269diff --git a/src/maasserver/utils/tests/test_orm.py b/src/maasserver/utils/tests/test_orm.py
270index 064f1b0..94ee817 100644
271--- a/src/maasserver/utils/tests/test_orm.py
272+++ b/src/maasserver/utils/tests/test_orm.py
273@@ -1004,6 +1004,22 @@ class TestConnected(MAASTransactionServerTestCase):
274 self.assertThat(connection.connection, Is(preexisting_connection))
275 self.assertThat(connection.connection, Is(preexisting_connection))
276
277+ def test__disconnects_and_reconnects_if_not_usable(self):
278+ connection.ensure_connection()
279+ preexisting_connection = connection.connection
280+
281+ connection.errors_occurred = True
282+ self.patch(connection, "is_usable").return_value = False
283+
284+ self.assertThat(connection.connection, Not(Is(None)))
285+ with orm.connected():
286+ self.assertThat(
287+ connection.connection, Not(Is(preexisting_connection)))
288+ self.assertThat(connection.connection, Not(Is(None)))
289+
290+ self.assertThat(connection.connection, Not(Is(preexisting_connection)))
291+ self.assertThat(connection.connection, Not(Is(None)))
292+
293
294 class TestWithConnection(MAASTransactionServerTestCase):
295 """Tests for the `orm.with_connection` decorator."""
296diff --git a/src/provisioningserver/rackdservices/tests/test_tftp.py b/src/provisioningserver/rackdservices/tests/test_tftp.py
297index 1082d6a..14510d9 100644
298--- a/src/provisioningserver/rackdservices/tests/test_tftp.py
299+++ b/src/provisioningserver/rackdservices/tests/test_tftp.py
300@@ -379,6 +379,229 @@ class TestTFTPBackend(MAASTestCase):
301 )
302
303 @inlineCallbacks
304+ def test_get_boot_method_reader_uses_same_client(self):
305+ # Fake configuration parameters, as discovered from the file path.
306+ fake_params = {"mac": factory.make_mac_address("-")}
307+ # Fake kernel configuration parameters, as returned from the RPC call.
308+ fake_kernel_params = make_kernel_parameters()
309+ fake_params = fake_kernel_params._asdict()
310+
311+ # Stub the output of list_boot_images so the label is set in the
312+ # kernel parameters.
313+ boot_image = {
314+ "osystem": fake_params["osystem"],
315+ "release": fake_params["release"],
316+ "architecture": fake_params["arch"],
317+ "subarchitecture": fake_params["subarch"],
318+ "purpose": fake_params["purpose"],
319+ "supported_subarches": "",
320+ "label": fake_params["label"],
321+ }
322+ self.patch(tftp_module, "list_boot_images").return_value = [boot_image]
323+ del fake_params["label"]
324+
325+ # Stub RPC call to return the fake configuration parameters.
326+ clients = []
327+ for _ in range(10):
328+ client = Mock()
329+ client.localIdent = factory.make_name("system_id")
330+ client.side_effect = lambda *args, **kwargs: (
331+ succeed(dict(fake_params)))
332+ clients.append(client)
333+ client_service = Mock()
334+ client_service.getClientNow.side_effect = [
335+ succeed(client)
336+ for client in clients
337+ ]
338+ client_service.getAllClients.return_value = clients
339+
340+ # get_boot_method_reader() takes a dict() of parameters and returns an
341+ # `IReader` of a PXE configuration, rendered by
342+ # `PXEBootMethod.get_reader`.
343+ backend = TFTPBackend(
344+ self.make_dir(), client_service)
345+
346+ # Stub get_reader to return the render parameters.
347+ method = PXEBootMethod()
348+ fake_render_result = factory.make_name("render").encode("utf-8")
349+ render_patch = self.patch(method, "get_reader")
350+ render_patch.return_value = BytesReader(fake_render_result)
351+
352+ # Get the reader once.
353+ remote_ip = factory.make_ipv4_address()
354+ params_with_ip = dict(fake_params)
355+ params_with_ip['remote_ip'] = remote_ip
356+ reader = yield backend.get_boot_method_reader(method, params_with_ip)
357+ self.addCleanup(reader.finish)
358+
359+ # Get the reader twice.
360+ params_with_ip = dict(fake_params)
361+ params_with_ip['remote_ip'] = remote_ip
362+ reader = yield backend.get_boot_method_reader(method, params_with_ip)
363+ self.addCleanup(reader.finish)
364+
365+ # Only one client is saved.
366+ self.assertEquals(clients[0], backend.client_to_remote[remote_ip])
367+
368+ # Only the first client should have been called twice, and all the
369+ # other clients should not have been called.
370+ self.assertEquals(2, clients[0].call_count)
371+ for idx in range(1, 10):
372+ self.assertThat(clients[idx], MockNotCalled())
373+
374+ @inlineCallbacks
375+ def test_get_boot_method_reader_uses_different_clients(self):
376+ # Fake configuration parameters, as discovered from the file path.
377+ fake_params = {"mac": factory.make_mac_address("-")}
378+ # Fake kernel configuration parameters, as returned from the RPC call.
379+ fake_kernel_params = make_kernel_parameters()
380+ fake_params = fake_kernel_params._asdict()
381+
382+ # Stub the output of list_boot_images so the label is set in the
383+ # kernel parameters.
384+ boot_image = {
385+ "osystem": fake_params["osystem"],
386+ "release": fake_params["release"],
387+ "architecture": fake_params["arch"],
388+ "subarchitecture": fake_params["subarch"],
389+ "purpose": fake_params["purpose"],
390+ "supported_subarches": "",
391+ "label": fake_params["label"],
392+ }
393+ self.patch(tftp_module, "list_boot_images").return_value = [boot_image]
394+ del fake_params["label"]
395+
396+ # Stub RPC call to return the fake configuration parameters.
397+ clients = []
398+ for _ in range(10):
399+ client = Mock()
400+ client.localIdent = factory.make_name("system_id")
401+ client.side_effect = lambda *args, **kwargs: (
402+ succeed(dict(fake_params)))
403+ clients.append(client)
404+ client_service = Mock()
405+ client_service.getClientNow.side_effect = [
406+ succeed(client)
407+ for client in clients
408+ ]
409+ client_service.getAllClients.return_value = clients
410+
411+ # get_boot_method_reader() takes a dict() of parameters and returns an
412+ # `IReader` of a PXE configuration, rendered by
413+ # `PXEBootMethod.get_reader`.
414+ backend = TFTPBackend(
415+ self.make_dir(), client_service)
416+
417+ # Stub get_reader to return the render parameters.
418+ method = PXEBootMethod()
419+ fake_render_result = factory.make_name("render").encode("utf-8")
420+ render_patch = self.patch(method, "get_reader")
421+ render_patch.return_value = BytesReader(fake_render_result)
422+
423+ # Get the reader once.
424+ remote_ip_one = factory.make_ipv4_address()
425+ params_with_ip = dict(fake_params)
426+ params_with_ip['remote_ip'] = remote_ip_one
427+ reader = yield backend.get_boot_method_reader(method, params_with_ip)
428+ self.addCleanup(reader.finish)
429+
430+ # Get the reader twice.
431+ remote_ip_two = factory.make_ipv4_address()
432+ params_with_ip = dict(fake_params)
433+ params_with_ip['remote_ip'] = remote_ip_two
434+ reader = yield backend.get_boot_method_reader(method, params_with_ip)
435+ self.addCleanup(reader.finish)
436+
 437+ # Both clients are saved.
438+ self.assertEquals(clients[0], backend.client_to_remote[remote_ip_one])
439+ self.assertEquals(clients[1], backend.client_to_remote[remote_ip_two])
440+
441+ # Only the first and second client should have been called once, and
442+ # all the other clients should not have been called.
443+ self.assertEquals(1, clients[0].call_count)
444+ self.assertEquals(1, clients[1].call_count)
445+ for idx in range(2, 10):
446+ self.assertThat(clients[idx], MockNotCalled())
447+
448+ @inlineCallbacks
449+ def test_get_boot_method_reader_grabs_new_client_on_lost_conn(self):
450+ # Fake configuration parameters, as discovered from the file path.
451+ fake_params = {"mac": factory.make_mac_address("-")}
452+ # Fake kernel configuration parameters, as returned from the RPC call.
453+ fake_kernel_params = make_kernel_parameters()
454+ fake_params = fake_kernel_params._asdict()
455+
456+ # Stub the output of list_boot_images so the label is set in the
457+ # kernel parameters.
458+ boot_image = {
459+ "osystem": fake_params["osystem"],
460+ "release": fake_params["release"],
461+ "architecture": fake_params["arch"],
462+ "subarchitecture": fake_params["subarch"],
463+ "purpose": fake_params["purpose"],
464+ "supported_subarches": "",
465+ "label": fake_params["label"],
466+ }
467+ self.patch(tftp_module, "list_boot_images").return_value = [boot_image]
468+ del fake_params["label"]
469+
470+ # Stub RPC call to return the fake configuration parameters.
471+ clients = []
472+ for _ in range(10):
473+ client = Mock()
474+ client.localIdent = factory.make_name("system_id")
475+ client.side_effect = lambda *args, **kwargs: (
476+ succeed(dict(fake_params)))
477+ clients.append(client)
478+ client_service = Mock()
479+ client_service.getClientNow.side_effect = [
480+ succeed(client)
481+ for client in clients
482+ ]
483+ client_service.getAllClients.side_effect = [
484+ clients[1:],
485+ clients[2:],
486+ ]
487+
488+ # get_boot_method_reader() takes a dict() of parameters and returns an
489+ # `IReader` of a PXE configuration, rendered by
490+ # `PXEBootMethod.get_reader`.
491+ backend = TFTPBackend(
492+ self.make_dir(), client_service)
493+
494+ # Stub get_reader to return the render parameters.
495+ method = PXEBootMethod()
496+ fake_render_result = factory.make_name("render").encode("utf-8")
497+ render_patch = self.patch(method, "get_reader")
498+ render_patch.return_value = BytesReader(fake_render_result)
499+
500+ # Get the reader once.
501+ remote_ip = factory.make_ipv4_address()
502+ params_with_ip = dict(fake_params)
503+ params_with_ip['remote_ip'] = remote_ip
504+ reader = yield backend.get_boot_method_reader(method, params_with_ip)
505+ self.addCleanup(reader.finish)
506+
507+ # The first client is now saved.
508+ self.assertEquals(clients[0], backend.client_to_remote[remote_ip])
509+
510+ # Get the reader twice.
511+ params_with_ip = dict(fake_params)
512+ params_with_ip['remote_ip'] = remote_ip
513+ reader = yield backend.get_boot_method_reader(method, params_with_ip)
514+ self.addCleanup(reader.finish)
515+
516+ # The second client is now saved.
517+ self.assertEquals(clients[1], backend.client_to_remote[remote_ip])
518+
519+ # Only the first and second client should have been called once, and
520+ # all the other clients should not have been called.
521+ self.assertEquals(1, clients[0].call_count)
522+ self.assertEquals(1, clients[1].call_count)
523+ for idx in range(2, 10):
524+ self.assertThat(clients[idx], MockNotCalled())
525+
526+ @inlineCallbacks
527 def test_get_boot_method_reader_returns_rendered_params(self):
528 # Fake configuration parameters, as discovered from the file path.
529 fake_params = {"mac": factory.make_mac_address("-")}
530diff --git a/src/provisioningserver/rackdservices/tftp.py b/src/provisioningserver/rackdservices/tftp.py
531index b56d4b3..5dc1547 100644
532--- a/src/provisioningserver/rackdservices/tftp.py
533+++ b/src/provisioningserver/rackdservices/tftp.py
534@@ -66,6 +66,7 @@ from twisted.internet.defer import (
535 inlineCallbacks,
536 maybeDeferred,
537 returnValue,
538+ succeed,
539 )
540 from twisted.internet.task import deferLater
541 from twisted.python.filepath import FilePath
542@@ -163,9 +164,48 @@ class TFTPBackend(FilesystemSynchronousBackend):
543 base_path = FilePath(base_path)
544 super(TFTPBackend, self).__init__(
545 base_path, can_read=True, can_write=False)
546+ self.client_to_remote = {}
547 self.client_service = client_service
548 self.fetcher = RPCFetcher()
549
550+ def _get_new_client_for_remote(self, remote_ip):
551+ """Return a new client for the `remote_ip`.
552+
 553+ Don't use directly; this is called from `get_client_for`.
554+ """
555+ def store_client(client):
556+ self.client_to_remote[remote_ip] = client
557+ return client
558+
559+ d = self.client_service.getClientNow()
560+ d.addCallback(store_client)
561+ return d
562+
563+ def get_client_for(self, params):
564+ """Always gets the same client based on `params`.
565+
566+ This is done so that all TFTP requests from the same remote client go
 567+ to the same regiond process. `RPCFetcher` only de-duplicates on the client
568+ and arguments, so if the client is not the same the duplicate effort
569+ is not consolidated.
570+ """
571+ remote_ip = params.get('remote_ip')
572+ if remote_ip:
573+ client = self.client_to_remote.get(remote_ip, None)
574+ if client is None:
575+ # Get a new client for the remote_ip.
576+ return self._get_new_client_for_remote(remote_ip)
577+ else:
578+ # Check that the existing client is still valid.
579+ clients = self.client_service.getAllClients()
580+ if client in clients:
581+ return succeed(client)
582+ else:
583+ del self.client_to_remote[remote_ip]
584+ return self._get_new_client_for_remote(remote_ip)
585+ else:
586+ return self.client_service.getClientNow()
587+
588 @inlineCallbacks
589 @typed
590 def get_boot_method(self, file_name: TFTPPath):
591@@ -243,7 +283,7 @@ class TFTPBackend(FilesystemSynchronousBackend):
592 d.addCallback(lambda data: KernelParameters(**data))
593 return d
594
595- d = self.client_service.getClientNow()
596+ d = self.get_client_for(params)
597 d.addCallback(fetch, params)
598 return d
599
600diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
601index 26e2172..a4ab2ba 100644
602--- a/src/provisioningserver/rpc/clusterservice.py
603+++ b/src/provisioningserver/rpc/clusterservice.py
604@@ -106,6 +106,7 @@ from twisted import web
605 from twisted.application.internet import TimerService
606 from twisted.internet import reactor
607 from twisted.internet.defer import (
608+ DeferredList,
609 inlineCallbacks,
610 maybeDeferred,
611 returnValue,
612@@ -784,6 +785,8 @@ class ClusterClient(Cluster):
613 log.msg("Event-loop '%s' authenticated." % self.ident)
614 registered = yield self.registerRackWithRegion()
615 if registered:
616+ if self.eventloop in self.service.try_connections:
617+ del self.service.try_connections[self.eventloop]
618 self.service.connections[self.eventloop] = self
619 self.ready.set(self.eventloop)
620 else:
621@@ -935,6 +938,8 @@ class ClusterClientService(TimerService, object):
622 super(ClusterClientService, self).__init__(
623 self._calculate_interval(None, None), self._tryUpdate)
624 self.connections = {}
625+ self.try_connections = {}
626+ self._previous_work = (None, None)
627 self.clock = reactor
628
629 # XXX jtv 2014-09-23, bug=1372767: Fix
630@@ -1172,6 +1177,48 @@ class ClusterClientService(TimerService, object):
631 ]
632 for name, addresses in eventloops.items()
633 }
634+
635+ drop, connect = self._calculate_work(eventloops)
636+
637+ # Log fully connected only once. If that state changes then log
638+ # it again. This prevents flooding the log with the same message when
639+ # the state of the connections has not changed.
640+ prev_work, self._previous_work = self._previous_work, (drop, connect)
641+ if len(drop) == 0 and len(connect) == 0:
642+ if prev_work != (drop, connect) and len(eventloops) > 0:
643+ controllers = {
644+ eventloop.split(':')[0]
645+ for eventloop, _ in eventloops.items()
646+ }
647+ log.msg(
648+ "Fully connected to all %d event-loops on all %d "
649+ "region controllers (%s)." % (
650+ len(eventloops), len(controllers),
651+ ', '.join(sorted(controllers))))
652+
 653+ # Drop all connections at once, as they are no longer required.
654+ if len(drop) > 0:
655+ log.msg("Dropping connections to event-loops: %s" % (
656+ ', '.join(drop.keys())))
657+ yield DeferredList([
658+ maybeDeferred(self._drop_connection, connection)
659+ for eventloop, connections in drop.items()
660+ for connection in connections
661+ ], consumeErrors=True)
662+
663+ # Make all the new connections to each endpoint at the same time.
664+ if len(connect) > 0:
665+ log.msg("Making connections to event-loops: %s" % (
666+ ', '.join(connect.keys())))
667+ yield DeferredList([
668+ self._make_connections(eventloop, addresses)
669+ for eventloop, addresses in connect.items()
670+ ], consumeErrors=True)
671+
672+ def _calculate_work(self, eventloops):
673+ """Calculate the work that needs to be performed for reconnection."""
674+ drop, connect = {}, {}
675+
676 # Drop connections to event-loops that no longer include one of
677 # this cluster's established connections among its advertised
678 # endpoints. This is most likely to have happened because of
679@@ -1183,23 +1230,20 @@ class ClusterClientService(TimerService, object):
680 if eventloop in self.connections:
681 connection = self.connections[eventloop]
682 if connection.address not in addresses:
683- yield self._drop_connection(connection)
684+ drop[eventloop] = [connection]
685+ if eventloop in self.try_connections:
686+ connection = self.try_connections[eventloop]
687+ if connection.address not in addresses:
688+ drop[eventloop] = [connection]
689+
690 # Create new connections to event-loops that the cluster does
691- # not yet have a connection to. Try each advertised endpoint
692- # (address) in turn until one of them bites.
693+ # not yet have a connection to.
694 for eventloop, addresses in eventloops.items():
695- if eventloop not in self.connections:
696- for address in addresses:
697- try:
698- yield self._make_connection(eventloop, address)
699- except ConnectError as error:
700- host, port = address
701- log.msg("Event-loop %s (%s:%d): %s" % (
702- eventloop, host, port, error))
703- except:
704- log.err(None, "Failure making new RPC connection.")
705- else:
706- break
707+ if ((eventloop not in self.connections and
708+ eventloop not in self.try_connections) or
709+ eventloop in drop):
710+ connect[eventloop] = addresses
711+
712 # Remove connections to event-loops that are no longer
713 # advertised by the RPC info view. Most likely this means that
714 # the process in which the event-loop is no longer running, but
715@@ -1208,7 +1252,32 @@ class ClusterClientService(TimerService, object):
716 for eventloop in self.connections:
717 if eventloop not in eventloops:
718 connection = self.connections[eventloop]
719- yield self._drop_connection(connection)
720+ drop[eventloop] = [connection]
721+ for eventloop in self.try_connections:
722+ if eventloop not in eventloops:
723+ connection = self.try_connections[eventloop]
724+ drop[eventloop] = [connection]
725+
726+ return drop, connect
727+
728+ @inlineCallbacks
729+ def _make_connections(self, eventloop, addresses):
730+ """Connect to `eventloop` using all `addresses`."""
731+ for address in addresses:
732+ try:
733+ connection = yield self._make_connection(eventloop, address)
734+ except ConnectError as error:
735+ host, port = address
736+ log.msg("Event-loop %s (%s:%d): %s" % (
737+ eventloop, host, port, error))
738+ except:
739+ host, port = address
740+ log.err(None, (
741+ "Failure with event-loop %s (%s:%d)" % (
742+ eventloop, host, port)))
743+ else:
744+ self.try_connections[eventloop] = connection
745+ break
746
747 def _make_connection(self, eventloop, address):
748 """Connect to `eventloop` at `address`."""
749@@ -1227,6 +1296,9 @@ class ClusterClientService(TimerService, object):
750 If this is the last connection that was keeping rackd connected to
751 a regiond then dhcpd and dhcpd6 services will be turned off.
752 """
753+ if eventloop in self.try_connections:
754+ if self.try_connections[eventloop] is connection:
755+ del self.try_connections[eventloop]
756 if eventloop in self.connections:
757 if self.connections[eventloop] is connection:
758 del self.connections[eventloop]
759@@ -1242,7 +1314,7 @@ class ClusterClientService(TimerService, object):
760 dhcp_v6.off()
761 stopping_services.append("dhcpd6")
762 if len(stopping_services) > 0:
763- maaslog.error(
764+ log.msg(
765 "Lost all connections to region controllers. "
766 "Stopping service(s) %s." % ",".join(stopping_services))
767 service_monitor.ensureServices()
768diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
769index e12a107..f8177f4 100644
770--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
771+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
772@@ -613,7 +613,9 @@ class TestClusterClientService(MAASTestCase):
773 @inlineCallbacks
774 def test__update_connections_initially(self):
775 service = ClusterClientService(Clock())
776+ mock_client = Mock()
777 _make_connection = self.patch(service, "_make_connection")
778+ _make_connection.side_effect = lambda *args: succeed(mock_client)
779 _drop_connection = self.patch(service, "_drop_connection")
780
781 info = json.loads(self.example_rpc_info_view_response.decode("ascii"))
782@@ -627,10 +629,40 @@ class TestClusterClientService(MAASTestCase):
783 self.assertItemsEqual(
784 _make_connection_expected,
785 _make_connection.call_args_list)
786+ self.assertEquals({
787+ "host1:pid=1001": mock_client,
788+ "host1:pid=2002": mock_client,
789+ "host2:pid=3003": mock_client,
790+ }, service.try_connections)
791
792 self.assertEqual([], _drop_connection.mock_calls)
793
794 @inlineCallbacks
795+ def test__update_connections_logs_fully_connected(self):
796+ service = ClusterClientService(Clock())
797+ eventloops = {
798+ "region1:123": [("::ffff:127.0.0.1", 1234)],
799+ "region1:124": [("::ffff:127.0.0.1", 1235)],
800+ "region2:123": [("::ffff:127.0.0.2", 1234)],
801+ "region2:124": [("::ffff:127.0.0.2", 1235)],
802+ }
803+ for eventloop, addresses in eventloops.items():
804+ for address in addresses:
805+ client = Mock()
806+ client.address = address
807+ service.connections[eventloop] = client
808+
809+ logger = self.useFixture(TwistedLoggerFixture())
810+
811+ yield service._update_connections(eventloops)
812+ # Second call should not add it to the log.
813+ yield service._update_connections(eventloops)
814+
815+ self.assertEqual(
816+ "Fully connected to all 4 event-loops on all 2 region "
817+ "controllers (region1, region2).", logger.dump())
818+
819+ @inlineCallbacks
820 def test__update_connections_connect_error_is_logged_tersely(self):
821 service = ClusterClientService(Clock())
822 _make_connection = self.patch(service, "_make_connection")
823@@ -646,6 +678,8 @@ class TestClusterClientService(MAASTestCase):
824 MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234)))
825
826 self.assertEqual(
827+ "Making connections to event-loops: an-event-loop\n"
828+ "---\n"
829 "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection "
830 "was refused by other side.", logger.dump())
831
832@@ -666,7 +700,9 @@ class TestClusterClientService(MAASTestCase):
833
834 self.assertDocTestMatches(
835 """\
836- Failure making new RPC connection.
837+ Making connections to event-loops: an-event-loop
838+ ---
839+ Failure with event-loop an-event-loop (::ffff:127.0.0.1:1234)
840 Traceback (most recent call last):
841 ...
842 builtins.RuntimeError: Something went wrong.
843@@ -774,6 +810,15 @@ class TestClusterClientService(MAASTestCase):
844 connection.transport.loseConnection,
845 MockCalledOnceWith())
846
847+ def test__remove_connection_removes_from_try_connections(self):
848+ service = make_inert_client_service()
849+ service.startService()
850+ endpoint = Mock()
851+ connection = Mock()
852+ service.try_connections[endpoint] = connection
853+ service.remove_connection(endpoint, connection)
854+ self.assertThat(service.try_connections, Equals({}))
855+
856 def test__remove_connection_removes_from_connections(self):
857 service = make_inert_client_service()
858 service.startService()
859@@ -1040,6 +1085,7 @@ class TestClusterClient(MAASTestCase):
860
861 def test_connecting(self):
862 client = self.make_running_client()
863+ client.service.try_connections[client.eventloop] = client
864 self.patch_authenticate_for_success(client)
865 self.patch_register_for_success(client)
866 self.assertEqual(client.service.connections, {})
867@@ -1053,6 +1099,7 @@ class TestClusterClient(MAASTestCase):
868 self.assertTrue(extract_result(wait_for_authenticated))
869 # ready has been set with the name of the event-loop.
870 self.assertEqual(client.eventloop, extract_result(wait_for_ready))
871+ self.assertEqual(client.service.try_connections, {})
872 self.assertEqual(
873 client.service.connections,
874 {client.eventloop: client})

Subscribers

People subscribed via source and target branches