Merge ~cjwatson/launchpad:copy-advisory-lock into launchpad:master

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: a154f19200a82d984f5332035991ba81ea50e762
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~cjwatson/launchpad:copy-advisory-lock
Merge into: launchpad:master
Diff against target: 202 lines (+96/-6)
3 files modified
lib/lp/services/database/locking.py (+6/-1)
lib/lp/soyuz/model/packagecopyjob.py (+42/-4)
lib/lp/soyuz/tests/test_packagecopyjob.py (+48/-1)
Reviewer | Review Type | Date Requested | Status
Ioana Lasc (community) | — | — | Approve
Review via email: mp+399646@code.launchpad.net

Commit message

Take an advisory lock when copying packages

Description of the change

Take an advisory lock on the hash of the target archive, source package name, and source package version when copying packages. Otherwise, multiple copies of the same package into different series in the same archive can race with the conflict checker and create conflicting builds.

There are still some possible races (since different sources may produce the same binaries), but they're much less common.

This is similar to https://code.launchpad.net/~cjwatson/launchpad/copy-lock-archive/+merge/279275, converted to git and rebased on master. However, I made the lock parameters more fine-grained (archive+name+version rather than just archive) to avoid the thundering-herd problem.

To post a comment you must log in.
Revision history for this message
Ioana Lasc (ilasc) wrote :

