Merge ~blake-rouse/maas:fix-rack-rpc-conn into maas:master

Proposed by Blake Rouse
Status: Merged
Approved by: Andres Rodriguez
Approved revision: 51df2e0bae0a3336a6496706429c87c7057a3325
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~blake-rouse/maas:fix-rack-rpc-conn
Merge into: maas:master
Diff against target: 395 lines (+195/-23)
6 files modified
src/maasserver/models/node.py (+1/-1)
src/maasserver/models/tests/test_node.py (+1/-1)
src/maasserver/rpc/regionservice.py (+19/-3)
src/maasserver/rpc/tests/test_regionservice.py (+37/-0)
src/provisioningserver/rpc/clusterservice.py (+89/-17)
src/provisioningserver/rpc/tests/test_clusterservice.py (+48/-1)
Reviewer Review Type Date Requested Status
Andres Rodriguez (community) Approve
Review via email: mp+332814@code.launchpad.net

Commit message

Fixes LP: #1727073 - [2.3] rackd — 12% connected to region controllers.

Fix rackd RPC connections to only connect once and in parallel. Fix regiond RPC output to not include IP addresses that are duplicated across regions. Fix percentage calculation for rackd connections.

Description of the change

All the code changes in the clusterservice are covered by test_clusterservice. The try_connections needed to be covered, so that was added to the tests.

I have tested this across 2 region+rack controllers, with both having a configured virbr0 with IP 192.168.122.1.

To post a comment you must log in.
Revision history for this message
Andres Rodriguez (andreserl) wrote :

Codewise this looks good; I just have a couple of suggestions inline. Are those possible to do?

review: Needs Information
Revision history for this message
Blake Rouse (blake-rouse) :
Revision history for this message
Blake Rouse (blake-rouse) wrote :

Okay, it's ready for another review. Logging has been updated.

