Skip to content

Commit

Permalink
Allow force unit tests (#59)
Browse files Browse the repository at this point in the history
* allow force unit tests

* tweaked phrasing

* add readme stuff

* tweak some formattin
  • Loading branch information
nhakmiller authored Dec 15, 2020
1 parent 2ea4485 commit 5e8c009
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 25 deletions.
61 changes: 50 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ positional arguments:

optional arguments:
-h, --help show this help message and exit
--version show program's version number and exit```
--version show program's version number and exit
```
Run tests:
Expand Down Expand Up @@ -136,11 +137,11 @@ AWS.CloudTrail.MFAEnabled
}
```
The `test`, `zip`, and `upload` commands all supporting filtering. Filtering works by passing the `--filter` argument with a list of filters specified in the format `KEY=VALUE1,VALUE2`. The keys can be any valid field in a policy or rule. When using a filter, only anaylsis that matches each filter specified will be considered. For example, the following command will test only items with the AnalysisType of policy AND the severity of High:
The `test`, `zip`, and `upload` commands all support filtering. Filtering works by passing the `--filter` argument with a list of filters specified in the format `KEY=VALUE1,VALUE2`. The keys can be any valid field in a policy or rule. When using a filter, only anaylsis that matches each filter specified will be considered. For example, the following command will test only items with the AnalysisType of policy AND the severity of High:
```
panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy Severity=High
[INFO]: Testing analysis packs in tests/fixtures/valid\_policies
panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy Severity=High
[INFO]: Testing analysis packs in tests/fixtures/valid_policies
AWS.IAM.BetaTest
[PASS] Root MFA not enabled fails compliance
Expand All @@ -150,8 +151,8 @@ AWS.IAM.BetaTest
Whereas the following command will test items with the AnalysisType policy OR rule, AND the severity High:
```
panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,rule Severity=High
[INFO]: Testing analysis packs in tests/fixtures/valid\_policies
panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,rule Severity=High
[INFO]: Testing analysis packs in tests/fixtures/valid_policies
AWS.IAM.BetaTest
[PASS] Root MFA not enabled fails compliance
Expand All @@ -165,13 +166,13 @@ AWS.CloudTrail.MFAEnabled
When writing policies or rules that refer to the `global` analysis types, be sure to include them in your filter. You can include an empty string as a value in a filter, and it will mean the filter is only applied if the field exists. The following command will return an error, because the policy in question imports a global but the global does not have a severity so it is excluded by the filter:
```
panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,global Severity=Critical
[INFO]: Testing analysis packs in tests/fixtures/valid\_policies
panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global Severity=Critical
[INFO]: Testing analysis packs in tests/fixtures/valid_policies
AWS.IAM.MFAEnabled
[ERROR] Error loading module, skipping
Invalid: tests/fixtures/valid\_policies/example\_policy.yml
Invalid: tests/fixtures/valid_policies/example_policy.yml
No module named 'panther'
[ERROR]: [('tests/fixtures/valid_policies/example_policy.yml', ModuleNotFoundError("No module named 'panther'"))]
Expand All @@ -180,8 +181,8 @@ Invalid: tests/fixtures/valid\_policies/example\_policy.yml
If you want this query to work, you need to allow for the severity field to be absent like this:
```
panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,global Severity=Critical,""
[INFO]: Testing analysis packs in tests/fixtures/valid\_policies
panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global Severity=Critical,""
[INFO]: Testing analysis packs in tests/fixtures/valid_policies
AWS.IAM.MFAEnabled
[PASS] Root MFA not enabled fails compliance
Expand All @@ -190,6 +191,44 @@ AWS.IAM.MFAEnabled
Filters work for the `zip` and `upload` commands in the exact same way they work for the `test` command.
In addition to filtering, you can set a minimum number of unit tests with the `--minimum-tests` flag. Detections that don't have the minimum number of tests will be considered failing, and if `--minimum-tests` is set to 2 or greater it will also enforce that at least one test must return True and one must return False.

```
panther_analysis_tool test --path tests/fixtures/valid_policies --minimum-tests 2
% panther_analysis_tool test --path okta_rules --minimum-tests 2
[INFO]: Testing analysis packs in okta_rules
Okta.AdminRoleAssigned
[PASS] Admin Access Assigned
Okta.BruteForceLogins
[PASS] Failed login
Okta.GeographicallyImprobableAccess
[PASS] Non Login
[PASS] Failed Login
--------------------------
Panther CLI Test Summary
Path: okta_rules
Passed: 0
Failed: 3
Invalid: 0
--------------------------
Failed Tests Summary
Okta.AdminRoleAssigned
['Insufficient test coverage, 2 tests required but only 1 found.', 'Insufficient test coverage: expected at least one passing and one failing test.']
Okta.BruteForceLogins
['Insufficient test coverage, 2 tests required but only 1 found.', 'Insufficient test coverage: expected at least one passing and one failing test.']
Okta.GeographicallyImprobableAccess
['Insufficient test coverage: expected at least one passing and one failing test.']
```

So in this case even though the rules passed all their tests, they're still considered failing because they do not have the correct test coverage.
## Writing Policies
Each Panther Policy consists of a Python body and a YAML or JSON specification file.
Expand Down
52 changes: 47 additions & 5 deletions panther_analysis_tool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ def test_analysis(args: argparse.Namespace) -> Tuple[int, list]:

# then, import rules and policies; run tests
failed_tests, invalid_detection = setup_run_tests(log_type_to_data_model,
analysis)
analysis,
args.minimum_tests)
invalid_specs.extend(invalid_detection)

