From 3cc45f9815060d3a3a98565720258fd98dcceaf0 Mon Sep 17 00:00:00 2001 From: Nick <49166439+nhakmiller@users.noreply.github.com> Date: Mon, 4 May 2020 09:56:39 -0700 Subject: [PATCH] Add support for generic filtering (#22) * add support for generic filtering * addressed pr feedback * added examples of filtering to the readme --- README.md | 56 ++++++++++++++- panther_analysis_tool/main.py | 68 +++++++++++++++++-- setup.py | 4 +- .../valid_policies/example_policy.yml | 2 +- tests/unit/panther_analysis_tool/test_main.py | 16 +++++ 5 files changed, 135 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 694919e7..6bfe88ee 100644 --- a/README.md +++ b/README.md @@ -56,8 +56,6 @@ $ pip3 install -e . ### Commands and Usage -It is important to note that wherever the `panther_analysis_tool` refers to policies, it is actually referring to both policies and rules. So the `--policies` flag can be passed either a directory of policies, rules, or both. - View available commands: ```bash @@ -138,6 +136,60 @@ AWS.CloudTrail.MFAEnabled } ``` +The `test`, `zip`, and `upload` commands all supporting filtering. Filtering works by passing the `--filter` argument with a list of filters specified in the format `KEY=VALUE1,VALUE2`. The keys can be any valid field in a policy or rule. When using a filter, only anaylsis that matches each filter specified will be considered. For example, the following command will test only items with the AnalysisType of policy AND the severity of High: + +``` +panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy Severity=High +[INFO]: Testing analysis packs in tests/fixtures/valid\_policies + +AWS.IAM.BetaTest + [PASS] Root MFA not enabled fails compliance + [PASS] User MFA not enabled fails compliance +``` + +Whereas the following command will test items with the AnalysisType policy OR rule, AND the severity High: + +``` +panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,rule Severity=High +[INFO]: Testing analysis packs in tests/fixtures/valid\_policies + +AWS.IAM.BetaTest + [PASS] Root MFA not enabled fails compliance + [PASS] User MFA not enabled fails compliance + +AWS.CloudTrail.MFAEnabled + [PASS] Root MFA not enabled fails compliance + [PASS] User MFA not enabled fails compliance +``` + +When writing policies or rules that refer to the `global` analysis types, be sure to include them in your filter. You can include an empty string as a value in a filter, and it will mean the filter is only applied if the field exists. The following command will return an error, because the policy in question imports a global but the global does not have a severity so it is excluded by the filter: + +``` +panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,global Severity=Critical +[INFO]: Testing analysis packs in tests/fixtures/valid\_policies + +AWS.IAM.MFAEnabled + [ERROR] Error loading module, skipping + +Invalid: tests/fixtures/valid\_policies/example\_policy.yml + No module named 'panther' + +[ERROR]: [('tests/fixtures/valid_policies/example_policy.yml', ModuleNotFoundError("No module named 'panther'"))] +``` + +If you want this query to work, you need to allow for the severity field to be absent like this: + +``` +panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,global Severity=Critical,"" +[INFO]: Testing analysis packs in tests/fixtures/valid\_policies + +AWS.IAM.MFAEnabled + [PASS] Root MFA not enabled fails compliance + [PASS] User MFA not enabled fails compliance +``` + +Filters work for the `zip` and `upload` commands in the exact same way they work for the `test` command. + ## Writing Policies Each Panther Policy consists of a Python body and a YAML or JSON specification file. diff --git a/panther_analysis_tool/main.py b/panther_analysis_tool/main.py index b182c172..3233375a 100644 --- a/panther_analysis_tool/main.py +++ b/panther_analysis_tool/main.py @@ -23,9 +23,9 @@ import json import logging import os -import shutil import sys from typing import Any, Callable, DefaultDict, Dict, Iterator, List, Tuple +import zipfile from schema import (Optional, Or, Schema, SchemaError, SchemaMissingKeyError, SchemaForbiddenKeyError, SchemaUnexpectedTypeError) import yaml @@ -222,10 +222,15 @@ def zip_analysis(args: argparse.Namespace) -> Tuple[int, str]: # The colon character is not valid in filenames. current_time = datetime.now().isoformat(timespec='seconds').replace( ':', '-') - filename = 'panther-analysis' - return 0, shutil.make_archive( - os.path.join(args.out, '{}-{}'.format(filename, current_time)), 'zip', - args.path) + filename = 'panther-analysis-{}.zip'.format(current_time) + with zipfile.ZipFile(filename, 'w', zipfile.ZIP_DEFLATED) as zip_out: + analysis = filter_analysis(list(load_analysis_specs(args.path)), + args.filter) + for analysis_spec_filename, dir_name, analysis_spec in analysis: + zip_out.write(analysis_spec_filename) + zip_out.write(os.path.join(dir_name, analysis_spec['Filename'])) + + return 0, filename def upload_analysis(args: argparse.Namespace) -> Tuple[int, str]: @@ -302,6 +307,10 @@ def test_analysis(args: argparse.Namespace) -> Tuple[int, list]: specs = list(load_analysis_specs(args.path)) global_analysis, analysis, invalid_specs = classify_analysis(specs) + # Apply the filters as needed + global_analysis = filter_analysis(global_analysis, args.filter) + analysis = filter_analysis(analysis, args.filter) + # First import the globals for analysis_spec_filename, dir_name, analysis_spec in global_analysis: module, load_err = load_module( @@ -349,6 +358,20 @@ def test_analysis(args: argparse.Namespace) -> Tuple[int, list]: return int(bool(failed_tests or invalid_specs)), invalid_specs +def filter_analysis(analysis: List[Any], filters: Dict[str, List]) -> List[Any]: + if filters is None: + return analysis + + filtered_analysis = [] + for file_name, dir_name, analysis_spec in analysis: + if all( + analysis_spec.get(key, "") in values + for key, values in filters.items()): + filtered_analysis.append((file_name, dir_name, analysis_spec)) + + return filtered_analysis + + def classify_analysis( specs: List[Tuple[str, str, Any]] ) -> Tuple[List[Any], List[Any], List[Any]]: @@ -415,7 +438,7 @@ def setup_parser() -> argparse.ArgumentParser: prog='panther_analysis_tool') parser.add_argument('--version', action='version', - version='panther_analysis_tool 0.2.1') + version='panther_analysis_tool 0.2.2') subparsers = parser.add_subparsers() test_parser = subparsers.add_parser( @@ -426,6 +449,10 @@ def setup_parser() -> argparse.ArgumentParser: type=str, help='The relative path to Panther policies and rules.', required=True) + test_parser.add_argument('--filter', + required=False, + metavar="KEY=VALUE", + nargs='+') test_parser.set_defaults(func=test_analysis) zip_parser = subparsers.add_parser( @@ -443,6 +470,10 @@ def setup_parser() -> argparse.ArgumentParser: type=str, help='The path to write zipped policies and rules to.', required=True) + zip_parser.add_argument('--filter', + required=False, + metavar="KEY=VALUE", + nargs='+') zip_parser.set_defaults(func=zip_analysis) upload_parser = subparsers.add_parser( @@ -460,11 +491,34 @@ def setup_parser() -> argparse.ArgumentParser: help= 'The location to store a local copy of the packaged policies and rules.', required=False) + upload_parser.add_argument('--filter', + required=False, + metavar="KEY=VALUE", + nargs='+') upload_parser.set_defaults(func=upload_analysis) return parser +# Parses the filters, expects a list of strings +def parse_filter(filters: List[str]) -> Dict[str, Any]: + parsed_filters = {} + for filt in filters: + split = filt.split('=') + if len(split) != 2 or split[0] == '' or split[1] == '': + logging.warning('Filter %s is not in format KEY=VALUE, skipping', + filt) + continue + key = split[0] + if key not in list(GLOBAL_SCHEMA.schema.keys()) + list( + POLICY_SCHEMA.schema.keys()) + list(RULE_SCHEMA.schema.keys()): + logging.warning( + 'Filter key %s is not a valid filter field, skipping', key) + continue + parsed_filters[key] = split[1].split(',') + return parsed_filters + + def run() -> None: logging.basicConfig(format='[%(levelname)s]: %(message)s', level=logging.INFO) @@ -472,6 +526,8 @@ def run() -> None: parser = setup_parser() args = parser.parse_args() try: + if args.filter is not None: + args.filter = parse_filter(args.filter) return_code, out = args.func(args) except AttributeError: parser.print_help() diff --git a/setup.py b/setup.py index 3af1aafe..3134650c 100644 --- a/setup.py +++ b/setup.py @@ -2,14 +2,14 @@ setup( name='panther_analysis_tool', packages=['panther_analysis_tool'], - version='0.2.1', + version='0.2.2', license='apache-2.0', description= 'Panther command line interface for writing, testing, and packaging policies/rules.', author='Panther Labs Inc', author_email='pypi@runpanther.io', url='https://github.com/panther-labs/panther_analysis_tool', - download_url = 'https://github.com/panther-labs/panther_analysis_tool/archive/v0.2.1.tar.gz', + download_url = 'https://github.com/panther-labs/panther_analysis_tool/archive/v0.2.2.tar.gz', keywords=['Security', 'CLI'], scripts=['bin/panther_analysis_tool'], install_requires=[ diff --git a/tests/fixtures/valid_policies/example_policy.yml b/tests/fixtures/valid_policies/example_policy.yml index 56e91d4b..4ce28397 100644 --- a/tests/fixtures/valid_policies/example_policy.yml +++ b/tests/fixtures/valid_policies/example_policy.yml @@ -2,7 +2,7 @@ AnalysisType: policy Filename: example_policy.py DisplayName: MFA Is Enabled For User Description: MFA is a security best practice that adds an extra layer of protection for your AWS account logins. -Severity: High +Severity: Critical PolicyID: AWS.IAM.MFAEnabled Enabled: true ResourceTypes: diff --git a/tests/unit/panther_analysis_tool/test_main.py b/tests/unit/panther_analysis_tool/test_main.py index 554f9fa6..19dd1b4b 100644 --- a/tests/unit/panther_analysis_tool/test_main.py +++ b/tests/unit/panther_analysis_tool/test_main.py @@ -40,6 +40,22 @@ def test_load_policy_specs_from_folder(self): assert_equal(invalid_specs[0][0], 'tests/fixtures/example_malformed_policy.yml') + def test_parse_filters(self): + args = pat.setup_parser().parse_args('test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global Severity=Critical'.split()) + args.filter = pat.parse_filter(args.filter) + assert_true('AnalysisType' in args.filter.keys()) + assert_true('policy' in args.filter['AnalysisType']) + assert_true('global' in args.filter['AnalysisType']) + assert_true('Severity' in args.filter.keys()) + assert_true('Critical' in args.filter['Severity']) + + def test_with_filters(self): + args = pat.setup_parser().parse_args('test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global'.split()) + args.filter = pat.parse_filter(args.filter) + return_code, invalid_specs = pat.test_analysis(args) + assert_equal(return_code, 0) + assert_equal(len(invalid_specs), 0) + def test_zip_analysis(self): # Note: This is a workaround for CI try: