Merge lp:~cjwatson/launchpad/snap-upload-better-retry into lp:launchpad

Proposed by Colin Watson
Status: Merged
Merged at revision: 18426
Proposed branch: lp:~cjwatson/launchpad/snap-upload-better-retry
Merge into: lp:launchpad
Diff against target: 357 lines (+133/-30)
8 files modified
lib/lp/code/model/branchmergeproposaljob.py (+7/-2)
lib/lp/code/model/tests/test_branchmergeproposaljobs.py (+34/-0)
lib/lp/services/job/model/job.py (+3/-1)
lib/lp/services/job/tests/test_celery.py (+10/-10)
lib/lp/services/job/tests/test_job.py (+12/-5)
lib/lp/services/job/tests/test_runner.py (+2/-1)
lib/lp/snappy/model/snapbuildjob.py (+27/-9)
lib/lp/snappy/tests/test_snapbuildjob.py (+38/-2)
To merge this branch: bzr merge lp:~cjwatson/launchpad/snap-upload-better-retry
Reviewer Review Type Date Requested Status
William Grant code Approve
Review via email: mp+326548@code.launchpad.net

Commit message

Make SnapStoreUploadJob retries go via celery and be much more responsive.

Description of the change

There were several problems here. The lifecycle method overrides in SnapStoreUploadJob were subtly wrong and thus bypassed celeryRunOnCommit when retrying. The lease was held for much longer than the retry delay, even after the job had finished, which introduced several minutes of unnecessary delay (before fixing this, I checked that all the iterReady implementations honour Job.ready_jobs). Finally, we needed a shorter retry delay for the first few attempts to poll the status endpoint; that polling doesn't involve much work on the Launchpad side, and it may well succeed quite quickly for small packages.

To post a comment you must log in.
Revision history for this message
William Grant (wgrant) :
review: Approve (code)

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
=== modified file 'lib/lp/code/model/branchmergeproposaljob.py'
--- lib/lp/code/model/branchmergeproposaljob.py 2016-09-06 15:34:38 +0000
+++ lib/lp/code/model/branchmergeproposaljob.py 2017-06-29 18:31:47 +0000
@@ -675,8 +675,13 @@
     continue
 # We have now seen this merge proposal.
 seen_merge_proposals.add(bmp.id)
-# If the job is running, then skip it
-if job.status == JobStatus.RUNNING:
+# If the job is running or can't currently be run due to its
+# lease or its start time, then skip it.
+if (job.status == JobStatus.RUNNING or
+    (job.lease_expires is not None and
+     job.lease_expires >= datetime.now(pytz.UTC)) or
+    (job.scheduled_start is not None and
+     job.scheduled_start > datetime.now(pytz.UTC))):
     continue
 derived_job = bmp_job.makeDerived()
 # If the job is an update preview diff, then check that it is
