Skip to content

Commit

Permalink
add unit tests for ss diagnosis
Browse files Browse the repository at this point in the history
  • Loading branch information
subbyte committed Jul 26, 2023
1 parent 17093ba commit 0d653f9
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 26 deletions.
2 changes: 2 additions & 0 deletions src/kestrel_datasource_stixshifter/diagnosis.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,5 @@ def diagnose_run_query_and_retrieval_result(self, stix_patterns, max_batch_cnt):
break
else:
print(f"no result matched for pattern: {pattern}, go next pattern")

return result_counts
34 changes: 8 additions & 26 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
import pytest
import os
import subprocess

@pytest.fixture()
def setup_huntflow(tmp_path):
profiles = """
profiles:
lab101:
connector: stix_bundle
connection:
host: https://github.com/opencybersecurityalliance/data-bucket-kestrel/blob/main/stix-bundles/lab101.json?raw=true
config:
auth:
username:
password:
"""
from .utils import stixshifter_profile_lab101

@pytest.fixture()
def create_huntflow(tmp_path):
huntflow = """
procs = GET process FROM stixshifter://lab101
WHERE name = 'svchost.exe'
Expand All @@ -24,27 +13,20 @@ def setup_huntflow(tmp_path):

expected_result_lines = ["VARIABLE TYPE #(ENTITIES) #(RECORDS) directory* file* ipv4-addr* ipv6-addr* mac-addr* network-traffic* process* user-account* x-ecs-destination* x-ecs-network* x-ecs-process* x-ecs-source* x-ecs-user* x-oca-asset* x-oca-event*", " procs process 389 1066 1078 1114 3190 1910 1066 1014 725 1062 2016 2016 2120 2024 2124 1066 2132"]

profile_file = tmp_path / "stixshifter.yaml"
huntflow_file = tmp_path / "hunt101.hf"

os.environ["KESTREL_STIXSHIFTER_CONFIG"] = str(profile_file.expanduser().resolve())
with open(profile_file, "w") as pf:
pf.write(profiles)

with open(huntflow_file, "w") as hf:
hf.write(huntflow)

huntflow_file_path = str(huntflow_file.expanduser().resolve())

# https://docs.pytest.org/en/latest/how-to/fixtures.html#teardown-cleanup-aka-fixture-finalization
yield huntflow_file_path, expected_result_lines
del os.environ["KESTREL_STIXSHIFTER_CONFIG"]
return huntflow_file_path, expected_result_lines



def test_cli(setup_huntflow):
def test_cli(create_huntflow, stixshifter_profile_lab101):

huntflow_file_path, expected_result_lines = setup_huntflow
huntflow_file_path, expected_result_lines = create_huntflow
result = subprocess.run(args = ["kestrel", huntflow_file_path],
universal_newlines = True,
stdout = subprocess.PIPE
Expand All @@ -55,9 +37,9 @@ def test_cli(setup_huntflow):
assert result_lines[-2] == expected_result_lines[1]


def test_python_module_call(setup_huntflow):
def test_python_module_call(create_huntflow, stixshifter_profile_lab101):

huntflow_file_path, expected_result_lines = setup_huntflow
huntflow_file_path, expected_result_lines = create_huntflow
result = subprocess.run(args = ["python", "-m", "kestrel", huntflow_file_path],
universal_newlines = True,
stdout = subprocess.PIPE
Expand Down
89 changes: 89 additions & 0 deletions tests/test_stixshifter_diagnosis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import pytest
import subprocess

from kestrel_datasource_stixshifter.diagnosis import Diagnosis
from .utils import stixshifter_profile_lab101


def test_diagnosis(stixshifter_profile_lab101):
pattern = " ".join([
"[ipv4-addr:value LIKE '%']",
"START t'2000-01-01T00:00:00.000Z' STOP t'3000-01-01T00:00:00.000Z'",
])
diag = Diagnosis("lab101")
diag.diagnose_config()
diag.diagnose_ping()
assert pattern == diag.diagnose_translate_query(pattern)["queries"][0]
res = diag.diagnose_run_query_and_retrieval_result([pattern], 1)
assert len(res) == 1 and res[0] == 15


def test_cli(stixshifter_profile_lab101):

expected_output = """
## Diagnose: config verification
#### Kestrel specific config
retrieval batch size: 2000
cool down after transmission: 0
enable fast translation: False
#### Config to be passed to stix-shifter
connector name: stix_bundle
connection object [ref: https://github.com/opencybersecurityalliance/stix-shifter/blob/develop/OVERVIEW.md#connection]:
{
"host": "https://github.com/opencybersecurityalliance/data-bucket-kestrel/blob/main/stix-bundles/lab101.json?raw=true",
"options": {
"result_limit": 4000,
"timeout": 60
}
}
configuration object [ref: https://github.com/opencybersecurityalliance/stix-shifter/blob/develop/OVERVIEW.md#configuration]:
{
"auth": {
"username": null,
"password": null
}
}
## Diagnose: stix-shifter to data source connection (network, auth)
#### Results from stixshifter transmission.ping()
{
"success": true
}
## Diagnose: stix-shifter query translation
#### Input pattern
[ipv4-addr:value LIKE '%'] START t'2000-01-01T00:00:00.000Z' STOP t'3000-01-01T00:00:00.000Z'
#### Output data source native query
{
"queries": [
"[ipv4-addr:value LIKE '%'] START t'2000-01-01T00:00:00.000Z' STOP t'3000-01-01T00:00:00.000Z'"
]
}
## Diagnose: stix-shifter query execution: <=1 batch(s)
#### data retrieval results:
one batch retrieved: 15 observations
## Diagnose: stix-shifter query execution: <=5 batch(s)
#### data retrieval results:
one batch retrieved: 15 observations
"""

result = subprocess.run(args = ["stix-shifter-diag", "lab101"],
universal_newlines = True,
stdout = subprocess.PIPE
)

result_lines = result.stdout.splitlines()
result_lines = [x for x in result_lines if x]
expected_lines = expected_output.splitlines()
expected_lines = [x for x in expected_lines if x]
for x,y in zip(result_lines, expected_lines):
assert x == y
21 changes: 21 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import pytest

# https://docs.pytest.org/en/latest/how-to/fixtures.html#teardown-cleanup-aka-fixture-finalization

@pytest.fixture
def set_empty_kestrel_config(tmp_path):
Expand All @@ -27,3 +28,23 @@ def set_no_prefetch_kestrel_config(tmp_path):
)
yield None
del os.environ["KESTREL_CONFIG"]


@pytest.fixture
def stixshifter_profile_lab101(tmp_path):
profile_file = tmp_path / "stixshifter.yaml"
os.environ["KESTREL_STIXSHIFTER_CONFIG"] = str(profile_file.expanduser().resolve())
with open(profile_file, "w") as pf:
pf.write("""
profiles:
lab101:
connector: stix_bundle
connection:
host: https://github.com/opencybersecurityalliance/data-bucket-kestrel/blob/main/stix-bundles/lab101.json?raw=true
config:
auth:
username:
password:
""")
yield None
del os.environ["KESTREL_STIXSHIFTER_CONFIG"]

0 comments on commit 0d653f9

Please sign in to comment.