Add subwords capability to ffuf_shortnames #2237

Merged · 14 commits · Feb 13, 2025
bbot/core/helpers/web/web.py (+18 -1)

@@ -232,7 +232,7 @@ async def download(self, url, **kwargs):
if success:
return filename

-    async def wordlist(self, path, lines=None, **kwargs):
+    async def wordlist(self, path, lines=None, zip=False, zip_filename=None, **kwargs):
"""
Asynchronous function for retrieving wordlists, either from a local path or a URL.
Allows for optional line-based truncation and caching. Returns the full path of the wordlist
Expand All @@ -242,6 +242,9 @@ async def wordlist(self, path, lines=None, **kwargs):
path (str): The local or remote path of the wordlist.
lines (int, optional): Number of lines to read from the wordlist.
If specified, will return a truncated wordlist with this many lines.
+            zip (bool, optional): Whether to unzip the file after downloading. Defaults to False.
+            zip_filename (str, optional): The name of the file to extract from the ZIP archive.
+                Required if zip is True.
cache_hrs (float, optional): Number of hours to cache the downloaded wordlist.
Defaults to 720 hours (30 days) for remote wordlists.
**kwargs: Additional keyword arguments to pass to the 'download' function for remote wordlists.
@@ -259,6 +262,8 @@ async def wordlist(self, path, lines=None, **kwargs):
Fetching and truncating to the first 100 lines
>>> wordlist_path = await self.helpers.wordlist("/root/rockyou.txt", lines=100)
"""
+        import zipfile
+
if not path:
raise WordlistError(f"Invalid wordlist: {path}")
if "cache_hrs" not in kwargs:
@@ -272,6 +277,18 @@ async def wordlist(self, path, lines=None, **kwargs):
if not filename.is_file():
raise WordlistError(f"Unable to find wordlist at {path}")

+        if zip:
+            if not zip_filename:
+                raise WordlistError("zip_filename must be specified when zip is True")
+            try:
+                with zipfile.ZipFile(filename, "r") as zip_ref:
+                    if zip_filename not in zip_ref.namelist():
+                        raise WordlistError(f"File {zip_filename} not found in the zip archive {filename}")
+                    zip_ref.extract(zip_filename, filename.parent)
+                    filename = filename.parent / zip_filename
+            except Exception as e:
+                raise WordlistError(f"Error unzipping file {filename}: {e}")

if lines is None:
return filename
else:
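
The new zip parameters are exercised by ffuf_shortnames later in this PR. As a usage sketch (the module context, URL, and archive member name are taken from the PR itself; error handling is left to the caller):

    # Inside a module's async setup(): download a remote ZIP, extract one member,
    # and get back the path of the extracted file (cached for 30 days by default).
    subwords = await self.helpers.wordlist(
        "https://raw.githubusercontent.com/nltk/nltk_data/refs/heads/gh-pages/packages/corpora/words.zip",
        zip=True,
        zip_filename="words/en",  # member path inside the archive; required when zip=True
    )
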
bbot/modules/deadly/ffuf.py (+12 -6)

@@ -17,13 +17,15 @@ class ffuf(BaseModule):
"lines": 5000,
"max_depth": 0,
"extensions": "",
"ignore_case": False,
}

options_desc = {
"wordlist": "Specify wordlist to use when finding directories",
"lines": "take only the first N lines from the wordlist when finding directories",
"max_depth": "the maximum directory depth to attempt to solve",
"extensions": "Optionally include a list of extensions to extend the keyword with (comma separated)",
"ignore_case": "Only put lowercase words into the wordlist",
}

deps_common = ["ffuf"]
@@ -301,11 +303,12 @@ async def execute_ffuf(
]
if len(pre_emit_temp_canary) == 0:
yield found_json

else:
-                self.warning(
-                    "Baseline changed mid-scan. This is probably due to a WAF turning on a block against you."
-                )
-                self.warning(f"Aborting the current run against [{url}]")
+                self.verbose(
+                    f"Would have reported URL [{found_json['url']}], but baseline check failed. This could be due to a WAF turning on mid-scan, or an unusual web server configuration."
+                )
+                self.verbose(f"Aborting the current run against [{url}]")
return

yield found_json
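
A note on the canary check above: every wordlist the module generates ends with a random canary string (see generate_templist), and pre_emit_temp_canary collects any results matching it. A hit on the canary means the response baseline shifted mid-scan, so the whole batch is discarded. A standalone sketch of the idea, with illustrative names rather than BBOT's actual API:

    import secrets

    canary = secrets.token_hex(8)  # a word that should never legitimately exist on the target

    def safe_to_emit(hits, canary):
        # If even the canary "exists", the server has started answering
        # positively to everything (e.g., a WAF block page), and none of
        # the batch's hits can be trusted.
        return not any(canary in hit for hit in hits)
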
@@ -328,7 +331,8 @@ def generate_templist(self, prefix=None):
return self.helpers.tempfile(virtual_file, pipe=False), len(virtual_file)

def generate_wordlist(self, wordlist_file):
-        wordlist = []
+        wordlist_set = set()  # a set, to avoid duplicate words
+        ignore_case = self.config.get("ignore_case", False)
for line in self.helpers.read_file(wordlist_file):
line = line.strip()
if not line:
@@ -339,5 +343,7 @@ def generate_wordlist(self, wordlist_file):
if any(x in line for x in self.banned_characters):
self.debug(f"Skipping adding [{line}] to wordlist because it has a banned character")
continue
-            wordlist.append(line)
-        return wordlist
+            if ignore_case:
+                line = line.lower()
+            wordlist_set.add(line)  # the set collapses duplicates, e.g. case variants
+        return list(wordlist_set)
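
The combined effect of ignore_case and the new set is easy to show in isolation (a standalone sketch mirroring the loop above, not the module's actual code):

    def normalize_wordlist(lines, ignore_case=False):
        words = set()
        for line in lines:
            line = line.strip()
            if not line:
                continue
            if ignore_case:
                line = line.lower()
            words.add(line)  # the set collapses case variants into a single entry
        return list(words)

    # ["Admin", "admin", "ADMIN"] -> ["admin"], so ffuf sends one request instead of three
    print(normalize_wordlist(["Admin", "admin", "ADMIN"], ignore_case=True))
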
bbot/modules/ffuf_shortnames.py (+68 -13)

@@ -9,7 +9,6 @@
class ffuf_shortnames(ffuf):
watched_events = ["URL_HINT"]
produced_events = ["URL_UNVERIFIED"]
-    deps_pip = ["numpy"]
flags = ["aggressive", "active", "iis-shortnames", "web-thorough"]
meta = {
"description": "Use ffuf in combination IIS shortnames",
@@ -18,41 +17,44 @@ class ffuf_shortnames(ffuf):
}

options = {
"wordlist": "", # default is defined within setup function
"wordlist_extensions": "", # default is defined within setup function
"max_depth": 1,
"version": "2.0.0",
"extensions": "",
"ignore_redirects": True,
"find_common_prefixes": False,
"find_delimiters": True,
"find_subwords": False,
"max_predictions": 250,
}

options_desc = {
"wordlist": "Specify wordlist to use when finding directories",
"wordlist_extensions": "Specify wordlist to use when making extension lists",
"max_depth": "the maximum directory depth to attempt to solve",
"version": "ffuf version",
"extensions": "Optionally include a list of extensions to extend the keyword with (comma separated)",
"ignore_redirects": "Explicitly ignore redirects (301,302)",
"find_common_prefixes": "Attempt to automatically detect common prefixes and make additional ffuf runs against them",
"find_delimiters": "Attempt to detect common delimiters and make additional ffuf runs against them",
"find_subwords": "Attempt to detect subwords and make additional ffuf runs against them",
"max_predictions": "The maximum number of predictions to generate per shortname prefix",
}

deps_common = ["ffuf"]

in_scope_only = True

-    def generate_templist(self, prefix, shortname_type):
-        virtual_file = []
-
-        for prediction, score in self.predict(prefix, self.max_predictions, model=shortname_type):
-            self.debug(f"Got prediction: [{prediction}] from prefix [{prefix}] with score [{score}]")
-            virtual_file.append(prediction)
-        virtual_file.append(self.canary)
-        return self.helpers.tempfile(virtual_file, pipe=False), len(virtual_file)
+    supplementary_words = ["html", "ajax", "xml", "json", "api"]
+
+    def generate_templist(self, hint, shortname_type):
+        virtual_file = set()  # a set, to avoid duplicate predictions
+
+        for prediction, score in self.predict(hint, self.max_predictions, model=shortname_type):
+            prediction_lower = prediction.lower()
+            self.debug(f"Got prediction: [{prediction_lower}] from prefix [{hint}] with score [{score}]")
+            virtual_file.add(prediction_lower)
+        virtual_file.add(self.canary.lower())  # the canary must match the lowercased entries
+        return self.helpers.tempfile(list(virtual_file), pipe=False), len(virtual_file)

def predict(self, prefix, n=25, model="endpoint"):
predictor_name = f"{model}_predictor"
@@ -92,6 +94,7 @@ async def setup(self):
self.wordlist_extensions = await self.helpers.wordlist(wordlist_extensions)
self.ignore_redirects = self.config.get("ignore_redirects")
self.max_predictions = self.config.get("max_predictions")
+        self.find_subwords = self.config.get("find_subwords")

class MinimalWordPredictor:
def __init__(self):
@@ -116,10 +119,11 @@ def find_class(self, module, name):
return MinimalWordPredictor
return super().find_class(module, name)

-        endpoint_model = await self.helpers.download(
-            "https://raw.githubusercontent.com/blacklanternsecurity/wordpredictor/refs/heads/main/trained_models/endpoints.bin"
-        )
-        directory_model = await self.helpers.download(
-            "https://raw.githubusercontent.com/blacklanternsecurity/wordpredictor/refs/heads/main/trained_models/directories.bin"
-        )
+        self.info("Loading ffuf_shortnames prediction models, could take a while if not cached")
+        endpoint_model = await self.helpers.wordlist(
+            "https://raw.githubusercontent.com/blacklanternsecurity/wordpredictor/refs/heads/main/trained_models/endpoints.bin"
+        )
+        directory_model = await self.helpers.wordlist(
+            "https://raw.githubusercontent.com/blacklanternsecurity/wordpredictor/refs/heads/main/trained_models/directories.bin"
+        )

@@ -133,8 +137,24 @@ def find_class(self, module, name):
unpickler = CustomUnpickler(f)
self.directory_predictor = unpickler.load()

+        self.subword_list = []
+        if self.find_subwords:
+            self.debug("Acquiring ffuf_shortnames subword list")
+            subwords = await self.helpers.wordlist(
+                "https://raw.githubusercontent.com/nltk/nltk_data/refs/heads/gh-pages/packages/corpora/words.zip",
+                zip=True,
+                zip_filename="words/en",
+            )
+            with open(subwords, "r") as f:
+                subword_list_content = f.readlines()
+            self.subword_list = {word.lower().strip() for word in subword_list_content if 3 <= len(word.strip()) <= 5}
+            self.debug(f"Created subword_list with {len(self.subword_list)} words")
+            self.subword_list = self.subword_list.union(self.supplementary_words)
+            self.debug(f"Extended subword_list with supplementary words, total size: {len(self.subword_list)}")

self.per_host_collection = {}
self.shortname_to_event = {}

return True

def build_extension_list(self, event):
@@ -159,10 +179,20 @@ def find_delimiter(self, hint):
return None

async def filter_event(self, event):
if "iis-magic-url" in event.tags:
return False, "iis-magic-url URL_HINTs are not solvable by ffuf_shortnames"
if event.parent.type != "URL":
return False, "its parent event is not of type URL"
return True

+    def find_subword(self, word):
+        for i in range(len(word), 2, -1):  # try the longest prefix first, down to 3 characters
+            candidate = word[:i]
+            if candidate in self.subword_list:
+                leftover = word[i:]
+                return candidate, leftover
+        return None, word  # no dictionary prefix found; return the word unchanged

async def handle_event(self, event):
filename_hint = re.sub(r"~\d", "", event.parsed_url.path.rsplit(".", 1)[0].split("/")[-1]).lower()

@@ -256,6 +286,31 @@ async def handle_event(self, event):
context=f'{{module}} brute-forced {ext.upper()} files with detected prefix "{ffuf_prefix}" and found {{event.type}}: {{event.data}}',
)

if self.config.get("find_subwords"):
subword, suffix = self.find_subword(filename_hint)
if subword:
if "shortname-directory" in event.tags:
tempfile, tempfile_len = self.generate_templist(suffix, "directory")
async for r in self.execute_ffuf(tempfile, root_url, prefix=subword, exts=["/"]):
await self.emit_event(
r["url"],
"URL_UNVERIFIED",
parent=event,
tags=[f"status-{r['status']}"],
context=f'{{module}} brute-forced directories with detected subword "{subword}" and found {{event.type}}: {{event.data}}',
)
elif "shortname-endpoint" in event.tags:
for ext in used_extensions:
tempfile, tempfile_len = self.generate_templist(suffix, "endpoint")
async for r in self.execute_ffuf(tempfile, root_url, prefix=subword, suffix=f".{ext}"):
await self.emit_event(
r["url"],
"URL_UNVERIFIED",
parent=event,
tags=[f"status-{r['status']}"],
context=f'{{module}} brute-forced {ext.upper()} files with detected subword "{subword}" and found {{event.type}}: {{event.data}}',
)

async def finish(self):
if self.config.get("find_common_prefixes"):
per_host_collection = dict(self.per_host_collection)
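
To make the subword flow concrete: the test below seeds the hint newpro~1.asp, so filename_hint becomes "newpro". find_subword walks prefixes longest-first and splits it into ("new", "pro"); ffuf then fuzzes with prefix "new" while the endpoint model predicts completions for "pro", which is how /newproxy.aspx is discovered. A standalone sketch of the splitting step (the word set here is a tiny stand-in for the NLTK-derived subword_list):

    def find_subword(word, subword_list):
        # Try the longest prefix first, down to 3 characters.
        for i in range(len(word), 2, -1):
            candidate = word[:i]
            if candidate in subword_list:
                return candidate, word[i:]  # (dictionary prefix, leftover)
        return None, word

    subwords = {"new", "html", "json", "api"}  # stand-in for the real 3-5 letter word set
    print(find_subword("newpro", subwords))  # ('new', 'pro')
    print(find_subword("zzzz", subwords))    # (None, 'zzzz')
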
bbot/presets/web/dotnet-audit.yml (+3 -0)

@@ -17,6 +17,9 @@ config:
  modules:
    ffuf:
      extensions: asp,aspx,ashx,asmx,ascx
+      extensions_ignore_case: True
+    ffuf_shortnames:
+      find_subwords: True
    telerik:
      exploit_RAU_crypto: True
      include_subdirs: True # Run against every directory, not the default first received URL per-host
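
With the preset in place, a scan picks these options up automatically. A sketch assuming BBOT's documented Python API (the Scanner signature and preset-name resolution come from BBOT's docs, not from this diff):

    from bbot.scanner import Scanner

    # dotnet-audit pulls in the ffuf/ffuf_shortnames settings above,
    # including find_subwords=True.
    scan = Scanner("evilcorp.com", presets=["dotnet-audit"])
    for event in scan.start():
        print(event)
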
bbot/test/test_step_2/module_tests/test_module_ffuf.py (+24 -0)

@@ -45,6 +45,30 @@ def check(self, module_test, events):
assert not any(e.type == "URL_UNVERIFIED" and "11111111" in e.data for e in events)


+class TestFFUF_ignorecase(TestFFUF):
+    test_wordlist = ["11111111", "Admin", "admin", "zzzjunkword2"]
+    config_overrides = {
+        "modules": {"ffuf": {"wordlist": tempwordlist(test_wordlist), "extensions": "php", "ignore_case": True}}
+    }
+
+    async def setup_before_prep(self, module_test):
+        expect_args = {"method": "GET", "uri": "/admin"}
+        respond_args = {"response_data": "alive admin page"}
+        module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)
+
+        expect_args = {"method": "GET", "uri": "/Admin"}
+        respond_args = {"response_data": "alive admin page"}
+        module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)
+
+        expect_args = {"method": "GET", "uri": "/"}
+        respond_args = {"response_data": "alive"}
+        module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)
+
+    def check(self, module_test, events):
+        assert any(e.type == "URL_UNVERIFIED" and "admin" in e.data for e in events)
+        assert not any(e.type == "URL_UNVERIFIED" and "Admin" in e.data for e in events)


class TestFFUFHeaders(TestFFUF):
test_wordlist = ["11111111", "console", "junkword1", "zzzjunkword2"]
config_overrides = {
Expand Down
bbot/test/test_step_2/module_tests/test_module_ffuf_shortnames.py (+19 -0)

@@ -8,6 +8,7 @@ class TestFFUFShortnames(ModuleTestBase):
"modules": {
"ffuf_shortnames": {
"find_common_prefixes": True,
"find_subwords": True,
"wordlist": tempwordlist(test_wordlist),
}
}
@@ -142,6 +143,16 @@ async def setup_after_prep(self, module_test):
tags=["shortname-endpoint"],
)
)

+        seed_events.append(
+            module_test.scan.make_event(
+                "http://127.0.0.1:8888/newpro~1.asp",
+                "URL_HINT",
+                parent_event,
+                module="iis_shortnames",
+                tags=["shortname-endpoint"],
+            )
+        )
module_test.scan.target.seeds.events = set(seed_events)

expect_args = {"method": "GET", "uri": "/administrator.aspx"}
@@ -172,6 +183,10 @@ async def setup_after_prep(self, module_test):
respond_args = {"response_data": "alive"}
module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)

expect_args = {"method": "GET", "uri": "/newproxy.aspx"}
respond_args = {"response_data": "alive"}
module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args)

def check(self, module_test, events):
basic_detection = False
directory_detection = False
@@ -180,6 +195,7 @@ def check(self, module_test, events):
directory_delimiter_detection = False
prefix_delimiter_detection = False
short_extensions_detection = False
+        subword_detection = False

for e in events:
if e.type == "URL_UNVERIFIED":
@@ -197,6 +213,8 @@ def check(self, module_test, events):
prefix_delimiter_detection = True
if e.data == "http://127.0.0.1:8888/short.pl":
short_extensions_detection = True
if e.data == "http://127.0.0.1:8888/newproxy.aspx":
subword_detection = True

assert basic_detection
assert directory_detection
@@ -205,3 +223,4 @@ def check(self, module_test, events):
assert directory_delimiter_detection
assert prefix_delimiter_detection
assert short_extensions_detection
+        assert subword_detection