#
# Copyright OpenEmbedded Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

from pathlib import Path

import oe.spdx30
import bb
import re
import hashlib
import uuid
import os
import oe.spdx_common
from datetime import datetime, timezone

OE_SPDX_BASE = "https://rdf.openembedded.org/spdx/3.0/"

VEX_VERSION = "1.0.0"

SPDX_BUILD_TYPE = "http://openembedded.org/bitbake"


@oe.spdx30.register(OE_SPDX_BASE + "link-extension")
class OELinkExtension(oe.spdx30.extension_Extension):
    """
    This custom extension controls if an Element creates a symlink based on
    its SPDX ID in the deploy directory. Some elements may not be able to be
    linked because they are duplicated in multiple documents (e.g. the bitbake
    Build Element). Those elements can add this extension and set
    link_spdx_id to False

    It is an internal extension that should be removed when writing out a
    final SBoM
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "link_spdx_id",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "link-spdx-id",
            min_count=1,
            max_count=1,
        )

        # The symlinks written to the deploy directory are based on the hash
        # of the SPDX ID. While this makes it easy to look them up, it can be
        # difficult to trace an Element to the hashed symlink name. As a
        # debugging aid, this property is set to the basename of the symlink
        # when the symlink is created to make it easier to trace
        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )
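
# Illustrative note (not part of the module logic): an Element that must not
# be symlinked in the deploy directory, e.g. the shared bitbake Build Element,
# opts out by attaching this extension with link_spdx_id=False:
#
#   build.extension.append(OELinkExtension(link_spdx_id=False))
#
# new_objset() below does exactly this for the SpdxDocument itself.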


@oe.spdx30.register(OE_SPDX_BASE + "id-alias")
class OEIdAliasExtension(oe.spdx30.extension_Extension):
    """
    This extension allows an Element to provide an internal alias for the SPDX
    ID. Since SPDX requires unique URIs for each SPDX ID, most of the objects
    created have a unique UUID namespace and the unihash of the task encoded
    in their SPDX ID. However, this causes a problem for referencing documents
    across recipes, since the taskhash of a dependency may not factor into the
    taskhash of the current task and thus the current task won't rebuild and
    see the new SPDX ID when the dependency changes (e.g. ABI safe recipes and
    tasks).

    To help work around this, this extension provides a non-unique alias for
    an Element by which it can be referenced from other tasks/recipes. When a
    final SBoM is created, references to these aliases will be replaced with
    the actual unique SPDX ID.

    Most Elements will automatically get an alias created when they are
    written out if they do not already have one. To suppress the creation of
    an alias, add an extension with a blank `alias` property.

    It is an internal extension that should be removed when writing out a
    final SBoM
    """

    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "alias",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "alias",
            max_count=1,
        )
        cls._add_property(
            "link_name",
            oe.spdx30.StringProp(),
            OE_SPDX_BASE + "link-name",
            max_count=1,
        )


@oe.spdx30.register(OE_SPDX_BASE + "file-name-alias")
class OEFileNameAliasExtension(oe.spdx30.extension_Extension):
    CLOSED = True
    INTERNAL = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "aliases",
            oe.spdx30.ListProp(oe.spdx30.StringProp()),
            OE_SPDX_BASE + "filename-alias",
        )


@oe.spdx30.register(OE_SPDX_BASE + "license-scanned")
class OELicenseScannedExtension(oe.spdx30.extension_Extension):
    """
    The presence of this extension means the file has already been scanned
    for license information
    """

    CLOSED = True
    INTERNAL = True


@oe.spdx30.register(OE_SPDX_BASE + "document-extension")
class OEDocumentExtension(oe.spdx30.extension_Extension):
    """
    This extension is added to a SpdxDocument to indicate various useful bits
    of information about its contents
    """

    CLOSED = True

    @classmethod
    def _register_props(cls):
        super()._register_props()
        cls._add_property(
            "is_native",
            oe.spdx30.BooleanProp(),
            OE_SPDX_BASE + "is-native",
            max_count=1,
        )


def spdxid_hash(*items):
    h = hashlib.md5()
    for i in items:
        if isinstance(i, oe.spdx30.Element):
            h.update(i._id.encode("utf-8"))
        else:
            h.update(i.encode("utf-8"))
    return h.hexdigest()


def spdx_sde(d):
    sde = d.getVar("SOURCE_DATE_EPOCH")
    if not sde:
        return datetime.now(timezone.utc)

    return datetime.fromtimestamp(int(sde), timezone.utc)


def get_element_link_id(e):
    """
    Get the string ID which should be used to link to an Element. If the
    element has an alias, that will be preferred, otherwise its SPDX ID will
    be used.
    """
    ext = get_alias(e)
    if ext is not None and ext.alias:
        return ext.alias
    return e._id


def set_alias(obj, alias):
    for ext in obj.extension:
        if not isinstance(ext, OEIdAliasExtension):
            continue
        ext.alias = alias
        return ext

    ext = OEIdAliasExtension(alias=alias)
    obj.extension.append(ext)
    return ext


def get_alias(obj):
    for ext in obj.extension:
        if not isinstance(ext, OEIdAliasExtension):
            continue
        return ext

    return None


def to_list(l):
    if isinstance(l, set):
        l = sorted(list(l))

    if not isinstance(l, (list, tuple)):
        raise TypeError("Must be a list or tuple. Got %s" % type(l))

    return l
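
# Illustrative note (assumes a populated Element `e`): alias lookup is a plain
# linear scan of e.extension, so resolving a link ID amounts to
#
#   ext = get_alias(e)                              # OEIdAliasExtension or None
#   link = ext.alias if ext and ext.alias else e._id
#
# which is exactly what get_element_link_id() returns.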


class ObjectSet(oe.spdx30.SHACLObjectSet):
    def __init__(self, d):
        super().__init__()
        self.d = d

    def create_index(self):
        self.by_sha256_hash = {}
        super().create_index()

    def add_index(self, obj):
        # Check that all elements are given an ID before being inserted
        if isinstance(obj, oe.spdx30.Element):
            if not obj._id:
                raise ValueError("Element missing ID")

            for ext in obj.extension:
                if not isinstance(ext, OEIdAliasExtension):
                    continue
                if ext.alias:
                    self.obj_by_id[ext.alias] = obj

            for v in obj.verifiedUsing:
                if not isinstance(v, oe.spdx30.Hash):
                    continue

                # Only sha256 hashes belong in the sha256 index
                if v.algorithm != oe.spdx30.HashAlgorithm.sha256:
                    continue

                self.by_sha256_hash.setdefault(v.hashValue, set()).add(obj)

        super().add_index(obj)
        if isinstance(obj, oe.spdx30.SpdxDocument):
            self.doc = obj

    def __filter_obj(self, obj, attr_filter):
        return all(getattr(obj, k) == v for k, v in attr_filter.items())

    def foreach_filter(self, typ, *, match_subclass=True, **attr_filter):
        for obj in self.foreach_type(typ, match_subclass=match_subclass):
            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_filter(self, typ, *, match_subclass=True, **attr_filter):
        for obj in self.foreach_filter(
            typ, match_subclass=match_subclass, **attr_filter
        ):
            return obj
        return None

    def foreach_root(self, typ, **attr_filter):
        for obj in self.doc.rootElement:
            if not isinstance(obj, typ):
                continue

            if self.__filter_obj(obj, attr_filter):
                yield obj

    def find_root(self, typ, **attr_filter):
        for obj in self.foreach_root(typ, **attr_filter):
            return obj
        return None

    def add_root(self, obj):
        self.add(obj)
        self.doc.rootElement.append(obj)
        return obj

    def is_native(self):
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            if e.is_native is not None:
                return e.is_native

        return False

    def set_is_native(self, is_native):
        for e in self.doc.extension:
            if not isinstance(e, oe.sbom30.OEDocumentExtension):
                continue

            e.is_native = is_native
            return

        if is_native:
            self.doc.extension.append(oe.sbom30.OEDocumentExtension(is_native=True))

    def add_aliases(self):
        for o in self.foreach_type(oe.spdx30.Element):
            if not o._id or o._id.startswith("_:"):
                continue

            alias_ext = get_alias(o)
            if alias_ext is None:
                unihash = self.d.getVar("BB_UNIHASH")
                namespace = self.get_namespace()
                if unihash not in o._id:
                    bb.warn(f"Unihash {unihash} not found in {o._id}")
                elif namespace not in o._id:
                    bb.warn(f"Namespace {namespace} not found in {o._id}")
                else:
                    alias_ext = set_alias(
                        o,
                        o._id.replace(unihash, "UNIHASH").replace(
                            namespace, self.d.getVar("PN")
                        ),
                    )

    def remove_internal_extensions(self):
        def remove(o):
            o.extension = [e for e in o.extension if not getattr(e, "INTERNAL", False)]

        for o in self.foreach_type(oe.spdx30.Element):
            remove(o)

        if self.doc:
            remove(self.doc)

    def get_namespace(self):
        namespace_uuid = uuid.uuid5(
            uuid.NAMESPACE_DNS, self.d.getVar("SPDX_UUID_NAMESPACE")
        )
        pn = self.d.getVar("PN")
        return "%s/%s-%s" % (
            self.d.getVar("SPDX_NAMESPACE_PREFIX"),
            pn,
            str(uuid.uuid5(namespace_uuid, pn)),
        )

    def new_spdxid(self, *suffix, include_unihash=True):
        items = [self.get_namespace()]
        if include_unihash:
            unihash = self.d.getVar("BB_UNIHASH")
            items.append(unihash)
        items.extend(re.sub(r"[^a-zA-Z0-9_-]", "_", s) for s in suffix)
        return "/".join(items)

    def new_import(self, key):
        base = f"SPDX_IMPORTS_{key}"
        spdxid = self.d.getVar(f"{base}_spdxid")
        if not spdxid:
            bb.fatal(f"{key} is not a valid SPDX_IMPORTS key")

        for i in self.doc.imports:
            if i.externalSpdxId == spdxid:
                # Already imported
                return spdxid

        m = oe.spdx30.ExternalMap(externalSpdxId=spdxid)

        uri = self.d.getVar(f"{base}_uri")
        if uri:
            m.locationHint = uri

        for pyname, algorithm in oe.spdx30.HashAlgorithm.NAMED_INDIVIDUALS.items():
            value = self.d.getVar(f"{base}_hash_{pyname}")
            if value:
                m.verifiedUsing.append(
                    oe.spdx30.Hash(
                        algorithm=algorithm,
                        hashValue=value,
                    )
                )

        self.doc.imports.append(m)
        return spdxid
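
    # Illustrative note (hypothetical key and values): new_import() above
    # reads a family of datastore variables named after its key, e.g.
    #
    #   SPDX_IMPORTS_myimport_spdxid = "https://example.com/spdx/thing"
    #   SPDX_IMPORTS_myimport_uri = "https://example.com/thing.spdx.json"
    #   SPDX_IMPORTS_myimport_hash_sha256 = "<hex digest>"
    #
    # and is then called as self.new_import("myimport").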

    def new_agent(self, varname, *, creation_info=None, add=True):
        ref_varname = self.d.getVar(f"{varname}_ref")
        if ref_varname:
            if ref_varname == varname:
                bb.fatal(f"{varname} cannot reference itself")
            # Follow the indirection to the referenced variable family
            return self.new_agent(ref_varname, creation_info=creation_info)

        import_key = self.d.getVar(f"{varname}_import")
        if import_key:
            return self.new_import(import_key)

        name = self.d.getVar(f"{varname}_name")
        if not name:
            return None

        spdxid = self.new_spdxid("agent", name)
        agent = self.find_by_id(spdxid)
        if agent is not None:
            return agent

        agent_type = self.d.getVar("%s_type" % varname)
        if agent_type == "person":
            agent = oe.spdx30.Person()
        elif agent_type == "software":
            agent = oe.spdx30.SoftwareAgent()
        elif agent_type == "organization":
            agent = oe.spdx30.Organization()
        elif not agent_type or agent_type == "agent":
            agent = oe.spdx30.Agent()
        else:
            bb.fatal("Unknown agent type '%s' in %s_type" % (agent_type, varname))

        agent._id = spdxid
        agent.creationInfo = creation_info or self.doc.creationInfo
        agent.name = name

        comment = self.d.getVar("%s_comment" % varname)
        if comment:
            agent.comment = comment

        for (
            pyname,
            idtype,
        ) in oe.spdx30.ExternalIdentifierType.NAMED_INDIVIDUALS.items():
            value = self.d.getVar("%s_id_%s" % (varname, pyname))
            if value:
                agent.externalIdentifier.append(
                    oe.spdx30.ExternalIdentifier(
                        externalIdentifierType=idtype,
                        identifier=value,
                    )
                )

        if add:
            self.add(agent)

        return agent

    def new_creation_info(self):
        creation_info = oe.spdx30.CreationInfo()

        name = "%s %s" % (
            self.d.getVar("SPDX_TOOL_NAME"),
            self.d.getVar("SPDX_TOOL_VERSION"),
        )
        tool = self.add(
            oe.spdx30.Tool(
                _id=self.new_spdxid("tool", name),
                creationInfo=creation_info,
                name=name,
            )
        )

        authors = []
        for a in self.d.getVar("SPDX_AUTHORS").split():
            varname = "SPDX_AUTHORS_%s" % a
            author = self.new_agent(varname, creation_info=creation_info)

            if not author:
                bb.fatal("Unable to find or create author %s" % a)

            authors.append(author)

        creation_info.created = spdx_sde(self.d)
        creation_info.specVersion = self.d.getVar("SPDX_VERSION")
        creation_info.createdBy = authors
        creation_info.createdUsing = [tool]

        return creation_info

    def copy_creation_info(self, copy):
        c = oe.spdx30.CreationInfo(
            created=spdx_sde(self.d),
            specVersion=self.d.getVar("SPDX_VERSION"),
        )

        for author in copy.createdBy:
            if isinstance(author, str):
                c.createdBy.append(author)
            else:
                c.createdBy.append(author._id)

        for tool in copy.createdUsing:
            if isinstance(tool, str):
                c.createdUsing.append(tool)
            else:
                c.createdUsing.append(tool._id)

        return c

    def new_annotation(self, subject, comment, typ):
        return self.add(
            oe.spdx30.Annotation(
                _id=self.new_spdxid("annotation", spdxid_hash(comment, typ)),
                creationInfo=self.doc.creationInfo,
                annotationType=typ,
                subject=subject,
                statement=comment,
            )
        )
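
    # Illustrative note (hypothetical elements): the relationship helpers
    # below derive deterministic SPDX IDs by hashing the relationship type,
    # the from-element, the sorted extra properties, and every to-element:
    #
    #   rels = objset.new_relationship(
    #       [pkg], oe.spdx30.RelationshipType.contains, files
    #   )
    #
    # so, within one task, the same inputs map to the same relationship ID.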

    def _new_relationship(
        self,
        cls,
        from_,
        typ,
        to,
        *,
        spdxid_name="relationship",
        **props,
    ):
        from_ = to_list(from_)
        to = to_list(to)

        if not from_:
            return []

        if not to:
            # TODO: Switch to the code constant once SPDX 3.0.1 is released
            to = ["https://spdx.org/rdf/3.0.0/terms/Core/NoneElement"]

        ret = []

        for f in from_:
            hash_args = [typ, f]
            for k in sorted(props.keys()):
                hash_args.append(props[k])
            hash_args.extend(to)

            relationship = self.add(
                cls(
                    _id=self.new_spdxid(spdxid_name, spdxid_hash(*hash_args)),
                    creationInfo=self.doc.creationInfo,
                    from_=f,
                    relationshipType=typ,
                    to=to,
                    **props,
                )
            )
            ret.append(relationship)

        return ret

    def new_relationship(self, from_, typ, to):
        return self._new_relationship(oe.spdx30.Relationship, from_, typ, to)

    def new_scoped_relationship(self, from_, typ, scope, to):
        return self._new_relationship(
            oe.spdx30.LifecycleScopedRelationship,
            from_,
            typ,
            to,
            scope=scope,
        )

    def new_license_expression(
        self, license_expression, license_data, license_text_map={}
    ):
        license_list_version = license_data["licenseListVersion"]
        # SPDX 3 requires that the license list version be a semver
        # MAJOR.MINOR.MICRO, but the actual license version might be
        # MAJOR.MINOR on some older versions. As such, manually append a .0
        # micro version if it's missing to keep SPDX happy
        if license_list_version.count(".") < 2:
            license_list_version += ".0"

        spdxid = [
            "license",
            license_list_version,
            re.sub(r"[^a-zA-Z0-9_-]", "_", license_expression),
        ]

        # Must be a list (not a generator): it is checked for emptiness and
        # iterated more than once below
        license_text = [
            (k, license_text_map[k]) for k in sorted(license_text_map.keys())
        ]

        if not license_text:
            lic = self.find_filter(
                oe.spdx30.simplelicensing_LicenseExpression,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
            if lic is not None:
                return lic
        else:
            spdxid.append(spdxid_hash(*(v for _, v in license_text)))
            lic = self.find_by_id(self.new_spdxid(*spdxid))
            if lic is not None:
                return lic

        lic = self.add(
            oe.spdx30.simplelicensing_LicenseExpression(
                _id=self.new_spdxid(*spdxid),
                creationInfo=self.doc.creationInfo,
                simplelicensing_licenseExpression=license_expression,
                simplelicensing_licenseListVersion=license_list_version,
            )
        )

        for key, value in license_text:
            lic.simplelicensing_customIdToUri.append(
                oe.spdx30.DictionaryEntry(key=key, value=value)
            )

        return lic

    def scan_declared_licenses(self, spdx_file, filepath, license_data):
        for e in spdx_file.extension:
            if isinstance(e, OELicenseScannedExtension):
                return

        file_licenses = set()
        for extracted_lic in oe.spdx_common.extract_licenses(filepath):
            file_licenses.add(self.new_license_expression(extracted_lic, license_data))

        self.new_relationship(
            [spdx_file],
            oe.spdx30.RelationshipType.hasDeclaredLicense,
            file_licenses,
        )
        spdx_file.extension.append(OELicenseScannedExtension())

    def new_file(self, _id, name, path, *, purposes=[]):
        sha256_hash = bb.utils.sha256_file(path)

        for f in self.by_sha256_hash.get(sha256_hash, []):
            if not isinstance(f, oe.spdx30.software_File):
                continue

            if purposes:
                new_primary = purposes[0]
                new_additional = []

                if f.software_primaryPurpose:
                    new_additional.append(f.software_primaryPurpose)
                new_additional.extend(f.software_additionalPurpose)

                new_additional = sorted(
                    list(set(p for p in new_additional if p != new_primary))
                )

                f.software_primaryPurpose = new_primary
                f.software_additionalPurpose = new_additional

            if f.name != name:
                for e in f.extension:
                    if isinstance(e, OEFileNameAliasExtension):
                        e.aliases.append(name)
                        break
                else:
                    f.extension.append(OEFileNameAliasExtension(aliases=[name]))

            return f

        spdx_file = oe.spdx30.software_File(
            _id=_id,
            creationInfo=self.doc.creationInfo,
            name=name,
        )
        if purposes:
            spdx_file.software_primaryPurpose = purposes[0]
            spdx_file.software_additionalPurpose = purposes[1:]

        spdx_file.verifiedUsing.append(
            oe.spdx30.Hash(
                algorithm=oe.spdx30.HashAlgorithm.sha256,
                hashValue=sha256_hash,
            )
        )

        return self.add(spdx_file)
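
    # Illustrative note (hypothetical paths): new_file() above deduplicates
    # by content hash, so registering the same payload under two names yields
    # one software_File with a filename alias rather than two Elements:
    #
    #   a = objset.new_file(objset.new_spdxid("file", "a"), "a", "/path/a")
    #   b = objset.new_file(objset.new_spdxid("file", "b"), "b", "/path/b")
    #   # if the file contents match, b is a, and "b" is recorded in an
    #   # OEFileNameAliasExtension on the shared Element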

    def new_cve_vuln(self, cve):
        v = oe.spdx30.security_Vulnerability()
        v._id = self.new_spdxid("vulnerability", cve)
        v.creationInfo = self.doc.creationInfo

        v.externalIdentifier.append(
            oe.spdx30.ExternalIdentifier(
                externalIdentifierType=oe.spdx30.ExternalIdentifierType.cve,
                identifier=cve,
                identifierLocator=[
                    f"https://cveawg.mitre.org/api/cve/{cve}",
                    f"https://www.cve.org/CVERecord?id={cve}",
                ],
            )
        )
        return self.add(v)

    def new_vex_patched_relationship(self, from_, to):
        return self._new_relationship(
            oe.spdx30.security_VexFixedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.fixedIn,
            to,
            spdxid_name="vex-fixed",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_unpatched_relationship(self, from_, to):
        return self._new_relationship(
            oe.spdx30.security_VexAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.affects,
            to,
            spdxid_name="vex-affected",
            security_vexVersion=VEX_VERSION,
        )

    def new_vex_ignored_relationship(self, from_, to, *, impact_statement):
        return self._new_relationship(
            oe.spdx30.security_VexNotAffectedVulnAssessmentRelationship,
            from_,
            oe.spdx30.RelationshipType.doesNotAffect,
            to,
            spdxid_name="vex-not-affected",
            security_vexVersion=VEX_VERSION,
            security_impactStatement=impact_statement,
        )

    def import_bitbake_build_objset(self):
        deploy_dir_spdx = Path(self.d.getVar("DEPLOY_DIR_SPDX"))
        bb_objset = load_jsonld(
            self.d, deploy_dir_spdx / "bitbake.spdx.json", required=True
        )
        self.doc.imports.extend(bb_objset.doc.imports)
        self.update(bb_objset.objects)

        return bb_objset

    def import_bitbake_build(self):
        def find_bitbake_build(objset):
            return objset.find_filter(
                oe.spdx30.build_Build,
                build_buildType=SPDX_BUILD_TYPE,
            )

        build = find_bitbake_build(self)
        if build:
            return build

        bb_objset = self.import_bitbake_build_objset()
        build = find_bitbake_build(bb_objset)
        if build is None:
            bb.fatal("No build found in %s" % self.d.getVar("DEPLOY_DIR_SPDX"))

        return build

    def new_task_build(self, name, typ):
        current_task = self.d.getVar("BB_CURRENTTASK")
        pn = self.d.getVar("PN")

        build = self.add(
            oe.spdx30.build_Build(
                _id=self.new_spdxid("build", name),
                creationInfo=self.doc.creationInfo,
                name=f"{pn}:do_{current_task}:{name}",
                build_buildType=f"{SPDX_BUILD_TYPE}/do_{current_task}/{typ}",
            )
        )

        if self.d.getVar("SPDX_INCLUDE_BITBAKE_PARENT_BUILD") == "1":
            bitbake_build = self.import_bitbake_build()

            self.new_relationship(
                [bitbake_build],
                oe.spdx30.RelationshipType.ancestorOf,
                [build],
            )

        if self.d.getVar("SPDX_INCLUDE_BUILD_VARIABLES") == "1":
            for varname in sorted(self.d.keys()):
                if varname.startswith("__"):
                    continue

                value = self.d.getVar(varname, expand=False)

                # TODO: Deal with non-string values
                if not isinstance(value, str):
                    continue

                build.parameters.append(
                    oe.spdx30.DictionaryEntry(key=varname, value=value)
                )

        return build

    def new_archive(self, archive_name):
        return self.add(
            oe.spdx30.software_File(
                _id=self.new_spdxid("archive", str(archive_name)),
                creationInfo=self.doc.creationInfo,
                name=str(archive_name),
                software_primaryPurpose=oe.spdx30.software_SoftwarePurpose.archive,
            )
        )

    @classmethod
    def new_objset(cls, d, name, copy_from_bitbake_doc=True):
        objset = cls(d)

        document = oe.spdx30.SpdxDocument(
            _id=objset.new_spdxid("document", name),
            name=name,
        )

        document.extension.append(OEIdAliasExtension())
        document.extension.append(OELinkExtension(link_spdx_id=False))
        objset.doc = document

        if copy_from_bitbake_doc:
            bb_objset = objset.import_bitbake_build_objset()
            document.creationInfo = objset.copy_creation_info(
                bb_objset.doc.creationInfo
            )
        else:
            document.creationInfo = objset.new_creation_info()

        return objset
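
    # Illustrative sketch (hypothetical task code): a typical producer builds
    # an ObjectSet, attaches root elements, then serializes it:
    #
    #   objset = ObjectSet.new_objset(d, "recipe-" + d.getVar("PN"))
    #   build = objset.add_root(objset.new_task_build("recipe", "recipe"))
    #   write_recipe_jsonld_doc(d, objset, "recipes", deploydir)
    #
    # new_objset() copies creationInfo from the shared bitbake document by
    # default so all task documents agree on tooling and authorship.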
document """ missing_spdxids = set() imports = {e.externalSpdxId: e for e in self.doc.imports} def merge_doc(other): nonlocal imports for e in other.doc.imports: if not e.externalSpdxId in imports: imports[e.externalSpdxId] = e self.objects |= other.objects for o in add_objectsets: merge_doc(o) needed_spdxids = self.link() provided_spdxids = set(self.obj_by_id.keys()) while True: import_spdxids = set(imports.keys()) searching_spdxids = ( needed_spdxids - provided_spdxids - missing_spdxids - import_spdxids ) if not searching_spdxids: break spdxid = searching_spdxids.pop() bb.debug( 1, f"Searching for {spdxid}. Remaining: {len(searching_spdxids)}, Total: {len(provided_spdxids)}, Missing: {len(missing_spdxids)}, Imports: {len(import_spdxids)}", ) dep_objset, dep_path = find_by_spdxid(self.d, spdxid) if dep_objset: dep_provided = set(dep_objset.obj_by_id.keys()) if spdxid not in dep_provided: bb.fatal(f"{spdxid} not found in {dep_path}") provided_spdxids |= dep_provided needed_spdxids |= dep_objset.missing_ids merge_doc(dep_objset) else: missing_spdxids.add(spdxid) bb.debug(1, "Linking...") missing = self.link() if missing != missing_spdxids: bb.fatal( f"Linked document doesn't match missing SPDX ID list. Got: {missing}\nExpected: {missing_spdxids}" ) self.doc.imports = sorted(imports.values(), key=lambda e: e.externalSpdxId) return missing_spdxids def load_jsonld(d, path, required=False): deserializer = oe.spdx30.JSONLDDeserializer() objset = ObjectSet(d) try: with path.open("rb") as f: deserializer.read(f, objset) except FileNotFoundError: if required: bb.fatal("No SPDX document named %s found" % path) return None if not objset.doc: bb.fatal("SPDX Document %s has no SPDXDocument element" % path) return None objset.objects.remove(objset.doc) return objset def jsonld_arch_path(d, arch, subdir, name, deploydir=None): if deploydir is None: deploydir = Path(d.getVar("DEPLOY_DIR_SPDX")) return deploydir / arch / subdir / (name + ".spdx.json") def jsonld_hash_path(_id): h = hashlib.sha256(_id.encode("utf-8")).hexdigest() return Path("by-spdxid-hash") / h[:2], h def load_jsonld_by_arch(d, arch, subdir, name, *, required=False): path = jsonld_arch_path(d, arch, subdir, name) objset = load_jsonld(d, path, required=required) if objset is not None: return (objset, path) return (None, None) def find_jsonld(d, subdir, name, *, required=False): package_archs = d.getVar("SPDX_MULTILIB_SSTATE_ARCHS").split() package_archs.reverse() for arch in package_archs: objset, path = load_jsonld_by_arch(d, arch, subdir, name) if objset is not None: return (objset, path) if required: bb.fatal("Could not find a %s SPDX document named %s" % (subdir, name)) return (None, None) def write_jsonld_doc(d, objset, dest): if not isinstance(objset, ObjectSet): bb.fatal("Only an ObjsetSet can be serialized") return if not objset.doc: bb.fatal("ObjectSet is missing a SpdxDocument") return objset.doc.rootElement = sorted(list(set(objset.doc.rootElement))) objset.doc.profileConformance = sorted( list( getattr(oe.spdx30.ProfileIdentifierType, p) for p in d.getVar("SPDX_PROFILES").split() ) ) dest.parent.mkdir(exist_ok=True, parents=True) if d.getVar("SPDX_PRETTY") == "1": serializer = oe.spdx30.JSONLDSerializer( indent=2, ) else: serializer = oe.spdx30.JSONLDInlineSerializer() objset.objects.add(objset.doc) with dest.open("wb") as f: serializer.write(objset, f, force_at_graph=True) objset.objects.remove(objset.doc) def write_recipe_jsonld_doc( d, objset, subdir, deploydir, *, create_spdx_id_links=True, ): pkg_arch = 
d.getVar("SSTATE_PKGARCH") dest = jsonld_arch_path(d, pkg_arch, subdir, objset.doc.name, deploydir=deploydir) def link_id(_id): hash_path = jsonld_hash_path(_id) link_name = jsonld_arch_path( d, pkg_arch, *hash_path, deploydir=deploydir, ) try: link_name.parent.mkdir(exist_ok=True, parents=True) link_name.symlink_to(os.path.relpath(dest, link_name.parent)) except: target = link_name.readlink() bb.warn( f"Unable to link {_id} in {dest} as {link_name}. Already points to {target}" ) raise return hash_path[-1] objset.add_aliases() try: if create_spdx_id_links: for o in objset.foreach_type(oe.spdx30.Element): if not o._id or o._id.startswith("_:"): continue ext = None for e in o.extension: if not isinstance(e, OELinkExtension): continue ext = e break if ext is None: ext = OELinkExtension(link_spdx_id=True) o.extension.append(ext) if ext.link_spdx_id: ext.link_name = link_id(o._id) alias_ext = get_alias(o) if alias_ext is not None and alias_ext.alias: alias_ext.link_name = link_id(alias_ext.alias) finally: # It is really helpful for debugging if the JSON document is written # out, so always do that even if there is an error making the links write_jsonld_doc(d, objset, dest) def find_root_obj_in_jsonld(d, subdir, fn_name, obj_type, **attr_filter): objset, fn = find_jsonld(d, subdir, fn_name, required=True) spdx_obj = objset.find_root(obj_type, **attr_filter) if not spdx_obj: bb.fatal("No root %s found in %s" % (obj_type.__name__, fn)) return spdx_obj, objset def load_obj_in_jsonld(d, arch, subdir, fn_name, obj_type, **attr_filter): objset, fn = load_jsonld_by_arch(d, arch, subdir, fn_name, required=True) spdx_obj = objset.find_filter(obj_type, **attr_filter) if not spdx_obj: bb.fatal("No %s found in %s" % (obj_type.__name__, fn)) return spdx_obj, objset def find_by_spdxid(d, spdxid, *, required=False): return find_jsonld(d, *jsonld_hash_path(spdxid), required=required) def create_sbom(d, name, root_elements, add_objectsets=[]): objset = ObjectSet.new_objset(d, name) sbom = objset.add( oe.spdx30.software_Sbom( _id=objset.new_spdxid("sbom", name), name=name, creationInfo=objset.doc.creationInfo, software_sbomType=[oe.spdx30.software_SbomType.build], rootElement=root_elements, ) ) missing_spdxids = objset.expand_collection(add_objectsets=add_objectsets) if missing_spdxids: bb.warn( "The following SPDX IDs were unable to be resolved:\n " + "\n ".join(sorted(list(missing_spdxids))) ) # Filter out internal extensions from final SBoMs objset.remove_internal_extensions() # SBoM should be the only root element of the document objset.doc.rootElement = [sbom] # De-duplicate licenses unique = set() dedup = {} for lic in objset.foreach_type(oe.spdx30.simplelicensing_LicenseExpression): for u in unique: if ( u.simplelicensing_licenseExpression == lic.simplelicensing_licenseExpression and u.simplelicensing_licenseListVersion == lic.simplelicensing_licenseListVersion ): dedup[lic] = u break else: unique.add(lic) if dedup: for rel in objset.foreach_filter( oe.spdx30.Relationship, relationshipType=oe.spdx30.RelationshipType.hasDeclaredLicense, ): rel.to = [dedup.get(to, to) for to in rel.to] for rel in objset.foreach_filter( oe.spdx30.Relationship, relationshipType=oe.spdx30.RelationshipType.hasConcludedLicense, ): rel.to = [dedup.get(to, to) for to in rel.to] for k, v in dedup.items(): bb.debug(1, f"Removing duplicate License {k._id} -> {v._id}") objset.objects.remove(k) objset.create_index() return objset, sbom