Merge lp:~cjwatson/launchpad/snap-upload-better-retry into lp:launchpad

Proposed by Colin Watson
Status: Merged
Merged at revision: 18426
Proposed branch: lp:~cjwatson/launchpad/snap-upload-better-retry
Merge into: lp:launchpad
Diff against target: 357 lines (+133/-30)
8 files modified
lib/lp/code/model/branchmergeproposaljob.py (+7/-2)
lib/lp/code/model/tests/test_branchmergeproposaljobs.py (+34/-0)
lib/lp/services/job/model/job.py (+3/-1)
lib/lp/services/job/tests/test_celery.py (+10/-10)
lib/lp/services/job/tests/test_job.py (+12/-5)
lib/lp/services/job/tests/test_runner.py (+2/-1)
lib/lp/snappy/model/snapbuildjob.py (+27/-9)
lib/lp/snappy/tests/test_snapbuildjob.py (+38/-2)
To merge this branch: bzr merge lp:~cjwatson/launchpad/snap-upload-better-retry
Reviewer | Review Type | Date Requested | Status
William Grant | code | | Approve
Review via email: mp+326548@code.launchpad.net

Commit message

Make SnapStoreUploadJob retries go via celery and be much more responsive.

Description of the change

There were several problems here. The lifecycle method overrides in SnapStoreUploadJob were subtly wrong: they called the underlying Job's methods directly, and thus bypassed celeryRunOnCommit when retrying. The lease was held for much longer than the retry delay, even after the job had finished, which introduced several minutes of unnecessary delay; requeueing a job now releases its lease (before making this change, I checked that all the iterReady implementations honour Job.ready_jobs). Finally, we needed a shorter retry delay for the first few attempts to poll the status endpoint; that polling doesn't involve much work on the Launchpad side, and it may well succeed quite quickly for small packages.

