Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support WDL call caching #5105

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
03177ce
Start on helpers for working with MiniWDL's call cache
adamnovak Aug 22, 2024
5d4b11f
Merge remote-tracking branch 'upstream/master' into issues/4797-share…
adamnovak Sep 12, 2024
6d30200
Add cache put and get but no real handling of files
adamnovak Sep 12, 2024
f83f122
Merge remote-tracking branch 'upstream/master' into issues/4797-share…
adamnovak Sep 26, 2024
d48c7dd
Hide Toil files in MiniWDL cache and assign cache file paths
adamnovak Sep 26, 2024
074c865
Interpose a str subclass but still put toilfile URIs in cache somehow
adamnovak Sep 26, 2024
e38eeac
Get cache to interoperate with MiniWDL for simple file cases
adamnovak Sep 26, 2024
99c68f3
Connect Toil to MiniWDL write_* file cache
adamnovak Sep 27, 2024
ca89abf
Satisfy MyPy via crimes
adamnovak Sep 27, 2024
aeaf5f5
Merge remote-tracking branch 'upstream/master' into issues/4797-share…
adamnovak Sep 27, 2024
5ddc9ab
Stop redefining output_bindings
adamnovak Sep 27, 2024
0512abb
Stop looking for : in URI scheme part
adamnovak Sep 27, 2024
37a8771
Detect file URIs sneaking into local shared paths and treat file:// U…
adamnovak Sep 27, 2024
aa2846e
Unrelatedly quiet inner loop logging and get toil status dot output t…
adamnovak Sep 27, 2024
d6902b0
Let task and workflow calls cache the same way
adamnovak Sep 27, 2024
421f9af
Make sure files are imported
adamnovak Sep 28, 2024
a6d9c64
Stop logging WDL imports twice
adamnovak Sep 28, 2024
772a202
Only make a cache in the workflow std lib when we really need it
adamnovak Sep 28, 2024
e76f346
Don't make MiniWDL cache in the middle of parent class constructor
adamnovak Sep 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from toil.batchSystems.cleanup_support import BatchSystemCleanupSupport
from toil.bus import ExternalBatchIdMessage, get_job_kind
from toil.job import AcceleratorRequirement
from toil.statsAndLogging import TRACE
from toil.lib.misc import CalledProcessErrorStderr
from toil.lib.retry import old_retry, DEFAULT_DELAYS, retry

Expand Down Expand Up @@ -267,7 +268,7 @@ def _runStep(self):
if self.checkOnJobs():
activity = True
if not activity:
logger.debug('No activity, sleeping for %is', self.boss.sleepSeconds())
logger.log(TRACE, 'No activity, sleeping for %is', self.boss.sleepSeconds())
return True

def run(self):
Expand Down
10 changes: 9 additions & 1 deletion src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,15 @@ def import_file(self,
src_uri: str,
shared_file_name: None = None,
symlink: bool = True,
check_existence: bool = True) -> FileID:
check_existence: Literal[True] = True) -> FileID:
...

@overload
def import_file(self,
src_uri: str,
shared_file_name: None = None,
symlink: bool = True,
check_existence: bool = True) -> Optional[FileID]:
...

