Merge ubuntu-debuginfod:concurrency-rework into ubuntu-debuginfod:master
Status: | Merged |
---|---|
Merged at revision: | 4e909f63e8695ed61e0c259d129075b52c7ead54 |
Proposed branch: | ubuntu-debuginfod:concurrency-rework |
Merge into: | ubuntu-debuginfod:master |
Diff against target: | 1004 lines (+443/-248), 8 files modified: README.md (+6/-3), ddebgetter.py (+85/-174), ddebpoller.py (+83/-32), debuggetter.py (+46/-7), debuginfod.py (+85/-10), debugpoller.py (+10/-13), poll_launchpad.py (+80/-9), utils.py (+48/-0) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Athos Ribeiro (community) | | | Approve |
Canonical Server packageset reviewers | | | Pending |
Robie Basak | | | Pending |
Lena Voytek | | | Pending |
Bryce Harrington | | | Pending |
Canonical Server Reporter | | | Pending |
Review via email: mp+434525@code.launchpad.net |
Commit message
Description of the change
This is a rework of the entire application. After discussing it with Robie, I found out that it did a poor job of handling race conditions.
The main idea behind this MP is that we will now keep the state of each task in a SQL database. I chose PostgreSQL because (a) I like the database, and (b) it does a very decent job of handling concurrent requests. Unfortunately, SQLite did not meet the standards I was looking for.
The poller (poll_launchpad.py) will poll Launchpad, get the list of ddebs and source packages that have been published since it last ran, and dispatch tasks for each one. While doing so, it will also update the database and insert entries for each task. The poller is a single process.
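As a rough illustration of the poller's bookkeeping, here is a minimal sketch of "insert an entry, then dispatch" with deduplication. It is not the code from poll_launchpad.py: `make_demo_db` and `dispatch_new_tasks` are hypothetical names, the lookup key (source, version, arch) is an assumption based on the columns of the `tasks` table in the diff, and `sqlite3` is used only as an in-memory stand-in so the example runs without a PostgreSQL server (psycopg2 would use `%s` placeholders instead of `?`).

```python
import sqlite3


def make_demo_db():
    """Create an in-memory stand-in for the PostgreSQL 'tasks' table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE tasks ("
        "id INTEGER PRIMARY KEY, source TEXT, version TEXT, arch TEXT)"
    )
    return conn


def dispatch_new_tasks(conn, messages, dispatch):
    """Record each unseen message in 'tasks' and dispatch it exactly once.

    Messages whose (source, version, arch) key already has a row are
    skipped, which is how a still-pending task avoids being queued twice.
    """
    dispatched = []
    for msg in messages:
        # Assumed deduplication key; the real poller may use other fields.
        key = (msg["source_package"], msg["version"], msg["architecture"])
        row = conn.execute(
            "SELECT id FROM tasks WHERE source = ? AND version = ? AND arch = ?",
            key,
        ).fetchone()
        if row is not None:
            continue
        cur = conn.execute(
            "INSERT INTO tasks (source, version, arch) VALUES (?, ?, ?)", key
        )
        # Carry the row id along so the worker side can delete it later.
        job = dict(msg, jobid=cur.lastrowid)
        dispatch(job)
        dispatched.append(job)
    conn.commit()
    return dispatched
```

In the real application, `dispatch` would presumably be something like a Celery task's `.delay` method; here any callable works, e.g. `dispatch_new_tasks(make_demo_db(), msgs, print)`.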
The getter is where things get more interesting. Celery will spawn multiple concurrent threads that process tasks as they arrive, and in an ideal world these tasks are all unique. However, there are certain scenarios (very unlikely to happen, but not impossible) where we might have duplicate tasks in the queue. Even then, as long as the duplicates are processed at different times everything should be fine, because the getter is idempotent. However^2, in the extremely unlikely scenario where two workers process two identical tasks at the same time, we might hit race conditions when, e.g., writing the ddeb to disk. For this reason, the getter now tries to move the files into place atomically.
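To make the "atomic move" idea concrete, here is a minimal sketch of one common way to do it. Note that this is a different technique from the `_try_to_move_atomically` helper in the diff (which copies into the destination directory and then calls `shutil.move`): the sketch writes a temporary file in the destination directory and then uses `os.replace`, which is atomic on POSIX when both paths are on the same filesystem. The name `write_atomically` is hypothetical.

```python
import os
import tempfile


def write_atomically(data, dst):
    """Write DATA to DST so readers never observe a partial file.

    The temporary file is created next to DST so that os.replace()
    stays within one filesystem.
    """
    dst_dir = os.path.dirname(os.path.abspath(dst))
    os.makedirs(dst_dir, mode=0o755, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=dst_dir, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())
        # Either the old DST or the complete new file is visible,
        # never something in between.
        os.replace(tmp_path, dst)
    except BaseException:
        # Clean up the temporary file if anything went wrong.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        raise
```

If two workers run this for the same destination at the same time, the last `os.replace` wins, and since both wrote identical content the result is the same either way.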
The Celery application (debuginfod.py) will monitor the tasks and update the SQL database accordingly. If a task succeeds, we don't need to care about it anymore and can simply remove its entry. If a task gets rescheduled (because of a timeout, for example), we keep its entry intact, which should prevent the poller from dispatching duplicate tasks. There is also some error logging, because a task may occasionally throw an unexpected exception and we need to record the problem in order to come back to it later. These errors are also logged to a SQL table.
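The rules above can be summarised as a small decision function. This is only a sketch of the logic as described here and in the diff, not code from debuginfod.py: `bookkeeping_for` and the action names are hypothetical, and the two exception classes are local stand-ins for the ones defined in debuggetter.py.

```python
class DebugGetterTimeout(Exception):
    """Stand-in for debuggetter.DebugGetterTimeout."""


class DebugGetterRetry(Exception):
    """Stand-in for debuggetter.DebugGetterRetry."""


# Exceptions that make Celery reschedule the task instead of failing it.
RETRYABLE = (DebugGetterTimeout, DebugGetterRetry)


def bookkeeping_for(state, exc=None):
    """Return the database actions to take for a finished task.

    STATE is the Celery task state ("SUCCESS", "RETRY", "FAILURE", ...)
    and EXC is the exception raised by the task, if any.
    """
    actions = []
    if exc is not None and not isinstance(exc, RETRYABLE):
        # Unexpected problem: log it so someone can come back to it.
        actions.append("record_error")
    if state == "SUCCESS":
        # Nothing left to track for this task.
        actions.append("delete_task_row")
    else:
        # Keeping the row stops the poller from dispatching a duplicate.
        actions.append("keep_task_row")
    return actions
```

For example, `bookkeeping_for("RETRY", DebugGetterTimeout())` only keeps the row, while an unexpected `ValueError` both records an error and keeps the row.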
I ran extensive tests locally to make sure things are working correctly, and fixed every problem I found. I'm confident that this can be put in production, but I would like your input first. The MP is big but I split the commits logically, which should make things easier to review. Let me know if you need any clarification.
Bryce Harrington (bryce) wrote:
Another quick look, just at the database code, with a refactoring suggestion.
Sergio Durigan Junior (sergiodj) wrote:
BTW, with my latest force-push I've also implemented a new class (DebugDB) which abstracts the access to the database. I think it's cleaner this way.
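For readers who have not opened utils.py, here is a minimal sketch of what a database wrapper used as `with DebugDB() as db: with db.cursor() as cur: ...` could look like. It is an assumption about the general shape, not the class from this branch: the `connect` parameter, the commit/rollback policy, and the `sqlite3` default (used so the example runs without a PostgreSQL server) are all hypothetical.

```python
import sqlite3
from contextlib import contextmanager


class DebugDB:
    """Hypothetical sketch of a context-managed database wrapper."""

    def __init__(self, connect=lambda: sqlite3.connect(":memory:")):
        # CONNECT is any zero-argument callable returning a DB-API
        # connection; the real class presumably connects to PostgreSQL.
        self._connect = connect
        self._conn = None

    def __enter__(self):
        self._conn = self._connect()
        return self

    def __exit__(self, exc_type, exc, tb):
        try:
            # Commit on a clean exit, roll back if the block raised.
            if exc_type is None:
                self._conn.commit()
            else:
                self._conn.rollback()
        finally:
            self._conn.close()
            self._conn = None
        return False  # Never swallow the caller's exception.

    @contextmanager
    def cursor(self):
        """Yield a cursor and close it when the block ends."""
        cur = self._conn.cursor()
        try:
            yield cur
        finally:
            cur.close()
```

The point of a wrapper like this is that callers no longer repeat connection setup, commit, and cleanup at every call site.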
Lena Voytek (lvoytek) wrote:
So far this looks good to me. I've added some suggestions for cleanup below.
Sergio Durigan Junior (sergiodj) wrote:
Thanks, Lena.
Addressed all comments and pushed a new commit on top of everything.
Athos Ribeiro (athos-ribeiro) wrote:
Looking good!
I added a few inline comments.
Sergio Durigan Junior (sergiodj) wrote:
Thanks, Athos! Addressed most of your comments and pushed a new commit.
Sergio Durigan Junior (sergiodj) wrote:
Thanks for the reviews, folks.
Preview Diff
1 | diff --git a/README.md b/README.md |
2 | index 8fe7f00..3c49598 100644 |
3 | --- a/README.md |
4 | +++ b/README.md |
5 | @@ -8,18 +8,21 @@ This project is used in the deployment of Ubuntu's |
6 | * Python |
7 | |
8 | ``` |
9 | -python3-pyelftools >= 0.29-1 |
10 | -python3-debian |
11 | python3-git |
12 | python3-celery |
13 | python3-launchpadlib |
14 | python3-requests |
15 | python3-sdnotify |
16 | +python3-psycopg2 |
17 | ``` |
18 | |
19 | * Applications |
20 | |
21 | ``` |
22 | celery |
23 | -rabbitmq-server |
24 | +rabbitmq-server* |
25 | +postgresql-server* |
26 | + |
27 | +*: These could be charms, but (as of this writing) they don't yet |
28 | + support Jammy. |
29 | ``` |
30 | diff --git a/ddebgetter.py b/ddebgetter.py |
31 | index 62eebc6..661d477 100644 |
32 | --- a/ddebgetter.py |
33 | +++ b/ddebgetter.py |
34 | @@ -18,16 +18,11 @@ |
35 | # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com> |
36 | |
37 | import os |
38 | -from urllib import parse |
39 | import lzma |
40 | import tarfile |
41 | import subprocess |
42 | |
43 | -from debian import debfile |
44 | import tempfile |
45 | -from elftools.common.utils import bytelist2string |
46 | -from elftools.common.exceptions import ELFError, ELFParseError, DWARFError |
47 | -from elftools.elf.elffile import ELFFile |
48 | |
49 | from git import Git, Repo |
50 | from git.exc import GitCommandError |
51 | @@ -49,45 +44,42 @@ class DdebGetter(DebugGetter): |
52 | |
53 | :param request: The dictionary containing the information |
54 | necessary to fetch this ddeb. |
55 | - :type request: dict(str : str) |
56 | - """ |
57 | - if not request or not request.get("ddebs"): |
58 | - return |
59 | - |
60 | - self._logger.debug(f"Processing request to download ddebs: {request}") |
61 | - self._download_ddebs( |
62 | - request["source_package"], |
63 | - request["component"], |
64 | - request["ddebs"], |
65 | + :type request: dict(str : str)""" |
66 | + if request is None: |
67 | + raise TypeError("Invalid request (None)") |
68 | + if request.get("ddeb_url") is None: |
69 | + raise TypeError("No 'ddeb_url' in request") |
70 | + if request.get("architecture") is None: |
71 | + raise TypeError("No 'architecture' in request") |
72 | + if request["architecture"] == "source": |
73 | + raise ValueError("Wrong request: source fetch") |
74 | + |
75 | + self._logger.debug(f"Processing request to download ddeb: {request}") |
76 | + self._download_ddeb( |
77 | + request["source_package"], request["component"], request["ddeb_url"] |
78 | ) |
79 | |
80 | - def _download_ddebs(self, source_package, component, urls): |
81 | - """Download the ddebs associated with a package/version. |
82 | + def _download_ddeb(self, source_package, component, ddeb_url): |
83 | + """Download a ddeb associated with a package. |
84 | |
85 | :param str source_package: Source package name. |
86 | |
87 | - :param str version: Source package version. |
88 | - |
89 | :param str component: Source package component. |
90 | |
91 | - :param list urls: List of ddeb URLs.""" |
92 | + :param str ddeb_url: The ddeb URL.""" |
93 | savepath = self._make_savepath(source_package, component) |
94 | - os.makedirs(savepath, mode=0o755, exist_ok=True) |
95 | - for url in urls: |
96 | - self._logger.debug(f"Downloading '{url}' into '{savepath}'") |
97 | - self._download_from_lp(url, savepath) |
98 | + self._logger.debug(f"Downloading '{ddeb_url}' into '{savepath}'") |
99 | + self._download_from_lp(ddeb_url, savepath) |
100 | |
101 | |
102 | # The function below was taken from git-ubuntu. We need to make |
103 | # sure we perform the same version -> tag transformation as it |
104 | # does. |
105 | -def _git_dep14_tag(version): |
106 | +def git_dep14_tag(version): |
107 | """Munge a version string according to http://dep.debian.net/deps/dep14/ |
108 | |
109 | :param str version: The version to be adjusted.""" |
110 | - version = version.replace("~", "_") |
111 | - version = version.replace(":", "%") |
112 | - version = version.replace("..", ".#.") |
113 | + version = version.replace("~", "_").replace(":", "%").replace("..", ".#.") |
114 | if version.endswith("."): |
115 | version = version + "#" |
116 | if version.endswith(".lock"): |
117 | @@ -95,6 +87,20 @@ def _git_dep14_tag(version): |
118 | version = pre + ".#lock" |
119 | return version |
120 | |
121 | +def adjust_tar_filepath(tarinfo): |
122 | + """Adjust the filepath for a TarInfo file. |
123 | + |
124 | + This function is needed because TarFile.add strips the leading |
125 | + slash from the filenames, so we have to workaround it by |
126 | + re-adding the slash ourselves. |
127 | + |
128 | + This function is intended to be used as a callback provided to |
129 | + TarFile.add. |
130 | + |
131 | + :param TarInfo tarinfo: The tarinfo.""" |
132 | + tarinfo.name = os.path.join("/", tarinfo.name) |
133 | + return tarinfo |
134 | + |
135 | |
136 | class DdebSourceCodeGetter(DebugGetter): |
137 | """Get (fetch) the source code associated with a ddeb.""" |
138 | @@ -107,113 +113,24 @@ class DdebSourceCodeGetter(DebugGetter): |
139 | |
140 | :param request: The dictionary containing the information |
141 | necessary to fetch this source code. |
142 | - :type request: dict(str : str) |
143 | - """ |
144 | - if not request or not request.get("ddebs"): |
145 | - return |
146 | - |
147 | - self._logger.debug( |
148 | - f"Processing request to download source code (for ddebs): {request}" |
149 | - ) |
150 | + :type request: dict(str : str)""" |
151 | + if request is None: |
152 | + raise TypeError("Invalid request (None)") |
153 | + if request.get("source_urls") is None: |
154 | + raise TypeError("No 'source_urls' in request") |
155 | + if request.get("architecture") is None: |
156 | + raise TypeError("No 'architecture' in request") |
157 | + if request["architecture"] != "source": |
158 | + raise ValueError("Wrong request: ddeb fetch") |
159 | + |
160 | + self._logger.debug(f"Processing request to download source code: {request}") |
161 | self._process_source( |
162 | request["source_package"], |
163 | request["version"], |
164 | request["component"], |
165 | - request["ddebs"], |
166 | - request["sources"], |
167 | + request["source_urls"], |
168 | ) |
169 | |
170 | - def _get_comp_dirs(self, debug_file, filename): |
171 | - """Get the list of available DW_AT_comp_dir declarations from the |
172 | - DWARF file. |
173 | - |
174 | - :param TextIOWrapper debug_file: The .debug file. |
175 | - """ |
176 | - elf_file = ELFFile(debug_file) |
177 | - |
178 | - if not elf_file.has_dwarf_info(): |
179 | - self._logger.debug(f"'{filename}' doesn't have DWARF") |
180 | - return [] |
181 | - |
182 | - result = [] |
183 | - dwarf_file = elf_file.get_dwarf_info() |
184 | - for cu in dwarf_file.iter_CUs(): |
185 | - attr = cu.get_top_DIE().attributes |
186 | - if "DW_AT_comp_dir" in attr.keys(): |
187 | - value = attr["DW_AT_comp_dir"].value |
188 | - if isinstance(value, bytes): |
189 | - result.append(bytelist2string(value).decode("UTF-8")) |
190 | - self._logger.debug( |
191 | - f"Found the following DW_AT_comp_dir for '{filename}':\n{result}" |
192 | - ) |
193 | - return result |
194 | - |
195 | - def _should_fetch_source_for_debugfile(self, source_package, version, filepath): |
196 | - """Whether we should fetch the source code for the specified source |
197 | - package/deb. |
198 | - |
199 | - We return True if the debug file associated with the source |
200 | - package has any DW_AT_comp_dir entity whose path points to |
201 | - "/usr/src/source_package-version/". |
202 | - |
203 | - :param str source_package: Source package name. |
204 | - |
205 | - :param str version: Source package version. |
206 | - |
207 | - :param str filepath: Full path to the .ddeb file.""" |
208 | - deb_file = debfile.DebFile(filepath) |
209 | - # The path we expect to find in any of the DW_AT_comp_dirs. |
210 | - expected_path = f"/usr/src/{source_package}-{version}/" |
211 | - for deb_internal_file in list(deb_file.data): |
212 | - if not deb_internal_file.endswith(".debug"): |
213 | - continue |
214 | - with tempfile.TemporaryFile() as debug_file: |
215 | - debug_file.write(deb_file.data.get_content(deb_internal_file)) |
216 | - try: |
217 | - comp_dirs = self._get_comp_dirs(debug_file, deb_internal_file) |
218 | - except (ELFError, ELFParseError, DWARFError) as e: |
219 | - self._logger.warning( |
220 | - f"Exception while looking for DW_AT_comp_dirs: {e}" |
221 | - ) |
222 | - # There's no point in retrying anything here, so |
223 | - # we just generate a warning and keep going. |
224 | - continue |
225 | - |
226 | - for comp_dir in comp_dirs: |
227 | - if comp_dir.startswith(expected_path): |
228 | - self._logger.debug(f"Found a good DW_AT_comp_dir: {comp_dir}") |
229 | - return True |
230 | - return False |
231 | - |
232 | - def _should_fetch_source(self, source_package, version, component, filenames): |
233 | - """Return True if we should fetch the source code for a specific |
234 | - package, False otherwise. |
235 | - |
236 | - Whether or not we should fetch the source code is determined |
237 | - by the presence of a specific path prefix (/usr/src/...) in |
238 | - the DW_AT_comp_dir declarations of a DWARF file. |
239 | - |
240 | - :param str source_package: Source package name. |
241 | - |
242 | - :param str version: Source package version. |
243 | - |
244 | - :param str component: Source package component (main, |
245 | - universe, etc.) |
246 | - |
247 | - :param list urls: is the list of URLs used to fetch the respective debug |
248 | - information for the package. |
249 | - """ |
250 | - savepath = self._make_savepath(source_package, component) |
251 | - for fname in filenames: |
252 | - filepath = os.path.join(savepath, fname) |
253 | - if not os.path.isfile(filepath): |
254 | - continue |
255 | - if self._should_fetch_source_for_debugfile( |
256 | - source_package, version, filepath |
257 | - ): |
258 | - return True |
259 | - return False |
260 | - |
261 | def _download_source_code_from_git(self, source_package, version, filepath): |
262 | """Download the source code using Launchpad's git repository. |
263 | |
264 | @@ -223,11 +140,16 @@ class DdebSourceCodeGetter(DebugGetter): |
265 | |
266 | :param str filepath: The full pathname where the resulting |
267 | source code tarball should be saved. |
268 | - """ |
269 | + |
270 | + :rtype: bool |
271 | + |
272 | + This method returns True when the operation succeeds, or False |
273 | + *iff* the "git clone" command fails to run. Otherwise, this |
274 | + function will throw an exception.""" |
275 | with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as source_dir: |
276 | g = Git() |
277 | git_dir = os.path.join(source_dir, f"{source_package}") |
278 | - tag = "applied/" + _git_dep14_tag(f"{version}") |
279 | + tag = "applied/" + git_dep14_tag(f"{version}") |
280 | self._logger.debug( |
281 | f"Cloning '{source_package}' git repo into '{git_dir}' (tag: '{tag}')" |
282 | ) |
283 | @@ -247,29 +169,16 @@ class DdebSourceCodeGetter(DebugGetter): |
284 | return False |
285 | repo = Repo(git_dir) |
286 | prefix_path = os.path.join("/usr/src/", f"{source_package}-{version}/") |
287 | - with lzma.open(filepath, "w") as xzfile: |
288 | - self._logger.debug( |
289 | - f"Archiving git repo for '{source_package}-{version}' as '{filepath}'" |
290 | - ) |
291 | - repo.archive(xzfile, prefix=prefix_path, format="tar") |
292 | + self._logger.debug( |
293 | + f"Archiving git repo for '{source_package}-{version}' as '{filepath}'" |
294 | + ) |
295 | + with tempfile.NamedTemporaryFile() as tmpfile: |
296 | + with lzma.open(tmpfile.name, "w") as xzfile: |
297 | + repo.archive(xzfile, prefix=prefix_path, format="tar") |
298 | + self._try_to_move_atomically(tmpfile.name, filepath) |
299 | |
300 | return True |
301 | |
302 | - def _adjust_tar_filepath(self, tarinfo): |
303 | - """Adjust the filepath for a TarInfo file. |
304 | - |
305 | - This function is needed because TarFile.add strips the leading |
306 | - slash from the filenames, so we have to workaround it by |
307 | - re-adding the slash ourselves. |
308 | - |
309 | - This function is intended to be used as a callback provided to |
310 | - TarFile.add. |
311 | - |
312 | - :param TarInfo tarinfo: The tarinfo. |
313 | - """ |
314 | - tarinfo.name = f"/{tarinfo.name}" |
315 | - return tarinfo |
316 | - |
317 | def _download_source_code_from_dsc( |
318 | self, source_package, version, filepath, source_urls |
319 | ): |
320 | @@ -286,17 +195,30 @@ class DdebSourceCodeGetter(DebugGetter): |
321 | :param list source_urls: List of URLs used to fetch the source |
322 | package. This is usually the list returned by the |
323 | sourceFileUrls() Launchpad API call. |
324 | - """ |
325 | + |
326 | + :rtype: bool |
327 | + This method returns True when the operation succeeds, or False |
328 | + *iff* the "dpkg-source -x" command fails to run. Otherwise, |
329 | + this function will throw an exception.""" |
330 | with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as source_dir: |
331 | for url in source_urls: |
332 | self._download_from_lp(url, source_dir) |
333 | |
334 | - dscfile = os.path.join(source_dir, f"{source_package}_{version}.dsc") |
335 | - if not os.path.isfile(dscfile): |
336 | - self._logger.warning(f"'{dscfile}' doesn't exist, but it should.") |
337 | + dscfile = None |
338 | + for f in os.listdir(source_dir): |
339 | + newf = os.path.join(source_dir, f) |
340 | + if os.path.isfile(newf) and f.endswith(".dsc"): |
341 | + dscfile = newf |
342 | + break |
343 | + |
344 | + if dscfile is None: |
345 | + self._logger.warning( |
346 | + "Could not find .dsc file, even though it should exist." |
347 | + ) |
348 | return False |
349 | |
350 | outdir = os.path.join(source_dir, "outdir") |
351 | + self._logger.debug(f"Will call 'dpkg-source -x {dscfile} {outdir}'") |
352 | try: |
353 | subprocess.run( |
354 | ["/usr/bin/dpkg-source", "-x", dscfile, outdir], |
355 | @@ -315,8 +237,12 @@ class DdebSourceCodeGetter(DebugGetter): |
356 | |
357 | prefix_path = os.path.join("/usr/src/", f"{source_package}-{version}/") |
358 | |
359 | - with tarfile.open(filepath, "w:xz") as tfile: |
360 | - tfile.add(outdir, arcname=prefix_path, filter=self._adjust_tar_filepath) |
361 | + with tempfile.NamedTemporaryFile() as tmpfile: |
362 | + with tarfile.open(tmpfile.name, "w:xz") as tfile: |
363 | + tfile.add( |
364 | + outdir, arcname=prefix_path, filter=adjust_tar_filepath |
365 | + ) |
366 | + self._try_to_move_atomically(tmpfile.name, filepath) |
367 | |
368 | return True |
369 | |
370 | @@ -355,29 +281,14 @@ class DdebSourceCodeGetter(DebugGetter): |
371 | # like to retry the task. |
372 | raise DebugGetterRetry() |
373 | |
374 | - def _process_source( |
375 | - self, source_package, version, component, ddeb_urls, source_urls |
376 | - ): |
377 | + def _process_source(self, source_package, version, component, source_urls): |
378 | """Process the request to fetch the source code. |
379 | |
380 | - We only download the source code if we know it will be |
381 | - properly indexed by debuginfod. |
382 | - |
383 | :param str source_package: Source package name. |
384 | |
385 | :param str version: Source package version. |
386 | |
387 | :param str component: Source package component. |
388 | |
389 | - :param list urls: List of ddeb URLs.""" |
390 | - # Convert URLs into filenames. |
391 | - filenames = [ |
392 | - os.path.basename(parse.urlparse(fname).path) for fname in ddeb_urls |
393 | - ] |
394 | - if not self._should_fetch_source(source_package, version, component, filenames): |
395 | - self._logger.info( |
396 | - f"Should not fetch source code for '{source_package}-{version}'" |
397 | - ) |
398 | - return |
399 | - |
400 | + :param list source_urls: List of source URLs.""" |
401 | self._download_source_code(source_package, version, component, source_urls) |
402 | diff --git a/ddebpoller.py b/ddebpoller.py |
403 | index 24ba613..6f92c2f 100644 |
404 | --- a/ddebpoller.py |
405 | +++ b/ddebpoller.py |
406 | @@ -17,14 +17,18 @@ |
407 | |
408 | # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com> |
409 | |
410 | +import os |
411 | +from urllib import parse |
412 | from debugpoller import DebugPoller |
413 | |
414 | |
415 | class DdebPoller(DebugPoller): |
416 | - def __init__( |
417 | - self, initial_interval=None, force_initial_interval=False, dry_run=False |
418 | - ): |
419 | - """Initialize the object using 'ddeb' as its name. |
420 | + """Perform the Launchpad polling and obtain a list of ddebs and source |
421 | + packages.""" |
422 | + |
423 | + def __init__(self, initial_interval=1, force_initial_interval=False, dry_run=False): |
424 | + |
425 | + """Initialize the object using 'ddeb' as its name. |
426 | |
427 | Look at DebugPoller's docstring for an explanation about the arguments.""" |
428 | super().__init__( |
429 | @@ -34,43 +38,90 @@ class DdebPoller(DebugPoller): |
430 | dry_run=dry_run, |
431 | ) |
432 | |
433 | - def get_ddebs(self): |
434 | - """Get the list of ddebs that have been published since the last |
435 | - timestamp from Launchpad.""" |
436 | + def get_ddebs_and_sources(self): |
437 | + """Get the list of ddebs and source files that have been published |
438 | + since the last timestamp from Launchpad. |
439 | + |
440 | + :rtype: dict, str |
441 | + |
442 | + Return a dictionary containing all ddebs and source packages |
443 | + found, and also the new timestamp that should then be recorded |
444 | + by calling record_timestamp.""" |
445 | timestamp = self._get_timestamp() |
446 | - assert timestamp is not None, f"_get_timestamp returned None" |
447 | - assert timestamp != "", f"_get_timestamp returned a blank timestamp" |
448 | + if timestamp is None: |
449 | + raise RuntimeError("_get_timestamp returned None") |
450 | + if timestamp == "": |
451 | + raise RuntimeError("_get_timestamp returned a blank timestamp") |
452 | new_timestamp = self._generate_timestamp() |
453 | - assert new_timestamp is not None, f"_generate_timestamp returned None" |
454 | - assert new_timestamp != "", f"_generate_timestamp returned a blank timestamp" |
455 | - |
456 | - result = [] |
457 | + if new_timestamp is None: |
458 | + raise RuntimeError("_generate_timestamp returned None") |
459 | + if new_timestamp == "": |
460 | + raise RuntimeError("_generate_timestamp returned a blank timestamp") |
461 | |
462 | self._logger.info( |
463 | f"Polling ddebs created since '{timestamp}' (it's now '{new_timestamp}')" |
464 | ) |
465 | |
466 | - for pkg in self._main_archive.getPublishedSources( |
467 | - order_by_date=True, created_since_date=timestamp, status="Published" |
468 | + result = [] |
469 | + for pkg in self._main_archive.getPublishedBinaries( |
470 | + order_by_date=True, created_since_date=timestamp |
471 | ): |
472 | - ddeb_urls = [url for url in pkg.binaryFileUrls() if url.endswith(".ddeb")] |
473 | - ddebs_len = len(ddeb_urls) |
474 | - if ddebs_len > 0: |
475 | - distro_series = self._lp.load(pkg.distro_series_link).name |
476 | - pkgname = pkg.source_package_name |
477 | - pkgver = pkg.source_package_version |
478 | + if pkg.status not in ("Pending", "Published"): |
479 | + continue |
480 | + if not pkg.is_debug: |
481 | + continue |
482 | + |
483 | + ddeb_urls = pkg.binaryFileUrls() |
484 | + if len(ddeb_urls) == 0: |
485 | + # Safety check. |
486 | + continue |
487 | + |
488 | + srcname = pkg.source_package_name |
489 | + srcver = pkg.source_package_version |
490 | + component = pkg.component_name |
491 | + |
492 | + # We create one message (which will eventually become a |
493 | + # Celery task) per ddeb. This makes it easier later to do |
494 | + # deduplication, and also has the benefit of making the |
495 | + # downloading process more granular for multiple ddebs. |
496 | + for url in ddeb_urls: |
497 | + # Obtain the ddeb filename and its architecture. |
498 | + ddeb = os.path.basename(parse.urlparse(url).path) |
499 | + _, _, arch_and_extension = ddeb.split("_") |
500 | + arch, _ = arch_and_extension.split(".") |
501 | + |
502 | msg = { |
503 | - "source_package": pkgname, |
504 | - "version": pkgver, |
505 | - "component": pkg.component_name, |
506 | - "distro_series": distro_series, |
507 | - "ddebs": ddeb_urls, |
508 | - "sources": pkg.sourceFileUrls(), |
509 | + "source_package": srcname, |
510 | + "version": srcver, |
511 | + "component": component, |
512 | + "ddeb_url": url, |
513 | + "ddeb_filename": ddeb, |
514 | + "architecture": arch, |
515 | } |
516 | - self._logger.debug( |
517 | - f"For source package '{pkgname}-{pkgver}', found {ddebs_len}:\n{ddeb_urls}" |
518 | - ) |
519 | result.append(msg) |
520 | |
521 | - self._record_timestamp(new_timestamp) |
522 | - return result |
523 | + for pkg in self._main_archive.getPublishedSources( |
524 | + order_by_date=True, created_since_date=timestamp |
525 | + ): |
526 | + if pkg.status not in ("Pending", "Published"): |
527 | + continue |
528 | + |
529 | + src_urls = pkg.sourceFileUrls() |
530 | + if len(src_urls) == 0: |
531 | + # Safety check. |
532 | + continue |
533 | + |
534 | + srcname = pkg.source_package_name |
535 | + srcver = pkg.source_package_version |
536 | + component = pkg.component_name |
537 | + |
538 | + msg = { |
539 | + "source_package": srcname, |
540 | + "version": srcver, |
541 | + "component": component, |
542 | + "source_urls": src_urls, |
543 | + "architecture": "source", |
544 | + } |
545 | + result.append(msg) |
546 | + |
547 | + return result, new_timestamp |
548 | diff --git a/debuggetter.py b/debuggetter.py |
549 | index 9bb6102..bafa914 100644 |
550 | --- a/debuggetter.py |
551 | +++ b/debuggetter.py |
552 | @@ -21,6 +21,8 @@ import os |
553 | import requests |
554 | from urllib import parse |
555 | import logging |
556 | +import tempfile |
557 | +import shutil |
558 | |
559 | |
560 | class DebugGetterTimeout(Exception): |
561 | @@ -34,6 +36,7 @@ class DebugGetterRetry(Exception): |
562 | |
563 | DEFAULT_MIRROR_DIR = "/srv/debug-mirror" |
564 | |
565 | + |
566 | class DebugGetter: |
567 | """Base class for a Debug Getter.""" |
568 | |
569 | @@ -45,8 +48,7 @@ class DebugGetter: |
570 | |
571 | :param str subdir: The subdirectory (inside mirror_dir) where |
572 | the module will save its files. For example, a ddeb |
573 | - getter module should specify "ddebs" here. |
574 | - """ |
575 | + getter module should specify "ddebs" here.""" |
576 | self._mirror_dir = mirror_dir |
577 | self._subdir = subdir |
578 | self._logger = logging.getLogger(__name__) |
579 | @@ -55,17 +57,52 @@ class DebugGetter: |
580 | """Return the full save path for a package. |
581 | |
582 | :param str source_package: The package name. |
583 | - :param str component: The component name (main, universe, etc.). |
584 | - """ |
585 | + :param str component: The component name (main, universe, etc.).""" |
586 | + if source_package.startswith("lib"): |
587 | + pkgname_initials = source_package[:4] |
588 | + else: |
589 | + pkgname_initials = source_package[0] |
590 | + |
591 | return os.path.join( |
592 | self._mirror_dir, |
593 | self._subdir, |
594 | component, |
595 | - source_package[:1], |
596 | + pkgname_initials, |
597 | source_package, |
598 | ) |
599 | |
600 | + def _try_to_move_atomically(self, src, dst): |
601 | + """Try to move SRC to DST atomically. |
602 | + |
603 | + This function assumes that SRC and DST have different filenames. |
604 | + |
605 | + :param str src: The source file (full path). |
606 | + |
607 | + :param str dst: The destination file (full path).""" |
608 | + self._logger.debug(f"Trying to move '{src}' to '{dst}' atomically") |
609 | + savepath = os.path.dirname(dst) |
610 | + os.makedirs(savepath, mode=0o755, exist_ok=True) |
611 | + newtmpfile = os.path.join(savepath, os.path.basename(src)) |
612 | + shutil.copyfile(src, newtmpfile) |
613 | + try: |
614 | + shutil.move(newtmpfile, dst) |
615 | + except shutil.SameFileError: |
616 | + os.remove(newtmpfile) |
617 | + self._logger.warning( |
618 | + f"Could not create '{dst}' atomically: same file already exists" |
619 | + ) |
620 | + |
621 | def _download_from_lp(self, url, savepath): |
622 | + """Download a file from Launchpad. |
623 | + |
624 | + This method tries to download the file in chunks into a |
625 | + temporary file and then does an atomic move to the final |
626 | + destination. |
627 | + |
628 | + :param str url: The URL that should be downloaded. |
629 | + |
630 | + :param str savepath: The full path (minus the filename) where |
631 | + the file should be saved.""" |
632 | filepath = os.path.join(savepath, os.path.basename(parse.urlparse(url).path)) |
633 | if os.path.exists(filepath): |
634 | self._logger.debug(f"'{filepath}' exists, doing nothing") |
635 | @@ -75,10 +112,12 @@ class DebugGetter: |
636 | with requests.Session() as s: |
637 | with s.get(url, allow_redirects=True, timeout=10, stream=True) as r: |
638 | r.raise_for_status() |
639 | - with open(filepath, "wb") as f: |
640 | + with tempfile.NamedTemporaryFile(mode="wb") as tmpfile: |
641 | # 10 MB for chunk_size should be enough... |
642 | for chunk in r.iter_content(chunk_size=10 * 1024 * 1024): |
643 | - f.write(chunk) |
644 | + tmpfile.write(chunk) |
645 | + tmpfile.flush() |
646 | + self._try_to_move_atomically(tmpfile.name, filepath) |
647 | except ( |
648 | requests.ConnectionError, |
649 | requests.HTTPError, |
650 | diff --git a/debuginfod.py b/debuginfod.py |
651 | index c0d0357..5c5d77f 100644 |
652 | --- a/debuginfod.py |
653 | +++ b/debuginfod.py |
654 | @@ -18,11 +18,16 @@ |
655 | # Authors: Sergio Durigan Junior <sergio.durigan@canonical.com> |
656 | |
657 | from celery import Celery |
658 | -from celery.signals import worker_ready, worker_shutting_down |
659 | +from celery.signals import ( |
660 | + worker_ready, |
661 | + worker_shutting_down, |
662 | + task_postrun, |
663 | + celeryd_init, |
664 | +) |
665 | |
666 | from debuggetter import DebugGetterTimeout, DebugGetterRetry |
667 | - |
668 | from ddebgetter import DdebGetter, DdebSourceCodeGetter |
669 | +from utils import DebugDB |
670 | |
671 | import sdnotify |
672 | |
673 | @@ -45,6 +50,23 @@ app.conf.update( |
674 | sdnotifier = sdnotify.SystemdNotifier() |
675 | |
676 | |
677 | +def _record_error_into_db(exc, msg): |
678 | + """Record an error into the database. |
679 | + |
680 | + :param Exception exc: The Exception that was triggered. |
681 | + |
682 | + :param dict msg: The message that triggered the error.""" |
683 | + with DebugDB() as db: |
684 | + with db.cursor() as cur: |
685 | + cur.execute( |
686 | + "INSERT INTO errors (task, exception) VALUES (%s, %s)", |
687 | + ( |
688 | + str(msg), |
689 | + str(exc), |
690 | + ), |
691 | + ) |
692 | + |
693 | + |
694 | @app.task( |
695 | name="grab_ddebs", |
696 | autoretry_for=(DebugGetterTimeout, DebugGetterRetry), |
697 | @@ -58,10 +80,20 @@ def grab_ddebs(msg): |
698 | """Dispatch the DdebGetter task. |
699 | |
700 | :param dict(str -> str) msg: The dictionary containing the message |
701 | - that will be processed by the getter. |
702 | - """ |
703 | - g = DdebGetter() |
704 | - g.process_request(msg) |
705 | + that will be processed by the getter.""" |
706 | + try: |
707 | + g = DdebGetter() |
708 | + g.process_request(msg) |
709 | + except Exception as e: |
710 | + if not isinstance(e, DebugGetterRetry) and not isinstance( |
711 | + e, DebugGetterTimeout |
712 | + ): |
713 | + # This is some other kind of error that we need to deal |
714 | + # with. Mark it as such. |
715 | + _record_error_into_db(e, msg) |
716 | + # We still need to raise the exception here. Celery will |
717 | + # reschedule the task if applicable. |
718 | + raise e |
719 | |
720 | |
721 | @app.task( |
722 | @@ -77,10 +109,20 @@ def grab_ddebs_sources(msg): |
723 | """Dispatch the DdebSourceCodeGetter task. |
724 | |
725 | :param dict(str -> str) msg: The dictionary containing the message |
726 | - that will be processed by the getter. |
727 | - """ |
728 | - g = DdebSourceCodeGetter() |
729 | - g.process_request(msg) |
730 | + that will be processed by the getter.""" |
731 | + try: |
732 | + g = DdebSourceCodeGetter() |
733 | + g.process_request(msg) |
734 | + except Exception as e: |
735 | + if not isinstance(e, DebugGetterRetry) and not isinstance( |
736 | + e, DebugGetterTimeout |
737 | + ): |
738 | + # This is some other kind of error that we need to deal |
739 | + # with. Mark it as such. |
740 | + _record_error_into_db(e, msg) |
741 | + # We still need to raise the exception here. Celery will |
742 | + # reschedule the task if applicable. |
743 | + raise e |
744 | |
745 | |
746 | @worker_ready.connect |
747 | @@ -95,5 +137,38 @@ def notify_worker_shutting_down(**kwargs): |
748 | sdnotifier.notify("STOPPING=1") |
749 | |
750 | |
751 | +@task_postrun.connect |
752 | +def task_postrun(sender, args, state, **kwargs): |
753 | + """Process a task that's just finished. |
754 | + |
755 | + We keep the state of each task in a SQL database, and this |
756 | + function is responsible for cleaning up the state for tasks that have |
757 | + succeeded.""" |
758 | + if state not in ("SUCCESS",): |
759 | + return |
760 | + |
761 | + with DebugDB() as db: |
762 | + with db.cursor() as cur: |
763 | + msg = args[0] |
764 | + jobid = msg["jobid"] |
765 | + cur.execute("DELETE FROM tasks WHERE id = %s", (jobid,)) |
766 | + |
767 | + |
768 | +@celeryd_init.connect |
769 | +def setup_worker(sender=None, **kwargs): |
770 | + """Setup the worker. |
771 | + |
772 | + We have to create the tables that will be populated when |
773 | + processing tasks.""" |
774 | + with DebugDB() as db: |
775 | + with db.cursor() as cur: |
776 | + cur.execute( |
777 | + "CREATE TABLE IF NOT EXISTS tasks (id serial PRIMARY KEY, source text, version text, arch text)" |
778 | + ) |
779 | + cur.execute( |
780 | + "CREATE TABLE IF NOT EXISTS errors (id serial PRIMARY KEY, task text, exception text)" |
781 | + ) |
782 | + |
783 | + |
784 | if __name__ == "__main__": |
785 | app.start() |
786 | diff --git a/debugpoller.py b/debugpoller.py |
787 | index 058909d..2d74723 100644 |
788 | --- a/debugpoller.py |
789 | +++ b/debugpoller.py |
790 | @@ -58,9 +58,10 @@ class DebugPoller: |
791 | |
792 | :param bool dry_run: Tell the poller that it shouldn't record |
793 | the timestamp in the file when the operation finishes. |
794 | - Default is False. |
795 | - """ |
796 | - self._lp = Launchpad.login_anonymously("ubuntu-debuginfod poller", "production") |
797 | + Default is False.""" |
798 | + self._lp = Launchpad.login_anonymously( |
799 | + "ubuntu-debuginfod poller", "production", version="devel" |
800 | + ) |
801 | self._main_archive = self._lp.distributions["ubuntu"].main_archive |
802 | self.TIMESTAMP_FILE = self.TIMESTAMP_FILE + f"-{module_name}" |
803 | |
804 | @@ -84,8 +85,7 @@ class DebugPoller: |
805 | :param interval: Specify how long ago (in hours) the timestamp must |
806 | refer to. If not specified, the timestamp is generated for |
807 | the current time. |
808 | - :type interval: int or None |
809 | - """ |
810 | + :type interval: int or None""" |
811 | d = datetime.datetime.now(datetime.timezone.utc) |
812 | |
813 | if interval is not None: |
814 | @@ -100,8 +100,7 @@ class DebugPoller: |
815 | new one. |
816 | |
817 | If a timestamp file exists, returns its value. Otherwise, |
818 | - generate a new one with self._initial_interval. |
819 | - """ |
820 | + generate a new one with self._initial_interval.""" |
821 | if not os.path.exists(self.TIMESTAMP_FILE) or self._force_initial_interval: |
822 | self._logger.debug(f"Timestamp file '{self.TIMESTAMP_FILE}' doesn't exist") |
823 | self._logger.debug( |
824 | @@ -110,14 +109,13 @@ class DebugPoller: |
825 | return self._generate_timestamp(interval=self._initial_interval) |
826 | |
827 | with open(self.TIMESTAMP_FILE, "r", encoding="UTF-8") as f: |
828 | - return f.readline() |
829 | + return f.readline().rstrip() |
830 | |
831 | - def _record_timestamp(self, timestamp): |
832 | + def record_timestamp(self, timestamp): |
833 | """Save the timestamp into the timestamp file. |
834 | |
835 | :param str timestamp: Timestamp that should be saved into the |
836 | - TIMESTAMP_FILE. |
837 | - """ |
838 | + TIMESTAMP_FILE.""" |
839 | if self._dry_run: |
840 | self._logger.debug("dry_run enabled, not recording timestamp file") |
841 | return |
842 | @@ -130,6 +128,5 @@ class DebugPoller: |
843 | def set_initial_interval(self, initial_interval): |
844 | """Set the initial_interval value. |
845 | |
846 | - :param int initial_interval: The new initial_interval to be used. |
847 | - """ |
848 | + :param int initial_interval: The new initial_interval to be used.""" |
849 | self._initial_interval = initial_interval |
850 | diff --git a/poll_launchpad.py b/poll_launchpad.py |
851 | index 161aa77..65afa28 100644 |
852 | --- a/poll_launchpad.py |
853 | +++ b/poll_launchpad.py |
854 | @@ -19,16 +19,87 @@ |
855 | |
856 | from ddebpoller import DdebPoller |
857 | from debuginfod import grab_ddebs, grab_ddebs_sources |
858 | +from utils import DebugDB |
859 | |
860 | -from celery import chain |
861 | +import psycopg2 |
862 | |
863 | -def poll_launchpad(): |
864 | - """Poll Launchpad.""" |
865 | + |
866 | +def ensure_task(conn, cursor, msg): |
867 | + """Return True if the task described by MSG should be processed. |
868 | + |
869 | + Also, if True, then insert the task information into the database. |
870 | + |
871 | + :param DebugDB conn: The database connection handler. |
872 | + |
873 | + :param psycopg2.cursor cursor: The cursor to the SQL database. |
874 | + |
875 | + :param dict msg: The message describing a task. |
876 | + |
877 | + :rtype: bool, int |
878 | + :return: True or False, and the jobid if True (or None if False)""" |
879 | + srcname = msg["source_package"] |
880 | + srcver = msg["version"] |
881 | + architecture = msg["architecture"] |
882 | + cursor.execute( |
883 | + "SELECT * FROM tasks WHERE source = %s AND version = %s AND arch = %s", |
884 | + ( |
885 | + srcname, |
886 | + srcver, |
887 | + architecture, |
888 | + ), |
889 | + ) |
890 | + |
891 | + if cursor.rowcount > 0: |
892 | + return False, None |
893 | + |
894 | + cursor.execute( |
895 | + "INSERT INTO tasks (source, version, arch) VALUES (%s, %s, %s) RETURNING id", |
896 | + ( |
897 | + srcname, |
898 | + srcver, |
899 | + architecture, |
900 | + ), |
901 | + ) |
902 | + jobid = cursor.fetchone()[0] |
903 | + conn.commit() |
904 | + return True, jobid |
905 | + |
906 | + |
907 | +def poll_launchpad(conn, cursor): |
908 | + """Poll Launchpad and process the list of ddebs and source packages. |
909 | + |
910 | + :param DebugDB conn: The SQL connection handler. |
911 | + |
912 | + :param psycopg2.cursor cursor: The cursor to the SQL database.""" |
913 | poller = DdebPoller() |
914 | - for msg in poller.get_ddebs(): |
915 | - # We have to use a chain here because the ddebs need to be |
916 | - # downloaded before we can start fetching the sources. |
917 | - chain(grab_ddebs.si(msg), grab_ddebs_sources.si(msg))() |
918 | + messages, new_timestamp = poller.get_ddebs_and_sources() |
919 | + for msg in messages: |
920 | + proceed, jobid = ensure_task(conn, cursor, msg) |
921 | + if proceed: |
922 | + if jobid is None: |
923 | + raise RuntimeError("jobid is None") |
924 | + # We append the ID to the message so that we can efficiently |
925 | + # remove the task once it's been processed. |
926 | + msg["jobid"] = str(jobid) |
927 | + |
928 | + if msg["architecture"] == "source": |
929 | + # Source packages have their "architecture" as "source". |
930 | + grab_ddebs_sources.delay(msg) |
931 | + else: |
932 | + grab_ddebs.delay(msg) |
933 | + poller.record_timestamp(new_timestamp) |
934 | + |
935 | + |
936 | +def main(): |
937 | + # Connect to the database. |
938 | + with DebugDB() as db: |
939 | + with db.cursor() as cursor: |
940 | + cursor.execute( |
941 | + "CREATE TABLE IF NOT EXISTS tasks (id serial PRIMARY KEY, source text, version text, arch text)" |
942 | + ) |
943 | + db.commit() |
944 | + poll_launchpad(db, cursor) |
945 | + |
946 | |
947 | -if __name__ == '__main__': |
948 | - poll_launchpad() |
949 | +if __name__ == "__main__": |
950 | + main() |
951 | diff --git a/utils.py b/utils.py |
952 | new file mode 100644 |
953 | index 0000000..b577009 |
954 | --- /dev/null |
955 | +++ b/utils.py |
956 | @@ -0,0 +1,48 @@ |
957 | +#!/usr/bin/python3 |
958 | + |
959 | +# Copyright (C) 2022 Canonical Ltd. |
960 | + |
961 | +# This program is free software: you can redistribute it and/or modify |
962 | +# it under the terms of the GNU General Public License as published by |
963 | +# the Free Software Foundation, either version 3 of the License, or |
964 | +# (at your option) any later version. |
965 | + |
966 | +# This program is distributed in the hope that it will be useful, |
967 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
968 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
969 | +# GNU General Public License for more details. |
970 | + |
971 | +# You should have received a copy of the GNU General Public License |
972 | +# along with this program. If not, see <https://www.gnu.org/licenses/>. |
973 | + |
974 | +# Authors: Sergio Durigan Junior <sergio.durigan@canonical.com> |
975 | + |
976 | +import psycopg2 |
977 | + |
978 | + |
979 | +class DebugDB: |
980 | + """Abstract a connection to the SQL database.""" |
981 | + |
982 | + def __init__(self): |
983 | + self._conn = psycopg2.connect("dbname=ubuntu-debuginfod user=mirror") |
984 | + |
985 | + def cursor(self): |
986 | + """Return the cursor to be used when dealing with the database.""" |
987 | + return self._conn.cursor() |
988 | + |
989 | + def commit(self): |
990 | + """Commit the transaction to the DB.""" |
991 | + self._conn.commit() |
992 | + |
993 | + def __enter__(self): |
994 | + return self |
995 | + |
996 | + def __exit__(self, exc_type, exc_value, exc_traceback): |
997 | + self._conn.commit() |
998 | + self._conn.close() |
999 | + self._conn = None |
1000 | + |
1001 | + def __del__(self): |
1002 | + if self._conn is not None: |
1003 | + self._conn.commit() |
1004 | + self._conn.close() |
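The task bookkeeping in this diff (the poller's `ensure_task` inserting a row, and the `task_postrun` handler deleting it on success) can be tried out in isolation. The following is a minimal sketch, not the code from the branch: it assumes the standard-library `sqlite3` module with an in-memory database as a stand-in for PostgreSQL/psycopg2 so that it runs without a server, and `make_db` and `finish_task` are hypothetical helper names introduced only for illustration. It does not model concurrent writers.

```python
import sqlite3


def make_db():
    """Create an in-memory database with a tasks table (stand-in for PostgreSQL)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE IF NOT EXISTS tasks "
        "(id INTEGER PRIMARY KEY, source TEXT, version TEXT, arch TEXT)"
    )
    return conn


def ensure_task(conn, msg):
    """Return (True, jobid) if msg describes a new task, else (False, None)."""
    key = (msg["source_package"], msg["version"], msg["architecture"])
    cur = conn.cursor()
    cur.execute(
        "SELECT id FROM tasks WHERE source = ? AND version = ? AND arch = ?",
        key,
    )
    # sqlite3 does not report a row count for SELECT, so fetch a row instead.
    if cur.fetchone() is not None:
        return False, None

    cur.execute("INSERT INTO tasks (source, version, arch) VALUES (?, ?, ?)", key)
    jobid = cur.lastrowid
    conn.commit()
    return True, jobid


def finish_task(conn, jobid):
    """Remove the row of a task that succeeded (hypothetical helper name)."""
    conn.execute("DELETE FROM tasks WHERE id = ?", (jobid,))
    conn.commit()


if __name__ == "__main__":
    conn = make_db()
    msg = {"source_package": "hello", "version": "2.10-2", "architecture": "amd64"}
    print(ensure_task(conn, msg))  # first sighting: task is recorded
    print(ensure_task(conn, msg))  # duplicate while pending: rejected
```

In this sketch a message is rejected only while its row exists; once `finish_task` has removed the row, the same message would be accepted again, which mirrors how the diff cleans up state only for tasks that ended in `SUCCESS`.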
Quick review limited to just commit 8595463: yes, this looks good. Since that commit is a follow-up to the prior MP, feel free to land it independently if you'd like.
The rest of this MP will take longer to digest.