print_summary(args.path, len(analysis), failed_tests, invalid_specs)
Expand Down Expand Up @@ -388,8 +389,8 @@ def setup_data_models(


def setup_run_tests(
log_type_to_data_model: Dict[str, DataModel],
analysis: List[Any]) -> Tuple[DefaultDict[str, List[Any]], List[Any]]:
log_type_to_data_model: Dict[str, DataModel], analysis: List[Any],
minimum_tests: int) -> Tuple[DefaultDict[str, List[Any]], List[Any]]:
invalid_specs = []
failed_tests: DefaultDict[str, list] = defaultdict(list)
for analysis_spec_filename, dir_name, analysis_spec in analysis:
Expand All @@ -415,7 +416,8 @@ def setup_run_tests(
analysis_funcs['title'] = module.title

failed_tests = run_tests(analysis_spec, analysis_funcs,
log_type_to_data_model, failed_tests)
log_type_to_data_model, failed_tests,
minimum_tests)
print('')
return failed_tests, invalid_specs

Expand Down Expand Up @@ -556,7 +558,13 @@ def handle_wrong_key_error(err: SchemaWrongKeyError, keys: list) -> Exception:

def run_tests(analysis: Dict[str, Any], analysis_funcs: Dict[str, Any],
analysis_data_models: Dict[str, DataModel],
failed_tests: DefaultDict[str, list]) -> DefaultDict[str, list]:
failed_tests: DefaultDict[str, list],
minimum_tests: int) -> DefaultDict[str, list]:

if len(analysis.get('Tests', [])) < minimum_tests:
failed_tests[analysis.get('PolicyID') or analysis['RuleID']].append(
'Insufficient test coverage: {} tests required but only {} found'.
format(minimum_tests, len(analysis.get('Tests', []))))

# First check if any tests exist, so we can print a helpful message if not
if 'Tests' not in analysis:
Expand Down Expand Up @@ -611,6 +619,13 @@ def run_tests(analysis: Dict[str, Any], analysis_funcs: Dict[str, Any],
print('\t\t[{}] [{}] {}'.format(
test_result[func], func, analysis_funcs[func](test_case)))

if minimum_tests > 1 and not (
[x for x in analysis['Tests'] if x['ExpectedResult']] and
[x for x in analysis['Tests'] if not x['ExpectedResult']]):
failed_tests[analysis.get('PolicyID') or analysis['RuleID']].append(
'Insufficient test coverage: expected at least one positive and one negative test'
)

return failed_tests


Expand All @@ -637,6 +652,15 @@ def setup_parser() -> argparse.ArgumentParser:
required=False,
metavar="KEY=VALUE",
nargs='+')
test_parser.add_argument(
'--minimum-tests',
default='0',
type=int,
help=
'The minimum number of tests in order for a detection to be considered passing. If a number'
+
'greater than 1 is specified, at least one True and one False test is required.',
required=False)
test_parser.add_argument('--debug', action='store_true', dest='debug')
test_parser.set_defaults(func=test_analysis)

Expand All @@ -661,6 +685,15 @@ def setup_parser() -> argparse.ArgumentParser:
required=False,
metavar="KEY=VALUE",
nargs='+')
zip_parser.add_argument(
'--minimum-tests',
default='0',
type=int,
help=
'The minimum number of tests in order for a detection to be considered passing. If a number'
+
'greater than 1 is specified, at least one True and one False test is required.',
required=False)
zip_parser.add_argument('--debug', action='store_true', dest='debug')
zip_parser.set_defaults(func=zip_analysis)

Expand All @@ -686,6 +719,15 @@ def setup_parser() -> argparse.ArgumentParser:
help=
'The location to store a local copy of the packaged policies and rules.',
required=False)
upload_parser.add_argument(
'--minimum-tests',
default='0',
type=int,
help=
'The minimum number of tests in order for a detection to be considered passing. If a number'
+
'greater than 1 is specified, at least one True and one False test is required.',
required=False)
upload_parser.add_argument('--filter',
required=False,
metavar="KEY=VALUE",
Expand Down
12 changes: 6 additions & 6 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
# functional dependencies
boto3==1.16.20
boto3==1.16.35
jsonpath-ng==1.5.2
ruamel.yaml==0.16.12
schema==0.7.2
# ci dependencies
bandit==1.6.2
bandit==1.7.0
mypy==0.790
pylint==2.6.0
pyfakefs==4.2.1
pyfakefs==4.3.2
yapf==0.30.0
nose==1.3.7
PyYAML==5.3.1
## The following requirements were added by pip freeze:
astroid==2.4.2
botocore==1.19.20
botocore==1.19.35
contextlib2==0.6.0.post1
decorator==4.4.2
gitdb==4.0.5
GitPython==3.1.11
importlib-metadata==2.0.0
importlib-metadata==3.3.0
isort==5.6.4
jmespath==0.10.0
lazy-object-proxy==1.4.3
Expand All @@ -31,7 +31,7 @@ ruamel.yaml.clib==0.2.2
s3transfer==0.3.3
six==1.15.0
smmap==3.0.4
stevedore==3.2.2
stevedore==3.3.0
toml==0.10.2
typed-ast==1.4.1
typing-extensions==3.7.4.3
Expand Down
14 changes: 14 additions & 0 deletions tests/fixtures/example_policy_required_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import panther
IGNORED_USERS = {}


def policy(resource):
if resource['UserName'] in IGNORED_USERS:
return False

cred_report = resource.get('CredentialReport', {})
if not cred_report:
return True

return cred_report.get('PasswordEnabled', False) and cred_report.get(
'MfaActive', False)
33 changes: 33 additions & 0 deletions tests/fixtures/example_policy_required_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
AnalysisType: policy
Filename: example_policy_required_tests.py
DisplayName: MFA Is Enabled For User
Description: MFA is a security best practice that adds an extra layer of protection for your AWS account logins.
Severity: High
PolicyID: IAM.MFAEnabled.Required.Tests
Enabled: true
ResourceTypes:
- AWS.IAM.RootUser.Snapshot
- AWS.IAM.User.Snapshot
Tags:
- AWS Managed Rules - Security, Identity & Compliance
- AWS
- CIS
- SOC2
Runbook: >
Find out who disabled MFA on the account.
Reference: https://www.link-to-info.io
Suppressions:
- aws:resource:1
- aws:.*:other-resource
Tests:
-
Name: Root MFA not enabled triggers a violation.
ExpectedResult: false
ResourceType: AWS.IAM.User.Snapshot (extraneous field)
Resource:
Arn: arn:aws:iam::123456789012:user/root
CreateDate: 2019-01-01T00:00:00Z
CredentialReport:
MfaActive: false
PasswordEnabled: true
UserName: root
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import panther
IGNORED_USERS = {}


def policy(resource):
if resource['UserName'] in IGNORED_USERS:
return False

cred_report = resource.get('CredentialReport', {})
if not cred_report:
return True

return cred_report.get('PasswordEnabled', False) and cred_report.get(
'MfaActive', False)
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
AnalysisType: policy
Filename: example_policy.py
Filename: example_policy_extraneous_fields.py
DisplayName: MFA Is Enabled For User
Description: MFA is a security best practice that adds an extra layer of protection for your AWS account logins.
Severity: High
Expand Down Expand Up @@ -30,4 +30,4 @@ Tests:
CredentialReport:
MfaActive: false
PasswordEnabled: true
UserName: root
UserName: root
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from panther import test_helper # pylint: disable=import-error

IGNORED_USERS = {}


def rule(event):
if event['UserName'] in IGNORED_USERS:
return False

cred_report = event.get('CredentialReport', {})
if not cred_report:
return True

return (test_helper() and
cred_report.get('PasswordEnabled', False) and
cred_report.get('MfaActive', False))

def dedup(event):
return event['UserName']

def title(event):
return '{} does not have MFA enabled'.format(event['UserName'])
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
AnalysisType: rule
Filename: example_rule.py
Filename: example_rule_extraneous_fields.py
DisplayName: MFA Rule
Description: MFA is a security best practice that adds an extra layer of protection for your AWS account logins.
Severity: High
Expand Down
21 changes: 21 additions & 0 deletions tests/unit/panther_analysis_tool/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,27 @@ def test_with_tag_filters(self):
assert_equal(return_code, 0)
assert_equal(len(invalid_specs), 0)

def test_with_minimum_tests(self):
args = pat.setup_parser().parse_args('test --path tests/fixtures/valid_analysis --minimum-tests 1'.split())
return_code, invalid_specs = pat.test_analysis(args)
assert_equal(return_code, 0)
assert_equal(len(invalid_specs), 0)

def test_with_minimum_tests_failing(self):
args = pat.setup_parser().parse_args('test --path tests/fixtures/valid_analysis --minimum-tests 2'.split())
return_code, invalid_specs = pat.test_analysis(args)
# Failing, because some of the fixtures only have one test case
assert_equal(return_code, 1)
assert_equal(len(invalid_specs), 0)

def test_with_minimum_tests_no_passing(self):
args = pat.setup_parser().parse_args('test --path tests/fixtures --filter PolicyID=IAM.MFAEnabled.Required.Tests --minimum-tests 2'.split())
args.filter = pat.parse_filter(args.filter)
return_code, invalid_specs = pat.test_analysis(args)
# Failing, because while there are two unit tests they both have expected result False
assert_equal(return_code, 1)
assert_equal(len(invalid_specs), 4)

def test_zip_analysis(self):
# Note: This is a workaround for CI
try:
Expand Down

0 comments on commit 5e8c009

Please sign in to comment.