Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions pyartifactory/models/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import hashlib
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Union
from typing import Callable, Dict, List, Literal, Optional, Union

from pydantic import BaseModel

Expand All @@ -18,14 +18,28 @@ class Checksums(BaseModel):
md5: str
sha256: str

@staticmethod
def get_hasher(func: Callable[..., hashlib._Hash]) -> hashlib._Hash:
# In Python 3.9+, some hash algorithms (like md5, sha1, sha256) may be disabled in FIPS-compliant systems
# unless 'usedforsecurity=False' is specified. This flag allows the hash function to be used for non-security
# purposes such as checksums, even in FIPS environments.
try:
return func(usedforsecurity=False)
except TypeError:
return func()

@classmethod
def generate(cls, file_: Path) -> Checksums:
block_size: int = 65536
mapping: dict[str, Callable[[], Any]] = {"md5": hashlib.md5, "sha1": hashlib.sha1, "sha256": hashlib.sha256}
mapping = {
"md5": hashlib.md5,
"sha1": hashlib.sha1,
"sha256": hashlib.sha256,
}
results = {}

for algorithm, hashing_function in mapping.items():
hasher = hashing_function()
hasher = cls.get_hasher(hashing_function)
with file_.absolute().open("rb") as fd:
buf = fd.read(block_size)
while len(buf) > 0:
Expand Down
48 changes: 48 additions & 0 deletions tests/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,54 @@ def test_checksum_defined_file(file_path: Path, expected_sha1: str, expected_md5
assert result == expected


def test_get_hasher_security_flag():
calls = {"kwargs": None}

class Dummy:
def __init__(self):
self._buf = b""

def update(self, b):
self._buf += b

def hexdigest(self):
return "ok"

def func(**kwargs):
calls["kwargs"] = kwargs
return Dummy()

hasher = Checksums.get_hasher(func)
assert isinstance(hasher, Dummy)
assert calls["kwargs"] == {"usedforsecurity": False}


def test_get_hasher_type_error_thrown():
calls = {"with_kwargs": 0, "without_kwargs": 0}

class Dummy:
def __init__(self):
self._buf = b""

def update(self, b):
self._buf += b

def hexdigest(self):
return "ok"

def func(**kwargs):
if kwargs:
calls["with_kwargs"] += 1
raise TypeError("unexpected kwarg")
calls["without_kwargs"] += 1
return Dummy()

hasher = Checksums.get_hasher(func)
assert isinstance(hasher, Dummy)
assert calls["with_kwargs"] == 1
assert calls["without_kwargs"] == 1


@responses.activate
def test_deploy_artifact_with_checksum_success(mocker):
responses.add(responses.PUT, f"{URL}/{ARTIFACT_PATH}", status=200)
Expand Down
Loading