Merge ~cgrabowski/maas:rpc_connection_pool_burst into maas:master

Proposed by Christian Grabowski
Status: Merged
Approved by: Christian Grabowski
Approved revision: 5f3b0b943fd0ca495036f6b21938da58ddeb6bba
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~cgrabowski/maas:rpc_connection_pool_burst
Merge into: maas:master
Diff against target: 768 lines (+304/-59)
12 files modified
src/provisioningserver/config.py (+17/-0)
src/provisioningserver/plugin.py (+7/-1)
src/provisioningserver/rackdservices/external.py (+3/-2)
src/provisioningserver/rackdservices/http.py (+3/-2)
src/provisioningserver/rackdservices/tests/test_external.py (+8/-5)
src/provisioningserver/rackdservices/tests/test_http.py (+5/-4)
src/provisioningserver/rpc/clusterservice.py (+72/-8)
src/provisioningserver/rpc/common.py (+22/-5)
src/provisioningserver/rpc/exceptions.py (+8/-0)
src/provisioningserver/rpc/testing/__init__.py (+2/-1)
src/provisioningserver/rpc/testing/doubles.py (+18/-0)
src/provisioningserver/rpc/tests/test_clusterservice.py (+139/-31)
Reviewer Review Type Date Requested Status
Adam Collard (community) Needs Information
MAAS Lander Approve
Alexsander de Souza Approve
Review via email: mp+428208@code.launchpad.net

Commit message

allocate additional connections when busy

always open the configured number of idle connections (max_idle_rpc_connections) per endpoint

Description of the change

Allows the pool of RPC connections to each region endpoint to scale up to a configured maximum (max_rpc_connections) when all existing connections are busy.

To post a comment you must log in.
Revision history for this message
Alexsander de Souza (alexsander-souza) wrote :

+1

review: Approve
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/300/consoleText
COMMIT: ba71e215175a711cbe340ccfa250bc1a1093ccb1

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/301/consoleText
COMMIT: 74329f97f1dd1a3d08ea1f795ebf9c51cacf5b41

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/302/consoleText
COMMIT: 820009b533a8fbbc449ff7c88af733ef8107fae8

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/303/consoleText
COMMIT: 47a4dfedbee9d93a57484e53d0557f5a29c32396

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/304/consoleText
COMMIT: 8f3912b1979ab12d569d49c7086d8cdbe86f55ba

review: Needs Fixing
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 5f3b0b943fd0ca495036f6b21938da58ddeb6bba

review: Approve
Revision history for this message
Adam Collard (adam-collard) :
Revision history for this message
Adam Collard (adam-collard) wrote :

How do the idle connections work with the _update_interval method of ClusterClientService?

That method wants to control how often to poll the region controller to establish connectivity, and has logic to compare the number of connected connections with the expected number (which I think will go wrong with idle/bursting connections).

review: Needs Information
Revision history for this message
Adam Collard (adam-collard) wrote :

This broke system tests and could do with a little more love.

Reverting in https://code.launchpad.net/~adam-collard/maas/+git/maas/+merge/428258 to retain green tests and give space for the work

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/src/provisioningserver/config.py b/src/provisioningserver/config.py
index 97b6e68..9511ad6 100644
--- a/src/provisioningserver/config.py
+++ b/src/provisioningserver/config.py
@@ -762,6 +762,23 @@ class ClusterConfiguration(Configuration, metaclass=ClusterConfigurationMeta):
762 ),762 ),
763 )763 )
764764
765 # RPC Connection Pool options
766 max_idle_rpc_connections = ConfigurationOption(
767 "max_idle_rpc_connections",
768 "The nominal number of connections to have per endpoint",
769 Number(min=1, max=1024, if_missing=1),
770 )
771 max_rpc_connections = ConfigurationOption(
772 "max_rpc_connections",
773 "The maximum number of connections to scale to when under load",
774 Number(min=1, max=1024, if_missing=4),
775 )
776 rpc_keepalive = ConfigurationOption(
777 "rpc_keepalive",
778 "The duration in miliseconds to keep added connections alive",
779 Number(min=1, max=5000, if_missing=1000),
780 )
781
765 # TFTP options.782 # TFTP options.
766 tftp_port = ConfigurationOption(783 tftp_port = ConfigurationOption(
767 "tftp_port",784 "tftp_port",
diff --git a/src/provisioningserver/plugin.py b/src/provisioningserver/plugin.py
index e987c73..00ff898 100644
--- a/src/provisioningserver/plugin.py
+++ b/src/provisioningserver/plugin.py
@@ -139,7 +139,13 @@ class ProvisioningServiceMaker:
139 def _makeRPCService(self):139 def _makeRPCService(self):
140 from provisioningserver.rpc.clusterservice import ClusterClientService140 from provisioningserver.rpc.clusterservice import ClusterClientService
141141
142 rpc_service = ClusterClientService(reactor)142 with ClusterConfiguration.open() as config:
143 rpc_service = ClusterClientService(
144 reactor,
145 config.max_idle_rpc_connections,
146 config.max_rpc_connections,
147 config.rpc_keepalive,
148 )
143 rpc_service.setName("rpc")149 rpc_service.setName("rpc")
144 return rpc_service150 return rpc_service
145151
diff --git a/src/provisioningserver/rackdservices/external.py b/src/provisioningserver/rackdservices/external.py
index ccabb74..5b8afe5 100644
--- a/src/provisioningserver/rackdservices/external.py
+++ b/src/provisioningserver/rackdservices/external.py
@@ -68,8 +68,9 @@ class RackOnlyExternalService(metaclass=ABCMeta):
6868
69 # Filter the connects by region.69 # Filter the connects by region.
70 conn_per_region = defaultdict(set)70 conn_per_region = defaultdict(set)
71 for eventloop, connection in connections.items():71 for eventloop, connection_set in connections.items():
72 conn_per_region[eventloop.split(":")[0]].add(connection)72 for connection in connection_set:
73 conn_per_region[eventloop.split(":")[0]].add(connection)
73 for eventloop, connections in conn_per_region.items():74 for eventloop, connections in conn_per_region.items():
74 # Sort the connections so the same IP is always picked per75 # Sort the connections so the same IP is always picked per
75 # region controller. This ensures that the HTTP configuration76 # region controller. This ensures that the HTTP configuration
diff --git a/src/provisioningserver/rackdservices/http.py b/src/provisioningserver/rackdservices/http.py
index 421e35f..bda9d23 100644
--- a/src/provisioningserver/rackdservices/http.py
+++ b/src/provisioningserver/rackdservices/http.py
@@ -101,8 +101,9 @@ class RackHTTPService(TimerService):
101 controller is connected to."""101 controller is connected to."""
102 # Filter the connects by region.102 # Filter the connects by region.
103 conn_per_region = defaultdict(set)103 conn_per_region = defaultdict(set)
104 for eventloop, connection in self._rpc_service.connections.items():104 for eventloop, connection_set in self._rpc_service.connections.items():
105 conn_per_region[eventloop.split(":")[0]].add(connection)105 for connection in connection_set:
106 conn_per_region[eventloop.split(":")[0]].add(connection)
106 for _, connections in conn_per_region.items():107 for _, connections in conn_per_region.items():
107 # Sort the connections so the same IP is always picked per108 # Sort the connections so the same IP is always picked per
108 # region controller. This ensures that the HTTP configuration109 # region controller. This ensures that the HTTP configuration
diff --git a/src/provisioningserver/rackdservices/tests/test_external.py b/src/provisioningserver/rackdservices/tests/test_external.py
index ad214a1..0cb8601 100644
--- a/src/provisioningserver/rackdservices/tests/test_external.py
+++ b/src/provisioningserver/rackdservices/tests/test_external.py
@@ -430,7 +430,8 @@ class TestRackDNS(MAASTestCase):
430 return frozenset(430 return frozenset(
431 {431 {
432 client.address[0]432 client.address[0]
433 for _, client in rpc_service.connections.items()433 for _, clients in rpc_service.connections.items()
434 for client in clients
434 }435 }
435 )436 )
436437
@@ -609,7 +610,7 @@ class TestRackDNS(MAASTestCase):
609 ip = factory.make_ip_address()610 ip = factory.make_ip_address()
610 mock_conn = Mock()611 mock_conn = Mock()
611 mock_conn.address = (ip, random.randint(5240, 5250))612 mock_conn.address = (ip, random.randint(5240, 5250))
612 mock_rpc.connections[eventloop] = mock_conn613 mock_rpc.connections[eventloop] = {mock_conn}
613614
614 dns = external.RackDNS()615 dns = external.RackDNS()
615 region_ips = list(dns._genRegionIps(mock_rpc.connections))616 region_ips = list(dns._genRegionIps(mock_rpc.connections))
@@ -626,7 +627,7 @@ class TestRackDNS(MAASTestCase):
626 ip = factory.make_ip_address()627 ip = factory.make_ip_address()
627 mock_conn = Mock()628 mock_conn = Mock()
628 mock_conn.address = (ip, random.randint(5240, 5250))629 mock_conn.address = (ip, random.randint(5240, 5250))
629 mock_rpc.connections[eventloop] = mock_conn630 mock_rpc.connections[eventloop] = {mock_conn}
630631
631 dns = external.RackDNS()632 dns = external.RackDNS()
632 region_ips = frozenset(dns._genRegionIps(mock_rpc.connections))633 region_ips = frozenset(dns._genRegionIps(mock_rpc.connections))
@@ -659,7 +660,8 @@ class TestRackProxy(MAASTestCase):
659 return frozenset(660 return frozenset(
660 {661 {
661 client.address[0]662 client.address[0]
662 for _, client in rpc_service.connections.items()663 for _, clients in rpc_service.connections.items()
664 for client in clients
663 }665 }
664 )666 )
665667
@@ -824,7 +826,8 @@ class TestRackSyslog(MAASTestCase):
824 return frozenset(826 return frozenset(
825 {827 {
826 (eventloop, client.address[0])828 (eventloop, client.address[0])
827 for eventloop, client in rpc_service.connections.items()829 for eventloop, clients in rpc_service.connections.items()
830 for client in clients
828 }831 }
829 )832 )
830833
diff --git a/src/provisioningserver/rackdservices/tests/test_http.py b/src/provisioningserver/rackdservices/tests/test_http.py
index bc43c66..43cb495 100644
--- a/src/provisioningserver/rackdservices/tests/test_http.py
+++ b/src/provisioningserver/rackdservices/tests/test_http.py
@@ -92,7 +92,8 @@ class TestRackHTTPService(MAASTestCase):
92 return frozenset(92 return frozenset(
93 {93 {
94 client.address[0]94 client.address[0]
95 for _, client in rpc_service.connections.items()95 for _, clients in rpc_service.connections.items()
96 for client in clients
96 }97 }
97 )98 )
9899
@@ -208,7 +209,7 @@ class TestRackHTTPService(MAASTestCase):
208 ip = factory.make_ip_address()209 ip = factory.make_ip_address()
209 mock_conn = Mock()210 mock_conn = Mock()
210 mock_conn.address = (ip, random.randint(5240, 5250))211 mock_conn.address = (ip, random.randint(5240, 5250))
211 mock_rpc.connections[eventloop] = mock_conn212 mock_rpc.connections[eventloop] = {mock_conn}
212213
213 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)214 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)
214 region_ips = list(service._genRegionIps())215 region_ips = list(service._genRegionIps())
@@ -225,7 +226,7 @@ class TestRackHTTPService(MAASTestCase):
225 ip = factory.make_ip_address()226 ip = factory.make_ip_address()
226 mock_conn = Mock()227 mock_conn = Mock()
227 mock_conn.address = (ip, random.randint(5240, 5250))228 mock_conn.address = (ip, random.randint(5240, 5250))
228 mock_rpc.connections[eventloop] = mock_conn229 mock_rpc.connections[eventloop] = {mock_conn}
229230
230 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)231 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)
231 region_ips = frozenset(service._genRegionIps())232 region_ips = frozenset(service._genRegionIps())
@@ -244,7 +245,7 @@ class TestRackHTTPService(MAASTestCase):
244 ip_addresses.add("[%s]" % ip)245 ip_addresses.add("[%s]" % ip)
245 mock_conn = Mock()246 mock_conn = Mock()
246 mock_conn.address = (ip, random.randint(5240, 5250))247 mock_conn.address = (ip, random.randint(5240, 5250))
247 mock_rpc.connections[eventloop] = mock_conn248 mock_rpc.connections[eventloop] = {mock_conn}
248249
249 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)250 service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor)
250 region_ips = set(service._genRegionIps())251 region_ips = set(service._genRegionIps())
diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py
index c92d48a..7b94e49 100644
--- a/src/provisioningserver/rpc/clusterservice.py
+++ b/src/provisioningserver/rpc/clusterservice.py
@@ -999,6 +999,7 @@ class ClusterClient(Cluster):
999 # Events for this protocol's life-cycle.999 # Events for this protocol's life-cycle.
1000 self.authenticated = DeferredValue()1000 self.authenticated = DeferredValue()
1001 self.ready = DeferredValue()1001 self.ready = DeferredValue()
1002 self.in_use = False
1002 self.localIdent = None1003 self.localIdent = None
10031004
1004 @property1005 @property
@@ -1201,7 +1202,7 @@ class ClusterClientService(TimerService):
12011202
1202 time_started = None1203 time_started = None
12031204
1204 def __init__(self, reactor):1205 def __init__(self, reactor, max_idle_conns=1, max_conns=1, keepalive=1000):
1205 super().__init__(self._calculate_interval(None, None), self._tryUpdate)1206 super().__init__(self._calculate_interval(None, None), self._tryUpdate)
1206 self.connections = {}1207 self.connections = {}
1207 self.try_connections = {}1208 self.try_connections = {}
@@ -1224,10 +1225,40 @@ class ClusterClientService(TimerService):
1224 self._updateInProgress = DeferredValue()1225 self._updateInProgress = DeferredValue()
1225 self._updateInProgress.set(None)1226 self._updateInProgress.set(None)
12261227
1228 # The maximum number of connections to allways allocate per eventloop
1229 self._max_idle_connections = max_idle_conns
1230 # The maximum number of connections to allocate while under load per eventloop
1231 self._max_connections = max_conns
1232 # The duration in milliseconds to keep scaled up connections alive
1233 self._keepalive = keepalive
1234
1227 def startService(self):1235 def startService(self):
1228 self.time_started = self.clock.seconds()1236 self.time_started = self.clock.seconds()
1229 super().startService()1237 super().startService()
12301238
1239 def _reap_extra_connection(self, eventloop, conn):
1240 if not conn.in_use:
1241 self._drop_connection(conn)
1242 return self._remove_connection(eventloop, conn)
1243 return self.clock.callLater(
1244 self._keepalive, self._reap_extra_connection, conn
1245 )
1246
1247 @inlineCallbacks
1248 def _scale_up_connections(self):
1249 for ev, ev_conns in self.connections.items():
1250 # pick first group with room for additional conns
1251 if len(ev_conns) < self._max_connections:
1252 # spawn an extra connection
1253 conn_to_clone = random.choice(list(ev_conns))
1254 conn = yield self._make_connection(ev, conn_to_clone.address)
1255 self.connections[ev].add(conn)
1256 self.clock.callLater(
1257 self._keepalive, self._reap_extra_connection, ev, conn
1258 )
1259 return
1260 raise exceptions.MaxConnectionsOpen
1261
1231 def getClient(self):1262 def getClient(self):
1232 """Returns a :class:`common.Client` connected to a region.1263 """Returns a :class:`common.Client` connected to a region.
12331264
@@ -1236,11 +1267,22 @@ class ClusterClientService(TimerService):
1236 :raises: :py:class:`~.exceptions.NoConnectionsAvailable` when1267 :raises: :py:class:`~.exceptions.NoConnectionsAvailable` when
1237 there are no open connections to a region controller.1268 there are no open connections to a region controller.
1238 """1269 """
1239 conns = list(self.connections.values())1270 conns = [
1271 conn for conn_set in self.connections.values() for conn in conn_set
1272 ]
1240 if len(conns) == 0:1273 if len(conns) == 0:
1241 raise exceptions.NoConnectionsAvailable()1274 raise exceptions.NoConnectionsAvailable()
1242 else:1275 else:
1243 return common.Client(random.choice(conns))1276 free_conns = [conn for conn in conns if not conn.in_use]
1277 if len(free_conns) > 0:
1278 return common.Client(random.choice(free_conns))
1279 else:
1280 for endpoint_conns in self.connections.values():
1281 if len(endpoint_conns) < self._max_connections:
1282 # caller should create a new connection
1283 raise exceptions.AllConnectionsBusy
1284 # return a busy connection, assume it will free up or timeout
1285 return common.Client(random.choice(conns))
12441286
1245 @deferred1287 @deferred
1246 def getClientNow(self):1288 def getClientNow(self):
@@ -1259,10 +1301,18 @@ class ClusterClientService(TimerService):
1259 return self.getClient()1301 return self.getClient()
1260 except exceptions.NoConnectionsAvailable:1302 except exceptions.NoConnectionsAvailable:
1261 return self._tryUpdate().addCallback(call, self.getClient)1303 return self._tryUpdate().addCallback(call, self.getClient)
1304 except exceptions.AllConnectionsBusy:
1305 return self._scale_up_connections().addCallback(
1306 call, self.getClient
1307 )
12621308
1263 def getAllClients(self):1309 def getAllClients(self):
1264 """Return a list of all connected :class:`common.Client`s."""1310 """Return a list of all connected :class:`common.Client`s."""
1265 return [common.Client(conn) for conn in self.connections.values()]1311 return [
1312 common.Client(conn)
1313 for conns in self.connections.values()
1314 for conn in conns
1315 ]
12661316
1267 def _tryUpdate(self):1317 def _tryUpdate(self):
1268 """Attempt to refresh outgoing connections.1318 """Attempt to refresh outgoing connections.
@@ -1391,7 +1441,9 @@ class ClusterClientService(TimerService):
1391 """Update the saved RPC info state."""1441 """Update the saved RPC info state."""
1392 # Build a list of addresses based on the current connections.1442 # Build a list of addresses based on the current connections.
1393 connected_addr = {1443 connected_addr = {
1394 conn.address[0] for _, conn in self.connections.items()1444 conn.address[0]
1445 for _, conns in self.connections.items()
1446 for conn in conns
1395 }1447 }
1396 if (1448 if (
1397 self._rpc_info_state is None1449 self._rpc_info_state is None
@@ -1761,6 +1813,7 @@ class ClusterClientService(TimerService):
1761 """Drop the given `connection`."""1813 """Drop the given `connection`."""
1762 return connection.transport.loseConnection()1814 return connection.transport.loseConnection()
17631815
1816 @inlineCallbacks
1764 def add_connection(self, eventloop, connection):1817 def add_connection(self, eventloop, connection):
1765 """Add the connection to the tracked connections.1818 """Add the connection to the tracked connections.
17661819
@@ -1769,7 +1822,16 @@ class ClusterClientService(TimerService):
1769 """1822 """
1770 if eventloop in self.try_connections:1823 if eventloop in self.try_connections:
1771 del self.try_connections[eventloop]1824 del self.try_connections[eventloop]
1772 self.connections[eventloop] = connection1825 if not self.connections.get(eventloop):
1826 self.connections[eventloop] = set()
1827 self.connections[eventloop].add(connection)
1828 # clone connection to equal num idle connections
1829 if self._max_idle_connections - 1 > 0:
1830 for _ in range(self._max_idle_connections - 1):
1831 extra_conn = yield self._make_connection(
1832 connection.eventloop, connection.address
1833 )
1834 self.connections[eventloop].add(extra_conn)
1773 self._update_saved_rpc_info_state()1835 self._update_saved_rpc_info_state()
17741836
1775 def remove_connection(self, eventloop, connection):1837 def remove_connection(self, eventloop, connection):
@@ -1782,8 +1844,10 @@ class ClusterClientService(TimerService):
1782 if self.try_connections[eventloop] is connection:1844 if self.try_connections[eventloop] is connection:
1783 del self.try_connections[eventloop]1845 del self.try_connections[eventloop]
1784 if eventloop in self.connections:1846 if eventloop in self.connections:
1785 if self.connections[eventloop] is connection:1847 if connection in self.connections.get(eventloop, set()):
1786 del self.connections[eventloop]1848 self.connections[eventloop].discard(connection)
1849 if len(self.connections[eventloop]) == 0:
1850 del self.connections[eventloop]
1787 # Disable DHCP when no connections to a region controller.1851 # Disable DHCP when no connections to a region controller.
1788 if len(self.connections) == 0:1852 if len(self.connections) == 0:
1789 stopping_services = []1853 stopping_services = []
diff --git a/src/provisioningserver/rpc/common.py b/src/provisioningserver/rpc/common.py
index 5d67bba..40e091f 100644
--- a/src/provisioningserver/rpc/common.py
+++ b/src/provisioningserver/rpc/common.py
@@ -14,7 +14,11 @@ from twisted.python.failure import Failure
14from provisioningserver.logger import LegacyLogger14from provisioningserver.logger import LegacyLogger
15from provisioningserver.prometheus.metrics import PROMETHEUS_METRICS15from provisioningserver.prometheus.metrics import PROMETHEUS_METRICS
16from provisioningserver.rpc.interfaces import IConnection, IConnectionToRegion16from provisioningserver.rpc.interfaces import IConnection, IConnectionToRegion
17from provisioningserver.utils.twisted import asynchronous, deferWithTimeout17from provisioningserver.utils.twisted import (
18 asynchronous,
19 callOut,
20 deferWithTimeout,
21)
1822
19log = LegacyLogger()23log = LegacyLogger()
2024
@@ -156,6 +160,11 @@ class Client:
156 :return: A deferred result. Call its `wait` method (with a timeout160 :return: A deferred result. Call its `wait` method (with a timeout
157 in seconds) to block on the call's completion.161 in seconds) to block on the call's completion.
158 """162 """
163 self._conn.in_use = True
164
165 def _free_conn():
166 self._conn.in_use = False
167
159 if len(args) != 0:168 if len(args) != 0:
160 receiver_name = "{}.{}".format(169 receiver_name = "{}.{}".format(
161 self.__module__,170 self.__module__,
@@ -171,11 +180,19 @@ class Client:
171 if timeout is undefined:180 if timeout is undefined:
172 timeout = 120 # 2 minutes181 timeout = 120 # 2 minutes
173 if timeout is None or timeout <= 0:182 if timeout is None or timeout <= 0:
174 return self._conn.callRemote(cmd, **kwargs)183 d = self._conn.callRemote(cmd, **kwargs)
184 if isinstance(d, Deferred):
185 d.addBoth(lambda x: callOut(x, _free_conn))
186 else:
187 _free_conn()
188 return d
175 else:189 else:
176 return deferWithTimeout(190 d = deferWithTimeout(timeout, self._conn.callRemote, cmd, **kwargs)
177 timeout, self._conn.callRemote, cmd, **kwargs191 if isinstance(d, Deferred):
178 )192 d.addBoth(lambda x: callOut(x, _free_conn))
193 else:
194 _free_conn()
195 return d
179196
180 @asynchronous197 @asynchronous
181 def getHostCertificate(self):198 def getHostCertificate(self):
diff --git a/src/provisioningserver/rpc/exceptions.py b/src/provisioningserver/rpc/exceptions.py
index 7ee4f3f..136e471 100644
--- a/src/provisioningserver/rpc/exceptions.py
+++ b/src/provisioningserver/rpc/exceptions.py
@@ -12,6 +12,14 @@ class NoConnectionsAvailable(Exception):
12 self.uuid = uuid12 self.uuid = uuid
1313
1414
15class AllConnectionsBusy(Exception):
16 """The current connection pool is busy"""
17
18
19class MaxConnectionsOpen(Exception):
20 """The maxmimum number of connections are currently open"""
21
22
15class NoSuchEventType(Exception):23class NoSuchEventType(Exception):
16 """The specified event type was not found."""24 """The specified event type was not found."""
1725
diff --git a/src/provisioningserver/rpc/testing/__init__.py b/src/provisioningserver/rpc/testing/__init__.py
index ee4a9e2..1b2f94f 100644
--- a/src/provisioningserver/rpc/testing/__init__.py
+++ b/src/provisioningserver/rpc/testing/__init__.py
@@ -262,7 +262,8 @@ class MockClusterToRegionRPCFixtureBase(fixtures.Fixture, metaclass=ABCMeta):
262 {262 {
263 "eventloops": {263 "eventloops": {
264 eventloop: [client.address]264 eventloop: [client.address]
265 for eventloop, client in connections265 for eventloop, clients in connections
266 for client in clients
266 }267 }
267 },268 },
268 orig_url,269 orig_url,
diff --git a/src/provisioningserver/rpc/testing/doubles.py b/src/provisioningserver/rpc/testing/doubles.py
index cb9f27f..0785859 100644
--- a/src/provisioningserver/rpc/testing/doubles.py
+++ b/src/provisioningserver/rpc/testing/doubles.py
@@ -30,6 +30,7 @@ class FakeConnection:
30 ident = attr.ib(default=sentinel.ident)30 ident = attr.ib(default=sentinel.ident)
31 hostCertificate = attr.ib(default=sentinel.hostCertificate)31 hostCertificate = attr.ib(default=sentinel.hostCertificate)
32 peerCertificate = attr.ib(default=sentinel.peerCertificate)32 peerCertificate = attr.ib(default=sentinel.peerCertificate)
33 in_use = attr.ib(default=False)
3334
34 def callRemote(self, cmd, **arguments):35 def callRemote(self, cmd, **arguments):
35 return succeed(sentinel.response)36 return succeed(sentinel.response)
@@ -48,6 +49,7 @@ class FakeConnectionToRegion:
48 address = attr.ib(default=(sentinel.host, sentinel.port))49 address = attr.ib(default=(sentinel.host, sentinel.port))
49 hostCertificate = attr.ib(default=sentinel.hostCertificate)50 hostCertificate = attr.ib(default=sentinel.hostCertificate)
50 peerCertificate = attr.ib(default=sentinel.peerCertificate)51 peerCertificate = attr.ib(default=sentinel.peerCertificate)
52 in_use = attr.ib(default=False)
5153
52 def callRemote(self, cmd, **arguments):54 def callRemote(self, cmd, **arguments):
53 return succeed(sentinel.response)55 return succeed(sentinel.response)
@@ -56,6 +58,22 @@ class FakeConnectionToRegion:
56verifyObject(IConnectionToRegion, FakeConnectionToRegion())58verifyObject(IConnectionToRegion, FakeConnectionToRegion())
5759
5860
61@attr.s(eq=False, order=False)
62@implementer(IConnectionToRegion)
63class FakeBusyConnectionToRegion:
64 "A fake `IConnectionToRegion` that appears busy." ""
65
66 ident = attr.ib(default=sentinel.ident)
67 localIdent = attr.ib(default=sentinel.localIdent)
68 address = attr.ib(default=(sentinel.host, sentinel.port))
69 hostCertificate = attr.ib(default=sentinel.hostCertificate)
70 peerCertificate = attr.ib(default=sentinel.peerCertificate)
71 in_use = attr.ib(default=True)
72
73 def callRemote(self, cmd, **arguments):
74 return succeed(sentinel.response)
75
76
59class StubOS(OperatingSystem):77class StubOS(OperatingSystem):
60 """An :py:class:`OperatingSystem` subclass that has canned answers.78 """An :py:class:`OperatingSystem` subclass that has canned answers.
6179
diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py
index b50311d..fab455a 100644
--- a/src/provisioningserver/rpc/tests/test_clusterservice.py
+++ b/src/provisioningserver/rpc/tests/test_clusterservice.py
@@ -117,7 +117,11 @@ from provisioningserver.rpc.testing import (
117 call_responder,117 call_responder,
118 MockLiveClusterToRegionRPCFixture,118 MockLiveClusterToRegionRPCFixture,
119)119)
120from provisioningserver.rpc.testing.doubles import DummyConnection, StubOS120from provisioningserver.rpc.testing.doubles import (
121 FakeBusyConnectionToRegion,
122 FakeConnection,
123 StubOS,
124)
121from provisioningserver.security import set_shared_secret_on_filesystem125from provisioningserver.security import set_shared_secret_on_filesystem
122from provisioningserver.service_monitor import service_monitor126from provisioningserver.service_monitor import service_monitor
123from provisioningserver.testing.config import ClusterConfigurationFixture127from provisioningserver.testing.config import ClusterConfigurationFixture
@@ -444,8 +448,10 @@ class TestClusterProtocol_DescribePowerTypes(MAASTestCase):
444 )448 )
445449
446450
447def make_inert_client_service():451def make_inert_client_service(max_idle_conns=1, max_conns=1, keepalive=1):
448 service = ClusterClientService(Clock())452 service = ClusterClientService(
453 Clock(), max_idle_conns, max_conns, keepalive
454 )
449 # ClusterClientService's superclass, TimerService, creates a455 # ClusterClientService's superclass, TimerService, creates a
450 # LoopingCall with now=True. We neuter it here to allow456 # LoopingCall with now=True. We neuter it here to allow
451 # observation of the behaviour of _update_interval() for457 # observation of the behaviour of _update_interval() for
@@ -499,10 +505,10 @@ class TestClusterClientService(MAASTestCase):
499505
500 # Fake some connections.506 # Fake some connections.
501 service.connections = {507 service.connections = {
502 ipv4client.eventloop: ipv4client,508 ipv4client.eventloop: {ipv4client},
503 ipv6client.eventloop: ipv6client,509 ipv6client.eventloop: {ipv6client},
504 ipv6mapped.eventloop: ipv6mapped,510 ipv6mapped.eventloop: {ipv6mapped},
505 hostclient.eventloop: hostclient,511 hostclient.eventloop: {hostclient},
506 }512 }
507513
508 # Update the RPC state to the filesystem and info cache.514 # Update the RPC state to the filesystem and info cache.
@@ -515,7 +521,8 @@ class TestClusterClientService(MAASTestCase):
515 Equals(521 Equals(
516 {522 {
517 client.address[0]523 client.address[0]
518 for _, client in service.connections.items()524 for _, clients in service.connections.items()
525 for client in clients
519 }526 }
520 ),527 ),
521 )528 )
@@ -1234,7 +1241,7 @@ class TestClusterClientService(MAASTestCase):
1234 connection = Mock()1241 connection = Mock()
1235 connection.address = (":::ffff", 2222)1242 connection.address = (":::ffff", 2222)
1236 service.add_connection(endpoint, connection)1243 service.add_connection(endpoint, connection)
1237 self.assertThat(service.connections, Equals({endpoint: connection}))1244 self.assertEqual(service.connections, {endpoint: {connection}})
12381245
1239 def test_add_connection_calls__update_saved_rpc_info_state(self):1246 def test_add_connection_calls__update_saved_rpc_info_state(self):
1240 service = make_inert_client_service()1247 service = make_inert_client_service()
@@ -1248,6 +1255,30 @@ class TestClusterClientService(MAASTestCase):
1248 service._update_saved_rpc_info_state, MockCalledOnceWith()1255 service._update_saved_rpc_info_state, MockCalledOnceWith()
1249 )1256 )
12501257
1258 def test_add_connection_creates_max_idle_connections(self):
1259 service = make_inert_client_service(max_idle_conns=2)
1260 service.startService()
1261 endpoint = Mock()
1262 connection = Mock()
1263 connection.address = (":::ffff", 2222)
1264 connection2 = Mock()
1265 connection.address = (":::ffff", 2222)
1266 self.patch(service, "_make_connection").return_value = succeed(
1267 connection2
1268 )
1269 self.patch_autospec(service, "_update_saved_rpc_info_state")
1270 service.add_connection(endpoint, connection)
1271 self.assertEqual(
1272 len(
1273 [
1274 conn
1275 for conns in service.connections.values()
1276 for conn in conns
1277 ]
1278 ),
1279 service._max_idle_connections,
1280 )
1281
1251 def test_remove_connection_removes_from_try_connections(self):1282 def test_remove_connection_removes_from_try_connections(self):
1252 service = make_inert_client_service()1283 service = make_inert_client_service()
1253 service.startService()1284 service.startService()
@@ -1262,7 +1293,7 @@ class TestClusterClientService(MAASTestCase):
1262 service.startService()1293 service.startService()
1263 endpoint = Mock()1294 endpoint = Mock()
1264 connection = Mock()1295 connection = Mock()
1265 service.connections[endpoint] = connection1296 service.connections[endpoint] = {connection}
1266 service.remove_connection(endpoint, connection)1297 service.remove_connection(endpoint, connection)
1267 self.assertThat(service.connections, Equals({}))1298 self.assertThat(service.connections, Equals({}))
12681299
@@ -1271,7 +1302,7 @@ class TestClusterClientService(MAASTestCase):
1271 service.startService()1302 service.startService()
1272 endpoint = Mock()1303 endpoint = Mock()
1273 connection = Mock()1304 connection = Mock()
1274 service.connections[endpoint] = connection1305 service.connections[endpoint] = {connection}
1275 service.remove_connection(endpoint, connection)1306 service.remove_connection(endpoint, connection)
1276 self.assertEqual(service.step, service.INTERVAL_LOW)1307 self.assertEqual(service.step, service.INTERVAL_LOW)
12771308
@@ -1280,7 +1311,7 @@ class TestClusterClientService(MAASTestCase):
1280 service.startService()1311 service.startService()
1281 endpoint = Mock()1312 endpoint = Mock()
1282 connection = Mock()1313 connection = Mock()
1283 service.connections[endpoint] = connection1314 service.connections[endpoint] = {connection}
12841315
1285 # Enable both dhcpd and dhcpd6.1316 # Enable both dhcpd and dhcpd6.
1286 service_monitor.getServiceByName("dhcpd").on()1317 service_monitor.getServiceByName("dhcpd").on()
@@ -1295,13 +1326,17 @@ class TestClusterClientService(MAASTestCase):
1295 def test_getClient(self):1326 def test_getClient(self):
1296 service = ClusterClientService(Clock())1327 service = ClusterClientService(Clock())
1297 service.connections = {1328 service.connections = {
1298 sentinel.eventloop01: DummyConnection(),1329 sentinel.eventloop01: {FakeConnection()},
1299 sentinel.eventloop02: DummyConnection(),1330 sentinel.eventloop02: {FakeConnection()},
1300 sentinel.eventloop03: DummyConnection(),1331 sentinel.eventloop03: {FakeConnection()},
1301 }1332 }
1302 self.assertIn(1333 self.assertIn(
1303 service.getClient(),1334 service.getClient(),
1304 {common.Client(conn) for conn in service.connections.values()},1335 {
1336 common.Client(conn)
1337 for conns in service.connections.values()
1338 for conn in conns
1339 },
1305 )1340 )
13061341
1307 def test_getClient_when_there_are_no_connections(self):1342 def test_getClient_when_there_are_no_connections(self):
@@ -1310,17 +1345,65 @@ class TestClusterClientService(MAASTestCase):
1310 self.assertRaises(exceptions.NoConnectionsAvailable, service.getClient)1345 self.assertRaises(exceptions.NoConnectionsAvailable, service.getClient)
13111346
1312 @inlineCallbacks1347 @inlineCallbacks
def test_getClientNow_scales_connections_when_busy(self):
    """A busy pool below `max_conns` yields a freshly made connection.

    Each of the three event loops holds only a `FakeBusyConnectionToRegion`.
    `_make_connection` is patched to yield a `FakeConnection`. The client
    returned by `getClientNow` must wrap a connection that was not among the
    original ones, and that connection must now be tracked by the service.
    """
    service = ClusterClientService(Clock(), max_conns=2)
    eventloops = (
        sentinel.eventloop01,
        sentinel.eventloop02,
        sentinel.eventloop03,
    )
    service.connections = {
        eventloop: {FakeBusyConnectionToRegion()} for eventloop in eventloops
    }
    made = succeed(FakeConnection())
    self.patch(service, "_make_connection").return_value = made

    def flatten():
        # Every connection currently tracked, across all event loops.
        return [
            conn for conns in service.connections.values() for conn in conns
        ]

    before = flatten()
    client = yield service.getClientNow()
    fresh = client._conn
    self.assertIsNotNone(fresh)
    self.assertNotIn(fresh, before)
    self.assertIn(fresh, flatten())
