Merge lp:~abentley/launchpad/celery-everywhere-5 into lp:launchpad

Proposed by Aaron Bentley on 2012-04-23
Status: Merged
Approved by: Aaron Bentley on 2012-04-23
Approved revision: no longer in the source branch.
Merged at revision: 15146
Proposed branch: lp:~abentley/launchpad/celery-everywhere-5
Merge into: lp:launchpad
Prerequisite: lp:~abentley/launchpad/celery-everywhere-4
Diff against target: 198 lines (+66/-13)
5 files modified
lib/lp/bugs/model/apportjob.py (+14/-2)
lib/lp/bugs/tests/test_apportjob.py (+26/-5)
lib/lp/services/job/model/job.py (+3/-1)
lib/lp/services/job/tests/__init__.py (+18/-4)
lib/lp/testing/factory.py (+5/-1)
To merge this branch: bzr merge lp:~abentley/launchpad/celery-everywhere-5
Reviewer Review Type Date Requested Status
Benji York (community) code 2012-04-23 Approve on 2012-04-23
Review via email: mp+103161@code.launchpad.net

Commit Message

Support running ApportJobs via Celery

Description of the Change

= Summary =
Support running ApportJobs via Celery

== Pre-implementation notes ==
None

== LOC Rationale ==
Part of a resourced arc that will reduce LOC.

== Implementation details ==
Enhance block_on_job to make test development easier: report oopses and worker-side tracebacks for exceptions.

Enhance LaunchpadObjectFactory.makeBlob to support specifying files, to simplify code.

Update ApportJob, ApportJobDerived and ProcessApportBlobJob to support running via Celery.

== Tests ==
bin/test test_apportjob -t TestViaCelery

== Demo and Q/A ==
None

= Launchpad lint =

Checking for conflicts and issues in changed files.

Linting changed files:
  lib/lp/soyuz/model/distroseriesdifferencejob.py
  lib/lp/soyuz/model/distributionjob.py
  lib/lp/bugs/model/apportjob.py
  lib/lp/soyuz/tests/test_initializedistroseriesjob.py
  lib/lp/testing/factory.py
  lib/lp/soyuz/model/initializedistroseriesjob.py
  lib/lp/soyuz/tests/test_distroseriesdifferencejob.py
  lib/lp/bugs/tests/test_apportjob.py
  lib/lp/services/job/model/job.py
  lib/lp/services/job/tests/__init__.py

To post a comment you must log in.
Benji York (benji) wrote :

This branch looks good. A few things came to mind while reading the diff:

I really like the comments in the tests. I like it when a test fails
and the comments give good context as to what the assertions mean and
why they are important.

In makeBlob, since blob_file isn't used later on in the method, you
don't have to save a reference to the opened file. I.e., instead of
doing

    blob_file = open(blob_path)
    blob = blob_file.read()

you can do

    blob = open(blob_path).read()

That's a small improvement, but it also means that you won't be
redefining blob_file. Make that two small improvements. ;)

