Skip to content

Commit

Permalink
Merge pull request #53 from VEuPathDB/broken-job-list
Browse files Browse the repository at this point in the history
List failed jobs
  • Loading branch information
Foxcapades authored Nov 9, 2023
2 parents 907d3de + c9bc061 commit 0bd92c9
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 5 deletions.
7 changes: 4 additions & 3 deletions lib/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@ plugins {
}

group = "org.veupathdb.lib"
version = "1.5.4"
version = "1.6.0"


dependencies {
implementation(kotlin("stdlib-jdk8"))

// Logging
implementation("org.slf4j:slf4j-api:1.7.36")

// Jackson
implementation(platform("com.fasterxml.jackson:jackson-bom:2.15.3"))
implementation("com.fasterxml.jackson.core:jackson-databind")
implementation("org.veupathdb.lib:jackson-singleton:3.0.0")
implementation("org.veupathdb.lib:jackson-singleton:3.1.1")

// DB
implementation("com.zaxxer:HikariCP:5.0.1")
Expand All @@ -37,7 +38,7 @@ dependencies {
implementation("io.prometheus:simpleclient_common:0.16.0")

// Misc & Utils
api("org.veupathdb.lib:hash-id:1.0.2")
api("org.veupathdb.lib:hash-id:1.1.0")

// Testing
testImplementation(kotlin("test"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.slf4j.LoggerFactory
import org.veupathdb.lib.compute.platform.config.AsyncPlatformConfig
import org.veupathdb.lib.compute.platform.errors.UnownedJobException
import org.veupathdb.lib.compute.platform.intern.JobPruner
import org.veupathdb.lib.compute.platform.intern.db.AsyncDBJob
import org.veupathdb.lib.compute.platform.intern.db.DatabaseMigrator
import org.veupathdb.lib.compute.platform.intern.db.QueueDB
import org.veupathdb.lib.compute.platform.intern.jobs.JobExecutors
Expand Down Expand Up @@ -274,13 +275,17 @@ object AsyncPlatform {
*
* @param jobID Hash ID of the job that should be deleted.
*
* @param throwOnNotExists Whether an exception should be thrown if the target
* job does not exist in S3.
*
* @throws IllegalStateException If the target job does not exist, is not
* owned by the current service or process, or is not in a completed status.
*
* @since 1.2.0
*/
@JvmStatic
fun deleteJob(jobID: HashID) {
@JvmOverloads
fun deleteJob(jobID: HashID, throwOnNotExists: Boolean = true) {
Log.debug("Deleting job {}", jobID)

// Assert that the job is both owned by this process and is complete
Expand All @@ -290,7 +295,7 @@ object AsyncPlatform {
}

QueueDB.deleteJob(jobID)
S3.deleteWorkspace(jobID)
S3.deleteWorkspace(jobID, throwOnNotExists)
}

/**
Expand Down Expand Up @@ -340,4 +345,19 @@ object AsyncPlatform {

JobManager.setJobExpired(jobID)
}

/**
* Fetches a list of jobs owned by the current campus that are in the `failed`
* status.
*
* @return The retrieved list of broken jobs.
*
* @since 1.6.0
*/
@JvmStatic
fun getOwnedBrokenJobs(): List<AsyncJob> {
Log.debug("Listing broken jobs owned by the current campus")
return QueueDB.getFailedJobs()
.use { stream -> stream.map { AsyncDBJob(it, -1) }.toList() }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,21 @@ internal object QueueDB {
return ListRunningJobs(ds.connection)
}

/**
* Retrieves a stream of failed job records.
*
* The returned stream **MUST** be closed when the caller is done with it to
* prevent DB connection leaks.
*
* @return Stream of queued jobs ordered by job creation date.
*/
fun getFailedJobs(): Stream<JobRecord> {
Log.debug("Getting list of queued jobs.")

// Connection is not closed here as the caller is responsible for closing
// the stream.
return ListFailedJobs(ds.connection)
}
/**
* Retrieves a stream of all job records.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.veupathdb.lib.compute.platform.intern.db.queries.select

import org.veupathdb.lib.compute.platform.intern.db.model.JobRecord
import org.veupathdb.lib.compute.platform.intern.db.util.stream
import java.sql.Connection
import java.sql.ResultSet
import java.util.stream.Stream

private const val SQL = """
SELECT
job_id
, status
, queue
, config
, input_files
, created
, last_accessed
, grabbed
, finished
FROM
compute.jobs
WHERE
status = 'failed'
ORDER BY
created ASC
"""

/**
* Fetches a stream over all the jobs in the database currently in the
* `failed` status, ordered by job creation date ascending.
*
* The returned stream wraps the live database [ResultSet] and **MUST** be
* closed when the caller is done with it.
*
* @param con Open database connection that will be used to execute the query.
*
* @return A stream over the `in-progress` jobs in the database.
*
* **WARNING**: The returned stream **MUST** be closed on completion to avoid DB
* connection leaks.
*/
internal fun ListFailedJobs(con: Connection): Stream<JobRecord> {
// Nothing is closed in this method as the caller is responsible for closing
// the returned stream (which will close the connection, statement, and
// result-set)
return con.createStatement().executeQuery(SQL).stream().map(ResultSet::toJobRow)
}

0 comments on commit 0bd92c9

Please sign in to comment.