diff options
Diffstat (limited to 'meta/lib/oe/cve_check.py')
| -rw-r--r-- | meta/lib/oe/cve_check.py | 378 |
1 file changed, 0 insertions, 378 deletions
diff --git a/meta/lib/oe/cve_check.py b/meta/lib/oe/cve_check.py deleted file mode 100644 index ae194f27cf..0000000000 --- a/meta/lib/oe/cve_check.py +++ /dev/null | |||
| @@ -1,378 +0,0 @@ | |||
| 1 | # | ||
| 2 | # Copyright OpenEmbedded Contributors | ||
| 3 | # | ||
| 4 | # SPDX-License-Identifier: MIT | ||
| 5 | # | ||
| 6 | |||
| 7 | import collections | ||
| 8 | import functools | ||
| 9 | import itertools | ||
| 10 | import os.path | ||
| 11 | import re | ||
| 12 | import oe.patch | ||
| 13 | |||
# Parsed-version container used by Version: numeric release tuple plus the
# optional patch letter/number and pre-release label/number components.
_Version = collections.namedtuple(
    "_Version", ["release", "patch_l", "pre_l", "pre_v"]
)
| 17 | |||
@functools.total_ordering
class Version():
    """
    Compare version strings the way the CVE database expects.

    The optional *suffix* argument selects how a trailing suffix in the
    version string is interpreted:
      - "alphabetical": a single trailing letter is a patch level (e.g. "1.0a")
      - "patch": a trailing "p<N>"/"patch<N>" is a patch level (e.g. "8.3p1")
      - anything else: only release and pre-release parts are compared

    Raises:
        ValueError: if *version* cannot be parsed.
    """

    def __init__(self, version, suffix=None):

        suffixes = ["alphabetical", "patch"]

        # Pick the grammar matching the requested suffix handling.
        if str(suffix) == "alphabetical":
            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
        elif str(suffix) == "patch":
            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
        else:
            version_pattern =  r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
        regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE)

        match = regex.search(version)
        if not match:
            # ValueError is the conventional exception for unparseable input;
            # as a subclass of Exception it stays backward-compatible for
            # callers catching the old bare Exception.
            raise ValueError("Invalid version: '{0}'".format(version))

        self._version = _Version(
            release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")),
            patch_l=match.group("patch_l") if str(suffix) in suffixes and match.group("patch_l") else "",
            pre_l=match.group("pre_l"),
            pre_v=match.group("pre_v")
        )

        # Precompute the comparison key consumed by __eq__/__gt__ (and the
        # remaining comparisons filled in by functools.total_ordering).
        self._key = _cmpkey(
            self._version.release,
            self._version.patch_l,
            self._version.pre_l,
            self._version.pre_v
        )

    def __eq__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key == other._key

    def __gt__(self, other):
        if not isinstance(other, Version):
            return NotImplemented
        return self._key > other._key
| 60 | |||
| 61 | def _cmpkey(release, patch_l, pre_l, pre_v): | ||
| 62 | # remove leading 0 | ||
| 63 | _release = tuple( | ||
| 64 | reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release)))) | ||
| 65 | ) | ||
| 66 | |||
| 67 | _patch = patch_l.upper() | ||
| 68 | |||
| 69 | if pre_l is None and pre_v is None: | ||
| 70 | _pre = float('inf') | ||
| 71 | else: | ||
| 72 | _pre = float(pre_v) if pre_v else float('-inf') | ||
| 73 | return _release, _patch, _pre | ||
| 74 | |||
| 75 | |||
def parse_cve_from_filename(patch_filename):
    """
    Extract a CVE ID from a patch file name.

    "CVE-YYYY-NNNN..." is matched case-insensitively; when the file name
    contains several CVE IDs, only the last one is reported.

    Returns the CVE ID in upper case, or an empty string when the file
    name contains none.
    """
    pattern = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE)

    match = pattern.search(patch_filename)
    if match:
        return match.group(1).upper()
    return ""
| 92 | |||
| 93 | |||
def parse_cves_from_patch_contents(patch_contents):
    """
    Collect CVE IDs from the text of a patch.

    Only lines beginning with "CVE:" are considered; every CVE ID on such
    a line is picked up regardless of the delimiters used (',', '&',
    "and", ...), and multiple "CVE:" lines are allowed.

    Returns a set of all CVE IDs found.
    """
    cve_pattern = re.compile(r"CVE-\d{4}-\d{4,}")
    found = set()
    # Scan the patch line by line for "CVE:" tag lines.
    for line in patch_contents.split("\n"):
        if line.startswith("CVE:"):
            found.update(cve_pattern.findall(line))
    return found
