diff --git a/.gitignore b/.gitignore
index b827105..14a555b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -130,3 +130,6 @@ dmypy.json
 .wdltest.changes.json
 .wdltest.submission.json
+
+# macOS
+.DS_Store
diff --git a/README.md b/README.md
index b0e3db6..f8a74ba 100644
--- a/README.md
+++ b/README.md
@@ -339,6 +339,7 @@ Test params can be used to avoid repeating paths and values for test inputs and
 - For parameters whose value is an object, object members can be accessed using `.`s: `"reference_index": ${reference.fasta.data_index}`
 - Global params will replace values for all engines
 - Engine params will replace values only when submitting to a particular engine; useful if for example input sets exist across multiple environments and are prefixed with different paths
+- For parameters with the same key, `engine_params` take precedence over `global_params`
 - Objects and arrays can be used for parameters; if you are using a complex parameter as an input or output value, this parameter must be the only content of the value, e.g. `"my_input": "${complex_param}"`, not `"my_input": "gs://my_bucket/${complex_param}"`
 - The values of complex parameters can themselves use parameters, and will be substituted appropriately
@@ -459,6 +460,31 @@ If a configuration file already exists, this will add workflows or tasks that do
 
 `wdl-ci generate-config [--remove]`
 
+## Calculate coverage
+
+Calculates percent coverage for each task and workflow. Task coverage is computed as the number of task outputs with at least one test divided by the total number of task outputs; workflow coverage is computed as the number of outputs with at least one test across all tasks in a given workflow divided by the total number of outputs across those tasks. Total coverage is also computed: the number of outputs with tests defined across all workflows divided by the total number of outputs across all workflows. If any tasks or outputs have no tests, a warning is printed listing the relevant outputs and associated tasks.
+
+An output covered by at least one test is considered 100% covered; if 3/4 outputs for a task have at least one test, that task has 75% test coverage.
+
+Example output:
+
+```
+workflowNameA coverage: 84%
+	taskA coverage: 75%
+	taskB coverage: 63%
+	taskC coverage: 100%
+	taskD coverage: 100%
+workflowNameB coverage: 100%
+	taskA coverage: 100%
+	taskB coverage: 100%
+...
+Total coverage across outputs and tasks among all workflows: 89%
+```
+
+`wdl-ci coverage` can be run with additional options to filter results and set thresholds. The `--workflow-name` option allows the user to specify a workflow name to filter the results, showing only the coverage for that specific workflow. The `--target-coverage` option takes a percentage as a float and sets a threshold; only tasks and workflows with coverage below this threshold will be displayed. If no tasks/workflows match the specified filter, a message will be printed to inform the user. Additionally, confirmation messages are displayed when all outputs are tested and when all tasks and workflows meet the target coverage.
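To make the coverage arithmetic described above concrete, here is a minimal, self-contained Python sketch (editorial annotation, not part of this changeset). The task names, output counts, and the `percent` helper are hypothetical illustrations, not wdl-ci internals:

```python
# Illustrative only: how per-task, per-workflow, and total coverage relate.
# Counts are made up; wdl-ci derives them from the WDL files and wdl-ci.config.json.

def percent(tested: int, total: int) -> float:
    # Percent of outputs that have at least one test.
    return (tested / total) * 100 if total > 0 else 0.0

# Hypothetical workflow with two tasks.
task_outputs = {"taskA": 4, "taskB": 2}    # total outputs per task
tested_outputs = {"taskA": 3, "taskB": 2}  # outputs with >= 1 test

task_coverage = {
    name: percent(tested_outputs[name], task_outputs[name]) for name in task_outputs
}  # taskA: 75.0, taskB: 100.0

workflow_coverage = percent(
    sum(tested_outputs.values()), sum(task_outputs.values())
)  # 5 of 6 outputs tested -> about 83.3

print(task_coverage)
print(f"workflow coverage: {workflow_coverage:.2f}%")
```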
+ +`wdl-ci coverage --workflow-name --target-coverage ` + ## Lint workflows `wdl-ci lint` diff --git a/src/wdlci/cli/__main__.py b/src/wdlci/cli/__main__.py index 2e4f37f..1388456 100644 --- a/src/wdlci/cli/__main__.py +++ b/src/wdlci/cli/__main__.py @@ -5,6 +5,7 @@ from wdlci.cli.submit import submit_handler from wdlci.cli.monitor import monitor_handler from wdlci.cli.cleanup import cleanup_handler +from wdlci.cli.coverage import coverage_handler from wdlci.utils.ordered_group import OrderedGroup remove_option = click.option( @@ -34,6 +35,23 @@ help="Do not exit upon encountering a linting warning or error", ) +target_coverage = click.option( + "--target-coverage", + "-t", + type=float, + default=None, + show_default=True, + help="Target coverage (%); only output tasks or workflows with test coverage below this threshold", +) + +workflow_name = click.option( + "--workflow-name", + "-w", + multiple=True, + show_default=True, + help="Set of workflows to filter by; should be the full path to this workflow (same as the key in the config file)", +) + @click.group(cls=OrderedGroup) @click.version_option( @@ -100,3 +118,18 @@ def cleanup(**kwargs): """Clean Workbench namespace of transient artifacts""" cleanup_handler(kwargs) + + +@main.command +@target_coverage +@workflow_name +# TODO: Add options for minimalist output; e.g., maybe hide warning output for: +# tests that don't have tests both excluding and including optional inputs +# list of outputs that are not tested for each task +# skipped workflows +# workflows that have outputs but no tests +# TODO: Add option to consider totally untested tasks/outputs as 0% coverage or ignore them +def coverage(**kwargs): + """Outputs percent coverage for each task and output, and which tasks/outputs have no associated tests""" + + coverage_handler(kwargs) diff --git a/src/wdlci/cli/coverage.py b/src/wdlci/cli/coverage.py new file mode 100644 index 0000000..ddc0b04 --- /dev/null +++ b/src/wdlci/cli/coverage.py @@ -0,0 +1,354 @@ +import WDL +import os +import sys + +from wdlci.config import Config +from wdlci.exception.wdl_test_cli_exit_exception import WdlTestCliExitException +from wdlci.utils.initialize_worklows_and_tasks import find_wdl_files + + +def coverage_handler(kwargs): + # Initialize dictionary with necessary variables to compute coverage + coverage_summary = { + "untested_workflows_list": [], + # {workflow_name: [task_name]} + "untested_tasks_dict": {}, + # {workflow_name: {task_name: [output_name]}} + "untested_outputs_dict": {}, + # {workflow_name: {task_name: [output_name]}} + "untested_outputs_with_optional_inputs_dict": {}, + # {workflow_name: {task_name: [output_name]}} + "tested_outputs_dict": {}, + "total_output_count": 0, + "all_tested_outputs_list": [], + "skipped_workflows_list": [], + } + + threshold = kwargs["target_coverage"] + workflow_name_filters = kwargs["workflow_name"] + print(f"Target coverage threshold: ", threshold) + print(f"Workflow name filters: {workflow_name_filters}\n") + + # TODO come back to this; compute on the fly at the end? 
+ tasks_below_threshold = False + workflows_below_threshold = False + + print("┍━━━━━━━━━━━━━┑") + print("│ Coverage │") + print("┕━━━━━━━━━━━━━┙") + try: + # Load the config file + Config.load(kwargs) + config = Config.instance() + + # Load all WDL files in the directory + wdl_files = find_wdl_files() + + wdl_files_filtered = [] + if len(workflow_name_filters) > 0: + for workflow_name in workflow_name_filters: + if workflow_name in wdl_files: + wdl_files_filtered.append(workflow_name) + else: + raise WdlTestCliExitException( + f"No workflows found matching the filter: [{workflow_name}]. Possible workflow options are:\n{wdl_files}", + 1, + ) + else: + wdl_files_filtered = [workflow_name for workflow_name in wdl_files] + + # Iterate over each WDL file + for workflow_name in wdl_files_filtered: + workflow_tested_outputs_list = [] + workflow_output_count = 0 + + # Load the WDL document + doc = WDL.load(workflow_name) + + # Handle the case where the WDL file is not in the configuration but is present in the directory + if workflow_name not in config._file.workflows: + coverage_summary["skipped_workflows_list"].append(workflow_name) + continue + + # Check if the WDL document has > 0 tasks or a workflow attribute exists; structs might be part of the config and do not have tasks nor do they have outputs to test. Additionally, just checking for > 0 tasks misses parent workflows that just import and call other tasks/workflows. TBD if we want to include these 'parent' workflows, but ultimately, if there are no tasks or a workflow attribute, we skip the WDL file and print a warning + if len(doc.tasks) > 0 or doc.workflow is not None: + # Iterate over each task in the WDL document + for task in doc.tasks: + # Add to counters for total output count and workflow output count + coverage_summary["total_output_count"] += len(task.outputs) + workflow_output_count += len(task.outputs) + # Initialize a list of task test dictionaries + task_tests_list = [] + + # Create a list of dictionaries for each set of task tests in our config file + task_tests_list = ( + config._file.workflows[workflow_name].tasks[task.name].tests + ) + + # We are reducing the set of tested_outputs (output names) across input sets for the same task, ie if the same output is tested multiple times with different inputs, we'll count it as tested + # Create a list of all the outputs that are tested in the config file and found in the task output_tests dictionary + tested_outputs = list( + set( + [ + output_name + for test in task_tests_list + for output_name, output_test in test.output_tests.items() + # Handle the case where test_tasks exists but is an empty list + if len(output_test.get("test_tasks")) > 0 + ] + ) + ) + + # Update coverage_summary with tested outputs for each task + if len(tested_outputs) > 0: + coverage_summary = _update_coverage_summary( + coverage_summary, + "tested_outputs_dict", + workflow_name, + task.name, + output_names=tested_outputs, + ) + + # Create a list of all the outputs that are present in the task + all_task_outputs = [output.name for output in task.outputs] + + # Create a list of outputs that are present in the task but not in the config file + missing_outputs = [ + output_name + for output_name in all_task_outputs + if output_name not in tested_outputs + ] + + # Add tested outputs to workflow_tested_outputs_list and all_tested_outputs_list + workflow_tested_outputs_list.extend(tested_outputs) + coverage_summary["all_tested_outputs_list"].extend(tested_outputs) + + # Add missing outputs to the 
coverage_summary[untested_outputs] dictionary if there are any missing outputs + if len(missing_outputs) > 0: + coverage_summary = _update_coverage_summary( + coverage_summary, + "untested_outputs_dict", + workflow_name, + task.name, + output_names=missing_outputs, + ) + + # Check for optional inputs and check if there is a test that covers running that task with the optional input and without it + optional_inputs = [ + input.name for input in task.inputs if input.type.optional + ] + optional_inputs_not_dually_tested = [] + for optional_input in optional_inputs: + test_exists_with_optional_set = False + test_exists_with_optional_not_set = False + for task_test in task_tests_list: + if optional_input in task_test.inputs: + test_exists_with_optional_set = True + else: + test_exists_with_optional_not_set = True + + if not ( + test_exists_with_optional_set + and test_exists_with_optional_not_set + ): + optional_inputs_not_dually_tested.extend( + [output.name for output in task.outputs] + ) + + coverage_summary = _update_coverage_summary( + coverage_summary, + "untested_outputs_with_optional_inputs_dict", + workflow_name, + task.name, + output_names=list(set(optional_inputs_not_dually_tested)), + ) + + # If there are outputs but no tests, add the task to the untested_tasks list. If there are outputs and tests, calculate the task coverage + if len(task.outputs) > 0: + # Handle the case where the task is in the config but has no associated tests + if len(task_tests_list) == 0: + coverage_summary = _update_coverage_summary( + coverage_summary, + "untested_tasks_dict", + workflow_name, + task.name, + ) + else: + # Calculate and print the task coverage + task_coverage = ( + len( + coverage_summary["tested_outputs_dict"][ + workflow_name + ][task.name] + ) + / len(task.outputs) + ) * 100 + if threshold is not None and task_coverage < threshold: + tasks_below_threshold = True + print(f"\ntask.{task.name}: {task_coverage:.2f}%") + else: + print(f"\ntask.{task.name}: {task_coverage:.2f}%") + + # Calculate workflow coverage; only calculate if there are outputs and tests for the workflow. 
If there are no outputs or tests but there is a workflow block and name, add the workflow to the untested_workflows list + # Need to make sure there is a valid workflow and that the workflow has a name; avoids trying to calculate coverage for struct workflows + if workflow_output_count > 0 and len(workflow_tested_outputs_list) > 0: + workflow_coverage = ( + len(workflow_tested_outputs_list) / workflow_output_count + ) * 100 + if threshold is not None and workflow_coverage < threshold: + workflows_below_threshold = True + print( + f"\n" + + f"\033[34mWorkflow: {workflow_name}: {workflow_coverage:.2f}%\033[0m" + ) + print("-" * 150) + elif ( + workflow_output_count == 0 or len(workflow_tested_outputs_list) == 0 + ): + coverage_summary["untested_workflows_list"].append(workflow_name) + + # Append the workflow to the skipped_workflows list if there are no tasks or workflow blocks + else: + coverage_summary["skipped_workflows_list"].append(workflow_name) + + # Calculate and print the total coverage + if ( + len(coverage_summary["all_tested_outputs_list"]) > 0 + and coverage_summary["total_output_count"] > 0 + ): + total_coverage = ( + len(coverage_summary["all_tested_outputs_list"]) + / coverage_summary["total_output_count"] + ) * 100 + print("\n" + f"\033[33mTotal coverage: {total_coverage:.2f}%\033[0m") + else: + print("There are no outputs to compute coverage for.") + + # Sum the total number of untested outputs and untested outputs with optional inputs where there is not a test for the output with and without the optional input + total_untested_outputs = _sum_outputs(coverage_summary, "untested_outputs_dict") + total_untested_outputs_with_optional_inputs = _sum_outputs( + coverage_summary, "untested_outputs_with_optional_inputs_dict" + ) + total_tested_outputs = _sum_outputs(coverage_summary, "tested_outputs_dict") + if total_tested_outputs > 0: + print("\n The following outputs are tested:") + _print_coverage_items(coverage_summary, "tested_outputs_dict") + + # Check if any outputs are below the threshold and there are no untested outputs; if so return to the user that all outputs exceed the threshold + ## TODO: rephrase / assess if needed + if _check_threshold(tasks_below_threshold, total_untested_outputs, threshold): + print("\n✓ All outputs are tested.") + + if ( + total_untested_outputs > 0 + or total_untested_outputs_with_optional_inputs > 0 + or len(coverage_summary["untested_tasks_dict"]) > 0 + or len(coverage_summary["untested_workflows_list"]) > 0 + or len(coverage_summary["skipped_workflows_list"]) + ): + print("┍━━━━━━━━━━━━━┑") + print("│ Warning(s) │") + print("┕━━━━━━━━━━━━━┙") + + # Warn the user if any outputs have no tests + if total_untested_outputs > 0: + print("\n" + "\033[31m[WARN]: The following outputs have no tests:\033[0m") + _print_coverage_items(coverage_summary, "untested_outputs_dict") + + # Warn the user if any outputs are part of a task that contains an optional input and are not covered by at least two tests (one for each case where the optional input is and is not provided) + if total_untested_outputs_with_optional_inputs > 0: + print( + "\n" + + "\033[31m[WARN]: The following outputs are part of a task that contains an optional input and are not covered by at least two tests; they should be covered for both cases where the optional input is and is not provided:\033[0m" + ) + _print_coverage_items( + coverage_summary, "untested_outputs_with_optional_inputs_dict" + ) + + # Check if any tasks are below the threshold and there are no untested tasks; if so, 
return to the user that all tasks exceed the threshold + if _check_threshold( + tasks_below_threshold, + len(coverage_summary["untested_tasks_dict"]), + threshold, + ): + print("\n✓ All tasks exceed the specified coverage threshold.") + + # Warn the user if any tasks have no tests + if len(coverage_summary["untested_tasks_dict"]) > 0: + print("\n" + "\033[31m[WARN]: The following tasks have no tests:\033[0m") + for workflow, tasks in coverage_summary["untested_tasks_dict"].items(): + for task in tasks: + print(f"\t{workflow}.{task}") + + # Check if any workflows are below the threshold and there are no untested workflows; if so, return to the user that all workflows exceed the threshold + if _check_threshold( + workflows_below_threshold, + len(coverage_summary["untested_workflows_list"]), + threshold, + ): + print("\n✓ All workflows exceed the specified coverage threshold.") + + # If there are any workflows that are below the threshold, warn the user. Include a check for the workflow_name_filter to ensure that only the specified workflow is printed if a filter is provided + else: + if len(coverage_summary["untested_workflows_list"]) > 0: + print( + "\n" + + "\033[31m[WARN]: The following workflows have outputs but no tests:\033[0m" + ) + for workflow in coverage_summary["untested_workflows_list"]: + print(f"\t{workflow}") + + # Warn the user if any workflows were skipped + if len(coverage_summary["skipped_workflows_list"]) > 0: + print( + "\n" + + "\033[31m[WARN]: The following workflows were skipped as they were not found in the wdl-ci.config.json but are present in the directory, or they were present in the config JSON had no task blocks:\033[0m" + ) + for workflow in coverage_summary["skipped_workflows_list"]: + print(f"\t{workflow}") + + except WdlTestCliExitException as e: + print(f"exiting with code {e.exit_code}, message: {e.message}") + sys.exit(e.exit_code) + + # Reset config regardless of try and except outcome + finally: + config.reset() + + +# Helper functions +def _sum_outputs(coverage_summary, key): + """ + Returns: + (int): The number of outputs for the given category + """ + return sum( + len(outputs) + for tasks in coverage_summary[key].values() + for outputs in tasks.values() + ) + + +def _update_coverage_summary(coverage_summary, key, workflow_name, task_name, **kwargs): + output_names = kwargs.get("output_names", []) + if workflow_name not in coverage_summary[key]: + coverage_summary[key][workflow_name] = {} + if task_name not in coverage_summary[key][workflow_name]: + if len(output_names) > 0: + coverage_summary[key][workflow_name][task_name] = output_names + else: + coverage_summary[key][workflow_name][task_name] = [] + return coverage_summary + + +def _print_coverage_items(coverage_summary, key): + for workflow, tasks in coverage_summary[key].items(): + for task, outputs in tasks.items(): + if len(outputs) > 0: + print(f"\t{workflow}.{task}: {outputs}") + + +def _check_threshold(below_threshold_flag, untested_count, threshold): + return ( + below_threshold_flag is False and untested_count == 0 and threshold is not None + ) diff --git a/src/wdlci/cli/generate_config.py b/src/wdlci/cli/generate_config.py index 7dee502..db3ff2c 100644 --- a/src/wdlci/cli/generate_config.py +++ b/src/wdlci/cli/generate_config.py @@ -2,6 +2,7 @@ import WDL from wdlci.config.config_file import WorkflowConfig, WorkflowTaskConfig from wdlci.constants import CONFIG_JSON +from wdlci.utils.initialize_worklows_and_tasks import find_wdl_files import os.path @@ -19,14 +20,7 @@ def 
generate_config_handler(kwargs, update_task_digests=False): config = Config.instance() # Initialize workflows and tasks - cwd = os.getcwd() - wdl_files = [] - for root_path, subfolders, filenames in os.walk(cwd): - for filename in filenames: - if filename.endswith(".wdl"): - wdl_files.append( - os.path.relpath(os.path.join(root_path, filename), cwd) - ) + wdl_files = find_wdl_files() for workflow_path in wdl_files: if workflow_path not in config.file.workflows: diff --git a/src/wdlci/cli/submit.py b/src/wdlci/cli/submit.py index 7ee71a2..7bca484 100644 --- a/src/wdlci/cli/submit.py +++ b/src/wdlci/cli/submit.py @@ -67,6 +67,9 @@ def submit_handler(kwargs): # register workflow(s) tasks_to_test = dict() + workflow_outputs = [] + missing_outputs_list = [] + for workflow_key in changeset.get_workflow_keys(): for task_key in changeset.get_tasks(workflow_key): doc = WDL.load(workflow_key) @@ -77,9 +80,22 @@ def submit_handler(kwargs): # Register and create workflows for all tasks with tests for test_index, test_input_set in enumerate(task.tests): doc_main_task = doc_tasks[task_key] + # Create list of workflow output names + for output in doc_main_task.outputs: + workflow_outputs.append(output.name) output_tests = test_input_set.output_tests + missing_outputs_list = [ + key for key in output_tests if key not in workflow_outputs + ] + + if len(missing_outputs_list) > 0: + raise WdlTestCliExitException( + f"Expected output(s): [{', '.join(missing_outputs_list)}] not found in task: {doc_main_task.name}; has this output been removed from the workflow?\nIf so, you will need to remove this output from the wdl-ci.config.json before proceeding.", + 1, + ) + # Skip input sets with no test tasks defined for any output all_test_tasks = [ test_task diff --git a/src/wdlci/config/__init__.py b/src/wdlci/config/__init__.py index 5a5136a..17f8974 100644 --- a/src/wdlci/config/__init__.py +++ b/src/wdlci/config/__init__.py @@ -37,6 +37,11 @@ def instance(cls): raise WdlTestCliExitException("Config not loaded, use load() first") return cls._instance + @classmethod + def reset(cls): + cls._cli_kwargs = None + cls._instance = None + def write(self): with open(CONFIG_JSON, "w") as f: json.dump(self.file, f, cls=ConfigEncoder, indent=2) diff --git a/src/wdlci/utils/initialize_worklows_and_tasks.py b/src/wdlci/utils/initialize_worklows_and_tasks.py new file mode 100644 index 0000000..6af5cdd --- /dev/null +++ b/src/wdlci/utils/initialize_worklows_and_tasks.py @@ -0,0 +1,13 @@ +import os.path + + +def find_wdl_files(): + cwd = os.getcwd() + wdl_files = [] + for root_path, _, filenames in os.walk(cwd): + for filename in filenames: + if filename.endswith(".wdl"): + wdl_files.append( + os.path.relpath(os.path.join(root_path, filename), cwd) + ) + return wdl_files diff --git a/tests/test_coverage_handler.py b/tests/test_coverage_handler.py new file mode 100644 index 0000000..cafa7ee --- /dev/null +++ b/tests/test_coverage_handler.py @@ -0,0 +1,517 @@ +import unittest +import os +import subprocess +import json +import warnings +import sys +from io import StringIO +from unittest.mock import patch + +from src.wdlci.cli.coverage import coverage_handler, coverage_summary + + +class TestCoverageHandler(unittest.TestCase): + EXAMPLE_WDL_WORKFLOW = """ + version 1.0 + + struct Reference { + File fasta + String organism + } + + workflow call_variants { + input { + File bam + Reference ref + String input_str + } + + call freebayes as freebayes_1 { + input: + bam=bam, + ref=ref + } + + call hello_world { + input: + input_str = 
input_str + } + + output { + File vcf = freebayes_1.vcf + File greeting = hello_world.greeting + } + } + + task freebayes { + input { + File bam + Reference ref + Float? min_alternate_fraction + } + + String prefix = basename(bam, ".bam") + Float default_min_alternate_fraction = select_first([min_alternate_fraction, 0.2]) + + command <<< + freebayes -v '~{prefix}.vcf' -f ~{ref.fasta} \ + -F ~{default_min_alternate_fraction} \ + ~{bam} + >>> + + runtime { + docker: "quay.io/biocontainers/freebayes:1.3.2--py36hc088bd4_0" + } + + output { + File vcf = "${prefix}.vcf" + } + } + + task hello_world { + input { + String input_str + } + + command <<< + echo ~{input_str} > output_str.txt + >>> + + runtime { + docker: "ubuntu:xenial" + } + + output { + File greeting = "output_str.txt" + File unused_greeting = "unused_output_str.txt" + } + } +""" + + def setUp(self): + # Redirect stdout to hide the output of the coverage command and just see the test results + self._original_stdout = sys.stdout + sys.stdout = open(os.devnull, "w") + # Suppress the ResourceWarning complaining about the config_file json.load not being closed + warnings.simplefilter("ignore", ResourceWarning) + # Create the WDL files with different workflow names + wdl_workflow_1 = self.EXAMPLE_WDL_WORKFLOW.replace( + "call_variants", "call_variants_1" + ) + wdl_workflow_2 = self.EXAMPLE_WDL_WORKFLOW.replace( + "call_variants", "call_variants_2" + ) + + with open("test_call-variants_1.wdl", "w") as f: + f.write(wdl_workflow_1) + with open("test_call-variants_2.wdl", "w") as f: + f.write(wdl_workflow_2) + # Run the wdl-ci generate-config subcommand + subprocess.run( + ["wdl-ci", "generate-config"], check=True, stdout=subprocess.DEVNULL + ) + + def tearDown(self): + # Remove the WDL files + if os.path.exists("test_call-variants_1.wdl"): + os.remove("test_call-variants_1.wdl") + if os.path.exists("test_call-variants_2.wdl"): + os.remove("test_call-variants_2.wdl") + if os.path.exists("wdl-ci.config.json"): + os.remove("wdl-ci.config.json") + sys.stdout.close() + sys.stdout = self._original_stdout + self.reset_coverage_summary() + + def update_config_with_tests(self, workflow_name, task_name, wdl_tests): + # Read the existing config file + with open("wdl-ci.config.json", "r") as f: + config = json.load(f) + + config["workflows"][workflow_name]["tasks"][task_name]["tests"] = wdl_tests + + # Write the updated config back to the file + with open("wdl-ci.config.json", "w") as f: + json.dump(config, f, indent=2) + + def reset_coverage_summary(self): + coverage_summary["untested_workflows_list"] = [] + coverage_summary["untested_tasks_dict"] = {} + coverage_summary["untested_outputs_dict"] = {} + coverage_summary["untested_outputs_with_optional_inputs_dict"] = {} + coverage_summary["tested_outputs_dict"] = {} + coverage_summary["total_output_count"] = 0 + coverage_summary["all_outputs_list"] = [] + coverage_summary["skipped_workflows_list"] = [] + + def test_identical_output_names_with_threshold(self): + test_cases = [ + { + "inputs": { + "bam": "test.bam", + "ref": {"fasta": "test.fasta", "organism": "test_organism"}, + }, + "output_tests": { + "vcf": { + "value": "test.vcf", + "test_tasks": ["compare_file_basename"], + } + }, + } + ] + self.update_config_with_tests( + workflow_name="test_call-variants_1.wdl", + task_name="freebayes", + wdl_tests=test_cases, + ) + self.update_config_with_tests( + workflow_name="test_call-variants_2.wdl", + task_name="freebayes", + wdl_tests=test_cases, + ) + + # Call the coverage_handler function + kwargs = 
{"target_coverage": 50, "workflow_name": None} + coverage_handler(kwargs) + + # Assert both workflows are not in the untested workflows list + self.assertNotIn("call_variants_1", coverage_summary["untested_workflows_list"]) + self.assertNotIn("call_variants_2", coverage_summary["untested_workflows_list"]) + # Assert that the corresponding task is not in the untested tasks dictionary + self.assertNotIn("freebayes", coverage_summary["untested_tasks_dict"]) + # Assert that "vcf" is found in both sets of {workflow: {task: [tested_outputs]}} in the parent untested_outputs_with_optional_inputs_dict + self.assertIn( + "vcf", + coverage_summary["untested_outputs_with_optional_inputs_dict"][ + "call_variants_1" + ]["freebayes"], + ) + self.assertIn( + "vcf", + coverage_summary["untested_outputs_with_optional_inputs_dict"][ + "call_variants_2" + ]["freebayes"], + ) + # Assert that "vcf" is found in both sets of {workflow: {task: [tested_outputs]}} in the parent tested_output_dict + self.assertIn( + "vcf", + coverage_summary["tested_outputs_dict"]["call_variants_1"]["freebayes"], + ) + self.assertIn( + "vcf", + coverage_summary["tested_outputs_dict"]["call_variants_2"]["freebayes"], + ) + + # Case where no tasks are tested at all + def test_no_tasks_in_workflow(self): + # Update the "tests" list for specific workflows + test_cases = [] + self.update_config_with_tests( + workflow_name="test_call-variants_1.wdl", + task_name="freebayes", + wdl_tests=test_cases, + ) + self.update_config_with_tests( + workflow_name="test_call-variants_2.wdl", + task_name="freebayes", + wdl_tests=test_cases, + ) + # Call the coverage_handler function + kwargs = { + "target_coverage": None, + "workflow_name": None, + } + coverage_handler(kwargs) + + # Assert all four outputs are untested + self.assertEqual( + sum( + len(tasks) + for tasks in coverage_summary["untested_outputs_dict"].values() + ), + 4, + ) + + # Assert both outputs with optional inputs are not dually tested + self.assertEqual( + sum( + len(tasks) + for tasks in coverage_summary[ + "untested_outputs_with_optional_inputs_dict" + ].values() + ), + 2, + ) + # Assert all four tasks are untested + self.assertEqual( + sum( + len(tasks) for tasks in coverage_summary["untested_tasks_dict"].values() + ), + 4, + ) + # Assert both workflows are untested + self.assertGreaterEqual(len(coverage_summary["untested_workflows_list"]), 2) + + # Providing a valid workflow name to --workflow name where the workflow exists, but has no tests + def test_valid_workflow_name_with_no_tasks(self): + test_cases = [] + # Update the workflow we are NOT filtering for + self.update_config_with_tests( + workflow_name="test_call-variants_1.wdl", + task_name="hello_world", + wdl_tests=test_cases, + ) + kwargs = {"target_coverage": None, "workflow_name": "call_variants_2"} + coverage_handler(kwargs) + + # Assert one workflow is untested (in reality both are, but we are filtering for one) + self.assertEqual(len(coverage_summary["untested_workflows_list"]), 1) + + # Assert both tasks in the workflow we are filtering for are untested + self.assertEqual( + sum( + len(tasks) for tasks in coverage_summary["untested_tasks_dict"].values() + ), + 2, + ) + + # Case where some tasks with optional inputs have outputs dually tested but others do not + def test_handling_of_optional_inputs(self): + dually_tested_optional_input_test_cases = [ + { + "inputs": { + "bam": "test.bam", + "ref": {"fasta": "test.fasta", "organism": "test_organism"}, + "min_alternate_fraction": 0.5, + }, + "output_tests": { + "vcf": { 
+ "value": "test.vcf", + "test_tasks": ["compare_file_basename"], + } + }, + }, + { + "inputs": { + "bam": "test.bam", + "ref": {"fasta": "test.fasta", "organism": "test_organism"}, + }, + "output_tests": { + "vcf": { + "value": "test.vcf", + "test_tasks": ["compare_file_basename"], + } + }, + }, + ] + + untested_optionals_test_cases = [ + { + "inputs": { + "bam": "test.bam", + "ref": {"fasta": "test.fasta", "organism": "test_organism"}, + "min_alternate_fraction": 0.5, + }, + "output_tests": { + "vcf": { + "value": "test.vcf", + "test_tasks": ["compare_file_basename"], + } + }, + } + ] + + self.update_config_with_tests( + workflow_name="test_call-variants_1.wdl", + task_name="freebayes", + wdl_tests=dually_tested_optional_input_test_cases, + ) + self.update_config_with_tests( + workflow_name="test_call-variants_2.wdl", + task_name="freebayes", + wdl_tests=untested_optionals_test_cases, + ) + kwargs = {"target_coverage": None, "workflow_name": None} + coverage_handler(kwargs) + + self.assertEqual( + sum( + len(outputs) + for outputs in coverage_summary[ + "untested_outputs_with_optional_inputs_dict" + ].values() + ), + 1, + ) + self.assertEqual( + list(coverage_summary["untested_outputs_with_optional_inputs_dict"].keys()), + ["call_variants_2"], + ) + + # Threshold testing cases + # Test case where workflow exceeds target coverage but outputs and tasks do not + def test_workflow_exceed_threshold(self): + workflow_threshold_pass_test_cases = [ + { + "inputs": { + "bam": "test.bam", + "ref": {"fasta": "test.fasta", "organism": "test_organism"}, + }, + "output_tests": { + "vcf": { + "value": "test.vcf", + "test_tasks": ["compare_file_basename"], + } + }, + } + ] + + self.update_config_with_tests( + workflow_name="test_call-variants_1.wdl", + task_name="freebayes", + wdl_tests=workflow_threshold_pass_test_cases, + ) + with patch("sys.stdout", new=StringIO()) as fake_out: + kwargs = {"target_coverage": 30, "workflow_name": "call_variants_1"} + coverage_handler(kwargs) + coverage_handler_stdout = fake_out.getvalue() + + expected_workflow_pass_message = ( + "All workflows exceed the specified coverage threshold" + ) + self.assertIn(expected_workflow_pass_message, coverage_handler_stdout) + + # Test case where workflow and tasks exceeds target coverage but outputs not + def test_workflow_and_tasks_exceed_threshold(self): + freebayes_workflow_and_task_threshold_pass_test_cases = [ + { + "inputs": { + "bam": "test.bam", + "ref": {"fasta": "test.fasta", "organism": "test_organism"}, + }, + "output_tests": { + "vcf": { + "value": "test.vcf", + "test_tasks": ["compare_file_basename"], + } + }, + } + ] + hello_world_workflow_and_task_threshold_pass_test_cases = [ + { + "inputs": {"input_str": "test"}, + "output_tests": { + "greeting": { + "value": "output_str_test.txt", + "test_tasks": ["compare_file_basename"], + } + }, + }, + ] + + self.update_config_with_tests( + workflow_name="test_call-variants_2.wdl", + task_name="freebayes", + wdl_tests=freebayes_workflow_and_task_threshold_pass_test_cases, + ) + self.update_config_with_tests( + workflow_name="test_call-variants_2.wdl", + task_name="hello_world", + wdl_tests=hello_world_workflow_and_task_threshold_pass_test_cases, + ) + with patch("sys.stdout", new=StringIO()) as fake_out: + kwargs = {"target_coverage": 45, "workflow_name": "call_variants_2"} + coverage_handler(kwargs) + coverage_handler_stdout = fake_out.getvalue() + + expected_workflow_pass_message = ( + "All workflows exceed the specified coverage threshold" + ) + 
self.assertIn(expected_workflow_pass_message, coverage_handler_stdout) + expected_task_pass_message = "All tasks exceed the specified coverage threshold" + self.assertIn(expected_task_pass_message, coverage_handler_stdout) + + # Test case where workflow, tasks, and outputs exceeds target coverage + def test_workflow_and_tasks_and_outputs_exceed_threshold(self): + freebayes_workflow_and_task_threshold_pass_test_cases = [ + { + "inputs": { + "bam": "test.bam", + "ref": {"fasta": "test.fasta", "organism": "test_organism"}, + }, + "output_tests": { + "vcf": { + "value": "test.vcf", + "test_tasks": ["compare_file_basename"], + } + }, + } + ] + hello_world_workflow_and_task_threshold_pass_test_cases = [ + { + "inputs": {"input_str": "test"}, + "output_tests": { + "greeting": { + "value": "output_str_test.txt", + "test_tasks": ["compare_file_basename"], + } + }, + }, + { + "inputs": {"input_str": "test"}, + "output_tests": { + "unused_greeting": { + "value": "output_str_test.txt", + "test_tasks": ["compare_file_basename"], + } + }, + }, + ] + + self.update_config_with_tests( + workflow_name="test_call-variants_2.wdl", + task_name="freebayes", + wdl_tests=freebayes_workflow_and_task_threshold_pass_test_cases, + ) + self.update_config_with_tests( + workflow_name="test_call-variants_2.wdl", + task_name="hello_world", + wdl_tests=hello_world_workflow_and_task_threshold_pass_test_cases, + ) + with patch("sys.stdout", new=StringIO()) as fake_out: + kwargs = {"target_coverage": 45, "workflow_name": "call_variants_2"} + coverage_handler(kwargs) + coverage_handler_stdout = fake_out.getvalue() + + expected_workflow_pass_message = ( + "All workflows exceed the specified coverage threshold" + ) + self.assertIn(expected_workflow_pass_message, coverage_handler_stdout) + expected_task_pass_message = "All tasks exceed the specified coverage threshold" + self.assertIn(expected_task_pass_message, coverage_handler_stdout) + expected_output_pass_message = ( + "All outputs exceed the specified coverage threshold" + ) + self.assertIn(expected_output_pass_message, coverage_handler_stdout) + + +if __name__ == "__main__": + unittest.main() + + # Providing an invalid workflow name to --workflow-name + # # TODO: this doesn't work as the config doesn't get reset when the workflow exits with the No workflows found method -- adjust config reset to support + # def test_invalid_workflow_name(self): + # kwargs = {"target_coverage": None, "workflow_name": "nonexistent_workflow"} + + # coverage_handler(kwargs) + # output = sys.stdout + + # # Assert that the output contains the expected message + # expected_message = f"No workflows found matching the filter: [{kwargs['workflow_name']}] or the workflow you searched for has no tasks or workflow attribute" + # self.assertIn(expected_message, output) + + #### Additional tests I'd like to add #### + # Test workflows with >1 tasks + # Test WDL files with >1 tasks but no 'workflow' block + # Providing an extremely large wdl-ci.config.json + # Test case where a WDL file is not in the configuration but is present in the directory and make sure it's appended to the skipped workflows list
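Both `coverage_handler` and `test_handling_of_optional_inputs` above hinge on the rule that a task's outputs only count as fully covered with respect to an optional input when at least one test sets that input and at least one test omits it. The following is a minimal standalone sketch of that rule under stated assumptions: plain dictionaries stand in for wdl-ci's test objects and the data is hypothetical; it is not the code path used by `coverage_handler` itself.

```python
# Standalone sketch of the "dually tested" rule for optional inputs:
# an optional input is considered exercised only if at least one test sets it
# and at least one test omits it. Test data below is hypothetical.

def optional_input_dually_tested(optional_input: str, tests: list) -> bool:
    set_in_some_test = any(optional_input in test["inputs"] for test in tests)
    omitted_in_some_test = any(optional_input not in test["inputs"] for test in tests)
    return set_in_some_test and omitted_in_some_test


tests = [
    {"inputs": {"bam": "test.bam", "min_alternate_fraction": 0.5}},
    {"inputs": {"bam": "test.bam"}},
]

print(optional_input_dually_tested("min_alternate_fraction", tests))      # True
print(optional_input_dually_tested("min_alternate_fraction", tests[:1]))  # False
```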