Merge lp:~wgrant/launchpad/celery-tweaks into lp:launchpad

Proposed by William Grant
Status: Merged
Approved by: Steve Kowalik
Approved revision: no longer in the source branch.
Merged at revision: 16666
Proposed branch: lp:~wgrant/launchpad/celery-tweaks
Merge into: lp:launchpad
Diff against target: 104 lines (+50/-2)
3 files modified
lib/lp/services/job/celeryjob.py (+2/-0)
lib/lp/soyuz/model/queue.py (+1/-0)
lib/lp/soyuz/tests/test_packagecopyjob.py (+47/-2)
To merge this branch: bzr merge lp:~wgrant/launchpad/celery-tweaks
Reviewer Review Type Date Requested Status
Steve Kowalik (community) code Approve
Review via email: mp+167197@code.launchpad.net

Commit message

Two celery fixes: reschedule accepted PackageCopyJobs, and fix mail delivery.

Description of the change

Two celery fixes:

 - PackageCopyJobs need to be requeued in celery once they're resumed by acceptFromQueue.
 - celery Zope initialisation was calling execute_zcml_for_scripts but not set_immediate_mail_delivery, so it was using dev-configured Zope transactional appserver mail.

To post a comment you must log in.
Revision history for this message
Steve Kowalik (stevenk) :
review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/celeryjob.py'
2--- lib/lp/services/job/celeryjob.py 2012-08-02 15:21:50 +0000
3+++ lib/lp/services/job/celeryjob.py 2013-06-04 05:56:27 +0000
4@@ -34,6 +34,7 @@
5 install_feature_controller,
6 make_script_feature_controller,
7 )
8+from lp.services.mail.sendmail import set_immediate_mail_delivery
9 from lp.services.job.model.job import (
10 Job,
11 UniversalJobSource,
12@@ -139,6 +140,7 @@
13 return
14 transaction.abort()
15 scripts.execute_zcml_for_scripts(use_web_security=False)
16+ set_immediate_mail_delivery(True)
17 needs_zcml = False
18
19
20
21=== modified file 'lib/lp/soyuz/model/queue.py'
22--- lib/lp/soyuz/model/queue.py 2013-05-31 02:51:23 +0000
23+++ lib/lp/soyuz/model/queue.py 2013-06-04 05:56:27 +0000
24@@ -542,6 +542,7 @@
25 self.setAccepted()
26 job = PlainPackageCopyJob.get(self.package_copy_job_id)
27 job.resume()
28+ job.celeryRunOnCommit()
29 # The copy job will send emails as appropriate. We don't
30 # need to worry about closing bugs from syncs, although we
31 # should probably give karma but that needs more work to
32
33=== modified file 'lib/lp/soyuz/tests/test_packagecopyjob.py'
34--- lib/lp/soyuz/tests/test_packagecopyjob.py 2013-05-24 01:02:43 +0000
35+++ lib/lp/soyuz/tests/test_packagecopyjob.py 2013-06-04 05:56:27 +0000
36@@ -59,7 +59,7 @@
37 from lp.soyuz.model.queue import PackageUpload
38 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
39 from lp.testing import (
40- celebrity_logged_in,
41+ admin_logged_in,
42 person_logged_in,
43 run_script,
44 TestCaseWithFactory,
45@@ -87,7 +87,7 @@
46 def create_proper_job(factory):
47 """Create a job that will complete successfully."""
48 publisher = SoyuzTestPublisher()
49- with celebrity_logged_in('admin'):
50+ with admin_logged_in():
51 publisher.prepareBreezyAutotest()
52 distroseries = publisher.breezy_autotest
53
54@@ -1642,6 +1642,51 @@
55 emails = pop_remote_notifications()
56 self.assertEqual(1, len(emails))
57
58+ def test_resume_from_queue(self):
59+ # Accepting a suspended copy from the queue sends it back
60+ # through celery.
61+ self.useFixture(FeatureFixture({
62+ 'jobs.celery.enabled_classes': 'PlainPackageCopyJob',
63+ }))
64+
65+ source_pub = self.factory.makeSourcePackagePublishingHistory(
66+ component=u"main", status=PackagePublishingStatus.PUBLISHED)
67+ target_series = self.factory.makeDistroSeries()
68+ getUtility(ISourcePackageFormatSelectionSet).add(
69+ target_series, SourcePackageFormat.FORMAT_1_0)
70+ requester = self.factory.makePerson()
71+ with person_logged_in(target_series.main_archive.owner):
72+ target_series.main_archive.newComponentUploader(requester, u"main")
73+ job = getUtility(IPlainPackageCopyJobSource).create(
74+ package_name=source_pub.source_package_name,
75+ package_version=source_pub.source_package_version,
76+ source_archive=source_pub.archive,
77+ target_archive=target_series.main_archive,
78+ target_distroseries=target_series,
79+ target_pocket=PackagePublishingPocket.PROPOSED,
80+ include_binaries=False, requester=requester)
81+
82+ # Run the job once. There's no ancestry so it will be suspended
83+ # and added to the queue.
84+ with block_on_job(self):
85+ transaction.commit()
86+ self.assertEqual(JobStatus.SUSPENDED, job.status)
87+
88+ # Approve its queue entry and rerun to completion.
89+ pu = getUtility(IPackageUploadSet).getByPackageCopyJobIDs(
90+ [job.id]).one()
91+ with admin_logged_in():
92+ pu.acceptFromQueue()
93+ self.assertEqual(JobStatus.WAITING, job.status)
94+
95+ with block_on_job(self):
96+ transaction.commit()
97+ self.assertEqual(JobStatus.COMPLETED, job.status)
98+ self.assertEqual(
99+ 1,
100+ target_series.main_archive.getPublishedSources(
101+ name=source_pub.source_package_name).count())
102+
103
104 class TestPlainPackageCopyJobPermissions(TestCaseWithFactory):
105