Skip to content

Commit

Permalink
Add support for generic filtering (#22)
Browse files Browse the repository at this point in the history
* add support for generic filtering

* addressed pr feedback

* added examples of filtering to the readme
  • Loading branch information
nhakmiller authored May 4, 2020
1 parent fb33cab commit 3cc45f9
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 11 deletions.
56 changes: 54 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ $ pip3 install -e .

### Commands and Usage

It is important to note that wherever the `panther_analysis_tool` refers to policies, it is actually referring to both policies and rules. So the `--policies` flag can be passed either a directory of policies, rules, or both.

View available commands:

```bash
Expand Down Expand Up @@ -138,6 +136,60 @@ AWS.CloudTrail.MFAEnabled
}
```
The `test`, `zip`, and `upload` commands all supporting filtering. Filtering works by passing the `--filter` argument with a list of filters specified in the format `KEY=VALUE1,VALUE2`. The keys can be any valid field in a policy or rule. When using a filter, only anaylsis that matches each filter specified will be considered. For example, the following command will test only items with the AnalysisType of policy AND the severity of High:
```
panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy Severity=High
[INFO]: Testing analysis packs in tests/fixtures/valid\_policies
AWS.IAM.BetaTest
[PASS] Root MFA not enabled fails compliance
[PASS] User MFA not enabled fails compliance
```
Whereas the following command will test items with the AnalysisType policy OR rule, AND the severity High:
```
panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,rule Severity=High
[INFO]: Testing analysis packs in tests/fixtures/valid\_policies
AWS.IAM.BetaTest
[PASS] Root MFA not enabled fails compliance
[PASS] User MFA not enabled fails compliance
AWS.CloudTrail.MFAEnabled
[PASS] Root MFA not enabled fails compliance
[PASS] User MFA not enabled fails compliance
```
When writing policies or rules that refer to the `global` analysis types, be sure to include them in your filter. You can include an empty string as a value in a filter, and it will mean the filter is only applied if the field exists. The following command will return an error, because the policy in question imports a global but the global does not have a severity so it is excluded by the filter:
```
panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,global Severity=Critical
[INFO]: Testing analysis packs in tests/fixtures/valid\_policies
AWS.IAM.MFAEnabled
[ERROR] Error loading module, skipping
Invalid: tests/fixtures/valid\_policies/example\_policy.yml
No module named 'panther'
[ERROR]: [('tests/fixtures/valid_policies/example_policy.yml', ModuleNotFoundError("No module named 'panther'"))]
```
If you want this query to work, you need to allow for the severity field to be absent like this:
```
panther\_analysis\_tool test --path tests/fixtures/valid\_policies --filter AnalysisType=policy,global Severity=Critical,""
[INFO]: Testing analysis packs in tests/fixtures/valid\_policies
AWS.IAM.MFAEnabled
[PASS] Root MFA not enabled fails compliance
[PASS] User MFA not enabled fails compliance
```
Filters work for the `zip` and `upload` commands in the exact same way they work for the `test` command.
## Writing Policies
Each Panther Policy consists of a Python body and a YAML or JSON specification file.
Expand Down
68 changes: 62 additions & 6 deletions panther_analysis_tool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import json
import logging
import os
import shutil
import sys
from typing import Any, Callable, DefaultDict, Dict, Iterator, List, Tuple
import zipfile
from schema import (Optional, Or, Schema, SchemaError, SchemaMissingKeyError,
SchemaForbiddenKeyError, SchemaUnexpectedTypeError)
import yaml
Expand Down Expand Up @@ -222,10 +222,15 @@ def zip_analysis(args: argparse.Namespace) -> Tuple[int, str]:
# The colon character is not valid in filenames.
current_time = datetime.now().isoformat(timespec='seconds').replace(
':', '-')
filename = 'panther-analysis'
return 0, shutil.make_archive(
os.path.join(args.out, '{}-{}'.format(filename, current_time)), 'zip',
args.path)
filename = 'panther-analysis-{}.zip'.format(current_time)
with zipfile.ZipFile(filename, 'w', zipfile.ZIP_DEFLATED) as zip_out:
analysis = filter_analysis(list(load_analysis_specs(args.path)),
args.filter)
for analysis_spec_filename, dir_name, analysis_spec in analysis:
zip_out.write(analysis_spec_filename)
zip_out.write(os.path.join(dir_name, analysis_spec['Filename']))

return 0, filename


def upload_analysis(args: argparse.Namespace) -> Tuple[int, str]:
Expand Down Expand Up @@ -302,6 +307,10 @@ def test_analysis(args: argparse.Namespace) -> Tuple[int, list]:
specs = list(load_analysis_specs(args.path))
global_analysis, analysis, invalid_specs = classify_analysis(specs)

# Apply the filters as needed
global_analysis = filter_analysis(global_analysis, args.filter)
analysis = filter_analysis(analysis, args.filter)

# First import the globals
for analysis_spec_filename, dir_name, analysis_spec in global_analysis:
module, load_err = load_module(
Expand Down Expand Up @@ -349,6 +358,20 @@ def test_analysis(args: argparse.Namespace) -> Tuple[int, list]:
return int(bool(failed_tests or invalid_specs)), invalid_specs


def filter_analysis(analysis: List[Any], filters: Dict[str, List]) -> List[Any]:
if filters is None:
return analysis

filtered_analysis = []
for file_name, dir_name, analysis_spec in analysis:
if all(
analysis_spec.get(key, "") in values
for key, values in filters.items()):
filtered_analysis.append((file_name, dir_name, analysis_spec))

return filtered_analysis


def classify_analysis(
specs: List[Tuple[str, str, Any]]
) -> Tuple[List[Any], List[Any], List[Any]]:
Expand Down Expand Up @@ -415,7 +438,7 @@ def setup_parser() -> argparse.ArgumentParser:
prog='panther_analysis_tool')
parser.add_argument('--version',
action='version',
version='panther_analysis_tool 0.2.1')
version='panther_analysis_tool 0.2.2')
subparsers = parser.add_subparsers()

