-
Notifications
You must be signed in to change notification settings - Fork 19
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Sync and Async clients Implementing parts of v6 API Removing all v5 and v5.1 API implementations Lacking documentation rewrite Builds on #53, #54, #56, #57, #58
- Loading branch information
Showing
17 changed files
with
745 additions
and
968 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,239 @@ | ||
from typing import Optional, Tuple, Dict, Any, Awaitable | ||
|
||
import httpx | ||
|
||
from bankid.base import BankIDClientBaseclass | ||
from bankid.exceptions import get_json_error_class | ||
|
||
|
||
class BankIdAsyncClient(BankIDClientBaseclass): | ||
"""The asynchronous client to use for communicating with BankID servers via the v6 API. | ||
:param certificates: Tuple of string paths to the certificate to use and | ||
the key to sign with. | ||
:type certificates: tuple | ||
:param test_server: Use the test server for authenticating and signing. | ||
:type test_server: bool | ||
:param request_timeout: Timeout for BankID requests. | ||
:type request_timeout: int | ||
""" | ||
|
||
def __init__(self, certificates: Tuple[str, str], test_server: bool = False, request_timeout: Optional[int] = None): | ||
super().__init__(certificates, test_server, request_timeout) | ||
|
||
kwargs = { | ||
"cert": self.certs, | ||
"headers": {"Content-Type": "application/json"}, | ||
"verify": self.verify_cert, | ||
} | ||
if request_timeout: | ||
kwargs["timeout"] = request_timeout | ||
self.client = httpx.AsyncClient(**kwargs) | ||
|
||
async def authenticate( | ||
self, | ||
end_user_ip: str, | ||
requirement: Dict[str, Any] = None, | ||
user_visible_data: str = None, | ||
user_non_visible_data: str = None, | ||
user_visible_data_format: str = None, | ||
) -> Dict[str, str]: | ||
"""Request an authentication order. The :py:meth:`collect` method | ||
is used to query the status of the order. | ||
Example data returned: | ||
.. code-block:: json | ||
{ | ||
"orderRef": "ee3421ea-2096-4000-8130-82648efe0927", | ||
"autoStartToken": "e8df5c3c-c67b-4a01-bfe5-fefeab760beb", | ||
"qrStartToken": "01f94e28-857f-4d8a-bf8e-6c5a24466658", | ||
"qrStartSecret": "b4214886-3b5b-46ab-bc08-6862fddc0e06" | ||
} | ||
:param end_user_ip: The user IP address as seen by RP. String. IPv4 and IPv6 is allowed. | ||
:type end_user_ip: str | ||
:param requirement: Requirements on how the auth order must be performed. | ||
See the section `Requirements <https://www.bankid.com/en/utvecklare/guider/teknisk-integrationsguide/graenssnittsbeskrivning/auth>`_ for more details. | ||
:type requirement: dict | ||
:param user_visible_data: Text displayed to the user during authentication with BankID, | ||
with the purpose of providing context for the authentication and to enable users | ||
to detect identification errors and averting fraud attempts. | ||
:type user_visible_data: str | ||
:param user_non_visible_data: Data is not displayed to the user. | ||
:type user_non_visible_data: str | ||
:param user_visible_data_format: If present, and set to “simpleMarkdownV1”, | ||
this parameter indicates that userVisibleData holds formatting characters which | ||
potentially make for a more pleasant user experience. | ||
:type user_visible_data_format: str | ||
:return: The order response. | ||
:rtype: dict | ||
:raises BankIDError: raises a subclass of this error | ||
when error has been returned from server. | ||
""" | ||
data = self._create_payload( | ||
end_user_ip, | ||
requirement=requirement, | ||
user_visible_data=user_visible_data, | ||
user_non_visible_data=user_non_visible_data, | ||
user_visible_data_format=user_visible_data_format, | ||
) | ||
|
||
response = await self.client.post(self._auth_endpoint, json=data) | ||
|
||
if response.status_code == 200: | ||
return response.json() | ||
else: | ||
raise get_json_error_class(response) | ||
|
||
async def sign( | ||
self, | ||
end_user_ip, | ||
requirement: Dict[str, Any] = None, | ||
user_visible_data: str = None, | ||
user_non_visible_data: str = None, | ||
user_visible_data_format: str = None, | ||
) -> Dict[str, str]: | ||
"""Request a signing order. The :py:meth:`collect` method | ||
is used to query the status of the order. | ||
Example data returned: | ||
.. code-block:: json | ||
{ | ||
"orderRef": "ee3421ea-2096-4000-8130-82648efe0927", | ||
"autoStartToken": "e8df5c3c-c67b-4a01-bfe5-fefeab760beb", | ||
"qrStartToken": "01f94e28-857f-4d8a-bf8e-6c5a24466658", | ||
"qrStartSecret": "b4214886-3b5b-46ab-bc08-6862fddc0e06" | ||
} | ||
:param end_user_ip: The user IP address as seen by RP. String. IPv4 and IPv6 is allowed. | ||
:type end_user_ip: str | ||
:param requirement: Requirements on how the sign order must be performed. | ||
See the section `Requirements <https://www.bankid.com/en/utvecklare/guider/teknisk-integrationsguide/graenssnittsbeskrivning/sign>`_ for more details. | ||
:type requirement: dict | ||
:param user_visible_data: Text to be displayed to the user. | ||
:type user_visible_data: str | ||
:param user_non_visible_data: Data is not displayed to the user. | ||
:type user_non_visible_data: str | ||
:param user_visible_data_format: If present, and set to “simpleMarkdownV1”, | ||
this parameter indicates that userVisibleData holds formatting characters which | ||
potentially make for a more pleasant user experience. | ||
:type user_visible_data_format: str | ||
:return: The order response. | ||
:rtype: dict | ||
:raises BankIDError: raises a subclass of this error | ||
when error has been returned from server. | ||
""" | ||
data = self._create_payload( | ||
end_user_ip, | ||
requirement=requirement, | ||
user_visible_data=user_visible_data, | ||
user_non_visible_data=user_non_visible_data, | ||
user_visible_data_format=user_visible_data_format, | ||
) | ||
|
||
response = await self.client.post(self._sign_endpoint, json=data) | ||
|
||
if response.status_code == 200: | ||
return response.json() | ||
else: | ||
raise get_json_error_class(response) | ||
|
||
async def collect(self, order_ref: str) -> dict: | ||
"""Collects the result of a sign or auth order using the | ||
``orderRef`` as reference. | ||
RP should keep on calling collect every two seconds if status is pending. | ||
RP must abort if status indicates failed. The user identity is returned | ||
when complete. | ||
Example collect results returned while authentication or signing is | ||
still pending: | ||
.. code-block:: json | ||
{ | ||
"orderRef":"131daac9-16c6-4618-beb0-365768f37288", | ||
"status":"pending", | ||
"hintCode":"userSign" | ||
} | ||
Example collect result when authentication or signing has failed: | ||
.. code-block:: json | ||
{ | ||
"orderRef":"131daac9-16c6-4618-beb0-365768f37288", | ||
"status":"failed", | ||
"hintCode":"userCancel" | ||
} | ||
Example collect result when authentication or signing is successful | ||
and completed: | ||
.. code-block:: json | ||
{ | ||
"orderRef": "131daac9-16c6-4618-beb0-365768f37288", | ||
"status": "complete", | ||
"completionData": { | ||
"user": { | ||
"personalNumber": "190000000000", | ||
"name": "Karl Karlsson", | ||
"givenName": "Karl", | ||
"surname": "Karlsson" | ||
}, | ||
"device": { | ||
"ipAddress": "192.168.0.1" | ||
}, | ||
"bankIdIssueDate": "2020-02-01", | ||
"signature": "<base64-encoded data>", | ||
"ocspResponse": "<base64-encoded data>" | ||
} | ||
} | ||
See `BankID Integration Guide <https://www.bankid.com/en/utvecklare/guider/teknisk-integrationsguide/graenssnittsbeskrivning/collect>`_ | ||
for more details about how to inform end user of the current status, | ||
whether it is pending, failed or completed. | ||
:param order_ref: The ``orderRef`` UUID returned from auth or sign. | ||
:type order_ref: str | ||
:return: The CollectResponse parsed to a dictionary. | ||
:rtype: dict | ||
:raises BankIDError: raises a subclass of this error | ||
when error has been returned from server. | ||
""" | ||
response = await self.client.post(self._collect_endpoint, json={"orderRef": order_ref}) | ||
|
||
if response.status_code == 200: | ||
return response.json() | ||
else: | ||
raise get_json_error_class(response) | ||
|
||
async def cancel(self, order_ref: str) -> bool: | ||
"""Cancels an ongoing sign or auth order. | ||
This is typically used if the user cancels the order | ||
in your service or app. | ||
:param order_ref: The UUID string specifying which order to cancel. | ||
:type order_ref: str | ||
:return: Boolean regarding success of cancellation. | ||
:rtype: bool | ||
:raises BankIDError: raises a subclass of this error | ||
when error has been returned from server. | ||
""" | ||
response = await self.client.post(self._cancel_endpoint, json={"orderRef": order_ref}) | ||
|
||
if response.status_code == 200: | ||
return response.json() == {} | ||
else: | ||
raise get_json_error_class(response) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import base64 | ||
from datetime import datetime | ||
import hashlib | ||
import hmac | ||
from math import floor | ||
import time | ||
from typing import Tuple, Optional, Dict, Any | ||
from urllib.parse import urljoin | ||
|
||
from bankid.certutils import resolve_cert_path | ||
|
||
|
||
class BankIDClientBaseclass: | ||
"""Baseclass for BankID clients.""" | ||
|
||
def __init__( | ||
self, | ||
certificates: Tuple[str, str], | ||
test_server: bool = False, | ||
request_timeout: Optional[int] = None, | ||
): | ||
self.certs = certificates | ||
self._request_timeout = request_timeout | ||
|
||
if test_server: | ||
self.api_url = "https://appapi2.test.bankid.com/rp/v6.0/" | ||
self.verify_cert = resolve_cert_path("appapi2.test.bankid.com.pem") | ||
else: | ||
self.api_url = "https://appapi2.bankid.com/rp/v6.0/" | ||
self.verify_cert = resolve_cert_path("appapi2.bankid.com.pem") | ||
|
||
self._auth_endpoint = urljoin(self.api_url, "auth") | ||
self._sign_endpoint = urljoin(self.api_url, "sign") | ||
self._collect_endpoint = urljoin(self.api_url, "collect") | ||
self._cancel_endpoint = urljoin(self.api_url, "cancel") | ||
|
||
self.client = None | ||
|
||
@staticmethod | ||
def _encode_user_data(user_data): | ||
if isinstance(user_data, str): | ||
return base64.b64encode(user_data.encode("utf-8")).decode("ascii") | ||
else: | ||
return base64.b64encode(user_data).decode("ascii") | ||
|
||
@staticmethod | ||
def generate_qr_code_content(qr_start_token: str, start_t: [float, datetime], qr_start_secret: str): | ||
"""Given QR start token, time.time() or UTC datetime when initiated authentication call was made and the | ||
QR start secret, calculate the current QR code content to display. | ||
""" | ||
if isinstance(start_t, datetime): | ||
start_t = start_t.timestamp() | ||
elapsed_seconds_since_call = int(floor(time.time() - start_t)) | ||
qr_auth_code = hmac.new( | ||
qr_start_secret.encode(), | ||
msg=str(elapsed_seconds_since_call).encode(), | ||
digestmod=hashlib.sha256, | ||
).hexdigest() | ||
return f"bankid.{qr_start_token}.{elapsed_seconds_since_call}.{qr_auth_code}" | ||
|
||
def _create_payload( | ||
self, | ||
end_user_ip: str, | ||
requirement: Dict[str, Any] = None, | ||
user_visible_data: str = None, | ||
user_non_visible_data: str = None, | ||
user_visible_data_format: str = None, | ||
): | ||
data = {"endUserIp": end_user_ip} | ||
if requirement and isinstance(requirement, dict): | ||
data["requirement"] = requirement | ||
if user_visible_data: | ||
data["userVisibleData"] = self._encode_user_data(user_visible_data) | ||
if user_non_visible_data: | ||
data["userNonVisibleData"] = self._encode_user_data(user_non_visible_data) | ||
if user_visible_data_format and user_visible_data_format == "simpleMarkdownV1": | ||
data["userVisibleDataFormat"] = "simpleMarkdownV1" | ||
return data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.