From 8811fa9fe1e27aac91a3ace1d772bb4903d20e66 Mon Sep 17 00:00:00 2001 From: Michael Chisholm Date: Fri, 13 Sep 2024 12:35:12 -0400 Subject: [PATCH 1/2] Expand relational data store query capability; expand relational data store unit test suite --- .../datastore/relational_db/input_creation.py | 2 +- stix2/datastore/relational_db/query.py | 587 ++++++++++++++++++ .../datastore/relational_db/relational_db.py | 46 +- stix2/properties.py | 4 +- .../test/v21/test_datastore_relational_db.py | 390 +++++++++++- 5 files changed, 964 insertions(+), 65 deletions(-) create mode 100644 stix2/datastore/relational_db/query.py diff --git a/stix2/datastore/relational_db/input_creation.py b/stix2/datastore/relational_db/input_creation.py index d66fd4f3..006b4fed 100644 --- a/stix2/datastore/relational_db/input_creation.py +++ b/stix2/datastore/relational_db/input_creation.py @@ -129,7 +129,7 @@ def generate_insert_information(self, name, stix_object, **kwargs): # noqa: F81 @add_method(HexProperty) def generate_insert_information(self, name, stix_object, **kwargs): # noqa: F811 - v = bytes(stix_object[name], 'utf-8') + v = bytes.fromhex(stix_object[name]) return {name: v} diff --git a/stix2/datastore/relational_db/query.py b/stix2/datastore/relational_db/query.py new file mode 100644 index 00000000..50bac5e0 --- /dev/null +++ b/stix2/datastore/relational_db/query.py @@ -0,0 +1,587 @@ +import inspect + +import sqlalchemy as sa + +import stix2 +from stix2.datastore import DataSourceError +from stix2.datastore.relational_db.utils import ( + canonicalize_table_name, schema_for, table_name_for, +) +import stix2.properties +import stix2.utils + + +def _check_support(stix_id): + """ + Misc support checks for the relational data source. May be better to error + out up front and say a type is not supported, than die with some cryptic + SQLAlchemy or other error later. This runs for side-effects (raises + an exception) and doesn't return anything. + + :param stix_id: A STIX ID. 
The basis for reading an object, used to + determine support + """ + # language-content has a complicated structure in its "contents" + # property, which is not currently supported for storage in a + # relational database. + stix_type = stix2.utils.get_type_from_id(stix_id) + if stix_type in ("language-content",): + raise DataSourceError(f"Reading {stix_type} objects is not supported.") + + +def _tables_for(stix_class, metadata): + """ + Get the core and type-specific tables for the given class + + :param stix_class: A class for a STIX object type + :param metadata: SQLAlchemy Metadata object containing all the table + information + :return: A (core_table, type_table) 2-tuple as SQLAlchemy Table objects + """ + # Info about the type-specific table + type_table_name = table_name_for(stix_class) + type_schema_name = schema_for(stix_class) + type_table = metadata.tables[f"{type_schema_name}.{type_table_name}"] + + # Some fixed info about core tables + if type_schema_name == "sco": + core_table_name = "common.core_sco" + else: + # for SROs and SMOs too? + core_table_name = "common.core_sdo" + + core_table = metadata.tables[core_table_name] + + return core_table, type_table + + +def _stix2_class_for(stix_id): + """ + Find the class for the STIX type indicated by the given STIX ID. + + :param stix_id: A STIX ID + """ + stix_type = stix2.utils.get_type_from_id(stix_id) + stix_class = stix2.registry.class_for_type( + # TODO: give user control over STIX version used? + stix_type, stix_version=stix2.DEFAULT_VERSION, + ) + + return stix_class + + +def _read_simple_properties(stix_id, core_table, type_table, conn): + """ + Read "simple" property values, i.e. those which don't need tables other + than the core/type-specific tables: they're stored directly in columns of + those tables. These two tables are joined and must have a defined foreign + key constraint between them. 
+ + :param stix_id: A STIX ID + :param core_table: A core table + :param type_table: A type-specific table + :param conn: An SQLAlchemy DB connection + :return: A mapping containing the properties and values read + """ + # Both core and type-specific tables have "id"; let's not duplicate that + # in the result set columns. Is there a better way to do this? + type_cols_except_id = ( + col for col in type_table.c if col.key != "id" + ) + + core_type_select = sa.select(core_table, *type_cols_except_id) \ + .join(type_table) \ + .where(core_table.c.id == stix_id) + + # Should be at most one matching row + obj_dict = conn.execute(core_type_select).mappings().first() + + return obj_dict + + +def _read_hashes(fk_id, hashes_table, conn): + """ + Read hashes from a table. + + :param fk_id: A foreign key value used to filter table rows + :param hashes_table: An SQLAlchemy Table object + :param conn: An SQLAlchemy DB connection + :return: The hashes as a dict, or None if no hashes were found + """ + stmt = sa.select(hashes_table.c.hash_name, hashes_table.c.hash_value).where( + hashes_table.c.id == fk_id + ) + + results = conn.execute(stmt) + hashes = dict(results.all()) or None + return hashes + + +def _read_external_references(stix_id, metadata, conn): + """ + Read external references from some fixed tables in the common schema. 
+ + :param stix_id: A STIX ID used to filter table rows + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: The external references, as a list of dicts + """ + ext_refs_table = metadata.tables["common.external_references"] + ext_refs_hashes_table = metadata.tables["common.external_references_hashes"] + ext_refs = [] + + ext_refs_columns = (col for col in ext_refs_table.c if col.key != "id") + stmt = sa.select(*ext_refs_columns).where(ext_refs_table.c.id == stix_id) + ext_refs_results = conn.execute(stmt) + for ext_ref_mapping in ext_refs_results.mappings(): + # make a dict; we will need to modify this mapping + ext_ref_dict = dict(ext_ref_mapping) + hash_ref_id = ext_ref_dict.pop("hash_ref_id") + + hashes_dict = _read_hashes(hash_ref_id, ext_refs_hashes_table, conn) + if hashes_dict: + ext_ref_dict["hashes"] = hashes_dict + + ext_refs.append(ext_ref_dict) + + return ext_refs + + +def _read_object_marking_refs(stix_id, stix_type_class, metadata, conn): + """ + Read object marking refs from one of a couple special tables in the common + schema. + + :param stix_id: A STIX ID, used to filter table rows + :param stix_type_class: STIXTypeClass enum value, used to determine whether + to read the table for SDOs or SCOs + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: The references as a list of strings + """ + + marking_table_name = "object_marking_refs_" + if stix_type_class is stix2.utils.STIXTypeClass.SCO: + marking_table_name += "sco" + else: + marking_table_name += "sdo" + + # The SCO/SDO object_marking_refs tables are mostly identical; they just + # have different foreign key constraints (to different core tables). + marking_table = metadata.tables["common." 
+ marking_table_name] + + stmt = sa.select(marking_table.c.ref_id).where(marking_table.c.id == stix_id) + refs = conn.scalars(stmt).all() + + return refs + + +def _read_granular_markings(stix_id, stix_type_class, metadata, conn): + """ + Read granular markings from one of a couple special tables in the common + schema. + + :param stix_id: A STIX ID, used to filter table rows + :param stix_type_class: STIXTypeClass enum value, used to determine whether + to read the table for SDOs or SCOs + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: Granular markings as a list of dicts + """ + + marking_table_name = "granular_marking_" + if stix_type_class is stix2.utils.STIXTypeClass.SCO: + marking_table_name += "sco" + else: + marking_table_name += "sdo" + + marking_table = metadata.tables["common." + marking_table_name] + + stmt = sa.select( + marking_table.c.lang, + marking_table.c.marking_ref, + marking_table.c.selectors, + ).where(marking_table.c.id == stix_id) + + marking_dicts = conn.execute(stmt).mappings().all() + return marking_dicts + + +def _read_simple_array(fk_id, elt_column_name, array_table, conn): + """ + Read array elements from a given table. + + :param fk_id: A foreign key value used to find the correct array elements + :param elt_column_name: The name of the table column which contains the + array elements + :param array_table: A SQLAlchemy Table object containing the array data + :param conn: An SQLAlchemy DB connection + :return: The array, as a list + """ + stmt = sa.select(array_table.c[elt_column_name]).where(array_table.c.id == fk_id) + refs = conn.scalars(stmt).all() + return refs + + +def _read_kill_chain_phases(stix_id, type_table, metadata, conn): + """ + Read kill chain phases from a table. 
+ + :param stix_id: A STIX ID used to filter table rows + :param type_table: A "parent" table whose name is used to compute the + kill chain phases table name + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: Kill chain phases as a list of dicts + """ + + kill_chain_phases_table = metadata.tables[type_table.fullname + "_kill_chain_phase"] + stmt = sa.select( + kill_chain_phases_table.c.kill_chain_name, + kill_chain_phases_table.c.phase_name + ).where(kill_chain_phases_table.c.id == stix_id) + + kill_chain_phases = conn.execute(stmt).mappings().all() + return kill_chain_phases + + +def _read_dictionary_property(stix_id, type_table, prop_name, prop_instance, metadata, conn): + """ + Read a dictionary from a table. + + :param stix_id: A STIX ID, used to filter table rows + :param type_table: A "parent" table whose name is used to compute the name + of the dictionary table + :param prop_name: The dictionary property name + :param prop_instance: The dictionary property instance + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: The dictionary, or None if no dictionary entries were found + """ + dict_table_name = f"{type_table.fullname}_{prop_name}" + dict_table = metadata.tables[dict_table_name] + + if len(prop_instance.valid_types) == 1: + stmt = sa.select( + dict_table.c.name, dict_table.c.value + ).where( + dict_table.c.id == stix_id + ) + + results = conn.execute(stmt) + dict_value = dict(results.all()) + + else: + # In this case, we get one column per valid type + type_cols = (col for col in dict_table.c if col.key not in ("id", "name")) + stmt = sa.select(dict_table.c.name, *type_cols).where(dict_table.c.id == stix_id) + results = conn.execute(stmt) + + dict_value = {} + for row in results: + key, *type_values = row + # Exactly one of the type columns should be non-None; get that one + 
non_null_values = (v for v in type_values if v is not None) + first_non_null_value = next(non_null_values, None) + if first_non_null_value is None: + raise DataSourceError( + f'In dictionary table {dict_table.fullname}, key "{key}"' + " did not map to a non-null value" + ) + + dict_value[key] = first_non_null_value + + # DictionaryProperty doesn't like empty dicts. + dict_value = dict_value or None + + return dict_value + + +def _read_embedded_object(obj_id, parent_table, embedded_type, metadata, conn): + """ + Read an embedded object from the database. + + :param obj_id: An ID value used to identify a particular embedded object, + used to filter table rows + :param parent_table: A "parent" table whose name is used to compute the + name of the embedded object table + :param embedded_type: The Python class used to represent the embedded + type (a _STIXBase subclass) + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: An instance of embedded_type + """ + + embedded_table_name = canonicalize_table_name( + f"{parent_table.name}_{embedded_type.__name__}", + parent_table.schema + ) + embedded_table = metadata.tables[embedded_table_name] + + # The PK column in this case is a bookkeeping column and does not + # correspond to an actual embedded object property. So don't select + # that one. 
+ non_id_cols = (col for col in embedded_table.c if col.key != "id") + + stmt = sa.select(*non_id_cols).where(embedded_table.c.id == obj_id) + mapping_row = conn.execute(stmt).mappings().first() + + if mapping_row is None: + obj = None + + else: + obj_dict = dict(mapping_row) + + for prop_name, prop_instance in embedded_type._properties.items(): + if prop_name not in obj_dict: + prop_value = _read_complex_property_value( + obj_id, + prop_name, + prop_instance, + embedded_table, + metadata, + conn + ) + + if prop_value is not None: + obj_dict[prop_name] = prop_value + + obj = embedded_type(**obj_dict, allow_custom=True) + + return obj + + +def _read_embedded_object_list(fk_id, join_table, embedded_type, metadata, conn): + """ + Read a list of embedded objects from database tables. + + :param fk_id: A foreign key ID used to filter rows from the join table, + which acts to find relevant embedded objects + :param join_table: An SQLAlchemy Table object which is the required join + table + :param embedded_type: The Python class used to represent the list element + embedded type (a _STIXBase subclass) + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: A list of instances of embedded_type + """ + + embedded_table_name = canonicalize_table_name( + f"{join_table.name}_{embedded_type.__name__}", + join_table.schema + ) + embedded_table = metadata.tables[embedded_table_name] + + stmt = sa.select(embedded_table).join(join_table).where(join_table.c.id == fk_id) + results = conn.execute(stmt) + obj_list = [] + for result_mapping in results.mappings(): + obj_dict = dict(result_mapping) + obj_id = obj_dict.pop("id") + + for prop_name, prop_instance in embedded_type._properties.items(): + if prop_name not in obj_dict: + prop_value = _read_complex_property_value( + obj_id, + prop_name, + prop_instance, + embedded_table, + metadata, + conn + ) + + if prop_value is not None: + obj_dict[prop_name] = 
prop_value + + obj = embedded_type(**obj_dict, allow_custom=True) + obj_list.append(obj) + + return obj_list + + +def _read_complex_property_value(obj_id, prop_name, prop_instance, obj_table, metadata, conn): + """ + Read property values which require auxiliary tables to store. These are + idiosyncratic and just require a lot of special cases. This function has + no special support for top-level common properties, so it is more + general-purpose, suitable for any sort of object (top level or embedded). + + :param obj_id: An ID of the owning object. Would be a STIX ID for a + top-level object, but could also be something else for sub-objects. + Used as a foreign key value in queries, so we only get values for this + object. + :param prop_name: The name of the property to read + :param prop_instance: A Property (subclass) instance with property + config information + :param obj_table: The table for the owning object. Mainly used for its + name; auxiliary table names are based on this + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: The property value + """ + + prop_value = None + + if isinstance(prop_instance, stix2.properties.ListProperty): + + if isinstance(prop_instance.contained, stix2.properties.ReferenceProperty): + ref_table_name = f"{obj_table.fullname}_{prop_name}" + ref_table = metadata.tables[ref_table_name] + prop_value = _read_simple_array(obj_id, "ref_id", ref_table, conn) + + elif isinstance(prop_instance.contained, stix2.properties.EnumProperty): + enum_table_name = f"{obj_table.fullname}_{prop_name}" + enum_table = metadata.tables[enum_table_name] + prop_value = _read_simple_array(obj_id, prop_name, enum_table, conn) + + elif isinstance(prop_instance.contained, stix2.properties.EmbeddedObjectProperty): + join_table_name = f"{obj_table.fullname}_{prop_name}" + join_table = metadata.tables[join_table_name] + prop_value = _read_embedded_object_list( + obj_id, + 
join_table, + prop_instance.contained.type, + metadata, + conn + ) + + elif inspect.isclass(prop_instance.contained) and issubclass(prop_instance.contained, stix2.KillChainPhase): + prop_value = _read_kill_chain_phases(obj_id, obj_table, metadata, conn) + + else: + raise DataSourceError( + f'Not implemented: read "{prop_name}" property value' + f" of type list-of {prop_instance.contained}" + ) + + elif isinstance(prop_instance, stix2.properties.HashesProperty): + hashes_table_name = f"{obj_table.fullname}_{prop_name}" + hashes_table = metadata.tables[hashes_table_name] + prop_value = _read_hashes(obj_id, hashes_table, conn) + + elif isinstance(prop_instance, stix2.properties.ExtensionsProperty): + # TODO: add support for extensions + pass + + elif isinstance(prop_instance, stix2.properties.DictionaryProperty): + # ExtensionsProperty/HashesProperty subclasses DictionaryProperty, so + # this must come after those + prop_value = _read_dictionary_property(obj_id, obj_table, prop_name, prop_instance, metadata, conn) + + elif isinstance(prop_instance, stix2.properties.EmbeddedObjectProperty): + prop_value = _read_embedded_object( + obj_id, + obj_table, + prop_instance.type, + metadata, + conn + ) + + else: + raise DataSourceError( + f'Not implemented: read "{prop_name}" property value' + f" of type {prop_instance.__class__}" + ) + + return prop_value + + +def _read_complex_top_level_property_value(stix_id, stix_type_class, prop_name, prop_instance, type_table, metadata, conn): + """ + Read property values which require auxiliary tables to store. These + require a lot of special cases. This function has additional support for + reading top-level common properties, which use special fixed tables. + + :param stix_id: STIX ID of an object to read + :param stix_type_class: The kind of object (SCO, SDO, etc). Which DB + tables to read can depend on this. 
+ :param prop_name: The name of the property to read + :param prop_instance: A Property (subclass) instance with property + config information + :param type_table: The non-core base table used for this STIX type. Mainly + used for its name; auxiliary table names are based on this + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: The property value + """ + + # Common properties: these use a fixed set of tables for all STIX objects + if prop_name == "external_references": + prop_value = _read_external_references(stix_id, metadata, conn) + + elif prop_name == "object_marking_refs": + prop_value = _read_object_marking_refs(stix_id, stix_type_class, metadata, conn) + + elif prop_name == "granular_markings": + prop_value = _read_granular_markings(stix_id, stix_type_class, metadata, conn) + + else: + # Other properties use specific table patterns depending on property type + prop_value = _read_complex_property_value(stix_id, prop_name, prop_instance, type_table, metadata, conn) + + return prop_value + + +def read_object(stix_id, metadata, conn): + """ + Read a STIX object from the database, identified by a STIX ID. + + :param stix_id: A STIX ID + :param metadata: SQLAlchemy Metadata object containing all the table + information + :param conn: An SQLAlchemy DB connection + :return: A STIX object + """ + _check_support(stix_id) + + stix_class = _stix2_class_for(stix_id) + + if not stix_class: + stix_type = stix2.utils.get_type_from_id(stix_id) + raise DataSourceError("Can't find registered class for type: " + stix_type) + + core_table, type_table = _tables_for(stix_class, metadata) + + if type_table.schema == "common": + # Applies to extension-definition SMO, whose data is stored in the + # common schema; it does not get its own. This type class is used to + # determine which markings tables to use; its markings are + # in the *_sdo tables. 
+ stix_type_class = stix2.utils.STIXTypeClass.SDO + else: + stix_type_class = stix2.utils.to_enum(type_table.schema, stix2.utils.STIXTypeClass) + + simple_props = _read_simple_properties(stix_id, core_table, type_table, conn) + if simple_props is None: + # could not find anything for the given ID! + return None + + obj_dict = dict(simple_props) + obj_dict["type"] = stix_class._type + + for prop_name, prop_instance in stix_class._properties.items(): + if prop_name not in obj_dict: + prop_value = _read_complex_top_level_property_value( + stix_id, + stix_type_class, + prop_name, + prop_instance, + type_table, + metadata, + conn + ) + + if prop_value is not None: + obj_dict[prop_name] = prop_value + + stix_obj = stix_class(**obj_dict, allow_custom=True) + return stix_obj diff --git a/stix2/datastore/relational_db/relational_db.py b/stix2/datastore/relational_db/relational_db.py index 434e37f8..282b7de7 100644 --- a/stix2/datastore/relational_db/relational_db.py +++ b/stix2/datastore/relational_db/relational_db.py @@ -9,6 +9,7 @@ from stix2.datastore.relational_db.input_creation import ( generate_insert_for_object, ) +from stix2.datastore.relational_db.query import read_object from stix2.datastore.relational_db.table_creation import create_table_objects from stix2.datastore.relational_db.utils import ( canonicalize_table_name, schema_for, table_name_for, @@ -80,7 +81,6 @@ def __init__( them. """ database_connection = create_engine(database_connection_url) - print(database_connection) self.metadata = MetaData() create_table_objects( self.metadata, stix_object_classes, @@ -257,44 +257,14 @@ def __init__( ) def get(self, stix_id, version=None, _composite_filters=None): + with self.database_connection.connect() as conn: + stix_obj = read_object( + stix_id, + self.metadata, + conn + ) - stix_type = stix2.utils.get_type_from_id(stix_id) - stix_class = stix2.registry.class_for_type( - # TODO: give user control over STIX version used? 
- stix_type, stix_version=stix2.DEFAULT_VERSION, - ) - - # Info about the type-specific table - type_table_name = table_name_for(stix_type) - type_schema_name = schema_for(stix_class) - type_table = self.metadata.tables[f"{type_schema_name}.{type_table_name}"] - - # Some fixed info about core tables - if type_schema_name == "sco": - core_table_name = "common.core_sco" - else: - # for SROs and SMOs too? - core_table_name = "common.core_sdo" - - core_table = self.metadata.tables[core_table_name] - - # Both core and type-specific tables have "id"; let's not duplicate - # that in the result set columns. Is there a better way to do this? - type_cols_except_id = ( - col for col in type_table.c if col.key != "id" - ) - - core_type_select = select(core_table, *type_cols_except_id) \ - .join(type_table) \ - .where(core_table.c.id == stix_id) - - obj_dict = {} - with self.database_connection.begin() as conn: - # Should be at most one matching row - sco_data = conn.execute(core_type_select).mappings().first() - obj_dict.update(sco_data) - - return stix_class(**obj_dict, allow_custom=True) + return stix_obj def all_versions(self, stix_id, version=None, _composite_filters=None): pass diff --git a/stix2/properties.py b/stix2/properties.py index f8e79ac9..59a5da1a 100644 --- a/stix2/properties.py +++ b/stix2/properties.py @@ -547,7 +547,9 @@ def clean(self, value, allow_custom=False, strict=False): class HexProperty(Property): def clean(self, value, allow_custom=False, strict=False): - if not re.match(r"^([a-fA-F0-9]{2})+$", value): + if isinstance(value, (bytes, bytearray)): + value = value.hex() + elif not re.match(r"^([a-fA-F0-9]{2})+$", value): raise ValueError("must contain an even number of hexadecimal characters") return value, False diff --git a/stix2/test/v21/test_datastore_relational_db.py b/stix2/test/v21/test_datastore_relational_db.py index 39b461e6..f963229a 100644 --- a/stix2/test/v21/test_datastore_relational_db.py +++ 
b/stix2/test/v21/test_datastore_relational_db.py @@ -1,12 +1,21 @@ +import contextlib +import datetime import json import os +import pytest + import stix2 from stix2.datastore.relational_db.relational_db import RelationalDBStore +from stix2.datastore import DataSourceError import stix2.properties +import stix2.registry +import stix2.v21 + +_DB_CONNECT_URL = f"postgresql://{os.getenv('POSTGRES_USER', 'postgres')}:{os.getenv('POSTGRES_PASSWORD', 'postgres')}@0.0.0.0:5432/postgres" store = RelationalDBStore( - f"postgresql://{os.getenv('POSTGRES_USER', 'postgres')}:{os.getenv('POSTGRES_PASSWORD', 'postgres')}@0.0.0.0:5432/postgres", + _DB_CONNECT_URL, True, None, False @@ -51,8 +60,6 @@ def test_encrypted_artifact(): read_obj = json.loads(store.get(artifact_stix_object['id']).serialize()) for attrib in encrypted_artifact_dict.keys(): - if attrib == 'hashes': # TODO hashes are saved to separate table, functionality to retrieve is WIP - continue assert encrypted_artifact_dict[attrib] == read_obj[attrib] @@ -98,8 +105,6 @@ def test_directory(): read_obj = json.loads(store.get(directory_obj['id']).serialize()) for attrib in directory_dict.keys(): - if attrib == "contains_refs": # TODO remove skip once we can pull from table join - continue if attrib == "ctime" or attrib == "mtime": # convert both into stix2 date format for consistency assert stix2.utils.parse_into_datetime(directory_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue @@ -196,9 +201,9 @@ def test_email_addr(): "cc_refs": ["email-addr--e4ee5301-b52d-59cd-a8fa-8036738c7194"], "subject": "Check out this picture of a cat!", "additional_header_fields": { - "Content-Disposition": "inline", - "X-Mailer": "Mutt/1.5.23", - "X-Originating-IP": "198.51.100.3", + "Content-Disposition": ["inline"], + "X-Mailer": ["Mutt/1.5.23"], + "X-Originating-IP": ["198.51.100.3"], }, "body_multipart": [ { @@ -226,9 +231,6 @@ def test_email_msg(): read_obj = 
json.loads(store.get(email_msg_stix_object['id']).serialize()) for attrib in email_msg_dict.keys(): - if attrib == "to_refs" or attrib == "cc_refs" or attrib == "bcc_refs" \ - or attrib == "additional_header_fields": # join multiple tables not implemented yet - continue if attrib == "date": assert stix2.utils.parse_into_datetime(email_msg_dict[attrib]) == stix2.utils.parse_into_datetime( read_obj[attrib], @@ -243,9 +245,6 @@ def test_multipart_email_msg(): read_obj = json.loads(store.get(multipart_email_msg_stix_object['id']).serialize()) for attrib in multipart_email_msg_dict.keys(): - if attrib == "to_refs" or attrib == "cc_refs" or attrib == "bcc_refs" \ - or attrib == "additional_header_fields" or attrib == "body_multipart": # join multiple tables not implemented yet - continue if attrib == "date": assert stix2.utils.parse_into_datetime(multipart_email_msg_dict[attrib]) == stix2.utils.parse_into_datetime( read_obj[attrib], @@ -267,6 +266,7 @@ def test_multipart_email_msg(): "name": "qwerty.dll", "size": 25536, "name_enc": "windows-1252", + "magic_number_hex": "a1b2c3", "mime_type": "application/msword", "ctime": "2018-11-23T08:17:27.000Z", "mtime": "2018-11-23T08:17:27.000Z", @@ -284,8 +284,6 @@ def test_file(): read_obj = json.loads(store.get(file_stix_object['id']).serialize()) for attrib in file_dict.keys(): - if attrib == "contains_refs" or attrib == "hashes": # join multiple tables not implemented yet - continue if attrib == "ctime" or attrib == "mtime" or attrib == "atime": assert stix2.utils.parse_into_datetime(file_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue @@ -381,8 +379,6 @@ def test_network_traffic(): read_obj = store.get(network_traffic_stix_object['id']) for attrib in network_traffic_dict.keys(): - if attrib == "encapsulates_refs": # multiple table join not implemented - continue if attrib == "start" or attrib == "end": assert stix2.utils.parse_into_datetime(network_traffic_dict[attrib]) == 
stix2.utils.parse_into_datetime(read_obj[attrib]) continue @@ -421,9 +417,6 @@ def test_process(): read_obj = json.loads(store.get(process_stix_object['id']).serialize()) for attrib in process_dict.keys(): - if attrib == "child_refs" or attrib == "opened_connection_refs" or attrib == "environment_variables": - # join multiple tables not implemented yet - continue if attrib == "created_time": assert stix2.utils.parse_into_datetime(process_dict[attrib]) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue @@ -536,8 +529,6 @@ def test_windows_registry(): read_obj = json.loads(store.get(windows_registry_stix_object['id']).serialize()) for attrib in windows_registry_dict.keys(): - if attrib == "values": # skip multiple table join - continue if attrib == "modified_time": assert stix2.utils.parse_into_datetime(windows_registry_dict[attrib]) == stix2.utils.parse_into_datetime( read_obj[attrib], @@ -610,11 +601,360 @@ def test_x509_certificate_with_extensions(): read_obj = json.loads(store.get(extensions_x509_certificate_stix_object['id']).serialize()) for attrib in extensions_x509_certificate_dict.keys(): - if attrib == "x509_v3_extensions": # skipping multi-table join - continue if attrib == "validity_not_before" or attrib == "validity_not_after": assert stix2.utils.parse_into_datetime( extensions_x509_certificate_dict[attrib], ) == stix2.utils.parse_into_datetime(read_obj[attrib]) continue assert extensions_x509_certificate_dict[attrib] == read_obj[attrib] + + +def test_source_get_not_exists(): + obj = store.get("identity--00000000-0000-0000-0000-000000000000") + assert obj is None + + +def test_source_no_registration(): + with pytest.raises(DataSourceError): + # error, since no registered class can be found + store.get("doesnt-exist--a9e52398-3312-4377-90c2-86d49446c0d0") + + +def _unregister(reg_section, stix_type, ext_id=None): + """ + Unregister a class from the stix2 library's registry. 
+ + :param reg_section: A registry section; depends on the kind of + class which was registered + :param stix_type: A STIX type + :param ext_id: An extension-definition ID, if applicable. A second + unregistration will occur in the extensions section of the registry if + given. + """ + # We ought to have a library function for this... + del stix2.registry.STIX2_OBJ_MAPS["2.1"][reg_section][stix_type] + if ext_id: + del stix2.registry.STIX2_OBJ_MAPS["2.1"]["extensions"][ext_id] + + +@contextlib.contextmanager +def _register_object(*args, **kwargs): + """ + A contextmanager which can register a class for an SDO/SRO and ensure it is + unregistered afterword. + + :param args: Positional args to a @CustomObject decorator + :param kwargs: Keyword args to a @CustomObject decorator + :return: The registered class + """ + @stix2.CustomObject(*args, **kwargs) + class TestClass: + pass + + try: + yield TestClass + except: + ext_id = kwargs.get("extension_name") + if not ext_id and len(args) >= 3: + ext_id = args[2] + + _unregister("objects", TestClass._type, ext_id) + + raise + + +@contextlib.contextmanager +def _register_observable(*args, **kwargs): + """ + A contextmanager which can register a class for an SCO and ensure it is + unregistered afterword. + + :param args: Positional args to a @CustomObservable decorator + :param kwargs: Keyword args to a @CustomObservable decorator + :return: The registered class + """ + @stix2.CustomObservable(*args, **kwargs) + class TestClass: + pass + + try: + yield TestClass + except: + ext_id = kwargs.get("extension_name") + if not ext_id and len(args) >= 4: + ext_id = args[3] + + _unregister("observables", TestClass._type, ext_id) + + raise + + +# "Base" properties used to derive property variations for testing (e.g. in a +# list, in a dictionary, in an embedded object, etc). Also includes sample +# values used to create test objects. The keys here are used to parameterize a +# fixture below. 
# Parameterizing fixtures via simple strings makes for more
# understandable unit test output, although it can be kind of awkward in the
# implementation (can require long if-then chains checking the parameter
# strings).
_TEST_PROPERTIES = {
    # Each entry maps a parameter key to (property instance, sample value).
    "binary": (stix2.properties.BinaryProperty(), "Af9J"),
    "boolean": (stix2.properties.BooleanProperty(), True),
    "float": (stix2.properties.FloatProperty(), 1.23),
    "hex": (stix2.properties.HexProperty(), "a1b2c3"),
    "integer": (stix2.properties.IntegerProperty(), 1),
    "string": (stix2.properties.StringProperty(), "test"),
    "timestamp": (
        stix2.properties.TimestampProperty(),
        # Timezone-aware "now"; value is fixed at module import time.
        datetime.datetime.now(tz=datetime.timezone.utc)
    ),
    "ref": (
        stix2.properties.ReferenceProperty("SDO"),
        "identity--ec83b570-0743-4179-a5e3-66fd2fae4711"
    ),
    "enum": (
        stix2.properties.EnumProperty(["value1", "value2"]),
        "value1"
    )
}


@pytest.fixture(params=_TEST_PROPERTIES.keys())
def base_property_value(request):
    """Produce basic property instances and test values."""

    # .get() so an unknown key falls through to an explicit test failure
    # rather than a KeyError traceback.
    base = _TEST_PROPERTIES.get(request.param)
    if not base:
        pytest.fail("Unrecognized base property: " + request.param)

    return base


@pytest.fixture(
    params=[
        "base",
        "list-of",
        "dict-of",
        # The following two test nesting lists inside dicts and vice versa
        "dict-list-of",
        "list-dict-of",
        "subobject",
        "list-of-subobject-prop",
        "list-of-subobject-class"
    ]
)
def property_variation_value(request, base_property_value):
    """
    Produce property variations (and corresponding value variations) based on a
    base property instance and value.  E.g. in a list, in a sub-object, etc.
    """
    base_property, prop_value = base_property_value

    class Embedded(stix2.v21._STIXBase21):
        """
        Used for property variations where the property is embedded in a
        sub-object.
        """
        _properties = {
            "embedded": base_property
        }

    if request.param == "base":
        # The property and value, unmodified.
        prop_variation = base_property
        prop_variation_value = prop_value

    elif request.param == "list-of":
        prop_variation = stix2.properties.ListProperty(base_property)
        prop_variation_value = [prop_value]

    elif request.param == "dict-of":
        prop_variation = stix2.properties.DictionaryProperty(
            # DictionaryProperty.valid_types does not accept property
            # instances (except ListProperty instances), only classes...
            valid_types=type(base_property)
        )
        # key name doesn't matter here
        prop_variation_value = {"key": prop_value}

    elif request.param == "dict-list-of":
        prop_variation = stix2.properties.DictionaryProperty(
            valid_types=stix2.properties.ListProperty(base_property)
        )
        # key name doesn't matter here
        prop_variation_value = {"key": [prop_value]}

    elif request.param == "list-dict-of":
        # These seem to all fail... perhaps there is no intent to support
        # this?
        pytest.xfail("ListProperty(DictionaryProperty) not supported?")

        # prop_variation = stix2.properties.ListProperty(
        #     stix2.properties.DictionaryProperty(valid_types=type(base_property))
        # )
        # key name doesn't matter here
        # prop_variation_value = [{"key": prop_value}]

    elif request.param == "subobject":
        prop_variation = stix2.properties.EmbeddedObjectProperty(Embedded)
        prop_variation_value = {"embedded": prop_value}

    elif request.param == "list-of-subobject-prop":
        # list-of-embedded values via EmbeddedObjectProperty
        prop_variation = stix2.properties.ListProperty(
            stix2.properties.EmbeddedObjectProperty(Embedded)
        )
        prop_variation_value = [{"embedded": prop_value}]

    elif request.param == "list-of-subobject-class":
        # Skip all of these since we know the data sink currently chokes on it
        pytest.xfail("Data sink doesn't yet support ListProperty(<_STIXBase subclass>)")

        # list-of-embedded values using the embedded class directly
        # prop_variation = stix2.properties.ListProperty(Embedded)
        # prop_variation_value = [{"embedded": prop_value}]

    else:
        pytest.fail("Unrecognized property variation: " + request.param)

    return prop_variation, prop_variation_value


@pytest.fixture(params=["sdo", "sco", "sro"])
def object_variation(request, property_variation_value):
    """
    Create and register a custom class variation (SDO, SCO, etc), then
    instantiate it and produce the resulting object.
    """

    property_instance, property_value = property_variation_value

    # Fixed extension ID for everything
    ext_id = "extension-definition--15de9cdb-3515-4271-8479-8141154c5647"

    if request.param == "sdo":
        @stix2.CustomObject(
            "test-object", [
                ("prop_name", property_instance)
            ],
            ext_id,
            is_sdo=True
        )
        class TestClass:
            pass

    elif request.param == "sro":
        @stix2.CustomObject(
            "test-object", [
                ("prop_name", property_instance)
            ],
            ext_id,
            is_sdo=False
        )
        class TestClass:
            pass

    elif request.param == "sco":
        @stix2.CustomObservable(
            "test-object", [
                ("prop_name", property_instance)
            ],
            ["prop_name"],
            ext_id
        )
        class TestClass:
            pass

    else:
        pytest.fail("Unrecognized object variation: " + request.param)

    try:
        instance = TestClass(prop_name=property_value)
        yield instance
    finally:
        # Always unregister so the global stix2 registry is clean for the
        # next parameterized run, even if instantiation above raised.
        reg_section = "observables" if request.param == "sco" else "objects"
        _unregister(reg_section, TestClass._type, ext_id)


def test_property(object_variation):
    """
    Try to more exhaustively test many different property configurations:
    ensure schemas can be created and values can be stored and retrieved.
    """
    # NOTE(review): positional args appear to be (url, allow_custom, version,
    # instantiate-database flag, force-recreate flag, *classes) — confirm
    # against the RelationalDBStore signature.
    rdb_store = RelationalDBStore(
        _DB_CONNECT_URL,
        True,
        None,
        True,
        True,
        type(object_variation)
    )

    # Round-trip: write the object, read it back, and require equality.
    rdb_store.add(object_variation)
    read_obj = rdb_store.get(object_variation["id"])

    assert read_obj == object_variation


def test_dictionary_property_complex():
    """
    Test a dictionary property with multiple valid_types
    """
    # _register_object cleans the custom class out of the registry when the
    # "with" block ends.
    with _register_object(
        "test-object", [
            ("prop_name",
             stix2.properties.DictionaryProperty(
                 valid_types=[
                     stix2.properties.IntegerProperty,
                     stix2.properties.FloatProperty,
                     stix2.properties.StringProperty
                 ]
             )
            )
        ],
        "extension-definition--15de9cdb-3515-4271-8479-8141154c5647",
        is_sdo=True
    ) as cls:

        # One value of each declared valid type.
        obj = cls(
            prop_name={"a": 1, "b": 2.3, "c": "foo"}
        )

        rdb_store = RelationalDBStore(
            _DB_CONNECT_URL,
            True,
            None,
            True,
            True,
            cls
        )

        # Round-trip through the relational store.
        rdb_store.add(obj)
        read_obj = rdb_store.get(obj["id"])
        assert read_obj == obj


def test_extension_definition():
    """
    Round-trip an ExtensionDefinition SMO (including object markings and
    granular markings) through the module-level relational store.
    """
    obj = stix2.ExtensionDefinition(
        created_by_ref="identity--8a5fb7e4-aabe-4635-8972-cbcde1fa4792",
        name="test",
        schema="a schema",
        version="1.2.3",
        extension_types=["property-extension", "new-sdo", "new-sro"],
        object_marking_refs=[
            "marking-definition--caa0d913-5db8-4424-aae0-43e770287d30",
            "marking-definition--122a27a0-b96f-46bc-8fcd-f7a159757e77"
        ],
        granular_markings=[
            {
                "lang": "en_US",
                "selectors": ["name", "schema"]
            },
            {
                "marking_ref": "marking-definition--50902d70-37ae-4f85-af68-3f4095493b42",
                "selectors": ["name", "schema"]
            }
        ]
    )

    # Uses the shared module-level "store" rather than a per-test store.
    store.add(obj)
    read_obj = store.get(obj["id"])
    assert read_obj == obj

From c33d879a43e7e239c9d641ceb0d621c00dd612c7 Mon Sep 17 00:00:00 2001
From: Michael Chisholm
Date: Fri, 13 Sep 2024 12:44:52 -0400
Subject: [PATCH 2/2] Remove a stray comma

---
 stix2/test/v21/test_datastore_relational_db.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/stix2/test/v21/test_datastore_relational_db.py
b/stix2/test/v21/test_datastore_relational_db.py index f963229a..78dd9f3d 100644 --- a/stix2/test/v21/test_datastore_relational_db.py +++ b/stix2/test/v21/test_datastore_relational_db.py @@ -12,7 +12,7 @@ import stix2.registry import stix2.v21 -_DB_CONNECT_URL = f"postgresql://{os.getenv('POSTGRES_USER', 'postgres')}:{os.getenv('POSTGRES_PASSWORD', 'postgres')}@0.0.0.0:5432/postgres", +_DB_CONNECT_URL = f"postgresql://{os.getenv('POSTGRES_USER', 'postgres')}:{os.getenv('POSTGRES_PASSWORD', 'postgres')}@0.0.0.0:5432/postgres" store = RelationalDBStore( _DB_CONNECT_URL,