Merge lp:~cjwatson/lazr.jobrunner/celery-4 into lp:lazr.jobrunner

Proposed by Colin Watson
Status: Merged
Merged at revision: 56
Proposed branch: lp:~cjwatson/lazr.jobrunner/celery-4
Merge into: lp:lazr.jobrunner
Prerequisite: lp:~cjwatson/lazr.jobrunner/oops-0.0.11
Diff against target: 271 lines (+86/-41)
11 files modified
NEWS.txt (+1/-0)
setup.py (+1/-1)
src/lazr/jobrunner/celeryconfig.py (+1/-6)
src/lazr/jobrunner/celerytask.py (+7/-1)
src/lazr/jobrunner/tests/config_no_prefetch.py (+1/-1)
src/lazr/jobrunner/tests/config_two_queues.py (+3/-3)
src/lazr/jobrunner/tests/simple_config.py (+1/-1)
src/lazr/jobrunner/tests/test_celerytask.py (+13/-8)
src/lazr/jobrunner/tests/test_jobrunnerctl.py (+51/-18)
src/lazr/jobrunner/tests/time_limit_config.py (+1/-1)
tox.ini (+6/-1)
To merge this branch: bzr merge lp:~cjwatson/lazr.jobrunner/celery-4
Reviewer: William Grant
Review type: code
Review status: Approve
Review via email: mp+364032@code.launchpad.net

Commit message

Support celery >= 4.0.
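
The main behavioural change handled here is celery 4's switch to message protocol v2: the task name moves out of the message body into the AMQP headers, and the body becomes an (args, kwargs, embed) tuple instead of a dict. A minimal sketch of the detection used in celerytask.py's queue-listing callback, mirroring the add_listing hunk below (the helper name task_listing_entry is illustrative only; body and message are the two arguments kombu passes to consumer callbacks):

    def task_listing_entry(body, message):
        """Return (task_name, args) for one queued task message."""
        try:
            # celery >= 4.0 (message protocol v2): the task name lives in
            # the AMQP application headers; body is (args, kwargs, embed).
            return (
                message.properties['application_headers']['task'],
                tuple(body[0]))
        except (AttributeError, KeyError):
            # celery 3.x (message protocol v1): name and args are both in
            # the body dict.
            return (body['task'], body['args'])

The same "try the celery 4 API, fall back to the celery 3 one" pattern is used for the worker-control helpers (getpids/start/kill/node_alive) in the jobrunnerctl tests, so a single source tree stays compatible with every series pinned in tox.ini.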

William Grant (wgrant):
review: Approve (code)

Preview Diff

1=== modified file 'NEWS.txt'
2--- NEWS.txt 2019-03-06 12:53:46 +0000
3+++ NEWS.txt 2019-03-06 12:53:46 +0000
4@@ -5,6 +5,7 @@
5 ----------
6 * Add tox testing support.
7 * Support and require oops >= 0.0.11.
8+* Support celery >= 4.0.
9
10 0.13
11 ----
12
13=== modified file 'setup.py'
14--- setup.py 2019-03-06 12:53:46 +0000
15+++ setup.py 2019-03-06 12:53:46 +0000
16@@ -28,7 +28,7 @@
17 # List your project dependencies here.
18 # For more details, see:
19 # http://packages.python.org/distribute/setuptools.html#declaring-dependencies
20- 'celery>=3.0',
21+ 'celery>=3.0,<5.0',
22 ]
23
24 tests_require = [
25
26=== modified file 'src/lazr/jobrunner/celeryconfig.py'
27--- src/lazr/jobrunner/celeryconfig.py 2012-05-10 14:18:11 +0000
28+++ src/lazr/jobrunner/celeryconfig.py 2019-03-06 12:53:46 +0000
29@@ -1,8 +1,3 @@
30-#BROKER_PORT = 5672
31-#BROKER_USER = "guest"
32-#BROKER_PASSWORD = "guest"
33-
34-BROKER_VHOST = "/"
35+BROKER_URL = "amqp://"
36 CELERY_RESULT_BACKEND = "amqp"
37 CELERY_IMPORTS = ("lazr.jobrunner.jobrunner", )
38-CELERYD_LOG_LEVEL = 'INFO'
39
40=== modified file 'src/lazr/jobrunner/celerytask.py'
41--- src/lazr/jobrunner/celerytask.py 2015-08-03 05:44:51 +0000
42+++ src/lazr/jobrunner/celerytask.py 2019-03-06 12:53:46 +0000
43@@ -77,7 +77,13 @@
44 listings = []
45
46 def add_listing(body, message):
47- listings.append((body['task'], body['args']))
48+ try:
49+ # celery >= 4.0.0
50+ listings.append((
51+ message.properties['application_headers']['task'],
52+ tuple(body[0])))
53+ except (AttributeError, KeyError):
54+ listings.append((body['task'], body['args']))
55
56 drain_queues(app, queue_names, callbacks=[add_listing], retain=True)
57 return listings
58
59=== modified file 'src/lazr/jobrunner/tests/config_no_prefetch.py'
60--- src/lazr/jobrunner/tests/config_no_prefetch.py 2012-05-09 11:50:26 +0000
61+++ src/lazr/jobrunner/tests/config_no_prefetch.py 2019-03-06 12:53:46 +0000
62@@ -1,4 +1,4 @@
63-BROKER_VHOST = "/"
64+BROKER_URL = "amqp://"
65 CELERY_RESULT_BACKEND = "amqp"
66 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
67 CELERYD_CONCURRENCY = 1
68
69=== modified file 'src/lazr/jobrunner/tests/config_two_queues.py'
70--- src/lazr/jobrunner/tests/config_two_queues.py 2012-04-10 12:44:00 +0000
71+++ src/lazr/jobrunner/tests/config_two_queues.py 2019-03-06 12:53:46 +0000
72@@ -1,10 +1,10 @@
73-BROKER_VHOST = "/"
74+BROKER_URL = "amqp://"
75 CELERY_RESULT_BACKEND = "amqp"
76 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
77 CELERYD_CONCURRENCY = 1
78 CELERY_QUEUES = {
79- "standard": {"binding_key": "job.standard"},
80- "standard_slow": {"binding_key": "job.standard.slow"},
81+ "standard": {"routing_key": "job.standard"},
82+ "standard_slow": {"routing_key": "job.standard.slow"},
83 }
84 CELERY_DEFAULT_EXCHANGE = "standard"
85 CELERY_DEFAULT_QUEUE = "standard"
86
87=== modified file 'src/lazr/jobrunner/tests/simple_config.py'
88--- src/lazr/jobrunner/tests/simple_config.py 2012-07-09 10:58:00 +0000
89+++ src/lazr/jobrunner/tests/simple_config.py 2019-03-06 12:53:46 +0000
90@@ -1,4 +1,4 @@
91-BROKER_VHOST = "/"
92+BROKER_URL = "amqp://"
93 CELERY_RESULT_BACKEND = "amqp"
94 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
95 CELERYD_CONCURRENCY = 1
96
97=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
98--- src/lazr/jobrunner/tests/test_celerytask.py 2019-03-06 12:53:46 +0000
99+++ src/lazr/jobrunner/tests/test_celerytask.py 2019-03-06 12:53:46 +0000
100@@ -60,17 +60,22 @@
101
102 @contextlib.contextmanager
103 def running(cmd_name, cmd_args, env=None, cwd=None):
104- proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
105- stderr=subprocess.PIPE, cwd=cwd)
106- try:
107- yield proc
108- finally:
109- proc.terminate()
110- proc.wait()
111+ with open("/dev/null", "w") as devnull:
112+ proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
113+ stdout=devnull, stderr=subprocess.PIPE,
114+ cwd=cwd)
115+ try:
116+ yield proc
117+ finally:
118+ proc.terminate()
119+ proc.wait()
120
121
122 def celery_worker(config_module, file_job_dir, queue='celery'):
123- cmd_args = ('worker', '--config', config_module, '--queue', queue)
124+ cmd_args = (
125+ 'worker', '--config', config_module, '--queue', queue,
126+ '--loglevel', 'INFO',
127+ )
128 environ = dict(os.environ)
129 environ['FILE_JOB_DIR'] = file_job_dir
130 return running('celery', cmd_args, environ, cwd=get_root())
131
132=== modified file 'src/lazr/jobrunner/tests/test_jobrunnerctl.py'
133--- src/lazr/jobrunner/tests/test_jobrunnerctl.py 2015-08-03 05:28:51 +0000
134+++ src/lazr/jobrunner/tests/test_jobrunnerctl.py 2019-03-06 12:53:46 +0000
135@@ -60,6 +60,41 @@
136 job.save()
137 return RunFileJob.apply_async(args=(job_id, ), eta=eta)
138
139+ def getpids(self, control, argv):
140+ if getattr(control, 'cluster_from_argv', None) is not None:
141+ # celery >= 4.0.0
142+ cluster = control.cluster_from_argv(argv)
143+ return [node.pid for node in cluster.getpids()]
144+ else:
145+ parser = NamespacedOptionParser(argv)
146+ return [info[2] for info in control.getpids(parser, 'celery')]
147+
148+ def start(self, control, argv):
149+ if getattr(control, 'Cluster', None) is not None:
150+ # celery >= 4.0.0
151+ control.start(*argv)
152+ else:
153+ control.start(argv, 'celery')
154+
155+ def kill(self, control, argv):
156+ if getattr(control, 'Cluster', None) is not None:
157+ # celery >= 4.0.0
158+ control.kill(*argv)
159+ else:
160+ control.kill(argv, 'celery')
161+
162+ def node_alive(self, control, argv, pid):
163+ if getattr(control, 'Cluster', None) is not None:
164+ # celery >= 4.0.0
165+ cluster = control.cluster_from_argv(argv)
166+ for node in cluster:
167+ if node.pid == pid:
168+ return node.alive()
169+ else:
170+ return False
171+ else:
172+ return control.node_alive(pid)
173+
174 def test_JobRunnerCtl_starts_stops_celery_worker(self):
175 with tempdir() as temp_dir:
176 config = 'lazr.jobrunner.tests.config_no_prefetch'
177@@ -68,21 +103,19 @@
178 'worker', '--config=%s' % config, 'node_name', '-Q:node_name',
179 'celery',
180 ]
181- parser = NamespacedOptionParser(argv)
182 # We may have a stale PID file.
183- old_pids = [info[2] for info in control.getpids(parser, 'celery')]
184- control.start(argv, 'celery')
185+ old_pids = self.getpids(control, argv)
186+ self.start(control, argv)
187 sleep(1)
188- current_pids = [
189- info[2] for info in control.getpids(parser, 'celery')]
190+ current_pids = self.getpids(control, argv)
191 self.assertTrue(len(current_pids) > 0)
192 self.assertNotEqual(old_pids, current_pids)
193 for pid in current_pids:
194- self.assertTrue(control.node_alive(pid))
195- control.kill(argv, 'celery')
196+ self.assertTrue(self.node_alive(control, argv, pid))
197+ self.kill(control, argv)
198 sleep(1)
199 for pid in current_pids:
200- self.assertFalse(control.node_alive(pid))
201+ self.assertFalse(self.node_alive(control, argv, pid))
202
203 def test_JobRunnerCtl_kill_does_not_lose_jobs(self):
204 # If a celeryd instance is killed while it executes a task
205@@ -99,13 +132,13 @@
206 '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
207 '-c:node_name', '1',
208 ]
209- control.start(argv, 'celery')
210- control.kill(argv, 'celery')
211+ self.start(control, argv)
212+ self.kill(control, argv)
213 sleep(1)
214- control.start(argv, 'celery')
215+ self.start(control, argv)
216 for job in all_jobs:
217 job.wait(10)
218- control.kill(argv, 'celery')
219+ self.kill(control, argv)
220
221 def test_JobRunnerCtl_kill_does_not_lose_jobs_with_eta(self):
222 with tempdir() as temp_dir:
223@@ -120,11 +153,11 @@
224 '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
225 '-c:node_name', '1',
226 ]
227- control.start(argv, 'celery')
228- sleep(1)
229- control.kill(argv, 'celery')
230- sleep(1)
231- control.start(argv, 'celery')
232+ self.start(control, argv)
233+ sleep(1)
234+ self.kill(control, argv)
235+ sleep(1)
236+ self.start(control, argv)
237 for job in all_jobs:
238 job.wait(10)
239- control.kill(argv, 'celery')
240+ self.kill(control, argv)
241
242=== modified file 'src/lazr/jobrunner/tests/time_limit_config.py'
243--- src/lazr/jobrunner/tests/time_limit_config.py 2012-03-21 20:38:50 +0000
244+++ src/lazr/jobrunner/tests/time_limit_config.py 2019-03-06 12:53:46 +0000
245@@ -1,4 +1,4 @@
246-BROKER_VHOST = "/"
247+BROKER_URL = "amqp://"
248 CELERY_RESULT_BACKEND = "amqp"
249 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
250 CELERYD_CONCURRENCY = 1
251
252=== modified file 'tox.ini'
253--- tox.ini 2019-03-06 12:53:46 +0000
254+++ tox.ini 2019-03-06 12:53:46 +0000
255@@ -1,6 +1,6 @@
256 [tox]
257 envlist =
258- py27-celery31
259+ py27-celery{31,40,41,42}
260
261 [testenv]
262 commands =
263@@ -9,3 +9,8 @@
264 .[test]
265 zope.testrunner
266 celery31: celery>=3.1,<4.0
267+ celery40: celery>=4.0,<4.1
268+ # https://github.com/celery/kombu/issues/870
269+ celery40: kombu<4.2
270+ celery41: celery>=4.1,<4.2
271+ celery42: celery>=4.2,<4.3
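
With the widened envlist above, the compatibility matrix can be exercised one series at a time, e.g. "tox -e py27-celery31" or "tox -e py27-celery42", or all at once with a bare "tox" run; the celery40 environment additionally pins kombu < 4.2 to work around the kombu issue linked in the diff.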
