Merge ~litios/ubuntu-cve-tracker:feature/data-lib into ubuntu-cve-tracker:master
Status: | Merged |
---|---|
Approved by: | Eduardo Barretto |
Approved revision: | 38ffc5e353ce07574295f3d214cd3b66c1a86106 |
Merge reported by: | David Fernandez Gonzalez |
Merged at revision: | 38ffc5e353ce07574295f3d214cd3b66c1a86106 |
Proposed branch: | ~litios/ubuntu-cve-tracker:feature/data-lib |
Merge into: | ubuntu-cve-tracker:master |
Diff against target: | 1000 lines (+955/-0), 7 files modified: scripts/datalib/__init__.py (+5/-0), scripts/datalib/config.py (+18/-0), scripts/datalib/models.py (+245/-0), scripts/datalib/storage.py (+86/-0), scripts/datalib/uct_models.py (+259/-0), scripts/datalib/uct_storage.py (+322/-0), scripts/datalib/utils.py (+20/-0) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Eduardo Barretto | Approve | ||
Nick Galanis | Approve | ||
Commit message
Description of the change
This library provides a project-agnostic interface for loading CVEs and Packages.
This library loads all the information from the different sources into well-defined objects with shared interfaces.
The storage classes load these objects and manage them to avoid memory duplication.
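As a rough illustration of the storage idea described above, the sketch below keeps a single object per package name and hands the same instance back on every lookup. `Package`, `PackageStore` and the version strings are hypothetical stand-ins, not the classes proposed in this branch.

```python
from typing import Dict, Optional, Set


class Package:
    """Hypothetical stand-in for a source package model."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.versions: Set[str] = set()


class PackageStore:
    """Keeps exactly one Package object per name."""

    def __init__(self) -> None:
        self._packages: Dict[str, Package] = {}

    def add(self, name: str, version: str) -> Package:
        # Reuse the stored object when the name is already known,
        # so every caller shares one instance instead of a copy.
        pkg = self._packages.get(name)
        if pkg is None:
            pkg = Package(name)
            self._packages[name] = pkg
        pkg.versions.add(version)
        return pkg

    def get(self, name: str) -> Optional[Package]:
        # Unknown names yield None rather than raising.
        return self._packages.get(name)


store = PackageStore()
first = store.add("example-pkg", "1.0-1")
second = store.add("example-pkg", "1.0-2")
print(first is second, sorted(second.versions))
```

Because both calls return the same object, anything that holds a reference to the package sees the new version without a second copy being created.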
-----
David Fernandez Gonzalez (litios) wrote : | # |
- 558d0f0... by David Fernandez Gonzalez
-
Revert "[Data lib] Add docs"
This reverts commit eaae95160bbf4c29a076db144afad562c85844e5.
David Fernandez Gonzalez (litios) wrote : | # |
The docs were not meant to be merged; they were for the PR itself.
I'm reverting the change so you can still see them, but I'll simply drop the last two commits when merging.
Nick Galanis (nickgalanis) wrote : | # |
LGTM! Thanks David, amazing work. It's a very clean and abstract representation of our data. I ran the script you provided and it was functional. Some things to consider (as already discussed):
- Ensure python3.8 compatibility for focal machines
- Possibly revisit the multi-threading approach, which could cause low-memory machines to crash
We would need to add/change some fields/functions as we move forward, but for now, I think it is safe to merge.
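One way to approach the low-memory concern, sketched under assumptions: cap the number of worker processes and fall back to a plain loop when parallel loading is switched off. `UCTLIB_THREADING` appears in the proposed `config.py`, but the string parsing here, the `UCTLIB_MAX_WORKERS` variable and both function names are hypothetical.

```python
import os
from multiprocessing import Pool
from typing import Callable, List, Mapping, Optional


def worker_count(env: Mapping[str, str], cpus: Optional[int]) -> int:
    """Return how many worker processes to use (1 means no pool at all)."""
    raw = env.get("UCTLIB_THREADING", "1").strip().lower()
    if raw in ("0", "false", "no", "off"):
        return 1
    available = cpus or 1  # os.cpu_count() may return None
    limit = env.get("UCTLIB_MAX_WORKERS")  # hypothetical variable
    if limit is None:
        return available
    return max(1, min(available, int(limit)))


def load_all(releases: List[str], loader: Callable[[str], object], workers: int) -> list:
    """Load every release, serially or with a bounded process pool."""
    if workers <= 1:
        # Serial path: one release in memory at a time while loading.
        return [loader(release) for release in releases]
    # Parallel path: the loader must be picklable (a module-level function).
    with Pool(processes=workers) as pool:
        return pool.map(loader, releases)


# Serial run with a trivial loader, just to show the call shape.
workers = worker_count({"UCTLIB_THREADING": "0"}, os.cpu_count())
print(load_all(["focal", "jammy"], str.upper, workers))
```

Parsing the variable explicitly matters because environment values are always strings, so a value such as `"false"` would otherwise count as true.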
Diogo Sousa (0xdsousa) wrote : | # |
Absolutely loving the type hints on everything, great work.
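On the type hints and the Python 3.8 point raised earlier: built-in generics such as `list[str]` are only subscriptable from Python 3.9, so class-level annotations written that way raise `TypeError` on 3.8. One possible way to keep the hints, sketched below with simplified stand-in classes, is to use the `typing` aliases; adding `from __future__ import annotations` at the top of each module is another option. Whether either was adopted in this branch is not shown here.

```python
from typing import Dict, List, Set


class BinaryPackage:
    # typing.List is subscriptable on Python 3.8; list[str] needs 3.9+.
    name: str
    arches: List[str]

    def __init__(self, name: str, arches: List[str]) -> None:
        self.name = name
        self.arches = arches


class SourcePackage:
    name: str
    source_versions: Set[str]
    # Key and value types are separated by a comma, not a colon.
    binaries: Dict[str, List[BinaryPackage]]

    def __init__(self, name: str) -> None:
        self.name = name
        self.source_versions = set()
        self.binaries = {}


src = SourcePackage("example-src")
src.source_versions.add("1.0-1")
src.binaries["1.0-1"] = [BinaryPackage("example-bin", ["amd64", "arm64"])]
```

A related detail: an annotation like `dict[Version:list[BinaryPackage]]` is parsed as a slice, so it runs on 3.9+ but type checkers expect the comma form used above.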
- 9531bb9... by David Fernandez Gonzalez
-
[DATA LIB] Allow getting source versions per release
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 6f1f847... by David Fernandez Gonzalez
-
[DATA LIB] Don't link to object if it hasn't been loaded
Signed-off-by: David Fernandez Gonzalez <email address hidden>
David Fernandez Gonzalez (litios) wrote (last edit ): | # |
Adding two more commits:
* Include a feature to retrieve source versions from a package based on the release. Test from the example above:
Showing bionic versions {2.6-1, 2.5-1}
Showing bionic versions, including esm releases {2.6-1, 2.6-1ubuntu0.
* Fix a bug for USNs when the CVE or the package had not been loaded at the time the USN was loaded. In that case, they are no longer linked to the USN.
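The per-release lookup from the first bullet can be pictured roughly as follows. This is a simplified sketch, not the method in the diff: it works on plain strings, and the mapping, the release names and `release_versions` are made up for illustration.

```python
from typing import Dict, Optional, Set

# Hypothetical data: source version -> release it was published in.
VERSION_RELEASE: Dict[str, str] = {
    "1.0-1": "bionic",
    "1.0-2": "bionic",
    "1.0-2+esm1": "esm-infra/bionic",
    "1.2-3": "focal",
}


def release_versions(info: Dict[str, str], release: str,
                     esm_release: Optional[str] = None) -> Set[str]:
    """Return the versions published in `release`, optionally with its ESM release."""
    wanted = {release}
    if esm_release is not None:
        # Including ESM simply widens the set of accepted releases.
        wanted.add(esm_release)
    return {version for version, rel in info.items() if rel in wanted}


print(sorted(release_versions(VERSION_RELEASE, "bionic")))
print(sorted(release_versions(VERSION_RELEASE, "bionic", "esm-infra/bionic")))
```

The second call returns a superset of the first, which mirrors the difference between the two outputs quoted in the comment.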
- 296894d... by David Fernandez Gonzalez
-
[Data lib] Use apt_pkg for version comparison
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 38ffc5e... by David Fernandez Gonzalez
-
[DATA LIB] Store CVEs, packages and SNs ordered.
For CVE and SNs, sort them based on the integer
values rather than standard ascii comparison.
Signed-off-by: David Fernandez Gonzalez <email address hidden>
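To see why integer ordering matters here, the sketch below compares plain string sorting with a numeric key. The diff builds a single integer key per ID; this version uses a tuple key instead, which orders the same way. The helper names and the sample IDs are illustrative only, and the `<number>-<revision>` shape for SN IDs is an assumption.

```python
from typing import Tuple


def cve_sort_key(cve_id: str) -> Tuple[int, int]:
    """Split 'CVE-<year>-<number>' into integers for numeric ordering."""
    _, year, number = cve_id.split("-")
    return int(year), int(number)


def sn_sort_key(sn_id: str) -> Tuple[int, int]:
    """Split '<number>-<revision>' into integers for numeric ordering."""
    number, revision = sn_id.split("-")
    return int(number), int(revision)


cves = ["CVE-2024-10000", "CVE-2023-12345", "CVE-2024-9999"]
print(sorted(cves))                    # string order
print(sorted(cves, key=cve_sort_key))  # numeric order
```

With string comparison, `CVE-2024-10000` sorts before `CVE-2024-9999` because `'1'` is less than `'9'`; the numeric key puts them in the expected order.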
Eduardo Barretto (ebarretto) wrote : | # |
lgtm, thanks!
Preview Diff
1 | diff --git a/scripts/datalib/__init__.py b/scripts/datalib/__init__.py |
2 | new file mode 100644 |
3 | index 0000000..24bda8c |
4 | --- /dev/null |
5 | +++ b/scripts/datalib/__init__.py |
6 | @@ -0,0 +1,5 @@ |
7 | +from .config import * |
8 | +from .models import * |
9 | +from .uct_models import * |
10 | +from .storage import * |
11 | +from .uct_storage import * |
12 | diff --git a/scripts/datalib/config.py b/scripts/datalib/config.py |
13 | new file mode 100644 |
14 | index 0000000..6869d92 |
15 | --- /dev/null |
16 | +++ b/scripts/datalib/config.py |
17 | @@ -0,0 +1,18 @@ |
18 | +#!/usr/bin/python3 |
19 | +# -*- coding: utf-8 -*- |
20 | +# Module containing classes that represent CVE and Package data |
21 | +# |
22 | +# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com> |
23 | +# Copyright (C) 2024 Canonical Ltd. |
24 | +# |
25 | +# This script is distributed under the terms and conditions of the GNU General |
26 | +# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html |
27 | +# for details. |
28 | +# |
29 | +import os |
30 | + |
31 | +VERBOSE = os.environ.get('UCTLIB_VERBOSE', True) |
32 | +THREADING = os.environ.get('UCTLIB_THREADING', True) |
33 | +UCT = os.environ.get('UCT') |
34 | +PKG_CACHE_DIR = os.environ.get('UCTLIB_PKG_CACHE_DIR', '~/.cache/pkg-cache') |
35 | +USN_DATABASE = os.environ.get('UCTLIB_USN_DATABASE_DIR', '~/.cache/usn/database.json') |
36 | diff --git a/scripts/datalib/models.py b/scripts/datalib/models.py |
37 | new file mode 100644 |
38 | index 0000000..adc3e72 |
39 | --- /dev/null |
40 | +++ b/scripts/datalib/models.py |
41 | @@ -0,0 +1,245 @@ |
42 | +#!/usr/bin/python3 |
43 | +# -*- coding: utf-8 -*- |
44 | +# Module containing classes that represent CVE and Package data |
45 | +# |
46 | +# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com> |
47 | +# Copyright (C) 2024 Canonical Ltd. |
48 | +# |
49 | +# This script is distributed under the terms and conditions of the GNU General |
50 | +# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html |
51 | +# for details. |
52 | +# |
53 | + |
54 | +import re |
55 | +import datetime |
56 | +import apt_pkg |
57 | +apt_pkg.init_system() |
58 | + |
59 | +class Version: |
60 | + version: str |
61 | + |
62 | + def __init__(self, version) -> None: |
63 | + self.version = version |
64 | + |
65 | + def compare_version(self, other: 'Version') -> int: |
66 | + if not hasattr(other, 'version'): raise Exception("Cannot compare") |
67 | + return apt_pkg.version_compare(self.version, other.version) |
68 | + |
69 | + def __eq__(self, other: 'Version') -> bool: |
70 | + if not hasattr(other, 'version'): raise Exception("Cannot compare") |
71 | + return self.compare_version(other) == 0 |
72 | + |
73 | + def __lt__(self, other: 'Version') -> bool: |
74 | + if not hasattr(other, 'version'): raise Exception("Cannot compare") |
75 | + return self.compare_version(other) < 0 |
76 | + |
77 | + def __hash__(self) -> int: |
78 | + return hash(self.version) |
79 | + |
80 | + def __repr__(self) -> str: |
81 | + return self.version |
82 | + |
83 | +class BinaryPackage: |
84 | + name: str |
85 | + version: Version |
86 | + arches: list[str] |
87 | + source: 'SourcePackage' |
88 | + |
89 | + def __init__(self, name: str, version: Version, arches: list[str], source: 'SourcePackage') -> None: |
90 | + self.name = name |
91 | + self.version = version |
92 | + self.arches = arches |
93 | + self.source = source |
94 | + |
95 | + def __eq__(self, other: object) -> bool: |
96 | + return self.name == other.name and \ |
97 | + self.version == other.version and \ |
98 | + self.source == other.source |
99 | + |
100 | +class SourcePackage: |
101 | + name: str |
102 | + source_versions: set[Version] |
103 | + binaries: dict[Version:list[BinaryPackage]] |
104 | + |
105 | + def __init__(self, name: str, binaries: dict[Version:list[BinaryPackage]]) -> None: |
106 | + self.name = name |
107 | + self.binaries = binaries |
108 | + self.source_versions = set() |
109 | + |
110 | + def __eq__(self, other: object) -> bool: |
111 | + return self.name == other.name and \ |
112 | + self.source_versions == other.source_versions and \ |
113 | + self.binaries == other.binaries |
114 | + |
115 | + def is_same_package(self, other: object) -> bool: |
116 | + """Is same package but different version""" |
117 | + return self.name == other.name |
118 | + |
119 | + def get_source_versions(self) -> set[Version]: |
120 | + return self.source_versions |
121 | + |
122 | + def get_binaries(self, version: Version) -> list[BinaryPackage]: |
123 | + if version not in self.binaries: return [] |
124 | + return self.binaries[version] |
125 | + |
126 | + def add_source_version(self, version: Version) -> None: |
127 | + if version in self.source_versions: return |
128 | + self.source_versions.add(version) |
129 | + self.binaries[version] = [] |
130 | + |
131 | + def add_binary(self, src_version: Version, binary: BinaryPackage) -> None: |
132 | + if src_version not in self.binaries: return |
133 | + if binary not in self.binaries[src_version]: |
134 | + self.binaries[src_version].append(binary) |
135 | + |
136 | + def is_binary_present(self, src_version: Version, binary: BinaryPackage) -> bool: |
137 | + if src_version not in self.binaries: return False |
138 | + return binary in self.binaries[src_version] |
139 | + |
140 | +class CVSS: |
141 | + entity: str |
142 | + score: float |
143 | + severity: str |
144 | + vector: str |
145 | + |
146 | + def __init__(self, entity: str, score: float, vector: str) -> None: |
147 | + self.entity = entity |
148 | + self.score = score |
149 | + self.vector = vector |
150 | + if self.score == 0.0: |
151 | + self.severity = 'None' |
152 | + elif self.score < 4.0: |
153 | + self.severity = 'Low' |
154 | + elif self.score < 7.0: |
155 | + self.severity = 'Medium' |
156 | + elif self.score < 9.0: |
157 | + self.severity = 'High' |
158 | + elif self.score <= 10.0: |
159 | + self.severity = 'Critical' |
160 | + else: |
160 | + raise Exception(f'{self.score} is not a valid CVSS score.') |
162 | + |
163 | + def __lt__(self, other): |
164 | + return self.score < other.score |
165 | + |
166 | + def __repr__(self) -> str: |
167 | + return f'[{self.severity}] {self.vector} ({self.score}) -- {self.entity}' |
168 | + |
169 | +class CVE: |
170 | + id: str |
171 | + ID_REGEX = r'CVE-\d{4}-\d{4,7}' |
172 | + description: str |
173 | + public_date: str |
174 | + cvss: list[CVSS] |
175 | + assigned_to: str |
176 | + notes: list[tuple] |
177 | + references: list[str] |
178 | + priority: str |
179 | + priority_reason: str |
180 | + pkg_entries: list["CVEPkgEntry"] |
181 | + |
182 | + def __init__(self, id: str, description: str, public_date: str, cvss: list[CVSS], |
183 | + assigned_to: str, notes: list[tuple], references: list[str], |
184 | + priority: str, priority_reason: str) -> None: |
185 | + |
186 | + if not re.search(self.ID_REGEX, id): |
187 | + raise Exception(f'{id} is not a valid CVE ID') |
188 | + |
189 | + self.id = id |
190 | + self.description = description.strip() |
191 | + self.public_date = public_date |
192 | + self.cvss = cvss |
193 | + self.assigned_to = assigned_to |
194 | + self.notes = notes |
195 | + self.references = references |
196 | + self.priority = priority |
197 | + self.priority_reason = priority_reason |
198 | + self.pkg_entries = [] |
199 | + |
200 | + def __lt__(self, other): |
201 | + if not hasattr(other, 'id'): raise Exception("Cannot compare") |
202 | + _, our_year, our_number = self.id.split('-') |
203 | + _, other_year, other_number = other.id.split('-') |
204 | + |
205 | + return int(our_year) < int(other_year) or \ |
206 | + int(our_year) == int(other_year) and int(our_number) < int(other_number) |
207 | + |
208 | + def add_reference(self, ref: str) -> None: |
209 | + if ref not in self.references: |
210 | + self.references.append(ref) |
211 | + |
212 | + def add_note(self, author: str, note: str) -> None: |
213 | + self.notes.append((author, note)) |
214 | + |
215 | + def set_assigned_to(self, assigned_to: str) -> None: |
216 | + self.assigned_to = assigned_to |
217 | + |
218 | + def is_package_present(self, package: SourcePackage) -> bool: |
219 | + for cve_entry in self.pkg_entries: |
220 | + if package == cve_entry.pkg: |
221 | + return True |
222 | + |
223 | + return False |
224 | + |
225 | + def get_package_entry(self, package: SourcePackage) -> 'CVEPkgEntry': |
226 | + for cve_entry in self.pkg_entries: |
227 | + if package == cve_entry.pkg: |
228 | + return cve_entry |
229 | + |
230 | + return None |
231 | + |
232 | + def add_cve_entry(self, entry: 'CVEPkgEntry') -> bool: |
233 | + for self_entry in self.pkg_entries: |
234 | + if self_entry.pkg == entry.pkg: |
235 | + return False |
236 | + self.pkg_entries.append(entry) |
237 | + return True |
238 | + |
239 | + def needs_triage(self) -> bool: |
240 | + if self.priority == 'needs-triage': return True |
241 | + for cve_entry in self.pkg_entries: |
242 | + if cve_entry.status == 'needs-triage': |
243 | + return True |
244 | + |
245 | + return False |
246 | + |
247 | + def __eq__(self, other: object) -> bool: |
248 | + return self.id == other.id |
249 | + |
250 | + def __repr__(self) -> str: |
251 | + return str(vars(self)) |
252 | + |
253 | + |
254 | +class CVEPkgEntry: |
255 | + pkg: SourcePackage |
256 | + cve: CVE |
257 | + status: str |
258 | + note: str |
259 | + |
260 | + STATUSES = ['needs-triage', 'needed', 'not-affected', 'released', 'ignored', 'deferred', 'DNE', 'pending'] |
261 | + def __init__(self, pkg: SourcePackage, cve: CVE, status: str, note: str) -> None: |
262 | + self.pkg = pkg |
263 | + self.cve = cve |
264 | + |
265 | + if status not in self.STATUSES: |
266 | + raise Exception(f'{status} is not a valid status') |
267 | + |
268 | + self.status = status |
269 | + self.note = note |
270 | + |
271 | +class SN: |
272 | + id: str |
273 | + cves: set[CVE] |
274 | + packages: set[SourcePackage] |
275 | + package_fixed_versions: dict[SourcePackage:Version] |
276 | + lp_bugs: set[str] |
277 | + description: str |
278 | + date: datetime.date |
279 | + |
280 | + def __init__(self, description: str, date: datetime.date, pkg_fixed_info: dict) -> None: |
281 | + self.cves = set() |
282 | + self.packages = set() |
283 | + self.lp_bugs = set() |
284 | + self.description = description |
285 | + self.date = date |
286 | + self.package_fixed_versions = pkg_fixed_info |
287 | \ No newline at end of file |
288 | diff --git a/scripts/datalib/storage.py b/scripts/datalib/storage.py |
289 | new file mode 100644 |
290 | index 0000000..fc5c271 |
291 | --- /dev/null |
292 | +++ b/scripts/datalib/storage.py |
293 | @@ -0,0 +1,86 @@ |
294 | +#!/usr/bin/python3 |
295 | +# -*- coding: utf-8 -*- |
296 | +# Module containing classes that represent CVE and Package data |
297 | +# |
298 | +# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com> |
299 | +# Copyright (C) 2024 Canonical Ltd. |
300 | +# |
301 | +# This script is distributed under the terms and conditions of the GNU General |
302 | +# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html |
303 | +# for details. |
304 | +# |
305 | +from datalib.uct_models import SourcePackage |
306 | +from .models import * |
307 | + |
308 | +class PackageStorage: |
309 | + """ |
310 | + Storage class for handling packages. |
311 | + """ |
312 | + packages: dict[str: SourcePackage] |
313 | + |
314 | + def __init__(self) -> None: |
315 | + self.packages = {} |
316 | + |
317 | + def _order_packages(self) -> None: |
318 | + self.packages = dict(sorted(self.packages.items())) |
319 | + |
320 | + def get_package(self, pkg_name: str) -> SourcePackage: |
321 | + if pkg_name not in self.packages: return None |
322 | + return self.packages[pkg_name] |
323 | + |
324 | +class CVEStorage: |
325 | + """ |
326 | + Storage class for handling CVE packages. |
327 | + """ |
328 | + cves: dict[str: CVE] |
329 | + package_storage: PackageStorage |
330 | + |
331 | + # TODO: add a changed cache so we can cache CVEs and only update when something got loaded |
332 | + |
333 | + def __init__(self) -> None: |
334 | + self.cves = {} |
335 | + |
336 | + def link_pkg_storage(self, package_storage: PackageStorage) -> None: |
337 | + self.package_storage = package_storage |
338 | + |
339 | + def _order_cves(self) -> None: |
340 | + self.cves = dict(sorted(self.cves.items(), |
341 | + key=lambda value: int(value[0].split('-')[1]) * 10 ** 8 + int(value[0].split('-')[2]))) |
342 | + |
343 | + def _init_cve(self, cve: CVE, pkg_data: dict) -> None: |
344 | + """Internal function to add CVE object to the class storage""" |
345 | + self.cves[cve.id] = (cve, pkg_data) |
346 | + |
347 | + def load_cve(self, cve_id: str) -> None: |
348 | + """Load a CVE into the storage""" |
349 | + raise NotImplementedError |
350 | + |
351 | + def get_cve(self, cve_id: str) -> CVE: |
352 | + """ |
353 | + Get a CVE from the storage. |
354 | + The package links will be computed when the |
355 | + CVE is requested, extracting them from the internal |
356 | + PackageStorage. |
357 | + """ |
358 | + raise NotImplementedError |
359 | + |
360 | +class SNStorage: |
361 | + sns: dict[str: SN] |
362 | + package_storage: PackageStorage |
363 | + cve_storage: CVEStorage |
364 | + |
365 | + def __init__(self) -> None: |
366 | + self.sns = {} |
367 | + |
368 | + def _order_sns(self) -> None: |
369 | + self.sns = dict(sorted(self.sns.items(), |
370 | + key=lambda value: int(value[0].split('-')[0]) * 10 ** 8 + int(value[0].split('-')[1]))) |
371 | + |
372 | + def link_pkg_storage(self, package_storage: PackageStorage) -> None: |
373 | + self.package_storage = package_storage |
374 | + |
375 | + def link_cve_storage(self, cve_storage: CVEStorage) -> None: |
376 | + self.cve_storage = cve_storage |
377 | + |
378 | + def load_sn(self, object: SN) -> None: |
379 | + raise NotImplementedError |
380 | \ No newline at end of file |
381 | diff --git a/scripts/datalib/uct_models.py b/scripts/datalib/uct_models.py |
382 | new file mode 100644 |
383 | index 0000000..7b9fe01 |
384 | --- /dev/null |
385 | +++ b/scripts/datalib/uct_models.py |
386 | @@ -0,0 +1,259 @@ |
387 | +#!/usr/bin/python3 |
388 | +# -*- coding: utf-8 -*- |
389 | +# Module containing classes that represent CVE and Package data |
390 | +# |
391 | +# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com> |
392 | +# Copyright (C) 2024 Canonical Ltd. |
393 | +# |
394 | +# This script is distributed under the terms and conditions of the GNU General |
395 | +# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html |
396 | +# for details. |
397 | +# |
398 | + |
399 | +from datalib.models import CVSS |
400 | +from .models import * |
401 | +from cve_lib import get_subproject_details, get_active_esm_releases, is_active_release |
402 | + |
403 | +class Release: |
404 | + name: str |
405 | + version: str |
406 | + codename: str |
407 | + product: str |
408 | + series: str |
409 | + is_esm: bool |
410 | + is_active: bool |
411 | + requires_oval: bool |
412 | + parent: "Release" |
413 | + |
414 | + def __init__(self, name: str) -> None: |
415 | + canon, product, series, details = get_subproject_details(name) |
416 | + if not canon: |
417 | + self.name = name |
418 | + self.parent = None |
419 | + return |
420 | + self.name = canon |
421 | + self.product = product |
422 | + self.series = series |
423 | + self.is_esm = self.name in get_active_esm_releases() |
424 | + self.is_active = is_active_release(self.name) |
425 | + self.parent = None |
426 | + self.version = None |
427 | + self.codename = None |
428 | + if not details: return |
429 | + if 'oval' in details: |
430 | + self.requires_oval = details['oval'] |
431 | + else: |
432 | + self.requires_oval = True |
433 | + if 'parent' in details: |
434 | + self.parent = Release(details['parent']) |
435 | + if 'version' in details: |
436 | + self.version = details['version'] |
437 | + if 'codename' in details: |
438 | + self.codename = details['codename'] |
439 | + |
440 | + def __eq__(self, value: object) -> bool: |
441 | + return self.name == value.name |
442 | + |
443 | + def __repr__(self) -> str: |
444 | + return self.name |
445 | + |
446 | + def __hash__(self): |
447 | + return hash(self.name) |
448 | + |
449 | + def get_esm_release(self, component: str) -> "Release": |
450 | + esm_releases = get_active_esm_releases() |
451 | + potential_releases = list(filter(lambda release: self.series in release, esm_releases)) |
452 | + if len(potential_releases) == 1: |
453 | + return Release(potential_releases[0]) |
454 | + elif len(potential_releases) == 2: |
455 | + if component in ['main', 'restricted']: |
456 | + target = 'esm-infra' |
457 | + elif component in ['universe', 'multiverse']: |
458 | + target = 'esm-apps' |
459 | + |
460 | + if target in potential_releases[0]: return Release(potential_releases[0]) |
461 | + else: return Release(potential_releases[1]) |
462 | + |
463 | + return None |
464 | + def is_parent_release(self, release: "Release") -> bool: |
465 | + return release in self.get_all_parents() |
466 | + |
467 | + def get_direct_parent(self) -> "Release": |
468 | + return self.parent |
469 | + |
470 | + def get_all_parents(self) -> "Release": |
471 | + release = self |
472 | + all_parents = [] |
473 | + while release.parent: |
474 | + release = release.parent |
475 | + all_parents.append(release) |
476 | + return all_parents |
477 | + |
478 | + def get_oldest_parent(self) -> "Release": |
479 | + if not self.parent: return None |
480 | + return self.get_all_parents()[-1] |
481 | + |
482 | +class UbuntuVersion(Version): |
483 | + def is_esm(self) -> bool: |
484 | + return 'esm' in self.version |
485 | + |
486 | +class UbuntuBinaryPackage(BinaryPackage): |
487 | + component: str |
488 | + |
489 | + def __init__(self, name: str, version: UbuntuVersion, arches: list[str], source: 'UbuntuSourcePackage', component: str) -> None: |
490 | + self.component = component |
491 | + super().__init__(name, version, arches, source) |
492 | + |
493 | +class UbuntuSourcePackage(SourcePackage): |
494 | + source_info: dict[UbuntuVersion: dict] |
495 | + binaries: dict[UbuntuVersion:list[UbuntuBinaryPackage]] |
496 | + |
497 | + def __init__(self, name, binaries) -> None: |
498 | + self.source_info = {} |
499 | + super().__init__(name, binaries) |
500 | + |
501 | + def add_source_version(self, version: UbuntuVersion, release: Release, component: str, pocket: str): |
502 | + if version in self.source_versions: return |
503 | + self.source_info[version] = { |
504 | + 'pocket': pocket, |
505 | + 'component': component, |
506 | + 'release': release |
507 | + } |
508 | + return super().add_source_version(version) |
509 | + |
510 | + def get_version_release(self, version: UbuntuVersion) -> Release: |
511 | + if version not in self.source_versions: return None |
512 | + return self.source_info[version]['release'] |
513 | + |
514 | + def get_version_component(self, version: UbuntuVersion) -> str: |
515 | + if version not in self.source_versions: return None |
516 | + return self.source_info[version]['component'] |
517 | + |
518 | + def get_version_pocket(self, version: UbuntuVersion) -> str: |
519 | + if version not in self.source_versions: return None |
520 | + return self.source_info[version]['pocket'] |
521 | + |
522 | + def get_release_source_versions(self, release: Release, include_parents: bool = False, include_esm: bool = False) -> set[UbuntuVersion]: |
523 | + rel_versions = set() |
524 | + for source_version in self.source_versions: |
525 | + source_version_rel = self.get_version_release(source_version) |
526 | + component = self.get_version_component(source_version) |
527 | + if (include_parents and release.is_parent_release(source_version_rel)) or \ |
528 | + (include_esm and source_version_rel == release.get_esm_release(component)) or \ |
529 | + source_version_rel == release: |
530 | + rel_versions.add(source_version) |
531 | + |
532 | + return rel_versions |
533 | + |
534 | + def get_latest_version(self, release: Release = None, include_parents: bool = False, include_esm: bool = False) -> UbuntuVersion: |
535 | + if release: |
536 | + source_versions = self.get_release_source_versions(release, include_parents, include_esm) |
537 | + else: |
538 | + source_versions = self.get_source_versions() |
539 | + |
540 | + if not source_versions: return None |
541 | + |
542 | + latest = None |
543 | + for source_version in source_versions: |
544 | + if not latest or source_version > latest: |
545 | + latest = source_version |
546 | + |
547 | + return latest |
548 | + |
549 | + def get_earliest_version(self, release: Release = None, include_parents: bool = False, include_esm: bool = False) -> UbuntuVersion: |
550 | + if release: |
551 | + source_versions = self.get_release_source_versions(release, include_parents, include_esm) |
552 | + else: |
553 | + source_versions = self.get_source_versions() |
554 | + |
555 | + if not source_versions: return None |
556 | + |
557 | + earliest = None |
558 | + for source_version in source_versions: |
559 | + if not earliest or source_version < earliest: |
560 | + earliest = source_version |
561 | + |
562 | + return earliest |
563 | + |
564 | + def expand_versions(self, other: "UbuntuSourcePackage") -> None: |
565 | + self.source_versions.update(other.source_versions) |
566 | + self.source_info.update(other.source_info) |
567 | + self.binaries.update(other.binaries) |
568 | + |
569 | + def remove_version(self, version: UbuntuVersion) -> None: |
570 | + self.source_versions.remove(version) |
571 | + del self.source_info[version] |
572 | + del self.binaries[version] |
573 | + |
574 | + def release_exists(self, release: Release, include_parents: bool) -> bool: |
575 | + """ |
576 | + Check if release exists in the package. |
577 | + |
578 | + If include_parents, it will also check if any of the release |
579 | + parents are listed. |
580 | + """ |
581 | + for _, entry in self.source_info.items(): |
582 | + if release == entry['release']: |
583 | + return True |
584 | + if include_parents and release.is_parent_release(entry['release']): |
585 | + return True |
586 | + |
587 | + return False |
588 | + |
589 | + def __repr__(self) -> str: |
590 | + return f'{self.name}' |
591 | + |
592 | + def __hash__(self) -> int: |
593 | + return hash(self.name) |
594 | + |
595 | +class UbuntuCVEPkgEntry(CVEPkgEntry): |
596 | + release: Release |
597 | + |
598 | + def __init__(self, pkg: UbuntuSourcePackage, cve: 'UbuntuCVE', status: str, note: str, release: Release) -> None: |
599 | + self.release = release |
600 | + super().__init__(pkg, cve, status, note) |
601 | + |
602 | +class UbuntuCVE(CVE): |
603 | + pkg_entries: list[UbuntuCVEPkgEntry] |
604 | + |
605 | + def __init__(self, id: str, description: str, public_date: str, cvss: list[CVSS], assigned_to: str, notes: list[tuple], references: list[str], priority: str, priority_reason: str) -> None: |
606 | + super().__init__(id, description, public_date, cvss, assigned_to, notes, references, priority, priority_reason) |
607 | + |
608 | + def is_rel_present(self, release: Release) -> bool: |
609 | + for entry in self.pkg_entries: |
610 | + if entry.release == release: return True |
611 | + return False |
612 | + |
613 | + def add_cve_entry(self, entry: UbuntuCVEPkgEntry) -> bool: |
614 | + for self_entry in self.pkg_entries: |
615 | + if self_entry.pkg == entry.pkg and \ |
616 | + self_entry.release == entry.release: |
617 | + return False |
618 | + self.pkg_entries.append(entry) |
619 | + return True |
620 | + |
621 | + def __repr__(self) -> str: |
622 | + return self.id |
623 | + |
624 | + def __hash__(self) -> int: |
625 | + return hash(self.id) |
626 | + |
627 | + |
628 | +class USN(SN): |
629 | + package_fixed_versions: dict[SourcePackage:list[tuple[Version, Release]]] |
630 | + releases: set[Release] |
631 | + |
632 | + def __init__(self, id: str, data: dict, cve_objs: set, package_fixed_versions: dict, lp_bugs: set): |
633 | + for item in ['description', 'releases', 'title', 'timestamp', 'summary', 'action', 'id', 'isummary']: |
634 | + if item in data: |
635 | + setattr(self, item, data[item]) |
636 | + else: |
637 | + setattr(self, item, None) |
638 | + |
639 | + self.id = id |
640 | + self.lp_bugs = lp_bugs |
641 | + self.cves = cve_objs |
642 | + self.package_fixed_versions = package_fixed_versions |
643 | + |
644 | + def is_rel_present(self, release: Release) -> bool: |
645 | + return release in self.releases |
646 | diff --git a/scripts/datalib/uct_storage.py b/scripts/datalib/uct_storage.py |
647 | new file mode 100644 |
648 | index 0000000..f0e6232 |
649 | --- /dev/null |
650 | +++ b/scripts/datalib/uct_storage.py |
651 | @@ -0,0 +1,322 @@ |
652 | +#!/usr/bin/python3 |
653 | +# -*- coding: utf-8 -*- |
654 | +# Module containing classes that represent CVE and Package data |
655 | +# |
656 | +# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com> |
657 | +# Copyright (C) 2024 Canonical Ltd. |
658 | +# |
659 | +# This script is distributed under the terms and conditions of the GNU General |
660 | +# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html |
661 | +# for details. |
662 | +# |
663 | +from .storage import * |
664 | +from .uct_models import * |
665 | +from .config import PKG_CACHE_DIR, USN_DATABASE |
666 | +from .utils import get_active_ubuntu_releases |
667 | +from .config import UCT |
668 | + |
669 | +import json |
670 | +import os |
671 | +import cve_lib |
672 | +from multiprocessing import Pool |
673 | + |
674 | +class UCTPackageStorage(PackageStorage): |
675 | + """ |
676 | + Storage class for handling UCT packages. |
677 | + """ |
678 | + packages: dict[str: UbuntuSourcePackage] |
679 | + latest_date_created: dict[Release:datetime.datetime] |
680 | + loaded_releases: set[Release] |
681 | + |
682 | + def __init__(self) -> None: |
683 | + self.loaded_releases = set() |
684 | + self.latest_date_created = {} |
685 | + super().__init__() |
686 | + |
687 | + def load(self, filter_releases: list = []) -> None: |
688 | + final_releases = [] |
689 | + active_ubuntu_releases = get_active_ubuntu_releases() |
690 | + for release in active_ubuntu_releases: |
691 | + if filter_releases and release not in filter_releases: continue |
692 | + final_releases.append(release) |
693 | + |
694 | + with Pool(processes=os.cpu_count()) as pool: |
695 | + print(f'Loading {len(final_releases)} releases') |
696 | + package_data = pool.map(self._load_release_from_cache, final_releases) |
697 | + |
698 | + for release, release_data, last_date_created in package_data: |
699 | + self._init_release(release, release_data, last_date_created) |
700 | + |
701 | + self._order_packages() |
702 | + |
703 | + def _load_release_from_cache(self, release: str) -> dict: |
704 | + data = {} |
705 | + |
706 | + try: |
707 | + with open(os.path.join(os.path.expanduser(PKG_CACHE_DIR), release.replace('/', '_') + '-pkg-cache.json')) as f: |
708 | + data = json.load(f) |
709 | + except FileNotFoundError as ex: |
710 | + print(ex) |
711 | + return (release, None, None) |
712 | + |
713 | + release_data = {} |
714 | + latest_date_created = None |
715 | + release_obj = Release(release) |
716 | + for src_pkg_name, src_pkg_versions in data.items(): |
717 | + if src_pkg_name == 'latest_date_created': |
718 | + latest_date_created = datetime.datetime.fromtimestamp(src_pkg_versions) |
719 | + continue |
720 | + source_package = UbuntuSourcePackage(src_pkg_name, {}) |
721 | + |
722 | + release_data[src_pkg_name] = source_package |
723 | + for src_pkg_version, src_pkg_version_data in src_pkg_versions.items(): |
724 | + src_pkg_version_obj = UbuntuVersion(src_pkg_version) |
725 | + source_package.add_source_version(src_pkg_version_obj, release_obj, |
726 | + src_pkg_version_data['component'], |
727 | + src_pkg_version_data['pocket']) |
728 | + |
729 | + for binary_name, binary_data in src_pkg_version_data['binaries'].items(): |
730 | + source_package.add_binary(src_pkg_version_obj, UbuntuBinaryPackage(binary_name, |
731 | + UbuntuVersion(binary_data['version']), |
732 | + binary_data['arch'], |
733 | + source_package, |
734 | + binary_data['component'])) |
735 | + |
736 | + return (release_obj, release_data, latest_date_created) |
737 | + |
738 | + def _init_release(self, release: Release, release_data: dict, latest_date_created: str) -> None: |
739 | + if not release_data: return |
740 | + for package_name in release_data: |
741 | + if package_name in self.packages: |
742 | + self.packages[package_name].expand_versions(release_data[package_name]) |
743 | + else: |
744 | + self.packages[package_name] = release_data[package_name] |
745 | + |
746 | + self.latest_date_created[release] = latest_date_created |
747 | + self.loaded_releases.add(release) |
748 | + |
749 | + def get_package(self, pkg_name: str) -> UbuntuSourcePackage: |
750 | + return super().get_package(pkg_name) |
751 | + |
752 | + def load_release(self, release:str) -> None: |
753 | + if Release(release) in self.loaded_releases: return |
754 | + release_obj, data, date = self._load_release_from_cache(release) |
755 | + self._init_release(release_obj, data, date) |
756 | + |
757 | + def unload_release(self, release: str) -> None: |
758 | + release = Release(release) |
759 | + if release not in self.loaded_releases: return |
760 | + self.loaded_releases.remove(release) |
761 | + |
762 | + for package_name, package in self.packages.items(): |
763 | + original_source_versions = package.source_versions.copy() |
764 | + for version in original_source_versions: |
765 | + ver_rel = package.get_version_release(version) |
766 | + if ver_rel == release: package.remove_version(version) |
767 | + |
768 | +class UCTCVEStorage(CVEStorage): |
769 | + """ |
770 | + Storage class for handling UCT CVE packages. |
771 | + """ |
772 | + cves: dict[str, UbuntuCVE] |
773 | + package_storage: UCTPackageStorage |
774 | + |
775 | + # TODO: add a changed cache so we can cache CVEs and only update when something got loaded |
776 | + |
777 | + def load(self, cves_filter: list = []) -> None: |
778 | + final_cves = [] |
779 | + cves_ids = self.get_uct_cve_ids() |
780 | + for cve_id in cves_ids: |
781 | + if cves_filter and cve_id not in cves_filter: continue |
782 | + final_cves.append(cve_id) |
783 | + |
784 | + with Pool(processes=os.cpu_count()) as pool: |
785 | + print(f'Loading {len(final_cves)} CVEs') |
786 | + cves = pool.map(self._uct_cve_loader, final_cves) |
787 | + |
788 | + for cve, cve_pkg_data in cves: |
789 | + if not cve: continue |
790 | + self._init_cve(cve, cve_pkg_data) |
791 | + |
792 | + self._order_cves() |
793 | + |
794 | + def _uct_cve_loader(self, cve_id: str) -> UbuntuCVE: |
795 | + """Internal function to load CVE from UCT""" |
796 | + data = {} |
797 | + for directory in cve_lib.cve_dirs: |
798 | + try: |
799 | + data = cve_lib.load_cve(os.path.join(UCT, directory, cve_id)) |
800 | + except: |
801 | + continue |
802 | + |
803 | + if not data: |
804 | + raise FileNotFoundError(f'Couldn\'t find CVE {cve_id}') |
805 | + |
806 | + cvss = [] |
807 | + for cvss_entry in data['CVSS']: |
808 | + if not cvss_entry: continue |
809 | + |
810 | + cvss.append(CVSS( |
811 | + cvss_entry['source'], |
812 | + float(cvss_entry['baseScore']), |
813 | + cvss_entry['vector'] |
814 | + )) |
815 | + |
816 | + try: |
817 | + cve = UbuntuCVE(data['Candidate'], data['Description'], data['PublicDate'], cvss, data['Assigned-to'], data['Notes'], data['References'], data['Priority'][0], data['Priority'][1]) |
818 | + except Exception as ex: |
819 | + print(ex) |
820 | + return (None, None) |
821 | + |
822 | + return (cve, data['pkgs']) |
823 | + |
824 | + def get_uct_cve_ids(self) -> list: |
825 | + """Return all CVE IDs identified from UCT""" |
826 | + |
827 | + cve_ids = [] |
828 | + for dir in cve_lib.cve_dirs: |
829 | + for (_, _, filenames) in os.walk(dir): |
830 | + for filename in filenames: |
831 | + if not re.search(CVE.ID_REGEX, filename): continue |
832 | + cve_ids.append(filename) |
833 | + break |
834 | + |
835 | + return cve_ids |
836 | + |
837 | + def load_cve(self, cve_id: str) -> None: |
838 | + """Load a CVE into the storage""" |
839 | + cve, cve_data = self._uct_cve_loader(cve_id) |
840 | + self._init_cve(cve, cve_data) |
841 | + |
842 | + def get_all_cves(self) -> list[UbuntuCVE]: |
843 | + """Get all CVEs from the storage""" |
844 | + cves = [] |
845 | + for cve_id in self.cves: |
846 | + cves.append(self.get_cve(cve_id)) |
847 | + return cves |
848 | + |
849 | + def get_all_rel_cves(self, release: Release) -> list[UbuntuCVE]: |
850 | + """Get all CVEs from the storage affecting a release""" |
851 | + cves = [] |
852 | + for cve_id in self.cves: |
853 | + cve = self.get_cve(cve_id) |
854 | + if cve.is_rel_present(release): |
855 | + cves.append(cve) |
856 | + return cves |
857 | + |
858 | + def populate_cve_entries(self, cve_id: str) -> None: |
859 | + cve, cve_pkg_data = self.cves[cve_id] |
860 | + cve.pkg_entries = list() |
861 | + for package_name, package_entries in cve_pkg_data.items(): |
862 | + package = self.package_storage.get_package(package_name) |
863 | + if not package: continue |
864 | + for release, status in package_entries.items(): |
865 | + release = Release(release) |
866 | + if not package.release_exists(release, include_parents=True): continue |
867 | + cve.add_cve_entry(UbuntuCVEPkgEntry(package, cve, status[0], status[1], release)) |
868 | + |
869 | + def get_cve(self, cve_id: str, with_pkg_links: bool = True) -> UbuntuCVE: |
870 | + """ |
871 | + Get a CVE from the storage. |
872 | + The package links will be computed when the |
873 | + CVE is requested, extracting them from the internal |
874 | + UCTPackageStorage. |
875 | + |
876 | + If with_pkg_links is disabled, links won't be computed |
877 | + """ |
878 | + if cve_id not in self.cves: return None |
879 | + |
880 | + cve, _ = self.cves[cve_id] |
881 | + cve.pkg_entries = list() |
882 | + if not with_pkg_links: return cve |
883 | + self.populate_cve_entries(cve_id) |
884 | + |
885 | + return cve |
886 | + |
887 | +class USNStorage(SNStorage): |
888 | + sns: dict[str, tuple[USN, list, dict]] |
889 | + package_storage: UCTPackageStorage |
890 | + cve_storage: UCTCVEStorage |
891 | + |
892 | + def load(self) -> None: |
893 | + usns_data = self._get_usn_database_data() |
894 | + print(f'Loading {len(usns_data)} USNs') |
895 | + for usn_id, usn_data in usns_data.items(): |
896 | + _, _, cves, lp_bugs, pkgs = self.process_usn_data((usn_id, usn_data)) |
897 | + self.add_usn(usn_id, usn_data, cves, lp_bugs, pkgs) |
898 | + |
899 | + self._order_sns() |
900 | + |
901 | + def _get_usn_database_data(self) -> dict: |
902 | + data = {} |
903 | + with open(os.path.expanduser(USN_DATABASE), 'r') as f: |
904 | + data.update(json.load(f)) |
905 | + return data |
906 | + |
907 | + def process_usn_data(self, usn_raw: tuple) -> tuple[str, dict, set, set, dict]: |
908 | + pkgs = {} |
909 | + lp_bugs = set() |
910 | + cves = set() |
911 | + usn_id = usn_raw[0] |
912 | + usn_data = usn_raw[1] |
913 | + # Package - fixed version loading |
914 | + for rel, info in usn_data['releases'].items(): |
915 | + # Get package |
916 | + for pkg, pkg_info in info['sources'].items(): |
917 | + pkgs.setdefault(pkg, []) |
918 | + pkgs[pkg].append((pkg_info['version'], rel)) |
919 | + |
920 | + # CVE loading |
921 | + for cve_text in usn_data['cves']: |
922 | + if 'launchpad.net' in cve_text: |
923 | + lp_bugs.add(cve_text) |
924 | + continue |
925 | + elif re.search(r'CVE-\d{4}-\d{4,7}', cve_text): |
926 | + cves.add(cve_text) |
927 | + |
928 | + return usn_id, usn_data, cves, lp_bugs, pkgs |
929 | + |
930 | + def add_usn(self, usn_id: str, usn_data: dict, cves: set, lp_bugs: set, pkgs: dict) -> None: |
931 | + self.sns[usn_id] = (USN(usn_id, usn_data, set(), dict(), lp_bugs), cves, pkgs) |
932 | + |
933 | + def get_usns_by_cve(self, cve_id: str) -> list[USN]: |
934 | + target_usns = [] |
935 | + for usn_id, usn_data in self.sns.items(): |
936 | + if cve_id in usn_data[1]: |
937 | + target_usns.append(self.get_usn(usn_id)) |
938 | + |
939 | + return target_usns |
940 | + |
941 | + def get_usn(self, usn_id: str) -> USN: |
942 | + if usn_id not in self.sns: return None |
943 | + usn, cves, pkgs = self.sns[usn_id] |
944 | + cve_mapping = set() |
945 | + pkg_mapping = dict() |
946 | + |
947 | + for cve_id in cves: |
948 | + cve = self.cve_storage.get_cve(cve_id) |
949 | + if not cve: continue |
950 | + cve_mapping.add(cve) |
951 | + |
952 | + for pkg_name in pkgs: |
953 | + pkg = self.package_storage.get_package(pkg_name) |
954 | + if not pkg: continue |
955 | + for version, release in pkgs[pkg_name]: |
956 | + release = Release(release) |
957 | + if not pkg.release_exists(release, include_parents=True): continue |
958 | + pkg_mapping.setdefault(pkg, []) |
959 | + version = UbuntuVersion(version) |
960 | + if version.is_esm(): |
961 | + for pkg_version in pkg.source_versions: |
962 | + if pkg.get_version_release(pkg_version) == release: |
963 | + release = release.get_esm_release(pkg.get_version_component(pkg_version)) |
964 | + break |
965 | + |
966 | + pkg_mapping[pkg].append((version, release)) |
967 | + |
968 | + usn.cves = cve_mapping |
969 | + usn.package_fixed_versions = pkg_mapping |
970 | + |
971 | + return usn |
972 | + |
973 | + |
974 | diff --git a/scripts/datalib/utils.py b/scripts/datalib/utils.py |
975 | new file mode 100644 |
976 | index 0000000..cc072e0 |
977 | --- /dev/null |
978 | +++ b/scripts/datalib/utils.py |
979 | @@ -0,0 +1,20 @@ |
980 | +#!/usr/bin/python3 |
981 | +# -*- coding: utf-8 -*- |
982 | +# Module containing classes that represent CVE and Package data |
983 | +# |
984 | +# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com> |
985 | +# Copyright (C) 2024 Canonical Ltd. |
986 | +# |
987 | +# This script is distributed under the terms and conditions of the GNU General |
988 | +# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html |
989 | +# for details. |
990 | +# |
991 | +import cve_lib |
992 | + |
993 | +def get_active_ubuntu_releases(): |
994 | + active_ubuntu_releases = [] |
995 | + for release in cve_lib.all_releases: |
996 | + if release not in cve_lib.eol_releases or release in cve_lib.get_active_releases_with_esm(): |
997 | + active_ubuntu_releases.append(release) |
998 | + |
999 | + return active_ubuntu_releases |
1000 | \ No newline at end of file |
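The `get_cve` docstring in the diff says package links are computed when a CVE is requested, not when it is loaded. A minimal sketch of that pattern follows. `PackageStore`, `CVEStore`, `Package`, and `CVE` are hypothetical stand-ins rather than the datalib classes, the CVE ID and package name are made up, and the status is reduced to a plain string.

```python
from dataclasses import dataclass, field


@dataclass
class Package:
    name: str
    releases: set = field(default_factory=set)


@dataclass
class CVE:
    id: str
    pkg_entries: list = field(default_factory=list)


class PackageStore:
    """Hypothetical stand-in for a package storage."""

    def __init__(self):
        self.packages = {}

    def get_package(self, name):
        return self.packages.get(name)


class CVEStore:
    """Hypothetical stand-in for a CVE storage that links lazily."""

    def __init__(self, package_store):
        self.package_store = package_store
        # cve_id -> (CVE object, raw {package: {release: status}} data)
        self.cves = {}

    def add(self, cve, raw_pkg_data):
        self.cves[cve.id] = (cve, raw_pkg_data)

    def get_cve(self, cve_id, with_pkg_links=True):
        if cve_id not in self.cves:
            return None
        cve, raw = self.cves[cve_id]
        # Entries are rebuilt on every request, so they always reflect
        # what the package store holds right now.
        cve.pkg_entries = []
        if not with_pkg_links:
            return cve
        for pkg_name, per_release in raw.items():
            pkg = self.package_store.get_package(pkg_name)
            if pkg is None:
                continue
            for release, status in per_release.items():
                if release in pkg.releases:
                    cve.pkg_entries.append((pkg.name, release, status))
        return cve


packages = PackageStore()
packages.packages["examplepkg"] = Package("examplepkg", {"bionic"})
store = CVEStore(packages)
store.add(CVE("CVE-2099-0001"),
          {"examplepkg": {"bionic": "released", "jammy": "needed"}})

before = list(store.get_cve("CVE-2099-0001").pkg_entries)
packages.packages["examplepkg"].releases.add("jammy")  # simulate loading a release
after = list(store.get_cve("CVE-2099-0001").pkg_entries)
print(before)
print(after)
```

Keeping the raw per-package data next to the CVE object lets the entries follow later `load_release` or `unload_release` calls without reloading the CVE, at the cost of recomputing the links on each lookup.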
Testing script:
```
from datalib import *
import time

now = time.time()
cve_storage = UCTCVEStorage()
cve_storage.load()
package_storage = UCTPackageStorage()
package_storage.load(filter_releases=['bionic', 'esm-apps/bionic', 'esm-infra/bionic'])
usn_storage = USNStorage()
usn_storage.load()
cve_storage.link_pkg_storage(package_storage)
usn_storage.link_pkg_storage(package_storage)
usn_storage.link_cve_storage(cve_storage)
later = time.time()
print('Total time: ', later - now)

gpac = package_storage.get_package('python-idna')
for version in gpac.source_versions:
    print(gpac, gpac.source_info[version]['release'], version)
    for binary in gpac.binaries[version]:
        print(f' - {binary.name} {binary.version} {binary.arches}')

print()
print('------------------')
print('Showing CVE-2024-3651')
print('------------------')
cve = cve_storage.get_cve('CVE-2024-3651')
print(cve.id)
print(cve.description)
for entry in cve.pkg_entries:
    print(' -', entry.cve, entry.pkg, entry.release, entry.status, entry.note)

print()
print('------------------')
print('Loading jammy into the storage')
package_storage.load_release('jammy')
cve = cve_storage.get_cve('CVE-2024-3651')
print('------------------')
print('Showing CVE entries now')
for entry in cve.pkg_entries:
    print(' -', entry.cve, entry.pkg, entry.release, entry.status, entry.note)

print()
print('------------------')
print('Showing python-idna package')
print('------------------')
gpac = package_storage.get_package('python-idna')
for version in gpac.source_versions:
    print(gpac, gpac.source_info[version]['release'], version)
    for binary in gpac.binaries[version]:
        print(f' - {binary.name} {binary.version} {binary.arches}')

print()
print('------------------')
print('Showing USNs for this CVE')
print('------------------')
usns = usn_storage.get_usns_by_cve(cve.id)
for usn in usns:
    print('USN',usn.id)
    print(usn.description)
    for cve in usn.cves:
        print(cve.id)
    print('Affected releases:', ','.join(list(usn.releases.keys())))
    for source_pkg in usn.package_fixed_versions:
        print(source_pkg.name)
        for fixed_version, release in usn.package_fixed_versions[source_pkg]:
            print(' - Fixed at', fixed_version, 'in', release)

print()
print('------------------')
print('Unloading jammy from the storage')
package_storage.unload_release('jammy')

print()
print('------------------')
print('Showing python-idna package')
print('------------------')
gpac = package_storage.get_package('python-idna')
for version in gpac.source_versions:
    print(gpac, gpac.source_info[version]['release'], version)
    for binary in gpac.binaries[version]:
        print(f' - {binary.name} {binary.version} {binary.arches}')

cve = cve_storage.get_cve('CVE-2024-3651')
print()
print('------------------')
print('Showing CVE-2024-3651')
print('------------------')
for entry in cve.pkg_entries:
    print(' -', entry.cve, entry.pkg, entry.release, entry.status, entry.note)

print()
print('------------------')
print('Showing USNs for this CVE')
print('------------------')
usns = usn_storage.get_usns_by_cve(cve.id)
for usn in usns:
    print('USN',usn.id)
p...