summaryrefslogtreecommitdiffstats
path: root/meta/lib/oe/sbom30.py
diff options
context:
space:
mode:
Diffstat (limited to 'meta/lib/oe/sbom30.py')
-rw-r--r--meta/lib/oe/sbom30.py1096
1 files changed, 1096 insertions, 0 deletions
diff --git a/meta/lib/oe/sbom30.py b/meta/lib/oe/sbom30.py
new file mode 100644
index 0000000000..227ac51877
--- /dev/null
+++ b/meta/lib/oe/sbom30.py
@@ -0,0 +1,1096 @@
1#
2# Copyright OpenEmbedded Contributors
3#
4# SPDX-License-Identifier: GPL-2.0-only
5#
6
7from pathlib import Path
8
9import oe.spdx30
10import bb
11import re
12import hashlib
13import uuid
14import os
15import oe.spdx_common
16from datetime import datetime, timezone
17
18OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"
19
20VEX_VERSION = "1.0.0"
21
22SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"
23
24OE_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/by-doc-hash/"
25OE_DOC_ALIAS_PREFIX = "http://spdxdocs.org/openembedded-alias/doc/"
26
27
@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
class OEIdAliasExtension(oe.spdx30.extension_Extension):
    """
    This extension allows an Element to provide an internal alias for the SPDX
    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
    created have a unique UUID namespace and the unihash of the task encoded in
    their SPDX ID. However, this causes a problem for referencing documents
    across recipes, since the taskhash of a dependency may not factor into the
    taskhash of the current task and thus the current task won't rebuild and
    see the new SPDX ID when the dependency changes (e.g. ABI safe recipes and
    tasks).

    To help work around this, this extension provides a non-unique alias for an
    Element by which it can be referenced from other tasks/recipes. When a
    final SBoM is created, references to these aliases will be replaced with
    the actual unique SPDX ID.

    Most Elements will automatically get an alias created when they are written
    out if they do not already have one. To suppress the creation of an alias,
    add an extension with a blank `alias` property.

    It is an internal extension that should be removed when writing out a
    final SBoM.
    """

    # CLOSED presumably restricts the extension to the properties registered
    # below (per the oe.spdx30 property model) — confirm against oe.spdx30
    CLOSED = True
    # INTERNAL extensions are stripped by ObjectSet.remove_internal_extensions()
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        # Register `alias` (the stable cross-recipe ID) and `link_name`
        # (the by-spdxid-hash link name) as optional string properties
        super()._register_props()
        cls._add_property(
            "alias",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "alias",
            max_count=1,
        )

        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )
74
@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
    """
    Internal extension that records alternate file names for a software_File
    element that was de-duplicated by content hash (see ObjectSet.new_file,
    which appends to `aliases` when a file with the same sha256 but a
    different name is added).
    """

    CLOSED = True
    # Stripped from final SBoMs by ObjectSet.remove_internal_extensions()
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        # `aliases` is a list of alternate names for the same file content
        super()._register_props()
        cls._add_property(
            "aliases",
            oe.spdx30.ListProp(oe.spdx30.StringProp()),
            OE_SPDX_BASE + "filename-alias",
        )
89
@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
class OELicenseScannedExtension(oe.spdx30.extension_Extension):
    """
    The presence of this extension means the file has already been scanned for
    license information (attached by ObjectSet.scan_declared_licenses, which
    skips files carrying it).
    """

    CLOSED = True
    # Stripped from final SBoMs by ObjectSet.remove_internal_extensions()
    INTERNAL = True
100
@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
class OEDocumentExtension(oe.spdx30.extension_Extension):
    """
    This extension is added to a SpdxDocument to indicate various useful bits
    of information about its contents
    """

    # Not marked INTERNAL, so it survives remove_internal_extensions()
    CLOSED = True

    @classmethod
    def _register_props(cls):
        # `is_native` flags documents produced for native recipes; consumed
        # by ObjectSet.is_native()/set_is_native()
        super()._register_props()
        cls._add_property(
            "is_native",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "is-native",
            max_count=1,
        )
120
def spdxid_hash(*items):
    """Return a stable MD5 hex digest over the given items.

    Element arguments contribute their SPDX ID; string arguments contribute
    their text. Used to build deterministic SPDX ID suffixes.
    """
    digest = hashlib.md5()
    for item in items:
        text = item._id if isinstance(item, oe.spdx30.Element) else item
        digest.update(text.encode("utf-8"))
    return digest.hexdigest()
129
130
def spdx_sde(d):
    """Return the SPDX creation timestamp as a timezone-aware UTC datetime.

    Honors SOURCE_DATE_EPOCH (for reproducible builds) when set; otherwise
    falls back to the current time.
    """
    sde = d.getVar("SOURCE_DATE_EPOCH")
    if sde:
        return datetime.fromtimestamp(int(sde), timezone.utc)
    return datetime.now(timezone.utc)
