Merge lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue into lp:launchpad

Proposed by Abel Deuring
Status: Merged
Approved by: Aaron Bentley
Approved revision: no longer in the source branch.
Merged at revision: 15263
Proposed branch: lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue
Merge into: lp:launchpad
Diff against target: 227 lines (+153/-4)
3 files modified
lib/lp/services/job/model/job.py (+8/-2)
lib/lp/services/job/runner.py (+18/-2)
lib/lp/services/job/tests/test_retry_jobs_with_celery.py (+127/-0)
To merge this branch: bzr merge lp:~adeuring/launchpad/put-retry-jobs-into-celery-queue
Reviewer Review Type Date Requested Status
Aaron Bentley (community) Approve
Review via email: mp+106012@code.launchpad.net

Commit message

generate a Celery task when a job raises a retry error

Description of the change

One of the last missing details needed to run jobs via Celery: when a
job raises a retry exception, a new Celery request for the next attempt
to run the job must be issued.

A new request is created by BaseRunnableJob.runViaCelery(), which is
called in a hook when the DB transaction is committed. When a retry
exception occurs, the job runner calls job.queue(), which until now
was implemented only in lp.services.job.model.job.Job. That class is
not aware of any Celery-related details, so I added a method
BaseRunnableJob.queue().

Calling celeryRunOnCommit() directly in BaseRunnableJob.queue() causes
another problem: the hook must be added before the transaction is
committed, but lp.services.job.model.job.Job.queue() generally calls
txn.commit() twice, and the job status is set to WAITING in the second
commit. To get the Celery hook placed into the "right" commit, I added
the optional parameter add_commit_hook to
lp.services.job.model.job.Job.queue().

BaseRunnableJob.runViaCelery() now checks if a lease for the job
exists. If so, it uses the lease expiration time as the ETA for the
new Celery request. The implementation looks a bit convoluted: Launchpad
uses datetime instances _with_ a timezone, but Celery works only with
datetime instances _without_ a timezone...

I changed UniversalJobSource so that we can create test jobs which,
unlike "regular" jobs, do not have a related DB table.

test: ./bin/test -vvt lp.services.job.tests.test_retry_jobs_with_celery

no lint

