Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(auth): support service account impersonation from gcloud json file #831

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 50 additions & 10 deletions auth/gcloud/aio/auth/token.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class Type(enum.Enum):
AUTHORIZED_USER = 'authorized_user'
GCE_METADATA = 'gce_metadata'
SERVICE_ACCOUNT = 'service_account'
IMPERSONATED_SERVICE_ACCOUNT = 'impersonated_service_account'


def get_service_data(
Expand Down Expand Up @@ -264,14 +265,29 @@ def __init__(
) -> None:
super().__init__(service_file=service_file, session=session)

self.scopes = ' '.join(scopes or [])
if (self.token_type == Type.SERVICE_ACCOUNT
or target_principal) and not self.scopes:
if scopes is not None:
self.scopes = ' '.join(scopes or [])
elif self.service_data is not None:
if self.token_type == Type.IMPERSONATED_SERVICE_ACCOUNT:
# If service file was provided and the type is
# IMPERSONATED_SERVICE_ACCOUNT, gcloud requires this default
# scope but does not write it to the file
self.scopes = 'https://www.googleapis.com/auth/cloud-platform'
if target_principal:
self.impersonation_uri = GCLOUD_ENDPOINT_GENERATE_ACCESS_TOKEN.format(
service_account=target_principal
)
elif (
self.service_data
and 'service_account_impersonation_url' in self.service_data
):
self.impersonation_uri = self.service_data[
'service_account_impersonation_url'
]
if self.impersonation_uri and not self.scopes:
raise Exception(
'scopes must be provided when token type is '
'service account or using target_principal',
'scopes must be provided when token type requires impersonation',
)
self.target_principal = target_principal
self.delegates = delegates

async def _refresh_authorized_user(self, timeout: int) -> TokenResponse:
Expand All @@ -290,6 +306,23 @@ async def _refresh_authorized_user(self, timeout: int) -> TokenResponse:
return TokenResponse(value=str(content['access_token']),
expires_in=int(content['expires_in']))

async def _refresh_source_authorized_user(self, timeout: int) -> TokenResponse:
source_credentials = self.service_data['source_credentials']
payload = urlencode({
'grant_type': 'refresh_token',
'client_id': source_credentials['client_id'],
'client_secret': source_credentials['client_secret'],
'refresh_token': source_credentials['refresh_token'],
})

resp = await self.session.post(
url=self.token_uri, data=payload, headers=REFRESH_HEADERS,
timeout=timeout,
)
content = await resp.json()
return TokenResponse(value=str(content['access_token']),
expires_in=int(content['expires_in']))

async def _refresh_gce_metadata(self, timeout: int) -> TokenResponse:
resp = await self.session.get(
url=self.token_uri, headers=GCE_METADATA_HEADERS, timeout=timeout,
Expand Down Expand Up @@ -340,9 +373,9 @@ async def _impersonate(self, token: TokenResponse,
})

resp = await self.session.post(
GCLOUD_ENDPOINT_GENERATE_ACCESS_TOKEN.format(
service_account=self.target_principal),
data=payload, headers=headers, timeout=timeout)
self.impersonation_uri, data=payload, headers=headers,
timeout=timeout,
)

data = await resp.json()
token.value = str(data['accessToken'])
Expand All @@ -355,10 +388,13 @@ async def refresh(self, *, timeout: int) -> TokenResponse:
resp = await self._refresh_gce_metadata(timeout=timeout)
elif self.token_type == Type.SERVICE_ACCOUNT:
resp = await self._refresh_service_account(timeout=timeout)
elif self.token_type == Type.IMPERSONATED_SERVICE_ACCOUNT:
# impersonation requires a source authorized user
resp = await self._refresh_source_authorized_user(timeout=timeout)
else:
raise Exception(f'unsupported token type {self.token_type}')

if self.target_principal:
if self.impersonation_uri:
resp = await self._impersonate(resp, timeout=timeout)

return resp
Expand Down Expand Up @@ -522,6 +558,10 @@ async def refresh(self, *, timeout: int) -> TokenResponse:
elif self.token_type == Type.SERVICE_ACCOUNT:
resp = await self._refresh_service_account(
iap_client_id, timeout)
elif self.token_type == Type.IMPERSONATED_SERVICE_ACCOUNT:
raise Exception(
'impersonation is not supported for IAP tokens',
)
else:
raise Exception(f'unsupported token type {self.token_type}')

Expand Down