Merge ~twom/launchpad:tasks-some-speedy-some-slow into launchpad:master

Proposed by Tom Wardill
Status: Merged
Approved by: Tom Wardill
Approved revision: fb58fd0d723fd4d6b2de36c7e789dd7e337d9b70
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~twom/launchpad:tasks-some-speedy-some-slow
Merge into: launchpad:master
Diff against target: 119 lines (+59/-4)
4 files modified
lib/lp/services/job/celeryconfig.py (+1/-3)
lib/lp/services/job/celeryjob.py (+4/-0)
lib/lp/services/job/runner.py (+7/-1)
lib/lp/services/job/tests/test_celery.py (+47/-0)
| Reviewer | Review Type | Date Requested | Status |
| Colin Watson (community) | | | Approve |
Review via email: mp+402919@code.launchpad.net

Commit message

Enable slow lane fallback in celery

Description of the change

* Use the correct config key: set `FALLBACK_QUEUE` (the key lazr.jobrunner looks for) instead of `FALLBACK`
* Don't re-queue the same task _and_ the slow lane task
* Add a test to ensure we're queueing correctly

To post a comment you must log in.
Revision history for this message
Colin Watson (cjwatson) :
review: Approve
Revision history for this message
Tom Wardill (twom) wrote :

This currently breaks a lot of celery related job tests:

lp.oci.tests.test_ocirecipebuildjob.TestOCIRegistryUploadJobViaCelery.test_run_upload
ValueError: expected txn status 'Active' or 'Doomed', but it's 'Committed'

Revision history for this message
Tom Wardill (twom) wrote :

> This currently breaks a lot of celery related job tests:
>
> lp.oci.tests.test_ocirecipebuildjob.TestOCIRegistryUploadJobViaCelery.test_run_upload
> ValueError: expected txn status 'Active' or 'Doomed', but it's 'Committed'

Fixed by not importing celery_app at module level in test_celery.py; the new test imports it inside the test method instead.

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/services/job/celeryconfig.py b/lib/lp/services/job/celeryconfig.py
2index dcc931e..aee9919 100644
3--- a/lib/lp/services/job/celeryconfig.py
4+++ b/lib/lp/services/job/celeryconfig.py
5@@ -72,9 +72,7 @@ def configure(argv):
6 # now that we're on Celery 3.1.
7 result['task_soft_time_limit'] = config[queue].timeout
8 if config[queue].fallback_queue != '':
9- # XXX wgrant 2015-08-03: lazr.jobrunner actually looks for
10- # FALLBACK_QUEUE; this probably isn't doing anything.
11- result['FALLBACK'] = config[queue].fallback_queue
12+ result['FALLBACK_QUEUE'] = config[queue].fallback_queue
13 # XXX wgrant 2015-08-03: This is mostly per-queue because we
14 # can't run *_job and *_job_slow in the same worker, which will be
15 # fixed once the CELERYD_TASK_SOFT_TIME_LIMIT override is gone.
16diff --git a/lib/lp/services/job/celeryjob.py b/lib/lp/services/job/celeryjob.py
17index 793330a..b4ae220 100644
18--- a/lib/lp/services/job/celeryjob.py
19+++ b/lib/lp/services/job/celeryjob.py
20@@ -67,9 +67,13 @@ class CeleryRunJob(RunJob):
21 :param dbuser: The database user to run under. This should match the
22 dbuser specified by the job's config.
23 """
24+ self.dbuser = dbuser
25 task_init(dbuser)
26 super(CeleryRunJob, self).run(job_id)
27
28+ def reQueue(self, job_id, fallback_queue):
29+ self.apply_async(args=(job_id, self.dbuser), queue=fallback_queue)
30+
31
32 @celery_app.task(base=CeleryRunJob, bind=True)
33 def celery_run_job(self, job_id, dbuser):
34diff --git a/lib/lp/services/job/runner.py b/lib/lp/services/job/runner.py
35index 1e8b0e6..2ec4506 100644
36--- a/lib/lp/services/job/runner.py
37+++ b/lib/lp/services/job/runner.py
38@@ -321,9 +321,15 @@ class BaseRunnableJob(BaseRunnableJobSource):
39 """See `IJob`."""
40 if self.job.attempt_count > 0:
41 self.job.scheduled_start = datetime.now(utc) + self.retry_delay
42+ # If we're aborting the transaction, we probably don't want to
43+ # start the task again
44+ if manage_transaction and abort_transaction:
45+ commit_hook = None
46+ else:
47+ commit_hook = self.celeryRunOnCommit
48 self.job.queue(
49 manage_transaction, abort_transaction,
50- add_commit_hook=self.celeryRunOnCommit)
51+ add_commit_hook=commit_hook)
52
53 def start(self, manage_transaction=False):
54 """See `IJob`."""
55diff --git a/lib/lp/services/job/tests/test_celery.py b/lib/lp/services/job/tests/test_celery.py
56index 5db8436..014a1c4 100644
57--- a/lib/lp/services/job/tests/test_celery.py
58+++ b/lib/lp/services/job/tests/test_celery.py
59@@ -9,9 +9,11 @@ from datetime import (
60 timedelta,
61 )
62 from time import sleep
63+from unittest import mock
64
65 import iso8601
66 from lazr.delegates import delegate_to
67+from lazr.jobrunner.celerytask import drain_queues
68 from pytz import UTC
69 from testtools.matchers import (
70 GreaterThan,
71@@ -221,3 +223,48 @@ class TestJobsViaCelery(TestCaseWithFactory):
72 store = IStore(Job)
73 dbjob = store.find(Job, id=job_id)[0]
74 self.assertEqual(JobStatus.WAITING, dbjob.status)
75+
76+
77+class TestTimeoutJob(TestJob):
78+
79+ def storeDateStarted(self):
80+ existing = self.job.base_json_data or {}
81+ existing.setdefault('dates_started', [])
82+ existing['dates_started'].append(self.job.date_started.isoformat())
83+ self.job.base_json_data = existing
84+
85+ def run(self):
86+ """Concoct various retry scenarios."""
87+
88+ if self.job.attempt_count == 1:
89+ from celery.exceptions import SoftTimeLimitExceeded
90+ raise SoftTimeLimitExceeded
91+
92+
93+class TestCeleryLaneFallback(TestCaseWithFactory):
94+
95+ layer = CeleryJobLayer
96+
97+ def test_fallback_to_slow_lane(self):
98+ # Check that we re-queue a slow task into the correct queue
99+ from lp.services.job.celeryjob import celery_app
100+ self.useFixture(FeatureFixture({
101+ 'jobs.celery.enabled_classes': 'TestTimeoutJob'}))
102+
103+ with block_on_job(self):
104+ job = TestTimeoutJob()
105+ job.celeryRunOnCommit()
106+ transaction.commit()
107+
108+ message_drain = mock.Mock()
109+
110+ drain_queues(
111+ celery_app,
112+ ['launchpad_job', 'launchpad_job_slow'], callbacks=[message_drain])
113+
114+ self.assertEqual(1, job.attempt_count)
115+ self.assertEqual(1, message_drain.call_count)
116+ self.assertEqual(
117+ 'launchpad_job_slow',
118+ message_drain.call_args[0][1].delivery_info['routing_key'])
119+

Subscribers

People subscribed via source and target branches

to status/vote changes: