Merge lp:~abentley/lazr.jobrunner/add-missing-jobs into lp:lazr.jobrunner

Proposed by Aaron Bentley
Status: Merged
Merged at revision: 35
Proposed branch: lp:~abentley/lazr.jobrunner/add-missing-jobs
Merge into: lp:lazr.jobrunner
Diff against target: 173 lines (+101/-8)
5 files modified
NEWS.txt (+5/-0)
setup.py (+5/-5)
src/lazr/jobrunner/celerytask.py (+54/-1)
src/lazr/jobrunner/tests/test_celerytask.py (+36/-1)
src/lazr/jobrunner/version.txt (+1/-1)
To merge this branch: bzr merge lp:~abentley/lazr.jobrunner/add-missing-jobs
Reviewer Review Type Date Requested Status
Abel Deuring (community) code Approve
Review via email: mp+105680@code.launchpad.net

Commit message

Support list_queued.

Description of the change

This branch adds a list_queued function.

It works by draining the queue without acknowledging the messages and then closing the connection. Because the messages were never acknowledged, closing the connection restores them to the queue, allowing them to be consumed by another consumer.

It is implemented via a drain_queues method, which is useful for test cases that need to clean up after themselves.

It bumps the version number to 0.6, so that the Launchpad-side code can request
it.

To post a comment you must log in.
Revision history for this message
Abel Deuring (adeuring) wrote :

nice work

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'NEWS.txt'
--- NEWS.txt 2012-05-11 14:35:54 +0000
+++ NEWS.txt 2012-05-14 15:49:20 +0000
@@ -1,6 +1,11 @@
 News
 ====

+0.6
+___
+
+* Support list_queued for celery tasks.
+
 0.5
 ___

