Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions firebase_admin/app_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@
from jwt import PyJWKClient, ExpiredSignatureError, InvalidTokenError, DecodeError
from jwt import InvalidAudienceError, InvalidIssuerError, InvalidSignatureError
from firebase_admin import _utils
from firebase_admin import _http_client

_APP_CHECK_ATTRIBUTE = '_app_check'

def _get_app_check_service(app) -> Any:
return _utils.get_app_service(app, _APP_CHECK_ATTRIBUTE, _AppCheckService)

def verify_token(token: str, app=None) -> Dict[str, Any]:
def verify_token(token: str, app=None, consume: bool = False) -> Dict[str, Any]:
"""Verifies a Firebase App Check token.

Args:
token: A token from App Check.
app: An App instance (optional).
consume: A boolean indicating whether to consume the token (optional).

Returns:
Dict[str, Any]: The token's decoded claims.
Expand All @@ -40,7 +42,7 @@ def verify_token(token: str, app=None) -> Dict[str, Any]:
or if the token's headers or payload are invalid.
PyJWKClientError: If PyJWKClient fails to fetch a valid signing key.
"""
return _get_app_check_service(app).verify_token(token)
return _get_app_check_service(app).verify_token(token, consume)

class _AppCheckService:
"""Service class that implements Firebase App Check functionality."""
Expand All @@ -50,6 +52,7 @@ class _AppCheckService:
_project_id = None
_scoped_project_id = None
_jwks_client = None
_http_client = None

_APP_CHECK_HEADERS = {
'x-goog-api-client': _utils.get_metrics_header(),
Expand All @@ -68,9 +71,12 @@ def __init__(self, app):
# Default lifespan is 300 seconds (5 minutes) so we change it to 21600 seconds (6 hours).
self._jwks_client = PyJWKClient(
self._JWKS_URL, lifespan=21600, headers=self._APP_CHECK_HEADERS)
self._http_client = _http_client.JsonHttpClient(
credential=app.credential,
base_url='https://firebaseappcheck.googleapis.com/v1beta')


def verify_token(self, token: str) -> Dict[str, Any]:
def verify_token(self, token: str, consume: bool = False) -> Dict[str, Any]:
"""Verifies a Firebase App Check token."""
_Validators.check_string("app check token", token)

Expand All @@ -87,8 +93,18 @@ def verify_token(self, token: str) -> Dict[str, Any]:
) from exception

verified_claims['app_id'] = verified_claims.get('sub')
if consume:
already_consumed = self._verify_replay_protection(token)
verified_claims['already_consumed'] = already_consumed
return verified_claims

def _verify_replay_protection(self, token: str) -> bool:
"""Verifies the token's consumption status."""
path = f'/{self._scoped_project_id}:verifyAppCheckToken'
body = {'app_check_token': token}
response = self._http_client.body('post', path, json=body)
return response.get('alreadyConsumed', False)

def _has_valid_token_headers(self, headers: Any) -> None:
"""Checks whether the token has valid headers for App Check."""
# Ensure the token's header has type JWT
Expand Down
22 changes: 22 additions & 0 deletions tests/test_app_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,28 @@ def test_verify_token(self, mocker):
expected['app_id'] = APP_ID
assert payload == expected

def test_verify_token_with_consume(self, mocker):
mocker.patch("jwt.decode", return_value=JWT_PAYLOAD_SAMPLE)
mocker.patch("jwt.PyJWKClient.get_signing_key_from_jwt", return_value=PyJWK(signing_key))
mocker.patch("jwt.get_unverified_header", return_value=JWT_PAYLOAD_SAMPLE.get("headers"))
mock_http_client = mocker.patch("firebase_admin._http_client.JsonHttpClient")
mock_http_client.return_value.body.return_value = {'alreadyConsumed': True}

# Use a fresh app to ensure _AppCheckService is re-initialized with the mock
cred = testutils.MockCredential()
app = firebase_admin.initialize_app(cred, {'projectId': PROJECT_ID}, name='test_consume')

try:
payload = app_check.verify_token("encoded", app, consume=True)
expected = JWT_PAYLOAD_SAMPLE.copy()
expected['app_id'] = APP_ID
expected['already_consumed'] = True
assert payload == expected
mock_http_client.return_value.body.assert_called_once_with(
'post', f'/{SCOPED_PROJECT_ID}:verifyAppCheckToken', json={'app_check_token': 'encoded'})
finally:
firebase_admin.delete_app(app)

def test_verify_token_with_non_list_audience_raises_error(self, mocker):
jwt_with_non_list_audience = JWT_PAYLOAD_SAMPLE.copy()
jwt_with_non_list_audience["aud"] = '1234'
Expand Down
Loading