137
138
def get_element_link_id(e):
    """
    Get the string ID which should be used to link to an Element. If the
    element has an alias, that will be preferred, otherwise its SPDX ID will be
    used.
    """
    alias_ext = get_alias(e)
    if alias_ext is None or not alias_ext.alias:
        return e._id
    return alias_ext.alias
149
150
def get_alias(obj):
    """Return obj's OEIdAliasExtension, or None if it does not carry one."""
    return next(
        (ext for ext in obj.extension if isinstance(ext, OEIdAliasExtension)),
        None,
    )
158
159
def hash_id(_id):
    """Return the SHA-256 hex digest of the given SPDX ID string."""
    digest = hashlib.sha256()
    digest.update(_id.encode("utf-8"))
    return digest.hexdigest()
162
163
def to_list(l):
    """Normalize ``l`` to an ordered sequence.

    Sets are converted to a sorted list; lists and tuples pass through
    unchanged. Anything else raises TypeError.
    """
    if isinstance(l, set):
        return sorted(l)

    if isinstance(l, (list, tuple)):
        return l

    raise TypeError("Must be a list or tuple. Got %s" % type(l))
172
173
class ObjectSet(oe.spdx30.SHACLObjectSet):
    """A set of SPDX objects bound to a bitbake datastore ``d``.

    Extends the generated SHACLObjectSet with OpenEmbedded helpers for
    creating SPDX IDs and aliases, agents, relationships, licenses, files,
    vulnerability (VEX) records and build elements, and for expanding a
    document into a final SBoM.
    """

    def __init__(self, d):
        super().__init__()
        self.d = d
        # Prefix for generated element aliases; set in add_index() when a
        # SpdxDocument with an alias is indexed
        self.alias_prefix = None

    def create_index(self):
        """Rebuild the lookup indexes, including the sha256 -> objects map."""
        self.by_sha256_hash = {}
        super().create_index()

    def add_index(self, obj):
        """Index ``obj``, additionally by its alias and sha256 hashes."""
        # Check that all elements are given an ID before being inserted
        if isinstance(obj, oe.spdx30.Element):
            if not obj._id:
                raise ValueError("Element missing ID")

            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                # Make the element findable by its alias as well as its ID
                self.obj_by_id[alias_ext.alias] = obj

            for v in obj.verifiedUsing:
                if not isinstance(v, oe.spdx30.Hash):
                    continue

                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
                    continue

                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)

        super().add_index(obj)
        if isinstance(obj, oe.spdx30.SpdxDocument):
            self.doc = obj
            alias_ext = get_alias(obj)
            if alias_ext is not None and alias_ext.alias:
                self.alias_prefix = OE_ALIAS_PREFIX + hash_id(alias_ext.alias) + "/"

    def __filter_obj(self, obj, attr_filter):
        # True if every attribute in attr_filter matches on obj
        return all(getattr(obj, k) == v for k, v in attr_filter.items())

    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
        """Yield objects of type ``typ`` whose attributes match ``attr_filter``."""
        for obj in self.foreach_type(typ, match_subclass=match_subclass):
            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
        """Return the first object matching the filter, or None."""
        for obj in self.foreach_filter(
            typ, match_subclass=match_subclass, **attr_filter
        ):
            return obj
        return None

    def foreach_root(self, typ, **attr_filter):
        """Yield document root elements of type ``typ`` matching the filter."""
        for obj in self.doc.rootElement:
            if not isinstance(obj, typ):
                continue

            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_root(self, typ, **attr_filter):
        """Return the first matching root element, or None."""
        for obj in self.foreach_root(typ, **attr_filter):
            return obj
        return None

    def add_root(self, obj):
        """Add ``obj`` to the set and to the document's root elements."""
        self.add(obj)
        self.doc.rootElement.append(obj)
        return obj

    def is_native(self):
        """Return the document's `is_native` flag (False if unset)."""
        for e in self.doc.extension:
            if not isinstance(e, OEDocumentExtension):
                continue

            if e.is_native is not None:
                return e.is_native

        return False

    def set_is_native(self, is_native):
        """Record ``is_native`` on the document's OEDocumentExtension."""
        for e in self.doc.extension:
            if not isinstance(e, OEDocumentExtension):
                continue

            e.is_native = is_native
            return

        # Only bother creating the extension for the non-default value
        if is_native:
            self.doc.extension.append(OEDocumentExtension(is_native=True))

    def add_aliases(self):
        """Ensure every element in the set has an alias where possible."""
        for o in self.foreach_type(oe.spdx30.Element):
            self.set_element_alias(o)

    def new_alias_id(self, obj, replace):
        """Derive an alias ID for ``obj``.

        The task unihash in the object's ID is replaced with the literal
        "UNIHASH" and the recipe namespace with ``replace`` + PN, producing
        an ID that is stable across rebuilds. Returns None (with a warning)
        if the object's ID does not contain both components.
        """
        unihash = self.d.getVar("BB_UNIHASH")
        namespace = self.get_namespace()
        if unihash not in obj._id:
            bb.warn(f"Unihash {unihash} not found in {obj._id}")
            return None

        if namespace not in obj._id:
            bb.warn(f"Namespace {namespace} not found in {obj._id}")
            return None

        return obj._id.replace(unihash, "UNIHASH").replace(
            namespace, replace + self.d.getVar("PN")
        )

    def remove_internal_extensions(self):
        """Strip all INTERNAL extensions (done before writing final SBoMs)."""

        def remove(o):
            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]

        for o in self.foreach_type(oe.spdx30.Element):
            remove(o)

        if self.doc:
            remove(self.doc)

    def get_namespace(self):
        """Return the recipe-specific SPDX namespace URI."""
        namespace_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
        )
        pn = self.d.getVar("PN")
        return "%s/%s-%s" % (
            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
            pn,
            str(uuid.uuid5(namespace_uuid, pn)),
        )

    def set_element_alias(self, e):
        """Attach an OEIdAliasExtension to ``e`` if it has none.

        Warns when an existing alias does not carry the expected prefix.
        """
        # Blank nodes cannot be referenced externally, so need no alias
        if not e._id or e._id.startswith("_:"):
            return

        alias_ext = get_alias(e)
        if alias_ext is None:
            alias_id = self.new_alias_id(e, self.alias_prefix)
            if alias_id is not None:
                e.extension.append(OEIdAliasExtension(alias=alias_id))
        elif (
            alias_ext.alias
            and not isinstance(e, oe.spdx30.SpdxDocument)
            and not alias_ext.alias.startswith(self.alias_prefix)
        ):
            bb.warn(
                f"Element {e._id} has alias {alias_ext.alias}, but it should have prefix {self.alias_prefix}"
            )

    def new_spdxid(self, *suffix, include_unihash=True):
        """Construct a new SPDX ID under this recipe's namespace."""
        items = [self.get_namespace()]
        if include_unihash:
            unihash = self.d.getVar("BB_UNIHASH")
            items.append(unihash)
        # Sanitize suffix components so the result is a valid URI path
        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
        return "/".join(items)

    def new_import(self, key):
        """Add (or reuse) an ExternalMap import built from SPDX_IMPORTS_<key>_*.

        Returns the external SPDX ID of the import.
        """
        base = f"SPDX_IMPORTS_{key}"
        spdxid = self.d.getVar(f"{base}_spdxid")
        if not spdxid:
            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")

        for i in self.doc.import_:
            if i.externalSpdxId == spdxid:
                # Already imported
                return spdxid

        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)

        uri = self.d.getVar(f"{base}_uri")
        if uri:
            m.locationHint = uri

        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{base}_hash_{pyname}")
            if value:
                m.verifiedUsing.append(
                    oe.spdx30.Hash(
                        algorithm=algorithm,
                        hashValue=value,
                    )
                )

        self.doc.import_.append(m)
        return spdxid

    def new_agent(self, varname, *, creation_info=None, add=True):
        """Create (or find) an Agent described by <varname>_* variables.

        Supports indirection via <varname>_ref and external imports via
        <varname>_import. Returns None if no <varname>_name is set.
        """
        ref_varname = self.d.getVar(f"{varname}_ref")
        if ref_varname:
            if ref_varname == varname:
                bb.fatal(f"{varname} cannot reference itself")
            # NOTE(review): `add` is not propagated through the reference;
            # confirm whether referenced agents should honor add=False
            return self.new_agent(ref_varname, creation_info=creation_info)

        import_key = self.d.getVar(f"{varname}_import")
        if import_key:
            return self.new_import(import_key)

        name = self.d.getVar(f"{varname}_name")
        if not name:
            return None

        spdxid = self.new_spdxid("agent", name)
        agent = self.find_by_id(spdxid)
        if agent is not None:
            return agent

        agent_type = self.d.getVar("%s_type" % varname)
        if agent_type == "person":
            agent = oe.spdx30.Person()
        elif agent_type == "software":
            agent = oe.spdx30.SoftwareAgent()
        elif agent_type == "organization":
            agent = oe.spdx30.Organization()
        elif not agent_type or agent_type == "agent":
            agent = oe.spdx30.Agent()
        else:
            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))

        agent._id = spdxid
        agent.creationInfo = creation_info or self.doc.creationInfo
        agent.name = name

        comment = self.d.getVar("%s_comment" % varname)
        if comment:
            agent.comment = comment

        for (
            pyname,
            idtype,
        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
            value = self.d.getVar("%s_id_%s" % (varname, pyname))
            if value:
                agent.externalIdentifier.append(
                    oe.spdx30.ExternalIdentifier(
                        externalIdentifierType=idtype,
                        identifier=value,
                    )
                )

        if add:
            self.add(agent)

        return agent

    def new_creation_info(self):
        """Create a new CreationInfo from the configured tool and authors."""
        creation_info = oe.spdx30.CreationInfo()

        name = "%s %s" % (
            self.d.getVar("SPDX_TOOL_NAME"),
            self.d.getVar("SPDX_TOOL_VERSION"),
        )
        tool = self.add(
            oe.spdx30.Tool(
                _id=self.new_spdxid("tool", name),
                creationInfo=creation_info,
                name=name,
            )
        )

        authors = []
        for a in self.d.getVar("SPDX_AUTHORS").split():
            varname = "SPDX_AUTHORS_%s" % a
            author = self.new_agent(varname, creation_info=creation_info)

            if not author:
                bb.fatal("Unable to find or create author %s" % a)

            authors.append(author)

        creation_info.created = spdx_sde(self.d)
        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
        creation_info.createdBy = authors
        creation_info.createdUsing = [tool]

        return creation_info

    def copy_creation_info(self, copy):
        """Return a new CreationInfo referencing the agents/tools of ``copy``."""
        c = oe.spdx30.CreationInfo(
            created=spdx_sde(self.d),
            specVersion=self.d.getVar("SPDX_VERSION"),
        )

        # Keep references by ID so the copied info doesn't pull objects along
        for author in copy.createdBy:
            if isinstance(author, str):
                c.createdBy.append(author)
            else:
                c.createdBy.append(author._id)

        for tool in copy.createdUsing:
            if isinstance(tool, str):
                c.createdUsing.append(tool)
            else:
                c.createdUsing.append(tool._id)

        return c

    def new_annotation(self, subject, comment, typ):
        """Create an Annotation of type ``typ`` with ``comment`` on ``subject``."""
        return self.add(
            oe.spdx30.Annotation(
                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
                creationInfo=self.doc.creationInfo,
                annotationType=typ,
                subject=subject,
                statement=comment,
            )
        )

    def _new_relationship(
        self,
        cls,
        from_,
        typ,
        to,
        *,
        spdxid_name="relationship",
        **props,
    ):
        """Create one relationship of class ``cls`` per element in ``from_``.

        ``from_`` and ``to`` may be single-element containers, lists, tuples,
        or sets. An empty ``to`` is mapped to the NoneElement. Returns the
        list of created relationships (empty if ``from_`` is empty).
        """
        from_ = to_list(from_)
        to = to_list(to)

        if not from_:
            return []

        if not to:
            to = [oe.spdx30.IndividualElement.NoneElement]

        ret = []

        for f in from_:
            # Hash the full relationship content for a deterministic ID
            hash_args = [typ, f]
            for k in sorted(props.keys()):
                hash_args.append(props[k])
            hash_args.extend(to)

            relationship = self.add(
                cls(
                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
                    creationInfo=self.doc.creationInfo,
                    from_=f,
                    relationshipType=typ,
                    to=to,
                    **props,
                )
            )
            ret.append(relationship)

        return ret

    def new_relationship(self, from_, typ, to):
        """Create plain Relationship elements from ``from_`` to ``to``."""
        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)

    def new_scoped_relationship(self, from_, typ, scope, to):
        """Create LifecycleScopedRelationship elements with the given scope."""
        return self._new_relationship(
            oe.spdx30.LifecycleScopedRelationship,
            from_,
            typ,
            to,
            scope=scope,
        )

    def new_license_expression(
        self, license_expression, license_data, license_text_map=None
    ):
        """Create (or find) a LicenseExpression element.

        ``license_text_map`` maps custom license IDs to URIs/text; it is
        folded into the SPDX ID so differing texts yield distinct elements.
        """
        # Avoid the mutable-default-argument pitfall
        if license_text_map is None:
            license_text_map = {}

        license_list_version = license_data["licenseListVersion"]
        # SPDX 3 requires that the license list version be a semver
        # MAJOR.MINOR.MICRO, but the actual license version might be
        # MAJOR.MINOR on some older versions. As such, manually append a .0
        # micro version if its missing to keep SPDX happy
        if license_list_version.count(".") < 2:
            license_list_version += ".0"

        spdxid = [
            "license",
            license_list_version,
            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
        ]

        license_text = [
            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
        ]

        if not license_text:
            lic = self.find_filter(
                oe.spdx30.simplelicensing_LicenseExpression,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
            if lic is not None:
                return lic
        else:
            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
            lic = self.find_by_id(self.new_spdxid(*spdxid))
            if lic is not None:
                return lic

        lic = self.add(
            oe.spdx30.simplelicensing_LicenseExpression(
                _id=self.new_spdxid(*spdxid),
                creationInfo=self.doc.creationInfo,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
        )

        for key, value in license_text:
            lic.simplelicensing_customIdToUri.append(
                oe.spdx30.DictionaryEntry(key=key, value=value)
            )

        return lic

    def scan_declared_licenses(self, spdx_file, filepath, license_data):
        """Scan ``filepath`` for licenses and attach them to ``spdx_file``.

        Files already carrying an OELicenseScannedExtension are skipped.
        """
        for e in spdx_file.extension:
            if isinstance(e, OELicenseScannedExtension):
                return

        file_licenses = set()
        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
            lic = self.new_license_expression(extracted_lic, license_data)
            self.set_element_alias(lic)
            file_licenses.add(lic)

        self.new_relationship(
            [spdx_file],
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            # Reference by alias (when present) so final SBoMs can resolve
            [get_element_link_id(lic_alias) for lic_alias in file_licenses],
        )
        # Mark the file so repeated calls don't rescan it
        spdx_file.extension.append(OELicenseScannedExtension())

    def new_file(self, _id, name, path, *, purposes=None):
        """Create a software_File for ``path``, de-duplicating by sha256.

        If a file with the same content already exists, its purposes are
        merged and the new name recorded as an alias instead of creating a
        second element.
        """
        # Avoid the mutable-default-argument pitfall (list was never mutated,
        # but None is the safe idiom)
        purposes = purposes or []

        sha256_hash = bb.utils.sha256_file(path)

        for f in self.by_sha256_hash.get(sha256_hash, []):
            if not isinstance(f, oe.spdx30.software_File):
                continue

            if purposes:
                new_primary = purposes[0]
                new_additional = []

                # Demote any existing primary purpose to an additional one
                if f.software_primaryPurpose:
                    new_additional.append(f.software_primaryPurpose)
                new_additional.extend(f.software_additionalPurpose)

                new_additional = sorted(
                    list(set(p for p in new_additional if p != new_primary))
                )

                f.software_primaryPurpose = new_primary
                f.software_additionalPurpose = new_additional

            if f.name != name:
                for e in f.extension:
                    if isinstance(e, OEFileNameAliasExtension):
                        e.aliases.append(name)
                        break
                else:
                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))

            return f

        spdx_file = oe.spdx30.software_File(
            _id=_id,
            creationInfo=self.doc.creationInfo,
            name=name,
        )
        if purposes:
            spdx_file.software_primaryPurpose = purposes[0]
            spdx_file.software_additionalPurpose = purposes[1:]

        spdx_file.verifiedUsing.append(
            oe.spdx30.Hash(
                algorithm=oe.spdx30.HashAlgorithm.sha256,
                hashValue=sha256_hash,
            )
        )

        return self.add(spdx_file)

    def new_cve_vuln(self, cve):
        """Create a security_Vulnerability element for the given CVE ID."""
        v = oe.spdx30.security_Vulnerability()
        v._id = self.new_spdxid("vulnerability", cve)
        v.creationInfo = self.doc.creationInfo

        v.externalIdentifier.append(
            oe.spdx30.ExternalIdentifier(
                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
                identifier=cve,
                identifierLocator=[
                    f"https://cveawg.mitre.org/api/cve/{cve}",
                    f"https://www.cve.org/CVERecord?id={cve}",
                ],
            )
        )
        return self.add(v)

    def new_vex_patched_relationship(self, from_, to):
        """Create VEX 'fixedIn' relationships (vulnerability patched)."""
        return self._new_relationship(
            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.fixedIn,
            to,
            spdxid_name="vex-fixed",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_unpatched_relationship(self, from_, to):
        """Create VEX 'affects' relationships (vulnerability unpatched)."""
        return self._new_relationship(
            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.affects,
            to,
            spdxid_name="vex-affected",
            security_vexVersion=VEX_VERSION,
            security_actionStatement="Mitigation action unknown",
        )

    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
        """Create VEX 'doesNotAffect' relationships with an impact statement."""
        return self._new_relationship(
            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.doesNotAffect,
            to,
            spdxid_name="vex-not-affected",
            security_vexVersion=VEX_VERSION,
            security_impactStatement=impact_statement,
        )

    def import_bitbake_build_objset(self):
        """Load bitbake.spdx.json from the deploy dir and merge it in."""
        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
        bb_objset = load_jsonld(
            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
        )
        self.doc.import_.extend(bb_objset.doc.import_)
        self.update(bb_objset.objects)

        return bb_objset

    def import_bitbake_build(self):
        """Return the top-level bitbake build_Build, importing it if needed."""

        def find_bitbake_build(objset):
            return objset.find_filter(
                oe.spdx30.build_Build,
                build_buildType=SPDX_BUILD_TYPE,
            )

        build = find_bitbake_build(self)
        if build:
            return build

        bb_objset = self.import_bitbake_build_objset()
        build = find_bitbake_build(bb_objset)
        if build is None:
            # Fixed: this message previously referenced an undefined local
            # variable `deploy_dir_spdx`, which would raise NameError here
            bb.fatal(
                "No build found in %s" % self.d.getVar("DEPLOY_DIR_SPDX")
            )

        return build

    def new_task_build(self, name, typ):
        """Create a build_Build element for the current bitbake task."""
        current_task = self.d.getVar("BB_CURRENTTASK")
        pn = self.d.getVar("PN")

        build = self.add(
            oe.spdx30.build_Build(
                _id=self.new_spdxid("build", name),
                creationInfo=self.doc.creationInfo,
                name=f"{pn}:do_{current_task}:{name}",
                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
            )
        )

        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
            # Link this task build under the global bitbake build
            bitbake_build = self.import_bitbake_build()

            self.new_relationship(
                [bitbake_build],
                oe.spdx30.RelationshipType.ancestorOf,
                [build],
            )

        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
            for varname in sorted(self.d.keys()):
                if varname.startswith("__"):
                    continue

                value = self.d.getVar(varname, expand=False)

                # TODO: Deal with non-string values
                if not isinstance(value, str):
                    continue

                build.build_parameter.append(
                    oe.spdx30.DictionaryEntry(key=varname, value=value)
                )

        return build

    def new_archive(self, archive_name):
        """Create a software_File element representing an archive file."""
        return self.add(
            oe.spdx30.software_File(
                _id=self.new_spdxid("archive", str(archive_name)),
                creationInfo=self.doc.creationInfo,
                name=str(archive_name),
                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
            )
        )

    @classmethod
    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
        """Create a new ObjectSet containing a SpdxDocument named ``name``.

        When ``copy_from_bitbake_doc`` is True, the creation info is copied
        from the global bitbake document; otherwise fresh creation info is
        generated from the datastore.
        """
        objset = cls(d)

        document = oe.spdx30.SpdxDocument(
            _id=objset.new_spdxid("document", name),
            name=name,
        )

        document.extension.append(
            OEIdAliasExtension(
                alias=objset.new_alias_id(
                    document,
                    OE_DOC_ALIAS_PREFIX + d.getVar("PN") + "/" + name + "/",
                ),
            )
        )
        objset.doc = document
        objset.add_index(document)

        if copy_from_bitbake_doc:
            bb_objset = objset.import_bitbake_build_objset()
            document.creationInfo = objset.copy_creation_info(
                bb_objset.doc.creationInfo
            )
        else:
            document.creationInfo = objset.new_creation_info()

        return objset

    def expand_collection(self, *, add_objectsets=None):
        """
        Expands a collection to pull in all missing elements

        Returns the set of ids that could not be found to link into the document
        """
        missing_spdxids = set()
        imports = {e.externalSpdxId: e for e in self.doc.import_}

        def merge_doc(other):
            nonlocal imports

            for e in other.doc.import_:
                if e.externalSpdxId not in imports:
                    imports[e.externalSpdxId] = e

            self.objects |= other.objects

        for o in add_objectsets or []:
            merge_doc(o)

        needed_spdxids = self.link()
        provided_spdxids = set(self.obj_by_id.keys())

        while True:
            import_spdxids = set(imports.keys())
            searching_spdxids = (
                needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids
            )
            if not searching_spdxids:
                break

            spdxid = searching_spdxids.pop()
            bb.debug(
                1,
                f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}",
            )
            dep_objset, dep_path = find_by_spdxid(self.d, spdxid)

            if dep_objset:
                dep_provided = set(dep_objset.obj_by_id.keys())
                if spdxid not in dep_provided:
                    bb.fatal(f"{spdxid} not found in {dep_path}")
                provided_spdxids |= dep_provided
                needed_spdxids |= dep_objset.missing_ids
                merge_doc(dep_objset)
            else:
                missing_spdxids.add(spdxid)

        self.doc.import_ = sorted(imports.values(), key=lambda e: e.externalSpdxId)
        bb.debug(1, "Linking...")
        self.link()

        # Manually go through all of the simplelicensing_customIdToUri DictionaryEntry
        # items and resolve any aliases to actual objects.
        for lic in self.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
            for d in lic.simplelicensing_customIdToUri:
                if d.value.startswith(OE_ALIAS_PREFIX):
                    obj = self.find_by_id(d.value)
                    if obj is not None:
                        d.value = obj._id
                    else:
                        self.missing_ids.add(d.value)

        self.missing_ids -= set(imports.keys())
        return self.missing_ids
