Merge ~r00ta/maas:lp-2045228-fix-concurrent-dns-updates into maas:master

Proposed by Jacopo Rota
Status: Merged
Approved by: Jacopo Rota
Approved revision: a85a852185ec5351afa445b2e9e75fc04855a41a
Merge reported by: MAAS Lander
Merged at revision: not available
Proposed branch: ~r00ta/maas:lp-2045228-fix-concurrent-dns-updates
Merge into: maas:master
Diff against target: 719 lines (+163/-77)
7 files modified
src/maasserver/eventloop.py (+8/-3)
src/maasserver/region_controller.py (+21/-13)
src/maasserver/tests/test_eventloop.py (+13/-1)
src/maasserver/tests/test_plugin.py (+2/-0)
src/maasserver/tests/test_region_controller.py (+73/-56)
src/maasserver/utils/dbtasks.py (+25/-4)
src/maasserver/utils/tests/test_dbtasks.py (+21/-0)
Reviewer Review Type Date Requested Status
Christian Grabowski Approve
MAAS Lander Approve
Review via email: mp+457554@code.launchpad.net

Commit message

fix: lp-2045228 race condition in postgres listener for dns dynamic updates

Description of the change

Since the dns dynamic updates must be consumed in sequence by the postgres listener, we have to offload the notifications to a DatabaseTasksService that will execute the tasks in order.

To post a comment you must log in.
a85a852... by Jacopo Rota

fix: lp-2045228 race condition in postgres listener for dns dynamic updates

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b lp-2045228-fix-concurrent-dns-updates lp:~r00ta/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: FAILED
LOG: http://maas-ci.internal:8080/job/maas-tester/4236/console
COMMIT: 5077da0bf100f989c1c157b3ef8d701b5410693a

review: Needs Fixing
Revision history for this message
Jacopo Rota (r00ta) wrote :

jenkins: !test

Revision history for this message
MAAS Lander (maas-lander) wrote :

UNIT TESTS
-b lp-2045228-fix-concurrent-dns-updates lp:~r00ta/maas/+git/maas into -b master lp:~maas-committers/maas

STATUS: SUCCESS
COMMIT: a85a852185ec5351afa445b2e9e75fc04855a41a

review: Approve
Revision history for this message
Christian Grabowski (cgrabowski) wrote :

