From 716f815a0541686bd0598eca9063154a5b0a9c6a Mon Sep 17 00:00:00 2001 From: Federico Negri Date: Thu, 3 Aug 2023 07:50:55 +0200 Subject: [PATCH] Update tests requiring the auth api (#201) --- ansys/rep/client/auth/api/auth_api.py | 4 +- doc/source/quickstart.rst | 2 +- tests/auth/test_api.py | 96 ++++++++++----------- tests/jms/test_project_permissions.py | 8 +- tests/jms/test_task_definition_templates.py | 67 ++++++-------- tests/rep_test.py | 49 ++++++++++- 6 files changed, 124 insertions(+), 102 deletions(-) diff --git a/ansys/rep/client/auth/api/auth_api.py b/ansys/rep/client/auth/api/auth_api.py index a2c92892..0b30a158 100644 --- a/ansys/rep/client/auth/api/auth_api.py +++ b/ansys/rep/client/auth/api/auth_api.py @@ -17,8 +17,8 @@ class AuthApi: """A python interface to the Authorization Service API. - Users with admin rights (such as the default ``repadmin`` user) can create new - users as well as modify or delete existing ones. Non-admin users are only allowed + Admin users with the Keycloak "manage-users" role can create new + users as well as modify or delete existing ones. Other users are only allowed to query the list of existing users. Parameters diff --git a/doc/source/quickstart.rst b/doc/source/quickstart.rst index d12ba3b7..d6c1e555 100644 --- a/doc/source/quickstart.rst +++ b/doc/source/quickstart.rst @@ -278,7 +278,7 @@ Get file definitions from an existing project Job Definition and replace the fir Modify and create users ------------------------------------------ -Users with admin rights (such as the default ``repadmin`` user) can create new users as well as modify or delete existing ones. +Admin users with the Keycloak "manage-users" role can create new users as well as modify or delete existing ones. .. code-block:: python diff --git a/tests/auth/test_api.py b/tests/auth/test_api.py index ae825cc5..550fbd9a 100644 --- a/tests/auth/test_api.py +++ b/tests/auth/test_api.py @@ -8,6 +8,7 @@ import logging import uuid +from keycloak import KeycloakOpenID from keycloak.exceptions import KeycloakError from ansys.rep.client import Client, REPError @@ -18,48 +19,8 @@ class AuthClientTest(REPTestCase): - def test_auth_client(self): - - if not self.is_admin: - self.skipTest(f"{self.username} is not an admin user.") - - api = AuthApi(self.client) - - username = f"test_user_{uuid.uuid4()}" - new_user = User( - username=username, - password="test_auth_client", - email=f"{username}@test.com", - first_name="Test", - last_name="User", - ) - new_user = api.create_user(new_user) - - self.assertEqual(new_user.username, username) - self.assertEqual(new_user.first_name, "Test") - self.assertEqual(new_user.last_name, "User") - self.assertEqual(new_user.email, f"{username}@test.com") - - new_user.email = "update_email@test.com" - new_user.last_name = "Smith" - api.update_user(new_user) - - self.assertEqual(new_user.username, username) - self.assertEqual(new_user.first_name, "Test") - self.assertEqual(new_user.last_name, "Smith") - self.assertEqual(new_user.email, "update_email@test.com") - - api.delete_user(new_user) - - users = api.get_users() - usernames = [x.username for x in users] - self.assertNotIn(new_user.username, usernames) - def test_get_users(self): - if not self.is_admin: - self.skipTest(f"{self.username} is not an admin user.") - api = AuthApi(self.client) # create a new non-admin user @@ -71,7 +32,7 @@ def test_get_users(self): first_name="Test", last_name="User", ) - new_user = api.create_user(new_user) + new_user = self.create_user(new_user) self.assertEqual(new_user.first_name, "Test") users = api.get_users(max=10) @@ -85,7 +46,7 @@ def test_get_users(self): new_user2 = api.get_user(new_user.id) self.assertEqual(new_user, new_user2) - api.delete_user(new_user) + self.delete_user(new_user) users = api.get_users(username=new_user.username) self.assertEqual(len(users), 0) @@ -101,10 +62,6 @@ def test_impersonate_user(self): Requires activating the token-exchange feature in keycloak by passing --features=token-exchange to the start command. """ - if not self.is_admin: - self.skipTest(f"{self.username} is not an admin user.") - - api = AuthApi(self.client) username = f"test_user_{uuid.uuid4()}" new_user = User( @@ -114,15 +71,28 @@ def test_impersonate_user(self): first_name="Test", last_name="User", ) - new_user = api.create_user(new_user) + new_user = self.create_user(new_user) + realm_clients = self.keycloak_client.get_clients() + rep_impersonation_client = next( + (x for x in realm_clients if x["clientId"] == "rep-impersonation"), None + ) + self.assertTrue(rep_impersonation_client is not None) + + client = Client( + client_id=rep_impersonation_client["clientId"], + client_secret=rep_impersonation_client["secret"], + ) + + r = None try: r = authenticate( url=self.rep_url, - username=self.username, + client_id=rep_impersonation_client["clientId"], + client_secret=rep_impersonation_client["secret"], scope="opendid offline_access", grant_type="urn:ietf:params:oauth:grant-type:token-exchange", - subject_token=self.client.access_token, + subject_token=client.access_token, requested_token_type="urn:ietf:params:oauth:token-type:refresh_token", requested_subject=new_user.id, ) @@ -132,16 +102,40 @@ def test_impersonate_user(self): f"This test requires to enable the feature 'token-exchange' in keycloak." ) + self.assertTrue(r is not None) + self.assertTrue("refresh_token" in r) + refresh_token_impersonated = r["refresh_token"] client_impersonated = Client( - self.rep_url, + client.rep_url, username=new_user.username, grant_type="refresh_token", refresh_token=refresh_token_impersonated, + client_id=rep_impersonation_client["clientId"], + client_secret=rep_impersonation_client["secret"], ) self.assertTrue(client_impersonated.access_token is not None) self.assertTrue(client_impersonated.refresh_token is not None) - api.delete_user(new_user) + keycloak_openid = KeycloakOpenID( + server_url=client.auth_api_url, + client_id="account", + realm_name="rep", + client_secret_key="**********", + verify=False, + ) + KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + KEYCLOAK_PUBLIC_KEY += keycloak_openid.public_key() + KEYCLOAK_PUBLIC_KEY += "\n-----END PUBLIC KEY-----" + + options = {"verify_signature": True, "verify_aud": True, "verify_exp": True} + token_info = keycloak_openid.decode_token( + client_impersonated.access_token, + key=KEYCLOAK_PUBLIC_KEY, + options=options, + ) + self.assertEqual(token_info["preferred_username"], new_user.username) + + self.delete_user(new_user) diff --git a/tests/jms/test_project_permissions.py b/tests/jms/test_project_permissions.py index e1eacf04..beaa0f09 100644 --- a/tests/jms/test_project_permissions.py +++ b/tests/jms/test_project_permissions.py @@ -80,7 +80,7 @@ def test_modify_project_permissions(self): existing_users = [u.username for u in auth_api.get_users()] if user_credentials["user1"]["username"] not in existing_users: - user1 = auth_api.create_user( + user1 = self.create_user( User( username=user_credentials["user1"]["username"], password=user_credentials["user1"]["password"], @@ -96,7 +96,7 @@ def test_modify_project_permissions(self): log.info(f"User 1: {user1}") if user_credentials["user2"]["username"] not in existing_users: - user2 = auth_api.create_user( + user2 = self.create_user( User( username=user_credentials["user2"]["username"], password=user_credentials["user2"]["password"], @@ -178,8 +178,8 @@ def test_modify_project_permissions(self): self.assertTrue(except_obj.response.status_code, 403) root_api1.delete_project(proj) - auth_api.delete_user(user1) - auth_api.delete_user(user2) + self.delete_user(user1) + self.delete_user(user2) if __name__ == "__main__": diff --git a/tests/jms/test_task_definition_templates.py b/tests/jms/test_task_definition_templates.py index 0c2696bb..66d24e1b 100644 --- a/tests/jms/test_task_definition_templates.py +++ b/tests/jms/test_task_definition_templates.py @@ -8,14 +8,13 @@ import json import logging -from typing import Tuple import unittest import uuid from marshmallow.utils import missing -from ansys.rep.client import Client, REPError -from ansys.rep.client.auth import AuthApi, User +from ansys.rep.client import REPError +from ansys.rep.client.auth import AuthApi from ansys.rep.client.jms import JmsApi from ansys.rep.client.jms.resource import ( HpcResources, @@ -29,25 +28,6 @@ log = logging.getLogger(__name__) -def create_new_user_client( - admin_cl: Client, - username=None, - password="test", -) -> Tuple[User, Client]: - - if username is None: - username = f"testuser-{uuid.uuid4().hex[:8]}" - - auth_api = AuthApi(admin_cl) - user = auth_api.create_user(User(username=username, password=password)) - client = Client( - rep_url=admin_cl.rep_url, - username=user.username, - password=password, - ) - return user, client - - class TaskDefinitionTemplateTest(REPTestCase): def test_template_deserialization(self): @@ -214,8 +194,7 @@ def test_template_permissions(self): self.assertIsNotNone(permissions[0].value_id) # create test user - auth_api = AuthApi(client) - user1, client1 = create_new_user_client(client) + user1, client1 = self.create_new_user_client() jms_api1 = JmsApi(client1) # verify test user can't access the template @@ -267,17 +246,18 @@ def test_template_permissions(self): self.assertEqual(permissions[0].value_id, user1.id) # verify that an admin user can access the template - admin_templates = jms_api.get_task_definition_templates(id=template.id) - log.info(admin_templates) - self.assertEqual(len(admin_templates), 1) - self.assertEqual(admin_templates[0].name, template.name) - self.assertEqual(admin_templates[0].version, template.version) + if self.is_admin: + admin_templates = jms_api.get_task_definition_templates(id=template.id) + log.info(admin_templates) + self.assertEqual(len(admin_templates), 1) + self.assertEqual(admin_templates[0].name, template.name) + self.assertEqual(admin_templates[0].version, template.version) # Delete template jms_api1.delete_task_definition_templates([template]) # Delete user - auth_api.delete_user(user1) + self.delete_user(user1) def test_template_permissions_update(self): @@ -291,14 +271,18 @@ def test_template_permissions_update(self): self.assertEqual(len(permissions), 1) self.assertEqual(permissions[0].permission_type, "user") - # remove permissions - permissions = [] + # change permissions + permissions = [Permission(permission_type="anyone", role="admin", value_id=None)] permissions = jms_api.update_task_definition_template_permissions( template_id=template.id, permissions=permissions ) - self.assertEqual(len(permissions), 0) + self.assertEqual(len(permissions), 1) permissions = jms_api.get_task_definition_template_permissions(template_id=template.id) - self.assertEqual(len(permissions), 0) + self.assertEqual(len(permissions), 1) + self.assertEqual(permissions[0].permission_type, "anyone") + + # delete template + jms_api.delete_task_definition_templates([template]) def test_template_anyone_permission(self): @@ -315,8 +299,7 @@ def test_template_anyone_permission(self): self.assertIsNotNone(permissions[0].value_id) # create test user - auth_api = AuthApi(client) - user1, client1 = create_new_user_client(client) + user1, client1 = self.create_new_user_client() jms_api1 = JmsApi(client1) # verify test user can't access the template @@ -360,7 +343,7 @@ def test_template_anyone_permission(self): jms_api.delete_task_definition_templates([template]) # Delete user - auth_api.delete_user(user1) + self.delete_user(user1) def test_template_delete(self): @@ -369,10 +352,10 @@ def test_template_delete(self): # create 2 non-admin users jms_api = JmsApi(client) - user1, client1 = create_new_user_client(client) + user1, client1 = self.create_new_user_client() self.assertFalse(auth_api.user_is_admin(user1.id)) jms_api1 = JmsApi(client1) - user2, client2 = create_new_user_client(client) + user2, client2 = self.create_new_user_client() self.assertFalse(auth_api.user_is_admin(user2.id)) jms_api2 = JmsApi(client2) @@ -410,11 +393,11 @@ def test_template_delete(self): self.assertEqual(except_obj.description, "Access to this resource has been restricted") # Delete the template - jms_api.delete_task_definition_templates([template]) + jms_api1.delete_task_definition_templates([template]) # Delete users - auth_api.delete_user(user1) - auth_api.delete_user(user2) + self.delete_user(user1) + self.delete_user(user2) if __name__ == "__main__": diff --git a/tests/rep_test.py b/tests/rep_test.py index d37b0b3e..5d0043e5 100644 --- a/tests/rep_test.py +++ b/tests/rep_test.py @@ -8,10 +8,15 @@ import logging import os +from typing import Tuple import unittest +import uuid + +from keycloak import KeycloakAdmin from ansys.rep.client import Client -from ansys.rep.client.auth import AuthApi +from ansys.rep.client.auth import AuthApi, User +from ansys.rep.client.auth.api.auth_api import create_user class REPTestCase(unittest.TestCase): @@ -29,6 +34,8 @@ def setUp(self): self.rep_url = os.environ.get("REP_TEST_URL") or "https://127.0.0.1:8443/rep" self.username = os.environ.get("REP_TEST_USERNAME") or "repadmin" self.password = os.environ.get("REP_TEST_PASSWORD") or "repadmin" + self.keycloak_username = os.environ.get("REP_TEST_KEYCLOAK_USERNAME") or "keycloak" + self.keycloak_password = os.environ.get("REP_TEST_KEYCLOAK_PASSWORD") or "keycloak123" # Create a unique run_id (to be used when creating new projects) # to avoid conflicts in case of @@ -46,6 +53,7 @@ def setUp(self): self.run_id = f"{agent_id}_{build_id}".lower() self._client = None + self._keycloak_client = None self._is_admin = None def tearDown(self): @@ -53,11 +61,25 @@ def tearDown(self): pass @property - def client(self): + def client(self) -> Client: if self._client is None: self._client = Client(self.rep_url, self.username, self.password) return self._client + @property + def keycloak_client(self) -> KeycloakAdmin: + if self._keycloak_client is None: + self._keycloak_client = KeycloakAdmin( + server_url=self.client.auth_api_url, + username=self.keycloak_username, + password=self.keycloak_password, + realm_name="master", + client_id="admin-cli", + verify=False, + ) + self._keycloak_client.realm_name = "rep" + return self._keycloak_client + @property def is_admin(self): if self._is_admin is None: @@ -66,3 +88,26 @@ def is_admin(self): self.assertEqual(len(users), 1) self._is_admin = api.user_is_admin(users[0].id) return self._is_admin + + def create_user(self, user: User) -> User: + return create_user(self.keycloak_client, user) + + def delete_user(self, user: User) -> User: + return self.keycloak_client.delete_user(user.id) + + def create_new_user_client( + self, + username=None, + password="test", + ) -> Tuple[User, Client]: + + if username is None: + username = f"testuser-{uuid.uuid4().hex[:8]}" + + user = self.create_user(User(username=username, password=password)) + client = Client( + rep_url=self.rep_url, + username=user.username, + password=password, + ) + return user, client