Merge lp:~adeuring/launchpad/celery-config into lp:launchpad

Proposed by Abel Deuring on 2012-04-18
Status: Merged
Approved by: Aaron Bentley on 2012-04-18
Approved revision: no longer in the source branch.
Merged at revision: 15126
Proposed branch: lp:~adeuring/launchpad/celery-config
Merge into: lp:launchpad
Diff against target: 303 lines (+270/-15)
3 files modified
lib/lp/services/config/schema-lazr.conf (+35/-0)
lib/lp/services/job/celeryconfig.py (+87/-15)
lib/lp/services/job/tests/test_celery_configuration.py (+148/-0)
To merge this branch: bzr merge lp:~adeuring/launchpad/celery-config
Reviewer                   Review Type  Date Requested  Status
Aaron Bentley (community)               2012-04-18      Approve on 2012-04-18
Abel Deuring (community)                                Resubmit on 2012-04-18
Review via email: mp+102535@code.launchpad.net

Commit Message

Configuration for the Celery-based job runner.

Description of the Change

This branch adds the configuration details we need for the Celery-based
job runner.

Celery reads its configuration from a Python module (we use
lp.services.job.celeryconfig). This module in turn reads the actual
configuration data from our regular lp.services.config.config module.
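As a rough illustration of that indirection, the hypothetical helper below maps a RabbitMQ "host:port" string and credentials onto the BROKER_* names Celery expects. It takes plain strings instead of Launchpad's config.rabbitmq section, and it converts the port to an int, whereas the branch passes on the string returned by split(':').

```python
def broker_settings(host_spec, userid, password, virtual_host):
    """Map RabbitMQ connection details onto Celery's BROKER_* names.

    Hypothetical helper: the branch reads these values from
    config.rabbitmq instead of taking them as arguments.
    """
    # "localhost:5672" -> ("localhost", "5672"); exactly one colon expected.
    host, port = host_spec.split(':')
    return {
        'BROKER_HOST': host,
        # Converted to int in this sketch; the branch keeps the string.
        'BROKER_PORT': int(port),
        'BROKER_USER': userid,
        'BROKER_PASSWORD': password,
        'BROKER_VHOST': virtual_host,
    }


settings = broker_settings('localhost:5672', 'guest', 'guest', '/')
```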

lp.services.job.celeryconfig already existed; I added several missing
parameters and some sanity checks.

Note that the config module is used in two different places: the app
servers load it in order to send messages to a RabbitMQ server, and
several celeryd instances load it to consume these messages and run the
jobs they describe.
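The two roles can be told apart by the command line, which is how configure() in the diff below does it: a celeryd worker is started with -Q/--queues, an app server is not. The sketch isolates that parsing step; parse_queue_option is a hypothetical name, and unlike the branch it skips argv[0] before parsing. argparse's parse_known_args() leaves options it does not recognize for celeryd, and argparse accepts unambiguous prefixes of long options by default.

```python
import argparse


def parse_queue_option(argv):
    """Return (is_worker, queue_names) for a celeryd-style command line.

    Hypothetical helper: like configure() in the branch, it treats a
    process whose argv[0] contains 'celeryd' as a worker.
    """
    # An explicit prog keeps argparse from reading sys.argv[0], and
    # add_help=False leaves -h/--help to celeryd itself.
    parser = argparse.ArgumentParser(prog='celeryconfig', add_help=False)
    parser.add_argument('-Q', '--queues')
    # parse_known_args() returns (namespace, leftovers) instead of
    # failing on options that only celeryd understands.
    args, _unknown = parser.parse_known_args(argv[1:])
    is_worker = bool(argv) and 'celeryd' in argv[0]
    queues = args.queues.split(',') if args.queues else []
    return is_worker, queues


print(parse_queue_option(['celeryd', '-Q', 'job']))  # (True, ['job'])
print(parse_queue_option(['celeryd', '--queue=job,job_slow']))
print(parse_queue_option(['']))  # (False, [])
```

The second call shows why the tests can pass --queue=...: argparse resolves it to --queues, giving (True, ['job', 'job_slow']).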

Normally, a celeryconfig module looks like this:

PARAMETER_1 = value_1
PARAMETER_2 = value_2

That is, the variables are set when the module is loaded. This makes it
difficult to test variants of the configuration, since reloading a Python
module can be quite painful. I therefore moved the setup into a function,
configure().
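A minimal sketch of the resulting pattern, with a plain dict of hypothetical queue settings standing in for Launchpad's config object and a much simplified configure(): the function only returns a dict, the module applies it to its own namespace once, and tests compare variants by calling the function again rather than reloading the module.

```python
import sys

# Hypothetical stand-in for the per-queue sections of schema-lazr.conf.
QUEUE_SETTINGS = {
    'job': {'timeout': 300, 'concurrency': 3},
    'job_slow': {'timeout': 86400, 'concurrency': 1},
}


def configure(argv, queue_settings=QUEUE_SETTINGS):
    """Return Celery settings as a dict instead of assigning globals."""
    result = {'CELERY_DEFAULT_QUEUE': 'job'}
    if argv and 'celeryd' in argv[0]:
        # Deliberately naive for the sketch: expects [..., '-Q', name].
        queue = argv[argv.index('-Q') + 1]
        settings = queue_settings[queue]
        result['CELERYD_TASK_SOFT_TIME_LIMIT'] = settings['timeout']
        result['CELERYD_CONCURRENCY'] = settings['concurrency']
    return result


# The only module-level side effect: apply the settings once, at import.
globals().update(configure(getattr(sys, 'argv', [''])))

# Tests can compare variants by calling the function; no reload needed.
fast = configure(['celeryd', '-Q', 'job'])
slow = configure(['celeryd', '-Q', 'job_slow'])
```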

The check for a circular chain of fallback queues is quite naive and
inefficient, but I think it is good enough for our purposes: we are
unlikely to have, say, 500 different queues with complex fallback
chains to check.
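For comparison, here is a sketch of the same check that records visited queues in a set, so each membership test is constant time, and that reports only the queues forming the loop. find_fallback_cycle is a hypothetical name, and it takes a plain mapping from queue name to fallback queue ('' meaning none) rather than reading config[queue].fallback_queue as the branch does.

```python
def find_fallback_cycle(fallbacks, start):
    """Return the queues on a circular fallback chain, or None.

    `fallbacks` maps each queue name to its fallback queue name, with ''
    meaning "no fallback", mirroring fallback_queue in schema-lazr.conf.
    """
    chain = []    # queues in the order visited
    seen = set()  # the same queues, for O(1) membership tests
    queue = start
    while queue != '':
        if queue in seen:
            # The chain has looped back; report the looping part only.
            return chain[chain.index(queue):]
        chain.append(queue)
        seen.add(queue)
        # In this sketch an unknown fallback queue ends the chain.
        queue = fallbacks.get(queue, '')
    return None


circular = {'job': 'job_slow', 'job_slow': 'job'}
print(find_fallback_cycle(circular, 'job'))  # ['job', 'job_slow']
print(find_fallback_cycle({'job': 'job_slow', 'job_slow': ''}, 'job'))  # None
```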

test: ./bin/test -vvt lp.services.job.tests.test_celery_configuration

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/config/schema-lazr.conf
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/tests/test_celery_configuration.py

./lib/lp/services/config/schema-lazr.conf
     501: Line exceeds 80 characters.
    1112: Line exceeds 80 characters.
    1119: Line exceeds 80 characters.
    1711: Line exceeds 80 characters.

These are lines I did not change.

Aaron Bentley (abentley) wrote :

I think we want the timeout to be 5 minutes. That's what it's currently set to, and it will ensure that ~99% of tasks run in the fast lane.

It's generally a bad idea to assume that sys.argv is defined. It won't be when Python is loaded as an extension. I've been bitten by that in Launchpad work before. If you must use sys.argv, you need to handle the case where it's not defined.

Since we expect to use binding keys that match the queue names, please don't require the binding key to be specified.
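With binding keys equal to queue names, CELERY_QUEUES can be derived from the queue list alone. The sketch below builds that dict from a space-separated string like the queues value in [job_runner_queues]; celery_queues is a hypothetical function name, and the dict shape follows the one in the diff.

```python
def celery_queues(queue_list):
    """Build a CELERY_QUEUES dict whose binding keys equal queue names.

    `queue_list` is a space-separated string such as the `queues` value
    of [job_runner_queues] in schema-lazr.conf.
    """
    # split() without arguments also tolerates repeated spaces, whereas
    # split(' ') would produce empty queue names.
    return {name: {'binding_key': name} for name in queue_list.split()}


queues = celery_queues('job job_slow branch_write_job branch_write_job_slow')
```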

I think it would be nicer if configure accepted "argv" and returned a dict. Then you could do "globals().update(configure(getattr(sys, 'argv', [''])))" to actually set it. That would make it easier to test, and also shorter. You could even pull the function into a different module so that testing didn't have to actually set the globals.

check_job_specific_celeryd_configutartion is misspelled.

review: Needs Fixing
Abel Deuring (adeuring) wrote :

fixed

review: Resubmit
Aaron Bentley (abentley) :
review: Approve

Preview Diff

=== modified file 'lib/lp/services/config/schema-lazr.conf'
--- lib/lp/services/config/schema-lazr.conf 2012-04-06 17:28:25 +0000
+++ lib/lp/services/config/schema-lazr.conf 2012-04-18 17:27:21 +0000
@@ -1955,3 +1955,38 @@
 module: lp.answers.interfaces.questionjob
 dbuser: answertracker
 crontab_group: MAIN
+
+[job_runner_queues]
+# The names of all queues.
+queues: job job_slow branch_write_job branch_write_job_slow
+
+# The main job queue.
+[job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 300
+# If a job times out, it will be queued again in the fallback queue.
+fallback_queue: job_slow
+concurrency: 3
+
+# The queue for jobs that time out in the queue "job".
+[job_slow]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 86400
+fallback_queue:
+concurrency: 1
+
+[branch_write_job]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 300
+fallback_queue: branch_write_job_slow
+concurrency: 3
+
+[branch_write_job_slow]
+# The maximum job run time in seconds. When a job runs longer, Celery
+# terminates it with a SoftTimeLimitExceeded exception.
+timeout: 86400
+fallback_queue:
+concurrency: 1

=== modified file 'lib/lp/services/job/celeryconfig.py'
--- lib/lp/services/job/celeryconfig.py 2012-04-10 20:24:43 +0000
+++ lib/lp/services/job/celeryconfig.py 2012-04-18 17:27:21 +0000
@@ -1,16 +1,88 @@
+# Copyright 2012 Canonical Ltd. This software is licensed under the
+# GNU Affero General Public License version 3 (see the file LICENSE).
+
+import argparse
+import sys
 from lp.services.config import config
-host, port = config.rabbitmq.host.split(':')
-BROKER_HOST = host
-BROKER_PORT = port
-BROKER_USER = config.rabbitmq.userid
-BROKER_PASSWORD = config.rabbitmq.password
-BROKER_VHOST = config.rabbitmq.virtual_host
-CELERY_IMPORTS = ("lp.services.job.celeryjob", )
-CELERY_RESULT_BACKEND = "amqp"
-CELERY_QUEUES = {
-    "branch_write_job": {"binding_key": "branch_write_job"},
-    "job": {"binding_key": "job"},
-}
-CELERY_DEFAULT_EXCHANGE = "job"
-CELERY_DEFAULT_QUEUE = "job"
-CELERY_CREATE_MISSING_QUEUES = False
+
+
+class ConfigurationError(Exception):
+    """Errors raised due to misconfiguration."""
+
+
+def check_circular_fallbacks(queue):
+    """Check for curcular fallback queues.
+
+    A circular chain of fallback queues could keep a job forever queued
+    if it times out in all queues.
+    """
+    linked_queues = []
+    while config[queue].fallback_queue != '':
+        linked_queues.append(queue)
+        queue = config[queue].fallback_queue
+        if queue in linked_queues:
+            raise ConfigurationError(
+                'Circular chain of fallback queues: '
+                '%s already in %s' % (queue, linked_queues))
+
+
+def configure(argv):
+    """Set the Celery parameters.
+
+    Doing this in a function is convenient for testing.
+    """
+    result = {}
+    celery_queues = {}
+    queue_names = config.job_runner_queues.queues
+    queue_names = queue_names.split(' ')
+    for queue_name in queue_names:
+        celery_queues[queue_name] = {
+            'binding_key': queue_name,
+            }
+        check_circular_fallbacks(queue_name)
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-Q', '--queues')
+    args = parser.parse_known_args(argv)
+    queues = args[0].queues
+    # A queue must be specified as a command line parameter for each
+    # celeryd instance, but this is not required for a Launchpad app server.
+    if 'celeryd' in argv[0]:
+        if queues is None or queues == '':
+            raise ConfigurationError('A queue must be specified.')
+        queues = queues.split(',')
+        # Allow only one queue per celeryd instance. More than one queue
+        # would require a check for consistent timeout values, and especially
+        # a better way to specify a fallback queue.
+        if len(queues) > 1:
+            raise ConfigurationError(
+                'A celeryd instance may serve only one queue.')
+        queue = queues[0]
+        if queue not in celery_queues:
+            raise ConfigurationError(
+                'Queue %s is not configured in schema-lazr.conf' % queue)
+        result['CELERYD_TASK_SOFT_TIME_LIMIT'] = config[queue].timeout
+        if config[queue].fallback_queue != '':
+            result['FALLBACK'] = config[queue].fallback_queue
+        result['CELERYD_CONCURRENCY'] = config[queue].concurrency
+
+    host, port = config.rabbitmq.host.split(':')
+
+    result['BROKER_HOST'] = host
+    result['BROKER_PORT'] = port
+    result['BROKER_USER'] = config.rabbitmq.userid
+    result['BROKER_PASSWORD'] = config.rabbitmq.password
+    result['BROKER_VHOST'] = config.rabbitmq.virtual_host
+    result['CELERY_CREATE_MISSING_QUEUES'] = False
+    result['CELERY_DEFAULT_EXCHANGE'] = 'job'
+    result['CELERY_DEFAULT_QUEUE'] = 'job'
+    result['CELERY_IMPORTS'] = ("lp.services.job.celeryjob", )
+    result['CELERY_QUEUES'] = celery_queues
+    result['CELERY_RESULT_BACKEND'] = 'amqp'
+    return result
+
+try:
+    globals().update(configure(getattr(sys, 'argv', [''])))
+except ConfigurationError, error:
+    print >>sys.stderr, error
+    sys.exit(1)

152=== added file 'lib/lp/services/job/tests/test_celery_configuration.py'
153--- lib/lp/services/job/tests/test_celery_configuration.py 1970-01-01 00:00:00 +0000
154+++ lib/lp/services/job/tests/test_celery_configuration.py 2012-04-18 17:27:21 +0000
155@@ -0,0 +1,148 @@
156+# Copyright 2012 Canonical Ltd. This software is licensed under the
157+# GNU Affero General Public License version 3 (see the file LICENSE).
158+
159+from contextlib import contextmanager
160+
161+from lp.services.config import config
162+from lp.testing import TestCase
163+from lp.testing.layers import RabbitMQLayer
164+
165+
166+@contextmanager
167+def changed_config(changes):
168+ config.push('test_changes', changes)
169+ yield
170+ config.pop('test_changes')
171+
172+
173+class TestCeleryConfiguration(TestCase):
174+ layer = RabbitMQLayer
175+
176+ def check_default_common_parameters(self, config):
177+ # Tests for default config values that are set for app servers
178+ # and for celeryd instances.
179+
180+ # Four queues are defined; the binding key for each queue is
181+ # just the queue name.
182+ queue_names = [
183+ 'branch_write_job', 'branch_write_job_slow', 'job', 'job_slow']
184+ queues = config['CELERY_QUEUES']
185+ self.assertEqual(queue_names, sorted(queues))
186+ for name in queue_names:
187+ self.assertEqual(name, queues[name]['binding_key'])
188+
189+ self.assertEqual('localhost', config['BROKER_HOST'])
190+ # BROKER_PORT changes between test runs, so just check that it
191+ # is defined.
192+ self.assertTrue('BROKER_PORT' in config)
193+ self.assertEqual('guest', config['BROKER_USER'])
194+ self.assertEqual('guest', config['BROKER_PASSWORD'])
195+ self.assertEqual('/', config['BROKER_VHOST'])
196+ self.assertFalse(config['CELERY_CREATE_MISSING_QUEUES'])
197+ self.assertEqual('job', config['CELERY_DEFAULT_EXCHANGE'])
198+ self.assertEqual('job', config['CELERY_DEFAULT_QUEUE'])
199+ self.assertEqual(
200+ ('lp.services.job.celeryjob', ), config['CELERY_IMPORTS'])
201+ self.assertEqual('amqp', config['CELERY_RESULT_BACKEND'])
202+
203+ def test_app_server_configuration(self):
204+ from lp.services.job.celeryconfig import configure
205+ config = configure([''])
206+ self.check_default_common_parameters(config)
207+
208+ def check_job_specific_celeryd_configuration(self, expected, config):
209+ self.check_default_common_parameters(config)
210+ self.assertEqual(
211+ expected['concurrency'], config['CELERYD_CONCURRENCY'])
212+ self.assertEqual(
213+ expected['timeout'], config['CELERYD_TASK_SOFT_TIME_LIMIT'])
214+ self.assertEqual(
215+ expected['fallback'], config.get('FALLBACK', None))
216+
217+ def test_default_celeryd_configuration_fast_lanes(self):
218+ from lp.services.job.celeryconfig import configure
219+ expected = {
220+ 'concurrency': 3,
221+ 'fallback': 'job_slow',
222+ 'timeout': 300,
223+ }
224+ config = configure(['celeryd', '-Q', 'job'])
225+ self.check_default_common_parameters(config)
226+ self.check_job_specific_celeryd_configuration(expected, config)
227+ config = configure(['celeryd', '-Q', 'branch_write_job'])
228+ self.check_default_common_parameters(config)
229+ expected['fallback'] = 'branch_write_job_slow'
230+ self.check_job_specific_celeryd_configuration(expected, config)
231+
232+ def test_default_celeryd_configuration_slow_lanes(self):
233+ from lp.services.job.celeryconfig import configure
234+ expected = {
235+ 'concurrency': 1,
236+ 'fallback': None,
237+ 'timeout': 86400,
238+ }
239+ config = configure(['celeryd', '-Q', 'job_slow'])
240+ self.check_default_common_parameters(config)
241+ self.check_job_specific_celeryd_configuration(expected, config)
242+ config = configure(['celeryd', '-Q', 'branch_write_job_slow'])
243+ self.check_default_common_parameters(config)
244+ self.check_job_specific_celeryd_configuration(expected, config)
245+
246+ def test_circular_fallback_lanes(self):
247+ # Circular fallback lanes are detected.
248+ # Import late because the RabbitMQ parameters are set during layer
249+ # setup.
250+ from lp.services.job.celeryconfig import (
251+ ConfigurationError,
252+ configure,
253+ )
254+ with changed_config(
255+ """
256+ [job_slow]
257+ fallback_queue: job
258+ """):
259+ error = (
260+ "Circular chain of fallback queues: job already in "
261+ "['job', 'job_slow']"
262+ )
263+ self.assertRaisesWithContent(
264+ ConfigurationError, error, configure, [''])
265+
266+ def test_missing_queue_parameter_for_celeryd(self):
267+ # An exception is raised when celeryd is started without
268+ # the parameter -Q.
269+ # Import late because the RabbitMQ parameters are set during layer
270+ # setup.
271+ from lp.services.job.celeryconfig import (
272+ ConfigurationError,
273+ configure,
274+ )
275+ error = 'A queue must be specified.'
276+ self.assertRaisesWithContent(
277+ ConfigurationError, error, configure, ['celeryd'])
278+
279+ def test_two_queues_for_celeryd(self):
280+ # An exception is raised when celeryd is started for two queues.
281+ # Import late because the RabbitMQ parameters are set during layer
282+ # setup.
283+ from lp.services.job.celeryconfig import (
284+ ConfigurationError,
285+ configure,
286+ )
287+ error = 'A celeryd instance may serve only one queue.'
288+ self.assertRaisesWithContent(
289+ ConfigurationError, error, configure,
290+ ['celeryd', '--queue=job,branch_write_job'])
291+
292+ def test_unconfigured_queue_for_celeryd(self):
293+ # An exception is raised when celeryd is started for a queue that
294+ # is not configured.
295+ # Import late because the RabbitMQ parameters are set during layer
296+ # setup.
297+ from lp.services.job.celeryconfig import (
298+ ConfigurationError,
299+ configure,
300+ )
301+ error = 'Queue foo is not configured in schema-lazr.conf'
302+ self.assertRaisesWithContent(
303+ ConfigurationError, error, configure, ['celeryd', '--queue=foo'])