From a698f45465abc59e8e533d749980a914c01c824c Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 26 Sep 2024 12:48:16 -0400 Subject: [PATCH] Sniffing for and workaround for keeping output of croo workflows (#5096) * Stack up scatters and conditionals to find tasks and fix #5094 * Use conformance tests with correct failure expectation * Add --allCallOutputs to add call outputs to workflow output * Default to including all task outputs for croo workflows * Document new option and missing option * Improve option doc wording --------- Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> Co-authored-by: Lon Blauvelt --- docs/wdl/running.rst | 12 ++++ src/toil/options/wdl.py | 7 +- src/toil/test/wdl/testfiles/croo.wdl | 38 +++++++++++ .../test/wdl/testfiles/not_enough_outputs.wdl | 33 ++++++++++ src/toil/test/wdl/wdltoil_test.py | 66 ++++++++++++++++++- src/toil/wdl/wdltoil.py | 44 ++++++++++--- 6 files changed, 189 insertions(+), 11 deletions(-) create mode 100644 src/toil/test/wdl/testfiles/croo.wdl create mode 100644 src/toil/test/wdl/testfiles/not_enough_outputs.wdl diff --git a/docs/wdl/running.rst b/docs/wdl/running.rst index 6274a68f32..79fab3160b 100644 --- a/docs/wdl/running.rst +++ b/docs/wdl/running.rst @@ -68,10 +68,22 @@ input JSON file, for compatibility with other WDL runners. ``cromwell`` to just return the workflow's output values as JSON or ``miniwdl`` to nest that under an ``outputs`` key and includes a ``dir`` key. +``--referenceInputs``: Specifies whether input files to Toil should be passed +around by URL reference instead of being imported into Toil's storage. Defaults +to off. Can be ``True`` or ``False`` or other similar words. + ``--container``: Specifies the container engine to use to run tasks. By default this is ``auto``, which tries Singularity if it is installed and Docker if it isn't. Can also be set to ``docker`` or ``singularity`` explicitly. 
+``--allCallOutputs``: Specifies whether outputs from all calls in a workflow +should be included alongside the outputs from the ``output`` section, when an +``output`` section is defined. For strict WDL spec compliance, should be set to +``False``. Usually defaults to ``False``. If the workflow includes metadata for +the `Cromwell Output Organizer (croo)`_, will default to ``True``. + +.. _`Cromwell Output Organizer (croo)`: https://github.com/ENCODE-DCC/croo + Any number of other Toil options may also be specified. For defined Toil options, see :ref:`commandRef`. diff --git a/src/toil/options/wdl.py b/src/toil/options/wdl.py index 09d6a9ecfe..6c40cf0e07 100644 --- a/src/toil/options/wdl.py +++ b/src/toil/options/wdl.py @@ -2,6 +2,8 @@ from configargparse import SUPPRESS +from toil.lib.conversions import strtobool + def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None: """ @@ -30,8 +32,11 @@ def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None: parser.add_argument(*output_file_arguments, dest="output_file", type=str, default=None, help=suppress_help or "File or URI to save output JSON to.") reference_inputs_arguments = ["--wdlReferenceInputs"] + (["--referenceInputs"] if not suppress else []) - parser.add_argument(*reference_inputs_arguments, dest="reference_inputs", type=bool, default=False, + parser.add_argument(*reference_inputs_arguments, dest="reference_inputs", type=strtobool, default=False, help=suppress_help or "Pass input files by URL") container_arguments = ["--wdlContainer"] + (["--container"] if not suppress else []) parser.add_argument(*container_arguments, dest="container", type=str, choices=["singularity", "docker", "auto"], default="auto", help=suppress_help or "Container engine to use to run WDL tasks") + all_call_outputs_arguments = ["--wdlAllCallOutputs"] + (["--allCallOutputs"] if not suppress else []) + parser.add_argument(*all_call_outputs_arguments, dest="all_call_outputs", type=strtobool, 
default=None, + help=suppress_help or "Keep and return all call outputs as workflow outputs") diff --git a/src/toil/test/wdl/testfiles/croo.wdl b/src/toil/test/wdl/testfiles/croo.wdl new file mode 100644 index 0000000000..bca2b9a253 --- /dev/null +++ b/src/toil/test/wdl/testfiles/croo.wdl @@ -0,0 +1,38 @@ +version 1.0 + +workflow wf { + meta { + # Advertise as needing the Cromwell Output Organizer + croo_out_def: 'https://storage.googleapis.com/encode-pipeline-output-definition/atac.croo.v5.json' + } + + input { + } + + call do_math { + input: + number = 3 + } + + Int should_never_output = do_math.cube - 1 + + output { + Int only_result = do_math.square + } +} + +task do_math { + input { + Int number + } + + # Not allowed to not have a command + command <<< + >>> + + output { + Int square = number * number + Int cube = number * number * number + } +} + diff --git a/src/toil/test/wdl/testfiles/not_enough_outputs.wdl b/src/toil/test/wdl/testfiles/not_enough_outputs.wdl new file mode 100644 index 0000000000..48986661b7 --- /dev/null +++ b/src/toil/test/wdl/testfiles/not_enough_outputs.wdl @@ -0,0 +1,33 @@ +version 1.0 + +workflow wf { + input { + } + + call do_math { + input: + number = 3 + } + + Int should_never_output = do_math.cube - 1 + + output { + Int only_result = do_math.square + } +} + +task do_math { + input { + Int number + } + + # Not allowed to not have a command + command <<< + >>> + + output { + Int square = number * number + Int cube = number * number * number + } +} + diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 83b7d4dc87..79a521714f 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -166,7 +166,7 @@ def test_url_to_file(self): assert 'url_to_file.first_line' in result assert isinstance(result['url_to_file.first_line'], str) self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328') - + @needs_docker def test_wait(self): """ @@ -182,6 +182,70 @@ def 
test_wait(self): assert isinstance(result['wait.result'], str) self.assertEqual(result['wait.result'], 'waited') + @needs_singularity_or_docker + def test_all_call_outputs(self): + """ + Test if Toil can collect all call outputs from a workflow that doesn't expose them. + """ + wdl = os.path.abspath('src/toil/test/wdl/testfiles/not_enough_outputs.wdl') + + # With no flag we don't include the call outputs + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' not in result + assert 'wf.do_math.cube' not in result + assert 'wf.should_never_output' not in result + + # With flag off we don't include the call outputs + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=false']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' not in result + assert 'wf.do_math.cube' not in result + assert 'wf.should_never_output' not in result + + # With flag on we do include the call outputs + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=on']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' in result + assert 'wf.do_math.cube' in result + assert 'wf.should_never_output' not in result + + @needs_singularity_or_docker + def test_croo_detection(self): + """ + Test if Toil can detect and do something sensible with Cromwell Output Organizer workflows. 
+ """ + wdl = os.path.abspath('src/toil/test/wdl/testfiles/croo.wdl') + + # With no flag we should include all task outputs + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' in result + assert 'wf.do_math.cube' in result + assert 'wf.should_never_output' not in result + + # With flag off we obey the WDL spec even if we're suspicious + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=off']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' not in result + assert 'wf.do_math.cube' not in result + assert 'wf.should_never_output' not in result + def test_url_to_optional_file(self): """ Test if missing and error-producing URLs are handled correctly for optional File? values. diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 4e3069bc16..113a40d468 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -3381,9 +3381,20 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) try: - if self._workflow.outputs is None: - # The output section is not declared - # So get all task outputs and return that + if self._workflow.outputs is not None: + # Output section is declared and is nonempty, so evaluate normally + + # Combine the bindings from the previous job + with monkeypatch_coerce(standard_library): + output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) + else: + # If no output section is present, start with an empty bindings + output_bindings = WDL.Env.Bindings() + + if self._workflow.outputs is None or self._wdl_options.get("all_call_outputs", 
False): + # The output section is not declared, or we want to keep task outputs anyway. + + # Get all task outputs and return that # First get all task output names output_set = set() # We need to recurse down through scatters and conditionals to find all the task names. @@ -3409,12 +3420,6 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: if binding.name in output_set: # The bindings will already be namespaced with the task namespaces output_bindings = output_bindings.bind(binding.name, binding.value) - else: - # Output section is declared and is nonempty, so evaluate normally - - # Combine the bindings from the previous job - with monkeypatch_coerce(standard_library): - output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) finally: # We don't actually know when all our files are downloaded since # anything we evaluate might devirtualize inside any expression. @@ -3553,6 +3558,26 @@ def main() -> None: else: raise WDL.Error.InputError("WDL document is empty!") + if "croo_out_def" in target.meta: + # This workflow or task wants to have its outputs + # "organized" by the Cromwell Output Organizer: + # https://github.com/ENCODE-DCC/croo + # + # TODO: We don't support generating anything that CROO can read. + logger.warning("This WDL expects to be used with the Cromwell Output Organizer (croo) . Toil cannot yet produce the outputs that croo requires. You will not be able to use croo on the output of this Toil run!") + + # But we can assume that we need to preserve individual + # task outputs since the point of CROO is fetching those + # from Cromwell's output directories. + # + # This isn't quite WDL spec compliant but it will rescue + # runs of popular workflows that rely on croo. + # + if options.all_call_outputs is None: + logger.warning("Inferring --allCallOutputs=True to preserve probable actual outputs of a croo WDL file.") + options.all_call_outputs = True + + if options.inputs_uri: # Load the inputs. 
Use the same loading mechanism, which means we # have to break into async temporarily. @@ -3612,6 +3637,7 @@ def main() -> None: wdl_options["execution_dir"] = execution_dir wdl_options["container"] = options.container assert wdl_options.get("container") is not None + wdl_options["all_call_outputs"] = options.all_call_outputs # Run the workflow and get its outputs namespaced with the workflow name. root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options)