Merge lp:~abentley/launchpad/celery-everywhere-3 into lp:launchpad

Proposed by Aaron Bentley on 2012-04-11
Status: Merged
Approved by: Aaron Bentley on 2012-04-13
Approved revision: no longer in the source branch.
Merged at revision: 15097
Proposed branch: lp:~abentley/launchpad/celery-everywhere-3
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/celery-everywhere-2
Diff against target: 628 lines (+204/-77)
10 files modified
lib/lp/code/model/branchmergeproposal.py (+4/-3)
lib/lp/code/model/branchmergeproposaljob.py (+44/-50)
lib/lp/code/model/tests/test_branchmergeproposaljobs.py (+100/-2)
lib/lp/codehosting/scanner/tests/test_email.py (+6/-10)
lib/lp/codehosting/scanner/tests/test_mergedetection.py (+1/-2)
lib/lp/services/job/celeryjob.py (+9/-1)
lib/lp/services/job/model/job.py (+12/-4)
lib/lp/services/job/runner.py (+7/-4)
lib/lp/services/job/tests/__init__.py (+8/-1)
lib/lp/services/job/tests/test_job.py (+13/-0)
To merge this branch: bzr merge lp:~abentley/launchpad/celery-everywhere-3
Reviewer                     Review Type  Date Requested  Status
j.c.sackett (community)                   2012-04-12      Approve on 2012-04-12
Richard Harding (community)  code*        2012-04-11      Approve on 2012-04-12
Review via email: mp+101626@code.launchpad.net

Commit Message

Support running BranchMergeProposalJobs via Celery.

Description of the Change

= Summary =
Implement Celery support for BranchMergeProposalJobs

== Proposed fix ==
Add BranchMergeProposalJob support to UniversalJobSource, add celeryRunOnCommit to BranchMergeProposalJob.create, and update the Jobs

== Pre-implementation notes ==
None

== Implementation details ==
All jobs specify a config.

Some jobs send mail, so I extracted pop_remote_notifications from test_email.

Jobs which need access to branches are updated to use server(no_replace=True), and the corresponding functionality is removed from contextManager().
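
A minimal sketch of the scoping this gives each job, using stand-ins rather than the real codehosting classes. `FakeServer` and `scoped_server` are hypothetical names, and reading `no_replace` as "leave an already running server alone" is an assumption, not something stated in this description.

```python
from contextlib import contextmanager


class FakeServer:
    """Hypothetical stand-in for the object returned by get_ro_server()."""

    def __init__(self):
        self.running = False
        self.starts = 0

    def start_server(self):
        self.running = True
        self.starts += 1

    def stop_server(self):
        self.running = False


@contextmanager
def scoped_server(srv, no_replace=False):
    """Run srv for the duration of the with block.

    Assumption: no_replace=True means an already running server is reused
    and left running on exit.
    """
    owns_server = not (no_replace and srv.running)
    if owns_server:
        srv.start_server()
    try:
        yield srv
    finally:
        # Stop only what this block started, even if the body raised.
        if owns_server:
            srv.stop_server()


srv = FakeServer()
with scoped_server(srv, no_replace=True):
    with scoped_server(srv, no_replace=True):
        assert srv.running
    # The inner block did not start the server, so it did not stop it.
    assert srv.running
assert not srv.running
assert srv.starts == 1
```

In this sketch the try/finally stops the server even when the job body raises, which the plain start_server()/yield/stop_server() sequence removed from contextManager() did not guarantee.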

BranchMergeProposalJobDerived now uses EnumeratedSubclass as its metaclass, to match BranchJobDerived. This makes BranchMergeProposalJobFactory redundant, so it is removed.
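
As a rough illustration of why the metaclass makes a hand-written factory unnecessary, here is a sketch of a registering metaclass. It is not Launchpad's actual `EnumeratedSubclass`: the registry attribute `_subclass`, the `Row` class and the string job types are assumptions for illustration, and it uses Python 3 `metaclass=` syntax rather than `__metaclass__`.

```python
class EnumeratedSubclass(type):
    """Sketch: record every subclass under its class_job_type."""

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        # The first class built with this metaclass owns the registry;
        # its subclasses find the same dict through normal inheritance.
        if not hasattr(cls, "_subclass"):
            cls._subclass = {}
        job_type = namespace.get("class_job_type")
        if job_type is not None:
            cls._subclass[job_type] = cls

    def makeSubclass(cls, base_job):
        """Wrap base_job in the subclass registered for its job_type."""
        return cls._subclass[base_job.job_type](base_job)


class JobDerived(metaclass=EnumeratedSubclass):
    def __init__(self, base_job):
        self.base_job = base_job


class UpdatePreviewDiffJob(JobDerived):
    class_job_type = "UPDATE_PREVIEW_DIFF"


class ReviewRequestedEmailJob(JobDerived):
    class_job_type = "REVIEW_REQUEST_EMAIL"


class Row:
    """Hypothetical database row carrying only a job_type."""

    def __init__(self, job_type):
        self.job_type = job_type


derived = JobDerived.makeSubclass(Row("REVIEW_REQUEST_EMAIL"))
assert isinstance(derived, ReviewRequestedEmailJob)
```

Declaring a new job class with a `class_job_type` is enough to register it, so there is no separate mapping to keep in sync.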

Some of BranchMergeProposalJobDerived.create is extracted to _create, so that subclasses can reuse it.
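
The shape of that extraction, sketched with stand-ins: subclasses with their own `create` signatures only build metadata and hand off to the shared `_create`. `BaseRow`, the dict-based `comment` and the `scheduled` flag are hypothetical; in particular, `celeryRunOnCommit` here only records that it was called.

```python
class BaseRow:
    """Hypothetical stand-in for the BranchMergeProposalJob database row."""

    def __init__(self, bmp, job_type, metadata):
        self.bmp = bmp
        self.job_type = job_type
        self.metadata = metadata


class JobDerived:
    class_job_type = "GENERIC"

    def __init__(self, base_job):
        self.base_job = base_job
        self.scheduled = False

    def celeryRunOnCommit(self):
        # Stand-in: only records that scheduling was requested.
        self.scheduled = True

    @classmethod
    def create(cls, bmp):
        return cls._create(bmp, {})

    @classmethod
    def _create(cls, bmp, metadata):
        # Single place that builds the row, wraps it and schedules it.
        job = cls(BaseRow(bmp, cls.class_job_type, metadata))
        job.celeryRunOnCommit()
        return job


class CommentEmailJob(JobDerived):
    class_job_type = "CODE_REVIEW_COMMENT_EMAIL"

    @classmethod
    def create(cls, comment):
        metadata = {"code_review_comment": comment["id"]}
        return cls._create(comment["bmp"], metadata)


job = CommentEmailJob.create({"id": 42, "bmp": "bmp-1"})
assert job.scheduled
assert job.base_job.metadata == {"code_review_comment": 42}
```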

The contents of the ReviewRequestedEmail mail were untested, so I added a test.

It turns out that apply_async(ignore_result=True) does not work as I expected, so I've added CeleryRunJobIgnoreResult to support it.
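
The workaround amounts to choosing a task class instead of passing a flag. A sketch with a stand-in `Task` base class (hypothetical, not Celery's API) whose `apply_async` simply reports which settings a dispatch would use:

```python
class Task:
    """Hypothetical stand-in for a task base class; not the Celery API."""

    ignore_result = False

    @classmethod
    def apply_async(cls, args, queue):
        # Report the settings this dispatch would use.
        return {
            "task": cls.__name__,
            "args": args,
            "queue": queue,
            "ignore_result": cls.ignore_result,
        }


class CeleryRunJob(Task):
    pass


class CeleryRunJobIgnoreResult(CeleryRunJob):
    # The flag lives on the class, so it applies to every dispatch.
    ignore_result = True


def run_via_celery(job_id, queue, ignore_result=False):
    """Pick the task class that matches the requested behaviour."""
    cls = CeleryRunJobIgnoreResult if ignore_result else CeleryRunJob
    return cls.apply_async((job_id,), queue=queue)


sent = run_via_celery(7, "job", ignore_result=True)
assert sent["task"] == "CeleryRunJobIgnoreResult"
assert sent["ignore_result"] is True
```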

== LOC ==
This code is part of a resourced arc that will reduce LOC.

== Tests ==
bin/test -t TestViaCelery test_branchmergeproposaljobs

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/services/job/tests/test_runner.py
  lib/lp/codehosting/scanner/tests/test_mergedetection.py
  lib/lp/services/job/runner.py
  lib/lp/code/model/tests/test_branchmergeproposaljobs.py
  lib/lp/code/model/branchmergeproposal.py
  lib/lp/code/model/tests/test_branchjob.py
  lib/lp/services/job/tests/celery_helpers.py
  lib/lp/services/job/celeryjob.py
  lib/lp/codehosting/scanner/tests/test_email.py
  lib/lp/services/job/tests/test_job.py
  lib/lp/services/features/flags.py
  lib/lp/code/model/branchmergeproposaljob.py
  lib/lp/services/job/celeryconfig.py
  lib/lp/services/job/model/job.py
  lib/lp/services/job/tests/__init__.py
  lib/lp/codehosting/scanner/email.py
  lib/lp/code/model/tests/test_branch.py
  lib/lp/code/model/branchjob.py

Richard Harding (rharding) wrote :

Aaron, thanks for the branch. Looks good. I've got a couple of questions, but approving as these are small items.

- Are we allowed to do multiple imports on a single line if they're done as per the circular deps code in #10?
- Can't the config.merge_proposal_jobs be put on the superclass to prevent the repetition?
- Can the tests set up the context in the setUp method of the TestViaCelery test class?

review: Approve (code*)
Aaron Bentley (abentley) wrote :

> - Are we allowed to do multiple imports on a single line if they're done as
> per the circular deps code in #10?

No. I'll fix it.

> - Can't the config.merge_proposal_jobs be put on the superclass to prevent
> the repetition?

It can. It shouldn't be. These values should all be different, because each job is supposed to have its own database user. However, currently all merge proposal jobs have the same database user (and config), so I've reproduced that here.

> - Can the tests set up the context in the setUp method of the TestViaCelery
> test class?

It can, but I never use setUp if I can help it. It impedes code re-use and it's less explicit.
http://brainupdate.wordpress.com/2011/04/07/setup-in-unit-tests-mostly-harmful/

j.c.sackett (jcsackett) wrote :

Aaron--

Very nice. Marking as approved, with a minor nag below.

> === modified file 'lib/lp/services/job/model/job.py'
> --- lib/lp/services/job/model/job.py 2012-04-09 19:02:25 +0000
> +++ lib/lp/services/job/model/job.py 2012-04-11 18:53:28 +0000
> @@ -261,11 +261,18 @@
> from lp.code.model.branchjob import (
> BranchJob,
> )
> - store = IStore(BranchJob)
> - branch_job = store.find(BranchJob, BranchJob.job == job_id).one()
> - if branch_job is None:
> + from lp.code.model.branchmergeproposaljob import (
> + BranchMergeProposalJob,
> + )
> + store = IStore(Job)
> + for cls in [BranchJob, BranchMergeProposalJob]:
> + base_job = store.find(cls, cls.job == job_id).one()
> + if base_job is not None:
> + break
> + if base_job is None:
> raise ValueError('No BranchJob with job=%s.' % job_id)
> - return branch_job.makeDerived(), store
> +
> + return base_job.makeDerived(), store
>
> @classmethod
> def switchDBUser(cls, job_id):

I'm assuming the imports within this method are b/c of circular import
issues; could you throw in a comment saying so?

> === modified file 'lib/lp/services/job/runner.py'
> --- lib/lp/services/job/runner.py 2012-04-10 20:24:43 +0000
> +++ lib/lp/services/job/runner.py 2012-04-11 18:53:28 +0000
> @@ -194,10 +194,13 @@
> """Request that this job be run via celery."""
> # Avoid importing from lp.services.job.celeryjob where not needed, to
> # avoid configuring Celery when Rabbit is not configured.
> - from lp.services.job.celeryjob import CeleryRunJob
> - return CeleryRunJob.apply_async(
> - (self.job_id,), queue=self.task_queue,
> - ignore_result=ignore_result)
> + from lp.services.job.celeryjob import (
> + CeleryRunJob, CeleryRunJobIgnoreResult)
> + if ignore_result:
> + cls = CeleryRunJobIgnoreResult
> + else:
> + cls = CeleryRunJob
> + return cls.apply_async((self.job_id,), queue=self.task_queue)

Same comment as above.

review: Approve
Aaron Bentley (abentley) wrote :

> Aaron--
>
> Very nice. Marking as approved, with a minor nag below.
>
> > === modified file 'lib/lp/services/job/model/job.py'
> > --- lib/lp/services/job/model/job.py 2012-04-09 19:02:25 +0000
> > +++ lib/lp/services/job/model/job.py 2012-04-11 18:53:28 +0000
> > @@ -261,11 +261,18 @@
> > from lp.code.model.branchjob import (
> > BranchJob,
> > )
> > - store = IStore(BranchJob)
> > - branch_job = store.find(BranchJob, BranchJob.job == job_id).one()
> > - if branch_job is None:
> > + from lp.code.model.branchmergeproposaljob import (
> > + BranchMergeProposalJob,
> > + )
> > + store = IStore(Job)
> > + for cls in [BranchJob, BranchMergeProposalJob]:
> > + base_job = store.find(cls, cls.job == job_id).one()
> > + if base_job is not None:
> > + break
> > + if base_job is None:
> > raise ValueError('No BranchJob with job=%s.' % job_id)
> > - return branch_job.makeDerived(), store
> > +
> > + return base_job.makeDerived(), store
> >
> > @classmethod
> > def switchDBUser(cls, job_id):
>
> I'm assuming the imports within this method are b/c of circular import
> issues; could you throw in a comment saying so?

Done.

> > === modified file 'lib/lp/services/job/runner.py'
> > --- lib/lp/services/job/runner.py 2012-04-10 20:24:43 +0000
> > +++ lib/lp/services/job/runner.py 2012-04-11 18:53:28 +0000
> > @@ -194,10 +194,13 @@
> > """Request that this job be run via celery."""
> > # Avoid importing from lp.services.job.celeryjob where not needed, to
> > # avoid configuring Celery when Rabbit is not configured.
> > - from lp.services.job.celeryjob import CeleryRunJob
> > - return CeleryRunJob.apply_async(
> > - (self.job_id,), queue=self.task_queue,
> > - ignore_result=ignore_result)
> > + from lp.services.job.celeryjob import (
> > + CeleryRunJob, CeleryRunJobIgnoreResult)
> > + if ignore_result:
> > + cls = CeleryRunJobIgnoreResult
> > + else:
> > + cls = CeleryRunJob
> > + return cls.apply_async((self.job_id,), queue=self.task_queue)
>
> Same comment as above.

There is already a comment explaining these imports: "Avoid importing from lp.services.job.celeryjob where not needed..."

Preview Diff

1=== modified file 'lib/lp/code/model/branchmergeproposal.py'
2--- lib/lp/code/model/branchmergeproposal.py 2012-02-28 04:24:19 +0000
3+++ lib/lp/code/model/branchmergeproposal.py 2012-04-13 19:25:26 +0000
4@@ -208,8 +208,9 @@
5 def next_preview_diff_job(self):
6 # circular dependencies
7 from lp.code.model.branchmergeproposaljob import (
8- BranchMergeProposalJob, BranchMergeProposalJobFactory,
9- BranchMergeProposalJobType)
10+ BranchMergeProposalJob,
11+ BranchMergeProposalJobType,
12+ )
13 jobs = Store.of(self).find(
14 BranchMergeProposalJob,
15 BranchMergeProposalJob.branch_merge_proposal == self,
16@@ -219,7 +220,7 @@
17 Job._status.is_in([JobStatus.WAITING, JobStatus.RUNNING]))
18 job = jobs.order_by(Job.scheduled_start, Job.date_created).first()
19 if job is not None:
20- return BranchMergeProposalJobFactory.create(job)
21+ return job.makeDerived()
22 else:
23 return None
24
25
26=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
27--- lib/lp/code/model/branchmergeproposaljob.py 2011-12-30 06:14:56 +0000
28+++ lib/lp/code/model/branchmergeproposaljob.py 2012-04-13 19:25:26 +0000
29@@ -14,7 +14,6 @@
30
31 __all__ = [
32 'BranchMergeProposalJob',
33- 'BranchMergeProposalJobFactory',
34 'BranchMergeProposalJobSource',
35 'BranchMergeProposalJobType',
36 'CodeReviewCommentEmailJob',
37@@ -89,6 +88,7 @@
38 from lp.code.mail.codereviewcomment import CodeReviewCommentMailer
39 from lp.code.model.branchmergeproposal import BranchMergeProposal
40 from lp.code.model.diff import PreviewDiff
41+from lp.codehosting.bzrutils import server
42 from lp.codehosting.vfs import (
43 get_ro_server,
44 get_rw_server,
45@@ -98,7 +98,10 @@
46 from lp.services.database.enumcol import EnumCol
47 from lp.services.database.stormbase import StormBase
48 from lp.services.job.interfaces.job import JobStatus
49-from lp.services.job.model.job import Job
50+from lp.services.job.model.job import (
51+ EnumeratedSubclass,
52+ Job,
53+ )
54 from lp.services.job.runner import (
55 BaseRunnableJob,
56 BaseRunnableJobSource,
57@@ -236,10 +239,15 @@
58 'No occurrence of %s has key %s' % (klass.__name__, key))
59 return instance
60
61+ def makeDerived(self):
62+ return BranchMergeProposalJobDerived.makeSubclass(self)
63+
64
65 class BranchMergeProposalJobDerived(BaseRunnableJob):
66-
67 """Intermediate class for deriving from BranchMergeProposalJob."""
68+
69+ __metaclass__ = EnumeratedSubclass
70+
71 delegates(IBranchMergeProposalJob)
72
73 def __init__(self, job):
74@@ -256,9 +264,15 @@
75 @classmethod
76 def create(cls, bmp):
77 """See `IMergeProposalCreationJob`."""
78- job = BranchMergeProposalJob(
79- bmp, cls.class_job_type, {})
80- return cls(job)
81+ return cls._create(bmp, {})
82+
83+ @classmethod
84+ def _create(cls, bmp, metadata):
85+ base_job = BranchMergeProposalJob(
86+ bmp, cls.class_job_type, metadata)
87+ job = cls(base_job)
88+ job.celeryRunOnCommit()
89+ return job
90
91 @classmethod
92 def get(cls, job_id):
93@@ -317,6 +331,8 @@
94
95 class_job_type = BranchMergeProposalJobType.MERGE_PROPOSAL_NEEDS_REVIEW
96
97+ config = config.merge_proposal_jobs
98+
99 def run(self):
100 """See `IMergeProposalNeedsReviewEmailJob`."""
101 mailer = BMPMailer.forCreation(
102@@ -344,6 +360,8 @@
103
104 class_job_type = BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF
105
106+ config = config.merge_proposal_jobs
107+
108 user_error_types = (UpdatePreviewDiffNotReady, )
109
110 retry_error_types = (BranchHasPendingWrites, )
111@@ -369,8 +387,9 @@
112 def run(self):
113 """See `IRunnableJob`."""
114 self.checkReady()
115- preview = PreviewDiff.fromBranchMergeProposal(
116- self.branch_merge_proposal)
117+ with server(get_ro_server(), no_replace=True):
118+ preview = PreviewDiff.fromBranchMergeProposal(
119+ self.branch_merge_proposal)
120 with BranchMergeProposalDelta.monitor(
121 self.branch_merge_proposal):
122 self.branch_merge_proposal.preview_diff = preview
123@@ -473,6 +492,8 @@
124
125 class_job_type = BranchMergeProposalJobType.CODE_REVIEW_COMMENT_EMAIL
126
127+ config = config.merge_proposal_jobs
128+
129 def run(self):
130 """See `IRunnableJob`."""
131 mailer = CodeReviewCommentMailer.forCreation(self.code_review_comment)
132@@ -483,8 +504,7 @@
133 """See `ICodeReviewCommentEmailJobSource`."""
134 metadata = cls.getMetadata(code_review_comment)
135 bmp = code_review_comment.branch_merge_proposal
136- job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata)
137- return cls(job)
138+ return cls._create(bmp, metadata)
139
140 @staticmethod
141 def getMetadata(code_review_comment):
142@@ -525,6 +545,8 @@
143
144 class_job_type = BranchMergeProposalJobType.REVIEW_REQUEST_EMAIL
145
146+ config = config.merge_proposal_jobs
147+
148 def run(self):
149 """See `IRunnableJob`."""
150 reason = RecipientReason.forReviewer(
151@@ -538,8 +560,7 @@
152 """See `IReviewRequestedEmailJobSource`."""
153 metadata = cls.getMetadata(review_request)
154 bmp = review_request.branch_merge_proposal
155- job = BranchMergeProposalJob(bmp, cls.class_job_type, metadata)
156- return cls(job)
157+ return cls._create(bmp, metadata)
158
159 @staticmethod
160 def getMetadata(review_request):
161@@ -591,6 +612,8 @@
162
163 class_job_type = BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED
164
165+ config = config.merge_proposal_jobs
166+
167 def run(self):
168 """See `IRunnableJob`."""
169 mailer = BMPMailer.forModification(
170@@ -601,9 +624,7 @@
171 def create(cls, merge_proposal, delta_text, editor):
172 """See `IReviewRequestedEmailJobSource`."""
173 metadata = cls.getMetadata(delta_text, editor)
174- job = BranchMergeProposalJob(
175- merge_proposal, cls.class_job_type, metadata)
176- return cls(job)
177+ return cls._create(merge_proposal, metadata)
178
179 @staticmethod
180 def getMetadata(delta_text, editor):
181@@ -658,6 +679,8 @@
182
183 class_job_type = BranchMergeProposalJobType.GENERATE_INCREMENTAL_DIFF
184
185+ config = config.merge_proposal_jobs
186+
187 def acquireLease(self, duration=600):
188 return self.job.acquireLease(duration)
189
190@@ -665,15 +688,14 @@
191 revision_set = getUtility(IRevisionSet)
192 old_revision = revision_set.getByRevisionId(self.old_revision_id)
193 new_revision = revision_set.getByRevisionId(self.new_revision_id)
194- self.branch_merge_proposal.generateIncrementalDiff(
195- old_revision, new_revision)
196+ with server(get_ro_server(), no_replace=True):
197+ self.branch_merge_proposal.generateIncrementalDiff(
198+ old_revision, new_revision)
199
200 @classmethod
201 def create(cls, merge_proposal, old_revision_id, new_revision_id):
202 metadata = cls.getMetadata(old_revision_id, new_revision_id)
203- job = BranchMergeProposalJob(
204- merge_proposal, cls.class_job_type, metadata)
205- return cls(job)
206+ return cls._create(merge_proposal, metadata)
207
208 @staticmethod
209 def getMetadata(old_revision_id, new_revision_id):
210@@ -710,31 +732,6 @@
211 return format_address_for_person(registrant)
212
213
214-class BranchMergeProposalJobFactory:
215- """Construct a derived merge proposal job for a BranchMergeProposalJob."""
216-
217- job_classes = {
218- BranchMergeProposalJobType.MERGE_PROPOSAL_NEEDS_REVIEW:
219- MergeProposalNeedsReviewEmailJob,
220- BranchMergeProposalJobType.UPDATE_PREVIEW_DIFF:
221- UpdatePreviewDiffJob,
222- BranchMergeProposalJobType.CODE_REVIEW_COMMENT_EMAIL:
223- CodeReviewCommentEmailJob,
224- BranchMergeProposalJobType.REVIEW_REQUEST_EMAIL:
225- ReviewRequestedEmailJob,
226- BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED:
227- MergeProposalUpdatedEmailJob,
228- BranchMergeProposalJobType.GENERATE_INCREMENTAL_DIFF:
229- GenerateIncrementalDiffJob,
230- }
231-
232- @classmethod
233- def create(cls, bmp_job):
234- """Create the derived job for the bmp_job's job type."""
235- job_class = cls.job_classes[bmp_job.job_type]
236- return job_class(bmp_job)
237-
238-
239 class BranchMergeProposalJobSource(BaseRunnableJobSource):
240 """Provide a job source for all merge proposal jobs.
241
242@@ -748,10 +745,7 @@
243 def contextManager():
244 """See `IJobSource`."""
245 errorlog.globalErrorUtility.configure('merge_proposal_jobs')
246- server = get_ro_server()
247- server.start_server()
248 yield
249- server.stop_server()
250
251 @staticmethod
252 def get(job_id):
253@@ -763,7 +757,7 @@
254 or its job_type does not match the desired subclass.
255 """
256 job = BranchMergeProposalJob.get(job_id)
257- return BranchMergeProposalJobFactory.create(job)
258+ return job.makeDerived()
259
260 @staticmethod
261 def iterReady(job_type=None):
262@@ -802,7 +796,7 @@
263 # If the job is running, then skip it
264 if job.status == JobStatus.RUNNING:
265 continue
266- derived_job = BranchMergeProposalJobFactory.create(bmp_job)
267+ derived_job = bmp_job.makeDerived()
268 # If the job is an update preview diff, then check that it is
269 # ready.
270 if IUpdatePreviewDiffJob.providedBy(derived_job):
271
272=== modified file 'lib/lp/code/model/tests/test_branchmergeproposaljobs.py'
273--- lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2012-01-20 15:42:44 +0000
274+++ lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2012-04-13 19:25:26 +0000
275@@ -1,4 +1,4 @@
276-# Copyright 2010-2011 Canonical Ltd. This software is licensed under the
277+# Copyright 2010-2012 Canonical Ltd. This software is licensed under the
278 # GNU Affero General Public License version 3 (see the file LICENSE).
279
280 """Tests for branch merge proposal jobs."""
281@@ -55,9 +55,15 @@
282 )
283 from lp.code.subscribers.branchmergeproposal import merge_proposal_modified
284 from lp.services.config import config
285+from lp.services.features.testing import FeatureFixture
286 from lp.services.job.interfaces.job import JobStatus
287 from lp.services.job.model.job import Job
288 from lp.services.job.runner import JobRunner
289+from lp.services.job.tests import (
290+ celeryd,
291+ monitor_celery,
292+ pop_remote_notifications,
293+ )
294 from lp.services.osutils import override_environ
295 from lp.services.webapp.testing import verifyObject
296 from lp.testing import (
297@@ -65,7 +71,10 @@
298 TestCaseWithFactory,
299 )
300 from lp.testing.dbuser import dbuser
301-from lp.testing.layers import LaunchpadZopelessLayer
302+from lp.testing.layers import (
303+ AppServerLayer,
304+ LaunchpadZopelessLayer,
305+ )
306 from lp.testing.mail_helpers import pop_notifications
307
308
309@@ -549,6 +558,15 @@
310 'emailing a reviewer requesting a review',
311 job.getOperationDescription())
312
313+ def test_run_sends_mail(self):
314+ request = self.factory.makeCodeReviewVoteReference()
315+ job = ReviewRequestedEmailJob.create(request)
316+ job.run()
317+ (notification,) = pop_notifications()
318+ self.assertIn(
319+ 'You have been requested to review the proposed merge',
320+ notification.get_payload(decode=True))
321+
322
323 class TestMergeProposalUpdatedEmailJob(TestCaseWithFactory):
324
325@@ -573,3 +591,83 @@
326 self.assertEqual(
327 'emailing subscribers about merge proposal changes',
328 job.getOperationDescription())
329+
330+
331+class TestViaCelery(TestCaseWithFactory):
332+
333+ layer = AppServerLayer
334+
335+ def test_MergeProposalNeedsReviewEmailJob(self):
336+ """MergeProposalNeedsReviewEmailJob runs under Celery."""
337+ self.useFixture(FeatureFixture(
338+ {'jobs.celery.enabled_classes':
339+ 'MergeProposalNeedsReviewEmailJob'}))
340+ self.useContext(celeryd('job'))
341+ bmp = self.factory.makeBranchMergeProposal()
342+ with monitor_celery() as responses:
343+ MergeProposalNeedsReviewEmailJob.create(bmp)
344+ transaction.commit()
345+ responses[0].wait(30)
346+ self.assertEqual(2, len(pop_remote_notifications()))
347+
348+ def test_UpdatePreviewDiffJob(self):
349+ """UpdatePreviewDiffJob runs under Celery."""
350+ self.useContext(celeryd('job'))
351+ self.useBzrBranches(direct_database=True)
352+ bmp = create_example_merge(self)[0]
353+ self.factory.makeRevisionsForBranch(bmp.source_branch, count=1)
354+ self.useFixture(FeatureFixture(
355+ {'jobs.celery.enabled_classes': 'UpdatePreviewDiffJob'}))
356+ with monitor_celery() as responses:
357+ UpdatePreviewDiffJob.create(bmp)
358+ transaction.commit()
359+ responses[0].wait(30)
360+ self.assertIsNot(None, bmp.preview_diff)
361+
362+ def test_CodeReviewCommentEmailJob(self):
363+ """CodeReviewCommentEmailJob runs under Celery."""
364+ comment = self.factory.makeCodeReviewComment()
365+ self.useContext(celeryd('job'))
366+ self.useFixture(FeatureFixture(
367+ {'jobs.celery.enabled_classes': 'CodeReviewCommentEmailJob'}))
368+ with monitor_celery() as responses:
369+ CodeReviewCommentEmailJob.create(comment)
370+ transaction.commit()
371+ responses[0].wait(30)
372+ self.assertEqual(2, len(pop_remote_notifications()))
373+
374+ def test_ReviewRequestedEmailJob(self):
375+ """ReviewRequestedEmailJob runs under Celery."""
376+ request = self.factory.makeCodeReviewVoteReference()
377+ self.useContext(celeryd('job'))
378+ self.useFixture(FeatureFixture(
379+ {'jobs.celery.enabled_classes': 'ReviewRequestedEmailJob'}))
380+ with monitor_celery() as responses:
381+ ReviewRequestedEmailJob.create(request)
382+ transaction.commit()
383+ responses[0].wait(30)
384+ self.assertEqual(1, len(pop_remote_notifications()))
385+
386+ def test_MergeProposalUpdatedEmailJob(self):
387+ """MergeProposalUpdatedEmailJob runs under Celery."""
388+ bmp = self.factory.makeBranchMergeProposal()
389+ self.useContext(celeryd('job'))
390+ self.useFixture(FeatureFixture(
391+ {'jobs.celery.enabled_classes': 'MergeProposalUpdatedEmailJob'}))
392+ with monitor_celery() as responses:
393+ MergeProposalUpdatedEmailJob.create(
394+ bmp, 'change', bmp.registrant)
395+ transaction.commit()
396+ responses[0].wait(30)
397+ self.assertEqual(2, len(pop_remote_notifications()))
398+
399+ def test_GenerateIncrementalDiffJob(self):
400+ """GenerateIncrementalDiffJob runs under Celery."""
401+ self.useContext(celeryd('job'))
402+ self.useFixture(FeatureFixture(
403+ {'jobs.celery.enabled_classes': 'GenerateIncrementalDiffJob'}))
404+ with monitor_celery() as responses:
405+ job = make_runnable_incremental_diff_job(self)
406+ transaction.commit()
407+ responses[0].wait(30)
408+ self.assertEqual(JobStatus.COMPLETED, job.status)
409
410=== modified file 'lib/lp/codehosting/scanner/tests/test_email.py'
411--- lib/lp/codehosting/scanner/tests/test_email.py 2012-04-09 19:02:25 +0000
412+++ lib/lp/codehosting/scanner/tests/test_email.py 2012-04-13 19:25:26 +0000
413@@ -32,6 +32,7 @@
414 from lp.services.job.tests import (
415 celeryd,
416 monitor_celery,
417+ pop_remote_notifications,
418 )
419 from lp.testing import TestCaseWithFactory
420 from lp.testing.dbuser import switch_dbuser
421@@ -163,11 +164,6 @@
422
423 layer = ZopelessAppServerLayer
424
425- @staticmethod
426- def pop_notifications():
427- from lp.services.job.tests.celery_helpers import pop_notifications
428- return pop_notifications.delay().get(30)
429-
430 def prepare(self, job_name):
431 self.useFixture(FeatureFixture(
432 {'jobs.celery.enabled_classes': job_name}))
433@@ -186,7 +182,7 @@
434 with monitor_celery() as responses:
435 BzrSync(db_branch).syncBranchAndClose(tree.branch)
436 responses[-1].wait(30)
437- self.assertEqual(1, len(self.pop_notifications()))
438+ self.assertEqual(1, len(pop_remote_notifications()))
439
440 def test_uncommit_branch(self):
441 """RevisionMailJob for removed revisions runs via Celery."""
442@@ -196,11 +192,11 @@
443 with monitor_celery() as responses:
444 bzr_sync.syncBranchAndClose(tree.branch)
445 responses[0].wait(30)
446- self.pop_notifications()
447+ pop_remote_notifications()
448 uncommit(tree.branch)
449 bzr_sync.syncBranchAndClose(tree.branch)
450 responses[1].wait(30)
451- self.assertEqual(1, len(self.pop_notifications()))
452+ self.assertEqual(1, len(pop_remote_notifications()))
453
454 def test_revisions_added(self):
455 """RevisionMailJob for removed revisions runs via Celery."""
456@@ -208,12 +204,12 @@
457 tree.commit('message')
458 bzr_sync = BzrSync(db_branch)
459 bzr_sync.syncBranchAndClose(tree.branch)
460- self.pop_notifications()
461+ pop_remote_notifications()
462 tree.commit('message2')
463 with monitor_celery() as responses:
464 bzr_sync.syncBranchAndClose(tree.branch)
465 responses[-1].wait(30)
466- self.assertEqual(1, len(self.pop_notifications()))
467+ self.assertEqual(1, len(pop_remote_notifications()))
468
469
470 class TestScanBranches(TestCaseWithFactory):
471
472=== modified file 'lib/lp/codehosting/scanner/tests/test_mergedetection.py'
473--- lib/lp/codehosting/scanner/tests/test_mergedetection.py 2012-01-01 02:58:52 +0000
474+++ lib/lp/codehosting/scanner/tests/test_mergedetection.py 2012-04-13 19:25:26 +0000
475@@ -19,7 +19,6 @@
476 from lp.code.interfaces.branchlookup import IBranchLookup
477 from lp.code.model.branchmergeproposaljob import (
478 BranchMergeProposalJob,
479- BranchMergeProposalJobFactory,
480 BranchMergeProposalJobType,
481 )
482 from lp.codehosting.scanner import (
483@@ -285,7 +284,7 @@
484 BranchMergeProposalJob.branch_merge_proposal == proposal,
485 BranchMergeProposalJob.job_type ==
486 BranchMergeProposalJobType.MERGE_PROPOSAL_UPDATED).one()
487- derived_job = BranchMergeProposalJobFactory.create(job)
488+ derived_job = job.makeDerived()
489 derived_job.run()
490 notifications = pop_notifications()
491 self.assertIn('Work in progress => Merged',
492
493=== modified file 'lib/lp/services/job/celeryjob.py'
494--- lib/lp/services/job/celeryjob.py 2012-04-05 19:05:16 +0000
495+++ lib/lp/services/job/celeryjob.py 2012-04-13 19:25:26 +0000
496@@ -9,7 +9,10 @@
497
498 __metaclass__ = type
499
500-__all__ = ['CeleryRunJob']
501+__all__ = [
502+ 'CeleryRunJob',
503+ 'CeleryRunJobIgnoreResult',
504+ ]
505
506 import os
507
508@@ -28,3 +31,8 @@
509 def getJobRunner(self):
510 """Return a BaseJobRunner, to support customization."""
511 return BaseJobRunner()
512+
513+
514+class CeleryRunJobIgnoreResult(CeleryRunJob):
515+
516+ ignore_result = True
517
518=== modified file 'lib/lp/services/job/model/job.py'
519--- lib/lp/services/job/model/job.py 2012-04-12 15:28:10 +0000
520+++ lib/lp/services/job/model/job.py 2012-04-13 19:25:26 +0000
521@@ -260,14 +260,22 @@
522 @staticmethod
523 def getDerived(job_id):
524 """Return the derived branch job associated with the job id."""
525+ # Avoid circular imports.
526 from lp.code.model.branchjob import (
527 BranchJob,
528 )
529- store = IStore(BranchJob)
530- branch_job = store.find(BranchJob, BranchJob.job == job_id).one()
531- if branch_job is None:
532+ from lp.code.model.branchmergeproposaljob import (
533+ BranchMergeProposalJob,
534+ )
535+ store = IStore(Job)
536+ for cls in [BranchJob, BranchMergeProposalJob]:
537+ base_job = store.find(cls, cls.job == job_id).one()
538+ if base_job is not None:
539+ break
540+ if base_job is None:
541 raise ValueError('No BranchJob with job=%s.' % job_id)
542- return branch_job.makeDerived(), store
543+
544+ return base_job.makeDerived(), store
545
546 @classmethod
547 def switchDBUser(cls, job_id):
548
549=== modified file 'lib/lp/services/job/runner.py'
550--- lib/lp/services/job/runner.py 2012-04-10 20:24:43 +0000
551+++ lib/lp/services/job/runner.py 2012-04-13 19:25:26 +0000
552@@ -194,10 +194,13 @@
553 """Request that this job be run via celery."""
554 # Avoid importing from lp.services.job.celeryjob where not needed, to
555 # avoid configuring Celery when Rabbit is not configured.
556- from lp.services.job.celeryjob import CeleryRunJob
557- return CeleryRunJob.apply_async(
558- (self.job_id,), queue=self.task_queue,
559- ignore_result=ignore_result)
560+ from lp.services.job.celeryjob import (
561+ CeleryRunJob, CeleryRunJobIgnoreResult)
562+ if ignore_result:
563+ cls = CeleryRunJobIgnoreResult
564+ else:
565+ cls = CeleryRunJob
566+ return cls.apply_async((self.job_id,), queue=self.task_queue)
567
568 def celeryCommitHook(self, succeeded):
569 """Hook function to call when a commit completes."""
570
571=== modified file 'lib/lp/services/job/tests/__init__.py'
572--- lib/lp/services/job/tests/__init__.py 2012-04-10 20:24:43 +0000
573+++ lib/lp/services/job/tests/__init__.py 2012-04-13 19:25:26 +0000
574@@ -5,7 +5,8 @@
575
576 __all__ = [
577 'celeryd',
578- 'monitor_celery'
579+ 'monitor_celery',
580+ 'pop_remote_notifications',
581 ]
582
583
584@@ -49,3 +50,9 @@
585 yield responses
586 finally:
587 BaseRunnableJob.celery_responses = old_responses
588+
589+
590+def pop_remote_notifications():
591+ """Pop the notifications from a celeryd worker."""
592+ from lp.services.job.tests.celery_helpers import pop_notifications
593+ return pop_notifications.delay().get(30)
594
595=== modified file 'lib/lp/services/job/tests/test_job.py'
596--- lib/lp/services/job/tests/test_job.py 2012-04-12 17:15:01 +0000
597+++ lib/lp/services/job/tests/test_job.py 2012-04-13 19:25:26 +0000
598@@ -11,6 +11,7 @@
599 from storm.locals import Store
600 import transaction
601
602+from lp.code.model.branchmergeproposaljob import CodeReviewCommentEmailJob
603 from lp.services.database.constants import UTC_NOW
604 from lp.services.database.lpstorm import IStore
605 from lp.services.job.interfaces.job import (
606@@ -20,6 +21,7 @@
607 from lp.services.job.model.job import (
608 InvalidTransition,
609 Job,
610+ UniversalJobSource,
611 )
612 from lp.services.webapp.testing import verifyObject
613 from lp.testing import (
614@@ -449,3 +451,14 @@
615 job = Job()
616 job.acquireLease(-300)
617 self.assertEqual(0, job.getTimeout())
618+
619+
620+class TestUniversalJobSource(TestCaseWithFactory):
621+
622+ layer = ZopelessDatabaseLayer
623+
624+ def test_getDerived_with_merge_proposal_job(self):
625+ comment = self.factory.makeCodeReviewComment()
626+ job = CodeReviewCommentEmailJob.create(comment)
627+ newjob = UniversalJobSource.getDerived(job.job_id)[0]
628+ self.assertEqual(job, newjob)