Merge ~litios/ubuntu-cve-tracker:feature/data-lib into ubuntu-cve-tracker:master

Proposed by David Fernandez Gonzalez
Status: Merged
Approved by: Eduardo Barretto
Approved revision: 38ffc5e353ce07574295f3d214cd3b66c1a86106
Merge reported by: David Fernandez Gonzalez
Merged at revision: 38ffc5e353ce07574295f3d214cd3b66c1a86106
Proposed branch: ~litios/ubuntu-cve-tracker:feature/data-lib
Merge into: ubuntu-cve-tracker:master
Diff against target: 1000 lines (+955/-0)
7 files modified
scripts/datalib/__init__.py (+5/-0)
scripts/datalib/config.py (+18/-0)
scripts/datalib/models.py (+245/-0)
scripts/datalib/storage.py (+86/-0)
scripts/datalib/uct_models.py (+259/-0)
scripts/datalib/uct_storage.py (+322/-0)
scripts/datalib/utils.py (+20/-0)
Reviewer            Review Type    Date Requested    Status
Eduardo Barretto                                     Approve
Nick Galanis                                         Approve
Review via email: mp+466666@code.launchpad.net

Description of the change

This library provides a project-agnostic interface for loading CVEs and Packages.

This library loads all the information from the different sources into well-defined objects with shared interfaces.

The storage classes load these objects and manage them so that the same data is not duplicated in memory.

-----

David Fernandez Gonzalez (litios) wrote :

Testing script:

```
from datalib import *
import time

now = time.time()

cve_storage = UCTCVEStorage()
cve_storage.load()

package_storage = UCTPackageStorage()
package_storage.load(filter_releases=['bionic', 'esm-apps/bionic', 'esm-infra/bionic'])

usn_storage = USNStorage()
usn_storage.load()

cve_storage.link_pkg_storage(package_storage)
usn_storage.link_pkg_storage(package_storage)
usn_storage.link_cve_storage(cve_storage)

later = time.time()
print('Total time: ', later - now)

gpac = package_storage.get_package('python-idna')
for version in gpac.source_versions:
    print(gpac, gpac.source_info[version]['release'], version)
    for binary in gpac.binaries[version]:
        print(f' - {binary.name} {binary.version} {binary.arches}')

print()
print('------------------')
print('Showing CVE-2024-3651')
print('------------------')
cve = cve_storage.get_cve('CVE-2024-3651')
print(cve.id)
print(cve.description)
for entry in cve.pkg_entries:
    print(' -', entry.cve, entry.pkg, entry.release, entry.status, entry.note)

print()
print('------------------')
print('Loading jammy into the storage')
package_storage.load_release('jammy')
cve = cve_storage.get_cve('CVE-2024-3651')
print('------------------')
print('Showing CVE entries now')
for entry in cve.pkg_entries:
    print(' -', entry.cve, entry.pkg, entry.release, entry.status, entry.note)

print()
print('------------------')
print('Showing python-idna package')
print('------------------')
gpac = package_storage.get_package('python-idna')
for version in gpac.source_versions:
    print(gpac, gpac.source_info[version]['release'], version)
    for binary in gpac.binaries[version]:
        print(f' - {binary.name} {binary.version} {binary.arches}')

print()
print('------------------')
print('Showing USNs for this CVE')
print('------------------')
usns = usn_storage.get_usns_by_cve(cve.id)
for usn in usns:
    print('USN',usn.id)
    print(usn.description)
    for cve in usn.cves:
        print(cve.id)
    print('Affected releases:', ','.join(list(usn.releases.keys())))
    for source_pkg in usn.package_fixed_versions:
        print(source_pkg.name)
        for fixed_version, release in usn.package_fixed_versions[source_pkg]:
            print(' - Fixed at', fixed_version, 'in', release)

print()
print('------------------')
print('Unloading jammy from the storage')
package_storage.unload_release('jammy')

print()
print('------------------')
print('Showing python-idna package')
print('------------------')
gpac = package_storage.get_package('python-idna')
for version in gpac.source_versions:
    print(gpac, gpac.source_info[version]['release'], version)
    for binary in gpac.binaries[version]:
        print(f' - {binary.name} {binary.version} {binary.arches}')

cve = cve_storage.get_cve('CVE-2024-3651')

print()
print('------------------')
print('Showing CVE-2024-3651')
print('------------------')
for entry in cve.pkg_entries:
    print(' -', entry.cve, entry.pkg, entry.release, entry.status, entry.note)

print()
print('------------------')
print('Showing USNs for this CVE')
print('------------------')
usns = usn_storage.get_usns_by_cve(cve.id)
for usn in usns:
    print('USN',usn.id)
    p...

```

558d0f0... by David Fernandez Gonzalez

Revert "[Data lib] Add docs"

This reverts commit eaae95160bbf4c29a076db144afad562c85844e5.

David Fernandez Gonzalez (litios) wrote :

The docs were not meant to be merged; they were only for the PR itself.

I am reverting the change so you can still see them, but I'll simply drop the last two commits when merging.

Nick Galanis (nickgalanis) wrote :

LGTM! Thanks David, amazing work. It's a very clean and abstract representation of our data. I ran the script you provided and it was functional. Some things to consider (as already discussed):
 - Ensure python3.8 compatibility for focal machines
 - Possibly revisit the multi-threading, which could cause low-memory machines to crash

We would need to add/change some fields/functions as we move forward, but for now, I think it is safe to merge.

review: Approve
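
On the second point above: the diff sizes its `multiprocessing.Pool` with `os.cpu_count()`. A minimal sketch of one way to bound the number of workers follows. The `UCTLIB_MAX_WORKERS` variable and both helper names are hypothetical and not part of the merged library.

```python
import os
from multiprocessing import Pool
from typing import Callable, List, Optional, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def worker_count(cap: Optional[int] = None) -> int:
    """Return how many worker processes to start, never fewer than one."""
    # Hypothetical override; the merged config.py does not define it.
    env_value = os.environ.get("UCTLIB_MAX_WORKERS")
    if env_value is not None and env_value.isdigit():
        cap = int(env_value)
    cpus = os.cpu_count() or 1
    if cap is None:
        return cpus
    return max(1, min(cpus, cap))


def load_all(loader: Callable[[T], R], items: Sequence[T],
             cap: Optional[int] = None) -> List[R]:
    """Map loader over items, serially when only one worker is allowed."""
    workers = worker_count(cap)
    if workers == 1:
        # No pool at all: lowest memory footprint, easiest to debug.
        return [loader(item) for item in items]
    with Pool(processes=workers) as pool:
        return pool.map(loader, items)
```

With a cap of 1 the loader runs in the calling process, so a low-memory machine could opt out of process-level parallelism entirely.
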
Diogo Sousa (0xdsousa) wrote :

Absolutely loving the type hints on everything, great work.
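
One caveat links these type hints to the python3.8 note in the previous review: built-in generics such as `list[str]` only became subscriptable in Python 3.9 (PEP 585), and class-level annotations are evaluated when the class is defined, so they raise `TypeError` on 3.8. Below is a minimal sketch of a 3.8-compatible spelling using the `typing` module. The classes are trimmed-down stand-ins for the ones in the diff, with plain strings in place of `Version` objects.

```python
from typing import Dict, List, Set


class BinaryPackage:
    """Stand-in for datalib's BinaryPackage, reduced to two fields."""
    name: str
    arches: List[str]

    def __init__(self, name: str, arches: List[str]) -> None:
        self.name = name
        self.arches = arches


class SourcePackage:
    """Stand-in for datalib's SourcePackage, keyed by version string."""
    name: str
    source_versions: Set[str]
    # Dict takes a comma between key and value types: Dict[K, V].
    binaries: Dict[str, List[BinaryPackage]]

    def __init__(self, name: str) -> None:
        self.name = name
        self.source_versions = set()
        self.binaries = {}

    def add_source_version(self, version: str) -> None:
        if version in self.source_versions:
            return
        self.source_versions.add(version)
        self.binaries[version] = []

    def add_binary(self, src_version: str, binary: BinaryPackage) -> None:
        # Binaries are only accepted for versions that were added first.
        if src_version not in self.binaries:
            return
        self.binaries[src_version].append(binary)
```

The same annotations keep working on newer interpreters, so this spelling does not have to be reverted once focal machines are retired.
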

9531bb9... by David Fernandez Gonzalez

[DATA LIB] Allow getting source versions per release

Signed-off-by: David Fernandez Gonzalez <email address hidden>

6f1f847... by David Fernandez Gonzalez

[DATA LIB] Don't link to object if it hasn't been loaded

Signed-off-by: David Fernandez Gonzalez <email address hidden>

David Fernandez Gonzalez (litios) wrote :

Adding two more commits:

* Add a feature to retrieve a package's source versions for a given release. Output for the example above:

Showing bionic versions {2.6-1, 2.5-1}
Showing bionic versions, including esm releases {2.6-1, 2.6-1ubuntu0.1~esm1, 2.5-1}

* Fix a bug for USNs when the CVE or the package had not been loaded at the time the USN was loaded. In that case, the missing CVE or package is not linked to the USN.
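
The per-release lookup in the first bullet can be sketched as follows. This is a simplified illustration and not the merged `get_release_source_versions`: releases are plain strings, every `esm-*/<release>` entry counts as an ESM release of `<release>`, and the release assigned to each version in the sample data is an assumption.

```python
from typing import Dict, Set


def release_versions(source_info: Dict[str, str], release: str,
                     include_esm: bool = False) -> Set[str]:
    """Return the versions published in release, optionally with ESM ones."""
    selected = set()
    for version, version_release in source_info.items():
        # Simplification: any "esm-.../<release>" name is treated as ESM.
        is_esm_of_release = (version_release.startswith("esm-")
                             and version_release.endswith("/" + release))
        if version_release == release or (include_esm and is_esm_of_release):
            selected.add(version)
    return selected


# Illustrative data only: which release holds each version is assumed.
sample_info = {
    "2.5-1": "bionic",
    "2.6-1": "bionic",
    "2.6-1ubuntu0.1~esm1": "esm-infra/bionic",
}

bionic_only = release_versions(sample_info, "bionic")
bionic_with_esm = release_versions(sample_info, "bionic", include_esm=True)
```

The merged method additionally picks the ESM release from the package component and can walk parent releases, which this sketch leaves out.
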

296894d... by David Fernandez Gonzalez

[Data lib] Use apt_pkg for version comparison

Signed-off-by: David Fernandez Gonzalez <email address hidden>

38ffc5e... by David Fernandez Gonzalez

[DATA LIB] Store CVEs, packages and SNs ordered.

For CVE and SNs, sort them based on the integer
values rather than standard ascii comparison.

Signed-off-by: David Fernandez Gonzalez <email address hidden>
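
To illustrate why the integer-based ordering matters, here is a small sketch comparing the default string sort with a numeric key. It uses a `(year, number)` tuple as the key, which is a swapped-in alternative to the single-integer key in the diff; the helper name and the sample IDs are only for illustration.

```python
from typing import List, Tuple


def cve_sort_key(cve_id: str) -> Tuple[int, int]:
    """Split 'CVE-YYYY-NNNN' into integers so numbers compare numerically."""
    _, year, number = cve_id.split("-")
    return int(year), int(number)


ids: List[str] = ["CVE-2024-10000", "CVE-2024-3651", "CVE-2023-99999"]

# Plain string comparison puts 10000 before 3651 because '1' < '3'.
ascii_order = sorted(ids)

# The numeric key orders by year first, then by the sequence number.
numeric_order = sorted(ids, key=cve_sort_key)
```
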

Eduardo Barretto (ebarretto) wrote :

lgtm, thanks!

review: Approve

Preview Diff

diff --git a/scripts/datalib/__init__.py b/scripts/datalib/__init__.py
new file mode 100644
index 0000000..24bda8c
--- /dev/null
+++ b/scripts/datalib/__init__.py
@@ -0,0 +1,5 @@
+from .config import *
+from .models import *
+from .uct_models import *
+from .storage import *
+from .uct_storage import *
diff --git a/scripts/datalib/config.py b/scripts/datalib/config.py
new file mode 100644
index 0000000..6869d92
--- /dev/null
+++ b/scripts/datalib/config.py
@@ -0,0 +1,18 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+# Module containing classes that represent CVE and Package data
+#
+# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com>
+# Copyright (C) 2024 Canonical Ltd.
+#
+# This script is distributed under the terms and conditions of the GNU General
+# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html
+# for details.
+#
+import os
+
+VERBOSE = os.environ.get('UCTLIB_VERBOSE', True)
+THREADING = os.environ.get('UCTLIB_THREADING', True)
+UCT = os.environ.get('UCT')
+PKG_CACHE_DIR = os.environ.get('UCTLIB_PKG_CACHE_DIR', '~/.cache/pkg-cache')
+USN_DATABASE = os.environ.get('UCTLIB_USN_DATABASE_DIR', '~/.cache/usn/database.json')
diff --git a/scripts/datalib/models.py b/scripts/datalib/models.py
new file mode 100644
index 0000000..adc3e72
--- /dev/null
+++ b/scripts/datalib/models.py
@@ -0,0 +1,245 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+# Module containing classes that represent CVE and Package data
+#
+# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com>
+# Copyright (C) 2024 Canonical Ltd.
+#
+# This script is distributed under the terms and conditions of the GNU General
+# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html
+# for details.
+#
+
+import re
+import datetime
+import apt_pkg
+apt_pkg.init_system()
+
+class Version:
+    version: str
+
+    def __init__(self, version) -> None:
+        self.version = version
+
+    def compare_version(self, other: 'Version') -> int:
+        if not hasattr(other, 'version'): raise Exception("Cannot compare")
+        return apt_pkg.version_compare(self.version, other.version)
+
+    def __eq__(self, other: 'Version') -> bool:
+        if not hasattr(other, 'version'): raise Exception("Cannot compare")
+        return self.compare_version(other) == 0
+
+    def __lt__(self, other: 'Version') -> bool:
+        if not hasattr(other, 'version'): raise Exception("Cannot compare")
+        return self.compare_version(other) == -1
+
+    def __hash__(self) -> int:
+        return hash(self.version)
+
+    def __repr__(self) -> str:
+        return self.version
+
+class BinaryPackage:
+    name: str
+    version: Version
+    arches: list[str]
+    source: 'SourcePackage'
+
+    def __init__(self, name: str, version: Version, arches: list[str], source: 'SourcePackage') -> None:
+        self.name = name
+        self.version = version
+        self.arches = arches
+        self.source = source
+
+    def __eq__(self, other: object) -> bool:
+        return self.name == other.name and \
+            self.version == other.version and \
+            self.source == other.source
+
+class SourcePackage:
+    name: str
+    source_versions: set[Version]
+    binaries: dict[Version:list[BinaryPackage]]
+
+    def __init__(self, name: str, binaries: dict[Version:list[BinaryPackage]]) -> None:
+        self.name = name
+        self.binaries = binaries
+        self.source_versions = set()
+
+    def __eq__(self, other: object) -> bool:
+        return self.name == other.name and \
+            self.source_versions == other.source_versions and \
+            self.binaries == other.binaries
+
+    def is_same_package(self, other: object) -> bool:
+        """Is same package but different version"""
+        return self.name == other.name
+
+    def get_source_versions(self) -> set[Version]:
+        return self.source_versions
+
+    def get_binaries(self, version: Version) -> list[BinaryPackage]:
+        if version not in self.binaries: return []
+        return self.binaries[version]
+
+    def add_source_version(self, version: Version) -> None:
+        if version in self.source_versions: return
+        self.source_versions.add(version)
+        self.binaries[version] = []
+
+    def add_binary(self, src_version: Version, binary: BinaryPackage) -> None:
+        if src_version not in self.binaries: return
+        if binary not in self.binaries[src_version]:
+            self.binaries[src_version].append(binary)
+
+    def is_binary_present(self, src_version: Version, binary: BinaryPackage) -> bool:
+        if src_version not in self.binaries: return False
+        return binary in self.binaries[src_version]
+
+class CVSS:
+    entity: str
+    score: float
+    severity: str
+    vector: str
+
+    def __init__(self, entity: str, score: float, vector: str) -> None:
+        self.entity = entity
+        self.score = score
+        self.vector = vector
+        if self.score == 0.0:
+            self.severity = 'None'
+        elif self.score < 4.0:
+            self.severity = 'Low'
+        elif self.score < 7.0:
+            self.severity = 'Medium'
+        elif self.score < 9.0:
+            self.severity = 'High'
+        elif self.score <= 10.0:
+            self.severity = 'Critical'
+        else:
+            Exception(f'{self.score} is not a valid CVSS score.')
+
+    def __lt__(self, other):
+        return self.score < other.score
+
+    def __repr__(self) -> str:
+        return f'[{self.severity}] {self.vector} ({self.score}) -- {self.entity}'
+
+class CVE:
+    id: str
+    ID_REGEX = r'CVE-\d{4}-\d{4,7}'
+    description: str
+    public_date: str
+    cvss: list[CVSS]
+    assigned_to: str
+    notes: list[tuple]
+    references: list[str]
+    priority: str
+    priority_reason: str
+    pkg_entries: list["CVEPkgEntry"]
+
+    def __init__(self, id: str, description: str, public_date: str, cvss: list[CVSS],
+                 assigned_to: str, notes: list[tuple], references: list[str],
+                 priority: str, priority_reason: str) -> None:
+
+        if not re.search(self.ID_REGEX, id):
+            raise Exception(f'{id} is not a valid CVE ID')
+
+        self.id = id
+        self.description = description.strip()
+        self.public_date = public_date
+        self.cvss = cvss
+        self.assigned_to = assigned_to
+        self.notes = notes
+        self.references = references
+        self.priority = priority
+        self.priority_reason = priority_reason
+        self.pkg_entries = []
+
+    def __lt__(self, other):
+        if not hasattr(other, 'id'): raise Exception("Cannot compare")
+        _, our_year, our_number = self.id.split('-')
+        _, other_year, other_number = other.id.split('-')
+
+        return int(our_year) < int(other_year) or \
+            int(our_year) == int(other_year) and int(our_number) < int(other_number)
+
+    def add_reference(self, ref: str) -> None:
+        if ref not in self.references:
+            self.references.append(ref)
+
+    def add_note(self, author: str, note: str) -> None:
+        self.notes.append((author, note))
+
+    def set_assigned_to(self, assigned_to: str) -> None:
+        self.assigned_to = assigned_to
+
+    def is_package_present(self, package: SourcePackage) -> bool:
+        for cve_entry in self.pkg_entries:
+            if package == cve_entry.pkg:
+                return True
+
+        return False
+
+    def get_package_entry(self, package: SourcePackage) -> 'CVEPkgEntry':
+        for cve_entry in self.pkg_entries:
+            if package == cve_entry.pkg:
+                return cve_entry
+
+        return None
+
+    def add_cve_entry(self, entry: 'CVEPkgEntry') -> bool:
+        for self_entry in self.pkg_entries:
+            if self_entry.pkg == entry.pkg:
+                return False
+        self.pkg_entries.append(entry)
+        return True
+
+    def needs_triage(self) -> bool:
+        if self.priority == 'needs-triage': return True
+        for cve_entry in self.pkg_entries:
+            if cve_entry.status == 'needs-triage':
+                return True
+
+        return False
+
+    def __eq__(self, other: object) -> bool:
+        return self.id == other.id
+
+    def __repr__(self) -> str:
+        return str(vars(self))
+
+
+class CVEPkgEntry:
+    pkg: SourcePackage
+    cve: CVE
+    status: str
+    note: str
+
+    STATUSES = ['needs-triage', 'needed', 'not-affected', 'released', 'ignored', 'deferred', 'DNE', 'pending']
+    def __init__(self, pkg: SourcePackage, cve: CVE, status: str, note: str) -> None:
+        self.pkg = pkg
+        self.cve = cve
+
+        if status not in self.STATUSES:
+            raise Exception(f'{status} is not a valid status')
+
+        self.status = status
+        self.note = note
+
+class SN:
+    id: str
+    cves: set[CVE]
+    packages: set[SourcePackage]
+    package_fixed_versions: dict[SourcePackage:Version]
+    lp_bugs: set[str]
+    description: str
+    date: datetime.date
+
+    def __init__(self, description: str, date: datetime.date, pkg_fixed_info: dict) -> None:
+        self.cves = set()
+        self.packages = set()
+        self.lp_bugs = set()
+        self.description = description
+        self.date = date
+        self.package_fixed_versions = pkg_fixed_info
\ No newline at end of file
diff --git a/scripts/datalib/storage.py b/scripts/datalib/storage.py
new file mode 100644
index 0000000..fc5c271
--- /dev/null
+++ b/scripts/datalib/storage.py
@@ -0,0 +1,86 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+# Module containing classes that represent CVE and Package data
+#
+# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com>
+# Copyright (C) 2024 Canonical Ltd.
+#
+# This script is distributed under the terms and conditions of the GNU General
+# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html
+# for details.
+#
+from datalib.uct_models import SourcePackage
+from .models import *
+
+class PackageStorage:
+    """
+    Storage class for handling packages.
+    """
+    packages: dict[str: SourcePackage]
+
+    def __init__(self) -> None:
+        self.packages = {}
+
+    def _order_packages(self) -> None:
+        self.packages = dict(sorted(self.packages.items()))
+
+    def get_package(self, pkg_name: str) -> SourcePackage:
+        if pkg_name not in self.packages: return None
+        return self.packages[pkg_name]
+
+class CVEStorage:
+    """
+    Storage class for handling CVE packages.
+    """
+    cves: dict[str: CVE]
+    package_storage: PackageStorage
+
+    # TODO: add a changed cache so we can cache CVEs and only update when something got loaded
+
+    def __init__(self) -> None:
+        self.cves = {}
+
+    def link_pkg_storage(self, package_storage: PackageStorage) -> None:
+        self.package_storage = package_storage
+
+    def _order_cves(self) -> None:
+        self.cves = dict(sorted(self.cves.items(),
+            key=lambda value: int(value[0].split('-')[1]) * 10 ** 8 + int(value[0].split('-')[2])))
+
+    def _init_cve(self, cve: CVE, pkg_data: dict) -> None:
+        """Internal function to add CVE object to the class storage"""
+        self.cves[cve.id] = (cve, pkg_data)
+
+    def load_cve(self, cve_id: str) -> None:
+        """Load a CVE into the storage"""
+        raise NotImplementedError
+
+    def get_cve(self, cve_id: str) -> CVE:
+        """
+        Get a CVE from the storage.
+        The package links will be computed when the
+        CVE is requested, extracting them from the internal
+        PackageStorage.
+        """
+        raise NotImplementedError
+
+class SNStorage:
+    sns: dict[str: SN]
+    package_storage: PackageStorage
+    cve_storage: CVEStorage
+
+    def __init__(self) -> None:
+        self.sns = {}
+
+    def _order_sns(self) -> None:
+        self.sns = dict(sorted(self.sns.items(),
+            key=lambda value: int(value[0].split('-')[0]) * 10 ** 8 + int(value[0].split('-')[1])))
+
+    def link_pkg_storage(self, package_storage: PackageStorage) -> None:
+        self.package_storage = package_storage
+
+    def link_cve_storage(self, cve_storage: CVEStorage) -> None:
+        self.cve_storage = cve_storage
+
+    def load_sn(self, object: SN) -> None:
+        return NotImplementedError
\ No newline at end of file
diff --git a/scripts/datalib/uct_models.py b/scripts/datalib/uct_models.py
new file mode 100644
index 0000000..7b9fe01
--- /dev/null
+++ b/scripts/datalib/uct_models.py
@@ -0,0 +1,259 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+# Module containing classes that represent CVE and Package data
+#
+# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com>
+# Copyright (C) 2024 Canonical Ltd.
+#
+# This script is distributed under the terms and conditions of the GNU General
+# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html
+# for details.
+#
+
+from datalib.models import CVSS
+from .models import *
+from cve_lib import get_subproject_details, get_active_esm_releases, is_active_release
+
+class Release:
+    name: str
+    version: str
+    codename: str
+    product: str
+    series: str
+    is_esm: bool
+    is_active: bool
+    requires_oval: bool
+    parent: "Release"
+
+    def __init__(self, name: str) -> None:
+        canon, product, series, details = get_subproject_details(name)
+        if not canon:
+            self.name = name
+            self.parent = None
+            return
+        self.name = canon
+        self.product = product
+        self.series = series
+        self.is_esm = self.name in get_active_esm_releases()
+        self.is_active = is_active_release(self.name)
+        self.parent = None
+        self.version = None
+        self.codename = None
+        if not details: return
+        if 'oval' in details:
+            self.requires_oval = details['oval']
+        else:
+            self.requires_oval = True
+        if 'parent' in details:
+            self.parent = Release(details['parent'])
+        if 'version' in details:
+            self.version = details['version']
+        if 'codename' in details:
+            self.codename = details['codename']
+
+    def __eq__(self, value: object) -> bool:
+        return self.name == value.name
+
+    def __repr__(self) -> str:
+        return self.name
+
+    def __hash__(self):
+        return hash(self.name)
+
+    def get_esm_release(self, component: str) -> "Release":
+        esm_releases = get_active_esm_releases()
+        potential_releases = list(filter(lambda release: self.series in release, esm_releases))
+        if len(potential_releases) == 1:
+            return Release(potential_releases[0])
+        elif len(potential_releases) == 2:
+            if component in ['main', 'restricted']:
+                target = 'esm-infra'
+            elif component in ['universe', 'multiverse']:
+                target = 'esm-apps'
+
+            if target in potential_releases[0]: return Release(potential_releases[0])
+            else: return Release(potential_releases[1])
+
+        return None
+    def is_parent_release(self, release: "Release") -> bool:
+        return release in self.get_all_parents()
+
+    def get_direct_parent(self) -> "Release":
+        return self.parent
+
+    def get_all_parents(self) -> "Release":
+        release = self
+        all_parents = []
+        while release.parent:
+            release = release.parent
+            all_parents.append(release)
+        return all_parents
+
+    def get_oldest_parent(self) -> "Release":
+        if not self.parent: return None
+        return self.get_all_parents()[-1]
+
+class UbuntuVersion(Version):
+    def is_esm(self) -> bool:
+        return 'esm' in self.version
+
+class UbuntuBinaryPackage(BinaryPackage):
+    component: str
+
+    def __init__(self, name: str, version: UbuntuVersion, arches: list[str], source: 'UbuntuSourcePackage', component: str) -> None:
+        self.component = component
+        super().__init__(name, version, arches, source)
+
+class UbuntuSourcePackage(SourcePackage):
+    source_info: dict[UbuntuVersion: dict]
+    binaries: dict[UbuntuVersion:list[UbuntuBinaryPackage]]
+
+    def __init__(self, name, binaries) -> None:
+        self.source_info = {}
+        super().__init__(name, binaries)
+
+    def add_source_version(self, version: UbuntuVersion, release: Release, component: str, pocket: str):
+        if version in self.source_versions: return
+        self.source_info[version] = {
+            'pocket': pocket,
+            'component': component,
+            'release': release
+        }
+        return super().add_source_version(version)
+
+    def get_version_release(self, version: UbuntuVersion) -> Release:
+        if version not in self.source_versions: return None
+        return self.source_info[version]['release']
+
+    def get_version_component(self, version: UbuntuVersion) -> str:
+        if version not in self.source_versions: return None
+        return self.source_info[version]['component']
+
+    def get_version_pocket(self, version: UbuntuVersion) -> str:
+        if version not in self.source_versions: return None
+        return self.source_info[version]['pocket']
+
+    def get_release_source_versions(self, release: Release, include_parents: bool = False, include_esm: bool = False) -> set[UbuntuVersion]:
+        rel_versions = set()
+        for source_version in self.source_versions:
+            source_version_rel = self.get_version_release(source_version)
+            component = self.get_version_component(source_version)
+            if (include_parents and release.is_parent_release(source_version_rel)) or \
+               (include_esm and source_version_rel == release.get_esm_release(component)) or \
+               source_version_rel == release:
+                rel_versions.add(source_version)
+
+        return rel_versions
+
+    def get_latest_version(self, release: Release = None, include_parents: bool = False, include_esm: bool = False) -> UbuntuVersion:
+        if release:
+            source_versions = self.get_release_source_versions(release, include_parents, include_esm)
+        else:
+            source_versions = self.get_source_versions()
+
+        if not source_versions: return None
+
+        latest = None
+        for source_version in source_versions:
+            if not latest or source_version > latest:
+                latest = source_version
+
+        return latest
+
+    def get_earliest_version(self, release: Release = None, include_parents: bool = False, include_esm: bool = False) -> UbuntuVersion:
+        if release:
+            source_versions = self.get_release_source_versions(release, include_parents, include_esm)
+        else:
+            source_versions = self.get_source_versions()
+
+        if not source_versions: return None
+
+        earliest = None
+        for source_version in source_versions:
+            if not earliest or source_version < earliest:
+                earliest = source_version
+
+        return earliest
+
+    def expand_versions(self, other: "UbuntuSourcePackage") -> None:
+        self.source_versions.update(other.source_versions)
+        self.source_info.update(other.source_info)
+        self.binaries.update(other.binaries)
+
+    def remove_version(self, version: UbuntuVersion) -> None:
+        self.source_versions.remove(version)
+        del self.source_info[version]
+        del self.binaries[version]
+
+    def release_exists(self, release: Release, include_parents: bool) -> bool:
+        """
+        Check if release exists in the package.
+
+        If include_parents, it will also check if any of the release
+        parents are listed.
+        """
+        for _, entry in self.source_info.items():
+            if release == entry['release']:
+                return True
+            if include_parents and release.is_parent_release(entry['release']):
+                return True
+
+        return False
+
+    def __repr__(self) -> str:
+        return f'{self.name}'
+
+    def __hash__(self) -> int:
+        return hash(self.name)
+
+class UbuntuCVEPkgEntry(CVEPkgEntry):
+    release: Release
+
+    def __init__(self, pkg: UbuntuSourcePackage, cve: 'UbuntuCVE', status: str, note: str, release: Release) -> None:
+        self.release = release
+        super().__init__(pkg, cve, status, note)
+
+class UbuntuCVE(CVE):
+    pkg_entries: list[UbuntuCVEPkgEntry]
+
+    def __init__(self, id: str, description: str, public_date: str, cvss: list[CVSS], assigned_to: str, notes: list[tuple], references: list[str], priority: str, priority_reason: str) -> None:
+        super().__init__(id, description, public_date, cvss, assigned_to, notes, references, priority, priority_reason)
+
+    def is_rel_present(self, release: Release) -> bool:
+        for entry in self.pkg_entries:
+            if entry.release == release: return True
+        return False
+
+    def add_cve_entry(self, entry: UbuntuCVEPkgEntry) -> bool:
+        for self_entry in self.pkg_entries:
+            if self_entry.pkg == entry.pkg and \
+               self_entry.release == entry.release:
+                return False
+        self.pkg_entries.append(entry)
+        return True
+
+    def __repr__(self) -> str:
+        return self.id
+
+    def __hash__(self) -> int:
+        return hash(self.id)
+
+
+class USN(SN):
+    package_fixed_versions: dict[SourcePackage:list[tuple[Version, Release]]]
+    releases: set[Release]
+
+    def __init__(self, id: str, data: dict, cve_objs: set, package_fixed_versions: dict, lp_bugs: set):
+        for item in ['description', 'releases', 'title', 'timestamp', 'summary', 'action', 'id', 'isummary']:
+            if item in data:
+                setattr(self, item, data[item])
+            else:
+                setattr(self, item, None)
+
+        self.id = id
+        self.lp_bugs = lp_bugs
+        self.cves = cve_objs
+        self.package_fixed_versions = package_fixed_versions
+
+    def is_rel_present(self, release: Release) -> bool:
+        return release in self.releases
646diff --git a/scripts/datalib/uct_storage.py b/scripts/datalib/uct_storage.py
647new file mode 100644
648index 0000000..f0e6232
649--- /dev/null
650+++ b/scripts/datalib/uct_storage.py
651@@ -0,0 +1,322 @@
652+#!/usr/bin/python3
653+# -*- coding: utf-8 -*-
654+# Module containing classes that represent CVE and Package data
655+#
656+# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com>
657+# Copyright (C) 2024 Canonical Ltd.
658+#
659+# This script is distributed under the terms and conditions of the GNU General
660+# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html
661+# for details.
662+#
663+from .storage import *
664+from .uct_models import *
665+from .config import PKG_CACHE_DIR, USN_DATABASE
666+from .utils import get_active_ubuntu_releases
667+from .config import UCT
668+
669+import json
670+import os
671+import cve_lib
672+from multiprocessing import Pool
673+
674+class UCTPackageStorage(PackageStorage):
675+ """
676+ Storage class for handling UCT packages.
677+ """
678+ packages: dict[str: UbuntuSourcePackage]
679+ latest_date_created: dict[Release:datetime.datetime]
680+ loaded_releases: set[Release]
681+
682+ def __init__(self) -> None:
683+ self.loaded_releases = set()
684+ self.latest_date_created = {}
685+ super().__init__()
686+
687+ def load(self, filter_releases: list = []) -> None:
688+ final_releases = []
689+ active_ubuntu_releases = get_active_ubuntu_releases()
690+ for release in active_ubuntu_releases:
691+ if filter_releases and release not in filter_releases: continue
692+ final_releases.append(release)
693+
694+ with Pool(processes=os.cpu_count()) as pool:
695+ print(f'Loading {len(final_releases)} releases')
696+ package_data = pool.map(self._load_release_from_cache, final_releases)
697+
698+ for release, release_data, last_date_created in package_data:
699+ self._init_release(release, release_data, last_date_created)
700+
701+ self._order_packages()
702+
703+ def _load_release_from_cache(self, release: str) -> dict:
704+ data = {}
705+
706+ try:
707+ with open(os.path.join(os.path.expanduser(PKG_CACHE_DIR), release.replace('/', '_') + '-pkg-cache.json')) as f:
708+ data = json.load(f)
709+ except FileNotFoundError as ex:
710+ print(ex)
711+ return (release, None, None)
712+
713+ release_data = {}
714+ latest_date_created = None
715+ release_obj = Release(release)
716+ for src_pkg_name, src_pkg_versions in data.items():
717+ if src_pkg_name == 'latest_date_created':
718+ latest_date_created = datetime.datetime.fromtimestamp(src_pkg_versions)
719+ continue
720+ source_package = UbuntuSourcePackage(src_pkg_name, {})
721+
722+ release_data[src_pkg_name] = source_package
723+ for src_pkg_version, src_pkg_version_data in src_pkg_versions.items():
724+ src_pkg_version_obj = UbuntuVersion(src_pkg_version)
725+ source_package.add_source_version(src_pkg_version_obj, release_obj,
726+ src_pkg_version_data['component'],
727+ src_pkg_version_data['pocket'])
728+
729+ for binary_name, binary_data in src_pkg_version_data['binaries'].items():
730+ source_package.add_binary(src_pkg_version_obj, UbuntuBinaryPackage(binary_name,
731+ UbuntuVersion(binary_data['version']),
732+ binary_data['arch'],
733+ source_package,
734+ binary_data['component']))
735+
736+ return (release_obj, release_data, latest_date_created)
737+
738+ def _init_release(self, release: Release, release_data: dict, latest_date_created: str) -> None:
739+ if not release_data: return
740+ for package_name in release_data:
741+ if package_name in self.packages:
742+ self.packages[package_name].expand_versions(release_data[package_name])
743+ else:
744+ self.packages[package_name] = release_data[package_name]
745+
746+ self.latest_date_created[release] = latest_date_created
747+ self.loaded_releases.add(release)
748+
749+ def get_package(self, pkg_name: str) -> UbuntuSourcePackage:
750+ return super().get_package(pkg_name)
751+
752+ def load_release(self, release:str) -> None:
753+ if Release(release) in self.loaded_releases: return
754+ release_obj, data, date = self._load_release_from_cache(release)
755+ self._init_release(release_obj, data, date)
756+
757+ def unload_release(self, release: str) -> None:
758+ release = Release(release)
759+ if release not in self.loaded_releases: return
760+ self.loaded_releases.remove(release)
761+
762+ for package_name, package in self.packages.items():
763+ original_source_versions = package.source_versions.copy()
764+ for version in original_source_versions:
765+ ver_rel = package.get_version_release(version)
766+ if ver_rel == release: package.remove_version(version)
767+
768+class UCTCVEStorage(CVEStorage):
769+ """
770+ Storage class for handling UCT CVE packages.
771+ """
772+ cves: dict[str, UbuntuCVE]
773+ package_storage: UCTPackageStorage
774+
775+ # TODO: add a changed cache so we can cache CVEs and only update when something got loaded
776+
777+ def load(self, cves_filter: list = None) -> None:
778+ final_cves = []
779+ cves_ids = self.get_uct_cve_ids()
780+ for cve_id in cves_ids:
781+ if cves_filter and cve_id not in cves_filter: continue
782+ final_cves.append(cve_id)
783+
784+ with Pool(processes=os.cpu_count()) as pool:
785+ print(f'Loading {len(final_cves)} CVEs')
786+ cves = pool.map(self._uct_cve_loader, final_cves)
787+
788+ for cve, cve_pkg_data in cves:
789+ if not cve: continue
790+ self._init_cve(cve, cve_pkg_data)
791+
792+ self._order_cves()
793+
794+ def _uct_cve_loader(self, cve_id: str) -> tuple:
795+ """Internal function to load CVE from UCT"""
796+ data = {}
797+ for directory in cve_lib.cve_dirs:
798+ try:
799+ data = cve_lib.load_cve(os.path.join(UCT, directory, cve_id))
800+ except Exception:
801+ continue
802+
803+ if not data:
804+ raise FileNotFoundError(f'Couldn\'t find CVE {cve_id}')
805+
806+ cvss = []
807+ for cvss_entry in data['CVSS']:
808+ if not cvss_entry: continue
809+
810+ cvss.append(CVSS(
811+ cvss_entry['source'],
812+ float(cvss_entry['baseScore']),
813+ cvss_entry['vector']
814+ ))
815+
816+ try:
817+ cve = UbuntuCVE(data['Candidate'], data['Description'], data['PublicDate'], cvss, data['Assigned-to'], data['Notes'], data['References'], data['Priority'][0], data['Priority'][1])
818+ except Exception as ex:
819+ print(ex)
820+ return (None, None)
821+
822+ return (cve, data['pkgs'])
823+
824+ def get_uct_cve_ids(self) -> list:
825+ """Return all CVE IDs identified from UCT"""
826+
827+ cve_ids = []
828+ for dir in cve_lib.cve_dirs:
829+ for (_, _, filenames) in os.walk(dir):
830+ for filename in filenames:
831+ if not re.search(CVE.ID_REGEX, filename): continue
832+ cve_ids.append(filename)
833+ break
834+
835+ return cve_ids
836+
837+ def load_cve(self, cve_id: str) -> None:
838+ """Load a CVE into the storage"""
839+ cve, cve_data = self._uct_cve_loader(cve_id)
840+ self._init_cve(cve, cve_data)
841+
842+ def get_all_cves(self) -> list[UbuntuCVE]:
843+ """Get all CVEs from the storage"""
844+ cves = []
845+ for cve_id in self.cves:
846+ cves.append(self.get_cve(cve_id))
847+ return cves
848+
849+ def get_all_rel_cves(self, release: Release) -> list[UbuntuCVE]:
850+ """Get all CVEs from the storage affecting a release"""
851+ cves = []
852+ for cve_id in self.cves:
853+ cve = self.get_cve(cve_id)
854+ if cve.is_rel_present(release):
855+ cves.append(cve)
856+ return cves
857+
858+ def populate_cve_entries(self, cve_id: str) -> None:
859+ cve, cve_pkg_data = self.cves[cve_id]
860+ cve.pkg_entries = list()
861+ for package_name, package_entries in cve_pkg_data.items():
862+ package = self.package_storage.get_package(package_name)
863+ if not package: continue
864+ for release, status in package_entries.items():
865+ release = Release(release)
866+ if not package.release_exists(release, include_parents=True): continue
867+ cve.add_cve_entry(UbuntuCVEPkgEntry(package, cve, status[0], status[1], release))
868+
869+ def get_cve(self, cve_id: str, with_pkg_links: bool = True) -> UbuntuCVE:
870+ """
871+ Get a CVE from the storage.
872+ The package links will be computed when the
873+ CVE is requested, extracting them from the internal
874+ UCTPackageStorage.
875+
876+ If with_pkg_links is disabled, links won't be computed
877+ """
878+ if cve_id not in self.cves: return None
879+
880+ cve, _ = self.cves[cve_id]
881+ cve.pkg_entries = list()
882+ if not with_pkg_links: return cve
883+ self.populate_cve_entries(cve_id)
884+
885+ return cve
886+
887+class USNStorage(SNStorage):
888+ sns: dict[str, tuple[USN, set, dict]]
889+ package_storage: UCTPackageStorage
890+ cve_storage: UCTCVEStorage
891+
892+ def load(self) -> None:
893+ usns_data = self._get_usn_database_data()
894+ print(f'Loading {len(usns_data)} USNs')
895+ for usn_id, usn_data in usns_data.items():
896+ _, _, cves, lp_bugs, pkgs = self.process_usn_data((usn_id, usn_data))
897+ self.add_usn(usn_id, usn_data, cves, lp_bugs, pkgs)
898+
899+ self._order_sns()
900+
901+ def _get_usn_database_data(self) -> dict:
902+ data = {}
903+ with open(os.path.expanduser(USN_DATABASE), 'r') as f:
904+ data.update(json.load(f))
905+ return data
906+
907+ def process_usn_data(self, usn_raw: tuple) -> tuple[str, dict, set, set, dict]:
908+ pkgs = {}
909+ lp_bugs = set()
910+ cves = set()
911+ usn_id = usn_raw[0]
912+ usn_data = usn_raw[1]
913+ # Package - fixed version loading
914+ for rel, info in usn_data['releases'].items():
915+ # Get package
916+ for pkg, pkg_info in info['sources'].items():
917+ pkgs.setdefault(pkg, [])
918+ pkgs[pkg].append((pkg_info['version'], rel))
919+
920+ # CVE loading
921+ for cve_text in usn_data['cves']:
922+ if 'launchpad.net' in cve_text:
923+ lp_bugs.add(cve_text)
924+ continue
925+ elif re.search(r'CVE-\d{4}-\d{4,7}', cve_text):
926+ cves.add(cve_text)
927+
928+ return usn_id, usn_data, cves, lp_bugs, pkgs
929+
930+ def add_usn(self, usn_id: str, usn_data: dict, cves: set, lp_bugs: set, pkgs: dict) -> None:
931+ self.sns[usn_id] = (USN(usn_id, usn_data, set(), dict(), lp_bugs), cves, pkgs)
932+
933+ def get_usns_by_cve(self, cve_id: str) -> list[USN]:
934+ target_usns = []
935+ for usn_id, usn_data in self.sns.items():
936+ if cve_id in usn_data[1]:
937+ target_usns.append(self.get_usn(usn_id))
938+
939+ return target_usns
940+
941+ def get_usn(self, usn_id: str) -> USN:
942+ if usn_id not in self.sns: return None
943+ usn, cves, pkgs = self.sns[usn_id]
944+ cve_mapping = set()
945+ pkg_mapping = dict()
946+
947+ for cve_id in cves:
948+ cve = self.cve_storage.get_cve(cve_id)
949+ if not cve: continue
950+ cve_mapping.add(cve)
951+
952+ for pkg_name in pkgs:
953+ pkg = self.package_storage.get_package(pkg_name)
954+ if not pkg: continue
955+ for version, release in pkgs[pkg_name]:
956+ release = Release(release)
957+ if not pkg.release_exists(release, include_parents=True): continue
958+ pkg_mapping.setdefault(pkg, [])
959+ version = UbuntuVersion(version)
960+ if version.is_esm():
961+ for pkg_version in pkg.source_versions:
962+ if pkg.get_version_release(pkg_version) == release:
963+ release = release.get_esm_release(pkg.get_version_component(pkg_version))
964+ break
965+
966+ pkg_mapping[pkg].append((version, release))
967+
968+ usn.cves = cve_mapping
969+ usn.package_fixed_versions = pkg_mapping
970+
971+ return usn
972+
973+
974diff --git a/scripts/datalib/utils.py b/scripts/datalib/utils.py
975new file mode 100644
976index 0000000..cc072e0
977--- /dev/null
978+++ b/scripts/datalib/utils.py
979@@ -0,0 +1,20 @@
980+#!/usr/bin/python3
981+# -*- coding: utf-8 -*-
982+# Module containing classes that represent CVE and Package data
983+#
984+# Author: David Fernandez Gonzalez <david.fernandezgonzalez@canonical.com>
985+# Copyright (C) 2024 Canonical Ltd.
986+#
987+# This script is distributed under the terms and conditions of the GNU General
988+# Public License, Version 2 or later. See http://www.gnu.org/copyleft/gpl.html
989+# for details.
990+#
991+import cve_lib
992+
993+def get_active_ubuntu_releases():
994+ active_ubuntu_releases = []
995+ for release in cve_lib.all_releases:
996+ if release not in cve_lib.eol_releases or release in cve_lib.get_active_releases_with_esm():
997+ active_ubuntu_releases.append(release)
998+
999+ return active_ubuntu_releases
1000\ No newline at end of file
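
The `get_cve` docstring in the diff above says package links are computed when a CVE is requested rather than at load time. A minimal sketch of that pattern follows. It assumes simplified stand-ins (`Package`, `CVE` and `LazyCVEStorage` are hypothetical names, not the datalib classes) and a placeholder CVE ID, and it is not the library's implementation:

```python
from dataclasses import dataclass, field


@dataclass
class Package:
    """Hypothetical stand-in for UbuntuSourcePackage."""
    name: str
    releases: set


@dataclass
class CVE:
    """Hypothetical stand-in for UbuntuCVE."""
    cve_id: str
    pkg_entries: list = field(default_factory=list)


class LazyCVEStorage:
    """Keeps each CVE next to its raw package data and links on request."""

    def __init__(self, packages: dict):
        self.packages = packages      # package name -> Package
        self.cves = {}                # CVE id -> (CVE, raw package data)

    def add(self, cve: CVE, raw_pkg_data: dict) -> None:
        self.cves[cve.cve_id] = (cve, raw_pkg_data)

    def get_cve(self, cve_id: str, with_pkg_links: bool = True):
        if cve_id not in self.cves:
            return None
        cve, raw_pkg_data = self.cves[cve_id]
        cve.pkg_entries = []          # rebuilt on every request
        if not with_pkg_links:
            return cve
        for pkg_name, statuses in raw_pkg_data.items():
            package = self.packages.get(pkg_name)
            if package is None:
                continue              # package not loaded: skip its entries
            for release, status in statuses.items():
                if release in package.releases:
                    cve.pkg_entries.append((package.name, release, status))
        return cve


# Placeholder data: only python-idna on bionic is known to the storage.
storage = LazyCVEStorage({"python-idna": Package("python-idna", {"bionic"})})
storage.add(CVE("CVE-0000-0001"), {
    "python-idna": {"bionic": "released", "focal": "needed"},
    "gpac": {"bionic": "needed"},
})
linked = storage.get_cve("CVE-0000-0001")
print(linked.pkg_entries)  # [('python-idna', 'bionic', 'released')]
```

Because the entries are rebuilt on each call, loading or unloading a release in the package storage is reflected the next time the CVE is fetched, at the cost of repeating the linking work on every request.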
