Skip to content

[FR] Refactor to more seamlessly support multiple DAC approaches #3407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a1e9484
[FR] Add custom rule directory support
brokensound77 Jan 28, 2024
89c3220
[FR] Add support for configurable tests and validation
brokensound77 Jan 30, 2024
f319f78
[FR] Add support to decouple actions and exceeptions
brokensound77 Feb 21, 2024
1c327bd
Merge pull request #5 from brokensound77/decoupled-exceptions-and-act…
brokensound77 Mar 13, 2024
835b88f
Merge pull request #4 from brokensound77/add-configurable-testing-and…
brokensound77 Mar 13, 2024
86e44d3
Merge remote-tracking branch 'upstream/main' into custom-rule-dir
brokensound77 Mar 13, 2024
f981078
resolve conflicts
brokensound77 Mar 13, 2024
d550b06
Merge branch 'main' into custom-rule-dir
brokensound77 Apr 21, 2024
59545a1
update actions schema
brokensound77 Apr 21, 2024
eb497b9
move config components to config.py
brokensound77 Apr 21, 2024
7a61936
update docstrings and comments
brokensound77 Apr 22, 2024
f36d874
break out function to staticmethod for formatting tests
brokensound77 Apr 22, 2024
c1a263c
rename rule_loader default global constants
brokensound77 Apr 22, 2024
021087b
add custom-rules init-config command
brokensound77 Apr 22, 2024
9618f82
update docs
brokensound77 Apr 22, 2024
b230ff5
add rule_dirs to config
brokensound77 Apr 22, 2024
3724008
init files with empty dict
brokensound77 Apr 22, 2024
a84aef1
make unit tests comprehensively exemptable
brokensound77 Apr 24, 2024
9ea9772
MOAR docstrings
brokensound77 Apr 24, 2024
760fc1b
denitting docstrings and return types
brokensound77 Apr 26, 2024
6e8eaf6
Merge remote-tracking branch 'upstream/DAC-feature' into custom-rule-dir
brokensound77 Apr 26, 2024
32ce091
correct variable names from merge
brokensound77 Apr 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ the [README](README.md). Basic use of the CLI such as [creating a rule](CONTRIBU
[testing](CONTRIBUTING.md#testing-a-rule-with-the-cli) are referenced in the [contribution guide](CONTRIBUTING.md).


## Using a config file or environment variables
## Using a user config file or environment variables

CLI commands which are tied to Kibana and Elasticsearch are capable of parsing auth-related keyword args from a config
file or environment variables.
Expand All @@ -17,9 +17,9 @@ follows:
* config values
* prompt (this only applies to certain values)

#### Setup a config file
#### Setup a user config file

In the root directory of this repo, create the file `.detection-rules-cfg.json` and add relevant values
In the root directory of this repo, create the file `.detection-rules-cfg.json` (or `.yaml`) and add relevant values

Currently supported arguments:
* elasticsearch_url
Expand All @@ -34,13 +34,6 @@ Environment variables using the argument format: `DR_<UPPERCASED_ARG_NAME>` will
EX: `DR_USER=joe`


Using the environment variable `DR_BYPASS_NOTE_VALIDATION_AND_PARSE` will bypass the Detection Rules validation on the `note` field in toml files.

Using the environment variable `DR_BYPASS_BBR_LOOKBACK_VALIDATION` will bypass the Detection Rules lookback and interval validation
on the building block rules.

Using the environment variable `DR_BYPASS_TAGS_VALIDATION` will bypass the Detection Rules Unit Tests on the `tags` field in toml files.

## Importing rules into the repo

You can import rules into the repo using the `create-rule` or `import-rules` commands. Both of these commands will
Expand Down Expand Up @@ -380,4 +373,4 @@ value = "fast"
```

The easiest way to _update_ a rule with existing transform entries is to use `guide-plugin-convert` and manually add it
to the rule.
to the rule.
21 changes: 11 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ This repository was first announced on Elastic's blog post, [Elastic Security op

Detection Rules contains more than just static rule files. This repository also contains code for unit testing in Python and integrating with the Detection Engine in Kibana.

| folder | description |
|------------------------------------------------ |------------------------------------------------------------------------------------ |
| [`detection_rules/`](detection_rules) | Python module for rule parsing, validating and packaging |
| [`etc/`](detection_rules/etc) | Miscellaneous files, such as ECS and Beats schemas |
| [`kibana/`](lib/kibana) | Python library for handling the API calls to Kibana and the Detection Engine |
| [`kql/`](lib/kql) | Python library for parsing and validating Kibana Query Language |
| [`rta/`](rta) | Red Team Automation code used to emulate attacker techniques, used for rule testing |
| [`rules/`](rules) | Root directory where rules are stored |
| [`rules_building_block/`](rules_building_block) | Root directory where building block rules are stored |
| [`tests/`](tests) | Python code for unit testing rules |
| folder | description |
|------------------------------------------------ |--------------------------------------------------------------------------------------------|
| [`detection_rules/`](detection_rules) | Python module for rule parsing, validating and packaging |
| [`docs`](docs) | Additional, more verbose documentation for the repository |
| [`etc/`](detection_rules/etc) | Miscellaneous files, such as ECS and Beats schemas |
| [`kibana/`](lib/kibana) | Python library for handling the API calls to Kibana and the Detection Engine |
| [`kql/`](lib/kql) | Python library for parsing and validating Kibana Query Language |
| [`rta/`](rta) | Red Team Automation code used to emulate attacker techniques, used for rule testing |
| [`rules/`](rules) | Root directory where rules are stored |
| [`rules_building_block/`](rules_building_block) | Root directory where building block rules are stored |
| [`tests/`](tests) | Python code for unit testing rules |


## Getting started
Expand Down
2 changes: 2 additions & 0 deletions detection_rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
assert (3, 12) <= sys.version_info < (4, 0), "Only Python 3.12+ supported"

from . import ( # noqa: E402
custom_rules,
devtools,
docs,
eswrap,
Expand All @@ -28,6 +29,7 @@
)

__all__ = (
'custom_rules',
'devtools',
'docs',
'eswrap',
Expand Down
64 changes: 64 additions & 0 deletions detection_rules/action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

"""Dataclasses for Action."""
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional

from .mixins import MarshmallowDataclassMixin
from .schemas import definitions


@dataclass(frozen=True)
class ActionMeta(MarshmallowDataclassMixin):
"""Data stored in an exception's [metadata] section of TOML."""
creation_date: definitions.Date
rule_id: List[definitions.UUIDString]
rule_name: str
updated_date: definitions.Date

# Optional fields
deprecation_date: Optional[definitions.Date]
comments: Optional[str]
maturity: Optional[definitions.Maturity]


@dataclass
class Action(MarshmallowDataclassMixin):
"""Data object for rule Action."""
@dataclass
class ActionParams:
"""Data object for rule Action params."""
body: str

action_type_id: definitions.ActionTypeId
group: str
params: ActionParams
id: Optional[str]
frequency: Optional[dict]
alerts_filter: Optional[dict]


@dataclass(frozen=True)
class TOMLActionContents(MarshmallowDataclassMixin):
"""Object for action from TOML file."""
metadata: ActionMeta
actions: List[Action]


@dataclass(frozen=True)
class TOMLAction:
"""Object for action from TOML file."""
contents: TOMLActionContents
path: Path

@property
def name(self):
return self.contents.metadata.rule_name

@property
def id(self):
return self.contents.metadata.rule_id
8 changes: 4 additions & 4 deletions detection_rules/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
from .attack import matrix, tactics, build_threat_map_entry
from .rule import TOMLRule, TOMLRuleContents
from .rule_loader import (RuleCollection,
DEFAULT_RULES_DIR,
DEFAULT_BBR_DIR,
DEFAULT_PREBUILT_RULES_DIR,
DEFAULT_PREBUILT_BBR_DIR,
dict_filter)
from .schemas import definitions
from .utils import clear_caches, get_path
Expand Down Expand Up @@ -49,7 +49,7 @@ def get_collection(*args, **kwargs):
rules.load_directories(Path(d) for d in directories)

if rule_id:
rules.load_directories((DEFAULT_RULES_DIR, DEFAULT_BBR_DIR),
rules.load_directories((DEFAULT_PREBUILT_RULES_DIR, DEFAULT_PREBUILT_BBR_DIR),
toml_filter=dict_filter(rule__rule_id=rule_id))
if len(rules) != 1:
client_error(f"Could not find rule with ID {rule_id}")
Expand Down Expand Up @@ -83,7 +83,7 @@ def get_collection(*args, **kwargs):
rules.load_directories(Path(d) for d in directories)

if rule_id:
rules.load_directories((DEFAULT_RULES_DIR, DEFAULT_BBR_DIR),
rules.load_directories((DEFAULT_PREBUILT_RULES_DIR, DEFAULT_PREBUILT_BBR_DIR),
toml_filter=dict_filter(rule__rule_id=rule_id))
found_ids = {rule.id for rule in rules}
missing = set(rule_id).difference(found_ids)
Expand Down
217 changes: 217 additions & 0 deletions detection_rules/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.

"""Configuration support for custom components."""
import fnmatch
import os
from dataclasses import dataclass
from pathlib import Path
from functools import cached_property
from typing import Dict, List, Optional

import yaml
from eql.utils import load_dump

from .misc import discover_tests
from .utils import cached, load_etc_dump, get_etc_path

ROOT_DIR = Path(__file__).parent.parent
CUSTOM_RULES_DIR = os.getenv('CUSTOM_RULES_DIR', None)


@dataclass
class UnitTest:
"""Base object for unit tests configuration."""
bypass: Optional[List[str]] = None
test_only: Optional[List[str]] = None

def __post_init__(self):
assert not (self.bypass and self.test_only), 'Cannot use both test_only and bypass'


@dataclass
class RuleValidation:
"""Base object for rule validation configuration."""
bypass: Optional[List[str]] = None
test_only: Optional[List[str]] = None

def __post_init__(self):
assert not (self.bypass and self.test_only), 'Cannot use both test_only and bypass'


@dataclass
class TestConfig:
"""Detection rules test config file"""
test_file: Optional[Path] = None
unit_tests: Optional[UnitTest] = None
rule_validation: Optional[RuleValidation] = None

@classmethod
def from_dict(cls, test_file: Optional[Path] = None, unit_tests: Optional[dict] = None,
rule_validation: Optional[dict] = None):
return cls(test_file=test_file or None, unit_tests=UnitTest(**unit_tests or {}),
rule_validation=RuleValidation(**rule_validation or {}))

@cached_property
def all_tests(self):
"""Get the list of all test names."""
return discover_tests()

def tests_by_patterns(self, *patterns: str) -> List[str]:
"""Get the list of test names by patterns."""
tests = set()
for pattern in patterns:
tests.update(list(fnmatch.filter(self.all_tests, pattern)))
return sorted(tests)

@staticmethod
def parse_out_patterns(names: List[str]) -> (List[str], List[str]):
"""Parse out test patterns from a list of test names."""
patterns = []
tests = []
for name in names:
if name.startswith('pattern:') and '*' in name:
patterns.append(name[len('pattern:'):])
else:
tests.append(name)
return patterns, tests

@staticmethod
def format_tests(tests: List[str]) -> List[str]:
"""Format unit test names into expected format for direct calling."""
raw = [t.rsplit('.', maxsplit=2) for t in tests]
formatted = []
for test in raw:
path, clazz, method = test
path = f'{path.replace(".", os.path.sep)}.py'
formatted.append('::'.join([path, clazz, method]))
return formatted

def get_test_names(self, formatted: bool = False) -> (List[str], List[str]):
"""Get the list of test names to run."""
patterns_t, tests_t = self.parse_out_patterns(self.unit_tests.test_only or [])
patterns_b, tests_b = self.parse_out_patterns(self.unit_tests.bypass or [])
defined_tests = tests_t + tests_b
patterns = patterns_t + patterns_b
unknowns = sorted(set(defined_tests) - set(self.all_tests))
assert not unknowns, f'Unrecognized test names in config ({self.test_file}): {unknowns}'

combined_tests = sorted(set(defined_tests + self.tests_by_patterns(*patterns)))

if self.unit_tests.test_only is not None:
tests = combined_tests
skipped = [t for t in self.all_tests if t not in tests]
elif self.unit_tests.bypass:
tests = []
skipped = []
for test in self.all_tests:
if test not in combined_tests:
tests.append(test)
else:
skipped.append(test)
else:
tests = self.all_tests
skipped = []

if formatted:
return self.format_tests(tests), self.format_tests(skipped)
else:
return tests, skipped

def check_skip_by_rule_id(self, rule_id: str) -> bool:
"""Check if a rule_id should be skipped."""
bypass = self.rule_validation.bypass
test_only = self.rule_validation.test_only

# neither bypass nor test_only are defined, so no rules are skipped
if not (bypass or test_only):
return False
# if defined in bypass or not defined in test_only, then skip
return (bypass and rule_id in bypass) or (test_only and rule_id not in test_only)


@dataclass
class RulesConfig:
"""Detection rules config file."""
deprecated_rules_file: Path
deprecated_rules: Dict[str, dict]
packages_file: Path
packages: Dict[str, dict]
rule_dirs: List[Path]
stack_schema_map_file: Path
stack_schema_map: Dict[str, dict]
test_config: TestConfig
version_lock_file: Path
version_lock: Dict[str, dict]

action_dir: Optional[Path] = None
exception_dir: Optional[Path] = None


@cached
def parse_rules_config(path: Optional[Path] = None) -> RulesConfig:
"""Parse the _config.yaml file for default or custom rules."""
if path:
assert path.exists(), f'rules config file does not exist: {path}'
loaded = yaml.safe_load(path.read_text())
elif CUSTOM_RULES_DIR:
path = Path(CUSTOM_RULES_DIR) / '_config.yaml'
assert path.exists(), f'_config.yaml file missing in {CUSTOM_RULES_DIR}'
loaded = yaml.safe_load(path.read_text())
else:
path = Path(get_etc_path('_config.yaml'))
loaded = load_etc_dump('_config.yaml')

assert loaded, f'No data loaded from {path}'

base_dir = path.resolve().parent

# testing
# precedence to the environment variable
# environment variable is absolute path and config file is relative to the _config.yaml file
test_config_ev = os.getenv('DETECTION_RULES_TEST_CONFIG', None)
if test_config_ev:
test_config_path = Path(test_config_ev)
else:
test_config_file = loaded.get('testing', {}).get('config')
if test_config_file:
test_config_path = base_dir.joinpath(test_config_file)
else:
test_config_path = None

if test_config_path:
test_config_data = yaml.safe_load(test_config_path.read_text())

# overwrite None with empty list to allow implicit exemption of all tests with `test_only` defined to None in
# test config
if 'unit_tests' in test_config_data and test_config_data['unit_tests'] is not None:
test_config_data['unit_tests'] = {k: v or [] for k, v in test_config_data['unit_tests'].items()}
test_config = TestConfig.from_dict(test_file=test_config_path, **test_config_data)
else:
test_config = TestConfig.from_dict()

# files
# paths are relative
files = {f'{k}_file': base_dir.joinpath(v) for k, v in loaded['files'].items()}
contents = {k: load_dump(str(base_dir.joinpath(v))) for k, v in loaded['files'].items()}
contents.update(**files)

# directories
# paths are relative
if loaded.get('directories'):
contents.update({k: base_dir.joinpath(v) for k, v in loaded['directories'].items()})

# rule_dirs
# paths are relative
contents['rule_dirs'] = [base_dir.joinpath(d) for d in loaded.get('rule_dirs', [])]

rules_config = RulesConfig(test_config=test_config, **contents)
return rules_config


@cached
def load_current_package_version() -> str:
"""Load the current package version from config file."""
return parse_rules_config().packages['package']['name']
Loading
Loading