Merge ~ebarretto/ubuntu-cve-tracker:new-pkg-cache into ubuntu-cve-tracker:master

Proposed by Eduardo Barretto
Status: Merged
Merged at revision: a58b614ef83e3253e2d256f8fb6dc43fe25fc902
Proposed branch: ~ebarretto/ubuntu-cve-tracker:new-pkg-cache
Merge into: ubuntu-cve-tracker:master
Diff against target: 798 lines (+183/-312)
2 files modified
scripts/generate-oval (+46/-198)
scripts/oval_lib.py (+137/-114)
Reviewer Review Type Date Requested Status
David Fernandez Gonzalez Approve
Review via email: mp+449457@code.launchpad.net

Description of the change

This PR moves the OVAL generation for CVE and PKG based OVALs from the old cache mechanism to the new.
The new cache mechanism fixes a few of the issues we currently have, such as:
- long time to generate OVAL data
- situations where source package produces binaries with different versions, e.g. libreoffice.

It also fixes an epoch issue in the PKG-based OVAL and tries to simplify some of the PKG-based OVAL generation.

To post a comment you must log in.
Revision history for this message
David Fernandez Gonzalez (litios) wrote :

LGTM, thanks!

All the tests seem to generate the "same output" and tests pass.
A great increase in generation speed!

review: Approve

Preview Diff

