From b078d35a58ebce966ff5a5874b4ca69081c57d73 Mon Sep 17 00:00:00 2001 From: Antoni Czaplicki <56671347+Antoni-Czaplicki@users.noreply.github.com> Date: Tue, 14 Jan 2025 14:11:10 +0100 Subject: [PATCH] Include request signer in libray (#149) * Include request signer in libray * Replace .format with f-strings --------- Co-authored-by: Kacper Ziubryniewicz --- requirements.txt | 10 ++--- vulcan/_api.py | 2 +- vulcan/_keystore.py | 2 +- vulcan/_request_signer.py | 94 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 101 insertions(+), 7 deletions(-) create mode 100644 vulcan/_request_signer.py diff --git a/requirements.txt b/requirements.txt index 6ed7aae..0385123 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ related-without-future~=0.7.4 -aenum~=3.1.11 -aiohttp~=3.9.1 -yarl~=1.9.4 -pytz~=2024.1 -uonet-request-signer-hebe~=0.1.1 +aenum~=3.1.15 +aiohttp~=3.11.11 +yarl~=1.18.3 +pytz~=2024.2 +cryptography~=44.0.0 diff --git a/vulcan/_api.py b/vulcan/_api.py index 1fd29ca..3cef950 100644 --- a/vulcan/_api.py +++ b/vulcan/_api.py @@ -4,7 +4,6 @@ from typing import Union import aiohttp -from uonet_request_signer_hebe import get_signature_values from yarl import URL from ._api_helper import ApiHelper @@ -18,6 +17,7 @@ VulcanAPIException, ) from ._keystore import Keystore +from ._request_signer import get_signature_values from ._utils import ( APP_NAME, APP_OS, diff --git a/vulcan/_keystore.py b/vulcan/_keystore.py index d2db2c3..63e0a88 100644 --- a/vulcan/_keystore.py +++ b/vulcan/_keystore.py @@ -1,8 +1,8 @@ # -*- coding: utf-8 -*- from related import StringField, immutable -from uonet_request_signer_hebe import generate_key_pair +from ._request_signer import generate_key_pair from ._utils import default_device_model, get_firebase_token, log from .model import Serializable diff --git a/vulcan/_request_signer.py b/vulcan/_request_signer.py new file mode 100644 index 0000000..04f8e58 --- /dev/null +++ b/vulcan/_request_signer.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- + +import base64 +import hashlib +import json +import re +import urllib + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa +from cryptography.hazmat.primitives.serialization import load_der_private_key + + +def get_encoded_path(full_url): + path = re.search(r"(api/mobile/.+)", full_url) + if path is None: + raise ValueError( + "The URL does not seem correct (does not match `(api/mobile/.+)` regex)" + ) + return urllib.parse.quote(path[1], safe="").lower() + + +def get_digest(body): + if not body: + return None + + m = hashlib.sha256() + m.update(bytes(body, "utf-8")) + return base64.b64encode(m.digest()).decode("utf-8") + + +def get_headers_list(body, digest, canonical_url, timestamp): + sign_data = [ + ["vCanonicalUrl", canonical_url], + ["Digest", digest] if body else None, + ["vDate", timestamp.strftime("%a, %d %b %Y %H:%M:%S GMT")], + ] + + return ( + " ".join(item[0] for item in sign_data if item), + "".join(item[1] for item in sign_data if item), + ) + + +def get_signature(data, private_key): + data_str = json.dumps(data) if isinstance(data, (dict, list)) else str(data) + private_key = load_der_private_key( + base64.b64decode(private_key), password=None, backend=default_backend() + ) + signature = private_key.sign( + bytes(data_str, "utf-8"), padding.PKCS1v15(), hashes.SHA256() + ) + return base64.b64encode(signature).decode("utf-8") + + +def get_signature_values(fingerprint, private_key, body, full_url, timestamp): + canonical_url = get_encoded_path(full_url) + digest = get_digest(body) + headers, values = get_headers_list(body, digest, canonical_url, timestamp) + signature = get_signature(values, private_key) + + return ( + f"SHA-256={digest}" if digest else None, + canonical_url, + f'keyId="{fingerprint}",headers="{headers}",algorithm="sha256withrsa",signature=Base64(SHA256withRSA({signature}))', + ) + + +def pem_getraw(pem): + return pem.decode("utf-8").replace("\n", "").split("-----")[2] + + +def generate_key_pair(): + private_key = rsa.generate_private_key( + public_exponent=65537, key_size=2048, backend=default_backend() + ) + private_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + public_key = private_key.public_key() + public_pem = public_key.public_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + + # Compute fingerprint + fingerprint = hashes.Hash(hashes.SHA1(), backend=default_backend()) + fingerprint.update(public_pem) + fingerprint_hex = fingerprint.finalize().hex() + + return pem_getraw(public_pem), fingerprint_hex, pem_getraw(private_pem)