Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robust error handling #26

Merged
merged 21 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyracf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Make security admin subclasses available from package root."""
from .access.access_admin import AccessAdmin
from .common.add_operation_error import AddOperationError
from .common.alter_operation_error import AlterOperationError
from .common.security_request_error import SecurityRequestError
from .common.segment_error import SegmentError
from .common.segment_trait_error import SegmentTraitError
from .connection.connection_admin import ConnectionAdmin
from .data_set.data_set_admin import DataSetAdmin
from .group.group_admin import GroupAdmin
Expand Down
25 changes: 4 additions & 21 deletions pyracf/access/access_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

from typing import List, Union

from pyracf.access.access_request import AccessRequest
from pyracf.common.security_admin import SecurityAdmin

from .access_request import AccessRequest


class AccessAdmin(SecurityAdmin):
"""RACF Access Administration."""
Expand Down Expand Up @@ -53,7 +54,7 @@ def __init__(
# ============================================================================
# Base Functions
# ============================================================================
def add(
def permit(
self,
resource: str,
class_name: str,
Expand All @@ -62,31 +63,13 @@ def add(
volume: Union[str, None] = None,
generic: bool = False,
) -> Union[dict, bytes]:
"""Create a new permission."""
"""Change a permission (or add a new one)"""
traits["base:id"] = auth_id
self._build_segment_dictionaries(traits)
access_request = AccessRequest(resource, class_name, "set", volume, generic)
self._add_traits_directly_to_request_xml_with_no_segments(access_request)
return self._make_request(access_request)

def alter(
self,
resource: str,
class_name: str,
auth_id: str,
traits: dict,
volume: Union[str, None] = None,
generic: bool = False,
) -> Union[dict, bytes]:
"""Alter an existing permission."""
traits["base:id"] = auth_id
self._build_segment_dictionaries(traits)
access_request = AccessRequest(resource, class_name, "set", volume, generic)
self._add_traits_directly_to_request_xml_with_no_segments(
access_request, alter=True
)
return self._make_request(access_request)

def delete(
self,
resource: str,
Expand Down
24 changes: 24 additions & 0 deletions pyracf/common/add_operation_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Exception to use when Add operation would alter an existing profile."""


class AddOperationError(Exception):
"""
lcarcaramo marked this conversation as resolved.
Show resolved Hide resolved
Raised when a profile cannot be added because it already exists.
"""

def __init__(self, profile_name: str, class_name: str) -> None:
lcarcaramo marked this conversation as resolved.
Show resolved Hide resolved
self.message = "Security request made to IRRSMO00 failed."
lcarcaramo marked this conversation as resolved.
Show resolved Hide resolved
admin_types = ["user", "group", "dataSet"]
if class_name not in admin_types:
self.message += (
"\n\nTarget profile "
+ f"'{profile_name}' already exists as a profile in the '{class_name}' class."
)
else:
self.message += (
"\n\nTarget profile "
+ f"'{profile_name}' already exists as a '{class_name}' profile."
)