Looks good.

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/lib/lp/services/database/locking.py b/lib/lp/services/database/locking.py
2index e8046a4..266f971 100644
3--- a/lib/lp/services/database/locking.py
4+++ b/lib/lp/services/database/locking.py
5@@ -1,4 +1,4 @@
6-# Copyright 2011-2015 Canonical Ltd. This software is licensed under the
7+# Copyright 2011-2021 Canonical Ltd. This software is licensed under the
8 # GNU Affero General Public License version 3 (see the file LICENSE).
9
10 __metaclass__ = type
11@@ -39,6 +39,11 @@ class LockType(DBEnumeratedType):
12 Git repository reference scan.
13 """)
14
15+ PACKAGE_COPY = DBItem(2, """Package copy.
16+
17+ Package copy.
18+ """)
19+
20
21 @contextmanager
22 def try_advisory_lock(lock_type, lock_id, store):
23diff --git a/lib/lp/soyuz/model/packagecopyjob.py b/lib/lp/soyuz/model/packagecopyjob.py
24index b07b88d..83c8b1f 100644
25--- a/lib/lp/soyuz/model/packagecopyjob.py
26+++ b/lib/lp/soyuz/model/packagecopyjob.py
27@@ -1,4 +1,4 @@
28-# Copyright 2010-2019 Canonical Ltd. This software is licensed under the
29+# Copyright 2010-2021 Canonical Ltd. This software is licensed under the
30 # GNU Affero General Public License version 3 (see the file LICENSE).
31
32 __metaclass__ = type
33@@ -8,7 +8,9 @@ __all__ = [
34 "PlainPackageCopyJob",
35 ]
36
37+from datetime import timedelta
38 import logging
39+import random
40
41 from lazr.delegates import delegate_to
42 from lazr.jobrunner.jobrunner import SuspendJobException
43@@ -50,6 +52,11 @@ from lp.services.database.interfaces import (
44 IMasterStore,
45 IStore,
46 )
47+from lp.services.database.locking import (
48+ AdvisoryLockHeld,
49+ LockType,
50+ try_advisory_lock,
51+ )
52 from lp.services.database.stormbase import StormBase
53 from lp.services.job.interfaces.job import JobStatus
54 from lp.services.job.model.job import (
55@@ -267,7 +274,7 @@ class PlainPackageCopyJob(PackageCopyJobDerived):
56 user_error_types = (CannotCopy,)
57 # Raised when closing bugs ends up hitting another process and
58 # deadlocking.
59- retry_error_types = (TransactionRollbackError,)
60+ retry_error_types = (TransactionRollbackError, AdvisoryLockHeld)
61 max_retries = 5
62
63 @classmethod
64@@ -579,10 +586,41 @@ class PlainPackageCopyJob(PackageCopyJobDerived):
65 transaction.commit()
66 super(PlainPackageCopyJob, self).notifyOops(oops)
67
68+ @property
69+ def _advisory_lock_id(self):
70+ """An ID for use in advisory locks for this job."""
71+ # Mask off the bottom 31 bits so that this fits in PostgreSQL's
72+ # integer type, allowing it to be used as the second argument to a
73+ # two-argument pg_try_advisory_lock function.
74+ return hash((
75+ self.target_archive_id,
76+ self.package_name,
77+ self.package_version)) & 0x7FFFFFFF
78+
79+ @property
80+ def retry_delay(self):
81+ """See `BaseRunnableJob`."""
82+ # Retry in somewhere between 6 and 8 minutes. This is longer than
83+ # the lease duration and the soft time limit, and the randomness
84+ # makes it less likely that N-1 of a set of jobs that are all
85+ # competing for the same advisory lock will collide the next time
86+ # round. There's already no particular ordering guarantee among
87+ # jobs with the same copy policy (see
88+ # `PackageCopyJobDerived.iterReady`).
89+ return timedelta(minutes=random.uniform(6, 8))
90+
91 def run(self):
92 """See `IRunnableJob`."""
93 try:
94- self.attemptCopy()
95+ # Take an advisory lock to fend off by far the most common case
96+ # of races between multiple instances of the copier's conflict
97+ # checker, namely copies of the same source name and version
98+ # from the same archive into multiple destination series. Other
99+ # races are still possible, but much rarer.
100+ with try_advisory_lock(
101+ LockType.PACKAGE_COPY, self._advisory_lock_id,
102+ IStore(Archive)):
103+ self.attemptCopy()
104 except CannotCopy as e:
105 # Remember the target archive purpose, as otherwise aborting the
106 # transaction will forget it.
107@@ -607,7 +645,7 @@ class PlainPackageCopyJob(PackageCopyJobDerived):
108 # the job. We will normally have a DistroSeriesDifference
109 # in this case.
110 pass
111- except SuspendJobException:
112+ except (SuspendJobException, AdvisoryLockHeld):
113 raise
114 except:
115 # Abort work done so far, but make sure that we commit the
116diff --git a/lib/lp/soyuz/tests/test_packagecopyjob.py b/lib/lp/soyuz/tests/test_packagecopyjob.py
117index 6817c10..7c2ef99 100644
118--- a/lib/lp/soyuz/tests/test_packagecopyjob.py
119+++ b/lib/lp/soyuz/tests/test_packagecopyjob.py
120@@ -1,4 +1,4 @@
121-# Copyright 2010-2019 Canonical Ltd. This software is licensed under the
122+# Copyright 2010-2021 Canonical Ltd. This software is licensed under the
123 # GNU Affero General Public License version 3 (see the file LICENSE).
124
125 """Tests for sync package jobs."""
126@@ -6,8 +6,11 @@
127 from __future__ import absolute_import, print_function, unicode_literals
128
129 import operator
130+import os
131+import signal
132 from textwrap import dedent
133
134+from fixtures import FakeLogger
135 from storm.store import Store
136 from testtools.content import text_content
137 from testtools.matchers import (
138@@ -28,6 +31,10 @@ from lp.registry.model.distroseriesdifferencecomment import (
139 )
140 from lp.services.config import config
141 from lp.services.database.interfaces import IStore
142+from lp.services.database.locking import (
143+ LockType,
144+ try_advisory_lock,
145+ )
146 from lp.services.features.testing import FeatureFixture
147 from lp.services.job.interfaces.job import JobStatus
148 from lp.services.job.runner import JobRunner
149@@ -58,6 +65,7 @@ from lp.soyuz.interfaces.section import ISectionSet
150 from lp.soyuz.interfaces.sourcepackageformat import (
151 ISourcePackageFormatSelectionSet,
152 )
153+from lp.soyuz.model.archive import Archive
154 from lp.soyuz.model.packagecopyjob import PackageCopyJob
155 from lp.soyuz.model.queue import PackageUpload
156 from lp.soyuz.tests.test_publishing import SoyuzTestPublisher
157@@ -329,6 +337,45 @@ class PlainPackageCopyJobTests(TestCaseWithFactory, LocalTestHelper):
158
159 self.assertRaises(Boom, job.run)
160
161+ def test_run_tries_advisory_lock(self):
162+ # A job is retried if an advisory lock for the same archive, package
163+ # name, and version is held.
164+ logger = self.useFixture(FakeLogger())
165+ job = create_proper_job(self.factory)
166+ advisory_lock_id = hash((
167+ job.target_archive_id,
168+ job.package_name,
169+ job.package_version)) & 0x7FFFFFFF
170+ self.assertEqual(
171+ advisory_lock_id, removeSecurityProxy(job)._advisory_lock_id)
172+ switch_dbuser(self.dbuser)
173+ # Fork so that we can take an advisory lock from a different
174+ # PostgreSQL session.
175+ read, write = os.pipe()
176+ pid = os.fork()
177+ if pid == 0: # child
178+ os.close(read)
179+ with try_advisory_lock(
180+ LockType.PACKAGE_COPY, advisory_lock_id, IStore(Archive)):
181+ os.write(write, b"1")
182+ try:
183+ signal.pause()
184+ except KeyboardInterrupt:
185+ pass
186+ os._exit(0)
187+ else: # parent
188+ try:
189+ os.close(write)
190+ os.read(read, 1)
191+ runner = JobRunner([job])
192+ runner.runAll()
193+ self.assertEqual(JobStatus.WAITING, job.status)
194+ self.assertEqual([], runner.oops_ids)
195+ self.assertIn(
196+ "Scheduling retry due to AdvisoryLockHeld", logger.output)
197+ finally:
198+ os.kill(pid, signal.SIGINT)
199+
200 def test_run_posts_copy_failure_as_comment(self):
201 # If the job fails with a CannotCopy exception, it swallows the
202 # exception and posts a DistroSeriesDifferenceComment with the

Subscribers

People subscribed via source and target branches

to status/vote changes: