From a2f45804349f78424101ae26f6b7a040cdcd6be4 Mon Sep 17 00:00:00 2001 From: Jack Naglieri Date: Wed, 10 Jun 2020 10:09:58 -0700 Subject: [PATCH] 0.3.2 final edits (#28) * final edits * Update panther_analysis_tool/main.py Co-authored-by: Austin Byers Co-authored-by: Austin Byers --- Makefile | 4 +++ panther_analysis_tool/main.py | 51 +++++++++++++++++++++++++++-------- 2 files changed, 44 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index df912f03..3dda4c05 100644 --- a/Makefile +++ b/Makefile @@ -34,3 +34,7 @@ integration: panther_analysis_tool test --path tests/fixtures/valid_analysis/ test: unit + +pypi: + pipenv run python3 setup.py sdist + pipenv run twine upload dist/* diff --git a/panther_analysis_tool/main.py b/panther_analysis_tool/main.py index 7fb09ff0..dd6314eb 100644 --- a/panther_analysis_tool/main.py +++ b/panther_analysis_tool/main.py @@ -102,7 +102,7 @@ def load_analysis_specs(directory: str) -> Iterator[Tuple[str, str, Any]]: RULES_PATH_PATTERN, POLICIES_PATH_PATTERN) ]): - logging.info('Skipping path %s', relative_path) + logging.debug('Skipping path %s', relative_path) continue for filename in sorted(file_list): spec_filename = os.path.join(relative_path, filename) @@ -300,14 +300,34 @@ def test_analysis(args: argparse.Namespace) -> Tuple[int, list]: failed_tests = run_tests(analysis_spec, analysis_funcs, failed_tests) print('') - for analysis_id in failed_tests: - print("Failed: {}\n\t{}\n".format(analysis_id, - failed_tests[analysis_id])) + print_summary(args.path, len(analysis), failed_tests, invalid_specs) + return int(bool(failed_tests or invalid_specs)), invalid_specs - for spec_filename, spec_error in invalid_specs: - print("Invalid: {}\n\t{}\n".format(spec_filename, spec_error)) - return int(bool(failed_tests or invalid_specs)), invalid_specs +def print_summary(test_path: str, num_tests: int, failed_tests: List[Any], + invalid_specs: List[Any]) -> None: + '''Print a summary of passed, failed, and invalid specs''' + print('--------------------------') + print('Panther CLI Test Summary') + print('\tPath: {}'.format(test_path)) + print("\tPassed: {}".format(num_tests - + (len(failed_tests) + len(invalid_specs)))) + print("\tFailed: {}".format(len(failed_tests))) + print("\tInvalid: {}\n".format(len(invalid_specs))) + + err_message = "\t{}\n\t\t{}\n" + + if failed_tests: + print('--------------------------') + print('Failed Tests Summary') + for analysis_id in failed_tests: + print(err_message.format(analysis_id, failed_tests[analysis_id])) + + if invalid_specs: + print('--------------------------') + print('Invalid Tests Summary') + for spec_filename, spec_error in invalid_specs: + print(err_message.format(spec_filename, spec_error)) def filter_analysis(analysis: List[Any], filters: Dict[str, List]) -> List[Any]: @@ -316,6 +336,11 @@ def filter_analysis(analysis: List[Any], filters: Dict[str, List]) -> List[Any]: filtered_analysis = [] for file_name, dir_name, analysis_spec in analysis: + if fnmatch(dir_name, HELPERS_PATH_PATTERN): + logging.debug('auto-adding helpers file %s', + os.path.join(file_name)) + filtered_analysis.append((file_name, dir_name, analysis_spec)) + continue match = True for key, values in filters.items(): spec_value = analysis_spec.get(key, "") @@ -425,6 +450,7 @@ def setup_parser() -> argparse.ArgumentParser: required=False, metavar="KEY=VALUE", nargs='+') + test_parser.add_argument('--debug', action='store_true', dest='debug') test_parser.set_defaults(func=test_analysis) zip_parser = subparsers.add_parser( @@ -448,6 +474,7 @@ def setup_parser() -> argparse.ArgumentParser: required=False, metavar="KEY=VALUE", nargs='+') + zip_parser.add_argument('--debug', action='store_true', dest='debug') zip_parser.set_defaults(func=zip_analysis) upload_parser = subparsers.add_parser( @@ -470,6 +497,7 @@ def setup_parser() -> argparse.ArgumentParser: required=False, metavar="KEY=VALUE", nargs='+') + upload_parser.add_argument('--debug', action='store_true', dest='debug') upload_parser.set_defaults(func=upload_analysis) return parser @@ -499,11 +527,12 @@ def parse_filter(filters: List[str]) -> Dict[str, Any]: def run() -> None: - logging.basicConfig(format='[%(levelname)s]: %(message)s', - level=logging.INFO) - parser = setup_parser() args = parser.parse_args() + + logging.basicConfig(format='[%(levelname)s]: %(message)s', + level=logging.DEBUG if args.debug else logging.INFO) + try: if args.filter is not None: args.filter = parse_filter(args.filter) @@ -513,7 +542,7 @@ def run() -> None: sys.exit(1) except Exception as err: # pylint: disable=broad-except # Catch arbitrary exceptions without printing help message - logging.warning('Unexpected exception: "%s"', err) + logging.warning('Unhandled exception: "%s"', err) sys.exit(1) if return_code == 1: