Skip to content

Commit

Permalink
APP-4522
Browse files Browse the repository at this point in the history
  • Loading branch information
bpurdy-tc committed Jun 27, 2024
1 parent f71e657 commit 48a4b86
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 110 deletions.
1 change: 1 addition & 0 deletions tcex/api/tc/v3/_gen/_gen_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ def tap(self, type_: str):
'categories',
'results',
'subtypes',
'keyword_sections'
]:
return 'tcex.api.tc.v3.intel_requirements'

Expand Down
71 changes: 69 additions & 2 deletions tcex/api/tc/v3/_gen/_gen_object_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,67 @@ def as_entity(self) -> dict:
)
return '\n'.join(as_entity_property_method)

def _gen_code_object_replace_type_method(self, type_: str, model_type: str | None = None) -> str:
"""Return the method code.
def replace_artifact(self, **kwargs):
'''Replace an Artifact on the object. (mark as staged)
"""
type_ = self.util.snake_string(type_)
model_type = self.util.snake_string(model_type or type_)
model_reference = model_type

# get model from map and update requirements
model_import_data = self._module_import_data(type_)
model_class = model_import_data.get('model_class')
self.requirements['first-party'].append(
f'''from {model_import_data.get('model_module')} '''
f'''import {model_import_data.get('model_class')}'''
)
stage_method = [
(
f'''{self.i1}def replace_{model_type.singular()}(self, '''
f'''data: dict | list | ObjectABC | {model_class}'''
f'''):'''
),
f'''{self.i2}"""Replace {type_.singular()} on the object."""''',
f'''{self.i2}if isinstance(data, ObjectABC):''',
f'''{self.i3}transformed_data = data''',
(
f'''{self.i2}elif isinstance(data, list) and '''
f'''all(isinstance(item, {model_class}) for item in data):'''
),
f'''{self.i3}transformed_data = data''',
(
f'''{self.i2}elif isinstance(data, list) and '''
f'''all(isinstance(item, ObjectABC) for item in data):'''
),
f'''{self.i3}transformed_data = data''',
(
f'''{self.i2}elif isinstance(data, list) and '''
f'''all(isinstance(item, dict) for item in data):'''
),
f'''{self.i3}transformed_data = [{model_class}(**d) for d in data]''',
f'''{self.i2}elif isinstance(data, dict):''',
f'''{self.i3}transformed_data = {model_class}(**data)''',
f'''{self.i2}elif isinstance(data, {model_class}):''',
f'''{self.i3}transformed_data = data''',
f'''{self.i2}else:'''
f'''{self.i3}raise ValueError("Invalid data to replace_{model_type.singular()}")''',
'',
'',
f'''{self.i2}if isinstance(transformed_data, list):''',
f'''{self.i3}for item in transformed_data:''',
f'''{self.i4}item._staged = True''',
f'''{self.i2}elif isinstance(transformed_data, {model_class}):''',
f'''{self.i3}transformed_data._staged = True''',
f'''{self.i2}self.model.{model_reference} = transformed_data # type: ignore''',
'',
'',
]

return '\n'.join(stage_method)