def import_file(self,
Expand Down
4 changes: 2 additions & 2 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -967,9 +967,9 @@ def allSuccessors(self) -> Iterator[str]:

def successors_by_phase(self) -> Iterator[Tuple[int, str]]:
"""
Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase numbere on the stack.
Get an iterator over all child/follow-on/chained inherited successor job IDs, along with their phase number on the stack.

Phases ececute higher numbers to lower numbers.
Phases execute higher numbers to lower numbers.
"""

for i, phase in enumerate(self.successor_phases):
Expand Down
4 changes: 3 additions & 1 deletion src/toil/jobStores/abstractJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,8 +506,10 @@ def export_file(self, file_id: FileID, dst_uri: str) -> None:
:param str file_id: The id of the file in the job store that should be exported.

:param str dst_uri: URL that points to a file or object in the storage mechanism of a
supported URL scheme e.g. a blob in an AWS s3 bucket.
supported URL scheme e.g. a blob in an AWS s3 bucket. May also be a local path.
"""
from toil.common import Toil
dst_uri = Toil.normalize_uri(dst_uri)
parseResult = urlparse(dst_uri)
otherCls = self._findJobStoreForUrl(parseResult, export=True)
self._export_file(otherCls, file_id, parseResult)
Expand Down
31 changes: 13 additions & 18 deletions src/toil/jobStores/fileJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ def resume(self):
if not os.path.isdir(self.jobStoreDir):
raise NoSuchJobStoreException(self.jobStoreDir, "file")
super().resume()
# TODO: Unify with initialize() configuration
self.linkImports = self.config.symlinkImports
self.moveExports = self.config.moveOutputs
self.symlink_job_store_reads = self.config.symlink_job_store_reads

def destroy(self):
if os.path.exists(self.jobStoreDir):
Expand Down Expand Up @@ -298,26 +302,19 @@ def jobs(self):
# Functions that deal with temporary files associated with jobs
##########################################

@contextmanager
def optional_hard_copy(self, hardlink):
if hardlink:
saved = self.linkImports
self.linkImports = False
yield
if hardlink:
self.linkImports = saved

def _copy_or_link(self, src_path, dst_path, symlink=False):
def _copy_or_link(self, src_path, dst_path, hardlink=False, symlink=False):
# linking is not done be default because of issue #1755
srcPath = self._extract_path_from_url(src_path)
if self.linkImports and symlink:
os.symlink(os.path.realpath(srcPath), dst_path)
# TODO: is hardlinking ever actually done?
src_path = self._extract_path_from_url(src_path)
if self.linkImports and not hardlink and symlink:
os.symlink(os.path.realpath(src_path), dst_path)
else:
atomic_copy(srcPath, dst_path)
atomic_copy(src_path, dst_path)

def _import_file(self, otherCls, uri, shared_file_name=None, hardlink=False, symlink=True):
# symlink argument says whether the caller can take symlinks or not.
# ex: if false, it means the workflow cannot work with symlinks and we need to hardlink or copy.
# TODO: Do we ever actually hardlink?
# default is true since symlinking everything is ideal
uri_path = unquote(uri.path)
if issubclass(otherCls, FileJobStore):
Expand All @@ -327,16 +324,14 @@ def _import_file(self, otherCls, uri, shared_file_name=None, hardlink=False, sym
if shared_file_name is None:
executable = os.stat(uri_path).st_mode & stat.S_IXUSR != 0
absPath = self._get_unique_file_path(uri_path) # use this to get a valid path to write to in job store
with self.optional_hard_copy(hardlink):
self._copy_or_link(uri, absPath, symlink=symlink)
self._copy_or_link(uri, absPath, hardlink=hardlink, symlink=symlink)
# TODO: os.stat(absPath).st_size consistently gives values lower than
# getDirSizeRecursively()
return FileID(self._get_file_id_from_path(absPath), os.stat(absPath).st_size, executable)
else:
self._requireValidSharedFileName(shared_file_name)
path = self._get_shared_file_path(shared_file_name)
with self.optional_hard_copy(hardlink):
self._copy_or_link(uri, path, symlink=symlink)
self._copy_or_link(uri, path, hardlink=hardlink, symlink=symlink)
return None
else:
return super()._import_file(otherCls, uri, shared_file_name=shared_file_name)
Expand Down
6 changes: 3 additions & 3 deletions src/toil/utils/toilStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ def print_dot_chart(self) -> None:

# Make job IDs to node names map
jobsToNodeNames: Dict[str, str] = dict(
map(lambda job: (str(job.jobStoreID), job.jobName), self.jobsToReport)
map(lambda job: (str(job.jobStoreID), str(job.jobStoreID).replace("_", "___").replace("/", "_").replace("-", "__")), self.jobsToReport)
)

# Print the nodes
for job in set(self.jobsToReport):
print(
'{} [label="{} {}"];'.format(
jobsToNodeNames[str(job.jobStoreID)], job.jobName, job.jobStoreID
'{} [label="{} {}" color="{}"];'.format(
jobsToNodeNames[str(job.jobStoreID)], job.jobName, job.displayName, "black" if job.has_body() else "green"
)
)

Expand Down
Loading
Loading