diff --git a/requirements.txt b/requirements.txt index e26d33e..c8e8b77 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,4 @@ clickclick>=1.0 -keyring -keyrings.alt -oauth2client>=4.0.0 PyYAML requests stups-tokens diff --git a/tests/test_api.py b/tests/test_api.py index 159bb50..34b3679 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -15,14 +15,14 @@ def test_is_valid(): assert not zign.api.is_valid({'creation_time': now - 3480, 'expires_in': 3600}) -def test_get_new_token_auth_fail(monkeypatch): +def test_get_named_token_deprecated(monkeypatch): + logger = MagicMock() response = MagicMock(status_code=401) - monkeypatch.setattr('requests.get', MagicMock(return_value=response)) - monkeypatch.setattr('stups_cli.config.store_config', lambda x, y: None) - with pytest.raises(zign.api.AuthenticationFailed) as excinfo: - zign.api.get_named_token('myrealm', ['myscope'], 'myuser', 'mypass', 'http://example.org') - - assert 'Authentication failed: Token Service' in str(excinfo) + monkeypatch.setattr('zign.api.get_token', lambda x, y: 'mytok701') + monkeypatch.setattr('zign.api.logger', logger) + token = zign.api.get_named_token('myrealm', ['myscope'], 'myuser', 'mypass', 'http://example.org') + assert 'mytok701' == token['access_token'] + logger.warning.assert_called_with('"get_named_token" is deprecated, please use "zign.api.get_token" instead') def test_get_new_token_server_error(monkeypatch): @@ -55,17 +55,6 @@ def test_get_token_existing(monkeypatch): assert zign.api.get_token('mytok', ['myscope']) == 'tt77' -def test_get_token_configuration_error(monkeypatch): - def get_token(name): - raise tokens.ConfigurationError('TEST') - - monkeypatch.setattr('tokens.get', get_token) - monkeypatch.setattr('stups_cli.config.load_config', lambda x: {}) - - with pytest.raises(zign.api.ConfigurationError): - zign.api.get_token('mytok', ['myscope']) - - def test_get_token_service_success(monkeypatch): monkeypatch.setattr('tokens.get', lambda x: 'svc123') @@ -77,9 +66,7 @@ def get_token(name): raise tokens.ConfigurationError('TEST') monkeypatch.setattr('tokens.get', get_token) - monkeypatch.setattr('stups_cli.config.load_config', lambda x: {'url': 'http://localhost'}) - monkeypatch.setattr('os.getenv', lambda x: 'mypass') - monkeypatch.setattr('zign.api.get_new_token', lambda *args, **kwargs: {'access_token': 'tt77'}) + monkeypatch.setattr('zign.api.get_token_implicit_flow', lambda *args, **kwargs: {'access_token': 'tt77'}) assert zign.api.get_token('mytok', ['myscope']) == 'tt77' @@ -105,3 +92,45 @@ def test_backwards_compatible_get_config(monkeypatch): monkeypatch.setattr('stups_cli.config.load_config', load_config) assert {'url': 'http://localhost'} == zign.api.get_config() load_config.assert_called_with('zign') + + +def test_get_config(monkeypatch): + load_config = MagicMock() + load_config.return_value = {} + store_config = MagicMock() + def prompt(message, **kwargs): + # just return the prompt text for easy assertion + return message + monkeypatch.setattr('stups_cli.config.load_config', load_config) + monkeypatch.setattr('stups_cli.config.store_config', store_config) + monkeypatch.setattr('click.prompt', prompt) + monkeypatch.setattr('requests.get', lambda x, timeout: None) + config = zign.api.get_config(zign.config.CONFIG_NAME) + expected_config = { + 'authorize_url': 'Please enter the OAuth 2 Authorization Endpoint URL', + 'business_partner_id': 'Please enter the Business Partner ID', + 'client_id': 'Please enter the OAuth 2 Client ID', + 'token_url': 'Please enter the OAuth 2 Token Endpoint URL' + } + assert config == expected_config + + + +def test_token_implicit_flow(monkeypatch): + + access_token = 'myacctok' + + def webbrowser_open(url, **kwargs): + assert url == 'https://localhost/authorize?business_partner_id=123&client_id=foobar&redirect_uri=http://localhost:8081&response_type=token' + + server = MagicMock() + server.return_value.query_params = {'access_token': access_token, 'refresh_token': 'foo', 'expires_in': 3600, 'token_type': 'Bearer'} + + load_config = MagicMock() + load_config.return_value = {'authorize_url': 'https://localhost/authorize', 'token_url': 'https://localhost/token', 'client_id': 'foobar', 'business_partner_id': '123'} + monkeypatch.setattr('stups_cli.config.load_config', load_config) + monkeypatch.setattr('zign.api.load_config_ztoken', lambda x: {}) + monkeypatch.setattr('webbrowser.open', webbrowser_open) + monkeypatch.setattr('zign.api.ClientRedirectServer', server) + token = zign.api.get_token_implicit_flow('test_token_implicit_flow') + assert access_token == token['access_token'] diff --git a/tests/test_cli_zign.py b/tests/test_cli_zign.py index 36c84da..97edbc4 100644 --- a/tests/test_cli_zign.py +++ b/tests/test_cli_zign.py @@ -7,19 +7,12 @@ def test_create_list_delete(monkeypatch): token = 'abc-123' - response = MagicMock() - response.status_code = 200 - response.json.return_value = {'access_token': token} - - monkeypatch.setattr('keyring.set_password', MagicMock()) - monkeypatch.setattr('requests.get', MagicMock(return_value=response)) - monkeypatch.setattr('stups_cli.config.store_config', MagicMock()) + monkeypatch.setattr('zign.api.perform_implicit_flow', lambda a: {'access_token': token, 'expires_in': 1, 'token_type': 'test'}) runner = CliRunner() with runner.isolated_filesystem(): - result = runner.invoke(cli_zign, ['token', '-n', 'mytok', '--password', 'mypass'], catch_exceptions=False, - input='localhost\n') + result = runner.invoke(cli_zign, ['token', '-n', 'mytok', '--password', 'mypass'], catch_exceptions=False) assert token == result.output.rstrip().split('\n')[-1] @@ -35,96 +28,3 @@ def test_create_list_delete(monkeypatch): # should work again for already deleted tokens result = runner.invoke(cli_zign, ['delete', 'mytok'], catch_exceptions=False) - - -def test_empty_config(monkeypatch): - token = 'abc-123' - - response = MagicMock() - response.status_code = 200 - response.json.return_value = {'access_token': token} - - monkeypatch.setattr('keyring.set_password', MagicMock()) - monkeypatch.setattr('stups_cli.config.load_config', lambda x: {}) - monkeypatch.setattr('stups_cli.config.store_config', lambda x, y: None) - monkeypatch.setattr('requests.get', MagicMock(return_value=response)) - - runner = CliRunner() - - with runner.isolated_filesystem(): - result = runner.invoke(cli_zign, ['token', '-n', 'mytok', '--password', 'mypass'], catch_exceptions=False, - input='localhost\n') - assert token == result.output.rstrip().split('\n')[-1] - - -def test_auth_failure(monkeypatch): - token = 'abc-123' - - def get(url, auth, **kwargs): - response = MagicMock() - if auth[1] == 'correctpass': - response.status_code = 200 - response.json.return_value = {'access_token': token} - else: - response.status_code = 401 - return response - - monkeypatch.setattr('keyring.set_password', MagicMock()) - monkeypatch.setattr('stups_cli.config.load_config', lambda x: {'url': 'http://localhost'}) - monkeypatch.setattr('stups_cli.config.store_config', lambda x, y: None) - monkeypatch.setattr('requests.get', get) - - runner = CliRunner() - - with runner.isolated_filesystem(): - result = runner.invoke(cli_zign, ['token', '-n', 'mytok', '-U', 'myusr', '--password', 'mypass'], - catch_exceptions=False, input='wrongpw\ncorrectpass\n') - assert 'Authentication failed: Token Service returned ' in result.output - assert 'Please check your username and password and try again.' in result.output - assert 'Password for myusr: ' in result.output - assert token == result.output.rstrip().split('\n')[-1] - - -def test_server_error(monkeypatch): - def get(url, **kwargs): - response = MagicMock() - response.status_code = 503 - return response - - monkeypatch.setattr('keyring.set_password', MagicMock()) - monkeypatch.setattr('stups_cli.config.load_config', lambda x: {'url': 'http://localhost'}) - monkeypatch.setattr('stups_cli.config.store_config', lambda x, y: None) - monkeypatch.setattr('requests.get', get) - - runner = CliRunner() - - with runner.isolated_filesystem(): - result = runner.invoke(cli_zign, ['token', '-n', 'mytok', '-U', 'myusr', '--password', 'mypass'], - catch_exceptions=False) - assert 'Server error: Token Service returned HTTP status 503' in result.output - - -def test_user_config(monkeypatch): - token = 'abc-123' - - response = MagicMock() - response.status_code = 200 - response.json.return_value = {'access_token': token} - - def get_token(url, auth, **kwargs): - assert url == 'https://localhost/access_token' - user, passwd = auth - assert user == 'jdoe' - return response - - monkeypatch.setattr('keyring.set_password', MagicMock()) - monkeypatch.setattr('stups_cli.config.load_config', - lambda x: {'user': 'jdoe', 'url': 'https://localhost/access_token'}) - monkeypatch.setattr('stups_cli.config.store_config', lambda x, y: None) - monkeypatch.setattr('requests.get', get_token) - - runner = CliRunner() - - with runner.isolated_filesystem(): - result = runner.invoke(cli_zign, ['token', '-n', 'mytok', '--password', 'mypass'], catch_exceptions=False) - assert token == result.output.rstrip().split('\n')[-1] diff --git a/zign/api.py b/zign/api.py index 6a1ced4..c49c901 100644 --- a/zign/api.py +++ b/zign/api.py @@ -1,6 +1,6 @@ import click from clickclick import error, info, UrlType -import keyring +import logging import os import stups_cli.config import time @@ -10,91 +10,16 @@ import webbrowser import yaml -from .config import KEYRING_KEY, OLD_CONFIG_NAME, CONFIG_NAME, REFRESH_TOKEN_FILE_PATH, TOKENS_FILE_PATH -from oauth2client import tools +from .oauth2 import ClientRedirectServer + +from .config import OLD_CONFIG_NAME, CONFIG_NAME, REFRESH_TOKEN_FILE_PATH, TOKENS_FILE_PATH from requests import RequestException -from urllib.parse import parse_qs from urllib.parse import urlparse from urllib.parse import urlunsplit TOKEN_MINIMUM_VALIDITY_SECONDS = 60*5 # 5 minutes -SUCCESS_PAGE = ''' - -
-You are now authenticated with Zign.
-The authentication flow has completed. You may close this window.
- -''' - -EXTRACT_TOKEN_PAGE = ''' - - -Redirecting...
- - -''' - -ERROR_PAGE = ''' - - -The authentication flow did not complete successfully. Please try again. You may close this - window.
- -''' +logger = logging.getLogger('zign.api') class ServerError(Exception): @@ -118,30 +43,6 @@ def __str__(self): return 'Configuration error: {}'.format(self.msg) -class ClientRedirectHandler(tools.ClientRedirectHandler): - '''Handles OAuth 2.0 redirect and return a success page if the flow has completed.''' - - def do_GET(self): - '''Handle the GET request from the redirect. - - Parses the token from the query parameters and returns a success page if the flow has completed''' - - self.send_response(200) - self.send_header('Content-type', 'text/html') - self.end_headers() - query_string = urlparse(self.path).query - - if not query_string: - self.wfile.write(EXTRACT_TOKEN_PAGE.format(port=self.server.server_port).encode('utf-8')) - else: - self.server.query_params = parse_qs(query_string) - if 'access_token' in self.server.query_params: - page = SUCCESS_PAGE - else: - page = ERROR_PAGE - self.wfile.write(page.encode('utf-8')) - - def get_config(config_module=None, override=None): '''Returns the specified module's configuration. Defaults to ztoken. @@ -198,6 +99,8 @@ def load_config_ztoken(config_file: str): def get_new_token(realm: str, scope: list, user, password, url=None, insecure=False): + logger.warning('"get_new_token" is deprecated, please use "zign.api.get_token" instead') + if not url: config = get_config(OLD_CONFIG_NAME) url = config.get('url') @@ -247,47 +150,7 @@ def store_config_ztoken(data: dict, path: str): yaml.safe_dump(data, fd) -def get_token_implicit_flow(name=None, authorize_url=None, token_url=None, client_id=None, business_partner_id=None, - refresh=False): - '''Gets a Platform IAM access token using browser redirect flow''' - - override = {'name': name, - 'authorize_url': authorize_url, - 'token_url': token_url, - 'client_id': client_id, - 'business_partner_id': business_partner_id} - config = get_config(CONFIG_NAME, override=override) - - if name and not refresh: - existing_token = get_existing_token(name) - # This will clear any non-JWT tokens - if existing_token and existing_token.get('access_token').count('.') >= 2: - return existing_token - - data = load_config_ztoken(REFRESH_TOKEN_FILE_PATH) - - # Always try with refresh token first - refresh_token = data.get('refresh_token') - if refresh_token: - payload = {'grant_type': 'refresh_token', - 'client_id': config['client_id'], - 'business_partner_id': config['business_partner_id'], - 'refresh_token': refresh_token} - try: - r = requests.post(config['token_url'], timeout=20, data=payload) - r.raise_for_status() - - token = r.json() - token['scope'] = '' - if name: - token['name'] = name - store_token(name, token) - - # Store the latest refresh token - store_config_ztoken({'refresh_token': token['refresh_token']}, REFRESH_TOKEN_FILE_PATH) - return token - except RequestException as exception: - error(exception) +def perform_implicit_flow(config: dict): # Get new token success = False @@ -297,7 +160,7 @@ def get_token_implicit_flow(name=None, authorize_url=None, token_url=None, clien while True: try: - httpd = tools.ClientRedirectServer(('localhost', port_number), ClientRedirectHandler) + httpd = ClientRedirectServer(('localhost', port_number)) except socket.error as e: if port_number > max_port_number: success = False @@ -313,7 +176,7 @@ def get_token_implicit_flow(name=None, authorize_url=None, token_url=None, clien 'client_id': config['client_id'], 'redirect_uri': 'http://localhost:{}'.format(port_number)} - param_list = ['{}={}'.format(key, params[key]) for key in params] + param_list = ['{}={}'.format(key, value) for key, value in sorted(params.items())] param_string = '&'.join(param_list) parsed_authorize_url = urlparse(config['authorize_url']) browser_url = urlunsplit((parsed_authorize_url.scheme, parsed_authorize_url.netloc, parsed_authorize_url.path, @@ -327,7 +190,7 @@ def get_token_implicit_flow(name=None, authorize_url=None, token_url=None, clien os.close(2) os.open(os.devnull, os.O_RDWR) try: - webbrowser.get().open(browser_url, new=1, autoraise=True) + webbrowser.open(browser_url, new=1, autoraise=True) finally: os.dup2(saved_stdout, 1) os.dup2(saved_stderr, 2) @@ -342,79 +205,79 @@ def get_token_implicit_flow(name=None, authorize_url=None, token_url=None, clien # Handle next request, with token httpd.handle_request() - if 'access_token' in httpd.query_params: - token = {'access_token': httpd.query_params['access_token'][0], - 'refresh_token': httpd.query_params['refresh_token'][0], - 'expires_in': int(httpd.query_params['expires_in'][0]), - 'token_type': httpd.query_params['token_type'][0], - 'scope': ''} + return httpd.query_params - store_config_ztoken({'refresh_token': token['refresh_token']}, REFRESH_TOKEN_FILE_PATH) - stups_cli.config.store_config(config, CONFIG_NAME) - if name: - token['name'] = name - store_token(name, token) - return token - else: - raise AuthenticationFailed('Failed to retrieve token') - - -def get_named_token(scope, realm, name, user, password, url=None, - insecure=False, refresh=False, use_keyring=True, prompt=False): - '''get named access token, return existing if still valid''' +def get_token_implicit_flow(name=None, authorize_url=None, token_url=None, client_id=None, business_partner_id=None, + refresh=False): + '''Gets a Platform IAM access token using browser redirect flow''' if name and not refresh: existing_token = get_existing_token(name) - if existing_token: + # This will clear any non-JWT tokens + if existing_token and existing_token.get('access_token').count('.') >= 2: return existing_token - if name and not realm: - access_token = get_service_token(name, scope) - if access_token: - return {'access_token': access_token} - - config = get_config(OLD_CONFIG_NAME) - - url = url or config.get('url') + override = {'name': name, + 'authorize_url': authorize_url, + 'token_url': token_url, + 'client_id': client_id, + 'business_partner_id': business_partner_id} + config = get_config(CONFIG_NAME, override=override) - while not url and prompt: - url = click.prompt('Please enter the OAuth access token service URL', type=UrlType()) + data = load_config_ztoken(REFRESH_TOKEN_FILE_PATH) + # Always try with refresh token first + refresh_token = data.get('refresh_token') + if refresh_token: + payload = {'grant_type': 'refresh_token', + 'client_id': config['client_id'], + 'business_partner_id': config['business_partner_id'], + 'refresh_token': refresh_token} try: - requests.get(url, timeout=5, verify=not insecure) - except: - error('Could not reach {}'.format(url)) - url = None + r = requests.post(config['token_url'], timeout=20, data=payload) + r.raise_for_status() - config['url'] = url + token = r.json() + token['scope'] = '' + if name: + token['name'] = name + store_token(name, token) - stups_cli.config.store_config(config, OLD_CONFIG_NAME) + # Store the latest refresh token + store_config_ztoken({'refresh_token': token['refresh_token']}, REFRESH_TOKEN_FILE_PATH) + return token + except RequestException as exception: + error(exception) - password = password or keyring.get_password(KEYRING_KEY, user) + response = perform_implicit_flow(config) - while True: - if not password and prompt: - password = click.prompt('Password for {}'.format(user), hide_input=True) + if 'access_token' in response: + token = {'access_token': response['access_token'], + 'refresh_token': response.get('refresh_token'), + 'expires_in': int(response['expires_in']), + 'token_type': response['token_type'], + 'scope': ''} - try: - result = get_new_token(realm, scope, user, password, url=url, insecure=insecure) - break - except AuthenticationFailed as e: - if prompt: - error(str(e)) - info('Please check your username and password and try again.') - password = None - else: - raise + if token['refresh_token']: + store_config_ztoken({'refresh_token': token['refresh_token']}, REFRESH_TOKEN_FILE_PATH) + stups_cli.config.store_config(config, CONFIG_NAME) + + if name: + token['name'] = name + store_token(name, token) + return token + else: + raise AuthenticationFailed('Failed to retrieve token') - if result and use_keyring: - keyring.set_password(KEYRING_KEY, user, password) - if name: - store_token(name, result) +def get_named_token(scope, realm, name, user, password, url=None, + insecure=False, refresh=False, use_keyring=True, prompt=False): + '''get named access token, return existing if still valid''' + logger.warning('"get_named_token" is deprecated, please use "zign.api.get_token" instead') - return result + access_token = get_token(name, scope) + return {'access_token': access_token} def is_valid(token: dict): @@ -456,20 +319,7 @@ def get_token(name: str, scopes: list): if access_token: return access_token - config = get_config(OLD_CONFIG_NAME) - user = config.get('user') or os.getenv('ZIGN_USER') or os.getenv('USER') - - if not user: - raise ConfigurationError('Missing OAuth username. ' + - 'Either set "user" in configuration file or ZIGN_USER environment variable.') - - if not config.get('url'): - raise ConfigurationError('Missing OAuth access token service URL. ' + - 'Please set "url" in configuration file.') - - password = os.getenv('ZIGN_PASSWORD') or keyring.get_password(KEYRING_KEY, user) - token = get_new_token(config.get('realm'), scopes, user, password, - url=config.get('url'), insecure=config.get('insecure')) + # TODO: support scopes for implicit flow + token = get_token_implicit_flow(name) if token: - store_token(name, token) return token['access_token'] diff --git a/zign/oauth2.py b/zign/oauth2.py new file mode 100644 index 0000000..1f6bda7 --- /dev/null +++ b/zign/oauth2.py @@ -0,0 +1,123 @@ +from http.server import BaseHTTPRequestHandler, HTTPServer + +from urllib.parse import parse_qs +from urllib.parse import urlparse + +SUCCESS_PAGE = ''' + + +You are now authenticated with Zign.
+The authentication flow has completed. You may close this window.
+ +''' + +EXTRACT_TOKEN_PAGE = ''' + + +Redirecting...
+ + +''' + +ERROR_PAGE = ''' + + +The authentication flow did not complete successfully. Please try again. You may close this + window.
+ +''' + + +class ClientRedirectHandler(BaseHTTPRequestHandler): + '''Handles OAuth 2.0 redirect and return a success page if the flow has completed.''' + + def do_GET(self): + '''Handle the GET request from the redirect. + + Parses the token from the query parameters and returns a success page if the flow has completed''' + + self.send_response(200) + self.send_header('Content-type', 'text/html') + self.end_headers() + query_string = urlparse(self.path).query + + if not query_string: + self.wfile.write(EXTRACT_TOKEN_PAGE.format(port=self.server.server_port).encode('utf-8')) + else: + query_params = {} + for key, val in parse_qs(query_string).items(): + query_params[key] = val[0] + self.server.query_params = query_params + if 'access_token' in self.server.query_params: + page = SUCCESS_PAGE + else: + page = ERROR_PAGE + self.wfile.write(page.encode('utf-8')) + + def log_message(self, format, *args): + """Do not log messages to stdout while running as cmd. line program.""" + + +class ClientRedirectServer(HTTPServer): + """A server to handle OAuth 2.0 redirects back to localhost. + + Waits for a single request and parses the query parameters + into query_params and then stops serving. + """ + query_params = {} + + def __init__(self, address): + super().__init__(address, ClientRedirectHandler)