Merge lp:~adeuring/launchpad/bug-1015667-3 into lp:launchpad

Proposed by Abel Deuring
Status: Merged
Approved by: Abel Deuring
Approved revision: no longer in the source branch.
Merged at revision: 15656
Proposed branch: lp:~adeuring/launchpad/bug-1015667-3
Merge into: lp:launchpad
Diff against target: 265 lines (+117/-23)
7 files modified
lib/lp/services/config/schema-lazr.conf (+7/-1)
lib/lp/services/job/celeryconfig.py (+5/-1)
lib/lp/services/job/celeryjob.py (+32/-14)
lib/lp/services/job/runner.py (+1/-1)
lib/lp/services/job/tests/test_celery_configuration.py (+2/-1)
lib/lp/services/job/tests/test_celeryjob.py (+69/-4)
lib/lp/services/job/tests/test_runner.py (+1/-1)
To merge this branch: bzr merge lp:~adeuring/launchpad/bug-1015667-3
Reviewer Review Type Date Requested Status
Benji York (community) code Approve
Review via email: mp+115110@code.launchpad.net

Commit message

run the Celery task RunMissingReady with ignore_result=True

Description of the change

This branch fixes bug 1015667: Celery is leaving an ever-increasing number
of queues behind. Well, at least I hope that the bug will be fixed --
the result queues created by Celery tasks generally do not give any hint
as to which task created them: the queue names are the task IDs, which are
by default strings generated by uuid4(), and the content of the queued
messages is nothing more than a status message and the data returned by
Task.run(). (OK, this is not completely true: messages for failed tasks
also contain a traceback -- but that does not help much when we have
tasks that tend to run without failures.)

The culprit was the task run_missing_ready, which simply did not have
the flag ignore_result set.

I changed the task from a decorated function into a class derived
from celery.task.Task and defined the class attribute ignore_result.

This also makes it possible to override the default task ID. If no ID is
specified as a parameter to celery.task.Task.apply_async(), a default
value, generated via uuid4() as mentioned above, is created in
Task.apply() or celery.app.amqp.TaskPublisher.delay_task().
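As a sketch of the resulting ID scheme (make_task_id is a hypothetical
helper for illustration, not code from this branch):

```python
from uuid import uuid4


def make_task_id(cls_name):
    """Build a task ID the way RunMissingReady.apply_async() does.

    Celery's default is a bare str(uuid4()); prefixing the class name
    means that a result queue (which is named after the task ID)
    reveals which task created it.
    """
    return '%s_%s' % (cls_name, uuid4())
```

With this scheme a leftover queue named `RunMissingReady_...` is
immediately attributable, where a bare UUID is not.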

The test that no result message queue is created is somewhat paranoid:
a test that an unwanted side effect of some other functionality does
not occur must ensure that the main event actually happens. Otherwise,
the test might simply check: "If nothing happens, there is no result
queue."

test_run_missing_ready_does_not_return_results() does this by calling
list_queued() -- but since this function uses another function,
drain_queues(), which can also consume queued messages, I wanted to be
sure that this does not happen; hence list_queued() is called twice
before celeryd is started.

    Side note, for extra fun: have a look at the implementation of
    list_queued() and drain_queues() in lazr.jobrunner. drain_queues()
    has the parameter "retain"; if it is True, messages are kept in the
    queue; list_queued() calls drain_queues(retain=True, ...), so that
    looks sane. But the usage of this parameter in drain_queues() scares
    me:

            consumer = Consumer(
                connection, bindings, callbacks=callbacks, no_ack=not retain,
                auto_declare=not passive_queues)

    So no_ack must be False in order to keep the message in the queue.
    And if you want to remove the message, you have to set the "don't
    acknowledge" flag to True...
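The inversion boils down to a two-line sketch (consumer_flags is a
hypothetical helper, not part of lazr.jobrunner): in AMQP terms,
no_ack=True asks the broker to auto-acknowledge, i.e. to drop the
message as soon as it is delivered.

```python
def consumer_flags(retain):
    # AMQP semantics: no_ack=True means "auto-acknowledge on delivery",
    # so the broker removes the message immediately. To *keep* a
    # message (retain=True) you therefore need no_ack=False -- hence
    # the double negation that the side note above complains about.
    return {'no_ack': not retain}
```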

This test (and several others for the lazr.jobrunner package and for
lp.services.job) could be simpler and more robust if there were a way
to easily check which queues exist on a rabbitmq instance and how many
messages they contain. There is "rabbitmqctl list_queues", but this
requires root privileges. Also, rabbitmq can provide a webservice API,
but last time I checked it was not available on precise.
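For reference, the two inspection routes look roughly like this (the
credentials and port are assumptions about a stock install, and the
management plugin's port moved from 55672 to 15672 in RabbitMQ 3.0):

```shell
# Requires root (or the rabbitmq user):
sudo rabbitmqctl list_queues name messages

# With the management plugin enabled -- not available on precise at
# the time of writing -- the same data is exposed over HTTP:
curl -s http://guest:guest@localhost:15672/api/queues
```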

And a final rant: after quickly glancing through the AMQP specs, I
believe that the protocol does not provide a mechanism to answer the
question "which queues exist on a given exchange". Scary...

Anyway, I also noticed a minor flaw in BaseRunnableJob.taskId(). This
method creates task IDs for regular job runner tasks, similar to
RunMissingReady.apply_async(). BaseRunnableJob.taskId() also adds the
DB ID of a job and separates the different parts (class name, DB ID,
UUID) with '-'. But the '-' characters are removed from the queue name,
so it is a bit hard to separate the DB ID from the UUID there. So I
replaced the separator '-' with '_'.

tests:

./bin/test services.job -vvt lp.services.job.tests.test_celeryjob
./bin/test services.job -vvt lp.services.job.tests.test_runner

no lint

Revision history for this message
Benji York (benji) wrote :

This branch looks great.

review: Approve (code)

Preview Diff

=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2012-07-03 17:09:00 +0000
+++ lib/lp/services/config/schema-lazr.conf 2012-07-19 14:00:56 +0000
@@ -1805,7 +1805,7 @@

 [job_runner_queues]
 # The names of all queues.
-queues: job job_slow branch_write_job branch_write_job_slow
+queues: job job_slow branch_write_job branch_write_job_slow celerybeat

 # The main job queue.
 [job]
@@ -1837,3 +1837,9 @@
 timeout: 86400
 fallback_queue:
 concurrency: 1
+
+# The queue used for the celerybeat task RunMissingReady
+[celerybeat]
+timeout: 86400
+fallback_queue:
+concurrency: 1

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-06-29 08:40:05 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-07-19 14:00:56 +0000
@@ -35,6 +35,7 @@
 Doing this in a function is convenient for testing.
 """
 result = {}
+ CELERY_BEAT_QUEUE = 'celerybeat'
 celery_queues = {}
 queue_names = config.job_runner_queues.queues
 queue_names = queue_names.split(' ')
@@ -85,7 +86,10 @@
 result['CELERYBEAT_SCHEDULE'] = {
 'schedule-missing': {
 'task': 'lp.services.job.celeryjob.run_missing_ready',
- 'schedule': timedelta(seconds=600)
+ 'schedule': timedelta(seconds=600),
+ 'options': {
+ 'routing_key': CELERY_BEAT_QUEUE,
+ },
 }
 }
 # See http://ask.github.com/celery/userguide/optimizing.html:

=== modified file 'lib/lp/services/job/celeryjob.py'
--- lib/lp/services/job/celeryjob.py 2012-06-14 05:18:22 +0000
+++ lib/lp/services/job/celeryjob.py 2012-07-19 14:00:56 +0000
@@ -16,10 +16,11 @@

 from logging import info
 import os
+from uuid import uuid4


 os.environ.setdefault('CELERY_CONFIG_MODULE', 'lp.services.job.celeryconfig')
-from celery.task import task
+from celery.task import Task
 from lazr.jobrunner.celerytask import RunJob
 from storm.zope.interfaces import IZStorm
 import transaction
@@ -80,24 +81,41 @@
 queued_job_ids]


-@task
-def run_missing_ready(_no_init=False):
+class RunMissingReady(Task):
 """Task to run any jobs that are ready but not scheduled.

 Currently supports only BranchScanJob.
 :param _no_init: For tests. If True, do not perform the initialization.
 """
- if not _no_init:
- task_init('run_missing_ready')
- with TransactionFreeOperation():
- count = 0
- for job in find_missing_ready(BranchScanJob):
- if not celery_enabled(job.__class__.__name__):
- continue
- job.celeryCommitHook(True)
- count += 1
- info('Scheduled %d missing jobs.', count)
- transaction.commit()
+ ignore_result = True
+
+ def run(self, _no_init=False):
+ if not _no_init:
+ task_init('run_missing_ready')
+ with TransactionFreeOperation():
+ count = 0
+ for job in find_missing_ready(BranchScanJob):
+ if not celery_enabled(job.__class__.__name__):
+ continue
+ job.celeryCommitHook(True)
+ count += 1
+ info('Scheduled %d missing jobs.', count)
+ transaction.commit()
+
+ def apply_async(self, args=None, kwargs=None, task_id=None, publisher=None,
+ connection=None, router=None, queues=None, **options):
+ """Create a task_id if none is specified.
+
+ Override the quite generic default task_id with one containing
+ the class name.
+
+ See also `celery.task.Task.apply_async()`.
+ """
+ if task_id is None:
+ task_id = '%s_%s' % (self.__class__.__name__, uuid4())
+ return super(RunMissingReady, self).apply_async(
+ args, kwargs, task_id, publisher, connection, router, queues,
+ **options)


 needs_zcml = True

=== modified file 'lib/lp/services/job/runner.py'
--- lib/lp/services/job/runner.py 2012-07-03 15:36:00 +0000
+++ lib/lp/services/job/runner.py 2012-07-19 14:00:56 +0000
@@ -217,7 +217,7 @@
 'result': SoftTimeLimitExceeded(1,),
 'task_id': 'cba7d07b-37fe-4f1d-a5f6-79ad7c30222f'}
 """
- return '%s-%s-%s' % (
+ return '%s_%s_%s' % (
 self.__class__.__name__, self.job_id, uuid4())

 def runViaCelery(self, ignore_result=False):

=== modified file 'lib/lp/services/job/tests/test_celery_configuration.py'
--- lib/lp/services/job/tests/test_celery_configuration.py 2012-04-18 17:20:14 +0000
+++ lib/lp/services/job/tests/test_celery_configuration.py 2012-07-19 14:00:56 +0000
@@ -25,7 +25,8 @@
 # Four queues are defined; the binding key for each queue is
 # just the queue name.
 queue_names = [
- 'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
+ 'branch_write_job', 'branch_write_job_slow', 'celerybeat', 'job',
+ 'job_slow']
 queues = config['CELERY_QUEUES']
 self.assertEqual(queue_names, sorted(queues))
 for name in queue_names:

=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
--- lib/lp/services/job/tests/test_celeryjob.py 2012-06-27 03:25:41 +0000
+++ lib/lp/services/job/tests/test_celeryjob.py 2012-07-19 14:00:56 +0000
@@ -1,10 +1,18 @@
 # Copyright 2012 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).

+from cStringIO import StringIO
+import sys
+from time import (
+ sleep,
+ time,
+ )
+from lazr.jobrunner.bin.clear_queues import clear_queues
 from lp.code.model.branchjob import BranchScanJob
 from lp.scripts.helpers import TransactionFreeOperation
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.tests import (
+ celeryd,
 drain_celery_queues,
 monitor_celery,
 )
@@ -21,10 +29,10 @@
 super(TestRunMissingJobs, self).setUp()
 from lp.services.job.celeryjob import (
 find_missing_ready,
- run_missing_ready,
+ RunMissingReady,
 )
 self.find_missing_ready = find_missing_ready
- self.run_missing_ready = run_missing_ready
+ self.RunMissingReady = RunMissingReady

 def createMissingJob(self):
 job = BranchScanJob.create(self.factory.makeBranch())
@@ -48,7 +56,7 @@
 with monitor_celery() as responses:
 with dbuser('run_missing_ready'):
 with TransactionFreeOperation.require():
- self.run_missing_ready(_no_init=True)
+ self.RunMissingReady().run(_no_init=True)
 self.assertEqual([], responses)

 def test_run_missing_ready(self):
@@ -59,5 +67,62 @@
 with monitor_celery() as responses:
 with dbuser('run_missing_ready'):
 with TransactionFreeOperation.require():
- self.run_missing_ready(_no_init=True)
+ self.RunMissingReady().run(_no_init=True)
 self.assertEqual(1, len(responses))
+
+ def test_run_missing_ready_does_not_return_results(self):
+ """The celerybeat task run_missing_ready does not create a
+ result queue."""
+ from lazr.jobrunner.celerytask import list_queued
+ job_queue_name = 'celerybeat'
+ request = self.RunMissingReady().apply_async(
+ kwargs={'_no_init': True}, queue=job_queue_name)
+ self.assertTrue(request.task_id.startswith('RunMissingReady_'))
+ result_queue_name = request.task_id.replace('-', '')
+ # Paranoia check: This test intends to prove that a Celery
+ # result queue fot the task created above will _not_ be created.
+ # This would also happen when "with celeryd()" would do nothing.
+ # So let's be sure that right now a task is queued...
+ self.assertEqual(
+ 1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+ # ...and that list_queued() calls do not consume messages.
+ self.assertEqual(
+ 1, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+ # Wait at most 60 seconds for celeryd to start and process
+ # the task.
+ with celeryd(job_queue_name):
+ wait_until = time() + 60
+ while (time() < wait_until):
+ queued_tasks = list_queued(
+ self.RunMissingReady.app, [job_queue_name])
+ if len(queued_tasks) == 0:
+ break
+ sleep(.2)
+ # But now the message has been consumed by celeryd.
+ self.assertEqual(
+ 0, len(list_queued(self.RunMissingReady.app, [job_queue_name])))
+ # No result queue was created for the task.
+ try:
+ real_stdout = sys.stdout
+ real_stderr = sys.stderr
+ sys.stdout = fake_stdout = StringIO()
+ sys.stderr = fake_stderr = StringIO()
+ clear_queues(
+ ['script_name', '-c', 'lp.services.job.celeryconfig',
+ result_queue_name])
+ finally:
+ sys.stdout = real_stdout
+ sys.stderr = real_stderr
+ fake_stdout = fake_stdout.getvalue()
+ fake_stderr = fake_stderr.getvalue()
+ self.assertEqual(
+ '', fake_stdout,
+ "Unexpected output from clear_queues:\n"
+ "stdout: %r\n"
+ "stderr: %r" % (fake_stdout, fake_stderr))
+ self.assertEqual(
+ "NOT_FOUND - no queue '%s' in vhost '/'\n" % result_queue_name,
+ fake_stderr,
+ "Unexpected output from clear_queues:\n"
+ "stdout: %r\n"
+ "stderr: %r" % (fake_stdout, fake_stderr))

=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2012-07-03 15:36:00 +0000
+++ lib/lp/services/job/tests/test_runner.py 2012-07-19 14:00:56 +0000
@@ -384,7 +384,7 @@
 task_id = job.taskId()
 uuid_expr = (
 '[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}')
- mo = re.search('^NullJob-%s-%s$' % (job.job_id, uuid_expr), task_id)
+ mo = re.search('^NullJob_%s_%s$' % (job.job_id, uuid_expr), task_id)
 self.assertIsNot(None, mo)