diff --git a/pulp_rpm/app/tasks/signing.py b/pulp_rpm/app/tasks/signing.py index aac448565..3f340cde1 100644 --- a/pulp_rpm/app/tasks/signing.py +++ b/pulp_rpm/app/tasks/signing.py @@ -5,6 +5,7 @@ from pulpcore.plugin.util import get_url from pulp_rpm.app.models.content import RpmPackageSigningService +from pulp_rpm.app.shared_utils import get_sha256 def sign_and_create( @@ -23,12 +24,21 @@ def sign_and_create( package_signing_service = RpmPackageSigningService.objects.get(pk=signing_service_pk) uploaded_package = PulpTemporaryFile.objects.get(pk=temporary_file_pk) with NamedTemporaryFile(mode="wb", dir=".", delete=False) as final_package: - with uploaded_package.file.open() as unsigned_package_file: - final_package.write(unsigned_package_file.read()) - final_package.flush() - package_signing_service.sign(final_package.name, pubkey_fingerprint=signing_fingerprint) + print("*" * 100) + fd = uploaded_package.file.open() + final_package.write(fd.read()) + final_package.flush() + fd.close() + final_package.seek(0) + print(f"{signing_fingerprint}") + print("digest_before", get_sha256(final_package.name)) + package_signing_service.sign(final_package.name, pubkey_fingerprint=signing_fingerprint) + final_package.seek(0) artifact = Artifact.init_and_validate(final_package.name) + print("local_pkg_diges", get_sha256(final_package.name)) + print(f"{artifact.sha256=}") + print("*" * 100) artifact.save() resource = CreatedResource(content_object=artifact) resource.save() diff --git a/pulp_rpm/tests/functional/api/test_package_signing.py b/pulp_rpm/tests/functional/api/test_package_signing.py index 78f38cc85..ccdd37d53 100644 --- a/pulp_rpm/tests/functional/api/test_package_signing.py +++ b/pulp_rpm/tests/functional/api/test_package_signing.py @@ -1,16 +1,19 @@ +from pathlib import Path + import pytest import requests -from pulpcore.client.pulp_rpm import RpmRpmPublication from pulpcore.exceptions.validation import InvalidSignatureError from pulp_rpm.app.shared_utils import RpmTool -from pulp_rpm.tests.functional.constants import ( - RPM_PACKAGE_FILENAME, - RPM_UNSIGNED_URL, -) +from pulp_rpm.tests.functional.constants import RPM_PACKAGE_FILENAME, RPM_UNSIGNED_URL from pulp_rpm.tests.functional.utils import get_package_repo_path +def get_fixture(path: Path, url: str) -> Path: + path.write_bytes(requests.get(url).content) + return path + + @pytest.mark.parallel def test_register_rpm_package_signing_service(rpm_package_signing_service): """ @@ -20,6 +23,25 @@ def test_register_rpm_package_signing_service(rpm_package_signing_service): assert "/api/v3/signing-services/" in service.pulp_href +@pytest.fixture +def signing_gpg_extra(signing_gpg_metadata): + """GPG instance with an extra gpg keypair registered.""" + PRIVATE_KEY_PULP_QE = ( + "https://raw.githubusercontent.com/pulp/pulp-fixtures/master/common/GPG-PRIVATE-KEY-pulp-qe" + ) + gpg, fingerprint_a, keyid_a = signing_gpg_metadata + + response_private = requests.get(PRIVATE_KEY_PULP_QE) + response_private.raise_for_status() + import_result = gpg.import_keys(response_private.content) + fingerprint_b = import_result.fingerprints[0] + gpg.trust_keys(fingerprint_b, "TRUST_ULTIMATE") + + pubkey_a = gpg.export_keys(fingerprint_a) + pubkey_b = gpg.export_keys(fingerprint_b) + return fingerprint_a, pubkey_a, fingerprint_b, pubkey_b + + @pytest.mark.parallel def test_sign_package_on_upload( tmp_path, @@ -27,47 +49,70 @@ def test_sign_package_on_upload( monitor_task, gen_object_with_cleanup, download_content_unit, + signing_gpg_extra, rpm_package_signing_service, rpm_package_api, rpm_repository_factory, - rpm_publication_api, + rpm_publication_factory, rpm_package_factory, rpm_distribution_factory, ): - """Sign an Rpm Package with the Package Upload endpoint.""" - # Setup rpm package file to upload + """ + Sign an Rpm Package with the Package Upload endpoint. + + This ensures different + """ + # Setup RPM tool and package to upload + fingerprint_a, pubkey_a, fingerprint_b, pubkey_b = signing_gpg_extra + assert rpm_package_signing_service.pubkey_fingerprint == fingerprint_a + assert rpm_package_signing_service.pubkey_fingerprint != fingerprint_b + rpm_tool = RpmTool(tmp_path) - rpm_tool.import_pubkey_string(rpm_package_signing_service.public_key) + rpm_tool.import_pubkey_string(pubkey_a) + rpm_tool.import_pubkey_string(pubkey_b) + file_to_upload = tmp_path / RPM_PACKAGE_FILENAME file_to_upload.write_bytes(requests.get(RPM_UNSIGNED_URL).content) - - # Assure it is not signed with pytest.raises(InvalidSignatureError, match="The package is not signed: .*"): rpm_tool.verify_signature(file_to_upload) - # Create Repository with related signing service + # Upload Package to Repository with signing-option on + # The same file is uploaded, but signed with different keys each time + for fingerprint in (fingerprint_a, fingerprint_b): + repository = rpm_repository_factory( + package_signing_service=rpm_package_signing_service.pulp_href, + package_signing_pubkey=fingerprint, + ) + upload_response = rpm_package_api.create( + file=str(file_to_upload.absolute()), + repository=repository.pulp_href, + sign_package=True, + ) + package_a_href = monitor_task(upload_response.task).created_resources[2] + pkg_location_href = rpm_package_api.read(package_a_href).location_href + + # Verify that the final served package is signed + publication = rpm_publication_factory(repository=repository.pulp_href) + distribution = rpm_distribution_factory(publication=publication.pulp_href) + downloaded_package = tmp_path / "package.rpm" + downloaded_package.write_bytes( + download_content_unit(distribution.base_path, get_package_repo_path(pkg_location_href)) + ) + assert rpm_tool.verify_signature(downloaded_package) + + # Can't upload same file with same key + with pytest.raises(InvalidSignatureError, match="The package is not signed: .*"): + rpm_tool.verify_signature(file_to_upload) repository = rpm_repository_factory( package_signing_service=rpm_package_signing_service.pulp_href, - package_signing_pubkey=rpm_package_signing_service.pubkey_fingerprint, + package_signing_pubkey=fingerprint_a, ) + upload_response = rpm_package_api.create( + file=str(file_to_upload.absolute()), + repository=repository.pulp_href, + sign_package=True, + ) + package_a_href = monitor_task(upload_response.task) + # import epdb;epdb.serve(port=12345) - # Upload Package to Repository with signing-option on - upload_attrs = { - "file": str(file_to_upload.absolute()), - "repository": repository.pulp_href, - "sign_package": True, - } - upload_task_href = rpm_package_api.create(**upload_attrs).task - package_href = monitor_task(upload_task_href).created_resources[2] - package_loc_href = rpm_package_api.read(package_href).location_href - - # Verify that the final served package is signed - publish_data = RpmRpmPublication(repository=repository.pulp_href) - publication = gen_object_with_cleanup(rpm_publication_api, publish_data) - distribution = rpm_distribution_factory(publication=publication.pulp_href) - - pkg_path = get_package_repo_path(package_loc_href) - package_bytes = download_content_unit(distribution.base_path, pkg_path) - downloaded_package = tmp_path / "package.rpm" - downloaded_package.write_bytes(package_bytes) - assert rpm_tool.verify_signature(downloaded_package) + # Cant use unreachable/invalid repository.signing_package_pubkey diff --git a/pulp_rpm/tests/functional/conftest.py b/pulp_rpm/tests/functional/conftest.py index 3c42e68d5..21ab9a199 100644 --- a/pulp_rpm/tests/functional/conftest.py +++ b/pulp_rpm/tests/functional/conftest.py @@ -2,6 +2,7 @@ import json import subprocess import uuid +from dataclasses import dataclass from tempfile import NamedTemporaryFile import gnupg @@ -434,6 +435,42 @@ def _sign_with_rpm_package_signing_service(filename): return _sign_with_rpm_package_signing_service +@dataclass +class GPGMetadata: + public_key: str + fingerprint: str + keyid: str + + +@pytest.fixture(scope="session") +def signing_gpg_metadata2(signing_gpg_homedir_path) -> tuple[gnupg.GPG, list[GPGMetadata]]: + """ + A fixture that returns a GPG instance and related metadata (i.e., fingerprint, keyid). + """ + PRIVATE_KEY_URLS = ( + "https://raw.githubusercontent.com/pulp/pulp-fixtures/master/common/GPG-PRIVATE-KEY-fixture-signing", # noqa: E501 + "https://raw.githubusercontent.com/pulp/pulp-fixtures/master/common/GPG-PRIVATE-KEY-pulp-qe", # noqa: E501 + ) + + gpg = gnupg.GPG(gnupghome=signing_gpg_homedir_path) + keys = [] + for privatekey_url in PRIVATE_KEY_URLS: + response_private = requests.get(privatekey_url) + response_private.raise_for_status() + + gpg.import_keys(response_private.content) + key_info = gpg.list_keys()[-1] + gpg_md = GPGMetadata( + fingerprint=key_info["fingerprint"], + keyid=key_info["keyid"], + public_key=gpg.export_keys(key_info["keyid"]), + ) + gpg.trust_keys(gpg_md.fingerprint, "TRUST_ULTIMATE") + keys.append(gpg_md) + + return (gpg, keys) + + @pytest.fixture(scope="session") def signing_gpg_metadata(signing_gpg_homedir_path): """