diff --git a/model_signing/manifest/in_toto.py b/model_signing/manifest/in_toto.py deleted file mode 100644 index d3cdb39f..00000000 --- a/model_signing/manifest/in_toto.py +++ /dev/null @@ -1,131 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" -This package provides functionality to convert file based manifests -to in-toto statements. It is necessary because sigstore does not -support arbitrary payloads in DSSE envelopes. -""" -import pathlib - -from in_toto_attestation.v1 import statement -from in_toto_attestation.v1 import resource_descriptor - -from model_signing.hashing.hashing import Digest -from model_signing.manifest import manifest -from model_signing.manifest.manifest import FileLevelManifest -from model_signing.manifest.manifest import FileManifestItem - - -_PREDICATE_TYPE = 'model_signing/v1/manifest' -_FILE_LEVEL_MANIFEST = 'FileLevelManifest' - - -def _file_level_manifest_to_statement( - manifest: FileLevelManifest, - ) -> statement.Statement: - """ - Converts a model signing FileLevelManifest to an - in-toto statement. 
- - Args: - manifest (FileLevelManifest): the manifest to convert - algorithm (str): the used hash algorithm - - Returns: - statement.Statement: the in-toto statement representing the manifest - """ - subjects: list[resource_descriptor.ResourceDescriptor] = [] - for path, digest in manifest.files.items(): - s = resource_descriptor.ResourceDescriptor( - name=str(path), - digest={digest.algorithm: digest.digest_hex}, - ).pb - subjects.append(s) - return statement.Statement( - subjects=subjects, - predicate_type=_PREDICATE_TYPE, - predicate={'manifest_type': _FILE_LEVEL_MANIFEST}, - ) - - -def _statement_to_file_level_manifest( - statement: statement.Statement, - ) -> FileLevelManifest: - """ - Converts an in-toto statement to a FileLevelManifest. - - Args: - statement (statement.Statement): the in-toto statement - algorithm (str): the hash algorithm used - - Returns: - FileLevelManifest: the resutling FileLevelManifest - """ - items: list[FileManifestItem] = [] - for s in statement.pb.subject: - # no support for multiple hashes - alg, dig = list(s.digest.items())[0] - items.append( - FileManifestItem( - path=pathlib.Path(s.name), - digest=Digest( - algorithm=alg, - digest_value=bytes.fromhex(dig), - ) - ) - ) - return FileLevelManifest(items) - - -def manifest_to_statement( - model_manifest: manifest.Manifest - ) -> statement.Statement: - """Converts a manifest to an in-toto statement - - Args: - model_manifest (manifest.Manifest): the manifest - - Raises: - ValueError: for non supported manifest types - - Returns: - statement.Statement: the resulting in-toto statement - """ - # TODO(#248): support for the other manifest types - if isinstance(model_manifest, manifest.FileLevelManifest): - return _file_level_manifest_to_statement(model_manifest) - raise ValueError('manifest type not supported') - - -def statement_to_manifest( - stmnt: statement.Statement - ) -> manifest.Manifest: - """Converts a statement to a manifest type. 
- - The type of the manifest depends on the in-tota statments - `manifest_type` predicate. - - Args: - stmnt (statement.Statement): the statement - - Raises: - ValueError: for non supported manifest types - - Returns: - manifest.Manifest: manifest according to the statement - """ - # TODO(#248): support for the other manifest types - if stmnt.pb.predicate['manifest_type'] == _FILE_LEVEL_MANIFEST: - return _statement_to_file_level_manifest(stmnt) - raise ValueError('manifest type not supported') diff --git a/model_signing/manifest/in_toto_test.py b/model_signing/manifest/in_toto_test.py deleted file mode 100644 index 06d7d111..00000000 --- a/model_signing/manifest/in_toto_test.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import pathlib - -from model_signing.hashing import hashing -from model_signing.manifest import in_toto -from model_signing.manifest import manifest - -_MANIFEST = manifest.FileLevelManifest([ - manifest.FileManifestItem( - path=pathlib.Path('/abc'), - digest=hashing.Digest('FANCY_HASH', digest_value=b'\x41\x41\x41\x41')) -]) - - -def test_conversion(): - stmnt = in_toto.manifest_to_statement(_MANIFEST) - man = in_toto.statement_to_manifest(stmnt) - assert man == _MANIFEST diff --git a/model_signing/serialize.py b/model_signing/serialize.py deleted file mode 100644 index 3c3ba0f1..00000000 --- a/model_signing/serialize.py +++ /dev/null @@ -1,392 +0,0 @@ -# Copyright 2024 The Sigstore Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import hashlib -import base64 -import os -from concurrent.futures import ProcessPoolExecutor -from multiprocessing import get_start_method, set_start_method -from pathlib import Path -import platform - -# Use for testing while keeping disk size low. -allow_symlinks = False - - -class Hasher: - @staticmethod - def node_header(name: str, ty: str) -> bytes: - header = ty.encode('utf-8') + b'.' + \ - base64.b64encode(name.encode('utf-8')) + b'.' 
- return header - - @staticmethod - def root_folder(path: Path, content: bytes) -> str: - return Hasher._node_folder_compute(name="root", content=content) - - @staticmethod - def node_folder(path: Path, content: bytes) -> str: - return Hasher._node_folder_compute(name=path.name, content=content) - - @staticmethod - def _node_folder_compute(name: str, content: bytes) -> bytes: - value = Hasher.node_header(name, "dir") + content - return hashlib.sha256(value).digest() - - @staticmethod - def root_file(path: Path, chunk: int) -> bytes: - return Hasher._node_file_compute(path, b'', chunk) - - @staticmethod - def node_file(path: Path, chunk: int = 0) -> bytes: - if not path.is_file(): - raise ValueError(f"path {path} is not a file") - header = Hasher.node_header(path.name, "file") - return Hasher._node_file_compute(path, header, chunk) - - @staticmethod - def _node_file_compute(path: Path, header: bytes, chunk: int) -> bytes: - h = hashlib.sha256(header) - with open(path, "rb") as f: - if chunk == 0: - all_data = f.read() - h.update(all_data) - else: - # Compute the hash by reading chunk bytes at a time. - while True: - chunk_data = f.read(chunk) - if not chunk_data: - break - h.update(chunk_data) - return h.digest() - - @staticmethod - def _node_file_compute_v1(path: Path, header: bytes, - start: int, end: int, chunk: int) -> bytes: - h = hashlib.sha256(header) - with open(path, "rb") as f: - # WARNING: We must start reading the file at the starting offset. - f.seek(start) - # Read all at once. - if chunk == 0 or chunk >= (end - start): - content = f.read(end - start) - # print(f"all: {f.name}: {start}-{end}") - h.update(content) - else: - # Compute the hash by reading chunk bytes at a time. 
- remains = end - start - while remains != 0: - # read = (end - start) - remains - # print(f"loop {i}: {f.name}: - # {read}-{read + min(chunk, remains)}") - processed = min(chunk, remains) - chunk_data = f.read(processed) - if processed != len(chunk_data): - raise ValueError("internal: unread bytes: " + - f"{processed} != {len(chunk_data)}") - if not chunk_data: - raise ValueError("internal: no data: " + - f"filename={str(path)}, " + - f"remains={remains}, " + - f"{processed} != {len(chunk_data)}") - h.update(chunk_data) - remains -= processed - return h.digest() - - -def remove_prefix(text, prefix): - if text.startswith(prefix): - return text[len(prefix):] - return text - - -def validate_signature_path(model_path: Path, sig_path: Path): - if model_path.is_file(): - return - # Note: Only allow top-level folder to have the signature for simplicity. - if sig_path is not None and sig_path.is_relative_to(model_path) and \ - sig_path.parent != model_path: - raise ValueError(f"{sig_path} must be in the folder root") - - -def is_relative_to(p: Path, path_list: [Path]) -> bool: - for e in path_list: - if p.is_relative_to(e): - return True - return False - - -# TODO(): add a context "AI model"? -class Serializer: - @staticmethod - # TODO: type of returned value. - def _ordered_files(path: Path, ignorepaths: [Path]) -> []: - children: [Path] - if path.is_file(): - children = [path] - else: - # NOTE: the parent (..) and current directory (.) are not present. - # NOTE: this returns hidden files as well. - # TODO: tests that this pattern reports all files, - # regardless of their depth. - children = sorted(path.glob("**/*")) - - filtered = [] - total_size = 0 - for child in children: - if is_relative_to(child, ignorepaths): - continue - - # To avoid bugs where we read the link rather than its target, - # we don't allow symlinks for now. - # NOTE: It seems that Python's read() *always* follows symlinks, - # so it may be safe to allow them. 
(readlink() is the function - # to read the link metadata). - if not allow_symlinks and child.is_symlink(): - raise ValueError(f"{str(child)} is symlink") - - if not child.is_file() and not child.is_dir(): - raise ValueError(f"{str(child)} is not a dir or file") - - # The recorded path must *not* contains the folder name, - # since users may rename it. - record_path = remove_prefix( - str(child.as_posix()), str(path.as_posix() + '/')) - record_type = "file" if child.is_file() else "dir" - record_size = \ - os.path.getsize(str(child)) if record_type == "file" else 0 - filtered += [(record_path, record_type, record_size)] - total_size += record_size - return filtered - - @staticmethod - # TODO: type of returned value. - def _create_tasks(children: [], shard_size: int) -> [[]]: - tasks = [[]] * 0 - curr_file = 0 - curr_pos = 0 - - while True: - # All files have been processed. - if curr_file >= len(children): - break - - name, typ, size = children[curr_file] - - # It's a directory. - # NOTE: It is fast to compute the hash because there's no data - # besides the name and the type. - # TODO(#12): do we need this at all? This only matters - # if we care about empty directories, since non-empty ones have - # their file + path recorded. - if typ == "dir": - # Record the task. - tasks += [(name, typ, 0, size)] - curr_file += 1 - curr_pos = 0 - continue - - # It's a file. - - # Sanity checks. - if size <= curr_pos and size > 0: - raise ValueError(f"internal: size={size}, " + - f"curr_pos={curr_pos} " + - f"for {children[curr_file]}") - - # Compute the number of bytes to process. - remains = size - curr_pos - if remains < 0: - raise ValueError(f"internal: remains is {remains}") - processed = min(remains, shard_size) - end_pos = curr_pos + processed - - # Record the task. - tasks += [(name, typ, curr_pos, end_pos)] - - # Update position. - curr_pos += processed - - # If we have processed all bytes, we move on to the next file. 
- if remains == processed: - curr_file += 1 - curr_pos = 0 - return tasks - - @staticmethod - # TODO: type of tasks - def _run_tasks(path: Path, chunk: int, tasks: []) -> bytes: - # See https://superfastpython.com/processpoolexecutor-in-python/ - # NOTE: 32 = length of sha256 digest. - digest_len = 32 - all_hashes = [None] * (digest_len*len(tasks)) - org_len = len(all_hashes) - - # Use fork on Linux as it's supposed to be faster. - if platform.system() == "Linux" and get_start_method() != "fork": - set_start_method('fork') - with ProcessPoolExecutor() as ppe: - futures = [ppe.submit(Serializer.task, (path, chunk, task)) - for task in tasks] - results = [f.result() for f in futures] - for i, result in enumerate(results): - all_hashes[i*digest_len:(i+1)*digest_len] = result - # Sanity check. - if len(all_hashes) != org_len: - raise ValueError(f"internal: {len(all_hashes)} != {org_len}") - return bytes(all_hashes) - - @staticmethod - # TODO: type of task_info. - def task(task_info: []): - # NOTE: we can get process info using: - # from multiprocessing import current_process - # worker = current_process() - # print(f'Task {task_info}, - # worker name={worker.name}, pid={worker.pid}', flush=True) - - model_path, chunk, (name, ty, start_pos, end_pos) = task_info - - # Header format is: "type.b64(filename).start-end." - header = ty.encode('utf-8') + b'.' + \ - base64.b64encode(name.encode('utf-8')) + \ - b'.' + f"{start_pos}-{end_pos}".encode('utf-8') + b'.' - - # To hash a directory, we use "none" content. - # TODO(#12): do we need this at all? This only matters - # if we care about empty directories, since non-empty ones have - # their file + path recorded. - if ty == "dir": - value = header + b'none' - return hashlib.sha256(value).digest() - - # We need to hash a file. - - # The model is a directory. - if model_path.is_dir(): - return Hasher._node_file_compute_v1(model_path.joinpath(name), - header, start_pos, - end_pos, chunk) - - # The model is a single file. 
- # We update the file name to a generic "root". - header = ty.encode('utf-8') + b'.' + \ - base64.b64encode("root".encode('utf-8')) + \ - b'.' + f"{start_pos}-{end_pos}".encode('utf-8') + b'.' - return Hasher._node_file_compute_v1(name, - header, start_pos, end_pos, chunk) - - @staticmethod - def _serialize_v1(path: Path, chunk: int, shard: int, signature_path: Path, - ignorepaths: [Path] = []) -> bytes: - if not path.exists(): - raise ValueError(f"{str(path)} does not exist") - - if not allow_symlinks and path.is_symlink(): - raise ValueError(f"{str(path)} is a symlink") - - if chunk < 0: - raise ValueError(f"{str(chunk)} is invalid") - - if not path.is_file() and not path.is_dir(): - raise ValueError(f"{str(path)} is not a dir or file") - - # Validate the signature path. - validate_signature_path(path, signature_path) - - # Children to hash. - children = Serializer._ordered_files(path, - [signature_path] + ignorepaths) - - # We shard the computation by creating independent "tasks". - if shard < 0: - raise ValueError(f"{str(shard)} is invalid") - tasks = Serializer._create_tasks(children, shard) - - # Share the computation of hashes. - # For simplicity, we pre-allocate the entire array that will hold - # the concatenation of all hashes. - all_hashes = Serializer._run_tasks(path, chunk, tasks) - - # Finally, we hash everything. - return hashlib.sha256(bytes(all_hashes)).digest() - - def serialize_v1(path: Path, chunk: int, signature_path: Path, - ignorepaths: [Path] = []) -> bytes: - # NOTE: The shard size must be the same for all clients for - # compatibility. We could make it configurable; but in this - # case the signature file must contain the value used by the signer. 
- shard_size = 1000000000 # 1GB - return Serializer._serialize_v1(path, chunk, shard_size, - signature_path, ignorepaths) - - @staticmethod - def serialize_v0(path: Path, chunk: int, signature_path: Path, - ignorepaths: [Path] = []) -> bytes: - if not path.exists(): - raise ValueError(f"{str(path)} does not exist") - - if not allow_symlinks and path.is_symlink(): - raise ValueError(f"{str(path)} is a symlink") - - if chunk < 0: - raise ValueError(f"{str(chunk)} is invalid") - - if path.is_file(): - return Hasher.root_file(path, chunk) - - if not path.is_dir(): - raise ValueError(f"{str(path)} is not a dir") - - # Validate the signature path. - validate_signature_path(path, signature_path) - - children = sorted([x for x in path.iterdir() - if x != signature_path and x not in ignorepaths]) - # TODO: remove this special case? - if len(children) == 0: - return Hasher.root_folder(path, b"empty") - - hash = hashlib.sha256() - for child in children: - child_hash = Serializer._serialize_node(child, chunk, " ", - ignorepaths) - hash.update(child_hash) - content = hash.digest() - return Hasher.root_folder(path, content) - - @staticmethod - def _serialize_node(path: Path, chunk: int, indent="", - ignorepaths: [Path] = []) -> bytes: - if not allow_symlinks and path.is_symlink(): - raise ValueError(f"{str(path)} is a symlink") - - if path.is_file(): - return Hasher.node_file(path, chunk) - - if not path.is_dir(): - raise ValueError(f"{str(path)} is not a dir") - - children = sorted([x for x in path.iterdir() if x not in ignorepaths]) - # TODO: remove this special case? 
- if len(children) == 0: - return Hasher.node_folder(path, b"empty") - - hash = hashlib.sha256() - for child in children: - child_hash = Serializer._serialize_node(child, chunk, indent + " ", - ignorepaths) - hash.update(child_hash) - content = hash.digest() - return Hasher.node_folder(path, content) diff --git a/model_signing/serialize_test.py b/model_signing/serialize_test.py deleted file mode 100644 index 2afc46e8..00000000 --- a/model_signing/serialize_test.py +++ /dev/null @@ -1,818 +0,0 @@ -# Copyright 2024 The Sigstore Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -from pathlib import Path -import pytest -from model_signing.serialize import Serializer -import shutil - - -testdata_dir = "testdata" - - -# Utility functions. -def create_empty_folder(name: str) -> Path: - p = os.path.join(os.getcwd(), testdata_dir, name) - os.makedirs(p) - return Path(p) - - -def create_random_folders(name: str) -> (Path, int, [Path], [Path]): - p = os.path.join(os.getcwd(), testdata_dir, name) - - content = os.urandom(1) - dirs = [p] - # Generate 8 directories. - for i in range(8): - bit = (content[0] >> i) & 1 - if bit > 0: - # Add depth to the previously-created directory. - dirs[-1] = os.path.join(dirs[-1], "dir_%d" % i) - else: - # Add a directory in the same directory as the previous entry. 
- parent = os.path.dirname(dirs[-1]) - if Path(parent) == Path(p).parent: - parent = str(p) - dirs += [os.path.join(parent, "dir_%d" % i)] - for d in dirs: - os.makedirs(d) - - # Create at most 3 files in each directory. - files = [] - for d in dirs: - b = os.urandom(1) - n = b[0] & 3 - for i in range(n): - files += [os.path.join(d, "file_%d" % n)] - content = os.urandom(28) - with open(files[-1], "wb") as f: - f.write(content) - - return Path(p), 28, [Path(d) for d in sorted(dirs)], [Path(f) for f in sorted(files)] # noqa: E501 ignore long line warning - - -def create_symlinks(src: str, dst: str) -> Path: - psrc = os.path.join(os.getcwd(), testdata_dir, src) - pdst = os.path.join(os.getcwd(), testdata_dir, dst) - os.symlink(psrc, pdst) - return Path(dst) - - -def cleanup_model(p: Path) -> None: - if p.is_dir(): - shutil.rmtree(p) - elif p.is_file(): - os.unlink(p) - try: - os.unlink(p.with_suffix(".sig")) - except FileNotFoundError: - pass - - -def create_file(name: str, data: bytes) -> Path: - p = os.path.join(os.getcwd(), testdata_dir, name) - with open(p, "wb") as f: - f.write(data) - return Path(p) - - -def create_random_file(name: str, size: int) -> (Path, bytes): - p = os.path.join(os.getcwd(), testdata_dir, name) - content = os.urandom(size) - with open(p, "wb") as f: - f.write(content) - return Path(p), content - - -def signature_path(model: Path) -> Path: - if model.is_file(): - return model.with_suffix(".sig") - return model.joinpath("model.sig") - - -class Test_serialize_v0: - # File serialization works. 
- def test_known_file(self): - file = "v0_test_known_file" - data = b"hellow world content" - model = create_file(file, data) - sig_path = signature_path(model) - expected = b'x\x9d\xa4N\x9f\xeajd\xd8\x87\x84\x1a\xd3\xb3\xfc\xeb\xf6\r\x01\x9fi8#\xd8qU\x90\xca\x9d\x83\xe1\x8b' # noqa: E501 ignore long line warning - computed = Serializer.serialize_v0(model, 0, sig_path) - assert (computed == expected) - cleanup_model(model) - - # File serialization returns the same results for different chunk sizes. - def test_file_chunks(self): - file = "v0_test_file_chunks" - file_size = 999 - model, _ = create_random_file(file, file_size) - sig_path = signature_path(model) - result = Serializer.serialize_v0(model, 0, sig_path) - # NOTE: we want to also test a chunk size larger than the files size. - for c in range(1, file_size + 1): - r = Serializer.serialize_v0(model, c, sig_path) - assert (r == result) - cleanup_model(model) - - # File serialization raises error for negative chunk values. - def test_file_negative_chunks(self): - file = "v0_test_file_negative_chunks" - data = b"hellow world content" - model = create_file(file, data) - sig_path = signature_path(model) - with pytest.raises(ValueError): - _ = Serializer.serialize_v0(model, -1, sig_path) - cleanup_model(model) - - # File serialization returns the same results for different file names. - def test_different_filename(self): - file = "v0_test_different_filename" - data = b"hellow world content" - model = create_file(file, data) - sig_path = signature_path(model) - r0 = Serializer.serialize_v0(model, 0, sig_path) - cleanup_model(model) - - file = "v0_test_different_filename2" - model = create_file(file, data) - sig_path = signature_path(model) - r1 = Serializer.serialize_v0(model, 0, sig_path) - cleanup_model(model) - - assert (r0 == r1) - - # File serialization returns a different result for different model - # contents. 
- def test_altered_file(self): - file = "v0_test_altered_file" - file_size = 999 - model, content = create_random_file(file, file_size) - sig_path = signature_path(model) - result = Serializer.serialize_v0(model, 0, sig_path) - for c in range(file_size): - altered_content = content[:c] + bytes([content[c] ^ 1]) + \ - content[c+1:] - altered_file = file + (".%d" % c) - altered_model = create_file(altered_file, altered_content) - altered_sig_path = signature_path(altered_model) - altered_result = Serializer.serialize_v0(altered_model, 0, - altered_sig_path) - assert (altered_result != result) - cleanup_model(altered_model) - cleanup_model(model) - - # symlink in root folder raises ValueError exception. - def test_folder_symlink_root(self): - folder = "v0_test_folder_symlink_root" - model = create_empty_folder(folder) - sig = signature_path(model) - create_symlinks(".", os.path.join(folder, "root_link")) - with pytest.raises(ValueError): - Serializer.serialize_v0(Path(folder), 0, sig) - cleanup_model(model) - - # symlink in non-root folder raises ValueError exception. - def test_folder_symlink_nonroot(self): - model = create_empty_folder("v0_test_folder_symlink_nonroot") - sub_folder = model.joinpath("sub") - create_empty_folder(str(sub_folder)) - sig = signature_path(model) - create_symlinks(".", os.path.join(sub_folder, "sub_link")) - with pytest.raises(ValueError): - Serializer.serialize_v0(model, 0, sig) - cleanup_model(model) - - # Folder serialization works. 
- def test_known_folder(self): - folder = "v0_test_known_folder" - model = create_empty_folder(folder) - sig = signature_path(model) - os.mkdir(model.joinpath("dir1")) - os.mkdir(model.joinpath("dir2")) - os.mkdir(model.joinpath("dir3")) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - with open(model.joinpath("dir1", "f12"), "wb") as f: - f.write(b"content f12") - with open(model.joinpath("dir3", "f31"), "wb") as f: - f.write(b"content f31") - result = Serializer.serialize_v0(model, 0, sig) - expected = b's\xac\xf7\xbdC\x14\x97fv\x97\x9c\xd3\xe4=,\xe7\x99.d(oP\xff\xe2\xd8~\xa2\x9cS\xe2/\xd9' # noqa: E501 ignore long line warning - assert (result == expected) - cleanup_model(model) - - # Folder serialization raises error for negative chunk values. - def test_folder_negative_chunks(self): - dir = "v0_test_folder_negative_chunks" - model = create_empty_folder(dir) - sig_path = signature_path(model) - with pytest.raises(ValueError): - _ = Serializer.serialize_v0(model, -1, sig_path) - cleanup_model(model) - - # Folder serialization returns the same results for different folder names. - def test_different_dirname(self): - folder = "v0_test_different_dirname" - model = create_empty_folder(folder) - sig = signature_path(model) - os.mkdir(model.joinpath("dir1")) - os.mkdir(model.joinpath("dir2")) - os.mkdir(model.joinpath("dir3")) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - with open(model.joinpath("dir1", "f12"), "wb") as f: - f.write(b"content f12") - with open(model.joinpath("dir3", "f31"), "wb") as f: - f.write(b"content f31") - r0 = Serializer.serialize_v0(model, 0, sig) - - # Rename the folder. 
- new_model = model.parent.joinpath("model_dir2") - os.rename(model, new_model) - sig_path = signature_path(new_model) - r1 = Serializer.serialize_v0(new_model, 0, sig_path) - cleanup_model(new_model) - - assert (r0 == r1) - - # Folder serialization returns the same results for different folder or - # file names and / or file contents. - def test_different_ignored_paths(self): - folder = "v0_test_different_ignored_paths" - model = create_empty_folder(folder) - sig = signature_path(model) - os.mkdir(model.joinpath("dir1")) - os.mkdir(model.joinpath("dir2")) - os.mkdir(model.joinpath("dir2/dir3")) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - with open(model.joinpath("dir2", "f21"), "wb") as f: - f.write(b"content f21") - with open(model.joinpath("dir2/dir3", "f31"), "wb") as f: - f.write(b"content f31") - r0 = Serializer.serialize_v1(model, 0, sig) - r1 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir1")]) - r2 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir2")]) - r3 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir2/dir3")]) # noqa: E501 ignore long line warning - r4 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir2/dir3/f31")]) # noqa: E501 ignore long line warning - - # Sanity checks. - s = set({r0, r1, r2, r3, r4}) - assert (len(s) == 5) - - # Rename the file under dir1. - new_file = model.joinpath("dir1/f11_altered") - os.rename(model.joinpath("dir1/f11"), new_file) - r11 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir1")]) - assert (r11 == r1) - os.rename(new_file, model.joinpath("dir1/f11")) - - # Update the file under dir1. - r11 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir1")]) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11 altered") - assert (r11 == r1) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - - # Rename the folder dir2. 
- new_dir = model.joinpath("dir2/dir3_altered") - os.rename(model.joinpath("dir2/dir3"), new_dir) - r22 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir2")]) - assert (r22 == r2) - os.rename(new_dir, model.joinpath("dir2/dir3")) - - # Add a file under dir2. - with open(model.joinpath("dir2", "new_file"), "wb") as f: - f.write(b"new file!!") - r22 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir2")]) - assert (r22 == r2) - os.unlink(model.joinpath("dir2", "new_file")) - - # Update the content of f31 file. - with open(model.joinpath("dir2/dir3", "f31"), "wb") as f: - f.write(b"content f31 altered") - r22 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir2")]) - assert (r22 == r2) - r33 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir2/dir3")]) # noqa: E501 ignore long line warning - assert (r33 == r3) - r44 = Serializer.serialize_v0(model, 0, sig, [model.joinpath("dir2/dir3/f31")]) # noqa: E501 ignore long line warning - assert (r44 == r4) - with open(model.joinpath("dir2/dir3", "f31"), "wb") as f: - f.write(b"content f31") - - cleanup_model(model) - - # Folder serialization returns different results - # for an empty file or directory with the same name. - def test_file_dir(self): - folder = "v0_test_file_dir" - model = create_empty_folder(folder) - sig = signature_path(model) - os.mkdir(model.joinpath("dir1")) - os.mkdir(model.joinpath("dir2")) - os.mkdir(model.joinpath("dir3")) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - with open(model.joinpath("dir1", "f12"), "wb") as f: - f.write(b"content f12") - with open(model.joinpath("dir3", "f31"), "wb") as f: - f.write(b"content f31") - r0 = Serializer.serialize_v0(model, 0, sig) - - # Remove dir2 and create an empty file with the same name. 
- dir2 = model.joinpath("dir2") - os.rmdir(dir2) - with open(dir2, 'w') as _: - pass - r1 = Serializer.serialize_v0(model, 0, sig) - assert (r0 != r1) - cleanup_model(model) - - # Folder serialization return different values for different - # sub-directory names. - def test_random_folder_different_folder_names(self): - dir = "v0_test_random_folder_different_folder_names" - model, _, dirs, _ = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer.serialize_v0(model, 0, sig_path) - for d in dirs: - if d == model: - # Ignore the model folder. - continue - new_folder = d.parent.joinpath(d.name + "_altered") - os.rename(d, new_folder) - r = Serializer.serialize_v0(model, 0, sig_path) - os.rename(new_folder, d) - assert (r != result) - cleanup_model(model) - - # Folder serialization return different values for different file names. - def test_random_folder_different_filenames(self): - dir = "v0_test_random_folder_different_filenames" - model, _, _, files = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer.serialize_v0(model, 0, sig_path) - for f in files: - new_file = f.parent.joinpath(f.name + "_altered") - os.rename(f, new_file) - r = Serializer.serialize_v0(model, 0, sig_path) - os.rename(new_file, f) - assert (r != result) - cleanup_model(model) - - # Folder serialization return different values for different file contents. - def test_random_folder_different_file_content(self): - dir = "v0_test_random_folder_different_file_content" - model, _, _, files = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer.serialize_v0(model, 0, sig_path) - for f in files: - content = b'' - with open(f, "rb") as ff: - content = ff.read() - for c in range(len(content)): - # Alter the file content, one byte at a time. 
- altered_content = content[:c] + bytes([content[c] ^ 1]) + \ - content[c+1:] - with open(f, "wb") as ff: - ff.write(altered_content) - r = Serializer.serialize_v0(model, 0, sig_path) - assert (r != result) - # Write the original content back to the file. - with open(f, "wb") as ff: - ff.write(content) - cleanup_model(model) - - # Folder serialization return same results for different chunk sizes. - def test_random_folder_different_chunks(self): - dir = "v0_test_random_folder_different_chunks" - model, max_size, _, _ = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer.serialize_v0(model, 0, sig_path) - # NOTE: we want to also test a chunk size larger than the files size. - for c in range(1, max_size + 1): - r = Serializer.serialize_v0(model, c, sig_path) - assert (r == result) - cleanup_model(model) - - # Folder serialization raises an exception if the signature - # file is not in the root folder. - def test_folfer_invalid_sign_path(self): - dir = "v0_test_folfer_invalid_sign_path" - model = create_empty_folder(dir) - sig_path = model.joinpath("sub/model.sig") - with pytest.raises(ValueError): - _ = Serializer.serialize_v0(model, 0, sig_path) - cleanup_model(model) - - -class Test_serialize_v1: - # File serialization works. - def test_known_file(self): - file = "v1_test_known_file" - data = b"hellow world content" - model = create_file(file, data) - sig_path = signature_path(model) - expected = b'\xfd\xe0s^{ \xf8\xed\xb4\x9c\xbf\xc0\xf6\x87\x0f\x1a\x896~\xeeBH\xec\xf57<\x9d\x04B"7\xb1' # noqa: E501 ignore long line warning - computed = Serializer.serialize_v1(model, 0, sig_path) - assert (computed == expected) - cleanup_model(model) - - # File serialization returns the same results for different chunk sizes. 
- def test_file_chunks(self): - file = "v1_test_file_chunks" - file_size = 99 - model, _ = create_random_file(file, file_size) - sig_path = signature_path(model) - result = Serializer.serialize_v1(model, 0, sig_path) - # NOTE: we want to also test a chunk size larger than the files size. - for c in range(1, file_size + 1): - r = Serializer.serialize_v1(model, c, sig_path) - assert (r == result) - cleanup_model(model) - - # File serialization raises an exception for negative shard sizes. - def test_file_negative_shards(self): - file = "v1_test_file_negative_shards" - data = b"hellow world content" - model = create_file(file, data) - sig_path = signature_path(model) - with pytest.raises(ValueError): - _ = Serializer._serialize_v1(model, 0, -1, sig_path) - cleanup_model(model) - - # File serialization returns different results for different shard sizes. - def test_file_shards(self): - file = "v1_test_file_shards" - file_size = 99 - model, _ = create_random_file(file, file_size) - sig_path = signature_path(model) - result = Serializer._serialize_v1(model, 1, 1, sig_path) - results = [result] - for shard in range(2, file_size + 1): - r = Serializer._serialize_v1(model, 1, shard, sig_path) - assert (r not in results) - results += [r] - cleanup_model(model) - - # File serialization returns different results for different shard sizes - # but same results for different chunk sizes with shard size fixed. 
- def test_file_shard_chunks(self): - file = "v1_test_file_shard_chunks" - file_size = 21 - model, _ = create_random_file(file, file_size) - sig_path = signature_path(model) - result = Serializer._serialize_v1(model, 1, 1, sig_path) - results = [result] - for shard in range(2, file_size + 1): - r = Serializer._serialize_v1(model, 1, shard, sig_path) - assert (r not in results) - results += [r] - for c in range(1, file_size + 1): - rc = Serializer._serialize_v1(model, c, shard, sig_path) - assert (rc == r) - cleanup_model(model) - - # File serialization returns the same results for different file names. - def test_different_filename(self): - file = "v1_test_different_filename" - data = b"hellow world content" - model = create_file(file, data) - sig_path = signature_path(model) - r0 = Serializer.serialize_v1(model, 0, sig_path) - cleanup_model(model) - - file = "v1_test_different_filename2" - model = create_file(file, data) - sig_path = signature_path(model) - r1 = Serializer.serialize_v1(model, 0, sig_path) - cleanup_model(model) - - assert (r0 == r1) - - # File serialization returns a different result for different model - # contents. - def test_altered_file(self): - file = "v1_test_altered_file" - file_size = 99 - model, content = create_random_file(file, file_size) - sig_path = signature_path(model) - result = Serializer._serialize_v1(model, 0, 19, sig_path) - for c in range(file_size): - altered_content = content[:c] + bytes([content[c] ^ 1]) + \ - content[c+1:] - altered_file = file + (".%d" % c) - altered_model = create_file(altered_file, altered_content) - altered_sig_path = signature_path(altered_model) - altered_result = Serializer._serialize_v1(altered_model, 0, - 19, altered_sig_path) - assert (altered_result != result) - cleanup_model(altered_model) - cleanup_model(model) - - # File serialization works on large files. 
- def test_large_file(self): - file = "v1_test_large_file" - file_size = 1000100001 - model, _ = create_random_file(file, file_size) - sig_path = signature_path(model) - _ = Serializer.serialize_v1(model, 0, sig_path) - cleanup_model(model) - - # symlink in root folder raises ValueError exception. - def test_folder_symlink_root(self): - folder = "v1_test_folder_symlink_root" - model = create_empty_folder(folder) - sig = signature_path(model) - create_symlinks(".", os.path.join(folder, "root_link")) - with pytest.raises(ValueError): - Serializer.serialize_v1(Path(folder), 0, sig) - cleanup_model(model) - - # symlink in non-root folder raises ValueError exception. - def test_folder_symlink_nonroot(self): - model = create_empty_folder("v1_test_folder_symlink_nonroot") - sub_folder = model.joinpath("sub") - create_empty_folder(str(sub_folder)) - sig = signature_path(model) - create_symlinks(".", os.path.join(sub_folder, "sub_link")) - with pytest.raises(ValueError): - Serializer.serialize_v1(model, 0, sig) - cleanup_model(model) - - # Folder serialization works. - def test_known_folder(self): - folder = "v1_test_known_folder" - model = create_empty_folder(folder) - sig = signature_path(model) - os.mkdir(model.joinpath("dir1")) - os.mkdir(model.joinpath("dir2")) - os.mkdir(model.joinpath("dir3")) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - with open(model.joinpath("dir1", "f12"), "wb") as f: - f.write(b"content f12") - with open(model.joinpath("dir3", "f31"), "wb") as f: - f.write(b"content f31") - result = Serializer.serialize_v1(model, 0, sig) - expected = b'\x8b\xc3\xdc\xf1\xaf\xd8\x1b\x1f\xa0\x18&\x0eo|\xc4\xc6f~]]\xd6\x91\x15\x94-Vm\xf6\xa5\xed\xc8L' # noqa: E501 ignore long line warning - assert (result == expected) - cleanup_model(model) - - # Folder serialization raises error for negative chunk values. 
- def test_folder_negative_chunks(self): - dir = "v1_test_folder_negative_chunks" - model = create_empty_folder(dir) - sig_path = signature_path(model) - with pytest.raises(ValueError): - _ = Serializer.serialize_v1(model, -1, sig_path) - cleanup_model(model) - - # Folder serialization returns the same results for different folder names. - def test_different_dirname(self): - folder = "v1_test_different_dirname" - model = create_empty_folder(folder) - sig = signature_path(model) - os.mkdir(model.joinpath("dir1")) - os.mkdir(model.joinpath("dir2")) - os.mkdir(model.joinpath("dir3")) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - with open(model.joinpath("dir1", "f12"), "wb") as f: - f.write(b"content f12") - with open(model.joinpath("dir3", "f31"), "wb") as f: - f.write(b"content f31") - r0 = Serializer.serialize_v1(model, 0, sig) - - # Rename the folder. - new_model = model.parent.joinpath("model_dir2") - os.rename(model, new_model) - sig_path = signature_path(new_model) - r1 = Serializer.serialize_v1(new_model, 0, sig_path) - cleanup_model(new_model) - - assert (r0 == r1) - - # Folder serialization returns the same results for different folder or - # file names and / or file contents. 
- def test_different_ignored_paths(self): - folder = "v1_test_different_ignored_paths" - model = create_empty_folder(folder) - sig = signature_path(model) - os.mkdir(model.joinpath("dir1")) - os.mkdir(model.joinpath("dir2")) - os.mkdir(model.joinpath("dir2/dir3")) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - with open(model.joinpath("dir2", "f21"), "wb") as f: - f.write(b"content f21") - with open(model.joinpath("dir2/dir3", "f31"), "wb") as f: - f.write(b"content f31") - r0 = Serializer.serialize_v1(model, 0, sig) - r1 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir1")]) - r2 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir2")]) - r3 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir2/dir3")]) # noqa: E501 ignore long line warning - r4 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir2/dir3/f31")]) # noqa: E501 ignore long line warning - - # Sanity checks. - s = set({r0, r1, r2, r3, r4}) - assert (len(s) == 5) - - # Rename the file under dir1. - new_file = model.joinpath("dir1/f11_altered") - os.rename(model.joinpath("dir1/f11"), new_file) - r11 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir1")]) - assert (r11 == r1) - os.rename(new_file, model.joinpath("dir1/f11")) - - # Update the file under dir1. - r11 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir1")]) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11 altered") - assert (r11 == r1) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - - # Rename the folder dir2. - new_dir = model.joinpath("dir2/dir3_altered") - os.rename(model.joinpath("dir2/dir3"), new_dir) - r22 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir2")]) - assert (r22 == r2) - os.rename(new_dir, model.joinpath("dir2/dir3")) - - # Add a file under dir2. 
- with open(model.joinpath("dir2", "new_file"), "wb") as f: - f.write(b"new file!!") - r22 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir2")]) - assert (r22 == r2) - os.unlink(model.joinpath("dir2", "new_file")) - - # Update the content of f31 file. - with open(model.joinpath("dir2/dir3", "f31"), "wb") as f: - f.write(b"content f31 altered") - r22 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir2")]) - assert (r22 == r2) - r33 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir2/dir3")]) # noqa: E501 ignore long line warning - assert (r33 == r3) - r44 = Serializer.serialize_v1(model, 0, sig, [model.joinpath("dir2/dir3/f31")]) # noqa: E501 ignore long line warning - assert (r44 == r4) - with open(model.joinpath("dir2/dir3", "f31"), "wb") as f: - f.write(b"content f31") - - cleanup_model(model) - - # Folder serialization returns different results - # for an empty file or directory with the same name. - def test_file_dir(self): - folder = "v1_test_file_dir" - model = create_empty_folder(folder) - sig = signature_path(model) - os.mkdir(model.joinpath("dir1")) - os.mkdir(model.joinpath("dir2")) - os.mkdir(model.joinpath("dir3")) - with open(model.joinpath("dir1", "f11"), "wb") as f: - f.write(b"content f11") - with open(model.joinpath("dir1", "f12"), "wb") as f: - f.write(b"content f12") - with open(model.joinpath("dir3", "f31"), "wb") as f: - f.write(b"content f31") - r0 = Serializer.serialize_v1(model, 0, sig) - - # Remove dir2 and create an empty file with the same name. - dir2 = model.joinpath("dir2") - os.rmdir(dir2) - with open(dir2, 'w') as _: - pass - r1 = Serializer.serialize_v1(model, 0, sig) - assert (r0 != r1) - cleanup_model(model) - - # Folder serialization return different values for different - # sub-directory names. 
- def test_random_folder_different_folder_names(self): - dir = "v1_test_random_folder_different_folder_names" - model, _, dirs, _ = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer.serialize_v1(model, 0, sig_path) - for d in dirs: - if d == model: - # Ignore the model folder. - continue - new_folder = d.parent.joinpath(d.name + "_altered") - os.rename(d, new_folder) - r = Serializer.serialize_v1(model, 0, sig_path) - os.rename(new_folder, d) - assert (r != result) - cleanup_model(model) - - # Folder serialization return different values for different file names. - def test_random_folder_different_filenames(self): - dir = "v1_test_random_folder_different_filenames" - model, _, _, files = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer.serialize_v1(model, 0, sig_path) - for f in files: - new_file = f.parent.joinpath(f.name + "_altered") - os.rename(f, new_file) - r = Serializer.serialize_v1(model, 0, sig_path) - os.rename(new_file, f) - assert (r != result) - cleanup_model(model) - - # Folder serialization return different values for different file contents. - def test_random_folder_different_file_content(self): - dir = "v1_test_random_folder_different_file_content" - model, _, _, files = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer.serialize_v1(model, 0, sig_path) - for f in files: - content = b'' - with open(f, "rb") as ff: - content = ff.read() - for c in range(len(content)): - # Alter the file content, one byte at a time. - altered_content = content[:c] + bytes([content[c] ^ 1]) + \ - content[c+1:] - with open(f, "wb") as ff: - ff.write(altered_content) - r = Serializer.serialize_v1(model, 0, sig_path) - assert (r != result) - # Write the original content back to the file. - with open(f, "wb") as ff: - ff.write(content) - cleanup_model(model) - - # Folder serialization return same results for different chunk sizes. 
- def test_random_folder_different_chunks(self): - dir = "v1_test_random_folder_different_chunks" - model, max_size, _, _ = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer.serialize_v1(model, 0, sig_path) - # NOTE: we want to also test a chunk size larger than the files size. - for c in range(1, max_size + 1): - r = Serializer.serialize_v1(model, c, sig_path) - assert (r == result) - cleanup_model(model) - - # Folder serialization raises an exception if the signature - # file is not in the root folder. - def test_folfer_invalid_sign_path(self): - dir = "v1_test_folfer_invalid_sign_path" - model = create_empty_folder(dir) - sig_path = model.joinpath("sub/model.sig") - with pytest.raises(ValueError): - _ = Serializer.serialize_v1(model, 0, sig_path) - cleanup_model(model) - - # Folder serialization raises an exception for negative shard sizes. - def test_folder_negative_shards(self): - folder = "v1_test_folder_negative_shards" - model = create_empty_folder(folder) - sig_path = signature_path(model) - with pytest.raises(ValueError): - _ = Serializer._serialize_v1(model, 0, -1, sig_path) - cleanup_model(model) - - # Folder serialization returns different results for different shard sizes. - def test_folder_shards(self): - dir = "v1_test_folder_shards" - model, max_size, _, files = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer._serialize_v1(model, 1, 1, sig_path) - results = [result] - for shard in range(2, max_size + 1): - r = Serializer._serialize_v1(model, 1, shard, sig_path) - assert (r not in results) - results += [r] - cleanup_model(model) - - # Folder serialization returns different results for different shard sizes - # but same results for different chunk sizes with shard size fixed. 
- def test_folder_shard_chunks(self): - dir = "v1_test_folder_shard_chunks" - model, max_size, _, _ = create_random_folders(dir) - sig_path = signature_path(model) - result = Serializer._serialize_v1(model, 1, 1, sig_path) - results = [result] - for shard in range(2, max_size + 1): - r = Serializer._serialize_v1(model, 1, shard, sig_path) - assert (r not in results) - results += [r] - for c in range(1, max_size + 1): - rc = Serializer._serialize_v1(model, c, shard, sig_path) - assert (rc == r) - cleanup_model(model) diff --git a/model_signing/signature/fake.py b/model_signing/signature/fake.py index 6ef8ba8c..330cbb8c 100644 --- a/model_signing/signature/fake.py +++ b/model_signing/signature/fake.py @@ -20,7 +20,7 @@ from sigstore_protobuf_specs.dev.sigstore.common import v1 as common_pb from sigstore_protobuf_specs.io import intoto as intoto_pb -from model_signing.signature.signature import Signer +from model_signing.signature.signing import Signer from model_signing.signature.encoding import PAYLOAD_TYPE from model_signing.signature.verifying import Verifier diff --git a/model_signing/signing/in_toto.py b/model_signing/signing/in_toto.py index 66af0fac..cbfd4ec6 100644 --- a/model_signing/signing/in_toto.py +++ b/model_signing/signing/in_toto.py @@ -1,4 +1,5 @@ # Copyright 2024 The Sigstore Authors +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,14 +20,22 @@ envelope format is DSSE, see https://github.com/secure-systems-lab/dsse. 
""" +from pathlib import Path from typing import Final, Self +from google.protobuf import json_format from in_toto_attestation.v1 import statement +from in_toto_attestation.v1 import statement_pb2 +from in_toto_attestation.v1 import resource_descriptor_pb2 from typing_extensions import override +from sigstore_protobuf_specs.dev.sigstore.bundle import v1 as bundle_pb +from model_signing.hashing import hashing from model_signing.hashing import memory from model_signing.manifest import manifest as manifest_module from model_signing.signing import signing +from model_signing.signature import signing as signature_signing +from model_signing.signature import verifying as signature_verifying class IntotoPayload(signing.SigningPayload): @@ -40,6 +49,7 @@ class IntotoPayload(signing.SigningPayload): """ predicate_type: Final[str] + statement: statement.Statement class SingleDigestIntotoPayload(IntotoPayload): @@ -131,7 +141,7 @@ def _convert_descriptors_to_hashed_statement( *, predicate_type: str, predicate_top_level_name: str, -): +) -> statement.Statement: """Converts manifest descriptors to an in-toto statement with payload. Args: @@ -356,7 +366,7 @@ def from_manifest(cls, manifest: manifest_module.Manifest) -> Self: def _convert_descriptors_to_direct_statement( manifest: manifest_module.Manifest, predicate_type: str -): +) -> statement.Statement: """Converts manifest descriptors to an in-toto statement, as subjects. 
Args: @@ -586,3 +596,118 @@ def from_manifest(cls, manifest: manifest_module.Manifest) -> Self: manifest, predicate_type=cls.predicate_type ) return cls(statement) + + +def _convert_shard_subject( + res_desc: resource_descriptor_pb2.ResourceDescriptor + ) -> manifest_module.ShardedFileManifestItem: + name, start, end = res_desc.name.split(":") + digest = hashing.Digest( + algorithm=res_desc.annotations["actual_hash_algorithm"], + digest_value=bytearray.fromhex(res_desc.digest["sha256"]) + ) + return manifest_module.ShardedFileManifestItem( + path=Path(name), + start=int(start), + end=int(end), + digest=digest + ) + + +def _convert_shard_predicate( + shard: dict[str, str]) -> manifest_module.ShardedFileManifestItem: + name, start, end = shard["name"].split(":") + digest = hashing.Digest( + algorithm=shard["algorithm"], + digest_value=bytearray.fromhex(shard["digest"]) + ) + return manifest_module.ShardedFileManifestItem( + path=Path(name), + start=int(start), + end=int(end), + digest=digest + ) + + +class IntotoSignature(signing.Signature): + + def __init__(self, bundle: bundle_pb.Bundle): + self._bundle = bundle + + @override + def write(self, path: Path) -> None: + path.write_text(self._bundle.to_json()) + + @classmethod + @override + def read(cls, path: Path) -> Self: + bundle = bundle_pb.Bundle().from_json(path.read_text()) + return cls(bundle) + + def to_manifest(self) -> manifest_module.Manifest: + payload = self._bundle.dsse_envelope.payload + stmnt_pb = json_format.Parse(payload, statement_pb2.Statement()) + if stmnt_pb.predicate_type == ShardDigestsIntotoPayload.predicate_type: + return manifest_module.ShardLevelManifest( + items=[_convert_shard_subject(f) for f in stmnt_pb.subject] + ) + elif stmnt_pb.predicate_type == DigestsIntotoPayload.predicate_type: + return manifest_module.FileLevelManifest( + items=[manifest_module.FileManifestItem( + path=Path(s.name), + digest=hashing.Digest( + algorithm=s.annotations["actual_hash_algorithm"], + 
digest_value=bytearray.fromhex(s.digest["sha256"]) + ) + ) for s in stmnt_pb.subject] + ) + elif stmnt_pb.predicate_type == DigestOfShardDigestsIntotoPayload.predicate_type: + return manifest_module.ShardLevelManifest( + items=[_convert_shard_predicate(f) + for f in stmnt_pb.predicate["shards"]] + ) + elif stmnt_pb.predicate_type == DigestOfDigestsIntotoPayload.predicate_type: + return manifest_module.FileLevelManifest( + items=[manifest_module.FileManifestItem( + path=Path(f["name"]), + digest=hashing.Digest( + algorithm=f["algorithm"], + digest_value=bytearray.fromhex(f["digest"]) + ) + ) for f in stmnt_pb.predicate["files"]] + ) + elif stmnt_pb.predicate_type == SingleDigestIntotoPayload.predicate_type: + return manifest_module.DigestManifest( + digest=hashing.Digest( + algorithm=stmnt_pb.predicate["actual_hash_algorithm"], + digest_value=bytearray.fromhex( + stmnt_pb.subject[0].digest["sha256"]) + )) + else: + raise TypeError(f"{stmnt_pb.type} is not supported") + + +class IntotoSigner(signing.Signer): + + def __init__(self, sig_signer: signature_signing.Signer): + self._sig_signer = sig_signer + + @override + def sign(self, payload: signing.SigningPayload) -> signing.Signature: + if not isinstance(payload, IntotoPayload): + raise TypeError("only IntotoPayloads are supported") + bundle = self._sig_signer.sign(payload.statement) + return IntotoSignature(bundle) + + +class IntotoVerifier(signing.Verifier): + + def __init__(self, sig_verifier: signature_verifying.Verifier): + self._sig_verifier = sig_verifier + + @override + def verify(self, signature: signing.Signature) -> manifest_module.Manifest: + if not isinstance(signature, IntotoSignature): + raise TypeError("only IntotoSignature is supported") + self._sig_verifier.verify(signature._bundle) + return signature.to_manifest() diff --git a/model_signing/signing/in_toto_test.py b/model_signing/signing/in_toto_test.py index 51ccbe0a..6b7bc92e 100644 --- a/model_signing/signing/in_toto_test.py +++ 
b/model_signing/signing/in_toto_test.py
@@ -34,6 +34,7 @@
 from model_signing.serialization import serialize_by_file
 from model_signing.serialization import serialize_by_file_shard
 from model_signing.signing import in_toto
+from model_signing.signature import fake
 
 
 class TestSingleDigestIntotoPayload:
@@ -326,3 +327,52 @@ def test_only_runs_on_expected_manifest_types(self):
             match="Only ShardLevelManifest is supported",
         ):
             in_toto.ShardDigestsIntotoPayload.from_manifest(manifest)
+
+
+class TestIntotoSignature:
+    """Checks that signed in-toto payloads convert back to manifests."""
+
+    def _shard_hasher_factory(
+        self, path: pathlib.Path, start: int, end: int
+    ) -> file.ShardedFileHasher:
+        """Returns a SHA256-based hasher for the given shard of `path`."""
+        return file.ShardedFileHasher(
+            path, memory.SHA256(), start=start, end=end
+        )
+
+    def _hasher_factory(
+        self, path: pathlib.Path,
+    ) -> file.SimpleFileHasher:
+        """Returns a SHA256-based `SimpleFileHasher` for `path`."""
+        return file.SimpleFileHasher(
+            path, memory.SHA256()
+        )
+
+    def test_to_manifest(self, sample_model_folder):
+        signer = in_toto.IntotoSigner(fake.FakeSigner())
+
+        # Shard-level manifests must survive a sign / to_manifest round trip.
+        shard_serializer = serialize_by_file_shard.ManifestSerializer(
+            self._shard_hasher_factory, allow_symlinks=True,
+        )
+        shard_manifest = shard_serializer.serialize(sample_model_folder)
+        for payload_type in [
+                in_toto.ShardDigestsIntotoPayload,
+                in_toto.DigestOfShardDigestsIntotoPayload]:
+            payload = payload_type.from_manifest(shard_manifest)
+            sig = signer.sign(payload)
+            manifest = sig.to_manifest()
+            assert shard_manifest == manifest
+
+        # The same must hold for file-level manifests.
+        file_serializer = serialize_by_file.ManifestSerializer(
+            self._hasher_factory, allow_symlinks=True,
+        )
+        file_manifest = file_serializer.serialize(sample_model_folder)
+        for payload_type in [
+                in_toto.DigestOfDigestsIntotoPayload,
+                in_toto.DigestsIntotoPayload]:
+            payload = payload_type.from_manifest(file_manifest)
+            sig = signer.sign(payload)
+            manifest = sig.to_manifest()
+            assert file_manifest == manifest
diff --git a/model_signing/verify.py b/model_signing/verify.py
index f0d9dba5..458c39cc 100644
--- a/model_signing/verify.py
+++
b/model_signing/verify.py @@ -149,7 +149,7 @@ def hasher_factory(file_path: pathlib.Path) -> file.FileHasher: verifier=verifier, model_path=args.model_path, serializer=serializer, - ignore_paths=[args.sig_path.name]) + ignore_paths=[args.sig_path]) except verifying.VerificationError as err: log.error(f'verification failed: {err}')