diff --git a/.gitignore b/.gitignore index 8aa50e1..b3ff707 100644 --- a/.gitignore +++ b/.gitignore @@ -28,10 +28,7 @@ htmlcov/ coverage.xml # preliminary -sw360/ cli/ -xxsw360/ -xxcli/ Releases/ # internal stuff diff --git a/ChangeLog.md b/ChangeLog.md index b0dc93d..4a8ea92 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -5,6 +5,13 @@ # CaPyCli - Clearing Automation Python Command Line Tool for SW360 +## 2.2.0 (2024-01-28) + +* `getdependencies javascript` can now handle package-lock.json files of version 3. +* `bom findsources` can do source URL discovery using sw360 lookup, perform extensive + GitLab deep search, and adapt search strategy based on diverse programming languages. +* Have type support. + ## 2.1.0 (2023-12-16) * Be more resilient about missing metadata in CycloneDX SBOMs. diff --git a/RunChecks.ps1 b/RunChecks.ps1 index 7832219..e28e2d1 100644 --- a/RunChecks.ps1 +++ b/RunChecks.ps1 @@ -13,7 +13,12 @@ Write-Host "markdownlint ..." npx -q markdownlint-cli *.md --disable MD041 Write-Host "isort ..." -isort . +poetry run isort . + +Write-Host "mypy ..." +poetry run mypy . + +Write-Host "Done." # ----------------------------------- # ----------------------------------- diff --git a/capycli/bom/bom_convert.py b/capycli/bom/bom_convert.py index 8938e4e..fdc4452 100644 --- a/capycli/bom/bom_convert.py +++ b/capycli/bom/bom_convert.py @@ -13,6 +13,8 @@ from enum import Enum from typing import Any +from sortedcontainers import SortedSet + import capycli.common.json_support import capycli.common.script_base from capycli import get_logger @@ -58,15 +60,15 @@ def convert(self, # default is CaPyCLI outputformat = BomFormat.CAPYCLI - cdx_components = [] + cdx_components: SortedSet project = None sbom = None try: if inputformat == BomFormat.TEXT: - cdx_components = PlainTextSupport.flatlist_to_cdx_components(inputfile) + cdx_components = SortedSet(PlainTextSupport.flatlist_to_cdx_components(inputfile)) print_text(f" {len(cdx_components)} components read from file {inputfile}") elif inputformat == BomFormat.CSV: - cdx_components = CsvSupport.csv_to_cdx_components(inputfile) + cdx_components = SortedSet(CsvSupport.csv_to_cdx_components(inputfile)) print_text(f" {len(cdx_components)} components read from file {inputfile}") elif (inputformat == BomFormat.CAPYCLI) or (inputformat == BomFormat.SBOM): sbom = CaPyCliBom.read_sbom(inputfile) @@ -74,7 +76,7 @@ def convert(self, project = sbom.metadata.component print_text(f" {len(cdx_components)} components read from file {inputfile}") elif inputformat == BomFormat.LEGACY: - cdx_components = LegacySupport.legacy_to_cdx_components(inputfile) + cdx_components = SortedSet(LegacySupport.legacy_to_cdx_components(inputfile)) print_text(f" {len(cdx_components)} components read from file {inputfile}") elif inputformat == BomFormat.LEGACY_CX: sbom = LegacyCx.read_sbom(inputfile) @@ -89,7 +91,7 @@ def convert(self, try: if outputformat == BomFormat.TEXT: - PlainTextSupport.write_cdx_components_as_flatlist(cdx_components, outputfile) + PlainTextSupport.write_cdx_components_as_flatlist2(cdx_components, outputfile) print_text(f" {len(cdx_components)} components written to file {outputfile}") elif outputformat == BomFormat.CSV: CsvSupport.write_cdx_components_as_csv(cdx_components, outputfile) @@ -147,7 +149,7 @@ def display_help(self) -> None: print(" -if INPUTFORMAT Specify input file format: capycli|sbom|text|csv|legacy|legacy-cx") print(" -of OUTPUTFORMAT Specify output file format: capycli|text|csv|legacy|html") - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" print("\n" + capycli.APP_NAME + ", " + capycli.get_app_version() + " - Convert SBOM formats\n") diff --git a/capycli/bom/check_bom.py b/capycli/bom/check_bom.py index c3e4344..bd2fa8f 100644 --- a/capycli/bom/check_bom.py +++ b/capycli/bom/check_bom.py @@ -1,5 +1,5 @@ # ------------------------------------------------------------------------------- -# Copyright (c) 2019-23 Siemens +# Copyright (c) 2019-2024 Siemens # All Rights Reserved. # Author: thomas.graf@siemens.com # @@ -9,12 +9,13 @@ import logging import os import sys +from typing import Any, Dict, Optional import requests -import sw360.sw360_api from colorama import Fore, Style from cyclonedx.model.bom import Bom from cyclonedx.model.component import Component +from sw360 import SW360Error import capycli.common.script_base from capycli.common.capycli_bom_support import CaPyCliBom, CycloneDxSupport @@ -39,33 +40,46 @@ def _bom_has_items_without_id(self, bom: Bom) -> bool: return False - def _find_by_id(self, component: Component) -> dict: + def _find_by_id(self, component: Component) -> Optional[Dict[str, Any]]: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + sw360id = CycloneDxSupport.get_property_value(component, CycloneDxSupport.CDX_PROP_SW360ID) + version = component.version or "" for step in range(3): try: release_details = self.client.get_release(sw360id) return release_details - except sw360.sw360_api.SW360Error as swex: - if swex.response.status_code == requests.codes['not_found']: + except SW360Error as swex: + if swex.response is None: + print_red(" Unknown error: " + swex.message) + elif swex.response.status_code == requests.codes['not_found']: print_yellow( " Not found " + component.name + - ", " + component.version + ", " + sw360id) + ", " + version + ", " + sw360id) break # only report other errors if this is the third attempt if step >= 2: print(Fore.LIGHTRED_EX + " Error retrieving release data: ") print( - " " + component.name + ", " + component.version + + " " + component.name + ", " + version + ", " + sw360id) - print(" Status Code: " + str(swex.response.status_code)) + if swex.response: + print(" Status Code: " + str(swex.response.status_code)) if swex.message: print(" Message: " + swex.message) print(Style.RESET_ALL) return None - def _find_by_name(self, component: Component) -> dict: + def _find_by_name(self, component: Component) -> Optional[Dict[str, Any]]: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + + version = component.version or "" for step in range(3): try: releases = self.client.get_releases_by_name(component.name) @@ -73,23 +87,26 @@ def _find_by_name(self, component: Component) -> dict: return None for r in releases: - if r.get("version", "") == component.version: + if r.get("version", "") == version: return r return None - except sw360.sw360_api.SW360Error as swex: - if swex.response.status_code == requests.codes['not_found']: + except SW360Error as swex: + if swex.response is None: + print_red(" Unknown error: " + swex.message) + elif swex.response.status_code == requests.codes['not_found']: print_yellow( " Not found " + component.name + - ", " + component.version) + ", " + version) break # only report other errors if this is the third attempt if step >= 2: print(Fore.LIGHTRED_EX + " Error retrieving release data: ") print( - " " + component.name + ", " + component.version) - print(" Status Code: " + str(swex.response.status_code)) + " " + component.name + ", " + version) + if swex.response: + print(" Status Code: " + str(swex.response.status_code)) if swex.message: print(" Message: " + swex.message) print(Style.RESET_ALL) @@ -99,6 +116,10 @@ def _find_by_name(self, component: Component) -> dict: def check_releases(self, bom: Bom) -> int: """Checks for each release in the list whether it can be found on the specified SW360 instance.""" + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + found_count = 0 for component in bom.components: release_details = None @@ -116,7 +137,7 @@ def check_releases(self, bom: Bom) -> int: found_count += 1 continue - if not id: + if not sw360id: print_yellow( " " + component.name + ", " + component.version + @@ -125,7 +146,7 @@ def check_releases(self, bom: Bom) -> int: return found_count - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/bom/check_bom_item_status.py b/capycli/bom/check_bom_item_status.py index ecf9c14..44ae9f1 100644 --- a/capycli/bom/check_bom_item_status.py +++ b/capycli/bom/check_bom_item_status.py @@ -1,5 +1,5 @@ # ------------------------------------------------------------------------------- -# Copyright (c) 2019-23 Siemens +# Copyright (c) 2019-2024 Siemens # All Rights Reserved. # Author: thomas.graf@siemens.com # @@ -12,10 +12,10 @@ from typing import Any, Dict, Optional import requests -import sw360.sw360_api from colorama import Fore, Style from cyclonedx.model.bom import Bom from cyclonedx.model.component import Component +from sw360 import SW360Error import capycli.common.script_base from capycli.common.capycli_bom_support import CaPyCliBom, CycloneDxSupport @@ -39,20 +39,26 @@ def _bom_has_items_without_id(self, bom: Bom) -> bool: return False def _find_by_id(self, component: Component) -> Optional[Dict[str, Any]]: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + sw360id = CycloneDxSupport.get_property_value(component, CycloneDxSupport.CDX_PROP_SW360ID) + version = component.version or "" try: release_details = self.client.get_release(sw360id) return release_details - except sw360.sw360_api.SW360Error as swex: - if swex.response.status_code == requests.codes['not_found']: + except SW360Error as swex: + if swex.response is None: + print_red(" Unknown error: " + swex.message) + elif swex.response.status_code == requests.codes['not_found']: print( Fore.LIGHTYELLOW_EX + " Not found " + component.name + - ", " + component.version + ", " + - sw360id + Style.RESET_ALL) + ", " + version + ", " + sw360id + Style.RESET_ALL) else: print(Fore.LIGHTRED_EX + " Error retrieving release data: ") print( - " " + str(component.name) + ", " + str(component.version) + + " " + component.name + ", " + version + ", " + sw360id) print(" Status Code: " + str(swex.response.status_code)) if swex.message: @@ -62,25 +68,32 @@ def _find_by_id(self, component: Component) -> Optional[Dict[str, Any]]: return None def _find_by_name(self, component: Component) -> Optional[Dict[str, Any]]: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + + version = component.version or "" try: releases = self.client.get_releases_by_name(component.name) if not releases: return None for r in releases: - if r.get("version", "") == component.version: + if r.get("version", "") == version: return self.client.get_release_by_url(r["_links"]["self"]["href"]) return None - except sw360.sw360_api.SW360Error as swex: - if swex.response.status_code == requests.codes['not_found']: + except SW360Error as swex: + if swex.response is None: + print_red(" Unknown error: " + swex.message) + elif swex.response.status_code == requests.codes['not_found']: print( Fore.LIGHTYELLOW_EX + " Not found " + component.name + - ", " + component.version + ", " + + ", " + version + ", " + Style.RESET_ALL) else: print(Fore.LIGHTRED_EX + " Error retrieving release data: ") - print(" " + str(component.name) + ", " + str(component.version)) + print(" " + str(component.name) + ", " + str(version)) print(" Status Code: " + str(swex.response.status_code)) if swex.message: print(" Message: " + swex.message) @@ -89,6 +102,10 @@ def _find_by_name(self, component: Component) -> Optional[Dict[str, Any]]: return None def show_bom_item_status(self, bom: Bom, all: bool = False) -> None: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + for component in bom.components: release = None id = CycloneDxSupport.get_property_value(component, CycloneDxSupport.CDX_PROP_SW360ID) @@ -117,6 +134,9 @@ def show_bom_item_status(self, bom: Bom, all: bool = False) -> None: release["_links"]["sw360:component"]["href"] ) ) + if not comp_sw360: + print_red("Error accessing component") + continue rel_list = comp_sw360["_embedded"]["sw360:releases"] print(" " + component.name + ", " + component.version + " => ", end="", flush=True) @@ -124,6 +144,9 @@ def show_bom_item_status(self, bom: Bom, all: bool = False) -> None: for orel in rel_list: href = orel["_links"]["self"]["href"] rel = self.client.get_release_by_url(href) + if not rel: + print_red("Error accessing release " + href) + continue cs = rel.get("clearingState", "(unkown clearing state)") if cs == "APPROVED": print(Fore.LIGHTGREEN_EX, end="", flush=True) @@ -141,7 +164,7 @@ def show_bom_item_status(self, bom: Bom, all: bool = False) -> None: " => --- no id ---") continue - def run(self, args) -> None: + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/bom/check_granularity.py b/capycli/bom/check_granularity.py index 4a13092..a12de20 100644 --- a/capycli/bom/check_granularity.py +++ b/capycli/bom/check_granularity.py @@ -10,11 +10,10 @@ import importlib.resources as pkg_resources except ImportError: # Try backported to PY<37 `importlib_resources`. - import importlib_resources as pkg_resources - + import importlib_resources as pkg_resources # type: ignore import os import sys -from typing import List +from typing import Any, List, Optional import requests from cyclonedx.model import ExternalReferenceType @@ -35,22 +34,22 @@ class PotentialGranularityIssue: """Class to hold potential granularity issues.""" - def __init__(self, component, replacement, comment="", source_url=""): - self.component = component - self.replacement = replacement - self.comment = comment - self.source_url = source_url + def __init__(self, component: str, replacement: str, comment: str = "", source_url: str = "") -> None: + self.component: str = component + self.replacement: str = replacement + self.comment: str = comment + self.source_url: str = source_url class CheckGranularity(capycli.common.script_base.ScriptBase): """ Check the granularity of all releases in the SBOM. """ - def __init__(self): - self.granularity_list = [] + def __init__(self) -> None: + self.granularity_list: List[PotentialGranularityIssue] = [] @staticmethod - def get_granularity_list(download_url): + def get_granularity_list(download_url: str) -> None: '''This will only download granularity file from a public repository. Make sure to give the raw version of the granularity file seperated by ;''' response = requests.get(download_url) @@ -58,7 +57,7 @@ def get_granularity_list(download_url): with open('granularity_list.csv', 'wb') as f1: f1.write(response.content) - def read_granularity_list(self, download_url=None, local_read_granularity=None) -> None: + def read_granularity_list(self, download_url: str = "", local_read_granularity: bool = False) -> None: """Reads the granularity list from file.""" self.granularity_list = [] text_list = "" @@ -117,7 +116,7 @@ def read_granularity_list(self, download_url=None, local_read_granularity=None) issue = PotentialGranularityIssue(component, replacement, comment, source_url) self.granularity_list.append(issue) - def find_match(self, name: str) -> PotentialGranularityIssue or None: + def find_match(self, name: str) -> Optional[PotentialGranularityIssue]: """Finds a match by component name.""" for match in self.granularity_list: if match.component.lower() == name.lower(): @@ -133,24 +132,24 @@ def get_new_fixed_component(self, component: Component, new_name: str, new_src_u language_bak = CycloneDxSupport.get_property(component, CycloneDxSupport.CDX_PROP_LANGUAGE) # build new package-url - purl = "" + purl: PackageURL if component.purl: - old_purl = PackageURL.from_string(component.purl) - purl = PackageURL(old_purl.type, old_purl.namespace, new_name, component.version).to_string() + old_purl = component.purl + purl = PackageURL(old_purl.type, old_purl.namespace, new_name, component.version) if self.search_meta_data: if str(component.purl).startswith("pkg:npm"): GetJavascriptDependencies().try_find_component_metadata(component, "") else: LOG.warning(" No package-url available - creating default purl") - purl = PackageURL("generic", "", new_name, component.version).to_string() + purl = PackageURL("generic", "", new_name, component.version) # create new component (this is the only way to set a new bom_ref) component_new = Component( name=new_name, version=component.version, purl=purl, - bom_ref=purl + bom_ref=purl.to_string() ) # restore properties we can keep @@ -173,7 +172,7 @@ def get_new_fixed_component(self, component: Component, new_name: str, new_src_u def merge_duplicates(self, clist: List[Component]) -> List[Component]: """Checks for each release if there are duplicates after granularity check.""" - new_list = [] + new_list: List[Component] = [] for release in clist: count = len([item for item in new_list if item.name == release.name and item.version == release.version]) @@ -187,7 +186,7 @@ def merge_duplicates(self, clist: List[Component]) -> List[Component]: return new_list - def check_bom_items(self, sbom: Bom): + def check_bom_items(self, sbom: Bom) -> Bom: """Checks for each release in the list whether it can be found on the specified SW360 instance.""" @@ -214,7 +213,7 @@ def check_bom_items(self, sbom: Bom): sbom.components = SortedSet(reduced) return sbom - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG @@ -255,6 +254,8 @@ def run(self, args): print_text("\nLoading SBOM file", args.inputfile) try: sbom = CaPyCliBom.read_sbom(args.inputfile) + for c in sbom.components: + print(c) except Exception as ex: print_red("Error reading SBOM: " + repr(ex)) sys.exit(ResultCode.RESULT_ERROR_READING_BOM) diff --git a/capycli/bom/create_components.py b/capycli/bom/create_components.py index 8df4ac9..8d74f8a 100644 --- a/capycli/bom/create_components.py +++ b/capycli/bom/create_components.py @@ -1,5 +1,5 @@ # ------------------------------------------------------------------------------- -# Copyright (c) 2019-23 Siemens +# Copyright (c) 2019-2024 Siemens # All Rights Reserved. # Author: thomas.graf@siemens.com # @@ -11,15 +11,15 @@ import re import sys import tempfile -from typing import Any, Dict +from typing import Any, Dict, List, Optional from urllib.parse import urlparse import packageurl import requests -import sw360.sw360_api from colorama import Fore, Style from cyclonedx.model.bom import Bom from cyclonedx.model.component import Component +from sw360 import SW360Error import capycli.common.json_support import capycli.common.script_base @@ -54,13 +54,14 @@ class BomCreateComponents(capycli.common.script_base.ScriptBase): " ignore prefixes like \"2:\" (epoch) and suffixes like \".debian\"", ] - def __init__(self, onlyCreateReleases=False): - self.source_folder = None - self.download = False - self.relaxed_debian_parsing = False - self.onlyCreateReleases = onlyCreateReleases + def __init__(self, onlyCreateReleases: bool = False) -> None: + self.source_folder: str = "" + self.download: bool = False + self.relaxed_debian_parsing: bool = False + self.onlyCreateReleases: bool = onlyCreateReleases - def upload_source_file(self, release_id, sourcefile, filetype="SOURCE", comment=""): + def upload_source_file(self, release_id: str, sourcefile: str, + filetype: str = "SOURCE", comment: str = "") -> None: """Upload source code attachment @params: @@ -70,14 +71,20 @@ def upload_source_file(self, release_id, sourcefile, filetype="SOURCE", comment= comment - upload comment for SW360 attachment """ print_text(" Uploading source file: " + sourcefile) + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + try: self.client.upload_release_attachment( release_id, sourcefile, upload_type=filetype, upload_comment=comment) - except sw360.sw360_api.SW360Error as swex: + except SW360Error as swex: errortext = " Error uploading source file: " + self.get_error_message(swex) print(Fore.LIGHTRED_EX + errortext + Style.RESET_ALL) - def upload_file_from_url(self, release_id, url, filename, filetype="SOURCE", comment="", attached_filenames=[]): + def upload_file_from_url(self, release_id: str, url: Optional[str], filename: str, + filetype: str = "SOURCE", comment: str = "", + attached_filenames: List[str] = []) -> None: """Download a file from a URL if it's not available locally and upload the file as attachment to SW360. @@ -111,6 +118,10 @@ def upload_file_from_url(self, release_id, url, filename, filetype="SOURCE", com fullpath = os.path.join(tmpfolder.name, filename) try: + if not url: + print_red(" No url specified!") + return + response = requests.get(url, allow_redirects=True) if (response.status_code == requests.codes["ok"]): print_text(" Writing file", fullpath) @@ -118,7 +129,7 @@ def upload_file_from_url(self, release_id, url, filename, filetype="SOURCE", com open(fullpath, "wb").write(response.content) if response.headers.__contains__("content-disposition"): header = response.headers.get("content-disposition") - if header.__contains__("filename="): + if header and header.__contains__("filename="): print_text(" Found header:", header) newfilename = header.split("=")[-1] newfilename = newfilename.strip('"') @@ -152,7 +163,7 @@ def upload_file_from_url(self, release_id, url, filename, filetype="SOURCE", com if tmpfolder: tmpfolder.cleanup() - def prepare_release_data(self, cx_comp: Component) -> dict: + def prepare_release_data(self, cx_comp: Component) -> Dict[str, Any]: """Create release data structure as expected by SW360 REST API :param item: a single bill of materials item - a release @@ -160,16 +171,16 @@ def prepare_release_data(self, cx_comp: Component) -> dict: :return: the release :rtype: release (dictionary) """ - data = {} + data: Dict[str, Any] = {} data["name"] = cx_comp.name - data["version"] = cx_comp.version + data["version"] = cx_comp.version or "" # mandatory properties - src_url = CycloneDxSupport.get_ext_ref_source_url(cx_comp) + src_url = str(CycloneDxSupport.get_ext_ref_source_url(cx_comp)) if src_url: data["sourceCodeDownloadurl"] = src_url - bin_url = CycloneDxSupport.get_ext_ref_binary_url(cx_comp) + bin_url = str(CycloneDxSupport.get_ext_ref_binary_url(cx_comp)) if bin_url: data["binaryDownloadurl"] = bin_url @@ -177,7 +188,7 @@ def prepare_release_data(self, cx_comp: Component) -> dict: if cx_comp.purl: data["externalIds"] = {} # ensure that we have the only correct external-id name: package-url - data["externalIds"]["package-url"] = cx_comp.purl + data["externalIds"]["package-url"] = cx_comp.purl.to_string() # use project site as fallback for source code download url website = CycloneDxSupport.get_ext_ref_website(cx_comp) @@ -185,10 +196,10 @@ def prepare_release_data(self, cx_comp: Component) -> dict: if not src_url: if repo: print(" Using repository for source code download URL...") - data["sourceCodeDownloadurl"] = repo + data["sourceCodeDownloadurl"] = str(repo) elif website: print(" Using website for source code download URL...") - data["sourceCodeDownloadurl"] = website + data["sourceCodeDownloadurl"] = str(website) language = CycloneDxSupport.get_property_value(cx_comp, CycloneDxSupport.CDX_PROP_LANGUAGE) if language: @@ -197,7 +208,7 @@ def prepare_release_data(self, cx_comp: Component) -> dict: return data - def prepare_component_data(self, cx_comp: Component) -> dict: + def prepare_component_data(self, cx_comp: Component) -> Dict[str, Any]: """Create component data structure as expected by SW360 REST API :param item: single bill of materials item - a release @@ -205,7 +216,7 @@ def prepare_component_data(self, cx_comp: Component) -> dict: :return: the release structure :rtype: release (dictionary) """ - data = {} + data: Dict[str, Any] = {} data["description"] = "n/a" if cx_comp.description: data["description"] = cx_comp.description @@ -213,11 +224,12 @@ def prepare_component_data(self, cx_comp: Component) -> dict: language = CycloneDxSupport.get_property_value(cx_comp, CycloneDxSupport.CDX_PROP_LANGUAGE) if language: - data["languages"] = [] - data["languages"].append(language) + languages: List[str] = [] + languages.append(language) + data["languages"] = languages # optional properties - categories = [] + categories: List[str] = [] cat = CycloneDxSupport.get_property_value(cx_comp, CycloneDxSupport.CDX_PROP_CATEGORIES) if cat: categories.append(cat) @@ -230,14 +242,14 @@ def prepare_component_data(self, cx_comp: Component) -> dict: data["homepage"] = "n/a" website = CycloneDxSupport.get_ext_ref_website(cx_comp) if website: - data["homepage"] = website + data["homepage"] = str(website) if cx_comp.purl: purl = PurlUtils.component_purl_from_release_purl(cx_comp.purl) data["externalIds"] = {"package-url": purl} return data - def create_release(self, cx_comp: Component, component_id) -> dict: + def create_release(self, cx_comp: Component, component_id: str) -> Optional[Dict[str, Any]]: """Create a new release on SW360 :param item: a single bill of materials item - a release @@ -247,20 +259,24 @@ def create_release(self, cx_comp: Component, component_id) -> dict: :return: the release :rtype: release (dictionary) """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + data = self.prepare_release_data(cx_comp) # ensure that the release mainline state is properly set data["mainlineState"] = "OPEN" try: release_new = self.client.create_new_release( - cx_comp.name, cx_comp.version, + cx_comp.name, cx_comp.version or "", component_id, release_details=data) - except sw360.sw360_api.SW360Error as swex: + except SW360Error as swex: errortext = " Error creating component: " + self.get_error_message(swex) print_red(errortext) sys.exit(ResultCode.RESULT_ERROR_CREATING_COMPONENT) return release_new - def update_release(self, cx_comp: Component, release_data: Dict[str, Any]): + def update_release(self, cx_comp: Component, release_data: Dict[str, Any]) -> None: """Update an existing release on SW360 :param item: a single bill of materials item - a release @@ -268,6 +284,10 @@ def update_release(self, cx_comp: Component, release_data: Dict[str, Any]): :param release_data: SW360 release data :type release_data: release (dictionary) """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + release_id = self.get_sw360_id(release_data) data = self.prepare_release_data(cx_comp) @@ -327,29 +347,32 @@ def update_release(self, cx_comp: Component, release_data: Dict[str, Any]): self.upload_file(cx_comp, release_data, release_id, filetype, file_comment) def upload_file( - self, cx_comp: Component, release_data: dict, - release_id: str, filetype: str, comment: str): + self, cx_comp: Component, release_data: Dict[str, Any], + release_id: str, filetype: str, comment: str) -> None: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) url = None filename = None filehash = None if filetype in ["SOURCE", "SOURCE_SELF"]: - url = CycloneDxSupport.get_ext_ref_source_url(cx_comp) - filename = CycloneDxSupport.get_ext_ref_source_file(cx_comp) - filehash = CycloneDxSupport.get_source_file_hash(cx_comp) + url = str(CycloneDxSupport.get_ext_ref_source_url(cx_comp)) + filename = str(CycloneDxSupport.get_ext_ref_source_file(cx_comp)) + filehash = str(CycloneDxSupport.get_source_file_hash(cx_comp)) if filetype in ["BINARY", "BINARY_SELF"]: - url = CycloneDxSupport.get_ext_ref_binary_url(cx_comp) - filename = CycloneDxSupport.get_ext_ref_binary_file(cx_comp) - filehash = CycloneDxSupport.get_binary_file_hash(cx_comp) + url = str(CycloneDxSupport.get_ext_ref_binary_url(cx_comp)) + filename = str(CycloneDxSupport.get_ext_ref_binary_file(cx_comp)) + filehash = str(CycloneDxSupport.get_binary_file_hash(cx_comp)) # Note that we retreive the SHA1 has from the CycloneDX data. # But there is no guarantee that this *IS* really a SHA1 hash! - if (filename is None or filename == '') and url: - filename = urlparse(url) - if filename: - filename = os.path.basename(filename.path) + if (filename is None or filename == "") and url: + filename_parsed = urlparse(url) + if filename_parsed: + filename = os.path.basename(filename_parsed.path) if not filename: print_red(" Unable to identify filename from url!") @@ -365,7 +388,7 @@ def upload_file( for attachment in release_data.get("_embedded", {}).get("sw360:attachments", []): if attachment["attachmentType"].startswith(filetype_pattern): at_info = self.client.get_attachment_by_url(attachment['_links']['self']['href']) - if at_info.get("checkStatus", "") == "REJECTED": + if at_info and at_info.get("checkStatus", "") == "REJECTED": continue source_attachment_exists = True attached_filenames.append(attachment["filename"]) @@ -383,7 +406,7 @@ def upload_file( if not source_attachment_exists: self.upload_file_from_url(release_id, url, filename, filetype, comment, attached_filenames) - def search_for_release(self, component, cx_comp: Component): + def search_for_release(self, component: Dict[str, Any], cx_comp: Component) -> Optional[Dict[str, Any]]: """Checks whether the given component already contains the requested release @@ -400,6 +423,10 @@ def search_for_release(self, component, cx_comp: Component): if "sw360:releases" not in component["_embedded"]: return None + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + for comprel in component["_embedded"]["sw360:releases"]: if comprel.get("version", None) == cx_comp.version: return self.client.get_release_by_url(comprel["_links"]["self"]["href"]) @@ -408,6 +435,8 @@ def search_for_release(self, component, cx_comp: Component): # if there's no exact match, try relaxed search for comprel in component["_embedded"]["sw360:releases"]: # "2:5.2.1-1.debian" -> "5.2.1-1" + if not cx_comp.version: + continue bom_pattern = re.sub("^[0-9]+:", "", cx_comp.version) bom_pattern = re.sub(r"[\. \(]*[dD]ebian[ \)]*$", "", bom_pattern) sw360_pattern = re.sub("^[0-9]+:", "", comprel.get("version", "")) @@ -430,25 +459,34 @@ def get_component(self, cx_comp: Component) -> str: :param item: BOM item :return: id or None """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + component = CycloneDxSupport.get_property_value(cx_comp, CycloneDxSupport.CDX_PROP_COMPONENT_ID) if not component: if self.onlyCreateReleases: print_yellow(" No component id in bom, skipping due to createreleases mode!") - return None + return "" components = self.client.get_component_by_name(cx_comp.name) - if not component and components["_embedded"]["sw360:components"]: - for compref in components["_embedded"]["sw360:components"]: - if compref["name"].lower() != cx_comp.name.lower(): - continue - else: - component = self.get_sw360_id(compref) - break + if components: + if not component and components["_embedded"]["sw360:components"]: + for compref in components["_embedded"]["sw360:components"]: + if compref["name"].lower() != cx_comp.name.lower(): + continue + else: + component = self.get_sw360_id(compref) + break return component - def create_component(self, cx_comp: Component) -> dict: + def create_component(self, cx_comp: Component) -> Optional[Dict[str, Any]]: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + data = self.prepare_component_data(cx_comp) try: component_new = self.client.create_new_component( @@ -459,13 +497,12 @@ def create_component(self, cx_comp: Component) -> dict: component_details=data) print_yellow(" Component created") return component_new - except sw360.sw360_api.SW360Error as swex: + except SW360Error as swex: errortext = " Error creating component: " + self.get_error_message(swex) print_red(errortext) sys.exit(ResultCode.RESULT_ERROR_CREATING_COMPONENT) - return None - def update_component(self, cx_comp: Component, component_id, component_data): + def update_component(self, cx_comp: Component, component_id: str, component_data: Dict[str, Any]) -> None: """Update an existing component on SW360 :param item: a single bill of materials item - a component @@ -475,6 +512,10 @@ def update_component(self, cx_comp: Component, component_id, component_data): :param component_data: SW360 component data :type component_data: component (dictionary) """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + purl = "" if cx_comp.purl: purl = PurlUtils.component_purl_from_release_purl(cx_comp.purl) @@ -498,15 +539,22 @@ def update_component(self, cx_comp: Component, component_id, component_data): component_data["externalIds"]["package-url"], "differs from BOM id", purl) - def get_sw360_id(self, sw360_object: dict): + def get_sw360_id(self, sw360_object: Dict[str, Any]) -> str: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + return self.client.get_id_from_href(sw360_object["_links"]["self"]["href"]) - def create_component_and_release(self, cx_comp: Component): + def create_component_and_release(self, cx_comp: Component) -> None: """Create new releases and if necessary also new components :param item: a single bill of materials item - a release :type item: dictionary """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) release = None @@ -517,8 +565,9 @@ def create_component_and_release(self, cx_comp: Component): # get full component info component = self.client.get_component(component_id) - self.update_component(cx_comp, component_id, component) - release = self.search_for_release(component, cx_comp) + if component: + self.update_component(cx_comp, component_id, component) + release = self.search_for_release(component, cx_comp) else: if self.onlyCreateReleases: print_red(" Component doesn't exist!") @@ -526,6 +575,10 @@ def create_component_and_release(self, cx_comp: Component): # create component component = self.create_component(cx_comp) + if not component: + print_red("Component creation failed!") + return + component_id = self.get_sw360_id(component) try: @@ -533,12 +586,16 @@ def create_component_and_release(self, cx_comp: Component): item_name = ScriptSupport.get_full_name_from_component(cx_comp) print_red(" " + item_name + " already exists") else: + if not component: + return + release = self.create_release( cx_comp, self.get_sw360_id(component)) print_text(" Release created") - self.update_release(cx_comp, release) - except sw360.sw360_api.SW360Error as swex: + if release: + self.update_release(cx_comp, release) + except SW360Error as swex: errortext = " Error creating release: " + self.get_error_message(swex) print_red(errortext) sys.exit(ResultCode.RESULT_ERROR_CREATING_RELEASE) @@ -554,6 +611,10 @@ def create_items(self, sbom: Bom) -> None: :param bom: the bill of materials :type bom: list of components """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + ok = True for cx_comp in sbom.components: @@ -561,7 +622,9 @@ def create_items(self, sbom: Bom) -> None: id = CycloneDxSupport.get_property_value(cx_comp, CycloneDxSupport.CDX_PROP_SW360ID) if id: print_text(" " + item_name + " already exists") - self.update_release(cx_comp, self.client.get_release(id)) + rel = self.client.get_release(id) + if rel: + self.update_release(cx_comp, rel) else: print_text(" " + item_name) self.create_component_and_release(cx_comp) @@ -578,7 +641,7 @@ def create_items(self, sbom: Bom) -> None: print_red("An error occured during component/release creation!") sys.exit(ResultCode.RESULT_ERROR_CREATING_ITEM) - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/bom/csv.py b/capycli/bom/csv.py index 6ca0855..baee478 100644 --- a/capycli/bom/csv.py +++ b/capycli/bom/csv.py @@ -9,6 +9,7 @@ from typing import List from cyclonedx.model.component import Component +from sortedcontainers import SortedSet from capycli import LOG @@ -56,7 +57,7 @@ def csv_to_cdx_components(cls, inputfile: str) -> List[Component]: return bom @classmethod - def write_cdx_components_as_csv(cls, bom: List[Component], outputfile: str) -> None: + def write_cdx_components_as_csv(cls, bom: SortedSet, outputfile: str) -> None: LOG.debug(f"Writing to file {outputfile}") with open(outputfile, "w", encoding="utf-8") as fout: for cx_comp in bom: diff --git a/capycli/bom/diff_bom.py b/capycli/bom/diff_bom.py index bfd4764..a5bba51 100644 --- a/capycli/bom/diff_bom.py +++ b/capycli/bom/diff_bom.py @@ -9,7 +9,7 @@ import os import sys from enum import Enum -from typing import Tuple +from typing import Any, Dict, List, Optional, Tuple from cyclonedx.model.bom import Bom from cyclonedx.model.component import Component @@ -49,11 +49,11 @@ class DiffType(str, Enum): class DiffBom(capycli.common.script_base.ScriptBase): """Compare two SBOM files """ - def __init__(self): - self.equal_bom = None - self.diff_bom = None + def __init__(self) -> None: + self.equal_bom: Bom + self.diff_bom: Bom - def find_in_bom(self, bom: Bom, component: Component) -> Component or None: + def find_in_bom(self, bom: Bom, component: Component) -> Optional[Component]: """Searches for an item with the given name and version in the given SBOM.""" for c in bom.components: if MergeBom.are_same(c, component): @@ -88,9 +88,9 @@ def compare_boms(self, bom_old: Bom, bom_new: Bom) -> Tuple[Bom, Bom]: return equal_bom, diff_bom - def compare_boms_with_updates(self, bom_old: Bom, bom_new: Bom) -> list: + def compare_boms_with_updates(self, bom_old: Bom, bom_new: Bom) -> List[Dict[str, Any]]: """Determine differences in the bills or materials.""" - result = [] + result: List[Dict[str, Any]] = [] for comp_old in bom_old.components: ritem = {} @@ -116,7 +116,7 @@ def compare_boms_with_updates(self, bom_old: Bom, bom_new: Bom) -> list: return self.check_for_updates(result) - def check_for_updates(self, result: list) -> list: + def check_for_updates(self, result: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Try to determine if the differences are updates of existing components.""" removeList = [] @@ -151,7 +151,7 @@ def check_for_updates(self, result: list) -> list: return result - def display_result(self, result: list, show_identical: bool): + def display_result(self, result: List[Dict[str, Any]], show_identical: bool) -> None: for item in result: if item["Result"] == DiffType.IDENTICAL: if show_identical: @@ -191,11 +191,11 @@ def display_result(self, result: list, show_identical: bool): str(item["Result"]) + ": " + item["Name"] + ", " + item["Version"]) - def write_result_to_json(self, filename: str, result: list): + def write_result_to_json(self, filename: str, result: List[Dict[str, Any]]) -> None: """Write comparison result to a JSON file.""" capycli.common.json_support.write_json_to_file(result, filename) - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/bom/download_sources.py b/capycli/bom/download_sources.py index 6a9bbd2..7a00258 100644 --- a/capycli/bom/download_sources.py +++ b/capycli/bom/download_sources.py @@ -12,11 +12,11 @@ import pathlib import re import sys -from typing import Optional, Tuple +from typing import Any, Optional, Tuple from urllib.parse import urlparse import requests -from cyclonedx.model import ExternalReference, ExternalReferenceType, HashAlgorithm, HashType +from cyclonedx.model import ExternalReference, ExternalReferenceType, HashAlgorithm, HashType, XsUri from cyclonedx.model.bom import Bom import capycli.common.json_support @@ -34,18 +34,18 @@ class BomDownloadSources(capycli.common.script_base.ScriptBase): Download source files from the URL specified in the SBOM. """ - def get_filename_from_cd(self, cd: str): + def get_filename_from_cd(self, cd: str) -> str: """ Get filename from content-disposition. """ if not cd: - return None + return "" fname = re.findall('filename=(.+)', cd) if len(fname) == 0: - return None + return "" return fname[0].rstrip('"').lstrip('"') - def download_source_file(self, url: str, source_folder: str) -> Optional[Tuple]: + def download_source_file(self, url: str, source_folder: str) -> Optional[Tuple[str, str]]: """Download a file from a URL. @params: @@ -56,11 +56,11 @@ def download_source_file(self, url: str, source_folder: str) -> Optional[Tuple]: try: response = requests.get(url, allow_redirects=True) - filename = self.get_filename_from_cd(response.headers.get('content-disposition')) + filename = self.get_filename_from_cd(response.headers.get("content-disposition", "")) if not filename: - filename = urlparse(url) - if filename: - filename = os.path.basename(filename.path) + filename_ps = urlparse(url) + if filename_ps: + filename = os.path.basename(filename_ps.path) elif not filename: print_red(" Unable to identify filename from url!") @@ -95,7 +95,7 @@ def download_sources(self, sbom: Bom, source_folder: str) -> None: source_url = CycloneDxSupport.get_ext_ref_source_url(component) if source_url: - result = self.download_source_file(source_url, source_folder) + result = self.download_source_file(source_url._uri, source_folder) else: result = None print_red(" No URL specified!") @@ -114,32 +114,35 @@ def download_sources(self, sbom: Bom, source_folder: str) -> None: ext_ref = ExternalReference( reference_type=ExternalReferenceType.DISTRIBUTION, comment=CaPyCliBom.SOURCE_FILE_COMMENT, - url=path) + url=XsUri(path)) new = True else: - ext_ref.url = path + ext_ref.url = XsUri(path) ext_ref.hashes.add(HashType( algorithm=HashAlgorithm.SHA_1, hash_value=sha1)) if new: component.external_references.add(ext_ref) - def update_local_path(self, sbom: Bom, bomfile: str): + def update_local_path(self, sbom: Bom, bomfile: str) -> None: bompath = pathlib.Path(bomfile).parent for component in sbom.components: ext_ref = CycloneDxSupport.get_ext_ref( component, ExternalReferenceType.DISTRIBUTION, CaPyCliBom.SOURCE_FILE_COMMENT) if ext_ref: try: - name = CycloneDxSupport.have_relative_ext_ref_path(ext_ref, bompath) + name = CycloneDxSupport.have_relative_ext_ref_path(ext_ref, bompath.as_posix()) CycloneDxSupport.update_or_set_property( component, CycloneDxSupport.CDX_PROP_FILENAME, name) except ValueError: - print_yellow(" SBOM file is not relative to source file " + ext_ref.url) + if type(ext_ref.url) is XsUri: + print_yellow(" SBOM file is not relative to source file " + ext_ref.url._uri) + else: + print_yellow(" SBOM file is not relative to source file " + str(ext_ref.url)) - def run(self, args): + def run(self, args: Any) -> None: """Main method @params: diff --git a/capycli/bom/filter_bom.py b/capycli/bom/filter_bom.py index 5fd6c25..bc705ad 100644 --- a/capycli/bom/filter_bom.py +++ b/capycli/bom/filter_bom.py @@ -9,7 +9,7 @@ import json import os import sys -from typing import Optional +from typing import Any, Dict, List, Optional from cyclonedx.model import ExternalReferenceType from cyclonedx.model.bom import Bom @@ -53,27 +53,27 @@ class FilterBom(capycli.common.script_base.ScriptBase): ] } """ - def __init__(self): + def __init__(self) -> None: self.verbose = False - def load_filter_file(self, filter_file: str) -> dict: + def load_filter_file(self, filter_file: str) -> Dict[str, Any]: """Load a single filter file - without any further processing""" f = open(filter_file, "r") filter = json.load(f) return filter - def append_components(self, clist: list, to_add_list: list): + def append_components(self, clist: List[Dict[str, Any]], to_add_list: List[Dict[str, Any]]) -> None: for to_add in to_add_list: clist.append(to_add) - def show_filter(self, filter: dict) -> None: + def show_filter(self, filter: Dict[str, Any]) -> None: for entry in filter["Components"]: comp = entry["component"] print_text( " ", comp.get("Name", ""), comp.get("Version", ""), comp.get("RepositoryId", ""), entry["Mode"]) - def find_bom_item(self, bom: Bom, filterentry: dict) -> Optional[Component]: + def find_bom_item(self, bom: Bom, filterentry: Dict[str, Any]) -> Optional[Component]: """Find an entry in list of bom items.""" for component in bom.components: if component.purl: @@ -92,11 +92,11 @@ def find_bom_item(self, bom: Bom, filterentry: dict) -> Optional[Component]: return None - def create_bom_item_from_filter_entry(self, filterentry: dict) -> Component: + def create_bom_item_from_filter_entry(self, filterentry: Dict[str, Any]) -> Component: comp = LegacySupport.legacy_component_to_cdx(filterentry) return comp - def update_bom_item_from_filter_entry(self, component: Component, filterentry: dict): + def update_bom_item_from_filter_entry(self, component: Component, filterentry: Dict[str, Any]) -> None: if filterentry["Name"]: component.name = filterentry["Name"] @@ -210,9 +210,9 @@ def filter_bom(self, bom: Bom, filter_file: str) -> Bom: if component.purl: if prefix: - match = component.purl.startswith(prefix) + match = component.purl.to_string().startswith(prefix) else: - match = component.purl == filterentry["component"]["RepositoryId"] + match = component.purl.to_string() == filterentry["component"]["RepositoryId"] if match: if filterentry["Mode"] == "remove": @@ -231,7 +231,7 @@ def filter_bom(self, bom: Bom, filter_file: str) -> Bom: if existing_entry: self.update_bom_item_from_filter_entry(existing_entry, filterentry["component"]) if self.verbose: - print_text(" Updated " + existing_entry.name + ", " + existing_entry.version) + print_text(" Updated " + existing_entry.name + ", " + (existing_entry.version or "")) else: if filterentry["component"].get("Name") is None: print_red("To be added dependency missing Name attribute in Filter file.") @@ -240,7 +240,7 @@ def filter_bom(self, bom: Bom, filter_file: str) -> Bom: bomitem = self.create_bom_item_from_filter_entry(filterentry["component"]) list_temp.append(bomitem) if self.verbose: - print_text(" Added " + bomitem.name + ", " + bomitem.version) + print_text(" Added " + bomitem.name + ", " + (bomitem.version or "")) filterentry["Processed"] = True @@ -260,7 +260,7 @@ def filter_bom(self, bom: Bom, filter_file: str) -> Bom: return bom - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/bom/findsources.py b/capycli/bom/findsources.py index b498337..10ec65c 100644 --- a/capycli/bom/findsources.py +++ b/capycli/bom/findsources.py @@ -11,7 +11,7 @@ import re import sys import time -from typing import Any +from typing import Any, Dict, List, Tuple import requests import semver @@ -36,13 +36,13 @@ class FindSources(capycli.common.script_base.ScriptBase): """Go through the list of SBOM items and try to determine the source code.""" - def __init__(self): - self.verbose = False + def __init__(self) -> None: + self.verbose: bool = False self.version_regex = re.compile(r"[\d+\.|_]+[\d+]") self.github_project_name_regex = re.compile(r"^[a-zA-Z0-9-]+(/[a-zA-Z0-9-]+)*$") - self.github_name = None - self.github_token = None - self.sw360_url = os.environ.get("SW360ServerUrl", None) + self.github_name: str = "" + self.github_token: str = "" + self.sw360_url: str = os.environ.get("SW360ServerUrl", "") def is_sourcefile_accessible(self, sourcefile_url: str) -> bool: """Check if the URL is accessible.""" @@ -70,7 +70,7 @@ def is_sourcefile_accessible(self, sourcefile_url: str) -> bool: return False @staticmethod - def github_request(url: str, username="", token=""): + def github_request(url: str, username: str = "", token: str = "") -> Any: try: headers = {} if token: @@ -96,10 +96,11 @@ def github_request(url: str, username="", token=""): Fore.LIGHTYELLOW_EX + " Error acccessing GitHub: " + repr(ex) + Style.RESET_ALL) - return None + + return {} @staticmethod - def get_repositories(name: str, language: str, username="", token=""): + def get_repositories(name: str, language: str, username: str = "", token: str = "") -> Any: """Query for GitHub repositories""" query = name + " language:" + language.lower() search_url = "https://api.github.com/search/repositories?q=" + query @@ -122,16 +123,27 @@ def get_repo_name(github_url: str) -> str: Fore.LIGHTYELLOW_EX + " Error getting repo name from: " + github_url + Style.RESET_ALL) - return None + return "" return repo_name + if not sys.version_info < (3, 10): + get_github_info_type = List[Dict[str, Any]] | Dict[str, Any] + else: + get_github_info_type = Any + @staticmethod - def get_github_info(repository_url: str, username="", token="") -> Any: - """Query tag infos from GitHub.""" + def get_github_info(repository_url: str, username: str = "", + token: str = "") -> get_github_info_type: + """ + Query tag infos from GitHub. + + In the good case a list of tags entries (= dictionaries) is returned. + In the bad case a JSON error message is returned. + """ length_per_page = 100 page = 1 - tags = [] + tags: List[Dict[str, Any]] = [] tag_url = "https://api.github.com/repos/" + repository_url + "/tags" query = "?per_page=%s&page=%s" % (length_per_page, page) tmp = FindSources.github_request(tag_url + query, username, token) @@ -160,7 +172,7 @@ def to_semver_string(self, version: str) -> str: return str(ver + ".0") return ver - def find_github_url(self, component: Component, use_language=True) -> str: + def find_github_url(self, component: Component, use_language: bool = True) -> str: """ Find github url for component""" if not component: return "" @@ -182,7 +194,7 @@ def find_github_url(self, component: Component, use_language=True) -> str: if len(name_match): for match in name_match: tag_info = self.github_request(match["tags_url"], self.github_name, self.github_token) - source_url = self.get_matching_tag(tag_info, component.version, match["html_url"]) + source_url = self.get_matching_tag(tag_info, component.version or "", match["html_url"]) if len(name_match) == 1: return source_url elif source_url: @@ -195,21 +207,26 @@ def get_pkg_go_repo_url(self, name: str) -> str: link_repo = repo_request_url try: pkg_go_page = requests.get(repo_request_url) + if not pkg_go_page: + return "" + soup = BeautifulSoup(pkg_go_page.text, 'html.parser') - link_repo = soup.find('div', class_='UnitMeta-repo').find('a').get("href") + link_repo = soup.find('div', class_='UnitMeta-repo').find('a').get("href") # type: ignore except Exception as ex: print( Fore.LIGHTYELLOW_EX + " Error trying to get repository url: " + repr(ex) + Style.RESET_ALL) + return link_repo def find_golang_url(self, component: Component) -> str: """ Find github url for component""" if not component: return "" - component_name = component.name - component_version = component.version + + component_name = component.name or "" + component_version = component.version or "" suffix = "+incompatible" if component_version.endswith(suffix): component_version = component_version[:-len(suffix)] @@ -244,9 +261,10 @@ def find_golang_url(self, component: Component) -> str: if repository_name.startswith("https://github.com/"): repository_name = repository_name[len("https://github.com/"):] - tag_info = self.get_github_info(repository_name, self.github_name, - self.github_token) - source_url = self.get_matching_tag(tag_info, component_version, repository_name, version_prefix) + tag_info = self.get_github_info(repository_name, self.github_name, self.github_token) + tag_info_checked = self.check_for_github_error(tag_info) + source_url = self.get_matching_tag(tag_info_checked, component_version, + repository_name, version_prefix or "") # component["RepositoryUrl"] = repository_name return source_url @@ -268,15 +286,13 @@ def get_github_source_url(self, github_url: str, version: str) -> str: print_text(" repo_name:", repo_name) tag_info = self.get_github_info(repo_name, self.github_name, self.github_token) - return self.get_matching_tag(tag_info, version, github_url) + tag_info_checked = self.check_for_github_error(tag_info) + return self.get_matching_tag(tag_info_checked, version, github_url) - def get_matching_tag(self, tag_info: list, version: str, github_url: str, version_prefix=None): - if not tag_info or (len(tag_info) == 0): - print( - Fore.LIGHTRED_EX + - " No tags info reply from GitHub! " + github_url + - Style.RESET_ALL) - return None + def check_for_github_error(self, tag_info: get_github_info_type) -> List[Dict[str, Any]]: + if isinstance(tag_info, list): + # assume valid answer + return tag_info # check for 'rate limit exceeded' message if "message" in tag_info: @@ -287,17 +303,26 @@ def get_matching_tag(self, tag_info: list, version: str, github_url: str, versio print_red("Invalid GitHub credential provided - aborting!") sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SERVICE) + return [] + + def get_matching_tag(self, tag_info: List[Dict[str, Any]], version: str, github_url: str, + version_prefix: str = "") -> str: + if not tag_info or (len(tag_info) == 0): + print( + Fore.LIGHTRED_EX + + " No tags info reply from GitHub! " + github_url + + Style.RESET_ALL) + return "" + # search for a tag matching our given version information matching_tag = None for tag in tag_info: - if isinstance(tag, str): - # this should be dictionary, if it is a string then - # something went wrong! - continue try: - if version_prefix and tag.get("name").rpartition("/")[0] != version_prefix: - continue + if version_prefix: + name = tag.get("name") + if name and name.rpartition("/")[0] != version_prefix: + continue version_diff = semver.VersionInfo.parse( self.to_semver_string(tag.get("name", None))).compare( @@ -323,7 +348,7 @@ def get_matching_tag(self, tag_info: list, version: str, github_url: str, versio # print("matching_tag", matching_tag) source_url = matching_tag.get("zipball_url", "") if source_url == "": - return None + return "" source_url = source_url.replace( "https://api.github.com/repos", "https://github.com").replace( "zipball/refs/tags", "archive/refs/tags") @@ -333,35 +358,52 @@ def get_matching_tag(self, tag_info: list, version: str, github_url: str, versio def get_source_url_from_release(self, release_id: str) -> str: """ get source code url from release """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + for x in range(5): try: # print(self.client) release_details = self.client.get_release(release_id) - source_url = release_details.get("sourceCodeDownloadurl", "") - if self.verbose: - print("getting source url from get from sw360 for release_id " + release_id) - if source_url != "": - return source_url - break + if release_details: + source_url = release_details.get("sourceCodeDownloadurl", "") + if self.verbose: + print("getting source url from get from sw360 for release_id " + release_id) + if source_url != "": + return source_url + break except SW360Error as ex: - if x < 4 and ex.response.status_code == requests.codes["bad_gateway"]: + if x < 4 and ex.response and ex.response.status_code == requests.codes["bad_gateway"]: time.sleep(5) else: raise ex - return None + return "" def get_release_component_id(self, release_id: str) -> str: """ get the component id of a release """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) release_details = self.client.get_release(release_id) + if not release_details: + return "" + return str(release_details["_links"]["sw360:component"]["href"]).split('/')[-1] def find_source_url_from_component(self, component_id: str) -> str: """ find source code url from component releases """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) component_details = self.client.get_component(component_id) - source_url = None + if not component_details: + return "" + + source_url = "" github = "github.com" if component_details.get("_embedded") and \ component_details["_embedded"].get("sw360:releases"): @@ -375,7 +417,7 @@ def find_source_url_from_component(self, component_id: str) -> str: print(f'{source_url} found over component_id {component_id}') if not source_url and "github.com" in component_details.get("homepage", ""): - source_url = component_details.get("homepage") + source_url = component_details.get("homepage") or "" if source_url and self.verbose: print(f'{source_url} found on github over component homepage') @@ -383,7 +425,7 @@ def find_source_url_from_component(self, component_id: str) -> str: def find_source_url_on_release(self, component: Component) -> str: """find the url from sourceCodeDownloadurl from the Id or Sw360Id""" - url = None + url = "" release_id = "" for val in component.properties: if val.name == "siemens:sw360Id": @@ -391,13 +433,14 @@ def find_source_url_on_release(self, component: Component) -> str: if release_id: # get the existing source_url for any kind of release. url = self.get_source_url_from_release(release_id) + return url def find_source_url_recursive_by_sw360(self, component: Component) -> str: """find the url via an other release of the parent component""" - url = None + url = "" found_by_component = False - version = component.version + version = component.version or "" release_id = "" component_id = "" for val in component.properties: @@ -428,7 +471,7 @@ def find_source_url_by_language(component: Component) -> str: capycli.dependencies.javascript.GetJavascriptDependencies().try_find_component_metadata(component, "") return CycloneDxSupport.get_ext_ref_source_url(component) - def find_sources(self, bom: Bom): + def find_sources(self, bom: Bom) -> Tuple[int, int]: """Go through the list of SBOM items and try to determine the source code.""" print_text("\nLooping through SBOM:") @@ -471,7 +514,7 @@ def find_sources(self, bom: Bom): if self.verbose: print_text(" Repository URL available:", repository_url) source_url = self.get_github_source_url( - repository_url, + str(repository_url), component.version) binary_url = CycloneDxSupport.get_ext_ref_binary_url(component) if binary_url and not source_url: @@ -485,14 +528,14 @@ def find_sources(self, bom: Bom): if self.verbose: print_text(" Project site URL available:", website) source_url = self.get_github_source_url( - website, + str(website), component.version) source_code_url = CycloneDxSupport.get_ext_ref_source_code_url(component) if source_code_url and not source_url: if self.verbose: print_text(" Repository URL available:", source_code_url) source_url = self.get_github_source_url( - source_code_url, + str(source_code_url), component.version) # look via the component @@ -529,7 +572,7 @@ def find_sources(self, bom: Bom): return (found_count, exist_count) - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/bom/handle_bom.py b/capycli/bom/handle_bom.py index c7ffc2e..6f7fb92 100644 --- a/capycli/bom/handle_bom.py +++ b/capycli/bom/handle_bom.py @@ -7,6 +7,7 @@ # ------------------------------------------------------------------------------- import sys +from typing import Any import capycli.bom.bom_convert import capycli.bom.check_bom @@ -24,7 +25,7 @@ from capycli.main.result_codes import ResultCode -def run_bom_command(args) -> None: +def run_bom_command(args: Any) -> None: command = args.command[0].lower() if command != "bom": return @@ -53,81 +54,81 @@ def run_bom_command(args) -> None: subcommand = args.command[1].lower() if subcommand == "show": """Print SBOM contents to stdout.""" - app = capycli.bom.show_bom.ShowBom() - app.run(args) + app1 = capycli.bom.show_bom.ShowBom() + app1.run(args) return if subcommand == "filter": """Apply a filter file to a SBOM.""" - app = capycli.bom.filter_bom.FilterBom() - app.run(args) + app2 = capycli.bom.filter_bom.FilterBom() + app2.run(args) return if subcommand == "check": """Check that all releases listed in the SBOM really exist on the given target SW360 instance.""" - app = capycli.bom.check_bom.CheckBom() - app.run(args) + app3 = capycli.bom.check_bom.CheckBom() + app3.run(args) return if subcommand == "checkitemstatus": """Show additional information about SBOM items on SW360.""" - app = capycli.bom.check_bom_item_status.CheckBomItemStatus() - app.run(args) + app4 = capycli.bom.check_bom_item_status.CheckBomItemStatus() + app4.run(args) return if subcommand == "map": """Map a given SBOM to data on SW360.""" - app = capycli.bom.map_bom.MapBom() - app.run(args) + app5 = capycli.bom.map_bom.MapBom() + app5.run(args) return if subcommand == "createreleases": """Create new releases on SW360 for existing components.""" - app = capycli.bom.create_components.BomCreateComponents(onlyCreateReleases=True) - app.run(args) + app6 = capycli.bom.create_components.BomCreateComponents(onlyCreateReleases=True) + app6.run(args) return if subcommand == "createcomponents": """Create new components and releases on SW360.""" - app = capycli.bom.create_components.BomCreateComponents() - app.run(args) + app7 = capycli.bom.create_components.BomCreateComponents() + app7.run(args) return if subcommand == "downloadsources": """Download source files from the URL specified in the SBOM.""" - app = capycli.bom.download_sources.BomDownloadSources() - app.run(args) + app8 = capycli.bom.download_sources.BomDownloadSources() + app8.run(args) return if subcommand == "granularity": """Check the granularity of the releases in the SBOM.""" - app = capycli.bom.check_granularity.CheckGranularity() - app.run(args) + app9 = capycli.bom.check_granularity.CheckGranularity() + app9.run(args) return if subcommand == "diff": """Compare two SBOM files.""" - app = capycli.bom.diff_bom.DiffBom() - app.run(args) + app10 = capycli.bom.diff_bom.DiffBom() + app10.run(args) return if subcommand == "merge": """Merge two SBOM files.""" - app = capycli.bom.merge_bom.MergeBom() - app.run(args) + app11 = capycli.bom.merge_bom.MergeBom() + app11.run(args) return if subcommand == "findsources": """Determine the source code for SBOM items.""" - app = capycli.bom.findsources.FindSources() - app.run(args) + app12 = capycli.bom.findsources.FindSources() + app12.run(args) return if subcommand == "convert": """Convert SBOM formats.""" - app = capycli.bom.bom_convert.BomConvert() - app.run(args) + app13 = capycli.bom.bom_convert.BomConvert() + app13.run(args) return print_red("Unknown sub-command: ") diff --git a/capycli/bom/html.py b/capycli/bom/html.py index c304675..4855a28 100644 --- a/capycli/bom/html.py +++ b/capycli/bom/html.py @@ -6,9 +6,10 @@ # SPDX-License-Identifier: MIT # ------------------------------------------------------------------------------- -from typing import List, Optional +from typing import Optional from cyclonedx.model.component import Component +from sortedcontainers import SortedSet from capycli import LOG from capycli.common.html_support import HtmlSupport @@ -19,7 +20,8 @@ class HtmlConversionSupport(): @classmethod def write_cdx_components_as_html( cls, - bom: List[Component], + # bom: List[Component], + bom: SortedSet, outputfile: str, project: Optional[Component]) -> None: myhtml = HtmlSupport() diff --git a/capycli/bom/legacy.py b/capycli/bom/legacy.py index 16cc0a7..fec8b81 100644 --- a/capycli/bom/legacy.py +++ b/capycli/bom/legacy.py @@ -8,9 +8,10 @@ from typing import Any, Dict, List -from cyclonedx.model import ExternalReference, ExternalReferenceType, HashAlgorithm, HashType, Property +from cyclonedx.model import ExternalReference, ExternalReferenceType, HashAlgorithm, HashType, Property, XsUri from cyclonedx.model.component import Component -from packageurl import PackageURL # type: ignore +from packageurl import PackageURL +from sortedcontainers import SortedSet from capycli import LOG from capycli.common import json_support @@ -49,7 +50,7 @@ class LegacySupport(): @staticmethod - def get_purl_from_name(item: Dict[str, Any]) -> Any: + def get_purl_from_name(item: Dict[str, Any]) -> PackageURL: """Builds/guesses a package-url from name, version and provided language information.""" lang = "generic" @@ -63,16 +64,16 @@ def get_purl_from_name(item: Dict[str, Any]) -> Any: if item["Language"].lower() == "javascript": lang = "npm" - purl = PackageURL(type=lang, name=item.get("Name", ""), version=item.get("Version", "")).to_string() + purl = PackageURL(type=lang, name=item.get("Name", ""), version=item.get("Version", "")) return purl @staticmethod - def get_purl_from_legacy(item: Dict[str, Any]) -> Any: + def get_purl_from_legacy(item: Dict[str, Any]) -> PackageURL: if "RepositoryType" in item: if (item["RepositoryType"] == "package-url") or (item["RepositoryType"] == "purl"): id = item.get("RepositoryId", "") if id: - return id + return PackageURL.from_string(id) return LegacySupport.get_purl_from_name(item) @@ -85,12 +86,7 @@ def legacy_component_to_cdx(item: Dict[str, Any]) -> Component: name=item.get("Name", "").strip(), version=item.get("Version", "").strip(), purl=purl, - bom_ref=purl, - description=item.get("Description", "").strip()) - else: - cxcomp = Component( - name=item.get("Name", "").strip(), - version=item.get("Version", "").strip(), + bom_ref=purl.to_string(), description=item.get("Description", "").strip()) website = item.get("ProjectSite", "") @@ -99,7 +95,7 @@ def legacy_component_to_cdx(item: Dict[str, Any]) -> Component: if website: ext_ref = ExternalReference( reference_type=ExternalReferenceType.WEBSITE, - url=website) + url=XsUri(website)) cxcomp.external_references.add(ext_ref) projectClearingState = item.get("ProjectClearingState", "") @@ -152,7 +148,7 @@ def legacy_component_to_cdx(item: Dict[str, Any]) -> Component: ext_ref = ExternalReference( reference_type=ExternalReferenceType.DISTRIBUTION, comment=CaPyCliBom.SOURCE_URL_COMMENT, - url=sourceFileUrl) + url=XsUri(sourceFileUrl)) hash = item.get("SourceFileHash", "") if hash: ext_ref.hashes.add(HashType( @@ -165,7 +161,7 @@ def legacy_component_to_cdx(item: Dict[str, Any]) -> Component: ext_ref = ExternalReference( reference_type=ExternalReferenceType.DISTRIBUTION, comment=CaPyCliBom.SOURCE_FILE_COMMENT, - url=sourceUrl) + url=XsUri(sourceUrl)) hash = item.get("SourceFileHash", "") if hash: ext_ref.hashes.add(HashType( @@ -178,7 +174,7 @@ def legacy_component_to_cdx(item: Dict[str, Any]) -> Component: ext_ref = ExternalReference( reference_type=ExternalReferenceType.DISTRIBUTION, comment="source archive (local copy)", - url=sourceFile) + url=XsUri(sourceFile)) hash = item.get("SourceFileHash", "") if hash: ext_ref.hashes.add(HashType( @@ -207,7 +203,7 @@ def legacy_component_to_cdx(item: Dict[str, Any]) -> Component: ext_ref = ExternalReference( reference_type=ExternalReferenceType.DISTRIBUTION, comment=CaPyCliBom.BINARY_FILE_COMMENT, - url=binaryFile) + url=XsUri(binaryFile)) hash = item.get("BinaryFileHash", "") if hash: ext_ref.hashes.add(HashType( @@ -222,7 +218,7 @@ def legacy_component_to_cdx(item: Dict[str, Any]) -> Component: ext_ref = ExternalReference( reference_type=ExternalReferenceType.DISTRIBUTION, comment=CaPyCliBom.BINARY_URL_COMMENT, - url=binaryFileUrl) + url=XsUri(binaryFileUrl)) hash = item.get("BinaryFileHash", "") if hash: ext_ref.hashes.add(HashType( @@ -234,7 +230,7 @@ def legacy_component_to_cdx(item: Dict[str, Any]) -> Component: if repositoryUrl: ext_ref = ExternalReference( reference_type=ExternalReferenceType.VCS, - url=repositoryUrl) + url=XsUri(repositoryUrl)) cxcomp.external_references.add(ext_ref) language = item.get("Language", "") @@ -263,23 +259,23 @@ def legacy_to_cdx_components(cls, inputfile: str) -> List[Component]: @classmethod def cdx_component_to_legacy(cls, cx_comp: Component) -> Dict[str, Any]: - lcomp = {} + lcomp: Dict[str, Any] = {} lcomp["Name"] = cx_comp.name lcomp["Version"] = cx_comp.version or "" lcomp["Description"] = cx_comp.description or "" lcomp["Language"] = CycloneDxSupport.get_property_value(cx_comp, CycloneDxSupport.CDX_PROP_LANGUAGE) lcomp["SourceUrl"] = str(CycloneDxSupport.get_ext_ref_source_url(cx_comp)) - lcomp["RepositoryUrl"] = CycloneDxSupport.get_ext_ref_repository(cx_comp) + lcomp["RepositoryUrl"] = str(CycloneDxSupport.get_ext_ref_repository(cx_comp)) lcomp["SourceFile"] = str(CycloneDxSupport.get_ext_ref_source_file(cx_comp)) lcomp["SourceFileHash"] = CycloneDxSupport.get_source_file_hash(cx_comp) lcomp["BinaryFile"] = str(CycloneDxSupport.get_ext_ref_binary_file(cx_comp)) lcomp["BinaryFileHash"] = CycloneDxSupport.get_binary_file_hash(cx_comp) lcomp["BinaryFileUrl"] = str(CycloneDxSupport.get_ext_ref_binary_url(cx_comp)) - lcomp["Homepage"] = CycloneDxSupport.get_ext_ref_website(cx_comp) - lcomp["ProjectSite"] = CycloneDxSupport.get_ext_ref_website(cx_comp) # same! + lcomp["Homepage"] = str(CycloneDxSupport.get_ext_ref_website(cx_comp)) + lcomp["ProjectSite"] = str(CycloneDxSupport.get_ext_ref_website(cx_comp)) # same! if cx_comp.purl: lcomp["RepositoryType"] = "package-url" - lcomp["RepositoryId"] = cx_comp.purl or "" + lcomp["RepositoryId"] = cx_comp.purl.to_string() or "" lcomp["Sw360Id"] = CycloneDxSupport.get_property_value(cx_comp, CycloneDxSupport.CDX_PROP_SW360ID) lcomp["SourceFileType"] = CycloneDxSupport.get_property_value(cx_comp, CycloneDxSupport.CDX_PROP_SRC_FILE_TYPE) @@ -299,7 +295,7 @@ def cdx_component_to_legacy(cls, cx_comp: Component) -> Dict[str, Any]: return lcomp @classmethod - def write_cdx_components_as_legacy(cls, bom: List[Component], outputfile: str) -> None: + def write_cdx_components_as_legacy(cls, bom: SortedSet, outputfile: str) -> None: LOG.debug(f"Writing to file {outputfile}") legacy_bom = [] diff --git a/capycli/bom/map_bom.py b/capycli/bom/map_bom.py index 8c0f4ed..8ebb84d 100644 --- a/capycli/bom/map_bom.py +++ b/capycli/bom/map_bom.py @@ -13,11 +13,13 @@ import re import sys from enum import Enum -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional, Tuple -from cyclonedx.model import ExternalReference, ExternalReferenceType +from cyclonedx.model import ExternalReference, ExternalReferenceType, XsUri from cyclonedx.model.bom import Bom from cyclonedx.model.component import Component +from packageurl import PackageURL +from sw360 import SW360 import capycli.common.file_support import capycli.common.script_base @@ -48,20 +50,20 @@ class MapBom(capycli.common.script_base.ScriptBase): Map a given SBOM to data on SW360 """ def __init__(self) -> None: - self.releases = None + self.releases: List[Dict[str, Any]] = [] self.old_releases = None self.verbosity = 1 self.relaxed_debian_parsing = False self.mode = MapMode.ALL - self.purl_service: PurlService = None + self.purl_service: Optional[PurlService] = None self.no_match_by_name_only = True - def is_id_match(self, release, component: Component) -> bool: + def is_id_match(self, release: Dict[str, Any], component: Component) -> bool: """Determines whether this release is a match via identifier for the specified SBOM item""" if not component.purl: return False - cmp = component.purl.lower() + cmp = component.purl.to_string().lower() if "ExternalIds" in release: extid_list = release["ExternalIds"] else: @@ -73,7 +75,7 @@ def is_id_match(self, release, component: Component) -> bool: return False - def filter_exceptions(self, partsBomItem: list) -> None: + def filter_exceptions(self, partsBomItem: List[str]) -> List[str]: """Filter some parts that appear too often in too many component names""" if ("cordova" in partsBomItem) and ("plugin" in partsBomItem): partsBomItem.remove("cordova") @@ -81,7 +83,7 @@ def filter_exceptions(self, partsBomItem: list) -> None: return partsBomItem - def similar_name_match(self, component: Component, release: dict) -> bool: + def similar_name_match(self, component: Component, release: Dict[str, Any]) -> bool: """Determine whether there is a relase with a similar name. Similar means a combination of name words...""" SIMILARITY_THRESHOLD = 2 @@ -122,7 +124,7 @@ def similar_name_match(self, component: Component, release: dict) -> bool: return False - def is_better_match(self, releases_found: List[dict], proposed_match_code) -> bool: + def is_better_match(self, releases_found: List[Dict[str, Any]], proposed_match_code: str) -> bool: if not releases_found: return True @@ -137,7 +139,7 @@ def is_better_match(self, releases_found: List[dict], proposed_match_code) -> bo return False @staticmethod - def is_good_match(match_code) -> bool: + def is_good_match(match_code: str) -> bool: """ Returns True of this is a good match, i.e. """ @@ -162,14 +164,11 @@ def is_good_match(match_code) -> bool: return False - def map_bom_item(self, component: Component, check_similar: bool, result_required: bool): + def map_bom_item(self, component: Component, check_similar: bool, result_required: bool) -> MapResult: """Maps a single SBOM item to the list of SW360 releases""" result, _, _ = self.map_bom_commons(component) - if not self.releases: - return None - for release in self.releases: if ("Id" in release) and ("Sw360Id" not in release): release["Sw360Id"] = release["Id"] @@ -200,7 +199,7 @@ def map_bom_item(self, component: Component, check_similar: bool, result_require name_match = component.name.lower() == release["Name"].lower() version_exists = "Version" in release if (name_match - and version_exists + and version_exists and component.version and (component.version.lower() == release["Version"].lower())): if self.is_better_match( result.releases, @@ -262,7 +261,7 @@ def map_bom_item(self, component: Component, check_similar: bool, result_require break # fourth check: source filename - cmp_src_file = CycloneDxSupport.get_ext_ref_source_file(component) + cmp_src_file = str(CycloneDxSupport.get_ext_ref_source_file(component)) if (("SourceFile" in release) and cmp_src_file and release["SourceFile"]): @@ -342,8 +341,11 @@ def cut_off_debian_extras(self, version: str) -> str: new_version = parts[0] return new_version - def map_bom_item_no_cache(self, component: Component): + def map_bom_item_no_cache(self, component: Component) -> MapResult: """Maps a single SBOM item to SW360 via online checks (no cache!)""" + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) result, release_url, component_url = self.map_bom_commons(component) components = [] @@ -352,12 +354,12 @@ def map_bom_item_no_cache(self, component: Component): # if there's no purl match, search for component names if len(components) == 0: - components = self.client.get_component_by_name(component.name) - if not components: + components2 = self.client.get_component_by_name(component.name) + if not components2: return result components = [ compref["_links"]["self"]["href"] - for compref in components["_embedded"]["sw360:components"] + for compref in components2.get("_embedded", {}).get("sw360:components", []) if compref["name"].lower() == component.name.lower() ] @@ -365,15 +367,22 @@ def map_bom_item_no_cache(self, component: Component): if release_url is not None: rel_list = [{"_links": {"self": {"href": release_url}}}] else: - comp = self.client.get_component_by_url(compref) + comp = self.client.get_component_by_url(compref) # type: ignore + if not comp: + continue rel_list = comp["_embedded"].get("sw360:releases", []) # Sorted alternatives in descending version order # Please note: the release list sometimes contain just the href but no version - rel_list = sorted(rel_list, key=lambda x: "version" in x and ComparableVersion(x['version']), reverse=True) + rel_list = sorted(rel_list, + key=lambda x: "version" in x and ComparableVersion( + x.get("version", "")), reverse=True) # type: ignore for relref in rel_list: href = relref["_links"]["self"]["href"] real_release = self.client.get_release_by_url(href) + if not real_release: + print_red("Error accessign release " + href) + continue # generate proper release for result release = {} @@ -409,7 +418,7 @@ def map_bom_item_no_cache(self, component: Component): # again as we checked it when compiling component list) version_exists = "Version" in release if (version_exists - and (component.version.lower() == release["Version"].lower())): + and ((component.version or "").lower() == release.get("Version", "").lower())): if self.is_better_match( result.releases, MapResult.FULL_MATCH_BY_NAME_AND_VERSION): @@ -489,8 +498,11 @@ def map_bom_item_no_cache(self, component: Component): print_text(" ADDED (MATCH_BY_NAME) " + release["Sw360Id"]) return result - def has_release_clearing_result(self, client, result_item) -> bool: + def has_release_clearing_result(self, client: Optional[SW360], result_item: Dict[str, Any]) -> bool: """Checks whether this given result item has a clearing result""" + if not client: + return False + print_text( "Checking clearing result for " + result_item["Name"] + ", " + result_item["Version"]) @@ -519,8 +531,11 @@ def map_bom_to_releases( # retrieve missing types later purl_types = set() for component in sbom.components: - if component.purl and len(component.purl) > 8 and component.purl.startswith("pkg:"): - purl_types.add(component.purl[4:7]) + + if component.purl: + p = component.purl.to_string() + if len(p) > 8 and p.startswith("pkg:"): + purl_types.add(p[4:7]) self.external_id_svc.build_purl_cache(purl_types, self.verbosity <= 1) mapresult: list[MapResult] = [] @@ -538,25 +553,29 @@ def map_bom_to_releases( return mapresult - def search_source_hash_match(self, hashvalue): + def search_source_hash_match(self, hashvalue: str) -> None: """Searches SW360 for a release with an attachment with the specified source file hash""" pass - def search_binary_hash_match(self, hashvalue): + def search_binary_hash_match(self, hashvalue: str) -> None: """Searches SW360 for a release with an attachment with the specified binary file hash""" pass def create_overview(self, result: List[MapResult]) -> Dict[str, Any]: """Create JSON data with an mapping result overview""" - data = {} - dataitems = [] + data: Dict[str, Any] = {} + dataitems: List[Dict[str, Any]] = [] overall_result = "COMPLETE" count = 0 for item in result: - dataitem = {} - dataitem["BomItem"] = item.component.name + ", " + item.component.version + if not item: + continue + + dataitem: Dict[str, Any] = {} + if item.component: + dataitem["BomItem"] = item.component.name + ", " + (item.component.version or "") dataitem["ResultCode"] = item.result dataitem["ResultText"] = item.map_code_to_string(item.result) dataitems.append(dataitem) @@ -571,12 +590,12 @@ def create_overview(self, result: List[MapResult]) -> Dict[str, Any]: return data - def write_overview(self, overview: dict, filename: str) -> None: + def write_overview(self, overview: Dict[str, Any], filename: str) -> None: """Writes a JSON file with an mapping result overview""" with open(filename, "w") as outfile: json.dump(overview, outfile, indent=2) - def get_purl_from_match(self, match: dict) -> str: + def get_purl_from_match(self, match: Dict[str, Any]) -> str: """ Return the package-url for the given SW360 entry. """ @@ -592,7 +611,7 @@ def get_purl_from_match(self, match: dict) -> str: return purl - def update_bom_item(self, component: Component, match: dict) -> Component: + def update_bom_item(self, component: Optional[Component], match: Dict[str, Any]) -> Component: """Update the (current) SBOM item with values from the match""" # print(match.get("Name", "???"), match.get("Version", "???"), "purl =", match.get("RepositoryId", "XXX")) @@ -604,7 +623,7 @@ def update_bom_item(self, component: Component, match: dict) -> Component: component = Component( name=match.get("Name", ""), version=match.get("Version", ""), - purl=purl, + purl=PackageURL.from_string(purl), bom_ref=match.get("RepositoryId", "")) else: component = Component( @@ -625,7 +644,7 @@ def update_bom_item(self, component: Component, match: dict) -> Component: value_match = None if value_match: - component.purl = value_match + component.purl = PackageURL.from_string(value_match) # update if current is empty value_match = match.get("Language", "") @@ -658,8 +677,8 @@ def update_bom_item(self, component: Component, match: dict) -> Component: ExternalReferenceType.DISTRIBUTION, CaPyCliBom.SOURCE_URL_COMMENT, value_match) - elif not ext_ref.url: - ext_ref.url = value_match + elif str(ext_ref.url) == "": + ext_ref.url = XsUri(value_match) value_match = match.get("SourceFile", "") if value_match: @@ -673,8 +692,8 @@ def update_bom_item(self, component: Component, match: dict) -> Component: ExternalReferenceType.DISTRIBUTION, CaPyCliBom.SOURCE_FILE_COMMENT, value_match) - elif not ext_ref_src_file.url: - ext_ref_src_file.url = value_match + elif str(ext_ref_src_file.url) == "": + ext_ref_src_file.url = XsUri(value_match) value_match = match.get("BinaryFile", "") if value_match: @@ -688,8 +707,8 @@ def update_bom_item(self, component: Component, match: dict) -> Component: ExternalReferenceType.DISTRIBUTION, CaPyCliBom.BINARY_FILE_COMMENT, value_match) - elif not ext_ref_bin_file.url: - ext_ref_bin_file.url = value_match + elif str(ext_ref_bin_file.url) == "": + ext_ref_bin_file.url = XsUri(value_match) value_match = match.get("ProjectSite", "") if value_match: @@ -698,10 +717,10 @@ def update_bom_item(self, component: Component, match: dict) -> Component: if not ext_ref: ext_ref = ExternalReference( reference_type=ExternalReferenceType.WEBSITE, - url=value_match) + url=XsUri(value_match)) component.external_references.add(ext_ref) - elif not ext_ref.url: - ext_ref.url = value_match + elif str(ext_ref.url) == "": + ext_ref.url = XsUri(value_match) # no updates for # * SourceFileHash @@ -741,15 +760,16 @@ def create_updated_bom(self, old_bom: Bom, result: List[MapResult]) -> Bom: continue newitem = item.component - CycloneDxSupport.update_or_set_property( - newitem, - CycloneDxSupport.CDX_PROP_MAPRESULT, - MapResult.NO_MATCH) - if item.component_id is not None: + if newitem: CycloneDxSupport.update_or_set_property( newitem, - CycloneDxSupport.CDX_PROP_COMPONENT_ID, - item.component_id) + CycloneDxSupport.CDX_PROP_MAPRESULT, + MapResult.NO_MATCH) + if item.component_id is not None: + CycloneDxSupport.update_or_set_property( + newitem, + CycloneDxSupport.CDX_PROP_COMPONENT_ID, + item.component_id) newbom.components.add(newitem) # Sorted alternatives in descending version order @@ -773,12 +793,15 @@ def create_updated_bom(self, old_bom: Bom, result: List[MapResult]) -> Bom: return newbom - def write_mapping_result(self, result: dict, filename: str) -> None: + def write_mapping_result(self, result: List[MapResult], filename: str) -> None: """Create a JSON file with the mapping details""" data = [] for item in result: - single_result = {} + single_result: Dict[str, Any] = {} + if not item.component: + continue + for prop in item.component.properties: if prop.name == CycloneDxSupport.CDX_PROP_MAPRESULT: item.component.properties.remove(prop) @@ -797,7 +820,7 @@ def write_mapping_result(self, result: dict, filename: str) -> None: def refresh_component_cache( self, cachefile: str, use_existing_data: bool, token: str, oauth2: bool, - sw360_url: str): + sw360_url: str) -> List[Dict[str, Any]]: """Refreshes the component cache.""" cache_mgr = ComponentCacheManagement() @@ -812,25 +835,34 @@ def refresh_component_cache( cachefile, True, token, oauth2=oauth2, url=sw360_url) return rel_data - def map_bom_commons(self, component: Component): + def map_bom_commons(self, component: Component) -> Tuple[MapResult, str, str]: """ Common parts to map from a SBOM component to the SW360 component/release. :param bomitem: SBOM component :return: MapResult instance, (Optional) release url, (Optional) component url """ - if self.relaxed_debian_parsing: + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + + if self.relaxed_debian_parsing and component.version: component.version = self.cut_off_debian_extras(component.version) result = capycli.common.map_result.MapResult(component) # search release and component by purl which is independent of the component cache. - component, release = self.external_id_svc.search_component_and_release(component.purl) - if component: - result.component_id = self.client.get_id_from_href(component) - if release: - result.release_id = self.client.get_id_from_href(release) + if type(component.purl) is PackageURL: + component_href, release_href = self.external_id_svc.search_component_and_release( + component.purl.to_string()) + else: + component_href, release_href = self.external_id_svc.search_component_and_release( + component.purl) # type: ignore + if component_href: + result.component_id = self.client.get_id_from_href(component_href) + if release_href: + result.release_id = self.client.get_id_from_href(release_href) - return result, release, component + return result, release_href, component_href @property def external_id_svc(self) -> PurlService: @@ -838,12 +870,16 @@ def external_id_svc(self) -> PurlService: Lazy external id service getter :return: Purl service """ + if not self.client: + print_red(" No client!") + sys.exit(ResultCode.RESULT_ERROR_ACCESSING_SW360) + if not self.purl_service: # Initialize external id service self.purl_service = PurlService(self.client) return self.purl_service - def setup_cache(self, args: Any): + def setup_cache(self, args: Any) -> None: if not args.nocache: if args.cachefile: cachefile = args.cachefile @@ -876,7 +912,7 @@ def setup_cache(self, args: Any): print_red("No cached releases available!") sys.exit(ResultCode.RESULT_NO_CACHED_RELEASES) - def show_help(self): + def show_help(self) -> None: """Show help text.""" print("usage: CaPyCLI bom map [-h] [-cf CACHEFILE] [-rc] [-sc] [--nocache]") print(" [-ov CREATE_OVERVIEW] [-mr WRITE_MAPRESULT] [-rr]") @@ -913,7 +949,7 @@ def show_help(self): print(" so SBOM version 3.1 will match SW360 version 3.1-3.debian") print(" -all also report matches for name, but different version") - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/bom/merge_bom.py b/capycli/bom/merge_bom.py index 8f6fe15..2dbd7fe 100644 --- a/capycli/bom/merge_bom.py +++ b/capycli/bom/merge_bom.py @@ -8,6 +8,7 @@ import os import sys +from typing import Any, Optional from cyclonedx.model.bom import Bom from cyclonedx.model.component import Component @@ -72,7 +73,7 @@ def are_same(c1: Component, c2: Component, deep: bool = False) -> bool: return True - def find_in_bom(self, bom: Bom, component: Component) -> Component or None: + def find_in_bom(self, bom: Bom, component: Component) -> Optional[Component]: """Searches for an item with the given name and version in the given SBOM.""" for c in bom.components: if self.are_same(c, component): @@ -88,7 +89,7 @@ def merge_boms(self, bom_old: Bom, bom_new: Bom) -> Bom: return bom_old - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/bom/plaintext.py b/capycli/bom/plaintext.py index c6960c4..8e3f86d 100644 --- a/capycli/bom/plaintext.py +++ b/capycli/bom/plaintext.py @@ -9,6 +9,7 @@ from typing import List from cyclonedx.model.component import Component +from sortedcontainers import SortedSet from capycli import LOG from capycli.main.exceptions import CaPyCliException @@ -69,3 +70,17 @@ def write_cdx_components_as_flatlist(cls, bom: List[Component], outputfile: str) raise CaPyCliException("Error writing text file: " + str(exp)) LOG.debug("done") + + @classmethod + def write_cdx_components_as_flatlist2(cls, bom: SortedSet, outputfile: str) -> None: + LOG.debug(f"Writing to file {outputfile}") + try: + with open(outputfile, "w", encoding="utf-8") as fout: + for cx_comp in bom: + name = cx_comp.name + version = cx_comp.version + fout.write(f"{name}, {version}\n") + except Exception as exp: + raise CaPyCliException("Error writing text file: " + str(exp)) + + LOG.debug("done") diff --git a/capycli/bom/show_bom.py b/capycli/bom/show_bom.py index 9375846..96c711b 100644 --- a/capycli/bom/show_bom.py +++ b/capycli/bom/show_bom.py @@ -12,6 +12,7 @@ import os import sys +from typing import Any from cyclonedx.model.bom import Bom @@ -35,7 +36,7 @@ def display_bom(self, bom: Bom, verbose: bool) -> None: if verbose: if bomitem.purl: - print_text(" package-url:" + bomitem.purl) + print_text(" package-url:" + bomitem.purl.to_string()) sw360id = CycloneDxSupport.get_property_value(bomitem, CycloneDxSupport.CDX_PROP_SW360ID) if sw360id: @@ -49,7 +50,7 @@ def display_bom(self, bom: Bom, verbose: bool) -> None: print_text("\n" + str(len(bom.components)) + " items in bill of material\n") - def run(self, args): + def run(self, args: Any) -> None: """Main method()""" if args.debug: global LOG diff --git a/capycli/common/capycli_bom_support.py b/capycli/common/capycli_bom_support.py index bc693a5..a4ca32a 100644 --- a/capycli/common/capycli_bom_support.py +++ b/capycli/common/capycli_bom_support.py @@ -12,7 +12,7 @@ import uuid from datetime import datetime from enum import Enum -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional, Union from cyclonedx.model import ( AttachedText, @@ -31,7 +31,8 @@ from cyclonedx.output.json import JsonV1Dot4 from cyclonedx.parser import BaseParser from dateutil import parser as dateparser -from sortedcontainers import SortedSet # type: ignore +from packageurl import PackageURL +from sortedcontainers import SortedSet import capycli.common.script_base from capycli import LOG @@ -56,20 +57,20 @@ class SbomJsonParser(BaseParser): def __init__(self, json_content: Dict[str, Any], mode: ParserMode = ParserMode.SBOM): super().__init__() LOG.debug("Processing CycloneDX data...") - self.parser_mode = mode - self.metadata = self.read_metadata(json_content.get("metadata")) - serial_number = json_content.get("serialNumber", None) - self.serial_number = uuid.UUID(serial_number) \ + self.parser_mode: ParserMode = mode + self.metadata: Optional[BomMetaData] = self.read_metadata(json_content.get("metadata")) + serial_number: str = json_content.get("serialNumber", "") + self.serial_number: Optional[uuid.UUID] = uuid.UUID(serial_number) \ if self.is_valid_serial_number(serial_number) \ else None - components = json_content.get("components", None) + components = json_content.get("components", []) if components: for component_entry in components: component = self.read_component(component_entry) if component: self._components.append(component) self.external_references = self.read_external_references( - json_content.get("externalReferences", None)) + json_content.get("externalReferences", [])) LOG.debug("...done.") @@ -79,36 +80,39 @@ def get_project(self) -> Optional[Component]: if not self.metadata: return None - return self.metadata.component # type: ignore + return self.metadata.component def link_dependencies_to_project(self, bom: Bom) -> None: - if not self.metadata: + if not bom.metadata: return - if not self.metadata.component: + if not bom.metadata.component: return for component in self._components: + if not component: + continue + bom.metadata.component.dependencies.add(component.bom_ref) - def get_tools(self) -> Optional[List[Tool]]: + def get_tools(self) -> SortedSet: """Get the list of tools read by the parser.""" if not self.metadata: - return + return SortedSet() return self.metadata.tools - def get_metadata_licenses(self) -> Optional[SortedSet]: + def get_metadata_licenses(self) -> SortedSet: """Get the metadata licenses read by the parser.""" if not self.metadata: - return + return SortedSet() return self.metadata.licenses - def get_metadata_properties(self) -> Optional[SortedSet]: + def get_metadata_properties(self) -> SortedSet: """Get the list of metadata properties read by the parser.""" if not self.metadata: - return + return SortedSet() return self.metadata.properties @@ -118,14 +122,15 @@ def is_valid_serial_number(self, serial_number: str) -> bool: return not (serial_number is None or "urn:uuid:None" == serial_number) - def read_tools(self, param: Iterable[Dict[str, Any]]) -> Optional[Iterable[Tool]]: + def read_tools(self, param: Iterable[Dict[str, Any]]) -> SortedSet: + tools = SortedSet() + if not param: - return None + return tools LOG.debug("CycloneDX: reading tools") - tools = [] for tool in param: - tools.append(Tool( + tools.add(Tool( vendor=tool.get("vendor"), name=tool.get("name"), version=tool.get("version"), @@ -238,7 +243,7 @@ def read_external_references(self, values: Iterable[Dict[str, Any]]) -> Optional if entry.get("type"): ex_refs.append(ExternalReference( reference_type=self.read_external_reference_type(entry.get("type")), - url=entry.get("url", None), + url=XsUri(entry.get("url", "")), comment=entry.get("comment"), hashes=self.read_hashes(entry.get("hashes", [])) )) @@ -251,6 +256,12 @@ def read_component(self, entry: Dict[str, Any]) -> Optional[Component]: name = entry.get("name", None) version = entry.get("version") LOG.debug(f"CycloneDX: reading component {name}, {version}") + purl_str = entry.get("purl", "") + # purl: PackageURL + if purl_str: + purl = PackageURL.from_string(purl_str) + else: + purl = None return Component( name=name, version=version, @@ -258,7 +269,7 @@ def read_component(self, entry: Dict[str, Any]) -> Optional[Component]: author=entry.get("author"), description=entry.get("description"), copyright_=entry.get("copyright"), - purl=entry.get("purl"), + purl=purl, bom_ref=entry.get("bom-ref"), component_type=self.read_component_type(entry.get("type", None)), hashes=self.read_hashes(entry.get("hashes", None)), @@ -343,10 +354,10 @@ def get_ext_ref(comp: Component, type: ExternalReferenceType, comment: str) -> O @staticmethod def set_ext_ref(comp: Component, type: ExternalReferenceType, comment: str, value: str, - hash_algo: str = None, hash: str = None) -> None: + hash_algo: str = "", hash: str = "") -> None: ext_ref = ExternalReference( reference_type=type, - url=value, + url=XsUri(value), comment=comment) if hash_algo and hash: @@ -370,11 +381,14 @@ def update_or_set_ext_ref(comp: Component, type: ExternalReferenceType, comment: CycloneDxSupport.set_ext_ref(comp, type, comment, value) @staticmethod - def have_relative_ext_ref_path(ext_ref: ExternalReference, rel_to: str): - bip = pathlib.PurePath(ext_ref.url) + def have_relative_ext_ref_path(ext_ref: ExternalReference, rel_to: str) -> str: + if isinstance(ext_ref.url, str): + bip = pathlib.PurePath(ext_ref.url) + else: + bip = pathlib.PurePath(ext_ref.url._uri) file = bip.as_posix() if os.path.isfile(file): - ext_ref.url = "file://" + bip.relative_to(rel_to).as_posix() + ext_ref.url = XsUri("file://" + bip.relative_to(rel_to).as_posix()) return bip.name @staticmethod @@ -523,7 +537,7 @@ def add_profile(sbom: Bom, profile: str) -> None: sbom.metadata.properties.add(prop) @staticmethod - def create(bom: List[Component], **kwargs: bool) -> Bom: + def create(bom: Union[List[Component], SortedSet], **kwargs: bool) -> Bom: sbom = Bom() if not sbom.metadata.properties: @@ -543,20 +557,20 @@ def create(bom: List[Component], **kwargs: bool) -> Bom: SbomCreator.add_profile(sbom, "capycli") if not sbom.metadata.tools: - sbom.metadata.tools = [] + sbom.metadata.tools = SortedSet() if "addtools" in kwargs and kwargs["addtools"]: SbomCreator.add_tools(sbom.metadata.tools) if "name" in kwargs or "version" in kwargs or "description" in kwargs: - sbom.metadata.component = Component( - name=kwargs.get("name"), - version=kwargs.get("version"), - description=kwargs.get("description") - ) + _name = str(kwargs.get("name", "")) + _version = str(kwargs.get("version", "")) + _description = str(kwargs.get("description", "")) + if _name and _version and _description: + sbom.metadata.component = Component(name=_name, version=_version, description=_description) if bom: - sbom.components = bom + sbom.components = SortedSet(bom) if kwargs.get("addprojectdependencies") and sbom.metadata.component: for component in sbom.components: sbom.metadata.component.dependencies.add(component.bom_ref) @@ -672,7 +686,7 @@ def write_sbom(cls, sbom: Bom, outputfile: str) -> None: LOG.debug("done") @classmethod - def write_simple_sbom(cls, bom: List[Component], outputfile: str) -> None: + def write_simple_sbom(cls, bom: SortedSet, outputfile: str) -> None: LOG.debug(f"Writing to file {outputfile}") try: creator = SbomCreator() diff --git a/capycli/common/comparable_version.py b/capycli/common/comparable_version.py index ac688fe..ed9b28e 100644 --- a/capycli/common/comparable_version.py +++ b/capycli/common/comparable_version.py @@ -6,6 +6,10 @@ # SPDX-License-Identifier: MIT # ------------------------------------------------------------------------------- +from __future__ import annotations + +from typing import Any, List, Tuple + from capycli import get_logger LOG = get_logger(__name__) @@ -18,10 +22,10 @@ class IncompatibleVersionError(Exception): class ComparableVersion: """Version string comparison.""" - parts: list + parts: List[Tuple[Any, Any]] version: str - def __init__(self, version: str): + def __init__(self, version: str) -> None: self.version = version try: self.parts = self.parse(version) @@ -29,11 +33,11 @@ def __init__(self, version: str): LOG.warning("Unable to parse version %s", version) @staticmethod - def parse(version: str): + def parse(version: str) -> List[Tuple[bool, int | str]]: version = version.lower() isdigit = False - parts = [] + parts: List[Tuple[bool, int | str]] = [] start = 0 for i, c in enumerate(version): if c in [".", "-", "_"]: @@ -55,11 +59,11 @@ def parse(version: str): for i, part in enumerate(parts): if part[0]: - parts[i] = (part[0], int(part[1])) + parts[i] = (True, int(part[1])) return parts - def compare(self, other): + def compare(self, other: ComparableVersion) -> int: """ Compare versions :param other: other version @@ -71,7 +75,8 @@ def compare(self, other): raise IncompatibleVersionError(e) @staticmethod - def get_part_or_default(part_list: list, pos: int, other: list): + def get_part_or_default(part_list: List[Tuple[bool, int | str]], pos: int, + other: List[Tuple[bool, int | str]]) -> str | int: if pos < len(part_list): return part_list[pos][1] elif pos < len(other): @@ -80,7 +85,8 @@ def get_part_or_default(part_list: list, pos: int, other: list): else: return "" - def compare_recursive(self, me, i, other, j): + def compare_recursive(self, me: List[Tuple[bool, int | str]], i: int, + other: List[Tuple[bool, int | str]], j: int) -> int: """ Recursive go through version parts and compare them. Rules: @@ -103,58 +109,66 @@ def compare_recursive(self, me, i, other, j): if left == right: return self.compare_recursive(me, i + 1, other, j + 1) - if left > right: + # if str(left) > str(right): => test fails + # if int(left) > int(right): => test fails + if left > right: # type: ignore return 1 else: return -1 - def __eq__(self, other): + def __eq__(self, other: ComparableVersion | object) -> bool: """describes equality operator(==)""" + if not isinstance(other, self.__class__): + return False + try: return self.compare(other) == 0 except IncompatibleVersionError: return self.version.__eq__(other.version) - def __ne__(self, other): + def __ne__(self, other: ComparableVersion | object) -> bool: """describes not equal to operator(!=)""" + if not isinstance(other, self.__class__): + return False + try: return self.compare(other) != 0 except IncompatibleVersionError: return self.version.__ne__(other.version) - def __le__(self, other): + def __le__(self, other: ComparableVersion) -> bool: """descries less than or equal to (<=)""" try: return self.compare(other) <= 0 except IncompatibleVersionError: return self.version.__le__(other.version) - def __ge__(self, other): + def __ge__(self, other: ComparableVersion) -> bool: """describes greater than or equal to (>=)""" try: return self.compare(other) >= 0 except IncompatibleVersionError: return self.version.__ge__(other.version) - def __gt__(self, other): + def __gt__(self, other: ComparableVersion) -> bool: """describes greater than (>)""" try: return self.compare(other) > 0 except IncompatibleVersionError: return self.version.__gt__(other.version) - def __lt__(self, other): + def __lt__(self, other: ComparableVersion) -> bool: """describes less than operator(<)""" try: return self.compare(other) < 0 except IncompatibleVersionError: return self.version.__lt__(other.version) - def __repr__(self): + def __repr__(self) -> str: return self.version @property - def major(self): + def major(self) -> str: if self.parts: return self.parts[0][1] else: diff --git a/capycli/common/component_cache.py b/capycli/common/component_cache.py index 7f97192..365cd77 100644 --- a/capycli/common/component_cache.py +++ b/capycli/common/component_cache.py @@ -9,6 +9,7 @@ import json import os import sys +from typing import Any, Dict, List, Optional import sw360 @@ -26,15 +27,18 @@ class ComponentCacheManagement(): CACHE_FILENAME = "ComponentCache.json" CACHE_ALL_RELEASES = "AllReleases.json" - def __init__(self, token=None, oauth2=False, url=None) -> None: - self.token = token - self.oauth2 = oauth2 - self.releases = None + def __init__(self, token: Optional[str] = None, oauth2: bool = False, url: Optional[str] = None) -> None: + if token: + self.token: str = token + self.oauth2: bool = oauth2 + self.releases: List[Dict[str, Any]] = [] self.old_releases = None - self.sw360_url = None + + if url: + self.sw360_url: str = url @classmethod - def read_component_cache(cls, cachefile: str) -> dict: + def read_component_cache(cls, cachefile: str) -> List[Dict[str, Any]]: """Read the cached list of SW360 releases""" """ @@ -70,7 +74,7 @@ def read_component_cache(cls, cachefile: str) -> dict: return release_cache @classmethod - def get_attachment(cls, release: dict, att_type: str): + def get_attachment(cls, release: Dict[str, Any], att_type: str) -> Optional[Dict[str, Any]]: """Return the first attachment that matches the specified type""" if "_embedded" not in release: return None @@ -96,7 +100,7 @@ def get_attachment(cls, release: dict, att_type: str): return None @staticmethod - def get_value_or_default(release, key): + def get_value_or_default(release: Dict[str, Any], key: str) -> str: """Return a dictionary value if it exists, otherwise an empty string""" if key not in release: return "" @@ -112,14 +116,15 @@ def read_existing_component_cache(self, cachefile: str) -> int: self.old_releases = None if self.old_releases: - return len(self.old_releases) + return len(self.old_releases) # type: ignore # code is used! else: return 0 - def get_rest_client(self, token: str = None, oauth2: bool = False, url: str = None): + def get_rest_client(self, token: Optional[str] = None, oauth2: bool = False, + url: Optional[str] = None) -> sw360.SW360: """Get an instance of the REST API client""" - self.sw360_url = os.environ.get("SW360ServerUrl", None) - sw360_api_token = os.environ.get("SW360ProductionToken", None) + self.sw360_url = os.environ.get("SW360ServerUrl", "") + sw360_api_token = os.environ.get("SW360ProductionToken", "") if token: sw360_api_token = token @@ -138,7 +143,7 @@ def get_rest_client(self, token: str = None, oauth2: bool = False, url: str = No print_red(" No SW360 API token specified!") sys.exit(ResultCode.RESULT_AUTH_ERROR) - client = sw360.sw360_api.SW360(self.sw360_url, sw360_api_token, oauth2) + client = sw360.SW360(self.sw360_url, sw360_api_token, oauth2) if not client.login_api(sw360_api_token): print_red("ERROR: login failed") sys.exit(ResultCode.RESULT_AUTH_ERROR) @@ -146,15 +151,15 @@ def get_rest_client(self, token: str = None, oauth2: bool = False, url: str = No return client @classmethod - def convert_release_details(cls, client, details) -> dict: + def convert_release_details(cls, client: sw360.SW360, details: Dict[str, Any]) -> Dict[str, Any]: """ Convert the SW360 release data into our own data. """ if not details: - return + return {} try: - release = {} + release: Dict[str, Any] = {} release["Id"] = client.get_id_from_href( details["_links"]["self"]["href"] ) @@ -209,9 +214,11 @@ def convert_release_details(cls, client, details) -> dict: " Error getting details on " + details["_links"]["self"]["href"] + " " + repr(ex)) + return {} + def refresh_component_cache( - self, cachefile: str, fast: bool, token: str = None, - oauth2: bool = False, url: str = None): + self, cachefile: str, fast: bool, token: Optional[str] = None, + oauth2: bool = False, url: Optional[str] = None) -> List[Dict[str, Any]]: """ Read all releases from SW360. May take 90 minutes! The new multi-threaded approach takes about one hour for 25.000 @@ -225,6 +232,9 @@ def refresh_component_cache( # reset global list self.releases = [] + if not allnew: + return [] + for newdata in allnew: internal = self.convert_release_details(client, newdata) if internal: diff --git a/capycli/common/file_support.py b/capycli/common/file_support.py index b7a5db6..698a112 100644 --- a/capycli/common/file_support.py +++ b/capycli/common/file_support.py @@ -16,7 +16,7 @@ from colorama import Fore, Style -def create_backup(filename): +def create_backup(filename: str) -> None: """Create backup file""" try: if os.path.isfile(filename): diff --git a/capycli/common/html_support.py b/capycli/common/html_support.py index 9baac47..b30ebb8 100644 --- a/capycli/common/html_support.py +++ b/capycli/common/html_support.py @@ -99,7 +99,7 @@ def open_html_in_browser(cls) -> None: """show resulting html file""" os.system("output.html") - def create_style(self): + def create_style(self) -> str: """Create the HTML style information""" lineend = self.get_lineend() style = '