Skip to content

Commit

Permalink
Make WDLOutputJob collect all task outputs (#4602)
Browse files Browse the repository at this point in the history
Co-authored-by: Adam Novak <anovak@soe.ucsc.edu>
  • Loading branch information
stxue1 and adamnovak authored Oct 5, 2023
1 parent ef2b923 commit b73b9ef
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -2284,9 +2284,10 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
# Make jobs to run all the parts of the workflow
sink = self.create_subgraph(self._workflow.body, [], bindings)

if self._workflow.outputs:
if self._workflow.outputs != []: # Compare against empty list as None means there should be outputs
# Either the output section is declared and nonempty or it is not declared
# Add evaluating the outputs after the sink
outputs_job = WDLOutputsJob(self._workflow.outputs, sink.rv(), self._execution_dir)
outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._execution_dir)
sink.addFollowOn(outputs_job)
# Caller is responsible for making sure namespaces are applied
self.defer_postprocessing(outputs_job)
Expand All @@ -2301,31 +2302,44 @@ class WDLOutputsJob(WDLBaseJob):
Returns an environment with just the outputs bound, in no namespace.
"""

def __init__(self, outputs: List[WDL.Tree.Decl], bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any):
def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], execution_dir: Optional[str] = None, **kwargs: Any):
"""
Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs.
"""
super().__init__(execution_dir, **kwargs)

self._outputs = outputs
self._bindings = bindings
self._workflow = workflow

def run(self, file_store: AbstractFileStore) -> WDLBindings:
"""
Make bindings for the outputs.
"""
super().run(file_store)

# Evaluate all the outputs in the normal, non-task-outputs library context
standard_library = ToilWDLStdLibBase(file_store, self._execution_dir)
# Combine the bindings from the previous job

output_bindings = evaluate_output_decls(self._outputs, unwrap(self._bindings), standard_library)

if self._workflow.outputs is None:
# The output section is not declared
# So get all task outputs and return that
# First get all task output names
output_set = set()
for call in self._workflow.body:
if isinstance(call, WDL.Tree.Call):
for type_binding in call.effective_outputs:
output_set.add(type_binding.name)
# Collect all bindings that are task outputs
output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings()
for binding in unwrap(self._bindings):
if binding.name in output_set:
# The bindings will already be namespaced with the task namespaces
output_bindings = output_bindings.bind(binding.name, binding.value)
else:
# Output section is declared and is nonempty, so evaluate normally
# Evaluate all the outputs in the normal, non-task-outputs library context
standard_library = ToilWDLStdLibBase(file_store, self._execution_dir)
# Combine the bindings from the previous job
output_bindings = evaluate_output_decls(self._workflow.outputs, unwrap(self._bindings), standard_library)
return self.postprocess(output_bindings)


class WDLRootJob(WDLSectionJob):
"""
Job that evaluates an entire WDL workflow, and returns the workflow outputs
Expand Down

0 comments on commit b73b9ef

Please sign in to comment.