Skip to content

Commit

Permalink
perf(sql): use the read pool for read-only database queries in SqlExe…
Browse files Browse the repository at this point in the history
…cutionRepository
  • Loading branch information
dbyron-sf authored and kirangodishala committed Jan 2, 2025
1 parent 78abe77 commit 05d9123
Showing 1 changed file with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ class SqlExecutionRepository(
}

override fun isCanceled(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(readPoolName) {
return jooq.fetchExists(
jooq.selectFrom(type.tableName)
.where(id.toWhereCondition())
Expand Down Expand Up @@ -420,7 +420,7 @@ class SqlExecutionRepository(
}

private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?): Observable<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.selectExecutions(
type,
fields = selectExecutionFields(compressionProperties) + field("status"),
Expand Down Expand Up @@ -449,7 +449,7 @@ class SqlExecutionRepository(
}

override fun retrievePipelinesForApplication(application: String): Observable<PipelineExecution> =
withPool(poolName) {
withPool(readPoolName) {
Observable.from(
fetchExecutions { pageSize, cursor ->
selectExecutions(PIPELINE, pageSize, cursor) {
Expand Down Expand Up @@ -589,7 +589,7 @@ class SqlExecutionRepository(
// When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which
// fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands
// of executions triggered costly full table scans.
withPool(poolName) {
withPool(readPoolName) {
val select = if (criteria.statuses.isEmpty() || criteria.statuses.size == ExecutionStatus.values().size) {
jooq.selectExecutions(
PIPELINE,
Expand Down Expand Up @@ -633,7 +633,7 @@ class SqlExecutionRepository(
criteria: ExecutionCriteria,
sorter: ExecutionComparator?
): MutableList<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
return jooq.selectExecutions(
ORCHESTRATION,
conditions = {
Expand Down Expand Up @@ -749,7 +749,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllApplicationNames(type: ExecutionType?): List<String> {
withPool(poolName) {
withPool(readPoolName) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
Expand All @@ -772,7 +772,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllApplicationNames(type: ExecutionType?, minExecutions: Int): List<String> {
withPool(poolName) {
withPool(readPoolName) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
Expand All @@ -798,7 +798,7 @@ class SqlExecutionRepository(
}

override fun countActiveExecutions(): ActiveExecutionsReport {
withPool(poolName) {
withPool(readPoolName) {
val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))

val orchestrationsQuery = jooq.selectCount()
Expand Down Expand Up @@ -827,7 +827,7 @@ class SqlExecutionRepository(
buildTimeEndBoundary: Long,
executionCriteria: ExecutionCriteria
): List<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.select(selectExecutionFields(compressionProperties))
.from(PIPELINE.tableName)
.join(
Expand Down Expand Up @@ -907,7 +907,7 @@ class SqlExecutionRepository(
}

override fun hasExecution(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(readPoolName) {
return jooq.selectCount()
.from(type.tableName)
.where(id.toWhereCondition())
Expand All @@ -916,7 +916,7 @@ class SqlExecutionRepository(
}

override fun retrieveAllExecutionIds(type: ExecutionType): MutableList<String> {
withPool(poolName) {
withPool(readPoolName) {
return jooq.select(field("id")).from(type.tableName).fetch("id", String::class.java)
}
}
Expand All @@ -936,7 +936,7 @@ class SqlExecutionRepository(
): Pair<String, String?> {
if (isULID(id)) return Pair(id, null)

withPool(poolName) {
withPool(readPoolName) {
val ts = (timestamp ?: System.currentTimeMillis())
val row = ctx.select(field("id"))
.from(table)
Expand Down Expand Up @@ -1293,7 +1293,7 @@ class SqlExecutionRepository(
cursor: String?,
where: ((SelectJoinStep<Record>) -> SelectConditionStep<Record>)? = null
): Collection<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.selectExecutions(
type,
conditions = {
Expand Down

0 comments on commit 05d9123

Please sign in to comment.