Skip to content

Commit

Permalink
Add "missing" functions to dataset admin
Browse files Browse the repository at this point in the history
-Add get "individual" access to dataset admin (get user access)
-Add unit test for get user access to dataset admin
-Move resource get user access unit tests out of common into resource
-Add get audit rules functions for dataset admin
-Add get audit rules unit tests to dataset admin
-Add set audit rules functions for dataset admin
-Add set audit rules unit tests for dataset admin
-Add mapping to build template profile to extract functions and add unit tests for user and group mappings

Signed-off-by: Elijah Swift <elijah.swift@ibm.com>
  • Loading branch information
ElijahSwiftIBM committed Feb 19, 2024
1 parent e23def4 commit 024da09
Show file tree
Hide file tree
Showing 37 changed files with 1,219 additions and 155 deletions.
103 changes: 103 additions & 0 deletions pyracf/common/security_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import platform
import re
import xml.etree.ElementTree
from collections import Counter
from datetime import datetime
from typing import Any, List, Tuple, Union

Expand Down Expand Up @@ -189,6 +190,77 @@ def __add_additional_secret_traits(self, additional_secret_traits: list) -> None
continue
self.__secret_traits[secret] = self._valid_segment_traits[segment][secret]

# ============================================================================
# Common Subfunctions for Child Classes
# ============================================================================
def _validate_access_levels(
self,
success: Union[str, None] = None,
failure: Union[str, None] = None,
all: Union[str, None] = None,
):
valid_access_levels = ("alter", "control", "read", "update")
value_error_text = (
"Valid access levels include 'alter', 'control', 'read', and 'update'."
)
bad_access_levels = []
for attempt_argument in (success, failure, all):
if (
attempt_argument is not None
and str(attempt_argument).lower() not in valid_access_levels
):
bad_access_levels.append(attempt_argument)
match len(bad_access_levels):
case 0:
self.__check_for_duplicates([success, failure, all], "Access Level")
return
case 1:
value_error_text = (
f"'{bad_access_levels[0]}' is not a valid access level. "
+ f"{value_error_text}"
)
case 2:
value_error_text = (
f"'{bad_access_levels[0]}' and '{bad_access_levels[1]}' are not valid "
+ f"access levels. {value_error_text}"
)
case _:
bad_access_levels = [
f"'{bad_access_level}'" for bad_access_level in bad_access_levels
]
bad_access_levels[-1] = f"and {bad_access_levels[-1]} "
value_error_text = (
f"{', '.join(bad_access_levels)}are not valid access levels. "
+ f"{value_error_text}"
)
raise ValueError(value_error_text)

def _build_traits_from_audit_rules(self, audit_rules: dict) -> dict:
traits = {}
if "success" in audit_rules:
traits[f"base:audit_{audit_rules['success']}"] = "success"
if "failures" in audit_rules:
traits[f"base:audit_{audit_rules['failures']}"] = "failure"
if "all" in audit_rules:
traits[f"base:audit_{audit_rules['all']}"] = "all"
return traits

def __check_for_duplicates(self, argument_list: list, argument: str) -> None:
duplicates = [
key
for (key, value) in Counter(argument_list).items()
if (value > 1 and key is not None)
]
if duplicates == []:
return
value_error_text = []
for duplicate in duplicates:
value_error_text.append(
f"'{duplicate}' is provided as an '{argument}' multiple times, which is not "
+ "allowed."
)
raise ValueError("\n".join(value_error_text))

# ============================================================================
# Request Execution
# ============================================================================
Expand Down Expand Up @@ -444,6 +516,32 @@ def _get_profile(
index
]

def _build_template(
self,
profile: dict,
) -> dict:
"""Map the profile to a template for an add or alter request."""
if self._generate_requests_only:
# Allows this function to work with "self._generate_requests_only" mode.
return profile
template = {}
for segment in profile.keys():
for trait in profile[segment].keys():
if (
f"{segment}:{self.__camel_case_to_snake_case(trait)}"
not in self._valid_segment_traits[segment]
):
continue
template[f"{segment}:{self.__camel_case_to_snake_case(trait)}"] = (
profile[segment][trait]
)
if "Date" in trait:
template[f"{segment}:{self.__camel_case_to_snake_case(trait)}"] = (
re.sub(r"20([0-9][0-9])", r"\1", profile[segment][trait])
)
print(template)
return template

def _get_field(
self,
profile: Union[dict, bytes],
Expand Down Expand Up @@ -975,6 +1073,11 @@ def _profile_field_to_camel_case(self, segment: str, field: str) -> str:
[field_token.title() for field_token in field_tokens[1:]]
)

def __camel_case_to_snake_case(self, string: str) -> str:
"""Convert a camel case string to snake case"""
string = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", string)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", string).lower()

# ============================================================================
# Result Dictionary Parsing Functions
# ============================================================================
Expand Down
128 changes: 126 additions & 2 deletions pyracf/data_set/data_set_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def __init__(
)

