Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new transformation for creating new fields from Hashes field #294

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions sigma/processing/transformations.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from functools import partial
from sigma.conditions import ConditionOR, SigmaCondition
from typing import (
Any,
ClassVar,
Iterable,
List,
Dict,
Literal,
Optional,
Set,
Tuple,
Union,
Pattern,
Iterator,
Expand Down Expand Up @@ -299,6 +302,153 @@ def apply_value(
"""


@dataclass
class HashesFieldsDetectionItemTransformation(DetectionItemTransformation):
"""
Transforms the 'Hashes' field in Sigma rules by creating separate detection items for each hash type.

This transformation replaces the generic 'Hashes' field with specific fields for each hash algorithm,
optionally prefixing the field names. It supports various hash formats and can auto-detect hash types
based on their length.

Attributes:
valid_hash_algos (List[str]): List of supported hash algorithms.
field_prefix (str): Prefix to add to the new field names.
drop_algo_prefix (bool): If True, omits the algorithm name from the new field name.
hash_lengths (Dict[int, str]): Mapping of hash lengths to their corresponding algorithms.

Example:
Input:
Hashes:
- 'SHA1=5F1CBC3D99558307BC1250D084FA968521482025'
- 'MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7'
Output:
FileSHA1: '5F1CBC3D99558307BC1250D084FA968521482025'
FileMD5: '987B65CD9B9F4E9A1AFD8F8B48CF64A7'
"""

valid_hash_algos: List[str]
field_prefix: str = ""
drop_algo_prefix: bool = False
hash_lengths: ClassVar[Dict[int, str]] = {32: "MD5", 40: "SHA1", 64: "SHA256", 128: "SHA512"}

def apply_detection_item(
self, detection_item: SigmaDetectionItem
) -> Optional[Union[SigmaDetection, SigmaDetectionItem]]:
"""
Applies the transformation to a single detection item.

Args:
detection_item (SigmaDetectionItem): The detection item to transform.

Returns:
Optional[Union[SigmaDetection, SigmaDetectionItem]]: A new SigmaDetection object containing
the transformed detection items, or None if no valid hashes were found.

Raises:
Exception: If no valid hash algorithms were found in the detection item.
"""
algo_dict = self._parse_hash_values(detection_item.value)

if not algo_dict:
raise Exception(
f"No valid hash algo found in Hashes field. Please use one of the following: {', '.join(self.valid_hash_algos)}"
)

return self._create_new_detection_items(algo_dict)

def _parse_hash_values(
self, values: Union[SigmaString, List[SigmaString]]
) -> Dict[str, List[str]]:
"""
Parses the hash values from the detection item.

Args:
values (Union[SigmaString, List[SigmaString]]): The hash values to parse.

Returns:
Dict[str, List[str]]: A dictionary mapping field names to lists of hash values.
"""
algo_dict = defaultdict(list)
if not isinstance(values, list):
values = [values]

for value in values:
hash_algo, hash_value = self._extract_hash_algo_and_value(value.to_plain())
if hash_algo:
field_name = self._get_field_name(hash_algo)
algo_dict[field_name].append(hash_value)

return algo_dict

def _extract_hash_algo_and_value(self, value: str) -> Tuple[str, str]:
"""
Extracts the hash algorithm and value from a string.

Args:
value (str): The string containing the hash algorithm and value.

Returns:
Tuple[str, str]: A tuple containing the hash algorithm and value.
"""
parts = value.split("|") if "|" in value else value.split("=")
if len(parts) == 2:
hash_algo, hash_value = parts
hash_algo = hash_algo.lstrip("*").upper()
else:
hash_value = parts[0]
hash_algo = self._determine_hash_algo_by_length(hash_value)

return (hash_algo, hash_value) if hash_algo in self.valid_hash_algos else ("", hash_value)

def _determine_hash_algo_by_length(self, hash_value: str) -> str:
"""
Determines the hash algorithm based on the length of the hash value.

Args:
hash_value (str): The hash value to analyze.

Returns:
str: The determined hash algorithm, or an empty string if not recognized.
"""
return self.hash_lengths.get(len(hash_value), "")

def _get_field_name(self, hash_algo: str) -> str:
"""
Generates the field name for a given hash algorithm.

Args:
hash_algo (str): The hash algorithm.

Returns:
str: The generated field name.
"""
return f"{self.field_prefix}{'' if self.drop_algo_prefix else hash_algo}"

def _create_new_detection_items(self, algo_dict: Dict[str, List[str]]) -> SigmaDetection:
"""
Creates new detection items based on the parsed hash values.

Args:
algo_dict (Dict[str, List[str]]): A dictionary mapping field names to lists of hash values.

Returns:
SigmaDetection: A new SigmaDetection object containing the created detection items.
"""
return SigmaDetection(
detection_items=[
SigmaDetectionItem(
field=k if k != "keyword" else None,
modifiers=[],
value=[SigmaString(x) for x in v],
)
for k, v in algo_dict.items()
if k
],
item_linking=ConditionOR,
)


class StringValueTransformation(ValueTransformation):
"""
Base class for transformations that operate on SigmaString values.
Expand Down Expand Up @@ -1052,6 +1202,7 @@ def apply(
"field_name_prefix_mapping": FieldPrefixMappingTransformation,
"field_name_transform": FieldFunctionTransformation,
"drop_detection_item": DropDetectionItemTransformation,
"hashes_fields": HashesFieldsDetectionItemTransformation,
"field_name_suffix": AddFieldnameSuffixTransformation,
"field_name_prefix": AddFieldnamePrefixTransformation,
"wildcard_placeholders": WildcardPlaceholderTransformation,
Expand Down
122 changes: 122 additions & 0 deletions tests/test_processing_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
ValueListPlaceholderTransformation,
QueryExpressionPlaceholderTransformation,
ReplaceStringTransformation,
HashesFieldsDetectionItemTransformation,
)
from sigma.processing.pipeline import ProcessingPipeline, ProcessingItem
from sigma.processing.conditions import (
Expand Down Expand Up @@ -1787,3 +1788,124 @@ def class_filter(c):

for cls in inspect.getmembers(transformations_module, class_filter):
assert cls[1] in classes_with_identifiers


@pytest.fixture
def hashes_transformation():
return HashesFieldsDetectionItemTransformation(
valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"],
field_prefix="File",
drop_algo_prefix=False,
)


def test_hashes_transformation_single_hash(hashes_transformation):
detection_item = SigmaDetectionItem(
"Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")]
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 1
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]


def test_hashes_transformation_multiple_hashes(hashes_transformation):
detection_item = SigmaDetectionItem(
"Hashes",
[],
[
SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"),
SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"),
],
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 2
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]
assert result.detection_items[1].field == "FileMD5"
assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")]
assert result.item_linking == ConditionOR


def test_hashes_transformation_drop_algo_prefix():
transformation = HashesFieldsDetectionItemTransformation(
valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"],
field_prefix="File",
drop_algo_prefix=True,
)
detection_item = SigmaDetectionItem(
"Hashes", [], [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")]
)
result = transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 1
assert result.detection_items[0].field == "File"
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]


def test_hashes_transformation_invalid_hash(hashes_transformation):
detection_item = SigmaDetectionItem("Hashes", [], [SigmaString("INVALID=123456")])
with pytest.raises(Exception, match="No valid hash algo found"):
hashes_transformation.apply_detection_item(detection_item)


def test_hashes_transformation_mixed_valid_invalid(hashes_transformation):
detection_item = SigmaDetectionItem(
"Hashes",
[],
[
SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"),
SigmaString("INVALID=123456"),
SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"),
],
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 2
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]
assert result.detection_items[1].field == "FileMD5"
assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")]


def test_hashes_transformation_auto_detect_hash_type(hashes_transformation):
detection_item = SigmaDetectionItem(
"Hashes",
[],
[
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025"), # SHA1
SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7"), # MD5
SigmaString("A" * 64), # SHA256
SigmaString("B" * 128), # SHA512
],
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 4
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[1].field == "FileMD5"
assert result.detection_items[2].field == "FileSHA256"
assert result.detection_items[3].field == "FileSHA512"


def test_hashes_transformation_pipe_separator(hashes_transformation):
detection_item = SigmaDetectionItem(
"Hashes", [], [SigmaString("SHA1|5F1CBC3D99558307BC1250D084FA968521482025")]
)
result = hashes_transformation.apply_detection_item(detection_item)
assert isinstance(result, SigmaDetection)
assert len(result.detection_items) == 1
assert result.detection_items[0].field == "FileSHA1"
assert result.detection_items[0].value == [
SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
]