Merge ~cgrabowski/maas:rpc_connection_pool_burst into maas:master

Proposed by Christian Grabowski
Status: Merged
Approved by: Christian Grabowski
Approved revision: 5f3b0b943fd0ca495036f6b21938da58ddeb6bba
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~cgrabowski/maas:rpc_connection_pool_burst
Merge into: maas:master
Diff against target: 768 lines (+304/-59)
12 files modified
src/provisioningserver/config.py (+17/-0)
src/provisioningserver/plugin.py (+7/-1)
src/provisioningserver/rackdservices/external.py (+3/-2)
src/provisioningserver/rackdservices/http.py (+3/-2)
src/provisioningserver/rackdservices/tests/test_external.py (+8/-5)
src/provisioningserver/rackdservices/tests/test_http.py (+5/-4)
src/provisioningserver/rpc/clusterservice.py (+72/-8)
src/provisioningserver/rpc/common.py (+22/-5)
src/provisioningserver/rpc/exceptions.py (+8/-0)
src/provisioningserver/rpc/testing/__init__.py (+2/-1)
src/provisioningserver/rpc/testing/doubles.py (+18/-0)
src/provisioningserver/rpc/tests/test_clusterservice.py (+139/-31)
Reviewer Review Type Date Requested Status
Adam Collard (community) Needs Information
MAAS Lander Approve
Alexsander de Souza Approve
Review via email: mp+428208@code.launchpad.net

Commit message

allocate additional connections when busy

always open the configured number of idle connections (max_idle_rpc_connections) per endpoint

Description of the change

Allows the number of RPC connections per endpoint to scale up to a configured maximum when the existing connections are busy.

To post a comment you must log in.
Revision history for this message
Alexsander de Souza (alexsander-souza) wrote :

+1

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/300/consoleText
COMMIT: ba71e215175a711cbe340ccfa250bc1a1093ccb1

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/301/consoleText
COMMIT: 74329f97f1dd1a3d08ea1f795ebf9c51cacf5b41

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/302/consoleText
COMMIT: 820009b533a8fbbc449ff7c88af733ef8107fae8

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/303/consoleText
COMMIT: 47a4dfedbee9d93a57484e53d0557f5a29c32396

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/304/consoleText
COMMIT: 8f3912b1979ab12d569d49c7086d8cdbe86f55ba

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 5f3b0b943fd0ca495036f6b21938da58ddeb6bba

review: Approve
Revision history for this message
Adam Collard (adam-collard) :
Revision history for this message
Adam Collard (adam-collard) wrote :

How do the idle connections work with the _update_interval method of ClusterClientService?

That method wants to control how often to poll the region controller to establish connectivity, and has logic to compare the number of connected connections with the expected number (which I think will go wrong with idle/bursting connections).

review: Needs Information
Revision history for this message
Adam Collard (adam-collard) wrote :

This broke system tests and could do with a little more love.

