Merge ~blake-rouse/maas:fix-rack-rpc-conn into maas:master

Proposed by Blake Rouse
Status: Merged
Approved by: Andres Rodriguez
Approved revision: 51df2e0bae0a3336a6496706429c87c7057a3325
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~blake-rouse/maas:fix-rack-rpc-conn
Merge into: maas:master
Diff against target: 395 lines (+195/-23)
6 files modified
src/maasserver/models/node.py (+1/-1)
src/maasserver/models/tests/test_node.py (+1/-1)
src/maasserver/rpc/regionservice.py (+19/-3)
src/maasserver/rpc/tests/test_regionservice.py (+37/-0)
src/provisioningserver/rpc/clusterservice.py (+89/-17)
src/provisioningserver/rpc/tests/test_clusterservice.py (+48/-1)
Reviewer: Andres Rodriguez (community)
Review status: Approve
Review via email: mp+332814@code.launchpad.net

Commit message

Fixes LP: #1727073 - [2.3] rackd — 12% connected to region controllers.

Fix rackd RPC connections so each event-loop is connected to only once, and connections are made in parallel. Fix regiond RPC output to exclude IP addresses that are duplicated across region controllers. Fix the percentage calculation for rackd connections.
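For context on the percentage fix: the value being formatted evidently holds the fraction of expected region connections that are missing, so the status line must report its complement. A minimal sketch of the corrected arithmetic in Python (the function and variable names are illustrative, not the actual MAAS ones):

    def rackd_status_info(missing_fraction):
        """Format rackd's DEGRADED status line.

        `missing_fraction` is the share of expected region-controller
        connections that are currently absent; the status line reports
        the complement, i.e. the share actually connected.
        """
        return "{:.0%} connected to region controllers.".format(
            1.0 - missing_fraction)

    # With 1 of 8 expected connections missing, the old code printed
    # "12% connected"; the fixed formula prints "88% connected".
    print(rackd_status_info(1 / 8))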

Description of the change

All of the code changes in clusterservice are covered by test_clusterservice. The new try_connections tracking needed coverage, so tests were added for it.

I have tested this across two region+rack controllers, both having a configured virbr0 with the IP 192.168.122.1.
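That virbr0 detail is the crux of the regiond change: libvirt creates the bridge with the same 192.168.122.1 address on every host, so two region controllers can advertise one identical IP, and a rack controller dialing it may reach a different region's event-loop than the one it was told about. The rule added to RegionAdvertising.dump() can be sketched independently of the Django models roughly as follows (a hypothetical helper over plain tuples; the real code walks prefetched querysets instead):

    def advertisable_endpoints(endpoints):
        """Keep only (name, address, port) tuples whose IP address is
        claimed by exactly one region controller; an address shared by
        several regions is ambiguous to a rack controller and dropped.

        `endpoints` is an iterable of (region, name, address, port).
        """
        regions_by_address = {}
        for region, _name, address, _port in endpoints:
            regions_by_address.setdefault(address, set()).add(region)
        return [
            (name, address, port)
            for region, name, address, port in endpoints
            if len(regions_by_address[address]) == 1
        ]

    endpoints = [
        ("r1", "r1:pid=100", "10.0.0.1", 5250),
        ("r1", "r1:pid=100", "192.168.122.1", 5250),  # shared bridge IP
        ("r2", "r2:pid=200", "10.0.0.2", 5250),
        ("r2", "r2:pid=200", "192.168.122.1", 5250),  # shared bridge IP
    ]
    # Only the two unambiguous addresses survive.
    print(advertisable_endpoints(endpoints))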

Andres Rodriguez (andreserl) wrote :

Codewise looks good, I just have a couple of suggestions inline. Are those possible to do?

review: Needs Information
Blake Rouse (blake-rouse) :
Blake Rouse (blake-rouse) wrote :

Okay, it's ready for another review. Logging has been updated.

Andres Rodriguez (andreserl) :
review: Approve
MAAS Lander (maas-lander) wrote :

Preview Diff

diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py
index e6078b5..6699d81 100644
--- a/src/maasserver/models/node.py
+++ b/src/maasserver/models/node.py
@@ -5116,7 +5116,7 @@ class RackController(Controller):
             Service.objects.update_service_for(
                 self, "rackd", SERVICE_STATUS.DEGRADED,
                 "{:.0%} connected to region controllers.".format(
-                    percentage))
+                    1.0 - percentage))
 
     def get_image_sync_status(self, boot_images=None):
         """Return the status of the boot image import process."""
diff --git a/src/maasserver/models/tests/test_node.py b/src/maasserver/models/tests/test_node.py
index c4bcb65..c3e5b09 100644
--- a/src/maasserver/models/tests/test_node.py
+++ b/src/maasserver/models/tests/test_node.py
@@ -9999,7 +9999,7 @@ class TestRackController(MAASTransactionServerTestCase):
             MatchesStructure.byEquality(
                 status=SERVICE_STATUS.DEGRADED, status_info=(
                     "{:.0%} connected to region controllers.".format(
-                        percentage))))
+                        1.0 - percentage))))
 
         fake_images = [
             {
diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py
index 340a1be..5798823 100644
--- a/src/maasserver/rpc/regionservice.py
+++ b/src/maasserver/rpc/regionservice.py
@@ -1255,15 +1255,31 @@ class RegionAdvertising:
         Each tuple corresponds to somewhere an event-loop is listening
         within the whole region. The `name` is the event-loop name.
         """
+        # Each regiond might be running a local bridge that duplicates
+        # the same IP address across region controllers. Each region
+        # controller must output a set of unique IP addresses, to
+        # prevent the rack controller from connecting to a different
+        # region controller than the one it expected to connect to.
+        def _unique_to_region(address, region, regions):
+            for region_obj in regions:
+                if region_obj != region:
+                    for process in region_obj.processes.all():
+                        for endpoint in process.endpoints.all():
+                            if endpoint.address == address:
+                                return False
+            return True
+
         regions = RegionController.objects.all()
         regions = regions.prefetch_related("processes", "processes__endpoints")
         all_endpoints = []
         for region_obj in regions:
             for process in region_obj.processes.all():
                 for endpoint in process.endpoints.all():
-                    all_endpoints.append((
-                        "%s:pid=%d" % (region_obj.hostname, process.pid),
-                        endpoint.address, endpoint.port))
+                    if _unique_to_region(
+                            endpoint.address, region_obj, regions):
+                        all_endpoints.append((
+                            "%s:pid=%d" % (region_obj.hostname, process.pid),
+                            endpoint.address, endpoint.port))
         return all_endpoints
 
     @classmethod
diff --git a/src/maasserver/rpc/tests/test_regionservice.py b/src/maasserver/rpc/tests/test_regionservice.py
index b3caf43..88c1a03 100644
--- a/src/maasserver/rpc/tests/test_regionservice.py
+++ b/src/maasserver/rpc/tests/test_regionservice.py
@@ -33,6 +33,7 @@ from maasserver.models import (
     RackController,
     RegionController,
     RegionControllerProcess,
+    RegionControllerProcessEndpoint,
     RegionRackRPCConnection,
     Service as ServiceModel,
     timestampedmodel,
@@ -1639,6 +1640,42 @@ class TestRegionAdvertising(MAASServerTestCase):
         ]
         self.assertItemsEqual(expected, advertising.dump())
 
+    def test_dump_doesnt_duplicate_ips_across_region_controllers(self):
+        duplicate_address = factory.make_ipv4_address()
+        addresses = {
+            (factory.make_ipv4_address(), factory.pick_port()),
+            (factory.make_ipv4_address(), factory.pick_port()),
+        }
+        advertising = RegionAdvertising.promote()
+        advertising.update(
+            addresses.union({(duplicate_address, factory.pick_port()), }))
+
+        other_addresses = {
+            (factory.make_ipv4_address(), factory.pick_port()),
+            (factory.make_ipv4_address(), factory.pick_port()),
+        }
+        other_region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER)
+        other_process = RegionControllerProcess.objects.create(
+            region=other_region, pid=randint(1, 1000))
+        for address, port in other_addresses:
+            RegionControllerProcessEndpoint.objects.create(
+                process=other_process, address=address, port=port)
+        RegionControllerProcessEndpoint.objects.create(
+            process=other_process,
+            address=duplicate_address,
+            port=factory.pick_port())
+
+        expected = [
+            ("%s:pid=%d" % (gethostname(), os.getpid()), addr, port)
+            for (addr, port) in addresses
+        ] + [
+            ("%s:pid=%d" % (
+                other_region.hostname, other_process.pid),
+                addr, port)
+            for (addr, port) in other_addresses
+        ]
+        self.assertItemsEqual(expected, advertising.dump())
+
     def test__adds_connection_and_removes_connection(self):
         advertising = RegionAdvertising.promote()
         process = advertising.getRegionProcess()
diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
index d3e5413..946f516 100644
--- a/src/provisioningserver/rpc/clusterservice.py
+++ b/src/provisioningserver/rpc/clusterservice.py
@@ -116,6 +116,7 @@ from twisted import web
 from twisted.application.internet import TimerService
 from twisted.internet import reactor
 from twisted.internet.defer import (
+    DeferredList,
     inlineCallbacks,
     maybeDeferred,
     returnValue,
@@ -841,6 +842,8 @@ class ClusterClient(Cluster):
         log.msg("Event-loop '%s' authenticated." % self.ident)
         registered = yield self.registerRackWithRegion()
         if registered:
+            if self.eventloop in self.service.try_connections:
+                del self.service.try_connections[self.eventloop]
             self.service.connections[self.eventloop] = self
             self.ready.set(self.eventloop)
         else:
@@ -992,6 +995,8 @@ class ClusterClientService(TimerService, object):
         super(ClusterClientService, self).__init__(
             self._calculate_interval(None, None), self._tryUpdate)
         self.connections = {}
+        self.try_connections = {}
+        self._previous_work = (None, None)
         self.clock = reactor
 
         # XXX jtv 2014-09-23, bug=1372767: Fix
@@ -1229,6 +1234,48 @@ class ClusterClientService(TimerService, object):
             ]
             for name, addresses in eventloops.items()
         }
+
+        drop, connect = self._calculate_work(eventloops)
+
+        # Log fully connected only once. If that state changes then log
+        # it again. This prevents flooding the log with the same message when
+        # the state of the connections has not changed.
+        prev_work, self._previous_work = self._previous_work, (drop, connect)
+        if len(drop) == 0 and len(connect) == 0:
+            if prev_work != (drop, connect) and len(eventloops) > 0:
+                controllers = {
+                    eventloop.split(':')[0]
+                    for eventloop, _ in eventloops.items()
+                }
+                log.msg(
+                    "Fully connected to all %d event-loops on all %d "
+                    "region controllers (%s)." % (
+                        len(eventloops), len(controllers),
+                        ', '.join(controllers)))
+
+        # Drop all connections at once, as they are no longer required.
+        if len(drop) > 0:
+            log.msg("Dropping connections to event-loops: %s" % (
+                ', '.join(drop.keys())))
+            yield DeferredList([
+                maybeDeferred(self._drop_connection, connection)
+                for eventloop, connections in drop.items()
+                for connection in connections
+            ], consumeErrors=True)
+
+        # Make all the new connections to each endpoint at the same time.
+        if len(connect) > 0:
+            log.msg("Making connections to event-loops: %s" % (
+                ', '.join(connect.keys())))
+            yield DeferredList([
+                self._make_connections(eventloop, addresses)
+                for eventloop, addresses in connect.items()
+            ], consumeErrors=True)
+
+    def _calculate_work(self, eventloops):
+        """Calculate the work that needs to be performed for reconnection."""
+        drop, connect = {}, {}
+
         # Drop connections to event-loops that no longer include one of
         # this cluster's established connections among its advertised
         # endpoints. This is most likely to have happened because of
@@ -1240,23 +1287,20 @@ class ClusterClientService(TimerService, object):
             if eventloop in self.connections:
                 connection = self.connections[eventloop]
                 if connection.address not in addresses:
-                    yield self._drop_connection(connection)
+                    drop[eventloop] = [connection]
+            if eventloop in self.try_connections:
+                connection = self.try_connections[eventloop]
+                if connection.address not in addresses:
+                    drop[eventloop] = [connection]
+
         # Create new connections to event-loops that the cluster does
-        # not yet have a connection to. Try each advertised endpoint
-        # (address) in turn until one of them bites.
+        # not yet have a connection to.
         for eventloop, addresses in eventloops.items():
-            if eventloop not in self.connections:
-                for address in addresses:
-                    try:
-                        yield self._make_connection(eventloop, address)
-                    except ConnectError as error:
-                        host, port = address
-                        log.msg("Event-loop %s (%s:%d): %s" % (
-                            eventloop, host, port, error))
-                    except:
-                        log.err(None, "Failure making new RPC connection.")
-                    else:
-                        break
+            if ((eventloop not in self.connections and
+                    eventloop not in self.try_connections) or
+                    eventloop in drop):
+                connect[eventloop] = addresses
+
         # Remove connections to event-loops that are no longer
         # advertised by the RPC info view. Most likely this means that
         # the process in which the event-loop is no longer running, but
@@ -1265,7 +1309,32 @@ class ClusterClientService(TimerService, object):
         for eventloop in self.connections:
             if eventloop not in eventloops:
                 connection = self.connections[eventloop]
-                yield self._drop_connection(connection)
+                drop[eventloop] = [connection]
+        for eventloop in self.try_connections:
+            if eventloop not in eventloops:
+                connection = self.try_connections[eventloop]
+                drop[eventloop] = [connection]
+
+        return drop, connect
+
+    @inlineCallbacks
+    def _make_connections(self, eventloop, addresses):
+        """Connect to `eventloop` using all `addresses`."""
+        for address in addresses:
+            try:
+                connection = yield self._make_connection(eventloop, address)
+            except ConnectError as error:
+                host, port = address
+                log.msg("Event-loop %s (%s:%d): %s" % (
+                    eventloop, host, port, error))
+            except:
+                host, port = address
+                log.err(None, (
+                    "Failure with event-loop %s (%s:%d)" % (
+                        eventloop, host, port)))
+            else:
+                self.try_connections[eventloop] = connection
+                break
 
     def _make_connection(self, eventloop, address):
         """Connect to `eventloop` at `address`."""
@@ -1284,6 +1353,9 @@ class ClusterClientService(TimerService, object):
         If this is the last connection that was keeping rackd connected to
         a regiond then dhcpd and dhcpd6 services will be turned off.
         """
+        if eventloop in self.try_connections:
+            if self.try_connections[eventloop] is connection:
+                del self.try_connections[eventloop]
         if eventloop in self.connections:
             if self.connections[eventloop] is connection:
                 del self.connections[eventloop]
@@ -1299,7 +1371,7 @@ class ClusterClientService(TimerService, object):
                 dhcp_v6.off()
                 stopping_services.append("dhcpd6")
             if len(stopping_services) > 0:
-                maaslog.error(
+                log.msg(
                     "Lost all connections to region controllers. "
                     "Stopping service(s) %s." % ",".join(stopping_services))
             service_monitor.ensureServices()
diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
index 8553d02..9a7c85d 100644
--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
@@ -636,7 +636,9 @@ class TestClusterClientService(MAASTestCase):
     @inlineCallbacks
     def test__update_connections_initially(self):
         service = ClusterClientService(Clock())
+        mock_client = Mock()
         _make_connection = self.patch(service, "_make_connection")
+        _make_connection.side_effect = lambda *args: succeed(mock_client)
         _drop_connection = self.patch(service, "_drop_connection")
 
         info = json.loads(self.example_rpc_info_view_response.decode("ascii"))
@@ -650,10 +652,40 @@
         self.assertItemsEqual(
             _make_connection_expected,
             _make_connection.call_args_list)
+        self.assertEquals({
+            "host1:pid=1001": mock_client,
+            "host1:pid=2002": mock_client,
+            "host2:pid=3003": mock_client,
+        }, service.try_connections)
 
         self.assertEqual([], _drop_connection.mock_calls)
 
     @inlineCallbacks
+    def test__update_connections_logs_fully_connected(self):
+        service = ClusterClientService(Clock())
+        eventloops = {
+            "region1:123": [("::ffff:127.0.0.1", 1234)],
+            "region1:124": [("::ffff:127.0.0.1", 1235)],
+            "region2:123": [("::ffff:127.0.0.2", 1234)],
+            "region2:124": [("::ffff:127.0.0.2", 1235)],
+        }
+        for eventloop, addresses in eventloops.items():
+            for address in addresses:
+                client = Mock()
+                client.address = address
+                service.connections[eventloop] = client
+
+        logger = self.useFixture(TwistedLoggerFixture())
+
+        yield service._update_connections(eventloops)
+        # Second call should not add it to the log.
+        yield service._update_connections(eventloops)
+
+        self.assertEqual(
+            "Fully connected to all 4 event-loops on all 2 region "
+            "controllers (region2, region1).", logger.dump())
+
+    @inlineCallbacks
     def test__update_connections_connect_error_is_logged_tersely(self):
         service = ClusterClientService(Clock())
         _make_connection = self.patch(service, "_make_connection")
@@ -669,6 +701,8 @@
             MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234)))
 
         self.assertEqual(
+            "Making connections to event-loops: an-event-loop\n"
+            "---\n"
             "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection "
            "was refused by other side.", logger.dump())
 
@@ -689,7 +723,9 @@
 
         self.assertDocTestMatches(
             """\
-            Failure making new RPC connection.
+            Making connections to event-loops: an-event-loop
+            ---
+            Failure with event-loop an-event-loop (::ffff:127.0.0.1:1234)
             Traceback (most recent call last):
             ...
             builtins.RuntimeError: Something went wrong.
@@ -797,6 +833,15 @@
                 connection.transport.loseConnection,
                 MockCalledOnceWith())
 
+    def test__remove_connection_removes_from_try_connections(self):
+        service = make_inert_client_service()
+        service.startService()
+        endpoint = Mock()
+        connection = Mock()
+        service.try_connections[endpoint] = connection
+        service.remove_connection(endpoint, connection)
+        self.assertThat(service.try_connections, Equals({}))
+
     def test__remove_connection_removes_from_connections(self):
         service = make_inert_client_service()
         service.startService()
@@ -1063,6 +1108,7 @@ class TestClusterClient(MAASTestCase):
 
     def test_connecting(self):
         client = self.make_running_client()
+        client.service.try_connections[client.eventloop] = client
         self.patch_authenticate_for_success(client)
         self.patch_register_for_success(client)
         self.assertEqual(client.service.connections, {})
@@ -1076,6 +1122,7 @@
         self.assertTrue(extract_result(wait_for_authenticated))
         # ready has been set with the name of the event-loop.
         self.assertEqual(client.eventloop, extract_result(wait_for_ready))
+        self.assertEqual(client.service.try_connections, {})
         self.assertEqual(
             client.service.connections,
             {client.eventloop: client})
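The connection strategy introduced above (one DeferredList dialing every event-loop concurrently, while the addresses of a single event-loop are still tried in order) reduces to a sketch like the following. The helper names are assumptions, and the real service additionally records half-open attempts in try_connections so they can be dropped when the advertised endpoints change:

    from twisted.internet.defer import (
        DeferredList,
        inlineCallbacks,
        returnValue,
    )

    @inlineCallbacks
    def connect_one_eventloop(eventloop, addresses, make_connection):
        """Try each advertised address for one event-loop in turn and
        keep the first connection that succeeds; the real code logs
        ConnectError tersely instead of silently continuing."""
        for address in addresses:
            try:
                connection = yield make_connection(eventloop, address)
            except Exception:
                continue
            else:
                returnValue(connection)

    def connect_all_eventloops(eventloops, make_connection):
        """Dial every event-loop at the same time. consumeErrors=True
        makes DeferredList wait for all attempts and absorb failures,
        so one unreachable region cannot stall the others."""
        return DeferredList([
            connect_one_eventloop(eventloop, addresses, make_connection)
            for eventloop, addresses in eventloops.items()
        ], consumeErrors=True)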

Subscribers

People subscribed via source and target branches