Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Policy engine issuer certificate attributes #5

Merged
merged 3 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions docs/registration_policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ from jsonschema import validate, ValidationError

from scitt_emulator.scitt import ClaimInvalidError, CWTClaims
from scitt_emulator.verify_statement import verify_statement
from scitt_emulator.key_helpers import verification_key_to_object


def main():
Expand All @@ -107,23 +108,30 @@ def main():
f"Claim content type does not start with application/json: {msg.phdr[pycose.headers.ContentType]!r}"
)

cwt_cose_key, _pycose_cose_key = verify_statement(msg)
verification_key = verify_statement(msg)
unittest.TestCase().assertTrue(
cwt_cose_key,
verification_key,
"Failed to verify signature on statement",
)

cwt_protected = cwt.decode(msg.phdr[CWTClaims], cwt_cose_key)
cwt_protected = cwt.decode(msg.phdr[CWTClaims], verification_key.cwt)
issuer = cwt_protected[1]
subject = cwt_protected[2]

issuer_key_as_object = verification_key_to_object(verification_key)
unittest.TestCase().assertTrue(
issuer_key_as_object,
"Failed to convert issuer key to JSON schema verifiable object",
)

SCHEMA = json.loads(pathlib.Path(os.environ["SCHEMA_PATH"]).read_text())

