Merge ~r00ta/maas:lp-2045228-fix-concurrent-dns-updates-3.3 into maas:3.3
- Git
- lp:~r00ta/maas
- lp-2045228-fix-concurrent-dns-updates-3.3
- Merge into 3.3
Proposed by
Jacopo Rota
Status: | Merged |
---|---|
Approved by: | Jacopo Rota |
Approved revision: | 3e2c197c8b316a54c93a0705b51d9afb0690b179 |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~r00ta/maas:lp-2045228-fix-concurrent-dns-updates-3.3 |
Merge into: | maas:3.3 |
Diff against target: |
719 lines (+163/-77) 7 files modified
src/maasserver/eventloop.py (+8/-3)
src/maasserver/region_controller.py (+21/-13)
src/maasserver/tests/test_eventloop.py (+13/-1)
src/maasserver/tests/test_plugin.py (+2/-0)
src/maasserver/tests/test_region_controller.py (+73/-56)
src/maasserver/utils/dbtasks.py (+25/-4)
src/maasserver/utils/tests/test_dbtasks.py (+21/-0) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Jacopo Rota | Approve | ||
MAAS Lander | Approve | ||
Review via email: mp+457574@code.launchpad.net |
Commit message
fix: lp-2045228 race condition in postgres listener for dns dynamic updates
Description of the change
Cherry pick of 5d4ea79dd26504f
To post a comment you must log in.
Revision history for this message
Jacopo Rota (r00ta) wrote : | # |
self approving backport
Revision history for this message
Jacopo Rota (r00ta) wrote : | # |
self approving backport
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/src/maasserver/eventloop.py b/src/maasserver/eventloop.py |
2 | index 753c66a..0052348 100644 |
3 | --- a/src/maasserver/eventloop.py |
4 | +++ b/src/maasserver/eventloop.py |
5 | @@ -67,10 +67,10 @@ def make_DatabaseTaskService(): |
6 | return dbtasks.DatabaseTasksService() |
7 | |
8 | |
9 | -def make_RegionControllerService(postgresListener): |
10 | +def make_RegionControllerService(postgresListener, dbtasks): |
11 | from maasserver.region_controller import RegionControllerService |
12 | |
13 | - return RegionControllerService(postgresListener) |
14 | + return RegionControllerService(postgresListener, dbtasks) |
15 | |
16 | |
17 | def make_RegionService(ipcWorker): |
18 | @@ -287,10 +287,15 @@ class RegionEventLoop: |
19 | "factory": make_DatabaseTaskService, |
20 | "requires": [], |
21 | }, |
22 | + "database-tasks-master": { |
23 | + "only_on_master": True, |
24 | + "factory": make_DatabaseTaskService, |
25 | + "requires": [], |
26 | + }, |
27 | "region-controller": { |
28 | "only_on_master": True, |
29 | "factory": make_RegionControllerService, |
30 | - "requires": ["postgres-listener-master"], |
31 | + "requires": ["postgres-listener-master", "database-tasks-master"], |
32 | }, |
33 | "rpc": { |
34 | "only_on_master": False, |
35 | diff --git a/src/maasserver/region_controller.py b/src/maasserver/region_controller.py |
36 | index 26e2c21..756dfbd 100644 |
37 | --- a/src/maasserver/region_controller.py |
38 | +++ b/src/maasserver/region_controller.py |
39 | @@ -78,6 +78,7 @@ class RegionControllerService(Service): |
40 | def __init__( |
41 | self, |
42 | postgresListener, |
43 | + dbtasks, |
44 | clock=reactor, |
45 | retryOnFailure=True, |
46 | rbacRetryOnFailureDelay=10, |
47 | @@ -103,6 +104,7 @@ class RegionControllerService(Service): |
48 | self._dns_requires_full_reload = True |
49 | self._dns_latest_serial = None |
50 | self.postgresListener = postgresListener |
51 | + self.dbtasks = dbtasks |
52 | self.dnsResolver = Resolver( |
53 | resolv=None, |
54 | servers=[("127.0.0.1", 53)], |
55 | @@ -175,27 +177,33 @@ class RegionControllerService(Service): |
56 | ) |
57 | eventloop.restart() |
58 | |
59 | - @asynchronous(timeout=FOREVER) |
60 | - @inlineCallbacks |
61 | def queueDynamicDNSUpdate(self, channel, message): |
62 | """ |
63 | Called when the `sys_dns_update` message is received |
64 | - and queues updates for existing domains |
65 | + and queues updates for existing domains. |
66 | + The updates are offloaded to the DatabaseTasksService in order to |
67 | + process them in sequence and keep consuming the next postgres notifications. |
68 | """ |
69 | + |
70 | + def updateCallback(data): |
71 | + (new_updates, need_reload) = data |
72 | + self._dns_requires_full_reload = ( |
73 | + self._dns_requires_full_reload or need_reload |
74 | + ) |
75 | + if self._dns_update_in_progress: |
76 | + self._queued_updates += new_updates |
77 | + else: |
78 | + self._dns_updates += new_updates |
79 | + |
80 | + log.debug("Start processing dynamic DNS update '{}'".format(message)) |
81 | if message == "": |
82 | return |
83 | |
84 | - (new_updates, need_reload) = yield deferToDatabase( |
85 | - process_dns_update_notify, message |
86 | - ) |
87 | - |
88 | - self._dns_requires_full_reload = ( |
89 | - self._dns_requires_full_reload or need_reload |
90 | + self.dbtasks.deferTaskWithCallbacks( |
91 | + process_dns_update_notify, |
92 | + [updateCallback], |
93 | + message, |
94 | ) |
95 | - if self._dns_update_in_progress: |
96 | - self._queued_updates += new_updates |
97 | - else: |
98 | - self._dns_updates += new_updates |
99 | |
100 | def startProcessing(self): |
101 | """Start the process looping call.""" |
102 | diff --git a/src/maasserver/tests/test_eventloop.py b/src/maasserver/tests/test_eventloop.py |
103 | index 691ff01..02c1ac4 100644 |
104 | --- a/src/maasserver/tests/test_eventloop.py |
105 | +++ b/src/maasserver/tests/test_eventloop.py |
106 | @@ -368,9 +368,21 @@ class TestFactories(MAASServerTestCase): |
107 | eventloop.loop.factories["database-tasks"]["only_on_master"] |
108 | ) |
109 | |
110 | + def test_make_MasterDatabaseTaskService(self): |
111 | + service = eventloop.make_DatabaseTaskService() |
112 | + self.assertIsInstance(service, dbtasks.DatabaseTasksService) |
113 | + # It is registered as a factory in RegionEventLoop. |
114 | + self.assertIs( |
115 | + eventloop.make_DatabaseTaskService, |
116 | + eventloop.loop.factories["database-tasks-master"]["factory"], |
117 | + ) |
118 | + self.assertTrue( |
119 | + eventloop.loop.factories["database-tasks-master"]["only_on_master"] |
120 | + ) |
121 | + |
122 | def test_make_RegionControllerService(self): |
123 | service = eventloop.make_RegionControllerService( |
124 | - sentinel.postgresListener |
125 | + sentinel.postgresListener, sentinel.dbtasks |
126 | ) |
127 | self.assertIsInstance( |
128 | service, region_controller.RegionControllerService |
129 | diff --git a/src/maasserver/tests/test_plugin.py b/src/maasserver/tests/test_plugin.py |
130 | index 6c2572c..ba32ca5 100644 |
131 | --- a/src/maasserver/tests/test_plugin.py |
132 | +++ b/src/maasserver/tests/test_plugin.py |
133 | @@ -237,6 +237,7 @@ class TestRegionMasterServiceMaker(TestServiceMaker): |
134 | service = service_maker.makeService(options) |
135 | self.assertIsInstance(service, MultiService) |
136 | expected_services = { |
137 | + "database-tasks-master", |
138 | "region-controller", |
139 | "nonce-cleanup", |
140 | "dns-publication-cleanup", |
141 | @@ -365,6 +366,7 @@ class TestRegionAllInOneServiceMaker(TestServiceMaker): |
142 | "web", |
143 | "ipc-worker", |
144 | # Master services. |
145 | + "database-tasks-master", |
146 | "region-controller", |
147 | "nonce-cleanup", |
148 | "dns-publication-cleanup", |
149 | diff --git a/src/maasserver/tests/test_region_controller.py b/src/maasserver/tests/test_region_controller.py |
150 | index fd974e8..b688e58 100644 |
151 | --- a/src/maasserver/tests/test_region_controller.py |
152 | +++ b/src/maasserver/tests/test_region_controller.py |
153 | @@ -29,6 +29,7 @@ from maasserver.testing.testcase import ( |
154 | MAASServerTestCase, |
155 | MAASTransactionServerTestCase, |
156 | ) |
157 | +from maasserver.utils.dbtasks import DatabaseTasksService |
158 | from maasserver.utils.threads import deferToDatabase |
159 | from maastesting.crochet import wait_for |
160 | from maastesting.matchers import ( |
161 | @@ -43,12 +44,12 @@ wait_for_reactor = wait_for() |
162 | |
163 | |
164 | class TestRegionControllerService(MAASServerTestCase): |
165 | - def make_service(self, listener): |
166 | + def make_service(self, listener=MagicMock(), dbtasks=MagicMock()): |
167 | # Don't retry on failure or the tests will loop forever. |
168 | - return RegionControllerService(listener, retryOnFailure=False) |
169 | + return RegionControllerService(listener, dbtasks, retryOnFailure=False) |
170 | |
171 | def test_init_sets_properties(self): |
172 | - service = self.make_service(sentinel.listener) |
173 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
174 | self.assertThat( |
175 | service, |
176 | MatchesStructure.byEquality( |
177 | @@ -56,6 +57,7 @@ class TestRegionControllerService(MAASServerTestCase): |
178 | processingDefer=None, |
179 | needsDNSUpdate=True, |
180 | postgresListener=sentinel.listener, |
181 | + dbtasks=sentinel.dbtasks, |
182 | ), |
183 | ) |
184 | |
185 | @@ -107,31 +109,27 @@ class TestRegionControllerService(MAASServerTestCase): |
186 | @wait_for_reactor |
187 | @inlineCallbacks |
188 | def test_stopService_handles_canceling_processing(self): |
189 | - listener = MagicMock() |
190 | - service = self.make_service(listener) |
191 | + service = self.make_service() |
192 | service.startProcessing() |
193 | yield service.stopService() |
194 | self.assertIsNone(service.processingDefer) |
195 | |
196 | def test_markDNSForUpdate_sets_needsDNSUpdate_and_starts_process(self): |
197 | - listener = MagicMock() |
198 | - service = self.make_service(listener) |
199 | + service = self.make_service() |
200 | mock_startProcessing = self.patch(service, "startProcessing") |
201 | service.markDNSForUpdate(None, None) |
202 | self.assertTrue(service.needsDNSUpdate) |
203 | self.assertThat(mock_startProcessing, MockCalledOnceWith()) |
204 | |
205 | def test_markProxyForUpdate_sets_needsProxyUpdate_and_starts_process(self): |
206 | - listener = MagicMock() |
207 | - service = self.make_service(listener) |
208 | + service = self.make_service() |
209 | mock_startProcessing = self.patch(service, "startProcessing") |
210 | service.markProxyForUpdate(None, None) |
211 | self.assertTrue(service.needsProxyUpdate) |
212 | self.assertThat(mock_startProcessing, MockCalledOnceWith()) |
213 | |
214 | def test_markRBACForUpdate_sets_needsRBACUpdate_and_starts_process(self): |
215 | - listener = MagicMock() |
216 | - service = self.make_service(listener) |
217 | + service = self.make_service() |
218 | mock_startProcessing = self.patch(service, "startProcessing") |
219 | service.markRBACForUpdate(None, None) |
220 | self.assertTrue(service.needsRBACUpdate) |
221 | @@ -139,19 +137,19 @@ class TestRegionControllerService(MAASServerTestCase): |
222 | |
223 | def test_restart_region_restarts_eventloop(self): |
224 | restart_mock = self.patch(eventloop, "restart") |
225 | - service = self.make_service(MagicMock()) |
226 | + service = self.make_service() |
227 | service.restartRegion("sys_vault_migration", "") |
228 | restart_mock.assert_called_once() |
229 | |
230 | def test_startProcessing_doesnt_call_start_when_looping_call_running(self): |
231 | - service = self.make_service(sentinel.listener) |
232 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
233 | mock_start = self.patch(service.processing, "start") |
234 | service.processing.running = True |
235 | service.startProcessing() |
236 | self.assertThat(mock_start, MockNotCalled()) |
237 | |
238 | def test_startProcessing_calls_start_when_looping_call_not_running(self): |
239 | - service = self.make_service(sentinel.listener) |
240 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
241 | mock_start = self.patch(service.processing, "start") |
242 | service.startProcessing() |
243 | self.assertThat(mock_start, MockCalledOnceWith(0.1, now=False)) |
244 | @@ -159,7 +157,7 @@ class TestRegionControllerService(MAASServerTestCase): |
245 | @wait_for_reactor |
246 | @inlineCallbacks |
247 | def test_reload_dns_on_start(self): |
248 | - service = self.make_service(sentinel.listener) |
249 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
250 | mock_dns_update_all_zones = self.patch( |
251 | region_controller, "dns_update_all_zones" |
252 | ) |
253 | @@ -177,7 +175,7 @@ class TestRegionControllerService(MAASServerTestCase): |
254 | @wait_for_reactor |
255 | @inlineCallbacks |
256 | def test_process_doesnt_update_zones_when_nothing_to_process(self): |
257 | - service = self.make_service(sentinel.listener) |
258 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
259 | service.needsDNSUpdate = False |
260 | mock_dns_update_all_zones = self.patch( |
261 | region_controller, "dns_update_all_zones" |
262 | @@ -189,7 +187,7 @@ class TestRegionControllerService(MAASServerTestCase): |
263 | @wait_for_reactor |
264 | @inlineCallbacks |
265 | def test_process_doesnt_proxy_update_config_when_nothing_to_process(self): |
266 | - service = self.make_service(sentinel.listener) |
267 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
268 | service.needsProxyUpdate = False |
269 | mock_proxy_update_config = self.patch( |
270 | region_controller, "proxy_update_config" |
271 | @@ -201,7 +199,7 @@ class TestRegionControllerService(MAASServerTestCase): |
272 | @wait_for_reactor |
273 | @inlineCallbacks |
274 | def test_process_doesnt_call_rbacSync_when_nothing_to_process(self): |
275 | - service = self.make_service(sentinel.listener) |
276 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
277 | service.needsRBACUpdate = False |
278 | mock_rbacSync = self.patch(service, "_rbacSync") |
279 | service.startProcessing() |
280 | @@ -211,7 +209,7 @@ class TestRegionControllerService(MAASServerTestCase): |
281 | @wait_for_reactor |
282 | @inlineCallbacks |
283 | def test_process_stops_processing(self): |
284 | - service = self.make_service(sentinel.listener) |
285 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
286 | service.needsDNSUpdate = False |
287 | service.startProcessing() |
288 | yield service.processingDefer |
289 | @@ -220,7 +218,7 @@ class TestRegionControllerService(MAASServerTestCase): |
290 | @wait_for_reactor |
291 | @inlineCallbacks |
292 | def test_process_updates_zones(self): |
293 | - service = self.make_service(sentinel.listener) |
294 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
295 | service.needsDNSUpdate = True |
296 | dns_result = ( |
297 | random.randint(1, 1000), |
298 | @@ -248,7 +246,7 @@ class TestRegionControllerService(MAASServerTestCase): |
299 | @wait_for_reactor |
300 | @inlineCallbacks |
301 | def test_process_zones_kills_bind_on_failed_reload(self): |
302 | - service = self.make_service(sentinel.listener) |
303 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
304 | service.needsDNSUpdate = True |
305 | service.retryOnFailure = True |
306 | dns_result_0 = ( |
307 | @@ -295,7 +293,7 @@ class TestRegionControllerService(MAASServerTestCase): |
308 | @wait_for_reactor |
309 | @inlineCallbacks |
310 | def test_process_updates_proxy(self): |
311 | - service = self.make_service(sentinel.listener) |
312 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
313 | service.needsProxyUpdate = True |
314 | mock_proxy_update_config = self.patch( |
315 | region_controller, "proxy_update_config" |
316 | @@ -314,7 +312,7 @@ class TestRegionControllerService(MAASServerTestCase): |
317 | @wait_for_reactor |
318 | @inlineCallbacks |
319 | def test_process_updates_rbac(self): |
320 | - service = self.make_service(sentinel.listener) |
321 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
322 | service.needsRBACUpdate = True |
323 | mock_rbacSync = self.patch(service, "_rbacSync") |
324 | mock_rbacSync.return_value = [] |
325 | @@ -330,7 +328,7 @@ class TestRegionControllerService(MAASServerTestCase): |
326 | @wait_for_reactor |
327 | @inlineCallbacks |
328 | def test_process_updates_zones_logs_failure(self): |
329 | - service = self.make_service(sentinel.listener) |
330 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
331 | service.needsDNSUpdate = True |
332 | mock_dns_update_all_zones = self.patch( |
333 | region_controller, "dns_update_all_zones" |
334 | @@ -349,7 +347,7 @@ class TestRegionControllerService(MAASServerTestCase): |
335 | @wait_for_reactor |
336 | @inlineCallbacks |
337 | def test_process_updates_proxy_logs_failure(self): |
338 | - service = self.make_service(sentinel.listener) |
339 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
340 | service.needsProxyUpdate = True |
341 | mock_proxy_update_config = self.patch( |
342 | region_controller, "proxy_update_config" |
343 | @@ -368,7 +366,7 @@ class TestRegionControllerService(MAASServerTestCase): |
344 | @wait_for_reactor |
345 | @inlineCallbacks |
346 | def test_process_updates_rbac_logs_failure(self): |
347 | - service = self.make_service(sentinel.listener) |
348 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
349 | service.needsRBACUpdate = True |
350 | mock_rbacSync = self.patch(service, "_rbacSync") |
351 | mock_rbacSync.side_effect = factory.make_exception() |
352 | @@ -383,7 +381,7 @@ class TestRegionControllerService(MAASServerTestCase): |
353 | @wait_for_reactor |
354 | @inlineCallbacks |
355 | def test_process_updates_rbac_retries_with_delay(self): |
356 | - service = self.make_service(sentinel.listener) |
357 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
358 | service.needsRBACUpdate = True |
359 | service.retryOnFailure = True |
360 | service.rbacRetryOnFailureDelay = random.randint(1, 10) |
361 | @@ -405,7 +403,7 @@ class TestRegionControllerService(MAASServerTestCase): |
362 | @wait_for_reactor |
363 | @inlineCallbacks |
364 | def test_process_updates_bind_proxy_and_rbac(self): |
365 | - service = self.make_service(sentinel.listener) |
366 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
367 | service.needsDNSUpdate = True |
368 | service.needsProxyUpdate = True |
369 | service.needsRBACUpdate = True |
370 | @@ -441,7 +439,7 @@ class TestRegionControllerService(MAASServerTestCase): |
371 | |
372 | @wait_for_reactor |
373 | def test_check_serial_doesnt_raise_error_on_successful_serial_match(self): |
374 | - service = self.make_service(sentinel.listener) |
375 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
376 | result_serial = random.randint(1, 1000) |
377 | formatted_serial = f"{result_serial:10d}" |
378 | dns_names = [factory.make_name("domain") for _ in range(3)] |
379 | @@ -472,7 +470,7 @@ class TestRegionControllerService(MAASServerTestCase): |
380 | @wait_for_reactor |
381 | @inlineCallbacks |
382 | def test_check_serial_raise_error_after_30_tries(self): |
383 | - service = self.make_service(sentinel.listener) |
384 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
385 | result_serial = random.randint(1, 1000) |
386 | formatted_serial = f"{result_serial:10d}" |
387 | dns_names = [factory.make_name("domain") for _ in range(3)] |
388 | @@ -487,7 +485,7 @@ class TestRegionControllerService(MAASServerTestCase): |
389 | @wait_for_reactor |
390 | @inlineCallbacks |
391 | def test_check_serial_handles_ValueError(self): |
392 | - service = self.make_service(sentinel.listener) |
393 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
394 | result_serial = random.randint(1, 1000) |
395 | formatted_serial = f"{result_serial:10d}" |
396 | dns_names = [factory.make_name("domain") for _ in range(3)] |
397 | @@ -502,7 +500,7 @@ class TestRegionControllerService(MAASServerTestCase): |
398 | @wait_for_reactor |
399 | @inlineCallbacks |
400 | def test_check_serial_handles_TimeoutError(self): |
401 | - service = self.make_service(sentinel.listener) |
402 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
403 | result_serial = random.randint(1, 1000) |
404 | formatted_serial = f"{result_serial:10d}" |
405 | dns_names = [factory.make_name("domain") for _ in range(3)] |
406 | @@ -515,7 +513,7 @@ class TestRegionControllerService(MAASServerTestCase): |
407 | yield service._checkSerial((formatted_serial, True, dns_names)) |
408 | |
409 | def test_getRBACClient_returns_None_when_no_url(self): |
410 | - service = self.make_service(sentinel.listener) |
411 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
412 | service.rbacClient = sentinel.client |
413 | SecretManager().delete_secret("external-auth") |
414 | self.assertIsNone(service._getRBACClient()) |
415 | @@ -526,7 +524,7 @@ class TestRegionControllerService(MAASServerTestCase): |
416 | SecretManager().set_composite_secret( |
417 | "external-auth", {"rbac-url": "http://rbac.example.com"} |
418 | ) |
419 | - service = self.make_service(sentinel.listener) |
420 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
421 | client = service._getRBACClient() |
422 | self.assertIsNotNone(client) |
423 | self.assertIs(client, service.rbacClient) |
424 | @@ -537,7 +535,7 @@ class TestRegionControllerService(MAASServerTestCase): |
425 | SecretManager().set_composite_secret( |
426 | "external-auth", {"rbac-url": "http://rbac.example.com"} |
427 | ) |
428 | - service = self.make_service(sentinel.listener) |
429 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
430 | client = service._getRBACClient() |
431 | SecretManager().set_composite_secret( |
432 | "external-auth", {"rbac-url": "http://other.example.com"} |
433 | @@ -552,7 +550,7 @@ class TestRegionControllerService(MAASServerTestCase): |
434 | SecretManager().set_composite_secret( |
435 | "external-auth", {"rbac-url": "http://rbac.example.com"} |
436 | ) |
437 | - service = self.make_service(sentinel.listener) |
438 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
439 | client = service._getRBACClient() |
440 | mock_get_auth_info.return_value = MagicMock() |
441 | new_client = service._getRBACClient() |
442 | @@ -561,7 +559,7 @@ class TestRegionControllerService(MAASServerTestCase): |
443 | self.assertIs(new_client, service._getRBACClient()) |
444 | |
445 | def test_rbacNeedsFull(self): |
446 | - service = self.make_service(sentinel.listener) |
447 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
448 | changes = [ |
449 | RBACSync(action=RBAC_ACTION.ADD), |
450 | RBACSync(action=RBAC_ACTION.UPDATE), |
451 | @@ -571,7 +569,7 @@ class TestRegionControllerService(MAASServerTestCase): |
452 | self.assertTrue(service._rbacNeedsFull(changes)) |
453 | |
454 | def test_rbacDifference(self): |
455 | - service = self.make_service(sentinel.listener) |
456 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
457 | changes = [ |
458 | RBACSync( |
459 | action=RBAC_ACTION.UPDATE, resource_id=1, resource_name="r-1" |
460 | @@ -636,7 +634,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
461 | ] |
462 | |
463 | publications = yield deferToDatabase(_create_publications) |
464 | - service = RegionControllerService(sentinel.listener) |
465 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
466 | service.needsDNSUpdate = True |
467 | service.previousSerial = publications[0].serial |
468 | dns_result = ( |
469 | @@ -677,7 +675,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
470 | ] |
471 | |
472 | publications = yield deferToDatabase(_create_publications) |
473 | - service = RegionControllerService(sentinel.listener) |
474 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
475 | service.needsDNSUpdate = True |
476 | service.previousSerial = publications[0].serial |
477 | dns_result = ( |
478 | @@ -708,14 +706,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
479 | def test_rbacSync_returns_None_when_nothing_to_do(self): |
480 | RBACSync.objects.clear("resource-pool") |
481 | |
482 | - service = RegionControllerService(sentinel.listener) |
483 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
484 | service.rbacInit = True |
485 | self.assertIsNone(service._rbacSync()) |
486 | |
487 | def test_rbacSync_returns_None_and_clears_sync_when_no_client(self): |
488 | RBACSync.objects.create(resource_type="resource-pool") |
489 | |
490 | - service = RegionControllerService(sentinel.listener) |
491 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
492 | self.assertIsNone(service._rbacSync()) |
493 | self.assertFalse(RBACSync.objects.exists()) |
494 | |
495 | @@ -729,7 +727,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
496 | |
497 | rbac_client = MagicMock() |
498 | rbac_client.update_resources.return_value = "x-y-z" |
499 | - service = RegionControllerService(sentinel.listener) |
500 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
501 | self.patch(service, "_getRBACClient").return_value = rbac_client |
502 | |
503 | self.assertEqual([], service._rbacSync()) |
504 | @@ -748,7 +746,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
505 | |
506 | rbac_client = MagicMock() |
507 | rbac_client.update_resources.return_value = "x-y-z" |
508 | - service = RegionControllerService(sentinel.listener) |
509 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
510 | self.patch(service, "_getRBACClient").return_value = rbac_client |
511 | |
512 | self.assertEqual([], service._rbacSync()) |
513 | @@ -773,7 +771,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
514 | |
515 | rbac_client = MagicMock() |
516 | rbac_client.update_resources.return_value = "x-y-z" |
517 | - service = RegionControllerService(sentinel.listener) |
518 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
519 | self.patch(service, "_getRBACClient").return_value = rbac_client |
520 | service.rbacInit = True |
521 | |
522 | @@ -807,7 +805,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
523 | SyncConflictError(), |
524 | "x-y-z", |
525 | ] |
526 | - service = RegionControllerService(sentinel.listener) |
527 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
528 | self.patch(service, "_getRBACClient").return_value = rbac_client |
529 | service.rbacInit = True |
530 | |
531 | @@ -838,7 +836,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
532 | |
533 | rbac_client = MagicMock() |
534 | rbac_client.update_resources.return_value = "x-y-z" |
535 | - service = RegionControllerService(sentinel.listener) |
536 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
537 | self.patch(service, "_getRBACClient").return_value = rbac_client |
538 | service.rbacInit = True |
539 | |
540 | @@ -856,7 +854,9 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
541 | domain = yield deferToDatabase(factory.make_Domain) |
542 | update_result = (random.randint(0, 10), True, [domain.name]) |
543 | record = yield deferToDatabase(factory.make_DNSResource, domain=domain) |
544 | - service = RegionControllerService(sentinel.listener) |
545 | + dbtasks = DatabaseTasksService() |
546 | + dbtasks.startService() |
547 | + service = RegionControllerService(sentinel.listener, dbtasks) |
548 | |
549 | update_zones = self.patch(region_controller, "dns_update_all_zones") |
550 | update_zones.return_value = update_result |
551 | @@ -864,10 +864,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
552 | check_serial.return_value = succeed(update_result) |
553 | |
554 | service._dns_update_in_progress = True |
555 | - yield service.queueDynamicDNSUpdate( |
556 | + service.queueDynamicDNSUpdate( |
557 | factory.make_name(), |
558 | f"INSERT {domain.name} {record.name} A 30 10.10.10.10", |
559 | ) |
560 | + |
561 | + # Wait until all the dynamic updates are processed |
562 | + yield dbtasks.syncTask() |
563 | + |
564 | self.assertCountEqual(service._dns_updates, []) |
565 | self.assertCountEqual( |
566 | service._queued_updates, |
567 | @@ -902,10 +906,12 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
568 | @wait_for_reactor |
569 | @inlineCallbacks |
570 | def test_dns_is_set_to_update_when_queued_updates_are_present(self): |
571 | + dbtasks = DatabaseTasksService() |
572 | + dbtasks.startService() |
573 | domain = yield deferToDatabase(factory.make_Domain) |
574 | update_result = (random.randint(0, 10), True, [domain.name]) |
575 | record = yield deferToDatabase(factory.make_DNSResource, domain=domain) |
576 | - service = RegionControllerService(sentinel.listener) |
577 | + service = RegionControllerService(sentinel.listener, dbtasks) |
578 | |
579 | update_zones = self.patch(region_controller, "dns_update_all_zones") |
580 | update_zones.return_value = update_result |
581 | @@ -913,10 +919,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
582 | check_serial.return_value = succeed(update_result) |
583 | |
584 | service._dns_update_in_progress = True |
585 | - yield service.queueDynamicDNSUpdate( |
586 | + service.queueDynamicDNSUpdate( |
587 | factory.make_name(), |
588 | f"INSERT {domain.name} {record.name} A 30 10.10.10.10", |
589 | ) |
590 | + |
591 | + # Wait until all the dynamic updates are processed |
592 | + yield dbtasks.syncTask() |
593 | + |
594 | self.assertCountEqual(service._dns_updates, []) |
595 | expected_updates = [ |
596 | DynamicDNSUpdate( |
597 | @@ -958,7 +968,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
598 | def test_check_serial_is_skipped_if_a_newer_serial_exists(self): |
599 | domain = yield deferToDatabase(factory.make_Domain) |
600 | update_result = (random.randint(0, 10), True, [domain.name]) |
601 | - service = RegionControllerService(sentinel.listener) |
602 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
603 | |
604 | query = self.patch(service.dnsResolver, "lookupAuthority") |
605 | |
606 | @@ -968,12 +978,16 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
607 | |
608 | query.assert_not_called() |
609 | |
610 | + @wait_for_reactor |
611 | + @inlineCallbacks |
612 | def test_queueDynamicDNSUpdate_can_be_called_synchronously(self): |
613 | - domain = factory.make_Domain() |
614 | + dbtasks = DatabaseTasksService() |
615 | + dbtasks.startService() |
616 | + domain = yield deferToDatabase(factory.make_Domain) |
617 | update_result = (random.randint(0, 10), True, [domain.name]) |
618 | - record1 = factory.make_DNSResource(domain=domain) |
619 | - record2 = factory.make_DNSResource(domain=domain) |
620 | - service = RegionControllerService(sentinel.listener) |
621 | + record1 = yield deferToDatabase(factory.make_DNSResource, domain) |
622 | + record2 = yield deferToDatabase(factory.make_DNSResource, domain) |
623 | + service = RegionControllerService(sentinel.listener, dbtasks) |
624 | |
625 | update_zones = self.patch(region_controller, "dns_update_all_zones") |
626 | update_zones.return_value = update_result |
627 | @@ -998,6 +1012,9 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
628 | f"DELETE {domain.name} {record2.name} A 30 2.2.2.2", |
629 | ) |
630 | |
631 | + # Wait until all the dynamic updates are processed |
632 | + yield dbtasks.syncTask() |
633 | + |
634 | self.assertCountEqual( |
635 | service._dns_updates, |
636 | [ |
637 | diff --git a/src/maasserver/utils/dbtasks.py b/src/maasserver/utils/dbtasks.py |
638 | index 5863f02..04cbf7e 100644 |
639 | --- a/src/maasserver/utils/dbtasks.py |
640 | +++ b/src/maasserver/utils/dbtasks.py |
641 | @@ -26,14 +26,17 @@ class DatabaseTaskAlreadyRunning(Exception): |
642 | class DatabaseTasksService(Service): |
643 | """Run deferred database operations one at a time. |
644 | |
645 | - Once the service is started, `deferTask` and `addTask` can be used to |
646 | - queue up execution of a database task. |
647 | + Once the service is started, `deferTask`, `addTask` and |
648 | + `deferTaskWithCallbacks` can be used to queue up execution of a database task. |
649 | |
650 | - The former — `deferTask` — will return a `Deferred` that fires with the |
651 | + The former —`deferTaskWithCallbacks` — returns nothing, and can be used to add tasks with callbacks to the queue. |
652 | + Errors arising from this task are simply logged. |
653 | + |
654 | + `deferTask` — will return a `Deferred` that fires with the |
655 | result of the database task. Errors arising from this task become the |
656 | responsibility of the caller. |
657 | |
658 | - The latter — `addTask` — returns nothing, and will log errors arising from |
659 | + `addTask` — returns nothing, and will log errors arising from |
660 | the database task. |
661 | |
662 | Before this service has been started, and as soon as shutdown has |
663 | @@ -50,6 +53,24 @@ class DatabaseTasksService(Service): |
664 | self.queue = DeferredQueue(size=0, backlog=1) |
665 | |
666 | @asynchronous |
667 | + def deferTaskWithCallbacks(self, func, callbacks, *args, **kwargs): |
668 | + """Schedules `func` with callbacks to run later. |
669 | + |
670 | + :raise QueueOverflow: If the queue of tasks is full. |
671 | + :return: `None` |
672 | + """ |
673 | + |
674 | + def task(): |
675 | + d = deferToDatabase(func, *args, **kwargs) |
676 | + for callback in callbacks: |
677 | + d.addCallback(callback) |
678 | + d.addErrback(log.err, "Unhandled failure in database task.") |
679 | + return d |
680 | + |
681 | + self.queue.put(task) |
682 | + return None |
683 | + |
684 | + @asynchronous |
685 | def deferTask(self, func, *args, **kwargs): |
686 | """Schedules `func` to run later. |
687 | |
688 | diff --git a/src/maasserver/utils/tests/test_dbtasks.py b/src/maasserver/utils/tests/test_dbtasks.py |
689 | index 0f54e05..06335ef 100644 |
690 | --- a/src/maasserver/utils/tests/test_dbtasks.py |
691 | +++ b/src/maasserver/utils/tests/test_dbtasks.py |
692 | @@ -120,6 +120,27 @@ class TestDatabaseTaskService(MAASTestCase): |
693 | finally: |
694 | service.stopService() |
695 | |
696 | + def test_callbacks_are_called_from_deferredTask(self): |
697 | + def simple_function(input): |
698 | + return input + " world" |
699 | + |
700 | + def callback(data): |
701 | + sentinel.callback = data |
702 | + |
703 | + service = DatabaseTasksService() |
704 | + service.startService() |
705 | + try: |
706 | + service.deferTaskWithCallbacks( |
707 | + simple_function, [callback], "Hello" |
708 | + ) |
709 | + service.syncTask().wait(TIMEOUT) |
710 | + self.assertEqual( |
711 | + "Hello world", |
712 | + sentinel.callback, |
713 | + ) |
714 | + finally: |
715 | + service.stopService() |
716 | + |
717 | def test_tasks_are_all_run_before_shutdown_completes(self): |
718 | service = DatabaseTasksService() |
719 | service.startService() |
UNIT TESTS
-b lp-2045228-fix-concurrent-dns-updates-3.3 lp:~r00ta/maas/+git/maas into -b 3.3 lp:~maas-committers/maas
STATUS: SUCCESS
COMMIT: 3e2c197c8b316a54c93a0705b51d9afb0690b179