Skip to content

Commit

Permalink
Improve code
Browse files Browse the repository at this point in the history
- Add maximum relative Shannon entropy function
- Fix early return bug in Rust core
- Add preliminary filter to JWT findings
- Add description to entropy token findings
- Add filter to entropy tokens for symbols
- Improve filter patterns for entropy tokens
  • Loading branch information
dennis-carlson-sudo committed Aug 29, 2023
1 parent 7f397e1 commit a2bcdfa
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 18 deletions.
6 changes: 6 additions & 0 deletions mystik/findings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def get_shannon_entropy(string):
return -entropy


def get_relative_shannon_entropy(string):
    """Return the Shannon entropy of ``string`` relative to its maximum.

    The result is ``get_shannon_entropy(string)`` divided by
    ``log2(k)``, where ``k`` is the number of distinct characters in
    ``string``.

    Args:
        string: The text to rate.

    Returns:
        The entropy ratio as a float, or ``0.0`` when ``string`` has fewer
        than two distinct characters (including the empty string).
        NOTE(review): presumably within [0, 1] if ``get_shannon_entropy``
        is the usual per-character entropy in bits -- confirm.
    """
    unique_characters = len(set(string))

    # log2(0) raises ValueError and log2(1) is 0, which would divide by zero.
    # A string with at most one distinct character carries no information, so
    # its relative entropy is reported as zero instead of raising.
    if unique_characters <= 1:
        return 0.0

    return get_shannon_entropy(string) / log2(unique_characters)


def get_sequence_rating(string, max_distance=1):
last_character = string[0]
sequences = 0
Expand Down
34 changes: 23 additions & 11 deletions mystik/findings/entropy-token.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,62 @@
#!/usr/bin/env python3
from regex import match as match_regex
from regex import search as search_regex

from . import SecretFinding, get_pronounceable_rating, \
get_shannon_entropy, get_sequence_rating, get_character_counts
get_shannon_entropy, get_sequence_rating, get_character_counts, \
get_relative_shannon_entropy


class EntropyToken(SecretFinding):
name = 'Entropy Token'

description = [
'TODO: Add a description for entropy tokens.'
'API tokens are a type of authentication mechanism that is used to grant access to API resources. It is a unique identifier that is generated by the API server and is used by the API client to authenticate itself during API requests. API tokens are commonly used in RESTful web services and APIs to authorize access to protected resources. The token is usually generated by the API provider when the user or client application first registers with the API server.',
'However, it is a bad idea to expose API tokens to end users because it can lead to security vulnerabilities. If an API token is exposed, it can be used by anyone to access the API and potentially perform unauthorized actions. This can be especially dangerous if the API provides access to sensitive information or functionality. Therefore, it is important to keep API tokens secure and limit their exposure to only authorized users and systems.'
]

patterns = [
r'(?i)[a-z0-9_=\.\-\+?!@#$%^&*/:]{8,}'
]

ideal_rating = 7
ideal_rating = 8

@classmethod
def should_filter_match(this, match):
capture = match.capture.decode()

# If the match is in a shared object and the capture starts with "_Z",
# it is likely an artifact and not important.
if match.file_name.lower().endswith('.so'):
if capture.startswith('_Z'):
return True

# If the match is entirely a hex value, we filter it.
if match_regex(r'(?i)^[a-f0-9]+$', capture):
if search_regex(r'(?i)^[a-f0-9]+$', capture):
return True

# If it could be a URL or path, we check it out.
if '/' in capture:
url_patterns = [
# This should catch patterns that may not specify a TLD, but DO
# specify some kind of protocol (e.g. https://, sftp://).
r'(?i)^(?:[a-z0-9]+)?://(?:[a-z0-9\-\.]+)(?:/[a-z0-9\-\+_\.%/?&=\[\]{}#]*)?$',
# specify some kind of protocol (e.g. https://, sftp:// with
# localhost, machine-01).
r'(?i)(?:[a-z0-9]+)?://(?:[a-z0-9\-\.]+)(?:/[a-z0-9\-\+_\.%/?:&=\[\]{}#]*)?',

# This should catch patterns that may not specify a protocol, but
# DO specify some kind of TLD (e.g. example.org).
r'(?i)^(?:(?:[a-z0-9]+)?://)?(?:(?:[a-z0-9\-]+\.){1,}[a-z0-9\-]+)(?:/[a-z0-9\-\+_\.%/?&=\[\]{}#]*)?$'
# DO specify some kind of TLD (e.g. example.org without
# necessarily having https://).
r'(?i)^(?:(?:[a-z0-9]+)?://)?(?:(?:[a-z0-9\-]+\.){1,}[a-z0-9\-]+)(?:/[a-z0-9\-\+_\.%/?:&=\[\]{}#]*)?$'
]

# If the match looks like a URL, we filter it.
for pattern in url_patterns:
if match_regex(pattern, capture):
match = search_regex(pattern, capture)

if match and len(match.group()) > len(capture) * 0.5:
return True

