Merge ~cgrabowski/maas:fix_connection_pool_handshake_race_condition into maas:master

Proposed by Christian Grabowski
Status: Merged
Approved by: Christian Grabowski
Approved revision: cdc506149c60aeed840bf0dd951e41fdf0860a53
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~cgrabowski/maas:fix_connection_pool_handshake_race_condition
Merge into: maas:master
Diff against target: 154 lines (+64/-18)
4 files modified
src/provisioningserver/rpc/clusterservice.py (+1/-1)
src/provisioningserver/rpc/connectionpool.py (+8/-8)
src/provisioningserver/rpc/tests/test_clusterservice.py (+27/-6)
src/provisioningserver/rpc/tests/test_connectionpool.py (+28/-3)
Reviewer Review Type Date Requested Status
MAAS Lander Approve
Alexsander de Souza Approve
Review via email: mp+429170@code.launchpad.net

Commit message

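Only allow the clusterservice to call add_connection, to avoid a race condition with the RPC handshake. Connections spawned by the connection pool are no longer appended to the pool immediately. Instead, each one is added by the cluster service once its handshake (authentication and rack registration) has finished.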

To post a comment you must log in.
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b fix_connection_pool_handshake_race_condition lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/439/consoleText
COMMIT: 3833aa2744122bff1b69b0cf0b00776f4571a666

review: Needs Fixing
Revision history for this message
Alexsander de Souza (alexsander-souza) wrote :

+1

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b fix_connection_pool_handshake_race_condition lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: cdc506149c60aeed840bf0dd951e41fdf0860a53

review: Approve

Update scan failed

At least one of the branches involved has failed to scan. You can manually schedule a rescan if required.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
index 32d5ad4..3a5045f 100644
--- a/src/provisioningserver/rpc/clusterservice.py
+++ b/src/provisioningserver/rpc/clusterservice.py
@@ -1084,7 +1084,7 @@ class ClusterClient(Cluster):
1084 log.msg("Event-loop '%s' authenticated." % self.ident)1084 log.msg("Event-loop '%s' authenticated." % self.ident)
1085 registered = yield self.registerRackWithRegion()1085 registered = yield self.registerRackWithRegion()
1086 if registered:1086 if registered:
1087 self.service.add_connection(self.eventloop, self)1087 yield self.service.add_connection(self.eventloop, self)
1088 self.ready.set(self.eventloop)1088 self.ready.set(self.eventloop)
1089 else:1089 else:
1090 self.transport.loseConnection()1090 self.transport.loseConnection()
diff --git a/src/provisioningserver/rpc/connectionpool.py b/src/provisioningserver/rpc/connectionpool.py
index 8023f80..3acaf08 100644
--- a/src/provisioningserver/rpc/connectionpool.py
+++ b/src/provisioningserver/rpc/connectionpool.py
@@ -88,7 +88,6 @@ class ConnectionPool:
88 # spawn an extra connection88 # spawn an extra connection
89 conn_to_clone = random.choice(list(ev_conns))89 conn_to_clone = random.choice(list(ev_conns))
90 conn = yield self.connect(ev, conn_to_clone.address)90 conn = yield self.connect(ev, conn_to_clone.address)
91 self.connections[ev].append(conn)
92 self.clock.callLater(91 self.clock.callLater(
93 self._keepalive, self._reap_extra_connection, ev, conn92 self._keepalive, self._reap_extra_connection, ev, conn
94 )93 )
@@ -144,14 +143,15 @@ class ConnectionPool:
144 self.connections[eventloop] = []143 self.connections[eventloop] = []
145144
146 self.connections[eventloop].append(connection)145 self.connections[eventloop].append(connection)
147
148 # clone connection to equal num idle connections146 # clone connection to equal num idle connections
149 if self._max_idle_connections - 1 > 0:147 idle_limit = self._max_idle_connections - len(
150 for _ in range(self._max_idle_connections - 1):148 self.connections[eventloop]
151 extra_conn = yield self.connect(149 )
152 connection.eventloop, connection.address150 # if there's room for more and first conn, create more idle conns
153 )151 if idle_limit > 0 and len(self.connections[eventloop]) == 1:
154 self.connections[eventloop].append(extra_conn)152 for _ in range(idle_limit):
153 # calls to service to add self when handshake is finished
154 yield self.connect(connection.eventloop, connection.address)
155155
156 def remove_connection(self, eventloop, connection):156 def remove_connection(self, eventloop, connection):
157 if self.is_staged(eventloop):157 if self.is_staged(eventloop):
diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
index cd15e04..ddb0f1c 100644
--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
@@ -1211,19 +1211,26 @@ class TestClusterClientService(MAASTestCase):
1211 service._update_saved_rpc_info_state, MockCalledOnceWith()1211 service._update_saved_rpc_info_state, MockCalledOnceWith()
1212 )1212 )
12131213
1214 @inlineCallbacks
1214 def test_add_connection_creates_max_idle_connections(self):1215 def test_add_connection_creates_max_idle_connections(self):
1215 service = make_inert_client_service(max_idle_conns=2)1216 service = make_inert_client_service(max_idle_conns=2)
1216 service.startService()1217 service.startService()
1217 endpoint = Mock()1218 endpoint = Mock()
1218 connection = Mock()1219 connection = Mock()
1220 connection.eventloop = endpoint
1219 connection.address = (":::ffff", 2222)1221 connection.address = (":::ffff", 2222)
1220 connection2 = Mock()1222
1221 connection.address = (":::ffff", 2222)1223 @inlineCallbacks
1222 self.patch(service.connections, "connect").return_value = succeed(1224 def mock_connect(ev, addr):
1223 connection21225 new_conn = Mock()
1224 )1226 new_conn.eventloop = ev
1227 new_conn.address = addr
1228 yield service.add_connection(ev, new_conn)
1229 return new_conn
1230
1231 self.patch(service.connections, "connect").side_effect = mock_connect
1225 self.patch_autospec(service, "_update_saved_rpc_info_state")1232 self.patch_autospec(service, "_update_saved_rpc_info_state")
1226 service.add_connection(endpoint, connection)1233 yield service.add_connection(endpoint, connection)
1227 self.assertEqual(1234 self.assertEqual(
1228 len(1235 len(
1229 [1236 [
@@ -1329,11 +1336,25 @@ class TestClusterClientService(MAASTestCase):
1329 self.patch(service.connections, "connect").return_value = succeed(1336 self.patch(service.connections, "connect").return_value = succeed(
1330 FakeConnection()1337 FakeConnection()
1331 )1338 )
1339
1340 # skip the connectionMade logic for this test
1341 @inlineCallbacks
1342 def mock_scale_up_connections():
1343 for ev, conns in service.connections.items():
1344 if len(conns) < service.connections._max_connections:
1345 conn = yield service.connections.connect()
1346 service.connections[ev].append(conn)
1347 break
1348
1349 scale_up = self.patch(service.connections, "scale_up_connections")
1350 scale_up.side_effect = mock_scale_up_connections
1351
1332 original_conns = [1352 original_conns = [
1333 conn for conns in service.connections.values() for conn in conns1353 conn for conns in service.connections.values() for conn in conns
1334 ]1354 ]
1335 new_client = yield service.getClientNow()1355 new_client = yield service.getClientNow()
1336 new_conn = new_client._conn1356 new_conn = new_client._conn
1357 scale_up.assert_called_once()
1337 self.assertIsNotNone(new_conn)1358 self.assertIsNotNone(new_conn)
1338 self.assertNotIn(new_conn, original_conns)1359 self.assertNotIn(new_conn, original_conns)
1339 self.assertIn(1360 self.assertIn(
diff --git a/src/provisioningserver/rpc/tests/test_connectionpool.py b/src/provisioningserver/rpc/tests/test_connectionpool.py
index dc06bd4..2f0096c 100644
--- a/src/provisioningserver/rpc/tests/test_connectionpool.py
+++ b/src/provisioningserver/rpc/tests/test_connectionpool.py
@@ -117,10 +117,35 @@ class TestConnectionPool(MAASTestCase):
117 def test_scale_up_connections_adds_a_connection(self):117 def test_scale_up_connections_adds_a_connection(self):
118 cp = ConnectionPool(Clock(), Mock(), max_conns=2)118 cp = ConnectionPool(Clock(), Mock(), max_conns=2)
119 eventloop = Mock()119 eventloop = Mock()
120 connection1 = Mock()120 address = (factory.make_ip_address(), 5240)
121 connection2 = Mock()121 service = Mock()
122
123 @inlineCallbacks
124 def mock_service_add_connection(ev, conn):
125 yield cp.add_connection(ev, conn)
126
127 service.add_connection = mock_service_add_connection
128
129 connection1 = ClusterClient(address, eventloop, service)
130 connection2 = ClusterClient(address, eventloop, service)
122 connect = self.patch(cp, "connect")131 connect = self.patch(cp, "connect")
123 connect.return_value = succeed(connection2)132
133 @inlineCallbacks
134 def call_connectionMade(*args, **kwargs):
135 yield connection2.connectionMade()
136 return connection2
137
138 connect.side_effect = call_connectionMade
139
140 authRegion = self.patch(connection2, "authenticateRegion")
141 authRegion.return_value = succeed(True)
142 register = self.patch(connection2, "registerRackWithRegion")
143
144 def set_ident(*args, **kwargs):
145 connection2.localIdent = factory.make_name()
146 return succeed(True)
147
148 register.side_effect = set_ident
124 cp[eventloop] = [connection1]149 cp[eventloop] = [connection1]
125 cp.scale_up_connections()150 cp.scale_up_connections()
126 self.assertCountEqual(cp[eventloop], [connection1, connection2])151 self.assertCountEqual(cp[eventloop], [connection1, connection2])

Subscribers

People subscribed via source and target branches