Merge lp:~adeuring/lazr.jobrunner/bug-1015667 into lp:lazr.jobrunner

Proposed by Abel Deuring on 2012-07-03
Status: Merged
Approved by: Abel Deuring on 2012-07-03
Approved revision: 39
Merge reported by: Abel Deuring
Merged at revision: not available
Proposed branch: lp:~adeuring/lazr.jobrunner/bug-1015667
Merge into: lp:lazr.jobrunner
Diff against target: 259 lines (+181/-6)
4 files modified
setup.py (+6/-3)
src/lazr/jobrunner/bin/inspect_queues.py (+64/-0)
src/lazr/jobrunner/celerytask.py (+15/-3)
src/lazr/jobrunner/tests/test_celerytask.py (+96/-0)
To merge this branch: bzr merge lp:~adeuring/lazr.jobrunner/bug-1015667
Reviewer: Richard Harding (community)
Date requested: 2012-07-03
Status: Approve on 2012-07-03
Review via email: mp+113228@code.launchpad.net

Description of the Change

This branch adds a script "inspect-queues" to lazr.jobrunner.

As described in bug 1015667, some Celery jobs seem to leave
messages behind in result queues. The new script lets us see
what the messages in these queues contain. I hope that looking
at these messages will give us some clue about which jobs
created them. (I also have a branch ready for the main LP code
which changes the task ID so that it includes the job class and
the job ID, together with a UUID. Since the result queue name is
derived from the task ID, we should be able to trace the
"offending" jobs a bit better.)

As a side effect, the messages are also consumed. This might
not be desired in every case -- but I could not figure out how
to call drain_queues() so that the messages stay in the queues.
The drain_queues() parameter "retain" seems to have no effect...

In other words, the script must be used carefully. But if we
store the result of running "rabbitmqctl list_queues" and run
the script, say, 24 hours later, we can be sure that the queues
are sufficiently stale that no results are inadvertently
deleted.
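
A minimal sketch of that workflow, driven from Python (the config
module name is made up, and the parsing of rabbitmqctl's output is
deliberately rough):

    import subprocess
    from lazr.jobrunner.bin.inspect_queues import inspect_queues

    # Day 1: snapshot the current queue listing.
    with open('queues.txt', 'w') as f:
        f.write(subprocess.check_output(['rabbitmqctl', 'list_queues']))

    # Day 2: inspect (and consume!) only queues that already existed
    # when the snapshot was taken, so they are sufficiently stale.
    stale = [line.split()[0] for line in open('queues.txt')
             if line.strip() and not line.startswith(('Listing', '...'))]
    inspect_queues(['inspect-queues', '-c', 'my.celeryconfig'] + stale)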

I also changed the function drain_queues(). The old
implementation delegated the setup of the queues completely to
kombu.Consumer.__init__(). Doing this for the result queues
fails with the error "PRECONDITION_FAILED - parameters for queue
'...' in vhost '/' not equivalent". Calling
queue.queue_declare(passive=True) avoids this error, but
Consumer.__init__() does not allow this, so drain_queues() now
declares the queues explicitly when called from
inspect_queues().
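
In raw kombu terms, the difference looks roughly like this (a
sketch against the kombu API of the time; the broker URL and the
queue and exchange names are illustrative):

    from kombu import BrokerConnection, Exchange, Queue

    with BrokerConnection('amqp://guest:guest@localhost//') as connection:
        channel = connection.channel()
        queue = Queue('0b6bec305c6944fe83c6a3b9f94bcf21',
                      exchange=Exchange('celeryresults', type='direct'),
                      routing_key='0b6bec305c6944fe83c6a3b9f94bcf21')
        bound = queue.bind(channel)
        # queue.declare() would re-assert the queue's parameters; if
        # they differ from those used by the result backend (e.g. the
        # expiry), the broker answers with 406 PRECONDITION_FAILED.
        # A passive declare only checks that the queue exists.
        bound.queue_declare(passive=True)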

no lint

Richard Harding (rharding) wrote :

Abel, looks ok, but I'd feel better if the name was something more along the lines of clear-queues since that's what it's doing. Inspect doesn't sound destructive enough to warn other users that this could lead to undesired consequences.

review: Approve

Preview Diff

=== modified file 'setup.py'
--- setup.py 2012-05-14 16:42:07 +0000
+++ setup.py 2012-07-03 14:52:21 +0000
@@ -32,7 +32,8 @@
     ]
 
 
-setup(name='lazr.jobrunner',
+setup(
+    name='lazr.jobrunner',
     version=version,
     description="A Celery based job runner",
     long_description=README + '\n\n' + NEWS,
@@ -51,8 +52,10 @@
     zip_safe=False,
     install_requires=install_requires,
     entry_points={
-        'console_scripts':
-            ['jobrunnerctl=lazr.jobrunner.bin.jobrunnerctl:main']
+        'console_scripts': [
+            'jobrunnerctl=lazr.jobrunner.bin.jobrunnerctl:main',
+            'inspect-queues=lazr.jobrunner.bin.inspect_queues:main'
+        ]
     },
     test_suite="lazr.jobrunner",
 )

=== added file 'src/lazr/jobrunner/bin/inspect_queues.py'
--- src/lazr/jobrunner/bin/inspect_queues.py 1970-01-01 00:00:00 +0000
+++ src/lazr/jobrunner/bin/inspect_queues.py 2012-07-03 14:52:21 +0000
@@ -0,0 +1,64 @@
+# Copyright 2012 Canonical Ltd. All rights reserved.
+#
+# This file is part of lazr.jobrunner
+#
+# lazr.jobrunner is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, version 3 of the License.
+#
+# lazr.jobrunner is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+# License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with lazr.jobrunner. If not, see <http://www.gnu.org/licenses/>.
+
+
+"""Inspect Celery result queues."""
+
+__metaclass__ = type
+
+from argparse import ArgumentParser
+import os
+import sys
+from amqplib.client_0_8.exceptions import AMQPChannelException
+
+
+def show_queue_data(body, message):
+    print '%s: %r' % (message.delivery_info['routing_key'], body)
+
+
+def inspect_queues(args):
+    parser = ArgumentParser(description=__doc__, prog=args[0])
+    parser.add_argument(
+        '-c', '--config', dest='config', required=True,
+        help='The name of the Celery config module')
+    parser.add_argument(
+        'queues', nargs='+', metavar='queue',
+        help='The names of RabbitMQ queues that hold results of Celery tasks')
+    args = parser.parse_args(args[1:])
+    os.environ["CELERY_CONFIG_MODULE"] = args.config
+    # Late import because Celery modules are imported by celerytask,
+    # and these modules need to know where to find the configuration.
+    from lazr.jobrunner.celerytask import drain_queues, RunJob
+
+    # In theory, drain_queues() can be called with more than one queue
+    # name in the second argument. But the callback is only called for
+    # the first queue...
+    for queue in args.queues:
+        try:
+            drain_queues(
+                RunJob.app, [queue], callbacks=[show_queue_data],
+                retain=True, passive_queues=True)
+        except AMQPChannelException as exc:
+            if exc.amqp_reply_code == 404:
+                # Unknown queue name specified; amqp_reply_text is
+                # self-explaining.
+                print >>sys.stderr, exc.amqp_reply_text
+            else:
+                raise
+
+
+def main():
+    inspect_queues(sys.argv)

=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py 2012-05-18 13:35:46 +0000
+++ src/lazr/jobrunner/celerytask.py 2012-07-03 14:52:21 +0000
@@ -81,7 +81,8 @@
     return listings
 
 
-def drain_queues(app, queue_names, callbacks=None, retain=False):
+def drain_queues(app, queue_names, callbacks=None, retain=False,
+                 passive_queues=False):
     """Drain the messages from queues.
 
     :param app: The app to list queues for (affects backend, Queue type,
@@ -104,11 +105,22 @@
         # The no_ack flag is misleadingly named.
         # See: https://github.com/ask/kombu/issues/130
         consumer = Consumer(
-            connection, bindings, callbacks=callbacks, no_ack=not retain)
+            connection, bindings, callbacks=callbacks, no_ack=not retain,
+            auto_declare=not passive_queues)
+        if passive_queues:
+            # This is basically copied from kombu.Queue.declare().
+            # We can't use this method directly because queue_declare()
+            # must be called with passive=True for result queues.
+            # Otherwise, attempts to connect to the queue fail with
+            # AMQPChannelException: (406, u"PRECONDITION_FAILED...", ...)
+            for queue in consumer.queues:
+                if queue.exchange:
+                    queue.exchange.declare()
+                queue.queue_declare(passive=True)
         with consumer:
             try:
                 # Timeout of 0 causes error: [Errno 11] Resource temporarily
-                # unavailable
+                # unavailable.
                 connection.drain_events(timeout=0.1 ** 100)
             except timeout:
                 pass

=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py 2012-05-14 15:42:44 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py 2012-07-03 14:52:21 +0000
@@ -18,6 +18,7 @@
 
 
 import contextlib
+from cStringIO import StringIO
 import errno
 import json
 import os
@@ -28,6 +29,7 @@
     )
 import shutil
 import subprocess
+import sys
 import tempfile
 from time import sleep
 from unittest import TestCase
@@ -37,6 +39,7 @@
 
 from celery.exceptions import SoftTimeLimitExceeded
 
+from lazr.jobrunner.bin.inspect_queues import inspect_queues
 from lazr.jobrunner.celerytask import (
     drain_queues,
     list_queued,
@@ -476,3 +479,96 @@
     def test_no_tasks(self):
         """When no jobs are listed, the queue is shown as empty."""
         self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
+
+
+class TestInspectQueues(TestCase):
+    """Tests for the script inspect-queues."""
+
+    def queueName(self, task_id):
+        return task_id.replace('-', '')
+
+    def runInspectQueues(self, celery_config, task_ids):
+        """Invoke inspect_queues() and catch the data written to stdout
+        and stderr.
+        """
+        queues = [self.queueName(task_id) for task_id in task_ids]
+        real_stdout = sys.stdout
+        real_stderr = sys.stderr
+        try:
+            sys.stdout = StringIO()
+            sys.stderr = StringIO()
+            args = ['program', '-c', celery_config] + queues
+            inspect_queues(args)
+            fake_stdout = sys.stdout.getvalue()
+            fake_stderr = sys.stderr.getvalue()
+        finally:
+            sys.stdout = real_stdout
+            sys.stderr = real_stderr
+        return fake_stdout, fake_stderr
+
+    def invokeJob(self, celery_config, task, delay=1, job_args={}):
+        """Run the given task.
+
+        :return: The name of the result queue.
+        """
+        with tempdir() as temp_dir:
+            js = FileJobSource(temp_dir)
+            job = FileJob(js, 11, **job_args)
+            job.save()
+            task_info = task.apply_async(args=(11, ))
+            with celeryd(celery_config, temp_dir):
+                # Wait just long enough so that celeryd can start and
+                # process the job.
+                sleep(delay)
+            return task_info.task_id
+
+    def successMessage(self, task_id):
+        return (
+            "%s: {'status': 'SUCCESS', 'traceback': None, 'result': None, "
+            "'task_id': '%s'}\n" % (self.queueName(task_id), task_id))
+
+    def noQueueMessage(self, task_id):
+        return (
+            "NOT_FOUND - no queue '%s' in vhost '/'\n"
+            % self.queueName(task_id))
+
+    def test_inspect_queues__result_not_consumed(self):
+        """When a Celery task is started so that a result is returned
+        but the result is not consumed, the related message can be
+        retrieved with inspect_queues().
+        """
+        celery_config = 'lazr.jobrunner.tests.config1'
+        task_id = self.invokeJob(celery_config, RunFileJob)
+        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        self.assertEqual(self.successMessage(task_id), stdout)
+        self.assertEqual('', stderr)
+
+        # Reading a queue is destructive. An attempt to read again from
+        # a queue results in an error.
+        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        self.assertEqual('', stdout)
+        self.assertEqual(self.noQueueMessage(task_id), stderr)
+
+    def test_inspect_queues__two_queues(self):
+        """More than one queue can be inspected in one call of
+        inspect_queue().
+        """
+        celery_config = 'lazr.jobrunner.tests.config1'
+        task_id_1 = self.invokeJob(celery_config, RunFileJob)
+        task_id_2 = self.invokeJob(celery_config, RunFileJob)
+        stdout, stderr = self.runInspectQueues(
+            celery_config, [task_id_1, task_id_2])
+        expected_stdout = (
+            self.successMessage(task_id_1) + self.successMessage(task_id_2))
+        self.assertEqual(expected_stdout, stdout)
+        self.assertEqual('', stderr)
+
+    def test_inspect_queues__task_without_result(self):
+        """A Celery task which was started so that no result is returned
+        does not write to a task queue.
+        """
+        celery_config = 'lazr.jobrunner.tests.config1'
+        task_id = self.invokeJob(celery_config, RunFileJobNoResult)
+        stdout, stderr = self.runInspectQueues(celery_config, [task_id])
+        self.assertEqual('', stdout)
+        self.assertEqual(self.noQueueMessage(task_id), stderr)
