Skip to content

Commit

Permalink
Update Phrasing to be more General (#8)
Browse files Browse the repository at this point in the history
* made verbage less policy centric

* Updated readme

* fix Makefile

* renamed  output-path to path

* renamed again
  • Loading branch information
nhakmiller authored Mar 23, 2020
1 parent f0db84e commit 02f611d
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 113 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ unit:
pipenv run nosetests -v

integration:
panther_analysis_tool test --policies tests/fixtures/valid_policies/
panther_analysis_tool test --path tests/fixtures/valid_policies/
47 changes: 20 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,53 +64,52 @@ View available commands:
$ panther_analysis_tool --help
usage: panther_analysis_tool [-h] [--version] {test,zip,upload} ...

Panther Analysis Tool: A tool for writing, testing, and packaging Panther Policies/Rules
Panther Analaysis Tool: A command line tool for managing Panther policies and
rules.

positional arguments:
{test,zip,upload}
test Validate policy specifications and run policy tests.
zip Create an archive of local policies for uploading to
Panther.
upload Upload specified policies to a Panther deployment.
test Validate analysis specifications and run policy and rule
tests.
zip Create an archive of local policies and rules for
uploading to Panther.
upload Upload specified policies and rules to a Panther
deployment.

optional arguments:
-h, --help show this help message and exit
--version show program's version number and exit
```
--version show program's version number and exit```
Run tests:
```bash
$ panther_analysis_tool test --policies tests/fixtures/valid_policies/
[INFO]: Testing Policies in tests/fixtures/valid_policies/
$ panther_analysis_tool test --path tests/fixtures/valid_policies/
[INFO]: Testing analysis packs in tests/fixtures/valid_policies/
Testing policy 'AWS.IAM.MFAEnabled'
AWS.IAM.MFAEnabled
[PASS] Root MFA not enabled fails compliance
[PASS] User MFA not enabled fails compliance
```
Create packages to upload via the Panther UI:
```bash
$ panther_analysis_tool zip --policies tests/fixtures/valid_policies/ --output-path tmp
$ panther_analysis_tool zip --path tests/fixtures/valid_policies/ --out tmp
[INFO]: Testing analysis packs in tests/fixtures/valid_policies/
[INFO]: Testing Policies in tests/fixtures/valid_policies/
Testing policy 'AWS.IAM.MFAEnabled'
AWS.IAM.MFAEnabled
[PASS] Root MFA not enabled fails compliance
[PASS] User MFA not enabled fails compliance
[INFO]: Zipping policies in tests/fixtures/valid_policies/ to tmp
[INFO]: /Users/user_name/panther_analysis_tool/tmp/panther-policies-2019-01-01T16-00-00.zip
[INFO]: Zipping analysis packs in tests/fixtures/valid_policies/ to tmp
[INFO]: <current working directory>/tmp/panther-analysis-2020-03-23T12-48-18.zip
```
Upload packages to Panther directly. Note, this expects your environment to be setup the same way as if you were using the AWS CLI, see the setup instructions [here](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html). We also recommend using a credentials manager such as [aws-vault](https://github.com/99designs/aws-vault).
```bash
$ panther_analysis_tool upload --policies tests/fixtures/valid_policies/ --output-path tmp
[INFO]: Testing Policies in tests/fixtures/valid_policies/
$ panther_analysis_tool upload --path tests/fixtures/valid_policies/ --out tmp
[INFO]: Testing analysis packs in tests/fixtures/valid_policies/
AWS.IAM.MFAEnabled
[PASS] Root MFA not enabled fails compliance
Expand All @@ -124,7 +123,7 @@ AWS.CloudTrail.MFAEnabled
[PASS] Root MFA not enabled fails compliance
[PASS] User MFA not enabled fails compliance
[INFO]: Zipping policies in tests/fixtures/valid\_policies/ to tmp
[INFO]: Zipping analysis packs in tests/fixtures/valid_policies/ to tmp
[INFO]: Found credentials in environment variables.
[INFO]: Uploading pack to Panther
[INFO]: Upload success.
Expand All @@ -139,12 +138,6 @@ AWS.CloudTrail.MFAEnabled
}
```
In order to upload all currently available policies and rules to your Panther deployment, run the following command. This command will recursively traverse all directories under `analysis` and package all rules and policies it finds into one package and then upload them:
```bash
$ panther_analysis_tool upload --policies analysis/ --output-path tmp
```
## Writing Policies
Each Panther Policy consists of a Python body and a YAML or JSON specification file.
Expand Down
153 changes: 81 additions & 72 deletions panther_analysis_tool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def get(self, arg: str, default: Any = None) -> Any:


def load_module(filename: str) -> Tuple[Any, Any]:
"""Loads the Policy function module from a file.
"""Loads the analysis function module from a file.
Args:
filename: The relative path to the file.
Expand All @@ -114,14 +114,14 @@ def load_module(filename: str) -> Tuple[Any, Any]:
return module, None


def load_policy_specs(directory: str) -> Iterator[Tuple[str, str, Any]]:
"""Loads the Policy function module from a file.
def load_analysis_specs(directory: str) -> Iterator[Tuple[str, str, Any]]:
"""Loads the analysis specifications from a file.
Args:
directory: The relativie path to Panther policies.
directory: The relative path to Panther policies or rules.
Yields:
A tuple of the relative filepath, directory name, and loaded policy specification dict.
A tuple of the relative filepath, directory name, and loaded analysis specification dict.
"""
for dir_name, _, file_list in os.walk(directory):
for filename in sorted(file_list):
Expand All @@ -148,46 +148,45 @@ def datetime_converted(obj: Any) -> Any:
return obj


def zip_policies(args: argparse.Namespace) -> Tuple[int, str]:
"""Tests, validates, and then archives all policies into a local zip file.
def zip_analysis(args: argparse.Namespace) -> Tuple[int, str]:
"""Tests, validates, and then archives all policies and rules into a local zip file.
Returns 1 if the policy test or validation fails.
Returns 1 if the analysis tests or validation fails.
Args:
args: The populated Argparse namespace with parsed command-line arguments.
Returns:
A tuple of return code and the archive filename.
"""
return_code, _ = test_policies(args)
return_code, _ = test_analysis(args)

if return_code == 1:
return return_code, ''

logging.info('Zipping policies in %s to %s', args.policies,
args.output_path)
logging.info('Zipping analysis packs in %s to %s', args.path, args.out)
# example: 2019-08-05T18-23-25
# The colon character is not valid in filenames.
current_time = datetime.now().isoformat(timespec='seconds').replace(
':', '-')
filename = 'panther-policies'
filename = 'panther-analysis'
return 0, shutil.make_archive(
os.path.join(args.output_path, '{}-{}'.format(filename, current_time)),
'zip', args.policies)
os.path.join(args.out, '{}-{}'.format(filename, current_time)), 'zip',
args.path)


def upload_policies(args: argparse.Namespace) -> Tuple[int, str]:
"""Tests, validates, packages, and then uploads all policies into a Panther deployment.
def upload_analysis(args: argparse.Namespace) -> Tuple[int, str]:
"""Tests, validates, packages, and uploads all policies and rules into a Panther deployment.
Returns 1 if the policy tests, validation, or packaging fails.
Returns 1 if the analysis tests, validation, or packaging fails.
Args:
args: The populated Argparse namespace with parsed command-line arguments.
Returns:
A tuple of return code and the archive filename.
"""
return_code, archive = zip_policies(args)
return_code, archive = zip_analysis(args)
if return_code == 1:
return return_code, ''

Expand Down Expand Up @@ -230,8 +229,8 @@ def upload_policies(args: argparse.Namespace) -> Tuple[int, str]:
return 0, ''


def test_policies(args: argparse.Namespace) -> Tuple[int, list]:
"""Imports each Policy/Rule and runs their tests.
def test_analysis(args: argparse.Namespace) -> Tuple[int, list]:
"""Imports each policy or rule and runs their tests.
Args:
args: The populated Argparse namespace with parsed command-line arguments.
Expand All @@ -242,71 +241,72 @@ def test_policies(args: argparse.Namespace) -> Tuple[int, list]:
invalid_specs = []
failed_tests: DefaultDict[str, list] = defaultdict(list)
tests: List[str] = []
logging.info('Testing Policies in %s\n', args.policies)
logging.info('Testing analysis packs in %s\n', args.path)

# First import the globals file
specs = list(load_policy_specs(args.policies))
for policy_spec_filename, dir_name, policy_spec in specs:
if policy_spec.get('PolicyID') != 'aws_globals':
specs = list(load_analysis_specs(args.path))
for analysis_spec_filename, dir_name, analysis_spec in specs:
if analysis_spec.get('PolicyID') != 'aws_globals':
continue
module, load_err = load_module(
os.path.join(dir_name, policy_spec['Filename']))
os.path.join(dir_name, analysis_spec['Filename']))
# If the module could not be loaded, continue to the next
if load_err:
invalid_specs.append((policy_spec_filename, load_err))
invalid_specs.append((analysis_spec_filename, load_err))
break
sys.modules['aws_globals'] = module

# Next import each policy and run its tests
for policy_spec_filename, dir_name, policy_spec in specs:
if policy_spec.get('PolicyID') == 'aws_globals':
# Next import each policy or rule and run its tests
for analysis_spec_filename, dir_name, analysis_spec in specs:
if analysis_spec.get('PolicyID') == 'aws_globals':
continue

try:
SPEC_SCHEMA.validate(policy_spec)
SPEC_SCHEMA.validate(analysis_spec)
except (SchemaError, SchemaMissingKeyError, SchemaForbiddenKeyError,
SchemaUnexpectedTypeError) as err:
invalid_specs.append((policy_spec_filename, err))
invalid_specs.append((analysis_spec_filename, err))
continue

print(policy_spec['PolicyID'])
print(analysis_spec['PolicyID'])

# Check if the PolicyID has already been loaded
if policy_spec['PolicyID'] in tests:
if analysis_spec['PolicyID'] in tests:
print('\t[ERROR] Conflicting PolicyID\n')
invalid_specs.append(
(policy_spec_filename,
'Conflicting PolicyID: {}'.format(policy_spec['PolicyID'])))
(analysis_spec_filename,
'Conflicting PolicyID: {}'.format(analysis_spec['PolicyID'])))
continue

module, load_err = load_module(
os.path.join(dir_name, policy_spec['Filename']))
os.path.join(dir_name, analysis_spec['Filename']))
# If the module could not be loaded, continue to the next
if load_err:
invalid_specs.append((policy_spec_filename, load_err))
invalid_specs.append((analysis_spec_filename, load_err))
continue

tests.append(policy_spec['PolicyID'])
if policy_spec['AnalysisType'] == 'policy':
tests.append(analysis_spec['PolicyID'])
if analysis_spec['AnalysisType'] == 'policy':
run_func = module.policy
elif policy_spec['AnalysisType'] == 'rule':
elif analysis_spec['AnalysisType'] == 'rule':
run_func = module.rule
failed_tests = run_tests(policy_spec, run_func, failed_tests)
failed_tests = run_tests(analysis_spec, run_func, failed_tests)
print('')

for policy_id in failed_tests:
print("Failed: {}\n\t{}\n".format(policy_id, failed_tests[policy_id]))
for analysis_id in failed_tests:
print("Failed: {}\n\t{}\n".format(analysis_id,
failed_tests[analysis_id]))

for spec_filename, spec_error in invalid_specs:
print("Invalid: {}\n\t{}\n".format(spec_filename, spec_error))

return int(bool(failed_tests or invalid_specs)), invalid_specs


def run_tests(policy: Dict[str, Any], run_func: Callable[[TestCase], bool],
def run_tests(analysis: Dict[str, Any], run_func: Callable[[TestCase], bool],
failed_tests: DefaultDict[str, list]) -> DefaultDict[str, list]:

for unit_test in policy['Tests']:
for unit_test in analysis['Tests']:
try:
test_case = TestCase(unit_test['Resource'],
unit_test['ResourceType'])
Expand All @@ -317,7 +317,7 @@ def run_tests(policy: Dict[str, Any], run_func: Callable[[TestCase], bool],
test_result = 'PASS'
if result != unit_test['ExpectedResult']:
test_result = 'FAIL'
failed_tests[policy['PolicyID']].append(unit_test['Name'])
failed_tests[analysis['PolicyID']].append(unit_test['Name'])
print('\t[{}] {}'.format(test_result, unit_test['Name']))

return failed_tests
Expand All @@ -326,47 +326,56 @@ def run_tests(policy: Dict[str, Any], run_func: Callable[[TestCase], bool],
def setup_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description=
'Panther Analaysis Tool: A tool for writing, testing, and packaging Panther Policies/Rules',
'Panther Analaysis Tool: A command line tool for managing Panther policies and rules.',
prog='panther_analysis_tool')
parser.add_argument('--version',
action='version',
version='panther_analysis_tool 0.1.4')
version='panther_analysis_tool 0.1.7')
subparsers = parser.add_subparsers()

test_parser = subparsers.add_parser(
'test', help='Validate policy specifications and run policy tests.')
test_parser.add_argument('--policies',
type=str,
help='The relative path to Panther policies.',
required=True)
test_parser.set_defaults(func=test_policies)
'test',
help='Validate analysis specifications and run policy and rule tests.')
test_parser.add_argument(
'--path',
type=str,
help='The relative path to Panther policies and rules.',
required=True)
test_parser.set_defaults(func=test_analysis)

zip_parser = subparsers.add_parser(
'zip',
help='Create an archive of local policies for uploading to Panther.')
zip_parser.add_argument('--policies',
type=str,
help='The relative path to Panther policies.',
required=True)
zip_parser.add_argument('--output-path',
type=str,
help='The path to write zipped policies to.',
required=True)
zip_parser.set_defaults(func=zip_policies)
help=
'Create an archive of local policies and rules for uploading to Panther.'
)
zip_parser.add_argument(
'--path',
type=str,
help='The relative path to Panther policies and rules.',
required=True)
zip_parser.add_argument(
'--out',
type=str,
help='The path to write zipped policies and rules to.',
required=True)
zip_parser.set_defaults(func=zip_analysis)

upload_parser = subparsers.add_parser(
'upload', help='Upload specified policies to a Panther deployment.')
upload_parser.add_argument('--policies',
type=str,
help='The relative path to Panther policies.',
required=True)
'upload',
help='Upload specified policies and rules to a Panther deployment.')
upload_parser.add_argument(
'--path',
type=str,
help='The relative path to Panther policies and rules.',
required=True)
upload_parser.add_argument(
'--output-path',
'--out',
default='.',
type=str,
help='The location to store a local copy of the packaged policies.',
help=
'The location to store a local copy of the packaged policies and rules.',
required=False)
upload_parser.set_defaults(func=upload_policies)
upload_parser.set_defaults(func=upload_analysis)

return parser

Expand Down
Loading

0 comments on commit 02f611d

Please sign in to comment.