Merge lp:~abentley/launchpad/restore-queue-test into lp:launchpad

Proposed by Aaron Bentley
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 15684
Proposed branch: lp:~abentley/launchpad/restore-queue-test
Merge into: lp:launchpad
Diff against target: 289 lines (+125/-24)
8 files modified
lib/lp/services/config/schema-lazr.conf (+7/-1)
lib/lp/services/job/celeryconfig.py (+5/-1)
lib/lp/services/job/celeryjob.py (+32/-14)
lib/lp/services/job/runner.py (+1/-1)
lib/lp/services/job/tests/celery_helpers.py (+12/-1)
lib/lp/services/job/tests/test_celery_configuration.py (+2/-1)
lib/lp/services/job/tests/test_celeryjob.py (+65/-4)
lib/lp/services/job/tests/test_runner.py (+1/-1)
To merge this branch: bzr merge lp:~abentley/launchpad/restore-queue-test
Reviewer: Richard Harding (community), Status: Approve
Review via email: mp+116534@code.launchpad.net

Commit message

Fix bug #1015667: Celery is leaving an ever increasing number of queues behind

Description of the change

= Summary =
Fix bug #1015667: Celery is leaving an ever increasing number of queues behind

== Proposed fix ==
Re-apply the changes from r15656, and fix the test.

== Pre-implementation notes ==
None

== LOC Rationale ==
Reverting a rollback

== Implementation details ==
Add polling to ensure that list_queued does not race with RabbitMQ message delivery.
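A minimal sketch of the bounded polling described above, in plain Python so that it runs without a broker. `wait_for` and `count_queued` are hypothetical names. In the test, the predicate would be something like `len(list_queued(app, [queue_name])) > 0`.

```python
import time


def wait_for(predicate, attempts=10, interval=1.0, sleep=time.sleep):
    """Call predicate() until it returns True or the attempts run out.

    Returns True if the predicate succeeded and False on timeout. The
    sleep function is injectable so the helper can be tested quickly.
    """
    for _ in range(attempts):
        if predicate():
            return True
        sleep(interval)
    return False


# Usage sketch: count_queued() is a hypothetical stand-in for
# len(list_queued(app, [queue_name])). Here it reports the message as
# delivered on the third poll.
polls = []


def count_queued():
    polls.append(1)
    return 1 if len(polls) >= 3 else 0


delivered = wait_for(lambda: count_queued() > 0, sleep=lambda s: None)
print(delivered, len(polls))  # True 3
```

The predicate compares a length on purpose. Under Python 2, a list compared with an integer using `>` is always true, so a check written as `list_queued(...) > 0` would end the wait on the first iteration.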

Replace the existing polling with a wait on a no-op task.
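Waiting for a no-op task works as a barrier because a single consumer processes a queue in FIFO order. Once the no-op has run, everything queued before it has already been processed. The following is a minimal sketch of that idea. It uses the standard library's `queue.Queue` and one worker thread in place of RabbitMQ and celeryd. `worker` and `run_barrier` are illustrative names, not Launchpad or Celery APIs. The no-op's only effect is to set an event, standing in for the task result that `noop.apply_async(queue=job_queue_name).wait(60)` blocks on in the new test.

```python
import queue
import threading

tasks = queue.Queue()
processed = []


def worker():
    """Single consumer: runs callables from the queue in FIFO order."""
    while True:
        func = tasks.get()
        if func is None:  # shutdown sentinel
            break
        func()


def run_barrier(timeout=5.0):
    """Enqueue a no-op that sets an event, then wait for it to run.

    With one consumer, the event being set means every task queued
    before the no-op has already finished.
    """
    done = threading.Event()
    tasks.put(done.set)
    return done.wait(timeout)


t = threading.Thread(target=worker)
t.start()
for i in range(5):
    tasks.put(lambda i=i: processed.append(i))
finished = run_barrier()
print(finished, processed)  # True [0, 1, 2, 3, 4]
tasks.put(None)
t.join()
```

With several consumers the guarantee no longer holds, because the no-op could run while an earlier task is still in progress on another worker.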

== Tests ==
bin/test -t test_run_missing_ready_does_not_return_results

== Demo and Q/A ==
Enable Celery and Celery-based branch scanning on qastaging, and let it run for at least an hour. No stray queues, such as unconsumed task result queues, should be left behind.
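For the Q/A step, the following sketch shows one way to spot leftover result queues in a list of queue names. It assumes that a result queue is named after its task_id with the dashes removed, which is what the new test assumes. It also assumes task_ids of the form `ClassName_<uuid>` or `ClassName_<job_id>_<uuid>`, as this branch produces them. Presumably this is part of why the task_ids now embed the class name: a stray queue then identifies the task that created it. `leftover_result_queues` is a hypothetical helper. The output format of `rabbitmqctl list_queues name` varies between RabbitMQ versions, so parsing that output is left to the caller.

```python
import re
from collections import Counter

# Assumed result-queue naming: task_id with dashes removed, where the
# task_id is "ClassName_<uuid>" or "ClassName_<job_id>_<uuid>". Names
# made of 32 bare hex digits (generic task_ids) are also accepted.
RESULT_QUEUE = re.compile(r'^(?:([A-Za-z]\w*?)_(?:\d+_)?)?[0-9a-f]{32}$')


def leftover_result_queues(queue_names):
    """Count queues that look like per-task result queues, by class name.

    queue_names: iterable of queue names, one per entry. Configured job
    queues such as 'job' or 'celerybeat' do not match and are ignored.
    Result queues without a class-name prefix are counted under '?'.
    """
    counts = Counter()
    for name in queue_names:
        match = RESULT_QUEUE.match(name.strip())
        if match:
            counts[match.group(1) or '?'] += 1
    return counts


names = [
    'job', 'celerybeat',
    'RunMissingReady_' + '0' * 32,
    'BranchScanJob_42_' + 'a' * 32,
    'BranchScanJob_43_' + 'b' * 32,
    'f' * 32,
]
print(sorted(leftover_result_queues(names).items()))
# [('?', 1), ('BranchScanJob', 2), ('RunMissingReady', 1)]
```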

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/config/schema-lazr.conf
  lib/lp/services/job/runner.py
  lib/lp/services/job/tests/celery_helpers.py
  lib/lp/services/job/celeryjob.py
  lib/lp/services/job/tests/test_celeryjob.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/tests/test_celery_configuration.py

./lib/lp/services/config/schema-lazr.conf
     445: Line exceeds 80 characters.
    1038: Line exceeds 80 characters.
    1045: Line exceeds 80 characters.
    1583: Line exceeds 80 characters.

Richard Harding (rharding) wrote:

Thanks for the update, Aaron.

My only comments: would it make sense for the db user to be an attribute of the job class for the task_init() call, so that there's some context on what that string is?

Is the noop task more of a test helper, specific to tests rather than in the list of normal jobs, or is it plausible that someone might want to use it for regular processing at some point?

Finally, TransactionFreeOperation is a bit confusing: I wouldn't expect a transaction.commit() at the end of something that isn't a transaction. Looking at the object, it appears simply to say that there are no currently active transactions, so is this just guarding against multiple transactions stomping on each other? Since this is copied from the old code and doesn't pertain to the queues, I'm marking it approved, but I'd appreciate any education on this front.

review: Approve
Aaron Bentley (abentley) wrote:

> Thanks for the update, Aaron.
>
> My only comments: would it make sense for the db user to be an attribute of
> the job class for the task_init() call, so that there's some context on what
> that string is?

That's not a Job class; it's a native Celery Task class. Job classes do indeed have a dbuser and use it for task_init().

> Is the noop task more of a test helper, specific to tests rather than in the
> list of normal jobs, or is it plausible that someone might want to use it for
> regular processing at some point?

Yes. All of the tasks in celery_helpers are test helpers, and are only available during test runs.

> Finally, TransactionFreeOperation is a bit confusing: I wouldn't expect a
> transaction.commit() at the end of something that isn't a transaction.
> Looking at the object, it appears simply to say that there are no currently
> active transactions, so is this just guarding against multiple transactions
> stomping on each other? Since this is copied from the old code and doesn't
> pertain to the queues, I'm marking it approved, but I'd appreciate any
> education on this front.

It ensures that the operation does not leave any transactions open, because an open transaction would prevent other operations from acquiring locks and eventually force them to time out. You're right that the name is a bit confusing.
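The following is a minimal sketch of the guarantee described above. It uses the standard library's sqlite3 module (Python 3) instead of Launchpad's Storm/Zope transaction machinery. The hypothetical `transaction_free` context manager asserts that no transaction is open when the block starts and that none is left open when it finishes. It illustrates the idea only and is not Launchpad's TransactionFreeOperation.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction_free(conn):
    """Assert that no transaction is open before or after the block."""
    if conn.in_transaction:
        raise AssertionError('Transaction open before operation!')
    yield conn
    if conn.in_transaction:
        raise AssertionError('Operation left a transaction open!')


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE job (id INTEGER PRIMARY KEY, state TEXT)')

# Well-behaved: the block commits its own work, much as RunMissingReady.run()
# calls transaction.commit() at the end of its with-statement.
with transaction_free(conn):
    conn.execute("INSERT INTO job (state) VALUES ('ready')")
    conn.commit()

# Misbehaving: the INSERT implicitly opens a transaction that is never
# committed, so its write lock stays held and other writers could time out.
error = None
try:
    with transaction_free(conn):
        conn.execute("INSERT INTO job (state) VALUES ('ready')")
except AssertionError as e:
    error = str(e)
print(error)  # Operation left a transaction open!
```

This also explains why a commit at the end is consistent with the name: the operation may use transactions internally, as long as it closes them before it returns.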

Preview Diff

=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2012-07-23 13:29:01 +0000
+++ lib/lp/services/config/schema-lazr.conf 2012-07-24 19:53:20 +0000
@@ -1805,7 +1805,7 @@

 [job_runner_queues]
 # The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow
+queues: job job_slow branch_write_job branch_write_job_slow celerybeat

 # The main job queue.
 [job]
@@ -1837,3 +1837,9 @@
 timeout: 86400
 fallback_queue:
 concurrency: 1
+
+# The queue used for the celerybeat task RunMissingReady
+[celerybeat]
+timeout: 86400
+fallback_queue:
+concurrency: 1

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-07-24 19:53:20 +0000
@@ -35,6 +35,7 @@
     Doing this in a function is convenient for testing.
     """
     result = {}
+    CELERY_BEAT_QUEUE = 'celerybeat'
     celery_queues = {}
     queue_names = config.job_runner_queues.queues
     queue_names = queue_names.split(' ')
@@ -85,7 +86,10 @@
     result['CELERYBEAT_SCHEDULE'] = {
         'schedule-missing': {
             'task': 'lp.services.job.celeryjob.run_missing_ready',
-            'schedule': timedelta(seconds=600)
+            'schedule': timedelta(seconds=600),
+            'options': {
+                'routing_key': CELERY_BEAT_QUEUE,
+                },
             }
         }
     # See http://ask.github.com/celery/userguide/optimizing.html:

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/celeryjob.py 2012-07-24 19:53:20 +0000
@@ -16,10 +16,11 @@

 from logging import info
 import os
+from uuid import uuid4


 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import task
+from celery.task import Task
 from lazr.jobrunner.celerytask import RunJob
 from storm.zope.interfaces import IZStorm
 import transaction
@@ -80,24 +81,41 @@
         queued_job_ids]


-@task
-def run_missing_ready(_no_init=False):
+class RunMissingReady(Task):
     """Task to run any jobs that are ready but not scheduled.

     Currently supports only BranchScanJob.
     :param _no_init: For tests. If True, do not perform the initialization.
     """
-    if not _no_init:
-        task_init('run_missing_ready')
-    with TransactionFreeOperation():
-        count = 0
-        for job in find_missing_ready(BranchScanJob):
-            if not celery_enabled(job.__class__.__name__):
-                continue
-            job.celeryCommitHook(True)
-            count += 1
-        info('Scheduled %d missing jobs.', count)
-        transaction.commit()
+    ignore_result = True
+
+    def run(self, _no_init=False):
+        if not _no_init:
+            task_init('run_missing_ready')
+        with TransactionFreeOperation():
+            count = 0
+            for job in find_missing_ready(BranchScanJob):
+                if not celery_enabled(job.__class__.__name__):
+                    continue
+                job.celeryCommitHook(True)
+                count += 1
+            info('Scheduled %d missing jobs.', count)
+            transaction.commit()
+
+    def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
+                    connection=None, router=None, queues=None, **options):
+        """Create a task_id if none is specified.
+
+        Override the quite generic default task_id with one containing
+        the class name.
+
+        See also `celery.task.Task.apply_async()`.
+        """
+        if task_id is None:
+            task_id = '%s_%s' % (self.__class__.__name__, uuid4())
+        return super(RunMissingReady, self).apply_async(
+            args, kwargs, task_id, publisher, connection, router, queues,
+            **options)


 needs_zcml = True

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/runner.py 2012-07-24 19:53:20 +0000
@@ -217,7 +217,7 @@
          'result': SoftTimeLimitExceeded(1,),
          'task_id': 'cba7d07b-37fe-4f1d-a5f6-79ad7c30222f'}
         """
-        return '%s-%s-%s' % (
+        return '%s_%s_%s' % (
             self.__class__.__name__, self.job_id, uuid4())

     def runViaCelery(self, ignore_result=False):

=== modified file 'lib/lp/services/job/tests/celery_helpers.py'
--- lib/lp/services/job/tests/celery_helpers.py 2012-06-14 05:18:22 +0000
+++ lib/lp/services/job/tests/celery_helpers.py 2012-07-24 19:53:20 +0000
@@ -3,7 +3,10 @@

 __metaclass__ = type

-__all__ = ['pop_notifications']
+__all__ = [
+    'noop',
+    'pop_notifications'
+    ]

 # Force the correct celeryconfig to be used.
 import lp.services.job.celeryjob
@@ -18,3 +21,11 @@
 def pop_notifications():
     from lp.testing.mail_helpers import pop_notifications
     return pop_notifications()
+
+
+@task
+def noop():
+    """Task that does nothing.
+
+    Used to ensure that other tasks have completed.
+    """

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2012-07-24 19:53:20 +0000
@@ -25,7 +25,8 @@
         # Four queues are defined; the binding key for each queue is
         # just the queue name.
         queue_names = [
-            'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
+            'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
+            'job_slow']
         queues = config['CELERY_QUEUES']
         self.assertEqual(queue_names, sorted(queues))
         for name in queue_names:

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2012-07-24 19:53:20 +0000
@@ -1,10 +1,15 @@
 # Copyright 2012 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).

+from cStringIO import StringIO
+import sys
+from time import sleep
+from lazr.jobrunner.bin.clear_queues import clear_queues
 from lp.code.model.branchjob import BranchScanJob
 from lp.scripts.helpers import TransactionFreeOperation
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.tests import (
+    celeryd,
     drain_celery_queues,
     monitor_celery,
     )
@@ -21,10 +26,10 @@
         super(TestRunMissingJobs, self).setUp()
         from lp.services.job.celeryjob import (
             find_missing_ready,
-            run_missing_ready,
+            RunMissingReady,
             )
         self.find_missing_ready = find_missing_ready
-        self.run_missing_ready = run_missing_ready
+        self.RunMissingReady = RunMissingReady

     def createMissingJob(self):
         job = BranchScanJob.create(self.factory.makeBranch())
@@ -48,7 +53,7 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.run_missing_ready(_no_init=True)
+                    self.RunMissingReady().run(_no_init=True)
         self.assertEqual([], responses)

     def test_run_missing_ready(self):
@@ -59,5 +64,61 @@
         with monitor_celery() as responses:
             with dbuser('run_missing_ready'):
                 with TransactionFreeOperation.require():
-                    self.run_missing_ready(_no_init=True)
+                    self.RunMissingReady().run(_no_init=True)
         self.assertEqual(1, len(responses))
+
+    def test_run_missing_ready_does_not_return_results(self):
+        """The celerybeat task run_missing_ready does not create a
+        result queue."""
+        from lazr.jobrunner.celerytask import list_queued
+        from lp.services.job.tests.celery_helpers import noop
+        job_queue_name = 'celerybeat'
+        request = self.RunMissingReady().apply_async(
+            kwargs={'_no_init': True}, queue=job_queue_name)
+        self.assertTrue(request.task_id.startswith('RunMissingReady_'))
+        result_queue_name = request.task_id.replace('-', '')
+        # Paranoia check: This test intends to prove that a Celery
+        # result queue for the task created above will _not_ be created.
+        # This would also happen when "with celeryd()" would do nothing.
+        # So let's be sure that a task is queued...
+        # Give the system some time to deliver the message
+        for x in range(10):
+            if list_queued(self.RunMissingReady.app, [job_queue_name]) > 0:
+                break
+            sleep(1)
+        self.assertEqual(
+            1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        # Wait at most 60 seconds for celeryd to start and process
+        # the task.
+        with celeryd(job_queue_name):
+            # Due to FIFO ordering, this will only return after
+            # RunMissingReady has finished.
+            noop.apply_async(queue=job_queue_name).wait(60)
+        # But now the message has been consumed by celeryd.
+        self.assertEqual(
+            0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+        # No result queue was created for the task.
+        try:
+            real_stdout = sys.stdout
+            real_stderr = sys.stderr
+            sys.stdout = fake_stdout = StringIO()
+            sys.stderr = fake_stderr = StringIO()
+            clear_queues(
+                ['script_name', '-c', 'lp.services.job.celeryconfig',
+                 result_queue_name])
+        finally:
+            sys.stdout = real_stdout
+            sys.stderr = real_stderr
+        fake_stdout = fake_stdout.getvalue()
+        fake_stderr = fake_stderr.getvalue()
+        self.assertEqual(
+            '', fake_stdout,
+            "Unexpected output from clear_queues:\n"
+            "stdout: %r\n"
+            "stderr: %r" % (fake_stdout, fake_stderr))
+        self.assertEqual(
+            "NOT_FOUND - no queue '%s' in vhost '/'\n" % result_queue_name,
+            fake_stderr,
+            "Unexpected output from clear_queues:\n"
+            "stdout: %r\n"
+            "stderr: %r" % (fake_stdout, fake_stderr))

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2012-07-23 13:29:01 +0000
+++ lib/lp/services/job/tests/test_runner.py 2012-07-24 19:53:20 +0000
@@ -384,7 +384,7 @@
         task_id = job.taskId()
         uuid_expr = (
             '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
-        mo = re.search('^NullJob-%s-%s$' % (job.job_id, uuid_expr), task_id)
+        mo = re.search('^NullJob_%s_%s$' % (job.job_id, uuid_expr), task_id)
         self.assertIsNot(None, mo)

291