1369
@inlineCallbacks
def test_getClientNow_returns_an_existing_connection_when_max_are_open(
    self,
):
    """With `max_conns=1`, `getClientNow` reuses an existing connection.

    All three event loops hold a `FakeBusyConnectionToRegion`, and
    `_make_connection` is patched to yield a `FakeConnection`. The returned
    client must still wrap one of the connections that were already present.
    """
    service = ClusterClientService(Clock(), max_conns=1)
    service.connections = {
        eventloop: {FakeBusyConnectionToRegion()}
        for eventloop in (
            sentinel.eventloop01,
            sentinel.eventloop02,
            sentinel.eventloop03,
        )
    }
    spare = succeed(FakeConnection())
    self.patch(service, "_make_connection").return_value = spare
    existing = []
    for conns in service.connections.values():
        existing.extend(conns)
    client = yield service.getClientNow()
    chosen = client._conn
    self.assertIsNotNone(chosen)
    self.assertIn(chosen, existing)
1390
1391 @inlineCallbacks
1313 def test_getClientNow_returns_current_connection(self):1392 def test_getClientNow_returns_current_connection(self):
1314 service = ClusterClientService(Clock())1393 service = ClusterClientService(Clock())
1315 service.connections = {1394 service.connections = {
1316 sentinel.eventloop01: DummyConnection(),1395 sentinel.eventloop01: {FakeConnection()},
1317 sentinel.eventloop02: DummyConnection(),1396 sentinel.eventloop02: {FakeConnection()},
1318 sentinel.eventloop03: DummyConnection(),1397 sentinel.eventloop03: {FakeConnection()},
1319 }1398 }
1320 client = yield service.getClientNow()1399 client = yield service.getClientNow()
1321 self.assertIn(1400 self.assertIn(
1322 client,1401 client,
1323 {common.Client(conn) for conn in service.connections.values()},1402 {
1403 common.Client(conn)
1404 for conns in service.connections.values()
1405 for conn in conns
1406 },
1324 )1407 )
13251408
1326 @inlineCallbacks1409 @inlineCallbacks
@@ -1330,9 +1413,9 @@ class TestClusterClientService(MAASTestCase):
13301413
1331 def addConnections():1414 def addConnections():
1332 service.connections = {1415 service.connections = {
1333 sentinel.eventloop01: DummyConnection(),1416 sentinel.eventloop01: {FakeConnection()},
1334 sentinel.eventloop02: DummyConnection(),1417 sentinel.eventloop02: {FakeConnection()},
1335 sentinel.eventloop03: DummyConnection(),1418 sentinel.eventloop03: {FakeConnection()},
1336 }1419 }
1337 return succeed(None)1420 return succeed(None)
13381421
@@ -1340,7 +1423,11 @@ class TestClusterClientService(MAASTestCase):
1340 client = yield service.getClientNow()1423 client = yield service.getClientNow()
1341 self.assertIn(1424 self.assertIn(
1342 client,1425 client,
1343 {common.Client(conn) for conn in service.connections.values()},1426 {
1427 common.Client(conn)
1428 for conns in service.connections.values()
1429 for conn in conns
1430 },
1344 )1431 )
13451432
1346 def test_getClientNow_raises_exception_when_no_clients(self):1433 def test_getClientNow_raises_exception_when_no_clients(self):
@@ -1383,11 +1470,11 @@ class TestClusterClientService(MAASTestCase):
1383 def test_getAllClients(self):1470 def test_getAllClients(self):
1384 service = ClusterClientService(Clock())1471 service = ClusterClientService(Clock())
1385 uuid1 = factory.make_UUID()1472 uuid1 = factory.make_UUID()
1386 c1 = DummyConnection()1473 c1 = FakeConnection()
1387 service.connections[uuid1] = c11474 service.connections[uuid1] = {c1}
1388 uuid2 = factory.make_UUID()1475 uuid2 = factory.make_UUID()
1389 c2 = DummyConnection()1476 c2 = FakeConnection()
1390 service.connections[uuid2] = c21477 service.connections[uuid2] = {c2}
1391 clients = service.getAllClients()1478 clients = service.getAllClients()
1392 self.assertEqual(clients, [common.Client(c1), common.Client(c2)])1479 self.assertEqual(clients, [common.Client(c1), common.Client(c2)])
13931480
@@ -1396,6 +1483,26 @@ class TestClusterClientService(MAASTestCase):
1396 service.connections = {}1483 service.connections = {}
1397 self.assertThat(service.getAllClients(), Equals([]))1484 self.assertThat(service.getAllClients(), Equals([]))
13981485
@inlineCallbacks
def test__reap_extra_connection_reaps_a_scaled_up_connection(self):
    """A burst connection gets exactly one delayed reap call scheduled.

    The service has `max_conns=2` and `keepalive=0`, and every existing
    connection is a `FakeBusyConnectionToRegion`. After `getClientNow`,
    the clock must hold exactly one delayed call. Its function must carry
    the name of the patched `_reap_extra_connection`, and its arguments
    must include the newly obtained connection.
    """
    clock = Clock()
    service = ClusterClientService(clock, max_conns=2, keepalive=0)
    busy = {
        sentinel.eventloop01: {FakeBusyConnectionToRegion()},
        sentinel.eventloop02: {FakeBusyConnectionToRegion()},
        sentinel.eventloop03: {FakeBusyConnectionToRegion()},
    }
    service.connections = busy
    spare = succeed(FakeConnection())
    self.patch(service, "_make_connection").return_value = spare
    reaper = self.patch(service, "_reap_extra_connection")
    client = yield service.getClientNow()
    pending = clock.getDelayedCalls()
    self.assertEqual(len(pending), 1)
    scheduled = pending[0]
    self.assertIn(client._conn, scheduled.args)
    self.assertEqual(reaper.__name__, scheduled.func.__name__)
1505
13991506
1400class TestClusterClientServiceIntervals(MAASTestCase):1507class TestClusterClientServiceIntervals(MAASTestCase):
14011508
@@ -1562,14 +1669,14 @@ class TestClusterClient(MAASTestCase):
1562 self.assertEqual(client.eventloop, extract_result(wait_for_ready))1669 self.assertEqual(client.eventloop, extract_result(wait_for_ready))
1563 self.assertEqual(client.service.try_connections, {})1670 self.assertEqual(client.service.try_connections, {})
1564 self.assertEqual(1671 self.assertEqual(
1565 client.service.connections, {client.eventloop: client}1672 client.service.connections, {client.eventloop: {client}}
1566 )1673 )
15671674
1568 def test_disconnects_when_there_is_an_existing_connection(self):1675 def test_disconnects_when_there_is_an_existing_connection(self):
1569 client = self.make_running_client()1676 client = self.make_running_client()
15701677
1571 # Pretend that a connection already exists for this address.1678 # Pretend that a connection already exists for this address.
1572 client.service.connections[client.eventloop] = sentinel.connection1679 client.service.connections[client.eventloop] = {sentinel.connection}
15731680
1574 # Connect via an in-memory transport.1681 # Connect via an in-memory transport.
1575 transport = StringTransportWithDisconnection()1682 transport = StringTransportWithDisconnection()
@@ -1586,7 +1693,8 @@ class TestClusterClient(MAASTestCase):
1586 # The connections list is unchanged because the new connection1693 # The connections list is unchanged because the new connection
1587 # immediately disconnects.1694 # immediately disconnects.
1588 self.assertEqual(1695 self.assertEqual(
1589 client.service.connections, {client.eventloop: sentinel.connection}1696 client.service.connections,
1697 {client.eventloop: {sentinel.connection}},
1590 )1698 )
1591 self.assertFalse(client.connected)1699 self.assertFalse(client.connected)
1592 self.assertIsNone(client.transport)1700 self.assertIsNone(client.transport)

Subscribers

People subscribed via source and target branches