diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e335453..9c7889b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,6 +1,6 @@ # Contributing to ASGI Claim Validator -Thanks for thinking about contributing to ASGI Request Duration! I welcome your help and can't wait to see what you'll bring to the table. +Thanks for thinking about contributing to ASGI Claim Validator! I welcome your help and can't wait to see what you'll bring to the table. ## How to Contribute diff --git a/README.md b/README.md index 49fc451..e8d6335 100644 --- a/README.md +++ b/README.md @@ -8,58 +8,98 @@ [![Build Status build/pypi](https://img.shields.io/github/actions/workflow/status/feteu/asgi-claim-validator/publish-pypi.yaml?label=publish-pypi)](https://github.com/feteu/asgi-claim-validator/actions/workflows/publish-pypi.yaml) [![Build Status test](https://img.shields.io/github/actions/workflow/status/feteu/asgi-claim-validator/test.yaml?label=test)](https://github.com/feteu/asgi-claim-validator/actions/workflows/test.yaml) -# asgi-claim-validator +# asgi-claim-validator ๐Ÿš€ A focused ASGI middleware for validating additional claims within JWT tokens to enhance token-based workflows. -## Overview +> **Note:** If you find this project useful, please consider giving it a star โญ on GitHub. This helps prioritize its maintenance and development. If you encounter any typos, bugs ๐Ÿ›, or have new feature requests, feel free to open an issue. I will be happy to address them. + + +## Table of Contents ๐Ÿ“‘ + +1. [Overview ๐Ÿ“–](#overview-) + 1. [Purpose ๐ŸŽฏ](#purpose-) + 2. [Key Features โœจ](#key-features-) + 3. [Use Cases ๐Ÿ’ก](#use-cases-) + 4. [Compatibility ๐Ÿค](#compatibility-) +2. [Installation ๐Ÿ› ๏ธ](#installation-) +3. [Usage ๐Ÿ“š](#usage-) + 1. [Basic Usage ๐ŸŒŸ](#basic-usage) + 2. [Configuration โš™๏ธ](#configuration-) + 3. [Error Handlers ๐Ÿšจ](#error-handlers-) +4. [Examples ๐Ÿ“](#examples-) +5. [Testing ๐Ÿงช](#testing-) +6. [Contributing ๐Ÿค](#contributing-) +7. [License ๐Ÿ“œ](#license-) + + +## Overview ๐Ÿ“– `asgi-claim-validator` is an ASGI middleware designed to validate additional claims within JWT tokens. Built in addition to the default JWT verification implementation of Connexion, it enhances token-based workflows by ensuring that specific claims are present and meet certain criteria before allowing access to protected endpoints. This middleware allows consumers to validate claims on an endpoint/method level and is compatible with popular ASGI frameworks such as Starlette, FastAPI, and Connexion. -## Features +### Purpose ๐ŸŽฏ + +The primary purpose of `asgi-claim-validator` is to provide an additional layer of security by validating specific claims within JWT tokens. This ensures that only requests with valid and authorized tokens can access protected resources. The middleware is highly configurable, allowing developers to define essential claims, allowed values, and whether blank values are permitted. It also supports path and method filtering, enabling claim validation to be applied selectively based on the request path and HTTP method. + +### Key Features โœจ - **Claim Validation**: Validate specific claims within JWT tokens, such as `sub`, `iss`, `aud`, `exp`, `iat`, and `nbf`. - **Customizable Claims**: Define essential claims, allowed values, and whether blank values are permitted. - **Path and Method Filtering**: Apply claim validation to specific paths and HTTP methods. - **Exception Handling**: Integrate with custom exception handlers to provide meaningful error responses. - **Logging**: Log validation errors for debugging and monitoring purposes. +- **Flexible Configuration**: Easily configure the middleware using a variety of options to suit different use cases. +- **Middleware Positioning**: Integrate the middleware at different positions within the ASGI application stack. +- **Token Extraction**: Extract tokens from various parts of the request, such as headers, cookies, or query parameters. +- **Custom Claim Validators**: Implement custom claim validation logic by providing your own validation functions. +- **Support for Multiple Frameworks**: Compatible with popular ASGI frameworks such as Starlette, FastAPI, and Connexion. +- **Performance Optimization**: Efficiently handle claim validation with minimal impact on request processing time. +- **Extensive Test Coverage**: Comprehensive test suite to ensure reliability and correctness of the middleware. + +### Use Cases ๐Ÿ’ก -## Installation +- **API Security**: Enhance the security of your API by ensuring that only requests with valid JWT tokens and specific claims can access protected endpoints. +- **Role-Based Access Control**: Implement role-based access control by validating claims that represent user roles and permissions. +- **Compliance**: Ensure compliance with security policies by enforcing the presence and validity of specific claims within JWT tokens. +- **Custom Authentication Logic**: Implement custom authentication logic by providing your own claim validation functions. -Install the package using pip: +### Compatibility ๐Ÿค + +`asgi-claim-validator` is compatible with popular ASGI frameworks such as Starlette, FastAPI, and Connexion. It can be easily integrated into existing ASGI applications and configured to suit various use cases and requirements. + +By using `asgi-claim-validator`, you can enhance the security and flexibility of your token-based authentication workflows, ensuring that only authorized requests can access your protected resources. + + +## Installation ๐Ÿ› ๏ธ + +To install the `asgi-claim-validator` package, use the following pip command: ```sh pip install asgi-claim-validator ``` -## Usage -### Basic Usage +## Usage ๐Ÿ“š -Here's an example of how to use `asgi-claim-validator` with Starlette: +### Basic Usage ๐ŸŒŸ -```python -from starlette.applications import Starlette -from starlette.requests import Request -from starlette.responses import JSONResponse -from starlette.routing import Route -from asgi_claim_validator import ClaimValidatorMiddleware +Below is an example of how to integrate `ClaimValidatorMiddleware` with a Connexion application. This middleware validates specific claims within JWT tokens for certain endpoints. -async def secured_endpoint(request: Request) -> JSONResponse: - return JSONResponse({"message": "secured"}) +The `ClaimValidatorMiddleware` requires several parameters to function correctly. The `claims_callable` parameter is a callable that extracts token information from the Connexion context. This parameter must be specified and is typically dependent on the framework being used. The `secured` parameter is a dictionary that defines the secured paths and the claims that need to be validated. For instance, in the provided example, the `/secured` path requires the `sub` claim to be `admin` and the `iss` claim to be `https://example.com` for GET requests. The `skipped` parameter is a dictionary that specifies the paths and methods that should be excluded from validation. In the example, the `/skipped` path is skipped for GET requests. + +```python +from asgi_claim_validator.middleware import ClaimValidatorMiddleware +from connexion import AsyncApp -app = Starlette(routes=[ - Route("/secured", secured_endpoint, methods=["GET"]), -]) +# Create a Connexion application +app = AsyncApp(__name__, specification_dir="spec") +# Add the ClaimValidatorMiddleware app.add_middleware( ClaimValidatorMiddleware, - claims_callable=lambda: { - "sub": "admin", - "iss": "https://example.com", - }, + claims_callable=lambda scope: scope["extensions"]["connexion_context"]["token_info"], secured={ - "^/secured$": { + "^/secured/?$": { "GET": { "sub": { "essential": True, @@ -72,52 +112,135 @@ app.add_middleware( "values": ["https://example.com"], }, }, - } + }, + }, + skipped={ + "^/skipped/?$": ["GET"], }, ) ``` -## Advanced Usage -### Custom Exception Handlers +### Configuration โš™๏ธ + +The `ClaimValidatorMiddleware` requires two main configuration pieces: `secured` and `skipped`. These configurations are validated using JSON schemas to ensure correctness. + +> **Note:** The path regex patterns provided in the `secured` and `skipped` parameters will be automatically escaped by the middleware. + +#### Secured Configuration + +The `secured` configuration is a dictionary that defines the paths and the claims that need to be validated. Each path is associated with a dictionary of HTTP methods, and each method is associated with a dictionary of claims. Each claim can have the following properties: +- `essential`: A boolean indicating whether the claim is essential. +- `allow_blank`: A boolean indicating whether blank values are allowed. +- `values`: A list of allowed values for the claim. + +Example: +```python +secured={ + "^/secured/?$": { + "GET": { + "sub": { + "essential": True, + "allow_blank": False, + "values": ["admin"], + }, + "iss": { + "essential": True, + "allow_blank": False, + "values": ["https://example.com"], + }, + }, + }, +} +``` + +#### Skipped Configuration + +The `skipped` configuration is a dictionary that defines the paths and methods that should be excluded from validation. Each path is associated with a list of HTTP methods. -Integrate `asgi-claim-validator` with custom exception handlers to provide meaningful error responses. Below are examples for Starlette and Connexion. Refer to the specific framework examples in the [examples](examples) directory for detailed implementation. +Example: +```python +skipped={ + "^/skipped/?$": ["GET"], +} +``` -### Middleware Configuration +#### JSON Schema Validation -Configure the middleware with the following options: +Both `secured` and `skipped` configurations are validated using JSON schemas to ensure their correctness. This validation helps catch configuration errors early and ensures that the middleware behaves as expected. -- **claims_callable**: A callable that returns the JWT claims to be validated. -- **secured**: A dictionary defining the paths and methods that require claim validation. -- **skipped**: A dictionary defining the paths and methods to be excluded from claim validation. -- **raise_on_unspecified_path**: Raise an exception if the path is not specified in the `secured` or `skipped` dictionaries. -- **raise_on_unspecified_method**: Raise an exception if the method is not specified for a secured path. -### Claim Validation Options +### Error Handlers ๐Ÿšจ -Configure claims with the following options: +To handle exceptions raised by this middleware, you can configure your framework (such as Starlette or Connexion) to catch and process them dynamically. For security reasons, the exception messages are kept generic, but you can customize them using the exception parameters. -- **essential**: Indicates if the claim is essential (default: `False`). -- **allow_blank**: Indicates if blank values are allowed (default: `True`). -- **values**: A list of allowed values for the claim. +#### Connexion + +```python +from asgi_claim_validator import ClaimValidatorMiddleware, ClaimValidatorException +from connexion import AsyncApp +from connexion.lifecycle import ConnexionRequest, ConnexionResponse -## Examples +# [...] + +def claim_validator_error_handler(request: ConnexionRequest, exc: ClaimValidatorException) -> ConnexionResponse: + return problem(detail=exc.detail, status=exc.status, title=exc.title) + +app = AsyncApp(__name__, specification_dir="spec") +app.add_error_handler(ClaimValidatorException, claim_validator_error_handler) + +# [...] +``` + +#### Starlette + +```python +from asgi_claim_validator import ClaimValidatorMiddleware, ClaimValidatorException +from starlette.applications import Starlette +from starlette.requests import Request +from starlette.responses import JSONResponse + +# [...] + +async def claim_validator_error_handler(request: Request, exc: ClaimValidatorException) -> JSONResponse: + return JSONResponse({"error": f"{exc.title}"}, status_code=exc.status) + +exception_handlers = { + ClaimValidatorException: claim_validator_error_handler +} + +app = Starlette(routes=routes, exception_handlers=exception_handlers) + +# [...] +``` + +## Examples ๐Ÿ“ ### Starlette Example -Refer to the [app.py](examples/starlette/simple/app.py) file for a complete example using Starlette. +To see a complete example using Starlette, refer to the [app.py](examples/starlette/simple/app.py) file. ### Connexion Example -Refer to the [app.py](examples/connexion/simple/app.py) file for a complete example using Connexion. +Check out the [app.py](examples/connexion/simple/app.py) file for a simple example using Connexion. For a comprehensive example that demonstrates automatic extraction and validation of token claims with Connexion, see the [app.py](examples/connexion/complex/app.py) file. -## Testing +## Testing ๐Ÿงช Run the tests using `pytest`: ```sh poetry run pytest ``` -## Contributing +### Scope: + +- **Middleware Functionality**: Ensures correct validation of JWT claims and proper handling of secured and skipped paths. +- **Exception Handling**: Verifies that custom exceptions are raised and handled appropriately. +- **Configuration Validation**: Checks the correctness of middleware configuration for secured and skipped paths. +- **Integration with Frameworks**: Confirms seamless integration with ASGI frameworks like Starlette and Connexion. +- **Custom Claim Validators**: Tests the implementation and usage of custom claim validation logic. + + +## Contributing ๐Ÿค Contributions are welcome! Please refer to the [CONTRIBUTING.md](CONTRIBUTING.md) file for guidelines on how to contribute to this project. -## License + +## License ๐Ÿ“œ This project is licensed under the GNU GPLv3 License. See the [LICENSE](LICENSE) file for more details. \ No newline at end of file diff --git a/asgi_claim_validator/__init__.py b/asgi_claim_validator/__init__.py index 349f1b3..1fe9078 100644 --- a/asgi_claim_validator/__init__.py +++ b/asgi_claim_validator/__init__.py @@ -1,21 +1,35 @@ -from asgi_claim_validator.decorators import validate_claims_callable from asgi_claim_validator.exceptions import ( ClaimValidatorException, + InvalidClaimsConfigurationException, InvalidClaimsTypeException, InvalidClaimValueException, + InvalidSecuredConfigurationException, + InvalidSkippedConfigurationException, MissingEssentialClaimException, UnauthenticatedRequestException, UnspecifiedMethodAuthenticationException, UnspecifiedPathAuthenticationException, ) from asgi_claim_validator.middleware import ClaimValidatorMiddleware -from asgi_claim_validator.types import SecuredCompiledType, SecuredType, SkippedCompiledType, SkippedType +from asgi_claim_validator.types import ( + ClaimsCallableType, + ClaimsType, + SecuredCompiledType, + SecuredType, + SkippedCompiledType, + SkippedType, +) __all__ = ( + "ClaimsCallableType", + "ClaimsType", "ClaimValidatorException", "ClaimValidatorMiddleware", + "InvalidClaimsConfigurationException", "InvalidClaimsTypeException", "InvalidClaimValueException", + "InvalidSecuredConfigurationException", + "InvalidSkippedConfigurationException", "MissingEssentialClaimException", "SecuredCompiledType", "SecuredType", @@ -24,5 +38,4 @@ "UnauthenticatedRequestException", "UnspecifiedMethodAuthenticationException", "UnspecifiedPathAuthenticationException", - "validate_claims_callable", ) \ No newline at end of file diff --git a/asgi_claim_validator/constants.py b/asgi_claim_validator/constants.py index 6828112..47b1860 100644 --- a/asgi_claim_validator/constants.py +++ b/asgi_claim_validator/constants.py @@ -14,7 +14,7 @@ "TRACE", ] _DEFAULT_ALL_HTTP_METHODS_REGEX_GROUP: str = f"({'|'.join(map(escape, (*_DEFAULT_ALL_HTTP_METHODS, *_DEFAULT_ANY_HTTP_METHODS)))})" -_DEFAULT_CLAIMS_CALLABLE: ClaimsCallableType = lambda: dict() +_DEFAULT_CLAIMS_CALLABLE: ClaimsCallableType = lambda scope: scope.get("", dict()) _DEFAULT_RAISE_ON_INVALID_CLAIM: bool = True _DEFAULT_RAISE_ON_INVALID_CLAIMS_TYPE: bool = True _DEFAULT_RAISE_ON_MISSING_CLAIM: bool = True diff --git a/asgi_claim_validator/decorators.py b/asgi_claim_validator/decorators.py index 4c59e0e..e632f11 100644 --- a/asgi_claim_validator/decorators.py +++ b/asgi_claim_validator/decorators.py @@ -16,7 +16,13 @@ log = getLogger(__name__) def validate_claims_callable() -> Callable: - def decorator(func) -> Callable: + """ + Decorator to validate the claims_callable attribute of a class. + + Raises: + InvalidClaimsConfigurationException: If claims_callable is not a callable. + """ + def decorator(func: Callable) -> Callable: def wrapper(self, *args, **kwargs) -> Callable: claims = getattr(self, 'claims_callable', _DEFAULT_CLAIMS_CALLABLE) if not isinstance(claims, Callable): @@ -26,7 +32,13 @@ def wrapper(self, *args, **kwargs) -> Callable: return decorator def validate_secured() -> Callable: - def decorator(func) -> Callable: + """ + Decorator to validate the secured attribute of a class against a JSON schema. + + Raises: + InvalidSecuredConfigurationException: If the secured attribute does not conform to the schema. + """ + def decorator(func: Callable) -> Callable: def wrapper(self, *args, **kwargs) -> Callable: secured = getattr(self, 'secured', None) try: @@ -39,7 +51,13 @@ def wrapper(self, *args, **kwargs) -> Callable: return decorator def validate_skipped() -> Callable: - def decorator(func) -> Callable: + """ + Decorator to validate the skipped attribute of a class against a JSON schema. + + Raises: + InvalidSkippedConfigurationException: If the skipped attribute does not conform to the schema. + """ + def decorator(func: Callable) -> Callable: def wrapper(self, *args, **kwargs) -> Callable: skipped = getattr(self, 'skipped', None) try: diff --git a/asgi_claim_validator/middleware.py b/asgi_claim_validator/middleware.py index 72ec515..ad18daa 100644 --- a/asgi_claim_validator/middleware.py +++ b/asgi_claim_validator/middleware.py @@ -47,7 +47,7 @@ class ClaimValidatorMiddleware: Attributes: app (ASGIApp): The ASGI application. - claims_callable (ClaimsCallableType): A callable that returns the claims. + claims_callable (ClaimsCallableType): A callable that receives the current call scope and returns the claims as a dict. raise_on_invalid_claim (bool): Flag to raise an exception on invalid claims. raise_on_invalid_claims_type (bool): Flag to raise an exception on invalid claims type. raise_on_missing_claim (bool): Flag to raise an exception on missing claims. @@ -82,6 +82,20 @@ class ClaimValidatorMiddleware: def __post_init__(self) -> None: """ Post-initialization method to compile regular expressions for secured and skipped paths. + + This method is called after the object is initialized. It compiles the regular expressions + for the paths specified in the `secured` and `skipped` attributes, and associates them with + their corresponding HTTP methods and claims. + + Attributes: + re_flags (int): Regular expression flags, set to IGNORECASE if `re_ignorecase` is True, otherwise NOFLAG. + secured_compiled (dict): A dictionary where keys are compiled regular expressions for secured paths, + and values are dictionaries mapping HTTP methods to their corresponding claims. + skipped_compiled (dict): A dictionary where keys are compiled regular expressions for skipped paths, + and values are sets of HTTP methods in uppercase. + + Raises: + ValueError: If there is an invalid regular expression in the `secured` or `skipped` paths. """ try: self.re_flags = IGNORECASE if self.re_ignorecase else NOFLAG @@ -124,7 +138,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: method = scope["method"].upper() path = scope["path"] - claims = self.claims_callable() + claims = self.claims_callable(scope) # Check if the request path matches any skipped path patterns and if the request method is allowed for that path. # If both conditions are met, forward the request to the next middleware or application. @@ -180,7 +194,7 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: raise InvalidClaimValueException(path=path, method=method, claims=claims) except Exception as e: log.error(f"Unexpected error during claim validation: {e}") - raise + raise e await self.app(scope, receive, send) return diff --git a/asgi_claim_validator/types.py b/asgi_claim_validator/types.py index 9411910..717cb61 100644 --- a/asgi_claim_validator/types.py +++ b/asgi_claim_validator/types.py @@ -1,10 +1,11 @@ from collections.abc import Callable from joserfc.jwt import ClaimsOption, Claims from re import Pattern +from starlette.types import Scope SecuredCompiledType = dict[Pattern, dict[str, dict[str, ClaimsOption]]] SecuredType = dict[str, dict[str, dict[str, ClaimsOption]]] SkippedCompiledType = dict[Pattern, set[str]] SkippedType = dict[str, list[str]] ClaimsType = Claims -ClaimsCallableType = Callable[..., Claims] \ No newline at end of file +ClaimsCallableType = Callable[[Scope], Claims] \ No newline at end of file diff --git a/examples/connexion/complex/api/blocked.py b/examples/connexion/complex/api/blocked.py new file mode 100644 index 0000000..47b945c --- /dev/null +++ b/examples/connexion/complex/api/blocked.py @@ -0,0 +1,7 @@ +from connexion.lifecycle import ConnexionResponse + +def search() -> ConnexionResponse: + data = {"status": "blocked"} + status_code = 200 + headers = {"Content-Type": "application/json"} + return data, status_code, headers \ No newline at end of file diff --git a/examples/connexion/complex/api/secured.py b/examples/connexion/complex/api/secured.py new file mode 100644 index 0000000..e0a548d --- /dev/null +++ b/examples/connexion/complex/api/secured.py @@ -0,0 +1,12 @@ +from connexion import context +from connexion.lifecycle import ConnexionResponse + +def search() -> ConnexionResponse: + data = { + "status": "secured", + "token_info": context.context["token_info"], + "user": context.context["user"], + } + status_code = 200 + headers = {"Content-Type": "application/json"} + return data, status_code, headers \ No newline at end of file diff --git a/examples/connexion/complex/api/skipped.py b/examples/connexion/complex/api/skipped.py new file mode 100644 index 0000000..e0a548d --- /dev/null +++ b/examples/connexion/complex/api/skipped.py @@ -0,0 +1,12 @@ +from connexion import context +from connexion.lifecycle import ConnexionResponse + +def search() -> ConnexionResponse: + data = { + "status": "secured", + "token_info": context.context["token_info"], + "user": context.context["user"], + } + status_code = 200 + headers = {"Content-Type": "application/json"} + return data, status_code, headers \ No newline at end of file diff --git a/examples/connexion/complex/app.py b/examples/connexion/complex/app.py new file mode 100644 index 0000000..37352b6 --- /dev/null +++ b/examples/connexion/complex/app.py @@ -0,0 +1,80 @@ +from connexion import AsyncApp, RestyResolver +from connexion.exceptions import problem +from connexion.lifecycle import ConnexionRequest, ConnexionResponse +from connexion.middleware import MiddlewarePosition +from joserfc import jwt, jwk +from logging import getLogger +from time import time +from uvicorn import run +from asgi_claim_validator import ClaimValidatorMiddleware +from asgi_claim_validator.exceptions import ClaimValidatorException + +log = getLogger("uvicorn.error") + +JWT_ALG = "HS256" +JWT_TYP = "JWT" +JWT_TTL = 3600 +JWT_SUB = "subject" +JWT_ISS = "local.asgi_claim_validator" +JWT_SEC = "WW91IGZvdW5kIGEgZWFzdGVyIGVnZywgZW5qb3kgdGhlIGRheSE=" + +def generate_token() -> str: + timestamp = time() + header = { + "alg": JWT_ALG, + "typ": JWT_TYP, + } + claims = { + "iss": JWT_ISS, + "iat": int(timestamp), + "exp": int(timestamp + JWT_TTL), + "sub": JWT_SUB, + } + key = jwk.OctKey.import_key(JWT_SEC) + text = jwt.encode(header, claims, key) + return text + +def validate_token(token: str) -> dict: + text = token + key = jwk.OctKey.import_key(JWT_SEC) + token = jwt.decode(text, key) + return token.claims + +claims_callable = lambda scope: scope["extensions"]["connexion_context"]["token_info"] + +claim_validation_skipped = { + "^/api/1/openapi.json$": ["GET"], + "^/api/1/skipped/?$": ["GET"], + "^/api/1/ui/?.+$": ["GET"], +} +claim_validation_secured = { + "^/api/1/secured/?$": { + "GET": { + "sub" : { + "essential": True, + "allow_blank": False, + "values": [f"{JWT_SUB}"], + }, + "iss": { + "essential": True, + "allow_blank": False, + "values": [f"{JWT_ISS}"], + }, + }, + }, +} + +def claim_validator_error_handler(request: ConnexionRequest, exc: Exception) -> ConnexionResponse: + return problem(detail=exc.detail, status=exc.status, title=exc.title) + +log.info("Generated jwt token to use in the Swagger UI ...") +log.info(generate_token()) + +app = AsyncApp(__name__, specification_dir="spec") +app.add_api("openapi.yaml", resolver=RestyResolver("api")) +app.add_middleware(ClaimValidatorMiddleware, MiddlewarePosition.BEFORE_VALIDATION, secured=claim_validation_secured, skipped=claim_validation_skipped, claims_callable=claims_callable) +app.add_error_handler(ClaimValidatorException, claim_validator_error_handler) + +if __name__ == "__main__": + run(app, host='127.0.0.1', port=8000) + diff --git a/examples/connexion/complex/spec/openapi.yaml b/examples/connexion/complex/spec/openapi.yaml new file mode 100644 index 0000000..df46ea5 --- /dev/null +++ b/examples/connexion/complex/spec/openapi.yaml @@ -0,0 +1,96 @@ +openapi: "3.0.0" +info: + title: Connexion Simple Example + version: 1.0.0 +servers: + - url: /api/1/ +paths: + /blocked: + get: + summary: Blocked endpoint + tags: + - api + responses: + '200': + description: Successful response + content: + application/json: + schema: + type: object + properties: + status: + type: string + example: blocked + default: + description: unexpected error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + /secured: + get: + summary: Secured endpoint + tags: + - api + responses: + '200': + description: Successful response + content: + application/json: + schema: + type: object + properties: + status: + type: string + example: secured + default: + description: unexpected error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + security: + - jwt: ['secret'] + /skipped: + get: + summary: Skipped endpoint + tags: + - api + responses: + '200': + description: Successful response + content: + application/json: + schema: + type: object + properties: + status: + type: string + example: skipped + default: + description: unexpected error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + security: + - jwt: ['secret'] + +components: + securitySchemes: + jwt: + type: http + scheme: bearer + bearerFormat: JWT + x-bearerInfoFunc: app.validate_token + schemas: + Error: + required: + - code + - message + properties: + code: + type: integer + format: int32 + message: + type: string \ No newline at end of file diff --git a/examples/connexion/simple/app.py b/examples/connexion/simple/app.py index 6ba1c97..14851cd 100644 --- a/examples/connexion/simple/app.py +++ b/examples/connexion/simple/app.py @@ -32,7 +32,7 @@ }, } } -claim_validation_claims_callable = lambda: { +claim_validation_claims_callable = lambda _: { "sub": "admin", "iss": "https://example.com", "aud": "https://example.com", diff --git a/examples/starlette/simple/app.py b/examples/starlette/simple/app.py index 4385e56..06949cf 100644 --- a/examples/starlette/simple/app.py +++ b/examples/starlette/simple/app.py @@ -10,7 +10,7 @@ JWT_LIFETIME_SECONDS = 3600 def claims_callable() -> Callable: - return lambda: { + return lambda _: { "sub": "admin", "iss": "https://example.com", "aud": "https://example.com", diff --git a/pyproject.toml b/pyproject.toml index 6ad4fde..c726dfa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "asgi-claim-validator" -version = "1.0.0" +version = "1.0.1" description = "A focused ASGI middleware for validating additional claims within JWT tokens to enhance token-based workflows." authors = ["Fabio Greco "] maintainers = ["Fabio Greco "] diff --git a/tests/conftest.py b/tests/conftest.py index 4d21063..3e1ab5e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -11,7 +11,7 @@ @pytest.fixture def claims_callable() -> Callable: """Fixture for providing mock JWT claims callable.""" - return lambda: { + return lambda _: { "sub": "admin", "iss": "https://example.com", "aud": "https://example.com", diff --git a/tests/test_middleware.py b/tests/test_middleware.py index e55825e..4a8096e 100644 --- a/tests/test_middleware.py +++ b/tests/test_middleware.py @@ -32,7 +32,7 @@ async def test_blocked_endpoint(app: Starlette) -> None: assert response.json() == {"error": "Unauthorized"} async def test_invalid_claims_type_endpoint(app: Starlette) -> None: - app.user_middleware[0].kwargs["claims_callable"] = lambda: "not_a_dict" + app.user_middleware[0].kwargs["claims_callable"] = lambda _: "not_a_dict" transport = ASGITransport(app=app, raise_app_exceptions=False) async with AsyncClient(transport=transport, base_url="http://testserver") as client: response = await client.get("/secured") @@ -40,14 +40,14 @@ async def test_invalid_claims_type_endpoint(app: Starlette) -> None: assert response.json() == {"error": "Bad Request"} async def test_invalid_claims_type_exception(app: Starlette) -> None: - app.user_middleware[0].kwargs["claims_callable"] = lambda: "not_a_dict" + app.user_middleware[0].kwargs["claims_callable"] = lambda _: "not_a_dict" transport = ASGITransport(app=app, raise_app_exceptions=True) async with AsyncClient(transport=transport, base_url="http://testserver") as client: with pytest.raises(InvalidClaimsTypeException): await client.get("/secured") async def test_unauthenticated_endpoint(app: Starlette) -> None: - app.user_middleware[0].kwargs["claims_callable"] = lambda: {} + app.user_middleware[0].kwargs["claims_callable"] = lambda _: {} transport = ASGITransport(app=app, raise_app_exceptions=False) async with AsyncClient(transport=transport, base_url="http://testserver") as client: response = await client.get("/secured") @@ -55,7 +55,7 @@ async def test_unauthenticated_endpoint(app: Starlette) -> None: assert response.json() == {"error": "Unauthorized"} async def test_unauthenticated_exception(app: Starlette) -> None: - app.user_middleware[0].kwargs["claims_callable"] = lambda: {} + app.user_middleware[0].kwargs["claims_callable"] = lambda _: {} transport = ASGITransport(app=app, raise_app_exceptions=True) async with AsyncClient(transport=transport, base_url="http://testserver") as client: with pytest.raises(UnauthenticatedRequestException): @@ -88,7 +88,7 @@ async def test_unspecified_method_authentication_exception(app: Starlette) -> No await client.delete("/secured") async def test_missing_essential_claim_endpoint(app: Starlette) -> None: - app.user_middleware[0].kwargs["claims_callable"] = lambda: { + app.user_middleware[0].kwargs["claims_callable"] = lambda _: { "sub": "admin", } transport = ASGITransport(app=app, raise_app_exceptions=False) @@ -98,7 +98,7 @@ async def test_missing_essential_claim_endpoint(app: Starlette) -> None: assert response.json() == {"error": "Forbidden"} async def test_missing_essential_claim_exception(app: Starlette) -> None: - app.user_middleware[0].kwargs["claims_callable"] = lambda: { + app.user_middleware[0].kwargs["claims_callable"] = lambda _: { "sub": "admin", } transport = ASGITransport(app=app, raise_app_exceptions=True) @@ -107,7 +107,7 @@ async def test_missing_essential_claim_exception(app: Starlette) -> None: await client.get("/secured") async def test_invalid_claim_value_endpoint(app: Starlette) -> None: - app.user_middleware[0].kwargs["claims_callable"] = lambda: { + app.user_middleware[0].kwargs["claims_callable"] = lambda _: { "sub": "admin", "iss": "https://wrong-example.com", } @@ -118,7 +118,7 @@ async def test_invalid_claim_value_endpoint(app: Starlette) -> None: assert response.json() == {"error": "Forbidden"} async def test_invalid_claim_value_exception(app: Starlette) -> None: - app.user_middleware[0].kwargs["claims_callable"] = lambda: { + app.user_middleware[0].kwargs["claims_callable"] = lambda _: { "sub": "admin", "iss": "https://wrong-example.com", }