def _gen_code_object_stage_type_method(self, type_: str, model_type: str | None = None) -> str:
"""Return the method code.
Expand Down Expand Up @@ -978,10 +1039,12 @@ def filter ...
_code += self._gen_code_object_stage_type_method('victim_assets')

# generate stage_associated_group method
if 'associatedCases' in add_properties:
# ESUP-2532 - Associations are not Bi-Directional for IRs
if 'associatedCases' in add_properties and self.type_ != 'intel_requirements':
_code += self._gen_code_object_stage_type_method('cases', 'associated_cases')

if 'associatedArtifacts' in add_properties:
# ESUP-2532 - Associations are not Bi-Directional for IRs
if 'associatedArtifacts' in add_properties and self.type_ != 'intel_requirements':
_code += self._gen_code_object_stage_type_method('artifacts', 'associated_artifacts')

# victims have associatedGroups but groups must be
Expand Down Expand Up @@ -1015,6 +1078,10 @@ def filter ...
if 'fileOccurrences' in add_properties:
_code += self._gen_code_object_stage_type_method('file_occurrences')

# generate stage_keyword_section method
if 'keywordSections' in add_properties and self.type_ == 'intel_requirements':
_code += self._gen_code_object_replace_type_method('keyword_sections')

# generate stage_note method
if 'notes' in add_properties:
_code += self._gen_code_object_stage_type_method('notes')
Expand Down
2 changes: 1 addition & 1 deletion tcex/api/tc/v3/_gen/model/_property_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def __process_special_types(cls, pm: 'PropertyModel', extra: dict[str, str]):
}
)
elif pm.type == 'KeywordSection':
bi += 'intel_requirements.keyword_section_model'
bi += 'intel_requirements.keyword_sections.keyword_section_model'
extra.update(
{
'import_data': f'{bi} import KeywordSectionModel',
Expand Down
51 changes: 25 additions & 26 deletions tcex/api/tc/v3/intel_requirements/intel_requirement.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@

# first-party
from tcex.api.tc.v3.api_endpoints import ApiEndpoints
from tcex.api.tc.v3.artifacts.artifact_model import ArtifactModel
from tcex.api.tc.v3.cases.case_model import CaseModel
from tcex.api.tc.v3.groups.group_model import GroupModel
from tcex.api.tc.v3.indicators.indicator_model import IndicatorModel
from tcex.api.tc.v3.intel_requirements.intel_requirement_filter import IntelRequirementFilter
from tcex.api.tc.v3.intel_requirements.intel_requirement_model import (
IntelRequirementModel,
IntelRequirementsModel,
)
from tcex.api.tc.v3.intel_requirements.keyword_sections.keyword_section_model import (
KeywordSectionModel,
)
from tcex.api.tc.v3.object_abc import ObjectABC
from tcex.api.tc.v3.object_collection_abc import ObjectCollectionABC
from tcex.api.tc.v3.tags.tag_model import TagModel
Expand Down Expand Up @@ -140,30 +141,6 @@ def tags(self) -> Generator['Tag', None, None]:

yield from self._iterate_over_sublist(Tags) # type: ignore

def stage_associated_case(self, data: dict | ObjectABC | CaseModel):
"""Stage case on the object."""
if isinstance(data, ObjectABC):
data = data.model # type: ignore
elif isinstance(data, dict):
data = CaseModel(**data)

if not isinstance(data, CaseModel):
raise RuntimeError('Invalid type passed in to stage_associated_case')
data._staged = True
self.model.associated_cases.data.append(data) # type: ignore

def stage_associated_artifact(self, data: dict | ObjectABC | ArtifactModel):
"""Stage artifact on the object."""
if isinstance(data, ObjectABC):
data = data.model # type: ignore
elif isinstance(data, dict):
data = ArtifactModel(**data)

if not isinstance(data, ArtifactModel):
raise RuntimeError('Invalid type passed in to stage_associated_artifact')
data._staged = True
self.model.associated_artifacts.data.append(data) # type: ignore

def stage_associated_group(self, data: dict | ObjectABC | GroupModel):
"""Stage group on the object."""
if isinstance(data, ObjectABC):
Expand Down Expand Up @@ -200,6 +177,28 @@ def stage_associated_indicator(self, data: dict | ObjectABC | IndicatorModel):
data._staged = True
self.model.associated_indicators.data.append(data) # type: ignore

def replace_keyword_section(self, data: dict | list | ObjectABC | KeywordSectionModel):
"""Replace keyword_section on the object."""
if not isinstance(data, list):
data = [data]

if (
isinstance(data, list) and
all(isinstance(item, (ObjectABC, KeywordSectionModel)) for item in data)
):
transformed_data = data
elif isinstance(data, list) and all(isinstance(item, dict) for item in data):
transformed_data = [KeywordSectionModel(**d) for d in data]
elif isinstance(data, dict):
transformed_data = KeywordSectionModel(**data)
else:
raise ValueError("Invalid data to replace_keyword_section")

if isinstance(transformed_data, list):
for item in transformed_data:
item._staged = True
self.model.keyword_sections = transformed_data # type: ignore

def stage_tag(self, data: dict | ObjectABC | TagModel):
"""Stage tag on the object."""
if isinstance(data, ObjectABC):
Expand Down
4 changes: 3 additions & 1 deletion tcex/api/tc/v3/intel_requirements/intel_requirement_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ class IntelRequirementsModel(
from tcex.api.tc.v3.groups.group_model import GroupsModel
from tcex.api.tc.v3.indicators.indicator_model import IndicatorsModel
from tcex.api.tc.v3.intel_requirements.intel_req_type_model import IntelReqTypeModel
from tcex.api.tc.v3.intel_requirements.keyword_section_model import KeywordSectionModel
from tcex.api.tc.v3.intel_requirements.keyword_sections.keyword_section_model import (
KeywordSectionModel,
)
from tcex.api.tc.v3.security.users.user_model import UserModel
from tcex.api.tc.v3.tags.tag_model import TagsModel
from tcex.api.tc.v3.victim_assets.victim_asset_model import VictimAssetsModel
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,13 @@
# pylint: disable=no-member,no-self-argument,wrong-import-position

# third-party
from pydantic import Field
from pydantic import Field, PrivateAttr

# first-party
from tcex.api.tc.v3.v3_model_abc import V3ModelABC
from tcex.util import Util


class KeywordModel(
V3ModelABC,
title='Keyword Model',
alias_generator=Util().snake_to_camel,
validate_assignment=True,
):
"""Model Definition"""

value: str | None = Field(
None,
description='The value of the keyword.',
methods=['POST', 'PUT'],
read_only=False,
title='value',
)


class KeywordSectionModel(
V3ModelABC,
title='Keyword Section Model',
Expand All @@ -44,22 +27,27 @@ class KeywordSectionModel(
}
"""

_associated_type = PrivateAttr(False)
_cm_type = PrivateAttr(False)
_shared_type = PrivateAttr(False)
_staged = PrivateAttr(False)

section_number: int | None = Field(
None,
description='The section number of the keyword section.',
methods=['POST', 'PUT'],
read_only=False,
title='sectionNumber',
)
compareValue: str | None = Field(
compare_value: str | None = Field(
None,
description='The compare value for the keyword section.',
methods=['POST', 'PUT'],
read_only=False,
title='compareValue',
)
keywords: list[KeywordModel] | None = Field(
None,
keywords: list[dict] | None = Field(
[],
description='A list of keywords for the keyword section.',
methods=['POST', 'PUT'],
read_only=False,
Expand Down
10 changes: 8 additions & 2 deletions tcex/api/tc/v3/v3_model_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,14 @@ def __init__(self, **kwargs):

# when "id" field is present it indicates that the data was returned from the
# API, otherwise the assumption is that the developer staged the data during
# instantiation of the object.
if kwargs and hasattr(self, 'id') and self.id is None: # pylint: disable=no-member
# instantiation of the object. Keyword Section Model is the only exception to
# this rule.
# pylint: disable=no-member
if (
kwargs and hasattr(self, 'id') and
self.id is None and
self.__config__.title != 'Keyword Section Model'
):
self._staged = True

# store initial dict hash of model
Expand Down
Loading

0 comments on commit 48a4b86

Please sign in to comment.