Merge ~ebarretto/ubuntu-cve-tracker:new-pkg-cache into ubuntu-cve-tracker:master
- Git
- lp:~ebarretto/ubuntu-cve-tracker
- new-pkg-cache
- Merge into master
Proposed by
Eduardo Barretto
Status: | Merged |
---|---|
Merged at revision: | a58b614ef83e3253e2d256f8fb6dc43fe25fc902 |
Proposed branch: | ~ebarretto/ubuntu-cve-tracker:new-pkg-cache |
Merge into: | ubuntu-cve-tracker:master |
Diff against target: |
798 lines (+183/-312) 2 files modified
scripts/generate-oval (+46/-198) scripts/oval_lib.py (+137/-114) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
David Fernandez Gonzalez | Approve | ||
Review via email: mp+449457@code.launchpad.net |
Commit message
Description of the change
This PR moves the generation of the CVE-based and PKG-based OVALs from the old cache mechanism to the new one.
The new cache mechanism fixes a few of the issues we currently have, such as:
- long time to generate OVAL data
- situations where a source package produces binaries with different versions, e.g. libreoffice.
It also fixes an epoch issue in the PKG-based OVAL and tries to simplify some of the PKG-based OVAL generation.
To post a comment you must log in.
Preview Diff
[H/L] Next/Prev Comment, [J/K] Next/Prev File, [N/P] Next/Prev Hunk
1 | diff --git a/scripts/generate-oval b/scripts/generate-oval |
2 | index 110d48e..6a80945 100755 |
3 | --- a/scripts/generate-oval |
4 | +++ b/scripts/generate-oval |
5 | @@ -31,15 +31,11 @@ import json |
6 | import os |
7 | import re |
8 | import sys |
9 | -#from launchpadlib.launchpad import Launchpad |
10 | |
11 | import apt_pkg |
12 | -from cve_lib import (kernel_srcs, product_series, load_cve, PRODUCT_UBUNTU, all_releases, eol_releases, devel_release, release_parent, release_name, release_ppa, release_progenitor, needs_oval) |
13 | +from cve_lib import (kernel_srcs, product_series, load_cve, PRODUCT_UBUNTU, all_releases, eol_releases, devel_release, release_parent, release_name, release_progenitor, needs_oval) |
14 | from kernel_lib import meta_kernels |
15 | import oval_lib |
16 | -import functools |
17 | -import lpl_common |
18 | -import tempfile |
19 | |
20 | # cope with apt_pkg api changes. |
21 | if 'init_system' in dir(apt_pkg): |
22 | @@ -57,18 +53,15 @@ for r in set(all_releases).difference(set(eol_releases)).difference(set([devel_r |
23 | |
24 | default_cves_to_process = ['active/CVE-*', 'retired/CVE-*'] |
25 | |
26 | -packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-") |
27 | - |
28 | debug_level = 0 |
29 | |
30 | package_cache = None |
31 | |
32 | -cve_cache = {} |
33 | - |
34 | def main(): |
35 | """ parse command line options and iterate through files to be processed |
36 | """ |
37 | global debug_level |
38 | + global supported_releases |
39 | |
40 | # parse command line options |
41 | parser = argparse.ArgumentParser(description='Generate CVE OVAL from ' |
42 | @@ -90,10 +83,8 @@ def main(): |
43 | '(default is ./)') |
44 | parser.add_argument('--no-progress', action='store_true', |
45 | help='do not show progress meter') |
46 | - parser.add_argument('--pkg-cache', action='store', default="pkg_cache.json", |
47 | + parser.add_argument('--pkg-cache-dir', action='store', default='./', |
48 | help='cache location for binary packages') |
49 | - parser.add_argument('--force-cache-reload', action='store_true', |
50 | - help='force reload of cache file') |
51 | parser.add_argument('-d', '--debug', action='count', default=0, |
52 | help="report debugging information") |
53 | parser.add_argument('--usn-oval', action='store_true', |
54 | @@ -149,25 +140,27 @@ def main(): |
55 | |
56 | return |
57 | |
58 | - cve_cache = {} |
59 | - cache = PackageCache(args.pkg_cache, args.force_cache_reload) |
60 | + releases = [args.oval_release] if args.oval_release else supported_releases |
61 | + supported_releases = releases |
62 | + cache = {} |
63 | + for release in releases: |
64 | + cve_cache = {} |
65 | + cache.update({release: get_package_cache(args.pkg_cache_dir, release)}) |
66 | |
67 | if args.pkg_oval: |
68 | - releases = [args.oval_release] if args.oval_release else supported_releases |
69 | - for release in releases: |
70 | - if args.oci: |
71 | - generate_oval_package(release, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ociprefix, ocioutdir) |
72 | - else: |
73 | - generate_oval_package(release, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only) |
74 | + if args.oci: |
75 | + generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ociprefix, ocioutdir) |
76 | + else: |
77 | + generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only) |
78 | return |
79 | |
80 | if args.oci: |
81 | - generate_oval_cve(args.output_dir, args.cve_prefix_dir, cache, args.oci, |
82 | - args.no_progress, args.packages, pathnames, ociprefix, ocioutdir) |
83 | + generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci, |
84 | + args.no_progress, args.packages, pathnames, ociprefix, ocioutdir) |
85 | else: |
86 | - generate_oval_cve(args.output_dir, args.cve_prefix_dir, cache, args.oci, |
87 | - args.no_progress, args.packages, pathnames) |
88 | - |
89 | + generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci, |
90 | + args.no_progress, args.packages, pathnames) |
91 | + return |
92 | |
93 | |
94 | # given a status generated by parse_package_status(), duplicate it for a |
95 | @@ -182,10 +175,8 @@ def duplicate_package_status(release, package, original_status, cache, override_ |
96 | elif 'fix-version' in original_status: |
97 | copied_status['fix-version'] = original_status['fix-version'] |
98 | |
99 | - if 'fix-version' in copied_status and copied_status['fix-version'].isdigit(): |
100 | - copied_status['bin-pkgs'] = cache.get_binarypkgs(package, release, version=copied_status['fix-version']) |
101 | - else: |
102 | - copied_status['bin-pkgs'] = cache.get_binarypkgs(package, release) |
103 | + if 'bin-pkgs' in original_status: |
104 | + copied_status['bin-pkgs'] = original_status['bin-pkgs'] |
105 | |
106 | return copied_status |
107 | |
108 | @@ -355,158 +346,16 @@ def prepend_usn_to_id(usn_database, usn_id): |
109 | if re.search(r'^[0-9]+-[0-9]$', usn_id): |
110 | usn_database[usn_id]['id'] = 'USN-' + usn_id |
111 | |
112 | -# Class to contain the binary package cache |
113 | -class PackageCache(): |
114 | - |
115 | - cachefile = None |
116 | - cache_updates = 0 |
117 | - releases = dict() |
118 | - unpublished_sources = dict() |
119 | - packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-") |
120 | - |
121 | - def __init__(self, cachefile='data_file.json', force_reload=False): |
122 | - self.cachefile = cachefile |
123 | - self.force_reload = force_reload |
124 | - |
125 | - # open the local cache if it exists |
126 | - if os.path.exists(self.cachefile): |
127 | - debug('Opening and reading cache file %s' % self.cachefile) |
128 | - with open(self.cachefile, "r") as read_file: |
129 | - self.pkgcache = json.load(read_file) |
130 | - else: |
131 | - self.pkgcache = dict() |
132 | - self.force_reload = True |
133 | - |
134 | - # Get launchpad handlers... |
135 | - debug('Setting up launchpad connection...') |
136 | - self.lp = lpl_common.connect(version='devel') |
137 | - #lp = Launchpad.login_anonymously("generate-oval", "production", version='devel') |
138 | - self.ubuntu = self.lp.distributions['ubuntu'] |
139 | - self.archive = self.ubuntu.main_archive |
140 | - |
141 | - # if debugging is enabled, flush cache every update |
142 | - self.cache_write_frequency = 1 if debug_level > 0 else 100 |
143 | - |
144 | - def write_cache(self): |
145 | - debug('Writing cache file %s' % self.cachefile) |
146 | - if self.cachefile: |
147 | - # create as separate file and try to do atomic rename so |
148 | - # multiple writers don't corrupt cache. |
149 | - with tempfile.NamedTemporaryFile(mode='w', prefix='oval_pkg_cache-', suffix='.new', dir=os.path.dirname(self.cachefile), delete=False) as write_file: |
150 | - new_cachefile = write_file.name |
151 | - json.dump(self.pkgcache, write_file, indent=2) |
152 | - |
153 | - os.rename(new_cachefile, self.cachefile) |
154 | - |
155 | - def _has_no_published_source(self, package, release): |
156 | - return (package in self.unpublished_sources |
157 | - and release in self.unpublished_sources[package]) |
158 | - |
159 | - def _add_no_published_source(self, package, release): |
160 | - if package not in self.unpublished_sources: |
161 | - self.unpublished_sources[package] = [release] |
162 | - else: |
163 | - self.unpublished_sources[package].append(release) |
164 | - |
165 | - # lookup source package in launchpad, get latest version |
166 | - def _lookup_latest_source_package(self, source_name, release): |
167 | - |
168 | - # cache lp release info |
169 | - if release not in self.releases: |
170 | - # we can have nested parent releases |
171 | - parent = release_parent(release) |
172 | - while release_parent(parent): |
173 | - parent = release_parent(parent) |
174 | - if parent: |
175 | - self.releases[release] = self.ubuntu.getSeries(name_or_version=parent) |
176 | - else: |
177 | - self.releases[release] = self.ubuntu.getSeries(name_or_version=product_series(release)[1]) |
178 | - |
179 | - ppa = release_ppa(release) |
180 | - if ppa: |
181 | - archive, group, ppa_full_name = lpl_common.get_archive(ppa, self.lp, False, distribution=self.ubuntu) |
182 | - sources = archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published') |
183 | - if len(sources) == 0: |
184 | - # if release is subproject and no version of package was released to it |
185 | - # then copy source from parent release |
186 | - sources = self.archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published') |
187 | - else: |
188 | - sources = self.archive.getPublishedSources(exact_match=True, source_name=source_name, distro_series=self.releases[release], status='Published') |
189 | - |
190 | - # some kernels get statuses even when not published in the |
191 | - # archive yet |
192 | - if len(sources) == 0: |
193 | - self._add_no_published_source(source_name, release) |
194 | - return None |
195 | - |
196 | - # in python3, filter returns an iterable object, so wrap in list() |
197 | - sources = list(filter(lambda x: x.pocket in ['Release', 'Security', 'Updates'], sources)) |
198 | - # some packages are only in proposed, even for non-devel releases |
199 | - if len(sources) == 0: |
200 | - self._add_no_published_source(source_name, release) |
201 | - return None |
202 | - |
203 | - source = sorted(sources, |
204 | - key=functools.cmp_to_key(lambda x, y: apt_pkg.version_compare(x.source_package_version, y.source_package_version)), |
205 | - reverse=True)[0] |
206 | - debug('Launchpad returned %s %s' % (source.source_package_name, source.source_package_version)) |
207 | - return source |
208 | - |
209 | - def get_binarypkgs(self, pname, release, version=None): |
210 | - """ return a list of binary packages from the source package """ |
211 | - |
212 | - # first check local cache |
213 | - # |
214 | - # if the version in the cve tracker is newer than the |
215 | - # version in the cache, we should refresh the cache |
216 | - # if there's no version in the tracker, return the cached entry |
217 | - if pname in self.pkgcache: |
218 | - if (release in self.pkgcache[pname]['Releases'] and |
219 | - (not version or |
220 | - apt_pkg.version_compare(version, self.pkgcache[pname]['Releases'][release].get('source_version', 0)) <= 0)): |
221 | - return self.pkgcache[pname]['Releases'][release]['binaries'] |
222 | - |
223 | - debug('Cache miss: %s %s %s' % (pname, release, version)) |
224 | - |
225 | - # skip lookup if unpublished_sources is empty and |
226 | - # force_reload is False |
227 | - if not self.unpublished_sources and not self.force_reload: |
228 | - if pname not in self.pkgcache or release not in self.pkgcache[pname]['Releases']: |
229 | - return None |
230 | - |
231 | - # skip lookup if we've already done a lookup and found no |
232 | - # published source for that release |
233 | - if self._has_no_published_source(pname, release): |
234 | - return None |
235 | - |
236 | - # query launchpad if not in local cache |
237 | - source = self._lookup_latest_source_package(pname, release) |
238 | - if not source: |
239 | - return None |
240 | - |
241 | - binaries = source.getPublishedBinaries() |
242 | - binlist = [] |
243 | - for i in binaries: |
244 | - # skip if we already saw this package |
245 | - if (i.binary_package_name in binlist): |
246 | - continue |
247 | - # for kernel we only want linux images |
248 | - if pname.startswith('linux') and not (i.binary_package_name).startswith('linux-image-'): |
249 | - continue |
250 | - # skip ignored packages, with exception of golang*-dev pkgs |
251 | - if (i.binary_package_name).startswith(('golang-go')) or not any(s in i.binary_package_name for s in self.packages_to_ignore): |
252 | - binlist.append(i.binary_package_name) |
253 | - |
254 | - # save current pkgcache to local cache |
255 | - if pname not in self.pkgcache: |
256 | - self.pkgcache[pname] = {'Releases': {}} |
257 | - self.pkgcache[pname]['Releases'][release] = {'binaries': binlist, 'source_version': source.source_package_version} |
258 | - |
259 | - self.cache_updates += 1 |
260 | - if self.cache_updates % self.cache_write_frequency == 0: |
261 | - self.write_cache() |
262 | +# loads cve package cache <release>-pkg-cache.json based given path to it. |
263 | +# To get the cache proceed as: $UCT/scripts/fetch-db <release>-pkg-cache.json pkg-cache |
264 | +def get_package_cache(pkg_cache_dir, release): |
265 | + data = {} |
266 | + for filename in glob.glob(os.path.join(pkg_cache_dir, release.replace('/', '_') + '-pkg-cache.json')): |
267 | + debug(f"Opening and reading cache file {filename}") |
268 | + with open(filename, 'r') as f: |
269 | + data = json.load(f) |
270 | |
271 | - return binlist |
272 | + return data |
273 | |
274 | # loads usn database.json based given a path to it. |
275 | # To get the database proceed as: $UCT/scripts/fetch-db database.json.bz2 |
276 | @@ -581,31 +430,31 @@ def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=N |
277 | |
278 | return True |
279 | |
280 | -def generate_oval_package(release, outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ociprefix='', ocioutdir=None): |
281 | - if not no_progress: |
282 | - print(f'[*] Generating OVAL for packages in release {release}') |
283 | - ov = oval_lib.OvalGeneratorPkg(release, release_name(release), pathnames, packages, not no_progress,pkg_cache=pkg_cache, fixed_only=fixed_only, cve_cache=cve_cache, oval_format='oci' if oci else 'dpkg', outdir=outdir, cve_prefix_dir=cve_prefix_dir, prefix=ociprefix) |
284 | - ov.generate_oval() |
285 | - |
286 | - if oci: |
287 | - ov.oval_format = 'dpkg' |
288 | +def generate_oval_package(outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ociprefix='', ocioutdir=None): |
289 | + for release in supported_releases: |
290 | + if not no_progress: |
291 | + print(f'[*] Generating OVAL for packages in release {release}') |
292 | + ov = oval_lib.OvalGeneratorPkg(release, release_name(release), pathnames, packages, not no_progress, pkg_cache=pkg_cache, fixed_only=fixed_only, cve_cache=cve_cache, oval_format='oci' if oci else 'dpkg', outdir=outdir, cve_prefix_dir=cve_prefix_dir, prefix=ociprefix) |
293 | ov.generate_oval() |
294 | |
295 | - pkg_cache.write_cache() |
296 | - if not no_progress: |
297 | - print(f'[X] Done generating OVAL for packages in release {release}') |
298 | + if oci: |
299 | + ov.oval_format = 'dpkg' |
300 | + ov.generate_oval() |
301 | + |
302 | + if not no_progress: |
303 | + print(f'[X] Done generating OVAL for packages in release {release}') |
304 | |
305 | def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages, pathnames, ociprefix=None, ocioutdir=None): |
306 | ovals = dict() |
307 | - for i in supported_releases: |
308 | + for release in supported_releases: |
309 | # we can have nested parent releases |
310 | - parent = release_progenitor(i) |
311 | - index = '{0}_dpkg'.format(i) |
312 | - ovals[index] = oval_lib.OvalGeneratorCVE(i, release_name(i), parent, warn, outdir, prefix='', oval_format='dpkg') |
313 | + parent = release_progenitor(release) |
314 | + index = '{0}_dpkg'.format(release) |
315 | + ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, outdir, prefix='', oval_format='dpkg') |
316 | ovals[index].add_release_applicability_definition() |
317 | if oci: |
318 | - index = '{0}_oci'.format(i) |
319 | - ovals[index] = oval_lib.OvalGeneratorCVE(i, release_name(i), parent, warn, ocioutdir, prefix=ociprefix, oval_format='oci') |
320 | + index = '{0}_oci'.format(release) |
321 | + ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, ocioutdir, prefix=ociprefix, oval_format='oci') |
322 | ovals[index].add_release_applicability_definition() |
323 | |
324 | # loop through all CVE data files |
325 | @@ -636,6 +485,5 @@ def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages, |
326 | for i in ovals: |
327 | ovals[i].write_to_file() |
328 | |
329 | - cache.write_cache() |
330 | if __name__ == '__main__': |
331 | main() |
332 | diff --git a/scripts/oval_lib.py b/scripts/oval_lib.py |
333 | index 20f452e..26908f9 100644 |
334 | --- a/scripts/oval_lib.py |
335 | +++ b/scripts/oval_lib.py |
336 | @@ -19,6 +19,7 @@ |
337 | from __future__ import unicode_literals |
338 | |
339 | from datetime import datetime, timezone |
340 | +import apt_pkg |
341 | import io |
342 | import os |
343 | import random |
344 | @@ -142,6 +143,49 @@ def generate_cve_tag(cve): |
345 | cve_ref += '>{0}</cve>'.format(cve['Candidate']) |
346 | return cve_ref |
347 | |
348 | +def get_latest_version(versions): |
349 | + latest = None |
350 | + for version in versions: |
351 | + if not latest: |
352 | + latest = version |
353 | + continue |
354 | + elif apt_pkg.version_compare(version, latest) > 0: |
355 | + latest = version |
356 | + |
357 | + return latest |
358 | + |
359 | +def get_binarypkgs(cache, source_name, release, version=None): |
360 | + """ return a list of binary packages from the source package version """ |
361 | + packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-") |
362 | + version_map = collections.defaultdict(list) |
363 | + rel = release |
364 | + cache_version = version |
365 | + |
366 | + if source_name not in cache[release]: |
367 | + # if a source package does not exist in such a release |
368 | + # return None |
369 | + return (None, None) |
370 | + elif version and version not in cache[rel][source_name]: |
371 | + # if version is not in release, then fetch latest source version |
372 | + cache_version = get_latest_version(list(cache[rel][source_name].keys())) |
373 | + elif not version: |
374 | + # if no version is provided, get latest source version |
375 | + version = get_latest_version(list(cache[rel][source_name].keys())) |
376 | + cache_version = version |
377 | + else: |
378 | + cache_version = version |
379 | + |
380 | + for binary, bin_data in cache[rel][source_name][cache_version]['binaries'].items(): |
381 | + # for kernel we only want linux images |
382 | + if source_name.startswith('linux') and not binary.startswith('linux-image-'): |
383 | + continue |
384 | + # skip ignored packages, with exception of golang*-dev pkgs |
385 | + if binary.startswith(('golang-go')) or \ |
386 | + not any(s in binary for s in packages_to_ignore): |
387 | + version_map[bin_data['version']].append(binary) |
388 | + |
389 | + return (version, version_map) |
390 | + |
391 | class OvalGenerator: |
392 | supported_oval_elements = ('definition', 'test', 'object', 'state', 'variable') |
393 | generator_version = '1.1' |
394 | @@ -340,13 +384,13 @@ class OvalGenerator: |
395 | return family_state, state |
396 | |
397 | class CVEPkgRelEntry: |
398 | - def __init__(self, pkg, release, cve, status, note) -> None: |
399 | + def __init__(self, pkg, release, cve, status, note, cache) -> None: |
400 | self.pkg = pkg |
401 | self.cve = cve |
402 | self.orig_status = status |
403 | self.orig_note = note |
404 | self.release = release |
405 | - cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, None) |
406 | + cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, cache) |
407 | |
408 | self.note = cve_info['note'] |
409 | self.status = cve_info['status'] |
410 | @@ -369,16 +413,13 @@ class CVEPkgRelEntry: |
411 | code = status_text.lower() |
412 | detail = note.strip('()') if note else None |
413 | status = {} |
414 | - fix_version = "" |
415 | + fix_version = None |
416 | |
417 | if detail and detail[0].isdigit() and len(detail.split(' ')) == 1: |
418 | fix_version = detail |
419 | |
420 | if cache and code != 'dne': |
421 | - if fix_version and code in ['released', 'not-affected']: |
422 | - status['bin-pkgs'] = cache.get_binarypkgs(package, release, version=fix_version) |
423 | - else: |
424 | - status['bin-pkgs'] = cache.get_binarypkgs(package, release) |
425 | + status['source-version'], status['bin-pkgs'] = get_binarypkgs(cache, package, release, version=fix_version) |
426 | |
427 | note_end = " (note: '{0}').".format(detail) if detail else '.' |
428 | if code == 'dne': |
429 | @@ -536,7 +577,7 @@ class OvalGeneratorPkg(OvalGenerator): |
430 | for cve in package.cves: |
431 | if self.fixed_only and cve.pkg_rel_entries[package.name].status != 'fixed': |
432 | continue |
433 | - cve_obj = self._generate_cve_object(cve) |
434 | + cve_obj = self._generate_cve_tag(cve) |
435 | advisory.append(cve_obj) |
436 | |
437 | rights.text = f"Copyright (C) {datetime.now().year} Canonical Ltd." |
438 | @@ -573,6 +614,19 @@ class OvalGeneratorPkg(OvalGenerator): |
439 | |
440 | return criteria |
441 | |
442 | + def _generate_subcriteria(self, operator) -> etree.Element: |
443 | + return etree.Element("criteria", attrib={ |
444 | + "operator": operator |
445 | + }) |
446 | + |
447 | + def _generate_criterion_element(self, comment, id) -> etree.Element: |
448 | + criterion = etree.Element("criterion", attrib={ |
449 | + "test_ref": f"{self.ns}:tst:{id}", |
450 | + "comment": comment |
451 | + }) |
452 | + |
453 | + return criterion |
454 | + |
455 | # Element generators |
456 | def _generate_reference(self, package) -> etree.Element: |
457 | reference = etree.Element("reference", attrib={ |
458 | @@ -583,7 +637,7 @@ class OvalGeneratorPkg(OvalGenerator): |
459 | |
460 | return reference |
461 | |
462 | - def _generate_definition_object(self, package) -> None: |
463 | + def _generate_definition_element(self, package) -> None: |
464 | id = f"{self.ns}:def:{self.definition_id}" |
465 | definition = etree.Element("definition") |
466 | definition.set("class", "vulnerability") |
467 | @@ -594,9 +648,10 @@ class OvalGeneratorPkg(OvalGenerator): |
468 | criteria = self._generate_criteria() |
469 | definition.append(metadata) |
470 | definition.append(criteria) |
471 | + |
472 | return definition |
473 | |
474 | - def _generate_cve_object(self, cve: CVE) -> etree.Element: |
475 | + def _generate_cve_tag(self, cve: CVE) -> etree.Element: |
476 | cve_tag = etree.Element("cve", |
477 | attrib={ |
478 | 'href' : f"https://ubuntu.com/security/{cve.number}", |
479 | @@ -612,7 +667,8 @@ class OvalGeneratorPkg(OvalGenerator): |
480 | cve_tag.set('usns', ','.join(cve.usns)) |
481 | |
482 | return cve_tag |
483 | - def _generate_var_object(self, comment, id, binaries) -> etree.Element: |
484 | + |
485 | + def _generate_var_element(self, comment, id, binaries) -> etree.Element: |
486 | var = etree.Element("constant_variable", |
487 | attrib={ |
488 | 'id' : f"{self.ns}:var:{id}", |
489 | @@ -627,7 +683,7 @@ class OvalGeneratorPkg(OvalGenerator): |
490 | |
491 | return var |
492 | |
493 | - def _generate_object_object(self, comment, id, var_id) -> etree.Element: |
494 | + def _generate_object_element(self, comment, id, var_id) -> etree.Element: |
495 | if self.oval_format == 'dpkg': |
496 | object = etree.Element("linux-def:dpkginfo_object", |
497 | attrib={ |
498 | @@ -662,6 +718,7 @@ class OvalGeneratorPkg(OvalGenerator): |
499 | path.text = '.' |
500 | filename.text = 'manifest' |
501 | instance.text = '1' |
502 | + |
503 | return object |
504 | |
505 | def _generate_test_element(self, comment, id, create_state, type, obj_id = None, state_id=None) -> etree.Element: |
506 | @@ -694,7 +751,10 @@ class OvalGeneratorPkg(OvalGenerator): |
507 | |
508 | return test |
509 | |
510 | - def _generate_state_object(self, comment, id, version) -> None: |
511 | + def _generate_state_element(self, comment, id, version) -> None: |
512 | + if version.find(':') == -1: |
513 | + version = f"0:{version}" |
514 | + |
515 | if self.oval_format == 'dpkg': |
516 | object = etree.Element("linux-def:dpkginfo_state", |
517 | attrib={ |
518 | @@ -708,7 +768,7 @@ class OvalGeneratorPkg(OvalGenerator): |
519 | "operation": "less than" |
520 | }) |
521 | |
522 | - version_check.text = f"0:{version}" |
523 | + version_check.text = f"{version}" |
524 | elif self.oval_format == 'oci': |
525 | object = etree.Element("ind-def:textfilecontent54_state", |
526 | attrib={ |
527 | @@ -722,20 +782,12 @@ class OvalGeneratorPkg(OvalGenerator): |
528 | "operation": "less than" |
529 | }) |
530 | |
531 | - version_check.text = f"0:{version}" |
532 | + version_check.text = f"{version}" |
533 | else: |
534 | ValueError(f"Format not {self.oval_format} not supported") |
535 | |
536 | return object |
537 | |
538 | - def _generate_criterion_element(self, comment, id) -> etree.Element: |
539 | - criterion = etree.Element("criterion", attrib={ |
540 | - "test_ref": f"{self.ns}:tst:{id}", |
541 | - "comment": comment |
542 | - }) |
543 | - |
544 | - return criterion |
545 | - |
546 | # Running kernel element generators |
547 | def _add_running_kernel_checks(self, root_element): |
548 | objects = root_element.find("objects") |
549 | @@ -813,11 +865,6 @@ class OvalGeneratorPkg(OvalGenerator): |
550 | return test |
551 | |
552 | # Kernel elements generators |
553 | - def _generate_criteria_kernel(self, operator) -> etree.Element: |
554 | - return etree.Element("criteria", attrib={ |
555 | - "operator": operator |
556 | - }) |
557 | - |
558 | def _generate_kernel_version_object_element(self, id, var_id) -> etree.Element: |
559 | object = etree.Element("ind-def:variable_object", |
560 | attrib={ |
561 | @@ -926,64 +973,45 @@ class OvalGeneratorPkg(OvalGenerator): |
562 | criterion = self._generate_criterion_element(criterion_note, id) |
563 | self._add_to_criteria(definition, criterion, depth) |
564 | |
565 | - def _generate_vulnerable_elements(self, package, obj_id=None): |
566 | + def _generate_elements(self, package, binaries, pkg_rel_entry, obj_id=None): |
567 | + create_state = False |
568 | + state = None |
569 | + var = None |
570 | + obj = None |
571 | binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary' |
572 | - test_note = f"Does the '{package.name}' package exist?" |
573 | object_note = f"The '{package.name}' package {binary_keyword}" |
574 | + test_note = "" |
575 | |
576 | - test = self._generate_test_element(test_note, self.definition_id, False, 'pkg', obj_id=obj_id) |
577 | - |
578 | - if not obj_id: |
579 | - object = self._generate_object_object(object_note, self.definition_id, self.definition_id) |
580 | - |
581 | - bins = package.binaries |
582 | + if self.oval_format == 'oci': |
583 | if is_kernel_binaries(package.binaries): |
584 | regex = process_kernel_binaries(package.binaries, 'oci') |
585 | - bins = [f'{regex}'] |
586 | - |
587 | - binaries = [] |
588 | - if self.oval_format == 'oci': |
589 | + binaries = [f'^{regex}(?::\w+|)\s+(.*)$'] |
590 | + else: |
591 | variable_values = '(?::\w+|)\s+(.*)$' |
592 | - for binary in bins: |
593 | + |
594 | + binaries = [] |
595 | + for binary in package.binaries: |
596 | binaries.append(f'^{binary}{variable_values}') |
597 | - else: |
598 | - binaries = bins |
599 | |
600 | - var = self._generate_var_object(object_note, self.definition_id, binaries) |
601 | - else: |
602 | - object = None |
603 | - var = None |
604 | - return test, object, var |
605 | + if pkg_rel_entry.status == 'vulnerable': |
606 | + test_note = f"Does the '{package.name}' package exist?" |
607 | + elif pkg_rel_entry.status == 'fixed': |
608 | + test_note = f"Does the '{package.name}' package exist and is the version less than '{pkg_rel_entry.fixed_version}'?" |
609 | + state_note = f"The package version is less than '{pkg_rel_entry.fixed_version}'" |
610 | |
611 | - def _generate_fixed_elements(self, package, pkg_rel_entry, obj_id=None): |
612 | - binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary' |
613 | - test_note = f"Does the '{package.name}' package exist and is the version less than '{pkg_rel_entry.fixed_version}'?" |
614 | - object_note = f"The '{package.name}' package {binary_keyword}" |
615 | - state_note = f"The package version is less than '{pkg_rel_entry.fixed_version}'" |
616 | + state = self._generate_state_element(state_note, self.definition_id, pkg_rel_entry.fixed_version) |
617 | + create_state = True |
618 | |
619 | - test = self._generate_test_element(test_note, self.definition_id, True, 'pkg', obj_id=obj_id) |
620 | - if not obj_id: |
621 | - object = self._generate_object_object(object_note, self.definition_id, self.definition_id) |
622 | + if not obj_id or create_state: |
623 | + obj_id = None |
624 | |
625 | - binaries = package.binaries |
626 | - if self.oval_format == 'oci': |
627 | - if is_kernel_binaries(package.binaries): |
628 | - regex = process_kernel_binaries(package.binaries, 'oci') |
629 | - binaries = [f'^{regex}(?::\w+|)\s+(.*)$'] |
630 | - else: |
631 | - variable_values = '(?::\w+|)\s+(.*)$' |
632 | + var = self._generate_var_element(object_note, self.definition_id, binaries) |
633 | |
634 | - binaries = [] |
635 | - for binary in package.binaries: |
636 | - binaries.append(f'^{binary}{variable_values}') |
637 | + obj = self._generate_object_element(object_note, self.definition_id, self.definition_id) |
638 | |
639 | - var = self._generate_var_object(object_note, self.definition_id, binaries) |
640 | - else: |
641 | - object = None |
642 | - var = None |
643 | - state = self._generate_state_object(state_note, self.definition_id, pkg_rel_entry.fixed_version) |
644 | + test = self._generate_test_element(test_note, self.definition_id, create_state, 'pkg', obj_id=obj_id) |
645 | |
646 | - return test, object, var, state |
647 | + return test, obj, var, state |
648 | |
649 | def _populate_pkg(self, package, root_element): |
650 | tests = root_element.find("tests") |
651 | @@ -993,7 +1021,7 @@ class OvalGeneratorPkg(OvalGenerator): |
652 | |
653 | # Add package definition |
654 | definitions = root_element.find("definitions") |
655 | - definition_element = self._generate_definition_object(package) |
656 | + definition_element = self._generate_definition_element(package) |
657 | |
658 | # Control/cache variables |
659 | one_time_added_id = None |
660 | @@ -1001,44 +1029,41 @@ class OvalGeneratorPkg(OvalGenerator): |
661 | binaries_id = None |
662 | cve_added = False |
663 | |
664 | + #criteria = None |
665 | + #if len(package.binaries) > 1: |
666 | + # criteria = self._generate_subcriteria('AND') |
667 | + |
668 | for cve in package.cves: |
669 | pkg_rel_entry = cve.pkg_rel_entries[package.name] |
670 | - if pkg_rel_entry.status == 'vulnerable': |
671 | - cve_added = True |
672 | - if one_time_added_id: |
673 | + for key in sorted(list(package.binaries)): |
674 | + binaries = package.binaries[key] |
675 | + if pkg_rel_entry.fixed_version: |
676 | + if pkg_rel_entry.fixed_version in fixed_versions: |
677 | + self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, definition_element) |
678 | + continue |
679 | + else: |
680 | + self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element) |
681 | + fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id |
682 | + elif one_time_added_id: |
683 | self._add_criterion(one_time_added_id, pkg_rel_entry, cve, definition_element) |
684 | + continue |
685 | else: |
686 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element) |
687 | - |
688 | - test, object, var = self._generate_vulnerable_elements(package, binaries_id) |
689 | - tests.append(test) |
690 | - |
691 | - if not binaries_id: |
692 | - objects.append(object) |
693 | - variables.append(var) |
694 | - binaries_id = self.definition_id |
695 | - |
696 | one_time_added_id = self.definition_id |
697 | - self._increase_id(is_definition=False) |
698 | - elif pkg_rel_entry.status == 'fixed': |
699 | - cve_added = True |
700 | |
701 | - if pkg_rel_entry.fixed_version in fixed_versions: |
702 | - self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, definition_element) |
703 | - else: |
704 | - self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element) |
705 | + test, obj, var, state = self._generate_elements(package, binaries, pkg_rel_entry, binaries_id) |
706 | |
707 | - test, object, var, state = self._generate_fixed_elements(package, pkg_rel_entry, binaries_id) |
708 | - tests.append(test) |
709 | + if state: |
710 | states.append(state) |
711 | |
712 | - if not binaries_id: |
713 | - objects.append(object) |
714 | - variables.append(var) |
715 | - binaries_id = self.definition_id |
716 | + if obj and var: |
717 | + binaries_id = self.definition_id |
718 | + variables.append(var) |
719 | + objects.append(obj) |
720 | |
721 | - fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id |
722 | - self._increase_id(is_definition=False) |
723 | + tests.append(test) |
724 | + self._increase_id(is_definition=False) |
725 | + cve_added = True |
726 | |
727 | if cve_added: |
728 | definitions.append(definition_element) |
729 | @@ -1048,7 +1073,7 @@ class OvalGeneratorPkg(OvalGenerator): |
730 | def _populate_kernel_pkg(self, package, root_element, running_kernel_id): |
731 | # Add package definition |
732 | definitions = root_element.find("definitions") |
733 | - definition_element = self._generate_definition_object(package) |
734 | + definition_element = self._generate_definition_element(package) |
735 | |
736 | # Control/cache variables |
737 | fixed_versions = {} |
738 | @@ -1056,7 +1081,7 @@ class OvalGeneratorPkg(OvalGenerator): |
739 | |
740 | # Generate one-time elements |
741 | kernel_criterion = self._generate_kernel_package_elements(package, root_element, running_kernel_id) |
742 | - criteria = self._generate_criteria_kernel('OR') |
743 | + criteria = self._generate_subcriteria('OR') |
744 | |
745 | self._add_to_criteria(definition_element, kernel_criterion, operator='AND') |
746 | self._add_to_criteria(definition_element, criteria, operator='AND') |
747 | @@ -1075,16 +1100,13 @@ class OvalGeneratorPkg(OvalGenerator): |
748 | |
749 | def _add_new_package(self, package_name, cve, release, cve_data, packages) -> None: |
750 | if package_name not in packages: |
751 | - binaries = self.pkg_cache.get_binarypkgs(package_name, release) |
752 | - version = '' |
753 | - if binaries: |
754 | - version = self.pkg_cache.pkgcache[package_name]['Releases'][release]['source_version'] |
755 | + version, binaries = get_binarypkgs(self.pkg_cache, package_name, release) |
756 | |
757 | pkg_obj = Package(package_name, release, binaries, version) |
758 | packages[package_name] = pkg_obj |
759 | |
760 | pkg_obj = packages[package_name] |
761 | - cve_pkg_entry = CVEPkgRelEntry(pkg_obj, release, cve, cve_data['pkgs'][package_name][release][0], cve_data['pkgs'][package_name][release][1]) |
762 | + cve_pkg_entry = CVEPkgRelEntry(pkg_obj, release, cve, cve_data['pkgs'][package_name][release][0], cve_data['pkgs'][package_name][release][1], self.pkg_cache) |
763 | |
764 | if cve_pkg_entry.status != 'fixed' and self.fixed_only: |
765 | return |
766 | @@ -1223,14 +1245,13 @@ class OvalGeneratorCVE: |
767 | test_refs = [] |
768 | packages = cve['packages'] |
769 | for package in sorted(packages.keys()): |
770 | - releases = packages[package]['Releases'] |
771 | - for release in sorted(releases.keys()): |
772 | - if release == self.release: |
773 | - release_status = releases[release] |
774 | - if 'bin-pkgs' in release_status and release_status['bin-pkgs']: |
775 | + if self.release in packages[package]['Releases']: |
776 | + release_status = packages[package]['Releases'][self.release] |
777 | + if 'bin-pkgs' in release_status and release_status['bin-pkgs']: |
778 | + for key in sorted(list(release_status['bin-pkgs'])): |
779 | pkg = { |
780 | 'name': package, |
781 | - 'binaries': release_status['bin-pkgs'], |
782 | + 'binaries': release_status['bin-pkgs'][key], |
783 | 'status': release_status['status'], |
784 | 'note': release_status['note'], |
785 | 'fix-version': release_status['fix-version'] if 'fix-version' in release_status else '', |
786 | @@ -1251,9 +1272,11 @@ class OvalGeneratorCVE: |
787 | if 'parent' in release_status: |
788 | product_description = cve_lib.get_subproject_description(release_status['parent']) |
789 | else: |
790 | - product_description = cve_lib.get_subproject_description(release) |
791 | + product_description = cve_lib.get_subproject_description(self.release) |
792 | instruction = prepare_instructions(instruction, header['Candidate'], product_description, pkg) |
793 | |
794 | + |
795 | + |
796 | # if no packages for this release, then we're done |
797 | if not len(test_refs): |
798 | return False |
LGTM, thanks!
All the tests seem to generate the "same output" and tests pass.
A great increase in generation speed!