Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a method to authenticate via an action token #517

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ansys/hps/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from .__version__ import __ansys_apps_version__, __version__
from .auth import AuthApi
from .authenticate import authenticate
from .authenticate import authenticate, authenticate_via_action_token
from .client import Client
from .exceptions import APIError, ClientError, HPSError
from .jms import JmsApi, ProjectApi
Expand Down
162 changes: 161 additions & 1 deletion src/ansys/hps/client/authenticate.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"""Module that provides authentication for the user with a password or refresh token against the
HPS authentication service."""
import logging
from typing import Union
from typing import List, Union

import requests

Expand Down Expand Up @@ -155,3 +155,163 @@ def authenticate(

raise_for_status(r)
return r.json()


def authenticate_via_action_token(
auth_url: str = "https://127.0.0.1:8443/hps/auth/realms/rep",
action_token_pathprefix: str = "job-action-token",
username: str = None,
scope: List[str] = ["openid"],
client_id: str = "rep-cli",
audience: List[str] = None,
access_token: str = None,
timeout: float = 10.0,
verify: Union[bool, str] = True,
**kwargs,
):
"""
Authenticates the user by requesting an action token.

If this method is successful, the response includes access and refresh tokens.

Parameters
----------

auth_url : str, optional
Base path for the server to call. The default is ``'https://127.0.0.1:8443/hps/auth/realms/rep'``.
scope : str, optional
String containing one or more requested scopes. The default is ``'openid'``.
client_id : str, optional
Client type. The default is ``'rep-cli'``.
client_secret : str, optional
Client secret. The default is ``None``.
username : str, optional
Username.
timeout : float, optional
Timeout in seconds. The default is ``10.0``.
verify: Union[bool, str], optional
If a Boolean, whether to verify the server's TLS certificate. If a string, the
path to the CA bundle to use. For more information, see the :class:`requests.Session`
documentation.

Returns
-------
dict
JSON-encoded content of a :class:`requests.Response` object.
"""

action_token_resp = request_action_token(
auth_url,
action_token_pathprefix,
username,
scope,
client_id,
access_token,
audience,
timeout=timeout,
verify=verify,
**kwargs,
)

auth_api = action_token_resp["link"]

log.debug(f"Action token received {action_token_resp['token']}.")

with requests.Session() as session:
session.verify = verify
resp = session.get(auth_api, timeout=timeout)

raise_for_status(resp)

resp_jsonify = resp.json()

log.debug(
f"Tokens generated via an action token. \
Access token: {resp_jsonify['access_token']}, \
Refresh token: {resp_jsonify['refresh_token']}."
)

return resp_jsonify


def request_action_token(
auth_url: str = "https://127.0.0.1:8443/hps/auth/realms/rep",
action_token_pathprefix: str = "job-action-token",
username: str = None,
scope: List[str] = ["openid"],
client_id: str = "job-api-call",
access_token: str = None,
audience: List[str] = None,
timeout: float = 10.0,
verify: Union[bool, str] = True,
**kwargs,
):
"""
Request an action token for a user fully authenticated.

If this method is successful, the response includes an action token and
the link that can be used to get an access and refresh tokens.

Parameters
----------

auth_url : str, optional
Base path for the server to call. The default is ``'https://127.0.0.1:8443/rep'``.
username : str, optional
Username of the user to impersonate.
action_token_pathprefix : str, optional
Path prefix of the action token endpoint.
scope : str, optional
String containing one or more requested scopes of the impersonated session. The default is ``'openid'``.
client_id : str, optional
Client type. The default is ``'rep-cli'``.
audience : str, optional.
The audience of the impersonated session (clientId).
acces_token : str, optional
Access token.
timeout : float, optional
Timeout in seconds. The default is ``10.0``.
verify: Union[bool, str], optional
If a Boolean, whether to verify the server's TLS certificate. If a string, the
path to the CA bundle to use. For more information, see the :class:`requests.Session`
documentation.

Returns
-------
dict
JSON-encoded content of a :class:`requests.Response` object.
"""

if not access_token:
raise ValueError("Access token must be provided to request an action token.")
if not username:
raise ValueError("A username must be provided to request an action token.")

auth_url = str(auth_url)
action_token_url = auth_url.rstrip("/") + "/" + action_token_pathprefix

with requests.Session() as session:
session.verify = verify
session.headers.update({"content-type": "application/json"})
session.headers.update({"Authorization": f"Bearer {access_token}"})

data = {
"clientId": client_id,
"scope": scope,
"username": username,
}

if audience:
data["audience"] = audience

for key, value in kwargs.items():
data[key] = value

log.debug(
f"Requesting an action token for {client_id} from {auth_url} to impersonate {username}."
)

resp = session.post(action_token_url, json=data, timeout=timeout)

raise_for_status(resp)
return resp.json()
Loading