Merge lp:~cjwatson/lazr.jobrunner/celery-4 into lp:lazr.jobrunner

Proposed by Colin Watson
Status: Merged
Merged at revision: 56
Proposed branch: lp:~cjwatson/lazr.jobrunner/celery-4
Merge into: lp:lazr.jobrunner
Prerequisite: lp:~cjwatson/lazr.jobrunner/oops-0.0.11
Diff against target: 271 lines (+86/-41)
11 files modified
NEWS.txt (+1/-0)
setup.py (+1/-1)
src/lazr/jobrunner/celeryconfig.py (+1/-6)
src/lazr/jobrunner/celerytask.py (+7/-1)
src/lazr/jobrunner/tests/config_no_prefetch.py (+1/-1)
src/lazr/jobrunner/tests/config_two_queues.py (+3/-3)
src/lazr/jobrunner/tests/simple_config.py (+1/-1)
src/lazr/jobrunner/tests/test_celerytask.py (+13/-8)
src/lazr/jobrunner/tests/test_jobrunnerctl.py (+51/-18)
src/lazr/jobrunner/tests/time_limit_config.py (+1/-1)
tox.ini (+6/-1)
To merge this branch: bzr merge lp:~cjwatson/lazr.jobrunner/celery-4
Reviewer: William Grant
Review type: code
Status: Approve
Review via email: mp+364032@code.launchpad.net

Commit message

Support celery >= 4.0.

William Grant (wgrant):
review: Approve (code)

Preview Diff

=== modified file 'NEWS.txt'
--- NEWS.txt 2019-03-06 12:53:46 +0000
+++ NEWS.txt 2019-03-06 12:53:46 +0000
@@ -5,6 +5,7 @@
 ----------
 * Add tox testing support.
 * Support and require oops >= 0.0.11.
+* Support celery >= 4.0.
 
 0.13
 ----

=== modified file 'setup.py'
--- setup.py 2019-03-06 12:53:46 +0000
+++ setup.py 2019-03-06 12:53:46 +0000
@@ -28,7 +28,7 @@
     # List your project dependencies here.
     # For more details, see:
     # http://packages.python.org/distribute/setuptools.html#declaring-dependencies
-    'celery>=3.0',
+    'celery>=3.0,<5.0',
 ]
 
 tests_require = [

=== modified file 'src/lazr/jobrunner/celeryconfig.py'
--- src/lazr/jobrunner/celeryconfig.py 2012-05-10 14:18:11 +0000
+++ src/lazr/jobrunner/celeryconfig.py 2019-03-06 12:53:46 +0000
@@ -1,8 +1,3 @@
-#BROKER_PORT = 5672
-#BROKER_USER = "guest"
-#BROKER_PASSWORD = "guest"
-
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.jobrunner", )
-CELERYD_LOG_LEVEL = 'INFO'

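Celery 4 dropped the split BROKER_HOST/BROKER_PORT/BROKER_USER/BROKER_PASSWORD/BROKER_VHOST settings, so this config (and the test configs below) move to the single BROKER_URL form, which celery 3.1 also accepts. For a non-default broker the same setting carries a full URL; purely as an illustration (these values are the stock RabbitMQ defaults, not something this branch configures):

    BROKER_URL = "amqp://guest:guest@localhost:5672//"
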
=== modified file 'src/lazr/jobrunner/celerytask.py'
--- src/lazr/jobrunner/celerytask.py 2015-08-03 05:44:51 +0000
+++ src/lazr/jobrunner/celerytask.py 2019-03-06 12:53:46 +0000
@@ -77,7 +77,13 @@
     listings = []
 
     def add_listing(body, message):
-        listings.append((body['task'], body['args']))
+        try:
+            # celery >= 4.0.0
+            listings.append((
+                message.properties['application_headers']['task'],
+                tuple(body[0])))
+        except (AttributeError, KeyError):
+            listings.append((body['task'], body['args']))
 
     drain_queues(app, queue_names, callbacks=[add_listing], retain=True)
     return listings

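The add_listing change above copes with celery 4's message protocol v2, where the task name travels in the message headers and the body is an (args, kwargs, embed) tuple rather than a dict. A standalone sketch of the same fallback, outside the closure (task_and_args is an illustrative name, not part of the branch):

    def task_and_args(body, message):
        try:
            # celery >= 4.0.0: protocol v2 carries the task name in the
            # message headers and the positional args in body[0].
            headers = message.properties['application_headers']
            return headers['task'], tuple(body[0])
        except (AttributeError, KeyError):
            # celery < 4.0.0: protocol v1 keeps both in the body dict.
            return body['task'], body['args']
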
=== modified file 'src/lazr/jobrunner/tests/config_no_prefetch.py'
--- src/lazr/jobrunner/tests/config_no_prefetch.py 2012-05-09 11:50:26 +0000
+++ src/lazr/jobrunner/tests/config_no_prefetch.py 2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
 CELERYD_CONCURRENCY = 1

=== modified file 'src/lazr/jobrunner/tests/config_two_queues.py'
--- src/lazr/jobrunner/tests/config_two_queues.py 2012-04-10 12:44:00 +0000
+++ src/lazr/jobrunner/tests/config_two_queues.py 2019-03-06 12:53:46 +0000
@@ -1,10 +1,10 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
 CELERYD_CONCURRENCY = 1
 CELERY_QUEUES = {
-    "standard": {"binding_key": "job.standard"},
-    "standard_slow": {"binding_key": "job.standard.slow"},
+    "standard": {"routing_key": "job.standard"},
+    "standard_slow": {"routing_key": "job.standard.slow"},
     }
 CELERY_DEFAULT_EXCHANGE = "standard"
 CELERY_DEFAULT_QUEUE = "standard"

=== modified file 'src/lazr/jobrunner/tests/simple_config.py'
--- src/lazr/jobrunner/tests/simple_config.py 2012-07-09 10:58:00 +0000
+++ src/lazr/jobrunner/tests/simple_config.py 2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
 CELERYD_CONCURRENCY = 1

=== modified file 'src/lazr/jobrunner/tests/test_celerytask.py'
--- src/lazr/jobrunner/tests/test_celerytask.py 2019-03-06 12:53:46 +0000
+++ src/lazr/jobrunner/tests/test_celerytask.py 2019-03-06 12:53:46 +0000
@@ -60,17 +60,22 @@
 
 @contextlib.contextmanager
 def running(cmd_name, cmd_args, env=None, cwd=None):
-    proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
-                            stderr=subprocess.PIPE, cwd=cwd)
-    try:
-        yield proc
-    finally:
-        proc.terminate()
-        proc.wait()
+    with open("/dev/null", "w") as devnull:
+        proc = subprocess.Popen((cmd_name,) + cmd_args, env=env,
+                                stdout=devnull, stderr=subprocess.PIPE,
+                                cwd=cwd)
+        try:
+            yield proc
+        finally:
+            proc.terminate()
+            proc.wait()
 
 
 def celery_worker(config_module, file_job_dir, queue='celery'):
-    cmd_args = ('worker', '--config', config_module, '--queue', queue)
+    cmd_args = (
+        'worker', '--config', config_module, '--queue', queue,
+        '--loglevel', 'INFO',
+        )
     environ = dict(os.environ)
     environ['FILE_JOB_DIR'] = file_job_dir
     return running('celery', cmd_args, environ, cwd=get_root())

=== modified file 'src/lazr/jobrunner/tests/test_jobrunnerctl.py'
--- src/lazr/jobrunner/tests/test_jobrunnerctl.py 2015-08-03 05:28:51 +0000
+++ src/lazr/jobrunner/tests/test_jobrunnerctl.py 2019-03-06 12:53:46 +0000
@@ -60,6 +60,41 @@
         job.save()
         return RunFileJob.apply_async(args=(job_id, ), eta=eta)
 
+    def getpids(self, control, argv):
+        if getattr(control, 'cluster_from_argv', None) is not None:
+            # celery >= 4.0.0
+            cluster = control.cluster_from_argv(argv)
+            return [node.pid for node in cluster.getpids()]
+        else:
+            parser = NamespacedOptionParser(argv)
+            return [info[2] for info in control.getpids(parser, 'celery')]
+
+    def start(self, control, argv):
+        if getattr(control, 'Cluster', None) is not None:
+            # celery >= 4.0.0
+            control.start(*argv)
+        else:
+            control.start(argv, 'celery')
+
+    def kill(self, control, argv):
+        if getattr(control, 'Cluster', None) is not None:
+            # celery >= 4.0.0
+            control.kill(*argv)
+        else:
+            control.kill(argv, 'celery')
+
+    def node_alive(self, control, argv, pid):
+        if getattr(control, 'Cluster', None) is not None:
+            # celery >= 4.0.0
+            cluster = control.cluster_from_argv(argv)
+            for node in cluster:
+                if node.pid == pid:
+                    return node.alive()
+            else:
+                return False
+        else:
+            return control.node_alive(pid)
+
     def test_JobRunnerCtl_starts_stops_celery_worker(self):
         with tempdir() as temp_dir:
             config = 'lazr.jobrunner.tests.config_no_prefetch'
@@ -68,21 +103,19 @@
             'worker', '--config=%s' % config, 'node_name', '-Q:node_name',
             'celery',
             ]
-        parser = NamespacedOptionParser(argv)
         # We may have a stale PID file.
-        old_pids = [info[2] for info in control.getpids(parser, 'celery')]
-        control.start(argv, 'celery')
+        old_pids = self.getpids(control, argv)
+        self.start(control, argv)
         sleep(1)
-        current_pids = [
-            info[2] for info in control.getpids(parser, 'celery')]
+        current_pids = self.getpids(control, argv)
         self.assertTrue(len(current_pids) > 0)
         self.assertNotEqual(old_pids, current_pids)
         for pid in current_pids:
-            self.assertTrue(control.node_alive(pid))
-        control.kill(argv, 'celery')
+            self.assertTrue(self.node_alive(control, argv, pid))
+        self.kill(control, argv)
         sleep(1)
         for pid in current_pids:
-            self.assertFalse(control.node_alive(pid))
+            self.assertFalse(self.node_alive(control, argv, pid))
 
     def test_JobRunnerCtl_kill_does_not_lose_jobs(self):
         # If a celeryd instance is killed while it executes a task
@@ -99,13 +132,13 @@
             '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
             '-c:node_name', '1',
             ]
-        control.start(argv, 'celery')
-        control.kill(argv, 'celery')
+        self.start(control, argv)
+        self.kill(control, argv)
         sleep(1)
-        control.start(argv, 'celery')
+        self.start(control, argv)
         for job in all_jobs:
             job.wait(10)
-        control.kill(argv, 'celery')
+        self.kill(control, argv)
 
     def test_JobRunnerCtl_kill_does_not_lose_jobs_with_eta(self):
         with tempdir() as temp_dir:
@@ -120,11 +153,11 @@
             '--config=%s' % config, 'node_name', '-Q:node_name', 'celery',
             '-c:node_name', '1',
             ]
-        control.start(argv, 'celery')
+        self.start(control, argv)
         sleep(1)
-        control.kill(argv, 'celery')
+        self.kill(control, argv)
         sleep(1)
-        control.start(argv, 'celery')
+        self.start(control, argv)
         for job in all_jobs:
             job.wait(10)
-        control.kill(argv, 'celery')
+        self.kill(control, argv)

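The four helpers added above bridge the celery 4 rewrite of the multi tool: MultiTool now exposes Cluster and cluster_from_argv and its start/kill take a plain argv, whereas celery 3.1 expects a NamespacedOptionParser plus an explicit command name. A standalone sketch of the same probe, outside the test class (worker_pids is an illustrative name, and the NamespacedOptionParser import is shown at its celery 3.1 location):

    def worker_pids(control, argv):
        # 'control' is a celery.bin.multi.MultiTool (or subclass) instance.
        if getattr(control, 'cluster_from_argv', None) is not None:
            # celery >= 4.0.0: argv parses into a Cluster of Node objects.
            cluster = control.cluster_from_argv(argv)
            return [node.pid for node in cluster.getpids()]
        else:
            # celery 3.1: parse argv ourselves, then ask MultiTool for PIDs.
            from celery.bin.multi import NamespacedOptionParser
            parser = NamespacedOptionParser(argv)
            return [info[2] for info in control.getpids(parser, 'celery')]
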
=== modified file 'src/lazr/jobrunner/tests/time_limit_config.py'
--- src/lazr/jobrunner/tests/time_limit_config.py 2012-03-21 20:38:50 +0000
+++ src/lazr/jobrunner/tests/time_limit_config.py 2019-03-06 12:53:46 +0000
@@ -1,4 +1,4 @@
-BROKER_VHOST = "/"
+BROKER_URL = "amqp://"
 CELERY_RESULT_BACKEND = "amqp"
 CELERY_IMPORTS = ("lazr.jobrunner.tests.test_celerytask", )
 CELERYD_CONCURRENCY = 1

=== modified file 'tox.ini'
--- tox.ini 2019-03-06 12:53:46 +0000
+++ tox.ini 2019-03-06 12:53:46 +0000
@@ -1,6 +1,6 @@
 [tox]
 envlist =
-    py27-celery31
+    py27-celery{31,40,41,42}
 
 [testenv]
 commands =
@@ -9,3 +9,8 @@
     .[test]
     zope.testrunner
     celery31: celery>=3.1,<4.0
+    celery40: celery>=4.0,<4.1
+    # https://github.com/celery/kombu/issues/870
+    celery40: kombu<4.2
+    celery41: celery>=4.1,<4.2
+    celery42: celery>=4.2,<4.3
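
With these factors in place a single combination can be exercised locally with, for example, tox -e py27-celery40; note that the celery40 factor also pins kombu below 4.2 to work around the kombu issue linked above.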