875
876
def load_jsonld(d, path, required=False):
    """Deserialize the JSON-LD SPDX document at ``path`` into an ObjectSet.

    Returns None if the file is missing and ``required`` is False; calls
    bb.fatal when the file is required but absent, or when it contains no
    SpdxDocument element. The document itself is removed from the object
    set's objects (it is tracked via ``objset.doc``).
    """
    objset = ObjectSet(d)
    try:
        with path.open("rb") as f:
            oe.spdx30.JSONLDDeserializer().read(f, objset)
    except FileNotFoundError:
        if required:
            bb.fatal("No SPDX document named %s found" % path)
        return None

    if not objset.doc:
        bb.fatal("SPDX Document %s has no SPDXDocument element" % path)
        return None

    objset.objects.remove(objset.doc)
    return objset
894
895
def jsonld_arch_path(d, arch, subdir, name, deploydir=None):
    """Return the deploy-dir path for a named JSON-LD document.

    Defaults to DEPLOY_DIR_SPDX from the datastore when ``deploydir`` is
    not given.
    """
    base = Path(d.getVar("DEPLOY_DIR_SPDX")) if deploydir is None else deploydir
    return base / arch / subdir / f"{name}.spdx.json"
900
901
def jsonld_hash_path(h):
    """Map a hash to its (subdir, name) pair, fanned out by first two chars."""
    subdir = Path("by-spdxid-hash") / h[:2]
    return subdir, h
