Skip to content

Commit

Permalink
tmp move sign into volvooncall_cn.py
Browse files Browse the repository at this point in the history
  • Loading branch information
idreamshen committed Apr 14, 2024
1 parent 2247dad commit 9951e97
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 83 deletions.
82 changes: 0 additions & 82 deletions custom_components/volvooncall_cn/sign.py

This file was deleted.

86 changes: 85 additions & 1 deletion custom_components/volvooncall_cn/volvooncall_cn.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
from aiohttp import ClientSession, ClientTimeout, BasicAuth
from aiohttp.hdrs import METH_GET, METH_POST

from sign import sign_request
import hashlib
import hmac
import urllib.parse
from urllib.parse import urlparse
from datetime import datetime


_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -416,6 +421,84 @@ def transformlng(lng, lat):
ret += (150.0 * math.sin(lng / 12.0 * pi) + 300.0 *math.sin(lng / 30.0 * pi)) * 2.0 / 3.0
return ret


def hmac_sha256(key, msg):
return hmac.new(key.encode(), msg.encode(), hashlib.sha256).hexdigest()

def hex_encode_sha256_hash(data):
return hashlib.sha256(data.encode()).hexdigest()

def urlencode(string):
return urllib.parse.quote(string, safe='')

def find_header(headers, name):
for key, value in headers.items():
if key.lower() == name.lower():
return value
return None

def canonical_request(req, signed_headers):
payload_hash = find_header(req['headers'], 'x-sdk-content-sha256')
if payload_hash is None:
payload_hash = hex_encode_sha256_hash(req['body'] if req['body'] else '')

canonical_uri = '/'.join(urlencode(p) for p in req['uri'].split('/'))
if not canonical_uri.endswith('/'):
canonical_uri += '/'

query_string = sorted((k, v) for k, v in req['query'].items())
canonical_query_string = '&'.join(f"{urlencode(k)}={urlencode(v)}" for k, v in query_string)

canonical_headers = '\n'.join(f"{k}:{v.strip()}" for k in signed_headers for v in [req['headers'].get(k, '')])
if canonical_headers:
canonical_headers += '\n'

return f"{req['method']}\n{canonical_uri}\n{canonical_query_string}\n{canonical_headers}\n{';'.join(signed_headers)}\n{payload_hash}"

def string_to_sign(canonical_req, date_stamp, service='SDK-HMAC-SHA256'):
return f"{service}\n{date_stamp}\n{hex_encode_sha256_hash(canonical_req)}"

def create_signature(string_to_sign, secret_key):
return hmac_sha256(secret_key, string_to_sign)

def format_auth_header(signature, access_key, signed_headers):
return f"SDK-HMAC-SHA256 Access={access_key}, SignedHeaders={';'.join(signed_headers)}, Signature={signature}"

def generate_date_stamp():
return datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")

def sign_request(url, method, body):
parsed_url = urlparse(url)
request = {
'headers': {
"x-sdk-content-sha256": "UNSIGNED-PAYLOAD",
"host": "apigateway.digitalvolvo.com"
},
'method': "POST",
'body': body,
'uri': parsed_url.path,
'host': "apigateway.digitalvolvo.com",
'query': {}
}
key = "204114990"
secret = "bjGqb3TvEEZ8W8QhoyhEH4IenwCnc4JQ"
date = find_header(request['headers'], 'x-sdk-date')
if date is None:
date = generate_date_stamp()
request['headers']['x-sdk-date'] = date

if request['method'] not in ['PUT', 'PATCH', 'POST']:
request['body'] = ''

canonical_req = canonical_request(request, sorted([k.lower() for k in request['headers']]))
string_to_sign_val = string_to_sign(canonical_req, date)
signature = create_signature(string_to_sign_val, secret)

return {
'x-sdk-date': request['headers']['x-sdk-date'],
'v587sign': format_auth_header(signature, key, [k.lower() for k in request['headers']])
}

async def main():
logging.basicConfig(level=logging.DEBUG)
parser = argparse.ArgumentParser(
Expand All @@ -437,6 +520,7 @@ async def main():
await vehicle.update()
_LOGGER.debug(vehicle.__dict__)


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

0 comments on commit 9951e97

Please sign in to comment.