Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions tests/integration/test_nft_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,14 +423,16 @@ def test_container_killed_on_metadata_access(self, test_workspace, coi_binary):
timeout=10,
)

# Wait for kill action
time.sleep(8)
# Wait for kill action with retry loop
killed = False
for _ in range(15):
time.sleep(1)
state = get_container_state(container_name)
if state in ("Stopped", "Unknown"):
killed = True
break

# Verify container was killed or stopped
state = get_container_state(container_name)
assert state in ("Stopped", "Unknown"), (
f"Container should have been killed but state is {state}"
)
assert killed, f"Container should have been killed but state is {state}"

finally:
proc.terminate()
Expand Down Expand Up @@ -729,18 +731,25 @@ def test_nft_rules_cleaned_on_auto_kill(self, test_workspace, coi_binary):
)

# Wait for responder to detect threat and kill container
time.sleep(10)
killed = False
for _ in range(15):
time.sleep(1)
state = get_container_state(container_name)
if state in ("Stopped", "Unknown"):
killed = True
break

# Verify container was killed
state = get_container_state(container_name)
assert state in ("Stopped", "Unknown"), (
f"Container should have been killed but state is {state}"
)
assert killed, f"Container should have been killed but state is {state}"

# Verify NFT rules are cleaned up
assert not check_nft_rules_exist(container_ip), (
f"NFT rules should be cleaned up for {container_ip} after auto-kill"
)
# Verify NFT rules are cleaned up (may take a moment after kill)
cleaned = False
for _ in range(15):
if not check_nft_rules_exist(container_ip):
cleaned = True
break
time.sleep(1)

assert cleaned, f"NFT rules should be cleaned up for {container_ip} after auto-kill"

finally:
proc.terminate()
Expand Down Expand Up @@ -895,16 +904,25 @@ def test_firewall_rules_cleaned_on_auto_kill(self, test_workspace, coi_binary):
)

# Wait for responder to detect threat and kill container
time.sleep(10)
killed = False
for _ in range(15):
time.sleep(1)
state = get_container_state(container_name)
if state in ("Stopped", "Unknown"):
killed = True
break

# Verify container was killed
state = get_container_state(container_name)
assert state in ("Stopped", "Unknown"), (
f"Container should have been killed but state is {state}"
)
assert killed, f"Container should have been killed but state is {state}"

# Verify firewall rules are cleaned up (may take a moment after kill)
cleaned = False
for _ in range(15):
if not check_firewall_rules_exist(container_ip):
cleaned = True
break
time.sleep(1)

# Verify firewall rules are cleaned up
assert not check_firewall_rules_exist(container_ip), (
assert cleaned, (
f"Firewall rules should be cleaned up for {container_ip} after auto-kill"
)

Expand Down
31 changes: 16 additions & 15 deletions tests/integration/test_security_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,21 @@ def test_reverse_shell_detection(self, test_workspace, enable_monitoring, coi_bi
stderr=stderr_fd, # Capture stderr for debugging
)

# Wait for container to be created
time.sleep(8)

# Wait for container to be created and running (may take longer on first run
# when the image is not yet cached)
container_name = get_container_name_from_workspace(test_workspace)
ready = False
for _ in range(30):
time.sleep(1)
state = get_container_state(container_name)
if state == "Running":
ready = True
break

# Verify container exists and is running
state = get_container_state(container_name)
if state == "Unknown":
if not ready:
proc.terminate()
pytest.skip(f"Container {container_name} not found")
stderr_fd.close()
pytest.skip(f"Container {container_name} not ready, state: {state}")

# Inject malicious command (simulate reverse shell)
subprocess.Popen(
Expand All @@ -215,11 +220,12 @@ def test_reverse_shell_detection(self, test_workspace, enable_monitoring, coi_bi
)

# Wait for monitoring to detect and kill
max_wait = 15
for _ in range(max_wait):
killed = False
for _ in range(15):
time.sleep(1)
state = get_container_state(container_name)
if state in ["Stopped", "Frozen", "Unknown"]:
killed = True
break

# Close stderr file and print contents for debugging (before assertions)
Expand All @@ -233,12 +239,7 @@ def test_reverse_shell_detection(self, test_workspace, enable_monitoring, coi_bi
print("=== End Debug Log ===\n")

# Verify container was killed
final_state = get_container_state(container_name)
assert final_state in [
"Stopped",
"Frozen",
"Unknown", # Container deleted after critical threat
], f"Expected container killed, got {final_state}"
assert killed, f"Expected container killed, got {state}"

# Verify threat event logged
events = get_threat_events(container_name)
Expand Down
Loading