Merge ~blake-rouse/maas:fix-1765056 into maas:master

Proposed by Blake Rouse
Status: Merged
Approved by: Blake Rouse
Approved revision: f24b6f5c78542a218c99a7e0479c5e4318c96c25
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~blake-rouse/maas:fix-1765056
Merge into: maas:master
Diff against target: 272 lines (+43/-36)
4 files modified
src/maasserver/ipc.py (+29/-29)
src/maasserver/rpc/regionservice.py (+5/-2)
src/maasserver/rpc/tests/test_regionservice.py (+3/-2)
src/maasserver/tests/test_ipc.py (+6/-3)
| Reviewer | Review Type | Date Requested | Status |
| --- | --- | --- | --- |
| Andres Rodriguez (community) | | | Approve |
Review via email: mp+343520@code.launchpad.net

Commit message

LP: #1765056 - Add a connection ID to every RPC connection made to a region process. Pass that connection ID to the master process so that registering and unregistering connections can be handled correctly even when the requests arrive out of order.

To post a comment you must log in.
Revision history for this message
Andres Rodriguez (andreserl) wrote :

lgtm! Just one quick question inline.

review: Approve
Revision history for this message
Blake Rouse (blake-rouse) :

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/maasserver/ipc.py b/src/maasserver/ipc.py
2index fd0941e..4ae0983 100644
3--- a/src/maasserver/ipc.py
4+++ b/src/maasserver/ipc.py
5@@ -97,6 +97,7 @@ class RPCRegisterConnection(amp.Command):
6
7 arguments = [
8 (b"pid", amp.Integer()),
9+ (b"connid", amp.Unicode()),
10 (b"ident", amp.Unicode()),
11 (b"host", amp.Unicode()),
12 (b"port", amp.Integer()),
13@@ -110,9 +111,7 @@ class RPCUnregisterConnection(amp.Command):
14
15 arguments = [
16 (b"pid", amp.Integer()),
17- (b"ident", amp.Unicode()),
18- (b"host", amp.Unicode()),
19- (b"port", amp.Integer()),
20+ (b"connid", amp.Unicode()),
21 ]
22 response = []
23 errors = []
24@@ -137,17 +136,16 @@ class IPCMaster(RPCProtocol):
25 return {}
26
27 @RPCRegisterConnection.responder
28- def rpc_register_connection(self, pid, ident, host, port):
29+ def rpc_register_connection(self, pid, connid, ident, host, port):
30 """Register worker has connection from RPC client."""
31 self.factory.service.registerWorkerRPCConnection(
32- pid, ident, host, port)
33+ pid, connid, ident, host, port)
34 return {}
35
36 @RPCUnregisterConnection.responder
37- def rpc_unregister_connection(self, pid, ident, host, port):
38+ def rpc_unregister_connection(self, pid, connid):
39 """Unregister worker lost connection from RPC client."""
40- self.factory.service.unregisterWorkerRPCConnection(
41- pid, ident, host, port)
42+ self.factory.service.unregisterWorkerRPCConnection(pid, connid)
43 return {}
44
45
46@@ -385,7 +383,7 @@ class IPCMasterService(service.Service, object):
47 def set_result(result):
48 pid, port = result
49 self.connections[pid]['rpc']['port'] = port
50- self.connections[pid]['rpc']['connections'] = set()
51+ self.connections[pid]['rpc']['connections'] = {}
52 return result
53
54 def log_rpc_open(result):
55@@ -411,27 +409,29 @@ class IPCMasterService(service.Service, object):
56 connection.save(force_update=True)
57 return connection
58
59- def registerWorkerRPCConnection(self, pid, ident, host, port):
60+ def registerWorkerRPCConnection(self, pid, connid, ident, host, port):
61 """Register the worker with `pid` has RPC an RPC connection."""
62 if pid in self.connections:
63
64 @transactional
65- def register_connection(pid, ident, host, port):
66+ def register_connection(pid, connid, ident, host, port):
67 process = self._getProcessObjFor(pid)
68 self._registerConnection(process, ident, host, port)
69- return (pid, ident, host, port)
70+ return (pid, connid, ident, host, port)
71
72 def log_connection(result):
73 pid, conn = result[0], result[1:]
74 log.msg(
75 "Worker pid:%d registered RPC connection to %s." % (
76- pid, conn))
77+ pid, conn[1:]))
78 return conn
79
80 def set_result(conn):
81- self.connections[pid]['rpc']['connections'].add(conn)
82+ connid, conn = conn[0], conn[1:]
83+ self.connections[pid]['rpc']['connections'][connid] = conn
84
85- d = deferToDatabase(register_connection, pid, ident, host, port)
86+ d = deferToDatabase(
87+ register_connection, pid, connid, ident, host, port)
88 d.addCallback(log_connection)
89 d.addCallback(set_result)
90 return d
91@@ -455,31 +455,32 @@ class IPCMasterService(service.Service, object):
92 RegionRackRPCConnection.objects.filter(
93 endpoint=endpoint, rack_controller=rackd).delete()
94
95- def unregisterWorkerRPCConnection(self, pid, ident, host, port):
96+ def unregisterWorkerRPCConnection(self, pid, connid):
97 """Unregister connection for worker with `pid`."""
98 if pid in self.connections:
99 connections = self.connections[pid]['rpc']['connections']
100- conn = (ident, host, port)
101- if conn in connections:
102+ conn = connections.get(connid, None)
103+ if conn is not None:
104
105 @transactional
106- def unregister_connection(pid, ident, host, port):
107+ def unregister_connection(pid, connid, ident, host, port):
108 process = self._getProcessObjFor(pid)
109 self._unregisterConnection(process, ident, host, port)
110- return (pid, ident, host, port)
111+ return (pid, connid, ident, host, port)
112
113 def log_disconnect(result):
114 pid, conn = result[0], result[1:]
115 log.msg(
116 "Worker pid:%d lost RPC connection to %s." % (
117- pid, conn))
118+ pid, conn[1:]))
119 return conn
120
121 def set_result(conn):
122- connections.remove(conn)
123+ connid = conn[0]
124+ connections.pop(connid, None)
125
126 d = deferToDatabase(
127- unregister_connection, pid, ident, host, port)
128+ unregister_connection, pid, connid, *conn)
129 d.addCallback(log_disconnect)
130 d.addCallback(set_result)
131 return d
132@@ -500,7 +501,7 @@ class IPCMasterService(service.Service, object):
133 RegionRackRPCConnection.objects.filter(
134 endpoint__process=process).values_list(
135 "id", flat=True))
136- for ident, host, port in connections:
137+ for _, (ident, host, port) in connections.items():
138 db_conn = self._registerConnection(
139 process, ident, host, port, force_save=False)
140 previous_connection_ids.discard(db_conn.id)
141@@ -685,21 +686,20 @@ class IPCWorkerService(service.Service, object):
142 return d
143
144 @asynchronous
145- def rpcRegisterConnection(self, ident, host, port):
146+ def rpcRegisterConnection(self, connid, ident, host, port):
147 """Register RPC connection on master."""
148 d = self.protocol.get()
149 d.addCallback(
150 lambda protocol: protocol.callRemote(
151 RPCRegisterConnection, pid=os.getpid(),
152- ident=ident, host=host, port=port))
153+ connid=connid, ident=ident, host=host, port=port))
154 return d
155
156 @asynchronous
157- def rpcUnregisterConnection(self, ident, host, port):
158+ def rpcUnregisterConnection(self, connid):
159 """Unregister RPC connection on master."""
160 d = self.protocol.get()
161 d.addCallback(
162 lambda protocol: protocol.callRemote(
163- RPCUnregisterConnection, pid=os.getpid(),
164- ident=ident, host=host, port=port))
165+ RPCUnregisterConnection, pid=os.getpid(), connid=connid))
166 return d
167diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py
168index 510ded4..22efb81 100644
169--- a/src/maasserver/rpc/regionservice.py
170+++ b/src/maasserver/rpc/regionservice.py
171@@ -16,6 +16,7 @@ from socket import (
172 AF_INET,
173 AF_INET6,
174 )
175+import uuid
176
177 from maasserver import eventloop
178 from maasserver.bootresources import get_simplestream_endpoint
179@@ -512,6 +513,7 @@ class RegionServer(Region):
180 """
181
182 factory = None
183+ connid = None
184 ident = None
185 host = None
186 hostIsRemote = False
187@@ -553,7 +555,7 @@ class RegionServer(Region):
188 # convert.
189 pass
190 return self.factory.service.ipcWorker.rpcRegisterConnection(
191- self.ident, self.host.host, self.host.port)
192+ self.connid, self.ident, self.host.host, self.host.port)
193
194 @inlineCallbacks
195 def authenticateCluster(self):
196@@ -654,6 +656,7 @@ class RegionServer(Region):
197
198 def connectionMade(self):
199 super(RegionServer, self).connectionMade()
200+ self.connid = str(uuid.uuid4())
201 if self.factory.service.running:
202 return self.performHandshake().addErrback(self.handshakeFailed)
203 else:
204@@ -662,7 +665,7 @@ class RegionServer(Region):
205 def connectionLost(self, reason):
206 if self.hostIsRemote:
207 d = self.factory.service.ipcWorker.rpcUnregisterConnection(
208- self.ident, self.host.host, self.host.port)
209+ self.connid)
210 d.addErrback(
211 log.err,
212 "Failed to unregister the connection with the master.")
213diff --git a/src/maasserver/rpc/tests/test_regionservice.py b/src/maasserver/rpc/tests/test_regionservice.py
214index 2b919a9..e4bec4b 100644
215--- a/src/maasserver/rpc/tests/test_regionservice.py
216+++ b/src/maasserver/rpc/tests/test_regionservice.py
217@@ -164,7 +164,7 @@ class TestRegionServer(MAASTransactionServerTestCase):
218
219 protocol.connectionLost(reason=None)
220 self.assertThat(ipcWorker.rpcUnregisterConnection, MockCalledOnceWith(
221- protocol.ident, sentinel.host, sentinel.port))
222+ protocol.connid))
223 # The connection is removed from the set, but the key remains.
224 self.assertDictEqual({protocol.ident: set()}, service.connections)
225 # connectionLost() is called on the superclass.
226@@ -479,7 +479,8 @@ class TestRegionServer(MAASTransactionServerTestCase):
227 self.assertTrue(sentinel.host, protocol.hostIsRemote)
228 self.assertThat(
229 ipcWorker.rpcRegisterConnection,
230- MockCalledOnceWith(protocol.ident, host.host, host.port))
231+ MockCalledOnceWith(
232+ protocol.connid, protocol.ident, host.host, host.port))
233
234 @wait_for_reactor
235 @inlineCallbacks
236diff --git a/src/maasserver/tests/test_ipc.py b/src/maasserver/tests/test_ipc.py
237index c7b9426..cec6408 100644
238--- a/src/maasserver/tests/test_ipc.py
239+++ b/src/maasserver/tests/test_ipc.py
240@@ -9,6 +9,7 @@ from datetime import timedelta
241 import os
242 import random
243 from unittest.mock import MagicMock
244+import uuid
245
246 from crochet import wait_for
247 from fixtures import EnvironmentVariableFixture
248@@ -252,9 +253,11 @@ class TestIPCCommunication(MAASTransactionServerTestCase):
249 yield rpc_started.get(timeout=2)
250
251 rackd = yield deferToDatabase(factory.make_RackController)
252+ connid = str(uuid.uuid4())
253 address = factory.make_ipv4_address()
254 port = random.randint(1000, 5000)
255- yield worker.rpcRegisterConnection(rackd.system_id, address, port)
256+ yield worker.rpcRegisterConnection(
257+ connid, rackd.system_id, address, port)
258
259 def getConnection():
260 region = RegionController.objects.get_running_controller()
261@@ -268,10 +271,10 @@ class TestIPCCommunication(MAASTransactionServerTestCase):
262 connection = yield deferToDatabase(getConnection)
263 self.assertIsNotNone(connection)
264 self.assertEquals(
265- {(rackd.system_id, address, port), },
266+ {connid: (rackd.system_id, address, port)},
267 master.connections[pid]['rpc']['connections'])
268
269- yield worker.rpcUnregisterConnection(rackd.system_id, address, port)
270+ yield worker.rpcUnregisterConnection(connid)
271 connection = yield deferToDatabase(getConnection)
272 self.assertIsNone(connection)
273

Subscribers

People subscribed via source and target branches