Merge ~cgrabowski/maas:fix_connection_pool_handshake_race_condition into maas:master

Proposed by Christian Grabowski
Status: Merged
Approved by: Christian Grabowski
Approved revision: cdc506149c60aeed840bf0dd951e41fdf0860a53
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~cgrabowski/maas:fix_connection_pool_handshake_race_condition
Merge into: maas:master
Diff against target: 154 lines (+64/-18)
4 files modified
src/provisioningserver/rpc/clusterservice.py (+1/-1)
src/provisioningserver/rpc/connectionpool.py (+8/-8)
src/provisioningserver/rpc/tests/test_clusterservice.py (+27/-6)
src/provisioningserver/rpc/tests/test_connectionpool.py (+28/-3)
Reviewer Review Type Date Requested Status
MAAS Lander Approve
Alexsander de Souza Approve
Review via email: mp+429170@code.launchpad.net

Commit message

Only allow the clusterservice to call add_connection, to avoid a race condition with the RPC handshake.

To post a comment you must log in.
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b fix_connection_pool_handshake_race_condition lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/439/consoleText
COMMIT: 3833aa2744122bff1b69b0cf0b00776f4571a666

review: Needs Fixing
Revision history for this message
Alexsander de Souza (alexsander-souza) wrote :

+1

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b fix_connection_pool_handshake_race_condition lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: cdc506149c60aeed840bf0dd951e41fdf0860a53

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
2index 32d5ad4..3a5045f 100644
3--- a/src/provisioningserver/rpc/clusterservice.py
4+++ b/src/provisioningserver/rpc/clusterservice.py
5@@ -1084,7 +1084,7 @@ class ClusterClient(Cluster):
6 log.msg("Event-loop '%s' authenticated." % self.ident)
7 registered = yield self.registerRackWithRegion()
8 if registered:
9- self.service.add_connection(self.eventloop, self)
10+ yield self.service.add_connection(self.eventloop, self)
11 self.ready.set(self.eventloop)
12 else:
13 self.transport.loseConnection()
14diff --git a/src/provisioningserver/rpc/connectionpool.py b/src/provisioningserver/rpc/connectionpool.py
15index 8023f80..3acaf08 100644
16--- a/src/provisioningserver/rpc/connectionpool.py
17+++ b/src/provisioningserver/rpc/connectionpool.py
18@@ -88,7 +88,6 @@ class ConnectionPool:
19 # spawn an extra connection
20 conn_to_clone = random.choice(list(ev_conns))
21 conn = yield self.connect(ev, conn_to_clone.address)
22- self.connections[ev].append(conn)
23 self.clock.callLater(
24 self._keepalive, self._reap_extra_connection, ev, conn
25 )
26@@ -144,14 +143,15 @@ class ConnectionPool:
27 self.connections[eventloop] = []
28
29 self.connections[eventloop].append(connection)
30-
31 # clone connection to equal num idle connections
32- if self._max_idle_connections - 1 > 0:
33- for _ in range(self._max_idle_connections - 1):
34- extra_conn = yield self.connect(
35- connection.eventloop, connection.address
36- )
37- self.connections[eventloop].append(extra_conn)
38+ idle_limit = self._max_idle_connections - len(
39+ self.connections[eventloop]
40+ )
41+ # if there's room for more and first conn, create more idle conns
42+ if idle_limit > 0 and len(self.connections[eventloop]) == 1:
43+ for _ in range(idle_limit):
44+ # calls to service to add self when handshake is finished
45+ yield self.connect(connection.eventloop, connection.address)
46
47 def remove_connection(self, eventloop, connection):
48 if self.is_staged(eventloop):
49diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
50index cd15e04..ddb0f1c 100644
51--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
52+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
53@@ -1211,19 +1211,26 @@ class TestClusterClientService(MAASTestCase):
54 service._update_saved_rpc_info_state, MockCalledOnceWith()
55 )
56
57+ @inlineCallbacks
58 def test_add_connection_creates_max_idle_connections(self):
59 service = make_inert_client_service(max_idle_conns=2)
60 service.startService()
61 endpoint = Mock()
62 connection = Mock()
63+ connection.eventloop = endpoint
64 connection.address = (":::ffff", 2222)
65- connection2 = Mock()
66- connection.address = (":::ffff", 2222)
67- self.patch(service.connections, "connect").return_value = succeed(
68- connection2
69- )
70+
71+ @inlineCallbacks
72+ def mock_connect(ev, addr):
73+ new_conn = Mock()
74+ new_conn.eventloop = ev
75+ new_conn.address = addr
76+ yield service.add_connection(ev, new_conn)
77+ return new_conn
78+
79+ self.patch(service.connections, "connect").side_effect = mock_connect
80 self.patch_autospec(service, "_update_saved_rpc_info_state")
81- service.add_connection(endpoint, connection)
82+ yield service.add_connection(endpoint, connection)
83 self.assertEqual(
84 len(
85 [
86@@ -1329,11 +1336,25 @@ class TestClusterClientService(MAASTestCase):
87 self.patch(service.connections, "connect").return_value = succeed(
88 FakeConnection()
89 )
90+
91+ # skip the connectionMade logic for this test
92+ @inlineCallbacks
93+ def mock_scale_up_connections():
94+ for ev, conns in service.connections.items():
95+ if len(conns) < service.connections._max_connections:
96+ conn = yield service.connections.connect()
97+ service.connections[ev].append(conn)
98+ break
99+
100+ scale_up = self.patch(service.connections, "scale_up_connections")
101+ scale_up.side_effect = mock_scale_up_connections
102+
103 original_conns = [
104 conn for conns in service.connections.values() for conn in conns
105 ]
106 new_client = yield service.getClientNow()
107 new_conn = new_client._conn
108+ scale_up.assert_called_once()
109 self.assertIsNotNone(new_conn)
110 self.assertNotIn(new_conn, original_conns)
111 self.assertIn(
112diff --git a/src/provisioningserver/rpc/tests/test_connectionpool.py b/src/provisioningserver/rpc/tests/test_connectionpool.py
113index dc06bd4..2f0096c 100644
114--- a/src/provisioningserver/rpc/tests/test_connectionpool.py
115+++ b/src/provisioningserver/rpc/tests/test_connectionpool.py
116@@ -117,10 +117,35 @@ class TestConnectionPool(MAASTestCase):
117 def test_scale_up_connections_adds_a_connection(self):
118 cp = ConnectionPool(Clock(), Mock(), max_conns=2)
119 eventloop = Mock()
120- connection1 = Mock()
121- connection2 = Mock()
122+ address = (factory.make_ip_address(), 5240)
123+ service = Mock()
124+
125+ @inlineCallbacks
126+ def mock_service_add_connection(ev, conn):
127+ yield cp.add_connection(ev, conn)
128+
129+ service.add_connection = mock_service_add_connection
130+
131+ connection1 = ClusterClient(address, eventloop, service)
132+ connection2 = ClusterClient(address, eventloop, service)
133 connect = self.patch(cp, "connect")
134- connect.return_value = succeed(connection2)
135+
136+ @inlineCallbacks
137+ def call_connectionMade(*args, **kwargs):
138+ yield connection2.connectionMade()
139+ return connection2
140+
141+ connect.side_effect = call_connectionMade
142+
143+ authRegion = self.patch(connection2, "authenticateRegion")
144+ authRegion.return_value = succeed(True)
145+ register = self.patch(connection2, "registerRackWithRegion")
146+
147+ def set_ident(*args, **kwargs):
148+ connection2.localIdent = factory.make_name()
149+ return succeed(True)
150+
151+ register.side_effect = set_ident
152 cp[eventloop] = [connection1]
153 cp.scale_up_connections()
154 self.assertCountEqual(cp[eventloop], [connection1, connection2])

Subscribers

People subscribed via source and target branches