To post a comment you must log in.
Revision history for this message
Aaron Bentley (abentley) :
review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/services/job/model/job.py'
2--- lib/lp/services/job/model/job.py 2012-05-14 14:57:15 +0000
3+++ lib/lp/services/job/model/job.py 2012-05-16 19:32:19 +0000
4@@ -1,4 +1,4 @@
5-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
6+# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
7 # GNU Affero General Public License version 3 (see the file LICENSE).
8
9 """ORM object representing jobs."""
10@@ -185,7 +185,8 @@
11 if manage_transaction:
12 transaction.commit()
13
14- def queue(self, manage_transaction=False, abort_transaction=False):
15+ def queue(self, manage_transaction=False, abort_transaction=False,
16+ add_commit_hook=None):
17 """See `IJob`."""
18 if manage_transaction:
19 if abort_transaction:
20@@ -194,6 +195,8 @@
21 transaction.commit()
22 self._set_status(JobStatus.WAITING)
23 self.date_finished = datetime.datetime.now(UTC)
24+ if add_commit_hook is not None:
25+ add_commit_hook()
26 if manage_transaction:
27 transaction.commit()
28
29@@ -262,6 +265,9 @@
30 job_id, module_name, class_name = ujob_id
31 bc_module = __import__(module_name, fromlist=[class_name])
32 db_class = getattr(bc_module, class_name)
33+ factory = getattr(db_class, 'makeInstance', None)
34+ if factory is not None:
35+ return factory(job_id)
36 store = IStore(db_class)
37 db_job = store.find(db_class, db_class.job == job_id).one()
38 if db_job is None:
39
40=== modified file 'lib/lp/services/job/runner.py'
41--- lib/lp/services/job/runner.py 2012-05-11 20:37:47 +0000
42+++ lib/lp/services/job/runner.py 2012-05-16 19:32:19 +0000
43@@ -1,4 +1,4 @@
44-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
45+# Copyright 2009-2012 Canonical Ltd. This software is licensed under the
46 # GNU Affero General Public License version 3 (see the file LICENSE).
47
48 """Facilities for running Jobs."""
49@@ -19,6 +19,10 @@
50
51 from calendar import timegm
52 from collections import defaultdict
53+from datetime import (
54+ datetime,
55+ timedelta,
56+ )
57 import contextlib
58 import logging
59 import os
60@@ -107,6 +111,8 @@
61
62 celery_responses = None
63
64+ retry_delay = timedelta(minutes=10)
65+
66 # We redefine __eq__ and __ne__ here to prevent the security proxy
67 # from mucking up our comparisons in tests and elsewhere.
68 def __eq__(self, job):
69@@ -202,8 +208,12 @@
70 cls = CeleryRunJob
71 db_class = self.getDBClass()
72 ujob_id = (self.job_id, db_class.__module__, db_class.__name__)
73+ if self.job.lease_expires is not None:
74+ eta = datetime.now() + self.retry_delay
75+ else:
76+ eta = None
77 return cls.apply_async(
78- (ujob_id, self.config.dbuser), queue=self.task_queue)
79+ (ujob_id, self.config.dbuser), queue=self.task_queue, eta=eta)
80
81 def getDBClass(self):
82 return self.context.__class__
83@@ -223,6 +233,12 @@
84 current = transaction.get()
85 current.addAfterCommitHook(self.celeryCommitHook)
86
87+ def queue(self, manage_transaction=False, abort_transaction=False):
88+ """See `IJob`."""
89+ self.job.queue(
90+ manage_transaction, abort_transaction,
91+ add_commit_hook=self.celeryRunOnCommit)
92+
93
94 class BaseJobRunner(LazrJobRunner):
95 """Runner of Jobs."""
96
97=== added file 'lib/lp/services/job/tests/test_retry_jobs_with_celery.py'
98--- lib/lp/services/job/tests/test_retry_jobs_with_celery.py 1970-01-01 00:00:00 +0000
99+++ lib/lp/services/job/tests/test_retry_jobs_with_celery.py 2012-05-16 19:32:19 +0000
100@@ -0,0 +1,127 @@
101+# Copyright 2012 Canonical Ltd. This software is licensed under the
102+# GNU Affero General Public License version 3 (see the file LICENSE).
103+
104+"""Tests for running jobs via Celery."""
105+
106+
107+from datetime import (
108+ timedelta,
109+ )
110+from time import (
111+ sleep,
112+ )
113+import transaction
114+from lazr.delegates import delegates
115+from zope.interface import implements
116+
117+from lp.services.config import config
118+from lp.services.database.lpstorm import IStore
119+from lp.services.features.testing import FeatureFixture
120+from lp.services.job.interfaces.job import (
121+ IJob,
122+ IRunnableJob,
123+ )
124+from lp.services.job.interfaces.job import JobStatus
125+from lp.services.job.model.job import Job
126+from lp.services.job.runner import BaseRunnableJob
127+from lp.services.job.tests import block_on_job
128+from lp.testing import TestCaseWithFactory
129+from lp.testing.layers import CeleryJobLayer
130+
131+
132+class TestJob(BaseRunnableJob):
133+ """A dummy job."""
134+
135+ implements(IRunnableJob)
136+ delegates(IJob, 'job')
137+
138+ config = config.launchpad
139+
140+ def __init__(self, job_id=None):
141+ if job_id is not None:
142+ store = IStore(Job)
143+ self.job = store.find(Job, id=job_id)[0]
144+ else:
145+ self.job = Job(max_retries=2)
146+
147+ def run(self):
148+ pass
149+
150+ @classmethod
151+ def makeInstance(cls, job_id):
152+ return cls(job_id)
153+
154+ @classmethod
155+ def getDBClass(cls):
156+ return cls
157+
158+
159+class RetryException(Exception):
160+ """An exception used as a retry exception in TestJobWithRetryError."""
161+
162+
163+class TestJobWithRetryError(TestJob):
164+ """A dummy job."""
165+
166+ retry_error_types = (RetryException, )
167+
168+ retry_delay = timedelta(seconds=1)
169+
170+ def run(self):
171+ """Raise a retry exception on the the first attempt to run the job."""
172+ if self.job.attempt_count < 2:
173+ # Reset the lease so that the next attempt to run the
174+ # job does not fail with a LeaseHeld error.
175+ self.job.lease_expires = None
176+ raise RetryException
177+
178+
179+class TestRetryJobsViaCelery(TestCaseWithFactory):
180+ """Tests for running jobs via Celery."""
181+
182+ layer = CeleryJobLayer
183+
184+ def test_TestJob(self):
185+ # TestJob can be run via Celery.
186+ self.useFixture(FeatureFixture({
187+ 'jobs.celery.enabled_classes': 'TestJob'
188+ }))
189+ with block_on_job(self):
190+ job = TestJob()
191+ job.celeryRunOnCommit()
192+ job_id = job.job_id
193+ transaction.commit()
194+ store = IStore(Job)
195+ dbjob = store.find(Job, id=job_id)[0]
196+ self.assertEqual(JobStatus.COMPLETED, dbjob.status)
197+
198+ def test_jobs_with_retry_exceptions_are_queued_again(self):
199+ # A job that raises a retry error is automatically queued
200+ # and executed again.
201+ self.useFixture(FeatureFixture({
202+ 'jobs.celery.enabled_classes': 'TestJobWithRetryError'
203+ }))
204+ with block_on_job(self):
205+ job = TestJobWithRetryError()
206+ job.celeryRunOnCommit()
207+ job_id = job.job_id
208+ transaction.commit()
209+ store = IStore(Job)
210+
211+ # block_on_job() is not aware of the Celery request
212+ # issued when the retry exception occurs, but we can
213+ # check the status of the job in the database.
214+ def job_finished():
215+ transaction.abort()
216+ dbjob = store.find(Job, id=job_id)[0]
217+ return (
218+ dbjob.status == JobStatus.COMPLETED and
219+ dbjob.attempt_count == 2)
220+ count = 0
221+ while count < 50 and not job_finished():
222+ sleep(0.2)
223+ count += 1
224+
225+ dbjob = store.find(Job, id=job_id)[0]
226+ self.assertEqual(2, dbjob.attempt_count)
227+ self.assertEqual(JobStatus.COMPLETED, dbjob.status)