Skip to content

Commit

Permalink
fix: [stix2 import] In the end we have to parse the Sighting & Opinio…
Browse files Browse the repository at this point in the history
…n objects and convert them as MISP Sighting when they are used

- Parsing them when the loading methods are called
  can raise issues with some referenced identity
  objects are not loaded already
  • Loading branch information
chrisr3d committed Feb 22, 2024
1 parent ed2e3bc commit 0570b9b
Showing 1 changed file with 55 additions and 47 deletions.
102 changes: 55 additions & 47 deletions misp_stix_converter/stix2misp/stix2_to_misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,32 +487,15 @@ def _load_observed_data(self, observed_data: _OBSERVED_DATA_TYPING):
self._observed_data = {observed_data.id: observed_data}

def _load_opinion(self, opinion: Opinion):
if opinion.opinion != 'neutral':
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(opinion.modified),
'type': '1' if 'disagree' in opinion.opinion else '0'
}
if hasattr(opinion, 'x_misp_source'):
sighting_args['source'] = opinion.x_misp_source
if hasattr(opinion, 'x_misp_author_ref'):
identity = self._identity[opinion.x_misp_author_ref]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
misp_sighting.from_dict(**sighting_args)
opinion_ref = self._sanitise_uuid(opinion.id)
try:
self._sighting['opinion'][opinion_ref] = misp_sighting
except AttributeError:
self._sighting = defaultdict(lambda: defaultdict(list))
self._sighting['opinion'][opinion_ref] = misp_sighting
for object_ref in opinion.object_refs:
sanitised_ref = self._sanitise_uuid(object_ref)
self._sighting['opinion_refs'][sanitised_ref].append(
opinion_ref
)
opinion_ref = self._sanitise_uuid(opinion.id)
try:
self._sighting['opinion'][opinion_ref] = opinion
except AttributeError:
self._sighting = defaultdict(lambda: defaultdict(list))
self._sighting['opinion'][opinion_ref] = opinion
for object_ref in opinion.object_refs:
sanitised_ref = self._sanitise_uuid(object_ref)
self._sighting['opinion_refs'][sanitised_ref].append(opinion_ref)

def _load_relationship(self, relationship: _RELATIONSHIP_TYPING):
reference = (relationship.target_ref, relationship.relationship_type)
Expand All @@ -531,26 +514,12 @@ def _load_report(self, report: _REPORT_TYPING):
self._report = {report.id: report}

def _load_sighting(self, sighting: _SIGHTING_TYPING):
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(sighting.modified),
'type': '0'
}
if hasattr(sighting, 'description'):
sighting_args['source'] = sighting.description
if hasattr(sighting, 'where_sighted_refs'):
identity = self._identity[sighting.where_sighted_refs[0]]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
misp_sighting.from_dict(**sighting_args)
sighting_of_ref = self._sanitise_uuid(sighting.sighting_of_ref)
try:
self._sighting['sighting'][sighting_of_ref].append(misp_sighting)
self._sighting['sighting'][sighting_of_ref].append(sighting)
except AttributeError:
self._sighting = defaultdict(lambda: defaultdict(list))
self._sighting['sighting'][sighting_of_ref].append(misp_sighting)
self._sighting['sighting'][sighting_of_ref].append(sighting)

def _load_threat_actor(self, threat_actor: _THREAT_ACTOR_TYPING):
self._check_uuid(threat_actor.id)
Expand Down Expand Up @@ -858,10 +827,12 @@ def _handle_attribute_sightings(self, attribute: MISPAttribute):
attribute_uuid = self.replacement_uuids[attribute_uuid]
if attribute_uuid in self._sighting.get('sighting', {}):
for sighting in self._sighting['sighting'][attribute_uuid]:
attribute.add_sighting(sighting)
attribute.add_sighting(self._parse_sighting(sighting))
if attribute_uuid in self._sighting.get('opinion_refs', {}):
for opinion_ref in self._sighting['opinion_refs'][attribute_uuid]:
attribute.add_sighting(self._sighting['opinion'][opinion_ref])
attribute.add_sighting(
self._parse_opinion(self._sighting['opinion'][opinion_ref])
)
elif attribute_uuid in self._sighting.get('custom_opinion', {}):
for sighting in self._sighting['custom_opinion'][attribute_uuid]:
attribute.add_sighting(sighting)
Expand All @@ -872,13 +843,16 @@ def _handle_object_sightings(self, misp_object: MISPObject):
object_uuid = self.replacement_uuids[object_uuid]
if object_uuid in self._sighting.get('sighting', {}):
for sighting in self._sighting['sighting'][object_uuid]:
misp_sighting = self._parse_sighting(sighting)
for attribute in misp_object.attributes:
attribute.add_sighting(sighting)
attribute.add_sighting(misp_sighting)
if object_uuid in self._sighting.get('opinion_refs', {}):
for opinion_ref in self._sighting['opinion_refs'][object_uuid]:
sighting = self._sighting['opinion'][opinion_ref]
misp_sighting = self._parse_opinion(
self._sighting['opinion'][opinion_ref]
)
for attribute in misp_object.attributes:
attribute.add_sighting(sighting)
attribute.add_sighting(misp_sighting)
elif misp_object.uuid in self._sighting.get('custom_opinion', {}):
for sighting in self._sighting['custom_opinion'][object_uuid]:
for attribute in misp_object.attributes:
Expand Down Expand Up @@ -969,6 +943,23 @@ def _parse_object_relationships_as_tag_names(self, misp_object: MISPObject):
self._sanitise_uuid(referenced_uuid), relationship_type
)

def _parse_opinion(self, opinion: Opinion) -> MISPSighting:
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(opinion.modified),
'type': '1' if 'disagree' in opinion.opinion else '0'
}
if hasattr(opinion, 'x_misp_source'):
sighting_args['source'] = opinion.x_misp_source
if hasattr(opinion, 'x_misp_author_ref'):
identity = self._identity[opinion.x_misp_author_ref]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
misp_sighting.from_dict(**sighting_args)
return misp_sighting

def _parse_relationships(self):
for attribute in self.misp_event.attributes:
if attribute.uuid in self._relationship:
Expand Down Expand Up @@ -1011,6 +1002,23 @@ def _parse_relationships_and_sightings(self):
if not self.galaxies_as_tags:
self._parse_galaxy_relationships()

def _parse_sighting(self, sighting: _SIGHTING_TYPING) -> MISPSighting:
misp_sighting = MISPSighting()
sighting_args = {
'date_sighting': self._timestamp_from_date(sighting.modified),
'type': '0'
}
if hasattr(sighting, 'description'):
sighting_args['source'] = sighting.description
if hasattr(sighting, 'where_sighted_refs'):
identity = self._identity[sighting.where_sighted_refs[0]]
sighting_args['Organisation'] = {
'uuid': self._sanitise_uuid(identity.id),
'name': identity.name
}
misp_sighting.from_dict(**sighting_args)
return misp_sighting

def _parse_sightings(self):
for attribute in self.misp_event.attributes:
self._handle_attribute_sightings(attribute)
Expand Down

0 comments on commit 0570b9b

Please sign in to comment.