Merge lp:~abentley/launchpad/celery-everywhere-2 into lp:launchpad
- celery-everywhere-2
- Merge into devel
| Status: | Merged |
|---|---|
| Merged at revision: | 15076 |
| Proposed branch: | lp:~abentley/launchpad/celery-everywhere-2 |
| Merge into: | lp:launchpad |
| Diff against target: |
670 lines (+254/-81) 12 files modified
lib/lp/code/model/branchjob.py (+51/-35) lib/lp/code/model/tests/test_branch.py (+6/-6) lib/lp/code/model/tests/test_branchjob.py (+46/-1) lib/lp/codehosting/scanner/email.py (+5/-3) lib/lp/codehosting/scanner/tests/test_email.py (+83/-10) lib/lp/services/features/flags.py (+1/-1) lib/lp/services/job/celeryconfig.py (+4/-4) lib/lp/services/job/model/job.py (+27/-9) lib/lp/services/job/runner.py (+8/-9) lib/lp/services/job/tests/__init__.py (+1/-0) lib/lp/services/job/tests/celery_helpers.py (+19/-0) lib/lp/services/job/tests/test_runner.py (+3/-3) |
| To merge this branch: | bzr merge lp:~abentley/launchpad/celery-everywhere-2 |
| Related bugs: |
| Reviewer | Review Type | Date Requested | Status |
|---|---|---|---|
| Deryck Hodge (community) | 2012-04-09 | Approve on 2012-04-09 | |
|
Review via email:
|
|||
Commit Message
Support revision mail jobs and translation jobs via Celery.
Description of the Change
= Summary =
Support revision mail jobs and translation jobs via Celery.
== Pre-implementation notes ==
None
== Implementation details ==
Add celeryRunJob calls for RosettaUploadJob, RevisionMailJob and RevisionsAddedJob, and test.
Provide pop_notifications task to retrieve the mail notifications from a celeryd.
Rename the feature flag to remove an extra "s" in "classes".
Rename the task queue names to "job" and "branch_write_job", to be more descriptive.
Run jobs with the appropriate database user. To do this, introduce a "config" member on BaseRunnableJob that gives the configuration for that job type.
== Tests ==
bin/test -vt ViaCelery
== Demo and Q/A ==
None
= Launchpad lint =
Checking for conflicts and issues in changed files.
Linting changed files:
lib/lp/
lib/lp/
lib/lp/
lib/lp/
lib/lp/
lib/lp/
lib/lp/
lib/lp/
lib/lp/
lib/lp/
lib/lp/
lib/lp/
Preview Diff
| 1 | === modified file 'lib/lp/code/model/branchjob.py' |
| 2 | --- lib/lp/code/model/branchjob.py 2012-04-06 17:28:25 +0000 |
| 3 | +++ lib/lp/code/model/branchjob.py 2012-04-10 16:18:21 +0000 |
| 4 | @@ -80,7 +80,10 @@ |
| 5 | from lp.code.model.branch import Branch |
| 6 | from lp.code.model.branchmergeproposal import BranchMergeProposal |
| 7 | from lp.code.model.revision import RevisionSet |
| 8 | -from lp.codehosting.bzrutils import server |
| 9 | +from lp.codehosting.bzrutils import ( |
| 10 | + read_locked, |
| 11 | + server, |
| 12 | + ) |
| 13 | from lp.codehosting.scanner.bzrsync import BzrSync |
| 14 | from lp.codehosting.vfs import ( |
| 15 | get_ro_server, |
| 16 | @@ -89,6 +92,7 @@ |
| 17 | from lp.codehosting.vfs.branchfs import get_real_branch_path |
| 18 | from lp.registry.interfaces.productseries import IProductSeriesSet |
| 19 | from lp.scripts.helpers import TransactionFreeOperation |
| 20 | +from lp.services.config import config |
| 21 | from lp.services.database.enumcol import EnumCol |
| 22 | from lp.services.database.lpstorm import IStore |
| 23 | from lp.services.database.sqlbase import SQLBase |
| 24 | @@ -296,6 +300,8 @@ |
| 25 | class_job_type = BranchJobType.SCAN_BRANCH |
| 26 | memory_limit = 2 * (1024 ** 3) |
| 27 | |
| 28 | + config = config.branchscanner |
| 29 | + |
| 30 | @classmethod |
| 31 | def create(cls, branch): |
| 32 | """See `IBranchScanJobSource`.""" |
| 33 | @@ -327,7 +333,9 @@ |
| 34 | |
| 35 | user_error_types = (NotBranchError,) |
| 36 | |
| 37 | - task_queue = 'branch_write' |
| 38 | + task_queue = 'branch_write_job' |
| 39 | + |
| 40 | + config = config.upgrade_branches |
| 41 | |
| 42 | def getOperationDescription(self): |
| 43 | return 'upgrading a branch' |
| 44 | @@ -413,6 +421,8 @@ |
| 45 | |
| 46 | class_job_type = BranchJobType.REVISION_MAIL |
| 47 | |
| 48 | + config = config.sendbranchmail |
| 49 | + |
| 50 | @classmethod |
| 51 | def create(cls, branch, revno, from_address, body, subject): |
| 52 | """See `IRevisionMailJobSource`.""" |
| 53 | @@ -458,6 +468,8 @@ |
| 54 | |
| 55 | class_job_type = BranchJobType.REVISIONS_ADDED_MAIL |
| 56 | |
| 57 | + config = config.sendbranchmail |
| 58 | + |
| 59 | @classmethod |
| 60 | def create(cls, branch, last_scanned_id, last_revision_id, |
| 61 | from_address): |
| 62 | @@ -519,16 +531,13 @@ |
| 63 | subscriptions = self.branch.getSubscriptionsByLevel(diff_levels) |
| 64 | if not subscriptions: |
| 65 | return |
| 66 | - |
| 67 | - self.bzr_branch.lock_read() |
| 68 | - try: |
| 69 | - for revision, revno in self.iterAddedMainline(): |
| 70 | - assert revno is not None |
| 71 | - mailer = self.getMailerForRevision( |
| 72 | - revision, revno, self.generateDiffs()) |
| 73 | - mailer.sendAll() |
| 74 | - finally: |
| 75 | - self.bzr_branch.unlock() |
| 76 | + with server(get_ro_server(), no_replace=True): |
| 77 | + with read_locked(self.bzr_branch): |
| 78 | + for revision, revno in self.iterAddedMainline(): |
| 79 | + assert revno is not None |
| 80 | + mailer = self.getMailerForRevision( |
| 81 | + revision, revno, self.generateDiffs()) |
| 82 | + mailer.sendAll() |
| 83 | |
| 84 | def getDiffForRevisions(self, from_revision_id, to_revision_id): |
| 85 | """Generate the diff between from_revision_id and to_revision_id.""" |
| 86 | @@ -718,6 +727,8 @@ |
| 87 | |
| 88 | class_job_type = BranchJobType.ROSETTA_UPLOAD |
| 89 | |
| 90 | + config = config.rosettabranches |
| 91 | + |
| 92 | def __init__(self, branch_job): |
| 93 | super(RosettaUploadJob, self).__init__(branch_job) |
| 94 | |
| 95 | @@ -763,7 +774,9 @@ |
| 96 | force_translations_upload) |
| 97 | branch_job = BranchJob( |
| 98 | branch, BranchJobType.ROSETTA_UPLOAD, metadata) |
| 99 | - return cls(branch_job) |
| 100 | + job = cls(branch_job) |
| 101 | + job.celeryRunOnCommit() |
| 102 | + return job |
| 103 | else: |
| 104 | return None |
| 105 | |
| 106 | @@ -889,27 +902,28 @@ |
| 107 | |
| 108 | def run(self): |
| 109 | """See `IRosettaUploadJob`.""" |
| 110 | - # This is not called upon job creation because the branch would |
| 111 | - # neither have been mirrored nor scanned then. |
| 112 | - self._init_translation_file_lists() |
| 113 | - # Get the product series that are connected to this branch and |
| 114 | - # that want to upload translations. |
| 115 | - productseriesset = getUtility(IProductSeriesSet) |
| 116 | - productseries = productseriesset.findByTranslationsImportBranch( |
| 117 | - self.branch, self.force_translations_upload) |
| 118 | - translation_import_queue = getUtility(ITranslationImportQueue) |
| 119 | - for series in productseries: |
| 120 | - approver = TranslationBranchApprover(self.file_names, |
| 121 | - productseries=series) |
| 122 | - for iter_info in self._iter_lists_and_uploaders(series): |
| 123 | - file_names, changed_files, uploader = iter_info |
| 124 | - for upload_file_name, upload_file_content in changed_files: |
| 125 | - if len(upload_file_content) == 0: |
| 126 | - continue # Skip empty files |
| 127 | - entry = translation_import_queue.addOrUpdateEntry( |
| 128 | - upload_file_name, upload_file_content, |
| 129 | - True, uploader, productseries=series) |
| 130 | - approver.approve(entry) |
| 131 | + with server(get_ro_server(), no_replace=True): |
| 132 | + # This is not called upon job creation because the branch would |
| 133 | + # neither have been mirrored nor scanned then. |
| 134 | + self._init_translation_file_lists() |
| 135 | + # Get the product series that are connected to this branch and |
| 136 | + # that want to upload translations. |
| 137 | + productseriesset = getUtility(IProductSeriesSet) |
| 138 | + productseries = productseriesset.findByTranslationsImportBranch( |
| 139 | + self.branch, self.force_translations_upload) |
| 140 | + translation_import_queue = getUtility(ITranslationImportQueue) |
| 141 | + for series in productseries: |
| 142 | + approver = TranslationBranchApprover(self.file_names, |
| 143 | + productseries=series) |
| 144 | + for iter_info in self._iter_lists_and_uploaders(series): |
| 145 | + file_names, changed_files, uploader = iter_info |
| 146 | + for upload_file_name, upload_file_content in changed_files: |
| 147 | + if len(upload_file_content) == 0: |
| 148 | + continue # Skip empty files |
| 149 | + entry = translation_import_queue.addOrUpdateEntry( |
| 150 | + upload_file_name, upload_file_content, |
| 151 | + True, uploader, productseries=series) |
| 152 | + approver.approve(entry) |
| 153 | |
| 154 | @staticmethod |
| 155 | def iterReady(): |
| 156 | @@ -949,7 +963,9 @@ |
| 157 | |
| 158 | class_job_type = BranchJobType.RECLAIM_BRANCH_SPACE |
| 159 | |
| 160 | - task_queue = 'branch_write' |
| 161 | + task_queue = 'branch_write_job' |
| 162 | + |
| 163 | + config = config.reclaimbranchspace |
| 164 | |
| 165 | def __repr__(self): |
| 166 | return '<RECLAIM_BRANCH_SPACE branch job (%(id)s) for %(branch)s>' % { |
| 167 | |
| 168 | === modified file 'lib/lp/code/model/tests/test_branch.py' |
| 169 | --- lib/lp/code/model/tests/test_branch.py 2012-04-06 17:28:25 +0000 |
| 170 | +++ lib/lp/code/model/tests/test_branch.py 2012-04-10 16:18:21 +0000 |
| 171 | @@ -316,8 +316,8 @@ |
| 172 | # lp.services.job.celeryconfig is loaded. |
| 173 | from celery.exceptions import TimeoutError |
| 174 | self.useFixture(FeatureFixture({ |
| 175 | - 'jobs.celery.enabled_classses': 'BranchScanJob'})) |
| 176 | - with celeryd('standard') as proc: |
| 177 | + 'jobs.celery.enabled_classes': 'BranchScanJob'})) |
| 178 | + with celeryd('job') as proc: |
| 179 | self.useBzrBranches() |
| 180 | db_branch, bzr_tree = self.create_branch_and_tree() |
| 181 | bzr_tree.commit( |
| 182 | @@ -348,8 +348,8 @@ |
| 183 | """Calling destroySelf causes Celery to delete the branch.""" |
| 184 | from celery.exceptions import TimeoutError |
| 185 | self.useFixture(FeatureFixture({ |
| 186 | - 'jobs.celery.enabled_classses': 'ReclaimBranchSpaceJob'})) |
| 187 | - with celeryd('branch_write'): |
| 188 | + 'jobs.celery.enabled_classes': 'ReclaimBranchSpaceJob'})) |
| 189 | + with celeryd('branch_write_job'): |
| 190 | self.useBzrBranches() |
| 191 | db_branch, tree = self.create_branch_and_tree() |
| 192 | branch_path = get_real_branch_path(db_branch.id) |
| 193 | @@ -818,7 +818,7 @@ |
| 194 | |
| 195 | def test_requestUpgradeUsesCelery(self): |
| 196 | self.useFixture(FeatureFixture({ |
| 197 | - 'jobs.celery.enabled_classses': 'BranchUpgradeJob'})) |
| 198 | + 'jobs.celery.enabled_classes': 'BranchUpgradeJob'})) |
| 199 | cwd = os.getcwd() |
| 200 | self.useBzrBranches() |
| 201 | db_branch, tree = create_knit(self) |
| 202 | @@ -829,7 +829,7 @@ |
| 203 | db_branch.requestUpgrade(db_branch.owner) |
| 204 | with monitor_celery() as responses: |
| 205 | transaction.commit() |
| 206 | - with celeryd('branch_write', cwd): |
| 207 | + with celeryd('branch_write_job', cwd): |
| 208 | responses[-1].wait(30) |
| 209 | new_branch = Branch.open(tree.branch.base) |
| 210 | self.assertEqual( |
| 211 | |
| 212 | === modified file 'lib/lp/code/model/tests/test_branchjob.py' |
| 213 | --- lib/lp/code/model/tests/test_branchjob.py 2012-04-06 17:28:25 +0000 |
| 214 | +++ lib/lp/code/model/tests/test_branchjob.py 2012-04-10 16:18:21 +0000 |
| 215 | @@ -60,6 +60,7 @@ |
| 216 | RosettaUploadJob, |
| 217 | ) |
| 218 | from lp.code.model.branchrevision import BranchRevision |
| 219 | +from lp.code.model.directbranchcommit import DirectBranchCommit |
| 220 | from lp.code.model.tests.test_branch import create_knit |
| 221 | from lp.code.model.revision import RevisionSet |
| 222 | from lp.codehosting.vfs import branch_id_to_path |
| 223 | @@ -67,18 +68,27 @@ |
| 224 | from lp.services.config import config |
| 225 | from lp.services.database.constants import UTC_NOW |
| 226 | from lp.services.database.lpstorm import IMasterStore |
| 227 | +from lp.services.features.testing import FeatureFixture |
| 228 | from lp.services.identity.interfaces.emailaddress import EmailAddressStatus |
| 229 | from lp.services.job.interfaces.job import JobStatus |
| 230 | from lp.services.job.model.job import Job |
| 231 | from lp.services.job.runner import JobRunner |
| 232 | +from lp.services.job.tests import ( |
| 233 | + celeryd, |
| 234 | + monitor_celery, |
| 235 | + ) |
| 236 | from lp.services.osutils import override_environ |
| 237 | from lp.services.webapp import canonical_url |
| 238 | -from lp.testing import TestCaseWithFactory |
| 239 | +from lp.testing import ( |
| 240 | + person_logged_in, |
| 241 | + TestCaseWithFactory, |
| 242 | + ) |
| 243 | from lp.testing.dbuser import ( |
| 244 | dbuser, |
| 245 | switch_dbuser, |
| 246 | ) |
| 247 | from lp.testing.layers import ( |
| 248 | + AppServerLayer, |
| 249 | DatabaseFunctionalLayer, |
| 250 | LaunchpadZopelessLayer, |
| 251 | ) |
| 252 | @@ -1222,6 +1232,41 @@ |
| 253 | self.assertEqual([], unfinished_jobs) |
| 254 | |
| 255 | |
| 256 | +class TestViaCelery(TestCaseWithFactory): |
| 257 | + |
| 258 | + layer = AppServerLayer |
| 259 | + |
| 260 | + def test_RosettaUploadJob(self): |
| 261 | + """Ensure RosettaUploadJob can run under Celery.""" |
| 262 | + self.useContext(celeryd('job')) |
| 263 | + self.useBzrBranches(direct_database=True) |
| 264 | + self.useFixture(FeatureFixture({ |
| 265 | + 'jobs.celery.enabled_classes': 'BranchScanJob RosettaUploadJob' |
| 266 | + })) |
| 267 | + db_branch = self.factory.makeAnyBranch() |
| 268 | + self.createBzrBranch(db_branch) |
| 269 | + commit = DirectBranchCommit(db_branch, no_race_check=True) |
| 270 | + commit.writeFile('foo.pot', 'gibberish') |
| 271 | + with monitor_celery() as responses: |
| 272 | + with person_logged_in(db_branch.owner): |
| 273 | + commit.commit('message') |
| 274 | + transaction.commit() |
| 275 | + # Wait for branch scan to complete. |
| 276 | + responses[0].wait(30) |
| 277 | + series = self.factory.makeProductSeries(branch=db_branch) |
| 278 | + RosettaUploadJob.create( |
| 279 | + commit.db_branch, NULL_REVISION, |
| 280 | + force_translations_upload=True) |
| 281 | + transaction.commit() |
| 282 | + # Wait for RosettaUploadJob to complete |
| 283 | + responses[1].wait(30) |
| 284 | + queue = getUtility(ITranslationImportQueue) |
| 285 | + entries = list(queue.getAllEntries(target=series)) |
| 286 | + self.assertEqual(len(entries), 1) |
| 287 | + entry = entries[0] |
| 288 | + self.assertEqual('foo.pot', entry.path) |
| 289 | + |
| 290 | + |
| 291 | class TestReclaimBranchSpaceJob(TestCaseWithFactory): |
| 292 | |
| 293 | layer = LaunchpadZopelessLayer |
| 294 | |
| 295 | === modified file 'lib/lp/codehosting/scanner/email.py' |
| 296 | --- lib/lp/codehosting/scanner/email.py 2012-01-01 02:58:52 +0000 |
| 297 | +++ lib/lp/codehosting/scanner/email.py 2012-04-10 16:18:21 +0000 |
| 298 | @@ -46,10 +46,11 @@ |
| 299 | # No diff is associated with the removed email. |
| 300 | subject = "[Branch %s] %s removed" % ( |
| 301 | revisions_removed.db_branch.unique_name, count) |
| 302 | - getUtility(IRevisionMailJobSource).create( |
| 303 | + job = getUtility(IRevisionMailJobSource).create( |
| 304 | revisions_removed.db_branch, revno='removed', |
| 305 | from_address=config.canonical.noreply_from_address, |
| 306 | body=contents, subject=subject) |
| 307 | + job.celeryRunOnCommit() |
| 308 | |
| 309 | |
| 310 | def queue_tip_changed_email_jobs(tip_changed): |
| 311 | @@ -66,11 +67,12 @@ |
| 312 | revisions) |
| 313 | subject = "[Branch %s] %s" % ( |
| 314 | tip_changed.db_branch.unique_name, revisions) |
| 315 | - getUtility(IRevisionMailJobSource).create( |
| 316 | + job = getUtility(IRevisionMailJobSource).create( |
| 317 | tip_changed.db_branch, 'initial', |
| 318 | config.canonical.noreply_from_address, message, subject) |
| 319 | else: |
| 320 | - getUtility(IRevisionsAddedJobSource).create( |
| 321 | + job = getUtility(IRevisionsAddedJobSource).create( |
| 322 | tip_changed.db_branch, tip_changed.db_branch.last_scanned_id, |
| 323 | tip_changed.bzr_branch.last_revision(), |
| 324 | config.canonical.noreply_from_address) |
| 325 | + job.celeryRunOnCommit() |
| 326 | |
| 327 | === modified file 'lib/lp/codehosting/scanner/tests/test_email.py' |
| 328 | --- lib/lp/codehosting/scanner/tests/test_email.py 2012-01-01 02:58:52 +0000 |
| 329 | +++ lib/lp/codehosting/scanner/tests/test_email.py 2012-04-10 16:18:21 +0000 |
| 330 | @@ -7,6 +7,7 @@ |
| 331 | |
| 332 | import email |
| 333 | |
| 334 | +from bzrlib.uncommit import uncommit |
| 335 | from zope.component import getUtility |
| 336 | from zope.event import notify |
| 337 | |
| 338 | @@ -20,13 +21,34 @@ |
| 339 | IRevisionsAddedJobSource, |
| 340 | ) |
| 341 | from lp.code.model.branchjob import RevisionMailJob |
| 342 | +from lp.codehosting.scanner.bzrsync import BzrSync |
| 343 | from lp.codehosting.scanner import events |
| 344 | from lp.codehosting.scanner.tests.test_bzrsync import BzrSyncTestCase |
| 345 | from lp.registry.interfaces.person import IPersonSet |
| 346 | +from lp.services.config import config |
| 347 | +from lp.services.features.testing import FeatureFixture |
| 348 | from lp.services.job.runner import JobRunner |
| 349 | from lp.services.mail import stub |
| 350 | +from lp.services.job.tests import ( |
| 351 | + celeryd, |
| 352 | + monitor_celery, |
| 353 | + ) |
| 354 | from lp.testing import TestCaseWithFactory |
| 355 | -from lp.testing.layers import LaunchpadZopelessLayer |
| 356 | +from lp.testing.dbuser import switch_dbuser |
| 357 | +from lp.testing.layers import ( |
| 358 | + LaunchpadZopelessLayer, |
| 359 | + ZopelessAppServerLayer, |
| 360 | + ) |
| 361 | + |
| 362 | + |
| 363 | +def add_subscriber(branch): |
| 364 | + test_user = getUtility(IPersonSet).getByEmail('test@canonical.com') |
| 365 | + branch.subscribe( |
| 366 | + test_user, |
| 367 | + BranchSubscriptionNotificationLevel.FULL, |
| 368 | + BranchSubscriptionDiffSize.FIVEKLINES, |
| 369 | + CodeReviewNotificationLevel.NOEMAIL, |
| 370 | + test_user) |
| 371 | |
| 372 | |
| 373 | class TestBzrSyncEmail(BzrSyncTestCase): |
| 374 | @@ -39,13 +61,7 @@ |
| 375 | def makeDatabaseBranch(self): |
| 376 | branch = BzrSyncTestCase.makeDatabaseBranch(self) |
| 377 | LaunchpadZopelessLayer.txn.begin() |
| 378 | - test_user = getUtility(IPersonSet).getByEmail('test@canonical.com') |
| 379 | - branch.subscribe( |
| 380 | - test_user, |
| 381 | - BranchSubscriptionNotificationLevel.FULL, |
| 382 | - BranchSubscriptionDiffSize.FIVEKLINES, |
| 383 | - CodeReviewNotificationLevel.NOEMAIL, |
| 384 | - test_user) |
| 385 | + add_subscriber(branch) |
| 386 | LaunchpadZopelessLayer.txn.commit() |
| 387 | return branch |
| 388 | |
| 389 | @@ -130,12 +146,12 @@ |
| 390 | recommit_email_msg = email.message_from_string(recommit_email[2]) |
| 391 | recommit_email_body = recommit_email_msg.get_payload()[0].get_payload( |
| 392 | decode=True) |
| 393 | - subject = '[Branch %s] Rev 1: second' % self.db_branch.unique_name |
| 394 | + subject = '[Branch %s] Rev 1: second' % self.db_branch.unique_name |
| 395 | self.assertEmailHeadersEqual(subject, recommit_email_msg['Subject']) |
| 396 | body_bits = [ |
| 397 | 'revno: 1', |
| 398 | 'committer: %s' % author, |
| 399 | - 'branch nick: %s' % self.bzr_branch.nick, |
| 400 | + 'branch nick: %s' % self.bzr_branch.nick, |
| 401 | 'message:\n second', |
| 402 | 'added:\n hello.txt', |
| 403 | ] |
| 404 | @@ -143,6 +159,63 @@ |
| 405 | self.assertTextIn(bit, recommit_email_body) |
| 406 | |
| 407 | |
| 408 | +class TestViaCelery(TestCaseWithFactory): |
| 409 | + |
| 410 | + layer = ZopelessAppServerLayer |
| 411 | + |
| 412 | + @staticmethod |
| 413 | + def pop_notifications(): |
| 414 | + from lp.services.job.tests.celery_helpers import pop_notifications |
| 415 | + return pop_notifications.delay().get(30) |
| 416 | + |
| 417 | + def prepare(self, job_name): |
| 418 | + self.useFixture(FeatureFixture( |
| 419 | + {'jobs.celery.enabled_classes': job_name})) |
| 420 | + self.useContext(celeryd('job')) |
| 421 | + self.useBzrBranches(direct_database=True) |
| 422 | + db_branch, tree = self.create_branch_and_tree() |
| 423 | + add_subscriber(db_branch) |
| 424 | + switch_dbuser(config.branchscanner.dbuser) |
| 425 | + # Needed for feature flag teardown |
| 426 | + self.addCleanup(switch_dbuser, config.launchpad.dbuser) |
| 427 | + return db_branch, tree |
| 428 | + |
| 429 | + def test_empty_branch(self): |
| 430 | + """RevisionMailJob for empty branches runs via Celery.""" |
| 431 | + db_branch, tree = self.prepare('RevisionMailJob') |
| 432 | + with monitor_celery() as responses: |
| 433 | + BzrSync(db_branch).syncBranchAndClose(tree.branch) |
| 434 | + responses[-1].wait(30) |
| 435 | + self.assertEqual(1, len(self.pop_notifications())) |
| 436 | + |
| 437 | + def test_uncommit_branch(self): |
| 438 | + """RevisionMailJob for removed revisions runs via Celery.""" |
| 439 | + db_branch, tree = self.prepare('RevisionMailJob') |
| 440 | + tree.commit('message') |
| 441 | + bzr_sync = BzrSync(db_branch) |
| 442 | + with monitor_celery() as responses: |
| 443 | + bzr_sync.syncBranchAndClose(tree.branch) |
| 444 | + responses[0].wait(30) |
| 445 | + self.pop_notifications() |
| 446 | + uncommit(tree.branch) |
| 447 | + bzr_sync.syncBranchAndClose(tree.branch) |
| 448 | + responses[1].wait(30) |
| 449 | + self.assertEqual(1, len(self.pop_notifications())) |
| 450 | + |
| 451 | + def test_revisions_added(self): |
| 452 | + """RevisionMailJob for removed revisions runs via Celery.""" |
| 453 | + db_branch, tree = self.prepare('RevisionsAddedJob') |
| 454 | + tree.commit('message') |
| 455 | + bzr_sync = BzrSync(db_branch) |
| 456 | + bzr_sync.syncBranchAndClose(tree.branch) |
| 457 | + self.pop_notifications() |
| 458 | + tree.commit('message2') |
| 459 | + with monitor_celery() as responses: |
| 460 | + bzr_sync.syncBranchAndClose(tree.branch) |
| 461 | + responses[-1].wait(30) |
| 462 | + self.assertEqual(1, len(self.pop_notifications())) |
| 463 | + |
| 464 | + |
| 465 | class TestScanBranches(TestCaseWithFactory): |
| 466 | |
| 467 | layer = LaunchpadZopelessLayer |
| 468 | |
| 469 | === modified file 'lib/lp/services/features/flags.py' |
| 470 | --- lib/lp/services/features/flags.py 2012-04-05 13:05:04 +0000 |
| 471 | +++ lib/lp/services/features/flags.py 2012-04-10 16:18:21 +0000 |
| 472 | @@ -125,7 +125,7 @@ |
| 473 | '', |
| 474 | '', |
| 475 | ''), |
| 476 | - ('jobs.celery.enabled_classses', |
| 477 | + ('jobs.celery.enabled_classes', |
| 478 | 'space delimited', |
| 479 | 'Names of Job classes that should be run via celery', |
| 480 | 'No jobs run via celery', |
| 481 | |
| 482 | === modified file 'lib/lp/services/job/celeryconfig.py' |
| 483 | --- lib/lp/services/job/celeryconfig.py 2012-04-06 17:28:25 +0000 |
| 484 | +++ lib/lp/services/job/celeryconfig.py 2012-04-10 16:18:21 +0000 |
| 485 | @@ -8,9 +8,9 @@ |
| 486 | CELERY_IMPORTS = ("lp.services.job.celeryjob", ) |
| 487 | CELERY_RESULT_BACKEND = "amqp" |
| 488 | CELERY_QUEUES = { |
| 489 | - "branch_write": {"binding_key": "branch_write"}, |
| 490 | - "standard": {"binding_key": "standard"}, |
| 491 | + "branch_write_job": {"binding_key": "branch_write_job"}, |
| 492 | + "job": {"binding_key": "job"}, |
| 493 | } |
| 494 | -CELERY_DEFAULT_EXCHANGE = "standard" |
| 495 | -CELERY_DEFAULT_QUEUE = "standard" |
| 496 | +CELERY_DEFAULT_EXCHANGE = "job" |
| 497 | +CELERY_DEFAULT_QUEUE = "job" |
| 498 | CELERY_CREATE_MISSING_QUEUES = False |
| 499 | |
| 500 | === modified file 'lib/lp/services/job/model/job.py' |
| 501 | --- lib/lp/services/job/model/job.py 2012-03-21 20:39:49 +0000 |
| 502 | +++ lib/lp/services/job/model/job.py 2012-04-10 16:18:21 +0000 |
| 503 | @@ -33,7 +33,9 @@ |
| 504 | Int, |
| 505 | Reference, |
| 506 | ) |
| 507 | +from storm.zope.interfaces import IZStorm |
| 508 | import transaction |
| 509 | +from zope.component import getUtility |
| 510 | from zope.interface import implements |
| 511 | |
| 512 | from lp.services.config import dbconfig |
| 513 | @@ -253,17 +255,33 @@ |
| 514 | |
| 515 | needs_init = True |
| 516 | |
| 517 | - @classmethod |
| 518 | - def get(cls, job_id): |
| 519 | - if cls.needs_init: |
| 520 | - scripts.execute_zcml_for_scripts(use_web_security=False) |
| 521 | - cls.needs_init = False |
| 522 | - |
| 523 | - dbconfig.override( |
| 524 | - dbuser='branchscanner', isolation_level='read_committed') |
| 525 | + @staticmethod |
| 526 | + def getDerived(job_id): |
| 527 | + """Return the derived branch job associated with the job id.""" |
| 528 | from lp.code.model.branchjob import ( |
| 529 | BranchJob, |
| 530 | ) |
| 531 | store = IStore(BranchJob) |
| 532 | branch_job = store.find(BranchJob, BranchJob.job == job_id).one() |
| 533 | - return branch_job.makeDerived() |
| 534 | + if branch_job is None: |
| 535 | + raise ValueError('No BranchJob with job=%s.' % job_id) |
| 536 | + return branch_job.makeDerived(), store |
| 537 | + |
| 538 | + @classmethod |
| 539 | + def switchDBUser(cls, job_id): |
| 540 | + """Switch to the DB user associated with this Job ID.""" |
| 541 | + derived, store = cls.getDerived(job_id) |
| 542 | + dbconfig.override( |
| 543 | + dbuser=derived.config.dbuser, isolation_level='read_committed') |
| 544 | + transaction.abort() |
| 545 | + getUtility(IZStorm).remove(store) |
| 546 | + store.close() |
| 547 | + |
| 548 | + @classmethod |
| 549 | + def get(cls, job_id): |
| 550 | + transaction.abort() |
| 551 | + if cls.needs_init: |
| 552 | + scripts.execute_zcml_for_scripts(use_web_security=False) |
| 553 | + cls.needs_init = False |
| 554 | + cls.switchDBUser(job_id) |
| 555 | + return cls.getDerived(job_id)[0] |
| 556 | |
| 557 | === modified file 'lib/lp/services/job/runner.py' |
| 558 | --- lib/lp/services/job/runner.py 2012-04-06 17:28:25 +0000 |
| 559 | +++ lib/lp/services/job/runner.py 2012-04-10 16:18:21 +0000 |
| 560 | @@ -103,7 +103,7 @@ |
| 561 | |
| 562 | retry_error_types = () |
| 563 | |
| 564 | - task_queue = 'standard' |
| 565 | + task_queue = 'job' |
| 566 | |
| 567 | celery_responses = None |
| 568 | |
| 569 | @@ -190,23 +190,22 @@ |
| 570 | return oops_config.create( |
| 571 | context=dict(exc_info=info)) |
| 572 | |
| 573 | - def runViaCelery(self): |
| 574 | + def runViaCelery(self, ignore_result=False): |
| 575 | """Request that this job be run via celery.""" |
| 576 | # Avoid importing from lp.services.job.celeryjob where not needed, to |
| 577 | # avoid configuring Celery when Rabbit is not configured. |
| 578 | from lp.services.job.celeryjob import CeleryRunJob |
| 579 | - ignore_result = bool(BaseRunnableJob.celery_responses is None) |
| 580 | - response = CeleryRunJob.apply_async( |
| 581 | + return CeleryRunJob.apply_async( |
| 582 | (self.job_id,), queue=self.task_queue, |
| 583 | ignore_result=ignore_result) |
| 584 | - if not ignore_result: |
| 585 | - BaseRunnableJob.celery_responses.append(response) |
| 586 | - return response |
| 587 | |
| 588 | def celeryCommitHook(self, succeeded): |
| 589 | """Hook function to call when a commit completes.""" |
| 590 | if succeeded: |
| 591 | - self.runViaCelery() |
| 592 | + ignore_result = bool(BaseRunnableJob.celery_responses is None) |
| 593 | + response = self.runViaCelery(ignore_result) |
| 594 | + if not ignore_result: |
| 595 | + BaseRunnableJob.celery_responses.append(response) |
| 596 | |
| 597 | def celeryRunOnCommit(self): |
| 598 | """Configure transaction so that commit runs this job via Celery.""" |
| 599 | @@ -623,7 +622,7 @@ |
| 600 | |
| 601 | The name of a BaseRunnableJob must be specified. |
| 602 | """ |
| 603 | - flag = getFeatureFlag('jobs.celery.enabled_classses') |
| 604 | + flag = getFeatureFlag('jobs.celery.enabled_classes') |
| 605 | if flag is None: |
| 606 | return False |
| 607 | return class_name in flag.split(' ') |
| 608 | |
| 609 | === modified file 'lib/lp/services/job/tests/__init__.py' |
| 610 | --- lib/lp/services/job/tests/__init__.py 2012-04-06 17:28:25 +0000 |
| 611 | +++ lib/lp/services/job/tests/__init__.py 2012-04-10 16:18:21 +0000 |
| 612 | @@ -32,6 +32,7 @@ |
| 613 | '--concurrency', '1', |
| 614 | '--loglevel', 'INFO', |
| 615 | '--queues', queue, |
| 616 | + '--include', 'lp.services.job.tests.celery_helpers', |
| 617 | ) |
| 618 | with running('bin/celeryd', cmd_args, cwd=cwd) as proc: |
| 619 | # Wait for celeryd startup to complete. |
| 620 | |
| 621 | === added file 'lib/lp/services/job/tests/celery_helpers.py' |
| 622 | --- lib/lp/services/job/tests/celery_helpers.py 1970-01-01 00:00:00 +0000 |
| 623 | +++ lib/lp/services/job/tests/celery_helpers.py 2012-04-10 16:18:21 +0000 |
| 624 | @@ -0,0 +1,19 @@ |
| 625 | +# Copyright 2012 Canonical Ltd. This software is licensed under the |
| 626 | +# GNU Affero General Public License version 3 (see the file LICENSE). |
| 627 | + |
| 628 | +__metaclass__ = type |
| 629 | + |
| 630 | +__all__ = ['pop_notifications'] |
| 631 | + |
| 632 | +# Force the correct celeryconfig to be used. |
| 633 | +import lp.services.job.celeryjob |
| 634 | +# Quiet lint unused import warning. |
| 635 | +lp.services.job.celeryjob |
| 636 | + |
| 637 | +from celery.task import task |
| 638 | + |
| 639 | + |
| 640 | +@task |
| 641 | +def pop_notifications(): |
| 642 | + from lp.testing.mail_helpers import pop_notifications |
| 643 | + return pop_notifications() |
| 644 | |
| 645 | === modified file 'lib/lp/services/job/tests/test_runner.py' |
| 646 | --- lib/lp/services/job/tests/test_runner.py 2012-04-03 16:06:43 +0000 |
| 647 | +++ lib/lp/services/job/tests/test_runner.py 2012-04-10 16:18:21 +0000 |
| 648 | @@ -720,19 +720,19 @@ |
| 649 | def test_matching_flag(self): |
| 650 | """A matching flag returns True.""" |
| 651 | self.useFixture(FeatureFixture( |
| 652 | - {'jobs.celery.enabled_classses': 'foo bar'})) |
| 653 | + {'jobs.celery.enabled_classes': 'foo bar'})) |
| 654 | self.assertTrue(celery_enabled('foo')) |
| 655 | self.assertTrue(celery_enabled('bar')) |
| 656 | |
| 657 | def test_non_matching_flag(self): |
| 658 | """A non-matching flag returns false.""" |
| 659 | self.useFixture(FeatureFixture( |
| 660 | - {'jobs.celery.enabled_classses': 'foo bar'})) |
| 661 | + {'jobs.celery.enabled_classes': 'foo bar'})) |
| 662 | self.assertFalse(celery_enabled('baz')) |
| 663 | self.assertTrue(celery_enabled('bar')) |
| 664 | |
| 665 | def test_substring(self): |
| 666 | """A substring of an enabled class does not match.""" |
| 667 | self.useFixture(FeatureFixture( |
| 668 | - {'jobs.celery.enabled_classses': 'foobar'})) |
| 669 | + {'jobs.celery.enabled_classes': 'foobar'})) |
| 670 | self.assertFalse(celery_enabled('bar')) |