904
905
def load_jsonld_by_arch(d, arch, subdir, name, *, required=False):
    """Load a JSON-LD document for one architecture.

    Returns (objset, path) on success, (None, None) otherwise.
    """
    path = jsonld_arch_path(d, arch, subdir, name)
    objset = load_jsonld(d, path, required=required)
    if objset is None:
        return (None, None)
    return (objset, path)
912
913
def find_jsonld(d, subdir, name, *, required=False):
    """Search all multilib sstate architectures for a JSON-LD document.

    Architectures are tried in reverse order (most specific first). Returns
    (objset, path), or (None, None) when not found; calls bb.fatal instead
    when ``required`` is True.
    """
    for arch in reversed(d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split()):
        objset, path = load_jsonld_by_arch(d, arch, subdir, name)
        if objset is not None:
            return (objset, path)

    if required:
        bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name))

    return (None, None)
927
928
def write_jsonld_doc(d, objset, dest):
    """Serialize ``objset`` as a JSON-LD document at ``dest``.

    Root elements are de-duplicated and sorted and the profile conformance
    list is populated from SPDX_PROFILES before writing. The SpdxDocument is
    temporarily added to the object set for serialization only.
    """
    if not isinstance(objset, ObjectSet):
        bb.fatal("Only an ObjsetSet can be serialized")
        return

    if not objset.doc:
        bb.fatal("ObjectSet is missing a SpdxDocument")
        return

    objset.doc.rootElement = sorted(set(objset.doc.rootElement))
    objset.doc.profileConformance = sorted(
        getattr(oe.spdx30.ProfileIdentifierType, p)
        for p in d.getVar("SPDX_PROFILES").split()
    )

    dest.parent.mkdir(exist_ok=True, parents=True)

    # Pretty output is opt-in; the inline serializer is the compact default
    serializer = (
        oe.spdx30.JSONLDSerializer(indent=2)
        if d.getVar("SPDX_PRETTY") == "1"
        else oe.spdx30.JSONLDInlineSerializer()
    )

    objset.objects.add(objset.doc)
    with dest.open("wb") as f:
        serializer.write(objset, f, force_at_graph=True)
    objset.objects.remove(objset.doc)