=== modified file 'setup.py'
--- setup.py 2012-05-11 14:35:54 +0000
+++ setup.py 2012-05-14 15:49:20 +0000
@@ -22,7 +22,7 @@
22NEWS = open(os.path.join(here, 'NEWS.txt')).read()22NEWS = open(os.path.join(here, 'NEWS.txt')).read()
2323
2424
25version = '0.5'25version = '0.6'
2626
27install_requires = [27install_requires = [
28 # List your project dependencies here.28 # List your project dependencies here.
@@ -40,10 +40,10 @@
40 # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers40 # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
41 ],41 ],
42 keywords='',42 keywords='',
43 author='',43 author='Launchpad Developers',
44 author_email='',44 author_email='launchpad-dev@lists.launchpad.net',
45 url='',45 url='https://launchpad.net/lazr.jobrunner',
46 license='',46 license='GPL v3',
47 packages=find_packages('src'),47 packages=find_packages('src'),
48 package_dir = {'': 'src'},48 package_dir = {'': 'src'},
49 namespace_packages = ['lazr'],49 namespace_packages = ['lazr'],
5050
=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py 2012-04-11 14:55:53 +0000
+++ src/lazr/jobrunner/celerytask.py 2012-05-14 15:49:20 +0000
@@ -17,8 +17,12 @@
17__metaclass__ = type17__metaclass__ = type
1818
1919
20from functools import partial
21from socket import timeout
22
20from celery.task import Task23from celery.task import Task
21from functools import partial24from kombu import Consumer, Exchange, Queue
25
22from lazr.jobrunner.jobrunner import (26from lazr.jobrunner.jobrunner import (
23 JobRunner,27 JobRunner,
24 LeaseHeld,28 LeaseHeld,
@@ -59,3 +63,52 @@
5963
60 def reQueue(self, job_id, fallback_queue):64 def reQueue(self, job_id, fallback_queue):
61 self.apply_async(args=(job_id, ), queue=fallback_queue)65 self.apply_async(args=(job_id, ), queue=fallback_queue)
66
67
def list_queued(app, queue_names):
    """List the tasks currently queued for a given app.

    The queues are read through drain_queues with retain=True, so the
    messages are meant to stay available to other consumers afterwards.

    :param app: The app to list queues for (affects backend, Queue type,
        etc.).
    :param queue_names: Names of the queues to list.
    :return: A list of (task, args) tuples, one per message seen, built
        from the 'task' and 'args' entries of each message body.
    """
    queued = []

    def record(body, message):
        # Only the decoded body is inspected; the message object itself
        # is not used.
        queued.append((body['task'], body['args']))

    drain_queues(app, queue_names, callbacks=[record], retain=True)
    return queued
82
83
def drain_queues(app, queue_names, callbacks=None, retain=False):
    """Consume the messages waiting on the named queues.

    :param app: The app whose queues are drained; its router and broker
        connection are used (affects backend, Queue type, etc.).
    :param queue_names: Names of the queues to drain.
    :param callbacks: Optional list of callbacks to call on each message.
        Each callback must accept (body, message) as parameters.  When
        omitted, a single do-nothing callback is used.
    :param retain: After this operation, retain the messages in the queue.
    """
    if callbacks is None:
        callbacks = [lambda x, y: None]
    router = app.amqp.Router()
    # One Queue per name, bound to the exchange the router resolves for it.
    bindings = [
        Queue(
            name,
            exchange=Exchange(router.expand_destination(name)['exchange']))
        for name in queue_names]
    # Timeout of 0 causes error: [Errno 11] Resource temporarily
    # unavailable, so a vanishingly small positive timeout is used instead.
    brief = .1 ** 100
    with app.broker_connection() as connection:
        # The meaning of no_ack appears to be inverted.
        # See: https://github.com/ask/kombu/issues/126
        with Consumer(
                connection, bindings, callbacks=callbacks,
                no_ack=not retain):
            try:
                connection.drain_events(timeout=brief)
            except timeout:
                # Nothing (more) arrived within the timeout; this is the
                # normal way out, not an error.
                pass
62115
=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py 2012-05-10 14:18:11 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py 2012-05-14 15:49:20 +0000
@@ -37,7 +37,11 @@
3737
38from celery.exceptions import SoftTimeLimitExceeded38from celery.exceptions import SoftTimeLimitExceeded
3939
40from lazr.jobrunner.celerytask import RunJob40from lazr.jobrunner.celerytask import (
41 drain_queues,
42 list_queued,
43 RunJob,
44 )
41from lazr.jobrunner.jobrunner import (45from lazr.jobrunner.jobrunner import (
42 JobStatus,46 JobStatus,
43 )47 )
@@ -441,3 +445,34 @@
441 job_output)445 job_output)
442446
443 self.assertEqual(JobStatus.FAILED, job.status)447 self.assertEqual(JobStatus.FAILED, job.status)
448
449
class TestListQueues(TestCase):
    """Tests for list_queued.

    No celeryd is used by these tests, on purpose: the messages must stay
    in the queue so that they can be listed.
    """

    queue = 'steve'

    def queue_job(self):
        """Queue one RunFileJob and schedule the queue to be drained."""
        RunFileJob.apply_async(args=(10, ), queue=self.queue)
        # Drain without retaining on cleanup so that queued messages do
        # not leak into other tests.
        self.addCleanup(drain_queues, RunFileJob.app, [self.queue])

    def test_list_queued(self):
        """When a job is queued, it is listed."""
        self.queue_job()
        listed = list_queued(RunFileJob.app, [self.queue])
        self.assertEqual(('run_file_job', (10,)), listed[0])

    def test_list_queued_twice(self):
        """Listing a job does not remove it from the queue."""
        self.queue_job()
        # The first listing's result is irrelevant; only its effect on
        # the queue matters.
        list_queued(RunFileJob.app, [self.queue])
        listed_again = list_queued(RunFileJob.app, [self.queue])
        self.assertEqual(('run_file_job', (10,)), listed_again[0])

    def test_no_tasks(self):
        """When no jobs are queued, the queue is shown as empty."""
        listed = list_queued(RunFileJob.app, [self.queue])
        self.assertEqual([], listed)
444479
=== modified file 'src/lazr/jobrunner/version.txt'
--- src/lazr/jobrunner/version.txt 2012-05-11 14:35:54 +0000
+++ src/lazr/jobrunner/version.txt 2012-05-14 15:49:20 +0000
@@ -1,1 +1,1 @@
10.510.6

Subscribers

People subscribed via source and target branches

to all changes: