Skip to content

Commit

Permalink
Reorginazation
Browse files Browse the repository at this point in the history
-Add back experimental tag (likely true at release)
-Trim down Regex comments
-Rearchitect racf_key_map and racf_message_key_map to belong to individual administration objects
-Move Unit test samples used ONLY for additional secrets testing to Common

Signed-off-by: Elijah Swift <elijah.swift@ibm.com>
  • Loading branch information
ElijahSwiftIBM committed Feb 28, 2024
1 parent d96978f commit ff53162
Show file tree
Hide file tree
Showing 22 changed files with 122 additions and 89 deletions.
14 changes: 13 additions & 1 deletion pyracf/common/security_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class SecurityAdmin:

_valid_segment_traits = {}
_extracted_key_value_pair_segment_traits_map = {}
_racf_key_map = {}
_racf_message_key_map = {}
_racf_key_map = {}
_racf_message_key_map = {}
_case_sensitive_extracted_values = []
__running_userid = None
_logger = Logger()
Expand Down Expand Up @@ -119,6 +123,7 @@ def __init__(
self._logger.log_experimental("Replace Existing Segment Traits")
self.__replace_valid_segment_traits(replace_existing_segment_traits)
if additional_secret_traits is not None:
self._logger.log_experimental("Add Additional Secret Traits")
self.__add_additional_secret_traits(additional_secret_traits)
self.set_running_userid(run_as_userid)

Expand Down Expand Up @@ -169,7 +174,12 @@ def __raw_dump(self) -> None:
if self.__debug:
# Note, since the hex dump is logged to the console,
# secrets will be redacted.
self._logger.log_hex_dump(raw_result_xml, self.__secret_traits)
self._logger.log_hex_dump(
raw_result_xml,
self.__secret_traits,
self._racf_key_map,
self._racf_message_key_map,
)

# ============================================================================
# Secrets Redaction
Expand Down Expand Up @@ -244,6 +254,8 @@ def _make_request(
self.__running_userid,
),
self.__secret_traits,
self._racf_key_map,
self._racf_message_key_map,
)
self.__clear_state(security_request)
if isinstance(raw_result, list):
Expand Down
76 changes: 36 additions & 40 deletions pyracf/common/utilities/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@
class Logger:
"""Logging for pyRACF."""

__racf_key_map = {
"audaltr": "audit",
"audcntl": "audit",
"audnone": "audit",
"audread": "audit",
"audupdt": "audit",
}

__racf_message_key_map = {"uacc": "universal access"}

