Merge ubuntu-debuginfod:concurrency-rework into ubuntu-debuginfod:master

Proposed by Sergio Durigan Junior
Status: Merged
Merged at revision: 4e909f63e8695ed61e0c259d129075b52c7ead54
Proposed branch: ubuntu-debuginfod:concurrency-rework
Merge into: ubuntu-debuginfod:master
Diff against target: 1004 lines (+443/-248)
8 files modified
README.md (+6/-3)
ddebgetter.py (+85/-174)
ddebpoller.py (+83/-32)
debuggetter.py (+46/-7)
debuginfod.py (+85/-10)
debugpoller.py (+10/-13)
poll_launchpad.py (+80/-9)
utils.py (+48/-0)
Reviewers and status:
Athos Ribeiro (community): Approve
Canonical Server packageset reviewers: Pending
Robie Basak: Pending
Lena Voytek: Pending
Bryce Harrington: Pending
Canonical Server Reporter: Pending
Review via email: mp+434525@code.launchpad.net

Description of the change

This is a rework of the entire application after discussing with Robie and finding out that it did a poor job of handling race conditions.

The main idea behind this MP is that we will now keep the state of each task in a SQL database. I chose PostgreSQL because (a) I like the database, and (b) it does a very decent job at handling concurrent requests. Unfortunately, SQLite did not meet the standards I was looking for.
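
For reference, the bookkeeping schema is tiny. The sketch below shows the two tables involved; the CREATE TABLE statements and the dbname/user connection string are the ones from the diff further down, while the standalone-script framing is just for illustration:

```python
import psycopg2

# Connect the same way the DebugDB helper in utils.py does.
conn = psycopg2.connect("dbname=ubuntu-debuginfod user=mirror")
with conn, conn.cursor() as cur:
    # One row per in-flight task; the row is deleted once the task succeeds.
    cur.execute(
        "CREATE TABLE IF NOT EXISTS tasks"
        " (id serial PRIMARY KEY, source text, version text, arch text)"
    )
    # One row per unexpected failure, so problems can be inspected later.
    cur.execute(
        "CREATE TABLE IF NOT EXISTS errors"
        " (id serial PRIMARY KEY, task text, exception text)"
    )
conn.close()
```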

The poller (poll_launchpad.py) will poll Launchpad, get the list of ddebs and source packages that have been published since it last ran, and dispatch tasks for each one. While doing so, it will also update the database and insert entries for each task. The poller is a single process.
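
The deduplication step is the interesting part of the dispatch: a task is only queued if no identical (source, version, arch) entry is already sitting in the tasks table. This is a condensed sketch of the ensure_task() helper from the diff below (the SQL is the same; the return convention is simplified here):

```python
def ensure_task(conn, cursor, msg):
    """Return the new task's database id, or None if an identical task is pending."""
    key = (msg["source_package"], msg["version"], msg["architecture"])
    cursor.execute(
        "SELECT * FROM tasks WHERE source = %s AND version = %s AND arch = %s", key
    )
    if cursor.rowcount > 0:
        # An entry already exists: the task was dispatched earlier and has
        # not finished yet, so don't queue a duplicate.
        return None
    cursor.execute(
        "INSERT INTO tasks (source, version, arch) VALUES (%s, %s, %s) RETURNING id",
        key,
    )
    jobid = cursor.fetchone()[0]
    conn.commit()
    return jobid
```

The returned id travels inside the Celery message (msg["jobid"]) so that the worker can delete exactly this row once the task succeeds.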

The getter is where things get more interesting. Celery will spawn multiple concurrent threads that process tasks as they arrive, and in an ideal world these tasks are all unique. However, there are certain scenarios (very unlikely, but not impossible) where we might end up with duplicate tasks in the queue. Even then, as long as the duplicates are processed at different times everything should be fine, because the getter is idempotent. But in the extremely unlikely case where two workers process two identical tasks at the same time, we might hit race conditions when, e.g., writing the ddeb to disk. For this reason, the getter now tries to move files into place atomically.
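
The helper that does this in the diff (_try_to_move_atomically) copies the temporary file next to the destination and then moves it into place. The sketch below only illustrates the underlying "write somewhere else, then rename into place" idea with os.replace(); it is not the code from this MP:

```python
import os
import tempfile


def publish_atomically(data, dst):
    # Build the file in the destination directory, so the final step is a
    # single rename on the same filesystem (atomic on POSIX): a concurrent
    # worker never observes, or clobbers, a half-written file.
    dstdir = os.path.dirname(dst)
    os.makedirs(dstdir, mode=0o755, exist_ok=True)
    fd, tmppath = tempfile.mkstemp(dir=dstdir)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmppath, dst)
    finally:
        if os.path.exists(tmppath):
            os.remove(tmppath)
```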

The Celery application (debuginfod.py) will monitor the tasks and update the SQL database accordingly. If the task succeeds, we don't need to care about it anymore and can simply remove its entry. If the task gets rescheduled (because of a timeout issue, for example), then we keep its entry intact, which should prevent duplicate tasks from being dispatched by the poller. There is also some error logging, because sometimes a task may throw an unexpected exception and we need to be able to log the problem in order to come back to it later. That logging is also done in a SQL table.
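
Concretely, the cleanup is a Celery signal handler. This is a condensed version of the task_postrun hook from the diff below, renamed here for clarity; DebugDB is the small psycopg2 wrapper the MP adds in utils.py:

```python
from celery.signals import task_postrun

from utils import DebugDB


@task_postrun.connect
def cleanup_finished_task(sender, args, state, **kwargs):
    # Only successful tasks are forgotten; rescheduled or failed tasks keep
    # their row, which prevents the poller from dispatching duplicates.
    if state != "SUCCESS":
        return
    msg = args[0]  # the message the poller dispatched, carrying its jobid
    with DebugDB() as db:
        with db.cursor() as cur:
            cur.execute("DELETE FROM tasks WHERE id = %s", (msg["jobid"],))
```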

I ran extensive tests locally to make sure things are working correctly, and fixed every problem I found. I'm confident that this can be put in production, but I would like your input first. The MP is big but I split the commits logically, which should make things easier to review. Let me know if you need any clarification.

Revision history for this message
Bryce Harrington (bryce) wrote :

Quick review limited to just commit 8595463... yes, this looks good. Since that commit is a followup to the prior MP, feel free to land it independently if you'd like.

Rest of this MP will take longer to digest.

Revision history for this message
Bryce Harrington (bryce) wrote :

Another quick look, just at the database code, with a refactoring suggestion.

Revision history for this message
Sergio Durigan Junior (sergiodj) :
Revision history for this message
Sergio Durigan Junior (sergiodj) wrote :

BTW, with my latest force-push I've also implemented a new class (DebugDB) which abstracts the access to the database. I think it's cleaner this way.

Revision history for this message
Lena Voytek (lvoytek) wrote :

So far this looks good to me. I've added some suggestions for cleanup below

Revision history for this message
Sergio Durigan Junior (sergiodj) wrote :

Thanks, Lena.

Addressed all comments and pushed a new commit on top of everything.

Revision history for this message
Athos Ribeiro (athos-ribeiro) wrote :

Looking good!

I added a few inline comments

review: Needs Information
Revision history for this message
Sergio Durigan Junior (sergiodj) wrote :

Thanks, Athos! Addressed most of your comments and pushed a new commit.

Revision history for this message
Sergio Durigan Junior (sergiodj) :
Revision history for this message
Athos Ribeiro (athos-ribeiro) wrote :

Thanks!

review: Approve
Revision history for this message
Sergio Durigan Junior (sergiodj) wrote :

Thanks for the reviews, folks.

Preview Diff

diff --git a/README.md b/README.md
index 8fe7f00..3c49598 100644
--- a/README.md
+++ b/README.md
@@ -8,18 +8,21 @@ This project is used in the deployment of Ubuntu's
 * Python
 
 ```
-python3-pyelftools >= 0.29-1
-python3-debian
 python3-git
 python3-celery
 python3-launchpadlib
 python3-requests
 python3-sdnotify
+python3-psycopg2
 ```
 
 * Applications
 
 ```
 celery
-rabbitmq-server
+rabbitmq-server*
+postgresql-server*
+
+*: These could be charms, but (as of this writing) they don't yet
+   support Jammy.
 ```
diff --git a/ddebgetter.py b/ddebgetter.py
index 62eebc6..661d477 100644
--- a/ddebgetter.py
+++ b/ddebgetter.py
@@ -18,16 +18,11 @@
 # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com>
 
 import os
-from urllib import parse
 import lzma
 import tarfile
 import subprocess
 
-from debian import debfile
 import tempfile
-from elftools.common.utils import bytelist2string
-from elftools.common.exceptions import ELFError, ELFParseError, DWARFError
-from elftools.elf.elffile import ELFFile
 
 from git import Git, Repo
 from git.exc import GitCommandError
@@ -49,45 +44,42 @@ class DdebGetter(DebugGetter):
 
         :param request: The dictionary containing the information
         necessary to fetch this ddeb.
-        :type request: dict(str : str)
-        """
-        if not request or not request.get("ddebs"):
-            return
-
-        self._logger.debug(f"Processing request to download ddebs: {request}")
-        self._download_ddebs(
-            request["source_package"],
-            request["component"],
-            request["ddebs"],
+        :type request: dict(str : str)"""
+        if request is None:
+            raise TypeError("Invalid request (None)")
+        if request.get("ddeb_url") is None:
+            raise TypeError("No 'ddeb_url' in request")
+        if request.get("ddeb_url") is None:
+            raise TypeError("No 'architecture' in request")
+        if request["architecture"] == "source":
+            raise ValueError("Wrong request: source fetch")
+
+        self._logger.debug(f"Processing request to download ddeb: {request}")
+        self._download_ddeb(
+            request["source_package"], request["component"], request["ddeb_url"]
         )
 
-    def _download_ddebs(self, source_package, component, urls):
-        """Download the ddebs associated with a package/version.
+    def _download_ddeb(self, source_package, component, ddeb_url):
+        """Download a ddeb associated with a package.
 
         :param str source_package: Source package name.
 
-        :param str version: Source package version.
-
         :param str component: Source package component.
 
-        :param list urls: List of ddeb URLs."""
+        :param str ddeb_url: The ddeb URL."""
         savepath = self._make_savepath(source_package, component)
-        os.makedirs(savepath, mode=0o755, exist_ok=True)
-        for url in urls:
-            self._logger.debug(f"Downloading '{url}' into '{savepath}'")
-            self._download_from_lp(url, savepath)
+        self._logger.debug(f"Downloading '{ddeb_url}' into '{savepath}'")
+        self._download_from_lp(ddeb_url, savepath)
 
 
 # The function below was taken from git-ubuntu. We need to make
 # sure we perform the same version -> tag transformation as it
 # does.
-def _git_dep14_tag(version):
+def git_dep14_tag(version):
     """Munge a version string according to http://dep.debian.net/deps/dep14/
 
     :param str version: The version to be adjusted."""
-    version = version.replace("~", "_")
-    version = version.replace(":", "%")
-    version = version.replace("..", ".#.")
+    version = version.replace("~", "_").replace(":", "%").replace("..", ".#.")
     if version.endswith("."):
         version = version + "#"
     if version.endswith(".lock"):
@@ -95,6 +87,20 @@ def _git_dep14_tag(version):
         version = pre + ".#lock"
     return version
 
+def adjust_tar_filepath(tarinfo):
+    """Adjust the filepath for a TarInfo file.
+
+    This function is needed because TarFile.add strips the leading
+    slash from the filenames, so we have to workaround it by
+    re-adding the slash ourselves.
+
+    This function is intended to be used as a callback provided to
+    TarFile.add.
+
+    :param TarInfo tarinfo: The tarinfo."""
+    tarinfo.name = os.path.join("/", tarinfo.name)
+    return tarinfo
+
 
 class DdebSourceCodeGetter(DebugGetter):
     """Get (fetch) the source code associated with a ddeb."""
@@ -107,113 +113,24 @@ class DdebSourceCodeGetter(DebugGetter):
 
         :param request: The dictionary containing the information
         necessary to fetch this source code.
-        :type request: dict(str : str)
-        """
-        if not request or not request.get("ddebs"):
-            return
-
-        self._logger.debug(
-            f"Processing request to download source code (for ddebs): {request}"
-        )
+        :type request: dict(str : str)"""
+        if request is None:
+            raise TypeError("Invalid request (None)")
+        if request.get("source_urls") is None:
+            raise TypeError("No 'source_urls' in request")
+        if request.get("ddeb_url") is None:
+            raise TypeError("No 'architecture' in request")
+        if request["architecture"] != "source":
+            raise ValueError("Wrong request: ddeb fetch")
+
+        self._logger.debug(f"Processing request to download source code: {request}")
         self._process_source(
             request["source_package"],
            request["version"],
             request["component"],
-            request["ddebs"],
-            request["sources"],
+            request["source_urls"],
         )
 
-    def _get_comp_dirs(self, debug_file, filename):
-        """Get the list of available DW_AT_comp_dir declarations from the
-        DWARF file.
-
-        :param TextIOWrapper debug_file: The .debug file.
-        """
-        elf_file = ELFFile(debug_file)
-
-        if not elf_file.has_dwarf_info():
-            self._logger.debug(f"'{filename}' doesn't have DWARF")
-            return []
-
-        result = []
-        dwarf_file = elf_file.get_dwarf_info()
-        for cu in dwarf_file.iter_CUs():
-            attr = cu.get_top_DIE().attributes
-            if "DW_AT_comp_dir" in attr.keys():
-                value = attr["DW_AT_comp_dir"].value
-                if isinstance(value, bytes):
-                    result.append(bytelist2string(value).decode("UTF-8"))
-        self._logger.debug(
-            f"Found the following DW_AT_comp_dir for '{filename}':\n{result}"
-        )
-        return result
-
-    def _should_fetch_source_for_debugfile(self, source_package, version, filepath):
-        """Whether we should fetch the source code for the specified source
-        package/deb.
-
-        We return True if the debug file associated with the source
-        package has any DW_AT_comp_dir entity whose path points to
-        "/usr/src/source_package-version/".
-
-        :param str source_package: Source package name.
-
-        :param str version: Source package version.
-
-        :param str filepath: Full path to the .ddeb file."""
-        deb_file = debfile.DebFile(filepath)
-        # The path we expect to find in any of the DW_AT_comp_dirs.
-        expected_path = f"/usr/src/{source_package}-{version}/"
-        for deb_internal_file in list(deb_file.data):
-            if not deb_internal_file.endswith(".debug"):
-                continue
-            with tempfile.TemporaryFile() as debug_file:
-                debug_file.write(deb_file.data.get_content(deb_internal_file))
-                try:
-                    comp_dirs = self._get_comp_dirs(debug_file, deb_internal_file)
-                except (ELFError, ELFParseError, DWARFError) as e:
-                    self._logger.warning(
-                        f"Exception while looking for DW_AT_comp_dirs: {e}"
-                    )
-                    # There's no point in retrying anything here, so
-                    # we just generate a warning and keep going.
-                    continue
-
-                for comp_dir in comp_dirs:
-                    if comp_dir.startswith(expected_path):
-                        self._logger.debug(f"Found a good DW_AT_comp_dir: {comp_dir}")
-                        return True
-        return False
-
-    def _should_fetch_source(self, source_package, version, component, filenames):
-        """Return True if we should fetch the source code for a specific
-        package, False otherwise.
-
-        Whether or not we should fetch the source code is determined
-        by the presence of a specific path prefix (/usr/src/...) in
-        the DW_AT_comp_dir declarations of a DWARF file.
-
-        :param str source_package: Source package name.
-
-        :param str version: Source package version.
-
-        :param str component: Source package component (main,
-        universe, etc.)
-
-        :param list urls: is the list of URLs used to fetch the respective debug
-        information for the package.
-        """
-        savepath = self._make_savepath(source_package, component)
-        for fname in filenames:
-            filepath = os.path.join(savepath, fname)
-            if not os.path.isfile(filepath):
-                continue
-            if self._should_fetch_source_for_debugfile(
-                source_package, version, filepath
-            ):
-                return True
-        return False
-
     def _download_source_code_from_git(self, source_package, version, filepath):
         """Download the source code using Launchpad's git repository.
 
@@ -223,11 +140,16 @@ class DdebSourceCodeGetter(DebugGetter):
 
         :param str filepath: The full pathname where the resulting
         source code tarball should be saved.
-        """
+
+        :rtype: bool
+
+        This method returns True when the operation succeeds, or False
+        *iff* the "git clone" command fails to run. Otherwise, this
+        function will throw an exception."""
         with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as source_dir:
             g = Git()
             git_dir = os.path.join(source_dir, f"{source_package}")
-            tag = "applied/" + _git_dep14_tag(f"{version}")
+            tag = "applied/" + git_dep14_tag(f"{version}")
             self._logger.debug(
                 f"Cloning '{source_package}' git repo into '{git_dir}' (tag: '{tag}')"
             )
@@ -247,29 +169,16 @@ class DdebSourceCodeGetter(DebugGetter):
                 return False
         repo = Repo(git_dir)
         prefix_path = os.path.join("/usr/src/", f"{source_package}-{version}/")
-        with lzma.open(filepath, "w") as xzfile:
-            self._logger.debug(
-                f"Archiving git repo for '{source_package}-{version}' as '{filepath}'"
-            )
-            repo.archive(xzfile, prefix=prefix_path, format="tar")
+        self._logger.debug(
+            f"Archiving git repo for '{source_package}-{version}' as '{filepath}'"
+        )
+        with tempfile.NamedTemporaryFile() as tmpfile:
+            with lzma.open(tmpfile.name, "w") as xzfile:
+                repo.archive(xzfile, prefix=prefix_path, format="tar")
+            self._try_to_move_atomically(tmpfile.name, filepath)
 
         return True
 
-    def _adjust_tar_filepath(self, tarinfo):
-        """Adjust the filepath for a TarInfo file.
-
-        This function is needed because TarFile.add strips the leading
-        slash from the filenames, so we have to workaround it by
-        re-adding the slash ourselves.
-
-        This function is intended to be used as a callback provided to
-        TarFile.add.
-
-        :param TarInfo tarinfo: The tarinfo.
-        """
-        tarinfo.name = f"/{tarinfo.name}"
-        return tarinfo
-
     def _download_source_code_from_dsc(
         self, source_package, version, filepath, source_urls
     ):
@@ -286,17 +195,30 @@ class DdebSourceCodeGetter(DebugGetter):
         :param list source_urls: List of URLs used to fetch the source
         package. This is usually the list returned by the
         sourceFileUrls() Launchpad API call.
-        """
+
+        :rtype: bool
+        This method returns True when the operation succeeds, or False
+        *iff* the "dpkg-source -x" command fails to run. Otherwise,
+        this function will throw an exception."""
         with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as source_dir:
             for url in source_urls:
                 self._download_from_lp(url, source_dir)
 
-            dscfile = os.path.join(source_dir, f"{source_package}_{version}.dsc")
-            if not os.path.isfile(dscfile):
-                self._logger.warning(f"'{dscfile}' doesn't exist, but it should.")
+            dscfile = None
+            for f in os.listdir(source_dir):
+                newf = os.path.join(source_dir, f)
+                if os.path.isfile(newf) and f.endswith(".dsc"):
+                    dscfile = newf
+                    break
+
+            if dscfile is None:
+                self._logger.warning(
+                    "Could not find .dsc file, even though it should exist."
+                )
                 return False
 
             outdir = os.path.join(source_dir, "outdir")
+            self._logger.debug(f"Will call 'dpkg-source -x {dscfile} {outdir}'")
             try:
                 subprocess.run(
                     ["/usr/bin/dpkg-source", "-x", dscfile, outdir],
@@ -315,8 +237,12 @@ class DdebSourceCodeGetter(DebugGetter):
 
         prefix_path = os.path.join("/usr/src/", f"{source_package}-{version}/")
 
-        with tarfile.open(filepath, "w:xz") as tfile:
-            tfile.add(outdir, arcname=prefix_path, filter=self._adjust_tar_filepath)
+        with tempfile.NamedTemporaryFile() as tmpfile:
+            with tarfile.open(tmpfile.name, "w:xz") as tfile:
+                tfile.add(
+                    outdir, arcname=prefix_path, filter=adjust_tar_filepath
+                )
+            self._try_to_move_atomically(tmpfile.name, filepath)
 
         return True
 
@@ -355,29 +281,14 @@ class DdebSourceCodeGetter(DebugGetter):
             # like to retry the task.
            raise DebugGetterRetry()
 
-    def _process_source(
-        self, source_package, version, component, ddeb_urls, source_urls
-    ):
+    def _process_source(self, source_package, version, component, source_urls):
         """Process the request to fetch the source code.
 
-        We only download the source code if we know it will be
-        properly indexed by debuginfod.
-
        :param str source_package: Source package name.
 
         :param str version: Source package version.
 
         :param str component: Source package component.
 
-        :param list urls: List of ddeb URLs."""
-        # Convert URLs into filenames.
-        filenames = [
-            os.path.basename(parse.urlparse(fname).path) for fname in ddeb_urls
-        ]
-        if not self._should_fetch_source(source_package, version, component, filenames):
-            self._logger.info(
-                f"Should not fetch source code for '{source_package}-{version}'"
-            )
-            return
-
+        :param list source_urls: List of source URLs."""
        self._download_source_code(source_package, version, component, source_urls)
diff --git a/ddebpoller.py b/ddebpoller.py
index 24ba613..6f92c2f 100644
--- a/ddebpoller.py
+++ b/ddebpoller.py
@@ -17,14 +17,18 @@
 
 # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com>
 
+import os
+from urllib import parse
 from debugpoller import DebugPoller
 
 
 class DdebPoller(DebugPoller):
-    def __init__(
-        self, initial_interval=None, force_initial_interval=False, dry_run=False
-    ):
-        """Initialize the object using 'ddeb' as its name.
+    """Perform the Launchpad polling and obtain a list of ddebs and source
+    packages."""
+
+    def __init__(self, initial_interval=1, force_initial_interval=False, dry_run=False):
+
+        """Initialize the object using 'ddeb' as its name.
 
         Look at DebugPoller's docstring for an explanation about the arguments."""
         super().__init__(
@@ -34,43 +38,90 @@ class DdebPoller(DebugPoller):
             dry_run=dry_run,
         )
 
-    def get_ddebs(self):
-        """Get the list of ddebs that have been published since the last
-        timestamp from Launchpad."""
+    def get_ddebs_and_sources(self):
+        """Get the list of ddebs and source files that have been published
+        since the last timestamp from Launchpad.
+
+        :rtype: dict, str
+
+        Return a dictionary containing all ddebs and source packages
+        found, and also the new timestamp that should then be recorded
+        by calling record_timestamp."""
         timestamp = self._get_timestamp()
-        assert timestamp is not None, f"_get_timestamp returned None"
-        assert timestamp != "", f"_get_timestamp returned a blank timestamp"
+        if timestamp is None:
+            raise RuntimeError("_get_timestamp returned None")
+        if timestamp == "":
+            raise RuntimeError("_get_timestamp returned a blank timestamp")
         new_timestamp = self._generate_timestamp()
-        assert new_timestamp is not None, f"_generate_timestamp returned None"
-        assert new_timestamp != "", f"_generate_timestamp returned a blank timestamp"
-
-        result = []
+        if new_timestamp is None:
+            raise RuntimeError("_generate_timestamp returned None")
+        if new_timestamp == "":
+            raise RuntimeError("_generate_timestamp returned a blank timestamp")
 
         self._logger.info(
             f"Polling ddebs created since '{timestamp}' (it's now '{new_timestamp}')"
         )
 
-        for pkg in self._main_archive.getPublishedSources(
-            order_by_date=True, created_since_date=timestamp, status="Published"
+        result = []
+        for pkg in self._main_archive.getPublishedBinaries(
+            order_by_date=True, created_since_date=timestamp
         ):
-            ddeb_urls = [url for url in pkg.binaryFileUrls() if url.endswith(".ddeb")]
-            ddebs_len = len(ddeb_urls)
-            if ddebs_len > 0:
-                distro_series = self._lp.load(pkg.distro_series_link).name
-                pkgname = pkg.source_package_name
-                pkgver = pkg.source_package_version
+            if pkg.status not in ("Pending", "Published"):
+                continue
+            if not pkg.is_debug:
+                continue
+
+            ddeb_urls = pkg.binaryFileUrls()
+            if len(ddeb_urls) == 0:
+                # Safety check.
+                continue
+
+            srcname = pkg.source_package_name
+            srcver = pkg.source_package_version
+            component = pkg.component_name
+
+            # We create one message (which will eventually become a
+            # Celery task) per ddeb. This makes it easier later to do
+            # deduplication, and also has the benefit of making the
+            # downloading process more granular for multiple ddebs.
+            for url in ddeb_urls:
+                # Obtain the ddeb filename and its architecture.
+                ddeb = os.path.basename(parse.urlparse(url).path)
+                _, _, arch_and_extension = ddeb.split("_")
+                arch, _ = arch_and_extension.split(".")
+
                 msg = {
-                    "source_package": pkgname,
-                    "version": pkgver,
-                    "component": pkg.component_name,
-                    "distro_series": distro_series,
-                    "ddebs": ddeb_urls,
-                    "sources": pkg.sourceFileUrls(),
+                    "source_package": srcname,
+                    "version": srcver,
+                    "component": component,
+                    "ddeb_url": url,
+                    "ddeb_filename": ddeb,
+                    "architecture": arch,
                 }
-                self._logger.debug(
-                    f"For source package '{pkgname}-{pkgver}', found {ddebs_len}:\n{ddeb_urls}"
-                )
                 result.append(msg)
 
-        self._record_timestamp(new_timestamp)
-        return result
+        for pkg in self._main_archive.getPublishedSources(
+            order_by_date=True, created_since_date=timestamp
+        ):
+            if pkg.status not in ("Pending", "Published"):
+                continue
+
+            src_urls = pkg.sourceFileUrls()
+            if len(src_urls) == 0:
+                # Safety check.
+                continue
+
+            srcname = pkg.source_package_name
+            srcver = pkg.source_package_version
+            component = pkg.component_name
+
+            msg = {
+                "source_package": srcname,
+                "version": srcver,
+                "component": component,
+                "source_urls": src_urls,
+                "architecture": "source",
+            }
+            result.append(msg)
+
+        return result, new_timestamp
diff --git a/debuggetter.py b/debuggetter.py
index 9bb6102..bafa914 100644
--- a/debuggetter.py
+++ b/debuggetter.py
@@ -21,6 +21,8 @@ import os
 import requests
 from urllib import parse
 import logging
+import tempfile
+import shutil
 
 
 class DebugGetterTimeout(Exception):
@@ -34,6 +36,7 @@ class DebugGetterRetry(Exception):
 
 DEFAULT_MIRROR_DIR = "/srv/debug-mirror"
 
+
 class DebugGetter:
     """Base class for a Debug Getter."""
 
@@ -45,8 +48,7 @@ class DebugGetter:
 
         :param str subdir: The subdirectory (insider mirror_dir) where
         the module will save its files. For example, a ddeb
-        getter module should specify "ddebs" here.
-        """
+        getter module should specify "ddebs" here."""
         self._mirror_dir = mirror_dir
         self._subdir = subdir
         self._logger = logging.getLogger(__name__)
@@ -55,17 +57,52 @@ class DebugGetter:
         """Return the full save path for a package.
 
         :param str source_package: The package name.
-        :param str component: The component name (main, universe, etc.).
-        """
+        :param str component: The component name (main, universe, etc.)."""
+        if source_package.startswith("lib"):
+            pkgname_initials = source_package[:4]
+        else:
+            pkgname_initials = source_package[0]
+
         return os.path.join(
             self._mirror_dir,
             self._subdir,
             component,
-            source_package[:1],
+            pkgname_initials,
             source_package,
         )
 
+    def _try_to_move_atomically(self, src, dst):
+        """Try to move SRC to DST atomically.
+
+        This function assumes that SRC and DST have different filenames.
+
+        :param str src: The source file (full path).
+
+        :param str dst: The destination file (full path)."""
+        self._logger.debug(f"Trying to move '{src}' to '{dst}' atomically")
+        savepath = os.path.dirname(dst)
+        os.makedirs(savepath, mode=0o755, exist_ok=True)
+        newtmpfile = os.path.join(savepath, os.path.basename(src))
+        shutil.copyfile(src, newtmpfile)
+        try:
+            shutil.move(newtmpfile, dst)
+        except shutil.SameFileError:
+            os.remove(newtmpfile)
+            self._logger.warning(
+                f"Could not create '{dst}' atomically: same file already exists"
+            )
+
     def _download_from_lp(self, url, savepath):
+        """Download a file from Launchpad.
+
+        This method tries to download the file in chunks into a
+        temporary file and then does an atomic move to the final
+        destination.
+
+        :param str url: The URL that should be downloaded.
+
+        :param str savepath: The full path (minus the filename) where
+        the file should be saved."""
         filepath = os.path.join(savepath, os.path.basename(parse.urlparse(url).path))
         if os.path.exists(filepath):
             self._logger.debug(f"'{filepath}' exists, doing nothing")
@@ -75,10 +112,12 @@ class DebugGetter:
             with requests.Session() as s:
                 with s.get(url, allow_redirects=True, timeout=10, stream=True) as r:
                     r.raise_for_status()
-                    with open(filepath, "wb") as f:
+                    with tempfile.NamedTemporaryFile(mode="wb") as tmpfile:
                         # 10 MB for chunk_size should be enough...
                         for chunk in r.iter_content(chunk_size=10 * 1024 * 1024):
-                            f.write(chunk)
+                            tmpfile.write(chunk)
+                        tmpfile.flush()
+                        self._try_to_move_atomically(tmpfile.name, filepath)
         except (
             requests.ConnectionError,
             requests.HTTPError,
diff --git a/debuginfod.py b/debuginfod.py
index c0d0357..5c5d77f 100644
--- a/debuginfod.py
+++ b/debuginfod.py
@@ -18,11 +18,16 @@
 # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com>
 
 from celery import Celery
-from celery.signals import worker_ready, worker_shutting_down
+from celery.signals import (
+    worker_ready,
+    worker_shutting_down,
+    task_postrun,
+    celeryd_init,
+)
 
 from debuggetter import DebugGetterTimeout, DebugGetterRetry
-
 from ddebgetter import DdebGetter, DdebSourceCodeGetter
+from utils import DebugDB
 
 import sdnotify
 
@@ -45,6 +50,23 @@ app.conf.update(
 sdnotifier = sdnotify.SystemdNotifier()
 
 
+def _record_error_into_db(exc, msg):
+    """Record an error into the database.
+
+    :param Exception exc: The Exception that was triggered.
+
+    :param dict msg: The message that triggere the error."""
+    with DebugDB() as db:
+        with db.cursor() as cur:
+            cur.execute(
+                "INSERT INTO errors (task, exception) VALUES (%s, %s)",
+                (
+                    str(msg),
+                    str(exc),
+                ),
+            )
+
+
 @app.task(
     name="grab_ddebs",
     autoretry_for=(DebugGetterTimeout, DebugGetterRetry),
@@ -58,10 +80,20 @@ def grab_ddebs(msg):
     """Dispatch the DdebGetter task.
 
     :param dict(str -> str) msg: The dictionary containing the message
-    that will be processed by the getter.
-    """
-    g = DdebGetter()
-    g.process_request(msg)
+    that will be processed by the getter."""
+    try:
+        g = DdebGetter()
+        g.process_request(msg)
+    except Exception as e:
+        if not isinstance(e, DebugGetterRetry) and not isinstance(
+            e, DebugGetterTimeout
+        ):
+            # This is some other kind of error that we need to deal
+            # with. Mark it as such.
+            _record_error_into_db(e, msg)
+        # We still need to raise the exception here. Celery will
+        # reschedule the task if applicable.
+        raise e
 
 
 @app.task(
@@ -77,10 +109,20 @@ def grab_ddebs_sources(msg):
     """Dispatch the DdebSourceCodeGetter task.
 
     :param dict(str -> str) msg: The dictionary containing the message
-    that will be processed by the getter.
-    """
-    g = DdebSourceCodeGetter()
-    g.process_request(msg)
+    that will be processed by the getter."""
+    try:
+        g = DdebSourceCodeGetter()
+        g.process_request(msg)
+    except Exception as e:
+        if not isinstance(e, DebugGetterRetry) and not isinstance(
+            e, DebugGetterTimeout
+        ):
+            # This is some other kind of error that we need to deal
+            # with. Mark it as such.
+            _record_error_into_db(e, msg)
+        # We still need to raise the exception here. Celery will
+        # reschedule the task if applicable.
+        raise e
 
 
 @worker_ready.connect
@@ -95,5 +137,38 @@ def notify_worker_shutting_down(**kwargs):
     sdnotifier.notify("STOPPING=1")
 
 
+@task_postrun.connect
+def task_postrun(sender, args, state, **kwargs):
+    """Process a task that's just finished.
+
+    We keep the state of each task in a SQL database, and this
+    function is responsible for cleaning up the state for tasks that have
+    succeeded."""
+    if state not in ("SUCCESS",):
+        return
+
+    with DebugDB() as db:
+        with db.cursor() as cur:
+            msg = args[0]
+            jobid = msg["jobid"]
+            cur.execute("DELETE FROM tasks WHERE id = %s", (jobid,))
+
+
+@celeryd_init.connect
+def setup_worker(sender=None, **kwargs):
+    """Setup the worker.
+
+    We have to create the tables that will be populated when
+    processing tasks."""
+    with DebugDB() as db:
+        with db.cursor() as cur:
+            cur.execute(
+                "CREATE TABLE IF NOT EXISTS tasks (id serial PRIMARY KEY, source text, version text, arch text)"
+            )
+            cur.execute(
+                "CREATE TABLE IF NOT EXISTS errors (id serial PRIMARY KEY, task text, exception text)"
+            )
+
+
 if __name__ == "__main__":
     app.start()
diff --git a/debugpoller.py b/debugpoller.py
index 058909d..2d74723 100644
--- a/debugpoller.py
+++ b/debugpoller.py
@@ -58,9 +58,10 @@ class DebugPoller:
 
         :param bool dry_run: Tell the poller that it shouldn't record
         the timestamp in the file when the operation finishes.
-        Default is False.
-        """
-        self._lp = Launchpad.login_anonymously("ubuntu-debuginfod poller", "production")
+        Default is False."""
+        self._lp = Launchpad.login_anonymously(
+            "ubuntu-debuginfod poller", "production", version="devel"
+        )
         self._main_archive = self._lp.distributions["ubuntu"].main_archive
         self.TIMESTAMP_FILE = self.TIMESTAMP_FILE + f"-{module_name}"
 
@@ -84,8 +85,7 @@ class DebugPoller:
         :param interval: Specify how long ago (in hours) the timestamp must
         refer to. If not specified, the timestamp is generated for
         the current time.
-        :type interval: int or None
-        """
+        :type interval: int or None"""
         d = datetime.datetime.now(datetime.timezone.utc)
 
         if interval is not None:
@@ -100,8 +100,7 @@ class DebugPoller:
         new one.
 
         If a timestamp file exists, returns its value. Otherwise,
-        generate a new one with self._initial_interval.
-        """
+        generate a new one with self._initial_interval."""
         if not os.path.exists(self.TIMESTAMP_FILE) or self._force_initial_interval:
             self._logger.debug(f"Timestamp file '{self.TIMESTAMP_FILE}' doesn't exist")
             self._logger.debug(
@@ -110,14 +109,13 @@ class DebugPoller:
             return self._generate_timestamp(interval=self._initial_interval)
 
         with open(self.TIMESTAMP_FILE, "r", encoding="UTF-8") as f:
-            return f.readline()
+            return f.readline().rstrip()
 
-    def _record_timestamp(self, timestamp):
+    def record_timestamp(self, timestamp):
         """Save the timestamp into the timestamp file.
 
         :param str timestamp: Timestamp that should be saved into the
-        TIMESTAMP_FILE.
-        """
+        TIMESTAMP_FILE."""
         if self._dry_run:
             self._logger.debug("dry_run enabled, not recording timestamp file")
             return
@@ -130,6 +128,5 @@ class DebugPoller:
     def set_initial_interval(self, initial_interval):
         """Set the initial_interval value.
 
-        :param int initial_interval: The new initial_interval to be used.
-        """
+        :param int initial_interval: The new initial_interval to be used."""
         self._initial_interval = initial_interval
diff --git a/poll_launchpad.py b/poll_launchpad.py
index 161aa77..65afa28 100644
--- a/poll_launchpad.py
+++ b/poll_launchpad.py
@@ -19,16 +19,87 @@
 
 from ddebpoller import DdebPoller
 from debuginfod import grab_ddebs, grab_ddebs_sources
+from utils import DebugDB
 
-from celery import chain
+import psycopg2
 
-def poll_launchpad():
-    """Poll Launchpad."""
+
+def ensure_task(conn, cursor, msg):
+    """Return True if the task described by MSG should be processed.
+
+    Also, if True, then insert the task information into the database.
+
+    :param DebugDB conn: The database connection handler.
+
+    :param psycopg2.cursor cursor: The cursor to the SQL database.
+
+    :param dict msg: The message describing a task.
+
+    :rtype: bool, int
+    :return: True or False, and the jobid if True (or None if False)"""
+    srcname = msg["source_package"]
+    srcver = msg["version"]
+    architecture = msg["architecture"]
+    cursor.execute(
+        "SELECT * FROM tasks WHERE source = %s AND version = %s AND arch = %s",
+        (
+            srcname,
+            srcver,
+            architecture,
+        ),
+    )
+
+    if cursor.rowcount > 0:
+        return False, None
+
+    cursor.execute(
+        "INSERT INTO tasks (source, version, arch) VALUES (%s, %s, %s) RETURNING id",
+        (
+            srcname,
+            srcver,
+            architecture,
+        ),
+    )
+    jobid = cursor.fetchone()[0]
+    conn.commit()
+    return True, jobid
+
+
+def poll_launchpad(conn, cursor):
+    """Poll Launchpad and process the list of ddebs and source packages.
+
+    :param DebugDB conn: The SQL connection handler.
+
+    :param psycopg2.cursor cursor: The cursor to the SQL database."""
     poller = DdebPoller()
-    for msg in poller.get_ddebs():
-        # We have to use a chain here because the ddebs need to be
-        # downloaded before we can start fetching the sources.
-        chain(grab_ddebs.si(msg), grab_ddebs_sources.si(msg))()
+    messages, new_timestamp = poller.get_ddebs_and_sources()
+    for msg in messages:
+        proceed, jobid = ensure_task(conn, cursor, msg)
+        if proceed:
+            if jobid is None:
+                raise RuntimeError("jobid is None")
+            # We append the ID to the message so that we can efficiently
+            # remove the task once it's been processed.
+            msg["jobid"] = str(jobid)
+
+            if msg["architecture"] == "source":
+                # Source packages have their "architecture" as "source".
+                grab_ddebs_sources.delay(msg)
+            else:
+                grab_ddebs.delay(msg)
+    poller.record_timestamp(new_timestamp)
+
+
+def main():
+    # Connect to the database.
+    with DebugDB() as db:
+        with db.cursor() as cursor:
+            cursor.execute(
+                "CREATE TABLE IF NOT EXISTS tasks (id serial PRIMARY KEY, source text, version text, arch text)"
+            )
+            db.commit()
+            poll_launchpad(db, cursor)
+
 
-if __name__ == '__main__':
-    poll_launchpad()
+if __name__ == "__main__":
+    main()
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..b577009
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,48 @@
+#!/usr/bin/python3
+
+# Copyright (C) 2022 Canonical Ltd.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+# Authors: Sergio Durigan Junior <sergio.durigan@canonical.com>
+
+import psycopg2
+
+
+class DebugDB:
+    """Abstract a connection to the SQL database."""
+
+    def __init__(self):
+        self._conn = psycopg2.connect("dbname=ubuntu-debuginfod user=mirror")
+
+    def cursor(self):
+        """Return the cursor to be used when dealing with the database."""
+        return self._conn.cursor()
+
+    def commit(self):
+        """Commit the transaction to the DB."""
+        self._conn.commit()
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        self._conn.commit()
+        self._conn.close()
+        self._conn = None
+
+    def __del__(self):
+        if self._conn is not None:
+            self._conn.commit()
+            self._conn.close()
