Skip to content

Commit

Permalink
Merge pull request #30 from VEuPathDB/issue-29
Browse files Browse the repository at this point in the history
Fix issue #29
  • Loading branch information
Foxcapades committed Mar 1, 2023
2 parents 4006345 + dd1736f commit c14e154
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 10 deletions.
10 changes: 5 additions & 5 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
kotlin("jvm") version "1.6.21"
id("org.jetbrains.dokka") version "1.6.21"
kotlin("jvm") version "1.8.0"
id("org.jetbrains.dokka") version "1.7.20"
java
`maven-publish`
}

group = "org.veupathdb.lib"
version = "1.3.5"
version = "1.3.6"

repositories {
mavenLocal()
Expand All @@ -29,13 +29,13 @@ dependencies {
implementation("org.slf4j:slf4j-api:1.7.36")

// Jackson
implementation(platform("com.fasterxml.jackson:jackson-bom:2.13.4"))
implementation(platform("com.fasterxml.jackson:jackson-bom:2.14.2"))
implementation("com.fasterxml.jackson.core:jackson-databind")
implementation("org.veupathdb.lib:jackson-singleton:3.0.0")

// DB
implementation("com.zaxxer:HikariCP:5.0.1")
implementation("org.postgresql:postgresql:42.5.0")
implementation("org.postgresql:postgresql:42.5.1")

// S3
implementation("org.veupathdb.lib.s3:s34k-minio:0.3.6+s34k-0.7.2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,17 +123,24 @@ object AsyncPlatform {
throw IllegalArgumentException("Attempted to submit a job to nonexistent queue '$queue'.")

// Lookup the job to see if it already exists.
getJob(job.jobID)
val exists = getJob(job.jobID)
// If it does exist
?.also {
// And it is not owned by this service instance
if (!it.owned)
// Throw an exception
throw IllegalStateException("Attempted to submit a job that would overwrite an existing job owned by another campus (${job.jobID})")
}

// Record the new job in the database
QueueDB.submitJob(queue, job.jobID, job.config?.toString(), job.inputs.keys)
.let { it != null }

// If the job already exists
if (exists)
// Reset the job status to queued and update the queue name
QueueDB.markJobAsQueued(job.jobID, queue)
// Else, if the job does not already exist
else
// Record the new job in the database
QueueDB.submitJob(queue, job.jobID, job.config?.toString(), job.inputs.keys)

// Remove any previous workspace at this location
S3.deleteWorkspace(job.jobID, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ internal object QueueDB {
return ds!!.connection.use { GetExpiredJobs(it, cutoff) }
}

@JvmStatic
fun markJobAsQueued(jobID: HashID, queue: String) {
Log.debug("Marking job {} as queued", jobID)
ds!!.connection.use { MarkJobQueued(it, jobID, queue) }
}

/**
* Marks the target job as expired in the database.
Expand All @@ -86,7 +91,6 @@ internal object QueueDB {
ds!!.connection.use { MarkJobExpired(it, jobID) }
}


/**
* Marks the target job as failed in the database.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.veupathdb.lib.compute.platform.intern.db.queries.update

import org.veupathdb.lib.hash_id.HashID
import java.sql.Connection

private const val SQL = """
UPDATE
compute.jobs
SET
queue = ?
, status = 'queued'
WHERE
job_id = ?
"""

/**
* Resets the target job to queued in the database.
*
* @param con Open database connection to be used for the query.
*
* @param jobID Hash ID of the job to mark as expired.
*
* @param queue Name of the new queue the job was submitted to.
*/
internal fun MarkJobQueued(con: Connection, jobID: HashID, queue: String) {
con.prepareStatement(SQL).use { ps ->
ps.setString(1, queue)
ps.setBytes(2, jobID.bytes)
ps.execute()
}
}

0 comments on commit c14e154

Please sign in to comment.