683688
=== modified file 'lib/lp/code/model/tests/test_branchmergeproposaljobs.py'
--- lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2017-05-25 17:22:38 +0000
+++ lib/lp/code/model/tests/test_branchmergeproposaljobs.py 2017-06-29 18:31:47 +0000
@@ -559,6 +559,40 @@
559 jobs = self.job_source.iterReady()559 jobs = self.job_source.iterReady()
560 self.assertEqual(0, len(jobs))560 self.assertEqual(0, len(jobs))
561561
562 def test_iterReady_new_merge_proposal_update_diff_leased(self):
563 # If either the diff or the email job has an acquired lease, then
564 # iterReady skips it.
565 self.makeBranchMergeProposal(
566 set_state=BranchMergeProposalStatus.NEEDS_REVIEW)
567 [update_diff_job] = self.job_source.iterReady()
568 self.assertIsInstance(update_diff_job, UpdatePreviewDiffJob)
569 update_diff_job.acquireLease()
570 self.assertEqual(0, len(self.job_source.iterReady()))
571 update_diff_job.start()
572 update_diff_job.complete()
573 [email_job] = self.job_source.iterReady()
574 self.assertIsInstance(email_job, MergeProposalNeedsReviewEmailJob)
575 email_job.acquireLease()
576 self.assertEqual(0, len(self.job_source.iterReady()))
577
578 def test_iterReady_new_merge_proposal_update_diff_scheduled(self):
579 # If either the diff or the email job has a scheduled start time in
580 # the future, then iterReady skips it.
581 self.makeBranchMergeProposal(
582 set_state=BranchMergeProposalStatus.NEEDS_REVIEW)
583 [update_diff_job] = self.job_source.iterReady()
584 self.assertIsInstance(update_diff_job, UpdatePreviewDiffJob)
585 update_diff_job.start()
586 update_diff_job.queue()
587 self.assertEqual(0, len(self.job_source.iterReady()))
588 update_diff_job.start()
589 update_diff_job.complete()
590 [email_job] = self.job_source.iterReady()
591 self.assertIsInstance(email_job, MergeProposalNeedsReviewEmailJob)
592 email_job.start()
593 email_job.queue()
594 self.assertEqual(0, len(self.job_source.iterReady()))
595
562 def makeBranchMergeProposal(self, set_state=None):596 def makeBranchMergeProposal(self, set_state=None):
563 # Make a merge proposal that would have a ready update diff job.597 # Make a merge proposal that would have a ready update diff job.
564 bmp = self.factory.makeBranchMergeProposal(set_state=set_state)598 bmp = self.factory.makeBranchMergeProposal(set_state=set_state)
565599
=== modified file 'lib/lp/services/job/model/job.py'
--- lib/lp/services/job/model/job.py 2015-10-14 15:22:01 +0000
+++ lib/lp/services/job/model/job.py 2017-06-29 18:31:47 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2013 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).

 """ORM object representing jobs."""
@@ -206,6 +206,8 @@
206 if self.status != JobStatus.WAITING:206 if self.status != JobStatus.WAITING:
207 self._set_status(JobStatus.WAITING)207 self._set_status(JobStatus.WAITING)
208 self.date_finished = datetime.datetime.now(UTC)208 self.date_finished = datetime.datetime.now(UTC)
209 # Release the lease to allow short retry delays to be effective.
210 self.lease_expires = None
209 if add_commit_hook is not None:211 if add_commit_hook is not None:
210 add_commit_hook()212 add_commit_hook()
211 if manage_transaction:213 if manage_transaction:
212214
=== modified file 'lib/lp/services/job/tests/test_celery.py'
--- lib/lp/services/job/tests/test_celery.py 2015-08-04 02:26:26 +0000
+++ lib/lp/services/job/tests/test_celery.py 2017-06-29 18:31:47 +0000
@@ -1,4 +1,4 @@
-# Copyright 2012 Canonical Ltd. This software is licensed under the
+# Copyright 2012-2017 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).

 """Tests for running jobs via Celery."""
@@ -93,9 +93,10 @@
     self.job.lease_expires = datetime.now(UTC)
     raise RetryException
 elif self.job.attempt_count == 2:
-    # The retry delay is 5 seconds, but the lease is for nearly
-    # 10 seconds, so the job will be rescheduled 10 seconds in
-    # the future.
+    # The retry delay is 5 seconds, but the lease is for nearly 10
+    # seconds. However, the job releases the lease when it's
+    # requeued, so the job will again be rescheduled for 5 seconds
+    # (retry_delay) in the future.
     raise RetryException
