From 32e52aea4323f3b0c0df17aaa8cbfbdbcbbe967c Mon Sep 17 00:00:00 2001 From: Elizabeth Date: Fri, 17 Mar 2023 09:37:33 -0400 Subject: [PATCH 1/2] updates --- .../lib/compute/platform/AsyncPlatform.kt | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt b/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt index 2d7ae7c..871e1b9 100644 --- a/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt +++ b/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt @@ -12,6 +12,7 @@ import org.veupathdb.lib.compute.platform.intern.s3.S3 import org.veupathdb.lib.compute.platform.intern.ws.ScratchSpaces import org.veupathdb.lib.compute.platform.job.AsyncJob import org.veupathdb.lib.compute.platform.job.JobFileReference +import org.veupathdb.lib.compute.platform.job.JobStatus import org.veupathdb.lib.compute.platform.job.JobSubmission import org.veupathdb.lib.compute.platform.model.JobReference import org.veupathdb.lib.hash_id.HashID @@ -125,18 +126,18 @@ object AsyncPlatform { throw IllegalArgumentException("Attempted to submit a job to nonexistent queue '$queue'.") // Lookup the job to see if it already exists. - val exists = getJob(job.jobID) + val existingJob = getJob(job.jobID) // If it does exist ?.also { - // And it is not owned by this service instance - if (!it.owned) + // And it is not owned by this service instance AND the job is not + // expired, bail here. + if (!it.owned && it.status != JobStatus.Expired) // Throw an exception - throw IllegalStateException("Attempted to submit a job that would overwrite an existing job owned by another campus (${job.jobID})") + throw IllegalStateException("Attempted to submit a job that would overwrite an existing, non-expired job owned by another campus (${job.jobID})") } - .let { it != null } // If the job already exists - if (exists) + if (existingJob != null && existingJob.owned) // Reset the job status to queued and update the queue name QueueDB.markJobAsQueued(job.jobID, queue) // Else, if the job does not already exist @@ -171,6 +172,21 @@ object AsyncPlatform { QueueDB.getJob(jobID)?.also { // It does... Log.debug("Job found in the managed database") + + val s3Job = S3.getJob(jobID) + + // If the status as determined by looking at S3 does not align with the + // status we last knew in our internal database, then another campus has + // claimed ownership of the job. + if (s3Job != null && s3Job.status != it.status) { + // Delete our DB record for the job and return the S3 instance instead. + QueueDB.deleteJob(jobID) + return s3Job + } + + // The statuses did align, so we (this service instance) presumably still + // own the job. + // update it's last accessed date QueueDB.updateJobLastAccessed(jobID) // and return it From d0f0ced16ffd0b99ac4c4fbbcaee6d30e0fc2766 Mon Sep 17 00:00:00 2001 From: Elizabeth Date: Fri, 17 Mar 2023 09:52:06 -0400 Subject: [PATCH 2/2] update docs --- .../org/veupathdb/lib/compute/platform/AsyncPlatform.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt b/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt index 871e1b9..83458c9 100644 --- a/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt +++ b/lib/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt @@ -114,8 +114,8 @@ object AsyncPlatform { * @throws IllegalArgumentException If the given [queue] value is not a valid * queue ID/name. * - * @throws IllegalStateException If the ID of the given job already exists and - * belongs to another instance of this service. + * @throws IllegalStateException If the ID of the given job already exists, + * belongs to another instance of this service, and is not expired. */ @JvmStatic fun submitJob(queue: String, job: JobSubmission) {