Skip to content

Commit

Permalink
Banish ghost jobs (#4563)
Browse files Browse the repository at this point in the history
* Revise __deepcopy__ to banish ghost jobs

I was seeing messages with my new logging like:

[2023-08-15T17:40:32-0400] [MainThread] [D] [toil.fileStores.cachingFileStore] Starting commit of 'down' kind-down/instance-6thju8v8 v14 forked from 'down' kind-down/instance-6thju8v8 v26

That's a pretty clear indicator that the deep copy of the job
description is not producing a proper copy of the version I just handed
it, so I'm reimplementing the deep copy differently on the theory that
the basic approach is doomed.

* Quiet debugging
  • Loading branch information
adamnovak authored Aug 17, 2023
1 parent 9be96b1 commit acbbf86
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 26 deletions.
7 changes: 6 additions & 1 deletion src/toil/fileStores/cachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -1814,6 +1814,11 @@ def startCommit(self, jobState=False):
# might be necessary for later jobs to see earlier jobs' deleted
# before they are committed?

logger.debug('Starting commit of %s forked from %s', state_to_commit, self.jobDesc)
# Make sure the deep copy isn't summoning ghosts of old job
# versions. It must be as new or newer at this point.
self.jobDesc.check_new_version(state_to_commit)

# Bump the original's version since saving will do that too and we
# don't want duplicate versions.
self.jobDesc.reserve_versions(1 if len(state_to_commit.filesToDelete) == 0 else 2)
Expand Down Expand Up @@ -1848,7 +1853,7 @@ def startCommitThread(self, state_to_commit: Optional[JobDescription]):
if state_to_commit is not None:
# Do all the things that make this job not redoable

logger.debug('Committing file deletes and job state changes asynchronously')
logger.debug('Committing file deletes and job state changes asynchronously from %s', state_to_commit)

# Complete the job
self.jobStore.update_job(state_to_commit)
Expand Down
41 changes: 20 additions & 21 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,28 +450,15 @@ def __copy__(self) -> "Requirer":

def __deepcopy__(self, memo: Any) -> "Requirer":
"""Return a semantically-deep copy of the object, for :meth:`copy.deepcopy`."""
# We used to use <https://stackoverflow.com/a/40484215> but that was
# discovered to not actually work right, because you would get the
# copy, if later copied again, stamping out copies of the *original*
# object, due to putting back a method as a member that was already
# bound to a self parameter.
# We used to use <https://stackoverflow.com/a/40484215> and
# <https://stackoverflow.com/a/71125311> but that would result in
# copies sometimes resurrecting weirdly old job versions. So now we
# just actually implement __deepcopy__.

# So we have to also tinker with the method binding as noted in
# <https://stackoverflow.com/a/71125311>.
# TODO: What's the default implementation actually?

# Hide this override
implementation = self.__deepcopy__
self.__deepcopy__ = None # type: ignore[assignment]

# Do the deepcopy which omits the config via __getstate__ override
clone = copy.deepcopy(self, memo)

# Put back the override on us
self.__deepcopy__ = implementation # type: ignore[assignment]

# Bind the override to the copy and put it on the copy
clone.__deepcopy__ = types.MethodType(implementation.__func__, clone) # type: ignore[assignment]
clone = type(self).__new__(self.__class__)
state = self.__getstate__()
clone_state = copy.deepcopy(state, memo)
clone.__dict__.update(clone_state)

if self._config is not None:
# Share a config reference
Expand Down Expand Up @@ -871,6 +858,8 @@ def makeString(x: Union[str, bytes, None]) -> str:
# Every time we update a job description in place in the job store, we
# increment this.
self._job_version = 0
# And we log who made the version (by PID)
self._job_version_writer = 0

# Human-readable names of jobs that were run as part of this job's
# invocation, starting with this job
Expand Down Expand Up @@ -1060,6 +1049,14 @@ def replace(self, other: "JobDescription") -> None:
raise RuntimeError("Trying to take on the ID of anothe job while in the process of being committed!")

self._job_version = other._job_version
self._job_version_writer = os.getpid()

def check_new_version(self, other: "JobDescription") -> None:
    """
    Assert that ``other`` is an acceptable successor of this JobDescription.

    A successor must carry a job version number that is greater than or
    equal to ours; anything older is a stale ("ghost") copy and replacing
    us with it would roll the job's state backward in time.

    :param other: The candidate replacement JobDescription.
    :raises RuntimeError: If ``other`` has a strictly older job version.
    """
    # Equal or newer versions are fine; only a strictly older one is a problem.
    if other._job_version >= self._job_version:
        return
    raise RuntimeError(
        f"Cannot replace {self} from PID {self._job_version_writer} "
        f"with older version {other} from PID {other._job_version_writer}"
    )

def addChild(self, childID: str) -> None:
"""Make the job with the given ID a child of the described job."""
Expand Down Expand Up @@ -1244,6 +1241,7 @@ def reserve_versions(self, count: int) -> None:
Reserve a job version number for later, for journaling asynchronously.
"""
self._job_version += count
self._job_version_writer = os.getpid()
logger.debug("Skip ahead to job version: %s", self)

def pre_update_hook(self) -> None:
Expand All @@ -1253,6 +1251,7 @@ def pre_update_hook(self) -> None:
Called by the job store.
"""
self._job_version += 1
self._job_version_writer = os.getpid()
logger.debug("New job version: %s", self)

def get_job_kind(self) -> str:
Expand Down
9 changes: 5 additions & 4 deletions src/toil/jobStores/fileJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,18 +252,19 @@ def update_job(self, job):

job.pre_update_hook()

dest_filename = self._get_job_file_name(job.jobStoreID)

# The job is serialised to a file suffixed by ".new"
# We insist on creating the file; an existing .new file indicates
# multiple simultaneous attempts to update the job, which will lose
# updates.
# The file is then moved to its correct path.
# Atomicity guarantees use the fact the underlying file systems "move"
# Atomicity guarantees use the fact the underlying file system's "move"
# function is atomic.
with open(self._get_job_file_name(job.jobStoreID) + ".new", 'xb') as f:
with open(dest_filename + ".new", 'xb') as f:
pickle.dump(job, f)
# This should be atomic for the file system
os.rename(self._get_job_file_name(job.jobStoreID) + ".new", self._get_job_file_name(job.jobStoreID))

os.rename(dest_filename + ".new", dest_filename)
def delete_job(self, job_id):
# The jobStoreID is the relative path to the directory containing the job,
# removing this directory deletes the job.
Expand Down
1 change: 1 addition & 0 deletions src/toil/toilState.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ def reset_job(self, job_id: str) -> None:
if job_id in self.__job_database:
# Update the one true copy in place
old_truth = self.__job_database[job_id]
old_truth.check_new_version(new_truth)
old_truth.__dict__.update(new_truth.__dict__)
else:
# Just keep the new one
Expand Down

0 comments on commit acbbf86

Please sign in to comment.