Merge ~ebarretto/ubuntu-cve-tracker:new-pkg-cache into ubuntu-cve-tracker:master

Proposed by Eduardo Barretto
Status: Merged
Merged at revision: a58b614ef83e3253e2d256f8fb6dc43fe25fc902
Proposed branch: ~ebarretto/ubuntu-cve-tracker:new-pkg-cache
Merge into: ubuntu-cve-tracker:master
Diff against target: 798 lines (+183/-312)
2 files modified
scripts/generate-oval (+46/-198)
scripts/oval_lib.py (+137/-114)
Reviewer Review Type Date Requested Status
David Fernandez Gonzalez Approve
Review via email: mp+449457@code.launchpad.net

Description of the change

This PR moves the OVAL generation for CVE and PKG based OVALs from the old cache mechanism to the new.
The new cache mechanism fixes a few of the issues we currently have, such as:
- long time to generate OVAL data
- situations where source package produces binaries with different versions, e.g. libreoffice.

It also fixes an epoch issue in the PKG-based OVAL and tries to simplify some of the PKG-based OVAL generation.

To post a comment you must log in.
Revision history for this message
David Fernandez Gonzalez (litios) wrote :

LGTM, thanks!

All the tests seem to generate the "same output" and tests pass.
A great increase in generation speed!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
diff --git a/scripts/generate-oval b/scripts/generate-oval
index 110d48e..6a80945 100755
--- a/scripts/generate-oval
+++ b/scripts/generate-oval
@@ -31,15 +31,11 @@ import json
31import os31import os
32import re32import re
33import sys33import sys
34#from launchpadlib.launchpad import Launchpad
3534
36import apt_pkg35import apt_pkg
37from cve_lib import (kernel_srcs, product_series, load_cve, PRODUCT_UBUNTU, all_releases, eol_releases, devel_release, release_parent, release_name, release_ppa, release_progenitor, needs_oval)36from cve_lib import (kernel_srcs, product_series, load_cve, PRODUCT_UBUNTU, all_releases, eol_releases, devel_release, release_parent, release_name, release_progenitor, needs_oval)
38from kernel_lib import meta_kernels37from kernel_lib import meta_kernels
39import oval_lib38import oval_lib
40import functools
41import lpl_common
42import tempfile
4339
44# cope with apt_pkg api changes.40# cope with apt_pkg api changes.
45if 'init_system' in dir(apt_pkg):41if 'init_system' in dir(apt_pkg):
@@ -57,18 +53,15 @@ for r in set(all_releases).difference(set(eol_releases)).difference(set([devel_r
5753
58default_cves_to_process = ['active/CVE-*', 'retired/CVE-*']54default_cves_to_process = ['active/CVE-*', 'retired/CVE-*']
5955
60packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-")
61
62debug_level = 056debug_level = 0
6357
64package_cache = None58package_cache = None
6559
66cve_cache = {}
67
68def main():60def main():
69 """ parse command line options and iterate through files to be processed61 """ parse command line options and iterate through files to be processed
70 """62 """
71 global debug_level63 global debug_level
64 global supported_releases
7265
73 # parse command line options66 # parse command line options
74 parser = argparse.ArgumentParser(description='Generate CVE OVAL from '67 parser = argparse.ArgumentParser(description='Generate CVE OVAL from '
@@ -90,10 +83,8 @@ def main():
90 '(default is ./)')83 '(default is ./)')
91 parser.add_argument('--no-progress', action='store_true',84 parser.add_argument('--no-progress', action='store_true',
92 help='do not show progress meter')85 help='do not show progress meter')
93 parser.add_argument('--pkg-cache', action='store', default="pkg_cache.json",86 parser.add_argument('--pkg-cache-dir', action='store', default='./',
94 help='cache location for binary packages')87 help='cache location for binary packages')
95 parser.add_argument('--force-cache-reload', action='store_true',
96 help='force reload of cache file')
97 parser.add_argument('-d', '--debug', action='count', default=0,88 parser.add_argument('-d', '--debug', action='count', default=0,
98 help="report debugging information")89 help="report debugging information")
99 parser.add_argument('--usn-oval', action='store_true',90 parser.add_argument('--usn-oval', action='store_true',
@@ -149,25 +140,27 @@ def main():
149140
150 return141 return
151142
152 cve_cache = {}143 releases = [args.oval_release] if args.oval_release else supported_releases
153 cache = PackageCache(args.pkg_cache, args.force_cache_reload)144 supported_releases = releases
145 cache = {}
146 for release in releases:
147 cve_cache = {}
148 cache.update({release: get_package_cache(args.pkg_cache_dir, release)})
154149
155 if args.pkg_oval:150 if args.pkg_oval:
156 releases = [args.oval_release] if args.oval_release else supported_releases151 if args.oci:
157 for release in releases:152 generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ociprefix, ocioutdir)
158 if args.oci:153 else:
159 generate_oval_package(release, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ociprefix, ocioutdir)154 generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only)
160 else:
161 generate_oval_package(release, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only)
162 return155 return
163156
164 if args.oci:157 if args.oci:
165 generate_oval_cve(args.output_dir, args.cve_prefix_dir, cache, args.oci,158 generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci,
166 args.no_progress, args.packages, pathnames, ociprefix, ocioutdir)159 args.no_progress, args.packages, pathnames, ociprefix, ocioutdir)
167 else:160 else:
168 generate_oval_cve(args.output_dir, args.cve_prefix_dir, cache, args.oci,161 generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci,
169 args.no_progress, args.packages, pathnames)162 args.no_progress, args.packages, pathnames)
170163 return
171164
172165
173# given a status generated by parse_package_status(), duplicate it for a166# given a status generated by parse_package_status(), duplicate it for a
@@ -182,10 +175,8 @@ def duplicate_package_status(release, package, original_status, cache, override_
182 elif 'fix-version' in original_status:175 elif 'fix-version' in original_status:
183 copied_status['fix-version'] = original_status['fix-version']176 copied_status['fix-version'] = original_status['fix-version']
184177
185 if 'fix-version' in copied_status and copied_status['fix-version'].isdigit():178 if 'bin-pkgs' in original_status:
186 copied_status['bin-pkgs'] = cache.get_binarypkgs(package, release, version=copied_status['fix-version'])179 copied_status['bin-pkgs'] = original_status['bin-pkgs']
187 else:
188 copied_status['bin-pkgs'] = cache.get_binarypkgs(package, release)
189180
190 return copied_status181 return copied_status
191182
@@ -355,158 +346,16 @@ def prepend_usn_to_id(usn_database, usn_id):
355 if re.search(r'^[0-9]+-[0-9]$', usn_id):346 if re.search(r'^[0-9]+-[0-9]$', usn_id):
356 usn_database[usn_id]['id'] = 'USN-' + usn_id347 usn_database[usn_id]['id'] = 'USN-' + usn_id
357348
358# Class to contain the binary package cache349# loads cve package cache <release>-pkg-cache.json based given path to it.
359class PackageCache():350# To get the cache proceed as: $UCT/scripts/fetch-db <release>-pkg-cache.json pkg-cache
360351def get_package_cache(pkg_cache_dir, release):
361 cachefile = None352 data = {}
362 cache_updates = 0353 for filename in glob.glob(os.path.join(pkg_cache_dir, release.replace('/', '_') + '-pkg-cache.json')):
363 releases = dict()354 debug(f"Opening and reading cache file {filename}")
364 unpublished_sources = dict()355 with open(filename, 'r') as f:
365 packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-")356 data = json.load(f)
366
367 def __init__(self, cachefile='data_file.json', force_reload=False):
368 self.cachefile = cachefile
369 self.force_reload = force_reload
370
371 # open the local cache if it exists
372 if os.path.exists(self.cachefile):
373 debug('Opening and reading cache file %s' % self.cachefile)
374 with open(self.cachefile, "r") as read_file:
375 self.pkgcache = json.load(read_file)
376 else:
377 self.pkgcache = dict()
378 self.force_reload = True
379
380 # Get launchpad handlers...
381 debug('Setting up launchpad connection...')
382 self.lp = lpl_common.connect(version='devel')
383 #lp = Launchpad.login_anonymously("generate-oval", "production", version='devel')
384 self.ubuntu = self.lp.distributions['ubuntu']
385 self.archive = self.ubuntu.main_archive
386
387 # if debugging is enabled, flush cache every update
388 self.cache_write_frequency = 1 if debug_level > 0 else 100
389
390 def write_cache(self):
391 debug('Writing cache file %s' % self.cachefile)
392 if self.cachefile:
393 # create as separate file and try to do atomic rename so
394 # multiple writers don't corrupt cache.
395 with tempfile.NamedTemporaryFile(mode='w', prefix='oval_pkg_cache-', suffix='.new', dir=os.path.dirname(self.cachefile), delete=False) as write_file:
396 new_cachefile = write_file.name
397 json.dump(self.pkgcache, write_file, indent=2)
398
399 os.rename(new_cachefile, self.cachefile)
400
401 def _has_no_published_source(self, package, release):
402 return (package in self.unpublished_sources
403 and release in self.unpublished_sources[package])
404
405 def _add_no_published_source(self, package, release):
406 if package not in self.unpublished_sources:
407 self.unpublished_sources[package] = [release]
408 else:
409 self.unpublished_sources[package].append(release)
410
411 # lookup source package in launchpad, get latest version
412 def _lookup_latest_source_package(self, source_name, release):
413
414 # cache lp release info
415 if release not in self.releases:
416 # we can have nested parent releases
417 parent = release_parent(release)
418 while release_parent(parent):
419 parent = release_parent(parent)
420 if parent:
421 self.releases[release] = self.ubuntu.getSeries(name_or_version=parent)
422 else:
423 self.releases[release] = self.ubuntu.getSeries(name_or_version=product_series(release)[1])
424
425 ppa = release_ppa(release)
426 if ppa:
427 archive, group, ppa_full_name = lpl_common.get_archive(ppa, self.lp, False, distribution=self.ubuntu)
428 sources = archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published')
429 if len(sources) == 0:
430 # if release is subproject and no version of package was released to it
431 # then copy source from parent release
432 sources = self.archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published')
433 else:
434 sources = self.archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published')
435
436 # some kernels get statuses even when not published in the
437 # archive yet
438 if len(sources) == 0:
439 self._add_no_published_source(source_name, release)
440 return None
441
442 # in python3, filter returns an iterable object, so wrap in list()
443 sources = list(filter(lambda x: x.pocket in ['Release', 'Security', 'Updates'], sources))
444 # some packages are only in proposed, even for non-devel releases
445 if len(sources) == 0:
446 self._add_no_published_source(source_name, release)
447 return None
448
449 source = sorted(sources,
450 key=functools.cmp_to_key(lambda x, y: apt_pkg.version_compare(x.source_package_version, y.source_package_version)),
451 reverse=True)[0]
452 debug('Launchpad returned %s %s' % (source.source_package_name, source.source_package_version))
453 return source
454
455 def get_binarypkgs(self, pname, release, version=None):
456 """ return a list of binary packages from the source package """
457
458 # first check local cache
459 #
460 # if the version in the cve tracker is newer than the
461 # version in the cache, we should refresh the cache
462 # if there's no version in the tracker, return the cached entry
463 if pname in self.pkgcache:
464 if (release in self.pkgcache[pname]['Releases'] and
465 (not version or
466 apt_pkg.version_compare(version, self.pkgcache[pname]['Releases'][release].get('source_version', 0)) <= 0)):
467 return self.pkgcache[pname]['Releases'][release]['binaries']
468
469 debug('Cache miss: %s %s %s' % (pname, release, version))
470
471 # skip lookup if unpublished_sources is empty and
472 # force_reload is False
473 if not self.unpublished_sources and not self.force_reload:
474 if pname not in self.pkgcache or release not in self.pkgcache[pname]['Releases']:
475 return None
476
477 # skip lookup if we've already done a lookup and found no
478 # published source for that release
479 if self._has_no_published_source(pname, release):
480 return None
481
482 # query launchpad if not in local cache
483 source = self._lookup_latest_source_package(pname, release)
484 if not source:
485 return None
486
487 binaries = source.getPublishedBinaries()
488 binlist = []
489 for i in binaries:
490 # skip if we already saw this package
491 if (i.binary_package_name in binlist):
492 continue
493 # for kernel we only want linux images
494 if pname.startswith('linux') and not (i.binary_package_name).startswith('linux-image-'):
495 continue
496 # skip ignored packages, with exception of golang*-dev pkgs
497 if (i.binary_package_name).startswith(('golang-go')) or not any(s in i.binary_package_name for s in self.packages_to_ignore):
498 binlist.append(i.binary_package_name)
499
500 # save current pkgcache to local cache
501 if pname not in self.pkgcache:
502 self.pkgcache[pname] = {'Releases': {}}
503 self.pkgcache[pname]['Releases'][release] = {'binaries': binlist, 'source_version': source.source_package_version}
504
505 self.cache_updates += 1
506 if self.cache_updates % self.cache_write_frequency == 0:
507 self.write_cache()
508357
509 return binlist358 return data
510359
511# loads usn database.json based given a path to it.360# loads usn database.json based given a path to it.
512# To get the database proceed as: $UCT/scripts/fetch-db database.json.bz2361# To get the database proceed as: $UCT/scripts/fetch-db database.json.bz2
@@ -581,31 +430,31 @@ def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=N
581430
582 return True431 return True
583432
584def generate_oval_package(release, outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ociprefix='', ocioutdir=None):433def generate_oval_package(outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ociprefix='', ocioutdir=None):
585 if not no_progress:434 for release in supported_releases:
586 print(f'[*] Generating OVAL for packages in release {release}')435 if not no_progress:
587 ov = oval_lib.OvalGeneratorPkg(release, release_name(release), pathnames, packages, not no_progress,pkg_cache=pkg_cache, fixed_only=fixed_only, cve_cache=cve_cache, oval_format='oci' if oci else 'dpkg', outdir=outdir, cve_prefix_dir=cve_prefix_dir, prefix=ociprefix)436 print(f'[*] Generating OVAL for packages in release {release}')
588 ov.generate_oval()437 ov = oval_lib.OvalGeneratorPkg(release, release_name(release), pathnames, packages, not no_progress, pkg_cache=pkg_cache, fixed_only=fixed_only, cve_cache=cve_cache, oval_format='oci' if oci else 'dpkg', outdir=outdir, cve_prefix_dir=cve_prefix_dir, prefix=ociprefix)
589
590 if oci:
591 ov.oval_format = 'dpkg'
592 ov.generate_oval()438 ov.generate_oval()
593439
594 pkg_cache.write_cache()440 if oci:
595 if not no_progress:441 ov.oval_format = 'dpkg'
596 print(f'[X] Done generating OVAL for packages in release {release}')442 ov.generate_oval()
443
444 if not no_progress:
445 print(f'[X] Done generating OVAL for packages in release {release}')
597446
598def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages, pathnames, ociprefix=None, ocioutdir=None):447def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages, pathnames, ociprefix=None, ocioutdir=None):
599 ovals = dict()448 ovals = dict()
600 for i in supported_releases:449 for release in supported_releases:
601 # we can have nested parent releases450 # we can have nested parent releases
602 parent = release_progenitor(i)451 parent = release_progenitor(release)
603 index = '{0}_dpkg'.format(i)452 index = '{0}_dpkg'.format(release)
604 ovals[index] = oval_lib.OvalGeneratorCVE(i, release_name(i), parent, warn, outdir, prefix='', oval_format='dpkg')453 ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, outdir, prefix='', oval_format='dpkg')
605 ovals[index].add_release_applicability_definition()454 ovals[index].add_release_applicability_definition()
606 if oci:455 if oci:
607 index = '{0}_oci'.format(i)456 index = '{0}_oci'.format(release)
608 ovals[index] = oval_lib.OvalGeneratorCVE(i, release_name(i), parent, warn, ocioutdir, prefix=ociprefix, oval_format='oci')457 ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, ocioutdir, prefix=ociprefix, oval_format='oci')
609 ovals[index].add_release_applicability_definition()458 ovals[index].add_release_applicability_definition()
610459
611 # loop through all CVE data files460 # loop through all CVE data files
@@ -636,6 +485,5 @@ def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages,
636 for i in ovals:485 for i in ovals:
637 ovals[i].write_to_file()486 ovals[i].write_to_file()
638487
639 cache.write_cache()
640if __name__ == '__main__':488if __name__ == '__main__':
641 main()489 main()
diff --git a/scripts/oval_lib.py b/scripts/oval_lib.py
index 20f452e..26908f9 100644
--- a/scripts/oval_lib.py
+++ b/scripts/oval_lib.py
@@ -19,6 +19,7 @@
19from __future__ import unicode_literals19from __future__ import unicode_literals
2020
21from datetime import datetime, timezone21from datetime import datetime, timezone
22import apt_pkg
22import io23import io
23import os24import os
24import random25import random
@@ -142,6 +143,49 @@ def generate_cve_tag(cve):
142 cve_ref += '>{0}</cve>'.format(cve['Candidate'])143 cve_ref += '>{0}</cve>'.format(cve['Candidate'])
143 return cve_ref144 return cve_ref
144145
146def get_latest_version(versions):
147 latest = None
148 for version in versions:
149 if not latest:
150 latest = version
151 continue
152 elif apt_pkg.version_compare(version, latest) > 0:
153 latest = version
154
155 return latest
156
157def get_binarypkgs(cache, source_name, release, version=None):
158 """ return a list of binary packages from the source package version """
159 packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-")
160 version_map = collections.defaultdict(list)
161 rel = release
162 cache_version = version
163
164 if source_name not in cache[release]:
165 # if a source package does not exist in such a release
166 # return None
167 return (None, None)
168 elif version and version not in cache[rel][source_name]:
169 # if version is not in release, then fetch latest source version
170 cache_version = get_latest_version(list(cache[rel][source_name].keys()))
171 elif not version:
172 # if no version is provided, get latest source version
173 version = get_latest_version(list(cache[rel][source_name].keys()))
174 cache_version = version
175 else:
176 cache_version = version
177
178 for binary, bin_data in cache[rel][source_name][cache_version]['binaries'].items():
179 # for kernel we only want linux images
180 if source_name.startswith('linux') and not binary.startswith('linux-image-'):
181 continue
182 # skip ignored packages, with exception of golang*-dev pkgs
183 if binary.startswith(('golang-go')) or \
184 not any(s in binary for s in packages_to_ignore):
185 version_map[bin_data['version']].append(binary)
186
187 return (version, version_map)
188
145class OvalGenerator:189class OvalGenerator:
146 supported_oval_elements = ('definition', 'test', 'object', 'state', 'variable')190 supported_oval_elements = ('definition', 'test', 'object', 'state', 'variable')
147 generator_version = '1.1'191 generator_version = '1.1'
@@ -340,13 +384,13 @@ class OvalGenerator:
340 return family_state, state384 return family_state, state
341385
342class CVEPkgRelEntry:386class CVEPkgRelEntry:
343 def __init__(self, pkg, release, cve, status, note) -> None:387 def __init__(self, pkg, release, cve, status, note, cache) -> None:
344 self.pkg = pkg388 self.pkg = pkg
345 self.cve = cve389 self.cve = cve
346 self.orig_status = status390 self.orig_status = status
347 self.orig_note = note391 self.orig_note = note
348 self.release = release392 self.release = release
349 cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, None)393 cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, cache)
350394
351 self.note = cve_info['note']395 self.note = cve_info['note']
352 self.status = cve_info['status']396 self.status = cve_info['status']
@@ -369,16 +413,13 @@ class CVEPkgRelEntry:
369 code = status_text.lower()413 code = status_text.lower()
370 detail = note.strip('()') if note else None414 detail = note.strip('()') if note else None
371 status = {}415 status = {}
372 fix_version = ""416 fix_version = None
373417
374 if detail and detail[0].isdigit() and len(detail.split(' ')) == 1:418 if detail and detail[0].isdigit() and len(detail.split(' ')) == 1:
375 fix_version = detail419 fix_version = detail
376420
377 if cache and code != 'dne':421 if cache and code != 'dne':
378 if fix_version and code in ['released', 'not-affected']:422 status['source-version'], status['bin-pkgs'] = get_binarypkgs(cache, package, release, version=fix_version)
379 status['bin-pkgs'] = cache.get_binarypkgs(package, release, version=fix_version)
380 else:
381 status['bin-pkgs'] = cache.get_binarypkgs(package, release)
382423
383 note_end = " (note: '{0}').".format(detail) if detail else '.'424 note_end = " (note: '{0}').".format(detail) if detail else '.'
384 if code == 'dne':425 if code == 'dne':
@@ -536,7 +577,7 @@ class OvalGeneratorPkg(OvalGenerator):
536 for cve in package.cves:577 for cve in package.cves:
537 if self.fixed_only and cve.pkg_rel_entries[package.name].status != 'fixed':578 if self.fixed_only and cve.pkg_rel_entries[package.name].status != 'fixed':
538 continue579 continue
539 cve_obj = self._generate_cve_object(cve)580 cve_obj = self._generate_cve_tag(cve)
540 advisory.append(cve_obj)581 advisory.append(cve_obj)
541582
542 rights.text = f"Copyright (C) {datetime.now().year} Canonical Ltd."583 rights.text = f"Copyright (C) {datetime.now().year} Canonical Ltd."
@@ -573,6 +614,19 @@ class OvalGeneratorPkg(OvalGenerator):
573614
574 return criteria615 return criteria
575616
617 def _generate_subcriteria(self, operator) -> etree.Element:
618 return etree.Element("criteria", attrib={
619 "operator": operator
620 })
621
622 def _generate_criterion_element(self, comment, id) -> etree.Element:
623 criterion = etree.Element("criterion", attrib={
624 "test_ref": f"{self.ns}:tst:{id}",
625 "comment": comment
626 })
627
628 return criterion
629
576 # Element generators630 # Element generators
577 def _generate_reference(self, package) -> etree.Element:631 def _generate_reference(self, package) -> etree.Element:
578 reference = etree.Element("reference", attrib={632 reference = etree.Element("reference", attrib={
@@ -583,7 +637,7 @@ class OvalGeneratorPkg(OvalGenerator):
583637
584 return reference638 return reference
585639
586 def _generate_definition_object(self, package) -> None:640 def _generate_definition_element(self, package) -> None:
587 id = f"{self.ns}:def:{self.definition_id}"641 id = f"{self.ns}:def:{self.definition_id}"
588 definition = etree.Element("definition")642 definition = etree.Element("definition")
589 definition.set("class", "vulnerability")643 definition.set("class", "vulnerability")
@@ -594,9 +648,10 @@ class OvalGeneratorPkg(OvalGenerator):
594 criteria = self._generate_criteria()648 criteria = self._generate_criteria()
595 definition.append(metadata)649 definition.append(metadata)
596 definition.append(criteria)650 definition.append(criteria)
651
597 return definition652 return definition
598653
599 def _generate_cve_object(self, cve: CVE) -> etree.Element:654 def _generate_cve_tag(self, cve: CVE) -> etree.Element:
600 cve_tag = etree.Element("cve",655 cve_tag = etree.Element("cve",
601 attrib={656 attrib={
602 'href' : f"https://ubuntu.com/security/{cve.number}",657 'href' : f"https://ubuntu.com/security/{cve.number}",
@@ -612,7 +667,8 @@ class OvalGeneratorPkg(OvalGenerator):
612 cve_tag.set('usns', ','.join(cve.usns))667 cve_tag.set('usns', ','.join(cve.usns))
613668
614 return cve_tag669 return cve_tag
615 def _generate_var_object(self, comment, id, binaries) -> etree.Element:670
671 def _generate_var_element(self, comment, id, binaries) -> etree.Element:
616 var = etree.Element("constant_variable",672 var = etree.Element("constant_variable",
617 attrib={673 attrib={
618 'id' : f"{self.ns}:var:{id}",674 'id' : f"{self.ns}:var:{id}",
@@ -627,7 +683,7 @@ class OvalGeneratorPkg(OvalGenerator):
627683
628 return var684 return var
629685
630 def _generate_object_object(self, comment, id, var_id) -> etree.Element:686 def _generate_object_element(self, comment, id, var_id) -> etree.Element:
631 if self.oval_format == 'dpkg':687 if self.oval_format == 'dpkg':
632 object = etree.Element("linux-def:dpkginfo_object",688 object = etree.Element("linux-def:dpkginfo_object",
633 attrib={689 attrib={
@@ -662,6 +718,7 @@ class OvalGeneratorPkg(OvalGenerator):
662 path.text = '.'718 path.text = '.'
663 filename.text = 'manifest'719 filename.text = 'manifest'
664 instance.text = '1'720 instance.text = '1'
721
665 return object722 return object
666723
667 def _generate_test_element(self, comment, id, create_state, type, obj_id = None, state_id=None) -> etree.Element:724 def _generate_test_element(self, comment, id, create_state, type, obj_id = None, state_id=None) -> etree.Element:
@@ -694,7 +751,10 @@ class OvalGeneratorPkg(OvalGenerator):
694751
695 return test752 return test
696753
697 def _generate_state_object(self, comment, id, version) -> None:754 def _generate_state_element(self, comment, id, version) -> None:
755 if version.find(':') == -1:
756 version = f"0:{version}"
757
698 if self.oval_format == 'dpkg':758 if self.oval_format == 'dpkg':
699 object = etree.Element("linux-def:dpkginfo_state",759 object = etree.Element("linux-def:dpkginfo_state",
700 attrib={760 attrib={
@@ -708,7 +768,7 @@ class OvalGeneratorPkg(OvalGenerator):
708 "operation": "less than"768 "operation": "less than"
709 })769 })
710770
711 version_check.text = f"0:{version}"771 version_check.text = f"{version}"
712 elif self.oval_format == 'oci':772 elif self.oval_format == 'oci':
713 object = etree.Element("ind-def:textfilecontent54_state",773 object = etree.Element("ind-def:textfilecontent54_state",
714 attrib={774 attrib={
@@ -722,20 +782,12 @@ class OvalGeneratorPkg(OvalGenerator):
722 "operation": "less than"782 "operation": "less than"
723 })783 })
724784
725 version_check.text = f"0:{version}"785 version_check.text = f"{version}"
726 else:786 else:
727 ValueError(f"Format not {self.oval_format} not supported")787 ValueError(f"Format not {self.oval_format} not supported")
728788
729 return object789 return object
730790
731 def _generate_criterion_element(self, comment, id) -> etree.Element:
732 criterion = etree.Element("criterion", attrib={
733 "test_ref": f"{self.ns}:tst:{id}",
734 "comment": comment
735 })
736
737 return criterion
738
739 # Running kernel element generators791 # Running kernel element generators
740 def _add_running_kernel_checks(self, root_element):792 def _add_running_kernel_checks(self, root_element):
741 objects = root_element.find("objects")793 objects = root_element.find("objects")
@@ -813,11 +865,6 @@ class OvalGeneratorPkg(OvalGenerator):
813 return test865 return test
814866
815 # Kernel elements generators867 # Kernel elements generators
816 def _generate_criteria_kernel(self, operator) -> etree.Element:
817 return etree.Element("criteria", attrib={
818 "operator": operator
819 })
820
821 def _generate_kernel_version_object_element(self, id, var_id) -> etree.Element:868 def _generate_kernel_version_object_element(self, id, var_id) -> etree.Element:
822 object = etree.Element("ind-def:variable_object",869 object = etree.Element("ind-def:variable_object",
823 attrib={870 attrib={
@@ -926,64 +973,45 @@ class OvalGeneratorPkg(OvalGenerator):
926 criterion = self._generate_criterion_element(criterion_note, id)973 criterion = self._generate_criterion_element(criterion_note, id)
927 self._add_to_criteria(definition, criterion, depth)974 self._add_to_criteria(definition, criterion, depth)
928975
929 def _generate_vulnerable_elements(self, package, obj_id=None):976 def _generate_elements(self, package, binaries, pkg_rel_entry, obj_id=None):
977 create_state = False
978 state = None
979 var = None
980 obj = None
930 binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary'981 binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary'
931 test_note = f"Does the '{package.name}' package exist?"
932 object_note = f"The '{package.name}' package {binary_keyword}"982 object_note = f"The '{package.name}' package {binary_keyword}"
983 test_note = ""
933984
934 test = self._generate_test_element(test_note, self.definition_id, False, 'pkg', obj_id=obj_id)985 if self.oval_format == 'oci':
935
936 if not obj_id:
937 object = self._generate_object_object(object_note, self.definition_id, self.definition_id)
938
939 bins = package.binaries
940 if is_kernel_binaries(package.binaries):986 if is_kernel_binaries(package.binaries):
941 regex = process_kernel_binaries(package.binaries, 'oci')987 regex = process_kernel_binaries(package.binaries, 'oci')
942 bins = [f'{regex}']988 binaries = [f'^{regex}(?::\w+|)\s+(.*)$']
943989 else:
944 binaries = []
945 if self.oval_format == 'oci':
946 variable_values = '(?::\w+|)\s+(.*)$'990 variable_values = '(?::\w+|)\s+(.*)$'
947 for binary in bins:991
992 binaries = []
993 for binary in package.binaries:
948 binaries.append(f'^{binary}{variable_values}')994 binaries.append(f'^{binary}{variable_values}')
949 else:
950 binaries = bins
951995
952 var = self._generate_var_object(object_note, self.definition_id, binaries)996 if pkg_rel_entry.status == 'vulnerable':
953 else:997 test_note = f"Does the '{package.name}' package exist?"
954 object = None998 elif pkg_rel_entry.status == 'fixed':
955 var = None999 test_note = f"Does the '{package.name}' package exist and is the version less than '{pkg_rel_entry.fixed_version}'?"
956 return test, object, var1000 state_note = f"The package version is less than '{pkg_rel_entry.fixed_version}'"
9571001
958 def _generate_fixed_elements(self, package, pkg_rel_entry, obj_id=None):1002 state = self._generate_state_element(state_note, self.definition_id, pkg_rel_entry.fixed_version)
959 binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary'1003 create_state = True
960 test_note = f"Does the '{package.name}' package exist and is the version less than '{pkg_rel_entry.fixed_version}'?"
961 object_note = f"The '{package.name}' package {binary_keyword}"
962 state_note = f"The package version is less than '{pkg_rel_entry.fixed_version}'"
9631004
964 test = self._generate_test_element(test_note, self.definition_id, True, 'pkg', obj_id=obj_id)1005 if not obj_id or create_state:
965 if not obj_id:1006 obj_id = None
966 object = self._generate_object_object(object_note, self.definition_id, self.definition_id)
9671007
968 binaries = package.binaries1008 var = self._generate_var_element(object_note, self.definition_id, binaries)
969 if self.oval_format == 'oci':
970 if is_kernel_binaries(package.binaries):
971 regex = process_kernel_binaries(package.binaries, 'oci')
972 binaries = [f'^{regex}(?::\w+|)\s+(.*)$']
973 else:
974 variable_values = '(?::\w+|)\s+(.*)$'
9751009
976 binaries = []1010 obj = self._generate_object_element(object_note, self.definition_id, self.definition_id)
977 for binary in package.binaries:
978 binaries.append(f'^{binary}{variable_values}')
9791011
980 var = self._generate_var_object(object_note, self.definition_id, binaries)1012 test = self._generate_test_element(test_note, self.definition_id, create_state, 'pkg', obj_id=obj_id)
981 else:
982 object = None
983 var = None
984 state = self._generate_state_object(state_note, self.definition_id, pkg_rel_entry.fixed_version)
9851013
986 return test, object, var, state1014 return test, obj, var, state
9871015
988 def _populate_pkg(self, package, root_element):1016 def _populate_pkg(self, package, root_element):
989 tests = root_element.find("tests")1017 tests = root_element.find("tests")
@@ -993,7 +1021,7 @@ class OvalGeneratorPkg(OvalGenerator):
9931021
994 # Add package definition1022 # Add package definition
995 definitions = root_element.find("definitions")1023 definitions = root_element.find("definitions")
996 definition_element = self._generate_definition_object(package)1024 definition_element = self._generate_definition_element(package)
9971025
998 # Control/cache variables1026 # Control/cache variables
999 one_time_added_id = None1027 one_time_added_id = None
@@ -1001,44 +1029,41 @@ class OvalGeneratorPkg(OvalGenerator):
1001 binaries_id = None1029 binaries_id = None
1002 cve_added = False1030 cve_added = False
10031031
1032 #criteria = None
1033 #if len(package.binaries) > 1:
1034 # criteria = self._generate_subcriteria('AND')
1035
1004 for cve in package.cves:1036 for cve in package.cves:
1005 pkg_rel_entry = cve.pkg_rel_entries[package.name]1037 pkg_rel_entry = cve.pkg_rel_entries[package.name]
1006 if pkg_rel_entry.status == 'vulnerable':1038 for key in sorted(list(package.binaries)):
1007 cve_added = True1039 binaries = package.binaries[key]
1008 if one_time_added_id:1040 if pkg_rel_entry.fixed_version:
1041 if pkg_rel_entry.fixed_version in fixed_versions:
1042 self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, definition_element)
1043 continue
1044 else:
1045 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
1046 fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id
1047 elif one_time_added_id:
1009 self._add_criterion(one_time_added_id, pkg_rel_entry, cve, definition_element)1048 self._add_criterion(one_time_added_id, pkg_rel_entry, cve, definition_element)
1049 continue
1010 else:1050 else:
1011 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)1051 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
1012
1013 test, object, var = self._generate_vulnerable_elements(package, binaries_id)
1014 tests.append(test)
1015
1016 if not binaries_id:
1017 objects.append(object)
1018 variables.append(var)
1019 binaries_id = self.definition_id
1020
1021 one_time_added_id = self.definition_id1052 one_time_added_id = self.definition_id
1022 self._increase_id(is_definition=False)
1023 elif pkg_rel_entry.status == 'fixed':
1024 cve_added = True
10251053
1026 if pkg_rel_entry.fixed_version in fixed_versions:1054 test, obj, var, state = self._generate_elements(package, binaries, pkg_rel_entry, binaries_id)
1027 self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, definition_element)
1028 else:
1029 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
10301055
1031 test, object, var, state = self._generate_fixed_elements(package, pkg_rel_entry, binaries_id)1056 if state:
1032 tests.append(test)
1033 states.append(state)1057 states.append(state)
10341058
1035 if not binaries_id:1059 if obj and var:
1036 objects.append(object)1060 binaries_id = self.definition_id
1037 variables.append(var)1061 variables.append(var)
1038 binaries_id = self.definition_id1062 objects.append(obj)
10391063
1040 fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id1064 tests.append(test)
1041 self._increase_id(is_definition=False)1065 self._increase_id(is_definition=False)
1066 cve_added = True
10421067
1043 if cve_added:1068 if cve_added:
1044 definitions.append(definition_element)1069 definitions.append(definition_element)
@@ -1048,7 +1073,7 @@ class OvalGeneratorPkg(OvalGenerator):
1048 def _populate_kernel_pkg(self, package, root_element, running_kernel_id):1073 def _populate_kernel_pkg(self, package, root_element, running_kernel_id):
1049 # Add package definition1074 # Add package definition
1050 definitions = root_element.find("definitions")1075 definitions = root_element.find("definitions")
1051 definition_element = self._generate_definition_object(package)1076 definition_element = self._generate_definition_element(package)
10521077
1053 # Control/cache variables1078 # Control/cache variables
1054 fixed_versions = {}1079 fixed_versions = {}
@@ -1056,7 +1081,7 @@ class OvalGeneratorPkg(OvalGenerator):
10561081
1057 # Generate one-time elements1082 # Generate one-time elements
1058 kernel_criterion = self._generate_kernel_package_elements(package, root_element, running_kernel_id)1083 kernel_criterion = self._generate_kernel_package_elements(package, root_element, running_kernel_id)
1059 criteria = self._generate_criteria_kernel('OR')1084 criteria = self._generate_subcriteria('OR')
10601085
1061 self._add_to_criteria(definition_element, kernel_criterion, operator='AND')1086 self._add_to_criteria(definition_element, kernel_criterion, operator='AND')
1062 self._add_to_criteria(definition_element, criteria, operator='AND')1087 self._add_to_criteria(definition_element, criteria, operator='AND')
@@ -1075,16 +1100,13 @@ class OvalGeneratorPkg(OvalGenerator):
10751100
1076 def _add_new_package(self, package_name, cve, release, cve_data, packages) -> None:1101 def _add_new_package(self, package_name, cve, release, cve_data, packages) -> None:
1077 if package_name not in packages:1102 if package_name not in packages:
1078 binaries = self.pkg_cache.get_binarypkgs(package_name, release)1103 version, binaries = get_binarypkgs(self.pkg_cache, package_name, release)
1079 version = ''
1080 if binaries:
1081 version = self.pkg_cache.pkgcache[package_name]['Releases'][release]['source_version']
10821104
1083 pkg_obj = Package(package_name, release, binaries, version)1105 pkg_obj = Package(package_name, release, binaries, version)
1084 packages[package_name] = pkg_obj1106 packages[package_name] = pkg_obj
10851107
1086 pkg_obj = packages[package_name]1108 pkg_obj = packages[package_name]
1087 cve_pkg_entry = CVEPkgRelEntry(pkg_obj, release, cve, cve_data['pkgs'][package_name][release][0], cve_data['pkgs'][package_name][release][1])1109 cve_pkg_entry = CVEPkgRelEntry(pkg_obj, release, cve, cve_data['pkgs'][package_name][release][0], cve_data['pkgs'][package_name][release][1], self.pkg_cache)
10881110
1089 if cve_pkg_entry.status != 'fixed' and self.fixed_only:1111 if cve_pkg_entry.status != 'fixed' and self.fixed_only:
1090 return1112 return
@@ -1223,14 +1245,13 @@ class OvalGeneratorCVE:
1223 test_refs = []1245 test_refs = []
1224 packages = cve['packages']1246 packages = cve['packages']
1225 for package in sorted(packages.keys()):1247 for package in sorted(packages.keys()):
1226 releases = packages[package]['Releases']1248 if self.release in packages[package]['Releases']:
1227 for release in sorted(releases.keys()):1249 release_status = packages[package]['Releases'][self.release]
1228 if release == self.release:1250 if 'bin-pkgs' in release_status and release_status['bin-pkgs']:
1229 release_status = releases[release]1251 for key in sorted(list(release_status['bin-pkgs'])):
1230 if 'bin-pkgs' in release_status and release_status['bin-pkgs']:
1231 pkg = {1252 pkg = {
1232 'name': package,1253 'name': package,
1233 'binaries': release_status['bin-pkgs'],1254 'binaries': release_status['bin-pkgs'][key],
1234 'status': release_status['status'],1255 'status': release_status['status'],
1235 'note': release_status['note'],1256 'note': release_status['note'],
1236 'fix-version': release_status['fix-version'] if 'fix-version' in release_status else '',1257 'fix-version': release_status['fix-version'] if 'fix-version' in release_status else '',
@@ -1251,9 +1272,11 @@ class OvalGeneratorCVE:
1251 if 'parent' in release_status:1272 if 'parent' in release_status:
1252 product_description = cve_lib.get_subproject_description(release_status['parent'])1273 product_description = cve_lib.get_subproject_description(release_status['parent'])
1253 else:1274 else:
1254 product_description = cve_lib.get_subproject_description(release)1275 product_description = cve_lib.get_subproject_description(self.release)
1255 instruction = prepare_instructions(instruction, header['Candidate'], product_description, pkg)1276 instruction = prepare_instructions(instruction, header['Candidate'], product_description, pkg)
12561277
1278
1279
1257 # if no packages for this release, then we're done1280 # if no packages for this release, then we're done
1258 if not len(test_refs):1281 if not len(test_refs):
1259 return False1282 return False

Subscribers

People subscribed via source and target branches