Skip to content

Commit

Permalink
Merge pull request #59 from VEuPathDB/issue-58
Browse files Browse the repository at this point in the history
Message ack timeout
  • Loading branch information
Foxcapades authored Aug 15, 2024
2 parents ce98828 + 05b41b7 commit d26fb1a
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 31 deletions.
6 changes: 3 additions & 3 deletions lib/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
}

group = "org.veupathdb.lib"
version = "1.7.5"
version = "1.8.0"


dependencies {
Expand All @@ -23,15 +23,15 @@ dependencies {
implementation("org.veupathdb.lib:jackson-singleton:3.1.1")

// DB
implementation("com.zaxxer:HikariCP:5.0.1")
implementation("com.zaxxer:HikariCP:5.1.0")
implementation("org.postgresql:postgresql:42.7.3")

// S3
api("org.veupathdb.lib.s3:s34k-minio:0.7.2+s34k-0.11.0")
api("org.veupathdb.lib.s3:workspaces:4.1.2")

// Rabbit
implementation("org.veupathdb.lib:rabbit-job-queue:1.2.1")
implementation("org.veupathdb.lib:rabbit-job-queue:2.0.0")

// Metrics
implementation("io.prometheus:simpleclient:0.16.0")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package org.veupathdb.lib.compute.platform.config

import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

private const val DefaultRabbitMQPort = 5672
private const val DefaultWorkerCount = 5
private const val DefaultMessageAckTimeoutMinutes = 30

/**
* Configuration entry for a single RabbitMQ queue.
Expand Down Expand Up @@ -29,6 +33,14 @@ private const val DefaultWorkerCount = 5
* queue.
*
* Default value is `5`.
*
* @param messageAckTimeout Timeout window in which a queue message must be
* acknowledged. RabbitMQ will kill channels on which a message has not been
* acknowledged within this window.
*
* This value *MUST* be at least 5 minutes.
*
* Default value is 30 minutes.
*/
class AsyncQueueConfig(
internal val id: String,
Expand All @@ -37,6 +49,7 @@ class AsyncQueueConfig(
internal val host: String,
internal val port: Int,
internal val workers: Int,
internal val messageAckTimeout: Duration,
) {

/**
Expand All @@ -55,7 +68,7 @@ class AsyncQueueConfig(
* @param host Hostname for the target RabbitMQ instance.
*/
constructor(id: String, username: String, password: String, host: String) :
this(id, username, password, host, DefaultRabbitMQPort, DefaultWorkerCount)
this(id, username, password, host, DefaultRabbitMQPort, DefaultWorkerCount, DefaultMessageAckTimeoutMinutes.minutes)

/**
* Constructs a new [AsyncQueueConfig] instance.
Expand All @@ -78,7 +91,7 @@ class AsyncQueueConfig(
* Default value is `5`.
*/
constructor(id: String, username: String, password: String, host: String, workers: Int) :
this(id, username, password, host, DefaultRabbitMQPort, workers)
this(id, username, password, host, DefaultRabbitMQPort, workers, DefaultMessageAckTimeoutMinutes.minutes)

companion object {
@JvmStatic
Expand Down Expand Up @@ -135,6 +148,17 @@ class AsyncQueueConfig(
*/
var workers = DefaultWorkerCount

/**
* Message acknowledgement timeout value to set for this queue.
*
* This value should be long enough to accommodate the longest expected job
* runtimes for the queue. RabbitMQ itself will disconnect the queue if a
* message is not acknowledged within the configured timeout window.
*
* Default value is 30 minutes.
*/
var messageAckTimeout = DefaultMessageAckTimeoutMinutes.minutes

/**
* Sets the unique identifier for the queue.
*/
Expand Down Expand Up @@ -183,6 +207,17 @@ class AsyncQueueConfig(
return this
}

/**
* Sets the [messageAckTimeout] value to the given duration.
*/
fun messageAckTimeout(timeout: Duration) = apply { this.messageAckTimeout = timeout }

/**
* Sets the [messageAckTimeout] value to a duration of the given value in
* minutes.
*/
fun messageAckTimeoutMinutes(timeout: Int) = apply { this.messageAckTimeout = timeout.minutes }

fun build(): AsyncQueueConfig {
// We check null and blank because these are likely coming from env vars
// and docker compose will set blank values for vars defined in the
Expand All @@ -208,7 +243,11 @@ class AsyncQueueConfig(
if (host!!.isBlank())
throw IllegalStateException("Cannot build an AsyncQueueConfig instance with a blank host!")

return AsyncQueueConfig(id!!, username!!, password!!, host!!, port, workers)
if (messageAckTimeout < 5.minutes)
throw IllegalStateException("Message ack timeout values less than 5 minutes are likely to cause undefined" +
" behavior in RabbitMQ")

return AsyncQueueConfig(id!!, username!!, password!!, host!!, port, workers, messageAckTimeout)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@ import com.fasterxml.jackson.databind.JsonNode
import org.slf4j.LoggerFactory
import org.veupathdb.lib.compute.platform.JobManager
import org.veupathdb.lib.compute.platform.config.AsyncQueueConfig
import org.veupathdb.lib.compute.platform.intern.db.QueueDB
import org.veupathdb.lib.compute.platform.intern.jobs.JobExecContext
import org.veupathdb.lib.compute.platform.intern.jobs.JobExecutors
import org.veupathdb.lib.compute.platform.intern.metrics.JobMetrics
import org.veupathdb.lib.compute.platform.intern.metrics.QueueMetrics
import org.veupathdb.lib.compute.platform.intern.s3.S3
import org.veupathdb.lib.compute.platform.job.PlatformJobResultStatus
import org.veupathdb.lib.hash_id.HashID
import org.veupathdb.lib.rabbit.jobs.QueueConfig
import org.veupathdb.lib.rabbit.jobs.QueueDispatcher
import org.veupathdb.lib.rabbit.jobs.QueueWorker
import org.veupathdb.lib.rabbit.jobs.JobQueueDispatcher
import org.veupathdb.lib.rabbit.jobs.JobQueueExecutor
import org.veupathdb.lib.rabbit.jobs.config.QueueConfig
import org.veupathdb.lib.rabbit.jobs.model.ErrorNotification
import org.veupathdb.lib.rabbit.jobs.model.JobDispatch
import org.veupathdb.lib.rabbit.jobs.model.SuccessNotification
Expand All @@ -33,31 +31,36 @@ internal class QueueWrapper(conf: AsyncQueueConfig) {

val name = conf.id

private val dispatch: QueueDispatcher
private val handler: QueueWorker
private val dispatch: JobQueueDispatcher
private val handler: JobQueueExecutor

init {
Log.info("initializing queue wrapper for queue {}", name)

val qc = QueueConfig().also {
it.hostname = conf.host
it.hostPort = conf.port
it.username = conf.username
it.password = conf.password
it.workers = conf.workers

it.jobQueueName = "${conf.id}_jobs"
it.successQueueName = "${conf.id}_success"
it.errorQueueName = "${conf.id}_error"
}
val qc = QueueConfig()
.connection {
hostname = conf.host
hostPort = conf.port
username = conf.username
password = conf.password
}
.executor {
workers = conf.workers
maxJobExecutionTime = conf.messageAckTimeout
}
.apply {
jobQueueName = "${conf.id}_jobs"
successQueueName = "${conf.id}_success"
errorQueueName = "${conf.id}_error"
}

// Setup dispatch end of queue the wrapper
dispatch = QueueDispatcher(qc)
dispatch = JobQueueDispatcher(qc)
dispatch.onError(this::onError)
dispatch.onSuccess(this::onSuccess)

// Setup worker end of the queue wrapper
handler = QueueWorker(qc)
handler = JobQueueExecutor(qc)
handler.onJob(this::onJob)
}

Expand Down Expand Up @@ -98,7 +101,7 @@ internal class QueueWrapper(conf: AsyncQueueConfig) {
}
} catch (e: Throwable) {
Log.error("job execution failed with an exception for job ${job.jobID}", e)
handler.sendError(ErrorNotification(job.jobID, 1, e.message))
handler.sendError(ErrorNotification(jobID = job.jobID, code = 1, message = e.message))
}
}
}
2 changes: 1 addition & 1 deletion readme.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ image::docs/assets/overview.png[]
[source, kotlin]
----
dependencies {
implementation("org.veupathdb.lib:compute-platform:1.6.1")
implementation("org.veupathdb.lib:compute-platform:1.8.0")
}
----

Expand Down
2 changes: 1 addition & 1 deletion test/docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ services:
POSTGRES_PASSWORD: password

rabbit:
image: rabbitmq:3.11.10-management-alpine
image: rabbitmq:3.13.3-management-alpine
ports:
- "5672:5672"

18 changes: 16 additions & 2 deletions test/src/main/kotlin/lcp/main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,21 @@ import org.veupathdb.lib.compute.platform.config.*
import org.veupathdb.lib.compute.platform.job.JobContext
import org.veupathdb.lib.compute.platform.job.JobExecutor
import org.veupathdb.lib.compute.platform.job.JobResult
import org.veupathdb.lib.s3.s34k.S3Api
import org.veupathdb.lib.s3.s34k.S3Config
import org.veupathdb.lib.s3.s34k.fields.BucketName
import kotlin.time.Duration.Companion.minutes

private val Log = LoggerFactory.getLogger("main.kt")

fun main() {
// Init minio
setupS3ForTests()

// Init test target
initPlatform()

// Run test
Log.info("{}", AsyncPlatform.listJobReferences())
}

Expand All @@ -23,10 +32,10 @@ private fun initPlatform() {
.port(5432)
.username("postgres")
.password("password")
.poolSize(1)
.poolSize(5)
.build())
.s3Config(AsyncS3Config("localhost", 9000, false, "derp", "minioadmin", "minioadmin", "flumps"))
.addQueue(AsyncQueueConfig("queue", "guest", "guest", "localhost", 5672, 1))
.addQueue(AsyncQueueConfig("queue", "guest", "guest", "localhost", 5672, 1, 5.minutes))
.jobConfig(AsyncJobConfig({ Executor() }, 1))
.localWorkspaceRoot("/tmp/florp")
.build())
Expand All @@ -39,4 +48,9 @@ class Executor : JobExecutor {
log.info("I'm job {}!", ctx.jobID)
return JobResult.success()
}
}

private fun setupS3ForTests() {
val client = S3Api.newClient(S3Config("localhost", 9000u, false, "minioadmin", "minioadmin"))
client.buckets.createIfNotExists(BucketName("derp"))
}

0 comments on commit d26fb1a

Please sign in to comment.