Merge ~r00ta/maas:lp-2045228-fix-concurrent-dns-updates-3.3 into maas:3.3

Proposed by Jacopo Rota
Status: Merged
Approved by: Jacopo Rota
Approved revision: 3e2c197c8b316a54c93a0705b51d9afb0690b179
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~r00ta/maas:lp-2045228-fix-concurrent-dns-updates-3.3
Merge into: maas:3.3
Diff against target: 719 lines (+163/-77)
7 files modified
src/maasserver/eventloop.py (+8/-3)
src/maasserver/region_controller.py (+21/-13)
src/maasserver/tests/test_eventloop.py (+13/-1)
src/maasserver/tests/test_plugin.py (+2/-0)
src/maasserver/tests/test_region_controller.py (+73/-56)
src/maasserver/utils/dbtasks.py (+25/-4)
src/maasserver/utils/tests/test_dbtasks.py (+21/-0)
Reviewer | Review Type | Date Requested | Status
Jacopo Rota | | | Approve
MAAS Lander | | | Approve
Review via email: mp+457574@code.launchpad.net

Commit message

fix: lp-2045228 race condition in postgres listener for dns dynamic updates

Description of the change

Cherry-pick of commit 5d4ea79dd26504f26f26722673cf94428cea6365 onto the 3.3 branch. Original merge proposal: https://code.launchpad.net/~r00ta/maas/+git/maas/+merge/457554

To post a comment you must log in.
Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b lp-2045228-fix-concurrent-dns-updates-3.3 lp:~r00ta/maas/+git/maas into -b 3.3 lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: 3e2c197c8b316a54c93a0705b51d9afb0690b179

review: Approve
Revision history for this message
Jacopo Rota (r00ta) wrote :

self approving backport

Revision history for this message
Jacopo Rota (r00ta) wrote :

self approving backport

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/maasserver/eventloop.py b/src/maasserver/eventloop.py
2index 753c66a..0052348 100644
3--- a/src/maasserver/eventloop.py
4+++ b/src/maasserver/eventloop.py
5@@ -67,10 +67,10 @@ def make_DatabaseTaskService():
6 return dbtasks.DatabaseTasksService()
7
8
9-def make_RegionControllerService(postgresListener):
10+def make_RegionControllerService(postgresListener, dbtasks):
11 from maasserver.region_controller import RegionControllerService
12
13- return RegionControllerService(postgresListener)
14+ return RegionControllerService(postgresListener, dbtasks)
15
16
17 def make_RegionService(ipcWorker):
18@@ -287,10 +287,15 @@ class RegionEventLoop:
19 "factory": make_DatabaseTaskService,
20 "requires": [],
21 },
22+ "database-tasks-master": {
23+ "only_on_master": True,
24+ "factory": make_DatabaseTaskService,
25+ "requires": [],
26+ },
27 "region-controller": {
28 "only_on_master": True,
29 "factory": make_RegionControllerService,
30- "requires": ["postgres-listener-master"],
31+ "requires": ["postgres-listener-master", "database-tasks-master"],
32 },
33 "rpc": {
34 "only_on_master": False,
35diff --git a/src/maasserver/region_controller.py b/src/maasserver/region_controller.py
36index 26e2c21..756dfbd 100644
37--- a/src/maasserver/region_controller.py
38+++ b/src/maasserver/region_controller.py
39@@ -78,6 +78,7 @@ class RegionControllerService(Service):
40 def __init__(
41 self,
42 postgresListener,
43+ dbtasks,
44 clock=reactor,
45 retryOnFailure=True,
46 rbacRetryOnFailureDelay=10,
47@@ -103,6 +104,7 @@ class RegionControllerService(Service):
48 self._dns_requires_full_reload = True
49 self._dns_latest_serial = None
50 self.postgresListener = postgresListener
51+ self.dbtasks = dbtasks
52 self.dnsResolver = Resolver(
53 resolv=None,
54 servers=[("127.0.0.1", 53)],
55@@ -175,27 +177,33 @@ class RegionControllerService(Service):
56 )
57 eventloop.restart()
58
59- @asynchronous(timeout=FOREVER)
60- @inlineCallbacks
61 def queueDynamicDNSUpdate(self, channel, message):
62 """
63 Called when the `sys_dns_update` message is received
64- and queues updates for existing domains
65+ and queues updates for existing domains.
66+ The updates are offloaded to the DatabaseTasksService in order to
67+ process them in sequence and keep consuming the next postgres notifications.
68 """
69+
70+ def updateCallback(data):
71+ (new_updates, need_reload) = data
72+ self._dns_requires_full_reload = (
73+ self._dns_requires_full_reload or need_reload
74+ )
75+ if self._dns_update_in_progress:
76+ self._queued_updates += new_updates
77+ else:
78+ self._dns_updates += new_updates
79+
80+ log.debug("Start processing dynamic DNS update '{}'".format(message))
81 if message == "":
82 return
83
84- (new_updates, need_reload) = yield deferToDatabase(
85- process_dns_update_notify, message
86- )
87-
88- self._dns_requires_full_reload = (
89- self._dns_requires_full_reload or need_reload
90+ self.dbtasks.deferTaskWithCallbacks(
91+ process_dns_update_notify,
92+ [updateCallback],
93+ message,
94 )
95- if self._dns_update_in_progress:
96- self._queued_updates += new_updates
97- else:
98- self._dns_updates += new_updates
99
100 def startProcessing(self):
101 """Start the process looping call."""
102diff --git a/src/maasserver/tests/test_eventloop.py b/src/maasserver/tests/test_eventloop.py
103index 691ff01..02c1ac4 100644
104--- a/src/maasserver/tests/test_eventloop.py
105+++ b/src/maasserver/tests/test_eventloop.py
106@@ -368,9 +368,21 @@ class TestFactories(MAASServerTestCase):
107 eventloop.loop.factories["database-tasks"]["only_on_master"]
108 )
109
110+ def test_make_MasterDatabaseTaskService(self):
111+ service = eventloop.make_DatabaseTaskService()
112+ self.assertIsInstance(service, dbtasks.DatabaseTasksService)
113+ # It is registered as a factory in RegionEventLoop.
114+ self.assertIs(
115+ eventloop.make_DatabaseTaskService,
116+ eventloop.loop.factories["database-tasks-master"]["factory"],
117+ )
118+ self.assertTrue(
119+ eventloop.loop.factories["database-tasks-master"]["only_on_master"]
120+ )
121+
122 def test_make_RegionControllerService(self):
123 service = eventloop.make_RegionControllerService(
124- sentinel.postgresListener
125+ sentinel.postgresListener, sentinel.dbtasks
126 )
127 self.assertIsInstance(
128 service, region_controller.RegionControllerService
129diff --git a/src/maasserver/tests/test_plugin.py b/src/maasserver/tests/test_plugin.py
130index 6c2572c..ba32ca5 100644
131--- a/src/maasserver/tests/test_plugin.py
132+++ b/src/maasserver/tests/test_plugin.py
133@@ -237,6 +237,7 @@ class TestRegionMasterServiceMaker(TestServiceMaker):
134 service = service_maker.makeService(options)
135 self.assertIsInstance(service, MultiService)
136 expected_services = {
137+ "database-tasks-master",
138 "region-controller",
139 "nonce-cleanup",
140 "dns-publication-cleanup",
141@@ -365,6 +366,7 @@ class TestRegionAllInOneServiceMaker(TestServiceMaker):
142 "web",
143 "ipc-worker",
144 # Master services.
145+ "database-tasks-master",
146 "region-controller",
147 "nonce-cleanup",
148 "dns-publication-cleanup",
149diff --git a/src/maasserver/tests/test_region_controller.py b/src/maasserver/tests/test_region_controller.py
150index fd974e8..b688e58 100644
151--- a/src/maasserver/tests/test_region_controller.py
152+++ b/src/maasserver/tests/test_region_controller.py
153@@ -29,6 +29,7 @@ from maasserver.testing.testcase import (
154 MAASServerTestCase,
155 MAASTransactionServerTestCase,
156 )
157+from maasserver.utils.dbtasks import DatabaseTasksService
158 from maasserver.utils.threads import deferToDatabase
159 from maastesting.crochet import wait_for
160 from maastesting.matchers import (
161@@ -43,12 +44,12 @@ wait_for_reactor = wait_for()
162
163
164 class TestRegionControllerService(MAASServerTestCase):
165- def make_service(self, listener):
166+ def make_service(self, listener=MagicMock(), dbtasks=MagicMock()):
167 # Don't retry on failure or the tests will loop forever.
168- return RegionControllerService(listener, retryOnFailure=False)
169+ return RegionControllerService(listener, dbtasks, retryOnFailure=False)
170
171 def test_init_sets_properties(self):
172- service = self.make_service(sentinel.listener)
173+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
174 self.assertThat(
175 service,
176 MatchesStructure.byEquality(
177@@ -56,6 +57,7 @@ class TestRegionControllerService(MAASServerTestCase):
178 processingDefer=None,
179 needsDNSUpdate=True,
180 postgresListener=sentinel.listener,
181+ dbtasks=sentinel.dbtasks,
182 ),
183 )
184
185@@ -107,31 +109,27 @@ class TestRegionControllerService(MAASServerTestCase):
186 @wait_for_reactor
187 @inlineCallbacks
188 def test_stopService_handles_canceling_processing(self):
189- listener = MagicMock()
190- service = self.make_service(listener)
191+ service = self.make_service()
192 service.startProcessing()
193 yield service.stopService()
194 self.assertIsNone(service.processingDefer)
195
196 def test_markDNSForUpdate_sets_needsDNSUpdate_and_starts_process(self):
197- listener = MagicMock()
198- service = self.make_service(listener)
199+ service = self.make_service()
200 mock_startProcessing = self.patch(service, "startProcessing")
201 service.markDNSForUpdate(None, None)
202 self.assertTrue(service.needsDNSUpdate)
203 self.assertThat(mock_startProcessing, MockCalledOnceWith())
204
205 def test_markProxyForUpdate_sets_needsProxyUpdate_and_starts_process(self):
206- listener = MagicMock()
207- service = self.make_service(listener)
208+ service = self.make_service()
209 mock_startProcessing = self.patch(service, "startProcessing")
210 service.markProxyForUpdate(None, None)
211 self.assertTrue(service.needsProxyUpdate)
212 self.assertThat(mock_startProcessing, MockCalledOnceWith())
213
214 def test_markRBACForUpdate_sets_needsRBACUpdate_and_starts_process(self):
215- listener = MagicMock()
216- service = self.make_service(listener)
217+ service = self.make_service()
218 mock_startProcessing = self.patch(service, "startProcessing")
219 service.markRBACForUpdate(None, None)
220 self.assertTrue(service.needsRBACUpdate)
221@@ -139,19 +137,19 @@ class TestRegionControllerService(MAASServerTestCase):
222
223 def test_restart_region_restarts_eventloop(self):
224 restart_mock = self.patch(eventloop, "restart")
225- service = self.make_service(MagicMock())
226+ service = self.make_service()
227 service.restartRegion("sys_vault_migration", "")
228 restart_mock.assert_called_once()
229
230 def test_startProcessing_doesnt_call_start_when_looping_call_running(self):
231- service = self.make_service(sentinel.listener)
232+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
233 mock_start = self.patch(service.processing, "start")
234 service.processing.running = True
235 service.startProcessing()
236 self.assertThat(mock_start, MockNotCalled())
237
238 def test_startProcessing_calls_start_when_looping_call_not_running(self):
239- service = self.make_service(sentinel.listener)
240+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
241 mock_start = self.patch(service.processing, "start")
242 service.startProcessing()
243 self.assertThat(mock_start, MockCalledOnceWith(0.1, now=False))
244@@ -159,7 +157,7 @@ class TestRegionControllerService(MAASServerTestCase):
245 @wait_for_reactor
246 @inlineCallbacks
247 def test_reload_dns_on_start(self):
248- service = self.make_service(sentinel.listener)
249+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
250 mock_dns_update_all_zones = self.patch(
251 region_controller, "dns_update_all_zones"
252 )
253@@ -177,7 +175,7 @@ class TestRegionControllerService(MAASServerTestCase):
254 @wait_for_reactor
255 @inlineCallbacks
256 def test_process_doesnt_update_zones_when_nothing_to_process(self):
257- service = self.make_service(sentinel.listener)
258+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
259 service.needsDNSUpdate = False
260 mock_dns_update_all_zones = self.patch(
261 region_controller, "dns_update_all_zones"
262@@ -189,7 +187,7 @@ class TestRegionControllerService(MAASServerTestCase):
263 @wait_for_reactor
264 @inlineCallbacks
265 def test_process_doesnt_proxy_update_config_when_nothing_to_process(self):
266- service = self.make_service(sentinel.listener)
267+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
268 service.needsProxyUpdate = False
269 mock_proxy_update_config = self.patch(
270 region_controller, "proxy_update_config"
271@@ -201,7 +199,7 @@ class TestRegionControllerService(MAASServerTestCase):
272 @wait_for_reactor
273 @inlineCallbacks
274 def test_process_doesnt_call_rbacSync_when_nothing_to_process(self):
275- service = self.make_service(sentinel.listener)
276+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
277 service.needsRBACUpdate = False
278 mock_rbacSync = self.patch(service, "_rbacSync")
279 service.startProcessing()
280@@ -211,7 +209,7 @@ class TestRegionControllerService(MAASServerTestCase):
281 @wait_for_reactor
282 @inlineCallbacks
283 def test_process_stops_processing(self):
284- service = self.make_service(sentinel.listener)
285+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
286 service.needsDNSUpdate = False
287 service.startProcessing()
288 yield service.processingDefer
289@@ -220,7 +218,7 @@ class TestRegionControllerService(MAASServerTestCase):
290 @wait_for_reactor
291 @inlineCallbacks
292 def test_process_updates_zones(self):
293- service = self.make_service(sentinel.listener)
294+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
295 service.needsDNSUpdate = True
296 dns_result = (
297 random.randint(1, 1000),
298@@ -248,7 +246,7 @@ class TestRegionControllerService(MAASServerTestCase):
299 @wait_for_reactor
300 @inlineCallbacks
301 def test_process_zones_kills_bind_on_failed_reload(self):
302- service = self.make_service(sentinel.listener)
303+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
304 service.needsDNSUpdate = True
305 service.retryOnFailure = True
306 dns_result_0 = (
307@@ -295,7 +293,7 @@ class TestRegionControllerService(MAASServerTestCase):
308 @wait_for_reactor
309 @inlineCallbacks
310 def test_process_updates_proxy(self):
311- service = self.make_service(sentinel.listener)
312+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
313 service.needsProxyUpdate = True
314 mock_proxy_update_config = self.patch(
315 region_controller, "proxy_update_config"
316@@ -314,7 +312,7 @@ class TestRegionControllerService(MAASServerTestCase):
317 @wait_for_reactor
318 @inlineCallbacks
319 def test_process_updates_rbac(self):
320- service = self.make_service(sentinel.listener)
321+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
322 service.needsRBACUpdate = True
323 mock_rbacSync = self.patch(service, "_rbacSync")
324 mock_rbacSync.return_value = []
325@@ -330,7 +328,7 @@ class TestRegionControllerService(MAASServerTestCase):
326 @wait_for_reactor
327 @inlineCallbacks
328 def test_process_updates_zones_logs_failure(self):
329- service = self.make_service(sentinel.listener)
330+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
331 service.needsDNSUpdate = True
332 mock_dns_update_all_zones = self.patch(
333 region_controller, "dns_update_all_zones"
334@@ -349,7 +347,7 @@ class TestRegionControllerService(MAASServerTestCase):
335 @wait_for_reactor
336 @inlineCallbacks
337 def test_process_updates_proxy_logs_failure(self):
338- service = self.make_service(sentinel.listener)
339+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
340 service.needsProxyUpdate = True
341 mock_proxy_update_config = self.patch(
342 region_controller, "proxy_update_config"
343@@ -368,7 +366,7 @@ class TestRegionControllerService(MAASServerTestCase):
344 @wait_for_reactor
345 @inlineCallbacks
346 def test_process_updates_rbac_logs_failure(self):
347- service = self.make_service(sentinel.listener)
348+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
349 service.needsRBACUpdate = True
350 mock_rbacSync = self.patch(service, "_rbacSync")
351 mock_rbacSync.side_effect = factory.make_exception()
352@@ -383,7 +381,7 @@ class TestRegionControllerService(MAASServerTestCase):
353 @wait_for_reactor
354 @inlineCallbacks
355 def test_process_updates_rbac_retries_with_delay(self):
356- service = self.make_service(sentinel.listener)
357+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
358 service.needsRBACUpdate = True
359 service.retryOnFailure = True
360 service.rbacRetryOnFailureDelay = random.randint(1, 10)
361@@ -405,7 +403,7 @@ class TestRegionControllerService(MAASServerTestCase):
362 @wait_for_reactor
363 @inlineCallbacks
364 def test_process_updates_bind_proxy_and_rbac(self):
365- service = self.make_service(sentinel.listener)
366+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
367 service.needsDNSUpdate = True
368 service.needsProxyUpdate = True
369 service.needsRBACUpdate = True
370@@ -441,7 +439,7 @@ class TestRegionControllerService(MAASServerTestCase):
371
372 @wait_for_reactor
373 def test_check_serial_doesnt_raise_error_on_successful_serial_match(self):
374- service = self.make_service(sentinel.listener)
375+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
376 result_serial = random.randint(1, 1000)
377 formatted_serial = f"{result_serial:10d}"
378 dns_names = [factory.make_name("domain") for _ in range(3)]
379@@ -472,7 +470,7 @@ class TestRegionControllerService(MAASServerTestCase):
380 @wait_for_reactor
381 @inlineCallbacks
382 def test_check_serial_raise_error_after_30_tries(self):
383- service = self.make_service(sentinel.listener)
384+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
385 result_serial = random.randint(1, 1000)
386 formatted_serial = f"{result_serial:10d}"
387 dns_names = [factory.make_name("domain") for _ in range(3)]
388@@ -487,7 +485,7 @@ class TestRegionControllerService(MAASServerTestCase):
389 @wait_for_reactor
390 @inlineCallbacks
391 def test_check_serial_handles_ValueError(self):
392- service = self.make_service(sentinel.listener)
393+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
394 result_serial = random.randint(1, 1000)
395 formatted_serial = f"{result_serial:10d}"
396 dns_names = [factory.make_name("domain") for _ in range(3)]
397@@ -502,7 +500,7 @@ class TestRegionControllerService(MAASServerTestCase):
398 @wait_for_reactor
399 @inlineCallbacks
400 def test_check_serial_handles_TimeoutError(self):
401- service = self.make_service(sentinel.listener)
402+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
403 result_serial = random.randint(1, 1000)
404 formatted_serial = f"{result_serial:10d}"
405 dns_names = [factory.make_name("domain") for _ in range(3)]
406@@ -515,7 +513,7 @@ class TestRegionControllerService(MAASServerTestCase):
407 yield service._checkSerial((formatted_serial, True, dns_names))
408
409 def test_getRBACClient_returns_None_when_no_url(self):
410- service = self.make_service(sentinel.listener)
411+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
412 service.rbacClient = sentinel.client
413 SecretManager().delete_secret("external-auth")
414 self.assertIsNone(service._getRBACClient())
415@@ -526,7 +524,7 @@ class TestRegionControllerService(MAASServerTestCase):
416 SecretManager().set_composite_secret(
417 "external-auth", {"rbac-url": "http://rbac.example.com"}
418 )
419- service = self.make_service(sentinel.listener)
420+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
421 client = service._getRBACClient()
422 self.assertIsNotNone(client)
423 self.assertIs(client, service.rbacClient)
424@@ -537,7 +535,7 @@ class TestRegionControllerService(MAASServerTestCase):
425 SecretManager().set_composite_secret(
426 "external-auth", {"rbac-url": "http://rbac.example.com"}
427 )
428- service = self.make_service(sentinel.listener)
429+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
430 client = service._getRBACClient()
431 SecretManager().set_composite_secret(
432 "external-auth", {"rbac-url": "http://other.example.com"}
433@@ -552,7 +550,7 @@ class TestRegionControllerService(MAASServerTestCase):
434 SecretManager().set_composite_secret(
435 "external-auth", {"rbac-url": "http://rbac.example.com"}
436 )
437- service = self.make_service(sentinel.listener)
438+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
439 client = service._getRBACClient()
440 mock_get_auth_info.return_value = MagicMock()
441 new_client = service._getRBACClient()
442@@ -561,7 +559,7 @@ class TestRegionControllerService(MAASServerTestCase):
443 self.assertIs(new_client, service._getRBACClient())
444
445 def test_rbacNeedsFull(self):
446- service = self.make_service(sentinel.listener)
447+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
448 changes = [
449 RBACSync(action=RBAC_ACTION.ADD),
450 RBACSync(action=RBAC_ACTION.UPDATE),
451@@ -571,7 +569,7 @@ class TestRegionControllerService(MAASServerTestCase):
452 self.assertTrue(service._rbacNeedsFull(changes))
453
454 def test_rbacDifference(self):
455- service = self.make_service(sentinel.listener)
456+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
457 changes = [
458 RBACSync(
459 action=RBAC_ACTION.UPDATE, resource_id=1, resource_name="r-1"
460@@ -636,7 +634,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
461 ]
462
463 publications = yield deferToDatabase(_create_publications)
464- service = RegionControllerService(sentinel.listener)
465+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
466 service.needsDNSUpdate = True
467 service.previousSerial = publications[0].serial
468 dns_result = (
469@@ -677,7 +675,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
470 ]
471
472 publications = yield deferToDatabase(_create_publications)
473- service = RegionControllerService(sentinel.listener)
474+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
475 service.needsDNSUpdate = True
476 service.previousSerial = publications[0].serial
477 dns_result = (
478@@ -708,14 +706,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
479 def test_rbacSync_returns_None_when_nothing_to_do(self):
480 RBACSync.objects.clear("resource-pool")
481
482- service = RegionControllerService(sentinel.listener)
483+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
484 service.rbacInit = True
485 self.assertIsNone(service._rbacSync())
486
487 def test_rbacSync_returns_None_and_clears_sync_when_no_client(self):
488 RBACSync.objects.create(resource_type="resource-pool")
489
490- service = RegionControllerService(sentinel.listener)
491+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
492 self.assertIsNone(service._rbacSync())
493 self.assertFalse(RBACSync.objects.exists())
494
495@@ -729,7 +727,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
496
497 rbac_client = MagicMock()
498 rbac_client.update_resources.return_value = "x-y-z"
499- service = RegionControllerService(sentinel.listener)
500+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
501 self.patch(service, "_getRBACClient").return_value = rbac_client
502
503 self.assertEqual([], service._rbacSync())
504@@ -748,7 +746,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
505
506 rbac_client = MagicMock()
507 rbac_client.update_resources.return_value = "x-y-z"
508- service = RegionControllerService(sentinel.listener)
509+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
510 self.patch(service, "_getRBACClient").return_value = rbac_client
511
512 self.assertEqual([], service._rbacSync())
513@@ -773,7 +771,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
514
515 rbac_client = MagicMock()
516 rbac_client.update_resources.return_value = "x-y-z"
517- service = RegionControllerService(sentinel.listener)
518+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
519 self.patch(service, "_getRBACClient").return_value = rbac_client
520 service.rbacInit = True
521
522@@ -807,7 +805,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
523 SyncConflictError(),
524 "x-y-z",
525 ]
526- service = RegionControllerService(sentinel.listener)
527+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
528 self.patch(service, "_getRBACClient").return_value = rbac_client
529 service.rbacInit = True
530
531@@ -838,7 +836,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
532
533 rbac_client = MagicMock()
534 rbac_client.update_resources.return_value = "x-y-z"
535- service = RegionControllerService(sentinel.listener)
536+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
537 self.patch(service, "_getRBACClient").return_value = rbac_client
538 service.rbacInit = True
539
540@@ -856,7 +854,9 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
541 domain = yield deferToDatabase(factory.make_Domain)
542 update_result = (random.randint(0, 10), True, [domain.name])
543 record = yield deferToDatabase(factory.make_DNSResource, domain=domain)
544- service = RegionControllerService(sentinel.listener)
545+ dbtasks = DatabaseTasksService()
546+ dbtasks.startService()
547+ service = RegionControllerService(sentinel.listener, dbtasks)
548
549 update_zones = self.patch(region_controller, "dns_update_all_zones")
550 update_zones.return_value = update_result
551@@ -864,10 +864,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
552 check_serial.return_value = succeed(update_result)
553
554 service._dns_update_in_progress = True
555- yield service.queueDynamicDNSUpdate(
556+ service.queueDynamicDNSUpdate(
557 factory.make_name(),
558 f"INSERT {domain.name} {record.name} A 30 10.10.10.10",
559 )
560+
561+ # Wait until all the dynamic updates are processed
562+ yield dbtasks.syncTask()
563+
564 self.assertCountEqual(service._dns_updates, [])
565 self.assertCountEqual(
566 service._queued_updates,
567@@ -902,10 +906,12 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
568 @wait_for_reactor
569 @inlineCallbacks
570 def test_dns_is_set_to_update_when_queued_updates_are_present(self):
571+ dbtasks = DatabaseTasksService()
572+ dbtasks.startService()
573 domain = yield deferToDatabase(factory.make_Domain)
574 update_result = (random.randint(0, 10), True, [domain.name])
575 record = yield deferToDatabase(factory.make_DNSResource, domain=domain)
576- service = RegionControllerService(sentinel.listener)
577+ service = RegionControllerService(sentinel.listener, dbtasks)
578
579 update_zones = self.patch(region_controller, "dns_update_all_zones")
580 update_zones.return_value = update_result
581@@ -913,10 +919,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
582 check_serial.return_value = succeed(update_result)
583
584 service._dns_update_in_progress = True
585- yield service.queueDynamicDNSUpdate(
586+ service.queueDynamicDNSUpdate(
587 factory.make_name(),
588 f"INSERT {domain.name} {record.name} A 30 10.10.10.10",
589 )
590+
591+ # Wait until all the dynamic updates are processed
592+ yield dbtasks.syncTask()
593+
594 self.assertCountEqual(service._dns_updates, [])
595 expected_updates = [
596 DynamicDNSUpdate(
597@@ -958,7 +968,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
598 def test_check_serial_is_skipped_if_a_newer_serial_exists(self):
599 domain = yield deferToDatabase(factory.make_Domain)
600 update_result = (random.randint(0, 10), True, [domain.name])
601- service = RegionControllerService(sentinel.listener)
602+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
603
604 query = self.patch(service.dnsResolver, "lookupAuthority")
605
606@@ -968,12 +978,16 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
607
608 query.assert_not_called()
609
610+ @wait_for_reactor
611+ @inlineCallbacks
612 def test_queueDynamicDNSUpdate_can_be_called_synchronously(self):
613- domain = factory.make_Domain()
614+ dbtasks = DatabaseTasksService()
615+ dbtasks.startService()
616+ domain = yield deferToDatabase(factory.make_Domain)
617 update_result = (random.randint(0, 10), True, [domain.name])
618- record1 = factory.make_DNSResource(domain=domain)
619- record2 = factory.make_DNSResource(domain=domain)
620- service = RegionControllerService(sentinel.listener)
621+ record1 = yield deferToDatabase(factory.make_DNSResource, domain)
622+ record2 = yield deferToDatabase(factory.make_DNSResource, domain)
623+ service = RegionControllerService(sentinel.listener, dbtasks)
624
625 update_zones = self.patch(region_controller, "dns_update_all_zones")
626 update_zones.return_value = update_result
627@@ -998,6 +1012,9 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
628 f"DELETE {domain.name} {record2.name} A 30 2.2.2.2",
629 )
630
631+ # Wait until all the dynamic updates are processed
632+ yield dbtasks.syncTask()
633+
634 self.assertCountEqual(
635 service._dns_updates,
636 [
637diff --git a/src/maasserver/utils/dbtasks.py b/src/maasserver/utils/dbtasks.py
638index 5863f02..04cbf7e 100644
639--- a/src/maasserver/utils/dbtasks.py
640+++ b/src/maasserver/utils/dbtasks.py
641@@ -26,14 +26,17 @@ class DatabaseTaskAlreadyRunning(Exception):
642 class DatabaseTasksService(Service):
643 """Run deferred database operations one at a time.
644
645- Once the service is started, `deferTask` and `addTask` can be used to
646- queue up execution of a database task.
647+ Once the service is started, `deferTask`, `addTask` and
648+ `deferTaskWithCallbacks` can be used to queue up execution of a database task.
649
650- The former — `deferTask` — will return a `Deferred` that fires with the
651+ The former —`deferTaskWithCallbacks` — returns nothing, and can be used to add tasks with callbacks to the queue.
652+ Errors arising from this task are simply logged.
653+
654+ `deferTask` — will return a `Deferred` that fires with the
655 result of the database task. Errors arising from this task become the
656 responsibility of the caller.
657
658- The latter — `addTask` — returns nothing, and will log errors arising from
659+ `addTask` — returns nothing, and will log errors arising from
660 the database task.
661
662 Before this service has been started, and as soon as shutdown has
663@@ -50,6 +53,24 @@ class DatabaseTasksService(Service):
664 self.queue = DeferredQueue(size=0, backlog=1)
665
666 @asynchronous
667+ def deferTaskWithCallbacks(self, func, callbacks, *args, **kwargs):
668+ """Schedules `func` with callbacks to run later.
669+
670+ :raise QueueOverflow: If the queue of tasks is full.
671+ :return: `None`
672+ """
673+
674+ def task():
675+ d = deferToDatabase(func, *args, **kwargs)
676+ for callback in callbacks:
677+ d.addCallback(callback)
678+ d.addErrback(log.err, "Unhandled failure in database task.")
679+ return d
680+
681+ self.queue.put(task)
682+ return None
683+
684+ @asynchronous
685 def deferTask(self, func, *args, **kwargs):
686 """Schedules `func` to run later.
687
688diff --git a/src/maasserver/utils/tests/test_dbtasks.py b/src/maasserver/utils/tests/test_dbtasks.py
689index 0f54e05..06335ef 100644
690--- a/src/maasserver/utils/tests/test_dbtasks.py
691+++ b/src/maasserver/utils/tests/test_dbtasks.py
692@@ -120,6 +120,27 @@ class TestDatabaseTaskService(MAASTestCase):
693 finally:
694 service.stopService()
695
696+ def test_callbacks_are_called_from_deferredTask(self):
697+ def simple_function(input):
698+ return input + " world"
699+
700+ def callback(data):
701+ sentinel.callback = data
702+
703+ service = DatabaseTasksService()
704+ service.startService()
705+ try:
706+ service.deferTaskWithCallbacks(
707+ simple_function, [callback], "Hello"
708+ )
709+ service.syncTask().wait(TIMEOUT)
710+ self.assertEqual(
711+ "Hello world",
712+ sentinel.callback,
713+ )
714+ finally:
715+ service.stopService()
716+
717 def test_tasks_are_all_run_before_shutdown_completes(self):
718 service = DatabaseTasksService()
719 service.startService()

Subscribers

People subscribed via source and target branches