Merge lp:~abentley/lazr.jobrunner/add-missing-jobs into lp:lazr.jobrunner

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 35
Proposed branch: lp:~abentley/lazr.jobrunner/add-missing-jobs
Merge into: lp:lazr.jobrunner
Diff against target: 173 lines (+101/-8)
5 files modified
NEWS.txt (+5/-0)
setup.py (+5/-5)
src/lazr/jobrunner/celerytask.py (+54/-1)
src/lazr/jobrunner/tests/test_celerytask.py (+36/-1)
src/lazr/jobrunner/version.txt (+1/-1)
To merge this branch: bzr merge lp:~abentley/lazr.jobrunner/add-missing-jobs
Reviewer Review Type Date Requested Status
Abel Deuring (community) code Approve
Review via email: mp+105680@code.launchpad.net

Commit message

Support list_queued.

Description of the change

This branch adds a list_queued function.

It works by draining the queue without acknowledging the messages and then closing the connection. Because the messages were never acknowledged, closing the connection restores them to the queue, allowing them to be consumed by another consumer.

It is implemented via a drain_queues function, which is also useful for test cases that need to clean up after themselves.

It bumps the version number to 0.6, so that the Launchpad-side code can request it.

To post a comment you must log in.
Revision history for this message
Abel Deuring (adeuring) wrote :

nice work

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS.txt'
2--- NEWS.txt 2012-05-11 14:35:54 +0000
3+++ NEWS.txt 2012-05-14 15:49:20 +0000
4@@ -1,6 +1,11 @@
5 News
6 ====
7
8+0.6
9+___
10+
11+* Support list_queued for celery tasks.
12+
13 0.5
14 ___
15
16
17=== modified file 'setup.py'
18--- setup.py 2012-05-11 14:35:54 +0000
19+++ setup.py 2012-05-14 15:49:20 +0000
20@@ -22,7 +22,7 @@
21 NEWS = open(os.path.join(here, 'NEWS.txt')).read()
22
23
24-version = '0.5'
25+version = '0.6'
26
27 install_requires = [
28 # List your project dependencies here.
29@@ -40,10 +40,10 @@
30 # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
31 ],
32 keywords='',
33- author='',
34- author_email='',
35- url='',
36- license='',
37+ author='Launchpad Developers',
38+ author_email='launchpad-dev@lists.launchpad.net',
39+ url='https://launchpad.net/lazr.jobrunner',
40+ license='GPL v3',
41 packages=find_packages('src'),
42 package_dir = {'': 'src'},
43 namespace_packages = ['lazr'],
44
45=== modified file 'src/lazr/jobrunner/celerytask.py'
46--- src/lazr/jobrunner/celerytask.py 2012-04-11 14:55:53 +0000
47+++ src/lazr/jobrunner/celerytask.py 2012-05-14 15:49:20 +0000
48@@ -17,8 +17,12 @@
49 __metaclass__ = type
50
51
52+from functools import partial
53+from socket import timeout
54+
55 from celery.task import Task
56-from functools import partial
57+from kombu import Consumer, Exchange, Queue
58+
59 from lazr.jobrunner.jobrunner import (
60 JobRunner,
61 LeaseHeld,
62@@ -59,3 +63,52 @@
63
64 def reQueue(self, job_id, fallback_queue):
65 self.apply_async(args=(job_id, ), queue=fallback_queue)
66+
67+
68+def list_queued(app, queue_names):
69+ """List the queued messages as body/message tuples for a given app.
70+
71+ :param app: The app to list queues for (affects backend, Queue type,
72+ etc.).
73+ :param queue_names: Names of the queues to list.
74+ """
75+ listings = []
76+
77+ def add_listing(body, message):
78+ listings.append((body['task'], body['args']))
79+
80+ drain_queues(app, queue_names, callbacks=[add_listing], retain=True)
81+ return listings
82+
83+
84+def drain_queues(app, queue_names, callbacks=None, retain=False):
85+ """Drain the messages from queues.
86+
87+ :param app: The app to list queues for (affects backend, Queue type,
88+ etc.).
89+ :param queue_names: Names of the queues to list.
90+ :param callbacks: Optional list of callbacks to call on each message.
91+ Callback must accept (body, message) as parameters.
92+ :param retain: After this operation, retain the messages in the queue.
93+ """
94+ if callbacks is None:
95+ callbacks = [lambda x, y: None]
96+ bindings = []
97+ router = app.amqp.Router()
98+ for queue_name in queue_names:
99+ destination = router.expand_destination(queue_name)
100+ exchange = Exchange(destination['exchange'])
101+ queue = Queue(queue_name, exchange=exchange)
102+ bindings.append(queue)
103+ with app.broker_connection() as connection:
104+ # The meaning of no_ack appears to be inverted.
105+ # See: https://github.com/ask/kombu/issues/126
106+ consumer = Consumer(
107+ connection, bindings, callbacks=callbacks, no_ack=not retain)
108+ with consumer:
109+ try:
110+ # Timeout of 0 causes error: [Errno 11] Resource temporarily
111+ # unavailable
112+ connection.drain_events(timeout=.1 ** 100)
113+ except timeout:
114+ pass
115
116=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
117--- src/lazr/jobrunner/tests/test_celerytask.py 2012-05-10 14:18:11 +0000
118+++ src/lazr/jobrunner/tests/test_celerytask.py 2012-05-14 15:49:20 +0000
119@@ -37,7 +37,11 @@
120
121 from celery.exceptions import SoftTimeLimitExceeded
122
123-from lazr.jobrunner.celerytask import RunJob
124+from lazr.jobrunner.celerytask import (
125+ drain_queues,
126+ list_queued,
127+ RunJob,
128+ )
129 from lazr.jobrunner.jobrunner import (
130 JobStatus,
131 )
132@@ -441,3 +445,34 @@
133 job_output)
134
135 self.assertEqual(JobStatus.FAILED, job.status)
136+
137+
138+class TestListQueues(TestCase):
139+ """Tests for list_queues.
140+
141+ These tests deliberately do not use a celeryd, because we want to ensure
142+ that the messages are retained so that they can be listed.
143+ """
144+
145+ queue = 'steve'
146+
147+ def queue_job(self):
148+ RunFileJob.apply_async(args=(10, ), queue=self.queue)
149+ self.addCleanup(drain_queues, RunFileJob.app, [self.queue])
150+
151+ def test_list_queued(self):
152+ """When a job is queued, it is listed."""
153+ self.queue_job()
154+ tasks = list_queued(RunFileJob.app, [self.queue])
155+ self.assertEqual(('run_file_job', (10,)), tasks[0])
156+
157+ def test_list_queued_twice(self):
158+ """Listing a job does not remove it from the queue."""
159+ self.queue_job()
160+ list_queued(RunFileJob.app, [self.queue])
161+ tasks = list_queued(RunFileJob.app, [self.queue])
162+ self.assertEqual(('run_file_job', (10,)), tasks[0])
163+
164+ def test_no_tasks(self):
165+ """When no jobs are listed, the queue is shown as empty."""
166+ self.assertEqual([], list_queued(RunFileJob.app, [self.queue]))
167
168=== modified file 'src/lazr/jobrunner/version.txt'
169--- src/lazr/jobrunner/version.txt 2012-05-11 14:35:54 +0000
170+++ src/lazr/jobrunner/version.txt 2012-05-14 15:49:20 +0000
171@@ -1,1 +1,1 @@
172-0.5
173+0.6

Subscribers

People subscribed via source and target branches

to all changes: