Skip to content

Commit

Permalink
add param cool_down_after_transmission to ss
Browse files Browse the repository at this point in the history
  • Loading branch information
subbyte committed Jul 24, 2023
1 parent cfcd2b4 commit 1299330
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 57 deletions.
82 changes: 47 additions & 35 deletions src/kestrel_datasource_stixshifter/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ENV_VAR_PREFIX = "STIXSHIFTER_"
RETRIEVAL_BATCH_SIZE = 2000
SINGLE_BATCH_TIMEOUT = 60
COOL_DOWN_AFTER_TRANSMISSION = 0
FAST_TRANSLATE_CONNECTORS = [] # Suggested: ["qradar", "elastic_ecs"]


Expand Down Expand Up @@ -139,45 +140,38 @@ def get_datasource_from_profiles(profile_name, profiles):
if "options" not in connection:
connection["options"] = {}

retrieval_batch_size = RETRIEVAL_BATCH_SIZE
if "retrieval_batch_size" in connection["options"]:
# remove the non-stix-shifter field "retrieval_batch_size" to avoid stix-shifter error
try:
retrieval_batch_size = int(
connection["options"].pop("retrieval_batch_size")
)
except:
raise InvalidDataSource(
profile_name,
"stixshifter",
f"invalid {profile_name} connection section: options.retrieval_batch_size",
)
# rename this field for stix-shifter use; x2 the size to ensure retrieval
_logger.debug(
f"profile-loaded retrieval_batch_size: {retrieval_batch_size}"
)
retrieval_batch_size = _extract_integer_param_from_connection_config(
"retrieval_batch_size",
RETRIEVAL_BATCH_SIZE,
connection,
profile_name,
)
# rename this field for stix-shifter use; x2 the size to ensure retrieval
connection["options"]["result_limit"] = retrieval_batch_size * 2

single_batch_timeout = SINGLE_BATCH_TIMEOUT
if "single_batch_timeout" in connection["options"]:
# remove the non-stix-shifter field "single_batch_timeout" to avoid stix-shifter error
try:
single_batch_timeout = int(
connection["options"].pop("single_batch_timeout")
)
except:
raise InvalidDataSource(
profile_name,
"stixshifter",
f"invalid {profile_name} connection section: options.single_batch_timeout",
)
# rename this field for stix-shifter use
_logger.debug(
f"profile-loaded single_batch_timeout: {single_batch_timeout}"
)
single_batch_timeout = _extract_integer_param_from_connection_config(
"single_batch_timeout",
SINGLE_BATCH_TIMEOUT,
connection,
profile_name,
)
# rename this field for stix-shifter use
connection["options"]["timeout"] = single_batch_timeout

return connector_name, connection, configuration, retrieval_batch_size
cool_down_after_transmission = _extract_integer_param_from_connection_config(
"cool_down_after_transmission",
COOL_DOWN_AFTER_TRANSMISSION,
connection,
profile_name,
)

return (
connector_name,
connection,
configuration,
retrieval_batch_size,
cool_down_after_transmission,
)


def load_profiles():
Expand Down Expand Up @@ -212,3 +206,21 @@ def load_options():
2, max(1, multiprocessing.cpu_count() - 2)
)
return config["options"]


def _extract_integer_param_from_connection_config(
    param_name, default, connection, profile_name
):
    """Pop an integer option from a profile's connection config.

    The option is *removed* from ``connection["options"]`` because it is a
    Kestrel-specific field that stix-shifter would reject if passed through.

    Parameters
    ----------
    param_name : str
        Name of the option to extract, e.g. ``"retrieval_batch_size"``.
    default : int
        Value returned when the option is not present.
    connection : dict
        Connection section of a data source profile; must contain an
        ``"options"`` dict.
    profile_name : str
        Profile name; used only to build the error message.

    Returns
    -------
    int
        The extracted value, or ``default`` if the option is absent.

    Raises
    ------
    InvalidDataSource
        If the option is present but cannot be converted to an integer.
    """
    value = default
    if param_name in connection["options"]:
        # remove the non-stix-shifter field {param_name} to avoid stix-shifter error
        try:
            value = int(connection["options"].pop(param_name))
        except (TypeError, ValueError):
            # narrow catch: int() only raises TypeError/ValueError; a bare
            # `except:` would also swallow KeyboardInterrupt/SystemExit
            raise InvalidDataSource(
                profile_name,
                "stixshifter",
                f"invalid {profile_name} connection section: options.{param_name}",
            )
    _logger.debug(f"profile-loaded {param_name}: {value}")
    return value
1 change: 1 addition & 0 deletions src/kestrel_datasource_stixshifter/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
options: # use any of this section when needed
retrieval_batch_size: 10000 # set to 10000 to match default Elasticsearch page size; Kestrel default across connectors: 2000
single_batch_timeout: 120 # increase it if hit 60 seconds (Kestrel default) timeout error for each batch of retrieval
            cool_down_after_transmission: 2 # seconds to cool down between data source API calls, required by some APIs such as SentinelOne; Kestrel default: 0
