From 3fe61d6b722bae828a45bda8f478e9938cb7cfa3 Mon Sep 17 00:00:00 2001 From: Qingmin Duanmu Date: Wed, 22 Jan 2025 14:41:11 +0800 Subject: [PATCH 1/4] feat: add control implmentations and implemented requirements --- trestlebot/tasks/sync_cac_content_task.py | 313 ++++++++++++++++++++- trestlebot/transformers/cac_transformer.py | 5 +- 2 files changed, 310 insertions(+), 8 deletions(-) diff --git a/trestlebot/tasks/sync_cac_content_task.py b/trestlebot/tasks/sync_cac_content_task.py index 49ab3c9b..36dd443f 100644 --- a/trestlebot/tasks/sync_cac_content_task.py +++ b/trestlebot/tasks/sync_cac_content_task.py @@ -6,16 +6,32 @@ import logging import os import pathlib -from typing import List +import re +from typing import Dict, List, Optional, Pattern, Set # from ssg.products import get_all -from ssg.profiles import get_profiles_from_products -from trestle.common.list_utils import none_if_empty +from ssg.controls import Control, ControlsManager +from ssg.products import load_product_yaml, product_yaml_path +from ssg.profiles import _load_yaml_profile_file, get_profiles_from_products +from trestle.common.common_types import TypeWithParts, TypeWithProps +from trestle.common.const import TRESTLE_HREF_HEADING +from trestle.common.list_utils import as_list, none_if_empty from trestle.common.model_utils import ModelUtils +from trestle.core.catalog.catalog_interface import CatalogInterface +from trestle.core.control_interface import ControlInterface from trestle.core.generators import generate_sample_model from trestle.core.models.file_content_type import FileContentType +from trestle.core.profile_resolver import ProfileResolver +from trestle.oscal.catalog import Catalog from trestle.oscal.common import Property -from trestle.oscal.component import ComponentDefinition, DefinedComponent +from trestle.oscal.component import ( + ComponentDefinition, + ControlImplementation, + DefinedComponent, + ImplementedRequirement, + SetParameter, + Statement, +) from trestlebot import const from trestlebot.tasks.base_task import TaskBase @@ -29,6 +45,65 @@ logger = logging.getLogger(__name__) +SECTION_PATTERN = r"Section ([a-z]):" + + +class OSCALProfileHelper: + """Helper class to handle OSCAL profile.""" + + def __init__(self, trestle_root: pathlib.Path) -> None: + """Initialize.""" + self._root = trestle_root + self.profile_controls: Set[str] = set() + self.controls_by_label: Dict[str, str] = dict() + + def load(self, profile_path: str) -> None: + """Load the profile catalog.""" + profile_resolver = ProfileResolver() + resolved_catalog: Catalog = profile_resolver.get_resolved_profile_catalog( + self._root, + profile_path, + block_params=False, + params_format="[.]", + show_value_warnings=True, + ) + + for control in CatalogInterface(resolved_catalog).get_all_controls_from_dict(): + self.profile_controls.add(control.id) + label = ControlInterface.get_label(control) + if label: + self.controls_by_label[label] = control.id + self._handle_parts(control) + + def _handle_parts( + self, + control: TypeWithParts, + ) -> None: + """Handle parts of a control.""" + if control.parts: + for part in control.parts: + if not part.id: + continue + self.profile_controls.add(part.id) + label = ControlInterface.get_label(part) + # Avoiding key collision here. The higher level control object will take + # precedence. + if label and label not in self.controls_by_label.keys(): + self.controls_by_label[label] = part.id + self._handle_parts(part) + + def validate(self, control_id: str) -> Optional[str]: + """Validate that the control id exists in the catalog and return the id""" + if control_id in self.controls_by_label.keys(): + logger.debug(f"Found control {control_id} in control labels") + return self.controls_by_label.get(control_id) + elif control_id in self.profile_controls: + logger.debug(f"Found control {control_id} in profile control ids") + return control_id + + logger.debug(f"Control {control_id} does not exist in the profile") + return None + class SyncCacContentTask(TaskBase): """Sync CaC content to OSCAL component definition task.""" @@ -47,7 +122,12 @@ def __init__( self.cac_profile: str = cac_profile self.cac_content_root: str = cac_content_root self.compdef_type: str = compdef_type + self.oscal_profile: str = oscal_profile self.rules: List[str] = [] + self.controls: List[Control] = list() + self.profile_href: str = "" + self.profile_path: str = "" + self._rules_by_id: Dict[str, RuleInfo] = dict() super().__init__(working_dir, None) @@ -68,7 +148,8 @@ def _get_rules_properties(self) -> List[Property]: self.cac_profile, ) rules_transformer.add_rules(self.rules) - rules: List[RuleInfo] = rules_transformer.get_all_rules() + self.rules_by_id: Dict[str, RuleInfo] = rules_transformer.get_all_rule_objs() + rules: List[RuleInfo] = list(self.rules_by_id.values()) all_rule_properties: List[Property] = rules_transformer.transform(rules) return all_rule_properties @@ -91,6 +172,217 @@ def _add_props(self, oscal_component: DefinedComponent) -> DefinedComponent: oscal_component.props = props return oscal_component + def _get_source(self, profile_name_or_href: str) -> None: + """Get the href and source of the profile.""" + profile_in_trestle_dir = "://" not in profile_name_or_href + self.profile_href = profile_name_or_href + if profile_in_trestle_dir: + local_path = f"profiles/{profile_name_or_href}/profile.json" + self.profile_href = TRESTLE_HREF_HEADING + local_path + self.profile_path = os.path.join(self.working_dir, local_path) + else: + self.profile_path = self.profile_href + + def _load_controls_manager(self) -> ControlsManager: + """ + Loads and initializes a ControlsManager instance. + """ + product_yml_path = product_yaml_path(self.cac_content_root, self.product) + product_yaml = load_product_yaml(product_yml_path) + controls_dir = os.path.join(self.cac_content_root, "controls") + control_mgr = ControlsManager(controls_dir, product_yaml) + control_mgr.load() + return control_mgr + + def _get_controls(self) -> None: + controls_manager = self._load_controls_manager() + policies = controls_manager.policies + profile_yaml = _load_yaml_profile_file(self.cac_profile) + selections = profile_yaml.get("selections", []) + for selected in selections: + if ":" in selected: + parts = selected.split(":") + if len(parts) == 3: + policy_id, level = parts[0], parts[2] + else: + policy_id, level = parts[0], "all" + policy = policies.get(policy_id) + if policy is not None: + self.controls.extend( + controls_manager.get_all_controls_of_level(policy_id, level) + ) + + @staticmethod + def _build_sections_dict( + control_response: str, + section_pattern: Pattern[str], + ) -> Dict[str, List[str]]: + """Find all sections in the control response and build a dictionary of them.""" + lines = control_response.split("\n") + + sections_dict: Dict[str, List[str]] = dict() + current_section_label = None + + for line in lines: + match = section_pattern.match(line) + + if match: + current_section_label = match.group(1) + sections_dict[current_section_label] = [line] + elif current_section_label is not None: + sections_dict[current_section_label].append(line) + + return sections_dict + + def _create_statement(self, statement_id: str, description: str = "") -> Statement: + """Create a statement.""" + statement = generate_sample_model(Statement) + statement.statement_id = statement_id + if description: + statement.description = description + return statement + + def _handle_response( + self, + implemented_req: ImplementedRequirement, + control: Control, + profile: OSCALProfileHelper, + ) -> None: + """ + Break down the response into parts. + + Args: + implemented_req: The implemented requirement to add the response and statements to. + control_response: The control response to add to the implemented requirement. + """ + # If control notes is unavailable, consider to use other input as replacement + # or a generic information. + control_response = control.notes + pattern = re.compile(SECTION_PATTERN, re.IGNORECASE) + + sections_dict = self._build_sections_dict(control_response, pattern) + # oscal_status = OscalStatus.from_string(control.status) + + if sections_dict: + # self._add_response_by_status(implemented_req, oscal_status, REPLACE_ME) + implemented_req.statements = list() + for section_label, section_content in sections_dict.items(): + statement_id = profile.validate( + f"{implemented_req.control_id}_smt.{section_label}" + ) + if statement_id is None: + continue + + section_content_str = "\n".join(section_content) + section_content_str = pattern.sub("", section_content_str) + statement = self._create_statement( + statement_id, section_content_str.strip() + ) + implemented_req.statements.append(statement) + # else: + # self._add_response_by_status( + # implemented_req, oscal_status, control_response.strip() + # ) + + def _process_rule_ids(self, rule_ids: List[str]) -> List[str]: + """ + Process rule ids. + Notes: Rule ids with an "=" are parameters and should not be included + # when searching for rules. + """ + processed_rule_ids: List[str] = list() + for rule_id in rule_ids: + parts = rule_id.split("=") + if len(parts) == 1: + processed_rule_ids.append(rule_id) + return processed_rule_ids + + def _attach_rules( + self, + type_with_props: TypeWithProps, + rule_ids: List[str], + ) -> None: + """Add rules to a type with props.""" + all_props: List[Property] = as_list(type_with_props.props) + # Get a subset from self.rules_by_id according to rule_ids + rules_by_id = {k: v for k, v in self.rules_by_id.items() if k in rule_ids} + rules: List[RuleInfo] = list(rules_by_id.values()) + rules_transformer = RulesTransformer( + self.cac_content_root, + self.product, + self.cac_profile, + ) + rule_properties: List[Property] = rules_transformer.transform(rules) + all_props.extend(rule_properties) + type_with_props.props = none_if_empty(all_props) + + def _add_set_parameters( + self, control_implementation: ControlImplementation + ) -> None: + """Add set parameters to a control implementation.""" + rules: List[RuleInfo] = list(self.rules_by_id.values()) + params = [] + for rule in rules: + params.extend(rule._parameters) + param_selections = {param.id: param.selected_value for param in params} + + if param_selections: + all_set_params: List[SetParameter] = as_list( + control_implementation.set_parameters + ) + for param_id, value in param_selections.items(): + set_param = generate_sample_model(SetParameter) + set_param.param_id = param_id + set_param.values = [value] + all_set_params.append(set_param) + control_implementation.set_parameters = none_if_empty(all_set_params) + + def _create_implemented_requirement( + self, control: Control + ) -> Optional[ImplementedRequirement]: + """Create implemented requirement from a control object""" + + logger.info(f"Creating implemented requirement for {control.id}") + profile = OSCALProfileHelper(pathlib.Path(self.working_dir)) + profile.load(self.profile_path) + + control_id = profile.validate(control.id) + if control_id: + implemented_req = generate_sample_model(ImplementedRequirement) + implemented_req.control_id = control_id + self._handle_response(implemented_req, control, profile) + rule_ids = self._process_rule_ids(control.rules) + self._attach_rules(implemented_req, rule_ids) + return implemented_req + return None + + def _create_control_implementation(self) -> ControlImplementation: + """Create control implementation for a component.""" + ci = generate_sample_model(ControlImplementation) + ci.source = self.profile_href + all_implement_reqs = list() + self._get_controls() + + # Get all profile related controls here: + for control in self.controls: + implemented_req = self._create_implemented_requirement(control) + if implemented_req: + all_implement_reqs.append(implemented_req) + ci.implemented_requirements = all_implement_reqs + self._add_set_parameters(ci) + return ci + + def _add_control_implementations( + self, oscal_component: DefinedComponent + ) -> DefinedComponent: + """Add control implementations to OSCAL component.""" + self._get_source(self.oscal_profile) + control_implementation: ControlImplementation = ( + self._create_control_implementation() + ) + oscal_component.control_implementations = [control_implementation] + return oscal_component + def _update_compdef( self, cd_json: pathlib.Path, oscal_component: DefinedComponent ) -> None: @@ -106,6 +398,16 @@ def _update_compdef( logger.info(f"Start to update props of {component.title}") compdef.components[index].props = oscal_component.props updated = True + if ( + component.control_implementations + != oscal_component.control_implementations + ): + logger.info(f"Start to update props of {component.title}") + compdef.components[index].control_implementations = ( + oscal_component.control_implementations + ) + updated = True + if updated: compdef.oscal_write(cd_json) break @@ -140,6 +442,7 @@ def _create_or_update_compdef(self, compdef_type: str = "service") -> None: """Create or update component definition for specified CaC profile.""" oscal_component = generate_sample_model(DefinedComponent) oscal_component = self._add_props(oscal_component) + oscal_component = self._add_control_implementations(oscal_component) repo_path = pathlib.Path(self.working_dir) cd_json: pathlib.Path = ModelUtils.get_model_path_for_name_and_class( diff --git a/trestlebot/transformers/cac_transformer.py b/trestlebot/transformers/cac_transformer.py index 6e0b9847..61a65d84 100644 --- a/trestlebot/transformers/cac_transformer.py +++ b/trestlebot/transformers/cac_transformer.py @@ -332,9 +332,8 @@ def _get_rule_properties(self, ruleset: str, rule_obj: RuleInfo) -> List[Propert return rule_properties - def get_all_rules(self) -> List[RuleInfo]: - """Get all rules that have been loaded""" - return list(self._rules_by_id.values()) + def get_all_rule_objs(self) -> Dict[str, RuleInfo]: + return self._rules_by_id def transform(self, rule_objs: List[RuleInfo]) -> List[Property]: """Get the rules properties for a set of rule ids.""" From 76d7f9b88af125f1c72bdaec47d117b83da0f49e Mon Sep 17 00:00:00 2001 From: Qingmin Duanmu Date: Wed, 22 Jan 2025 15:55:59 +0800 Subject: [PATCH 2/4] chore: update testing for sync_cac_content --- .../cli/test_sync_cac_content_cmd.py | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/tests/trestlebot/cli/test_sync_cac_content_cmd.py b/tests/trestlebot/cli/test_sync_cac_content_cmd.py index 28254df0..d42cd0fa 100644 --- a/tests/trestlebot/cli/test_sync_cac_content_cmd.py +++ b/tests/trestlebot/cli/test_sync_cac_content_cmd.py @@ -117,7 +117,7 @@ def test_sync_product(tmp_repo: Tuple[str, Repo]) -> None: assert len(compdef.components) == 1 component = compdef.components[0] assert component.title == "rhel8" - # Check rules component props + # Check rules component props are added assert len(component.props) == 24 rule_ids = [p.value for p in component.props if p.name == "Rule_Id"] assert sorted(rule_ids) == [ @@ -125,13 +125,30 @@ def test_sync_product(tmp_repo: Tuple[str, Repo]) -> None: "file_groupownership_sshd_private_key", "sshd_set_keepalive", ] - # Check parameters props + # Check parameters props are added param_ids = [p.value for p in component.props if p.name == "Parameter_Id"] assert sorted(list(set(param_ids))) == [ "var_sshd_set_keepalive", "var_system_crypto_policy", ] + # Check control_implementations are attached + ci = component.control_implementations[0] + assert ci.source == "trestle://profiles/simplified_nist_profile/profile.json" + set_parameters = ci.set_parameters + assert len(set_parameters) == 2 + set_params_ids = [] + set_params_dict = {} + for param in set_parameters: + set_params_ids.append(param.param_id) + set_params_dict.update({param.param_id: param.values}) + assert sorted(set_params_ids) == [ + "var_sshd_set_keepalive", + "var_system_crypto_policy", + ] + assert set_params_dict["var_sshd_set_keepalive"] == ["1"] + assert set_params_dict["var_system_crypto_policy"] == ["fips"] + def test_sync_product_create_validation_component(tmp_repo: Tuple[str, Repo]) -> None: """Tests sync Cac content to create validation component.""" From debfd954c044eddec7b3e7aab92f275e7a214aa9 Mon Sep 17 00:00:00 2001 From: Qingmin Duanmu Date: Wed, 22 Jan 2025 20:06:33 +0800 Subject: [PATCH 3/4] chore: transformation performance improvement --- trestlebot/tasks/sync_cac_content_task.py | 55 ++++++++++++---------- trestlebot/transformers/cac_transformer.py | 5 ++ 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/trestlebot/tasks/sync_cac_content_task.py b/trestlebot/tasks/sync_cac_content_task.py index 36dd443f..df8c6799 100644 --- a/trestlebot/tasks/sync_cac_content_task.py +++ b/trestlebot/tasks/sync_cac_content_task.py @@ -118,6 +118,7 @@ def __init__( working_dir: str, ) -> None: """Initialize CaC content sync task.""" + self.product: str = product self.cac_profile: str = cac_profile self.cac_content_root: str = cac_content_root @@ -125,9 +126,11 @@ def __init__( self.oscal_profile: str = oscal_profile self.rules: List[str] = [] self.controls: List[Control] = list() + self.rules_by_id: Dict[str, RuleInfo] = dict() + self.profile_href: str = "" self.profile_path: str = "" - self._rules_by_id: Dict[str, RuleInfo] = dict() + self.profile = OSCALProfileHelper(pathlib.Path(working_dir)) super().__init__(working_dir, None) @@ -148,7 +151,7 @@ def _get_rules_properties(self) -> List[Property]: self.cac_profile, ) rules_transformer.add_rules(self.rules) - self.rules_by_id: Dict[str, RuleInfo] = rules_transformer.get_all_rule_objs() + self.rules_by_id = rules_transformer.get_all_rule_objs() rules: List[RuleInfo] = list(self.rules_by_id.values()) all_rule_properties: List[Property] = rules_transformer.transform(rules) return all_rule_properties @@ -195,6 +198,7 @@ def _load_controls_manager(self) -> ControlsManager: return control_mgr def _get_controls(self) -> None: + """Collect controls selected by profile.""" controls_manager = self._load_controls_manager() policies = controls_manager.policies profile_yaml = _load_yaml_profile_file(self.cac_profile) @@ -246,7 +250,6 @@ def _handle_response( self, implemented_req: ImplementedRequirement, control: Control, - profile: OSCALProfileHelper, ) -> None: """ Break down the response into parts. @@ -267,7 +270,7 @@ def _handle_response( # self._add_response_by_status(implemented_req, oscal_status, REPLACE_ME) implemented_req.statements = list() for section_label, section_content in sections_dict.items(): - statement_id = profile.validate( + statement_id = self.profile.validate( f"{implemented_req.control_id}_smt.{section_label}" ) if statement_id is None: @@ -301,18 +304,15 @@ def _attach_rules( self, type_with_props: TypeWithProps, rule_ids: List[str], + rules_transformer: RulesTransformer, ) -> None: """Add rules to a type with props.""" all_props: List[Property] = as_list(type_with_props.props) - # Get a subset from self.rules_by_id according to rule_ids - rules_by_id = {k: v for k, v in self.rules_by_id.items() if k in rule_ids} - rules: List[RuleInfo] = list(rules_by_id.values()) - rules_transformer = RulesTransformer( - self.cac_content_root, - self.product, - self.cac_profile, - ) - rule_properties: List[Property] = rules_transformer.transform(rules) + all_rule_ids = self.rules_by_id.keys() + error_rules = list(filter(lambda x: x not in all_rule_ids, rule_ids)) + if error_rules: + raise ValueError(f"Could not find rules: {', '.join(error_rules)}") + rule_properties: List[Property] = rules_transformer.get_rule_id_props(rule_ids) all_props.extend(rule_properties) type_with_props.props = none_if_empty(all_props) @@ -338,21 +338,18 @@ def _add_set_parameters( control_implementation.set_parameters = none_if_empty(all_set_params) def _create_implemented_requirement( - self, control: Control + self, control: Control, rules_transformer: RulesTransformer ) -> Optional[ImplementedRequirement]: """Create implemented requirement from a control object""" logger.info(f"Creating implemented requirement for {control.id}") - profile = OSCALProfileHelper(pathlib.Path(self.working_dir)) - profile.load(self.profile_path) - - control_id = profile.validate(control.id) + control_id = self.profile.validate(control.id) if control_id: implemented_req = generate_sample_model(ImplementedRequirement) implemented_req.control_id = control_id - self._handle_response(implemented_req, control, profile) + self._handle_response(implemented_req, control) rule_ids = self._process_rule_ids(control.rules) - self._attach_rules(implemented_req, rule_ids) + self._attach_rules(implemented_req, rule_ids, rules_transformer) return implemented_req return None @@ -362,10 +359,16 @@ def _create_control_implementation(self) -> ControlImplementation: ci.source = self.profile_href all_implement_reqs = list() self._get_controls() + rules_transformer = RulesTransformer( + self.cac_content_root, + self.product, + self.cac_profile, + ) - # Get all profile related controls here: for control in self.controls: - implemented_req = self._create_implemented_requirement(control) + implemented_req = self._create_implemented_requirement( + control, rules_transformer + ) if implemented_req: all_implement_reqs.append(implemented_req) ci.implemented_requirements = all_implement_reqs @@ -377,6 +380,7 @@ def _add_control_implementations( ) -> DefinedComponent: """Add control implementations to OSCAL component.""" self._get_source(self.oscal_profile) + self.profile.load(self.profile_path) control_implementation: ControlImplementation = ( self._create_control_implementation() ) @@ -398,11 +402,14 @@ def _update_compdef( logger.info(f"Start to update props of {component.title}") compdef.components[index].props = oscal_component.props updated = True + # The way to check control implementations needs to be updated if ( component.control_implementations != oscal_component.control_implementations ): - logger.info(f"Start to update props of {component.title}") + logger.info( + f"Start to update control implementations of {component.title}" + ) compdef.components[index].control_implementations = ( oscal_component.control_implementations ) @@ -438,7 +445,7 @@ def _create_compdef( component_definition.components.append(oscal_component) component_definition.oscal_write(cd_json) - def _create_or_update_compdef(self, compdef_type: str = "service") -> None: + def _create_or_update_compdef(self) -> None: """Create or update component definition for specified CaC profile.""" oscal_component = generate_sample_model(DefinedComponent) oscal_component = self._add_props(oscal_component) diff --git a/trestlebot/transformers/cac_transformer.py b/trestlebot/transformers/cac_transformer.py index 61a65d84..a74d0722 100644 --- a/trestlebot/transformers/cac_transformer.py +++ b/trestlebot/transformers/cac_transformer.py @@ -332,6 +332,11 @@ def _get_rule_properties(self, ruleset: str, rule_obj: RuleInfo) -> List[Propert return rule_properties + def get_rule_id_props(self, rule_ids: List[str]) -> List[Property]: + """Get the rule props with rule ids.""" + props: List[Property] = [add_prop(RULE_ID, rule_id) for rule_id in rule_ids] + return props + def get_all_rule_objs(self) -> Dict[str, RuleInfo]: return self._rules_by_id From 3a2ea5d24f7dcc367a76b5858abf267589b2a292 Mon Sep 17 00:00:00 2001 From: Qingmin Duanmu Date: Thu, 23 Jan 2025 14:09:31 +0800 Subject: [PATCH 4/4] chore: update checking on component data --- trestlebot/tasks/sync_cac_content_task.py | 26 ++++++++++++----------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/trestlebot/tasks/sync_cac_content_task.py b/trestlebot/tasks/sync_cac_content_task.py index df8c6799..a0071740 100644 --- a/trestlebot/tasks/sync_cac_content_task.py +++ b/trestlebot/tasks/sync_cac_content_task.py @@ -396,41 +396,43 @@ def _update_compdef( updated = False for index, component in enumerate(compdef.components): components_titles.append(component.title) - # If the component exists and the props need to be updated + # Check if the component exists and needs to be updated if component.title == oscal_component.title: - if component.props != oscal_component.props: - logger.info(f"Start to update props of {component.title}") + if not ModelUtils.models_are_equivalent( + component.props, oscal_component.props, ignore_all_uuid=True + ): + logger.info(f"Component props of {component.title} has an update") compdef.components[index].props = oscal_component.props updated = True - # The way to check control implementations needs to be updated - if ( - component.control_implementations - != oscal_component.control_implementations + if not ModelUtils.models_are_equivalent( + component.control_implementations, + oscal_component.control_implementations, + ignore_all_uuid=True, ): logger.info( - f"Start to update control implementations of {component.title}" + f"Control implementations of {component.title} has an update" ) compdef.components[index].control_implementations = ( oscal_component.control_implementations ) updated = True if updated: - compdef.oscal_write(cd_json) break if oscal_component.title not in components_titles: - logger.info(f"Start to append component {oscal_component.title}") + logger.info(f"Component {oscal_component.title} needs to be added") compdef.components.append(oscal_component) - compdef.oscal_write(cd_json) updated = True if updated: - logger.info(f"Update component definition: {cd_json}") compdef.metadata.version = str( "{:.1f}".format(float(compdef.metadata.version) + 0.1) ) ModelUtils.update_last_modified(compdef) compdef.oscal_write(cd_json) + logger.info(f"Component definition: {cd_json} is updated") + else: + logger.info(f"No update in component definition: {cd_json}") def _create_compdef( self, cd_json: pathlib.Path, oscal_component: DefinedComponent