Merge lp:~abentley/launchpad/celery-job-feature-flag into lp:launchpad

Proposed by Aaron Bentley on 2012-04-03
Status: Merged
Approved by: Graham Binns on 2012-04-03
Approved revision: no longer in the source branch.
Merged at revision: 15057
Proposed branch: lp:~abentley/launchpad/celery-job-feature-flag
Merge into: lp:launchpad
Diff against target: 268 lines (+130/-2)
6 files modified
lib/lp/code/model/branch.py (+11/-2)
lib/lp/code/model/tests/test_branch.py (+53/-0)
lib/lp/services/features/flags.py (+6/-0)
lib/lp/services/job/runner.py (+15/-0)
lib/lp/services/job/tests/test_celeryjob.py (+14/-0)
lib/lp/services/job/tests/test_runner.py (+31/-0)
To merge this branch: bzr merge lp:~abentley/launchpad/celery-job-feature-flag
Reviewer Review Type Date Requested Status
Graham Binns (community) code 2012-04-03 Approve on 2012-04-03
Review via email: mp+100647@code.launchpad.net

Commit Message

Control running jobs via Celery with feature flag.

Description of the Change

= Summary =
Fix bug 972098: Celery RabbitMQ queue slowly increasing on ackee

== Proposed fix ==
Implement a feature flag that controls which kinds of jobs run via Celery, so that by default no jobs run via Celery.

== Pre-implementation notes ==
Discussed with deryck

== Implementation details ==
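The feature flag jobs.celery.enabled_classses (spelled with three "s"s, exactly as in the code) is a space-separated list of the names of BaseRunnableJob subclasses that should be run via Celery. It is checked by lp.services.job.runner.celery_enabled(), which matches whole class names only, so a substring of an enabled name does not match.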

To verify that branchChanged honours the flag, branchChanged itself had to be tested. However, it offered no way to obtain the Celery response, which the test needs in order to call response.wait(). I therefore implemented monitor_celery, a context manager that collects the Celery responses in a list while it is active. When no such list is being collected, the job is submitted with ignore_result=True, so Celery does not store results that nobody will read.

== Tests ==
bin/test -t TestBranchJobViaCelery -t TestCeleryEnabled

== Demo and Q/A ==
Ask webops for the length of the "celery" RabbitMQ queue, push a new branch, then ask webops for the length of the "celery" queue again. The length should be unchanged.

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
  lib/lp/services/job/tests/test_celeryjob.py
  lib/lp/code/model/branch.py
  lib/lp/services/features/flags.py
  lib/lp/code/model/tests/test_branch.py

To post a comment you must log in.
Graham Binns (gmb) :
review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/code/model/branch.py'
2--- lib/lp/code/model/branch.py 2012-03-27 18:13:01 +0000
3+++ lib/lp/code/model/branch.py 2012-04-03 16:24:23 +0000
4@@ -150,6 +150,10 @@
5 from lp.services.helpers import shortlist
6 from lp.services.job.interfaces.job import JobStatus
7 from lp.services.job.model.job import Job
8+from lp.services.job.runner import (
9+ BaseRunnableJob,
10+ celery_enabled,
11+ )
12 from lp.services.mail.notificationrecipientset import NotificationRecipientSet
13 from lp.services.propertycache import cachedproperty
14 from lp.services.webapp import urlappend
15@@ -1060,13 +1064,18 @@
16 if self.last_scanned_id != last_revision_id:
17 from lp.code.model.branchjob import BranchScanJob
18 job_id = BranchScanJob.create(self).job_id
19- if celery_scan:
20+ if celery_scan and celery_enabled('BranchScanJob'):
21 # lp.services.job.celery is imported only where needed.
22 from lp.services.job.celeryjob import CeleryRunJob
23 current = transaction.get()
24+
25 def runHook(succeeded):
26+ ignore_result = (BaseRunnableJob.celery_responses is None)
27 if succeeded:
28- CeleryRunJob.delay(job_id)
29+ response = CeleryRunJob.apply_async(
30+ (job_id,), ignore_result=ignore_result)
31+ if not ignore_result:
32+ BaseRunnableJob.celery_responses.append(response)
33 current.addAfterCommitHook(runHook)
34 self.control_format = control_format
35 self.branch_format = branch_format
36
37=== modified file 'lib/lp/code/model/tests/test_branch.py'
38--- lib/lp/code/model/tests/test_branch.py 2012-03-26 21:03:05 +0000
39+++ lib/lp/code/model/tests/test_branch.py 2012-04-03 16:24:23 +0000
40@@ -12,6 +12,7 @@
41 datetime,
42 timedelta,
43 )
44+import os
45
46 from bzrlib.bzrdir import BzrDir
47 from bzrlib.revision import NULL_REVISION
48@@ -109,6 +110,7 @@
49 from lp.services.config import config
50 from lp.services.database.constants import UTC_NOW
51 from lp.services.database.lpstorm import IStore
52+from lp.services.features.testing import FeatureFixture
53 from lp.services.osutils import override_environ
54 from lp.services.propertycache import clear_property_cache
55 from lp.services.webapp.interfaces import IOpenLaunchBag
56@@ -132,6 +134,7 @@
57 DatabaseFunctionalLayer,
58 LaunchpadFunctionalLayer,
59 LaunchpadZopelessLayer,
60+ ZopelessAppServerLayer,
61 )
62 from lp.translations.model.translationtemplatesbuildjob import (
63 ITranslationTemplatesBuildJobSource,
64@@ -285,6 +288,56 @@
65 branch.repository_format))
66
67
68+class TestBranchJobViaCelery(TestCaseWithFactory):
69+
70+ layer = ZopelessAppServerLayer
71+
72+ def test_branchChanged_via_celery(self):
73+ """Running a job via Celery succeeds and emits expected output."""
74+ # Delay importing anything that uses Celery until RabbitMQLayer is
75+ # running, so that config.rabbitmq.host is defined when
76+ # lp.services.job.celeryconfig is loaded.
77+ from lp.services.job.celeryjob import CeleryRunJob
78+ from lp.services.job.tests.test_celeryjob import monitor_celery
79+ from celery.exceptions import TimeoutError
80+ from lazr.jobrunner.tests.test_celerytask import running
81+ self.useFixture(FeatureFixture({'jobs.celery.enabled_classses':
82+ 'BranchScanJob'}))
83+ cmd_args = ('--config', 'lp.services.job.tests.celeryconfig')
84+ env = dict(os.environ)
85+ env['BROKER_URL'] = CeleryRunJob.app.conf['BROKER_URL']
86+ with running('bin/celeryd', cmd_args, env=env) as proc:
87+ self.useBzrBranches()
88+ db_branch, bzr_tree = self.create_branch_and_tree()
89+ bzr_tree.commit(
90+ 'First commit', rev_id='rev1', committer='me@example.org')
91+ db_branch.branchChanged(None, 'rev1', None, None, None)
92+ with monitor_celery() as responses:
93+ transaction.commit()
94+ try:
95+ responses[-1].wait(30)
96+ except TimeoutError:
97+ pass
98+ self.assertIn(
99+ 'Updating branch scanner status: 1 revs', proc.stderr.read())
100+ self.assertEqual(db_branch.revision_count, 1)
101+
102+ def test_branchChanged_via_celery_no_enabled(self):
103+ """Running a job via Celery succeeds and emits expected output."""
104+ # Delay importing anything that uses Celery until RabbitMQLayer is
105+ # running, so that config.rabbitmq.host is defined when
106+ # lp.services.job.celeryconfig is loaded.
107+ from lp.services.job.tests.test_celeryjob import monitor_celery
108+ self.useBzrBranches()
109+ db_branch, bzr_tree = self.create_branch_and_tree()
110+ bzr_tree.commit(
111+ 'First commit', rev_id='rev1', committer='me@example.org')
112+ db_branch.branchChanged(None, 'rev1', None, None, None)
113+ with monitor_celery() as responses:
114+ transaction.commit()
115+ self.assertEqual([], responses)
116+
117+
118 class TestBranchRevisionMethods(TestCaseWithFactory):
119 """Test the branch methods for adding and removing branch revisions."""
120
121
122=== modified file 'lib/lp/services/features/flags.py'
123--- lib/lp/services/features/flags.py 2012-03-26 21:23:40 +0000
124+++ lib/lp/services/features/flags.py 2012-04-03 16:24:23 +0000
125@@ -125,6 +125,12 @@
126 '',
127 '',
128 ''),
129+ ('jobs.celery.enabled_classses',
130+ 'space delimited',
131+ 'Names of Job classes that should be run via celery',
132+ 'No jobs run via celery',
133+ 'Celery-enabled job classes',
134+ 'https://dev.launchpad.net/CeleryJobRunner'),
135 ('js.combo_loader.enabled',
136 'boolean',
137 'Determines if we use a js combo loader or not.',
138
139=== modified file 'lib/lp/services/job/runner.py'
140--- lib/lp/services/job/runner.py 2012-03-23 17:26:11 +0000
141+++ lib/lp/services/job/runner.py 2012-04-03 16:24:23 +0000
142@@ -9,6 +9,7 @@
143 'BaseJobRunner',
144 'BaseRunnableJob',
145 'BaseRunnableJobSource',
146+ 'celery_enabled',
147 'JobCronScript',
148 'JobRunner',
149 'JobRunnerProcess',
150@@ -62,6 +63,7 @@
151 config,
152 dbconfig,
153 )
154+from lp.services.features import getFeatureFlag
155 from lp.services.job.interfaces.job import (
156 IJob,
157 IRunnableJob,
158@@ -101,6 +103,8 @@
159
160 retry_error_types = ()
161
162+ celery_responses = None
163+
164 # We redefine __eq__ and __ne__ here to prevent the security proxy
165 # from mucking up our comparisons in tests and elsewhere.
166 def __eq__(self, job):
167@@ -585,3 +589,14 @@
168
169 def __init__(self):
170 Exception.__init__(self, "Job ran too long.")
171+
172+
173+def celery_enabled(class_name):
174+ """Determine whether a given class is configured to run via Celery.
175+
176+ The name of a BaseRunnableJob must be specified.
177+ """
178+ flag = getFeatureFlag('jobs.celery.enabled_classses')
179+ if flag is None:
180+ return False
181+ return class_name in flag.split(' ')
182
183=== modified file 'lib/lp/services/job/tests/test_celeryjob.py'
184--- lib/lp/services/job/tests/test_celeryjob.py 2012-03-23 18:39:09 +0000
185+++ lib/lp/services/job/tests/test_celeryjob.py 2012-04-03 16:24:23 +0000
186@@ -2,15 +2,29 @@
187 # GNU Affero General Public License version 3 (see the file LICENSE).
188
189
190+from contextlib import contextmanager
191 import os
192
193 import transaction
194
195 from lp.code.model.branchjob import BranchScanJob
196+from lp.services.job.runner import BaseRunnableJob
197 from lp.testing import TestCaseWithFactory
198 from lp.testing.layers import ZopelessAppServerLayer
199
200
201+@contextmanager
202+def monitor_celery():
203+ """Context manager that provides a list of Celery responses."""
204+ responses = []
205+ old_responses = BaseRunnableJob.celery_responses
206+ BaseRunnableJob.celery_responses = responses
207+ try:
208+ yield responses
209+ finally:
210+ BaseRunnableJob.celery_responses = old_responses
211+
212+
213 class TestCelery(TestCaseWithFactory):
214
215 layer = ZopelessAppServerLayer
216
217=== modified file 'lib/lp/services/job/tests/test_runner.py'
218--- lib/lp/services/job/tests/test_runner.py 2012-03-30 18:13:38 +0000
219+++ lib/lp/services/job/tests/test_runner.py 2012-04-03 16:24:23 +0000
220@@ -19,6 +19,7 @@
221
222 from lp.code.interfaces.branchmergeproposal import IUpdatePreviewDiffJobSource
223 from lp.services.config import config
224+from lp.services.features.testing import FeatureFixture
225 from lp.services.job.interfaces.job import (
226 IRunnableJob,
227 JobStatus,
228@@ -26,6 +27,7 @@
229 from lp.services.job.model.job import Job
230 from lp.services.job.runner import (
231 BaseRunnableJob,
232+ celery_enabled,
233 JobCronScript,
234 JobRunner,
235 TwistedJobRunner,
236@@ -705,3 +707,32 @@
237 """No --log-twisted sets JobCronScript.log_twisted False."""
238 jcs = JobCronScript(TwistedJobRunner, test_args=[])
239 self.assertFalse(jcs.log_twisted)
240+
241+
242+class TestCeleryEnabled(TestCaseWithFactory):
243+
244+ layer = LaunchpadZopelessLayer
245+
246+ def test_no_flag(self):
247+ """With no flag set, result is False."""
248+ self.assertFalse(celery_enabled('foo'))
249+
250+ def test_matching_flag(self):
251+ """A matching flag returns True."""
252+ self.useFixture(FeatureFixture(
253+ {'jobs.celery.enabled_classses': 'foo bar'}))
254+ self.assertTrue(celery_enabled('foo'))
255+ self.assertTrue(celery_enabled('bar'))
256+
257+ def test_non_matching_flag(self):
258+ """A non-matching flag returns false."""
259+ self.useFixture(FeatureFixture(
260+ {'jobs.celery.enabled_classses': 'foo bar'}))
261+ self.assertFalse(celery_enabled('baz'))
262+ self.assertTrue(celery_enabled('bar'))
263+
264+ def test_substring(self):
265+ """A substring of an enabled class does not match."""
266+ self.useFixture(FeatureFixture(
267+ {'jobs.celery.enabled_classses': 'foobar'}))
268+ self.assertFalse(celery_enabled('bar'))