Merge ~cjwatson/launchpad:refactor-update-by-hash into launchpad:master

Proposed by Colin Watson
Status: Merged
Approved by: Colin Watson
Approved revision: bde545ea4ca0de7e9ea00c1775a2f371fa4f56a0
Merge reported by: Otto Co-Pilot
Merged at revision: not available
Proposed branch: ~cjwatson/launchpad:refactor-update-by-hash
Merge into: launchpad:master
Diff against target: 411 lines (+157/-126)
5 files modified
lib/lp/archivepublisher/publishing.py (+134/-89)
lib/lp/archivepublisher/tests/test_publisher.py (+6/-0)
lib/lp/soyuz/interfaces/archivefile.py (+3/-4)
lib/lp/soyuz/model/archivefile.py (+4/-5)
lib/lp/soyuz/tests/test_archivefile.py (+10/-28)
Reviewer         Review Type    Date Requested    Status
William Grant    code                             Approve
Review via email: mp+434573@code.launchpad.net

Commit message

Refactor _updateByHash to clarify logic

Description of the change

William Grant pointed out in https://code.launchpad.net/~cjwatson/launchpad/+git/launchpad/+merge/373971 that the flow of `Publisher._updateByHash` was quite difficult to understand, and proposed an alternative structure in https://code.launchpad.net/~wgrant/launchpad/+git/launchpad/+merge/390811. Over two years later, I've finally got round to digesting and polishing this.

The basic idea is that, rather than interleaving database/disk queries and actions, we now gather all the information we need from the database and from the archive on disk at the start, and then separately take all the actions needed to reconcile them and to keep `by-hash` directories up to date.

No functional change is intended here.
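
As a rough, hypothetical sketch of that shape (simplified stand-in names, not the actual Publisher/ArchiveFile code), the gather-then-reconcile flow looks something like this:

from collections import namedtuple
from datetime import datetime, timedelta

# Hypothetical stand-in for an ArchiveFile row; not the real model class.
Row = namedtuple("Row", "path sha256 scheduled_deletion_date")

def plan_by_hash_update(db_rows, current_files, now):
    """Gather state first, then compute every reconciliation action.

    db_rows: ArchiveFile-like rows for this suite's container
    current_files: {(path, sha256)} referenced by the current Release file
    now: the transaction timestamp
    """
    # 1. Gather: classify everything we know from the database.
    live = {}         # (path, sha256) -> row with no scheduled deletion date
    nonlive = {}      # (path, sha256) -> row already scheduled for deletion
    expired = set()   # rows whose stay of execution has elapsed
    for row in db_rows:
        key = (row.path, row.sha256)
        if row.scheduled_deletion_date is None:
            live[key] = row
        else:
            nonlive[key] = row
            if row.scheduled_deletion_date < now:
                expired.add(row)

    # 2. Reconcile: work out every action before taking any of them.
    condemn = [row for key, row in live.items() if key not in current_files]
    resurrect = {nonlive[key] for key in current_files if key in nonlive}
    create = sorted(key for key in current_files
                    if key not in live and key not in nonlive)
    delete = expired - resurrect
    return condemn, resurrect, create, delete

# Example use, with made-up paths and hashes:
now = datetime.now()
rows = [Row("dists/foo/Release", "abc123", None),
        Row("dists/foo/Packages", "def456", now - timedelta(days=1))]
plan_by_hash_update(rows, {("dists/foo/Release", "abc123")}, now)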

William Grant (wgrant) wrote :

Nice, just one comment.

review: Approve (code)
Colin Watson (cjwatson) :

Preview Diff

1diff --git a/lib/lp/archivepublisher/publishing.py b/lib/lp/archivepublisher/publishing.py
2index 58d948a..b8cea18 100644
3--- a/lib/lp/archivepublisher/publishing.py
4+++ b/lib/lp/archivepublisher/publishing.py
5@@ -48,6 +48,7 @@ from lp.registry.model.distroseries import DistroSeries
6 from lp.services.database.bulk import load
7 from lp.services.database.constants import UTC_NOW
8 from lp.services.database.interfaces import IStore
9+from lp.services.database.sqlbase import get_transaction_timestamp
10 from lp.services.helpers import filenameToContentType
11 from lp.services.librarian.client import LibrarianClient
12 from lp.services.osutils import ensure_directory_exists, open_for_writing
13@@ -65,6 +66,7 @@ from lp.soyuz.interfaces.publishing import (
14 IPublishingSet,
15 active_publishing_status,
16 )
17+from lp.soyuz.model.archivefile import ArchiveFile
18 from lp.soyuz.model.binarypackagerelease import BinaryPackageRelease
19 from lp.soyuz.model.distroarchseries import DistroArchSeries
20 from lp.soyuz.model.publishing import (
21@@ -1168,15 +1170,14 @@ class Publisher:
22 return self.distro.displayname
23 return "LP-PPA-%s" % get_ppa_reference(self.archive)
24
25- def _updateByHash(self, suite, release_file_name, extra_files):
26- """Update by-hash files for a suite.
27+ def _getCurrentFiles(self, suite, release_file_name, extra_files):
28+ # Gather information on entries in the current Release file.
29+ release_path = os.path.join(
30+ self._config.distsroot, suite, release_file_name
31+ )
32+ with open(release_path) as release_file:
33+ release_data = Release(release_file)
34
35- This takes Release file data which references a set of on-disk
36- files, injects any newly-modified files from that set into the
37- librarian and the ArchiveFile table, and updates the on-disk by-hash
38- directories to be in sync with ArchiveFile. Any on-disk by-hash
39- entries that ceased to be current sufficiently long ago are removed.
40- """
41 extra_data = {}
42 for filename, real_filename in extra_files.items():
43 hashes = self._readIndexFileHashes(
44@@ -1189,24 +1190,9 @@ class Publisher:
45 hashes[archive_hash.deb822_name]
46 )
47
48- release_path = os.path.join(
49- self._config.distsroot, suite, release_file_name
50- )
51- with open(release_path) as release_file:
52- release_data = Release(release_file)
53- archive_file_set = getUtility(IArchiveFileSet)
54- by_hashes = ByHashes(self._config.distsroot, self.log)
55 suite_dir = os.path.relpath(
56 os.path.join(self._config.distsroot, suite), self._config.distsroot
57 )
58- container = "release:%s" % suite
59-
60- def strip_dists(path):
61- assert path.startswith("dists/")
62- return path[len("dists/") :]
63-
64- # Gather information on entries in the current Release file, and
65- # make sure nothing there is condemned.
66 current_files = {}
67 for current_entry in release_data["SHA256"] + extra_data.get(
68 "SHA256", []
69@@ -1214,66 +1200,93 @@ class Publisher:
70 path = os.path.join(suite_dir, current_entry["name"])
71 real_name = current_entry.get("real_name", current_entry["name"])
72 real_path = os.path.join(suite_dir, real_name)
73- current_files[path] = (
74- int(current_entry["size"]),
75- current_entry["sha256"],
76- real_path,
77- )
78- uncondemned_files = set()
79- for db_file in archive_file_set.getByArchive(
80- self.archive, container=container, condemned=True, eager_load=True
81- ):
82- stripped_path = strip_dists(db_file.path)
83- if stripped_path in current_files:
84- current_sha256 = current_files[stripped_path][1]
85- if db_file.library_file.content.sha256 == current_sha256:
86- uncondemned_files.add(db_file)
87- if uncondemned_files:
88- for container, path, sha256 in archive_file_set.unscheduleDeletion(
89- uncondemned_files
90- ):
91- self.log.debug(
92- "by-hash: Unscheduled %s for %s in %s for deletion"
93- % (sha256, path, container)
94+ full_path = os.path.join(self._config.distsroot, real_path)
95+ if os.path.exists(full_path):
96+ current_files[path] = (
97+ int(current_entry["size"]),
98+ current_entry["sha256"],
99+ real_path,
100+ )
101+ else:
102+ self.log.warning(
103+ "%s contains %s, but %s does not exist!"
104+ % (release_path, path, full_path)
105 )
106+ return current_files
107
108- # Remove any condemned files from the database whose stay of
109- # execution has elapsed. We ensure that we know about all the
110- # relevant by-hash directory trees before doing any removals so that
111- # we can prune them properly later.
112+ def _updateByHash(self, suite, release_file_name, extra_files):
113+ """Update by-hash files for a suite.
114+
115+ This takes Release file data which references a set of on-disk
116+ files, injects any newly-modified files from that set into the
117+ librarian and the ArchiveFile table, and updates the on-disk by-hash
118+ directories to be in sync with ArchiveFile. Any on-disk by-hash
119+ entries that ceased to be current sufficiently long ago are removed.
120+ """
121+ archive_file_set = getUtility(IArchiveFileSet)
122+ container = "release:%s" % suite
123+
124+ by_hashes = ByHashes(self._config.distsroot, self.log)
125+ existing_live_files = {}
126+ existing_nonlive_files = {}
127+ keep_files = set()
128+ reapable_files = set()
129+
130+ def strip_dists(path):
131+ assert path.startswith("dists/")
132+ return path[len("dists/") :]
133+
134+ # Record all files from the database.
135+ db_now = get_transaction_timestamp(IStore(ArchiveFile))
136 for db_file in archive_file_set.getByArchive(
137- self.archive, container=container
138- ):
139- by_hashes.registerChild(os.path.dirname(strip_dists(db_file.path)))
140- for container, path, sha256 in archive_file_set.reap(
141- self.archive, container=container
142+ self.archive, container=container, eager_load=True
143 ):
144- self.log.debug(
145- "by-hash: Deleted %s for %s in %s" % (sha256, path, container)
146+ file_key = (
147+ strip_dists(db_file.path),
148+ db_file.library_file.content.sha256,
149 )
150+ # Ensure any subdirectories are registered early on, in case we're
151+ # about to delete the only file and need to know to prune it.
152+ by_hashes.registerChild(os.path.dirname(strip_dists(db_file.path)))
153
154- # Ensure that all files recorded in the database are in by-hash.
155- db_files = archive_file_set.getByArchive(
156- self.archive, container=container, eager_load=True
157+ if db_file.scheduled_deletion_date is None:
158+ # XXX wgrant 2020-09-16: Once we have
159+ # ArchiveFile.date_superseded in place, this should be a DB
160+ # constraint - i.e. there should only be a single
161+ # non-superseded row for each path/content pair.
162+ assert file_key not in existing_live_files
163+ existing_live_files[file_key] = db_file
164+ else:
165+ existing_nonlive_files[file_key] = db_file
166+
167+ if (
168+ db_file.scheduled_deletion_date is not None
169+ and db_file.scheduled_deletion_date < db_now
170+ ):
171+ # File has expired. Mark it for reaping.
172+ reapable_files.add(db_file)
173+ else:
174+ # File should still be on disk.
175+ by_hashes.add(strip_dists(db_file.path), db_file.library_file)
176+
177+ # Record all files from the archive on disk.
178+ current_files = self._getCurrentFiles(
179+ suite, release_file_name, extra_files
180 )
181- for db_file in db_files:
182- by_hashes.add(strip_dists(db_file.path), db_file.library_file)
183+ new_live_files = {
184+ (path, sha256) for path, (_, sha256, _) in current_files.items()
185+ }
186
187- # Condemn any database records that do not correspond to current
188- # index files.
189- condemned_files = set()
190- for db_file in db_files:
191- if db_file.scheduled_deletion_date is None:
192- stripped_path = strip_dists(db_file.path)
193- if stripped_path in current_files:
194- current_sha256 = current_files[stripped_path][1]
195- else:
196- current_sha256 = None
197- if db_file.library_file.content.sha256 != current_sha256:
198- condemned_files.add(db_file)
199- if condemned_files:
200+ # Schedule the deletion of any ArchiveFiles which are current in the
201+ # DB but weren't current in the archive this round.
202+ old_files = [
203+ af
204+ for key, af in existing_live_files.items()
205+ if key not in new_live_files
206+ ]
207+ if old_files:
208 for container, path, sha256 in archive_file_set.scheduleDeletion(
209- condemned_files, timedelta(days=BY_HASH_STAY_OF_EXECUTION)
210+ old_files, timedelta(days=BY_HASH_STAY_OF_EXECUTION)
211 ):
212 self.log.debug(
213 "by-hash: Scheduled %s for %s in %s for deletion"
214@@ -1281,32 +1294,64 @@ class Publisher:
215 )
216
217 # Ensure that all the current index files are in by-hash and have
218- # corresponding database entries.
219+ # corresponding ArchiveFiles.
220 # XXX cjwatson 2016-03-15: This should possibly use bulk creation,
221 # although we can only avoid about a third of the queries since the
222 # librarian client has no bulk upload methods.
223 for path, (size, sha256, real_path) in current_files.items():
224+ file_key = (path, sha256)
225 full_path = os.path.join(self._config.distsroot, real_path)
226- if os.path.exists(full_path) and not by_hashes.known(
227- path, "SHA256", sha256
228- ):
229- with open(full_path, "rb") as fileobj:
230- db_file = archive_file_set.newFromFile(
231- self.archive,
232- container,
233- os.path.join("dists", path),
234- fileobj,
235- size,
236- filenameToContentType(path),
237- )
238+ assert os.path.exists(full_path) # guaranteed by _getCurrentFiles
239+ # Ensure there's a current ArchiveFile row, either by finding a
240+ # matching non-live file and marking it live again, or by
241+ # creating a new one based on the file on disk.
242+ if file_key not in existing_live_files:
243+ if file_key in existing_nonlive_files:
244+ db_file = existing_nonlive_files[file_key]
245+ keep_files.add(db_file)
246+ else:
247+ with open(full_path, "rb") as fileobj:
248+ db_file = archive_file_set.newFromFile(
249+ self.archive,
250+ container,
251+ os.path.join("dists", path),
252+ fileobj,
253+ size,
254+ filenameToContentType(path),
255+ )
256+ # And ensure the by-hash links exist on disk.
257+ if not by_hashes.known(path, "SHA256", sha256):
258 by_hashes.add(
259 path, db_file.library_file, copy_from_path=real_path
260 )
261
262- # Finally, remove any files from disk that aren't recorded in the
263- # database and aren't active.
264+ # Unschedule the deletion of any ArchiveFiles which are current in
265+ # the archive this round but that were previously scheduled for
266+ # deletion in the DB.
267+ if keep_files:
268+ for container, path, sha256 in archive_file_set.unscheduleDeletion(
269+ keep_files
270+ ):
271+ self.log.debug(
272+ "by-hash: Unscheduled %s for %s in %s for deletion"
273+ % (sha256, path, container)
274+ )
275+
276+ # Remove any files from disk that aren't recorded in the database.
277 by_hashes.prune()
278
279+ # And remove expired ArchiveFiles from the DB now that we've pruned
280+ # them and their directories from disk.
281+ delete_files = reapable_files - keep_files
282+ if delete_files:
283+ for container, path, sha256 in archive_file_set.delete(
284+ delete_files
285+ ):
286+ self.log.debug(
287+ "by-hash: Deleted %s for %s in %s"
288+ % (sha256, path, container)
289+ )
290+
291 def _writeReleaseFile(self, suite, release_data):
292 """Write a Release file to the archive (as Release.new).
293
294diff --git a/lib/lp/archivepublisher/tests/test_publisher.py b/lib/lp/archivepublisher/tests/test_publisher.py
295index 44f54c3..80c7d6a 100644
296--- a/lib/lp/archivepublisher/tests/test_publisher.py
297+++ b/lib/lp/archivepublisher/tests/test_publisher.py
298@@ -3029,6 +3029,12 @@ class TestUpdateByHash(TestPublisherBase):
299 "lp.soyuz.model.archivefile._now", lambda: self.times[-1]
300 )
301 )
302+ self.useFixture(
303+ MonkeyPatch(
304+ "lp.archivepublisher.publishing.get_transaction_timestamp",
305+ lambda _: self.times[-1],
306+ )
307+ )
308
309 def advanceTime(self, delta=None, absolute=None):
310 if delta is not None:
311diff --git a/lib/lp/soyuz/interfaces/archivefile.py b/lib/lp/soyuz/interfaces/archivefile.py
312index e41548b..6ca9990 100644
313--- a/lib/lp/soyuz/interfaces/archivefile.py
314+++ b/lib/lp/soyuz/interfaces/archivefile.py
315@@ -140,11 +140,10 @@ class IArchiveFileSet(Interface):
316 :return: An iterable of matched container names.
317 """
318
319- def reap(archive, container=None):
320- """Delete archive files that are past their scheduled deletion date.
321+ def delete(archive_files):
322+ """Delete these archive files.
323
324- :param archive: Delete files from this `IArchive`.
325- :param container: Delete only files with this container.
326+ :param archive_files: The `IArchiveFile`s to delete.
327 :return: An iterable of (container, path, sha256) for files that
328 were deleted.
329 """
330diff --git a/lib/lp/soyuz/model/archivefile.py b/lib/lp/soyuz/model/archivefile.py
331index 670cc96..3aecb98 100644
332--- a/lib/lp/soyuz/model/archivefile.py
333+++ b/lib/lp/soyuz/model/archivefile.py
334@@ -216,18 +216,17 @@ class ArchiveFileSet:
335 )
336
337 @staticmethod
338- def reap(archive, container=None):
339+ def delete(archive_files):
340 """See `IArchiveFileSet`."""
341 # XXX cjwatson 2016-03-30 bug=322972: Requires manual SQL due to
342 # lack of support for DELETE FROM ... USING ... in Storm.
343 clauses = [
344- ArchiveFile.archive == archive,
345- ArchiveFile.scheduled_deletion_date < _now(),
346+ ArchiveFile.id.is_in(
347+ {archive_file.id for archive_file in archive_files}
348+ ),
349 ArchiveFile.library_file_id == LibraryFileAlias.id,
350 LibraryFileAlias.contentID == LibraryFileContent.id,
351 ]
352- if container is not None:
353- clauses.append(ArchiveFile.container == container)
354 where = convert_storm_clause_to_string(And(*clauses))
355 return list(
356 IPrimaryStore(ArchiveFile).execute(
357diff --git a/lib/lp/soyuz/tests/test_archivefile.py b/lib/lp/soyuz/tests/test_archivefile.py
358index 6e8f819..f3febf1 100644
359--- a/lib/lp/soyuz/tests/test_archivefile.py
360+++ b/lib/lp/soyuz/tests/test_archivefile.py
361@@ -256,40 +256,22 @@ class TestArchiveFile(TestCaseWithFactory):
362 ),
363 )
364
365- def test_reap(self):
366+ def test_delete(self):
367 archive = self.factory.makeArchive()
368 archive_files = [
369- self.factory.makeArchiveFile(archive=archive, container="foo")
370- for _ in range(3)
371+ self.factory.makeArchiveFile(archive=archive) for _ in range(4)
372 ]
373- archive_files.append(self.factory.makeArchiveFile(archive=archive))
374- other_archive = self.factory.makeArchive()
375- archive_files.append(
376- self.factory.makeArchiveFile(archive=other_archive)
377- )
378- now = get_transaction_timestamp(Store.of(archive_files[0]))
379- removeSecurityProxy(
380- archive_files[0]
381- ).scheduled_deletion_date = now - timedelta(days=1)
382- removeSecurityProxy(
383- archive_files[1]
384- ).scheduled_deletion_date = now + timedelta(days=1)
385- removeSecurityProxy(
386- archive_files[3]
387- ).scheduled_deletion_date = now - timedelta(days=1)
388- removeSecurityProxy(
389- archive_files[4]
390- ).scheduled_deletion_date = now - timedelta(days=1)
391- archive_file_set = getUtility(IArchiveFileSet)
392 expected_rows = [
393 (
394- "foo",
395- archive_files[0].path,
396- archive_files[0].library_file.content.sha256,
397- ),
398+ archive_file.container,
399+ archive_file.path,
400+ archive_file.library_file.content.sha256,
401+ )
402+ for archive_file in archive_files[:2]
403 ]
404- rows = archive_file_set.reap(archive, container="foo")
405+ archive_file_set = getUtility(IArchiveFileSet)
406+ rows = archive_file_set.delete(archive_files[:2])
407 self.assertContentEqual(expected_rows, rows)
408 self.assertContentEqual(
409- archive_files[1:4], archive_file_set.getByArchive(archive)
410+ archive_files[2:], archive_file_set.getByArchive(archive)
411 )