dialects: # more info: https://github.com/opencybersecurityalliance/stix-shifter/tree/develop/stix_shifter_modules/elastic_ecs#dialects
- beats # need it if the index is created by Filebeat/Winlogbeat/*beat
config:
Expand Down
2 changes: 2 additions & 0 deletions src/kestrel_datasource_stixshifter/multiproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def transmit(
configuration_dict: dict,
retrieval_batch_size: int,
translators_count: int,
cool_down_after_transmission: int,
queries: list,
raw_records_queue: Queue,
limit: Optional[int],
Expand All @@ -32,6 +33,7 @@ def transmit(
configuration_dict,
retrieval_batch_size,
translators_count,
cool_down_after_transmission,
queries,
raw_records_queue,
limit,
Expand Down
2 changes: 2 additions & 0 deletions src/kestrel_datasource_stixshifter/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def query_datasource(uri, pattern, session_id, config, store, limit=None):
connection_dict,
configuration_dict,
retrieval_batch_size,
cool_down_after_transmission,
) = map(
copy.deepcopy, get_datasource_from_profiles(profile, config["profiles"])
)
Expand Down Expand Up @@ -120,6 +121,7 @@ def query_datasource(uri, pattern, session_id, config, store, limit=None):
configuration_dict,
retrieval_batch_size,
config["options"]["translation_workers_count"],
cool_down_after_transmission,
dsl["queries"],
raw_records_queue,
profile_limit,
Expand Down
12 changes: 10 additions & 2 deletions src/kestrel_datasource_stixshifter/worker/transmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(
configuration_dict: dict,
retrieval_batch_size: int,
number_of_translators: int,
cool_down_after_transmission: int,
queries: list,
output_queue: Queue,
limit: Optional[int],
Expand All @@ -29,6 +30,7 @@ def __init__(
self.configuration_dict = configuration_dict
self.retrieval_batch_size = retrieval_batch_size
self.number_of_translators = number_of_translators
self.cool_down_after_transmission = cool_down_after_transmission
self.queries = queries
self.queue = output_queue
self.limit = limit
Expand All @@ -40,6 +42,7 @@ def run(self):
self.connection_dict,
self.configuration_dict,
self.retrieval_batch_size,
self.cool_down_after_transmission,
query,
self.queue,
self.limit,
Expand All @@ -61,6 +64,7 @@ def __init__(
connection_dict: dict,
configuration_dict: dict,
retrieval_batch_size: int,
cool_down_after_transmission: int,
query: str,
output_queue: Queue,
limit: Optional[int],
Expand All @@ -71,6 +75,7 @@ def __init__(
self.connection_dict = connection_dict
self.configuration_dict = configuration_dict
self.retrieval_batch_size = retrieval_batch_size
self.cool_down_after_transmission = cool_down_after_transmission
self.query = query
self.queue = output_queue
self.limit = limit
Expand Down Expand Up @@ -116,8 +121,10 @@ def wait_datasource_search(self):
and status["progress"] < 100
and status["status"] in ("KINIT", "RUNNING")
):
if status["status"] == "RUNNING":
time.sleep(1)
if status["status"] == "KINIT":
time.sleep(self.cool_down_after_transmission)
elif status["status"] == "RUNNING":
time.sleep(max(1, self.cool_down_after_transmission))
status = self.transmission.status(self.search_id)
if not status["success"]:
err_msg = (
Expand Down Expand Up @@ -148,6 +155,7 @@ def retrieve_data(self):

while has_remaining_results:
packet = None
time.sleep(self.cool_down_after_transmission)
result_batch = self.transmission.results(
self.search_id,
result_retrieval_offset,
Expand Down
36 changes: 18 additions & 18 deletions tests/test_command_find.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def proc_bundle_file():
return os.path.join(cwd, "doctored-1k.json")


def test_return_table_not_exist(fake_bundle_file):
def test_return_table_not_exist(set_empty_kestrel_config, fake_bundle_file):
with Session() as s:
stmt = f"""
conns = get network-traffic
Expand Down Expand Up @@ -59,7 +59,7 @@ def test_return_table_not_exist(fake_bundle_file):
assert output_dict == correct_dict


def test_find_srcs(fake_bundle_file):
def test_find_srcs(set_empty_kestrel_config, fake_bundle_file):
with Session() as s:
stmt = f"""
conns = get network-traffic
Expand All @@ -72,7 +72,7 @@ def test_find_srcs(fake_bundle_file):
assert len(srcs) == 24


def test_find_srcs_limit(fake_bundle_file):
def test_find_srcs_limit(set_empty_kestrel_config, fake_bundle_file):
with Session() as s:
stmt = f"""
conns = get network-traffic
Expand All @@ -85,7 +85,7 @@ def test_find_srcs_limit(fake_bundle_file):
assert len(srcs) == 1


def test_find_file_linked_to_process(proc_bundle_file):
def test_find_file_linked_to_process(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
procs = get process
Expand All @@ -102,7 +102,7 @@ def test_find_file_linked_to_process(proc_bundle_file):
assert len(files) == 4


def test_find_file_linked_to_process_limit_1(proc_bundle_file):
def test_find_file_linked_to_process_limit_1(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
procs = get process
Expand All @@ -119,7 +119,7 @@ def test_find_file_linked_to_process_limit_1(proc_bundle_file):
assert len(files) == 1


def test_find_file_linked_to_process_limit_2(proc_bundle_file):
def test_find_file_linked_to_process_limit_2(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
procs = get process
Expand All @@ -136,7 +136,7 @@ def test_find_file_linked_to_process_limit_2(proc_bundle_file):
assert len(files) == 4


def test_find_file_linked_to_process_2():
def test_find_file_linked_to_process_2(set_empty_kestrel_config):
stixshifter_data_url = "https://raw.githubusercontent.com/opencybersecurityalliance/stix-shifter/develop/data/cybox"
bundle = f"{stixshifter_data_url}/carbon_black/cb_observed_156.json"
with Session() as s:
Expand All @@ -152,7 +152,7 @@ def test_find_file_linked_to_process_2():
assert len(files) == 3


def test_find_file_linked_to_process_2_limit():
def test_find_file_linked_to_process_2_limit(set_empty_kestrel_config):
stixshifter_data_url = "https://raw.githubusercontent.com/opencybersecurityalliance/stix-shifter/develop/data/cybox"
bundle = f"{stixshifter_data_url}/carbon_black/cb_observed_156.json"
with Session() as s:
Expand All @@ -168,7 +168,7 @@ def test_find_file_linked_to_process_2_limit():
assert len(files) == 2


def test_find_file_loaded_by_process(proc_bundle_file):
def test_find_file_loaded_by_process(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
procs = get process
Expand All @@ -185,7 +185,7 @@ def test_find_file_loaded_by_process(proc_bundle_file):
assert len(files) == 1


def test_find_process_created_process(proc_bundle_file):
def test_find_process_created_process(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
procs = get process
Expand All @@ -199,7 +199,7 @@ def test_find_process_created_process(proc_bundle_file):
assert len(data)


def test_find_refs_resolution_not_reversed_src_ref(proc_bundle_file):
def test_find_refs_resolution_not_reversed_src_ref(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
nt = get network-traffic
Expand All @@ -213,7 +213,7 @@ def test_find_refs_resolution_not_reversed_src_ref(proc_bundle_file):
assert len(p) >= 948 # FIXME: duplicate process objects


def test_find_refs_resolution_not_reversed_src_ref_limit(proc_bundle_file):
def test_find_refs_resolution_not_reversed_src_ref_limit(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
nt = get network-traffic
Expand All @@ -227,7 +227,7 @@ def test_find_refs_resolution_not_reversed_src_ref_limit(proc_bundle_file):
assert len(p) == 10


def test_find_refs_resolution_reversed_src_ref(proc_bundle_file):
def test_find_refs_resolution_reversed_src_ref(set_empty_kestrel_config, proc_bundle_file):
with Session(debug_mode=True) as s:
stmt = f"""
procs = get process
Expand All @@ -247,7 +247,7 @@ def test_find_refs_resolution_reversed_src_ref(proc_bundle_file):
print(json.dumps(data, indent=4))


def test_find_refs_resolution_reversed_src_ref_limit(proc_bundle_file):
def test_find_refs_resolution_reversed_src_ref_limit(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
procs = get process
Expand All @@ -267,7 +267,7 @@ def test_find_refs_resolution_reversed_src_ref_limit(proc_bundle_file):
print(json.dumps(data, indent=4))


def test_find_without_where_ext_pattern(proc_bundle_file):
def test_find_without_where_ext_pattern(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
conns = get network-traffic
Expand Down Expand Up @@ -312,7 +312,7 @@ def test_find_with_where_ext_pattern(set_no_prefetch_kestrel_config, proc_bundle
assert procs.records_count == 203


def test_find_with_limit(proc_bundle_file):
def test_find_with_limit(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
conns = get network-traffic
Expand All @@ -333,7 +333,7 @@ def test_find_with_limit(proc_bundle_file):
assert procs.records_count == 100


def test_find_with_where_centered_pattern(proc_bundle_file):
def test_find_with_where_centered_pattern(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
conns = get network-traffic
Expand All @@ -355,7 +355,7 @@ def test_find_with_where_centered_pattern(proc_bundle_file):
assert procs.records_count == 1


def test_find_from_empty_input(proc_bundle_file):
def test_find_from_empty_input(set_empty_kestrel_config, proc_bundle_file):
with Session() as s:
stmt = f"""
conns = get network-traffic
Expand Down
Loading

0 comments on commit 1299330

Please sign in to comment.