Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support importing on workers in WDL #5103

Merged
merged 22 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
33fa295
mirror import on workers update to wdl
stxue1 Sep 26, 2024
9a6612a
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Sep 26, 2024
e90bfbe
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Sep 26, 2024
a46a001
Apply suggestions from code review
stxue1 Sep 27, 2024
c55330c
Merge branch 'master' of github.com:DataBiosphere/toil into issues/50…
stxue1 Sep 27, 2024
413e3ea
Rename WDLRootJob, remove kebab case from WDL, and raise an error if …
stxue1 Sep 27, 2024
0fcdf35
Move out shared runner options
stxue1 Sep 28, 2024
d615a4e
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Sep 30, 2024
361551d
Add runner.py
stxue1 Sep 30, 2024
10f02f2
Merge branch 'issues/5025-import-on-workers-wdl' of github.com:DataBi…
stxue1 Sep 30, 2024
c6b4079
Merge branch 'master' of github.com:DataBiosphere/toil into issues/50…
stxue1 Oct 2, 2024
9329327
Add logDebug to giraffe workflow
stxue1 Oct 3, 2024
881ab9a
Add toil.import_file logic into the wdl import function
stxue1 Oct 3, 2024
1bc9b3e
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 3, 2024
b6f2ab0
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 3, 2024
b016723
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 3, 2024
f626dea
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 4, 2024
a1ebbf4
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 4, 2024
7b2e167
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 7, 2024
3ba0391
Reconnect test command argument to its option
adamnovak Oct 7, 2024
0a17f9c
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 8, 2024
3f8c8ca
Merge master into issues/5025-import-on-workers-wdl
github-actions[bot] Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@

from toil.options.common import add_base_toil_options, JOBSTORE_HELP
from toil.options.cwl import add_cwl_options
from toil.options.runner import add_runner_options
from toil.options.wdl import add_wdl_options

if sys.version_info >= (3, 8):
Expand Down Expand Up @@ -582,6 +583,12 @@ def create_config_dict_from_parser(parser: ArgumentParser) -> CommentedMap:
"`toil config ~/.toil/default.yaml`.\n\nBASE TOIL OPTIONS\n")
all_data.append(toil_base_data)

parser = ArgParser(YAMLConfigFileParser())
add_runner_options(parser)
toil_cwl_data = create_config_dict_from_parser(parser)
toil_cwl_data.yaml_set_start_comment("\nTOIL SHARED CWL AND WDL RUNNER OPTIONS")
all_data.append(toil_cwl_data)

parser = ArgParser(YAMLConfigFileParser())
add_cwl_options(parser)
toil_cwl_data = create_config_dict_from_parser(parser)
Expand Down Expand Up @@ -616,10 +623,10 @@ def create_config_dict_from_parser(parser: ArgumentParser) -> CommentedMap:


def parser_with_common_options(
provisioner_options: bool = False,
jobstore_option: bool = True,
prog: Optional[str] = None,
default_log_level: Optional[int] = None
provisioner_options: bool = False,
jobstore_option: bool = True,
prog: Optional[str] = None,
default_log_level: Optional[int] = None
) -> ArgParser:
parser = ArgParser(prog=prog or "Toil", formatter_class=ArgumentDefaultsHelpFormatter)

