Merge lp:~abentley/launchpad/celery-everywhere-4 into lp:launchpad

Proposed by Aaron Bentley on 2012-04-20
Status: Merged
Approved by: Aaron Bentley on 2012-04-23
Approved revision: no longer in the source branch.
Merged at revision: 15146
Proposed branch: lp:~abentley/launchpad/celery-everywhere-4
Merge into: lp:launchpad
Diff against target: 440 lines (+147/-74)
7 files modified
lib/lp/services/job/model/job.py (+23/-21)
lib/lp/services/job/tests/test_job.py (+9/-4)
lib/lp/soyuz/model/distributionjob.py (+10/-1)
lib/lp/soyuz/model/distroseriesdifferencejob.py (+8/-3)
lib/lp/soyuz/model/initializedistroseriesjob.py (+6/-1)
lib/lp/soyuz/tests/test_distroseriesdifferencejob.py (+19/-0)
lib/lp/soyuz/tests/test_initializedistroseriesjob.py (+72/-44)
To merge this branch: bzr merge lp:~abentley/launchpad/celery-everywhere-4
Reviewer Review Type Date Requested Status
Benji York (community) code 2012-04-20 Approve on 2012-04-23
Review via email: mp+102918@code.launchpad.net

Commit Message

Support DistributionJobs via Celery

Description of the Change

= Summary =
Support DistributionJob via Celery

== Pre-implementation notes ==
None

== Implementation details ==
Handle missing Jobs more cleanly
Refactor retrieving jobs and changing db user to avoid db permission errors
Add DistributionJob to the list of base classes, add makeDerived method.
Make DistributionJobDerived an EnumeratedSubclass
Add config to InitializeDistroSeriesJob and dispatch via Celery
Add config to DistributionDifferenceJob and dispatch via Celery
Extract create_child from InitializeDistroSeriesJobTestsWithPackages to permit reuse in non-subclasses.

== Tests ==
bin/test --layer=CeleryJobLayer -m 'test_distroseriesdifferencejob|test_initializedistroseriesjob'

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/soyuz/model/distroseriesdifferencejob.py
  lib/lp/soyuz/model/distributionjob.py
  lib/lp/soyuz/tests/test_initializedistroseriesjob.py
  lib/lp/soyuz/model/initializedistroseriesjob.py
  lib/lp/soyuz/tests/test_distroseriesdifferencejob.py
  lib/lp/services/job/model/job.py

To post a comment you must log in.
Benji York (benji) wrote :

This branch looks good. It looks like you had a multi-line import that
shrunk and can now fit on one line:

=== modified file 'lib/lp/soyuz/tests/test_distroseriesdifferencejob.py'
--- lib/lp/soyuz/tests/test_distroseriesdifferencejob.py 2012-01-24 17:27:44 +0000
+++ lib/lp/soyuz/tests/test_distroseriesdifferencejob.py 2012-04-20 19:12:21 +0000
@@ -22,6 +22,9 @@
 from lp.services.database.lpstorm import IMasterStore
 from lp.services.features.testing import FeatureFixture
 from lp.services.job.interfaces.job import JobStatus