959
960
def write_recipe_jsonld_doc(
    d,
    objset,
    subdir,
    deploydir,
    *,
    create_spdx_id_links=True,
):
    """Write a recipe's SPDX document to the deploy directory.

    Aliases are added to all elements first. When ``create_spdx_id_links``
    is set, a stable "by-spdxid-hash" symlink is created pointing at the
    document so other recipes can resolve its alias. The document is always
    written out, even if link creation fails.
    """
    pkg_arch = d.getVar("SSTATE_PKGARCH")

    dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir)

    def link_id(_id):
        # Create a hash-addressed symlink to the document so `_id` can be
        # resolved without knowing which recipe produced it
        hash_path = jsonld_hash_path(hash_id(_id))

        link_name = jsonld_arch_path(
            d,
            pkg_arch,
            *hash_path,
            deploydir=deploydir,
        )
        link_name.parent.mkdir(exist_ok=True, parents=True)
        try:
            link_name.symlink_to(os.path.relpath(dest, link_name.parent))
        except FileExistsError:
            # Fixed: was a bare `except:` that caught every exception
            # (including KeyboardInterrupt) and then called readlink(),
            # which could itself raise and mask the original error. Only a
            # pre-existing symlink warrants this diagnostic.
            target = link_name.readlink()
            bb.warn(
                f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}"
            )
            raise

        return hash_path[-1]

    objset.add_aliases()

    try:
        if create_spdx_id_links:
            alias_ext = get_alias(objset.doc)
            if alias_ext is not None and alias_ext.alias:
                alias_ext.link_name = link_id(alias_ext.alias)

    finally:
        # It is really helpful for debugging if the JSON document is written
        # out, so always do that even if there is an error making the links
        write_jsonld_doc(d, objset, dest)
