From 129933087cc76025f36107da891534c8ff21319e Mon Sep 17 00:00:00 2001 From: Xiaokui Shu Date: Mon, 24 Jul 2023 15:40:41 -0400 Subject: [PATCH] add param cool_down_after_transmission to ss --- src/kestrel_datasource_stixshifter/config.py | 82 +++++++++++-------- .../interface.py | 1 + .../multiproc.py | 2 + src/kestrel_datasource_stixshifter/query.py | 2 + .../worker/transmitter.py | 12 ++- tests/test_command_find.py | 36 ++++---- tests/test_stixshifter.py | 7 +- 7 files changed, 85 insertions(+), 57 deletions(-) diff --git a/src/kestrel_datasource_stixshifter/config.py b/src/kestrel_datasource_stixshifter/config.py index 82179bea..7f5b967d 100644 --- a/src/kestrel_datasource_stixshifter/config.py +++ b/src/kestrel_datasource_stixshifter/config.py @@ -16,6 +16,7 @@ ENV_VAR_PREFIX = "STIXSHIFTER_" RETRIEVAL_BATCH_SIZE = 2000 SINGLE_BATCH_TIMEOUT = 60 +COOL_DOWN_AFTER_TRANSMISSION = 0 FAST_TRANSLATE_CONNECTORS = [] # Suggested: ["qradar", "elastic_ecs"] @@ -139,45 +140,38 @@ def get_datasource_from_profiles(profile_name, profiles): if "options" not in connection: connection["options"] = {} - retrieval_batch_size = RETRIEVAL_BATCH_SIZE - if "retrieval_batch_size" in connection["options"]: - # remove the non-stix-shifter field "retrieval_batch_size" to avoid stix-shifter error - try: - retrieval_batch_size = int( - connection["options"].pop("retrieval_batch_size") - ) - except: - raise InvalidDataSource( - profile_name, - "stixshifter", - f"invalid {profile_name} connection section: options.retrieval_batch_size", - ) - # rename this field for stix-shifter use; x2 the size to ensure retrieval - _logger.debug( - f"profile-loaded retrieval_batch_size: {retrieval_batch_size}" - ) + retrieval_batch_size = _extract_integer_param_from_connection_config( + "retrieval_batch_size", + RETRIEVAL_BATCH_SIZE, + connection, + profile_name, + ) + # rename this field for stix-shifter use; x2 the size to ensure retrieval connection["options"]["result_limit"] = retrieval_batch_size * 2 - single_batch_timeout = SINGLE_BATCH_TIMEOUT - if "single_batch_timeout" in connection["options"]: - # remove the non-stix-shifter field "single_batch_timeout" to avoid stix-shifter error - try: - single_batch_timeout = int( - connection["options"].pop("single_batch_timeout") - ) - except: - raise InvalidDataSource( - profile_name, - "stixshifter", - f"invalid {profile_name} connection section: options.single_batch_timeout", - ) - # rename this field for stix-shifter use - _logger.debug( - f"profile-loaded single_batch_timeout: {single_batch_timeout}" - ) + single_batch_timeout = _extract_integer_param_from_connection_config( + "single_batch_timeout", + SINGLE_BATCH_TIMEOUT, + connection, + profile_name, + ) + # rename this field for stix-shifter use connection["options"]["timeout"] = single_batch_timeout - return connector_name, connection, configuration, retrieval_batch_size + cool_down_after_transmission = _extract_integer_param_from_connection_config( + "cool_down_after_transmission", + COOL_DOWN_AFTER_TRANSMISSION, + connection, + profile_name, + ) + + return ( + connector_name, + connection, + configuration, + retrieval_batch_size, + cool_down_after_transmission, + ) def load_profiles(): @@ -212,3 +206,21 @@ def load_options(): 2, max(1, multiprocessing.cpu_count() - 2) ) return config["options"] + + +def _extract_integer_param_from_connection_config( + param_name, default, connection, profile_name +): + value = default + if param_name in connection["options"]: + # remove the non-stix-shifter field {param_name} to avoid stix-shifter error + try: + value = int(connection["options"].pop(param_name)) + except: + raise InvalidDataSource( + profile_name, + "stixshifter", + f"invalid {profile_name} connection section: options.{param_name}", + ) + _logger.debug(f"profile-loaded {param_name}: {value}") + return value diff --git a/src/kestrel_datasource_stixshifter/interface.py b/src/kestrel_datasource_stixshifter/interface.py index e17c613d..c9ad6d0f 100644 --- a/src/kestrel_datasource_stixshifter/interface.py +++ b/src/kestrel_datasource_stixshifter/interface.py @@ -31,6 +31,7 @@ options: # use any of this section when needed retrieval_batch_size: 10000 # set to 10000 to match default Elasticsearch page size; Kestrel default across connectors: 2000 single_batch_timeout: 120 # increase it if hit 60 seconds (Kestrel default) timeout error for each batch of retrieval + cool_down_after_transmission: 2 # seconds to cool down between data source API calls, required by some API such as sentinelone; Kestrel default: 0 dialects: # more info: https://github.com/opencybersecurityalliance/stix-shifter/tree/develop/stix_shifter_modules/elastic_ecs#dialects - beats # need it if the index is created by Filebeat/Winlogbeat/*beat config: diff --git a/src/kestrel_datasource_stixshifter/multiproc.py b/src/kestrel_datasource_stixshifter/multiproc.py index 2ac71afb..aeadfc83 100644 --- a/src/kestrel_datasource_stixshifter/multiproc.py +++ b/src/kestrel_datasource_stixshifter/multiproc.py @@ -21,6 +21,7 @@ def transmit( configuration_dict: dict, retrieval_batch_size: int, translators_count: int, + cool_down_after_transmission: int, queries: list, raw_records_queue: Queue, limit: Optional[int], @@ -32,6 +33,7 @@ def transmit( configuration_dict, retrieval_batch_size, translators_count, + cool_down_after_transmission, queries, raw_records_queue, limit, diff --git a/src/kestrel_datasource_stixshifter/query.py b/src/kestrel_datasource_stixshifter/query.py index d910ed1e..f3fa5ea6 100644 --- a/src/kestrel_datasource_stixshifter/query.py +++ b/src/kestrel_datasource_stixshifter/query.py @@ -81,6 +81,7 @@ def query_datasource(uri, pattern, session_id, config, store, limit=None): connection_dict, configuration_dict, retrieval_batch_size, + cool_down_after_transmission, ) = map( copy.deepcopy, get_datasource_from_profiles(profile, config["profiles"]) ) @@ -120,6 +121,7 @@ def query_datasource(uri, pattern, session_id, config, store, limit=None): configuration_dict, retrieval_batch_size, config["options"]["translation_workers_count"], + cool_down_after_transmission, dsl["queries"], raw_records_queue, profile_limit, diff --git a/src/kestrel_datasource_stixshifter/worker/transmitter.py b/src/kestrel_datasource_stixshifter/worker/transmitter.py index 3eb1b8cf..a9115767 100644 --- a/src/kestrel_datasource_stixshifter/worker/transmitter.py +++ b/src/kestrel_datasource_stixshifter/worker/transmitter.py @@ -18,6 +18,7 @@ def __init__( configuration_dict: dict, retrieval_batch_size: int, number_of_translators: int, + cool_down_after_transmission: int, queries: list, output_queue: Queue, limit: Optional[int], @@ -29,6 +30,7 @@ def __init__( self.configuration_dict = configuration_dict self.retrieval_batch_size = retrieval_batch_size self.number_of_translators = number_of_translators + self.cool_down_after_transmission = cool_down_after_transmission self.queries = queries self.queue = output_queue self.limit = limit @@ -40,6 +42,7 @@ def run(self): self.connection_dict, self.configuration_dict, self.retrieval_batch_size, + self.cool_down_after_transmission, query, self.queue, self.limit, @@ -61,6 +64,7 @@ def __init__( connection_dict: dict, configuration_dict: dict, retrieval_batch_size: int, + cool_down_after_transmission: int, query: str, output_queue: Queue, limit: Optional[int], @@ -71,6 +75,7 @@ def __init__( self.connection_dict = connection_dict self.configuration_dict = configuration_dict self.retrieval_batch_size = retrieval_batch_size + self.cool_down_after_transmission = cool_down_after_transmission self.query = query self.queue = output_queue self.limit = limit @@ -116,8 +121,10 @@ def wait_datasource_search(self): and status["progress"] < 100 and status["status"] in ("KINIT", "RUNNING") ): - if status["status"] == "RUNNING": - time.sleep(1) + if status["status"] == "KINIT": + time.sleep(self.cool_down_after_transmission) + elif status["status"] == "RUNNING": + time.sleep(max(1, self.cool_down_after_transmission)) status = self.transmission.status(self.search_id) if not status["success"]: err_msg = ( @@ -148,6 +155,7 @@ def retrieve_data(self): while has_remaining_results: packet = None + time.sleep(self.cool_down_after_transmission) result_batch = self.transmission.results( self.search_id, result_retrieval_offset, diff --git a/tests/test_command_find.py b/tests/test_command_find.py index d25cdca0..0d3778c1 100644 --- a/tests/test_command_find.py +++ b/tests/test_command_find.py @@ -19,7 +19,7 @@ def proc_bundle_file(): return os.path.join(cwd, "doctored-1k.json") -def test_return_table_not_exist(fake_bundle_file): +def test_return_table_not_exist(set_empty_kestrel_config, fake_bundle_file): with Session() as s: stmt = f""" conns = get network-traffic @@ -59,7 +59,7 @@ def test_return_table_not_exist(fake_bundle_file): assert output_dict == correct_dict -def test_find_srcs(fake_bundle_file): +def test_find_srcs(set_empty_kestrel_config, fake_bundle_file): with Session() as s: stmt = f""" conns = get network-traffic @@ -72,7 +72,7 @@ def test_find_srcs(fake_bundle_file): assert len(srcs) == 24 -def test_find_srcs_limit(fake_bundle_file): +def test_find_srcs_limit(set_empty_kestrel_config, fake_bundle_file): with Session() as s: stmt = f""" conns = get network-traffic @@ -85,7 +85,7 @@ def test_find_srcs_limit(fake_bundle_file): assert len(srcs) == 1 -def test_find_file_linked_to_process(proc_bundle_file): +def test_find_file_linked_to_process(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" procs = get process @@ -102,7 +102,7 @@ def test_find_file_linked_to_process(proc_bundle_file): assert len(files) == 4 -def test_find_file_linked_to_process_limit_1(proc_bundle_file): +def test_find_file_linked_to_process_limit_1(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" procs = get process @@ -119,7 +119,7 @@ def test_find_file_linked_to_process_limit_1(proc_bundle_file): assert len(files) == 1 -def test_find_file_linked_to_process_limit_2(proc_bundle_file): +def test_find_file_linked_to_process_limit_2(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" procs = get process @@ -136,7 +136,7 @@ def test_find_file_linked_to_process_limit_2(proc_bundle_file): assert len(files) == 4 -def test_find_file_linked_to_process_2(): +def test_find_file_linked_to_process_2(set_empty_kestrel_config): stixshifter_data_url = "https://raw.githubusercontent.com/opencybersecurityalliance/stix-shifter/develop/data/cybox" bundle = f"{stixshifter_data_url}/carbon_black/cb_observed_156.json" with Session() as s: @@ -152,7 +152,7 @@ def test_find_file_linked_to_process_2(): assert len(files) == 3 -def test_find_file_linked_to_process_2_limit(): +def test_find_file_linked_to_process_2_limit(set_empty_kestrel_config): stixshifter_data_url = "https://raw.githubusercontent.com/opencybersecurityalliance/stix-shifter/develop/data/cybox" bundle = f"{stixshifter_data_url}/carbon_black/cb_observed_156.json" with Session() as s: @@ -168,7 +168,7 @@ def test_find_file_linked_to_process_2_limit(): assert len(files) == 2 -def test_find_file_loaded_by_process(proc_bundle_file): +def test_find_file_loaded_by_process(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" procs = get process @@ -185,7 +185,7 @@ def test_find_file_loaded_by_process(proc_bundle_file): assert len(files) == 1 -def test_find_process_created_process(proc_bundle_file): +def test_find_process_created_process(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" procs = get process @@ -199,7 +199,7 @@ def test_find_process_created_process(proc_bundle_file): assert len(data) -def test_find_refs_resolution_not_reversed_src_ref(proc_bundle_file): +def test_find_refs_resolution_not_reversed_src_ref(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" nt = get network-traffic @@ -213,7 +213,7 @@ def test_find_refs_resolution_not_reversed_src_ref(proc_bundle_file): assert len(p) >= 948 # FIXME: duplicate process objects -def test_find_refs_resolution_not_reversed_src_ref_limit(proc_bundle_file): +def test_find_refs_resolution_not_reversed_src_ref_limit(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" nt = get network-traffic @@ -227,7 +227,7 @@ def test_find_refs_resolution_not_reversed_src_ref_limit(proc_bundle_file): assert len(p) == 10 -def test_find_refs_resolution_reversed_src_ref(proc_bundle_file): +def test_find_refs_resolution_reversed_src_ref(set_empty_kestrel_config, proc_bundle_file): with Session(debug_mode=True) as s: stmt = f""" procs = get process @@ -247,7 +247,7 @@ def test_find_refs_resolution_reversed_src_ref(proc_bundle_file): print(json.dumps(data, indent=4)) -def test_find_refs_resolution_reversed_src_ref_limit(proc_bundle_file): +def test_find_refs_resolution_reversed_src_ref_limit(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" procs = get process @@ -267,7 +267,7 @@ def test_find_refs_resolution_reversed_src_ref_limit(proc_bundle_file): print(json.dumps(data, indent=4)) -def test_find_without_where_ext_pattern(proc_bundle_file): +def test_find_without_where_ext_pattern(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" conns = get network-traffic @@ -312,7 +312,7 @@ def test_find_with_where_ext_pattern(set_no_prefetch_kestrel_config, proc_bundle assert procs.records_count == 203 -def test_find_with_limit(proc_bundle_file): +def test_find_with_limit(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" conns = get network-traffic @@ -333,7 +333,7 @@ def test_find_with_limit(proc_bundle_file): assert procs.records_count == 100 -def test_find_with_where_centered_pattern(proc_bundle_file): +def test_find_with_where_centered_pattern(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" conns = get network-traffic @@ -355,7 +355,7 @@ def test_find_with_where_centered_pattern(proc_bundle_file): assert procs.records_count == 1 -def test_find_from_empty_input(proc_bundle_file): +def test_find_from_empty_input(set_empty_kestrel_config, proc_bundle_file): with Session() as s: stmt = f""" conns = get network-traffic diff --git a/tests/test_stixshifter.py b/tests/test_stixshifter.py index e51d921e..d8cb93a5 100644 --- a/tests/test_stixshifter.py +++ b/tests/test_stixshifter.py @@ -50,6 +50,7 @@ def test_yaml_profiles_refresh(tmp_path): options: retrieval_batch_size: 10000 single_batch_timeout: 120 + cool_down_after_transmission: 5 dialects: - beats config: @@ -78,13 +79,14 @@ def test_yaml_profiles_refresh(tmp_path): ss_config = s.config["datasources"]["kestrel_datasource_stixshifter"] ss_profiles = ss_config["profiles"] - connector_name, connection, configuration, retrieval_batch_size = get_datasource_from_profiles("host101", ss_profiles) + connector_name, connection, configuration, retrieval_batch_size, cool_down_after_transmission = get_datasource_from_profiles("host101", ss_profiles) assert connector_name == "elastic_ecs" assert configuration["auth"]["id"] == "profileA" assert configuration["auth"]["api_key"] == "qwer" assert connection["options"]["timeout"] == 60 assert connection["options"]["result_limit"] == 2000 * 2 assert retrieval_batch_size == 2000 + assert cool_down_after_transmission == 0 with open(profile_file, "w") as pf: pf.write(profileB) @@ -93,12 +95,13 @@ def test_yaml_profiles_refresh(tmp_path): # need to refresh the pointers since the dict is updated ss_profiles = ss_config["profiles"] - connector_name, connection, configuration, retrieval_batch_size = get_datasource_from_profiles("host101", ss_profiles) + connector_name, connection, configuration, retrieval_batch_size, cool_down_after_transmission = get_datasource_from_profiles("host101", ss_profiles) assert connector_name == "elastic_ecs" assert configuration["auth"]["id"] == "profileB" assert configuration["auth"]["api_key"] == "asdf" assert connection["options"]["timeout"] == 120 assert connection["options"]["result_limit"] == 10000 * 2 assert retrieval_batch_size == 10000 + assert cool_down_after_transmission == 5 del os.environ["KESTREL_STIXSHIFTER_CONFIG"]