Merge lp:~abentley/launchpad/restore-queue-test into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 15684
Proposed branch: lp:~abentley/launchpad/restore-queue-test
Merge into: lp:launchpad
Diff against target: 289 lines (+125/-24)
8 files modified
lib/lp/services/config/schema-lazr.conf (+7/-1)
lib/lp/services/job/celeryconfig.py (+5/-1)
lib/lp/services/job/celeryjob.py (+32/-14)
lib/lp/services/job/runner.py (+1/-1)
lib/lp/services/job/tests/celery_helpers.py (+12/-1)
lib/lp/services/job/tests/test_celery_configuration.py (+2/-1)
lib/lp/services/job/tests/test_celeryjob.py (+65/-4)
lib/lp/services/job/tests/test_runner.py (+1/-1)
To merge this branch: bzr merge lp:~abentley/launchpad/restore-queue-test
Reviewer Review Type Date Requested Status
Richard Harding (community) Approve
Review via email: mp+116534@code.launchpad.net

Commit message

Fix bug #1015667: Celery is leaving an ever increasing number of queues behind

Description of the change

= Summary =
Fix bug #1015667: Celery is leaving an ever increasing number of queues behind

== Proposed fix ==
Re-apply the changes from r15656, and fix the test.

== Pre-implementation notes ==
None

== LOC Rationale ==
Reverting a rollback

== Implementation details ==
Add polling to ensure that list_queued does not race with RabbitMQ message delivery.

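Replace the existing polling with a wait on a no-op task queued behind RunMissingReady. Because the queue is consumed in FIFO order, the no-op completes only after RunMissingReady has finished.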
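The no-op works as a barrier because a task cannot finish before the tasks queued ahead of it have run. The following is a Python 3 sketch in which a single in-order consumer thread stands in for celeryd. `run_missing_ready`, `noop` and `consume` here are local stand-ins, not the Launchpad tasks. The guarantee assumes one consumer processing the queue in order. With several concurrent workers, the no-op could finish before an earlier, slower task.

```python
import queue
import threading
import time

log = []


def run_missing_ready():
    # Stand-in for a slow RunMissingReady task.
    time.sleep(0.05)
    log.append('RunMissingReady')


def noop():
    """Does nothing except mark its place in the queue."""
    log.append('noop')


def consume(tasks):
    """A single consumer that runs tasks strictly in FIFO order."""
    while True:
        func, done = tasks.get()
        if func is None:  # Sentinel: stop consuming.
            return
        func()
        done.set()


tasks = queue.Queue()
worker = threading.Thread(target=consume, args=(tasks,))
worker.start()

tasks.put((run_missing_ready, threading.Event()))
noop_done = threading.Event()
tasks.put((noop, noop_done))

# Waiting on the no-op (with a timeout, like .wait(60)) is enough: with one
# in-order consumer it cannot complete before the task queued ahead of it.
finished = noop_done.wait(timeout=60)

tasks.put((None, None))
worker.join()
```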

== Tests ==
bin/test -t test_run_missing_ready_does_not_return_results

== Demo and Q/A ==
Enable Celery and celery-based branch scanning on qastaging. Run for at least an hour. No mysterious queues should be left behind.
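For the Q/A step, one way to spot leftovers is to compare the broker's queue names against the configured ones. This sketch assumes the names have already been fetched from RabbitMQ, for instance with `rabbitmqctl list_queues name`. `find_stray_queues` is a hypothetical helper. The configured names are taken from `[job_runner_queues]` in this branch. Celery may also create internal queues of its own (for events or remote control), and in practice those would need to be added to the allowed set.

```python
import re

# Queue names from [job_runner_queues] in schema-lazr.conf after this branch.
CONFIGURED_QUEUES = frozenset(
    'job job_slow branch_write_job branch_write_job_slow celerybeat'.split())

# A result queue is named after its task ID with the hyphens removed. A bare
# 32-hex-digit name (a plain uuid4 with hyphens stripped) gives no clue about
# which task created it.
_ANONYMOUS = re.compile(r'^[0-9a-f]{32}$')


def find_stray_queues(queue_names, configured=CONFIGURED_QUEUES):
    """Split unexpected queue names into (identifiable, anonymous) lists."""
    stray = sorted(set(queue_names) - configured)
    anonymous = [name for name in stray if _ANONYMOUS.match(name)]
    identifiable = [name for name in stray if not _ANONYMOUS.match(name)]
    return identifiable, anonymous


example = ['job', 'celerybeat', 'RunMissingReady_' + '0' * 32, 'f' * 32]
identifiable, anonymous = find_stray_queues(example)
```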

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/config/schema-lazr.conf
  lib/lp/services/job/runner.py
  lib/lp/services/job/tests/celery_helpers.py
  lib/lp/services/job/celeryjob.py
  lib/lp/services/job/tests/test_celeryjob.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/tests/test_celery_configuration.py

./lib/lp/services/config/schema-lazr.conf
     445: Line exceeds 80 characters.
    1038: Line exceeds 80 characters.
    1045: Line exceeds 80 characters.
    1583: Line exceeds 80 characters.

Richard Harding (rharding) wrote :

Thanks for the update Aaron.

My only comments: would it make sense for the db user to be an attribute of the job class for the _init_task() call, so that there's some context on what that string is?

Would the noop task be better as a test helper, specific to that, rather than in the list of normal jobs? Or is it likely that someone might want to use it for actual regular processing at some point?

Finally, the TransactionFreeOperation is a bit confusing, as I'd not expect the transaction.commit at the end of something that's not a transaction. Looking at the object, it appears to just be something that says there aren't any current active transactions, so is this just guarding against multiple transactions stomping on each other? Since this is copied from the old code and doesn't pertain to the queues, I'm marking this approved, but I'd appreciate any explanation on this front.

review: Approve
Aaron Bentley (abentley) wrote :

> Thanks for the update Aaron.
>
> My only comments: would it make sense for the db user to be an attribute
> of the job class for the _init_task() call, so that there's some context
> on what that string is?

That's not a Job class; it's a native Celery Task class. Job classes do indeed have a dbuser and use that for init_task.

> Would the noop task be better as a test helper, specific to that, rather
> than in the list of normal jobs? Or is it likely that someone might want
> to use it for actual regular processing at some point?

Yes, it's a test helper. All of the tasks in celery_helpers are test helpers, and are only available during test runs.

> Finally, the TransactionFreeOperation is a bit confusing, as I'd not expect
> the transaction.commit at the end of something that's not a transaction.
> Looking at the object, it appears to just be something that says there
> aren't any current active transactions, so is this just guarding against
> multiple transactions stomping on each other? Since this is copied from
> the old code and doesn't pertain to the queues, I'm marking this approved,
> but I'd appreciate any explanation on this front.

It's ensuring that the operation does not leave any open transactions, because that would prevent other operations from acquiring locks and eventually force them to time out. You're right that the name is a bit confusing.
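The guard described here can be modelled in a few lines. It is not a transaction itself. It checks that no transaction is open when the block starts and that none is left open when it ends, which is why the body finishes with `transaction.commit()`. This is a minimal sketch using a hypothetical `FakeStore` in place of a real database connection. It is not the code in lp.scripts.helpers.

```python
from contextlib import contextmanager


class FakeStore:
    """Hypothetical stand-in for a database connection."""

    def __init__(self):
        self.in_transaction = False

    def query(self):
        # As with most database drivers, the first statement implicitly
        # opens a transaction.
        self.in_transaction = True

    def commit(self):
        self.in_transaction = False


@contextmanager
def transaction_free(store):
    """Fail if a transaction is open on entry or left open on exit."""
    if store.in_transaction:
        raise AssertionError('Transaction open before operation!')
    yield
    if store.in_transaction:
        raise AssertionError('Operation left a transaction open!')


store = FakeStore()
with transaction_free(store):
    store.query()
    store.commit()  # Without this commit, leaving the block would fail.

leaked = None
try:
    with transaction_free(store):
        store.query()  # No commit: the transaction (and its locks) leak.
except AssertionError as error:
    leaked = str(error)
```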

Preview Diff

=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2012-07-23 13:29:01 +0000
+++ lib/lp/services/config/schema-lazr.conf 2012-07-24 19:53:20 +0000
@@ -1805,7 +1805,7 @@
 
 [job_runner_queues]
 # The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow
+queues: job job_slow branch_write_job branch_write_job_slow celerybeat
 
 # The main job queue.
 [job]
@@ -1837,3 +1837,9 @@
 timeout: 86400
 fallback_queue:
 concurrency: 1
+
+# The queue used for the celerybeat task RunMissingReady
+[celerybeat]
+timeout: 86400
+fallback_queue:
+concurrency: 1
=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-07-24 19:53:20 +0000
@@ -35,6 +35,7 @@
     Doing this in a function is convenient for testing.
     """
     result = {}
+    CELERY_BEAT_QUEUE = 'celerybeat'
     celery_queues = {}
     queue_names = config.job_runner_queues.queues
     queue_names = queue_names.split(' ')
@@ -85,7 +86,10 @@
     result['CELERYBEAT_SCHEDULE'] = {
         'schedule-missing': {
             'task': 'lp.services.job.celeryjob.run_missing_ready',
-            'schedule': timedelta(seconds=600)
+            'schedule': timedelta(seconds=600),
+            'options': {
+                'routing_key': CELERY_BEAT_QUEUE,
+                },
             }
         }
     # See http://ask.github.com/celery/userguide/optimizing.html:
=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/celeryjob.py 2012-07-24 19:53:20 +0000
@@ -16,10 +16,11 @@
 
 from logging import info
 import os
+from uuid import uuid4
 
 
 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import task
+from celery.task import Task
 from lazr.jobrunner.celerytask import RunJob
 from storm.zope.interfaces import IZStorm
 import transaction
@@ -80,24 +81,41 @@
             queued_job_ids]
 
 
-@task
-def run_missing_ready(_no_init=False):
+class RunMissingReady(Task):
     """Task to run any jobs that are ready but not scheduled.
 
     Currently supports only BranchScanJob.
     :param _no_init: For tests. If True, do not perform the initialization.
     """
-    if not _no_init:
-        task_init('run_missing_ready')
-    with TransactionFreeOperation():
-        count = 0
-        for job in find_missing_ready(BranchScanJob):
-            if not celery_enabled(job.__class__.__name__):
-                continue
-            job.celeryCommitHook(True)
-            count += 1
-        info('Scheduled %d missing jobs.', count)
-        transaction.commit()
+    ignore_result = True
+
+    def run(self, _no_init=False):
+        if not _no_init:
+            task_init('run_missing_ready')
+        with TransactionFreeOperation():
+            count = 0
+            for job in find_missing_ready(BranchScanJob):
+                if not celery_enabled(job.__class__.__name__):
+                    continue
+                job.celeryCommitHook(True)
+                count += 1
+            info('Scheduled %d missing jobs.', count)
+            transaction.commit()
+
+    def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
+                    connection=None, router=None, queues=None, **options):
+        """Create a task_id if none is specified.
+
+        Override the quite generic default task_id with one containing
+        the class name.
+
+        See also `celery.task.Task.apply_async()`.
+        """
+        if task_id is None:
+            task_id = '%s_%s' % (self.__class__.__name__, uuid4())
+        return super(RunMissingReady, self).apply_async(
+            args, kwargs, task_id, publisher, connection, router, queues,
+            **options)
 
 
 needs_zcml = True
=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/runner.py 2012-07-24 19:53:20 +0000
@@ -217,7 +217,7 @@
          'result': SoftTimeLimitExceeded(1,),
          'task_id': 'cba7d07b-37fe-4f1d-a5f6-79ad7c30222f'}
         """
-        return '%s-%s-%s' % (
+        return '%s_%s_%s' % (
             self.__class__.__name__, self.job_id, uuid4())
 
     def runViaCelery(self, ignore_result=False):
=== modified file 'lib/lp/services/job/tests/celery_helpers.py'
--- lib/lp/services/job/tests/celery_helpers.py 2012-06-14 05:18:22 +0000
+++ lib/lp/services/job/tests/celery_helpers.py 2012-07-24 19:53:20 +0000
@@ -3,7 +3,10 @@
 
 __metaclass__ = type
 
-__all__ = ['pop_notifications']
+__all__ = [
+    'noop',
+    'pop_notifications'
+    ]
 
 # Force the correct celeryconfig to be used.
 import lp.services.job.celeryjob
@@ -18,3 +21,11 @@
 def pop_notifications():
     from lp.testing.mail_helpers import pop_notifications
     return pop_notifications()
+
+
+@task
+def noop():
+    """Task that does nothing.
+
+    Used to ensure that other tasks have completed.
+    """
=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2012-07-24 19:53:20 +0000
@@ -25,7 +25,8 @@
         # Four queues are defined; the binding key for each queue is
         # just the queue name.
         queue_names = [
-            'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
+            'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
+            'job_slow']
         queues = config['CELERY_QUEUES']
         self.assertEqual(queue_names, sorted(queues))
         for name in queue_names:
=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2012-07-24 19:53:20 +0000
@@ -1,10 +1,15 @@
 # Copyright 2012 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).
 
+from cStringIO import StringIO
+import sys
+from time import sleep
+from lazr.jobrunner.bin.clear_queues import clear_queues
 from lp.code.model.branchjob import BranchScanJob
 from lp.scripts.helpers import TransactionFreeOperation
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.tests import (
+    celeryd,
     drain_celery_queues,
     monitor_celery,
     )
@@ -21,10 +26,10 @@
         super(TestRunMissingJobs, self).setUp()
         from lp.services.job.celeryjob import (
             find_missing_ready,
-            run_missing_ready,
+            RunMissingReady,
             )
         self.find_missing_ready = find_missing_ready
-        self.run_missing_ready = run_missing_ready
+        self.RunMissingReady = RunMissingReady
 
     def createMissingJob(self):
         job = BranchScanJob.create(self.factory.makeBranch())
@@ -48,7 +53,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.run_missing_ready(_no_init=True)
+                    self.RunMissingReady().run(_no_init=True)
         self.assertEqual([], responses)
 
     def test_run_missing_ready(self):
@@ -59,5 +64,61 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.run_missing_ready(_no_init=True)
+                    self.RunMissingReady().run(_no_init=True)
         self.assertEqual(1, len(responses))
+
+    def test_run_missing_ready_does_not_return_results(self):
+        """The celerybeat task run_missing_ready does not create a
+        result queue."""
+        from lazr.jobrunner.celerytask import list_queued
+        from lp.services.job.tests.celery_helpers import noop
+        job_queue_name = 'celerybeat'
+        request = self.RunMissingReady().apply_async(
+            kwargs={'_no_init': True}, queue=job_queue_name)
+        self.assertTrue(request.task_id.startswith('RunMissingReady_'))
+        result_queue_name = request.task_id.replace('-', '')
+        # Paranoia check: This test intends to prove that a Celery
+        # result queue for the task created above will _not_ be created.
+        # This would also happen when "with celeryd()" would do nothing.
+        # So let's be sure that a task is queued...
+        # Give the system some time to deliver the message
+        for x in range(10):
+            if list_queued(self.RunMissingReady.app, [job_queue_name]) > 0:
+                break
+            sleep(1)
+        self.assertEqual(
+            1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        # Wait at most 60 seconds for celeryd to start and process
+        # the task.
+        with celeryd(job_queue_name):
+            # Due to FIFO ordering, this will only return after
+            # RunMissingReady has finished.
+            noop.apply_async(queue=job_queue_name).wait(60)
+        # But now the message has been consumed by celeryd.
+        self.assertEqual(
+            0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        # No result queue was created for the task.
+        try:
+            real_stdout = sys.stdout
+            real_stderr = sys.stderr
+            sys.stdout = fake_stdout = StringIO()
+            sys.stderr = fake_stderr = StringIO()
+            clear_queues(
+                ['script_name', '-c', 'lp.services.job.celeryconfig',
+                 result_queue_name])
+        finally:
+            sys.stdout = real_stdout
+            sys.stderr = real_stderr
+        fake_stdout = fake_stdout.getvalue()
+        fake_stderr = fake_stderr.getvalue()
+        self.assertEqual(
+            '', fake_stdout,
+            "Unexpected output from clear_queues:\n"
+            "stdout: %r\n"
+            "stderr: %r" % (fake_stdout, fake_stderr))
+        self.assertEqual(
+            "NOT_FOUND - no queue '%s' in vhost '/'\n" % result_queue_name,
+            fake_stderr,
+            "Unexpected output from clear_queues:\n"
+            "stdout: %r\n"
+            "stderr: %r" % (fake_stdout, fake_stderr))
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_runner.py 2012-07-24 19:53:20 +0000
@@ -384,7 +384,7 @@
         task_id = job.taskId()
         uuid_expr = (
             '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
-        mo = re.search('^NullJob-%s-%s$' % (job.job_id, uuid_expr), task_id)
+        mo = re.search('^NullJob_%s_%s$' % (job.job_id, uuid_expr), task_id)
         self.assertIsNot(None, mo)
 
 
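The diff names task IDs ClassName_jobid_uuid4 (ClassName_uuid4 for RunMissingReady). The new test derives the result queue name by stripping the hyphens from the task ID. The following is a small Python 3 sketch of both conventions. `task_id_for`, `result_queue_name` and `describes_job` are hypothetical helpers, not Launchpad functions. One side effect of using underscores as separators is that they survive the hyphen stripping, so the class name and job ID stay distinguishable in the queue name.

```python
import re
import uuid


def task_id_for(class_name, job_id=None):
    """Build a task ID as ClassName[_jobid]_uuid4, following the diff's format."""
    parts = [class_name]
    if job_id is not None:
        parts.append(str(job_id))
    parts.append(str(uuid.uuid4()))
    return '_'.join(parts)


def result_queue_name(task_id):
    """Result queue name for `task_id`, derived as the test does."""
    return task_id.replace('-', '')


def describes_job(queue_name, class_name, job_id):
    """True if `queue_name` clearly belongs to that class and job."""
    pattern = '^%s_%s_[0-9a-f]{32}$' % (re.escape(class_name), job_id)
    return re.match(pattern, queue_name) is not None


new_style = result_queue_name(task_id_for('NullJob', 42))
# With the old hyphen separators, stripping hyphens fuses the parts together.
old_style = result_queue_name('NullJob-42-' + str(uuid.uuid4()))
```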