100101
101102
@@ -185,10 +186,9 @@
185 iso8601.parse_date(d)186 iso8601.parse_date(d)
186 for d in job.job.base_json_data['dates_started']]187 for d in job.job.base_json_data['dates_started']]
187188
-# The first attempt's lease is set to the end of the job, so
-# the second attempt should start roughly 5 seconds after the
-# first. The third attempt has to wait out the full 10 second
-# lease, so it should start roughly 10 seconds after the second.
+# The first attempt's lease is set to the end of the job, so the
+# second attempt should start roughly 5 seconds after the first. The
+# third attempt should start roughly 5 seconds after the second.
192 self.assertThat(dates_started, HasLength(3))192 self.assertThat(dates_started, HasLength(3))
193 self.assertThat(dates_started,193 self.assertThat(dates_started,
194 MatchesListwise([194 MatchesListwise([
@@ -197,8 +197,8 @@
         GreaterThan(dates_started[0] + timedelta(seconds=4)),
         LessThan(dates_started[0] + timedelta(seconds=8))),
     MatchesAll(
-        GreaterThan(dates_started[1] + timedelta(seconds=8)),
-        LessThan(dates_started[1] + timedelta(seconds=13))),
+        GreaterThan(dates_started[1] + timedelta(seconds=4)),
+        LessThan(dates_started[1] + timedelta(seconds=8))),
     ]))
 self.assertEqual(3, job.attempt_count)
 self.assertEqual(JobStatus.COMPLETED, job.status)
205205
=== modified file 'lib/lp/services/job/tests/test_job.py'
--- lib/lp/services/job/tests/test_job.py 2015-07-30 01:50:11 +0000
+++ lib/lp/services/job/tests/test_job.py 2017-06-29 18:31:47 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2011 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).

 __metaclass__ = type
@@ -7,11 +7,11 @@
     datetime,
     timedelta,
     )
-from pytz import UTC
 import time

 from lazr.jobrunner.jobrunner import LeaseHeld
 import pytz
+from pytz import UTC
 from storm.locals import Store
 from testtools.matchers import Equals
 import transaction
@@ -186,6 +186,13 @@
186 self.assertNotEqual(None, job.date_finished)186 self.assertNotEqual(None, job.date_finished)
187 self.assertEqual(job.status, JobStatus.WAITING)187 self.assertEqual(job.status, JobStatus.WAITING)
188188
189 def test_queue_clears_lease_expires(self):
190 """Queueing a job releases its lease."""
191 job = Job(_status=JobStatus.RUNNING)
192 job.lease_expires = UTC_NOW
193 job.queue()
194 self.assertIsNone(job.lease_expires)
195
189 def test_suspend(self):196 def test_suspend(self):
190 """A job that is in the WAITING state can be suspended."""197 """A job that is in the WAITING state can be suspended."""
191 job = Job(_status=JobStatus.WAITING)198 job = Job(_status=JobStatus.WAITING)
@@ -218,12 +225,12 @@
         job.status,
         JobStatus.WAITING)

-def test_resume_clears_lease_expiry(self):
-    """A job that resumes should null out the lease_expiry."""
+def test_resume_clears_lease_expires(self):
+    """A job that resumes should null out the lease_expires."""
     job = Job(_status=JobStatus.SUSPENDED)
     job.lease_expires = UTC_NOW
     job.resume()
-    self.assertIs(None, job.lease_expires)
+    self.assertIsNone(job.lease_expires)

 def test_resume_when_running(self):
     """When a job is running, attempting to resume is invalid."""
230237
=== modified file 'lib/lp/services/job/tests/test_runner.py'
--- lib/lp/services/job/tests/test_runner.py 2016-05-06 09:28:28 +0000
+++ lib/lp/services/job/tests/test_runner.py 2017-06-29 18:31:47 +0000
@@ -1,4 +1,4 @@
-# Copyright 2009-2016 Canonical Ltd. This software is licensed under the
+# Copyright 2009-2017 Canonical Ltd. This software is licensed under the
 # GNU Affero General Public License version 3 (see the file LICENSE).

 """Tests for job-running facilities."""
