Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix generic ssrf tests #2256

Merged
merged 18 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions bbot/modules/generic_ssrf.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ class generic_ssrf(BaseModule):
produced_events = ["VULNERABILITY"]
flags = ["active", "aggressive", "web-thorough"]
meta = {"description": "Check for generic SSRFs", "created_date": "2022-07-30", "author": "@liquidsec"}
options = {
"skip_dns_interaction": False,
}
options_desc = {
"skip_dns_interaction": "Do not report DNS interactions (only HTTP interaction)",
}
in_scope_only = True

deps_apt = ["curl"]
Expand All @@ -163,7 +169,7 @@ async def setup(self):
self.interactsh_subdomain_tags = {}
self.parameter_subdomain_tags_map = {}
self.severity = None
self.generic_only = self.config.get("generic_only", False)
self.skip_dns_interaction = self.config.get("skip_dns_interaction", False)

if self.scan.config.get("interactsh_disable", False) is False:
try:
Expand Down Expand Up @@ -191,6 +197,10 @@ async def handle_event(self, event):
await s.test(event)

async def interactsh_callback(self, r):
protocol = r.get("protocol").upper()
if protocol == "DNS" and self.skip_dns_interaction:
return

full_id = r.get("full-id", None)
subdomain_tag = full_id.split(".")[0]

Expand All @@ -204,24 +214,27 @@ async def interactsh_callback(self, r):
matched_severity = match[2]
matched_echoed_response = str(match[3])

# Check if any SSRF parameter is in the DNS request
triggering_param = self.parameter_subdomain_tags_map.get(subdomain_tag, None)
description = f"Out-of-band interaction: [{matched_technique}]"
if triggering_param:
self.debug(f"Found triggering parameter: {triggering_param}")
description += f" [Triggering Parameter: {triggering_param}]"
description += f" [{r.get('protocol').upper()}] Echoed Response: {matched_echoed_response}"
description += f" [{protocol}] Echoed Response: {matched_echoed_response}"

self.debug(f"Emitting event with description: {description}") # Debug the final description

event_type = "VULNERABILITY" if protocol == "HTTP" else "FINDING"
event_data = {
"host": str(matched_event.host),
"url": matched_event.data,
"description": description,
}
if protocol == "HTTP":
event_data["severity"] = matched_severity

await self.emit_event(
{
"severity": matched_severity,
"host": str(matched_event.host),
"url": matched_event.data,
"description": description,
},
"VULNERABILITY",
event_data,
event_type,
matched_event,
context=f"{{module}} scanned {matched_event.data} and detected {{event.type}}: {matched_technique}",
)
Expand All @@ -241,7 +254,7 @@ async def cleanup(self):

async def finish(self):
if self.scan.config.get("interactsh_disable", False) is False:
await self.helpers.sleep(2)
await self.helpers.sleep(5)
try:
for r in await self.interactsh_instance.poll():
await self.interactsh_callback(r)
Expand Down
45 changes: 25 additions & 20 deletions bbot/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from contextlib import suppress
from omegaconf import OmegaConf
from pytest_httpserver import HTTPServer
import time
import queue

from bbot.core import CORE
from bbot.core.helpers.misc import execute_sync_or_async
Expand Down Expand Up @@ -53,6 +55,12 @@ def silence_live_logging():
handler.setLevel(logging.CRITICAL)


def stop_server(server):
server.stop()
while server.is_running():
time.sleep(0.1) # Wait a bit before checking again


@pytest.fixture
def bbot_httpserver():
server = HTTPServer(host="127.0.0.1", port=8888, threaded=True)
Expand All @@ -61,11 +69,7 @@ def bbot_httpserver():
yield server

server.clear()
if server.is_running():
server.stop()

# this is to check if the client has made any request where no
# `assert_request` was called on it from the test
stop_server(server) # Ensure the server is fully stopped

server.check_assertions()
server.clear()
Expand All @@ -84,11 +88,7 @@ def bbot_httpserver_ssl():
yield server

server.clear()
if server.is_running():
server.stop()

# this is to check if the client has made any request where no
# `assert_request` was called on it from the test
stop_server(server) # Ensure the server is fully stopped

server.check_assertions()
server.clear()
Expand Down Expand Up @@ -129,7 +129,7 @@ class Interactsh_mock:
def __init__(self, name):
self.name = name
self.log = logging.getLogger(f"bbot.interactsh.{self.name}")
self.interactions = []
self.interactions = asyncio.Queue() # Use an asyncio queue for async access
self.correlation_id = "deadbeef-dead-beef-dead-beefdeadbeef"
self.stop = False
self.poll_task = None
Expand All @@ -138,35 +138,40 @@ def mock_interaction(self, subdomain_tag, msg=None):
self.log.info(f"Mocking interaction to subdomain tag: {subdomain_tag}")
if msg is not None:
self.log.info(msg)
self.interactions.append(subdomain_tag)
self.interactions.put_nowait(subdomain_tag) # Add to the thread-safe queue

