diff --git a/pyracf/common/security_admin.py b/pyracf/common/security_admin.py index a080fe61..e026e4ee 100644 --- a/pyracf/common/security_admin.py +++ b/pyracf/common/security_admin.py @@ -3,6 +3,7 @@ import platform import re import xml.etree.ElementTree +from collections import Counter from datetime import datetime from typing import Any, List, Tuple, Union @@ -189,6 +190,77 @@ def __add_additional_secret_traits(self, additional_secret_traits: list) -> None continue self.__secret_traits[secret] = self._valid_segment_traits[segment][secret] + # ============================================================================ + # Common Subfunctions for Child Classes + # ============================================================================ + def _validate_access_levels( + self, + success: Union[str, None] = None, + failure: Union[str, None] = None, + all: Union[str, None] = None, + ): + valid_access_levels = ("alter", "control", "read", "update") + value_error_text = ( + "Valid access levels include 'alter', 'control', 'read', and 'update'." + ) + bad_access_levels = [] + for attempt_argument in (success, failure, all): + if ( + attempt_argument is not None + and str(attempt_argument).lower() not in valid_access_levels + ): + bad_access_levels.append(attempt_argument) + match len(bad_access_levels): + case 0: + self.__check_for_duplicates([success, failure, all], "Access Level") + return + case 1: + value_error_text = ( + f"'{bad_access_levels[0]}' is not a valid access level. " + + f"{value_error_text}" + ) + case 2: + value_error_text = ( + f"'{bad_access_levels[0]}' and '{bad_access_levels[1]}' are not valid " + + f"access levels. {value_error_text}" + ) + case _: + bad_access_levels = [ + f"'{bad_access_level}'" for bad_access_level in bad_access_levels + ] + bad_access_levels[-1] = f"and {bad_access_levels[-1]} " + value_error_text = ( + f"{', '.join(bad_access_levels)}are not valid access levels. " + + f"{value_error_text}" + ) + raise ValueError(value_error_text) + + def _build_traits_from_audit_rules(self, audit_rules: dict) -> dict: + traits = {} + if "success" in audit_rules: + traits[f"base:audit_{audit_rules['success']}"] = "success" + if "failures" in audit_rules: + traits[f"base:audit_{audit_rules['failures']}"] = "failure" + if "all" in audit_rules: + traits[f"base:audit_{audit_rules['all']}"] = "all" + return traits + + def __check_for_duplicates(self, argument_list: list, argument: str) -> None: + duplicates = [ + key + for (key, value) in Counter(argument_list).items() + if (value > 1 and key is not None) + ] + if duplicates == []: + return + value_error_text = [] + for duplicate in duplicates: + value_error_text.append( + f"'{duplicate}' is provided as an '{argument}' multiple times, which is not " + + "allowed." + ) + raise ValueError("\n".join(value_error_text)) + # ============================================================================ # Request Execution # ============================================================================ @@ -444,6 +516,32 @@ def _get_profile( index ] + def _build_template( + self, + profile: dict, + ) -> dict: + """Map the profile to a template for an add or alter request.""" + if self._generate_requests_only: + # Allows this function to work with "self._generate_requests_only" mode. + return profile + template = {} + for segment in profile.keys(): + for trait in profile[segment].keys(): + if ( + f"{segment}:{self.__camel_case_to_snake_case(trait)}" + not in self._valid_segment_traits[segment] + ): + continue + template[f"{segment}:{self.__camel_case_to_snake_case(trait)}"] = ( + profile[segment][trait] + ) + if "Date" in trait: + template[f"{segment}:{self.__camel_case_to_snake_case(trait)}"] = ( + re.sub(r"20([0-9][0-9])", r"\1", profile[segment][trait]) + ) + print(template) + return template + def _get_field( self, profile: Union[dict, bytes], @@ -975,6 +1073,11 @@ def _profile_field_to_camel_case(self, segment: str, field: str) -> str: [field_token.title() for field_token in field_tokens[1:]] ) + def __camel_case_to_snake_case(self, string: str) -> str: + """Convert a camel case string to snake case""" + string = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", string) + return re.sub("([a-z0-9])([A-Z])", r"\1_\2", string).lower() + # ============================================================================ # Result Dictionary Parsing Functions # ============================================================================ diff --git a/pyracf/data_set/data_set_admin.py b/pyracf/data_set/data_set_admin.py index 9233ee0c..c5f387e5 100644 --- a/pyracf/data_set/data_set_admin.py +++ b/pyracf/data_set/data_set_admin.py @@ -80,7 +80,7 @@ def __init__( ) # ============================================================================ - # Access + # Universal Access # ============================================================================ def get_universal_access(self, data_set: str) -> Union[str, bytes, None]: """Get universal access for data set profile.""" @@ -94,11 +94,132 @@ def set_universal_access( result = self.alter(data_set, {"base:universal_access": universal_acccess}) return self._to_steps(result) + # ============================================================================ + # Individual Access + # ============================================================================ def get_my_access(self, data_set: str) -> Union[str, bytes, None]: - """Get the access associated with your own data set profile.""" + """Get your own level of access associated with a data set profile.""" + profile = self.extract(data_set, profile_only=True) + return self._get_field(profile, "base", "yourAccess") + + def get_user_access(self, data_set: str, userid: str) -> Union[str, bytes, None]: + """Get a target user's own level of access associated with a data set profile.""" + original_userid = self.get_running_userid() + self.set_running_userid(userid) profile = self.extract(data_set, profile_only=True) + self.set_running_userid(original_userid) return self._get_field(profile, "base", "yourAccess") + # ============================================================================ + # Auditing Rules + # ============================================================================ + def get_audit_rules(self, data_set: str) -> Union[dict, bytes, None]: + """Get the auditing rules associated with this data set profile.""" + profile = self.extract(data_set, profile_only=True) + return self._get_field(profile, "base", "auditing") + + def overwrite_audit_rules_by_attempt( + self, + data_set: str, + success: Union[str, None] = None, + failure: Union[str, None] = None, + all: Union[str, None] = None, + ) -> Union[dict, bytes]: + """ + Overwrites the auditing rules for this data set profile with new + rules to audit based on specified access attempts. + """ + self._validate_access_levels(success, failure, all) + traits = {} + if success is not None: + traits[f"base:audit_{success}"] = "success" + if failure is not None: + traits[f"base:audit_{failure}"] = "failure" + if all is not None: + traits[f"base:audit_{all}"] = "all" + result = self.alter(data_set, traits=traits) + return self._to_steps(result) + + def alter_audit_rules_by_attempt( + self, + data_set: str, + success: Union[str, None] = None, + failure: Union[str, None] = None, + all: Union[str, None] = None, + ) -> Union[dict, bytes]: + """ + Alters the auditing rules for this data set profile with new rules + to audit by access level, preserving existing non-conflicting rules. + """ + self._validate_access_levels(success, failure, all) + audit_rules = self.get_audit_rules(data_set) + traits = self._build_traits_from_audit_rules(audit_rules) + if success is not None: + traits[f"base:audit_{success}"] = "success" + if failure is not None: + traits[f"base:audit_{failure}"] = "failure" + if all is not None: + traits[f"base:audit_{all}"] = "all" + result = self.alter(data_set, traits=traits) + return self._to_steps(result) + + def overwrite_audit_rules_by_access_level( + self, + data_set: str, + alter: Union[str, None] = None, + control: Union[str, None] = None, + read: Union[str, None] = None, + update: Union[str, None] = None, + ) -> Union[dict, bytes]: + """ + Overwrites the auditing rules for this data set profile with new + rules to audit based on specified access levels. + """ + traits = {} + if alter is not None: + traits["base:audit_alter"] = alter + if control is not None: + traits["base:audit_control"] = control + if read is not None: + traits["base:audit_read"] = read + if update is not None: + traits["base:audit_update"] = update + result = self.alter(data_set, traits=traits) + return self._to_steps(result) + + def alter_audit_rules_by_access_level( + self, + data_set: str, + alter: Union[str, None] = None, + control: Union[str, None] = None, + read: Union[str, None] = None, + update: Union[str, None] = None, + ) -> Union[dict, bytes]: + """ + Alters the auditing rules for this data set profile with a new + rule to audit alter access, preserving existing non-conflicting rules. + """ + audit_rules = self.get_audit_rules(data_set) + traits = self._build_traits_from_audit_rules(audit_rules) + if alter is not None: + traits["base:audit_alter"] = alter + if control is not None: + traits["base:audit_control"] = control + if read is not None: + traits["base:audit_read"] = read + if update is not None: + traits["base:audit_update"] = update + result = self.alter(data_set, traits=traits) + return self._to_steps(result) + + def remove_all_audit_rules( + self, + data_set: str, + ) -> Union[dict, bytes]: + """Clears the auditing rules completely.""" + result = self.alter(data_set, {"base:audit_none": True}) + return self._to_steps(result) + # ============================================================================ # Base Functions # ============================================================================ @@ -162,6 +283,7 @@ def extract( volume: Union[str, None] = None, generic: bool = False, profile_only: bool = False, + data_set_template: bool = False, ) -> Union[dict, bytes]: """Extract a data set profile.""" self._build_segment_dictionary(segments) @@ -170,6 +292,8 @@ def extract( result = self._extract_and_check_result(data_set_request) if profile_only: return self._get_profile(result) + if data_set_template: + return self._build_template(self._get_profile(result)) return result def delete( diff --git a/pyracf/group/group_admin.py b/pyracf/group/group_admin.py index bcc2f707..5f0af32b 100644 --- a/pyracf/group/group_admin.py +++ b/pyracf/group/group_admin.py @@ -27,7 +27,7 @@ def __init__( self._valid_segment_traits = { "base": { "base:installation_data": "racf:data", - "base:data_set_model": "racf:model", + "base:model_data_set": "racf:model", "base:owner": "racf:owner", "base:superior_group": "racf:supgroup", "base:terminal_universal_access": "racf:termuacc", @@ -161,7 +161,11 @@ def alter(self, group: str, traits: dict) -> Union[dict, bytes]: return self._make_request(group_request, irrsmo00_precheck=True) def extract( - self, group: str, segments: List[str] = [], profile_only: bool = False + self, + group: str, + segments: List[str] = [], + profile_only: bool = False, + group_template=False, ) -> Union[dict, bytes]: """Extract a group's profile.""" self._build_segment_dictionary(segments) @@ -170,6 +174,8 @@ def extract( result = self._extract_and_check_result(group_request) if profile_only: return self._get_profile(result) + if group_template: + return self._build_template(self._get_profile(result)) return result def delete(self, group: str) -> Union[dict, bytes]: diff --git a/pyracf/resource/resource_admin.py b/pyracf/resource/resource_admin.py index 2a4c04b0..de4a1eeb 100644 --- a/pyracf/resource/resource_admin.py +++ b/pyracf/resource/resource_admin.py @@ -1,6 +1,5 @@ """General Resource Profile Administration.""" -from collections import Counter from typing import List, Union from pyracf.common.exceptions.add_operation_error import AddOperationError @@ -312,7 +311,7 @@ def overwrite_audit_rules_by_attempt( Overwrites the auditing rules for this general resource profile with new rules to audit based on specified access attempts. """ - self.__validate_access_levels(success, failure, all) + self._validate_access_levels(success, failure, all) traits = {} if success is not None: traits[f"base:audit_{success}"] = "success" @@ -335,15 +334,9 @@ def alter_audit_rules_by_attempt( Alters the auditing rules for this general resource profile with new rules to audit by access level, preserving existing non-conflicting rules. """ - self.__validate_access_levels(success, failure, all) + self._validate_access_levels(success, failure, all) audit_rules = self.get_audit_rules(resource, class_name) - traits = {} - if "success" in audit_rules: - traits[f"base:audit_{audit_rules['success']}"] = "success" - if "failures" in audit_rules: - traits[f"base:audit_{audit_rules['failures']}"] = "failure" - if "all" in audit_rules: - traits[f"base:audit_{audit_rules['all']}"] = "all" + traits = self._build_traits_from_audit_rules(audit_rules) if success is not None: traits[f"base:audit_{success}"] = "success" if failure is not None: @@ -392,13 +385,7 @@ def alter_audit_rules_by_access_level( rule to audit alter access, preserving existing non-conflicting rules. """ audit_rules = self.get_audit_rules(resource, class_name) - traits = {} - if "success" in audit_rules: - traits[f"base:audit_{audit_rules['success']}"] = "success" - if "failures" in audit_rules: - traits[f"base:audit_{audit_rules['failures']}"] = "failure" - if "all" in audit_rules: - traits[f"base:audit_{audit_rules['all']}"] = "all" + traits = self._build_traits_from_audit_rules(audit_rules) if alter is not None: traits["base:audit_alter"] = alter if control is not None: @@ -686,6 +673,7 @@ def extract( class_name: str, segments: List[str] = [], profile_only: bool = False, + resource_template: bool = False, ) -> Union[dict, bytes]: """Extract a general resource profile.""" self._build_segment_dictionary(segments) @@ -694,6 +682,8 @@ def extract( result = self._extract_and_check_result(resource_request) if profile_only: return self._get_profile(result) + if resource_template: + return self._build_template(self._get_profile(result)) return result def delete( @@ -738,61 +728,3 @@ def _format_profile(self, result: dict) -> None: del result["securityResult"]["resource"]["commands"][0]["messages"] result["securityResult"]["resource"]["commands"][0]["profiles"] = profiles - - def __validate_access_levels( - self, - success: Union[str, None] = None, - failure: Union[str, None] = None, - all: Union[str, None] = None, - ): - valid_access_levels = ("alter", "control", "read", "update") - value_error_text = ( - "Valid access levels include 'alter', 'control', 'read', and 'update'." - ) - bad_access_levels = [] - for attempt_argument in (success, failure, all): - if ( - attempt_argument is not None - and str(attempt_argument).lower() not in valid_access_levels - ): - bad_access_levels.append(attempt_argument) - match len(bad_access_levels): - case 0: - self.__check_for_duplicates([success, failure, all], "Access Level") - return - case 1: - value_error_text = ( - f"'{bad_access_levels[0]}' is not a valid access level. " - + f"{value_error_text}" - ) - case 2: - value_error_text = ( - f"'{bad_access_levels[0]}' and '{bad_access_levels[1]}' are not valid " - + f"access levels. {value_error_text}" - ) - case _: - bad_access_levels = [ - f"'{bad_access_level}'" for bad_access_level in bad_access_levels - ] - bad_access_levels[-1] = f"and {bad_access_levels[-1]} " - value_error_text = ( - f"{', '.join(bad_access_levels)}are not valid access levels. " - + f"{value_error_text}" - ) - raise ValueError(value_error_text) - - def __check_for_duplicates(self, argument_list: list, argument: str) -> None: - duplicates = [ - key - for (key, value) in Counter(argument_list).items() - if (value > 1 and key is not None) - ] - if duplicates == []: - return - value_error_text = [] - for duplicate in duplicates: - value_error_text.append( - f"'{duplicate}' is provided as an '{argument}' multiple times, which is not " - + "allowed." - ) - raise ValueError("\n".join(value_error_text)) diff --git a/pyracf/user/user_admin.py b/pyracf/user/user_admin.py index 86db789e..0f018e48 100644 --- a/pyracf/user/user_admin.py +++ b/pyracf/user/user_admin.py @@ -812,7 +812,11 @@ def alter(self, userid: str, traits: dict) -> Union[dict, bytes]: return self._make_request(user_request, irrsmo00_precheck=True) def extract( - self, userid: str, segments: List[str] = [], profile_only: bool = False + self, + userid: str, + segments: List[str] = [], + profile_only: bool = False, + user_template: bool = False, ) -> Union[dict, bytes]: """Extract a user's profile.""" self._build_segment_dictionary(segments) @@ -821,6 +825,8 @@ def extract( result = self._extract_and_check_result(user_request) if profile_only: return self._get_profile(result) + if user_template: + return self._build_template(self._get_profile(result)) return result def delete(self, userid: str) -> Union[dict, bytes]: diff --git a/tests/common/test_common_constants.py b/tests/common/test_common_constants.py index 7a0c83ff..318062a1 100644 --- a/tests/common/test_common_constants.py +++ b/tests/common/test_common_constants.py @@ -103,9 +103,6 @@ def get_sample(sample_file: str) -> Union[str, bytes]: TEST_RUNNING_USERID = "eswift" TEST_ALTER_USER_SUCCESS_AS_ESWIFT_LOG = get_sample("alter_user_success_as_eswift.log") -TEST_EXTRACT_RESOURCE_PRECHECK_AS_SQUIDWRD_LOG = get_sample( - "extract_resource_precheck_as_squidwrd.log" -) # ============================================================================ # Downstream Fatal Error diff --git a/tests/common/test_run_as_userid.py b/tests/common/test_run_as_userid.py index af90d468..ab34a34f 100644 --- a/tests/common/test_run_as_userid.py +++ b/tests/common/test_run_as_userid.py @@ -10,7 +10,7 @@ import tests.common.test_common_constants as TestCommonConstants import tests.user.test_user_constants as TestUserConstants -from pyracf import ResourceAdmin, UserAdmin, UserIdError +from pyracf import UserAdmin, UserIdError # Resolves F401 __init__ @@ -110,40 +110,3 @@ def test_get_running_userid(self): user_admin = UserAdmin(run_as_userid="ESWIFT") running_user = user_admin.get_running_userid() self.assertEqual(running_user, TestCommonConstants.TEST_RUNNING_USERID) - - @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") - def test_get_user_access( - self, - call_racf_mock: Mock, - ): - precheck_profile_as_squidwrd = ( - TestCommonConstants.TEST_EXTRACT_RESOURCE_RESULT_PRECHECK_SUCCESS_XML - ) - precheck_profile_as_squidwrd = precheck_profile_as_squidwrd.replace( - " 00 ESWIFT READ ALTER NO", - " 00 ESWIFT READ READ NO", - ) - resource_admin = ResourceAdmin(debug=True, run_as_userid="ESWIFT") - call_racf_mock.return_value = precheck_profile_as_squidwrd - stdout = io.StringIO() - with contextlib.redirect_stdout(stdout): - access = resource_admin.get_user_access( - "IRR.IRRSMO00.PRECHECK", "XFACILIT", "squidwrd" - ) - success_log = self.ansi_escape.sub("", stdout.getvalue()) - self.assertEqual( - success_log, - TestCommonConstants.TEST_EXTRACT_RESOURCE_PRECHECK_AS_SQUIDWRD_LOG, - ) - self.assertEqual(access, "read") - - def test_get_user_access_raises_userid_error(self): - userid = "squidwrdtest" - resource_admin = ResourceAdmin(debug=True, run_as_userid="ESWIFT") - with self.assertRaises(UserIdError) as exception: - resource_admin.get_user_access("IRR.IRRSMO00.PRECHECK", "XFACILIT", userid) - self.assertEqual( - exception.exception.message, - f"Unable to run as userid '{userid}'. Userid must " - + "be a string value between 1 to 8 characters in length.", - ) diff --git a/tests/data_set/data_set_log_samples/extract_data_set_base_as_squidwrd_success.log b/tests/data_set/data_set_log_samples/extract_data_set_base_as_squidwrd_success.log new file mode 100644 index 00000000..e3f8036e --- /dev/null +++ b/tests/data_set/data_set_log_samples/extract_data_set_base_as_squidwrd_success.log @@ -0,0 +1,166 @@ + + [pyRACF:Debug] + Request Dictionary + DataSetAdmin.get_user_access() + + +{} + + + [pyRACF:Debug] + Request XML + DataSetAdmin.get_user_access() + + + + + + + + [pyRACF:Debug] + Result XML + DataSetAdmin.get_user_access() + + + + + + + 0 + 0 + 0 + LISTDSD DATASET ('ESWIFT.TEST.T1136242.P3020470') + INFORMATION FOR DATASET ESWIFT.TEST.T1136242.P3020470 + + LEVEL OWNER UNIVERSAL ACCESS WARNING ERASE + ----- -------- ---------------- ------- ----- + 00 ESWIFT READ NO NO + + AUDITING + -------- + FAILURES(READ) + + NOTIFY + -------- + NO USER TO BE NOTIFIED + + YOUR ACCESS CREATION GROUP DATASET TYPE + ----------- -------------- ------------ + READ SYS1 NON-VSAM + + VOLUMES ON WHICH DATASET RESIDES + -------------------------------- + USRAT2 + + NO INSTALLATION DATA + + + 0 + 0 + + + + [pyRACF:Debug] + Result Dictionary + DataSetAdmin.get_user_access() + + +{ + "securityResult": { + "dataSet": { + "name": "ESWIFT.TEST.T1136242.P3020470", + "operation": "listdata", + "generic": "no", + "requestId": "DatasetRequest", + "commands": [ + { + "safReturnCode": 0, + "returnCode": 0, + "reasonCode": 0, + "image": "LISTDSD DATASET ('ESWIFT.TEST.T1136242.P3020470')", + "messages": [ + "INFORMATION FOR DATASET ESWIFT.TEST.T1136242.P3020470", + null, + "LEVEL OWNER UNIVERSAL ACCESS WARNING ERASE", + "----- -------- ---------------- ------- -----", + " 00 ESWIFT READ NO NO", + null, + "AUDITING", + "--------", + "FAILURES(READ)", + null, + "NOTIFY", + "--------", + "NO USER TO BE NOTIFIED", + null, + "YOUR ACCESS CREATION GROUP DATASET TYPE", + "----------- -------------- ------------", + " READ SYS1 NON-VSAM", + null, + "VOLUMES ON WHICH DATASET RESIDES", + "--------------------------------", + "USRAT2", + null, + "NO INSTALLATION DATA" + ] + } + ] + }, + "returnCode": 0, + "reasonCode": 0, + "runningUserid": "squidwrd" + } +} + + + [pyRACF:Debug] + Result Dictionary (Formatted Profile) + DataSetAdmin.get_user_access() + + +{ + "securityResult": { + "dataSet": { + "name": "ESWIFT.TEST.T1136242.P3020470", + "operation": "listdata", + "generic": "no", + "requestId": "DatasetRequest", + "commands": [ + { + "safReturnCode": 0, + "returnCode": 0, + "reasonCode": 0, + "image": "LISTDSD DATASET ('ESWIFT.TEST.T1136242.P3020470')", + "profiles": [ + { + "base": { + "name": "eswift.test.t1136242.p3020470", + "level": 0, + "owner": "eswift", + "universalAccess": "read", + "warning": null, + "erase": null, + "auditing": { + "failures": "read" + }, + "notify": null, + "yourAccess": "read", + "creationGroup": "sys1", + "dataSetType": "non-vsam", + "volumes": [ + "usrat2" + ], + "installationData": null, + "generic": false + } + } + ] + } + ] + }, + "returnCode": 0, + "reasonCode": 0, + "runningUserid": "squidwrd" + } +} + diff --git a/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level.xml b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level.xml new file mode 100644 index 00000000..5ca40360 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level.xml @@ -0,0 +1,9 @@ + + + + success + failure + success + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_all.xml b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_all.xml new file mode 100644 index 00000000..a7552b39 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_all.xml @@ -0,0 +1,10 @@ + + + + all + all + success + failure + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_multiple.xml b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_multiple.xml new file mode 100644 index 00000000..10727317 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_multiple.xml @@ -0,0 +1,10 @@ + + + + success + failure + success + failure + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_none.xml b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_none.xml new file mode 100644 index 00000000..16932c33 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_access_level_none.xml @@ -0,0 +1,8 @@ + + + + success + all + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt.xml b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt.xml new file mode 100644 index 00000000..d4d13e8b --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt.xml @@ -0,0 +1,9 @@ + + + + success + failure + failure + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_all.xml b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_all.xml new file mode 100644 index 00000000..6cb37e3d --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_all.xml @@ -0,0 +1,10 @@ + + + + success + all + success + failure + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_multiple.xml b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_multiple.xml new file mode 100644 index 00000000..8a7f1a00 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_multiple.xml @@ -0,0 +1,9 @@ + + + + success + all + failure + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_none.xml b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_none.xml new file mode 100644 index 00000000..16932c33 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_alter_audit_rules_by_attempt_none.xml @@ -0,0 +1,8 @@ + + + + success + all + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level.xml b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level.xml new file mode 100644 index 00000000..1df0f188 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level.xml @@ -0,0 +1,7 @@ + + + + success + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_all.xml b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_all.xml new file mode 100644 index 00000000..894a0fec --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_all.xml @@ -0,0 +1,10 @@ + + + + success + all + failure + success + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_multiple.xml b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_multiple.xml new file mode 100644 index 00000000..516271a1 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_multiple.xml @@ -0,0 +1,8 @@ + + + + success + all + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_none.xml b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_none.xml new file mode 100644 index 00000000..e7ede1fd --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_access_level_none.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt.xml b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt.xml new file mode 100644 index 00000000..d7545b61 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt.xml @@ -0,0 +1,7 @@ + + + + failure + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_all.xml b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_all.xml new file mode 100644 index 00000000..d08abcf2 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_all.xml @@ -0,0 +1,9 @@ + + + + success + failure + all + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_multiple.xml b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_multiple.xml new file mode 100644 index 00000000..aadf8d91 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_multiple.xml @@ -0,0 +1,8 @@ + + + + success + all + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_none.xml b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_none.xml new file mode 100644 index 00000000..e7ede1fd --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_overwrite_audit_rules_by_attempt_none.xml @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/tests/data_set/data_set_request_samples/data_set_remove_all_audit_rules.xml b/tests/data_set/data_set_request_samples/data_set_remove_all_audit_rules.xml new file mode 100644 index 00000000..289adeb3 --- /dev/null +++ b/tests/data_set/data_set_request_samples/data_set_remove_all_audit_rules.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/tests/data_set/test_data_set_constants.py b/tests/data_set/test_data_set_constants.py index cf050ab2..beaabcdb 100644 --- a/tests/data_set/test_data_set_constants.py +++ b/tests/data_set/test_data_set_constants.py @@ -110,6 +110,7 @@ def get_sample(sample_file: str) -> Union[str, bytes]: TEST_EXTRACT_DATA_SET_REQUEST_GENERIC_BASE_TRAITS = { "datasetname": "ESWIFT.TEST.T1136242.*" } +TEST_EXTRACT_DATA_SET_BASE_ONLY_TEMPLATE_TRAITS = {} # Delete Data Set TEST_DELETE_DATA_SET_REQUEST_XML = get_sample("delete_data_set_request.xml") @@ -119,6 +120,68 @@ def get_sample(sample_file: str) -> Union[str, bytes]: # ============================================================================ TEST_DATA_SET_SET_UNIVERSAL_ACCESS_XML = get_sample("data_set_set_universal_access.xml") +# Audit Rules Request Samples +TEST_DATA_SET_REMOVE_ALL_AUDIT_RULES_REQUEST_XML = get_sample( + "data_set_remove_all_audit_rules.xml" +) +TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ATTEMPT_REQUEST_XML = get_sample( + "data_set_alter_audit_rules_by_attempt.xml" +) +TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ATTEMPT_MULT_REQUEST_XML = get_sample( + "data_set_alter_audit_rules_by_attempt_multiple.xml" +) +TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ATTEMPT_ALL_REQUEST_XML = get_sample( + "data_set_alter_audit_rules_by_attempt_all.xml" +) +TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ATTEMPT_NONE_REQUEST_XML = get_sample( + "data_set_alter_audit_rules_by_attempt_none.xml" +) +TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ACCESS_LEVEL_REQUEST_XML = get_sample( + "data_set_alter_audit_rules_by_access_level.xml" +) +TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ACCESS_LEVEL_MULT_REQUEST_XML = get_sample( + "data_set_alter_audit_rules_by_access_level_multiple.xml" +) +TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ACCESS_LEVEL_ALL_REQUEST_XML = get_sample( + "data_set_alter_audit_rules_by_access_level_all.xml" +) +TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ACCESS_LEVEL_NONE_REQUEST_XML = get_sample( + "data_set_alter_audit_rules_by_access_level_none.xml" +) +TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ATTEMPT_REQUEST_XML = get_sample( + "data_set_overwrite_audit_rules_by_attempt.xml" +) +TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ATTEMPT_MULT_REQUEST_XML = get_sample( + "data_set_overwrite_audit_rules_by_attempt_multiple.xml" +) +TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ATTEMPT_ALL_REQUEST_XML = get_sample( + "data_set_overwrite_audit_rules_by_attempt_all.xml" +) +TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ATTEMPT_NONE_REQUEST_XML = get_sample( + "data_set_overwrite_audit_rules_by_attempt_none.xml" +) +TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ACCESS_LEVEL_REQUEST_XML = get_sample( + "data_set_overwrite_audit_rules_by_access_level.xml" +) +# The following Test variables break convention to avoid E501 length errors from Flake8 +TEST_DATA_SET_OVERWRITE_AUDIT_BY_ACCESS_LEVEL_MULT_REQUEST_XML = get_sample( + "data_set_overwrite_audit_rules_by_access_level_multiple.xml" +) +TEST_DATA_SET_OVERWRITE_AUDIT_BY_ACCESS_LEVEL_ALL_REQUEST_XML = get_sample( + "data_set_overwrite_audit_rules_by_access_level_all.xml" +) +TEST_DATA_SET_OVERWRITE_AUDIT_BY_ACCESS_LEVEL_NONE_REQUEST_XML = get_sample( + "data_set_overwrite_audit_rules_by_access_level_none.xml" +) + + +# ============================================================================ +# Data Set Administration Getters Result Data +# ============================================================================ + +TEST_GET_AUDIT_RULES = {"success": "update", "failures": "read"} +TEST_GET_AUDIT_RULES_SINGLE = {"failures": "read"} +TEST_GET_AUDIT_RULES_WITH_ALL = {"success": "update", "all": "read"} # ============================================================================ # Debug Logging @@ -133,3 +196,6 @@ def get_sample(sample_file: str) -> Union[str, bytes]: TEST_EXTRACT_DATA_SET_BASE_ONLY_ERROR_LOG = get_sample( "extract_data_set_base_only_error.log" ) +TEST_EXTRACT_DATA_SET_BASE_AS_SQUIDWRD_SUCCESS = get_sample( + "extract_data_set_base_as_squidwrd_success.log" +) diff --git a/tests/data_set/test_data_set_getters.py b/tests/data_set/test_data_set_getters.py index 63d3f4d1..1a55c949 100644 --- a/tests/data_set/test_data_set_getters.py +++ b/tests/data_set/test_data_set_getters.py @@ -1,25 +1,29 @@ """Test data set profile getter functions.""" +import contextlib +import io +import re import unittest from unittest.mock import Mock, patch import __init__ import tests.data_set.test_data_set_constants as TestDataSetConstants -from pyracf import DataSetAdmin, SecurityRequestError +from pyracf import DataSetAdmin, SecurityRequestError, UserIdError # Resolves F401 __init__ -@patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") class TestDataSetGetters(unittest.TestCase): maxDiff = None + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") data_set_admin = DataSetAdmin() # ============================================================================ - # Access + # Universal Access # ============================================================================ + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_data_set_admin_get_universal_access_returns_valid_when_read( self, call_racf_mock: Mock, @@ -32,6 +36,7 @@ def test_data_set_admin_get_universal_access_returns_valid_when_read( "read", ) + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_data_set_admin_get_universal_access_returns_valid_when_none( self, call_racf_mock: Mock, @@ -51,6 +56,7 @@ def test_data_set_admin_get_universal_access_returns_valid_when_none( ) # Error in environment, ESWIFT.TEST.T1136242.P3020470 already deleted/not added + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_data_set_admin_get_universal_access_raises_an_exception_when_extract_fails( self, call_racf_mock: Mock, @@ -61,6 +67,10 @@ def test_data_set_admin_get_universal_access_raises_an_exception_when_extract_fa with self.assertRaises(SecurityRequestError): self.data_set_admin.get_universal_access("ESWIFT.TEST.T1136242.P3020470") + # ============================================================================ + # Individual Access + # ============================================================================ + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_data_set_admin_get_my_access_returns_valid_when_alter( self, call_racf_mock: Mock, @@ -72,6 +82,7 @@ def test_data_set_admin_get_my_access_returns_valid_when_alter( self.data_set_admin.get_my_access("ESWIFT.TEST.T1136242.P3020470"), "alter" ) + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_data_set_admin_get_my_access_returns_valid_when_none( self, call_racf_mock: Mock, @@ -89,6 +100,7 @@ def test_data_set_admin_get_my_access_returns_valid_when_none( ) # Error in environment, ESWIFT.TEST.T1136242.P3020470 already deleted/not added + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_data_set_admin_get_my_access_raises_an_exception_when_extract_fails( self, call_racf_mock: Mock, @@ -98,3 +110,106 @@ def test_data_set_admin_get_my_access_raises_an_exception_when_extract_fails( ) with self.assertRaises(SecurityRequestError): self.data_set_admin.get_my_access("ESWIFT.TEST.T1136242.P3020470") + + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") + def test_get_user_access_data_set( + self, + call_racf_mock: Mock, + ): + precheck_profile_as_squidwrd = ( + TestDataSetConstants.TEST_EXTRACT_DATA_SET_RESULT_BASE_ONLY_SUCCESS_XML + ) + precheck_profile_as_squidwrd = precheck_profile_as_squidwrd.replace( + " ALTER SYS1 NON-VSAM", + " READ SYS1 NON-VSAM", + ) + data_set_admin = DataSetAdmin(debug=True, run_as_userid="ESWIFT") + call_racf_mock.return_value = precheck_profile_as_squidwrd + stdout = io.StringIO() + with contextlib.redirect_stdout(stdout): + access = data_set_admin.get_user_access( + "ESWIFT.TEST.T1136242.P3020470", "squidwrd" + ) + success_log = self.ansi_escape.sub("", stdout.getvalue()) + self.assertEqual( + success_log, + TestDataSetConstants.TEST_EXTRACT_DATA_SET_BASE_AS_SQUIDWRD_SUCCESS, + ) + self.assertEqual(access, "read") + + def test_get_user_access_dataset_raises_userid_error(self): + userid = "squidwrdtest" + data_set_admin = DataSetAdmin(debug=True, run_as_userid="ESWIFT") + with self.assertRaises(UserIdError) as exception: + data_set_admin.get_user_access("ESWIFT.TEST.T1136242.P3020470", userid) + self.assertEqual( + exception.exception.message, + f"Unable to run as userid '{userid}'. Userid must " + + "be a string value between 1 to 8 characters in length.", + ) + + # ============================================================================ + # Auditing Rules + # ============================================================================ + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") + def test_data_set_admin_get_audit_rules( + self, + call_racf_mock: Mock, + ): + profile_with_success_and_failure_auditing = ( + TestDataSetConstants.TEST_EXTRACT_DATA_SET_RESULT_BASE_ONLY_SUCCESS_XML + ) + profile_with_success_and_failure_auditing = ( + profile_with_success_and_failure_auditing.replace( + "FAILURES(READ)", + "SUCCESS(UPDATE),FAILURES(READ)", + ) + ) + + call_racf_mock.return_value = profile_with_success_and_failure_auditing + self.assertEqual( + self.data_set_admin.get_audit_rules("ESWIFT.TEST.T1136242.P3020470"), + TestDataSetConstants.TEST_GET_AUDIT_RULES, + ) + + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") + def test_data_set_admin_get_audit_rules_single( + self, + call_racf_mock: Mock, + ): + call_racf_mock.return_value = ( + TestDataSetConstants.TEST_EXTRACT_DATA_SET_RESULT_BASE_ONLY_SUCCESS_XML + ) + self.assertEqual( + self.data_set_admin.get_audit_rules("ESWIFT.TEST.T1136242.P3020470"), + TestDataSetConstants.TEST_GET_AUDIT_RULES_SINGLE, + ) + + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") + def test_data_set_admin_get_audit_rules_none( + self, + call_racf_mock: Mock, + ): + profile_with_none_auditing = ( + TestDataSetConstants.TEST_EXTRACT_DATA_SET_RESULT_BASE_ONLY_SUCCESS_XML + ) + profile_with_none_auditing = profile_with_none_auditing.replace( + "FAILURES(READ)", + "NONE", + ) + call_racf_mock.return_value = profile_with_none_auditing + self.assertIsNone( + self.data_set_admin.get_audit_rules("ESWIFT.TEST.T1136242.P3020470") + ) + + # Error in environment, TESTING already deleted/not added + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") + def test_data_set_admin_get_audit_rules_raises_an_exception_when_extract_fails( + self, + call_racf_mock: Mock, + ): + call_racf_mock.return_value = ( + TestDataSetConstants.TEST_EXTRACT_DATA_SET_RESULT_BASE_ONLY_ERROR_XML + ) + with self.assertRaises(SecurityRequestError): + self.data_set_admin.get_audit_rules("ESWIFT.TEST.T1136242.P3020470") diff --git a/tests/data_set/test_data_set_result_parser.py b/tests/data_set/test_data_set_result_parser.py index 11e4f962..811ff600 100644 --- a/tests/data_set/test_data_set_result_parser.py +++ b/tests/data_set/test_data_set_result_parser.py @@ -287,3 +287,19 @@ def test_data_set_admin_can_parse_delete_data_set_error_xml( exception.exception.result, TestDataSetConstants.TEST_DELETE_DATA_SET_RESULT_ERROR_DICTIONARY, ) + + # ============================================================================ + # Build Template Data Set Profile + # ============================================================================ + + # def test_data_set_admin_can_build_template_data_set_base_success_xml( + # self, + # call_racf_mock: Mock, + # ): + # call_racf_mock.return_value = ( + # TestDataSetConstants.TEST_EXTRACT_DATA_SET_RESULT_BASE_ONLY_SUCCESS_XML + # ) + # self.assertEqual( + # self.data_set_admin.extract("ESWIFT.TEST.T1136242.P3020470",data_set_template = True), + # TestDataSetConstants.TEST_EXTRACT_DATA_SET_BASE_ONLY_TEMPLATE_TRAITS, + # ) diff --git a/tests/data_set/test_data_set_setters.py b/tests/data_set/test_data_set_setters.py index 2ad385a2..97a402ac 100644 --- a/tests/data_set/test_data_set_setters.py +++ b/tests/data_set/test_data_set_setters.py @@ -1,6 +1,7 @@ """Test data set setter functions.""" import unittest +from unittest.mock import Mock, patch import __init__ @@ -22,3 +23,300 @@ def test_data_set_admin_build_set_uacc_request(self): self.assertEqual( result, TestDataSetConstants.TEST_DATA_SET_SET_UNIVERSAL_ACCESS_XML ) + + # ============================================================================ + # Auditing Rules + # ============================================================================ + def test_data_set_admin_build_remove_all_audit_rules_request(self): + result = self.data_set_admin.remove_all_audit_rules( + "ESWIFT.TEST.T1136242.P3020470" + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_REMOVE_ALL_AUDIT_RULES_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_access_level_request(self): + result = self.data_set_admin.overwrite_audit_rules_by_access_level( + "ESWIFT.TEST.T1136242.P3020470", alter="success" + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ACCESS_LEVEL_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_access_level_none_request( + self, + ): + result = self.data_set_admin.overwrite_audit_rules_by_access_level( + "ESWIFT.TEST.T1136242.P3020470" + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_OVERWRITE_AUDIT_BY_ACCESS_LEVEL_NONE_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_access_level_multiple_request( + self, + ): + result = self.data_set_admin.overwrite_audit_rules_by_access_level( + "ESWIFT.TEST.T1136242.P3020470", alter="success", control="all" + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_OVERWRITE_AUDIT_BY_ACCESS_LEVEL_MULT_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_access_level_all_request( + self, + ): + result = self.data_set_admin.overwrite_audit_rules_by_access_level( + "ESWIFT.TEST.T1136242.P3020470", + alter="success", + control="all", + update="success", + read="failure", + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_OVERWRITE_AUDIT_BY_ACCESS_LEVEL_ALL_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_attempt_request(self): + result = self.data_set_admin.overwrite_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", failure="control" + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ATTEMPT_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_attempt_none_request(self): + result = self.data_set_admin.overwrite_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470" + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ATTEMPT_NONE_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_attempt_multiple_request( + self, + ): + result = self.data_set_admin.overwrite_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", success="alter", all="read" + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ATTEMPT_MULT_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_attempt_all_request(self): + result = self.data_set_admin.overwrite_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", + success="alter", + all="read", + failure="update", + ) + self.assertEqual( + result, + TestDataSetConstants.TEST_DATA_SET_OVERWRITE_AUDIT_RULES_BY_ATTEMPT_ALL_REQUEST_XML, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_attempt_value_error(self): + bad_success = "problem" + with self.assertRaises(ValueError) as exception: + self.data_set_admin.overwrite_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", success=bad_success + ) + error_string = ( + f"'{bad_success}' is not a valid access level. Valid access levels include " + + "'alter', 'control', 'read', and 'update'." + ) + self.assertEqual( + str(exception.exception), + error_string, + ) + + def test_data_set_admin_build_overwrite_audit_rules_by_attempt_value_duplicates( + self, + ): + success = "alter" + failure = "alter" + with self.assertRaises(ValueError) as exception: + self.data_set_admin.overwrite_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", success=success, failure=failure + ) + error_string = ( + f"'{success}' is provided as an 'Access Level' multiple times, which is not " + + "allowed." + ) + self.assertEqual( + str(exception.exception), + error_string, + ) + + @patch("pyracf.data_set.data_set_admin.DataSetAdmin.get_audit_rules") + def test_data_set_admin_build_alter_audit_rules_by_access_level_request( + self, + data_set_admin_get_audit_rules_mock: Mock, + ): + data_set_admin_get_audit_rules_mock.return_value = ( + TestDataSetConstants.TEST_GET_AUDIT_RULES + ) + self.assertEqual( + self.data_set_admin.alter_audit_rules_by_access_level( + "ESWIFT.TEST.T1136242.P3020470", alter="success" + ), + TestDataSetConstants.TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ACCESS_LEVEL_REQUEST_XML, + ) + + @patch("pyracf.data_set.data_set_admin.DataSetAdmin.get_audit_rules") + def test_data_set_admin_build_alter_audit_rules_by_access_level_none_request( + self, + data_set_admin_get_audit_rules_mock: Mock, + ): + data_set_admin_get_audit_rules_mock.return_value = ( + TestDataSetConstants.TEST_GET_AUDIT_RULES_WITH_ALL + ) + self.assertEqual( + self.data_set_admin.alter_audit_rules_by_access_level( + "ESWIFT.TEST.T1136242.P3020470" + ), + TestDataSetConstants.TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ACCESS_LEVEL_NONE_REQUEST_XML, + ) + + @patch("pyracf.data_set.data_set_admin.DataSetAdmin.get_audit_rules") + def test_data_set_admin_build_alter_audit_rules_by_access_level_multiple_request( + self, + data_set_admin_get_audit_rules_mock: Mock, + ): + data_set_admin_get_audit_rules_mock.return_value = ( + TestDataSetConstants.TEST_GET_AUDIT_RULES + ) + self.assertEqual( + self.data_set_admin.alter_audit_rules_by_access_level( + "ESWIFT.TEST.T1136242.P3020470", alter="success", control="failure" + ), + TestDataSetConstants.TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ACCESS_LEVEL_MULT_REQUEST_XML, + ) + + @patch("pyracf.data_set.data_set_admin.DataSetAdmin.get_audit_rules") + def test_data_set_admin_build_alter_audit_rules_by_access_level_all_request( + self, + data_set_admin_get_audit_rules_mock: Mock, + ): + data_set_admin_get_audit_rules_mock.return_value = ( + TestDataSetConstants.TEST_GET_AUDIT_RULES + ) + self.assertEqual( + self.data_set_admin.alter_audit_rules_by_access_level( + "ESWIFT.TEST.T1136242.P3020470", + alter="success", + control="failure", + update="all", + read="all", + ), + TestDataSetConstants.TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ACCESS_LEVEL_ALL_REQUEST_XML, + ) + + @patch("pyracf.data_set.data_set_admin.DataSetAdmin.get_audit_rules") + def test_data_set_admin_build_alter_audit_rules_by_attempt_request( + self, + data_set_admin_get_audit_rules_mock: Mock, + ): + data_set_admin_get_audit_rules_mock.return_value = ( + TestDataSetConstants.TEST_GET_AUDIT_RULES + ) + self.assertEqual( + self.data_set_admin.alter_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", failure="control" + ), + TestDataSetConstants.TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ATTEMPT_REQUEST_XML, + ) + + @patch("pyracf.data_set.data_set_admin.DataSetAdmin.get_audit_rules") + def test_data_set_admin_build_alter_audit_rules_by_attempt_none_request( + self, + data_set_admin_get_audit_rules_mock: Mock, + ): + data_set_admin_get_audit_rules_mock.return_value = ( + TestDataSetConstants.TEST_GET_AUDIT_RULES_WITH_ALL + ) + self.assertEqual( + self.data_set_admin.alter_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470" + ), + TestDataSetConstants.TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ATTEMPT_NONE_REQUEST_XML, + ) + + @patch("pyracf.data_set.data_set_admin.DataSetAdmin.get_audit_rules") + def test_data_set_admin_build_alter_audit_rules_by_attempt_multiple_request( + self, + data_set_admin_get_audit_rules_mock: Mock, + ): + data_set_admin_get_audit_rules_mock.return_value = ( + TestDataSetConstants.TEST_GET_AUDIT_RULES + ) + self.assertEqual( + self.data_set_admin.alter_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", failure="control", all="read" + ), + TestDataSetConstants.TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ATTEMPT_MULT_REQUEST_XML, + ) + + @patch("pyracf.data_set.data_set_admin.DataSetAdmin.get_audit_rules") + def test_data_set_admin_build_alter_audit_rules_by_attempt_all_request( + self, + data_set_admin_get_audit_rules_mock: Mock, + ): + data_set_admin_get_audit_rules_mock.return_value = ( + TestDataSetConstants.TEST_GET_AUDIT_RULES + ) + self.assertEqual( + self.data_set_admin.alter_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", + failure="control", + success="alter", + all="read", + ), + TestDataSetConstants.TEST_DATA_SET_ALTER_AUDIT_RULES_BY_ATTEMPT_ALL_REQUEST_XML, + ) + + def test_data_set_admin_build_alter_audit_rules_by_attempt_value_error(self): + bad_success = "problem" + bad_all = "value" + with self.assertRaises(ValueError) as exception: + self.data_set_admin.overwrite_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", + success=bad_success, + all=bad_all, + ) + error_string = ( + f"'{bad_success}' and '{bad_all}' are not valid access levels. Valid " + + "access levels include 'alter', 'control', 'read', and 'update'." + ) + self.assertEqual( + str(exception.exception), + error_string, + ) + + def test_data_set_admin_build_alter_audit_rules_by_attempt_value_error_all(self): + bad_success = "problem" + bad_failure = ["improper"] + bad_all = 1234 + with self.assertRaises(ValueError) as exception: + self.data_set_admin.overwrite_audit_rules_by_attempt( + "ESWIFT.TEST.T1136242.P3020470", + success=bad_success, + failure=bad_failure, + all=bad_all, + ) + error_string = ( + f"'{bad_success}', '{bad_failure}', and '{bad_all}' are not valid access levels. " + + "Valid access levels include 'alter', 'control', 'read', and 'update'." + ) + self.assertEqual( + str(exception.exception), + error_string, + ) diff --git a/tests/group/test_group_constants.py b/tests/group/test_group_constants.py index 478a1ca9..de2e8a56 100644 --- a/tests/group/test_group_constants.py +++ b/tests/group/test_group_constants.py @@ -88,7 +88,14 @@ def get_sample(sample_file: str) -> Union[str, bytes]: TEST_EXTRACT_GROUP_REQUEST_BASE_OMVS_XML = get_sample( "extract_group_request_base_omvs.xml" ) - +TEST_EXTRACT_GROUP_BASE_OMVS_TEMPLATE_TRAITS = { + "base:installation_data": None, + "base:model_data_set": None, + "base:owner": "eswift", + "base:superior_group": "sys1", + "base:terminal_universal_access": True, + "omvs:gid": 1234567, +} # Delete Group TEST_DELETE_GROUP_REQUEST_XML = get_sample("delete_group_request.xml") diff --git a/tests/group/test_group_result_parser.py b/tests/group/test_group_result_parser.py index 139e549a..40dd674e 100644 --- a/tests/group/test_group_result_parser.py +++ b/tests/group/test_group_result_parser.py @@ -239,3 +239,21 @@ def test_group_admin_can_parse_delete_group_error_xml( exception.exception.result, TestGroupConstants.TEST_DELETE_GROUP_RESULT_ERROR_DICTIONARY, ) + + # ============================================================================ + # Build template group + # ============================================================================ + + def test_group_admin_can_build_template_group_base_omvs_tso_revoke_resume_success_xml( + self, + call_racf_mock: Mock, + ): + call_racf_mock.return_value = ( + TestGroupConstants.TEST_EXTRACT_GROUP_RESULT_BASE_OMVS_SUCCESS_XML + ) + self.assertEqual( + self.group_admin.extract( + "testgrp0", segments=["omvs"], group_template=True + ), + TestGroupConstants.TEST_EXTRACT_GROUP_BASE_OMVS_TEMPLATE_TRAITS, + ) diff --git a/tests/common/common_log_samples/extract_resource_precheck_as_squidwrd.log b/tests/resource/resource_log_samples/extract_resource_base_as_squidwrd_success.log similarity index 79% rename from tests/common/common_log_samples/extract_resource_precheck_as_squidwrd.log rename to tests/resource/resource_log_samples/extract_resource_base_as_squidwrd_success.log index fa796fe7..b56b29bb 100644 --- a/tests/common/common_log_samples/extract_resource_precheck_as_squidwrd.log +++ b/tests/resource/resource_log_samples/extract_resource_base_as_squidwrd_success.log @@ -13,7 +13,7 @@ - + @@ -24,23 +24,19 @@ - + 0 0 0 - RLIST XFACILIT (IRR.IRRSMO00.PRECHECK) + RLIST ELIJTEST (TESTING) CLASS NAME ----- ---- - XFACILIT IRR.IRRSMO00.PRECHECK - - GROUP CLASS NAME - ----- ----- ---- - GXFACILI + ELIJTEST TESTING LEVEL OWNER UNIVERSAL ACCESS YOUR ACCESS WARNING ----- -------- ---------------- ----------- ------- - 00 ESWIFT READ READ NO + 00 ESWIFT READ ALTER NO INSTALLATION DATA ----------------- @@ -52,7 +48,7 @@ AUDITING -------- - FAILURES(READ) + SUCCESS(UPDATE),FAILURES(READ) NOTIFY ------ @@ -72,8 +68,8 @@ { "securityResult": { "resource": { - "name": "IRR.IRRSMO00.PRECHECK", - "class": "XFACILIT", + "name": "TESTING", + "class": "ELIJTEST", "operation": "listdata", "requestId": "ResourceRequest", "commands": [ @@ -81,19 +77,15 @@ "safReturnCode": 0, "returnCode": 0, "reasonCode": 0, - "image": "RLIST XFACILIT (IRR.IRRSMO00.PRECHECK) ", + "image": "RLIST ELIJTEST (TESTING) ", "messages": [ "CLASS NAME", "----- ----", - "XFACILIT IRR.IRRSMO00.PRECHECK", - " ", - "GROUP CLASS NAME", - "----- ----- ----", - "GXFACILI", + "ELIJTEST TESTING", " ", "LEVEL OWNER UNIVERSAL ACCESS YOUR ACCESS WARNING", "----- -------- ---------------- ----------- -------", - " 00 ESWIFT READ READ NO", + " 00 ESWIFT READ ALTER NO", " ", "INSTALLATION DATA", "-----------------", @@ -105,7 +97,7 @@ " ", "AUDITING", "--------", - "FAILURES(READ)", + "SUCCESS(UPDATE),FAILURES(READ)", " ", "NOTIFY", "------", @@ -129,8 +121,8 @@ { "securityResult": { "resource": { - "name": "IRR.IRRSMO00.PRECHECK", - "class": "XFACILIT", + "name": "TESTING", + "class": "ELIJTEST", "operation": "listdata", "requestId": "ResourceRequest", "commands": [ @@ -138,21 +130,21 @@ "safReturnCode": 0, "returnCode": 0, "reasonCode": 0, - "image": "RLIST XFACILIT (IRR.IRRSMO00.PRECHECK) ", + "image": "RLIST ELIJTEST (TESTING) ", "profiles": [ { "base": { - "class": "xfacilit", - "name": "irr.irrsmo00.precheck", - "groupClassName": "gxfacili", + "class": "elijtest", + "name": "testing", "level": 0, "owner": "eswift", "universalAccess": "read", - "yourAccess": "read", + "yourAccess": "alter", "warning": null, "installationData": null, "applicationData": null, "auditing": { + "success": "update", "failures": "read" }, "notify": null, diff --git a/tests/resource/test_resource_constants.py b/tests/resource/test_resource_constants.py index a1448ccd..8b742026 100644 --- a/tests/resource/test_resource_constants.py +++ b/tests/resource/test_resource_constants.py @@ -182,6 +182,9 @@ def get_sample(sample_file: str) -> Union[str, bytes]: TEST_EXTRACT_RESOURCE_BASE_SUCCESS_LOG = get_sample("extract_resource_base_success.log") TEST_EXTRACT_RESOURCE_BASE_ERROR_LOG = get_sample("extract_resource_base_error.log") +TEST_EXTRACT_RESOURCE_BASE_AS_SQUIDWRD_SUCCESS_LOG = get_sample( + "extract_resource_base_as_squidwrd_success.log" +) # ============================================================================ # Class Administration diff --git a/tests/resource/test_resource_getters.py b/tests/resource/test_resource_getters.py index 204fa3ff..ef7602f6 100644 --- a/tests/resource/test_resource_getters.py +++ b/tests/resource/test_resource_getters.py @@ -1,25 +1,29 @@ """Test general resource profile getter functions.""" +import contextlib +import io +import re import unittest from unittest.mock import Mock, patch import __init__ import tests.resource.test_resource_constants as TestResourceConstants -from pyracf import ResourceAdmin, SecurityRequestError +from pyracf import ResourceAdmin, SecurityRequestError, UserIdError # Resolves F401 __init__ -@patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") class TestResourceGetters(unittest.TestCase): maxDiff = None + ansi_escape = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") resource_admin = ResourceAdmin() # ============================================================================ # Universal Access # ============================================================================ + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_universal_access_returns_valid_when_read( self, call_racf_mock: Mock, @@ -31,6 +35,7 @@ def test_resource_admin_get_universal_access_returns_valid_when_read( self.resource_admin.get_universal_access("TESTING", "ELIJTEST"), "read" ) + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_universal_access_returns_valid_when_none( self, call_racf_mock: Mock, @@ -48,6 +53,7 @@ def test_resource_admin_get_universal_access_returns_valid_when_none( ) # Error in environment, TESTING already deleted/not added + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_universal_access_raises_an_exception_when_extract_fails( self, call_racf_mock: Mock, @@ -59,8 +65,9 @@ def test_resource_admin_get_universal_access_raises_an_exception_when_extract_fa self.resource_admin.get_universal_access("TESTING", "ELIJTEST") # ============================================================================ - # My Access + # Individual Access # ============================================================================ + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_my_access_read( self, call_racf_mock: Mock, @@ -72,6 +79,7 @@ def test_resource_admin_get_my_access_read( self.resource_admin.get_my_access("TESTING", "ELIJTEST"), "read" ) + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_my_access_none( self, call_racf_mock: Mock, @@ -87,6 +95,7 @@ def test_resource_admin_get_my_access_none( self.assertIsNone(self.resource_admin.get_my_access("TESTING", "ELIJTEST")) # Error in environment, TESTING already deleted/not added + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_my_access_raises_an_exception_when_extract_fails( self, call_racf_mock: Mock, @@ -97,9 +106,45 @@ def test_resource_admin_get_my_access_raises_an_exception_when_extract_fails( with self.assertRaises(SecurityRequestError): self.resource_admin.get_my_access("TESTING", "ELIJTEST") + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") + def test_get_user_access_general_resource( + self, + call_racf_mock: Mock, + ): + precheck_profile_as_squidwrd = ( + TestResourceConstants.TEST_EXTRACT_RESOURCE_RESULT_BASE_SUCCESS_XML + ) + precheck_profile_as_squidwrd = precheck_profile_as_squidwrd.replace( + " 00 ESWIFT READ READ NO", + " 00 ESWIFT READ ALTER NO", + ) + resource_admin = ResourceAdmin(debug=True, run_as_userid="ESWIFT") + call_racf_mock.return_value = precheck_profile_as_squidwrd + stdout = io.StringIO() + with contextlib.redirect_stdout(stdout): + access = resource_admin.get_user_access("TESTING", "ELIJTEST", "squidwrd") + success_log = self.ansi_escape.sub("", stdout.getvalue()) + self.assertEqual( + success_log, + TestResourceConstants.TEST_EXTRACT_RESOURCE_BASE_AS_SQUIDWRD_SUCCESS_LOG, + ) + self.assertEqual(access, "alter") + + def test_get_user_access_general_resource_raises_userid_error(self): + userid = "squidwrdtest" + resource_admin = ResourceAdmin(debug=True, run_as_userid="ESWIFT") + with self.assertRaises(UserIdError) as exception: + resource_admin.get_user_access("TESTING", "ELIJTEST", userid) + self.assertEqual( + exception.exception.message, + f"Unable to run as userid '{userid}'. Userid must " + + "be a string value between 1 to 8 characters in length.", + ) + # ============================================================================ # Auditing Rules # ============================================================================ + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_audit_rules( self, call_racf_mock: Mock, @@ -112,6 +157,7 @@ def test_resource_admin_get_audit_rules( TestResourceConstants.TEST_GET_AUDIT_RULES, ) + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_audit_rules_single( self, call_racf_mock: Mock, @@ -124,6 +170,7 @@ def test_resource_admin_get_audit_rules_single( TestResourceConstants.TEST_GET_AUDIT_RULES_SINGLE, ) + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_audit_rules_none( self, call_racf_mock: Mock, @@ -139,6 +186,7 @@ def test_resource_admin_get_audit_rules_none( self.assertIsNone(self.resource_admin.get_audit_rules("TESTING", "ELIJTEST")) # Error in environment, TESTING already deleted/not added + @patch("pyracf.common.irrsmo00.IRRSMO00.call_racf") def test_resource_admin_get_audit_rules_raises_an_exception_when_extract_fails( self, call_racf_mock: Mock, diff --git a/tests/resource/test_resource_result_parser.py b/tests/resource/test_resource_result_parser.py index 6b94b0e4..57b0a35a 100644 --- a/tests/resource/test_resource_result_parser.py +++ b/tests/resource/test_resource_result_parser.py @@ -399,3 +399,21 @@ def test_resource_admin_delete_can_check_for_unnecessary_refresh_xml( self.resource_admin.delete("TESTING", "ELIJTEST", check_refresh=True), False, ) + + # ============================================================================ + # Build template general resource profile + # ============================================================================ + + # def test_group_admin_can_build_template_group_base_omvs_tso_revoke_resume_success_xml( + # self, + # call_racf_mock: Mock, + # ): + # call_racf_mock.return_value = ( + # TestGroupConstants.TEST_EXTRACT_GROUP_RESULT_BASE_OMVS_SUCCESS_XML + # ) + # self.assertEqual( + # self.group_admin.extract( + # "testgrp0", segments=["omvs"], group_template=True + # ), + # TestGroupConstants.TEST_EXTRACT_GROUP_BASE_OMVS_TEMPLATE_TRAITS, + # ) diff --git a/tests/user/test_user_constants.py b/tests/user/test_user_constants.py index 3808e6e4..02aacfed 100644 --- a/tests/user/test_user_constants.py +++ b/tests/user/test_user_constants.py @@ -229,6 +229,39 @@ def get_sample(sample_file: str) -> Union[str, bytes]: TEST_EXTRACT_USER_REQUEST_BASE_OMVS_XML = get_sample( "extract_user_request_base_omvs.xml" ) +TEST_EXTRACT_USER_BASE_OMVS_TSO_REVOKE_RESUME_TEMPLATE_TRAITS = { + "base:name": "squidward", + "base:owner": "leonard", + "base:default_group": "sys1", + "base:revoke_date": "10/22/23", + "base:resume_date": "11/2/23", + "base:class_authorizations": [], + "base:logon_allowed_days": "anyday", + "base:logon_allowed_time": "anytime", + "base:security_level": None, + "base:security_label": None, + "omvs:uid": 1919, + "omvs:home_directory": "/u/squidward", + "omvs:default_shell": "/bin/sh", + "omvs:max_cpu_time": 1500, + "omvs:max_address_space_size": 10485760, + "omvs:max_files_per_process": 50, + "omvs:max_processes": 128, + "omvs:max_threads": 48, + "omvs:max_file_mapping_pages": 350, + "omvs:max_non_shared_memory": "4g", + "omvs:max_shared_memory": "2g", + "tso:account_number": "sb29", + "tso:hold_class": "a", + "tso:message_class": "b", + "tso:logon_procedure": "proc", + "tso:default_region_size": 1024, + "tso:max_region_size": 2048, + "tso:sysout_class": "o", + "tso:data_set_allocation_unit": "sysda", + "tso:user_data": "abcd", + "tso:logon_command": "ispf", +} # Delete User TEST_DELETE_USER_REQUEST_XML = get_sample("delete_user_request.xml") diff --git a/tests/user/test_user_result_parser.py b/tests/user/test_user_result_parser.py index 271003e6..120df303 100644 --- a/tests/user/test_user_result_parser.py +++ b/tests/user/test_user_result_parser.py @@ -537,3 +537,21 @@ def test_user_admin_custom_secret_redacted_on_error( f"({TestUserConstants.TEST_ALTER_USER_REQUEST_TRAITS_UID_ERROR['omvs:uid']})", exception.exception.result, ) + + # ============================================================================ + # Build template user + # ============================================================================ + + def test_user_admin_can_build_template_user_base_omvs_tso_revoke_resume_success_xml( + self, + call_racf_mock: Mock, + ): + call_racf_mock.return_value = ( + TestUserConstants.TEST_EXTRACT_USER_RESULT_BASE_OMVS_TSO_REVOKE_RESUME_XML + ) + self.assertEqual( + self.user_admin.extract( + "squidwrd", segments=["omvs", "tso"], user_template=True + ), + TestUserConstants.TEST_EXTRACT_USER_BASE_OMVS_TSO_REVOKE_RESUME_TEMPLATE_TRAITS, + )