diff --git a/src/toil/common.py b/src/toil/common.py
index 056196e088..130f541e40 100644
--- a/src/toil/common.py
+++ b/src/toil/common.py
@@ -56,6 +56,7 @@
 from toil.options.common import add_base_toil_options, JOBSTORE_HELP
 from toil.options.cwl import add_cwl_options
+from toil.options.runner import add_runner_options
 from toil.options.wdl import add_wdl_options
 
 
 if sys.version_info >= (3, 8):
@@ -582,6 +583,12 @@ def create_config_dict_from_parser(parser: ArgumentParser) -> CommentedMap:
         "`toil config ~/.toil/default.yaml`.\n\nBASE TOIL OPTIONS\n")
     all_data.append(toil_base_data)
 
+    parser = ArgParser(YAMLConfigFileParser())
+    add_runner_options(parser)
+    toil_runner_data = create_config_dict_from_parser(parser)
+    toil_runner_data.yaml_set_start_comment("\nTOIL SHARED CWL AND WDL RUNNER OPTIONS")
+    all_data.append(toil_runner_data)
+
     parser = ArgParser(YAMLConfigFileParser())
     add_cwl_options(parser)
     toil_cwl_data = create_config_dict_from_parser(parser)
@@ -616,10 +623,10 @@ def create_config_dict_from_parser(parser: ArgumentParser) -> CommentedMap:
 
 
 def parser_with_common_options(
-        provisioner_options: bool = False,
-        jobstore_option: bool = True,
-        prog: Optional[str] = None,
-        default_log_level: Optional[int] = None
+    provisioner_options: bool = False,
+    jobstore_option: bool = True,
+    prog: Optional[str] = None,
+    default_log_level: Optional[int] = None
 ) -> ArgParser:
     parser = ArgParser(prog=prog or "Toil",
                        formatter_class=ArgumentDefaultsHelpFormatter)
@@ -694,6 +701,8 @@ def addOptions(parser: ArgumentParser, jobstore_as_flag: bool = False, cwl: bool
     # This is done so the config file can hold all available options
     add_cwl_options(parser, suppress=not cwl)
     add_wdl_options(parser, suppress=not wdl)
+    # Add shared runner options
+    add_runner_options(parser)
 
     def check_arguments(typ: str) -> None:
         """
@@ -707,6 +716,7 @@ def check_arguments(typ: str) -> None:
             add_cwl_options(check_parser)
         if typ == "cwl":
             add_wdl_options(check_parser)
+        add_runner_options(check_parser)
         for action in check_parser._actions:
             action.default = SUPPRESS
         other_options, _ = check_parser.parse_known_args(sys.argv[1:], ignore_help_args=True)
@@ -912,7 +922,6 @@ def start(self, rootJob: "Job") -> Any:
         # Check that the rootJob has been initialized
         rootJob.check_initialized()
 
-
         # Write shared files to the job store
         self._jobStore.write_leader_pid()
         self._jobStore.write_leader_node_id()
diff --git a/src/toil/options/cwl.py b/src/toil/options/cwl.py
index a606db34e5..0db2c80889 100644
--- a/src/toil/options/cwl.py
+++ b/src/toil/options/cwl.py
@@ -2,7 +2,6 @@
 from argparse import ArgumentParser
 
 from configargparse import SUPPRESS
 
-from toil.lib.conversions import human2bytes
 from toil.version import baseVersion
 
@@ -282,21 +281,6 @@ def add_cwl_options(parser: ArgumentParser, suppress: bool = True) -> None:
         help=suppress_help or "Disable file streaming for files that have 'streamable' flag True",
         dest="disable_streaming",
     )
-    parser.add_argument(
-        "--runImportsOnWorkers", "--run-imports-on-workers",
-        action="store_true",
-        default=False,
-        help=suppress_help or "Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance."
-             "If set to true, the argument --importWorkersDisk must also be set.",
-        dest="run_imports_on_workers"
-    )
-
-    parser.add_argument("--importWorkersDisk", "--import-workers-disk",
-                        help=suppress_help or "Specify the amount of disk space an import worker will use. If file streaming for input files is not available,"
-                             "this should be set to the size of the largest input file. This must be set in conjunction with the argument runImportsOnWorkers.",
-                        dest="import_workers_disk",
-                        type=lambda x: human2bytes(str(x)),
-                        default=None)
 
     provgroup = parser.add_argument_group(
         "Options for recording provenance information of the execution"
diff --git a/src/toil/options/runner.py b/src/toil/options/runner.py
new file mode 100644
index 0000000000..bb82cdab02
--- /dev/null
+++ b/src/toil/options/runner.py
@@ -0,0 +1,25 @@
+from argparse import ArgumentParser
+from toil.lib.conversions import human2bytes
+
+
+def add_runner_options(parser: ArgumentParser, cwl: bool = False, wdl: bool = False) -> None:
+    """
+    Add options shared between the WDL and CWL runners.
+    :param parser: parser to add arguments to
+    :param cwl: whether the CWL runner is adding these options
+    :param wdl: whether the WDL runner is adding these options
+    :return: None
+    """
+    # Even when wdl and cwl are both False, the default (camelCase) options are still added
+    run_imports_on_workers_arguments = ["--runImportsOnWorkers"]
+    if cwl:
+        run_imports_on_workers_arguments.append("--run-imports-on-workers")
+    parser.add_argument(*run_imports_on_workers_arguments, action="store_true", default=False, dest="run_imports_on_workers",
+                        help="Run the file imports on a worker instead of the leader. This is useful if the leader is not optimized for high network performance. "
+                             "If set to true, the argument --importWorkersDisk must also be set.")
+    import_workers_disk_arguments = ["--importWorkersDisk"]
+    if cwl:
+        import_workers_disk_arguments.append("--import-workers-disk")
+    parser.add_argument(*import_workers_disk_arguments, dest="import_workers_disk", type=lambda x: human2bytes(str(x)), default=None,
+                        help="Specify the amount of disk space an import worker will use. If file streaming for input files is not available, "
+                             "this should be set to the size of the largest input file. This must be set in conjunction with the argument runImportsOnWorkers.")
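For illustration, a minimal sketch of how the shared options behave once registered (this snippet is not part of the patch and assumes only the add_runner_options defined above plus Toil's existing human2bytes):

    from argparse import ArgumentParser

    from toil.options.runner import add_runner_options

    parser = ArgumentParser()
    # cwl=True additionally registers the kebab-case aliases that cwltoil used to
    # define for itself; the camelCase flags are always registered.
    add_runner_options(parser, cwl=True)

    args = parser.parse_args(["--run-imports-on-workers", "--importWorkersDisk", "2GiB"])
    assert args.run_imports_on_workers is True
    assert args.import_workers_disk == 2 * 1024**3  # human2bytes("2GiB") yields a byte count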
diff --git a/src/toil/test/wdl/wdltoil_test.py b/src/toil/test/wdl/wdltoil_test.py
index c7ba29626c..0dadefb479 100644
--- a/src/toil/test/wdl/wdltoil_test.py
+++ b/src/toil/test/wdl/wdltoil_test.py
@@ -400,8 +400,18 @@ def test_giraffe(self):
         json_file = f"{base_uri}/params/giraffe.json"
 
         result_json = subprocess.check_output(
-            self.base_command + [wdl_file, json_file, '-o', self.output_dir, '--outputDialect', 'miniwdl', '--scale',
-                                 '0.1'])
+            self.base_command + [
+                wdl_file,
+                json_file,
+                '-o',
+                self.output_dir,
+                '--outputDialect',
+                'miniwdl',
+                '--scale',
+                '0.1',
+                '--logDebug',
+            ]
+        )
         result = json.loads(result_json)
 
         # Expect MiniWDL-style output with a designated "dir"
diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py
index 9d04da92a0..4e1367602b 100755
--- a/src/toil/wdl/wdltoil.py
+++ b/src/toil/wdl/wdltoil.py
@@ -62,7 +62,7 @@
 import WDL.Error
 import WDL.runtime.config
 
-from configargparse import ArgParser
+from configargparse import ArgParser, Namespace
 from WDL._util import byte_size_units, chmod_R_plus
 from WDL.Tree import ReadSourceResult
 from WDL.CLI import print_error
@@ -83,7 +83,8 @@
                                TemporaryID,
                                parse_accelerator,
                                unwrap,
-                               unwrap_all)
+                               unwrap_all,
+                               ParseableIndivisibleResource)
 from toil.jobStores.abstractJobStore import (AbstractJobStore, UnimplementedURLException,
                                              InvalidImportExportUrlException, LocatorException)
 from toil.lib.accelerators import get_individual_local_accelerators
@@ -351,7 +352,7 @@ def first_mismatch(prefix: str, value: str) -> int:
     return modified
 
 
-def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None) -> Iterator[str]:
+def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tree.Document] = None, execution_dir: Optional[str] = None) -> Iterator[str]:
     """
     Get potential absolute URIs to check for an imported file.
 
@@ -393,7 +394,8 @@ def potential_absolute_uris(uri: str, path: List[str], importer: Optional[WDL.Tr
         full_path_list.append(Toil.normalize_uri(importer.pos.abspath))
 
     # Then the current directory. We need to make sure to include a filename component here or it will treat the current directory with no trailing / as a document and relative paths will look 1 level up.
-    full_path_list.append(Toil.normalize_uri('.') + '/.')
+    # When importing on a worker, the cwd will be a tmpdir and will result in a FileNotFoundError after os.path.abspath, so override it with the execution dir
+    full_path_list.append(Toil.normalize_uri(execution_dir or '.') + '/.')
 
     # Then the specified paths.
     # TODO:
@@ -789,10 +791,10 @@ def is_toil_url(filename: str) -> bool:
 
 
 def is_standard_url(filename: str) -> bool:
-    return is_url(filename, ['http:', 'https:', 's3:', 'gs:'])
+    return is_url(filename, ['http:', 'https:', 's3:', 'gs:', 'ftp:'])
 
 
-def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', TOIL_URI_SCHEME]) -> bool:
+def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', 'ftp:', TOIL_URI_SCHEME]) -> bool:
     """
     Decide if a filename is a known kind of URL
     """
@@ -801,7 +803,8 @@ def is_url(filename: str, schemes: List[str]=['http:', 'https:', 's3:', 'gs:', T
             return True
     return False
 
-def convert_remote_files(environment: WDLBindings, file_source: Toil, task_path: str, search_paths: Optional[List[str]] = None, import_remote_files: bool = True) -> None:
+def convert_remote_files(environment: WDLBindings, file_source: AbstractJobStore, task_path: str, search_paths: Optional[List[str]] = None, import_remote_files: bool = True,
+                         execution_dir: Optional[str] = None) -> None:
     """
     Resolve relative-URI files in the given environment and import all files.
 
@@ -827,7 +830,7 @@ def import_filename(filename: str) -> Tuple[Optional[str], Optional[str]]:
         """
         # Search through any input search paths passed in and download it if found
         tried = []
-        for candidate_uri in potential_absolute_uris(filename, search_paths if search_paths is not None else []):
+        for candidate_uri in potential_absolute_uris(filename, search_paths if search_paths is not None else [], execution_dir=execution_dir):
             tried.append(candidate_uri)
             try:
                 if not import_remote_files and is_url(candidate_uri):
@@ -840,12 +843,8 @@ def import_filename(filename: str) -> Tuple[Optional[str], Optional[str]]:
                     return candidate_uri, None
                 else:
                     # Actually import
-                    # Try to import the file. Don't raise if we can't find it, just
-                    # return None!
-                    imported = file_source.import_file(candidate_uri, check_existence=False)
-                    if imported is None:
-                        # Wasn't found there
-                        continue
+                    # Try to import the file. If we can't find it, continue
+                    imported = file_source.import_file(candidate_uri)
             except UnimplementedURLException as e:
                 # We can't find anything that can even support this URL scheme.
                 # Report to the user, they are probably missing an extra.
@@ -856,6 +855,9 @@ def import_filename(filename: str) -> Tuple[Optional[str], Optional[str]]:
                 logger.warning("Checked URL %s but got HTTP status %s", candidate_uri, e.code)
                 # Try the next location.
                 continue
+            except FileNotFoundError:
+                # Wasn't found there
+                continue
             except Exception:
                 # Something went wrong besides the file not being found. Maybe
                 # we have no auth.
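Distilled, the import loop above now follows an EAFP pattern: import_file raises when a candidate location does not exist, rather than returning None under check_existence=False, and the caller treats FileNotFoundError as "try the next candidate URI". A simplified sketch of that control flow (the helper name and argument shapes are illustrative, not the real signatures):

    def import_first_found(candidate_uris, file_source):
        # Try each location in turn; FileNotFoundError just means "not here",
        # so fall through to the next candidate.
        for candidate_uri in candidate_uris:
            try:
                return file_source.import_file(candidate_uri)
            except FileNotFoundError:
                continue
        return None  # no candidate location had the file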
@@ -3630,7 +3632,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings:
         return self.postprocess(output_bindings)
 
 
-class WDLRootJob(WDLSectionJob):
+class WDLStartJob(WDLSectionJob):
     """
     Job that evaluates an entire WDL workflow, and returns the workflow outputs
     namespaced with the workflow name. Inputs may or may not be namespaced with
@@ -3666,6 +3668,44 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
         self.defer_postprocessing(job)
         return job.rv()
 
+class WDLImportJob(WDLSectionJob):
+    def __init__(self, target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, wdl_options: WDLContext, path: Optional[List[str]] = None, skip_remote: bool = False,
+                 disk_size: Optional[ParseableIndivisibleResource] = None, **kwargs: Any):
+        """
+        Job to take the inputs from the WDL workflow and import them on a worker instead of the leader. Assumes all local and cloud files are accessible.
+
+        This class is only used when runImportsOnWorkers is enabled.
+        """
+        super().__init__(wdl_options=wdl_options, local=False,
+                         disk=disk_size, **kwargs)
+        self._target = target
+        self._inputs = inputs
+        self._path = path
+        self._skip_remote = skip_remote
+
+    def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
+        """
+        Import the workflow inputs and then create and run the workflow.
+        :return: Promise of workflow outputs
+        """
+        convert_remote_files(self._inputs, file_store.jobStore, self._target.name, self._path, self._skip_remote, self._wdl_options.get("execution_dir"))
+        root_job = WDLStartJob(self._target, self._inputs, wdl_options=self._wdl_options)
+        self.addChild(root_job)
+        return root_job.rv()
+
+
+def make_root_job(target: Union[WDL.Tree.Workflow, WDL.Tree.Task], inputs: WDLBindings, inputs_search_path: List[str], toil: Toil, wdl_options: WDLContext, options: Namespace) -> WDLSectionJob:
+    if options.run_imports_on_workers:
+        # Run WDL imports on a worker instead of the leader
+        root_job: WDLSectionJob = WDLImportJob(target, inputs, wdl_options=wdl_options, path=inputs_search_path, skip_remote=options.reference_inputs, disk_size=options.import_workers_disk)
+    else:
+        # Run WDL imports on the leader
+        # Import any files in the bindings
+        convert_remote_files(inputs, toil._jobStore, target.name, inputs_search_path, import_remote_files=options.reference_inputs)
+        # Run the workflow and get its outputs namespaced with the workflow name.
+        root_job = WDLStartJob(target, inputs, wdl_options=wdl_options)
+    return root_job
+
 
 @report_wdl_errors("run workflow", exit=True)
 def main() -> None:
@@ -3684,6 +3724,10 @@ def main() -> None:
         # TODO: Move cwltoil's generate_default_job_store where we can use it
         options.jobStore = os.path.join(mkdtemp(), 'tree')
 
+    # Take care of incompatible arguments related to file imports
+    if options.run_imports_on_workers is True and options.import_workers_disk is None:
+        raise RuntimeError("Command line arguments --runImportsOnWorkers and --importWorkersDisk must both be set to run file imports on workers.")
+
     # Make sure we have an output directory (or URL prefix) and we don't need
     # to ever worry about a None, and MyPy knows it.
     # If we don't have a directory assigned, make one in the current directory.
@@ -3779,7 +3823,7 @@ def main() -> None:
         # Get the execution directory
         execution_dir = os.getcwd()
 
-        convert_remote_files(input_bindings, toil, task_path=target.name, search_paths=inputs_search_path, import_remote_files=options.reference_inputs)
+        convert_remote_files(input_bindings, toil._jobStore, task_path=target.name, search_paths=inputs_search_path, import_remote_files=options.reference_inputs)
 
         # Configure workflow interpreter options
         wdl_options: WDLContext = {"execution_dir": execution_dir, "container": options.container, "task_path": target.name,
@@ -3787,7 +3831,7 @@ def main() -> None:
         assert wdl_options.get("container") is not None
 
         # Run the workflow and get its outputs namespaced with the workflow name.
-        root_job = WDLRootJob(target, input_bindings, wdl_options=wdl_options, local=True)
+        root_job = make_root_job(target, input_bindings, inputs_search_path, toil, wdl_options, options)
         output_bindings = toil.start(root_job)
         if not isinstance(output_bindings, WDL.Env.Bindings):
             raise RuntimeError("The output of the WDL job is not a binding.")
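Taken together, a hypothetical invocation that exercises the new code path looks like this (workflow and input file names are placeholders; per the guard added in main(), --importWorkersDisk must accompany --runImportsOnWorkers):

    toil-wdl-runner workflow.wdl inputs.json --runImportsOnWorkers --importWorkersDisk 100GiB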