Reverting in https://code.launchpad.net/~adam-collard/maas/+git/maas/+merge/428258 to retain green tests and give space for the work

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/provisioningserver/config.py b/src/provisioningserver/config.py
2index 97b6e68..9511ad6 100644
3--- a/src/provisioningserver/config.py
4+++ b/src/provisioningserver/config.py
5@@ -762,6 +762,23 @@ class ClusterConfiguration(Configuration, metaclass=ClusterConfigurationMeta):
6 ),
7 )
8
9+ # RPC Connection Pool options
10+ max_idle_rpc_connections = ConfigurationOption(
11+ "max_idle_rpc_connections",
12+ "The nominal number of connections to have per endpoint",
13+ Number(min=1, max=1024, if_missing=1),
14+ )
15+ max_rpc_connections = ConfigurationOption(
16+ "max_rpc_connections",
17+ "The maximum number of connections to scale to when under load",
18+ Number(min=1, max=1024, if_missing=4),
19+ )
20+ rpc_keepalive = ConfigurationOption(
21+ "rpc_keepalive",
22+ "The duration in miliseconds to keep added connections alive",
23+ Number(min=1, max=5000, if_missing=1000),
24+ )
25+
26 # TFTP options.
27 tftp_port = ConfigurationOption(
28 "tftp_port",
29diff --git a/src/provisioningserver/plugin.py b/src/provisioningserver/plugin.py
30index e987c73..00ff898 100644
31--- a/src/provisioningserver/plugin.py
32+++ b/src/provisioningserver/plugin.py
33@@ -139,7 +139,13 @@ class ProvisioningServiceMaker:
34 def _makeRPCService(self):
35 from provisioningserver.rpc.clusterservice import ClusterClientService
36
37- rpc_service = ClusterClientService(reactor)
38+ with ClusterConfiguration.open() as config:
39+ rpc_service = ClusterClientService(
40+ reactor,
41+ config.max_idle_rpc_connections,
42+ config.max_rpc_connections,
43+ config.rpc_keepalive,
44+ )
45 rpc_service.setName("rpc")
46 return rpc_service
47
48diff --git a/src/provisioningserver/rackdservices/external.py b/src/provisioningserver/rackdservices/external.py
49index ccabb74..5b8afe5 100644
50--- a/src/provisioningserver/rackdservices/external.py
51+++ b/src/provisioningserver/rackdservices/external.py
52@@ -68,8 +68,9 @@ class RackOnlyExternalService(metaclass=ABCMeta):
53
54 # Filter the connects by region.
55 conn_per_region = defaultdict(set)
56- for eventloop, connection in connections.items():
57- conn_per_region[eventloop.split(":")[0]].add(connection)
58+ for eventloop, connection_set in connections.items():
59+ for connection in connection_set:
60+ conn_per_region[eventloop.split(":")[0]].add(connection)
61 for eventloop, connections in conn_per_region.items():
62 # Sort the connections so the same IP is always picked per
63 # region controller. This ensures that the HTTP configuration
64diff --git a/src/provisioningserver/rackdservices/http.py b/src/provisioningserver/rackdservices/http.py
65index 421e35f..bda9d23 100644
66--- a/src/provisioningserver/rackdservices/http.py
67+++ b/src/provisioningserver/rackdservices/http.py
68@@ -101,8 +101,9 @@ class RackHTTPService(TimerService):
69 controller is connected to."""
70 # Filter the connects by region.
71 conn_per_region = defaultdict(set)
72- for eventloop, connection in self._rpc_service.connections.items():
73- conn_per_region[eventloop.split(":")[0]].add(connection)
74+ for eventloop, connection_set in self._rpc_service.connections.items():
75+ for connection in connection_set:
76+ conn_per_region[eventloop.split(":")[0]].add(connection)
77 for _, connections in conn_per_region.items():
78 # Sort the connections so the same IP is always picked per
79 # region controller. This ensures that the HTTP configuration
80diff --git a/src/provisioningserver/rackdservices/tests/test_external.py b/src/provisioningserver/rackdservices/tests/test_external.py
81index ad214a1..0cb8601 100644
82--- a/src/provisioningserver/rackdservices/tests/test_external.py
83+++ b/src/provisioningserver/rackdservices/tests/test_external.py
84@@ -430,7 +430,8 @@ class TestRackDNS(MAASTestCase):
85 return frozenset(
86 {
87 client.address[0]
88- for _, client in rpc_service.connections.items()
89+ for _, clients in rpc_service.connections.items()
90+ for client in clients
91 }
92 )
93
94@@ -609,7 +610,7 @@ class TestRackDNS(MAASTestCase):
95 ip = factory.make_ip_address()
96 mock_conn = Mock()
97 mock_conn.address = (ip, random.randint(5240, 5250))
98- mock_rpc.connections[eventloop] = mock_conn
99+ mock_rpc.connections[eventloop] = {mock_conn}
100
101 dns = external.RackDNS()
102 region_ips = list(dns._genRegionIps(mock_rpc.connections))
103@@ -626,7 +627,7 @@ class TestRackDNS(MAASTestCase):
104 ip = factory.make_ip_address()
105 mock_conn = Mock()
106 mock_conn.address = (ip, random.randint(5240, 5250))
107- mock_rpc.connections[eventloop] = mock_conn
108+ mock_rpc.connections[eventloop] = {mock_conn}
109
110 dns = external.RackDNS()
111 region_ips = frozenset(dns._genRegionIps(mock_rpc.connections))
112@@ -659,7 +660,8 @@ class TestRackProxy(MAASTestCase):
113 return frozenset(
114 {
115 client.address[0]
116- for _, client in rpc_service.connections.items()
117+ for _, clients in rpc_service.connections.items()
118+ for client in clients
119 }
120 )
121
122@@ -824,7 +826,8 @@ class TestRackSyslog(MAASTestCase):
123 return frozenset(
124 {
125 (eventloop, client.address[0])
126- for eventloop, client in rpc_service.connections.items()
127+ for eventloop, clients in rpc_service.connections.items()
128+ for client in clients
129 }
130 )
131
132diff --git a/src/provisioningserver/rackdservices/tests/test_http.py b/src/provisioningserver/rackdservices/tests/test_http.py
133index bc43c66..43cb495 100644
134--- a/src/provisioningserver/rackdservices/tests/test_http.py
135+++ b/src/provisioningserver/rackdservices/tests/test_http.py
136@@ -92,7 +92,8 @@ class TestRackHTTPService(MAASTestCase):
137 return frozenset(
138 {
139 client.address[0]
140- for _, client in rpc_service.connections.items()
141+ for _, clients in rpc_service.connections.items()
142+ for client in clients
143 }
144 )
145
146@@ -208,7 +209,7 @@ class TestRackHTTPService(MAASTestCase):
147 ip = factory.make_ip_address()
148 mock_conn = Mock()
149 mock_conn.address = (ip, random.randint(5240, 5250))
150- mock_rpc.connections[eventloop] = mock_conn
151+ mock_rpc.connections[eventloop] = {mock_conn}
152
153 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)
154 region_ips = list(service._genRegionIps())
155@@ -225,7 +226,7 @@ class TestRackHTTPService(MAASTestCase):
156 ip = factory.make_ip_address()
157 mock_conn = Mock()
158 mock_conn.address = (ip, random.randint(5240, 5250))
159- mock_rpc.connections[eventloop] = mock_conn
160+ mock_rpc.connections[eventloop] = {mock_conn}
161
162 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)
163 region_ips = frozenset(service._genRegionIps())
164@@ -244,7 +245,7 @@ class TestRackHTTPService(MAASTestCase):
165 ip_addresses.add("[%s]" % ip)
166 mock_conn = Mock()
167 mock_conn.address = (ip, random.randint(5240, 5250))
168- mock_rpc.connections[eventloop] = mock_conn
169+ mock_rpc.connections[eventloop] = {mock_conn}
170
171 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)
172 region_ips = set(service._genRegionIps())
173diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
174index c92d48a..7b94e49 100644
175--- a/src/provisioningserver/rpc/clusterservice.py
176+++ b/src/provisioningserver/rpc/clusterservice.py
177@@ -999,6 +999,7 @@ class ClusterClient(Cluster):
178 # Events for this protocol's life-cycle.
179 self.authenticated = DeferredValue()
180 self.ready = DeferredValue()
181+ self.in_use = False
182 self.localIdent = None
183
184 @property
185@@ -1201,7 +1202,7 @@ class ClusterClientService(TimerService):
186
187 time_started = None
188
189- def __init__(self, reactor):
190+ def __init__(self, reactor, max_idle_conns=1, max_conns=1, keepalive=1000):
191 super().__init__(self._calculate_interval(None, None), self._tryUpdate)
192 self.connections = {}
193 self.try_connections = {}
194@@ -1224,10 +1225,40 @@ class ClusterClientService(TimerService):
195 self._updateInProgress = DeferredValue()
196 self._updateInProgress.set(None)
197
198+ # The maximum number of connections to allways allocate per eventloop
199+ self._max_idle_connections = max_idle_conns
200+ # The maximum number of connections to allocate while under load per eventloop
201+ self._max_connections = max_conns
202+ # The duration in milliseconds to keep scaled up connections alive
203+ self._keepalive = keepalive
204+
205 def startService(self):
206 self.time_started = self.clock.seconds()
207 super().startService()
208
209+ def _reap_extra_connection(self, eventloop, conn):
210+ if not conn.in_use:
211+ self._drop_connection(conn)
212+ return self._remove_connection(eventloop, conn)
213+ return self.clock.callLater(
214+ self._keepalive, self._reap_extra_connection, conn
215+ )
216+
217+ @inlineCallbacks
218+ def _scale_up_connections(self):
219+ for ev, ev_conns in self.connections.items():
220+ # pick first group with room for additional conns
221+ if len(ev_conns) < self._max_connections:
222+ # spawn an extra connection
223+ conn_to_clone = random.choice(list(ev_conns))
224+ conn = yield self._make_connection(ev, conn_to_clone.address)
225+ self.connections[ev].add(conn)
226+ self.clock.callLater(
227+ self._keepalive, self._reap_extra_connection, ev, conn
228+ )
229+ return
230+ raise exceptions.MaxConnectionsOpen
231+
232 def getClient(self):
233 """Returns a :class:`common.Client` connected to a region.
234
235@@ -1236,11 +1267,22 @@ class ClusterClientService(TimerService):
236 :raises: :py:class:`~.exceptions.NoConnectionsAvailable` when
237 there are no open connections to a region controller.
238 """
239- conns = list(self.connections.values())
240+ conns = [
241+ conn for conn_set in self.connections.values() for conn in conn_set
242+ ]
243 if len(conns) == 0:
244 raise exceptions.NoConnectionsAvailable()
245 else:
246- return common.Client(random.choice(conns))
247+ free_conns = [conn for conn in conns if not conn.in_use]
248+ if len(free_conns) > 0:
249+ return common.Client(random.choice(free_conns))
250+ else:
251+ for endpoint_conns in self.connections.values():
252+ if len(endpoint_conns) < self._max_connections:
253+ # caller should create a new connection
254+ raise exceptions.AllConnectionsBusy
255+ # return a busy connection, assume it will free up or timeout
256+ return common.Client(random.choice(conns))
257
258 @deferred
259 def getClientNow(self):
260@@ -1259,10 +1301,18 @@ class ClusterClientService(TimerService):
261 return self.getClient()
262 except exceptions.NoConnectionsAvailable:
263 return self._tryUpdate().addCallback(call, self.getClient)
264+ except exceptions.AllConnectionsBusy:
265+ return self._scale_up_connections().addCallback(
266+ call, self.getClient
267+ )
268
269 def getAllClients(self):
270 """Return a list of all connected :class:`common.Client`s."""
271- return [common.Client(conn) for conn in self.connections.values()]
272+ return [
273+ common.Client(conn)
274+ for conns in self.connections.values()
275+ for conn in conns
276+ ]
277
278 def _tryUpdate(self):
279 """Attempt to refresh outgoing connections.
280@@ -1391,7 +1441,9 @@ class ClusterClientService(TimerService):
281 """Update the saved RPC info state."""
282 # Build a list of addresses based on the current connections.
283 connected_addr = {
284- conn.address[0] for _, conn in self.connections.items()
285+ conn.address[0]
286+ for _, conns in self.connections.items()
287+ for conn in conns
288 }
289 if (
290 self._rpc_info_state is None
291@@ -1761,6 +1813,7 @@ class ClusterClientService(TimerService):
292 """Drop the given `connection`."""
293 return connection.transport.loseConnection()
294
295+ @inlineCallbacks
296 def add_connection(self, eventloop, connection):
297 """Add the connection to the tracked connections.
298
299@@ -1769,7 +1822,16 @@ class ClusterClientService(TimerService):
300 """
301 if eventloop in self.try_connections:
302 del self.try_connections[eventloop]
303- self.connections[eventloop] = connection
304+ if not self.connections.get(eventloop):
305+ self.connections[eventloop] = set()
306+ self.connections[eventloop].add(connection)
307+ # clone connection to equal num idle connections
308+ if self._max_idle_connections - 1 > 0:
309+ for _ in range(self._max_idle_connections - 1):
310+ extra_conn = yield self._make_connection(
311+ connection.eventloop, connection.address
312+ )
313+ self.connections[eventloop].add(extra_conn)
314 self._update_saved_rpc_info_state()
315
316 def remove_connection(self, eventloop, connection):
317@@ -1782,8 +1844,10 @@ class ClusterClientService(TimerService):
318 if self.try_connections[eventloop] is connection:
319 del self.try_connections[eventloop]
320 if eventloop in self.connections:
321- if self.connections[eventloop] is connection:
322- del self.connections[eventloop]
323+ if connection in self.connections.get(eventloop, set()):
324+ self.connections[eventloop].discard(connection)
325+ if len(self.connections[eventloop]) == 0:
326+ del self.connections[eventloop]
327 # Disable DHCP when no connections to a region controller.
328 if len(self.connections) == 0:
329 stopping_services = []
330diff --git a/src/provisioningserver/rpc/common.py b/src/provisioningserver/rpc/common.py
331index 5d67bba..40e091f 100644
332--- a/src/provisioningserver/rpc/common.py
333+++ b/src/provisioningserver/rpc/common.py
334@@ -14,7 +14,11 @@ from twisted.python.failure import Failure
335 from provisioningserver.logger import LegacyLogger
336 from provisioningserver.prometheus.metrics import PROMETHEUS_METRICS
337 from provisioningserver.rpc.interfaces import IConnection, IConnectionToRegion
338-from provisioningserver.utils.twisted import asynchronous, deferWithTimeout
339+from provisioningserver.utils.twisted import (
340+ asynchronous,
341+ callOut,
342+ deferWithTimeout,
343+)
344
345 log = LegacyLogger()
346
347@@ -156,6 +160,11 @@ class Client:
348 :return: A deferred result. Call its `wait` method (with a timeout
349 in seconds) to block on the call's completion.
350 """
351+ self._conn.in_use = True
352+
353+ def _free_conn():
354+ self._conn.in_use = False
355+
356 if len(args) != 0:
357 receiver_name = "{}.{}".format(
358 self.__module__,
359@@ -171,11 +180,19 @@ class Client:
360 if timeout is undefined:
361 timeout = 120 # 2 minutes
362 if timeout is None or timeout <= 0:
363- return self._conn.callRemote(cmd, **kwargs)
364+ d = self._conn.callRemote(cmd, **kwargs)
365+ if isinstance(d, Deferred):
366+ d.addBoth(lambda x: callOut(x, _free_conn))
367+ else:
368+ _free_conn()
369+ return d
370 else:
371- return deferWithTimeout(
372- timeout, self._conn.callRemote, cmd, **kwargs
373- )
374+ d = deferWithTimeout(timeout, self._conn.callRemote, cmd, **kwargs)
375+ if isinstance(d, Deferred):
376+ d.addBoth(lambda x: callOut(x, _free_conn))
377+ else:
378+ _free_conn()
379+ return d
380
381 @asynchronous
382 def getHostCertificate(self):
383diff --git a/src/provisioningserver/rpc/exceptions.py b/src/provisioningserver/rpc/exceptions.py
384index 7ee4f3f..136e471 100644
385--- a/src/provisioningserver/rpc/exceptions.py
386+++ b/src/provisioningserver/rpc/exceptions.py
387@@ -12,6 +12,14 @@ class NoConnectionsAvailable(Exception):
388 self.uuid = uuid
389
390
391+class AllConnectionsBusy(Exception):
392+ """The current connection pool is busy"""
393+
394+
395+class MaxConnectionsOpen(Exception):
396+ """The maxmimum number of connections are currently open"""
397+
398+
399 class NoSuchEventType(Exception):
400 """The specified event type was not found."""
401
402diff --git a/src/provisioningserver/rpc/testing/__init__.py b/src/provisioningserver/rpc/testing/__init__.py
403index ee4a9e2..1b2f94f 100644
404--- a/src/provisioningserver/rpc/testing/__init__.py
405+++ b/src/provisioningserver/rpc/testing/__init__.py
406@@ -262,7 +262,8 @@ class MockClusterToRegionRPCFixtureBase(fixtures.Fixture, metaclass=ABCMeta):
407 {
408 "eventloops": {
409 eventloop: [client.address]
410- for eventloop, client in connections
411+ for eventloop, clients in connections
412+ for client in clients
413 }
414 },
415 orig_url,
416diff --git a/src/provisioningserver/rpc/testing/doubles.py b/src/provisioningserver/rpc/testing/doubles.py
417index cb9f27f..0785859 100644
418--- a/src/provisioningserver/rpc/testing/doubles.py
419+++ b/src/provisioningserver/rpc/testing/doubles.py
420@@ -30,6 +30,7 @@ class FakeConnection:
421 ident = attr.ib(default=sentinel.ident)
422 hostCertificate = attr.ib(default=sentinel.hostCertificate)
423 peerCertificate = attr.ib(default=sentinel.peerCertificate)
424+ in_use = attr.ib(default=False)
425
426 def callRemote(self, cmd, **arguments):
427 return succeed(sentinel.response)
428@@ -48,6 +49,7 @@ class FakeConnectionToRegion:
429 address = attr.ib(default=(sentinel.host, sentinel.port))
430 hostCertificate = attr.ib(default=sentinel.hostCertificate)
431 peerCertificate = attr.ib(default=sentinel.peerCertificate)
432+ in_use = attr.ib(default=False)
433
434 def callRemote(self, cmd, **arguments):
435 return succeed(sentinel.response)
436@@ -56,6 +58,22 @@ class FakeConnectionToRegion:
437 verifyObject(IConnectionToRegion, FakeConnectionToRegion())
438
439
440+@attr.s(eq=False, order=False)
441+@implementer(IConnectionToRegion)
442+class FakeBusyConnectionToRegion:
443+ "A fake `IConnectionToRegion` that appears busy." ""
444+
445+ ident = attr.ib(default=sentinel.ident)
446+ localIdent = attr.ib(default=sentinel.localIdent)
447+ address = attr.ib(default=(sentinel.host, sentinel.port))
448+ hostCertificate = attr.ib(default=sentinel.hostCertificate)
449+ peerCertificate = attr.ib(default=sentinel.peerCertificate)
450+ in_use = attr.ib(default=True)
451+
452+ def callRemote(self, cmd, **arguments):
453+ return succeed(sentinel.response)
454+
455+
456 class StubOS(OperatingSystem):
457 """An :py:class:`OperatingSystem` subclass that has canned answers.
458
459diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
460index b50311d..fab455a 100644
461--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
462+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
463@@ -117,7 +117,11 @@ from provisioningserver.rpc.testing import (
464 call_responder,
465 MockLiveClusterToRegionRPCFixture,
466 )
467-from provisioningserver.rpc.testing.doubles import DummyConnection, StubOS
468+from provisioningserver.rpc.testing.doubles import (
469+ FakeBusyConnectionToRegion,
470+ FakeConnection,
471+ StubOS,
472+)
473 from provisioningserver.security import set_shared_secret_on_filesystem
474 from provisioningserver.service_monitor import service_monitor
475 from provisioningserver.testing.config import ClusterConfigurationFixture
476@@ -444,8 +448,10 @@ class TestClusterProtocol_DescribePowerTypes(MAASTestCase):
477 )
478
479
480-def make_inert_client_service():
481- service = ClusterClientService(Clock())
482+def make_inert_client_service(max_idle_conns=1, max_conns=1, keepalive=1):
483+ service = ClusterClientService(
484+ Clock(), max_idle_conns, max_conns, keepalive
485+ )
486 # ClusterClientService's superclass, TimerService, creates a
487 # LoopingCall with now=True. We neuter it here to allow
488 # observation of the behaviour of _update_interval() for
489@@ -499,10 +505,10 @@ class TestClusterClientService(MAASTestCase):
490
491 # Fake some connections.
492 service.connections = {
493- ipv4client.eventloop: ipv4client,
494- ipv6client.eventloop: ipv6client,
495- ipv6mapped.eventloop: ipv6mapped,
496- hostclient.eventloop: hostclient,
497+ ipv4client.eventloop: {ipv4client},
498+ ipv6client.eventloop: {ipv6client},
499+ ipv6mapped.eventloop: {ipv6mapped},
500+ hostclient.eventloop: {hostclient},
501 }
502
503 # Update the RPC state to the filesystem and info cache.
504@@ -515,7 +521,8 @@ class TestClusterClientService(MAASTestCase):
505 Equals(
506 {
507 client.address[0]
508- for _, client in service.connections.items()
509+ for _, clients in service.connections.items()
510+ for client in clients
511 }
512 ),
513 )
514@@ -1234,7 +1241,7 @@ class TestClusterClientService(MAASTestCase):
515 connection = Mock()
516 connection.address = (":::ffff", 2222)
517 service.add_connection(endpoint, connection)
518- self.assertThat(service.connections, Equals({endpoint: connection}))
519+ self.assertEqual(service.connections, {endpoint: {connection}})
520
521 def test_add_connection_calls__update_saved_rpc_info_state(self):
522 service = make_inert_client_service()
523@@ -1248,6 +1255,30 @@ class TestClusterClientService(MAASTestCase):
524 service._update_saved_rpc_info_state, MockCalledOnceWith()
525 )
526
527+ def test_add_connection_creates_max_idle_connections(self):
528+ service = make_inert_client_service(max_idle_conns=2)
529+ service.startService()
530+ endpoint = Mock()
531+ connection = Mock()
532+ connection.address = (":::ffff", 2222)
533+ connection2 = Mock()
534+ connection.address = (":::ffff", 2222)
535+ self.patch(service, "_make_connection").return_value = succeed(
536+ connection2
537+ )
538+ self.patch_autospec(service, "_update_saved_rpc_info_state")
539+ service.add_connection(endpoint, connection)
540+ self.assertEqual(
541+ len(
542+ [
543+ conn
544+ for conns in service.connections.values()
545+ for conn in conns
546+ ]
547+ ),
548+ service._max_idle_connections,
549+ )
550+
551 def test_remove_connection_removes_from_try_connections(self):
552 service = make_inert_client_service()
553 service.startService()
554@@ -1262,7 +1293,7 @@ class TestClusterClientService(MAASTestCase):
555 service.startService()
556 endpoint = Mock()
557 connection = Mock()
558- service.connections[endpoint] = connection
559+ service.connections[endpoint] = {connection}
560 service.remove_connection(endpoint, connection)
561 self.assertThat(service.connections, Equals({}))
562
563@@ -1271,7 +1302,7 @@ class TestClusterClientService(MAASTestCase):
564 service.startService()
565 endpoint = Mock()
566 connection = Mock()
567- service.connections[endpoint] = connection
568+ service.connections[endpoint] = {connection}
569 service.remove_connection(endpoint, connection)
570 self.assertEqual(service.step, service.INTERVAL_LOW)
571
572@@ -1280,7 +1311,7 @@ class TestClusterClientService(MAASTestCase):
573 service.startService()
574 endpoint = Mock()
575 connection = Mock()
576- service.connections[endpoint] = connection
577+ service.connections[endpoint] = {connection}
578
579 # Enable both dhcpd and dhcpd6.
580 service_monitor.getServiceByName("dhcpd").on()
581@@ -1295,13 +1326,17 @@ class TestClusterClientService(MAASTestCase):
582 def test_getClient(self):
583 service = ClusterClientService(Clock())
584 service.connections = {
585- sentinel.eventloop01: DummyConnection(),
586- sentinel.eventloop02: DummyConnection(),
587- sentinel.eventloop03: DummyConnection(),
588+ sentinel.eventloop01: {FakeConnection()},
589+ sentinel.eventloop02: {FakeConnection()},
590+ sentinel.eventloop03: {FakeConnection()},
591 }
592 self.assertIn(
593 service.getClient(),
594- {common.Client(conn) for conn in service.connections.values()},
595+ {
596+ common.Client(conn)
597+ for conns in service.connections.values()
598+ for conn in conns
599+ },
600 )
601
602 def test_getClient_when_there_are_no_connections(self):
603@@ -1310,17 +1345,65 @@ class TestClusterClientService(MAASTestCase):
604 self.assertRaises(exceptions.NoConnectionsAvailable, service.getClient)
605
606 @inlineCallbacks
607+ def test_getClientNow_scales_connections_when_busy(self):
608+ service = ClusterClientService(Clock(), max_conns=2)
609+ service.connections = {
610+ sentinel.eventloop01: {FakeBusyConnectionToRegion()},
611+ sentinel.eventloop02: {FakeBusyConnectionToRegion()},
612+ sentinel.eventloop03: {FakeBusyConnectionToRegion()},
613+ }
614+ self.patch(service, "_make_connection").return_value = succeed(
615+ FakeConnection()
616+ )
617+ original_conns = [
618+ conn for conns in service.connections.values() for conn in conns
619+ ]
620+ new_client = yield service.getClientNow()
621+ new_conn = new_client._conn
622+ self.assertIsNotNone(new_conn)
623+ self.assertNotIn(new_conn, original_conns)
624+ self.assertIn(
625+ new_conn,
626+ [conn for conns in service.connections.values() for conn in conns],
627+ )
628+
629+ @inlineCallbacks
630+ def test_getClientNow_returns_an_existing_connection_when_max_are_open(
631+ self,
632+ ):
633+ service = ClusterClientService(Clock(), max_conns=1)
634+ service.connections = {
635+ sentinel.eventloop01: {FakeBusyConnectionToRegion()},
636+ sentinel.eventloop02: {FakeBusyConnectionToRegion()},
637+ sentinel.eventloop03: {FakeBusyConnectionToRegion()},
638+ }
639+ self.patch(service, "_make_connection").return_value = succeed(
640+ FakeConnection()
641+ )
642+ original_conns = [
643+ conn for conns in service.connections.values() for conn in conns
644+ ]
645+ new_client = yield service.getClientNow()
646+ new_conn = new_client._conn
647+ self.assertIsNotNone(new_conn)
648+ self.assertIn(new_conn, original_conns)
649+
650+ @inlineCallbacks
651 def test_getClientNow_returns_current_connection(self):
652 service = ClusterClientService(Clock())
653 service.connections = {
654- sentinel.eventloop01: DummyConnection(),
655- sentinel.eventloop02: DummyConnection(),
656- sentinel.eventloop03: DummyConnection(),
657+ sentinel.eventloop01: {FakeConnection()},
658+ sentinel.eventloop02: {FakeConnection()},
659+ sentinel.eventloop03: {FakeConnection()},
660 }
661 client = yield service.getClientNow()
662 self.assertIn(
663 client,
664- {common.Client(conn) for conn in service.connections.values()},
665+ {
666+ common.Client(conn)
667+ for conns in service.connections.values()
668+ for conn in conns
669+ },
670 )
671
672 @inlineCallbacks
673@@ -1330,9 +1413,9 @@ class TestClusterClientService(MAASTestCase):
674
675 def addConnections():
676 service.connections = {
677- sentinel.eventloop01: DummyConnection(),
678- sentinel.eventloop02: DummyConnection(),
679- sentinel.eventloop03: DummyConnection(),
680+ sentinel.eventloop01: {FakeConnection()},
681+ sentinel.eventloop02: {FakeConnection()},
682+ sentinel.eventloop03: {FakeConnection()},
683 }
684 return succeed(None)
685
686@@ -1340,7 +1423,11 @@ class TestClusterClientService(MAASTestCase):
687 client = yield service.getClientNow()
688 self.assertIn(
689 client,
690- {common.Client(conn) for conn in service.connections.values()},
691+ {
692+ common.Client(conn)
693+ for conns in service.connections.values()
694+ for conn in conns
695+ },
696 )
697
698 def test_getClientNow_raises_exception_when_no_clients(self):
699@@ -1383,11 +1470,11 @@ class TestClusterClientService(MAASTestCase):
700 def test_getAllClients(self):
701 service = ClusterClientService(Clock())
702 uuid1 = factory.make_UUID()
703- c1 = DummyConnection()
704- service.connections[uuid1] = c1
705+ c1 = FakeConnection()
706+ service.connections[uuid1] = {c1}
707 uuid2 = factory.make_UUID()
708- c2 = DummyConnection()
709- service.connections[uuid2] = c2
710+ c2 = FakeConnection()
711+ service.connections[uuid2] = {c2}
712 clients = service.getAllClients()
713 self.assertEqual(clients, [common.Client(c1), common.Client(c2)])
714
715@@ -1396,6 +1483,26 @@ class TestClusterClientService(MAASTestCase):
716 service.connections = {}
717 self.assertThat(service.getAllClients(), Equals([]))
718
719+ @inlineCallbacks
720+ def test__reap_extra_connection_reaps_a_scaled_up_connection(self):
721+ clock = Clock()
722+ service = ClusterClientService(clock, max_conns=2, keepalive=0)
723+ service.connections = {
724+ sentinel.eventloop01: {FakeBusyConnectionToRegion()},
725+ sentinel.eventloop02: {FakeBusyConnectionToRegion()},
726+ sentinel.eventloop03: {FakeBusyConnectionToRegion()},
727+ }
728+ self.patch(service, "_make_connection").return_value = succeed(
729+ FakeConnection()
730+ )
731+ reap_call = self.patch(service, "_reap_extra_connection")
732+ new_client = yield service.getClientNow()
733+ delayed_calls = clock.getDelayedCalls()
734+ self.assertEqual(len(delayed_calls), 1)
735+ delayed_call = delayed_calls[0]
736+ self.assertIn(new_client._conn, delayed_call.args)
737+ self.assertEqual(reap_call.__name__, delayed_call.func.__name__)
738+
739
740 class TestClusterClientServiceIntervals(MAASTestCase):
741
742@@ -1562,14 +1669,14 @@ class TestClusterClient(MAASTestCase):
743 self.assertEqual(client.eventloop, extract_result(wait_for_ready))
744 self.assertEqual(client.service.try_connections, {})
745 self.assertEqual(
746- client.service.connections, {client.eventloop: client}
747+ client.service.connections, {client.eventloop: {client}}
748 )
749
750 def test_disconnects_when_there_is_an_existing_connection(self):
751 client = self.make_running_client()
752
753 # Pretend that a connection already exists for this address.
754- client.service.connections[client.eventloop] = sentinel.connection
755+ client.service.connections[client.eventloop] = {sentinel.connection}
756
757 # Connect via an in-memory transport.
758 transport = StringTransportWithDisconnection()
759@@ -1586,7 +1693,8 @@ class TestClusterClient(MAASTestCase):
760 # The connections list is unchanged because the new connection
761 # immediately disconnects.
762 self.assertEqual(
763- client.service.connections, {client.eventloop: sentinel.connection}
764+ client.service.connections,
765+ {client.eventloop: {sentinel.connection}},
766 )
767 self.assertFalse(client.connected)
768 self.assertIsNone(client.transport)

Subscribers

People subscribed via source and target branches