-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(wip): work in progress experimentation on API style
- Loading branch information
Showing
11 changed files
with
278 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
--- | ||
title: Exceptions | ||
--- | ||
# Exceptions | ||
|
||
::: federatedidentity.exceptions |
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
--- | ||
title: OpenID Connect | ||
--- | ||
# OpenID Connect (OIDC) | ||
|
||
::: federatedidentity.oidc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,6 @@ | ||
from .exceptions import ( | ||
FederatedIdentityError, | ||
InvalidClaimsError, | ||
InvalidIssuerError, | ||
InvalidJWKSUrlError, | ||
InvalidOIDCDiscoveryDocumentError, | ||
InvalidTokenError, | ||
TransportError, | ||
) | ||
from .oidc import AsyncOIDCTokenIssuer, OIDCTokenIssuer | ||
from ._verify import async_verify_oidc_token, verify_oidc_token | ||
|
||
__all__ = [ | ||
"FederatedIdentityError", | ||
"InvalidClaimsError", | ||
"InvalidIssuerError", | ||
"InvalidJWKSUrlError", | ||
"InvalidOIDCDiscoveryDocumentError", | ||
"InvalidTokenError", | ||
"TransportError", | ||
"OIDCTokenIssuer", | ||
"AsyncOIDCTokenIssuer", | ||
"verify_oidc_token", | ||
"async_verify_oidc_token", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
""" | ||
Utility functions for asyncio. | ||
""" | ||
|
||
from collections.abc import Awaitable | ||
from inspect import isawaitable | ||
from typing import TypeVar, Union | ||
|
||
_T = TypeVar("_T") | ||
|
||
|
||
async def make_awaitable(v: _T) -> _T: | ||
""" | ||
Wrap a constant value into an awaitable. | ||
""" | ||
return v | ||
|
||
|
||
def ensure_awaitable(v: Union[_T, Awaitable[_T]]) -> Awaitable[_T]: | ||
""" | ||
Return argument if it is awaitable otherwise wrap it as an awaitable. | ||
""" | ||
return v if isawaitable(v) else make_awaitable(v) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from collections.abc import Awaitable, Callable, Iterable | ||
from typing import Any, Optional, Union | ||
|
||
from ._async_helpers import ensure_awaitable | ||
|
||
__all__ = [ | ||
"verify_oidc_token", | ||
"async_verify_oidc_token", | ||
] | ||
|
||
|
||
def verify_oidc_token( | ||
token: Union[str, bytes], | ||
issuers: Iterable[Union[str, Callable[[str], bool]]], | ||
*, | ||
claims: Optional[Iterable[Union[dict[str, Any], Callable[[dict[str, Any]], None]]]] = None, | ||
request: Optional[Callable[[str], bytes]] = None, | ||
) -> dict[str, Any]: | ||
""" | ||
Verify an OIDC identity token. | ||
Returns: | ||
the token's claims dictionary. | ||
Parameters: | ||
token: OIDC token to verify. If a string is passed it is first encoded using the ASCII | ||
codec before verification. | ||
issuers: Iterable of acceptable issuers. An issuer may be a static string to match or a | ||
callable used. A match callable should return True if the issuer is acceptable or False | ||
otherwise. At least one issuer must match the token's issuer for the token to be | ||
verified. | ||
claims: Iterable of claim verifiers. A claim verifier may be a dictionary of | ||
acceptable claim values or a callable which takes the claims dictionary. A claims | ||
verifier callable should raise [federatedidentity.exceptions.InvalidClaimsError][] if | ||
the claims do not match the expected values. If omitted only claims required by OIDC | ||
are validated. | ||
request: Callable used to fetch issuer discovery documents. If omitted a default | ||
implementation based on the [requests][] library is used. | ||
Raises: | ||
federatedidentity.exceptions.FederatedIdentityError: the token could not be verified. | ||
""" | ||
raise NotImplementedError() | ||
|
||
|
||
async def async_verify_oidc_token( | ||
token: Union[str, bytes], | ||
issuers: Iterable[Union[str, Awaitable[str], Callable[[str], Union[bool, Awaitable[bool]]]]], | ||
*, | ||
claims: Optional[ | ||
Iterable[Union[dict[str, Any], Callable[[dict[str, Any]], Union[None, Awaitable[None]]]]] | ||
] = None, | ||
request: Optional[Callable[[str], bytes | Awaitable[bytes]]] = None, | ||
) -> dict[str, Any]: | ||
""" | ||
Asynchronously verify an OIDC identity token. | ||
Returns: | ||
the token's claims dictionary. | ||
Parameters: | ||
token: OIDC token to verify. If a string is passed it is first encoded using the ASCII | ||
codec before verification. | ||
issuers: Iterable of acceptable issuers. An issuer may be a static string to match or a | ||
callable used. A match callable should return True if the issuer is acceptable or False | ||
otherwise. At least one issuer must match the token's issuer for the token to be | ||
verified. Callables may be asynchronous. | ||
claims: Iterable of claim verifiers. A claim verifier may be a dictionary of acceptable | ||
claim values or a callable which takes the claims dictionary. A claims verifier | ||
callable should raise [federatedidentity.exceptions.InvalidClaimsError][] if the claims | ||
do not match the expected values. If omitted only claims required by OIDC are | ||
validated. | ||
request: Callable used to fetch issuer discovery documents. If omitted a default | ||
implementation based on the [requests][] library is used. | ||
Raises: | ||
federatedidentity.exceptions.FederatedIdentityError: the token could not be verified. | ||
""" | ||
raise NotImplementedError() | ||
|
||
|
||
def _any_issuer_matches(issuer: str, issuers: Iterable[Union[str, Callable[[str], bool]]]) -> bool: | ||
for element in issuers: | ||
if callable(element) and element(issuer): | ||
return True | ||
if element == issuer: | ||
return True | ||
return False | ||
|
||
|
||
async def _async_any_issuer_matches( | ||
issuer: str, | ||
issuers: Iterable[Union[str, Awaitable[str], Callable[[str], Union[bool, Awaitable[bool]]]]], | ||
) -> bool: | ||
for element in issuers: | ||
if callable(element) and await ensure_awaitable(element(issuer)): | ||
return True | ||
if (await ensure_awaitable(element)) == issuer: | ||
return True | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
import pytest | ||
|
||
from federatedidentity._async_helpers import ensure_awaitable | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_ensure_non_awaitable(faker): | ||
"ensure_awaitable() converts a non-awaitable to awaitable" | ||
v = faker.slug() | ||
assert (await ensure_awaitable(v)) is v | ||
|
||
|
||
@pytest.mark.asyncio | ||
async def test_ensure_awaitable(faker): | ||
"ensure_awaitable() returns an awaitable as-is" | ||
v = faker.slug() | ||
|
||
async def f(): | ||
return v | ||
|
||
assert (await ensure_awaitable(f())) is v |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.