@@ -349,6 +349,7 @@
349 MatchesAll(349 MatchesAll(
350 GreaterThan(expected_delay - timedelta(minutes=1)),350 GreaterThan(expected_delay - timedelta(minutes=1)),
351 LessThan(expected_delay + timedelta(minutes=1))))351 LessThan(expected_delay + timedelta(minutes=1))))
352 self.assertIsNone(job.lease_expires)
352 self.assertNotIn(job, runner.completed_jobs)353 self.assertNotIn(job, runner.completed_jobs)
353 self.assertIn(job, runner.incomplete_jobs)354 self.assertIn(job, runner.incomplete_jobs)
354355
355356
=== modified file 'lib/lp/snappy/model/snapbuildjob.py'
--- lib/lp/snappy/model/snapbuildjob.py 2017-06-14 10:25:23 +0000
+++ lib/lp/snappy/model/snapbuildjob.py 2017-06-29 18:31:47 +0000
@@ -197,7 +197,6 @@
     )

 retry_error_types = (UploadNotScannedYetResponse, RetryableSnapStoreError)
-retry_delay = timedelta(minutes=1)
 max_retries = 20

 config = config.ISnapStoreUploadJobSource
@@ -255,29 +254,29 @@
 # Ideally we'd just override Job._set_status or similar, but
 # lazr.delegates makes that difficult, so we use this to override all
 # the individual Job lifecycle methods instead.
-def _do_lifecycle(self, method, *args, **kwargs):
+def _do_lifecycle(self, method_name, *args, **kwargs):
     old_store_upload_status = self.snapbuild.store_upload_status
-    method(*args, **kwargs)
+    getattr(super(SnapStoreUploadJob, self), method_name)(*args, **kwargs)
     if self.snapbuild.store_upload_status != old_store_upload_status:
         notify(SnapBuildStoreUploadStatusChangedEvent(self.snapbuild))

 def start(self, *args, **kwargs):
-    self._do_lifecycle(self.job.start, *args, **kwargs)
+    self._do_lifecycle("start", *args, **kwargs)

 def complete(self, *args, **kwargs):
-    self._do_lifecycle(self.job.complete, *args, **kwargs)
+    self._do_lifecycle("complete", *args, **kwargs)

 def fail(self, *args, **kwargs):
-    self._do_lifecycle(self.job.fail, *args, **kwargs)
+    self._do_lifecycle("fail", *args, **kwargs)

 def queue(self, *args, **kwargs):
-    self._do_lifecycle(self.job.queue, *args, **kwargs)
+    self._do_lifecycle("queue", *args, **kwargs)

 def suspend(self, *args, **kwargs):
-    self._do_lifecycle(self.job.suspend, *args, **kwargs)
+    self._do_lifecycle("suspend", *args, **kwargs)

 def resume(self, *args, **kwargs):
-    self._do_lifecycle(self.job.resume, *args, **kwargs)
+    self._do_lifecycle("resume", *args, **kwargs)

 def getOopsVars(self):
     """See `IRunnableJob`."""
