Merge ~cgrabowski/maas:rpc_connection_pool_burst into maas:master
- Git
- lp:~cgrabowski/maas
- rpc_connection_pool_burst
- Merge into master
Proposed by
Christian Grabowski
Status: | Merged |
---|---|
Approved by: | Christian Grabowski |
Approved revision: | 0c5564670e61e93261925cf91a46850824536a77 |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~cgrabowski/maas:rpc_connection_pool_burst |
Merge into: | maas:master |
Diff against target: |
1518 lines (+741/-175) 15 files modified
src/provisioningserver/config.py (+17/-0) src/provisioningserver/dhcp/tests/test_config.py (+2/-2) src/provisioningserver/plugin.py (+7/-1) src/provisioningserver/rackdservices/external.py (+3/-2) src/provisioningserver/rackdservices/http.py (+3/-2) src/provisioningserver/rackdservices/tests/test_external.py (+8/-5) src/provisioningserver/rackdservices/tests/test_http.py (+5/-4) src/provisioningserver/rpc/clusterservice.py (+56/-50) src/provisioningserver/rpc/common.py (+22/-5) src/provisioningserver/rpc/connectionpool.py (+163/-0) src/provisioningserver/rpc/exceptions.py (+8/-0) src/provisioningserver/rpc/testing/__init__.py (+2/-1) src/provisioningserver/rpc/testing/doubles.py (+18/-0) src/provisioningserver/rpc/tests/test_clusterservice.py (+147/-103) src/provisioningserver/rpc/tests/test_connectionpool.py (+280/-0) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Alexsander de Souza | Approve | ||
MAAS Lander | Approve | ||
Review via email:
|
Commit message
move connection lifecycle logic into ConnectionPool
allocate additional connections when busy
always open max_idle_rpc_connections connections per endpoint
Description of the change
To post a comment you must log in.
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Adam Collard (adam-collard) wrote : | # |
jenkins: !test
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rpc_connection_pool_burst …
STATUS: FAILED
LOG: http://
COMMIT: c19b8616c6acc5c
review:
Needs Fixing
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rpc_connection_pool_burst …
STATUS: SUCCESS
COMMIT: 0c5564670e61e93261925cf91a46850824536a77
review:
Approve
Revision history for this message
![](/+icing/build/overlay/assets/skins/sam/images/close.gif)
Alexsander de Souza (alexsander-souza) wrote : | # |
LGTM
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/src/provisioningserver/config.py b/src/provisioningserver/config.py | |||
2 | index 97b6e68..9511ad6 100644 | |||
3 | --- a/src/provisioningserver/config.py | |||
4 | +++ b/src/provisioningserver/config.py | |||
5 | @@ -762,6 +762,23 @@ class ClusterConfiguration(Configuration, metaclass=ClusterConfigurationMeta): | |||
6 | 762 | ), | 762 | ), |
7 | 763 | ) | 763 | ) |
8 | 764 | 764 | ||
9 | 765 | # RPC Connection Pool options | ||
10 | 766 | max_idle_rpc_connections = ConfigurationOption( | ||
11 | 767 | "max_idle_rpc_connections", | ||
12 | 768 | "The nominal number of connections to have per endpoint", | ||
13 | 769 | Number(min=1, max=1024, if_missing=1), | ||
14 | 770 | ) | ||
15 | 771 | max_rpc_connections = ConfigurationOption( | ||
16 | 772 | "max_rpc_connections", | ||
17 | 773 | "The maximum number of connections to scale to when under load", | ||
18 | 774 | Number(min=1, max=1024, if_missing=4), | ||
19 | 775 | ) | ||
20 | 776 | rpc_keepalive = ConfigurationOption( | ||
21 | 777 | "rpc_keepalive", | ||
22 | 778 | "The duration in miliseconds to keep added connections alive", | ||
23 | 779 | Number(min=1, max=5000, if_missing=1000), | ||
24 | 780 | ) | ||
25 | 781 | |||
26 | 765 | # TFTP options. | 782 | # TFTP options. |
27 | 766 | tftp_port = ConfigurationOption( | 783 | tftp_port = ConfigurationOption( |
28 | 767 | "tftp_port", | 784 | "tftp_port", |
29 | diff --git a/src/provisioningserver/dhcp/tests/test_config.py b/src/provisioningserver/dhcp/tests/test_config.py | |||
30 | index c53e906..8c3f2fe 100644 | |||
31 | --- a/src/provisioningserver/dhcp/tests/test_config.py | |||
32 | +++ b/src/provisioningserver/dhcp/tests/test_config.py | |||
33 | @@ -176,7 +176,7 @@ def validate_dhcpd_configuration(test, configuration, ipv6): | |||
34 | 176 | ), | 176 | ), |
35 | 177 | ), | 177 | ), |
36 | 178 | ) | 178 | ) |
38 | 179 | cmd = ( | 179 | cmd = [ |
39 | 180 | "dhcpd", | 180 | "dhcpd", |
40 | 181 | ("-6" if ipv6 else "-4"), | 181 | ("-6" if ipv6 else "-4"), |
41 | 182 | "-t", | 182 | "-t", |
42 | @@ -184,7 +184,7 @@ def validate_dhcpd_configuration(test, configuration, ipv6): | |||
43 | 184 | conffile.name, | 184 | conffile.name, |
44 | 185 | "-lf", | 185 | "-lf", |
45 | 186 | leasesfile.name, | 186 | leasesfile.name, |
47 | 187 | ) | 187 | ] |
48 | 188 | if not running_in_docker(): | 188 | if not running_in_docker(): |
49 | 189 | # Call `dhcpd` without AppArmor confinement, so that it can read | 189 | # Call `dhcpd` without AppArmor confinement, so that it can read |
50 | 190 | # configurations file from /tmp. This is not needed when running | 190 | # configurations file from /tmp. This is not needed when running |
51 | diff --git a/src/provisioningserver/plugin.py b/src/provisioningserver/plugin.py | |||
52 | index e987c73..00ff898 100644 | |||
53 | --- a/src/provisioningserver/plugin.py | |||
54 | +++ b/src/provisioningserver/plugin.py | |||
55 | @@ -139,7 +139,13 @@ class ProvisioningServiceMaker: | |||
56 | 139 | def _makeRPCService(self): | 139 | def _makeRPCService(self): |
57 | 140 | from provisioningserver.rpc.clusterservice import ClusterClientService | 140 | from provisioningserver.rpc.clusterservice import ClusterClientService |
58 | 141 | 141 | ||
60 | 142 | rpc_service = ClusterClientService(reactor) | 142 | with ClusterConfiguration.open() as config: |
61 | 143 | rpc_service = ClusterClientService( | ||
62 | 144 | reactor, | ||
63 | 145 | config.max_idle_rpc_connections, | ||
64 | 146 | config.max_rpc_connections, | ||
65 | 147 | config.rpc_keepalive, | ||
66 | 148 | ) | ||
67 | 143 | rpc_service.setName("rpc") | 149 | rpc_service.setName("rpc") |
68 | 144 | return rpc_service | 150 | return rpc_service |
69 | 145 | 151 | ||
70 | diff --git a/src/provisioningserver/rackdservices/external.py b/src/provisioningserver/rackdservices/external.py | |||
71 | index ccabb74..5b8afe5 100644 | |||
72 | --- a/src/provisioningserver/rackdservices/external.py | |||
73 | +++ b/src/provisioningserver/rackdservices/external.py | |||
74 | @@ -68,8 +68,9 @@ class RackOnlyExternalService(metaclass=ABCMeta): | |||
75 | 68 | 68 | ||
76 | 69 | # Filter the connects by region. | 69 | # Filter the connects by region. |
77 | 70 | conn_per_region = defaultdict(set) | 70 | conn_per_region = defaultdict(set) |
80 | 71 | for eventloop, connection in connections.items(): | 71 | for eventloop, connection_set in connections.items(): |
81 | 72 | conn_per_region[eventloop.split(":")[0]].add(connection) | 72 | for connection in connection_set: |
82 | 73 | conn_per_region[eventloop.split(":")[0]].add(connection) | ||
83 | 73 | for eventloop, connections in conn_per_region.items(): | 74 | for eventloop, connections in conn_per_region.items(): |
84 | 74 | # Sort the connections so the same IP is always picked per | 75 | # Sort the connections so the same IP is always picked per |
85 | 75 | # region controller. This ensures that the HTTP configuration | 76 | # region controller. This ensures that the HTTP configuration |
86 | diff --git a/src/provisioningserver/rackdservices/http.py b/src/provisioningserver/rackdservices/http.py | |||
87 | index 421e35f..bda9d23 100644 | |||
88 | --- a/src/provisioningserver/rackdservices/http.py | |||
89 | +++ b/src/provisioningserver/rackdservices/http.py | |||
90 | @@ -101,8 +101,9 @@ class RackHTTPService(TimerService): | |||
91 | 101 | controller is connected to.""" | 101 | controller is connected to.""" |
92 | 102 | # Filter the connects by region. | 102 | # Filter the connects by region. |
93 | 103 | conn_per_region = defaultdict(set) | 103 | conn_per_region = defaultdict(set) |
96 | 104 | for eventloop, connection in self._rpc_service.connections.items(): | 104 | for eventloop, connection_set in self._rpc_service.connections.items(): |
97 | 105 | conn_per_region[eventloop.split(":")[0]].add(connection) | 105 | for connection in connection_set: |
98 | 106 | conn_per_region[eventloop.split(":")[0]].add(connection) | ||
99 | 106 | for _, connections in conn_per_region.items(): | 107 | for _, connections in conn_per_region.items(): |
100 | 107 | # Sort the connections so the same IP is always picked per | 108 | # Sort the connections so the same IP is always picked per |
101 | 108 | # region controller. This ensures that the HTTP configuration | 109 | # region controller. This ensures that the HTTP configuration |
102 | diff --git a/src/provisioningserver/rackdservices/tests/test_external.py b/src/provisioningserver/rackdservices/tests/test_external.py | |||
103 | index ad214a1..0cb8601 100644 | |||
104 | --- a/src/provisioningserver/rackdservices/tests/test_external.py | |||
105 | +++ b/src/provisioningserver/rackdservices/tests/test_external.py | |||
106 | @@ -430,7 +430,8 @@ class TestRackDNS(MAASTestCase): | |||
107 | 430 | return frozenset( | 430 | return frozenset( |
108 | 431 | { | 431 | { |
109 | 432 | client.address[0] | 432 | client.address[0] |
111 | 433 | for _, client in rpc_service.connections.items() | 433 | for _, clients in rpc_service.connections.items() |
112 | 434 | for client in clients | ||
113 | 434 | } | 435 | } |
114 | 435 | ) | 436 | ) |
115 | 436 | 437 | ||
116 | @@ -609,7 +610,7 @@ class TestRackDNS(MAASTestCase): | |||
117 | 609 | ip = factory.make_ip_address() | 610 | ip = factory.make_ip_address() |
118 | 610 | mock_conn = Mock() | 611 | mock_conn = Mock() |
119 | 611 | mock_conn.address = (ip, random.randint(5240, 5250)) | 612 | mock_conn.address = (ip, random.randint(5240, 5250)) |
121 | 612 | mock_rpc.connections[eventloop] = mock_conn | 613 | mock_rpc.connections[eventloop] = {mock_conn} |
122 | 613 | 614 | ||
123 | 614 | dns = external.RackDNS() | 615 | dns = external.RackDNS() |
124 | 615 | region_ips = list(dns._genRegionIps(mock_rpc.connections)) | 616 | region_ips = list(dns._genRegionIps(mock_rpc.connections)) |
125 | @@ -626,7 +627,7 @@ class TestRackDNS(MAASTestCase): | |||
126 | 626 | ip = factory.make_ip_address() | 627 | ip = factory.make_ip_address() |
127 | 627 | mock_conn = Mock() | 628 | mock_conn = Mock() |
128 | 628 | mock_conn.address = (ip, random.randint(5240, 5250)) | 629 | mock_conn.address = (ip, random.randint(5240, 5250)) |
130 | 629 | mock_rpc.connections[eventloop] = mock_conn | 630 | mock_rpc.connections[eventloop] = {mock_conn} |
131 | 630 | 631 | ||
132 | 631 | dns = external.RackDNS() | 632 | dns = external.RackDNS() |
133 | 632 | region_ips = frozenset(dns._genRegionIps(mock_rpc.connections)) | 633 | region_ips = frozenset(dns._genRegionIps(mock_rpc.connections)) |
134 | @@ -659,7 +660,8 @@ class TestRackProxy(MAASTestCase): | |||
135 | 659 | return frozenset( | 660 | return frozenset( |
136 | 660 | { | 661 | { |
137 | 661 | client.address[0] | 662 | client.address[0] |
139 | 662 | for _, client in rpc_service.connections.items() | 663 | for _, clients in rpc_service.connections.items() |
140 | 664 | for client in clients | ||
141 | 663 | } | 665 | } |
142 | 664 | ) | 666 | ) |
143 | 665 | 667 | ||
144 | @@ -824,7 +826,8 @@ class TestRackSyslog(MAASTestCase): | |||
145 | 824 | return frozenset( | 826 | return frozenset( |
146 | 825 | { | 827 | { |
147 | 826 | (eventloop, client.address[0]) | 828 | (eventloop, client.address[0]) |
149 | 827 | for eventloop, client in rpc_service.connections.items() | 829 | for eventloop, clients in rpc_service.connections.items() |
150 | 830 | for client in clients | ||
151 | 828 | } | 831 | } |
152 | 829 | ) | 832 | ) |
153 | 830 | 833 | ||
154 | diff --git a/src/provisioningserver/rackdservices/tests/test_http.py b/src/provisioningserver/rackdservices/tests/test_http.py | |||
155 | index bc43c66..43cb495 100644 | |||
156 | --- a/src/provisioningserver/rackdservices/tests/test_http.py | |||
157 | +++ b/src/provisioningserver/rackdservices/tests/test_http.py | |||
158 | @@ -92,7 +92,8 @@ class TestRackHTTPService(MAASTestCase): | |||
159 | 92 | return frozenset( | 92 | return frozenset( |
160 | 93 | { | 93 | { |
161 | 94 | client.address[0] | 94 | client.address[0] |
163 | 95 | for _, client in rpc_service.connections.items() | 95 | for _, clients in rpc_service.connections.items() |
164 | 96 | for client in clients | ||
165 | 96 | } | 97 | } |
166 | 97 | ) | 98 | ) |
167 | 98 | 99 | ||
168 | @@ -208,7 +209,7 @@ class TestRackHTTPService(MAASTestCase): | |||
169 | 208 | ip = factory.make_ip_address() | 209 | ip = factory.make_ip_address() |
170 | 209 | mock_conn = Mock() | 210 | mock_conn = Mock() |
171 | 210 | mock_conn.address = (ip, random.randint(5240, 5250)) | 211 | mock_conn.address = (ip, random.randint(5240, 5250)) |
173 | 211 | mock_rpc.connections[eventloop] = mock_conn | 212 | mock_rpc.connections[eventloop] = {mock_conn} |
174 | 212 | 213 | ||
175 | 213 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) | 214 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) |
176 | 214 | region_ips = list(service._genRegionIps()) | 215 | region_ips = list(service._genRegionIps()) |
177 | @@ -225,7 +226,7 @@ class TestRackHTTPService(MAASTestCase): | |||
178 | 225 | ip = factory.make_ip_address() | 226 | ip = factory.make_ip_address() |
179 | 226 | mock_conn = Mock() | 227 | mock_conn = Mock() |
180 | 227 | mock_conn.address = (ip, random.randint(5240, 5250)) | 228 | mock_conn.address = (ip, random.randint(5240, 5250)) |
182 | 228 | mock_rpc.connections[eventloop] = mock_conn | 229 | mock_rpc.connections[eventloop] = {mock_conn} |
183 | 229 | 230 | ||
184 | 230 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) | 231 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) |
185 | 231 | region_ips = frozenset(service._genRegionIps()) | 232 | region_ips = frozenset(service._genRegionIps()) |
186 | @@ -244,7 +245,7 @@ class TestRackHTTPService(MAASTestCase): | |||
187 | 244 | ip_addresses.add("[%s]" % ip) | 245 | ip_addresses.add("[%s]" % ip) |
188 | 245 | mock_conn = Mock() | 246 | mock_conn = Mock() |
189 | 246 | mock_conn.address = (ip, random.randint(5240, 5250)) | 247 | mock_conn.address = (ip, random.randint(5240, 5250)) |
191 | 247 | mock_rpc.connections[eventloop] = mock_conn | 248 | mock_rpc.connections[eventloop] = {mock_conn} |
192 | 248 | 249 | ||
193 | 249 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) | 250 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) |
194 | 250 | region_ips = set(service._genRegionIps()) | 251 | region_ips = set(service._genRegionIps()) |
195 | diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py | |||
196 | index c92d48a..a7205db 100644 | |||
197 | --- a/src/provisioningserver/rpc/clusterservice.py | |||
198 | +++ b/src/provisioningserver/rpc/clusterservice.py | |||
199 | @@ -9,7 +9,6 @@ import json | |||
200 | 9 | from operator import itemgetter | 9 | from operator import itemgetter |
201 | 10 | import os | 10 | import os |
202 | 11 | from os import urandom | 11 | from os import urandom |
203 | 12 | import random | ||
204 | 13 | from socket import AF_INET, AF_INET6, gethostname | 12 | from socket import AF_INET, AF_INET6, gethostname |
205 | 14 | import sys | 13 | import sys |
206 | 15 | from urllib.parse import urlparse | 14 | from urllib.parse import urlparse |
207 | @@ -24,7 +23,6 @@ from twisted.internet.defer import ( | |||
208 | 24 | maybeDeferred, | 23 | maybeDeferred, |
209 | 25 | returnValue, | 24 | returnValue, |
210 | 26 | ) | 25 | ) |
211 | 27 | from twisted.internet.endpoints import connectProtocol, TCP6ClientEndpoint | ||
212 | 28 | from twisted.internet.error import ConnectError, ConnectionClosed, ProcessDone | 26 | from twisted.internet.error import ConnectError, ConnectionClosed, ProcessDone |
213 | 29 | from twisted.internet.threads import deferToThread | 27 | from twisted.internet.threads import deferToThread |
214 | 30 | from twisted.protocols import amp | 28 | from twisted.protocols import amp |
215 | @@ -67,6 +65,7 @@ from provisioningserver.rpc.boot_images import ( | |||
216 | 67 | list_boot_images, | 65 | list_boot_images, |
217 | 68 | ) | 66 | ) |
218 | 69 | from provisioningserver.rpc.common import Ping, RPCProtocol | 67 | from provisioningserver.rpc.common import Ping, RPCProtocol |
219 | 68 | from provisioningserver.rpc.connectionpool import ConnectionPool | ||
220 | 70 | from provisioningserver.rpc.exceptions import CannotConfigureDHCP | 69 | from provisioningserver.rpc.exceptions import CannotConfigureDHCP |
221 | 71 | from provisioningserver.rpc.interfaces import IConnectionToRegion | 70 | from provisioningserver.rpc.interfaces import IConnectionToRegion |
222 | 72 | from provisioningserver.rpc.osystems import ( | 71 | from provisioningserver.rpc.osystems import ( |
223 | @@ -999,6 +998,7 @@ class ClusterClient(Cluster): | |||
224 | 999 | # Events for this protocol's life-cycle. | 998 | # Events for this protocol's life-cycle. |
225 | 1000 | self.authenticated = DeferredValue() | 999 | self.authenticated = DeferredValue() |
226 | 1001 | self.ready = DeferredValue() | 1000 | self.ready = DeferredValue() |
227 | 1001 | self.in_use = False | ||
228 | 1002 | self.localIdent = None | 1002 | self.localIdent = None |
229 | 1003 | 1003 | ||
230 | 1004 | @property | 1004 | @property |
231 | @@ -1201,13 +1201,15 @@ class ClusterClientService(TimerService): | |||
232 | 1201 | 1201 | ||
233 | 1202 | time_started = None | 1202 | time_started = None |
234 | 1203 | 1203 | ||
236 | 1204 | def __init__(self, reactor): | 1204 | def __init__(self, reactor, max_idle_conns=1, max_conns=1, keepalive=1000): |
237 | 1205 | super().__init__(self._calculate_interval(None, None), self._tryUpdate) | 1205 | super().__init__(self._calculate_interval(None, None), self._tryUpdate) |
238 | 1206 | self.connections = {} | ||
239 | 1207 | self.try_connections = {} | ||
240 | 1208 | self._previous_work = (None, None) | 1206 | self._previous_work = (None, None) |
241 | 1209 | self.clock = reactor | 1207 | self.clock = reactor |
242 | 1210 | 1208 | ||
243 | 1209 | self.connections = ConnectionPool( | ||
244 | 1210 | reactor, self, max_idle_conns, max_conns, keepalive | ||
245 | 1211 | ) | ||
246 | 1212 | |||
247 | 1211 | # Stored the URL used to connect to the region controller. This will be | 1213 | # Stored the URL used to connect to the region controller. This will be |
248 | 1212 | # the URL that was used to get the eventloops. | 1214 | # the URL that was used to get the eventloops. |
249 | 1213 | self.maas_url = None | 1215 | self.maas_url = None |
250 | @@ -1236,11 +1238,19 @@ class ClusterClientService(TimerService): | |||
251 | 1236 | :raises: :py:class:`~.exceptions.NoConnectionsAvailable` when | 1238 | :raises: :py:class:`~.exceptions.NoConnectionsAvailable` when |
252 | 1237 | there are no open connections to a region controller. | 1239 | there are no open connections to a region controller. |
253 | 1238 | """ | 1240 | """ |
256 | 1239 | conns = list(self.connections.values()) | 1241 | if len(self.connections) == 0: |
255 | 1240 | if len(conns) == 0: | ||
257 | 1241 | raise exceptions.NoConnectionsAvailable() | 1242 | raise exceptions.NoConnectionsAvailable() |
258 | 1242 | else: | 1243 | else: |
260 | 1243 | return common.Client(random.choice(conns)) | 1244 | try: |
261 | 1245 | return common.Client( | ||
262 | 1246 | self.connections.get_random_free_connection() | ||
263 | 1247 | ) | ||
264 | 1248 | except exceptions.AllConnectionsBusy as e: | ||
265 | 1249 | for endpoint_conns in self.connections.values(): | ||
266 | 1250 | if len(endpoint_conns) < self.connections._max_connections: | ||
267 | 1251 | raise e | ||
268 | 1252 | # return a busy connection, assume it will free up or timeout | ||
269 | 1253 | return common.Client(self.connections.get_random_connection()) | ||
270 | 1244 | 1254 | ||
271 | 1245 | @deferred | 1255 | @deferred |
272 | 1246 | def getClientNow(self): | 1256 | def getClientNow(self): |
273 | @@ -1259,10 +1269,17 @@ class ClusterClientService(TimerService): | |||
274 | 1259 | return self.getClient() | 1269 | return self.getClient() |
275 | 1260 | except exceptions.NoConnectionsAvailable: | 1270 | except exceptions.NoConnectionsAvailable: |
276 | 1261 | return self._tryUpdate().addCallback(call, self.getClient) | 1271 | return self._tryUpdate().addCallback(call, self.getClient) |
277 | 1272 | except exceptions.AllConnectionsBusy: | ||
278 | 1273 | return self.connections.scale_up_connections().addCallback( | ||
279 | 1274 | call, self.getClient | ||
280 | 1275 | ) | ||
281 | 1262 | 1276 | ||
282 | 1263 | def getAllClients(self): | 1277 | def getAllClients(self): |
283 | 1264 | """Return a list of all connected :class:`common.Client`s.""" | 1278 | """Return a list of all connected :class:`common.Client`s.""" |
285 | 1265 | return [common.Client(conn) for conn in self.connections.values()] | 1279 | return [ |
286 | 1280 | common.Client(conn) | ||
287 | 1281 | for conn in self.connections.get_all_connections() | ||
288 | 1282 | ] | ||
289 | 1266 | 1283 | ||
290 | 1267 | def _tryUpdate(self): | 1284 | def _tryUpdate(self): |
291 | 1268 | """Attempt to refresh outgoing connections. | 1285 | """Attempt to refresh outgoing connections. |
292 | @@ -1391,7 +1408,9 @@ class ClusterClientService(TimerService): | |||
293 | 1391 | """Update the saved RPC info state.""" | 1408 | """Update the saved RPC info state.""" |
294 | 1392 | # Build a list of addresses based on the current connections. | 1409 | # Build a list of addresses based on the current connections. |
295 | 1393 | connected_addr = { | 1410 | connected_addr = { |
297 | 1394 | conn.address[0] for _, conn in self.connections.items() | 1411 | conn.address[0] |
298 | 1412 | for _, conns in self.connections.items() | ||
299 | 1413 | for conn in conns | ||
300 | 1395 | } | 1414 | } |
301 | 1396 | if ( | 1415 | if ( |
302 | 1397 | self._rpc_info_state is None | 1416 | self._rpc_info_state is None |
303 | @@ -1467,8 +1486,8 @@ class ClusterClientService(TimerService): | |||
304 | 1467 | # Gather the list of successful responses. | 1486 | # Gather the list of successful responses. |
305 | 1468 | successful = [] | 1487 | successful = [] |
306 | 1469 | errors = [] | 1488 | errors = [] |
309 | 1470 | for sucess, result in results: | 1489 | for success, result in results: |
310 | 1471 | if sucess: | 1490 | if success: |
311 | 1472 | body, orig_url = result | 1491 | body, orig_url = result |
312 | 1473 | eventloops = body.get("eventloops") | 1492 | eventloops = body.get("eventloops") |
313 | 1474 | if eventloops is not None: | 1493 | if eventloops is not None: |
314 | @@ -1656,12 +1675,15 @@ class ClusterClientService(TimerService): | |||
315 | 1656 | "Dropping connections to event-loops: %s" | 1675 | "Dropping connections to event-loops: %s" |
316 | 1657 | % (", ".join(drop.keys())) | 1676 | % (", ".join(drop.keys())) |
317 | 1658 | ) | 1677 | ) |
318 | 1678 | drop_defers = [] | ||
319 | 1679 | for eventloop, connections in drop.items(): | ||
320 | 1680 | for connection in connections: | ||
321 | 1681 | drop_defers.append( | ||
322 | 1682 | maybeDeferred(self.connections.disconnect, connection) | ||
323 | 1683 | ) | ||
324 | 1684 | self.connections.remove_connection(eventloop, connection) | ||
325 | 1659 | yield DeferredList( | 1685 | yield DeferredList( |
331 | 1660 | [ | 1686 | drop_defers, |
327 | 1661 | maybeDeferred(self._drop_connection, connection) | ||
328 | 1662 | for eventloop, connections in drop.items() | ||
329 | 1663 | for connection in connections | ||
330 | 1664 | ], | ||
332 | 1665 | consumeErrors=True, | 1687 | consumeErrors=True, |
333 | 1666 | ) | 1688 | ) |
334 | 1667 | 1689 | ||
335 | @@ -1692,11 +1714,12 @@ class ClusterClientService(TimerService): | |||
336 | 1692 | # between consenting adults. | 1714 | # between consenting adults. |
337 | 1693 | for eventloop, addresses in eventloops.items(): | 1715 | for eventloop, addresses in eventloops.items(): |
338 | 1694 | if eventloop in self.connections: | 1716 | if eventloop in self.connections: |
344 | 1695 | connection = self.connections[eventloop] | 1717 | connection_list = self.connections[eventloop] |
345 | 1696 | if connection.address not in addresses: | 1718 | for connection in connection_list: |
346 | 1697 | drop[eventloop] = [connection] | 1719 | if connection.address not in addresses: |
347 | 1698 | if eventloop in self.try_connections: | 1720 | drop[eventloop] = [connection] |
348 | 1699 | connection = self.try_connections[eventloop] | 1721 | if self.connections.is_staged(eventloop): |
349 | 1722 | connection = self.connections.get_staged_connection(eventloop) | ||
350 | 1700 | if connection.address not in addresses: | 1723 | if connection.address not in addresses: |
351 | 1701 | drop[eventloop] = [connection] | 1724 | drop[eventloop] = [connection] |
352 | 1702 | 1725 | ||
353 | @@ -1705,7 +1728,7 @@ class ClusterClientService(TimerService): | |||
354 | 1705 | for eventloop, addresses in eventloops.items(): | 1728 | for eventloop, addresses in eventloops.items(): |
355 | 1706 | if ( | 1729 | if ( |
356 | 1707 | eventloop not in self.connections | 1730 | eventloop not in self.connections |
358 | 1708 | and eventloop not in self.try_connections | 1731 | and not self.connections.is_staged(eventloop) |
359 | 1709 | ) or eventloop in drop: | 1732 | ) or eventloop in drop: |
360 | 1710 | connect[eventloop] = addresses | 1733 | connect[eventloop] = addresses |
361 | 1711 | 1734 | ||
362 | @@ -1714,13 +1737,13 @@ class ClusterClientService(TimerService): | |||
363 | 1714 | # the process in which the event-loop is no longer running, but | 1737 | # the process in which the event-loop is no longer running, but |
364 | 1715 | # it could be an indicator of a heavily loaded machine, or a | 1738 | # it could be an indicator of a heavily loaded machine, or a |
365 | 1716 | # fault. In any case, it seems to make sense to disconnect. | 1739 | # fault. In any case, it seems to make sense to disconnect. |
367 | 1717 | for eventloop in self.connections: | 1740 | for eventloop in self.connections.keys(): |
368 | 1718 | if eventloop not in eventloops: | 1741 | if eventloop not in eventloops: |
372 | 1719 | connection = self.connections[eventloop] | 1742 | connection_list = self.connections[eventloop] |
373 | 1720 | drop[eventloop] = [connection] | 1743 | drop[eventloop] = connection_list |
374 | 1721 | for eventloop in self.try_connections: | 1744 | for eventloop in self.connections.get_staged_connections(): |
375 | 1722 | if eventloop not in eventloops: | 1745 | if eventloop not in eventloops: |
377 | 1723 | connection = self.try_connections[eventloop] | 1746 | connection = self.connections.get_staged_connection(eventloop) |
378 | 1724 | drop[eventloop] = [connection] | 1747 | drop[eventloop] = [connection] |
379 | 1725 | 1748 | ||
380 | 1726 | return drop, connect | 1749 | return drop, connect |
381 | @@ -1730,7 +1753,7 @@ class ClusterClientService(TimerService): | |||
382 | 1730 | """Connect to `eventloop` using all `addresses`.""" | 1753 | """Connect to `eventloop` using all `addresses`.""" |
383 | 1731 | for address in addresses: | 1754 | for address in addresses: |
384 | 1732 | try: | 1755 | try: |
386 | 1733 | connection = yield self._make_connection(eventloop, address) | 1756 | connection = yield self.connections.connect(eventloop, address) |
387 | 1734 | except ConnectError as error: | 1757 | except ConnectError as error: |
388 | 1735 | host, port = address | 1758 | host, port = address |
389 | 1736 | log.msg( | 1759 | log.msg( |
390 | @@ -1747,29 +1770,17 @@ class ClusterClientService(TimerService): | |||
391 | 1747 | ), | 1770 | ), |
392 | 1748 | ) | 1771 | ) |
393 | 1749 | else: | 1772 | else: |
395 | 1750 | self.try_connections[eventloop] = connection | 1773 | self.connections.stage_connection(eventloop, connection) |
396 | 1751 | break | 1774 | break |
397 | 1752 | 1775 | ||
409 | 1753 | def _make_connection(self, eventloop, address): | 1776 | @inlineCallbacks |
399 | 1754 | """Connect to `eventloop` at `address`.""" | ||
400 | 1755 | # Force everything to use AF_INET6 sockets. | ||
401 | 1756 | endpoint = TCP6ClientEndpoint(self.clock, *address) | ||
402 | 1757 | protocol = ClusterClient(address, eventloop, self) | ||
403 | 1758 | return connectProtocol(endpoint, protocol) | ||
404 | 1759 | |||
405 | 1760 | def _drop_connection(self, connection): | ||
406 | 1761 | """Drop the given `connection`.""" | ||
407 | 1762 | return connection.transport.loseConnection() | ||
408 | 1763 | |||
410 | 1764 | def add_connection(self, eventloop, connection): | 1777 | def add_connection(self, eventloop, connection): |
411 | 1765 | """Add the connection to the tracked connections. | 1778 | """Add the connection to the tracked connections. |
412 | 1766 | 1779 | ||
413 | 1767 | Update the saved RPC info state information based on the new | 1780 | Update the saved RPC info state information based on the new |
414 | 1768 | connection. | 1781 | connection. |
415 | 1769 | """ | 1782 | """ |
419 | 1770 | if eventloop in self.try_connections: | 1783 | yield self.connections.add_connection(eventloop, connection) |
417 | 1771 | del self.try_connections[eventloop] | ||
418 | 1772 | self.connections[eventloop] = connection | ||
420 | 1773 | self._update_saved_rpc_info_state() | 1784 | self._update_saved_rpc_info_state() |
421 | 1774 | 1785 | ||
422 | 1775 | def remove_connection(self, eventloop, connection): | 1786 | def remove_connection(self, eventloop, connection): |
423 | @@ -1778,12 +1789,7 @@ class ClusterClientService(TimerService): | |||
424 | 1778 | If this is the last connection that was keeping rackd connected to | 1789 | If this is the last connection that was keeping rackd connected to |
425 | 1779 | a regiond then dhcpd and dhcpd6 services will be turned off. | 1790 | a regiond then dhcpd and dhcpd6 services will be turned off. |
426 | 1780 | """ | 1791 | """ |
433 | 1781 | if eventloop in self.try_connections: | 1792 | self.connections.remove_connection(eventloop, connection) |
428 | 1782 | if self.try_connections[eventloop] is connection: | ||
429 | 1783 | del self.try_connections[eventloop] | ||
430 | 1784 | if eventloop in self.connections: | ||
431 | 1785 | if self.connections[eventloop] is connection: | ||
432 | 1786 | del self.connections[eventloop] | ||
434 | 1787 | # Disable DHCP when no connections to a region controller. | 1793 | # Disable DHCP when no connections to a region controller. |
435 | 1788 | if len(self.connections) == 0: | 1794 | if len(self.connections) == 0: |
436 | 1789 | stopping_services = [] | 1795 | stopping_services = [] |
437 | diff --git a/src/provisioningserver/rpc/common.py b/src/provisioningserver/rpc/common.py | |||
438 | index 5d67bba..40e091f 100644 | |||
439 | --- a/src/provisioningserver/rpc/common.py | |||
440 | +++ b/src/provisioningserver/rpc/common.py | |||
441 | @@ -14,7 +14,11 @@ from twisted.python.failure import Failure | |||
442 | 14 | from provisioningserver.logger import LegacyLogger | 14 | from provisioningserver.logger import LegacyLogger |
443 | 15 | from provisioningserver.prometheus.metrics import PROMETHEUS_METRICS | 15 | from provisioningserver.prometheus.metrics import PROMETHEUS_METRICS |
444 | 16 | from provisioningserver.rpc.interfaces import IConnection, IConnectionToRegion | 16 | from provisioningserver.rpc.interfaces import IConnection, IConnectionToRegion |
446 | 17 | from provisioningserver.utils.twisted import asynchronous, deferWithTimeout | 17 | from provisioningserver.utils.twisted import ( |
447 | 18 | asynchronous, | ||
448 | 19 | callOut, | ||
449 | 20 | deferWithTimeout, | ||
450 | 21 | ) | ||
451 | 18 | 22 | ||
452 | 19 | log = LegacyLogger() | 23 | log = LegacyLogger() |
453 | 20 | 24 | ||
454 | @@ -156,6 +160,11 @@ class Client: | |||
455 | 156 | :return: A deferred result. Call its `wait` method (with a timeout | 160 | :return: A deferred result. Call its `wait` method (with a timeout |
456 | 157 | in seconds) to block on the call's completion. | 161 | in seconds) to block on the call's completion. |
457 | 158 | """ | 162 | """ |
458 | 163 | self._conn.in_use = True | ||
459 | 164 | |||
460 | 165 | def _free_conn(): | ||
461 | 166 | self._conn.in_use = False | ||
462 | 167 | |||
463 | 159 | if len(args) != 0: | 168 | if len(args) != 0: |
464 | 160 | receiver_name = "{}.{}".format( | 169 | receiver_name = "{}.{}".format( |
465 | 161 | self.__module__, | 170 | self.__module__, |
466 | @@ -171,11 +180,19 @@ class Client: | |||
467 | 171 | if timeout is undefined: | 180 | if timeout is undefined: |
468 | 172 | timeout = 120 # 2 minutes | 181 | timeout = 120 # 2 minutes |
469 | 173 | if timeout is None or timeout <= 0: | 182 | if timeout is None or timeout <= 0: |
471 | 174 | return self._conn.callRemote(cmd, **kwargs) | 183 | d = self._conn.callRemote(cmd, **kwargs) |
472 | 184 | if isinstance(d, Deferred): | ||
473 | 185 | d.addBoth(lambda x: callOut(x, _free_conn)) | ||
474 | 186 | else: | ||
475 | 187 | _free_conn() | ||
476 | 188 | return d | ||
477 | 175 | else: | 189 | else: |
481 | 176 | return deferWithTimeout( | 190 | d = deferWithTimeout(timeout, self._conn.callRemote, cmd, **kwargs) |
482 | 177 | timeout, self._conn.callRemote, cmd, **kwargs | 191 | if isinstance(d, Deferred): |
483 | 178 | ) | 192 | d.addBoth(lambda x: callOut(x, _free_conn)) |
484 | 193 | else: | ||
485 | 194 | _free_conn() | ||
486 | 195 | return d | ||
487 | 179 | 196 | ||
488 | 180 | @asynchronous | 197 | @asynchronous |
489 | 181 | def getHostCertificate(self): | 198 | def getHostCertificate(self): |
490 | diff --git a/src/provisioningserver/rpc/connectionpool.py b/src/provisioningserver/rpc/connectionpool.py | |||
491 | 182 | new file mode 100644 | 199 | new file mode 100644 |
492 | index 0000000..8023f80 | |||
493 | --- /dev/null | |||
494 | +++ b/src/provisioningserver/rpc/connectionpool.py | |||
495 | @@ -0,0 +1,163 @@ | |||
496 | 1 | # Copyright 2022 Canonical Ltd. This software is licensed under the | ||
497 | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). | ||
498 | 3 | |||
499 | 4 | """ RPC Connection Pooling and Lifecycle """ | ||
500 | 5 | |||
501 | 6 | import random | ||
502 | 7 | |||
503 | 8 | from twisted.internet.defer import inlineCallbacks | ||
504 | 9 | from twisted.internet.endpoints import connectProtocol, TCP6ClientEndpoint | ||
505 | 10 | |||
506 | 11 | from provisioningserver.rpc import exceptions | ||
507 | 12 | |||
508 | 13 | |||
509 | 14 | class ConnectionPool: | ||
510 | 15 | def __init__( | ||
511 | 16 | self, reactor, service, max_idle_conns=1, max_conns=1, keepalive=1000 | ||
512 | 17 | ): | ||
513 | 18 | # The maximum number of connections to always allocate per eventloop | ||
514 | 19 | self._max_idle_connections = max_idle_conns | ||
515 | 20 | # The maximum number of connections to allocate while under load per eventloop | ||
516 | 21 | self._max_connections = max_conns | ||
517 | 22 | # The duration in milliseconds to keep scaled up connections alive | ||
518 | 23 | self._keepalive = keepalive | ||
519 | 24 | |||
520 | 25 | self.connections = {} | ||
521 | 26 | self.try_connections = {} | ||
522 | 27 | self.clock = reactor | ||
523 | 28 | self._service = service | ||
524 | 29 | |||
525 | 30 | def __setitem__(self, key, item): | ||
526 | 31 | self.connections[key] = item | ||
527 | 32 | |||
528 | 33 | def __getitem__(self, key): | ||
529 | 34 | return self.connections.get(key) | ||
530 | 35 | |||
531 | 36 | def __repr__(self): | ||
532 | 37 | return repr(self.connections) | ||
533 | 38 | |||
534 | 39 | def __len__(self): | ||
535 | 40 | return len(self.get_all_connections()) | ||
536 | 41 | |||
537 | 42 | def __delitem__(self, key): | ||
538 | 43 | del self.connections[key] | ||
539 | 44 | |||
540 | 45 | def __contains__(self, item): | ||
541 | 46 | return item in self.connections | ||
542 | 47 | |||
543 | 48 | def __cmp__(self, value): | ||
544 | 49 | return self.connections.__cmp__(value) | ||
545 | 50 | |||
546 | 51 | def __eq__(self, value): | ||
547 | 52 | return self.connections.__eq__(value) | ||
548 | 53 | |||
549 | 54 | def keys(self): | ||
550 | 55 | return self.connections.keys() | ||
551 | 56 | |||
552 | 57 | def values(self): | ||
553 | 58 | return self.connections.values() | ||
554 | 59 | |||
555 | 60 | def items(self): | ||
556 | 61 | return self.connections.items() | ||
557 | 62 | |||
558 | 63 | def _reap_extra_connection(self, eventloop, conn): | ||
559 | 64 | if not conn.in_use: | ||
560 | 65 | self.disconnect(conn) | ||
561 | 66 | return self.remove_connection(eventloop, conn) | ||
562 | 67 | return self.clock.callLater( | ||
563 | 68 | self._keepalive, self._reap_extra_connection, eventloop, conn | ||
564 | 69 | ) | ||
565 | 70 | |||
566 | 71 | def is_staged(self, eventloop): | ||
567 | 72 | return eventloop in self.try_connections | ||
568 | 73 | |||
569 | 74 | def get_staged_connection(self, eventloop): | ||
570 | 75 | return self.try_connections.get(eventloop) | ||
571 | 76 | |||
572 | 77 | def get_staged_connections(self): | ||
573 | 78 | return self.try_connections | ||
574 | 79 | |||
575 | 80 | def stage_connection(self, eventloop, connection): | ||
576 | 81 | self.try_connections[eventloop] = connection | ||
577 | 82 | |||
578 | 83 | @inlineCallbacks | ||
579 | 84 | def scale_up_connections(self): | ||
580 | 85 | for ev, ev_conns in self.connections.items(): | ||
581 | 86 | # pick first group with room for additional conns | ||
582 | 87 | if len(ev_conns) < self._max_connections: | ||
583 | 88 | # spawn an extra connection | ||
584 | 89 | conn_to_clone = random.choice(list(ev_conns)) | ||
585 | 90 | conn = yield self.connect(ev, conn_to_clone.address) | ||
586 | 91 | self.connections[ev].append(conn) | ||
587 | 92 | self.clock.callLater( | ||
588 | 93 | self._keepalive, self._reap_extra_connection, ev, conn | ||
589 | 94 | ) | ||
590 | 95 | return | ||
591 | 96 | raise exceptions.MaxConnectionsOpen() | ||
592 | 97 | |||
593 | 98 | def get_connection(self, eventloop): | ||
594 | 99 | return random.choice(self.connections[eventloop]) | ||
595 | 100 | |||
596 | 101 | def get_random_connection(self): | ||
597 | 102 | return random.choice(self.get_all_connections()) | ||
598 | 103 | |||
599 | 104 | def get_random_free_connection(self): | ||
600 | 105 | free_conns = self.get_all_free_connections() | ||
601 | 106 | if len(free_conns) == 0: | ||
602 | 107 | # caller should create a new connection | ||
603 | 108 | raise exceptions.AllConnectionsBusy() | ||
604 | 109 | return random.choice(free_conns) | ||
605 | 110 | |||
606 | 111 | def get_all_connections(self): | ||
607 | 112 | return [ | ||
608 | 113 | conn | ||
609 | 114 | for conn_list in self.connections.values() | ||
610 | 115 | for conn in conn_list | ||
611 | 116 | ] | ||
612 | 117 | |||
613 | 118 | def get_all_free_connections(self): | ||
614 | 119 | return [ | ||
615 | 120 | conn | ||
616 | 121 | for conn_list in self.connections.values() | ||
617 | 122 | for conn in conn_list | ||
618 | 123 | if not conn.in_use | ||
619 | 124 | ] | ||
620 | 125 | |||
621 | 126 | @inlineCallbacks | ||
622 | 127 | def connect(self, eventloop, address): | ||
623 | 128 | from provisioningserver.rpc.clusterservice import ClusterClient | ||
624 | 129 | |||
625 | 130 | # Force everything to use AF_INET6 sockets. | ||
626 | 131 | endpoint = TCP6ClientEndpoint(self.clock, *address) | ||
627 | 132 | protocol = ClusterClient(address, eventloop, self._service) | ||
628 | 133 | conn = yield connectProtocol(endpoint, protocol) | ||
629 | 134 | return conn | ||
630 | 135 | |||
631 | 136 | def disconnect(self, connection): | ||
632 | 137 | return connection.transport.loseConnection() | ||
633 | 138 | |||
634 | 139 | @inlineCallbacks | ||
635 | 140 | def add_connection(self, eventloop, connection): | ||
636 | 141 | if self.is_staged(eventloop): | ||
637 | 142 | del self.try_connections[eventloop] | ||
638 | 143 | if eventloop not in self.connections: | ||
639 | 144 | self.connections[eventloop] = [] | ||
640 | 145 | |||
641 | 146 | self.connections[eventloop].append(connection) | ||
642 | 147 | |||
643 | 148 | # clone connection to equal num idle connections | ||
644 | 149 | if self._max_idle_connections - 1 > 0: | ||
645 | 150 | for _ in range(self._max_idle_connections - 1): | ||
646 | 151 | extra_conn = yield self.connect( | ||
647 | 152 | connection.eventloop, connection.address | ||
648 | 153 | ) | ||
649 | 154 | self.connections[eventloop].append(extra_conn) | ||
650 | 155 | |||
651 | 156 | def remove_connection(self, eventloop, connection): | ||
652 | 157 | if self.is_staged(eventloop): | ||
653 | 158 | if self.try_connections[eventloop] is connection: | ||
654 | 159 | del self.try_connections[eventloop] | ||
655 | 160 | if connection in self.connections.get(eventloop, []): | ||
656 | 161 | self.connections[eventloop].remove(connection) | ||
657 | 162 | if len(self.connections[eventloop]) == 0: | ||
658 | 163 | del self.connections[eventloop] | ||
659 | diff --git a/src/provisioningserver/rpc/exceptions.py b/src/provisioningserver/rpc/exceptions.py | |||
660 | index 7ee4f3f..136e471 100644 | |||
661 | --- a/src/provisioningserver/rpc/exceptions.py | |||
662 | +++ b/src/provisioningserver/rpc/exceptions.py | |||
663 | @@ -12,6 +12,14 @@ class NoConnectionsAvailable(Exception): | |||
664 | 12 | self.uuid = uuid | 12 | self.uuid = uuid |
665 | 13 | 13 | ||
666 | 14 | 14 | ||
667 | 15 | class AllConnectionsBusy(Exception): | ||
668 | 16 | """The current connection pool is busy""" | ||
669 | 17 | |||
670 | 18 | |||
671 | 19 | class MaxConnectionsOpen(Exception): | ||
672 | 20 | """The maximum number of connections are currently open""" | ||
673 | 21 | |||
674 | 22 | |||
675 | 15 | class NoSuchEventType(Exception): | 23 | class NoSuchEventType(Exception): |
676 | 16 | """The specified event type was not found.""" | 24 | """The specified event type was not found.""" |
677 | 17 | 25 | ||
678 | diff --git a/src/provisioningserver/rpc/testing/__init__.py b/src/provisioningserver/rpc/testing/__init__.py | |||
679 | index ee4a9e2..1b2f94f 100644 | |||
680 | --- a/src/provisioningserver/rpc/testing/__init__.py | |||
681 | +++ b/src/provisioningserver/rpc/testing/__init__.py | |||
682 | @@ -262,7 +262,8 @@ class MockClusterToRegionRPCFixtureBase(fixtures.Fixture, metaclass=ABCMeta): | |||
683 | 262 | { | 262 | { |
684 | 263 | "eventloops": { | 263 | "eventloops": { |
685 | 264 | eventloop: [client.address] | 264 | eventloop: [client.address] |
687 | 265 | for eventloop, client in connections | 265 | for eventloop, clients in connections |
688 | 266 | for client in clients | ||
689 | 266 | } | 267 | } |
690 | 267 | }, | 268 | }, |
691 | 268 | orig_url, | 269 | orig_url, |
692 | diff --git a/src/provisioningserver/rpc/testing/doubles.py b/src/provisioningserver/rpc/testing/doubles.py | |||
693 | index cb9f27f..0785859 100644 | |||
694 | --- a/src/provisioningserver/rpc/testing/doubles.py | |||
695 | +++ b/src/provisioningserver/rpc/testing/doubles.py | |||
696 | @@ -30,6 +30,7 @@ class FakeConnection: | |||
697 | 30 | ident = attr.ib(default=sentinel.ident) | 30 | ident = attr.ib(default=sentinel.ident) |
698 | 31 | hostCertificate = attr.ib(default=sentinel.hostCertificate) | 31 | hostCertificate = attr.ib(default=sentinel.hostCertificate) |
699 | 32 | peerCertificate = attr.ib(default=sentinel.peerCertificate) | 32 | peerCertificate = attr.ib(default=sentinel.peerCertificate) |
700 | 33 | in_use = attr.ib(default=False) | ||
701 | 33 | 34 | ||
702 | 34 | def callRemote(self, cmd, **arguments): | 35 | def callRemote(self, cmd, **arguments): |
703 | 35 | return succeed(sentinel.response) | 36 | return succeed(sentinel.response) |
704 | @@ -48,6 +49,7 @@ class FakeConnectionToRegion: | |||
705 | 48 | address = attr.ib(default=(sentinel.host, sentinel.port)) | 49 | address = attr.ib(default=(sentinel.host, sentinel.port)) |
706 | 49 | hostCertificate = attr.ib(default=sentinel.hostCertificate) | 50 | hostCertificate = attr.ib(default=sentinel.hostCertificate) |
707 | 50 | peerCertificate = attr.ib(default=sentinel.peerCertificate) | 51 | peerCertificate = attr.ib(default=sentinel.peerCertificate) |
708 | 52 | in_use = attr.ib(default=False) | ||
709 | 51 | 53 | ||
710 | 52 | def callRemote(self, cmd, **arguments): | 54 | def callRemote(self, cmd, **arguments): |
711 | 53 | return succeed(sentinel.response) | 55 | return succeed(sentinel.response) |
712 | @@ -56,6 +58,22 @@ class FakeConnectionToRegion: | |||
713 | 56 | verifyObject(IConnectionToRegion, FakeConnectionToRegion()) | 58 | verifyObject(IConnectionToRegion, FakeConnectionToRegion()) |
714 | 57 | 59 | ||
715 | 58 | 60 | ||
716 | 61 | @attr.s(eq=False, order=False) | ||
717 | 62 | @implementer(IConnectionToRegion) | ||
718 | 63 | class FakeBusyConnectionToRegion: | ||
719 | 64 | "A fake `IConnectionToRegion` that appears busy." "" | ||
720 | 65 | |||
721 | 66 | ident = attr.ib(default=sentinel.ident) | ||
722 | 67 | localIdent = attr.ib(default=sentinel.localIdent) | ||
723 | 68 | address = attr.ib(default=(sentinel.host, sentinel.port)) | ||
724 | 69 | hostCertificate = attr.ib(default=sentinel.hostCertificate) | ||
725 | 70 | peerCertificate = attr.ib(default=sentinel.peerCertificate) | ||
726 | 71 | in_use = attr.ib(default=True) | ||
727 | 72 | |||
728 | 73 | def callRemote(self, cmd, **arguments): | ||
729 | 74 | return succeed(sentinel.response) | ||
730 | 75 | |||
731 | 76 | |||
732 | 59 | class StubOS(OperatingSystem): | 77 | class StubOS(OperatingSystem): |
733 | 60 | """An :py:class:`OperatingSystem` subclass that has canned answers. | 78 | """An :py:class:`OperatingSystem` subclass that has canned answers. |
734 | 61 | 79 | ||
735 | diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py | |||
736 | index b50311d..6f3e4f9 100644 | |||
737 | --- a/src/provisioningserver/rpc/tests/test_clusterservice.py | |||
738 | +++ b/src/provisioningserver/rpc/tests/test_clusterservice.py | |||
739 | @@ -23,7 +23,6 @@ from testtools.matchers import ( | |||
740 | 23 | Is, | 23 | Is, |
741 | 24 | IsInstance, | 24 | IsInstance, |
742 | 25 | KeysEqual, | 25 | KeysEqual, |
743 | 26 | MatchesAll, | ||
744 | 27 | MatchesDict, | 26 | MatchesDict, |
745 | 28 | MatchesListwise, | 27 | MatchesListwise, |
746 | 29 | MatchesStructure, | 28 | MatchesStructure, |
747 | @@ -32,7 +31,6 @@ from twisted import web | |||
748 | 32 | from twisted.application.internet import TimerService | 31 | from twisted.application.internet import TimerService |
749 | 33 | from twisted.internet import error, reactor | 32 | from twisted.internet import error, reactor |
750 | 34 | from twisted.internet.defer import Deferred, fail, inlineCallbacks, succeed | 33 | from twisted.internet.defer import Deferred, fail, inlineCallbacks, succeed |
751 | 35 | from twisted.internet.endpoints import TCP6ClientEndpoint | ||
752 | 36 | from twisted.internet.error import ConnectionClosed | 34 | from twisted.internet.error import ConnectionClosed |
753 | 37 | from twisted.internet.task import Clock | 35 | from twisted.internet.task import Clock |
754 | 38 | from twisted.internet.testing import StringTransportWithDisconnection | 36 | from twisted.internet.testing import StringTransportWithDisconnection |
755 | @@ -117,7 +115,11 @@ from provisioningserver.rpc.testing import ( | |||
756 | 117 | call_responder, | 115 | call_responder, |
757 | 118 | MockLiveClusterToRegionRPCFixture, | 116 | MockLiveClusterToRegionRPCFixture, |
758 | 119 | ) | 117 | ) |
760 | 120 | from provisioningserver.rpc.testing.doubles import DummyConnection, StubOS | 118 | from provisioningserver.rpc.testing.doubles import ( |
761 | 119 | FakeBusyConnectionToRegion, | ||
762 | 120 | FakeConnection, | ||
763 | 121 | StubOS, | ||
764 | 122 | ) | ||
765 | 121 | from provisioningserver.security import set_shared_secret_on_filesystem | 123 | from provisioningserver.security import set_shared_secret_on_filesystem |
766 | 122 | from provisioningserver.service_monitor import service_monitor | 124 | from provisioningserver.service_monitor import service_monitor |
767 | 123 | from provisioningserver.testing.config import ClusterConfigurationFixture | 125 | from provisioningserver.testing.config import ClusterConfigurationFixture |
768 | @@ -444,8 +446,10 @@ class TestClusterProtocol_DescribePowerTypes(MAASTestCase): | |||
769 | 444 | ) | 446 | ) |
770 | 445 | 447 | ||
771 | 446 | 448 | ||
774 | 447 | def make_inert_client_service(): | 449 | def make_inert_client_service(max_idle_conns=1, max_conns=1, keepalive=1): |
775 | 448 | service = ClusterClientService(Clock()) | 450 | service = ClusterClientService( |
776 | 451 | Clock(), max_idle_conns, max_conns, keepalive | ||
777 | 452 | ) | ||
778 | 449 | # ClusterClientService's superclass, TimerService, creates a | 453 | # ClusterClientService's superclass, TimerService, creates a |
779 | 450 | # LoopingCall with now=True. We neuter it here to allow | 454 | # LoopingCall with now=True. We neuter it here to allow |
780 | 451 | # observation of the behaviour of _update_interval() for | 455 | # observation of the behaviour of _update_interval() for |
781 | @@ -498,11 +502,11 @@ class TestClusterClientService(MAASTestCase): | |||
782 | 498 | ) | 502 | ) |
783 | 499 | 503 | ||
784 | 500 | # Fake some connections. | 504 | # Fake some connections. |
790 | 501 | service.connections = { | 505 | service.connections.connections = { |
791 | 502 | ipv4client.eventloop: ipv4client, | 506 | ipv4client.eventloop: [ipv4client], |
792 | 503 | ipv6client.eventloop: ipv6client, | 507 | ipv6client.eventloop: [ipv6client], |
793 | 504 | ipv6mapped.eventloop: ipv6mapped, | 508 | ipv6mapped.eventloop: [ipv6mapped], |
794 | 505 | hostclient.eventloop: hostclient, | 509 | hostclient.eventloop: [hostclient], |
795 | 506 | } | 510 | } |
796 | 507 | 511 | ||
797 | 508 | # Update the RPC state to the filesystem and info cache. | 512 | # Update the RPC state to the filesystem and info cache. |
798 | @@ -515,7 +519,8 @@ class TestClusterClientService(MAASTestCase): | |||
799 | 515 | Equals( | 519 | Equals( |
800 | 516 | { | 520 | { |
801 | 517 | client.address[0] | 521 | client.address[0] |
803 | 518 | for _, client in service.connections.items() | 522 | for _, clients in service.connections.items() |
804 | 523 | for client in clients | ||
805 | 519 | } | 524 | } |
806 | 520 | ), | 525 | ), |
807 | 521 | ) | 526 | ) |
808 | @@ -999,9 +1004,9 @@ class TestClusterClientService(MAASTestCase): | |||
809 | 999 | def test_update_connections_initially(self): | 1004 | def test_update_connections_initially(self): |
810 | 1000 | service = ClusterClientService(Clock()) | 1005 | service = ClusterClientService(Clock()) |
811 | 1001 | mock_client = Mock() | 1006 | mock_client = Mock() |
813 | 1002 | _make_connection = self.patch(service, "_make_connection") | 1007 | _make_connection = self.patch(service.connections, "connect") |
814 | 1003 | _make_connection.side_effect = lambda *args: succeed(mock_client) | 1008 | _make_connection.side_effect = lambda *args: succeed(mock_client) |
816 | 1004 | _drop_connection = self.patch(service, "_drop_connection") | 1009 | _drop_connection = self.patch(service.connections, "disconnect") |
817 | 1005 | 1010 | ||
818 | 1006 | info = json.loads(self.example_rpc_info_view_response.decode("ascii")) | 1011 | info = json.loads(self.example_rpc_info_view_response.decode("ascii")) |
819 | 1007 | yield service._update_connections(info["eventloops"]) | 1012 | yield service._update_connections(info["eventloops"]) |
820 | @@ -1020,7 +1025,7 @@ class TestClusterClientService(MAASTestCase): | |||
821 | 1020 | "host1:pid=2002": mock_client, | 1025 | "host1:pid=2002": mock_client, |
822 | 1021 | "host2:pid=3003": mock_client, | 1026 | "host2:pid=3003": mock_client, |
823 | 1022 | }, | 1027 | }, |
825 | 1023 | service.try_connections, | 1028 | service.connections.try_connections, |
826 | 1024 | ) | 1029 | ) |
827 | 1025 | 1030 | ||
828 | 1026 | self.assertEqual([], _drop_connection.mock_calls) | 1031 | self.assertEqual([], _drop_connection.mock_calls) |
829 | @@ -1038,7 +1043,7 @@ class TestClusterClientService(MAASTestCase): | |||
830 | 1038 | for address in addresses: | 1043 | for address in addresses: |
831 | 1039 | client = Mock() | 1044 | client = Mock() |
832 | 1040 | client.address = address | 1045 | client.address = address |
834 | 1041 | service.connections[eventloop] = client | 1046 | service.connections.connections[eventloop] = [client] |
835 | 1042 | 1047 | ||
836 | 1043 | logger = self.useFixture(TwistedLoggerFixture()) | 1048 | logger = self.useFixture(TwistedLoggerFixture()) |
837 | 1044 | 1049 | ||
838 | @@ -1055,7 +1060,7 @@ class TestClusterClientService(MAASTestCase): | |||
839 | 1055 | @inlineCallbacks | 1060 | @inlineCallbacks |
840 | 1056 | def test_update_connections_connect_error_is_logged_tersely(self): | 1061 | def test_update_connections_connect_error_is_logged_tersely(self): |
841 | 1057 | service = ClusterClientService(Clock()) | 1062 | service = ClusterClientService(Clock()) |
843 | 1058 | _make_connection = self.patch(service, "_make_connection") | 1063 | _make_connection = self.patch(service.connections, "connect") |
844 | 1059 | _make_connection.side_effect = error.ConnectionRefusedError() | 1064 | _make_connection.side_effect = error.ConnectionRefusedError() |
845 | 1060 | 1065 | ||
846 | 1061 | logger = self.useFixture(TwistedLoggerFixture()) | 1066 | logger = self.useFixture(TwistedLoggerFixture()) |
847 | @@ -1079,7 +1084,7 @@ class TestClusterClientService(MAASTestCase): | |||
848 | 1079 | @inlineCallbacks | 1084 | @inlineCallbacks |
849 | 1080 | def test_update_connections_unknown_error_is_logged_with_stack(self): | 1085 | def test_update_connections_unknown_error_is_logged_with_stack(self): |
850 | 1081 | service = ClusterClientService(Clock()) | 1086 | service = ClusterClientService(Clock()) |
852 | 1082 | _make_connection = self.patch(service, "_make_connection") | 1087 | _make_connection = self.patch(service.connections, "connect") |
853 | 1083 | _make_connection.side_effect = RuntimeError("Something went wrong.") | 1088 | _make_connection.side_effect = RuntimeError("Something went wrong.") |
854 | 1084 | 1089 | ||
855 | 1085 | logger = self.useFixture(TwistedLoggerFixture()) | 1090 | logger = self.useFixture(TwistedLoggerFixture()) |
856 | @@ -1106,8 +1111,8 @@ class TestClusterClientService(MAASTestCase): | |||
857 | 1106 | 1111 | ||
858 | 1107 | def test_update_connections_when_there_are_existing_connections(self): | 1112 | def test_update_connections_when_there_are_existing_connections(self): |
859 | 1108 | service = ClusterClientService(Clock()) | 1113 | service = ClusterClientService(Clock()) |
862 | 1109 | _make_connection = self.patch(service, "_make_connection") | 1114 | _connect = self.patch(service.connections, "connect") |
863 | 1110 | _drop_connection = self.patch(service, "_drop_connection") | 1115 | _disconnect = self.patch(service.connections, "disconnect") |
864 | 1111 | 1116 | ||
865 | 1112 | host1client = ClusterClient( | 1117 | host1client = ClusterClient( |
866 | 1113 | ("::ffff:1.1.1.1", 1111), "host1:pid=1", service | 1118 | ("::ffff:1.1.1.1", 1111), "host1:pid=1", service |
867 | @@ -1120,9 +1125,9 @@ class TestClusterClientService(MAASTestCase): | |||
868 | 1120 | ) | 1125 | ) |
869 | 1121 | 1126 | ||
870 | 1122 | # Fake some connections. | 1127 | # Fake some connections. |
874 | 1123 | service.connections = { | 1128 | service.connections.connections = { |
875 | 1124 | host1client.eventloop: host1client, | 1129 | host1client.eventloop: [host1client], |
876 | 1125 | host2client.eventloop: host2client, | 1130 | host2client.eventloop: [host2client], |
877 | 1126 | } | 1131 | } |
878 | 1127 | 1132 | ||
879 | 1128 | # Request a new set of connections that overlaps with the | 1133 | # Request a new set of connections that overlaps with the |
880 | @@ -1137,10 +1142,10 @@ class TestClusterClientService(MAASTestCase): | |||
881 | 1137 | # A connection is made for host3's event-loop, and the | 1142 | # A connection is made for host3's event-loop, and the |
882 | 1138 | # connection to host2's event-loop is dropped. | 1143 | # connection to host2's event-loop is dropped. |
883 | 1139 | self.assertThat( | 1144 | self.assertThat( |
885 | 1140 | _make_connection, | 1145 | _connect, |
886 | 1141 | MockCalledOnceWith(host3client.eventloop, host3client.address), | 1146 | MockCalledOnceWith(host3client.eventloop, host3client.address), |
887 | 1142 | ) | 1147 | ) |
889 | 1143 | self.assertThat(_drop_connection, MockCalledWith(host2client)) | 1148 | self.assertThat(_disconnect, MockCalledWith(host2client)) |
890 | 1144 | 1149 | ||
891 | 1145 | @inlineCallbacks | 1150 | @inlineCallbacks |
892 | 1146 | def test_update_only_updates_interval_when_eventloops_are_unknown(self): | 1151 | def test_update_only_updates_interval_when_eventloops_are_unknown(self): |
893 | @@ -1175,57 +1180,15 @@ class TestClusterClientService(MAASTestCase): | |||
894 | 1175 | logger.dump(), | 1180 | logger.dump(), |
895 | 1176 | ) | 1181 | ) |
896 | 1177 | 1182 | ||
897 | 1178 | def test_make_connection(self): | ||
898 | 1179 | service = ClusterClientService(Clock()) | ||
899 | 1180 | connectProtocol = self.patch(clusterservice, "connectProtocol") | ||
900 | 1181 | service._make_connection("an-event-loop", ("a.example.com", 1111)) | ||
901 | 1182 | self.assertThat(connectProtocol.call_args_list, HasLength(1)) | ||
902 | 1183 | self.assertThat( | ||
903 | 1184 | connectProtocol.call_args_list[0][0], | ||
904 | 1185 | MatchesListwise( | ||
905 | 1186 | ( | ||
906 | 1187 | # First argument is an IPv4 TCP client endpoint | ||
907 | 1188 | # specification. | ||
908 | 1189 | MatchesAll( | ||
909 | 1190 | IsInstance(TCP6ClientEndpoint), | ||
910 | 1191 | MatchesStructure.byEquality( | ||
911 | 1192 | _reactor=service.clock, | ||
912 | 1193 | _host="a.example.com", | ||
913 | 1194 | _port=1111, | ||
914 | 1195 | ), | ||
915 | 1196 | ), | ||
916 | 1197 | # Second argument is a ClusterClient instance, the | ||
917 | 1198 | # protocol to use for the connection. | ||
918 | 1199 | MatchesAll( | ||
919 | 1200 | IsInstance(clusterservice.ClusterClient), | ||
920 | 1201 | MatchesStructure.byEquality( | ||
921 | 1202 | address=("a.example.com", 1111), | ||
922 | 1203 | eventloop="an-event-loop", | ||
923 | 1204 | service=service, | ||
924 | 1205 | ), | ||
925 | 1206 | ), | ||
926 | 1207 | ) | ||
927 | 1208 | ), | ||
928 | 1209 | ) | ||
929 | 1210 | |||
930 | 1211 | def test_drop_connection(self): | ||
931 | 1212 | connection = Mock() | ||
932 | 1213 | service = make_inert_client_service() | ||
933 | 1214 | service.startService() | ||
934 | 1215 | service._drop_connection(connection) | ||
935 | 1216 | self.assertThat( | ||
936 | 1217 | connection.transport.loseConnection, MockCalledOnceWith() | ||
937 | 1218 | ) | ||
938 | 1219 | |||
939 | 1220 | def test_add_connection_removes_from_try_connections(self): | 1183 | def test_add_connection_removes_from_try_connections(self): |
940 | 1221 | service = make_inert_client_service() | 1184 | service = make_inert_client_service() |
941 | 1222 | service.startService() | 1185 | service.startService() |
942 | 1223 | endpoint = Mock() | 1186 | endpoint = Mock() |
943 | 1224 | connection = Mock() | 1187 | connection = Mock() |
944 | 1225 | connection.address = (":::ffff", 2222) | 1188 | connection.address = (":::ffff", 2222) |
946 | 1226 | service.try_connections[endpoint] = connection | 1189 | service.connections.try_connections[endpoint] = connection |
947 | 1227 | service.add_connection(endpoint, connection) | 1190 | service.add_connection(endpoint, connection) |
949 | 1228 | self.assertThat(service.try_connections, Equals({})) | 1191 | self.assertThat(service.connections.try_connections, Equals({})) |
950 | 1229 | 1192 | ||
951 | 1230 | def test_add_connection_adds_to_connections(self): | 1193 | def test_add_connection_adds_to_connections(self): |
952 | 1231 | service = make_inert_client_service() | 1194 | service = make_inert_client_service() |
953 | @@ -1234,7 +1197,7 @@ class TestClusterClientService(MAASTestCase): | |||
954 | 1234 | connection = Mock() | 1197 | connection = Mock() |
955 | 1235 | connection.address = (":::ffff", 2222) | 1198 | connection.address = (":::ffff", 2222) |
956 | 1236 | service.add_connection(endpoint, connection) | 1199 | service.add_connection(endpoint, connection) |
958 | 1237 | self.assertThat(service.connections, Equals({endpoint: connection})) | 1200 | self.assertEqual(service.connections, {endpoint: [connection]}) |
959 | 1238 | 1201 | ||
960 | 1239 | def test_add_connection_calls__update_saved_rpc_info_state(self): | 1202 | def test_add_connection_calls__update_saved_rpc_info_state(self): |
961 | 1240 | service = make_inert_client_service() | 1203 | service = make_inert_client_service() |
962 | @@ -1248,21 +1211,45 @@ class TestClusterClientService(MAASTestCase): | |||
963 | 1248 | service._update_saved_rpc_info_state, MockCalledOnceWith() | 1211 | service._update_saved_rpc_info_state, MockCalledOnceWith() |
964 | 1249 | ) | 1212 | ) |
965 | 1250 | 1213 | ||
966 | 1214 | def test_add_connection_creates_max_idle_connections(self): | ||
967 | 1215 | service = make_inert_client_service(max_idle_conns=2) | ||
968 | 1216 | service.startService() | ||
969 | 1217 | endpoint = Mock() | ||
970 | 1218 | connection = Mock() | ||
971 | 1219 | connection.address = (":::ffff", 2222) | ||
972 | 1220 | connection2 = Mock() | ||
973 | 1221 | connection.address = (":::ffff", 2222) | ||
974 | 1222 | self.patch(service.connections, "connect").return_value = succeed( | ||
975 | 1223 | connection2 | ||
976 | 1224 | ) | ||
977 | 1225 | self.patch_autospec(service, "_update_saved_rpc_info_state") | ||
978 | 1226 | service.add_connection(endpoint, connection) | ||
979 | 1227 | self.assertEqual( | ||
980 | 1228 | len( | ||
981 | 1229 | [ | ||
982 | 1230 | conn | ||
983 | 1231 | for conns in service.connections.values() | ||
984 | 1232 | for conn in conns | ||
985 | 1233 | ] | ||
986 | 1234 | ), | ||
987 | 1235 | service.connections._max_idle_connections, | ||
988 | 1236 | ) | ||
989 | 1237 | |||
990 | 1251 | def test_remove_connection_removes_from_try_connections(self): | 1238 | def test_remove_connection_removes_from_try_connections(self): |
991 | 1252 | service = make_inert_client_service() | 1239 | service = make_inert_client_service() |
992 | 1253 | service.startService() | 1240 | service.startService() |
993 | 1254 | endpoint = Mock() | 1241 | endpoint = Mock() |
994 | 1255 | connection = Mock() | 1242 | connection = Mock() |
996 | 1256 | service.try_connections[endpoint] = connection | 1243 | service.connections.try_connections[endpoint] = connection |
997 | 1257 | service.remove_connection(endpoint, connection) | 1244 | service.remove_connection(endpoint, connection) |
999 | 1258 | self.assertThat(service.try_connections, Equals({})) | 1245 | self.assertEqual(service.connections.try_connections, {}) |
1000 | 1259 | 1246 | ||
1001 | 1260 | def test_remove_connection_removes_from_connections(self): | 1247 | def test_remove_connection_removes_from_connections(self): |
1002 | 1261 | service = make_inert_client_service() | 1248 | service = make_inert_client_service() |
1003 | 1262 | service.startService() | 1249 | service.startService() |
1004 | 1263 | endpoint = Mock() | 1250 | endpoint = Mock() |
1005 | 1264 | connection = Mock() | 1251 | connection = Mock() |
1007 | 1265 | service.connections[endpoint] = connection | 1252 | service.connections[endpoint] = {connection} |
1008 | 1266 | service.remove_connection(endpoint, connection) | 1253 | service.remove_connection(endpoint, connection) |
1009 | 1267 | self.assertThat(service.connections, Equals({})) | 1254 | self.assertThat(service.connections, Equals({})) |
1010 | 1268 | 1255 | ||
1011 | @@ -1271,7 +1258,7 @@ class TestClusterClientService(MAASTestCase): | |||
1012 | 1271 | service.startService() | 1258 | service.startService() |
1013 | 1272 | endpoint = Mock() | 1259 | endpoint = Mock() |
1014 | 1273 | connection = Mock() | 1260 | connection = Mock() |
1016 | 1274 | service.connections[endpoint] = connection | 1261 | service.connections[endpoint] = {connection} |
1017 | 1275 | service.remove_connection(endpoint, connection) | 1262 | service.remove_connection(endpoint, connection) |
1018 | 1276 | self.assertEqual(service.step, service.INTERVAL_LOW) | 1263 | self.assertEqual(service.step, service.INTERVAL_LOW) |
1019 | 1277 | 1264 | ||
1020 | @@ -1280,7 +1267,7 @@ class TestClusterClientService(MAASTestCase): | |||
1021 | 1280 | service.startService() | 1267 | service.startService() |
1022 | 1281 | endpoint = Mock() | 1268 | endpoint = Mock() |
1023 | 1282 | connection = Mock() | 1269 | connection = Mock() |
1025 | 1283 | service.connections[endpoint] = connection | 1270 | service.connections[endpoint] = {connection} |
1026 | 1284 | 1271 | ||
1027 | 1285 | # Enable both dhcpd and dhcpd6. | 1272 | # Enable both dhcpd and dhcpd6. |
1028 | 1286 | service_monitor.getServiceByName("dhcpd").on() | 1273 | service_monitor.getServiceByName("dhcpd").on() |
1029 | @@ -1294,45 +1281,96 @@ class TestClusterClientService(MAASTestCase): | |||
1030 | 1294 | 1281 | ||
1031 | 1295 | def test_getClient(self): | 1282 | def test_getClient(self): |
1032 | 1296 | service = ClusterClientService(Clock()) | 1283 | service = ClusterClientService(Clock()) |
1037 | 1297 | service.connections = { | 1284 | service.connections.connections = { |
1038 | 1298 | sentinel.eventloop01: DummyConnection(), | 1285 | sentinel.eventloop01: [FakeConnection()], |
1039 | 1299 | sentinel.eventloop02: DummyConnection(), | 1286 | sentinel.eventloop02: [FakeConnection()], |
1040 | 1300 | sentinel.eventloop03: DummyConnection(), | 1287 | sentinel.eventloop03: [FakeConnection()], |
1041 | 1301 | } | 1288 | } |
1042 | 1302 | self.assertIn( | 1289 | self.assertIn( |
1043 | 1303 | service.getClient(), | 1290 | service.getClient(), |
1045 | 1304 | {common.Client(conn) for conn in service.connections.values()}, | 1291 | { |
1046 | 1292 | common.Client(conn) | ||
1047 | 1293 | for conns in service.connections.values() | ||
1048 | 1294 | for conn in conns | ||
1049 | 1295 | }, | ||
1050 | 1305 | ) | 1296 | ) |
1051 | 1306 | 1297 | ||
1052 | 1307 | def test_getClient_when_there_are_no_connections(self): | 1298 | def test_getClient_when_there_are_no_connections(self): |
1053 | 1308 | service = ClusterClientService(Clock()) | 1299 | service = ClusterClientService(Clock()) |
1055 | 1309 | service.connections = {} | 1300 | service.connections.connections = {} |
1056 | 1310 | self.assertRaises(exceptions.NoConnectionsAvailable, service.getClient) | 1301 | self.assertRaises(exceptions.NoConnectionsAvailable, service.getClient) |
1057 | 1311 | 1302 | ||
1058 | 1312 | @inlineCallbacks | 1303 | @inlineCallbacks |
1059 | 1304 | def test_getClientNow_scales_connections_when_busy(self): | ||
1060 | 1305 | service = ClusterClientService(Clock(), max_conns=2) | ||
1061 | 1306 | service.connections.connections = { | ||
1062 | 1307 | sentinel.eventloop01: [FakeBusyConnectionToRegion()], | ||
1063 | 1308 | sentinel.eventloop02: [FakeBusyConnectionToRegion()], | ||
1064 | 1309 | sentinel.eventloop03: [FakeBusyConnectionToRegion()], | ||
1065 | 1310 | } | ||
1066 | 1311 | self.patch(service.connections, "connect").return_value = succeed( | ||
1067 | 1312 | FakeConnection() | ||
1068 | 1313 | ) | ||
1069 | 1314 | original_conns = [ | ||
1070 | 1315 | conn for conns in service.connections.values() for conn in conns | ||
1071 | 1316 | ] | ||
1072 | 1317 | new_client = yield service.getClientNow() | ||
1073 | 1318 | new_conn = new_client._conn | ||
1074 | 1319 | self.assertIsNotNone(new_conn) | ||
1075 | 1320 | self.assertNotIn(new_conn, original_conns) | ||
1076 | 1321 | self.assertIn( | ||
1077 | 1322 | new_conn, | ||
1078 | 1323 | [conn for conns in service.connections.values() for conn in conns], | ||
1079 | 1324 | ) | ||
1080 | 1325 | |||
1081 | 1326 | @inlineCallbacks | ||
1082 | 1327 | def test_getClientNow_returns_an_existing_connection_when_max_are_open( | ||
1083 | 1328 | self, | ||
1084 | 1329 | ): | ||
1085 | 1330 | service = ClusterClientService(Clock(), max_conns=1) | ||
1086 | 1331 | service.connections.connections = { | ||
1087 | 1332 | sentinel.eventloop01: [FakeBusyConnectionToRegion()], | ||
1088 | 1333 | sentinel.eventloop02: [FakeBusyConnectionToRegion()], | ||
1089 | 1334 | sentinel.eventloop03: [FakeBusyConnectionToRegion()], | ||
1090 | 1335 | } | ||
1091 | 1336 | self.patch(service, "_make_connection").return_value = succeed( | ||
1092 | 1337 | FakeConnection() | ||
1093 | 1338 | ) | ||
1094 | 1339 | original_conns = [ | ||
1095 | 1340 | conn for conns in service.connections.values() for conn in conns | ||
1096 | 1341 | ] | ||
1097 | 1342 | new_client = yield service.getClientNow() | ||
1098 | 1343 | new_conn = new_client._conn | ||
1099 | 1344 | self.assertIsNotNone(new_conn) | ||
1100 | 1345 | self.assertIn(new_conn, original_conns) | ||
1101 | 1346 | |||
1102 | 1347 | @inlineCallbacks | ||
1103 | 1313 | def test_getClientNow_returns_current_connection(self): | 1348 | def test_getClientNow_returns_current_connection(self): |
1104 | 1314 | service = ClusterClientService(Clock()) | 1349 | service = ClusterClientService(Clock()) |
1109 | 1315 | service.connections = { | 1350 | service.connections.connections = { |
1110 | 1316 | sentinel.eventloop01: DummyConnection(), | 1351 | sentinel.eventloop01: [FakeConnection()], |
1111 | 1317 | sentinel.eventloop02: DummyConnection(), | 1352 | sentinel.eventloop02: [FakeConnection()], |
1112 | 1318 | sentinel.eventloop03: DummyConnection(), | 1353 | sentinel.eventloop03: [FakeConnection()], |
1113 | 1319 | } | 1354 | } |
1114 | 1320 | client = yield service.getClientNow() | 1355 | client = yield service.getClientNow() |
1115 | 1321 | self.assertIn( | 1356 | self.assertIn( |
1116 | 1322 | client, | 1357 | client, |
1118 | 1323 | {common.Client(conn) for conn in service.connections.values()}, | 1358 | [ |
1119 | 1359 | common.Client(conn) | ||
1120 | 1360 | for conns in service.connections.values() | ||
1121 | 1361 | for conn in conns | ||
1122 | 1362 | ], | ||
1123 | 1324 | ) | 1363 | ) |
1124 | 1325 | 1364 | ||
1125 | 1326 | @inlineCallbacks | 1365 | @inlineCallbacks |
1126 | 1327 | def test_getClientNow_calls__tryUpdate_when_there_are_no_connections(self): | 1366 | def test_getClientNow_calls__tryUpdate_when_there_are_no_connections(self): |
1127 | 1328 | service = ClusterClientService(Clock()) | 1367 | service = ClusterClientService(Clock()) |
1128 | 1329 | service.connections = {} | ||
1129 | 1330 | 1368 | ||
1130 | 1331 | def addConnections(): | 1369 | def addConnections(): |
1135 | 1332 | service.connections = { | 1370 | service.connections.connections = { |
1136 | 1333 | sentinel.eventloop01: DummyConnection(), | 1371 | sentinel.eventloop01: [FakeConnection()], |
1137 | 1334 | sentinel.eventloop02: DummyConnection(), | 1372 | sentinel.eventloop02: [FakeConnection()], |
1138 | 1335 | sentinel.eventloop03: DummyConnection(), | 1373 | sentinel.eventloop03: [FakeConnection()], |
1139 | 1336 | } | 1374 | } |
1140 | 1337 | return succeed(None) | 1375 | return succeed(None) |
1141 | 1338 | 1376 | ||
1142 | @@ -1340,12 +1378,15 @@ class TestClusterClientService(MAASTestCase): | |||
1143 | 1340 | client = yield service.getClientNow() | 1378 | client = yield service.getClientNow() |
1144 | 1341 | self.assertIn( | 1379 | self.assertIn( |
1145 | 1342 | client, | 1380 | client, |
1147 | 1343 | {common.Client(conn) for conn in service.connections.values()}, | 1381 | { |
1148 | 1382 | common.Client(conn) | ||
1149 | 1383 | for conns in service.connections.values() | ||
1150 | 1384 | for conn in conns | ||
1151 | 1385 | }, | ||
1152 | 1344 | ) | 1386 | ) |
1153 | 1345 | 1387 | ||
1154 | 1346 | def test_getClientNow_raises_exception_when_no_clients(self): | 1388 | def test_getClientNow_raises_exception_when_no_clients(self): |
1155 | 1347 | service = ClusterClientService(Clock()) | 1389 | service = ClusterClientService(Clock()) |
1156 | 1348 | service.connections = {} | ||
1157 | 1349 | 1390 | ||
1158 | 1350 | self.patch(service, "_tryUpdate").return_value = succeed(None) | 1391 | self.patch(service, "_tryUpdate").return_value = succeed(None) |
1159 | 1351 | d = service.getClientNow() | 1392 | d = service.getClientNow() |
1160 | @@ -1383,17 +1424,16 @@ class TestClusterClientService(MAASTestCase): | |||
1161 | 1383 | def test_getAllClients(self): | 1424 | def test_getAllClients(self): |
1162 | 1384 | service = ClusterClientService(Clock()) | 1425 | service = ClusterClientService(Clock()) |
1163 | 1385 | uuid1 = factory.make_UUID() | 1426 | uuid1 = factory.make_UUID() |
1166 | 1386 | c1 = DummyConnection() | 1427 | c1 = FakeConnection() |
1167 | 1387 | service.connections[uuid1] = c1 | 1428 | service.connections[uuid1] = {c1} |
1168 | 1388 | uuid2 = factory.make_UUID() | 1429 | uuid2 = factory.make_UUID() |
1171 | 1389 | c2 = DummyConnection() | 1430 | c2 = FakeConnection() |
1172 | 1390 | service.connections[uuid2] = c2 | 1431 | service.connections[uuid2] = {c2} |
1173 | 1391 | clients = service.getAllClients() | 1432 | clients = service.getAllClients() |
1174 | 1392 | self.assertEqual(clients, [common.Client(c1), common.Client(c2)]) | 1433 | self.assertEqual(clients, [common.Client(c1), common.Client(c2)]) |
1175 | 1393 | 1434 | ||
1176 | 1394 | def test_getAllClients_when_there_are_no_connections(self): | 1435 | def test_getAllClients_when_there_are_no_connections(self): |
1177 | 1395 | service = ClusterClientService(Clock()) | 1436 | service = ClusterClientService(Clock()) |
1178 | 1396 | service.connections = {} | ||
1179 | 1397 | self.assertThat(service.getAllClients(), Equals([])) | 1437 | self.assertThat(service.getAllClients(), Equals([])) |
1180 | 1398 | 1438 | ||
1181 | 1399 | 1439 | ||
1182 | @@ -1546,7 +1586,7 @@ class TestClusterClient(MAASTestCase): | |||
1183 | 1546 | 1586 | ||
1184 | 1547 | def test_connecting(self): | 1587 | def test_connecting(self): |
1185 | 1548 | client = self.make_running_client() | 1588 | client = self.make_running_client() |
1187 | 1549 | client.service.try_connections[client.eventloop] = client | 1589 | client.service.connections.try_connections[client.eventloop] = client |
1188 | 1550 | self.patch_authenticate_for_success(client) | 1590 | self.patch_authenticate_for_success(client) |
1189 | 1551 | self.patch_register_for_success(client) | 1591 | self.patch_register_for_success(client) |
1190 | 1552 | self.assertEqual(client.service.connections, {}) | 1592 | self.assertEqual(client.service.connections, {}) |
1191 | @@ -1560,16 +1600,19 @@ class TestClusterClient(MAASTestCase): | |||
1192 | 1560 | self.assertTrue(extract_result(wait_for_authenticated)) | 1600 | self.assertTrue(extract_result(wait_for_authenticated)) |
1193 | 1561 | # ready has been set with the name of the event-loop. | 1601 | # ready has been set with the name of the event-loop. |
1194 | 1562 | self.assertEqual(client.eventloop, extract_result(wait_for_ready)) | 1602 | self.assertEqual(client.eventloop, extract_result(wait_for_ready)) |
1196 | 1563 | self.assertEqual(client.service.try_connections, {}) | 1603 | self.assertEqual(len(client.service.connections.try_connections), 0) |
1197 | 1564 | self.assertEqual( | 1604 | self.assertEqual( |
1199 | 1565 | client.service.connections, {client.eventloop: client} | 1605 | client.service.connections.connections, |
1200 | 1606 | {client.eventloop: [client]}, | ||
1201 | 1566 | ) | 1607 | ) |
1202 | 1567 | 1608 | ||
1203 | 1568 | def test_disconnects_when_there_is_an_existing_connection(self): | 1609 | def test_disconnects_when_there_is_an_existing_connection(self): |
1204 | 1569 | client = self.make_running_client() | 1610 | client = self.make_running_client() |
1205 | 1570 | 1611 | ||
1206 | 1571 | # Pretend that a connection already exists for this address. | 1612 | # Pretend that a connection already exists for this address. |
1208 | 1572 | client.service.connections[client.eventloop] = sentinel.connection | 1613 | client.service.connections.connections[client.eventloop] = [ |
1209 | 1614 | sentinel.connection | ||
1210 | 1615 | ] | ||
1211 | 1573 | 1616 | ||
1212 | 1574 | # Connect via an in-memory transport. | 1617 | # Connect via an in-memory transport. |
1213 | 1575 | transport = StringTransportWithDisconnection() | 1618 | transport = StringTransportWithDisconnection() |
1214 | @@ -1586,7 +1629,8 @@ class TestClusterClient(MAASTestCase): | |||
1215 | 1586 | # The connections list is unchanged because the new connection | 1629 | # The connections list is unchanged because the new connection |
1216 | 1587 | # immediately disconnects. | 1630 | # immediately disconnects. |
1217 | 1588 | self.assertEqual( | 1631 | self.assertEqual( |
1219 | 1589 | client.service.connections, {client.eventloop: sentinel.connection} | 1632 | client.service.connections, |
1220 | 1633 | {client.eventloop: [sentinel.connection]}, | ||
1221 | 1590 | ) | 1634 | ) |
1222 | 1591 | self.assertFalse(client.connected) | 1635 | self.assertFalse(client.connected) |
1223 | 1592 | self.assertIsNone(client.transport) | 1636 | self.assertIsNone(client.transport) |
1224 | @@ -1631,7 +1675,7 @@ class TestClusterClient(MAASTestCase): | |||
1225 | 1631 | 1675 | ||
1226 | 1632 | # The connections list is unchanged because the new connection | 1676 | # The connections list is unchanged because the new connection |
1227 | 1633 | # immediately disconnects. | 1677 | # immediately disconnects. |
1229 | 1634 | self.assertEqual(client.service.connections, {}) | 1678 | self.assertEqual(client.service.connections.connections, {}) |
1230 | 1635 | self.assertFalse(client.connected) | 1679 | self.assertFalse(client.connected) |
1231 | 1636 | 1680 | ||
1232 | 1637 | def test_disconnects_when_authentication_errors(self): | 1681 | def test_disconnects_when_authentication_errors(self): |
1233 | diff --git a/src/provisioningserver/rpc/tests/test_connectionpool.py b/src/provisioningserver/rpc/tests/test_connectionpool.py | |||
1234 | 1638 | new file mode 100644 | 1682 | new file mode 100644 |
1235 | index 0000000..692d5e6 | |||
1236 | --- /dev/null | |||
1237 | +++ b/src/provisioningserver/rpc/tests/test_connectionpool.py | |||
1238 | @@ -0,0 +1,280 @@ | |||
1239 | 1 | # Copyright 2022 Canonical Ltd. This software is licensed under the | ||
1240 | 2 | # GNU Affero General Public License version 3 (see the file LICENSE). | ||
1241 | 3 | |||
1242 | 4 | from unittest.mock import Mock | ||
1243 | 5 | |||
1244 | 6 | from twisted.internet.defer import inlineCallbacks, succeed | ||
1245 | 7 | from twisted.internet.endpoints import TCP6ClientEndpoint | ||
1246 | 8 | from twisted.internet.task import Clock | ||
1247 | 9 | |||
1248 | 10 | from maastesting import get_testing_timeout | ||
1249 | 11 | from maastesting.testcase import MAASTestCase, MAASTwistedRunTest | ||
1250 | 12 | from maastesting.twisted import extract_result | ||
1251 | 13 | from provisioningserver.rpc import connectionpool as connectionpoolModule | ||
1252 | 14 | from provisioningserver.rpc import exceptions | ||
1253 | 15 | from provisioningserver.rpc.clusterservice import ClusterClient | ||
1254 | 16 | from provisioningserver.rpc.connectionpool import ConnectionPool | ||
1255 | 17 | |||
1256 | 18 | TIMEOUT = get_testing_timeout() | ||
1257 | 19 | |||
1258 | 20 | |||
1259 | 21 | class TestConnectionPool(MAASTestCase): | ||
1260 | 22 | |||
1261 | 23 | run_tests_with = MAASTwistedRunTest.make_factory(timeout=TIMEOUT) | ||
1262 | 24 | |||
1263 | 25 | def test_setitem_sets_item_in_connections(self): | ||
1264 | 26 | cp = ConnectionPool(Clock(), Mock()) | ||
1265 | 27 | key = Mock() | ||
1266 | 28 | val = Mock() | ||
1267 | 29 | cp[key] = val | ||
1268 | 30 | self.assertEqual(cp.connections, {key: val}) | ||
1269 | 31 | |||
1270 | 32 | def test_getitem_gets_item_in_connections(self): | ||
1271 | 33 | cp = ConnectionPool(Clock(), Mock()) | ||
1272 | 34 | key = Mock() | ||
1273 | 35 | val = Mock() | ||
1274 | 36 | cp[key] = val | ||
1275 | 37 | self.assertEqual(cp.connections[key], cp[key]) | ||
1276 | 38 | |||
1277 | 39 | def test_len_gets_length_of_connections(self): | ||
1278 | 40 | cp = ConnectionPool(Clock(), Mock()) | ||
1279 | 41 | key = Mock() | ||
1280 | 42 | val = Mock() | ||
1281 | 43 | cp[key] = [val] | ||
1282 | 44 | self.assertEqual(len(cp), len(cp.get_all_connections())) | ||
1283 | 45 | |||
1284 | 46 | def test_delitem_removes_item_from_connections(self): | ||
1285 | 47 | cp = ConnectionPool(Clock(), Mock()) | ||
1286 | 48 | key = Mock() | ||
1287 | 49 | val = Mock() | ||
1288 | 50 | cp[key] = val | ||
1289 | 51 | self.assertEqual(cp.connections[key], val) | ||
1290 | 52 | del cp[key] | ||
1291 | 53 | self.assertIsNone(cp.connections.get(key)) | ||
1292 | 54 | |||
1293 | 55 | def test_contains_returns_if_key_in_connections(self): | ||
1294 | 56 | cp = ConnectionPool(Clock(), Mock()) | ||
1295 | 57 | key = Mock() | ||
1296 | 58 | val = Mock() | ||
1297 | 59 | cp[key] = val | ||
1298 | 60 | self.assertEqual(key in cp, key in cp.connections) | ||
1299 | 61 | |||
1300 | 62 | def test_compare_ConnectionPool_equal_to_compare_connections(self): | ||
1301 | 63 | cp = ConnectionPool(Clock(), Mock()) | ||
1302 | 64 | self.assertEqual(cp, cp.connections) | ||
1303 | 65 | self.assertEqual(cp, {}) | ||
1304 | 66 | |||
1305 | 67 | def test__reap_extra_connection_reaps_a_non_busy_connection(self): | ||
1306 | 68 | cp = ConnectionPool(Clock(), Mock()) | ||
1307 | 69 | eventloop = Mock() | ||
1308 | 70 | connection = Mock() | ||
1309 | 71 | connection.in_use = False | ||
1310 | 72 | cp[eventloop] = [connection] | ||
1311 | 73 | disconnect = self.patch(cp, "disconnect") | ||
1312 | 74 | cp._reap_extra_connection(eventloop, connection) | ||
1313 | 75 | self.assertEqual(len(cp), 0) | ||
1314 | 76 | disconnect.assert_called_once_with(connection) | ||
1315 | 77 | |||
1316 | 78 | def test__reap_extra_connection_waits_for_a_busy_connection(self): | ||
1317 | 79 | clock = Clock() | ||
1318 | 80 | cp = ConnectionPool(clock, Mock()) | ||
1319 | 81 | eventloop = Mock() | ||
1320 | 82 | connection = Mock() | ||
1321 | 83 | connection.in_use = True | ||
1322 | 84 | cp[eventloop] = [connection] | ||
1323 | 85 | self.patch(cp, "disconnect") | ||
1324 | 86 | cp._reap_extra_connection(eventloop, connection) | ||
1325 | 87 | self.assertIn(eventloop, clock.calls[0].args) | ||
1326 | 88 | self.assertIn(connection, clock.calls[0].args) | ||
1327 | 89 | self.assertEqual( | ||
1328 | 90 | "_reap_extra_connection", clock.calls[0].func.__name__ | ||
1329 | 91 | ) | ||
1330 | 92 | self.assertEqual(cp._keepalive, clock.calls[0].time) | ||
1331 | 93 | |||
1332 | 94 | def test_is_staged(self): | ||
1333 | 95 | cp = ConnectionPool(Clock(), Mock()) | ||
1334 | 96 | eventloop1 = Mock() | ||
1335 | 97 | eventloop2 = Mock() | ||
1336 | 98 | cp.try_connections[eventloop1] = Mock() | ||
1337 | 99 | self.assertTrue(cp.is_staged(eventloop1)) | ||
1338 | 100 | self.assertFalse(cp.is_staged(eventloop2)) | ||
1339 | 101 | |||
1340 | 102 | def test_get_staged_connection(self): | ||
1341 | 103 | cp = ConnectionPool(Clock(), Mock()) | ||
1342 | 104 | eventloop = Mock() | ||
1343 | 105 | connection = Mock() | ||
1344 | 106 | cp.try_connections[eventloop] = connection | ||
1345 | 107 | self.assertEqual(cp.get_staged_connection(eventloop), connection) | ||
1346 | 108 | |||
1347 | 109 | def test_get_staged_connections(self): | ||
1348 | 110 | cp = ConnectionPool(Clock(), Mock()) | ||
1349 | 111 | eventloop = Mock() | ||
1350 | 112 | connection = Mock() | ||
1351 | 113 | cp.try_connections[eventloop] = connection | ||
1352 | 114 | self.assertEqual(cp.get_staged_connections(), {eventloop: connection}) | ||
1353 | 115 | |||
1354 | 116 | def test_scale_up_connections_adds_a_connection(self): | ||
1355 | 117 | cp = ConnectionPool(Clock(), Mock(), max_conns=2) | ||
1356 | 118 | eventloop = Mock() | ||
1357 | 119 | connection1 = Mock() | ||
1358 | 120 | connection2 = Mock() | ||
1359 | 121 | connect = self.patch(cp, "connect") | ||
1360 | 122 | connect.return_value = succeed(connection2) | ||
1361 | 123 | cp[eventloop] = [connection1] | ||
1362 | 124 | cp.scale_up_connections() | ||
1363 | 125 | self.assertCountEqual(cp[eventloop], [connection1, connection2]) | ||
1364 | 126 | |||
1365 | 127 | def test_scale_up_connections_raises_MaxConnectionsOpen_when_cannot_create_another( | ||
1366 | 128 | self, | ||
1367 | 129 | ): | ||
1368 | 130 | cp = ConnectionPool(Clock(), Mock()) | ||
1369 | 131 | eventloop = Mock() | ||
1370 | 132 | connection1 = Mock() | ||
1371 | 133 | connection2 = Mock() | ||
1372 | 134 | connect = self.patch(cp, "connect") | ||
1373 | 135 | connect.return_value = succeed(connection2) | ||
1374 | 136 | cp[eventloop] = [connection1] | ||
1375 | 137 | self.assertRaises( | ||
1376 | 138 | exceptions.MaxConnectionsOpen, | ||
1377 | 139 | extract_result, | ||
1378 | 140 | cp.scale_up_connections(), | ||
1379 | 141 | ) | ||
1380 | 142 | |||
1381 | 143 | def test_get_connection(self): | ||
1382 | 144 | cp = ConnectionPool(Clock(), Mock(), max_idle_conns=2, max_conns=2) | ||
1383 | 145 | eventloops = [Mock() for _ in range(3)] | ||
1384 | 146 | cp.connections = { | ||
1385 | 147 | eventloop: [Mock() for _ in range(2)] for eventloop in eventloops | ||
1386 | 148 | } | ||
1387 | 149 | self.assertIn(cp.get_connection(eventloops[0]), cp[eventloops[0]]) | ||
1388 | 150 | |||
1389 | 151 | def test_get_random_connection(self): | ||
1390 | 152 | cp = ConnectionPool(Clock(), Mock(), max_idle_conns=2, max_conns=2) | ||
1391 | 153 | eventloops = [Mock() for _ in range(3)] | ||
1392 | 154 | cp.connections = { | ||
1393 | 155 | eventloop: [Mock() for _ in range(2)] for eventloop in eventloops | ||
1394 | 156 | } | ||
1395 | 157 | self.assertIn( | ||
1396 | 158 | cp.get_connection(eventloops[0]), | ||
1397 | 159 | [conn for conn_list in cp.values() for conn in conn_list], | ||
1398 | 160 | ) | ||
1399 | 161 | |||
1400 | 162 | def test_get_random_free_connection_returns_a_free_connection(self): | ||
1401 | 163 | cp = ConnectionPool(Clock(), Mock()) | ||
1402 | 164 | eventloops = [Mock() for _ in range(3)] | ||
1403 | 165 | |||
1404 | 166 | def _create_conn(in_use): | ||
1405 | 167 | conn = Mock() | ||
1406 | 168 | conn.in_use = in_use | ||
1407 | 169 | return conn | ||
1408 | 170 | |||
1409 | 171 | cp.connections = { | ||
1410 | 172 | eventloops[0]: [_create_conn(True)], | ||
1411 | 173 | eventloops[1]: [_create_conn(False)], | ||
1412 | 174 | eventloops[2]: [_create_conn(True)], | ||
1413 | 175 | } | ||
1414 | 176 | conn = cp.get_random_free_connection() | ||
1415 | 177 | self.assertIn(conn, cp[eventloops[1]]) | ||
1416 | 178 | |||
1417 | 179 | def test_get_random_free_connection_raises_AllConnectionsBusy_when_there_are_no_free_connections( | ||
1418 | 180 | self, | ||
1419 | 181 | ): | ||
1420 | 182 | cp = ConnectionPool(Clock(), Mock()) | ||
1421 | 183 | eventloops = [Mock() for _ in range(3)] | ||
1422 | 184 | |||
1423 | 185 | def _create_conn(in_use): | ||
1424 | 186 | conn = Mock() | ||
1425 | 187 | conn.in_use = in_use | ||
1426 | 188 | return conn | ||
1427 | 189 | |||
1428 | 190 | cp.connections = { | ||
1429 | 191 | eventloops[0]: [_create_conn(True)], | ||
1430 | 192 | eventloops[1]: [_create_conn(True)], | ||
1431 | 193 | eventloops[2]: [_create_conn(True)], | ||
1432 | 194 | } | ||
1433 | 195 | |||
1434 | 196 | self.assertRaises( | ||
1435 | 197 | exceptions.AllConnectionsBusy, cp.get_random_free_connection | ||
1436 | 198 | ) | ||
1437 | 199 | |||
1438 | 200 | def test_get_all_connections(self): | ||
1439 | 201 | cp = ConnectionPool(Clock(), Mock()) | ||
1440 | 202 | eventloops = [Mock() for _ in range(3)] | ||
1441 | 203 | cp.connections = { | ||
1442 | 204 | eventloops[0]: [Mock()], | ||
1443 | 205 | eventloops[1]: [Mock()], | ||
1444 | 206 | eventloops[2]: [Mock()], | ||
1445 | 207 | } | ||
1446 | 208 | |||
1447 | 209 | self.assertCountEqual( | ||
1448 | 210 | cp.get_all_connections(), | ||
1449 | 211 | [conn for conn_list in cp.values() for conn in conn_list], | ||
1450 | 212 | ) | ||
1451 | 213 | |||
1452 | 214 | def test_get_all_free_connections(self): | ||
1453 | 215 | cp = ConnectionPool(Clock(), Mock(), max_conns=2) | ||
1454 | 216 | eventloops = [Mock() for _ in range(3)] | ||
1455 | 217 | |||
1456 | 218 | def _create_conn(in_use): | ||
1457 | 219 | conn = Mock() | ||
1458 | 220 | conn.in_use = in_use | ||
1459 | 221 | return conn | ||
1460 | 222 | |||
1461 | 223 | cp.connections = { | ||
1462 | 224 | eventloops[0]: [_create_conn(True), _create_conn(False)], | ||
1463 | 225 | eventloops[1]: [_create_conn(True)], | ||
1464 | 226 | eventloops[2]: [_create_conn(False)], | ||
1465 | 227 | } | ||
1466 | 228 | |||
1467 | 229 | self.assertCountEqual( | ||
1468 | 230 | cp.get_all_free_connections(), | ||
1469 | 231 | [ | ||
1470 | 232 | conn | ||
1471 | 233 | for conn_list in cp.values() | ||
1472 | 234 | for conn in conn_list | ||
1473 | 235 | if not conn.in_use | ||
1474 | 236 | ], | ||
1475 | 237 | ) | ||
1476 | 238 | |||
1477 | 239 | @inlineCallbacks | ||
1478 | 240 | def test_connect(self): | ||
1479 | 241 | clock = Clock() | ||
1480 | 242 | connection = Mock() | ||
1481 | 243 | service = Mock() | ||
1482 | 244 | cp = ConnectionPool(clock, service) | ||
1483 | 245 | connectProtocol = self.patch(connectionpoolModule, "connectProtocol") | ||
1484 | 246 | connectProtocol.return_value = connection | ||
1485 | 247 | result = yield cp.connect("an-event-loop", ("a.example.com", 1111)) | ||
1486 | 248 | self.assertEqual(len(connectProtocol.call_args_list), 1) | ||
1487 | 249 | connectProtocol.called_once_with( | ||
1488 | 250 | TCP6ClientEndpoint(reactor=clock, host="a.example.com", port=1111), | ||
1489 | 251 | ClusterClient( | ||
1490 | 252 | address=("a.example.com", 1111), | ||
1491 | 253 | eventloop="an-event-loop", | ||
1492 | 254 | service=service, | ||
1493 | 255 | ), | ||
1494 | 256 | ) | ||
1495 | 257 | self.assertEqual(result, connection) | ||
1496 | 258 | |||
1497 | 259 | def test_drop_connection(self): | ||
1498 | 260 | connection = Mock() | ||
1499 | 261 | cp = ConnectionPool(Clock(), Mock()) | ||
1500 | 262 | cp.disconnect(connection) | ||
1501 | 263 | connection.transport.loseConnection.assert_called_once_with() | ||
1502 | 264 | |||
1503 | 265 | @inlineCallbacks | ||
1504 | 266 | def test_add_connection_adds_the_staged_connection(self): | ||
1505 | 267 | eventloop = Mock() | ||
1506 | 268 | connection = Mock() | ||
1507 | 269 | cp = ConnectionPool(Clock(), Mock()) | ||
1508 | 270 | cp.try_connections = {eventloop: connection} | ||
1509 | 271 | yield cp.add_connection(eventloop, connection) | ||
1510 | 272 | self.assertIn(connection, cp.get_all_connections()) | ||
1511 | 273 | |||
1512 | 274 | def test_remove_connection_removes_connection_from_pool(self): | ||
1513 | 275 | eventloop = Mock() | ||
1514 | 276 | connection = Mock() | ||
1515 | 277 | cp = ConnectionPool(Clock(), Mock()) | ||
1516 | 278 | cp.connections[eventloop] = [connection] | ||
1517 | 279 | cp.remove_connection(eventloop, connection) | ||
1518 | 280 | self.assertEqual(cp.connections, {}) |
UNIT TESTS
-b rpc_connection_pool_burst lp:~cgrabowski/maas/+git/maas into -b master lp:~maas-committers/maas
STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/318/consoleText
COMMIT: c19b8616c6acc5cde30fa33916ad40da3d3b04e5