To post a comment you must log in.
Revision history for this message
William Grant (wgrant) :
review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
2--- lib/lp/code/model/branchmergeproposaljob.py 2016-09-06 15:34:38 +0000
3+++ lib/lp/code/model/branchmergeproposaljob.py 2017-06-29 18:31:47 +0000
4@@ -675,8 +675,13 @@
5 continue
6 # We have now seen this merge proposal.
7 seen_merge_proposals.add(bmp.id)
8- # If the job is running, then skip it
9- if job.status == JobStatus.RUNNING:
10+ # If the job is running or can't currently be run due to its
11+ # lease or its start time, then skip it.
12+ if (job.status == JobStatus.RUNNING or
13+ (job.lease_expires is not None and
14+ job.lease_expires >= datetime.now(pytz.UTC)) or
15+ (job.scheduled_start is not None and
16+ job.scheduled_start > datetime.now(pytz.UTC))):
17 continue
18 derived_job = bmp_job.makeDerived()
19 # If the job is an update preview diff, then check that it is
20
21=== modified file 'lib/lp/code/model/tests/test_branchmergeproposaljobs.py'
22--- lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2017-05-25 17:22:38 +0000
23+++ lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2017-06-29 18:31:47 +0000
24@@ -559,6 +559,40 @@
25 jobs = self.job_source.iterReady()
26 self.assertEqual(0, len(jobs))
27
28+ def test_iterReady_new_merge_proposal_update_diff_leased(self):
29+ # If either the diff or the email job has an acquired lease, then
30+ # iterReady skips it.
31+ self.makeBranchMergeProposal(
32+ set_state=BranchMergeProposalStatus.NEEDS_REVIEW)
33+ [update_diff_job] = self.job_source.iterReady()
34+ self.assertIsInstance(update_diff_job, UpdatePreviewDiffJob)
35+ update_diff_job.acquireLease()
36+ self.assertEqual(0, len(self.job_source.iterReady()))
37+ update_diff_job.start()
38+ update_diff_job.complete()
39+ [email_job] = self.job_source.iterReady()
40+ self.assertIsInstance(email_job, MergeProposalNeedsReviewEmailJob)
41+ email_job.acquireLease()
42+ self.assertEqual(0, len(self.job_source.iterReady()))
43+
44+ def test_iterReady_new_merge_proposal_update_diff_scheduled(self):
45+ # If either the diff or the email job has a scheduled start time in
46+ # the future, then iterReady skips it.
47+ self.makeBranchMergeProposal(
48+ set_state=BranchMergeProposalStatus.NEEDS_REVIEW)
49+ [update_diff_job] = self.job_source.iterReady()
50+ self.assertIsInstance(update_diff_job, UpdatePreviewDiffJob)
51+ update_diff_job.start()
52+ update_diff_job.queue()
53+ self.assertEqual(0, len(self.job_source.iterReady()))
54+ update_diff_job.start()
55+ update_diff_job.complete()
56+ [email_job] = self.job_source.iterReady()
57+ self.assertIsInstance(email_job, MergeProposalNeedsReviewEmailJob)
58+ email_job.start()
59+ email_job.queue()
60+ self.assertEqual(0, len(self.job_source.iterReady()))
61+
62 def makeBranchMergeProposal(self, set_state=None):
63 # Make a merge proposal that would have a ready update diff job.
64 bmp = self.factory.makeBranchMergeProposal(set_state=set_state)
65
66=== modified file 'lib/lp/services/job/model/job.py'
67--- lib/lp/services/job/model/job.py 2015-10-14 15:22:01 +0000
68+++ lib/lp/services/job/model/job.py 2017-06-29 18:31:47 +0000
69@@ -1,4 +1,4 @@
70-# Copyright 2009-2013 Canonical Ltd. This software is licensed under the
71+# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
72 # GNU Affero General Public License version 3 (see the file LICENSE).
73
74 """ORM object representing jobs."""
75@@ -206,6 +206,8 @@
76 if self.status != JobStatus.WAITING:
77 self._set_status(JobStatus.WAITING)
78 self.date_finished = datetime.datetime.now(UTC)
79+ # Release the lease to allow short retry delays to be effective.
80+ self.lease_expires = None
81 if add_commit_hook is not None:
82 add_commit_hook()
83 if manage_transaction:
84
85=== modified file 'lib/lp/services/job/tests/test_celery.py'
86--- lib/lp/services/job/tests/test_celery.py 2015-08-04 02:26:26 +0000
87+++ lib/lp/services/job/tests/test_celery.py 2017-06-29 18:31:47 +0000
88@@ -1,4 +1,4 @@
89-# Copyright 2012 Canonical Ltd. This software is licensed under the
90+# Copyright 2012-2017 Canonical Ltd. This software is licensed under the
91 # GNU Affero General Public License version 3 (see the file LICENSE).
92
93 """Tests for running jobs via Celery."""
94@@ -93,9 +93,10 @@
95 self.job.lease_expires = datetime.now(UTC)
96 raise RetryException
97 elif self.job.attempt_count == 2:
98- # The retry delay is 5 seconds, but the lease is for nearly
99- # 10 seconds, so the job will be rescheduled 10 seconds in
100- # the future.
101+ # The retry delay is 5 seconds, but the lease is for nearly 10
102+ # seconds. However, the job releases the lease when it's
103+ # requeued, so the job will again be rescheduled for 5 seconds
104+ # (retry_delay) in the future.
105 raise RetryException
106
107
108@@ -185,10 +186,9 @@
109 iso8601.parse_date(d)
110 for d in job.job.base_json_data['dates_started']]
111
112- # The first attempt's lease is set to the end of the job, so
113- # the second attempt should start roughly 5 seconds after the
114- # first. The third attempt has to wait out the full 10 second
115- # lease, so it should start roughly 10 seconds after the second.
116+ # The first attempt's lease is set to the end of the job, so the
117+ # second attempt should start roughly 5 seconds after the first. The
118+ # third attempt should start roughly 5 seconds after the second.
119 self.assertThat(dates_started, HasLength(3))
120 self.assertThat(dates_started,
121 MatchesListwise([
122@@ -197,8 +197,8 @@
123 GreaterThan(dates_started[0] + timedelta(seconds=4)),
124 LessThan(dates_started[0] + timedelta(seconds=8))),
125 MatchesAll(
126- GreaterThan(dates_started[1] + timedelta(seconds=8)),
127- LessThan(dates_started[1] + timedelta(seconds=13))),
128+ GreaterThan(dates_started[1] + timedelta(seconds=4)),
129+ LessThan(dates_started[1] + timedelta(seconds=8))),
130 ]))
131 self.assertEqual(3, job.attempt_count)
132 self.assertEqual(JobStatus.COMPLETED, job.status)
133
134=== modified file 'lib/lp/services/job/tests/test_job.py'
135--- lib/lp/services/job/tests/test_job.py 2015-07-30 01:50:11 +0000
136+++ lib/lp/services/job/tests/test_job.py 2017-06-29 18:31:47 +0000
137@@ -1,4 +1,4 @@
138-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
139+# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
140 # GNU Affero General Public License version 3 (see the file LICENSE).
141
142 __metaclass__ = type
143@@ -7,11 +7,11 @@
144 datetime,
145 timedelta,
146 )
147-from pytz import UTC
148 import time
149
150 from lazr.jobrunner.jobrunner import LeaseHeld
151 import pytz
152+from pytz import UTC
153 from storm.locals import Store
154 from testtools.matchers import Equals
155 import transaction
156@@ -186,6 +186,13 @@
157 self.assertNotEqual(None, job.date_finished)
158 self.assertEqual(job.status, JobStatus.WAITING)
159
160+ def test_queue_clears_lease_expires(self):
161+ """Queueing a job releases its lease."""
162+ job = Job(_status=JobStatus.RUNNING)
163+ job.lease_expires = UTC_NOW
164+ job.queue()
165+ self.assertIsNone(job.lease_expires)
166+
167 def test_suspend(self):
168 """A job that is in the WAITING state can be suspended."""
169 job = Job(_status=JobStatus.WAITING)
170@@ -218,12 +225,12 @@
171 job.status,
172 JobStatus.WAITING)
173
174- def test_resume_clears_lease_expiry(self):
175- """A job that resumes should null out the lease_expiry."""
176+ def test_resume_clears_lease_expires(self):
177+ """A job that resumes should null out the lease_expires."""
178 job = Job(_status=JobStatus.SUSPENDED)
179 job.lease_expires = UTC_NOW
180 job.resume()
181- self.assertIs(None, job.lease_expires)
182+ self.assertIsNone(job.lease_expires)
183
184 def test_resume_when_running(self):
185 """When a job is running, attempting to resume is invalid."""
186
187=== modified file 'lib/lp/services/job/tests/test_runner.py'
188--- lib/lp/services/job/tests/test_runner.py 2016-05-06 09:28:28 +0000
189+++ lib/lp/services/job/tests/test_runner.py 2017-06-29 18:31:47 +0000
190@@ -1,4 +1,4 @@
191-# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
192+# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
193 # GNU Affero General Public License version 3 (see the file LICENSE).
194
195 """Tests for job-running facilities."""
196@@ -349,6 +349,7 @@
197 MatchesAll(
198 GreaterThan(expected_delay - timedelta(minutes=1)),
199 LessThan(expected_delay + timedelta(minutes=1))))
200+ self.assertIsNone(job.lease_expires)
201 self.assertNotIn(job, runner.completed_jobs)
202 self.assertIn(job, runner.incomplete_jobs)
203
204
205=== modified file 'lib/lp/snappy/model/snapbuildjob.py'
206--- lib/lp/snappy/model/snapbuildjob.py 2017-06-14 10:25:23 +0000
207+++ lib/lp/snappy/model/snapbuildjob.py 2017-06-29 18:31:47 +0000
208@@ -197,7 +197,6 @@
209 )
210
211 retry_error_types = (UploadNotScannedYetResponse, RetryableSnapStoreError)
212- retry_delay = timedelta(minutes=1)
213 max_retries = 20
214
215 config = config.ISnapStoreUploadJobSource
216@@ -255,29 +254,29 @@
217 # Ideally we'd just override Job._set_status or similar, but
218 # lazr.delegates makes that difficult, so we use this to override all
219 # the individual Job lifecycle methods instead.
220- def _do_lifecycle(self, method, *args, **kwargs):
221+ def _do_lifecycle(self, method_name, *args, **kwargs):
222 old_store_upload_status = self.snapbuild.store_upload_status
223- method(*args, **kwargs)
224+ getattr(super(SnapStoreUploadJob, self), method_name)(*args, **kwargs)
225 if self.snapbuild.store_upload_status != old_store_upload_status:
226 notify(SnapBuildStoreUploadStatusChangedEvent(self.snapbuild))
227
228 def start(self, *args, **kwargs):
229- self._do_lifecycle(self.job.start, *args, **kwargs)
230+ self._do_lifecycle("start", *args, **kwargs)
231
232 def complete(self, *args, **kwargs):
233- self._do_lifecycle(self.job.complete, *args, **kwargs)
234+ self._do_lifecycle("complete", *args, **kwargs)
235
236 def fail(self, *args, **kwargs):
237- self._do_lifecycle(self.job.fail, *args, **kwargs)
238+ self._do_lifecycle("fail", *args, **kwargs)
239
240 def queue(self, *args, **kwargs):
241- self._do_lifecycle(self.job.queue, *args, **kwargs)
242+ self._do_lifecycle("queue", *args, **kwargs)
243
244 def suspend(self, *args, **kwargs):
245- self._do_lifecycle(self.job.suspend, *args, **kwargs)
246+ self._do_lifecycle("suspend", *args, **kwargs)
247
248 def resume(self, *args, **kwargs):
249- self._do_lifecycle(self.job.resume, *args, **kwargs)
250+ self._do_lifecycle("resume", *args, **kwargs)
251
252 def getOopsVars(self):
253 """See `IRunnableJob`."""
254@@ -285,15 +284,34 @@
255 oops_vars.append(('error_detail', self.error_detail))
256 return oops_vars
257
258+ @property
259+ def retry_delay(self):
260+ """See `BaseRunnableJob`."""
261+ if "status_url" in self.metadata and self.store_url is None:
262+ # At the moment we have to poll the status endpoint to find out
263+ # if the store has finished scanning. Try to deal with easy
264+ # cases quickly without hammering our job runners or the store
265+ # too badly.
266+ delays = (15, 15, 30, 30)
267+ try:
268+ return timedelta(seconds=delays[self.attempt_count - 1])
269+ except IndexError:
270+ pass
271+ return timedelta(minutes=1)
272+
273 def run(self):
274 """See `IRunnableJob`."""
275 client = getUtility(ISnapStoreClient)
276 try:
277 if "status_url" not in self.metadata:
278 self.metadata["status_url"] = client.upload(self.snapbuild)
279+ # We made progress, so reset attempt_count.
280+ self.attempt_count = 1
281 if self.store_url is None:
282 self.store_url, self.store_revision = (
283 client.checkStatus(self.metadata["status_url"]))
284+ # We made progress, so reset attempt_count.
285+ self.attempt_count = 1
286 if self.snapbuild.snap.store_channels:
287 if self.store_revision is None:
288 raise ManualReview(
289
290=== modified file 'lib/lp/snappy/tests/test_snapbuildjob.py'
291--- lib/lp/snappy/tests/test_snapbuildjob.py 2017-06-14 10:25:23 +0000
292+++ lib/lp/snappy/tests/test_snapbuildjob.py 2017-06-29 18:31:47 +0000
293@@ -7,6 +7,8 @@
294
295 __metaclass__ = type
296
297+from datetime import timedelta
298+
299 from fixtures import FakeLogger
300 from testtools.matchers import (
301 Equals,
302@@ -256,7 +258,6 @@
303 self.assertWebhookDeliveries(snapbuild, ["Pending"])
304 # Try again. The upload part of the job is retried, and this time
305 # it succeeds.
306- job.lease_expires = None
307 job.scheduled_start = None
308 client.upload.calls = []
309 client.upload.failure = None
310@@ -403,7 +404,6 @@
311 self.assertWebhookDeliveries(snapbuild, ["Pending"])
312 # Try again. The upload part of the job is not retried, and this
313 # time the scan completes.
314- job.lease_expires = None
315 job.scheduled_start = None
316 client.upload.calls = []
317 client.checkStatus.calls = []
318@@ -594,3 +594,39 @@
319 snapbuild.id, footer)
320 self.assertWebhookDeliveries(
321 snapbuild, ["Pending", "Failed to release to channels"])
322+
323+ def test_retry_delay(self):
324+ # The job is retried every minute, unless it just made one of its
325+ # first four attempts to poll the status endpoint, in which case the
326+ # delays are 15/15/30/30 seconds.
327+ self.useFixture(FakeLogger())
328+ snapbuild = self.makeSnapBuild()
329+ job = SnapStoreUploadJob.create(snapbuild)
330+ client = FakeSnapStoreClient()
331+ client.upload.failure = UploadFailedResponse(
332+ "Proxy error", can_retry=True)
333+ self.useFixture(ZopeUtilityFixture(client, ISnapStoreClient))
334+ with dbuser(config.ISnapStoreUploadJobSource.dbuser):
335+ JobRunner([job]).runAll()
336+ self.assertNotIn("status_url", job.metadata)
337+ self.assertEqual(timedelta(seconds=60), job.retry_delay)
338+ job.scheduled_start = None
339+ client.upload.failure = None
340+ client.upload.result = self.status_url
341+ client.checkStatus.failure = UploadNotScannedYetResponse()
342+ for expected_delay in (15, 15, 30, 30, 60):
343+ with dbuser(config.ISnapStoreUploadJobSource.dbuser):
344+ JobRunner([job]).runAll()
345+ self.assertIn("status_url", job.metadata)
346+ self.assertIsNone(job.store_url)
347+ self.assertEqual(
348+ timedelta(seconds=expected_delay), job.retry_delay)
349+ job.scheduled_start = None
350+ client.checkStatus.failure = None
351+ client.checkStatus.result = (self.store_url, 1)
352+ with dbuser(config.ISnapStoreUploadJobSource.dbuser):
353+ JobRunner([job]).runAll()
354+ self.assertEqual(self.store_url, job.store_url)
355+ self.assertIsNone(job.error_message)
356+ self.assertEqual([], pop_notifications())
357+ self.assertEqual(JobStatus.COMPLETED, job.job.status)