Merge ubuntu-debuginfod:concurrency-rework into ubuntu-debuginfod:master

Proposed by Sergio Durigan Junior
Status: Merged
Merged at revision: 4e909f63e8695ed61e0c259d129075b52c7ead54
Proposed branch: ubuntu-debuginfod:concurrency-rework
Merge into: ubuntu-debuginfod:master
Diff against target: 1004 lines (+443/-248)
8 files modified
README.md (+6/-3)
ddebgetter.py (+85/-174)
ddebpoller.py (+83/-32)
debuggetter.py (+46/-7)
debuginfod.py (+85/-10)
debugpoller.py (+10/-13)
poll_launchpad.py (+80/-9)
utils.py (+48/-0)
Reviewer                                Review Type   Date Requested   Status
Athos Ribeiro (community)                                              Approve
Canonical Server packageset reviewers                                  Pending
Robie Basak                                                            Pending
Lena Voytek                                                            Pending
Bryce Harrington                                                       Pending
Canonical Server Reporter                                              Pending
Review via email: mp+434525@code.launchpad.net

Description of the change

This is a rework of the entire application, prompted by a discussion with Robie in which we found out that it did a poor job of handling race conditions.

The main idea behind this MP is that we will now keep the state of each task in a SQL database. I chose PostgreSQL because (a) I like the database, and (b) it does a very decent job at handling concurrent requests. Unfortunately, sqlite did not meet the standards I was looking for.

The poller (poll_launchpad.py) will poll Launchpad, get the list of ddebs and source packages that have been published since it last ran, and dispatch tasks for each one. While doing so, it will also update the database and insert entries for each task. The poller is a single process.
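
To make the poller's bookkeeping concrete, here is a minimal sketch of the insert-then-dispatch step. It is not the code from this MP: the helper names (`open_demo_db`, `ensure_task_row`, `dispatch_new_tasks`) are hypothetical, the duplicate check on (source, version, arch) is an assumption based on the columns of the `tasks` table, and sqlite3 is used in memory only so that the sketch runs without a server, whereas the application itself talks to PostgreSQL.

```python
import sqlite3


def open_demo_db():
    """Create an in-memory stand-in for the tasks table (demo only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks "
        "(id INTEGER PRIMARY KEY, source TEXT, version TEXT, arch TEXT)"
    )
    return conn


def ensure_task_row(conn, source, version, arch):
    """Return a new job id, or None if an identical task is already tracked."""
    row = conn.execute(
        "SELECT id FROM tasks WHERE source = ? AND version = ? AND arch = ?",
        (source, version, arch),
    ).fetchone()
    if row is not None:
        return None
    cur = conn.execute(
        "INSERT INTO tasks (source, version, arch) VALUES (?, ?, ?)",
        (source, version, arch),
    )
    conn.commit()
    return cur.lastrowid


def dispatch_new_tasks(conn, messages, dispatch):
    """Record each message and pass only the untracked ones to dispatch()."""
    dispatched = 0
    for msg in messages:
        jobid = ensure_task_row(
            conn, msg["source_package"], msg["version"], msg["architecture"]
        )
        if jobid is None:
            # An entry already exists, so a task for it is still pending.
            continue
        # Attach the job id so the worker side can find the row later.
        dispatch(dict(msg, jobid=jobid))
        dispatched += 1
    return dispatched
```

The select-then-insert pair is only safe here because the poller is a single process; with several writers it would need a unique constraint or a transaction with suitable locking.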

The getter is where things get more interesting. Celery will spawn multiple concurrent threads that will process tasks as they arrive, and in an ideal world these tasks are all unique. However, there are certain scenarios (very unlikely to happen, but not impossible) where we might have duplicate tasks in the queue. Even in this scenario, as long as these tasks are processed at different times everything should be fine, because the getter is idempotent. However^2, in the extremely unlikely scenario where two workers decide to process two identical tasks at the same time, we might have some race conditions when, e.g., writing the ddeb to disk. For this reason, the getter now tries to move the files atomically.
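
As an illustration of the general write-to-temporary-file-then-rename technique, here is a minimal sketch. It is an assumption-laden example rather than the helper from this MP: `write_atomically` is a hypothetical name, and it relies on `tempfile.mkstemp` plus `os.replace` instead of the `shutil`-based approach used in the diff.

```python
import os
import tempfile


def write_atomically(data: bytes, dst: str) -> None:
    """Write data to dst so that readers never observe a partial file."""
    dst_dir = os.path.dirname(os.path.abspath(dst))
    os.makedirs(dst_dir, mode=0o755, exist_ok=True)
    # Create the temporary file inside the destination directory so that
    # the final rename never crosses a filesystem boundary.
    fd, tmp_path = tempfile.mkstemp(dir=dst_dir, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())
        # On POSIX, os.replace is an atomic rename within one filesystem:
        # two racing workers each publish a complete file, and the last
        # one simply wins.
        os.replace(tmp_path, dst)
    except BaseException:
        # Do not leave the temporary file behind if anything failed.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        raise
```

Because the getter is idempotent, it does not matter which of two identical writers wins the rename; what matters is that no reader ever sees a half-written ddeb.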

The Celery application (debuginfod.py) will monitor the tasks and update the SQL database accordingly. If the task succeeds, we don't need to care about it anymore and can simply remove its entry. If the task gets rescheduled (because of a timeout issue, for example), then we keep its entry intact, which should prevent duplicate tasks from being dispatched by the poller. There is also some error logging, because sometimes a task may throw an unexpected exception and we need to be able to log this problem in order to come back to it later. The logging is also done using a SQL table.
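
The state handling described above can be sketched roughly as follows. This is a simplified model and not the signal handlers from this MP: `open_state_db` and `on_task_finished` are hypothetical names, the state strings follow Celery's usual "SUCCESS", "RETRY" and "FAILURE" values, and sqlite3 in memory stands in for PostgreSQL so the example runs on its own.

```python
import sqlite3


def open_state_db():
    """Create in-memory stand-ins for the tasks and errors tables (demo only)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks "
        "(id INTEGER PRIMARY KEY, source TEXT, version TEXT, arch TEXT)"
    )
    conn.execute(
        "CREATE TABLE errors (id INTEGER PRIMARY KEY, task TEXT, exception TEXT)"
    )
    return conn


def on_task_finished(conn, msg, state, exc=None):
    """Update the bookkeeping tables once a task run has ended."""
    if state == "SUCCESS":
        # Nothing left to track: drop the entry.
        conn.execute("DELETE FROM tasks WHERE id = ?", (msg["jobid"],))
    elif state == "FAILURE" and exc is not None:
        # Unexpected exception: log it so it can be investigated later.
        conn.execute(
            "INSERT INTO errors (task, exception) VALUES (?, ?)",
            (str(msg), str(exc)),
        )
    # Any other state (for example "RETRY") leaves the entry intact, which
    # is what keeps the poller from dispatching the same task again.
    conn.commit()
```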

I ran extensive tests locally to make sure things are working correctly, and fixed every problem I found. I'm confident that this can be put in production, but I would like your input first. The MP is big but I split the commits logically, which should make things easier to review. Let me know if you need any clarification.

Bryce Harrington (bryce) wrote :

Quick review limited to just commit 8595463... yes this looks good. Since that commit is a follow-up to the prior MP, feel free to land it independently if you'd like.

Rest of this MP will take longer to digest.

Bryce Harrington (bryce) wrote :

Another quick look, just at the database code, with a refactoring suggestion.

Sergio Durigan Junior (sergiodj) :
Sergio Durigan Junior (sergiodj) wrote :

BTW, with my latest force-push I've also implemented a new class (DebugDB) which abstracts the access to the database. I think it's cleaner this way.

Lena Voytek (lvoytek) wrote :

So far this looks good to me. I've added some suggestions for cleanup below.

Sergio Durigan Junior (sergiodj) wrote :

Thanks, Lena.

Addressed all comments and pushed a new commit on top of everything.

Athos Ribeiro (athos-ribeiro) wrote :

Looking good!

I added a few inline comments.

review: Needs Information
Sergio Durigan Junior (sergiodj) wrote :

Thanks, Athos! Addressed most of your comments and pushed a new commit.

Sergio Durigan Junior (sergiodj) :
Athos Ribeiro (athos-ribeiro) wrote :

Thanks!

review: Approve
Sergio Durigan Junior (sergiodj) wrote :

Thanks for the reviews, folks.

Preview Diff

1diff --git a/README.md b/README.md
2index 8fe7f00..3c49598 100644
3--- a/README.md
4+++ b/README.md
5@@ -8,18 +8,21 @@ This project is used in the deployment of Ubuntu's
6 * Python
7
8 ```
9-python3-pyelftools >= 0.29-1
10-python3-debian
11 python3-git
12 python3-celery
13 python3-launchpadlib
14 python3-requests
15 python3-sdnotify
16+python3-psycopg2
17 ```
18
19 * Applications
20
21 ```
22 celery
23-rabbitmq-server
24+rabbitmq-server*
25+postgresql-server*
26+
27+*: These could be charms, but (as of this writing) they don't yet
28+ support Jammy.
29 ```
30diff --git a/ddebgetter.py b/ddebgetter.py
31index 62eebc6..661d477 100644
32--- a/ddebgetter.py
33+++ b/ddebgetter.py
34@@ -18,16 +18,11 @@
35 # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com>
36
37 import os
38-from urllib import parse
39 import lzma
40 import tarfile
41 import subprocess
42
43-from debian import debfile
44 import tempfile
45-from elftools.common.utils import bytelist2string
46-from elftools.common.exceptions import ELFError, ELFParseError, DWARFError
47-from elftools.elf.elffile import ELFFile
48
49 from git import Git, Repo
50 from git.exc import GitCommandError
51@@ -49,45 +44,42 @@ class DdebGetter(DebugGetter):
52
53 :param request: The dictionary containing the information
54 necessary to fetch this ddeb.
55- :type request: dict(str : str)
56- """
57- if not request or not request.get("ddebs"):
58- return
59-
60- self._logger.debug(f"Processing request to download ddebs: {request}")
61- self._download_ddebs(
62- request["source_package"],
63- request["component"],
64- request["ddebs"],
65+ :type request: dict(str : str)"""
66+ if request is None:
67+ raise TypeError("Invalid request (None)")
68+ if request.get("ddeb_url") is None:
69+ raise TypeError("No 'ddeb_url' in request")
70+ if request.get("ddeb_url") is None:
71+ raise TypeError("No 'architecture' in request")
72+ if request["architecture"] == "source":
73+ raise ValueError("Wrong request: source fetch")
74+
75+ self._logger.debug(f"Processing request to download ddeb: {request}")
76+ self._download_ddeb(
77+ request["source_package"], request["component"], request["ddeb_url"]
78 )
79
80- def _download_ddebs(self, source_package, component, urls):
81- """Download the ddebs associated with a package/version.
82+ def _download_ddeb(self, source_package, component, ddeb_url):
83+ """Download a ddeb associated with a package.
84
85 :param str source_package: Source package name.
86
87- :param str version: Source package version.
88-
89 :param str component: Source package component.
90
91- :param list urls: List of ddeb URLs."""
92+ :param str ddeb_url: The ddeb URL."""
93 savepath = self._make_savepath(source_package, component)
94- os.makedirs(savepath, mode=0o755, exist_ok=True)
95- for url in urls:
96- self._logger.debug(f"Downloading '{url}' into '{savepath}'")
97- self._download_from_lp(url, savepath)
98+ self._logger.debug(f"Downloading '{ddeb_url}' into '{savepath}'")
99+ self._download_from_lp(ddeb_url, savepath)
100
101
102 # The function below was taken from git-ubuntu. We need to make
103 # sure we perform the same version -> tag transformation as it
104 # does.
105-def _git_dep14_tag(version):
106+def git_dep14_tag(version):
107 """Munge a version string according to http://dep.debian.net/deps/dep14/
108
109 :param str version: The version to be adjusted."""
110- version = version.replace("~", "_")
111- version = version.replace(":", "%")
112- version = version.replace("..", ".#.")
113+ version = version.replace("~", "_").replace(":", "%").replace("..", ".#.")
114 if version.endswith("."):
115 version = version + "#"
116 if version.endswith(".lock"):
117@@ -95,6 +87,20 @@ def _git_dep14_tag(version):
118 version = pre + ".#lock"
119 return version
120
121+def adjust_tar_filepath(tarinfo):
122+ """Adjust the filepath for a TarInfo file.
123+
124+ This function is needed because TarFile.add strips the leading
125+ slash from the filenames, so we have to workaround it by
126+ re-adding the slash ourselves.
127+
128+ This function is intended to be used as a callback provided to
129+ TarFile.add.
130+
131+ :param TarInfo tarinfo: The tarinfo."""
132+ tarinfo.name = os.path.join("/", tarinfo.name)
133+ return tarinfo
134+
135
136 class DdebSourceCodeGetter(DebugGetter):
137 """Get (fetch) the source code associated with a ddeb."""
138@@ -107,113 +113,24 @@ class DdebSourceCodeGetter(DebugGetter):
139
140 :param request: The dictionary containing the information
141 necessary to fetch this source code.
142- :type request: dict(str : str)
143- """
144- if not request or not request.get("ddebs"):
145- return
146-
147- self._logger.debug(
148- f"Processing request to download source code (for ddebs): {request}"
149- )
150+ :type request: dict(str : str)"""
151+ if request is None:
152+ raise TypeError("Invalid request (None)")
153+ if request.get("source_urls") is None:
154+ raise TypeError("No 'source_urls' in request")
155+ if request.get("ddeb_url") is None:
156+ raise TypeError("No 'architecture' in request")
157+ if request["architecture"] != "source":
158+ raise ValueError("Wrong request: ddeb fetch")
159+
160+ self._logger.debug(f"Processing request to download source code: {request}")
161 self._process_source(
162 request["source_package"],
163 request["version"],
164 request["component"],
165- request["ddebs"],
166- request["sources"],
167+ request["source_urls"],
168 )
169
170- def _get_comp_dirs(self, debug_file, filename):
171- """Get the list of available DW_AT_comp_dir declarations from the
172- DWARF file.
173-
174- :param TextIOWrapper debug_file: The .debug file.
175- """
176- elf_file = ELFFile(debug_file)
177-
178- if not elf_file.has_dwarf_info():
179- self._logger.debug(f"'{filename}' doesn't have DWARF")
180- return []
181-
182- result = []
183- dwarf_file = elf_file.get_dwarf_info()
184- for cu in dwarf_file.iter_CUs():
185- attr = cu.get_top_DIE().attributes
186- if "DW_AT_comp_dir" in attr.keys():
187- value = attr["DW_AT_comp_dir"].value
188- if isinstance(value, bytes):
189- result.append(bytelist2string(value).decode("UTF-8"))
190- self._logger.debug(
191- f"Found the following DW_AT_comp_dir for '{filename}':\n{result}"
192- )
193- return result
194-
195- def _should_fetch_source_for_debugfile(self, source_package, version, filepath):
196- """Whether we should fetch the source code for the specified source
197- package/deb.
198-
199- We return True if the debug file associated with the source
200- package has any DW_AT_comp_dir entity whose path points to
201- "/usr/src/source_package-version/".
202-
203- :param str source_package: Source package name.
204-
205- :param str version: Source package version.
206-
207- :param str filepath: Full path to the .ddeb file."""
208- deb_file = debfile.DebFile(filepath)
209- # The path we expect to find in any of the DW_AT_comp_dirs.
210- expected_path = f"/usr/src/{source_package}-{version}/"
211- for deb_internal_file in list(deb_file.data):
212- if not deb_internal_file.endswith(".debug"):
213- continue
214- with tempfile.TemporaryFile() as debug_file:
215- debug_file.write(deb_file.data.get_content(deb_internal_file))
216- try:
217- comp_dirs = self._get_comp_dirs(debug_file, deb_internal_file)
218- except (ELFError, ELFParseError, DWARFError) as e:
219- self._logger.warning(
220- f"Exception while looking for DW_AT_comp_dirs: {e}"
221- )
222- # There's no point in retrying anything here, so
223- # we just generate a warning and keep going.
224- continue
225-
226- for comp_dir in comp_dirs:
227- if comp_dir.startswith(expected_path):
228- self._logger.debug(f"Found a good DW_AT_comp_dir: {comp_dir}")
229- return True
230- return False
231-
232- def _should_fetch_source(self, source_package, version, component, filenames):
233- """Return True if we should fetch the source code for a specific
234- package, False otherwise.
235-
236- Whether or not we should fetch the source code is determined
237- by the presence of a specific path prefix (/usr/src/...) in
238- the DW_AT_comp_dir declarations of a DWARF file.
239-
240- :param str source_package: Source package name.
241-
242- :param str version: Source package version.
243-
244- :param str component: Source package component (main,
245- universe, etc.)
246-
247- :param list urls: is the list of URLs used to fetch the respective debug
248- information for the package.
249- """
250- savepath = self._make_savepath(source_package, component)
251- for fname in filenames:
252- filepath = os.path.join(savepath, fname)
253- if not os.path.isfile(filepath):
254- continue
255- if self._should_fetch_source_for_debugfile(
256- source_package, version, filepath
257- ):
258- return True
259- return False
260-
261 def _download_source_code_from_git(self, source_package, version, filepath):
262 """Download the source code using Launchpad's git repository.
263
264@@ -223,11 +140,16 @@ class DdebSourceCodeGetter(DebugGetter):
265
266 :param str filepath: The full pathname where the resulting
267 source code tarball should be saved.
268- """
269+
270+ :rtype: bool
271+
272+ This method returns True when the operation succeeds, or False
273+ *iff* the "git clone" command fails to run. Otherwise, this
274+ function will throw an exception."""
275 with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as source_dir:
276 g = Git()
277 git_dir = os.path.join(source_dir, f"{source_package}")
278- tag = "applied/" + _git_dep14_tag(f"{version}")
279+ tag = "applied/" + git_dep14_tag(f"{version}")
280 self._logger.debug(
281 f"Cloning '{source_package}' git repo into '{git_dir}' (tag: '{tag}')"
282 )
283@@ -247,29 +169,16 @@ class DdebSourceCodeGetter(DebugGetter):
284 return False
285 repo = Repo(git_dir)
286 prefix_path = os.path.join("/usr/src/", f"{source_package}-{version}/")
287- with lzma.open(filepath, "w") as xzfile:
288- self._logger.debug(
289- f"Archiving git repo for '{source_package}-{version}' as '{filepath}'"
290- )
291- repo.archive(xzfile, prefix=prefix_path, format="tar")
292+ self._logger.debug(
293+ f"Archiving git repo for '{source_package}-{version}' as '{filepath}'"
294+ )
295+ with tempfile.NamedTemporaryFile() as tmpfile:
296+ with lzma.open(tmpfile.name, "w") as xzfile:
297+ repo.archive(xzfile, prefix=prefix_path, format="tar")
298+ self._try_to_move_atomically(tmpfile.name, filepath)
299
300 return True
301
302- def _adjust_tar_filepath(self, tarinfo):
303- """Adjust the filepath for a TarInfo file.
304-
305- This function is needed because TarFile.add strips the leading
306- slash from the filenames, so we have to workaround it by
307- re-adding the slash ourselves.
308-
309- This function is intended to be used as a callback provided to
310- TarFile.add.
311-
312- :param TarInfo tarinfo: The tarinfo.
313- """
314- tarinfo.name = f"/{tarinfo.name}"
315- return tarinfo
316-
317 def _download_source_code_from_dsc(
318 self, source_package, version, filepath, source_urls
319 ):
320@@ -286,17 +195,30 @@ class DdebSourceCodeGetter(DebugGetter):
321 :param list source_urls: List of URLs used to fetch the source
322 package. This is usually the list returned by the
323 sourceFileUrls() Launchpad API call.
324- """
325+
326+ :rtype: bool
327+ This method returns True when the operation succeeds, or False
328+ *iff* the "dpkg-source -x" command fails to run. Otherwise,
329+ this function will throw an exception."""
330 with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as source_dir:
331 for url in source_urls:
332 self._download_from_lp(url, source_dir)
333
334- dscfile = os.path.join(source_dir, f"{source_package}_{version}.dsc")
335- if not os.path.isfile(dscfile):
336- self._logger.warning(f"'{dscfile}' doesn't exist, but it should.")
337+ dscfile = None
338+ for f in os.listdir(source_dir):
339+ newf = os.path.join(source_dir, f)
340+ if os.path.isfile(newf) and f.endswith(".dsc"):
341+ dscfile = newf
342+ break
343+
344+ if dscfile is None:
345+ self._logger.warning(
346+ "Could not find .dsc file, even though it should exist."
347+ )
348 return False
349
350 outdir = os.path.join(source_dir, "outdir")
351+ self._logger.debug(f"Will call 'dpkg-source -x {dscfile} {outdir}'")
352 try:
353 subprocess.run(
354 ["/usr/bin/dpkg-source", "-x", dscfile, outdir],
355@@ -315,8 +237,12 @@ class DdebSourceCodeGetter(DebugGetter):
356
357 prefix_path = os.path.join("/usr/src/", f"{source_package}-{version}/")
358
359- with tarfile.open(filepath, "w:xz") as tfile:
360- tfile.add(outdir, arcname=prefix_path, filter=self._adjust_tar_filepath)
361+ with tempfile.NamedTemporaryFile() as tmpfile:
362+ with tarfile.open(tmpfile.name, "w:xz") as tfile:
363+ tfile.add(
364+ outdir, arcname=prefix_path, filter=adjust_tar_filepath
365+ )
366+ self._try_to_move_atomically(tmpfile.name, filepath)
367
368 return True
369
370@@ -355,29 +281,14 @@ class DdebSourceCodeGetter(DebugGetter):
371 # like to retry the task.
372 raise DebugGetterRetry()
373
374- def _process_source(
375- self, source_package, version, component, ddeb_urls, source_urls
376- ):
377+ def _process_source(self, source_package, version, component, source_urls):
378 """Process the request to fetch the source code.
379
380- We only download the source code if we know it will be
381- properly indexed by debuginfod.
382-
383 :param str source_package: Source package name.
384
385 :param str version: Source package version.
386
387 :param str component: Source package component.
388
389- :param list urls: List of ddeb URLs."""
390- # Convert URLs into filenames.
391- filenames = [
392- os.path.basename(parse.urlparse(fname).path) for fname in ddeb_urls
393- ]
394- if not self._should_fetch_source(source_package, version, component, filenames):
395- self._logger.info(
396- f"Should not fetch source code for '{source_package}-{version}'"
397- )
398- return
399-
400+ :param list source_urls: List of source URLs."""
401 self._download_source_code(source_package, version, component, source_urls)
402diff --git a/ddebpoller.py b/ddebpoller.py
403index 24ba613..6f92c2f 100644
404--- a/ddebpoller.py
405+++ b/ddebpoller.py
406@@ -17,14 +17,18 @@
407
408 # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com>
409
410+import os
411+from urllib import parse
412 from debugpoller import DebugPoller
413
414
415 class DdebPoller(DebugPoller):
416- def __init__(
417- self, initial_interval=None, force_initial_interval=False, dry_run=False
418- ):
419- """Initialize the object using 'ddeb' as its name.
420+ """Perform the Launchpad polling and obtain a list of ddebs and source
421+ packages."""
422+
423+ def __init__(self, initial_interval=1, force_initial_interval=False, dry_run=False):
424+
425+ """Initialize the object using 'ddeb' as its name.
426
427 Look at DebugPoller's docstring for an explanation about the arguments."""
428 super().__init__(
429@@ -34,43 +38,90 @@ class DdebPoller(DebugPoller):
430 dry_run=dry_run,
431 )
432
433- def get_ddebs(self):
434- """Get the list of ddebs that have been published since the last
435- timestamp from Launchpad."""
436+ def get_ddebs_and_sources(self):
437+ """Get the list of ddebs and source files that have been published
438+ since the last timestamp from Launchpad.
439+
440+ :rtype: dict, str
441+
442+ Return a dictionary containing all ddebs and source packages
443+ found, and also the new timestamp that should then be recorded
444+ by calling record_timestamp."""
445 timestamp = self._get_timestamp()
446- assert timestamp is not None, f"_get_timestamp returned None"
447- assert timestamp != "", f"_get_timestamp returned a blank timestamp"
448+ if timestamp is None:
449+ raise RuntimeError("_get_timestamp returned None")
450+ if timestamp == "":
451+ raise RuntimeError("_get_timestamp returned a blank timestamp")
452 new_timestamp = self._generate_timestamp()
453- assert new_timestamp is not None, f"_generate_timestamp returned None"
454- assert new_timestamp != "", f"_generate_timestamp returned a blank timestamp"
455-
456- result = []
457+ if new_timestamp is None:
458+ raise RuntimeError("_generate_timestamp returned None")
459+ if new_timestamp == "":
460+ raise RuntimeError("_generate_timestamp returned a blank timestamp")
461
462 self._logger.info(
463 f"Polling ddebs created since '{timestamp}' (it's now '{new_timestamp}')"
464 )
465
466- for pkg in self._main_archive.getPublishedSources(
467- order_by_date=True, created_since_date=timestamp, status="Published"
468+ result = []
469+ for pkg in self._main_archive.getPublishedBinaries(
470+ order_by_date=True, created_since_date=timestamp
471 ):
472- ddeb_urls = [url for url in pkg.binaryFileUrls() if url.endswith(".ddeb")]
473- ddebs_len = len(ddeb_urls)
474- if ddebs_len > 0:
475- distro_series = self._lp.load(pkg.distro_series_link).name
476- pkgname = pkg.source_package_name
477- pkgver = pkg.source_package_version
478+ if pkg.status not in ("Pending", "Published"):
479+ continue
480+ if not pkg.is_debug:
481+ continue
482+
483+ ddeb_urls = pkg.binaryFileUrls()
484+ if len(ddeb_urls) == 0:
485+ # Safety check.
486+ continue
487+
488+ srcname = pkg.source_package_name
489+ srcver = pkg.source_package_version
490+ component = pkg.component_name
491+
492+ # We create one message (which will eventually become a
493+ # Celery task) per ddeb. This makes it easier later to do
494+ # deduplication, and also has the benefit of making the
495+ # downloading process more granular for multiple ddebs.
496+ for url in ddeb_urls:
497+ # Obtain the ddeb filename and its architecture.
498+ ddeb = os.path.basename(parse.urlparse(url).path)
499+ _, _, arch_and_extension = ddeb.split("_")
500+ arch, _ = arch_and_extension.split(".")
501+
502 msg = {
503- "source_package": pkgname,
504- "version": pkgver,
505- "component": pkg.component_name,
506- "distro_series": distro_series,
507- "ddebs": ddeb_urls,
508- "sources": pkg.sourceFileUrls(),
509+ "source_package": srcname,
510+ "version": srcver,
511+ "component": component,
512+ "ddeb_url": url,
513+ "ddeb_filename": ddeb,
514+ "architecture": arch,
515 }
516- self._logger.debug(
517- f"For source package '{pkgname}-{pkgver}', found {ddebs_len}:\n{ddeb_urls}"
518- )
519 result.append(msg)
520
521- self._record_timestamp(new_timestamp)
522- return result
523+ for pkg in self._main_archive.getPublishedSources(
524+ order_by_date=True, created_since_date=timestamp
525+ ):
526+ if pkg.status not in ("Pending", "Published"):
527+ continue
528+
529+ src_urls = pkg.sourceFileUrls()
530+ if len(src_urls) == 0:
531+ # Safety check.
532+ continue
533+
534+ srcname = pkg.source_package_name
535+ srcver = pkg.source_package_version
536+ component = pkg.component_name
537+
538+ msg = {
539+ "source_package": srcname,
540+ "version": srcver,
541+ "component": component,
542+ "source_urls": src_urls,
543+ "architecture": "source",
544+ }
545+ result.append(msg)
546+
547+ return result, new_timestamp
548diff --git a/debuggetter.py b/debuggetter.py
549index 9bb6102..bafa914 100644
550--- a/debuggetter.py
551+++ b/debuggetter.py
552@@ -21,6 +21,8 @@ import os
553 import requests
554 from urllib import parse
555 import logging
556+import tempfile
557+import shutil
558
559
560 class DebugGetterTimeout(Exception):
561@@ -34,6 +36,7 @@ class DebugGetterRetry(Exception):
562
563 DEFAULT_MIRROR_DIR = "/srv/debug-mirror"
564
565+
566 class DebugGetter:
567 """Base class for a Debug Getter."""
568
569@@ -45,8 +48,7 @@ class DebugGetter:
570
571 :param str subdir: The subdirectory (insider mirror_dir) where
572 the module will save its files. For example, a ddeb
573- getter module should specify "ddebs" here.
574- """
575+ getter module should specify "ddebs" here."""
576 self._mirror_dir = mirror_dir
577 self._subdir = subdir
578 self._logger = logging.getLogger(__name__)
579@@ -55,17 +57,52 @@ class DebugGetter:
580 """Return the full save path for a package.
581
582 :param str source_package: The package name.
583- :param str component: The component name (main, universe, etc.).
584- """
585+ :param str component: The component name (main, universe, etc.)."""
586+ if source_package.startswith("lib"):
587+ pkgname_initials = source_package[:4]
588+ else:
589+ pkgname_initials = source_package[0]
590+
591 return os.path.join(
592 self._mirror_dir,
593 self._subdir,
594 component,
595- source_package[:1],
596+ pkgname_initials,
597 source_package,
598 )
599
600+ def _try_to_move_atomically(self, src, dst):
601+ """Try to move SRC to DST atomically.
602+
603+ This function assumes that SRC and DST have different filenames.
604+
605+ :param str src: The source file (full path).
606+
607+ :param str dst: The destination file (full path)."""
608+ self._logger.debug(f"Trying to move '{src}' to '{dst}' atomically")
609+ savepath = os.path.dirname(dst)
610+ os.makedirs(savepath, mode=0o755, exist_ok=True)
611+ newtmpfile = os.path.join(savepath, os.path.basename(src))
612+ shutil.copyfile(src, newtmpfile)
613+ try:
614+ shutil.move(newtmpfile, dst)
615+ except shutil.SameFileError:
616+ os.remove(newtmpfile)
617+ self._logger.warning(
618+ f"Could not create '{dst}' atomically: same file already exists"
619+ )
620+
621 def _download_from_lp(self, url, savepath):
622+ """Download a file from Launchpad.
623+
624+ This method tries to download the file in chunks into a
625+ temporary file and then does an atomic move to the final
626+ destination.
627+
628+ :param str url: The URL that should be downloaded.
629+
630+ :param str savepath: The full path (minus the filename) where
631+ the file should be saved."""
632 filepath = os.path.join(savepath, os.path.basename(parse.urlparse(url).path))
633 if os.path.exists(filepath):
634 self._logger.debug(f"'{filepath}' exists, doing nothing")
635@@ -75,10 +112,12 @@ class DebugGetter:
636 with requests.Session() as s:
637 with s.get(url, allow_redirects=True, timeout=10, stream=True) as r:
638 r.raise_for_status()
639- with open(filepath, "wb") as f:
640+ with tempfile.NamedTemporaryFile(mode="wb") as tmpfile:
641 # 10 MB for chunk_size should be enough...
642 for chunk in r.iter_content(chunk_size=10 * 1024 * 1024):
643- f.write(chunk)
644+ tmpfile.write(chunk)
645+ tmpfile.flush()
646+ self._try_to_move_atomically(tmpfile.name, filepath)
647 except (
648 requests.ConnectionError,
649 requests.HTTPError,
650diff --git a/debuginfod.py b/debuginfod.py
651index c0d0357..5c5d77f 100644
652--- a/debuginfod.py
653+++ b/debuginfod.py
654@@ -18,11 +18,16 @@
655 # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com>
656
657 from celery import Celery
658-from celery.signals import worker_ready, worker_shutting_down
659+from celery.signals import (
660+ worker_ready,
661+ worker_shutting_down,
662+ task_postrun,
663+ celeryd_init,
664+)
665
666 from debuggetter import DebugGetterTimeout, DebugGetterRetry
667-
668 from ddebgetter import DdebGetter, DdebSourceCodeGetter
669+from utils import DebugDB
670
671 import sdnotify
672
673@@ -45,6 +50,23 @@ app.conf.update(
674 sdnotifier = sdnotify.SystemdNotifier()
675
676
677+def _record_error_into_db(exc, msg):
678+ """Record an error into the database.
679+
680+ :param Exception exc: The Exception that was triggered.
681+
682+ :param dict msg: The message that triggere the error."""
683+ with DebugDB() as db:
684+ with db.cursor() as cur:
685+ cur.execute(
686+ "INSERT INTO errors (task, exception) VALUES (%s, %s)",
687+ (
688+ str(msg),
689+ str(exc),
690+ ),
691+ )
692+
693+
694 @app.task(
695 name="grab_ddebs",
696 autoretry_for=(DebugGetterTimeout, DebugGetterRetry),
697@@ -58,10 +80,20 @@ def grab_ddebs(msg):
698 """Dispatch the DdebGetter task.
699
700 :param dict(str -> str) msg: The dictionary containing the message
701- that will be processed by the getter.
702- """
703- g = DdebGetter()
704- g.process_request(msg)
705+ that will be processed by the getter."""
706+ try:
707+ g = DdebGetter()
708+ g.process_request(msg)
709+ except Exception as e:
710+ if not isinstance(e, DebugGetterRetry) and not isinstance(
711+ e, DebugGetterTimeout
712+ ):
713+ # This is some other kind of error that we need to deal
714+ # with. Mark it as such.
715+ _record_error_into_db(e, msg)
716+ # We still need to raise the exception here. Celery will
717+ # reschedule the task if applicable.
718+ raise e
719
720
721 @app.task(
722@@ -77,10 +109,20 @@ def grab_ddebs_sources(msg):
723 """Dispatch the DdebSourceCodeGetter task.
724
725 :param dict(str -> str) msg: The dictionary containing the message
726- that will be processed by the getter.
727- """
728- g = DdebSourceCodeGetter()
729- g.process_request(msg)
730+ that will be processed by the getter."""
731+ try:
732+ g = DdebSourceCodeGetter()
733+ g.process_request(msg)
734+ except Exception as e:
735+ if not isinstance(e, DebugGetterRetry) and not isinstance(
736+ e, DebugGetterTimeout
737+ ):
738+ # This is some other kind of error that we need to deal
739+ # with. Mark it as such.
740+ _record_error_into_db(e, msg)
741+ # We still need to raise the exception here. Celery will
742+ # reschedule the task if applicable.
743+ raise e
744
745
746 @worker_ready.connect
747@@ -95,5 +137,38 @@ def notify_worker_shutting_down(**kwargs):
748 sdnotifier.notify("STOPPING=1")
749
750
751+@task_postrun.connect
752+def task_postrun(sender, args, state, **kwargs):
753+ """Process a task that's just finished.
754+
755+ We keep the state of each task in a SQL database, and this
756+ function is responsible for cleaning up the state for tasks that have
757+ succeeded."""
758+ if state not in ("SUCCESS",):
759+ return
760+
761+ with DebugDB() as db:
762+ with db.cursor() as cur:
763+ msg = args[0]
764+ jobid = msg["jobid"]
765+ cur.execute("DELETE FROM tasks WHERE id = %s", (jobid,))
766+
767+
768+@celeryd_init.connect
769+def setup_worker(sender=None, **kwargs):
770+ """Setup the worker.
771+
772+ We have to create the tables that will be populated when
773+ processing tasks."""
774+ with DebugDB() as db:
775+ with db.cursor() as cur:
776+ cur.execute(
777+ "CREATE TABLE IF NOT EXISTS tasks (id serial PRIMARY KEY, source text, version text, arch text)"
778+ )
779+ cur.execute(
780+ "CREATE TABLE IF NOT EXISTS errors (id serial PRIMARY KEY, task text, exception text)"
781+ )
782+
783+
784 if __name__ == "__main__":
785 app.start()
786diff --git a/debugpoller.py b/debugpoller.py
787index 058909d..2d74723 100644
788--- a/debugpoller.py
789+++ b/debugpoller.py
790@@ -58,9 +58,10 @@ class DebugPoller:
791
792 :param bool dry_run: Tell the poller that it shouldn't record
793 the timestamp in the file when the operation finishes.
794- Default is False.
795- """
796- self._lp = Launchpad.login_anonymously("ubuntu-debuginfod poller", "production")
797+ Default is False."""
798+ self._lp = Launchpad.login_anonymously(
799+ "ubuntu-debuginfod poller", "production", version="devel"
800+ )
801 self._main_archive = self._lp.distributions["ubuntu"].main_archive
802 self.TIMESTAMP_FILE = self.TIMESTAMP_FILE + f"-{module_name}"
803
804@@ -84,8 +85,7 @@ class DebugPoller:
805 :param interval: Specify how long ago (in hours) the timestamp must
806 refer to. If not specified, the timestamp is generated for
807 the current time.
808- :type interval: int or None
809- """
810+ :type interval: int or None"""
811 d = datetime.datetime.now(datetime.timezone.utc)
812
813 if interval is not None:
814@@ -100,8 +100,7 @@ class DebugPoller:
815 new one.
816
817 If a timestamp file exists, returns its value. Otherwise,
818- generate a new one with self._initial_interval.
819- """
820+ generate a new one with self._initial_interval."""
821 if not os.path.exists(self.TIMESTAMP_FILE) or self._force_initial_interval:
822 self._logger.debug(f"Timestamp file '{self.TIMESTAMP_FILE}' doesn't exist")
823 self._logger.debug(
824@@ -110,14 +109,13 @@ class DebugPoller:
825 return self._generate_timestamp(interval=self._initial_interval)
826
827 with open(self.TIMESTAMP_FILE, "r", encoding="UTF-8") as f:
828- return f.readline()
829+ return f.readline().rstrip()
830
831- def _record_timestamp(self, timestamp):
832+ def record_timestamp(self, timestamp):
833 """Save the timestamp into the timestamp file.
834
835 :param str timestamp: Timestamp that should be saved into the
836- TIMESTAMP_FILE.
837- """
838+ TIMESTAMP_FILE."""
839 if self._dry_run:
840 self._logger.debug("dry_run enabled, not recording timestamp file")
841 return
842@@ -130,6 +128,5 @@ class DebugPoller:
843 def set_initial_interval(self, initial_interval):
844 """Set the initial_interval value.
845
846- :param int initial_interval: The new initial_interval to be used.
847- """
848+ :param int initial_interval: The new initial_interval to be used."""
849 self._initial_interval = initial_interval
850diff --git a/poll_launchpad.py b/poll_launchpad.py
851index 161aa77..65afa28 100644
852--- a/poll_launchpad.py
853+++ b/poll_launchpad.py
854@@ -19,16 +19,87 @@
855
856 from ddebpoller import DdebPoller
857 from debuginfod import grab_ddebs, grab_ddebs_sources
858+from utils import DebugDB
859
860-from celery import chain
861+import psycopg2
862
863-def poll_launchpad():
864- """Poll Launchpad."""
865+
866+def ensure_task(conn, cursor, msg):
867+ """Return True if the task described by MSG should be processed.
868+
869+ Also, if True, then insert the task information into the database.
870+
871+ :param DebugDB conn: The database connection handler.
872+
873+ :param psycopg2.cursor cursor: The cursor to the SQL database.
874+
875+ :param dict msg: The message describing a task.
876+
877+ :rtype: bool, int
878+ :return: True or False, and the jobid if True (or None if False)"""
879+ srcname = msg["source_package"]
880+ srcver = msg["version"]
881+ architecture = msg["architecture"]
882+ cursor.execute(
883+ "SELECT * FROM tasks WHERE source = %s AND version = %s AND arch = %s",
884+ (
885+ srcname,
886+ srcver,
887+ architecture,
888+ ),
889+ )
890+
891+ if cursor.rowcount > 0:
892+ return False, None
893+
894+ cursor.execute(
895+ "INSERT INTO tasks (source, version, arch) VALUES (%s, %s, %s) RETURNING id",
896+ (
897+ srcname,
898+ srcver,
899+ architecture,
900+ ),
901+ )
902+ jobid = cursor.fetchone()[0]
903+ conn.commit()
904+ return True, jobid
905+
906+
907+def poll_launchpad(conn, cursor):
908+ """Poll Launchpad and process the list of ddebs and source packages.
909+
910+ :param DebugDB conn: The SQL connection handler.
911+
912+ :param psycopg2.cursor cursor: The cursor to the SQL database."""
913 poller = DdebPoller()
914- for msg in poller.get_ddebs():
915- # We have to use a chain here because the ddebs need to be
916- # downloaded before we can start fetching the sources.
917- chain(grab_ddebs.si(msg), grab_ddebs_sources.si(msg))()
918+ messages, new_timestamp = poller.get_ddebs_and_sources()
919+ for msg in messages:
920+ proceed, jobid = ensure_task(conn, cursor, msg)
921+ if proceed:
922+ if jobid is None:
923+ raise RuntimeError("jobid is None")
924+ # We append the ID to the message so that we can efficiently
925+ # remove the task once it's been processed.
926+ msg["jobid"] = str(jobid)
927+
928+ if msg["architecture"] == "source":
929+ # Source packages have their "architecture" as "source".
930+ grab_ddebs_sources.delay(msg)
931+ else:
932+ grab_ddebs.delay(msg)
933+ poller.record_timestamp(new_timestamp)
934+
935+
936+def main():
937+ # Connect to the database.
938+ with DebugDB() as db:
939+ with db.cursor() as cursor:
940+ cursor.execute(
941+ "CREATE TABLE IF NOT EXISTS tasks (id serial PRIMARY KEY, source text, version text, arch text)"
942+ )
943+ db.commit()
944+ poll_launchpad(db, cursor)
945+
946
947-if __name__ == '__main__':
948- poll_launchpad()
949+if __name__ == "__main__":
950+ main()
951diff --git a/utils.py b/utils.py
952new file mode 100644
953index 0000000..b577009
954--- /dev/null
955+++ b/utils.py
956@@ -0,0 +1,48 @@
957+#!/usr/bin/python3
958+
959+# Copyright (C) 2022 Canonical Ltd.
960+
961+# This program is free software: you can redistribute it and/or modify
962+# it under the terms of the GNU General Public License as published by
963+# the Free Software Foundation, either version 3 of the License, or
964+# (at your option) any later version.
965+
966+# This program is distributed in the hope that it will be useful,
967+# but WITHOUT ANY WARRANTY; without even the implied warranty of
968+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
969+# GNU General Public License for more details.
970+
971+# You should have received a copy of the GNU General Public License
972+# along with this program. If not, see <https://www.gnu.org/licenses/>.
973+
974+# Authors: Sergio Durigan Junior <sergio.durigan@canonical.com>
975+
976+import psycopg2
977+
978+
979+class DebugDB:
980+ """Abstract a connection to the SQL database."""
981+
982+ def __init__(self):
983+ self._conn = psycopg2.connect("dbname=ubuntu-debuginfod user=mirror")
984+
985+ def cursor(self):
986+ """Return the cursor to be used when dealing with the database."""
987+ return self._conn.cursor()
988+
989+ def commit(self):
990+ """Commit the transaction to the DB."""
991+ self._conn.commit()
992+
993+ def __enter__(self):
994+ return self
995+
996+ def __exit__(self, exc_type, exc_value, exc_traceback):
997+ self._conn.commit()
998+ self._conn.close()
999+ self._conn = None
1000+
1001+ def __del__(self):
1002+ if self._conn is not None:
1003+ self._conn.commit()
1004+ self._conn.close()
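
One detail in the DebugDB context manager above: `__exit__` commits unconditionally, so a block that fails midway still has its partial work committed. A minimal sketch of a variant that commits on success and rolls back on error follows. The class name `SafeDebugDB` and the `connect` parameter are hypothetical and not part of this branch, and the default uses an in-memory `sqlite3` connection only so that the example runs without a PostgreSQL server. The real class calls `psycopg2.connect`, whose connections also provide `commit()`, `rollback()` and `close()`.

```python
import sqlite3


class SafeDebugDB:
    """Hypothetical variant of DebugDB: commit on success, roll back on error."""

    def __init__(self, connect=lambda: sqlite3.connect(":memory:")):
        # `connect` is an assumed injection point used for illustration.
        # The class in utils.py instead calls
        # psycopg2.connect("dbname=ubuntu-debuginfod user=mirror").
        self._conn = connect()

    def cursor(self):
        """Return the cursor to be used when dealing with the database."""
        return self._conn.cursor()

    def commit(self):
        """Commit the transaction to the DB."""
        self._conn.commit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        try:
            if exc_type is None:
                # The block finished normally: persist its changes.
                self._conn.commit()
            else:
                # The block raised: discard any uncommitted changes.
                self._conn.rollback()
        finally:
            self._conn.close()
            self._conn = None
        # Returning False lets the original exception propagate.
        return False
```

With this shape, an exception raised inside `with SafeDebugDB() as db:` leaves the `tasks` table untouched by that block, whereas explicit `db.commit()` calls made earlier in the block (as `ensure_task` does) remain committed.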