+1

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/src/maasserver/eventloop.py b/src/maasserver/eventloop.py
2index b4f9334..82be437 100644
3--- a/src/maasserver/eventloop.py
4+++ b/src/maasserver/eventloop.py
5@@ -67,10 +67,10 @@ def make_DatabaseTaskService():
6 return dbtasks.DatabaseTasksService()
7
8
9-def make_RegionControllerService(postgresListener):
10+def make_RegionControllerService(postgresListener, dbtasks):
11 from maasserver.region_controller import RegionControllerService
12
13- return RegionControllerService(postgresListener)
14+ return RegionControllerService(postgresListener, dbtasks)
15
16
17 def make_RegionService(ipcWorker):
18@@ -299,10 +299,15 @@ class RegionEventLoop:
19 "factory": make_DatabaseTaskService,
20 "requires": [],
21 },
22+ "database-tasks-master": {
23+ "only_on_master": True,
24+ "factory": make_DatabaseTaskService,
25+ "requires": [],
26+ },
27 "region-controller": {
28 "only_on_master": True,
29 "factory": make_RegionControllerService,
30- "requires": ["postgres-listener-master"],
31+ "requires": ["postgres-listener-master", "database-tasks-master"],
32 },
33 "rpc": {
34 "only_on_master": False,
35diff --git a/src/maasserver/region_controller.py b/src/maasserver/region_controller.py
36index 26e2c21..756dfbd 100644
37--- a/src/maasserver/region_controller.py
38+++ b/src/maasserver/region_controller.py
39@@ -78,6 +78,7 @@ class RegionControllerService(Service):
40 def __init__(
41 self,
42 postgresListener,
43+ dbtasks,
44 clock=reactor,
45 retryOnFailure=True,
46 rbacRetryOnFailureDelay=10,
47@@ -103,6 +104,7 @@ class RegionControllerService(Service):
48 self._dns_requires_full_reload = True
49 self._dns_latest_serial = None
50 self.postgresListener = postgresListener
51+ self.dbtasks = dbtasks
52 self.dnsResolver = Resolver(
53 resolv=None,
54 servers=[("127.0.0.1", 53)],
55@@ -175,27 +177,33 @@ class RegionControllerService(Service):
56 )
57 eventloop.restart()
58
59- @asynchronous(timeout=FOREVER)
60- @inlineCallbacks
61 def queueDynamicDNSUpdate(self, channel, message):
62 """
63 Called when the `sys_dns_update` message is received
64- and queues updates for existing domains
65+ and queues updates for existing domains.
66+ The updates are offloaded to the DatabaseTasksService in order to
67+ process them in sequence and keep consuming the next postgres notifications.
68 """
69+
70+ def updateCallback(data):
71+ (new_updates, need_reload) = data
72+ self._dns_requires_full_reload = (
73+ self._dns_requires_full_reload or need_reload
74+ )
75+ if self._dns_update_in_progress:
76+ self._queued_updates += new_updates
77+ else:
78+ self._dns_updates += new_updates
79+
80+ log.debug("Start processing dynamic DNS update '{}'".format(message))
81 if message == "":
82 return
83
84- (new_updates, need_reload) = yield deferToDatabase(
85- process_dns_update_notify, message
86- )
87-
88- self._dns_requires_full_reload = (
89- self._dns_requires_full_reload or need_reload
90+ self.dbtasks.deferTaskWithCallbacks(
91+ process_dns_update_notify,
92+ [updateCallback],
93+ message,
94 )
95- if self._dns_update_in_progress:
96- self._queued_updates += new_updates
97- else:
98- self._dns_updates += new_updates
99
100 def startProcessing(self):
101 """Start the process looping call."""
102diff --git a/src/maasserver/tests/test_eventloop.py b/src/maasserver/tests/test_eventloop.py
103index 691ff01..02c1ac4 100644
104--- a/src/maasserver/tests/test_eventloop.py
105+++ b/src/maasserver/tests/test_eventloop.py
106@@ -368,9 +368,21 @@ class TestFactories(MAASServerTestCase):
107 eventloop.loop.factories["database-tasks"]["only_on_master"]
108 )
109
110+ def test_make_MasterDatabaseTaskService(self):
111+ service = eventloop.make_DatabaseTaskService()
112+ self.assertIsInstance(service, dbtasks.DatabaseTasksService)
113+ # It is registered as a factory in RegionEventLoop.
114+ self.assertIs(
115+ eventloop.make_DatabaseTaskService,
116+ eventloop.loop.factories["database-tasks-master"]["factory"],
117+ )
118+ self.assertTrue(
119+ eventloop.loop.factories["database-tasks-master"]["only_on_master"]
120+ )
121+
122 def test_make_RegionControllerService(self):
123 service = eventloop.make_RegionControllerService(
124- sentinel.postgresListener
125+ sentinel.postgresListener, sentinel.dbtasks
126 )
127 self.assertIsInstance(
128 service, region_controller.RegionControllerService
129diff --git a/src/maasserver/tests/test_plugin.py b/src/maasserver/tests/test_plugin.py
130index 5b2d8ac..6dfe177 100644
131--- a/src/maasserver/tests/test_plugin.py
132+++ b/src/maasserver/tests/test_plugin.py
133@@ -237,6 +237,7 @@ class TestRegionMasterServiceMaker(TestServiceMaker):
134 service = service_maker.makeService(options)
135 self.assertIsInstance(service, MultiService)
136 expected_services = {
137+ "database-tasks-master",
138 "region-controller",
139 "nonce-cleanup",
140 "dns-publication-cleanup",
141@@ -367,6 +368,7 @@ class TestRegionAllInOneServiceMaker(TestServiceMaker):
142 "web",
143 "ipc-worker",
144 # Master services.
145+ "database-tasks-master",
146 "region-controller",
147 "nonce-cleanup",
148 "dns-publication-cleanup",
149diff --git a/src/maasserver/tests/test_region_controller.py b/src/maasserver/tests/test_region_controller.py
150index fd974e8..b688e58 100644
151--- a/src/maasserver/tests/test_region_controller.py
152+++ b/src/maasserver/tests/test_region_controller.py
153@@ -29,6 +29,7 @@ from maasserver.testing.testcase import (
154 MAASServerTestCase,
155 MAASTransactionServerTestCase,
156 )
157+from maasserver.utils.dbtasks import DatabaseTasksService
158 from maasserver.utils.threads import deferToDatabase
159 from maastesting.crochet import wait_for
160 from maastesting.matchers import (
161@@ -43,12 +44,12 @@ wait_for_reactor = wait_for()
162
163
164 class TestRegionControllerService(MAASServerTestCase):
165- def make_service(self, listener):
166+ def make_service(self, listener=MagicMock(), dbtasks=MagicMock()):
167 # Don't retry on failure or the tests will loop forever.
168- return RegionControllerService(listener, retryOnFailure=False)
169+ return RegionControllerService(listener, dbtasks, retryOnFailure=False)
170
171 def test_init_sets_properties(self):
172- service = self.make_service(sentinel.listener)
173+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
174 self.assertThat(
175 service,
176 MatchesStructure.byEquality(
177@@ -56,6 +57,7 @@ class TestRegionControllerService(MAASServerTestCase):
178 processingDefer=None,
179 needsDNSUpdate=True,
180 postgresListener=sentinel.listener,
181+ dbtasks=sentinel.dbtasks,
182 ),
183 )
184
185@@ -107,31 +109,27 @@ class TestRegionControllerService(MAASServerTestCase):
186 @wait_for_reactor
187 @inlineCallbacks
188 def test_stopService_handles_canceling_processing(self):
189- listener = MagicMock()
190- service = self.make_service(listener)
191+ service = self.make_service()
192 service.startProcessing()
193 yield service.stopService()
194 self.assertIsNone(service.processingDefer)
195
196 def test_markDNSForUpdate_sets_needsDNSUpdate_and_starts_process(self):
197- listener = MagicMock()
198- service = self.make_service(listener)
199+ service = self.make_service()
200 mock_startProcessing = self.patch(service, "startProcessing")
201 service.markDNSForUpdate(None, None)
202 self.assertTrue(service.needsDNSUpdate)
203 self.assertThat(mock_startProcessing, MockCalledOnceWith())
204
205 def test_markProxyForUpdate_sets_needsProxyUpdate_and_starts_process(self):
206- listener = MagicMock()
207- service = self.make_service(listener)
208+ service = self.make_service()
209 mock_startProcessing = self.patch(service, "startProcessing")
210 service.markProxyForUpdate(None, None)
211 self.assertTrue(service.needsProxyUpdate)
212 self.assertThat(mock_startProcessing, MockCalledOnceWith())
213
214 def test_markRBACForUpdate_sets_needsRBACUpdate_and_starts_process(self):
215- listener = MagicMock()
216- service = self.make_service(listener)
217+ service = self.make_service()
218 mock_startProcessing = self.patch(service, "startProcessing")
219 service.markRBACForUpdate(None, None)
220 self.assertTrue(service.needsRBACUpdate)
221@@ -139,19 +137,19 @@ class TestRegionControllerService(MAASServerTestCase):
222
223 def test_restart_region_restarts_eventloop(self):
224 restart_mock = self.patch(eventloop, "restart")
225- service = self.make_service(MagicMock())
226+ service = self.make_service()
227 service.restartRegion("sys_vault_migration", "")
228 restart_mock.assert_called_once()
229
230 def test_startProcessing_doesnt_call_start_when_looping_call_running(self):
231- service = self.make_service(sentinel.listener)
232+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
233 mock_start = self.patch(service.processing, "start")
234 service.processing.running = True
235 service.startProcessing()
236 self.assertThat(mock_start, MockNotCalled())
237
238 def test_startProcessing_calls_start_when_looping_call_not_running(self):
239- service = self.make_service(sentinel.listener)
240+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
241 mock_start = self.patch(service.processing, "start")
242 service.startProcessing()
243 self.assertThat(mock_start, MockCalledOnceWith(0.1, now=False))
244@@ -159,7 +157,7 @@ class TestRegionControllerService(MAASServerTestCase):
245 @wait_for_reactor
246 @inlineCallbacks
247 def test_reload_dns_on_start(self):
248- service = self.make_service(sentinel.listener)
249+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
250 mock_dns_update_all_zones = self.patch(
251 region_controller, "dns_update_all_zones"
252 )
253@@ -177,7 +175,7 @@ class TestRegionControllerService(MAASServerTestCase):
254 @wait_for_reactor
255 @inlineCallbacks
256 def test_process_doesnt_update_zones_when_nothing_to_process(self):
257- service = self.make_service(sentinel.listener)
258+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
259 service.needsDNSUpdate = False
260 mock_dns_update_all_zones = self.patch(
261 region_controller, "dns_update_all_zones"
262@@ -189,7 +187,7 @@ class TestRegionControllerService(MAASServerTestCase):
263 @wait_for_reactor
264 @inlineCallbacks
265 def test_process_doesnt_proxy_update_config_when_nothing_to_process(self):
266- service = self.make_service(sentinel.listener)
267+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
268 service.needsProxyUpdate = False
269 mock_proxy_update_config = self.patch(
270 region_controller, "proxy_update_config"
271@@ -201,7 +199,7 @@ class TestRegionControllerService(MAASServerTestCase):
272 @wait_for_reactor
273 @inlineCallbacks
274 def test_process_doesnt_call_rbacSync_when_nothing_to_process(self):
275- service = self.make_service(sentinel.listener)
276+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
277 service.needsRBACUpdate = False
278 mock_rbacSync = self.patch(service, "_rbacSync")
279 service.startProcessing()
280@@ -211,7 +209,7 @@ class TestRegionControllerService(MAASServerTestCase):
281 @wait_for_reactor
282 @inlineCallbacks
283 def test_process_stops_processing(self):
284- service = self.make_service(sentinel.listener)
285+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
286 service.needsDNSUpdate = False
287 service.startProcessing()
288 yield service.processingDefer
289@@ -220,7 +218,7 @@ class TestRegionControllerService(MAASServerTestCase):
290 @wait_for_reactor
291 @inlineCallbacks
292 def test_process_updates_zones(self):
293- service = self.make_service(sentinel.listener)
294+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
295 service.needsDNSUpdate = True
296 dns_result = (
297 random.randint(1, 1000),
298@@ -248,7 +246,7 @@ class TestRegionControllerService(MAASServerTestCase):
299 @wait_for_reactor
300 @inlineCallbacks
301 def test_process_zones_kills_bind_on_failed_reload(self):
302- service = self.make_service(sentinel.listener)
303+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
304 service.needsDNSUpdate = True
305 service.retryOnFailure = True
306 dns_result_0 = (
307@@ -295,7 +293,7 @@ class TestRegionControllerService(MAASServerTestCase):
308 @wait_for_reactor
309 @inlineCallbacks
310 def test_process_updates_proxy(self):
311- service = self.make_service(sentinel.listener)
312+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
313 service.needsProxyUpdate = True
314 mock_proxy_update_config = self.patch(
315 region_controller, "proxy_update_config"
316@@ -314,7 +312,7 @@ class TestRegionControllerService(MAASServerTestCase):
317 @wait_for_reactor
318 @inlineCallbacks
319 def test_process_updates_rbac(self):
320- service = self.make_service(sentinel.listener)
321+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
322 service.needsRBACUpdate = True
323 mock_rbacSync = self.patch(service, "_rbacSync")
324 mock_rbacSync.return_value = []
325@@ -330,7 +328,7 @@ class TestRegionControllerService(MAASServerTestCase):
326 @wait_for_reactor
327 @inlineCallbacks
328 def test_process_updates_zones_logs_failure(self):
329- service = self.make_service(sentinel.listener)
330+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
331 service.needsDNSUpdate = True
332 mock_dns_update_all_zones = self.patch(
333 region_controller, "dns_update_all_zones"
334@@ -349,7 +347,7 @@ class TestRegionControllerService(MAASServerTestCase):
335 @wait_for_reactor
336 @inlineCallbacks
337 def test_process_updates_proxy_logs_failure(self):
338- service = self.make_service(sentinel.listener)
339+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
340 service.needsProxyUpdate = True
341 mock_proxy_update_config = self.patch(
342 region_controller, "proxy_update_config"
343@@ -368,7 +366,7 @@ class TestRegionControllerService(MAASServerTestCase):
344 @wait_for_reactor
345 @inlineCallbacks
346 def test_process_updates_rbac_logs_failure(self):
347- service = self.make_service(sentinel.listener)
348+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
349 service.needsRBACUpdate = True
350 mock_rbacSync = self.patch(service, "_rbacSync")
351 mock_rbacSync.side_effect = factory.make_exception()
352@@ -383,7 +381,7 @@ class TestRegionControllerService(MAASServerTestCase):
353 @wait_for_reactor
354 @inlineCallbacks
355 def test_process_updates_rbac_retries_with_delay(self):
356- service = self.make_service(sentinel.listener)
357+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
358 service.needsRBACUpdate = True
359 service.retryOnFailure = True
360 service.rbacRetryOnFailureDelay = random.randint(1, 10)
361@@ -405,7 +403,7 @@ class TestRegionControllerService(MAASServerTestCase):
362 @wait_for_reactor
363 @inlineCallbacks
364 def test_process_updates_bind_proxy_and_rbac(self):
365- service = self.make_service(sentinel.listener)
366+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
367 service.needsDNSUpdate = True
368 service.needsProxyUpdate = True
369 service.needsRBACUpdate = True
370@@ -441,7 +439,7 @@ class TestRegionControllerService(MAASServerTestCase):
371
372 @wait_for_reactor
373 def test_check_serial_doesnt_raise_error_on_successful_serial_match(self):
374- service = self.make_service(sentinel.listener)
375+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
376 result_serial = random.randint(1, 1000)
377 formatted_serial = f"{result_serial:10d}"
378 dns_names = [factory.make_name("domain") for _ in range(3)]
379@@ -472,7 +470,7 @@ class TestRegionControllerService(MAASServerTestCase):
380 @wait_for_reactor
381 @inlineCallbacks
382 def test_check_serial_raise_error_after_30_tries(self):
383- service = self.make_service(sentinel.listener)
384+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
385 result_serial = random.randint(1, 1000)
386 formatted_serial = f"{result_serial:10d}"
387 dns_names = [factory.make_name("domain") for _ in range(3)]
388@@ -487,7 +485,7 @@ class TestRegionControllerService(MAASServerTestCase):
389 @wait_for_reactor
390 @inlineCallbacks
391 def test_check_serial_handles_ValueError(self):
392- service = self.make_service(sentinel.listener)
393+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
394 result_serial = random.randint(1, 1000)
395 formatted_serial = f"{result_serial:10d}"
396 dns_names = [factory.make_name("domain") for _ in range(3)]
397@@ -502,7 +500,7 @@ class TestRegionControllerService(MAASServerTestCase):
398 @wait_for_reactor
399 @inlineCallbacks
400 def test_check_serial_handles_TimeoutError(self):
401- service = self.make_service(sentinel.listener)
402+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
403 result_serial = random.randint(1, 1000)
404 formatted_serial = f"{result_serial:10d}"
405 dns_names = [factory.make_name("domain") for _ in range(3)]
406@@ -515,7 +513,7 @@ class TestRegionControllerService(MAASServerTestCase):
407 yield service._checkSerial((formatted_serial, True, dns_names))
408
409 def test_getRBACClient_returns_None_when_no_url(self):
410- service = self.make_service(sentinel.listener)
411+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
412 service.rbacClient = sentinel.client
413 SecretManager().delete_secret("external-auth")
414 self.assertIsNone(service._getRBACClient())
415@@ -526,7 +524,7 @@ class TestRegionControllerService(MAASServerTestCase):
416 SecretManager().set_composite_secret(
417 "external-auth", {"rbac-url": "http://rbac.example.com"}
418 )
419- service = self.make_service(sentinel.listener)
420+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
421 client = service._getRBACClient()
422 self.assertIsNotNone(client)
423 self.assertIs(client, service.rbacClient)
424@@ -537,7 +535,7 @@ class TestRegionControllerService(MAASServerTestCase):
425 SecretManager().set_composite_secret(
426 "external-auth", {"rbac-url": "http://rbac.example.com"}
427 )
428- service = self.make_service(sentinel.listener)
429+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
430 client = service._getRBACClient()
431 SecretManager().set_composite_secret(
432 "external-auth", {"rbac-url": "http://other.example.com"}
433@@ -552,7 +550,7 @@ class TestRegionControllerService(MAASServerTestCase):
434 SecretManager().set_composite_secret(
435 "external-auth", {"rbac-url": "http://rbac.example.com"}
436 )
437- service = self.make_service(sentinel.listener)
438+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
439 client = service._getRBACClient()
440 mock_get_auth_info.return_value = MagicMock()
441 new_client = service._getRBACClient()
442@@ -561,7 +559,7 @@ class TestRegionControllerService(MAASServerTestCase):
443 self.assertIs(new_client, service._getRBACClient())
444
445 def test_rbacNeedsFull(self):
446- service = self.make_service(sentinel.listener)
447+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
448 changes = [
449 RBACSync(action=RBAC_ACTION.ADD),
450 RBACSync(action=RBAC_ACTION.UPDATE),
451@@ -571,7 +569,7 @@ class TestRegionControllerService(MAASServerTestCase):
452 self.assertTrue(service._rbacNeedsFull(changes))
453
454 def test_rbacDifference(self):
455- service = self.make_service(sentinel.listener)
456+ service = self.make_service(sentinel.listener, sentinel.dbtasks)
457 changes = [
458 RBACSync(
459 action=RBAC_ACTION.UPDATE, resource_id=1, resource_name="r-1"
460@@ -636,7 +634,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
461 ]
462
463 publications = yield deferToDatabase(_create_publications)
464- service = RegionControllerService(sentinel.listener)
465+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
466 service.needsDNSUpdate = True
467 service.previousSerial = publications[0].serial
468 dns_result = (
469@@ -677,7 +675,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
470 ]
471
472 publications = yield deferToDatabase(_create_publications)
473- service = RegionControllerService(sentinel.listener)
474+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
475 service.needsDNSUpdate = True
476 service.previousSerial = publications[0].serial
477 dns_result = (
478@@ -708,14 +706,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
479 def test_rbacSync_returns_None_when_nothing_to_do(self):
480 RBACSync.objects.clear("resource-pool")
481
482- service = RegionControllerService(sentinel.listener)
483+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
484 service.rbacInit = True
485 self.assertIsNone(service._rbacSync())
486
487 def test_rbacSync_returns_None_and_clears_sync_when_no_client(self):
488 RBACSync.objects.create(resource_type="resource-pool")
489
490- service = RegionControllerService(sentinel.listener)
491+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
492 self.assertIsNone(service._rbacSync())
493 self.assertFalse(RBACSync.objects.exists())
494
495@@ -729,7 +727,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
496
497 rbac_client = MagicMock()
498 rbac_client.update_resources.return_value = "x-y-z"
499- service = RegionControllerService(sentinel.listener)
500+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
501 self.patch(service, "_getRBACClient").return_value = rbac_client
502
503 self.assertEqual([], service._rbacSync())
504@@ -748,7 +746,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
505
506 rbac_client = MagicMock()
507 rbac_client.update_resources.return_value = "x-y-z"
508- service = RegionControllerService(sentinel.listener)
509+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
510 self.patch(service, "_getRBACClient").return_value = rbac_client
511
512 self.assertEqual([], service._rbacSync())
513@@ -773,7 +771,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
514
515 rbac_client = MagicMock()
516 rbac_client.update_resources.return_value = "x-y-z"
517- service = RegionControllerService(sentinel.listener)
518+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
519 self.patch(service, "_getRBACClient").return_value = rbac_client
520 service.rbacInit = True
521
522@@ -807,7 +805,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
523 SyncConflictError(),
524 "x-y-z",
525 ]
526- service = RegionControllerService(sentinel.listener)
527+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
528 self.patch(service, "_getRBACClient").return_value = rbac_client
529 service.rbacInit = True
530
531@@ -838,7 +836,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
532
533 rbac_client = MagicMock()
534 rbac_client.update_resources.return_value = "x-y-z"
535- service = RegionControllerService(sentinel.listener)
536+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
537 self.patch(service, "_getRBACClient").return_value = rbac_client
538 service.rbacInit = True
539
540@@ -856,7 +854,9 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
541 domain = yield deferToDatabase(factory.make_Domain)
542 update_result = (random.randint(0, 10), True, [domain.name])
543 record = yield deferToDatabase(factory.make_DNSResource, domain=domain)
544- service = RegionControllerService(sentinel.listener)
545+ dbtasks = DatabaseTasksService()
546+ dbtasks.startService()
547+ service = RegionControllerService(sentinel.listener, dbtasks)
548
549 update_zones = self.patch(region_controller, "dns_update_all_zones")
550 update_zones.return_value = update_result
551@@ -864,10 +864,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
552 check_serial.return_value = succeed(update_result)
553
554 service._dns_update_in_progress = True
555- yield service.queueDynamicDNSUpdate(
556+ service.queueDynamicDNSUpdate(
557 factory.make_name(),
558 f"INSERT {domain.name} {record.name} A 30 10.10.10.10",
559 )
560+
561+ # Wait until all the dynamic updates are processed
562+ yield dbtasks.syncTask()
563+
564 self.assertCountEqual(service._dns_updates, [])
565 self.assertCountEqual(
566 service._queued_updates,
567@@ -902,10 +906,12 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
568 @wait_for_reactor
569 @inlineCallbacks
570 def test_dns_is_set_to_update_when_queued_updates_are_present(self):
571+ dbtasks = DatabaseTasksService()
572+ dbtasks.startService()
573 domain = yield deferToDatabase(factory.make_Domain)
574 update_result = (random.randint(0, 10), True, [domain.name])
575 record = yield deferToDatabase(factory.make_DNSResource, domain=domain)
576- service = RegionControllerService(sentinel.listener)
577+ service = RegionControllerService(sentinel.listener, dbtasks)
578
579 update_zones = self.patch(region_controller, "dns_update_all_zones")
580 update_zones.return_value = update_result
581@@ -913,10 +919,14 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
582 check_serial.return_value = succeed(update_result)
583
584 service._dns_update_in_progress = True
585- yield service.queueDynamicDNSUpdate(
586+ service.queueDynamicDNSUpdate(
587 factory.make_name(),
588 f"INSERT {domain.name} {record.name} A 30 10.10.10.10",
589 )
590+
591+ # Wait until all the dynamic updates are processed
592+ yield dbtasks.syncTask()
593+
594 self.assertCountEqual(service._dns_updates, [])
595 expected_updates = [
596 DynamicDNSUpdate(
597@@ -958,7 +968,7 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
598 def test_check_serial_is_skipped_if_a_newer_serial_exists(self):
599 domain = yield deferToDatabase(factory.make_Domain)
600 update_result = (random.randint(0, 10), True, [domain.name])
601- service = RegionControllerService(sentinel.listener)
602+ service = RegionControllerService(sentinel.listener, sentinel.dbtasks)
603
604 query = self.patch(service.dnsResolver, "lookupAuthority")
605
606@@ -968,12 +978,16 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
607
608 query.assert_not_called()
609
610+ @wait_for_reactor
611+ @inlineCallbacks
612 def test_queueDynamicDNSUpdate_can_be_called_synchronously(self):
613- domain = factory.make_Domain()
614+ dbtasks = DatabaseTasksService()
615+ dbtasks.startService()
616+ domain = yield deferToDatabase(factory.make_Domain)
617 update_result = (random.randint(0, 10), True, [domain.name])
618- record1 = factory.make_DNSResource(domain=domain)
619- record2 = factory.make_DNSResource(domain=domain)
620- service = RegionControllerService(sentinel.listener)
621+ record1 = yield deferToDatabase(factory.make_DNSResource, domain)
622+ record2 = yield deferToDatabase(factory.make_DNSResource, domain)
623+ service = RegionControllerService(sentinel.listener, dbtasks)
624
625 update_zones = self.patch(region_controller, "dns_update_all_zones")
626 update_zones.return_value = update_result
627@@ -998,6 +1012,9 @@ class TestRegionControllerServiceTransactional(MAASTransactionServerTestCase):
628 f"DELETE {domain.name} {record2.name} A 30 2.2.2.2",
629 )
630
631+ # Wait until all the dynamic updates are processed
632+ yield dbtasks.syncTask()
633+
634 self.assertCountEqual(
635 service._dns_updates,
636 [
637diff --git a/src/maasserver/utils/dbtasks.py b/src/maasserver/utils/dbtasks.py
638index 5863f02..04cbf7e 100644
639--- a/src/maasserver/utils/dbtasks.py
640+++ b/src/maasserver/utils/dbtasks.py
641@@ -26,14 +26,17 @@ class DatabaseTaskAlreadyRunning(Exception):
642 class DatabaseTasksService(Service):
643 """Run deferred database operations one at a time.
644
645- Once the service is started, `deferTask` and `addTask` can be used to
646- queue up execution of a database task.
647+ Once the service is started, `deferTask`, `addTask` and
648+ `deferTaskWithCallbacks` can be used to queue up execution of a database task.
649
650- The former — `deferTask` — will return a `Deferred` that fires with the
651+ The former —`deferTaskWithCallbacks` — returns nothing, and can be used to add tasks with callbacks to the queue.
652+ Errors arising from this task are simply logged.
653+
654+ `deferTask` — will return a `Deferred` that fires with the
655 result of the database task. Errors arising from this task become the
656 responsibility of the caller.
657
658- The latter — `addTask` — returns nothing, and will log errors arising from
659+ `addTask` — returns nothing, and will log errors arising from
660 the database task.
661
662 Before this service has been started, and as soon as shutdown has
663@@ -50,6 +53,24 @@ class DatabaseTasksService(Service):
664 self.queue = DeferredQueue(size=0, backlog=1)
665
666 @asynchronous
667+ def deferTaskWithCallbacks(self, func, callbacks, *args, **kwargs):
668+ """Schedules `func` with callbacks to run later.
669+
670+ :raise QueueOverflow: If the queue of tasks is full.
671+ :return: `None`
672+ """
673+
674+ def task():
675+ d = deferToDatabase(func, *args, **kwargs)
676+ for callback in callbacks:
677+ d.addCallback(callback)
678+ d.addErrback(log.err, "Unhandled failure in database task.")
679+ return d
680+
681+ self.queue.put(task)
682+ return None
683+
684+ @asynchronous
685 def deferTask(self, func, *args, **kwargs):
686 """Schedules `func` to run later.
687
688diff --git a/src/maasserver/utils/tests/test_dbtasks.py b/src/maasserver/utils/tests/test_dbtasks.py
689index 0f54e05..06335ef 100644
690--- a/src/maasserver/utils/tests/test_dbtasks.py
691+++ b/src/maasserver/utils/tests/test_dbtasks.py
692@@ -120,6 +120,27 @@ class TestDatabaseTaskService(MAASTestCase):
693 finally:
694 service.stopService()
695
696+ def test_callbacks_are_called_from_deferredTask(self):
697+ def simple_function(input):
698+ return input + " world"
699+
700+ def callback(data):
701+ sentinel.callback = data
702+
703+ service = DatabaseTasksService()
704+ service.startService()
705+ try:
706+ service.deferTaskWithCallbacks(
707+ simple_function, [callback], "Hello"
708+ )
709+ service.syncTask().wait(TIMEOUT)
710+ self.assertEqual(
711+ "Hello world",
712+ sentinel.callback,
713+ )
714+ finally:
715+ service.stopService()
716+
717 def test_tasks_are_all_run_before_shutdown_completes(self):
718 service = DatabaseTasksService()
719 service.startService()

Subscribers

People subscribed via source and target branches