diff --git a/pyartifactory/models/artifact.py b/pyartifactory/models/artifact.py index effe21b..2207af6 100644 --- a/pyartifactory/models/artifact.py +++ b/pyartifactory/models/artifact.py @@ -6,7 +6,7 @@ import hashlib from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Literal, Optional, Union +from typing import Callable, Dict, List, Literal, Optional, Union from pydantic import BaseModel @@ -18,14 +18,28 @@ class Checksums(BaseModel): md5: str sha256: str + @staticmethod + def get_hasher(func: Callable[..., hashlib._Hash]) -> hashlib._Hash: + # In Python 3.9+, some hash algorithms (like md5, sha1, sha256) may be disabled in FIPS-compliant systems + # unless 'usedforsecurity=False' is specified. This flag allows the hash function to be used for non-security + # purposes such as checksums, even in FIPS environments. + try: + return func(usedforsecurity=False) + except TypeError: + return func() + @classmethod def generate(cls, file_: Path) -> Checksums: block_size: int = 65536 - mapping: dict[str, Callable[[], Any]] = {"md5": hashlib.md5, "sha1": hashlib.sha1, "sha256": hashlib.sha256} + mapping = { + "md5": hashlib.md5, + "sha1": hashlib.sha1, + "sha256": hashlib.sha256, + } results = {} for algorithm, hashing_function in mapping.items(): - hasher = hashing_function() + hasher = cls.get_hasher(hashing_function) with file_.absolute().open("rb") as fd: buf = fd.read(block_size) while len(buf) > 0: diff --git a/tests/test_artifacts.py b/tests/test_artifacts.py index 8143012..6174519 100644 --- a/tests/test_artifacts.py +++ b/tests/test_artifacts.py @@ -612,6 +612,54 @@ def test_checksum_defined_file(file_path: Path, expected_sha1: str, expected_md5 assert result == expected +def test_get_hasher_security_flag(): + calls = {"kwargs": None} + + class Dummy: + def __init__(self): + self._buf = b"" + + def update(self, b): + self._buf += b + + def hexdigest(self): + return "ok" + + def func(**kwargs): + calls["kwargs"] = kwargs + return Dummy() + + hasher = Checksums.get_hasher(func) + assert isinstance(hasher, Dummy) + assert calls["kwargs"] == {"usedforsecurity": False} + + +def test_get_hasher_type_error_thrown(): + calls = {"with_kwargs": 0, "without_kwargs": 0} + + class Dummy: + def __init__(self): + self._buf = b"" + + def update(self, b): + self._buf += b + + def hexdigest(self): + return "ok" + + def func(**kwargs): + if kwargs: + calls["with_kwargs"] += 1 + raise TypeError("unexpected kwarg") + calls["without_kwargs"] += 1 + return Dummy() + + hasher = Checksums.get_hasher(func) + assert isinstance(hasher, Dummy) + assert calls["with_kwargs"] == 1 + assert calls["without_kwargs"] == 1 + + @responses.activate def test_deploy_artifact_with_checksum_success(mocker): responses.add(responses.PUT, f"{URL}/{ARTIFACT_PATH}", status=200)