Add filesize sniffing and parallelize importing on jobs #5133
base: master
Conversation
I think the shape of the import logic is right, but I'm concerned about some variable names and some duplicated code, and about whether the CWL-side import logic actually has all its interdependencies documented sufficiently.
src/toil/cwl/cwltoil.py
Outdated
```diff
 from toil.jobStores.abstractJobStore import (
     AbstractJobStore,
-    InvalidImportExportUrlException,
-    LocatorException,
     NoSuchFileException,
+    LocatorException,
+    InvalidImportExportUrlException,
     UnimplementedURLException,
 )
```
It looks like this might be undoing some of the formatting/import sorting improvements we recently merged; maybe the PR should be run through the code formatter Makefile target?
It seems like the Makefile code formatter formats quite a few files, except for cwltoil.py. I'll manually undo this change; maybe this was done by accident while I was messing with imports.
src/toil/cwl/cwltoil.py
Outdated
```python
def extract_files(
    fileindex: Dict[str, str],
    existing: Dict[str, str],
    file_metadata: CWLObjectType,
    mark_broken: bool = False,
    skip_remote: bool = False,
) -> Optional[str]:
    """
    Extract the filename from a CWL file record
```
If this operates on just one filename, it should have a singular name.

This function also does more than just extract a filename from a record. It consults and sometimes updates `fileindex`, it will also update `file_metadata` to fill in its `location` from its `path`, and it will find the `realpath` (i.e. resolve symlinks) for bare file paths but for some reason not for `file://` URIs. The docstring needs to explain why it does these things. Otherwise the caller will be surprised when it happens.
This function has a bunch of borrowed code from `upload_file` and `write_file`. I think if the `file://` scheme exists, some resolving was already done.
""" | ||
Extract the filename from a CWL file record | ||
:param fileindex: Forward mapping of filename | ||
:param existing: Reverse mapping of filename. This function does not use this |
We need this argument to match a pre-defined function signature, right? Maybe we should mention where the kind of function that this function needs to be is documented. And if there isn't any documentation on this kind of function, we should maybe come up with a name for it and document it.
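For illustration, one hedged way to pin that contract down would be a documented type alias; the alias name `FileExtractor` here is invented, not existing Toil code:

```python
# Hypothetical sketch of documenting the callback shape that
# extract_files must match; FileExtractor is an invented name.
from typing import Callable, Dict, Optional

from cwltool.utils import CWLObjectType

# (fileindex, existing, file_metadata, mark_broken, skip_remote) -> filename
FileExtractor = Callable[
    [Dict[str, str], Dict[str, str], CWLObjectType, bool, bool],
    Optional[str],
]
```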
```python
if not urlparse(location).scheme:
    rp = os.path.realpath(location)
else:
    rp = location
```
Why aren't symlinks in the `file://` URI's path resolved here?
I borrowed this code from write_file. I believe all files should be resolved into file URIs by this point, so this is likely some edge case. From my limited testing, I can't trigger the realpath branch.
src/toil/cwl/cwltoil.py
Outdated
```python
# This is a local file, or we also need to download and re-upload remote files
if location not in fileindex:
    # don't download twice
```
This function doesn't have any responsibility for deciding whether to download things; why are these comments talking about whether to download things?
src/toil/wdl/wdltoil.py
Outdated
""" | ||
return is_url_with_scheme(filename, REMOTE_SCHEMES) | ||
Resolve relative-URI files in the given environment and them then into absolute normalized URIs. Returns a dictionary of WDL file values to a tuple of the normalized URI, |
I don't think `and them then` makes sense.

We might also want to say something different than "WDL file values"; we mean the string values that would appear in the `value` field of a `WDL.Value.File` object. But you could also think that a `WDL.Value.File` object is itself a "WDL file value".
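A minimal sketch of the two readings, assuming MiniWDL's `WDL.Value.File` API where the string payload lives in `.value`:

```python
# The docstring means the string in .value, not the object itself.
import WDL

f = WDL.Value.File("inputs/reads.fastq")
print(f.value)  # "inputs/reads.fastq" -- the string meant by "WDL file value"
print(type(f))  # WDL.Value.File -- the object one could also call a "WDL file value"
```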
src/toil/wdl/wdltoil.py
Outdated
```python
except UnimplementedURLException as e:
    # We can't find anything that can even support this URL scheme.
    # Report to the user, they are probably missing an extra.
    logger.critical("Error: " + str(e))
    raise
except HTTPError as e:
    # Something went wrong looking for it there.
    logger.warning(
        "Checked URL %s but got HTTP status %s", candidate_uri, e.code
    )
    # Try the next location.
    continue
except FileNotFoundError:
    # Wasn't found there
    continue
except Exception:
    # Something went wrong besides the file not being found. Maybe
    # we have no auth.
    logger.error(
        "Something went wrong when testing for existence of %s",
        candidate_uri,
    )
    raise
```
This duplicates a lot of code with the CWL-side `get_file_sizes()`. Is there a common function that could be extracted here that polls a URL and returns whether it existed and, if so, the size if available, and raises on other errors?
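A hedged sketch of what that shared helper could look like; the name `poll_url` and the use of `AbstractJobStore.get_size` as the lookup primitive are assumptions for illustration, not the helper this PR actually moved into `job.py`:

```python
# Hypothetical shape for the shared "does this URL exist, and how big
# is it" helper discussed above.
import logging
from typing import Optional, Tuple
from urllib.error import HTTPError

from toil.jobStores.abstractJobStore import AbstractJobStore

logger = logging.getLogger(__name__)


def poll_url(candidate_uri: str) -> Tuple[bool, Optional[int]]:
    """
    Return (exists, size in bytes or None if unknown).

    Raise on errors other than the file simply not being found,
    e.g. missing auth.
    """
    try:
        # Assumed primitive: look up the object's size via the job store
        # URL machinery, raising FileNotFoundError if it does not exist.
        return True, AbstractJobStore.get_size(candidate_uri)
    except HTTPError as e:
        # Treat an HTTP error as "not here"; callers try the next location.
        logger.warning("Checked URL %s but got HTTP status %s", candidate_uri, e.code)
        return False, None
    except FileNotFoundError:
        return False, None
```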
Separated it out into `job.py`.
src/toil/wdl/wdltoil.py
Outdated
```python
def convert_files(
    environment: WDLBindings,
    file_to_id: Dict[str, FileID],
    file_to_data: Dict[str, FileMetadata],
    task_path: str,
) -> None:
```
Instead of altering the `File` objects inside `environment` in place, this function should return a modified copy of `environment`. The `WDLBindings` objects in MiniWDL I think are meant to be immutable.
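As a rough sketch of the copy-returning shape, assuming MiniWDL's `WDL.Env.Bindings` iteration and `bind()` semantics; `rewrite_file` is a hypothetical stand-in for the per-value conversion:

```python
# Hedged sketch: build and return a new environment rather than
# mutating the one passed in. Assumes WDL.Env.Bindings() starts empty,
# iteration yields Binding objects with .name/.value, and bind()
# returns a new immutable Bindings.
import WDL


def convert_files_copy(
    environment: WDL.Env.Bindings, rewrite_file
) -> WDL.Env.Bindings:
    new_env: WDL.Env.Bindings = WDL.Env.Bindings()
    for binding in environment:
        new_env = new_env.bind(binding.name, rewrite_file(binding.value))
    return new_env
```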
src/toil/wdl/wdltoil.py
Outdated
```diff
@@ -5180,8 +5401,8 @@ class WDLStartJob(WDLSectionJob):
     def __init__(
         self,
-        target: WDL.Tree.Workflow | WDL.Tree.Task,
+        target: Union[WDL.Tree.Workflow, WDL.Tree.Task],
         inputs: WDLBindings,
```
I think using `|` everywhere instead of `Union` is another pyupgrade change that we want to keep.
src/toil/wdl/wdltoil.py
Outdated
```diff
 def make_root_job(
     target: WDL.Tree.Workflow | WDL.Tree.Task,
     inputs: WDLBindings,
-    inputs_search_path: list[str],
+    inputs_search_path: List[str],
```
@mr-c also changed to using the new generic support in the base `list`, `dict`, etc. instead of needing to import the versions from `typing`, so we shouldn't undo that.
I still don't think the idea of hoping to notice via exception whether we have exhausted our disk space is going to work. Some backends just kill the job instead of giving you an exception, whereas others let you plow right through your disk limit and interfere with other jobs. (Usually other jobs in the workflow, of which there shouldn't really be any of note during file import, but it's still not a thing jobs are meant to knowingly do.)
I am also still dubious of the `extract_file_uri_once` design being the best approach there (who would think to themselves "I want to get the file URI but only if it hasn't already been put in the cache"?). But it kind of has to be the shape it is to be mapped over the files, so maybe it really is the best we can do?
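For readers outside the thread, a minimal sketch of the shape being debated; the names and cache type are illustrative, not the PR's exact code:

```python
# Hedged sketch of the "visit each file once" shape: a visitor that
# returns a URI only the first time it is seen, so it can be mapped
# over every file reference while a shared dict deduplicates imports.
from typing import Dict, Optional


def extract_file_uri_once(seen: Dict[str, str], uri: str) -> Optional[str]:
    if uri in seen:
        return None  # already recorded; the mapper skips it
    seen[uri] = uri
    return uri
```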
```python
    Dict,
    Iterator,
    List,
```
I think we don't need `Dict` and `List` anymore because we can use `dict` and `list` now.
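For example, with PEP 585 builtin generics (available since Python 3.9) the `typing` imports become unnecessary:

```python
# PEP 585: builtin types are subscriptable in annotations on 3.9+,
# so typing.Dict/typing.List imports can be dropped.
def index_files(names: list[str]) -> dict[str, int]:
    return {name: i for i, name in enumerate(names)}
```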
```python
importer: str | None = None,
execution_dir: str | None = None,
```
We might actually still need `Optional` on 3.9.
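A quick sketch of why: on 3.9, `X | None` in an annotation only works when annotation evaluation is deferred.

```python
# On Python 3.9, "str | None" in an annotation raises TypeError at
# definition time unless evaluation is deferred:
from __future__ import annotations  # makes str | None legal on 3.9


def load(importer: str | None = None) -> None: ...

# Without the future import, typing.Optional is still required:
# from typing import Optional
# def load(importer: Optional[str] = None) -> None: ...
```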
```python
if file_basename == "":
    # We can't have files with no basename because we need to
    # download them at that basename later.
    raise RuntimeError(
        f"File {candidate_uri} has no basename and so cannot be a WDL File"
    )
```
This error message is WDL-specific. I'm not sure if it's OK to impose the constraint on CWL as well, but if we do we should just complain generically that the file has no basename.
Suggested change:

```diff
 if file_basename == "":
     # We can't have files with no basename because we need to
-    # download them at that basename later.
+    # download them at that basename later in WDL.
     raise RuntimeError(
-        f"File {candidate_uri} has no basename and so cannot be a WDL File"
+        f"File {candidate_uri} has no basename"
     )
```
We have another empty basename check in the WDL-specific code; is this one maybe redundant?
```python
streaming, so if true, assume streaming works and don't give the worker a lot of disk space to work with.
If streaming fails, the worker will run out of resources and allocate a child job to handle the import with enough disk space.
```
We might need to indent this for the docs to parse it right.
Suggested change:

```diff
-streaming, so if true, assume streaming works and don't give the worker a lot of disk space to work with.
-If streaming fails, the worker will run out of resources and allocate a child job to handle the import with enough disk space.
+    streaming, so if true, assume streaming works and don't give the worker a lot of disk space to work with.
+    If streaming fails, the worker will run out of resources and allocate a child job to handle the import with enough disk space.
```
```python
try:
    return self.import_files(self.filenames, file_store.jobStore)
except OSError as e:
    # If the worker crashes due to running out of disk space and was not trying to
    # stream the file import, then try a new import job without streaming by actually giving
    # the worker enough disk space
    # OSError 28 is no space left on device
    if e.errno == 28 and self.stream is True:
        non_streaming_import = WorkerImportJob(
            self.filenames, self.disk_size, stream=False
        )
        self.addChild(non_streaming_import)
        return non_streaming_import.rv()
    else:
        raise
```
I don't think this is a good approach; we don't always sandbox jobs to keep them within their requested disk space, so if one job goes over its disk request it can make a different job fail due to not having enough space for its temporary files.
It might honestly be better if we didn't handle the case where the attempted streaming import went over its disk space limit, and just made it the user's problem to set a disk space limit big enough if they have any imports that can't actually stream?
Or we could implement a flag on the job store import method to only allow streaming and fail if streaming is not possible, and catch that and do the fallback.
Also, if we hit the disk space limit after importing several files already, when do we delete those imported copies? It looks like we will leave them behind in the job store and then re-import the same files non-streaming with more disk space.
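A hedged sketch of the flag-based fallback idea; the `streaming_only` parameter and `NonStreamingImportNeeded` exception are hypothetical, not existing Toil API:

```python
# Hypothetical "streaming-only flag" fallback: ask the job store to
# fail fast when it cannot stream, instead of detecting disk
# exhaustion after the fact. All names here are illustrative.
class NonStreamingImportNeeded(Exception):
    """Raised when an import cannot be satisfied by streaming alone."""


def run(self, file_store):
    try:
        # Assumed flag: refuse to buffer whole files on local disk.
        return self.import_files(
            self.filenames, file_store.jobStore, streaming_only=True
        )
    except NonStreamingImportNeeded:
        # Fall back to a child job sized to actually hold the files.
        fallback = WorkerImportJob(self.filenames, self.disk_size, stream=False)
        self.addChild(fallback)
        return fallback.rv()
```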
Closes #5114

Changelog Entry

To be copied to the draft changelog by merger:

- `--importWorkersDisk` replaced with `--importWorkersThreshold`. This specifies the threshold where files will begin to be imported on individual jobs. Small files will be batched into the same import job up to this threshold.

Reviewer Checklist

- PR is from a branch named `issues/XXXX-fix-the-thing` in the Toil repo, or from an external repo.
- No variables in `camelCase` that want to be in `snake_case`.
- Documentation changes are reflected in `docs/running/{cliOptions,cwl,wdl}.rst` where applicable.

Merger Checklist