Merge lp:~wgrant/lazr.jobrunner/celery-3.1 into lp:lazr.jobrunner

Proposed by William Grant
Status: Merged
Merged at revision: 52
Proposed branch: lp:~wgrant/lazr.jobrunner/celery-3.1
Merge into: lp:lazr.jobrunner
Diff against target: 319 lines (+51/-49)
9 files modified
NEWS.txt (+4/-0)
buildout.cfg (+2/-2)
setup.py (+2/-2)
src/lazr/jobrunner/bin/clear_queues.py (+4/-8)
src/lazr/jobrunner/bin/jobrunnerctl.py (+1/-1)
src/lazr/jobrunner/celerytask.py (+6/-6)
src/lazr/jobrunner/tests/test_celerytask.py (+13/-12)
src/lazr/jobrunner/tests/test_jobrunnerctl.py (+18/-17)
src/lazr/jobrunner/version.txt (+1/-1)
To merge this branch: bzr merge lp:~wgrant/lazr.jobrunner/celery-3.1
Reviewer | Review Type | Date Requested | Status
Colin Watson (community) | | | Approve
Review via email: mp+266686@code.launchpad.net

Commit message

Support and require celery >= 3.0.

Description of the change

Support and require celery >= 3.0.

To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson):
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'NEWS.txt'
2--- NEWS.txt 2013-06-14 00:38:07 +0000
3+++ NEWS.txt 2015-08-03 08:35:57 +0000
4@@ -1,6 +1,10 @@
5 News
6 ====
7
8+0.13
9+----
10+* Support and require celery >= 3.0.
11+
12 0.12
13 ----
14 * Only run the job if its is_runnable property is True.
15
16=== modified file 'buildout.cfg'
17--- buildout.cfg 2015-08-03 00:34:03 +0000
18+++ buildout.cfg 2015-08-03 08:35:57 +0000
19@@ -22,8 +22,8 @@
20 eggs = ${buildout:eggs}
21
22 [versions]
23-celery = 2.5.1
24+celery = 3.1.18
25 keyring = 0.6.2
26-kombu = 2.1.1
27+kombu = 3.0.26
28 zc.recipe.egg = 1.3.2
29 z3c.recipe.scripts = 1.0.1
30
31=== modified file 'setup.py'
32--- setup.py 2013-06-13 05:22:01 +0000
33+++ setup.py 2015-08-03 08:35:57 +0000
34@@ -22,13 +22,13 @@
35 NEWS = open(os.path.join(here, 'NEWS.txt')).read()
36
37
38-version = '0.12'
39+version = '0.13'
40
41 install_requires = [
42 # List your project dependencies here.
43 # For more details, see:
44 # http://packages.python.org/distribute/setuptools.html#declaring-dependencies
45- 'celery',
46+ 'celery>=3.0',
47 ]
48
49
50
51=== modified file 'src/lazr/jobrunner/bin/clear_queues.py'
52--- src/lazr/jobrunner/bin/clear_queues.py 2012-07-04 16:34:25 +0000
53+++ src/lazr/jobrunner/bin/clear_queues.py 2015-08-03 08:35:57 +0000
54@@ -22,7 +22,8 @@
55 from argparse import ArgumentParser
56 import os
57 import sys
58-from amqplib.client_0_8.exceptions import AMQPChannelException
59+
60+import amqp
61
62
63 def show_queue_data(body, message):
64@@ -51,13 +52,8 @@
65 drain_queues(
66 RunJob.app, [queue], callbacks=[show_queue_data],
67 retain=True, passive_queues=True)
68- except AMQPChannelException as exc:
69- if exc.amqp_reply_code == 404:
70- # Unknown queue name specified; amqp_reply_text is
71- # self-explaining.
72- print >>sys.stderr, exc.amqp_reply_text
73- else:
74- raise
75+ except amqp.exceptions.NotFound as exc:
76+ print >>sys.stderr, exc.reply_text
77
78
79 def main():
80
81=== modified file 'src/lazr/jobrunner/bin/jobrunnerctl.py'
82--- src/lazr/jobrunner/bin/jobrunnerctl.py 2012-05-09 15:08:25 +0000
83+++ src/lazr/jobrunner/bin/jobrunnerctl.py 2015-08-03 08:35:57 +0000
84@@ -21,7 +21,7 @@
85 import os
86 import sys
87
88-from celery.bin.celeryd_multi import MultiTool
89+from celery.bin.multi import MultiTool
90
91
92 class JobRunnerCtl(MultiTool):
93
94=== modified file 'src/lazr/jobrunner/celerytask.py'
95--- src/lazr/jobrunner/celerytask.py 2013-06-14 00:38:07 +0000
96+++ src/lazr/jobrunner/celerytask.py 2015-08-03 08:35:57 +0000
97@@ -21,7 +21,7 @@
98 from socket import timeout
99
100 from celery.task import Task
101-from kombu import Consumer, Exchange, Queue
102+from kombu import Consumer
103
104 from lazr.jobrunner.jobrunner import (
105 JobRunner,
106@@ -97,18 +97,18 @@
107 if callbacks is None:
108 callbacks = [lambda x, y: None]
109 bindings = []
110- router = app.amqp.Router(create_missing=True)
111+ router = app.amqp.Router(
112+ create_missing=True,
113+ queues=app.amqp.Queues(app.conf.CELERY_QUEUES, create_missing=True))
114 for queue_name in queue_names:
115 destination = router.expand_destination(queue_name)
116- exchange = Exchange(destination['exchange'])
117- queue = Queue(queue_name, exchange=exchange)
118- bindings.append(queue)
119+ bindings.append(destination['queue'])
120 with app.broker_connection() as connection:
121 # The no_ack flag is misleadingly named.
122 # See: https://github.com/ask/kombu/issues/130
123 consumer = Consumer(
124 connection, bindings, callbacks=callbacks, no_ack=not retain,
125- auto_declare=not passive_queues)
126+ auto_declare=not passive_queues, accept=['json', 'pickle'])
127 if passive_queues:
128 # This is basically copied from kombu.Queue.declare().
129 # We can't use this method directly because queue_declare()
130
131=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
132--- src/lazr/jobrunner/tests/test_celerytask.py 2012-07-09 15:53:48 +0000
133+++ src/lazr/jobrunner/tests/test_celerytask.py 2015-08-03 08:35:57 +0000
134@@ -69,11 +69,11 @@
135 proc.wait()
136
137
138-def celeryd(config_module, file_job_dir, queue='celery'):
139- cmd_args = ('--config', config_module, '--queue', queue)
140+def celery_worker(config_module, file_job_dir, queue='celery'):
141+ cmd_args = ('worker', '--config', config_module, '--queue', queue)
142 environ = dict(os.environ)
143 environ['FILE_JOB_DIR'] = file_job_dir
144- return running('bin/celeryd', cmd_args, environ, cwd=get_root())
145+ return running('bin/celery', cmd_args, environ, cwd=get_root())
146
147
148 @contextlib.contextmanager
149@@ -342,7 +342,7 @@
150 result = RunFileJob.delay(10)
151 self.assertIs(None, js.get_output(job))
152 self.assertEqual(JobStatus.WAITING, job.status)
153- with celeryd('lazr.jobrunner.tests.config1', temp_dir):
154+ with celery_worker('lazr.jobrunner.tests.config1', temp_dir):
155 result.wait(10)
156 job = js.get(job.job_id)
157 self.assertEqual('my_output', js.get_output(job))
158@@ -354,7 +354,7 @@
159 job = FileJob(js, 10, **kwargs)
160 job.save()
161 result = RunFileJob.apply_async(args=(10, ), queue=queue)
162- with celeryd(config, temp_dir, queue) as proc:
163+ with celery_worker(config, temp_dir, queue) as proc:
164 try:
165 result.wait(10)
166 except SoftTimeLimitExceeded:
167@@ -373,7 +373,7 @@
168 job = FileJob(js, 10, **kwargs)
169 job.save()
170 RunFileJobNoResult.apply_async(args=(10, ), queue=queue)
171- with celeryd(config, temp_dir, queue) as proc:
172+ with celery_worker(config, temp_dir, queue) as proc:
173 sleep(wait_time)
174 job = js.get(job.job_id)
175 return job, js, proc
176@@ -393,7 +393,7 @@
177 """Raises exception when a job exceeds the configured time limit."""
178 with tempdir() as temp_dir:
179 job, js, proc = self.run_file_job_ignore_result(
180- temp_dir, wait_time=2,
181+ temp_dir, wait_time=5,
182 config='lazr.jobrunner.tests.time_limit_config',
183 sleep=3)
184 self.assertEqual(JobStatus.FAILED, job.status)
185@@ -405,7 +405,7 @@
186 # If a fast and a slow lane are configured, jobs which time out
187 # in the fast lane are queued again in the slow lane.
188 with tempdir() as temp_dir:
189- with celeryd(
190+ with celery_worker(
191 'lazr.jobrunner.tests.time_limit_config_slow_lane',
192 temp_dir, queue='standard_slow'):
193 # The fast lane times out after one second; the job
194@@ -427,7 +427,7 @@
195 # If a fast and a slow lane are configured, jobs which time out
196 # in the fast lane are queued again in the slow lane.
197 with tempdir() as temp_dir:
198- with celeryd(
199+ with celery_worker(
200 'lazr.jobrunner.tests.time_limit_config_slow_lane',
201 temp_dir, queue='standard_slow'):
202 # The fast lane times out after one second; the job
203@@ -501,7 +501,7 @@
204 stdout, stderr = proc.communicate()
205 return stdout, stderr
206
207- def invokeJob(self, celery_config, task, delay=1, job_args={}):
208+ def invokeJob(self, celery_config, task, delay=5, job_args={}):
209 """Run the given task.
210
211 :return: The name of the result queue.
212@@ -511,7 +511,7 @@
213 job = FileJob(js, 11, **job_args)
214 job.save()
215 task_info = task.apply_async(args=(11, ))
216- with celeryd(celery_config, temp_dir):
217+ with celery_worker(celery_config, temp_dir):
218 # Wait just long enough so that celeryd can start and
219 # process the job.
220 sleep(delay)
221@@ -520,7 +520,8 @@
222 def successMessage(self, task_id):
223 return (
224 "%s: {'status': 'SUCCESS', 'traceback': None, 'result': None, "
225- "'task_id': '%s'}\n" % (self.queueName(task_id), task_id))
226+ "'task_id': '%s', 'children': []}\n"
227+ % (self.queueName(task_id), task_id))
228
229 def noQueueMessage(self, task_id):
230 return (
231
232=== modified file 'src/lazr/jobrunner/tests/test_jobrunnerctl.py'
233--- src/lazr/jobrunner/tests/test_jobrunnerctl.py 2012-05-09 12:18:45 +0000
234+++ src/lazr/jobrunner/tests/test_jobrunnerctl.py 2015-08-03 08:35:57 +0000
235@@ -23,7 +23,7 @@
236 from time import sleep
237 from unittest import TestCase
238
239-from celery.bin.celeryd_multi import NamespacedOptionParser
240+from celery.bin.multi import NamespacedOptionParser
241 from lazr.jobrunner.bin.jobrunnerctl import JobRunnerCtl
242 from lazr.jobrunner.tests.test_celerytask import (
243 FileJob,
244@@ -60,25 +60,26 @@
245 job.save()
246 return RunFileJob.apply_async(args=(job_id, ), eta=eta)
247
248- def test_JobRunnerCtl_starts_stops_celeryd(self):
249+ def test_JobRunnerCtl_starts_stops_celery_worker(self):
250 with tempdir() as temp_dir:
251 config = 'lazr.jobrunner.tests.config_no_prefetch'
252 control = self.getController(config, temp_dir)
253 argv = [
254- '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
255+ 'worker', '--config=%s' % config, 'node_name', '-Q:node_name',
256+ 'celery',
257 ]
258 parser = NamespacedOptionParser(argv)
259 # We may have a stale PID file.
260- old_pids = [info[2] for info in control.getpids(parser, 'celeryd')]
261- control.start(argv, 'celeryd')
262+ old_pids = [info[2] for info in control.getpids(parser, 'celery')]
263+ control.start(argv, 'celery')
264 sleep(1)
265 current_pids = [
266- info[2] for info in control.getpids(parser, 'celeryd')]
267+ info[2] for info in control.getpids(parser, 'celery')]
268 self.assertTrue(len(current_pids) > 0)
269 self.assertNotEqual(old_pids, current_pids)
270 for pid in current_pids:
271 self.assertTrue(control.node_alive(pid))
272- control.kill(argv, 'celeryd')
273+ control.kill(argv, 'celery')
274 sleep(1)
275 for pid in current_pids:
276 self.assertFalse(control.node_alive(pid))
277@@ -98,13 +99,13 @@
278 '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
279 '-c:node_name', '1',
280 ]
281- control.start(argv, 'celeryd')
282- control.kill(argv, 'celeryd')
283+ control.start(argv, 'celery')
284+ control.kill(argv, 'celery')
285 sleep(1)
286- control.start(argv, 'celeryd')
287+ control.start(argv, 'celery')
288 for job in all_jobs:
289 job.wait(10)
290- control.kill(argv, 'celeryd')
291+ control.kill(argv, 'celery')
292
293 def test_JobRunnerCtl_kill_does_not_lose_jobs_with_eta(self):
294 with tempdir() as temp_dir:
295@@ -119,11 +120,11 @@
296 '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
297 '-c:node_name', '1',
298 ]
299- control.start(argv, 'celeryd')
300- sleep(1)
301- control.kill(argv, 'celeryd')
302- sleep(1)
303- control.start(argv, 'celeryd')
304+ control.start(argv, 'celery')
305+ sleep(1)
306+ control.kill(argv, 'celery')
307+ sleep(1)
308+ control.start(argv, 'celery')
309 for job in all_jobs:
310 job.wait(10)
311- control.kill(argv, 'celeryd')
312+ control.kill(argv, 'celery')
313
314=== modified file 'src/lazr/jobrunner/version.txt'
315--- src/lazr/jobrunner/version.txt 2013-06-13 05:22:01 +0000
316+++ src/lazr/jobrunner/version.txt 2015-08-03 08:35:57 +0000
317@@ -1,1 +1,1 @@
318-0.12
319+0.13

Subscribers

People subscribed via source and target branches

to all changes: