Merge ~cgrabowski/maas:rpc_connection_pool_burst into maas:master
- Git
- lp:~cgrabowski/maas
- rpc_connection_pool_burst
- Merge into master
Status: | Merged |
---|---|
Approved by: | Christian Grabowski |
Approved revision: | 5f3b0b943fd0ca495036f6b21938da58ddeb6bba |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~cgrabowski/maas:rpc_connection_pool_burst |
Merge into: | maas:master |
Diff against target: |
768 lines (+304/-59) 12 files modified
src/provisioningserver/config.py (+17/-0) src/provisioningserver/plugin.py (+7/-1) src/provisioningserver/rackdservices/external.py (+3/-2) src/provisioningserver/rackdservices/http.py (+3/-2) src/provisioningserver/rackdservices/tests/test_external.py (+8/-5) src/provisioningserver/rackdservices/tests/test_http.py (+5/-4) src/provisioningserver/rpc/clusterservice.py (+72/-8) src/provisioningserver/rpc/common.py (+22/-5) src/provisioningserver/rpc/exceptions.py (+8/-0) src/provisioningserver/rpc/testing/__init__.py (+2/-1) src/provisioningserver/rpc/testing/doubles.py (+18/-0) src/provisioningserver/rpc/tests/test_clusterservice.py (+139/-31) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Adam Collard (community) | Needs Information | ||
MAAS Lander | Approve | ||
Alexsander de Souza | Approve | ||
Review via email: mp+428208@code.launchpad.net |
Commit message
allocate additional connections when busy
always connect max idle connections times
Description of the change
Allows RPC connections to scale up to a configured amount when busy.
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rpc_connection_
STATUS: FAILED
LOG: http://
COMMIT: ba71e215175a711
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rpc_connection_
STATUS: FAILED
LOG: http://
COMMIT: 74329f97f1dd1a3
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rpc_connection_
STATUS: FAILED
LOG: http://
COMMIT: 820009b533a8fbb
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rpc_connection_
STATUS: FAILED
LOG: http://
COMMIT: 47a4dfedbee9d93
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rpc_connection_
STATUS: FAILED
LOG: http://
COMMIT: 8f3912b1979ab12
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b rpc_connection_
STATUS: SUCCESS
COMMIT: 5f3b0b943fd0ca4
Adam Collard (adam-collard) : | # |
Adam Collard (adam-collard) wrote : | # |
How do the idle connections work with the _update_interval method of ClusterClientService?
That method wants to control how often to poll the region controller to establish connectivity, and has logic to compare the number of connected connections with the expected (which i think will go wrong with idle/bursting connections).
Adam Collard (adam-collard) wrote : | # |
This broke system tests and could do with a little more love.
Reverting in https:/
Preview Diff
1 | diff --git a/src/provisioningserver/config.py b/src/provisioningserver/config.py |
2 | index 97b6e68..9511ad6 100644 |
3 | --- a/src/provisioningserver/config.py |
4 | +++ b/src/provisioningserver/config.py |
5 | @@ -762,6 +762,23 @@ class ClusterConfiguration(Configuration, metaclass=ClusterConfigurationMeta): |
6 | ), |
7 | ) |
8 | |
9 | + # RPC Connection Pool options |
10 | + max_idle_rpc_connections = ConfigurationOption( |
11 | + "max_idle_rpc_connections", |
12 | + "The nominal number of connections to have per endpoint", |
13 | + Number(min=1, max=1024, if_missing=1), |
14 | + ) |
15 | + max_rpc_connections = ConfigurationOption( |
16 | + "max_rpc_connections", |
17 | + "The maximum number of connections to scale to when under load", |
18 | + Number(min=1, max=1024, if_missing=4), |
19 | + ) |
20 | + rpc_keepalive = ConfigurationOption( |
21 | + "rpc_keepalive", |
22 | "The duration in milliseconds to keep added connections alive", |
23 | + Number(min=1, max=5000, if_missing=1000), |
24 | + ) |
25 | + |
26 | # TFTP options. |
27 | tftp_port = ConfigurationOption( |
28 | "tftp_port", |
29 | diff --git a/src/provisioningserver/plugin.py b/src/provisioningserver/plugin.py |
30 | index e987c73..00ff898 100644 |
31 | --- a/src/provisioningserver/plugin.py |
32 | +++ b/src/provisioningserver/plugin.py |
33 | @@ -139,7 +139,13 @@ class ProvisioningServiceMaker: |
34 | def _makeRPCService(self): |
35 | from provisioningserver.rpc.clusterservice import ClusterClientService |
36 | |
37 | - rpc_service = ClusterClientService(reactor) |
38 | + with ClusterConfiguration.open() as config: |
39 | + rpc_service = ClusterClientService( |
40 | + reactor, |
41 | + config.max_idle_rpc_connections, |
42 | + config.max_rpc_connections, |
43 | + config.rpc_keepalive, |
44 | + ) |
45 | rpc_service.setName("rpc") |
46 | return rpc_service |
47 | |
48 | diff --git a/src/provisioningserver/rackdservices/external.py b/src/provisioningserver/rackdservices/external.py |
49 | index ccabb74..5b8afe5 100644 |
50 | --- a/src/provisioningserver/rackdservices/external.py |
51 | +++ b/src/provisioningserver/rackdservices/external.py |
52 | @@ -68,8 +68,9 @@ class RackOnlyExternalService(metaclass=ABCMeta): |
53 | |
54 | # Filter the connects by region. |
55 | conn_per_region = defaultdict(set) |
56 | - for eventloop, connection in connections.items(): |
57 | - conn_per_region[eventloop.split(":")[0]].add(connection) |
58 | + for eventloop, connection_set in connections.items(): |
59 | + for connection in connection_set: |
60 | + conn_per_region[eventloop.split(":")[0]].add(connection) |
61 | for eventloop, connections in conn_per_region.items(): |
62 | # Sort the connections so the same IP is always picked per |
63 | # region controller. This ensures that the HTTP configuration |
64 | diff --git a/src/provisioningserver/rackdservices/http.py b/src/provisioningserver/rackdservices/http.py |
65 | index 421e35f..bda9d23 100644 |
66 | --- a/src/provisioningserver/rackdservices/http.py |
67 | +++ b/src/provisioningserver/rackdservices/http.py |
68 | @@ -101,8 +101,9 @@ class RackHTTPService(TimerService): |
69 | controller is connected to.""" |
70 | # Filter the connects by region. |
71 | conn_per_region = defaultdict(set) |
72 | - for eventloop, connection in self._rpc_service.connections.items(): |
73 | - conn_per_region[eventloop.split(":")[0]].add(connection) |
74 | + for eventloop, connection_set in self._rpc_service.connections.items(): |
75 | + for connection in connection_set: |
76 | + conn_per_region[eventloop.split(":")[0]].add(connection) |
77 | for _, connections in conn_per_region.items(): |
78 | # Sort the connections so the same IP is always picked per |
79 | # region controller. This ensures that the HTTP configuration |
80 | diff --git a/src/provisioningserver/rackdservices/tests/test_external.py b/src/provisioningserver/rackdservices/tests/test_external.py |
81 | index ad214a1..0cb8601 100644 |
82 | --- a/src/provisioningserver/rackdservices/tests/test_external.py |
83 | +++ b/src/provisioningserver/rackdservices/tests/test_external.py |
84 | @@ -430,7 +430,8 @@ class TestRackDNS(MAASTestCase): |
85 | return frozenset( |
86 | { |
87 | client.address[0] |
88 | - for _, client in rpc_service.connections.items() |
89 | + for _, clients in rpc_service.connections.items() |
90 | + for client in clients |
91 | } |
92 | ) |
93 | |
94 | @@ -609,7 +610,7 @@ class TestRackDNS(MAASTestCase): |
95 | ip = factory.make_ip_address() |
96 | mock_conn = Mock() |
97 | mock_conn.address = (ip, random.randint(5240, 5250)) |
98 | - mock_rpc.connections[eventloop] = mock_conn |
99 | + mock_rpc.connections[eventloop] = {mock_conn} |
100 | |
101 | dns = external.RackDNS() |
102 | region_ips = list(dns._genRegionIps(mock_rpc.connections)) |
103 | @@ -626,7 +627,7 @@ class TestRackDNS(MAASTestCase): |
104 | ip = factory.make_ip_address() |
105 | mock_conn = Mock() |
106 | mock_conn.address = (ip, random.randint(5240, 5250)) |
107 | - mock_rpc.connections[eventloop] = mock_conn |
108 | + mock_rpc.connections[eventloop] = {mock_conn} |
109 | |
110 | dns = external.RackDNS() |
111 | region_ips = frozenset(dns._genRegionIps(mock_rpc.connections)) |
112 | @@ -659,7 +660,8 @@ class TestRackProxy(MAASTestCase): |
113 | return frozenset( |
114 | { |
115 | client.address[0] |
116 | - for _, client in rpc_service.connections.items() |
117 | + for _, clients in rpc_service.connections.items() |
118 | + for client in clients |
119 | } |
120 | ) |
121 | |
122 | @@ -824,7 +826,8 @@ class TestRackSyslog(MAASTestCase): |
123 | return frozenset( |
124 | { |
125 | (eventloop, client.address[0]) |
126 | - for eventloop, client in rpc_service.connections.items() |
127 | + for eventloop, clients in rpc_service.connections.items() |
128 | + for client in clients |
129 | } |
130 | ) |
131 | |
132 | diff --git a/src/provisioningserver/rackdservices/tests/test_http.py b/src/provisioningserver/rackdservices/tests/test_http.py |
133 | index bc43c66..43cb495 100644 |
134 | --- a/src/provisioningserver/rackdservices/tests/test_http.py |
135 | +++ b/src/provisioningserver/rackdservices/tests/test_http.py |
136 | @@ -92,7 +92,8 @@ class TestRackHTTPService(MAASTestCase): |
137 | return frozenset( |
138 | { |
139 | client.address[0] |
140 | - for _, client in rpc_service.connections.items() |
141 | + for _, clients in rpc_service.connections.items() |
142 | + for client in clients |
143 | } |
144 | ) |
145 | |
146 | @@ -208,7 +209,7 @@ class TestRackHTTPService(MAASTestCase): |
147 | ip = factory.make_ip_address() |
148 | mock_conn = Mock() |
149 | mock_conn.address = (ip, random.randint(5240, 5250)) |
150 | - mock_rpc.connections[eventloop] = mock_conn |
151 | + mock_rpc.connections[eventloop] = {mock_conn} |
152 | |
153 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) |
154 | region_ips = list(service._genRegionIps()) |
155 | @@ -225,7 +226,7 @@ class TestRackHTTPService(MAASTestCase): |
156 | ip = factory.make_ip_address() |
157 | mock_conn = Mock() |
158 | mock_conn.address = (ip, random.randint(5240, 5250)) |
159 | - mock_rpc.connections[eventloop] = mock_conn |
160 | + mock_rpc.connections[eventloop] = {mock_conn} |
161 | |
162 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) |
163 | region_ips = frozenset(service._genRegionIps()) |
164 | @@ -244,7 +245,7 @@ class TestRackHTTPService(MAASTestCase): |
165 | ip_addresses.add("[%s]" % ip) |
166 | mock_conn = Mock() |
167 | mock_conn.address = (ip, random.randint(5240, 5250)) |
168 | - mock_rpc.connections[eventloop] = mock_conn |
169 | + mock_rpc.connections[eventloop] = {mock_conn} |
170 | |
171 | service = http.RackHTTPService(self.make_dir(), mock_rpc, reactor) |
172 | region_ips = set(service._genRegionIps()) |
173 | diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py |
174 | index c92d48a..7b94e49 100644 |
175 | --- a/src/provisioningserver/rpc/clusterservice.py |
176 | +++ b/src/provisioningserver/rpc/clusterservice.py |
177 | @@ -999,6 +999,7 @@ class ClusterClient(Cluster): |
178 | # Events for this protocol's life-cycle. |
179 | self.authenticated = DeferredValue() |
180 | self.ready = DeferredValue() |
181 | + self.in_use = False |
182 | self.localIdent = None |
183 | |
184 | @property |
185 | @@ -1201,7 +1202,7 @@ class ClusterClientService(TimerService): |
186 | |
187 | time_started = None |
188 | |
189 | - def __init__(self, reactor): |
190 | + def __init__(self, reactor, max_idle_conns=1, max_conns=1, keepalive=1000): |
191 | super().__init__(self._calculate_interval(None, None), self._tryUpdate) |
192 | self.connections = {} |
193 | self.try_connections = {} |
194 | @@ -1224,10 +1225,40 @@ class ClusterClientService(TimerService): |
195 | self._updateInProgress = DeferredValue() |
196 | self._updateInProgress.set(None) |
197 | |
198 | + # The maximum number of connections to always allocate per eventloop |
199 | + self._max_idle_connections = max_idle_conns |
200 | + # The maximum number of connections to allocate while under load per eventloop |
201 | + self._max_connections = max_conns |
202 | + # The duration in milliseconds to keep scaled up connections alive |
203 | + self._keepalive = keepalive |
204 | + |
205 | def startService(self): |
206 | self.time_started = self.clock.seconds() |
207 | super().startService() |
208 | |
209 | + def _reap_extra_connection(self, eventloop, conn): |
210 | + if not conn.in_use: |
211 | + self._drop_connection(conn) |
212 | + return self._remove_connection(eventloop, conn) |
213 | + return self.clock.callLater( |
214 | + self._keepalive, self._reap_extra_connection, eventloop, conn |
215 | + ) |
216 | + |
217 | + @inlineCallbacks |
218 | + def _scale_up_connections(self): |
219 | + for ev, ev_conns in self.connections.items(): |
220 | + # pick first group with room for additional conns |
221 | + if len(ev_conns) < self._max_connections: |
222 | + # spawn an extra connection |
223 | + conn_to_clone = random.choice(list(ev_conns)) |
224 | + conn = yield self._make_connection(ev, conn_to_clone.address) |
225 | + self.connections[ev].add(conn) |
226 | + self.clock.callLater( |
227 | + self._keepalive, self._reap_extra_connection, ev, conn |
228 | + ) |
229 | + return |
230 | + raise exceptions.MaxConnectionsOpen |
231 | + |
232 | def getClient(self): |
233 | """Returns a :class:`common.Client` connected to a region. |
234 | |
235 | @@ -1236,11 +1267,22 @@ class ClusterClientService(TimerService): |
236 | :raises: :py:class:`~.exceptions.NoConnectionsAvailable` when |
237 | there are no open connections to a region controller. |
238 | """ |
239 | - conns = list(self.connections.values()) |
240 | + conns = [ |
241 | + conn for conn_set in self.connections.values() for conn in conn_set |
242 | + ] |
243 | if len(conns) == 0: |
244 | raise exceptions.NoConnectionsAvailable() |
245 | else: |
246 | - return common.Client(random.choice(conns)) |
247 | + free_conns = [conn for conn in conns if not conn.in_use] |
248 | + if len(free_conns) > 0: |
249 | + return common.Client(random.choice(free_conns)) |
250 | + else: |
251 | + for endpoint_conns in self.connections.values(): |
252 | + if len(endpoint_conns) < self._max_connections: |
253 | + # caller should create a new connection |
254 | + raise exceptions.AllConnectionsBusy |
255 | + # return a busy connection, assume it will free up or timeout |
256 | + return common.Client(random.choice(conns)) |
257 | |
258 | @deferred |
259 | def getClientNow(self): |
260 | @@ -1259,10 +1301,18 @@ class ClusterClientService(TimerService): |
261 | return self.getClient() |
262 | except exceptions.NoConnectionsAvailable: |
263 | return self._tryUpdate().addCallback(call, self.getClient) |
264 | + except exceptions.AllConnectionsBusy: |
265 | + return self._scale_up_connections().addCallback( |
266 | + call, self.getClient |
267 | + ) |
268 | |
269 | def getAllClients(self): |
270 | """Return a list of all connected :class:`common.Client`s.""" |
271 | - return [common.Client(conn) for conn in self.connections.values()] |
272 | + return [ |
273 | + common.Client(conn) |
274 | + for conns in self.connections.values() |
275 | + for conn in conns |
276 | + ] |
277 | |
278 | def _tryUpdate(self): |
279 | """Attempt to refresh outgoing connections. |
280 | @@ -1391,7 +1441,9 @@ class ClusterClientService(TimerService): |
281 | """Update the saved RPC info state.""" |
282 | # Build a list of addresses based on the current connections. |
283 | connected_addr = { |
284 | - conn.address[0] for _, conn in self.connections.items() |
285 | + conn.address[0] |
286 | + for _, conns in self.connections.items() |
287 | + for conn in conns |
288 | } |
289 | if ( |
290 | self._rpc_info_state is None |
291 | @@ -1761,6 +1813,7 @@ class ClusterClientService(TimerService): |
292 | """Drop the given `connection`.""" |
293 | return connection.transport.loseConnection() |
294 | |
295 | + @inlineCallbacks |
296 | def add_connection(self, eventloop, connection): |
297 | """Add the connection to the tracked connections. |
298 | |
299 | @@ -1769,7 +1822,16 @@ class ClusterClientService(TimerService): |
300 | """ |
301 | if eventloop in self.try_connections: |
302 | del self.try_connections[eventloop] |
303 | - self.connections[eventloop] = connection |
304 | + if not self.connections.get(eventloop): |
305 | + self.connections[eventloop] = set() |
306 | + self.connections[eventloop].add(connection) |
307 | + # clone connection to equal num idle connections |
308 | + if self._max_idle_connections - 1 > 0: |
309 | + for _ in range(self._max_idle_connections - 1): |
310 | + extra_conn = yield self._make_connection( |
311 | + connection.eventloop, connection.address |
312 | + ) |
313 | + self.connections[eventloop].add(extra_conn) |
314 | self._update_saved_rpc_info_state() |
315 | |
316 | def remove_connection(self, eventloop, connection): |
317 | @@ -1782,8 +1844,10 @@ class ClusterClientService(TimerService): |
318 | if self.try_connections[eventloop] is connection: |
319 | del self.try_connections[eventloop] |
320 | if eventloop in self.connections: |
321 | - if self.connections[eventloop] is connection: |
322 | - del self.connections[eventloop] |
323 | + if connection in self.connections.get(eventloop, set()): |
324 | + self.connections[eventloop].discard(connection) |
325 | + if len(self.connections[eventloop]) == 0: |
326 | + del self.connections[eventloop] |
327 | # Disable DHCP when no connections to a region controller. |
328 | if len(self.connections) == 0: |
329 | stopping_services = [] |
330 | diff --git a/src/provisioningserver/rpc/common.py b/src/provisioningserver/rpc/common.py |
331 | index 5d67bba..40e091f 100644 |
332 | --- a/src/provisioningserver/rpc/common.py |
333 | +++ b/src/provisioningserver/rpc/common.py |
334 | @@ -14,7 +14,11 @@ from twisted.python.failure import Failure |
335 | from provisioningserver.logger import LegacyLogger |
336 | from provisioningserver.prometheus.metrics import PROMETHEUS_METRICS |
337 | from provisioningserver.rpc.interfaces import IConnection, IConnectionToRegion |
338 | -from provisioningserver.utils.twisted import asynchronous, deferWithTimeout |
339 | +from provisioningserver.utils.twisted import ( |
340 | + asynchronous, |
341 | + callOut, |
342 | + deferWithTimeout, |
343 | +) |
344 | |
345 | log = LegacyLogger() |
346 | |
347 | @@ -156,6 +160,11 @@ class Client: |
348 | :return: A deferred result. Call its `wait` method (with a timeout |
349 | in seconds) to block on the call's completion. |
350 | """ |
351 | + self._conn.in_use = True |
352 | + |
353 | + def _free_conn(): |
354 | + self._conn.in_use = False |
355 | + |
356 | if len(args) != 0: |
357 | receiver_name = "{}.{}".format( |
358 | self.__module__, |
359 | @@ -171,11 +180,19 @@ class Client: |
360 | if timeout is undefined: |
361 | timeout = 120 # 2 minutes |
362 | if timeout is None or timeout <= 0: |
363 | - return self._conn.callRemote(cmd, **kwargs) |
364 | + d = self._conn.callRemote(cmd, **kwargs) |
365 | + if isinstance(d, Deferred): |
366 | + d.addBoth(lambda x: callOut(x, _free_conn)) |
367 | + else: |
368 | + _free_conn() |
369 | + return d |
370 | else: |
371 | - return deferWithTimeout( |
372 | - timeout, self._conn.callRemote, cmd, **kwargs |
373 | - ) |
374 | + d = deferWithTimeout(timeout, self._conn.callRemote, cmd, **kwargs) |
375 | + if isinstance(d, Deferred): |
376 | + d.addBoth(lambda x: callOut(x, _free_conn)) |
377 | + else: |
378 | + _free_conn() |
379 | + return d |
380 | |
381 | @asynchronous |
382 | def getHostCertificate(self): |
383 | diff --git a/src/provisioningserver/rpc/exceptions.py b/src/provisioningserver/rpc/exceptions.py |
384 | index 7ee4f3f..136e471 100644 |
385 | --- a/src/provisioningserver/rpc/exceptions.py |
386 | +++ b/src/provisioningserver/rpc/exceptions.py |
387 | @@ -12,6 +12,14 @@ class NoConnectionsAvailable(Exception): |
388 | self.uuid = uuid |
389 | |
390 | |
391 | +class AllConnectionsBusy(Exception): |
392 | + """The current connection pool is busy""" |
393 | + |
394 | + |
395 | +class MaxConnectionsOpen(Exception): |
396 | + """The maximum number of connections are currently open""" |
397 | + |
398 | + |
399 | class NoSuchEventType(Exception): |
400 | """The specified event type was not found.""" |
401 | |
402 | diff --git a/src/provisioningserver/rpc/testing/__init__.py b/src/provisioningserver/rpc/testing/__init__.py |
403 | index ee4a9e2..1b2f94f 100644 |
404 | --- a/src/provisioningserver/rpc/testing/__init__.py |
405 | +++ b/src/provisioningserver/rpc/testing/__init__.py |
406 | @@ -262,7 +262,8 @@ class MockClusterToRegionRPCFixtureBase(fixtures.Fixture, metaclass=ABCMeta): |
407 | { |
408 | "eventloops": { |
409 | eventloop: [client.address] |
410 | - for eventloop, client in connections |
411 | + for eventloop, clients in connections |
412 | + for client in clients |
413 | } |
414 | }, |
415 | orig_url, |
416 | diff --git a/src/provisioningserver/rpc/testing/doubles.py b/src/provisioningserver/rpc/testing/doubles.py |
417 | index cb9f27f..0785859 100644 |
418 | --- a/src/provisioningserver/rpc/testing/doubles.py |
419 | +++ b/src/provisioningserver/rpc/testing/doubles.py |
420 | @@ -30,6 +30,7 @@ class FakeConnection: |
421 | ident = attr.ib(default=sentinel.ident) |
422 | hostCertificate = attr.ib(default=sentinel.hostCertificate) |
423 | peerCertificate = attr.ib(default=sentinel.peerCertificate) |
424 | + in_use = attr.ib(default=False) |
425 | |
426 | def callRemote(self, cmd, **arguments): |
427 | return succeed(sentinel.response) |
428 | @@ -48,6 +49,7 @@ class FakeConnectionToRegion: |
429 | address = attr.ib(default=(sentinel.host, sentinel.port)) |
430 | hostCertificate = attr.ib(default=sentinel.hostCertificate) |
431 | peerCertificate = attr.ib(default=sentinel.peerCertificate) |
432 | + in_use = attr.ib(default=False) |
433 | |
434 | def callRemote(self, cmd, **arguments): |
435 | return succeed(sentinel.response) |
436 | @@ -56,6 +58,22 @@ class FakeConnectionToRegion: |
437 | verifyObject(IConnectionToRegion, FakeConnectionToRegion()) |
438 | |
439 | |
440 | +@attr.s(eq=False, order=False) |
441 | +@implementer(IConnectionToRegion) |
442 | +class FakeBusyConnectionToRegion: |
443 | + """A fake `IConnectionToRegion` that appears busy.""" |
444 | + |
445 | + ident = attr.ib(default=sentinel.ident) |
446 | + localIdent = attr.ib(default=sentinel.localIdent) |
447 | + address = attr.ib(default=(sentinel.host, sentinel.port)) |
448 | + hostCertificate = attr.ib(default=sentinel.hostCertificate) |
449 | + peerCertificate = attr.ib(default=sentinel.peerCertificate) |
450 | + in_use = attr.ib(default=True) |
451 | + |
452 | + def callRemote(self, cmd, **arguments): |
453 | + return succeed(sentinel.response) |
454 | + |
455 | + |
456 | class StubOS(OperatingSystem): |
457 | """An :py:class:`OperatingSystem` subclass that has canned answers. |
458 | |
459 | diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py |
460 | index b50311d..fab455a 100644 |
461 | --- a/src/provisioningserver/rpc/tests/test_clusterservice.py |
462 | +++ b/src/provisioningserver/rpc/tests/test_clusterservice.py |
463 | @@ -117,7 +117,11 @@ from provisioningserver.rpc.testing import ( |
464 | call_responder, |
465 | MockLiveClusterToRegionRPCFixture, |
466 | ) |
467 | -from provisioningserver.rpc.testing.doubles import DummyConnection, StubOS |
468 | +from provisioningserver.rpc.testing.doubles import ( |
469 | + FakeBusyConnectionToRegion, |
470 | + FakeConnection, |
471 | + StubOS, |
472 | +) |
473 | from provisioningserver.security import set_shared_secret_on_filesystem |
474 | from provisioningserver.service_monitor import service_monitor |
475 | from provisioningserver.testing.config import ClusterConfigurationFixture |
476 | @@ -444,8 +448,10 @@ class TestClusterProtocol_DescribePowerTypes(MAASTestCase): |
477 | ) |
478 | |
479 | |
480 | -def make_inert_client_service(): |
481 | - service = ClusterClientService(Clock()) |
482 | +def make_inert_client_service(max_idle_conns=1, max_conns=1, keepalive=1): |
483 | + service = ClusterClientService( |
484 | + Clock(), max_idle_conns, max_conns, keepalive |
485 | + ) |
486 | # ClusterClientService's superclass, TimerService, creates a |
487 | # LoopingCall with now=True. We neuter it here to allow |
488 | # observation of the behaviour of _update_interval() for |
489 | @@ -499,10 +505,10 @@ class TestClusterClientService(MAASTestCase): |
490 | |
491 | # Fake some connections. |
492 | service.connections = { |
493 | - ipv4client.eventloop: ipv4client, |
494 | - ipv6client.eventloop: ipv6client, |
495 | - ipv6mapped.eventloop: ipv6mapped, |
496 | - hostclient.eventloop: hostclient, |
497 | + ipv4client.eventloop: {ipv4client}, |
498 | + ipv6client.eventloop: {ipv6client}, |
499 | + ipv6mapped.eventloop: {ipv6mapped}, |
500 | + hostclient.eventloop: {hostclient}, |
501 | } |
502 | |
503 | # Update the RPC state to the filesystem and info cache. |
504 | @@ -515,7 +521,8 @@ class TestClusterClientService(MAASTestCase): |
505 | Equals( |
506 | { |
507 | client.address[0] |
508 | - for _, client in service.connections.items() |
509 | + for _, clients in service.connections.items() |
510 | + for client in clients |
511 | } |
512 | ), |
513 | ) |
514 | @@ -1234,7 +1241,7 @@ class TestClusterClientService(MAASTestCase): |
515 | connection = Mock() |
516 | connection.address = (":::ffff", 2222) |
517 | service.add_connection(endpoint, connection) |
518 | - self.assertThat(service.connections, Equals({endpoint: connection})) |
519 | + self.assertEqual(service.connections, {endpoint: {connection}}) |
520 | |
521 | def test_add_connection_calls__update_saved_rpc_info_state(self): |
522 | service = make_inert_client_service() |
523 | @@ -1248,6 +1255,30 @@ class TestClusterClientService(MAASTestCase): |
524 | service._update_saved_rpc_info_state, MockCalledOnceWith() |
525 | ) |
526 | |
527 | + def test_add_connection_creates_max_idle_connections(self): |
528 | + service = make_inert_client_service(max_idle_conns=2) |
529 | + service.startService() |
530 | + endpoint = Mock() |
531 | + connection = Mock() |
532 | + connection.address = (":::ffff", 2222) |
533 | + connection2 = Mock() |
534 | + connection.address = (":::ffff", 2222) |
535 | + self.patch(service, "_make_connection").return_value = succeed( |
536 | + connection2 |
537 | + ) |
538 | + self.patch_autospec(service, "_update_saved_rpc_info_state") |
539 | + service.add_connection(endpoint, connection) |
540 | + self.assertEqual( |
541 | + len( |
542 | + [ |
543 | + conn |
544 | + for conns in service.connections.values() |
545 | + for conn in conns |
546 | + ] |
547 | + ), |
548 | + service._max_idle_connections, |
549 | + ) |
550 | + |
551 | def test_remove_connection_removes_from_try_connections(self): |
552 | service = make_inert_client_service() |
553 | service.startService() |
554 | @@ -1262,7 +1293,7 @@ class TestClusterClientService(MAASTestCase): |
555 | service.startService() |
556 | endpoint = Mock() |
557 | connection = Mock() |
558 | - service.connections[endpoint] = connection |
559 | + service.connections[endpoint] = {connection} |
560 | service.remove_connection(endpoint, connection) |
561 | self.assertThat(service.connections, Equals({})) |
562 | |
563 | @@ -1271,7 +1302,7 @@ class TestClusterClientService(MAASTestCase): |
564 | service.startService() |
565 | endpoint = Mock() |
566 | connection = Mock() |
567 | - service.connections[endpoint] = connection |
568 | + service.connections[endpoint] = {connection} |
569 | service.remove_connection(endpoint, connection) |
570 | self.assertEqual(service.step, service.INTERVAL_LOW) |
571 | |
572 | @@ -1280,7 +1311,7 @@ class TestClusterClientService(MAASTestCase): |
573 | service.startService() |
574 | endpoint = Mock() |
575 | connection = Mock() |
576 | - service.connections[endpoint] = connection |
577 | + service.connections[endpoint] = {connection} |
578 | |
579 | # Enable both dhcpd and dhcpd6. |
580 | service_monitor.getServiceByName("dhcpd").on() |
581 | @@ -1295,13 +1326,17 @@ class TestClusterClientService(MAASTestCase): |
582 | def test_getClient(self): |
583 | service = ClusterClientService(Clock()) |
584 | service.connections = { |
585 | - sentinel.eventloop01: DummyConnection(), |
586 | - sentinel.eventloop02: DummyConnection(), |
587 | - sentinel.eventloop03: DummyConnection(), |
588 | + sentinel.eventloop01: {FakeConnection()}, |
589 | + sentinel.eventloop02: {FakeConnection()}, |
590 | + sentinel.eventloop03: {FakeConnection()}, |
591 | } |
592 | self.assertIn( |
593 | service.getClient(), |
594 | - {common.Client(conn) for conn in service.connections.values()}, |
595 | + { |
596 | + common.Client(conn) |
597 | + for conns in service.connections.values() |
598 | + for conn in conns |
599 | + }, |
600 | ) |
601 | |
602 | def test_getClient_when_there_are_no_connections(self): |
603 | @@ -1310,17 +1345,65 @@ class TestClusterClientService(MAASTestCase): |
604 | self.assertRaises(exceptions.NoConnectionsAvailable, service.getClient) |
605 | |
606 | @inlineCallbacks |
607 | + def test_getClientNow_scales_connections_when_busy(self): |
608 | + service = ClusterClientService(Clock(), max_conns=2) |
609 | + service.connections = { |
610 | + sentinel.eventloop01: {FakeBusyConnectionToRegion()}, |
611 | + sentinel.eventloop02: {FakeBusyConnectionToRegion()}, |
612 | + sentinel.eventloop03: {FakeBusyConnectionToRegion()}, |
613 | + } |
614 | + self.patch(service, "_make_connection").return_value = succeed( |
615 | + FakeConnection() |
616 | + ) |
617 | + original_conns = [ |
618 | + conn for conns in service.connections.values() for conn in conns |
619 | + ] |
620 | + new_client = yield service.getClientNow() |
621 | + new_conn = new_client._conn |
622 | + self.assertIsNotNone(new_conn) |
623 | + self.assertNotIn(new_conn, original_conns) |
624 | + self.assertIn( |
625 | + new_conn, |
626 | + [conn for conns in service.connections.values() for conn in conns], |
627 | + ) |
628 | + |
629 | + @inlineCallbacks |
630 | + def test_getClientNow_returns_an_existing_connection_when_max_are_open( |
631 | + self, |
632 | + ): |
633 | + service = ClusterClientService(Clock(), max_conns=1) |
634 | + service.connections = { |
635 | + sentinel.eventloop01: {FakeBusyConnectionToRegion()}, |
636 | + sentinel.eventloop02: {FakeBusyConnectionToRegion()}, |
637 | + sentinel.eventloop03: {FakeBusyConnectionToRegion()}, |
638 | + } |
639 | + self.patch(service, "_make_connection").return_value = succeed( |
640 | + FakeConnection() |
641 | + ) |
642 | + original_conns = [ |
643 | + conn for conns in service.connections.values() for conn in conns |
644 | + ] |
645 | + new_client = yield service.getClientNow() |
646 | + new_conn = new_client._conn |
647 | + self.assertIsNotNone(new_conn) |
648 | + self.assertIn(new_conn, original_conns) |
649 | + |
650 | + @inlineCallbacks |
651 | def test_getClientNow_returns_current_connection(self): |
652 | service = ClusterClientService(Clock()) |
653 | service.connections = { |
654 | - sentinel.eventloop01: DummyConnection(), |
655 | - sentinel.eventloop02: DummyConnection(), |
656 | - sentinel.eventloop03: DummyConnection(), |
657 | + sentinel.eventloop01: {FakeConnection()}, |
658 | + sentinel.eventloop02: {FakeConnection()}, |
659 | + sentinel.eventloop03: {FakeConnection()}, |
660 | } |
661 | client = yield service.getClientNow() |
662 | self.assertIn( |
663 | client, |
664 | - {common.Client(conn) for conn in service.connections.values()}, |
665 | + { |
666 | + common.Client(conn) |
667 | + for conns in service.connections.values() |
668 | + for conn in conns |
669 | + }, |
670 | ) |
671 | |
672 | @inlineCallbacks |
673 | @@ -1330,9 +1413,9 @@ class TestClusterClientService(MAASTestCase): |
674 | |
675 | def addConnections(): |
676 | service.connections = { |
677 | - sentinel.eventloop01: DummyConnection(), |
678 | - sentinel.eventloop02: DummyConnection(), |
679 | - sentinel.eventloop03: DummyConnection(), |
680 | + sentinel.eventloop01: {FakeConnection()}, |
681 | + sentinel.eventloop02: {FakeConnection()}, |
682 | + sentinel.eventloop03: {FakeConnection()}, |
683 | } |
684 | return succeed(None) |
685 | |
686 | @@ -1340,7 +1423,11 @@ class TestClusterClientService(MAASTestCase): |
687 | client = yield service.getClientNow() |
688 | self.assertIn( |
689 | client, |
690 | - {common.Client(conn) for conn in service.connections.values()}, |
691 | + { |
692 | + common.Client(conn) |
693 | + for conns in service.connections.values() |
694 | + for conn in conns |
695 | + }, |
696 | ) |
697 | |
698 | def test_getClientNow_raises_exception_when_no_clients(self): |
699 | @@ -1383,11 +1470,11 @@ class TestClusterClientService(MAASTestCase): |
700 | def test_getAllClients(self): |
701 | service = ClusterClientService(Clock()) |
702 | uuid1 = factory.make_UUID() |
703 | - c1 = DummyConnection() |
704 | - service.connections[uuid1] = c1 |
705 | + c1 = FakeConnection() |
706 | + service.connections[uuid1] = {c1} |
707 | uuid2 = factory.make_UUID() |
708 | - c2 = DummyConnection() |
709 | - service.connections[uuid2] = c2 |
710 | + c2 = FakeConnection() |
711 | + service.connections[uuid2] = {c2} |
712 | clients = service.getAllClients() |
713 | self.assertEqual(clients, [common.Client(c1), common.Client(c2)]) |
714 | |
715 | @@ -1396,6 +1483,26 @@ class TestClusterClientService(MAASTestCase): |
716 | service.connections = {} |
717 | self.assertThat(service.getAllClients(), Equals([])) |
718 | |
719 | + @inlineCallbacks |
720 | + def test__reap_extra_connection_reaps_a_scaled_up_connection(self): |
721 | + clock = Clock() |
722 | + service = ClusterClientService(clock, max_conns=2, keepalive=0) |
723 | + service.connections = { |
724 | + sentinel.eventloop01: {FakeBusyConnectionToRegion()}, |
725 | + sentinel.eventloop02: {FakeBusyConnectionToRegion()}, |
726 | + sentinel.eventloop03: {FakeBusyConnectionToRegion()}, |
727 | + } |
728 | + self.patch(service, "_make_connection").return_value = succeed( |
729 | + FakeConnection() |
730 | + ) |
731 | + reap_call = self.patch(service, "_reap_extra_connection") |
732 | + new_client = yield service.getClientNow() |
733 | + delayed_calls = clock.getDelayedCalls() |
734 | + self.assertEqual(len(delayed_calls), 1) |
735 | + delayed_call = delayed_calls[0] |
736 | + self.assertIn(new_client._conn, delayed_call.args) |
737 | + self.assertEqual(reap_call.__name__, delayed_call.func.__name__) |
738 | + |
739 | |
740 | class TestClusterClientServiceIntervals(MAASTestCase): |
741 | |
742 | @@ -1562,14 +1669,14 @@ class TestClusterClient(MAASTestCase): |
743 | self.assertEqual(client.eventloop, extract_result(wait_for_ready)) |
744 | self.assertEqual(client.service.try_connections, {}) |
745 | self.assertEqual( |
746 | - client.service.connections, {client.eventloop: client} |
747 | + client.service.connections, {client.eventloop: {client}} |
748 | ) |
749 | |
750 | def test_disconnects_when_there_is_an_existing_connection(self): |
751 | client = self.make_running_client() |
752 | |
753 | # Pretend that a connection already exists for this address. |
754 | - client.service.connections[client.eventloop] = sentinel.connection |
755 | + client.service.connections[client.eventloop] = {sentinel.connection} |
756 | |
757 | # Connect via an in-memory transport. |
758 | transport = StringTransportWithDisconnection() |
759 | @@ -1586,7 +1693,8 @@ class TestClusterClient(MAASTestCase): |
760 | # The connections list is unchanged because the new connection |
761 | # immediately disconnects. |
762 | self.assertEqual( |
763 | - client.service.connections, {client.eventloop: sentinel.connection} |
764 | + client.service.connections, |
765 | + {client.eventloop: {sentinel.connection}}, |
766 | ) |
767 | self.assertFalse(client.connected) |
768 | self.assertIsNone(client.transport) |
+1