Skip to content

Commit

Permalink
Merge pull request #18 from OpenRailAssociation/clearlydefined-batches
Browse files Browse the repository at this point in the history
Request multiple packages at once from ClearlyDefined
  • Loading branch information
mxmehl authored Aug 14, 2024
2 parents b8b9d47 + 5c5ac71 commit b272c1c
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 49 deletions.
72 changes: 56 additions & 16 deletions complassist/_clearlydefined.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _cdapi_call(
method: str = "GET",
api_url: str = "https://api.clearlydefined.io",
basepath: str = "definitions",
json_dict: dict | None = None,
json_dict: dict | list | None = None,
**params: str,
) -> dict:
"""
Expand Down Expand Up @@ -104,7 +104,7 @@ def _cdapi_call(
"""
url = urljoin(api_url, pathjoin(basepath, path))
if json_dict:
result = make_request_with_retry(method=method, url=url, json=json_dict)
result = make_request_with_retry(method=method, url=url, json=json_dict, params=params)
else:
result = make_request_with_retry(method=method, url=url, params=params)

Expand Down Expand Up @@ -158,17 +158,23 @@ def _extract_license_copyright(cd_api_response: dict) -> tuple[str, str]:
return license_declared, "\n".join(copyrights).strip()


def _send_cd_harvest_request(coordinates: str) -> None:
def _handle_missing_license_and_request_harvest(coordinates: str) -> None:
"""
Sends a harvest request to ClearlyDefined for the given coordinates.
Handles the case when a declared license is not found and triggers a harvest
request.
Triggers a ClearlyDefined harvest operation for the provided coordinates to
request the collection of metadata and license information.
Logs the event of a missing license and sends a harvest request to
ClearlyDefined for the given coordinates.
Args:
coordinates (str): The ClearlyDefined coordinates or Package URL (purl)
for which to request harvesting.
coordinates (str): The ClearlyDefined coordinates or Package URL for
which the license is missing.
"""
logging.info(
"Adding %s to be harvested by ClearlyDefined. "
"Make sure the package and this version actually exists, and try again later.",
coordinates,
)
_cdapi_call(
path="",
method="POST",
Expand All @@ -179,8 +185,7 @@ def _send_cd_harvest_request(coordinates: str) -> None:

def get_clearlydefined_license_and_copyright(coordinates: str) -> tuple[str, str]:
"""
Retrieves the declared license for the specified coordinates from
ClearlyDefined.
Retrieves the declared license for the specified coordinates from ClearlyDefined.
Queries the ClearlyDefined API to get the declared license for the provided
coordinates or Package URL (purl). If no license is found, it initiates a
Expand All @@ -203,16 +208,51 @@ def get_clearlydefined_license_and_copyright(coordinates: str) -> tuple[str, str

# Declared license couldn't be extracted. Add to harvest
if not declared_license:
logging.info(
"Adding %s to be harvest by ClearlyDefined. "
"Make sure the package and this version actually exists, and try again later.",
coordinates,
)
_send_cd_harvest_request(coordinates)
_handle_missing_license_and_request_harvest(coordinates)

return declared_license, copyrights


def get_clearlydefined_license_and_copyright_in_batches(
    purls: list[str],
) -> dict[str, tuple[str, str]]:
    """
    Retrieves the declared license and copyright for multiple purls from ClearlyDefined.

    Converts every Package URL to ClearlyDefined coordinates, requests all of
    them in a single POST call, and extracts license/copyright data for each
    returned package. If no declared license is found for a package, a harvest
    request is initiated for its coordinates.

    Args:
        purls (list[str]): The Package URLs for which to retrieve the data.

    Returns:
        dict[str, tuple[str, str]]: A mapping of each purl to a tuple containing:
            - The declared license as a string, or an empty string if not found.
            - The detected copyright attributions as a single string, with each
              attribution separated by a newline, or an empty string if not
              found.
        An empty dict is returned when `purls` is empty.
    """
    # Nothing to look up. Return early, because an empty list is falsy and
    # would be sent as a POST request without any JSON body.
    if not purls:
        return {}

    # Several purls may resolve to the same coordinates. Keep all of them so
    # that every requested purl shows up in the result.
    coordinates_purls: dict[str, list[str]] = {}
    for purl in purls:
        coordinates_purls.setdefault(purl_to_cd_coordinates(purl), []).append(purl)

    api_return = _cdapi_call(
        path="", method="POST", json_dict=list(coordinates_purls), expand="-files"
    )

    result: dict[str, tuple[str, str]] = {}
    for pkg_coordinates, cd_data in api_return.items():
        declared_license, copyrights = _extract_license_copyright(cd_data)

        # Declared license couldn't be extracted. Add to harvest (once per coordinates)
        if not declared_license:
            _handle_missing_license_and_request_harvest(pkg_coordinates)

        for pkg_purl in coordinates_purls[pkg_coordinates]:
            result[pkg_purl] = (declared_license, copyrights)

    return result


def print_clearlydefined_result(results: tuple[str, str]) -> None:
"""
Pretty-print the results for declared license and copyright attributions
Expand Down
14 changes: 11 additions & 3 deletions complassist/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,17 @@ def read_json_file(path: str) -> dict:


def write_json_file(data: dict, path: str) -> None:
    """Write a dict into a JSON file, unless path is `-` for which it will be stdout.

    Args:
        data (dict): The data to serialise as JSON (indented by 2 spaces).
        path (str): Destination file path, or `-` to print to stdout instead.
    """
    # `-` means stdout: print and stop, so no file named "-" is ever created
    if path == "-":
        print(json.dumps(data, indent=2))
        return

    with open(path, "w", encoding="UTF-8") as jsonfile:
        json.dump(data, jsonfile, indent=2)


def print_json_file(path: str) -> None:
    """Read the JSON file at `path` and print its content to stdout"""
    content = read_json_file(path)
    # `-` as target makes write_json_file print instead of writing a file
    write_json_file(content, "-")


def delete_file(path: str) -> None:
Expand Down
30 changes: 30 additions & 0 deletions complassist/_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# SPDX-FileCopyrightText: 2024 DB Systel GmbH
#
# SPDX-License-Identifier: Apache-2.0

"""Logging functions"""
import logging


def configure_logger(args) -> logging.Logger:
    """Set logging options.

    Args:
        args: Parsed command-line arguments. `args.verbose` switches the root
            logger to DEBUG. The optional `args.http_debug` additionally
            enables low-level HTTP debugging output.

    Returns:
        logging.Logger: The configured root logger.
    """
    # Base logger config
    log = logging.getLogger()
    logging.basicConfig(
        encoding="utf-8",
        format="%(levelname)s: %(message)s",
        level=logging.INFO,
    )
    # Adapt logging level
    if args.verbose:
        log.setLevel(logging.DEBUG)
    # Activate extreme logging for requests to also get POST data
    if getattr(args, "http_debug", False):
        # Current requests releases use the standalone urllib3, which logs
        # under "urllib3". The legacy vendored name is kept for old versions.
        for logger_name in ("urllib3", "requests.packages.urllib3"):
            requests_log = logging.getLogger(logger_name)
            requests_log.setLevel(logging.DEBUG)
            requests_log.propagate = True
        import http.client as http_client  # pylint: disable=import-outside-toplevel

        # Makes http.client print request/response details to stdout
        http_client.HTTPConnection.debuglevel = 1

    return log
57 changes: 46 additions & 11 deletions complassist/_sbom_enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
from . import __version__
from ._clearlydefined import (
get_clearlydefined_license_and_copyright,
get_clearlydefined_license_and_copyright_in_batches,
purl_to_cd_coordinates,
)
from ._helpers import extract_excerpt, read_json_file, write_json_file
from ._sbom_parse import (
extract_items_from_cdx_sbom,
extract_items_from_component,
licenses_short_to_string,
spdx_expression_to_cdx_licenses,
Expand All @@ -22,7 +24,7 @@

def _compare_sbom_cd_license(
component: dict,
cd_license: str,
cd_license: str | None,
sbom_license: str,
sbom_licenses_item: list[dict],
sbom_licenses_short_item: list[dict],
Expand Down Expand Up @@ -86,7 +88,7 @@ def _compare_sbom_cd_license(


def _compare_sbom_cd_copyright(
component: dict, cd_copyright: str, sbom_copyright: str
component: dict, cd_copyright: str | None, sbom_copyright: str
) -> tuple[str, str]:
"""
Compares and potentially updates the SBOM component's copyright information
Expand Down Expand Up @@ -132,12 +134,17 @@ def _compare_sbom_cd_copyright(
return msg, msg_level


def _enrich_component_with_cd_data(component: dict) -> None:
def _enrich_component_with_cd_data(
component: dict, clearlydefined_data: dict[str, dict[str, str]]
) -> None:
"""
Enriches a single component with data from ClearlyDefined.
Args:
component (dict): The component data to enrich.
clearlydefined_data (dict): Previously fetched data for detected PURLs
from ClearlyDefined.
"""
# Get purl, original licenses, and short/simplified licenses data from component
raw_data = extract_items_from_component(
Expand All @@ -150,10 +157,9 @@ def _enrich_component_with_cd_data(component: dict) -> None:
sbom_license = licenses_short_to_string(sbom_licenses_short_item)
sbom_copyright = raw_data["copyright"]

# Get licensing/copyright data from ClearlyDefined
cd_license, cd_copyright = get_clearlydefined_license_and_copyright(
coordinates=purl_to_cd_coordinates(purl)
)
# Get fetched licensing/copyright data from ClearlyDefined
cd_license = clearlydefined_data[purl].get("license")
cd_copyright = clearlydefined_data[purl].get("copyright")

# Compare license data of SBOM with ClearlyDefined
msg, msg_level = _compare_sbom_cd_license(
Expand Down Expand Up @@ -228,7 +234,9 @@ def _update_sbom_metadata(sbom: dict) -> dict:
return sbom


def enrich_sbom_with_clearlydefined(sbom_file: str, output_file: str) -> None:
def enrich_sbom_with_clearlydefined(
sbom_file: str, output_file: str, in_batches: bool = True, batch_size: int = 15
) -> None:
"""
Parse a SBOM and enrich license/copyright data of each component with
ClearlyDefined. Write result to new SBOM file.
Expand All @@ -244,13 +252,40 @@ def enrich_sbom_with_clearlydefined(sbom_file: str, output_file: str) -> None:
Args:
sbom_file (str): Path to the input SBOM file.
output_file (str): Path to save the enriched SBOM.
in_batches (bool): Ask ClearlyDefined API for multiple packages at once.
batch_size (int): Number of packages for batch request at ClearlyDefined.
"""

sbom = read_json_file(sbom_file)
sbom: dict[str, list[dict]] = read_json_file(sbom_file)

# Loop all contained components, and collect ClearlyDefined data
clearlydefined_data: dict[str, dict[str, str]] = {}
all_purls: list[str] = [
c["purl"] for c in extract_items_from_cdx_sbom(sbom_file, information=["purl"])
]
if in_batches:
# Split all purls in batches of `batch_size` size
purls_batches: list[list[str]] = [
all_purls[x : x + batch_size] for x in range(0, len(all_purls), batch_size)
]
for batch in purls_batches:
logging.info("Getting ClearlyDefined data for %s", ", ".join(batch))
result = get_clearlydefined_license_and_copyright_in_batches(batch)
# Unpack result batches, and add to clearlydefined_data
for purl, (cd_license, cd_copyright) in result.items():
clearlydefined_data[purl] = {"license": cd_license, "copyright": cd_copyright}

else:
for purl in all_purls:
logging.info("Getting ClearlyDefined data for %s", purl)
cd_license, cd_copyright = get_clearlydefined_license_and_copyright(
coordinates=purl_to_cd_coordinates(purl)
)
clearlydefined_data[purl] = {"license": cd_license, "copyright": cd_copyright}

# Loop all contained components, and collect updates
# Now, update the components with the fetched ClearlyDefined data
for component in sbom.get("components", []):
_enrich_component_with_cd_data(component)
_enrich_component_with_cd_data(component, clearlydefined_data)

# Update SBOM metadata
sbom = _update_sbom_metadata(sbom)
Expand Down
9 changes: 7 additions & 2 deletions complassist/_sbom_generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import docker
from docker.errors import APIError, ContainerError, DockerException, ImageNotFound

from ._helpers import print_json_file


def _sanitize_container_name(name: str) -> str:
"""
Expand Down Expand Up @@ -143,8 +145,11 @@ def generate_cdx_sbom(directory: str, output: str = "") -> str:
with NamedTemporaryFile() as tmpfile:
_run_cdxgen(dclient, directory, cont_name, tmpfile.name)

# Copy to final destination with user permissions
copy2(tmpfile.name, output)
# Copy to final destination with user permissions, or print file if requested
if output == "-":
print_json_file(tmpfile.name)
else:
copy2(tmpfile.name, output)

logging.info("SBOM has been saved to %s", output)

Expand Down
4 changes: 3 additions & 1 deletion complassist/_sbom_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,12 @@ def licenses_short_to_string(licenses: list) -> str:
return ""


def spdx_expression_to_cdx_licenses(spdx_expression: str) -> list:
def spdx_expression_to_cdx_licenses(spdx_expression: str | None) -> list:
"""
Convert a SPDX expression to a valid CycloneDX licenses item
"""
if spdx_expression is None:
return [{"expression": spdx_expression}]
return [{"expression": spdx_expression}]


Expand Down
Loading

0 comments on commit b272c1c

Please sign in to comment.