Skip to content

Commit

Permalink
Add a test for ValidatingTokenStorage on refresh
Browse files Browse the repository at this point in the history
A dedicated test case stores token data twice, using a refresh token
response for the second store. It verifies that under such a usage,
the original identity information is preserved.
  • Loading branch information
sirosen committed Sep 19, 2024
1 parent 468e6b1 commit 0beda61
Showing 1 changed file with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@
import pytest

import globus_sdk
from globus_sdk import MISSING, MissingType, OAuthTokenResponse, Scope
from globus_sdk import (
MISSING,
MissingType,
OAuthRefreshTokenResponse,
OAuthTokenResponse,
Scope,
)
from globus_sdk.experimental.globus_app import ValidatingTokenStorage
from globus_sdk.experimental.globus_app.errors import (
IdentityMismatchError,
Expand Down Expand Up @@ -117,6 +123,33 @@ def test_validating_token_storage_loads_identity_info_from_storage(
assert new_adapter.identity_id == identity_id


def test_validating_token_storage_stores_with_saved_identity_id_on_refresh_tokens(
make_token_response,
):
# Create an in memory storage adapter
storage = MemoryTokenStorage()
adapter = ValidatingTokenStorage(storage, {})

# Store an identifiable token response
identity_id = str(uuid.uuid4())
token_response = make_token_response(identity_id=identity_id)
adapter.store_token_response(token_response)

# now get and store a replacement token response, identified with a different user
# however, in this case make it a refresh token response
other_identity_id = str(uuid.uuid4())
refresh_token_response = make_token_response(
response_class=OAuthRefreshTokenResponse, identity_id=other_identity_id
)
adapter.store_token_response(refresh_token_response)

# read back the data, and verify that it contains tokens from the refresh, but the
# original identity_id
result = adapter.get_token_data("auth.globus.org")
assert result.access_token == refresh_token_response["access_token"]
assert result.identity_id == identity_id


def test_validating_token_storage_raises_error_when_no_token_data():
adapter = ValidatingTokenStorage(MemoryTokenStorage(), {})

Expand All @@ -129,6 +162,7 @@ def make_token_response(make_response):
def _make_token_response(
scopes: dict[str, str] | None = None,
identity_id: str | None | MissingType = MISSING,
response_class: type[OAuthTokenResponse] = OAuthTokenResponse,
):
"""
:param scopes: A dictionary of resource server to scope mappings to fill in
Expand Down Expand Up @@ -169,7 +203,7 @@ def _make_token_response(
# be a real JWT ID token.
data["id_token"] = _make_id_token()

response = make_response(response_class=OAuthTokenResponse, json_body=data)
response = make_response(response_class=response_class, json_body=data)

if identity_id is not None:
decoded_id_token = _decoded_id_token(identity_id)
Expand Down

0 comments on commit 0beda61

Please sign in to comment.