Expand Down Expand Up @@ -694,6 +701,8 @@ def addOptions(parser: ArgumentParser, jobstore_as_flag: bool = False, cwl: bool
# This is done so the config file can hold all available options
add_cwl_options(parser, suppress=not cwl)
add_wdl_options(parser, suppress=not wdl)
# Add shared runner options
add_runner_options(parser)

def check_arguments(typ: str) -> None:
"""
Expand All @@ -707,6 +716,7 @@ def check_arguments(typ: str) -> None:
add_cwl_options(check_parser)
if typ == "cwl":
add_wdl_options(check_parser)
add_runner_options(check_parser)
for action in check_parser._actions:
action.default = SUPPRESS
other_options, _ = check_parser.parse_known_args(sys.argv[1:], ignore_help_args=True)
Expand Down Expand Up @@ -912,7 +922,6 @@ def start(self, rootJob: "Job") -> Any:
# Check that the rootJob has been initialized
rootJob.check_initialized()


# Write shared files to the job store
self._jobStore.write_leader_pid()
self._jobStore.write_leader_node_id()
Expand Down
16 changes: 0 additions & 16 deletions src/toil/options/cwl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from argparse import ArgumentParser

from configargparse import SUPPRESS
from toil.lib.conversions import human2bytes

from toil.version import baseVersion

Expand Down Expand Up @@ -282,21 +281,6 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
help=suppress_help or "Disable file streaming for files that have 'streamable' flag True",
dest="disable_streaming",
)
parser.add_argument(
"--runImportsOnWorkers", "--run-imports-on-workers",
action="store_true",
default=False,
help=suppress_help or "Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance."
"If set to true, the argument --importWorkersDisk must also be set.",
dest="run_imports_on_workers"
)

parser.add_argument("--importWorkersDisk", "--import-workers-disk",
help=suppress_help or "Specify the amount of disk space an import worker will use. If file streaming for input files is not available,"
"this should be set to the size of the largest input file. This must be set in conjunction with the argument runImportsOnWorkers.",
dest="import_workers_disk",
type=lambda x: human2bytes(str(x)),
default=None)

provgroup = parser.add_argument_group(
"Options for recording provenance information of the execution"
Expand Down
25 changes: 25 additions & 0 deletions src/toil/options/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from argparse import ArgumentParser
from toil.lib.conversions import human2bytes


def add_runner_options(parser: ArgumentParser, cwl: bool = False, wdl: bool = False) -> None:
    """
    Add options shared between the CWL and WDL runners to the given parser.

    The camelCase option spellings are always registered; the kebab-case
    aliases are only added for the CWL runner, which historically accepts them.

    :param parser: Parser to add the shared runner arguments to.
    :param cwl: If True, also register the kebab-case CWL-style aliases.
    :param wdl: Reserved for WDL-specific aliases; currently unused.
    :return: None
    """
    # Even when both cwl and wdl are False, the "default" (camelCase) options
    # must still be added, so the config file can hold all available options.
    run_imports_on_workers_arguments = ["--runImportsOnWorkers"]
    if cwl:
        run_imports_on_workers_arguments.append("--run-imports-on-workers")
    # NOTE(review): newer boolean options elsewhere accept explicit True/False
    # via str2bool rather than store_true — consider upgrading this one too.
    parser.add_argument(
        *run_imports_on_workers_arguments,
        action="store_true",
        default=False,
        dest="run_imports_on_workers",
        help="Run the file imports on a worker instead of the leader. This is useful "
             "if the leader is not optimized for high network performance. "
             "If set to true, the argument --importWorkersDisk must also be set.",
    )
    import_workers_disk_arguments = ["--importWorkersDisk"]
    if cwl:
        import_workers_disk_arguments.append("--import-workers-disk")
    parser.add_argument(
        *import_workers_disk_arguments,
        dest="import_workers_disk",
        # Accept human-readable sizes such as "100G" and store raw bytes.
        type=lambda x: human2bytes(str(x)),
        default=None,
        help="Specify the amount of disk space an import worker will use. If file "
             "streaming for input files is not available, this should be set to the "
             "size of the largest input file. This must be set in conjunction with "
             "the argument runImportsOnWorkers.",
    )
14 changes: 12 additions & 2 deletions src/toil/test/wdl/wdltoil_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,18 @@ def test_giraffe(self):
json_file = f"{base_uri}/params/giraffe.json"

result_json = subprocess.check_output(
self.base_command + [wdl_file, json_file, '-o', self.output_dir, '--outputDialect', 'miniwdl', '--scale',
'0.1'])
self.base_command + [
wdl_file,
json_file,
'-o',
self.output_dir,
'--outputDialect',
'miniwdl',
'--scale',
'0.1',
'--logDebug',
]
)
result = json.loads(result_json)

# Expect MiniWDL-style output with a designated "dir"
Expand Down
78 changes: 61 additions & 17 deletions src/toil/wdl/wdltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@

import WDL.Error
import WDL.runtime.config
from configargparse import ArgParser
from configargparse import ArgParser, Namespace
from WDL._util import byte_size_units, chmod_R_plus
from WDL.Tree import ReadSourceResult
from WDL.CLI import print_error
Expand All @@ -83,7 +83,8 @@
TemporaryID,
parse_accelerator,
unwrap,
unwrap_all)
unwrap_all,
ParseableIndivisibleResource)
from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException,
InvalidImportExportUrlException, LocatorException)
from toil.lib.accelerators import get_individual_local_accelerators
Expand Down Expand Up @@ -351,7 +352,7 @@ def first_mismatch(prefix: str, value: str) -> int:
return modified


def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None) -> Iterator[str]:
def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None, execution_dir: Optional[str] = None) -> Iterator[str]:
"""
Get potential absolute URIs to check for an imported file.

Expand Down Expand Up @@ -393,7 +394,8 @@ def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tr
full_path_list.append(Toil.normalize_uri(importer.pos.abspath))

# Then the current directory. We need to make sure to include a filename component here or it will treat the current directory with no trailing / as a document and relative paths will look 1 level up.
full_path_list.append(Toil.normalize_uri('.') + '/.')
# When importing on a worker, the cwd will be a tmpdir and will result in FileNotFoundError after os.path.abspath, so override with the execution dir
full_path_list.append(Toil.normalize_uri(execution_dir or '.') + '/.')

# Then the specified paths.
# TODO:
Expand Down Expand Up @@ -789,10 +791,10 @@ def is_toil_url(filename: str) -> bool:


def is_standard_url(filename: str) -> bool:
return is_url(filename, ['http:', 'https:', 's3:', 'gs:'])
return is_url(filename, ['http:', 'https:', 's3:', 'gs:', 'ftp:'])


def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool:
def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', 'ftp:', TOIL_URI_SCHEME]) -> bool:
"""
Decide if a filename is a known kind of URL
"""
Expand All @@ -801,7 +803,8 @@ def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', T
return True
return False

def convert_remote_files(environment: WDLBindings, file_source: Toil, task_path: str, search_paths: Optional[List[str]] = None, import_remote_files: bool = True) -> None:
def convert_remote_files(environment: WDLBindings, file_source: AbstractJobStore, task_path: str, search_paths: Optional[List[str]] = None, import_remote_files: bool = True,
execution_dir: Optional[str] = None) -> None:
"""
Resolve relative-URI files in the given environment and import all files.

Expand All @@ -827,7 +830,7 @@ def import_filename(filename: str) -> Tuple[Optional[str], Optional[str]]:
"""
# Search through any input search paths passed in and download it if found
tried = []
for candidate_uri in potential_absolute_uris(filename, search_paths if search_paths is not None else []):
for candidate_uri in potential_absolute_uris(filename, search_paths if search_paths is not None else [], execution_dir=execution_dir):
tried.append(candidate_uri)
try:
if not import_remote_files and is_url(candidate_uri):
Expand All @@ -840,12 +843,8 @@ def import_filename(filename: str) -> Tuple[Optional[str], Optional[str]]:
return candidate_uri, None
else:
# Actually import
# Try to import the file. Don't raise if we can't find it, just
# return None!
imported = file_source.import_file(candidate_uri, check_existence=False)
if imported is None:
# Wasn't found there
continue
# Try to import the file. If we can't find it, continue
imported = file_source.import_file(candidate_uri)
except UnimplementedURLException as e:
# We can't find anything that can even support this URL scheme.
# Report to the user, they are probably missing an extra.
Expand All @@ -856,6 +855,9 @@ def import_filename(filename: str) -> Tuple[Optional[str], Optional[str]]:
logger.warning("Checked URL %s but got HTTP status %s", candidate_uri, e.code)
# Try the next location.
continue
except FileNotFoundError:
# Wasn't found there
continue
except Exception:
# Something went wrong besides the file not being found. Maybe
# we have no auth.
Expand Down Expand Up @@ -3630,7 +3632,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings:

return self.postprocess(output_bindings)

class WDLRootJob(WDLSectionJob):
class WDLStartJob(WDLSectionJob):
"""
Job that evaluates an entire WDL workflow, and returns the workflow outputs
namespaced with the workflow name. Inputs may or may not be namespaced with
Expand Down Expand Up @@ -3666,6 +3668,44 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
self.defer_postprocessing(job)
return job.rv()

class WDLImportJob(WDLSectionJob):
    """
    Job that imports a workflow's input files from a worker node instead of
    the leader, then kicks off the actual workflow as a child job.

    Only used when runImportsOnWorkers is enabled.
    """

    def __init__(self, target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, wdl_options: WDLContext, path: Optional[List[str]] = None, skip_remote: bool = False,
                 disk_size: Optional[ParseableIndivisibleResource] = None, **kwargs: Any):
        """
        Job to take the inputs from the WDL workflow and import them on a worker instead of a leader. Assumes all local and cloud files are accessible.

        This class is only used when runImportsOnWorkers is enabled.

        :param target: Workflow or task whose inputs should be imported.
        :param inputs: WDL bindings that may reference files to import.
        :param wdl_options: Interpreter context (execution dir, container, etc.).
        :param path: Extra search paths used to resolve relative input URIs.
        :param skip_remote: If True, leave remote URLs in place instead of importing them.
        :param disk_size: Disk to request for this job, sized for the files being imported.
        """
        # local=False forces this job onto a worker; disk_size comes from
        # --importWorkersDisk so the worker has room for the downloads.
        super().__init__(wdl_options=wdl_options, local=False,
                         disk=disk_size, **kwargs)
        self._target = target
        self._inputs = inputs
        self._path = path
        self._skip_remote = skip_remote

    def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
        """
        Import the workflow inputs and then create and run the workflow.
        :return: Promise of workflow outputs
        """
        # Resolve and import input files via the job store, overriding the
        # cwd-relative lookup with the recorded execution dir (the worker's
        # cwd is a tmpdir, so '.' would be wrong here).
        convert_remote_files(self._inputs, file_store.jobStore, self._target.name, self._path, self._skip_remote, self._wdl_options.get("execution_dir"))
        # With inputs imported, run the real workflow as a child and forward
        # its (promised) outputs.
        root_job = WDLStartJob(self._target, self._inputs, wdl_options=self._wdl_options)
        self.addChild(root_job)
        return root_job.rv()


def make_root_job(target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, inputs_search_path: List[str], toil: Toil, wdl_options: WDLContext, options: Namespace) -> WDLSectionJob:
    """
    Build the root Toil job for a WDL run, choosing where input files get imported.

    :param target: Workflow or task to execute.
    :param inputs: Input bindings, possibly containing unimported file URIs.
    :param inputs_search_path: Search paths for resolving relative input URIs.
    :param toil: Toil context; its job store is used for leader-side imports.
    :param wdl_options: Interpreter context passed through to the jobs.
    :param options: Parsed command-line options (run_imports_on_workers,
        reference_inputs, import_workers_disk are consulted here).
    :return: The root job to hand to ``toil.start``.
    """
    if options.run_imports_on_workers:
        # Run WDL imports on a worker instead
        root_job: WDLSectionJob = WDLImportJob(target, inputs, wdl_options=wdl_options, path=inputs_search_path, skip_remote=options.reference_inputs, disk_size=options.import_workers_disk)
    else:
        # Run WDL imports on leader
        # Import any files in the bindings
        convert_remote_files(inputs, toil._jobStore, target.name, inputs_search_path, import_remote_files=options.reference_inputs)
        # Run the workflow and get its outputs namespaced with the workflow name.
        root_job = WDLStartJob(target, inputs, wdl_options=wdl_options)
    return root_job


@report_wdl_errors("run workflow", exit=True)
def main() -> None:
Expand All @@ -3684,6 +3724,10 @@ def main() -> None:
# TODO: Move cwltoil's generate_default_job_store where we can use it
options.jobStore = os.path.join(mkdtemp(), 'tree')

# Take care of incompatible arguments related to file imports
if options.run_imports_on_workers is True and options.import_workers_disk is None:
raise RuntimeError("Commandline arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.")

# Make sure we have an output directory (or URL prefix) and we don't need
# to ever worry about a None, and MyPy knows it.
# If we don't have a directory assigned, make one in the current directory.
Expand Down Expand Up @@ -3779,15 +3823,15 @@ def main() -> None:
# Get the execution directory
execution_dir = os.getcwd()

convert_remote_files(input_bindings, toil, task_path=target.name, search_paths=inputs_search_path, import_remote_files=options.reference_inputs)
convert_remote_files(input_bindings, toil._jobStore, task_path=target.name, search_paths=inputs_search_path, import_remote_files=options.reference_inputs)

# Configure workflow interpreter options
wdl_options: WDLContext = {"execution_dir": execution_dir, "container": options.container, "task_path": target.name,
"namespace": target.name, "all_call_outputs": options.all_call_outputs}
assert wdl_options.get("container") is not None

# Run the workflow and get its outputs namespaced with the workflow name.
root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options, local=True)
root_job = make_root_job(target, input_bindings, inputs_search_path, toil, wdl_options, options)
output_bindings = toil.start(root_job)
if not isinstance(output_bindings, WDL.Env.Bindings):
raise RuntimeError("The output of the WDL job is not a binding.")
Expand Down