Skip to content

Commit

Permalink
Adds introspection endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
kipparker committed Apr 16, 2024
1 parent 3c5da76 commit 81b63ad
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 215 deletions.
136 changes: 133 additions & 3 deletions authentication/api/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,38 @@
import base64
import time
from urllib.parse import unquote
import logging
import email.utils
import uuid

from cryptography.hazmat.primitives import hashes
import jwt
from cryptography import x509
from cryptography.hazmat.backends import default_backend

from . import conf
from . import examples

log = logging.getLogger(__name__)


class AccessTokenValidatorError(Exception):
pass


class AccessTokenNoCertificateError(AccessTokenValidatorError):
pass


class AccessTokenInactiveError(AccessTokenValidatorError):
pass


class AccessTokenTimeError(AccessTokenValidatorError):
pass


class AccessTokenCertificateError(AccessTokenValidatorError):
pass


def parse_cert(client_certificate: str) -> x509.Certificate:
Expand All @@ -25,6 +49,105 @@ def parse_cert(client_certificate: str) -> x509.Certificate:
return cert


def _check_certificate(cert, introspection_response):
if "cnf" in introspection_response:
# thumbprint from introspection response
try:
sha256 = introspection_response["cnf"]["x5t#S256"]
except KeyError:
log.warning("No x5t#S256 claim in token response, unable to proceed!")
raise AccessTokenCertificateError(
"Token does not contain a certificate binding"
)
# thumbprint from presented client certificate
fingerprint = str(
base64.urlsafe_b64encode(cert.fingerprint(hashes.SHA256())).replace(
b"=", b""
),
"utf-8",
)
if fingerprint != sha256:
log.warning(
f"introspection response thumbprint {sha256} does not match "
f"presented client cert thumbprint {fingerprint}"
)
raise AccessTokenCertificateError(
"Token certificate binding does not match presented client cert"
)
else:
# No CNF claim in the introspection response
log.warning("No cnf claim in token response, unable to proceed!")
raise AccessTokenCertificateError(
"Token does not contain a certificate binding"
)
return True


def introspect(client_certificate: str, token: str) -> dict:
"""
Introspection fails if:
1. Querying the token introspection endpoint fails
2. A token is returned with active: false
3. Scope is specified, and the required scope is not in the token scopes
4. Issued time is in the future
5. Expiry time is in the past
6. Certificate binding is enabled (default) and the fingerprint of the
presented client cert isn't a match for the claim in the
introspection response
If introspection succeeds, return a dict suitable to use as headers
including Date and x-fapi-interaction-id, as well as the introspection response
"""

# Deny access to non-MTLS connections
cert = parse_cert(client_certificate)
if cert is None:
log.warning("no client cert presented")
raise AccessTokenNoCertificateError("No client certificate presented")
introspection_response = jwt.decode(
token, algorithms=["ES256"], options={"verify_signature": False}
)
introspection_response["active"] = True
log.debug(f"introspection response {introspection_response}")

# All valid introspection responses contain 'active', as the default behaviour
# for an invalid token is to create a simple JSON {'active':false} response
if (
"active" not in introspection_response
or introspection_response["active"] is not True
):
raise AccessTokenInactiveError(
"Invalid introspection response, does not contain 'active' or is not True"
)

now = time.time()
if "iat" in introspection_response:
# Issue time must be in the past
if now < introspection_response["iat"]:
log.warning("token issued in the future")
raise AccessTokenTimeError("Token issued in the future")
if "exp" in introspection_response:
# Expiry time must be in the future
if now > introspection_response["exp"]:
log.warning("token expired")
raise AccessTokenTimeError("Token expired")

# If the token response contains a certificate binding then check it against the
# current client cert. See https://tools.ietf.org/html/rfc8705

_check_certificate(cert, introspection_response)
# If we required a particular scope, check that it's in the list of scopes
# defined for this token. Scope comparison is case insensitive
# TODO enable scope checking
# if scope:
# token_scopes = introspection_response['scope'].lower().split(' ') \
# if 'scope' in introspection_response else []
# log.debug(f'found scopes in token {token_scopes}')
# if scope.lower() not in token_scopes:
# log.warning(f'scope \'{scope}\' not in token scopes {token_scopes}')
return introspection_response


def get_thumbprint(cert: str) -> str:
"""
Returns the thumbprint of a certificate
Expand Down Expand Up @@ -52,8 +175,15 @@ def create_id_token(subject="platform_user") -> str:
private_key_path = get_key("key")
with open(private_key_path, "rb") as f:
private_key = f.read()
id_token = jwt.encode(claims, private_key, algorithm="ES256", headers={"kid": "1"})
return id_token
return jwt.encode(claims, private_key, algorithm="ES256", headers={"kid": "1"})


def create_enhanced_access_token(claims: dict, client_certificate: str) -> str:
claims["cnf"] = {"x5t#S256": get_thumbprint(client_certificate)}
private_key_path = get_key("key")
with open(private_key_path, "rb") as f:
private_key = f.read()
return jwt.encode(claims, private_key, algorithm="ES256")


def get_key(type: str = "key") -> str:
Expand Down
99 changes: 0 additions & 99 deletions authentication/api/authentication.py

This file was deleted.

Loading

0 comments on commit 81b63ad

Please sign in to comment.