1006
1007
def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter):
    """Find a required root element of ``obj_type`` in a named document.

    Returns (spdx_obj, objset); calls bb.fatal if the document or the root
    object cannot be found.
    """
    objset, fn = find_jsonld(d, subdir, fn_name, required=True)

    spdx_obj = objset.find_root(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No root %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset
1016
1017
def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter):
    """Find a required object of ``obj_type`` in an arch-specific document.

    Unlike find_root_obj_in_jsonld, this matches any object in the set, not
    just document roots. Returns (spdx_obj, objset); calls bb.fatal on
    failure.
    """
    objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True)

    spdx_obj = objset.find_filter(obj_type, **attr_filter)
    if not spdx_obj:
        bb.fatal("No %s found in %s" % (obj_type.__name__, fn))

    return spdx_obj, objset
1026
1027
def find_by_spdxid(d, spdxid, *, required=False):
    """Find the JSON-LD document that provides ``spdxid``.

    Alias IDs already embed the document hash in their URI; other IDs are
    hashed to locate their by-spdxid-hash entry.
    """
    if spdxid.startswith(OE_ALIAS_PREFIX):
        h = spdxid[len(OE_ALIAS_PREFIX) :].split("/", 1)[0]
    else:
        h = hash_id(spdxid)
    return find_jsonld(d, *jsonld_hash_path(h), required=required)
