Merge ~litios/ubuntu-cve-tracker:oval-refactor into ubuntu-cve-tracker:master

Proposed by David Fernandez Gonzalez
Status: Merged
Merge reported by: David Fernandez Gonzalez
Merged at revision: 3acb478f6a74effd62974c3e94399922a0036306
Proposed branch: ~litios/ubuntu-cve-tracker:oval-refactor
Merge into: ubuntu-cve-tracker:master
Diff against target: 2793 lines (+1117/-1275)
3 files modified
scripts/generate-oval (+85/-96)
scripts/oval_lib.py (+1031/-1178)
test/gold_oval_structure/oval.xml (+1/-1)
Reviewer Review Type Date Requested Status
Eduardo Barretto Approve
Review via email: mp+455118@code.launchpad.net

Description of the change

This PR refactors the OVAL generation. The main changes are:

* Use of XML library.
* Creation of a common Generator class that shares most of the code needed to generate the OVAL elements.
* Refactor of CVE Generator and PKG generator to inherit from this class.
* Refactor of generate-oval to share a common interface
* New feature for pkg cache when the pkg version is not present: now it will use the lowest in the release if the binary versions differ.

The biggest impact will be on OVAL CVE, as the IDs, spacing, etc will differ now due to using the same one as PKG OVAL has been using.

To post a comment you must log in.
Revision history for this message
Eduardo Barretto (ebarretto) wrote (last edit ):

Hey, I just did a quick check and found some small things to improve and wanted to mention already.
I will continue to check the code later and run some tests locally too.
Thanks for all this hard work :)

review: Needs Fixing
7365bc8... by David Fernandez Gonzalez

oval: increase generator speed

Signed-off-by: David Fernandez Gonzalez <email address hidden>

0574f1a... by David Fernandez Gonzalez

generate-oval: improvements

Signed-off-by: David Fernandez Gonzalez <email address hidden>

107e5ce... by David Fernandez Gonzalez

oval_lib: use CVE-based IDs for definitions for CVE OVAL

Signed-off-by: David Fernandez Gonzalez <email address hidden>

bec0e63... by David Fernandez Gonzalez

oval_lib: fix some bugs

Signed-off-by: David Fernandez Gonzalez <email address hidden>

29ca294... by David Fernandez Gonzalez

oval_lib: replace macos tag by linux

Signed-off-by: David Fernandez Gonzalez <email address hidden>

7239e32... by David Fernandez Gonzalez

oval_lib: bump OVAL generator version to 2

Signed-off-by: David Fernandez Gonzalez <email address hidden>

a42624b... by David Fernandez Gonzalez

oval_lib: use latest version binaries when vulnerable

e0a6581... by David Fernandez Gonzalez

oval_lib: PKG - fix ids and ignore packages without binaries

Signed-off-by: David Fernandez Gonzalez <email address hidden>

7595fed... by David Fernandez Gonzalez

OVAL: fix test to fit new linux tag

Signed-off-by: David Fernandez Gonzalez <email address hidden>

9bf1b0f... by David Fernandez Gonzalez

OVAL: CVE - use same IDs as old format

Signed-off-by: David Fernandez Gonzalez <email address hidden>

42bf4f9... by David Fernandez Gonzalez

OVAL: CVE/PKG no longer use tmp directories

Signed-off-by: David Fernandez Gonzalez <email address hidden>

08b5354... by David Fernandez Gonzalez

OVAL: prefix argument is not longer used for CVE/PKG

Signed-off-by: David Fernandez Gonzalez <email address hidden>

05c7540... by David Fernandez Gonzalez

OVAL: enable ocioutdir for PKG/CVE

Signed-off-by: David Fernandez Gonzalez <email address hidden>

Revision history for this message
Eduardo Barretto (ebarretto) wrote :

lgtm!
thanks for landing the last fixes!

review: Approve
89047d3... by David Fernandez Gonzalez

OVAL: improve USN generation code

 * Fix bug when not listing releases
 * Refactor generate_oval_usn code in generate-oval

Signed-off-by: David Fernandez Gonzalez <email address hidden>

3acb478... by David Fernandez Gonzalez

OVAL: make IDs smaller

Signed-off-by: David Fernandez Gonzalez <email address hidden>

Revision history for this message
David Fernandez Gonzalez (litios) wrote :

Thanks a lot for all the reviews and the patience to check everything!!

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/scripts/generate-oval b/scripts/generate-oval
index 723127f..bdb9902 100755
--- a/scripts/generate-oval
+++ b/scripts/generate-oval
@@ -96,8 +96,8 @@ def main():
96 '(default is ./)')96 '(default is ./)')
97 parser.add_argument('--usn-number', default=None, type=str,97 parser.add_argument('--usn-number', default=None, type=str,
98 help='if passed specifics a USN for the oval_usn generator')98 help='if passed specifics a USN for the oval_usn generator')
99 parser.add_argument('--oval-release', default=None, type=str,99 parser.add_argument('--oval-releases', default=None, action='append',
100 help='specifies a release to generate the oval usn')100 help='specifies releases to generate the oval')
101 parser.add_argument('--packages', nargs='+', action='store', default=None,101 parser.add_argument('--packages', nargs='+', action='store', default=None,
102 help='generates oval for specific packages. Only for '102 help='generates oval for specific packages. Only for '
103 'CVE OVAL')103 'CVE OVAL')
@@ -132,27 +132,23 @@ def main():
132132
133 if args.usn_oval:133 if args.usn_oval:
134 if args.oci:134 if args.oci:
135 generate_oval_usn(args.output_dir, args.usn_number, args.oval_release,135 generate_oval_usn(args.output_dir, args.usn_number, args.oval_releases,
136 args.cve_prefix_dir, args.usn_db_dir, ociprefix, ocioutdir)136 args.cve_prefix_dir, args.usn_db_dir, args.no_progress, ociprefix, ocioutdir)
137 else:137 else:
138 generate_oval_usn(args.output_dir, args.usn_number, args.oval_release,138 generate_oval_usn(args.output_dir, args.usn_number, args.oval_releases,
139 args.cve_prefix_dir, args.usn_db_dir)139 args.cve_prefix_dir, args.usn_db_dir, args.no_progress,)
140140
141 return141 return
142142
143 # if --oval-release, we still need to load parents cache143 # if --oval-release, we still need to load parents cache
144 # so we can generate a complete oval for those releases that144 # so we can generate a complete oval for those releases that
145 # have a parent145 # have a parent
146 if args.oval_release:146 releases = supported_releases
147 releases = [args.oval_release]147 if args.oval_releases:
148 if args.oval_release not in supported_releases:148 releases = args.oval_releases
149 error(f"unknown oval release {args.oval_release}")149 for release in releases:
150 else:150 if release not in supported_releases:
151 r = args.oval_release151 error(f"unknown oval release {release}")
152 while release_parent(r):
153 r = release_parent(r)
154 releases.append(r)
155 supported_releases = releases
156152
157 cache = {}153 cache = {}
158 for release in supported_releases:154 for release in supported_releases:
@@ -161,17 +157,15 @@ def main():
161157
162 if args.pkg_oval:158 if args.pkg_oval:
163 if args.oci:159 if args.oci:
164 generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ociprefix, ocioutdir)160 generate_oval_package(releases, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ocioutdir)
165 else:161 else:
166 generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only)162 generate_oval_package(releases, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only)
167 return163 return
168164
169 if args.oci:165 if args.oci:
170 generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci,166 generate_oval_cve(releases, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ocioutdir)
171 args.no_progress, args.packages, pathnames, ociprefix, ocioutdir)
172 else:167 else:
173 generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci,168 generate_oval_cve(releases, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only)
174 args.no_progress, args.packages, pathnames)
175 return169 return
176170
177171
@@ -390,7 +384,7 @@ def get_usn_database(usn_db_dir):
390# WARNING:384# WARNING:
391# be sure the release you are passing is in the usn-number passed385# be sure the release you are passing is in the usn-number passed
392# otherwise it will generate an oval file without the usn info.386# otherwise it will generate an oval file without the usn info.
393def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=None, ocioutdir=None):387def generate_oval_usn(outdir, usn, usn_releases, cve_dir, usn_db_dir, no_progress, ociprefix=None, ocioutdir=None):
394 # Get the usn database.json data388 # Get the usn database.json data
395 usn_database = get_usn_database(usn_db_dir)389 usn_database = get_usn_database(usn_db_dir)
396 if not usn_database:390 if not usn_database:
@@ -400,33 +394,28 @@ def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=N
400 if usn not in usn_database:394 if usn not in usn_database:
401 error("Please enter a valid USN number or update your database.json and try again")395 error("Please enter a valid USN number or update your database.json and try again")
402396
403 if usn_release:
404 if usn_release not in supported_releases:
405 error("Please enter a valid release name.")
406
407 # Create OvalGeneratorUSN objects397 # Create OvalGeneratorUSN objects
408 ovals = []398 ovals = []
409 # Does the oval for just a specific given release399 valid_releases = []
410 if usn_release:400
411 ovals.append(oval_lib.OvalGeneratorUSN(usn_release, release_name(usn_release), outdir, cve_dir))401 # Check or generate valid releases
412 # Also produce oval generator object for OCI402 if usn_releases:
413 if ocioutdir:403 for usn_release in usn_releases:
414 ovals.append(oval_lib.OvalGeneratorUSN(usn_release, release_name(usn_release), ocioutdir,404 if usn_release not in supported_releases:
415 cve_dir, ociprefix, 'oci'))405 error(f"Invalid release name '{usn_release}'.")
406 valid_releases = usn_releases
416 else:407 else:
417 for release in supported_releases:408 valid_releases = list(filter(lambda release: product_series(release)[0] == PRODUCT_UBUNTU, supported_releases))
418 # for now we don't differentiate products (e.g. esm) in the USN DB
419 product, series = product_series(release)
420 if product != PRODUCT_UBUNTU:
421 continue
422409
423 ovals.append(oval_lib.OvalGeneratorUSN(release, release_name(release), outdir, cve_dir))410 if not no_progress:
424 # Also produce oval generator object for OCI411 print('[*] Generating OVAL USN for packages in releases', ', '.join(valid_releases))
425 if ocioutdir:
426 ovals.append(oval_lib.OvalGeneratorUSN(release, release_name(release), ocioutdir,
427 cve_dir, ociprefix,
428 'oci'))
429412
413 for release in valid_releases:
414 ovals.append(oval_lib.OvalGeneratorUSN(release, release_name(release), outdir, cve_dir))
415 # Also produce oval generator object for OCI
416 if ocioutdir:
417 ovals.append(oval_lib.OvalGeneratorUSN(release, release_name(release), ocioutdir,
418 cve_dir, ociprefix, 'oci'))
430 # Generate OVAL USN data419 # Generate OVAL USN data
431 if usn:420 if usn:
432 prepend_usn_to_id(usn_database, usn)421 prepend_usn_to_id(usn_database, usn)
@@ -441,62 +430,62 @@ def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=N
441 for oval in ovals:430 for oval in ovals:
442 oval.write_oval_elements()431 oval.write_oval_elements()
443432
433 if not no_progress:
434 print(f'[*] Done generating OVAL USN for packages in releases {", ".join(valid_releases)}')
435
444 return True436 return True
445437
446def generate_oval_package(outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ociprefix='', ocioutdir=None):438def generate_oval_package(releases, outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ocioutdir=None):
447 for release in supported_releases:439 if not no_progress:
448 if not no_progress:440 print(f'[*] Generating OVAL PKG for packages in releases {", ".join(releases)}')
449 print(f'[*] Generating OVAL for packages in release {release}')441
450 ov = oval_lib.OvalGeneratorPkg(release, release_name(release), pathnames, packages, not no_progress, pkg_cache=pkg_cache, fixed_only=fixed_only, cve_cache=cve_cache, oval_format='oci' if oci else 'dpkg', outdir=outdir, cve_prefix_dir=cve_prefix_dir, prefix=ociprefix)442 ov = oval_lib.OvalGeneratorPkg(
443 releases,
444 pathnames,
445 packages,
446 not no_progress,
447 pkg_cache=pkg_cache,
448 fixed_only=fixed_only,
449 cve_cache=cve_cache,
450 oval_format='dpkg',
451 outdir=outdir,
452 cve_prefix_dir=cve_prefix_dir
453 )
454 ov.generate_oval()
455
456 if oci:
457 ov.oval_format = 'oci'
458 ov.output_dir = ocioutdir
451 ov.generate_oval()459 ov.generate_oval()
452460
453 if oci:461 if not no_progress:
454 ov.oval_format = 'dpkg'462 print(f'[X] Done generating OVAL PKG for packages in releases {", ".join(releases)}')
455 ov.generate_oval()463
456464def generate_oval_cve(releases, outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ocioutdir=None):
457 if not no_progress:465 if not no_progress:
458 print(f'[X] Done generating OVAL for packages in release {release}')466 print(f'[*] Generating OVAL CVE for packages in releases {",".join(releases)}')
459467
460def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages, pathnames, ociprefix=None, ocioutdir=None):468 ov = oval_lib.OvalGeneratorCVE(
461 ovals = dict()469 releases,
462 for release in supported_releases:470 pathnames,
463 # we can have nested parent releases471 packages,
464 parent = release_progenitor(release)472 not no_progress,
465 index = '{0}_dpkg'.format(release)473 pkg_cache=pkg_cache,
466 ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, outdir, prefix='', oval_format='dpkg')474 fixed_only=fixed_only,
467 ovals[index].add_release_applicability_definition()475 cve_cache=cve_cache,
468 if oci:476 oval_format='dpkg',
469 index = '{0}_oci'.format(release)477 outdir=outdir,
470 ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, ocioutdir, prefix=ociprefix, oval_format='oci')478 cve_prefix_dir=cve_prefix_dir
471 ovals[index].add_release_applicability_definition()479 )
472480 ov.generate_oval()
473 # loop through all CVE data files481
474 files = []482 if oci:
475 for pathname in pathnames:483 ov.oval_format = 'oci'
476 files = files + glob.glob(os.path.join(cve_prefix_dir, pathname))484 ov.output_dir = ocioutdir
477 files.sort()485 ov.generate_oval()
478
479 pkg_filter = None
480 if packages:
481 pkg_filter = packages
482
483 files_count = len(files)
484 for i_file, filepath in enumerate(files):
485 cve_data = parse_cve_file(filepath, cache, pkg_filter)
486 # skip CVEs without packages for supported releases
487 if not cve_data['packages']:
488 if not no_progress:
489 progress_bar(i_file + 1, files_count)
490 continue
491
492 for i in ovals:
493 ovals[i].generate_cve_definition(cve_data)
494
495 if not no_progress:
496 progress_bar(i_file + 1, files_count)
497486
498 for i in ovals:487 if not no_progress:
499 ovals[i].write_to_file()488 print(f'[X] Done generating OVAL CVE for packages in releases {", ".join(releases)}')
500489
501if __name__ == '__main__':490if __name__ == '__main__':
502 main()491 main()
diff --git a/scripts/oval_lib.py b/scripts/oval_lib.py
index 655f0af..abe63dc 100644
--- a/scripts/oval_lib.py
+++ b/scripts/oval_lib.py
@@ -22,7 +22,6 @@ from datetime import datetime, timezone
22import apt_pkg22import apt_pkg
23import io23import io
24import os24import os
25import random
26import re25import re
27import shutil26import shutil
28import sys27import sys
@@ -30,6 +29,7 @@ import tempfile
30import collections29import collections
31import glob30import glob
32import xml.etree.cElementTree as etree31import xml.etree.cElementTree as etree
32import json
33from xml.dom import minidom33from xml.dom import minidom
34from typing import Tuple # Needed because of Python < 3.9 and to also support < 3.734from typing import Tuple # Needed because of Python < 3.9 and to also support < 3.7
3535
@@ -41,6 +41,7 @@ from xml.sax.saxutils import escape
41sources = {}41sources = {}
42source_map_binaries = {}42source_map_binaries = {}
43debug_level = 043debug_level = 0
44GENERIC_VERSION = '0:0'
4445
45def recursive_rm(dirPath):46def recursive_rm(dirPath):
46 '''recursively remove directory'''47 '''recursively remove directory'''
@@ -143,80 +144,361 @@ def generate_cve_tag(cve):
143 cve_ref += '>{0}</cve>'.format(cve['Candidate'])144 cve_ref += '>{0}</cve>'.format(cve['Candidate'])
144 return cve_ref145 return cve_ref
145146
146def get_latest_version(versions):147def get_binarypkgs(cache, source_name, release):
147 latest = None
148 for version in versions:
149 if not latest:
150 latest = version
151 continue
152 elif apt_pkg.version_compare(version, latest) > 0:
153 latest = version
154
155 return latest
156
157def get_binarypkgs(cache, source_name, release, version=None):
158 """ return a list of binary packages from the source package version """148 """ return a list of binary packages from the source package version """
159 packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-")149 packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-")
160 version_map = collections.defaultdict(list)150 binaries_map = collections.defaultdict(dict)
161 cache_version = version
162151
163 if source_name not in cache[release]:152 if source_name not in cache[release]:
164 rel = release153 rel = release
165 while cve_lib.release_parent(rel):154 while cve_lib.release_parent(rel):
166 rel = cve_lib.release_parent(rel)155 rel = cve_lib.release_parent(rel)
167 r , sv, vm = get_binarypkgs(cache, source_name, rel, version)156 r , vb = get_binarypkgs(cache, source_name, rel)
168 if r:157 if r:
169 return r, sv, vm158 return r, vb
170159
171 # if a source package does not exist in such a release160 # if a source package does not exist in such a release
172 # return None161 # return None
173 return release, None, None162 return None, None
174 elif version and version not in cache[release][source_name]:163
175 rel = release164 for source_version in cache[release][source_name]:
176 while cve_lib.release_parent(rel):165 binaries_map.setdefault(source_version, dict())
177 rel = cve_lib.release_parent(rel)166 for binary, bin_data in cache[release][source_name][source_version]['binaries'].items():
178 r , sv, vm = get_binarypkgs(cache, source_name, rel, version)167 # for kernel we only want linux images
179 if r:168 if source_name.startswith('linux') and not binary.startswith('linux-image-'):
180 return r, sv, vm169 continue
181170 # skip ignored packages, with exception of golang*-dev pkgs
182 # if version is not in release, then fetch latest source version171 if binary.startswith(('golang-go')) or \
183 cache_version = get_latest_version(list(cache[release][source_name].keys()))172 not any(s in binary for s in packages_to_ignore):
184 elif not version:173 binaries_map[source_version].setdefault(bin_data['version'], list())
185 # if no version is provided, get latest source version174 binaries_map[source_version][bin_data['version']].append(binary)
186 version = get_latest_version(list(cache[release][source_name].keys()))175
187 cache_version = version176 return release, binaries_map
188 else:177
189 cache_version = version178class CVEPkgRelEntry:
179 def __init__(self, pkg, release, cve, status, note) -> None:
180 self.pkg = pkg
181 self.cve = cve
182 self.orig_status = status
183 self.orig_note = note
184 self.release = release
185 cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, None)
186
187 self.note = cve_info['note']
188 self.status = cve_info['status']
189 self.fixed_version = cve_info['fix-version'] if self.status == 'fixed' else None
190
191 @staticmethod
192 def parse_package_status(release, package, status_text, note, filepath, cache):
193 """ parse ubuntu package status string format:
194 <status code> (<version/notes>)
195 outputs dictionary: {
196 'status' : '<not-applicable | unknown | vulnerable | fixed>',
197 'note' : '<description of the status>',
198 'fix-version' : '<version with issue fixed, if applicable>',
199 'bin-pkgs' : []
200 } """
201
202 # TODO fix for CVE Generator
203
204 # break out status code and detail
205 code = status_text.lower()
206 detail = note.strip('()') if note else None
207 status = {}
208 fix_version = ""
209
210 if detail and detail[0].isdigit() and len(detail.split(' ')) == 1:
211 fix_version = detail
212
213 note_end = " (note: '{0}').".format(detail) if detail else '.'
214 if code == 'dne':
215 status['status'] = 'not-applicable'
216 status['note'] = \
217 " package does not exist in {0}{1}".format(release, note_end)
218 elif code == 'ignored':
219 status['status'] = 'vulnerable'
220 status['note'] = ": while related to the CVE in some way, a decision has been made to ignore this issue{0}".format(note_end)
221 elif code == 'not-affected':
222 # check if there is a release version and if so, test for
223 # package existence with that version
224 if fix_version:
225 status['status'] = 'fixed'
226 status['note'] = " package in {0}, is related to the CVE in some way and has been fixed{1}".format(release, note_end)
227 status['fix-version'] = fix_version
228 else:
229 status['status'] = 'not-vulnerable'
230 status['note'] = " package in {0}, while related to the CVE in some way, is not affected{1}".format(release, note_end)
231 elif code == 'needed':
232 status['status'] = 'vulnerable'
233 status['note'] = \
234 " package in {0} is affected and needs fixing{1}".format(release, note_end)
235 elif code == 'pending':
236 # pending means that packages have been prepared and are in
237 # -proposed or in a ppa somewhere, and should have a version
238 # attached. If there is a version, test for package existence
239 # with that version, otherwise mark as vulnerable
240 if fix_version:
241 status['status'] = 'fixed'
242 status['note'] = " package in {0} is affected. An update containing the fix has been completed and is pending publication{1}".format(release, note_end)
243 status['fix-version'] = fix_version
244 else:
245 status['status'] = 'vulnerable'
246 status['note'] = " package in {0} is affected. An update containing the fix has been completed and is pending publication{1}".format(release, note_end)
247 elif code == 'deferred':
248 status['status'] = 'vulnerable'
249 status['note'] = " package in {0} is affected, but a decision has been made to defer addressing it{1}".format(release, note_end)
250 elif code in ['released']:
251 # if there isn't a release version, then just mark
252 # as vulnerable to test for package existence
253 if not fix_version:
254 status['status'] = 'vulnerable'
255 status['note'] = " package in {0} was vulnerable and has been fixed, but no release version available for it{1}".format(release, note_end)
256 else:
257 status['status'] = 'fixed'
258 status['note'] = " package in {0} was vulnerable but has been fixed{1}".format(release, note_end)
259 status['fix-version'] = fix_version
260 elif code == 'needs-triage':
261 status['status'] = 'vulnerable'
262 status['note'] = " package in {0} is affected and may need fixing{1}".format(release, note_end)
263 else:
264 # TODO LOGGIN
265 print('Unsupported status "{0}" in {1}_{2} in "{3}". Setting to "unknown".'.format(code, release, package, filepath))
266 status['status'] = 'unknown'
267 status['note'] = " package in {0} has a vulnerability that is not known (status: '{1}'). It is pending evaluation{2}".format(release, code, note_end)
268
269 return status
270
271 def is_not_applicable(self) -> bool:
272 return self.status in ['not-vulnerable', 'not-applicable']
273
274 def __str__(self) -> str:
275 return f'{str(self.pkg)}:{self.status} {self.fixed_version}'
276
277class CVE:
278 def __init__(self, number, info, pkgs=None) -> None:
279 self.number = number
280 self.description = info['Description']
281 self.priority = info['Priority'][0]
282 self.public_date = info['PublicDate']
283 self.public_date_at_usn = info['PublicDateAtUSN'] if 'PublicDateAtUSN' in info else ''
284 self.cvss = info['CVSS']
285 self.assigned_to = info['Assigned-to'] if 'Assigned-to' in info else ''
286 self.discoverd_by = info['Discovered-by'] if 'Discovered-by' in info else ''
287 self.usns = []
288 self.references = []
289 self.bugs = []
290 for url in info['References'].split('\n'):
291 if 'https://ubuntu.com/security/notices/USN-' in url:
292 self.usns.append(url[40:])
293 elif re.match("https?:\/\/(bugs\.)?launchpad\.net\/(.*\/\+bug|bugs)\/\d+", url):
294 self.bugs.append(url)
295 elif url:
296 self.references.append(url)
297
298 for bug in info['Bugs'].split('\n'):
299 if bug:
300 self.bugs.append(bug)
301
302 self.pkg_rel_entries = {}
303 self.pkgs = pkgs if pkgs else []
304
305 def get_pkgs(self, releases):
306 # We assume priority is as the order in the list
307 pkgs = []
308 pkg_rel = {}
309 for pkg in self.pkgs:
310 if pkg.rel not in releases:
311 continue
312
313 if pkg.name not in pkg_rel:
314 pkg_rel[pkg.name] = pkg.rel
315 elif releases.index(pkg.rel) < releases.index(pkg_rel[pkg.name]):
316 pkg_rel[pkg.name] = pkg.rel
317
318 for pkg in self.pkgs:
319 if self.pkg_rel_entries[str(pkg)].is_not_applicable():
320 continue
321
322 if pkg.name in pkg_rel and pkg_rel[pkg.name] == pkg.rel:
323 pkgs.append(pkg)
324
325 return pkgs
326
327
328 def add_pkg(self, pkg_object, release, state, note):
329 cve_pkg_entry = CVEPkgRelEntry(pkg_object, release, self, state, note)
330 self.pkg_rel_entries[str(pkg_object)] = cve_pkg_entry
331 self.pkgs.append(pkg_object)
332 pkg_object.add_cve(self)
333
334 def __str__(self) -> str:
335 return self.number
336
337 def __repr__(self):
338 return self.__str__()
339
340class Package:
341 def __init__(self, pkgname, rel, versions_binaries):
342 self.name = pkgname
343 self.rel = rel
344 self.description = cve_lib.lookup_package_override_description(pkgname)
345
346 if not self.description:
347 if 'description' in sources[rel][pkgname]:
348 self.description = sources[rel][pkgname]['description']
349 elif pkgname in source_map_binaries[rel] and \
350 'description' in source_map_binaries[rel][pkgname]:
351 self.description = source_map_binaries[rel][pkgname]['description']
352 else:
353 # Get first description found
354 if 'binaries' in sources[self.rel][self.name]:
355 for binary in sources[self.rel][self.name]['binaries']:
356 if binary in source_map_binaries[self.rel] and 'description' in source_map_binaries[self.rel][binary]:
357 self.description = source_map_binaries[self.rel][binary]["description"]
358 break
359
360 self.section = sources[rel][pkgname]['section']
361 self.versions_binaries = versions_binaries if versions_binaries else {}
362 self.earliest_version = self.get_earliest_version()
363 self.latest_version = self.get_latest_version()
364
365 binary_versions = self.get_binary_versions(self.earliest_version)
366 self.is_kernel_pkg = False if len(binary_versions) == 0 else \
367 is_kernel_binaries(self.get_binaries(self.earliest_version, binary_versions[0]))
368 self.cves = []
369
370 def add_cve(self, cve) -> None:
371 self.cves.append(cve)
372
373 def get_latest_version(self):
374 latest = None
375 for version in self.versions_binaries.keys():
376 if not latest:
377 latest = version
378 continue
379 elif apt_pkg.version_compare(version, latest) > 0:
380 latest = version
381
382 return latest
383
384 def get_earliest_version(self):
385 earliest = None
386 for version in self.versions_binaries.keys():
387 if not earliest:
388 earliest = version
389 continue
390 elif apt_pkg.version_compare(earliest, version) > 0:
391 earliest = version
392
393 return earliest
394
395 def version_exists(self, source_version):
396 return source_version in self.versions_binaries
397
398 def all_binaries_same_version(self, source_version):
399 if source_version not in self.versions_binaries:
400 return len(self.versions_binaries[self.earliest_version]) <= 1
401 return len(self.versions_binaries[source_version]) <= 1
402
403 def get_version_to_check(self, source_version):
404 if not source_version:
405 return self.latest_version
406 else:
407 if source_version in self.versions_binaries or self.all_binaries_same_version(source_version):
408 return source_version
409 else:
410 if source_version and apt_pkg.version_compare(source_version, self.earliest_version) > 0:
411 print(f'Wrong CVE entry version {source_version} - earliest for package {self.name} in {self.rel} is {self.earliest_version}')
412
413 return self.earliest_version
414
415 def get_binary_versions(self, source_version):
416 if not self.versions_binaries: return []
417
418 if source_version not in self.versions_binaries:
419 # If this is the case, package binaries should all have the same version
420 # Relying on that, we can use the version of the CVE as the right version
421 return [source_version]
422 return list(self.versions_binaries[source_version].keys())
423
424 def get_binaries(self, source_version, binary_version):
425 if not self.versions_binaries: return {}
426 if source_version not in self.versions_binaries:
427 if len(self.versions_binaries[self.earliest_version]) != 1:
428 print(f"WARN: Version {source_version} doesn't exist yet the package {self.name} has different versions for the binaries")
429
430 version_binaries = self.versions_binaries[self.earliest_version]
431 return version_binaries[self.get_binary_versions(self.earliest_version)[0]]
432 return self.versions_binaries[source_version][binary_version]
433
434 def __str__(self) -> str:
435 return f"{self.name}/{self.rel}"
190436
191 for binary, bin_data in cache[release][source_name][cache_version]['binaries'].items():437 def __repr__(self):
192 # for kernel we only want linux images438 return self.__str__()
193 if source_name.startswith('linux') and not binary.startswith('linux-image-'):439
194 continue440class USN:
195 # skip ignored packages, with exception of golang*-dev pkgs441 def __init__(self, data):
196 if binary.startswith(('golang-go')) or \442 for item in ['description', 'releases', 'title', 'timestamp', 'summary', 'action', 'cves', 'id', 'isummary']:
197 not any(s in binary for s in packages_to_ignore):443 if item in data:
198 version_map[bin_data['version']].append(binary)444 setattr(self, item, data[item])
445 else:
446 setattr(self, item, None)
447
448 def __str__(self) -> str:
449 return self.id
199450
200 return release, version, version_map451 def __repr__(self) -> str:
452 return self.id
201453
454# Oval Generators
202class OvalGenerator:455class OvalGenerator:
203 supported_oval_elements = ('definition', 'test', 'object', 'state', 'variable')456 supported_oval_elements = ('definition', 'test', 'object', 'state', 'variable')
204 generator_version = '1.1'457 generator_version = '2'
205 oval_schema_version = '5.11.1'458 oval_schema_version = '5.11.1'
206 def __init__(self, release, release_name = None, warn_method=False, outdir='./', prefix='', oval_format='dpkg') -> None:459 def __init__(self, type, releases, cve_paths, packages, progress, pkg_cache, fixed_only=True, cve_cache=None, cve_prefix_dir=None, outdir='./', oval_format='dpkg') -> None:
207 self.release = release460 self.releases = releases
208 # e.g. codename for trusty/esm should be trusty
209 self.release_codename = cve_lib.release_progenitor(release) if cve_lib.release_progenitor(release) else self.release.replace('/', '_')
210 self.release_name = release_name
211 #self.warn = warn_method or self.warn
212 self.tmpdir = tempfile.mkdtemp(prefix='oval_lib-')
213 self.output_dir = outdir461 self.output_dir = outdir
214 self.oval_format = oval_format462 self.oval_format = oval_format
463 self.generator_type = type
464 self.progress = progress
465 self.cve_cache = cve_cache
466 self.pkg_cache = pkg_cache
467 self.cve_paths = cve_paths
468 self.fixed_only = fixed_only
469 self.packages, self.cves = self._load(cve_prefix_dir, packages)
470
471 def _init_ids(self, release):
472 # e.g. codename for trusty/esm should be trusty
473 self.release = release
474 self.release_codename = cve_lib.release_progenitor(self.release) if cve_lib.release_progenitor(self.release) else self.release.replace('/', '_')
475 self.release_name = cve_lib.release_name(self.release)
476
477 self.parent_releases = set()
478 current_release = self.release
479 while(cve_lib.release_parent(current_release)):
480 current_release = cve_lib.release_parent(current_release)
481 if current_release != self.release:
482 self.parent_releases.add(current_release)
483
215 self.ns = 'oval:com.ubuntu.{0}'.format(self.release_codename)484 self.ns = 'oval:com.ubuntu.{0}'.format(self.release_codename)
216 self.id = 100485 self.id = 100
217 self.host_def_id = self.id486 self.host_def_id = self.id
218 self.release_applicability_definition_id = '{0}:def:{1}0'.format(self.ns, self.id)487 self.release_applicability_definition_id = '{0}:def:{1}0'.format(self.ns, self.id)
219488 ###
489 # ID schema: 2204|00001|0001
490 # * The first four digits are the ubuntu release number
491 # * The next 5 digits is # just a package counter, we increase it for each definition
492 # * The last 4 digits is a counter for the criterion
493 ###
494 release_code = int(self.release_name.split(' ')[1].replace('.', '')) if self.release not in cve_lib.external_releases else 1111
495 self.release_id = release_code * 10 ** 10
496 self.definition_id = self.release_id
497 self.definition_step = 1 * 10 ** 5
498 self.criterion_step = 10
499 self.output_filepath = \
500 '{0}com.ubuntu.{1}.{2}.oval.xml'.format('oci.' if self.oval_format == 'oci' else '', self.release.replace('/', '_'), self.generator_type)
501
220 def _add_structure(self, root) -> None:502 def _add_structure(self, root) -> None:
221 structure = {}503 structure = {}
222 for element in self.supported_oval_elements:504 for element in self.supported_oval_elements:
@@ -225,21 +507,11 @@ class OvalGenerator:
225507
226 return structure508 return structure
227509
228 def _get_root_element(self, type) -> etree.Element:510 def _get_generator(self, type) -> etree.Element:
229 oval_timestamp = datetime.now(tz=timezone.utc).strftime(511 oval_timestamp = datetime.now(tz=timezone.utc).strftime(
230 '%Y-%m-%dT%H:%M:%S')512 '%Y-%m-%dT%H:%M:%S')
231513
232 root_element = etree.Element("oval_definitions", attrib= {514 generator = etree.Element("generator")
233 "xmlns":"http://oval.mitre.org/XMLSchema/oval-definitions-5",
234 "xmlns:ind-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#independent",
235 "xmlns:oval":"http://oval.mitre.org/XMLSchema/oval-common-5",
236 "xmlns:unix-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#unix",
237 "xmlns:linux-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#linux",
238 "xmlns:xsi":"http://www.w3.org/2001/XMLSchema-instance" ,
239 "xsi:schemaLocation":"http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd"
240 })
241
242 generator = etree.SubElement(root_element, "generator")
243 product_name = etree.SubElement(generator, "oval:product_name")515 product_name = etree.SubElement(generator, "oval:product_name")
244 product_version = etree.SubElement(generator, "oval:product_version")516 product_version = etree.SubElement(generator, "oval:product_version")
245 schema_version = etree.SubElement(generator, "oval:schema_version")517 schema_version = etree.SubElement(generator, "oval:schema_version")
@@ -250,6 +522,19 @@ class OvalGenerator:
250 schema_version.text = self.oval_schema_version522 schema_version.text = self.oval_schema_version
251 timestamp.text = oval_timestamp523 timestamp.text = oval_timestamp
252524
525 return generator
526
527 def _get_root_element(self) -> etree.Element:
528 root_element = etree.Element("oval_definitions", attrib= {
529 "xmlns":"http://oval.mitre.org/XMLSchema/oval-definitions-5",
530 "xmlns:ind-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#independent",
531 "xmlns:oval":"http://oval.mitre.org/XMLSchema/oval-common-5",
532 "xmlns:unix-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#unix",
533 "xmlns:linux-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#linux",
534 "xmlns:xsi":"http://www.w3.org/2001/XMLSchema-instance" ,
535 "xsi:schemaLocation":"http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#linux linux-definitions-schema.xsd"
536 })
537
253 xml_tree = etree.ElementTree(root_element)538 xml_tree = etree.ElementTree(root_element)
254 return xml_tree, root_element539 return xml_tree, root_element
255540
@@ -396,229 +681,90 @@ class OvalGenerator:
396681
397 return family_state, state682 return family_state, state
398683
399class CVEPkgRelEntry:684 def _add_new_package(self, package_name, cve, release, cve_data, packages) -> None:
400 def __init__(self, pkg, release, cve, status, note, cache) -> None:685 if package_name not in packages:
401 self.pkg = pkg686 _, versions_binaries = get_binarypkgs(self.pkg_cache, package_name, release)
402 self.cve = cve687 pkg_obj = Package(package_name, release, versions_binaries)
403 self.orig_status = status688 packages[package_name] = pkg_obj
404 self.orig_note = note
405 self.release = release
406 cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, cache)
407689
408 self.note = cve_info['note']690 pkg_obj = packages[package_name]
409 self.status = cve_info['status']691 cve.add_pkg(pkg_obj, release, cve_data['pkgs'][package_name][release][0],cve_data['pkgs'][package_name][release][1])
410 self.fixed_version = cve_info['fix-version'] if self.status == 'fixed' else None
411692
412 @staticmethod693 def _load(self, cve_prefix_dir, packages_filter=None) -> None:
413 def parse_package_status(release, package, status_text, note, filepath, cache):694 cve_lib.load_external_subprojects()
414 """ parse ubuntu package status string format:
415 <status code> (<version/notes>)
416 outputs dictionary: {
417 'status' : '<not-applicable | unknown | vulnerable | fixed>',
418 'note' : '<description of the status>',
419 'fix-version' : '<version with issue fixed, if applicable>',
420 'bin-pkgs' : []
421 } """
422695
423 # TODO fix for CVE Generator696 cve_paths = []
697 for pathname in self.cve_paths:
698 cve_paths = cve_paths + glob.glob(os.path.join(cve_prefix_dir, pathname))
424699
425 # break out status code and detail700 cve_paths.sort(key=lambda cve:
426 code = status_text.lower()701 (int(cve.split('/')[-1].split('-')[1]), int(cve.split('/')[-1].split('-')[2])) \
427 detail = note.strip('()') if note else None702 if cve.split('/')[-1].split('-')[2].isnumeric() \
428 status = {}703 else (int(cve.split('/')[-1].split('-')[1]), 0)
429 fix_version = None704 )
430705
431 if detail and detail[0].isdigit() and len(detail.split(' ')) == 1:706 packages = {}
432 fix_version = detail707 cves = {}
708 base_releases = self.releases
709 final_releases = set(self.releases)
710 for current_release in base_releases:
711 while(cve_lib.release_parent(current_release)):
712 current_release = cve_lib.release_parent(current_release)
713 final_releases.add(current_release)
714
715 for release in final_releases:
716 packages.setdefault(release, {})
717 cves.setdefault(release, {})
718 sources[release] = load(releases=[release], skip_eol_releases=False)[release]
433719
434 parent = release720 orig_name = cve_lib.get_orig_rel_name(release)
435 if cache and code != 'dne':721 if '/' in orig_name:
436 parent, status['source-version'], status['bin-pkgs'] = get_binarypkgs(cache, package, release, version=fix_version)722 orig_name = orig_name.split('/', maxsplit=1)[1]
437 if parent != release:723 source_map_binaries[release] = load(data_type='packages',releases=[orig_name], skip_eol_releases=False)[orig_name] \
438 status['parent'] = parent724 if release not in cve_lib.external_releases else {}
439725
440 note_end = " (note: '{0}').".format(detail) if detail else '.'726 i = 0
441 if code == 'dne':727 for cve_path in cve_paths:
442 status['status'] = 'not-applicable'728 cve_number = cve_path.rsplit('/', 1)[1]
443 status['note'] = \729 i += 1
444 " package does not exist in {0}{1}".format(release, note_end)
445 elif code == 'ignored':
446 status['status'] = 'vulnerable'
447 status['note'] = ": while related to the CVE in some way, a decision has been made to ignore this issue{0}".format(note_end)
448 elif code == 'not-affected':
449 # check if there is a release version and if so, test for
450 # package existence with that version
451 if fix_version:
452 status['status'] = 'fixed'
453 status['note'] = " package in {0}, is related to the CVE in some way and has been fixed{1}".format(parent, note_end)
454 status['fix-version'] = fix_version
455 else:
456 status['status'] = 'not-vulnerable'
457 status['note'] = " package in {0}, while related to the CVE in some way, is not affected{1}".format(parent, note_end)
458 elif code == 'needed':
459 status['status'] = 'vulnerable'
460 status['note'] = \
461 " package in {0} is affected and needs fixing{1}".format(parent, note_end)
462 elif code == 'pending':
463 # pending means that packages have been prepared and are in
464 # -proposed or in a ppa somewhere, and should have a version
465 # attached. If there is a version, test for package existence
466 # with that version, otherwise mark as vulnerable
467 if fix_version:
468 status['status'] = 'fixed'
469 status['note'] = " package in {0} is affected. An update containing the fix has been completed and is pending publication{1}".format(parent, note_end)
470 status['fix-version'] = fix_version
471 else:
472 status['status'] = 'vulnerable'
473 status['note'] = " package in {0} is affected. An update containing the fix has been completed and is pending publication{1}".format(parent, note_end)
474 elif code == 'deferred':
475 status['status'] = 'vulnerable'
476 status['note'] = " package in {0} is affected, but a decision has been made to defer addressing it{1}".format(parent, note_end)
477 elif code in ['released']:
478 # if there isn't a release version, then just mark
479 # as vulnerable to test for package existence
480 if not fix_version:
481 status['status'] = 'vulnerable'
482 status['note'] = " package in {0} was vulnerable and has been fixed, but no release version available for it{1}".format(parent, note_end)
483 else:
484 status['status'] = 'fixed'
485 status['note'] = " package in {0} was vulnerable but has been fixed{1}".format(parent, note_end)
486 status['fix-version'] = fix_version
487 elif code == 'needs-triage':
488 status['status'] = 'vulnerable'
489 status['note'] = " package in {0} is affected and may need fixing{1}".format(parent, note_end)
490 else:
491 # TODO LOGGIN
492 print('Unsupported status "{0}" in {1}_{2} in "{3}". Setting to "unknown".'.format(code, release, package, filepath))
493 status['status'] = 'unknown'
494 status['note'] = " package in {0} has a vulnerability that is not known (status: '{1}'). It is pending evaluation{2}".format(release, code, note_end)
495
496 return status
497
498 def __str__(self) -> str:
499 return f'{str(self.pkg)}:{self.status} {self.fixed_version}'
500
501class CVE:
502 def __init__(self, number, info, pkgs=[]) -> None:
503 self.number = number
504 self.description = info['Description']
505 self.priority = info['Priority'][0]
506 self.public_date = info['PublicDate']
507 self.cvss = info['CVSS']
508 self.usns = []
509 for url in info['References'].split('\n'):
510 if 'https://ubuntu.com/security/notices/USN-' in url:
511 self.usns.append(url[40:])
512 self.pkg_rel_entries = {}
513 self.pkgs = pkgs
514
515 def add_pkg(self, pkg_object, cve_pkg_entry):
516 if cve_pkg_entry.status in ['not-vulnerable', 'not-applicable']:
517 return
518
519 self.pkg_rel_entries[pkg_object.name] = cve_pkg_entry
520 self.pkgs.append(pkg_object)
521 pkg_object.cves.append(self)
522
523 def __str__(self) -> str:
524 return self.number
525
526 def __repr__(self):
527 return self.__str__()
528
529class Package:
530 def __init__(self, pkgname, rel, binaries, version):
531 self.name = pkgname
532 self.rel = rel
533 self.description = cve_lib.lookup_package_override_description(pkgname)
534
535 if not self.description:
536 if 'description' in sources[rel][pkgname]:
537 self.description = sources[rel][pkgname]['description']
538 elif pkgname in source_map_binaries[rel] and \
539 'description' in source_map_binaries[rel][pkgname]:
540 self.description = source_map_binaries[rel][pkgname]['description']
541 else:
542 # Get first description found
543 if 'binaries' in sources[self.rel][self.name]:
544 for binary in sources[self.rel][self.name]['binaries']:
545 if binary in source_map_binaries[self.rel] and 'description' in source_map_binaries[self.rel][binary]:
546 self.description = source_map_binaries[self.rel][binary]["description"]
547 break
548
549 self.section = sources[rel][pkgname]['section']
550 self.version = version
551 self.binaries = binaries if binaries else []
552 self.cves = []
553
554 def add_cve(self, cve) -> None:
555 self.cves.append(cve)
556
557 def __str__(self) -> str:
558 return f"{self.name}/{self.rel}"
559
560 def __repr__(self):
561 return self.__str__()
562
563class OvalGeneratorPkg(OvalGenerator):
564 def __init__(self, release, release_name, cve_paths, packages, progress, pkg_cache, fixed_only=True, cve_cache=None, cve_prefix_dir=None, warn_method=False, outdir='./', prefix='', oval_format='dpkg') -> None:
565 super().__init__(release, release_name, warn_method, outdir, prefix, oval_format)
566 self.progress = progress
567 self.cve_cache = cve_cache
568 self.pkg_cache = pkg_cache
569 self.cve_paths = cve_paths
570 self.fixed_only = fixed_only
571 self.packages = self._load_pkgs(cve_prefix_dir, packages)
572
573 def _reset(self):
574 ###
575 # ID schema: 2204|00001|0001
576 # * The first four digits are the ubuntu release number
577 # * The next 5 digits is # just a package counter, we increase it for each definition
578 # * The last 4 digits is a counter for the criterion
579 ###
580 release_code = int(self.release_name.split(' ')[1].replace('.', '')) if self.release not in cve_lib.external_releases else 1111
581 self.definition_id = release_code * 10 ** 10
582 self.definition_step = 1 * 10 ** 5
583 self.criterion_step = 10
584 self.output_filepath = \
585 '{0}com.ubuntu.{1}.pkg.oval.xml'.format('oci.' if self.oval_format == 'oci' else '', self.release.replace('/', '_'))
586730
587 def _generate_advisory(self, package: Package) -> etree.Element:731 if self.progress:
588 advisory = etree.Element("advisory")732 print(f'[{i:5}/{len(cve_paths)}] Processing {cve_number:18}', end='\r')
589 rights = etree.SubElement(advisory, "rights")
590 component = etree.SubElement(advisory, "component")
591 version = etree.SubElement(advisory, "current_version")
592733
593 for cve in package.cves:734 if not cve_number in self.cve_cache:
594 if self.fixed_only and cve.pkg_rel_entries[package.name].status != 'fixed':735 self.cve_cache[cve_number] = cve_lib.load_cve(cve_path)
595 continue
596 cve_obj = self._generate_cve_tag(cve)
597 advisory.append(cve_obj)
598736
599 rights.text = f"Copyright (C) {datetime.now().year} Canonical Ltd."737 info = self.cve_cache[cve_number]
600 component.text = package.section738 cve_obj = CVE(cve_number, info)
601 version.text = package.version739 for pkg in info['pkgs']:
740 if packages_filter and pkg not in packages_filter:
741 continue
602742
603 return advisory743 for release in final_releases:
744 if pkg in sources[release] and release in info['pkgs'][pkg] and \
745 info['pkgs'][pkg][release][0] != 'DNE':
746 self._add_new_package(pkg, cve_obj, release, info, packages[release])
747 if cve_number not in cves[release]:
748 cves[release][cve_number] = cve_obj
604749
605 def _generate_metadata(self, package: Package) -> etree.Element:750 for release in final_releases:
606 metadata = etree.Element("metadata")751 packages[release] = dict(sorted(packages[release].items()))
607 title = etree.SubElement(metadata, "title")752 cves[release] = dict(sorted(cves[release].items()))
608 reference = self._generate_reference(package)
609 advisory = self._generate_advisory(package)
610 metadata.append(reference)
611 description = etree.SubElement(metadata, "description")
612 affected = etree.SubElement(metadata, "affected", attrib = {"family": "unix"})
613 platform = etree.SubElement(affected, "platform")
614 metadata.append(advisory)
615753
616 platform.text = self.release_name754 if self.progress:
617 title.text = package.name755 print(' ' * 40, end='\r')
618 description.text = package.description756 return packages, cves
619757
620 return metadata758 def _write_oval_xml(self, xml_tree: etree.ElementTree, root_element: etree.ElementTree) -> None:
759 if sys.version_info[0] >= 3 and sys.version_info[1] >= 9:
760 etree.indent(xml_tree, level=0) # indent only available from Python 3.9
761 xml_tree.write(os.path.join(self.output_dir, self.output_filepath))
762 else:
763 xmlstr = minidom.parseString(etree.tostring(root_element)).toprettyxml(indent=" ")
764 with open(os.path.join(self.output_dir, self.output_filepath), 'w') as file:
765 file.write(xmlstr)
621766
767 # Object generators
622 def _generate_criteria(self) -> etree.Element:768 def _generate_criteria(self) -> etree.Element:
623 criteria = etree.Element("criteria")769 criteria = etree.Element("criteria")
624 if self.oval_format == 'dpkg':770 if self.oval_format == 'dpkg':
@@ -629,38 +775,15 @@ class OvalGeneratorPkg(OvalGenerator):
629 extend_definition.set("applicability_check", "true")775 extend_definition.set("applicability_check", "true")
630776
631 return criteria777 return criteria
632778
633 def _generate_subcriteria(self, operator) -> etree.Element:779 def _generate_definition_object(self, object) -> etree.Element:
634 return etree.Element("criteria", attrib={
635 "operator": operator
636 })
637
638 def _generate_criterion_element(self, comment, id) -> etree.Element:
639 criterion = etree.Element("criterion", attrib={
640 "test_ref": f"{self.ns}:tst:{id}",
641 "comment": comment
642 })
643
644 return criterion
645
646 # Element generators
647 def _generate_reference(self, package) -> etree.Element:
648 reference = etree.Element("reference", attrib={
649 "source": "Package",
650 "ref_id": package.name,
651 "ref_url": f'https://launchpad.net/ubuntu/+source/{package.name}'
652 })
653
654 return reference
655
656 def _generate_definition_element(self, package) -> None:
657 id = f"{self.ns}:def:{self.definition_id}"780 id = f"{self.ns}:def:{self.definition_id}"
658 definition = etree.Element("definition")781 definition = etree.Element("definition")
659 definition.set("class", "vulnerability")782 definition.set("class", "vulnerability")
660 definition.set("id", id)783 definition.set("id", id)
661 definition.set("version", "1")784 definition.set("version", "1")
662785
663 metadata = self._generate_metadata(package)786 metadata = self._generate_metadata(object)
664 criteria = self._generate_criteria()787 criteria = self._generate_criteria()
665 definition.append(metadata)788 definition.append(metadata)
666 definition.append(criteria)789 definition.append(criteria)
@@ -692,7 +815,7 @@ class OvalGeneratorPkg(OvalGenerator):
692 cve_tag.set('usns', ','.join(cve.usns))815 cve_tag.set('usns', ','.join(cve.usns))
693816
694 return cve_tag817 return cve_tag
695818
696 def _generate_var_element(self, comment, id, binaries) -> etree.Element:819 def _generate_var_element(self, comment, id, binaries) -> etree.Element:
697 var = etree.Element("constant_variable",820 var = etree.Element("constant_variable",
698 attrib={821 attrib={
@@ -813,6 +936,72 @@ class OvalGeneratorPkg(OvalGenerator):
813936
814 return object937 return object
815938
939 def _generate_criterion_element(self, comment, id) -> etree.Element:
940 criterion = etree.Element("criterion", attrib={
941 "test_ref": f"{self.ns}:tst:{id}",
942 "comment": comment
943 })
944
945 return criterion
946
947 def _generate_vulnerable_elements(self, package, binaries, obj_id=None):
948 binary_keyword = 'binaries' if len(binaries) > 1 else 'binary'
949 test_note = f"Does the '{package.name}' package exist?"
950 object_note = f"The '{package.name}' package {binary_keyword}"
951
952 test = self._generate_test_element(test_note, self.definition_id, False, 'pkg', obj_id=obj_id)
953
954 if not obj_id:
955 object = self._generate_object_element(object_note, self.definition_id, self.definition_id)
956
957 if package.is_kernel_pkg:
958 regex = process_kernel_binaries(binaries, 'oci')
959 binaries = [f'{regex}']
960
961 final_binaries = []
962 if self.oval_format == 'oci':
963 variable_values = '(?::\w+|)\s+(.*)$'
964 for binary in binaries:
965 final_binaries.append(f'^{binary}{variable_values}')
966 else:
967 final_binaries = binaries
968
969 var = self._generate_var_element(object_note, self.definition_id, final_binaries)
970 else:
971 object = None
972 var = None
973 return test, object, var
974
975 def _generate_fixed_elements(self, package, binaries, version, obj_id=None):
976 binary_keyword = 'binaries' if len(binaries) > 1 else 'binary'
977 test_note = f"Does the '{package.name}' package exist and is the version less than '{version}'?"
978 object_note = f"The '{package.name}' package {binary_keyword}"
979 state_note = f"The package version is less than '{version}'"
980
981 test = self._generate_test_element(test_note, self.definition_id, True, 'pkg', obj_id=obj_id)
982 if not obj_id:
983 object = self._generate_object_element(object_note, self.definition_id, self.definition_id)
984
985 final_binaries = binaries
986 if self.oval_format == 'oci':
987 if package.is_kernel_pkg:
988 regex = process_kernel_binaries(binaries, 'oci')
989 final_binaries = [f'^{regex}(?::\w+|)\s+(.*)$']
990 else:
991 variable_values = '(?::\w+|)\s+(.*)$'
992
993 final_binaries = []
994 for binary in binaries:
995 final_binaries.append(f'^{binary}{variable_values}')
996
997 var = self._generate_var_element(object_note, self.definition_id, final_binaries)
998 else:
999 object = None
1000 var = None
1001 state = self._generate_state_element(state_note, self.definition_id, version)
1002
1003 return test, object, var, state
1004
816 # Running kernel element generators1005 # Running kernel element generators
817 def _add_running_kernel_checks(self, root_element):1006 def _add_running_kernel_checks(self, root_element):
818 objects = root_element.find("objects")1007 objects = root_element.find("objects")
@@ -890,6 +1079,11 @@ class OvalGeneratorPkg(OvalGenerator):
890 return test1079 return test
8911080
892 # Kernel elements generators1081 # Kernel elements generators
1082 def _generate_criteria_kernel(self, operator) -> etree.Element:
1083 return etree.Element("criteria", attrib={
1084 "operator": operator
1085 })
1086
893 def _generate_kernel_version_object_element(self, id, var_id) -> etree.Element:1087 def _generate_kernel_version_object_element(self, id, var_id) -> etree.Element:
894 object = etree.Element("ind-def:variable_object",1088 object = etree.Element("ind-def:variable_object",
895 attrib={1089 attrib={
@@ -924,12 +1118,12 @@ class OvalGeneratorPkg(OvalGenerator):
924 value.text = f"0:{patched}"1118 value.text = f"0:{patched}"
925 return state1119 return state
9261120
927 def _generate_kernel_package_elements(self, package: Package, root_element, running_kernel_check_id) -> etree.Element:1121 def _generate_kernel_package_elements(self, package: Package, binaries, root_element, running_kernel_check_id) -> etree.Element:
928 tests = root_element.find("tests")1122 tests = root_element.find("tests")
929 states = root_element.find("states")1123 states = root_element.find("states")
9301124
931 comment_running_kernel = f'Is kernel {package.name} running?'1125 comment_running_kernel = f'Is kernel {package.name} running?'
932 regex = process_kernel_binaries(package.binaries, self.oval_format)1126 regex = process_kernel_binaries(binaries, self.oval_format)
9331127
934 criterion_running_kernel = self._generate_criterion_element(comment_running_kernel, self.definition_id)1128 criterion_running_kernel = self._generate_criterion_element(comment_running_kernel, self.definition_id)
935 test_running_kernel = self._generate_test_element_running_kernel(self.definition_id, comment_running_kernel, running_kernel_check_id)1129 test_running_kernel = self._generate_test_element_running_kernel(self.definition_id, comment_running_kernel, running_kernel_check_id)
@@ -942,22 +1136,25 @@ class OvalGeneratorPkg(OvalGenerator):
9421136
943 return criterion_running_kernel1137 return criterion_running_kernel
9441138
945 def _add_fixed_kernel_elements(self, cve: CVE, package: Package, package_rel_entry:CVEPkgRelEntry, root_element, running_kernel_id, fixed_versions) -> etree.Element:1139 def _add_kernel_elements(self, cve: CVE, package: Package, version, package_rel_entry:CVEPkgRelEntry, root_element, running_kernel_id, fixed_versions) -> etree.Element:
946 tests = root_element.find("tests")1140 tests = root_element.find("tests")
947 objects = root_element.find("objects")1141 objects = root_element.find("objects")
948 states = root_element.find("states")1142 states = root_element.find("states")
9491143
950 comment_version = f'Kernel {package.name} version comparison'1144 comment_version = f'Kernel {package.name} version comparison'
951 comment_criterion = f'({cve.number}) {package.name} {package_rel_entry.note}'1145 comment_criterion = ''
1146 if self.generator_type == 'pkg':
1147 comment_criterion = f'({cve.number}) '
1148 comment_criterion = comment_criterion + f'{package.name}{package_rel_entry.note}'
9521149
953 if package_rel_entry.fixed_version in fixed_versions:1150 if version in fixed_versions:
954 criterion_version = self._generate_criterion_element(comment_criterion, fixed_versions[package_rel_entry.fixed_version])1151 criterion_version = self._generate_criterion_element(comment_criterion, fixed_versions[version])
955 else:1152 else:
956 create_state = False1153 create_state = False
9571154
958 if package_rel_entry.fixed_version:1155 if version:
959 create_state = True1156 create_state = True
960 ste_kernel_version = self._generate_state_kernel_element("Kernel check", self.definition_id, package_rel_entry.fixed_version)1157 ste_kernel_version = self._generate_state_kernel_element("Kernel check", self.definition_id, version)
961 states.append(ste_kernel_version)1158 states.append(ste_kernel_version)
9621159
963 obj_kernel_version = self._generate_kernel_version_object_element(self.definition_id, running_kernel_id)1160 obj_kernel_version = self._generate_kernel_version_object_element(self.definition_id, running_kernel_id)
@@ -969,7 +1166,7 @@ class OvalGeneratorPkg(OvalGenerator):
969 tests.append(test_kernel_version)1166 tests.append(test_kernel_version)
970 objects.append(obj_kernel_version)1167 objects.append(obj_kernel_version)
9711168
972 fixed_versions[package_rel_entry.fixed_version] = self.definition_id1169 fixed_versions[version] = self.definition_id
9731170
974 return criterion_version1171 return criterion_version
9751172
@@ -977,8 +1174,10 @@ class OvalGeneratorPkg(OvalGenerator):
977 def _increase_id(self, is_definition):1174 def _increase_id(self, is_definition):
978 if is_definition:1175 if is_definition:
979 self.definition_id += self.definition_step1176 self.definition_id += self.definition_step
980 clean_value = self.definition_step / 101177 # Ugly hack, Python doesn't like operating big numbers
981 self.definition_id = int(int(self.definition_id / clean_value) * clean_value)1178 criterion_appendix_length = len(str(self.definition_step)) - 1
1179 self.definition_id = int(str(self.definition_id)[: -1 * criterion_appendix_length])
1180 self.definition_id = int(self.definition_id * self.definition_step)
982 else:1181 else:
983 self.definition_id += self.criterion_step1182 self.definition_id += self.criterion_step
9841183
@@ -994,99 +1193,177 @@ class OvalGeneratorPkg(OvalGenerator):
994 criteria.append(element)1193 criteria.append(element)
9951194
996 def _add_criterion(self, id, package_entry, cve, definition, depth=2) -> None:1195 def _add_criterion(self, id, package_entry, cve, definition, depth=2) -> None:
997 criterion_note = f'({cve.number}) {package_entry.pkg.name}{package_entry.note}'1196 criterion_note = f'({cve.number}) ' if self.generator_type == 'pkg' else ''
1197 criterion_note += f'{package_entry.pkg.name}{package_entry.note}'
998 criterion = self._generate_criterion_element(criterion_note, id)1198 criterion = self._generate_criterion_element(criterion_note, id)
999 self._add_to_criteria(definition, criterion, depth)1199 self._add_to_criteria(definition, criterion, depth)
10001200
1001 def _generate_elements(self, package, binaries, pkg_rel_entry, obj_id=None):1201 def _generate_elements(self, package, binaries, version, pkg_rel_entry, obj_id=None):
1002 create_state = False1202 create_state = False
1003 state = None1203 state = None
1004 var = None1204 var = None
1005 obj = None1205 obj = None
1006 binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary'1206 binary_keyword = 'binaries' if len(binaries) > 1 else 'binary'
1007 object_note = f"The '{package.name}' package {binary_keyword}"1207 object_note = f"The '{package.name}' package {binary_keyword}"
1008 test_note = ""1208 test_note = ""
10091209
1210 final_binaries = binaries
1010 if self.oval_format == 'oci':1211 if self.oval_format == 'oci':
1011 if is_kernel_binaries(package.binaries):1212 if package.is_kernel_pkg:
1012 regex = process_kernel_binaries(package.binaries, 'oci')1213 regex = process_kernel_binaries(binaries, 'oci')
1013 binaries = [f'^{regex}(?::\w+|)\s+(.*)$']1214 final_binaries = [f'^{regex}(?::\w+|)\s+(.*)$']
1014 else:1215 else:
1015 variable_values = '(?::\w+|)\s+(.*)$'1216 variable_values = '(?::\w+|)\s+(.*)$'
10161217
1017 binaries = []1218 final_binaries = []
1018 for binary in package.binaries:1219 for binary in binaries:
1019 binaries.append(f'^{binary}{variable_values}')1220 final_binaries.append(f'^{binary}{variable_values}')
10201221
1021 if pkg_rel_entry.status == 'vulnerable':1222 if pkg_rel_entry.status == 'vulnerable':
1022 test_note = f"Does the '{package.name}' package exist?"1223 test_note = f"Does the '{package.name}' package exist?"
1023 elif pkg_rel_entry.status == 'fixed':1224 elif pkg_rel_entry.status == 'fixed':
1024 test_note = f"Does the '{package.name}' package exist and is the version less than '{pkg_rel_entry.fixed_version}'?"1225 test_note = f"Does the '{package.name}' package exist and is the version less than '{version}'?"
1025 state_note = f"The package version is less than '{pkg_rel_entry.fixed_version}'"1226 state_note = f"The package version is less than '{version}'"
10261227
1027 state = self._generate_state_element(state_note, self.definition_id, pkg_rel_entry.fixed_version)1228 state = self._generate_state_element(state_note, self.definition_id, version)
1028 create_state = True1229 create_state = True
10291230
1030 if not obj_id or create_state:1231 if not obj_id:
1031 obj_id = None1232 var = self._generate_var_element(object_note, self.definition_id, final_binaries)
1032
1033 var = self._generate_var_element(object_note, self.definition_id, binaries)
1034
1035 obj = self._generate_object_element(object_note, self.definition_id, self.definition_id)1233 obj = self._generate_object_element(object_note, self.definition_id, self.definition_id)
10361234
1037 test = self._generate_test_element(test_note, self.definition_id, create_state, 'pkg', obj_id=obj_id)1235 test = self._generate_test_element(test_note, self.definition_id, create_state, 'pkg', obj_id=obj_id)
10381236
1039 return test, obj, var, state1237 return test, obj, var, state
10401238
1041 def _populate_pkg(self, package, root_element):1239 # returns True if we should ignore this source package; primarily used
1042 tests = root_element.find("tests")1240 # for -edge kernels
1043 objects = root_element.find("objects")1241 def _ignore_source_package(self, source):
1044 variables = root_element.find("variables")1242 if re.match('linux-.*-edge$', source):
1045 states = root_element.find("states")1243 return True
1244 if re.match('linux-riscv.*$', source):
1245 # linux-riscv.* currently causes a lot of false positives, skip
1246 # it altogether while we don't land a better fix
1247 return True
1248 return False
10461249
1047 # Add package definition
1048 definitions = root_element.find("definitions")
1049 definition_element = self._generate_definition_element(package)
10501250
1051 # Control/cache variables1251class OvalGeneratorPkg(OvalGenerator):
1052 one_time_added_id = None1252 def __init__(self, releases, cve_paths, packages, progress, pkg_cache, fixed_only=True, cve_cache=None, cve_prefix_dir=None, outdir='./', oval_format='dpkg') -> None:
1053 fixed_versions = {}1253 super().__init__('pkg', releases, cve_paths, packages, progress, pkg_cache, fixed_only, cve_cache, cve_prefix_dir, outdir, oval_format)
1054 binaries_id = None
1055 cve_added = False
10561254
1057 #criteria = None1255 def _generate_advisory(self, package: Package) -> etree.Element:
1058 #if len(package.binaries) > 1:1256 advisory = etree.Element("advisory")
1059 # criteria = self._generate_subcriteria('AND')1257 rights = etree.SubElement(advisory, "rights")
1258 component = etree.SubElement(advisory, "component")
1259 version = etree.SubElement(advisory, "current_version")
10601260
1061 for cve in package.cves:1261 for cve in package.cves:
1062 pkg_rel_entry = cve.pkg_rel_entries[package.name]1262 if self.fixed_only and cve.pkg_rel_entries[str(package)].status != 'fixed':
1063 for key in sorted(list(package.binaries)):1263 continue
1064 binaries = package.binaries[key]1264 elif cve.pkg_rel_entries[str(package)].is_not_applicable():
1065 if pkg_rel_entry.fixed_version:1265 continue
1066 if pkg_rel_entry.fixed_version in fixed_versions:1266 cve_obj = self._generate_cve_tag(cve)
1067 self._add_test_ref_to_cve_tag(fixed_versions[pkg_rel_entry.fixed_version], cve, definition_element)1267 advisory.append(cve_obj)
1068 self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, definition_element)
1069 continue
1070 else:
1071 self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element)
1072 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
1073 fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id
1074 elif one_time_added_id:
1075 self._add_test_ref_to_cve_tag(one_time_added_id, cve, definition_element)
1076 self._add_criterion(one_time_added_id, pkg_rel_entry, cve, definition_element)
1077 continue
1078 else:
1079 self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element)
1080 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
1081 one_time_added_id = self.definition_id
10821268
1083 test, obj, var, state = self._generate_elements(package, binaries, pkg_rel_entry, binaries_id)1269 rights.text = f"Copyright (C) {datetime.now().year} Canonical Ltd."
1270 component.text = package.section
1271 version.text = package.get_latest_version()
1272
1273 return advisory
1274
1275 def _generate_metadata(self, package: Package) -> etree.Element:
1276 metadata = etree.Element("metadata")
1277 title = etree.SubElement(metadata, "title")
1278 reference = self._generate_reference(package)
1279 advisory = self._generate_advisory(package)
1280 metadata.append(reference)
1281 description = etree.SubElement(metadata, "description")
1282 affected = etree.SubElement(metadata, "affected", attrib = {"family": "unix"})
1283 platform = etree.SubElement(affected, "platform")
1284 metadata.append(advisory)
1285
1286 platform.text = self.release_name
1287 title.text = package.name
1288 description.text = package.description
1289
1290 return metadata
1291
1292 # Element generators
1293 def _generate_reference(self, package) -> etree.Element:
1294 reference = etree.Element("reference", attrib={
1295 "source": "Package",
1296 "ref_id": package.name,
1297 "ref_url": f'https://launchpad.net/ubuntu/+source/{package.name}'
1298 })
1299
1300 return reference
1301
1302 def _populate_pkg(self, package, root_element):
1303 tests = root_element.find("tests")
1304 objects = root_element.find("objects")
1305 variables = root_element.find("variables")
1306 states = root_element.find("states")
1307
1308 # Add package definition
1309 definitions = root_element.find("definitions")
1310 definition_element = self._generate_definition_object(package)
1311
1312 # Control/cache variables
1313 one_time_added_id = None
1314 fixed_versions = {}
1315 binaries_ids = {}
1316 cve_added = False
1317
1318 #criteria = None
1319 #if len(package.binaries) > 1:
1320 # criteria = self._generate_subcriteria('AND')
1321
1322 for cve in package.cves:
1323 if self.fixed_only and cve.pkg_rel_entries[str(package)].status != 'fixed':
1324 continue
1325 pkg_rel_entry = cve.pkg_rel_entries[str(package)]
1326 if pkg_rel_entry.is_not_applicable(): continue
1327 source_version = package.get_version_to_check(pkg_rel_entry.fixed_version)
1328 for binary_version in package.get_binary_versions(source_version):
1329 binaries = package.get_binaries(source_version, binary_version)
1330
1331 # For released / not affected (version) CVEs
1332 if pkg_rel_entry.fixed_version:
1333 if binary_version in fixed_versions:
1334 self._add_test_ref_to_cve_tag(fixed_versions[binary_version], cve, definition_element)
1335 self._add_criterion(fixed_versions[binary_version], pkg_rel_entry, cve, definition_element)
1336 continue
1337 else:
1338 self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element)
1339 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
1340 fixed_versions[binary_version] = self.definition_id
1341 # For not fixed CVEs, we already added one for this package
1342 elif one_time_added_id:
1343 self._add_test_ref_to_cve_tag(one_time_added_id, cve, definition_element)
1344 self._add_criterion(one_time_added_id, pkg_rel_entry, cve, definition_element)
1345 continue
1346 # For not fixed CVEs, only need to add it once per package
1347 else:
1348 self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element)
1349 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
1350 one_time_added_id = self.definition_id
1351
1352 # If version doesn't exist and only one binary_version, they all have the same binaries
1353 if not package.version_exists(source_version) and package.all_binaries_same_version:
1354 binary_id_version = GENERIC_VERSION
1355 else:
1356 binary_id_version = binary_version
1357
1358 binaries_ids.setdefault(binary_id_version, None)
1359 binaries_id = binaries_ids[binary_id_version]
1360 test, obj, var, state = self._generate_elements(package, binaries, binary_version, pkg_rel_entry, binaries_id)
10841361
1085 if state:1362 if state:
1086 states.append(state)1363 states.append(state)
10871364
1088 if obj and var:1365 if obj and var:
1089 binaries_id = self.definition_id1366 binaries_ids[binary_id_version] = self.definition_id
1090 variables.append(var)1367 variables.append(var)
1091 objects.append(obj)1368 objects.append(obj)
10921369
@@ -1099,29 +1376,39 @@ class OvalGeneratorPkg(OvalGenerator):
10991376
1100 self._increase_id(is_definition=True)1377 self._increase_id(is_definition=True)
11011378
1102 def _populate_kernel_pkg(self, package, root_element, running_kernel_id):1379 def _populate_kernel_pkg(self, package, root_element, running_kernel_id):
1380 for cve in package.cves:
1381 pkg_rel_entry = cve.pkg_rel_entries[str(package)]
1382 version_to_check = package.get_version_to_check(pkg_rel_entry.fixed_version)
1383 for binary_version in package.get_binary_versions(version_to_check):
1384 binaries = package.get_binaries(version_to_check, binary_version)
1103 # Add package definition1385 # Add package definition
1104 definitions = root_element.find("definitions")1386 definitions = root_element.find("definitions")
1105 definition_element = self._generate_definition_element(package)1387 definition_element = self._generate_definition_object(package)
11061388
1107 # Control/cache variables1389 # Control/cache variables
1108 fixed_versions = {}1390 fixed_versions = {}
1109 cve_added = False1391 cve_added = False
11101392
1393 # Kernel binaries have all same version
1394 version = package.get_latest_version()
1395 binaries = package.get_binaries(version, version)
1396
1111 # Generate one-time elements1397 # Generate one-time elements
1112 kernel_criterion = self._generate_kernel_package_elements(package, root_element, running_kernel_id)1398 kernel_criterion = self._generate_kernel_package_elements(package, binaries, root_element, running_kernel_id)
1113 criteria = self._generate_subcriteria('OR')1399 criteria = self._generate_criteria_kernel('OR')
11141400
1115 self._add_to_criteria(definition_element, kernel_criterion, operator='AND')1401 self._add_to_criteria(definition_element, kernel_criterion, operator='AND')
1116 self._add_to_criteria(definition_element, criteria, operator='AND')1402 self._add_to_criteria(definition_element, criteria, operator='AND')
11171403
1118 for cve in package.cves:1404 for cve in package.cves:
1119 pkg_rel_entry = cve.pkg_rel_entries[package.name]1405 pkg_rel_entry = cve.pkg_rel_entries[str(package)]
1406 if pkg_rel_entry.is_not_applicable(): continue
1120 cve_added = True1407 cve_added = True
11211408
1122 self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element)1409 self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element)
11231410
1124 kernel_version_criterion = self._add_fixed_kernel_elements(cve, package, pkg_rel_entry, root_element, running_kernel_id, fixed_versions)1411 kernel_version_criterion = self._add_kernel_elements(cve, package, pkg_rel_entry.fixed_version, pkg_rel_entry, root_element, running_kernel_id, fixed_versions)
1125 self._add_to_criteria(definition_element, kernel_version_criterion, depth=3)1412 self._add_to_criteria(definition_element, kernel_version_criterion, depth=3)
1126 self._increase_id(is_definition=False)1413 self._increase_id(is_definition=False)
11271414
@@ -1129,881 +1416,447 @@ class OvalGeneratorPkg(OvalGenerator):
1129 definitions.append(definition_element)1416 definitions.append(definition_element)
1130 self._increase_id(is_definition=True)1417 self._increase_id(is_definition=True)
11311418
1132 def _add_new_package(self, package_name, cve, release, cve_data, packages) -> None:
1133 if package_name not in packages:
1134 _, version, binaries = get_binarypkgs(self.pkg_cache, package_name, release)
1135
1136 pkg_obj = Package(package_name, release, binaries, version)
1137 packages[package_name] = pkg_obj
1138
1139 pkg_obj = packages[package_name]
1140 cve_pkg_entry = CVEPkgRelEntry(pkg_obj, release, cve, cve_data['pkgs'][package_name][release][0], cve_data['pkgs'][package_name][release][1], self.pkg_cache)
1141
1142 if cve_pkg_entry.status != 'fixed' and self.fixed_only:
1143 return
1144
1145 cve.add_pkg(pkg_obj, cve_pkg_entry)
1146
1147 def _load_pkgs(self, cve_prefix_dir, packages_filter=None) -> None:
1148 cve_lib.load_external_subprojects()
1149
1150 cves = []
1151 for pathname in self.cve_paths:
1152 cves = cves + glob.glob(os.path.join(cve_prefix_dir, pathname))
1153
1154 cves.sort(key=lambda cve:
1155 (int(cve.split('/')[-1].split('-')[1]), int(cve.split('/')[-1].split('-')[2])) \
1156 if cve.split('/')[-1].split('-')[2].isnumeric() \
1157 else (int(cve.split('/')[-1].split('-')[1]), 0)
1158 )
1159
1160 packages = {}
1161 releases = [self.release]
1162 current_release = self.release
1163 while(cve_lib.release_parent(current_release)):
1164 current_release = cve_lib.release_parent(current_release)
1165 releases.append(current_release)
1166
1167 for release in releases:
1168 sources[release] = load(releases=[release], skip_eol_releases=False)[release]
1169
1170 orig_name = cve_lib.get_orig_rel_name(release)
1171 if '/' in orig_name:
1172 orig_name = orig_name.split('/', maxsplit=1)[1]
1173 source_map_binaries[release] = load(data_type='packages',releases=[orig_name], skip_eol_releases=False)[orig_name] \
1174 if release not in cve_lib.external_releases else {}
1175
1176 i = 0
1177 for cve_path in cves:
1178 cve_number = cve_path.rsplit('/', 1)[1]
1179 i += 1
1180
1181 if self.progress:
1182 print(f'[{i:5}/{len(cves)}] Processing {cve_number:18}', end='\r')
1183
1184 if not cve_number in self.cve_cache:
1185 self.cve_cache[cve_number] = cve_lib.load_cve(cve_path)
1186
1187 info = self.cve_cache[cve_number]
1188 cve_obj = CVE(cve_number, info)
1189
1190 for pkg in info['pkgs']:
1191 if packages_filter and pkg not in packages_filter:
1192 continue
1193
1194 for release in releases:
1195 if pkg in sources[release] and release in info['pkgs'][pkg] and \
1196 info['pkgs'][pkg][release][0] != 'DNE':
1197 self._add_new_package(pkg, cve_obj, release, info, packages)
1198 break
1199
1200 packages = dict(sorted(packages.items()))
1201 if self.progress:
1202 print(' ' * 40, end='\r')
1203 return packages
1204
1205 def generate_oval(self) -> None:1419 def generate_oval(self) -> None:
1206 self._reset()1420 for release in self.releases:
1207 xml_tree, root_element = self._get_root_element("Package")1421 self._init_ids(release)
1208 self._add_structure(root_element)1422 xml_tree, root_element = self._get_root_element()
1423 generator = self._get_generator("Package")
1424 root_element.append(generator)
1425 self._add_structure(root_element)
12091426
1210 if self.oval_format == 'dpkg':1427 if self.oval_format == 'dpkg':
1211 # One time kernel check1428 # One time kernel check
1212 self._add_release_checks(root_element)1429 self._add_release_checks(root_element)
1213 self._add_running_kernel_checks(root_element)1430 self._add_running_kernel_checks(root_element)
1214 running_kernel_id = self.definition_id1431 running_kernel_id = self.definition_id
1215 self._increase_id(is_definition=True)1432 self._increase_id(is_definition=True)
1433
1434 all_pkgs = dict()
1435 for parent_release in list(self.parent_releases)[::-1]:
1436 all_pkgs.update(self.packages[parent_release])
1437
1438 all_pkgs.update(self.packages[self.release])
1439
1440 for pkg in all_pkgs:
1441 if self._ignore_source_package(pkg): continue
1442 if not all_pkgs[pkg].versions_binaries: continue
1443 if not all_pkgs[pkg].get_binary_versions(next(iter(all_pkgs[pkg].versions_binaries))): continue
1444 if all_pkgs[pkg].is_kernel_pkg and self.oval_format != 'oci':
1445 self._populate_kernel_pkg(all_pkgs[pkg], root_element, running_kernel_id)
1446 else:
1447 self._populate_pkg(all_pkgs[pkg], root_element)
12161448
1217 for pkg in self.packages:1449 self._write_oval_xml(xml_tree, root_element)
1218 if len(self.packages[pkg].binaries) == 0:
1219 continue
12201450
1221 if is_kernel_binaries(self.packages[pkg].binaries) and self.oval_format != 'oci':1451class OvalGeneratorCVE(OvalGenerator):
1222 self._populate_kernel_pkg(self.packages[pkg], root_element, running_kernel_id)1452 def __init__(self, releases, cve_paths, packages, progress, pkg_cache, fixed_only=True, cve_cache=None, cve_prefix_dir=None, outdir='./', oval_format='dpkg') -> None:
1223 else:1453 super().__init__('cve', releases, cve_paths, packages, progress, pkg_cache, fixed_only, cve_cache, cve_prefix_dir, outdir, oval_format)
1224 self._populate_pkg(self.packages[pkg], root_element)
12251454
1226 #etree.indent(xml_tree, level=0) -> only available from Python 3.91455 # For CVE OVAL, the definition ID is generated
1227 xmlstr = minidom.parseString(etree.tostring(root_element)).toprettyxml(indent=" ")1456 # from the CVE ID
1457 def _set_definition_id(self, cve_id):
1458 self.definition_id = int(re.sub('[^0-9]', '', cve_id)) * self.definition_step
12281459
1229 with open(os.path.join(self.output_dir, self.output_filepath), 'w') as file:1460 def _generate_advisory(self, cve: CVE) -> etree.Element:
1230 file.write(xmlstr)1461 advisory = etree.Element("advisory")
1231 #xml_tree.write(os.path.join(self.output_dir, self.output_filepath))1462 severity = etree.SubElement(advisory, "severity")
1232 return1463 rights = etree.SubElement(advisory, "rights")
1464 public_date = etree.SubElement(advisory, "public_date")
12331465
1234class OvalGeneratorCVE:1466 if cve.public_date_at_usn:
1235 supported_oval_elements = ('definition', 'test', 'object', 'state',1467 public_date_at_usn = etree.SubElement(advisory, "public_date_at_usn")
1236 'variable')1468 public_date_at_usn.text = cve.public_date_at_usn
1237 generator_version = '1.1'
1238 oval_schema_version = '5.11.1'
12391469
1240 def __init__(self, release, release_name, parent, warn_method=False, outdir='./', prefix='', oval_format='dpkg'):1470 if cve.assigned_to:
1241 """ constructor, set defaults for instances """1471 assigned_to = etree.SubElement(advisory, "assigned_to")
1472 assigned_to.text = cve.assigned_to
1473
1474 if cve.discoverd_by:
1475 discoverd_by = etree.SubElement(advisory, "discoverd_by")
1476 discoverd_by.text = cve.discoverd_by
12421477
1243 self.release = release1478 for bug in cve.bugs:
1244 # e.g. codename for trusty/esm should be trusty1479 element = etree.SubElement(advisory, 'bug')
1245 self.release_codename = cve_lib.release_progenitor(release) if cve_lib.release_progenitor(release) else self.release.replace('/', '_')1480 element.text = bug
1246 self.release_name = release_name
1247 self.warn = warn_method or self.warn
1248 self.tmpdir = tempfile.mkdtemp(prefix='oval_lib-')
1249 self.output_dir = outdir
1250 self.oval_format = oval_format
1251 self.output_filepath = \
1252 '{0}com.ubuntu.{1}.cve.oval.xml'.format(prefix, self.release.replace('/', '_'))
1253 self.ns = 'oval:com.ubuntu.{0}'.format(self.release_codename)
1254 self.id = 10
1255 self.release_applicability_definition_id = '{0}:def:{1}0'.format(self.ns, self.id)
12561481
1257 def __del__(self):1482 for usn in cve.usns:
1258 """ deconstructor, clean up """1483 element = etree.SubElement(advisory, 'ref')
1259 if os.path.exists(self.tmpdir):1484 element.text = f'https://ubuntu.com/security/notices/USN-{usn}'
1260 recursive_rm(self.tmpdir)
1261
1262 def generate_cve_definition(self, cve):
1263 """ generate an OVAL definition based on parsed CVE data """
1264
1265 header = cve['header']
1266 # if the multiplier is not large enough, the tests IDs will
1267 # overlap on things with large numbers of binary packages.
1268 # if we ever have an issue that touches more than 1,000,000
1269 # binary packages, that will cause a problem.
1270 id_base = int(re.sub('[^0-9]', '', header['Candidate'])) * 1000000
1271 if not self.unique_id_base(id_base, header['Source-note']):
1272 self.warn('Calculated id_base "{0}" based on candidate value "{1}" is not unique. Skipping CVE.'.format(id_base, header['Candidate']))
1273
1274 instruction = ""
1275 # make test(s) for each package
1276 test_refs = []
1277 packages = cve['packages']
1278 for package in sorted(packages.keys()):
1279 if self.release in packages[package]['Releases']:
1280 release_status = packages[package]['Releases'][self.release]
1281 if 'bin-pkgs' in release_status and release_status['bin-pkgs']:
1282 for key in sorted(list(release_status['bin-pkgs'])):
1283 pkg = {
1284 'name': package,
1285 'binaries': release_status['bin-pkgs'][key],
1286 'status': release_status['status'],
1287 'note': release_status['note'],
1288 'fix-version': release_status['fix-version'] if 'fix-version' in release_status else '',
1289 'id_base': id_base + len(test_refs),
1290 'source-note': header['Source-note']
1291 }
1292 if is_kernel_binaries(pkg['binaries']):
1293 test_ref = self.get_running_kernel_testref(pkg)
1294 if test_ref:
1295 test_refs = test_refs + test_ref
1296 pkg['id_base'] = id_base + 1
1297 else:
1298 test_ref = self.get_oval_test_for_package(pkg)
1299 if test_ref:
1300 test_refs.append(test_ref)
1301 # prepare update instructions if package is fixed
1302 if pkg['status'] == 'fixed':
1303 if 'parent' in release_status:
1304 product_description = cve_lib.get_subproject_description(release_status['parent'])
1305 else:
1306 product_description = cve_lib.get_subproject_description(self.release)
1307 instruction = prepare_instructions(instruction, header['Candidate'], product_description, pkg)
1308
1309
1310
1311 # if no packages for this release, then we're done
1312 if not len(test_refs):
1313 return False
1314
1315 # convert CVE data to OVAL definition metadata
1316 mapping = {
1317 'ns': escape(self.ns),
1318 'id_base': id_base,
1319 'codename': escape(self.release_codename),
1320 'release_name': escape(self.release_name),
1321 'applicability_def_id': escape(
1322 self.release_applicability_definition_id),
1323 'cve_title': escape(header['Candidate']),
1324 'description': escape('{0} {1}'.format(header['Description'],
1325 header['Ubuntu-Description']).strip() + instruction),
1326 'priority': escape(header['Priority']),
1327 'criteria': '',
1328 'references': '',
1329 'notes': ''
1330 }
13311485
1332 # convert test_refs to criteria1486 advisory.append(self._generate_cve_tag(cve))
1333 if len(test_refs) == 1:1487 rights.text = f"Copyright (C) {cve.public_date.split('-', 1)[0]} Canonical Ltd."
1334 negation_attribute = 'negate = "true" ' \1488 severity.text = cve.priority.capitalize()
1335 if 'negate' in test_refs[0] and test_refs[0]['negate'] else ''1489 public_date.text = cve.public_date
1336 mapping['criteria'] = \
1337 '<criterion test_ref="{0}" comment="{1}" {2}/>'.format(
1338 test_refs[0]['id'], escape(test_refs[0]['comment']), negation_attribute)
1339 else:
1340 criteria = []
1341 criteria.append('<criteria operator="OR">')
1342 for test_ref in test_refs:
1343 if 'kernel' in test_ref:
1344 criteria.append(' <criteria operator="AND">')
1345 negation_attribute = 'negate = "true" ' \
1346 if 'negate' in test_ref and test_ref['negate'] else ''
1347 criteria.append(
1348 ' <criterion test_ref="{0}" comment="{1}" {2}/>'.format(
1349 test_ref['id'],
1350 escape(test_ref['comment']), negation_attribute))
1351 elif 'kernelobj' in test_ref:
1352 criteria.append(
1353 ' <criterion test_ref="{0}" comment="{1}" {2}/>'.format(
1354 test_ref['id'],
1355 escape(test_ref['comment']), negation_attribute))
1356 criteria.append(' </criteria>')
1357 else:
1358 negation_attribute = 'negate = "true" ' \
1359 if 'negate' in test_ref and test_ref['negate'] else ''
1360 criteria.append(
1361 ' <criterion test_ref="{0}" comment="{1}" {2}/>'.format(
1362 test_ref['id'],
1363 escape(test_ref['comment']), negation_attribute))
1364 criteria.append('</criteria>')
1365 mapping['criteria'] = '\n '.join(criteria)
1366
1367 # convert notes
1368 if header['Notes']:
1369 mapping['notes'] = '\n <oval:notes>' + \
1370 '\n <oval:note>{0}</oval:note>'.format(escape(header['Notes'])) + \
1371 '\n </oval:notes>'
1372
1373 # convert additional data <advisory> metadata elements
1374 advisory = []
1375 advisory.append('<severity>{0}</severity>'.format(
1376 escape(header['Priority'].title())))
1377 advisory.append(
1378 '<rights>Copyright (C) {0}Canonical Ltd.</rights>'.format(escape(
1379 header['PublicDate'].split('-', 1)[0] + ' '
1380 if header['PublicDate'] else '')))
1381 if header['PublicDate']:
1382 advisory.append('<public_date>{0}</public_date>'.format(
1383 escape(header['PublicDate'])))
1384 if header['PublicDateAtUSN']:
1385 advisory.append(
1386 '<public_date_at_usn>{0}</public_date_at_usn>'.format(escape(
1387 header['PublicDateAtUSN'])))
1388 if header['Assigned-to']:
1389 advisory.append('<assigned_to>{0}</assigned_to>'.format(escape(
1390 header['Assigned-to'])))
1391 if header['Discovered-by']:
1392 advisory.append('<discovered_by>{0}</discovered_by>'.format(escape(
1393 header['Discovered-by'])))
1394 if header['CRD']:
1395 advisory.append('<crd>{0}</crd>'.format(escape(header['CRD'])))
1396 for bug in header['Bugs']:
1397 advisory.append('<bug>{0}</bug>'.format(escape(bug)))
1398 for ref in header['References']:
1399 if ref.startswith('https://cve.mitre'):
1400 cve_title = ref.split('=')[-1].strip()
1401 if not cve_title:
1402 continue
1403 mapping['cve_title'] = escape(cve_title)
1404 mapping['references'] = '\n <reference source="CVE" ref_id="{0}" ref_url="{1}" />'.format(mapping['cve_title'], escape(ref))
14051490
1406 cve_ref = generate_cve_tag(header)1491 return advisory
1407 advisory.append(cve_ref)
1408 mapping['advisory_elements'] = '\n '.join(advisory)
14091492
1410 if self.oval_format == 'dpkg':1493 def _generate_metadata(self, cve: CVE) -> etree.Element:
1411 mapping['os_release_check'] = """<extend_definition definition_ref="{applicability_def_id}" comment="{release_name} ({codename}) is installed." applicability_check="true" />""".format(**mapping)1494 metadata = etree.Element("metadata")
1412 else:1495 title = etree.SubElement(metadata, "title")
1413 mapping['os_release_check'] = ''1496 reference = self._generate_reference(cve)
14141497 advisory = self._generate_advisory(cve)
1415 self.queue_element('definition', """1498 description = etree.SubElement(metadata, "description")
1416 <definition class="vulnerability" id="{ns}:def:{id_base}0" version="1">1499 affected = etree.SubElement(metadata, "affected", attrib = {"family": "unix"})
1417 <metadata>1500 platform = etree.SubElement(affected, "platform")
1418 <title>{cve_title} on {release_name} ({codename}) - {priority}.</title>1501 metadata.append(reference)
1419 <description>{description}</description>1502 metadata.append(advisory)
1420 <affected family="unix">
1421 <platform>{release_name}</platform>
1422 </affected>{references}
1423 <advisory>
1424 {advisory_elements}
1425 </advisory>
1426 </metadata>{notes}
1427 <criteria>
1428 {os_release_check}
1429 {criteria}
1430 </criteria>
1431 </definition>\n""".format(**mapping))
14321503
1433 # TODO: xml lib1504 platform.text = self.release_name
1434 def add_release_applicability_definition(self):1505 title.text = f'{cve.number} on {self.release_name} ({self.release_codename}) - {cve.priority}'
1435 """ add platform/release applicability OVAL definition for codename """1506 description.text = cve.description.replace('\n','')
14361507
1437 mapping = {1508 return metadata
1438 'ns': self.ns,
1439 'id_base': self.id,
1440 'codename': self.release_codename,
1441 'release_name': self.release_name,
1442 }
1443 self.release_applicability_definition_id = \
1444 '{ns}:def:{id_base}0'.format(**mapping)
14451509
1446 if self.oval_format == 'dpkg':1510 # Element generators
1447 self.queue_element('definition', """1511 def _generate_reference(self, cve: CVE) -> etree.Element:
1448 <definition class="inventory" id="{ns}:def:{id_base}0" version="1">1512 reference = etree.Element("reference", attrib={
1449 <metadata>1513 "source": "CVE",
1450 <title>Check that {release_name} ({codename}) is installed.</title>1514 "ref_id": cve.number,
1451 <description></description>1515 "ref_url": f'https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve.number}'
1452 </metadata>1516 })
1453 <criteria>
1454 <criterion test_ref="{ns}:tst:{id_base}0" comment="The host is part of the unix family." />
1455 <criterion test_ref="{ns}:tst:{id_base}1" comment="The host is running Ubuntu {codename}." />
1456 </criteria>
1457 </definition>\n""".format(**mapping))
1458
1459 self.queue_element('test', """
1460 <ind-def:family_test id="{ns}:tst:{id_base}0" check="at least one" check_existence="at_least_one_exists" version="1" comment="Is the host part of the unix family?">
1461 <ind-def:object object_ref="{ns}:obj:{id_base}0"/>
1462 <ind-def:state state_ref="{ns}:ste:{id_base}0"/>
1463 </ind-def:family_test>
1464
1465 <ind-def:textfilecontent54_test id="{ns}:tst:{id_base}1" check="at least one" check_existence="at_least_one_exists" version="1" comment="Is the host running Ubuntu {codename}?">
1466 <ind-def:object object_ref="{ns}:obj:{id_base}1"/>
1467 <ind-def:state state_ref="{ns}:ste:{id_base}1"/>
1468 </ind-def:textfilecontent54_test>\n""".format(**mapping))
1469
1470 # /etc/lsb-release has to be a single path, due to some
1471 # environments (namely snaps) not being allowed to list the
1472 # content of /etc/
1473 self.queue_element('object', """
1474 <ind-def:family_object id="{ns}:obj:{id_base}0" version="1" comment="The singleton family object."/>
1475
1476 <ind-def:textfilecontent54_object id="{ns}:obj:{id_base}1" version="1" comment="The singleton release codename object.">
1477 <ind-def:filepath>/etc/lsb-release</ind-def:filepath>
1478 <ind-def:pattern operation="pattern match">^[\\s\\S]*DISTRIB_CODENAME=([a-z]+)$</ind-def:pattern>
1479 <ind-def:instance datatype="int">1</ind-def:instance>
1480 </ind-def:textfilecontent54_object>\n""".format(**mapping))
1481
1482 self.queue_element('state', """
1483 <ind-def:family_state id="{ns}:ste:{id_base}0" version="1" comment="The singleton family object.">
1484 <ind-def:family>unix</ind-def:family>
1485 </ind-def:family_state>
1486
1487 <ind-def:textfilecontent54_state id="{ns}:ste:{id_base}1" version="1" comment="{release_name}">
1488 <ind-def:subexpression>{codename}</ind-def:subexpression>
1489 </ind-def:textfilecontent54_state>\n""".format(**mapping))
1490
1491 def get_oval_test_for_package(self, package):
1492 """ create OVAL test and dependent objects for this package status
1493 @package = {
1494 'name' : '<package name>',
1495 'binaries' : [ '<binary_pkg_name', '<binary_pkg_name', ... ],
1496 'status' : '<not-applicable | unknown | vulnerable | fixed>',
1497 'note' : '<a description of the status>',
1498 'fix-version' : '<the version in which the issue was fixed, if applicable>',
1499 'id_base' : a base for the integer section of the OVAL id,
1500 'source-note' : a note about the datasource for debugging
1501 }
1502 """
15031517
1504 if package['status'] == 'fixed' and not package['fix-version']:1518 return reference
1505 self.warn('"{0}" package in {1} is marked fixed, but missing a fix-version. Changing status to vulnerable.'.format(package['name'], package['source-note']))
1506 package['status'] = 'vulnerable'
15071519
1508 if package['status'] == 'not-applicable':1520 def prepare_instructions(self, instruction, cve: CVE, product_description, package: Package, fixed_version):
1509 # if the package status is not-applicable, skip it!1521 if "LSN" in cve.number:
1510 return False1522 instruction = """\n
1511 elif package['status'] == 'not-vulnerable':1523 To check your kernel type and Livepatch version, enter this command:
1512 # if the packaget status is not-vulnerable, skip it!
1513 return False
1514 """
1515 object_id = self.get_package_object_id(package['name'], package['id_base'], 1)
15161524
1517 test_title = "Returns true whether or not the '{0}' package exists.".format(package['name'])1525 canonical-livepatch status"""
1518 test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id, None, 1, 'any_exist')
15191526
1520 package['note'] = package['name'] + package['note']1527 if not instruction:
1521 return {'id': test_id, 'comment': package['note'], 'negate': True}1528 instruction = """\n
1522 """1529 Update Instructions:
1523 elif package['status'] == 'vulnerable':
1524 object_id = self.get_package_object_id(package['name'], package['binaries'], package['id_base'])
15251530
1526 test_title = "Does the '{0}' package exist?".format(package['name'])1531 Run `sudo pro fix {0}` to fix the vulnerability. The problem can be corrected
1527 test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id)1532 by updating your system to the following package versions:""".format(cve)
15281533
1529 package['note'] = package['name'] + package['note']1534 instruction += '\n\n'
1530 return {'id': test_id, 'comment': package['note']}1535 source_version = package.get_version_to_check(fixed_version)
1531 elif package['status'] == 'fixed':1536 for binary_version in package.get_binary_versions(source_version):
1532 object_id = self.get_package_object_id(package['name'], package['binaries'], package['id_base'])1537 binaries = package.get_binaries(source_version, binary_version)
1538 for binary in binaries:
1539 instruction += """{0} - {1}\n""".format(binary, binary_version)
15331540
1534 state_id = self.get_package_version_state_id(package['id_base'], package['fix-version'])1541 if "LSN" in cve.number:
1542 instruction += "Livepatch subscription required"
1543 elif "Long Term" in product_description or "Interim" in product_description:
1544 instruction += "No subscription required"
1545 else:
1546 instruction += product_description
15351547
1536 test_title = "Does the '{0}' package exist and is the version less than '{1}'?".format(package['name'], package['fix-version'])1548 return instruction
1537 test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id, state_id)
15381549
1539 package['note'] = package['name'] + package['note']1550 def _populate_pkg(self, cve: CVE, package: Package, root_element, main_criteria, cache, fixed_versions) -> None:
1540 return {'id': test_id, 'comment': package['note']}1551 tests = root_element.find("tests")
1541 else:1552 objects = root_element.find("objects")
1542 if package['status'] != 'unknown':1553 variables = root_element.find("variables")
1543 self.warn('"{0}" is not a supported package status. Outputting for "unknown" status.'.format(package['status']))1554 states = root_element.find("states")
1555 pkg_rel_entry = cve.pkg_rel_entries[str(package)]
15441556
1545 if not hasattr(self, 'id_unknown_test'):1557 source_version = package.get_version_to_check(pkg_rel_entry.fixed_version)
1546 self.id_unknown_test = '{0}:tst:10'.format(self.ns)1558 for binary_version in package.get_binary_versions(source_version):
1547 self.queue_element('test', """1559 binaries = package.get_binaries(source_version, binary_version)
1548 <ind-def:unknown_test id="{0}" check="all" comment="The result of this test is always UNKNOWN." version="1" />\n""".format(self.id_unknown_test))1560 cache_entry = f'{package.name}-{binary_version}'
15491561
1550 package['note'] = package['name'] + package['note']1562 cache.setdefault(cache_entry, dict(bin_id=None, def_id=None))
1551 return {'id': self.id_unknown_test, 'comment': package['note']}
15521563
1553 # TODO: xml lib1564 # If version doesn't exist and only one binary_version, they all have the same binaries
1554 def get_package_object_id(self, name, bin_pkgs, id_base, version=1):1565 if not package.version_exists(source_version) and package.all_binaries_same_version:
1555 """ create unique object for each package and return its OVAL id """1566 cache_entry_bin = f'{package.name}-{GENERIC_VERSION}'
1556 if not hasattr(self, 'package_objects'):1567 cache.setdefault(cache_entry_bin, dict(bin_id=None, def_id=None))
1557 self.package_objects = {}1568 else:
1569 cache_entry_bin = cache_entry
15581570
1559 key = tuple(sorted(bin_pkgs))1571 if pkg_rel_entry.status == 'vulnerable' and not self.fixed_only:
1572 if not cache_entry in cache or not cache[cache_entry]['def_id']:
1573 self._add_criterion(self.definition_id, pkg_rel_entry, cve, main_criteria)
15601574
1561 if key not in self.package_objects:1575 test, object, var = self._generate_vulnerable_elements(package, binaries, cache[cache_entry_bin]['bin_id'])
1562 object_id = '{0}:obj:{1}0'.format(self.ns, id_base)1576 tests.append(test)
15631577
1564 if len(bin_pkgs) > 1:1578 if not cache[cache_entry_bin]['bin_id']:
1565 # create variable for binary package names1579 objects.append(object)
1566 variable_id = '{0}:var:{1}0'.format(self.ns, id_base)1580 variables.append(var)
1567 if self.oval_format == 'dpkg':1581 cache[cache_entry_bin]['bin_id'] = self.definition_id
1568 variable_values = '</value>\n <value>'.join(bin_pkgs)
1569 self.queue_element('variable', """
1570 <constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries">
1571 <value>{3}</value>
1572 </constant_variable>\n""".format(variable_id, version, name, variable_values))
1573
1574 # create an object that references the variable
1575 self.queue_element('object', """
1576 <linux-def:dpkginfo_object id="{0}" version="{1}" comment="The '{2}' package binaries.">
1577 <linux-def:name var_ref="{3}" var_check="at least one" />
1578 </linux-def:dpkginfo_object>\n""".format(object_id, version, name, variable_id))
15791582
1583 cache[cache_entry]['def_id'] = self.definition_id
1584 self._increase_id(is_definition=False)
1580 else:1585 else:
1581 variable_values = '(?::\w+|)\s+(.*)$</value>\n <value>^'.join(bin_pkgs)1586 self._add_criterion(cache[cache_entry]['def_id'], pkg_rel_entry, cve, main_criteria)
1582 self.queue_element('variable', """1587 elif pkg_rel_entry.status == 'fixed':
1583 <constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries">1588 if binary_version in fixed_versions:
1584 <value>^{3}(?::\w+|)\s+(.*)$</value>1589 self._add_criterion(fixed_versions[binary_version], pkg_rel_entry, cve, main_criteria)
1585 </constant_variable>\n""".format(variable_id, version, name, variable_values))
1586
1587 # create an object that references the variable
1588 self.queue_element('object', """
1589 <ind-def:textfilecontent54_object id="{0}" version="{1}" comment="The '{2}' package binaries.">
1590 <ind-def:path>.</ind-def:path>
1591 <ind-def:filename>manifest</ind-def:filename>
1592 <ind-def:pattern operation="pattern match" datatype="string" var_ref="{3}" var_check="at least one" />
1593 <ind-def:instance operation="greater than or equal" datatype="int">1</ind-def:instance>
1594 </ind-def:textfilecontent54_object>\n""".format(object_id, version, name, variable_id))
1595
1596 else:
1597 if self.oval_format == 'dpkg':
1598 # 1 binary package, so just use name in object (no variable)
1599 self.queue_element('object', """
1600 <linux-def:dpkginfo_object id="{0}" version="{1}" comment="The '{2}' package binary.">
1601 <linux-def:name>{3}</linux-def:name>
1602 </linux-def:dpkginfo_object>\n""".format(object_id, version, name, bin_pkgs[0]))
1603 else:1590 else:
1604 variable_id = '{0}:var:{1}0'.format(self.ns, id_base)1591 self._add_criterion(self.definition_id, pkg_rel_entry, cve, main_criteria)
1605 variable_values = '(?::\w+|)\s+(.*)$</value>\n <value>^'.join(bin_pkgs)
1606 self.queue_element('variable', """
1607 <constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries">
1608 <value>^{3}(?::\w+|)\s+(.*)$</value>
1609 </constant_variable>\n""".format(variable_id, version, name, variable_values))
1610 self.queue_element('object', """
1611 <ind-def:textfilecontent54_object id="{0}" version="{1}" comment="The '{2}' package binary.">
1612 <ind-def:path>.</ind-def:path>
1613 <ind-def:filename>manifest</ind-def:filename>
1614 <ind-def:pattern operation="pattern match" datatype="string" var_ref="{3}" var_check="at least one" />
1615 <ind-def:instance operation="greater than or equal" datatype="int">1</ind-def:instance>
1616 </ind-def:textfilecontent54_object>\n""".format(object_id, version, name, variable_id))
1617
1618 self.package_objects[key] = object_id
1619
1620 return self.package_objects[key]
1621
1622 # TODO: xml lib
1623 def get_package_version_state_id(self, id_base, fix_version, version=1):
1624 """ create unique states for each version and return its OVAL id """
1625 if not hasattr(self, 'package_version_states'):
1626 self.package_version_states = {}
1627
1628 key = fix_version
1629 if key not in self.package_version_states:
1630 state_id = '{0}:ste:{1}0'.format(self.ns, id_base)
1631 if self.oval_format == 'dpkg':
1632 epoch_fix_version = fix_version if fix_version.find(':') != -1 else "0:" + fix_version
1633 self.queue_element('state', """
1634 <linux-def:dpkginfo_state id="{0}" version="{1}" comment="The package version is less than '{2}'.">
1635 <linux-def:evr datatype="debian_evr_string" operation="less than">{2}</linux-def:evr>
1636 </linux-def:dpkginfo_state>\n""".format(state_id, version, epoch_fix_version))
1637 else:
1638 self.queue_element('state', """
1639 <ind-def:textfilecontent54_state id="{0}" version="{1}" comment="The package version is less than '{2}'.">
1640 <ind-def:subexpression datatype="debian_evr_string" operation="less than">{2}</ind-def:subexpression>
1641 </ind-def:textfilecontent54_state>\n""".format(state_id, version, fix_version))
1642 self.package_version_states[key] = state_id
16431592
1644 return self.package_version_states[key]1593 test, object, var, state = self._generate_fixed_elements(package, binaries, binary_version, cache[cache_entry_bin]['bin_id'])
1594 tests.append(test)
1595 states.append(state)
16451596
1646 # TODO: xml lib1597 if not cache[cache_entry_bin]['bin_id']:
1647 def get_package_test_id(self, name, id_base, test_title, object_id, state_id=None, version=1, check_existence='at_least_one_exists'):1598 objects.append(object)
1648 """ create unique test for each parameter set and return its OVAL id """1599 variables.append(var)
1649 if not hasattr(self, 'package_tests'):1600 cache[cache_entry_bin]['bin_id'] = self.definition_id
1650 self.package_tests = {}1601
16511602 fixed_versions[binary_version] = self.definition_id
1652 key = (name, test_title, object_id, state_id)1603 self._increase_id(is_definition=False)
1653 if key not in self.package_tests:1604
1654 test_id = '{0}:tst:{1}0'.format(self.ns, id_base)1605 def _populate_kernel_pkg(self, cve: CVE, package: Package, root_element, main_criteria, running_kernel_id, cache, fixed_versions) -> None:
1655 if self.oval_format == 'dpkg':1606 # Kernel binaries have all same version
1656 state_ref = '\n <linux-def:state state_ref="{0}" />'.format(state_id) if state_id else ''1607 version = package.get_latest_version()
1657 self.queue_element('test', """1608 binaries = package.get_binaries(version, version)
1658 <linux-def:dpkginfo_test id="{0}" version="{1}" check_existence="{5}" check="at least one" comment="{2}">1609 pkg_rel_entry = cve.pkg_rel_entries[str(package)]
1659 <linux-def:object object_ref="{3}"/>{4}1610 cache_entry = f'{package.name}-{pkg_rel_entry.fixed_version}'
1660 </linux-def:dpkginfo_test>\n""".format(test_id, version, test_title, object_id, state_ref, check_existence))1611
1612 if not cache_entry in cache:
1613 # Generate one-time elements
1614 kernel_criterion = self._generate_kernel_package_elements(package, binaries, root_element, running_kernel_id)
1615 cache[cache_entry] = kernel_criterion
1616
1617 if pkg_rel_entry.status == 'fixed':
1618 criteria = self._generate_criteria_kernel('AND')
1619 self._add_to_criteria(criteria, cache[cache_entry], operator='AND', depth=0)
1620
1621 kernel_version_criterion = self._add_kernel_elements(cve, package, pkg_rel_entry.fixed_version, pkg_rel_entry, root_element, running_kernel_id, fixed_versions)
1622 self._add_to_criteria(criteria, kernel_version_criterion, depth=0)
1623 self._add_to_criteria(main_criteria, criteria, depth=2, operator='OR')
1624 self._increase_id(is_definition=False)
1625 else:
1626 self._add_to_criteria(main_criteria, cache[cache_entry], depth=2, operator='OR')
1627
1628 def _generate_elements_from_cve(self, cve, supported_releases, root_element, running_kernel_id, pkg_cache, fixed_versions) -> None:
1629 if not cve.pkgs: return
1630 added = False
1631 definition_element = self._generate_definition_object(cve)
1632 instructions = ''
1633 pkgs = cve.get_pkgs(supported_releases)
1634 for pkg in pkgs:
1635 if not pkg.versions_binaries: continue
1636 if not pkg.get_binary_versions(next(iter(pkg.versions_binaries))): continue
1637 if self._ignore_source_package(pkg.name): continue
1638
1639 added = True
1640 pkg_rel_entry = cve.pkg_rel_entries[str(pkg)]
1641 if pkg.is_kernel_pkg and self.oval_format != 'oci':
1642 self._populate_kernel_pkg(cve, pkg, root_element, definition_element, running_kernel_id, pkg_cache, fixed_versions)
1661 else:1643 else:
1662 state_ref = '\n <ind-def:state state_ref="{0}" />'.format(state_id) if state_id else ''1644 self._populate_pkg(cve, pkg, root_element, definition_element, pkg_cache, fixed_versions)
1663 self.queue_element('test', """1645
1664 <ind-def:textfilecontent54_test id="{0}" version="{1}" check_existence="{5}" check="at least one" comment="{2}">1646 if pkg_rel_entry.status == 'fixed' and pkg.versions_binaries:
1665 <ind-def:object object_ref="{3}"/>{4}1647 product_description = cve_lib.get_subproject_description(pkg_rel_entry.release)
1666 </ind-def:textfilecontent54_test>\n""".format(test_id, version, test_title, object_id, state_ref, check_existence))1648 instructions = self.prepare_instructions(instructions, cve, product_description, pkg, pkg_rel_entry.fixed_version)
1667 self.package_tests[key] = test_id1649
16681650 if added:
1669 return self.package_tests[key]1651 definitions = root_element.find("definitions")
16701652 metadata = definition_element.find('metadata')
1671 def get_running_kernel_testref(self, package):1653 metadata.find('description').text = metadata.find('description').text + instructions
1672 if package['status'] == 'not-applicable':1654 definitions.append(definition_element)
1673 # if the package status is not-applicable, skip it!
1674 return
1675 elif package['status'] == 'not-vulnerable':
1676 # if the packaget status is not-vulnerable, skip it!
1677 return
1678
1679 testref = []
1680 uname_regex = process_kernel_binaries(package['binaries'], self.oval_format)
1681 if uname_regex:
1682 if self.oval_format == 'dpkg':
1683 var_id = self.get_running_kernel_variable_id(
1684 uname_regex,
1685 package['id_base'])
1686 ste_id = self.get_running_kernel_state_id(
1687 uname_regex,
1688 package['id_base'],
1689 var_id)
1690 obj_id = self.get_running_kernel_object_id(
1691 package['id_base'])
1692 test_id = self.get_running_kernel_test_id(
1693 uname_regex, package['id_base'], package['name'],
1694 obj_id, ste_id)
1695 testref.append({'id': test_id,
1696 'comment': 'Is kernel {0} running'.format(package['name']),
1697 'kernel': uname_regex,
1698 'var_id': var_id,
1699 }
1700 )
1701
1702 # even if a cve was not fixed, we should add the test and object
1703 # but not the state as there won't be a fixed version to compare
1704 # with
1705 ste_id = None
1706 if package['fix-version']:
1707 ste_id = self.get_patched_kernel_state_id(
1708 package['id_base'],
1709 package['fix-version']
1710 )
1711
1712 obj_id = self.get_patched_kernel_object_id(package['id_base'],
1713 var_id)
1714 test_id = self.get_patched_kernel_test_id(
1715 package['id_base'],
1716 package['fix-version'],
1717 obj_id, ste_id
1718 )
1719 testref.append({'id': test_id,
1720 'comment': 'kernel version comparison',
1721 'kernelobj': True})
1722 else: # OCI
1723 object_id = self.get_package_object_id(package['name'],
1724 [uname_regex],
1725 package['id_base'])
1726 state_id = None
1727 test_title = "Does the '{0}' package exist?".format(package['name'])
1728 if package['fix-version']:
1729 state_id = self.get_package_version_state_id(package['id_base'],
1730 package['fix-version'])
1731 test_title = "Does the '{0}' package exist and is the version less than '{1}'?".format(package['name'],
1732 package['fix-version'])
1733 test_id = self.get_package_test_id(package['name'],
1734 package['id_base'],
1735 test_title,
1736 object_id,
1737 state_id)
1738 package['note'] = package['name'] + package['note']
1739 return [{'id': test_id, 'comment': package['note']}]
1740
1741 return testref
1742
1743 # TODO: xml lib
1744 def get_running_kernel_object_id(self, id_base, version=1):
1745 """ creates a uname_object so we can use the value from uname -r for
1746 mainly two things:
1747 1. compare with the return uname is of the same version and flavour
1748 as the kernel we fixed a CVE. This is done in
1749 get_running_kernel_state_id
1750 2. store the uname value, minus the flavour, in a debian evr string
1751 format, e.g: 0:5.4.0-1059. With this we can compare if the patched
1752 kernel is greater than the running kernel
1753 The result of this two will go through an AND logic to confirm
1754 if we are or not vulnerable to such CVE"""
1755 if not hasattr(self, 'kernel_uname_obj_id'):
1756 self.kernel_uname_obj_id = None
1757
1758 if not self.kernel_uname_obj_id:
1759 object_id = '{0}:obj:{1}0'.format(self.ns, id_base)
1760
1761 self.queue_element('object', """
1762 <unix-def:uname_object id="{0}" version="{1}"/>\n""".format(object_id, version))
1763
1764 self.kernel_uname_obj_id = object_id
1765
1766 return self.kernel_uname_obj_id
1767
1768 # TODO: xml lib
1769 def get_running_kernel_state_id(self, uname_regex, id_base, var_id, version=1):
1770 """ create uname_state to compare the system uname to the affected kernel
1771 uname regex, allowing us to verify we are running the same major version
1772 and flavour as the affected kernel.
1773 Return its OVAL id
1774 """
1775 if not hasattr(self, 'uname_states'):
1776 self.uname_states = {}
1777
1778 if uname_regex not in self.uname_states:
1779 state_id = '{0}:ste:{1}0'.format(self.ns, id_base)
1780 self.queue_element('state', """
1781 <unix-def:uname_state id="{0}" version="{1}">
1782 <unix-def:os_release operation="pattern match">{2}</unix-def:os_release>
1783 </unix-def:uname_state>\n""".format(state_id, version, uname_regex))
1784
1785 self.uname_states[uname_regex] = state_id
1786
1787 return self.uname_states[uname_regex]
1788
1789 # TODO: xml lib
1790 def get_running_kernel_variable_id(self, uname_regex, id_base, version=1):
1791 """ creates a local variable to store running kernel version in devian evr string"""
1792 if not hasattr(self, 'uname_variables'):
1793 self.uname_variables = {}
1794
1795 var_id = '{0}:var:{1}0'.format(self.ns, id_base)
1796 obj_id = '{0}:obj:{1}0'.format(self.ns, id_base)
1797 self.queue_element('variable', """
1798 <local_variable id="{0}" datatype="debian_evr_string" version="{1}" comment="kernel version in evr format">
1799 <concat>
1800 <literal_component>0:</literal_component>
1801 <regex_capture pattern="^([\d|\.]+-\d+)[-|\w]+$">
1802 <object_component object_ref="{2}" item_field="os_release" />
1803 </regex_capture>
1804 </concat>
1805 </local_variable>\n""".format(var_id, version, obj_id))
1806
1807 self.uname_variables['local_variable'] = var_id
1808
1809 return self.uname_variables['local_variable']
1810
1811 # TODO: xml lib
1812 def get_running_kernel_test_id(self, uname_regex, id_base, name, object_id, state_id, version=1):
1813 """ create uname test and return its OVAL id """
1814 if not hasattr(self, 'uname_tests'):
1815 self.uname_tests = {}
1816
1817 if uname_regex not in self.uname_tests:
1818 test_id = '{0}:tst:{1}0'.format(self.ns, id_base)
1819 self.queue_element('test', """
1820 <unix-def:uname_test check="at least one" comment="Is kernel {0} currently running?" id="{1}" version="{2}">
1821 <unix-def:object object_ref="{3}"/>
1822 <unix-def:state state_ref="{4}"/>
1823 </unix-def:uname_test>\n""".format(name, test_id, version, object_id, state_id))
1824
1825 self.uname_tests[uname_regex] = test_id
1826
1827 return self.uname_tests[uname_regex]
18281655
1829 def get_patched_kernel_variable_id(self, id_base, fixed_version, version=1):
1830 """ creates a local variable to store the patched kernel version """
1831 if not hasattr(self, 'patched_kernel_variables'):
1832 self.patched_kernel_variables = {}
18331656
1834 patched = re.search('([\d|\.]+-\d+)[\.|\d]+', fixed_version)1657 def generate_oval(self) -> None:
1835 if patched:1658 for release in self.releases:
1836 patched = patched.group(1)1659 self._init_ids(release)
1837 else:1660 self.definition_step = 1 * 10 ** 7
1838 patched = fixed_version1661 xml_tree, root_element = self._get_root_element()
18391662 generator = self._get_generator("CVE")
1840 if patched not in self.patched_kernel_variables:1663 root_element.append(generator)
1841 var_id = '{0}:var:{1}0'.format(self.ns, id_base + 1)1664 self._add_structure(root_element)
1665 running_kernel_id = None
18421666
1843 self.queue_element('variable', """1667 if self.oval_format == 'dpkg':
1844 <constant_variable id="{0}" version="{1}" datatype="debian_evr_string" comment="patched kernel">1668 # One time kernel check
1845 <value>0:{2}</value>1669 self._add_release_checks(root_element)
1846 </constant_variable>""".format(var_id, version, patched))1670 self._add_running_kernel_checks(root_element)
1671 running_kernel_id = self.definition_id
1672
1673 pkg_cache = {}
1674 fixed_versions = {}
1675 accepted_releases = list(self.parent_releases)
1676 accepted_releases.insert(0, self.release)
1677
1678 all_cves = self.cves[self.release]
1679 for parent_release in list(self.parent_releases):
1680 for cve in self.cves[parent_release]:
1681 if cve not in all_cves:
1682 all_cves[cve] = self.cves[parent_release][cve]
1683
1684 all_cves = dict(sorted(all_cves.items()))
1685
1686 for cve in all_cves:
1687 self._set_definition_id(cve_id=all_cves[cve].number)
1688 self._generate_elements_from_cve(all_cves[cve], accepted_releases, root_element, running_kernel_id, pkg_cache, fixed_versions)
1689
1690 self._write_oval_xml(xml_tree, root_element)
1691
1692class OvalGeneratorUSNs(OvalGenerator):
1693 def __init__(self, release, release_name, cve_paths, packages, progress, pkg_cache, usn_db_dir, fixed_only=True, cve_cache=None, cve_prefix_dir=None, outdir='./', oval_format='dpkg') -> None:
1694 super().__init__('usn', release, release_name, cve_paths, packages, progress, pkg_cache, fixed_only, cve_cache, cve_prefix_dir, outdir, oval_format)
1695 self._load_usns(usn_db_dir)
1696
1697 def _load_usns(self, usn_db_dir):
1698 self.usns = {}
1699 for filename in glob.glob(os.path.join(usn_db_dir, 'database*.json')):
1700 with open(filename, 'r') as f:
1701 data = json.load(f)
1702 for item in data:
1703 usn = USN(item)
1704 self.usns[usn.id] = usn
1705
1706 for usn_id in sorted(self.usns.keys()):
1707 if re.search(r'^[0-9]+-[0-9]$', usn_id):
1708 self.usns[usn_id]['id'] = 'USN-' + usn_id
1709
1710 def _generate_advisory(self, usn: USN) -> etree.Element:
1711 severities = ['low', 'medium', 'high', 'critical']
1712 advisory = etree.Element("advisory")
1713 severity = etree.SubElement(advisory, "severity")
1714 issued = etree.SubElement(advisory, "issued")
1715 severity = None
1716 for cve in usn.cves:
1717 cve_obj = self._generate_cve_tag(self.cves[cve])
1718 advisory.append(cve_obj)
18471719
1848 self.patched_kernel_variables[patched] = var_id1720 if not severity or severities.index(self.cves[cve].severity) > severities.index(severity):
1721 severity = self.cves[cve].severity
18491722
1850 return self.patched_kernel_variables[patched]1723 severity.text = severity.capitalize()
1724 issued.text = usn.timestamp
18511725
1852 def get_patched_kernel_object_id(self, id_base, var_id, version=1):1726 return advisory
1853 """ create variable object that points to kernel version
1854 in evr format in local_variable
1855 """
18561727
1857 object_id = '{0}:obj:{1}0'.format(self.ns, id_base + 1)1728 def _generate_metadata(self, usn: USN) -> etree.Element:
1729 metadata = etree.Element("metadata")
1730 title = etree.SubElement(metadata, "title")
1731 description = etree.SubElement(metadata, "description")
1732 affected = etree.SubElement(metadata, "affected", attrib = {"family": "unix"})
1733 platform = etree.SubElement(affected, "platform")
18581734
1859 self.queue_element('object', """1735 reference = self._generate_reference(usn)
1860 <ind-def:variable_object id="{0}" version="{1}">1736 metadata.append(reference)
1861 <ind-def:var_ref>{2}</ind-def:var_ref>1737 advisory = self._generate_advisory(usn)
1862 </ind-def:variable_object>\n""".format(object_id, version, var_id))1738 metadata.append(reference)
1739 metadata.append(advisory)
18631740
1864 return object_id1741 platform.text = self.release_name
1742 title.text = usn.title
1743 description.text = usn.description
18651744
1866 # TODO: xml lib1745 return metadata
1867 def get_patched_kernel_state_id(self, id_base, fixed_version, version=1):
1868 """ create state to compare to the running kernel
1869 Return its OVAL id
1870 """
1871 if not hasattr(self, 'patched_kernel_states'):
1872 self.patched_kernel_states = {}
18731746
1874 patched = re.search('([\d|\.]+-\d+)[\.|\d]+', fixed_version)1747 # Element generators
1875 if patched:1748 def _generate_reference(self, usn: USN) -> etree.Element:
1876 patched = patched.group(1)1749 reference = etree.Element("reference", attrib={
1877 else:1750 "source": "USN",
1878 patched = fixed_version1751 "ref_id": usn.id,
1752 "ref_url": f'https://ubuntu.com/security/notices/{usn.id}'
1753 })
18791754
1880 if patched not in self.patched_kernel_states:1755 return reference
1881 state_id = '{0}:ste:{1}0'.format(self.ns, id_base + 1)
18821756
1883 self.queue_element('state', """1757 def _populate_pkg(self, cve: CVE, package: Package, root_element, main_criteria, cache, fixed_versions) -> None:
1884 <ind-def:variable_state id="{0}" version="{1}">1758 tests = root_element.find("tests")
1885 <ind-def:value datatype="debian_evr_string" operation="less than">{2}</ind-def:value>1759 objects = root_element.find("objects")
1886 </ind-def:variable_state>\n""".format(state_id, version, patched))1760 variables = root_element.find("variables")
1761 states = root_element.find("states")
1762 pkg_rel_entry = cve.pkg_rel_entries[str(package)]
1763 cache.setdefault(package.name, dict(bin_id=None, def_id=None))
18871764
1888 self.patched_kernel_states[patched] = state_id
18891765
1890 return self.patched_kernel_states[patched]1766 if pkg_rel_entry.status == 'vulnerable' and not self.fixed_only:
1767 if not package.name in cache or not cache[package.name]['def_id']:
1768 self._add_criterion(self.definition_id, pkg_rel_entry, cve, main_criteria)
18911769
1892 def get_patched_kernel_test_id(self, id_base, fixed_version, object_id, state_id, version=1):1770 test, object, var = self._generate_vulnerable_elements(package, cache[package.name]['bin_id'])
1893 """ create patched kernel test and return its OVAL id """1771 tests.append(test)
1894 if not hasattr(self, 'patched_kernel_tests'):
1895 self.patched_kernel_tests = {}
18961772
1897 if fixed_version not in self.patched_kernel_tests:1773 if not cache[package.name]['bin_id']:
1898 test_id = '{0}:tst:{1}0'.format(self.ns, id_base + 1)1774 objects.append(object)
1775 variables.append(var)
1776 cache[package.name]['bin_id'] = self.definition_id
18991777
1900 if state_id:1778 cache[package.name]['def_id'] = self.definition_id
1901 self.queue_element('test', """1779 self._increase_id(is_definition=False)
1902 <ind-def:variable_test id="{0}" version="1" check="all" check_existence="all_exist" comment="kernel version comparison">
1903 <ind-def:object object_ref="{1}"/>
1904 <ind-def:state state_ref="{2}"/>
1905 </ind-def:variable_test>\n""".format(test_id, object_id, state_id))
1906 else:1780 else:
1907 self.queue_element('test', """1781 self._add_criterion(cache[package.name]['def_id'], pkg_rel_entry, cve, main_criteria)
1908 <ind-def:variable_test id="{0}" version="1" check="all" check_existence="all_exist" comment="kernel version comparison">1782 elif pkg_rel_entry.status == 'fixed':
1909 <ind-def:object object_ref="{1}"/>1783 if pkg_rel_entry.fixed_version in fixed_versions:
1910 </ind-def:variable_test>\n""".format(test_id, object_id))1784 self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, main_criteria)
19111785 else:
1912 self.patched_kernel_tests[fixed_version] = test_id1786 self._add_criterion(self.definition_id, pkg_rel_entry, cve, main_criteria)
1913
1914 return self.patched_kernel_tests[fixed_version]
1915
1916 def queue_element(self, element, xml):
1917 """ add an OVAL element to an output queue file """
1918 if element not in OvalGenerator.supported_oval_elements:
1919 self.warn('"{0}" is not a supported OVAL element.'.format(element))
1920 return
1921
1922 if not hasattr(self, 'tmp'):
1923 self.tmp = {}
1924 self.tmp_n = random.randrange(1000000, 9999999)
19251787
1926 if element not in self.tmp:1788 test, object, var, state = self._generate_fixed_elements(package, pkg_rel_entry, cache[package.name]['bin_id'])
1927 self.tmp[element] = _open(os.path.join(self.tmpdir,1789 tests.append(test)
1928 './queue.{0}.{1}.xml'.format(1790 states.append(state)
1929 self.tmp_n, element)), 'wt')
19301791
1931 # trim and fix indenting (assumes fragment is nicely indented internally)1792 if not cache[package.name]['bin_id']:
1932 xml = xml.strip('\n')1793 objects.append(object)
1933 base_indent = re.match(r'\s*', xml).group(0)1794 variables.append(var)
1934 xml = re.sub('^{0}'.format(base_indent), ' ', xml, 0,1795 cache[package.name]['bin_id'] = self.definition_id
1935 re.MULTILINE)
19361796
1937 self.tmp[element].write(xml + '\n')1797 fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id
1798 self._increase_id(is_definition=False)
19381799
1939 # TODO: xml lib1800 def _populate_kernel_pkg(self, cve: CVE, package: Package, root_element, main_criteria, running_kernel_id, cache, fixed_versions) -> None:
1940 def write_to_file(self):1801 if not package.name in cache:
1941 """ dequeue all elements into one OVAL definitions file and clean up """1802 # Generate one-time elements
1942 if not hasattr(self, 'tmp'):1803 kernel_criterion = self._generate_kernel_package_elements(package, root_element, running_kernel_id)
1943 return1804 cache[package.name] = kernel_criterion
19441805
1945 # close queue files for writing and then open for reading1806 pkg_rel_entry = cve.pkg_rel_entries[str(package)]
1946 for key in self.tmp:
1947 self.tmp[key].close()
1948 self.tmp[key] = _open(self.tmp[key].name, 'rt')
19491807
1950 tmp = os.path.join(self.tmpdir, self.output_filepath)1808 if pkg_rel_entry.status == 'fixed':
1951 with _open(tmp, 'wt') as f:1809 criteria = self._generate_criteria_kernel('AND')
1952 # add header1810 self._add_to_criteria(criteria, cache[package.name], operator='AND', depth=0)
1953 oval_timestamp = datetime.now(tz=timezone.utc).strftime(
1954 '%Y-%m-%dT%H:%M:%S')
1955 f.write("""<oval_definitions
1956 xmlns="http://oval.mitre.org/XMLSchema/oval-definitions-5"
1957 xmlns:ind-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#independent"
1958 xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5"
1959 xmlns:unix-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix"
1960 xmlns:linux-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux"
1961 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd">
19621811
1963 <generator>1812 kernel_version_criterion = self._add_kernel_elements(cve, package, pkg_rel_entry, root_element, running_kernel_id, fixed_versions)
1964 <oval:product_name>Canonical CVE OVAL Generator</oval:product_name>1813 self._add_to_criteria(criteria, kernel_version_criterion, depth=0)
1965 <oval:product_version>{0}</oval:product_version>1814 self._add_to_criteria(main_criteria, criteria, depth=2, operator='OR')
1966 <oval:schema_version>{1}</oval:schema_version>1815 self._increase_id(is_definition=False)
1967 <oval:timestamp>{2}</oval:timestamp>1816 else:
1968 </generator>\n""".format(OvalGenerator.generator_version, OvalGenerator.oval_schema_version, oval_timestamp))1817 self._add_to_criteria(main_criteria, cache[package.name], depth=2, operator='OR')
1818
1819 self._increase_id(is_definition=True)
19691820
1970 # add queued file content1821 def generate_oval(self) -> None:
1971 for element in OvalGenerator.supported_oval_elements:1822 self._reset()
1972 if element in self.tmp:1823 xml_tree, root_element = self._get_root_element()
1973 f.write("\n <{0}s>\n".format(element))1824 generator = self._get_generator("USN")
1974 f.write(self.tmp[element].read().rstrip())1825 root_element.append(generator)
1975 f.write("\n </{0}s>".format(element))1826 self._add_structure(root_element)
19761827
1977 # add footer1828 if self.oval_format == 'dpkg':
1978 f.write("\n</oval_definitions>")1829 # One time kernel check
1830 self._add_release_checks(root_element)
1831 self._add_running_kernel_checks(root_element)
1832 running_kernel_id = self.definition_id
1833 self._increase_id(is_definition=True)
19791834
1980 # close and delete queue files1835 definitions = root_element.find("definitions")
1981 for key in self.tmp:1836 pkg_cache = {}
1982 self.tmp[key].close()1837 fixed_versions = {}
1983 os.remove(self.tmp[key].name)
19841838
1985 # close self.output_filepath and move into place1839 for usn in self.usns:
1986 f.close()1840 definition_element = self._generate_definition_object(self.usns[usn])
1987 shutil.move(tmp, os.path.join(self.output_dir, self.output_filepath))1841 instructions = ''
19881842
1989 # remove tmp dir if empty1843 for cve in self.cves:
1990 if not os.listdir(self.tmpdir):1844 for pkg in self.cves[cve].pkgs:
1991 os.rmdir(self.tmpdir)1845 pkg_rel_entry = self.cves[cve].pkg_rel_entries[str(pkg)]
1846 if self.packages[pkg].is_kernel_pkg and self.oval_format != 'oci':
1847 self._populate_kernel_pkg(self.cves[cve], pkg, root_element, definition_element, running_kernel_id, pkg_cache, fixed_versions)
1848 else:
1849 self._populate_pkg(self.cves[cve], pkg, root_element, definition_element, pkg_cache, fixed_versions)
1850
1851 if pkg_rel_entry.status == 'fixed' and pkg.binaries:
1852 product_description = cve_lib.get_subproject_description(pkg_rel_entry.release)
1853 instructions = prepare_instructions(instructions, self.cves[cve].number, product_description, {'binaries': pkg.binaries, 'fix-version': pkg_rel_entry.fixed_version})
1854
1855 metadata = definition_element.find('metadata')
1856 metadata.find('description').text = metadata.find('description').text + instructions
1857 definitions.append(definition_element)
19921858
1993 def unique_id_base(self, id_base, note):1859 self._write_oval_xml(xml_tree, root_element)
1994 """ queue a warning message """
1995 if not hasattr(self, 'id_bases'):
1996 self.id_bases = {}
1997 is_unique = id_base not in self.id_bases.keys()
1998 if not is_unique:
1999 self.warn('ID Base collision {0} in {1} and {2}.'.format(
2000 id_base, note, self.id_bases[id_base]))
2001 self.id_bases[id_base] = note
2002 return is_unique
2003
2004 def warn(self, message):
2005 """ print a warning message """
2006 print('WARNING: {0}'.format(message))
20071860
2008class OvalGeneratorUSN():1861class OvalGeneratorUSN():
2009 supported_oval_elements = ('definition', 'test', 'object', 'state',1862 supported_oval_elements = ('definition', 'test', 'object', 'state',
@@ -2709,7 +2562,7 @@ class OvalGeneratorUSN():
2709 xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5"2562 xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5"
2710 xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix"2563 xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix"
2711 xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux"2564 xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux"
2712 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd">2565 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#linux linux-definitions-schema.xsd">
27132566
2714 <generator>2567 <generator>
2715 <oval:product_name>Canonical USN OVAL Generator</oval:product_name>2568 <oval:product_name>Canonical USN OVAL Generator</oval:product_name>
diff --git a/test/gold_oval_structure/oval.xml b/test/gold_oval_structure/oval.xml
index c45fd96..1600b15 100644
--- a/test/gold_oval_structure/oval.xml
+++ b/test/gold_oval_structure/oval.xml
@@ -4,7 +4,7 @@
4 xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5"4 xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5"
5 xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix"5 xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix"
6 xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux"6 xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux"
7 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd">7 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#linux linux-definitions-schema.xsd">
88
9 <generator>9 <generator>
10 <oval:product_name>Canonical USN OVAL Generator</oval:product_name>10 <oval:product_name>Canonical USN OVAL Generator</oval:product_name>

Subscribers

People subscribed via source and target branches