Skip to content

Commit

Permalink
Merge pull request #41 from VEuPathDB/issue-40
Browse files Browse the repository at this point in the history
allow expiring and deleting incomplete jobs
  • Loading branch information
Foxcapades authored Mar 16, 2023
2 parents 4c8bc52 + 3a49690 commit 2998988
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package org.veupathdb.lib.compute.platform

import org.slf4j.LoggerFactory
import org.veupathdb.lib.compute.platform.config.AsyncPlatformConfig
import org.veupathdb.lib.compute.platform.errors.IncompleteJobException
import org.veupathdb.lib.compute.platform.errors.UnownedJobException
import org.veupathdb.lib.compute.platform.intern.JobPruner
import org.veupathdb.lib.compute.platform.intern.db.DatabaseMigrator
Expand Down Expand Up @@ -225,9 +224,8 @@ object AsyncPlatform {
* Deletes the target job only if it exists and is owned by the current service
* or process. Jobs that have not yet finished may be deleted as well.
*
* If the target job does not exist, is not owned by the current service or
* process, or is not in a completed status, this method throws an
* [IllegalStateException].
* If the target job is not owned by the current service instance, this method
* throws an [UnownedJobException].
*
* Callers should first use [getJob] to test the completion and ownership
* statuses of the job before attempting to use this method.
Expand All @@ -244,9 +242,10 @@ object AsyncPlatform {
Log.debug("Deleting job {}", jobID)

// Assert that the job is owned by this process (it may still be incomplete)
(QueueDB.getJob(jobID)
?: throw UnownedJobException("Attempted to delete unowned job $jobID"))
.finished ?: throw IncompleteJobException("Attempted to delete incomplete job $jobID")
with(QueueDB.getJob(jobID) ?: throw UnownedJobException("Attempted to delete unowned job $jobID")) {
if (finished == null)
Log.info("deleting incomplete job {}", jobID)
}

QueueDB.deleteJob(jobID)
S3.deleteWorkspace(jobID)
Expand Down Expand Up @@ -282,9 +281,6 @@ object AsyncPlatform {
* If the target job is not owned by the current Async Platform instance, an
* [UnownedJobException] will be thrown.
*
* If the target job is not yet in a 'finished' state (complete or failed), an
* [IncompleteJobException] will be thrown.
*
* @param jobID ID of the job to expire.
*
* @since 1.5.0
Expand All @@ -295,9 +291,10 @@ object AsyncPlatform {

// Verify that this job exists and is owned by the current platform
// instance.
(QueueDB.getJob(jobID)
?: throw UnownedJobException("Attempted to expire unowned job $jobID"))
.finished ?: throw IncompleteJobException("Attempted expire to incomplete job $jobID")
with(QueueDB.getJob(jobID) ?: throw UnownedJobException("Attempted to expire unowned job $jobID")) {
if (finished == null)
Log.info("expiring incomplete job {}", jobID)
}

QueueDB.markJobAsExpired(jobID)
S3.expireWorkspace(jobID)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ internal class JobExecutionHandler(private val executor: JobExecutor) {
// Lookup the job in the DB to get the input file list.
val dbJob = QueueDB.getJobInternal(jobID)

// If the job wasn't found something has gone terribly wrong
// If the job wasn't found the job was most likely deleted.
if (dbJob == null) {
Log.error("db job lookup failed in job execution for job {}", jobID)
return PlatformJobResultStatus.Failure
Log.error("job {} was deleted and is being aborted", jobID)
return PlatformJobResultStatus.Aborted
}

// If the job does have input files
Expand Down Expand Up @@ -81,16 +81,20 @@ internal class JobExecutionHandler(private val executor: JobExecutor) {

// Verify that the job is still valid (hasn't been deleted or expired
// while it was waiting in the queue).
if (!jobIsStillRunnable(jobID))
if (!jobIsStillRunnable(jobID)) {
Log.info("aborting job {} for no longer being in a runnable state (deleted or expired)", jobID)
return PlatformJobResultStatus.Aborted
}

// Execute the job via the given JobExecutor implementation.
val res = executor.execute(JobContext(jobID, conf, workspace))

// Verify that the job is _still_ still valid (didn't get deleted or
// expired out from under us while we were running the executor).
if (!jobIsStillRunnable(jobID))
if (!jobIsStillRunnable(jobID)) {
Log.info("aborting job {} for no longer being in a runnable state (deleted or expired)", jobID)
return PlatformJobResultStatus.Aborted
}

// Persist the outputs of the job to S3.
S3.persistFiles(jobID, workspace.getFiles(res.outputFiles))
Expand All @@ -101,6 +105,7 @@ internal class JobExecutionHandler(private val executor: JobExecutor) {
// Verify that the job wasn't invalidated while we were busy writing files
// to S3.
if (jobWasInvalidated(jobID)) {
Log.info("aborting job {} for no longer being in a runnable state (deleted or expired)", jobID)
S3.wipeWorkspace(jobID)
return PlatformJobResultStatus.Aborted
}
Expand Down

0 comments on commit 2998988

Please sign in to comment.