test_parser = subparsers.add_parser(
Expand All @@ -426,6 +449,10 @@ def setup_parser() -> argparse.ArgumentParser:
type=str,
help='The relative path to Panther policies and rules.',
required=True)
test_parser.add_argument('--filter',
required=False,
metavar="KEY=VALUE",
nargs='+')
test_parser.set_defaults(func=test_analysis)

zip_parser = subparsers.add_parser(
Expand All @@ -443,6 +470,10 @@ def setup_parser() -> argparse.ArgumentParser:
type=str,
help='The path to write zipped policies and rules to.',
required=True)
zip_parser.add_argument('--filter',
required=False,
metavar="KEY=VALUE",
nargs='+')
zip_parser.set_defaults(func=zip_analysis)

upload_parser = subparsers.add_parser(
Expand All @@ -460,18 +491,43 @@ def setup_parser() -> argparse.ArgumentParser:
help=
'The location to store a local copy of the packaged policies and rules.',
required=False)
upload_parser.add_argument('--filter',
required=False,
metavar="KEY=VALUE",
nargs='+')
upload_parser.set_defaults(func=upload_analysis)

return parser


# Parses the filters, expects a list of strings
def parse_filter(filters: List[str]) -> Dict[str, Any]:
parsed_filters = {}
for filt in filters:
split = filt.split('=')
if len(split) != 2 or split[0] == '' or split[1] == '':
logging.warning('Filter %s is not in format KEY=VALUE, skipping',
filt)
continue
key = split[0]
if key not in list(GLOBAL_SCHEMA.schema.keys()) + list(
POLICY_SCHEMA.schema.keys()) + list(RULE_SCHEMA.schema.keys()):
logging.warning(
'Filter key %s is not a valid filter field, skipping', key)
continue
parsed_filters[key] = split[1].split(',')
return parsed_filters


def run() -> None:
logging.basicConfig(format='[%(levelname)s]: %(message)s',
level=logging.INFO)

parser = setup_parser()
args = parser.parse_args()
try:
if args.filter is not None:
args.filter = parse_filter(args.filter)
return_code, out = args.func(args)
except AttributeError:
parser.print_help()
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
setup(
name='panther_analysis_tool',
packages=['panther_analysis_tool'],
version='0.2.1',
version='0.2.2',
license='apache-2.0',
description=
'Panther command line interface for writing, testing, and packaging policies/rules.',
author='Panther Labs Inc',
author_email='pypi@runpanther.io',
url='https://github.com/panther-labs/panther_analysis_tool',
download_url = 'https://github.com/panther-labs/panther_analysis_tool/archive/v0.2.1.tar.gz',
download_url = 'https://github.com/panther-labs/panther_analysis_tool/archive/v0.2.2.tar.gz',
keywords=['Security', 'CLI'],
scripts=['bin/panther_analysis_tool'],
install_requires=[
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/valid_policies/example_policy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ AnalysisType: policy
Filename: example_policy.py
DisplayName: MFA Is Enabled For User
Description: MFA is a security best practice that adds an extra layer of protection for your AWS account logins.
Severity: High
Severity: Critical
PolicyID: AWS.IAM.MFAEnabled
Enabled: true
ResourceTypes:
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/panther_analysis_tool/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ def test_load_policy_specs_from_folder(self):
assert_equal(invalid_specs[0][0],
'tests/fixtures/example_malformed_policy.yml')

def test_parse_filters(self):
args = pat.setup_parser().parse_args('test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global Severity=Critical'.split())
args.filter = pat.parse_filter(args.filter)
assert_true('AnalysisType' in args.filter.keys())
assert_true('policy' in args.filter['AnalysisType'])
assert_true('global' in args.filter['AnalysisType'])
assert_true('Severity' in args.filter.keys())
assert_true('Critical' in args.filter['Severity'])

def test_with_filters(self):
args = pat.setup_parser().parse_args('test --path tests/fixtures/valid_policies --filter AnalysisType=policy,global'.split())
args.filter = pat.parse_filter(args.filter)
return_code, invalid_specs = pat.test_analysis(args)
assert_equal(return_code, 0)
assert_equal(len(invalid_specs), 0)

def test_zip_analysis(self):
# Note: This is a workaround for CI
try:
Expand Down

0 comments on commit 3cc45f9

Please sign in to comment.