Merge ~blake-rouse/maas:ha-fixes-2.2 into maas:2.2
- Git
- lp:~blake-rouse/maas
- ha-fixes-2.2
- Merge into 2.2
Proposed by
Blake Rouse
Status: | Merged | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Merge reported by: | Blake Rouse | ||||||||||||
Merged at revision: | d873fad043e7015d3594d6ec3272cc0f778e52b8 | ||||||||||||
Proposed branch: | ~blake-rouse/maas:ha-fixes-2.2 | ||||||||||||
Merge into: | maas:2.2 | ||||||||||||
Diff against target: |
874 lines (+509/-34) 13 files modified
src/maasserver/locks.py (+9/-6) src/maasserver/models/node.py (+12/-2) src/maasserver/models/tests/test_node.py (+1/-1) src/maasserver/rpc/rackcontrollers.py (+1/-1) src/maasserver/rpc/regionservice.py (+19/-3) src/maasserver/rpc/tests/test_rackcontrollers.py (+2/-2) src/maasserver/rpc/tests/test_regionservice.py (+37/-0) src/maasserver/utils/orm.py (+11/-0) src/maasserver/utils/tests/test_orm.py (+16/-0) src/provisioningserver/rackdservices/tests/test_tftp.py (+223/-0) src/provisioningserver/rackdservices/tftp.py (+41/-1) src/provisioningserver/rpc/clusterservice.py (+89/-17) src/provisioningserver/rpc/tests/test_clusterservice.py (+48/-1) |
||||||||||||
Related bugs: |
|
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Blake Rouse (community) | Approve | ||
Review via email: mp+332862@code.launchpad.net |
Description of the change
To post a comment you must log in.
Revision history for this message
Blake Rouse (blake-rouse) wrote : | # |
Revision history for this message
Blake Rouse (blake-rouse) wrote : | # |
Last CI failed because of bad import, fixed and new CI run:
Revision history for this message
Blake Rouse (blake-rouse) wrote : | # |
Passed CI
Self-approving backport.
review:
Approve
Revision history for this message
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b ha-fixes-2.2 lp:~blake-rouse/maas into -b 2.2 lp:~maas-committers/maas
STATUS: FAILED BUILD
LOG: http://
Revision history for this message
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b ha-fixes-2.2 lp:~blake-rouse/maas into -b 2.2 lp:~maas-committers/maas
STATUS: FAILED BUILD
LOG: http://
Revision history for this message
MAAS Lander (maas-lander) wrote : | # |
LANDING
-b ha-fixes-2.2 lp:~blake-rouse/maas into -b 2.2 lp:~maas-committers/maas
STATUS: FAILED MERGE
LOG: http://
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/src/maasserver/locks.py b/src/maasserver/locks.py | |||
2 | index b482f5c..f68539e 100644 | |||
3 | --- a/src/maasserver/locks.py | |||
4 | +++ b/src/maasserver/locks.py | |||
5 | @@ -9,7 +9,6 @@ __all__ = [ | |||
6 | 9 | "eventloop", | 9 | "eventloop", |
7 | 10 | "import_images", | 10 | "import_images", |
8 | 11 | "node_acquire", | 11 | "node_acquire", |
9 | 12 | "rack_registration", | ||
10 | 13 | "security", | 12 | "security", |
11 | 14 | "startup", | 13 | "startup", |
12 | 15 | ] | 14 | ] |
13 | @@ -19,7 +18,11 @@ from maasserver.utils.dblocks import ( | |||
14 | 19 | DatabaseXactLock, | 18 | DatabaseXactLock, |
15 | 20 | ) | 19 | ) |
16 | 21 | 20 | ||
18 | 22 | # Lock around starting-up a MAAS region. | 21 | # Lock around starting-up a MAAS region and connection of rack controllers. |
19 | 22 | # This can be a problem where a region controller and a rack controller try | ||
20 | 23 | # to create their node objects at the same time. Rack registration also | ||
21 | 24 | # involves populating fabrics, VLANs, and other information that may overlap | ||
22 | 25 | # between rack controllers. | ||
23 | 23 | startup = DatabaseLock(1) | 26 | startup = DatabaseLock(1) |
24 | 24 | 27 | ||
25 | 25 | # Lock around performing critical security-related operations, like | 28 | # Lock around performing critical security-related operations, like |
26 | @@ -41,10 +44,10 @@ node_acquire = DatabaseXactLock(7) | |||
27 | 41 | # Lock to help with concurrent allocation of IP addresses. | 44 | # Lock to help with concurrent allocation of IP addresses. |
28 | 42 | address_allocation = DatabaseLock(8) | 45 | address_allocation = DatabaseLock(8) |
29 | 43 | 46 | ||
34 | 44 | # Lock to prevent concurrent registration of rack controllers. This can be a | 47 | # Lock used to be used just for rack registration. Because of lp:1705594 this |
35 | 45 | # problem because registration involves populating fabrics, VLANs, and other | 48 | # was consolidated into the startup lock with the region controller. |
36 | 46 | # information that may overlap between rack controller. | 49 | # DO NOT USE '9' AGAIN, it is reserved so it doesn't break upgrades. |
37 | 47 | rack_registration = DatabaseLock(9) | 50 | # rack_registration = DatabaseLock(9) |
38 | 48 | 51 | ||
39 | 49 | # Lock to prevent concurrent network scanning. | 52 | # Lock to prevent concurrent network scanning. |
40 | 50 | try_active_discovery = DatabaseLock(10).TRY | 53 | try_active_discovery = DatabaseLock(10).TRY |
41 | diff --git a/src/maasserver/models/node.py b/src/maasserver/models/node.py | |||
42 | index f6ee6cc..e0c95fe 100644 | |||
43 | --- a/src/maasserver/models/node.py | |||
44 | +++ b/src/maasserver/models/node.py | |||
45 | @@ -54,7 +54,10 @@ from django.db.models import ( | |||
46 | 54 | ) | 54 | ) |
47 | 55 | from django.db.models.query import QuerySet | 55 | from django.db.models.query import QuerySet |
48 | 56 | from django.shortcuts import get_object_or_404 | 56 | from django.shortcuts import get_object_or_404 |
50 | 57 | from maasserver import DefaultMeta | 57 | from maasserver import ( |
51 | 58 | DefaultMeta, | ||
52 | 59 | locks, | ||
53 | 60 | ) | ||
54 | 58 | from maasserver.clusterrpc.pods import decompose_machine | 61 | from maasserver.clusterrpc.pods import decompose_machine |
55 | 59 | from maasserver.clusterrpc.power import ( | 62 | from maasserver.clusterrpc.power import ( |
56 | 60 | power_cycle, | 63 | power_cycle, |
57 | @@ -143,6 +146,7 @@ from maasserver.storage_layouts import ( | |||
58 | 143 | StorageLayoutError, | 146 | StorageLayoutError, |
59 | 144 | StorageLayoutMissingBootDiskError, | 147 | StorageLayoutMissingBootDiskError, |
60 | 145 | ) | 148 | ) |
61 | 149 | from maasserver.utils import synchronised | ||
62 | 146 | from maasserver.utils.dns import validate_hostname | 150 | from maasserver.utils.dns import validate_hostname |
63 | 147 | from maasserver.utils.mac import get_vendor_for_mac | 151 | from maasserver.utils.mac import get_vendor_for_mac |
64 | 148 | from maasserver.utils.orm import ( | 152 | from maasserver.utils.orm import ( |
65 | @@ -151,6 +155,7 @@ from maasserver.utils.orm import ( | |||
66 | 151 | post_commit, | 155 | post_commit, |
67 | 152 | post_commit_do, | 156 | post_commit_do, |
68 | 153 | transactional, | 157 | transactional, |
69 | 158 | with_connection, | ||
70 | 154 | ) | 159 | ) |
71 | 155 | from maasserver.utils.threads import ( | 160 | from maasserver.utils.threads import ( |
72 | 156 | callOutToDatabase, | 161 | callOutToDatabase, |
73 | @@ -208,6 +213,7 @@ from provisioningserver.utils.twisted import ( | |||
74 | 208 | asynchronous, | 213 | asynchronous, |
75 | 209 | callOut, | 214 | callOut, |
76 | 210 | deferWithTimeout, | 215 | deferWithTimeout, |
77 | 216 | synchronous, | ||
78 | 211 | ) | 217 | ) |
79 | 212 | from twisted.internet.defer import ( | 218 | from twisted.internet.defer import ( |
80 | 213 | Deferred, | 219 | Deferred, |
81 | @@ -4595,6 +4601,10 @@ class Controller(Node): | |||
82 | 4595 | for interface in interfaces | 4601 | for interface in interfaces |
83 | 4596 | } | 4602 | } |
84 | 4597 | 4603 | ||
85 | 4604 | @synchronous | ||
86 | 4605 | @with_connection | ||
87 | 4606 | @synchronised(locks.startup) | ||
88 | 4607 | @transactional | ||
89 | 4598 | def update_interfaces(self, interfaces): | 4608 | def update_interfaces(self, interfaces): |
90 | 4599 | """Update the interfaces attached to the controller. | 4609 | """Update the interfaces attached to the controller. |
91 | 4600 | 4610 | ||
92 | @@ -4872,7 +4882,7 @@ class RackController(Controller): | |||
93 | 4872 | Service.objects.update_service_for( | 4882 | Service.objects.update_service_for( |
94 | 4873 | self, "rackd", SERVICE_STATUS.DEGRADED, | 4883 | self, "rackd", SERVICE_STATUS.DEGRADED, |
95 | 4874 | "{:.0%} connected to region controllers.".format( | 4884 | "{:.0%} connected to region controllers.".format( |
97 | 4875 | percentage)) | 4885 | 1.0 - percentage)) |
98 | 4876 | 4886 | ||
99 | 4877 | def get_image_sync_status(self, boot_images=None): | 4887 | def get_image_sync_status(self, boot_images=None): |
100 | 4878 | """Return the status of the boot image import process.""" | 4888 | """Return the status of the boot image import process.""" |
101 | diff --git a/src/maasserver/models/tests/test_node.py b/src/maasserver/models/tests/test_node.py | |||
102 | index 0467bba..e8ece11 100644 | |||
103 | --- a/src/maasserver/models/tests/test_node.py | |||
104 | +++ b/src/maasserver/models/tests/test_node.py | |||
105 | @@ -9495,7 +9495,7 @@ class TestRackController(MAASTransactionServerTestCase): | |||
106 | 9495 | MatchesStructure.byEquality( | 9495 | MatchesStructure.byEquality( |
107 | 9496 | status=SERVICE_STATUS.DEGRADED, status_info=( | 9496 | status=SERVICE_STATUS.DEGRADED, status_info=( |
108 | 9497 | "{:.0%} connected to region controllers.".format( | 9497 | "{:.0%} connected to region controllers.".format( |
110 | 9498 | percentage)))) | 9498 | 1.0 - percentage)))) |
111 | 9499 | 9499 | ||
112 | 9500 | fake_images = [ | 9500 | fake_images = [ |
113 | 9501 | { | 9501 | { |
114 | diff --git a/src/maasserver/rpc/rackcontrollers.py b/src/maasserver/rpc/rackcontrollers.py | |||
115 | index 9c49532..7c82548 100644 | |||
116 | --- a/src/maasserver/rpc/rackcontrollers.py | |||
117 | +++ b/src/maasserver/rpc/rackcontrollers.py | |||
118 | @@ -72,7 +72,7 @@ def handle_upgrade(rack_controller, nodegroup_uuid): | |||
119 | 72 | 72 | ||
120 | 73 | @synchronous | 73 | @synchronous |
121 | 74 | @with_connection | 74 | @with_connection |
123 | 75 | @synchronised(locks.rack_registration) | 75 | @synchronised(locks.startup) |
124 | 76 | @transactional | 76 | @transactional |
125 | 77 | def register( | 77 | def register( |
126 | 78 | system_id=None, hostname='', interfaces=None, | 78 | system_id=None, hostname='', interfaces=None, |
127 | diff --git a/src/maasserver/rpc/regionservice.py b/src/maasserver/rpc/regionservice.py | |||
128 | index 3ddf50c..534002b 100644 | |||
129 | --- a/src/maasserver/rpc/regionservice.py | |||
130 | +++ b/src/maasserver/rpc/regionservice.py | |||
131 | @@ -1214,15 +1214,31 @@ class RegionAdvertising: | |||
132 | 1214 | Each tuple corresponds to somewhere an event-loop is listening | 1214 | Each tuple corresponds to somewhere an event-loop is listening |
133 | 1215 | within the whole region. The `name` is the event-loop name. | 1215 | within the whole region. The `name` is the event-loop name. |
134 | 1216 | """ | 1216 | """ |
135 | 1217 | # Each regiond might be running a local bridge that duplicates the | ||
136 | 1218 | # same IP address across region controllers. Each region controller | ||
137 | 1219 | # must output a set of unique IP addresses, to prevent the rack | ||
138 | 1220 | # controller from connecting to a different region controller than | ||
139 | 1221 | # the rack controller was expecting to be connecting to. | ||
140 | 1222 | def _unique_to_region(address, region, regions): | ||
141 | 1223 | for region_obj in regions: | ||
142 | 1224 | if region_obj != region: | ||
143 | 1225 | for process in region_obj.processes.all(): | ||
144 | 1226 | for endpoint in process.endpoints.all(): | ||
145 | 1227 | if endpoint.address == address: | ||
146 | 1228 | return False | ||
147 | 1229 | return True | ||
148 | 1230 | |||
149 | 1217 | regions = RegionController.objects.all() | 1231 | regions = RegionController.objects.all() |
150 | 1218 | regions = regions.prefetch_related("processes", "processes__endpoints") | 1232 | regions = regions.prefetch_related("processes", "processes__endpoints") |
151 | 1219 | all_endpoints = [] | 1233 | all_endpoints = [] |
152 | 1220 | for region_obj in regions: | 1234 | for region_obj in regions: |
153 | 1221 | for process in region_obj.processes.all(): | 1235 | for process in region_obj.processes.all(): |
154 | 1222 | for endpoint in process.endpoints.all(): | 1236 | for endpoint in process.endpoints.all(): |
158 | 1223 | all_endpoints.append(( | 1237 | if _unique_to_region( |
159 | 1224 | "%s:pid=%d" % (region_obj.hostname, process.pid), | 1238 | endpoint.address, region_obj, regions): |
160 | 1225 | endpoint.address, endpoint.port)) | 1239 | all_endpoints.append(( |
161 | 1240 | "%s:pid=%d" % (region_obj.hostname, process.pid), | ||
162 | 1241 | endpoint.address, endpoint.port)) | ||
163 | 1226 | return all_endpoints | 1242 | return all_endpoints |
164 | 1227 | 1243 | ||
165 | 1228 | @classmethod | 1244 | @classmethod |
166 | diff --git a/src/maasserver/rpc/tests/test_rackcontrollers.py b/src/maasserver/rpc/tests/test_rackcontrollers.py | |||
167 | index 8c62f80..e5b075b 100644 | |||
168 | --- a/src/maasserver/rpc/tests/test_rackcontrollers.py | |||
169 | +++ b/src/maasserver/rpc/tests/test_rackcontrollers.py | |||
170 | @@ -288,11 +288,11 @@ class TestRegisterRackController(MAASServerTestCase): | |||
171 | 288 | for name, interface in interfaces.items() | 288 | for name, interface in interfaces.items() |
172 | 289 | ))) | 289 | ))) |
173 | 290 | 290 | ||
175 | 291 | def test_registers_with_rack_registration_lock_held(self): | 291 | def test_registers_with_startup_lock_held(self): |
176 | 292 | lock_status = [] | 292 | lock_status = [] |
177 | 293 | 293 | ||
178 | 294 | def record_lock_status(*args): | 294 | def record_lock_status(*args): |
180 | 295 | lock_status.append(locks.rack_registration.is_locked()) | 295 | lock_status.append(locks.startup.is_locked()) |
181 | 296 | return None # Simulate that no rack found. | 296 | return None # Simulate that no rack found. |
182 | 297 | 297 | ||
183 | 298 | find = self.patch(rackcontrollers, "find") | 298 | find = self.patch(rackcontrollers, "find") |
184 | diff --git a/src/maasserver/rpc/tests/test_regionservice.py b/src/maasserver/rpc/tests/test_regionservice.py | |||
185 | index 0a8016a..12678c5 100644 | |||
186 | --- a/src/maasserver/rpc/tests/test_regionservice.py | |||
187 | +++ b/src/maasserver/rpc/tests/test_regionservice.py | |||
188 | @@ -33,6 +33,7 @@ from maasserver.models import ( | |||
189 | 33 | RackController, | 33 | RackController, |
190 | 34 | RegionController, | 34 | RegionController, |
191 | 35 | RegionControllerProcess, | 35 | RegionControllerProcess, |
192 | 36 | RegionControllerProcessEndpoint, | ||
193 | 36 | RegionRackRPCConnection, | 37 | RegionRackRPCConnection, |
194 | 37 | Service as ServiceModel, | 38 | Service as ServiceModel, |
195 | 38 | timestampedmodel, | 39 | timestampedmodel, |
196 | @@ -1604,6 +1605,42 @@ class TestRegionAdvertising(MAASServerTestCase): | |||
197 | 1604 | ] | 1605 | ] |
198 | 1605 | self.assertItemsEqual(expected, advertising.dump()) | 1606 | self.assertItemsEqual(expected, advertising.dump()) |
199 | 1606 | 1607 | ||
200 | 1608 | def test_dump_doesnt_duplicate_ips_across_region_controllers(self): | ||
201 | 1609 | duplicate_address = factory.make_ipv4_address() | ||
202 | 1610 | addresses = { | ||
203 | 1611 | (factory.make_ipv4_address(), factory.pick_port()), | ||
204 | 1612 | (factory.make_ipv4_address(), factory.pick_port()), | ||
205 | 1613 | } | ||
206 | 1614 | advertising = RegionAdvertising.promote() | ||
207 | 1615 | advertising.update( | ||
208 | 1616 | addresses.union({(duplicate_address, factory.pick_port()), })) | ||
209 | 1617 | |||
210 | 1618 | other_addresses = { | ||
211 | 1619 | (factory.make_ipv4_address(), factory.pick_port()), | ||
212 | 1620 | (factory.make_ipv4_address(), factory.pick_port()), | ||
213 | 1621 | } | ||
214 | 1622 | other_region = factory.make_Node(node_type=NODE_TYPE.REGION_CONTROLLER) | ||
215 | 1623 | other_process = RegionControllerProcess.objects.create( | ||
216 | 1624 | region=other_region, pid=randint(1, 1000)) | ||
217 | 1625 | for address, port in other_addresses: | ||
218 | 1626 | RegionControllerProcessEndpoint.objects.create( | ||
219 | 1627 | process=other_process, address=address, port=port) | ||
220 | 1628 | RegionControllerProcessEndpoint.objects.create( | ||
221 | 1629 | process=other_process, | ||
222 | 1630 | address=duplicate_address, | ||
223 | 1631 | port=factory.pick_port()) | ||
224 | 1632 | |||
225 | 1633 | expected = [ | ||
226 | 1634 | ("%s:pid=%d" % (gethostname(), os.getpid()), addr, port) | ||
227 | 1635 | for (addr, port) in addresses | ||
228 | 1636 | ] + [ | ||
229 | 1637 | ("%s:pid=%d" % ( | ||
230 | 1638 | other_region.hostname, other_process.pid), | ||
231 | 1639 | addr, port) | ||
232 | 1640 | for (addr, port) in other_addresses | ||
233 | 1641 | ] | ||
234 | 1642 | self.assertItemsEqual(expected, advertising.dump()) | ||
235 | 1643 | |||
236 | 1607 | def test__adds_connection_and_removes_connection(self): | 1644 | def test__adds_connection_and_removes_connection(self): |
237 | 1608 | advertising = RegionAdvertising.promote() | 1645 | advertising = RegionAdvertising.promote() |
238 | 1609 | process = advertising.getRegionProcess() | 1646 | process = advertising.getRegionProcess() |
239 | diff --git a/src/maasserver/utils/orm.py b/src/maasserver/utils/orm.py | |||
240 | index 2ec0590..40f8a3f 100644 | |||
241 | --- a/src/maasserver/utils/orm.py | |||
242 | +++ b/src/maasserver/utils/orm.py | |||
243 | @@ -616,14 +616,25 @@ def connected(): | |||
244 | 616 | 616 | ||
245 | 617 | If there is not yet a connection to the database, this will connect on | 617 | If there is not yet a connection to the database, this will connect on |
246 | 618 | entry and disconnect on exit. Preexisting connections will be left alone. | 618 | entry and disconnect on exit. Preexisting connections will be left alone. |
247 | 619 | |||
248 | 620 | If the preexisting connection is not usable it is closed and a new | ||
249 | 621 | connection is made. | ||
250 | 619 | """ | 622 | """ |
251 | 620 | if connection.connection is None: | 623 | if connection.connection is None: |
252 | 624 | connection.close_if_unusable_or_obsolete() | ||
253 | 621 | connection.ensure_connection() | 625 | connection.ensure_connection() |
254 | 622 | try: | 626 | try: |
255 | 623 | yield | 627 | yield |
256 | 624 | finally: | 628 | finally: |
257 | 625 | connection.close() | 629 | connection.close() |
258 | 630 | elif connection.is_usable(): | ||
259 | 631 | yield | ||
260 | 626 | else: | 632 | else: |
261 | 633 | # Connection is not usable, so we disconnect and reconnect. Since | ||
262 | 634 | # the connection was previously connected we do not disconnect this | ||
263 | 635 | # new connection. | ||
264 | 636 | connection.close_if_unusable_or_obsolete() | ||
265 | 637 | connection.ensure_connection() | ||
266 | 627 | yield | 638 | yield |
267 | 628 | 639 | ||
268 | 629 | 640 | ||
269 | diff --git a/src/maasserver/utils/tests/test_orm.py b/src/maasserver/utils/tests/test_orm.py | |||
270 | index 064f1b0..94ee817 100644 | |||
271 | --- a/src/maasserver/utils/tests/test_orm.py | |||
272 | +++ b/src/maasserver/utils/tests/test_orm.py | |||
273 | @@ -1004,6 +1004,22 @@ class TestConnected(MAASTransactionServerTestCase): | |||
274 | 1004 | self.assertThat(connection.connection, Is(preexisting_connection)) | 1004 | self.assertThat(connection.connection, Is(preexisting_connection)) |
275 | 1005 | self.assertThat(connection.connection, Is(preexisting_connection)) | 1005 | self.assertThat(connection.connection, Is(preexisting_connection)) |
276 | 1006 | 1006 | ||
277 | 1007 | def test__disconnects_and_reconnects_if_not_usable(self): | ||
278 | 1008 | connection.ensure_connection() | ||
279 | 1009 | preexisting_connection = connection.connection | ||
280 | 1010 | |||
281 | 1011 | connection.errors_occurred = True | ||
282 | 1012 | self.patch(connection, "is_usable").return_value = False | ||
283 | 1013 | |||
284 | 1014 | self.assertThat(connection.connection, Not(Is(None))) | ||
285 | 1015 | with orm.connected(): | ||
286 | 1016 | self.assertThat( | ||
287 | 1017 | connection.connection, Not(Is(preexisting_connection))) | ||
288 | 1018 | self.assertThat(connection.connection, Not(Is(None))) | ||
289 | 1019 | |||
290 | 1020 | self.assertThat(connection.connection, Not(Is(preexisting_connection))) | ||
291 | 1021 | self.assertThat(connection.connection, Not(Is(None))) | ||
292 | 1022 | |||
293 | 1007 | 1023 | ||
294 | 1008 | class TestWithConnection(MAASTransactionServerTestCase): | 1024 | class TestWithConnection(MAASTransactionServerTestCase): |
295 | 1009 | """Tests for the `orm.with_connection` decorator.""" | 1025 | """Tests for the `orm.with_connection` decorator.""" |
296 | diff --git a/src/provisioningserver/rackdservices/tests/test_tftp.py b/src/provisioningserver/rackdservices/tests/test_tftp.py | |||
297 | index 1082d6a..14510d9 100644 | |||
298 | --- a/src/provisioningserver/rackdservices/tests/test_tftp.py | |||
299 | +++ b/src/provisioningserver/rackdservices/tests/test_tftp.py | |||
300 | @@ -379,6 +379,229 @@ class TestTFTPBackend(MAASTestCase): | |||
301 | 379 | ) | 379 | ) |
302 | 380 | 380 | ||
303 | 381 | @inlineCallbacks | 381 | @inlineCallbacks |
304 | 382 | def test_get_boot_method_reader_uses_same_client(self): | ||
305 | 383 | # Fake configuration parameters, as discovered from the file path. | ||
306 | 384 | fake_params = {"mac": factory.make_mac_address("-")} | ||
307 | 385 | # Fake kernel configuration parameters, as returned from the RPC call. | ||
308 | 386 | fake_kernel_params = make_kernel_parameters() | ||
309 | 387 | fake_params = fake_kernel_params._asdict() | ||
310 | 388 | |||
311 | 389 | # Stub the output of list_boot_images so the label is set in the | ||
312 | 390 | # kernel parameters. | ||
313 | 391 | boot_image = { | ||
314 | 392 | "osystem": fake_params["osystem"], | ||
315 | 393 | "release": fake_params["release"], | ||
316 | 394 | "architecture": fake_params["arch"], | ||
317 | 395 | "subarchitecture": fake_params["subarch"], | ||
318 | 396 | "purpose": fake_params["purpose"], | ||
319 | 397 | "supported_subarches": "", | ||
320 | 398 | "label": fake_params["label"], | ||
321 | 399 | } | ||
322 | 400 | self.patch(tftp_module, "list_boot_images").return_value = [boot_image] | ||
323 | 401 | del fake_params["label"] | ||
324 | 402 | |||
325 | 403 | # Stub RPC call to return the fake configuration parameters. | ||
326 | 404 | clients = [] | ||
327 | 405 | for _ in range(10): | ||
328 | 406 | client = Mock() | ||
329 | 407 | client.localIdent = factory.make_name("system_id") | ||
330 | 408 | client.side_effect = lambda *args, **kwargs: ( | ||
331 | 409 | succeed(dict(fake_params))) | ||
332 | 410 | clients.append(client) | ||
333 | 411 | client_service = Mock() | ||
334 | 412 | client_service.getClientNow.side_effect = [ | ||
335 | 413 | succeed(client) | ||
336 | 414 | for client in clients | ||
337 | 415 | ] | ||
338 | 416 | client_service.getAllClients.return_value = clients | ||
339 | 417 | |||
340 | 418 | # get_boot_method_reader() takes a dict() of parameters and returns an | ||
341 | 419 | # `IReader` of a PXE configuration, rendered by | ||
342 | 420 | # `PXEBootMethod.get_reader`. | ||
343 | 421 | backend = TFTPBackend( | ||
344 | 422 | self.make_dir(), client_service) | ||
345 | 423 | |||
346 | 424 | # Stub get_reader to return the render parameters. | ||
347 | 425 | method = PXEBootMethod() | ||
348 | 426 | fake_render_result = factory.make_name("render").encode("utf-8") | ||
349 | 427 | render_patch = self.patch(method, "get_reader") | ||
350 | 428 | render_patch.return_value = BytesReader(fake_render_result) | ||
351 | 429 | |||
352 | 430 | # Get the reader once. | ||
353 | 431 | remote_ip = factory.make_ipv4_address() | ||
354 | 432 | params_with_ip = dict(fake_params) | ||
355 | 433 | params_with_ip['remote_ip'] = remote_ip | ||
356 | 434 | reader = yield backend.get_boot_method_reader(method, params_with_ip) | ||
357 | 435 | self.addCleanup(reader.finish) | ||
358 | 436 | |||
359 | 437 | # Get the reader twice. | ||
360 | 438 | params_with_ip = dict(fake_params) | ||
361 | 439 | params_with_ip['remote_ip'] = remote_ip | ||
362 | 440 | reader = yield backend.get_boot_method_reader(method, params_with_ip) | ||
363 | 441 | self.addCleanup(reader.finish) | ||
364 | 442 | |||
365 | 443 | # Only one client is saved. | ||
366 | 444 | self.assertEquals(clients[0], backend.client_to_remote[remote_ip]) | ||
367 | 445 | |||
368 | 446 | # Only the first client should have been called twice, and all the | ||
369 | 447 | # other clients should not have been called. | ||
370 | 448 | self.assertEquals(2, clients[0].call_count) | ||
371 | 449 | for idx in range(1, 10): | ||
372 | 450 | self.assertThat(clients[idx], MockNotCalled()) | ||
373 | 451 | |||
374 | 452 | @inlineCallbacks | ||
375 | 453 | def test_get_boot_method_reader_uses_different_clients(self): | ||
376 | 454 | # Fake configuration parameters, as discovered from the file path. | ||
377 | 455 | fake_params = {"mac": factory.make_mac_address("-")} | ||
378 | 456 | # Fake kernel configuration parameters, as returned from the RPC call. | ||
379 | 457 | fake_kernel_params = make_kernel_parameters() | ||
380 | 458 | fake_params = fake_kernel_params._asdict() | ||
381 | 459 | |||
382 | 460 | # Stub the output of list_boot_images so the label is set in the | ||
383 | 461 | # kernel parameters. | ||
384 | 462 | boot_image = { | ||
385 | 463 | "osystem": fake_params["osystem"], | ||
386 | 464 | "release": fake_params["release"], | ||
387 | 465 | "architecture": fake_params["arch"], | ||
388 | 466 | "subarchitecture": fake_params["subarch"], | ||
389 | 467 | "purpose": fake_params["purpose"], | ||
390 | 468 | "supported_subarches": "", | ||
391 | 469 | "label": fake_params["label"], | ||
392 | 470 | } | ||
393 | 471 | self.patch(tftp_module, "list_boot_images").return_value = [boot_image] | ||
394 | 472 | del fake_params["label"] | ||
395 | 473 | |||
396 | 474 | # Stub RPC call to return the fake configuration parameters. | ||
397 | 475 | clients = [] | ||
398 | 476 | for _ in range(10): | ||
399 | 477 | client = Mock() | ||
400 | 478 | client.localIdent = factory.make_name("system_id") | ||
401 | 479 | client.side_effect = lambda *args, **kwargs: ( | ||
402 | 480 | succeed(dict(fake_params))) | ||
403 | 481 | clients.append(client) | ||
404 | 482 | client_service = Mock() | ||
405 | 483 | client_service.getClientNow.side_effect = [ | ||
406 | 484 | succeed(client) | ||
407 | 485 | for client in clients | ||
408 | 486 | ] | ||
409 | 487 | client_service.getAllClients.return_value = clients | ||
410 | 488 | |||
411 | 489 | # get_boot_method_reader() takes a dict() of parameters and returns an | ||
412 | 490 | # `IReader` of a PXE configuration, rendered by | ||
413 | 491 | # `PXEBootMethod.get_reader`. | ||
414 | 492 | backend = TFTPBackend( | ||
415 | 493 | self.make_dir(), client_service) | ||
416 | 494 | |||
417 | 495 | # Stub get_reader to return the render parameters. | ||
418 | 496 | method = PXEBootMethod() | ||
419 | 497 | fake_render_result = factory.make_name("render").encode("utf-8") | ||
420 | 498 | render_patch = self.patch(method, "get_reader") | ||
421 | 499 | render_patch.return_value = BytesReader(fake_render_result) | ||
422 | 500 | |||
423 | 501 | # Get the reader once. | ||
424 | 502 | remote_ip_one = factory.make_ipv4_address() | ||
425 | 503 | params_with_ip = dict(fake_params) | ||
426 | 504 | params_with_ip['remote_ip'] = remote_ip_one | ||
427 | 505 | reader = yield backend.get_boot_method_reader(method, params_with_ip) | ||
428 | 506 | self.addCleanup(reader.finish) | ||
429 | 507 | |||
430 | 508 | # Get the reader twice. | ||
431 | 509 | remote_ip_two = factory.make_ipv4_address() | ||
432 | 510 | params_with_ip = dict(fake_params) | ||
433 | 511 | params_with_ip['remote_ip'] = remote_ip_two | ||
434 | 512 | reader = yield backend.get_boot_method_reader(method, params_with_ip) | ||
435 | 513 | self.addCleanup(reader.finish) | ||
436 | 514 | |||
437 | 515 | # Both clients are saved. | ||
438 | 516 | self.assertEquals(clients[0], backend.client_to_remote[remote_ip_one]) | ||
439 | 517 | self.assertEquals(clients[1], backend.client_to_remote[remote_ip_two]) | ||
440 | 518 | |||
441 | 519 | # Only the first and second client should have been called once, and | ||
442 | 520 | # all the other clients should not have been called. | ||
443 | 521 | self.assertEquals(1, clients[0].call_count) | ||
444 | 522 | self.assertEquals(1, clients[1].call_count) | ||
445 | 523 | for idx in range(2, 10): | ||
446 | 524 | self.assertThat(clients[idx], MockNotCalled()) | ||
447 | 525 | |||
448 | 526 | @inlineCallbacks | ||
449 | 527 | def test_get_boot_method_reader_grabs_new_client_on_lost_conn(self): | ||
450 | 528 | # Fake configuration parameters, as discovered from the file path. | ||
451 | 529 | fake_params = {"mac": factory.make_mac_address("-")} | ||
452 | 530 | # Fake kernel configuration parameters, as returned from the RPC call. | ||
453 | 531 | fake_kernel_params = make_kernel_parameters() | ||
454 | 532 | fake_params = fake_kernel_params._asdict() | ||
455 | 533 | |||
456 | 534 | # Stub the output of list_boot_images so the label is set in the | ||
457 | 535 | # kernel parameters. | ||
458 | 536 | boot_image = { | ||
459 | 537 | "osystem": fake_params["osystem"], | ||
460 | 538 | "release": fake_params["release"], | ||
461 | 539 | "architecture": fake_params["arch"], | ||
462 | 540 | "subarchitecture": fake_params["subarch"], | ||
463 | 541 | "purpose": fake_params["purpose"], | ||
464 | 542 | "supported_subarches": "", | ||
465 | 543 | "label": fake_params["label"], | ||
466 | 544 | } | ||
467 | 545 | self.patch(tftp_module, "list_boot_images").return_value = [boot_image] | ||
468 | 546 | del fake_params["label"] | ||
469 | 547 | |||
470 | 548 | # Stub RPC call to return the fake configuration parameters. | ||
471 | 549 | clients = [] | ||
472 | 550 | for _ in range(10): | ||
473 | 551 | client = Mock() | ||
474 | 552 | client.localIdent = factory.make_name("system_id") | ||
475 | 553 | client.side_effect = lambda *args, **kwargs: ( | ||
476 | 554 | succeed(dict(fake_params))) | ||
477 | 555 | clients.append(client) | ||
478 | 556 | client_service = Mock() | ||
479 | 557 | client_service.getClientNow.side_effect = [ | ||
480 | 558 | succeed(client) | ||
481 | 559 | for client in clients | ||
482 | 560 | ] | ||
483 | 561 | client_service.getAllClients.side_effect = [ | ||
484 | 562 | clients[1:], | ||
485 | 563 | clients[2:], | ||
486 | 564 | ] | ||
487 | 565 | |||
488 | 566 | # get_boot_method_reader() takes a dict() of parameters and returns an | ||
489 | 567 | # `IReader` of a PXE configuration, rendered by | ||
490 | 568 | # `PXEBootMethod.get_reader`. | ||
491 | 569 | backend = TFTPBackend( | ||
492 | 570 | self.make_dir(), client_service) | ||
493 | 571 | |||
494 | 572 | # Stub get_reader to return the render parameters. | ||
495 | 573 | method = PXEBootMethod() | ||
496 | 574 | fake_render_result = factory.make_name("render").encode("utf-8") | ||
497 | 575 | render_patch = self.patch(method, "get_reader") | ||
498 | 576 | render_patch.return_value = BytesReader(fake_render_result) | ||
499 | 577 | |||
500 | 578 | # Get the reader once. | ||
501 | 579 | remote_ip = factory.make_ipv4_address() | ||
502 | 580 | params_with_ip = dict(fake_params) | ||
503 | 581 | params_with_ip['remote_ip'] = remote_ip | ||
504 | 582 | reader = yield backend.get_boot_method_reader(method, params_with_ip) | ||
505 | 583 | self.addCleanup(reader.finish) | ||
506 | 584 | |||
507 | 585 | # The first client is now saved. | ||
508 | 586 | self.assertEquals(clients[0], backend.client_to_remote[remote_ip]) | ||
509 | 587 | |||
510 | 588 | # Get the reader twice. | ||
511 | 589 | params_with_ip = dict(fake_params) | ||
512 | 590 | params_with_ip['remote_ip'] = remote_ip | ||
513 | 591 | reader = yield backend.get_boot_method_reader(method, params_with_ip) | ||
514 | 592 | self.addCleanup(reader.finish) | ||
515 | 593 | |||
516 | 594 | # The second client is now saved. | ||
517 | 595 | self.assertEquals(clients[1], backend.client_to_remote[remote_ip]) | ||
518 | 596 | |||
519 | 597 | # Only the first and second client should have been called once, and | ||
520 | 598 | # all the other clients should not have been called. | ||
521 | 599 | self.assertEquals(1, clients[0].call_count) | ||
522 | 600 | self.assertEquals(1, clients[1].call_count) | ||
523 | 601 | for idx in range(2, 10): | ||
524 | 602 | self.assertThat(clients[idx], MockNotCalled()) | ||
525 | 603 | |||
526 | 604 | @inlineCallbacks | ||
527 | 382 | def test_get_boot_method_reader_returns_rendered_params(self): | 605 | def test_get_boot_method_reader_returns_rendered_params(self): |
528 | 383 | # Fake configuration parameters, as discovered from the file path. | 606 | # Fake configuration parameters, as discovered from the file path. |
529 | 384 | fake_params = {"mac": factory.make_mac_address("-")} | 607 | fake_params = {"mac": factory.make_mac_address("-")} |
530 | diff --git a/src/provisioningserver/rackdservices/tftp.py b/src/provisioningserver/rackdservices/tftp.py | |||
531 | index b56d4b3..5dc1547 100644 | |||
532 | --- a/src/provisioningserver/rackdservices/tftp.py | |||
533 | +++ b/src/provisioningserver/rackdservices/tftp.py | |||
534 | @@ -66,6 +66,7 @@ from twisted.internet.defer import ( | |||
535 | 66 | inlineCallbacks, | 66 | inlineCallbacks, |
536 | 67 | maybeDeferred, | 67 | maybeDeferred, |
537 | 68 | returnValue, | 68 | returnValue, |
538 | 69 | succeed, | ||
539 | 69 | ) | 70 | ) |
540 | 70 | from twisted.internet.task import deferLater | 71 | from twisted.internet.task import deferLater |
541 | 71 | from twisted.python.filepath import FilePath | 72 | from twisted.python.filepath import FilePath |
542 | @@ -163,9 +164,48 @@ class TFTPBackend(FilesystemSynchronousBackend): | |||
543 | 163 | base_path = FilePath(base_path) | 164 | base_path = FilePath(base_path) |
544 | 164 | super(TFTPBackend, self).__init__( | 165 | super(TFTPBackend, self).__init__( |
545 | 165 | base_path, can_read=True, can_write=False) | 166 | base_path, can_read=True, can_write=False) |
546 | 167 | self.client_to_remote = {} | ||
547 | 166 | self.client_service = client_service | 168 | self.client_service = client_service |
548 | 167 | self.fetcher = RPCFetcher() | 169 | self.fetcher = RPCFetcher() |
549 | 168 | 170 | ||
550 | 171 | def _get_new_client_for_remote(self, remote_ip): | ||
551 | 172 | """Return a new client for the `remote_ip`. | ||
552 | 173 | |||
553 | 174 | Don't use directly; called from `get_client_for`. | ||
554 | 175 | """ | ||
555 | 176 | def store_client(client): | ||
556 | 177 | self.client_to_remote[remote_ip] = client | ||
557 | 178 | return client | ||
558 | 179 | |||
559 | 180 | d = self.client_service.getClientNow() | ||
560 | 181 | d.addCallback(store_client) | ||
561 | 182 | return d | ||
562 | 183 | |||
563 | 184 | def get_client_for(self, params): | ||
564 | 185 | """Always gets the same client based on `params`. | ||
565 | 186 | |||
566 | 187 | This is done so that all TFTP requests from the same remote client go | ||
567 | 188 | to the same regiond process. `RPCFetcher` only duplicates on the client | ||
568 | 189 | and arguments, so if the client is not the same the duplicate effort | ||
569 | 190 | is not consolidated. | ||
570 | 191 | """ | ||
571 | 192 | remote_ip = params.get('remote_ip') | ||
572 | 193 | if remote_ip: | ||
573 | 194 | client = self.client_to_remote.get(remote_ip, None) | ||
574 | 195 | if client is None: | ||
575 | 196 | # Get a new client for the remote_ip. | ||
576 | 197 | return self._get_new_client_for_remote(remote_ip) | ||
577 | 198 | else: | ||
578 | 199 | # Check that the existing client is still valid. | ||
579 | 200 | clients = self.client_service.getAllClients() | ||
580 | 201 | if client in clients: | ||
581 | 202 | return succeed(client) | ||
582 | 203 | else: | ||
583 | 204 | del self.client_to_remote[remote_ip] | ||
584 | 205 | return self._get_new_client_for_remote(remote_ip) | ||
585 | 206 | else: | ||
586 | 207 | return self.client_service.getClientNow() | ||
587 | 208 | |||
588 | 169 | @inlineCallbacks | 209 | @inlineCallbacks |
589 | 170 | @typed | 210 | @typed |
590 | 171 | def get_boot_method(self, file_name: TFTPPath): | 211 | def get_boot_method(self, file_name: TFTPPath): |
591 | @@ -243,7 +283,7 @@ class TFTPBackend(FilesystemSynchronousBackend): | |||
592 | 243 | d.addCallback(lambda data: KernelParameters(**data)) | 283 | d.addCallback(lambda data: KernelParameters(**data)) |
593 | 244 | return d | 284 | return d |
594 | 245 | 285 | ||
596 | 246 | d = self.client_service.getClientNow() | 286 | d = self.get_client_for(params) |
597 | 247 | d.addCallback(fetch, params) | 287 | d.addCallback(fetch, params) |
598 | 248 | return d | 288 | return d |
599 | 249 | 289 | ||
600 | diff --git a/src/provisioningserver/rpc/clusterservice.py b/src/provisioningserver/rpc/clusterservice.py | |||
601 | index 26e2172..a4ab2ba 100644 | |||
602 | --- a/src/provisioningserver/rpc/clusterservice.py | |||
603 | +++ b/src/provisioningserver/rpc/clusterservice.py | |||
604 | @@ -106,6 +106,7 @@ from twisted import web | |||
605 | 106 | from twisted.application.internet import TimerService | 106 | from twisted.application.internet import TimerService |
606 | 107 | from twisted.internet import reactor | 107 | from twisted.internet import reactor |
607 | 108 | from twisted.internet.defer import ( | 108 | from twisted.internet.defer import ( |
608 | 109 | DeferredList, | ||
609 | 109 | inlineCallbacks, | 110 | inlineCallbacks, |
610 | 110 | maybeDeferred, | 111 | maybeDeferred, |
611 | 111 | returnValue, | 112 | returnValue, |
612 | @@ -784,6 +785,8 @@ class ClusterClient(Cluster): | |||
613 | 784 | log.msg("Event-loop '%s' authenticated." % self.ident) | 785 | log.msg("Event-loop '%s' authenticated." % self.ident) |
614 | 785 | registered = yield self.registerRackWithRegion() | 786 | registered = yield self.registerRackWithRegion() |
615 | 786 | if registered: | 787 | if registered: |
616 | 788 | if self.eventloop in self.service.try_connections: | ||
617 | 789 | del self.service.try_connections[self.eventloop] | ||
618 | 787 | self.service.connections[self.eventloop] = self | 790 | self.service.connections[self.eventloop] = self |
619 | 788 | self.ready.set(self.eventloop) | 791 | self.ready.set(self.eventloop) |
620 | 789 | else: | 792 | else: |
621 | @@ -935,6 +938,8 @@ class ClusterClientService(TimerService, object): | |||
622 | 935 | super(ClusterClientService, self).__init__( | 938 | super(ClusterClientService, self).__init__( |
623 | 936 | self._calculate_interval(None, None), self._tryUpdate) | 939 | self._calculate_interval(None, None), self._tryUpdate) |
624 | 937 | self.connections = {} | 940 | self.connections = {} |
625 | 941 | self.try_connections = {} | ||
626 | 942 | self._previous_work = (None, None) | ||
627 | 938 | self.clock = reactor | 943 | self.clock = reactor |
628 | 939 | 944 | ||
629 | 940 | # XXX jtv 2014-09-23, bug=1372767: Fix | 945 | # XXX jtv 2014-09-23, bug=1372767: Fix |
630 | @@ -1172,6 +1177,48 @@ class ClusterClientService(TimerService, object): | |||
631 | 1172 | ] | 1177 | ] |
632 | 1173 | for name, addresses in eventloops.items() | 1178 | for name, addresses in eventloops.items() |
633 | 1174 | } | 1179 | } |
634 | 1180 | |||
635 | 1181 | drop, connect = self._calculate_work(eventloops) | ||
636 | 1182 | |||
637 | 1183 | # Log fully connected only once. If that state changes then log | ||
638 | 1184 | # it again. This prevents flooding the log with the same message when | ||
639 | 1185 | # the state of the connections has not changed. | ||
640 | 1186 | prev_work, self._previous_work = self._previous_work, (drop, connect) | ||
641 | 1187 | if len(drop) == 0 and len(connect) == 0: | ||
642 | 1188 | if prev_work != (drop, connect) and len(eventloops) > 0: | ||
643 | 1189 | controllers = { | ||
644 | 1190 | eventloop.split(':')[0] | ||
645 | 1191 | for eventloop, _ in eventloops.items() | ||
646 | 1192 | } | ||
647 | 1193 | log.msg( | ||
648 | 1194 | "Fully connected to all %d event-loops on all %d " | ||
649 | 1195 | "region controllers (%s)." % ( | ||
650 | 1196 | len(eventloops), len(controllers), | ||
651 | 1197 | ', '.join(sorted(controllers)))) | ||
652 | 1198 | |||
653 | 1199 | # Drop all connections at once, as they are no longer required. | ||
654 | 1200 | if len(drop) > 0: | ||
655 | 1201 | log.msg("Dropping connections to event-loops: %s" % ( | ||
656 | 1202 | ', '.join(drop.keys()))) | ||
657 | 1203 | yield DeferredList([ | ||
658 | 1204 | maybeDeferred(self._drop_connection, connection) | ||
659 | 1205 | for eventloop, connections in drop.items() | ||
660 | 1206 | for connection in connections | ||
661 | 1207 | ], consumeErrors=True) | ||
662 | 1208 | |||
663 | 1209 | # Make all the new connections to each endpoint at the same time. | ||
664 | 1210 | if len(connect) > 0: | ||
665 | 1211 | log.msg("Making connections to event-loops: %s" % ( | ||
666 | 1212 | ', '.join(connect.keys()))) | ||
667 | 1213 | yield DeferredList([ | ||
668 | 1214 | self._make_connections(eventloop, addresses) | ||
669 | 1215 | for eventloop, addresses in connect.items() | ||
670 | 1216 | ], consumeErrors=True) | ||
671 | 1217 | |||
672 | 1218 | def _calculate_work(self, eventloops): | ||
673 | 1219 | """Calculate the work that needs to be performed for reconnection.""" | ||
674 | 1220 | drop, connect = {}, {} | ||
675 | 1221 | |||
676 | 1175 | # Drop connections to event-loops that no longer include one of | 1222 | # Drop connections to event-loops that no longer include one of |
677 | 1176 | # this cluster's established connections among its advertised | 1223 | # this cluster's established connections among its advertised |
678 | 1177 | # endpoints. This is most likely to have happened because of | 1224 | # endpoints. This is most likely to have happened because of |
679 | @@ -1183,23 +1230,20 @@ class ClusterClientService(TimerService, object): | |||
680 | 1183 | if eventloop in self.connections: | 1230 | if eventloop in self.connections: |
681 | 1184 | connection = self.connections[eventloop] | 1231 | connection = self.connections[eventloop] |
682 | 1185 | if connection.address not in addresses: | 1232 | if connection.address not in addresses: |
684 | 1186 | yield self._drop_connection(connection) | 1233 | drop[eventloop] = [connection] |
685 | 1234 | if eventloop in self.try_connections: | ||
686 | 1235 | connection = self.try_connections[eventloop] | ||
687 | 1236 | if connection.address not in addresses: | ||
688 | 1237 | drop[eventloop] = [connection] | ||
689 | 1238 | |||
690 | 1187 | # Create new connections to event-loops that the cluster does | 1239 | # Create new connections to event-loops that the cluster does |
693 | 1188 | # not yet have a connection to. Try each advertised endpoint | 1240 | # not yet have a connection to. |
692 | 1189 | # (address) in turn until one of them bites. | ||
694 | 1190 | for eventloop, addresses in eventloops.items(): | 1241 | for eventloop, addresses in eventloops.items(): |
707 | 1191 | if eventloop not in self.connections: | 1242 | if ((eventloop not in self.connections and |
708 | 1192 | for address in addresses: | 1243 | eventloop not in self.try_connections) or |
709 | 1193 | try: | 1244 | eventloop in drop): |
710 | 1194 | yield self._make_connection(eventloop, address) | 1245 | connect[eventloop] = addresses |
711 | 1195 | except ConnectError as error: | 1246 | |
700 | 1196 | host, port = address | ||
701 | 1197 | log.msg("Event-loop %s (%s:%d): %s" % ( | ||
702 | 1198 | eventloop, host, port, error)) | ||
703 | 1199 | except: | ||
704 | 1200 | log.err(None, "Failure making new RPC connection.") | ||
705 | 1201 | else: | ||
706 | 1202 | break | ||
712 | 1203 | # Remove connections to event-loops that are no longer | 1247 | # Remove connections to event-loops that are no longer |
713 | 1204 | # advertised by the RPC info view. Most likely this means that | 1248 | # advertised by the RPC info view. Most likely this means that |
714 | 1205 | # the process in which the event-loop is no longer running, but | 1249 | # the process in which the event-loop is no longer running, but |
715 | @@ -1208,7 +1252,32 @@ class ClusterClientService(TimerService, object): | |||
716 | 1208 | for eventloop in self.connections: | 1252 | for eventloop in self.connections: |
717 | 1209 | if eventloop not in eventloops: | 1253 | if eventloop not in eventloops: |
718 | 1210 | connection = self.connections[eventloop] | 1254 | connection = self.connections[eventloop] |
720 | 1211 | yield self._drop_connection(connection) | 1255 | drop[eventloop] = [connection] |
721 | 1256 | for eventloop in self.try_connections: | ||
722 | 1257 | if eventloop not in eventloops: | ||
723 | 1258 | connection = self.try_connections[eventloop] | ||
724 | 1259 | drop[eventloop] = [connection] | ||
725 | 1260 | |||
726 | 1261 | return drop, connect | ||
727 | 1262 | |||
728 | 1263 | @inlineCallbacks | ||
729 | 1264 | def _make_connections(self, eventloop, addresses): | ||
730 | 1265 | """Connect to `eventloop` using all `addresses`.""" | ||
731 | 1266 | for address in addresses: | ||
732 | 1267 | try: | ||
733 | 1268 | connection = yield self._make_connection(eventloop, address) | ||
734 | 1269 | except ConnectError as error: | ||
735 | 1270 | host, port = address | ||
736 | 1271 | log.msg("Event-loop %s (%s:%d): %s" % ( | ||
737 | 1272 | eventloop, host, port, error)) | ||
738 | 1273 | except: | ||
739 | 1274 | host, port = address | ||
740 | 1275 | log.err(None, ( | ||
741 | 1276 | "Failure with event-loop %s (%s:%d)" % ( | ||
742 | 1277 | eventloop, host, port))) | ||
743 | 1278 | else: | ||
744 | 1279 | self.try_connections[eventloop] = connection | ||
745 | 1280 | break | ||
746 | 1212 | 1281 | ||
747 | 1213 | def _make_connection(self, eventloop, address): | 1282 | def _make_connection(self, eventloop, address): |
748 | 1214 | """Connect to `eventloop` at `address`.""" | 1283 | """Connect to `eventloop` at `address`.""" |
749 | @@ -1227,6 +1296,9 @@ class ClusterClientService(TimerService, object): | |||
750 | 1227 | If this is the last connection that was keeping rackd connected to | 1296 | If this is the last connection that was keeping rackd connected to |
751 | 1228 | a regiond then dhcpd and dhcpd6 services will be turned off. | 1297 | a regiond then dhcpd and dhcpd6 services will be turned off. |
752 | 1229 | """ | 1298 | """ |
753 | 1299 | if eventloop in self.try_connections: | ||
754 | 1300 | if self.try_connections[eventloop] is connection: | ||
755 | 1301 | del self.try_connections[eventloop] | ||
756 | 1230 | if eventloop in self.connections: | 1302 | if eventloop in self.connections: |
757 | 1231 | if self.connections[eventloop] is connection: | 1303 | if self.connections[eventloop] is connection: |
758 | 1232 | del self.connections[eventloop] | 1304 | del self.connections[eventloop] |
759 | @@ -1242,7 +1314,7 @@ class ClusterClientService(TimerService, object): | |||
760 | 1242 | dhcp_v6.off() | 1314 | dhcp_v6.off() |
761 | 1243 | stopping_services.append("dhcpd6") | 1315 | stopping_services.append("dhcpd6") |
762 | 1244 | if len(stopping_services) > 0: | 1316 | if len(stopping_services) > 0: |
764 | 1245 | maaslog.error( | 1317 | log.msg( |
765 | 1246 | "Lost all connections to region controllers. " | 1318 | "Lost all connections to region controllers. " |
766 | 1247 | "Stopping service(s) %s." % ",".join(stopping_services)) | 1319 | "Stopping service(s) %s." % ",".join(stopping_services)) |
767 | 1248 | service_monitor.ensureServices() | 1320 | service_monitor.ensureServices() |
768 | diff --git a/src/provisioningserver/rpc/tests/test_clusterservice.py b/src/provisioningserver/rpc/tests/test_clusterservice.py | |||
769 | index e12a107..f8177f4 100644 | |||
770 | --- a/src/provisioningserver/rpc/tests/test_clusterservice.py | |||
771 | +++ b/src/provisioningserver/rpc/tests/test_clusterservice.py | |||
772 | @@ -613,7 +613,9 @@ class TestClusterClientService(MAASTestCase): | |||
773 | 613 | @inlineCallbacks | 613 | @inlineCallbacks |
774 | 614 | def test__update_connections_initially(self): | 614 | def test__update_connections_initially(self): |
775 | 615 | service = ClusterClientService(Clock()) | 615 | service = ClusterClientService(Clock()) |
776 | 616 | mock_client = Mock() | ||
777 | 616 | _make_connection = self.patch(service, "_make_connection") | 617 | _make_connection = self.patch(service, "_make_connection") |
778 | 618 | _make_connection.side_effect = lambda *args: succeed(mock_client) | ||
779 | 617 | _drop_connection = self.patch(service, "_drop_connection") | 619 | _drop_connection = self.patch(service, "_drop_connection") |
780 | 618 | 620 | ||
781 | 619 | info = json.loads(self.example_rpc_info_view_response.decode("ascii")) | 621 | info = json.loads(self.example_rpc_info_view_response.decode("ascii")) |
782 | @@ -627,10 +629,40 @@ class TestClusterClientService(MAASTestCase): | |||
783 | 627 | self.assertItemsEqual( | 629 | self.assertItemsEqual( |
784 | 628 | _make_connection_expected, | 630 | _make_connection_expected, |
785 | 629 | _make_connection.call_args_list) | 631 | _make_connection.call_args_list) |
786 | 632 | self.assertEquals({ | ||
787 | 633 | "host1:pid=1001": mock_client, | ||
788 | 634 | "host1:pid=2002": mock_client, | ||
789 | 635 | "host2:pid=3003": mock_client, | ||
790 | 636 | }, service.try_connections) | ||
791 | 630 | 637 | ||
792 | 631 | self.assertEqual([], _drop_connection.mock_calls) | 638 | self.assertEqual([], _drop_connection.mock_calls) |
793 | 632 | 639 | ||
794 | 633 | @inlineCallbacks | 640 | @inlineCallbacks |
795 | 641 | def test__update_connections_logs_fully_connected(self): | ||
796 | 642 | service = ClusterClientService(Clock()) | ||
797 | 643 | eventloops = { | ||
798 | 644 | "region1:123": [("::ffff:127.0.0.1", 1234)], | ||
799 | 645 | "region1:124": [("::ffff:127.0.0.1", 1235)], | ||
800 | 646 | "region2:123": [("::ffff:127.0.0.2", 1234)], | ||
801 | 647 | "region2:124": [("::ffff:127.0.0.2", 1235)], | ||
802 | 648 | } | ||
803 | 649 | for eventloop, addresses in eventloops.items(): | ||
804 | 650 | for address in addresses: | ||
805 | 651 | client = Mock() | ||
806 | 652 | client.address = address | ||
807 | 653 | service.connections[eventloop] = client | ||
808 | 654 | |||
809 | 655 | logger = self.useFixture(TwistedLoggerFixture()) | ||
810 | 656 | |||
811 | 657 | yield service._update_connections(eventloops) | ||
812 | 658 | # Second call should not add it to the log. | ||
813 | 659 | yield service._update_connections(eventloops) | ||
814 | 660 | |||
815 | 661 | self.assertEqual( | ||
816 | 662 | "Fully connected to all 4 event-loops on all 2 region " | ||
817 | 663 | "controllers (region1, region2).", logger.dump()) | ||
818 | 664 | |||
819 | 665 | @inlineCallbacks | ||
820 | 634 | def test__update_connections_connect_error_is_logged_tersely(self): | 666 | def test__update_connections_connect_error_is_logged_tersely(self): |
821 | 635 | service = ClusterClientService(Clock()) | 667 | service = ClusterClientService(Clock()) |
822 | 636 | _make_connection = self.patch(service, "_make_connection") | 668 | _make_connection = self.patch(service, "_make_connection") |
823 | @@ -646,6 +678,8 @@ class TestClusterClientService(MAASTestCase): | |||
824 | 646 | MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234))) | 678 | MockCalledOnceWith("an-event-loop", ("::ffff:127.0.0.1", 1234))) |
825 | 647 | 679 | ||
826 | 648 | self.assertEqual( | 680 | self.assertEqual( |
827 | 681 | "Making connections to event-loops: an-event-loop\n" | ||
828 | 682 | "---\n" | ||
829 | 649 | "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection " | 683 | "Event-loop an-event-loop (::ffff:127.0.0.1:1234): Connection " |
830 | 650 | "was refused by other side.", logger.dump()) | 684 | "was refused by other side.", logger.dump()) |
831 | 651 | 685 | ||
832 | @@ -666,7 +700,9 @@ class TestClusterClientService(MAASTestCase): | |||
833 | 666 | 700 | ||
834 | 667 | self.assertDocTestMatches( | 701 | self.assertDocTestMatches( |
835 | 668 | """\ | 702 | """\ |
837 | 669 | Failure making new RPC connection. | 703 | Making connections to event-loops: an-event-loop |
838 | 704 | --- | ||
839 | 705 | Failure with event-loop an-event-loop (::ffff:127.0.0.1:1234) | ||
840 | 670 | Traceback (most recent call last): | 706 | Traceback (most recent call last): |
841 | 671 | ... | 707 | ... |
842 | 672 | builtins.RuntimeError: Something went wrong. | 708 | builtins.RuntimeError: Something went wrong. |
843 | @@ -774,6 +810,15 @@ class TestClusterClientService(MAASTestCase): | |||
844 | 774 | connection.transport.loseConnection, | 810 | connection.transport.loseConnection, |
845 | 775 | MockCalledOnceWith()) | 811 | MockCalledOnceWith()) |
846 | 776 | 812 | ||
847 | 813 | def test__remove_connection_removes_from_try_connections(self): | ||
848 | 814 | service = make_inert_client_service() | ||
849 | 815 | service.startService() | ||
850 | 816 | endpoint = Mock() | ||
851 | 817 | connection = Mock() | ||
852 | 818 | service.try_connections[endpoint] = connection | ||
853 | 819 | service.remove_connection(endpoint, connection) | ||
854 | 820 | self.assertThat(service.try_connections, Equals({})) | ||
855 | 821 | |||
856 | 777 | def test__remove_connection_removes_from_connections(self): | 822 | def test__remove_connection_removes_from_connections(self): |
857 | 778 | service = make_inert_client_service() | 823 | service = make_inert_client_service() |
858 | 779 | service.startService() | 824 | service.startService() |
859 | @@ -1040,6 +1085,7 @@ class TestClusterClient(MAASTestCase): | |||
860 | 1040 | 1085 | ||
861 | 1041 | def test_connecting(self): | 1086 | def test_connecting(self): |
862 | 1042 | client = self.make_running_client() | 1087 | client = self.make_running_client() |
863 | 1088 | client.service.try_connections[client.eventloop] = client | ||
864 | 1043 | self.patch_authenticate_for_success(client) | 1089 | self.patch_authenticate_for_success(client) |
865 | 1044 | self.patch_register_for_success(client) | 1090 | self.patch_register_for_success(client) |
866 | 1045 | self.assertEqual(client.service.connections, {}) | 1091 | self.assertEqual(client.service.connections, {}) |
867 | @@ -1053,6 +1099,7 @@ class TestClusterClient(MAASTestCase): | |||
868 | 1053 | self.assertTrue(extract_result(wait_for_authenticated)) | 1099 | self.assertTrue(extract_result(wait_for_authenticated)) |
869 | 1054 | # ready has been set with the name of the event-loop. | 1100 | # ready has been set with the name of the event-loop. |
870 | 1055 | self.assertEqual(client.eventloop, extract_result(wait_for_ready)) | 1101 | self.assertEqual(client.eventloop, extract_result(wait_for_ready)) |
871 | 1102 | self.assertEqual(client.service.try_connections, {}) | ||
872 | 1056 | self.assertEqual( | 1103 | self.assertEqual( |
873 | 1057 | client.service.connections, | 1104 | client.service.connections, |
874 | 1058 | {client.eventloop: client}) | 1105 | {client.eventloop: client}) |
Currently running in the CI to verify a good backport.
http:// 162.213. 35.104: 8080/job/ maas-git- manual/ 78/console