From 95429539176a532a1ae7c6c993498e5ce61a0299 Mon Sep 17 00:00:00 2001 From: Ian Liu <81595625+ianliuwk1019@users.noreply.github.com> Date: Wed, 2 Oct 2024 14:59:57 -0700 Subject: [PATCH] feat: #1538 store audit records tests (#1605) Co-authored-by: Craig Yu --- .../crud/services/permission_audit_service.py | 4 +- server/backend/testspg/conftest.py | 78 +++- server/backend/testspg/constants.py | 15 + .../services/test_permission_audit_service.py | 410 ++++++++++++++++++ .../validator/test_forest_client_validator.py | 22 +- .../test_router_user_role_assignment.py | 27 ++ 6 files changed, 525 insertions(+), 31 deletions(-) create mode 100644 server/backend/testspg/crud/services/test_permission_audit_service.py diff --git a/server/backend/api/app/crud/services/permission_audit_service.py b/server/backend/api/app/crud/services/permission_audit_service.py index 71d6f14c9..5570a85ef 100644 --- a/server/backend/api/app/crud/services/permission_audit_service.py +++ b/server/backend/api/app/crud/services/permission_audit_service.py @@ -98,7 +98,7 @@ def to_change_performer_user_details( @staticmethod def to_enduser_privliege_granted_details( enduser_privliege_list: List[FamUserRoleAssignmentCreateRes] - ) -> PrivilegeDetailsSchema: + ) -> PrivilegeDetailsSchema | None: if (len(enduser_privliege_list) == 0): return @@ -152,7 +152,7 @@ def to_enduser_privliege_revoked_details( error_msg = ( "Revoke user permission encountered problem." + f"Unknown forest client number {forest_client_number} for " - + "scoped permission {revoked_permission_role.role_name}." + + f"scoped permission {revoked_permission_role.role_name}." ) LOGGER.debug(error_msg) raise HTTPException( diff --git a/server/backend/testspg/conftest.py b/server/backend/testspg/conftest.py index c9ee11f79..7c4ab3198 100644 --- a/server/backend/testspg/conftest.py +++ b/server/backend/testspg/conftest.py @@ -1,7 +1,7 @@ import logging import os import sys -from typing import List +from typing import List, Optional import jwt import pytest @@ -9,29 +9,28 @@ import testcontainers.compose from Crypto.PublicKey import RSA from fastapi.testclient import TestClient +from mock import patch from mock_alchemy.mocking import UnifiedAlchemyMagicMock from sqlalchemy import create_engine from sqlalchemy.orm import Session, sessionmaker sys.path.append(os.path.join(os.path.dirname(__file__), "..")) - import api.app.database as database import api.app.jwt_validation as jwt_validation import testspg.jwt_utils as jwt_utils -from api.app.constants import COGNITO_USERNAME_KEY, ERROR_CODE_TERMS_CONDITIONS_REQUIRED -from api.app.crud import crud_utils +from api.app.constants import (COGNITO_USERNAME_KEY, + ERROR_CODE_TERMS_CONDITIONS_REQUIRED, UserType) +from api.app.crud import crud_user, crud_utils from api.app.main import apiPrefix, app +from api.app.models.model import FamUser from api.app.routers.router_guards import ( - enforce_bceid_terms_conditions_guard, - get_current_requester, - get_verified_target_user, -) + enforce_bceid_terms_conditions_guard, get_current_requester, + get_verified_target_user) from api.app.schemas import RequesterSchema, TargetUserSchema -from testspg.constants import ( - ACCESS_GRANT_FOM_DEV_CR_IDIR, - FOM_DEV_ADMIN_ROLE, - FOM_TEST_ADMIN_ROLE, -) +from api.app.schemas.fam_user import FamUserSchema +from testspg.constants import (ACCESS_GRANT_FOM_DEV_CR_IDIR, + FOM_DEV_ADMIN_ROLE, FOM_TEST_ADMIN_ROLE, + TEST_CREATOR) LOGGER = logging.getLogger(__name__) # the folder contains test docker-compose.yml, ours in the root directory @@ -266,3 +265,56 @@ def auth_headers(test_rsa_key): token = jwt_utils.create_jwt_token(test_rsa_key) headers = jwt_utils.headers(token) return headers + + +@pytest.fixture(scope="function") +def setup_new_user(db_pg_session: Session): + """ + New user setup for testing. + The fixture returns a function to be called with new user created based on + user_type, user_name and optionally if need to add cognito_user_id. + """ + + def _setup_new_user( + user_type: UserType, user_name, user_guid, cognito_user_id: Optional[str] = None + ) -> FamUser: + new_user_create = FamUserSchema( + **{ + "user_type_code": user_type, + "user_name": user_name, + "user_guid": user_guid, + "create_user": TEST_CREATOR, + "first_name": "Fist", + "last_name": "Last", + "email": "test@test.com" + } + ) + + fam_user = crud_user.create_user(new_user_create, db_pg_session) + if cognito_user_id is not None: + # SqlAlchemy is a bit strange, need to use `.query()` to do the + # update() and query() again in order to get correct updated entity + # from session. + db_pg_session.query(FamUser).filter( + FamUser.user_id == fam_user.user_id + ).update({FamUser.cognito_user_id: cognito_user_id}) + + fam_user = ( + db_pg_session.query(FamUser) + .filter(FamUser.user_id == fam_user.user_id) + .one() + ) + + return fam_user + + return _setup_new_user + + +@pytest.fixture(scope="function", autouse=True) +def mock_forest_client_integration_service(): + # Mocked dependency class object + with patch( + "api.app.integration.forest_client_integration.ForestClientIntegrationService", + autospec=True, + ) as m: + yield m.return_value # Very important to get instance of mocked class. diff --git a/server/backend/testspg/constants.py b/server/backend/testspg/constants.py index 30c6455b5..a10f68432 100644 --- a/server/backend/testspg/constants.py +++ b/server/backend/testspg/constants.py @@ -25,6 +25,18 @@ TEST_USER_ID = 1 TEST_USER_NAME_IDIR = "TEST_USER" TEST_USER_GUID_IDIR = "MOCKEDGUID5D4ACA9FA901EE2C91CB3B" # this is a faked user guid +TEST_USER_FIREST_NAME = "FIRST NAME" +TEST_USER_LAST_NAME = "LAST NAME" +TEST_USER_EMAIL = "EMAIL" +TEST_REQUESTER = { + "cognito_user_id": "test-idir_e72a12c916afakedffae7@idir", + "user_name": TEST_USER_NAME_IDIR, + "user_guid": TEST_USER_GUID_IDIR, + "user_id": TEST_USER_ID, + "first_name": TEST_USER_FIREST_NAME, + "last_name": TEST_USER_LAST_NAME, + "email": TEST_USER_EMAIL, +} TEST_NOT_EXIST_USER_TYPE = "NS" @@ -77,6 +89,9 @@ FC_NUMBER_EXISTS_RECEIVERSHIP = "00169575" FC_NUMBER_EXISTS_SUSPENDED = "00003643" +MOCK_FIND_CLIENT_00001011_RETURN = [{ + 'clientNumber': '00001011', 'clientName': 'AKIECA EXPLORERS LTD.', 'clientStatusCode': 'ACT', 'clientTypeCode': 'C' +}] # --------------------- Testing Admin role level at token -------------- # FOM_DEV_ADMIN_ROLE = "FOM_DEV_ADMIN" diff --git a/server/backend/testspg/crud/services/test_permission_audit_service.py b/server/backend/testspg/crud/services/test_permission_audit_service.py new file mode 100644 index 000000000..2abe7bf82 --- /dev/null +++ b/server/backend/testspg/crud/services/test_permission_audit_service.py @@ -0,0 +1,410 @@ +import copy +import logging +from http import HTTPStatus +from typing import List + +import pytest +from api.app.constants import (ERROR_CODE_UNKNOWN_STATE, + PrivilegeChangeTypeEnum, + PrivilegeDetailsPermissionTypeEnum, + PrivilegeDetailsScopeTypeEnum, UserType) +from api.app.crud.services.permission_audit_service import \ + PermissionAuditService +from api.app.integration.forest_client_integration import \ + ForestClientIntegrationService +from api.app.models.model import (FamApplication, FamForestClient, + FamPrivilegeChangeAudit, FamRole, FamUser, + FamUserRoleXref) +from api.app.schemas.fam_application import FamApplicationSchema +from api.app.schemas.fam_application_user_role_assignment_get import \ + FamApplicationUserRoleAssignmentGetSchema +from api.app.schemas.fam_forest_client import FamForestClientSchema +from api.app.schemas.fam_role_min import FamRoleMinSchema +from api.app.schemas.fam_role_with_client import FamRoleWithClientSchema +from api.app.schemas.fam_user_info import FamUserInfoSchema +from api.app.schemas.fam_user_role_assignment_create_response import \ + FamUserRoleAssignmentCreateRes +from api.app.schemas.fam_user_type import FamUserTypeSchema +from api.app.schemas.requester import RequesterSchema +from fastapi import HTTPException +from mock import patch +from sqlalchemy.orm import Session +from testspg.constants import (FOM_DEV_APPLICATION_ID, + MOCK_FIND_CLIENT_00001011_RETURN, + TEST_USER_GUID_IDIR, TEST_USER_NAME_IDIR, + USER_GUID_BCEID_LOAD_2_TEST, + USER_NAME_BCEID_LOAD_2_TEST) + +LOGGER = logging.getLogger(__name__) + +@pytest.fixture(scope="function") +def new_idir_requester(db_pg_session: Session, setup_new_user): + requester_user: FamUser = setup_new_user( + UserType.IDIR, + TEST_USER_NAME_IDIR, + TEST_USER_GUID_IDIR + ) + requester = RequesterSchema.model_validate( + requester_user.__dict__ + ) + return requester + + +def test_store_end_user_audit_history_granted_role_no_scope( + db_pg_session: Session, + setup_new_user, + new_idir_requester, + mocker +): + """ + Test service saving user permission granted history on role change with no scope. + """ + # setup performer and change_target_user + performer = new_idir_requester + change_target_user: FamUser = setup_new_user( + UserType.BCEID, + USER_NAME_BCEID_LOAD_2_TEST, + USER_GUID_BCEID_LOAD_2_TEST + ) + mock_user_permission_granted_list = [copy.copy(sample_end_user_permission_granted_no_scope_details)] + enduser_privliege_granted_details_fn_spy = mocker.spy(PermissionAuditService, 'to_enduser_privliege_granted_details') + change_performer_user_details_fn_spy = mocker.spy(PermissionAuditService, 'to_change_performer_user_details') + + # test the service: granting end user role with no scope. + paService = PermissionAuditService(db_pg_session) + paService.store_user_permissions_granted_audit_history(performer, change_target_user, mock_user_permission_granted_list) + + # find the audit record and verify + audit_record = db_pg_session.query(FamPrivilegeChangeAudit).filter( + FamPrivilegeChangeAudit.application_id == mock_user_permission_granted_list[0].detail.role.application.application_id, + FamPrivilegeChangeAudit.change_performer_user_id == performer.user_id, + FamPrivilegeChangeAudit.change_target_user_id == change_target_user.user_id + ).one() + assert audit_record is not None + assert change_performer_user_details_fn_spy.call_count == 1 + assert enduser_privliege_granted_details_fn_spy.call_count == 1 + assert audit_record.privilege_change_type_code == PrivilegeChangeTypeEnum.GRANT + assert audit_record.change_target_user_id == change_target_user.user_id + verify_change_performer_user(audit_record, performer) + verify_end_user_granted_privilege_details(audit_record, mock_user_permission_granted_list) + + +def test_store_end_user_audit_history_nosaving_when_no_success_permission_granted( + db_pg_session: Session, + setup_new_user, + new_idir_requester +): + """ + Test service saving user permission granted history but no success granted permission + return from execution. Should have no audit record saved. + """ + performer = new_idir_requester + change_target_user: FamUser = setup_new_user( + UserType.BCEID, + USER_NAME_BCEID_LOAD_2_TEST, + USER_GUID_BCEID_LOAD_2_TEST + ) + mock_user_permission_granted_list = [copy.copy(sample_end_user_permission_granted_no_scope_details)] + # setup all responses not success + for item in mock_user_permission_granted_list: + item.status_code = HTTPStatus.CONFLICT + + # test the service: granting end user role with no scope. + paService = PermissionAuditService(db_pg_session) + paService.store_user_permissions_granted_audit_history(performer, change_target_user, mock_user_permission_granted_list) + + # find the audit record and verify + audit_record = db_pg_session.query(FamPrivilegeChangeAudit).filter( + FamPrivilegeChangeAudit.application_id == mock_user_permission_granted_list[0].detail.role.application.application_id, + FamPrivilegeChangeAudit.change_performer_user_id == performer.user_id, + FamPrivilegeChangeAudit.change_target_user_id == change_target_user.user_id + ).one_or_none() + assert audit_record is None + + +def test_store_end_user_audit_history_granted_role_with_client_scopes( + db_pg_session: Session, + setup_new_user, + new_idir_requester, + mocker +): + """ + Test service saving user permission granted history on role change with 'CLIENT' scopes. + """ + performer = new_idir_requester + change_target_user: FamUser = setup_new_user( + UserType.BCEID, + USER_NAME_BCEID_LOAD_2_TEST, + USER_GUID_BCEID_LOAD_2_TEST + ) + mock_user_permission_granted_list = [copy.copy(sample_end_user_permission_granted_with_scope_details)] + enduser_privliege_granted_details_fn_spy = mocker.spy(PermissionAuditService, 'to_enduser_privliege_granted_details') + change_performer_user_details_fn_spy = mocker.spy(PermissionAuditService, 'to_change_performer_user_details') + + # test the service: granting end user role with scopes. + paService = PermissionAuditService(db_pg_session) + paService.store_user_permissions_granted_audit_history(performer, change_target_user, mock_user_permission_granted_list) + + # find the audit record and verify + audit_records = db_pg_session.query(FamPrivilegeChangeAudit).filter( + FamPrivilegeChangeAudit.application_id == mock_user_permission_granted_list[0].detail.role.application.application_id, + FamPrivilegeChangeAudit.change_performer_user_id == performer.user_id, + FamPrivilegeChangeAudit.change_target_user_id == change_target_user.user_id + ).all() + + assert len(audit_records) == len(mock_user_permission_granted_list) + assert change_performer_user_details_fn_spy.call_count == 1 + assert enduser_privliege_granted_details_fn_spy.call_count == 1 + for record in audit_records: + assert record.privilege_change_type_code == PrivilegeChangeTypeEnum.GRANT + assert record.change_target_user_id == change_target_user.user_id + verify_change_performer_user(record, performer) + verify_end_user_granted_privilege_details(record, mock_user_permission_granted_list) + + +def test_store_end_user_audit_history_revoke_role_no_scopes( + db_pg_session: Session, + setup_new_user, + new_idir_requester, + mocker +): + """ + Test service saving user permission revoked history on role change with no scope. + """ + performer = new_idir_requester + change_target_user: FamUser = setup_new_user( + UserType.BCEID, + USER_NAME_BCEID_LOAD_2_TEST, + USER_GUID_BCEID_LOAD_2_TEST + ) + mock_delete_record = copy.copy(sameple_user_role_with_no_client_revoked_record) + mock_delete_record.user = change_target_user + enduser_privliege_revoked_details_fn_spy = mocker.spy(PermissionAuditService, 'to_enduser_privliege_revoked_details') + change_performer_user_details_fn_spy = mocker.spy(PermissionAuditService, 'to_change_performer_user_details') + + # test the service: granting end user role with scopes. + paService = PermissionAuditService(db_pg_session) + paService.store_user_permissions_revoked_audit_history(performer, mock_delete_record) + + # find the audit record and verify + audit_record = db_pg_session.query(FamPrivilegeChangeAudit).filter( + FamPrivilegeChangeAudit.application_id == mock_delete_record.role.application.application_id, + FamPrivilegeChangeAudit.change_performer_user_id == performer.user_id, + FamPrivilegeChangeAudit.change_target_user_id == change_target_user.user_id + ).one() + assert audit_record is not None + assert enduser_privliege_revoked_details_fn_spy.call_count == 1 + assert change_performer_user_details_fn_spy.call_count == 1 + assert audit_record.privilege_change_type_code == PrivilegeChangeTypeEnum.REVOKE + assert audit_record.change_target_user_id == change_target_user.user_id + verify_change_performer_user(audit_record, performer) + verify_end_user_revoked_privilege_details(audit_record, mock_delete_record) + + +def test_store_end_user_audit_history_revoke_role_with_client_scopes( + mock_forest_client_integration_service, + db_pg_session: Session, + setup_new_user, + new_idir_requester, + mocker +): + """ + Test service saving user permission revoked history on role change with 'CLIENT' scope. + """ + performer = new_idir_requester + change_target_user: FamUser = setup_new_user( + UserType.BCEID, + USER_NAME_BCEID_LOAD_2_TEST, + USER_GUID_BCEID_LOAD_2_TEST + ) + mock_delete_record = copy.copy(sameple_user_role_with_client_revoked_record) + mock_delete_record.user_id = change_target_user.user_id + mock_delete_record.user = change_target_user + mock_forest_client_number = "00001011" + mock_delete_record.role.client_number.forest_client_number = mock_forest_client_number + mock_delete_record.role.role_name = f"FOM_SUBMITTER_{mock_forest_client_number}" + mock_forest_client_integration_service.find_by_client_number.return_value = MOCK_FIND_CLIENT_00001011_RETURN + + enduser_privliege_revoked_details_fn_spy = mocker.spy(PermissionAuditService, 'to_enduser_privliege_revoked_details') + change_performer_user_details_fn_spy = mocker.spy(PermissionAuditService, 'to_change_performer_user_details') + forest_client_integration_fn_spy = mocker.spy(ForestClientIntegrationService, 'find_by_client_number') + + # test the service: granting end user role with scopes. + paService = PermissionAuditService(db_pg_session) + paService.store_user_permissions_revoked_audit_history(performer, mock_delete_record) + + # find the audit record and verify + audit_record = db_pg_session.query(FamPrivilegeChangeAudit).filter( + FamPrivilegeChangeAudit.application_id == mock_delete_record.role.application.application_id, + FamPrivilegeChangeAudit.change_performer_user_id == performer.user_id, + FamPrivilegeChangeAudit.change_target_user_id == change_target_user.user_id + ).one() + assert audit_record is not None + assert enduser_privliege_revoked_details_fn_spy.call_count == 1 + assert change_performer_user_details_fn_spy.call_count == 1 + assert forest_client_integration_fn_spy.call_count == 1 + assert audit_record.privilege_change_type_code == PrivilegeChangeTypeEnum.REVOKE + assert audit_record.change_target_user_id == change_target_user.user_id + verify_change_performer_user(audit_record, performer) + verify_end_user_revoked_privilege_details(audit_record, mock_delete_record) + + +def test_store_end_user_audit_history_revoke_role_client_search_error( + mock_forest_client_integration_service, + db_pg_session: Session, + setup_new_user, + new_idir_requester, + mocker +): + """ + Test service saving user permission revoked history on role change with 'CLIENT' scope + but with scenario that forest client number is not found (unknown reason) from + FC integration external service. Exception should be raised and no audit record is saved. + """ + performer = new_idir_requester + change_target_user: FamUser = setup_new_user( + UserType.BCEID, + USER_NAME_BCEID_LOAD_2_TEST, + USER_GUID_BCEID_LOAD_2_TEST + ) + mock_delete_record = copy.copy(sameple_user_role_with_notfound_client_revoked_record) + mock_forest_client_integration_service.find_by_client_number.return_value = [] # FC external service result not found. + forest_client_integration_fn_spy = mocker.spy(ForestClientIntegrationService, 'find_by_client_number') + + with pytest.raises(HTTPException) as e: + paService = PermissionAuditService(db_pg_session) + paService.store_user_permissions_revoked_audit_history(performer, mock_delete_record) + + assert str(e.value.detail.get("code")).find(ERROR_CODE_UNKNOWN_STATE) != -1 + error_msg = ( + "Revoke user permission encountered problem." + + f"Unknown forest client number {mock_delete_record.role.client_number.forest_client_number} for " + + f"scoped permission {mock_delete_record.role.role_name}." + ) + assert str(e.value.detail.get("description")).find(error_msg) != -1 + assert forest_client_integration_fn_spy.call_count == 1 + # find the audit record and verify + audit_record = db_pg_session.query(FamPrivilegeChangeAudit).filter( + FamPrivilegeChangeAudit.application_id == mock_delete_record.role.application.application_id, + FamPrivilegeChangeAudit.change_performer_user_id == performer.user_id, + FamPrivilegeChangeAudit.change_target_user_id == change_target_user.user_id + ).one_or_none() + assert audit_record is None + + +def verify_change_performer_user(audit_record: FamPrivilegeChangeAudit, performer: FamUser): + audit_record.change_performer_user_id == performer.user_id + change_performer_user_details_dict = audit_record.change_performer_user_details + assert change_performer_user_details_dict["username"] == performer.user_name + assert change_performer_user_details_dict["first_name"] == performer.first_name + assert change_performer_user_details_dict["last_name"] == performer.last_name + assert change_performer_user_details_dict["email"] == performer.email + + +def verify_end_user_granted_privilege_details( + audit_record: FamPrivilegeChangeAudit, + mock_user_permission_granted_list: List[FamUserRoleAssignmentCreateRes] +): + audit_privilege_details_dict = audit_record.privilege_details + assert audit_privilege_details_dict["permission_type"] == PrivilegeDetailsPermissionTypeEnum.END_USER + assert len(audit_privilege_details_dict["roles"]) != 0 + audit_role = audit_privilege_details_dict["roles"][0] # FAM can grant 1 role at a time for now. + granted_role = mock_user_permission_granted_list[0].detail.role + assert audit_role["role"] == granted_role.display_name + audit_scopes = audit_role.get("scopes") + if granted_role.forest_client is None: + # "scopes" attribute is not present if the granted role has no scopes. + assert audit_scopes is None + + else: + assert len(audit_scopes) == len(mock_user_permission_granted_list) + org_id_list = list(map( + lambda item: item.detail.role.forest_client.forest_client_number, mock_user_permission_granted_list + )) + for scope in audit_scopes: + scope.get("scope_type") == PrivilegeDetailsScopeTypeEnum.CLIENT # Current FAM supports 'CLIENT' type only, more in future. + scope.get("client_id") in org_id_list + + +def verify_end_user_revoked_privilege_details( + audit_record: FamPrivilegeChangeAudit, + mock_delete_record: FamUserRoleXref +): + audit_privilege_details_dict = audit_record.privilege_details + assert audit_privilege_details_dict["permission_type"] == PrivilegeDetailsPermissionTypeEnum.END_USER + assert len(audit_privilege_details_dict["roles"]) != 0 + audit_role = audit_privilege_details_dict["roles"][0] # FAM can revoke 1 role at a time for now. + revoked_role = mock_delete_record.role + assert audit_role["role"] == revoked_role.display_name + audit_scopes = audit_role.get("scopes") + if not revoked_role.client_number_id: + # "scopes" attribute is not present if the revoked role has no scopes. + assert audit_scopes is None + + else: + assert len(audit_scopes) == 1 # Fam can revoke role with 1 org at a time. + org_id = revoked_role.client_number_id + scope = audit_scopes[0] + scope.get("scope_type") == PrivilegeDetailsScopeTypeEnum.CLIENT # Current FAM supports 'CLIENT' type only, more in future. + scope.get("client_id") == org_id + + +# sample end user permission granted response - role with no scope +sample_end_user_permission_granted_no_scope_details = FamUserRoleAssignmentCreateRes( + **{'status_code': HTTPStatus.OK, + 'detail': FamApplicationUserRoleAssignmentGetSchema( + user_role_xref_id=999, user_id=9, role_id=4, + user=FamUserInfoSchema(user_name='enduser', first_name='first', last_name='last', email='a@b.com', + user_type_relation=FamUserTypeSchema(user_type_code=UserType.BCEID, description='BCEID')), + role=FamRoleWithClientSchema(role_name='FOM_REVIEWER', role_type_code='C', + application=FamApplicationSchema(application_id=2, application_name='FOM_DEV', application_description='Forest Operations Map (DEV)'), + role_id=999, display_name='Reviewer', role_purpose='Provides the privilege to review all FOMs in the system', client_number=None, parent_role=None)), + 'error_message': None + } + ) + +# sample end user permission granted response - role with forest_client scope +sample_end_user_permission_granted_with_scope_details = FamUserRoleAssignmentCreateRes( + **{'status_code': HTTPStatus.OK, + 'detail': FamApplicationUserRoleAssignmentGetSchema( + user_role_xref_id=888, user_id=9, role_id=127, + user=FamUserInfoSchema(user_name='enduser', first_name='first', last_name='last', email='a@b.com', + user_type_relation=FamUserTypeSchema(user_type_code=UserType.BCEID, description='BCEID')), + role=FamRoleWithClientSchema(role_name='FOM_SUBMITTER_00001012', role_type_code='C', + application=FamApplicationSchema(application_id=2, application_name='FOM_DEV', application_description='Forest Operations Map (DEV)'), + role_id=127, display_name='Submitter', role_purpose='Provides the privilege to submit a FOM (on behalf of a specific forest client)', + client_number=FamForestClientSchema(client_name=None, forest_client_number="00001012", status=None), + parent_role=FamRoleMinSchema(role_name="FOM_SUBMITTER", role_type_code="A", + application=FamApplicationSchema(application_id=2, application_name='FOM_DEV', application_description='Forest Operations Map (DEV)')))), + 'error_message': None + } + ) + +sameple_user_role_with_no_client_revoked_record = FamUserRoleXref(**{ + "user_id": 111, "role_id": 999, + "user": FamUser(**{"user_id": 111}), + "role": FamRole(** {"display_name": "Reviewer", "application": FamApplication(** {"application_id": 2})}) +}) + +sameple_user_role_with_client_revoked_record = FamUserRoleXref(**{ + "user_id": 111, "role_id": 999, + "user": FamUser(**{"user_id": 111}), + "role": FamRole(**{"display_name": "Submitter", "role_name": "FOM_SUBMITTER_00001011", + "application": FamApplication(** {"application_id": FOM_DEV_APPLICATION_ID, }), + "client_number_id": 3, "client_number": FamForestClient(**{ + "forest_client_number": "00001011" + }) + }) +}) + +sameple_user_role_with_notfound_client_revoked_record = FamUserRoleXref(**{ + "user_id": 111, "role_id": 999, + "user": FamUser(**{"user_id": 111}), + "role": FamRole(**{"display_name": "Submitter", "role_name": "FOM_SUBMITTER_09090909", + "application": FamApplication(** {"application_id": FOM_DEV_APPLICATION_ID, }), + "client_number_id": 3, "client_number": FamForestClient(**{ + "forest_client_number": "09090909" + }) + }) +}) + diff --git a/server/backend/testspg/crud/validator/test_forest_client_validator.py b/server/backend/testspg/crud/validator/test_forest_client_validator.py index 4a0ea3fef..d04415844 100644 --- a/server/backend/testspg/crud/validator/test_forest_client_validator.py +++ b/server/backend/testspg/crud/validator/test_forest_client_validator.py @@ -29,16 +29,6 @@ ] -@pytest.fixture(scope="function", autouse=True) -def mock_forest_client(): - # Mocked dependency class object - with patch( - "api.app.integration.forest_client_integration.ForestClientIntegrationService", - autospec=True, - ) as m: - yield m.return_value # Very important to get instance of mocked class. - - @pytest.fixture(scope="function") def forest_client_integration_service(): return ForestClientIntegrationService() @@ -82,10 +72,10 @@ def __to_mock_forest_client_return(forest_client_number, api_client_status_code) def test_forest_client_number_exists( client_id_to_test, expcted_result, - mock_forest_client, + mock_forest_client_integration_service, forest_client_integration_service, ): - mock_forest_client.find_by_client_number.return_value = ( + mock_forest_client_integration_service.find_by_client_number.return_value = ( __to_mock_forest_client_return( client_id_to_test, expcted_result["api_client_status_code"], @@ -133,10 +123,10 @@ def test_forest_client_number_exists( def test_forest_client_active( client_id_to_test, expcted_result, - mock_forest_client, + mock_forest_client_integration_service, forest_client_integration_service, ): - mock_forest_client.find_by_client_number.return_value = ( + mock_forest_client_integration_service.find_by_client_number.return_value = ( __to_mock_forest_client_return( client_id_to_test, expcted_result["api_client_status_code"], @@ -169,10 +159,10 @@ def test_forest_client_active( def test_get_forest_client_status( client_id_to_test, expcted_result, - mock_forest_client, + mock_forest_client_integration_service, forest_client_integration_service, ): - mock_forest_client.find_by_client_number.return_value = ( + mock_forest_client_integration_service.find_by_client_number.return_value = ( __to_mock_forest_client_return( client_id_to_test, expcted_result["api_client_status_code"], diff --git a/server/backend/testspg/router/test_router_user_role_assignment.py b/server/backend/testspg/router/test_router_user_role_assignment.py index ddc825fbe..666810b71 100644 --- a/server/backend/testspg/router/test_router_user_role_assignment.py +++ b/server/backend/testspg/router/test_router_user_role_assignment.py @@ -9,8 +9,11 @@ ERROR_CODE_SELF_GRANT_PROHIBITED, ERROR_CODE_TERMS_CONDITIONS_REQUIRED, UserType) from api.app.crud import crud_application, crud_role, crud_user, crud_user_role +from api.app.crud.services.permission_audit_service import \ + PermissionAuditService from api.app.jwt_validation import ERROR_PERMISSION_REQUIRED from api.app.main import apiPrefix +from api.app.models.model import FamPrivilegeChangeAudit, FamUser from fastapi.testclient import TestClient from sqlalchemy.orm import Session from testspg.conftest import create_test_user_role_assignment @@ -63,11 +66,14 @@ def test_create_user_role_assignment_many_not_authorized( ) + def test_create_user_role_assignment_many_with_concrete_role_authorize_by_delegated_admin( + db_pg_session: Session, test_client_fixture: starlette.testclient.TestClient, test_rsa_key, override_get_verified_target_user, override_enforce_bceid_terms_conditions_guard, + mocker ): """ test if user is not app admin, but is delegated admin with the correct privilege, will be able to grant access @@ -84,6 +90,7 @@ def test_create_user_role_assignment_many_with_concrete_role_authorize_by_delega "business_guid": BUSINESS_GUID_BCEID_LOAD_3_TEST, } ) + store_user_permissions_granted_audit_history_fn_spy = mocker.spy(PermissionAuditService, 'store_user_permissions_granted_audit_history') # create a token for business bceid user COGNITO_USERNAME_BCEID with no app admin role, # this user has delegated admin privilege which is granted in the local sql @@ -106,6 +113,19 @@ def test_create_user_role_assignment_many_with_concrete_role_authorize_by_delega assert "role_id" in detail assert "application_id" in detail.get("role").get("application") + # verify audit record created (service being called only) + # for detail tests see test_permission_audit_service.py). + performer = db_pg_session.query(FamUser).filter( + FamUser.cognito_user_id == jwt_utils.COGNITO_USERNAME_BCEID_DELEGATED_ADMIN + ).one() + audit_record = db_pg_session.query(FamPrivilegeChangeAudit).filter( + FamPrivilegeChangeAudit.application_id == detail["role"]["application"]["application_id"], + FamPrivilegeChangeAudit.change_performer_user_id == performer.user_id, + FamPrivilegeChangeAudit.change_target_user_id == detail["user_id"] + ).one_or_none() + assert store_user_permissions_granted_audit_history_fn_spy.call_count == 1 + assert audit_record is not None + def test_create_user_role_assignment_many_with_abstract_role_authorize_by_delegated_admin( test_client_fixture: starlette.testclient.TestClient, @@ -868,6 +888,7 @@ def test_delete_user_role_assignment_with_forest_client_number( test_rsa_key, override_get_verified_target_user, override_enforce_bceid_terms_conditions_guard, + mocker ): """ test if user is not app admin, but is delegated admin with the correct privilege, will be able to remove access @@ -894,6 +915,8 @@ def test_delete_user_role_assignment_with_forest_client_number( ACCESS_GRANT_FOM_DEV_AR_00001018_BCEID_L3T, ) + store_user_permissions_revoked_audit_history_fn_spy = mocker.spy(PermissionAuditService, 'store_user_permissions_revoked_audit_history') + # create a token for business bceid user COGNITO_USERNAME_BCEID with no app admin role, # this user has delegated admin privilege which is granted in the local sql token = jwt_utils.create_jwt_token( @@ -906,6 +929,10 @@ def test_delete_user_role_assignment_with_forest_client_number( # delete is successful assert response.status_code == HTTPStatus.NO_CONTENT + # verify audit record action (service being called only) + # for detail tests see test_permission_audit_service.py). + assert store_user_permissions_revoked_audit_history_fn_spy.call_count == 1 + # create a user role assignment for a business bceid user within the same organization as the user COGNITO_USERNAME_BCEID # with abstract role FOM_SUBMITTER and forest client number 00001011 user_role_xref_id = create_test_user_role_assignment(