Skip to content

Commit

Permalink
Update tests requiring the auth api (#201)
Browse files Browse the repository at this point in the history
  • Loading branch information
FedericoNegri authored Aug 3, 2023
1 parent bd1a793 commit 716f815
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 102 deletions.
4 changes: 2 additions & 2 deletions ansys/rep/client/auth/api/auth_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
class AuthApi:
"""A python interface to the Authorization Service API.
Users with admin rights (such as the default ``repadmin`` user) can create new
users as well as modify or delete existing ones. Non-admin users are only allowed
Admin users with the Keycloak "manage-users" role can create new
users as well as modify or delete existing ones. Other users are only allowed
to query the list of existing users.
Parameters
Expand Down
2 changes: 1 addition & 1 deletion doc/source/quickstart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ Get file definitions from an existing project Job Definition and replace the fir
Modify and create users
------------------------------------------

Users with admin rights (such as the default ``repadmin`` user) can create new users as well as modify or delete existing ones.
Admin users with the Keycloak "manage-users" role can create new users as well as modify or delete existing ones.

.. code-block:: python
Expand Down
96 changes: 45 additions & 51 deletions tests/auth/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import uuid

from keycloak import KeycloakOpenID
from keycloak.exceptions import KeycloakError

from ansys.rep.client import Client, REPError
Expand All @@ -18,48 +19,8 @@


class AuthClientTest(REPTestCase):
def test_auth_client(self):

if not self.is_admin:
self.skipTest(f"{self.username} is not an admin user.")

api = AuthApi(self.client)

username = f"test_user_{uuid.uuid4()}"
new_user = User(
username=username,
password="test_auth_client",
email=f"{username}@test.com",
first_name="Test",
last_name="User",
)
new_user = api.create_user(new_user)

self.assertEqual(new_user.username, username)
self.assertEqual(new_user.first_name, "Test")
self.assertEqual(new_user.last_name, "User")
self.assertEqual(new_user.email, f"{username}@test.com")

new_user.email = "update_email@test.com"
new_user.last_name = "Smith"
api.update_user(new_user)

self.assertEqual(new_user.username, username)
self.assertEqual(new_user.first_name, "Test")
self.assertEqual(new_user.last_name, "Smith")
self.assertEqual(new_user.email, "update_email@test.com")

api.delete_user(new_user)

users = api.get_users()
usernames = [x.username for x in users]
self.assertNotIn(new_user.username, usernames)

def test_get_users(self):

if not self.is_admin:
self.skipTest(f"{self.username} is not an admin user.")

api = AuthApi(self.client)

# create a new non-admin user
Expand All @@ -71,7 +32,7 @@ def test_get_users(self):
first_name="Test",
last_name="User",
)
new_user = api.create_user(new_user)
new_user = self.create_user(new_user)
self.assertEqual(new_user.first_name, "Test")
users = api.get_users(max=10)

Expand All @@ -85,7 +46,7 @@ def test_get_users(self):
new_user2 = api.get_user(new_user.id)
self.assertEqual(new_user, new_user2)

api.delete_user(new_user)
self.delete_user(new_user)
users = api.get_users(username=new_user.username)
self.assertEqual(len(users), 0)

Expand All @@ -101,10 +62,6 @@ def test_impersonate_user(self):
Requires activating the token-exchange feature in keycloak
by passing --features=token-exchange to the start command.
"""
if not self.is_admin:
self.skipTest(f"{self.username} is not an admin user.")

api = AuthApi(self.client)

username = f"test_user_{uuid.uuid4()}"
new_user = User(
Expand All @@ -114,15 +71,28 @@ def test_impersonate_user(self):
first_name="Test",
last_name="User",
)
new_user = api.create_user(new_user)
new_user = self.create_user(new_user)

realm_clients = self.keycloak_client.get_clients()
rep_impersonation_client = next(
(x for x in realm_clients if x["clientId"] == "rep-impersonation"), None
)
self.assertTrue(rep_impersonation_client is not None)

client = Client(
client_id=rep_impersonation_client["clientId"],
client_secret=rep_impersonation_client["secret"],
)

r = None
try:
r = authenticate(
url=self.rep_url,
username=self.username,
client_id=rep_impersonation_client["clientId"],
client_secret=rep_impersonation_client["secret"],
scope="opendid offline_access",
grant_type="urn:ietf:params:oauth:grant-type:token-exchange",
subject_token=self.client.access_token,
subject_token=client.access_token,
requested_token_type="urn:ietf:params:oauth:token-type:refresh_token",
requested_subject=new_user.id,
)
Expand All @@ -132,16 +102,40 @@ def test_impersonate_user(self):
f"This test requires to enable the feature 'token-exchange' in keycloak."
)

