Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail on invalid token when checking multiple OR security schemes #1778

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 7 additions & 47 deletions connexion/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,24 +558,15 @@ async def wrapper(request):
@classmethod
def verify_security(cls, auth_funcs):
async def verify_fn(request):
token_info = NO_VALUE
errors = []
for func in auth_funcs:
try:
token_info = func(request)
while asyncio.iscoroutine(token_info):
token_info = await token_info
if token_info is not NO_VALUE:
break
except Exception as err:
errors.append(err)

token_info = func(request)
while asyncio.iscoroutine(token_info):
token_info = await token_info
if token_info is not NO_VALUE:
break
else:
if errors != []:
cls._raise_most_specific(errors)
else:
logger.info("... No auth provided. Aborting with 401.")
raise OAuthProblem(detail="No authorization token provided")
logger.info("... No auth provided. Aborting with 401.")
raise OAuthProblem(detail="No authorization token provided")

request.context.update(
{
Expand All @@ -586,34 +577,3 @@ async def verify_fn(request):
)

return verify_fn

@staticmethod
def _raise_most_specific(exceptions: t.List[Exception]) -> None:
"""Raises the most specific error from a list of exceptions by status code.

The status codes are expected to be either in the `code`
or in the `status` attribute of the exceptions.

The order is as follows:
- 403: valid credentials but not enough privileges
- 401: no or invalid credentials
- for other status codes, the smallest one is selected

:param errors: List of exceptions.
:type errors: t.List[Exception]
"""
if not exceptions:
return
# We only use status code attributes from exceptions
# We use 600 as default because 599 is highest valid status code
status_to_exc = {
getattr(exc, "status_code", getattr(exc, "status", 600)): exc
for exc in exceptions
}
if 403 in status_to_exc:
raise status_to_exc[403]
elif 401 in status_to_exc:
raise status_to_exc[401]
else:
lowest_status_code = min(status_to_exc)
raise status_to_exc[lowest_status_code]
23 changes: 20 additions & 3 deletions docs/security.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,30 @@ Multiple Authentication Schemes
-------------------------------

With Connexion, it is also possible to combine multiple authentication schemes
as described in the `OpenAPI specification`_. When multiple authentication
schemes are combined using logical AND, the ``token_info`` argument will
consist of a dictionary mapping the names of the security scheme to their
as described in the `OpenAPI specification`_.

AND
```

When multiple authentication schemes are combined using logical AND, the ``token_info`` argument
will consist of a dictionary mapping the names of the security scheme to their
corresponding ``token_info``.

Multiple OAuth2 security schemes in AND fashion are not supported.

OR
``

When multiple authentication schemes are combined using logical OR, the schemes are checked in
the order they are listed in the OpenAPI spec. For each security scheme the following logic is
applied:

- If a token matching the security schem is found in the request and is valid, allow the request
to pass
- If a token matching the security scheme is found in the request and is invalid, return an error
- If no token matching the security scheme is found in the request, move on to the next scheme,
or return a 401 error if this was the last scheme

.. _OpenAPI specification: https://swagger.io/docs/specification/authentication/#multiple

Custom security handlers
Expand Down
4 changes: 2 additions & 2 deletions tests/api/test_secure_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ def test_security(oauth_requests, secure_endpoint_app):
assert response.text == '"Authenticated"\n'
headers = {"X-AUTH": "wrong-key"}
response = app_client.get("/v1.0/optional-auth", headers=headers)
assert response.text == '"Unauthenticated"\n'
assert response.status_code == 200
assert response.json()["title"] == "Unauthorized"
assert response.status_code == 401

# security function throws exception
response = app_client.get("/v1.0/auth-exception", headers={"X-Api-Key": "foo"})
Expand Down
29 changes: 0 additions & 29 deletions tests/decorators/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,32 +299,3 @@ async def test_verify_security_oauthproblem():

assert exc_info.value.status_code == 401
assert exc_info.value.detail == "No authorization token provided"


@pytest.mark.parametrize(
"errors, most_specific",
[
([OAuthProblem()], OAuthProblem),
([OAuthProblem(), OAuthScopeProblem([], [])], OAuthScopeProblem),
(
[OAuthProblem(), OAuthScopeProblem([], []), BadRequestProblem],
OAuthScopeProblem,
),
(
[
OAuthProblem(),
OAuthScopeProblem([], []),
BadRequestProblem,
ConnexionException,
],
OAuthScopeProblem,
),
([BadRequestProblem(), ConnexionException()], BadRequestProblem),
([ConnexionException()], ConnexionException),
],
)
def test_raise_most_specific(errors, most_specific):
"""Tests whether most specific exception is raised from a list."""
security_handler_factory = SecurityHandlerFactory()
with pytest.raises(most_specific):
security_handler_factory._raise_most_specific(errors)
5 changes: 2 additions & 3 deletions tests/fakeapi/hello/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,8 @@ def more_than_one_scope_defined(**kwargs):
return "OK"


def optional_auth(**kwargs):
key = apikey_info(request.headers.get("X-AUTH"))
if key is None:
def optional_auth(context_):
if not context_.get("token_info"):
return "Unauthenticated"
else:
return "Authenticated"
Expand Down
Loading