Merge lp:~abentley/launchpad/celery-everywhere-2 into lp:launchpad

Proposed by Aaron Bentley on 2012-04-09
Status: Merged
Merged at revision: 15076
Proposed branch: lp:~abentley/launchpad/celery-everywhere-2
Merge into: lp:launchpad
Diff against target: 670 lines (+254/-81)
12 files modified
lib/lp/code/model/branchjob.py (+51/-35)
lib/lp/code/model/tests/test_branch.py (+6/-6)
lib/lp/code/model/tests/test_branchjob.py (+46/-1)
lib/lp/codehosting/scanner/email.py (+5/-3)
lib/lp/codehosting/scanner/tests/test_email.py (+83/-10)
lib/lp/services/features/flags.py (+1/-1)
lib/lp/services/job/celeryconfig.py (+4/-4)
lib/lp/services/job/model/job.py (+27/-9)
lib/lp/services/job/runner.py (+8/-9)
lib/lp/services/job/tests/__init__.py (+1/-0)
lib/lp/services/job/tests/celery_helpers.py (+19/-0)
lib/lp/services/job/tests/test_runner.py (+3/-3)
To merge this branch: bzr merge lp:~abentley/launchpad/celery-everywhere-2
Reviewer Review Type Date Requested Status
Deryck Hodge (community) 2012-04-09 Approve on 2012-04-09
Review via email: mp+101284@code.launchpad.net

Commit Message

Support revision mail jobs and translation jobs via Celery.

Description of the Change

= Summary =
Support revision mail jobs and translation jobs via Celery.

== Pre-implementation notes ==
None

== Implementation details ==
Add celeryRunOnCommit calls for RosettaUploadJob, RevisionMailJob and RevisionsAddedJob, and add tests that run each of them via Celery.

Provide a pop_notifications Celery task (in the job tests package) so that tests can retrieve the mail notifications from a running celeryd.

Rename the feature flag from "jobs.celery.enabled_classses" to "jobs.celery.enabled_classes", removing the extra "s" in "classes".

Rename the task queues from "standard" and "branch_write" to "job" and "branch_write_job", to be more descriptive.

Run jobs with the appropriate database user. To do this, introduce a "config" member on BaseRunnableJob that gives the configuration for that job type.

== Tests ==
bin/test -vt ViaCelery

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/services/job/runner.py
  lib/lp/code/model/tests/test_branchjob.py
  lib/lp/services/job/tests/celery_helpers.py
  lib/lp/codehosting/scanner/tests/test_email.py
  lib/lp/services/features/flags.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/model/job.py
  lib/lp/services/job/tests/__init__.py
  lib/lp/codehosting/scanner/email.py
  lib/lp/code/model/tests/test_branch.py
  lib/lp/code/model/branchjob.py

