diff --git a/pyracf/common/security_admin.py b/pyracf/common/security_admin.py index cbf6934..005343b 100644 --- a/pyracf/common/security_admin.py +++ b/pyracf/common/security_admin.py @@ -526,6 +526,36 @@ def _get_profile( index ] + def _build_connection_template( + self, + profile: dict, + name: str, + ) -> dict: + """Map the profile to a template for connection requests.""" + templates = [] + connections = [] + if self._generate_requests_only: + # Allows this function to work with "self._generate_requests_only" mode. + return profile + if self._profile_type == "user": + user_connections = profile["base"].get("groups", {}) + for key in user_connections.keys(): + user_connections[key].update([("group", key), ("userid", name)]) + connections.append(user_connections[key]) + elif self._profile_type == "group": + connections = profile["base"].get("users", {}) + for connection in connections: + connection.update({"group": name}) + else: + return profile + for connection in connections: + template = {} + template["userid"] = connection["userid"] + template["group"] = connection["group"] + template["traits"] = self.__make_connection_traits(connection) + templates.append(template) + return templates + def _build_template( self, profile: dict, @@ -563,6 +593,43 @@ def __map_extract_to_template( template[self.__template_map[key]] = value return template + def __make_connection_traits(self, traits_dict: dict) -> dict: + connection_map = { + "connectOwner": "base:owner", + "universalAccess": "base:universal_access", + "access": "base:group_authority", + "auth": "base:group_authority", + } + attribute_map = { + "adsp": "base:automatic_data_set_protection", + "auditor": "base:auditor", + "grpacc": "base:group_access", + "operations": "base:operations", + "special": "base:special", + } + template_dict = {} + if traits_dict["resumeDate"] is not None: + resume_date = datetime.strptime(traits_dict["resumeDate"], "%y.%j").date() + date = datetime.now() + if resume_date <= date: + template_dict["base:resume"] = True + else: + template_dict["base:resume"] = resume_date.strftime("%m/%d/%y") + if traits_dict["revokeDate"] is not None: + revoke_date = datetime.strptime(traits_dict["revokeDate"], "%y.%j").date() + traits_dict["revokeDate"] = True + if revoke_date <= date: + template_dict["base:revoke"] = True + else: + template_dict["base:revoke"] = revoke_date.strftime("%m/%d/%y") + for trait, value in traits_dict.items(): + if trait in connection_map: + template_dict[connection_map[trait]] = value + for attribute in traits_dict["connectAttributes"]: + if attribute in attribute_map: + template_dict[attribute_map[attribute]] = True + return template_dict + def _get_field( self, profile: Union[dict, bytes], diff --git a/pyracf/group/group_admin.py b/pyracf/group/group_admin.py index 5f0af32..5afeda8 100644 --- a/pyracf/group/group_admin.py +++ b/pyracf/group/group_admin.py @@ -166,6 +166,7 @@ def extract( segments: List[str] = [], profile_only: bool = False, group_template=False, + connection_template=False, ) -> Union[dict, bytes]: """Extract a group's profile.""" self._build_segment_dictionary(segments) @@ -176,6 +177,8 @@ def extract( return self._get_profile(result) if group_template: return self._build_template(self._get_profile(result)) + if connection_template: + return self._build_connection_template(self._get_profile(result), group) return result def delete(self, group: str) -> Union[dict, bytes]: diff --git a/pyracf/user/user_admin.py b/pyracf/user/user_admin.py index 0f018e4..1343620 100644 --- a/pyracf/user/user_admin.py +++ b/pyracf/user/user_admin.py @@ -817,6 +817,7 @@ def extract( segments: List[str] = [], profile_only: bool = False, user_template: bool = False, + connection_template: bool = False, ) -> Union[dict, bytes]: """Extract a user's profile.""" self._build_segment_dictionary(segments) @@ -827,6 +828,8 @@ def extract( return self._get_profile(result) if user_template: return self._build_template(self._get_profile(result)) + if connection_template: + return self._build_connection_template(self._get_profile(result), userid) return result def delete(self, userid: str) -> Union[dict, bytes]: diff --git a/tests/group/test_group_constants.py b/tests/group/test_group_constants.py index de2e8a5..78b3050 100644 --- a/tests/group/test_group_constants.py +++ b/tests/group/test_group_constants.py @@ -96,6 +96,27 @@ def get_sample(sample_file: str) -> Union[str, bytes]: "base:terminal_universal_access": True, "omvs:gid": 1234567, } +TEST_EXTRACT_GROUP_CONNECTION_TEMPLATE_TRAITS = [ + { + "userid": "eswift", + "group": "testgrp0", + "traits": { + "base:group_authority": "use", + "base:universal_access": None, + "base:special": True, + }, + }, + { + "userid": "leonard", + "group": "testgrp0", + "traits": { + "base:group_authority": "use", + "base:universal_access": None, + "base:operations": True, + }, + }, +] + # Delete Group TEST_DELETE_GROUP_REQUEST_XML = get_sample("delete_group_request.xml") diff --git a/tests/group/test_group_result_parser.py b/tests/group/test_group_result_parser.py index 40dd674..8e2072e 100644 --- a/tests/group/test_group_result_parser.py +++ b/tests/group/test_group_result_parser.py @@ -257,3 +257,19 @@ def test_group_admin_can_build_template_group_base_omvs_tso_revoke_resume_succes ), TestGroupConstants.TEST_EXTRACT_GROUP_BASE_OMVS_TEMPLATE_TRAITS, ) + + def test_group_admin_can_build_template_connection_success_xml( + self, + call_racf_mock: Mock, + ): + call_racf_mock.return_value = ( + TestGroupConstants.TEST_EXTRACT_GROUP_RESULT_BASE_ONLY_SUCCESS_XML + ) + template = self.group_admin.extract( + "testgrp0", segments=["omvs"], connection_template=True + ) + print(template) + self.assertEqual( + template, + TestGroupConstants.TEST_EXTRACT_GROUP_CONNECTION_TEMPLATE_TRAITS, + ) diff --git a/tests/user/test_user_constants.py b/tests/user/test_user_constants.py index 02aacfe..e19bc52 100644 --- a/tests/user/test_user_constants.py +++ b/tests/user/test_user_constants.py @@ -262,6 +262,13 @@ def get_sample(sample_file: str) -> Union[str, bytes]: "tso:user_data": "abcd", "tso:logon_command": "ispf", } +TEST_EXTRACT_USER_CONNECTION_TEMPLATE_TRAITS = [ + { + "userid": "squidwrd", + "group": "SYS1", + "traits": {"base:group_authority": "use", "base:owner": "leonard"}, + } +] # Delete User TEST_DELETE_USER_REQUEST_XML = get_sample("delete_user_request.xml") diff --git a/tests/user/test_user_result_parser.py b/tests/user/test_user_result_parser.py index 120df30..1736c86 100644 --- a/tests/user/test_user_result_parser.py +++ b/tests/user/test_user_result_parser.py @@ -539,7 +539,7 @@ def test_user_admin_custom_secret_redacted_on_error( ) # ============================================================================ - # Build template user + # Build Templates # ============================================================================ def test_user_admin_can_build_template_user_base_omvs_tso_revoke_resume_success_xml( @@ -555,3 +555,17 @@ def test_user_admin_can_build_template_user_base_omvs_tso_revoke_resume_success_ ), TestUserConstants.TEST_EXTRACT_USER_BASE_OMVS_TSO_REVOKE_RESUME_TEMPLATE_TRAITS, ) + + def test_user_admin_can_build_template_connection_success_xml( + self, + call_racf_mock: Mock, + ): + call_racf_mock.return_value = ( + TestUserConstants.TEST_EXTRACT_USER_RESULT_BASE_OMVS_TSO_REVOKE_RESUME_XML + ) + self.assertEqual( + self.user_admin.extract( + "squidwrd", segments=["omvs", "tso"], connection_template=True + ), + TestUserConstants.TEST_EXTRACT_USER_CONNECTION_TEMPLATE_TRAITS, + )