__ansi_reset = "\033[0m"
__ansi_gray = "\033[2m"
__ansi_green = "\033[32m"
Expand Down Expand Up @@ -167,41 +157,36 @@ def __redact_string(
This function employs the following regular expressions explained below
Regex 1 ("quoted")
This is designed to match the pattern TRAIT('value') by matching the TRAIT name:
{trait_key.upper()}, a variable (potentially zero) amount of spaces ( +){{0,}}, then the
('value') portion which must start and end with (' and '), but can conceivably contain
any characters, but a negative lookbehind is used to look for any unescaped single quotes
.*?(?<!\') that would indicate the matching of (' and ') is otherwise a coincidence.
Regex 1 ("quoted") - {trait_key.upper()}( +){{0,}}\(\'.*?(?<!\')\'\)
This is designed to match the pattern TRAIT('value') by matching the TRAIT name, a
variable (potentially zero) amount of spaces, then the ('value') portion which must
start and end with (' and '), but can conceivably contain any characters, but a negative
lookbehind is used to look for any unescaped single quotes that would indicate the matching
of ') is otherwise a coincidence.
Regex 2 ("nested")
Regex 2 ("nested") - {trait_key.upper()}( +){{0,}}\([^)]*\(.*\)( +){{0,}}\)
This is designed to match the pattern TRAIT( subtrait1(value) subtrait2(value)) by
matching the TRAIT name: {trait_key.upper()}, a variable (potentially zero) amount of
spaces ( +){{0,}}, then the ( subtrait1(value) subtrait2(value)) portion which must
start and end with ( and ), but must also contain a nested set of opened parentheses
rather than a direct seqence of them. The pattern \([^)]*\( looks for these nested
open parenthesis as a sequence would have a ) character between them. Then the expression
allows any non-newline characters: .* and the "end pattern" of ) and ) separated by a
variable (potentially zero) whitespace.
matching the TRAIT name, a variable (potentially zero) amount of spaces, then the
( subtrait1(value) subtrait2(value)) portion which must start and end with ( and ),
but must also contain a nested set of opened parentheses rather than a direct seqence of
them. The pattern looks for these nested open parenthesis as a sequence would have a )
character between them. Then the expression allows any non-newline characters and the
"end pattern" of ) and ) separated by a variable (potentially zero) whitespace.
If neither of these two patterns is found for a supplied trait_key, then it is assumed
this trait is set with the default pattern below.
Regex 3 ("default")
This is designed to match the pattern TRAIT(value) by matching the TRAIT name:
{trait_key.upper()}, a variable (potentially zero) amount of spaces ( +){{0,}}, then the
(value) portion which must start and end with ( and ), but can conceivably contain any
characters, but a negative lookbehind is used to look for any escape \ character .*?(?<!\\)
that would indicate the matching of the ( and ) is otherwise a coincidence.
Regex 3 ("default") - {trait_key.upper()}( +){{0,}}\(.*?(?<!\\)\)
This is designed to match the pattern TRAIT(value) by matching the TRAIT name, a variable
(potentially zero) amount of spaces, then the (value) portion which must start and end
with ( and ), but can conceivably contain any characters, but a negative lookbehind
is used to look for any escape \ character that would indicate the matching of the
( and ) is otherwise a coincidence.
In all replacement expressions, the variable amounts of whitespace are captured so that
they can be preserved by this redaction operations. This is indicated by the \1 in the
replacement string.
they can be preserved by this redaction operations.
"""
# Regex 1 ("quoted") - {trait_key.upper()}( +){{0,}}\(\'.*?(?<!\')\'\)
# Regex 2 ("nested") - {trait_key.upper()}( +){{0,}}\([^)]*\(.*\)( +){{0,}}\)
# Regex 3 ("default") - {trait_key.upper()}( +){{0,}}\(.*?(?<!\\)\)
asterisks = "********"
is_bytes = False
if isinstance(input_string, bytes):
Expand Down Expand Up @@ -286,6 +271,8 @@ def redact_result_xml(
self,
security_result: Union[str, bytes, List[int]],
secret_traits: dict,
racf_key_map: dict = {},
racf_message_key_map: dict = {},
) -> str:
"""
Redacts a list of specific secret traits in a result xml string.
Expand All @@ -299,8 +286,9 @@ def redact_result_xml(
return security_result
for xml_key in secret_traits.values():
racf_key = xml_key.split(":")[1] if ":" in xml_key else xml_key
if racf_key in self.__racf_key_map:
racf_key = self.__racf_key_map[racf_key]
if racf_key in racf_key_map:
print(racf_key_map[racf_key])
racf_key = racf_key_map[racf_key]
if isinstance(security_result, bytes):
match = re.search(
rf"{racf_key.upper()}( +){{0,}}\(", security_result.decode("cp1047")
Expand All @@ -310,8 +298,8 @@ def redact_result_xml(
if not match:
continue
security_result = self.__redact_string(security_result, racf_key)
if racf_key in self.__racf_message_key_map:
racf_key = self.__racf_message_key_map[racf_key]
if racf_key in racf_message_key_map:
racf_key = racf_message_key_map[racf_key]
if isinstance(security_result, bytes):
security_result = re.sub(
rf"<message>([A-Z]*[0-9]*[A-Z]) [^<>]*{racf_key.upper()}[^<>]*<\/message>",
Expand Down Expand Up @@ -457,7 +445,13 @@ def __indent_xml(self, minified_xml: str) -> str:
indented_xml += f"{' ' * indent_level}{current_line}\n"
return indented_xml[:-2]

def log_hex_dump(self, raw_result_xml: bytes, secret_traits: dict) -> None:
def log_hex_dump(
self,
raw_result_xml: bytes,
secret_traits: dict,
racf_key_map: dict,
racf_message_key_map: dict,
) -> None:
"""
Log the raw result XML returned by IRRSMO00 as a hex dump.
"""
Expand All @@ -470,6 +464,8 @@ def log_hex_dump(self, raw_result_xml: bytes, secret_traits: dict) -> None:
raw_result_xml = self.redact_result_xml(
raw_result_xml,
secret_traits,
racf_key_map,
racf_message_key_map,
)
for byte in raw_result_xml:
color_function = self.__green
Expand Down
8 changes: 8 additions & 0 deletions pyracf/data_set/data_set_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ def __init__(
"dfp": {"dfp:owner": "racf:resowner", "dfp:ckds_data_key": "racf:datakey"},
"tme": {"tme:roles": "racf:roles"},
}
self._racf_key_map = {
"audaltr": "audit",
"audcntl": "audit",
"audnone": "audit",
"audread": "audit",
"audupdt": "audit",
}
self._racf_message_key_map = {"uacc": "universal access"}
self._valid_segment_traits["base"].update(
self._common_base_traits_data_set_generic
)
Expand Down
8 changes: 8 additions & 0 deletions pyracf/resource/resource_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ def __init__(
"sigrequired": "signatureRequired",
},
}
self._racf_key_map = {
"audaltr": "audit",
"audcntl": "audit",
"audnone": "audit",
"audread": "audit",
"audupdt": "audit",
}
self._racf_message_key_map = {"uacc": "universal access"}
super().__init__(
"resource",
irrsmo00_result_buffer_size=irrsmo00_result_buffer_size,
Expand Down
25 changes: 13 additions & 12 deletions tests/common/test_additional_secrets_redaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import __init__

import tests.common.test_common_constants as TestCommonConstants
import tests.data_set.test_data_set_constants as TestDataSetConstants
import tests.group.test_group_constants as TestGroupConstants
import tests.resource.test_resource_constants as TestResourceConstants
Expand Down Expand Up @@ -43,7 +44,7 @@ def test_user_admin_custom_secret_redacted_on_success(
)
self.assertEqual(
result,
TestUserConstants.TEST_ALTER_USER_RESULT_EXTENDED_SUCCESS_DICTIONARY,
TestCommonConstants.TEST_ALTER_USER_RESULT_SUCCESS_UID_SECRET_DICTIONARY,
)
self.assertNotIn(
TestUserConstants.TEST_ALTER_USER_REQUEST_TRAITS_EXTENDED["omvs:uid"],
Expand All @@ -67,7 +68,7 @@ def test_user_admin_custom_secret_redacted_on_error(
)
self.assertEqual(
exception.exception.result,
TestUserConstants.TEST_ALTER_USER_RESULT_ERROR_UID_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_USER_RESULT_ERROR_UID_SECRET_DICTIONARY,
)
self.assertNotIn(
TestUserConstants.TEST_ALTER_USER_REQUEST_TRAITS_UID_ERROR["omvs:uid"],
Expand All @@ -82,15 +83,15 @@ def test_user_admin_custom_secret_redacted_when_complex_characters(
user_admin = UserAdmin(additional_secret_traits=["base:installation_data"])
call_racf_mock.side_effect = [
TestUserConstants.TEST_EXTRACT_USER_RESULT_BASE_ONLY_SUCCESS_XML,
TestUserConstants.TEST_ALTER_USER_RESULT_INST_DATA_SUCCESS_XML,
TestCommonConstants.TEST_ALTER_USER_RESULT_INST_DATA_SUCCESS_XML,
]
result = user_admin.alter(
"squidwrd",
traits=TestUserConstants.TEST_ALTER_USER_REQUEST_TRAITS_INST_DATA,
)
self.assertEqual(
result,
TestUserConstants.TEST_ALTER_USER_RESULT_SUCCESS_INST_DATA_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_USER_RESULT_SUCCESS_INST_DATA_SECRET_DICTIONARY,
)
self.assertNotIn(
TestUserConstants.TEST_ALTER_USER_REQUEST_TRAITS_INST_DATA[
Expand Down Expand Up @@ -134,7 +135,7 @@ def test_group_admin_custom_secret_redacted_on_success(
)
self.assertEqual(
result,
TestGroupConstants.TEST_ALTER_GROUP_RESULT_SUCCESS_GID_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_GROUP_RESULT_SUCCESS_GID_SECRET_DICTIONARY,
)
self.assertNotIn(
TestGroupConstants.TEST_ALTER_GROUP_REQUEST_TRAITS["omvs:gid"],
Expand All @@ -158,7 +159,7 @@ def test_group_admin_custom_secret_redacted_on_error(
)
self.assertEqual(
exception.exception.result,
TestGroupConstants.TEST_ALTER_GROUP_RESULT_ERROR_GID_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_GROUP_RESULT_ERROR_GID_SECRET_DICTIONARY,
)
self.assertNotIn(
TestGroupConstants.TEST_ALTER_GROUP_REQUEST_ERROR_TRAITS["omvs:gid"],
Expand Down Expand Up @@ -200,7 +201,7 @@ def test_resource_admin_custom_secret_redacted_on_success(
)
self.assertEqual(
result,
TestResourceConstants.TEST_ALTER_RESOURCE_RESULT_SUCCESS_OWNER_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_RESOURCE_RESULT_SUCCESS_OWNER_SECRET_DICTIONARY,
)
self.assertNotIn(
TestResourceConstants.TEST_ALTER_RESOURCE_REQUEST_TRAITS["base:owner"],
Expand All @@ -215,7 +216,7 @@ def test_resource_admin_custom_mapped_secret_redacted_on_success(
resource_admin = ResourceAdmin(additional_secret_traits=["base:audit_update"])
call_racf_mock.side_effect = [
TestResourceConstants.TEST_EXTRACT_RESOURCE_RESULT_BASE_SUCCESS_XML,
TestResourceConstants.TEST_ALTER_RESOURCE_OVERWRITE_AUDIT_RESULT_SUCCESS_XML,
TestCommonConstants.TEST_ALTER_RESOURCE_OVERWRITE_AUDIT_RESULT_SUCCESS_XML,
]
result = resource_admin.overwrite_audit_rules_by_access_level(
"TESTING",
Expand All @@ -224,7 +225,7 @@ def test_resource_admin_custom_mapped_secret_redacted_on_success(
)
self.assertEqual(
result,
TestResourceConstants.TEST_ALTER_RESOURCE_RESULT_SUCCESS_AUDIT_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_RESOURCE_RESULT_SUCCESS_AUDIT_SECRET_DICTIONARY,
)
self.assertNotIn(
"ALL",
Expand All @@ -250,7 +251,7 @@ def test_resource_admin_custom_secret_redacted_on_error(
)
self.assertEqual(
exception.exception.result,
TestResourceConstants.TEST_ALTER_RESOURCE_RESULT_ERROR_UACC_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_RESOURCE_RESULT_ERROR_UACC_SECRET_DICTIONARY,
)
self.assertNotIn(
TestResourceConstants.TEST_ALTER_RESOURCE_REQUEST_ERROR_TRAITS[
Expand Down Expand Up @@ -295,7 +296,7 @@ def test_data_set_admin_custom_secret_redacted_on_success(
)
self.assertEqual(
result,
TestDataSetConstants.TEST_ALTER_DATA_SET_RESULT_SUCCESS_OWNER_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_DATA_SET_RESULT_SUCCESS_OWNER_SECRET_DICTIONARY,
)
self.assertNotIn(
TestDataSetConstants.TEST_ALTER_DATA_SET_REQUEST_TRAITS["base:owner"],
Expand All @@ -320,7 +321,7 @@ def test_data_set_admin_custom_secret_redacted_on_error(
)
self.assertEqual(
exception.exception.result,
TestDataSetConstants.TEST_ALTER_DATA_SET_RESULT_ERROR_UACC_SECRET_DICTIONARY,
TestCommonConstants.TEST_ALTER_DATA_SET_RESULT_ERROR_UACC_SECRET_DICTIONARY,
)
self.assertNotIn(
TestDataSetConstants.TEST_ALTER_DATA_SET_REQUEST_TRAITS[f"{secret_trait}"],
Expand Down
44 changes: 44 additions & 0 deletions tests/common/test_common_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,50 @@ def get_sample(sample_file: str) -> Union[str, bytes]:
"extract_user_result_base_omvs_csdata_success.json"
)

# ============================================================================
# Additional Secrets Redaction
# ============================================================================

TEST_ALTER_DATA_SET_RESULT_SUCCESS_OWNER_SECRET_DICTIONARY = get_sample(
"alter_data_set_result_success_owner_secret.json"
)
TEST_ALTER_DATA_SET_RESULT_ERROR_UACC_SECRET_DICTIONARY = get_sample(
"alter_data_set_result_error_uacc_secret.json"
)

TEST_ALTER_GROUP_RESULT_SUCCESS_GID_SECRET_DICTIONARY = get_sample(
"alter_group_result_success_gid_secret.json"
)
TEST_ALTER_GROUP_RESULT_ERROR_GID_SECRET_DICTIONARY = get_sample(
"alter_group_result_error_gid_secret.json"
)

TEST_ALTER_RESOURCE_OVERWRITE_AUDIT_RESULT_SUCCESS_XML = get_sample(
"alter_resource_overwrite_audit_result_success.xml"
)
TEST_ALTER_RESOURCE_RESULT_SUCCESS_OWNER_SECRET_DICTIONARY = get_sample(
"alter_resource_result_success_owner_secret.json"
)
TEST_ALTER_RESOURCE_RESULT_SUCCESS_AUDIT_SECRET_DICTIONARY = get_sample(
"alter_resource_result_success_audit_secret.json"
)
TEST_ALTER_RESOURCE_RESULT_ERROR_UACC_SECRET_DICTIONARY = get_sample(
"alter_resource_result_error_uacc_secret.json"
)

TEST_ALTER_USER_RESULT_SUCCESS_UID_SECRET_DICTIONARY = get_sample(
"alter_user_result_success_uid_secret.json"
)
TEST_ALTER_USER_RESULT_ERROR_UID_SECRET_DICTIONARY = get_sample(
"alter_user_result_error_uid_secret.json"
)
TEST_ALTER_USER_RESULT_INST_DATA_SUCCESS_XML = get_sample(
"alter_user_result_success_inst_data.xml"
)
TEST_ALTER_USER_RESULT_SUCCESS_INST_DATA_SECRET_DICTIONARY = get_sample(
"alter_user_result_success_inst_data_secret.json"
)

# ============================================================================
# Run As UserId
# ============================================================================
Expand Down
6 changes: 0 additions & 6 deletions tests/data_set/test_data_set_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,10 @@ def get_sample(sample_file: str) -> Union[str, bytes]:
TEST_ALTER_DATA_SET_RESULT_SUCCESS_DICTIONARY = get_sample(
"alter_data_set_result_success.json"
)
TEST_ALTER_DATA_SET_RESULT_SUCCESS_OWNER_SECRET_DICTIONARY = get_sample(
"alter_data_set_result_success_owner_secret.json"
)
TEST_ALTER_DATA_SET_RESULT_ERROR_XML = get_sample("alter_data_set_result_error.xml")
TEST_ALTER_DATA_SET_RESULT_ERROR_DICTIONARY = get_sample(
"alter_data_set_result_error.json"
)
TEST_ALTER_DATA_SET_RESULT_ERROR_UACC_SECRET_DICTIONARY = get_sample(
"alter_data_set_result_error_uacc_secret.json"
)

# Extract Data Set
TEST_EXTRACT_DATA_SET_RESULT_BASE_ONLY_SUCCESS_XML = get_sample(
Expand Down
Loading

0 comments on commit ff53162

Please sign in to comment.