Merge ~blake-rouse/maas:fix-rack-rpc-conn into maas:master
- Git
- lp:~blake-rouse/maas
- fix-rack-rpc-conn
- Merge into master
Status: | Merged | ||||
---|---|---|---|---|---|
Approved by: | Andres Rodriguez | ||||
Approved revision: | 51df2e0bae0a3336a6496706429c87c7057a3325 | ||||
Merge reported by: | MAAS Lander | ||||
Merged at revision: | not available | ||||
Proposed branch: | ~blake-rouse/maas:fix-rack-rpc-conn | ||||
Merge into: | maas:master | ||||
Diff against target: |
395 lines (+195/-23) 6 files modified
src/maasserver/models/node.py (+1/-1) src/maasserver/models/tests/test_node.py (+1/-1) src/maasserver/rpc/regionservice.py (+19/-3) src/maasserver/rpc/tests/test_regionservice.py (+37/-0) src/provisioningserver/rpc/clusterservice.py (+89/-17) src/provisioningserver/rpc/tests/test_clusterservice.py (+48/-1) |
||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Andres Rodriguez (community) | Approve | ||
Review via email: mp+332814@code.launchpad.net |
Commit message
Fixes LP: #1727073 - [2.3] rackd — 12% connected to region controllers.
Fix rackd RPC connections to only connect once and in parallel. Fix regiond RPC output to not include IP addresses that duplicate across regions. Fix precentage calculation for rackd connections.
Description of the change
All the code changes in the clusterservice are covered by the test_clusterservice tests.
I have tested this across a 2 region+rack controllers, with both having a configured virbr0 with IP 192.168.122.1.
Blake Rouse (blake-rouse) : | # |
Blake Rouse (blake-rouse) wrote : | # |
Okay its ready for another review. Logging has been updated.
Andres Rodriguez (andreserl) : | # |
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b fix-rack-rpc-conn lp:~blake-rouse/maas into -b master lp:~maas-committers/maas
STATUS: FAILED BUILD
LOG: http://
Preview Diff
1 | diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py |
2 | index e6078b5..6699d81 100644 |
3 | --- a/src/maasserver/models/node.py |
4 | +++ b/src/maasserver/models/node.py |
5 | @@ -5116,7 +5116,7 @@ class RackController(Controller): |
6 | Service.objects.update_service_for( |
7 | self, "rackd", SERVICE_STATUS.DEGRADED, |
8 | "{:.0%} connected to region controllers.".format( |
9 | - percentage)) |
10 | + 1.0 - percentage)) |
11 | |
12 | def get_image_sync_status(self, boot_images=None): |
13 | """Return the status of the boot image import process.""" |
14 | diff --git a/src/maasserver/models/tests/test_node.py b/src/maasserver/models/tests/test_node.py |
15 | index c4bcb65..c3e5b09 100644 |
16 | --- a/src/maasserver/models/tests/test_node.py |
17 | +++ b/src/maasserver/models/tests/test_node.py |
18 | @@ -9999,7 +9999,7 @@ class TestRackController(MAASTransactionServerTestCase): |
19 | MatchesStructure.byEquality( |
20 | status=SERVICE_STATUS.DEGRADED, status_info=( |
21 | "{:.0%} connected to region controllers.".format( |
22 | - percentage)))) |
23 | + 1.0 - percentage)))) |
24 | |
25 | fake_images = [ |
26 | { |
27 | diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py |
28 | index 340a1be..5798823 100644 |
29 | --- a/src/maasserver/rpc/regionservice.py |
30 | +++ b/src/maasserver/rpc/regionservice.py |
31 | @@ -1255,15 +1255,31 @@ class RegionAdvertising: |
32 | Each tuple corresponds to somewhere an event-loop is listening |
33 | within the whole region. The `name` is the event-loop name. |
34 | """ |
35 | + # Each regiond might be running a local bridge that duplicates the |
36 | + # same IP address across region controllers. Each region controller |
37 | + # must output a set of unique of IP addresses, to prevent the rack |
38 | + # controller from connecting to a different region controller then |
39 | + # the rack controller was expecting to be connecting to. |
40 | + def _unique_to_region(address, region, regions): |
41 | + for region_obj in regions: |
42 | + if region_obj != region: |
43 | + for process in region_obj.processes.all(): |
44 | + for endpoint in process.endpoints.all(): |
45 | + if endpoint.address == address: |
46 | + return False |
47 | + return True |
48 | + |
49 | regions = RegionController.objects.all() |
50 | regions = regions.prefetch_related("processes", "processes__endpoints") |
51 | all_endpoints = [] |
52 | for region_obj in regions: |
53 | for process in region_obj.processes.all(): |
54 | for endpoint in process.endpoints.all(): |
55 | - all_endpoints.append(( |
56 | - "%s:pid=%d" % (region_obj.hostname, process.pid), |
57 | - endpoint.address, endpoint.port)) |
58 | + if _unique_to_region( |
59 | + endpoint.address, region_obj, regions): |
60 | + all_endpoints.append(( |
61 | + "%s:pid=%d" % (region_obj.hostname, process.pid), |
62 | + endpoint.address, endpoint.port)) |
63 | return all_endpoints |
64 | |
65 | @classmethod |
66 | diff --git a/src/maasserver/rpc/tests/test_regionservice.py b/src/maasserver/rpc/tests/test_regionservice.py |
67 | index b3caf43..88c1a03 100644 |
68 | --- a/src/maasserver/rpc/tests/test_regionservice.py |
69 | +++ b/src/maasserver/rpc/tests/test_regionservice.py |
70 | @@ -33,6 +33,7 @@ from maasserver.models import ( |
71 | RackController, |
72 | RegionController, |
73 | RegionControllerProcess, |
74 | + RegionControllerProcessEndpoint, |
75 | RegionRackRPCConnection, |
76 | Service as ServiceModel, |
77 | timestampedmodel, |
78 | @@ -1639,6 +1640,42 @@ class TestRegionAdvertising(MAASServerTestCase): |
79 | ] |
80 | self.assertItemsEqual(expected, advertising.dump()) |
81 | |
82 | + def test_dump_doesnt_duplicate_ips_across_region_controllers(self): |
83 | + duplicate_address = factory.make_ipv4_address() |
84 | + addresses = { |
85 | + (factory.make_ipv4_address(), factory.pick_port()), |
86 | + (factory.make_ipv4_address(), factory.pick_port()), |
87 | + } |
88 | + advertising = RegionAdvertising.promote() |
89 | + advertising.update( |
90 | + addresses.union({(duplicate_address, factory.pick_port()), })) |
91 | + |
92 | + other_addresses = { |
93 | + (factory.make_ipv4_address(), factory.pick_port()), |
94 | + (factory.make_ipv4_address(), factory.pick_port()), |
95 | + } |
96 | + other_region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER) |
97 | + other_process = RegionControllerProcess.objects.create( |
98 | + region=other_region, pid=randint(1, 1000)) |
99 | + for address, port in other_addresses: |
100 | + RegionControllerProcessEndpoint.objects.create( |
101 | + process=other_process, address=address, port=port) |
102 | + RegionControllerProcessEndpoint.objects.create( |
103 | + process=other_process, |
104 | + address=duplicate_address, |
105 | + port=factory.pick_port()) |
106 | + |
107 | + expected = [ |
108 | + ("%s:pid=%d" % (gethostname(), os.getpid()), addr, port) |
109 | + for (addr, port) in addresses |
110 | + ] + [ |
111 | + ("%s:pid=%d" % ( |
112 | + other_region.hostname, other_process.pid), |
113 | + addr, port) |
114 | + for (addr, port) in other_addresses |
115 | + ] |
116 | + self.assertItemsEqual(expected, advertising.dump()) |
117 | + |
118 | def test__adds_connection_and_removes_connection(self): |
119 | advertising = RegionAdvertising.promote() |
120 | process = advertising.getRegionProcess() |
121 | diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py |
122 | index d3e5413..946f516 100644 |
123 | --- a/src/provisioningserver/rpc/clusterservice.py |
124 | +++ b/src/provisioningserver/rpc/clusterservice.py |
125 | @@ -116,6 +116,7 @@ from twisted import web |
126 | from twisted.application.internet import TimerService |
127 | from twisted.internet import reactor |
128 | from twisted.internet.defer import ( |
129 | + DeferredList, |
130 | inlineCallbacks, |
131 | maybeDeferred, |
132 | returnValue, |
133 | @@ -841,6 +842,8 @@ class ClusterClient(Cluster): |
134 | log.msg("Event-loop '%s' authenticated." % self.ident) |
135 | registered = yield self.registerRackWithRegion() |
136 | if registered: |
137 | + if self.eventloop in self.service.try_connections: |
138 | + del self.service.try_connections[self.eventloop] |
139 | self.service.connections[self.eventloop] = self |
140 | self.ready.set(self.eventloop) |
141 | else: |
142 | @@ -992,6 +995,8 @@ class ClusterClientService(TimerService, object): |
143 | super(ClusterClientService, self).__init__( |
144 | self._calculate_interval(None, None), self._tryUpdate) |
145 | self.connections = {} |
146 | + self.try_connections = {} |
147 | + self._previous_work = (None, None) |
148 | self.clock = reactor |
149 | |
150 | # XXX jtv 2014-09-23, bug=1372767: Fix |
151 | @@ -1229,6 +1234,48 @@ class ClusterClientService(TimerService, object): |
152 | ] |
153 | for name, addresses in eventloops.items() |
154 | } |
155 | + |
156 | + drop, connect = self._calculate_work(eventloops) |
157 | + |
158 | + # Log fully connected only once. If that state changes then log |
159 | + # it again. This prevents flooding the log with the same message when |
160 | + # the state of the connections has not changed. |
161 | + prev_work, self._previous_work = self._previous_work, (drop, connect) |
162 | + if len(drop) == 0 and len(connect) == 0: |
163 | + if prev_work != (drop, connect) and len(eventloops) > 0: |
164 | + controllers = { |
165 | + eventloop.split(':')[0] |
166 | + for eventloop, _ in eventloops.items() |
167 | + } |
168 | + log.msg( |
169 | + "Fully connected to all %d event-loops on all %d " |
170 | + "region controllers (%s)." % ( |
171 | + len(eventloops), len(controllers), |
172 | + ', '.join(controllers))) |
173 | + |
174 | + # Drop all connections at once, as the are no longer required. |
175 | + if len(drop) > 0: |
176 | + log.msg("Dropping connections to event-loops: %s" % ( |
177 | + ', '.join(drop.keys()))) |
178 | + yield DeferredList([ |
179 | + maybeDeferred(self._drop_connection, connection) |
180 | + for eventloop, connections in drop.items() |
181 | + for connection in connections |
182 | + ], consumeErrors=True) |
183 | + |
184 | + # Make all the new connections to each endpoint at the same time. |
185 | + if len(connect) > 0: |
186 | + log.msg("Making connections to event-loops: %s" % ( |
187 | + ', '.join(connect.keys()))) |
188 | + yield DeferredList([ |
189 | + self._make_connections(eventloop, addresses) |
190 | + for eventloop, addresses in connect.items() |
191 | + ], consumeErrors=True) |
192 | + |
193 | + def _calculate_work(self, eventloops): |
194 | + """Calculate the work that needs to be performed for reconnection.""" |
195 | + drop, connect = {}, {} |
196 | + |
197 | # Drop connections to event-loops that no longer include one of |
198 | # this cluster's established connections among its advertised |
199 | # endpoints. This is most likely to have happened because of |
200 | @@ -1240,23 +1287,20 @@ class ClusterClientService(TimerService, object): |
201 | if eventloop in self.connections: |
202 | connection = self.connections[eventloop] |
203 | if connection.address not in addresses: |
204 | - yield self._drop_connection(connection) |
205 | + drop[eventloop] = [connection] |
206 | + if eventloop in self.try_connections: |
207 | + connection = self.try_connections[eventloop] |
208 | + if connection.address not in addresses: |
209 | + drop[eventloop] = [connection] |
210 | + |
211 | # Create new connections to event-loops that the cluster does |
212 | - # not yet have a connection to. Try each advertised endpoint |
213 | - # (address) in turn until one of them bites. |
214 | + # not yet have a connection to. |
215 | for eventloop, addresses in eventloops.items(): |
216 | - if eventloop not in self.connections: |
217 | - for address in addresses: |
218 | - try: |
219 | - yield self._make_connection(eventloop, address) |
220 | - except ConnectError as error: |
221 | - host, port = address |
222 | - log.msg("Event-loop %s (%s:%d): %s" % ( |
223 | - eventloop, host, port, error)) |
224 | - except: |
225 | - log.err(None, "Failure making new RPC connection.") |
226 | - else: |
227 | - break |
228 | + if ((eventloop not in self.connections and |
229 | + eventloop not in self.try_connections) or |
230 | + eventloop in drop): |
231 | + connect[eventloop] = addresses |
232 | + |
233 | # Remove connections to event-loops that are no longer |
234 | # advertised by the RPC info view. Most likely this means that |
235 | # the process in which the event-loop is no longer running, but |
236 | @@ -1265,7 +1309,32 @@ class ClusterClientService(TimerService, object): |
237 | for eventloop in self.connections: |
238 | if eventloop not in eventloops: |
239 | connection = self.connections[eventloop] |
240 | - yield self._drop_connection(connection) |
241 | + drop[eventloop] = [connection] |
242 | + for eventloop in self.try_connections: |
243 | + if eventloop not in eventloops: |
244 | + connection = self.try_connections[eventloop] |
245 | + drop[eventloop] = [connection] |
246 | + |
247 | + return drop, connect |
248 | + |
249 | + @inlineCallbacks |
250 | + def _make_connections(self, eventloop, addresses): |
251 | + """Connect to `eventloop` using all `addresses`.""" |
252 | + for address in addresses: |
253 | + try: |
254 | + connection = yield self._make_connection(eventloop, address) |
255 | + except ConnectError as error: |
256 | + host, port = address |
257 | + log.msg("Event-loop %s (%s:%d): %s" % ( |
258 | + eventloop, host, port, error)) |
259 | + except: |
260 | + host, port = address |
261 | + log.err(None, ( |
262 | + "Failure with event-loop %s (%s:%d)" % ( |
263 | + eventloop, host, port))) |
264 | + else: |
265 | + self.try_connections[eventloop] = connection |
266 | + break |
267 | |
268 | def _make_connection(self, eventloop, address): |
269 | """Connect to `eventloop` at `address`.""" |
270 | @@ -1284,6 +1353,9 @@ class ClusterClientService(TimerService, object): |
271 | If this is the last connection that was keeping rackd connected to |
272 | a regiond then dhcpd and dhcpd6 services will be turned off. |
273 | """ |
274 | + if eventloop in self.try_connections: |
275 | + if self.try_connections[eventloop] is connection: |
276 | + del self.try_connections[eventloop] |
277 | if eventloop in self.connections: |
278 | if self.connections[eventloop] is connection: |
279 | del self.connections[eventloop] |
280 | @@ -1299,7 +1371,7 @@ class ClusterClientService(TimerService, object): |
281 | dhcp_v6.off() |
282 | stopping_services.append("dhcpd6") |
283 | if len(stopping_services) > 0: |
284 | - maaslog.error( |
285 | + log.msg( |
286 | "Lost all connections to region controllers. " |
287 | "Stopping service(s) %s." % ",".join(stopping_services)) |
288 | service_monitor.ensureServices() |
289 | diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py |
290 | index 8553d02..9a7c85d 100644 |
291 | --- a/src/provisioningserver/rpc/tests/test_clusterservice.py |
292 | +++ b/src/provisioningserver/rpc/tests/test_clusterservice.py |
293 | @@ -636,7 +636,9 @@ class TestClusterClientService(MAASTestCase): |
294 | @inlineCallbacks |
295 | def test__update_connections_initially(self): |
296 | service = ClusterClientService(Clock()) |
297 | + mock_client = Mock() |
298 | _make_connection = self.patch(service, "_make_connection") |
299 | + _make_connection.side_effect = lambda *args: succeed(mock_client) |
300 | _drop_connection = self.patch(service, "_drop_connection") |
301 | |
302 | info = json.loads(self.example_rpc_info_view_response.decode("ascii")) |
303 | @@ -650,10 +652,40 @@ class TestClusterClientService(MAASTestCase): |
304 | self.assertItemsEqual( |
305 | _make_connection_expected, |
306 | _make_connection.call_args_list) |
307 | + self.assertEquals({ |
308 | + "host1:pid=1001": mock_client, |
309 | + "host1:pid=2002": mock_client, |
310 | + "host2:pid=3003": mock_client, |
311 | + }, service.try_connections) |
312 | |
313 | self.assertEqual([], _drop_connection.mock_calls) |
314 | |
315 | @inlineCallbacks |
316 | + def test__update_connections_logs_fully_connected(self): |
317 | + service = ClusterClientService(Clock()) |
318 | + eventloops = { |
319 | + "region1:123": [("::ffff:127.0.0.1", 1234)], |
320 | + "region1:124": [("::ffff:127.0.0.1", 1235)], |
321 | + "region2:123": [("::ffff:127.0.0.2", 1234)], |
322 | + "region2:124": [("::ffff:127.0.0.2", 1235)], |
323 | + } |
324 | + for eventloop, addresses in eventloops.items(): |
325 | + for address in addresses: |
326 | + client = Mock() |
327 | + client.address = address |
328 | + service.connections[eventloop] = client |
329 | + |
330 | + logger = self.useFixture(TwistedLoggerFixture()) |
331 | + |
332 | + yield service._update_connections(eventloops) |
333 | + # Second call should not add it to the log. |
334 | + yield service._update_connections(eventloops) |
335 | + |
336 | + self.assertEqual( |
337 | + "Fully connected to all 4 event-loops on all 2 region " |
338 | + "controllers (region2, region1).", logger.dump()) |
339 | + |
340 | + @inlineCallbacks |
341 | def test__update_connections_connect_error_is_logged_tersely(self): |
342 | service = ClusterClientService(Clock()) |
343 | _make_connection = self.patch(service, "_make_connection") |
344 | @@ -669,6 +701,8 @@ class TestClusterClientService(MAASTestCase): |
345 | MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234))) |
346 | |
347 | self.assertEqual( |
348 | + "Making connections to event-loops: an-event-loop\n" |
349 | + "---\n" |
350 | "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection " |
351 | "was refused by other side.", logger.dump()) |
352 | |
353 | @@ -689,7 +723,9 @@ class TestClusterClientService(MAASTestCase): |
354 | |
355 | self.assertDocTestMatches( |
356 | """\ |
357 | - Failure making new RPC connection. |
358 | + Making connections to event-loops: an-event-loop |
359 | + --- |
360 | + Failure with event-loop an-event-loop (::ffff:127.0.0.1:1234) |
361 | Traceback (most recent call last): |
362 | ... |
363 | builtins.RuntimeError: Something went wrong. |
364 | @@ -797,6 +833,15 @@ class TestClusterClientService(MAASTestCase): |
365 | connection.transport.loseConnection, |
366 | MockCalledOnceWith()) |
367 | |
368 | + def test__remove_connection_removes_from_try_connections(self): |
369 | + service = make_inert_client_service() |
370 | + service.startService() |
371 | + endpoint = Mock() |
372 | + connection = Mock() |
373 | + service.try_connections[endpoint] = connection |
374 | + service.remove_connection(endpoint, connection) |
375 | + self.assertThat(service.try_connections, Equals({})) |
376 | + |
377 | def test__remove_connection_removes_from_connections(self): |
378 | service = make_inert_client_service() |
379 | service.startService() |
380 | @@ -1063,6 +1108,7 @@ class TestClusterClient(MAASTestCase): |
381 | |
382 | def test_connecting(self): |
383 | client = self.make_running_client() |
384 | + client.service.try_connections[client.eventloop] = client |
385 | self.patch_authenticate_for_success(client) |
386 | self.patch_register_for_success(client) |
387 | self.assertEqual(client.service.connections, {}) |
388 | @@ -1076,6 +1122,7 @@ class TestClusterClient(MAASTestCase): |
389 | self.assertTrue(extract_result(wait_for_authenticated)) |
390 | # ready has been set with the name of the event-loop. |
391 | self.assertEqual(client.eventloop, extract_result(wait_for_ready)) |
392 | + self.assertEqual(client.service.try_connections, {}) |
393 | self.assertEqual( |
394 | client.service.connections, |
395 | {client.eventloop: client}) |
codewise looks good, just have a couple of suggestions inline. Are those possible to do?