# ============================================================================
# Access
# Universal Access
# ============================================================================
def get_universal_access(self, data_set: str) -> Union[str, bytes, None]:
"""Get universal access for data set profile."""
Expand All @@ -94,11 +94,132 @@ def set_universal_access(
result = self.alter(data_set, {"base:universal_access": universal_acccess})
return self._to_steps(result)

# ============================================================================
# Individual Access
# ============================================================================
def get_my_access(self, data_set: str) -> Union[str, bytes, None]:
"""Get the access associated with your own data set profile."""
"""Get your own level of access associated with a data set profile."""
profile = self.extract(data_set, profile_only=True)
return self._get_field(profile, "base", "yourAccess")

def get_user_access(self, data_set: str, userid: str) -> Union[str, bytes, None]:
"""Get a target user's own level of access associated with a data set profile."""
original_userid = self.get_running_userid()
self.set_running_userid(userid)
profile = self.extract(data_set, profile_only=True)
self.set_running_userid(original_userid)
return self._get_field(profile, "base", "yourAccess")

# ============================================================================
# Auditing Rules
# ============================================================================
def get_audit_rules(self, data_set: str) -> Union[dict, bytes, None]:
"""Get the auditing rules associated with this data set profile."""
profile = self.extract(data_set, profile_only=True)
return self._get_field(profile, "base", "auditing")

def overwrite_audit_rules_by_attempt(
self,
data_set: str,
success: Union[str, None] = None,
failure: Union[str, None] = None,
all: Union[str, None] = None,
) -> Union[dict, bytes]:
"""
Overwrites the auditing rules for this data set profile with new
rules to audit based on specified access attempts.
"""
self._validate_access_levels(success, failure, all)
traits = {}
if success is not None:
traits[f"base:audit_{success}"] = "success"
if failure is not None:
traits[f"base:audit_{failure}"] = "failure"
if all is not None:
traits[f"base:audit_{all}"] = "all"
result = self.alter(data_set, traits=traits)
return self._to_steps(result)

def alter_audit_rules_by_attempt(
self,
data_set: str,
success: Union[str, None] = None,
failure: Union[str, None] = None,
all: Union[str, None] = None,
) -> Union[dict, bytes]:
"""
Alters the auditing rules for this data set profile with new rules
to audit by access level, preserving existing non-conflicting rules.
"""
self._validate_access_levels(success, failure, all)
audit_rules = self.get_audit_rules(data_set)
traits = self._build_traits_from_audit_rules(audit_rules)
if success is not None:
traits[f"base:audit_{success}"] = "success"
if failure is not None:
traits[f"base:audit_{failure}"] = "failure"
if all is not None:
traits[f"base:audit_{all}"] = "all"
result = self.alter(data_set, traits=traits)
return self._to_steps(result)

def overwrite_audit_rules_by_access_level(
self,
data_set: str,
alter: Union[str, None] = None,
control: Union[str, None] = None,
read: Union[str, None] = None,
update: Union[str, None] = None,
) -> Union[dict, bytes]:
"""
Overwrites the auditing rules for this data set profile with new
rules to audit based on specified access levels.
"""
traits = {}
if alter is not None:
traits["base:audit_alter"] = alter
if control is not None:
traits["base:audit_control"] = control
if read is not None:
traits["base:audit_read"] = read
if update is not None:
traits["base:audit_update"] = update
result = self.alter(data_set, traits=traits)
return self._to_steps(result)

def alter_audit_rules_by_access_level(
self,
data_set: str,
alter: Union[str, None] = None,
control: Union[str, None] = None,
read: Union[str, None] = None,
update: Union[str, None] = None,
) -> Union[dict, bytes]:
"""
Alters the auditing rules for this data set profile with a new
rule to audit alter access, preserving existing non-conflicting rules.
"""
audit_rules = self.get_audit_rules(data_set)
traits = self._build_traits_from_audit_rules(audit_rules)
if alter is not None:
traits["base:audit_alter"] = alter
if control is not None:
traits["base:audit_control"] = control
if read is not None:
traits["base:audit_read"] = read
if update is not None:
traits["base:audit_update"] = update
result = self.alter(data_set, traits=traits)
return self._to_steps(result)

def remove_all_audit_rules(
self,
data_set: str,
) -> Union[dict, bytes]:
"""Clears the auditing rules completely."""
result = self.alter(data_set, {"base:audit_none": True})
return self._to_steps(result)

# ============================================================================
# Base Functions
# ============================================================================
Expand Down Expand Up @@ -162,6 +283,7 @@ def extract(
volume: Union[str, None] = None,
generic: bool = False,
profile_only: bool = False,
data_set_template: bool = False,
) -> Union[dict, bytes]:
"""Extract a data set profile."""
self._build_segment_dictionary(segments)
Expand All @@ -170,6 +292,8 @@ def extract(
result = self._extract_and_check_result(data_set_request)
if profile_only:
return self._get_profile(result)
if data_set_template:
return self._build_template(self._get_profile(result))
return result

def delete(
Expand Down
10 changes: 8 additions & 2 deletions pyracf/group/group_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(
self._valid_segment_traits = {
"base": {
"base:installation_data": "racf:data",
"base:data_set_model": "racf:model",
"base:model_data_set": "racf:model",
"base:owner": "racf:owner",
"base:superior_group": "racf:supgroup",
"base:terminal_universal_access": "racf:termuacc",
Expand Down Expand Up @@ -161,7 +161,11 @@ def alter(self, group: str, traits: dict) -> Union[dict, bytes]:
return self._make_request(group_request, irrsmo00_precheck=True)

def extract(
self, group: str, segments: List[str] = [], profile_only: bool = False
self,
group: str,
segments: List[str] = [],
profile_only: bool = False,
group_template=False,
) -> Union[dict, bytes]:
"""Extract a group's profile."""
self._build_segment_dictionary(segments)
Expand All @@ -170,6 +174,8 @@ def extract(
result = self._extract_and_check_result(group_request)
if profile_only:
return self._get_profile(result)
if group_template:
return self._build_template(self._get_profile(result))
return result

def delete(self, group: str) -> Union[dict, bytes]:
Expand Down
Loading

0 comments on commit 024da09

Please sign in to comment.