Sniffing for and workaround for keeping output of croo workflows #5096

Merged
merged 9 commits
Sep 26, 2024
12 changes: 12 additions & 0 deletions docs/wdl/running.rst
@@ -68,10 +68,22 @@ input JSON file, for compatibility with other WDL runners.
``cromwell`` to just return the workflow's output values as JSON or ``miniwdl``
to nest that under an ``outputs`` key and include a ``dir`` key.

``--referenceInputs``: Specifies whether input files should be passed around by
URL reference instead of being imported into Toil's storage. Defaults to off.
Accepts ``True`` or ``False`` (or other similar boolean strings).

``--container``: Specifies the container engine to use to run tasks. By default
this is ``auto``, which tries Singularity if it is installed and Docker if it
isn't. Can also be set to ``docker`` or ``singularity`` explicitly.

``--allCallOutputs``: Specifies whether outputs from all calls in a workflow
should be included alongside the outputs from the ``output`` section, when an
``output`` section is defined. For strict WDL spec compliance, this should be
set to ``False``, which is also the usual default. If the workflow includes
metadata for the `Cromwell Output Organizer (croo)`_, it will default to
``True``.

.. _`Cromwell Output Organizer (croo)`: https://github.com/ENCODE-DCC/croo

Any number of other Toil options may also be specified. For defined Toil options,
see :ref:`commandRef`.

7 changes: 6 additions & 1 deletion src/toil/options/wdl.py
@@ -2,6 +2,8 @@

from configargparse import SUPPRESS

from toil.lib.conversions import strtobool


def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None:
"""
@@ -30,8 +32,11 @@ def add_wdl_options(parser: ArgumentParser, suppress: bool = True) -> None:
parser.add_argument(*output_file_arguments, dest="output_file", type=str, default=None,
help=suppress_help or "File or URI to save output JSON to.")
reference_inputs_arguments = ["--wdlReferenceInputs"] + (["--referenceInputs"] if not suppress else [])
parser.add_argument(*reference_inputs_arguments, dest="reference_inputs", type=bool, default=False,
parser.add_argument(*reference_inputs_arguments, dest="reference_inputs", type=strtobool, default=False,
help=suppress_help or "Pass input files by URL")
container_arguments = ["--wdlContainer"] + (["--container"] if not suppress else [])
parser.add_argument(*container_arguments, dest="container", type=str, choices=["singularity", "docker", "auto"], default="auto",
help=suppress_help or "Container engine to use to run WDL tasks")
all_call_outputs_arguments = ["--wdlAllCallOutputs"] + (["--allCallOutputs"] if not suppress else [])
parser.add_argument(*all_call_outputs_arguments, dest="all_call_outputs", type=strtobool, default=None,
help=suppress_help or "Keep and return all call outputs as workflow outputs")
38 changes: 38 additions & 0 deletions src/toil/test/wdl/testfiles/croo.wdl
@@ -0,0 +1,38 @@
version 1.0

workflow wf {
meta {
# Advertise as needing the Cromwell Output Organizer
croo_out_def: 'https://storage.googleapis.com/encode-pipeline-output-definition/atac.croo.v5.json'
}

input {
}

call do_math {
input:
number = 3
}

Int should_never_output = do_math.cube - 1

output {
Int only_result = do_math.square
}
}

task do_math {
input {
Int number
}

# Not allowed to not have a command
command <<<
>>>

output {
Int square = number * number
Int cube = number * number * number
}
}
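
For reference, the `croo_out_def` marker this test file advertises ends up on the parsed workflow's `meta` mapping, which is exactly what the detection code added to `wdltoil.py` below checks. A small sketch of inspecting it directly with miniwdl (assuming miniwdl is installed and this is run from the repository root):

# Sketch, not part of the diff: look at the croo marker with miniwdl.
import WDL

doc = WDL.load("src/toil/test/wdl/testfiles/croo.wdl")
workflow = doc.workflow
assert workflow is not None

# meta holds the workflow's meta section as a plain mapping.
print(workflow.meta)
# Expected (roughly): {'croo_out_def': 'https://storage.googleapis.com/encode-pipeline-output-definition/atac.croo.v5.json'}

if "croo_out_def" in workflow.meta:
    print("This workflow expects the Cromwell Output Organizer; keep all call outputs.")
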

33 changes: 33 additions & 0 deletions src/toil/test/wdl/testfiles/not_enough_outputs.wdl
@@ -0,0 +1,33 @@
version 1.0

workflow wf {
input {
}

call do_math {
input:
number = 3
}

Int should_never_output = do_math.cube - 1

output {
Int only_result = do_math.square
}
}

task do_math {
input {
Int number
}

# Not allowed to not have a command
command <<<
>>>

output {
Int square = number * number
Int cube = number * number * number
}
}

68 changes: 66 additions & 2 deletions src/toil/test/wdl/wdltoil_test.py
@@ -46,7 +46,7 @@ def tearDown(self) -> None:


WDL_CONFORMANCE_TEST_REPO = "https://github.com/DataBiosphere/wdl-conformance-tests.git"
WDL_CONFORMANCE_TEST_COMMIT = "01401a46bc0e60240fb2b69af4b978d0a5bd8fc8"
WDL_CONFORMANCE_TEST_COMMIT = "2d617b703a33791f75f30a9db43c3740a499cd89"
# These tests are known to require things not implemented by
# Toil and will not be run in CI.
WDL_CONFORMANCE_TESTS_UNSUPPORTED_BY_TOIL= [
@@ -166,7 +166,7 @@ def test_url_to_file(self):
assert 'url_to_file.first_line' in result
assert isinstance(result['url_to_file.first_line'], str)
self.assertEqual(result['url_to_file.first_line'], 'chr1\t248387328')

@needs_docker
def test_wait(self):
"""
@@ -182,6 +182,70 @@ def test_wait(self):
assert isinstance(result['wait.result'], str)
self.assertEqual(result['wait.result'], 'waited')

@needs_singularity_or_docker
def test_all_call_outputs(self):
"""
Test if Toil can collect all call outputs from a workflow that doesn't expose them.
"""
wdl = os.path.abspath('src/toil/test/wdl/testfiles/not_enough_outputs.wdl')

# With no flag we don't include the call outputs
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' not in result
assert 'wf.do_math.cube' not in result
assert 'wf.should_never_output' not in result

# With flag off we don't include the call outputs
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=false'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' not in result
assert 'wf.do_math.cube' not in result
assert 'wf.should_never_output' not in result

# With flag on we do include the call outputs
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=on'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' in result
assert 'wf.do_math.cube' in result
assert 'wf.should_never_output' not in result

@needs_singularity_or_docker
def test_croo_detection(self):
"""
Test if Toil can detect and do something sensible with Cromwell Output Organizer workflows.
"""
wdl = os.path.abspath('src/toil/test/wdl/testfiles/croo.wdl')

# With no flag we should include all task outputs
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' in result
assert 'wf.do_math.cube' in result
assert 'wf.should_never_output' not in result

# With flag off we obey the WDL spec even if we're suspicious
result_json = subprocess.check_output(
self.base_command + [wdl, '-o', self.output_dir, '--logInfo', '--retryCount=0', '--allCallOutputs=off'])
result = json.loads(result_json)

assert 'wf.only_result' in result
assert 'wf.do_math.square' not in result
assert 'wf.do_math.cube' not in result
assert 'wf.should_never_output' not in result

def test_url_to_optional_file(self):
"""
Test if missing and error-producing URLs are handled correctly for optional File? values.
66 changes: 52 additions & 14 deletions src/toil/wdl/wdltoil.py
@@ -3381,27 +3381,44 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings:
standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir"))

try:
if self._workflow.outputs is None:
# The output section is not declared
# So get all task outputs and return that
if self._workflow.outputs is not None:
# Output section is declared and is nonempty, so evaluate normally

# Combine the bindings from the previous job
with monkeypatch_coerce(standard_library):
output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library)
else:
# If no output section is present, start with an empty bindings
output_bindings = WDL.Env.Bindings()

if self._workflow.outputs is None or self._wdl_options.get("all_call_outputs", False):
# The output section is not declared, or we want to keep task outputs anyway.

# Get all task outputs and return that
# First get all task output names
output_set = set()
for call in self._workflow.body:
if isinstance(call, WDL.Tree.Call):
for type_binding in call.effective_outputs:
# We need to recurse down through scatters and conditionals to find all the task names.
# The output variable names won't involve the scatters or conditionals as components.
stack = list(self._workflow.body)
while stack != []:
node = stack.pop()
if isinstance(node, WDL.Tree.Call):
# For calls, promote all output names to workflow output names
# TODO: Does effective_outputs already have the right
# stuff for calls to workflows that themselves lack
# output sections? If so, can't we just use that for
# *this* workflow?
for type_binding in node.effective_outputs:
output_set.add(type_binding.name)
# Collect all bindings that are task outputs
output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings()
elif isinstance(node, WDL.Tree.Scatter) or isinstance(node, WDL.Tree.Conditional):
# For scatters and conditionals, recurse looking for calls.
for subnode in node.body:
stack.append(subnode)
# Add in all bindings that are task outputs
for binding in unwrap(self._bindings):
if binding.name in output_set:
# The bindings will already be namespaced with the task namespaces
output_bindings = output_bindings.bind(binding.name, binding.value)
else:
# Output section is declared and is nonempty, so evaluate normally

# Combine the bindings from the previous job
with monkeypatch_coerce(standard_library):
output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library)
finally:
# We don't actually know when all our files are downloaded since
# anything we evaluate might devirtualize inside any expression.
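
As an aside on the new traversal above: the explicit stack is what lets it find calls nested inside scatter and conditional sections, and `effective_outputs` already namespaces each output with its call name. A self-contained sketch of the same idea (assuming miniwdl is installed and this is run from the repository root):

# Sketch, not part of the diff: collect every call output name, descending
# through scatters and conditionals, as WDLOutputsJob.run() now does.
import WDL

def collect_call_output_names(workflow):
    names = set()
    stack = list(workflow.body)
    while stack:
        node = stack.pop()
        if isinstance(node, WDL.Tree.Call):
            # effective_outputs yields bindings already namespaced by the call,
            # e.g. "do_math.square".
            for type_binding in node.effective_outputs:
                names.add(type_binding.name)
        elif isinstance(node, (WDL.Tree.Scatter, WDL.Tree.Conditional)):
            # Recurse into nested sections by pushing their bodies onto the stack.
            stack.extend(node.body)
    return names

doc = WDL.load("src/toil/test/wdl/testfiles/croo.wdl")
print(collect_call_output_names(doc.workflow))
# Expected: {'do_math.square', 'do_math.cube'}. 'should_never_output' is a
# workflow-level declaration, not a call output, so it is never collected.
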
@@ -3540,6 +3557,26 @@ def main() -> None:
else:
raise WDL.Error.InputError("WDL document is empty!")

if "croo_out_def" in target.meta:
# This workflow or task wants to have its outputs
# "organized" by the Cromwell Output Organizer:
# <https://github.com/ENCODE-DCC/croo>.
#
# TODO: We don't support generating anything that CROO can read.
logger.warning("This WDL expects to be used with the Cromwell Output Organizer (croo) <https://github.com/ENCODE-DCC/croo>. Toil cannot yet produce the outputs that croo requires. You will not be able to use croo on the output of this Toil run!")

# But we can assume that we need to preserve individual
# task outputs since the point of CROO is fetching those
# from Cromwell's output directories.
#
# This isn't quite WDL spec compliant but it will rescue
# runs of the popular
# <https://github.com/ENCODE-DCC/atac-seq-pipeline>
if options.all_call_outputs is None:
logger.warning("Inferring --allCallOutputs=True to preserve probable actual outputs of a croo WDL file.")
options.all_call_outputs = True


if options.inputs_uri:
# Load the inputs. Use the same loading mechanism, which means we
# have to break into async temporarily.
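
One detail worth noting: `--allCallOutputs` was declared earlier with `default=None`, so the code above can distinguish "the user never set the flag" (`None`, safe to infer `True` for croo workflows) from an explicit `--allCallOutputs=false`, which is respected. A minimal sketch of that tri-state pattern (standalone, not Toil's actual parser; the lambda stands in for `strtobool`):

# Sketch, not part of the diff: default=None gives a tri-state flag, so the
# croo inference only kicks in when the user said nothing at all.
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--allCallOutputs", dest="all_call_outputs",
                    type=lambda s: s.strip().lower() in ("1", "true", "on", "yes"),
                    default=None)

for argv in ([], ["--allCallOutputs=false"], ["--allCallOutputs=true"]):
    options = parser.parse_args(argv)
    if options.all_call_outputs is None:
        # The user expressed no preference; a croo workflow may now infer True.
        options.all_call_outputs = True
    print(argv, options.all_call_outputs)
# []                         -> True  (inferred)
# ['--allCallOutputs=false'] -> False (explicit choice respected)
# ['--allCallOutputs=true']  -> True
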
@@ -3599,6 +3636,7 @@ def main() -> None:
wdl_options["execution_dir"] = execution_dir
wdl_options["container"] = options.container
assert wdl_options.get("container") is not None
wdl_options["all_call_outputs"] = options.all_call_outputs

# Run the workflow and get its outputs namespaced with the workflow name.
root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options)