+from lp.services.job.tests import (
+ block_on_job,
+ )

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/model/job.py'
2--- lib/lp/services/job/model/job.py 2012-04-13 23:56:24 +0000
3+++ lib/lp/services/job/model/job.py 2012-04-24 15:22:07 +0000
4@@ -38,7 +38,7 @@
5 from zope.component import getUtility
6 from zope.interface import implements
7
8-from lp.services.config import dbconfig
9+from lp.services.config import config, dbconfig
10 from lp.services.database import bulk
11 from lp.services.database.constants import UTC_NOW
12 from lp.services.database.datetimecol import UtcDateTimeCol
13@@ -258,7 +258,15 @@
14 needs_init = True
15
16 @staticmethod
17- def getDerived(job_id):
18+ def _getDerived(job_id, base_class):
19+ store = IStore(base_class)
20+ base_job = store.find(base_class, base_class.job == job_id).one()
21+ if base_job is None:
22+ return None, None, None
23+ return base_job.makeDerived(), base_job.__class__, store
24+
25+ @classmethod
26+ def getUserAndBaseJob(cls, job_id):
27 """Return the derived branch job associated with the job id."""
28 # Avoid circular imports.
29 from lp.code.model.branchjob import (
30@@ -267,15 +275,16 @@
31 from lp.code.model.branchmergeproposaljob import (
32 BranchMergeProposalJob,
33 )
34- store = IStore(Job)
35- for cls in [BranchJob, BranchMergeProposalJob]:
36- base_job = store.find(cls, cls.job == job_id).one()
37- if base_job is not None:
38- break
39- if base_job is None:
40- raise ValueError('No BranchJob with job=%s.' % job_id)
41+ from lp.soyuz.model.distributionjob import DistributionJob
42+ dbconfig.override(
43+ dbuser=config.launchpad.dbuser, isolation_level='read_committed')
44
45- return base_job.makeDerived(), store
46+ for baseclass in [BranchJob, BranchMergeProposalJob, DistributionJob]:
47+ derived, base_class, store = cls._getDerived(job_id, baseclass)
48+ if derived is not None:
49+ cls.clearStore(store)
50+ return derived.config.dbuser, base_class
51+ raise ValueError('No Job with job=%s.' % job_id)
52
53 @staticmethod
54 def clearStore(store):
55@@ -284,19 +293,12 @@
56 store.close()
57
58 @classmethod
59- def switchDBUser(cls, job_id):
60- """Switch to the DB user associated with this Job ID."""
61- cls.clearStore(IStore(Job))
62- derived, store = cls.getDerived(job_id)
63- dbconfig.override(
64- dbuser=derived.config.dbuser, isolation_level='read_committed')
65- cls.clearStore(store)
66-
67- @classmethod
68 def get(cls, job_id):
69 transaction.abort()
70 if cls.needs_init:
71 scripts.execute_zcml_for_scripts(use_web_security=False)
72 cls.needs_init = False
73- cls.switchDBUser(job_id)
74- return cls.getDerived(job_id)[0]
75+ cls.clearStore(IStore(Job))
76+ dbuser, base_class = cls.getUserAndBaseJob(job_id)
77+ dbconfig.override(dbuser=dbuser, isolation_level='read_committed')
78+ return cls._getDerived(job_id, base_class)[0]
79
80=== modified file 'lib/lp/services/job/tests/test_job.py'
81--- lib/lp/services/job/tests/test_job.py 2012-04-13 23:46:36 +0000
82+++ lib/lp/services/job/tests/test_job.py 2012-04-24 15:22:07 +0000
83@@ -11,7 +11,11 @@
84 from storm.locals import Store
85 import transaction
86
87-from lp.code.model.branchmergeproposaljob import CodeReviewCommentEmailJob
88+from lp.code.model.branchmergeproposaljob import (
89+ BranchMergeProposalJob,
90+ CodeReviewCommentEmailJob,
91+ )
92+from lp.services.config import config
93 from lp.services.database.constants import UTC_NOW
94 from lp.services.database.lpstorm import IStore
95 from lp.services.job.interfaces.job import (
96@@ -457,8 +461,9 @@
97
98 layer = ZopelessDatabaseLayer
99
100- def test_getDerived_with_merge_proposal_job(self):
101+ def test_getUserAndBaseJob_with_merge_proposal_job(self):
102 comment = self.factory.makeCodeReviewComment()
103 job = CodeReviewCommentEmailJob.create(comment)
104- newjob = UniversalJobSource.getDerived(job.job_id)[0]
105- self.assertEqual(job, newjob)
106+ dbuser, base_class = UniversalJobSource.getUserAndBaseJob(job.job_id)
107+ self.assertEqual(dbuser, config.merge_proposal_jobs.dbuser)
108+ self.assertEqual(base_class, BranchMergeProposalJob)
109
110=== modified file 'lib/lp/soyuz/model/distributionjob.py'
111--- lib/lp/soyuz/model/distributionjob.py 2011-12-30 06:14:56 +0000
112+++ lib/lp/soyuz/model/distributionjob.py 2012-04-24 15:22:07 +0000
113@@ -23,7 +23,10 @@
114 from lp.services.database.enumcol import EnumCol
115 from lp.services.database.lpstorm import IStore
116 from lp.services.database.stormbase import StormBase
117-from lp.services.job.model.job import Job
118+from lp.services.job.model.job import (
119+ EnumeratedSubclass,
120+ Job,
121+ )
122 from lp.services.job.runner import BaseRunnableJob
123 from lp.soyuz.interfaces.distributionjob import (
124 DistributionJobType,
125@@ -61,9 +64,15 @@
126 self.job_type = job_type
127 self.metadata = metadata
128
129+ def makeDerived(self):
130+ return DistributionJobDerived.makeSubclass(self)
131+
132
133 class DistributionJobDerived(BaseRunnableJob):
134 """Abstract class for deriving from DistributionJob."""
135+
136+ __metaclass__ = EnumeratedSubclass
137+
138 delegates(IDistributionJob)
139
140 def __init__(self, job):
141
142=== modified file 'lib/lp/soyuz/model/distroseriesdifferencejob.py'
143--- lib/lp/soyuz/model/distroseriesdifferencejob.py 2012-02-24 03:47:44 +0000
144+++ lib/lp/soyuz/model/distroseriesdifferencejob.py 2012-04-24 15:22:07 +0000
145@@ -21,6 +21,7 @@
146 from lp.registry.model.distroseries import DistroSeries
147 from lp.registry.model.distroseriesdifference import DistroSeriesDifference
148 from lp.registry.model.sourcepackagename import SourcePackageName
149+from lp.services.config import config
150 from lp.services.database import bulk
151 from lp.services.database.lpstorm import (
152 IMasterStore,
153@@ -65,12 +66,14 @@
154 `derived_series`. The difference is between the versions of
155 `sourcepackagename` in `parent_series` and `derived_series`.
156 """
157- job = DistributionJob(
158+ db_job = DistributionJob(
159 distribution=derived_series.distribution, distroseries=derived_series,
160 job_type=DistributionJobType.DISTROSERIESDIFFERENCE,
161 metadata=make_metadata(sourcepackagename.id, parent_series.id))
162- IMasterStore(DistributionJob).add(job)
163- return DistroSeriesDifferenceJob(job)
164+ IMasterStore(DistributionJob).add(db_job)
165+ job = DistroSeriesDifferenceJob(db_job)
166+ job.celeryRunOnCommit()
167+ return job
168
169
170 def create_multiple_jobs(derived_series, parent_series):
171@@ -165,6 +168,8 @@
172
173 class_job_type = DistributionJobType.DISTROSERIESDIFFERENCE
174
175+ config = config.distroseriesdifferencejob
176+
177 @classmethod
178 def createForPackagePublication(cls, derived_series, sourcepackagename,
179 pocket):
180
181=== modified file 'lib/lp/soyuz/model/initializedistroseriesjob.py'
182--- lib/lp/soyuz/model/initializedistroseriesjob.py 2012-01-01 02:58:52 +0000
183+++ lib/lp/soyuz/model/initializedistroseriesjob.py 2012-04-24 15:22:07 +0000
184@@ -13,6 +13,7 @@
185 )
186
187 from lp.registry.model.distroseries import DistroSeries
188+from lp.services.config import config
189 from lp.services.database.lpstorm import (
190 IMasterStore,
191 IStore,
192@@ -46,6 +47,8 @@
193
194 user_error_types = (InitializationError,)
195
196+ config = config.initializedistroseries
197+
198 @classmethod
199 def create(cls, child, parents, arches=(), archindep_archtag=None,
200 packagesets=(), rebuild=False, overlays=(),
201@@ -106,7 +109,9 @@
202 distribution_job = DistributionJob(
203 child.distribution, child, cls.class_job_type, metadata)
204 store.add(distribution_job)
205- return cls(distribution_job)
206+ derived_job = cls(distribution_job)
207+ derived_job.celeryRunOnCommit()
208+ return derived_job
209
210 @classmethod
211 def get(cls, distroseries):
212
213=== modified file 'lib/lp/soyuz/tests/test_distroseriesdifferencejob.py'
214--- lib/lp/soyuz/tests/test_distroseriesdifferencejob.py 2012-01-24 17:27:44 +0000
215+++ lib/lp/soyuz/tests/test_distroseriesdifferencejob.py 2012-04-24 15:22:07 +0000
216@@ -22,6 +22,7 @@
217 from lp.services.database.lpstorm import IMasterStore
218 from lp.services.features.testing import FeatureFixture
219 from lp.services.job.interfaces.job import JobStatus
220+from lp.services.job.tests import block_on_job
221 from lp.services.scripts.tests import run_script
222 from lp.soyuz.enums import (
223 ArchivePurpose,
224@@ -44,6 +45,7 @@
225 from lp.testing import TestCaseWithFactory
226 from lp.testing.dbuser import switch_dbuser
227 from lp.testing.layers import (
228+ CeleryJobLayer,
229 LaunchpadZopelessLayer,
230 ZopelessDatabaseLayer,
231 )
232@@ -977,3 +979,20 @@
233
234 # The test is that we get here without exceptions.
235 pass
236+
237+
238+class TestViaCelery(TestCaseWithFactory):
239+
240+ layer = CeleryJobLayer
241+
242+ def test_DerivedDistroseriesDifferenceJob(self):
243+ self.useFixture(FeatureFixture({
244+ FEATURE_FLAG_ENABLE_MODULE: u'on',
245+ 'jobs.celery.enabled_classes': 'DistroSeriesDifferenceJob',
246+ }))
247+ dsp = self.factory.makeDistroSeriesParent()
248+ package = self.factory.makeSourcePackageName()
249+ with block_on_job():
250+ job = create_job(dsp.derived_series, package, dsp.parent_series)
251+ transaction.commit()
252+ self.assertEqual(JobStatus.COMPLETED, job.status)
253
254=== modified file 'lib/lp/soyuz/tests/test_initializedistroseriesjob.py'
255--- lib/lp/soyuz/tests/test_initializedistroseriesjob.py 2012-01-20 15:42:44 +0000
256+++ lib/lp/soyuz/tests/test_initializedistroseriesjob.py 2012-04-24 15:22:07 +0000
257@@ -10,6 +10,8 @@
258 from lp.buildmaster.enums import BuildStatus
259 from lp.registry.interfaces.distroseriesparent import IDistroSeriesParentSet
260 from lp.registry.interfaces.pocket import PackagePublishingPocket
261+from lp.services.features.testing import FeatureFixture
262+from lp.services.job.tests import block_on_job
263 from lp.services.scripts.tests import run_script
264 from lp.soyuz.enums import SourcePackageFormat
265 from lp.soyuz.interfaces.distributionjob import (
266@@ -26,9 +28,13 @@
267 from lp.soyuz.model.initializedistroseriesjob import InitializeDistroSeriesJob
268 from lp.soyuz.scripts.initialize_distroseries import InitializationError
269 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
270-from lp.testing import TestCaseWithFactory
271+from lp.testing import (
272+ celebrity_logged_in,
273+ TestCaseWithFactory,
274+ )
275 from lp.testing.dbuser import switch_dbuser
276 from lp.testing.layers import (
277+ CeleryJobLayer,
278 DatabaseFunctionalLayer,
279 LaunchpadZopelessLayer,
280 )
281@@ -221,36 +227,17 @@
282 self.assertEqual(message, removeSecurityProxy(job).error_description)
283
284
285-class InitializeDistroSeriesJobTestsWithPackages(TestCaseWithFactory):
286- """Test case for InitializeDistroSeriesJob."""
287-
288- layer = LaunchpadZopelessLayer
289-
290- @property
291- def job_source(self):
292- return getUtility(IInitializeDistroSeriesJobSource)
293-
294- def setupDas(self, parent, proc, arch_tag):
295- pf = getUtility(IProcessorFamilySet).getByName(proc)
296- parent_das = self.factory.makeDistroArchSeries(
297- distroseries=parent, processorfamily=pf,
298- architecturetag=arch_tag)
299- lf = self.factory.makeLibraryFileAlias()
300- transaction.commit()
301- parent_das.addOrUpdateChroot(lf)
302- parent_das.supports_virtualized = True
303- return parent_das
304-
305- def _create_child(self):
306- pf = self.factory.makeProcessorFamily()
307- pf.addProcessor('x86', '', '')
308- parent = self.factory.makeDistroSeries()
309- parent_das = self.factory.makeDistroArchSeries(
310- distroseries=parent, processorfamily=pf)
311- lf = self.factory.makeLibraryFileAlias()
312- # Since the LFA needs to be in the librarian, commit.
313- transaction.commit()
314- parent_das.addOrUpdateChroot(lf)
315+def create_child(factory):
316+ pf = factory.makeProcessorFamily()
317+ pf.addProcessor('x86', '', '')
318+ parent = factory.makeDistroSeries()
319+ parent_das = factory.makeDistroArchSeries(
320+ distroseries=parent, processorfamily=pf)
321+ lf = factory.makeLibraryFileAlias()
322+ # Since the LFA needs to be in the librarian, commit.
323+ transaction.commit()
324+ parent_das.addOrUpdateChroot(lf)
325+ with celebrity_logged_in('admin'):
326 parent_das.supports_virtualized = True
327 parent.nominatedarchindep = parent_das
328 publisher = SoyuzTestPublisher()
329@@ -264,18 +251,39 @@
330 test1 = getUtility(IPackagesetSet).new(
331 u'test1', u'test 1 packageset', parent.owner,
332 distroseries=parent)
333- self.test1_packageset_id = str(test1.id)
334+ test1_packageset_id = str(test1.id)
335 test1.addSources('udev')
336- parent.updatePackageCount()
337- child = self.factory.makeDistroSeries()
338- getUtility(ISourcePackageFormatSelectionSet).add(
339- child, SourcePackageFormat.FORMAT_1_0)
340- # Make sure everything hits the database, switching db users aborts.
341+ parent.updatePackageCount()
342+ child = factory.makeDistroSeries()
343+ getUtility(ISourcePackageFormatSelectionSet).add(
344+ child, SourcePackageFormat.FORMAT_1_0)
345+ # Make sure everything hits the database, switching db users aborts.
346+ transaction.commit()
347+ return parent, child, test1_packageset_id
348+
349+
350+class InitializeDistroSeriesJobTestsWithPackages(TestCaseWithFactory):
351+ """Test case for InitializeDistroSeriesJob."""
352+
353+ layer = LaunchpadZopelessLayer
354+
355+ @property
356+ def job_source(self):
357+ return getUtility(IInitializeDistroSeriesJobSource)
358+
359+ def setupDas(self, parent, proc, arch_tag):
360+ pf = getUtility(IProcessorFamilySet).getByName(proc)
361+ parent_das = self.factory.makeDistroArchSeries(
362+ distroseries=parent, processorfamily=pf,
363+ architecturetag=arch_tag)
364+ lf = self.factory.makeLibraryFileAlias()
365 transaction.commit()
366- return parent, child
367+ parent_das.addOrUpdateChroot(lf)
368+ parent_das.supports_virtualized = True
369+ return parent_das
370
371 def test_job(self):
372- parent, child = self._create_child()
373+ parent, child, test1_packageset_id = create_child(self.factory)
374 job = self.job_source.create(child, [parent.id])
375 switch_dbuser('initializedistroseries')
376
377@@ -285,10 +293,10 @@
378 self.assertEqual(parent.binarycount, child.binarycount)
379
380 def test_job_with_arguments(self):
381- parent, child = self._create_child()
382+ parent, child, test1_packageset_id = create_child(self.factory)
383 arch = parent.nominatedarchindep.architecturetag
384 job = self.job_source.create(
385- child, [parent.id], packagesets=(self.test1_packageset_id,),
386+ child, [parent.id], packagesets=(test1_packageset_id,),
387 arches=(arch,), rebuild=True)
388 switch_dbuser('initializedistroseries')
389
390@@ -302,7 +310,7 @@
391 self.assertEqual(builds.count(), 1)
392
393 def test_job_with_none_arguments(self):
394- parent, child = self._create_child()
395+ parent, child, test1_packageset_id = create_child(self.factory)
396 job = self.job_source.create(
397 child, [parent.id], archindep_archtag=None, packagesets=None,
398 arches=None, overlays=None, overlay_pockets=None,
399@@ -314,7 +322,7 @@
400 self.assertEqual(parent.sourcecount, child.sourcecount)
401
402 def test_job_with_none_archindep_archtag_argument(self):
403- parent, child = self._create_child()
404+ parent, child, test1_packageset_id = create_child(self.factory)
405 job = self.job_source.create(
406 child, [parent.id], archindep_archtag=None, packagesets=None,
407 arches=None, overlays=None, overlay_pockets=None,
408@@ -327,7 +335,7 @@
409 child.nominatedarchindep.architecturetag)
410
411 def test_job_with_archindep_archtag_argument(self):
412- parent, child = self._create_child()
413+ parent, child, test1_packageset_id = create_child(self.factory)
414 self.setupDas(parent, 'amd64', 'amd64')
415 self.setupDas(parent, 'powerpc', 'hppa')
416 job = self.job_source.create(
417@@ -344,3 +352,23 @@
418 def test_cronscript(self):
419 run_script(
420 'cronscripts/run_jobs.py', ['-v', 'initializedistroseries'])
421+
422+
423+class TestViaCelery(TestCaseWithFactory):
424+
425+ layer = CeleryJobLayer
426+
427+ def test_job(self):
428+ """Job runs successfully via Celery."""
429+ fixture = FeatureFixture({
430+ 'jobs.celery.enabled_classes': 'InitializeDistroSeriesJob',
431+ })
432+ self.useFixture(fixture)
433+ parent, child, test1 = create_child(self.factory)
434+ job_source = getUtility(IInitializeDistroSeriesJobSource)
435+ with block_on_job():
436+ job_source.create(child, [parent.id])
437+ transaction.commit()
438+ child.updatePackageCount()
439+ self.assertEqual(parent.sourcecount, child.sourcecount)
440+ self.assertEqual(parent.binarycount, child.binarycount)