From 29a2164079fbd71417e439ce3b78155c266b0f31 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 18 Sep 2024 11:47:29 -0400 Subject: [PATCH 1/6] Stack up scatters and conditionals to find tasks and fix #5094 --- src/toil/test/wdl/wdltoil_test.py | 2 +- src/toil/wdl/wdltoil.py | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 6de389b11c..410aa2240e 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -46,7 +46,7 @@ def tearDown(self) -> None: WDL_CONFORMANCE_TEST_REPO = "https://github.com/DataBiosphere/wdl-conformance-tests.git" -WDL_CONFORMANCE_TEST_COMMIT = "01401a46bc0e60240fb2b69af4b978d0a5bd8fc8" +WDL_CONFORMANCE_TEST_COMMIT = "aa3b9807d13165e265f4ea0db0fdc6957925e545" # These tests are known to require things not implemented by # Toil and will not be run in CI. WDL_CONFORMANCE_TESTS_UNSUPPORTED_BY_TOIL= [ diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 158398dd5f..219a543217 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -3396,10 +3396,23 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # So get all task outputs and return that # First get all task output names output_set = set() - for call in self._workflow.body: - if isinstance(call, WDL.Tree.Call): - for type_binding in call.effective_outputs: + # We need to recurse down through scatters and conditionals to find all the task names. + # The output variable names won't involve the scatters or conditionals as components. + stack = list(self._workflow.body) + while stack != []: + node = stack.pop() + if isinstance(node, WDL.Tree.Call): + # For calls, promote all output names to workflow output names + # TODO: Does effective_outputs already have the right + # stuff for calls to workflows that themselves lack + # output sections? If so, can't we just use that for + # *this* workflow? 
+ for type_binding in node.effective_outputs: output_set.add(type_binding.name) + elif isinstance(node, WDL.Tree.Scatter) or isinstance(node, WDL.Tree.Conditional): + # For scatters and conditionals, recurse looking for calls. + for subnode in node.body: + stack.append(subnode) # Collect all bindings that are task outputs output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for binding in unwrap(self._bindings): From 27e1afeddf468dcbf5cdc944a787c4170e9bde88 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Tue, 24 Sep 2024 14:06:27 -0400 Subject: [PATCH 2/6] Use conformance tests with correct failure expectation --- src/toil/test/wdl/wdltoil_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 410aa2240e..83b7d4dc87 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -46,7 +46,7 @@ def tearDown(self) -> None: WDL_CONFORMANCE_TEST_REPO = "https://github.com/DataBiosphere/wdl-conformance-tests.git" -WDL_CONFORMANCE_TEST_COMMIT = "aa3b9807d13165e265f4ea0db0fdc6957925e545" +WDL_CONFORMANCE_TEST_COMMIT = "2d617b703a33791f75f30a9db43c3740a499cd89" # These tests are known to require things not implemented by # Toil and will not be run in CI. 
WDL_CONFORMANCE_TESTS_UNSUPPORTED_BY_TOIL= [ From 61133d3cef72cc435179793ed110215c2db00acf Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 18 Sep 2024 12:20:28 -0400 Subject: [PATCH 3/6] Add --allCallOutputs to add call outputs to workflow output --- src/toil/options/wdl.py | 7 +++- .../test/wdl/testfiles/not_enough_outputs.wdl | 33 +++++++++++++++++++ src/toil/test/wdl/wdltoil_test.py | 29 +++++++++++++++- src/toil/wdl/wdltoil.py | 27 ++++++++------- 4 files changed, 83 insertions(+), 13 deletions(-) create mode 100644 src/toil/test/wdl/testfiles/not_enough_outputs.wdl diff --git a/src/toil/options/wdl.py b/src/toil/options/wdl.py index 09d6a9ecfe..88c9921a21 100644 --- a/src/toil/options/wdl.py +++ b/src/toil/options/wdl.py @@ -2,6 +2,8 @@ from configargparse import SUPPRESS +from toil.lib.conversions import strtobool + def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None: """ @@ -30,8 +32,11 @@ def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None: parser.add_argument(*output_file_arguments, dest="output_file", type=str, default=None, help=suppress_help or "File or URI to save output JSON to.") reference_inputs_arguments = ["--wdlReferenceInputs"] + (["--referenceInputs"] if not suppress else []) - parser.add_argument(*reference_inputs_arguments, dest="reference_inputs", type=bool, default=False, + parser.add_argument(*reference_inputs_arguments, dest="reference_inputs", type=strtobool, default=False, help=suppress_help or "Pass input files by URL") container_arguments = ["--wdlContainer"] + (["--container"] if not suppress else []) parser.add_argument(*container_arguments, dest="container", type=str, choices=["singularity", "docker", "auto"], default="auto", help=suppress_help or "Container engine to use to run WDL tasks") + all_call_outputs_arguments = ["--wdlAllCallOutputs"] + (["--allCallOutputs"] if not suppress else []) + parser.add_argument(*all_call_outputs_arguments, dest="all_call_outputs", 
type=strtobool, default=False, + help=suppress_help or "Keep and return all call outputs as workflow outputs") diff --git a/src/toil/test/wdl/testfiles/not_enough_outputs.wdl b/src/toil/test/wdl/testfiles/not_enough_outputs.wdl new file mode 100644 index 0000000000..48986661b7 --- /dev/null +++ b/src/toil/test/wdl/testfiles/not_enough_outputs.wdl @@ -0,0 +1,33 @@ +version 1.0 + +workflow wf { + input { + } + + call do_math { + input: + number = 3 + } + + Int should_never_output = do_math.cube - 1 + + output { + Int only_result = do_math.square + } +} + +task do_math { + input { + Int number + } + + # Not allowed to not have a command + command <<< + >>> + + output { + Int square = number * number + Int cube = number * number * number + } +} + diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 83b7d4dc87..7b978a9fb8 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -166,7 +166,7 @@ def test_url_to_file(self): assert 'url_to_file.first_line' in result assert isinstance(result['url_to_file.first_line'], str) self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328') - + @needs_docker def test_wait(self): """ @@ -182,6 +182,33 @@ def test_wait(self): assert isinstance(result['wait.result'], str) self.assertEqual(result['wait.result'], 'waited') + @needs_singularity_or_docker + def test_all_call_outputs(self): + """ + Test if Toil can collect all call outputs from a workflow that doesn't expose them. 
+ """ + wdl = os.path.abspath('src/toil/test/wdl/testfiles/not_enough_outputs.wdl') + + # With flag off we don't include the call outputs + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=false']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' not in result + assert 'wf.do_math.cube' not in result + assert 'wf.should_never_output' not in result + + # With flag on we do include the call outputs + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=on']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' in result + assert 'wf.do_math.cube' in result + assert 'wf.should_never_output' not in result + def test_url_to_optional_file(self): """ Test if missing and error-producing URLs are handled correctly for optional File? values. 
diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 219a543217..2840f4b86b 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -3391,9 +3391,20 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) try: - if self._workflow.outputs is None: - # The output section is not declared - # So get all task outputs and return that + if self._workflow.outputs is not None: + # Output section is declared and is nonempty, so evaluate normally + + # Combine the bindings from the previous job + with monkeypatch_coerce(standard_library): + output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) + else: + # If no output section is present, start with an empty bindings + output_bindings = WDL.Env.Bindings() + + if self._workflow.outputs is None or self._wdl_options.get("all_call_outputs", False): + # The output section is not declared, or we want to keep task outputs anyway. + + # Get all task outputs and return that # First get all task output names output_set = set() # We need to recurse down through scatters and conditionals to find all the task names. @@ -3413,18 +3424,11 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # For scatters and conditionals, recurse looking for calls. 
for subnode in node.body: stack.append(subnode) - # Collect all bindings that are task outputs - output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() + # Add in all bindings that are task outputs for binding in unwrap(self._bindings): if binding.name in output_set: # The bindings will already be namespaced with the task namespaces output_bindings = output_bindings.bind(binding.name, binding.value) - else: - # Output section is declared and is nonempty, so evaluate normally - - # Combine the bindings from the previous job - with monkeypatch_coerce(standard_library): - output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library) finally: # We don't actually know when all our files are downloaded since # anything we evaluate might devirtualize inside any expression. @@ -3622,6 +3626,7 @@ def main() -> None: wdl_options["execution_dir"] = execution_dir wdl_options["container"] = options.container assert wdl_options.get("container") is not None + wdl_options["all_call_outputs"] = options.all_call_outputs # Run the workflow and get its outputs namespaced with the workflow name. 
root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options) From 9bc9dff364fc898d639cfdc9173c6a659f4a123d Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 18 Sep 2024 12:41:16 -0400 Subject: [PATCH 4/6] Default to including all task outputs for croo workflows --- src/toil/options/wdl.py | 2 +- src/toil/test/wdl/testfiles/croo.wdl | 38 ++++++++++++++++++++++++++++ src/toil/test/wdl/wdltoil_test.py | 37 +++++++++++++++++++++++++++ src/toil/wdl/wdltoil.py | 20 +++++++++++++++ 4 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 src/toil/test/wdl/testfiles/croo.wdl diff --git a/src/toil/options/wdl.py b/src/toil/options/wdl.py index 88c9921a21..6c40cf0e07 100644 --- a/src/toil/options/wdl.py +++ b/src/toil/options/wdl.py @@ -38,5 +38,5 @@ def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None: parser.add_argument(*container_arguments, dest="container", type=str, choices=["singularity", "docker", "auto"], default="auto", help=suppress_help or "Container engine to use to run WDL tasks") all_call_outputs_arguments = ["--wdlAllCallOutputs"] + (["--allCallOutputs"] if not suppress else []) - parser.add_argument(*all_call_outputs_arguments, dest="all_call_outputs", type=strtobool, default=False, + parser.add_argument(*all_call_outputs_arguments, dest="all_call_outputs", type=strtobool, default=None, help=suppress_help or "Keep and return all call outputs as workflow outputs") diff --git a/src/toil/test/wdl/testfiles/croo.wdl b/src/toil/test/wdl/testfiles/croo.wdl new file mode 100644 index 0000000000..bca2b9a253 --- /dev/null +++ b/src/toil/test/wdl/testfiles/croo.wdl @@ -0,0 +1,38 @@ +version 1.0 + +workflow wf { + meta { + # Advertise as needing the Cromwell Output Organizer + croo_out_def: 'https://storage.googleapis.com/encode-pipeline-output-definition/atac.croo.v5.json' + } + + input { + } + + call do_math { + input: + number = 3 + } + + Int should_never_output = do_math.cube - 1 + + output { + Int only_result = 
do_math.square + } +} + +task do_math { + input { + Int number + } + + # Not allowed to not have a command + command <<< + >>> + + output { + Int square = number * number + Int cube = number * number * number + } +} + diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py index 7b978a9fb8..79a521714f 100644 --- a/src/toil/test/wdl/wdltoil_test.py +++ b/src/toil/test/wdl/wdltoil_test.py @@ -189,6 +189,16 @@ def test_all_call_outputs(self): """ wdl = os.path.abspath('src/toil/test/wdl/testfiles/not_enough_outputs.wdl') + # With no flag we don't include the call outputs + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' not in result + assert 'wf.do_math.cube' not in result + assert 'wf.should_never_output' not in result + # With flag off we don't include the call outputs result_json = subprocess.check_output( self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=false']) @@ -209,6 +219,33 @@ def test_all_call_outputs(self): assert 'wf.do_math.cube' in result assert 'wf.should_never_output' not in result + @needs_singularity_or_docker + def test_croo_detection(self): + """ + Test if Toil can detect and do something sensible with Cromwell Output Organizer workflows. 
+ """ + wdl = os.path.abspath('src/toil/test/wdl/testfiles/croo.wdl') + + # With no flag we should include all task outputs + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' in result + assert 'wf.do_math.cube' in result + assert 'wf.should_never_output' not in result + + # With flag off we obey the WDL spec even if we're suspicious + result_json = subprocess.check_output( + self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=off']) + result = json.loads(result_json) + + assert 'wf.only_result' in result + assert 'wf.do_math.square' not in result + assert 'wf.do_math.cube' not in result + assert 'wf.should_never_output' not in result + def test_url_to_optional_file(self): """ Test if missing and error-producing URLs are handled correctly for optional File? values. diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 2840f4b86b..e6f96e1adf 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -3567,6 +3567,26 @@ def main() -> None: else: raise WDL.Error.InputError("WDL document is empty!") + if "croo_out_def" in target.meta: + # This workflow or task wants to have its outputs + # "organized" by the Cromwell Output Organizer: + # https://github.com/ENCODE-DCC/croo + # + # TODO: We don't support generating anything that CROO can read. + logger.warning("This WDL expects to be used with the Cromwell Output Organizer (croo) . Toil cannot yet produce the outputs that croo requires. You will not be able to use croo on the output of this Toil run!") + + # But we can assume that we need to preserve individual + # task outputs since the point of CROO is fetching those + # from Cromwell's output directories. 
+ # + # This isn't quite WDL spec compliant but it will rescue + # runs of the popular + # https://github.com/ENCODE-DCC/atac-seq-pipeline + if options.all_call_outputs is None: + logger.warning("Inferring --allCallOutputs=True to preserve probable actual outputs of a croo WDL file.") + options.all_call_outputs = True + + if options.inputs_uri: # Load the inputs. Use the same loading mechanism, which means we # have to break into async temporarily. From c7ba1d1877f90a95f2ec12d5d3b423e0304bb3b6 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 18 Sep 2024 12:48:53 -0400 Subject: [PATCH 5/6] Document new option and missing option --- docs/wdl/running.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/wdl/running.rst b/docs/wdl/running.rst index 6274a68f32..e18160190d 100644 --- a/docs/wdl/running.rst +++ b/docs/wdl/running.rst @@ -68,10 +68,23 @@ input JSON file, for compatibility with other WDL runners. ``cromwell`` to just return the workflow's output values as JSON or ``miniwdl`` to nest that under an ``outputs`` key and includes a ``dir`` key. +``--referenceInputs``: Specifies whether input files to Toil should be passed +around by URL reference instead of being imported into Toil's storage. Defaults +to off. Can be ``True`` or ``False`` or other similar words. + ``--container``: Specifies the container engine to use to run tasks. By default this is ``auto``, which tries Singularity if it is installed and Docker if it isn't. Can also be set to ``docker`` or ``singularity`` explicitly. +``--allCallOutputs``: Specifies whether outputs from all calls in a workflow +should be included alongside the outputs from the ``output`` section, when an +``output`` section is defined. For strict WDL 1.0 compliance, should be set to +``False``. Defaults to unset. If the workflow includes metadata for the +`Cromwell Output Organizer (croo)`_, will default to ``True``. Otherwise, +defaults to ``False``. + +.. 
_`Cromwell Output Organizer (croo)`: https://github.com/ENCODE-DCC/croo + Any number of other Toil options may also be specified. For defined Toil options, see :ref:`commandRef`. From e71e7aa0315d402e6a20c1116f5ab95e94509c37 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Wed, 18 Sep 2024 12:51:17 -0400 Subject: [PATCH 6/6] Improve option doc wording --- docs/wdl/running.rst | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/wdl/running.rst b/docs/wdl/running.rst index e18160190d..79fab3160b 100644 --- a/docs/wdl/running.rst +++ b/docs/wdl/running.rst @@ -78,10 +78,9 @@ isn't. Can also be set to ``docker`` or ``singularity`` explicitly. ``--allCallOutputs``: Specifies whether outputs from all calls in a workflow should be included alongside the outputs from the ``output`` section, when an -``output`` section is defined. For strict WDL 1.0 compliance, should be set to -``False``. Defaults to unset. If the workflow includes metadata for the -`Cromwell Output Organizer (croo)`_, will default to ``True``. Otherwise, -defaults to ``False``. +``output`` section is defined. For strict WDL spec compliance, should be set to +``False``. Usually defaults to ``False``. If the workflow includes metadata for +the `Cromwell Output Organizer (croo)`_, will default to ``True``. .. _`Cromwell Output Organizer (croo)`: https://github.com/ENCODE-DCC/croo