async def register(self, callback=None):
if callable(callback):
self.poll_task = asyncio.create_task(self.poll_loop(callback))
return "fakedomain.fakeinteractsh.com"

async def deregister(self, callback=None):
await asyncio.sleep(1)
self.stop = True
if self.poll_task is not None:
self.poll_task.cancel()
with suppress(BaseException):
with suppress(asyncio.CancelledError):
await self.poll_task

async def poll_loop(self, callback=None):
while not self.stop:
data_list = await self.poll(callback)
if not data_list:
await asyncio.sleep(1)
await asyncio.sleep(0.5)
continue
await asyncio.sleep(1)
await self.poll(callback)

async def poll(self, callback=None):
poll_results = []
for subdomain_tag in self.interactions:
result = {"full-id": f"{subdomain_tag}.fakedomain.fakeinteractsh.com", "protocol": "HTTP"}
poll_results.append(result)
if callback is not None:
await execute_sync_or_async(callback, result)
self.interactions = []
while not self.interactions.empty():
subdomain_tag = await self.interactions.get() # Get the first element from the asyncio queue
for protocol in ["HTTP", "DNS"]:
result = {"full-id": f"{subdomain_tag}.fakedomain.fakeinteractsh.com", "protocol": protocol}
poll_results.append(result)
if callback is not None:
await execute_sync_or_async(callback, result)
await asyncio.sleep(0.1)
return poll_results


Expand Down
3 changes: 1 addition & 2 deletions bbot/test/test_step_2/module_tests/test_module_dotnetnuke.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import re
from .base import ModuleTestBase
from werkzeug.wrappers import Response
Expand Down Expand Up @@ -171,7 +172,5 @@ def check(self, module_test, events):
if e.type == "VULNERABILITY" and "DotNetNuke Blind-SSRF (CVE 2017-0929)" in e.data["description"]:
dnn_dnnimagehandler_blindssrf = True

assert self.interactsh_mock_instance.interactions == []

assert dnn_technology_detection, "DNN Technology Detection Failed"
assert dnn_dnnimagehandler_blindssrf, "dnnimagehandler.ashx Blind SSRF Detection Failed"
37 changes: 34 additions & 3 deletions bbot/test/test_step_2/module_tests/test_module_generic_ssrf.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
import asyncio
from werkzeug.wrappers import Response

from .base import ModuleTestBase
Expand All @@ -23,15 +24,16 @@ def request_handler(self, request):
elif request.method == "POST":
subdomain_tag = extract_subdomain_tag(request.data.decode())
if subdomain_tag:
self.interactsh_mock_instance.mock_interaction(
subdomain_tag, msg=f"{request.method}: {request.data.decode()}"
asyncio.run(
self.interactsh_mock_instance.mock_interaction(
subdomain_tag, msg=f"{request.method}: {request.data.decode()}"
)
)

return Response("alive", status=200)

async def setup_before_prep(self, module_test):
self.interactsh_mock_instance = module_test.mock_interactsh("generic_ssrf")
self.interactsh_mock_instance.mock_interaction("asdf")
module_test.monkeypatch.setattr(
module_test.scan.helpers, "interactsh", lambda *args, **kwargs: self.interactsh_mock_instance
)
Expand All @@ -41,6 +43,18 @@ async def setup_after_prep(self, module_test):
module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler)

def check(self, module_test, events):
total_vulnerabilities = 0
total_findings = 0

for e in events:
if e.type == "VULNERABILITY":
total_vulnerabilities += 1
elif e.type == "FINDING":
total_findings += 1

assert total_vulnerabilities == 30, "Incorrect number of vulnerabilities detected"
assert total_findings == 30, "Incorrect number of findings detected"

assert any(
e.type == "VULNERABILITY"
and "Out-of-band interaction: [Generic SSRF (GET)]"
Expand All @@ -55,3 +69,20 @@ def check(self, module_test, events):
e.type == "VULNERABILITY" and "Out-of-band interaction: [Generic XXE] [HTTP]" in e.data["description"]
for e in events
), "Failed to detect Generic SSRF (XXE)"


class TestGeneric_SSRF_httponly(TestGeneric_SSRF):
config_overrides = {"modules": {"generic_ssrf": {"skip_dns_interaction": True}}}

def check(self, module_test, events):
total_vulnerabilities = 0
total_findings = 0

for e in events:
if e.type == "VULNERABILITY":
total_vulnerabilities += 1
elif e.type == "FINDING":
total_findings += 1

assert total_vulnerabilities == 30, "Incorrect number of vulnerabilities detected"
assert total_findings == 0, "Incorrect number of findings detected"
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import re
from werkzeug.wrappers import Response

Expand Down
Loading