From 03177ce8e31674e3f887c2da485c014d6bd2533f Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 22 Aug 2024 12:23:31 -0400 Subject: [PATCH 01/16] Start on helpers for working with MiniWDL's call cache --- src/toil/wdl/wdltoil.py | 46 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 2c7e155b23..be3c7f2b64 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -633,6 +633,52 @@ def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str, str, str]: return file_id, task_path, parent_id, file_basename +## +# Caching machinery +## + +# TODO: Move to new file? + +def get_shared_fs_path(file: WDL.Value.File) -> Optional[str]: + """ + If a File has a shared filesystem path, get that path. + + This will be the path the File was initially imported from, or the path that it has in the call cache. + """ + # TODO: We hide this on the value string itself so we can + # map_over_files_in_bindings and fetch it out. But really we should allow + # mapping over bindings and getting the WDL.Value.Base objects and hide it + # in there. + if hasattr(file.value, '_shared_fs_path'): + return cast(str, getattr(file.value, '_shared_fs_path')) + return None + +def set_shared_fs_path(file: WDL.Value.File, path: str) -> None: + """ + Mutate a File to associate it with the given shared filesystem path. + + This should be the path it was initially imported from, or the path that it has in the call cache. + """ + setattr(file.value, '_shared_fs_path', path) + +def get_miniwdl_input_digest(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> str: + """ + Get a digest for looking up the task call with the given inputs. + + Represents all files by their shared filesystem paths so that cache entries written by MiniWDL can be used by Toil. + """ + + def file_path_to_use(stored_file_string: str) -> str: + """ + Return the shared FS path if we have one, or the current string. + """ + if hasattr(stored_file_string, '_shared_fs_path'): + return cast(str, getattr(stored_file_string, '_shared_fs_path')) + return stored_file_string + + transformed_bindings = map_over_files_in_bindings(bindings, file_path_to_use) + return WDL.Value.digest_env(transformed_bindings) + DirectoryNamingStateDict = Dict[str, Tuple[Dict[str, str], Set[str]]] def choose_human_readable_directory(root_dir: str, source_task_path: str, parent_id: str, state: DirectoryNamingStateDict) -> str: From 6d302000cdcb224b7437384d95c7e4f3c22d4d07 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 12 Sep 2024 18:00:03 -0400 Subject: [PATCH 02/16] Add cache put and get but no real handling of files --- src/toil/wdl/wdltoil.py | 39 ++++++++++++++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index b6e90c36d5..a1bb251f66 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1926,6 +1926,25 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs. # For a task we are only passed the inside-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) + + # At this point we have what MiniWDL would call the "inputs" to the + # task (i.e. what you would put in a JSON file, without any defaulted + # or calculated inputs filled in). So start making cache keys. 
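+        # The key layout built below matches MiniWDL's call cache:
+        #     <task name>/<task digest>/<input digest>
+        # so entries written by either runner can be reused by the other.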
+ input_digest = get_miniwdl_input_digest(bindings) + task_digest = self._task.digest + cache_key=f"{self._task.name}/{task_digest}/{input_digest}" + miniwdl_logger = logging.getLogger("MiniWDL") + # TODO: Ship config from leader? It might not see the right environment. + miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) + miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + cached_result: Optional[WDLBindings] = miniwdl_cache.get(cache_key, bindings, self._task.effective_outputs) + if cached_result is not None: + logger.info("Found task call in cache") + return self.postprocess(cached_result) + else: + logger.debug("No cache hit for %s", cache_key) + # Otherwise we need to run the actual command. + # Set up the WDL standard library # UUID to use for virtualizing files standard_library = ToilWDLStdLibBase(file_store, self._task_path) @@ -2031,7 +2050,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: runtime_accelerators = [accelerator_requirement] # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized. - run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) + run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options, cache_key=cache_key) # Run that as a child self.addChild(run_job) @@ -2054,7 +2073,7 @@ class WDLTaskJob(WDLBaseJob): All bindings are in terms of task-internal names. """ - def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, **kwargs: Any) -> None: + def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, cache_key: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a task. @@ -2063,11 +2082,12 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind :param task_path: Like the namespace, but including subscript numbers for scatters. + + :param cache_key: Key to save outputs under in MiniWDL-compatible + cache. Cached outputs will not be postprocessed. """ # This job should not be local because it represents a real workflow task. - # TODO: Instead of re-scheduling with more resources, add a local - # "wrapper" job like CWL uses to determine the actual requirements. 
super().__init__(unitName=task_path + ".command", displayName=namespace + ".command", local=False, **kwargs) logger.info("Preparing to run task %s as %s", task.name, namespace) @@ -2078,6 +2098,7 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind self._task_id = task_id self._namespace = namespace self._task_path = task_path + self._cache_key = cache_key ### # Runtime code injection system @@ -2317,7 +2338,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: raise RuntimeError(f"Could not find a working container engine to use; told to use {self._wdl_options.get('container')}") # Set up the MiniWDL container running stuff - miniwdl_logger = logging.getLogger("MiniWDLContainers") + miniwdl_logger = logging.getLogger("MiniWDL") miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) if not getattr(TaskContainerImplementation, 'toil_initialized__', False): # Initialize the cointainer system @@ -2579,6 +2600,14 @@ def get_path_in_container(path: str) -> Optional[str]: # Upload any files in the outputs if not uploaded already. Accounts for how relative paths may still need to be container-relative. output_bindings = virtualize_files(output_bindings, outputs_library) + if self._cache_key is not None: + # Offer to save the not-yet-postprocessed result to the cache. + miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + # TODO: We probably need to rewrite the file values and save copies + # of them outside the doomed jobstore + miniwdl_cache.put(self._cache_key, output_bindings) + logger.debug("Saved result to cache under %s", self._cache_key) + # Do postprocessing steps to e.g. apply namespaces. output_bindings = self.postprocess(output_bindings) From d48c7dd51441910b49692177f1c714f3bbdb8baf Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 26 Sep 2024 14:01:42 -0400 Subject: [PATCH 03/16] Hide Toil files in MiniWDL cache and assign cache file paths --- src/toil/wdl/wdltoil.py | 139 ++++++++++++++++++++++++++++++++-------- 1 file changed, 111 insertions(+), 28 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index c298191434..f18285d6a6 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -631,27 +631,41 @@ def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str, str, str]: # TODO: Move to new file? -def get_shared_fs_path(file: WDL.Value.File) -> Optional[str]: +SHARED_PATH_ATTR = "_shared_fs_path" + +def get_shared_fs_path(file: Union[str, WDL.Value.File]) -> Optional[str]: """ If a File has a shared filesystem path, get that path. This will be the path the File was initially imported from, or the path that it has in the call cache. + + Accepts either a WDL-level File or the actual str value of one. """ # TODO: We hide this on the value string itself so we can # map_over_files_in_bindings and fetch it out. But really we should allow # mapping over bindings and getting the WDL.Value.Base objects and hide it # in there. - if hasattr(file.value, '_shared_fs_path'): - return cast(str, getattr(file.value, '_shared_fs_path')) + if isinstance(file, WDL.Value.File): + file_value = file.value + else: + file_value = file + if hasattr(file_value, SHARED_PATH_ATTR): + return cast(str, getattr(file_value, SHARED_PATH_ATTR)) return None -def set_shared_fs_path(file: WDL.Value.File, path: str) -> None: +def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> None: """ Mutate a File to associate it with the given shared filesystem path. 
This should be the path it was initially imported from, or the path that it has in the call cache. + + Accepts either a WDL-level File or the actual str value of one. """ - setattr(file.value, '_shared_fs_path', path) + if isinstance(file, WDL.Value.File): + file_value = file.value + else: + file_value = file + setattr(file_value, SHARED_PATH_ATTR, path) def get_miniwdl_input_digest(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> str: """ @@ -664,9 +678,7 @@ def file_path_to_use(stored_file_string: str) -> str: """ Return the shared FS path if we have one, or the current string. """ - if hasattr(stored_file_string, '_shared_fs_path'): - return cast(str, getattr(stored_file_string, '_shared_fs_path')) - return stored_file_string + return get_shared_fs_path(stored_file_string) or stored_file_string transformed_bindings = map_over_files_in_bindings(bindings, file_path_to_use) return WDL.Value.digest_env(transformed_bindings) @@ -934,7 +946,8 @@ def devirtualize_to( state: DirectoryNamingStateDict, devirtualized_to_virtualized: Optional[Dict[str, str]] = None, virtualized_to_devirtualized: Optional[Dict[str, str]] = None, - enforce_existence: bool = True + enforce_existence: bool = True, + export: bool = False ) -> str: """ Download or export a WDL virtualized filename/URL to the given directory. @@ -945,14 +958,16 @@ def devirtualize_to( don't clobber each other. Called from within this class for tasks, and statically at the end of the workflow for outputs. - Returns the local path to the file. If it already had a local path - elsewhere, it might not actually be put in dest_dir. + Returns the local path to the file. If the file is already a local + path, or if it already has an entry in virtualized_to_devirtualized, + that path will be re-used instead of creating a new copy in dest_dir. The input filename could already be devirtualized. In this case, the filename - should not be added to the cache + should not be added to the cache. :param state: State dict which must be shared among successive calls into a dest_dir. :param enforce_existence: Raise an error if the file is nonexistent. Else, let it pass through. + :param export: Always create exported copies of files rather than views that a FileStore might clean up. """ if not os.path.isdir(dest_dir): @@ -962,7 +977,6 @@ def devirtualize_to( raise RuntimeError(f"Cannot devirtualize {filename} into nonexistent directory {dest_dir}") # TODO: Support people doing path operations (join, split, get parent directory) on the virtualized filenames. - # TODO: For task inputs, we are supposed to make sure to put things in the same directory if they came from the same directory. See if is_url(filename): if virtualized_to_devirtualized is not None and filename in virtualized_to_devirtualized: # The virtualized file is in the cache, so grab the already devirtualized result @@ -999,17 +1013,20 @@ def devirtualize_to( if filename.startswith(TOIL_URI_SCHEME): # Get a local path to the file - if isinstance(file_source, AbstractFileStore): + if isinstance(file_source, Toil) or export: + # Use the export method that both Toil and + # AbstractFileStore have to get an unencumbered copy. + file_source.export_file(file_id, dest_path) + result = dest_path + elif isinstance(file_source, AbstractFileStore): # Read from the file store. # File is not allowed to be modified by the task. See # . # We try to get away with symlinks and hope the task # container can mount the destination file. 
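+                    # (A symlinked copy is only a view that the file store
+                    # may clean up later; cache exports take the export_file
+                    # branch above to get a durable copy instead.)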
result = file_source.readGlobalFile(file_id, dest_path, mutable=False, symlink=True) - elif isinstance(file_source, Toil): - # Read from the Toil context - file_source.export_file(file_id, dest_path) - result = dest_path + else: + raise RuntimeError(f"Unsupported file source: {file_source}") else: # Download to a local file with the right name and execute bit. # Open it exclusively @@ -1567,6 +1584,9 @@ def import_file_from_uri(uri: str) -> str: # download them at that basename later. raise RuntimeError(f"File {candidate_uri} has no basename and so cannot be a WDL File") + # If the file has a local path outside the job store, we want to know it for caching + local_path: Optional[str] = None + # Was actually found if is_url(candidate_uri): # Might be a file URI or other URI. @@ -1575,18 +1595,25 @@ def import_file_from_uri(uri: str) -> str: parsed = urlsplit(candidate_uri) if parsed.scheme == "file:": # This is a local file URI. Convert to a path for source directory tracking. - parent_dir = os.path.dirname(unquote(parsed.path)) + local_path = unquote(parsed.path) + parent_dir = os.path.dirname(local_path) else: # This is some other URL. Get the URL to the parent directory and use that. parent_dir = urljoin(candidate_uri, ".") else: # Must be a local path + local_path = candidate_uri parent_dir = os.path.dirname(candidate_uri) # Pack a UUID of the parent directory dir_id = path_to_id.setdefault(parent_dir, uuid.uuid4()) - return pack_toil_uri(imported, task_path, dir_id, file_basename) + file_value = pack_toil_uri(imported, task_path, dir_id, file_basename) + if local_path is not None: + # Mark the file as having a known local path outside the jobstore. + # TODO: Turn URLs into local files like the MiniWDL download cache. + set_shared_fs_path(file_value, local_path) + return file_value # If we get here we tried all the candidates raise RuntimeError(f"Could not find {uri} at any of: {tried}") @@ -2297,7 +2324,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # A current limitation with the singularity/miniwdl cache is it cannot check for image updates if the # filename is the same singularity_cache = os.path.join(os.path.expanduser("~"), ".singularity") - miniwdl_cache = os.path.join(os.path.expanduser("~"), ".cache/miniwdl") + miniwdl_singularity_cache = os.path.join(os.path.expanduser("~"), ".cache/miniwdl") # Cache Singularity's layers somewhere known to have space os.environ['SINGULARITY_CACHEDIR'] = os.environ.get("SINGULARITY_CACHEDIR", singularity_cache) @@ -2308,7 +2335,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Cache Singularity images for the workflow on this machine. # Since MiniWDL does only within-process synchronization for pulls, # we also will need to pre-pull one image into here at a time. - os.environ['MINIWDL__SINGULARITY__IMAGE_CACHE'] = os.environ.get("MINIWDL__SINGULARITY__IMAGE_CACHE", miniwdl_cache) + os.environ['MINIWDL__SINGULARITY__IMAGE_CACHE'] = os.environ.get("MINIWDL__SINGULARITY__IMAGE_CACHE", miniwdl_singularity_cache) # Make sure it exists. os.makedirs(os.environ['MINIWDL__SINGULARITY__IMAGE_CACHE'], exist_ok=True) @@ -2588,16 +2615,72 @@ def get_path_in_container(path: str) -> Optional[str]: # next task. raise WDL.Error.EvalError(decl, f"non-optional value {decl.name} = {decl.expr} is missing") - # Upload any files in the outputs if not uploaded already. Accounts for how relative paths may still need to be container-relative. + # Upload any files in the outputs if not uploaded already. 
Accounts for
+        # how relative paths may still need to be container-relative.
         output_bindings = virtualize_files(output_bindings, outputs_library)

         if self._cache_key is not None:
-            # Offer to save the not-yet-postprocessed result to the cache.
+            # We might need to save to the call cache
             miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger)
-            # TODO: We probably need to rewrite the file values and save copies
-            # of them outside the doomed jobstore
-            miniwdl_cache.put(self._cache_key, output_bindings)
-            logger.debug("Saved result to cache under %s", self._cache_key)
+            if miniwdl_cache._cfg["call_cache"].get_bool("put"):
+                # Saving to the cache is on. If we ran something, we write it to the cache.
+                # TODO: Get MiniWDL to expose this.
+
+                # Set up deduplication just for these outputs.
+                devirtualization_state: DirectoryNamingStateDict = {}
+                devirtualized_to_virtualized: Dict[str, str] = dict()
+                virtualized_to_devirtualized: Dict[str, str] = dict()
+                # TODO: if a URL is passed through multiple tasks it will be saved multiple times. Also save on input???
+
+                # Determine where we will save our cached versions of files.
+                output_directory = os.path.join(miniwdl_cache._call_cache_dir, "toil_files")
+
+                # Adjust all files in the output bindings to have shared FS paths outside the job store.
+                def assign_shared_fs_path(file_value: str) -> str:
+                    """
+                    Given the string value inside a File, mutate the File to have a shared FS path outside the jobstore.
+
+                    Returns the string passed in.
+                    """
+
+                    # TODO: Change to using File objects when required PR with
+                    # the mapping functions for that is finally merged.
+
+                    # We need all the incoming paths to not already be local
+                    # paths, or devirtualizing them to export them will not
+                    # work.
+                    #
+                    # This ought to be the case because we just virtualized
+                    # them all for transport out of the machine.
+                    if not is_url(file_value):
+                        raise RuntimeError(f"File {file_value} caught escaping from task unvirtualized")
+
+                    if get_shared_fs_path(file_value) is None:
+                        # We need to save this file somewhere.
+                        exported_path = ToilWDLStdLibBase.devirtualize_to(
+                            file_value,
+                            output_directory,
+                            file_store,
+                            self._wdl_options.get("execution_dir"),
+                            devirtualization_state,
+                            devirtualized_to_virtualized,
+                            virtualized_to_devirtualized,
+                            enforce_existence=False,
+                            export=True
+                        )
+
+                        # Remember where it went
+                        set_shared_fs_path(file_value, exported_path)
+
+                    return file_value
+                output_bindings = map_over_files_in_bindings(output_bindings, assign_shared_fs_path)
+
+                # Save the bindings to the cache
+                miniwdl_cache.put(self._cache_key, output_bindings)
+                logger.debug("Saved result to cache under %s", self._cache_key)
+
+                # Keep using the transformed bindings so that later tasks use
+                # the cached files in their input digests.

         # Do postprocessing steps to e.g. apply namespaces.
output_bindings = self.postprocess(output_bindings) From 074c86514070a985a1af403c85e186bc61fcd641 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 26 Sep 2024 15:02:04 -0400 Subject: [PATCH 04/16] Interpose a str subclass but still put toilfile URIs in cache somehow --- src/toil/jobStores/abstractJobStore.py | 4 +++- src/toil/wdl/wdltoil.py | 23 ++++++++++++++++++++--- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index 99291da052..d909248ed2 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -506,8 +506,10 @@ def export_file(self, file_id: FileID, dst_uri: str) -> None: :param str file_id: The id of the file in the job store that should be exported. :param str dst_uri: URL that points to a file or object in the storage mechanism of a - supported URL scheme e.g. a blob in an AWS s3 bucket. + supported URL scheme e.g. a blob in an AWS s3 bucket. May also be a local path. """ + from toil.common import Toil + dst_uri = Toil.normalize_uri(dst_uri) parseResult = urlparse(dst_uri) otherCls = self._findJobStoreForUrl(parseResult, export=True) self._export_file(otherCls, file_id, parseResult) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index f18285d6a6..c3854ea31f 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -111,6 +111,7 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] UnimplementedURLException, JobTooBigError ) as e: + logger.exception(e) # Don't expose tracebacks to the user for exceptions that may be expected log("Could not " + task + " because:") @@ -631,8 +632,15 @@ def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str, str, str]: # TODO: Move to new file? +# We store the shared FS path in an attribute on the string value in the WDL File. +# TODO: When we can map and get the File, change this. SHARED_PATH_ATTR = "_shared_fs_path" +# Since you can't actually *set* an attribute on a str, we need to have a +# non-builtin str subclass to make the values be. +class AttrStr(str): + pass + def get_shared_fs_path(file: Union[str, WDL.Value.File]) -> Optional[str]: """ If a File has a shared filesystem path, get that path. @@ -660,12 +668,19 @@ def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> None: This should be the path it was initially imported from, or the path that it has in the call cache. Accepts either a WDL-level File or the actual str value of one. + + Returns a str that has to be assigned back to the WDL File's value, which + may be the same one. """ if isinstance(file, WDL.Value.File): file_value = file.value else: file_value = file + if not isinstance(file_value, AttrStr): + # Make it be a str subclass we can set attributes on + file_value = AttrStr(file_value) setattr(file_value, SHARED_PATH_ATTR, path) + return file_value def get_miniwdl_input_digest(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> str: """ @@ -1612,7 +1627,7 @@ def import_file_from_uri(uri: str) -> str: if local_path is not None: # Mark the file as having a known local path outside the jobstore. # TODO: Turn URLs into local files like the MiniWDL download cache. - set_shared_fs_path(file_value, local_path) + file_value = set_shared_fs_path(file_value, local_path) return file_value # If we get here we tried all the candidates @@ -2634,13 +2649,15 @@ def get_path_in_container(path: str) -> Optional[str]: # Determine where we will save our cached versions of files. 
output_directory = os.path.join(miniwdl_cache._call_cache_dir, "toil_files") + # This needs to exist before we can export to it + os.makedirs(output_directory, exist_ok=True) # Adjust all files in the output bindings to have shared FS paths outside the job store. def assign_shared_fs_path(file_value: str) -> str: """ Given the string value inside a File, mutate the File to have a shared FS path outside the jobstore. - Returns the string passed in. + Returns the value to put in the WDL file to actually do the mutation. """ # TODO: Change to using File objects when required PR with @@ -2670,7 +2687,7 @@ def assign_shared_fs_path(file_value: str) -> str: ) # Remember where it went - set_shared_fs_path(file_value, exported_path) + file_value = set_shared_fs_path(file_value, exported_path) return file_value output_bindings = map_over_files_in_bindings(output_bindings, assign_shared_fs_path) From e38eeacc54691b8692ae06e4793e563e202f8556 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 26 Sep 2024 18:35:48 -0400 Subject: [PATCH 05/16] Get cache to interoperate with MiniWDL for simple file cases --- src/toil/wdl/wdltoil.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index c3854ea31f..672544754c 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -682,21 +682,27 @@ def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> None: setattr(file_value, SHARED_PATH_ATTR, path) return file_value -def get_miniwdl_input_digest(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> str: +def view_shared_fs_paths(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> WDL.Env.Bindings[WDL.Value.Base]: """ - Get a digest for looking up the task call with the given inputs. - - Represents all files by their shared filesystem paths so that cache entries written by MiniWDL can be used by Toil. + Given WDL bindings, return a copy where all files have their shared filesystem paths as their values. """ - def file_path_to_use(stored_file_string: str) -> str: """ Return the shared FS path if we have one, or the current string. """ return get_shared_fs_path(stored_file_string) or stored_file_string - transformed_bindings = map_over_files_in_bindings(bindings, file_path_to_use) - return WDL.Value.digest_env(transformed_bindings) + return map_over_files_in_bindings(bindings, file_path_to_use) + + +def get_miniwdl_input_digest(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> str: + """ + Get a digest for looking up the task call with the given inputs. + + Represents all files by their shared filesystem paths so that cache entries written by MiniWDL can be used by Toil. + """ + + return WDL.Value.digest_env(view_shared_fs_paths(bindings)) DirectoryNamingStateDict = Dict[str, Tuple[Dict[str, str], Set[str]]] @@ -2648,7 +2654,7 @@ def get_path_in_container(path: str) -> Optional[str]: # TODO: if a URL is passed through multiple tasks it will be saved multiple times. Also save on input??? # Determine where we will save our cached versions of files. 
- output_directory = os.path.join(miniwdl_cache._call_cache_dir, "toil_files") + output_directory = os.path.join(miniwdl_cache._call_cache_dir, self._cache_key) # This needs to exist before we can export to it os.makedirs(output_directory, exist_ok=True) @@ -2692,8 +2698,8 @@ def assign_shared_fs_path(file_value: str) -> str: return file_value output_bindings = map_over_files_in_bindings(output_bindings, assign_shared_fs_path) - # Save the bindings to the cache - miniwdl_cache.put(self._cache_key, output_bindings) + # Save the bindings to the cache, representing all files with their shared filesystem paths. + miniwdl_cache.put(self._cache_key, view_shared_fs_paths(output_bindings)) logger.debug("Saved result to cache under %s", self._cache_key) # Keep using the transformed bindings so that later tasks use From 99c68f31d3472da2475137caef8923871942b988 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 26 Sep 2024 22:16:10 -0400 Subject: [PATCH 06/16] Connect Toil to MiniWDL write_* file cache --- src/toil/wdl/wdltoil.py | 149 +++++++++++++++++++++++++++++++++++----- 1 file changed, 132 insertions(+), 17 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 672544754c..255a361e9d 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -36,6 +36,7 @@ Callable, Dict, Generator, + IO, Iterable, Iterator, List, @@ -86,6 +87,21 @@ from toil.lib.threading import global_mutex from toil.provisioners.clusterScaler import JobTooBigError +import hashlib +try: + from hashlib import file_digest +except ImportError: + # Polyfill file_digest from 3.11+ + def file_digest(f: IO[bytes], alg_name: str) -> hashlib._Hash: + BUFFER_SIZE = 1024 * 1024 + hasher = hashlib.new(alg_name) + buffer = f.read(BUFFER_SIZE) + while buffer: + hasher.update(buffer) + buffer = f.read(BUFFER_SIZE) + return hasher + + logger = logging.getLogger(__name__) @@ -669,8 +685,8 @@ def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> None: Accepts either a WDL-level File or the actual str value of one. - Returns a str that has to be assigned back to the WDL File's value, which - may be the same one. + Returns a str that has to be assigned back to the WDL File's value, if the + input was not a WDL File. """ if isinstance(file, WDL.Value.File): file_value = file.value @@ -680,6 +696,9 @@ def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> None: # Make it be a str subclass we can set attributes on file_value = AttrStr(file_value) setattr(file_value, SHARED_PATH_ATTR, path) + if isinstance(file, WDL.Value.File): + # Commit mutation + file.value = file_value return file_value def view_shared_fs_paths(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> WDL.Env.Bindings[WDL.Value.Base]: @@ -695,16 +714,6 @@ def file_path_to_use(stored_file_string: str) -> str: return map_over_files_in_bindings(bindings, file_path_to_use) -def get_miniwdl_input_digest(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> str: - """ - Get a digest for looking up the task call with the given inputs. - - Represents all files by their shared filesystem paths so that cache entries written by MiniWDL can be used by Toil. 
- """ - - return WDL.Value.digest_env(view_shared_fs_paths(bindings)) - - DirectoryNamingStateDict = Dict[str, Tuple[Dict[str, str], Set[str]]] def choose_human_readable_directory(root_dir: str, source_task_path: str, parent_id: str, state: DirectoryNamingStateDict) -> str: """ @@ -1123,6 +1132,109 @@ def _virtualize_filename(self, filename: str) -> str: self._virtualized_to_devirtualized[result] = abs_filename return result +class ToilWDLStdLibWorkflow(ToilWDLStdLibBase): + """ + Standard library implementation for workflow scope. + + Handles deduplicating files generated by write_* calls at workflow scope + with copies already in the call cache, so that tasks that depend on them + can also be fulfilled from the cache. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + # Set up MiniWDL caching for files + miniwdl_logger = logging.getLogger("MiniWDL") + # TODO: Ship config from leader? It might not see the right environment. + miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) + self._miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + + # This needs to be hash-compatible with MiniWDL. + # MiniWDL hooks _virtualize_filename + # , + # but we probably don't want to hash all files virtualized at workflow + # scope, just dynamic ones. + # + # TODO: Test cache compatibility with MiniWDL when a file is virtualized + # from a string at workflow scope! + def _write( + self, serialize: Callable[[WDL.Value.Base, IO[bytes]], None] + ) -> Callable[[WDL.Value.Base], WDL.Value.File]: + + # Get the normal writer + writer = super()._write(serialize) + + def wrapper(v: WDL.Value.Base) -> WDL.Value.File: + """ + Call the normal writer, and then deduplicate its result with the cache. + """ + # TODO: If we did this before the _virtualize_filename call in the + # base _write, we could let the cache bring the info between nodes + # and not need to use the job store. + + virtualized_file = writer(v) + + # TODO: If we did this before the _virtualize_filename call in the + # base _write we wouldn't need to immediately devirtualize. But we + # have internal caches to lean on. + devirtualized_filename = self._devirtualize_filename(virtualized_file.value) + # Hash the file to hex + hex_digest = file_digest(open(devirtualized_filename, "rb"), "sha256").hexdigest() + file_input_bindings = WDL.Env.Bindings(WDL.Env.Binding("file_sha256", WDL.Value.String(hex_digest))) + # Make an environment of "file_sha256" to that as a WDL string, and + # digest that, and make a write_ cache key. No need to transform to + # shared FS paths sonce no paths are in it. + log_bindings(logger.debug, "Digesting file bindings:", [file_input_bindings]) + input_digest = WDL.Value.digest_env(file_input_bindings) + file_cache_key = "write_/" + input_digest + # Construct a description of the types we expect to get from the + # cache: just a File-type variable named "file" + expected_types = WDL.Env.Bindings(WDL.Env.Binding("file", WDL.Type.File())) + # Query the cache + file_output_bindings = self._miniwdl_cache.get(file_cache_key, file_input_bindings, expected_types) + if file_output_bindings: + # File with this hash is cached. + # Adjust virtualized_file to carry that path as its local-filesystem path. + set_shared_fs_path(virtualized_file, file_output_bindings.resolve("file").value) + elif self._miniwdl_cache._cfg["call_cache"].get_bool("put"): + # Save our novel file to the cache. + + # Determine where we will save the file. 
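+                # (As with task call outputs, this lands under MiniWDL's
+                # call cache directory, keyed here by the file's content
+                # digest rather than by a task/input digest.)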
+ output_directory = os.path.join(self._miniwdl_cache._call_cache_dir, file_cache_key) + # This needs to exist before we can export to it + os.makedirs(output_directory, exist_ok=True) + + # Export the file to the cache. + # write_* files will never really need to being siblings, so we + # don't need any real persistent state here. + # TODO: Will they secretly be siblings on a first run? + exported_path = self.devirtualize_to( + virtualized_file.value, + output_directory, + self._file_store, + self._execution_dir, + {}, + {}, + {}, + enforce_existence=True, + export=True + ) + + # Save the cache entry pointing to it + self._miniwdl_cache.put( + file_cache_key, + WDL.Env.Bindings(WDL.Env.Binding("file", WDL.Value.File(exported_path))) + ) + + # Apply the shared filesystem path to the virtualized file + set_shared_fs_path(virtualized_file, exported_path) + + return virtualized_file + + return wrapper + + class ToilWDLStdLibTaskCommand(ToilWDLStdLibBase): """ Standard library implementation to use inside a WDL task command evaluation. @@ -1968,14 +2080,17 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # At this point we have what MiniWDL would call the "inputs" to the # task (i.e. what you would put in a JSON file, without any defaulted # or calculated inputs filled in). So start making cache keys. - input_digest = get_miniwdl_input_digest(bindings) + # But first we need to view the inputs as shared FS files. + transformed_bindings = view_shared_fs_paths(bindings) + log_bindings(logger.debug, "Digesting input bindings:", [transformed_bindings]) + input_digest = WDL.Value.digest_env(transformed_bindings) task_digest = self._task.digest cache_key=f"{self._task.name}/{task_digest}/{input_digest}" miniwdl_logger = logging.getLogger("MiniWDL") # TODO: Ship config from leader? It might not see the right environment. 
miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) - cached_result: Optional[WDLBindings] = miniwdl_cache.get(cache_key, bindings, self._task.effective_outputs) + cached_result: Optional[WDLBindings] = miniwdl_cache.get(cache_key, transformed_bindings, self._task.effective_outputs) if cached_result is not None: logger.info("Found task call in cache") return self.postprocess(cached_result) @@ -2740,7 +2855,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs incoming_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) + standard_library = ToilWDLStdLibWorkflow(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) with monkeypatch_coerce(standard_library): if isinstance(self._node, WDL.Tree.Decl): # This is a variable assignment @@ -2831,7 +2946,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs current_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) + standard_library = ToilWDLStdLibWorkflow(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) with monkeypatch_coerce(standard_library): for node in self._nodes: @@ -3508,7 +3623,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # For a task we only see the insode-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) + standard_library = ToilWDLStdLibWorkflow(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) if self._workflow.inputs: with monkeypatch_coerce(standard_library): From ca89abf62babd71e6e02ab936b90449c2abff62f Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 26 Sep 2024 23:12:42 -0400 Subject: [PATCH 07/16] Satisfy MyPy via crimes --- src/toil/wdl/wdltoil.py | 70 ++++++++++++++++++++++++++++++++++------- 1 file changed, 58 insertions(+), 12 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 255a361e9d..137acfa02a 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -29,6 +29,7 @@ import sys import textwrap import uuid +import hashlib from contextlib import ExitStack, contextmanager from graphlib import TopologicalSorter from tempfile import mkstemp, gettempdir @@ -41,6 +42,7 @@ Iterator, List, Optional, + Protocol, Sequence, Set, Tuple, @@ -48,6 +50,7 @@ TypeVar, Union, cast) +from typing_extensions import Buffer from mypy_extensions import Arg, DefaultArg from urllib.error import HTTPError from urllib.parse import quote, unquote, urljoin, urlsplit @@ -87,12 +90,58 @@ from toil.lib.threading import global_mutex from toil.provisioners.clusterScaler import JobTooBigError -import hashlib +logger = logging.getLogger(__name__) + +# We want to use hashlib.file_digest to avoid a 3-line hashing loop like +# MiniWDL has. But it is only in 3.11+ +# +# So we need to have a function that is either it or a fallback with the +# hashing loop. 
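+#
+# (In practice the only call made below is
+# file_digest(open(path, "rb"), "sha256"), so the protocols only need to
+# cover that shape of call.)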
+# +# So we need to be able to articulate the type of that function for MyPy, to +# avoid needing to write a function with the *exact* signature of the import +# (and not e.g. one that needs slightly different methods of its fileobjs or is +# missing some kwarg features). +# +# So we need to define some protocols. +# +# TODO: Move this into lib somewhere? +# TODO: Give up and license the 3 line loop MiniWDL has? +class ReadableFileObj(Protocol): + """ + Protocol that is more specific than what file_digest takes as an argument. + Also guarantees a read() method. + + Would extend the protocol from Typeshed for hashlib but those are only + declared for 3.11+. + """ + def readinto(self, buf: bytearray, /) -> int: ... + def readable(self) -> bool: ... + def read(self, number: int) -> bytes: ... + +class FileDigester(Protocol): + """ + Protocol for the features we need from hashlib.file_digest. + """ + # We need __ prefixes here or the name of the argument becomes part of the required interface. + def __call__(self, __f: ReadableFileObj, __alg_name: str) -> hashlib._Hash: ... + try: - from hashlib import file_digest + # Don't do a direct conditional import to the final name here because then + # the polyfill needs *exactly* the signature of file_digest, and not just + # one that can accept all calls we make in the file, or MyPy will complain. + # + # We need to tell MyPy we expect this import to fail, when typechecking on + # pythons that don't have it. But we also need to tell it that it is fine + # if it succeeds, for Pythons that do have it. + # + # TODO: Change to checking sys.version_info because MyPy understands that + # better? + from hashlib import file_digest as file_digest_impl # type: ignore[attr-defined,unused-ignore] + file_digest: FileDigester = file_digest_impl except ImportError: # Polyfill file_digest from 3.11+ - def file_digest(f: IO[bytes], alg_name: str) -> hashlib._Hash: + def file_digest_fallback_impl(f: ReadableFileObj, alg_name: str) -> hashlib._Hash: BUFFER_SIZE = 1024 * 1024 hasher = hashlib.new(alg_name) buffer = f.read(BUFFER_SIZE) @@ -100,11 +149,7 @@ def file_digest(f: IO[bytes], alg_name: str) -> hashlib._Hash: hasher.update(buffer) buffer = f.read(BUFFER_SIZE) return hasher - - - -logger = logging.getLogger(__name__) - + file_digest = file_digest_fallback_impl @contextmanager def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]: @@ -677,7 +722,7 @@ def get_shared_fs_path(file: Union[str, WDL.Value.File]) -> Optional[str]: return cast(str, getattr(file_value, SHARED_PATH_ATTR)) return None -def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> None: +def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> str: """ Mutate a File to associate it with the given shared filesystem path. @@ -689,8 +734,9 @@ def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> None: input was not a WDL File. 
""" if isinstance(file, WDL.Value.File): - file_value = file.value + file_value: str = file.value else: + assert isinstance(file, str) file_value = file if not isinstance(file_value, AttrStr): # Make it be a str subclass we can set attributes on @@ -1181,7 +1227,7 @@ def wrapper(v: WDL.Value.Base) -> WDL.Value.File: devirtualized_filename = self._devirtualize_filename(virtualized_file.value) # Hash the file to hex hex_digest = file_digest(open(devirtualized_filename, "rb"), "sha256").hexdigest() - file_input_bindings = WDL.Env.Bindings(WDL.Env.Binding("file_sha256", WDL.Value.String(hex_digest))) + file_input_bindings = WDL.Env.Bindings(WDL.Env.Binding("file_sha256", cast(WDL.Value.Base, WDL.Value.String(hex_digest)))) # Make an environment of "file_sha256" to that as a WDL string, and # digest that, and make a write_ cache key. No need to transform to # shared FS paths sonce no paths are in it. @@ -1190,7 +1236,7 @@ def wrapper(v: WDL.Value.Base) -> WDL.Value.File: file_cache_key = "write_/" + input_digest # Construct a description of the types we expect to get from the # cache: just a File-type variable named "file" - expected_types = WDL.Env.Bindings(WDL.Env.Binding("file", WDL.Type.File())) + expected_types = WDL.Env.Bindings(WDL.Env.Binding("file", cast(WDL.Type.Base, WDL.Type.File()))) # Query the cache file_output_bindings = self._miniwdl_cache.get(file_cache_key, file_input_bindings, expected_types) if file_output_bindings: From 5ddc9ab4b49a6d34376dce0866049898a2c64135 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 26 Sep 2024 23:19:17 -0400 Subject: [PATCH 08/16] Stop redefining output_bindings --- src/toil/wdl/wdltoil.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index a449caa751..f4c94216a4 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -3757,7 +3757,6 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: for subnode in node.body: stack.append(subnode) # Collect all bindings that are task outputs - output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for binding in unwrap(self._bindings): if binding.name in output_set: # The bindings will already be namespaced with the task namespaces From 0512abb988bca872c477fe8857d75553b43084e9 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 27 Sep 2024 12:14:12 -0700 Subject: [PATCH 09/16] Stop looking for : in URI scheme part --- src/toil/wdl/wdltoil.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index f4c94216a4..9400007efb 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1772,7 +1772,8 @@ def import_file_from_uri(uri: str) -> str: # We need to make sure file URIs and local paths that point to # the same place are treated the same. parsed = urlsplit(candidate_uri) - if parsed.scheme == "file:": + # Note that parsing the URL does *not* include ':' in the scheme + if parsed.scheme == "file": # This is a local file URI. Convert to a path for source directory tracking. 
local_path = unquote(parsed.path) parent_dir = os.path.dirname(local_path) From 37a8771bb27c71c7d5415ba0c1d532019bf39f8b Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 27 Sep 2024 12:33:22 -0700 Subject: [PATCH 10/16] Detect file URIs sneaking into local shared paths and treat file:// URIs as URIs for import --- src/toil/wdl/wdltoil.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 9400007efb..28419446b3 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -719,7 +719,9 @@ def get_shared_fs_path(file: Union[str, WDL.Value.File]) -> Optional[str]: else: file_value = file if hasattr(file_value, SHARED_PATH_ATTR): - return cast(str, getattr(file_value, SHARED_PATH_ATTR)) + result = cast(str, getattr(file_value, SHARED_PATH_ATTR)) + assert not result.startswith("file://"), f"Found URI shared FS path of {result} on {file}" + return result return None def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> str: @@ -733,6 +735,8 @@ def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> str: Returns a str that has to be assigned back to the WDL File's value, if the input was not a WDL File. """ + # We should not have URLs here, only real paths. + assert not path.startswith("file://"), f"Cannot assign URI shared FS path of {path} to {file}" if isinstance(file, WDL.Value.File): file_value: str = file.value else: @@ -755,7 +759,10 @@ def file_path_to_use(stored_file_string: str) -> str: """ Return the shared FS path if we have one, or the current string. """ - return get_shared_fs_path(stored_file_string) or stored_file_string + shared_path = get_shared_fs_path(stored_file_string) + result = shared_path or stored_file_string + assert not result.startswith("file://"), f"Trying to digest file URI {result} for file {stored_file_string} with shared path {shared_path}" + return result return map_over_files_in_bindings(bindings, file_path_to_use) @@ -1767,7 +1774,11 @@ def import_file_from_uri(uri: str) -> str: local_path: Optional[str] = None # Was actually found - if is_url(candidate_uri): + if is_url(candidate_uri) or candidate_uri.startswith("file://"): + # Is a non-file URI or file URI. + # TODO: is_url is due to be renamed. + # TODO: check for the // also for global URL/URI detection because http:stuff.dat is a valid local filename. Add it to the toilfile: URIs. + # Might be a file URI or other URI. # We need to make sure file URIs and local paths that point to # the same place are treated the same. 
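(Aside: the scheme handling these two patches depend on can be sanity-checked in isolation. A minimal sketch using only the standard library; the "/data/reads.fastq" value is purely illustrative:

    from urllib.parse import urlsplit, unquote

    # urlsplit() reports the scheme without the trailing ":", which is what
    # the "file:" -> "file" comparison fix above is about.
    assert urlsplit("file:///data/reads.fastq").scheme == "file"
    assert unquote(urlsplit("file:///data/reads.fastq").path) == "/data/reads.fastq"

    # "http:stuff.dat" is also a valid *local* filename, which is why the
    # TODO above wants a "//" check before treating a string as a URL.
    assert urlsplit("http:stuff.dat").scheme == "http"
)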
From aa2846ebf2fad44af9c91fde05569080bce4b2af Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 27 Sep 2024 15:17:19 -0700 Subject: [PATCH 11/16] Unrelatedly quiet inner loop logging and get toil status dot output to me more useful --- src/toil/batchSystems/abstractGridEngineBatchSystem.py | 3 ++- src/toil/job.py | 4 ++-- src/toil/utils/toilStatus.py | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 24bd4bb4fb..da6a9a790a 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -25,6 +25,7 @@ from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport from toil.bus import ExternalBatchIdMessage, get_job_kind from toil.job import AcceleratorRequirement +from toil.statsAndLogging import TRACE from toil.lib.misc import CalledProcessErrorStderr from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry @@ -267,7 +268,7 @@ def _runStep(self): if self.checkOnJobs(): activity = True if not activity: - logger.debug('No activity, sleeping for %is', self.boss.sleepSeconds()) + logger.log(TRACE, 'No activity, sleeping for %is', self.boss.sleepSeconds()) return True def run(self): diff --git a/src/toil/job.py b/src/toil/job.py index 4517911864..b6132bde6f 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -967,9 +967,9 @@ def allSuccessors(self) -> Iterator[str]: def successors_by_phase(self) -> Iterator[Tuple[int, str]]: """ - Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase numbere on the stack. + Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase number on the stack. - Phases ececute higher numbers to lower numbers. + Phases execute higher numbers to lower numbers. """ for i, phase in enumerate(self.successor_phases): diff --git a/src/toil/utils/toilStatus.py b/src/toil/utils/toilStatus.py index 6e8271c3c1..54d2611a62 100644 --- a/src/toil/utils/toilStatus.py +++ b/src/toil/utils/toilStatus.py @@ -49,14 +49,14 @@ def print_dot_chart(self) -> None: # Make job IDs to node names map jobsToNodeNames: Dict[str, str] = dict( - map(lambda job: (str(job.jobStoreID), job.jobName), self.jobsToReport) + map(lambda job: (str(job.jobStoreID), str(job.jobStoreID).replace("_", "___").replace("/", "_").replace("-", "__")), self.jobsToReport) ) # Print the nodes for job in set(self.jobsToReport): print( - '{} [label="{} {}"];'.format( - jobsToNodeNames[str(job.jobStoreID)], job.jobName, job.jobStoreID + '{} [label="{} {}" color="{}"];'.format( + jobsToNodeNames[str(job.jobStoreID)], job.jobName, job.displayName, "black" if job.has_body() else "green" ) ) From d6902b0d430a8635d94265e5d8beaaf1c6c4626e Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 27 Sep 2024 16:14:19 -0700 Subject: [PATCH 12/16] Let task and workflow calls cache the same way --- src/toil/wdl/wdltoil.py | 243 ++++++++++++++++++++++++---------------- 1 file changed, 144 insertions(+), 99 deletions(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 28419446b3..d2a6dbdc43 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -687,9 +687,9 @@ def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str, str, str]: return file_id, task_path, parent_id, file_basename -## +### # Caching machinery -## +### # TODO: Move to new file? 
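(The pair of helpers added in the next hunk give tasks and workflows one shared cache protocol. Schematically, callers use them as below, exactly as the later hunks of this patch do in WDLTaskWrapperJob and WDLWorkflowJob:

    cached_result, cache_key = poll_execution_cache(self._task, bindings)
    if cached_result is not None:
        return self.postprocess(cached_result)
    # ... otherwise actually run, then:
    output_bindings = fill_execution_cache(
        cache_key, output_bindings, file_store,
        self._wdl_options.get("execution_dir"))
)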
@@ -766,6 +766,115 @@ def file_path_to_use(stored_file_string: str) -> str: return map_over_files_in_bindings(bindings, file_path_to_use) +def poll_execution_cache(node: Union[WDL.Tree.Workflow, WDL.Tree.Task], bindings: WDLBindings) -> Tuple[Optional[WDLBindings], str]: + """ + Return the cached result of calling this workflow or task, and its key. + + Returns None and the key if the cache has no result for us. + + Deals in un-namespaced bindings. + """ + # View the inputs as shared FS files. + transformed_bindings = view_shared_fs_paths(bindings) + log_bindings(logger.debug, "Digesting input bindings:", [transformed_bindings]) + input_digest = WDL.Value.digest_env(transformed_bindings) + cache_key=f"{node.name}/{node.digest}/{input_digest}" + miniwdl_logger = logging.getLogger("MiniWDL") + # TODO: Ship config from leader? It might not see the right environment. + miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) + miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + cached_result: Optional[WDLBindings] = miniwdl_cache.get(cache_key, transformed_bindings, node.effective_outputs) + if cached_result is not None: + logger.info("Found call in cache") + return cached_result, cache_key + else: + logger.debug("No cache hit for %s", cache_key) + return None, cache_key + +def fill_execution_cache(cache_key: str, output_bindings: WDLBindings, file_store: AbstractFileStore, execution_dir: Optional[str], miniwdl_logger: Optional[logging.Logger] = None, miniwdl_config: Optional[WDL.runtime.config.Loader] = None) -> WDLBindings: + """ + Cache the result of calling a workflow or task. + + Deals in un-namespaced bindings. + + :param execution_dir: Working directory where the user launched the + workflow. + + :returns: possibly modified bindings to continue on with, that may + reference the cache. + """ + + if miniwdl_logger is None: + miniwdl_logger = logging.getLogger("MiniWDL") + if miniwdl_config is None: + miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) + + miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + if not miniwdl_cache._cfg["call_cache"].get_bool("put"): + # Do nothing since the cache does not want new entries. + # TODO: Get MiniWDL to expose this. + return output_bindings + + # Set up deduplication just for these outputs. + devirtualization_state: DirectoryNamingStateDict = {} + devirtualized_to_virtualized: Dict[str, str] = dict() + virtualized_to_devirtualized: Dict[str, str] = dict() + # TODO: if a URL is passed through multiple tasks it will be saved multiple times. Also save on input??? + + # Determine where we will save our cached versions of files. + output_directory = os.path.join(miniwdl_cache._call_cache_dir, cache_key) + + # Adjust all files in the output bindings to have shared FS paths outside the job store. + def assign_shared_fs_path(file_value: str) -> str: + """ + Given the string value inside a File, mutate the File to have a shared FS path outside the jobstore. + + Returns the value to put in the WDL file to actually do the mutation. + """ + + # TODO: Change to using File objects when required PR with + # the mapping functions for that is finally merged. + + # We need all the incoming paths to not already be local + # paths, or devirtualizing them to export them will not + # work. + # + # This ought to be the case because we just virtualized + # them all for transport out of the machine. 
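+        # (That is, each value should be a toilfile: or other URL by now.)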
+ if not is_url(file_value): + raise RuntimeError("File {file_value} caught escaping from task unvirtualized") + + if get_shared_fs_path(file_value) is None: + # We need to save this file somewhere. + # This needs to exist before we can export to it. And now we know + # we will export something, so make sure it exists. + os.makedirs(output_directory, exist_ok=True) + exported_path = ToilWDLStdLibBase.devirtualize_to( + file_value, + output_directory, + file_store, + execution_dir, + devirtualization_state, + devirtualized_to_virtualized, + virtualized_to_devirtualized, + enforce_existence=False, + export=True + ) + + # Remember where it went + file_value = set_shared_fs_path(file_value, exported_path) + + return file_value + output_bindings = map_over_files_in_bindings(output_bindings, assign_shared_fs_path) + + # Save the bindings to the cache, representing all files with their shared filesystem paths. + miniwdl_cache.put(cache_key, view_shared_fs_paths(output_bindings)) + logger.debug("Saved result to cache under %s", cache_key) + + # Keep using the transformed bindings so that later tasks use + # the cached files in their input digests. + return output_bindings + DirectoryNamingStateDict = Dict[str, Tuple[Dict[str, str], Set[str]]] def choose_human_readable_directory(root_dir: str, source_task_path: str, parent_id: str, state: DirectoryNamingStateDict) -> str: @@ -2134,27 +2243,13 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs. # For a task we are only passed the inside-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) - + # At this point we have what MiniWDL would call the "inputs" to the - # task (i.e. what you would put in a JSON file, without any defaulted - # or calculated inputs filled in). So start making cache keys. - # But first we need to view the inputs as shared FS files. - transformed_bindings = view_shared_fs_paths(bindings) - log_bindings(logger.debug, "Digesting input bindings:", [transformed_bindings]) - input_digest = WDL.Value.digest_env(transformed_bindings) - task_digest = self._task.digest - cache_key=f"{self._task.name}/{task_digest}/{input_digest}" - miniwdl_logger = logging.getLogger("MiniWDL") - # TODO: Ship config from leader? It might not see the right environment. - miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) - miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) - cached_result: Optional[WDLBindings] = miniwdl_cache.get(cache_key, transformed_bindings, self._task.effective_outputs) + # call (i.e. what you would put in a JSON file, without any defaulted + # or calculated inputs filled in). + cached_result, cache_key = poll_execution_cache(self._task, bindings) if cached_result is not None: - logger.info("Found task call in cache") return self.postprocess(cached_result) - else: - logger.debug("No cache hit for %s", cache_key) - # Otherwise we need to run the actual command. # Set up the WDL standard library # UUID to use for virtualizing files @@ -2261,7 +2356,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: runtime_accelerators = [accelerator_requirement] # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized. 
-        run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options, cache_key=cache_key)
+        run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cache_key=cache_key, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options)

         # Run that as a child
         self.addChild(run_job)

@@ -2814,69 +2909,8 @@ def get_path_in_container(path: str) -> Optional[str]:
         output_bindings = virtualize_files(output_bindings, outputs_library)

         if self._cache_key is not None:
-            # We might need to save to the call cache
-            miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger)
-            if miniwdl_cache._cfg["call_cache"].get_bool("put"):
-                # Saving to the cache is on. If we ran something we write it to the cache.
-                # TODO: Get MiniWDL to expose this.
-
-                # Set up deduplication just for these outputs.
-                devirtualization_state: DirectoryNamingStateDict = {}
-                devirtualized_to_virtualized: Dict[str, str] = dict()
-                virtualized_to_devirtualized: Dict[str, str] = dict()
-                # TODO: if a URL is passed through multiple tasks it will be saved multiple times. Also save on input???
-
-                # Determine where we will save our cached versions of files.
-                output_directory = os.path.join(miniwdl_cache._call_cache_dir, self._cache_key)
-                # This needs to exist before we can export to it
-                os.makedirs(output_directory, exist_ok=True)
-
-                # Adjust all files in the output bindings to have shared FS paths outside the job store.
-                def assign_shared_fs_path(file_value: str) -> str:
-                    """
-                    Given the string value inside a File, mutate the File to have a shared FS path outside the jobstore.
-
-                    Returns the value to put in the WDL file to actually do the mutation.
-                    """
-
-                    # TODO: Change to using File objects when required PR with
-                    # the mapping functions for that is finally merged.
-
-                    # We need all the incoming paths to not already be local
-                    # paths, or devirtualizing them to export them will not
-                    # work.
-                    #
-                    # This ought to be the case because we just virtualized
-                    # them all for transport out of the machine.
-                    if not is_url(file_value):
-                        raise RuntimeError("File {file_value} caught escaping from task unvirtualized")
-
-                    if get_shared_fs_path(file_value) is None:
-                        # We need to save this file somewhere.
-                        exported_path = ToilWDLStdLibBase.devirtualize_to(
-                            file_value,
-                            output_directory,
-                            file_store,
-                            self._wdl_options.get("execution_dir"),
-                            devirtualization_state,
-                            devirtualized_to_virtualized,
-                            virtualized_to_devirtualized,
-                            enforce_existence=False,
-                            export=True
-                        )
-
-                        # Remember where it went
-                        file_value = set_shared_fs_path(file_value, exported_path)
-
-                    return file_value
-                output_bindings = map_over_files_in_bindings(output_bindings, assign_shared_fs_path)
-
-                # Save the bindings to the cache, representing all files with their shared filesystem paths.
-                miniwdl_cache.put(self._cache_key, view_shared_fs_paths(output_bindings))
-                logger.debug("Saved result to cache under %s", self._cache_key)
-
-                # Keep using the transformed bindings so that later tasks use
-                # the cached files in their input digests.
+            # We might need to save to the execution cache
+            output_bindings = fill_execution_cache(self._cache_key, output_bindings, file_store, self._wdl_options.get("execution_dir"), miniwdl_logger=miniwdl_logger, miniwdl_config=miniwdl_config)

         # Do postprocessing steps to e.g. apply namespaces.
         output_bindings = self.postprocess(output_bindings)

@@ -3678,8 +3712,15 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
         logger.info("Running workflow %s (%s) called as %s", self._workflow.name, self._workflow_id, self._namespace)

         # Combine the bindings we get from previous jobs.
-        # For a task we only see the inside-the-task namespace.
         bindings = combine_bindings(unwrap_all(self._prev_node_results))
+
+        # At this point we have what MiniWDL would call the "inputs" to the
+        # call (i.e. what you would put in a JSON file, without any defaulted
+        # or calculated inputs filled in).
+        cached_result, cache_key = poll_execution_cache(self._workflow, bindings)
+        if cached_result is not None:
+            return self.postprocess(cached_result)
+
         # Set up the WDL standard library
         standard_library = ToilWDLStdLibWorkflow(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir"))

@@ -3695,34 +3736,35 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:

         # Make jobs to run all the parts of the workflow
         sink = self.create_subgraph(self._workflow.body, [], bindings)
-
-        if self._workflow.outputs != []: # Compare against empty list as None means there should be outputs
-            # Either the output section is declared and nonempty or it is not declared
-            # Add evaluating the outputs after the sink
-            outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._task_path, wdl_options=self._wdl_options)
-            sink.addFollowOn(outputs_job)
-            # Caller is responsible for making sure namespaces are applied
-            self.defer_postprocessing(outputs_job)
-            return outputs_job.rv()
-        else:
-            # No outputs from this workflow.
-            return self.postprocess(WDL.Env.Bindings())
+
+        # To support the all call outputs feature, run an outputs job even if
+        # we have a declared but empty outputs section.
+        outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._task_path, cache_key=cache_key, wdl_options=self._wdl_options)
+        sink.addFollowOn(outputs_job)
+        # Caller is responsible for making sure namespaces are applied
+        self.defer_postprocessing(outputs_job)
+        return outputs_job.rv()

 class WDLOutputsJob(WDLBaseJob):
     """
-    Job which evaluates an outputs section (such as for a workflow).
+    Job which evaluates an outputs section for a workflow.

     Returns an environment with just the outputs bound, in no namespace.
     """

-    def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any):
+    def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], task_path: str, cache_key: Optional[str] = None, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any):
         """
         Make a new WDLWorkflowOutputsJob for the given workflow, with the
         given set of bindings after its body runs.
+
+        :param cache_key: If set and storing into the call cache is on, will
+               cache the workflow execution result under the given key in a
+               MiniWDL-compatible way.
""" super().__init__(wdl_options=wdl_options, **kwargs) self._bindings = bindings self._workflow = workflow self._task_path = task_path + self._cache_key = cache_key @report_wdl_errors("evaluate outputs") def run(self, file_store: AbstractFileStore) -> WDLBindings: @@ -3786,6 +3828,9 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # Null nonexistent optional values and error on the rest output_bindings = drop_missing_files(output_bindings, self._wdl_options.get("execution_dir")) + + if self._cache_key is not None: + output_bindings = fill_execution_cache(self._cache_key, output_bindings, file_store, self._wdl_options.get("execution_dir")) return self.postprocess(output_bindings) From 421f9af0f7f8b45dcb3aecb465f91dbafbfbc79f Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 27 Sep 2024 20:13:09 -0400 Subject: [PATCH 13/16] Make sure files are imported Also fix linkImports from workers, and remove the weird contect manager hardlink setup that didn't appear to work anyway. --- src/toil/common.py | 10 +++- src/toil/jobStores/fileJobStore.py | 31 ++++++------- src/toil/wdl/wdltoil.py | 73 ++++++++++++++++++------------ 3 files changed, 66 insertions(+), 48 deletions(-) diff --git a/src/toil/common.py b/src/toil/common.py index 62a052be9f..72d144e141 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -1158,7 +1158,15 @@ def import_file(self, src_uri: str, shared_file_name: None = None, symlink: bool = True, - check_existence: bool = True) -> FileID: + check_existence: Literal[True] = True) -> FileID: + ... + + @overload + def import_file(self, + src_uri: str, + shared_file_name: None = None, + symlink: bool = True, + check_existence: bool = True) -> Optional[FileID]: ... def import_file(self, diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index 4d8e94fafe..636db8e782 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -131,6 +131,10 @@ def resume(self): if not os.path.isdir(self.jobStoreDir): raise NoSuchJobStoreException(self.jobStoreDir, "file") super().resume() + # TODO: Unify with initialize() configuration + self.linkImports = self.config.symlinkImports + self.moveExports = self.config.moveOutputs + self.symlink_job_store_reads = self.config.symlink_job_store_reads def destroy(self): if os.path.exists(self.jobStoreDir): @@ -298,26 +302,19 @@ def jobs(self): # Functions that deal with temporary files associated with jobs ########################################## - @contextmanager - def optional_hard_copy(self, hardlink): - if hardlink: - saved = self.linkImports - self.linkImports = False - yield - if hardlink: - self.linkImports = saved - - def _copy_or_link(self, src_path, dst_path, symlink=False): + def _copy_or_link(self, src_path, dst_path, hardlink=False, symlink=False): # linking is not done be default because of issue #1755 - srcPath = self._extract_path_from_url(src_path) - if self.linkImports and symlink: - os.symlink(os.path.realpath(srcPath), dst_path) + # TODO: is hardlinking ever actually done? + src_path = self._extract_path_from_url(src_path) + if self.linkImports and not hardlink and symlink: + os.symlink(os.path.realpath(src_path), dst_path) else: - atomic_copy(srcPath, dst_path) + atomic_copy(src_path, dst_path) def _import_file(self, otherCls, uri, shared_file_name=None, hardlink=False, symlink=True): # symlink argument says whether the caller can take symlinks or not. # ex: if false, it means the workflow cannot work with symlinks and we need to hardlink or copy. 
+ # TODO: Do we ever actually hardlink? # default is true since symlinking everything is ideal uri_path = unquote(uri.path) if issubclass(otherCls, FileJobStore): @@ -327,16 +324,14 @@ def _import_file(self, otherCls, uri, shared_file_name=None, hardlink=False, sym if shared_file_name is None: executable = os.stat(uri_path).st_mode & stat.S_IXUSR != 0 absPath = self._get_unique_file_path(uri_path) # use this to get a valid path to write to in job store - with self.optional_hard_copy(hardlink): - self._copy_or_link(uri, absPath, symlink=symlink) + self._copy_or_link(uri, absPath, hardlink=hardlink, symlink=symlink) # TODO: os.stat(absPath).st_size consistently gives values lower than # getDirSizeRecursively() return FileID(self._get_file_id_from_path(absPath), os.stat(absPath).st_size, executable) else: self._requireValidSharedFileName(shared_file_name) path = self._get_shared_file_path(shared_file_name) - with self.optional_hard_copy(hardlink): - self._copy_or_link(uri, path, symlink=symlink) + self._copy_or_link(uri, path, hardlink=hardlink, symlink=symlink) return None else: return super()._import_file(otherCls, uri, shared_file_name=shared_file_name) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index d2a6dbdc43..4350abe3f0 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -111,7 +111,7 @@ class ReadableFileObj(Protocol): """ Protocol that is more specific than what file_digest takes as an argument. Also guarantees a read() method. - + Would extend the protocol from Typeshed for hashlib but those are only declared for 3.11+. """ @@ -801,7 +801,7 @@ def fill_execution_cache(cache_key: str, output_bindings: WDLBindings, file_stor workflow. :returns: possibly modified bindings to continue on with, that may - reference the cache. + reference the cache. """ if miniwdl_logger is None: @@ -823,7 +823,7 @@ def fill_execution_cache(cache_key: str, output_bindings: WDLBindings, file_stor # Determine where we will save our cached versions of files. output_directory = os.path.join(miniwdl_cache._call_cache_dir, cache_key) - + # Adjust all files in the output bindings to have shared FS paths outside the job store. def assign_shared_fs_path(file_value: str) -> str: """ @@ -831,20 +831,21 @@ def assign_shared_fs_path(file_value: str) -> str: Returns the value to put in the WDL file to actually do the mutation. """ - + # TODO: Change to using File objects when required PR with # the mapping functions for that is finally merged. - # We need all the incoming paths to not already be local - # paths, or devirtualizing them to export them will not - # work. - # - # This ought to be the case because we just virtualized - # them all for transport out of the machine. - if not is_url(file_value): - raise RuntimeError("File {file_value} caught escaping from task unvirtualized") - if get_shared_fs_path(file_value) is None: + # We need all the incoming paths that aren't cache paths to not already + # be local paths, or devirtualizing them to export them will not work. + # + # This ought to be the case because we just virtualized + # them all for transport out of the machine. + if not is_url(file_value): + # TODO: If we're passing things around by URL reference and + # some of them are file: is this actually allowed? + raise RuntimeError(f"File {file_value} caught escaping from task unvirtualized") + # We need to save this file somewhere. # This needs to exist before we can export to it. And now we know # we will export something, so make sure it exists. 
@@ -1311,7 +1312,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
         # TODO: Ship config from leader? It might not see the right environment.
         miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger)
         self._miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger)
-
+
         # This needs to be hash-compatible with MiniWDL.
         # MiniWDL hooks _virtualize_filename
         # ,
@@ -1382,13 +1383,13 @@ def wrapper(v: WDL.Value.Base) -> WDL.Value.File:
                     enforce_existence=True,
                     export=True
                 )
-
+
                 # Save the cache entry pointing to it
                 self._miniwdl_cache.put(
                     file_cache_key,
                     WDL.Env.Bindings(WDL.Env.Binding("file", WDL.Value.File(exported_path)))
                 )
-
+
                 # Apply the shared filesystem path to the virtualized file
                 set_shared_fs_path(virtualized_file, exported_path)

@@ -1807,10 +1808,11 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None:
             task_container.input_path_map[host_path] = container_path
             task_container.input_path_map_rev[container_path] = host_path

-def import_files(environment: WDLBindings, task_path: str, toil: Toil, path: Optional[List[str]] = None, skip_remote: bool = False) -> WDLBindings:
+def import_files(environment: WDLBindings, task_path: str, file_dest: Union[AbstractFileStore, Toil], path: Optional[List[str]] = None, skip_remote: bool = False) -> WDLBindings:
     """
-    Make sure all File values embedded in the given bindings are imported,
-    using the given Toil object.
+    Make sure all File values embedded in the given bindings are imported.
+
+    Uses the given Toil object or FileStore.

     :param task_path: Dotted WDL name of the user-level code doing the
            importing (probably the workflow name).
@@ -1845,7 +1847,12 @@ def import_file_from_uri(uri: str) -> str:
                 # Actually import
                 # Try to import the file. Don't raise if we can't find it, just
                 # return None!
-                imported = toil.import_file(candidate_uri, check_existence=False)
+                if isinstance(file_dest, Toil):
+                    imported = file_dest.import_file(candidate_uri, check_existence=False)
+                else:
+                    # The file store import_file doesn't do an existence check.
+                    # TODO: Have a more compatible interface.
+                    imported = file_dest.import_file(candidate_uri)
                 if imported is None:
                     # Wasn't found there
                     continue
@@ -2240,20 +2247,28 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
         super().run(file_store)
         logger.info("Evaluating inputs and runtime for task %s (%s) called as %s", self._task.name, self._task_id, self._namespace)

+        # Set up the WDL standard library
+        standard_library = ToilWDLStdLibBase(file_store, self._task_path)
+
         # Combine the bindings we get from previous jobs.
         # For a task we are only passed the inside-the-task namespace.
         bindings = combine_bindings(unwrap_all(self._prev_node_results))
-
+
         # At this point we have what MiniWDL would call the "inputs" to the
         # call (i.e. what you would put in a JSON file, without any defaulted
         # or calculated inputs filled in).
         cached_result, cache_key = poll_execution_cache(self._task, bindings)
         if cached_result is not None:
-            return self.postprocess(cached_result)
+            # Virtualize any files we loaded from the cache, to maintain the
+            # invariant that they are in the job store, and to avoid
+            # re-virtualizing them later if they pass through other tasks. This
+            # should mostly be symlinking because we are probably using the
+            # FileJobStore.
+            #
+            # TODO: Allow just propagating things through by normal path
+            # reference into the cache?
+ return self.postprocess(import_files(cached_result, self._task.name, file_store)) - # Set up the WDL standard library - # UUID to use for virtualizing files - standard_library = ToilWDLStdLibBase(file_store, self._task_path) with monkeypatch_coerce(standard_library): if self._task.inputs: logger.debug("Evaluating task code") @@ -2909,7 +2924,7 @@ def get_path_in_container(path: str) -> Optional[str]: output_bindings = virtualize_files(output_bindings, outputs_library) if self._cache_key is not None: - # We might need to save to the execution cache + # We might need to save to the execution cache output_bindings = fill_execution_cache(self._cache_key, output_bindings, file_store, self._wdl_options.get("execution_dir"), miniwdl_logger=miniwdl_logger, miniwdl_config=miniwdl_config) # Do postprocessing steps to e.g. apply namespaces. @@ -3719,7 +3734,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # or calculated inputs filled in). cached_result, cache_key = poll_execution_cache(self._workflow, bindings) if cached_result is not None: - return self.postprocess(cached_result) + return self.postprocess(import_files(cached_result, self._workflow.name, file_store)) # Set up the WDL standard library standard_library = ToilWDLStdLibWorkflow(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) @@ -3736,7 +3751,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) - + # To support the all call outputs feature, run an outputs job even if # we have a declared but empty outputs section. outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._task_path, cache_key=cache_key, wdl_options=self._wdl_options) @@ -3828,7 +3843,7 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # Null nonexistent optional values and error on the rest output_bindings = drop_missing_files(output_bindings, self._wdl_options.get("execution_dir")) - + if self._cache_key is not None: output_bindings = fill_execution_cache(self._cache_key, output_bindings, file_store, self._wdl_options.get("execution_dir")) From a6d9c6432b02197a2d3b90b5523ea0e2945104c5 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Fri, 27 Sep 2024 20:15:02 -0400 Subject: [PATCH 14/16] Stop logging WDL imports twice --- src/toil/wdl/wdltoil.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 4350abe3f0..1d65343d5d 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -1856,7 +1856,6 @@ def import_file_from_uri(uri: str) -> str: if imported is None: # Wasn't found there continue - logger.info('Imported %s', candidate_uri) except UnimplementedURLException as e: # We can't find anything that can even support this URL scheme. # Report to the user, they are probably missing an extra. 
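
The @overload pair that PATCH 13 adds to Toil.import_file above is the standard typing
trick for an API whose return optionality depends on a flag: callers that leave
check_existence at its default (or pass a literal True) keep the non-optional FileID
return type, while callers that pass a runtime bool get Optional[FileID]. A minimal
self-contained sketch of that pattern (the FileID stand-in and the body are
illustrative only, not Toil's real implementation):

    import os
    from typing import Literal, Optional, overload

    class FileID(str):
        """Stand-in for toil.fileStores.FileID, for illustration only."""

    @overload
    def import_file(src_uri: str, check_existence: Literal[True] = True) -> FileID: ...

    @overload
    def import_file(src_uri: str, check_existence: bool = True) -> Optional[FileID]: ...

    def import_file(src_uri: str, check_existence: bool = True) -> Optional[FileID]:
        if not os.path.exists(src_uri):
            # With check_existence=True a missing file is an error; with False it
            # is reported as None, which is why the fallback overload is Optional.
            if check_existence:
                raise FileNotFoundError(src_uri)
            return None
        return FileID(src_uri)

Under this pair, a type checker sees import_file(uri) as FileID, but
import_file(uri, check_existence=flag) as Optional[FileID], which is exactly the
distinction the WDL code needs when it deliberately imports with
check_existence=False and handles the None itself.
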
From 772a2026227b72ef5c1913715dfee89733c9d1e8 Mon Sep 17 00:00:00 2001
From: Adam Novak
Date: Fri, 27 Sep 2024 20:30:28 -0400
Subject: [PATCH 15/16] Only make a cache in the workflow std lib when we really need it

---
 src/toil/wdl/wdltoil.py | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py
index 1d65343d5d..c6d2d3e5af 100755
--- a/src/toil/wdl/wdltoil.py
+++ b/src/toil/wdl/wdltoil.py
@@ -1306,13 +1306,9 @@ class ToilWDLStdLibWorkflow(ToilWDLStdLibBase):

     def __init__(self, *args: Any, **kwargs: Any) -> None:
         super().__init__(*args, **kwargs)
-
-        # Set up MiniWDL caching for files
-        miniwdl_logger = logging.getLogger("MiniWDL")
-        # TODO: Ship config from leader? It might not see the right environment.
-        miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger)
-        self._miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger)
-
+
+        self._miniwdl_cache: Optional[WDL.runtime.cache.CallCache] = None
+
         # This needs to be hash-compatible with MiniWDL.
         # MiniWDL hooks _virtualize_filename
         # ,
@@ -1324,6 +1320,14 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
     def _write(
         self, serialize: Callable[[WDL.Value.Base, IO[bytes]], None]
     ) -> Callable[[WDL.Value.Base], WDL.Value.File]:
+
+        if self._miniwdl_cache is None:
+            # We do indeed need a MiniWDL cache.
+            # Setting it up logs, so do it lazily.
+            miniwdl_logger = logging.getLogger("MiniWDL")
+            # TODO: Ship config from leader? It might not see the right environment.
+            miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger)
+            self._miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger)

         # Get the normal writer
         writer = super()._write(serialize)
@@ -1355,6 +1359,7 @@ def wrapper(v: WDL.Value.Base) -> WDL.Value.File:
             # cache: just a File-type variable named "file"
             expected_types = WDL.Env.Bindings(WDL.Env.Binding("file", cast(WDL.Type.Base, WDL.Type.File())))
             # Query the cache
+            assert self._miniwdl_cache is not None
             file_output_bindings = self._miniwdl_cache.get(file_cache_key, file_input_bindings, expected_types)
             if file_output_bindings:
                 # File with this hash is cached.

From e76f346204a3aac81485422dcac62a8d909cbc42 Mon Sep 17 00:00:00 2001
From: Adam Novak
Date: Fri, 27 Sep 2024 21:02:20 -0400
Subject: [PATCH 16/16] Don't make MiniWDL cache in the middle of parent class constructor

---
 src/toil/wdl/wdltoil.py | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py
index c6d2d3e5af..17ca44f942 100755
--- a/src/toil/wdl/wdltoil.py
+++ b/src/toil/wdl/wdltoil.py
@@ -1321,13 +1321,8 @@ def _write(
         self, serialize: Callable[[WDL.Value.Base, IO[bytes]], None]
     ) -> Callable[[WDL.Value.Base], WDL.Value.File]:

-        if self._miniwdl_cache is None:
-            # We do indeed need a MiniWDL cache.
-            # Setting it up logs, so do it lazily.
-            miniwdl_logger = logging.getLogger("MiniWDL")
-            # TODO: Ship config from leader? It might not see the right environment.
-            miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger)
-            self._miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger)
+        # Note that the parent class constructor calls this method, but doesn't
+        # invoke the resulting function.

         # Get the normal writer
         writer = super()._write(serialize)
@@ -1342,6 +1337,14 @@ def wrapper(v: WDL.Value.Base) -> WDL.Value.File:

                 virtualized_file = writer(v)

+                if self._miniwdl_cache is None:
+                    # We do indeed need a MiniWDL cache.
+                    # Setting it up logs, so do it lazily.
+ miniwdl_logger = logging.getLogger("MiniWDL") + # TODO: Ship config from leader? It might not see the right environment. + miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) + self._miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + # TODO: If we did this before the _virtualize_filename call in the # base _write we wouldn't need to immediately devirtualize. But we # have internal caches to lean on.
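
Taken together, the last two patches converge on a deferred-initialization shape
that is easy to get wrong: the subclass constructor only declares the attribute,
_write (which the parent class constructor calls while building the writer) must
not touch it, and the log-emitting WDL.runtime.cache.new() call happens only the
first time the returned wrapper actually runs. A minimal sketch of that shape,
with a plain dict standing in for MiniWDL's CallCache (illustrative only, not the
Toil code):

    import logging
    from typing import Callable, Dict, Optional

    class LazyCachingWriter:
        def __init__(self) -> None:
            # Only declare the slot. In the real code a parent constructor is
            # already calling _write() around this point, so nothing expensive
            # (or log-emitting) may be created here.
            self._cache: Optional[Dict[str, str]] = None

        def _write(self) -> Callable[[str], str]:
            # Builds the closure without touching self._cache.
            def wrapper(value: str) -> str:
                if self._cache is None:
                    # First real use: now it is safe to set up (and log).
                    logging.getLogger(__name__).debug("Creating cache lazily")
                    self._cache = {}
                return self._cache.setdefault(value, value.upper())
            return wrapper

    writer = LazyCachingWriter()._write()  # construction creates no cache
    assert writer("hello") == "HELLO"      # cache is created on first call
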