Merge ~blake-rouse/maas:ha-fixes-2.2 into maas:2.2

Proposed by Blake Rouse
Status: Merged
Merge reported by: Blake Rouse
Merged at revision: d873fad043e7015d3594d6ec3272cc0f778e52b8
Proposed branch: ~blake-rouse/maas:ha-fixes-2.2
Merge into: maas:2.2
Diff against target: 874 lines (+509/-34)
13 files modified
src/maasserver/locks.py (+9/-6)
src/maasserver/models/node.py (+12/-2)
src/maasserver/models/tests/test_node.py (+1/-1)
src/maasserver/rpc/rackcontrollers.py (+1/-1)
src/maasserver/rpc/regionservice.py (+19/-3)
src/maasserver/rpc/tests/test_rackcontrollers.py (+2/-2)
src/maasserver/rpc/tests/test_regionservice.py (+37/-0)
src/maasserver/utils/orm.py (+11/-0)
src/maasserver/utils/tests/test_orm.py (+16/-0)
src/provisioningserver/rackdservices/tests/test_tftp.py (+223/-0)
src/provisioningserver/rackdservices/tftp.py (+41/-1)
src/provisioningserver/rpc/clusterservice.py (+89/-17)
src/provisioningserver/rpc/tests/test_clusterservice.py (+48/-1)
Reviewer: Blake Rouse (community)
Review status: Approve
Review via email: mp+332862@code.launchpad.net

Commit message

Fixes LP: #1705594, #1724677, #1727073.

Backports of ad6874821711208d0e420ee2168862b59cf16cef, 98d01e01e0064fa62e1cda75363805c50398220d, d388d7ad9e5660973cddd81e09cc52ec4e8e2595, b2f6b7759e6345b1a81f9e9b1b3b6fedfefd84ce from master.

Revision history for this message
Blake Rouse (blake-rouse) wrote :

Currently running in CI to verify that the backport is good.

http://162.213.35.104:8080/job/maas-git-manual/78/console

Revision history for this message
Blake Rouse (blake-rouse) wrote :

The last CI run failed because of a bad import; fixed it and started a new CI run:

http://162.213.35.104:8080/job/maas-git-manual/79/console

Revision history for this message
Blake Rouse (blake-rouse) wrote :

Passed CI

Self-approving backport.

review: Approve

Preview Diff

diff --git a/src/maasserver/locks.py b/src/maasserver/locks.py
index b482f5c..f68539e 100644
--- a/src/maasserver/locks.py
+++ b/src/maasserver/locks.py
@@ -9,7 +9,6 @@ __all__ = [
9 "eventloop",9 "eventloop",
10 "import_images",10 "import_images",
11 "node_acquire",11 "node_acquire",
12 "rack_registration",
13 "security",12 "security",
14 "startup",13 "startup",
15]14]
@@ -19,7 +18,11 @@ from maasserver.utils.dblocks import (
19 DatabaseXactLock,18 DatabaseXactLock,
20)19)
2120
22# Lock around starting-up a MAAS region.21# Lock around starting-up a MAAS region and connection of rack controllers.
22# This can be a problem where a region controller and a rack controller try
23# to create their node objects at the same time. Rack registration also
24# involves populating fabrics, VLANs, and other information that may overlap
25# between rack controllers.
23startup = DatabaseLock(1)26startup = DatabaseLock(1)
2427
25# Lock around performing critical security-related operations, like28# Lock around performing critical security-related operations, like
@@ -41,10 +44,10 @@ node_acquire = DatabaseXactLock(7)
41# Lock to help with concurrent allocation of IP addresses.44# Lock to help with concurrent allocation of IP addresses.
42address_allocation = DatabaseLock(8)45address_allocation = DatabaseLock(8)
4346
44# Lock to prevent concurrent registration of rack controllers. This can be a47# Lock used to be used just for rack registration. Because of lp:1705594 this
45# problem because registration involves populating fabrics, VLANs, and other48# was consolidated into the startup lock with the region controller.
46# information that may overlap between rack controller.49# DO NOT USE '9' AGAIN, it is reserved so it doesn't break upgrades.
47rack_registration = DatabaseLock(9)50# rack_registration = DatabaseLock(9)
4851
49# Lock to prevent concurrent network scanning.52# Lock to prevent concurrent network scanning.
50try_active_discovery = DatabaseLock(10).TRY53try_active_discovery = DatabaseLock(10).TRY
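
The locks.py hunk above retires the dedicated rack_registration lock and folds rack registration under the existing startup lock, so region start-up, rack registration, and (further down) controller interface updates all serialise on one database lock; lock ID 9 stays reserved so older code that still takes it during an upgrade does not collide with anything new. A minimal, self-contained sketch of the idea, using a threading.Lock as a stand-in for DatabaseLock(1) and a simplified synchronised decorator (the real helpers live in maasserver.locks and maasserver.utils):

import threading
from functools import wraps

startup = threading.Lock()  # stand-in for maasserver.locks.startup (DatabaseLock(1))

def synchronised(lock):
    """Simplified stand-in for maasserver.utils.synchronised."""
    def decorate(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with lock:                      # hold the shared lock for the whole call
                return func(*args, **kwargs)
        return wrapper
    return decorate

@synchronised(startup)
def register_rack_controller():
    """Stand-in for rackcontrollers.register(): creates the rack, fabrics, VLANs."""

@synchronised(startup)
def update_controller_interfaces():
    """Stand-in for Controller.update_interfaces(): may also create fabrics/VLANs."""

Because both paths now take the same lock, the race from LP: #1705594 (region and rack creating overlapping node, fabric, and VLAN rows at the same time) can no longer occur.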
diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py
index f6ee6cc..e0c95fe 100644
--- a/src/maasserver/models/node.py
+++ b/src/maasserver/models/node.py
@@ -54,7 +54,10 @@ from django.db.models import (
54)54)
55from django.db.models.query import QuerySet55from django.db.models.query import QuerySet
56from django.shortcuts import get_object_or_40456from django.shortcuts import get_object_or_404
57from maasserver import DefaultMeta57from maasserver import (
58 DefaultMeta,
59 locks,
60)
58from maasserver.clusterrpc.pods import decompose_machine61from maasserver.clusterrpc.pods import decompose_machine
59from maasserver.clusterrpc.power import (62from maasserver.clusterrpc.power import (
60 power_cycle,63 power_cycle,
@@ -143,6 +146,7 @@ from maasserver.storage_layouts import (
143 StorageLayoutError,146 StorageLayoutError,
144 StorageLayoutMissingBootDiskError,147 StorageLayoutMissingBootDiskError,
145)148)
149from maasserver.utils import synchronised
146from maasserver.utils.dns import validate_hostname150from maasserver.utils.dns import validate_hostname
147from maasserver.utils.mac import get_vendor_for_mac151from maasserver.utils.mac import get_vendor_for_mac
148from maasserver.utils.orm import (152from maasserver.utils.orm import (
@@ -151,6 +155,7 @@ from maasserver.utils.orm import (
151 post_commit,155 post_commit,
152 post_commit_do,156 post_commit_do,
153 transactional,157 transactional,
158 with_connection,
154)159)
155from maasserver.utils.threads import (160from maasserver.utils.threads import (
156 callOutToDatabase,161 callOutToDatabase,
@@ -208,6 +213,7 @@ from provisioningserver.utils.twisted import (
208 asynchronous,213 asynchronous,
209 callOut,214 callOut,
210 deferWithTimeout,215 deferWithTimeout,
216 synchronous,
211)217)
212from twisted.internet.defer import (218from twisted.internet.defer import (
213 Deferred,219 Deferred,
@@ -4595,6 +4601,10 @@ class Controller(Node):
4595 for interface in interfaces4601 for interface in interfaces
4596 }4602 }
45974603
4604 @synchronous
4605 @with_connection
4606 @synchronised(locks.startup)
4607 @transactional
4598 def update_interfaces(self, interfaces):4608 def update_interfaces(self, interfaces):
4599 """Update the interfaces attached to the controller.4609 """Update the interfaces attached to the controller.
46004610
@@ -4872,7 +4882,7 @@ class RackController(Controller):
4872 Service.objects.update_service_for(4882 Service.objects.update_service_for(
4873 self, "rackd", SERVICE_STATUS.DEGRADED,4883 self, "rackd", SERVICE_STATUS.DEGRADED,
4874 "{:.0%} connected to region controllers.".format(4884 "{:.0%} connected to region controllers.".format(
4875 percentage))4885 1.0 - percentage))
48764886
4877 def get_image_sync_status(self, boot_images=None):4887 def get_image_sync_status(self, boot_images=None):
4878 """Return the status of the boot image import process."""4888 """Return the status of the boot image import process."""
diff --git a/src/maasserver/models/tests/test_node.py b/src/maasserver/models/tests/test_node.py
index 0467bba..e8ece11 100644
--- a/src/maasserver/models/tests/test_node.py
+++ b/src/maasserver/models/tests/test_node.py
@@ -9495,7 +9495,7 @@ class TestRackController(MAASTransactionServerTestCase):
9495 MatchesStructure.byEquality(9495 MatchesStructure.byEquality(
9496 status=SERVICE_STATUS.DEGRADED, status_info=(9496 status=SERVICE_STATUS.DEGRADED, status_info=(
9497 "{:.0%} connected to region controllers.".format(9497 "{:.0%} connected to region controllers.".format(
9498 percentage))))9498 1.0 - percentage))))
94999499
9500 fake_images = [9500 fake_images = [
9501 {9501 {
diff --git a/src/maasserver/rpc/rackcontrollers.py b/src/maasserver/rpc/rackcontrollers.py
index 9c49532..7c82548 100644
--- a/src/maasserver/rpc/rackcontrollers.py
+++ b/src/maasserver/rpc/rackcontrollers.py
@@ -72,7 +72,7 @@ def handle_upgrade(rack_controller, nodegroup_uuid):
7272
73@synchronous73@synchronous
74@with_connection74@with_connection
75@synchronised(locks.rack_registration)75@synchronised(locks.startup)
76@transactional76@transactional
77def register(77def register(
78 system_id=None, hostname='', interfaces=None,78 system_id=None, hostname='', interfaces=None,
diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py
index 3ddf50c..534002b 100644
--- a/src/maasserver/rpc/regionservice.py
+++ b/src/maasserver/rpc/regionservice.py
@@ -1214,15 +1214,31 @@ class RegionAdvertising:
1214 Each tuple corresponds to somewhere an event-loop is listening1214 Each tuple corresponds to somewhere an event-loop is listening
1215 within the whole region. The `name` is the event-loop name.1215 within the whole region. The `name` is the event-loop name.
1216 """1216 """
1217 # Each regiond might be running a local bridge that duplicates the
1218 # same IP address across region controllers. Each region controller
1219 # must output a unique set of IP addresses, to prevent the rack
1220 # controller from connecting to a different region controller than
1221 # the one it was expecting to connect to.
1222 def _unique_to_region(address, region, regions):
1223 for region_obj in regions:
1224 if region_obj != region:
1225 for process in region_obj.processes.all():
1226 for endpoint in process.endpoints.all():
1227 if endpoint.address == address:
1228 return False
1229 return True
1230
1217 regions = RegionController.objects.all()1231 regions = RegionController.objects.all()
1218 regions = regions.prefetch_related("processes", "processes__endpoints")1232 regions = regions.prefetch_related("processes", "processes__endpoints")
1219 all_endpoints = []1233 all_endpoints = []
1220 for region_obj in regions:1234 for region_obj in regions:
1221 for process in region_obj.processes.all():1235 for process in region_obj.processes.all():
1222 for endpoint in process.endpoints.all():1236 for endpoint in process.endpoints.all():
1223 all_endpoints.append((1237 if _unique_to_region(
1224 "%s:pid=%d" % (region_obj.hostname, process.pid),1238 endpoint.address, region_obj, regions):
1225 endpoint.address, endpoint.port))1239 all_endpoints.append((
1240 "%s:pid=%d" % (region_obj.hostname, process.pid),
1241 endpoint.address, endpoint.port))
1226 return all_endpoints1242 return all_endpoints
12271243
1228 @classmethod1244 @classmethod
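
The new `_unique_to_region` helper filters out endpoint addresses that more than one region controller advertises (for example a local bridge that carries the same IP on every regiond host), so a rack controller cannot be handed an address that would silently connect it to the wrong region. A self-contained sketch of the same filtering over plain data (the names and addresses are illustrative):

def unique_to_region(address, region, regions):
    """True if no region other than `region` advertises `address`."""
    for other_region, endpoints in regions.items():
        if other_region != region:
            if any(addr == address for addr, _port in endpoints):
                return False
    return True

regions = {
    "region-a": [("10.0.0.2", 5250), ("172.16.99.1", 5250)],   # 172.16.99.1 is a
    "region-b": [("10.0.0.3", 5250), ("172.16.99.1", 5250)],   # duplicated bridge IP
}
advertised = [
    (name, addr, port)
    for name, endpoints in regions.items()
    for addr, port in endpoints
    if unique_to_region(addr, name, regions)
]
print(advertised)  # only the unique 10.0.0.x endpoints survive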
diff --git a/src/maasserver/rpc/tests/test_rackcontrollers.py b/src/maasserver/rpc/tests/test_rackcontrollers.py
index 8c62f80..e5b075b 100644
--- a/src/maasserver/rpc/tests/test_rackcontrollers.py
+++ b/src/maasserver/rpc/tests/test_rackcontrollers.py
@@ -288,11 +288,11 @@ class TestRegisterRackController(MAASServerTestCase):
288 for name, interface in interfaces.items()288 for name, interface in interfaces.items()
289 )))289 )))
290290
291 def test_registers_with_rack_registration_lock_held(self):291 def test_registers_with_startup_lock_held(self):
292 lock_status = []292 lock_status = []
293293
294 def record_lock_status(*args):294 def record_lock_status(*args):
295 lock_status.append(locks.rack_registration.is_locked())295 lock_status.append(locks.startup.is_locked())
296 return None # Simulate that no rack found.296 return None # Simulate that no rack found.
297297
298 find = self.patch(rackcontrollers, "find")298 find = self.patch(rackcontrollers, "find")
diff --git a/src/maasserver/rpc/tests/test_regionservice.py b/src/maasserver/rpc/tests/test_regionservice.py
index 0a8016a..12678c5 100644
--- a/src/maasserver/rpc/tests/test_regionservice.py
+++ b/src/maasserver/rpc/tests/test_regionservice.py
@@ -33,6 +33,7 @@ from maasserver.models import (
33 RackController,33 RackController,
34 RegionController,34 RegionController,
35 RegionControllerProcess,35 RegionControllerProcess,
36 RegionControllerProcessEndpoint,
36 RegionRackRPCConnection,37 RegionRackRPCConnection,
37 Service as ServiceModel,38 Service as ServiceModel,
38 timestampedmodel,39 timestampedmodel,
@@ -1604,6 +1605,42 @@ class TestRegionAdvertising(MAASServerTestCase):
1604 ]1605 ]
1605 self.assertItemsEqual(expected, advertising.dump())1606 self.assertItemsEqual(expected, advertising.dump())
16061607
1608 def test_dump_doesnt_duplicate_ips_across_region_controllers(self):
1609 duplicate_address = factory.make_ipv4_address()
1610 addresses = {
1611 (factory.make_ipv4_address(), factory.pick_port()),
1612 (factory.make_ipv4_address(), factory.pick_port()),
1613 }
1614 advertising = RegionAdvertising.promote()
1615 advertising.update(
1616 addresses.union({(duplicate_address, factory.pick_port()), }))
1617
1618 other_addresses = {
1619 (factory.make_ipv4_address(), factory.pick_port()),
1620 (factory.make_ipv4_address(), factory.pick_port()),
1621 }
1622 other_region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER)
1623 other_process = RegionControllerProcess.objects.create(
1624 region=other_region, pid=randint(1, 1000))
1625 for address, port in other_addresses:
1626 RegionControllerProcessEndpoint.objects.create(
1627 process=other_process, address=address, port=port)
1628 RegionControllerProcessEndpoint.objects.create(
1629 process=other_process,
1630 address=duplicate_address,
1631 port=factory.pick_port())
1632
1633 expected = [
1634 ("%s:pid=%d" % (gethostname(), os.getpid()), addr, port)
1635 for (addr, port) in addresses
1636 ] + [
1637 ("%s:pid=%d" % (
1638 other_region.hostname, other_process.pid),
1639 addr, port)
1640 for (addr, port) in other_addresses
1641 ]
1642 self.assertItemsEqual(expected, advertising.dump())
1643
1607 def test__adds_connection_and_removes_connection(self):1644 def test__adds_connection_and_removes_connection(self):
1608 advertising = RegionAdvertising.promote()1645 advertising = RegionAdvertising.promote()
1609 process = advertising.getRegionProcess()1646 process = advertising.getRegionProcess()
diff --git a/src/maasserver/utils/orm.py b/src/maasserver/utils/orm.py
index 2ec0590..40f8a3f 100644
--- a/src/maasserver/utils/orm.py
+++ b/src/maasserver/utils/orm.py
@@ -616,14 +616,25 @@ def connected():
616616
617 If there is not yet a connection to the database, this will connect on617 If there is not yet a connection to the database, this will connect on
618 entry and disconnect on exit. Preexisting connections will be left alone.618 entry and disconnect on exit. Preexisting connections will be left alone.
619
620 If the preexisting connection is not usable it is closed and a new
621 connection is made.
619 """622 """
620 if connection.connection is None:623 if connection.connection is None:
624 connection.close_if_unusable_or_obsolete()
621 connection.ensure_connection()625 connection.ensure_connection()
622 try:626 try:
623 yield627 yield
624 finally:628 finally:
625 connection.close()629 connection.close()
630 elif connection.is_usable():
631 yield
626 else:632 else:
633 # Connection is not usable, so we disconnect and reconnect. Since
634 # the connection was previously connected we do not disconnect this
635 # new connection.
636 connection.close_if_unusable_or_obsolete()
637 connection.ensure_connection()
627 yield638 yield
628639
629640
diff --git a/src/maasserver/utils/tests/test_orm.py b/src/maasserver/utils/tests/test_orm.py
index 064f1b0..94ee817 100644
--- a/src/maasserver/utils/tests/test_orm.py
+++ b/src/maasserver/utils/tests/test_orm.py
@@ -1004,6 +1004,22 @@ class TestConnected(MAASTransactionServerTestCase):
1004 self.assertThat(connection.connection, Is(preexisting_connection))1004 self.assertThat(connection.connection, Is(preexisting_connection))
1005 self.assertThat(connection.connection, Is(preexisting_connection))1005 self.assertThat(connection.connection, Is(preexisting_connection))
10061006
1007 def test__disconnects_and_reconnects_if_not_usable(self):
1008 connection.ensure_connection()
1009 preexisting_connection = connection.connection
1010
1011 connection.errors_occurred = True
1012 self.patch(connection, "is_usable").return_value = False
1013
1014 self.assertThat(connection.connection, Not(Is(None)))
1015 with orm.connected():
1016 self.assertThat(
1017 connection.connection, Not(Is(preexisting_connection)))
1018 self.assertThat(connection.connection, Not(Is(None)))
1019
1020 self.assertThat(connection.connection, Not(Is(preexisting_connection)))
1021 self.assertThat(connection.connection, Not(Is(None)))
1022
10071023
1008class TestWithConnection(MAASTransactionServerTestCase):1024class TestWithConnection(MAASTransactionServerTestCase):
1009 """Tests for the `orm.with_connection` decorator."""1025 """Tests for the `orm.with_connection` decorator."""
diff --git a/src/provisioningserver/rackdservices/tests/test_tftp.py b/src/provisioningserver/rackdservices/tests/test_tftp.py
index 1082d6a..14510d9 100644
--- a/src/provisioningserver/rackdservices/tests/test_tftp.py
+++ b/src/provisioningserver/rackdservices/tests/test_tftp.py
@@ -379,6 +379,229 @@ class TestTFTPBackend(MAASTestCase):
379 )379 )
380380
381 @inlineCallbacks381 @inlineCallbacks
382 def test_get_boot_method_reader_uses_same_client(self):
383 # Fake configuration parameters, as discovered from the file path.
384 fake_params = {"mac": factory.make_mac_address("-")}
385 # Fake kernel configuration parameters, as returned from the RPC call.
386 fake_kernel_params = make_kernel_parameters()
387 fake_params = fake_kernel_params._asdict()
388
389 # Stub the output of list_boot_images so the label is set in the
390 # kernel parameters.
391 boot_image = {
392 "osystem": fake_params["osystem"],
393 "release": fake_params["release"],
394 "architecture": fake_params["arch"],
395 "subarchitecture": fake_params["subarch"],
396 "purpose": fake_params["purpose"],
397 "supported_subarches": "",
398 "label": fake_params["label"],
399 }
400 self.patch(tftp_module, "list_boot_images").return_value = [boot_image]
401 del fake_params["label"]
402
403 # Stub RPC call to return the fake configuration parameters.
404 clients = []
405 for _ in range(10):
406 client = Mock()
407 client.localIdent = factory.make_name("system_id")
408 client.side_effect = lambda *args, **kwargs: (
409 succeed(dict(fake_params)))
410 clients.append(client)
411 client_service = Mock()
412 client_service.getClientNow.side_effect = [
413 succeed(client)
414 for client in clients
415 ]
416 client_service.getAllClients.return_value = clients
417
418 # get_boot_method_reader() takes a dict() of parameters and returns an
419 # `IReader` of a PXE configuration, rendered by
420 # `PXEBootMethod.get_reader`.
421 backend = TFTPBackend(
422 self.make_dir(), client_service)
423
424 # Stub get_reader to return the render parameters.
425 method = PXEBootMethod()
426 fake_render_result = factory.make_name("render").encode("utf-8")
427 render_patch = self.patch(method, "get_reader")
428 render_patch.return_value = BytesReader(fake_render_result)
429
430 # Get the reader once.
431 remote_ip = factory.make_ipv4_address()
432 params_with_ip = dict(fake_params)
433 params_with_ip['remote_ip'] = remote_ip
434 reader = yield backend.get_boot_method_reader(method, params_with_ip)
435 self.addCleanup(reader.finish)
436
437 # Get the reader twice.
438 params_with_ip = dict(fake_params)
439 params_with_ip['remote_ip'] = remote_ip
440 reader = yield backend.get_boot_method_reader(method, params_with_ip)
441 self.addCleanup(reader.finish)
442
443 # Only one client is saved.
444 self.assertEquals(clients[0], backend.client_to_remote[remote_ip])
445
446 # Only the first client should have been called twice, and all the
447 # other clients should not have been called.
448 self.assertEquals(2, clients[0].call_count)
449 for idx in range(1, 10):
450 self.assertThat(clients[idx], MockNotCalled())
451
452 @inlineCallbacks
453 def test_get_boot_method_reader_uses_different_clients(self):
454 # Fake configuration parameters, as discovered from the file path.
455 fake_params = {"mac": factory.make_mac_address("-")}
456 # Fake kernel configuration parameters, as returned from the RPC call.
457 fake_kernel_params = make_kernel_parameters()
458 fake_params = fake_kernel_params._asdict()
459
460 # Stub the output of list_boot_images so the label is set in the
461 # kernel parameters.
462 boot_image = {
463 "osystem": fake_params["osystem"],
464 "release": fake_params["release"],
465 "architecture": fake_params["arch"],
466 "subarchitecture": fake_params["subarch"],
467 "purpose": fake_params["purpose"],
468 "supported_subarches": "",
469 "label": fake_params["label"],
470 }
471 self.patch(tftp_module, "list_boot_images").return_value = [boot_image]
472 del fake_params["label"]
473
474 # Stub RPC call to return the fake configuration parameters.
475 clients = []
476 for _ in range(10):
477 client = Mock()
478 client.localIdent = factory.make_name("system_id")
479 client.side_effect = lambda *args, **kwargs: (
480 succeed(dict(fake_params)))
481 clients.append(client)
482 client_service = Mock()
483 client_service.getClientNow.side_effect = [
484 succeed(client)
485 for client in clients
486 ]
487 client_service.getAllClients.return_value = clients
488
489 # get_boot_method_reader() takes a dict() of parameters and returns an
490 # `IReader` of a PXE configuration, rendered by
491 # `PXEBootMethod.get_reader`.
492 backend = TFTPBackend(
493 self.make_dir(), client_service)
494
495 # Stub get_reader to return the render parameters.
496 method = PXEBootMethod()
497 fake_render_result = factory.make_name("render").encode("utf-8")
498 render_patch = self.patch(method, "get_reader")
499 render_patch.return_value = BytesReader(fake_render_result)
500
501 # Get the reader once.
502 remote_ip_one = factory.make_ipv4_address()
503 params_with_ip = dict(fake_params)
504 params_with_ip['remote_ip'] = remote_ip_one
505 reader = yield backend.get_boot_method_reader(method, params_with_ip)
506 self.addCleanup(reader.finish)
507
508 # Get the reader twice.
509 remote_ip_two = factory.make_ipv4_address()
510 params_with_ip = dict(fake_params)
511 params_with_ip['remote_ip'] = remote_ip_two
512 reader = yield backend.get_boot_method_reader(method, params_with_ip)
513 self.addCleanup(reader.finish)
514
515 # Both clients are saved.
516 self.assertEquals(clients[0], backend.client_to_remote[remote_ip_one])
517 self.assertEquals(clients[1], backend.client_to_remote[remote_ip_two])
518
519 # Only the first and second client should have been called once, and
520 # all the other clients should not have been called.
521 self.assertEquals(1, clients[0].call_count)
522 self.assertEquals(1, clients[1].call_count)
523 for idx in range(2, 10):
524 self.assertThat(clients[idx], MockNotCalled())
525
526 @inlineCallbacks
527 def test_get_boot_method_reader_grabs_new_client_on_lost_conn(self):
528 # Fake configuration parameters, as discovered from the file path.
529 fake_params = {"mac": factory.make_mac_address("-")}
530 # Fake kernel configuration parameters, as returned from the RPC call.
531 fake_kernel_params = make_kernel_parameters()
532 fake_params = fake_kernel_params._asdict()
533
534 # Stub the output of list_boot_images so the label is set in the
535 # kernel parameters.
536 boot_image = {
537 "osystem": fake_params["osystem"],
538 "release": fake_params["release"],
539 "architecture": fake_params["arch"],
540 "subarchitecture": fake_params["subarch"],
541 "purpose": fake_params["purpose"],
542 "supported_subarches": "",
543 "label": fake_params["label"],
544 }
545 self.patch(tftp_module, "list_boot_images").return_value = [boot_image]
546 del fake_params["label"]
547
548 # Stub RPC call to return the fake configuration parameters.
549 clients = []
550 for _ in range(10):
551 client = Mock()
552 client.localIdent = factory.make_name("system_id")
553 client.side_effect = lambda *args, **kwargs: (
554 succeed(dict(fake_params)))
555 clients.append(client)
556 client_service = Mock()
557 client_service.getClientNow.side_effect = [
558 succeed(client)
559 for client in clients
560 ]
561 client_service.getAllClients.side_effect = [
562 clients[1:],
563 clients[2:],
564 ]
565
566 # get_boot_method_reader() takes a dict() of parameters and returns an
567 # `IReader` of a PXE configuration, rendered by
568 # `PXEBootMethod.get_reader`.
569 backend = TFTPBackend(
570 self.make_dir(), client_service)
571
572 # Stub get_reader to return the render parameters.
573 method = PXEBootMethod()
574 fake_render_result = factory.make_name("render").encode("utf-8")
575 render_patch = self.patch(method, "get_reader")
576 render_patch.return_value = BytesReader(fake_render_result)
577
578 # Get the reader once.
579 remote_ip = factory.make_ipv4_address()
580 params_with_ip = dict(fake_params)
581 params_with_ip['remote_ip'] = remote_ip
582 reader = yield backend.get_boot_method_reader(method, params_with_ip)
583 self.addCleanup(reader.finish)
584
585 # The first client is now saved.
586 self.assertEquals(clients[0], backend.client_to_remote[remote_ip])
587
588 # Get the reader twice.
589 params_with_ip = dict(fake_params)
590 params_with_ip['remote_ip'] = remote_ip
591 reader = yield backend.get_boot_method_reader(method, params_with_ip)
592 self.addCleanup(reader.finish)
593
594 # The second client is now saved.
595 self.assertEquals(clients[1], backend.client_to_remote[remote_ip])
596
597 # Only the first and second client should have been called once, and
598 # all the other clients should not have been called.
599 self.assertEquals(1, clients[0].call_count)
600 self.assertEquals(1, clients[1].call_count)
601 for idx in range(2, 10):
602 self.assertThat(clients[idx], MockNotCalled())
603
604 @inlineCallbacks
382 def test_get_boot_method_reader_returns_rendered_params(self):605 def test_get_boot_method_reader_returns_rendered_params(self):
383 # Fake configuration parameters, as discovered from the file path.606 # Fake configuration parameters, as discovered from the file path.
384 fake_params = {"mac": factory.make_mac_address("-")}607 fake_params = {"mac": factory.make_mac_address("-")}
diff --git a/src/provisioningserver/rackdservices/tftp.py b/src/provisioningserver/rackdservices/tftp.py
index b56d4b3..5dc1547 100644
--- a/src/provisioningserver/rackdservices/tftp.py
+++ b/src/provisioningserver/rackdservices/tftp.py
@@ -66,6 +66,7 @@ from twisted.internet.defer import (
66 inlineCallbacks,66 inlineCallbacks,
67 maybeDeferred,67 maybeDeferred,
68 returnValue,68 returnValue,
69 succeed,
69)70)
70from twisted.internet.task import deferLater71from twisted.internet.task import deferLater
71from twisted.python.filepath import FilePath72from twisted.python.filepath import FilePath
@@ -163,9 +164,48 @@ class TFTPBackend(FilesystemSynchronousBackend):
163 base_path = FilePath(base_path)164 base_path = FilePath(base_path)
164 super(TFTPBackend, self).__init__(165 super(TFTPBackend, self).__init__(
165 base_path, can_read=True, can_write=False)166 base_path, can_read=True, can_write=False)
167 self.client_to_remote = {}
166 self.client_service = client_service168 self.client_service = client_service
167 self.fetcher = RPCFetcher()169 self.fetcher = RPCFetcher()
168170
171 def _get_new_client_for_remote(self, remote_ip):
172 """Return a new client for the `remote_ip`.
173
174 Don't use directly; it is called from `get_client_for`.
175 """
176 def store_client(client):
177 self.client_to_remote[remote_ip] = client
178 return client
179
180 d = self.client_service.getClientNow()
181 d.addCallback(store_client)
182 return d
183
184 def get_client_for(self, params):
185 """Always gets the same client based on `params`.
186
187 This is done so that all TFTP requests from the same remote client go
188 to the same regiond process. `RPCFetcher` only de-duplicates on the client
189 and arguments, so if the client is not the same the duplicated effort
190 is not consolidated.
191 """
192 remote_ip = params.get('remote_ip')
193 if remote_ip:
194 client = self.client_to_remote.get(remote_ip, None)
195 if client is None:
196 # Get a new client for the remote_ip.
197 return self._get_new_client_for_remote(remote_ip)
198 else:
199 # Check that the existing client is still valid.
200 clients = self.client_service.getAllClients()
201 if client in clients:
202 return succeed(client)
203 else:
204 del self.client_to_remote[remote_ip]
205 return self._get_new_client_for_remote(remote_ip)
206 else:
207 return self.client_service.getClientNow()
208
169 @inlineCallbacks209 @inlineCallbacks
170 @typed210 @typed
171 def get_boot_method(self, file_name: TFTPPath):211 def get_boot_method(self, file_name: TFTPPath):
@@ -243,7 +283,7 @@ class TFTPBackend(FilesystemSynchronousBackend):
243 d.addCallback(lambda data: KernelParameters(**data))283 d.addCallback(lambda data: KernelParameters(**data))
244 return d284 return d
245285
246 d = self.client_service.getClientNow()286 d = self.get_client_for(params)
247 d.addCallback(fetch, params)287 d.addCallback(fetch, params)
248 return d288 return d
249289
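
The TFTP change pins each booting machine (keyed by `remote_ip`) to a single regiond RPC client, because `RPCFetcher` only coalesces duplicate requests made through the same client with the same arguments. A self-contained sketch of the pinning logic (the class name and `_store` helper are illustrative; the real code lives on `TFTPBackend`):

from twisted.internet.defer import succeed

class StickyClientPicker:
    """Hand out the same regiond client for every request from one remote IP."""

    def __init__(self, client_service):
        self.client_service = client_service
        self.client_to_remote = {}            # remote_ip -> regiond RPC client

    def _store(self, remote_ip, client):
        self.client_to_remote[remote_ip] = client
        return client

    def get_client_for(self, params):
        remote_ip = params.get("remote_ip")
        if not remote_ip:
            return self.client_service.getClientNow()
        client = self.client_to_remote.get(remote_ip)
        if client is not None and client in self.client_service.getAllClients():
            # Same regiond as before and still connected: reuse it so
            # RPCFetcher can coalesce concurrent duplicate requests.
            return succeed(client)
        # First request from this IP, or its regiond went away: pick a new one.
        self.client_to_remote.pop(remote_ip, None)
        d = self.client_service.getClientNow()
        d.addCallback(lambda new_client: self._store(remote_ip, new_client))
        return d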
diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
index 26e2172..a4ab2ba 100644
--- a/src/provisioningserver/rpc/clusterservice.py
+++ b/src/provisioningserver/rpc/clusterservice.py
@@ -106,6 +106,7 @@ from twisted import web
106from twisted.application.internet import TimerService106from twisted.application.internet import TimerService
107from twisted.internet import reactor107from twisted.internet import reactor
108from twisted.internet.defer import (108from twisted.internet.defer import (
109 DeferredList,
109 inlineCallbacks,110 inlineCallbacks,
110 maybeDeferred,111 maybeDeferred,
111 returnValue,112 returnValue,
@@ -784,6 +785,8 @@ class ClusterClient(Cluster):
784 log.msg("Event-loop '%s' authenticated." % self.ident)785 log.msg("Event-loop '%s' authenticated." % self.ident)
785 registered = yield self.registerRackWithRegion()786 registered = yield self.registerRackWithRegion()
786 if registered:787 if registered:
788 if self.eventloop in self.service.try_connections:
789 del self.service.try_connections[self.eventloop]
787 self.service.connections[self.eventloop] = self790 self.service.connections[self.eventloop] = self
788 self.ready.set(self.eventloop)791 self.ready.set(self.eventloop)
789 else:792 else:
@@ -935,6 +938,8 @@ class ClusterClientService(TimerService, object):
935 super(ClusterClientService, self).__init__(938 super(ClusterClientService, self).__init__(
936 self._calculate_interval(None, None), self._tryUpdate)939 self._calculate_interval(None, None), self._tryUpdate)
937 self.connections = {}940 self.connections = {}
941 self.try_connections = {}
942 self._previous_work = (None, None)
938 self.clock = reactor943 self.clock = reactor
939944
940 # XXX jtv 2014-09-23, bug=1372767: Fix945 # XXX jtv 2014-09-23, bug=1372767: Fix
@@ -1172,6 +1177,48 @@ class ClusterClientService(TimerService, object):
1172 ]1177 ]
1173 for name, addresses in eventloops.items()1178 for name, addresses in eventloops.items()
1174 }1179 }
1180
1181 drop, connect = self._calculate_work(eventloops)
1182
1183 # Log fully connected only once. If that state changes then log
1184 # it again. This prevents flooding the log with the same message when
1185 # the state of the connections has not changed.
1186 prev_work, self._previous_work = self._previous_work, (drop, connect)
1187 if len(drop) == 0 and len(connect) == 0:
1188 if prev_work != (drop, connect) and len(eventloops) > 0:
1189 controllers = {
1190 eventloop.split(':')[0]
1191 for eventloop, _ in eventloops.items()
1192 }
1193 log.msg(
1194 "Fully connected to all %d event-loops on all %d "
1195 "region controllers (%s)." % (
1196 len(eventloops), len(controllers),
1197 ', '.join(sorted(controllers))))
1198
1199 # Drop all connections at once, as they are no longer required.
1200 if len(drop) > 0:
1201 log.msg("Dropping connections to event-loops: %s" % (
1202 ', '.join(drop.keys())))
1203 yield DeferredList([
1204 maybeDeferred(self._drop_connection, connection)
1205 for eventloop, connections in drop.items()
1206 for connection in connections
1207 ], consumeErrors=True)
1208
1209 # Make all the new connections to each endpoint at the same time.
1210 if len(connect) > 0:
1211 log.msg("Making connections to event-loops: %s" % (
1212 ', '.join(connect.keys())))
1213 yield DeferredList([
1214 self._make_connections(eventloop, addresses)
1215 for eventloop, addresses in connect.items()
1216 ], consumeErrors=True)
1217
1218 def _calculate_work(self, eventloops):
1219 """Calculate the work that needs to be performed for reconnection."""
1220 drop, connect = {}, {}
1221
1175 # Drop connections to event-loops that no longer include one of1222 # Drop connections to event-loops that no longer include one of
1176 # this cluster's established connections among its advertised1223 # this cluster's established connections among its advertised
1177 # endpoints. This is most likely to have happened because of1224 # endpoints. This is most likely to have happened because of
@@ -1183,23 +1230,20 @@ class ClusterClientService(TimerService, object):
1183 if eventloop in self.connections:1230 if eventloop in self.connections:
1184 connection = self.connections[eventloop]1231 connection = self.connections[eventloop]
1185 if connection.address not in addresses:1232 if connection.address not in addresses:
1186 yield self._drop_connection(connection)1233 drop[eventloop] = [connection]
1234 if eventloop in self.try_connections:
1235 connection = self.try_connections[eventloop]
1236 if connection.address not in addresses:
1237 drop[eventloop] = [connection]
1238
1187 # Create new connections to event-loops that the cluster does1239 # Create new connections to event-loops that the cluster does
1188 # not yet have a connection to. Try each advertised endpoint1240 # not yet have a connection to.
1189 # (address) in turn until one of them bites.
1190 for eventloop, addresses in eventloops.items():1241 for eventloop, addresses in eventloops.items():
1191 if eventloop not in self.connections:1242 if ((eventloop not in self.connections and
1192 for address in addresses:1243 eventloop not in self.try_connections) or
1193 try:1244 eventloop in drop):
1194 yield self._make_connection(eventloop, address)1245 connect[eventloop] = addresses
1195 except ConnectError as error:1246
1196 host, port = address
1197 log.msg("Event-loop %s (%s:%d): %s" % (
1198 eventloop, host, port, error))
1199 except:
1200 log.err(None, "Failure making new RPC connection.")
1201 else:
1202 break
1203 # Remove connections to event-loops that are no longer1247 # Remove connections to event-loops that are no longer
1204 # advertised by the RPC info view. Most likely this means that1248 # advertised by the RPC info view. Most likely this means that
1205 # the process in which the event-loop is no longer running, but1249 # the process in which the event-loop is no longer running, but
@@ -1208,7 +1252,32 @@ class ClusterClientService(TimerService, object):
1208 for eventloop in self.connections:1252 for eventloop in self.connections:
1209 if eventloop not in eventloops:1253 if eventloop not in eventloops:
1210 connection = self.connections[eventloop]1254 connection = self.connections[eventloop]
1211 yield self._drop_connection(connection)1255 drop[eventloop] = [connection]
1256 for eventloop in self.try_connections:
1257 if eventloop not in eventloops:
1258 connection = self.try_connections[eventloop]
1259 drop[eventloop] = [connection]
1260
1261 return drop, connect
1262
1263 @inlineCallbacks
1264 def _make_connections(self, eventloop, addresses):
1265 """Connect to `eventloop` using all `addresses`."""
1266 for address in addresses:
1267 try:
1268 connection = yield self._make_connection(eventloop, address)
1269 except ConnectError as error:
1270 host, port = address
1271 log.msg("Event-loop %s (%s:%d): %s" % (
1272 eventloop, host, port, error))
1273 except:
1274 host, port = address
1275 log.err(None, (
1276 "Failure with event-loop %s (%s:%d)" % (
1277 eventloop, host, port)))
1278 else:
1279 self.try_connections[eventloop] = connection
1280 break
12121281
1213 def _make_connection(self, eventloop, address):1282 def _make_connection(self, eventloop, address):
1214 """Connect to `eventloop` at `address`."""1283 """Connect to `eventloop` at `address`."""
@@ -1227,6 +1296,9 @@ class ClusterClientService(TimerService, object):
1227 If this is the last connection that was keeping rackd connected to1296 If this is the last connection that was keeping rackd connected to
1228 a regiond then dhcpd and dhcpd6 services will be turned off.1297 a regiond then dhcpd and dhcpd6 services will be turned off.
1229 """1298 """
1299 if eventloop in self.try_connections:
1300 if self.try_connections[eventloop] is connection:
1301 del self.try_connections[eventloop]
1230 if eventloop in self.connections:1302 if eventloop in self.connections:
1231 if self.connections[eventloop] is connection:1303 if self.connections[eventloop] is connection:
1232 del self.connections[eventloop]1304 del self.connections[eventloop]
@@ -1242,7 +1314,7 @@ class ClusterClientService(TimerService, object):
1242 dhcp_v6.off()1314 dhcp_v6.off()
1243 stopping_services.append("dhcpd6")1315 stopping_services.append("dhcpd6")
1244 if len(stopping_services) > 0:1316 if len(stopping_services) > 0:
1245 maaslog.error(1317 log.msg(
1246 "Lost all connections to region controllers. "1318 "Lost all connections to region controllers. "
1247 "Stopping service(s) %s." % ",".join(stopping_services))1319 "Stopping service(s) %s." % ",".join(stopping_services))
1248 service_monitor.ensureServices()1320 service_monitor.ensureServices()
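
The clusterservice rewrite splits reconnection into two phases: `_calculate_work` decides which event-loop connections to drop and which to open (now also tracking in-progress attempts in `try_connections`), and the drops and connects are then run concurrently with `DeferredList` instead of one endpoint at a time. A pure-function restatement of the work calculation over plain dicts (a sketch; the shipped method reads `self.connections` and `self.try_connections`):

def calculate_work(eventloops, connections, try_connections):
    """Return (drop, connect): event-loops to disconnect from and to dial."""
    drop, connect = {}, {}
    # Drop established or in-progress connections whose event-loop is gone
    # or is no longer advertised on the address we are connected to.
    for pool in (connections, try_connections):
        for eventloop, conn in pool.items():
            addresses = eventloops.get(eventloop, [])
            if conn.address not in addresses:
                drop[eventloop] = [conn]
    # Connect to every advertised event-loop we are not already connected
    # (or connecting) to, including the ones we just decided to drop.
    for eventloop, addresses in eventloops.items():
        known = eventloop in connections or eventloop in try_connections
        if not known or eventloop in drop:
            connect[eventloop] = addresses
    return drop, connect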
diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
index e12a107..f8177f4 100644
--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
@@ -613,7 +613,9 @@ class TestClusterClientService(MAASTestCase):
613 @inlineCallbacks613 @inlineCallbacks
614 def test__update_connections_initially(self):614 def test__update_connections_initially(self):
615 service = ClusterClientService(Clock())615 service = ClusterClientService(Clock())
616 mock_client = Mock()
616 _make_connection = self.patch(service, "_make_connection")617 _make_connection = self.patch(service, "_make_connection")
618 _make_connection.side_effect = lambda *args: succeed(mock_client)
617 _drop_connection = self.patch(service, "_drop_connection")619 _drop_connection = self.patch(service, "_drop_connection")
618620
619 info = json.loads(self.example_rpc_info_view_response.decode("ascii"))621 info = json.loads(self.example_rpc_info_view_response.decode("ascii"))
@@ -627,10 +629,40 @@ class TestClusterClientService(MAASTestCase):
627 self.assertItemsEqual(629 self.assertItemsEqual(
628 _make_connection_expected,630 _make_connection_expected,
629 _make_connection.call_args_list)631 _make_connection.call_args_list)
632 self.assertEquals({
633 "host1:pid=1001": mock_client,
634 "host1:pid=2002": mock_client,
635 "host2:pid=3003": mock_client,
636 }, service.try_connections)
630637
631 self.assertEqual([], _drop_connection.mock_calls)638 self.assertEqual([], _drop_connection.mock_calls)
632639
633 @inlineCallbacks640 @inlineCallbacks
641 def test__update_connections_logs_fully_connected(self):
642 service = ClusterClientService(Clock())
643 eventloops = {
644 "region1:123": [("::ffff:127.0.0.1", 1234)],
645 "region1:124": [("::ffff:127.0.0.1", 1235)],
646 "region2:123": [("::ffff:127.0.0.2", 1234)],
647 "region2:124": [("::ffff:127.0.0.2", 1235)],
648 }
649 for eventloop, addresses in eventloops.items():
650 for address in addresses:
651 client = Mock()
652 client.address = address
653 service.connections[eventloop] = client
654
655 logger = self.useFixture(TwistedLoggerFixture())
656
657 yield service._update_connections(eventloops)
658 # Second call should not add it to the log.
659 yield service._update_connections(eventloops)
660
661 self.assertEqual(
662 "Fully connected to all 4 event-loops on all 2 region "
663 "controllers (region1, region2).", logger.dump())
664
665 @inlineCallbacks
634 def test__update_connections_connect_error_is_logged_tersely(self):666 def test__update_connections_connect_error_is_logged_tersely(self):
635 service = ClusterClientService(Clock())667 service = ClusterClientService(Clock())
636 _make_connection = self.patch(service, "_make_connection")668 _make_connection = self.patch(service, "_make_connection")
@@ -646,6 +678,8 @@ class TestClusterClientService(MAASTestCase):
646 MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234)))678 MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234)))
647679
648 self.assertEqual(680 self.assertEqual(
681 "Making connections to event-loops: an-event-loop\n"
682 "---\n"
649 "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection "683 "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection "
650 "was refused by other side.", logger.dump())684 "was refused by other side.", logger.dump())
651685
@@ -666,7 +700,9 @@ class TestClusterClientService(MAASTestCase):
666700
667 self.assertDocTestMatches(701 self.assertDocTestMatches(
668 """\702 """\
669 Failure making new RPC connection.703 Making connections to event-loops: an-event-loop
704 ---
705 Failure with event-loop an-event-loop (::ffff:127.0.0.1:1234)
670 Traceback (most recent call last):706 Traceback (most recent call last):
671 ...707 ...
672 builtins.RuntimeError: Something went wrong.708 builtins.RuntimeError: Something went wrong.
@@ -774,6 +810,15 @@ class TestClusterClientService(MAASTestCase):
774 connection.transport.loseConnection,810 connection.transport.loseConnection,
775 MockCalledOnceWith())811 MockCalledOnceWith())
776812
813 def test__remove_connection_removes_from_try_connections(self):
814 service = make_inert_client_service()
815 service.startService()
816 endpoint = Mock()
817 connection = Mock()
818 service.try_connections[endpoint] = connection
819 service.remove_connection(endpoint, connection)
820 self.assertThat(service.try_connections, Equals({}))
821
777 def test__remove_connection_removes_from_connections(self):822 def test__remove_connection_removes_from_connections(self):
778 service = make_inert_client_service()823 service = make_inert_client_service()
779 service.startService()824 service.startService()
@@ -1040,6 +1085,7 @@ class TestClusterClient(MAASTestCase):
10401085
1041 def test_connecting(self):1086 def test_connecting(self):
1042 client = self.make_running_client()1087 client = self.make_running_client()
1088 client.service.try_connections[client.eventloop] = client
1043 self.patch_authenticate_for_success(client)1089 self.patch_authenticate_for_success(client)
1044 self.patch_register_for_success(client)1090 self.patch_register_for_success(client)
1045 self.assertEqual(client.service.connections, {})1091 self.assertEqual(client.service.connections, {})
@@ -1053,6 +1099,7 @@ class TestClusterClient(MAASTestCase):
1053 self.assertTrue(extract_result(wait_for_authenticated))1099 self.assertTrue(extract_result(wait_for_authenticated))
1054 # ready has been set with the name of the event-loop.1100 # ready has been set with the name of the event-loop.
1055 self.assertEqual(client.eventloop, extract_result(wait_for_ready))1101 self.assertEqual(client.eventloop, extract_result(wait_for_ready))
1102 self.assertEqual(client.service.try_connections, {})
1056 self.assertEqual(1103 self.assertEqual(
1057 client.service.connections,1104 client.service.connections,
1058 {client.eventloop: client})1105 {client.eventloop: client})
