diff --git a/build.gradle.kts b/build.gradle.kts index 883cf3b..746d01e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,14 +1,14 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile plugins { - kotlin("jvm") version "1.6.21" - id("org.jetbrains.dokka") version "1.6.21" + kotlin("jvm") version "1.8.0" + id("org.jetbrains.dokka") version "1.7.20" java `maven-publish` } group = "org.veupathdb.lib" -version = "1.3.5" +version = "1.3.6" repositories { mavenLocal() @@ -29,13 +29,13 @@ dependencies { implementation("org.slf4j:slf4j-api:1.7.36") // Jackson - implementation(platform("com.fasterxml.jackson:jackson-bom:2.13.4")) + implementation(platform("com.fasterxml.jackson:jackson-bom:2.14.2")) implementation("com.fasterxml.jackson.core:jackson-databind") implementation("org.veupathdb.lib:jackson-singleton:3.0.0") // DB implementation("com.zaxxer:HikariCP:5.0.1") - implementation("org.postgresql:postgresql:42.5.0") + implementation("org.postgresql:postgresql:42.5.1") // S3 implementation("org.veupathdb.lib.s3:s34k-minio:0.3.6+s34k-0.7.2") diff --git a/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt b/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt index c07fbbe..4639afc 100644 --- a/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt +++ b/src/main/kotlin/org/veupathdb/lib/compute/platform/AsyncPlatform.kt @@ -123,7 +123,7 @@ object AsyncPlatform { throw IllegalArgumentException("Attempted to submit a job to nonexistent queue '$queue'.") // Lookup the job to see if it already exists. - getJob(job.jobID) + val exists = getJob(job.jobID) // If it does exist ?.also { // And it is not owned by this service instance @@ -131,9 +131,16 @@ object AsyncPlatform { // Throw an exception throw IllegalStateException("Attempted to submit a job that would overwrite an existing job owned by another campus (${job.jobID})") } - - // Record the new job in the database - QueueDB.submitJob(queue, job.jobID, job.config?.toString(), job.inputs.keys) + .let { it != null } + + // If the job already exists + if (exists) + // Reset the job status to queued and update the queue name + QueueDB.markJobAsQueued(job.jobID, queue) + // Else, if the job does not already exist + else + // Record the new job in the database + QueueDB.submitJob(queue, job.jobID, job.config?.toString(), job.inputs.keys) // Remove any previous workspace at this location S3.deleteWorkspace(job.jobID, false) diff --git a/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/db/QueueDB.kt b/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/db/QueueDB.kt index 2333a22..2a62551 100644 --- a/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/db/QueueDB.kt +++ b/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/db/QueueDB.kt @@ -72,6 +72,11 @@ internal object QueueDB { return ds!!.connection.use { GetExpiredJobs(it, cutoff) } } + @JvmStatic + fun markJobAsQueued(jobID: HashID, queue: String) { + Log.debug("Marking job {} as queued", jobID) + ds!!.connection.use { MarkJobQueued(it, jobID, queue) } + } /** * Marks the target job as expired in the database. @@ -86,7 +91,6 @@ internal object QueueDB { ds!!.connection.use { MarkJobExpired(it, jobID) } } - /** * Marks the target job as failed in the database. * diff --git a/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/db/queries/update/queue-job.kt b/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/db/queries/update/queue-job.kt new file mode 100644 index 0000000..b10f672 --- /dev/null +++ b/src/main/kotlin/org/veupathdb/lib/compute/platform/intern/db/queries/update/queue-job.kt @@ -0,0 +1,31 @@ +package org.veupathdb.lib.compute.platform.intern.db.queries.update + +import org.veupathdb.lib.hash_id.HashID +import java.sql.Connection + +private const val SQL = """ + UPDATE + compute.jobs + SET + queue = ? + , status = 'queued' + WHERE + job_id = ? +""" + +/** + * Resets the target job to queued in the database. + * + * @param con Open database connection to be used for the query. + * + * @param jobID Hash ID of the job to mark as expired. + * + * @param queue Name of the new queue the job was submitted to. + */ +internal fun MarkJobQueued(con: Connection, jobID: HashID, queue: String) { + con.prepareStatement(SQL).use { ps -> + ps.setString(1, queue) + ps.setBytes(2, jobID.bytes) + ps.execute() + } +} \ No newline at end of file