Skip to content

Commit

Permalink
move account id resolution to separate class
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlm committed Oct 27, 2023
1 parent 49e071f commit dee1e9f
Showing 1 changed file with 59 additions and 48 deletions.
107 changes: 59 additions & 48 deletions botocore/regions.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ class EndpointResolverBuiltins(str, Enum):
AWS_ACCOUNT_ID = "AWS::Auth::AccountId"


class EndpointRulesetResolver:
"""Resolves endpoints using a service's endpoint ruleset"""
class CredentialBuiltinResolver:
"""Resolves endpoint builtins sourced from credentials."""

DEFAULT_ACCOUNT_ID_ENDPOINT_MODE = 'preferred'
# Allowed values for the ``account_id_endpoint_mode`` config field.
Expand All @@ -467,6 +467,55 @@ class EndpointRulesetResolver:
'required',
)

def __init__(self, credentials, account_id_endpoint_mode):
self._credentials = credentials
if account_id_endpoint_mode is None:
account_id_endpoint_mode = self.DEFAULT_ACCOUNT_ID_ENDPOINT_MODE
self._account_id_endpoint_mode = account_id_endpoint_mode

def resolve_account_id_builtin(self, builtins, signing_enabled):
"""Resolve the ``AWS::Auth::AccountId`` builtin."""
self._validate_account_id_endpoint_mode()
acct_id_builtin_key = EndpointResolverBuiltins.AWS_ACCOUNT_ID
if not self._should_resolve_account_id_builtin(signing_enabled):
# Unset the account ID if endpoint mode is disabled.
builtins[acct_id_builtin_key] = None

elif builtins.get(acct_id_builtin_key) is None:
self._do_resolve_account_id_builtin(builtins)

def _validate_account_id_endpoint_mode(self):
valid_modes = self.VALID_ACCOUNT_ID_ENDPOINT_MODES
if self._account_id_endpoint_mode not in valid_modes:
error_msg = (
f'Invalid value "{self._account_id_endpoint_mode}" '
'for account_id_endpoint_mode. Valid values are: '
f'{", ".join(valid_modes)}.'
)
raise InvalidConfigError(error_msg=error_msg)

def _should_resolve_account_id_builtin(self, signing_enabled):
creds_available = self._credentials is not None
mode_enabled = self._account_id_endpoint_mode != 'disabled'
return signing_enabled and creds_available and mode_enabled

def _do_resolve_account_id_builtin(self, builtins):
frozen_creds = self._credentials.get_frozen_credentials()
account_id = frozen_creds.account_id
if account_id is None:
acct_id_ep_mode = self._account_id_endpoint_mode
msg = f'"account_id_endpoint_mode" is set to {acct_id_ep_mode}'
if acct_id_ep_mode == 'preferred':
LOG.debug('%s, but account ID was not found.', msg)
elif acct_id_ep_mode == 'required':
raise AccountIdNotFound(msg=msg)
else:
builtins[EndpointResolverBuiltins.AWS_ACCOUNT_ID] = account_id


class EndpointRulesetResolver:
"""Resolves endpoints using a service's endpoint ruleset"""

def __init__(
self,
endpoint_ruleset_data,
Expand All @@ -492,10 +541,9 @@ def __init__(
self._use_ssl = use_ssl
self._requested_auth_scheme = requested_auth_scheme
self._instance_cache = {}
self._credentials = credentials
if account_id_endpoint_mode is None:
account_id_endpoint_mode = self.DEFAULT_ACCOUNT_ID_ENDPOINT_MODE
self._account_id_endpoint_mode = account_id_endpoint_mode
self._credential_builtin_resolver = CredentialBuiltinResolver(
credentials, account_id_endpoint_mode
)

def construct_endpoint(
self,
Expand Down Expand Up @@ -564,7 +612,7 @@ def _get_provider_params(
customized_builtins = self._get_customized_builtins(
operation_model, call_args, request_context
)
self._resolve_account_id_builtin(customized_builtins)
self._resolve_credential_builtins(customized_builtins)
for param_name, param_def in self._param_definitions.items():
param_val = self._resolve_param_from_context(
param_name=param_name,
Expand All @@ -581,48 +629,11 @@ def _get_provider_params(

return provider_params

def _resolve_account_id_builtin(self, builtins):
"""Resolve the ``AWS::Auth::AccountId`` builtin."""
if 'AccountId' not in self._param_definitions:
return

self._validate_account_id_endpoint_mode()
acct_id_builtin_key = EndpointResolverBuiltins.AWS_ACCOUNT_ID
if not self._should_resolve_account_id_builtin():
# Unset the account ID if endpoint mode is disabled.
builtins[acct_id_builtin_key] = None

elif builtins.get(acct_id_builtin_key) is None:
self._do_resolve_account_id_builtin(builtins)

def _validate_account_id_endpoint_mode(self):
valid_modes = self.VALID_ACCOUNT_ID_ENDPOINT_MODES
if self._account_id_endpoint_mode not in valid_modes:
error_msg = (
f'Invalid value "{self._account_id_endpoint_mode}" '
'for account_id_endpoint_mode. Valid values are: '
f'{", ".join(valid_modes)}.'
)
raise InvalidConfigError(error_msg=error_msg)

def _should_resolve_account_id_builtin(self):
def _resolve_credential_builtins(self, builtins):
resolver = self._credential_builtin_resolver
signing_enabled = self._requested_auth_scheme != UNSIGNED
creds_available = self._credentials is not None
mode_enabled = self._account_id_endpoint_mode != 'disabled'
return signing_enabled and creds_available and mode_enabled

def _do_resolve_account_id_builtin(self, builtins):
frozen_creds = self._credentials.get_frozen_credentials()
account_id = frozen_creds.account_id
if account_id is None:
acct_id_ep_mode = self._account_id_endpoint_mode
msg = f'"account_id_endpoint_mode" is set to {acct_id_ep_mode}'
if acct_id_ep_mode == 'preferred':
LOG.debug('%s, but account ID was not found.', msg)
elif acct_id_ep_mode == 'required':
raise AccountIdNotFound(msg=msg)
else:
builtins[EndpointResolverBuiltins.AWS_ACCOUNT_ID] = account_id
if 'AccountId' in self._param_definitions:
resolver.resolve_account_id_builtin(builtins, signing_enabled)

def _resolve_param_from_context(
self, param_name, operation_model, call_args
Expand Down

0 comments on commit dee1e9f

Please sign in to comment.