diff --git a/server/backend/api/app/constants.py b/server/backend/api/app/constants.py index 408e0eeb0..31be6c0eb 100644 --- a/server/backend/api/app/constants.py +++ b/server/backend/api/app/constants.py @@ -39,7 +39,7 @@ class AwsTargetEnv(str, Enum): # Internal defined enum client status constants for FAM 'router_forest_client'. # ACTIVE/INACTIVE are mapped from Forest Client API spce. -# See schemas.py/FamForestClientStatus class. +# See schemas/fam_forest_client_status.py class. class FamForestClientStatusType(str, Enum): ACTIVE = "A" INACTIVE = "I" diff --git a/server/backend/api/app/crud/crud_application.py b/server/backend/api/app/crud/crud_application.py index b2030c860..d5578052c 100644 --- a/server/backend/api/app/crud/crud_application.py +++ b/server/backend/api/app/crud/crud_application.py @@ -1,7 +1,7 @@ import logging from typing import List -from api.app import schemas +from api.app.schemas import RequesterSchema, FamApplicationUserRoleAssignmentGetSchema from api.app.constants import UserType from api.app.models import model as models from sqlalchemy import func, select @@ -23,8 +23,8 @@ def get_application(db: Session, application_id: int): def get_application_role_assignments( - db: Session, application_id: int, requester: schemas.Requester -) -> List[schemas.FamApplicationUserRoleAssignmentGet]: + db: Session, application_id: int, requester: RequesterSchema +) -> List[FamApplicationUserRoleAssignmentGetSchema]: """query the user / role cross reference table to retrieve the role assignments. Delegated Admin will only see user role assignments by the roles granted for them. diff --git a/server/backend/api/app/crud/crud_forest_client.py b/server/backend/api/app/crud/crud_forest_client.py index 87fdc4c54..34de25bc8 100644 --- a/server/backend/api/app/crud/crud_forest_client.py +++ b/server/backend/api/app/crud/crud_forest_client.py @@ -3,7 +3,7 @@ from api.app.models import model as models from sqlalchemy.orm import Session -from .. import schemas +from api.app.schemas import FamForestClientCreateSchema LOGGER = logging.getLogger(__name__) @@ -22,7 +22,7 @@ def get_forest_client(db: Session, forest_client_number: str) -> models.FamFores return fam_forest_client -def create_forest_client(fam_forest_client: schemas.FamForestClientCreate, db: Session): +def create_forest_client(fam_forest_client: FamForestClientCreateSchema, db: Session): LOGGER.debug(f"Creating Fam_Forest_Client with: {fam_forest_client}") fam_forest_client_dict = fam_forest_client.model_dump() @@ -45,7 +45,7 @@ def find_or_create(db: Session, forest_client_number: str, requester: str): "does not exist, add a new Forest Client." ) - request_forest_client = schemas.FamForestClientCreate( + request_forest_client = FamForestClientCreateSchema( **{ "forest_client_number": forest_client_number, "create_user": requester, diff --git a/server/backend/api/app/crud/crud_role.py b/server/backend/api/app/crud/crud_role.py index 15aecd859..b488e086b 100644 --- a/server/backend/api/app/crud/crud_role.py +++ b/server/backend/api/app/crud/crud_role.py @@ -1,10 +1,11 @@ import logging from typing import Optional +from sqlalchemy.orm import Session from api.app.models import model as models +from api.app.schemas import FamForestClientCreateSchema, FamRoleCreateSchema from sqlalchemy.orm import Session -from .. import schemas from . import crud_forest_client @@ -18,7 +19,7 @@ def get_role(db: Session, role_id: int) -> Optional[models.FamRole]: ) -def create_role(role: schemas.FamRoleCreate, db: Session) -> models.FamRole: +def create_role(role: FamRoleCreateSchema, db: Session) -> models.FamRole: LOGGER.debug(f"Creating Fam role: {role}") fam_role_dict = role.model_dump() @@ -45,7 +46,7 @@ def create_role(role: schemas.FamRoleCreate, db: Session) -> models.FamRole: + f" 327 / {forest_client_number}", "create_user": fam_role_model.create_user, } - fc_pydantic = schemas.FamForestClientCreate(**fc_dict) + fc_pydantic = FamForestClientCreateSchema(**fc_dict) forest_client_model = crud_forest_client.create_forest_client( db=db, fam_forest_client=fc_pydantic ) @@ -64,19 +65,19 @@ def create_role(role: schemas.FamRoleCreate, db: Session) -> models.FamRole: def get_role_by_role_name_and_app_id( - db: Session, - role_name: str, - application_id: int + db: Session, role_name: str, application_id: int ) -> Optional[models.FamRole]: """ Gets FAM role based on role_name and application_id. """ - LOGGER.debug(f"Getting FamRole by role_name: {role_name} and application_di: {application_id}") + LOGGER.debug( + f"Getting FamRole by role_name: {role_name} and application_di: {application_id}" + ) return ( db.query(models.FamRole) .filter( models.FamRole.role_name == role_name, - models.FamRole.application_id == application_id + models.FamRole.application_id == application_id, ) .one_or_none() ) diff --git a/server/backend/api/app/crud/crud_user.py b/server/backend/api/app/crud/crud_user.py index 386258467..9dc5bc7f3 100644 --- a/server/backend/api/app/crud/crud_user.py +++ b/server/backend/api/app/crud/crud_user.py @@ -7,8 +7,13 @@ from api.app.crud import crud_utils from api.app.integration.idim_proxy import IdimProxyService from api.config import config - -from .. import schemas +from api.app.schemas import ( + FamUserSchema, + TargetUserSchema, + FamUserUpdateResponseSchema, + IdimProxySearchParamSchema, + IdimProxyBceidSearchParamSchema, +) LOGGER = logging.getLogger(__name__) @@ -74,11 +79,11 @@ def get_user_by_domain_and_guid( ) -def create_user(fam_user: schemas.FamUser, db: Session): +def create_user(fam_user: FamUserSchema, db: Session): """used to add a new FAM user to the database :param fam_user: _description_ - :type fam_user: schemas.FamUser + :type fam_user: FamUserSchema :param db: _description_ :type db: Session :return: _description_ @@ -124,7 +129,7 @@ def find_or_create( # or found user by domain and username, but user guid does not match (this is the edge case that could happen when username changed from IDP provider) # create a new user else: - request_user = schemas.FamUser( + request_user = FamUserSchema( **{ "user_type_code": user_type_code, "user_name": user_name, @@ -191,7 +196,7 @@ def update_user_name( def update_user_properties_from_verified_target_user( db: Session, user_id: int, - target_user: schemas.TargetUser, + target_user: TargetUserSchema, requester: str, # cognito_user_id ): """ @@ -257,7 +262,7 @@ def fetch_initial_requester_info(db: Session, cognito_user_id: str): def update_user_info_from_idim_source( db: Session, use_pagination: bool, page: int, per_page: int -) -> schemas.FamUserUpdateResponse: +) -> FamUserUpdateResponseSchema: """ Go through each user record in the database, update the user information to match the record in IDIM web service, @@ -311,7 +316,7 @@ def update_user_info_from_idim_source( if user.user_type_code == UserType.IDIR: # IDIM web service doesn't support search IDIR by user_guid, so we search by userID search_result = idim_proxy_service.search_idir( - schemas.IdimProxySearchParam(**{"userId": user.user_name}) + IdimProxySearchParamSchema(**{"userId": user.user_name}) ) if not user.user_guid: @@ -338,7 +343,7 @@ def update_user_info_from_idim_source( if user.user_guid: # if found business bceid user by user_guid, update username if necessary search_result = idim_proxy_service.search_business_bceid( - schemas.IdimProxyBceidSearchParam( + IdimProxyBceidSearchParamSchema( **{ "searchUserBy": IdimSearchUserParamType.USER_GUID, "searchValue": user.user_guid, @@ -352,7 +357,7 @@ def update_user_info_from_idim_source( else: # if user has no user_guid in our database, find by user_name and add user_guid to database search_result = idim_proxy_service.search_business_bceid( - schemas.IdimProxyBceidSearchParam( + IdimProxyBceidSearchParamSchema( **{ "searchUserBy": IdimSearchUserParamType.USER_ID, "searchValue": user.user_name, @@ -395,7 +400,7 @@ def update_user_info_from_idim_source( LOGGER.debug(f"Failed to update user info: {e}") failed_user_list.append(user.user_id) - return schemas.FamUserUpdateResponse( + return FamUserUpdateResponseSchema( **{ "total_db_users_count": total_db_users_count, "current_page": page, diff --git a/server/backend/api/app/crud/crud_user_role.py b/server/backend/api/app/crud/crud_user_role.py index 0258b7edf..2e709a566 100644 --- a/server/backend/api/app/crud/crud_user_role.py +++ b/server/backend/api/app/crud/crud_user_role.py @@ -3,7 +3,14 @@ from typing import List from api.app import constants as famConstants -from api.app import schemas +from api.app.schemas import ( + FamUserRoleAssignmentCreateSchema, + TargetUserSchema, + FamUserRoleAssignmentCreateResponseSchema, + FamApplicationUserRoleAssignmentGetSchema, + GCNotifyGrantAccessEmailParamSchema, + FamRoleCreateSchema +) from api.app.crud import crud_forest_client, crud_role, crud_user, crud_utils from api.app.crud.validator.forest_client_validator import ( forest_client_active, forest_client_number_exists, @@ -12,7 +19,6 @@ from api.app.integration.gc_notify import GCNotifyEmailService from api.app.models import model as models from api.app.utils.utils import raise_http_exception -from requests import HTTPError from sqlalchemy.orm import Session LOGGER = logging.getLogger(__name__) @@ -20,10 +26,10 @@ def create_user_role_assignment_many( db: Session, - request: schemas.FamUserRoleAssignmentCreate, - target_user: schemas.TargetUser, + request: FamUserRoleAssignmentCreateSchema, + target_user: TargetUserSchema, requester: str, -) -> List[schemas.FamUserRoleAssignmentCreateResponse]: +) -> List[FamUserRoleAssignmentCreateResponseSchema]: """ Create fam_user_role_xref Association @@ -74,7 +80,7 @@ def create_user_role_assignment_many( fam_role.role_type_code == famConstants.RoleType.ROLE_TYPE_ABSTRACT ) - create_return_list: List[schemas.FamUserRoleAssignmentCreateResponse] = [] + create_return_list: List[FamUserRoleAssignmentCreateResponseSchema] = [] if require_child_role: LOGGER.debug( @@ -155,27 +161,25 @@ def create_user_role_assignment( ) if fam_user_role_xref: - error_msg = ( - f"Role {fam_user_role_xref.role.role_name} already assigned to user {fam_user_role_xref.user.user_name}." - ) - create_user_role_assginment_return = ( - schemas.FamUserRoleAssignmentCreateResponse( - **{ - "status_code": HTTPStatus.CONFLICT, - "detail": schemas.FamApplicationUserRoleAssignmentGet(**fam_user_role_xref.__dict__), - "error_message": error_msg, - } - ) + error_msg = f"Role {fam_user_role_xref.role.role_name} already assigned to user {fam_user_role_xref.user.user_name}." + create_user_role_assginment_return = FamUserRoleAssignmentCreateResponseSchema( + **{ + "status_code": HTTPStatus.CONFLICT, + "detail": FamApplicationUserRoleAssignmentGetSchema( + **fam_user_role_xref.__dict__ + ), + "error_message": error_msg, + } ) else: fam_user_role_xref = create(db, user.user_id, role.role_id, requester) - create_user_role_assginment_return = ( - schemas.FamUserRoleAssignmentCreateResponse( - **{ - "status_code": HTTPStatus.OK, - "detail": schemas.FamApplicationUserRoleAssignmentGet(**fam_user_role_xref.__dict__), - } - ) + create_user_role_assginment_return = FamUserRoleAssignmentCreateResponseSchema( + **{ + "status_code": HTTPStatus.OK, + "detail": FamApplicationUserRoleAssignmentGetSchema( + **fam_user_role_xref.__dict__ + ), + } ) return create_user_role_assginment_return @@ -267,7 +271,7 @@ def find_or_create_forest_client_child_role( if not child_role: child_role = crud_role.create_role( - schemas.FamRoleCreate( + FamRoleCreateSchema( **{ "parent_role_id": parent_role.role_id, "application_id": parent_role.application_id, @@ -301,8 +305,9 @@ def find_by_id(db: Session, user_role_xref_id: int) -> models.FamUserRoleXref: def send_user_access_granted_email( - target_user: schemas.TargetUser, - roles_assignment_responses: List[schemas.FamUserRoleAssignmentCreateResponse]): + target_user: TargetUserSchema, + roles_assignment_responses: List[FamUserRoleAssignmentCreateResponseSchema], +): """ Send email using GC Notify integration service. TODO: Erro handling when sending email encountered technical errors (400/500). Ticket #1471. @@ -325,15 +330,19 @@ def send_user_access_granted_email( ) with_client_number = "yes" if roles_assignment_responses[0].detail.role.client_number is not None else "no" email_service = GCNotifyEmailService() - email_params = schemas.GCNotifyGrantAccessEmailParam(**{ - "first_name": target_user.first_name, - "last_name": target_user.last_name, - "application_name": roles_assignment_responses[0].detail.role.application.application_description, - "role_list_string": granted_roles, - "application_team_contact_email": None, # TODO: ticket #1507 to implement this. - "send_to_email": target_user.email, - "with_client_number": with_client_number - }) + email_params = GCNotifyGrantAccessEmailParamSchema( + **{ + "first_name": target_user.first_name, + "last_name": target_user.last_name, + "application_name": roles_assignment_responses[ + 0 + ].detail.role.application.application_description, + "role_list_string": granted_roles, + "application_team_contact_email": None, # TODO: ticket #1507 to implement this. + "send_to_email": target_user.email, + "with_client_number": with_client_number, + } + ) if granted_roles == "": # no role is granted return diff --git a/server/backend/api/app/crud/validator/forest_client_validator.py b/server/backend/api/app/crud/validator/forest_client_validator.py index a48c1d554..3b00dd387 100644 --- a/server/backend/api/app/crud/validator/forest_client_validator.py +++ b/server/backend/api/app/crud/validator/forest_client_validator.py @@ -2,21 +2,21 @@ from typing import List, Union from api.app.constants import FOREST_CLIENT_STATUS -from api.app.schemas import ForestClientIntegrationFindResponse +from api.app.schemas import ForestClientIntegrationFindResponseSchema LOGGER = logging.getLogger(__name__) def forest_client_number_exists( - forest_client_find_result: List[ForestClientIntegrationFindResponse], + forest_client_find_result: List[ForestClientIntegrationFindResponseSchema], ) -> bool: # Exact client number search - should only contain 1 result. return len(forest_client_find_result) == 1 def forest_client_active( - forest_client_find_result: List[ForestClientIntegrationFindResponse], + forest_client_find_result: List[ForestClientIntegrationFindResponseSchema], ) -> bool: return ( ( @@ -29,7 +29,7 @@ def forest_client_active( def get_forest_client_status( - forest_client_find_result: List[ForestClientIntegrationFindResponse], + forest_client_find_result: List[ForestClientIntegrationFindResponseSchema], ) -> Union[str, None]: return ( forest_client_find_result[0][FOREST_CLIENT_STATUS["KEY"]] diff --git a/server/backend/api/app/crud/validator/target_user_validator.py b/server/backend/api/app/crud/validator/target_user_validator.py index 132ee5748..0e6e5991d 100644 --- a/server/backend/api/app/crud/validator/target_user_validator.py +++ b/server/backend/api/app/crud/validator/target_user_validator.py @@ -1,11 +1,19 @@ import copy import logging -from api.app.constants import (ERROR_CODE_INVALID_REQUEST_PARAMETER, ApiInstanceEnv, - IdimSearchUserParamType, UserType) +from api.app.constants import ( + ERROR_CODE_INVALID_REQUEST_PARAMETER, + ApiInstanceEnv, + IdimSearchUserParamType, + UserType, +) from api.app.integration.idim_proxy import IdimProxyService -from api.app.schemas import (IdimProxyBceidSearchParam, IdimProxySearchParam, - Requester, TargetUser) +from api.app.schemas import ( + IdimProxyBceidSearchParamSchema, + IdimProxySearchParamSchema, + RequesterSchema, + TargetUserSchema, +) from api.app.utils import utils LOGGER = logging.getLogger(__name__) @@ -14,20 +22,22 @@ class TargetUserValidator: def __init__( self, - requester: Requester, - target_user: TargetUser, + requester: RequesterSchema, + target_user: TargetUserSchema, api_instance_env: ApiInstanceEnv, ): LOGGER.debug(f"Validating target env set to: {api_instance_env}") self.verified_target_user = copy.deepcopy(target_user) self.idim_proxy_service = IdimProxyService(requester, api_instance_env) - def verify_user_exist(self) -> TargetUser: + def verify_user_exist(self) -> TargetUserSchema: search_result = None if self.verified_target_user.user_type_code == UserType.IDIR: # IDIM web service doesn't support search IDIR by user_guid, so we search by userID search_result = self.idim_proxy_service.search_idir( - IdimProxySearchParam(**{"userId": self.verified_target_user.user_name}) + IdimProxySearchParamSchema( + **{"userId": self.verified_target_user.user_name} + ) ) # in edge case, the return guid from search doesn't match the guid given from request parameter @@ -48,7 +58,7 @@ def verify_user_exist(self) -> TargetUser: elif self.verified_target_user.user_type_code == UserType.BCEID: search_result = self.idim_proxy_service.search_business_bceid( - IdimProxyBceidSearchParam( + IdimProxyBceidSearchParamSchema( **{ "searchUserBy": IdimSearchUserParamType.USER_GUID, "searchValue": self.verified_target_user.user_guid, diff --git a/server/backend/api/app/integration/forest_client/forest_client.py b/server/backend/api/app/integration/forest_client/forest_client.py index 5e7993aff..d9e615f6d 100644 --- a/server/backend/api/app/integration/forest_client/forest_client.py +++ b/server/backend/api/app/integration/forest_client/forest_client.py @@ -58,7 +58,7 @@ def find_by_client_number(self, p_client_number: str): try: r = self.session.get(url, timeout=self.TIMEOUT) r.raise_for_status() - # !! Don't map and return schema.FamForestClient or object from "scheam.py" as that + # !! Don't map and return FamForestClientSchema or object from "scheam.py" as that # will create circular dependency issue. let crud to map the result. api_result = r.json() LOGGER.debug(f"API result: {api_result}") @@ -81,4 +81,4 @@ def find_by_client_number(self, p_client_number: str): # Else raise error, including 500 # There is a general error handler, see: requests_http_error_handler - raise he \ No newline at end of file + raise he diff --git a/server/backend/api/app/integration/gc_notify.py b/server/backend/api/app/integration/gc_notify.py index a14189df6..fe3d25cec 100644 --- a/server/backend/api/app/integration/gc_notify.py +++ b/server/backend/api/app/integration/gc_notify.py @@ -1,7 +1,7 @@ import logging import requests -from api.app.schemas import GCNotifyGrantAccessEmailParam +from api.app.schemas import GCNotifyGrantAccessEmailParamSchema from api.config import config LOGGER = logging.getLogger(__name__) @@ -32,7 +32,9 @@ def __init__(self): self.session = requests.Session() self.session.headers.update(self.headers) - def send_user_access_granted_email(self, params: GCNotifyGrantAccessEmailParam): + def send_user_access_granted_email( + self, params: GCNotifyGrantAccessEmailParamSchema + ): """ Send grant access email """ @@ -47,10 +49,7 @@ def send_user_access_granted_email(self, params: GCNotifyGrantAccessEmailParam): email_params = { "email_address": params.send_to_email, "template_id": self.grant_access_email_template_id, - "personalisation": { - **params.__dict__, - "contact_message": contact_message - }, + "personalisation": {**params.__dict__, "contact_message": contact_message}, } LOGGER.debug(f"Sending user access granted email with param {email_params}") gc_notify_email_send_url = f"{self.email_base_url}/v2/notifications/email" diff --git a/server/backend/api/app/integration/idim_proxy.py b/server/backend/api/app/integration/idim_proxy.py index 57d5e4edf..7ae89cf4e 100644 --- a/server/backend/api/app/integration/idim_proxy.py +++ b/server/backend/api/app/integration/idim_proxy.py @@ -4,8 +4,11 @@ import requests from api.app.constants import IDIM_PROXY_ACCOUNT_TYPE_MAP, ApiInstanceEnv, UserType from api.app.jwt_validation import ERROR_PERMISSION_REQUIRED -from api.app.schemas import (IdimProxyBceidSearchParam, IdimProxySearchParam, - Requester) +from api.app.schemas import ( + IdimProxyBceidSearchParamSchema, + IdimProxySearchParamSchema, + RequesterSchema, +) from api.config import config from fastapi import HTTPException @@ -29,7 +32,11 @@ class IdimProxyService: TIMEOUT = (5, 10) # Timeout (connect, read) in seconds. - def __init__(self, requester: Requester, api_instance_env: ApiInstanceEnv = ApiInstanceEnv.TEST): + def __init__( + self, + requester: RequesterSchema, + api_instance_env: ApiInstanceEnv = ApiInstanceEnv.TEST, + ): self.requester = requester # by default use test idim proxy url if not specify the api instance enviornment self.api_idim_proxy_url = ( @@ -41,7 +48,7 @@ def __init__(self, requester: Requester, api_instance_env: ApiInstanceEnv = ApiI self.session = requests.Session() self.session.headers.update(self.headers) - def search_idir(self, search_params: IdimProxySearchParam): + def search_idir(self, search_params: IdimProxySearchParamSchema): """ Search on IDIR user. Note, current idim-proxy only does exact match. @@ -65,12 +72,12 @@ def search_idir(self, search_params: IdimProxySearchParam): LOGGER.debug(f"API result: {api_result}") return api_result - def search_business_bceid(self, search_params: IdimProxyBceidSearchParam): + def search_business_bceid(self, search_params: IdimProxyBceidSearchParamSchema): """ Search on Business BCEID user. This search can be perfomed by IDIR requester or BCeID requester by passing "user_guid" to "requesterUserGuid". - search_param: is of type "IdimProxyBceidSearchParam" and can be 'searchUserBy' + search_param: is of type "IdimProxyBceidSearchParamSchema" and can be 'searchUserBy' - "userId" or - "userGuid" (preferred) """ diff --git a/server/backend/api/app/routers/router_application.py b/server/backend/api/app/routers/router_application.py index 034481bf9..ebc4b778f 100644 --- a/server/backend/api/app/routers/router_application.py +++ b/server/backend/api/app/routers/router_application.py @@ -1,12 +1,14 @@ import logging from typing import List -from api.app import database, schemas +from api.app import database from api.app.crud import crud_application from api.app.routers.router_guards import ( - authorize_by_app_id, enforce_bceid_terms_conditions_guard, - get_current_requester) -from api.app.schemas import Requester + authorize_by_app_id, + enforce_bceid_terms_conditions_guard, + get_current_requester, +) +from api.app.schemas import RequesterSchema, FamApplicationUserRoleAssignmentGetSchema from fastapi import APIRouter, Depends from sqlalchemy.orm import Session @@ -16,7 +18,7 @@ @router.get( "/{application_id}/user_role_assignment", - response_model=List[schemas.FamApplicationUserRoleAssignmentGet], + response_model=List[FamApplicationUserRoleAssignmentGetSchema], status_code=200, dependencies=[ Depends(authorize_by_app_id), # Enforce application-level security @@ -26,7 +28,7 @@ def get_fam_application_user_role_assignment( application_id: int, db: Session = Depends(database.get_db), - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): """ gets the roles assignment associated with an application diff --git a/server/backend/api/app/routers/router_forest_client.py b/server/backend/api/app/routers/router_forest_client.py index edffcc278..af6d875bd 100644 --- a/server/backend/api/app/routers/router_forest_client.py +++ b/server/backend/api/app/routers/router_forest_client.py @@ -6,17 +6,17 @@ from api.app.routers.router_utils import get_api_instance_env from fastapi import APIRouter, Depends, Query -from .. import schemas +from api.app.schemas import FamForestClientSchema LOGGER = logging.getLogger(__name__) router = APIRouter() -@router.get("/search", response_model=List[schemas.FamForestClient]) +@router.get("/search", response_model=List[FamForestClientSchema]) def search( client_number: str = Query(min_length=3, max_length=8), - api_instance_env=Depends(get_api_instance_env) + api_instance_env=Depends(get_api_instance_env), ): """ Forest Client(s) search (by defined query parameter(s)). @@ -34,13 +34,12 @@ def search( return forest_clients -def __map_api_results(item) -> schemas.FamForestClient: +def __map_api_results(item) -> FamForestClientSchema: """ - Private method to map api result to schemas.FamForestClient + Private method to map api result to FamForestClientSchema """ parsed = json.loads( json.dumps(item), # need json string format, so dumps from 'dic' type 'item'. - object_hook=schemas.FamForestClient.from_api_json + object_hook=FamForestClientSchema.from_api_json, ) return parsed - diff --git a/server/backend/api/app/routers/router_guards.py b/server/backend/api/app/routers/router_guards.py index 25ee39cb3..fdc642b48 100644 --- a/server/backend/api/app/routers/router_guards.py +++ b/server/backend/api/app/routers/router_guards.py @@ -34,7 +34,7 @@ validate_token, ) from api.app.models.model import FamRole, FamUser -from api.app.schemas import Requester, TargetUser +from api.app.schemas import RequesterSchema, TargetUserSchema from api.app.utils import utils from fastapi import Depends, Request, Security from fastapi.security import APIKeyHeader @@ -55,9 +55,9 @@ def get_current_requester( request_cognito_user_id: str = Depends(get_request_cognito_user_id), access_roles: List[str] = Depends(get_access_roles), db: Session = Depends(database.get_db), -) -> Requester: +) -> RequesterSchema: LOGGER.debug( - f"Retrieving current requester from: request_cognito_user_id: {request_cognito_user_id}" + f"Retrieving current Requester from: request_cognito_user_id: {request_cognito_user_id}" ) fam_user: FamUser = crud_user.fetch_initial_requester_info( db, request_cognito_user_id @@ -73,14 +73,14 @@ def get_current_requester( else: custom_fields = _parse_custom_requester_fields(fam_user) - requester = Requester.model_validate( + requester = RequesterSchema.model_validate( { **fam_user.__dict__, # base db 'user' info "access_roles": access_roles, # role from JWT **custom_fields, # build/convert to custom attributes } ) - LOGGER.debug(f"Current request user (requester): {requester}") + LOGGER.debug(f"Current request user (Requester): {requester}") return requester @@ -112,7 +112,7 @@ def _parse_custom_requester_fields(fam_user: FamUser): def authorize( claims: dict = Depends(validate_token), - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): """ This authorize method is used by Forest Client API and IDIM Proxy API integration for a general authorization check, @@ -135,7 +135,7 @@ def authorize_by_app_id( application_id: int, db: Session = Depends(database.get_db), access_roles=Depends(get_access_roles), - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): """ This authorize_by_app_id method is used for the authorization check of a specific application, @@ -199,7 +199,7 @@ def authorize_by_application_role( role: FamRole = Depends(get_request_role_from_id), db: Session = Depends(database.get_db), access_roles=Depends(get_access_roles), - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): """ This authorize_by_application_role method is used for the authorization check of a specific application, @@ -222,7 +222,7 @@ async def authorize_by_privilege( role: FamRole = Depends(get_request_role_from_id), db: Session = Depends(database.get_db), access_roles=Depends(get_access_roles), - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): """ This authorize_by_privilege method is used for checking if the requester has the privilege to grant/remove access of the role. @@ -289,11 +289,11 @@ async def authorize_by_privilege( # Very likely in future might have "cognito_user_id" case. async def get_target_user_from_id( request: Request, db: Session = Depends(database.get_db) -) -> TargetUser: +) -> TargetUserSchema: """ This is used as FastAPI sub-dependency to find target_user for guard purpose. - Please note that the TargetUser inputs hasn't been validated yet. Need to call get_verified_target_user to validate the TargetUser. - For requester, use "get_current_requester()" above. + Please note that the TargetUserSchema inputs hasn't been validated yet. Need to call get_verified_target_user to validate the TargetUserSchema. + For RequesterSchema, use "get_current_requester()" above. """ # from path_param - "user_role_xref_id"; should exists already in db. if "user_role_xref_id" in request.path_params: @@ -304,7 +304,7 @@ async def get_target_user_from_id( ) user_role = crud_user_role.find_by_id(db, urxid) if user_role is not None: - found_target_user = TargetUser.model_validate(user_role.user) + found_target_user = TargetUserSchema.model_validate(user_role.user) return found_target_user else: error_msg = "Parameter 'user_role_xref_id' is missing or invalid." @@ -319,7 +319,7 @@ async def get_target_user_from_id( "Dependency 'get_target_user_from_id' called with " + f"request body {rbody}." ) - target_new_user = TargetUser.model_validate( + target_new_user = TargetUserSchema.model_validate( { "user_name": rbody["user_name"], "user_type_code": rbody["user_type_code"], @@ -330,8 +330,8 @@ async def get_target_user_from_id( async def authorize_by_user_type( - requester: Requester = Depends(get_current_requester), - target_user: TargetUser = Depends(get_target_user_from_id), + requester: RequesterSchema = Depends(get_current_requester), + target_user: TargetUserSchema = Depends(get_target_user_from_id), ): """ This authorize_by_user_type method is used to forbidden business bceid user manage idir user's access @@ -355,7 +355,9 @@ async def authorize_by_user_type( ) -async def internal_only_action(requester: Requester = Depends(get_current_requester)): +async def internal_only_action( + requester: RequesterSchema = Depends(get_current_requester), +): if requester.user_type_code is not UserType.IDIR: utils.raise_http_exception( status_code=HTTPStatus.FORBIDDEN, @@ -365,7 +367,7 @@ async def internal_only_action(requester: Requester = Depends(get_current_reques def external_delegated_admin_only_action( - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): if not requester.is_external_delegated_admin(): utils.raise_http_exception( @@ -376,11 +378,11 @@ def external_delegated_admin_only_action( async def enforce_self_grant_guard( - requester: Requester = Depends(get_current_requester), - target_user: TargetUser = Depends(get_target_user_from_id), + requester: RequesterSchema = Depends(get_current_requester), + target_user: TargetUserSchema = Depends(get_target_user_from_id), ): """ - Verify logged on admin (requester): + Verify logged on admin (RequesterSchema): Self granting/removing privilege currently isn't allowed. """ LOGGER.debug(f"enforce_self_grant_guard: requester - {requester}") @@ -402,10 +404,10 @@ async def enforce_self_grant_guard( async def get_verified_target_user( - requester: Requester = Depends(get_current_requester), - target_user: TargetUser = Depends(get_target_user_from_id), + requester: RequesterSchema = Depends(get_current_requester), + target_user: TargetUserSchema = Depends(get_target_user_from_id), role: FamRole = Depends(get_request_role_from_id), -) -> TargetUser: +) -> TargetUserSchema: """ Validate the target user by calling IDIM web service, and update business Guid for the found BCeID user """ @@ -420,8 +422,8 @@ async def get_verified_target_user( async def enforce_bceid_by_same_org_guard( # forbid business bceid user (requester) manage idir user's access _enforce_user_type_auth: None = Depends(authorize_by_user_type), - requester: Requester = Depends(get_current_requester), - target_user: TargetUser = Depends(get_verified_target_user), + requester: RequesterSchema = Depends(get_current_requester), + target_user: TargetUserSchema = Depends(get_verified_target_user), ): """ When requester is a BCeID user, enforce requester can only manage target @@ -440,7 +442,7 @@ async def enforce_bceid_by_same_org_guard( target_user_business_guid = target_user.business_guid if requester_business_guid is None or target_user_business_guid is None: - error_msg = "Operation encountered unexpected error. Requester or target user business GUID is missing." + error_msg = "Operation encountered unexpected error. requester or target user business GUID is missing." utils.raise_http_exception( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, error_code=ERROR_CODE_MISSING_KEY_ATTRIBUTE, @@ -456,7 +458,7 @@ async def enforce_bceid_by_same_org_guard( def enforce_bceid_terms_conditions_guard( - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): if requester.requires_accept_tc: utils.raise_http_exception( diff --git a/server/backend/api/app/routers/router_idim_proxy.py b/server/backend/api/app/routers/router_idim_proxy.py index 62d8211b9..03e97a78d 100644 --- a/server/backend/api/app/routers/router_idim_proxy.py +++ b/server/backend/api/app/routers/router_idim_proxy.py @@ -5,10 +5,10 @@ from api.app.routers.router_guards import get_current_requester, internal_only_action from api.app.routers.router_utils import get_api_instance_env from api.app.schemas import ( - IdimProxyBceidInfo, - IdimProxyBceidSearchParam, - IdimProxyIdirInfo, - IdimProxySearchParam, + IdimProxyBceidInfoSchema, + IdimProxyBceidSearchParamSchema, + IdimProxyIdirInfoSchema, + IdimProxySearchParamSchema, ) from fastapi import APIRouter, Depends, Query @@ -19,7 +19,7 @@ @router.get( "/idir", - response_model=IdimProxyIdirInfo, + response_model=IdimProxyIdirInfoSchema, dependencies=[Depends(internal_only_action)], ) def idir_search( @@ -29,27 +29,27 @@ def idir_search( # known issue: https://github.com/tiangolo/fastapi/issues/4974 # Fallback to use Query only. requester=Depends(get_current_requester), - api_instance_env: ApiInstanceEnv = Depends(get_api_instance_env) + api_instance_env: ApiInstanceEnv = Depends(get_api_instance_env), ): LOGGER.debug(f"Searching IDIR user with parameter user_id: {user_id}") idim_proxy_api = IdimProxyService(requester, api_instance_env) search_result = idim_proxy_api.search_idir( - IdimProxySearchParam(**{"userId": user_id}) + IdimProxySearchParamSchema(**{"userId": user_id}) ) return search_result # TODO later change this to "/business_bceid" -@router.get("/bceid", response_model=IdimProxyBceidInfo) +@router.get("/bceid", response_model=IdimProxyBceidInfoSchema) def bceid_search( user_id: str = Query(max_length=20), requester=Depends(get_current_requester), - api_instance_env: ApiInstanceEnv = Depends(get_api_instance_env) + api_instance_env: ApiInstanceEnv = Depends(get_api_instance_env), ): LOGGER.debug(f"Searching BCEID user with parameter user_id: {user_id}") idim_proxy_api = IdimProxyService(requester, api_instance_env) search_result = idim_proxy_api.search_business_bceid( - IdimProxyBceidSearchParam( + IdimProxyBceidSearchParamSchema( **{"searchUserBy": IdimSearchUserParamType.USER_ID, "searchValue": user_id} ) ) diff --git a/server/backend/api/app/routers/router_user.py b/server/backend/api/app/routers/router_user.py index c58e9843a..1fa6cd8ad 100644 --- a/server/backend/api/app/routers/router_user.py +++ b/server/backend/api/app/routers/router_user.py @@ -5,13 +5,17 @@ from api.app.crud import crud_user from fastapi import APIRouter, Depends from sqlalchemy.orm import Session -from api.app.schemas import FamUserUpdateResponse +from api.app.schemas import FamUserUpdateResponseSchema LOGGER = logging.getLogger(__name__) router = APIRouter() -@router.put("/users-information", status_code=HTTPStatus.OK, response_model=FamUserUpdateResponse) +@router.put( + "/users-information", + status_code=HTTPStatus.OK, + response_model=FamUserUpdateResponseSchema, +) def update_user_information_from_idim_source( page: int = 1, per_page: int = 100, diff --git a/server/backend/api/app/routers/router_user_role_assignment.py b/server/backend/api/app/routers/router_user_role_assignment.py index df976ab4c..6d3f2f790 100644 --- a/server/backend/api/app/routers/router_user_role_assignment.py +++ b/server/backend/api/app/routers/router_user_role_assignment.py @@ -4,15 +4,22 @@ from api.app.crud import crud_role, crud_user, crud_user_role from api.app.models.model import FamUser from api.app.routers.router_guards import ( - authorize_by_application_role, authorize_by_privilege, - authorize_by_user_type, enforce_bceid_by_same_org_guard, - enforce_bceid_terms_conditions_guard, enforce_self_grant_guard, - get_current_requester, get_verified_target_user) -from api.app.schemas import (FamUserRoleAssignmentCreate, - FamUserRoleAssignmentResponse, Requester, - TargetUser) -from api.app.utils.audit_util import (AuditEventLog, AuditEventOutcome, - AuditEventType) + authorize_by_application_role, + authorize_by_privilege, + authorize_by_user_type, + enforce_bceid_by_same_org_guard, + enforce_bceid_terms_conditions_guard, + enforce_self_grant_guard, + get_current_requester, + get_verified_target_user, +) +from api.app.schemas import ( + FamUserRoleAssignmentCreateSchema, + FamUserRoleAssignmentResponseSchema, + RequesterSchema, + TargetUserSchema, +) +from api.app.utils.audit_util import AuditEventLog, AuditEventOutcome, AuditEventType from fastapi import APIRouter, Depends, Request, Response from sqlalchemy.orm import Session @@ -25,17 +32,17 @@ @router.post( "", - response_model=FamUserRoleAssignmentResponse, + response_model=FamUserRoleAssignmentResponseSchema, # Guarding endpoint with Depends(). dependencies=[ Depends(enforce_self_grant_guard), Depends(enforce_bceid_terms_conditions_guard), Depends( authorize_by_application_role - ), # requester needs to be app admin or delegated admin + ), # Requester needs to be app admin or delegated admin Depends( authorize_by_privilege - ), # if requester is delegated admin, needs to have privilge to grant access with the request role + ), # if Requester is delegated admin, needs to have privilge to grant access with the request role Depends( authorize_by_user_type ), # check business bceid user cannot grant idir user access @@ -46,12 +53,12 @@ description="Grant User Access to an application's role.", ) def create_user_role_assignment_many( - role_assignment_request: FamUserRoleAssignmentCreate, + role_assignment_request: FamUserRoleAssignmentCreateSchema, request: Request, db: Session = Depends(database.get_db), token_claims: dict = Depends(jwt_validation.validate_token), - requester: Requester = Depends(get_current_requester), - target_user: TargetUser = Depends(get_verified_target_user), + requester: RequesterSchema = Depends(get_current_requester), + target_user: TargetUserSchema = Depends(get_verified_target_user), ): """ Create FAM user_role_xref association. @@ -75,9 +82,12 @@ def create_user_role_assignment_many( audit_event_log.application = role.application audit_event_log.requesting_user = requester - response = FamUserRoleAssignmentResponse( + response = FamUserRoleAssignmentResponseSchema( assignments_detail=crud_user_role.create_user_role_assignment_many( - db, role_assignment_request, target_user, requester.cognito_user_id + db, + role_assignment_request, + target_user, + requester.cognito_user_id, ) ) @@ -133,7 +143,7 @@ def delete_user_role_assignment( request: Request, user_role_xref_id: int, db: Session = Depends(database.get_db), - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ) -> None: """ Delete FAM user_role_xref association. diff --git a/server/backend/api/app/routers/router_user_terms_conditions.py b/server/backend/api/app/routers/router_user_terms_conditions.py index af0178404..466a43dc7 100644 --- a/server/backend/api/app/routers/router_user_terms_conditions.py +++ b/server/backend/api/app/routers/router_user_terms_conditions.py @@ -4,8 +4,10 @@ from api.app import database from api.app.crud import crud_user_terms_conditions from api.app.routers.router_guards import ( - external_delegated_admin_only_action, get_current_requester) -from api.app.schemas import Requester + external_delegated_admin_only_action, + get_current_requester, +) +from api.app.schemas import RequesterSchema from fastapi import APIRouter, Depends from sqlalchemy.orm import Session @@ -19,7 +21,7 @@ status_code=HTTPStatus.OK, ) def validate_user_requires_accept_terms_and_conditions( - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): return requester.requires_accept_tc @@ -33,7 +35,7 @@ def validate_user_requires_accept_terms_and_conditions( ) def create_user_terms_and_conditions( db: Session = Depends(database.get_db), - requester: Requester = Depends(get_current_requester), + requester: RequesterSchema = Depends(get_current_requester), ): """ Create a record for terms and conditions acceptance. \n diff --git a/server/backend/api/app/schemas.py b/server/backend/api/app/schemas.py deleted file mode 100644 index afd803691..000000000 --- a/server/backend/api/app/schemas.py +++ /dev/null @@ -1,331 +0,0 @@ -import logging -from typing import List, Literal, Optional, Union - -from pydantic import BaseModel, ConfigDict, EmailStr, Field, StringConstraints -from typing_extensions import Annotated - -from . import constants as famConstants - -LOGGER = logging.getLogger(__name__) - - -# --------------------------------- FAM Application --------------------------------- # -class FamApplication(BaseModel): - application_id: int - application_name: Annotated[str, StringConstraints(max_length=100)] - application_description: Annotated[str, StringConstraints(max_length=200)] - - model_config = ConfigDict(from_attributes=True) - - -# --------------------------------- FAM User --------------------------------- # -class FamUser(BaseModel): - user_type_code: famConstants.UserType - cognito_user_id: Optional[Annotated[str, StringConstraints(max_length=100)]] = ( - None # temporarily optional - ) - user_name: Annotated[str, StringConstraints(max_length=20)] - user_guid: Union[ - Annotated[str, StringConstraints(min_length=32, max_length=32)], None - ] - create_user: Annotated[str, StringConstraints(max_length=100)] - update_user: Optional[Annotated[str, StringConstraints(max_length=100)]] = None - - model_config = ConfigDict(from_attributes=True) - - -class FamUserType(BaseModel): - user_type_code: famConstants.UserType = Field(alias="code") - description: Annotated[str, StringConstraints(max_length=35)] - - # required to set populate_by_name for alias fields - model_config = ConfigDict(from_attributes=True, populate_by_name=True) - - -class FamUserInfo(BaseModel): - user_name: Annotated[str, StringConstraints(max_length=20)] - user_type_relation: FamUserType = Field(alias="user_type") - first_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - last_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - email: Optional[Annotated[str, StringConstraints(max_length=250)]] = None - - # Check https://docs.pydantic.dev/dev-v2/migration/#changes-to-config for more information. - model_config = ConfigDict( - from_attributes=True, - fields={ - "user_guid": {"exclude": True}, - "create_user": {"exclude": True}, - "update_user": {"exclude": True}, - }, - populate_by_name=True, - ) - - -class FamUserUpdateResponse(BaseModel): - total_db_users_count: int - current_page: int - users_count_on_page: int - success_user_id_list: List[int] - failed_user_id_list: List[int] - ignored_user_id_list: List[int] - mismatch_user_list: List[int] - - -# --------------------------------- FAM Forest Client--------------------------------- # -class FamForestClientCreate(BaseModel): - # Note, the request may contain string(with leading '0') - forest_client_number: Annotated[str, StringConstraints(max_length=8)] - # client_name: str - create_user: Annotated[str, StringConstraints(max_length=100)] - - model_config = ConfigDict(from_attributes=True) - - -# This is not an object from FAM model. It is an helper class to map Forest Client API -# client status into FAM's status needs (Active/Inactive). -class FamForestClientStatus(BaseModel): - status_code: famConstants.FamForestClientStatusType - description: Annotated[str, StringConstraints(max_length=10)] - - @staticmethod - def to_fam_status(forest_client_status_code: str): - # Map Forest Client API's 'clientStatusCode' to FAM - accepted_api_active_codes = [famConstants.FOREST_CLIENT_STATUS["CODE_ACTIVE"]] - status_code = ( - famConstants.FamForestClientStatusType.ACTIVE - if forest_client_status_code in accepted_api_active_codes - else famConstants.FamForestClientStatusType.INACTIVE - ) - description = ( - famConstants.DESCRIPTION_ACTIVE - if status_code == famConstants.FamForestClientStatusType.ACTIVE - else famConstants.DESCRIPTION_INACTIVE - ) - status = FamForestClientStatus(status_code=status_code, description=description) - return status - - -class FamForestClient(BaseModel): - client_name: Optional[Annotated[str, StringConstraints(max_length=60)]] = None - forest_client_number: Annotated[str, StringConstraints(max_length=8)] - status: Optional[FamForestClientStatus] = None - - model_config = ConfigDict(from_attributes=True) - - @staticmethod - def from_api_json(json_dict): - LOGGER.debug(f"from_api_json - {json_dict}") - client_name = json_dict["clientName"] - forest_client_number = json_dict["clientNumber"] - forest_client_status_code = json_dict[famConstants.FOREST_CLIENT_STATUS["KEY"]] - status = FamForestClientStatus.to_fam_status(forest_client_status_code) - fc = FamForestClient( - client_name=client_name, - forest_client_number=forest_client_number, - status=status, - ) - return fc - - -# --------------------------------- FAM Role--------------------------------- # -class FamRoleCreate(BaseModel): - role_name: Annotated[str, StringConstraints(max_length=100)] - role_purpose: Union[Annotated[str, StringConstraints(max_length=300)], None] = None - display_name: Optional[Annotated[str, StringConstraints(max_length=100)]] = None - parent_role_id: Union[int, None] = Field( - default=None, title="Reference role_id to higher role" - ) - application_id: int = Field(title="Application this role is associated with") - role_type_code: famConstants.RoleType - forest_client_number: Union[ - Annotated[str, StringConstraints(max_length=8)], None - ] = Field(default=None, title="Forest Client this role is associated with") - create_user: Annotated[str, StringConstraints(max_length=100)] - client_number: Optional[FamForestClientCreate] = None - - model_config = ConfigDict(from_attributes=True) - - -class FamRoleMin(BaseModel): - role_name: Annotated[str, StringConstraints(max_length=100)] - role_type_code: famConstants.RoleType - application: FamApplication - - model_config = ConfigDict(from_attributes=True) - - -class FamRoleWithClient(FamRoleMin): - role_id: int - display_name: Optional[Annotated[str, StringConstraints(max_length=100)]] = None - description: Optional[Annotated[str, StringConstraints(max_length=300)]] = Field( - validation_alias="role_purpose" - ) - client_number: Optional[FamForestClient] = None - parent_role: Optional[FamRoleMin] = None - - # Check https://docs.pydantic.dev/dev-v2/migration/#changes-to-config for more information. - model_config = ConfigDict( - from_attributes=True, - fields={ - "update_user": {"exclude": True}, - "role_purpose": {"exclude": True}, - "parent_role_id": {"exclude": True}, - "application_id": {"exclude": True}, - "forest_client_number": {"exclude": True}, - "role_id": {"exclude": True}, - "create_user": {"exclude": True}, - }, - ) - - -# --------------------------------- FAM User Role Assignment--------------------------------- # -# Role assignment with one role at a time for the user. -class FamUserRoleAssignmentCreate(BaseModel): - user_name: Annotated[ - str, StringConstraints(min_length=3, max_length=20) - ] # IDIM search max length - user_guid: Annotated[str, StringConstraints(min_length=32, max_length=32)] - user_type_code: famConstants.UserType - role_id: int - forest_client_numbers: Union[ - List[Annotated[str, StringConstraints(min_length=1, max_length=8)]], None - ] = None - requires_send_user_email: bool = False - - model_config = ConfigDict(from_attributes=True) - - -class FamApplicationUserRoleAssignmentGet(BaseModel): - user_role_xref_id: int - user_id: int - role_id: int - user: FamUserInfo - role: FamRoleWithClient - - # Check https://docs.pydantic.dev/dev-v2/migration/#changes-to-config for more information. - model_config = ConfigDict(from_attributes=True) - - -class FamUserRoleAssignmentCreateResponse(BaseModel): - status_code: int - detail: FamApplicationUserRoleAssignmentGet - error_message: Optional[str] = None - - model_config = ConfigDict(from_attributes=True) - - -class FamUserRoleAssignmentResponse(BaseModel): - email_sending_status: famConstants.EmailSendingStatus = famConstants.EmailSendingStatus.NOT_REQUIRED - assignments_detail: List[FamUserRoleAssignmentCreateResponse] - - -# ------------------------------------- IDIM Proxy API Integraion ---------------------------------------- # -class IdimProxySearchParam(BaseModel): - userId: Annotated[ - str, StringConstraints(max_length=20) - ] # param for Idim-Proxy search of this form (not snake case) - - -class IdimProxyBceidSearchParam(BaseModel): - searchUserBy: famConstants.IdimSearchUserParamType - searchValue: str - - -class IdimProxyIdirInfo(BaseModel): - # property returned from Idim-Proxy search of this form (not snake case) - found: bool - userId: Annotated[str, StringConstraints(max_length=20)] - guid: Optional[Annotated[str, StringConstraints(max_length=32)]] = None - firstName: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - lastName: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - email: Optional[Annotated[str, StringConstraints(max_length=250)]] = None - - -class IdimProxyBceidInfo(BaseModel): - found: bool - userId: Annotated[str, StringConstraints(max_length=20)] - guid: Optional[Annotated[str, StringConstraints(max_length=32)]] = None - businessGuid: Optional[Annotated[str, StringConstraints(max_length=32)]] = None - businessLegalName: Optional[Annotated[str, StringConstraints(max_length=60)]] = None - firstName: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - lastName: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - email: Optional[Annotated[str, StringConstraints(max_length=250)]] = None - - -# ------------------------------------- GC Notify Integraion ---------------------------------------- # -class GCNotifyGrantAccessEmailParam(BaseModel): - first_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - last_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - application_name: Annotated[str, StringConstraints(max_length=35)] - role_list_string: Annotated[str, StringConstraints(max_length=500)] - application_team_contact_email: Optional[EmailStr] = None - send_to_email: EmailStr - with_client_number: Literal['yes', 'no'] - - -# ------------------------------------- Forest Client API Integraion ---------------------------------------- # -class ForestClientIntegrationFindResponse(BaseModel): - clientNumber: str - clientName: str - clientStatusCode: str - clientTypeCode: str - - -# ---------- System schema objects ---------- # -""" -The "Requester" and "TargetUser" schema objects are internal backend system -wide objects. -They are "NOT" intended as part of the request/respoinse body for endponts. -The "Requester" means "who" is issueing the request for one of FAM endpoints. -The "TargetUser" means "who" is the user this endpoint request is targeting -for. - - The exsiting endpoints so far only target on one target user. It might be - possible some endpoints will target on multiple users. In such case, - further design or refactoring might be needed. -""" - - -class Requester(BaseModel): - """ - Class holding information for user who access FAM system after being - authenticated. Logged on user with jwt token is parsed into Requester. - It is transformed from db model "FamUser". Most endpoints will need this - Requester instance, and can be available for router handler and passed to - service layer. - """ - - # cognito_user_id => Cognito OIDC access token maps this to: username (ID token => "custom:idp_name" ) - cognito_user_id: Union[str, None] = None - user_name: Annotated[str, StringConstraints(max_length=20)] - # "B"(BCeID) or "I"(IDIR). It is the IDP provider. - user_type_code: Union[famConstants.UserType, None] = None - user_guid: Annotated[str, StringConstraints(min_length=32, max_length=32)] - business_guid: Optional[Annotated[str, StringConstraints(max_length=32)]] = None - user_id: int - - # belows are custom Requester information attributes. - access_roles: Union[ - List[Annotated[str, StringConstraints(max_length=50)]], None - ] = None - is_delegated_admin: bool = False # is delegated admin of any application - requires_accept_tc: bool = False # requires to accept terms and conditions - - def is_external_delegated_admin(self): - return ( - self.user_type_code == famConstants.UserType.BCEID - and self.is_delegated_admin - ) - - model_config = ConfigDict(from_attributes=True) - - -class TargetUser(Requester): - """ - Inherit from the class "Requester". Same as Requester, the TargetUser can - be transformed from FamUser db model. - """ - - user_id: Optional[int] = None - first_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - last_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None - email: Optional[Annotated[str, StringConstraints(max_length=250)]] = None diff --git a/server/backend/api/app/schemas/__init__.py b/server/backend/api/app/schemas/__init__.py new file mode 100644 index 000000000..c4a4d164d --- /dev/null +++ b/server/backend/api/app/schemas/__init__.py @@ -0,0 +1,58 @@ +# --------------------------------- FAM Application --------------------------------- # +from .fam_application import FamApplicationSchema + +# --------------------------------- FAM User --------------------------------- # +from .fam_user import FamUserSchema +from .fam_user_type import FamUserTypeSchema +from .fam_user_info import FamUserInfoSchema +from .fam_user_update_response import FamUserUpdateResponseSchema + +# --------------------------------- FAM Forest Client--------------------------------- # +from .fam_forest_client_create import FamForestClientCreateSchema +from .fam_forest_client_status import FamForestClientStatusSchema +from .fam_forest_client import FamForestClientSchema + +# --------------------------------- FAM Role--------------------------------- # +from .fam_role_create import FamRoleCreateSchema +from .fam_role_min import FamRoleMinSchema +from .fam_role_with_client import FamRoleWithClientSchema + +# --------------------------------- FAM User Role Assignment--------------------------------- # +from .fam_user_role_assignment_create import FamUserRoleAssignmentCreateSchema +from .fam_application_user_role_assignment_get import ( + FamApplicationUserRoleAssignmentGetSchema, +) +from .fam_user_role_assignment_create_response import ( + FamUserRoleAssignmentCreateResponseSchema, +) +from .fam_user_role_assignment_response import FamUserRoleAssignmentResponseSchema + +# ------------------------------------- IDIM Proxy API Integraion ---------------------------------------- # +from .idim_proxy_search_param import IdimProxySearchParamSchema +from .idim_proxy_bceid_search_param import IdimProxyBceidSearchParamSchema +from .idim_proxy_idir_info import IdimProxyIdirInfoSchema +from .idim_proxy_bceid_info import IdimProxyBceidInfoSchema + +# ------------------------------------- GC Notify Integraion ---------------------------------------- # +from .gc_notify_grant_access_email_param import GCNotifyGrantAccessEmailParamSchema + +# ------------------------------------- Forest Client API Integraion ---------------------------------------- # +from .forest_client_integration_find_response import ( + ForestClientIntegrationFindResponseSchema, +) + + +# ---------- System schema objects ---------- # +""" +The "Requester" and "TargetUser" schema objects are internal backend system +wide objects. +They are "NOT" intended as part of the request/respoinse body for endponts. +The "Requester" means "who" is issueing the request for one of FAM endpoints. +The "TargetUser" means "who" is the user this endpoint request is targeting +for. + - The exsiting endpoints so far only target on one target user. It might be + possible some endpoints will target on multiple users. In such case, + further design or refactoring might be needed. +""" +from .requester import RequesterSchema +from .target_user import TargetUserSchema diff --git a/server/backend/api/app/schemas/fam_application.py b/server/backend/api/app/schemas/fam_application.py new file mode 100644 index 000000000..ba2709d47 --- /dev/null +++ b/server/backend/api/app/schemas/fam_application.py @@ -0,0 +1,10 @@ +from pydantic import BaseModel, ConfigDict, StringConstraints +from typing_extensions import Annotated + + +class FamApplicationSchema(BaseModel): + application_id: int + application_name: Annotated[str, StringConstraints(max_length=100)] + application_description: Annotated[str, StringConstraints(max_length=200)] + + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/fam_application_user_role_assignment_get.py b/server/backend/api/app/schemas/fam_application_user_role_assignment_get.py new file mode 100644 index 000000000..0a40c16d5 --- /dev/null +++ b/server/backend/api/app/schemas/fam_application_user_role_assignment_get.py @@ -0,0 +1,13 @@ +from pydantic import BaseModel, ConfigDict +from .fam_user_info import FamUserInfoSchema +from .fam_role_with_client import FamRoleWithClientSchema + +class FamApplicationUserRoleAssignmentGetSchema(BaseModel): + user_role_xref_id: int + user_id: int + role_id: int + user: FamUserInfoSchema + role: FamRoleWithClientSchema + + # Check https://docs.pydantic.dev/dev-v2/migration/#changes-to-config for more information. + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/fam_forest_client.py b/server/backend/api/app/schemas/fam_forest_client.py new file mode 100644 index 000000000..750a4c36a --- /dev/null +++ b/server/backend/api/app/schemas/fam_forest_client.py @@ -0,0 +1,32 @@ +import logging +from typing import Optional +from pydantic import BaseModel, ConfigDict, StringConstraints +from typing_extensions import Annotated +from api.app.constants import FOREST_CLIENT_STATUS + +from .fam_forest_client_status import FamForestClientStatusSchema + +LOGGER = logging.getLogger(__name__) + + +class FamForestClientSchema(BaseModel): + + client_name: Optional[Annotated[str, StringConstraints(max_length=60)]] = None + forest_client_number: Annotated[str, StringConstraints(max_length=8)] + status: Optional[FamForestClientStatusSchema] = None + + model_config = ConfigDict(from_attributes=True) + + @staticmethod + def from_api_json(json_dict): + LOGGER.debug(f"from_api_json - {json_dict}") + client_name = json_dict["clientName"] + forest_client_number = json_dict["clientNumber"] + forest_client_status_code = json_dict[FOREST_CLIENT_STATUS["KEY"]] + status = FamForestClientStatusSchema.to_fam_status(forest_client_status_code) + fc = FamForestClientSchema( + client_name=client_name, + forest_client_number=forest_client_number, + status=status, + ) + return fc diff --git a/server/backend/api/app/schemas/fam_forest_client_create.py b/server/backend/api/app/schemas/fam_forest_client_create.py new file mode 100644 index 000000000..afe0057ad --- /dev/null +++ b/server/backend/api/app/schemas/fam_forest_client_create.py @@ -0,0 +1,12 @@ +from pydantic import BaseModel, ConfigDict, StringConstraints +from typing_extensions import Annotated + + +# --------------------------------- FAM Forest Client--------------------------------- # +class FamForestClientCreateSchema(BaseModel): + # Note, the request may contain string(with leading '0') + forest_client_number: Annotated[str, StringConstraints(max_length=8)] + # client_name: str + create_user: Annotated[str, StringConstraints(max_length=100)] + + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/fam_forest_client_status.py b/server/backend/api/app/schemas/fam_forest_client_status.py new file mode 100644 index 000000000..41e9a8f4c --- /dev/null +++ b/server/backend/api/app/schemas/fam_forest_client_status.py @@ -0,0 +1,36 @@ +from pydantic import BaseModel, StringConstraints +from typing_extensions import Annotated + +from api.app.constants import ( + FOREST_CLIENT_STATUS, + DESCRIPTION_ACTIVE, + DESCRIPTION_INACTIVE, + FamForestClientStatusType, +) + + +# This is not an object from FAM model. It is an helper class to map Forest Client API +# client status into FAM's status needs (Active/Inactive). +class FamForestClientStatusSchema(BaseModel): + + status_code: FamForestClientStatusType + description: Annotated[str, StringConstraints(max_length=10)] + + @staticmethod + def to_fam_status(forest_client_status_code: str): + # Map Forest Client API's 'clientStatusCode' to FAM + accepted_api_active_codes = [FOREST_CLIENT_STATUS["CODE_ACTIVE"]] + status_code = ( + FamForestClientStatusType.ACTIVE + if forest_client_status_code in accepted_api_active_codes + else FamForestClientStatusType.INACTIVE + ) + description = ( + DESCRIPTION_ACTIVE + if status_code == FamForestClientStatusType.ACTIVE + else DESCRIPTION_INACTIVE + ) + status = FamForestClientStatusSchema( + status_code=status_code, description=description + ) + return status diff --git a/server/backend/api/app/schemas/fam_role_create.py b/server/backend/api/app/schemas/fam_role_create.py new file mode 100644 index 000000000..9919388e6 --- /dev/null +++ b/server/backend/api/app/schemas/fam_role_create.py @@ -0,0 +1,23 @@ +from typing import Optional, Union +from pydantic import BaseModel, ConfigDict, Field, StringConstraints +from typing_extensions import Annotated +from api.app.constants import RoleType +from .fam_forest_client_create import FamForestClientCreateSchema + + +class FamRoleCreateSchema(BaseModel): + role_name: Annotated[str, StringConstraints(max_length=100)] + role_purpose: Union[Annotated[str, StringConstraints(max_length=300)], None] = None + display_name: Optional[Annotated[str, StringConstraints(max_length=100)]] = None + parent_role_id: Union[int, None] = Field( + default=None, title="Reference role_id to higher role" + ) + application_id: int = Field(title="Application this role is associated with") + role_type_code: RoleType + forest_client_number: Union[ + Annotated[str, StringConstraints(max_length=8)], None + ] = Field(default=None, title="Forest Client this role is associated with") + create_user: Annotated[str, StringConstraints(max_length=100)] + client_number: Optional[FamForestClientCreateSchema] = None + + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/fam_role_min.py b/server/backend/api/app/schemas/fam_role_min.py new file mode 100644 index 000000000..d9bd5f8f7 --- /dev/null +++ b/server/backend/api/app/schemas/fam_role_min.py @@ -0,0 +1,12 @@ +from pydantic import BaseModel, ConfigDict, StringConstraints +from typing_extensions import Annotated +from api.app.constants import RoleType +from .fam_application import FamApplicationSchema + + +class FamRoleMinSchema(BaseModel): + role_name: Annotated[str, StringConstraints(max_length=100)] + role_type_code: RoleType + application: FamApplicationSchema + + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/fam_role_with_client.py b/server/backend/api/app/schemas/fam_role_with_client.py new file mode 100644 index 000000000..5843a3858 --- /dev/null +++ b/server/backend/api/app/schemas/fam_role_with_client.py @@ -0,0 +1,31 @@ +import logging +from typing import Optional + +from pydantic import ConfigDict, Field, StringConstraints +from typing_extensions import Annotated + +from .fam_role_min import FamRoleMinSchema +from .fam_forest_client import FamForestClientSchema + +class FamRoleWithClientSchema(FamRoleMinSchema): + role_id: int + display_name: Optional[Annotated[str, StringConstraints(max_length=100)]] = None + description: Optional[Annotated[str, StringConstraints(max_length=300)]] = Field( + validation_alias="role_purpose" + ) + client_number: Optional[FamForestClientSchema] = None + parent_role: Optional[FamRoleMinSchema] = None + + # Check https://docs.pydantic.dev/dev-v2/migration/#changes-to-config for more information. + model_config = ConfigDict( + from_attributes=True, + fields={ + "update_user": {"exclude": True}, + "role_purpose": {"exclude": True}, + "parent_role_id": {"exclude": True}, + "application_id": {"exclude": True}, + "forest_client_number": {"exclude": True}, + "role_id": {"exclude": True}, + "create_user": {"exclude": True}, + }, + ) diff --git a/server/backend/api/app/schemas/fam_user.py b/server/backend/api/app/schemas/fam_user.py new file mode 100644 index 000000000..49f6bb1b2 --- /dev/null +++ b/server/backend/api/app/schemas/fam_user.py @@ -0,0 +1,21 @@ +from typing import Optional, Union + +from pydantic import BaseModel, ConfigDict, StringConstraints +from typing_extensions import Annotated + +from api.app.constants import UserType + + +class FamUserSchema(BaseModel): + user_type_code: UserType + cognito_user_id: Optional[Annotated[str, StringConstraints(max_length=100)]] = ( + None # temporarily optional + ) + user_name: Annotated[str, StringConstraints(max_length=20)] + user_guid: Union[ + Annotated[str, StringConstraints(min_length=32, max_length=32)], None + ] + create_user: Annotated[str, StringConstraints(max_length=100)] + update_user: Optional[Annotated[str, StringConstraints(max_length=100)]] = None + + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/fam_user_info.py b/server/backend/api/app/schemas/fam_user_info.py new file mode 100644 index 000000000..b483a0f46 --- /dev/null +++ b/server/backend/api/app/schemas/fam_user_info.py @@ -0,0 +1,24 @@ +from typing import Optional +from pydantic import BaseModel, ConfigDict, Field, StringConstraints +from typing_extensions import Annotated +from .fam_user_type import FamUserTypeSchema + + +class FamUserInfoSchema(BaseModel): + + user_name: Annotated[str, StringConstraints(max_length=20)] + user_type_relation: FamUserTypeSchema = Field(alias="user_type") + first_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + last_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + email: Optional[Annotated[str, StringConstraints(max_length=250)]] = None + + # Check https://docs.pydantic.dev/dev-v2/migration/#changes-to-config for more information. + model_config = ConfigDict( + from_attributes=True, + fields={ + "user_guid": {"exclude": True}, + "create_user": {"exclude": True}, + "update_user": {"exclude": True}, + }, + populate_by_name=True, + ) diff --git a/server/backend/api/app/schemas/fam_user_role_assignment_create.py b/server/backend/api/app/schemas/fam_user_role_assignment_create.py new file mode 100644 index 000000000..d91c22882 --- /dev/null +++ b/server/backend/api/app/schemas/fam_user_role_assignment_create.py @@ -0,0 +1,20 @@ +from typing import List, Union + +from pydantic import BaseModel, ConfigDict, StringConstraints +from typing_extensions import Annotated +from api.app.constants import UserType + +# Role assignment with one role at a time for the user. +class FamUserRoleAssignmentCreateSchema(BaseModel): + user_name: Annotated[ + str, StringConstraints(min_length=3, max_length=20) + ] # IDIM search max length + user_guid: Annotated[str, StringConstraints(min_length=32, max_length=32)] + user_type_code: UserType + role_id: int + forest_client_numbers: Union[ + List[Annotated[str, StringConstraints(min_length=1, max_length=8)]], None + ] = None + requires_send_user_email: bool = False + + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/fam_user_role_assignment_create_response.py b/server/backend/api/app/schemas/fam_user_role_assignment_create_response.py new file mode 100644 index 000000000..f1f093109 --- /dev/null +++ b/server/backend/api/app/schemas/fam_user_role_assignment_create_response.py @@ -0,0 +1,13 @@ +from typing import Optional +from pydantic import BaseModel, ConfigDict +from .fam_application_user_role_assignment_get import ( + FamApplicationUserRoleAssignmentGetSchema, +) + + +class FamUserRoleAssignmentCreateResponseSchema(BaseModel): + status_code: int + detail: FamApplicationUserRoleAssignmentGetSchema + error_message: Optional[str] = None + + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/fam_user_role_assignment_response.py b/server/backend/api/app/schemas/fam_user_role_assignment_response.py new file mode 100644 index 000000000..feb598f3b --- /dev/null +++ b/server/backend/api/app/schemas/fam_user_role_assignment_response.py @@ -0,0 +1,11 @@ +from typing import List +from pydantic import BaseModel +from api.app.constants import EmailSendingStatus +from .fam_user_role_assignment_create_response import ( + FamUserRoleAssignmentCreateResponseSchema, +) + + +class FamUserRoleAssignmentResponseSchema(BaseModel): + email_sending_status: EmailSendingStatus = EmailSendingStatus.NOT_REQUIRED + assignments_detail: List[FamUserRoleAssignmentCreateResponseSchema] diff --git a/server/backend/api/app/schemas/fam_user_type.py b/server/backend/api/app/schemas/fam_user_type.py new file mode 100644 index 000000000..fb67d6b2d --- /dev/null +++ b/server/backend/api/app/schemas/fam_user_type.py @@ -0,0 +1,12 @@ +import logging +from pydantic import BaseModel, ConfigDict, Field, StringConstraints +from typing_extensions import Annotated +from api.app.constants import UserType + + +class FamUserTypeSchema(BaseModel): + user_type_code: UserType = Field(alias="code") + description: Annotated[str, StringConstraints(max_length=35)] + + # required to set populate_by_name for alias fields + model_config = ConfigDict(from_attributes=True, populate_by_name=True) diff --git a/server/backend/api/app/schemas/fam_user_update_response.py b/server/backend/api/app/schemas/fam_user_update_response.py new file mode 100644 index 000000000..e619878fb --- /dev/null +++ b/server/backend/api/app/schemas/fam_user_update_response.py @@ -0,0 +1,12 @@ +from typing import List +from pydantic import BaseModel + + +class FamUserUpdateResponseSchema(BaseModel): + total_db_users_count: int + current_page: int + users_count_on_page: int + success_user_id_list: List[int] + failed_user_id_list: List[int] + ignored_user_id_list: List[int] + mismatch_user_list: List[int] diff --git a/server/backend/api/app/schemas/forest_client_integration_find_response.py b/server/backend/api/app/schemas/forest_client_integration_find_response.py new file mode 100644 index 000000000..827c0cea8 --- /dev/null +++ b/server/backend/api/app/schemas/forest_client_integration_find_response.py @@ -0,0 +1,8 @@ +from pydantic import BaseModel + + +class ForestClientIntegrationFindResponseSchema(BaseModel): + clientNumber: str + clientName: str + clientStatusCode: str + clientTypeCode: str diff --git a/server/backend/api/app/schemas/gc_notify_grant_access_email_param.py b/server/backend/api/app/schemas/gc_notify_grant_access_email_param.py new file mode 100644 index 000000000..efa715056 --- /dev/null +++ b/server/backend/api/app/schemas/gc_notify_grant_access_email_param.py @@ -0,0 +1,13 @@ +from typing import Literal, Optional +from pydantic import BaseModel, EmailStr, StringConstraints +from typing_extensions import Annotated + + +class GCNotifyGrantAccessEmailParamSchema(BaseModel): + first_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + last_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + application_name: Annotated[str, StringConstraints(max_length=35)] + role_list_string: Annotated[str, StringConstraints(max_length=500)] + application_team_contact_email: Optional[EmailStr] = None + send_to_email: EmailStr + with_client_number: Literal["yes", "no"] diff --git a/server/backend/api/app/schemas/idim_proxy_bceid_info.py b/server/backend/api/app/schemas/idim_proxy_bceid_info.py new file mode 100644 index 000000000..24ad66726 --- /dev/null +++ b/server/backend/api/app/schemas/idim_proxy_bceid_info.py @@ -0,0 +1,14 @@ +from typing import Optional +from pydantic import BaseModel, StringConstraints +from typing_extensions import Annotated + + +class IdimProxyBceidInfoSchema(BaseModel): + found: bool + userId: Annotated[str, StringConstraints(max_length=20)] + guid: Optional[Annotated[str, StringConstraints(max_length=32)]] = None + businessGuid: Optional[Annotated[str, StringConstraints(max_length=32)]] = None + businessLegalName: Optional[Annotated[str, StringConstraints(max_length=60)]] = None + firstName: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + lastName: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + email: Optional[Annotated[str, StringConstraints(max_length=250)]] = None diff --git a/server/backend/api/app/schemas/idim_proxy_bceid_search_param.py b/server/backend/api/app/schemas/idim_proxy_bceid_search_param.py new file mode 100644 index 000000000..1e968cd2a --- /dev/null +++ b/server/backend/api/app/schemas/idim_proxy_bceid_search_param.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel +from api.app.constants import IdimSearchUserParamType + + +class IdimProxyBceidSearchParamSchema(BaseModel): + searchUserBy: IdimSearchUserParamType + searchValue: str diff --git a/server/backend/api/app/schemas/idim_proxy_idir_info.py b/server/backend/api/app/schemas/idim_proxy_idir_info.py new file mode 100644 index 000000000..42e3e1cf3 --- /dev/null +++ b/server/backend/api/app/schemas/idim_proxy_idir_info.py @@ -0,0 +1,13 @@ +from typing import Optional +from pydantic import BaseModel, StringConstraints +from typing_extensions import Annotated + + +class IdimProxyIdirInfoSchema(BaseModel): + # property returned from Idim-Proxy search of this form (not snake case) + found: bool + userId: Annotated[str, StringConstraints(max_length=20)] + guid: Optional[Annotated[str, StringConstraints(max_length=32)]] = None + firstName: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + lastName: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + email: Optional[Annotated[str, StringConstraints(max_length=250)]] = None diff --git a/server/backend/api/app/schemas/idim_proxy_search_param.py b/server/backend/api/app/schemas/idim_proxy_search_param.py new file mode 100644 index 000000000..365448f74 --- /dev/null +++ b/server/backend/api/app/schemas/idim_proxy_search_param.py @@ -0,0 +1,8 @@ +from pydantic import BaseModel, StringConstraints +from typing_extensions import Annotated + + +class IdimProxySearchParamSchema(BaseModel): + userId: Annotated[ + str, StringConstraints(max_length=20) + ] # param for Idim-Proxy search of this form (not snake case) diff --git a/server/backend/api/app/schemas/requester.py b/server/backend/api/app/schemas/requester.py new file mode 100644 index 000000000..25f7d1ec2 --- /dev/null +++ b/server/backend/api/app/schemas/requester.py @@ -0,0 +1,36 @@ +from typing import List, Optional, Union +from pydantic import BaseModel, ConfigDict, StringConstraints +from typing_extensions import Annotated + +from api.app.constants import UserType + + +class RequesterSchema(BaseModel): + """ + Class holding information for user who access FAM system after being + authenticated. Logged on user with jwt token is parsed into Requester. + It is transformed from db model "FamUser". Most endpoints will need this + Requester instance, and can be available for router handler and passed to + service layer. + """ + + # cognito_user_id => Cognito OIDC access token maps this to: username (ID token => "custom:idp_name" ) + cognito_user_id: Union[str, None] = None + user_name: Annotated[str, StringConstraints(max_length=20)] + # "B"(BCeID) or "I"(IDIR). It is the IDP provider. + user_type_code: Union[UserType, None] = None + user_guid: Annotated[str, StringConstraints(min_length=32, max_length=32)] + business_guid: Optional[Annotated[str, StringConstraints(max_length=32)]] = None + user_id: int + + # belows are custom Requester information attributes. + access_roles: Union[ + List[Annotated[str, StringConstraints(max_length=50)]], None + ] = None + is_delegated_admin: bool = False # is delegated admin of any application + requires_accept_tc: bool = False # requires to accept terms and conditions + + def is_external_delegated_admin(self): + return self.user_type_code == UserType.BCEID and self.is_delegated_admin + + model_config = ConfigDict(from_attributes=True) diff --git a/server/backend/api/app/schemas/target_user.py b/server/backend/api/app/schemas/target_user.py new file mode 100644 index 000000000..6051f1fc6 --- /dev/null +++ b/server/backend/api/app/schemas/target_user.py @@ -0,0 +1,17 @@ +from typing import Optional +from pydantic import StringConstraints +from typing_extensions import Annotated + +from .requester import RequesterSchema + + +class TargetUserSchema(RequesterSchema): + """ + Inherit from the class "Requester". Same as Requester, the TargetUser can + be transformed from FamUser db model. + """ + + user_id: Optional[int] = None + first_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + last_name: Optional[Annotated[str, StringConstraints(max_length=50)]] = None + email: Optional[Annotated[str, StringConstraints(max_length=250)]] = None diff --git a/server/backend/api/app/utils/audit_util.py b/server/backend/api/app/utils/audit_util.py index 7e5453235..a5b7be6b3 100644 --- a/server/backend/api/app/utils/audit_util.py +++ b/server/backend/api/app/utils/audit_util.py @@ -3,7 +3,7 @@ from enum import Enum import json from api.app.models import model as models -from api.app.schemas import Requester +from api.app.schemas import RequesterSchema from fastapi import Request, HTTPException @@ -27,7 +27,7 @@ class AuditEventLog: application: models.FamApplication role: models.FamRole forest_client_numbers: List[str] - requesting_user: Requester + requesting_user: RequesterSchema target_user: models.FamUser exception: Exception @@ -39,7 +39,7 @@ def __init__( application: models.FamApplication = None, role: models.FamRole = None, forest_client_numbers: List[str] = [], - requesting_user: Requester = None, + requesting_user: RequesterSchema = None, target_user: models.FamUser = None, exception: Exception = None, ): diff --git a/server/backend/testspg/conftest.py b/server/backend/testspg/conftest.py index 9b8b9fdee..9c5e5dec3 100644 --- a/server/backend/testspg/conftest.py +++ b/server/backend/testspg/conftest.py @@ -18,16 +18,20 @@ import api.app.database as database import api.app.jwt_validation as jwt_validation import testspg.jwt_utils as jwt_utils -from api.app.constants import (COGNITO_USERNAME_KEY, - ERROR_CODE_TERMS_CONDITIONS_REQUIRED) +from api.app.constants import COGNITO_USERNAME_KEY, ERROR_CODE_TERMS_CONDITIONS_REQUIRED from api.app.crud import crud_utils from api.app.main import apiPrefix, app from api.app.routers.router_guards import ( - enforce_bceid_terms_conditions_guard, get_current_requester, - get_verified_target_user) -from api.app.schemas import Requester, TargetUser -from testspg.constants import (ACCESS_GRANT_FOM_DEV_CR_IDIR, - FOM_DEV_ADMIN_ROLE, FOM_TEST_ADMIN_ROLE) + enforce_bceid_terms_conditions_guard, + get_current_requester, + get_verified_target_user, +) +from api.app.schemas import RequesterSchema, TargetUserSchema +from testspg.constants import ( + ACCESS_GRANT_FOM_DEV_CR_IDIR, + FOM_DEV_ADMIN_ROLE, + FOM_TEST_ADMIN_ROLE, +) LOGGER = logging.getLogger(__name__) # the folder contains test docker-compose.yml, ours in the root directory @@ -158,7 +162,7 @@ def fom_test_access_admin_token(test_rsa_key): @pytest.fixture(scope="function") def get_current_requester_by_token(db_pg_session): """ - Convenient fixture to get current requester from token (retrieved from database setup). + Convenient fixture to get current Requester from token (retrieved from database setup). The fixture returns a function to be called based on access_token's ["username"] , which is the user's cognito_user_id. @@ -170,7 +174,7 @@ def get_current_requester_by_token(db_pg_session): currently how Pytest can work with async from fixture.) """ - def _get_current_requester_by_token(access_token: str) -> Requester: + def _get_current_requester_by_token(access_token: str) -> RequesterSchema: claims = jwt.decode(access_token, options={"verify_signature": False}) requester = get_current_requester( @@ -190,7 +194,7 @@ def override_get_verified_target_user(test_client_fixture): # mock the return result for idim validation of the target user, to avoid calling external idim-proxy def _override_get_verified_target_user(mocked_data=ACCESS_GRANT_FOM_DEV_CR_IDIR): app = test_client_fixture.app - app.dependency_overrides[get_verified_target_user] = lambda: TargetUser( + app.dependency_overrides[get_verified_target_user] = lambda: TargetUserSchema( **mocked_data ) @@ -244,12 +248,13 @@ def override_enforce_bceid_terms_conditions_guard(test_client_fixture): # Override T&C checks based on test cases scenarios. def _override_enforce_bceid_terms_conditions_guard(mocked_tc_accepted=True): app = test_client_fixture.app - app.dependency_overrides[ - enforce_bceid_terms_conditions_guard - ] = lambda: (None if mocked_tc_accepted else crud_utils.raise_http_exception( + app.dependency_overrides[enforce_bceid_terms_conditions_guard] = lambda: ( + None + if mocked_tc_accepted + else crud_utils.raise_http_exception( error_code=ERROR_CODE_TERMS_CONDITIONS_REQUIRED, error_msg="Requires to accept terms and conditions.", ) ) - return _override_enforce_bceid_terms_conditions_guard \ No newline at end of file + return _override_enforce_bceid_terms_conditions_guard diff --git a/server/backend/testspg/crud/test_crud_forest_client.py b/server/backend/testspg/crud/test_crud_forest_client.py index fe9a74f5b..f23c7eeed 100644 --- a/server/backend/testspg/crud/test_crud_forest_client.py +++ b/server/backend/testspg/crud/test_crud_forest_client.py @@ -1,6 +1,6 @@ import logging -import api.app.schemas as schemas +from api.app.schemas import FamForestClientCreateSchema from api.app.crud import crud_forest_client from api.app.models import model as models from sqlalchemy.orm import Session @@ -10,71 +10,71 @@ TEST_FOERST_CLIENT_DATA = { "forest_client_number": "00000010", - "create_user": TEST_CREATOR + "create_user": TEST_CREATOR, } TEST_NON_EXIST_FOREST_CLIENT_NUMBER = "99999999" def test_create_forest_client(db_pg_session: Session): new_forest_client = crud_forest_client.create_forest_client( - schemas.FamForestClientCreate(**TEST_FOERST_CLIENT_DATA), - db_pg_session + FamForestClientCreateSchema(**TEST_FOERST_CLIENT_DATA), db_pg_session + ) + assert ( + new_forest_client.forest_client_number + == TEST_FOERST_CLIENT_DATA["forest_client_number"] ) - assert new_forest_client.forest_client_number == \ - TEST_FOERST_CLIENT_DATA["forest_client_number"] found_forest_client = crud_forest_client.get_forest_client( - db_pg_session, - TEST_FOERST_CLIENT_DATA["forest_client_number"] + db_pg_session, TEST_FOERST_CLIENT_DATA["forest_client_number"] + ) + assert found_forest_client.client_number_id == new_forest_client.client_number_id + assert ( + found_forest_client.forest_client_number + == new_forest_client.forest_client_number ) - assert found_forest_client.client_number_id ==\ - new_forest_client.client_number_id - assert found_forest_client.forest_client_number == \ - new_forest_client.forest_client_number def test_get_forest_client(db_pg_session: Session): # get non exist forest client found_forest_client = crud_forest_client.get_forest_client( - db_pg_session, - TEST_NON_EXIST_FOREST_CLIENT_NUMBER + db_pg_session, TEST_NON_EXIST_FOREST_CLIENT_NUMBER ) assert found_forest_client is None # create a new forest client new_forest_client = crud_forest_client.create_forest_client( - schemas.FamForestClientCreate(**TEST_FOERST_CLIENT_DATA), - db_pg_session + FamForestClientCreateSchema(**TEST_FOERST_CLIENT_DATA), db_pg_session + ) + assert ( + new_forest_client.forest_client_number + == TEST_FOERST_CLIENT_DATA["forest_client_number"] ) - assert new_forest_client.forest_client_number == \ - TEST_FOERST_CLIENT_DATA["forest_client_number"] # get the new created forest client found_forest_client = crud_forest_client.get_forest_client( - db_pg_session, - TEST_FOERST_CLIENT_DATA["forest_client_number"] + db_pg_session, TEST_FOERST_CLIENT_DATA["forest_client_number"] + ) + assert ( + found_forest_client.forest_client_number + == TEST_FOERST_CLIENT_DATA["forest_client_number"] ) - assert found_forest_client.forest_client_number == \ - TEST_FOERST_CLIENT_DATA["forest_client_number"] def test_find_or_create(db_pg_session: Session): initial_all = db_pg_session.query(models.FamForestClient).all() # verify the target forest client not exists found_forest_client = crud_forest_client.get_forest_client( - db_pg_session, - TEST_FOERST_CLIENT_DATA["forest_client_number"] + db_pg_session, TEST_FOERST_CLIENT_DATA["forest_client_number"] ) assert found_forest_client is None # find or create with an non exists forest client crud_forest_client.find_or_create( db_pg_session, TEST_FOERST_CLIENT_DATA["forest_client_number"], - TEST_FOERST_CLIENT_DATA["create_user"] + TEST_FOERST_CLIENT_DATA["create_user"], ) # verify can find that forest client now found_forest_client = crud_forest_client.get_forest_client( - db_pg_session, - TEST_FOERST_CLIENT_DATA["forest_client_number"] + db_pg_session, TEST_FOERST_CLIENT_DATA["forest_client_number"] ) assert found_forest_client is not None after_add_all = db_pg_session.query(models.FamForestClient).all() @@ -84,7 +84,7 @@ def test_find_or_create(db_pg_session: Session): crud_forest_client.find_or_create( db_pg_session, TEST_FOERST_CLIENT_DATA["forest_client_number"], - TEST_FOERST_CLIENT_DATA["create_user"] + TEST_FOERST_CLIENT_DATA["create_user"], ) all_forest_clients = db_pg_session.query(models.FamForestClient).all() # verify no new forest client add diff --git a/server/backend/testspg/crud/test_crud_role.py b/server/backend/testspg/crud/test_crud_role.py index 9c28dcec4..eb31ccc3c 100644 --- a/server/backend/testspg/crud/test_crud_role.py +++ b/server/backend/testspg/crud/test_crud_role.py @@ -2,17 +2,20 @@ import logging import api.app.constants as constants -import api.app.schemas as schemas import pytest from api.app.crud import crud_forest_client, crud_role, crud_user_role from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session -from testspg.constants import (TEST_CREATOR, FOM_DEV_APPLICATION_ID, - FOM_DEV_REVIEWER_ROLE_ID, - FOM_DEV_SUBMITTER_ROLE_ID, - FOM_TEST_APPLICATION_ID, - NOT_EXIST_APPLICATION_ID, - NOT_EXIST_ROLE_ID) +from testspg.constants import ( + TEST_CREATOR, + FOM_DEV_APPLICATION_ID, + FOM_DEV_REVIEWER_ROLE_ID, + FOM_DEV_SUBMITTER_ROLE_ID, + FOM_TEST_APPLICATION_ID, + NOT_EXIST_APPLICATION_ID, + NOT_EXIST_ROLE_ID, +) +from api.app.schemas import FamRoleCreateSchema LOGGER = logging.getLogger(__name__) @@ -50,7 +53,7 @@ def test_get_role(db_pg_session: Session): def test_create_role(db_pg_session: Session): new_role = crud_role.create_role( - schemas.FamRoleCreate(**TEST_ROLE_CREATE), + FamRoleCreateSchema(**TEST_ROLE_CREATE), db_pg_session, ) assert new_role.role_name == TEST_NEW_ROLE @@ -66,7 +69,7 @@ def test_create_role(db_pg_session: Session): def test_create_role_duplicate(db_pg_session: Session): # create a role ininitally new_role = crud_role.create_role( - schemas.FamRoleCreate(**TEST_ROLE_CREATE), + FamRoleCreateSchema(**TEST_ROLE_CREATE), db_pg_session, ) assert new_role.role_name == TEST_NEW_ROLE @@ -74,7 +77,7 @@ def test_create_role_duplicate(db_pg_session: Session): # can not create the role with same role name, even with a different creator with pytest.raises(IntegrityError) as e: crud_role.create_role( - schemas.FamRoleCreate( + FamRoleCreateSchema( **{ "application_id": TEST_ROLE_CREATE["application_id"], "role_name": TEST_ROLE_CREATE["role_name"], @@ -90,15 +93,14 @@ def test_create_role_duplicate(db_pg_session: Session): def test_create_role_child_role_with_forest_client(db_pg_session: Session): new_child_role = crud_role.create_role( - schemas.FamRoleCreate( + FamRoleCreateSchema( **{ "parent_role_id": FOM_DEV_SUBMITTER_ROLE_ID, "application_id": FOM_DEV_APPLICATION_ID, "forest_client_number": TEST_FOREST_CLIENT_NUMBER, "role_name": "FOM_SUBMITTER_" + TEST_FOREST_CLIENT_NUMBER, "role_purpose": crud_user_role.construct_forest_client_role_purpose( - "PARENT_ROLE purpose", - TEST_FOREST_CLIENT_NUMBER + "PARENT_ROLE purpose", TEST_FOREST_CLIENT_NUMBER ), "create_user": TEST_CREATOR, "role_type_code": constants.RoleType.ROLE_TYPE_CONCRETE, @@ -118,25 +120,22 @@ def test_create_role_child_role_with_forest_client(db_pg_session: Session): # make sure that a forest client record exists in the database forest_client_from_db = crud_forest_client.get_forest_client( - db=db_pg_session, forest_client_number=TEST_FOREST_CLIENT_NUMBER) - assert ( - forest_client_from_db.forest_client_number == - TEST_FOREST_CLIENT_NUMBER + db=db_pg_session, forest_client_number=TEST_FOREST_CLIENT_NUMBER ) + assert forest_client_from_db.forest_client_number == TEST_FOREST_CLIENT_NUMBER def test_create_role_child_role_with_invalid_parent(db_pg_session: Session): with pytest.raises(IntegrityError) as e: crud_role.create_role( - schemas.FamRoleCreate( + FamRoleCreateSchema( **{ "parent_role_id": NOT_EXIST_ROLE_ID, "application_id": FOM_DEV_APPLICATION_ID, "forest_client_number": TEST_FOREST_CLIENT_NUMBER_TWO, "role_name": "FOM_SUBMITTER_" + TEST_FOREST_CLIENT_NUMBER_TWO, "role_purpose": crud_user_role.construct_forest_client_role_purpose( - "PARENT_ROLE purpose", - TEST_FOREST_CLIENT_NUMBER_TWO + "PARENT_ROLE purpose", TEST_FOREST_CLIENT_NUMBER_TWO ), "create_user": TEST_CREATOR, "role_type_code": constants.RoleType.ROLE_TYPE_CONCRETE, @@ -151,7 +150,7 @@ def test_create_role_same_role_for_different_application(db_pg_session: Session) copy_test_role_create = copy.deepcopy(TEST_ROLE_CREATE) copy_test_role_create["role_name"] = TEST_NEW_ROLE_TWO first_new_role = crud_role.create_role( - schemas.FamRoleCreate(**copy_test_role_create), + FamRoleCreateSchema(**copy_test_role_create), db_pg_session, ) assert first_new_role.role_name == TEST_NEW_ROLE_TWO @@ -165,7 +164,7 @@ def test_create_role_same_role_for_different_application(db_pg_session: Session) copy_test_role_create["application_id"] = FOM_TEST_APPLICATION_ID second_new_role = crud_role.create_role( - schemas.FamRoleCreate(**copy_test_role_create), + FamRoleCreateSchema(**copy_test_role_create), db_pg_session, ) assert second_new_role.role_name == TEST_NEW_ROLE_TWO @@ -186,32 +185,24 @@ def test_create_role_same_role_for_different_application(db_pg_session: Session) def test_get_role_by_role_name_and_app_id(db_pg_session: Session): # get with non existing role name found_role = crud_role.get_role_by_role_name_and_app_id( - db_pg_session, - "TEST_NON_ROLE", - FOM_DEV_APPLICATION_ID + db_pg_session, "TEST_NON_ROLE", FOM_DEV_APPLICATION_ID ) assert found_role is None # get with non existing application id found_role = crud_role.get_role_by_role_name_and_app_id( - db_pg_session, - "FOM_REVIEWER", - NOT_EXIST_APPLICATION_ID + db_pg_session, "FOM_REVIEWER", NOT_EXIST_APPLICATION_ID ) assert found_role is None # get with role not in application found_role = crud_role.get_role_by_role_name_and_app_id( - db_pg_session, - "FOM_REVIEWER", - 1 # FAM APPLICATION ID + db_pg_session, "FOM_REVIEWER", 1 # FAM APPLICATION ID ) assert found_role is None # get existing role found_role = crud_role.get_role_by_role_name_and_app_id( - db_pg_session, - "FOM_REVIEWER", - FOM_DEV_APPLICATION_ID + db_pg_session, "FOM_REVIEWER", FOM_DEV_APPLICATION_ID ) assert found_role.role_name == "FOM_REVIEWER" diff --git a/server/backend/testspg/crud/test_crud_user.py b/server/backend/testspg/crud/test_crud_user.py index 3f3d1d5fc..d0fe8308a 100644 --- a/server/backend/testspg/crud/test_crud_user.py +++ b/server/backend/testspg/crud/test_crud_user.py @@ -1,20 +1,24 @@ import logging -import api.app.schemas as schemas +from api.app.schemas import FamUserSchema, TargetUserSchema import pytest from api.app.constants import CURRENT_TERMS_AND_CONDITIONS_VERSION, UserType from api.app.crud import crud_user from api.app.models.model import FamUserTermsConditions from sqlalchemy import insert from sqlalchemy.orm import Session -from testspg.constants import (TEST_CREATOR, TEST_NEW_BCEID_USER, - TEST_NEW_USER, TEST_NOT_EXIST_USER_TYPE, - USER_NAME_BCEID_LOAD_3_TEST, - USER_NAME_BCEID_LOAD_3_TEST_CHILD_1) +from testspg.constants import ( + TEST_CREATOR, + TEST_NEW_BCEID_USER, + TEST_NEW_USER, + TEST_NOT_EXIST_USER_TYPE, + USER_NAME_BCEID_LOAD_3_TEST, + USER_NAME_BCEID_LOAD_3_TEST_CHILD_1, +) LOGGER = logging.getLogger(__name__) NEW_USERNAME = "NEW_USERNAME" -NEW_USER_REQUEST = schemas.FamUser(**TEST_NEW_USER) +NEW_USER_REQUEST = FamUserSchema(**TEST_NEW_USER) def test_get_users(db_pg_session: Session): @@ -41,7 +45,7 @@ def test_get_user_by_domain_and_name(db_pg_session: Session): assert fam_user is None # create a new user and find it and verify found - request_user = schemas.FamUser(**TEST_NEW_USER) + request_user = FamUserSchema(**TEST_NEW_USER) new_user = crud_user.create_user(fam_user=request_user, db=db_pg_session) fam_user = crud_user.get_user_by_domain_and_name( db_pg_session, TEST_NEW_USER["user_type_code"], TEST_NEW_USER["user_name"] @@ -191,25 +195,41 @@ def test_update_user_name(db_pg_session: Session): @pytest.mark.parametrize( "new_user_initial_config, update_properties", [ - (TEST_NEW_BCEID_USER, { - "first_name": "test", "last_name": "bceid", "email": "becid_user@test.com", "business_guid": "test_business_guid" - }), - (TEST_NEW_USER, { # IDIR - "first_name": "test", "last_name": "idir", "email": "idir_user@test.com", "business_guid": None - }), - (TEST_NEW_USER, { - "first_name": None, "last_name": None, "email": None, "business_guid": None - }) - ] + ( + TEST_NEW_BCEID_USER, + { + "first_name": "test", + "last_name": "bceid", + "email": "becid_user@test.com", + "business_guid": "test_business_guid", + }, + ), + ( + TEST_NEW_USER, + { # IDIR + "first_name": "test", + "last_name": "idir", + "email": "idir_user@test.com", + "business_guid": None, + }, + ), + ( + TEST_NEW_USER, + { + "first_name": None, + "last_name": None, + "email": None, + "business_guid": None, + }, + ), + ], ) def test_update_user_properties_from_verified_target_user( - new_user_initial_config, - update_properties, - db_pg_session: Session + new_user_initial_config, update_properties, db_pg_session: Session ): # create a new user new_user = crud_user.create_user( - schemas.FamUser(**new_user_initial_config), db_pg_session + FamUserSchema(**new_user_initial_config), db_pg_session ) # verify new user is created with no additoinal properties set. found_user = crud_user.get_user_by_domain_and_name( @@ -223,15 +243,9 @@ def test_update_user_properties_from_verified_target_user( assert new_user.email is None assert new_user.business_guid is None - target_user = schemas.TargetUser( - **new_user_initial_config, - **update_properties - ) + target_user = TargetUserSchema(**new_user_initial_config, **update_properties) updated_user = crud_user.update_user_properties_from_verified_target_user( - db_pg_session, - found_user.user_id, - target_user, - found_user.create_user + db_pg_session, found_user.user_id, target_user, found_user.create_user ) assert updated_user.user_id == found_user.user_id assert updated_user.first_name == update_properties.get("first_name") @@ -240,9 +254,7 @@ def test_update_user_properties_from_verified_target_user( assert updated_user.business_guid == update_properties.get("business_guid") -def test_fetch_initial_requester_info_can_join_terms_conditions( - db_pg_session: Session -): +def test_fetch_initial_requester_info_can_join_terms_conditions(db_pg_session: Session): bceid_user = crud_user.get_user_by_domain_and_name( db_pg_session, UserType.BCEID, @@ -255,11 +267,13 @@ def test_fetch_initial_requester_info_can_join_terms_conditions( # bceid_user accepts FamUserTermsConditions db_pg_session.execute( insert(FamUserTermsConditions), - [{ - "user_id": bceid_user.user_id, - "version": CURRENT_TERMS_AND_CONDITIONS_VERSION, - "create_user": TEST_CREATOR, - }] + [ + { + "user_id": bceid_user.user_id, + "version": CURRENT_TERMS_AND_CONDITIONS_VERSION, + "create_user": TEST_CREATOR, + } + ], ) # this seems important, otherwise newly added attribute (T&C) won't @@ -277,7 +291,7 @@ def test_fetch_initial_requester_info_can_join_terms_conditions( def test_fetch_initial_requester_info_can_join_its_delegated_admin_record( - db_pg_session: Session + db_pg_session: Session, ): # the db user for backend/server has no permission to insert record # into `app_fam.fam_access_control_privileg` table. So use diff --git a/server/backend/testspg/crud/test_crud_user_role.py b/server/backend/testspg/crud/test_crud_user_role.py index 91984c013..16ce5d3cb 100644 --- a/server/backend/testspg/crud/test_crud_user_role.py +++ b/server/backend/testspg/crud/test_crud_user_role.py @@ -1,16 +1,20 @@ import copy import logging -import api.app.schemas as schemas +from api.app.schemas import TargetUserSchema, FamUserRoleAssignmentCreateSchema import pytest from api.app.crud import crud_role, crud_user_role from fastapi import HTTPException from pydantic import ValidationError from sqlalchemy.orm import Session -from testspg.constants import (ACCESS_GRANT_FOM_DEV_CR_IDIR, - FOM_DEV_REVIEWER_ROLE_ID, - FOM_DEV_SUBMITTER_ROLE_ID, NOT_EXIST_ROLE_ID, - TEST_CREATOR, TEST_NOT_EXIST_USER_TYPE) +from testspg.constants import ( + ACCESS_GRANT_FOM_DEV_CR_IDIR, + FOM_DEV_REVIEWER_ROLE_ID, + FOM_DEV_SUBMITTER_ROLE_ID, + NOT_EXIST_ROLE_ID, + TEST_CREATOR, + TEST_NOT_EXIST_USER_TYPE, +) LOGGER = logging.getLogger(__name__) @@ -19,49 +23,36 @@ def test_create_user_role_with_role_not_exists(db_pg_session: Session): - user_role = \ - copy.deepcopy(ACCESS_GRANT_FOM_DEV_CR_IDIR) + user_role = copy.deepcopy(ACCESS_GRANT_FOM_DEV_CR_IDIR) user_role["role_id"] = NOT_EXIST_ROLE_ID with pytest.raises(HTTPException) as e: - mocked_target_user = schemas.TargetUser(**user_role) + mocked_target_user = TargetUserSchema(**user_role) assert crud_user_role.create_user_role_assignment_many( db_pg_session, - schemas.FamUserRoleAssignmentCreate(**user_role), + FamUserRoleAssignmentCreateSchema(**user_role), mocked_target_user, - TEST_CREATOR + TEST_CREATOR, ) assert str(e._excinfo).find("Role id ") != -1 assert str(e._excinfo).find("does not exist") != -1 -def test_create_user_role_with_user_types_not_exists( - db_pg_session: Session -): +def test_create_user_role_with_user_types_not_exists(db_pg_session: Session): # Create a user_type_code with not supported type. - user_role = \ - copy.deepcopy(ACCESS_GRANT_FOM_DEV_CR_IDIR) - user_role["user_type_code"] = \ - TEST_NOT_EXIST_USER_TYPE + user_role = copy.deepcopy(ACCESS_GRANT_FOM_DEV_CR_IDIR) + user_role["user_type_code"] = TEST_NOT_EXIST_USER_TYPE with pytest.raises(ValidationError) as e: - assert schemas.FamUserRoleAssignmentCreate(**user_role) - assert ( - str(e.value).find( - "Input should be 'I' or 'B'" - ) - != -1 - ) + assert FamUserRoleAssignmentCreateSchema(**user_role) + assert str(e.value).find("Input should be 'I' or 'B'") != -1 LOGGER.debug(f"Expected exception raised: {e.value}") def test_create(db_pg_session: Session): crud_user_role.create( - db_pg_session, - TEST_USER_ID, - FOM_DEV_REVIEWER_ROLE_ID, - TEST_CREATOR + db_pg_session, TEST_USER_ID, FOM_DEV_REVIEWER_ROLE_ID, TEST_CREATOR ) # verify user role created @@ -84,10 +75,7 @@ def test_get_use_role_by_user_id_and_role_id(db_pg_session: Session): # create a user role assignment user_role_xref = crud_user_role.create( - db_pg_session, - TEST_USER_ID, - FOM_DEV_REVIEWER_ROLE_ID, - TEST_CREATOR + db_pg_session, TEST_USER_ID, FOM_DEV_REVIEWER_ROLE_ID, TEST_CREATOR ) xref_dict = user_role_xref.__dict__ @@ -102,16 +90,14 @@ def test_get_use_role_by_user_id_and_role_id(db_pg_session: Session): def test_construct_forest_client_role_name(): result = crud_user_role.construct_forest_client_role_name( - "PARENT_ROLE", - TEST_FOREST_CLIENT_NUMBER + "PARENT_ROLE", TEST_FOREST_CLIENT_NUMBER ) assert result == "PARENT_ROLE_" + TEST_FOREST_CLIENT_NUMBER def test_construct_forest_client_role_purpose(): result = crud_user_role.construct_forest_client_role_purpose( - "PARENT_ROLE purpose", - TEST_FOREST_CLIENT_NUMBER + "PARENT_ROLE purpose", TEST_FOREST_CLIENT_NUMBER ) assert result == "PARENT_ROLE purpose for " + TEST_FOREST_CLIENT_NUMBER @@ -120,10 +106,7 @@ def test_find_or_create_forest_client_child_role(db_pg_session: Session): # create child role for abstract parent role test_role = crud_role.get_role(db_pg_session, FOM_DEV_SUBMITTER_ROLE_ID) result = crud_user_role.find_or_create_forest_client_child_role( - db_pg_session, - TEST_FOREST_CLIENT_NUMBER, - test_role, - TEST_CREATOR + db_pg_session, TEST_FOREST_CLIENT_NUMBER, test_role, TEST_CREATOR ) # verify child role created child_role_one = crud_role.get_role(db_pg_session, result.role_id) @@ -131,15 +114,9 @@ def test_find_or_create_forest_client_child_role(db_pg_session: Session): assert child_role_one.role_name == "FOM_SUBMITTER_" + TEST_FOREST_CLIENT_NUMBER # create child role for concrete parent role - test_concrete_role = crud_role.get_role( - db_pg_session, - FOM_DEV_REVIEWER_ROLE_ID - ) + test_concrete_role = crud_role.get_role(db_pg_session, FOM_DEV_REVIEWER_ROLE_ID) result = crud_user_role.find_or_create_forest_client_child_role( - db_pg_session, - TEST_FOREST_CLIENT_NUMBER, - test_concrete_role, - TEST_CREATOR + db_pg_session, TEST_FOREST_CLIENT_NUMBER, test_concrete_role, TEST_CREATOR ) # verify child role created child_role_two = crud_role.get_role(db_pg_session, result.role_id) diff --git a/server/backend/testspg/crud/test_crud_utils.py b/server/backend/testspg/crud/test_crud_utils.py index dd1c087e3..79d49d137 100644 --- a/server/backend/testspg/crud/test_crud_utils.py +++ b/server/backend/testspg/crud/test_crud_utils.py @@ -1,8 +1,7 @@ import logging - -import api.app.models.model as model -import api.app.schemas as schemas import pytest +import api.app.models.model as model +from api.app.schemas import FamUserSchema from api.app.crud import crud_user, crud_utils from sqlalchemy.orm import Session from testspg.constants import TEST_NEW_USER @@ -10,11 +9,14 @@ LOGGER = logging.getLogger(__name__) -@pytest.mark.parametrize("str_list_to_test, expcted_str_list", [ - (['fam', 'fom', 'aws'], ['FAM', 'FOM', 'AWS']), - (['FAM', 'FOM', 'AWS'], ['FAM', 'FOM', 'AWS']), - (None, None) -]) +@pytest.mark.parametrize( + "str_list_to_test, expcted_str_list", + [ + (["fam", "fom", "aws"], ["FAM", "FOM", "AWS"]), + (["FAM", "FOM", "AWS"], ["FAM", "FOM", "AWS"]), + (None, None), + ], +) def test_to_upper(str_list_to_test, expcted_str_list): result = crud_utils.to_upper(str_list_to_test) if result: @@ -24,24 +26,26 @@ def test_to_upper(str_list_to_test, expcted_str_list): assert result == expcted_str_list -@pytest.mark.parametrize("str_list_to_test, str_to_replace, replace_with, expcted_str_list", [ - ( - ['FAM_ADMIN', 'FOM_DEV_ADMIN', 'FOM_TEST_ADMIN'], - "_ADMIN", "", - ['FAM', 'FOM_DEV', 'FOM_TEST'] - ), - ( - ['FAM_ACCESS', 'FOM_DEV', 'FOM'], - "_ACCESS", "_ADMIN", - ['FAM_ADMIN', 'FOM_DEV', 'FOM'] - ), - (None, "something", "some_other_thing", None) -]) +@pytest.mark.parametrize( + "str_list_to_test, str_to_replace, replace_with, expcted_str_list", + [ + ( + ["FAM_ADMIN", "FOM_DEV_ADMIN", "FOM_TEST_ADMIN"], + "_ADMIN", + "", + ["FAM", "FOM_DEV", "FOM_TEST"], + ), + ( + ["FAM_ACCESS", "FOM_DEV", "FOM"], + "_ACCESS", + "_ADMIN", + ["FAM_ADMIN", "FOM_DEV", "FOM"], + ), + (None, "something", "some_other_thing", None), + ], +) def test_replace_str_list( - str_list_to_test, - str_to_replace, - replace_with, - expcted_str_list + str_list_to_test, str_to_replace, replace_with, expcted_str_list ): result = crud_utils.replace_str_list(str_list_to_test, str_to_replace, replace_with) if result: @@ -77,10 +81,8 @@ def test_get_next(db_pg_session: Session): assert next_value_before > 0 # now add record and test again that the number is greater - request_user = schemas.FamUser( - **TEST_NEW_USER - ) + request_user = FamUserSchema(**TEST_NEW_USER) new_user = crud_user.create_user(fam_user=request_user, db=db_pg_session) next_value_after = crud_utils.get_next(db=db_pg_session, model=fam_user_model) - assert next_value_after > next_value_before \ No newline at end of file + assert next_value_after > next_value_before diff --git a/server/backend/testspg/crud/test_gc_notify.py b/server/backend/testspg/crud/test_gc_notify.py index 8429577e4..fdcd2d191 100644 --- a/server/backend/testspg/crud/test_gc_notify.py +++ b/server/backend/testspg/crud/test_gc_notify.py @@ -3,7 +3,7 @@ import pytest from requests import HTTPError from api.app.integration.gc_notify import GCNotifyEmailService -from api.app.schemas import GCNotifyGrantAccessEmailParam +from api.app.schemas import GCNotifyGrantAccessEmailParamSchema LOGGER = logging.getLogger(__name__) @@ -34,7 +34,7 @@ def test_with_invalid_email_address(self): The test checks the error handling when provide an invalid email address """ with pytest.raises(Exception) as excinfo: - _test_params = GCNotifyGrantAccessEmailParam( + _test_params = GCNotifyGrantAccessEmailParamSchema( **{ "user_name": "cmeng", "application_name": "fam", diff --git a/server/backend/testspg/crud/test_idim_proxy_service.py b/server/backend/testspg/crud/test_idim_proxy_service.py index 6381bc5e7..a54542749 100644 --- a/server/backend/testspg/crud/test_idim_proxy_service.py +++ b/server/backend/testspg/crud/test_idim_proxy_service.py @@ -6,17 +6,22 @@ from api.app.constants import IdimSearchUserParamType from api.app.integration.idim_proxy import IdimProxyService from api.app.jwt_validation import ERROR_PERMISSION_REQUIRED -from api.app.schemas import (IdimProxyBceidSearchParam, IdimProxySearchParam, - Requester) +from api.app.schemas import ( + IdimProxyBceidSearchParamSchema, + IdimProxySearchParamSchema, + RequesterSchema, +) from fastapi import HTTPException from requests import HTTPError -from testspg.constants import (TEST_BCEID_REQUESTER_DICT, - TEST_IDIR_REQUESTER_DICT, - USER_GUID_BCEID_LOAD_3_TEST, - USER_GUID_BCEID_LOAD_3_TEST_CHILD_1, - USER_NAME_BCEID_LOAD_2_TEST, - USER_NAME_BCEID_LOAD_3_TEST, - USER_NAME_BCEID_LOAD_3_TEST_CHILD_1) +from testspg.constants import ( + TEST_BCEID_REQUESTER_DICT, + TEST_IDIR_REQUESTER_DICT, + USER_GUID_BCEID_LOAD_3_TEST, + USER_GUID_BCEID_LOAD_3_TEST_CHILD_1, + USER_NAME_BCEID_LOAD_2_TEST, + USER_NAME_BCEID_LOAD_3_TEST, + USER_NAME_BCEID_LOAD_3_TEST_CHILD_1, +) LOGGER = logging.getLogger(__name__) @@ -29,27 +34,29 @@ class TestIdimProxyServiceClass(object): """ # Valid test IDIR user. - search_params_idir = IdimProxySearchParam( - **{"userId": "ianliu"} - ) + search_params_idir = IdimProxySearchParamSchema(**{"userId": "ianliu"}) # Valid test Business Bceid user - search_params_business_bceid_same_org = IdimProxyBceidSearchParam( - **{"searchUserBy": IdimSearchUserParamType.USER_ID, - "searchValue": USER_NAME_BCEID_LOAD_3_TEST_CHILD_1} + search_params_business_bceid_same_org = IdimProxyBceidSearchParamSchema( + **{ + "searchUserBy": IdimSearchUserParamType.USER_ID, + "searchValue": USER_NAME_BCEID_LOAD_3_TEST_CHILD_1, + } ) # Valid test Business Bceid user. - search_params_business_bceid_diff_org = IdimProxyBceidSearchParam( - **{"searchUserBy": IdimSearchUserParamType.USER_ID, - "searchValue": USER_NAME_BCEID_LOAD_2_TEST} + search_params_business_bceid_diff_org = IdimProxyBceidSearchParamSchema( + **{ + "searchUserBy": IdimSearchUserParamType.USER_ID, + "searchValue": USER_NAME_BCEID_LOAD_2_TEST, + } ) def setup_class(self): - # local valid mock requester - self.requester_idir = Requester(**TEST_IDIR_REQUESTER_DICT) + # local valid mock RequesterSchema + self.requester_idir = RequesterSchema(**TEST_IDIR_REQUESTER_DICT) self.requester_idir.user_guid = TEST_IDIR_USER_GUID # This tester uses "LOAD-3-TEST" - self.requester_business_bceid = Requester(**TEST_BCEID_REQUESTER_DICT) + self.requester_business_bceid = RequesterSchema(**TEST_BCEID_REQUESTER_DICT) def test_verify_init(self): idim_proxy_api = IdimProxyService(self.requester_idir) @@ -101,7 +108,7 @@ def test_search_idir__user_not_exist_no_user_found(self): assert search_result["found"] == False - # --- Performs search_business_bceid user (IDIR requester/BCeID requester) --- + # --- Performs search_business_bceid user (IDIR Requester/BCeID Requester) --- def test_search_bceid__user_not_exist_not_found(self): idim_proxy_api = IdimProxyService(self.requester_idir) @@ -115,7 +122,7 @@ def test_search_bceid__user_not_exist_not_found(self): def test_search_bceid__idir_requester_by_userid_search_pass(self): idim_proxy_api = IdimProxyService(copy.deepcopy(self.requester_idir)) - # for IDIR requester, it does not matter the "business organization" for BCeID user. + # for IDIR Requester, it does not matter the "business organization" for BCeID user. search_result = idim_proxy_api.search_business_bceid( self.search_params_business_bceid_same_org ) @@ -161,10 +168,12 @@ def test_search_bceid__bceid_requester_by_userid_diff_org_search_not_allow(self) def test_search_bceid__idir_requester_by_user_guid_search_pass(self): idim_proxy_api = IdimProxyService(self.requester_idir) - # for IDIR requester, it does not matter the "business organization" for BCeID user. - search_params = IdimProxyBceidSearchParam( - **{"searchUserBy": IdimSearchUserParamType.USER_GUID, - "searchValue": USER_GUID_BCEID_LOAD_3_TEST} + # for IDIR Requester, it does not matter the "business organization" for BCeID user. + search_params = IdimProxyBceidSearchParamSchema( + **{ + "searchUserBy": IdimSearchUserParamType.USER_GUID, + "searchValue": USER_GUID_BCEID_LOAD_3_TEST, + } ) search_result = idim_proxy_api.search_business_bceid(search_params) @@ -180,13 +189,15 @@ def test_search_bceid__idir_requester_by_user_guid_search_pass(self): reason="Search BCeID by user_guid is not enabled. Enable this test when ready." ) def test_search_bceid__bceid_requester_by_user_guid_same_org_search_pass(self): - # business bceid requester + # business bceid Requester idim_proxy_api = IdimProxyService(self.requester_business_bceid) # This search_params uses "TEST-3-LOAD-CHILD-1", same org with "LOAD-3-TEST" - search_params = IdimProxyBceidSearchParam( - **{"searchUserBy": IdimSearchUserParamType.USER_GUID, - "searchValue": USER_GUID_BCEID_LOAD_3_TEST_CHILD_1} + search_params = IdimProxyBceidSearchParamSchema( + **{ + "searchUserBy": IdimSearchUserParamType.USER_GUID, + "searchValue": USER_GUID_BCEID_LOAD_3_TEST_CHILD_1, + } ) search_result = idim_proxy_api.search_business_bceid(search_params) diff --git a/server/backend/testspg/crud/validator/test_user_validator.py b/server/backend/testspg/crud/validator/test_user_validator.py index cdb3fc294..6ae7b0342 100644 --- a/server/backend/testspg/crud/validator/test_user_validator.py +++ b/server/backend/testspg/crud/validator/test_user_validator.py @@ -1,16 +1,22 @@ import logging import pytest -from api.app.constants import ERROR_CODE_INVALID_REQUEST_PARAMETER, UserType, ApiInstanceEnv +from api.app.constants import ( + ERROR_CODE_INVALID_REQUEST_PARAMETER, + UserType, + ApiInstanceEnv, +) from api.app.crud.validator.target_user_validator import TargetUserValidator from api.app.integration.idim_proxy import IdimProxyService -from api.app.schemas import Requester, TargetUser +from api.app.schemas import RequesterSchema, TargetUserSchema from fastapi import HTTPException from mock import patch -from testspg.constants import (BUSINESS_GUID_BCEID_LOAD_2_TEST, - TEST_IDIR_REQUESTER_DICT, - USER_GUID_BCEID_LOAD_2_TEST, - USER_NAME_BCEID_LOAD_2_TEST) +from testspg.constants import ( + BUSINESS_GUID_BCEID_LOAD_2_TEST, + TEST_IDIR_REQUESTER_DICT, + USER_GUID_BCEID_LOAD_2_TEST, + USER_NAME_BCEID_LOAD_2_TEST, +) LOGGER = logging.getLogger(__name__) @@ -41,14 +47,16 @@ class TestUserValidatorClass(object): """ def setup_class(self): - # local valid mock requester - self.requester_idir = Requester(**TEST_IDIR_REQUESTER_DICT) + # local valid mock RequesterSchema + self.requester_idir = RequesterSchema(**TEST_IDIR_REQUESTER_DICT) @patch.object(IdimProxyService, "search_idir") def test_verify_user_exist_idir(self, mock_search_idir): mock_search_idir.return_value = MOCK_SERACH_IDIR_RETURN - target_user = TargetUser(**TEST_IDIR_REQUESTER_DICT) - target_user_validaor = TargetUserValidator(self.requester_idir, target_user, ApiInstanceEnv.TEST) + target_user = TargetUserSchema(**TEST_IDIR_REQUESTER_DICT) + target_user_validaor = TargetUserValidator( + self.requester_idir, target_user, ApiInstanceEnv.TEST + ) verified_target_user = target_user_validaor.verify_user_exist() # test the verified target user assert verified_target_user.user_guid == target_user.user_guid @@ -58,10 +66,12 @@ def test_verify_user_exist_idir(self, mock_search_idir): @patch.object(IdimProxyService, "search_idir") def test_verify_user_exist_idir_not_found(self, mock_search_idir): mock_search_idir.return_value = {**MOCK_SERACH_IDIR_RETURN, "found": False} - target_user = TargetUser( + target_user = TargetUserSchema( **{**TEST_IDIR_REQUESTER_DICT, "user_name": "USER_NOT_EXISTS"} ) - target_user_validaor = TargetUserValidator(self.requester_idir, target_user, ApiInstanceEnv.TEST) + target_user_validaor = TargetUserValidator( + self.requester_idir, target_user, ApiInstanceEnv.TEST + ) with pytest.raises(HTTPException) as e: target_user_validaor.verify_user_exist() @@ -79,13 +89,15 @@ def test_verify_user_exist_idir_not_found(self, mock_search_idir): @patch.object(IdimProxyService, "search_idir") def test_verify_user_exist_idir_mismatch_info(self, mock_search_idir): mock_search_idir.return_value = MOCK_SERACH_IDIR_RETURN - target_user = TargetUser( + target_user = TargetUserSchema( **{ **TEST_IDIR_REQUESTER_DICT, "user_guid": "USERGUIDNOTEXISTSPOJHSLEJFNSEKSL", } ) - target_user_validaor = TargetUserValidator(self.requester_idir, target_user, ApiInstanceEnv.TEST) + target_user_validaor = TargetUserValidator( + self.requester_idir, target_user, ApiInstanceEnv.TEST + ) with pytest.raises(HTTPException) as e: target_user_validaor.verify_user_exist() @@ -101,8 +113,10 @@ def test_verify_user_exist_idir_mismatch_info(self, mock_search_idir): @patch.object(IdimProxyService, "search_business_bceid") def test_verify_user_exist_bceid(self, mock_search_business_bceid): mock_search_business_bceid.return_value = MOCK_SERACH_BCEID_RETURN - target_user = TargetUser(**TEST_TARGET_USER_BCEID_LOAD_2) - target_user_validaor = TargetUserValidator(self.requester_idir, target_user, ApiInstanceEnv.TEST) + target_user = TargetUserSchema(**TEST_TARGET_USER_BCEID_LOAD_2) + target_user_validaor = TargetUserValidator( + self.requester_idir, target_user, ApiInstanceEnv.TEST + ) verified_target_user = target_user_validaor.verify_user_exist() # test the verified target user, business guid is added assert verified_target_user.user_guid == target_user.user_guid @@ -116,13 +130,15 @@ def test_verify_user_exist_bceid_not_found(self, mock_search_business_bceid): **MOCK_SERACH_BCEID_RETURN, "found": False, } - target_user = TargetUser( + target_user = TargetUserSchema( **{ **TEST_TARGET_USER_BCEID_LOAD_2, "user_guid": "USERGUIDNOTEXISTSPOJHSLEJFNSEKSL", } ) - target_user_validaor = TargetUserValidator(self.requester_idir, target_user, ApiInstanceEnv.TEST) + target_user_validaor = TargetUserValidator( + self.requester_idir, target_user, ApiInstanceEnv.TEST + ) with pytest.raises(HTTPException) as e: target_user_validaor.verify_user_exist() @@ -140,13 +156,15 @@ def test_verify_user_exist_bceid_not_found(self, mock_search_business_bceid): @patch.object(IdimProxyService, "search_business_bceid") def test_verify_user_exist_bceid_mismatch_info(self, mock_search_business_bceid): mock_search_business_bceid.return_value = MOCK_SERACH_BCEID_RETURN - target_user = TargetUser( + target_user = TargetUserSchema( **{ **TEST_TARGET_USER_BCEID_LOAD_2, "user_name": "USER_NOT_EXISTS", } ) - target_user_validaor = TargetUserValidator(self.requester_idir, target_user, ApiInstanceEnv.TEST) + target_user_validaor = TargetUserValidator( + self.requester_idir, target_user, ApiInstanceEnv.TEST + ) with pytest.raises(HTTPException) as e: target_user_validaor.verify_user_exist() diff --git a/server/backend/testspg/router/test_router_forest_client.py b/server/backend/testspg/router/test_router_forest_client.py index a0283d0fa..10153860f 100644 --- a/server/backend/testspg/router/test_router_forest_client.py +++ b/server/backend/testspg/router/test_router_forest_client.py @@ -6,69 +6,74 @@ import testspg.jwt_utils as jwt_utils from api.app.constants import FamForestClientStatusType from api.app.main import apiPrefix -from api.app.schemas import FamForestClient +from api.app.schemas import FamForestClientSchema from fastapi.testclient import TestClient -from testspg.constants import (FC_NUMBER_EXISTS_ACTIVE_00000001, - FC_NUMBER_EXISTS_DEACTIVATED, - FC_NUMBER_EXISTS_DECEASED, - FC_NUMBER_EXISTS_RECEIVERSHIP, - FC_NUMBER_EXISTS_SUSPENDED, - FOM_DEV_APPLICATION_ID) +from testspg.constants import ( + FC_NUMBER_EXISTS_ACTIVE_00000001, + FC_NUMBER_EXISTS_DEACTIVATED, + FC_NUMBER_EXISTS_DECEASED, + FC_NUMBER_EXISTS_RECEIVERSHIP, + FC_NUMBER_EXISTS_SUSPENDED, + FOM_DEV_APPLICATION_ID, +) LOGGER = logging.getLogger(__name__) dummy_test_application_id_search_param = FOM_DEV_APPLICATION_ID endPoint_search = f"{apiPrefix}/forest_clients/search?application_id={dummy_test_application_id_search_param}" -@pytest.mark.parametrize("client_id_to_test, expcted_error_type", [ - ("11", "string_too_short"), - ("000001011", "string_too_long") -]) +@pytest.mark.parametrize( + "client_id_to_test, expcted_error_type", + [("11", "string_too_short"), ("000001011", "string_too_long")], +) def test_search_client_number_invalid_length_error( client_id_to_test, expcted_error_type, test_client_fixture: starlette.testclient.TestClient, - test_rsa_key + test_rsa_key, ): invalid_length_param = f"&client_number={client_id_to_test}" # less than 8 digits test_end_point = endPoint_search + invalid_length_param LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) - response = test_client_fixture.get(f"{test_end_point}", headers=jwt_utils.headers(token)) + response = test_client_fixture.get( + f"{test_end_point}", headers=jwt_utils.headers(token) + ) data = response.json() LOGGER.debug(f"data: {data}") assert expcted_error_type in data["detail"][0]["type"] def test_search_client_number_correct_length_not_valid_format( - test_client_fixture: starlette.testclient.TestClient, - test_rsa_key + test_client_fixture: starlette.testclient.TestClient, test_rsa_key ): invalid_param = "&client_number=001a2b3d" test_end_point = endPoint_search + invalid_param LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) - response = test_client_fixture.get(f"{test_end_point}", headers=jwt_utils.headers(token)) + response = test_client_fixture.get( + f"{test_end_point}", headers=jwt_utils.headers(token) + ) data = response.json() assert len(data) == 0 # Expect empty def test_search_client_number_not_exists_noresult( - test_client_fixture: TestClient, - test_rsa_key + test_client_fixture: TestClient, test_rsa_key ): invalid_param = "&client_number=99999999" test_end_point = endPoint_search + invalid_param LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) - response = test_client_fixture.get(f"{test_end_point}", headers=jwt_utils.headers(token)) + response = test_client_fixture.get( + f"{test_end_point}", headers=jwt_utils.headers(token) + ) data = response.json() assert len(data) == 0 # Expect empty def test_search_client_number_exists_with_one_result( - test_client_fixture: TestClient, - test_rsa_key + test_client_fixture: TestClient, test_rsa_key ): """ Client "00000001" have ACT (Active) status. @@ -77,14 +82,18 @@ def test_search_client_number_exists_with_one_result( test_end_point = endPoint_search + f"&client_number={exist_forest_client_number}" LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) - response = test_client_fixture.get(f"{test_end_point}", headers=jwt_utils.headers(token)) + response = test_client_fixture.get( + f"{test_end_point}", headers=jwt_utils.headers(token) + ) data = response.json() assert len(data) == 1 # Expect only 1 - fc: FamForestClient + fc: FamForestClientSchema try: - fc = FamForestClient(**data[0]) + fc = FamForestClientSchema(**data[0]) except Exception: - assert False # If response data conversion fails, something is wrong in structure. + assert ( + False # If response data conversion fails, something is wrong in structure. + ) assert hasattr(fc, "client_name") assert hasattr(fc, "forest_client_number") @@ -92,23 +101,48 @@ def test_search_client_number_exists_with_one_result( assert fc.forest_client_number == exist_forest_client_number -@pytest.mark.parametrize("client_id_to_test, expcted_status", [ - (FC_NUMBER_EXISTS_ACTIVE_00000001, - {"code": FamForestClientStatusType.ACTIVE, "description": constants.DESCRIPTION_ACTIVE}), - (FC_NUMBER_EXISTS_DEACTIVATED, - {"code": FamForestClientStatusType.INACTIVE, "description": constants.DESCRIPTION_INACTIVE}), - (FC_NUMBER_EXISTS_DECEASED, - {"code": FamForestClientStatusType.INACTIVE, "description": constants.DESCRIPTION_INACTIVE}), - (FC_NUMBER_EXISTS_RECEIVERSHIP, - {"code": FamForestClientStatusType.INACTIVE, "description": constants.DESCRIPTION_INACTIVE}), - (FC_NUMBER_EXISTS_SUSPENDED, - {"code": FamForestClientStatusType.INACTIVE, "description": constants.DESCRIPTION_INACTIVE}), -]) +@pytest.mark.parametrize( + "client_id_to_test, expcted_status", + [ + ( + FC_NUMBER_EXISTS_ACTIVE_00000001, + { + "code": FamForestClientStatusType.ACTIVE, + "description": constants.DESCRIPTION_ACTIVE, + }, + ), + ( + FC_NUMBER_EXISTS_DEACTIVATED, + { + "code": FamForestClientStatusType.INACTIVE, + "description": constants.DESCRIPTION_INACTIVE, + }, + ), + ( + FC_NUMBER_EXISTS_DECEASED, + { + "code": FamForestClientStatusType.INACTIVE, + "description": constants.DESCRIPTION_INACTIVE, + }, + ), + ( + FC_NUMBER_EXISTS_RECEIVERSHIP, + { + "code": FamForestClientStatusType.INACTIVE, + "description": constants.DESCRIPTION_INACTIVE, + }, + ), + ( + FC_NUMBER_EXISTS_SUSPENDED, + { + "code": FamForestClientStatusType.INACTIVE, + "description": constants.DESCRIPTION_INACTIVE, + }, + ), + ], +) def test_search_client_number_with_status_mapping_correctly( - client_id_to_test, - expcted_status, - test_client_fixture: TestClient, - test_rsa_key + client_id_to_test, expcted_status, test_client_fixture: TestClient, test_rsa_key ): """ Forest Client API has following status codes. @@ -123,12 +157,14 @@ def test_search_client_number_with_status_mapping_correctly( test_end_point = endPoint_search + f"&client_number={client_id_to_test}" LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) - response = test_client_fixture.get(f"{test_end_point}", headers=jwt_utils.headers(token)) + response = test_client_fixture.get( + f"{test_end_point}", headers=jwt_utils.headers(token) + ) data = response.json() assert len(data) == 1 # Expect only 1 - fc: FamForestClient + fc: FamForestClientSchema try: - fc = FamForestClient(**data[0]) + fc = FamForestClientSchema(**data[0]) except Exception: assert False # Conversion fail assert fc.forest_client_number == client_id_to_test diff --git a/server/backend/testspg/router/test_router_idim_proxy.py b/server/backend/testspg/router/test_router_idim_proxy.py index 42134bb97..62b8502c8 100644 --- a/server/backend/testspg/router/test_router_idim_proxy.py +++ b/server/backend/testspg/router/test_router_idim_proxy.py @@ -5,7 +5,7 @@ from api.app.main import apiPrefix from api.app.constants import ERROR_CODE_REQUESTER_NOT_EXISTS from api.app.routers.router_guards import get_current_requester -from api.app.schemas import Requester +from api.app.schemas import RequesterSchema from api.app.jwt_validation import ERROR_PERMISSION_REQUIRED, ERROR_GROUPS_REQUIRED from api.app.utils.utils import raise_http_exception import testspg.jwt_utils as jwt_utils @@ -15,7 +15,7 @@ TEST_BCEID_REQUESTER_DICT, TEST_VALID_BUSINESS_BCEID_USERNAME_ONE, TEST_VALID_BUSINESS_BCEID_USERNAME_TWO, - FOM_DEV_APPLICATION_ID + FOM_DEV_APPLICATION_ID, ) @@ -31,26 +31,26 @@ async def mock_get_current_requester_with_idir_user(): """ - A mock for router dependency, for requester who is IDIR user. + A mock for router dependency, for Requester who is IDIR user. """ - return Requester(**TEST_IDIR_REQUESTER_DICT) + return RequesterSchema(**TEST_IDIR_REQUESTER_DICT) async def mock_get_current_requester_with_business_bceid_user(): """ - A mock for router dependency, for requester who is not IDIR user. + A mock for router dependency, for Requester who is not IDIR user. """ - return Requester(**TEST_BCEID_REQUESTER_DICT) + return RequesterSchema(**TEST_BCEID_REQUESTER_DICT) async def mock_get_current_requester_user_not_exists(): """ - A mock for router dependency, for requester who does not exists. + A mock for router dependency, for Requester who does not exists. """ raise_http_exception( status_code=HTTPStatus.FORBIDDEN, error_code=ERROR_CODE_REQUESTER_NOT_EXISTS, - error_msg="Requester does not exist, action is not allowed." + error_msg="Requester does not exist, action is not allowed.", ) @@ -61,13 +61,17 @@ def test_search_idir_with_valid_user_found_result( """ Test valid user_id to search. """ - # override dependency for requester on router. + # override dependency for Requester on router. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_with_idir_user ) - test_end_point = endPoint_search_idir + f"?user_id={valid_user_id_param}" + endPoint_search_param_application_id + test_end_point = ( + endPoint_search_idir + + f"?user_id={valid_user_id_param}" + + endPoint_search_param_application_id + ) LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) response = test_client_fixture.get( @@ -87,14 +91,18 @@ def test_search_idir_with_invalid_user_return_not_found( """ Test invalid user_id to search. """ - # override dependency for requester on router. + # override dependency for Requester on router. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_with_idir_user ) invalid_user_id_param = "USERNOTEXISTS" - test_end_point = endPoint_search_idir + f"?user_id={invalid_user_id_param}" + endPoint_search_param_application_id + test_end_point = ( + endPoint_search_idir + + f"?user_id={invalid_user_id_param}" + + endPoint_search_param_application_id + ) LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) response = test_client_fixture.get( @@ -112,16 +120,20 @@ def test_none_idir_user_cannot_search_idir_user( test_client_fixture: TestClient, test_rsa_key ): """ - Test requester is external. + Test Requester is external. """ - # override dependency for requester on router. + # override dependency for Requester on router. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_with_business_bceid_user ) - test_end_point = endPoint_search_idir + f"?user_id={valid_user_id_param}" + endPoint_search_param_application_id + test_end_point = ( + endPoint_search_idir + + f"?user_id={valid_user_id_param}" + + endPoint_search_param_application_id + ) token = jwt_utils.create_jwt_token(test_rsa_key) response = test_client_fixture.get( f"{test_end_point}", headers=jwt_utils.headers(token) @@ -135,16 +147,20 @@ def test_search_idir_user_requester_not_found_error_raised( test_client_fixture: TestClient, test_rsa_key ): """ - Test requester does not exist. + Test Requester does not exist. """ - # override dependency for requester not exists. + # override dependency for Requester not exists. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_user_not_exists ) - test_end_point = endPoint_search_idir + f"?user_id={valid_user_id_param}" + endPoint_search_param_application_id + test_end_point = ( + endPoint_search_idir + + f"?user_id={valid_user_id_param}" + + endPoint_search_param_application_id + ) token = jwt_utils.create_jwt_token(test_rsa_key) response = test_client_fixture.get( f"{test_end_point}", headers=jwt_utils.headers(token) @@ -161,13 +177,15 @@ def test_search_bceid_with_valid_user_same_org_found_result( """ Test business bceid user search valid business bceid user_id within same organization """ - # override dependency for requester on router. + # override dependency for Requester on router. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_with_business_bceid_user ) test_end_point = ( - endPoint_search_bceid + f"?user_id={TEST_VALID_BUSINESS_BCEID_USERNAME_ONE}" + endPoint_search_param_application_id + endPoint_search_bceid + + f"?user_id={TEST_VALID_BUSINESS_BCEID_USERNAME_ONE}" + + endPoint_search_param_application_id ) LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) @@ -191,13 +209,15 @@ def test_search_bceid_with_valid_user_diff_org_fail( """ Test business bceid user search valid business bceid user_id from different organization """ - # override dependency for requester on router. + # override dependency for Requester on router. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_with_business_bceid_user ) test_end_point = ( - endPoint_search_bceid + f"?user_id={TEST_VALID_BUSINESS_BCEID_USERNAME_TWO}" + endPoint_search_param_application_id + endPoint_search_bceid + + f"?user_id={TEST_VALID_BUSINESS_BCEID_USERNAME_TWO}" + + endPoint_search_param_application_id ) LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) @@ -216,13 +236,15 @@ def test_search_bceid_with_valid_user_without_authorization_fail( """ Test business bceid user search valid business bceid user_id without authorization """ - # override dependency for requester on router. + # override dependency for Requester on router. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_with_business_bceid_user ) test_end_point = ( - endPoint_search_bceid + f"?user_id={TEST_VALID_BUSINESS_BCEID_USERNAME_ONE}" + endPoint_search_param_application_id + endPoint_search_bceid + + f"?user_id={TEST_VALID_BUSINESS_BCEID_USERNAME_ONE}" + + endPoint_search_param_application_id ) LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key, []) @@ -241,14 +263,18 @@ def test_search_bceid_with_invalid_user_return_not_found( """ Test idir user search invalid business bceid user_id. """ - # override dependency for requester on router. + # override dependency for Requester on router. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_with_business_bceid_user ) invalid_user_id_param = "USERNOTEXISTS" - test_end_point = endPoint_search_bceid + f"?user_id={invalid_user_id_param}" + endPoint_search_param_application_id + test_end_point = ( + endPoint_search_bceid + + f"?user_id={invalid_user_id_param}" + + endPoint_search_param_application_id + ) LOGGER.debug(f"test_end_point: {test_end_point}") token = jwt_utils.create_jwt_token(test_rsa_key) response = test_client_fixture.get( @@ -264,17 +290,19 @@ def test_search_bceid_user_requester_not_found_error_raised( test_client_fixture: TestClient, test_rsa_key ): """ - Test requester does not exist. + Test Requester does not exist. """ - # override dependency for requester not exists. + # override dependency for Requester not exists. app = test_client_fixture.app app.dependency_overrides[get_current_requester] = ( mock_get_current_requester_user_not_exists ) test_end_point = ( - endPoint_search_bceid + f"?user_id={TEST_VALID_BUSINESS_BCEID_USERNAME_ONE}" + endPoint_search_param_application_id + endPoint_search_bceid + + f"?user_id={TEST_VALID_BUSINESS_BCEID_USERNAME_ONE}" + + endPoint_search_param_application_id ) token = jwt_utils.create_jwt_token(test_rsa_key) response = test_client_fixture.get(