| 112 | |||
| 113 | |||
def parse_cves_from_patch_file(patch_file):
    """
    Parses CVE IDs associated with a particular patch file, using both the
    filename and patch contents.

    Returns a set of all CVE IDs found in the patch filename and contents.
    """
    cve_ids = set()
    filename_cve = parse_cve_from_filename(patch_file)
    if filename_cve:
        bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file))
        # Reuse the ID parsed above instead of re-parsing the file name.
        cve_ids.add(filename_cve)

    # Remote patches won't be present and compressed patches won't be
    # unpacked, so say we're not scanning them
    if not os.path.isfile(patch_file):
        bb.note("%s is remote or compressed, not scanning content" % patch_file)
        return cve_ids

    with open(patch_file, "r", encoding="utf-8") as f:
        try:
            patch_text = f.read()
        except UnicodeDecodeError:
            # Not valid UTF-8; retry with a latin-1 fallback that cannot fail.
            bb.debug(
                1,
                "Failed to read patch %s using UTF-8 encoding"
                " trying with iso8859-1" % patch_file,
            )
            f.close()
            with open(patch_file, "r", encoding="iso8859-1") as f:
                patch_text = f.read()

    cve_ids.update(parse_cves_from_patch_contents(patch_text))

    if not cve_ids:
        bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)
    else:
        bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids))))

    return cve_ids
| 154 | |||
| 155 | |||
@bb.parse.vardeps("CVE_STATUS")
def get_patched_cves(d):
    """
    Determines the CVE IDs that have been solved by either patches included within
    SRC_URI or by setting CVE_STATUS.

    Returns a dictionary with the CVE IDs as keys and an associated dictionary of
    relevant metadata as the value.
    """
    patched_cves = {}
    patches = oe.patch.src_patches(d)
    bb.debug(2, "Scanning %d patches for CVEs" % len(patches))

    # Check each patch file
    for url in patches:
        # Third field of the decoded URL is the local patch file path.
        patch_file = bb.fetch.decodeurl(url)[2]
        for cve_id in parse_cves_from_patch_file(patch_file):
            if cve_id not in patched_cves:
                patched_cves[cve_id] = {
                    "abbrev-status": "Patched",
                    "status": "fix-file-included",
                    "resource": [patch_file],
                }
            else:
                # Several patches may address the same CVE; record each file.
                patched_cves[cve_id]["resource"].append(patch_file)

    # Search for additional patched CVEs
    for cve_id in d.getVarFlags("CVE_STATUS") or {}:
        decoded_status = decode_cve_status(d, cve_id)
        products = d.getVar("CVE_PRODUCT")
        # Only apply the status when it targets this recipe's vendor/product.
        if has_cve_product_match(decoded_status, products):
            if cve_id in patched_cves:
                bb.warn(
                    'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"'
                    % (
                        cve_id,
                        d.getVarFlag("CVE_STATUS", cve_id),
                        patched_cves[cve_id]["abbrev-status"],
                        patched_cves[cve_id]["status"],
                    )
                )
            # An explicit CVE_STATUS entry takes precedence over a
            # patch-derived "Patched" record.
            patched_cves[cve_id] = {
                "abbrev-status": decoded_status["mapping"],
                "status": decoded_status["detail"],
                "justification": decoded_status["description"],
                "affected-vendor": decoded_status["vendor"],
                "affected-product": decoded_status["product"],
            }

    return patched_cves
| 206 | |||
| 207 | |||
def get_cpe_ids(cve_product, version):
    """
    Get list of CPE identifiers for the given product and version.

    Each whitespace-separated entry of cve_product is either "product" or
    "vendor:product"; a missing vendor becomes a wildcard. Any "+git..."
    suffix is stripped from the version first.
    """
    base_version = version.split("+git")[0]

    identifiers = []
    for entry in cve_product.split():
        # CVE_PRODUCT in recipes may include vendor information for CPE
        # identifiers; fall back to a wildcard vendor otherwise.
        vendor, sep, product = entry.partition(":")
        if not sep:
            vendor, product = "*", entry

        identifiers.append(
            'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, product, base_version)
        )

    return identifiers
| 228 | |||
def cve_check_merge_jsons(output, data):
    """
    Merge the data in the "package" property to the main data file
    output.

    Refuses to merge when the format versions differ or when the incoming
    package is already present.
    """
    if output["version"] != data["version"]:
        bb.error("Version mismatch when merging JSON outputs")
        return

    incoming = data["package"][0]
    for existing in output["package"]:
        if existing["name"] == incoming["name"]:
            bb.error("Error adding the same package %s twice" % existing["name"])
            return

    output["package"].append(incoming)
| 244 | |||
def update_symlinks(target_path, link_path):
    """
    Update a symbolic link link_path to point to target_path.

    The link is (re)created relative to its directory, and removed first
    if it already exists — including when it is a dangling symlink.
    """
    if link_path != target_path and os.path.exists(target_path):
        # os.path.lexists() is also true for a dangling symlink; the previous
        # os.path.exists(os.path.realpath(link_path)) check missed that case,
        # leaving the stale link in place and making os.symlink() below fail
        # with FileExistsError.
        if os.path.lexists(link_path):
            os.remove(link_path)
        os.symlink(os.path.basename(target_path), link_path)
