summaryrefslogtreecommitdiffstats
path: root/scripts/cvert.py
diff options
context:
space:
mode:
authorAndrii Bordunov via Openembedded-core <openembedded-core@lists.openembedded.org>2018-10-10 19:25:09 +0300
committerArmin Kuster <akuster808@gmail.com>2019-05-26 21:58:33 -0700
commitfbc9b4607569520c92baf1352041c813606e8524 (patch)
tree29f2cdf5ecc2e7bfdef32d2d7723516ce5ce2d06 /scripts/cvert.py
parentde00a8fd41b576d6b2afc4d457ed6f3f6eeb273a (diff)
downloadmeta-security-fbc9b4607569520c92baf1352041c813606e8524.tar.gz
cve-report: add scripts to generate CVE reports
cvert-foss - generate CVE report for the list of packages. Analyze the whole image manifest to align with the complex CPE configurations. cvert-update - update NVD feeds and store CVE structues dump. CVE dump is a pickled representation of the cve_struct dictionary. cvert.py - python library used by cvert-* scripts. NVD JSON Vulnerability Feeds https://nvd.nist.gov/vuln/data-feeds#JSON_FEED Usage examples: o Download CVE feeds to "nvdfeed" directory % cvert-update nvdfeed o Update CVE feeds and store a dump in a file % cvert-update --store cvedump nvdfeed o Generate a CVE report % cvert-foss --feed-dir nvdfeed --output report-foss.txt cve-manifest o (faster) Use dump file to generate a CVE report % cvert-foss --restore cvedump --output report-foss.txt cve-manifest o Generate a full report % cvert-foss --restore cvedump --show-description --show-reference \ --output report-foss-full.txt cve-manifest Manifest example: bash,4.2,CVE-2014-7187 python,2.7.35, python,3.5.5,CVE-2017-17522 CVE-2018-1061 Report example: patched | 7.5 | CVE-2018-1061 | python | 3.5.5 patched | 10.0 | CVE-2014-7187 | bash | 4.2 patched | 8.8 | CVE-2017-17522 | python | 3.5.5 unpatched | 10.0 | CVE-2014-6271 | bash | 4.2 unpatched | 10.0 | CVE-2014-6277 | bash | 4.2 unpatched | 10.0 | CVE-2014-6278 | bash | 4.2 unpatched | 10.0 | CVE-2014-7169 | bash | 4.2 unpatched | 10.0 | CVE-2014-7186 | bash | 4.2 unpatched | 4.6 | CVE-2012-3410 | bash | 4.2 unpatched | 8.4 | CVE-2016-7543 | bash | 4.2 unpatched | 5.0 | CVE-2010-3492 | python | 2.7.35 unpatched | 5.3 | CVE-2016-1494 | python | 2.7.35 unpatched | 6.5 | CVE-2017-18207 | python | 3.5.5 unpatched | 6.5 | CVE-2017-18207 | python | 2.7.35 unpatched | 7.1 | CVE-2013-7338 | python | 2.7.35 unpatched | 7.5 | CVE-2018-1060 | python | 3.5.5 unpatched | 8.8 | CVE-2017-17522 | python | 2.7.35 Signed-off-by: grygorii tertychnyi <gtertych@cisco.com> Signed-off-by: Armin Kuster <akuster808@gmail.com>
Diffstat (limited to 'scripts/cvert.py')
-rw-r--r--scripts/cvert.py473
1 files changed, 473 insertions, 0 deletions
diff --git a/scripts/cvert.py b/scripts/cvert.py
new file mode 100644
index 0000000..f93b95c
--- /dev/null
+++ b/scripts/cvert.py
@@ -0,0 +1,473 @@
1#!/usr/bin/env python3
2#
3# Copyright (c) 2018 by Cisco Systems, Inc.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License version 2 as
7# published by the Free Software Foundation.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License along
15# with this program; if not, write to the Free Software Foundation, Inc.,
16# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17#
18
19""" CVERT library: set of functions for CVE reports
20"""
21
22
23import os
24import re
25import sys
26import json
27import gzip
28import pickle
29import logging
30import hashlib
31import datetime
32import textwrap
33import urllib.request
34import distutils.version
35
36
37logging.getLogger(__name__).addHandler(logging.NullHandler())
38
39
40def generate_report(manifest, cve_struct):
41 """Generate CVE report"""
42
43 report = []
44
45 for cve in cve_struct:
46 affected = set()
47
48 for conf in cve_struct[cve]["nodes"]:
49 affected = affected.union(process_configuration(manifest, conf))
50
51 for key in affected:
52 product, version = key.split(",")
53 patched = manifest[product][version]
54
55 if cve in patched:
56 cve_item = {"status": "patched"}
57 else:
58 cve_item = {"status": "unpatched"}
59
60 cve_item["CVSS"] = "{0:.1f}".format(cve_struct[cve]["score"])
61 cve_item["CVE"] = cve
62 cve_item["product"] = product
63 cve_item["version"] = version
64 cve_item["description"] = cve_struct[cve]["description"]
65 cve_item["reference"] = [x["url"] for x in cve_struct[cve]["reference"]]
66
67 logging.debug("%9s %s %s,%s",
68 cve_item["status"], cve_item["CVE"],
69 cve_item["product"], cve_item["version"])
70
71 report.append(cve_item)
72
73 return sorted(report, key=lambda x: (x["status"], x["product"], x["CVSS"], x["CVE"]))
74
75
76def process_configuration(manifest, conf):
77 """Recursive call to process all CVE configurations"""
78
79 operator = conf["operator"]
80
81 if operator not in ["OR", "AND"]:
82 raise ValueError("operator {} is not supported".format(operator))
83
84 operator = True if operator == "AND" else False
85 match = False
86 affected = set()
87
88 if "cpe" in conf:
89 match = process_cpe(manifest, conf["cpe"][0], affected)
90
91 for cpe in conf["cpe"][1:]:
92 package_match = process_cpe(manifest, cpe, affected)
93
94 # match = match <operator> package_match
95 match = operator ^ ((operator ^ match) or (operator ^ package_match))
96 elif "children" in conf:
97 product_set = process_configuration(manifest, conf["children"][0])
98
99 if product_set:
100 match = True
101 affected = affected.union(product_set)
102
103 for child in conf["children"][1:]:
104 product_set = process_configuration(manifest, child)
105 package_match = True if product_set else False
106
107 # match = match OP package_match
108 match = operator ^ ((operator ^ match) or (operator ^ package_match))
109
110 if package_match:
111 affected = affected.union(product_set)
112
113 if match:
114 return affected
115
116 return ()
117
118
119def process_cpe(manifest, cpe, affected):
120 """Match CPE with all manifest packages"""
121
122 if not cpe["vulnerable"]:
123 # ignore non vulnerable part
124 return False
125
126 version_range = {}
127
128 for flag in ["versionStartIncluding",
129 "versionStartExcluding",
130 "versionEndIncluding",
131 "versionEndExcluding"]:
132 if flag in cpe:
133 version_range[flag] = cpe[flag]
134
135 # take only "product" and "version"
136 product, version = cpe["cpe23Uri"].split(":")[4:6]
137
138 if product not in manifest:
139 return False
140
141 if not version_range:
142 if version == "*":
143 # ignore CVEs that touches all versions of package,
144 # can not fix it anyway
145 logging.debug('ignore "*" in %s', cpe["cpe23Uri"])
146 return False
147 elif version == "-":
148 # "-" means NA
149 #
150 # NA (i.e. "not applicable/not used"). The logical value NA
151 # SHOULD be assigned when there is no legal or meaningful
152 # value for that attribute, or when that attribute is not
153 # used as part of the description.
154 # This includes the situation in which an attribute has
155 # an obtainable value that is null
156 #
157 # Ignores CVEs if version is not set
158 logging.debug('ignore "-" in %s', cpe["cpe23Uri"])
159 return False
160 else:
161 version_range["versionExactMatch"] = version
162
163 result = False
164
165 for version in manifest[product]:
166 try:
167 if match_version(version,
168 version_range):
169 logging.debug("match %s %s: %s", product, version, cpe["cpe23Uri"])
170 affected.add("{},{}".format(product, version))
171
172 result = True
173 except TypeError:
174 # version comparison is a very tricky
175 # sometimes provider changes product version in a strange manner
176 # and the above comparison just failed
177 # so here we try to make version string "more standard"
178
179 if match_version(twik_version(version),
180 [twik_version(v) for v in version_range]):
181 logging.debug("match %s %s (twiked): %s", product, twik_version(version),
182 cpe["cpe23Uri"])
183 affected.add("{},{}".format(product, version))
184
185 result = True
186
187 return result
188
189
190def match_version(version, vrange):
191 """Match version with the version range"""
192
193 result = False
194 version = util_version(version)
195
196 if "versionExactMatch" in vrange:
197 if version == util_version(vrange["versionExactMatch"]):
198 result = True
199 else:
200 result = True
201
202 if "versionStartIncluding" in vrange:
203 result = result and version >= util_version(vrange["versionStartIncluding"])
204
205 if "versionStartExcluding" in vrange:
206 result = result and version > util_version(vrange["versionStartExcluding"])
207
208 if "versionEndIncluding" in vrange:
209 result = result and version <= util_version(vrange["versionEndIncluding"])
210
211 if "versionEndExcluding" in vrange:
212 result = result and version < util_version(vrange["versionEndExcluding"])
213
214 return result
215
216
217def util_version(version):
218 """Simplify package version"""
219 return distutils.version.LooseVersion(version.split("+git")[0])
220
221
222def twik_version(version):
223 """Return "standard" version for complex cases"""
224 return "v1" + re.sub(r"^[a-zA-Z]+", "", version)
225
226
227def print_report(report, width=70, show_description=False, show_reference=False, output=sys.stdout):
228 """Print out final report"""
229
230 for cve in report:
231 print("{0:>9s} | {1:>4s} | {2:18s} | {3} | {4}".format(cve["status"], cve["CVSS"],
232 cve["CVE"], cve["product"],
233 cve["version"]),
234 file=output)
235
236 if show_description:
237 print("{0:>9s} + {1}".format(" ", "Description"), file=output)
238
239 for lin in textwrap.wrap(cve["description"], width=width):
240 print("{0:>9s} {1}".format(" ", lin), file=output)
241
242 if show_reference:
243 print("{0:>9s} + {1}".format(" ", "Reference"), file=output)
244
245 for url in cve["reference"]:
246 print("{0:>9s} {1}".format(" ", url), file=output)
247
248
249def update_feeds(feed_dir, offline=False, start=2002):
250 """Update all JSON feeds"""
251
252 feed_dir = os.path.realpath(feed_dir)
253 year_now = datetime.datetime.now().year
254 cve_struct = {}
255
256 for year in range(start, year_now + 1):
257 update_year(cve_struct, year, feed_dir, offline)
258
259 return cve_struct
260
261
262def update_year(cve_struct, year, feed_dir, offline):
263 """Update one JSON feed for the particular year"""
264
265 url_prefix = "https://static.nvd.nist.gov/feeds/json/cve/1.0"
266 file_prefix = "nvdcve-1.0-{0}".format(year)
267
268 meta = {
269 "url": "{0}/{1}.meta".format(url_prefix, file_prefix),
270 "file": os.path.join(feed_dir, "{0}.meta".format(file_prefix))
271 }
272
273 feed = {
274 "url": "{0}/{1}.json.gz".format(url_prefix, file_prefix),
275 "file": os.path.join(feed_dir, "{0}.json.gz".format(file_prefix))
276 }
277
278 ctx = {}
279
280 if not offline:
281 ctx = download_feed(meta, feed)
282
283 if not "meta" in ctx or not "feed" in ctx:
284 return
285
286 if not os.path.isfile(meta["file"]):
287 return
288
289 if not os.path.isfile(feed["file"]):
290 return
291
292 if not "meta" in ctx:
293 ctx["meta"] = ctx_meta(meta["file"])
294
295 if not "sha256" in ctx["meta"]:
296 return
297
298 if not "feed" in ctx:
299 ctx["feed"] = ctx_gzip(feed["file"], ctx["meta"]["sha256"])
300
301 if not ctx["feed"]:
302 return
303
304 logging.debug("parsing year %s", year)
305
306 for cve_item in ctx["feed"]["CVE_Items"]:
307 iden, cve = parse_item(cve_item)
308
309 if not iden:
310 continue
311
312 if not cve:
313 logging.error("%s parse error", iden)
314 break
315
316 if iden in cve_struct:
317 logging.error("%s duplicated", iden)
318 break
319
320 cve_struct[iden] = cve
321
322 logging.debug("cve records: %d", len(cve_struct))
323
324
325def ctx_meta(filename):
326 """Parse feed meta file"""
327
328 if not os.path.isfile(filename):
329 return {}
330
331 ctx = {}
332
333 with open(filename) as fil:
334 for lin in fil:
335 pair = lin.split(":", maxsplit=1)
336 ctx[pair[0]] = pair[1].rstrip()
337
338 return ctx
339
340
341def ctx_gzip(filename, checksum=""):
342 """Parse feed archive file"""
343
344 if not os.path.isfile(filename):
345 return {}
346
347 with gzip.open(filename) as fil:
348 try:
349 ctx = fil.read()
350 except (EOFError, OSError):
351 logging.error("failed to process gz archive %s", filename, exc_info=True)
352 return {}
353
354 if checksum and checksum.upper() != hashlib.sha256(ctx).hexdigest().upper():
355 return {}
356
357 return json.loads(ctx.decode())
358
359
360def parse_item(cve_item):
361 """Parse one JSON CVE entry"""
362
363 cve_id = cve_item["cve"]["CVE_data_meta"]["ID"][:]
364 impact = cve_item["impact"]
365
366 if not impact:
367 # REJECTed CVE
368 return None, None
369
370 if "baseMetricV3" in impact:
371 score = impact["baseMetricV3"]["cvssV3"]["baseScore"]
372 elif "baseMetricV2" in impact:
373 score = impact["baseMetricV2"]["cvssV2"]["baseScore"]
374 else:
375 return cve_id, None
376
377 return cve_id, {
378 "score": score,
379 "nodes": cve_item["configurations"]["nodes"][:],
380 "reference": cve_item["cve"]["references"]["reference_data"][:],
381 "description": cve_item["cve"]["description"]["description_data"][0]["value"]
382 }
383
384
385def download_feed(meta, feed):
386 """Download and parse feed"""
387
388 ctx = {}
389
390 if not retrieve_url(meta["url"], meta["file"]):
391 return {}
392
393 ctx["meta"] = ctx_meta(meta["file"])
394
395 if not "sha256" in ctx["meta"]:
396 return {}
397
398 ctx["feed"] = ctx_gzip(feed["file"], ctx["meta"]["sha256"])
399
400 if not ctx["feed"]:
401 if not retrieve_url(feed["url"], feed["file"]):
402 return {}
403
404 ctx["feed"] = ctx_gzip(feed["file"], ctx["meta"]["sha256"])
405
406 return ctx
407
408
409def retrieve_url(url, filename=None):
410 """Download file by URL"""
411
412 if filename:
413 os.makedirs(os.path.dirname(filename), exist_ok=True)
414
415 logging.debug("downloading %s", url)
416
417 try:
418 urllib.request.urlretrieve(url, filename=filename)
419 except urllib.error.HTTPError:
420 logging.error("failed to download URL %s", url, exc_info=True)
421 return False
422
423 return True
424
425
426def logconfig(debug_flag=False):
427 """Return default log config"""
428
429 return {
430 "version": 1,
431 "formatters": {
432 "f": {
433 "format": "# %(asctime)s %% CVERT %% %(levelname)-8s %% %(message)s"
434 }
435 },
436 "handlers": {
437 "h": {
438 "class": "logging.StreamHandler",
439 "formatter": "f",
440 "level": logging.DEBUG if debug_flag else logging.INFO
441 }
442 },
443 "root": {
444 "handlers": ["h"],
445 "level": logging.DEBUG if debug_flag else logging.INFO
446 },
447 }
448
449
450def save_cve(filename, cve_struct):
451 """Save CVE structure in the file"""
452
453 filename = os.path.realpath(filename)
454
455 logging.debug("saving %d CVE records to %s", len(cve_struct), filename)
456
457 with open(filename, "wb") as fil:
458 pickle.dump(cve_struct, fil)
459
460
461def load_cve(filename):
462 """Load CVE structure from the file"""
463
464 filename = os.path.realpath(filename)
465
466 logging.debug("loading from %s", filename)
467
468 with open(filename, "rb") as fil:
469 cve_struct = pickle.load(fil)
470
471 logging.debug("cve records: %d", len(cve_struct))
472
473 return cve_struct