diff --git a/google_yubikey/__main__.py b/google_yubikey/__main__.py index 3998432..859600b 100755 --- a/google_yubikey/__main__.py +++ b/google_yubikey/__main__.py @@ -14,6 +14,7 @@ get_yubikey, gen_private_key, get_access_token, get_id_token,\ DEFAULT_LIFETIME, DEFAULT_SCOPES from google_yubikey.metadata import get_gce_metadata +from google_yubikey.util import CachedItem class ArgEnum(Enum): @@ -177,6 +178,10 @@ def parse_args(): '-m', '--prompt-management-key', action='store_true', help='Prompt for management key', ) + parser_serve.add_argument( + '-c', '--cache-lifetime', type=int, default=CachedItem.DEFAULT_LIFETIME_SEC, + help='Token/PIN cache lifetime, in seconds', + ) parser_serve.add_argument( '-v', '--verbosity', type=Verbosity.from_str(Verbosity), choices=list(Verbosity), help='Prompt for management key', default=Verbosity.INFO, @@ -210,7 +215,8 @@ def main(): elif args.action == str(Action.SERVE): get_gce_metadata( args.slot, args.prompt_management_key, args.numeric_project_id, - args.service_account_email, args.token_lifetime, args.verbosity.name, + args.service_account_email, args.token_lifetime, args.cache_lifetime, + args.verbosity.name, ).run() diff --git a/google_yubikey/device.py b/google_yubikey/device.py index 243e22a..7685d6c 100644 --- a/google_yubikey/device.py +++ b/google_yubikey/device.py @@ -41,7 +41,8 @@ def get_yubikey(): _CACHED_MGMT_KEY = CachedItem() -def authenticate(yubikey: YubiKey, prompt_management_key: bool, stream=sys.stderr): +def authenticate(yubikey: YubiKey, prompt_management_key: bool, + cache_lifetime=CachedItem.DEFAULT_LIFETIME_SEC, stream=sys.stderr): """ Authenticates user to the YubiKey """ global _CACHED_PIN, _CACHED_MGMT_KEY # pylint: disable=global-statement @@ -50,13 +51,13 @@ def authenticate(yubikey: YubiKey, prompt_management_key: bool, stream=sys.stder pin = _CACHED_PIN.value if _CACHED_PIN.expired(): pin = getpass('Enter PIN: ', stream) - _CACHED_PIN = CachedItem(None, pin) + _CACHED_PIN = CachedItem(None, pin, cache_lifetime) yubikey.verify(pin, touch_callback=prompt_for_touch) mgmt_key = _CACHED_MGMT_KEY.value if prompt_management_key and _CACHED_MGMT_KEY.expired(): mgmt_key = getpass('Enter management key: ', stream) - _CACHED_MGMT_KEY = CachedItem(None, mgmt_key) + _CACHED_MGMT_KEY = CachedItem(None, mgmt_key, cache_lifetime) else: mgmt_key = DEFAULT_MANAGEMENT_KEY yubikey.authenticate(mgmt_key, touch_callback=prompt_for_touch) @@ -102,9 +103,9 @@ def _json_b64encode(obj: dict): def _get_jwt(yubikey: YubiKey, slot: int, prompt_management_key: bool, service_account_email: str, audience: str, scopes: List[str], - token_lifetime: int, stream=sys.stderr): + token_lifetime: int, cache_lifetime: int, stream=sys.stderr): """ Generates a general-purpose Google JWT with a YubiKey """ - authenticate(yubikey, prompt_management_key, stream) + authenticate(yubikey, prompt_management_key, cache_lifetime, stream) iat = time() header = { @@ -128,25 +129,26 @@ def _get_jwt(yubikey: YubiKey, slot: int, prompt_management_key: bool, def get_id_token(yubikey: YubiKey, slot: int, prompt_management_key: bool, - service_account_email: str, audience: str, - token_lifetime: int, stream=sys.stderr): + service_account_email: str, audience: str, token_lifetime: int, + cache_lifetime=CachedItem.DEFAULT_LIFETIME_SEC, stream=sys.stderr): """ Generates a Google ID token with a YubiKey """ if not audience: raise ValueError('ID tokens must use a non-empty audience') return _get_jwt( yubikey, slot, prompt_management_key, - service_account_email, audience, [], token_lifetime, stream, + service_account_email, audience, [], + token_lifetime, cache_lifetime, stream, ) def get_access_token(yubikey: YubiKey, slot: int, prompt_management_key: bool, - service_account_email: str, scopes: List[str], - token_lifetime: int, stream=sys.stderr): + service_account_email: str, scopes: List[str], token_lifetime: int, + cache_lifetime=CachedItem.DEFAULT_LIFETIME_SEC, stream=sys.stderr): """ Generates a Google Access token with a YubiKey """ assertion = _get_jwt( yubikey, slot, prompt_management_key, service_account_email, _GOOGLE_OAUTH2_TOKEN_ENDPOINT, scopes, - token_lifetime, stream, + token_lifetime, cache_lifetime, stream, ) response = requests.post( url=_GOOGLE_OAUTH2_TOKEN_ENDPOINT, diff --git a/google_yubikey/metadata.py b/google_yubikey/metadata.py index 5fc8b2e..60956b8 100644 --- a/google_yubikey/metadata.py +++ b/google_yubikey/metadata.py @@ -38,12 +38,13 @@ class GCEMetadata: def __init__(self, slot: SLOT, prompt_management_key: bool, numeric_project_id: int, service_account_email: str, - token_lifetime: int, verbosity: str): + token_lifetime: int, cache_lifetime: int, verbosity: str): self.slot = slot self.prompt_management_key = prompt_management_key self.numeric_project_id = numeric_project_id self.service_account_email = service_account_email self.token_lifetime = token_lifetime + self.cache_lifetime = cache_lifetime self.verbosity = verbosity self.add_ip() @@ -96,6 +97,7 @@ def run(self): '--set', f'numeric_project_id={self.numeric_project_id}', '--set', f'service_account_email={self.service_account_email}', '--set', f'token_lifetime={self.token_lifetime}', + '--set', f'cache_lifetime={self.cache_lifetime}', '--set', f'verbosity={self.verbosity}', '--processes', '1', '--honour-stdin', @@ -143,7 +145,8 @@ def delete_ip(self): def get_gce_metadata(slot: SLOT, prompt_management_key: bool, numeric_project_id: int, service_account_email: str, - token_lifetime: int, verbosity: str) -> GCEMetadata: + token_lifetime: int, cache_lifetime: int, + verbosity: str) -> GCEMetadata: """ Returns GCEMetadata instance for your OS """ os_name = platform.system() metadata_type: Type[GCEMetadata] @@ -155,7 +158,7 @@ def get_gce_metadata(slot: SLOT, prompt_management_key: bool, raise NotImplementedError('Sorry, your OS is not supported yet') return metadata_type( slot, prompt_management_key, numeric_project_id, - service_account_email, token_lifetime, verbosity, + service_account_email, token_lifetime, cache_lifetime, verbosity, ) @@ -174,6 +177,7 @@ def __init__(self): self.project_id = self.service_account_email \ .split('@')[1].split('.')[0] self.token_lifetime = int(self._get('token_lifetime')) + self.cache_lifetime = int(self._get('cache_lifetime')) self.verbosity = self._get('verbosity') def _get(self, name: str): @@ -296,10 +300,10 @@ def _get_id_token(): return _CACHED_IDENTITY.value response = get_id_token( yubikey, opts.slot, opts.prompt_management_key, - opts.service_account_email, audience, opts.token_lifetime, - stream=sys.stdout, + opts.service_account_email, audience, + opts.token_lifetime, opts.cache_lifetime, sys.stdout, ) - _CACHED_IDENTITY = CachedItem(audience, response) + _CACHED_IDENTITY = CachedItem(audience, response, opts.cache_lifetime) return response @app.route(_SA_ROOT + f'/{opts.service_account_email}/scopes') @@ -322,10 +326,10 @@ def _get_access_token(): response = get_access_token( yubikey, opts.slot, opts.prompt_management_key, - opts.service_account_email, scopes, opts.token_lifetime, - stream=sys.stdout, + opts.service_account_email, scopes, + opts.token_lifetime, opts.cache_lifetime, sys.stdout, ) - _CACHED_TOKEN = CachedItem(scopes, response) + _CACHED_TOKEN = CachedItem(scopes, response, opts.cache_lifetime) return response return app diff --git a/google_yubikey/util.py b/google_yubikey/util.py index fedff62..724c1c1 100644 --- a/google_yubikey/util.py +++ b/google_yubikey/util.py @@ -9,12 +9,14 @@ class CachedItem(): """ Holds a cached item """ + DEFAULT_LIFETIME_SEC = 10 + def __init__(self, key: Any = None, value: Any = None, - timeout=timedelta(seconds=10)): + timeout_sec=DEFAULT_LIFETIME_SEC): self.key = key self.value = value self._time = datetime.now() - self._timeout = timeout + self._timeout = timedelta(seconds=timeout_sec) def expired(self, key: Any = None): """ Determines if the token needs to be refreshed """