def __str__(self) -> str:
return self.message
24 changes: 24 additions & 0 deletions pyracf/common/alter_operation_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Exception to use when Alter operation would add a new profile."""


class AlterOperationError(Exception):
"""
lcarcaramo marked this conversation as resolved.
Show resolved Hide resolved
Raised when a profile cannot be altered because it does not exist.
"""

def __init__(self, profile_name: str, class_name: str) -> None:
lcarcaramo marked this conversation as resolved.
Show resolved Hide resolved
self.message = "Security request made to IRRSMO00 failed."
admin_types = ["user", "group", "dataSet"]
if class_name not in admin_types:
self.message += (
"\n\nTarget profile "
+ f"'{profile_name}' does not exist as a profile in the '{class_name}' class."
)
else:
self.message += (
"\n\nTarget profile "
+ f"'{profile_name}' does not exist as a '{class_name}' profile."
)

def __str__(self) -> str:
return self.message
2 changes: 1 addition & 1 deletion pyracf/common/irrsmo00.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self) -> None:

def call_racf(self, request_xml: bytes, precheck: bool = False) -> str:
"""Make request to call_irrsmo00 in the cpyracf Python extension."""
options = 11 if precheck else 9
options = 15 if precheck else 13
lcarcaramo marked this conversation as resolved.
Show resolved Hide resolved
return call_irrsmo00(
xml_str=request_xml, xml_len=len(request_xml), opts=options
).decode("cp1047")
46 changes: 32 additions & 14 deletions pyracf/common/security_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from .security_request import SecurityRequest
from .security_request_error import SecurityRequestError
from .security_result import SecurityResult
from .segment_error import SegmentError
from .segment_trait_error import SegmentTraitError


class SecurityAdmin:
Expand Down Expand Up @@ -64,13 +66,13 @@ def __init__(
"base:passphrase": "racf:phrase",
}
self.__irrsmo00 = IRRSMO00()
self.__profile_type = profile_type
self._profile_type = profile_type
self._segment_traits = {}
# used to preserve segment traits for debug logging.
self.__preserved_segment_traits = {}
self._trait_map = {}
self.__debug = debug
self.__generate_requests_only = generate_requests_only
self._generate_requests_only = generate_requests_only
if update_existing_segment_traits is not None:
self.__update_valid_segment_traits(update_existing_segment_traits)
if replace_existing_segment_traits is not None:
Expand Down Expand Up @@ -123,7 +125,7 @@ def _extract_and_check_result(
) -> dict:
"""Extract a RACF profile."""
result = self._make_request(security_request)
if self.__generate_requests_only:
if self._generate_requests_only:
return result
self._format_profile(result)
if self.__debug:
Expand Down Expand Up @@ -155,7 +157,7 @@ def _make_request(
security_request.dump_request_xml(encoding="utf-8"),
secret_traits=self.__secret_traits,
)
if self.__generate_requests_only:
if self._generate_requests_only:
request_xml = self.__logger.redact_request_xml(
security_request.dump_request_xml(encoding="utf-8"),
secret_traits=self.__secret_traits,
Expand Down Expand Up @@ -202,7 +204,7 @@ def _to_steps(self, results: Union[List[dict], dict, bytes]) -> Union[dict, byte
"""
if isinstance(results, dict) or isinstance(results, bytes):
results = [results]
if self.__generate_requests_only:
if self._generate_requests_only:
concatenated_xml = b""
for request_xml in results:
if request_xml:
Expand Down Expand Up @@ -260,17 +262,33 @@ def __validate_and_add_trait(

def _build_bool_segment_dictionaries(self, segments: List[str]) -> None:
"""Build segment dictionaries for profile extract."""
bad_segments = []
for segment in segments:
if segment in self._valid_segment_traits:
self._segment_traits[segment] = True
else:
bad_segments.append(segment)
if bad_segments:
raise SegmentError(bad_segments, self._profile_type)
# preserve segment traits for debug logging.
self.__preserved_segment_traits = self._segment_traits

def _build_segment_dictionaries(self, traits: dict) -> None:
"""Build segemnt dictionaries for each segment."""
bad_traits = []
for trait in traits:
trait_valid = False
for segment in self._valid_segment_traits:
self.__validate_and_add_trait(trait, segment, traits[trait])
trait_valid = self.__validate_and_add_trait(
trait, segment, traits[trait]
)
if trait_valid:
break
if not trait_valid:
bad_traits.append(trait)
if bad_traits:
raise SegmentTraitError(bad_traits, self._profile_type)

# preserve segment traits for debug logging.
self.__preserved_segment_traits = self._segment_traits

Expand Down Expand Up @@ -302,10 +320,10 @@ def _get_profile(
self, result: Union[dict, bytes], index: int = 0
) -> Union[dict, bytes]:
"""Extract the profile section from a result dictionary."""
if self.__generate_requests_only:
# Allows this function to work with "self.__generate_requests_only" mode.
if self._generate_requests_only:
# Allows this function to work with "self._generate_requests_only" mode.
return result
return result["securityResult"][self.__profile_type]["commands"][0]["profiles"][
return result["securityResult"][self._profile_type]["commands"][0]["profiles"][
index
]

Expand All @@ -317,8 +335,8 @@ def _get_field(
string: bool = False,
) -> Union[bytes, Any, None]:
"""Extract the value of a field from a segment in a profile."""
if self.__generate_requests_only:
# Allows this function to work with "self.__generate_requests_only" mode.
if self._generate_requests_only:
# Allows this function to work with "self._generate_requests_only" mode.
return profile
try:
field = profile[segment][field]
Expand Down Expand Up @@ -353,15 +371,15 @@ def _format_profile_generic(self, messages: str) -> None:
current_segment = messages[i].split()[0].lower()
profile[current_segment] = {}
i += 2
if self.__profile_type in ("dataSet", "resource"):
if self._profile_type in ("dataSet", "resource"):
i = self.__format_data_set_generic_profile_data(
messages, profile, current_segment, i
)
if self.__profile_type == "user":
if self._profile_type == "user":
i = self.__format_user_profile_data(
messages, profile, current_segment, i
)
if self.__profile_type == "group":
if self._profile_type == "group":
i = self.__format_group_profile_data(
messages, profile, current_segment, i
)
Expand Down
14 changes: 14 additions & 0 deletions pyracf/common/security_request_error.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,17 @@ def __init__(self, result: dict) -> None:

def __str__(self) -> str:
return self.message

def contains_error_message(
self, security_definition_tag: str, error_message_id: str
):
commands = self.result["securityResult"][security_definition_tag].get(
"commands"
)
if not isinstance(commands, list):
return False
messages = commands[0].get("messages", [])
if error_message_id in "".join(messages):
return True
else:
return False
17 changes: 17 additions & 0 deletions pyracf/common/segment_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Exception to use when the user passes bad segment name(s) on an extract request."""


class SegmentError(Exception):
"""
Raised when a user passes a bad segment name on an extract request.
"""

def __init__(self, bad_segments: list, profile_type: str) -> None:
self.message = "Unable to build Security Request.\n\n"
for segment in bad_segments:
self.message += (
f"'{segment}' is not a known segment for '{profile_type}'.\n"
lcarcaramo marked this conversation as resolved.
Show resolved Hide resolved
)

def __str__(self) -> str:
return self.message
16 changes: 16 additions & 0 deletions pyracf/common/segment_trait_error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Exception to use when the user passes bad segment-trait name(s) on an add/alter request."""


class SegmentTraitError(Exception):
"""
Raised when a user passes a bad segment-trait combination in the traits dictionary.
"""

def __init__(self, bad_traits: list, profile_type: str) -> None:
self.message = "Unable to build Security Request.\n\n"
for trait in bad_traits:
self.message += f"'{trait}' is not a known segment-trait "
self.message += f"combination for '{profile_type}'.\n"
lcarcaramo marked this conversation as resolved.
Show resolved Hide resolved

def __str__(self) -> str:
return self.message
Loading