review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/bugs/model/apportjob.py'
2--- lib/lp/bugs/model/apportjob.py 2011-12-30 06:14:56 +0000
3+++ lib/lp/bugs/model/apportjob.py 2012-04-24 15:36:12 +0000
4@@ -37,10 +37,14 @@
5 FileBugData,
6 FileBugDataParser,
7 )
8+from lp.services.config import config
9 from lp.services.database.enumcol import EnumCol
10 from lp.services.database.lpstorm import IStore
11 from lp.services.database.stormbase import StormBase
12-from lp.services.job.model.job import Job
13+from lp.services.job.model.job import (
14+ EnumeratedSubclass,
15+ Job,
16+ )
17 from lp.services.job.runner import BaseRunnableJob
18 from lp.services.librarian.interfaces import ILibraryFileAliasSet
19 from lp.services.temporaryblobstorage.model import TemporaryBlobStorage
20@@ -110,9 +114,13 @@
21 'No occurrence of %s has key %s' % (cls.__name__, key))
22 return instance
23
24+ def makeDerived(self):
25+ return ApportJobDerived.makeSubclass(self)
26+
27
28 class ApportJobDerived(BaseRunnableJob):
29 """Intermediate class for deriving from ApportJob."""
30+ __metaclass__ = EnumeratedSubclass
31 delegates(IApportJob)
32 classProvides(IApportJobSource)
33
34@@ -124,7 +132,9 @@
35 """See `IApportJob`."""
36 # If there's already a job for the blob, don't create a new one.
37 job = ApportJob(blob, cls.class_job_type, {})
38- return cls(job)
39+ derived = cls(job)
40+ derived.celeryRunOnCommit()
41+ return derived
42
43 @classmethod
44 def get(cls, job_id):
45@@ -173,6 +183,8 @@
46 class_job_type = ApportJobType.PROCESS_BLOB
47 classProvides(IProcessApportBlobJobSource)
48
49+ config = config.process_apport_blobs
50+
51 @classmethod
52 def create(cls, blob):
53 """See `IProcessApportBlobJobSource`."""
54
55=== modified file 'lib/lp/bugs/tests/test_apportjob.py'
56--- lib/lp/bugs/tests/test_apportjob.py 2012-02-21 22:46:28 +0000
57+++ lib/lp/bugs/tests/test_apportjob.py 2012-04-24 15:36:12 +0000
58@@ -27,7 +27,9 @@
59 FileBugDataParser,
60 )
61 from lp.services.config import config
62+from lp.services.features.testing import FeatureFixture
63 from lp.services.job.interfaces.job import JobStatus
64+from lp.services.job.tests import block_on_job
65 from lp.services.librarian.interfaces import ILibraryFileAliasSet
66 from lp.services.scripts.tests import run_script
67 from lp.services.temporaryblobstorage.interfaces import (
68@@ -39,6 +41,7 @@
69 TestCaseWithFactory,
70 )
71 from lp.testing.layers import (
72+ CeleryJobLayer,
73 LaunchpadFunctionalLayer,
74 LaunchpadZopelessLayer,
75 )
76@@ -92,12 +95,8 @@
77 super(ProcessApportBlobJobTestCase, self).setUp()
78
79 # Create a BLOB using existing testing data.
80- testfiles = os.path.join(config.root, 'lib/lp/bugs/tests/testfiles')
81- blob_file = open(
82- os.path.join(testfiles, 'extra_filebug_data.msg'))
83- blob_data = blob_file.read()
84
85- self.blob = self.factory.makeBlob(blob_data)
86+ self.blob = self.factory.makeBlob(blob_file='extra_filebug_data.msg')
87 transaction.commit() # We need the blob available from the Librarian.
88
89 def _assertFileBugDataMatchesDict(self, filebug_data, data_dict):
90@@ -310,6 +309,28 @@
91 self._assertFileBugDataMatchesDict(filebug_data, processed_data)
92
93
94+class TestViaCelery(TestCaseWithFactory):
95+
96+ layer = CeleryJobLayer
97+
98+ def test_ProcessApportBlobJob(self):
99+ # ProcessApportBlobJob runs under Celery.
100+ blob = self.factory.makeBlob(blob_file='extra_filebug_data.msg')
101+ self.useFixture(FeatureFixture(
102+ {'jobs.celery.enabled_classes': 'ProcessApportBlobJob'}))
103+ with block_on_job(self):
104+ job = getUtility(IProcessApportBlobJobSource).create(blob)
105+ transaction.commit()
106+
107+ # Once the job has been run, its metadata will contain a dict
108+ # called processed_data, which will contain the data parsed from
109+ # the BLOB.
110+ processed_data = job.metadata.get('processed_data', None)
111+ self.assertIsNot(
112+ None, processed_data,
113+ "processed_data should not be None after the job has run.")
114+
115+
116 class TestTemporaryBlobStorageAddView(TestCaseWithFactory):
117 """Test case for the TemporaryBlobStorageAddView."""
118
119
120=== modified file 'lib/lp/services/job/model/job.py'
121--- lib/lp/services/job/model/job.py 2012-04-24 15:35:12 +0000
122+++ lib/lp/services/job/model/job.py 2012-04-24 15:36:12 +0000
123@@ -269,6 +269,7 @@
124 def getUserAndBaseJob(cls, job_id):
125 """Return the derived branch job associated with the job id."""
126 # Avoid circular imports.
127+ from lp.bugs.model.apportjob import ApportJob
128 from lp.code.model.branchjob import (
129 BranchJob,
130 )
131@@ -279,7 +280,8 @@
132 dbconfig.override(
133 dbuser=config.launchpad.dbuser, isolation_level='read_committed')
134
135- for baseclass in [BranchJob, BranchMergeProposalJob, DistributionJob]:
136+ for baseclass in [
137+ ApportJob, BranchJob, BranchMergeProposalJob, DistributionJob]:
138 derived, base_class, store = cls._getDerived(job_id, baseclass)
139 if derived is not None:
140 cls.clearStore(store)
141
142=== modified file 'lib/lp/services/job/tests/__init__.py'
143--- lib/lp/services/job/tests/__init__.py 2012-04-13 18:31:35 +0000
144+++ lib/lp/services/job/tests/__init__.py 2012-04-24 15:36:12 +0000
145@@ -13,6 +13,9 @@
146
147 from contextlib import contextmanager
148
149+from testtools.content import text_content
150+
151+from lp.testing.fixture import CaptureOops
152 from lp.services.job.runner import BaseRunnableJob
153
154
155@@ -51,10 +54,21 @@
156
157
158 @contextmanager
159-def block_on_job():
160- with monitor_celery() as responses:
161- yield
162- responses[-1].wait(30)
163+def block_on_job(test_case=None):
164+ with CaptureOops() as capture:
165+ with monitor_celery() as responses:
166+ yield
167+ try:
168+ responses[-1].wait(30)
169+ finally:
170+ if test_case is not None and responses[-1].traceback is not None:
171+ test_case.addDetail(
172+ 'Worker traceback', text_content(responses[-1].traceback))
173+ if test_case is not None:
174+ capture.sync()
175+ for oops in capture.oopses:
176+ test_case.addDetail(
177+ 'oops', text_content(str(oops)))
178
179
180 def pop_remote_notifications():
181
182=== modified file 'lib/lp/testing/factory.py'
183--- lib/lp/testing/factory.py 2012-04-19 16:46:32 +0000
184+++ lib/lp/testing/factory.py 2012-04-24 15:36:12 +0000
185@@ -4232,8 +4232,12 @@
186 self.getUniqueString(), self.getUniqueString())
187 return getUtility(ISSHKeySet).new(person, public_key)
188
189- def makeBlob(self, blob=None, expires=None):
190+ def makeBlob(self, blob=None, expires=None, blob_file=None):
191 """Create a new TemporaryFileStorage BLOB."""
192+ if blob_file is not None:
193+ blob_path = os.path.join(
194+ config.root, 'lib/lp/bugs/tests/testfiles', blob_file)
195+ blob = open(blob_path).read()
196 if blob is None:
197 blob = self.getUniqueString()
198 new_uuid = getUtility(ITemporaryStorageManager).new(blob, expires)