diff options
Diffstat (limited to 'meta/lib/oe/cve_check.py')
-rw-r--r-- | meta/lib/oe/cve_check.py | 257 |
1 files changed, 195 insertions, 62 deletions
diff --git a/meta/lib/oe/cve_check.py b/meta/lib/oe/cve_check.py index ed5c714cb8..ae194f27cf 100644 --- a/meta/lib/oe/cve_check.py +++ b/meta/lib/oe/cve_check.py | |||
@@ -5,9 +5,11 @@ | |||
5 | # | 5 | # |
6 | 6 | ||
7 | import collections | 7 | import collections |
8 | import re | ||
9 | import itertools | ||
10 | import functools | 8 | import functools |
9 | import itertools | ||
10 | import os.path | ||
11 | import re | ||
12 | import oe.patch | ||
11 | 13 | ||
12 | _Version = collections.namedtuple( | 14 | _Version = collections.namedtuple( |
13 | "_Version", ["release", "patch_l", "pre_l", "pre_v"] | 15 | "_Version", ["release", "patch_l", "pre_l", "pre_v"] |
@@ -71,71 +73,134 @@ def _cmpkey(release, patch_l, pre_l, pre_v): | |||
71 | return _release, _patch, _pre | 73 | return _release, _patch, _pre |
72 | 74 | ||
73 | 75 | ||
74 | def get_patched_cves(d): | 76 | def parse_cve_from_filename(patch_filename): |
75 | """ | 77 | """ |
76 | Get patches that solve CVEs using the "CVE: " tag. | 78 | Parses CVE ID from the filename |
79 | |||
80 | Matches the last "CVE-YYYY-ID" in the file name, also if written | ||
81 | in lowercase. Possible to have multiple CVE IDs in a single | ||
82 | file name, but only the last one will be detected from the file name. | ||
83 | |||
84 | Returns the last CVE ID foudn in the filename. If no CVE ID is found | ||
85 | an empty string is returned. | ||
77 | """ | 86 | """ |
87 | cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE) | ||
78 | 88 | ||
79 | import re | 89 | # Check patch file name for CVE ID |
80 | import oe.patch | 90 | fname_match = cve_file_name_match.search(patch_filename) |
91 | return fname_match.group(1).upper() if fname_match else "" | ||
81 | 92 | ||
82 | cve_match = re.compile(r"CVE:( CVE-\d{4}-\d+)+") | ||
83 | 93 | ||
84 | # Matches the last "CVE-YYYY-ID" in the file name, also if written | 94 | def parse_cves_from_patch_contents(patch_contents): |
85 | # in lowercase. Possible to have multiple CVE IDs in a single | 95 | """ |
86 | # file name, but only the last one will be detected from the file name. | 96 | Parses CVE IDs from patch contents |
87 | # However, patch files contents addressing multiple CVE IDs are supported | ||
88 | # (cve_match regular expression) | ||
89 | cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d+)", re.IGNORECASE) | ||
90 | 97 | ||
91 | patched_cves = set() | 98 | Matches all CVE IDs contained on a line that starts with "CVE: ". Any |
92 | patches = oe.patch.src_patches(d) | 99 | delimiter (',', '&', "and", etc.) can be used without any issues. Multiple |
93 | bb.debug(2, "Scanning %d patches for CVEs" % len(patches)) | 100 | "CVE:" lines can also exist. |
94 | for url in patches: | ||
95 | patch_file = bb.fetch.decodeurl(url)[2] | ||
96 | 101 | ||
97 | # Check patch file name for CVE ID | 102 | Returns a set of all CVE IDs found in the patch contents. |
98 | fname_match = cve_file_name_match.search(patch_file) | 103 | """ |
99 | if fname_match: | 104 | cve_ids = set() |
100 | cve = fname_match.group(1).upper() | 105 | cve_match = re.compile(r"CVE-\d{4}-\d{4,}") |
101 | patched_cves.add(cve) | 106 | # Search for one or more "CVE: " lines |
102 | bb.debug(2, "Found %s from patch file name %s" % (cve, patch_file)) | 107 | for line in patch_contents.split("\n"): |
103 | 108 | if not line.startswith("CVE:"): | |
104 | # Remote patches won't be present and compressed patches won't be | ||
105 | # unpacked, so say we're not scanning them | ||
106 | if not os.path.isfile(patch_file): | ||
107 | bb.note("%s is remote or compressed, not scanning content" % patch_file) | ||
108 | continue | 109 | continue |
110 | cve_ids.update(cve_match.findall(line)) | ||
111 | return cve_ids | ||
112 | |||
109 | 113 | ||
110 | with open(patch_file, "r", encoding="utf-8") as f: | 114 | def parse_cves_from_patch_file(patch_file): |
111 | try: | 115 | """ |
116 | Parses CVE IDs associated with a particular patch file, using both the filename | ||
117 | and patch contents. | ||
118 | |||
119 | Returns a set of all CVE IDs found in the patch filename and contents. | ||
120 | """ | ||
121 | cve_ids = set() | ||
122 | filename_cve = parse_cve_from_filename(patch_file) | ||
123 | if filename_cve: | ||
124 | bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file)) | ||
125 | cve_ids.add(parse_cve_from_filename(patch_file)) | ||
126 | |||
127 | # Remote patches won't be present and compressed patches won't be | ||
128 | # unpacked, so say we're not scanning them | ||
129 | if not os.path.isfile(patch_file): | ||
130 | bb.note("%s is remote or compressed, not scanning content" % patch_file) | ||
131 | return cve_ids | ||
132 | |||
133 | with open(patch_file, "r", encoding="utf-8") as f: | ||
134 | try: | ||
135 | patch_text = f.read() | ||
136 | except UnicodeDecodeError: | ||
137 | bb.debug( | ||
138 | 1, | ||
139 | "Failed to read patch %s using UTF-8 encoding" | ||
140 | " trying with iso8859-1" % patch_file, | ||
141 | ) | ||
142 | f.close() | ||
143 | with open(patch_file, "r", encoding="iso8859-1") as f: | ||
112 | patch_text = f.read() | 144 | patch_text = f.read() |
113 | except UnicodeDecodeError: | 145 | |
114 | bb.debug(1, "Failed to read patch %s using UTF-8 encoding" | 146 | cve_ids.update(parse_cves_from_patch_contents(patch_text)) |
115 | " trying with iso8859-1" % patch_file) | 147 | |
116 | f.close() | 148 | if not cve_ids: |
117 | with open(patch_file, "r", encoding="iso8859-1") as f: | 149 | bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file) |
118 | patch_text = f.read() | 150 | else: |
119 | 151 | bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids)))) | |
120 | # Search for one or more "CVE: " lines | 152 | |
121 | text_match = False | 153 | return cve_ids |
122 | for match in cve_match.finditer(patch_text): | 154 | |
123 | # Get only the CVEs without the "CVE: " tag | 155 | |
124 | cves = patch_text[match.start()+5:match.end()] | 156 | @bb.parse.vardeps("CVE_STATUS") |
125 | for cve in cves.split(): | 157 | def get_patched_cves(d): |
126 | bb.debug(2, "Patch %s solves %s" % (patch_file, cve)) | 158 | """ |
127 | patched_cves.add(cve) | 159 | Determines the CVE IDs that have been solved by either patches incuded within |
128 | text_match = True | 160 | SRC_URI or by setting CVE_STATUS. |
129 | 161 | ||
130 | if not fname_match and not text_match: | 162 | Returns a dictionary with the CVE IDs as keys and an associated dictonary of |
131 | bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file) | 163 | relevant metadata as the value. |
164 | """ | ||
165 | patched_cves = {} | ||
166 | patches = oe.patch.src_patches(d) | ||
167 | bb.debug(2, "Scanning %d patches for CVEs" % len(patches)) | ||
168 | |||
169 | # Check each patch file | ||
170 | for url in patches: | ||
171 | patch_file = bb.fetch.decodeurl(url)[2] | ||
172 | for cve_id in parse_cves_from_patch_file(patch_file): | ||
173 | if cve_id not in patched_cves: | ||
174 | patched_cves[cve_id] = { | ||
175 | "abbrev-status": "Patched", | ||
176 | "status": "fix-file-included", | ||
177 | "resource": [patch_file], | ||
178 | } | ||
179 | else: | ||
180 | patched_cves[cve_id]["resource"].append(patch_file) | ||
132 | 181 | ||
133 | # Search for additional patched CVEs | 182 | # Search for additional patched CVEs |
134 | for cve in (d.getVarFlags("CVE_STATUS") or {}): | 183 | for cve_id in d.getVarFlags("CVE_STATUS") or {}: |
135 | decoded_status, _, _ = decode_cve_status(d, cve) | 184 | decoded_status = decode_cve_status(d, cve_id) |
136 | if decoded_status == "Patched": | 185 | products = d.getVar("CVE_PRODUCT") |
137 | bb.debug(2, "CVE %s is additionally patched" % cve) | 186 | if has_cve_product_match(decoded_status, products): |
138 | patched_cves.add(cve) | 187 | if cve_id in patched_cves: |
188 | bb.warn( | ||
189 | 'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"' | ||
190 | % ( | ||
191 | cve_id, | ||
192 | d.getVarFlag("CVE_STATUS", cve_id), | ||
193 | patched_cves[cve_id]["abbrev-status"], | ||
194 | patched_cves[cve_id]["status"], | ||
195 | ) | ||
196 | ) | ||
197 | patched_cves[cve_id] = { | ||
198 | "abbrev-status": decoded_status["mapping"], | ||
199 | "status": decoded_status["detail"], | ||
200 | "justification": decoded_status["description"], | ||
201 | "affected-vendor": decoded_status["vendor"], | ||
202 | "affected-product": decoded_status["product"], | ||
203 | } | ||
139 | 204 | ||
140 | return patched_cves | 205 | return patched_cves |
141 | 206 | ||
@@ -225,21 +290,89 @@ def convert_cve_version(version): | |||
225 | 290 | ||
226 | return version + update | 291 | return version + update |
227 | 292 | ||
293 | @bb.parse.vardeps("CVE_STATUS", "CVE_CHECK_STATUSMAP") | ||
228 | def decode_cve_status(d, cve): | 294 | def decode_cve_status(d, cve): |
229 | """ | 295 | """ |
230 | Convert CVE_STATUS into status, detail and description. | 296 | Convert CVE_STATUS into status, vendor, product, detail and description. |
231 | """ | 297 | """ |
232 | status = d.getVarFlag("CVE_STATUS", cve) | 298 | status = d.getVarFlag("CVE_STATUS", cve) |
233 | if not status: | 299 | if not status: |
234 | return ("", "", "") | 300 | return {} |
301 | |||
302 | status_split = status.split(':', 4) | ||
303 | status_out = {} | ||
304 | status_out["detail"] = status_split[0] | ||
305 | product = "*" | ||
306 | vendor = "*" | ||
307 | description = "" | ||
308 | if len(status_split) >= 4 and status_split[1].strip() == "cpe": | ||
309 | # Both vendor and product are mandatory if cpe: present, the syntax is then: | ||
310 | # detail: cpe:vendor:product:description | ||
311 | vendor = status_split[2].strip() | ||
312 | product = status_split[3].strip() | ||
313 | description = status_split[4].strip() | ||
314 | elif len(status_split) >= 2 and status_split[1].strip() == "cpe": | ||
315 | # Malformed CPE | ||
316 | bb.warn( | ||
317 | 'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE' | ||
318 | % (cve, status) | ||
319 | ) | ||
320 | else: | ||
321 | # Other case: no CPE, the syntax is then: | ||
322 | # detail: description | ||
323 | description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else "" | ||
235 | 324 | ||
236 | status_split = status.split(':', 1) | 325 | status_out["vendor"] = vendor |
237 | detail = status_split[0] | 326 | status_out["product"] = product |
238 | description = status_split[1].strip() if (len(status_split) > 1) else "" | 327 | status_out["description"] = description |
239 | 328 | ||
329 | detail = status_out["detail"] | ||
240 | status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail) | 330 | status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail) |
241 | if status_mapping is None: | 331 | if status_mapping is None: |
242 | bb.warn('Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched' % (detail, cve, status)) | 332 | bb.warn( |
333 | 'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched' | ||
334 | % (detail, cve, status) | ||
335 | ) | ||
243 | status_mapping = "Unpatched" | 336 | status_mapping = "Unpatched" |
337 | status_out["mapping"] = status_mapping | ||
338 | |||
339 | return status_out | ||
244 | 340 | ||
245 | return (status_mapping, detail, description) | 341 | def has_cve_product_match(detailed_status, products): |
342 | """ | ||
343 | Check product/vendor match between detailed_status from decode_cve_status and a string of | ||
344 | products (like from CVE_PRODUCT) | ||
345 | """ | ||
346 | for product in products.split(): | ||
347 | vendor = "*" | ||
348 | if ":" in product: | ||
349 | vendor, product = product.split(":", 1) | ||
350 | |||
351 | if (vendor == detailed_status["vendor"] or detailed_status["vendor"] == "*") and \ | ||
352 | (product == detailed_status["product"] or detailed_status["product"] == "*"): | ||
353 | return True | ||
354 | |||
355 | #if no match, return False | ||
356 | return False | ||
357 | |||
358 | def extend_cve_status(d): | ||
359 | # do this only once in case multiple classes use this | ||
360 | if d.getVar("CVE_STATUS_EXTENDED"): | ||
361 | return | ||
362 | d.setVar("CVE_STATUS_EXTENDED", "1") | ||
363 | |||
364 | # Fallback all CVEs from CVE_CHECK_IGNORE to CVE_STATUS | ||
365 | cve_check_ignore = d.getVar("CVE_CHECK_IGNORE") | ||
366 | if cve_check_ignore: | ||
367 | bb.warn("CVE_CHECK_IGNORE is deprecated in favor of CVE_STATUS") | ||
368 | for cve in (d.getVar("CVE_CHECK_IGNORE") or "").split(): | ||
369 | d.setVarFlag("CVE_STATUS", cve, "ignored") | ||
370 | |||
371 | # Process CVE_STATUS_GROUPS to set multiple statuses and optional detail or description at once | ||
372 | for cve_status_group in (d.getVar("CVE_STATUS_GROUPS") or "").split(): | ||
373 | cve_group = d.getVar(cve_status_group) | ||
374 | if cve_group is not None: | ||
375 | for cve in cve_group.split(): | ||
376 | d.setVarFlag("CVE_STATUS", cve, d.getVarFlag(cve_status_group, "status")) | ||
377 | else: | ||
378 | bb.warn("CVE_STATUS_GROUPS contains undefined variable %s" % cve_status_group) | ||