diff --git a/src/toil/batchSystems/abstractGridEngineBatchSystem.py b/src/toil/batchSystems/abstractGridEngineBatchSystem.py index 24bd4bb4fb..da6a9a790a 100644 --- a/src/toil/batchSystems/abstractGridEngineBatchSystem.py +++ b/src/toil/batchSystems/abstractGridEngineBatchSystem.py @@ -25,6 +25,7 @@ from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport from toil.bus import ExternalBatchIdMessage, get_job_kind from toil.job import AcceleratorRequirement +from toil.statsAndLogging import TRACE from toil.lib.misc import CalledProcessErrorStderr from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry @@ -267,7 +268,7 @@ def _runStep(self): if self.checkOnJobs(): activity = True if not activity: - logger.debug('No activity, sleeping for %is', self.boss.sleepSeconds()) + logger.log(TRACE, 'No activity, sleeping for %is', self.boss.sleepSeconds()) return True def run(self): diff --git a/src/toil/common.py b/src/toil/common.py index 62a052be9f..72d144e141 100644 --- a/src/toil/common.py +++ b/src/toil/common.py @@ -1158,7 +1158,15 @@ def import_file(self, src_uri: str, shared_file_name: None = None, symlink: bool = True, - check_existence: bool = True) -> FileID: + check_existence: Literal[True] = True) -> FileID: + ... + + @overload + def import_file(self, + src_uri: str, + shared_file_name: None = None, + symlink: bool = True, + check_existence: bool = True) -> Optional[FileID]: ... def import_file(self, diff --git a/src/toil/job.py b/src/toil/job.py index 4517911864..b6132bde6f 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -967,9 +967,9 @@ def allSuccessors(self) -> Iterator[str]: def successors_by_phase(self) -> Iterator[Tuple[int, str]]: """ - Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase numbere on the stack. + Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase number on the stack. - Phases ececute higher numbers to lower numbers. + Phases execute higher numbers to lower numbers. """ for i, phase in enumerate(self.successor_phases): diff --git a/src/toil/jobStores/abstractJobStore.py b/src/toil/jobStores/abstractJobStore.py index 99291da052..d909248ed2 100644 --- a/src/toil/jobStores/abstractJobStore.py +++ b/src/toil/jobStores/abstractJobStore.py @@ -506,8 +506,10 @@ def export_file(self, file_id: FileID, dst_uri: str) -> None: :param str file_id: The id of the file in the job store that should be exported. :param str dst_uri: URL that points to a file or object in the storage mechanism of a - supported URL scheme e.g. a blob in an AWS s3 bucket. + supported URL scheme e.g. a blob in an AWS s3 bucket. May also be a local path. 
""" + from toil.common import Toil + dst_uri = Toil.normalize_uri(dst_uri) parseResult = urlparse(dst_uri) otherCls = self._findJobStoreForUrl(parseResult, export=True) self._export_file(otherCls, file_id, parseResult) diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index 4d8e94fafe..636db8e782 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -131,6 +131,10 @@ def resume(self): if not os.path.isdir(self.jobStoreDir): raise NoSuchJobStoreException(self.jobStoreDir, "file") super().resume() + # TODO: Unify with initialize() configuration + self.linkImports = self.config.symlinkImports + self.moveExports = self.config.moveOutputs + self.symlink_job_store_reads = self.config.symlink_job_store_reads def destroy(self): if os.path.exists(self.jobStoreDir): @@ -298,26 +302,19 @@ def jobs(self): # Functions that deal with temporary files associated with jobs ########################################## - @contextmanager - def optional_hard_copy(self, hardlink): - if hardlink: - saved = self.linkImports - self.linkImports = False - yield - if hardlink: - self.linkImports = saved - - def _copy_or_link(self, src_path, dst_path, symlink=False): + def _copy_or_link(self, src_path, dst_path, hardlink=False, symlink=False): # linking is not done be default because of issue #1755 - srcPath = self._extract_path_from_url(src_path) - if self.linkImports and symlink: - os.symlink(os.path.realpath(srcPath), dst_path) + # TODO: is hardlinking ever actually done? + src_path = self._extract_path_from_url(src_path) + if self.linkImports and not hardlink and symlink: + os.symlink(os.path.realpath(src_path), dst_path) else: - atomic_copy(srcPath, dst_path) + atomic_copy(src_path, dst_path) def _import_file(self, otherCls, uri, shared_file_name=None, hardlink=False, symlink=True): # symlink argument says whether the caller can take symlinks or not. # ex: if false, it means the workflow cannot work with symlinks and we need to hardlink or copy. + # TODO: Do we ever actually hardlink? 
# default is true since symlinking everything is ideal uri_path = unquote(uri.path) if issubclass(otherCls, FileJobStore): @@ -327,16 +324,14 @@ def _import_file(self, otherCls, uri, shared_file_name=None, hardlink=False, sym if shared_file_name is None: executable = os.stat(uri_path).st_mode & stat.S_IXUSR != 0 absPath = self._get_unique_file_path(uri_path) # use this to get a valid path to write to in job store - with self.optional_hard_copy(hardlink): - self._copy_or_link(uri, absPath, symlink=symlink) + self._copy_or_link(uri, absPath, hardlink=hardlink, symlink=symlink) # TODO: os.stat(absPath).st_size consistently gives values lower than # getDirSizeRecursively() return FileID(self._get_file_id_from_path(absPath), os.stat(absPath).st_size, executable) else: self._requireValidSharedFileName(shared_file_name) path = self._get_shared_file_path(shared_file_name) - with self.optional_hard_copy(hardlink): - self._copy_or_link(uri, path, symlink=symlink) + self._copy_or_link(uri, path, hardlink=hardlink, symlink=symlink) return None else: return super()._import_file(otherCls, uri, shared_file_name=shared_file_name) diff --git a/src/toil/utils/toilStatus.py b/src/toil/utils/toilStatus.py index 6e8271c3c1..54d2611a62 100644 --- a/src/toil/utils/toilStatus.py +++ b/src/toil/utils/toilStatus.py @@ -49,14 +49,14 @@ def print_dot_chart(self) -> None: # Make job IDs to node names map jobsToNodeNames: Dict[str, str] = dict( - map(lambda job: (str(job.jobStoreID), job.jobName), self.jobsToReport) + map(lambda job: (str(job.jobStoreID), str(job.jobStoreID).replace("_", "___").replace("/", "_").replace("-", "__")), self.jobsToReport) ) # Print the nodes for job in set(self.jobsToReport): print( - '{} [label="{} {}"];'.format( - jobsToNodeNames[str(job.jobStoreID)], job.jobName, job.jobStoreID + '{} [label="{} {}" color="{}"];'.format( + jobsToNodeNames[str(job.jobStoreID)], job.jobName, job.displayName, "black" if job.has_body() else "green" ) ) diff --git a/src/toil/wdl/wdltoil.py b/src/toil/wdl/wdltoil.py index 113a40d468..17ca44f942 100755 --- a/src/toil/wdl/wdltoil.py +++ b/src/toil/wdl/wdltoil.py @@ -29,6 +29,7 @@ import sys import textwrap import uuid +import hashlib from contextlib import ExitStack, contextmanager from graphlib import TopologicalSorter from tempfile import mkstemp, gettempdir @@ -36,10 +37,12 @@ Callable, Dict, Generator, + IO, Iterable, Iterator, List, Optional, + Protocol, Sequence, Set, Tuple, @@ -47,6 +50,7 @@ TypeVar, Union, cast) +from typing_extensions import Buffer from mypy_extensions import Arg, DefaultArg from urllib.error import HTTPError from urllib.parse import quote, unquote, urljoin, urlsplit @@ -86,9 +90,66 @@ from toil.lib.threading import global_mutex from toil.provisioners.clusterScaler import JobTooBigError - logger = logging.getLogger(__name__) +# We want to use hashlib.file_digest to avoid a 3-line hashing loop like +# MiniWDL has. But it is only in 3.11+ +# +# So we need to have a function that is either it or a fallback with the +# hashing loop. +# +# So we need to be able to articulate the type of that function for MyPy, to +# avoid needing to write a function with the *exact* signature of the import +# (and not e.g. one that needs slightly different methods of its fileobjs or is +# missing some kwarg features). +# +# So we need to define some protocols. +# +# TODO: Move this into lib somewhere? +# TODO: Give up and license the 3 line loop MiniWDL has? 
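+#
+# As an illustration only (not something exercised elsewhere in this patch),
+# both the 3.11+ hashlib implementation and the fallback defined below are
+# intended to be callable the same way, e.g.:
+#
+#     with open(path, "rb") as stream:
+#         digest = file_digest(stream, "sha256").hexdigest()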
+class ReadableFileObj(Protocol): + """ + Protocol that is more specific than what file_digest takes as an argument. + Also guarantees a read() method. + + Would extend the protocol from Typeshed for hashlib but those are only + declared for 3.11+. + """ + def readinto(self, buf: bytearray, /) -> int: ... + def readable(self) -> bool: ... + def read(self, number: int) -> bytes: ... + +class FileDigester(Protocol): + """ + Protocol for the features we need from hashlib.file_digest. + """ + # We need __ prefixes here or the name of the argument becomes part of the required interface. + def __call__(self, __f: ReadableFileObj, __alg_name: str) -> hashlib._Hash: ... + +try: + # Don't do a direct conditional import to the final name here because then + # the polyfill needs *exactly* the signature of file_digest, and not just + # one that can accept all calls we make in the file, or MyPy will complain. + # + # We need to tell MyPy we expect this import to fail, when typechecking on + # pythons that don't have it. But we also need to tell it that it is fine + # if it succeeds, for Pythons that do have it. + # + # TODO: Change to checking sys.version_info because MyPy understands that + # better? + from hashlib import file_digest as file_digest_impl # type: ignore[attr-defined,unused-ignore] + file_digest: FileDigester = file_digest_impl +except ImportError: + # Polyfill file_digest from 3.11+ + def file_digest_fallback_impl(f: ReadableFileObj, alg_name: str) -> hashlib._Hash: + BUFFER_SIZE = 1024 * 1024 + hasher = hashlib.new(alg_name) + buffer = f.read(BUFFER_SIZE) + while buffer: + hasher.update(buffer) + buffer = f.read(BUFFER_SIZE) + return hasher + file_digest = file_digest_fallback_impl @contextmanager def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] = logger.critical) -> Generator[None, None, None]: @@ -111,6 +172,7 @@ def wdl_error_reporter(task: str, exit: bool = False, log: Callable[[str], None] UnimplementedURLException, JobTooBigError ) as e: + logger.exception(e) # Don't expose tracebacks to the user for exceptions that may be expected log("Could not " + task + " because:") @@ -625,6 +687,195 @@ def unpack_toil_uri(toil_uri: str) -> Tuple[FileID, str, str, str]: return file_id, task_path, parent_id, file_basename +### +# Caching machinery +### + +# TODO: Move to new file? + +# We store the shared FS path in an attribute on the string value in the WDL File. +# TODO: When we can map and get the File, change this. +SHARED_PATH_ATTR = "_shared_fs_path" + +# Since you can't actually *set* an attribute on a str, we need to have a +# non-builtin str subclass to make the values be. +class AttrStr(str): + pass + +def get_shared_fs_path(file: Union[str, WDL.Value.File]) -> Optional[str]: + """ + If a File has a shared filesystem path, get that path. + + This will be the path the File was initially imported from, or the path that it has in the call cache. + + Accepts either a WDL-level File or the actual str value of one. + """ + # TODO: We hide this on the value string itself so we can + # map_over_files_in_bindings and fetch it out. But really we should allow + # mapping over bindings and getting the WDL.Value.Base objects and hide it + # in there. 
+ if isinstance(file, WDL.Value.File): + file_value = file.value + else: + file_value = file + if hasattr(file_value, SHARED_PATH_ATTR): + result = cast(str, getattr(file_value, SHARED_PATH_ATTR)) + assert not result.startswith("file://"), f"Found URI shared FS path of {result} on {file}" + return result + return None + +def set_shared_fs_path(file: Union[str, WDL.Value.File], path: str) -> str: + """ + Mutate a File to associate it with the given shared filesystem path. + + This should be the path it was initially imported from, or the path that it has in the call cache. + + Accepts either a WDL-level File or the actual str value of one. + + Returns a str that has to be assigned back to the WDL File's value, if the + input was not a WDL File. + """ + # We should not have URLs here, only real paths. + assert not path.startswith("file://"), f"Cannot assign URI shared FS path of {path} to {file}" + if isinstance(file, WDL.Value.File): + file_value: str = file.value + else: + assert isinstance(file, str) + file_value = file + if not isinstance(file_value, AttrStr): + # Make it be a str subclass we can set attributes on + file_value = AttrStr(file_value) + setattr(file_value, SHARED_PATH_ATTR, path) + if isinstance(file, WDL.Value.File): + # Commit mutation + file.value = file_value + return file_value + +def view_shared_fs_paths(bindings: WDL.Env.Bindings[WDL.Value.Base]) -> WDL.Env.Bindings[WDL.Value.Base]: + """ + Given WDL bindings, return a copy where all files have their shared filesystem paths as their values. + """ + def file_path_to_use(stored_file_string: str) -> str: + """ + Return the shared FS path if we have one, or the current string. + """ + shared_path = get_shared_fs_path(stored_file_string) + result = shared_path or stored_file_string + assert not result.startswith("file://"), f"Trying to digest file URI {result} for file {stored_file_string} with shared path {shared_path}" + return result + + return map_over_files_in_bindings(bindings, file_path_to_use) + +def poll_execution_cache(node: Union[WDL.Tree.Workflow, WDL.Tree.Task], bindings: WDLBindings) -> Tuple[Optional[WDLBindings], str]: + """ + Return the cached result of calling this workflow or task, and its key. + + Returns None and the key if the cache has no result for us. + + Deals in un-namespaced bindings. + """ + # View the inputs as shared FS files. + transformed_bindings = view_shared_fs_paths(bindings) + log_bindings(logger.debug, "Digesting input bindings:", [transformed_bindings]) + input_digest = WDL.Value.digest_env(transformed_bindings) + cache_key=f"{node.name}/{node.digest}/{input_digest}" + miniwdl_logger = logging.getLogger("MiniWDL") + # TODO: Ship config from leader? It might not see the right environment. + miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) + miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + cached_result: Optional[WDLBindings] = miniwdl_cache.get(cache_key, transformed_bindings, node.effective_outputs) + if cached_result is not None: + logger.info("Found call in cache") + return cached_result, cache_key + else: + logger.debug("No cache hit for %s", cache_key) + return None, cache_key + +def fill_execution_cache(cache_key: str, output_bindings: WDLBindings, file_store: AbstractFileStore, execution_dir: Optional[str], miniwdl_logger: Optional[logging.Logger] = None, miniwdl_config: Optional[WDL.runtime.config.Loader] = None) -> WDLBindings: + """ + Cache the result of calling a workflow or task. + + Deals in un-namespaced bindings. 
+ + :param execution_dir: Working directory where the user launched the + workflow. + + :returns: possibly modified bindings to continue on with, that may + reference the cache. + """ + + if miniwdl_logger is None: + miniwdl_logger = logging.getLogger("MiniWDL") + if miniwdl_config is None: + miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) + + miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger) + if not miniwdl_cache._cfg["call_cache"].get_bool("put"): + # Do nothing since the cache does not want new entries. + # TODO: Get MiniWDL to expose this. + return output_bindings + + # Set up deduplication just for these outputs. + devirtualization_state: DirectoryNamingStateDict = {} + devirtualized_to_virtualized: Dict[str, str] = dict() + virtualized_to_devirtualized: Dict[str, str] = dict() + # TODO: if a URL is passed through multiple tasks it will be saved multiple times. Also save on input??? + + # Determine where we will save our cached versions of files. + output_directory = os.path.join(miniwdl_cache._call_cache_dir, cache_key) + + # Adjust all files in the output bindings to have shared FS paths outside the job store. + def assign_shared_fs_path(file_value: str) -> str: + """ + Given the string value inside a File, mutate the File to have a shared FS path outside the jobstore. + + Returns the value to put in the WDL file to actually do the mutation. + """ + + # TODO: Change to using File objects when required PR with + # the mapping functions for that is finally merged. + + if get_shared_fs_path(file_value) is None: + # We need all the incoming paths that aren't cache paths to not already + # be local paths, or devirtualizing them to export them will not work. + # + # This ought to be the case because we just virtualized + # them all for transport out of the machine. + if not is_url(file_value): + # TODO: If we're passing things around by URL reference and + # some of them are file: is this actually allowed? + raise RuntimeError(f"File {file_value} caught escaping from task unvirtualized") + + # We need to save this file somewhere. + # This needs to exist before we can export to it. And now we know + # we will export something, so make sure it exists. + os.makedirs(output_directory, exist_ok=True) + exported_path = ToilWDLStdLibBase.devirtualize_to( + file_value, + output_directory, + file_store, + execution_dir, + devirtualization_state, + devirtualized_to_virtualized, + virtualized_to_devirtualized, + enforce_existence=False, + export=True + ) + + # Remember where it went + file_value = set_shared_fs_path(file_value, exported_path) + + return file_value + output_bindings = map_over_files_in_bindings(output_bindings, assign_shared_fs_path) + + # Save the bindings to the cache, representing all files with their shared filesystem paths. + miniwdl_cache.put(cache_key, view_shared_fs_paths(output_bindings)) + logger.debug("Saved result to cache under %s", cache_key) + + # Keep using the transformed bindings so that later tasks use + # the cached files in their input digests. 
+ return output_bindings + DirectoryNamingStateDict = Dict[str, Tuple[Dict[str, str], Set[str]]] def choose_human_readable_directory(root_dir: str, source_task_path: str, parent_id: str, state: DirectoryNamingStateDict) -> str: @@ -888,7 +1139,8 @@ def devirtualize_to( state: DirectoryNamingStateDict, devirtualized_to_virtualized: Optional[Dict[str, str]] = None, virtualized_to_devirtualized: Optional[Dict[str, str]] = None, - enforce_existence: bool = True + enforce_existence: bool = True, + export: bool = False ) -> str: """ Download or export a WDL virtualized filename/URL to the given directory. @@ -899,14 +1151,16 @@ def devirtualize_to( don't clobber each other. Called from within this class for tasks, and statically at the end of the workflow for outputs. - Returns the local path to the file. If it already had a local path - elsewhere, it might not actually be put in dest_dir. + Returns the local path to the file. If the file is already a local + path, or if it already has an entry in virtualized_to_devirtualized, + that path will be re-used instead of creating a new copy in dest_dir. The input filename could already be devirtualized. In this case, the filename - should not be added to the cache + should not be added to the cache. :param state: State dict which must be shared among successive calls into a dest_dir. :param enforce_existence: Raise an error if the file is nonexistent. Else, let it pass through. + :param export: Always create exported copies of files rather than views that a FileStore might clean up. """ if not os.path.isdir(dest_dir): @@ -916,7 +1170,6 @@ def devirtualize_to( raise RuntimeError(f"Cannot devirtualize {filename} into nonexistent directory {dest_dir}") # TODO: Support people doing path operations (join, split, get parent directory) on the virtualized filenames. - # TODO: For task inputs, we are supposed to make sure to put things in the same directory if they came from the same directory. See if is_url(filename): if virtualized_to_devirtualized is not None and filename in virtualized_to_devirtualized: # The virtualized file is in the cache, so grab the already devirtualized result @@ -953,17 +1206,20 @@ def devirtualize_to( if filename.startswith(TOIL_URI_SCHEME): # Get a local path to the file - if isinstance(file_source, AbstractFileStore): + if isinstance(file_source, Toil) or export: + # Use the export method that both Toil and + # AbstractFileStore have to get an unencumbered copy. + file_source.export_file(file_id, dest_path) + result = dest_path + elif isinstance(file_source, AbstractFileStore): # Read from the file store. # File is not allowed to be modified by the task. See # . # We try to get away with symlinks and hope the task # container can mount the destination file. result = file_source.readGlobalFile(file_id, dest_path, mutable=False, symlink=True) - elif isinstance(file_source, Toil): - # Read from the Toil context - file_source.export_file(file_id, dest_path) - result = dest_path + else: + raise RuntimeError(f"Unsupported file source: {file_source}") else: # Download to a local file with the right name and execute bit. # Open it exclusively @@ -1039,6 +1295,117 @@ def _virtualize_filename(self, filename: str) -> str: self._virtualized_to_devirtualized[result] = abs_filename return result +class ToilWDLStdLibWorkflow(ToilWDLStdLibBase): + """ + Standard library implementation for workflow scope. 
+
+    Handles deduplicating files generated by write_* calls at workflow scope
+    with copies already in the call cache, so that tasks that depend on them
+    can also be fulfilled from the cache.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+        self._miniwdl_cache: Optional[WDL.runtime.cache.CallCache] = None
+
+    # This needs to be hash-compatible with MiniWDL.
+    # MiniWDL hooks _virtualize_filename
+    # ,
+    # but we probably don't want to hash all files virtualized at workflow
+    # scope, just dynamic ones.
+    #
+    # TODO: Test cache compatibility with MiniWDL when a file is virtualized
+    # from a string at workflow scope!
+    def _write(
+        self, serialize: Callable[[WDL.Value.Base, IO[bytes]], None]
+    ) -> Callable[[WDL.Value.Base], WDL.Value.File]:
+
+        # Note that the parent class constructor calls this method, but doesn't
+        # invoke the resulting function.
+
+        # Get the normal writer
+        writer = super()._write(serialize)
+
+        def wrapper(v: WDL.Value.Base) -> WDL.Value.File:
+            """
+            Call the normal writer, and then deduplicate its result with the cache.
+            """
+            # TODO: If we did this before the _virtualize_filename call in the
+            # base _write, we could let the cache bring the info between nodes
+            # and not need to use the job store.
+
+            virtualized_file = writer(v)
+
+            if self._miniwdl_cache is None:
+                # We do indeed need a MiniWDL cache.
+                # Setting it up logs so make it lazily.
+                miniwdl_logger = logging.getLogger("MiniWDL")
+                # TODO: Ship config from leader? It might not see the right environment.
+                miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger)
+                self._miniwdl_cache = WDL.runtime.cache.new(miniwdl_config, miniwdl_logger)
+
+            # TODO: If we did this before the _virtualize_filename call in the
+            # base _write we wouldn't need to immediately devirtualize. But we
+            # have internal caches to lean on.
+            devirtualized_filename = self._devirtualize_filename(virtualized_file.value)
+            # Hash the file to hex
+            hex_digest = file_digest(open(devirtualized_filename, "rb"), "sha256").hexdigest()
+            file_input_bindings = WDL.Env.Bindings(WDL.Env.Binding("file_sha256", cast(WDL.Value.Base, WDL.Value.String(hex_digest))))
+            # Make an environment of "file_sha256" to that as a WDL string, and
+            # digest that, and make a write_ cache key. No need to transform to
+            # shared FS paths since no paths are in it.
+            log_bindings(logger.debug, "Digesting file bindings:", [file_input_bindings])
+            input_digest = WDL.Value.digest_env(file_input_bindings)
+            file_cache_key = "write_/" + input_digest
+            # Construct a description of the types we expect to get from the
+            # cache: just a File-type variable named "file"
+            expected_types = WDL.Env.Bindings(WDL.Env.Binding("file", cast(WDL.Type.Base, WDL.Type.File())))
+            # Query the cache
+            assert self._miniwdl_cache is not None
+            file_output_bindings = self._miniwdl_cache.get(file_cache_key, file_input_bindings, expected_types)
+            if file_output_bindings:
+                # File with this hash is cached.
+                # Adjust virtualized_file to carry that path as its local-filesystem path.
+                set_shared_fs_path(virtualized_file, file_output_bindings.resolve("file").value)
+            elif self._miniwdl_cache._cfg["call_cache"].get_bool("put"):
+                # Save our novel file to the cache.
+
+                # Determine where we will save the file.
+                output_directory = os.path.join(self._miniwdl_cache._call_cache_dir, file_cache_key)
+                # This needs to exist before we can export to it
+                os.makedirs(output_directory, exist_ok=True)
+
+                # Export the file to the cache.
+                # write_* files will never really need to be siblings, so we
+                # don't need any real persistent state here.
+                # TODO: Will they secretly be siblings on a first run?
+                exported_path = self.devirtualize_to(
+                    virtualized_file.value,
+                    output_directory,
+                    self._file_store,
+                    self._execution_dir,
+                    {},
+                    {},
+                    {},
+                    enforce_existence=True,
+                    export=True
+                )
+
+                # Save the cache entry pointing to it
+                self._miniwdl_cache.put(
+                    file_cache_key,
+                    WDL.Env.Bindings(WDL.Env.Binding("file", WDL.Value.File(exported_path)))
+                )
+
+                # Apply the shared filesystem path to the virtualized file
+                set_shared_fs_path(virtualized_file, exported_path)
+
+            return virtualized_file
+
+        return wrapper
+
+
 class ToilWDLStdLibTaskCommand(ToilWDLStdLibBase):
     """
     Standard library implementation to use inside a WDL task command evaluation.
@@ -1449,10 +1816,11 @@ def add_paths(task_container: TaskContainer, host_paths: Iterable[str]) -> None:
             task_container.input_path_map[host_path] = container_path
             task_container.input_path_map_rev[container_path] = host_path
 
-def import_files(environment: WDLBindings, task_path: str, toil: Toil, path: Optional[List[str]] = None, skip_remote: bool = False) -> WDLBindings:
+def import_files(environment: WDLBindings, task_path: str, file_dest: Union[AbstractFileStore, Toil], path: Optional[List[str]] = None, skip_remote: bool = False) -> WDLBindings:
     """
-    Make sure all File values embedded in the given bindings are imported,
-    using the given Toil object.
+    Make sure all File values embedded in the given bindings are imported.
+
+    Uses the given Toil object or FileStore.
 
     :param task_path: Dotted WDL name of the user-level code doing the
            importing (probably the workflow name).
@@ -1487,11 +1855,15 @@ def import_file_from_uri(uri: str) -> str:
                 # Actually import
                 # Try to import the file. Don't raise if we can't find it, just
                 # return None!
-                imported = toil.import_file(candidate_uri, check_existence=False)
+                if isinstance(file_dest, Toil):
+                    imported = file_dest.import_file(candidate_uri, check_existence=False)
+                else:
+                    # The file store import_file doesn't do an existence check.
+                    # TODO: Have a more compatible interface.
+                    imported = file_dest.import_file(candidate_uri)
                 if imported is None:
                     # Wasn't found there
                     continue
-                logger.info('Imported %s', candidate_uri)
             except UnimplementedURLException as e:
                 # We can't find anything that can even support this URL scheme.
                 # Report to the user, they are probably missing an extra.
@@ -1521,26 +1893,41 @@ def import_file_from_uri(uri: str) -> str:
                 # download them at that basename later.
                 raise RuntimeError(f"File {candidate_uri} has no basename and so cannot be a WDL File")
 
+            # If the file has a local path outside the job store, we want to know it for caching
+            local_path: Optional[str] = None
+
             # Was actually found
-            if is_url(candidate_uri):
+            if is_url(candidate_uri) or candidate_uri.startswith("file://"):
+                # Is a non-file URI or file URI.
+                # TODO: is_url is due to be renamed.
+                # TODO: check for the // also for global URL/URI detection because http:stuff.dat is a valid local filename. Add it to the toilfile: URIs.
+
+                # Might be a file URI or other URI.
                 # We need to make sure file URIs and local paths that point to
                 # the same place are treated the same.
                 parsed = urlsplit(candidate_uri)
-                if parsed.scheme == "file:":
+                # Note that parsing the URL does *not* include ':' in the scheme
+                if parsed.scheme == "file":
                     # This is a local file URI. Convert to a path for source directory tracking.
-                    parent_dir = os.path.dirname(unquote(parsed.path))
+                    local_path = unquote(parsed.path)
+                    parent_dir = os.path.dirname(local_path)
                 else:
                     # This is some other URL. Get the URL to the parent directory and use that.
                     parent_dir = urljoin(candidate_uri, ".")
             else:
                 # Must be a local path
+                local_path = candidate_uri
                 parent_dir = os.path.dirname(candidate_uri)
 
             # Pack a UUID of the parent directory
            dir_id = path_to_id.setdefault(parent_dir, uuid.uuid4())
 
-            return pack_toil_uri(imported, task_path, dir_id, file_basename)
+            file_value = pack_toil_uri(imported, task_path, dir_id, file_basename)
+            if local_path is not None:
+                # Mark the file as having a known local path outside the jobstore.
+                # TODO: Turn URLs into local files like the MiniWDL download cache.
+                file_value = set_shared_fs_path(file_value, local_path)
+            return file_value
 
         # If we get here we tried all the candidates
         raise RuntimeError(f"Could not find {uri} at any of: {tried}")
@@ -1867,12 +2254,28 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
         super().run(file_store)
         logger.info("Evaluating inputs and runtime for task %s (%s) called as %s", self._task.name, self._task_id, self._namespace)
 
+        # Set up the WDL standard library
+        standard_library = ToilWDLStdLibBase(file_store, self._task_path)
+
         # Combine the bindings we get from previous jobs.
         # For a task we are only passed the inside-the-task namespace.
         bindings = combine_bindings(unwrap_all(self._prev_node_results))
-        # Set up the WDL standard library
-        # UUID to use for virtualizing files
-        standard_library = ToilWDLStdLibBase(file_store, self._task_path)
+
+        # At this point we have what MiniWDL would call the "inputs" to the
+        # call (i.e. what you would put in a JSON file, without any defaulted
+        # or calculated inputs filled in).
+        cached_result, cache_key = poll_execution_cache(self._task, bindings)
+        if cached_result is not None:
+            # Virtualize any files we loaded from the cache, to maintain the
+            # invariant that they are in the job store, and to avoid
+            # re-virtualizing them later if they pass through other tasks. This
+            # should mostly be symlinking because we are probably using the
+            # FileJobStore.
+            #
+            # TODO: Allow just propagating things through by normal path
+            # reference into the cache?
+            return self.postprocess(import_files(cached_result, self._task.name, file_store))
+
         with monkeypatch_coerce(standard_library):
             if self._task.inputs:
                 logger.debug("Evaluating task code")
@@ -1975,7 +2378,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]:
             runtime_accelerators = [accelerator_requirement]
 
         # Schedule to get resources. Pass along the bindings from evaluating all the inputs and decls, and the runtime, with files virtualized.
- run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) + run_job = WDLTaskJob(self._task, virtualize_files(bindings, standard_library), virtualize_files(runtime_bindings, standard_library), self._task_id, self._namespace, self._task_path, cache_key=cache_key, cores=runtime_cores or self.cores, memory=runtime_memory or self.memory, disk=runtime_disk or self.disk, accelerators=runtime_accelerators or self.accelerators, wdl_options=self._wdl_options) # Run that as a child self.addChild(run_job) @@ -1998,7 +2401,7 @@ class WDLTaskJob(WDLBaseJob): All bindings are in terms of task-internal names. """ - def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, **kwargs: Any) -> None: + def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBindings], runtime_bindings: Promised[WDLBindings], task_id: List[str], namespace: str, task_path: str, cache_key: Optional[str] = None, **kwargs: Any) -> None: """ Make a new job to run a task. @@ -2007,11 +2410,12 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind :param task_path: Like the namespace, but including subscript numbers for scatters. + + :param cache_key: Key to save outputs under in MiniWDL-compatible + cache. Cached outputs will not be postprocessed. """ # This job should not be local because it represents a real workflow task. - # TODO: Instead of re-scheduling with more resources, add a local - # "wrapper" job like CWL uses to determine the actual requirements. super().__init__(unitName=task_path + ".command", displayName=namespace + ".command", local=False, **kwargs) logger.info("Preparing to run task %s as %s", task.name, namespace) @@ -2022,6 +2426,7 @@ def __init__(self, task: WDL.Tree.Task, task_internal_bindings: Promised[WDLBind self._task_id = task_id self._namespace = namespace self._task_path = task_path + self._cache_key = cache_key ### # Runtime code injection system @@ -2230,7 +2635,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # A current limitation with the singularity/miniwdl cache is it cannot check for image updates if the # filename is the same singularity_cache = os.path.join(os.path.expanduser("~"), ".singularity") - miniwdl_cache = os.path.join(os.path.expanduser("~"), ".cache/miniwdl") + miniwdl_singularity_cache = os.path.join(os.path.expanduser("~"), ".cache/miniwdl") # Cache Singularity's layers somewhere known to have space os.environ['SINGULARITY_CACHEDIR'] = os.environ.get("SINGULARITY_CACHEDIR", singularity_cache) @@ -2241,7 +2646,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Cache Singularity images for the workflow on this machine. # Since MiniWDL does only within-process synchronization for pulls, # we also will need to pre-pull one image into here at a time. - os.environ['MINIWDL__SINGULARITY__IMAGE_CACHE'] = os.environ.get("MINIWDL__SINGULARITY__IMAGE_CACHE", miniwdl_cache) + os.environ['MINIWDL__SINGULARITY__IMAGE_CACHE'] = os.environ.get("MINIWDL__SINGULARITY__IMAGE_CACHE", miniwdl_singularity_cache) # Make sure it exists. 
os.makedirs(os.environ['MINIWDL__SINGULARITY__IMAGE_CACHE'], exist_ok=True) @@ -2262,7 +2667,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: raise RuntimeError(f"Could not find a working container engine to use; told to use {self._wdl_options.get('container')}") # Set up the MiniWDL container running stuff - miniwdl_logger = logging.getLogger("MiniWDLContainers") + miniwdl_logger = logging.getLogger("MiniWDL") miniwdl_config = WDL.runtime.config.Loader(miniwdl_logger) if not getattr(TaskContainerImplementation, 'toil_initialized__', False): # Initialize the cointainer system @@ -2521,9 +2926,14 @@ def get_path_in_container(path: str) -> Optional[str]: # next task. raise WDL.Error.EvalError(decl, f"non-optional value {decl.name} = {decl.expr} is missing") - # Upload any files in the outputs if not uploaded already. Accounts for how relative paths may still need to be container-relative. + # Upload any files in the outputs if not uploaded already. Accounts for + # how relative paths may still need to be container-relative. output_bindings = virtualize_files(output_bindings, outputs_library) + if self._cache_key is not None: + # We might need to save to the execution cache + output_bindings = fill_execution_cache(self._cache_key, output_bindings, file_store, self._wdl_options.get("execution_dir"), miniwdl_logger=miniwdl_logger, miniwdl_config=miniwdl_config) + # Do postprocessing steps to e.g. apply namespaces. output_bindings = self.postprocess(output_bindings) @@ -2559,7 +2969,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs incoming_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) + standard_library = ToilWDLStdLibWorkflow(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) with monkeypatch_coerce(standard_library): if isinstance(self._node, WDL.Tree.Decl): # This is a variable assignment @@ -2650,7 +3060,7 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Combine the bindings we get from previous jobs current_bindings = combine_bindings(unwrap_all(self._prev_node_results)) # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) + standard_library = ToilWDLStdLibWorkflow(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) with monkeypatch_coerce(standard_library): for node in self._nodes: @@ -3324,10 +3734,17 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: logger.info("Running workflow %s (%s) called as %s", self._workflow.name, self._workflow_id, self._namespace) # Combine the bindings we get from previous jobs. - # For a task we only see the insode-the-task namespace. bindings = combine_bindings(unwrap_all(self._prev_node_results)) + + # At this point we have what MiniWDL would call the "inputs" to the + # call (i.e. what you would put in a JSON file, without any defaulted + # or calculated inputs filled in). 
+ cached_result, cache_key = poll_execution_cache(self._workflow, bindings) + if cached_result is not None: + return self.postprocess(import_files(cached_result, self._workflow.name, file_store)) + # Set up the WDL standard library - standard_library = ToilWDLStdLibBase(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) + standard_library = ToilWDLStdLibWorkflow(file_store, self._task_path, execution_dir=self._wdl_options.get("execution_dir")) if self._workflow.inputs: with monkeypatch_coerce(standard_library): @@ -3342,33 +3759,34 @@ def run(self, file_store: AbstractFileStore) -> Promised[WDLBindings]: # Make jobs to run all the parts of the workflow sink = self.create_subgraph(self._workflow.body, [], bindings) - if self._workflow.outputs != []: # Compare against empty list as None means there should be outputs - # Either the output section is declared and nonempty or it is not declared - # Add evaluating the outputs after the sink - outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._task_path, wdl_options=self._wdl_options) - sink.addFollowOn(outputs_job) - # Caller is responsible for making sure namespaces are applied - self.defer_postprocessing(outputs_job) - return outputs_job.rv() - else: - # No outputs from this workflow. - return self.postprocess(WDL.Env.Bindings()) + # To support the all call outputs feature, run an outputs job even if + # we have a declared but empty outputs section. + outputs_job = WDLOutputsJob(self._workflow, sink.rv(), self._task_path, cache_key=cache_key, wdl_options=self._wdl_options) + sink.addFollowOn(outputs_job) + # Caller is responsible for making sure namespaces are applied + self.defer_postprocessing(outputs_job) + return outputs_job.rv() class WDLOutputsJob(WDLBaseJob): """ - Job which evaluates an outputs section (such as for a workflow). + Job which evaluates an outputs section for a workflow. Returns an environment with just the outputs bound, in no namespace. """ - def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], task_path: str, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any): + def __init__(self, workflow: WDL.Tree.Workflow, bindings: Promised[WDLBindings], task_path: str, cache_key: Optional[str] = None, wdl_options: Optional[Dict[str, str]] = None, **kwargs: Any): """ Make a new WDLWorkflowOutputsJob for the given workflow, with the given set of bindings after its body runs. + + :param cache_key: If set and storing into the call cache is on, will + cache the workflow execution result under the given key in a + MiniWDL-compatible way. 
""" super().__init__(wdl_options=wdl_options, **kwargs) self._bindings = bindings self._workflow = workflow self._task_path = task_path + self._cache_key = cache_key @report_wdl_errors("evaluate outputs") def run(self, file_store: AbstractFileStore) -> WDLBindings: @@ -3415,7 +3833,6 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: for subnode in node.body: stack.append(subnode) # Collect all bindings that are task outputs - output_bindings: WDL.Env.Bindings[WDL.Value.Base] = WDL.Env.Bindings() for binding in unwrap(self._bindings): if binding.name in output_set: # The bindings will already be namespaced with the task namespaces @@ -3434,6 +3851,9 @@ def run(self, file_store: AbstractFileStore) -> WDLBindings: # Null nonexistent optional values and error on the rest output_bindings = drop_missing_files(output_bindings, self._wdl_options.get("execution_dir")) + if self._cache_key is not None: + output_bindings = fill_execution_cache(self._cache_key, output_bindings, file_store, self._wdl_options.get("execution_dir")) + return self.postprocess(output_bindings) class WDLRootJob(WDLSectionJob):