# If the match looks like a path, we exclude it.
if match_regex(r'(?i)^(?:[a-z0-9\-\+_\. =]+/?){1,}$', capture):
if search_regex(r'(?i)^/?(?:[a-z0-9\-\+_\. :$]+/?){1,}$', capture):
return True

# If the match appears to be some kind of sequence, we skip it.
Expand Down
27 changes: 26 additions & 1 deletion mystik/findings/json-web-token.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
from json.decoder import JSONDecodeError
from binascii import Error as BinError

from . import SecretFinding
from . import SecretFinding, get_pronounceable_rating, \
get_shannon_entropy, get_sequence_rating, get_character_counts, \
get_relative_shannon_entropy


class JSONWebToken(SecretFinding):
Expand All @@ -20,6 +22,29 @@ class JSONWebToken(SecretFinding):

ideal_rating = 6

@classmethod
def should_filter_match(this, match):
    """Decide whether a candidate JWT match should be discarded.

    Args:
        match: The match object under inspection. This method reads
            ``match.capture`` (bytes of the whole capture) and
            ``match.groups`` (bytes of the header and data segments at
            indices 0 and 1).

    Returns:
        ``True`` when the match should be filtered out (it looks like a
        character sequence, or its header/data segments do not decode as
        expected), ``False`` when it should be kept.
    """
    capture = match.capture.decode()

    # If the match appears to be some kind of sequence, we skip it.
    if get_sequence_rating(capture) > 0.5:
        return True

    def decode_segment(segment):
        # JWT segments use the URL-safe base64 alphabet ('-' and '_'), which
        # the standard decoder silently discards, corrupting the output. Map
        # them onto the standard alphabet first. JWTs also omit padding, so
        # '==' is appended; surplus padding is ignored by the decoder.
        normalized = segment.replace(b'-', b'+').replace(b'_', b'/')
        return from_json(standard_b64decode(normalized + b'==').decode())

    # We try to decode the header section first.
    try:
        header = decode_segment(match.groups[0])
    except Exception:
        return True

    # The header must be a JSON object; anything else (number, list, string)
    # is not a token header and would break the membership test below.
    if not isinstance(header, dict):
        return True

    # We try to decode the data section next.
    try:
        decode_segment(match.groups[1])
    except Exception:
        # NOTE(review): presumably a header declaring 'enc' marks an
        # encrypted token whose second segment is not JSON, so an
        # undecodable data section is tolerated in that case -- confirm.
        if 'enc' not in header:
            return True

    return False

@classmethod
def get_indicators(this, context, capture, capture_start, capture_end, groups): # noqa: C901,E261
indicators = super().get_indicators(context, capture, capture_start, capture_end, groups)
Expand Down
20 changes: 14 additions & 6 deletions mystik_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ fn recursive_regex_search(py: Python, path: &str, patterns: Vec<(String, String,
let file_open_result = File::open(&path);

if file_open_result.is_err() {
let _ = error_sender.lock().unwrap().send(PyErr::new::<PyIOError, _>(format!("Failed to open file: {}", path.display())));
error_sender.lock().unwrap().send(
PyErr::new::<PyIOError, _>(format!("Failed to open file: {}", path.display()))
).unwrap();
return;
}

Expand All @@ -138,7 +140,9 @@ fn recursive_regex_search(py: Python, path: &str, patterns: Vec<(String, String,
let file_metadata_result = file.metadata();

if file_metadata_result.is_err() {
let _ = error_sender.lock().unwrap().send(PyErr::new::<PyIOError, _>(format!("Failed to get file metadata: {}", path.display())));
error_sender.lock().unwrap().send(
PyErr::new::<PyIOError, _>(format!("Failed to get file metadata: {}", path.display()))
).unwrap();
return;
}

Expand All @@ -153,7 +157,9 @@ fn recursive_regex_search(py: Python, path: &str, patterns: Vec<(String, String,
let mut contents = Vec::new();

if file.read_to_end(&mut contents).is_err() {
let _ = error_sender.lock().unwrap().send(PyErr::new::<PyIOError, _>(format!("Failed to read the file: {}", path.display())));
error_sender.lock().unwrap().send(
PyErr::new::<PyIOError, _>(format!("Failed to read the file: {}", path.display()))
).unwrap();
return;
}

Expand Down Expand Up @@ -218,8 +224,10 @@ fn recursive_regex_search(py: Python, path: &str, patterns: Vec<(String, String,
});

if filter_result.is_err() {
let _ = error_sender.lock().unwrap().send(PyErr::new::<PyRuntimeError, _>(format!("Failed to filter the finding: {}", pattern_tag.to_string())));
return;
error_sender.lock().unwrap().send(
PyErr::new::<PyRuntimeError, _>(format!("Failed to filter the finding: {}", pattern_tag.to_string()))
).unwrap();
continue;
}

// Technically, this conversion can fail. However, if done
Expand All @@ -228,7 +236,7 @@ fn recursive_regex_search(py: Python, path: &str, patterns: Vec<(String, String,
let is_filtered: bool = filter_result.unwrap();

if is_filtered {
return;
continue;
}
}

Expand Down

0 comments on commit a2bcdfa

Please sign in to comment.