1033
1034
def create_sbom(d, name, root_elements, add_objectsets=None):
    """Create a final SBoM document named ``name``.

    Expands the collection to pull in all referenced elements, strips
    internal extensions, and de-duplicates license expressions. Returns
    (objset, sbom).
    """
    objset = ObjectSet.new_objset(d, name)

    sbom = objset.add(
        oe.spdx30.software_Sbom(
            _id=objset.new_spdxid("sbom", name),
            name=name,
            creationInfo=objset.doc.creationInfo,
            software_sbomType=[oe.spdx30.software_SbomType.build],
            rootElement=root_elements,
        )
    )

    # `add_objectsets` was a mutable default argument ([]); use None
    missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets or [])
    if missing_spdxids:
        bb.warn(
            "The following SPDX IDs were unable to be resolved:\n "
            + "\n ".join(sorted(list(missing_spdxids)))
        )

    # Filter out internal extensions from final SBoMs
    objset.remove_internal_extensions()

    # SBoM should be the only root element of the document
    objset.doc.rootElement = [sbom]

    # De-duplicate licenses: key each expression on its (text, list version)
    # pair; the first occurrence becomes canonical and later duplicates are
    # remapped onto it. (Was an O(n^2) scan over a growing set.)
    canonical = {}
    dedup = {}
    for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression):
        key = (
            lic.simplelicensing_licenseExpression,
            lic.simplelicensing_licenseListVersion,
        )
        if key in canonical:
            dedup[lic] = canonical[key]
        else:
            canonical[key] = lic

    if dedup:
        # Repoint both declared- and concluded-license relationships at the
        # canonical license objects
        for rel_type in (
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            oe.spdx30.RelationshipType.hasConcludedLicense,
        ):
            for rel in objset.foreach_filter(
                oe.spdx30.Relationship,
                relationshipType=rel_type,
            ):
                rel.to = [dedup.get(to, to) for to in rel.to]

        for k, v in dedup.items():
            bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}")
            objset.objects.remove(k)

        objset.create_index()

    return objset, sbom