Merge ~litios/ubuntu-cve-tracker:oval-refactor into ubuntu-cve-tracker:master
- Git
- lp:~litios/ubuntu-cve-tracker
- oval-refactor
- Merge into master
Status: | Merged |
---|---|
Merge reported by: | David Fernandez Gonzalez |
Merged at revision: | 3acb478f6a74effd62974c3e94399922a0036306 |
Proposed branch: | ~litios/ubuntu-cve-tracker:oval-refactor |
Merge into: | ubuntu-cve-tracker:master |
Diff against target: |
2793 lines (+1117/-1275) 3 files modified
scripts/generate-oval (+85/-96) scripts/oval_lib.py (+1031/-1178) test/gold_oval_structure/oval.xml (+1/-1) |
Related bugs: |
Reviewer | Review Type | Date Requested | Status |
---|---|---|---|
Eduardo Barretto | Approve | ||
Review via email: mp+455118@code.launchpad.net |
Commit message
Description of the change
This PR refactors the OVAL generation. The main changes are:
* Use of XML library.
* Creation of a common Generator class that shares most of the code needed to generate the OVAL elements.
* Refactor of CVE Generator and PKG generator to inherit from this class.
* Refactor of generate-oval to share a common interface
* New feature for pkg cache when the pkg version is not present: now it will use the lowest in the release if the binary versions differ.
The biggest impact will be on OVAL CVE, as the IDs, spacing, etc will differ now due to using the same one as PKG OVAL has been using.
Eduardo Barretto (ebarretto) wrote (last edit ): | # |
- 7365bc8... by David Fernandez Gonzalez
-
oval: increase generator speed
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 0574f1a... by David Fernandez Gonzalez
-
generate-oval: improvements
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 107e5ce... by David Fernandez Gonzalez
-
oval_lib: use CVE-based IDs for definitions for CVE OVAL
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- bec0e63... by David Fernandez Gonzalez
-
oval_lib: fix some bugs
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 29ca294... by David Fernandez Gonzalez
-
oval_lib: replace macos tag by linux
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 7239e32... by David Fernandez Gonzalez
-
oval_lib: bump OVAL generator version to 2
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- a42624b... by David Fernandez Gonzalez
-
oval_lib: use latest version binaries when vulnerable
- e0a6581... by David Fernandez Gonzalez
-
oval_lib: PKG - fix ids and ignore packages without binaries
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 7595fed... by David Fernandez Gonzalez
-
OVAL: fix test to fit new linux tag
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 9bf1b0f... by David Fernandez Gonzalez
-
OVAL: CVE - use same IDs as old format
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 42bf4f9... by David Fernandez Gonzalez
-
OVAL: CVE/PKG no longer use tmp directories
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 08b5354... by David Fernandez Gonzalez
-
OVAL: prefix argument is not longer used for CVE/PKG
Signed-off-by: David Fernandez Gonzalez <email address hidden>
- 05c7540... by David Fernandez Gonzalez
-
OVAL: enable ocioutdir for PKG/CVE
Signed-off-by: David Fernandez Gonzalez <email address hidden>
Eduardo Barretto (ebarretto) wrote : | # |
lgtm!
thanks for landing the last fixes!
- 89047d3... by David Fernandez Gonzalez
-
OVAL: improve USN generation code
* Fix bug when not listing releases
* Refactor generate_oval_usn code in generate-ovalSigned-off-by: David Fernandez Gonzalez <email address hidden>
- 3acb478... by David Fernandez Gonzalez
-
OVAL: make IDs smaller
Signed-off-by: David Fernandez Gonzalez <email address hidden>
David Fernandez Gonzalez (litios) wrote : | # |
Thanks a lot for all the reviews and the patience to check everything!!
Preview Diff
1 | diff --git a/scripts/generate-oval b/scripts/generate-oval | |||
2 | index 723127f..bdb9902 100755 | |||
3 | --- a/scripts/generate-oval | |||
4 | +++ b/scripts/generate-oval | |||
5 | @@ -96,8 +96,8 @@ def main(): | |||
6 | 96 | '(default is ./)') | 96 | '(default is ./)') |
7 | 97 | parser.add_argument('--usn-number', default=None, type=str, | 97 | parser.add_argument('--usn-number', default=None, type=str, |
8 | 98 | help='if passed specifics a USN for the oval_usn generator') | 98 | help='if passed specifics a USN for the oval_usn generator') |
11 | 99 | parser.add_argument('--oval-release', default=None, type=str, | 99 | parser.add_argument('--oval-releases', default=None, action='append', |
12 | 100 | help='specifies a release to generate the oval usn') | 100 | help='specifies releases to generate the oval') |
13 | 101 | parser.add_argument('--packages', nargs='+', action='store', default=None, | 101 | parser.add_argument('--packages', nargs='+', action='store', default=None, |
14 | 102 | help='generates oval for specific packages. Only for ' | 102 | help='generates oval for specific packages. Only for ' |
15 | 103 | 'CVE OVAL') | 103 | 'CVE OVAL') |
16 | @@ -132,27 +132,23 @@ def main(): | |||
17 | 132 | 132 | ||
18 | 133 | if args.usn_oval: | 133 | if args.usn_oval: |
19 | 134 | if args.oci: | 134 | if args.oci: |
22 | 135 | generate_oval_usn(args.output_dir, args.usn_number, args.oval_release, | 135 | generate_oval_usn(args.output_dir, args.usn_number, args.oval_releases, |
23 | 136 | args.cve_prefix_dir, args.usn_db_dir, ociprefix, ocioutdir) | 136 | args.cve_prefix_dir, args.usn_db_dir, args.no_progress, ociprefix, ocioutdir) |
24 | 137 | else: | 137 | else: |
27 | 138 | generate_oval_usn(args.output_dir, args.usn_number, args.oval_release, | 138 | generate_oval_usn(args.output_dir, args.usn_number, args.oval_releases, |
28 | 139 | args.cve_prefix_dir, args.usn_db_dir) | 139 | args.cve_prefix_dir, args.usn_db_dir, args.no_progress,) |
29 | 140 | 140 | ||
30 | 141 | return | 141 | return |
31 | 142 | 142 | ||
32 | 143 | # if --oval-release, we still need to load parents cache | 143 | # if --oval-release, we still need to load parents cache |
33 | 144 | # so we can generate a complete oval for those releases that | 144 | # so we can generate a complete oval for those releases that |
34 | 145 | # have a parent | 145 | # have a parent |
45 | 146 | if args.oval_release: | 146 | releases = supported_releases |
46 | 147 | releases = [args.oval_release] | 147 | if args.oval_releases: |
47 | 148 | if args.oval_release not in supported_releases: | 148 | releases = args.oval_releases |
48 | 149 | error(f"unknown oval release {args.oval_release}") | 149 | for release in releases: |
49 | 150 | else: | 150 | if release not in supported_releases: |
50 | 151 | r = args.oval_release | 151 | error(f"unknown oval release {release}") |
41 | 152 | while release_parent(r): | ||
42 | 153 | r = release_parent(r) | ||
43 | 154 | releases.append(r) | ||
44 | 155 | supported_releases = releases | ||
51 | 156 | 152 | ||
52 | 157 | cache = {} | 153 | cache = {} |
53 | 158 | for release in supported_releases: | 154 | for release in supported_releases: |
54 | @@ -161,17 +157,15 @@ def main(): | |||
55 | 161 | 157 | ||
56 | 162 | if args.pkg_oval: | 158 | if args.pkg_oval: |
57 | 163 | if args.oci: | 159 | if args.oci: |
59 | 164 | generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ociprefix, ocioutdir) | 160 | generate_oval_package(releases, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ocioutdir) |
60 | 165 | else: | 161 | else: |
62 | 166 | generate_oval_package(outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only) | 162 | generate_oval_package(releases, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only) |
63 | 167 | return | 163 | return |
64 | 168 | 164 | ||
65 | 169 | if args.oci: | 165 | if args.oci: |
68 | 170 | generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci, | 166 | generate_oval_cve(releases, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only, ocioutdir) |
67 | 171 | args.no_progress, args.packages, pathnames, ociprefix, ocioutdir) | ||
69 | 172 | else: | 167 | else: |
72 | 173 | generate_oval_cve(outdir, args.cve_prefix_dir, cache, args.oci, | 168 | generate_oval_cve(releases, outdir, args.cve_prefix_dir, cache, cve_cache, args.oci, args.no_progress, args.packages, pathnames, args.fixed_only) |
71 | 174 | args.no_progress, args.packages, pathnames) | ||
73 | 175 | return | 169 | return |
74 | 176 | 170 | ||
75 | 177 | 171 | ||
76 | @@ -390,7 +384,7 @@ def get_usn_database(usn_db_dir): | |||
77 | 390 | # WARNING: | 384 | # WARNING: |
78 | 391 | # be sure the release you are passing is in the usn-number passed | 385 | # be sure the release you are passing is in the usn-number passed |
79 | 392 | # otherwise it will generate an oval file without the usn info. | 386 | # otherwise it will generate an oval file without the usn info. |
81 | 393 | def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=None, ocioutdir=None): | 387 | def generate_oval_usn(outdir, usn, usn_releases, cve_dir, usn_db_dir, no_progress, ociprefix=None, ocioutdir=None): |
82 | 394 | # Get the usn database.json data | 388 | # Get the usn database.json data |
83 | 395 | usn_database = get_usn_database(usn_db_dir) | 389 | usn_database = get_usn_database(usn_db_dir) |
84 | 396 | if not usn_database: | 390 | if not usn_database: |
85 | @@ -400,33 +394,28 @@ def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=N | |||
86 | 400 | if usn not in usn_database: | 394 | if usn not in usn_database: |
87 | 401 | error("Please enter a valid USN number or update your database.json and try again") | 395 | error("Please enter a valid USN number or update your database.json and try again") |
88 | 402 | 396 | ||
89 | 403 | if usn_release: | ||
90 | 404 | if usn_release not in supported_releases: | ||
91 | 405 | error("Please enter a valid release name.") | ||
92 | 406 | |||
93 | 407 | # Create OvalGeneratorUSN objects | 397 | # Create OvalGeneratorUSN objects |
94 | 408 | ovals = [] | 398 | ovals = [] |
102 | 409 | # Does the oval for just a specific given release | 399 | valid_releases = [] |
103 | 410 | if usn_release: | 400 | |
104 | 411 | ovals.append(oval_lib.OvalGeneratorUSN(usn_release, release_name(usn_release), outdir, cve_dir)) | 401 | # Check or generate valid releases |
105 | 412 | # Also produce oval generator object for OCI | 402 | if usn_releases: |
106 | 413 | if ocioutdir: | 403 | for usn_release in usn_releases: |
107 | 414 | ovals.append(oval_lib.OvalGeneratorUSN(usn_release, release_name(usn_release), ocioutdir, | 404 | if usn_release not in supported_releases: |
108 | 415 | cve_dir, ociprefix, 'oci')) | 405 | error(f"Invalid release name '{usn_release}'.") |
109 | 406 | valid_releases = usn_releases | ||
110 | 416 | else: | 407 | else: |
116 | 417 | for release in supported_releases: | 408 | valid_releases = list(filter(lambda release: product_series(release)[0] == PRODUCT_UBUNTU, supported_releases)) |
112 | 418 | # for now we don't differentiate products (e.g. esm) in the USN DB | ||
113 | 419 | product, series = product_series(release) | ||
114 | 420 | if product != PRODUCT_UBUNTU: | ||
115 | 421 | continue | ||
117 | 422 | 409 | ||
124 | 423 | ovals.append(oval_lib.OvalGeneratorUSN(release, release_name(release), outdir, cve_dir)) | 410 | if not no_progress: |
125 | 424 | # Also produce oval generator object for OCI | 411 | print('[*] Generating OVAL USN for packages in releases', ', '.join(valid_releases)) |
120 | 425 | if ocioutdir: | ||
121 | 426 | ovals.append(oval_lib.OvalGeneratorUSN(release, release_name(release), ocioutdir, | ||
122 | 427 | cve_dir, ociprefix, | ||
123 | 428 | 'oci')) | ||
126 | 429 | 412 | ||
127 | 413 | for release in valid_releases: | ||
128 | 414 | ovals.append(oval_lib.OvalGeneratorUSN(release, release_name(release), outdir, cve_dir)) | ||
129 | 415 | # Also produce oval generator object for OCI | ||
130 | 416 | if ocioutdir: | ||
131 | 417 | ovals.append(oval_lib.OvalGeneratorUSN(release, release_name(release), ocioutdir, | ||
132 | 418 | cve_dir, ociprefix, 'oci')) | ||
133 | 430 | # Generate OVAL USN data | 419 | # Generate OVAL USN data |
134 | 431 | if usn: | 420 | if usn: |
135 | 432 | prepend_usn_to_id(usn_database, usn) | 421 | prepend_usn_to_id(usn_database, usn) |
136 | @@ -441,62 +430,62 @@ def generate_oval_usn(outdir, usn, usn_release, cve_dir, usn_db_dir, ociprefix=N | |||
137 | 441 | for oval in ovals: | 430 | for oval in ovals: |
138 | 442 | oval.write_oval_elements() | 431 | oval.write_oval_elements() |
139 | 443 | 432 | ||
140 | 433 | if not no_progress: | ||
141 | 434 | print(f'[*] Done generating OVAL USN for packages in releases {", ".join(valid_releases)}') | ||
142 | 435 | |||
143 | 444 | return True | 436 | return True |
144 | 445 | 437 | ||
150 | 446 | def generate_oval_package(outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ociprefix='', ocioutdir=None): | 438 | def generate_oval_package(releases, outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ocioutdir=None): |
151 | 447 | for release in supported_releases: | 439 | if not no_progress: |
152 | 448 | if not no_progress: | 440 | print(f'[*] Generating OVAL PKG for packages in releases {", ".join(releases)}') |
153 | 449 | print(f'[*] Generating OVAL for packages in release {release}') | 441 | |
154 | 450 | ov = oval_lib.OvalGeneratorPkg(release, release_name(release), pathnames, packages, not no_progress, pkg_cache=pkg_cache, fixed_only=fixed_only, cve_cache=cve_cache, oval_format='oci' if oci else 'dpkg', outdir=outdir, cve_prefix_dir=cve_prefix_dir, prefix=ociprefix) | 442 | ov = oval_lib.OvalGeneratorPkg( |
155 | 443 | releases, | ||
156 | 444 | pathnames, | ||
157 | 445 | packages, | ||
158 | 446 | not no_progress, | ||
159 | 447 | pkg_cache=pkg_cache, | ||
160 | 448 | fixed_only=fixed_only, | ||
161 | 449 | cve_cache=cve_cache, | ||
162 | 450 | oval_format='dpkg', | ||
163 | 451 | outdir=outdir, | ||
164 | 452 | cve_prefix_dir=cve_prefix_dir | ||
165 | 453 | ) | ||
166 | 454 | ov.generate_oval() | ||
167 | 455 | |||
168 | 456 | if oci: | ||
169 | 457 | ov.oval_format = 'oci' | ||
170 | 458 | ov.output_dir = ocioutdir | ||
171 | 451 | ov.generate_oval() | 459 | ov.generate_oval() |
172 | 452 | 460 | ||
217 | 453 | if oci: | 461 | if not no_progress: |
218 | 454 | ov.oval_format = 'dpkg' | 462 | print(f'[X] Done generating OVAL PKG for packages in releases {", ".join(releases)}') |
219 | 455 | ov.generate_oval() | 463 | |
220 | 456 | 464 | def generate_oval_cve(releases, outdir, cve_prefix_dir, pkg_cache, cve_cache, oci, no_progress, packages, pathnames, fixed_only, ocioutdir=None): | |
221 | 457 | if not no_progress: | 465 | if not no_progress: |
222 | 458 | print(f'[X] Done generating OVAL for packages in release {release}') | 466 | print(f'[*] Generating OVAL CVE for packages in releases {",".join(releases)}') |
223 | 459 | 467 | ||
224 | 460 | def generate_oval_cve(outdir, cve_prefix_dir, cache, oci, no_progress, packages, pathnames, ociprefix=None, ocioutdir=None): | 468 | ov = oval_lib.OvalGeneratorCVE( |
225 | 461 | ovals = dict() | 469 | releases, |
226 | 462 | for release in supported_releases: | 470 | pathnames, |
227 | 463 | # we can have nested parent releases | 471 | packages, |
228 | 464 | parent = release_progenitor(release) | 472 | not no_progress, |
229 | 465 | index = '{0}_dpkg'.format(release) | 473 | pkg_cache=pkg_cache, |
230 | 466 | ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, outdir, prefix='', oval_format='dpkg') | 474 | fixed_only=fixed_only, |
231 | 467 | ovals[index].add_release_applicability_definition() | 475 | cve_cache=cve_cache, |
232 | 468 | if oci: | 476 | oval_format='dpkg', |
233 | 469 | index = '{0}_oci'.format(release) | 477 | outdir=outdir, |
234 | 470 | ovals[index] = oval_lib.OvalGeneratorCVE(release, release_name(release), parent, warn, ocioutdir, prefix=ociprefix, oval_format='oci') | 478 | cve_prefix_dir=cve_prefix_dir |
235 | 471 | ovals[index].add_release_applicability_definition() | 479 | ) |
236 | 472 | 480 | ov.generate_oval() | |
237 | 473 | # loop through all CVE data files | 481 | |
238 | 474 | files = [] | 482 | if oci: |
239 | 475 | for pathname in pathnames: | 483 | ov.oval_format = 'oci' |
240 | 476 | files = files + glob.glob(os.path.join(cve_prefix_dir, pathname)) | 484 | ov.output_dir = ocioutdir |
241 | 477 | files.sort() | 485 | ov.generate_oval() |
198 | 478 | |||
199 | 479 | pkg_filter = None | ||
200 | 480 | if packages: | ||
201 | 481 | pkg_filter = packages | ||
202 | 482 | |||
203 | 483 | files_count = len(files) | ||
204 | 484 | for i_file, filepath in enumerate(files): | ||
205 | 485 | cve_data = parse_cve_file(filepath, cache, pkg_filter) | ||
206 | 486 | # skip CVEs without packages for supported releases | ||
207 | 487 | if not cve_data['packages']: | ||
208 | 488 | if not no_progress: | ||
209 | 489 | progress_bar(i_file + 1, files_count) | ||
210 | 490 | continue | ||
211 | 491 | |||
212 | 492 | for i in ovals: | ||
213 | 493 | ovals[i].generate_cve_definition(cve_data) | ||
214 | 494 | |||
215 | 495 | if not no_progress: | ||
216 | 496 | progress_bar(i_file + 1, files_count) | ||
242 | 497 | 486 | ||
245 | 498 | for i in ovals: | 487 | if not no_progress: |
246 | 499 | ovals[i].write_to_file() | 488 | print(f'[X] Done generating OVAL CVE for packages in releases {", ".join(releases)}') |
247 | 500 | 489 | ||
248 | 501 | if __name__ == '__main__': | 490 | if __name__ == '__main__': |
249 | 502 | main() | 491 | main() |
250 | diff --git a/scripts/oval_lib.py b/scripts/oval_lib.py | |||
251 | index 655f0af..abe63dc 100644 | |||
252 | --- a/scripts/oval_lib.py | |||
253 | +++ b/scripts/oval_lib.py | |||
254 | @@ -22,7 +22,6 @@ from datetime import datetime, timezone | |||
255 | 22 | import apt_pkg | 22 | import apt_pkg |
256 | 23 | import io | 23 | import io |
257 | 24 | import os | 24 | import os |
258 | 25 | import random | ||
259 | 26 | import re | 25 | import re |
260 | 27 | import shutil | 26 | import shutil |
261 | 28 | import sys | 27 | import sys |
262 | @@ -30,6 +29,7 @@ import tempfile | |||
263 | 30 | import collections | 29 | import collections |
264 | 31 | import glob | 30 | import glob |
265 | 32 | import xml.etree.cElementTree as etree | 31 | import xml.etree.cElementTree as etree |
266 | 32 | import json | ||
267 | 33 | from xml.dom import minidom | 33 | from xml.dom import minidom |
268 | 34 | from typing import Tuple # Needed because of Python < 3.9 and to also support < 3.7 | 34 | from typing import Tuple # Needed because of Python < 3.9 and to also support < 3.7 |
269 | 35 | 35 | ||
270 | @@ -41,6 +41,7 @@ from xml.sax.saxutils import escape | |||
271 | 41 | sources = {} | 41 | sources = {} |
272 | 42 | source_map_binaries = {} | 42 | source_map_binaries = {} |
273 | 43 | debug_level = 0 | 43 | debug_level = 0 |
274 | 44 | GENERIC_VERSION = '0:0' | ||
275 | 44 | 45 | ||
276 | 45 | def recursive_rm(dirPath): | 46 | def recursive_rm(dirPath): |
277 | 46 | '''recursively remove directory''' | 47 | '''recursively remove directory''' |
278 | @@ -143,80 +144,361 @@ def generate_cve_tag(cve): | |||
279 | 143 | cve_ref += '>{0}</cve>'.format(cve['Candidate']) | 144 | cve_ref += '>{0}</cve>'.format(cve['Candidate']) |
280 | 144 | return cve_ref | 145 | return cve_ref |
281 | 145 | 146 | ||
294 | 146 | def get_latest_version(versions): | 147 | def get_binarypkgs(cache, source_name, release): |
283 | 147 | latest = None | ||
284 | 148 | for version in versions: | ||
285 | 149 | if not latest: | ||
286 | 150 | latest = version | ||
287 | 151 | continue | ||
288 | 152 | elif apt_pkg.version_compare(version, latest) > 0: | ||
289 | 153 | latest = version | ||
290 | 154 | |||
291 | 155 | return latest | ||
292 | 156 | |||
293 | 157 | def get_binarypkgs(cache, source_name, release, version=None): | ||
295 | 158 | """ return a list of binary packages from the source package version """ | 148 | """ return a list of binary packages from the source package version """ |
296 | 159 | packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-") | 149 | packages_to_ignore = ("-dev", "-doc", "-dbg", "-dbgsym", "-udeb", "-locale-") |
299 | 160 | version_map = collections.defaultdict(list) | 150 | binaries_map = collections.defaultdict(dict) |
298 | 161 | cache_version = version | ||
300 | 162 | 151 | ||
301 | 163 | if source_name not in cache[release]: | 152 | if source_name not in cache[release]: |
302 | 164 | rel = release | 153 | rel = release |
303 | 165 | while cve_lib.release_parent(rel): | 154 | while cve_lib.release_parent(rel): |
304 | 166 | rel = cve_lib.release_parent(rel) | 155 | rel = cve_lib.release_parent(rel) |
306 | 167 | r , sv, vm = get_binarypkgs(cache, source_name, rel, version) | 156 | r , vb = get_binarypkgs(cache, source_name, rel) |
307 | 168 | if r: | 157 | if r: |
309 | 169 | return r, sv, vm | 158 | return r, vb |
310 | 170 | 159 | ||
311 | 171 | # if a source package does not exist in such a release | 160 | # if a source package does not exist in such a release |
312 | 172 | # return None | 161 | # return None |
330 | 173 | return release, None, None | 162 | return None, None |
331 | 174 | elif version and version not in cache[release][source_name]: | 163 | |
332 | 175 | rel = release | 164 | for source_version in cache[release][source_name]: |
333 | 176 | while cve_lib.release_parent(rel): | 165 | binaries_map.setdefault(source_version, dict()) |
334 | 177 | rel = cve_lib.release_parent(rel) | 166 | for binary, bin_data in cache[release][source_name][source_version]['binaries'].items(): |
335 | 178 | r , sv, vm = get_binarypkgs(cache, source_name, rel, version) | 167 | # for kernel we only want linux images |
336 | 179 | if r: | 168 | if source_name.startswith('linux') and not binary.startswith('linux-image-'): |
337 | 180 | return r, sv, vm | 169 | continue |
338 | 181 | 170 | # skip ignored packages, with exception of golang*-dev pkgs | |
339 | 182 | # if version is not in release, then fetch latest source version | 171 | if binary.startswith(('golang-go')) or \ |
340 | 183 | cache_version = get_latest_version(list(cache[release][source_name].keys())) | 172 | not any(s in binary for s in packages_to_ignore): |
341 | 184 | elif not version: | 173 | binaries_map[source_version].setdefault(bin_data['version'], list()) |
342 | 185 | # if no version is provided, get latest source version | 174 | binaries_map[source_version][bin_data['version']].append(binary) |
343 | 186 | version = get_latest_version(list(cache[release][source_name].keys())) | 175 | |
344 | 187 | cache_version = version | 176 | return release, binaries_map |
345 | 188 | else: | 177 | |
346 | 189 | cache_version = version | 178 | class CVEPkgRelEntry: |
347 | 179 | def __init__(self, pkg, release, cve, status, note) -> None: | ||
348 | 180 | self.pkg = pkg | ||
349 | 181 | self.cve = cve | ||
350 | 182 | self.orig_status = status | ||
351 | 183 | self.orig_note = note | ||
352 | 184 | self.release = release | ||
353 | 185 | cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, None) | ||
354 | 186 | |||
355 | 187 | self.note = cve_info['note'] | ||
356 | 188 | self.status = cve_info['status'] | ||
357 | 189 | self.fixed_version = cve_info['fix-version'] if self.status == 'fixed' else None | ||
358 | 190 | |||
359 | 191 | @staticmethod | ||
360 | 192 | def parse_package_status(release, package, status_text, note, filepath, cache): | ||
361 | 193 | """ parse ubuntu package status string format: | ||
362 | 194 | <status code> (<version/notes>) | ||
363 | 195 | outputs dictionary: { | ||
364 | 196 | 'status' : '<not-applicable | unknown | vulnerable | fixed>', | ||
365 | 197 | 'note' : '<description of the status>', | ||
366 | 198 | 'fix-version' : '<version with issue fixed, if applicable>', | ||
367 | 199 | 'bin-pkgs' : [] | ||
368 | 200 | } """ | ||
369 | 201 | |||
370 | 202 | # TODO fix for CVE Generator | ||
371 | 203 | |||
372 | 204 | # break out status code and detail | ||
373 | 205 | code = status_text.lower() | ||
374 | 206 | detail = note.strip('()') if note else None | ||
375 | 207 | status = {} | ||
376 | 208 | fix_version = "" | ||
377 | 209 | |||
378 | 210 | if detail and detail[0].isdigit() and len(detail.split(' ')) == 1: | ||
379 | 211 | fix_version = detail | ||
380 | 212 | |||
381 | 213 | note_end = " (note: '{0}').".format(detail) if detail else '.' | ||
382 | 214 | if code == 'dne': | ||
383 | 215 | status['status'] = 'not-applicable' | ||
384 | 216 | status['note'] = \ | ||
385 | 217 | " package does not exist in {0}{1}".format(release, note_end) | ||
386 | 218 | elif code == 'ignored': | ||
387 | 219 | status['status'] = 'vulnerable' | ||
388 | 220 | status['note'] = ": while related to the CVE in some way, a decision has been made to ignore this issue{0}".format(note_end) | ||
389 | 221 | elif code == 'not-affected': | ||
390 | 222 | # check if there is a release version and if so, test for | ||
391 | 223 | # package existence with that version | ||
392 | 224 | if fix_version: | ||
393 | 225 | status['status'] = 'fixed' | ||
394 | 226 | status['note'] = " package in {0}, is related to the CVE in some way and has been fixed{1}".format(release, note_end) | ||
395 | 227 | status['fix-version'] = fix_version | ||
396 | 228 | else: | ||
397 | 229 | status['status'] = 'not-vulnerable' | ||
398 | 230 | status['note'] = " package in {0}, while related to the CVE in some way, is not affected{1}".format(release, note_end) | ||
399 | 231 | elif code == 'needed': | ||
400 | 232 | status['status'] = 'vulnerable' | ||
401 | 233 | status['note'] = \ | ||
402 | 234 | " package in {0} is affected and needs fixing{1}".format(release, note_end) | ||
403 | 235 | elif code == 'pending': | ||
404 | 236 | # pending means that packages have been prepared and are in | ||
405 | 237 | # -proposed or in a ppa somewhere, and should have a version | ||
406 | 238 | # attached. If there is a version, test for package existence | ||
407 | 239 | # with that version, otherwise mark as vulnerable | ||
408 | 240 | if fix_version: | ||
409 | 241 | status['status'] = 'fixed' | ||
410 | 242 | status['note'] = " package in {0} is affected. An update containing the fix has been completed and is pending publication{1}".format(release, note_end) | ||
411 | 243 | status['fix-version'] = fix_version | ||
412 | 244 | else: | ||
413 | 245 | status['status'] = 'vulnerable' | ||
414 | 246 | status['note'] = " package in {0} is affected. An update containing the fix has been completed and is pending publication{1}".format(release, note_end) | ||
415 | 247 | elif code == 'deferred': | ||
416 | 248 | status['status'] = 'vulnerable' | ||
417 | 249 | status['note'] = " package in {0} is affected, but a decision has been made to defer addressing it{1}".format(release, note_end) | ||
418 | 250 | elif code in ['released']: | ||
419 | 251 | # if there isn't a release version, then just mark | ||
420 | 252 | # as vulnerable to test for package existence | ||
421 | 253 | if not fix_version: | ||
422 | 254 | status['status'] = 'vulnerable' | ||
423 | 255 | status['note'] = " package in {0} was vulnerable and has been fixed, but no release version available for it{1}".format(release, note_end) | ||
424 | 256 | else: | ||
425 | 257 | status['status'] = 'fixed' | ||
426 | 258 | status['note'] = " package in {0} was vulnerable but has been fixed{1}".format(release, note_end) | ||
427 | 259 | status['fix-version'] = fix_version | ||
428 | 260 | elif code == 'needs-triage': | ||
429 | 261 | status['status'] = 'vulnerable' | ||
430 | 262 | status['note'] = " package in {0} is affected and may need fixing{1}".format(release, note_end) | ||
431 | 263 | else: | ||
432 | 264 | # TODO LOGGIN | ||
433 | 265 | print('Unsupported status "{0}" in {1}_{2} in "{3}". Setting to "unknown".'.format(code, release, package, filepath)) | ||
434 | 266 | status['status'] = 'unknown' | ||
435 | 267 | status['note'] = " package in {0} has a vulnerability that is not known (status: '{1}'). It is pending evaluation{2}".format(release, code, note_end) | ||
436 | 268 | |||
437 | 269 | return status | ||
438 | 270 | |||
439 | 271 | def is_not_applicable(self) -> bool: | ||
440 | 272 | return self.status in ['not-vulnerable', 'not-applicable'] | ||
441 | 273 | |||
442 | 274 | def __str__(self) -> str: | ||
443 | 275 | return f'{str(self.pkg)}:{self.status} {self.fixed_version}' | ||
444 | 276 | |||
445 | 277 | class CVE: | ||
446 | 278 | def __init__(self, number, info, pkgs=None) -> None: | ||
447 | 279 | self.number = number | ||
448 | 280 | self.description = info['Description'] | ||
449 | 281 | self.priority = info['Priority'][0] | ||
450 | 282 | self.public_date = info['PublicDate'] | ||
451 | 283 | self.public_date_at_usn = info['PublicDateAtUSN'] if 'PublicDateAtUSN' in info else '' | ||
452 | 284 | self.cvss = info['CVSS'] | ||
453 | 285 | self.assigned_to = info['Assigned-to'] if 'Assigned-to' in info else '' | ||
454 | 286 | self.discoverd_by = info['Discovered-by'] if 'Discovered-by' in info else '' | ||
455 | 287 | self.usns = [] | ||
456 | 288 | self.references = [] | ||
457 | 289 | self.bugs = [] | ||
458 | 290 | for url in info['References'].split('\n'): | ||
459 | 291 | if 'https://ubuntu.com/security/notices/USN-' in url: | ||
460 | 292 | self.usns.append(url[40:]) | ||
461 | 293 | elif re.match("https?:\/\/(bugs\.)?launchpad\.net\/(.*\/\+bug|bugs)\/\d+", url): | ||
462 | 294 | self.bugs.append(url) | ||
463 | 295 | elif url: | ||
464 | 296 | self.references.append(url) | ||
465 | 297 | |||
466 | 298 | for bug in info['Bugs'].split('\n'): | ||
467 | 299 | if bug: | ||
468 | 300 | self.bugs.append(bug) | ||
469 | 301 | |||
470 | 302 | self.pkg_rel_entries = {} | ||
471 | 303 | self.pkgs = pkgs if pkgs else [] | ||
472 | 304 | |||
473 | 305 | def get_pkgs(self, releases): | ||
474 | 306 | # We assume priority is as the order in the list | ||
475 | 307 | pkgs = [] | ||
476 | 308 | pkg_rel = {} | ||
477 | 309 | for pkg in self.pkgs: | ||
478 | 310 | if pkg.rel not in releases: | ||
479 | 311 | continue | ||
480 | 312 | |||
481 | 313 | if pkg.name not in pkg_rel: | ||
482 | 314 | pkg_rel[pkg.name] = pkg.rel | ||
483 | 315 | elif releases.index(pkg.rel) < releases.index(pkg_rel[pkg.name]): | ||
484 | 316 | pkg_rel[pkg.name] = pkg.rel | ||
485 | 317 | |||
486 | 318 | for pkg in self.pkgs: | ||
487 | 319 | if self.pkg_rel_entries[str(pkg)].is_not_applicable(): | ||
488 | 320 | continue | ||
489 | 321 | |||
490 | 322 | if pkg.name in pkg_rel and pkg_rel[pkg.name] == pkg.rel: | ||
491 | 323 | pkgs.append(pkg) | ||
492 | 324 | |||
493 | 325 | return pkgs | ||
494 | 326 | |||
495 | 327 | |||
496 | 328 | def add_pkg(self, pkg_object, release, state, note): | ||
497 | 329 | cve_pkg_entry = CVEPkgRelEntry(pkg_object, release, self, state, note) | ||
498 | 330 | self.pkg_rel_entries[str(pkg_object)] = cve_pkg_entry | ||
499 | 331 | self.pkgs.append(pkg_object) | ||
500 | 332 | pkg_object.add_cve(self) | ||
501 | 333 | |||
502 | 334 | def __str__(self) -> str: | ||
503 | 335 | return self.number | ||
504 | 336 | |||
505 | 337 | def __repr__(self): | ||
506 | 338 | return self.__str__() | ||
507 | 339 | |||
508 | 340 | class Package: | ||
509 | 341 | def __init__(self, pkgname, rel, versions_binaries): | ||
510 | 342 | self.name = pkgname | ||
511 | 343 | self.rel = rel | ||
512 | 344 | self.description = cve_lib.lookup_package_override_description(pkgname) | ||
513 | 345 | |||
514 | 346 | if not self.description: | ||
515 | 347 | if 'description' in sources[rel][pkgname]: | ||
516 | 348 | self.description = sources[rel][pkgname]['description'] | ||
517 | 349 | elif pkgname in source_map_binaries[rel] and \ | ||
518 | 350 | 'description' in source_map_binaries[rel][pkgname]: | ||
519 | 351 | self.description = source_map_binaries[rel][pkgname]['description'] | ||
520 | 352 | else: | ||
521 | 353 | # Get first description found | ||
522 | 354 | if 'binaries' in sources[self.rel][self.name]: | ||
523 | 355 | for binary in sources[self.rel][self.name]['binaries']: | ||
524 | 356 | if binary in source_map_binaries[self.rel] and 'description' in source_map_binaries[self.rel][binary]: | ||
525 | 357 | self.description = source_map_binaries[self.rel][binary]["description"] | ||
526 | 358 | break | ||
527 | 359 | |||
528 | 360 | self.section = sources[rel][pkgname]['section'] | ||
529 | 361 | self.versions_binaries = versions_binaries if versions_binaries else {} | ||
530 | 362 | self.earliest_version = self.get_earliest_version() | ||
531 | 363 | self.latest_version = self.get_latest_version() | ||
532 | 364 | |||
533 | 365 | binary_versions = self.get_binary_versions(self.earliest_version) | ||
534 | 366 | self.is_kernel_pkg = False if len(binary_versions) == 0 else \ | ||
535 | 367 | is_kernel_binaries(self.get_binaries(self.earliest_version, binary_versions[0])) | ||
536 | 368 | self.cves = [] | ||
537 | 369 | |||
538 | 370 | def add_cve(self, cve) -> None: | ||
539 | 371 | self.cves.append(cve) | ||
540 | 372 | |||
541 | 373 | def get_latest_version(self): | ||
542 | 374 | latest = None | ||
543 | 375 | for version in self.versions_binaries.keys(): | ||
544 | 376 | if not latest: | ||
545 | 377 | latest = version | ||
546 | 378 | continue | ||
547 | 379 | elif apt_pkg.version_compare(version, latest) > 0: | ||
548 | 380 | latest = version | ||
549 | 381 | |||
550 | 382 | return latest | ||
551 | 383 | |||
552 | 384 | def get_earliest_version(self): | ||
553 | 385 | earliest = None | ||
554 | 386 | for version in self.versions_binaries.keys(): | ||
555 | 387 | if not earliest: | ||
556 | 388 | earliest = version | ||
557 | 389 | continue | ||
558 | 390 | elif apt_pkg.version_compare(earliest, version) > 0: | ||
559 | 391 | earliest = version | ||
560 | 392 | |||
561 | 393 | return earliest | ||
562 | 394 | |||
563 | 395 | def version_exists(self, source_version): | ||
564 | 396 | return source_version in self.versions_binaries | ||
565 | 397 | |||
566 | 398 | def all_binaries_same_version(self, source_version): | ||
567 | 399 | if source_version not in self.versions_binaries: | ||
568 | 400 | return len(self.versions_binaries[self.earliest_version]) <= 1 | ||
569 | 401 | return len(self.versions_binaries[source_version]) <= 1 | ||
570 | 402 | |||
571 | 403 | def get_version_to_check(self, source_version): | ||
572 | 404 | if not source_version: | ||
573 | 405 | return self.latest_version | ||
574 | 406 | else: | ||
575 | 407 | if source_version in self.versions_binaries or self.all_binaries_same_version(source_version): | ||
576 | 408 | return source_version | ||
577 | 409 | else: | ||
578 | 410 | if source_version and apt_pkg.version_compare(source_version, self.earliest_version) > 0: | ||
579 | 411 | print(f'Wrong CVE entry version {source_version} - earliest for package {self.name} in {self.rel} is {self.earliest_version}') | ||
580 | 412 | |||
581 | 413 | return self.earliest_version | ||
582 | 414 | |||
583 | 415 | def get_binary_versions(self, source_version): | ||
584 | 416 | if not self.versions_binaries: return [] | ||
585 | 417 | |||
586 | 418 | if source_version not in self.versions_binaries: | ||
587 | 419 | # If this is the case, package binaries should all have the same version | ||
588 | 420 | # Relying on that, we can use the version of the CVE as the right version | ||
589 | 421 | return [source_version] | ||
590 | 422 | return list(self.versions_binaries[source_version].keys()) | ||
591 | 423 | |||
592 | 424 | def get_binaries(self, source_version, binary_version): | ||
593 | 425 | if not self.versions_binaries: return {} | ||
594 | 426 | if source_version not in self.versions_binaries: | ||
595 | 427 | if len(self.versions_binaries[self.earliest_version]) != 1: | ||
596 | 428 | print(f"WARN: Version {source_version} doesn't exist yet the package {self.name} has different versions for the binaries") | ||
597 | 429 | |||
598 | 430 | version_binaries = self.versions_binaries[self.earliest_version] | ||
599 | 431 | return version_binaries[self.get_binary_versions(self.earliest_version)[0]] | ||
600 | 432 | return self.versions_binaries[source_version][binary_version] | ||
601 | 433 | |||
602 | 434 | def __str__(self) -> str: | ||
603 | 435 | return f"{self.name}/{self.rel}" | ||
604 | 190 | 436 | ||
613 | 191 | for binary, bin_data in cache[release][source_name][cache_version]['binaries'].items(): | 437 | def __repr__(self): |
614 | 192 | # for kernel we only want linux images | 438 | return self.__str__() |
615 | 193 | if source_name.startswith('linux') and not binary.startswith('linux-image-'): | 439 | |
616 | 194 | continue | 440 | class USN: |
617 | 195 | # skip ignored packages, with exception of golang*-dev pkgs | 441 | def __init__(self, data): |
618 | 196 | if binary.startswith(('golang-go')) or \ | 442 | for item in ['description', 'releases', 'title', 'timestamp', 'summary', 'action', 'cves', 'id', 'isummary']: |
619 | 197 | not any(s in binary for s in packages_to_ignore): | 443 | if item in data: |
620 | 198 | version_map[bin_data['version']].append(binary) | 444 | setattr(self, item, data[item]) |
621 | 445 | else: | ||
622 | 446 | setattr(self, item, None) | ||
623 | 447 | |||
624 | 448 | def __str__(self) -> str: | ||
625 | 449 | return self.id | ||
626 | 199 | 450 | ||
628 | 200 | return release, version, version_map | 451 | def __repr__(self) -> str: |
629 | 452 | return self.id | ||
630 | 201 | 453 | ||
631 | 454 | # Oval Generators | ||
632 | 202 | class OvalGenerator: | 455 | class OvalGenerator: |
633 | 203 | supported_oval_elements = ('definition', 'test', 'object', 'state', 'variable') | 456 | supported_oval_elements = ('definition', 'test', 'object', 'state', 'variable') |
635 | 204 | generator_version = '1.1' | 457 | generator_version = '2' |
636 | 205 | oval_schema_version = '5.11.1' | 458 | oval_schema_version = '5.11.1' |
644 | 206 | def __init__(self, release, release_name = None, warn_method=False, outdir='./', prefix='', oval_format='dpkg') -> None: | 459 | def __init__(self, type, releases, cve_paths, packages, progress, pkg_cache, fixed_only=True, cve_cache=None, cve_prefix_dir=None, outdir='./', oval_format='dpkg') -> None: |
645 | 207 | self.release = release | 460 | self.releases = releases |
639 | 208 | # e.g. codename for trusty/esm should be trusty | ||
640 | 209 | self.release_codename = cve_lib.release_progenitor(release) if cve_lib.release_progenitor(release) else self.release.replace('/', '_') | ||
641 | 210 | self.release_name = release_name | ||
642 | 211 | #self.warn = warn_method or self.warn | ||
643 | 212 | self.tmpdir = tempfile.mkdtemp(prefix='oval_lib-') | ||
646 | 213 | self.output_dir = outdir | 461 | self.output_dir = outdir |
647 | 214 | self.oval_format = oval_format | 462 | self.oval_format = oval_format |
648 | 463 | self.generator_type = type | ||
649 | 464 | self.progress = progress | ||
650 | 465 | self.cve_cache = cve_cache | ||
651 | 466 | self.pkg_cache = pkg_cache | ||
652 | 467 | self.cve_paths = cve_paths | ||
653 | 468 | self.fixed_only = fixed_only | ||
654 | 469 | self.packages, self.cves = self._load(cve_prefix_dir, packages) | ||
655 | 470 | |||
656 | 471 | def _init_ids(self, release): | ||
657 | 472 | # e.g. codename for trusty/esm should be trusty | ||
658 | 473 | self.release = release | ||
659 | 474 | self.release_codename = cve_lib.release_progenitor(self.release) if cve_lib.release_progenitor(self.release) else self.release.replace('/', '_') | ||
660 | 475 | self.release_name = cve_lib.release_name(self.release) | ||
661 | 476 | |||
662 | 477 | self.parent_releases = set() | ||
663 | 478 | current_release = self.release | ||
664 | 479 | while(cve_lib.release_parent(current_release)): | ||
665 | 480 | current_release = cve_lib.release_parent(current_release) | ||
666 | 481 | if current_release != self.release: | ||
667 | 482 | self.parent_releases.add(current_release) | ||
668 | 483 | |||
669 | 215 | self.ns = 'oval:com.ubuntu.{0}'.format(self.release_codename) | 484 | self.ns = 'oval:com.ubuntu.{0}'.format(self.release_codename) |
670 | 216 | self.id = 100 | 485 | self.id = 100 |
671 | 217 | self.host_def_id = self.id | 486 | self.host_def_id = self.id |
672 | 218 | self.release_applicability_definition_id = '{0}:def:{1}0'.format(self.ns, self.id) | 487 | self.release_applicability_definition_id = '{0}:def:{1}0'.format(self.ns, self.id) |
674 | 219 | 488 | ### | |
675 | 489 | # ID schema: 2204|00001|0001 | ||
676 | 490 | # * The first four digits are the ubuntu release number | ||
677 | 491 | # * The next 5 digits is # just a package counter, we increase it for each definition | ||
678 | 492 | # * The last 4 digits is a counter for the criterion | ||
679 | 493 | ### | ||
680 | 494 | release_code = int(self.release_name.split(' ')[1].replace('.', '')) if self.release not in cve_lib.external_releases else 1111 | ||
681 | 495 | self.release_id = release_code * 10 ** 10 | ||
682 | 496 | self.definition_id = self.release_id | ||
683 | 497 | self.definition_step = 1 * 10 ** 5 | ||
684 | 498 | self.criterion_step = 10 | ||
685 | 499 | self.output_filepath = \ | ||
686 | 500 | '{0}com.ubuntu.{1}.{2}.oval.xml'.format('oci.' if self.oval_format == 'oci' else '', self.release.replace('/', '_'), self.generator_type) | ||
687 | 501 | |||
688 | 220 | def _add_structure(self, root) -> None: | 502 | def _add_structure(self, root) -> None: |
689 | 221 | structure = {} | 503 | structure = {} |
690 | 222 | for element in self.supported_oval_elements: | 504 | for element in self.supported_oval_elements: |
691 | @@ -225,21 +507,11 @@ class OvalGenerator: | |||
692 | 225 | 507 | ||
693 | 226 | return structure | 508 | return structure |
694 | 227 | 509 | ||
696 | 228 | def _get_root_element(self, type) -> etree.Element: | 510 | def _get_generator(self, type) -> etree.Element: |
697 | 229 | oval_timestamp = datetime.now(tz=timezone.utc).strftime( | 511 | oval_timestamp = datetime.now(tz=timezone.utc).strftime( |
698 | 230 | '%Y-%m-%dT%H:%M:%S') | 512 | '%Y-%m-%dT%H:%M:%S') |
699 | 231 | 513 | ||
711 | 232 | root_element = etree.Element("oval_definitions", attrib= { | 514 | generator = etree.Element("generator") |
701 | 233 | "xmlns":"http://oval.mitre.org/XMLSchema/oval-definitions-5", | ||
702 | 234 | "xmlns:ind-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#independent", | ||
703 | 235 | "xmlns:oval":"http://oval.mitre.org/XMLSchema/oval-common-5", | ||
704 | 236 | "xmlns:unix-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#unix", | ||
705 | 237 | "xmlns:linux-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#linux", | ||
706 | 238 | "xmlns:xsi":"http://www.w3.org/2001/XMLSchema-instance" , | ||
707 | 239 | "xsi:schemaLocation":"http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd" | ||
708 | 240 | }) | ||
709 | 241 | |||
710 | 242 | generator = etree.SubElement(root_element, "generator") | ||
712 | 243 | product_name = etree.SubElement(generator, "oval:product_name") | 515 | product_name = etree.SubElement(generator, "oval:product_name") |
713 | 244 | product_version = etree.SubElement(generator, "oval:product_version") | 516 | product_version = etree.SubElement(generator, "oval:product_version") |
714 | 245 | schema_version = etree.SubElement(generator, "oval:schema_version") | 517 | schema_version = etree.SubElement(generator, "oval:schema_version") |
715 | @@ -250,6 +522,19 @@ class OvalGenerator: | |||
716 | 250 | schema_version.text = self.oval_schema_version | 522 | schema_version.text = self.oval_schema_version |
717 | 251 | timestamp.text = oval_timestamp | 523 | timestamp.text = oval_timestamp |
718 | 252 | 524 | ||
719 | 525 | return generator | ||
720 | 526 | |||
721 | 527 | def _get_root_element(self) -> etree.Element: | ||
722 | 528 | root_element = etree.Element("oval_definitions", attrib= { | ||
723 | 529 | "xmlns":"http://oval.mitre.org/XMLSchema/oval-definitions-5", | ||
724 | 530 | "xmlns:ind-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#independent", | ||
725 | 531 | "xmlns:oval":"http://oval.mitre.org/XMLSchema/oval-common-5", | ||
726 | 532 | "xmlns:unix-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#unix", | ||
727 | 533 | "xmlns:linux-def":"http://oval.mitre.org/XMLSchema/oval-definitions-5#linux", | ||
728 | 534 | "xmlns:xsi":"http://www.w3.org/2001/XMLSchema-instance" , | ||
729 | 535 | "xsi:schemaLocation":"http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#linux linux-definitions-schema.xsd" | ||
730 | 536 | }) | ||
731 | 537 | |||
732 | 253 | xml_tree = etree.ElementTree(root_element) | 538 | xml_tree = etree.ElementTree(root_element) |
733 | 254 | return xml_tree, root_element | 539 | return xml_tree, root_element |
734 | 255 | 540 | ||
735 | @@ -396,229 +681,90 @@ class OvalGenerator: | |||
736 | 396 | 681 | ||
737 | 397 | return family_state, state | 682 | return family_state, state |
738 | 398 | 683 | ||
747 | 399 | class CVEPkgRelEntry: | 684 | def _add_new_package(self, package_name, cve, release, cve_data, packages) -> None: |
748 | 400 | def __init__(self, pkg, release, cve, status, note, cache) -> None: | 685 | if package_name not in packages: |
749 | 401 | self.pkg = pkg | 686 | _, versions_binaries = get_binarypkgs(self.pkg_cache, package_name, release) |
750 | 402 | self.cve = cve | 687 | pkg_obj = Package(package_name, release, versions_binaries) |
751 | 403 | self.orig_status = status | 688 | packages[package_name] = pkg_obj |
744 | 404 | self.orig_note = note | ||
745 | 405 | self.release = release | ||
746 | 406 | cve_info = CVEPkgRelEntry.parse_package_status(self.release, pkg.name, status, note, cve.number, cache) | ||
752 | 407 | 689 | ||
756 | 408 | self.note = cve_info['note'] | 690 | pkg_obj = packages[package_name] |
757 | 409 | self.status = cve_info['status'] | 691 | cve.add_pkg(pkg_obj, release, cve_data['pkgs'][package_name][release][0],cve_data['pkgs'][package_name][release][1]) |
755 | 410 | self.fixed_version = cve_info['fix-version'] if self.status == 'fixed' else None | ||
758 | 411 | 692 | ||
769 | 412 | @staticmethod | 693 | def _load(self, cve_prefix_dir, packages_filter=None) -> None: |
770 | 413 | def parse_package_status(release, package, status_text, note, filepath, cache): | 694 | cve_lib.load_external_subprojects() |
761 | 414 | """ parse ubuntu package status string format: | ||
762 | 415 | <status code> (<version/notes>) | ||
763 | 416 | outputs dictionary: { | ||
764 | 417 | 'status' : '<not-applicable | unknown | vulnerable | fixed>', | ||
765 | 418 | 'note' : '<description of the status>', | ||
766 | 419 | 'fix-version' : '<version with issue fixed, if applicable>', | ||
767 | 420 | 'bin-pkgs' : [] | ||
768 | 421 | } """ | ||
771 | 422 | 695 | ||
773 | 423 | # TODO fix for CVE Generator | 696 | cve_paths = [] |
774 | 697 | for pathname in self.cve_paths: | ||
775 | 698 | cve_paths = cve_paths + glob.glob(os.path.join(cve_prefix_dir, pathname)) | ||
776 | 424 | 699 | ||
782 | 425 | # break out status code and detail | 700 | cve_paths.sort(key=lambda cve: |
783 | 426 | code = status_text.lower() | 701 | (int(cve.split('/')[-1].split('-')[1]), int(cve.split('/')[-1].split('-')[2])) \ |
784 | 427 | detail = note.strip('()') if note else None | 702 | if cve.split('/')[-1].split('-')[2].isnumeric() \ |
785 | 428 | status = {} | 703 | else (int(cve.split('/')[-1].split('-')[1]), 0) |
786 | 429 | fix_version = None | 704 | ) |
787 | 430 | 705 | ||
790 | 431 | if detail and detail[0].isdigit() and len(detail.split(' ')) == 1: | 706 | packages = {} |
791 | 432 | fix_version = detail | 707 | cves = {} |
792 | 708 | base_releases = self.releases | ||
793 | 709 | final_releases = set(self.releases) | ||
794 | 710 | for current_release in base_releases: | ||
795 | 711 | while(cve_lib.release_parent(current_release)): | ||
796 | 712 | current_release = cve_lib.release_parent(current_release) | ||
797 | 713 | final_releases.add(current_release) | ||
798 | 714 | |||
799 | 715 | for release in final_releases: | ||
800 | 716 | packages.setdefault(release, {}) | ||
801 | 717 | cves.setdefault(release, {}) | ||
802 | 718 | sources[release] = load(releases=[release], skip_eol_releases=False)[release] | ||
803 | 433 | 719 | ||
809 | 434 | parent = release | 720 | orig_name = cve_lib.get_orig_rel_name(release) |
810 | 435 | if cache and code != 'dne': | 721 | if '/' in orig_name: |
811 | 436 | parent, status['source-version'], status['bin-pkgs'] = get_binarypkgs(cache, package, release, version=fix_version) | 722 | orig_name = orig_name.split('/', maxsplit=1)[1] |
812 | 437 | if parent != release: | 723 | source_map_binaries[release] = load(data_type='packages',releases=[orig_name], skip_eol_releases=False)[orig_name] \ |
813 | 438 | status['parent'] = parent | 724 | if release not in cve_lib.external_releases else {} |
814 | 439 | 725 | ||
961 | 440 | note_end = " (note: '{0}').".format(detail) if detail else '.' | 726 | i = 0 |
962 | 441 | if code == 'dne': | 727 | for cve_path in cve_paths: |
963 | 442 | status['status'] = 'not-applicable' | 728 | cve_number = cve_path.rsplit('/', 1)[1] |
964 | 443 | status['note'] = \ | 729 | i += 1 |
819 | 444 | " package does not exist in {0}{1}".format(release, note_end) | ||
820 | 445 | elif code == 'ignored': | ||
821 | 446 | status['status'] = 'vulnerable' | ||
822 | 447 | status['note'] = ": while related to the CVE in some way, a decision has been made to ignore this issue{0}".format(note_end) | ||
823 | 448 | elif code == 'not-affected': | ||
824 | 449 | # check if there is a release version and if so, test for | ||
825 | 450 | # package existence with that version | ||
826 | 451 | if fix_version: | ||
827 | 452 | status['status'] = 'fixed' | ||
828 | 453 | status['note'] = " package in {0}, is related to the CVE in some way and has been fixed{1}".format(parent, note_end) | ||
829 | 454 | status['fix-version'] = fix_version | ||
830 | 455 | else: | ||
831 | 456 | status['status'] = 'not-vulnerable' | ||
832 | 457 | status['note'] = " package in {0}, while related to the CVE in some way, is not affected{1}".format(parent, note_end) | ||
833 | 458 | elif code == 'needed': | ||
834 | 459 | status['status'] = 'vulnerable' | ||
835 | 460 | status['note'] = \ | ||
836 | 461 | " package in {0} is affected and needs fixing{1}".format(parent, note_end) | ||
837 | 462 | elif code == 'pending': | ||
838 | 463 | # pending means that packages have been prepared and are in | ||
839 | 464 | # -proposed or in a ppa somewhere, and should have a version | ||
840 | 465 | # attached. If there is a version, test for package existence | ||
841 | 466 | # with that version, otherwise mark as vulnerable | ||
842 | 467 | if fix_version: | ||
843 | 468 | status['status'] = 'fixed' | ||
844 | 469 | status['note'] = " package in {0} is affected. An update containing the fix has been completed and is pending publication{1}".format(parent, note_end) | ||
845 | 470 | status['fix-version'] = fix_version | ||
846 | 471 | else: | ||
847 | 472 | status['status'] = 'vulnerable' | ||
848 | 473 | status['note'] = " package in {0} is affected. An update containing the fix has been completed and is pending publication{1}".format(parent, note_end) | ||
849 | 474 | elif code == 'deferred': | ||
850 | 475 | status['status'] = 'vulnerable' | ||
851 | 476 | status['note'] = " package in {0} is affected, but a decision has been made to defer addressing it{1}".format(parent, note_end) | ||
852 | 477 | elif code in ['released']: | ||
853 | 478 | # if there isn't a release version, then just mark | ||
854 | 479 | # as vulnerable to test for package existence | ||
855 | 480 | if not fix_version: | ||
856 | 481 | status['status'] = 'vulnerable' | ||
857 | 482 | status['note'] = " package in {0} was vulnerable and has been fixed, but no release version available for it{1}".format(parent, note_end) | ||
858 | 483 | else: | ||
859 | 484 | status['status'] = 'fixed' | ||
860 | 485 | status['note'] = " package in {0} was vulnerable but has been fixed{1}".format(parent, note_end) | ||
861 | 486 | status['fix-version'] = fix_version | ||
862 | 487 | elif code == 'needs-triage': | ||
863 | 488 | status['status'] = 'vulnerable' | ||
864 | 489 | status['note'] = " package in {0} is affected and may need fixing{1}".format(parent, note_end) | ||
865 | 490 | else: | ||
866 | 491 | # TODO LOGGIN | ||
867 | 492 | print('Unsupported status "{0}" in {1}_{2} in "{3}". Setting to "unknown".'.format(code, release, package, filepath)) | ||
868 | 493 | status['status'] = 'unknown' | ||
869 | 494 | status['note'] = " package in {0} has a vulnerability that is not known (status: '{1}'). It is pending evaluation{2}".format(release, code, note_end) | ||
870 | 495 | |||
871 | 496 | return status | ||
872 | 497 | |||
873 | 498 | def __str__(self) -> str: | ||
874 | 499 | return f'{str(self.pkg)}:{self.status} {self.fixed_version}' | ||
875 | 500 | |||
876 | 501 | class CVE: | ||
877 | 502 | def __init__(self, number, info, pkgs=[]) -> None: | ||
878 | 503 | self.number = number | ||
879 | 504 | self.description = info['Description'] | ||
880 | 505 | self.priority = info['Priority'][0] | ||
881 | 506 | self.public_date = info['PublicDate'] | ||
882 | 507 | self.cvss = info['CVSS'] | ||
883 | 508 | self.usns = [] | ||
884 | 509 | for url in info['References'].split('\n'): | ||
885 | 510 | if 'https://ubuntu.com/security/notices/USN-' in url: | ||
886 | 511 | self.usns.append(url[40:]) | ||
887 | 512 | self.pkg_rel_entries = {} | ||
888 | 513 | self.pkgs = pkgs | ||
889 | 514 | |||
890 | 515 | def add_pkg(self, pkg_object, cve_pkg_entry): | ||
891 | 516 | if cve_pkg_entry.status in ['not-vulnerable', 'not-applicable']: | ||
892 | 517 | return | ||
893 | 518 | |||
894 | 519 | self.pkg_rel_entries[pkg_object.name] = cve_pkg_entry | ||
895 | 520 | self.pkgs.append(pkg_object) | ||
896 | 521 | pkg_object.cves.append(self) | ||
897 | 522 | |||
898 | 523 | def __str__(self) -> str: | ||
899 | 524 | return self.number | ||
900 | 525 | |||
901 | 526 | def __repr__(self): | ||
902 | 527 | return self.__str__() | ||
903 | 528 | |||
904 | 529 | class Package: | ||
905 | 530 | def __init__(self, pkgname, rel, binaries, version): | ||
906 | 531 | self.name = pkgname | ||
907 | 532 | self.rel = rel | ||
908 | 533 | self.description = cve_lib.lookup_package_override_description(pkgname) | ||
909 | 534 | |||
910 | 535 | if not self.description: | ||
911 | 536 | if 'description' in sources[rel][pkgname]: | ||
912 | 537 | self.description = sources[rel][pkgname]['description'] | ||
913 | 538 | elif pkgname in source_map_binaries[rel] and \ | ||
914 | 539 | 'description' in source_map_binaries[rel][pkgname]: | ||
915 | 540 | self.description = source_map_binaries[rel][pkgname]['description'] | ||
916 | 541 | else: | ||
917 | 542 | # Get first description found | ||
918 | 543 | if 'binaries' in sources[self.rel][self.name]: | ||
919 | 544 | for binary in sources[self.rel][self.name]['binaries']: | ||
920 | 545 | if binary in source_map_binaries[self.rel] and 'description' in source_map_binaries[self.rel][binary]: | ||
921 | 546 | self.description = source_map_binaries[self.rel][binary]["description"] | ||
922 | 547 | break | ||
923 | 548 | |||
924 | 549 | self.section = sources[rel][pkgname]['section'] | ||
925 | 550 | self.version = version | ||
926 | 551 | self.binaries = binaries if binaries else [] | ||
927 | 552 | self.cves = [] | ||
928 | 553 | |||
929 | 554 | def add_cve(self, cve) -> None: | ||
930 | 555 | self.cves.append(cve) | ||
931 | 556 | |||
932 | 557 | def __str__(self) -> str: | ||
933 | 558 | return f"{self.name}/{self.rel}" | ||
934 | 559 | |||
935 | 560 | def __repr__(self): | ||
936 | 561 | return self.__str__() | ||
937 | 562 | |||
938 | 563 | class OvalGeneratorPkg(OvalGenerator): | ||
939 | 564 | def __init__(self, release, release_name, cve_paths, packages, progress, pkg_cache, fixed_only=True, cve_cache=None, cve_prefix_dir=None, warn_method=False, outdir='./', prefix='', oval_format='dpkg') -> None: | ||
940 | 565 | super().__init__(release, release_name, warn_method, outdir, prefix, oval_format) | ||
941 | 566 | self.progress = progress | ||
942 | 567 | self.cve_cache = cve_cache | ||
943 | 568 | self.pkg_cache = pkg_cache | ||
944 | 569 | self.cve_paths = cve_paths | ||
945 | 570 | self.fixed_only = fixed_only | ||
946 | 571 | self.packages = self._load_pkgs(cve_prefix_dir, packages) | ||
947 | 572 | |||
948 | 573 | def _reset(self): | ||
949 | 574 | ### | ||
950 | 575 | # ID schema: 2204|00001|0001 | ||
951 | 576 | # * The first four digits are the ubuntu release number | ||
952 | 577 | # * The next 5 digits is # just a package counter, we increase it for each definition | ||
953 | 578 | # * The last 4 digits is a counter for the criterion | ||
954 | 579 | ### | ||
955 | 580 | release_code = int(self.release_name.split(' ')[1].replace('.', '')) if self.release not in cve_lib.external_releases else 1111 | ||
956 | 581 | self.definition_id = release_code * 10 ** 10 | ||
957 | 582 | self.definition_step = 1 * 10 ** 5 | ||
958 | 583 | self.criterion_step = 10 | ||
959 | 584 | self.output_filepath = \ | ||
960 | 585 | '{0}com.ubuntu.{1}.pkg.oval.xml'.format('oci.' if self.oval_format == 'oci' else '', self.release.replace('/', '_')) | ||
965 | 586 | 730 | ||
971 | 587 | def _generate_advisory(self, package: Package) -> etree.Element: | 731 | if self.progress: |
972 | 588 | advisory = etree.Element("advisory") | 732 | print(f'[{i:5}/{len(cve_paths)}] Processing {cve_number:18}', end='\r') |
968 | 589 | rights = etree.SubElement(advisory, "rights") | ||
969 | 590 | component = etree.SubElement(advisory, "component") | ||
970 | 591 | version = etree.SubElement(advisory, "current_version") | ||
973 | 592 | 733 | ||
979 | 593 | for cve in package.cves: | 734 | if not cve_number in self.cve_cache: |
980 | 594 | if self.fixed_only and cve.pkg_rel_entries[package.name].status != 'fixed': | 735 | self.cve_cache[cve_number] = cve_lib.load_cve(cve_path) |
976 | 595 | continue | ||
977 | 596 | cve_obj = self._generate_cve_tag(cve) | ||
978 | 597 | advisory.append(cve_obj) | ||
981 | 598 | 736 | ||
985 | 599 | rights.text = f"Copyright (C) {datetime.now().year} Canonical Ltd." | 737 | info = self.cve_cache[cve_number] |
986 | 600 | component.text = package.section | 738 | cve_obj = CVE(cve_number, info) |
987 | 601 | version.text = package.version | 739 | for pkg in info['pkgs']: |
988 | 740 | if packages_filter and pkg not in packages_filter: | ||
989 | 741 | continue | ||
990 | 602 | 742 | ||
992 | 603 | return advisory | 743 | for release in final_releases: |
993 | 744 | if pkg in sources[release] and release in info['pkgs'][pkg] and \ | ||
994 | 745 | info['pkgs'][pkg][release][0] != 'DNE': | ||
995 | 746 | self._add_new_package(pkg, cve_obj, release, info, packages[release]) | ||
996 | 747 | if cve_number not in cves[release]: | ||
997 | 748 | cves[release][cve_number] = cve_obj | ||
998 | 604 | 749 | ||
1009 | 605 | def _generate_metadata(self, package: Package) -> etree.Element: | 750 | for release in final_releases: |
1010 | 606 | metadata = etree.Element("metadata") | 751 | packages[release] = dict(sorted(packages[release].items())) |
1011 | 607 | title = etree.SubElement(metadata, "title") | 752 | cves[release] = dict(sorted(cves[release].items())) |
1002 | 608 | reference = self._generate_reference(package) | ||
1003 | 609 | advisory = self._generate_advisory(package) | ||
1004 | 610 | metadata.append(reference) | ||
1005 | 611 | description = etree.SubElement(metadata, "description") | ||
1006 | 612 | affected = etree.SubElement(metadata, "affected", attrib = {"family": "unix"}) | ||
1007 | 613 | platform = etree.SubElement(affected, "platform") | ||
1008 | 614 | metadata.append(advisory) | ||
1012 | 615 | 753 | ||
1016 | 616 | platform.text = self.release_name | 754 | if self.progress: |
1017 | 617 | title.text = package.name | 755 | print(' ' * 40, end='\r') |
1018 | 618 | description.text = package.description | 756 | return packages, cves |
1019 | 619 | 757 | ||
1021 | 620 | return metadata | 758 | def _write_oval_xml(self, xml_tree: etree.ElementTree, root_element: etree.ElementTree) -> None: |
1022 | 759 | if sys.version_info[0] >= 3 and sys.version_info[1] >= 9: | ||
1023 | 760 | etree.indent(xml_tree, level=0) # indent only available from Python 3.9 | ||
1024 | 761 | xml_tree.write(os.path.join(self.output_dir, self.output_filepath)) | ||
1025 | 762 | else: | ||
1026 | 763 | xmlstr = minidom.parseString(etree.tostring(root_element)).toprettyxml(indent=" ") | ||
1027 | 764 | with open(os.path.join(self.output_dir, self.output_filepath), 'w') as file: | ||
1028 | 765 | file.write(xmlstr) | ||
1029 | 621 | 766 | ||
1030 | 767 | # Object generators | ||
1031 | 622 | def _generate_criteria(self) -> etree.Element: | 768 | def _generate_criteria(self) -> etree.Element: |
1032 | 623 | criteria = etree.Element("criteria") | 769 | criteria = etree.Element("criteria") |
1033 | 624 | if self.oval_format == 'dpkg': | 770 | if self.oval_format == 'dpkg': |
1034 | @@ -629,38 +775,15 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1035 | 629 | extend_definition.set("applicability_check", "true") | 775 | extend_definition.set("applicability_check", "true") |
1036 | 630 | 776 | ||
1037 | 631 | return criteria | 777 | return criteria |
1063 | 632 | 778 | ||
1064 | 633 | def _generate_subcriteria(self, operator) -> etree.Element: | 779 | def _generate_definition_object(self, object) -> etree.Element: |
1040 | 634 | return etree.Element("criteria", attrib={ | ||
1041 | 635 | "operator": operator | ||
1042 | 636 | }) | ||
1043 | 637 | |||
1044 | 638 | def _generate_criterion_element(self, comment, id) -> etree.Element: | ||
1045 | 639 | criterion = etree.Element("criterion", attrib={ | ||
1046 | 640 | "test_ref": f"{self.ns}:tst:{id}", | ||
1047 | 641 | "comment": comment | ||
1048 | 642 | }) | ||
1049 | 643 | |||
1050 | 644 | return criterion | ||
1051 | 645 | |||
1052 | 646 | # Element generators | ||
1053 | 647 | def _generate_reference(self, package) -> etree.Element: | ||
1054 | 648 | reference = etree.Element("reference", attrib={ | ||
1055 | 649 | "source": "Package", | ||
1056 | 650 | "ref_id": package.name, | ||
1057 | 651 | "ref_url": f'https://launchpad.net/ubuntu/+source/{package.name}' | ||
1058 | 652 | }) | ||
1059 | 653 | |||
1060 | 654 | return reference | ||
1061 | 655 | |||
1062 | 656 | def _generate_definition_element(self, package) -> None: | ||
1065 | 657 | id = f"{self.ns}:def:{self.definition_id}" | 780 | id = f"{self.ns}:def:{self.definition_id}" |
1066 | 658 | definition = etree.Element("definition") | 781 | definition = etree.Element("definition") |
1067 | 659 | definition.set("class", "vulnerability") | 782 | definition.set("class", "vulnerability") |
1068 | 660 | definition.set("id", id) | 783 | definition.set("id", id) |
1069 | 661 | definition.set("version", "1") | 784 | definition.set("version", "1") |
1070 | 662 | 785 | ||
1072 | 663 | metadata = self._generate_metadata(package) | 786 | metadata = self._generate_metadata(object) |
1073 | 664 | criteria = self._generate_criteria() | 787 | criteria = self._generate_criteria() |
1074 | 665 | definition.append(metadata) | 788 | definition.append(metadata) |
1075 | 666 | definition.append(criteria) | 789 | definition.append(criteria) |
1076 | @@ -692,7 +815,7 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1077 | 692 | cve_tag.set('usns', ','.join(cve.usns)) | 815 | cve_tag.set('usns', ','.join(cve.usns)) |
1078 | 693 | 816 | ||
1079 | 694 | return cve_tag | 817 | return cve_tag |
1081 | 695 | 818 | ||
1082 | 696 | def _generate_var_element(self, comment, id, binaries) -> etree.Element: | 819 | def _generate_var_element(self, comment, id, binaries) -> etree.Element: |
1083 | 697 | var = etree.Element("constant_variable", | 820 | var = etree.Element("constant_variable", |
1084 | 698 | attrib={ | 821 | attrib={ |
1085 | @@ -813,6 +936,72 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1086 | 813 | 936 | ||
1087 | 814 | return object | 937 | return object |
1088 | 815 | 938 | ||
1089 | 939 | def _generate_criterion_element(self, comment, id) -> etree.Element: | ||
1090 | 940 | criterion = etree.Element("criterion", attrib={ | ||
1091 | 941 | "test_ref": f"{self.ns}:tst:{id}", | ||
1092 | 942 | "comment": comment | ||
1093 | 943 | }) | ||
1094 | 944 | |||
1095 | 945 | return criterion | ||
1096 | 946 | |||
1097 | 947 | def _generate_vulnerable_elements(self, package, binaries, obj_id=None): | ||
1098 | 948 | binary_keyword = 'binaries' if len(binaries) > 1 else 'binary' | ||
1099 | 949 | test_note = f"Does the '{package.name}' package exist?" | ||
1100 | 950 | object_note = f"The '{package.name}' package {binary_keyword}" | ||
1101 | 951 | |||
1102 | 952 | test = self._generate_test_element(test_note, self.definition_id, False, 'pkg', obj_id=obj_id) | ||
1103 | 953 | |||
1104 | 954 | if not obj_id: | ||
1105 | 955 | object = self._generate_object_element(object_note, self.definition_id, self.definition_id) | ||
1106 | 956 | |||
1107 | 957 | if package.is_kernel_pkg: | ||
1108 | 958 | regex = process_kernel_binaries(binaries, 'oci') | ||
1109 | 959 | binaries = [f'{regex}'] | ||
1110 | 960 | |||
1111 | 961 | final_binaries = [] | ||
1112 | 962 | if self.oval_format == 'oci': | ||
1113 | 963 | variable_values = '(?::\w+|)\s+(.*)$' | ||
1114 | 964 | for binary in binaries: | ||
1115 | 965 | final_binaries.append(f'^{binary}{variable_values}') | ||
1116 | 966 | else: | ||
1117 | 967 | final_binaries = binaries | ||
1118 | 968 | |||
1119 | 969 | var = self._generate_var_element(object_note, self.definition_id, final_binaries) | ||
1120 | 970 | else: | ||
1121 | 971 | object = None | ||
1122 | 972 | var = None | ||
1123 | 973 | return test, object, var | ||
1124 | 974 | |||
1125 | 975 | def _generate_fixed_elements(self, package, binaries, version, obj_id=None): | ||
1126 | 976 | binary_keyword = 'binaries' if len(binaries) > 1 else 'binary' | ||
1127 | 977 | test_note = f"Does the '{package.name}' package exist and is the version less than '{version}'?" | ||
1128 | 978 | object_note = f"The '{package.name}' package {binary_keyword}" | ||
1129 | 979 | state_note = f"The package version is less than '{version}'" | ||
1130 | 980 | |||
1131 | 981 | test = self._generate_test_element(test_note, self.definition_id, True, 'pkg', obj_id=obj_id) | ||
1132 | 982 | if not obj_id: | ||
1133 | 983 | object = self._generate_object_element(object_note, self.definition_id, self.definition_id) | ||
1134 | 984 | |||
1135 | 985 | final_binaries = binaries | ||
1136 | 986 | if self.oval_format == 'oci': | ||
1137 | 987 | if package.is_kernel_pkg: | ||
1138 | 988 | regex = process_kernel_binaries(binaries, 'oci') | ||
1139 | 989 | final_binaries = [f'^{regex}(?::\w+|)\s+(.*)$'] | ||
1140 | 990 | else: | ||
1141 | 991 | variable_values = '(?::\w+|)\s+(.*)$' | ||
1142 | 992 | |||
1143 | 993 | final_binaries = [] | ||
1144 | 994 | for binary in binaries: | ||
1145 | 995 | final_binaries.append(f'^{binary}{variable_values}') | ||
1146 | 996 | |||
1147 | 997 | var = self._generate_var_element(object_note, self.definition_id, final_binaries) | ||
1148 | 998 | else: | ||
1149 | 999 | object = None | ||
1150 | 1000 | var = None | ||
1151 | 1001 | state = self._generate_state_element(state_note, self.definition_id, version) | ||
1152 | 1002 | |||
1153 | 1003 | return test, object, var, state | ||
1154 | 1004 | |||
1155 | 816 | # Running kernel element generators | 1005 | # Running kernel element generators |
1156 | 817 | def _add_running_kernel_checks(self, root_element): | 1006 | def _add_running_kernel_checks(self, root_element): |
1157 | 818 | objects = root_element.find("objects") | 1007 | objects = root_element.find("objects") |
1158 | @@ -890,6 +1079,11 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1159 | 890 | return test | 1079 | return test |
1160 | 891 | 1080 | ||
1161 | 892 | # Kernel elements generators | 1081 | # Kernel elements generators |
1162 | 1082 | def _generate_criteria_kernel(self, operator) -> etree.Element: | ||
1163 | 1083 | return etree.Element("criteria", attrib={ | ||
1164 | 1084 | "operator": operator | ||
1165 | 1085 | }) | ||
1166 | 1086 | |||
1167 | 893 | def _generate_kernel_version_object_element(self, id, var_id) -> etree.Element: | 1087 | def _generate_kernel_version_object_element(self, id, var_id) -> etree.Element: |
1168 | 894 | object = etree.Element("ind-def:variable_object", | 1088 | object = etree.Element("ind-def:variable_object", |
1169 | 895 | attrib={ | 1089 | attrib={ |
1170 | @@ -924,12 +1118,12 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1171 | 924 | value.text = f"0:{patched}" | 1118 | value.text = f"0:{patched}" |
1172 | 925 | return state | 1119 | return state |
1173 | 926 | 1120 | ||
1175 | 927 | def _generate_kernel_package_elements(self, package: Package, root_element, running_kernel_check_id) -> etree.Element: | 1121 | def _generate_kernel_package_elements(self, package: Package, binaries, root_element, running_kernel_check_id) -> etree.Element: |
1176 | 928 | tests = root_element.find("tests") | 1122 | tests = root_element.find("tests") |
1177 | 929 | states = root_element.find("states") | 1123 | states = root_element.find("states") |
1178 | 930 | 1124 | ||
1179 | 931 | comment_running_kernel = f'Is kernel {package.name} running?' | 1125 | comment_running_kernel = f'Is kernel {package.name} running?' |
1181 | 932 | regex = process_kernel_binaries(package.binaries, self.oval_format) | 1126 | regex = process_kernel_binaries(binaries, self.oval_format) |
1182 | 933 | 1127 | ||
1183 | 934 | criterion_running_kernel = self._generate_criterion_element(comment_running_kernel, self.definition_id) | 1128 | criterion_running_kernel = self._generate_criterion_element(comment_running_kernel, self.definition_id) |
1184 | 935 | test_running_kernel = self._generate_test_element_running_kernel(self.definition_id, comment_running_kernel, running_kernel_check_id) | 1129 | test_running_kernel = self._generate_test_element_running_kernel(self.definition_id, comment_running_kernel, running_kernel_check_id) |
1185 | @@ -942,22 +1136,25 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1186 | 942 | 1136 | ||
1187 | 943 | return criterion_running_kernel | 1137 | return criterion_running_kernel |
1188 | 944 | 1138 | ||
1190 | 945 | def _add_fixed_kernel_elements(self, cve: CVE, package: Package, package_rel_entry:CVEPkgRelEntry, root_element, running_kernel_id, fixed_versions) -> etree.Element: | 1139 | def _add_kernel_elements(self, cve: CVE, package: Package, version, package_rel_entry:CVEPkgRelEntry, root_element, running_kernel_id, fixed_versions) -> etree.Element: |
1191 | 946 | tests = root_element.find("tests") | 1140 | tests = root_element.find("tests") |
1192 | 947 | objects = root_element.find("objects") | 1141 | objects = root_element.find("objects") |
1193 | 948 | states = root_element.find("states") | 1142 | states = root_element.find("states") |
1194 | 949 | 1143 | ||
1195 | 950 | comment_version = f'Kernel {package.name} version comparison' | 1144 | comment_version = f'Kernel {package.name} version comparison' |
1197 | 951 | comment_criterion = f'({cve.number}) {package.name} {package_rel_entry.note}' | 1145 | comment_criterion = '' |
1198 | 1146 | if self.generator_type == 'pkg': | ||
1199 | 1147 | comment_criterion = f'({cve.number}) ' | ||
1200 | 1148 | comment_criterion = comment_criterion + f'{package.name}{package_rel_entry.note}' | ||
1201 | 952 | 1149 | ||
1204 | 953 | if package_rel_entry.fixed_version in fixed_versions: | 1150 | if version in fixed_versions: |
1205 | 954 | criterion_version = self._generate_criterion_element(comment_criterion, fixed_versions[package_rel_entry.fixed_version]) | 1151 | criterion_version = self._generate_criterion_element(comment_criterion, fixed_versions[version]) |
1206 | 955 | else: | 1152 | else: |
1207 | 956 | create_state = False | 1153 | create_state = False |
1208 | 957 | 1154 | ||
1210 | 958 | if package_rel_entry.fixed_version: | 1155 | if version: |
1211 | 959 | create_state = True | 1156 | create_state = True |
1213 | 960 | ste_kernel_version = self._generate_state_kernel_element("Kernel check", self.definition_id, package_rel_entry.fixed_version) | 1157 | ste_kernel_version = self._generate_state_kernel_element("Kernel check", self.definition_id, version) |
1214 | 961 | states.append(ste_kernel_version) | 1158 | states.append(ste_kernel_version) |
1215 | 962 | 1159 | ||
1216 | 963 | obj_kernel_version = self._generate_kernel_version_object_element(self.definition_id, running_kernel_id) | 1160 | obj_kernel_version = self._generate_kernel_version_object_element(self.definition_id, running_kernel_id) |
1217 | @@ -969,7 +1166,7 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1218 | 969 | tests.append(test_kernel_version) | 1166 | tests.append(test_kernel_version) |
1219 | 970 | objects.append(obj_kernel_version) | 1167 | objects.append(obj_kernel_version) |
1220 | 971 | 1168 | ||
1222 | 972 | fixed_versions[package_rel_entry.fixed_version] = self.definition_id | 1169 | fixed_versions[version] = self.definition_id |
1223 | 973 | 1170 | ||
1224 | 974 | return criterion_version | 1171 | return criterion_version |
1225 | 975 | 1172 | ||
1226 | @@ -977,8 +1174,10 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1227 | 977 | def _increase_id(self, is_definition): | 1174 | def _increase_id(self, is_definition): |
1228 | 978 | if is_definition: | 1175 | if is_definition: |
1229 | 979 | self.definition_id += self.definition_step | 1176 | self.definition_id += self.definition_step |
1232 | 980 | clean_value = self.definition_step / 10 | 1177 | # Ugly hack, Python doesn't like operating big numbers |
1233 | 981 | self.definition_id = int(int(self.definition_id / clean_value) * clean_value) | 1178 | criterion_appendix_length = len(str(self.definition_step)) - 1 |
1234 | 1179 | self.definition_id = int(str(self.definition_id)[: -1 * criterion_appendix_length]) | ||
1235 | 1180 | self.definition_id = int(self.definition_id * self.definition_step) | ||
1236 | 982 | else: | 1181 | else: |
1237 | 983 | self.definition_id += self.criterion_step | 1182 | self.definition_id += self.criterion_step |
1238 | 984 | 1183 | ||
1239 | @@ -994,99 +1193,177 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1240 | 994 | criteria.append(element) | 1193 | criteria.append(element) |
1241 | 995 | 1194 | ||
1242 | 996 | def _add_criterion(self, id, package_entry, cve, definition, depth=2) -> None: | 1195 | def _add_criterion(self, id, package_entry, cve, definition, depth=2) -> None: |
1244 | 997 | criterion_note = f'({cve.number}) {package_entry.pkg.name}{package_entry.note}' | 1196 | criterion_note = f'({cve.number}) ' if self.generator_type == 'pkg' else '' |
1245 | 1197 | criterion_note += f'{package_entry.pkg.name}{package_entry.note}' | ||
1246 | 998 | criterion = self._generate_criterion_element(criterion_note, id) | 1198 | criterion = self._generate_criterion_element(criterion_note, id) |
1247 | 999 | self._add_to_criteria(definition, criterion, depth) | 1199 | self._add_to_criteria(definition, criterion, depth) |
1248 | 1000 | 1200 | ||
1250 | 1001 | def _generate_elements(self, package, binaries, pkg_rel_entry, obj_id=None): | 1201 | def _generate_elements(self, package, binaries, version, pkg_rel_entry, obj_id=None): |
1251 | 1002 | create_state = False | 1202 | create_state = False |
1252 | 1003 | state = None | 1203 | state = None |
1253 | 1004 | var = None | 1204 | var = None |
1254 | 1005 | obj = None | 1205 | obj = None |
1256 | 1006 | binary_keyword = 'binaries' if len(package.binaries) > 1 else 'binary' | 1206 | binary_keyword = 'binaries' if len(binaries) > 1 else 'binary' |
1257 | 1007 | object_note = f"The '{package.name}' package {binary_keyword}" | 1207 | object_note = f"The '{package.name}' package {binary_keyword}" |
1258 | 1008 | test_note = "" | 1208 | test_note = "" |
1259 | 1009 | 1209 | ||
1260 | 1210 | final_binaries = binaries | ||
1261 | 1010 | if self.oval_format == 'oci': | 1211 | if self.oval_format == 'oci': |
1265 | 1011 | if is_kernel_binaries(package.binaries): | 1212 | if package.is_kernel_pkg: |
1266 | 1012 | regex = process_kernel_binaries(package.binaries, 'oci') | 1213 | regex = process_kernel_binaries(binaries, 'oci') |
1267 | 1013 | binaries = [f'^{regex}(?::\w+|)\s+(.*)$'] | 1214 | final_binaries = [f'^{regex}(?::\w+|)\s+(.*)$'] |
1268 | 1014 | else: | 1215 | else: |
1269 | 1015 | variable_values = '(?::\w+|)\s+(.*)$' | 1216 | variable_values = '(?::\w+|)\s+(.*)$' |
1270 | 1016 | 1217 | ||
1274 | 1017 | binaries = [] | 1218 | final_binaries = [] |
1275 | 1018 | for binary in package.binaries: | 1219 | for binary in binaries: |
1276 | 1019 | binaries.append(f'^{binary}{variable_values}') | 1220 | final_binaries.append(f'^{binary}{variable_values}') |
1277 | 1020 | 1221 | ||
1278 | 1021 | if pkg_rel_entry.status == 'vulnerable': | 1222 | if pkg_rel_entry.status == 'vulnerable': |
1279 | 1022 | test_note = f"Does the '{package.name}' package exist?" | 1223 | test_note = f"Does the '{package.name}' package exist?" |
1280 | 1023 | elif pkg_rel_entry.status == 'fixed': | 1224 | elif pkg_rel_entry.status == 'fixed': |
1283 | 1024 | test_note = f"Does the '{package.name}' package exist and is the version less than '{pkg_rel_entry.fixed_version}'?" | 1225 | test_note = f"Does the '{package.name}' package exist and is the version less than '{version}'?" |
1284 | 1025 | state_note = f"The package version is less than '{pkg_rel_entry.fixed_version}'" | 1226 | state_note = f"The package version is less than '{version}'" |
1285 | 1026 | 1227 | ||
1287 | 1027 | state = self._generate_state_element(state_note, self.definition_id, pkg_rel_entry.fixed_version) | 1228 | state = self._generate_state_element(state_note, self.definition_id, version) |
1288 | 1028 | create_state = True | 1229 | create_state = True |
1289 | 1029 | 1230 | ||
1295 | 1030 | if not obj_id or create_state: | 1231 | if not obj_id: |
1296 | 1031 | obj_id = None | 1232 | var = self._generate_var_element(object_note, self.definition_id, final_binaries) |
1292 | 1032 | |||
1293 | 1033 | var = self._generate_var_element(object_note, self.definition_id, binaries) | ||
1294 | 1034 | |||
1297 | 1035 | obj = self._generate_object_element(object_note, self.definition_id, self.definition_id) | 1233 | obj = self._generate_object_element(object_note, self.definition_id, self.definition_id) |
1298 | 1036 | 1234 | ||
1299 | 1037 | test = self._generate_test_element(test_note, self.definition_id, create_state, 'pkg', obj_id=obj_id) | 1235 | test = self._generate_test_element(test_note, self.definition_id, create_state, 'pkg', obj_id=obj_id) |
1300 | 1038 | 1236 | ||
1301 | 1039 | return test, obj, var, state | 1237 | return test, obj, var, state |
1302 | 1040 | 1238 | ||
1308 | 1041 | def _populate_pkg(self, package, root_element): | 1239 | # returns True if we should ignore this source package; primarily used |
1309 | 1042 | tests = root_element.find("tests") | 1240 | # for -edge kernels |
1310 | 1043 | objects = root_element.find("objects") | 1241 | def _ignore_source_package(self, source): |
1311 | 1044 | variables = root_element.find("variables") | 1242 | if re.match('linux-.*-edge$', source): |
1312 | 1045 | states = root_element.find("states") | 1243 | return True |
1313 | 1244 | if re.match('linux-riscv.*$', source): | ||
1314 | 1245 | # linux-riscv.* currently causes a lot of false positives, skip | ||
1315 | 1246 | # it altogether while we don't land a better fix | ||
1316 | 1247 | return True | ||
1317 | 1248 | return False | ||
1318 | 1046 | 1249 | ||
1319 | 1047 | # Add package definition | ||
1320 | 1048 | definitions = root_element.find("definitions") | ||
1321 | 1049 | definition_element = self._generate_definition_element(package) | ||
1322 | 1050 | 1250 | ||
1328 | 1051 | # Control/cache variables | 1251 | class OvalGeneratorPkg(OvalGenerator): |
1329 | 1052 | one_time_added_id = None | 1252 | def __init__(self, releases, cve_paths, packages, progress, pkg_cache, fixed_only=True, cve_cache=None, cve_prefix_dir=None, outdir='./', oval_format='dpkg') -> None: |
1330 | 1053 | fixed_versions = {} | 1253 | super().__init__('pkg', releases, cve_paths, packages, progress, pkg_cache, fixed_only, cve_cache, cve_prefix_dir, outdir, oval_format) |
1326 | 1054 | binaries_id = None | ||
1327 | 1055 | cve_added = False | ||
1331 | 1056 | 1254 | ||
1335 | 1057 | #criteria = None | 1255 | def _generate_advisory(self, package: Package) -> etree.Element: |
1336 | 1058 | #if len(package.binaries) > 1: | 1256 | advisory = etree.Element("advisory") |
1337 | 1059 | # criteria = self._generate_subcriteria('AND') | 1257 | rights = etree.SubElement(advisory, "rights") |
1338 | 1258 | component = etree.SubElement(advisory, "component") | ||
1339 | 1259 | version = etree.SubElement(advisory, "current_version") | ||
1340 | 1060 | 1260 | ||
1341 | 1061 | for cve in package.cves: | 1261 | for cve in package.cves: |
1362 | 1062 | pkg_rel_entry = cve.pkg_rel_entries[package.name] | 1262 | if self.fixed_only and cve.pkg_rel_entries[str(package)].status != 'fixed': |
1363 | 1063 | for key in sorted(list(package.binaries)): | 1263 | continue |
1364 | 1064 | binaries = package.binaries[key] | 1264 | elif cve.pkg_rel_entries[str(package)].is_not_applicable(): |
1365 | 1065 | if pkg_rel_entry.fixed_version: | 1265 | continue |
1366 | 1066 | if pkg_rel_entry.fixed_version in fixed_versions: | 1266 | cve_obj = self._generate_cve_tag(cve) |
1367 | 1067 | self._add_test_ref_to_cve_tag(fixed_versions[pkg_rel_entry.fixed_version], cve, definition_element) | 1267 | advisory.append(cve_obj) |
1348 | 1068 | self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, definition_element) | ||
1349 | 1069 | continue | ||
1350 | 1070 | else: | ||
1351 | 1071 | self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element) | ||
1352 | 1072 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element) | ||
1353 | 1073 | fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id | ||
1354 | 1074 | elif one_time_added_id: | ||
1355 | 1075 | self._add_test_ref_to_cve_tag(one_time_added_id, cve, definition_element) | ||
1356 | 1076 | self._add_criterion(one_time_added_id, pkg_rel_entry, cve, definition_element) | ||
1357 | 1077 | continue | ||
1358 | 1078 | else: | ||
1359 | 1079 | self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element) | ||
1360 | 1080 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element) | ||
1361 | 1081 | one_time_added_id = self.definition_id | ||
1368 | 1082 | 1268 | ||
1370 | 1083 | test, obj, var, state = self._generate_elements(package, binaries, pkg_rel_entry, binaries_id) | 1269 | rights.text = f"Copyright (C) {datetime.now().year} Canonical Ltd." |
1371 | 1270 | component.text = package.section | ||
1372 | 1271 | version.text = package.get_latest_version() | ||
1373 | 1272 | |||
1374 | 1273 | return advisory | ||
1375 | 1274 | |||
1376 | 1275 | def _generate_metadata(self, package: Package) -> etree.Element: | ||
1377 | 1276 | metadata = etree.Element("metadata") | ||
1378 | 1277 | title = etree.SubElement(metadata, "title") | ||
1379 | 1278 | reference = self._generate_reference(package) | ||
1380 | 1279 | advisory = self._generate_advisory(package) | ||
1381 | 1280 | metadata.append(reference) | ||
1382 | 1281 | description = etree.SubElement(metadata, "description") | ||
1383 | 1282 | affected = etree.SubElement(metadata, "affected", attrib = {"family": "unix"}) | ||
1384 | 1283 | platform = etree.SubElement(affected, "platform") | ||
1385 | 1284 | metadata.append(advisory) | ||
1386 | 1285 | |||
1387 | 1286 | platform.text = self.release_name | ||
1388 | 1287 | title.text = package.name | ||
1389 | 1288 | description.text = package.description | ||
1390 | 1289 | |||
1391 | 1290 | return metadata | ||
1392 | 1291 | |||
1393 | 1292 | # Element generators | ||
1394 | 1293 | def _generate_reference(self, package) -> etree.Element: | ||
1395 | 1294 | reference = etree.Element("reference", attrib={ | ||
1396 | 1295 | "source": "Package", | ||
1397 | 1296 | "ref_id": package.name, | ||
1398 | 1297 | "ref_url": f'https://launchpad.net/ubuntu/+source/{package.name}' | ||
1399 | 1298 | }) | ||
1400 | 1299 | |||
1401 | 1300 | return reference | ||
1402 | 1301 | |||
1403 | 1302 | def _populate_pkg(self, package, root_element): | ||
1404 | 1303 | tests = root_element.find("tests") | ||
1405 | 1304 | objects = root_element.find("objects") | ||
1406 | 1305 | variables = root_element.find("variables") | ||
1407 | 1306 | states = root_element.find("states") | ||
1408 | 1307 | |||
1409 | 1308 | # Add package definition | ||
1410 | 1309 | definitions = root_element.find("definitions") | ||
1411 | 1310 | definition_element = self._generate_definition_object(package) | ||
1412 | 1311 | |||
1413 | 1312 | # Control/cache variables | ||
1414 | 1313 | one_time_added_id = None | ||
1415 | 1314 | fixed_versions = {} | ||
1416 | 1315 | binaries_ids = {} | ||
1417 | 1316 | cve_added = False | ||
1418 | 1317 | |||
1419 | 1318 | #criteria = None | ||
1420 | 1319 | #if len(package.binaries) > 1: | ||
1421 | 1320 | # criteria = self._generate_subcriteria('AND') | ||
1422 | 1321 | |||
1423 | 1322 | for cve in package.cves: | ||
1424 | 1323 | if self.fixed_only and cve.pkg_rel_entries[str(package)].status != 'fixed': | ||
1425 | 1324 | continue | ||
1426 | 1325 | pkg_rel_entry = cve.pkg_rel_entries[str(package)] | ||
1427 | 1326 | if pkg_rel_entry.is_not_applicable(): continue | ||
1428 | 1327 | source_version = package.get_version_to_check(pkg_rel_entry.fixed_version) | ||
1429 | 1328 | for binary_version in package.get_binary_versions(source_version): | ||
1430 | 1329 | binaries = package.get_binaries(source_version, binary_version) | ||
1431 | 1330 | |||
1432 | 1331 | # For released / not affected (version) CVEs | ||
1433 | 1332 | if pkg_rel_entry.fixed_version: | ||
1434 | 1333 | if binary_version in fixed_versions: | ||
1435 | 1334 | self._add_test_ref_to_cve_tag(fixed_versions[binary_version], cve, definition_element) | ||
1436 | 1335 | self._add_criterion(fixed_versions[binary_version], pkg_rel_entry, cve, definition_element) | ||
1437 | 1336 | continue | ||
1438 | 1337 | else: | ||
1439 | 1338 | self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element) | ||
1440 | 1339 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element) | ||
1441 | 1340 | fixed_versions[binary_version] = self.definition_id | ||
1442 | 1341 | # For not fixed CVEs, we already added one for this package | ||
1443 | 1342 | elif one_time_added_id: | ||
1444 | 1343 | self._add_test_ref_to_cve_tag(one_time_added_id, cve, definition_element) | ||
1445 | 1344 | self._add_criterion(one_time_added_id, pkg_rel_entry, cve, definition_element) | ||
1446 | 1345 | continue | ||
1447 | 1346 | # For not fixed CVEs, only need to add it once per package | ||
1448 | 1347 | else: | ||
1449 | 1348 | self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element) | ||
1450 | 1349 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, definition_element) | ||
1451 | 1350 | one_time_added_id = self.definition_id | ||
1452 | 1351 | |||
1453 | 1352 | # If version doesn't exist and only one binary_version, they all have the same binaries | ||
1454 | 1353 | if not package.version_exists(source_version) and package.all_binaries_same_version: | ||
1455 | 1354 | binary_id_version = GENERIC_VERSION | ||
1456 | 1355 | else: | ||
1457 | 1356 | binary_id_version = binary_version | ||
1458 | 1357 | |||
1459 | 1358 | binaries_ids.setdefault(binary_id_version, None) | ||
1460 | 1359 | binaries_id = binaries_ids[binary_id_version] | ||
1461 | 1360 | test, obj, var, state = self._generate_elements(package, binaries, binary_version, pkg_rel_entry, binaries_id) | ||
1462 | 1084 | 1361 | ||
1463 | 1085 | if state: | 1362 | if state: |
1464 | 1086 | states.append(state) | 1363 | states.append(state) |
1465 | 1087 | 1364 | ||
1466 | 1088 | if obj and var: | 1365 | if obj and var: |
1468 | 1089 | binaries_id = self.definition_id | 1366 | binaries_ids[binary_id_version] = self.definition_id |
1469 | 1090 | variables.append(var) | 1367 | variables.append(var) |
1470 | 1091 | objects.append(obj) | 1368 | objects.append(obj) |
1471 | 1092 | 1369 | ||
1472 | @@ -1099,29 +1376,39 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1473 | 1099 | 1376 | ||
1474 | 1100 | self._increase_id(is_definition=True) | 1377 | self._increase_id(is_definition=True) |
1475 | 1101 | 1378 | ||
1477 | 1102 | def _populate_kernel_pkg(self, package, root_element, running_kernel_id): | 1379 | def _populate_kernel_pkg(self, package, root_element, running_kernel_id): |
1478 | 1380 | for cve in package.cves: | ||
1479 | 1381 | pkg_rel_entry = cve.pkg_rel_entries[str(package)] | ||
1480 | 1382 | version_to_check = package.get_version_to_check(pkg_rel_entry.fixed_version) | ||
1481 | 1383 | for binary_version in package.get_binary_versions(version_to_check): | ||
1482 | 1384 | binaries = package.get_binaries(version_to_check, binary_version) | ||
1483 | 1103 | # Add package definition | 1385 | # Add package definition |
1484 | 1104 | definitions = root_element.find("definitions") | 1386 | definitions = root_element.find("definitions") |
1486 | 1105 | definition_element = self._generate_definition_element(package) | 1387 | definition_element = self._generate_definition_object(package) |
1487 | 1106 | 1388 | ||
1488 | 1107 | # Control/cache variables | 1389 | # Control/cache variables |
1489 | 1108 | fixed_versions = {} | 1390 | fixed_versions = {} |
1490 | 1109 | cve_added = False | 1391 | cve_added = False |
1491 | 1110 | 1392 | ||
1492 | 1393 | # Kernel binaries have all same version | ||
1493 | 1394 | version = package.get_latest_version() | ||
1494 | 1395 | binaries = package.get_binaries(version, version) | ||
1495 | 1396 | |||
1496 | 1111 | # Generate one-time elements | 1397 | # Generate one-time elements |
1499 | 1112 | kernel_criterion = self._generate_kernel_package_elements(package, root_element, running_kernel_id) | 1398 | kernel_criterion = self._generate_kernel_package_elements(package, binaries, root_element, running_kernel_id) |
1500 | 1113 | criteria = self._generate_subcriteria('OR') | 1399 | criteria = self._generate_criteria_kernel('OR') |
1501 | 1114 | 1400 | ||
1502 | 1115 | self._add_to_criteria(definition_element, kernel_criterion, operator='AND') | 1401 | self._add_to_criteria(definition_element, kernel_criterion, operator='AND') |
1503 | 1116 | self._add_to_criteria(definition_element, criteria, operator='AND') | 1402 | self._add_to_criteria(definition_element, criteria, operator='AND') |
1504 | 1117 | 1403 | ||
1505 | 1118 | for cve in package.cves: | 1404 | for cve in package.cves: |
1507 | 1119 | pkg_rel_entry = cve.pkg_rel_entries[package.name] | 1405 | pkg_rel_entry = cve.pkg_rel_entries[str(package)] |
1508 | 1406 | if pkg_rel_entry.is_not_applicable(): continue | ||
1509 | 1120 | cve_added = True | 1407 | cve_added = True |
1510 | 1121 | 1408 | ||
1511 | 1122 | self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element) | 1409 | self._add_test_ref_to_cve_tag(self.definition_id, cve, definition_element) |
1512 | 1123 | 1410 | ||
1514 | 1124 | kernel_version_criterion = self._add_fixed_kernel_elements(cve, package, pkg_rel_entry, root_element, running_kernel_id, fixed_versions) | 1411 | kernel_version_criterion = self._add_kernel_elements(cve, package, pkg_rel_entry.fixed_version, pkg_rel_entry, root_element, running_kernel_id, fixed_versions) |
1515 | 1125 | self._add_to_criteria(definition_element, kernel_version_criterion, depth=3) | 1412 | self._add_to_criteria(definition_element, kernel_version_criterion, depth=3) |
1516 | 1126 | self._increase_id(is_definition=False) | 1413 | self._increase_id(is_definition=False) |
1517 | 1127 | 1414 | ||
1518 | @@ -1129,881 +1416,447 @@ class OvalGeneratorPkg(OvalGenerator): | |||
1519 | 1129 | definitions.append(definition_element) | 1416 | definitions.append(definition_element) |
1520 | 1130 | self._increase_id(is_definition=True) | 1417 | self._increase_id(is_definition=True) |
1521 | 1131 | 1418 | ||
1522 | 1132 | def _add_new_package(self, package_name, cve, release, cve_data, packages) -> None: | ||
1523 | 1133 | if package_name not in packages: | ||
1524 | 1134 | _, version, binaries = get_binarypkgs(self.pkg_cache, package_name, release) | ||
1525 | 1135 | |||
1526 | 1136 | pkg_obj = Package(package_name, release, binaries, version) | ||
1527 | 1137 | packages[package_name] = pkg_obj | ||
1528 | 1138 | |||
1529 | 1139 | pkg_obj = packages[package_name] | ||
1530 | 1140 | cve_pkg_entry = CVEPkgRelEntry(pkg_obj, release, cve, cve_data['pkgs'][package_name][release][0], cve_data['pkgs'][package_name][release][1], self.pkg_cache) | ||
1531 | 1141 | |||
1532 | 1142 | if cve_pkg_entry.status != 'fixed' and self.fixed_only: | ||
1533 | 1143 | return | ||
1534 | 1144 | |||
1535 | 1145 | cve.add_pkg(pkg_obj, cve_pkg_entry) | ||
1536 | 1146 | |||
1537 | 1147 | def _load_pkgs(self, cve_prefix_dir, packages_filter=None) -> None: | ||
1538 | 1148 | cve_lib.load_external_subprojects() | ||
1539 | 1149 | |||
1540 | 1150 | cves = [] | ||
1541 | 1151 | for pathname in self.cve_paths: | ||
1542 | 1152 | cves = cves + glob.glob(os.path.join(cve_prefix_dir, pathname)) | ||
1543 | 1153 | |||
1544 | 1154 | cves.sort(key=lambda cve: | ||
1545 | 1155 | (int(cve.split('/')[-1].split('-')[1]), int(cve.split('/')[-1].split('-')[2])) \ | ||
1546 | 1156 | if cve.split('/')[-1].split('-')[2].isnumeric() \ | ||
1547 | 1157 | else (int(cve.split('/')[-1].split('-')[1]), 0) | ||
1548 | 1158 | ) | ||
1549 | 1159 | |||
1550 | 1160 | packages = {} | ||
1551 | 1161 | releases = [self.release] | ||
1552 | 1162 | current_release = self.release | ||
1553 | 1163 | while(cve_lib.release_parent(current_release)): | ||
1554 | 1164 | current_release = cve_lib.release_parent(current_release) | ||
1555 | 1165 | releases.append(current_release) | ||
1556 | 1166 | |||
1557 | 1167 | for release in releases: | ||
1558 | 1168 | sources[release] = load(releases=[release], skip_eol_releases=False)[release] | ||
1559 | 1169 | |||
1560 | 1170 | orig_name = cve_lib.get_orig_rel_name(release) | ||
1561 | 1171 | if '/' in orig_name: | ||
1562 | 1172 | orig_name = orig_name.split('/', maxsplit=1)[1] | ||
1563 | 1173 | source_map_binaries[release] = load(data_type='packages',releases=[orig_name], skip_eol_releases=False)[orig_name] \ | ||
1564 | 1174 | if release not in cve_lib.external_releases else {} | ||
1565 | 1175 | |||
1566 | 1176 | i = 0 | ||
1567 | 1177 | for cve_path in cves: | ||
1568 | 1178 | cve_number = cve_path.rsplit('/', 1)[1] | ||
1569 | 1179 | i += 1 | ||
1570 | 1180 | |||
1571 | 1181 | if self.progress: | ||
1572 | 1182 | print(f'[{i:5}/{len(cves)}] Processing {cve_number:18}', end='\r') | ||
1573 | 1183 | |||
1574 | 1184 | if not cve_number in self.cve_cache: | ||
1575 | 1185 | self.cve_cache[cve_number] = cve_lib.load_cve(cve_path) | ||
1576 | 1186 | |||
1577 | 1187 | info = self.cve_cache[cve_number] | ||
1578 | 1188 | cve_obj = CVE(cve_number, info) | ||
1579 | 1189 | |||
1580 | 1190 | for pkg in info['pkgs']: | ||
1581 | 1191 | if packages_filter and pkg not in packages_filter: | ||
1582 | 1192 | continue | ||
1583 | 1193 | |||
1584 | 1194 | for release in releases: | ||
1585 | 1195 | if pkg in sources[release] and release in info['pkgs'][pkg] and \ | ||
1586 | 1196 | info['pkgs'][pkg][release][0] != 'DNE': | ||
1587 | 1197 | self._add_new_package(pkg, cve_obj, release, info, packages) | ||
1588 | 1198 | break | ||
1589 | 1199 | |||
1590 | 1200 | packages = dict(sorted(packages.items())) | ||
1591 | 1201 | if self.progress: | ||
1592 | 1202 | print(' ' * 40, end='\r') | ||
1593 | 1203 | return packages | ||
1594 | 1204 | |||
1595 | 1205 | def generate_oval(self) -> None: | 1419 | def generate_oval(self) -> None: |
1599 | 1206 | self._reset() | 1420 | for release in self.releases: |
1600 | 1207 | xml_tree, root_element = self._get_root_element("Package") | 1421 | self._init_ids(release) |
1601 | 1208 | self._add_structure(root_element) | 1422 | xml_tree, root_element = self._get_root_element() |
1602 | 1423 | generator = self._get_generator("Package") | ||
1603 | 1424 | root_element.append(generator) | ||
1604 | 1425 | self._add_structure(root_element) | ||
1605 | 1209 | 1426 | ||
1612 | 1210 | if self.oval_format == 'dpkg': | 1427 | if self.oval_format == 'dpkg': |
1613 | 1211 | # One time kernel check | 1428 | # One time kernel check |
1614 | 1212 | self._add_release_checks(root_element) | 1429 | self._add_release_checks(root_element) |
1615 | 1213 | self._add_running_kernel_checks(root_element) | 1430 | self._add_running_kernel_checks(root_element) |
1616 | 1214 | running_kernel_id = self.definition_id | 1431 | running_kernel_id = self.definition_id |
1617 | 1215 | self._increase_id(is_definition=True) | 1432 | self._increase_id(is_definition=True) |
1618 | 1433 | |||
1619 | 1434 | all_pkgs = dict() | ||
1620 | 1435 | for parent_release in list(self.parent_releases)[::-1]: | ||
1621 | 1436 | all_pkgs.update(self.packages[parent_release]) | ||
1622 | 1437 | |||
1623 | 1438 | all_pkgs.update(self.packages[self.release]) | ||
1624 | 1439 | |||
1625 | 1440 | for pkg in all_pkgs: | ||
1626 | 1441 | if self._ignore_source_package(pkg): continue | ||
1627 | 1442 | if not all_pkgs[pkg].versions_binaries: continue | ||
1628 | 1443 | if not all_pkgs[pkg].get_binary_versions(next(iter(all_pkgs[pkg].versions_binaries))): continue | ||
1629 | 1444 | if all_pkgs[pkg].is_kernel_pkg and self.oval_format != 'oci': | ||
1630 | 1445 | self._populate_kernel_pkg(all_pkgs[pkg], root_element, running_kernel_id) | ||
1631 | 1446 | else: | ||
1632 | 1447 | self._populate_pkg(all_pkgs[pkg], root_element) | ||
1633 | 1216 | 1448 | ||
1637 | 1217 | for pkg in self.packages: | 1449 | self._write_oval_xml(xml_tree, root_element) |
1635 | 1218 | if len(self.packages[pkg].binaries) == 0: | ||
1636 | 1219 | continue | ||
1638 | 1220 | 1450 | ||
1643 | 1221 | if is_kernel_binaries(self.packages[pkg].binaries) and self.oval_format != 'oci': | 1451 | class OvalGeneratorCVE(OvalGenerator): |
1644 | 1222 | self._populate_kernel_pkg(self.packages[pkg], root_element, running_kernel_id) | 1452 | def __init__(self, releases, cve_paths, packages, progress, pkg_cache, fixed_only=True, cve_cache=None, cve_prefix_dir=None, outdir='./', oval_format='dpkg') -> None: |
1645 | 1223 | else: | 1453 | super().__init__('cve', releases, cve_paths, packages, progress, pkg_cache, fixed_only, cve_cache, cve_prefix_dir, outdir, oval_format) |
1642 | 1224 | self._populate_pkg(self.packages[pkg], root_element) | ||
1646 | 1225 | 1454 | ||
1649 | 1226 | #etree.indent(xml_tree, level=0) -> only available from Python 3.9 | 1455 | # For CVE OVAL, the definition ID is generated |
1650 | 1227 | xmlstr = minidom.parseString(etree.tostring(root_element)).toprettyxml(indent=" ") | 1456 | # from the CVE ID |
1651 | 1457 | def _set_definition_id(self, cve_id): | ||
1652 | 1458 | self.definition_id = int(re.sub('[^0-9]', '', cve_id)) * self.definition_step | ||
1653 | 1228 | 1459 | ||
1658 | 1229 | with open(os.path.join(self.output_dir, self.output_filepath), 'w') as file: | 1460 | def _generate_advisory(self, cve: CVE) -> etree.Element: |
1659 | 1230 | file.write(xmlstr) | 1461 | advisory = etree.Element("advisory") |
1660 | 1231 | #xml_tree.write(os.path.join(self.output_dir, self.output_filepath)) | 1462 | severity = etree.SubElement(advisory, "severity") |
1661 | 1232 | return | 1463 | rights = etree.SubElement(advisory, "rights") |
1662 | 1464 | public_date = etree.SubElement(advisory, "public_date") | ||
1663 | 1233 | 1465 | ||
1669 | 1234 | class OvalGeneratorCVE: | 1466 | if cve.public_date_at_usn: |
1670 | 1235 | supported_oval_elements = ('definition', 'test', 'object', 'state', | 1467 | public_date_at_usn = etree.SubElement(advisory, "public_date_at_usn") |
1671 | 1236 | 'variable') | 1468 | public_date_at_usn.text = cve.public_date_at_usn |
1667 | 1237 | generator_version = '1.1' | ||
1668 | 1238 | oval_schema_version = '5.11.1' | ||
1672 | 1239 | 1469 | ||
1675 | 1240 | def __init__(self, release, release_name, parent, warn_method=False, outdir='./', prefix='', oval_format='dpkg'): | 1470 | if cve.assigned_to: |
1676 | 1241 | """ constructor, set defaults for instances """ | 1471 | assigned_to = etree.SubElement(advisory, "assigned_to") |
1677 | 1472 | assigned_to.text = cve.assigned_to | ||
1678 | 1473 | |||
1679 | 1474 | if cve.discoverd_by: | ||
1680 | 1475 | discoverd_by = etree.SubElement(advisory, "discoverd_by") | ||
1681 | 1476 | discoverd_by.text = cve.discoverd_by | ||
1682 | 1242 | 1477 | ||
1696 | 1243 | self.release = release | 1478 | for bug in cve.bugs: |
1697 | 1244 | # e.g. codename for trusty/esm should be trusty | 1479 | element = etree.SubElement(advisory, 'bug') |
1698 | 1245 | self.release_codename = cve_lib.release_progenitor(release) if cve_lib.release_progenitor(release) else self.release.replace('/', '_') | 1480 | element.text = bug |
1686 | 1246 | self.release_name = release_name | ||
1687 | 1247 | self.warn = warn_method or self.warn | ||
1688 | 1248 | self.tmpdir = tempfile.mkdtemp(prefix='oval_lib-') | ||
1689 | 1249 | self.output_dir = outdir | ||
1690 | 1250 | self.oval_format = oval_format | ||
1691 | 1251 | self.output_filepath = \ | ||
1692 | 1252 | '{0}com.ubuntu.{1}.cve.oval.xml'.format(prefix, self.release.replace('/', '_')) | ||
1693 | 1253 | self.ns = 'oval:com.ubuntu.{0}'.format(self.release_codename) | ||
1694 | 1254 | self.id = 10 | ||
1695 | 1255 | self.release_applicability_definition_id = '{0}:def:{1}0'.format(self.ns, self.id) | ||
1699 | 1256 | 1481 | ||
1774 | 1257 | def __del__(self): | 1482 | for usn in cve.usns: |
1775 | 1258 | """ deconstructor, clean up """ | 1483 | element = etree.SubElement(advisory, 'ref') |
1776 | 1259 | if os.path.exists(self.tmpdir): | 1484 | element.text = f'https://ubuntu.com/security/notices/USN-{usn}' |
1703 | 1260 | recursive_rm(self.tmpdir) | ||
1704 | 1261 | |||
1705 | 1262 | def generate_cve_definition(self, cve): | ||
1706 | 1263 | """ generate an OVAL definition based on parsed CVE data """ | ||
1707 | 1264 | |||
1708 | 1265 | header = cve['header'] | ||
1709 | 1266 | # if the multiplier is not large enough, the tests IDs will | ||
1710 | 1267 | # overlap on things with large numbers of binary packages. | ||
1711 | 1268 | # if we ever have an issue that touches more than 1,000,000 | ||
1712 | 1269 | # binary packages, that will cause a problem. | ||
1713 | 1270 | id_base = int(re.sub('[^0-9]', '', header['Candidate'])) * 1000000 | ||
1714 | 1271 | if not self.unique_id_base(id_base, header['Source-note']): | ||
1715 | 1272 | self.warn('Calculated id_base "{0}" based on candidate value "{1}" is not unique. Skipping CVE.'.format(id_base, header['Candidate'])) | ||
1716 | 1273 | |||
1717 | 1274 | instruction = "" | ||
1718 | 1275 | # make test(s) for each package | ||
1719 | 1276 | test_refs = [] | ||
1720 | 1277 | packages = cve['packages'] | ||
1721 | 1278 | for package in sorted(packages.keys()): | ||
1722 | 1279 | if self.release in packages[package]['Releases']: | ||
1723 | 1280 | release_status = packages[package]['Releases'][self.release] | ||
1724 | 1281 | if 'bin-pkgs' in release_status and release_status['bin-pkgs']: | ||
1725 | 1282 | for key in sorted(list(release_status['bin-pkgs'])): | ||
1726 | 1283 | pkg = { | ||
1727 | 1284 | 'name': package, | ||
1728 | 1285 | 'binaries': release_status['bin-pkgs'][key], | ||
1729 | 1286 | 'status': release_status['status'], | ||
1730 | 1287 | 'note': release_status['note'], | ||
1731 | 1288 | 'fix-version': release_status['fix-version'] if 'fix-version' in release_status else '', | ||
1732 | 1289 | 'id_base': id_base + len(test_refs), | ||
1733 | 1290 | 'source-note': header['Source-note'] | ||
1734 | 1291 | } | ||
1735 | 1292 | if is_kernel_binaries(pkg['binaries']): | ||
1736 | 1293 | test_ref = self.get_running_kernel_testref(pkg) | ||
1737 | 1294 | if test_ref: | ||
1738 | 1295 | test_refs = test_refs + test_ref | ||
1739 | 1296 | pkg['id_base'] = id_base + 1 | ||
1740 | 1297 | else: | ||
1741 | 1298 | test_ref = self.get_oval_test_for_package(pkg) | ||
1742 | 1299 | if test_ref: | ||
1743 | 1300 | test_refs.append(test_ref) | ||
1744 | 1301 | # prepare update instructions if package is fixed | ||
1745 | 1302 | if pkg['status'] == 'fixed': | ||
1746 | 1303 | if 'parent' in release_status: | ||
1747 | 1304 | product_description = cve_lib.get_subproject_description(release_status['parent']) | ||
1748 | 1305 | else: | ||
1749 | 1306 | product_description = cve_lib.get_subproject_description(self.release) | ||
1750 | 1307 | instruction = prepare_instructions(instruction, header['Candidate'], product_description, pkg) | ||
1751 | 1308 | |||
1752 | 1309 | |||
1753 | 1310 | |||
1754 | 1311 | # if no packages for this release, then we're done | ||
1755 | 1312 | if not len(test_refs): | ||
1756 | 1313 | return False | ||
1757 | 1314 | |||
1758 | 1315 | # convert CVE data to OVAL definition metadata | ||
1759 | 1316 | mapping = { | ||
1760 | 1317 | 'ns': escape(self.ns), | ||
1761 | 1318 | 'id_base': id_base, | ||
1762 | 1319 | 'codename': escape(self.release_codename), | ||
1763 | 1320 | 'release_name': escape(self.release_name), | ||
1764 | 1321 | 'applicability_def_id': escape( | ||
1765 | 1322 | self.release_applicability_definition_id), | ||
1766 | 1323 | 'cve_title': escape(header['Candidate']), | ||
1767 | 1324 | 'description': escape('{0} {1}'.format(header['Description'], | ||
1768 | 1325 | header['Ubuntu-Description']).strip() + instruction), | ||
1769 | 1326 | 'priority': escape(header['Priority']), | ||
1770 | 1327 | 'criteria': '', | ||
1771 | 1328 | 'references': '', | ||
1772 | 1329 | 'notes': '' | ||
1773 | 1330 | } | ||
1777 | 1331 | 1485 | ||
1851 | 1332 | # convert test_refs to criteria | 1486 | advisory.append(self._generate_cve_tag(cve)) |
1852 | 1333 | if len(test_refs) == 1: | 1487 | rights.text = f"Copyright (C) {cve.public_date.split('-', 1)[0]} Canonical Ltd." |
1853 | 1334 | negation_attribute = 'negate = "true" ' \ | 1488 | severity.text = cve.priority.capitalize() |
1854 | 1335 | if 'negate' in test_refs[0] and test_refs[0]['negate'] else '' | 1489 | public_date.text = cve.public_date |
1782 | 1336 | mapping['criteria'] = \ | ||
1783 | 1337 | '<criterion test_ref="{0}" comment="{1}" {2}/>'.format( | ||
1784 | 1338 | test_refs[0]['id'], escape(test_refs[0]['comment']), negation_attribute) | ||
1785 | 1339 | else: | ||
1786 | 1340 | criteria = [] | ||
1787 | 1341 | criteria.append('<criteria operator="OR">') | ||
1788 | 1342 | for test_ref in test_refs: | ||
1789 | 1343 | if 'kernel' in test_ref: | ||
1790 | 1344 | criteria.append(' <criteria operator="AND">') | ||
1791 | 1345 | negation_attribute = 'negate = "true" ' \ | ||
1792 | 1346 | if 'negate' in test_ref and test_ref['negate'] else '' | ||
1793 | 1347 | criteria.append( | ||
1794 | 1348 | ' <criterion test_ref="{0}" comment="{1}" {2}/>'.format( | ||
1795 | 1349 | test_ref['id'], | ||
1796 | 1350 | escape(test_ref['comment']), negation_attribute)) | ||
1797 | 1351 | elif 'kernelobj' in test_ref: | ||
1798 | 1352 | criteria.append( | ||
1799 | 1353 | ' <criterion test_ref="{0}" comment="{1}" {2}/>'.format( | ||
1800 | 1354 | test_ref['id'], | ||
1801 | 1355 | escape(test_ref['comment']), negation_attribute)) | ||
1802 | 1356 | criteria.append(' </criteria>') | ||
1803 | 1357 | else: | ||
1804 | 1358 | negation_attribute = 'negate = "true" ' \ | ||
1805 | 1359 | if 'negate' in test_ref and test_ref['negate'] else '' | ||
1806 | 1360 | criteria.append( | ||
1807 | 1361 | ' <criterion test_ref="{0}" comment="{1}" {2}/>'.format( | ||
1808 | 1362 | test_ref['id'], | ||
1809 | 1363 | escape(test_ref['comment']), negation_attribute)) | ||
1810 | 1364 | criteria.append('</criteria>') | ||
1811 | 1365 | mapping['criteria'] = '\n '.join(criteria) | ||
1812 | 1366 | |||
1813 | 1367 | # convert notes | ||
1814 | 1368 | if header['Notes']: | ||
1815 | 1369 | mapping['notes'] = '\n <oval:notes>' + \ | ||
1816 | 1370 | '\n <oval:note>{0}</oval:note>'.format(escape(header['Notes'])) + \ | ||
1817 | 1371 | '\n </oval:notes>' | ||
1818 | 1372 | |||
1819 | 1373 | # convert additional data <advisory> metadata elements | ||
1820 | 1374 | advisory = [] | ||
1821 | 1375 | advisory.append('<severity>{0}</severity>'.format( | ||
1822 | 1376 | escape(header['Priority'].title()))) | ||
1823 | 1377 | advisory.append( | ||
1824 | 1378 | '<rights>Copyright (C) {0}Canonical Ltd.</rights>'.format(escape( | ||
1825 | 1379 | header['PublicDate'].split('-', 1)[0] + ' ' | ||
1826 | 1380 | if header['PublicDate'] else ''))) | ||
1827 | 1381 | if header['PublicDate']: | ||
1828 | 1382 | advisory.append('<public_date>{0}</public_date>'.format( | ||
1829 | 1383 | escape(header['PublicDate']))) | ||
1830 | 1384 | if header['PublicDateAtUSN']: | ||
1831 | 1385 | advisory.append( | ||
1832 | 1386 | '<public_date_at_usn>{0}</public_date_at_usn>'.format(escape( | ||
1833 | 1387 | header['PublicDateAtUSN']))) | ||
1834 | 1388 | if header['Assigned-to']: | ||
1835 | 1389 | advisory.append('<assigned_to>{0}</assigned_to>'.format(escape( | ||
1836 | 1390 | header['Assigned-to']))) | ||
1837 | 1391 | if header['Discovered-by']: | ||
1838 | 1392 | advisory.append('<discovered_by>{0}</discovered_by>'.format(escape( | ||
1839 | 1393 | header['Discovered-by']))) | ||
1840 | 1394 | if header['CRD']: | ||
1841 | 1395 | advisory.append('<crd>{0}</crd>'.format(escape(header['CRD']))) | ||
1842 | 1396 | for bug in header['Bugs']: | ||
1843 | 1397 | advisory.append('<bug>{0}</bug>'.format(escape(bug))) | ||
1844 | 1398 | for ref in header['References']: | ||
1845 | 1399 | if ref.startswith('https://cve.mitre'): | ||
1846 | 1400 | cve_title = ref.split('=')[-1].strip() | ||
1847 | 1401 | if not cve_title: | ||
1848 | 1402 | continue | ||
1849 | 1403 | mapping['cve_title'] = escape(cve_title) | ||
1850 | 1404 | mapping['references'] = '\n <reference source="CVE" ref_id="{0}" ref_url="{1}" />'.format(mapping['cve_title'], escape(ref)) | ||
1855 | 1405 | 1490 | ||
1859 | 1406 | cve_ref = generate_cve_tag(header) | 1491 | return advisory |
1857 | 1407 | advisory.append(cve_ref) | ||
1858 | 1408 | mapping['advisory_elements'] = '\n '.join(advisory) | ||
1860 | 1409 | 1492 | ||
1883 | 1410 | if self.oval_format == 'dpkg': | 1493 | def _generate_metadata(self, cve: CVE) -> etree.Element: |
1884 | 1411 | mapping['os_release_check'] = """<extend_definition definition_ref="{applicability_def_id}" comment="{release_name} ({codename}) is installed." applicability_check="true" />""".format(**mapping) | 1494 | metadata = etree.Element("metadata") |
1885 | 1412 | else: | 1495 | title = etree.SubElement(metadata, "title") |
1886 | 1413 | mapping['os_release_check'] = '' | 1496 | reference = self._generate_reference(cve) |
1887 | 1414 | 1497 | advisory = self._generate_advisory(cve) | |
1888 | 1415 | self.queue_element('definition', """ | 1498 | description = etree.SubElement(metadata, "description") |
1889 | 1416 | <definition class="vulnerability" id="{ns}:def:{id_base}0" version="1"> | 1499 | affected = etree.SubElement(metadata, "affected", attrib = {"family": "unix"}) |
1890 | 1417 | <metadata> | 1500 | platform = etree.SubElement(affected, "platform") |
1891 | 1418 | <title>{cve_title} on {release_name} ({codename}) - {priority}.</title> | 1501 | metadata.append(reference) |
1892 | 1419 | <description>{description}</description> | 1502 | metadata.append(advisory) |
1871 | 1420 | <affected family="unix"> | ||
1872 | 1421 | <platform>{release_name}</platform> | ||
1873 | 1422 | </affected>{references} | ||
1874 | 1423 | <advisory> | ||
1875 | 1424 | {advisory_elements} | ||
1876 | 1425 | </advisory> | ||
1877 | 1426 | </metadata>{notes} | ||
1878 | 1427 | <criteria> | ||
1879 | 1428 | {os_release_check} | ||
1880 | 1429 | {criteria} | ||
1881 | 1430 | </criteria> | ||
1882 | 1431 | </definition>\n""".format(**mapping)) | ||
1893 | 1432 | 1503 | ||
1897 | 1433 | # TODO: xml lib | 1504 | platform.text = self.release_name |
1898 | 1434 | def add_release_applicability_definition(self): | 1505 | title.text = f'{cve.number} on {self.release_name} ({self.release_codename}) - {cve.priority}' |
1899 | 1435 | """ add platform/release applicability OVAL definition for codename """ | 1506 | description.text = cve.description.replace('\n','') |
1900 | 1436 | 1507 | ||
1909 | 1437 | mapping = { | 1508 | return metadata |
1902 | 1438 | 'ns': self.ns, | ||
1903 | 1439 | 'id_base': self.id, | ||
1904 | 1440 | 'codename': self.release_codename, | ||
1905 | 1441 | 'release_name': self.release_name, | ||
1906 | 1442 | } | ||
1907 | 1443 | self.release_applicability_definition_id = \ | ||
1908 | 1444 | '{ns}:def:{id_base}0'.format(**mapping) | ||
1910 | 1445 | 1509 | ||
1968 | 1446 | if self.oval_format == 'dpkg': | 1510 | # Element generators |
1969 | 1447 | self.queue_element('definition', """ | 1511 | def _generate_reference(self, cve: CVE) -> etree.Element: |
1970 | 1448 | <definition class="inventory" id="{ns}:def:{id_base}0" version="1"> | 1512 | reference = etree.Element("reference", attrib={ |
1971 | 1449 | <metadata> | 1513 | "source": "CVE", |
1972 | 1450 | <title>Check that {release_name} ({codename}) is installed.</title> | 1514 | "ref_id": cve.number, |
1973 | 1451 | <description></description> | 1515 | "ref_url": f'https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve.number}' |
1974 | 1452 | </metadata> | 1516 | }) |
1918 | 1453 | <criteria> | ||
1919 | 1454 | <criterion test_ref="{ns}:tst:{id_base}0" comment="The host is part of the unix family." /> | ||
1920 | 1455 | <criterion test_ref="{ns}:tst:{id_base}1" comment="The host is running Ubuntu {codename}." /> | ||
1921 | 1456 | </criteria> | ||
1922 | 1457 | </definition>\n""".format(**mapping)) | ||
1923 | 1458 | |||
1924 | 1459 | self.queue_element('test', """ | ||
1925 | 1460 | <ind-def:family_test id="{ns}:tst:{id_base}0" check="at least one" check_existence="at_least_one_exists" version="1" comment="Is the host part of the unix family?"> | ||
1926 | 1461 | <ind-def:object object_ref="{ns}:obj:{id_base}0"/> | ||
1927 | 1462 | <ind-def:state state_ref="{ns}:ste:{id_base}0"/> | ||
1928 | 1463 | </ind-def:family_test> | ||
1929 | 1464 | |||
1930 | 1465 | <ind-def:textfilecontent54_test id="{ns}:tst:{id_base}1" check="at least one" check_existence="at_least_one_exists" version="1" comment="Is the host running Ubuntu {codename}?"> | ||
1931 | 1466 | <ind-def:object object_ref="{ns}:obj:{id_base}1"/> | ||
1932 | 1467 | <ind-def:state state_ref="{ns}:ste:{id_base}1"/> | ||
1933 | 1468 | </ind-def:textfilecontent54_test>\n""".format(**mapping)) | ||
1934 | 1469 | |||
1935 | 1470 | # /etc/lsb-release has to be a single path, due to some | ||
1936 | 1471 | # environments (namely snaps) not being allowed to list the | ||
1937 | 1472 | # content of /etc/ | ||
1938 | 1473 | self.queue_element('object', """ | ||
1939 | 1474 | <ind-def:family_object id="{ns}:obj:{id_base}0" version="1" comment="The singleton family object."/> | ||
1940 | 1475 | |||
1941 | 1476 | <ind-def:textfilecontent54_object id="{ns}:obj:{id_base}1" version="1" comment="The singleton release codename object."> | ||
1942 | 1477 | <ind-def:filepath>/etc/lsb-release</ind-def:filepath> | ||
1943 | 1478 | <ind-def:pattern operation="pattern match">^[\\s\\S]*DISTRIB_CODENAME=([a-z]+)$</ind-def:pattern> | ||
1944 | 1479 | <ind-def:instance datatype="int">1</ind-def:instance> | ||
1945 | 1480 | </ind-def:textfilecontent54_object>\n""".format(**mapping)) | ||
1946 | 1481 | |||
1947 | 1482 | self.queue_element('state', """ | ||
1948 | 1483 | <ind-def:family_state id="{ns}:ste:{id_base}0" version="1" comment="The singleton family object."> | ||
1949 | 1484 | <ind-def:family>unix</ind-def:family> | ||
1950 | 1485 | </ind-def:family_state> | ||
1951 | 1486 | |||
1952 | 1487 | <ind-def:textfilecontent54_state id="{ns}:ste:{id_base}1" version="1" comment="{release_name}"> | ||
1953 | 1488 | <ind-def:subexpression>{codename}</ind-def:subexpression> | ||
1954 | 1489 | </ind-def:textfilecontent54_state>\n""".format(**mapping)) | ||
1955 | 1490 | |||
1956 | 1491 | def get_oval_test_for_package(self, package): | ||
1957 | 1492 | """ create OVAL test and dependent objects for this package status | ||
1958 | 1493 | @package = { | ||
1959 | 1494 | 'name' : '<package name>', | ||
1960 | 1495 | 'binaries' : [ '<binary_pkg_name', '<binary_pkg_name', ... ], | ||
1961 | 1496 | 'status' : '<not-applicable | unknown | vulnerable | fixed>', | ||
1962 | 1497 | 'note' : '<a description of the status>', | ||
1963 | 1498 | 'fix-version' : '<the version in which the issue was fixed, if applicable>', | ||
1964 | 1499 | 'id_base' : a base for the integer section of the OVAL id, | ||
1965 | 1500 | 'source-note' : a note about the datasource for debugging | ||
1966 | 1501 | } | ||
1967 | 1502 | """ | ||
1975 | 1503 | 1517 | ||
1979 | 1504 | if package['status'] == 'fixed' and not package['fix-version']: | 1518 | return reference |
1977 | 1505 | self.warn('"{0}" package in {1} is marked fixed, but missing a fix-version. Changing status to vulnerable.'.format(package['name'], package['source-note'])) | ||
1978 | 1506 | package['status'] = 'vulnerable' | ||
1980 | 1507 | 1519 | ||
1989 | 1508 | if package['status'] == 'not-applicable': | 1520 | def prepare_instructions(self, instruction, cve: CVE, product_description, package: Package, fixed_version): |
1990 | 1509 | # if the package status is not-applicable, skip it! | 1521 | if "LSN" in cve.number: |
1991 | 1510 | return False | 1522 | instruction = """\n |
1992 | 1511 | elif package['status'] == 'not-vulnerable': | 1523 | To check your kernel type and Livepatch version, enter this command: |
1985 | 1512 | # if the packaget status is not-vulnerable, skip it! | ||
1986 | 1513 | return False | ||
1987 | 1514 | """ | ||
1988 | 1515 | object_id = self.get_package_object_id(package['name'], package['id_base'], 1) | ||
1993 | 1516 | 1524 | ||
1996 | 1517 | test_title = "Returns true whether or not the '{0}' package exists.".format(package['name']) | 1525 | canonical-livepatch status""" |
1995 | 1518 | test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id, None, 1, 'any_exist') | ||
1997 | 1519 | 1526 | ||
2003 | 1520 | package['note'] = package['name'] + package['note'] | 1527 | if not instruction: |
2004 | 1521 | return {'id': test_id, 'comment': package['note'], 'negate': True} | 1528 | instruction = """\n |
2005 | 1522 | """ | 1529 | Update Instructions: |
2001 | 1523 | elif package['status'] == 'vulnerable': | ||
2002 | 1524 | object_id = self.get_package_object_id(package['name'], package['binaries'], package['id_base']) | ||
2006 | 1525 | 1530 | ||
2009 | 1526 | test_title = "Does the '{0}' package exist?".format(package['name']) | 1531 | Run `sudo pro fix {0}` to fix the vulnerability. The problem can be corrected |
2010 | 1527 | test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id) | 1532 | by updating your system to the following package versions:""".format(cve) |
2011 | 1528 | 1533 | ||
2016 | 1529 | package['note'] = package['name'] + package['note'] | 1534 | instruction += '\n\n' |
2017 | 1530 | return {'id': test_id, 'comment': package['note']} | 1535 | source_version = package.get_version_to_check(fixed_version) |
2018 | 1531 | elif package['status'] == 'fixed': | 1536 | for binary_version in package.get_binary_versions(source_version): |
2019 | 1532 | object_id = self.get_package_object_id(package['name'], package['binaries'], package['id_base']) | 1537 | binaries = package.get_binaries(source_version, binary_version) |
2020 | 1538 | for binary in binaries: | ||
2021 | 1539 | instruction += """{0} - {1}\n""".format(binary, binary_version) | ||
2022 | 1533 | 1540 | ||
2024 | 1534 | state_id = self.get_package_version_state_id(package['id_base'], package['fix-version']) | 1541 | if "LSN" in cve.number: |
2025 | 1542 | instruction += "Livepatch subscription required" | ||
2026 | 1543 | elif "Long Term" in product_description or "Interim" in product_description: | ||
2027 | 1544 | instruction += "No subscription required" | ||
2028 | 1545 | else: | ||
2029 | 1546 | instruction += product_description | ||
2030 | 1535 | 1547 | ||
2033 | 1536 | test_title = "Does the '{0}' package exist and is the version less than '{1}'?".format(package['name'], package['fix-version']) | 1548 | return instruction |
2032 | 1537 | test_id = self.get_package_test_id(package['name'], package['id_base'], test_title, object_id, state_id) | ||
2034 | 1538 | 1549 | ||
2040 | 1539 | package['note'] = package['name'] + package['note'] | 1550 | def _populate_pkg(self, cve: CVE, package: Package, root_element, main_criteria, cache, fixed_versions) -> None: |
2041 | 1540 | return {'id': test_id, 'comment': package['note']} | 1551 | tests = root_element.find("tests") |
2042 | 1541 | else: | 1552 | objects = root_element.find("objects") |
2043 | 1542 | if package['status'] != 'unknown': | 1553 | variables = root_element.find("variables") |
2044 | 1543 | self.warn('"{0}" is not a supported package status. Outputting for "unknown" status.'.format(package['status'])) | 1554 | states = root_element.find("states") |
2045 | 1555 | pkg_rel_entry = cve.pkg_rel_entries[str(package)] | ||
2046 | 1544 | 1556 | ||
2051 | 1545 | if not hasattr(self, 'id_unknown_test'): | 1557 | source_version = package.get_version_to_check(pkg_rel_entry.fixed_version) |
2052 | 1546 | self.id_unknown_test = '{0}:tst:10'.format(self.ns) | 1558 | for binary_version in package.get_binary_versions(source_version): |
2053 | 1547 | self.queue_element('test', """ | 1559 | binaries = package.get_binaries(source_version, binary_version) |
2054 | 1548 | <ind-def:unknown_test id="{0}" check="all" comment="The result of this test is always UNKNOWN." version="1" />\n""".format(self.id_unknown_test)) | 1560 | cache_entry = f'{package.name}-{binary_version}' |
2055 | 1549 | 1561 | ||
2058 | 1550 | package['note'] = package['name'] + package['note'] | 1562 | cache.setdefault(cache_entry, dict(bin_id=None, def_id=None)) |
2057 | 1551 | return {'id': self.id_unknown_test, 'comment': package['note']} | ||
2059 | 1552 | 1563 | ||
2065 | 1553 | # TODO: xml lib | 1564 | # If version doesn't exist and only one binary_version, they all have the same binaries |
2066 | 1554 | def get_package_object_id(self, name, bin_pkgs, id_base, version=1): | 1565 | if not package.version_exists(source_version) and package.all_binaries_same_version: |
2067 | 1555 | """ create unique object for each package and return its OVAL id """ | 1566 | cache_entry_bin = f'{package.name}-{GENERIC_VERSION}' |
2068 | 1556 | if not hasattr(self, 'package_objects'): | 1567 | cache.setdefault(cache_entry_bin, dict(bin_id=None, def_id=None)) |
2069 | 1557 | self.package_objects = {} | 1568 | else: |
2070 | 1569 | cache_entry_bin = cache_entry | ||
2071 | 1558 | 1570 | ||
2073 | 1559 | key = tuple(sorted(bin_pkgs)) | 1571 | if pkg_rel_entry.status == 'vulnerable' and not self.fixed_only: |
2074 | 1572 | if not cache_entry in cache or not cache[cache_entry]['def_id']: | ||
2075 | 1573 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, main_criteria) | ||
2076 | 1560 | 1574 | ||
2079 | 1561 | if key not in self.package_objects: | 1575 | test, object, var = self._generate_vulnerable_elements(package, binaries, cache[cache_entry_bin]['bin_id']) |
2080 | 1562 | object_id = '{0}:obj:{1}0'.format(self.ns, id_base) | 1576 | tests.append(test) |
2081 | 1563 | 1577 | ||
2097 | 1564 | if len(bin_pkgs) > 1: | 1578 | if not cache[cache_entry_bin]['bin_id']: |
2098 | 1565 | # create variable for binary package names | 1579 | objects.append(object) |
2099 | 1566 | variable_id = '{0}:var:{1}0'.format(self.ns, id_base) | 1580 | variables.append(var) |
2100 | 1567 | if self.oval_format == 'dpkg': | 1581 | cache[cache_entry_bin]['bin_id'] = self.definition_id |
2086 | 1568 | variable_values = '</value>\n <value>'.join(bin_pkgs) | ||
2087 | 1569 | self.queue_element('variable', """ | ||
2088 | 1570 | <constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries"> | ||
2089 | 1571 | <value>{3}</value> | ||
2090 | 1572 | </constant_variable>\n""".format(variable_id, version, name, variable_values)) | ||
2091 | 1573 | |||
2092 | 1574 | # create an object that references the variable | ||
2093 | 1575 | self.queue_element('object', """ | ||
2094 | 1576 | <linux-def:dpkginfo_object id="{0}" version="{1}" comment="The '{2}' package binaries."> | ||
2095 | 1577 | <linux-def:name var_ref="{3}" var_check="at least one" /> | ||
2096 | 1578 | </linux-def:dpkginfo_object>\n""".format(object_id, version, name, variable_id)) | ||
2101 | 1579 | 1582 | ||
2102 | 1583 | cache[cache_entry]['def_id'] = self.definition_id | ||
2103 | 1584 | self._increase_id(is_definition=False) | ||
2104 | 1580 | else: | 1585 | else: |
2127 | 1581 | variable_values = '(?::\w+|)\s+(.*)$</value>\n <value>^'.join(bin_pkgs) | 1586 | self._add_criterion(cache[cache_entry]['def_id'], pkg_rel_entry, cve, main_criteria) |
2128 | 1582 | self.queue_element('variable', """ | 1587 | elif pkg_rel_entry.status == 'fixed': |
2129 | 1583 | <constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries"> | 1588 | if binary_version in fixed_versions: |
2130 | 1584 | <value>^{3}(?::\w+|)\s+(.*)$</value> | 1589 | self._add_criterion(fixed_versions[binary_version], pkg_rel_entry, cve, main_criteria) |
2109 | 1585 | </constant_variable>\n""".format(variable_id, version, name, variable_values)) | ||
2110 | 1586 | |||
2111 | 1587 | # create an object that references the variable | ||
2112 | 1588 | self.queue_element('object', """ | ||
2113 | 1589 | <ind-def:textfilecontent54_object id="{0}" version="{1}" comment="The '{2}' package binaries."> | ||
2114 | 1590 | <ind-def:path>.</ind-def:path> | ||
2115 | 1591 | <ind-def:filename>manifest</ind-def:filename> | ||
2116 | 1592 | <ind-def:pattern operation="pattern match" datatype="string" var_ref="{3}" var_check="at least one" /> | ||
2117 | 1593 | <ind-def:instance operation="greater than or equal" datatype="int">1</ind-def:instance> | ||
2118 | 1594 | </ind-def:textfilecontent54_object>\n""".format(object_id, version, name, variable_id)) | ||
2119 | 1595 | |||
2120 | 1596 | else: | ||
2121 | 1597 | if self.oval_format == 'dpkg': | ||
2122 | 1598 | # 1 binary package, so just use name in object (no variable) | ||
2123 | 1599 | self.queue_element('object', """ | ||
2124 | 1600 | <linux-def:dpkginfo_object id="{0}" version="{1}" comment="The '{2}' package binary."> | ||
2125 | 1601 | <linux-def:name>{3}</linux-def:name> | ||
2126 | 1602 | </linux-def:dpkginfo_object>\n""".format(object_id, version, name, bin_pkgs[0])) | ||
2131 | 1603 | else: | 1590 | else: |
2171 | 1604 | variable_id = '{0}:var:{1}0'.format(self.ns, id_base) | 1591 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, main_criteria) |
2133 | 1605 | variable_values = '(?::\w+|)\s+(.*)$</value>\n <value>^'.join(bin_pkgs) | ||
2134 | 1606 | self.queue_element('variable', """ | ||
2135 | 1607 | <constant_variable id="{0}" version="{1}" datatype="string" comment="'{2}' package binaries"> | ||
2136 | 1608 | <value>^{3}(?::\w+|)\s+(.*)$</value> | ||
2137 | 1609 | </constant_variable>\n""".format(variable_id, version, name, variable_values)) | ||
2138 | 1610 | self.queue_element('object', """ | ||
2139 | 1611 | <ind-def:textfilecontent54_object id="{0}" version="{1}" comment="The '{2}' package binary."> | ||
2140 | 1612 | <ind-def:path>.</ind-def:path> | ||
2141 | 1613 | <ind-def:filename>manifest</ind-def:filename> | ||
2142 | 1614 | <ind-def:pattern operation="pattern match" datatype="string" var_ref="{3}" var_check="at least one" /> | ||
2143 | 1615 | <ind-def:instance operation="greater than or equal" datatype="int">1</ind-def:instance> | ||
2144 | 1616 | </ind-def:textfilecontent54_object>\n""".format(object_id, version, name, variable_id)) | ||
2145 | 1617 | |||
2146 | 1618 | self.package_objects[key] = object_id | ||
2147 | 1619 | |||
2148 | 1620 | return self.package_objects[key] | ||
2149 | 1621 | |||
2150 | 1622 | # TODO: xml lib | ||
2151 | 1623 | def get_package_version_state_id(self, id_base, fix_version, version=1): | ||
2152 | 1624 | """ create unique states for each version and return its OVAL id """ | ||
2153 | 1625 | if not hasattr(self, 'package_version_states'): | ||
2154 | 1626 | self.package_version_states = {} | ||
2155 | 1627 | |||
2156 | 1628 | key = fix_version | ||
2157 | 1629 | if key not in self.package_version_states: | ||
2158 | 1630 | state_id = '{0}:ste:{1}0'.format(self.ns, id_base) | ||
2159 | 1631 | if self.oval_format == 'dpkg': | ||
2160 | 1632 | epoch_fix_version = fix_version if fix_version.find(':') != -1 else "0:" + fix_version | ||
2161 | 1633 | self.queue_element('state', """ | ||
2162 | 1634 | <linux-def:dpkginfo_state id="{0}" version="{1}" comment="The package version is less than '{2}'."> | ||
2163 | 1635 | <linux-def:evr datatype="debian_evr_string" operation="less than">{2}</linux-def:evr> | ||
2164 | 1636 | </linux-def:dpkginfo_state>\n""".format(state_id, version, epoch_fix_version)) | ||
2165 | 1637 | else: | ||
2166 | 1638 | self.queue_element('state', """ | ||
2167 | 1639 | <ind-def:textfilecontent54_state id="{0}" version="{1}" comment="The package version is less than '{2}'."> | ||
2168 | 1640 | <ind-def:subexpression datatype="debian_evr_string" operation="less than">{2}</ind-def:subexpression> | ||
2169 | 1641 | </ind-def:textfilecontent54_state>\n""".format(state_id, version, fix_version)) | ||
2170 | 1642 | self.package_version_states[key] = state_id | ||
2172 | 1643 | 1592 | ||
2174 | 1644 | return self.package_version_states[key] | 1593 | test, object, var, state = self._generate_fixed_elements(package, binaries, binary_version, cache[cache_entry_bin]['bin_id']) |
2175 | 1594 | tests.append(test) | ||
2176 | 1595 | states.append(state) | ||
2177 | 1645 | 1596 | ||
2193 | 1646 | # TODO: xml lib | 1597 | if not cache[cache_entry_bin]['bin_id']: |
2194 | 1647 | def get_package_test_id(self, name, id_base, test_title, object_id, state_id=None, version=1, check_existence='at_least_one_exists'): | 1598 | objects.append(object) |
2195 | 1648 | """ create unique test for each parameter set and return its OVAL id """ | 1599 | variables.append(var) |
2196 | 1649 | if not hasattr(self, 'package_tests'): | 1600 | cache[cache_entry_bin]['bin_id'] = self.definition_id |
2197 | 1650 | self.package_tests = {} | 1601 | |
2198 | 1651 | 1602 | fixed_versions[binary_version] = self.definition_id | |
2199 | 1652 | key = (name, test_title, object_id, state_id) | 1603 | self._increase_id(is_definition=False) |
2200 | 1653 | if key not in self.package_tests: | 1604 | |
2201 | 1654 | test_id = '{0}:tst:{1}0'.format(self.ns, id_base) | 1605 | def _populate_kernel_pkg(self, cve: CVE, package: Package, root_element, main_criteria, running_kernel_id, cache, fixed_versions) -> None: |
2202 | 1655 | if self.oval_format == 'dpkg': | 1606 | # Kernel binaries have all same version |
2203 | 1656 | state_ref = '\n <linux-def:state state_ref="{0}" />'.format(state_id) if state_id else '' | 1607 | version = package.get_latest_version() |
2204 | 1657 | self.queue_element('test', """ | 1608 | binaries = package.get_binaries(version, version) |
2205 | 1658 | <linux-def:dpkginfo_test id="{0}" version="{1}" check_existence="{5}" check="at least one" comment="{2}"> | 1609 | pkg_rel_entry = cve.pkg_rel_entries[str(package)] |
2206 | 1659 | <linux-def:object object_ref="{3}"/>{4} | 1610 | cache_entry = f'{package.name}-{pkg_rel_entry.fixed_version}' |
2207 | 1660 | </linux-def:dpkginfo_test>\n""".format(test_id, version, test_title, object_id, state_ref, check_existence)) | 1611 | |
2208 | 1612 | if not cache_entry in cache: | ||
2209 | 1613 | # Generate one-time elements | ||
2210 | 1614 | kernel_criterion = self._generate_kernel_package_elements(package, binaries, root_element, running_kernel_id) | ||
2211 | 1615 | cache[cache_entry] = kernel_criterion | ||
2212 | 1616 | |||
2213 | 1617 | if pkg_rel_entry.status == 'fixed': | ||
2214 | 1618 | criteria = self._generate_criteria_kernel('AND') | ||
2215 | 1619 | self._add_to_criteria(criteria, cache[cache_entry], operator='AND', depth=0) | ||
2216 | 1620 | |||
2217 | 1621 | kernel_version_criterion = self._add_kernel_elements(cve, package, pkg_rel_entry.fixed_version, pkg_rel_entry, root_element, running_kernel_id, fixed_versions) | ||
2218 | 1622 | self._add_to_criteria(criteria, kernel_version_criterion, depth=0) | ||
2219 | 1623 | self._add_to_criteria(main_criteria, criteria, depth=2, operator='OR') | ||
2220 | 1624 | self._increase_id(is_definition=False) | ||
2221 | 1625 | else: | ||
2222 | 1626 | self._add_to_criteria(main_criteria, cache[cache_entry], depth=2, operator='OR') | ||
2223 | 1627 | |||
2224 | 1628 | def _generate_elements_from_cve(self, cve, supported_releases, root_element, running_kernel_id, pkg_cache, fixed_versions) -> None: | ||
2225 | 1629 | if not cve.pkgs: return | ||
2226 | 1630 | added = False | ||
2227 | 1631 | definition_element = self._generate_definition_object(cve) | ||
2228 | 1632 | instructions = '' | ||
2229 | 1633 | pkgs = cve.get_pkgs(supported_releases) | ||
2230 | 1634 | for pkg in pkgs: | ||
2231 | 1635 | if not pkg.versions_binaries: continue | ||
2232 | 1636 | if not pkg.get_binary_versions(next(iter(pkg.versions_binaries))): continue | ||
2233 | 1637 | if self._ignore_source_package(pkg.name): continue | ||
2234 | 1638 | |||
2235 | 1639 | added = True | ||
2236 | 1640 | pkg_rel_entry = cve.pkg_rel_entries[str(pkg)] | ||
2237 | 1641 | if pkg.is_kernel_pkg and self.oval_format != 'oci': | ||
2238 | 1642 | self._populate_kernel_pkg(cve, pkg, root_element, definition_element, running_kernel_id, pkg_cache, fixed_versions) | ||
2239 | 1661 | else: | 1643 | else: |
2406 | 1662 | state_ref = '\n <ind-def:state state_ref="{0}" />'.format(state_id) if state_id else '' | 1644 | self._populate_pkg(cve, pkg, root_element, definition_element, pkg_cache, fixed_versions) |
2407 | 1663 | self.queue_element('test', """ | 1645 | |
2408 | 1664 | <ind-def:textfilecontent54_test id="{0}" version="{1}" check_existence="{5}" check="at least one" comment="{2}"> | 1646 | if pkg_rel_entry.status == 'fixed' and pkg.versions_binaries: |
2409 | 1665 | <ind-def:object object_ref="{3}"/>{4} | 1647 | product_description = cve_lib.get_subproject_description(pkg_rel_entry.release) |
2410 | 1666 | </ind-def:textfilecontent54_test>\n""".format(test_id, version, test_title, object_id, state_ref, check_existence)) | 1648 | instructions = self.prepare_instructions(instructions, cve, product_description, pkg, pkg_rel_entry.fixed_version) |
2411 | 1667 | self.package_tests[key] = test_id | 1649 | |
2412 | 1668 | 1650 | if added: | |
2413 | 1669 | return self.package_tests[key] | 1651 | definitions = root_element.find("definitions") |
2414 | 1670 | 1652 | metadata = definition_element.find('metadata') | |
2415 | 1671 | def get_running_kernel_testref(self, package): | 1653 | metadata.find('description').text = metadata.find('description').text + instructions |
2416 | 1672 | if package['status'] == 'not-applicable': | 1654 | definitions.append(definition_element) |
2251 | 1673 | # if the package status is not-applicable, skip it! | ||
2252 | 1674 | return | ||
2253 | 1675 | elif package['status'] == 'not-vulnerable': | ||
2254 | 1676 | # if the packaget status is not-vulnerable, skip it! | ||
2255 | 1677 | return | ||
2256 | 1678 | |||
2257 | 1679 | testref = [] | ||
2258 | 1680 | uname_regex = process_kernel_binaries(package['binaries'], self.oval_format) | ||
2259 | 1681 | if uname_regex: | ||
2260 | 1682 | if self.oval_format == 'dpkg': | ||
2261 | 1683 | var_id = self.get_running_kernel_variable_id( | ||
2262 | 1684 | uname_regex, | ||
2263 | 1685 | package['id_base']) | ||
2264 | 1686 | ste_id = self.get_running_kernel_state_id( | ||
2265 | 1687 | uname_regex, | ||
2266 | 1688 | package['id_base'], | ||
2267 | 1689 | var_id) | ||
2268 | 1690 | obj_id = self.get_running_kernel_object_id( | ||
2269 | 1691 | package['id_base']) | ||
2270 | 1692 | test_id = self.get_running_kernel_test_id( | ||
2271 | 1693 | uname_regex, package['id_base'], package['name'], | ||
2272 | 1694 | obj_id, ste_id) | ||
2273 | 1695 | testref.append({'id': test_id, | ||
2274 | 1696 | 'comment': 'Is kernel {0} running'.format(package['name']), | ||
2275 | 1697 | 'kernel': uname_regex, | ||
2276 | 1698 | 'var_id': var_id, | ||
2277 | 1699 | } | ||
2278 | 1700 | ) | ||
2279 | 1701 | |||
2280 | 1702 | # even if a cve was not fixed, we should add the test and object | ||
2281 | 1703 | # but not the state as there won't be a fixed version to compare | ||
2282 | 1704 | # with | ||
2283 | 1705 | ste_id = None | ||
2284 | 1706 | if package['fix-version']: | ||
2285 | 1707 | ste_id = self.get_patched_kernel_state_id( | ||
2286 | 1708 | package['id_base'], | ||
2287 | 1709 | package['fix-version'] | ||
2288 | 1710 | ) | ||
2289 | 1711 | |||
2290 | 1712 | obj_id = self.get_patched_kernel_object_id(package['id_base'], | ||
2291 | 1713 | var_id) | ||
2292 | 1714 | test_id = self.get_patched_kernel_test_id( | ||
2293 | 1715 | package['id_base'], | ||
2294 | 1716 | package['fix-version'], | ||
2295 | 1717 | obj_id, ste_id | ||
2296 | 1718 | ) | ||
2297 | 1719 | testref.append({'id': test_id, | ||
2298 | 1720 | 'comment': 'kernel version comparison', | ||
2299 | 1721 | 'kernelobj': True}) | ||
2300 | 1722 | else: # OCI | ||
2301 | 1723 | object_id = self.get_package_object_id(package['name'], | ||
2302 | 1724 | [uname_regex], | ||
2303 | 1725 | package['id_base']) | ||
2304 | 1726 | state_id = None | ||
2305 | 1727 | test_title = "Does the '{0}' package exist?".format(package['name']) | ||
2306 | 1728 | if package['fix-version']: | ||
2307 | 1729 | state_id = self.get_package_version_state_id(package['id_base'], | ||
2308 | 1730 | package['fix-version']) | ||
2309 | 1731 | test_title = "Does the '{0}' package exist and is the version less than '{1}'?".format(package['name'], | ||
2310 | 1732 | package['fix-version']) | ||
2311 | 1733 | test_id = self.get_package_test_id(package['name'], | ||
2312 | 1734 | package['id_base'], | ||
2313 | 1735 | test_title, | ||
2314 | 1736 | object_id, | ||
2315 | 1737 | state_id) | ||
2316 | 1738 | package['note'] = package['name'] + package['note'] | ||
2317 | 1739 | return [{'id': test_id, 'comment': package['note']}] | ||
2318 | 1740 | |||
2319 | 1741 | return testref | ||
2320 | 1742 | |||
2321 | 1743 | # TODO: xml lib | ||
2322 | 1744 | def get_running_kernel_object_id(self, id_base, version=1): | ||
2323 | 1745 | """ creates a uname_object so we can use the value from uname -r for | ||
2324 | 1746 | mainly two things: | ||
2325 | 1747 | 1. compare with the return uname is of the same version and flavour | ||
2326 | 1748 | as the kernel we fixed a CVE. This is done in | ||
2327 | 1749 | get_running_kernel_state_id | ||
2328 | 1750 | 2. store the uname value, minus the flavour, in a debian evr string | ||
2329 | 1751 | format, e.g: 0:5.4.0-1059. With this we can compare if the patched | ||
2330 | 1752 | kernel is greater than the running kernel | ||
2331 | 1753 | The result of this two will go through an AND logic to confirm | ||
2332 | 1754 | if we are or not vulnerable to such CVE""" | ||
2333 | 1755 | if not hasattr(self, 'kernel_uname_obj_id'): | ||
2334 | 1756 | self.kernel_uname_obj_id = None | ||
2335 | 1757 | |||
2336 | 1758 | if not self.kernel_uname_obj_id: | ||
2337 | 1759 | object_id = '{0}:obj:{1}0'.format(self.ns, id_base) | ||
2338 | 1760 | |||
2339 | 1761 | self.queue_element('object', """ | ||
2340 | 1762 | <unix-def:uname_object id="{0}" version="{1}"/>\n""".format(object_id, version)) | ||
2341 | 1763 | |||
2342 | 1764 | self.kernel_uname_obj_id = object_id | ||
2343 | 1765 | |||
2344 | 1766 | return self.kernel_uname_obj_id | ||
2345 | 1767 | |||
2346 | 1768 | # TODO: xml lib | ||
2347 | 1769 | def get_running_kernel_state_id(self, uname_regex, id_base, var_id, version=1): | ||
2348 | 1770 | """ create uname_state to compare the system uname to the affected kernel | ||
2349 | 1771 | uname regex, allowing us to verify we are running the same major version | ||
2350 | 1772 | and flavour as the affected kernel. | ||
2351 | 1773 | Return its OVAL id | ||
2352 | 1774 | """ | ||
2353 | 1775 | if not hasattr(self, 'uname_states'): | ||
2354 | 1776 | self.uname_states = {} | ||
2355 | 1777 | |||
2356 | 1778 | if uname_regex not in self.uname_states: | ||
2357 | 1779 | state_id = '{0}:ste:{1}0'.format(self.ns, id_base) | ||
2358 | 1780 | self.queue_element('state', """ | ||
2359 | 1781 | <unix-def:uname_state id="{0}" version="{1}"> | ||
2360 | 1782 | <unix-def:os_release operation="pattern match">{2}</unix-def:os_release> | ||
2361 | 1783 | </unix-def:uname_state>\n""".format(state_id, version, uname_regex)) | ||
2362 | 1784 | |||
2363 | 1785 | self.uname_states[uname_regex] = state_id | ||
2364 | 1786 | |||
2365 | 1787 | return self.uname_states[uname_regex] | ||
2366 | 1788 | |||
2367 | 1789 | # TODO: xml lib | ||
2368 | 1790 | def get_running_kernel_variable_id(self, uname_regex, id_base, version=1): | ||
2369 | 1791 | """ creates a local variable to store running kernel version in devian evr string""" | ||
2370 | 1792 | if not hasattr(self, 'uname_variables'): | ||
2371 | 1793 | self.uname_variables = {} | ||
2372 | 1794 | |||
2373 | 1795 | var_id = '{0}:var:{1}0'.format(self.ns, id_base) | ||
2374 | 1796 | obj_id = '{0}:obj:{1}0'.format(self.ns, id_base) | ||
2375 | 1797 | self.queue_element('variable', """ | ||
2376 | 1798 | <local_variable id="{0}" datatype="debian_evr_string" version="{1}" comment="kernel version in evr format"> | ||
2377 | 1799 | <concat> | ||
2378 | 1800 | <literal_component>0:</literal_component> | ||
2379 | 1801 | <regex_capture pattern="^([\d|\.]+-\d+)[-|\w]+$"> | ||
2380 | 1802 | <object_component object_ref="{2}" item_field="os_release" /> | ||
2381 | 1803 | </regex_capture> | ||
2382 | 1804 | </concat> | ||
2383 | 1805 | </local_variable>\n""".format(var_id, version, obj_id)) | ||
2384 | 1806 | |||
2385 | 1807 | self.uname_variables['local_variable'] = var_id | ||
2386 | 1808 | |||
2387 | 1809 | return self.uname_variables['local_variable'] | ||
2388 | 1810 | |||
2389 | 1811 | # TODO: xml lib | ||
2390 | 1812 | def get_running_kernel_test_id(self, uname_regex, id_base, name, object_id, state_id, version=1): | ||
2391 | 1813 | """ create uname test and return its OVAL id """ | ||
2392 | 1814 | if not hasattr(self, 'uname_tests'): | ||
2393 | 1815 | self.uname_tests = {} | ||
2394 | 1816 | |||
2395 | 1817 | if uname_regex not in self.uname_tests: | ||
2396 | 1818 | test_id = '{0}:tst:{1}0'.format(self.ns, id_base) | ||
2397 | 1819 | self.queue_element('test', """ | ||
2398 | 1820 | <unix-def:uname_test check="at least one" comment="Is kernel {0} currently running?" id="{1}" version="{2}"> | ||
2399 | 1821 | <unix-def:object object_ref="{3}"/> | ||
2400 | 1822 | <unix-def:state state_ref="{4}"/> | ||
2401 | 1823 | </unix-def:uname_test>\n""".format(name, test_id, version, object_id, state_id)) | ||
2402 | 1824 | |||
2403 | 1825 | self.uname_tests[uname_regex] = test_id | ||
2404 | 1826 | |||
2405 | 1827 | return self.uname_tests[uname_regex] | ||
2417 | 1828 | 1655 | ||
2418 | 1829 | def get_patched_kernel_variable_id(self, id_base, fixed_version, version=1): | ||
2419 | 1830 | """ creates a local variable to store the patched kernel version """ | ||
2420 | 1831 | if not hasattr(self, 'patched_kernel_variables'): | ||
2421 | 1832 | self.patched_kernel_variables = {} | ||
2422 | 1833 | 1656 | ||
2431 | 1834 | patched = re.search('([\d|\.]+-\d+)[\.|\d]+', fixed_version) | 1657 | def generate_oval(self) -> None: |
2432 | 1835 | if patched: | 1658 | for release in self.releases: |
2433 | 1836 | patched = patched.group(1) | 1659 | self._init_ids(release) |
2434 | 1837 | else: | 1660 | self.definition_step = 1 * 10 ** 7 |
2435 | 1838 | patched = fixed_version | 1661 | xml_tree, root_element = self._get_root_element() |
2436 | 1839 | 1662 | generator = self._get_generator("CVE") | |
2437 | 1840 | if patched not in self.patched_kernel_variables: | 1663 | root_element.append(generator) |
2438 | 1841 | var_id = '{0}:var:{1}0'.format(self.ns, id_base + 1) | 1664 | self._add_structure(root_element) |
2439 | 1665 | running_kernel_id = None | ||
2440 | 1842 | 1666 | ||
2445 | 1843 | self.queue_element('variable', """ | 1667 | if self.oval_format == 'dpkg': |
2446 | 1844 | <constant_variable id="{0}" version="{1}" datatype="debian_evr_string" comment="patched kernel"> | 1668 | # One time kernel check |
2447 | 1845 | <value>0:{2}</value> | 1669 | self._add_release_checks(root_element) |
2448 | 1846 | </constant_variable>""".format(var_id, version, patched)) | 1670 | self._add_running_kernel_checks(root_element) |
2449 | 1671 | running_kernel_id = self.definition_id | ||
2450 | 1672 | |||
2451 | 1673 | pkg_cache = {} | ||
2452 | 1674 | fixed_versions = {} | ||
2453 | 1675 | accepted_releases = list(self.parent_releases) | ||
2454 | 1676 | accepted_releases.insert(0, self.release) | ||
2455 | 1677 | |||
2456 | 1678 | all_cves = self.cves[self.release] | ||
2457 | 1679 | for parent_release in list(self.parent_releases): | ||
2458 | 1680 | for cve in self.cves[parent_release]: | ||
2459 | 1681 | if cve not in all_cves: | ||
2460 | 1682 | all_cves[cve] = self.cves[parent_release][cve] | ||
2461 | 1683 | |||
2462 | 1684 | all_cves = dict(sorted(all_cves.items())) | ||
2463 | 1685 | |||
2464 | 1686 | for cve in all_cves: | ||
2465 | 1687 | self._set_definition_id(cve_id=all_cves[cve].number) | ||
2466 | 1688 | self._generate_elements_from_cve(all_cves[cve], accepted_releases, root_element, running_kernel_id, pkg_cache, fixed_versions) | ||
2467 | 1689 | |||
2468 | 1690 | self._write_oval_xml(xml_tree, root_element) | ||
2469 | 1691 | |||
2470 | 1692 | class OvalGeneratorUSNs(OvalGenerator): | ||
2471 | 1693 | def __init__(self, release, release_name, cve_paths, packages, progress, pkg_cache, usn_db_dir, fixed_only=True, cve_cache=None, cve_prefix_dir=None, outdir='./', oval_format='dpkg') -> None: | ||
2472 | 1694 | super().__init__('usn', release, release_name, cve_paths, packages, progress, pkg_cache, fixed_only, cve_cache, cve_prefix_dir, outdir, oval_format) | ||
2473 | 1695 | self._load_usns(usn_db_dir) | ||
2474 | 1696 | |||
2475 | 1697 | def _load_usns(self, usn_db_dir): | ||
2476 | 1698 | self.usns = {} | ||
2477 | 1699 | for filename in glob.glob(os.path.join(usn_db_dir, 'database*.json')): | ||
2478 | 1700 | with open(filename, 'r') as f: | ||
2479 | 1701 | data = json.load(f) | ||
2480 | 1702 | for item in data: | ||
2481 | 1703 | usn = USN(item) | ||
2482 | 1704 | self.usns[usn.id] = usn | ||
2483 | 1705 | |||
2484 | 1706 | for usn_id in sorted(self.usns.keys()): | ||
2485 | 1707 | if re.search(r'^[0-9]+-[0-9]$', usn_id): | ||
2486 | 1708 | self.usns[usn_id]['id'] = 'USN-' + usn_id | ||
2487 | 1709 | |||
2488 | 1710 | def _generate_advisory(self, usn: USN) -> etree.Element: | ||
2489 | 1711 | severities = ['low', 'medium', 'high', 'critical'] | ||
2490 | 1712 | advisory = etree.Element("advisory") | ||
2491 | 1713 | severity = etree.SubElement(advisory, "severity") | ||
2492 | 1714 | issued = etree.SubElement(advisory, "issued") | ||
2493 | 1715 | severity = None | ||
2494 | 1716 | for cve in usn.cves: | ||
2495 | 1717 | cve_obj = self._generate_cve_tag(self.cves[cve]) | ||
2496 | 1718 | advisory.append(cve_obj) | ||
2497 | 1847 | 1719 | ||
2499 | 1848 | self.patched_kernel_variables[patched] = var_id | 1720 | if not severity or severities.index(self.cves[cve].severity) > severities.index(severity): |
2500 | 1721 | severity = self.cves[cve].severity | ||
2501 | 1849 | 1722 | ||
2503 | 1850 | return self.patched_kernel_variables[patched] | 1723 | severity.text = severity.capitalize() |
2504 | 1724 | issued.text = usn.timestamp | ||
2505 | 1851 | 1725 | ||
2510 | 1852 | def get_patched_kernel_object_id(self, id_base, var_id, version=1): | 1726 | return advisory |
2507 | 1853 | """ create variable object that points to kernel version | ||
2508 | 1854 | in evr format in local_variable | ||
2509 | 1855 | """ | ||
2511 | 1856 | 1727 | ||
2513 | 1857 | object_id = '{0}:obj:{1}0'.format(self.ns, id_base + 1) | 1728 | def _generate_metadata(self, usn: USN) -> etree.Element: |
2514 | 1729 | metadata = etree.Element("metadata") | ||
2515 | 1730 | title = etree.SubElement(metadata, "title") | ||
2516 | 1731 | description = etree.SubElement(metadata, "description") | ||
2517 | 1732 | affected = etree.SubElement(metadata, "affected", attrib = {"family": "unix"}) | ||
2518 | 1733 | platform = etree.SubElement(affected, "platform") | ||
2519 | 1858 | 1734 | ||
2524 | 1859 | self.queue_element('object', """ | 1735 | reference = self._generate_reference(usn) |
2525 | 1860 | <ind-def:variable_object id="{0}" version="{1}"> | 1736 | metadata.append(reference) |
2526 | 1861 | <ind-def:var_ref>{2}</ind-def:var_ref> | 1737 | advisory = self._generate_advisory(usn) |
2527 | 1862 | </ind-def:variable_object>\n""".format(object_id, version, var_id)) | 1738 | metadata.append(reference) |
2528 | 1739 | metadata.append(advisory) | ||
2529 | 1863 | 1740 | ||
2531 | 1864 | return object_id | 1741 | platform.text = self.release_name |
2532 | 1742 | title.text = usn.title | ||
2533 | 1743 | description.text = usn.description | ||
2534 | 1865 | 1744 | ||
2542 | 1866 | # TODO: xml lib | 1745 | return metadata |
2536 | 1867 | def get_patched_kernel_state_id(self, id_base, fixed_version, version=1): | ||
2537 | 1868 | """ create state to compare to the running kernel | ||
2538 | 1869 | Return its OVAL id | ||
2539 | 1870 | """ | ||
2540 | 1871 | if not hasattr(self, 'patched_kernel_states'): | ||
2541 | 1872 | self.patched_kernel_states = {} | ||
2543 | 1873 | 1746 | ||
2549 | 1874 | patched = re.search('([\d|\.]+-\d+)[\.|\d]+', fixed_version) | 1747 | # Element generators |
2550 | 1875 | if patched: | 1748 | def _generate_reference(self, usn: USN) -> etree.Element: |
2551 | 1876 | patched = patched.group(1) | 1749 | reference = etree.Element("reference", attrib={ |
2552 | 1877 | else: | 1750 | "source": "USN", |
2553 | 1878 | patched = fixed_version | 1751 | "ref_id": usn.id, |
2554 | 1752 | "ref_url": f'https://ubuntu.com/security/notices/{usn.id}' | ||
2555 | 1753 | }) | ||
2556 | 1879 | 1754 | ||
2559 | 1880 | if patched not in self.patched_kernel_states: | 1755 | return reference |
2558 | 1881 | state_id = '{0}:ste:{1}0'.format(self.ns, id_base + 1) | ||
2560 | 1882 | 1756 | ||
2565 | 1883 | self.queue_element('state', """ | 1757 | def _populate_pkg(self, cve: CVE, package: Package, root_element, main_criteria, cache, fixed_versions) -> None: |
2566 | 1884 | <ind-def:variable_state id="{0}" version="{1}"> | 1758 | tests = root_element.find("tests") |
2567 | 1885 | <ind-def:value datatype="debian_evr_string" operation="less than">{2}</ind-def:value> | 1759 | objects = root_element.find("objects") |
2568 | 1886 | </ind-def:variable_state>\n""".format(state_id, version, patched)) | 1760 | variables = root_element.find("variables") |
2569 | 1761 | states = root_element.find("states") | ||
2570 | 1762 | pkg_rel_entry = cve.pkg_rel_entries[str(package)] | ||
2571 | 1763 | cache.setdefault(package.name, dict(bin_id=None, def_id=None)) | ||
2572 | 1887 | 1764 | ||
2573 | 1888 | self.patched_kernel_states[patched] = state_id | ||
2574 | 1889 | 1765 | ||
2576 | 1890 | return self.patched_kernel_states[patched] | 1766 | if pkg_rel_entry.status == 'vulnerable' and not self.fixed_only: |
2577 | 1767 | if not package.name in cache or not cache[package.name]['def_id']: | ||
2578 | 1768 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, main_criteria) | ||
2579 | 1891 | 1769 | ||
2584 | 1892 | def get_patched_kernel_test_id(self, id_base, fixed_version, object_id, state_id, version=1): | 1770 | test, object, var = self._generate_vulnerable_elements(package, cache[package.name]['bin_id']) |
2585 | 1893 | """ create patched kernel test and return its OVAL id """ | 1771 | tests.append(test) |
2582 | 1894 | if not hasattr(self, 'patched_kernel_tests'): | ||
2583 | 1895 | self.patched_kernel_tests = {} | ||
2586 | 1896 | 1772 | ||
2589 | 1897 | if fixed_version not in self.patched_kernel_tests: | 1773 | if not cache[package.name]['bin_id']: |
2590 | 1898 | test_id = '{0}:tst:{1}0'.format(self.ns, id_base + 1) | 1774 | objects.append(object) |
2591 | 1775 | variables.append(var) | ||
2592 | 1776 | cache[package.name]['bin_id'] = self.definition_id | ||
2593 | 1899 | 1777 | ||
2600 | 1900 | if state_id: | 1778 | cache[package.name]['def_id'] = self.definition_id |
2601 | 1901 | self.queue_element('test', """ | 1779 | self._increase_id(is_definition=False) |
2596 | 1902 | <ind-def:variable_test id="{0}" version="1" check="all" check_existence="all_exist" comment="kernel version comparison"> | ||
2597 | 1903 | <ind-def:object object_ref="{1}"/> | ||
2598 | 1904 | <ind-def:state state_ref="{2}"/> | ||
2599 | 1905 | </ind-def:variable_test>\n""".format(test_id, object_id, state_id)) | ||
2602 | 1906 | else: | 1780 | else: |
2621 | 1907 | self.queue_element('test', """ | 1781 | self._add_criterion(cache[package.name]['def_id'], pkg_rel_entry, cve, main_criteria) |
2622 | 1908 | <ind-def:variable_test id="{0}" version="1" check="all" check_existence="all_exist" comment="kernel version comparison"> | 1782 | elif pkg_rel_entry.status == 'fixed': |
2623 | 1909 | <ind-def:object object_ref="{1}"/> | 1783 | if pkg_rel_entry.fixed_version in fixed_versions: |
2624 | 1910 | </ind-def:variable_test>\n""".format(test_id, object_id)) | 1784 | self._add_criterion(fixed_versions[pkg_rel_entry.fixed_version], pkg_rel_entry, cve, main_criteria) |
2625 | 1911 | 1785 | else: | |
2626 | 1912 | self.patched_kernel_tests[fixed_version] = test_id | 1786 | self._add_criterion(self.definition_id, pkg_rel_entry, cve, main_criteria) |
2609 | 1913 | |||
2610 | 1914 | return self.patched_kernel_tests[fixed_version] | ||
2611 | 1915 | |||
2612 | 1916 | def queue_element(self, element, xml): | ||
2613 | 1917 | """ add an OVAL element to an output queue file """ | ||
2614 | 1918 | if element not in OvalGenerator.supported_oval_elements: | ||
2615 | 1919 | self.warn('"{0}" is not a supported OVAL element.'.format(element)) | ||
2616 | 1920 | return | ||
2617 | 1921 | |||
2618 | 1922 | if not hasattr(self, 'tmp'): | ||
2619 | 1923 | self.tmp = {} | ||
2620 | 1924 | self.tmp_n = random.randrange(1000000, 9999999) | ||
2627 | 1925 | 1787 | ||
2632 | 1926 | if element not in self.tmp: | 1788 | test, object, var, state = self._generate_fixed_elements(package, pkg_rel_entry, cache[package.name]['bin_id']) |
2633 | 1927 | self.tmp[element] = _open(os.path.join(self.tmpdir, | 1789 | tests.append(test) |
2634 | 1928 | './queue.{0}.{1}.xml'.format( | 1790 | states.append(state) |
2631 | 1929 | self.tmp_n, element)), 'wt') | ||
2635 | 1930 | 1791 | ||
2641 | 1931 | # trim and fix indenting (assumes fragment is nicely indented internally) | 1792 | if not cache[package.name]['bin_id']: |
2642 | 1932 | xml = xml.strip('\n') | 1793 | objects.append(object) |
2643 | 1933 | base_indent = re.match(r'\s*', xml).group(0) | 1794 | variables.append(var) |
2644 | 1934 | xml = re.sub('^{0}'.format(base_indent), ' ', xml, 0, | 1795 | cache[package.name]['bin_id'] = self.definition_id |
2640 | 1935 | re.MULTILINE) | ||
2645 | 1936 | 1796 | ||
2647 | 1937 | self.tmp[element].write(xml + '\n') | 1797 | fixed_versions[pkg_rel_entry.fixed_version] = self.definition_id |
2648 | 1798 | self._increase_id(is_definition=False) | ||
2649 | 1938 | 1799 | ||
2655 | 1939 | # TODO: xml lib | 1800 | def _populate_kernel_pkg(self, cve: CVE, package: Package, root_element, main_criteria, running_kernel_id, cache, fixed_versions) -> None: |
2656 | 1940 | def write_to_file(self): | 1801 | if not package.name in cache: |
2657 | 1941 | """ dequeue all elements into one OVAL definitions file and clean up """ | 1802 | # Generate one-time elements |
2658 | 1942 | if not hasattr(self, 'tmp'): | 1803 | kernel_criterion = self._generate_kernel_package_elements(package, root_element, running_kernel_id) |
2659 | 1943 | return | 1804 | cache[package.name] = kernel_criterion |
2660 | 1944 | 1805 | ||
2665 | 1945 | # close queue files for writing and then open for reading | 1806 | pkg_rel_entry = cve.pkg_rel_entries[str(package)] |
2662 | 1946 | for key in self.tmp: | ||
2663 | 1947 | self.tmp[key].close() | ||
2664 | 1948 | self.tmp[key] = _open(self.tmp[key].name, 'rt') | ||
2666 | 1949 | 1807 | ||
2679 | 1950 | tmp = os.path.join(self.tmpdir, self.output_filepath) | 1808 | if pkg_rel_entry.status == 'fixed': |
2680 | 1951 | with _open(tmp, 'wt') as f: | 1809 | criteria = self._generate_criteria_kernel('AND') |
2681 | 1952 | # add header | 1810 | self._add_to_criteria(criteria, cache[package.name], operator='AND', depth=0) |
2670 | 1953 | oval_timestamp = datetime.now(tz=timezone.utc).strftime( | ||
2671 | 1954 | '%Y-%m-%dT%H:%M:%S') | ||
2672 | 1955 | f.write("""<oval_definitions | ||
2673 | 1956 | xmlns="http://oval.mitre.org/XMLSchema/oval-definitions-5" | ||
2674 | 1957 | xmlns:ind-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#independent" | ||
2675 | 1958 | xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5" | ||
2676 | 1959 | xmlns:unix-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix" | ||
2677 | 1960 | xmlns:linux-def="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux" | ||
2678 | 1961 | xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd"> | ||
2682 | 1962 | 1811 | ||
2689 | 1963 | <generator> | 1812 | kernel_version_criterion = self._add_kernel_elements(cve, package, pkg_rel_entry, root_element, running_kernel_id, fixed_versions) |
2690 | 1964 | <oval:product_name>Canonical CVE OVAL Generator</oval:product_name> | 1813 | self._add_to_criteria(criteria, kernel_version_criterion, depth=0) |
2691 | 1965 | <oval:product_version>{0}</oval:product_version> | 1814 | self._add_to_criteria(main_criteria, criteria, depth=2, operator='OR') |
2692 | 1966 | <oval:schema_version>{1}</oval:schema_version> | 1815 | self._increase_id(is_definition=False) |
2693 | 1967 | <oval:timestamp>{2}</oval:timestamp> | 1816 | else: |
2694 | 1968 | </generator>\n""".format(OvalGenerator.generator_version, OvalGenerator.oval_schema_version, oval_timestamp)) | 1817 | self._add_to_criteria(main_criteria, cache[package.name], depth=2, operator='OR') |
2695 | 1818 | |||
2696 | 1819 | self._increase_id(is_definition=True) | ||
2697 | 1969 | 1820 | ||
2704 | 1970 | # add queued file content | 1821 | def generate_oval(self) -> None: |
2705 | 1971 | for element in OvalGenerator.supported_oval_elements: | 1822 | self._reset() |
2706 | 1972 | if element in self.tmp: | 1823 | xml_tree, root_element = self._get_root_element() |
2707 | 1973 | f.write("\n <{0}s>\n".format(element)) | 1824 | generator = self._get_generator("USN") |
2708 | 1974 | f.write(self.tmp[element].read().rstrip()) | 1825 | root_element.append(generator) |
2709 | 1975 | f.write("\n </{0}s>".format(element)) | 1826 | self._add_structure(root_element) |
2710 | 1976 | 1827 | ||
2713 | 1977 | # add footer | 1828 | if self.oval_format == 'dpkg': |
2714 | 1978 | f.write("\n</oval_definitions>") | 1829 | # One time kernel check |
2715 | 1830 | self._add_release_checks(root_element) | ||
2716 | 1831 | self._add_running_kernel_checks(root_element) | ||
2717 | 1832 | running_kernel_id = self.definition_id | ||
2718 | 1833 | self._increase_id(is_definition=True) | ||
2719 | 1979 | 1834 | ||
2724 | 1980 | # close and delete queue files | 1835 | definitions = root_element.find("definitions") |
2725 | 1981 | for key in self.tmp: | 1836 | pkg_cache = {} |
2726 | 1982 | self.tmp[key].close() | 1837 | fixed_versions = {} |
2723 | 1983 | os.remove(self.tmp[key].name) | ||
2727 | 1984 | 1838 | ||
2731 | 1985 | # close self.output_filepath and move into place | 1839 | for usn in self.usns: |
2732 | 1986 | f.close() | 1840 | definition_element = self._generate_definition_object(self.usns[usn]) |
2733 | 1987 | shutil.move(tmp, os.path.join(self.output_dir, self.output_filepath)) | 1841 | instructions = '' |
2734 | 1988 | 1842 | ||
2738 | 1989 | # remove tmp dir if empty | 1843 | for cve in self.cves: |
2739 | 1990 | if not os.listdir(self.tmpdir): | 1844 | for pkg in self.cves[cve].pkgs: |
2740 | 1991 | os.rmdir(self.tmpdir) | 1845 | pkg_rel_entry = self.cves[cve].pkg_rel_entries[str(pkg)] |
2741 | 1846 | if self.packages[pkg].is_kernel_pkg and self.oval_format != 'oci': | ||
2742 | 1847 | self._populate_kernel_pkg(self.cves[cve], pkg, root_element, definition_element, running_kernel_id, pkg_cache, fixed_versions) | ||
2743 | 1848 | else: | ||
2744 | 1849 | self._populate_pkg(self.cves[cve], pkg, root_element, definition_element, pkg_cache, fixed_versions) | ||
2745 | 1850 | |||
2746 | 1851 | if pkg_rel_entry.status == 'fixed' and pkg.binaries: | ||
2747 | 1852 | product_description = cve_lib.get_subproject_description(pkg_rel_entry.release) | ||
2748 | 1853 | instructions = prepare_instructions(instructions, self.cves[cve].number, product_description, {'binaries': pkg.binaries, 'fix-version': pkg_rel_entry.fixed_version}) | ||
2749 | 1854 | |||
2750 | 1855 | metadata = definition_element.find('metadata') | ||
2751 | 1856 | metadata.find('description').text = metadata.find('description').text + instructions | ||
2752 | 1857 | definitions.append(definition_element) | ||
2753 | 1992 | 1858 | ||
2768 | 1993 | def unique_id_base(self, id_base, note): | 1859 | self._write_oval_xml(xml_tree, root_element) |
2755 | 1994 | """ queue a warning message """ | ||
2756 | 1995 | if not hasattr(self, 'id_bases'): | ||
2757 | 1996 | self.id_bases = {} | ||
2758 | 1997 | is_unique = id_base not in self.id_bases.keys() | ||
2759 | 1998 | if not is_unique: | ||
2760 | 1999 | self.warn('ID Base collision {0} in {1} and {2}.'.format( | ||
2761 | 2000 | id_base, note, self.id_bases[id_base])) | ||
2762 | 2001 | self.id_bases[id_base] = note | ||
2763 | 2002 | return is_unique | ||
2764 | 2003 | |||
2765 | 2004 | def warn(self, message): | ||
2766 | 2005 | """ print a warning message """ | ||
2767 | 2006 | print('WARNING: {0}'.format(message)) | ||
2769 | 2007 | 1860 | ||
2770 | 2008 | class OvalGeneratorUSN(): | 1861 | class OvalGeneratorUSN(): |
2771 | 2009 | supported_oval_elements = ('definition', 'test', 'object', 'state', | 1862 | supported_oval_elements = ('definition', 'test', 'object', 'state', |
2772 | @@ -2709,7 +2562,7 @@ class OvalGeneratorUSN(): | |||
2773 | 2709 | xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5" | 2562 | xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5" |
2774 | 2710 | xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix" | 2563 | xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix" |
2775 | 2711 | xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux" | 2564 | xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux" |
2777 | 2712 | xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd"> | 2565 | xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#linux linux-definitions-schema.xsd"> |
2778 | 2713 | 2566 | ||
2779 | 2714 | <generator> | 2567 | <generator> |
2780 | 2715 | <oval:product_name>Canonical USN OVAL Generator</oval:product_name> | 2568 | <oval:product_name>Canonical USN OVAL Generator</oval:product_name> |
2781 | diff --git a/test/gold_oval_structure/oval.xml b/test/gold_oval_structure/oval.xml | |||
2782 | index c45fd96..1600b15 100644 | |||
2783 | --- a/test/gold_oval_structure/oval.xml | |||
2784 | +++ b/test/gold_oval_structure/oval.xml | |||
2785 | @@ -4,7 +4,7 @@ | |||
2786 | 4 | xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5" | 4 | xmlns:oval="http://oval.mitre.org/XMLSchema/oval-common-5" |
2787 | 5 | xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix" | 5 | xmlns:unix="http://oval.mitre.org/XMLSchema/oval-definitions-5#unix" |
2788 | 6 | xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux" | 6 | xmlns:linux="http://oval.mitre.org/XMLSchema/oval-definitions-5#linux" |
2790 | 7 | xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#macos linux-definitions-schema.xsd"> | 7 | xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://oval.mitre.org/XMLSchema/oval-common-5 oval-common-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5 oval-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#independent independent-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#unix unix-definitions-schema.xsd http://oval.mitre.org/XMLSchema/oval-definitions-5#linux linux-definitions-schema.xsd"> |
2791 | 8 | 8 | ||
2792 | 9 | <generator> | 9 | <generator> |
2793 | 10 | <oval:product_name>Canonical USN OVAL Generator</oval:product_name> | 10 | <oval:product_name>Canonical USN OVAL Generator</oval:product_name> |
Hey, I just did a quick check and found some small things to improve and wanted to mention already.
I will continue to check the code later and run some tests locally too.
Thanks for all this hard work :)