To post a comment you must log in.
Deryck Hodge (deryck) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/code/model/branchjob.py'
2--- lib/lp/code/model/branchjob.py 2012-04-06 17:28:25 +0000
3+++ lib/lp/code/model/branchjob.py 2012-04-10 16:18:21 +0000
4@@ -80,7 +80,10 @@
5 from lp.code.model.branch import Branch
6 from lp.code.model.branchmergeproposal import BranchMergeProposal
7 from lp.code.model.revision import RevisionSet
8-from lp.codehosting.bzrutils import server
9+from lp.codehosting.bzrutils import (
10+ read_locked,
11+ server,
12+ )
13 from lp.codehosting.scanner.bzrsync import BzrSync
14 from lp.codehosting.vfs import (
15 get_ro_server,
16@@ -89,6 +92,7 @@
17 from lp.codehosting.vfs.branchfs import get_real_branch_path
18 from lp.registry.interfaces.productseries import IProductSeriesSet
19 from lp.scripts.helpers import TransactionFreeOperation
20+from lp.services.config import config
21 from lp.services.database.enumcol import EnumCol
22 from lp.services.database.lpstorm import IStore
23 from lp.services.database.sqlbase import SQLBase
24@@ -296,6 +300,8 @@
25 class_job_type = BranchJobType.SCAN_BRANCH
26 memory_limit = 2 * (1024 ** 3)
27
28+ config = config.branchscanner
29+
30 @classmethod
31 def create(cls, branch):
32 """See `IBranchScanJobSource`."""
33@@ -327,7 +333,9 @@
34
35 user_error_types = (NotBranchError,)
36
37- task_queue = 'branch_write'
38+ task_queue = 'branch_write_job'
39+
40+ config = config.upgrade_branches
41
42 def getOperationDescription(self):
43 return 'upgrading a branch'
44@@ -413,6 +421,8 @@
45
46 class_job_type = BranchJobType.REVISION_MAIL
47
48+ config = config.sendbranchmail
49+
50 @classmethod
51 def create(cls, branch, revno, from_address, body, subject):
52 """See `IRevisionMailJobSource`."""
53@@ -458,6 +468,8 @@
54
55 class_job_type = BranchJobType.REVISIONS_ADDED_MAIL
56
57+ config = config.sendbranchmail
58+
59 @classmethod
60 def create(cls, branch, last_scanned_id, last_revision_id,
61 from_address):
62@@ -519,16 +531,13 @@
63 subscriptions = self.branch.getSubscriptionsByLevel(diff_levels)
64 if not subscriptions:
65 return
66-
67- self.bzr_branch.lock_read()
68- try:
69- for revision, revno in self.iterAddedMainline():
70- assert revno is not None
71- mailer = self.getMailerForRevision(
72- revision, revno, self.generateDiffs())
73- mailer.sendAll()
74- finally:
75- self.bzr_branch.unlock()
76+ with server(get_ro_server(), no_replace=True):
77+ with read_locked(self.bzr_branch):
78+ for revision, revno in self.iterAddedMainline():
79+ assert revno is not None
80+ mailer = self.getMailerForRevision(
81+ revision, revno, self.generateDiffs())
82+ mailer.sendAll()
83
84 def getDiffForRevisions(self, from_revision_id, to_revision_id):
85 """Generate the diff between from_revision_id and to_revision_id."""
86@@ -718,6 +727,8 @@
87
88 class_job_type = BranchJobType.ROSETTA_UPLOAD
89
90+ config = config.rosettabranches
91+
92 def __init__(self, branch_job):
93 super(RosettaUploadJob, self).__init__(branch_job)
94
95@@ -763,7 +774,9 @@
96 force_translations_upload)
97 branch_job = BranchJob(
98 branch, BranchJobType.ROSETTA_UPLOAD, metadata)
99- return cls(branch_job)
100+ job = cls(branch_job)
101+ job.celeryRunOnCommit()
102+ return job
103 else:
104 return None
105
106@@ -889,27 +902,28 @@
107
108 def run(self):
109 """See `IRosettaUploadJob`."""
110- # This is not called upon job creation because the branch would
111- # neither have been mirrored nor scanned then.
112- self._init_translation_file_lists()
113- # Get the product series that are connected to this branch and
114- # that want to upload translations.
115- productseriesset = getUtility(IProductSeriesSet)
116- productseries = productseriesset.findByTranslationsImportBranch(
117- self.branch, self.force_translations_upload)
118- translation_import_queue = getUtility(ITranslationImportQueue)
119- for series in productseries:
120- approver = TranslationBranchApprover(self.file_names,
121- productseries=series)
122- for iter_info in self._iter_lists_and_uploaders(series):
123- file_names, changed_files, uploader = iter_info
124- for upload_file_name, upload_file_content in changed_files:
125- if len(upload_file_content) == 0:
126- continue # Skip empty files
127- entry = translation_import_queue.addOrUpdateEntry(
128- upload_file_name, upload_file_content,
129- True, uploader, productseries=series)
130- approver.approve(entry)
131+ with server(get_ro_server(), no_replace=True):
132+ # This is not called upon job creation because the branch would
133+ # neither have been mirrored nor scanned then.
134+ self._init_translation_file_lists()
135+ # Get the product series that are connected to this branch and
136+ # that want to upload translations.
137+ productseriesset = getUtility(IProductSeriesSet)
138+ productseries = productseriesset.findByTranslationsImportBranch(
139+ self.branch, self.force_translations_upload)
140+ translation_import_queue = getUtility(ITranslationImportQueue)
141+ for series in productseries:
142+ approver = TranslationBranchApprover(self.file_names,
143+ productseries=series)
144+ for iter_info in self._iter_lists_and_uploaders(series):
145+ file_names, changed_files, uploader = iter_info
146+ for upload_file_name, upload_file_content in changed_files:
147+ if len(upload_file_content) == 0:
148+ continue # Skip empty files
149+ entry = translation_import_queue.addOrUpdateEntry(
150+ upload_file_name, upload_file_content,
151+ True, uploader, productseries=series)
152+ approver.approve(entry)
153
154 @staticmethod
155 def iterReady():
156@@ -949,7 +963,9 @@
157
158 class_job_type = BranchJobType.RECLAIM_BRANCH_SPACE
159
160- task_queue = 'branch_write'
161+ task_queue = 'branch_write_job'
162+
163+ config = config.reclaimbranchspace
164
165 def __repr__(self):
166 return '<RECLAIM_BRANCH_SPACE branch job (%(id)s) for %(branch)s>' % {
167
168=== modified file 'lib/lp/code/model/tests/test_branch.py'
169--- lib/lp/code/model/tests/test_branch.py 2012-04-06 17:28:25 +0000
170+++ lib/lp/code/model/tests/test_branch.py 2012-04-10 16:18:21 +0000
171@@ -316,8 +316,8 @@
172 # lp.services.job.celeryconfig is loaded.
173 from celery.exceptions import TimeoutError
174 self.useFixture(FeatureFixture({
175- 'jobs.celery.enabled_classses': 'BranchScanJob'}))
176- with celeryd('standard') as proc:
177+ 'jobs.celery.enabled_classes': 'BranchScanJob'}))
178+ with celeryd('job') as proc:
179 self.useBzrBranches()
180 db_branch, bzr_tree = self.create_branch_and_tree()
181 bzr_tree.commit(
182@@ -348,8 +348,8 @@
183 """Calling destroySelf causes Celery to delete the branch."""
184 from celery.exceptions import TimeoutError
185 self.useFixture(FeatureFixture({
186- 'jobs.celery.enabled_classses': 'ReclaimBranchSpaceJob'}))
187- with celeryd('branch_write'):
188+ 'jobs.celery.enabled_classes': 'ReclaimBranchSpaceJob'}))
189+ with celeryd('branch_write_job'):
190 self.useBzrBranches()
191 db_branch, tree = self.create_branch_and_tree()
192 branch_path = get_real_branch_path(db_branch.id)
193@@ -818,7 +818,7 @@
194
195 def test_requestUpgradeUsesCelery(self):
196 self.useFixture(FeatureFixture({
197- 'jobs.celery.enabled_classses': 'BranchUpgradeJob'}))
198+ 'jobs.celery.enabled_classes': 'BranchUpgradeJob'}))
199 cwd = os.getcwd()
200 self.useBzrBranches()
201 db_branch, tree = create_knit(self)
202@@ -829,7 +829,7 @@
203 db_branch.requestUpgrade(db_branch.owner)
204 with monitor_celery() as responses:
205 transaction.commit()
206- with celeryd('branch_write', cwd):
207+ with celeryd('branch_write_job', cwd):
208 responses[-1].wait(30)
209 new_branch = Branch.open(tree.branch.base)
210 self.assertEqual(
211
212=== modified file 'lib/lp/code/model/tests/test_branchjob.py'
213--- lib/lp/code/model/tests/test_branchjob.py 2012-04-06 17:28:25 +0000
214+++ lib/lp/code/model/tests/test_branchjob.py 2012-04-10 16:18:21 +0000
215@@ -60,6 +60,7 @@
216 RosettaUploadJob,
217 )
218 from lp.code.model.branchrevision import BranchRevision
219+from lp.code.model.directbranchcommit import DirectBranchCommit
220 from lp.code.model.tests.test_branch import create_knit
221 from lp.code.model.revision import RevisionSet
222 from lp.codehosting.vfs import branch_id_to_path
223@@ -67,18 +68,27 @@
224 from lp.services.config import config
225 from lp.services.database.constants import UTC_NOW
226 from lp.services.database.lpstorm import IMasterStore
227+from lp.services.features.testing import FeatureFixture
228 from lp.services.identity.interfaces.emailaddress import EmailAddressStatus
229 from lp.services.job.interfaces.job import JobStatus
230 from lp.services.job.model.job import Job
231 from lp.services.job.runner import JobRunner
232+from lp.services.job.tests import (
233+ celeryd,
234+ monitor_celery,
235+ )
236 from lp.services.osutils import override_environ
237 from lp.services.webapp import canonical_url
238-from lp.testing import TestCaseWithFactory
239+from lp.testing import (
240+ person_logged_in,
241+ TestCaseWithFactory,
242+ )
243 from lp.testing.dbuser import (
244 dbuser,
245 switch_dbuser,
246 )
247 from lp.testing.layers import (
248+ AppServerLayer,
249 DatabaseFunctionalLayer,
250 LaunchpadZopelessLayer,
251 )
252@@ -1222,6 +1232,41 @@
253 self.assertEqual([], unfinished_jobs)
254
255
256+class TestViaCelery(TestCaseWithFactory):
257+
258+ layer = AppServerLayer
259+
260+ def test_RosettaUploadJob(self):
261+ """Ensure RosettaUploadJob can run under Celery."""
262+ self.useContext(celeryd('job'))
263+ self.useBzrBranches(direct_database=True)
264+ self.useFixture(FeatureFixture({
265+ 'jobs.celery.enabled_classes': 'BranchScanJob RosettaUploadJob'
266+ }))
267+ db_branch = self.factory.makeAnyBranch()
268+ self.createBzrBranch(db_branch)
269+ commit = DirectBranchCommit(db_branch, no_race_check=True)
270+ commit.writeFile('foo.pot', 'gibberish')
271+ with monitor_celery() as responses:
272+ with person_logged_in(db_branch.owner):
273+ commit.commit('message')
274+ transaction.commit()
275+ # Wait for branch scan to complete.
276+ responses[0].wait(30)
277+ series = self.factory.makeProductSeries(branch=db_branch)
278+ RosettaUploadJob.create(
279+ commit.db_branch, NULL_REVISION,
280+ force_translations_upload=True)
281+ transaction.commit()
282+ # Wait for RosettaUploadJob to complete
283+ responses[1].wait(30)
284+ queue = getUtility(ITranslationImportQueue)
285+ entries = list(queue.getAllEntries(target=series))
286+ self.assertEqual(len(entries), 1)
287+ entry = entries[0]
288+ self.assertEqual('foo.pot', entry.path)
289+
290+
291 class TestReclaimBranchSpaceJob(TestCaseWithFactory):
292
293 layer = LaunchpadZopelessLayer
294
295=== modified file 'lib/lp/codehosting/scanner/email.py'
296--- lib/lp/codehosting/scanner/email.py 2012-01-01 02:58:52 +0000
297+++ lib/lp/codehosting/scanner/email.py 2012-04-10 16:18:21 +0000
298@@ -46,10 +46,11 @@
299 # No diff is associated with the removed email.
300 subject = "[Branch %s] %s removed" % (
301 revisions_removed.db_branch.unique_name, count)
302- getUtility(IRevisionMailJobSource).create(
303+ job = getUtility(IRevisionMailJobSource).create(
304 revisions_removed.db_branch, revno='removed',
305 from_address=config.canonical.noreply_from_address,
306 body=contents, subject=subject)
307+ job.celeryRunOnCommit()
308
309
310 def queue_tip_changed_email_jobs(tip_changed):
311@@ -66,11 +67,12 @@
312 revisions)
313 subject = "[Branch %s] %s" % (
314 tip_changed.db_branch.unique_name, revisions)
315- getUtility(IRevisionMailJobSource).create(
316+ job = getUtility(IRevisionMailJobSource).create(
317 tip_changed.db_branch, 'initial',
318 config.canonical.noreply_from_address, message, subject)
319 else:
320- getUtility(IRevisionsAddedJobSource).create(
321+ job = getUtility(IRevisionsAddedJobSource).create(
322 tip_changed.db_branch, tip_changed.db_branch.last_scanned_id,
323 tip_changed.bzr_branch.last_revision(),
324 config.canonical.noreply_from_address)
325+ job.celeryRunOnCommit()
326
327=== modified file 'lib/lp/codehosting/scanner/tests/test_email.py'
328--- lib/lp/codehosting/scanner/tests/test_email.py 2012-01-01 02:58:52 +0000
329+++ lib/lp/codehosting/scanner/tests/test_email.py 2012-04-10 16:18:21 +0000
330@@ -7,6 +7,7 @@
331
332 import email
333
334+from bzrlib.uncommit import uncommit
335 from zope.component import getUtility
336 from zope.event import notify
337
338@@ -20,13 +21,34 @@
339 IRevisionsAddedJobSource,
340 )
341 from lp.code.model.branchjob import RevisionMailJob
342+from lp.codehosting.scanner.bzrsync import BzrSync
343 from lp.codehosting.scanner import events
344 from lp.codehosting.scanner.tests.test_bzrsync import BzrSyncTestCase
345 from lp.registry.interfaces.person import IPersonSet
346+from lp.services.config import config
347+from lp.services.features.testing import FeatureFixture
348 from lp.services.job.runner import JobRunner
349 from lp.services.mail import stub
350+from lp.services.job.tests import (
351+ celeryd,
352+ monitor_celery,
353+ )
354 from lp.testing import TestCaseWithFactory
355-from lp.testing.layers import LaunchpadZopelessLayer
356+from lp.testing.dbuser import switch_dbuser
357+from lp.testing.layers import (
358+ LaunchpadZopelessLayer,
359+ ZopelessAppServerLayer,
360+ )
361+
362+
363+def add_subscriber(branch):
364+ test_user = getUtility(IPersonSet).getByEmail('test@canonical.com')
365+ branch.subscribe(
366+ test_user,
367+ BranchSubscriptionNotificationLevel.FULL,
368+ BranchSubscriptionDiffSize.FIVEKLINES,
369+ CodeReviewNotificationLevel.NOEMAIL,
370+ test_user)
371
372
373 class TestBzrSyncEmail(BzrSyncTestCase):
374@@ -39,13 +61,7 @@
375 def makeDatabaseBranch(self):
376 branch = BzrSyncTestCase.makeDatabaseBranch(self)
377 LaunchpadZopelessLayer.txn.begin()
378- test_user = getUtility(IPersonSet).getByEmail('test@canonical.com')
379- branch.subscribe(
380- test_user,
381- BranchSubscriptionNotificationLevel.FULL,
382- BranchSubscriptionDiffSize.FIVEKLINES,
383- CodeReviewNotificationLevel.NOEMAIL,
384- test_user)
385+ add_subscriber(branch)
386 LaunchpadZopelessLayer.txn.commit()
387 return branch
388
389@@ -130,12 +146,12 @@
390 recommit_email_msg = email.message_from_string(recommit_email[2])
391 recommit_email_body = recommit_email_msg.get_payload()[0].get_payload(
392 decode=True)
393- subject = '[Branch %s] Rev 1: second' % self.db_branch.unique_name
394+ subject = '[Branch %s] Rev 1: second' % self.db_branch.unique_name
395 self.assertEmailHeadersEqual(subject, recommit_email_msg['Subject'])
396 body_bits = [
397 'revno: 1',
398 'committer: %s' % author,
399- 'branch nick: %s' % self.bzr_branch.nick,
400+ 'branch nick: %s' % self.bzr_branch.nick,
401 'message:\n second',
402 'added:\n hello.txt',
403 ]
404@@ -143,6 +159,63 @@
405 self.assertTextIn(bit, recommit_email_body)
406
407
408+class TestViaCelery(TestCaseWithFactory):
409+
410+ layer = ZopelessAppServerLayer
411+
412+ @staticmethod
413+ def pop_notifications():
414+ from lp.services.job.tests.celery_helpers import pop_notifications
415+ return pop_notifications.delay().get(30)
416+
417+ def prepare(self, job_name):
418+ self.useFixture(FeatureFixture(
419+ {'jobs.celery.enabled_classes': job_name}))
420+ self.useContext(celeryd('job'))
421+ self.useBzrBranches(direct_database=True)
422+ db_branch, tree = self.create_branch_and_tree()
423+ add_subscriber(db_branch)
424+ switch_dbuser(config.branchscanner.dbuser)
425+ # Needed for feature flag teardown
426+ self.addCleanup(switch_dbuser, config.launchpad.dbuser)
427+ return db_branch, tree
428+
429+ def test_empty_branch(self):
430+ """RevisionMailJob for empty branches runs via Celery."""
431+ db_branch, tree = self.prepare('RevisionMailJob')
432+ with monitor_celery() as responses:
433+ BzrSync(db_branch).syncBranchAndClose(tree.branch)
434+ responses[-1].wait(30)
435+ self.assertEqual(1, len(self.pop_notifications()))
436+
437+ def test_uncommit_branch(self):
438+ """RevisionMailJob for removed revisions runs via Celery."""
439+ db_branch, tree = self.prepare('RevisionMailJob')
440+ tree.commit('message')
441+ bzr_sync = BzrSync(db_branch)
442+ with monitor_celery() as responses:
443+ bzr_sync.syncBranchAndClose(tree.branch)
444+ responses[0].wait(30)
445+ self.pop_notifications()
446+ uncommit(tree.branch)
447+ bzr_sync.syncBranchAndClose(tree.branch)
448+ responses[1].wait(30)
449+ self.assertEqual(1, len(self.pop_notifications()))
450+
451+ def test_revisions_added(self):
452+ """RevisionMailJob for removed revisions runs via Celery."""
453+ db_branch, tree = self.prepare('RevisionsAddedJob')
454+ tree.commit('message')
455+ bzr_sync = BzrSync(db_branch)
456+ bzr_sync.syncBranchAndClose(tree.branch)
457+ self.pop_notifications()
458+ tree.commit('message2')
459+ with monitor_celery() as responses:
460+ bzr_sync.syncBranchAndClose(tree.branch)
461+ responses[-1].wait(30)
462+ self.assertEqual(1, len(self.pop_notifications()))
463+
464+
465 class TestScanBranches(TestCaseWithFactory):
466
467 layer = LaunchpadZopelessLayer
468
469=== modified file 'lib/lp/services/features/flags.py'
470--- lib/lp/services/features/flags.py 2012-04-05 13:05:04 +0000
471+++ lib/lp/services/features/flags.py 2012-04-10 16:18:21 +0000
472@@ -125,7 +125,7 @@
473 '',
474 '',
475 ''),
476- ('jobs.celery.enabled_classses',
477+ ('jobs.celery.enabled_classes',
478 'space delimited',
479 'Names of Job classes that should be run via celery',
480 'No jobs run via celery',
481
482=== modified file 'lib/lp/services/job/celeryconfig.py'
483--- lib/lp/services/job/celeryconfig.py 2012-04-06 17:28:25 +0000
484+++ lib/lp/services/job/celeryconfig.py 2012-04-10 16:18:21 +0000
485@@ -8,9 +8,9 @@
486 CELERY_IMPORTS = ("lp.services.job.celeryjob", )
487 CELERY_RESULT_BACKEND = "amqp"
488 CELERY_QUEUES = {
489- "branch_write": {"binding_key": "branch_write"},
490- "standard": {"binding_key": "standard"},
491+ "branch_write_job": {"binding_key": "branch_write_job"},
492+ "job": {"binding_key": "job"},
493 }
494-CELERY_DEFAULT_EXCHANGE = "standard"
495-CELERY_DEFAULT_QUEUE = "standard"
496+CELERY_DEFAULT_EXCHANGE = "job"
497+CELERY_DEFAULT_QUEUE = "job"
498 CELERY_CREATE_MISSING_QUEUES = False
499
500=== modified file 'lib/lp/services/job/model/job.py'
501--- lib/lp/services/job/model/job.py 2012-03-21 20:39:49 +0000
502+++ lib/lp/services/job/model/job.py 2012-04-10 16:18:21 +0000
503@@ -33,7 +33,9 @@
504 Int,
505 Reference,
506 )
507+from storm.zope.interfaces import IZStorm
508 import transaction
509+from zope.component import getUtility
510 from zope.interface import implements
511
512 from lp.services.config import dbconfig
513@@ -253,17 +255,33 @@
514
515 needs_init = True
516
517- @classmethod
518- def get(cls, job_id):
519- if cls.needs_init:
520- scripts.execute_zcml_for_scripts(use_web_security=False)
521- cls.needs_init = False
522-
523- dbconfig.override(
524- dbuser='branchscanner', isolation_level='read_committed')
525+ @staticmethod
526+ def getDerived(job_id):
527+ """Return the derived branch job associated with the job id."""
528 from lp.code.model.branchjob import (
529 BranchJob,
530 )
531 store = IStore(BranchJob)
532 branch_job = store.find(BranchJob, BranchJob.job == job_id).one()
533- return branch_job.makeDerived()
534+ if branch_job is None:
535+ raise ValueError('No BranchJob with job=%s.' % job_id)
536+ return branch_job.makeDerived(), store
537+
538+ @classmethod
539+ def switchDBUser(cls, job_id):
540+ """Switch to the DB user associated with this Job ID."""
541+ derived, store = cls.getDerived(job_id)
542+ dbconfig.override(
543+ dbuser=derived.config.dbuser, isolation_level='read_committed')
544+ transaction.abort()
545+ getUtility(IZStorm).remove(store)
546+ store.close()
547+
548+ @classmethod
549+ def get(cls, job_id):
550+ transaction.abort()
551+ if cls.needs_init:
552+ scripts.execute_zcml_for_scripts(use_web_security=False)
553+ cls.needs_init = False
554+ cls.switchDBUser(job_id)
555+ return cls.getDerived(job_id)[0]
556
557=== modified file 'lib/lp/services/job/runner.py'
558--- lib/lp/services/job/runner.py 2012-04-06 17:28:25 +0000
559+++ lib/lp/services/job/runner.py 2012-04-10 16:18:21 +0000
560@@ -103,7 +103,7 @@
561
562 retry_error_types = ()
563
564- task_queue = 'standard'
565+ task_queue = 'job'
566
567 celery_responses = None
568
569@@ -190,23 +190,22 @@
570 return oops_config.create(
571 context=dict(exc_info=info))
572
573- def runViaCelery(self):
574+ def runViaCelery(self, ignore_result=False):
575 """Request that this job be run via celery."""
576 # Avoid importing from lp.services.job.celeryjob where not needed, to
577 # avoid configuring Celery when Rabbit is not configured.
578 from lp.services.job.celeryjob import CeleryRunJob
579- ignore_result = bool(BaseRunnableJob.celery_responses is None)
580- response = CeleryRunJob.apply_async(
581+ return CeleryRunJob.apply_async(
582 (self.job_id,), queue=self.task_queue,
583 ignore_result=ignore_result)
584- if not ignore_result:
585- BaseRunnableJob.celery_responses.append(response)
586- return response
587
588 def celeryCommitHook(self, succeeded):
589 """Hook function to call when a commit completes."""
590 if succeeded:
591- self.runViaCelery()
592+ ignore_result = bool(BaseRunnableJob.celery_responses is None)
593+ response = self.runViaCelery(ignore_result)
594+ if not ignore_result:
595+ BaseRunnableJob.celery_responses.append(response)
596
597 def celeryRunOnCommit(self):
598 """Configure transaction so that commit runs this job via Celery."""
599@@ -623,7 +622,7 @@
600
601 The name of a BaseRunnableJob must be specified.
602 """
603- flag = getFeatureFlag('jobs.celery.enabled_classses')
604+ flag = getFeatureFlag('jobs.celery.enabled_classes')
605 if flag is None:
606 return False
607 return class_name in flag.split(' ')
608
609=== modified file 'lib/lp/services/job/tests/__init__.py'
610--- lib/lp/services/job/tests/__init__.py 2012-04-06 17:28:25 +0000
611+++ lib/lp/services/job/tests/__init__.py 2012-04-10 16:18:21 +0000
612@@ -32,6 +32,7 @@
613 '--concurrency', '1',
614 '--loglevel', 'INFO',
615 '--queues', queue,
616+ '--include', 'lp.services.job.tests.celery_helpers',
617 )
618 with running('bin/celeryd', cmd_args, cwd=cwd) as proc:
619 # Wait for celeryd startup to complete.
620
621=== added file 'lib/lp/services/job/tests/celery_helpers.py'
622--- lib/lp/services/job/tests/celery_helpers.py 1970-01-01 00:00:00 +0000
623+++ lib/lp/services/job/tests/celery_helpers.py 2012-04-10 16:18:21 +0000
624@@ -0,0 +1,19 @@
625+# Copyright 2012 Canonical Ltd. This software is licensed under the
626+# GNU Affero General Public License version 3 (see the file LICENSE).
627+
628+__metaclass__ = type
629+
630+__all__ = ['pop_notifications']
631+
632+# Force the correct celeryconfig to be used.
633+import lp.services.job.celeryjob
634+# Quiet lint unused import warning.
635+lp.services.job.celeryjob
636+
637+from celery.task import task
638+
639+
640+@task
641+def pop_notifications():
642+ from lp.testing.mail_helpers import pop_notifications
643+ return pop_notifications()
644
645=== modified file 'lib/lp/services/job/tests/test_runner.py'
646--- lib/lp/services/job/tests/test_runner.py 2012-04-03 16:06:43 +0000
647+++ lib/lp/services/job/tests/test_runner.py 2012-04-10 16:18:21 +0000
648@@ -720,19 +720,19 @@
649 def test_matching_flag(self):
650 """A matching flag returns True."""
651 self.useFixture(FeatureFixture(
652- {'jobs.celery.enabled_classses': 'foo bar'}))
653+ {'jobs.celery.enabled_classes': 'foo bar'}))
654 self.assertTrue(celery_enabled('foo'))
655 self.assertTrue(celery_enabled('bar'))
656
657 def test_non_matching_flag(self):
658 """A non-matching flag returns false."""
659 self.useFixture(FeatureFixture(
660- {'jobs.celery.enabled_classses': 'foo bar'}))
661+ {'jobs.celery.enabled_classes': 'foo bar'}))
662 self.assertFalse(celery_enabled('baz'))
663 self.assertTrue(celery_enabled('bar'))
664
665 def test_substring(self):
666 """A substring of an enabled class does not match."""
667 self.useFixture(FeatureFixture(
668- {'jobs.celery.enabled_classses': 'foobar'}))
669+ {'jobs.celery.enabled_classes': 'foobar'}))
670 self.assertFalse(celery_enabled('bar'))