feat(sql): use a connection pool named "read" for some read operations in SqlExecutionRepository #4803
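The diff threads a javax.sql.DataSource into SqlExecutionRepository so the repository can check, at construction time, whether a connection pool named "read" actually exists before routing read queries to it. As a rough illustration of a DataSource that would pass that check, here is a minimal sketch built on Spring's AbstractRoutingDataSource; the class name, factory function, and lookup-key lambda are invented for the example and are not part of this PR or of the Spinnaker codebase.

```kotlin
import javax.sql.DataSource
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource

// Illustrative only: a routing DataSource whose resolved targets include a pool
// named "read". With a DataSource like this injected, the repository's init block
// (later in this diff) keeps readPoolName = "read"; with any other DataSource it
// falls back to the default pool name.
class NamedRoutingDataSource(private val currentPool: () -> String) : AbstractRoutingDataSource() {
    // Spring calls this to choose the target pool for each connection request.
    override fun determineCurrentLookupKey(): Any = currentPool()
}

fun routingDataSource(writePool: DataSource, readPool: DataSource, currentPool: () -> String): DataSource =
    NamedRoutingDataSource(currentPool).apply {
        setTargetDataSources(mapOf<Any, Any>("default" to writePool, "read" to readPool))
        setDefaultTargetDataSource(writePool)
        // afterPropertiesSet() populates resolvedDataSources, the map the init block inspects.
        afterPropertiesSet()
    }
```

With a plain, non-routing DataSource — such as the @MockBean used in the integration test below — the repository falls back to the default pool name.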

@@ -55,9 +55,11 @@ import java.util.Optional
import org.jooq.DSLContext
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.test.context.junit.jupiter.SpringExtension
import javax.sql.DataSource

@Configuration
class SqlTestConfig {
@@ -121,7 +123,8 @@ class SqlTestConfig {
registry: Registry,
properties: SqlProperties,
orcaSqlProperties: OrcaSqlProperties,
compressionProperties: ExecutionCompressionProperties
compressionProperties: ExecutionCompressionProperties,
dataSource: DataSource
) = SqlExecutionRepository(
orcaSqlProperties.partitionName,
dsl,
@@ -131,7 +134,8 @@ class SqlTestConfig {
orcaSqlProperties.stageReadSize,
interlink = null,
compressionProperties = compressionProperties,
pipelineRefEnabled = false
pipelineRefEnabled = false,
dataSource = dataSource
)

@Bean
@@ -192,4 +196,7 @@ class SqlTestConfig {
"spring.application.name=orcaTest"
]
)
class SqlQueueIntegrationTest : QueueIntegrationTest()
class SqlQueueIntegrationTest : QueueIntegrationTest() {
@MockBean
var dataSource: DataSource? = null
}
2 changes: 2 additions & 0 deletions orca-sql/orca-sql.gradle
@@ -50,12 +50,14 @@ dependencies {
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("dev.minutest:minutest")
testImplementation("com.nhaarman:mockito-kotlin")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.testcontainers:mysql")
testImplementation("org.testcontainers:postgresql")

testRuntimeOnly("com.mysql:mysql-connector-j")
testRuntimeOnly("org.postgresql:postgresql")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
testRuntimeOnly(project(":keiko-sql")) // so SpringLiquibaseProxy has changelog-keiko.yml
}

test {
@@ -38,6 +38,9 @@ import com.netflix.spinnaker.orca.sql.SqlHealthcheckActivator
import com.netflix.spinnaker.orca.sql.pipeline.persistence.ExecutionStatisticsRepository
import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository
import com.netflix.spinnaker.orca.sql.telemetry.SqlActiveExecutionsMonitor
import java.time.Clock
import java.util.Optional
import javax.sql.DataSource
import liquibase.integration.spring.SpringLiquibase
import net.javacrumbs.shedlock.core.LockProvider
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider
@@ -49,10 +52,11 @@ import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.*
import java.time.Clock
import java.util.*
import javax.sql.DataSource
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.context.annotation.Primary

@Configuration
@ConditionalOnProperty("sql.enabled")
@@ -78,7 +82,8 @@ class SqlConfiguration {
interlink: Optional<Interlink>,
executionRepositoryListeners: Collection<ExecutionRepositoryListener>,
compressionProperties: ExecutionCompressionProperties,
pipelineRefProperties: PipelineRefProperties
pipelineRefProperties: PipelineRefProperties,
dataSource: DataSource
) =
SqlExecutionRepository(
orcaSqlProperties.partitionName,
@@ -90,7 +95,8 @@ class SqlConfiguration {
interlink = interlink.orElse(null),
executionRepositoryListeners = executionRepositoryListeners,
compressionProperties = compressionProperties,
pipelineRefEnabled = pipelineRefProperties.enabled
pipelineRefEnabled = pipelineRefProperties.enabled,
dataSource = dataSource
).let {
InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "primary"))) as ExecutionRepository
}
@@ -105,7 +111,8 @@ class SqlConfiguration {
orcaSqlProperties: OrcaSqlProperties,
@Value("\${execution-repository.sql.secondary.pool-name}") poolName: String,
compressionProperties: ExecutionCompressionProperties,
pipelineRefProperties: PipelineRefProperties
pipelineRefProperties: PipelineRefProperties,
dataSource: DataSource
) =
SqlExecutionRepository(
orcaSqlProperties.partitionName,
@@ -116,7 +123,8 @@ class SqlConfiguration {
orcaSqlProperties.stageReadSize,
poolName,
compressionProperties = compressionProperties,
pipelineRefEnabled = pipelineRefProperties.enabled
pipelineRefEnabled = pipelineRefProperties.enabled,
dataSource = dataSource
).let {
InstrumentedProxy.proxy(registry, it, "sql.executions", mapOf(Pair("repository", "secondary"))) as ExecutionRepository
}
@@ -76,12 +76,14 @@ import org.jooq.impl.DSL.table
import org.jooq.impl.DSL.timestampSub
import org.jooq.impl.DSL.value
import org.slf4j.LoggerFactory
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource
import rx.Observable
import java.io.ByteArrayOutputStream
import java.lang.System.currentTimeMillis
import java.nio.charset.StandardCharsets
import java.security.SecureRandom
import java.util.stream.Collectors.toList
import javax.sql.DataSource
import kotlin.collections.Collection
import kotlin.collections.Iterable
import kotlin.collections.Iterator
@@ -120,10 +122,12 @@ class SqlExecutionRepository(
private val batchReadSize: Int = 10,
private val stageReadSize: Int = 200,
private val poolName: String = "default",
internal var readPoolName: String = "read", /* internal for testing */
private val interlink: Interlink? = null,
private val executionRepositoryListeners: Collection<ExecutionRepositoryListener> = emptyList(),
private val compressionProperties: ExecutionCompressionProperties,
private val pipelineRefEnabled: Boolean
private val pipelineRefEnabled: Boolean,
private val dataSource: DataSource
) : ExecutionRepository, ExecutionStatisticsRepository {
companion object {
val ulid = SpinULID(SecureRandom())
@@ -133,7 +137,13 @@ class SqlExecutionRepository(
private val log = LoggerFactory.getLogger(javaClass)

init {
log.info("Creating SqlExecutionRepository with partition=$partitionName and pool=$poolName")
// If there's no read pool configured, fall back to the default pool
if ((dataSource !is AbstractRoutingDataSource)
|| (dataSource.resolvedDataSources[readPoolName] == null)) {
readPoolName = poolName
}

log.info("Creating SqlExecutionRepository with partition=$partitionName, pool=$poolName, readPool=$readPoolName")

try {
withPool(poolName) {
@@ -182,10 +192,8 @@ class SqlExecutionRepository(
validateHandledPartitionOrThrow(execution)

withPool(poolName) {
jooq.transactional {
it.delete(execution.type.stagesTableName)
.where(stageId.toWhereCondition()).execute()
}
jooq.delete(execution.type.stagesTableName)
.where(stageId.toWhereCondition()).execute()
}
}

@@ -277,7 +285,7 @@ }
}

override fun isCanceled(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(readPoolName) {
return jooq.fetchExists(
jooq.selectFrom(type.tableName)
.where(id.toWhereCondition())
@@ -412,7 +420,7 @@ }
}

private fun retrieve(type: ExecutionType, criteria: ExecutionCriteria, partition: String?): Observable<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.selectExecutions(
type,
fields = selectExecutionFields(compressionProperties) + field("status"),
@@ -441,7 +449,7 @@ }
}

override fun retrievePipelinesForApplication(application: String): Observable<PipelineExecution> =
withPool(poolName) {
withPool(readPoolName) {
Observable.from(
fetchExecutions { pageSize, cursor ->
selectExecutions(PIPELINE, pageSize, cursor) {
@@ -581,7 +589,7 @@ class SqlExecutionRepository(
// When not filtering by status, provide an index hint to ensure use of `pipeline_config_id_idx` which
// fully satisfies the where clause and order by. Without, some lookups by config_id matching thousands
// of executions triggered costly full table scans.
withPool(poolName) {
withPool(readPoolName) {
val select = if (criteria.statuses.isEmpty() || criteria.statuses.size == ExecutionStatus.values().size) {
jooq.selectExecutions(
PIPELINE,
@@ -625,7 +633,7 @@ class SqlExecutionRepository(
criteria: ExecutionCriteria,
sorter: ExecutionComparator?
): MutableList<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
return jooq.selectExecutions(
ORCHESTRATION,
conditions = {
@@ -677,17 +685,23 @@ )
)
.fetchExecution()

if (execution != null) {
if (!execution.status.isComplete) {
return execution
}
jooq.transactional {
it.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}
if (execution == null) {
Review thread on this hunk:

Member: A POTENTIAL concern since we ARE querying now from the read pool: Can the orchestration be on a delayed sync, so it'd actually exist in the RW & not be sync'd yet? Wondering if a fallback is needed in such a case. OR this could be handled upstream when it doesn't find it to retry?

Contributor: We haven't seen problems with this, but I agree there's a hole if two orchestrations start very quickly. One use of retrieveByCorrelationId (the only function that calls this one) is in ExecutionLauncher.checkForCorrelatedExecution, called by ExecutionLauncher.start. Same deal for retrievePipelineForCorrelationId below.

Member: Yeah it's a concern - I know of some workflows that use orchestration stuff that could get impacted by this, and read-sync issues are the "biggest" headaches of RO queries based on past. This seems to be the biggest spot that's a POTENTIAL risk as I THINK (and still tracing) the others do retries on an operation that throws an error or doesn't respond. It's possible these upstream may ALSO retry, but not 100% sure YET if that exception is encountered...

Contributor: OK, reverted back to withPool(poolName)
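For reference, the fallback floated in this thread — query the read pool first and re-check the primary on a miss — was not adopted; per the last comment, the correlation-ID lookup went back to withPool(poolName). A minimal sketch of what such a fallback could look like follows. Everything in it is illustrative: the function name and the idea of passing withPool and the lookup as lambdas exist only to keep the sketch self-contained, and none of it is code from this PR.

```kotlin
// Hypothetical sketch of the fallback discussed above, NOT what this PR ships: query the
// read pool first and, on a miss, re-check the write pool in case a read replica has not
// yet caught up with a very recent write. In the repository, withPool and lookup would be
// the existing withPool helper and the jOOQ select these methods already run.
fun <T : Any> readWithWriteFallback(
    readPoolName: String,
    writePoolName: String,
    withPool: (String, () -> T?) -> T?,
    lookup: () -> T?
): T? {
    // Try the read pool first.
    val fromReadPool = withPool(readPoolName) { lookup() }
    if (fromReadPool != null) {
        return fromReadPool
    }
    // Miss on the read pool: the row may exist on the primary but not be replicated yet,
    // so re-check the write pool before concluding the execution does not exist.
    return withPool(writePoolName) { lookup() }
}
```

Whether the extra primary-side query is worth it depends on how often a correlated orchestration is looked up immediately after it is created, which is exactly the race this thread is worried about.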

throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId")
}

throw ExecutionNotFoundException("No Orchestration found for correlation ID $correlationId")
if (!execution.status.isComplete) {
return execution
}
}

// If we get here, there's an execution with the given correlation id, but
// it's complete, so clean up the correlation_ids table.
withPool(poolName) {
jooq.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}

// Treat a completed execution similar to not finding one at all.
throw ExecutionNotFoundException("Complete Orchestration found for correlation ID $correlationId")
}

override fun retrievePipelineForCorrelationId(correlationId: String): PipelineExecution {
@@ -705,17 +719,22 @@ class SqlExecutionRepository(
)
.fetchExecution()

if (execution != null) {
if (!execution.status.isComplete) {
return execution
}
jooq.transactional {
it.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}
if (execution == null) {
throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId")
}

throw ExecutionNotFoundException("No Pipeline found for correlation ID $correlationId")
if (!execution.status.isComplete) {
return execution
}
}

// If we get here, there's an execution with the given correlation id, but
// it's complete, so clean up the correlation_ids table.
withPool(poolName) {
jooq.deleteFrom(table("correlation_ids")).where(field("id").eq(correlationId)).execute()
}

throw ExecutionNotFoundException("Complete Pipeline found for correlation ID $correlationId")
}

override fun retrieveBufferedExecutions(): MutableList<PipelineExecution> =
@@ -730,7 +749,7 @@ }
}

override fun retrieveAllApplicationNames(type: ExecutionType?): List<String> {
withPool(poolName) {
withPool(readPoolName) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
@@ -753,7 +772,7 @@ }
}

override fun retrieveAllApplicationNames(type: ExecutionType?, minExecutions: Int): List<String> {
withPool(poolName) {
withPool(readPoolName) {
return if (type == null) {
jooq.select(field("application"))
.from(PIPELINE.tableName)
@@ -779,7 +798,7 @@ }
}

override fun countActiveExecutions(): ActiveExecutionsReport {
withPool(poolName) {
withPool(readPoolName) {
val partitionPredicate = if (partitionName != null) field(name("partition")).eq(partitionName) else value(1).eq(value(1))

val orchestrationsQuery = jooq.selectCount()
@@ -808,7 +827,7 @@ class SqlExecutionRepository(
buildTimeEndBoundary: Long,
executionCriteria: ExecutionCriteria
): List<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.select(selectExecutionFields(compressionProperties))
.from(PIPELINE.tableName)
.join(
@@ -888,7 +907,7 @@ }
}

override fun hasExecution(type: ExecutionType, id: String): Boolean {
withPool(poolName) {
withPool(readPoolName) {
return jooq.selectCount()
.from(type.tableName)
.where(id.toWhereCondition())
@@ -897,7 +916,7 @@ }
}

override fun retrieveAllExecutionIds(type: ExecutionType): MutableList<String> {
withPool(poolName) {
withPool(readPoolName) {
return jooq.select(field("id")).from(type.tableName).fetch("id", String::class.java)
}
}
@@ -917,7 +936,7 @@ }
): Pair<String, String?> {
if (isULID(id)) return Pair(id, null)

withPool(poolName) {
withPool(readPoolName) {
val ts = (timestamp ?: System.currentTimeMillis())
val row = ctx.select(field("id"))
.from(table)
@@ -1256,14 +1275,10 @@ class SqlExecutionRepository(
private fun selectExecution(
ctx: DSLContext,
type: ExecutionType,
id: String,
forUpdate: Boolean = false
id: String
): PipelineExecution? {
withPool(poolName) {
val select = ctx.selectExecution(type, compressionProperties).where(id.toWhereCondition())
if (forUpdate) {
select.forUpdate()
}
return select.fetchExecution()
}
}
@@ -1274,7 +1289,7 @@ class SqlExecutionRepository(
cursor: String?,
where: ((SelectJoinStep<Record>) -> SelectConditionStep<Record>)? = null
): Collection<PipelineExecution> {
withPool(poolName) {
withPool(readPoolName) {
val select = jooq.selectExecutions(
type,
conditions = {