Skip to content

Commit

Permalink
fix: [stix2 import] Fixed relationships handling between sighting & o…
Browse files Browse the repository at this point in the history
…pinion objects, and their references
  • Loading branch information
chrisr3d committed Feb 22, 2024
1 parent a906f1e commit ed2e3bc
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def __init__(self, main: _MAIN_PARSER_TYPING):
self._set_main_parser(main)



class InternalSTIX2ObservedDataConverter(
STIX2ObservedDataConverter, InternalSTIX2ObservableConverter):
def __init__(self, main: 'InternalSTIX2toMISPParser'):
Expand Down
57 changes: 32 additions & 25 deletions misp_stix_converter/stix2misp/stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,30 +487,32 @@ def _load_observed_data(self, observed_data: _OBSERVED_DATA_TYPING):
self._observed_data = {observed_data.id: observed_data}

def _load_opinion(self, opinion: Opinion):
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(opinion.modified),
'type': '1',
**self._sanitise_attribute_uuid(opinion.id)
}
if hasattr(opinion, 'x_misp_source'):
sighting_args['source'] = opinion.x_misp_source
if hasattr(opinion, 'x_misp_author_ref'):
identity = self._identity[opinion.x_misp_author_ref]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
if opinion.opinion != 'neutral':
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(opinion.modified),
'type': '1' if 'disagree' in opinion.opinion else '0'
}
misp_sighting.from_dict(**sighting_args)
opinion_ref = self._sanitise_uuid(opinion.id)
try:
self._sighting['opinion'][opinion_ref] = misp_sighting
except AttributeError:
self._sighting = defaultdict(lambda: defaultdict(list))
self._sighting['opinion'][opinion_ref] = misp_sighting
for object_ref in opinion.object_refs:
sanitised_ref = self._sanitise_uuid(object_ref)
self._sighting['opinion_refs'][sanitised_ref].append(opinion_ref)
if hasattr(opinion, 'x_misp_source'):
sighting_args['source'] = opinion.x_misp_source
if hasattr(opinion, 'x_misp_author_ref'):
identity = self._identity[opinion.x_misp_author_ref]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
misp_sighting.from_dict(**sighting_args)
opinion_ref = self._sanitise_uuid(opinion.id)
try:
self._sighting['opinion'][opinion_ref] = misp_sighting
except AttributeError:
self._sighting = defaultdict(lambda: defaultdict(list))
self._sighting['opinion'][opinion_ref] = misp_sighting
for object_ref in opinion.object_refs:
sanitised_ref = self._sanitise_uuid(object_ref)
self._sighting['opinion_refs'][sanitised_ref].append(
opinion_ref
)

def _load_relationship(self, relationship: _RELATIONSHIP_TYPING):
reference = (relationship.target_ref, relationship.relationship_type)
Expand All @@ -532,8 +534,7 @@ def _load_sighting(self, sighting: _SIGHTING_TYPING):
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(sighting.modified),
'type': '0',
**self._sanitise_attribute_uuid(sighting.id)
'type': '0'
}
if hasattr(sighting, 'description'):
sighting_args['source'] = sighting.description
Expand Down Expand Up @@ -845,6 +846,12 @@ def _handle_meta_fields(self, stix_object: _GALAXY_OBJECTS_TYPING) -> dict:
# RELATIONSHIPS & SIGHTINGS PARSING METHODS. #
############################################################################

def _check_sighting_replacements(
self, parent_uuid: str, replaced_uuid: str):
for field in ('opinion_refs', 'sighting'):
if parent_uuid in getattr(self, '_sighting', {}).get(field, {}):
self.replacement_uuids[replaced_uuid] = parent_uuid

def _handle_attribute_sightings(self, attribute: MISPAttribute):
attribute_uuid = attribute.uuid
if attribute_uuid in self.replacement_uuids:
Expand Down

0 comments on commit ed2e3bc

Please sign in to comment.