Revision history for this message
Andres Rodriguez (andreserl) :
review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py
index e6078b5..6699d81 100644
--- a/src/maasserver/models/node.py
+++ b/src/maasserver/models/node.py
@@ -5116,7 +5116,7 @@ class RackController(Controller):
5116 Service.objects.update_service_for(5116 Service.objects.update_service_for(
5117 self, "rackd", SERVICE_STATUS.DEGRADED,5117 self, "rackd", SERVICE_STATUS.DEGRADED,
5118 "{:.0%} connected to region controllers.".format(5118 "{:.0%} connected to region controllers.".format(
5119 percentage))5119 1.0 - percentage))
51205120
5121 def get_image_sync_status(self, boot_images=None):5121 def get_image_sync_status(self, boot_images=None):
5122 """Return the status of the boot image import process."""5122 """Return the status of the boot image import process."""
diff --git a/src/maasserver/models/tests/test_node.py b/src/maasserver/models/tests/test_node.py
index c4bcb65..c3e5b09 100644
--- a/src/maasserver/models/tests/test_node.py
+++ b/src/maasserver/models/tests/test_node.py
@@ -9999,7 +9999,7 @@ class TestRackController(MAASTransactionServerTestCase):
9999 MatchesStructure.byEquality(9999 MatchesStructure.byEquality(
10000 status=SERVICE_STATUS.DEGRADED, status_info=(10000 status=SERVICE_STATUS.DEGRADED, status_info=(
10001 "{:.0%} connected to region controllers.".format(10001 "{:.0%} connected to region controllers.".format(
10002 percentage))))10002 1.0 - percentage))))
1000310003
10004 fake_images = [10004 fake_images = [
10005 {10005 {
diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py
index 340a1be..5798823 100644
--- a/src/maasserver/rpc/regionservice.py
+++ b/src/maasserver/rpc/regionservice.py
@@ -1255,15 +1255,31 @@ class RegionAdvertising:
1255 Each tuple corresponds to somewhere an event-loop is listening1255 Each tuple corresponds to somewhere an event-loop is listening
1256 within the whole region. The `name` is the event-loop name.1256 within the whole region. The `name` is the event-loop name.
1257 """1257 """
1258 # Each regiond might be running a local bridge that duplicates the
1259 # same IP address across region controllers. Each region controller
1260 # must output a set of unique of IP addresses, to prevent the rack
1261 # controller from connecting to a different region controller then
1262 # the rack controller was expecting to be connecting to.
1263 def _unique_to_region(address, region, regions):
1264 for region_obj in regions:
1265 if region_obj != region:
1266 for process in region_obj.processes.all():
1267 for endpoint in process.endpoints.all():
1268 if endpoint.address == address:
1269 return False
1270 return True
1271
1258 regions = RegionController.objects.all()1272 regions = RegionController.objects.all()
1259 regions = regions.prefetch_related("processes", "processes__endpoints")1273 regions = regions.prefetch_related("processes", "processes__endpoints")
1260 all_endpoints = []1274 all_endpoints = []
1261 for region_obj in regions:1275 for region_obj in regions:
1262 for process in region_obj.processes.all():1276 for process in region_obj.processes.all():
1263 for endpoint in process.endpoints.all():1277 for endpoint in process.endpoints.all():
1264 all_endpoints.append((1278 if _unique_to_region(
1265 "%s:pid=%d" % (region_obj.hostname, process.pid),1279 endpoint.address, region_obj, regions):
1266 endpoint.address, endpoint.port))1280 all_endpoints.append((
1281 "%s:pid=%d" % (region_obj.hostname, process.pid),
1282 endpoint.address, endpoint.port))
1267 return all_endpoints1283 return all_endpoints
12681284
1269 @classmethod1285 @classmethod
diff --git a/src/maasserver/rpc/tests/test_regionservice.py b/src/maasserver/rpc/tests/test_regionservice.py
index b3caf43..88c1a03 100644
--- a/src/maasserver/rpc/tests/test_regionservice.py
+++ b/src/maasserver/rpc/tests/test_regionservice.py
@@ -33,6 +33,7 @@ from maasserver.models import (
33 RackController,33 RackController,
34 RegionController,34 RegionController,
35 RegionControllerProcess,35 RegionControllerProcess,
36 RegionControllerProcessEndpoint,
36 RegionRackRPCConnection,37 RegionRackRPCConnection,
37 Service as ServiceModel,38 Service as ServiceModel,
38 timestampedmodel,39 timestampedmodel,
@@ -1639,6 +1640,42 @@ class TestRegionAdvertising(MAASServerTestCase):
1639 ]1640 ]
1640 self.assertItemsEqual(expected, advertising.dump())1641 self.assertItemsEqual(expected, advertising.dump())
16411642
1643 def test_dump_doesnt_duplicate_ips_across_region_controllers(self):
1644 duplicate_address = factory.make_ipv4_address()
1645 addresses = {
1646 (factory.make_ipv4_address(), factory.pick_port()),
1647 (factory.make_ipv4_address(), factory.pick_port()),
1648 }
1649 advertising = RegionAdvertising.promote()
1650 advertising.update(
1651 addresses.union({(duplicate_address, factory.pick_port()), }))
1652
1653 other_addresses = {
1654 (factory.make_ipv4_address(), factory.pick_port()),
1655 (factory.make_ipv4_address(), factory.pick_port()),
1656 }
1657 other_region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER)
1658 other_process = RegionControllerProcess.objects.create(
1659 region=other_region, pid=randint(1, 1000))
1660 for address, port in other_addresses:
1661 RegionControllerProcessEndpoint.objects.create(
1662 process=other_process, address=address, port=port)
1663 RegionControllerProcessEndpoint.objects.create(
1664 process=other_process,
1665 address=duplicate_address,
1666 port=factory.pick_port())
1667
1668 expected = [
1669 ("%s:pid=%d" % (gethostname(), os.getpid()), addr, port)
1670 for (addr, port) in addresses
1671 ] + [
1672 ("%s:pid=%d" % (
1673 other_region.hostname, other_process.pid),
1674 addr, port)
1675 for (addr, port) in other_addresses
1676 ]
1677 self.assertItemsEqual(expected, advertising.dump())
1678
1642 def test__adds_connection_and_removes_connection(self):1679 def test__adds_connection_and_removes_connection(self):
1643 advertising = RegionAdvertising.promote()1680 advertising = RegionAdvertising.promote()
1644 process = advertising.getRegionProcess()1681 process = advertising.getRegionProcess()
diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
index d3e5413..946f516 100644
--- a/src/provisioningserver/rpc/clusterservice.py
+++ b/src/provisioningserver/rpc/clusterservice.py
@@ -116,6 +116,7 @@ from twisted import web
116from twisted.application.internet import TimerService116from twisted.application.internet import TimerService
117from twisted.internet import reactor117from twisted.internet import reactor
118from twisted.internet.defer import (118from twisted.internet.defer import (
119 DeferredList,
119 inlineCallbacks,120 inlineCallbacks,
120 maybeDeferred,121 maybeDeferred,
121 returnValue,122 returnValue,
@@ -841,6 +842,8 @@ class ClusterClient(Cluster):
841 log.msg("Event-loop '%s' authenticated." % self.ident)842 log.msg("Event-loop '%s' authenticated." % self.ident)
842 registered = yield self.registerRackWithRegion()843 registered = yield self.registerRackWithRegion()
843 if registered:844 if registered:
845 if self.eventloop in self.service.try_connections:
846 del self.service.try_connections[self.eventloop]
844 self.service.connections[self.eventloop] = self847 self.service.connections[self.eventloop] = self
845 self.ready.set(self.eventloop)848 self.ready.set(self.eventloop)
846 else:849 else:
@@ -992,6 +995,8 @@ class ClusterClientService(TimerService, object):
992 super(ClusterClientService, self).__init__(995 super(ClusterClientService, self).__init__(
993 self._calculate_interval(None, None), self._tryUpdate)996 self._calculate_interval(None, None), self._tryUpdate)
994 self.connections = {}997 self.connections = {}
998 self.try_connections = {}
999 self._previous_work = (None, None)
995 self.clock = reactor1000 self.clock = reactor
9961001
997 # XXX jtv 2014-09-23, bug=1372767: Fix1002 # XXX jtv 2014-09-23, bug=1372767: Fix
@@ -1229,6 +1234,48 @@ class ClusterClientService(TimerService, object):
1229 ]1234 ]
1230 for name, addresses in eventloops.items()1235 for name, addresses in eventloops.items()
1231 }1236 }
1237
1238 drop, connect = self._calculate_work(eventloops)
1239
1240 # Log fully connected only once. If that state changes then log
1241 # it again. This prevents flooding the log with the same message when
1242 # the state of the connections has not changed.
1243 prev_work, self._previous_work = self._previous_work, (drop, connect)
1244 if len(drop) == 0 and len(connect) == 0:
1245 if prev_work != (drop, connect) and len(eventloops) > 0:
1246 controllers = {
1247 eventloop.split(':')[0]
1248 for eventloop, _ in eventloops.items()
1249 }
1250 log.msg(
1251 "Fully connected to all %d event-loops on all %d "
1252 "region controllers (%s)." % (
1253 len(eventloops), len(controllers),
1254 ', '.join(controllers)))
1255
1256 # Drop all connections at once, as the are no longer required.
1257 if len(drop) > 0:
1258 log.msg("Dropping connections to event-loops: %s" % (
1259 ', '.join(drop.keys())))
1260 yield DeferredList([
1261 maybeDeferred(self._drop_connection, connection)
1262 for eventloop, connections in drop.items()
1263 for connection in connections
1264 ], consumeErrors=True)
1265
1266 # Make all the new connections to each endpoint at the same time.
1267 if len(connect) > 0:
1268 log.msg("Making connections to event-loops: %s" % (
1269 ', '.join(connect.keys())))
1270 yield DeferredList([
1271 self._make_connections(eventloop, addresses)
1272 for eventloop, addresses in connect.items()
1273 ], consumeErrors=True)
1274
1275 def _calculate_work(self, eventloops):
1276 """Calculate the work that needs to be performed for reconnection."""
1277 drop, connect = {}, {}
1278
1232 # Drop connections to event-loops that no longer include one of1279 # Drop connections to event-loops that no longer include one of
1233 # this cluster's established connections among its advertised1280 # this cluster's established connections among its advertised
1234 # endpoints. This is most likely to have happened because of1281 # endpoints. This is most likely to have happened because of
@@ -1240,23 +1287,20 @@ class ClusterClientService(TimerService, object):
1240 if eventloop in self.connections:1287 if eventloop in self.connections:
1241 connection = self.connections[eventloop]1288 connection = self.connections[eventloop]
1242 if connection.address not in addresses:1289 if connection.address not in addresses:
1243 yield self._drop_connection(connection)1290 drop[eventloop] = [connection]
1291 if eventloop in self.try_connections:
1292 connection = self.try_connections[eventloop]
1293 if connection.address not in addresses:
1294 drop[eventloop] = [connection]
1295
1244 # Create new connections to event-loops that the cluster does1296 # Create new connections to event-loops that the cluster does
1245 # not yet have a connection to. Try each advertised endpoint1297 # not yet have a connection to.
1246 # (address) in turn until one of them bites.
1247 for eventloop, addresses in eventloops.items():1298 for eventloop, addresses in eventloops.items():
1248 if eventloop not in self.connections:1299 if ((eventloop not in self.connections and
1249 for address in addresses:1300 eventloop not in self.try_connections) or
1250 try:1301 eventloop in drop):
1251 yield self._make_connection(eventloop, address)1302 connect[eventloop] = addresses
1252 except ConnectError as error:1303
1253 host, port = address
1254 log.msg("Event-loop %s (%s:%d): %s" % (
1255 eventloop, host, port, error))
1256 except:
1257 log.err(None, "Failure making new RPC connection.")
1258 else:
1259 break
1260 # Remove connections to event-loops that are no longer1304 # Remove connections to event-loops that are no longer
1261 # advertised by the RPC info view. Most likely this means that1305 # advertised by the RPC info view. Most likely this means that
1262 # the process in which the event-loop is no longer running, but1306 # the process in which the event-loop is no longer running, but
@@ -1265,7 +1309,32 @@ class ClusterClientService(TimerService, object):
1265 for eventloop in self.connections:1309 for eventloop in self.connections:
1266 if eventloop not in eventloops:1310 if eventloop not in eventloops:
1267 connection = self.connections[eventloop]1311 connection = self.connections[eventloop]
1268 yield self._drop_connection(connection)1312 drop[eventloop] = [connection]
1313 for eventloop in self.try_connections:
1314 if eventloop not in eventloops:
1315 connection = self.try_connections[eventloop]
1316 drop[eventloop] = [connection]
1317
1318 return drop, connect
1319
1320 @inlineCallbacks
1321 def _make_connections(self, eventloop, addresses):
1322 """Connect to `eventloop` using all `addresses`."""
1323 for address in addresses:
1324 try:
1325 connection = yield self._make_connection(eventloop, address)
1326 except ConnectError as error:
1327 host, port = address
1328 log.msg("Event-loop %s (%s:%d): %s" % (
1329 eventloop, host, port, error))
1330 except:
1331 host, port = address
1332 log.err(None, (
1333 "Failure with event-loop %s (%s:%d)" % (
1334 eventloop, host, port)))
1335 else:
1336 self.try_connections[eventloop] = connection
1337 break
12691338
1270 def _make_connection(self, eventloop, address):1339 def _make_connection(self, eventloop, address):
1271 """Connect to `eventloop` at `address`."""1340 """Connect to `eventloop` at `address`."""
@@ -1284,6 +1353,9 @@ class ClusterClientService(TimerService, object):
1284 If this is the last connection that was keeping rackd connected to1353 If this is the last connection that was keeping rackd connected to
1285 a regiond then dhcpd and dhcpd6 services will be turned off.1354 a regiond then dhcpd and dhcpd6 services will be turned off.
1286 """1355 """
1356 if eventloop in self.try_connections:
1357 if self.try_connections[eventloop] is connection:
1358 del self.try_connections[eventloop]
1287 if eventloop in self.connections:1359 if eventloop in self.connections:
1288 if self.connections[eventloop] is connection:1360 if self.connections[eventloop] is connection:
1289 del self.connections[eventloop]1361 del self.connections[eventloop]
@@ -1299,7 +1371,7 @@ class ClusterClientService(TimerService, object):
1299 dhcp_v6.off()1371 dhcp_v6.off()
1300 stopping_services.append("dhcpd6")1372 stopping_services.append("dhcpd6")
1301 if len(stopping_services) > 0:1373 if len(stopping_services) > 0:
1302 maaslog.error(1374 log.msg(
1303 "Lost all connections to region controllers. "1375 "Lost all connections to region controllers. "
1304 "Stopping service(s) %s." % ",".join(stopping_services))1376 "Stopping service(s) %s." % ",".join(stopping_services))
1305 service_monitor.ensureServices()1377 service_monitor.ensureServices()
diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
index 8553d02..9a7c85d 100644
--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
@@ -636,7 +636,9 @@ class TestClusterClientService(MAASTestCase):
636 @inlineCallbacks636 @inlineCallbacks
637 def test__update_connections_initially(self):637 def test__update_connections_initially(self):
638 service = ClusterClientService(Clock())638 service = ClusterClientService(Clock())
639 mock_client = Mock()
639 _make_connection = self.patch(service, "_make_connection")640 _make_connection = self.patch(service, "_make_connection")
641 _make_connection.side_effect = lambda *args: succeed(mock_client)
640 _drop_connection = self.patch(service, "_drop_connection")642 _drop_connection = self.patch(service, "_drop_connection")
641643
642 info = json.loads(self.example_rpc_info_view_response.decode("ascii"))644 info = json.loads(self.example_rpc_info_view_response.decode("ascii"))
@@ -650,10 +652,40 @@ class TestClusterClientService(MAASTestCase):
650 self.assertItemsEqual(652 self.assertItemsEqual(
651 _make_connection_expected,653 _make_connection_expected,
652 _make_connection.call_args_list)654 _make_connection.call_args_list)
655 self.assertEquals({
656 "host1:pid=1001": mock_client,
657 "host1:pid=2002": mock_client,
658 "host2:pid=3003": mock_client,
659 }, service.try_connections)
653660
654 self.assertEqual([], _drop_connection.mock_calls)661 self.assertEqual([], _drop_connection.mock_calls)
655662
656 @inlineCallbacks663 @inlineCallbacks
664 def test__update_connections_logs_fully_connected(self):
665 service = ClusterClientService(Clock())
666 eventloops = {
667 "region1:123": [("::ffff:127.0.0.1", 1234)],
668 "region1:124": [("::ffff:127.0.0.1", 1235)],
669 "region2:123": [("::ffff:127.0.0.2", 1234)],
670 "region2:124": [("::ffff:127.0.0.2", 1235)],
671 }
672 for eventloop, addresses in eventloops.items():
673 for address in addresses:
674 client = Mock()
675 client.address = address
676 service.connections[eventloop] = client
677
678 logger = self.useFixture(TwistedLoggerFixture())
679
680 yield service._update_connections(eventloops)
681 # Second call should not add it to the log.
682 yield service._update_connections(eventloops)
683
684 self.assertEqual(
685 "Fully connected to all 4 event-loops on all 2 region "
686 "controllers (region2, region1).", logger.dump())
687
688 @inlineCallbacks
657 def test__update_connections_connect_error_is_logged_tersely(self):689 def test__update_connections_connect_error_is_logged_tersely(self):
658 service = ClusterClientService(Clock())690 service = ClusterClientService(Clock())
659 _make_connection = self.patch(service, "_make_connection")691 _make_connection = self.patch(service, "_make_connection")
@@ -669,6 +701,8 @@ class TestClusterClientService(MAASTestCase):
669 MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234)))701 MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234)))
670702
671 self.assertEqual(703 self.assertEqual(
704 "Making connections to event-loops: an-event-loop\n"
705 "---\n"
672 "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection "706 "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection "
673 "was refused by other side.", logger.dump())707 "was refused by other side.", logger.dump())
674708
@@ -689,7 +723,9 @@ class TestClusterClientService(MAASTestCase):
689723
690 self.assertDocTestMatches(724 self.assertDocTestMatches(
691 """\725 """\
692 Failure making new RPC connection.726 Making connections to event-loops: an-event-loop
727 ---
728 Failure with event-loop an-event-loop (::ffff:127.0.0.1:1234)
693 Traceback (most recent call last):729 Traceback (most recent call last):
694 ...730 ...
695 builtins.RuntimeError: Something went wrong.731 builtins.RuntimeError: Something went wrong.
@@ -797,6 +833,15 @@ class TestClusterClientService(MAASTestCase):
797 connection.transport.loseConnection,833 connection.transport.loseConnection,
798 MockCalledOnceWith())834 MockCalledOnceWith())
799835
836 def test__remove_connection_removes_from_try_connections(self):
837 service = make_inert_client_service()
838 service.startService()
839 endpoint = Mock()
840 connection = Mock()
841 service.try_connections[endpoint] = connection
842 service.remove_connection(endpoint, connection)
843 self.assertThat(service.try_connections, Equals({}))
844
800 def test__remove_connection_removes_from_connections(self):845 def test__remove_connection_removes_from_connections(self):
801 service = make_inert_client_service()846 service = make_inert_client_service()
802 service.startService()847 service.startService()
@@ -1063,6 +1108,7 @@ class TestClusterClient(MAASTestCase):
10631108
1064 def test_connecting(self):1109 def test_connecting(self):
1065 client = self.make_running_client()1110 client = self.make_running_client()
1111 client.service.try_connections[client.eventloop] = client
1066 self.patch_authenticate_for_success(client)1112 self.patch_authenticate_for_success(client)
1067 self.patch_register_for_success(client)1113 self.patch_register_for_success(client)
1068 self.assertEqual(client.service.connections, {})1114 self.assertEqual(client.service.connections, {})
@@ -1076,6 +1122,7 @@ class TestClusterClient(MAASTestCase):
1076 self.assertTrue(extract_result(wait_for_authenticated))1122 self.assertTrue(extract_result(wait_for_authenticated))
1077 # ready has been set with the name of the event-loop.1123 # ready has been set with the name of the event-loop.
1078 self.assertEqual(client.eventloop, extract_result(wait_for_ready))1124 self.assertEqual(client.eventloop, extract_result(wait_for_ready))
1125 self.assertEqual(client.service.try_connections, {})
1079 self.assertEqual(1126 self.assertEqual(
1080 client.service.connections,1127 client.service.connections,
1081 {client.eventloop: client})1128 {client.eventloop: client})

Subscribers

People subscribed via source and target branches