summaryrefslogtreecommitdiffstats
path: root/meta/lib/oe/cve_check.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oe/cve_check.py')
-rw-r--r--meta/lib/oe/cve_check.py378
1 files changed, 0 insertions, 378 deletions
diff --git a/meta/lib/oe/cve_check.py b/meta/lib/oe/cve_check.py
deleted file mode 100644
index ae194f27cf..0000000000
--- a/meta/lib/oe/cve_check.py
+++ /dev/null
@@ -1,378 +0,0 @@
1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: MIT
5#
6
7import collections
8import functools
9import itertools
10import os.path
11import re
12import oe.patch
13
14_Version = collections.namedtuple(
15 "_Version", ["release", "patch_l", "pre_l", "pre_v"]
16)
17
18@functools.total_ordering
19class Version():
20
21 def __init__(self, version, suffix=None):
22
23 suffixes = ["alphabetical", "patch"]
24
25 if str(suffix) == "alphabetical":
26 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
27 elif str(suffix) == "patch":
28 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
29 else:
30 version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
31 regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE)
32
33 match = regex.search(version)
34 if not match:
35 raise Exception("Invalid version: '{0}'".format(version))
36
37 self._version = _Version(
38 release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")),
39 patch_l=match.group("patch_l") if str(suffix) in suffixes and match.group("patch_l") else "",
40 pre_l=match.group("pre_l"),
41 pre_v=match.group("pre_v")
42 )
43
44 self._key = _cmpkey(
45 self._version.release,
46 self._version.patch_l,
47 self._version.pre_l,
48 self._version.pre_v
49 )
50
51 def __eq__(self, other):
52 if not isinstance(other, Version):
53 return NotImplemented
54 return self._key == other._key
55
56 def __gt__(self, other):
57 if not isinstance(other, Version):
58 return NotImplemented
59 return self._key > other._key
60
61def _cmpkey(release, patch_l, pre_l, pre_v):
62 # remove leading 0
63 _release = tuple(
64 reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release))))
65 )
66
67 _patch = patch_l.upper()
68
69 if pre_l is None and pre_v is None:
70 _pre = float('inf')
71 else:
72 _pre = float(pre_v) if pre_v else float('-inf')
73 return _release, _patch, _pre
74
75
76def parse_cve_from_filename(patch_filename):
77 """
78 Parses CVE ID from the filename
79
80 Matches the last "CVE-YYYY-ID" in the file name, also if written
81 in lowercase. Possible to have multiple CVE IDs in a single
82 file name, but only the last one will be detected from the file name.
83
84 Returns the last CVE ID foudn in the filename. If no CVE ID is found
85 an empty string is returned.
86 """
87 cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE)
88
89 # Check patch file name for CVE ID
90 fname_match = cve_file_name_match.search(patch_filename)
91 return fname_match.group(1).upper() if fname_match else ""
92
93
94def parse_cves_from_patch_contents(patch_contents):
95 """
96 Parses CVE IDs from patch contents
97
98 Matches all CVE IDs contained on a line that starts with "CVE: ". Any
99 delimiter (',', '&', "and", etc.) can be used without any issues. Multiple
100 "CVE:" lines can also exist.
101
102 Returns a set of all CVE IDs found in the patch contents.
103 """
104 cve_ids = set()
105 cve_match = re.compile(r"CVE-\d{4}-\d{4,}")
106 # Search for one or more "CVE: " lines
107 for line in patch_contents.split("\n"):
108 if not line.startswith("CVE:"):
109 continue
110 cve_ids.update(cve_match.findall(line))
111 return cve_ids
112
113
114def parse_cves_from_patch_file(patch_file):
115 """
116 Parses CVE IDs associated with a particular patch file, using both the filename
117 and patch contents.
118
119 Returns a set of all CVE IDs found in the patch filename and contents.
120 """
121 cve_ids = set()
122 filename_cve = parse_cve_from_filename(patch_file)
123 if filename_cve:
124 bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file))
125 cve_ids.add(parse_cve_from_filename(patch_file))
126
127 # Remote patches won't be present and compressed patches won't be
128 # unpacked, so say we're not scanning them
129 if not os.path.isfile(patch_file):
130 bb.note("%s is remote or compressed, not scanning content" % patch_file)
131 return cve_ids
132
133 with open(patch_file, "r", encoding="utf-8") as f:
134 try:
135 patch_text = f.read()
136 except UnicodeDecodeError:
137 bb.debug(
138 1,
139 "Failed to read patch %s using UTF-8 encoding"
140 " trying with iso8859-1" % patch_file,
141 )
142 f.close()
143 with open(patch_file, "r", encoding="iso8859-1") as f:
144 patch_text = f.read()
145
146 cve_ids.update(parse_cves_from_patch_contents(patch_text))
147
148 if not cve_ids:
149 bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)
150 else:
151 bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids))))
152
153 return cve_ids
154
155
156@bb.parse.vardeps("CVE_STATUS")
157def get_patched_cves(d):
158 """
159 Determines the CVE IDs that have been solved by either patches incuded within
160 SRC_URI or by setting CVE_STATUS.
161
162 Returns a dictionary with the CVE IDs as keys and an associated dictonary of
163 relevant metadata as the value.
164 """
165 patched_cves = {}
166 patches = oe.patch.src_patches(d)
167 bb.debug(2, "Scanning %d patches for CVEs" % len(patches))
168
169 # Check each patch file
170 for url in patches:
171 patch_file = bb.fetch.decodeurl(url)[2]
172 for cve_id in parse_cves_from_patch_file(patch_file):
173 if cve_id not in patched_cves:
174 patched_cves[cve_id] = {
175 "abbrev-status": "Patched",
176 "status": "fix-file-included",
177 "resource": [patch_file],
178 }
179 else:
180 patched_cves[cve_id]["resource"].append(patch_file)
181
182 # Search for additional patched CVEs
183 for cve_id in d.getVarFlags("CVE_STATUS") or {}:
184 decoded_status = decode_cve_status(d, cve_id)
185 products = d.getVar("CVE_PRODUCT")
186 if has_cve_product_match(decoded_status, products):
187 if cve_id in patched_cves:
188 bb.warn(
189 'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"'
190 % (
191 cve_id,
192 d.getVarFlag("CVE_STATUS", cve_id),
193 patched_cves[cve_id]["abbrev-status"],
194 patched_cves[cve_id]["status"],
195 )
196 )
197 patched_cves[cve_id] = {
198 "abbrev-status": decoded_status["mapping"],
199 "status": decoded_status["detail"],
200 "justification": decoded_status["description"],
201 "affected-vendor": decoded_status["vendor"],
202 "affected-product": decoded_status["product"],
203 }
204
205 return patched_cves
206
207
208def get_cpe_ids(cve_product, version):
209 """
210 Get list of CPE identifiers for the given product and version
211 """
212
213 version = version.split("+git")[0]
214
215 cpe_ids = []
216 for product in cve_product.split():
217 # CVE_PRODUCT in recipes may include vendor information for CPE identifiers. If not,
218 # use wildcard for vendor.
219 if ":" in product:
220 vendor, product = product.split(":", 1)
221 else:
222 vendor = "*"
223
224 cpe_id = 'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, product, version)
225 cpe_ids.append(cpe_id)
226
227 return cpe_ids
228
229def cve_check_merge_jsons(output, data):
230 """
231 Merge the data in the "package" property to the main data file
232 output
233 """
234 if output["version"] != data["version"]:
235 bb.error("Version mismatch when merging JSON outputs")
236 return
237
238 for product in output["package"]:
239 if product["name"] == data["package"][0]["name"]:
240 bb.error("Error adding the same package %s twice" % product["name"])
241 return
242
243 output["package"].append(data["package"][0])
244
245def update_symlinks(target_path, link_path):
246 """
247 Update a symbolic link link_path to point to target_path.
248 Remove the link and recreate it if exist and is different.
249 """
250 if link_path != target_path and os.path.exists(target_path):
251 if os.path.exists(os.path.realpath(link_path)):
252 os.remove(link_path)
253 os.symlink(os.path.basename(target_path), link_path)
254
255
256def convert_cve_version(version):
257 """
258 This function converts from CVE format to Yocto version format.
259 eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1
260
261 Unless it is redefined using CVE_VERSION in the recipe,
262 cve_check uses the version in the name of the recipe (${PV})
263 to check vulnerabilities against a CVE in the database downloaded from NVD.
264
265 When the version has an update, i.e.
266 "p1" in OpenSSH 8.3p1,
267 "-rc1" in linux kernel 6.2-rc1,
268 the database stores the version as version_update (8.3_p1, 6.2_rc1).
269 Therefore, we must transform this version before comparing to the
270 recipe version.
271
272 In this case, the parameter of the function is 8.3_p1.
273 If the version uses the Release Candidate format, "rc",
274 this function replaces the '_' by '-'.
275 If the version uses the Update format, "p",
276 this function removes the '_' completely.
277 """
278 import re
279
280 matches = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version)
281
282 if not matches:
283 return version
284
285 version = matches.group(1)
286 update = matches.group(2)
287
288 if matches.group(3) == "rc":
289 return version + '-' + update
290
291 return version + update
292
293@bb.parse.vardeps("CVE_STATUS", "CVE_CHECK_STATUSMAP")
294def decode_cve_status(d, cve):
295 """
296 Convert CVE_STATUS into status, vendor, product, detail and description.
297 """
298 status = d.getVarFlag("CVE_STATUS", cve)
299 if not status:
300 return {}
301
302 status_split = status.split(':', 4)
303 status_out = {}
304 status_out["detail"] = status_split[0]
305 product = "*"
306 vendor = "*"
307 description = ""
308 if len(status_split) >= 4 and status_split[1].strip() == "cpe":
309 # Both vendor and product are mandatory if cpe: present, the syntax is then:
310 # detail: cpe:vendor:product:description
311 vendor = status_split[2].strip()
312 product = status_split[3].strip()
313 description = status_split[4].strip()
314 elif len(status_split) >= 2 and status_split[1].strip() == "cpe":
315 # Malformed CPE
316 bb.warn(
317 'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE'
318 % (cve, status)
319 )
320 else:
321 # Other case: no CPE, the syntax is then:
322 # detail: description
323 description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else ""
324
325 status_out["vendor"] = vendor
326 status_out["product"] = product
327 status_out["description"] = description
328
329 detail = status_out["detail"]
330 status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail)
331 if status_mapping is None:
332 bb.warn(
333 'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched'
334 % (detail, cve, status)
335 )
336 status_mapping = "Unpatched"
337 status_out["mapping"] = status_mapping
338
339 return status_out
340
341def has_cve_product_match(detailed_status, products):
342 """
343 Check product/vendor match between detailed_status from decode_cve_status and a string of
344 products (like from CVE_PRODUCT)
345 """
346 for product in products.split():
347 vendor = "*"
348 if ":" in product:
349 vendor, product = product.split(":", 1)
350
351 if (vendor == detailed_status["vendor"] or detailed_status["vendor"] == "*") and \
352 (product == detailed_status["product"] or detailed_status["product"] == "*"):
353 return True
354
355 #if no match, return False
356 return False
357
358def extend_cve_status(d):
359 # do this only once in case multiple classes use this
360 if d.getVar("CVE_STATUS_EXTENDED"):
361 return
362 d.setVar("CVE_STATUS_EXTENDED", "1")
363
364 # Fallback all CVEs from CVE_CHECK_IGNORE to CVE_STATUS
365 cve_check_ignore = d.getVar("CVE_CHECK_IGNORE")
366 if cve_check_ignore:
367 bb.warn("CVE_CHECK_IGNORE is deprecated in favor of CVE_STATUS")
368 for cve in (d.getVar("CVE_CHECK_IGNORE") or "").split():
369 d.setVarFlag("CVE_STATUS", cve, "ignored")
370
371 # Process CVE_STATUS_GROUPS to set multiple statuses and optional detail or description at once
372 for cve_status_group in (d.getVar("CVE_STATUS_GROUPS") or "").split():
373 cve_group = d.getVar(cve_status_group)
374 if cve_group is not None:
375 for cve in cve_group.split():
376 d.setVarFlag("CVE_STATUS", cve, d.getVarFlag(cve_status_group, "status"))
377 else:
378 bb.warn("CVE_STATUS_GROUPS contains undefined variable %s" % cve_status_group)