[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1diff --git a/scripts/generate-oval b/scripts/generate-oval
2index 110d48e..6a80945 100755
3--- a/scripts/generate-oval
4+++ b/scripts/generate-oval
5@@ -31,15 +31,11 @@ import json
6 import os
7 import re
8 import sys
9-#from launchpadlib.launchpad import Launchpad
10
11 import apt_pkg
12-from cve_lib import (kernel_srcs, product_series, load_cve, PRODUCT_UBUNTU, all_releases, eol_releases, devel_release, release_parent, release_name, release_ppa, release_progenitor, needs_oval)
13+from cve_lib import (kernel_srcs, product_series, load_cve, PRODUCT_UBUNTU, all_releases, eol_releases, devel_release, release_parent, release_name, release_progenitor, needs_oval)
14 from kernel_lib import meta_kernels
15 import oval_lib
16-import functools
17-import lpl_common
18-import tempfile
19
20 # cope with apt_pkg api changes.
21 if 'init_system' in dir(apt_pkg):
22@@ -57,18 +53,15 @@ for r in set(all_releases).difference(set(eol_releases)).difference(set([devel_r
23
24 default_cves_to_process = ['active/CVE-*', 'retired/CVE-*']
25
26-packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-")
27-
28 debug_level = 0
29
30 package_cache = None
31
32-cve_cache = {}
33-
34 def main():
35 """ parse command line options and iterate through files to be processed
36 """
37 global debug_level
38+ global supported_releases
39
40 # parse command line options
41 parser = argparse.ArgumentParser(description='Generate CVE OVAL from '
42@@ -90,10 +83,8 @@ def main():
43 '(default is ./)')
44 parser.add_argument('--no-progress', action='store_true',
45 help='do not show progress meter')
46- parser.add_argument('--pkg-cache', action='store', default="pkg_cache.json",
47+ parser.add_argument('--pkg-cache-dir', action='store', default='./',
48 help='cache location for binary packages')
49- parser.add_argument('--force-cache-reload', action='store_true',
50- help='force reload of cache file')
51 parser.add_argument('-d', '--debug', action='count', default=0,
52 help="report debugging information")
53 parser.add_argument('--usn-oval', action='store_true',
54@@ -149,25 +140,27 @@ def main():
55
56 return
57
58- cve_cache = {}
59- cache = PackageCache(args.pkg_cache, args.force_cache_reload)
60+ releases = [args.oval_release] if args.oval_release else supported_releases
61+ supported_releases = releases
62+ cache = {}
63+ for release in releases:
64+ cve_cache = {}
65+ cache.update({release: get_package_cache(args.pkg_cache_dir, release)})
66
67 if args.pkg_oval:
68- releases = [args.oval_release] if args.oval_release else supported_releases
69- for release in releases:
70- if args.oci:
71- generate_oval_package(release, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ociprefix, ocioutdir)
72- else:
73- generate_oval_package(release, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only)
74+ if args.oci:
75+ generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ociprefix, ocioutdir)
76+ else:
77+ generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only)
78 return
79
80 if args.oci:
81- generate_oval_cve(args.output_dir, args.cve_prefix_dir, cache, args.oci,
82- args.no_progress, args.packages, pathnames, ociprefix, ocioutdir)
83+ generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci,
84+ args.no_progress, args.packages, pathnames, ociprefix, ocioutdir)
85 else:
86- generate_oval_cve(args.output_dir, args.cve_prefix_dir, cache, args.oci,
87- args.no_progress, args.packages, pathnames)
88-
89+ generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci,
90+ args.no_progress, args.packages, pathnames)
91+ return
92
93
94 # given a status generated by parse_package_status(), duplicate it for a
95@@ -182,10 +175,8 @@ def duplicate_package_status(release, package, original_status, cache, override_
96 elif 'fix-version' in original_status:
97 copied_status['fix-version'] = original_status['fix-version']
98
99- if 'fix-version' in copied_status and copied_status['fix-version'].isdigit():
100- copied_status['bin-pkgs'] = cache.get_binarypkgs(package, release, version=copied_status['fix-version'])
101- else:
102- copied_status['bin-pkgs'] = cache.get_binarypkgs(package, release)
103+ if 'bin-pkgs' in original_status:
104+ copied_status['bin-pkgs'] = original_status['bin-pkgs']
105
106 return copied_status
107
108@@ -355,158 +346,16 @@ def prepend_usn_to_id(usn_database, usn_id):
109 if re.search(r'^[0-9]+-[0-9]$', usn_id):
110 usn_database[usn_id]['id'] = 'USN-' + usn_id
111
112-# Class to contain the binary package cache
113-class PackageCache():
114-
115- cachefile = None
116- cache_updates = 0
117- releases = dict()
118- unpublished_sources = dict()
119- packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-")
120-
121- def __init__(self, cachefile='data_file.json', force_reload=False):
122- self.cachefile = cachefile
123- self.force_reload = force_reload
124-
125- # open the local cache if it exists
126- if os.path.exists(self.cachefile):
127- debug('Opening and reading cache file %s' % self.cachefile)
128- with open(self.cachefile, "r") as read_file:
129- self.pkgcache = json.load(read_file)
130- else:
131- self.pkgcache = dict()
132- self.force_reload = True
133-
134- # Get launchpad handlers...
135- debug('Setting up launchpad connection...')
136- self.lp = lpl_common.connect(version='devel')
137- #lp = Launchpad.login_anonymously("generate-oval", "production", version='devel')
138- self.ubuntu = self.lp.distributions['ubuntu']
139- self.archive = self.ubuntu.main_archive
140-
141- # if debugging is enabled, flush cache every update
142- self.cache_write_frequency = 1 if debug_level > 0 else 100
143-
144- def write_cache(self):
145- debug('Writing cache file %s' % self.cachefile)
146- if self.cachefile:
147- # create as separate file and try to do atomic rename so
148- # multiple writers don't corrupt cache.
149- with tempfile.NamedTemporaryFile(mode='w', prefix='oval_pkg_cache-', suffix='.new', dir=os.path.dirname(self.cachefile), delete=False) as write_file:
150- new_cachefile = write_file.name
151- json.dump(self.pkgcache, write_file, indent=2)
152-
153- os.rename(new_cachefile, self.cachefile)
154-
155- def _has_no_published_source(self, package, release):
156- return (package in self.unpublished_sources
157- and release in self.unpublished_sources[package])
158-
159- def _add_no_published_source(self, package, release):
160- if package not in self.unpublished_sources:
161- self.unpublished_sources[package] = [release]
162- else:
163- self.unpublished_sources[package].append(release)
164-
165- # lookup source package in launchpad, get latest version
166- def _lookup_latest_source_package(self, source_name, release):
167-
168- # cache lp release info
169- if release not in self.releases:
170- # we can have nested parent releases
171- parent = release_parent(release)
172- while release_parent(parent):
173- parent = release_parent(parent)
174- if parent:
175- self.releases[release] = self.ubuntu.getSeries(name_or_version=parent)
176- else:
177- self.releases[release] = self.ubuntu.getSeries(name_or_version=product_series(release)[1])
178-
179- ppa = release_ppa(release)
180- if ppa:
181- archive, group, ppa_full_name = lpl_common.get_archive(ppa, self.lp, False, distribution=self.ubuntu)
182- sources = archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published')
183- if len(sources) == 0:
184- # if release is subproject and no version of package was released to it
185- # then copy source from parent release
186- sources = self.archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published')
187- else:
188- sources = self.archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published')
189-
190- # some kernels get statuses even when not published in the
191- # archive yet
192- if len(sources) == 0:
193- self._add_no_published_source(source_name, release)
194- return None
195-
196- # in python3, filter returns an iterable object, so wrap in list()
197- sources = list(filter(lambda x: x.pocket in ['Release', 'Security', 'Updates'], sources))
198- # some packages are only in proposed, even for non-devel releases
199- if len(sources) == 0:
200- self._add_no_published_source(source_name, release)
201- return None
202-
203- source = sorted(sources,
204- key=functools.cmp_to_key(lambda x, y: apt_pkg.version_compare(x.source_package_version, y.source_package_version)),
205- reverse=True)[0]
206- debug('Launchpad returned %s %s' % (source.source_package_name, source.source_package_version))
207- return source
208-
209- def get_binarypkgs(self, pname, release, version=None):
210- """ return a list of binary packages from the source package """
211-
212- # first check local cache
213- #
214- # if the version in the cve tracker is newer than the
215- # version in the cache, we should refresh the cache
216- # if there's no version in the tracker, return the cached entry
217- if pname in self.pkgcache:
218- if (release in self.pkgcache[pname]['Releases'] and
219- (not version or
220- apt_pkg.version_compare(version, self.pkgcache[pname]['Releases'][release].get('source_version', 0)) <= 0)):
221- return self.pkgcache[pname]['Releases'][release]['binaries']
222-
223- debug('Cache miss: %s %s %s' % (pname, release, version))
224-
225- # skip lookup if unpublished_sources is empty and
226- # force_reload is False
227- if not self.unpublished_sources and not self.force_reload:
228- if pname not in self.pkgcache or release not in self.pkgcache[pname]['Releases']:
229- return None
230-
231- # skip lookup if we've already done a lookup and found no
232- # published source for that release
233- if self._has_no_published_source(pname, release):
234- return None
235-
236- # query launchpad if not in local cache
237- source = self._lookup_latest_source_package(pname, release)
238- if not source:
239- return None
240-
241- binaries = source.getPublishedBinaries()
242- binlist = []
243- for i in binaries:
244- # skip if we already saw this package
245- if (i.binary_package_name in binlist):
246- continue
247- # for kernel we only want linux images
248- if pname.startswith('linux') and not (i.binary_package_name).startswith('linux-image-'):
249- continue
250- # skip ignored packages, with exception of golang*-dev pkgs
251- if (i.binary_package_name).startswith(('golang-go')) or not any(s in i.binary_package_name for s in self.packages_to_ignore):
252- binlist.append(i.binary_package_name)
253-
254- # save current pkgcache to local cache
255- if pname not in self.pkgcache:
256- self.pkgcache[pname] = {'Releases': {}}
257- self.pkgcache[pname]['Releases'][release] = {'binaries': binlist, 'source_version': source.source_package_version}
258-
259- self.cache_updates += 1
260- if self.cache_updates % self.cache_write_frequency == 0:
261- self.write_cache()
262+# loads cve package cache <release>-pkg-cache.json based given path to it.
263+# To get the cache proceed as: $UCT/scripts/fetch-db <release>-pkg-cache.json pkg-cache
264+def get_package_cache(pkg_cache_dir, release):
265+ data = {}
266+ for filename in glob.glob(os.path.join(pkg_cache_dir, release.replace('/', '_') + '-pkg-cache.json')):
267+ debug(f"Opening and reading cache file {filename}")
268+ with open(filename, 'r') as f:
269+ data = json.load(f)
270
271- return binlist
272+ return data
273
274 # loads usn database.json based given a path to it.
275 # To get the database proceed as: $UCT/scripts/fetch-db database.json.bz2
276@@ -581,31 +430,31 @@ def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=N
277
278 return True
279
280-def generate_oval_package(release, outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ociprefix='', ocioutdir=None):
281- if not no_progress:
282- print(f'[*] Generating OVAL for packages in release {release}')
283- ov = oval_lib.OvalGeneratorPkg(release, release_name(release), pathnames, packages, not no_progress,pkg_cache=pkg_cache, fixed_only=fixed_only, cve_cache=cve_cache, oval_format='oci' if oci else 'dpkg', outdir=outdir, cve_prefix_dir=cve_prefix_dir, prefix=ociprefix)
284- ov.generate_oval()
285-
286- if oci:
287- ov.oval_format = 'dpkg'
288+def generate_oval_package(outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ociprefix='', ocioutdir=None):
289+ for release in supported_releases:
290+ if not no_progress:
291+ print(f'[*] Generating OVAL for packages in release {release}')
292+ ov = oval_lib.OvalGeneratorPkg(release, release_name(release), pathnames, packages, not no_progress, pkg_cache=pkg_cache, fixed_only=fixed_only, cve_cache=cve_cache, oval_format='oci' if oci else 'dpkg', outdir=outdir, cve_prefix_dir=cve_prefix_dir, prefix=ociprefix)
293 ov.generate_oval()
294
295- pkg_cache.write_cache()
296- if not no_progress:
297- print(f'[X] Done generating OVAL for packages in release {release}')
298+ if oci:
299+ ov.oval_format = 'dpkg'
300+ ov.generate_oval()
301+
302+ if not no_progress:
303+ print(f'[X] Done generating OVAL for packages in release {release}')
304
305 def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages, pathnames, ociprefix=None, ocioutdir=None):
306 ovals = dict()
307- for i in supported_releases:
308+ for release in supported_releases:
309 # we can have nested parent releases
310- parent = release_progenitor(i)
311- index = '{0}_dpkg'.format(i)
312- ovals[index] = oval_lib.OvalGeneratorCVE(i, release_name(i), parent, warn, outdir, prefix='', oval_format='dpkg')
313+ parent = release_progenitor(release)
314+ index = '{0}_dpkg'.format(release)
315+ ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, outdir, prefix='', oval_format='dpkg')
316 ovals[index].add_release_applicability_definition()
317 if oci:
318- index = '{0}_oci'.format(i)
319- ovals[index] = oval_lib.OvalGeneratorCVE(i, release_name(i), parent, warn, ocioutdir, prefix=ociprefix, oval_format='oci')
320+ index = '{0}_oci'.format(release)
321+ ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, ocioutdir, prefix=ociprefix, oval_format='oci')
322 ovals[index].add_release_applicability_definition()
323
324 # loop through all CVE data files
325@@ -636,6 +485,5 @@ def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages,
326 for i in ovals:
327 ovals[i].write_to_file()
328
329- cache.write_cache()
330 if __name__ == '__main__':
331 main()
332diff --git a/scripts/oval_lib.py b/scripts/oval_lib.py
333index 20f452e..26908f9 100644
334--- a/scripts/oval_lib.py
335+++ b/scripts/oval_lib.py
336@@ -19,6 +19,7 @@
337 from __future__ import unicode_literals
338
339 from datetime import datetime, timezone
340+import apt_pkg
341 import io
342 import os
343 import random
344@@ -142,6 +143,49 @@ def generate_cve_tag(cve):
345 cve_ref += '>{0}</cve>'.format(cve['Candidate'])
346 return cve_ref
347
348+def get_latest_version(versions):
349+ latest = None
350+ for version in versions:
351+ if not latest:
352+ latest = version
353+ continue
354+ elif apt_pkg.version_compare(version, latest) > 0:
355+ latest = version
356+
357+ return latest
358+
359+def get_binarypkgs(cache, source_name, release, version=None):
360+ """ return a list of binary packages from the source package version """
361+ packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-")
362+ version_map = collections.defaultdict(list)
363+ rel = release
364+ cache_version = version
365+
366+ if source_name not in cache[release]:
367+ # if a source package does not exist in such a release
368+ # return None
369+ return (None, None)
370+ elif version and version not in cache[rel][source_name]:
371+ # if version is not in release, then fetch latest source version
372+ cache_version = get_latest_version(list(cache[rel][source_name].keys()))
373+ elif not version:
374+ # if no version is provided, get latest source version
375+ version = get_latest_version(list(cache[rel][source_name].keys()))
376+ cache_version = version
377+ else:
378+ cache_version = version
379+
380+ for binary, bin_data in cache[rel][source_name][cache_version]['binaries'].items():
381+ # for kernel we only want linux images
382+ if source_name.startswith('linux') and not binary.startswith('linux-image-'):
383+ continue
384+ # skip ignored packages, with exception of golang*-dev pkgs
385+ if binary.startswith(('golang-go')) or \
386+ not any(s in binary for s in packages_to_ignore):
387+ version_map[bin_data['version']].append(binary)
388+
389+ return (version, version_map)
390+
391 class OvalGenerator:
392 supported_oval_elements = ('definition', 'test', 'object', 'state', 'variable')
393 generator_version = '1.1'
394@@ -340,13 +384,13 @@ class OvalGenerator:
395 return family_state, state
396
397 class CVEPkgRelEntry:
398- def __init__(self, pkg, release, cve, status, note) -> None:
399+ def __init__(self, pkg, release, cve, status, note, cache) -> None:
400 self.pkg = pkg
401 self.cve = cve
402 self.orig_status = status
403 self.orig_note = note
404 self.release = release
405- cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, None)
406+ cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, cache)
407
408 self.note = cve_info['note']
409 self.status = cve_info['status']
410@@ -369,16 +413,13 @@ class CVEPkgRelEntry:
411 code = status_text.lower()
412 detail = note.strip('()') if note else None
413 status = {}
414- fix_version = ""
415+ fix_version = None
416
417 if detail and detail[0].isdigit() and len(detail.split(' ')) == 1:
418 fix_version = detail
419
420 if cache and code != 'dne':
421- if fix_version and code in ['released', 'not-affected']:
422- status['bin-pkgs'] = cache.get_binarypkgs(package, release, version=fix_version)
423- else:
424- status['bin-pkgs'] = cache.get_binarypkgs(package, release)
425+ status['source-version'], status['bin-pkgs'] = get_binarypkgs(cache, package, release, version=fix_version)
426
427 note_end = " (note: '{0}').".format(detail) if detail else '.'
428 if code == 'dne':
429@@ -536,7 +577,7 @@ class OvalGeneratorPkg(OvalGenerator):
430 for cve in package.cves:
431 if self.fixed_only and cve.pkg_rel_entries[package.name].status != 'fixed':
432 continue
433- cve_obj = self._generate_cve_object(cve)
434+ cve_obj = self._generate_cve_tag(cve)
435 advisory.append(cve_obj)
436
437 rights.text = f"Copyright (C) {datetime.now().year} Canonical Ltd."
438@@ -573,6 +614,19 @@ class OvalGeneratorPkg(OvalGenerator):
439
440 return criteria
441
442+ def _generate_subcriteria(self, operator) -> etree.Element:
443+ return etree.Element("criteria", attrib={
444+ "operator": operator
445+ })
446+
447+ def _generate_criterion_element(self, comment, id) -> etree.Element:
448+ criterion = etree.Element("criterion", attrib={
449+ "test_ref": f"{self.ns}:tst:{id}",
450+ "comment": comment
451+ })
452+
453+ return criterion
454+
455 # Element generators
456 def _generate_reference(self, package) -> etree.Element:
457 reference = etree.Element("reference", attrib={
458@@ -583,7 +637,7 @@ class OvalGeneratorPkg(OvalGenerator):
459
460 return reference
461
462- def _generate_definition_object(self, package) -> None:
463+ def _generate_definition_element(self, package) -> None:
464 id = f"{self.ns}:def:{self.definition_id}"
465 definition = etree.Element("definition")
466 definition.set("class", "vulnerability")
467@@ -594,9 +648,10 @@ class OvalGeneratorPkg(OvalGenerator):
468 criteria = self._generate_criteria()
469 definition.append(metadata)
470 definition.append(criteria)
471+
472 return definition
473
474- def _generate_cve_object(self, cve: CVE) -> etree.Element:
475+ def _generate_cve_tag(self, cve: CVE) -> etree.Element:
476 cve_tag = etree.Element("cve",
477 attrib={
478 'href' : f"https://ubuntu.com/security/{cve.number}",
479@@ -612,7 +667,8 @@ class OvalGeneratorPkg(OvalGenerator):
480 cve_tag.set('usns', ','.join(cve.usns))
481
482 return cve_tag
483- def _generate_var_object(self, comment, id, binaries) -> etree.Element:
484+
485+ def _generate_var_element(self, comment, id, binaries) -> etree.Element:
486 var = etree.Element("constant_variable",
487 attrib={
488 'id' : f"{self.ns}:var:{id}",
489@@ -627,7 +683,7 @@ class OvalGeneratorPkg(OvalGenerator):
490
491 return var
492
493- def _generate_object_object(self, comment, id, var_id) -> etree.Element:
494+ def _generate_object_element(self, comment, id, var_id) -> etree.Element:
495 if self.oval_format == 'dpkg':
496 object = etree.Element("linux-def:dpkginfo_object",
497 attrib={
498@@ -662,6 +718,7 @@ class OvalGeneratorPkg(OvalGenerator):
499 path.text = '.'
500 filename.text = 'manifest'
501 instance.text = '1'
502+
503 return object
504
505 def _generate_test_element(self, comment, id, create_state, type, obj_id = None, state_id=None) -> etree.Element:
506@@ -694,7 +751,10 @@ class OvalGeneratorPkg(OvalGenerator):
507
508 return test
509
510- def _generate_state_object(self, comment, id, version) -> None:
511+ def _generate_state_element(self, comment, id, version) -> None:
512+ if version.find(':') == -1:
513+ version = f"0:{version}"
514+
515 if self.oval_format == 'dpkg':
516 object = etree.Element("linux-def:dpkginfo_state",
517 attrib={
518@@ -708,7 +768,7 @@ class OvalGeneratorPkg(OvalGenerator):
519 "operation": "less than"
520 })
521
522- version_check.text = f"0:{version}"
523+ version_check.text = f"{version}"
524 elif self.oval_format == 'oci':
525 object = etree.Element("ind-def:textfilecontent54_state",
526 attrib={
527@@ -722,20 +782,12 @@ class OvalGeneratorPkg(OvalGenerator):
528 "operation": "less than"
529 })
530
531- version_check.text = f"0:{version}"
532+ version_check.text = f"{version}"
533 else:
534 ValueError(f"Format not {self.oval_format} not supported")
535
536 return object
537
538- def _generate_criterion_element(self, comment, id) -> etree.Element:
539- criterion = etree.Element("criterion", attrib={
540- "test_ref": f"{self.ns}:tst:{id}",
541- "comment": comment
542- })
543-
544- return criterion
545-
546 # Running kernel element generators
547 def _add_running_kernel_checks(self, root_element):
548 objects = root_element.find("objects")
549@@ -813,11 +865,6 @@ class OvalGeneratorPkg(OvalGenerator):
550 return test
551
552 # Kernel elements generators
553- def _generate_criteria_kernel(self, operator) -> etree.Element:
554- return etree.Element("criteria", attrib={
555- "operator": operator
556- })
557-
558 def _generate_kernel_version_object_element(self, id, var_id) -> etree.Element:
559 object = etree.Element("ind-def:variable_object",
560 attrib={
561@@ -926,64 +973,45 @@ class OvalGeneratorPkg(OvalGenerator):
562 criterion = self._generate_criterion_element(criterion_note, id)
563 self._add_to_criteria(definition, criterion, depth)
564
565- def _generate_vulnerable_elements(self, package, obj_id=None):
566+ def _generate_elements(self, package, binaries, pkg_rel_entry, obj_id=None):
567+ create_state = False
568+ state = None
569+ var = None
570+ obj = None
571 binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary'
572- test_note = f"Does the '{package.name}' package exist?"
573 object_note = f"The '{package.name}' package {binary_keyword}"
574+ test_note = ""
575
576- test = self._generate_test_element(test_note, self.definition_id, False, 'pkg', obj_id=obj_id)
577-
578- if not obj_id:
579- object = self._generate_object_object(object_note, self.definition_id, self.definition_id)
580-
581- bins = package.binaries
582+ if self.oval_format == 'oci':
583 if is_kernel_binaries(package.binaries):
584 regex = process_kernel_binaries(package.binaries, 'oci')
585- bins = [f'{regex}']
586-
587- binaries = []
588- if self.oval_format == 'oci':
589+ binaries = [f'^{regex}(?::\w+|)\s+(.*)$']
590+ else:
591 variable_values = '(?::\w+|)\s+(.*)$'
592- for binary in bins:
593+
594+ binaries = []
595+ for binary in package.binaries:
596 binaries.append(f'^{binary}{variable_values}')
597- else:
598- binaries = bins
599
600- var = self._generate_var_object(object_note, self.definition_id, binaries)
601- else:
602- object = None
603- var = None
604- return test, object, var
605+ if pkg_rel_entry.status == 'vulnerable':
606+ test_note = f"Does the '{package.name}' package exist?"
607+ elif pkg_rel_entry.status == 'fixed':
608+ test_note = f"Does the '{package.name}' package exist and is the version less than '{pkg_rel_entry.fixed_version}'?"
609+ state_note = f"The package version is less than '{pkg_rel_entry.fixed_version}'"
610
611- def _generate_fixed_elements(self, package, pkg_rel_entry, obj_id=None):
612- binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary'
613- test_note = f"Does the '{package.name}' package exist and is the version less than '{pkg_rel_entry.fixed_version}'?"
614- object_note = f"The '{package.name}' package {binary_keyword}"
615- state_note = f"The package version is less than '{pkg_rel_entry.fixed_version}'"
616+ state = self._generate_state_element(state_note, self.definition_id, pkg_rel_entry.fixed_version)
617+ create_state = True
618
619- test = self._generate_test_element(test_note, self.definition_id, True, 'pkg', obj_id=obj_id)
620- if not obj_id:
621- object = self._generate_object_object(object_note, self.definition_id, self.definition_id)
622+ if not obj_id or create_state:
623+ obj_id = None
624
625- binaries = package.binaries
626- if self.oval_format == 'oci':
627- if is_kernel_binaries(package.binaries):
628- regex = process_kernel_binaries(package.binaries, 'oci')
629- binaries = [f'^{regex}(?::\w+|)\s+(.*)$']
630- else:
631- variable_values = '(?::\w+|)\s+(.*)$'
632+ var = self._generate_var_element(object_note, self.definition_id, binaries)
633
634- binaries = []
635- for binary in package.binaries:
636- binaries.append(f'^{binary}{variable_values}')
637+ obj = self._generate_object_element(object_note, self.definition_id, self.definition_id)
638
639- var = self._generate_var_object(object_note, self.definition_id, binaries)
640- else:
641- object = None
642- var = None
643- state = self._generate_state_object(state_note, self.definition_id, pkg_rel_entry.fixed_version)
644+ test = self._generate_test_element(test_note, self.definition_id, create_state, 'pkg', obj_id=obj_id)
645
646- return test, object, var, state
647+ return test, obj, var, state
648
649 def _populate_pkg(self, package, root_element):
650 tests = root_element.find("tests")
651@@ -993,7 +1021,7 @@ class OvalGeneratorPkg(OvalGenerator):
652
653 # Add package definition
654 definitions = root_element.find("definitions")
655- definition_element = self._generate_definition_object(package)
656+ definition_element = self._generate_definition_element(package)
657
658 # Control/cache variables
659 one_time_added_id = None
660@@ -1001,44 +1029,41 @@ class OvalGeneratorPkg(OvalGenerator):
661 binaries_id = None
662 cve_added = False
663
664+ #criteria = None
665+ #if len(package.binaries) > 1:
666+ # criteria = self._generate_subcriteria('AND')
667+
668 for cve in package.cves:
669 pkg_rel_entry = cve.pkg_rel_entries[package.name]
670- if pkg_rel_entry.status == 'vulnerable':
671- cve_added = True
672- if one_time_added_id:
673+ for key in sorted(list(package.binaries)):
674+ binaries = package.binaries[key]
675+ if pkg_rel_entry.fixed_version:
676+ if pkg_rel_entry.fixed_version in fixed_versions:
677+ self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, definition_element)
678+ continue
679+ else:
680+ self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
681+ fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id
682+ elif one_time_added_id:
683 self._add_criterion(one_time_added_id, pkg_rel_entry, cve, definition_element)
684+ continue
685 else:
686 self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
687-
688- test, object, var = self._generate_vulnerable_elements(package, binaries_id)
689- tests.append(test)
690-
691- if not binaries_id:
692- objects.append(object)
693- variables.append(var)
694- binaries_id = self.definition_id
695-
696 one_time_added_id = self.definition_id
697- self._increase_id(is_definition=False)
698- elif pkg_rel_entry.status == 'fixed':
699- cve_added = True
700
701- if pkg_rel_entry.fixed_version in fixed_versions:
702- self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, definition_element)
703- else:
704- self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element)
705+ test, obj, var, state = self._generate_elements(package, binaries, pkg_rel_entry, binaries_id)
706
707- test, object, var, state = self._generate_fixed_elements(package, pkg_rel_entry, binaries_id)
708- tests.append(test)
709+ if state:
710 states.append(state)
711
712- if not binaries_id:
713- objects.append(object)
714- variables.append(var)
715- binaries_id = self.definition_id
716+ if obj and var:
717+ binaries_id = self.definition_id
718+ variables.append(var)
719+ objects.append(obj)
720
721- fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id
722- self._increase_id(is_definition=False)
723+ tests.append(test)
724+ self._increase_id(is_definition=False)
725+ cve_added = True
726
727 if cve_added:
728 definitions.append(definition_element)
729@@ -1048,7 +1073,7 @@ class OvalGeneratorPkg(OvalGenerator):
730 def _populate_kernel_pkg(self, package, root_element, running_kernel_id):
731 # Add package definition
732 definitions = root_element.find("definitions")
733- definition_element = self._generate_definition_object(package)
734+ definition_element = self._generate_definition_element(package)
735
736 # Control/cache variables
737 fixed_versions = {}
738@@ -1056,7 +1081,7 @@ class OvalGeneratorPkg(OvalGenerator):
739
740 # Generate one-time elements
741 kernel_criterion = self._generate_kernel_package_elements(package, root_element, running_kernel_id)
742- criteria = self._generate_criteria_kernel('OR')
743+ criteria = self._generate_subcriteria('OR')
744
745 self._add_to_criteria(definition_element, kernel_criterion, operator='AND')
746 self._add_to_criteria(definition_element, criteria, operator='AND')
747@@ -1075,16 +1100,13 @@ class OvalGeneratorPkg(OvalGenerator):
748
749 def _add_new_package(self, package_name, cve, release, cve_data, packages) -> None:
750 if package_name not in packages:
751- binaries = self.pkg_cache.get_binarypkgs(package_name, release)
752- version = ''
753- if binaries:
754- version = self.pkg_cache.pkgcache[package_name]['Releases'][release]['source_version']
755+ version, binaries = get_binarypkgs(self.pkg_cache, package_name, release)
756
757 pkg_obj = Package(package_name, release, binaries, version)
758 packages[package_name] = pkg_obj
759
760 pkg_obj = packages[package_name]
761- cve_pkg_entry = CVEPkgRelEntry(pkg_obj, release, cve, cve_data['pkgs'][package_name][release][0], cve_data['pkgs'][package_name][release][1])
762+ cve_pkg_entry = CVEPkgRelEntry(pkg_obj, release, cve, cve_data['pkgs'][package_name][release][0], cve_data['pkgs'][package_name][release][1], self.pkg_cache)
763
764 if cve_pkg_entry.status != 'fixed' and self.fixed_only:
765 return
766@@ -1223,14 +1245,13 @@ class OvalGeneratorCVE:
767 test_refs = []
768 packages = cve['packages']
769 for package in sorted(packages.keys()):
770- releases = packages[package]['Releases']
771- for release in sorted(releases.keys()):
772- if release == self.release:
773- release_status = releases[release]
774- if 'bin-pkgs' in release_status and release_status['bin-pkgs']:
775+ if self.release in packages[package]['Releases']:
776+ release_status = packages[package]['Releases'][self.release]
777+ if 'bin-pkgs' in release_status and release_status['bin-pkgs']:
778+ for key in sorted(list(release_status['bin-pkgs'])):
779 pkg = {
780 'name': package,
781- 'binaries': release_status['bin-pkgs'],
782+ 'binaries': release_status['bin-pkgs'][key],
783 'status': release_status['status'],
784 'note': release_status['note'],
785 'fix-version': release_status['fix-version'] if 'fix-version' in release_status else '',
786@@ -1251,9 +1272,11 @@ class OvalGeneratorCVE:
787 if 'parent' in release_status:
788 product_description = cve_lib.get_subproject_description(release_status['parent'])
789 else:
790- product_description = cve_lib.get_subproject_description(release)
791+ product_description = cve_lib.get_subproject_description(self.release)
792 instruction = prepare_instructions(instruction, header['Candidate'], product_description, pkg)
793
794+
795+
796 # if no packages for this release, then we're done
797 if not len(test_refs):
798 return False

Subscribers

People subscribed via source and target branches