From c6c77766eb5733f7a332d22beafd980c808f6ccd Mon Sep 17 00:00:00 2001 From: Ian Liu <81595625+ianliuwk1019@users.noreply.github.com> Date: Thu, 25 Jul 2024 15:11:39 -0700 Subject: [PATCH] fix: #1454 remove python-jose package dependency from BCSC integration. (#1500) --- .../app/integration/bcsc/bcsc_constants.py | 107 ++++++ .../{ => integration/bcsc}/bcsc_decryption.py | 50 ++- .../api/app/integration/bcsc/bcsc_jwk.py | 342 ++++++++++++++++++ .../api/app/routers/router_bcsc_proxy.py | 9 +- server/backend/api/app/utils/utils.py | 28 ++ server/backend/requirements.txt | 1 - terraform/dev/terragrunt.hcl | 4 +- 7 files changed, 520 insertions(+), 21 deletions(-) create mode 100644 server/backend/api/app/integration/bcsc/bcsc_constants.py rename server/backend/api/app/{ => integration/bcsc}/bcsc_decryption.py (80%) create mode 100644 server/backend/api/app/integration/bcsc/bcsc_jwk.py diff --git a/server/backend/api/app/integration/bcsc/bcsc_constants.py b/server/backend/api/app/integration/bcsc/bcsc_constants.py new file mode 100644 index 000000000..94e1b1346 --- /dev/null +++ b/server/backend/api/app/integration/bcsc/bcsc_constants.py @@ -0,0 +1,107 @@ +import hashlib + + +class JWEError(Exception): + """Base error for all JWE errors""" + pass + + +class JWEParseError(JWEError): + """Could not parse the JWE string provided""" + pass + + +class JOSEError(Exception): + pass + + +class JWKError(JOSEError): + pass + + +class Algorithms: + # DS Algorithms + NONE = "none" + HS256 = "HS256" + HS384 = "HS384" + HS512 = "HS512" + RS256 = "RS256" + RS384 = "RS384" + RS512 = "RS512" + ES256 = "ES256" + ES384 = "ES384" + ES512 = "ES512" + + # Content Encryption Algorithms + A128CBC_HS256 = "A128CBC-HS256" + A192CBC_HS384 = "A192CBC-HS384" + A256CBC_HS512 = "A256CBC-HS512" + A128GCM = "A128GCM" + A192GCM = "A192GCM" + A256GCM = "A256GCM" + + # Pseudo algorithm for encryption + A128CBC = "A128CBC" + A192CBC = "A192CBC" + A256CBC = "A256CBC" + + # CEK Encryption Algorithms + DIR = "dir" + RSA1_5 = "RSA1_5" + RSA_OAEP = "RSA-OAEP" + RSA_OAEP_256 = "RSA-OAEP-256" + A128KW = "A128KW" + A192KW = "A192KW" + A256KW = "A256KW" + ECDH_ES = "ECDH-ES" + ECDH_ES_A128KW = "ECDH-ES+A128KW" + ECDH_ES_A192KW = "ECDH-ES+A192KW" + ECDH_ES_A256KW = "ECDH-ES+A256KW" + A128GCMKW = "A128GCMKW" + A192GCMKW = "A192GCMKW" + A256GCMKW = "A256GCMKW" + PBES2_HS256_A128KW = "PBES2-HS256+A128KW" + PBES2_HS384_A192KW = "PBES2-HS384+A192KW" + PBES2_HS512_A256KW = "PBES2-HS512+A256KW" + + # Compression Algorithms + DEF = "DEF" + + HMAC = {HS256, HS384, HS512} + RSA_DS = {RS256, RS384, RS512} + RSA_KW = {RSA1_5, RSA_OAEP, RSA_OAEP_256} + RSA = RSA_DS.union(RSA_KW) + EC_DS = {ES256, ES384, ES512} + EC_KW = {ECDH_ES, ECDH_ES_A128KW, ECDH_ES_A192KW, ECDH_ES_A256KW} + EC = EC_DS.union(EC_KW) + AES_PSEUDO = {A128CBC, A192CBC, A256CBC, A128GCM, A192GCM, A256GCM} + AES_JWE_ENC = {A128CBC_HS256, A192CBC_HS384, A256CBC_HS512, A128GCM, A192GCM, A256GCM} + AES_ENC = AES_JWE_ENC.union(AES_PSEUDO) + AES_KW = {A128KW, A192KW, A256KW} + AEC_GCM_KW = {A128GCMKW, A192GCMKW, A256GCMKW} + AES = AES_ENC.union(AES_KW) + PBES2_KW = {PBES2_HS256_A128KW, PBES2_HS384_A192KW, PBES2_HS512_A256KW} + + HMAC_AUTH_TAG = {A128CBC_HS256, A192CBC_HS384, A256CBC_HS512} + GCM = {A128GCM, A192GCM, A256GCM} + + SUPPORTED = HMAC.union(RSA_DS).union(EC_DS).union([DIR]).union(AES_JWE_ENC).union(RSA_KW).union(AES_KW) + + ALL = SUPPORTED.union([NONE]).union(AEC_GCM_KW).union(EC_KW).union(PBES2_KW) + + HASHES = { + HS256: hashlib.sha256, + HS384: hashlib.sha384, + HS512: hashlib.sha512, + RS256: hashlib.sha256, + RS384: hashlib.sha384, + RS512: hashlib.sha512, + ES256: hashlib.sha256, + ES384: hashlib.sha384, + ES512: hashlib.sha512, + } + + KEYS = {} + + +ALGORITHMS = Algorithms() diff --git a/server/backend/api/app/bcsc_decryption.py b/server/backend/api/app/integration/bcsc/bcsc_decryption.py similarity index 80% rename from server/backend/api/app/bcsc_decryption.py rename to server/backend/api/app/integration/bcsc/bcsc_decryption.py index ca0fbb42c..dc1a8dda0 100644 --- a/server/backend/api/app/bcsc_decryption.py +++ b/server/backend/api/app/integration/bcsc/bcsc_decryption.py @@ -1,11 +1,31 @@ import binascii import json +import logging from collections.abc import Mapping from struct import pack -from jose import jwk -from jose.constants import ALGORITHMS -from jose.exceptions import JWEError, JWEParseError -from jose.utils import base64url_decode, ensure_binary + +from api.app.integration.bcsc import bcsc_jwk +from api.app.integration.bcsc.bcsc_constants import (ALGORITHMS, JWEError, + JWEParseError) +from api.app.utils import utils + +LOGGER = logging.getLogger(__name__) + +""" +Note: +# - All integration files under "integration/bcsc" path are based on previous "python-jose" package + and ported library code to FAM with modification in order to make FAM-BCSC login flow + encryption-decryptoin works + - To get rid of "python-jose" package (which has security vulnerability and did not have updated version + and currently is not maintained for years), attempts to use other libraries with jwe capabilities were + not successful (authlib and joserfc libraries), so temporary solution for now is to ported more code to + be able to remove dependency. This is far more than ideal and code is ugly. +# - The difficult decision for why copying library code to modify for fitting BCSC authentication can be + found on ticket: (#1454 - https://github.com/orgs/bcgov/projects/65/views/1?pane=issue&itemId=67731674) +# - It would be better later if it is still possble to resolve jwe token decryption difficulties with + BCSC team to standardize the JWE token encryption-decryption with common known practice and not with + BCSC customized internal token spec. +""" def decrypt(jwe_str, decrypted_key): @@ -27,7 +47,7 @@ def decrypt(jwe_str, decrypted_key): >>> jwe.decrypt(jwe_string, 'asecret128bitkey') 'Hello, World!' """ - header, encoded_header, encrypted_key, iv, cipher_text, auth_tag = _jwe_compact_deserialize(jwe_str) + header, encoded_header, _encrypted_key, iv, cipher_text, auth_tag = _jwe_compact_deserialize(jwe_str) try: # Determine the Key Management Mode employed by the algorithm @@ -116,8 +136,10 @@ def _decrypt_and_auth(cek_bytes, enc, cipher_text, iv, aad, auth_tag): if enc in ALGORITHMS.HMAC_AUTH_TAG: encryption_key, mac_key, key_len = _get_encryption_key_mac_key_and_key_length_from_cek(cek_bytes, enc) auth_tag_check = _auth_tag(cipher_text, iv, aad, mac_key, key_len) + + # BCSC enc uses algorithm in ALGORITHMS.HMAC_AUTH_TAG, below will not run. elif enc in ALGORITHMS.GCM: - encryption_key = jwk.construct(cek_bytes, enc) + encryption_key = bcsc_jwk.jwk_construct(cek_bytes, enc) auth_tag_check = auth_tag # GCM check auth on decrypt else: raise NotImplementedError(f"enc {enc} is not implemented!") @@ -141,7 +163,7 @@ def _get_hmac_key(enc, mac_key_bytes): (HMACKey): The key to perform HMAC actions """ _, hash_alg = enc.split("-") - mac_key = jwk.construct(mac_key_bytes, hash_alg) + mac_key = bcsc_jwk.jwk_construct(mac_key_bytes, hash_alg) return mac_key @@ -151,7 +173,7 @@ def _get_encryption_key_mac_key_and_key_length_from_cek(cek_bytes, enc): mac_key = _get_hmac_key(enc, mac_key_bytes) encryption_key_bytes = cek_bytes[-derived_key_len:] encryption_alg, _ = enc.split("-") - encryption_key = jwk.construct(encryption_key_bytes, encryption_alg) + encryption_key = bcsc_jwk.jwk_construct(encryption_key_bytes, encryption_alg) return encryption_key, mac_key, derived_key_len @@ -170,12 +192,12 @@ def _jwe_compact_deserialize(jwe_bytes): # Vector, the JWE Ciphertext, the JWE Authentication Tag, and the # JWE AAD, following the restriction that no line breaks, # whitespace, or other additional characters have been used. - jwe_bytes = ensure_binary(jwe_bytes) + jwe_bytes = utils.ensure_binary(jwe_bytes) try: header_segment, encrypted_key_segment, iv_segment, cipher_text_segment, auth_tag_segment = jwe_bytes.split( b".", 4 ) - header_data = base64url_decode(header_segment) + header_data = utils.base64url_decode(header_segment) except ValueError: raise JWEParseError("Not enough segments") except (TypeError): @@ -207,22 +229,22 @@ def _jwe_compact_deserialize(jwe_bytes): raise JWEParseError("Invalid header string: must be a json object") try: - encrypted_key = base64url_decode(encrypted_key_segment) + encrypted_key = utils.base64url_decode(encrypted_key_segment) except (TypeError, binascii.Error): raise JWEParseError("Invalid encrypted key") try: - iv = base64url_decode(iv_segment) + iv = utils.base64url_decode(iv_segment) except (TypeError, binascii.Error): raise JWEParseError("Invalid IV") try: - ciphertext = base64url_decode(cipher_text_segment) + ciphertext = utils.base64url_decode(cipher_text_segment) except (TypeError, binascii.Error): raise JWEParseError("Invalid cyphertext") try: - auth_tag = base64url_decode(auth_tag_segment) + auth_tag = utils.base64url_decode(auth_tag_segment) except (TypeError, binascii.Error): raise JWEParseError("Invalid auth tag") diff --git a/server/backend/api/app/integration/bcsc/bcsc_jwk.py b/server/backend/api/app/integration/bcsc/bcsc_jwk.py new file mode 100644 index 000000000..f7a0cc754 --- /dev/null +++ b/server/backend/api/app/integration/bcsc/bcsc_jwk.py @@ -0,0 +1,342 @@ +import base64 +import hashlib +import hmac +import logging + +from api.app.integration.bcsc.bcsc_constants import (ALGORITHMS, JWEError, + JWKError) +from api.app.utils import utils +from api.app.utils.utils import base64url_decode +from cryptography.exceptions import InvalidTag +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.ciphers import (Cipher, aead, algorithms, + modes) +from cryptography.hazmat.primitives.padding import PKCS7 + +LOGGER = logging.getLogger(__name__) + +# This code partial is from "python-jose" not maintained library. +# https://pypi.org/project/python-jose/, please see notes at +# bcsc_dscryption.py. + + +def jwk_construct(key_data, algorithm=None): + """ + Construct a Key object for the given algorithm with the given + key_data. + """ + + # Allow for pulling the algorithm off of the passed in jwk. + if not algorithm and isinstance(key_data, dict): + algorithm = key_data.get("alg", None) + + if not algorithm: + raise JWKError("Unable to find an algorithm for key: %s" % key_data) + + key_class = get_key(algorithm) + LOGGER.debug(f"key_class: {key_class}") + if not key_class: + raise JWKError("Unable to find an algorithm for key: %s" % key_data) + return key_class(key_data, algorithm) + + +# BCSC uses HMAC and AES for now. Comment out conditions to +# be easier to be ported. +def get_key(algorithm): + if algorithm in ALGORITHMS.KEYS: + return ALGORITHMS.KEYS[algorithm] + elif algorithm in ALGORITHMS.HMAC: # noqa: F811 + return HMACKey + elif algorithm in ALGORITHMS.AES: + from jose.backends import AESKey # noqa: F811 + + return AESKey + """ + # elif algorithm in ALGORITHMS.RSA: + # from jose.backends import RSAKey # noqa: F811 + + # return RSAKey + # elif algorithm in ALGORITHMS.EC: + # from jose.backends import ECKey # noqa: F811 + + # return ECKey + # elif algorithm == ALGORITHMS.DIR: + # from jose.backends import DIRKey # noqa: F811 + + # return DIRKey + """ + return None + + +class Key: + """ + A simple interface for implementing JWK keys. + """ + + def __init__(self, key, algorithm): + pass # sonar fix: this is library code interface. + + def sign(self, msg): + raise NotImplementedError() + + def verify(self, msg, sig): + raise NotImplementedError() + + def public_key(self): + raise NotImplementedError() + + def to_pem(self): + raise NotImplementedError() + + def to_dict(self): + raise NotImplementedError() + + def encrypt(self, plain_text, aad=None): + """ + Encrypt the plain text and generate an auth tag if appropriate + + Args: + plain_text (bytes): Data to encrypt + aad (bytes, optional): Authenticated Additional Data if key's algorithm supports auth mode + + Returns: + (bytes, bytes, bytes): IV, cipher text, and auth tag + """ + raise NotImplementedError() + + def decrypt(self, cipher_text, iv=None, aad=None, tag=None): + """ + Decrypt the cipher text and validate the auth tag if present + Args: + cipher_text (bytes): Cipher text to decrypt + iv (bytes): IV if block mode + aad (bytes): Additional Authenticated Data to verify if auth mode + tag (bytes): Authentication tag if auth mode + + Returns: + bytes: Decrypted value + """ + raise NotImplementedError() + + def wrap_key(self, key_data): + """ + Wrap the the plain text key data + + Args: + key_data (bytes): Key data to wrap + + Returns: + bytes: Wrapped key + """ + raise NotImplementedError() + + def unwrap_key(self, wrapped_key): + """ + Unwrap the the wrapped key data + + Args: + wrapped_key (bytes): Wrapped key data to unwrap + + Returns: + bytes: Unwrapped key + """ + raise NotImplementedError() + + +class HMACKey(Key): + """ + Performs signing and verification operations using HMAC + and the specified hash function. + """ + + HASHES = {ALGORITHMS.HS256: hashlib.sha256, ALGORITHMS.HS384: hashlib.sha384, ALGORITHMS.HS512: hashlib.sha512} + + def __init__(self, key, algorithm): + if algorithm not in ALGORITHMS.HMAC: + raise JWKError("hash_alg: %s is not a valid hash algorithm" % algorithm) + self._algorithm = algorithm + self._hash_alg = self.HASHES.get(algorithm) + + if isinstance(key, dict): + self.prepared_key = self._process_jwk(key) + return + + if not isinstance(key, str) and not isinstance(key, bytes): + raise JWKError("Expecting a string- or bytes-formatted key.") + + if isinstance(key, str): + key = key.encode("utf-8") + + invalid_strings = [ + b"-----BEGIN PUBLIC KEY-----", + b"-----BEGIN RSA PUBLIC KEY-----", + b"-----BEGIN CERTIFICATE-----", + b"ssh-rsa", + ] + + if any(string_value in key for string_value in invalid_strings): + raise JWKError( + "The specified key is an asymmetric key or x509 certificate and" + " should not be used as an HMAC secret." + ) + + self.prepared_key = key + + def _process_jwk(self, jwk_dict): + if not jwk_dict.get("kty") == "oct": + raise JWKError("Incorrect key type. Expected: 'oct', Received: %s" % jwk_dict.get("kty")) + + k = jwk_dict.get("k") + k = k.encode("utf-8") + k = bytes(k) + k = base64url_decode(k) + + return k + + def sign(self, msg): + return hmac.new(self.prepared_key, msg, self._hash_alg).digest() + + def verify(self, msg, sig): + return hmac.compare_digest(sig, self.sign(msg)) + + def to_dict(self): + return { + "alg": self._algorithm, + "kty": "oct", + "k": base64url_encode(self.prepared_key).decode("ASCII"), + } + + +class CryptographyAESKey(Key): + KEY_128 = (ALGORITHMS.A128GCM, ALGORITHMS.A128GCMKW, ALGORITHMS.A128KW, ALGORITHMS.A128CBC) + KEY_192 = (ALGORITHMS.A192GCM, ALGORITHMS.A192GCMKW, ALGORITHMS.A192KW, ALGORITHMS.A192CBC) + KEY_256 = ( + ALGORITHMS.A256GCM, + ALGORITHMS.A256GCMKW, + ALGORITHMS.A256KW, + ALGORITHMS.A128CBC_HS256, + ALGORITHMS.A256CBC, + ) + KEY_384 = (ALGORITHMS.A192CBC_HS384,) + KEY_512 = (ALGORITHMS.A256CBC_HS512,) + + AES_KW_ALGS = (ALGORITHMS.A128KW, ALGORITHMS.A192KW, ALGORITHMS.A256KW) + + MODES = { + ALGORITHMS.A128GCM: modes.GCM, + ALGORITHMS.A192GCM: modes.GCM, + ALGORITHMS.A256GCM: modes.GCM, + ALGORITHMS.A128CBC_HS256: modes.CBC, + ALGORITHMS.A192CBC_HS384: modes.CBC, + ALGORITHMS.A256CBC_HS512: modes.CBC, + ALGORITHMS.A128CBC: modes.CBC, + ALGORITHMS.A192CBC: modes.CBC, + ALGORITHMS.A256CBC: modes.CBC, + ALGORITHMS.A128GCMKW: modes.GCM, + ALGORITHMS.A192GCMKW: modes.GCM, + ALGORITHMS.A256GCMKW: modes.GCM, + ALGORITHMS.A128KW: None, + ALGORITHMS.A192KW: None, + ALGORITHMS.A256KW: None, + } + + def __init__(self, key, algorithm): + if algorithm not in ALGORITHMS.AES: + raise JWKError("%s is not a valid AES algorithm" % algorithm) + if algorithm not in ALGORITHMS.SUPPORTED.union(ALGORITHMS.AES_PSEUDO): + raise JWKError("%s is not a supported algorithm" % algorithm) + + self._algorithm = algorithm + self._mode = self.MODES.get(self._algorithm) + + if algorithm in self.KEY_128 and len(key) != 16: + raise JWKError(f"Key must be 128 bit for alg {algorithm}") + elif algorithm in self.KEY_192 and len(key) != 24: + raise JWKError(f"Key must be 192 bit for alg {algorithm}") + elif algorithm in self.KEY_256 and len(key) != 32: + raise JWKError(f"Key must be 256 bit for alg {algorithm}") + elif algorithm in self.KEY_384 and len(key) != 48: + raise JWKError(f"Key must be 384 bit for alg {algorithm}") + elif algorithm in self.KEY_512 and len(key) != 64: + raise JWKError(f"Key must be 512 bit for alg {algorithm}") + + self._key = key + + def to_dict(self): + data = {"alg": self._algorithm, "kty": "oct", "k": base64url_encode(self._key)} + return data + + # Commented out, no encryption is needed for FAM-BCSC. + # def encrypt(self, plain_text, aad=None): + # plain_text = utils.ensure_binary(plain_text) + # try: + # iv = get_random_bytes(algorithms.AES.block_size // 8) + # mode = self._mode(iv) + # if mode.name == "GCM": + # cipher = aead.AESGCM(self._key) + # cipher_text_and_tag = cipher.encrypt(iv, plain_text, aad) + # cipher_text = cipher_text_and_tag[: len(cipher_text_and_tag) - 16] + # auth_tag = cipher_text_and_tag[-16:] + # else: + # cipher = Cipher(algorithms.AES(self._key), mode, backend=default_backend()) + # encryptor = cipher.encryptor() + # padder = PKCS7(algorithms.AES.block_size).padder() + # padded_data = padder.update(plain_text) + # padded_data += padder.finalize() + # cipher_text = encryptor.update(padded_data) + encryptor.finalize() + # auth_tag = None + # return iv, cipher_text, auth_tag + # except Exception as e: + # raise JWEError(e) + + def decrypt(self, cipher_text, iv=None, aad=None, tag=None): + cipher_text = utils.ensure_binary(cipher_text) + try: + iv = utils.ensure_binary(iv) + mode = self._mode(iv) + if mode.name == "GCM": + if tag is None: + raise ValueError("tag cannot be None") + cipher = aead.AESGCM(self._key) + cipher_text_and_tag = cipher_text + tag + try: + plain_text = cipher.decrypt(iv, cipher_text_and_tag, aad) + except InvalidTag: + raise JWEError("Invalid JWE Auth Tag") + else: + cipher = Cipher(algorithms.AES(self._key), mode, backend=default_backend()) + decryptor = cipher.decryptor() + padded_plain_text = decryptor.update(cipher_text) + padded_plain_text += decryptor.finalize() + unpadder = PKCS7(algorithms.AES.block_size).unpadder() + plain_text = unpadder.update(padded_plain_text) + plain_text += unpadder.finalize() + + return plain_text + except Exception as e: + raise JWEError(e) + + # Commented out, no encryption is needed for FAM-BCSC. + # def wrap_key(self, key_data): + # key_data = utils.ensure_binary(key_data) + # cipher_text = aes_key_wrap(self._key, key_data, default_backend()) + # return cipher_text # IV, cipher text, auth tag + + # Commented out, no encryption is needed for FAM-BCSC. + # def unwrap_key(self, wrapped_key): + # wrapped_key = utils.ensure_binary(wrapped_key) + # try: + # plain_text = aes_key_unwrap(self._key, wrapped_key, default_backend()) + # except InvalidUnwrap as cause: + # raise JWEError(cause) + # return plain_text + + +def base64url_encode(input): + """Helper method to base64url_encode a string. + + Args: + input (str): A base64url_encoded string to encode. + + """ + return base64.urlsafe_b64encode(input).replace(b"=", b"") \ No newline at end of file diff --git a/server/backend/api/app/routers/router_bcsc_proxy.py b/server/backend/api/app/routers/router_bcsc_proxy.py index 45d86e6b9..41e244cbe 100644 --- a/server/backend/api/app/routers/router_bcsc_proxy.py +++ b/server/backend/api/app/routers/router_bcsc_proxy.py @@ -1,17 +1,18 @@ import json import logging +import jwt import requests +from api.app.integration.bcsc import bcsc_decryption +from api.app.utils import utils from authlib.jose import JsonWebKey from cryptography.hazmat.primitives import \ serialization as crypto_serialization from fastapi import APIRouter, Depends, HTTPException, Request, Response from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse -from jose import jwt -from jose.utils import base64url_decode -from .. import bcsc_decryption, kms_lookup +from .. import kms_lookup LOGGER = logging.getLogger(__name__) @@ -104,7 +105,7 @@ def bcsc_userinfo(request: Request, bcsc_userinfo_uri): LOGGER.debug(f"encrypted_key_segment: [{encrypted_key_segment}]") # In AWS Decode and decrypt the cek (only works in AWS because kms code) - as_bytes = base64url_decode(encrypted_key_segment) + as_bytes = utils.base64url_decode(encrypted_key_segment) LOGGER.debug(f"as_bytes: [{as_bytes}]") decrypted_key = kms_lookup.decrypt(as_bytes) diff --git a/server/backend/api/app/utils/utils.py b/server/backend/api/app/utils/utils.py index 57e737edb..1f7c8b043 100644 --- a/server/backend/api/app/utils/utils.py +++ b/server/backend/api/app/utils/utils.py @@ -1,3 +1,4 @@ +import base64 import json import logging from http import HTTPStatus @@ -30,3 +31,30 @@ def raise_http_exception( "description": error_msg, } ) + + +# This is util function to add padding (for base64.urlsafe_b64decode to work) +# https://stackoverflow.com/questions/3302946/how-to-decode-base64-url-in-python +def base64url_decode(input): + """Helper method to base64url_decode a string. + + Args: + input (str): A base64url_encoded string to decode. + + """ + rem = len(input) % 4 + + if rem > 0: + input += b"=" * (4 - rem) + + return base64.urlsafe_b64decode(input) + + +def ensure_binary(s): + """Coerce **s** to bytes.""" + + if isinstance(s, bytes): + return s + if isinstance(s, str): + return s.encode("utf-8", "strict") + raise TypeError(f"not expecting type '{type(s)}'") \ No newline at end of file diff --git a/server/backend/requirements.txt b/server/backend/requirements.txt index f54735218..6c8d2b0fb 100644 --- a/server/backend/requirements.txt +++ b/server/backend/requirements.txt @@ -10,7 +10,6 @@ email-validator==2.1.1 requests==2.32.0 psycopg2-binary==2.9.9 mangum==0.17.0 -python-jose==3.3.0 cryptography==42.0.7 authlib==1.3.0 pyjwt==2.8.0 \ No newline at end of file diff --git a/terraform/dev/terragrunt.hcl b/terraform/dev/terragrunt.hcl index 34fe2f3f2..598e0aa0f 100644 --- a/terraform/dev/terragrunt.hcl +++ b/terraform/dev/terragrunt.hcl @@ -43,10 +43,10 @@ generate "dev_tfvars" { fam_console_idp_name_bceid = "TEST-BCEIDBUSINESS" forest_client_api_base_url_test = "${local.common_vars.inputs.forest_client_api_test_base_url}" use_override_proxy_endpoints = true - dev_override_bcsc_userinfo_proxy_endpoint = "https://6mud7781pe.execute-api.ca-central-1.amazonaws.com/v1/bcsc/userinfo/dev" + dev_override_bcsc_userinfo_proxy_endpoint = "https://xy7pk81p4h.execute-api.ca-central-1.amazonaws.com/v1/bcsc/userinfo/dev" test_override_bcsc_userinfo_proxy_endpoint = "https://6mud7781pe.execute-api.ca-central-1.amazonaws.com/v1/bcsc/userinfo/test" prod_override_bcsc_userinfo_proxy_endpoint = "https://6mud7781pe.execute-api.ca-central-1.amazonaws.com/v1/bcsc/userinfo/prod" - dev_override_bcsc_token_proxy_endpoint = "https://6mud7781pe.execute-api.ca-central-1.amazonaws.com/v1/bcsc/token/dev" + dev_override_bcsc_token_proxy_endpoint = "https://xy7pk81p4h.execute-api.ca-central-1.amazonaws.com/v1/bcsc/token/dev" test_override_bcsc_token_proxy_endpoint = "https://6mud7781pe.execute-api.ca-central-1.amazonaws.com/v1/bcsc/token/test" prod_override_bcsc_token_proxy_endpoint = "https://6mud7781pe.execute-api.ca-central-1.amazonaws.com/v1/bcsc/token/prod" EOF