diff --git a/misp_stix_converter/stix2misp/stix2_to_misp.py b/misp_stix_converter/stix2misp/stix2_to_misp.py index 8714c92c..63d76419 100644 --- a/misp_stix_converter/stix2misp/stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/stix2_to_misp.py @@ -487,32 +487,15 @@ def _load_observed_data(self, observed_data: _OBSERVED_DATA_TYPING): self._observed_data = {observed_data.id: observed_data} def _load_opinion(self, opinion: Opinion): - if opinion.opinion != 'neutral': - misp_sighting = MISPSighting() - sighting_args = { - 'date_sighting': self._timestamp_from_date(opinion.modified), - 'type': '1' if 'disagree' in opinion.opinion else '0' - } - if hasattr(opinion, 'x_misp_source'): - sighting_args['source'] = opinion.x_misp_source - if hasattr(opinion, 'x_misp_author_ref'): - identity = self._identity[opinion.x_misp_author_ref] - sighting_args['Organisation'] = { - 'uuid': self._sanitise_uuid(identity.id), - 'name': identity.name - } - misp_sighting.from_dict(**sighting_args) - opinion_ref = self._sanitise_uuid(opinion.id) - try: - self._sighting['opinion'][opinion_ref] = misp_sighting - except AttributeError: - self._sighting = defaultdict(lambda: defaultdict(list)) - self._sighting['opinion'][opinion_ref] = misp_sighting - for object_ref in opinion.object_refs: - sanitised_ref = self._sanitise_uuid(object_ref) - self._sighting['opinion_refs'][sanitised_ref].append( - opinion_ref - ) + opinion_ref = self._sanitise_uuid(opinion.id) + try: + self._sighting['opinion'][opinion_ref] = opinion + except AttributeError: + self._sighting = defaultdict(lambda: defaultdict(list)) + self._sighting['opinion'][opinion_ref] = opinion + for object_ref in opinion.object_refs: + sanitised_ref = self._sanitise_uuid(object_ref) + self._sighting['opinion_refs'][sanitised_ref].append(opinion_ref) def _load_relationship(self, relationship: _RELATIONSHIP_TYPING): reference = (relationship.target_ref, relationship.relationship_type) @@ -531,26 +514,12 @@ def _load_report(self, report: _REPORT_TYPING): self._report = {report.id: report} def _load_sighting(self, sighting: _SIGHTING_TYPING): - misp_sighting = MISPSighting() - sighting_args = { - 'date_sighting': self._timestamp_from_date(sighting.modified), - 'type': '0' - } - if hasattr(sighting, 'description'): - sighting_args['source'] = sighting.description - if hasattr(sighting, 'where_sighted_refs'): - identity = self._identity[sighting.where_sighted_refs[0]] - sighting_args['Organisation'] = { - 'uuid': self._sanitise_uuid(identity.id), - 'name': identity.name - } - misp_sighting.from_dict(**sighting_args) sighting_of_ref = self._sanitise_uuid(sighting.sighting_of_ref) try: - self._sighting['sighting'][sighting_of_ref].append(misp_sighting) + self._sighting['sighting'][sighting_of_ref].append(sighting) except AttributeError: self._sighting = defaultdict(lambda: defaultdict(list)) - self._sighting['sighting'][sighting_of_ref].append(misp_sighting) + self._sighting['sighting'][sighting_of_ref].append(sighting) def _load_threat_actor(self, threat_actor: _THREAT_ACTOR_TYPING): self._check_uuid(threat_actor.id) @@ -858,10 +827,12 @@ def _handle_attribute_sightings(self, attribute: MISPAttribute): attribute_uuid = self.replacement_uuids[attribute_uuid] if attribute_uuid in self._sighting.get('sighting', {}): for sighting in self._sighting['sighting'][attribute_uuid]: - attribute.add_sighting(sighting) + attribute.add_sighting(self._parse_sighting(sighting)) if attribute_uuid in self._sighting.get('opinion_refs', {}): for opinion_ref in self._sighting['opinion_refs'][attribute_uuid]: - attribute.add_sighting(self._sighting['opinion'][opinion_ref]) + attribute.add_sighting( + self._parse_opinion(self._sighting['opinion'][opinion_ref]) + ) elif attribute_uuid in self._sighting.get('custom_opinion', {}): for sighting in self._sighting['custom_opinion'][attribute_uuid]: attribute.add_sighting(sighting) @@ -872,13 +843,16 @@ def _handle_object_sightings(self, misp_object: MISPObject): object_uuid = self.replacement_uuids[object_uuid] if object_uuid in self._sighting.get('sighting', {}): for sighting in self._sighting['sighting'][object_uuid]: + misp_sighting = self._parse_sighting(sighting) for attribute in misp_object.attributes: - attribute.add_sighting(sighting) + attribute.add_sighting(misp_sighting) if object_uuid in self._sighting.get('opinion_refs', {}): for opinion_ref in self._sighting['opinion_refs'][object_uuid]: - sighting = self._sighting['opinion'][opinion_ref] + misp_sighting = self._parse_opinion( + self._sighting['opinion'][opinion_ref] + ) for attribute in misp_object.attributes: - attribute.add_sighting(sighting) + attribute.add_sighting(misp_sighting) elif misp_object.uuid in self._sighting.get('custom_opinion', {}): for sighting in self._sighting['custom_opinion'][object_uuid]: for attribute in misp_object.attributes: @@ -969,6 +943,23 @@ def _parse_object_relationships_as_tag_names(self, misp_object: MISPObject): self._sanitise_uuid(referenced_uuid), relationship_type ) + def _parse_opinion(self, opinion: Opinion) -> MISPSighting: + misp_sighting = MISPSighting() + sighting_args = { + 'date_sighting': self._timestamp_from_date(opinion.modified), + 'type': '1' if 'disagree' in opinion.opinion else '0' + } + if hasattr(opinion, 'x_misp_source'): + sighting_args['source'] = opinion.x_misp_source + if hasattr(opinion, 'x_misp_author_ref'): + identity = self._identity[opinion.x_misp_author_ref] + sighting_args['Organisation'] = { + 'uuid': self._sanitise_uuid(identity.id), + 'name': identity.name + } + misp_sighting.from_dict(**sighting_args) + return misp_sighting + def _parse_relationships(self): for attribute in self.misp_event.attributes: if attribute.uuid in self._relationship: @@ -1011,6 +1002,23 @@ def _parse_relationships_and_sightings(self): if not self.galaxies_as_tags: self._parse_galaxy_relationships() + def _parse_sighting(self, sighting: _SIGHTING_TYPING) -> MISPSighting: + misp_sighting = MISPSighting() + sighting_args = { + 'date_sighting': self._timestamp_from_date(sighting.modified), + 'type': '0' + } + if hasattr(sighting, 'description'): + sighting_args['source'] = sighting.description + if hasattr(sighting, 'where_sighted_refs'): + identity = self._identity[sighting.where_sighted_refs[0]] + sighting_args['Organisation'] = { + 'uuid': self._sanitise_uuid(identity.id), + 'name': identity.name + } + misp_sighting.from_dict(**sighting_args) + return misp_sighting + def _parse_sightings(self): for attribute in self.misp_event.attributes: self._handle_attribute_sightings(attribute)