@@ -285,15 +284,34 @@
285 oops_vars.append(('error_detail', self.error_detail))284 oops_vars.append(('error_detail', self.error_detail))
286 return oops_vars285 return oops_vars
287286
287 @property
288 def retry_delay(self):
289 """See `BaseRunnableJob`."""
290 if "status_url" in self.metadata and self.store_url is None:
291 # At the moment we have to poll the status endpoint to find out
292 # if the store has finished scanning. Try to deal with easy
293 # cases quickly without hammering our job runners or the store
294 # too badly.
295 delays = (15, 15, 30, 30)
296 try:
297 return timedelta(seconds=delays[self.attempt_count - 1])
298 except IndexError:
299 pass
300 return timedelta(minutes=1)
301
288 def run(self):302 def run(self):
289 """See `IRunnableJob`."""303 """See `IRunnableJob`."""
290 client = getUtility(ISnapStoreClient)304 client = getUtility(ISnapStoreClient)
291 try:305 try:
292 if "status_url" not in self.metadata:306 if "status_url" not in self.metadata:
293 self.metadata["status_url"] = client.upload(self.snapbuild)307 self.metadata["status_url"] = client.upload(self.snapbuild)
308 # We made progress, so reset attempt_count.
309 self.attempt_count = 1
294 if self.store_url is None:310 if self.store_url is None:
295 self.store_url, self.store_revision = (311 self.store_url, self.store_revision = (
296 client.checkStatus(self.metadata["status_url"]))312 client.checkStatus(self.metadata["status_url"]))
313 # We made progress, so reset attempt_count.
314 self.attempt_count = 1
297 if self.snapbuild.snap.store_channels:315 if self.snapbuild.snap.store_channels:
298 if self.store_revision is None:316 if self.store_revision is None:
299 raise ManualReview(317 raise ManualReview(
300318
=== modified file 'lib/lp/snappy/tests/test_snapbuildjob.py'
--- lib/lp/snappy/tests/test_snapbuildjob.py 2017-06-14 10:25:23 +0000
+++ lib/lp/snappy/tests/test_snapbuildjob.py 2017-06-29 18:31:47 +0000
@@ -7,6 +7,8 @@
77
8__metaclass__ = type8__metaclass__ = type
99
10from datetime import timedelta
11
10from fixtures import FakeLogger12from fixtures import FakeLogger
11from testtools.matchers import (13from testtools.matchers import (
12 Equals,14 Equals,
@@ -256,7 +258,6 @@
 self.assertWebhookDeliveries(snapbuild, ["Pending"])
 # Try again. The upload part of the job is retried, and this time
 # it succeeds.
-job.lease_expires = None
 job.scheduled_start = None
 client.upload.calls = []
 client.upload.failure = None
@@ -403,7 +404,6 @@
 self.assertWebhookDeliveries(snapbuild, ["Pending"])
 # Try again. The upload part of the job is not retried, and this
 # time the scan completes.
-job.lease_expires = None
 job.scheduled_start = None
 client.upload.calls = []
 client.checkStatus.calls = []
@@ -594,3 +594,39 @@
594 snapbuild.id, footer)594 snapbuild.id, footer)
595 self.assertWebhookDeliveries(595 self.assertWebhookDeliveries(
596 snapbuild, ["Pending", "Failed to release to channels"])596 snapbuild, ["Pending", "Failed to release to channels"])
597
598 def test_retry_delay(self):
599 # The job is retried every minute, unless it just made one of its
600 # first four attempts to poll the status endpoint, in which case the
601 # delays are 15/15/30/30 seconds.
602 self.useFixture(FakeLogger())
603 snapbuild = self.makeSnapBuild()
604 job = SnapStoreUploadJob.create(snapbuild)
605 client = FakeSnapStoreClient()
606 client.upload.failure = UploadFailedResponse(
607 "Proxy error", can_retry=True)
608 self.useFixture(ZopeUtilityFixture(client, ISnapStoreClient))
609 with dbuser(config.ISnapStoreUploadJobSource.dbuser):
610 JobRunner([job]).runAll()
611 self.assertNotIn("status_url", job.metadata)
612 self.assertEqual(timedelta(seconds=60), job.retry_delay)
613 job.scheduled_start = None
614 client.upload.failure = None
615 client.upload.result = self.status_url
616 client.checkStatus.failure = UploadNotScannedYetResponse()
617 for expected_delay in (15, 15, 30, 30, 60):
618 with dbuser(config.ISnapStoreUploadJobSource.dbuser):
619 JobRunner([job]).runAll()
620 self.assertIn("status_url", job.metadata)
621 self.assertIsNone(job.store_url)
622 self.assertEqual(
623 timedelta(seconds=expected_delay), job.retry_delay)
624 job.scheduled_start = None
625 client.checkStatus.failure = None
626 client.checkStatus.result = (self.store_url, 1)
627 with dbuser(config.ISnapStoreUploadJobSource.dbuser):
628 JobRunner([job]).runAll()
629 self.assertEqual(self.store_url, job.store_url)
630 self.assertIsNone(job.error_message)
631 self.assertEqual([], pop_notifications())
632 self.assertEqual(JobStatus.COMPLETED, job.job.status)