diff --git a/misp_stix_converter/stix2misp/converters/stix2_observed_data_converter.py b/misp_stix_converter/stix2misp/converters/stix2_observed_data_converter.py index 43d8a997..e9abdfd2 100644 --- a/misp_stix_converter/stix2misp/converters/stix2_observed_data_converter.py +++ b/misp_stix_converter/stix2misp/converters/stix2_observed_data_converter.py @@ -31,7 +31,6 @@ def __init__(self, main: _MAIN_PARSER_TYPING): self._set_main_parser(main) - class InternalSTIX2ObservedDataConverter( STIX2ObservedDataConverter, InternalSTIX2ObservableConverter): def __init__(self, main: 'InternalSTIX2toMISPParser'): diff --git a/misp_stix_converter/stix2misp/stix2_to_misp.py b/misp_stix_converter/stix2misp/stix2_to_misp.py index c54930cf..8714c92c 100644 --- a/misp_stix_converter/stix2misp/stix2_to_misp.py +++ b/misp_stix_converter/stix2misp/stix2_to_misp.py @@ -487,30 +487,32 @@ def _load_observed_data(self, observed_data: _OBSERVED_DATA_TYPING): self._observed_data = {observed_data.id: observed_data} def _load_opinion(self, opinion: Opinion): - misp_sighting = MISPSighting() - sighting_args = { - 'date_sighting': self._timestamp_from_date(opinion.modified), - 'type': '1', - **self._sanitise_attribute_uuid(opinion.id) - } - if hasattr(opinion, 'x_misp_source'): - sighting_args['source'] = opinion.x_misp_source - if hasattr(opinion, 'x_misp_author_ref'): - identity = self._identity[opinion.x_misp_author_ref] - sighting_args['Organisation'] = { - 'uuid': self._sanitise_uuid(identity.id), - 'name': identity.name + if opinion.opinion != 'neutral': + misp_sighting = MISPSighting() + sighting_args = { + 'date_sighting': self._timestamp_from_date(opinion.modified), + 'type': '1' if 'disagree' in opinion.opinion else '0' } - misp_sighting.from_dict(**sighting_args) - opinion_ref = self._sanitise_uuid(opinion.id) - try: - self._sighting['opinion'][opinion_ref] = misp_sighting - except AttributeError: - self._sighting = defaultdict(lambda: defaultdict(list)) - self._sighting['opinion'][opinion_ref] = misp_sighting - for object_ref in opinion.object_refs: - sanitised_ref = self._sanitise_uuid(object_ref) - self._sighting['opinion_refs'][sanitised_ref].append(opinion_ref) + if hasattr(opinion, 'x_misp_source'): + sighting_args['source'] = opinion.x_misp_source + if hasattr(opinion, 'x_misp_author_ref'): + identity = self._identity[opinion.x_misp_author_ref] + sighting_args['Organisation'] = { + 'uuid': self._sanitise_uuid(identity.id), + 'name': identity.name + } + misp_sighting.from_dict(**sighting_args) + opinion_ref = self._sanitise_uuid(opinion.id) + try: + self._sighting['opinion'][opinion_ref] = misp_sighting + except AttributeError: + self._sighting = defaultdict(lambda: defaultdict(list)) + self._sighting['opinion'][opinion_ref] = misp_sighting + for object_ref in opinion.object_refs: + sanitised_ref = self._sanitise_uuid(object_ref) + self._sighting['opinion_refs'][sanitised_ref].append( + opinion_ref + ) def _load_relationship(self, relationship: _RELATIONSHIP_TYPING): reference = (relationship.target_ref, relationship.relationship_type) @@ -532,8 +534,7 @@ def _load_sighting(self, sighting: _SIGHTING_TYPING): misp_sighting = MISPSighting() sighting_args = { 'date_sighting': self._timestamp_from_date(sighting.modified), - 'type': '0', - **self._sanitise_attribute_uuid(sighting.id) + 'type': '0' } if hasattr(sighting, 'description'): sighting_args['source'] = sighting.description @@ -845,6 +846,12 @@ def _handle_meta_fields(self, stix_object: _GALAXY_OBJECTS_TYPING) -> dict: # RELATIONSHIPS & SIGHTINGS PARSING METHODS. # ############################################################################ + def _check_sighting_replacements( + self, parent_uuid: str, replaced_uuid: str): + for field in ('opinion_refs', 'sighting'): + if parent_uuid in getattr(self, '_sighting', {}).get(field, {}): + self.replacement_uuids[replaced_uuid] = parent_uuid + def _handle_attribute_sightings(self, attribute: MISPAttribute): attribute_uuid = attribute.uuid if attribute_uuid in self.replacement_uuids: