From 2d1192c0dd978ff00689f01e0ab1407b0cdbc7c2 Mon Sep 17 00:00:00 2001 From: Matthias Dellweg Date: Mon, 25 Sep 2023 14:57:21 +0200 Subject: [PATCH] OpenPGP key support This adds a repository type as a keyring and content types to handle keys, keyids and key signatures. fixes #3024 --- CHANGES/3024.feature | 1 + ...penpgpkeyring_openpgppublickey_and_more.py | 227 ++++++++ pulpcore/app/models/__init__.py | 10 + pulpcore/app/models/openpgp.py | 233 ++++++++ pulpcore/app/openpgp.py | 529 ++++++++++++++++++ pulpcore/app/serializers/__init__.py | 5 + pulpcore/app/serializers/openpgp.py | 261 +++++++++ pulpcore/app/viewsets/__init__.py | 9 + pulpcore/app/viewsets/openpgp.py | 184 ++++++ pulpcore/pytest_plugin.py | 15 + pulpcore/tests/functional/api/test_openpgp.py | 146 +++++ 11 files changed, 1620 insertions(+) create mode 100644 CHANGES/3024.feature create mode 100644 pulpcore/app/migrations/0125_openpgpdistribution_openpgpkeyring_openpgppublickey_and_more.py create mode 100644 pulpcore/app/models/openpgp.py create mode 100644 pulpcore/app/openpgp.py create mode 100644 pulpcore/app/serializers/openpgp.py create mode 100644 pulpcore/app/viewsets/openpgp.py create mode 100644 pulpcore/tests/functional/api/test_openpgp.py diff --git a/CHANGES/3024.feature b/CHANGES/3024.feature new file mode 100644 index 0000000000..65eb912766 --- /dev/null +++ b/CHANGES/3024.feature @@ -0,0 +1 @@ +Added OpenPGP keyring repository type and OpenPGP key content type. diff --git a/pulpcore/app/migrations/0125_openpgpdistribution_openpgpkeyring_openpgppublickey_and_more.py b/pulpcore/app/migrations/0125_openpgpdistribution_openpgpkeyring_openpgppublickey_and_more.py new file mode 100644 index 0000000000..3b5f7a0617 --- /dev/null +++ b/pulpcore/app/migrations/0125_openpgpdistribution_openpgpkeyring_openpgppublickey_and_more.py @@ -0,0 +1,227 @@ +# Generated by Django 4.2.15 on 2024-10-11 08:29 + +from django.db import migrations, models +import django.db.models.deletion +import pulpcore.app.models.access_policy +import pulpcore.app.util + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0124_task_deferred_task_immediate"), + ] + + operations = [ + migrations.CreateModel( + name="OpenPGPDistribution", + fields=[ + ( + "distribution_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.distribution", + ), + ), + ], + options={ + "permissions": [ + ("manage_roles_openpgpdistribution", "Can manage roles on gem distributions") + ], + "default_related_name": "%(app_label)s_%(model_name)s", + }, + bases=("core.distribution", pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + migrations.CreateModel( + name="OpenPGPKeyring", + fields=[ + ( + "repository_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.repository", + ), + ), + ], + options={ + "permissions": [ + ("modify_openpgpkeyring", "Can modify content of the keyring"), + ("manage_roles_openpgpkeyring", "Can manage roles on keyrings"), + ("repair_openpgpkeyring", "Can repair repository versions"), + ], + "default_related_name": "%(app_label)s_%(model_name)s", + }, + bases=("core.repository", pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + migrations.CreateModel( + name="OpenPGPPublicKey", + fields=[ + ( + "content_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.content", + ), + ), + ("raw_data", models.BinaryField()), + ("fingerprint", models.CharField(max_length=64)), + ("created", models.DateTimeField()), + ( + "_pulp_domain", + models.ForeignKey( + default=pulpcore.app.util.get_domain_pk, + on_delete=django.db.models.deletion.PROTECT, + to="core.domain", + ), + ), + ], + options={ + "default_related_name": "%(app_label)s_%(model_name)s", + "unique_together": {("_pulp_domain", "fingerprint")}, + }, + bases=("core.content",), + ), + migrations.CreateModel( + name="OpenPGPUserID", + fields=[ + ( + "content_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.content", + ), + ), + ("raw_data", models.BinaryField()), + ("user_id", models.CharField()), + ( + "public_key", + models.ForeignKey( + on_delete=django.db.models.deletion.PROTECT, + related_name="user_ids", + to="core.openpgppublickey", + ), + ), + ], + options={ + "default_related_name": "%(app_label)s_%(model_name)s", + "unique_together": {("public_key", "user_id")}, + }, + bases=("core.content",), + ), + migrations.CreateModel( + name="OpenPGPUserAttribute", + fields=[ + ( + "content_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.content", + ), + ), + ("raw_data", models.BinaryField()), + ("sha256", models.CharField(max_length=128)), + ( + "public_key", + models.ForeignKey( + on_delete=django.db.models.deletion.PROTECT, + related_name="user_attributes", + to="core.openpgppublickey", + ), + ), + ], + options={ + "default_related_name": "%(app_label)s_%(model_name)s", + "unique_together": {("public_key", "sha256")}, + }, + bases=("core.content",), + ), + migrations.CreateModel( + name="OpenPGPSignature", + fields=[ + ( + "content_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.content", + ), + ), + ("raw_data", models.BinaryField()), + ("sha256", models.CharField(max_length=128)), + ("signature_type", models.PositiveSmallIntegerField()), + ("created", models.DateTimeField()), + ("expiration_time", models.DurationField(null=True)), + ("key_expiration_time", models.DurationField(null=True)), + ("issuer", models.CharField(max_length=16, null=True)), + ("signers_user_id", models.CharField(null=True)), + ( + "signed_content", + models.ForeignKey( + on_delete=django.db.models.deletion.PROTECT, + related_name="openpgp_signatures", + to="core.content", + ), + ), + ], + options={ + "default_related_name": "%(app_label)s_%(model_name)s", + "unique_together": {("signed_content", "sha256")}, + }, + bases=("core.content",), + ), + migrations.CreateModel( + name="OpenPGPPublicSubkey", + fields=[ + ( + "content_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.content", + ), + ), + ("raw_data", models.BinaryField()), + ("fingerprint", models.CharField(max_length=64)), + ("created", models.DateTimeField()), + ( + "public_key", + models.ForeignKey( + on_delete=django.db.models.deletion.PROTECT, + related_name="public_subkeys", + to="core.openpgppublickey", + ), + ), + ], + options={ + "default_related_name": "%(app_label)s_%(model_name)s", + "unique_together": {("public_key", "fingerprint")}, + }, + bases=("core.content",), + ), + ] diff --git a/pulpcore/app/models/__init__.py b/pulpcore/app/models/__init__.py index 4eb55919d3..02f3aac228 100644 --- a/pulpcore/app/models/__init__.py +++ b/pulpcore/app/models/__init__.py @@ -92,3 +92,13 @@ # Moved here to avoid a circular import with GroupProgressReport from .replica import UpstreamPulp + +from .openpgp import ( + OpenPGPDistribution, + OpenPGPKeyring, + OpenPGPPublicKey, + OpenPGPPublicSubkey, + OpenPGPSignature, + OpenPGPUserAttribute, + OpenPGPUserID, +) diff --git a/pulpcore/app/models/openpgp.py b/pulpcore/app/models/openpgp.py new file mode 100644 index 0000000000..00b3ff9fcc --- /dev/null +++ b/pulpcore/app/models/openpgp.py @@ -0,0 +1,233 @@ +import re + +from aiohttp.web_response import Response +from django.db import models +from django.utils import timezone +from pulpcore.app.models import AutoAddObjPermsMixin, Content, Distribution, Repository +from pulpcore.app.openpgp import wrap_armor +from pulpcore.app.util import get_domain_pk, gpg_verify + + +def _openpgp_packlen(length): + if length < 192: + return length.to_bytes(1, "big") + if length < 8384: + return (length + 48960).to_bytes(2, "big") + return b"\xff" + length.to_bytes(4, "big") + + +class _OpenPGPContent(Content): + # WARNING! This is an abstact class. + # Never export it in the plugin api! + raw_data = models.BinaryField() + + def packet(self): + return ( + (0xC0 | self.PACKAGE_TYPE).to_bytes(1, "big") + + _openpgp_packlen(len(self.raw_data)) + + self.raw_data + ) + + class Meta: + abstract = True + + +class OpenPGPPublicKey(_OpenPGPContent): + TYPE = "openpgp_publickey" + repo_key_fields = ("fingerprint",) + PACKAGE_TYPE = 6 + + fingerprint = models.CharField(max_length=64) + created = models.DateTimeField() + _pulp_domain = models.ForeignKey("core.Domain", default=get_domain_pk, on_delete=models.PROTECT) + + def represent(self, repository_version=None): + if repository_version: + content_filter = {"pk__in": repository_version.content} + else: + content_filter = {} + data = self.packet() + for signature in self.openpgp_signatures.filter(**content_filter): + data += signature.packet() + for user_id in self.user_ids.filter(**content_filter): + data += user_id.packet() + for signature in user_id.openpgp_signatures.filter(**content_filter): + data += signature.packet() + for user_attribute in self.user_attributes.filter(**content_filter): + data += user_attribute.packet() + for signature in user_attribute.openpgp_signatures.filter(**content_filter): + data += signature.packet() + for public_subkey in self.public_subkeys.filter(**content_filter): + data += public_subkey.packet() + for signature in public_subkey.openpgp_signatures.filter(**content_filter): + data += signature.packet() + return wrap_armor(data) + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + unique_together = ( + "_pulp_domain", + "fingerprint", + ) + + +class OpenPGPPublicSubkey(_OpenPGPContent): + TYPE = "openpgp_publicsubkey" + repo_key_fields = ("public_key", "fingerprint") + PACKAGE_TYPE = 14 + + public_key = models.ForeignKey( + OpenPGPPublicKey, related_name="public_subkeys", on_delete=models.PROTECT + ) + fingerprint = models.CharField(max_length=64) + created = models.DateTimeField() + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + unique_together = ( + "public_key", + "fingerprint", + ) + + +class OpenPGPUserID(_OpenPGPContent): + TYPE = "openpgp_userid" + repo_key_fields = ("public_key", "user_id") + PACKAGE_TYPE = 13 + + public_key = models.ForeignKey( + OpenPGPPublicKey, related_name="user_ids", on_delete=models.PROTECT + ) + user_id = models.CharField() + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + unique_together = ("public_key", "user_id") + + +class OpenPGPUserAttribute(_OpenPGPContent): + TYPE = "openpgp_userattribute" + repo_key_fields = ("public_key", "sha256") + PACKAGE_TYPE = 17 + + sha256 = models.CharField(max_length=128) + public_key = models.ForeignKey( + OpenPGPPublicKey, related_name="user_attributes", on_delete=models.PROTECT + ) + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + unique_together = ("public_key", "sha256") + + +class OpenPGPSignature(_OpenPGPContent): + TYPE = "openpgp_signature" + repo_key_fields = ("signed_content", "sha256") + PACKAGE_TYPE = 2 + + sha256 = models.CharField(max_length=128) + signature_type = models.PositiveSmallIntegerField() + created = models.DateTimeField() # 2 + expiration_time = models.DurationField(null=True) # 3 + key_expiration_time = models.DurationField(null=True) # 9 + issuer = models.CharField(max_length=16, null=True) # 16 + signers_user_id = models.CharField(null=True) # 28 + signed_content = models.ForeignKey( + Content, related_name="openpgp_signatures", on_delete=models.PROTECT + ) + + @property + def expired(self): + return self.expiration_time and timezone.now() > self.created + self.expiration_time + + @property + def key_expired(self): + if self.signature_type == 0x18: + return ( + bool(self.key_expiration_time) + and timezone.now() > self.signed_content.cast().created + self.key_expiration_time + ) + # In case, we don't know or this is not applicable, return None, not False. + return None + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + unique_together = ( + "signed_content", + "sha256", + ) + + +class OpenPGPKeyring(Repository, AutoAddObjPermsMixin): + """ + A Repository to hold OpenPGP (rfc4880bis) public key material. + """ + + TYPE = "openpgp" + CONTENT_TYPES = [ + OpenPGPPublicKey, + OpenPGPUserID, + OpenPGPUserAttribute, + OpenPGPPublicSubkey, + OpenPGPSignature, + ] + + def gpg_verify(self, signature, detached_data=None): + public_keys = "\n".join( + [ + pubkey.represent(repository_version=self.latest_version()) + for pubkey in OpenPGPPublicKey.objects.filter(pk__in=self.latest_version().content) + ] + ) + return gpg_verify(public_keys, signature, detached_data) + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + permissions = [ + ("modify_openpgpkeyring", "Can modify content of the keyring"), + ("manage_roles_openpgpkeyring", "Can manage roles on keyrings"), + ("repair_openpgpkeyring", "Can repair repository versions"), + ] + + +class OpenPGPDistribution(Distribution, AutoAddObjPermsMixin): + """ + A Distribution to allow downloading OpenPGP keys. + """ + + TYPE = "openpgp" + SERVE_FROM_PUBLICATION = False + PATH_REGEX = re.compile(r"(?P[0-9a-fA-F]{16})\.pub") + + def content_handler(self, path): + if result := self.PATH_REGEX.fullmatch(path): + repository_version = self.repository_version or self.repository.latest_version() + if repository_version is None: + return None + key_id = result.group("key_id") + key = OpenPGPPublicKey.objects.filter( + pk__in=repository_version.content, fingerprint__iendswith=key_id + ).first() + if key is None: + return None + return Response( + text=key.represent(repository_version), + content_type="application/pgp-keys", + charset="us-ascii", + ) + return None + + def content_handler_list_directory(self, rel_path): + if rel_path == "": + repository_version = self.repository_version or self.repository.latest_version() + fingerprints = OpenPGPPublicKey.objects.filter( + pk__in=repository_version.content + ).values_list("fingerprint", flat=True) + return {fingerprint[-16:] + ".pub" for fingerprint in fingerprints} + return set() + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + permissions = [ + ("manage_roles_openpgpdistribution", "Can manage roles on gem distributions"), + ] diff --git a/pulpcore/app/openpgp.py b/pulpcore/app/openpgp.py new file mode 100644 index 0000000000..6dc68c76fc --- /dev/null +++ b/pulpcore/app/openpgp.py @@ -0,0 +1,529 @@ +import hashlib +from base64 import b64decode, b64encode + +from django.utils import timezone + +# Source of information: +# * rfc4880 +# * rfc4880bis +# * gnupg sources +# * https://datatracker.ietf.org/doc/html/draft-shaw-openpgp-hkp-00 + + +PACKET_TYPES = { + 1: "Public-Key Encrypted Session Key Packet", + 2: "Signature Packet", + 3: "Symmetric-Key Encrypted Session Key Packet", + 4: "One-Pass Signature Packet", + 5: "Secret-Key Packet", + 6: "Public-Key Packet", + 7: "Secret-Subkey Packet", + 8: "Compressed Data Packet", + 9: "Symmetrically Encrypted Data Packet", + 10: "Marker Packet", + 11: "Literal Data Packet", + 12: "Trust Packet", + 13: "User ID Packet", + 14: "Public-Subkey Packet", + 17: "User Attribute Packet", + 18: "Sym. Encrypted and Integrity Protected Data Packet", + 19: "Modification Detection Code Packet", + 20: "OCB Encrypted Data Packet", +} + + +SIG_SUBPACKAGE_TYPES = { + 2: "Signature Creation Time", + 3: "Signature Expiration Time", + 4: "Exportable Certification", + 5: "Trust Signature", + 6: "Regular Expression", + 7: "Revocable", + 9: "Key Expiration Time", + 10: "Placeholder for backward compatibility", + 11: "Preferred Symmetric Algorithms", + 12: "Revocation Key", + 16: "Issuer", + 20: "Notation Data", + 21: "Preferred Hash Algorithms", + 22: "Preferred Compression Algorithms", + 23: "Key Server Preferences", + 24: "Preferred Key Server", + 25: "Primary User ID", + 26: "Policy URI", + 27: "Key Flags", + 28: "Signer's User ID", + 29: "Reason for Revocation", + 30: "Features", + 31: "Signature Target", + 32: "Embedded Signature", + 33: "Issuer Fingerprint", + 34: "Preferred Encryption Modes", + 35: "Intended Recipient Fingerprint", + 37: "Attested Certifications", + 38: "Key Block", + 40: "Literal Data Meta Hash", + 41: "Trust Alias", +} + + +PUBKEY_ALGORITHMS = { + 1: { + "name": "RSA", + "format": "mm", + }, + 2: { + "name": "RSA Encrypt-Only", + "format": "mm", + }, + 3: { + "name": "RSA Sign-Only", + "format": "mm", + }, + 16: { + "name": "Elgamal", + "format": "mmm", + }, + 17: { + "name": "DSA", + "format": "mmmm", + }, + 18: { + "name": "ECDH", + "format": "omk", + }, + 19: { + "name": "ECDSA", + "format": "om", + }, + 22: { + "name": "EdDSA", + "format": "om", + }, +} + + +HASH_ALGORITHMS = { + 1: "md5", + 2: "sha1", + 3: "ripemd160", + 8: "sha256", + 9: "sha384", + 10: "sha512", + 11: "sha224", + 12: "sha3-256", + 14: "sha3-512", +} + + +SYMMETRIC_ALGORITHMS = { + 0: "Plaintext", + 1: "IDEA", + 2: "TripleDES", + 3: "CAST5", + 4: "Blowfish", + 7: "AES128", + 8: "AES192", + 9: "AES256", + 10: "Twofish", + 11: "Camellia 128", + 12: "Camellia 192", + 13: "Camellia 256", +} + + +COMPRESSION_ALGORITHMS = { + 0: "Uncompressed", + 1: "ZIP", + 2: "ZLIB", + 3: "BZip2", +} + + +ENCRYPTION_MODES = { + 1: "EAX", + 2: "OCB", +} + + +def packet_iter(data): + begin = 0 + pos = 0 + while pos < len(data): + packet_tag = data[pos] + pos += 1 + if not packet_tag & 0x80: + raise ValueError("Invalid Packet Tag") + new_format = bool(packet_tag & 0x40) + + if new_format: + packet_type = packet_tag & 0x1F + + if data[pos] < 0xC0: + # 1-octet length + length = data[pos] + pos += 1 + elif data[pos] < 0xE0: + # 2-octet length + length = ((data[pos] - 0xC0) << 8) + data[pos + 1] + 0xC0 + pos += 2 + elif data[pos] == 0xFF: + # 5-octet length + length = int.from_bytes(data[pos + 1 : pos + 5], "big") + pos += 5 + else: + # Partial body length + raise NotImplementedError("Partial Packet Body is not implemented") + else: + packet_type = (packet_tag & 0x3C) >> 2 + length_type = packet_tag & 0x03 + if length_type == 3: + # Indeterminate packet length + length = len(data) - pos + else: + length_bytes = 1 << length_type + length = int.from_bytes(data[pos : pos + length_bytes], "big") + pos += length_bytes + + if packet_type == 0: + raise ValueError("Invalid Packet Type") + yield { + "type": packet_type, + "body": data[pos : pos + length], + "raw": data[begin : pos + length], + } + pos += length + begin = pos + if pos != len(data): + raise ValueError("Broken Stream") + + +def subpacket_iter(data): + begin = 0 + pos = 0 + while pos < len(data): + if data[pos] < 0xC0: + # 1-octet length + length = data[pos] + pos += 1 + elif data[pos] < 0xD0: + # 2-octet length + length = ((data[pos] - 0xC0) << 8) + data[pos + 1] + 0xC0 + pos += 2 + elif data[pos] == 0xFF: + # 5-octet length + length = int.from_bytes(data[pos + 1 : pos + 5], "big") + pos += 5 + else: + raise ValueError("Partial packet lengths are not allowed.") + yield { + "type": data[pos] & 0x7F, + "critical": bool(data[pos] & 0x80), + "body": data[pos + 1 : pos + length], + "raw": data[begin : pos + length], + } + pos += length + begin = pos + if pos != len(data): + raise ValueError("Broken Stream") + + +def extract_mpi(data): + bit_length = int.from_bytes(data[0 : 0 + 2], "big") + length = (bit_length + 7) // 8 + return {"bit_length": bit_length, "body": data[2 : 2 + length], "raw": data[0 : 2 + length]} + + +def extract_oid_kdf(data): + # These two types use the same method to decribe the length, which is all we want. + length = data[0] + return {"body": data[1 : 1 + length], "raw": data[0 : 1 + length]} + + +def analyze_sig_subpackets(data): + signature_attributes = {} + for packet in subpacket_iter(data): + packet_type = packet["type"] + body = packet["body"] + if packet_type == 2: + signature_attributes["created"] = timezone.datetime.fromtimestamp( + int.from_bytes(body, "big") + ).astimezone() + elif packet_type == 3: + signature_attributes["expiration_time"] = timezone.timedelta( + seconds=int.from_bytes(body, "big") + ) + elif packet_type == 9: + signature_attributes["key_expiration_time"] = timezone.timedelta( + seconds=int.from_bytes(body, "big") + ) + elif packet_type == 16: + signature_attributes["issuer"] = body.hex() + elif packet_type == 28: + signature_attributes["signers_user_id"] = body.decode() + return signature_attributes + + +def analyze_signature(data, pubkey, signed_packet_type, signed_packet): + # Type 2 + version = data[0] + if version == 3: + raise NotImplementedError("Version 3 signatures are not implemented.") + elif version in [4, 5]: + signature_type = data[1] + # pubkey_algorithm = PUBKEY_ALGORITHMS.get(data[2]) # Unused here. + hash_algorithm = HASH_ALGORITHMS.get(data[3]) + hashed_size = (data[4] << 8) + data[5] + hashed_data = data[6 : 6 + hashed_size] + unhashed_size = (data[6 + hashed_size] << 8) + data[7 + hashed_size] + unhashed_data = data[8 + hashed_size : 8 + hashed_size + unhashed_size] + canary = data[8 + hashed_size + unhashed_size : 10 + hashed_size + unhashed_size] + # signature = data[10 + hashed_size + unhashed_size :] # Unused here. + + if signature_type in [0x18, 0x19, 0x28]: + # 0x18 Subkey Binding Signature + # 0x19 Primary Key Binding Signature + # 0x28 Subkey Revocation Signature + if signed_packet_type != 14: + raise ValueError("Out of band subkey key signature.") + if version == 4: + hash_payload = b"\x99" + len(signed_packet).to_bytes(2, "big") + signed_packet + else: # version == 5 + hash_payload = b"\x9A" + len(signed_packet).to_bytes(4, "big") + signed_packet + elif signature_type in [0x10, 0x11, 0x12, 0x13, 0x16, 0x30]: + # 0x10 - 0x13 Certification of a user id or attribute + # 0x16 Attested Key Signature + # 0x30 Certification Revocation Signature + if signed_packet_type == 13: + hash_payload = b"\xB4" + len(signed_packet).to_bytes(4, "big") + signed_packet + elif signed_packet_type == 17: + hash_payload = b"\xD1" + len(signed_packet).to_bytes(4, "big") + signed_packet + else: + raise ValueError("Out of band user ID or attribute signature.") + elif signature_type in [0x1F, 0x20, 0x30]: + # 0x1F Direct Key Signature + # 0x20 Key Revocation Signature + # 0x30 Certification Revocation Signature + if signed_packet_type != 6: + raise ValueError("Out of band key signature.") + hash_payload = b"" + else: + # 0x50 Third-Party Confirmation Signature (does this even apply to keys?) + raise NotImplementedError(f"Unsupported signature type {signature_type:#x}.") + + # Validate the signature against the canary value + if hash_algorithm is None: + raise ValueError(f"Unknown hash algorithm {data[3]:#x} used for signature.") + h = hashlib.new(hash_algorithm) + if version == 4: + h.update(b"\x99" + len(pubkey).to_bytes(2, "big") + pubkey) + else: # version == 5 + h.update(b"\x9A" + len(pubkey).to_bytes(4, "big") + pubkey) + h.update(hash_payload) + if version == 4: + h.update( + data[: 6 + hashed_size] + + b"\x04\xFF" + + ((6 + hashed_size) % (1 << 32)).to_bytes(4, "big") + ) + else: # version == 5 + h.update( + data[: 6 + hashed_size] + + b"\x05\xFF" + + ((6 + hashed_size) % (1 << 64)).to_bytes(8, "big") + ) + if not h.digest().startswith(canary): + raise ValueError("Signature canary mismatch") + + # Hash the signature packet for db-uniqueness + sha256 = hashlib.sha256(data).hexdigest() + signature_attributes = { + "sha256": sha256, + "signature_type": signature_type, + "raw_data": data, + } + # Hashed Subpackets + signature_attributes.update(analyze_sig_subpackets(hashed_data)) + # Unhashed Subpackets + signature_attributes.update(analyze_sig_subpackets(unhashed_data)) + return signature_attributes + else: + raise ValueError(f"Invalid Packet version {version}") + + +def analyze_user_id(data): + # Type 13 + user_id = data.decode() + return {"raw_data": data, "user_id": user_id} + + +def analyze_user_attribute(data): + # Type 17 + # Treat as an opaque packet for now. + sha256 = hashlib.sha256(data).hexdigest() + return {"raw_data": data, "sha256": sha256} + + +def analyze_pubkey(data): + # Type 5, 6, 7 or 14 + # Type 5 and 7 are actually secret key packages. They begin with the corresponding public key + # package. Secret bits are ignored by us here. + version = data[0] + created = timezone.datetime.fromtimestamp(int.from_bytes(data[1:5], "big")).astimezone() + if version == 3: + n = extract_mpi(data[8:]) + e = extract_mpi(data[8 + len(n["raw"])]) + fingerprint = hashlib.md5(n["body"] + e["body"]).hexdigest() + # expiration = int.from_bytes(data[5:7], "big") # Unused here. Kept for documentation. + pubkey_algorithm = PUBKEY_ALGORITHMS.get(data[7]) + elif version in [4, 5]: + pubkey_algorithm = PUBKEY_ALGORITHMS.get(data[5]) + if version == 4: + key_data = data[6:] + else: + key_data_len = int.from_bytes(data[6:10], "big") + key_data = data[10 : 10 + key_data_len] + pub_key_body = data[: 10 + key_data_len] + fingerprint = hashlib.sha256( + b"\x9a" + len(pub_key_body).to_bytes(4, "big") + pub_key_body + ).hexdigest() + if pubkey_algorithm and "format" in pubkey_algorithm: + pos = 0 + for item_type in pubkey_algorithm["format"]: + if item_type == "m": + # Multi precision integer + mpi = extract_mpi(key_data[pos:]) + pos += len(mpi["raw"]) + elif item_type == "o": + # OID + oid = extract_oid_kdf(key_data[pos:]) + pos += len(oid["raw"]) + elif item_type == "k": + # KDF parameters + kdf = extract_oid_kdf(key_data[pos:]) + pos += len(kdf["raw"]) + else: + raise RuntimeError("Unknown key material format.") + if version == 4: + key_data_len = pos + pub_key_body = data[: 6 + key_data_len] + fingerprint = hashlib.sha1( + b"\x99" + len(pub_key_body).to_bytes(2, "big") + pub_key_body + ).hexdigest() + else: + if version == 4: + # We needed to analyse the public key algorithm to calculate the fingerprint of + # version 4 keys. Version 5 keys do not have this limitation, and we can get away + # with an unknown algorithm. + raise ValueError("Unknown public key algorithm.") + else: + raise ValueError(f"Invalid Packet version {version}") + return {"raw_data": data, "created": created, "fingerprint": fingerprint} + + +def gpg_crc24(data): + crc = 0xB704CE + for byte in data: + crc ^= byte << 16 + for i in range(8): + crc <<= 1 + if crc & 0x1000000: + crc ^= 0x1864CFB + return (crc & 0xFFFFFF).to_bytes(3, "big") + + +def unwrap_armor(data): + try: + lines = data.decode().strip().split("\n") + except UnicodeDecodeError: + # assume raw binary data + return data + line = lines.pop(0).strip() + if line.startswith("-----BEGIN ") and line.endswith("-----"): + message_type = line[11:-5] + else: + # Header not found assume raw binary data + return data + line = lines.pop(0).strip() + while line != "": + # Armor Headers + line = lines.pop(0).strip() + armor = "" + while line != "-----END " + message_type + "-----": + armor += line + line = lines.pop(0).strip() + if armor[-5] != "=": + raise ValueError("Broken Stream") + raw = b64decode(armor[:-5]) + checksum = b64decode(armor[-4:]) + if gpg_crc24(raw) != checksum: + raise ValueError("Checksum Mismatch") + return raw + + +def wrap_armor(raw, message_type="PGP PUBLIC KEY BLOCK"): + checksum = "=" + b64encode(gpg_crc24(raw)).decode() + data = b64encode(raw).decode() + lines = ["-----BEGIN " + message_type + "-----", ""] + while data: + line = data[:76] + data = data[76:] + lines.append(line) + lines.append(checksum) + lines.append("-----END " + message_type + "-----") + return "\n".join(lines) + + +def read_public_key(data): + data = unwrap_armor(data) + packets = packet_iter(data) + + # The first packet must be the public key. + packet = next(packets) + if packet["type"] != 6: + raise ValueError("Not a public key.") + public_key = analyze_pubkey(packet["body"]) + public_key.update( + {"user_ids": [], "user_attributes": [], "public_subkeys": [], "signatures": []} + ) + signed_content = public_key + signed_packet_type = 6 + + for packet in packets: + packet_type = packet["type"] + body = packet["body"] + if packet_type == 2: + signed_content["signatures"].append( + analyze_signature( + body, + public_key["raw_data"], + signed_packet_type, + signed_content["raw_data"], + ) + ) + + elif packet_type == 13: + user_id = analyze_user_id(body) + user_id["signatures"] = [] + signed_content = user_id + signed_packet_type = packet_type + public_key["user_ids"].append(user_id) + + elif packet_type == 14: + public_subkey = analyze_pubkey(body) + public_subkey["signatures"] = [] + signed_content = public_subkey + signed_packet_type = packet_type + public_key["public_subkeys"].append(public_subkey) + + elif packet_type == 17: + user_attribute = analyze_user_attribute(body) + user_attribute["signatures"] = [] + signed_content = user_attribute + signed_packet_type = packet_type + public_key["user_attributes"].append(user_attribute) + + else: + raise NotImplementedError("Invalid or unknown rfc4880 packet.") + + return public_key diff --git a/pulpcore/app/serializers/__init__.py b/pulpcore/app/serializers/__init__.py index f05416fcbc..75ec2ed84b 100644 --- a/pulpcore/app/serializers/__init__.py +++ b/pulpcore/app/serializers/__init__.py @@ -117,3 +117,8 @@ UserSerializer, ) from .replica import UpstreamPulpSerializer +from .openpgp import ( + OpenPGPDistributionSerializer, + OpenPGPKeyringSerializer, + OpenPGPPublicKeySerializer, +) diff --git a/pulpcore/app/serializers/openpgp.py b/pulpcore/app/serializers/openpgp.py new file mode 100644 index 0000000000..69e69fa9a1 --- /dev/null +++ b/pulpcore/app/serializers/openpgp.py @@ -0,0 +1,261 @@ +from gettext import gettext as _ + +from pulpcore.app import models +from pulpcore.app.openpgp import read_public_key +from pulpcore.app.serializers import ( + DetailRelatedField, + DistributionSerializer, + NoArtifactContentSerializer, + RepositorySerializer, + RepositoryVersionRelatedField, +) +from pulpcore.app.util import get_domain_pk +from pulpcore.plugin.serializers import NoArtifactContentUploadSerializer +from rest_framework import serializers + + +class NestedOpenPGPSignatureSerializer(NoArtifactContentSerializer): + expired = serializers.BooleanField() + + class Meta: + model = models.OpenPGPSignature + fields = ( + "issuer", + "created", + "expiration_time", + "signers_user_id", + "key_expiration_time", + "expired", + "key_expired", + ) + + +class OpenPGPSignatureSerializer(NestedOpenPGPSignatureSerializer): + signed_content = DetailRelatedField( + view_name_pattern=r"content(-.*/.*)-detail", + read_only=True, + ) + + def retrieve(self, validated_data): + return models.OpenPGPSignature.objects.filter( + signed_content=validated_data["signed_content"], sha256=validated_data["sha256"] + ).first() + + def create(self, validated_data): + signature = super().create(validated_data) + self.context["added_content_pks"].add(signature.pk) + return signature + + class Meta: + model = models.OpenPGPSignature + fields = ( + NoArtifactContentSerializer.Meta.fields + + NestedOpenPGPSignatureSerializer.Meta.fields + + ("signed_content",) + ) + + +class NestedOpenPGPUserIDSerializer(NoArtifactContentSerializer): + signatures = NestedOpenPGPSignatureSerializer( + many=True, read_only=True, source="openpgp_signatures" + ) + + class Meta: + model = models.OpenPGPUserID + fields = ("user_id", "signatures") + + +class OpenPGPUserIDSerializer(NestedOpenPGPUserIDSerializer): + public_key = DetailRelatedField( + view_name=r"content-core/openpgp_publickey-detail", + read_only=True, + ) + + def retrieve(self, validated_data): + return models.OpenPGPUserID.objects.filter( + public_key=validated_data["public_key"], user_id=validated_data["user_id"] + ).first() + + def create(self, validated_data): + signatures_data = validated_data.pop("signatures") + user_id = super().create(validated_data) + self.context["added_content_pks"].add(user_id.pk) + for data in signatures_data: + OpenPGPSignatureSerializer(context=self.context).create( + {"signed_content": user_id, **data} + ) + return user_id + + class Meta: + model = models.OpenPGPUserID + fields = ( + NoArtifactContentSerializer.Meta.fields + + NestedOpenPGPUserIDSerializer.Meta.fields + + ("public_key",) + ) + + +class NestedOpenPGPUserAttributeSerializer(NoArtifactContentSerializer): + signatures = NestedOpenPGPSignatureSerializer( + many=True, read_only=True, source="openpgp_signatures" + ) + + class Meta: + model = models.OpenPGPUserAttribute + fields = ("sha256", "signatures") + + +class OpenPGPUserAttributeSerializer(NestedOpenPGPUserAttributeSerializer): + public_key = DetailRelatedField( + view_name=r"content-core/openpgp_publickey-detail", + read_only=True, + ) + + def retrieve(self, validated_data): + return models.OpenPGPUserAttribute.objects.filter( + public_key=validated_data["public_key"], sha256=validated_data["sha256"] + ).first() + + def create(self, validated_data): + signatures_data = validated_data.pop("signatures") + user_attribute = super().create(validated_data) + self.context["added_content_pks"].add(user_attribute.pk) + for data in signatures_data: + OpenPGPSignatureSerializer(context=self.context).create( + {"signed_content": user_attribute, **data} + ) + return user_attribute + + class Meta: + model = models.OpenPGPUserAttribute + fields = ( + NoArtifactContentSerializer.Meta.fields + + NestedOpenPGPUserAttributeSerializer.Meta.fields + + ("public_key", "sha256") + ) + + +class NestedOpenPGPPublicSubkeySerializer(NoArtifactContentSerializer): + signatures = NestedOpenPGPSignatureSerializer( + many=True, read_only=True, source="openpgp_signatures" + ) + + class Meta: + model = models.OpenPGPPublicSubkey + fields = ( + "fingerprint", + "created", + "signatures", + ) + + +class OpenPGPPublicSubkeySerializer(NestedOpenPGPPublicSubkeySerializer): + public_key = DetailRelatedField( + view_name=r"content-core/openpgp_publickey-detail", + read_only=True, + ) + + def retrieve(self, validated_data): + return models.OpenPGPPublicSubkey.objects.filter( + public_key=validated_data["public_key"], fingerprint=validated_data["fingerprint"] + ).first() + + def create(self, validated_data): + signatures_data = validated_data.pop("signatures") + public_subkey = super().create(validated_data) + self.context["added_content_pks"].add(public_subkey.pk) + for data in signatures_data: + OpenPGPSignatureSerializer(context=self.context).create( + {"signed_content": public_subkey, **data} + ) + return public_subkey + + class Meta: + model = models.OpenPGPPublicSubkey + fields = ( + NoArtifactContentSerializer.Meta.fields + + NestedOpenPGPPublicSubkeySerializer.Meta.fields + + ("public_key",) + ) + + +class OpenPGPPublicKeySerializer(NoArtifactContentUploadSerializer): + fingerprint = serializers.CharField(max_length=64, read_only=True) + created = serializers.DateTimeField(read_only=True) + user_ids = NestedOpenPGPUserIDSerializer(many=True, read_only=True) + user_attributes = NestedOpenPGPUserAttributeSerializer(many=True, read_only=True) + public_subkeys = NestedOpenPGPPublicSubkeySerializer(many=True, read_only=True) + + def deferred_validate(self, data): + data = super().deferred_validate(data) + file = data.pop("file") + try: + data.update(read_public_key(file.read())) + except (ValueError, NotImplementedError) as e: + raise serializers.ValidationError(str(e)) + return data + + def retrieve(self, validated_data): + return models.OpenPGPPublicKey.objects.filter( + pulp_domain=get_domain_pk(), fingerprint=validated_data["fingerprint"] + ).first() + + def create(self, validated_data): + # We need to handle that ourselves to not create a bunch of versions here. + repository = validated_data.pop("repository", None) + + signatures_data = validated_data.pop("signatures") + user_ids_data = validated_data.pop("user_ids") + user_attributes_data = validated_data.pop("user_attributes") + public_subkeys_data = validated_data.pop("public_subkeys") + + public_key = super().create(validated_data) + self.context["added_content_pks"] = {public_key.pk} + for data in signatures_data: + OpenPGPSignatureSerializer(context=self.context).create( + {"signed_content": public_key, **data} + ) + for data in user_ids_data: + OpenPGPUserIDSerializer(context=self.context).create({"public_key": public_key, **data}) + for data in user_attributes_data: + OpenPGPUserAttributeSerializer(context=self.context).create( + {"public_key": public_key, **data} + ) + for data in public_subkeys_data: + OpenPGPPublicSubkeySerializer(context=self.context).create( + {"public_key": public_key, **data} + ) + + if repository: + with repository.new_version() as new_version: + new_version.add_content( + models.Content.objects.filter(pk__in=self.context["added_content_pks"]) + ) + + return public_key + + class Meta: + model = models.OpenPGPPublicKey + fields = NoArtifactContentUploadSerializer.Meta.fields + ( + "fingerprint", + "created", + "user_ids", + "user_attributes", + "public_subkeys", + ) + + +class OpenPGPKeyringSerializer(RepositorySerializer): + class Meta: + fields = RepositorySerializer.Meta.fields + model = models.OpenPGPKeyring + + +class OpenPGPDistributionSerializer(DistributionSerializer): + repository_version = RepositoryVersionRelatedField( + required=False, help_text=_("RepositoryVersion to be served"), allow_null=True + ) + + class Meta: + fields = DistributionSerializer.Meta.fields + ("repository_version",) + model = models.OpenPGPDistribution diff --git a/pulpcore/app/viewsets/__init__.py b/pulpcore/app/viewsets/__init__.py index 3d01156594..468850e7ed 100644 --- a/pulpcore/app/viewsets/__init__.py +++ b/pulpcore/app/viewsets/__init__.py @@ -81,3 +81,12 @@ UserRoleViewSet, ) from .replica import UpstreamPulpViewSet +from .openpgp import ( + OpenPGPDistributionViewSet, + OpenPGPKeyringViewSet, + OpenPGPPublicKeyViewSet, + OpenPGPPublicSubkeyViewSet, + OpenPGPSignatureViewSet, + OpenPGPUserAttributeViewSet, + OpenPGPUserIDViewSet, +) diff --git a/pulpcore/app/viewsets/openpgp.py b/pulpcore/app/viewsets/openpgp.py new file mode 100644 index 0000000000..1567bb9f5c --- /dev/null +++ b/pulpcore/app/viewsets/openpgp.py @@ -0,0 +1,184 @@ +from pulpcore.app import models +from pulpcore.app.serializers.openpgp import ( + OpenPGPDistributionSerializer, + OpenPGPKeyringSerializer, + OpenPGPPublicKeySerializer, + OpenPGPPublicSubkeySerializer, + OpenPGPSignatureSerializer, + OpenPGPUserAttributeSerializer, + OpenPGPUserIDSerializer, +) +from pulpcore.app.viewsets.base import NAME_FILTER_OPTIONS, RolesMixin +from pulpcore.app.viewsets.content import ContentFilter, ReadOnlyContentViewSet +from pulpcore.app.viewsets.repository import RepositoryViewSet +from pulpcore.app.viewsets.publication import DistributionFilter, DistributionViewSet +from pulpcore.plugin.actions import ModifyRepositoryActionMixin +from pulpcore.plugin.viewsets import NoArtifactContentUploadViewSet + + +class OpenPGPSignatureFilter(ContentFilter): + # Wishlist: filter by expired + + class Meta: + model = models.OpenPGPSignature + fields = ["issuer"] + + +class OpenPGPUserIDFilter(ContentFilter): + class Meta: + model = models.OpenPGPUserID + fields = {"user_id": NAME_FILTER_OPTIONS} + + +class OpenPGPUserAttributeFilter(ContentFilter): + class Meta: + model = models.OpenPGPUserAttribute + fields = ["sha256"] + + +class OpenPGPPublicSubkeyFilter(ContentFilter): + class Meta: + model = models.OpenPGPPublicSubkey + fields = ["fingerprint"] + + +class OpenPGPPublicKeyFilter(ContentFilter): + # Wishlist: filter by user id + + class Meta: + model = models.OpenPGPPublicKey + fields = ["fingerprint"] + + +class OpenPGPDistributionFilter(DistributionFilter): + class Meta: + model = models.OpenPGPDistribution + fields = ["repository_version"] + + +class OpenPGPSignatureViewSet(ReadOnlyContentViewSet): + endpoint_name = "openpgp_signature" + queryset = models.OpenPGPSignature.objects.all() + serializer_class = OpenPGPSignatureSerializer + filterset_class = OpenPGPSignatureFilter + + +class OpenPGPUserIDViewSet(ReadOnlyContentViewSet): + endpoint_name = "openpgp_userid" + queryset = models.OpenPGPUserID.objects.all() + serializer_class = OpenPGPUserIDSerializer + filterset_class = OpenPGPUserIDFilter + + +class OpenPGPUserAttributeViewSet(ReadOnlyContentViewSet): + endpoint_name = "openpgp_userattribute" + queryset = models.OpenPGPUserAttribute.objects.all() + serializer_class = OpenPGPUserAttributeSerializer + filterset_class = OpenPGPUserAttributeFilter + + +class OpenPGPPublicSubkeyViewSet(ReadOnlyContentViewSet): + endpoint_name = "openpgp_publicsubkey" + queryset = models.OpenPGPPublicSubkey.objects.all() + serializer_class = OpenPGPPublicSubkeySerializer + filterset_class = OpenPGPPublicSubkeyFilter + + +class OpenPGPPublicKeyViewSet(NoArtifactContentUploadViewSet): + endpoint_name = "openpgp_publickey" + queryset = models.OpenPGPPublicKey.objects.prefetch_related( + "user_ids__openpgp_signatures", + "user_attributes__openpgp_signatures", + "public_subkeys__openpgp_signatures", + ) + serializer_class = OpenPGPPublicKeySerializer + filterset_class = OpenPGPPublicKeyFilter + + +class OpenPGPKeyringViewSet(RepositoryViewSet, ModifyRepositoryActionMixin, RolesMixin): + endpoint_name = "openpgp_keyring" + queryset = models.OpenPGPKeyring.objects.all() + serializer_class = OpenPGPKeyringSerializer + queryset_filtering_required_permission = "core.view_openpgpkeyring" + + DEFAULT_ACCESS_POLICY = { + "statements": [ + { + "action": ["list", "my_permissions"], + "principal": "authenticated", + "effect": "allow", + }, + { + "action": ["create"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_domain_perms:core.add_openpgpkeyring", + ], + }, + { + "action": ["retrieve"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_model_or_domain_or_obj_perms:core.view_openpgpkeyring", + }, + { + "action": ["destroy"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_domain_or_obj_perms:core.delete_openpgpkeyring", + ], + }, + { + "action": ["update", "partial_update", "set_label", "unset_label"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_domain_or_obj_perms:core.change_openpgpkeyring", + ], + }, + { + "action": ["modify"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_domain_or_obj_perms:core.modify_openpgpkeyring", + ], + }, + { + "action": ["list_roles", "add_role", "remove_role"], + "principal": "authenticated", + "effect": "allow", + "condition": ["has_model_or_domain_or_obj_perms:core.manage_roles_openpgpkeyring"], + }, + ], + "creation_hooks": [ + { + "function": "add_roles_for_object_creator", + "parameters": {"roles": "core.openpgpkeyring_owner"}, + }, + ], + "queryset_scoping": {"function": "scope_queryset"}, + } + LOCKED_ROLES = { + "core.openpgpkeyring_creator": ["core.add_openpgpkeyring"], + "core.openpgpkeyring_owner": [ + "core.view_openpgpkeyring", + "core.change_openpgpkeyring", + "core.delete_openpgpkeyring", + "core.modify_openpgpkeyring", + "core.manage_roles_openpgpkeyring", + "core.repair_openpgpkeyring", + ], + "core.openpgpkeyring_viewer": ["core.view_openpgpkeyring"], + } + + +class OpenPGPDistributionViewSet(DistributionViewSet): + endpoint_name = "openpgp" + queryset = models.OpenPGPDistribution.objects.all() + serializer_class = OpenPGPDistributionSerializer + filterset_class = OpenPGPDistributionFilter + + # DEFAULT_ACCESS_POLICY diff --git a/pulpcore/pytest_plugin.py b/pulpcore/pytest_plugin.py index 16b07822ee..1656926808 100644 --- a/pulpcore/pytest_plugin.py +++ b/pulpcore/pytest_plugin.py @@ -1251,3 +1251,18 @@ def ascii_armored_detached_signing_service( return pulpcore_bindings.SigningServicesApi.list( name=_ascii_armored_detached_signing_service_name ).results[0] + + +@pytest.fixture(scope="class") +def openpgp_keyring_factory(pulpcore_bindings, gen_object_with_cleanup): + def _openpgp_keyring_factory(**kwargs): + extra_args = {} + if pulp_domain := kwargs.pop("pulp_domain", None): + extra_args["pulp_domain"] = pulp_domain + body = {"name": str(uuid.uuid4())} + body.update(kwargs) + return gen_object_with_cleanup( + pulpcore_bindings.RepositoriesOpenpgpKeyringApi, body, **extra_args + ) + + return _openpgp_keyring_factory diff --git a/pulpcore/tests/functional/api/test_openpgp.py b/pulpcore/tests/functional/api/test_openpgp.py new file mode 100644 index 0000000000..172f86a320 --- /dev/null +++ b/pulpcore/tests/functional/api/test_openpgp.py @@ -0,0 +1,146 @@ +import pytest + + +ALICE_PUB = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: Alice's OpenPGP certificate +Comment: https://www.ietf.org/id/draft-bre-openpgp-samples-01.html + +mDMEXEcE6RYJKwYBBAHaRw8BAQdArjWwk3FAqyiFbFBKT4TzXcVBqPTB3gmzlC/U +b7O1u120JkFsaWNlIExvdmVsYWNlIDxhbGljZUBvcGVucGdwLmV4YW1wbGU+iJAE +ExYIADgCGwMFCwkIBwIGFQoJCAsCBBYCAwECHgECF4AWIQTrhbtfozp14V6UTmPy +MVUMT0fjjgUCXaWfOgAKCRDyMVUMT0fjjukrAPoDnHBSogOmsHOsd9qGsiZpgRnO +dypvbm+QtXZqth9rvwD9HcDC0tC+PHAsO7OTh1S1TC9RiJsvawAfCPaQZoed8gK4 +OARcRwTpEgorBgEEAZdVAQUBAQdAQv8GIa2rSTzgqbXCpDDYMiKRVitCsy203x3s +E9+eviIDAQgHiHgEGBYIACAWIQTrhbtfozp14V6UTmPyMVUMT0fjjgUCXEcE6QIb +DAAKCRDyMVUMT0fjjlnQAQDFHUs6TIcxrNTtEZFjUFm1M0PJ1Dng/cDW4xN80fsn +0QEA22Kr7VkCjeAEC08VSTeV+QFsmz55/lntWkwYWhmvOgE= +=iIGO +-----END PGP PUBLIC KEY BLOCK----- +""" + + +ALICE_REVOCATION = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: Alice's revocation certificate +Comment: https://www.ietf.org/id/draft-bre-openpgp-samples-01.html + +iHgEIBYIACAWIQTrhbtfozp14V6UTmPyMVUMT0fjjgUCXaWkOwIdAAAKCRDyMVUM +T0fjjoBlAQDA9ukZFKRFGCooVcVoDVmxTaHLUXlIg9TPh2f7zzI9KgD/SLNXUOaH +O6TozOS7C9lwIHwwdHdAxgf5BzuhLT9iuAM= +=Tm8h +-----END PGP PUBLIC KEY BLOCK----- +""" + + +ALICE_REVOKED = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +mDMEXEcE6RYJKwYBBAHaRw8BAQdArjWwk3FAqyiFbFBKT4TzXcVBqPTB3gmzlC/U +b7O1u12IeAQgFggAIBYhBOuFu1+jOnXhXpROY/IxVQxPR+OOBQJdpaQ7Ah0AAAoJ +EPIxVQxPR+OOgGUBAMD26RkUpEUYKihVxWgNWbFNoctReUiD1M+HZ/vPMj0qAP9I +s1dQ5oc7pOjM5LsL2XAgfDB0d0DGB/kHO6EtP2K4A7QmQWxpY2UgTG92ZWxhY2Ug +PGFsaWNlQG9wZW5wZ3AuZXhhbXBsZT6IkAQTFggAOAIbAwULCQgHAgYVCgkICwIE +FgIDAQIeAQIXgBYhBOuFu1+jOnXhXpROY/IxVQxPR+OOBQJdpZ86AAoJEPIxVQxP +R+OO6SsA+gOccFKiA6awc6x32oayJmmBGc53Km9ub5C1dmq2H2u/AP0dwMLS0L48 +cCw7s5OHVLVML1GImy9rAB8I9pBmh53yArg4BFxHBOkSCisGAQQBl1UBBQEBB0BC +/wYhratJPOCptcKkMNgyIpFWK0KzLbTfHewT356+IgMBCAeIeAQYFggAIBYhBOuF +u1+jOnXhXpROY/IxVQxPR+OOBQJcRwTpAhsMAAoJEPIxVQxPR+OOWdABAMUdSzpM +hzGs1O0RkWNQWbUzQ8nUOeD9wNbjE3zR+yfRAQDbYqvtWQKN4AQLTxVJN5X5AWyb +Pnn+We1aTBhaGa86AQ== +=W1yt +-----END PGP PUBLIC KEY BLOCK----- +""" + + +BOB_PUB = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: Bob's OpenPGP certificate +Comment: https://www.ietf.org/id/draft-bre-openpgp-samples-01.html + +mQGNBF2lnPIBDAC5cL9PQoQLTMuhjbYvb4Ncuuo0bfmgPRFywX53jPhoFf4Zg6mv +/seOXpgecTdOcVttfzC8ycIKrt3aQTiwOG/ctaR4Bk/t6ayNFfdUNxHWk4WCKzdz +/56fW2O0F23qIRd8UUJp5IIlN4RDdRCtdhVQIAuzvp2oVy/LaS2kxQoKvph/5pQ/ +5whqsyroEWDJoSV0yOb25B/iwk/pLUFoyhDG9bj0kIzDxrEqW+7Ba8nocQlecMF3 +X5KMN5kp2zraLv9dlBBpWW43XktjcCZgMy20SouraVma8Je/ECwUWYUiAZxLIlMv +9CurEOtxUw6N3RdOtLmYZS9uEnn5y1UkF88o8Nku890uk6BrewFzJyLAx5wRZ4F0 +qV/yq36UWQ0JB/AUGhHVPdFf6pl6eaxBwT5GXvbBUibtf8YI2og5RsgTWtXfU7eb +SGXrl5ZMpbA6mbfhd0R8aPxWfmDWiIOhBufhMCvUHh1sApMKVZnvIff9/0Dca3wb +vLIwa3T4CyshfT0AEQEAAbQhQm9iIEJhYmJhZ2UgPGJvYkBvcGVucGdwLmV4YW1w +bGU+iQHOBBMBCgA4AhsDBQsJCAcCBhUKCQgLAgQWAgMBAh4BAheAFiEE0aZuGiOx +gsmYD3iM+/zIKgFeczAFAl2lnvoACgkQ+/zIKgFeczBvbAv/VNk90a6hG8Od9xTz +XxH5YRFUSGfIA1yjPIVOnKqhMwps2U+sWE3urL+MvjyQRlyRV8oY9IOhQ5Esm6DO +ZYrTnE7qVETm1ajIAP2OFChEc55uH88x/anpPOXOJY7S8jbn3naC9qad75BrZ+3g +9EBUWiy5p8TykP05WSnSxNRt7vFKLfEB4nGkehpwHXOVF0CRNwYle42bg8lpmdXF +DcCZCi+qEbafmTQzkAqyzS3nCh3IAqq6Y0kBuaKLm2tSNUOlZbD+OHYQNZ5Jix7c +ZUzs6Xh4+I55NRWl5smrLq66yOQoFPy9jot/Qxikx/wP3MsAzeGaZSEPc0fHp5G1 +6rlGbxQ3vl8/usUV7W+TMEMljgwd5x8POR6HC8EaCDfVnUBCPi/Gv+egLjsIbPJZ +ZEroiE40e6/UoCiQtlpQB5exPJYSd1Q1txCwueih99PHepsDhmUQKiACszNU+RRo +zAYau2VdHqnRJ7QYdxHDiH49jPK4NTMyb/tJh2TiIwcmsIpGuQGNBF2lnPIBDADW +ML9cbGMrp12CtF9b2P6z9TTT74S8iyBOzaSvdGDQY/sUtZXRg21HWamXnn9sSXvI +DEINOQ6A9QxdxoqWdCHrOuW3ofneYXoG+zeKc4dC86wa1TR2q9vW+RMXSO4uImA+ +Uzula/6k1DogDf28qhCxMwG/i/m9g1c/0aApuDyKdQ1PXsHHNlgd/Dn6rrd5y2AO +baifV7wIhEJnvqgFXDN2RXGjLeCOHV4Q2WTYPg/S4k1nMXVDwZXrvIsA0YwIMgIT +86Rafp1qKlgPNbiIlC1g9RY/iFaGN2b4Ir6GDohBQSfZW2+LXoPZuVE/wGlQ01rh +827KVZW4lXvqsge+wtnWlszcselGATyzqOK9LdHPdZGzROZYI2e8c+paLNDdVPL6 +vdRBUnkCaEkOtl1mr2JpQi5nTU+gTX4IeInC7E+1a9UDF/Y85ybUz8XV8rUnR76U +qVC7KidNepdHbZjjXCt8/Zo+Tec9JNbYNQB/e9ExmDntmlHEsSEQzFwzj8sxH48A +EQEAAYkBtgQYAQoAIBYhBNGmbhojsYLJmA94jPv8yCoBXnMwBQJdpZzyAhsMAAoJ +EPv8yCoBXnMw6f8L/26C34dkjBffTzMj5Bdzm8MtF67OYneJ4TQMw7+41IL4rVcS +KhIhk/3Ud5knaRtP2ef1+5F66h9/RPQOJ5+tvBwhBAcUWSupKnUrdVaZQanYmtSx +cVV2PL9+QEiNN3tzluhaWO//rACxJ+K/ZXQlIzwQVTpNhfGzAaMVV9zpf3u0k14i +tcv6alKY8+rLZvO1wIIeRZLmU0tZDD5HtWDvUV7rIFI1WuoLb+KZgbYn3OWjCPHV +dTrdZ2CqnZbG3SXw6awH9bzRLV9EXkbhIMez0deCVdeo+wFFklh8/5VK2b0vk/+w +qMJxfpa1lHvJLobzOP9fvrswsr92MA2+k901WeISR7qEzcI0Fdg8AyFAExaEK6Vy +jP7SXGLwvfisw34OxuZr3qmx1Sufu4toH3XrB7QJN8XyqqbsGxUCBqWif9RSK4xj +zRTe56iPeiSJJOIciMP9i2ldI+KgLycyeDvGoBj0HCLO3gVaBe4ubVrj5KjhX2PV +NEJd3XZRzaXZE2aAMQ== +=NXei +-----END PGP PUBLIC KEY BLOCK----- +""" + +BOB_REVOCATION = """ +-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: Bob's revocation certificate +Comment: https://www.ietf.org/id/draft-bre-openpgp-samples-01.html + +iQG2BCABCgAgFiEE0aZuGiOxgsmYD3iM+/zIKgFeczAFAl2lnQQCHQAACgkQ+/zI +KgFeczAIHAv/RrlGlPFKsW0BShC8sVtPfbT1N9lUqyrsgBhrUryM/i+rBtkbnSjp +28R5araupt0og1g2L5VsCRM+ql0jf0zrZXOorKfAO70HCP3X+MlEquvztMUZGJRZ +7TSMgIY1MeFgLmOw9pDKf3tSoouBOpPe5eVfXviEDDo2zOfdntjPyCMlxHgAcjZo +XqMaurV+nKWoIx0zbdpNLsRy4JZcmnOSFdPw37R8U2miPi2qNyVwcyCxQy0LjN7Y +AWadrs9vE0DrneSVP2OpBhl7g+Dj2uXJQRPVXcq6w9g5Fir6DnlhekTLsa78T5cD +n8q7aRusMlALPAOosENOgINgsVcjuILkPN1eD+zGAgHgdiKaep1+P3pbo5n0CLki +UCAsLnCEo8eBV9DCb/n1FlI5yhQhgQyMYlp/49H0JSc3IY9KHhv6f0zIaRWs0JuD +ajcXTJ9AyB+SA6GBb9Q+XsNXjZ1gj75ekUD1sQ3ezTvVfovgP5bD+vPvILhSImKB +aU6V3zld/x/1 +=mMwU +-----END PGP PUBLIC KEY BLOCK----- +""" + + +@pytest.mark.parallel +def test_key_upload(tmpdir, openpgp_keyring_factory, pulpcore_bindings, monitor_task): + keyring = openpgp_keyring_factory() + + alice_pub = tmpdir / "alice.pub" + alice_pub.write_text(ALICE_PUB, "UTF-8") + + alice_revoked = tmpdir / "alice.revoked" + alice_revoked.write_text(ALICE_REVOKED, "UTF-8") + + bob_pub = tmpdir / "bob.pub" + bob_pub.write_text(BOB_PUB, "UTF-8") + + result = pulpcore_bindings.ContentOpenpgpPublickeyApi.create( + file=alice_pub, repository=keyring.pulp_href + ) + monitor_task(result.task) + result = pulpcore_bindings.ContentOpenpgpPublickeyApi.create( + file=bob_pub, repository=keyring.pulp_href + ) + monitor_task(result.task) + result = pulpcore_bindings.ContentOpenpgpPublickeyApi.create( + file=alice_revoked, repository=keyring.pulp_href + ) + monitor_task(result.task)