From 5e8c009de374ea1daff17f944ea0f10ec4206590 Mon Sep 17 00:00:00 2001 From: Nick <49166439+nhakmiller@users.noreply.github.com> Date: Tue, 15 Dec 2020 13:53:42 -0800 Subject: [PATCH] Allow force unit tests (#59) * allow force unit tests * tweaked phrasing * add readme stuff * tweak some formattin --- README.md | 61 +++++++++++++++---- panther_analysis_tool/main.py | 52 ++++++++++++++-- requirements.txt | 12 ++-- .../fixtures/example_policy_required_tests.py | 14 +++++ .../example_policy_required_tests.yml | 33 ++++++++++ .../example_policy_extraneous_fields.py | 14 +++++ .../example_policy_extraneous_fields.yml | 4 +- .../rules/example_rule_extraneous_fields.py | 22 +++++++ .../rules/example_rule_extraneous_fields.yml | 2 +- tests/unit/panther_analysis_tool/test_main.py | 21 +++++++ 10 files changed, 210 insertions(+), 25 deletions(-) create mode 100644 tests/fixtures/example_policy_required_tests.py create mode 100644 tests/fixtures/example_policy_required_tests.yml create mode 100644 tests/fixtures/valid_analysis/policies/example_policy_extraneous_fields.py create mode 100644 tests/fixtures/valid_analysis/rules/example_rule_extraneous_fields.py diff --git a/README.md b/README.md index cacac1d0..78d102d8 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,8 @@ positional arguments: optional arguments: -h, --help show this help message and exit - --version show program's version number and exit``` + --version show program's version number and exit +``` Run tests: @@ -136,11 +137,11 @@ AWS.CloudTrail.MFAEnabled } ``` -The `test`, `zip`, and `upload` commands all supporting filtering. Filtering works by passing the `--filter` argument with a list of filters specified in the format `KEY=VALUE1,VALUE2`. The keys can be any valid field in a policy or rule. When using a filter, only anaylsis that matches each filter specified will be considered. For example, the following command will test only items with the AnalysisType of policy AND the severity of High: +The `test`, `zip`, and `upload` commands all support filtering. Filtering works by passing the `--filter` argument with a list of filters specified in the format `KEY=VALUE1,VALUE2`. The keys can be any valid field in a policy or rule. When using a filter, only anaylsis that matches each filter specified will be considered. For example, the following command will test only items with the AnalysisType of policy AND the severity of High: ``` -panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy Severity=High -[INFO]: Testing analysis packs in tests/fixtures/valid\_policies +panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy Severity=High +[INFO]: Testing analysis packs in tests/fixtures/valid_policies AWS.IAM.BetaTest [PASS] Root MFA not enabled fails compliance @@ -150,8 +151,8 @@ AWS.IAM.BetaTest Whereas the following command will test items with the AnalysisType policy OR rule, AND the severity High: ``` -panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,rule Severity=High -[INFO]: Testing analysis packs in tests/fixtures/valid\_policies +panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,rule Severity=High +[INFO]: Testing analysis packs in tests/fixtures/valid_policies AWS.IAM.BetaTest [PASS] Root MFA not enabled fails compliance @@ -165,13 +166,13 @@ AWS.CloudTrail.MFAEnabled When writing policies or rules that refer to the `global` analysis types, be sure to include them in your filter. You can include an empty string as a value in a filter, and it will mean the filter is only applied if the field exists. The following command will return an error, because the policy in question imports a global but the global does not have a severity so it is excluded by the filter: ``` -panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,global Severity=Critical -[INFO]: Testing analysis packs in tests/fixtures/valid\_policies +panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global Severity=Critical +[INFO]: Testing analysis packs in tests/fixtures/valid_policies AWS.IAM.MFAEnabled [ERROR] Error loading module, skipping -Invalid: tests/fixtures/valid\_policies/example\_policy.yml +Invalid: tests/fixtures/valid_policies/example_policy.yml No module named 'panther' [ERROR]: [('tests/fixtures/valid_policies/example_policy.yml', ModuleNotFoundError("No module named 'panther'"))] @@ -180,8 +181,8 @@ Invalid: tests/fixtures/valid\_policies/example\_policy.yml If you want this query to work, you need to allow for the severity field to be absent like this: ``` -panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,global Severity=Critical,"" -[INFO]: Testing analysis packs in tests/fixtures/valid\_policies +panther_analysis_tool test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global Severity=Critical,"" +[INFO]: Testing analysis packs in tests/fixtures/valid_policies AWS.IAM.MFAEnabled [PASS] Root MFA not enabled fails compliance @@ -190,6 +191,44 @@ AWS.IAM.MFAEnabled Filters work for the `zip` and `upload` commands in the exact same way they work for the `test` command. +In addition to filtering, you can set a minimum number of unit tests with the `--minimum-tests` flag. Detections that don't have the minimum number of tests will be considered failing, and if `--minimum-tests` is set to 2 or greater it will also enforce that at least one test must return True and one must return False. + +``` +panther_analysis_tool test --path tests/fixtures/valid_policies --minimum-tests 2 +% panther_analysis_tool test --path okta_rules --minimum-tests 2 +[INFO]: Testing analysis packs in okta_rules + +Okta.AdminRoleAssigned + [PASS] Admin Access Assigned + +Okta.BruteForceLogins + [PASS] Failed login + +Okta.GeographicallyImprobableAccess + [PASS] Non Login + [PASS] Failed Login + +-------------------------- +Panther CLI Test Summary + Path: okta_rules + Passed: 0 + Failed: 3 + Invalid: 0 + +-------------------------- +Failed Tests Summary + Okta.AdminRoleAssigned + ['Insufficient test coverage, 2 tests required but only 1 found.', 'Insufficient test coverage: expected at least one passing and one failing test.'] + + Okta.BruteForceLogins + ['Insufficient test coverage, 2 tests required but only 1 found.', 'Insufficient test coverage: expected at least one passing and one failing test.'] + + Okta.GeographicallyImprobableAccess + ['Insufficient test coverage: expected at least one passing and one failing test.'] +``` + +So in this case even though the rules passed all their tests, they're still considered failing because they do not have the correct test coverage. + ## Writing Policies Each Panther Policy consists of a Python body and a YAML or JSON specification file. diff --git a/panther_analysis_tool/main.py b/panther_analysis_tool/main.py index 6513bd19..f0ddbebd 100644 --- a/panther_analysis_tool/main.py +++ b/panther_analysis_tool/main.py @@ -331,7 +331,8 @@ def test_analysis(args: argparse.Namespace) -> Tuple[int, list]: # then, import rules and policies; run tests failed_tests, invalid_detection = setup_run_tests(log_type_to_data_model, - analysis) + analysis, + args.minimum_tests) invalid_specs.extend(invalid_detection) print_summary(args.path, len(analysis), failed_tests, invalid_specs) @@ -388,8 +389,8 @@ def setup_data_models( def setup_run_tests( - log_type_to_data_model: Dict[str, DataModel], - analysis: List[Any]) -> Tuple[DefaultDict[str, List[Any]], List[Any]]: + log_type_to_data_model: Dict[str, DataModel], analysis: List[Any], + minimum_tests: int) -> Tuple[DefaultDict[str, List[Any]], List[Any]]: invalid_specs = [] failed_tests: DefaultDict[str, list] = defaultdict(list) for analysis_spec_filename, dir_name, analysis_spec in analysis: @@ -415,7 +416,8 @@ def setup_run_tests( analysis_funcs['title'] = module.title failed_tests = run_tests(analysis_spec, analysis_funcs, - log_type_to_data_model, failed_tests) + log_type_to_data_model, failed_tests, + minimum_tests) print('') return failed_tests, invalid_specs @@ -556,7 +558,13 @@ def handle_wrong_key_error(err: SchemaWrongKeyError, keys: list) -> Exception: def run_tests(analysis: Dict[str, Any], analysis_funcs: Dict[str, Any], analysis_data_models: Dict[str, DataModel], - failed_tests: DefaultDict[str, list]) -> DefaultDict[str, list]: + failed_tests: DefaultDict[str, list], + minimum_tests: int) -> DefaultDict[str, list]: + + if len(analysis.get('Tests', [])) < minimum_tests: + failed_tests[analysis.get('PolicyID') or analysis['RuleID']].append( + 'Insufficient test coverage: {} tests required but only {} found'. + format(minimum_tests, len(analysis.get('Tests', [])))) # First check if any tests exist, so we can print a helpful message if not if 'Tests' not in analysis: @@ -611,6 +619,13 @@ def run_tests(analysis: Dict[str, Any], analysis_funcs: Dict[str, Any], print('\t\t[{}] [{}] {}'.format( test_result[func], func, analysis_funcs[func](test_case))) + if minimum_tests > 1 and not ( + [x for x in analysis['Tests'] if x['ExpectedResult']] and + [x for x in analysis['Tests'] if not x['ExpectedResult']]): + failed_tests[analysis.get('PolicyID') or analysis['RuleID']].append( + 'Insufficient test coverage: expected at least one positive and one negative test' + ) + return failed_tests @@ -637,6 +652,15 @@ def setup_parser() -> argparse.ArgumentParser: required=False, metavar="KEY=VALUE", nargs='+') + test_parser.add_argument( + '--minimum-tests', + default='0', + type=int, + help= + 'The minimum number of tests in order for a detection to be considered passing. If a number' + + + 'greater than 1 is specified, at least one True and one False test is required.', + required=False) test_parser.add_argument('--debug', action='store_true', dest='debug') test_parser.set_defaults(func=test_analysis) @@ -661,6 +685,15 @@ def setup_parser() -> argparse.ArgumentParser: required=False, metavar="KEY=VALUE", nargs='+') + zip_parser.add_argument( + '--minimum-tests', + default='0', + type=int, + help= + 'The minimum number of tests in order for a detection to be considered passing. If a number' + + + 'greater than 1 is specified, at least one True and one False test is required.', + required=False) zip_parser.add_argument('--debug', action='store_true', dest='debug') zip_parser.set_defaults(func=zip_analysis) @@ -686,6 +719,15 @@ def setup_parser() -> argparse.ArgumentParser: help= 'The location to store a local copy of the packaged policies and rules.', required=False) + upload_parser.add_argument( + '--minimum-tests', + default='0', + type=int, + help= + 'The minimum number of tests in order for a detection to be considered passing. If a number' + + + 'greater than 1 is specified, at least one True and one False test is required.', + required=False) upload_parser.add_argument('--filter', required=False, metavar="KEY=VALUE", diff --git a/requirements.txt b/requirements.txt index 1ad49467..b2fc3315 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,24 +1,24 @@ # functional dependencies -boto3==1.16.20 +boto3==1.16.35 jsonpath-ng==1.5.2 ruamel.yaml==0.16.12 schema==0.7.2 # ci dependencies -bandit==1.6.2 +bandit==1.7.0 mypy==0.790 pylint==2.6.0 -pyfakefs==4.2.1 +pyfakefs==4.3.2 yapf==0.30.0 nose==1.3.7 PyYAML==5.3.1 ## The following requirements were added by pip freeze: astroid==2.4.2 -botocore==1.19.20 +botocore==1.19.35 contextlib2==0.6.0.post1 decorator==4.4.2 gitdb==4.0.5 GitPython==3.1.11 -importlib-metadata==2.0.0 +importlib-metadata==3.3.0 isort==5.6.4 jmespath==0.10.0 lazy-object-proxy==1.4.3 @@ -31,7 +31,7 @@ ruamel.yaml.clib==0.2.2 s3transfer==0.3.3 six==1.15.0 smmap==3.0.4 -stevedore==3.2.2 +stevedore==3.3.0 toml==0.10.2 typed-ast==1.4.1 typing-extensions==3.7.4.3 diff --git a/tests/fixtures/example_policy_required_tests.py b/tests/fixtures/example_policy_required_tests.py new file mode 100644 index 00000000..036bc88c --- /dev/null +++ b/tests/fixtures/example_policy_required_tests.py @@ -0,0 +1,14 @@ +import panther +IGNORED_USERS = {} + + +def policy(resource): + if resource['UserName'] in IGNORED_USERS: + return False + + cred_report = resource.get('CredentialReport', {}) + if not cred_report: + return True + + return cred_report.get('PasswordEnabled', False) and cred_report.get( + 'MfaActive', False) diff --git a/tests/fixtures/example_policy_required_tests.yml b/tests/fixtures/example_policy_required_tests.yml new file mode 100644 index 00000000..41a21704 --- /dev/null +++ b/tests/fixtures/example_policy_required_tests.yml @@ -0,0 +1,33 @@ +AnalysisType: policy +Filename: example_policy_required_tests.py +DisplayName: MFA Is Enabled For User +Description: MFA is a security best practice that adds an extra layer of protection for your AWS account logins. +Severity: High +PolicyID: IAM.MFAEnabled.Required.Tests +Enabled: true +ResourceTypes: + - AWS.IAM.RootUser.Snapshot + - AWS.IAM.User.Snapshot +Tags: + - AWS Managed Rules - Security, Identity & Compliance + - AWS + - CIS + - SOC2 +Runbook: > + Find out who disabled MFA on the account. +Reference: https://www.link-to-info.io +Suppressions: + - aws:resource:1 + - aws:.*:other-resource +Tests: + - + Name: Root MFA not enabled triggers a violation. + ExpectedResult: false + ResourceType: AWS.IAM.User.Snapshot (extraneous field) + Resource: + Arn: arn:aws:iam::123456789012:user/root + CreateDate: 2019-01-01T00:00:00Z + CredentialReport: + MfaActive: false + PasswordEnabled: true + UserName: root diff --git a/tests/fixtures/valid_analysis/policies/example_policy_extraneous_fields.py b/tests/fixtures/valid_analysis/policies/example_policy_extraneous_fields.py new file mode 100644 index 00000000..036bc88c --- /dev/null +++ b/tests/fixtures/valid_analysis/policies/example_policy_extraneous_fields.py @@ -0,0 +1,14 @@ +import panther +IGNORED_USERS = {} + + +def policy(resource): + if resource['UserName'] in IGNORED_USERS: + return False + + cred_report = resource.get('CredentialReport', {}) + if not cred_report: + return True + + return cred_report.get('PasswordEnabled', False) and cred_report.get( + 'MfaActive', False) diff --git a/tests/fixtures/valid_analysis/policies/example_policy_extraneous_fields.yml b/tests/fixtures/valid_analysis/policies/example_policy_extraneous_fields.yml index 9b20f41e..7c2a6e89 100644 --- a/tests/fixtures/valid_analysis/policies/example_policy_extraneous_fields.yml +++ b/tests/fixtures/valid_analysis/policies/example_policy_extraneous_fields.yml @@ -1,5 +1,5 @@ AnalysisType: policy -Filename: example_policy.py +Filename: example_policy_extraneous_fields.py DisplayName: MFA Is Enabled For User Description: MFA is a security best practice that adds an extra layer of protection for your AWS account logins. Severity: High @@ -30,4 +30,4 @@ Tests: CredentialReport: MfaActive: false PasswordEnabled: true - UserName: root \ No newline at end of file + UserName: root diff --git a/tests/fixtures/valid_analysis/rules/example_rule_extraneous_fields.py b/tests/fixtures/valid_analysis/rules/example_rule_extraneous_fields.py new file mode 100644 index 00000000..34ad019c --- /dev/null +++ b/tests/fixtures/valid_analysis/rules/example_rule_extraneous_fields.py @@ -0,0 +1,22 @@ +from panther import test_helper # pylint: disable=import-error + +IGNORED_USERS = {} + + +def rule(event): + if event['UserName'] in IGNORED_USERS: + return False + + cred_report = event.get('CredentialReport', {}) + if not cred_report: + return True + + return (test_helper() and + cred_report.get('PasswordEnabled', False) and + cred_report.get('MfaActive', False)) + +def dedup(event): + return event['UserName'] + +def title(event): + return '{} does not have MFA enabled'.format(event['UserName']) diff --git a/tests/fixtures/valid_analysis/rules/example_rule_extraneous_fields.yml b/tests/fixtures/valid_analysis/rules/example_rule_extraneous_fields.yml index 582a0440..1e02b5d9 100644 --- a/tests/fixtures/valid_analysis/rules/example_rule_extraneous_fields.yml +++ b/tests/fixtures/valid_analysis/rules/example_rule_extraneous_fields.yml @@ -1,5 +1,5 @@ AnalysisType: rule -Filename: example_rule.py +Filename: example_rule_extraneous_fields.py DisplayName: MFA Rule Description: MFA is a security best practice that adds an extra layer of protection for your AWS account logins. Severity: High diff --git a/tests/unit/panther_analysis_tool/test_main.py b/tests/unit/panther_analysis_tool/test_main.py index 99ec3fa0..5e37865a 100644 --- a/tests/unit/panther_analysis_tool/test_main.py +++ b/tests/unit/panther_analysis_tool/test_main.py @@ -147,6 +147,27 @@ def test_with_tag_filters(self): assert_equal(return_code, 0) assert_equal(len(invalid_specs), 0) + def test_with_minimum_tests(self): + args = pat.setup_parser().parse_args('test --path tests/fixtures/valid_analysis --minimum-tests 1'.split()) + return_code, invalid_specs = pat.test_analysis(args) + assert_equal(return_code, 0) + assert_equal(len(invalid_specs), 0) + + def test_with_minimum_tests_failing(self): + args = pat.setup_parser().parse_args('test --path tests/fixtures/valid_analysis --minimum-tests 2'.split()) + return_code, invalid_specs = pat.test_analysis(args) + # Failing, because some of the fixtures only have one test case + assert_equal(return_code, 1) + assert_equal(len(invalid_specs), 0) + + def test_with_minimum_tests_no_passing(self): + args = pat.setup_parser().parse_args('test --path tests/fixtures --filter PolicyID=IAM.MFAEnabled.Required.Tests --minimum-tests 2'.split()) + args.filter = pat.parse_filter(args.filter) + return_code, invalid_specs = pat.test_analysis(args) + # Failing, because while there are two unit tests they both have expected result False + assert_equal(return_code, 1) + assert_equal(len(invalid_specs), 4) + def test_zip_analysis(self): # Note: This is a workaround for CI try: