Description
Right now, each engine provides a list of strings indicating which observable types it supports. That list should only ever contain values from a fixed set (e.g. "URL", "Email", "FQDN"); any other string is at best ignored and at worst raises an error.
When observables are processed, those same string literals are repeated throughout the code and compared against that list. This means a great deal of repetition, and a single typo can lead to errors.
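To make the failure mode concrete, here is a minimal sketch of a string-based check. The supported_types list and the is_supported helper are hypothetical stand-ins, not the project's actual code; the point is only that a misspelled string is a perfectly valid string, so nothing complains.

```python
# Hypothetical engine declaration in the current, string-based style.
supported_types = ["URL", "FQDN", "IPv4"]


def is_supported(observable_type: str) -> bool:
    """Plain string comparison: any typo simply fails to match."""
    return observable_type in supported_types


print(is_supported("URL"))  # True
print(is_supported("Url"))  # False: wrong case, silently treated as unsupported
```

Neither the interpreter nor a type checker can tell that "Url" was meant to be "URL", so the observable is skipped without any warning.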
Instead, we could use an enum Flag listing the supported observable types and use it for simpler comparisons. A typo would then be flagged by the IDE as an invalid member.
Consider the code below as an example:
    # base_engine.py
    from enum import Flag, auto

    class ObservableType(Flag):
        CHROME_EXTENSION = auto()
        EMAIL = auto()
        FQDN = auto()
        IPv4 = auto()
        IPv6 = auto()
        MD5 = auto()
        SHA1 = auto()
        SHA256 = auto()
        URL = auto()

Since ObservableType is an enum Flag, multiple members can be combined into a single value with the | operator, and Python's in operator checks whether a given type is part of that combination.
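The combination and membership behavior can be sketched as follows. This uses a trimmed copy of the enum so the snippet runs on its own; the supported and wanted variables are illustrative names only.

```python
from enum import Flag, auto


class ObservableType(Flag):
    # Trimmed copy of the proposed enum, for illustration only.
    EMAIL = auto()
    FQDN = auto()
    IPv4 = auto()
    URL = auto()


# An engine's supported set is just members OR-ed together.
supported = ObservableType.FQDN | ObservableType.IPv4 | ObservableType.URL

# `in` is a containment test: every bit on the left must be set on the right.
print(ObservableType.URL in supported)    # True
print(ObservableType.EMAIL in supported)  # False

# A combined value is only "in" the set if all of its members are.
wanted = ObservableType.URL | ObservableType.EMAIL
print(wanted in supported)                # False

# `&` answers "is there any overlap?": the result is falsy when no bits are shared.
print(bool(wanted & supported))           # True
```

A misspelled member such as ObservableType.URI raises AttributeError as soon as it is evaluated, and editors and type checkers can flag it before the code runs.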
The BaseEngine.supported_types would now return an ObservableType object:
    from abc import ABC, abstractmethod

    class BaseEngine(ABC):
        ...

        @property
        @abstractmethod
        def supported_types(self) -> ObservableType:
            """Observable types this engine supports.

            e.g., ObservableType.IPv4 | ObservableType.URL
            """
            pass

Use the AlienVault engine as a concrete example:
    class AlienVaultEngine(BaseEngine):
        ...

        @property
        def supported_types(self):
            return (
                ObservableType.FQDN
                | ObservableType.IPv4
                | ObservableType.IPv6
                | ObservableType.MD5
                | ObservableType.SHA1
                | ObservableType.SHA256
                | ObservableType.URL
            )

Now, we can convert the observable dict to a modeled dataclass, easily passed around and validated:
    from dataclasses import dataclass

    @dataclass(slots=True)
    class Observable:
        type: ObservableType
        value: str

        def __hash__(self) -> int:
            """Set membership requires the object to be hashable."""
            return hash(self.value)

Observables would then be identified and extracted along these lines:
    # utils/utils.py
    import re

    def identify_observable_type(observable_input: str) -> ObservableType | str:
        """Test the observable against a set of patterns to identify its type."""
        patterns: dict[ObservableType, str] = {
            ObservableType.IPv4: r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$",
            ObservableType.IPv6: r"^([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$",
            ObservableType.MD5: r"^[a-fA-F0-9]{32}$",
            ObservableType.SHA1: r"^[a-fA-F0-9]{40}$",
            ObservableType.SHA256: r"^[a-fA-F0-9]{64}$",
            ObservableType.EMAIL: r"^[\w\.-]+@[\w\.-]+\.\w+$",
            ObservableType.FQDN: r"^(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}$",
            ObservableType.URL: r"^(https?|ftp)://[^\s/$.?#].[^\s]*$",
            ObservableType.CHROME_EXTENSION: r"^[a-z]{32}$",
        }
        # The first matching pattern wins, so dict order matters.
        for observable_type, pattern in patterns.items():
            if re.match(pattern, observable_input):
                return observable_type
        # Raise an error here, instead? Or add another ObservableType.UNKNOWN?
        return "Unknown"

    def extract_observables(text: str) -> set[Observable]:
        """Extract observables from text, focusing on full URLs with http or https."""
        patterns: dict[ObservableType, str] = {
            ObservableType.IPv4: r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b",
            ObservableType.MD5: r"\b[a-fA-F0-9]{32}\b",
            ObservableType.SHA1: r"\b[a-fA-F0-9]{40}\b",
            ObservableType.SHA256: r"\b[a-fA-F0-9]{64}\b",
            ObservableType.EMAIL: r"\b[\w\.-]+@[\w\.-]+\.\w+\b",
            # Simplified URL pattern for http(s) only
            # "URL": r"\bhttps?://[^\s/$.?#].[^\s]*\b",
            ObservableType.URL: r"\bhttps?://[^\s/$.?#].[^\s<>\"'\?,;\]\[\}\{]*",
            ObservableType.FQDN: r"\b(?:[a-zA-Z0-9-]+\.)+[a-zA-Z]{2,}\b",
            ObservableType.CHROME_EXTENSION: r"\b[a-z]{32}\b",
        }
        results: set[Observable] = set()
        seen: set[str] = set()

        # Extract URLs first to prevent FQDN overlap
        url_matches: list[str] = re.findall(patterns[ObservableType.URL], text)

        # Extract other types of observables from remaining text
        for observable_type, pattern in patterns.items():
            matches: list[str] = re.findall(pattern, text)
            for match in matches:
                # Skip FQDNs that already appear inside an extracted URL
                if observable_type is ObservableType.FQDN and any(
                    match in url for url in url_matches
                ):
                    continue
                if match not in seen:
                    seen.add(match)
                    results.add(Observable(value=match, type=observable_type))

        # Trimmed IPv6 regex logic here, for brevity...

        # Add IPv6 at the end (ipv6_addresses comes from the trimmed logic above)
        for ipv6 in ipv6_addresses:
            if ipv6 not in seen:
                seen.add(ipv6)
                results.add(Observable(value=ipv6, type=ObservableType.IPv6))

        # Filter invalid TLDs using tldextract and the list of invalid TLDs
        filtered_results: set[Observable] = set()
        for result in results:
            if result.type is ObservableType.FQDN:
                tld = result.value.split(".")[-1]
                extracted = tldextract.extract(result.value)
                if tld in INVALID_TLD or not extracted.suffix:
                    continue
            filtered_results.add(result)
        return filtered_results

Lastly, if you want to see if an engine supports a type, the check is simple:
    if observable.type in engine.supported_types:
        engine.analyze(observable)
    ...

This is the framework of the idea. I'm sure there are some details that would need hammering out, but I think the benefits would be significant. Not only would we reduce the potential for errors with the observable types, but the observable itself would also be modeled, making it easier to work with and troubleshoot.
Thoughts?