Diffstat (limited to 'meta/lib/oe/sbom30.py')
-rw-r--r--  meta/lib/oe/sbom30.py  | 1096
1 file changed, 1096 insertions, 0 deletions
diff --git a/meta/lib/oe/sbom30.py b/meta/lib/oe/sbom30.py
new file mode 100644
index 0000000000..227ac51877
--- /dev/null
+++ b/meta/lib/oe/sbom30.py
@@ -0,0 +1,1096 @@
#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

from pathlib import Path

import oe.spdx30
import bb
import re
import hashlib
import uuid
import os
import oe.spdx_common
from datetime import datetime, timezone

OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"

VEX_VERSION = "1.0.0"

SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"

OE_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/by-doc-hash/"
OE_DOC_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/doc/"


@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
class OEIdAliasExtension(oe.spdx30.extension_Extension):
    """
    This extension allows an Element to provide an internal alias for the SPDX
    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
    created have a unique UUID namespace and the unihash of the task encoded in
    their SPDX ID. However, this causes a problem for referencing documents
    across recipes, since the taskhash of a dependency may not factor into the
    taskhash of the current task and thus the current task won't rebuild and
    see the new SPDX ID when the dependency changes (e.g. ABI safe recipes and
    tasks).

    To help work around this, this extension provides a non-unique alias for an
    Element by which it can be referenced from other tasks/recipes. When a
    final SBoM is created, references to these aliases will be replaced with
    the actual unique SPDX ID.

    Most Elements will automatically get an alias created when they are written
    out if they do not already have one. To suppress the creation of an alias,
    add an extension with a blank `alias` property.

    It is an internal extension that should be removed when writing out a
    final SBoM.
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "alias",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "alias",
            max_count=1,
        )

        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )

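# A hypothetical usage sketch (not part of the module; `element` is a
# placeholder): reading an alias back via get_alias() below, and suppressing
# alias creation by attaching an extension with a blank `alias` property:
#
#   ext = get_alias(element)
#   link_id = ext.alias if ext is not None and ext.alias else element._id
#
#   element.extension.append(OEIdAliasExtension())  # no alias will be created
#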
@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "aliases",
            oe.spdx30.ListProp(oe.spdx30.StringProp()),
            OE_SPDX_BASE + "filename-alias",
        )


@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
class OELicenseScannedExtension(oe.spdx30.extension_Extension):
    """
    The presence of this extension means the file has already been scanned for
    license information
    """

    CLOSED = True
    INTERNAL = True


@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
class OEDocumentExtension(oe.spdx30.extension_Extension):
    """
    This extension is added to a SpdxDocument to indicate various useful bits
    of information about its contents
    """

    CLOSED = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "is_native",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "is-native",
            max_count=1,
        )


def spdxid_hash(*items):
    h = hashlib.md5()
    for i in items:
        if isinstance(i, oe.spdx30.Element):
            h.update(i._id.encode("utf-8"))
        else:
            h.update(i.encode("utf-8"))
    return h.hexdigest()

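# Illustrative only: spdxid_hash() mixes Elements (via their _id) and plain
# strings into one stable digest, so the same inputs always yield the same
# SPDX ID suffix, e.g.
#
#   spdxid_hash("license", "3.24.0", "MIT")
#
# returns the identical hex string on every run.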

def spdx_sde(d):
    sde = d.getVar("SOURCE_DATE_EPOCH")
    if not sde:
        return datetime.now(timezone.utc)

    return datetime.fromtimestamp(int(sde), timezone.utc)

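# For example (a sketch, assuming a BitBake datastore `d`): with
# SOURCE_DATE_EPOCH="1700000000" set, spdx_sde(d) returns the fixed timestamp
# 2023-11-14 22:13:20 UTC, keeping generated documents reproducible; when the
# variable is unset, the current UTC time is used instead.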

def get_element_link_id(e):
    """
    Get the string ID which should be used to link to an Element. If the
    element has an alias, that will be preferred, otherwise its SPDX ID will be
    used.
    """
    ext = get_alias(e)
    if ext is not None and ext.alias:
        return ext.alias
    return e._id


def get_alias(obj):
    for ext in obj.extension:
        if not isinstance(ext, OEIdAliasExtension):
            continue
        return ext

    return None


def hash_id(_id):
    return hashlib.sha256(_id.encode("utf-8")).hexdigest()


def to_list(l):
    if isinstance(l, set):
        l = sorted(list(l))

    if not isinstance(l, (list, tuple)):
        raise TypeError("Must be a list or tuple. Got %s" % type(l))

    return l


class ObjectSet(oe.spdx30.SHACLObjectSet):
    def __init__(self, d):
        super().__init__()
        self.d = d
        self.alias_prefix = None

    def create_index(self):
        self.by_sha256_hash = {}
        super().create_index()

    def add_index(self, obj):
        # Check that all elements are given an ID before being inserted
        if isinstance(obj, oe.spdx30.Element):
            if not obj._id:
                raise ValueError("Element missing ID")

            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.obj_by_id[alias_ext.alias] = obj

            for v in obj.verifiedUsing:
                if not isinstance(v, oe.spdx30.Hash):
                    continue

                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
                    continue

                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)

        super().add_index(obj)
        if isinstance(obj, oe.spdx30.SpdxDocument):
            self.doc = obj
            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.alias_prefix = OE_ALIAS_PREFIX + hash_id(alias_ext.alias) + "/"

    def __filter_obj(self, obj, attr_filter):
        return all(getattr(obj, k) == v for k, v in attr_filter.items())

    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
        for obj in self.foreach_type(typ, match_subclass=match_subclass):
            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
        for obj in self.foreach_filter(
            typ, match_subclass=match_subclass, **attr_filter
        ):
            return obj
        return None

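    # Illustrative only (names are placeholders): find_filter() above matches
    # on attribute equality, e.g. locating a package element by name:
    #
    #   pkg = objset.find_filter(oe.spdx30.software_Package, name="busybox")
    #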
    def foreach_root(self, typ, **attr_filter):
        for obj in self.doc.rootElement:
            if not isinstance(obj, typ):
                continue

            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_root(self, typ, **attr_filter):
        for obj in self.foreach_root(typ, **attr_filter):
            return obj
        return None

    def add_root(self, obj):
        self.add(obj)
        self.doc.rootElement.append(obj)
        return obj

    def is_native(self):
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            if e.is_native is not None:
                return e.is_native

        return False

    def set_is_native(self, is_native):
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            e.is_native = is_native
            return

        if is_native:
            self.doc.extension.append(oe.sbom30.OEDocumentExtension(is_native=True))

    def add_aliases(self):
        for o in self.foreach_type(oe.spdx30.Element):
            self.set_element_alias(o)

    def new_alias_id(self, obj, replace):
        unihash = self.d.getVar("BB_UNIHASH")
        namespace = self.get_namespace()
        if unihash not in obj._id:
            bb.warn(f"Unihash {unihash} not found in {obj._id}")
            return None

        if namespace not in obj._id:
            bb.warn(f"Namespace {namespace} not found in {obj._id}")
            return None

        return obj._id.replace(unihash, "UNIHASH").replace(
            namespace, replace + self.d.getVar("PN")
        )

    def remove_internal_extensions(self):
        def remove(o):
            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]

        for o in self.foreach_type(oe.spdx30.Element):
            remove(o)

        if self.doc:
            remove(self.doc)

    def get_namespace(self):
        namespace_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
        )
        pn = self.d.getVar("PN")
        return "%s/%s-%s" % (
            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
            pn,
            str(uuid.uuid5(namespace_uuid, pn)),
        )

    def set_element_alias(self, e):
        if not e._id or e._id.startswith("_:"):
            return

        alias_ext = get_alias(e)
        if alias_ext is None:
            alias_id = self.new_alias_id(e, self.alias_prefix)
            if alias_id is not None:
                e.extension.append(OEIdAliasExtension(alias=alias_id))
        elif (
            alias_ext.alias
            and not isinstance(e, oe.spdx30.SpdxDocument)
            and not alias_ext.alias.startswith(self.alias_prefix)
        ):
            bb.warn(
                f"Element {e._id} has alias {alias_ext.alias}, but it should have prefix {self.alias_prefix}"
            )

    def new_spdxid(self, *suffix, include_unihash=True):
        items = [self.get_namespace()]
        if include_unihash:
            unihash = self.d.getVar("BB_UNIHASH")
            items.append(unihash)
        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
        return "/".join(items)

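    # Illustrative only: for PN="zlib" the generated IDs look roughly like
    #
    #   {SPDX_NAMESPACE_PREFIX}/zlib-{uuid5}/{BB_UNIHASH}/file/abc123
    #
    # i.e. the recipe namespace, then the task unihash, then the sanitized
    # suffix parts passed to new_spdxid().
    #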
    def new_import(self, key):
        base = f"SPDX_IMPORTS_{key}"
        spdxid = self.d.getVar(f"{base}_spdxid")
        if not spdxid:
            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")

        for i in self.doc.import_:
            if i.externalSpdxId == spdxid:
                # Already imported
                return spdxid

        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)

        uri = self.d.getVar(f"{base}_uri")
        if uri:
            m.locationHint = uri

        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{base}_hash_{pyname}")
            if value:
                m.verifiedUsing.append(
                    oe.spdx30.Hash(
                        algorithm=algorithm,
                        hashValue=value,
                    )
                )

        self.doc.import_.append(m)
        return spdxid

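    # The datastore variables consulted follow this naming sketch (the key
    # "mydoc" and all values are illustrative, not from the source):
    #
    #   SPDX_IMPORTS_mydoc_spdxid = "https://example.com/spdx/doc#SPDXRef-foo"
    #   SPDX_IMPORTS_mydoc_uri = "https://example.com/spdx/doc.spdx.json"
    #   SPDX_IMPORTS_mydoc_hash_sha256 = "<hex digest of the document>"
    #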
    def new_agent(self, varname, *, creation_info=None, add=True):
        ref_varname = self.d.getVar(f"{varname}_ref")
        if ref_varname:
            if ref_varname == varname:
                bb.fatal(f"{varname} cannot reference itself")
            return self.new_agent(ref_varname, creation_info=creation_info)

        import_key = self.d.getVar(f"{varname}_import")
        if import_key:
            return self.new_import(import_key)

        name = self.d.getVar(f"{varname}_name")
        if not name:
            return None

        spdxid = self.new_spdxid("agent", name)
        agent = self.find_by_id(spdxid)
        if agent is not None:
            return agent

        agent_type = self.d.getVar("%s_type" % varname)
        if agent_type == "person":
            agent = oe.spdx30.Person()
        elif agent_type == "software":
            agent = oe.spdx30.SoftwareAgent()
        elif agent_type == "organization":
            agent = oe.spdx30.Organization()
        elif not agent_type or agent_type == "agent":
            agent = oe.spdx30.Agent()
        else:
            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))

        agent._id = spdxid
        agent.creationInfo = creation_info or self.doc.creationInfo
        agent.name = name

        comment = self.d.getVar("%s_comment" % varname)
        if comment:
            agent.comment = comment

        for (
            pyname,
            idtype,
        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
            value = self.d.getVar("%s_id_%s" % (varname, pyname))
            if value:
                agent.externalIdentifier.append(
                    oe.spdx30.ExternalIdentifier(
                        externalIdentifierType=idtype,
                        identifier=value,
                    )
                )

        if add:
            self.add(agent)

        return agent

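    # A hypothetical agent definition (variable names follow the scheme read
    # above; the values are made up for illustration):
    #
    #   SPDX_AUTHORS = "acme"
    #   SPDX_AUTHORS_acme_name = "ACME Corp"
    #   SPDX_AUTHORS_acme_type = "organization"
    #   SPDX_AUTHORS_acme_id_email = "sbom@acme.example"
    #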
    def new_creation_info(self):
        creation_info = oe.spdx30.CreationInfo()

        name = "%s %s" % (
            self.d.getVar("SPDX_TOOL_NAME"),
            self.d.getVar("SPDX_TOOL_VERSION"),
        )
        tool = self.add(
            oe.spdx30.Tool(
                _id=self.new_spdxid("tool", name),
                creationInfo=creation_info,
                name=name,
            )
        )

        authors = []
        for a in self.d.getVar("SPDX_AUTHORS").split():
            varname = "SPDX_AUTHORS_%s" % a
            author = self.new_agent(varname, creation_info=creation_info)

            if not author:
                bb.fatal("Unable to find or create author %s" % a)

            authors.append(author)

        creation_info.created = spdx_sde(self.d)
        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
        creation_info.createdBy = authors
        creation_info.createdUsing = [tool]

        return creation_info

    def copy_creation_info(self, copy):
        c = oe.spdx30.CreationInfo(
            created=spdx_sde(self.d),
            specVersion=self.d.getVar("SPDX_VERSION"),
        )

        for author in copy.createdBy:
            if isinstance(author, str):
                c.createdBy.append(author)
            else:
                c.createdBy.append(author._id)

        for tool in copy.createdUsing:
            if isinstance(tool, str):
                c.createdUsing.append(tool)
            else:
                c.createdUsing.append(tool._id)

        return c

    def new_annotation(self, subject, comment, typ):
        return self.add(
            oe.spdx30.Annotation(
                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
                creationInfo=self.doc.creationInfo,
                annotationType=typ,
                subject=subject,
                statement=comment,
            )
        )

    def _new_relationship(
        self,
        cls,
        from_,
        typ,
        to,
        *,
        spdxid_name="relationship",
        **props,
    ):
        from_ = to_list(from_)
        to = to_list(to)

        if not from_:
            return []

        if not to:
            to = [oe.spdx30.IndividualElement.NoneElement]

        ret = []

        for f in from_:
            hash_args = [typ, f]
            for k in sorted(props.keys()):
                hash_args.append(props[k])
            hash_args.extend(to)

            relationship = self.add(
                cls(
                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
                    creationInfo=self.doc.creationInfo,
                    from_=f,
                    relationshipType=typ,
                    to=to,
                    **props,
                )
            )
            ret.append(relationship)

        return ret

    def new_relationship(self, from_, typ, to):
        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)

    def new_scoped_relationship(self, from_, typ, scope, to):
        return self._new_relationship(
            oe.spdx30.LifecycleScopedRelationship,
            from_,
            typ,
            to,
            scope=scope,
        )

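    # Illustrative only (`pkg` and `files` are placeholders): one Relationship
    # is created per `from_` element, each pointing at the full `to` list:
    #
    #   objset.new_relationship(
    #       [pkg],
    #       oe.spdx30.RelationshipType.contains,
    #       [get_element_link_id(f) for f in files],
    #   )
    #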
    def new_license_expression(
        self, license_expression, license_data, license_text_map={}
    ):
        license_list_version = license_data["licenseListVersion"]
        # SPDX 3 requires that the license list version be a semver
        # MAJOR.MINOR.MICRO, but the actual license version might be
        # MAJOR.MINOR on some older versions. As such, manually append a .0
        # micro version if it's missing to keep SPDX happy
        if license_list_version.count(".") < 2:
            license_list_version += ".0"

        spdxid = [
            "license",
            license_list_version,
            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
        ]

        license_text = [
            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
        ]

        if not license_text:
            lic = self.find_filter(
                oe.spdx30.simplelicensing_LicenseExpression,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
            if lic is not None:
                return lic
        else:
            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
            lic = self.find_by_id(self.new_spdxid(*spdxid))
            if lic is not None:
                return lic

        lic = self.add(
            oe.spdx30.simplelicensing_LicenseExpression(
                _id=self.new_spdxid(*spdxid),
                creationInfo=self.doc.creationInfo,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
        )

        for key, value in license_text:
            lic.simplelicensing_customIdToUri.append(
                oe.spdx30.DictionaryEntry(key=key, value=value)
            )

        return lic

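    # Illustrative only: `license_data` mirrors the SPDX license list JSON,
    # and `license_text_map` maps custom LicenseRef- names to their text URIs
    # (all values below are invented):
    #
    #   lic = objset.new_license_expression(
    #       "MIT AND LicenseRef-acme",
    #       {"licenseListVersion": "3.24"},
    #       {"LicenseRef-acme": "https://acme.example/license.txt"},
    #   )
    #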
    def scan_declared_licenses(self, spdx_file, filepath, license_data):
        for e in spdx_file.extension:
            if isinstance(e, OELicenseScannedExtension):
                return

        file_licenses = set()
        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
            lic = self.new_license_expression(extracted_lic, license_data)
            self.set_element_alias(lic)
            file_licenses.add(lic)

        self.new_relationship(
            [spdx_file],
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            [oe.sbom30.get_element_link_id(lic_alias) for lic_alias in file_licenses],
        )
        spdx_file.extension.append(OELicenseScannedExtension())

    def new_file(self, _id, name, path, *, purposes=[]):
        sha256_hash = bb.utils.sha256_file(path)

        for f in self.by_sha256_hash.get(sha256_hash, []):
            if not isinstance(f, oe.spdx30.software_File):
                continue

            if purposes:
                new_primary = purposes[0]
                new_additional = []

                if f.software_primaryPurpose:
                    new_additional.append(f.software_primaryPurpose)
                new_additional.extend(f.software_additionalPurpose)

                new_additional = sorted(
                    list(set(p for p in new_additional if p != new_primary))
                )

                f.software_primaryPurpose = new_primary
                f.software_additionalPurpose = new_additional

            if f.name != name:
                for e in f.extension:
                    if isinstance(e, OEFileNameAliasExtension):
                        e.aliases.append(name)
                        break
                else:
                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))

            return f

        spdx_file = oe.spdx30.software_File(
            _id=_id,
            creationInfo=self.doc.creationInfo,
            name=name,
        )
        if purposes:
            spdx_file.software_primaryPurpose = purposes[0]
            spdx_file.software_additionalPurpose = purposes[1:]

        spdx_file.verifiedUsing.append(
            oe.spdx30.Hash(
                algorithm=oe.spdx30.HashAlgorithm.sha256,
                hashValue=sha256_hash,
            )
        )

        return self.add(spdx_file)

    def new_cve_vuln(self, cve):
        v = oe.spdx30.security_Vulnerability()
        v._id = self.new_spdxid("vulnerability", cve)
        v.creationInfo = self.doc.creationInfo

        v.externalIdentifier.append(
            oe.spdx30.ExternalIdentifier(
                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
                identifier=cve,
                identifierLocator=[
                    f"https://cveawg.mitre.org/api/cve/{cve}",
                    f"https://www.cve.org/CVERecord?id={cve}",
                ],
            )
        )
        return self.add(v)

    def new_vex_patched_relationship(self, from_, to):
        return self._new_relationship(
            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.fixedIn,
            to,
            spdxid_name="vex-fixed",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_unpatched_relationship(self, from_, to):
        return self._new_relationship(
            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.affects,
            to,
            spdxid_name="vex-affected",
            security_vexVersion=VEX_VERSION,
            security_actionStatement="Mitigation action unknown",
        )

    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
        return self._new_relationship(
            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.doesNotAffect,
            to,
            spdxid_name="vex-not-affected",
            security_vexVersion=VEX_VERSION,
            security_impactStatement=impact_statement,
        )

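    # A hypothetical VEX sketch tying a CVE to a package (`pkg` is a
    # placeholder element; the CVE number is invented):
    #
    #   vuln = objset.new_cve_vuln("CVE-2024-12345")
    #   objset.new_vex_patched_relationship(
    #       [get_element_link_id(vuln)], [pkg]
    #   )
    #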
    def import_bitbake_build_objset(self):
        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
        bb_objset = load_jsonld(
            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
        )
        self.doc.import_.extend(bb_objset.doc.import_)
        self.update(bb_objset.objects)

        return bb_objset

    def import_bitbake_build(self):
        def find_bitbake_build(objset):
            return objset.find_filter(
                oe.spdx30.build_Build,
                build_buildType=SPDX_BUILD_TYPE,
            )

        build = find_bitbake_build(self)
        if build:
            return build

        bb_objset = self.import_bitbake_build_objset()
        build = find_bitbake_build(bb_objset)
        if build is None:
            bb.fatal(f"No build found in {self.d.getVar('DEPLOY_DIR_SPDX')}")

        return build

    def new_task_build(self, name, typ):
        current_task = self.d.getVar("BB_CURRENTTASK")
        pn = self.d.getVar("PN")

        build = self.add(
            oe.spdx30.build_Build(
                _id=self.new_spdxid("build", name),
                creationInfo=self.doc.creationInfo,
                name=f"{pn}:do_{current_task}:{name}",
                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
            )
        )

        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
            bitbake_build = self.import_bitbake_build()

            self.new_relationship(
                [bitbake_build],
                oe.spdx30.RelationshipType.ancestorOf,
                [build],
            )

        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
            for varname in sorted(self.d.keys()):
                if varname.startswith("__"):
                    continue

                value = self.d.getVar(varname, expand=False)

                # TODO: Deal with non-string values
                if not isinstance(value, str):
                    continue

                build.build_parameter.append(
                    oe.spdx30.DictionaryEntry(key=varname, value=value)
                )

        return build

    def new_archive(self, archive_name):
        return self.add(
            oe.spdx30.software_File(
                _id=self.new_spdxid("archive", str(archive_name)),
                creationInfo=self.doc.creationInfo,
                name=str(archive_name),
                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
            )
        )

    @classmethod
    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
        objset = cls(d)

        document = oe.spdx30.SpdxDocument(
            _id=objset.new_spdxid("document", name),
            name=name,
        )

        document.extension.append(
            OEIdAliasExtension(
                alias=objset.new_alias_id(
                    document,
                    OE_DOC_ALIAS_PREFIX + d.getVar("PN") + "/" + name + "/",
                ),
            )
        )
        objset.doc = document
        objset.add_index(document)

        if copy_from_bitbake_doc:
            bb_objset = objset.import_bitbake_build_objset()
            document.creationInfo = objset.copy_creation_info(
                bb_objset.doc.creationInfo
            )
        else:
            document.creationInfo = objset.new_creation_info()

        return objset

    def expand_collection(self, *, add_objectsets=[]):
        """
        Expands a collection to pull in all missing elements

        Returns the set of ids that could not be found to link into the document
        """
        missing_spdxids = set()
        imports = {e.externalSpdxId: e for e in self.doc.import_}

        def merge_doc(other):
            nonlocal imports

            for e in other.doc.import_:
                if e.externalSpdxId not in imports:
                    imports[e.externalSpdxId] = e

            self.objects |= other.objects

        for o in add_objectsets:
            merge_doc(o)

        needed_spdxids = self.link()
        provided_spdxids = set(self.obj_by_id.keys())

        while True:
            import_spdxids = set(imports.keys())
            searching_spdxids = (
                needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids
            )
            if not searching_spdxids:
                break

            spdxid = searching_spdxids.pop()
            bb.debug(
                1,
                f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}",
            )
            dep_objset, dep_path = find_by_spdxid(self.d, spdxid)

            if dep_objset:
                dep_provided = set(dep_objset.obj_by_id.keys())
                if spdxid not in dep_provided:
                    bb.fatal(f"{spdxid} not found in {dep_path}")
                provided_spdxids |= dep_provided
                needed_spdxids |= dep_objset.missing_ids
                merge_doc(dep_objset)
            else:
                missing_spdxids.add(spdxid)

        self.doc.import_ = sorted(imports.values(), key=lambda e: e.externalSpdxId)
        bb.debug(1, "Linking...")
        self.link()

        # Manually go through all of the simplelicensing_customIdToUri DictionaryEntry
        # items and resolve any aliases to actual objects.
        for lic in self.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
            for d in lic.simplelicensing_customIdToUri:
                if d.value.startswith(OE_ALIAS_PREFIX):
                    obj = self.find_by_id(d.value)
                    if obj is not None:
                        d.value = obj._id
                    else:
                        self.missing_ids.add(d.value)

        self.missing_ids -= set(imports.keys())
        return self.missing_ids


def load_jsonld(d, path, required=False):
    deserializer = oe.spdx30.JSONLDDeserializer()
    objset = ObjectSet(d)
    try:
        with path.open("rb") as f:
            deserializer.read(f, objset)
    except FileNotFoundError:
        if required:
            bb.fatal("No SPDX document named %s found" % path)
        return None

    if not objset.doc:
        bb.fatal("SPDX Document %s has no SPDXDocument element" % path)
        return None

    objset.objects.remove(objset.doc)
    return objset


def jsonld_arch_path(d, arch, subdir, name, deploydir=None):
    if deploydir is None:
        deploydir = Path(d.getVar("DEPLOY_DIR_SPDX"))
    return deploydir / arch / subdir / (name + ".spdx.json")


def jsonld_hash_path(h):
    return Path("by-spdxid-hash") / h[:2], h

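# Illustrative layout (assuming DEPLOY_DIR_SPDX=/deploy/spdx): a document for
# an SPDX ID whose hash is "ab12..." is linked under
#
#   /deploy/spdx/<arch>/by-spdxid-hash/ab/ab12....spdx.json
#
# so find_by_spdxid() below can locate the document from the hash alone.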

def load_jsonld_by_arch(d, arch, subdir, name, *, required=False):
    path = jsonld_arch_path(d, arch, subdir, name)
    objset = load_jsonld(d, path, required=required)
    if objset is not None:
        return (objset, path)
    return (None, None)


def find_jsonld(d, subdir, name, *, required=False):
    package_archs = d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split()
    package_archs.reverse()

    for arch in package_archs:
        objset, path = load_jsonld_by_arch(d, arch, subdir, name)
        if objset is not None:
            return (objset, path)

    if required:
        bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name))

    return (None, None)


def write_jsonld_doc(d, objset, dest):
    if not isinstance(objset, ObjectSet):
        bb.fatal("Only an ObjectSet can be serialized")
        return

    if not objset.doc:
        bb.fatal("ObjectSet is missing a SpdxDocument")
        return

    objset.doc.rootElement = sorted(list(set(objset.doc.rootElement)))
    objset.doc.profileConformance = sorted(
        list(
            getattr(oe.spdx30.ProfileIdentifierType, p)
            for p in d.getVar("SPDX_PROFILES").split()
        )
    )

    dest.parent.mkdir(exist_ok=True, parents=True)

    if d.getVar("SPDX_PRETTY") == "1":
        serializer = oe.spdx30.JSONLDSerializer(
            indent=2,
        )
    else:
        serializer = oe.spdx30.JSONLDInlineSerializer()

    objset.objects.add(objset.doc)
    with dest.open("wb") as f:
        serializer.write(objset, f, force_at_graph=True)
    objset.objects.remove(objset.doc)


def write_recipe_jsonld_doc(
    d,
    objset,
    subdir,
    deploydir,
    *,
    create_spdx_id_links=True,
):
    pkg_arch = d.getVar("SSTATE_PKGARCH")

    dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir)

    def link_id(_id):
        hash_path = jsonld_hash_path(hash_id(_id))

        link_name = jsonld_arch_path(
            d,
            pkg_arch,
            *hash_path,
            deploydir=deploydir,
        )
        try:
            link_name.parent.mkdir(exist_ok=True, parents=True)
            link_name.symlink_to(os.path.relpath(dest, link_name.parent))
        except:
            target = link_name.readlink()
            bb.warn(
                f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}"
            )
            raise

        return hash_path[-1]

    objset.add_aliases()

    try:
        if create_spdx_id_links:
            alias_ext = get_alias(objset.doc)
            if alias_ext is not None and alias_ext.alias:
                alias_ext.link_name = link_id(alias_ext.alias)

    finally:
        # It is really helpful for debugging if the JSON document is written
        # out, so always do that even if there is an error making the links
        write_jsonld_doc(d, objset, dest)


def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter):
    objset, fn = find_jsonld(d, subdir, fn_name, required=True)

    spdx_obj = objset.find_root(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No root %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter):
    objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True)

    spdx_obj = objset.find_filter(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset


def find_by_spdxid(d, spdxid, *, required=False):
    if spdxid.startswith(OE_ALIAS_PREFIX):
        h = spdxid[len(OE_ALIAS_PREFIX) :].split("/", 1)[0]
        return find_jsonld(d, *jsonld_hash_path(h), required=required)
    return find_jsonld(d, *jsonld_hash_path(hash_id(spdxid)), required=required)


def create_sbom(d, name, root_elements, add_objectsets=[]):
    objset = ObjectSet.new_objset(d, name)

    sbom = objset.add(
        oe.spdx30.software_Sbom(
            _id=objset.new_spdxid("sbom", name),
            name=name,
            creationInfo=objset.doc.creationInfo,
            software_sbomType=[oe.spdx30.software_SbomType.build],
            rootElement=root_elements,
        )
    )

    missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets)
    if missing_spdxids:
        bb.warn(
            "The following SPDX IDs were unable to be resolved:\n "
            + "\n ".join(sorted(list(missing_spdxids)))
        )

    # Filter out internal extensions from final SBoMs
    objset.remove_internal_extensions()

    # SBoM should be the only root element of the document
    objset.doc.rootElement = [sbom]

    # De-duplicate licenses
    unique = set()
    dedup = {}
    for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
        for u in unique:
            if (
                u.simplelicensing_licenseExpression
                == lic.simplelicensing_licenseExpression
                and u.simplelicensing_licenseListVersion
                == lic.simplelicensing_licenseListVersion
            ):
                dedup[lic] = u
                break
        else:
            unique.add(lic)

    if dedup:
        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasDeclaredLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for rel in objset.foreach_filter(
            oe.spdx30.Relationship,
            relationshipType=oe.spdx30.RelationshipType.hasConcludedLicense,
        ):
            rel.to = [dedup.get(to, to) for to in rel.to]

        for k, v in dedup.items():
            bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}")
            objset.objects.remove(k)

        objset.create_index()

    return objset, sbom
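
# A hypothetical end-to-end sketch (datastore `d` and the root package element
# `pkg` are placeholders; expand_collection() expects any referenced dependency
# documents to already exist under DEPLOY_DIR_SPDX):
#
#   objset, sbom = create_sbom(d, "acme-image", [get_element_link_id(pkg)])
#   write_jsonld_doc(d, objset, Path("/tmp/acme-image.spdx.json"))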