| 254 | |||
| 255 | |||
def convert_cve_version(version):
    """
    This function converts from CVE format to Yocto version format.
    eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1

    Unless it is redefined using CVE_VERSION in the recipe,
    cve_check uses the version in the name of the recipe (${PV})
    to check vulnerabilities against a CVE in the database downloaded from NVD.

    When the version has an update, i.e.
    "p1" in OpenSSH 8.3p1,
    "-rc1" in linux kernel 6.2-rc1,
    the database stores the version as version_update (8.3_p1, 6.2_rc1).
    Therefore, we must transform this version before comparing to the
    recipe version.

    In this case, the parameter of the function is 8.3_p1.
    If the version uses the Release Candidate format, "rc",
    this function replaces the '_' by '-'.
    If the version uses the Update format, "p",
    this function removes the '_' completely.
    """
    # "re" is already imported at module level; the old function-local
    # "import re" was redundant and has been dropped.
    matches = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version)

    # Anything not matching "<numbers>_p<N>"/"<numbers>_rc<N>" is returned as-is.
    if not matches:
        return version

    base = matches.group(1)
    update = matches.group(2)

    # Release candidates keep a separator ("-"); plain updates drop it.
    if matches.group(3) == "rc":
        return base + '-' + update

    return base + update
| 292 | |||
@bb.parse.vardeps("CVE_STATUS", "CVE_CHECK_STATUSMAP")
def decode_cve_status(d, cve):
    """
    Convert CVE_STATUS into status, vendor, product, detail and description.

    Returns an empty dict when no CVE_STATUS flag exists for the CVE,
    otherwise a dict with keys "detail", "vendor", "product",
    "description" and "mapping".
    """
    status = d.getVarFlag("CVE_STATUS", cve)
    if not status:
        return {}

    # Split on at most 4 colons: "detail: cpe:vendor:product:description"
    # yields exactly 5 parts; "detail: description" yields 2.
    status_split = status.split(':', 4)
    status_out = {}
    status_out["detail"] = status_split[0]
    product = "*"
    vendor = "*"
    description = ""
    if len(status_split) >= 5 and status_split[1].strip() == "cpe":
        # Both vendor and product are mandatory if cpe: present, the syntax is then:
        # detail: cpe:vendor:product:description
        # The guard requires 5 parts: the old ">= 4" check allowed
        # "detail: cpe:vendor:product" through and crashed with IndexError
        # on status_split[4]; such entries now hit the warning below instead.
        vendor = status_split[2].strip()
        product = status_split[3].strip()
        description = status_split[4].strip()
    elif len(status_split) >= 2 and status_split[1].strip() == "cpe":
        # Malformed CPE
        bb.warn(
            'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE'
            % (cve, status)
        )
    else:
        # Other case: no CPE, the syntax is then:
        # detail: description
        description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else ""

    status_out["vendor"] = vendor
    status_out["product"] = product
    status_out["description"] = description

    detail = status_out["detail"]
    # Map the free-form detail to an abbreviated status via CVE_CHECK_STATUSMAP.
    status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail)
    if status_mapping is None:
        bb.warn(
            'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched'
            % (detail, cve, status)
        )
        status_mapping = "Unpatched"
    status_out["mapping"] = status_mapping

    return status_out
| 340 | |||
def has_cve_product_match(detailed_status, products):
    """
    Check product/vendor match between detailed_status from decode_cve_status
    and a whitespace-separated string of products (like from CVE_PRODUCT).
    """
    for entry in products.split():
        # Entries may be "vendor:product"; a bare product implies any vendor.
        vendor, sep, product = entry.partition(":")
        if not sep:
            vendor, product = "*", entry

        vendor_ok = (vendor == detailed_status["vendor"]
                     or detailed_status["vendor"] == "*")
        product_ok = (product == detailed_status["product"]
                      or detailed_status["product"] == "*")
        if vendor_ok and product_ok:
            return True

    # No entry matched.
    return False
| 357 | |||
def extend_cve_status(d):
    """
    Expand deprecated/grouped CVE status variables into CVE_STATUS flags.

    Folds the deprecated CVE_CHECK_IGNORE list and the CVE_STATUS_GROUPS
    indirection into per-CVE CVE_STATUS flags. Guarded so the expansion
    runs only once per datastore even when several classes call it.
    """
    # do this only once in case multiple classes use this
    if d.getVar("CVE_STATUS_EXTENDED"):
        return
    d.setVar("CVE_STATUS_EXTENDED", "1")

    # Fallback all CVEs from CVE_CHECK_IGNORE to CVE_STATUS
    cve_check_ignore = d.getVar("CVE_CHECK_IGNORE")
    if cve_check_ignore:
        bb.warn("CVE_CHECK_IGNORE is deprecated in favor of CVE_STATUS")
        # Reuse the value fetched above instead of querying the datastore again.
        for cve in cve_check_ignore.split():
            d.setVarFlag("CVE_STATUS", cve, "ignored")

    # Process CVE_STATUS_GROUPS to set multiple statuses and optional detail or description at once
    for cve_status_group in (d.getVar("CVE_STATUS_GROUPS") or "").split():
        cve_group = d.getVar(cve_status_group)
        if cve_group is not None:
            for cve in cve_group.split():
                d.setVarFlag("CVE_STATUS", cve, d.getVarFlag(cve_status_group, "status"))
        else:
            bb.warn("CVE_STATUS_GROUPS contains undefined variable %s" % cve_status_group)