try:
validate(
instance={
"$schema": "https://schema.example.com/scitt-policy-engine-jsonschema.schema.json",
"issuer": issuer,
"issuer_key": issuer_key_as_object,
"subject": subject,
"claim": json.loads(msg.payload.decode()),
},
Expand Down
17 changes: 17 additions & 0 deletions scitt_emulator/key_helper_dataclasses.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dataclasses import dataclass, field
from typing import List, Any, Union

import cwt
import pycose.keys.ec2


@dataclass
class VerificationKey:
transforms: List[Any]
original: Any
original_content_type: str
original_bytes: bytes
original_bytes_encoding: str
usable: bool
cwt: Union[cwt.COSEKey, None]
cose: Union[pycose.keys.ec2.EC2Key, None]
41 changes: 41 additions & 0 deletions scitt_emulator/key_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import itertools
import importlib.metadata
from typing import Optional, Callable, List, Tuple

from scitt_emulator.key_helper_dataclasses import VerificationKey


ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT = "scitt_emulator.key_helpers.verification_key_to_object"


def verification_key_to_object(
verification_key: VerificationKey,
*,
key_transforms: Optional[List[Callable[[VerificationKey], dict]]] = None,
) -> bool:
"""
Resolve keys for statement issuer and verify signature on COSESign1
statement and embedded CWT
"""
if key_transforms is None:
key_transforms = []
# There is some difference in the return value of entry_points across
# Python versions/envs (conda vs. non-conda). Python 3.8 returns a dict.
entrypoints = importlib.metadata.entry_points()
if isinstance(entrypoints, dict):
for entrypoint in entrypoints.get(ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT, []):
key_transforms.append(entrypoint.load())
elif isinstance(entrypoints, getattr(importlib.metadata, "EntryPoints", list)):
for entrypoint in entrypoints:
if entrypoint.group == ENTRYPOINT_KEY_TRANSFORMS_TO_OBJECT:
key_transforms.append(entrypoint.load())
else:
raise TypeError(f"importlib.metadata.entry_points returned unknown type: {type(entrypoints)}: {entrypoints!r}")

for key_transform in key_transforms:
verification_key_as_object = key_transform(verification_key)
# Skip keys that we couldn't derive COSE keys for
if verification_key_as_object:
return verification_key_as_object

return None
61 changes: 32 additions & 29 deletions scitt_emulator/key_loader_format_did_key.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,48 @@
import cwt.algs.ec2
import pycose
import pycose.keys.ec2
import cryptography.hazmat.primitives.asymmetric.ec
from cryptography.hazmat.primitives import serialization

# TODO Remove this once we have a example flow for proper key verification
import jwcrypto.jwk

from scitt_emulator.did_helpers import DID_KEY_METHOD, did_key_to_cryptography_key
from scitt_emulator.key_helper_dataclasses import VerificationKey


# TODO What is the correct content type? Should we differ if it's been expanded?
CONTENT_TYPE = "application/key+did"


def key_loader_format_did_key(
unverified_issuer: str,
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
jwk_keys = []
cwt_cose_keys = []
pycose_cose_keys = []
cryptography_keys = []

) -> List[VerificationKey]:
if not unverified_issuer.startswith(DID_KEY_METHOD):
return pycose_cose_keys

cryptography_keys.append(did_key_to_cryptography_key(unverified_issuer))

for cryptography_key in cryptography_keys:
jwk_keys.append(
jwcrypto.jwk.JWK.from_pem(
cryptography_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
)
return []
key = did_key_to_cryptography_key(unverified_issuer)
return [
VerificationKey(
transforms=[key],
original=key,
original_content_type=CONTENT_TYPE,
original_bytes=unverified_issuer.encode("utf-8"),
original_bytes_encoding="utf-8",
usable=False,
cwt=None,
cose=None,
)

for jwk_key in jwk_keys:
cwt_cose_key = cwt.COSEKey.from_pem(
jwk_key.export_to_pem(),
kid=jwk_key.thumbprint(),
]


def transform_key_instance_cryptography_ecc_public_to_jwcrypto_jwk(
key: cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey,
) -> jwcrypto.jwk.JWK:
if not isinstance(key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey):
raise TypeError(key)
return jwcrypto.jwk.JWK.from_pem(
key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
cwt_cose_keys.append(cwt_cose_key)
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))

return pycose_cose_keys
)
58 changes: 41 additions & 17 deletions scitt_emulator/key_loader_format_url_referencing_oidc_issuer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,22 @@
import jwcrypto.jwk

from scitt_emulator.did_helpers import did_web_to_url
from scitt_emulator.key_helper_dataclasses import VerificationKey


CONTENT_TYPE = "application/jwk+json"


def key_loader_format_url_referencing_oidc_issuer(
unverified_issuer: str,
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
jwk_keys = []
cwt_cose_keys = []
pycose_cose_keys = []
keys = []

if unverified_issuer.startswith("did:web:"):
unverified_issuer = did_web_to_url(unverified_issuer)

if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
return pycose_cose_keys
return keys

# TODO Logging for URLErrors
# Check if OIDC issuer
Expand All @@ -44,18 +46,40 @@ def key_loader_format_url_referencing_oidc_issuer(
jwks = json.loads(response.read())
for jwk_key_as_dict in jwks["keys"]:
jwk_key_as_string = json.dumps(jwk_key_as_dict)
jwk_keys.append(
jwcrypto.jwk.JWK.from_json(jwk_key_as_string),
jwk_key = jwcrypto.jwk.JWK.from_json(jwk_key_as_string)
keys.append(
VerificationKey(
transforms=[jwk_key],
original=jwk_key,
original_content_type=CONTENT_TYPE,
original_bytes=jwk_key_as_string.encode("utf-8"),
original_bytes_encoding="utf-8",
usable=False,
cwt=None,
cose=None,
)
)

for jwk_key in jwk_keys:
cwt_cose_key = cwt.COSEKey.from_pem(
jwk_key.export_to_pem(),
kid=jwk_key.thumbprint(),
)
cwt_cose_keys.append(cwt_cose_key)
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))

return pycose_cose_keys
return keys


def transform_key_instance_jwcrypto_jwk_to_cwt_cose(
key: jwcrypto.jwk.JWK,
) -> cwt.COSEKey:
if not isinstance(key, jwcrypto.jwk.JWK):
raise TypeError(key)
return cwt.COSEKey.from_pem(
key.export_to_pem(),
kid=key.thumbprint(),
)


def to_object_oidc_issuer(verification_key: VerificationKey) -> dict:
if verification_key.original_content_type != CONTENT_TYPE:
return

return {
**verification_key.original.export_public(as_dict=True),
"use": "sig",
"kid": verification_key.original.thumbprint(),
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@
def key_loader_format_url_referencing_ssh_authorized_keys(
unverified_issuer: str,
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
jwk_keys = []
cwt_cose_keys = []
pycose_cose_keys = []

cryptography_ssh_keys = []
keys = []

if unverified_issuer.startswith("did:web:"):
unverified_issuer = did_web_to_url(unverified_issuer)

if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
return pycose_cose_keys
return keys

# Try loading ssh keys. Example: https://github.com/username.keys
with contextlib.suppress(urllib.request.URLError):
Expand All @@ -38,28 +34,18 @@ def key_loader_format_url_referencing_ssh_authorized_keys(
with contextlib.suppress(
(ValueError, cryptography.exceptions.UnsupportedAlgorithm)
):
cryptography_ssh_keys.append(
serialization.load_ssh_public_key(line)
key = serialization.load_ssh_public_key(line)
keys.append(
VerificationKey(
transforms=[key],
original=key,
original_content_type=CONTENT_TYPE,
original_bytes=line.encode("utf-8"),
original_bytes_encoding="utf-8",
usable=False,
cwt=None,
cose=None,
)
)

for cryptography_ssh_key in cryptography_ssh_keys:
jwk_keys.append(
jwcrypto.jwk.JWK.from_pem(
cryptography_ssh_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
)
)

for jwk_key in jwk_keys:
cwt_cose_key = cwt.COSEKey.from_pem(
jwk_key.export_to_pem(),
kid=jwk_key.thumbprint(),
)
cwt_cose_keys.append(cwt_cose_key)
cwt_ec2_key_as_dict = cwt_cose_key.to_dict()
pycose_cose_key = pycose.keys.ec2.EC2Key.from_dict(cwt_ec2_key_as_dict)
pycose_cose_keys.append((cwt_cose_key, pycose_cose_key))

return pycose_cose_keys
return keys
67 changes: 67 additions & 0 deletions scitt_emulator/key_loader_format_url_referencing_x509.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import contextlib
import urllib.parse
import urllib.request
from typing import List, Tuple

import cwt
import cwt.algs.ec2
import pycose
import pycose.keys.ec2
import cryptography.exceptions
from cryptography.hazmat.primitives import serialization

# TODO Remove this once we have a example flow for proper key verification
import jwcrypto.jwk

from scitt_emulator.did_helpers import did_web_to_url
from scitt_emulator.key_helper_dataclasses import VerificationKey


CONTENT_TYPE = "application/pkix-cert"


def key_loader_format_url_referencing_x509(
unverified_issuer: str,
) -> List[Tuple[cwt.COSEKey, pycose.keys.ec2.EC2Key]]:
keys = []

if unverified_issuer.startswith("did:web:"):
unverified_issuer = did_web_to_url(unverified_issuer)

if "://" not in unverified_issuer or unverified_issuer.startswith("file://"):
return keys

with contextlib.suppress(urllib.request.URLError):
with urllib.request.urlopen(unverified_issuer) as response:
contents = response.read()
with contextlib.suppress(
(ValueError, cryptography.exceptions.UnsupportedAlgorithm)
):
for certificate in cryptography.x509.load_pem_x509_certificates(
contents
):
key = certificate.public_key()
keys.append(
VerificationKey(
transforms=[key],
original=key,
original_content_type=CONTENT_TYPE,
original_bytes=contents,
original_bytes_encoding="utf-8",
usable=False,
cwt=None,
cose=None,
)
)

return keys


def to_object_x509(verification_key: VerificationKey) -> dict:
if verification_key.original_content_type != CONTENT_TYPE:
return

# TODO to dict
verification_key.original

return {}
Loading
Loading