Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies = [
"babel",
"certifi",
"coloredlogs",
"ecdsa",
"cryptography",
"packaging",
"pathvalidate",
"pygments",
Expand Down
28 changes: 16 additions & 12 deletions pytr/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import asyncio
import base64
import hashlib
import json
import pathlib
import ssl
Expand All @@ -35,8 +34,9 @@
import certifi
import requests
import websockets
from ecdsa import NIST256p, SigningKey # type: ignore[import-untyped]
from ecdsa.util import sigencode_der # type: ignore[import-untyped]
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec

from pytr.utils import get_logger

Expand Down Expand Up @@ -116,7 +116,7 @@ def __init__(
self.keyfile = keyfile if keyfile else KEY_FILE
try:
with open(self.keyfile, "rb") as f:
self.sk = SigningKey.from_pem(f.read(), hashfunc=hashlib.sha512)
self.sk = serialization.load_pem_private_key(f.read(), password=None, backend=default_backend())
except FileNotFoundError:
pass

Expand All @@ -126,7 +126,7 @@ def __init__(
self._websession.cookies = MozillaCookieJar(self._cookies_file)

def initiate_device_reset(self):
self.sk = SigningKey.generate(curve=NIST256p, hashfunc=hashlib.sha512)
self.sk = ec.generate_private_key(ec.SECP256R1(), default_backend())

r = requests.post(
f"{self._host}/api/v1/auth/account/reset/device",
Expand All @@ -140,7 +140,10 @@ def complete_device_reset(self, token):
if not self._process_id and not self.sk:
raise ValueError("Initiate Device Reset first.")

pubkey_bytes = self.sk.get_verifying_key().to_string("uncompressed")
public_key = self.sk.public_key()
pubkey_bytes = public_key.public_bytes(
encoding=serialization.Encoding.X962, format=serialization.PublicFormat.UncompressedPoint
)
pubkey_string = base64.b64encode(pubkey_bytes).decode("ascii")

r = requests.post(
Expand All @@ -150,7 +153,12 @@ def complete_device_reset(self, token):
)
if r.status_code == 200:
with open(self.keyfile, "wb") as f:
f.write(self.sk.to_pem())
pem = self.sk.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)
f.write(pem)

def login(self):
self.log.info("Logging in")
Expand All @@ -171,11 +179,7 @@ def _sign_request(self, url_path, payload=None, method="POST"):
ts = int(time.time() * 1000)
payload_string = json.dumps(payload) if payload else ""
signature_payload = f"{ts}.{payload_string}"
signature = self.sk.sign(
bytes(signature_payload, "utf-8"),
hashfunc=hashlib.sha512,
sigencode=sigencode_der,
)
signature = self.sk.sign(bytes(signature_payload, "utf-8"), ec.ECDSA(hashes.SHA512()))
signature_string = base64.b64encode(signature).decode("ascii")

headers = self._default_headers.copy()
Expand Down
Loading