Merge ~r00ta/maas:lp-2045228-fix-concurrent-dns-updates into maas:master
- Git
- lp:~r00ta/maas
- lp-2045228-fix-concurrent-dns-updates
- Merge into master
Proposed by
Jacopo Rota
Status: | Merged |
---|---|
Approved by: | Jacopo Rota |
Approved revision: | a85a852185ec5351afa445b2e9e75fc04855a41a |
Merge reported by: | MAAS Lander |
Merged at revision: | not available |
Proposed branch: | ~r00ta/maas:lp-2045228-fix-concurrent-dns-updates |
Merge into: | maas:master |
Diff against target: |
719 lines (+163/-77) 7 files modified
src/maasserver/eventloop.py (+8/-3) src/maasserver/region_controller.py (+21/-13) src/maasserver/tests/test_eventloop.py (+13/-1) src/maasserver/tests/test_plugin.py (+2/-0) src/maasserver/tests/test_region_controller.py (+73/-56) src/maasserver/utils/dbtasks.py (+25/-4) src/maasserver/utils/tests/test_dbtasks.py (+21/-0) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Christian Grabowski | Approve | ||
MAAS Lander | Approve | ||
Review via email: mp+457554@code.launchpad.net |
Commit message
fix: lp-2045228 race condition in postgres listener for dns dynamic updates
Description of the change
Since the dns dynamic updates must be consumed in sequence by the postgres listener, we have to offload the notifications to a DatabaseTasksSe
To post a comment you must log in.
- a85a852... by Jacopo Rota
-
fix: lp-2045228 race condition in postgres listener for dns dynamic updates
Revision history for this message
Jacopo Rota (r00ta) wrote : | # |
jenkins: !test
Revision history for this message
MAAS Lander (maas-lander) wrote : | # |
UNIT TESTS
-b lp-2045228-
STATUS: SUCCESS
COMMIT: a85a852185ec535
review:
Approve
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/src/maasserver/eventloop.py b/src/maasserver/eventloop.py |
2 | index b4f9334..82be437 100644 |
3 | --- a/src/maasserver/eventloop.py |
4 | +++ b/src/maasserver/eventloop.py |
5 | @@ -67,10 +67,10 @@ def make_DatabaseTaskService(): |
6 | return dbtasks.DatabaseTasksService() |
7 | |
8 | |
9 | -def make_RegionControllerService(postgresListener): |
10 | +def make_RegionControllerService(postgresListener, dbtasks): |
11 | from maasserver.region_controller import RegionControllerService |
12 | |
13 | - return RegionControllerService(postgresListener) |
14 | + return RegionControllerService(postgresListener, dbtasks) |
15 | |
16 | |
17 | def make_RegionService(ipcWorker): |
18 | @@ -299,10 +299,15 @@ class RegionEventLoop: |
19 | "factory": make_DatabaseTaskService, |
20 | "requires": [], |
21 | }, |
22 | + "database-tasks-master": { |
23 | + "only_on_master": True, |
24 | + "factory": make_DatabaseTaskService, |
25 | + "requires": [], |
26 | + }, |
27 | "region-controller": { |
28 | "only_on_master": True, |
29 | "factory": make_RegionControllerService, |
30 | - "requires": ["postgres-listener-master"], |
31 | + "requires": ["postgres-listener-master", "database-tasks-master"], |
32 | }, |
33 | "rpc": { |
34 | "only_on_master": False, |
35 | diff --git a/src/maasserver/region_controller.py b/src/maasserver/region_controller.py |
36 | index 26e2c21..756dfbd 100644 |
37 | --- a/src/maasserver/region_controller.py |
38 | +++ b/src/maasserver/region_controller.py |
39 | @@ -78,6 +78,7 @@ class RegionControllerService(Service): |
40 | def __init__( |
41 | self, |
42 | postgresListener, |
43 | + dbtasks, |
44 | clock=reactor, |
45 | retryOnFailure=True, |
46 | rbacRetryOnFailureDelay=10, |
47 | @@ -103,6 +104,7 @@ class RegionControllerService(Service): |
48 | self._dns_requires_full_reload = True |
49 | self._dns_latest_serial = None |
50 | self.postgresListener = postgresListener |
51 | + self.dbtasks = dbtasks |
52 | self.dnsResolver = Resolver( |
53 | resolv=None, |
54 | servers=[("127.0.0.1", 53)], |
55 | @@ -175,27 +177,33 @@ class RegionControllerService(Service): |
56 | ) |
57 | eventloop.restart() |
58 | |
59 | - @asynchronous(timeout=FOREVER) |
60 | - @inlineCallbacks |
61 | def queueDynamicDNSUpdate(self, channel, message): |
62 | """ |
63 | Called when the `sys_dns_update` message is received |
64 | - and queues updates for existing domains |
65 | + and queues updates for existing domains. |
66 | + The updates are offloaded to the DatabaseTasksService in order to |
67 | + process them in sequence and keep consuming the next postgres notifications. |
68 | """ |
69 | + |
70 | + def updateCallback(data): |
71 | + (new_updates, need_reload) = data |
72 | + self._dns_requires_full_reload = ( |
73 | + self._dns_requires_full_reload or need_reload |
74 | + ) |
75 | + if self._dns_update_in_progress: |
76 | + self._queued_updates += new_updates |
77 | + else: |
78 | + self._dns_updates += new_updates |
79 | + |
80 | + log.debug("Start processing dynamic DNS update '{}'".format(message)) |
81 | if message == "": |
82 | return |
83 | |
84 | - (new_updates, need_reload) = yield deferToDatabase( |
85 | - process_dns_update_notify, message |
86 | - ) |
87 | - |
88 | - self._dns_requires_full_reload = ( |
89 | - self._dns_requires_full_reload or need_reload |
90 | + self.dbtasks.deferTaskWithCallbacks( |
91 | + process_dns_update_notify, |
92 | + [updateCallback], |
93 | + message, |
94 | ) |
95 | - if self._dns_update_in_progress: |
96 | - self._queued_updates += new_updates |
97 | - else: |
98 | - self._dns_updates += new_updates |
99 | |
100 | def startProcessing(self): |
101 | """Start the process looping call.""" |
102 | diff --git a/src/maasserver/tests/test_eventloop.py b/src/maasserver/tests/test_eventloop.py |
103 | index 691ff01..02c1ac4 100644 |
104 | --- a/src/maasserver/tests/test_eventloop.py |
105 | +++ b/src/maasserver/tests/test_eventloop.py |
106 | @@ -368,9 +368,21 @@ class TestFactories(MAASServerTestCase): |
107 | eventloop.loop.factories["database-tasks"]["only_on_master"] |
108 | ) |
109 | |
110 | + def test_make_MasterDatabaseTaskService(self): |
111 | + service = eventloop.make_DatabaseTaskService() |
112 | + self.assertIsInstance(service, dbtasks.DatabaseTasksService) |
113 | + # It is registered as a factory in RegionEventLoop. |
114 | + self.assertIs( |
115 | + eventloop.make_DatabaseTaskService, |
116 | + eventloop.loop.factories["database-tasks-master"]["factory"], |
117 | + ) |
118 | + self.assertTrue( |
119 | + eventloop.loop.factories["database-tasks-master"]["only_on_master"] |
120 | + ) |
121 | + |
122 | def test_make_RegionControllerService(self): |
123 | service = eventloop.make_RegionControllerService( |
124 | - sentinel.postgresListener |
125 | + sentinel.postgresListener, sentinel.dbtasks |
126 | ) |
127 | self.assertIsInstance( |
128 | service, region_controller.RegionControllerService |
129 | diff --git a/src/maasserver/tests/test_plugin.py b/src/maasserver/tests/test_plugin.py |
130 | index 5b2d8ac..6dfe177 100644 |
131 | --- a/src/maasserver/tests/test_plugin.py |
132 | +++ b/src/maasserver/tests/test_plugin.py |
133 | @@ -237,6 +237,7 @@ class TestRegionMasterServiceMaker(TestServiceMaker): |
134 | service = service_maker.makeService(options) |
135 | self.assertIsInstance(service, MultiService) |
136 | expected_services = { |
137 | + "database-tasks-master", |
138 | "region-controller", |
139 | "nonce-cleanup", |
140 | "dns-publication-cleanup", |
141 | @@ -367,6 +368,7 @@ class TestRegionAllInOneServiceMaker(TestServiceMaker): |
142 | "web", |
143 | "ipc-worker", |
144 | # Master services. |
145 | + "database-tasks-master", |
146 | "region-controller", |
147 | "nonce-cleanup", |
148 | "dns-publication-cleanup", |
149 | diff --git a/src/maasserver/tests/test_region_controller.py b/src/maasserver/tests/test_region_controller.py |
150 | index fd974e8..b688e58 100644 |
151 | --- a/src/maasserver/tests/test_region_controller.py |
152 | +++ b/src/maasserver/tests/test_region_controller.py |
153 | @@ -29,6 +29,7 @@ from maasserver.testing.testcase import ( |
154 | MAASServerTestCase, |
155 | MAASTransactionServerTestCase, |
156 | ) |
157 | +from maasserver.utils.dbtasks import DatabaseTasksService |
158 | from maasserver.utils.threads import deferToDatabase |
159 | from maastesting.crochet import wait_for |
160 | from maastesting.matchers import ( |
161 | @@ -43,12 +44,12 @@ wait_for_reactor = wait_for() |
162 | |
163 | |
164 | class TestRegionControllerService(MAASServerTestCase): |
165 | - def make_service(self, listener): |
166 | + def make_service(self, listener=MagicMock(), dbtasks=MagicMock()): |
167 | # Don't retry on failure or the tests will loop forever. |
168 | - return RegionControllerService(listener, retryOnFailure=False) |
169 | + return RegionControllerService(listener, dbtasks, retryOnFailure=False) |
170 | |
171 | def test_init_sets_properties(self): |
172 | - service = self.make_service(sentinel.listener) |
173 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
174 | self.assertThat( |
175 | service, |
176 | MatchesStructure.byEquality( |
177 | @@ -56,6 +57,7 @@ class TestRegionControllerService(MAASServerTestCase): |
178 | processingDefer=None, |
179 | needsDNSUpdate=True, |
180 | postgresListener=sentinel.listener, |
181 | + dbtasks=sentinel.dbtasks, |
182 | ), |
183 | ) |
184 | |
185 | @@ -107,31 +109,27 @@ class TestRegionControllerService(MAASServerTestCase): |
186 | @wait_for_reactor |
187 | @inlineCallbacks |
188 | def test_stopService_handles_canceling_processing(self): |
189 | - listener = MagicMock() |
190 | - service = self.make_service(listener) |
191 | + service = self.make_service() |
192 | service.startProcessing() |
193 | yield service.stopService() |
194 | self.assertIsNone(service.processingDefer) |
195 | |
196 | def test_markDNSForUpdate_sets_needsDNSUpdate_and_starts_process(self): |
197 | - listener = MagicMock() |
198 | - service = self.make_service(listener) |
199 | + service = self.make_service() |
200 | mock_startProcessing = self.patch(service, "startProcessing") |
201 | service.markDNSForUpdate(None, None) |
202 | self.assertTrue(service.needsDNSUpdate) |
203 | self.assertThat(mock_startProcessing, MockCalledOnceWith()) |
204 | |
205 | def test_markProxyForUpdate_sets_needsProxyUpdate_and_starts_process(self): |
206 | - listener = MagicMock() |
207 | - service = self.make_service(listener) |
208 | + service = self.make_service() |
209 | mock_startProcessing = self.patch(service, "startProcessing") |
210 | service.markProxyForUpdate(None, None) |
211 | self.assertTrue(service.needsProxyUpdate) |
212 | self.assertThat(mock_startProcessing, MockCalledOnceWith()) |
213 | |
214 | def test_markRBACForUpdate_sets_needsRBACUpdate_and_starts_process(self): |
215 | - listener = MagicMock() |
216 | - service = self.make_service(listener) |
217 | + service = self.make_service() |
218 | mock_startProcessing = self.patch(service, "startProcessing") |
219 | service.markRBACForUpdate(None, None) |
220 | self.assertTrue(service.needsRBACUpdate) |
221 | @@ -139,19 +137,19 @@ class TestRegionControllerService(MAASServerTestCase): |
222 | |
223 | def test_restart_region_restarts_eventloop(self): |
224 | restart_mock = self.patch(eventloop, "restart") |
225 | - service = self.make_service(MagicMock()) |
226 | + service = self.make_service() |
227 | service.restartRegion("sys_vault_migration", "") |
228 | restart_mock.assert_called_once() |
229 | |
230 | def test_startProcessing_doesnt_call_start_when_looping_call_running(self): |
231 | - service = self.make_service(sentinel.listener) |
232 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
233 | mock_start = self.patch(service.processing, "start") |
234 | service.processing.running = True |
235 | service.startProcessing() |
236 | self.assertThat(mock_start, MockNotCalled()) |
237 | |
238 | def test_startProcessing_calls_start_when_looping_call_not_running(self): |
239 | - service = self.make_service(sentinel.listener) |
240 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
241 | mock_start = self.patch(service.processing, "start") |
242 | service.startProcessing() |
243 | self.assertThat(mock_start, MockCalledOnceWith(0.1, now=False)) |
244 | @@ -159,7 +157,7 @@ class TestRegionControllerService(MAASServerTestCase): |
245 | @wait_for_reactor |
246 | @inlineCallbacks |
247 | def test_reload_dns_on_start(self): |
248 | - service = self.make_service(sentinel.listener) |
249 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
250 | mock_dns_update_all_zones = self.patch( |
251 | region_controller, "dns_update_all_zones" |
252 | ) |
253 | @@ -177,7 +175,7 @@ class TestRegionControllerService(MAASServerTestCase): |
254 | @wait_for_reactor |
255 | @inlineCallbacks |
256 | def test_process_doesnt_update_zones_when_nothing_to_process(self): |
257 | - service = self.make_service(sentinel.listener) |
258 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
259 | service.needsDNSUpdate = False |
260 | mock_dns_update_all_zones = self.patch( |
261 | region_controller, "dns_update_all_zones" |
262 | @@ -189,7 +187,7 @@ class TestRegionControllerService(MAASServerTestCase): |
263 | @wait_for_reactor |
264 | @inlineCallbacks |
265 | def test_process_doesnt_proxy_update_config_when_nothing_to_process(self): |
266 | - service = self.make_service(sentinel.listener) |
267 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
268 | service.needsProxyUpdate = False |
269 | mock_proxy_update_config = self.patch( |
270 | region_controller, "proxy_update_config" |
271 | @@ -201,7 +199,7 @@ class TestRegionControllerService(MAASServerTestCase): |
272 | @wait_for_reactor |
273 | @inlineCallbacks |
274 | def test_process_doesnt_call_rbacSync_when_nothing_to_process(self): |
275 | - service = self.make_service(sentinel.listener) |
276 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
277 | service.needsRBACUpdate = False |
278 | mock_rbacSync = self.patch(service, "_rbacSync") |
279 | service.startProcessing() |
280 | @@ -211,7 +209,7 @@ class TestRegionControllerService(MAASServerTestCase): |
281 | @wait_for_reactor |
282 | @inlineCallbacks |
283 | def test_process_stops_processing(self): |
284 | - service = self.make_service(sentinel.listener) |
285 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
286 | service.needsDNSUpdate = False |
287 | service.startProcessing() |
288 | yield service.processingDefer |
289 | @@ -220,7 +218,7 @@ class TestRegionControllerService(MAASServerTestCase): |
290 | @wait_for_reactor |
291 | @inlineCallbacks |
292 | def test_process_updates_zones(self): |
293 | - service = self.make_service(sentinel.listener) |
294 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
295 | service.needsDNSUpdate = True |
296 | dns_result = ( |
297 | random.randint(1, 1000), |
298 | @@ -248,7 +246,7 @@ class TestRegionControllerService(MAASServerTestCase): |
299 | @wait_for_reactor |
300 | @inlineCallbacks |
301 | def test_process_zones_kills_bind_on_failed_reload(self): |
302 | - service = self.make_service(sentinel.listener) |
303 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
304 | service.needsDNSUpdate = True |
305 | service.retryOnFailure = True |
306 | dns_result_0 = ( |
307 | @@ -295,7 +293,7 @@ class TestRegionControllerService(MAASServerTestCase): |
308 | @wait_for_reactor |
309 | @inlineCallbacks |
310 | def test_process_updates_proxy(self): |
311 | - service = self.make_service(sentinel.listener) |
312 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
313 | service.needsProxyUpdate = True |
314 | mock_proxy_update_config = self.patch( |
315 | region_controller, "proxy_update_config" |
316 | @@ -314,7 +312,7 @@ class TestRegionControllerService(MAASServerTestCase): |
317 | @wait_for_reactor |
318 | @inlineCallbacks |
319 | def test_process_updates_rbac(self): |
320 | - service = self.make_service(sentinel.listener) |
321 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
322 | service.needsRBACUpdate = True |
323 | mock_rbacSync = self.patch(service, "_rbacSync") |
324 | mock_rbacSync.return_value = [] |
325 | @@ -330,7 +328,7 @@ class TestRegionControllerService(MAASServerTestCase): |
326 | @wait_for_reactor |
327 | @inlineCallbacks |
328 | def test_process_updates_zones_logs_failure(self): |
329 | - service = self.make_service(sentinel.listener) |
330 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
331 | service.needsDNSUpdate = True |
332 | mock_dns_update_all_zones = self.patch( |
333 | region_controller, "dns_update_all_zones" |
334 | @@ -349,7 +347,7 @@ class TestRegionControllerService(MAASServerTestCase): |
335 | @wait_for_reactor |
336 | @inlineCallbacks |
337 | def test_process_updates_proxy_logs_failure(self): |
338 | - service = self.make_service(sentinel.listener) |
339 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
340 | service.needsProxyUpdate = True |
341 | mock_proxy_update_config = self.patch( |
342 | region_controller, "proxy_update_config" |
343 | @@ -368,7 +366,7 @@ class TestRegionControllerService(MAASServerTestCase): |
344 | @wait_for_reactor |
345 | @inlineCallbacks |
346 | def test_process_updates_rbac_logs_failure(self): |
347 | - service = self.make_service(sentinel.listener) |
348 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
349 | service.needsRBACUpdate = True |
350 | mock_rbacSync = self.patch(service, "_rbacSync") |
351 | mock_rbacSync.side_effect = factory.make_exception() |
352 | @@ -383,7 +381,7 @@ class TestRegionControllerService(MAASServerTestCase): |
353 | @wait_for_reactor |
354 | @inlineCallbacks |
355 | def test_process_updates_rbac_retries_with_delay(self): |
356 | - service = self.make_service(sentinel.listener) |
357 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
358 | service.needsRBACUpdate = True |
359 | service.retryOnFailure = True |
360 | service.rbacRetryOnFailureDelay = random.randint(1, 10) |
361 | @@ -405,7 +403,7 @@ class TestRegionControllerService(MAASServerTestCase): |
362 | @wait_for_reactor |
363 | @inlineCallbacks |
364 | def test_process_updates_bind_proxy_and_rbac(self): |
365 | - service = self.make_service(sentinel.listener) |
366 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
367 | service.needsDNSUpdate = True |
368 | service.needsProxyUpdate = True |
369 | service.needsRBACUpdate = True |
370 | @@ -441,7 +439,7 @@ class TestRegionControllerService(MAASServerTestCase): |
371 | |
372 | @wait_for_reactor |
373 | def test_check_serial_doesnt_raise_error_on_successful_serial_match(self): |
374 | - service = self.make_service(sentinel.listener) |
375 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
376 | result_serial = random.randint(1, 1000) |
377 | formatted_serial = f"{result_serial:10d}" |
378 | dns_names = [factory.make_name("domain") for _ in range(3)] |
379 | @@ -472,7 +470,7 @@ class TestRegionControllerService(MAASServerTestCase): |
380 | @wait_for_reactor |
381 | @inlineCallbacks |
382 | def test_check_serial_raise_error_after_30_tries(self): |
383 | - service = self.make_service(sentinel.listener) |
384 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
385 | result_serial = random.randint(1, 1000) |
386 | formatted_serial = f"{result_serial:10d}" |
387 | dns_names = [factory.make_name("domain") for _ in range(3)] |
388 | @@ -487,7 +485,7 @@ class TestRegionControllerService(MAASServerTestCase): |
389 | @wait_for_reactor |
390 | @inlineCallbacks |
391 | def test_check_serial_handles_ValueError(self): |
392 | - service = self.make_service(sentinel.listener) |
393 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
394 | result_serial = random.randint(1, 1000) |
395 | formatted_serial = f"{result_serial:10d}" |
396 | dns_names = [factory.make_name("domain") for _ in range(3)] |
397 | @@ -502,7 +500,7 @@ class TestRegionControllerService(MAASServerTestCase): |
398 | @wait_for_reactor |
399 | @inlineCallbacks |
400 | def test_check_serial_handles_TimeoutError(self): |
401 | - service = self.make_service(sentinel.listener) |
402 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
403 | result_serial = random.randint(1, 1000) |
404 | formatted_serial = f"{result_serial:10d}" |
405 | dns_names = [factory.make_name("domain") for _ in range(3)] |
406 | @@ -515,7 +513,7 @@ class TestRegionControllerService(MAASServerTestCase): |
407 | yield service._checkSerial((formatted_serial, True, dns_names)) |
408 | |
409 | def test_getRBACClient_returns_None_when_no_url(self): |
410 | - service = self.make_service(sentinel.listener) |
411 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
412 | service.rbacClient = sentinel.client |
413 | SecretManager().delete_secret("external-auth") |
414 | self.assertIsNone(service._getRBACClient()) |
415 | @@ -526,7 +524,7 @@ class TestRegionControllerService(MAASServerTestCase): |
416 | SecretManager().set_composite_secret( |
417 | "external-auth", {"rbac-url": "http://rbac.example.com"} |
418 | ) |
419 | - service = self.make_service(sentinel.listener) |
420 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
421 | client = service._getRBACClient() |
422 | self.assertIsNotNone(client) |
423 | self.assertIs(client, service.rbacClient) |
424 | @@ -537,7 +535,7 @@ class TestRegionControllerService(MAASServerTestCase): |
425 | SecretManager().set_composite_secret( |
426 | "external-auth", {"rbac-url": "http://rbac.example.com"} |
427 | ) |
428 | - service = self.make_service(sentinel.listener) |
429 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
430 | client = service._getRBACClient() |
431 | SecretManager().set_composite_secret( |
432 | "external-auth", {"rbac-url": "http://other.example.com"} |
433 | @@ -552,7 +550,7 @@ class TestRegionControllerService(MAASServerTestCase): |
434 | SecretManager().set_composite_secret( |
435 | "external-auth", {"rbac-url": "http://rbac.example.com"} |
436 | ) |
437 | - service = self.make_service(sentinel.listener) |
438 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
439 | client = service._getRBACClient() |
440 | mock_get_auth_info.return_value = MagicMock() |
441 | new_client = service._getRBACClient() |
442 | @@ -561,7 +559,7 @@ class TestRegionControllerService(MAASServerTestCase): |
443 | self.assertIs(new_client, service._getRBACClient()) |
444 | |
445 | def test_rbacNeedsFull(self): |
446 | - service = self.make_service(sentinel.listener) |
447 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
448 | changes = [ |
449 | RBACSync(action=RBAC_ACTION.ADD), |
450 | RBACSync(action=RBAC_ACTION.UPDATE), |
451 | @@ -571,7 +569,7 @@ class TestRegionControllerService(MAASServerTestCase): |
452 | self.assertTrue(service._rbacNeedsFull(changes)) |
453 | |
454 | def test_rbacDifference(self): |
455 | - service = self.make_service(sentinel.listener) |
456 | + service = self.make_service(sentinel.listener, sentinel.dbtasks) |
457 | changes = [ |
458 | RBACSync( |
459 | action=RBAC_ACTION.UPDATE, resource_id=1, resource_name="r-1" |
460 | @@ -636,7 +634,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
461 | ] |
462 | |
463 | publications = yield deferToDatabase(_create_publications) |
464 | - service = RegionControllerService(sentinel.listener) |
465 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
466 | service.needsDNSUpdate = True |
467 | service.previousSerial = publications[0].serial |
468 | dns_result = ( |
469 | @@ -677,7 +675,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
470 | ] |
471 | |
472 | publications = yield deferToDatabase(_create_publications) |
473 | - service = RegionControllerService(sentinel.listener) |
474 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
475 | service.needsDNSUpdate = True |
476 | service.previousSerial = publications[0].serial |
477 | dns_result = ( |
478 | @@ -708,14 +706,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
479 | def test_rbacSync_returns_None_when_nothing_to_do(self): |
480 | RBACSync.objects.clear("resource-pool") |
481 | |
482 | - service = RegionControllerService(sentinel.listener) |
483 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
484 | service.rbacInit = True |
485 | self.assertIsNone(service._rbacSync()) |
486 | |
487 | def test_rbacSync_returns_None_and_clears_sync_when_no_client(self): |
488 | RBACSync.objects.create(resource_type="resource-pool") |
489 | |
490 | - service = RegionControllerService(sentinel.listener) |
491 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
492 | self.assertIsNone(service._rbacSync()) |
493 | self.assertFalse(RBACSync.objects.exists()) |
494 | |
495 | @@ -729,7 +727,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
496 | |
497 | rbac_client = MagicMock() |
498 | rbac_client.update_resources.return_value = "x-y-z" |
499 | - service = RegionControllerService(sentinel.listener) |
500 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
501 | self.patch(service, "_getRBACClient").return_value = rbac_client |
502 | |
503 | self.assertEqual([], service._rbacSync()) |
504 | @@ -748,7 +746,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
505 | |
506 | rbac_client = MagicMock() |
507 | rbac_client.update_resources.return_value = "x-y-z" |
508 | - service = RegionControllerService(sentinel.listener) |
509 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
510 | self.patch(service, "_getRBACClient").return_value = rbac_client |
511 | |
512 | self.assertEqual([], service._rbacSync()) |
513 | @@ -773,7 +771,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
514 | |
515 | rbac_client = MagicMock() |
516 | rbac_client.update_resources.return_value = "x-y-z" |
517 | - service = RegionControllerService(sentinel.listener) |
518 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
519 | self.patch(service, "_getRBACClient").return_value = rbac_client |
520 | service.rbacInit = True |
521 | |
522 | @@ -807,7 +805,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
523 | SyncConflictError(), |
524 | "x-y-z", |
525 | ] |
526 | - service = RegionControllerService(sentinel.listener) |
527 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
528 | self.patch(service, "_getRBACClient").return_value = rbac_client |
529 | service.rbacInit = True |
530 | |
531 | @@ -838,7 +836,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
532 | |
533 | rbac_client = MagicMock() |
534 | rbac_client.update_resources.return_value = "x-y-z" |
535 | - service = RegionControllerService(sentinel.listener) |
536 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
537 | self.patch(service, "_getRBACClient").return_value = rbac_client |
538 | service.rbacInit = True |
539 | |
540 | @@ -856,7 +854,9 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
541 | domain = yield deferToDatabase(factory.make_Domain) |
542 | update_result = (random.randint(0, 10), True, [domain.name]) |
543 | record = yield deferToDatabase(factory.make_DNSResource, domain=domain) |
544 | - service = RegionControllerService(sentinel.listener) |
545 | + dbtasks = DatabaseTasksService() |
546 | + dbtasks.startService() |
547 | + service = RegionControllerService(sentinel.listener, dbtasks) |
548 | |
549 | update_zones = self.patch(region_controller, "dns_update_all_zones") |
550 | update_zones.return_value = update_result |
551 | @@ -864,10 +864,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
552 | check_serial.return_value = succeed(update_result) |
553 | |
554 | service._dns_update_in_progress = True |
555 | - yield service.queueDynamicDNSUpdate( |
556 | + service.queueDynamicDNSUpdate( |
557 | factory.make_name(), |
558 | f"INSERT {domain.name} {record.name} A 30 10.10.10.10", |
559 | ) |
560 | + |
561 | + # Wait until all the dynamic updates are processed |
562 | + yield dbtasks.syncTask() |
563 | + |
564 | self.assertCountEqual(service._dns_updates, []) |
565 | self.assertCountEqual( |
566 | service._queued_updates, |
567 | @@ -902,10 +906,12 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
568 | @wait_for_reactor |
569 | @inlineCallbacks |
570 | def test_dns_is_set_to_update_when_queued_updates_are_present(self): |
571 | + dbtasks = DatabaseTasksService() |
572 | + dbtasks.startService() |
573 | domain = yield deferToDatabase(factory.make_Domain) |
574 | update_result = (random.randint(0, 10), True, [domain.name]) |
575 | record = yield deferToDatabase(factory.make_DNSResource, domain=domain) |
576 | - service = RegionControllerService(sentinel.listener) |
577 | + service = RegionControllerService(sentinel.listener, dbtasks) |
578 | |
579 | update_zones = self.patch(region_controller, "dns_update_all_zones") |
580 | update_zones.return_value = update_result |
581 | @@ -913,10 +919,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
582 | check_serial.return_value = succeed(update_result) |
583 | |
584 | service._dns_update_in_progress = True |
585 | - yield service.queueDynamicDNSUpdate( |
586 | + service.queueDynamicDNSUpdate( |
587 | factory.make_name(), |
588 | f"INSERT {domain.name} {record.name} A 30 10.10.10.10", |
589 | ) |
590 | + |
591 | + # Wait until all the dynamic updates are processed |
592 | + yield dbtasks.syncTask() |
593 | + |
594 | self.assertCountEqual(service._dns_updates, []) |
595 | expected_updates = [ |
596 | DynamicDNSUpdate( |
597 | @@ -958,7 +968,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
598 | def test_check_serial_is_skipped_if_a_newer_serial_exists(self): |
599 | domain = yield deferToDatabase(factory.make_Domain) |
600 | update_result = (random.randint(0, 10), True, [domain.name]) |
601 | - service = RegionControllerService(sentinel.listener) |
602 | + service = RegionControllerService(sentinel.listener, sentinel.dbtasks) |
603 | |
604 | query = self.patch(service.dnsResolver, "lookupAuthority") |
605 | |
606 | @@ -968,12 +978,16 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
607 | |
608 | query.assert_not_called() |
609 | |
610 | + @wait_for_reactor |
611 | + @inlineCallbacks |
612 | def test_queueDynamicDNSUpdate_can_be_called_synchronously(self): |
613 | - domain = factory.make_Domain() |
614 | + dbtasks = DatabaseTasksService() |
615 | + dbtasks.startService() |
616 | + domain = yield deferToDatabase(factory.make_Domain) |
617 | update_result = (random.randint(0, 10), True, [domain.name]) |
618 | - record1 = factory.make_DNSResource(domain=domain) |
619 | - record2 = factory.make_DNSResource(domain=domain) |
620 | - service = RegionControllerService(sentinel.listener) |
621 | + record1 = yield deferToDatabase(factory.make_DNSResource, domain) |
622 | + record2 = yield deferToDatabase(factory.make_DNSResource, domain) |
623 | + service = RegionControllerService(sentinel.listener, dbtasks) |
624 | |
625 | update_zones = self.patch(region_controller, "dns_update_all_zones") |
626 | update_zones.return_value = update_result |
627 | @@ -998,6 +1012,9 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase): |
628 | f"DELETE {domain.name} {record2.name} A 30 2.2.2.2", |
629 | ) |
630 | |
631 | + # Wait until all the dynamic updates are processed |
632 | + yield dbtasks.syncTask() |
633 | + |
634 | self.assertCountEqual( |
635 | service._dns_updates, |
636 | [ |
637 | diff --git a/src/maasserver/utils/dbtasks.py b/src/maasserver/utils/dbtasks.py |
638 | index 5863f02..04cbf7e 100644 |
639 | --- a/src/maasserver/utils/dbtasks.py |
640 | +++ b/src/maasserver/utils/dbtasks.py |
641 | @@ -26,14 +26,17 @@ class DatabaseTaskAlreadyRunning(Exception): |
642 | class DatabaseTasksService(Service): |
643 | """Run deferred database operations one at a time. |
644 | |
645 | - Once the service is started, `deferTask` and `addTask` can be used to |
646 | - queue up execution of a database task. |
647 | + Once the service is started, `deferTask`, `addTask` and |
648 | + `deferTaskWithCallbacks` can be used to queue up execution of a database task. |
649 | |
650 | - The former — `deferTask` — will return a `Deferred` that fires with the |
651 | + The former —`deferTaskWithCallbacks` — returns nothing, and can be used to add tasks with callbacks to the queue. |
652 | + Errors arising from this task are simply logged. |
653 | + |
654 | + `deferTask` — will return a `Deferred` that fires with the |
655 | result of the database task. Errors arising from this task become the |
656 | responsibility of the caller. |
657 | |
658 | - The latter — `addTask` — returns nothing, and will log errors arising from |
659 | + `addTask` — returns nothing, and will log errors arising from |
660 | the database task. |
661 | |
662 | Before this service has been started, and as soon as shutdown has |
663 | @@ -50,6 +53,24 @@ class DatabaseTasksService(Service): |
664 | self.queue = DeferredQueue(size=0, backlog=1) |
665 | |
666 | @asynchronous |
667 | + def deferTaskWithCallbacks(self, func, callbacks, *args, **kwargs): |
668 | + """Schedules `func` with callbacks to run later. |
669 | + |
670 | + :raise QueueOverflow: If the queue of tasks is full. |
671 | + :return: `None` |
672 | + """ |
673 | + |
674 | + def task(): |
675 | + d = deferToDatabase(func, *args, **kwargs) |
676 | + for callback in callbacks: |
677 | + d.addCallback(callback) |
678 | + d.addErrback(log.err, "Unhandled failure in database task.") |
679 | + return d |
680 | + |
681 | + self.queue.put(task) |
682 | + return None |
683 | + |
684 | + @asynchronous |
685 | def deferTask(self, func, *args, **kwargs): |
686 | """Schedules `func` to run later. |
687 | |
688 | diff --git a/src/maasserver/utils/tests/test_dbtasks.py b/src/maasserver/utils/tests/test_dbtasks.py |
689 | index 0f54e05..06335ef 100644 |
690 | --- a/src/maasserver/utils/tests/test_dbtasks.py |
691 | +++ b/src/maasserver/utils/tests/test_dbtasks.py |
692 | @@ -120,6 +120,27 @@ class TestDatabaseTaskService(MAASTestCase): |
693 | finally: |
694 | service.stopService() |
695 | |
696 | + def test_callbacks_are_called_from_deferredTask(self): |
697 | + def simple_function(input): |
698 | + return input + " world" |
699 | + |
700 | + def callback(data): |
701 | + sentinel.callback = data |
702 | + |
703 | + service = DatabaseTasksService() |
704 | + service.startService() |
705 | + try: |
706 | + service.deferTaskWithCallbacks( |
707 | + simple_function, [callback], "Hello" |
708 | + ) |
709 | + service.syncTask().wait(TIMEOUT) |
710 | + self.assertEqual( |
711 | + "Hello world", |
712 | + sentinel.callback, |
713 | + ) |
714 | + finally: |
715 | + service.stopService() |
716 | + |
717 | def test_tasks_are_all_run_before_shutdown_completes(self): |
718 | service = DatabaseTasksService() |
719 | service.startService() |
UNIT TESTS fix-concurrent- dns-updates lp:~r00ta/maas/+git/maas into -b master lp:~maas-committers/maas
-b lp-2045228-
STATUS: FAILED maas-ci. internal: 8080/job/ maas-tester/ 4236/console 9c1c157b3ef8d70 1b5410693a
LOG: http://
COMMIT: 5077da0bf100f98