self.assertTrue(r is not None)
self.assertTrue("refresh_token" in r)

refresh_token_impersonated = r["refresh_token"]

client_impersonated = Client(
self.rep_url,
client.rep_url,
username=new_user.username,
grant_type="refresh_token",
refresh_token=refresh_token_impersonated,
client_id=rep_impersonation_client["clientId"],
client_secret=rep_impersonation_client["secret"],
)

self.assertTrue(client_impersonated.access_token is not None)
self.assertTrue(client_impersonated.refresh_token is not None)

api.delete_user(new_user)
keycloak_openid = KeycloakOpenID(
server_url=client.auth_api_url,
client_id="account",
realm_name="rep",
client_secret_key="**********",
verify=False,
)
KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n"
KEYCLOAK_PUBLIC_KEY += keycloak_openid.public_key()
KEYCLOAK_PUBLIC_KEY += "\n-----END PUBLIC KEY-----"

options = {"verify_signature": True, "verify_aud": True, "verify_exp": True}
token_info = keycloak_openid.decode_token(
client_impersonated.access_token,
key=KEYCLOAK_PUBLIC_KEY,
options=options,
)
self.assertEqual(token_info["preferred_username"], new_user.username)

self.delete_user(new_user)
8 changes: 4 additions & 4 deletions tests/jms/test_project_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_modify_project_permissions(self):
existing_users = [u.username for u in auth_api.get_users()]

