diff --git a/src/ansys/hps/client/__init__.py b/src/ansys/hps/client/__init__.py index 5132e93a..2fc6f64e 100644 --- a/src/ansys/hps/client/__init__.py +++ b/src/ansys/hps/client/__init__.py @@ -23,7 +23,7 @@ from .__version__ import __ansys_apps_version__, __version__ from .auth import AuthApi -from .authenticate import authenticate +from .authenticate import authenticate, authenticate_via_action_token from .client import Client from .exceptions import APIError, ClientError, HPSError from .jms import JmsApi, ProjectApi diff --git a/src/ansys/hps/client/authenticate.py b/src/ansys/hps/client/authenticate.py index d15de453..51a7f58b 100644 --- a/src/ansys/hps/client/authenticate.py +++ b/src/ansys/hps/client/authenticate.py @@ -22,7 +22,7 @@ """Module that provides authentication for the user with a password or refresh token against the HPS authentication service.""" import logging -from typing import Union +from typing import List, Union import requests @@ -155,3 +155,163 @@ def authenticate( raise_for_status(r) return r.json() + + +def authenticate_via_action_token( + auth_url: str = "https://127.0.0.1:8443/hps/auth/realms/rep", + action_token_pathprefix: str = "job-action-token", + username: str = None, + scope: List[str] = ["openid"], + client_id: str = "rep-cli", + audience: List[str] = None, + access_token: str = None, + timeout: float = 10.0, + verify: Union[bool, str] = True, + **kwargs, +): + """ + Authenticates the user by requesting an action token. + + If this method is successful, the response includes access and refresh tokens. + + Parameters + ---------- + + auth_url : str, optional + Base path for the server to call. The default is ``'https://127.0.0.1:8443/hps/auth/realms/rep'``. + scope : str, optional + String containing one or more requested scopes. The default is ``'openid'``. + client_id : str, optional + Client type. The default is ``'rep-cli'``. + client_secret : str, optional + Client secret. The default is ``None``. + username : str, optional + Username. + timeout : float, optional + Timeout in seconds. The default is ``10.0``. + verify: Union[bool, str], optional + If a Boolean, whether to verify the server's TLS certificate. If a string, the + path to the CA bundle to use. For more information, see the :class:`requests.Session` + documentation. + + Returns + ------- + dict + JSON-encoded content of a :class:`requests.Response` object. + """ + + action_token_resp = request_action_token( + auth_url, + action_token_pathprefix, + username, + scope, + client_id, + access_token, + audience, + timeout=timeout, + verify=verify, + **kwargs, + ) + + auth_api = action_token_resp["link"] + + log.debug(f"Action token received {action_token_resp['token']}.") + + with requests.Session() as session: + session.verify = verify + resp = session.get(auth_api, timeout=timeout) + + raise_for_status(resp) + + resp_jsonify = resp.json() + + log.debug( + f"Tokens generated via an action token. \ + Access token: {resp_jsonify['access_token']}, \ + Refresh token: {resp_jsonify['refresh_token']}." + ) + + return resp_jsonify + + +def request_action_token( + auth_url: str = "https://127.0.0.1:8443/hps/auth/realms/rep", + action_token_pathprefix: str = "job-action-token", + username: str = None, + scope: List[str] = ["openid"], + client_id: str = "job-api-call", + access_token: str = None, + audience: List[str] = None, + timeout: float = 10.0, + verify: Union[bool, str] = True, + **kwargs, +): + """ + Request an action token for a user fully authenticated. + + If this method is successful, the response includes an action token and + the link that can be used to get an access and refresh tokens. + + Parameters + ---------- + + auth_url : str, optional + Base path for the server to call. The default is ``'https://127.0.0.1:8443/rep'``. + username : str, optional + Username of the user to impersonate. + action_token_pathprefix : str, optional + Path prefix of the action token endpoint. + scope : str, optional + String containing one or more requested scopes of the impersonated session. The default is ``'openid'``. + client_id : str, optional + Client type. The default is ``'rep-cli'``. + audience : str, optional. + The audience of the impersonated session (clientId). + acces_token : str, optional + Access token. + timeout : float, optional + Timeout in seconds. The default is ``10.0``. + verify: Union[bool, str], optional + If a Boolean, whether to verify the server's TLS certificate. If a string, the + path to the CA bundle to use. For more information, see the :class:`requests.Session` + documentation. + + Returns + ------- + dict + JSON-encoded content of a :class:`requests.Response` object. + """ + + if not access_token: + raise ValueError("Access token must be provided to request an action token.") + if not username: + raise ValueError("A username must be provided to request an action token.") + + auth_url = str(auth_url) + action_token_url = auth_url.rstrip("/") + "/" + action_token_pathprefix + + with requests.Session() as session: + session.verify = verify + session.headers.update({"content-type": "application/json"}) + session.headers.update({"Authorization": f"Bearer {access_token}"}) + + data = { + "clientId": client_id, + "scope": scope, + "username": username, + } + + if audience: + data["audience"] = audience + + for key, value in kwargs.items(): + data[key] = value + + log.debug( + f"Requesting an action token for {client_id} from {auth_url} to impersonate {username}." + ) + + resp = session.post(action_token_url, json=data, timeout=timeout) + + raise_for_status(resp) + return resp.json()