diff --git a/src/kestrel_datasource_stixshifter/diagnosis.py b/src/kestrel_datasource_stixshifter/diagnosis.py index 56ad0059..e19d11e9 100644 --- a/src/kestrel_datasource_stixshifter/diagnosis.py +++ b/src/kestrel_datasource_stixshifter/diagnosis.py @@ -142,3 +142,5 @@ def diagnose_run_query_and_retrieval_result(self, stix_patterns, max_batch_cnt): break else: print(f"no result matched for pattern: {pattern}, go next pattern") + + return result_counts diff --git a/tests/test_cli.py b/tests/test_cli.py index 5c31333e..e831423e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,21 +1,10 @@ import pytest -import os import subprocess -@pytest.fixture() -def setup_huntflow(tmp_path): - profiles = """ -profiles: - lab101: - connector: stix_bundle - connection: - host: https://github.com/opencybersecurityalliance/data-bucket-kestrel/blob/main/stix-bundles/lab101.json?raw=true - config: - auth: - username: - password: -""" +from .utils import stixshifter_profile_lab101 +@pytest.fixture() +def create_huntflow(tmp_path): huntflow = """ procs = GET process FROM stixshifter://lab101 WHERE name = 'svchost.exe' @@ -24,27 +13,20 @@ def setup_huntflow(tmp_path): expected_result_lines = ["VARIABLE TYPE #(ENTITIES) #(RECORDS) directory* file* ipv4-addr* ipv6-addr* mac-addr* network-traffic* process* user-account* x-ecs-destination* x-ecs-network* x-ecs-process* x-ecs-source* x-ecs-user* x-oca-asset* x-oca-event*", " procs process 389 1066 1078 1114 3190 1910 1066 1014 725 1062 2016 2016 2120 2024 2124 1066 2132"] - profile_file = tmp_path / "stixshifter.yaml" huntflow_file = tmp_path / "hunt101.hf" - os.environ["KESTREL_STIXSHIFTER_CONFIG"] = str(profile_file.expanduser().resolve()) - with open(profile_file, "w") as pf: - pf.write(profiles) - with open(huntflow_file, "w") as hf: hf.write(huntflow) huntflow_file_path = str(huntflow_file.expanduser().resolve()) - # https://docs.pytest.org/en/latest/how-to/fixtures.html#teardown-cleanup-aka-fixture-finalization - yield huntflow_file_path, expected_result_lines - del os.environ["KESTREL_STIXSHIFTER_CONFIG"] + return huntflow_file_path, expected_result_lines -def test_cli(setup_huntflow): +def test_cli(create_huntflow, stixshifter_profile_lab101): - huntflow_file_path, expected_result_lines = setup_huntflow + huntflow_file_path, expected_result_lines = create_huntflow result = subprocess.run(args = ["kestrel", huntflow_file_path], universal_newlines = True, stdout = subprocess.PIPE @@ -55,9 +37,9 @@ def test_cli(setup_huntflow): assert result_lines[-2] == expected_result_lines[1] -def test_python_module_call(setup_huntflow): +def test_python_module_call(create_huntflow, stixshifter_profile_lab101): - huntflow_file_path, expected_result_lines = setup_huntflow + huntflow_file_path, expected_result_lines = create_huntflow result = subprocess.run(args = ["python", "-m", "kestrel", huntflow_file_path], universal_newlines = True, stdout = subprocess.PIPE diff --git a/tests/test_stixshifter_diagnosis.py b/tests/test_stixshifter_diagnosis.py new file mode 100644 index 00000000..f5efe47e --- /dev/null +++ b/tests/test_stixshifter_diagnosis.py @@ -0,0 +1,89 @@ +import pytest +import subprocess + +from kestrel_datasource_stixshifter.diagnosis import Diagnosis +from .utils import stixshifter_profile_lab101 + + +def test_diagnosis(stixshifter_profile_lab101): + pattern = " ".join([ + "[ipv4-addr:value LIKE '%']", + "START t'2000-01-01T00:00:00.000Z' STOP t'3000-01-01T00:00:00.000Z'", + ]) + diag = Diagnosis("lab101") + diag.diagnose_config() + diag.diagnose_ping() + assert pattern == diag.diagnose_translate_query(pattern)["queries"][0] + res = diag.diagnose_run_query_and_retrieval_result([pattern], 1) + assert len(res) == 1 and res[0] == 15 + + +def test_cli(stixshifter_profile_lab101): + + expected_output = """ +## Diagnose: config verification + +#### Kestrel specific config +retrieval batch size: 2000 +cool down after transmission: 0 +enable fast translation: False + +#### Config to be passed to stix-shifter +connector name: stix_bundle +connection object [ref: https://github.com/opencybersecurityalliance/stix-shifter/blob/develop/OVERVIEW.md#connection]: +{ + "host": "https://github.com/opencybersecurityalliance/data-bucket-kestrel/blob/main/stix-bundles/lab101.json?raw=true", + "options": { + "result_limit": 4000, + "timeout": 60 + } +} +configuration object [ref: https://github.com/opencybersecurityalliance/stix-shifter/blob/develop/OVERVIEW.md#configuration]: +{ + "auth": { + "username": null, + "password": null + } +} + +## Diagnose: stix-shifter to data source connection (network, auth) + +#### Results from stixshifter transmission.ping() +{ + "success": true +} + +## Diagnose: stix-shifter query translation + +#### Input pattern +[ipv4-addr:value LIKE '%'] START t'2000-01-01T00:00:00.000Z' STOP t'3000-01-01T00:00:00.000Z' + +#### Output data source native query +{ + "queries": [ + "[ipv4-addr:value LIKE '%'] START t'2000-01-01T00:00:00.000Z' STOP t'3000-01-01T00:00:00.000Z'" + ] +} + +## Diagnose: stix-shifter query execution: <=1 batch(s) + +#### data retrieval results: +one batch retrieved: 15 observations + +## Diagnose: stix-shifter query execution: <=5 batch(s) + +#### data retrieval results: +one batch retrieved: 15 observations +""" + + result = subprocess.run(args = ["stix-shifter-diag", "lab101"], + universal_newlines = True, + stdout = subprocess.PIPE + ) + + result_lines = result.stdout.splitlines() + result_lines = [x for x in result_lines if x] + expected_lines = expected_output.splitlines() + expected_lines = [x for x in expected_lines if x] + for x,y in zip(result_lines, expected_lines): + assert x == y diff --git a/tests/utils.py b/tests/utils.py index 2de6643c..2600cac1 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,6 +1,7 @@ import os import pytest +# https://docs.pytest.org/en/latest/how-to/fixtures.html#teardown-cleanup-aka-fixture-finalization @pytest.fixture def set_empty_kestrel_config(tmp_path): @@ -27,3 +28,23 @@ def set_no_prefetch_kestrel_config(tmp_path): ) yield None del os.environ["KESTREL_CONFIG"] + + +@pytest.fixture +def stixshifter_profile_lab101(tmp_path): + profile_file = tmp_path / "stixshifter.yaml" + os.environ["KESTREL_STIXSHIFTER_CONFIG"] = str(profile_file.expanduser().resolve()) + with open(profile_file, "w") as pf: + pf.write(""" +profiles: + lab101: + connector: stix_bundle + connection: + host: https://github.com/opencybersecurityalliance/data-bucket-kestrel/blob/main/stix-bundles/lab101.json?raw=true + config: + auth: + username: + password: +""") + yield None + del os.environ["KESTREL_STIXSHIFTER_CONFIG"]