if user_credentials["user1"]["username"] not in existing_users:
user1 = auth_api.create_user(
user1 = self.create_user(
User(
username=user_credentials["user1"]["username"],
password=user_credentials["user1"]["password"],
Expand All @@ -96,7 +96,7 @@ def test_modify_project_permissions(self):
log.info(f"User 1: {user1}")

if user_credentials["user2"]["username"] not in existing_users:
user2 = auth_api.create_user(
user2 = self.create_user(
User(
username=user_credentials["user2"]["username"],
password=user_credentials["user2"]["password"],
Expand Down Expand Up @@ -178,8 +178,8 @@ def test_modify_project_permissions(self):
self.assertTrue(except_obj.response.status_code, 403)

root_api1.delete_project(proj)
auth_api.delete_user(user1)
auth_api.delete_user(user2)
self.delete_user(user1)
self.delete_user(user2)


if __name__ == "__main__":
Expand Down
67 changes: 25 additions & 42 deletions tests/jms/test_task_definition_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@

import json
import logging
from typing import Tuple
import unittest
import uuid

from marshmallow.utils import missing

from ansys.rep.client import Client, REPError
from ansys.rep.client.auth import AuthApi, User
from ansys.rep.client import REPError
from ansys.rep.client.auth import AuthApi
from ansys.rep.client.jms import JmsApi
from ansys.rep.client.jms.resource import (
HpcResources,
Expand All @@ -29,25 +28,6 @@
log = logging.getLogger(__name__)


def create_new_user_client(
admin_cl: Client,
username=None,
password="test",
) -> Tuple[User, Client]:

if username is None:
username = f"testuser-{uuid.uuid4().hex[:8]}"

auth_api = AuthApi(admin_cl)
user = auth_api.create_user(User(username=username, password=password))
client = Client(
rep_url=admin_cl.rep_url,
username=user.username,
password=password,
)
return user, client


class TaskDefinitionTemplateTest(REPTestCase):
def test_template_deserialization(self):

Expand Down Expand Up @@ -214,8 +194,7 @@ def test_template_permissions(self):
self.assertIsNotNone(permissions[0].value_id)

# create test user
auth_api = AuthApi(client)
user1, client1 = create_new_user_client(client)
user1, client1 = self.create_new_user_client()
jms_api1 = JmsApi(client1)

# verify test user can't access the template
Expand Down Expand Up @@ -267,17 +246,18 @@ def test_template_permissions(self):
self.assertEqual(permissions[0].value_id, user1.id)

# verify that an admin user can access the template
admin_templates = jms_api.get_task_definition_templates(id=template.id)
log.info(admin_templates)
self.assertEqual(len(admin_templates), 1)
self.assertEqual(admin_templates[0].name, template.name)
self.assertEqual(admin_templates[0].version, template.version)
if self.is_admin:
admin_templates = jms_api.get_task_definition_templates(id=template.id)
log.info(admin_templates)
self.assertEqual(len(admin_templates), 1)
self.assertEqual(admin_templates[0].name, template.name)
self.assertEqual(admin_templates[0].version, template.version)

# Delete template
jms_api1.delete_task_definition_templates([template])

# Delete user
auth_api.delete_user(user1)
self.delete_user(user1)

def test_template_permissions_update(self):

Expand All @@ -291,14 +271,18 @@ def test_template_permissions_update(self):
self.assertEqual(len(permissions), 1)
self.assertEqual(permissions[0].permission_type, "user")

# remove permissions
permissions = []
# change permissions
permissions = [Permission(permission_type="anyone", role="admin", value_id=None)]
permissions = jms_api.update_task_definition_template_permissions(
template_id=template.id, permissions=permissions
)
self.assertEqual(len(permissions), 0)
self.assertEqual(len(permissions), 1)
permissions = jms_api.get_task_definition_template_permissions(template_id=template.id)
self.assertEqual(len(permissions), 0)
self.assertEqual(len(permissions), 1)
self.assertEqual(permissions[0].permission_type, "anyone")

# delete template
jms_api.delete_task_definition_templates([template])

def test_template_anyone_permission(self):

Expand All @@ -315,8 +299,7 @@ def test_template_anyone_permission(self):
self.assertIsNotNone(permissions[0].value_id)

# create test user
auth_api = AuthApi(client)
user1, client1 = create_new_user_client(client)
user1, client1 = self.create_new_user_client()
jms_api1 = JmsApi(client1)

# verify test user can't access the template
Expand Down Expand Up @@ -360,7 +343,7 @@ def test_template_anyone_permission(self):
jms_api.delete_task_definition_templates([template])

# Delete user
auth_api.delete_user(user1)
self.delete_user(user1)

def test_template_delete(self):

Expand All @@ -369,10 +352,10 @@ def test_template_delete(self):

# create 2 non-admin users
jms_api = JmsApi(client)
user1, client1 = create_new_user_client(client)
user1, client1 = self.create_new_user_client()
self.assertFalse(auth_api.user_is_admin(user1.id))
jms_api1 = JmsApi(client1)
user2, client2 = create_new_user_client(client)
user2, client2 = self.create_new_user_client()
self.assertFalse(auth_api.user_is_admin(user2.id))
jms_api2 = JmsApi(client2)

Expand Down Expand Up @@ -410,11 +393,11 @@ def test_template_delete(self):
self.assertEqual(except_obj.description, "Access to this resource has been restricted")

# Delete the template
jms_api.delete_task_definition_templates([template])
jms_api1.delete_task_definition_templates([template])

# Delete users
auth_api.delete_user(user1)
auth_api.delete_user(user2)
self.delete_user(user1)
self.delete_user(user2)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 716f815

Please sign in to comment.