From d8d3799d26b55ee1b690038d2343bffc9747c9f7 Mon Sep 17 00:00:00 2001 From: kirangodishala Date: Fri, 23 Apr 2021 00:37:41 +0530 Subject: [PATCH 1/8] feat(web): refactor TaskController to support externalized config properties optimize getPipelinesForApplication() by reducing the number of relevant pipeline config ids to query This is a breaking change: TaskController previously looked at these properties: `tasks.days-of-execution-history` `tasks.number-of-old-pipeline-executions-to-include` and now they live in `tasks.controller`. fix(web): give Java code access to Groovy classes to fix the error below: > Task :orca-web:compileJava FAILED ../orca/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java:19: error: cannot find symbol import com.netflix.spinnaker.orca.controllers.TaskController; ^ symbol: class TaskController location: package com.netflix.spinnaker.orca.controllers 1 error --- orca-web/orca-web.gradle | 7 ++ .../orca/controllers/TaskController.groovy | 109 +++++++++++------- ...TaskControllerConfigurationProperties.java | 48 ++++++++ .../controllers/TaskControllerSpec.groovy | 33 +++--- 4 files changed, 141 insertions(+), 56 deletions(-) create mode 100644 orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java diff --git a/orca-web/orca-web.gradle b/orca-web/orca-web.gradle index 1226a50311..820fa18142 100644 --- a/orca-web/orca-web.gradle +++ b/orca-web/orca-web.gradle @@ -97,6 +97,13 @@ dependencies { testImplementation ("com.squareup.retrofit2:retrofit-mock") } +sourceSets { + main { + java { srcDirs = [] } // no source dirs for the java compiler + groovy { srcDirs = ["src/main/java", "src/main/groovy"] } // compile everything in src/ with groovy + } +} + test { //The Implementation-Version is set in the MANIFEST.MF for the JAR produced via testing so that //assertions can be made against the version (see orca-plugins-test, for example).
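For operators migrating across this breaking change, a minimal sketch of the relocated settings in an orca.yml, assuming Spring Boot's relaxed binding maps the kebab-case keys onto TaskControllerConfigurationProperties (the values shown are the defaults from this patch):

tasks:
  controller:
    days-of-execution-history: 14
    number-of-old-pipeline-executions-to-include: 2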
diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index d8de0aa22b..29f4f538a4 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -18,7 +18,9 @@ package com.netflix.spinnaker.orca.controllers import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.annotations.VisibleForTesting +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.config.TaskControllerConfigurationProperties import com.netflix.spinnaker.kork.web.exceptions.NotFoundException import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder import com.netflix.spinnaker.orca.api.pipeline.models.* @@ -34,9 +36,8 @@ import com.netflix.spinnaker.orca.util.ExpressionUtils import com.netflix.spinnaker.security.AuthenticatedRequest import groovy.transform.InheritConstructors import groovy.util.logging.Slf4j -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value import org.springframework.http.HttpStatus +import org.springframework.lang.Nullable import org.springframework.security.access.prepost.PostAuthorize import org.springframework.security.access.prepost.PostFilter import org.springframework.security.access.prepost.PreAuthorize @@ -44,6 +45,8 @@ import org.springframework.security.access.prepost.PreFilter import org.springframework.web.bind.annotation.* import rx.schedulers.Schedulers +import java.util.concurrent.Callable +import java.util.concurrent.Executors import java.nio.charset.Charset import java.time.Clock import java.time.ZoneOffset @@ -60,43 +63,44 @@ import static java.time.temporal.ChronoUnit.DAYS @Slf4j @RestController class TaskController { - @Autowired(required = false) Front50Service front50Service - - @Autowired ExecutionRepository executionRepository - - @Autowired ExecutionRunner executionRunner - - @Autowired CompoundExecutionOperator executionOperator - - @Autowired Collection stageBuilders - - @Autowired ContextParameterProcessor contextParameterProcessor - - @Autowired ExpressionUtils expressionUtils - - @Autowired ObjectMapper mapper - - @Autowired Registry registry - - @Autowired StageDefinitionBuilderFactory stageDefinitionBuilderFactory - - @Value('${tasks.days-of-execution-history:14}') - int daysOfExecutionHistory - - @Value('${tasks.number-of-old-pipeline-executions-to-include:2}') - int numberOfOldPipelineExecutionsToInclude - - Clock clock = Clock.systemUTC() + TaskControllerConfigurationProperties configurationProperties + Clock clock + + TaskController(@Nullable Front50Service front50Service, + ExecutionRepository executionRepository, + ExecutionRunner executionRunner, + CompoundExecutionOperator executionOperator, + Collection stageBuilders, + ContextParameterProcessor contextParameterProcessor, + ExpressionUtils expressionUtils, + ObjectMapper mapper, + Registry registry, + StageDefinitionBuilderFactory stageDefinitionBuilderFactory, + TaskControllerConfigurationProperties configurationProperties + ) { + this.front50Service = front50Service + this.executionRepository = executionRepository + this.executionRunner = executionRunner + this.executionOperator = executionOperator + this.stageBuilders = stageBuilders + this.contextParameterProcessor = 
contextParameterProcessor + this.expressionUtils = expressionUtils + this.mapper = mapper + this.registry = registry + this.stageDefinitionBuilderFactory = stageDefinitionBuilderFactory + this.configurationProperties = configurationProperties + this.clock = Clock.systemUTC() + } @PreAuthorize("hasPermission(#application, 'APPLICATION', 'READ')") @RequestMapping(value = "/applications/{application}/tasks", method = RequestMethod.GET) @@ -115,7 +119,7 @@ class TaskController { clock .instant() .atZone(ZoneOffset.UTC) - .minusDays(daysOfExecutionHistory) + .minusDays(this.configurationProperties.getDaysOfExecutionHistory()) .toInstant() ) @@ -583,11 +587,9 @@ class TaskController { @PreAuthorize("hasPermission(#application, 'APPLICATION', 'READ')") @RequestMapping(value = "/applications/{application}/pipelines", method = RequestMethod.GET) List getPipelinesForApplication(@PathVariable String application, - @RequestParam(value = "limit", defaultValue = "5") - int limit, - @RequestParam(value = "statuses", required = false) - String statuses, - @RequestParam(value = "expand", defaultValue = "true") Boolean expand) { + @RequestParam(value = "limit", defaultValue = "5") int limit, + @RequestParam(value = "statuses", required = false) String statuses, + @RequestParam(value = "expand", defaultValue = "true") Boolean expand) { if (!front50Service) { throw new UnsupportedOperationException("Cannot lookup pipelines, front50 has not been enabled. Fix this by setting front50.enabled: true") } @@ -595,7 +597,6 @@ class TaskController { if (!limit) { return [] } - statuses = statuses ?: ExecutionStatus.values()*.toString().join(",") def executionCriteria = new ExecutionCriteria( pageSize: limit, @@ -603,18 +604,41 @@ class TaskController { ) def pipelineConfigIds = front50Service.getPipelines(application, false)*.id as List + log.info("received ${pipelineConfigIds.size()} pipelines for application: $application from front50") def strategyConfigIds = front50Service.getStrategies(application)*.id as List + log.info("received ${strategyConfigIds.size()} strategies for application: $application from front50") def allIds = pipelineConfigIds + strategyConfigIds - def allPipelines = rx.Observable.merge(allIds.collect { + List commonPipelineConfigIds + if (this.configurationProperties.getOptimizeExecutionRetrieval()) { + log.info("running optimized execution retrieval process") + try { + List pipelineConfigIdsInOrca = executionRepository.retrievePipelineConfigIdsForApplication(application) + log.info("found ${pipelineConfigIdsInOrca.size()} pipeline config ids for application: $application in orca") + commonPipelineConfigIds = allIds.intersect(pipelineConfigIdsInOrca) + log.info("found ${commonPipelineConfigIds.size()} pipeline config ids that are common in orca and front50 " + + "for application: $application." + + " Saved ${allIds.size() - commonPipelineConfigIds.size()} extra pipeline config id queries") + } catch (Exception e) { + log.warn("retrieving pipeline config ids from orca db failed. 
using the result obtained from front50 ", e) + commonPipelineConfigIds = allIds + } + } else { + commonPipelineConfigIds = allIds + } + + List allPipelineExecutions = rx.Observable.merge(commonPipelineConfigIds.collect { + log.info("processing pipeline config id: $it") executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) }).subscribeOn(Schedulers.io()).toList().toBlocking().single().sort(startTimeOrId) if (!expand) { - unexpandPipelineExecutions(allPipelines) + log.info("unexpanding pipeline executions") + unexpandPipelineExecutions(allPipelineExecutions) } - return filterPipelinesByHistoryCutoff(allPipelines, limit) + log.info("filtering pipelines by history") + return filterPipelinesByHistoryCutoff(allPipelineExecutions, limit) } private static void validateSearchForPipelinesByTriggerParameters(long triggerTimeStartBoundary, long triggerTimeEndBoundary, int startIndex, int size) { @@ -663,7 +687,7 @@ class TaskController { private List filterPipelinesByHistoryCutoff(List pipelines, int limit) { // TODO-AJ The eventual goal is to return `allPipelines` without the need to group + filter below (WIP) - def cutoffTime = clock.instant().minus(daysOfExecutionHistory, DAYS).toEpochMilli() + def cutoffTime = clock.instant().minus(this.configurationProperties.getDaysOfExecutionHistory(), DAYS).toEpochMilli() def pipelinesSatisfyingCutoff = [] pipelines.groupBy { @@ -674,8 +698,9 @@ class TaskController { !it.startTime || it.startTime > cutoffTime } if (!recentPipelines && sortedPipelinesGroup) { - // no pipeline executions within `daysOfExecutionHistory` so include the first `numberOfOldPipelineExecutionsToInclude` - def upperBounds = Math.min(sortedPipelinesGroup.size(), numberOfOldPipelineExecutionsToInclude) - 1 + // no pipeline executions within `this.configurationProperties.getDaysOfExecutionHistory()` so include + // the first `this.configurationProperties.numberOfOldPipelineExecutionsToInclude()` + def upperBounds = Math.min(sortedPipelinesGroup.size(), this.getConfigurationProperties().getNumberOfOldPipelineExecutionsToInclude()) - 1 recentPipelines = sortedPipelinesGroup[0..upperBounds] } diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java new file mode 100644 index 0000000000..36c4f56d58 --- /dev/null +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netflix.spinnaker.config; + +import com.netflix.spinnaker.orca.controllers.TaskController; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties("tasks.controller") +@Data +public class TaskControllerConfigurationProperties { + /** + * Flag to enable faster execution retrieval. This applies to the {@link + * TaskController#getPipelinesForApplication(String, int, String, Boolean)} endpoint. + */ + boolean optimizeExecutionRetrieval = false; + + /** Moved here; previously defined in the {@link TaskController} class. */ + int daysOfExecutionHistory = 14; + + /** Moved here; previously defined in the {@link TaskController} class. */ + int numberOfOldPipelineExecutionsToInclude = 2; + + public boolean getOptimizeExecutionRetrieval() { + return this.optimizeExecutionRetrieval; + } + + // the setter is defined explicitly so that it works in Kotlin tests + public void setOptimizeExecutionRetrieval(boolean optimizeExecutionRetrieval) { + this.optimizeExecutionRetrieval = optimizeExecutionRetrieval; + } +} diff --git a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy index 8dee3526af..c5cf759312 100644 --- a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy +++ b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy @@ -19,16 +19,19 @@ package com.netflix.spinnaker.orca.controllers import com.fasterxml.jackson.databind.ObjectMapper import com.google.common.collect.Collections2 import com.netflix.spectator.api.NoopRegistry +import com.netflix.spinnaker.config.TaskControllerConfigurationProperties import com.netflix.spinnaker.kork.artifacts.model.Artifact -import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus +import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType import com.netflix.spinnaker.orca.front50.Front50Service import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator import com.netflix.spinnaker.orca.pipeline.ExecutionRunner +import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory import com.netflix.spinnaker.orca.pipeline.model.* import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor +import com.netflix.spinnaker.orca.util.ExpressionUtils import groovy.json.JsonSlurper import org.springframework.http.MediaType import org.springframework.mock.web.MockHttpServletResponse @@ -61,24 +64,24 @@ class TaskControllerSpec extends Specification { def clock = Clock.fixed(Instant.now(), UTC) int daysOfExecutionHistory = 14 - int numberOfOldPipelineExecutionsToInclude = 2 ObjectMapper objectMapper = OrcaObjectMapper.newInstance() void setup() { mockMvc = MockMvcBuilders.standaloneSetup( - new TaskController( - front50Service: front50Service, - executionRepository: executionRepository, - executionRunner: executionRunner, - daysOfExecutionHistory: daysOfExecutionHistory, - numberOfOldPipelineExecutionsToInclude: numberOfOldPipelineExecutionsToInclude, - clock: clock, - mapper: mapper, - registry: registry, -
contextParameterProcessor: new ContextParameterProcessor(), - executionOperator: executionOperator - ) + new TaskController( + front50Service, + executionRepository, + executionRunner, + executionOperator, + List.of(Mock(StageDefinitionBuilder)), + new ContextParameterProcessor(), + Mock(ExpressionUtils), + mapper, + registry, + Mock(StageDefinitionBuilderFactory), + new TaskControllerConfigurationProperties() + ) ).build() } @@ -247,6 +250,8 @@ class TaskControllerSpec extends Specification { front50Service.getPipelines(app, false) >> [[id: "1"], [id: "2"]] front50Service.getStrategies(app) >> [] + executionRepository.retrievePipelineConfigIdsForApplication(app) >> { return List.of( '2')} + when: def response = mockMvc.perform(get("/applications/$app/pipelines")).andReturn().response List results = new ObjectMapper().readValue(response.contentAsString, List) From 213b3083ae0d4734a52075b9bb56e042f3e86f44 Mon Sep 17 00:00:00 2001 From: kirangodishala Date: Fri, 23 Apr 2021 00:40:38 +0530 Subject: [PATCH 2/8] feat(taskController): optimize TaskController.getPipelinesForApplication() further by processing multiple pipeline config ids at a time and multi-threading each config id batch --- .../persistence/DualExecutionRepository.kt | 18 ++ .../persistence/ExecutionRepository.java | 9 + .../InMemoryExecutionRepository.kt | 18 ++ .../jedis/RedisExecutionRepository.java | 21 ++ .../persistence/SqlExecutionRepository.kt | 123 ++++++++- orca-web/orca-web.gradle | 3 + .../orca/controllers/TaskController.groovy | 69 +++-- ...TaskControllerConfigurationProperties.java | 21 ++ .../controllers/TaskControllerSpec.groovy | 5 +- .../orca/controllers/TaskControllerTest.kt | 237 ++++++++++++++++++ 10 files changed, 504 insertions(+), 20 deletions(-) create mode 100644 orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index b6362e4929..90164ef189 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -192,6 +192,13 @@ class DualExecutionRepository( ).distinct { it.id } } + override fun retrievePipelineConfigIdsForApplication(application: String): List { + return ( + primary.retrievePipelineConfigIdsForApplication(application) + + previous.retrievePipelineConfigIdsForApplication(application) + ).distinct() + } + override fun retrievePipelinesForPipelineConfigId( pipelineConfigId: String, criteria: ExecutionCriteria @@ -202,6 +209,17 @@ class DualExecutionRepository( ).distinct { it.id } } + override fun retrievePipelineExecutionsForApplication( + application: String, + pipelineConfigIds: List, + criteria: ExecutionCriteria + ): Collection { + return ( + primary.retrievePipelineExecutionsForApplication(application, pipelineConfigIds, criteria) + + previous.retrievePipelineExecutionsForApplication(application, pipelineConfigIds, criteria) + ).distinctBy { it.id } + } + override fun retrievePipelinesForPipelineConfigIdsBetweenBuildTimeBoundary( pipelineConfigIds: MutableList, buildTimeStartBoundary: Long, diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java 
b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index e9e1c5976a..550de7967e 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -92,6 +92,15 @@ Observable retrieve( Observable retrievePipelinesForPipelineConfigId( @Nonnull String pipelineConfigId, @Nonnull ExecutionCriteria criteria); + @Nonnull + Collection retrievePipelineConfigIdsForApplication(@Nonnull String application); + + @Nonnull + Collection retrievePipelineExecutionsForApplication( + @Nonnull String application, + @Nonnull List pipelineConfigIds, + @Nonnull ExecutionCriteria criteria); + /** * Returns executions in the time boundary. Redis impl does not respect pageSize or offset params, * and returns all executions. Sql impl respects these params. diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 4395c9f2bf..52b286a7de 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -276,6 +276,24 @@ class InMemoryExecutionRepository : ExecutionRepository { ) } + override fun retrievePipelineExecutionsForApplication( + application: String, + pipelineConfigIds: List, + criteria: ExecutionCriteria + ): Collection { + return pipelines.values + .filter { it.pipelineConfigId in pipelineConfigIds && it.application == application } + .applyCriteria(criteria) + .distinctBy { it.id } + } + + override fun retrievePipelineConfigIdsForApplication(application: String): List { + return pipelines.values + .filter { it.application == application } + .map { it.pipelineConfigId } + .distinct() + } + override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution { return retrieveByCorrelationId(ORCHESTRATION, correlationId) } diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index 70a405e42f..62bf8a7749 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -55,6 +55,7 @@ import javax.annotation.Nonnull; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -373,6 +374,15 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet idsToDelete.forEach(id -> delete(type, id)); } + @Override + public @Nonnull List retrievePipelineConfigIdsForApplication( + @Nonnull String application) { + // TODO: not implemented yet - this method, at present, is primarily meant for the + // SqlExecutionRepository + // implementation. 
+ return List.of(); + } + @Override public @Nonnull Observable retrievePipelinesForApplication( @Nonnull String application) { @@ -479,6 +489,17 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet return currentObservable; } + @Override + public @NotNull List retrievePipelineExecutionsForApplication( + @NotNull String application, + @NotNull List pipelineConfigIds, + @NotNull ExecutionCriteria executionCriteria) { + List> output = new ArrayList<>(); + pipelineConfigIds.forEach( + configId -> output.add(retrievePipelinesForPipelineConfigId(configId, executionCriteria))); + return Observable.merge(output).subscribeOn(Schedulers.io()).toList().toBlocking().single(); + } + /* * There is no guarantee that the returned results will be sorted. * @param limit and the param @offset are only implemented in SqlExecutionRepository diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 1ef18fb8e1..606cf0a4d9 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -53,8 +53,6 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.Execu import com.netflix.spinnaker.orca.pipeline.persistence.UnpausablePipelineException import com.netflix.spinnaker.orca.pipeline.persistence.UnresumablePipelineException import de.huxhorn.sulky.ulid.SpinULID -import java.lang.System.currentTimeMillis -import java.security.SecureRandom import java.time.Duration import org.jooq.DSLContext import org.jooq.DatePart @@ -80,7 +78,32 @@ import org.jooq.impl.DSL.value import org.slf4j.LoggerFactory import rx.Observable import java.io.ByteArrayOutputStream +import java.lang.System.currentTimeMillis import java.nio.charset.StandardCharsets +import java.security.SecureRandom +import kotlin.collections.Collection +import kotlin.collections.Iterable +import kotlin.collections.Iterator +import kotlin.collections.List +import kotlin.collections.Map +import kotlin.collections.MutableList +import kotlin.collections.chunked +import kotlin.collections.distinct +import kotlin.collections.firstOrNull +import kotlin.collections.forEach +import kotlin.collections.isEmpty +import kotlin.collections.isNotEmpty +import kotlin.collections.listOf +import kotlin.collections.map +import kotlin.collections.mapOf +import kotlin.collections.mutableListOf +import kotlin.collections.mutableMapOf +import kotlin.collections.plus +import kotlin.collections.set +import kotlin.collections.toList +import kotlin.collections.toMutableList +import kotlin.collections.toMutableMap +import kotlin.collections.toTypedArray /** * A generic SQL [ExecutionRepository]. @@ -427,6 +450,102 @@ class SqlExecutionRepository( ) } + override fun retrievePipelineConfigIdsForApplication(application: String): List = + withPool(poolName) { + return jooq.selectDistinct(field("config_id")) + .from(PIPELINE.tableName) + .where(field("application").eq(application)) + .fetch(0, String::class.java) + } + + /** + * this function is meant to support only the following ExecutionCriteria: + * 'limit', a.k.a page size and + * 'statuses'. + * + * It runs two queries: + * 1. get 'limit' number of executions for n pipeline config ids at a time. + * 2. 
get all the stage information for all the executions returned from 1. + * + * The first query with only limit execution criteria looks like this: + select id, body, compressed_body, compression_type, `partition` from ( + select config_id, id, body, application, `partition` from ( + select config_id, id, body, application, `partition`, + @row_number := IF(@prev = config_id, @row_number + 1, 1) AS row_number, + @prev := config_id // here prev is used to store a config id. row number is incremented based on how many rows are found that have the same config id + from pipelines + JOIN (SELECT @prev := NULL, @row_number := 0) AS vars) + on (application = "myapp" and config_id in ()) + order by config_id, id desc + ) as limited_executions where row_number <= limit + ) as configs left outer join pipelines_compressed_executions using (`id`); + + + * The first query with limit and status execution criteria looks like this: + select id, body, compressed_body, compression_type, `partition` from ( + select config_id, id, body, application, status, `partition` from ( + select config_id, id, body, application, status, `partition`, + @row_number := IF(@prev = config_id, @row_number + 1, 1) AS row_number, + @prev := config_id + from pipelines + JOIN (SELECT @prev := NULL, @row_number := 0) AS vars) + on (application = "myapp" and config_id in () and status in ()) + order by config_id, id desc + ) as limited_executions where row_number <=2 + ) as configs left outer join pipelines_compressed_executions using (`id`); + */ + override fun retrievePipelineExecutionsForApplication( + application: String, + pipelineConfigIds: List, + criteria: ExecutionCriteria + ): Collection { + // list of relevant fields to be returned + val innerFields = listOf(field("config_id"), + field("id"), + field("body"), + field("application"), + field("status"), + field(name("partition"))) + + val vars = listOf(field("@prev := NULL", Integer.TYPE), + field("@row_number := 0", Integer.TYPE) + ) + + // baseQueryPredicate for the flow where there are no statuses in the execution criteria + var baseQueryPredicate = field("application").eq(application) + .and(field("config_id").`in`(*pipelineConfigIds.toTypedArray())) + + // baseQueryPredicate for the flow with statuses + if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size) { + val statusStrings = criteria.statuses.map { it.toString() } + baseQueryPredicate = baseQueryPredicate + .and(field("status").`in`(*statusStrings.toTypedArray())) + } + + log.info("getting execution ids") + val baseQuery = jooq.select(selectExecutionFields(compressionProperties)) + .from( + jooq.select(innerFields) + .from( + jooq.select(innerFields + + field("@row_number := IF(@prev = config_id, @row_number + 1, 1)", Integer.TYPE) // keep a count of how many rows are found per config id + .`as`("row_number") + + field("@prev := config_id") + ) + .from(PIPELINE.tableName) + .join(jooq.select(vars)) + .on(baseQueryPredicate) + .orderBy(field("config_id"), field("id").desc()) + ) + .where(field("row_number").le(criteria.pageSize)) // filter using the count maintained above + ) + .leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id")) + .fetch() + + log.info("getting stage information for all the execution ids found") + return ExecutionMapper(mapper, stageReadSize,compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq) + } + override fun retrievePipelinesForPipelineConfigId( pipelineConfigId: String, criteria: ExecutionCriteria diff 
--git a/orca-web/orca-web.gradle b/orca-web/orca-web.gradle index 820fa18142..88417858c8 100644 --- a/orca-web/orca-web.gradle +++ b/orca-web/orca-web.gradle @@ -94,6 +94,9 @@ dependencies { testImplementation("io.strikt:strikt-core") testImplementation("io.mockk:mockk") testImplementation("org.apache.groovy:groovy-json") + testImplementation("com.nhaarman:mockito-kotlin") + testImplementation("io.spinnaker.kork:kork-sql-test") + testImplementation("org.testcontainers:mysql") testImplementation ("com.squareup.retrofit2:retrofit-mock") } diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index 29f4f538a4..e50ed605e3 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -603,35 +603,72 @@ class TaskController { statuses: (statuses.split(",") as Collection) ) + // get all relevant pipeline and strategy configs from front50 def pipelineConfigIds = front50Service.getPipelines(application, false)*.id as List log.info("received ${pipelineConfigIds.size()} pipelines for application: $application from front50") def strategyConfigIds = front50Service.getStrategies(application)*.id as List log.info("received ${strategyConfigIds.size()} strategies for application: $application from front50") - def allIds = pipelineConfigIds + strategyConfigIds - - List commonPipelineConfigIds + def allFrontIds = pipelineConfigIds + strategyConfigIds + + List allPipelineExecutions = [] + + // this optimized flow is meant to speed up the execution retrieval process per pipeline config id. It works in two + // steps: + // 1. it first compares the list of pipeline ids obtained from front50 with what is stored in the orca db itself. + // There is no need to process config ids that have no executions associated with them. The absence of + // a pipeline config id from the orca db indicates the same. So to reduce the number of config ids to process, we + // intersect the result obtained from front50 and orca db, which gives us the reduced list. Note: this could be + // further optimized by cutting front50 out of the picture completely, but it is unclear what other side effects + // that may cause, so we go ahead with the above logic. + // + // 2. We then process n pipeline configs at a time from this reduced set, since processing one config at a + // time proved to be time-consuming for applications with loads of pipelines and executions. In addition to this, + // we also make use of a thread pool, so we can parallelize the processing of multiple such batches. By doing + // this, we move away from the Rx Observable logic that was defined previously.
if (this.configurationProperties.getOptimizeExecutionRetrieval()) { - log.info("running optimized execution retrieval process") + log.info("running optimized execution retrieval process with: " + + "${this.configurationProperties.getMaxExecutionRetrievalThreads()} threads and processing" + + " ${this.configurationProperties.getMaxNumberOfPipelineConfigIdsToProcess()} pipeline config ids at a time") + + List commonIdsInFront50AndOrca try { - List pipelineConfigIdsInOrca = executionRepository.retrievePipelineConfigIdsForApplication(application) - log.info("found ${pipelineConfigIdsInOrca.size()} pipeline config ids for application: $application in orca") - commonPipelineConfigIds = allIds.intersect(pipelineConfigIdsInOrca) - log.info("found ${commonPipelineConfigIds.size()} pipeline config ids that are common in orca and front50 " + + List allOrcaIds = executionRepository.retrievePipelineConfigIdsForApplication(application) + log.info("found ${allOrcaIds.size()} pipeline config ids for application: $application in orca") + commonIdsInFront50AndOrca = allFrontIds.intersect(allOrcaIds) + log.info("found ${commonIdsInFront50AndOrca.size()} pipeline config ids that are common in orca and front50 " + "for application: $application." + - " Saved ${allIds.size() - commonPipelineConfigIds.size()} extra pipeline config id queries") + " Saved ${allFrontIds.size() - commonIdsInFront50AndOrca.size()} extra pipeline config id queries") } catch (Exception e) { log.warn("retrieving pipeline config ids from orca db failed. using the result obtained from front50 ", e) - commonPipelineConfigIds = allIds + commonIdsInFront50AndOrca = allFrontIds + } + def executor = Executors.newFixedThreadPool(this.configurationProperties.getMaxExecutionRetrievalThreads(), + new ThreadFactoryBuilder() + .setNameFormat("taskcontroller" + "-%d") + .build()) + def futures = new ArrayList(commonIdsInFront50AndOrca.size()) + + log.info("processing ${commonIdsInFront50AndOrca.size()} pipeline config ids") + commonIdsInFront50AndOrca + .collate(this.configurationProperties.getMaxNumberOfPipelineConfigIdsToProcess()) + .each { + def chunkedList = it + futures.add( + executor.submit({ + executionRepository.retrievePipelineExecutionsForApplication(application, chunkedList, executionCriteria) + } as Callable)) + } + futures.each { + allPipelineExecutions.addAll(it.get()) } } else { - commonPipelineConfigIds = allIds + allPipelineExecutions = rx.Observable.merge(allFrontIds.collect { + log.debug("processing pipeline config id: $it") + executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) + }).subscribeOn(Schedulers.io()).toList().toBlocking().single() } - List allPipelineExecutions = rx.Observable.merge(commonPipelineConfigIds.collect { - log.info("processing pipeline config id: $it") - executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) - }).subscribeOn(Schedulers.io()).toList().toBlocking().single().sort(startTimeOrId) - + allPipelineExecutions.sort(startTimeOrId) if (!expand) { log.info("unexpanding pipeline executions") unexpandPipelineExecutions(allPipelineExecutions) diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java index 36c4f56d58..e96c692ad4 100644 --- a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java +++ 
b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -31,6 +31,23 @@ public class TaskControllerConfigurationProperties { */ boolean optimizeExecutionRetrieval = false; + /** + * Only applicable if optimizeExecutionRetrieval = true. It specifies how many threads should + * process the queries to retrieve the executions. This needs to be tuned appropriately, since it + * has the potential to exhaust the connection pool size for the database. + */ + int maxExecutionRetrievalThreads = 10; + + /** + * Only applicable if optimizeExecutionRetrieval = true. It specifies how many pipeline config ids + * should be processed at a time. The default of 15 pipeline config ids was selected after testing + * this number against an orca sql db that contained lots of pipelines and executions for a single + * application (about 1200 pipelines and 1000 executions). More than 15 resulted in a query that + * took too long to complete. It still has to be tuned per installation, though, since some + * applications easily handle 50 config ids while others do not. + */ + int maxNumberOfPipelineConfigIdsToProcess = 15; + /** Moved here; previously defined in the {@link TaskController} class. */ int daysOfExecutionHistory = 14; @@ -45,4 +62,8 @@ public boolean getOptimizeExecutionRetrieval() { public void setOptimizeExecutionRetrieval(boolean optimizeExecutionRetrieval) { this.optimizeExecutionRetrieval = optimizeExecutionRetrieval; } + + public int getDaysOfExecutionHistory() { + return this.daysOfExecutionHistory; + } } diff --git a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy index c5cf759312..74020cf33b 100644 --- a/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy +++ b/orca-web/src/test/groovy/com/netflix/spinnaker/orca/controllers/TaskControllerSpec.groovy @@ -63,8 +63,9 @@ class TaskControllerSpec extends Specification { def registry = new NoopRegistry() def clock = Clock.fixed(Instant.now(), UTC) - int daysOfExecutionHistory = 14 + def taskControllerConfigurationProperties = new TaskControllerConfigurationProperties() + int daysOfExecutionHistory = taskControllerConfigurationProperties.getDaysOfExecutionHistory() ObjectMapper objectMapper = OrcaObjectMapper.newInstance() void setup() { @@ -80,7 +81,7 @@ mapper, registry, - new TaskControllerConfigurationProperties() + taskControllerConfigurationProperties ) ).build() } diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt new file mode 100644 index 0000000000..bb0ed39cf6 --- /dev/null +++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt @@ -0,0 +1,237 @@ +/* + * Copyright 2021 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.controllers + +import com.fasterxml.jackson.core.type.TypeReference +import com.netflix.spectator.api.NoopRegistry +import com.netflix.spinnaker.config.ExecutionCompressionProperties +import com.netflix.spinnaker.config.TaskControllerConfigurationProperties +import com.netflix.spinnaker.kork.sql.config.RetryProperties +import com.netflix.spinnaker.kork.sql.test.SqlTestUtil +import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution +import com.netflix.spinnaker.orca.front50.Front50Service +import com.netflix.spinnaker.orca.jackson.OrcaObjectMapper +import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor +import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository +import com.nhaarman.mockito_kotlin.mock +import dev.minutest.junit.JUnit5Minutests +import dev.minutest.rootContext +import org.jooq.impl.DSL.field +import org.jooq.impl.DSL.table +import org.mockito.Mockito +import org.springframework.test.web.servlet.MockMvc +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get +import org.springframework.test.web.servlet.setup.MockMvcBuilders +import strikt.api.expectThat +import strikt.assertions.isEqualTo +import java.time.Clock +import java.time.Instant +import java.time.ZoneId +import java.time.temporal.ChronoUnit + +class TaskControllerTest : JUnit5Minutests { + data class Fixture(val optimizeExecution: Boolean) { + + private val clock: Clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()) + val database: SqlTestUtil.TestDatabase = SqlTestUtil.initTcMysqlDatabase()!! + + + + private val executionRepository: SqlExecutionRepository = SqlExecutionRepository( + partitionName = "test", + jooq = database.context, + mapper = OrcaObjectMapper.getInstance(), + retryProperties = RetryProperties(), + compressionProperties = ExecutionCompressionProperties(), + pipelineRefEnabled = false + ) + + private val taskControllerConfigurationProperties: TaskControllerConfigurationProperties = TaskControllerConfigurationProperties() + .apply { optimizeExecutionRetrieval = optimizeExecution } + + private val daysOfExecutionHistory: Long = taskControllerConfigurationProperties.daysOfExecutionHistory.toLong() + + private val front50Service: Front50Service = mock() + + private val taskController: TaskController = TaskController( + front50Service, + executionRepository, + mock(), + mock(), + listOf(mock()), + ContextParameterProcessor(), + mock(), + OrcaObjectMapper.getInstance(), + NoopRegistry(), + mock(), + taskControllerConfigurationProperties + ) + + val subject: MockMvc = MockMvcBuilders.standaloneSetup(taskController).build() + + fun setup() { + database.context + .insertInto(table("pipelines"), + listOf( + field("config_id"), + field("id"), + field("application"), + field("build_time"), + field("start_time"), + field("body"), + field("status") + )) + .values( + listOf( + "1", + "1-exec-id-1", + "test-app", + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(120, ChronoUnit.MINUTES).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(120, ChronoUnit.HOURS).toEpochMilli(), + "{\"id\": \"1-exec-id-1\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"1\"}", + "SUCCEEDED" + ) + ) + .values( + listOf( + "1", + "1-exec-id-2", + "test-app", + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(115, 
ChronoUnit.MINUTES).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(115, ChronoUnit.MINUTES).toEpochMilli(), + "{\"id\": \"1-exec-id-2\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"1\"}", + "TERMINAL" + ) + ) + .values( + listOf( + "1", + "1-exec-id-3", + "test-app", + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(114, ChronoUnit.MINUTES).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(114, ChronoUnit.MINUTES).toEpochMilli(), + "{\"id\": \"1-exec-id-3\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"1\"}", + "RUNNING" + ) + ) + .values( + listOf( + "2", + "2-exec-id-1", + "test-app", + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(2, ChronoUnit.HOURS).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory, ChronoUnit.DAYS).minus(2, ChronoUnit.HOURS).toEpochMilli(), + "{\"id\": \"2-exec-id-1\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"2\"}", + "NOT_STARTED" + ) + ) + .values( + listOf( + "3", + "3-exec-id-1", + "test-app-2", + clock.instant().minus(daysOfExecutionHistory + 1, ChronoUnit.DAYS).minus(2, ChronoUnit.HOURS).toEpochMilli(), + clock.instant().minus(daysOfExecutionHistory + 1, ChronoUnit.DAYS).minus(2, ChronoUnit.HOURS).toEpochMilli(), + "{\"id\": \"3-exec-id-1\", \"type\": \"PIPELINE\", \"pipelineConfigId\": \"3\"}", + "STOPPED" + ) + ) + .execute() + Mockito.`when`(front50Service.getPipelines("test-app", false)) + .thenReturn( + listOf( + mapOf("id" to "1"), + mapOf("id" to "2")), + ) + + Mockito.`when`(front50Service.getStrategies("test-app")) + .thenReturn(listOf()) + } + + fun cleanUp() { + SqlTestUtil.cleanupDb(database.context) + } + } + + fun tests() = rootContext { + context("execution retrieval without optimization") { + fixture { + Fixture(false) + } + + before { setup() } + after { cleanUp() } + + test("retrieve executions with limit = 2 & expand = false") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get("/applications/test-app/pipelines?limit=2&expand=false")).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference>() {}) + val expectedOutput = listOf("1-exec-id-2", "1-exec-id-3","2-exec-id-1") + expectThat(results.size).isEqualTo(3) + results.forEach { + assert(it.id in expectedOutput) + } + } + + test("retrieve executions with limit = 2 & expand = false with statuses") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get( + "/applications/test-app/pipelines?limit=2&expand=false&statuses=RUNNING,SUSPENDED,PAUSED,NOT_STARTED") + ).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference>() {}) + val expectedOutput = listOf("1-exec-id-3","2-exec-id-1") + expectThat(results.size).isEqualTo(2) + results.forEach { + assert(it.id in expectedOutput) + } + } + } + + context("execution retrieval with optimization") { + fixture { + Fixture(true) + } + + before { setup() } + after { cleanUp() } + + test("retrieve executions with limit = 2 & expand = false") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get("/applications/test-app/pipelines?limit=2&expand=false")).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference>() {}) + val 
expectedOutput = listOf("1-exec-id-2", "1-exec-id-3","2-exec-id-1") + expectThat(results.size).isEqualTo(3) + results.forEach { + assert(it.id in expectedOutput) + } + } + + test("retrieve executions with limit = 2 & expand = false with statuses") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get( + "/applications/test-app/pipelines?limit=2&expand=false&statuses=RUNNING,SUSPENDED,PAUSED,NOT_STARTED") + ).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference>() {}) + val expectedOutput = listOf("1-exec-id-3","2-exec-id-1") + expectThat(results.size).isEqualTo(2) + results.forEach { + assert(it.id in expectedOutput) + } + } + } + } +} From 4aee645f441d0786b7c6b4197e97eadac1f3a3d8 Mon Sep 17 00:00:00 2001 From: Apoorv Mahajan Date: Wed, 16 Jun 2021 10:49:57 +0530 Subject: [PATCH 3/8] refactor(taskController): rework execution retrieval query optimizations --- .../persistence/DualExecutionRepository.kt | 20 ++- .../persistence/ExecutionRepository.java | 6 +- .../InMemoryExecutionRepository.kt | 32 ++-- .../jedis/RedisExecutionRepository.java | 43 +++-- .../persistence/SqlExecutionRepository.kt | 153 ++++++++++-------- .../orca/controllers/TaskController.groovy | 137 ++++++++++------ ...TaskControllerConfigurationProperties.java | 21 ++- 7 files changed, 250 insertions(+), 162 deletions(-) diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index 90164ef189..20be61abc9 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -30,6 +30,7 @@ import org.springframework.context.ApplicationContext import org.springframework.context.annotation.Primary import org.springframework.stereotype.Component import rx.Observable +import javax.annotation.Nonnull /** * Intended for performing red/black Orca deployments which do not share the @@ -209,14 +210,19 @@ class DualExecutionRepository( ).distinct { it.id } } - override fun retrievePipelineExecutionsForApplication( - application: String, - pipelineConfigIds: List, - criteria: ExecutionCriteria - ): Collection { + override fun filterPipelineExecutionsForApplication(@Nonnull application: String, + @Nonnull pipelineConfigIds: List, + @Nonnull criteria: ExecutionCriteria): List{ + return primary.filterPipelineExecutionsForApplication(application, pipelineConfigIds, criteria) + + previous.filterPipelineExecutionsForApplication(application,pipelineConfigIds, criteria) + } + + override fun retrievePipelineExecutionsDetailsForApplication( + @Nonnull application: String, + pipelineConfigIds: List): Collection { return ( - primary.retrievePipelineExecutionsForApplication(application, pipelineConfigIds, criteria) + - previous.retrievePipelineExecutionsForApplication(application, pipelineConfigIds, criteria) + primary.retrievePipelineExecutionsDetailsForApplication(application, pipelineConfigIds) + + previous.retrievePipelineExecutionsDetailsForApplication(application,pipelineConfigIds) ).distinctBy { it.id } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java 
b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 550de7967e..43b6ee04f7 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -96,11 +96,15 @@ Observable retrievePipelinesForPipelineConfigId( Collection retrievePipelineConfigIdsForApplication(@Nonnull String application); @Nonnull - Collection retrievePipelineExecutionsForApplication( + Collection filterPipelineExecutionsForApplication( @Nonnull String application, @Nonnull List pipelineConfigIds, @Nonnull ExecutionCriteria criteria); + @Nonnull + Collection retrievePipelineExecutionsDetailsForApplication( + @Nonnull String application, @Nonnull List pipelineConfigIds); + /** * Returns executions in the time boundary. Redis impl does not respect pageSize or offset params, * and returns all executions. Sql impl respects these params. diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 52b286a7de..1537a4f71a 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -23,10 +23,11 @@ import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionComparator import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository.ExecutionCriteria +import rx.Observable import java.lang.System.currentTimeMillis import java.time.Instant import java.util.concurrent.ConcurrentHashMap -import rx.Observable +import javax.annotation.Nonnull class InMemoryExecutionRepository : ExecutionRepository { @@ -276,17 +277,6 @@ class InMemoryExecutionRepository : ExecutionRepository { ) } - override fun retrievePipelineExecutionsForApplication( - application: String, - pipelineConfigIds: List, - criteria: ExecutionCriteria - ): Collection { - return pipelines.values - .filter { it.pipelineConfigId in pipelineConfigIds && it.application == application } - .applyCriteria(criteria) - .distinctBy { it.id } - } - override fun retrievePipelineConfigIdsForApplication(application: String): List { return pipelines.values .filter { it.application == application } @@ -294,6 +284,24 @@ class InMemoryExecutionRepository : ExecutionRepository { .distinct() } + override fun filterPipelineExecutionsForApplication(@Nonnull application: String, + @Nonnull pipelineConfigIds: List, + @Nonnull criteria: ExecutionCriteria): List { + return pipelines.values + .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } + .applyCriteria(criteria) + .map { it.id } + } + + override fun retrievePipelineExecutionsDetailsForApplication( + application: String, + pipelineConfigIds: List): Collection { + return pipelines.values + .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } + .distinctBy { it.id } + } + + override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution { return retrieveByCorrelationId(ORCHESTRATION, correlationId) } diff --git 
a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index 62bf8a7749..e77deed8a8 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -374,15 +374,6 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet idsToDelete.forEach(id -> delete(type, id)); } - @Override - public @Nonnull List retrievePipelineConfigIdsForApplication( - @Nonnull String application) { - // TODO: not implemented yet - this method, at present, is primarily meant for the - // SqlExecutionRepository - // implementation. - return List.of(); - } - @Override public @Nonnull Observable retrievePipelinesForApplication( @Nonnull String application) { @@ -490,14 +481,32 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet } @Override - public @NotNull List retrievePipelineExecutionsForApplication( - @NotNull String application, - @NotNull List pipelineConfigIds, - @NotNull ExecutionCriteria executionCriteria) { - List> output = new ArrayList<>(); - pipelineConfigIds.forEach( - configId -> output.add(retrievePipelinesForPipelineConfigId(configId, executionCriteria))); - return Observable.merge(output).subscribeOn(Schedulers.io()).toList().toBlocking().single(); + public @Nonnull List retrievePipelineConfigIdsForApplication( + @Nonnull String application) { + // TODO: not implemented yet - this method, at present, is primarily meant for the + // SqlExecutionRepository + // implementation. + return List.of(); + } + + @Override + public @Nonnull List filterPipelineExecutionsForApplication( + @Nonnull String application, + @Nonnull List pipelineConfigIds, + @Nonnull ExecutionCriteria criteria) { + // TODO: not implemented yet - this method, at present, is primarily meant for the + // SqlExecutionRepository + // implementation. + return List.of(); + } + + @Override + public @NotNull List retrievePipelineExecutionsDetailsForApplication( + @Nonnull String application, @NotNull List pipelineExecutionIds) { + // TODO: not implemented yet - this method, at present, is primarily meant for the + // SqlExecutionRepository + // implementation. 
+ return List.of(); } /* diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 606cf0a4d9..243e3de5b5 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -81,6 +81,7 @@ import java.io.ByteArrayOutputStream import java.lang.System.currentTimeMillis import java.nio.charset.StandardCharsets import java.security.SecureRandom +import java.util.stream.Collectors.toList import kotlin.collections.Collection import kotlin.collections.Iterable import kotlin.collections.Iterator @@ -453,66 +454,46 @@ class SqlExecutionRepository( override fun retrievePipelineConfigIdsForApplication(application: String): List = withPool(poolName) { return jooq.selectDistinct(field("config_id")) - .from(PIPELINE.tableName) + .from(PIPELINE.tableName) // not adding index here as it slowed down the query .where(field("application").eq(application)) .fetch(0, String::class.java) } /** - * this function is meant to support only the following ExecutionCriteria: + * this function supports the following ExecutionCriteria currently: * 'limit', a.k.a page size and * 'statuses'. * - * It runs two queries: - * 1. get 'limit' number of executions for n pipeline config ids at a time. - * 2. get all the stage information for all the executions returned from 1. + * It executes the following query to determine how many pipeline executions exist that satisfy the above + * ExecutionCriteria. It then returns a list of all these execution ids. * - * The first query with only limit execution criteria looks like this: - select id, body, compressed_body, compression_type, `partition` from ( - select config_id, id, body, application, `partition` from ( - select config_id, id, body, application, `partition`, - @row_number := IF(@prev = config_id, @row_number + 1, 1) AS row_number, - @prev := config_id // here prev is used to store a config id. 
row number is incremented based on how many rows are found that have the same config id - from pipelines - JOIN (SELECT @prev := NULL, @row_number := 0) AS vars) - on (application = "myapp" and config_id in ()) - order by config_id, id desc - ) as limited_executions where row_number <= limit - ) as configs left outer join pipelines_compressed_executions using (`id`); - - - * The first query with limit and status execution criteria looks like this: - select id, body, compressed_body, compression_type, `partition` from ( - select config_id, id, body, application, status, `partition` from ( - select config_id, id, body, application, status, `partition`, - @row_number := IF(@prev = config_id, @row_number + 1, 1) AS row_number, - @prev := config_id - from pipelines - JOIN (SELECT @prev := NULL, @row_number := 0) AS vars) - on (application = "myapp" and config_id in () and status in ()) - order by config_id, id desc - ) as limited_executions where row_number <=2 - ) as configs left outer join pipelines_compressed_executions using (`id`); + * It does this by executing the following query: + * - If the execution criteria does not contain any statuses: + * SELECT config_id, id + FROM pipelines force index (`pipeline_config_id_idx`) + WHERE application = "myapp" + ORDER BY + config_id; + * - If the execution criteria contains statuses: + * SELECT config_id, id + FROM pipelines force index (`pipeline_config_id_idx`) + WHERE ( + application = "myapp" and + status in ("status1", "status2") + ) + ORDER BY + config_id; + + * It then applies the limit execution criteria on the result set obtained above. We observed load issues in the db + * when running a query where the limit was calculated in the query itself . Thereforce, we are moving that logic to + * the code below to ease the burden on the db in such circumstances.
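For illustration, the in-code limit handling described above amounts to keeping only the newest 'pageSize' ids per config id group. A minimal Java sketch (names are illustrative; it assumes each group's ids arrive ordered oldest to newest, which is what the skip-based implementation below also relies on):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class LimitInCodeSketch {
  // Keep only the newest 'pageSize' execution ids per pipeline config id.
  static List<String> applyLimit(Map<String, List<String>> idsByConfigId, int pageSize) {
    List<String> kept = new ArrayList<>();
    for (List<String> ids : idsByConfigId.values()) {
      int from = Math.max(0, ids.size() - pageSize); // drop the oldest overflow rows
      kept.addAll(ids.subList(from, ids.size()));
    }
    return kept;
  }
}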
*/ - override fun retrievePipelineExecutionsForApplication( - application: String, - pipelineConfigIds: List<String>, - criteria: ExecutionCriteria - ): Collection<PipelineExecution> { - // list of relevant fields to be returned - val innerFields = listOf(field("config_id"), - field("id"), - field("body"), - field("application"), - field("status"), - field(name("partition"))) - - val vars = listOf(field("@prev := NULL", Integer.TYPE), - field("@row_number := 0", Integer.TYPE) - ) + override fun filterPipelineExecutionsForApplication(application: String, + pipelineConfigIds: List<String>, + criteria: ExecutionCriteria): List<String> { // baseQueryPredicate for the flow where there are no statuses in the execution criteria - var baseQueryPredicate = field("application").eq(application) + var baseQueryPredicate = field("application").eq(application) .and(field("config_id").`in`(*pipelineConfigIds.toTypedArray())) // baseQueryPredicate for the flow with statuses @@ -522,28 +503,66 @@ class SqlExecutionRepository( .and(field("status").`in`(*statusStrings.toTypedArray())) } + val finalResult: MutableList<String> = mutableListOf() + log.info("getting execution ids") - val baseQuery = jooq.select(selectExecutionFields(compressionProperties)) - .from( - jooq.select(innerFields) - .from( - jooq.select(innerFields + - field("@row_number := IF(@prev = config_id, @row_number + 1, 1)", Integer.TYPE) // keep a count of how many rows are found per config id - .`as`("row_number") + - field("@prev := config_id") + withPool(poolName) { + val baseQuery = jooq.select(field("config_id"), field("id")) + .from( + if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_config_id_idx") + else PIPELINE.tableName + ) + .where(baseQueryPredicate) + .orderBy(field("config_id")) + .fetch().intoGroups("config_id", "id") + + baseQuery.forEach { + val count = it.value.size + if (criteria.pageSize < count) { + finalResult.addAll(it.value + .stream() + .skip((count - criteria.pageSize).toLong()) + .collect(toList()) as List<String> ) - .from(PIPELINE.tableName) - .join(jooq.select(vars)) - .on(baseQueryPredicate) - .orderBy(field("config_id"), field("id").desc()) - ) - .where(field("row_number").le(criteria.pageSize)) // filter using the count maintained above - ) - .leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id")) - .fetch() + } else { + finalResult.addAll(it.value as List<String>) + } + } + } + return finalResult + } + + /** + * It executes the following query to get execution details for n executions at a time in a specific application + * + * SELECT id, body, compressed_body, compression_type, `partition` + FROM pipelines + left outer join + pipelines_compressed_executions + using (`id`) + WHERE ( + application = "" and + id in ('id1', 'id2', 'id3') + ); + * + * it then get all the stage information for all the executions returned from the above query.
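For reference, the details query documented above maps onto plain JDBC roughly as follows (a sketch using the schema named in the comment; the placeholder construction is illustrative and not the jOOQ code this patch actually adds):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.StringJoiner;

class DetailsQuerySketch {
  // Fetch execution bodies (and compressed bodies, if present) for a batch of execution ids.
  static ResultSet fetchBodies(Connection conn, String application, List<String> ids)
      throws SQLException {
    StringJoiner placeholders = new StringJoiner(", ");
    for (int i = 0; i < ids.size(); i++) {
      placeholders.add("?");
    }
    String sql =
        "SELECT id, body, compressed_body, compression_type, `partition` "
            + "FROM pipelines "
            + "LEFT OUTER JOIN pipelines_compressed_executions USING (`id`) "
            + "WHERE application = ? AND id IN (" + placeholders + ")";
    PreparedStatement stmt = conn.prepareStatement(sql);
    stmt.setString(1, application);
    for (int i = 0; i < ids.size(); i++) {
      stmt.setString(i + 2, ids.get(i));
    }
    return stmt.executeQuery();
  }
}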
+ */ + override fun retrievePipelineExecutionsDetailsForApplication( + application: String, + pipelineExecutions: List): Collection { + withPool(poolName) { + val baseQuery = jooq.select(selectExecutionFields(compressionProperties)) + .from(PIPELINE.tableName) + .leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id")) + .where( + field("application").eq(application) + .and(field("id").`in`(*pipelineExecutions.toTypedArray())) + ) + .fetch() - log.info("getting stage information for all the execution ids found") - return ExecutionMapper(mapper, stageReadSize,compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq) + log.info("getting stage information for all the executions found so far") + return ExecutionMapper(mapper, stageReadSize,compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq) + } } override fun retrievePipelinesForPipelineConfigId( diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index e50ed605e3..6bfd13c97c 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -46,6 +46,7 @@ import org.springframework.web.bind.annotation.* import rx.schedulers.Schedulers import java.util.concurrent.Callable +import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.nio.charset.Charset import java.time.Clock @@ -76,6 +77,8 @@ class TaskController { TaskControllerConfigurationProperties configurationProperties Clock clock + ExecutorService executorService + TaskController(@Nullable Front50Service front50Service, ExecutionRepository executionRepository, ExecutionRunner executionRunner, @@ -100,6 +103,10 @@ class TaskController { this.stageDefinitionBuilderFactory = stageDefinitionBuilderFactory this.configurationProperties = configurationProperties this.clock = Clock.systemUTC() + this.executorService = Executors.newFixedThreadPool(this.configurationProperties.getMaxExecutionRetrievalThreads(), + new ThreadFactoryBuilder() + .setNameFormat("taskcontroller" + "-%d") + .build()) } @PreAuthorize("hasPermission(#application, 'APPLICATION', 'READ')") @@ -608,61 +615,16 @@ class TaskController { log.info("received ${pipelineConfigIds.size()} pipelines for application: $application from front50") def strategyConfigIds = front50Service.getStrategies(application)*.id as List log.info("received ${strategyConfigIds.size()} strategies for application: $application from front50") - def allFrontIds = pipelineConfigIds + strategyConfigIds + def allFront50Ids = pipelineConfigIds + strategyConfigIds List allPipelineExecutions = [] - // this optimized flow is meant to speed up the execution retrieval process per pipeline config id. It does it in two - // steps: - // 1. it first compares the list of pipeline ids obtained from front50 with what is stored in orca db itself. - // There is no need to process config ids have no executions associated with them. The absence of - // a pipeline config id from the orca db indicates the same. So to reduce the number of config ids to process, we - // intersect the result obtained from front50 and orca db, which gives us the reduced list. Note: this could be - // further optimized by cutting front50 out from the picture completely. 
But I do not know what other side-effects - // that may cause, so am going ahead with the above logic. - // - // 2. We now process n pipeline configs at a time from this reduced set, since processing one config at a - // time was proving to be time consuming for applications with loads of pipelines and executions. In additon to this, - // we also make use of a thread pool to do this, so we can parallelize processing of multiple such batches. By doing - // this, we move away from the Rx Observable logic that was defined previously. if (this.configurationProperties.getOptimizeExecutionRetrieval()) { - log.info("running optimized execution retrieval process with: " + - "${this.configurationProperties.getMaxExecutionRetrievalThreads()} threads and processing" + - " ${this.configurationProperties.getMaxNumberOfPipelineConfigIdsToProcess()} pipeline config ids at a time") - - List commonIdsInFront50AndOrca - try { - List allOrcaIds = executionRepository.retrievePipelineConfigIdsForApplication(application) - log.info("found ${allOrcaIds.size()} pipeline config ids for application: $application in orca") - commonIdsInFront50AndOrca = allFrontIds.intersect(allOrcaIds) - log.info("found ${commonIdsInFront50AndOrca.size()} pipeline config ids that are common in orca and front50 " + - "for application: $application." + - " Saved ${allFrontIds.size() - commonIdsInFront50AndOrca.size()} extra pipeline config id queries") - } catch (Exception e) { - log.warn("retrieving pipeline config ids from orca db failed. using the result obtained from front50 ", e) - commonIdsInFront50AndOrca = allFrontIds - } - def executor = Executors.newFixedThreadPool(this.configurationProperties.getMaxExecutionRetrievalThreads(), - new ThreadFactoryBuilder() - .setNameFormat("taskcontroller" + "-%d") - .build()) - def futures = new ArrayList(commonIdsInFront50AndOrca.size()) - - log.info("processing ${commonIdsInFront50AndOrca.size()} pipeline config ids") - commonIdsInFront50AndOrca - .collate(this.configurationProperties.getMaxNumberOfPipelineConfigIdsToProcess()) - .each { - def chunkedList = it - futures.add( - executor.submit({ - executionRepository.retrievePipelineExecutionsForApplication(application, chunkedList, executionCriteria) - } as Callable)) - } - futures.each { - allPipelineExecutions.addAll(it.get()) - } + allPipelineExecutions.addAll( + optimizedGetPipelineExecutions(application, allFront50Ids, executionCriteria) + ) } else { - allPipelineExecutions = rx.Observable.merge(allFrontIds.collect { + allPipelineExecutions = rx.Observable.merge(allFront50Ids.collect { log.debug("processing pipeline config id: $it") executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) }).subscribeOn(Schedulers.io()).toList().toBlocking().single() @@ -912,6 +874,81 @@ class TaskController { return false } + /** + * this optimized flow speeds up the execution retrieval process for all pipelines in an application. It + * does it in three steps: + *
<p>
+ * 1. It compares the list of pipeline config ids obtained from front50 with what is stored in the orca db itself. + * Rationale: We can ignore those pipeline config ids that have no executions. The absence of a pipeline config + * id from the orca db indicates the same. So to reduce the number of config ids to process, we + * intersect the result obtained from front50 and orca db, which gives us the reduced list. + * Note: this could be further optimized by cutting front50 out from the picture completely. + * But I do not know what other side-effects that may cause, hence I am going ahead with the above logic. + * + *
<p>
+ * 2. It then uses the list of pipeline config ids obtained from step 1 and gets all the valid executions + * associated with each one of them. The valid executions are found after applying the execution criteria. + * + *
<p>
+ * 3. It then processes n pipeline executions at a time to retrieve the complete execution details. In addition, + * we make use of a configured thread pool so that multiple batches of n executions can be processed parallelly. + */ + private List optimizedGetPipelineExecutions(String application, + List front50PipelineConfigIds, ExecutionCriteria executionCriteria) { + List finalResult = [] + log.info("running optimized execution retrieval process with: " + + "${this.configurationProperties.getMaxExecutionRetrievalThreads()} threads and processing" + + " ${this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()} pipeline executions at a time") + + List commonIdsInFront50AndOrca + try { + List allOrcaIds = executionRepository.retrievePipelineConfigIdsForApplication(application) + log.info("found ${allOrcaIds.size()} pipeline config ids for application: $application in orca") + commonIdsInFront50AndOrca = front50PipelineConfigIds.intersect(allOrcaIds) + log.info("found ${commonIdsInFront50AndOrca.size()} pipeline config ids that are common in orca and front50 " + + "for application: $application." + + " Saved ${front50PipelineConfigIds.size() - commonIdsInFront50AndOrca.size()} extra pipeline config id queries") + } catch (Exception e) { + log.warn("retrieving pipeline config ids from orca db failed. using the result obtained from front50 ", e) + commonIdsInFront50AndOrca = front50PipelineConfigIds + } + + if (commonIdsInFront50AndOrca.size() == 0 ) { + log.info("no pipelines found") + return finalResult + } + + // get complete list of executions based on the execution criteria + log.info("filtering pipeline executions based on the execution criteria: " + + "limit: ${executionCriteria.getPageSize()}, statuses: ${executionCriteria.getStatuses()}") + List filteredPipelineExecutions = executionRepository.filterPipelineExecutionsForApplication(application, + commonIdsInFront50AndOrca, + executionCriteria + ) + if (filteredPipelineExecutions.size() == 0) { + log.info("no pipeline executions found") + return finalResult + } + + + def futures = new ArrayList(filteredPipelineExecutions.size()) + + log.info("processing ${filteredPipelineExecutions.size()} pipeline executions") + filteredPipelineExecutions + .collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()) + .each { + def chunkedList = it + futures.add( + executorService.submit({ + executionRepository.retrievePipelineExecutionsDetailsForApplication(application, chunkedList) + } as Callable)) + } + futures.each { + finalResult.addAll(it.get()) + } + return finalResult + } + @InheritConstructors @ResponseStatus(HttpStatus.NOT_IMPLEMENTED) private static class FeatureNotEnabledException extends RuntimeException {} diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java index e96c692ad4..e252f01541 100644 --- a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -17,6 +17,7 @@ package com.netflix.spinnaker.config; import com.netflix.spinnaker.orca.controllers.TaskController; +import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ 
-27,7 +28,9 @@ public class TaskControllerConfigurationProperties { /** * flag to enable speeding up execution retrieval. This is applicable for the {@link - * TaskController#getPipelinesForApplication(String, int, String, Boolean)} endpoint + * TaskController#getPipelinesForApplication(String, int, String, Boolean)} endpoint. Only valid + * for {@link SqlExecutionRepository} currently. The implementation for all the other execution + * repositories needs to be added */ boolean optimizeExecutionRetrieval = false; @@ -36,17 +39,19 @@ public class TaskControllerConfigurationProperties { * process the queries to retrieve the executions. Needs to be tuned appropriately since this has * the potential to exhaust the connection pool size for the database. */ - int maxExecutionRetrievalThreads = 10; + int maxExecutionRetrievalThreads = 20; /** - * only applicable if optimizeExecutionRetrieval = true. It specifies how many pipeline config ids - * should be processed at a time. 15 pipeline config ids was selected as the default after testing + * only applicable if optimizeExecutionRetrieval = true. It specifies how many pipeline executions + * should be processed at a time. 30 pipeline executions was selected as the default after testing * this number against an orca sql db that contained lots of pipelines and executions for a single - * application (about 1200 pipelines and 1000 executions). More than 15 resulted in a query that - * took too long to complete. It will have to be tuned though, since 50 config ids work for some - * other applications easily but not for others. + * application (about 1200 pipelines and 1000 executions). Each execution was 1 MB or more in + * size. + * + *
<p>
It can be further tuned, depending on your setup, since 30 executions work well for some + * applications but a higher number may be appropriate for others. */ - int maxNumberOfPipelineConfigIdsToProcess = 15; + int maxNumberOfPipelineExecutionsToProcess = 30; /** moved this to here. Earlier definition was in the {@link TaskController} class */ int daysOfExecutionHistory = 14; From 471f2f9073f3f3217963f1c6afdcde88cb8eb1bb Mon Sep 17 00:00:00 2001 From: Apoorv Mahajan Date: Thu, 17 Jun 2021 13:40:34 -0700 Subject: [PATCH 4/8] feat(taskController): have a dedicated executor service threadpool per request --- .../orca/controllers/TaskController.groovy | 72 +++++++++++++------ ...TaskControllerConfigurationProperties.java | 6 ++ .../orca/controllers/TaskControllerTest.kt | 35 ++++++++- 3 files changed, 87 insertions(+), 26 deletions(-) diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index 6bfd13c97c..f00acfa40e 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -45,12 +45,15 @@ import org.springframework.security.access.prepost.PreFilter import org.springframework.web.bind.annotation.* import rx.schedulers.Schedulers -import java.util.concurrent.Callable -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors import java.nio.charset.Charset import java.time.Clock import java.time.ZoneOffset +import java.util.concurrent.Callable +import java.util.concurrent.CancellationException +import java.util.concurrent.Executors +import java.util.concurrent.ExecutorService +import java.util.concurrent.Future +import java.util.concurrent.TimeoutException import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -77,8 +80,6 @@ class TaskController { TaskControllerConfigurationProperties configurationProperties Clock clock - ExecutorService executorService - TaskController(@Nullable Front50Service front50Service, ExecutionRepository executionRepository, ExecutionRunner executionRunner, @@ -103,10 +104,6 @@ class TaskController { this.stageDefinitionBuilderFactory = stageDefinitionBuilderFactory this.configurationProperties = configurationProperties this.clock = Clock.systemUTC() - this.executorService = Executors.newFixedThreadPool(this.configurationProperties.getMaxExecutionRetrievalThreads(), - new ThreadFactoryBuilder() - .setNameFormat("taskcontroller" + "-%d") - .build()) } @PreAuthorize("hasPermission(#application, 'APPLICATION', 'READ')") @@ -930,23 +927,52 @@ class TaskController { return finalResult } + // need to define a new executor service since we want a dedicated set of threads to be made available for every + // new request for performance reasons + ExecutorService executorService = Executors.newFixedThreadPool( + this.configurationProperties.getMaxExecutionRetrievalThreads(), + new ThreadFactoryBuilder() + .setNameFormat("application-" + application + "-%d") + .build()) - def futures = new ArrayList(filteredPipelineExecutions.size()) + try { + List>> futures = new ArrayList<>(filteredPipelineExecutions.size()) + log.info("processing ${filteredPipelineExecutions.size()} pipeline executions") + + // process a chunk of the executions at a time + filteredPipelineExecutions + .collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()) + .each { List 
chunkedExecutions -> + futures.add( + executorService.submit({ + List result = executionRepository.retrievePipelineExecutionsDetailsForApplication( + application, chunkedExecutions + ) + log.debug("completed execution retrieval for ${result.size()} executions") + return result + } as Callable>) + ) + } - log.info("processing ${filteredPipelineExecutions.size()} pipeline executions") - filteredPipelineExecutions - .collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()) - .each { - def chunkedList = it - futures.add( - executorService.submit({ - executionRepository.retrievePipelineExecutionsDetailsForApplication(application, chunkedList) - } as Callable)) - } - futures.each { - finalResult.addAll(it.get()) + futures.each { + Future> future -> + try { + finalResult.addAll( + future.get(this.configurationProperties.getExecutionRetrievalTimeoutSeconds(), TimeUnit.SECONDS) + ) + } catch (TimeoutException | CancellationException | InterruptedException e) { + log.warn("Task failed with unexpected error", e) + } + } + return finalResult + } finally { + // attempt to shutdown the executor service + try { + executorService.shutdownNow() + } catch (Exception e) { + log.warn("shutting down the executor service failed", e) + } } - return finalResult } @InheritConstructors diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java index e252f01541..4741dcb215 100644 --- a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -53,6 +53,12 @@ public class TaskControllerConfigurationProperties { */ int maxNumberOfPipelineExecutionsToProcess = 30; + /** + * only applicable if optimizeExecutionRetrieval = true. No retrieval thread should take more than + * 60s to complete. + */ + long executionRetrievalTimeoutSeconds = 60; + /** moved this to here. Earlier definition was in the {@link TaskController} class */ int daysOfExecutionHistory = 14; diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt index bb0ed39cf6..dc85b3e881 100644 --- a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt +++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt @@ -38,13 +38,14 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get import org.springframework.test.web.servlet.setup.MockMvcBuilders import strikt.api.expectThat import strikt.assertions.isEqualTo +import strikt.assertions.isTrue import java.time.Clock import java.time.Instant import java.time.ZoneId import java.time.temporal.ChronoUnit class TaskControllerTest : JUnit5Minutests { - data class Fixture(val optimizeExecution: Boolean) { + data class Fixture(val optimizeExecution: Boolean, val timeout: Double = 60.0) { private val clock: Clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()) val database: SqlTestUtil.TestDatabase = SqlTestUtil.initTcMysqlDatabase()!! 
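The per-request executor introduced in this patch follows a standard shape: build a small named pool, submit the batches, drain the futures with a timeout, and always tear the pool down. A hedged Java sketch (ThreadFactoryBuilder is Guava's, as imported earlier in the series; everything else here is illustrative rather than the exact controller code):

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PerRequestPoolSketch {
  // One dedicated, named pool per request; torn down unconditionally so threads never leak.
  static <T> List<T> runBatches(String application, int threads, long timeoutSeconds,
      List<Callable<List<T>>> batches) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(
        threads,
        new ThreadFactoryBuilder().setNameFormat("application-" + application + "-%d").build());
    try {
      List<Future<List<T>>> futures = new ArrayList<>();
      for (Callable<List<T>> batch : batches) {
        futures.add(pool.submit(batch));
      }
      List<T> results = new ArrayList<>();
      for (Future<List<T>> future : futures) {
        try {
          results.addAll(future.get(timeoutSeconds, TimeUnit.SECONDS));
        } catch (TimeoutException | CancellationException | ExecutionException e) {
          // a slow, cancelled or failed batch is skipped; partial results are acceptable here
        }
      }
      return results;
    } finally {
      pool.shutdownNow(); // per-request pool: always shut down
    }
  }
}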
@@ -61,7 +62,10 @@ class TaskControllerTest : JUnit5Minutests { ) private val taskControllerConfigurationProperties: TaskControllerConfigurationProperties = TaskControllerConfigurationProperties() - .apply { optimizeExecutionRetrieval = optimizeExecution } + .apply { + optimizeExecutionRetrieval = optimizeExecution + executionRetrievalTimeoutSeconds = timeout.toLong() + } private val daysOfExecutionHistory: Long = taskControllerConfigurationProperties.daysOfExecutionHistory.toLong() @@ -155,7 +159,7 @@ class TaskControllerTest : JUnit5Minutests { .thenReturn( listOf( mapOf("id" to "1"), - mapOf("id" to "2")), + mapOf("id" to "2")) ) Mockito.`when`(front50Service.getStrategies("test-app")) @@ -233,5 +237,30 @@ class TaskControllerTest : JUnit5Minutests { } } } + + context("execution retrieval with optimization having timeouts") { + fixture { + Fixture(true, 0.1) + } + + before { setup() } + after { cleanUp() } + + test("retrieve executions with limit = 2 & expand = false") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get("/applications/test-app/pipelines?limit=2&expand=false")).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference>() {}) + expectThat(results.isEmpty()).isTrue() + } + + test("retrieve executions with limit = 2 & expand = false with statuses") { + expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5) + val response = subject.perform(get( + "/applications/test-app/pipelines?limit=2&expand=false&statuses=RUNNING,SUSPENDED,PAUSED,NOT_STARTED") + ).andReturn().response + val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference>() {}) + expectThat(results.isEmpty()).isTrue() + } + } } } From 77d0b12483dac93fcf4c74674511936277559d22 Mon Sep 17 00:00:00 2001 From: kirangodishala Date: Fri, 18 Jun 2021 07:08:50 +0530 Subject: [PATCH 5/8] fix(sql): add correct indexes refactor function and variable names to make them more descriptive --- .../persistence/DualExecutionRepository.kt | 18 ++++--- .../persistence/ExecutionRepository.java | 4 +- .../InMemoryExecutionRepository.kt | 15 +++--- .../jedis/RedisExecutionRepository.java | 7 ++- .../persistence/SqlExecutionRepository.kt | 47 +++++++++-------- .../orca/controllers/TaskController.groovy | 51 ++++++++++--------- ...TaskControllerConfigurationProperties.java | 14 ++--- 7 files changed, 85 insertions(+), 71 deletions(-) diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index 20be61abc9..82db894cc4 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -210,19 +210,21 @@ class DualExecutionRepository( ).distinct { it.id } } - override fun filterPipelineExecutionsForApplication(@Nonnull application: String, - @Nonnull pipelineConfigIds: List, - @Nonnull criteria: ExecutionCriteria): List{ - return primary.filterPipelineExecutionsForApplication(application, pipelineConfigIds, criteria) + - previous.filterPipelineExecutionsForApplication(application,pipelineConfigIds, criteria) + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + @Nonnull application: String, + @Nonnull pipelineConfigIds: List, + 
@Nonnull criteria: ExecutionCriteria + ): List { + return primary.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria) + + previous.retrieveAndFilterPipelineExecutionIdsForApplication(application, pipelineConfigIds, criteria) } - override fun retrievePipelineExecutionsDetailsForApplication( + override fun retrievePipelineExecutionDetailsForApplication( @Nonnull application: String, pipelineConfigIds: List): Collection { return ( - primary.retrievePipelineExecutionsDetailsForApplication(application, pipelineConfigIds) + - previous.retrievePipelineExecutionsDetailsForApplication(application,pipelineConfigIds) + primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) + + previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) ).distinctBy { it.id } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 43b6ee04f7..6f2d65aeac 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -96,13 +96,13 @@ Observable retrievePipelinesForPipelineConfigId( Collection retrievePipelineConfigIdsForApplication(@Nonnull String application); @Nonnull - Collection filterPipelineExecutionsForApplication( + Collection retrieveAndFilterPipelineExecutionIdsForApplication( @Nonnull String application, @Nonnull List pipelineConfigIds, @Nonnull ExecutionCriteria criteria); @Nonnull - Collection retrievePipelineExecutionsDetailsForApplication( + Collection retrievePipelineExecutionDetailsForApplication( @Nonnull String application, @Nonnull List pipelineConfigIds); /** diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 1537a4f71a..51c1a5cb4d 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -279,21 +279,23 @@ class InMemoryExecutionRepository : ExecutionRepository { override fun retrievePipelineConfigIdsForApplication(application: String): List { return pipelines.values - .filter { it.application == application } + .filter { it.application == application } .map { it.pipelineConfigId } .distinct() } - override fun filterPipelineExecutionsForApplication(@Nonnull application: String, - @Nonnull pipelineConfigIds: List, - @Nonnull criteria: ExecutionCriteria): List { + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + @Nonnull application: String, + @Nonnull pipelineConfigIds: List, + @Nonnull criteria: ExecutionCriteria + ): List { return pipelines.values - .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } + .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } .applyCriteria(criteria) .map { it.id } } - override fun retrievePipelineExecutionsDetailsForApplication( + override fun retrievePipelineExecutionDetailsForApplication( application: String, pipelineConfigIds: List): Collection { return pipelines.values @@ -301,7 +303,6 @@ class 
InMemoryExecutionRepository : ExecutionRepository { .distinctBy { it.id } } - override fun retrieveOrchestrationForCorrelationId(correlationId: String): PipelineExecution { return retrieveByCorrelationId(ORCHESTRATION, correlationId) } diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index e77deed8a8..aef959dab8 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -55,7 +55,6 @@ import javax.annotation.Nonnull; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -490,7 +489,7 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet } @Override - public @Nonnull List filterPipelineExecutionsForApplication( + public @Nonnull List retrieveAndFilterPipelineExecutionIdsForApplication( @Nonnull String application, @Nonnull List pipelineConfigIds, @Nonnull ExecutionCriteria criteria) { @@ -501,8 +500,8 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet } @Override - public @NotNull List retrievePipelineExecutionsDetailsForApplication( - @Nonnull String application, @NotNull List pipelineExecutionIds) { + public @Nonnull List retrievePipelineExecutionDetailsForApplication( + @Nonnull String application, @Nonnull List pipelineExecutionIds) { // TODO: not implemented yet - this method, at present, is primarily meant for the // SqlExecutionRepository // implementation. diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 243e3de5b5..134fa6acb0 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -453,14 +453,14 @@ class SqlExecutionRepository( override fun retrievePipelineConfigIdsForApplication(application: String): List = withPool(poolName) { - return jooq.selectDistinct(field("config_id")) - .from(PIPELINE.tableName) // not adding index here as it slowed down the query - .where(field("application").eq(application)) - .fetch(0, String::class.java) + return jooq.selectDistinct(field("config_id")) + .from(PIPELINE.tableName) + .where(field("application").eq(application)) + .fetch(0, String::class.java) } /** - * this function supports the following ExecutionCriteria currently: + * this function supports the following ExecutionCriteria currently: * 'limit', a.k.a page size and * 'statuses'. 
* @@ -470,13 +470,13 @@ class SqlExecutionRepository( * It does this by executing the following query: * - If the execution criteria does not contain any statuses: * SELECT config_id, id - FROM pipelines force index (`pipeline_config_id_idx`) + FROM pipelines force index (`pipeline_application_idx`) WHERE application = "myapp" ORDER BY config_id; * - If the execution criteria contains statuses: * SELECT config_id, id - FROM pipelines force index (`pipeline_config_id_idx`) + FROM pipelines force index (`pipeline_application_status_starttime_idx`) WHERE ( application = "myapp" and status in ("status1", "status2") @@ -485,33 +485,37 @@ class SqlExecutionRepository( config_id; * It then applies the limit execution criteria on the result set obtained above. We observed load issues in the db - * when running a query where the limit was calculated in the query itself . Thereforce, we are moving that logic to + * when running a query where the limit was calculated in the query itself. Therefore, we are moving that logic to * the code below to ease the burden on the db in such circumstances. */ - override fun filterPipelineExecutionsForApplication(application: String, - pipelineConfigIds: List<String>, - criteria: ExecutionCriteria): List<String> { + override fun retrieveAndFilterPipelineExecutionIdsForApplication( + application: String, + pipelineConfigIds: List<String>, + criteria: ExecutionCriteria + ): List<String> { // baseQueryPredicate for the flow where there are no statuses in the execution criteria var baseQueryPredicate = field("application").eq(application) .and(field("config_id").`in`(*pipelineConfigIds.toTypedArray())) + var table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx") + else PIPELINE.tableName // baseQueryPredicate for the flow with statuses if (criteria.statuses.isNotEmpty() && criteria.statuses.size != ExecutionStatus.values().size) { val statusStrings = criteria.statuses.map { it.toString() } baseQueryPredicate = baseQueryPredicate .and(field("status").`in`(*statusStrings.toTypedArray())) + + table = if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_status_starttime_idx") + else PIPELINE.tableName } val finalResult: MutableList<String> = mutableListOf() log.info("getting execution ids") withPool(poolName) { - val baseQuery = jooq.select(field("config_id"), field("id")) - .from( - if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_config_id_idx") - else PIPELINE.tableName - ) + val baseQuery = jooq.select(field("config_id"), field("id")) + .from(table) .where(baseQueryPredicate) .orderBy(field("config_id")) .fetch().intoGroups("config_id", "id") @@ -536,7 +540,7 @@ class SqlExecutionRepository( * It executes the following query to get execution details for n executions at a time in a specific application * * SELECT id, body, compressed_body, compression_type, `partition` - FROM pipelines + FROM pipelines force index (`pipeline_application_idx`) left outer join pipelines_compressed_executions using (`id`) @@ -545,14 +549,17 @@ class SqlExecutionRepository( id in ('id1', 'id2', 'id3') ); * - * it then get all the stage information for all the executions returned from the above query. + * it then gets all the stage information for all the executions returned from the above query.
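In Java jOOQ, the dialect-guarded index hints this hunk introduces reduce to roughly the sketch below (index names as in the diff; only the MySQL dialect understands FORCE INDEX, so every other dialect falls back to the plain table):

import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;

import org.jooq.DSLContext;
import org.jooq.Record2;
import org.jooq.Result;
import org.jooq.SQLDialect;
import org.jooq.Table;

class ForceIndexSketch {
  // Pick the index hint that matches the WHERE clause actually being issued.
  static Result<Record2<Object, Object>> fetchConfigAndExecutionIds(
      DSLContext jooq, String application, boolean filteringByStatus) {
    String index = filteringByStatus
        ? "pipeline_application_status_starttime_idx"
        : "pipeline_application_idx";
    Table<?> pipelines = jooq.dialect() == SQLDialect.MYSQL
        ? table("pipelines").forceIndex(index)
        : table("pipelines");
    return jooq.select(field("config_id"), field("id"))
        .from(pipelines)
        .where(field("application").eq(application))
        .orderBy(field("config_id"))
        .fetch();
  }
}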
*/ - override fun retrievePipelineExecutionsDetailsForApplication( + override fun retrievePipelineExecutionDetailsForApplication( application: String, pipelineExecutions: List): Collection { withPool(poolName) { val baseQuery = jooq.select(selectExecutionFields(compressionProperties)) - .from(PIPELINE.tableName) + .from( + if (jooq.dialect() == SQLDialect.MYSQL) PIPELINE.tableName.forceIndex("pipeline_application_idx") + else PIPELINE.tableName + ) .leftOuterJoin(PIPELINE.tableName.compressedExecTable).using(field("id")) .where( field("application").eq(application) diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index f00acfa40e..abfa53da2b 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -612,16 +612,17 @@ class TaskController { log.info("received ${pipelineConfigIds.size()} pipelines for application: $application from front50") def strategyConfigIds = front50Service.getStrategies(application)*.id as List log.info("received ${strategyConfigIds.size()} strategies for application: $application from front50") - def allFront50Ids = pipelineConfigIds + strategyConfigIds + + def allFront50PipelineConfigIds = pipelineConfigIds + strategyConfigIds List allPipelineExecutions = [] if (this.configurationProperties.getOptimizeExecutionRetrieval()) { allPipelineExecutions.addAll( - optimizedGetPipelineExecutions(application, allFront50Ids, executionCriteria) + optimizedGetPipelineExecutions(application, allFront50PipelineConfigIds, executionCriteria) ) } else { - allPipelineExecutions = rx.Observable.merge(allFront50Ids.collect { + allPipelineExecutions = rx.Observable.merge(allFront50PipelineConfigIds.collect { log.debug("processing pipeline config id: $it") executionRepository.retrievePipelinesForPipelineConfigId(it, executionCriteria) }).subscribeOn(Schedulers.io()).toList().toBlocking().single() @@ -897,32 +898,34 @@ class TaskController { "${this.configurationProperties.getMaxExecutionRetrievalThreads()} threads and processing" + " ${this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()} pipeline executions at a time") - List commonIdsInFront50AndOrca + List commonPipelineConfigIdsInFront50AndOrca try { - List allOrcaIds = executionRepository.retrievePipelineConfigIdsForApplication(application) - log.info("found ${allOrcaIds.size()} pipeline config ids for application: $application in orca") - commonIdsInFront50AndOrca = front50PipelineConfigIds.intersect(allOrcaIds) - log.info("found ${commonIdsInFront50AndOrca.size()} pipeline config ids that are common in orca and front50 " + - "for application: $application." + - " Saved ${front50PipelineConfigIds.size() - commonIdsInFront50AndOrca.size()} extra pipeline config id queries") + List allOrcaPipelineConfigIds = executionRepository.retrievePipelineConfigIdsForApplication(application) + log.info("found ${allOrcaPipelineConfigIds.size()} pipeline config ids for application: $application in orca") + commonPipelineConfigIdsInFront50AndOrca = front50PipelineConfigIds.intersect(allOrcaPipelineConfigIds) + log.info("found ${commonPipelineConfigIdsInFront50AndOrca.size()} pipeline config ids that are common in orca " + + "and front50 for application: $application. 
" + + "Saved ${front50PipelineConfigIds.size() - commonPipelineConfigIdsInFront50AndOrca.size()} extra pipeline " + + "config id queries") } catch (Exception e) { log.warn("retrieving pipeline config ids from orca db failed. using the result obtained from front50 ", e) - commonIdsInFront50AndOrca = front50PipelineConfigIds + commonPipelineConfigIdsInFront50AndOrca = front50PipelineConfigIds } - if (commonIdsInFront50AndOrca.size() == 0 ) { - log.info("no pipelines found") + if (commonPipelineConfigIdsInFront50AndOrca.size() == 0 ) { + log.info("no pipeline config ids found.") return finalResult } // get complete list of executions based on the execution criteria log.info("filtering pipeline executions based on the execution criteria: " + "limit: ${executionCriteria.getPageSize()}, statuses: ${executionCriteria.getStatuses()}") - List filteredPipelineExecutions = executionRepository.filterPipelineExecutionsForApplication(application, - commonIdsInFront50AndOrca, + List filteredPipelineExecutionIds = executionRepository.retrieveAndFilterPipelineExecutionIdsForApplication( + application, + commonPipelineConfigIdsInFront50AndOrca, executionCriteria ) - if (filteredPipelineExecutions.size() == 0) { + if (filteredPipelineExecutionIds.size() == 0) { log.info("no pipeline executions found") return finalResult } @@ -936,16 +939,16 @@ class TaskController { .build()) try { - List>> futures = new ArrayList<>(filteredPipelineExecutions.size()) - log.info("processing ${filteredPipelineExecutions.size()} pipeline executions") + List>> futures = new ArrayList<>(filteredPipelineExecutionIds.size()) + log.info("processing ${filteredPipelineExecutionIds.size()} pipeline executions") // process a chunk of the executions at a time - filteredPipelineExecutions + filteredPipelineExecutionIds .collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()) .each { List chunkedExecutions -> futures.add( executorService.submit({ - List result = executionRepository.retrievePipelineExecutionsDetailsForApplication( + List result = executionRepository.retrievePipelineExecutionDetailsForApplication( application, chunkedExecutions ) log.debug("completed execution retrieval for ${result.size()} executions") @@ -960,8 +963,10 @@ class TaskController { finalResult.addAll( future.get(this.configurationProperties.getExecutionRetrievalTimeoutSeconds(), TimeUnit.SECONDS) ) - } catch (TimeoutException | CancellationException | InterruptedException e) { - log.warn("Task failed with unexpected error", e) + } catch (Exception e) { + // no need to fail the entire thing if one thread fails. This means the final output will simply not + // contain any of these failed executions. 
+ log.error("Task failed with error", e) } } return finalResult @@ -970,7 +975,7 @@ class TaskController { try { executorService.shutdownNow() } catch (Exception e) { - log.warn("shutting down the executor service failed", e) + log.error("shutting down the executor service failed", e) } } } diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java index 4741dcb215..a2ff97691a 100644 --- a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -39,19 +39,19 @@ public class TaskControllerConfigurationProperties { * process the queries to retrieve the executions. Needs to be tuned appropriately since this has * the potential to exhaust the connection pool size for the database. */ - int maxExecutionRetrievalThreads = 20; + int maxExecutionRetrievalThreads = 10; /** * only applicable if optimizeExecutionRetrieval = true. It specifies how many pipeline executions - * should be processed at a time. 30 pipeline executions was selected as the default after testing - * this number against an orca sql db that contained lots of pipelines and executions for a single - * application (about 1200 pipelines and 1000 executions). Each execution was 1 MB or more in - * size. + * should be processed at a time. 150 pipeline executions was selected as the default after + * testing this number against an orca sql db that contained lots of pipelines and executions for + * a single application (about 1200 pipelines and 1500 executions). Each execution was 1 MB or + * more in size. * - *
<p>
It can be further tuned, depending on your setup, since 30 executions work well for some + *
<p>
It can be further tuned, depending on your setup, since 150 executions work well for some * applications but a higher number may be appropriate for others. */ - int maxNumberOfPipelineExecutionsToProcess = 30; + int maxNumberOfPipelineExecutionsToProcess = 150; /** * only applicable if optimizeExecutionRetrieval = true. No retrieval thread should take more than From 3b3894bfe42164381eaddaf8c2cc3997a0912d05 Mon Sep 17 00:00:00 2001 From: Apoorv Mahajan Date: Tue, 22 Jun 2021 14:24:57 -0700 Subject: [PATCH 6/8] feat(taskController): add query timeouts when retrieving execution body --- .../persistence/DualExecutionRepository.kt | 8 +-- .../persistence/ExecutionRepository.java | 4 +- .../InMemoryExecutionRepository.kt | 4 +- .../jedis/RedisExecutionRepository.java | 4 +- .../persistence/SqlExecutionRepository.kt | 5 +- .../orca/controllers/TaskController.groovy | 49 +++++++++---------- ...TaskControllerConfigurationProperties.java | 6 +-- .../orca/controllers/TaskControllerTest.kt | 36 ++++++-------- 8 files changed, 59 insertions(+), 57 deletions(-) diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt index 82db894cc4..cc32aa958b 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/DualExecutionRepository.kt @@ -221,10 +221,12 @@ class DualExecutionRepository( override fun retrievePipelineExecutionDetailsForApplication( @Nonnull application: String, - pipelineConfigIds: List): Collection { + pipelineConfigIds: List, + queryTimeoutSeconds: Int + ): Collection { return ( - primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) + - previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds) + primary.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds) + + previous.retrievePipelineExecutionDetailsForApplication(application, pipelineConfigIds, queryTimeoutSeconds) ).distinctBy { it.id } } diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java index 6f2d65aeac..4cfe657aef 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/ExecutionRepository.java @@ -103,7 +103,9 @@ Collection retrieveAndFilterPipelineExecutionIdsForApplication( @Nonnull Collection retrievePipelineExecutionDetailsForApplication( - @Nonnull String application, @Nonnull List pipelineConfigIds); + @Nonnull String application, + @Nonnull List pipelineConfigIds, + int queryTimeoutSeconds); /** * Returns executions in the time boundary. 
Redis impl does not respect pageSize or offset params, diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt index 51c1a5cb4d..96ee40242d 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/InMemoryExecutionRepository.kt @@ -297,7 +297,9 @@ class InMemoryExecutionRepository : ExecutionRepository { override fun retrievePipelineExecutionDetailsForApplication( application: String, - pipelineConfigIds: List): Collection { + pipelineConfigIds: List, + queryTimeoutSeconds: Int + ): Collection { return pipelines.values .filter { it.application == application && pipelineConfigIds.contains(it.pipelineConfigId) } .distinctBy { it.id } diff --git a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java index aef959dab8..bfc6070edb 100644 --- a/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java +++ b/orca-redis/src/main/java/com/netflix/spinnaker/orca/pipeline/persistence/jedis/RedisExecutionRepository.java @@ -501,7 +501,9 @@ public void delete(@Nonnull ExecutionType type, @Nonnull List idsToDelet @Override public @Nonnull List retrievePipelineExecutionDetailsForApplication( - @Nonnull String application, @Nonnull List pipelineExecutionIds) { + @Nonnull String application, + @Nonnull List pipelineExecutionIds, + int queryTimeoutSeconds) { // TODO: not implemented yet - this method, at present, is primarily meant for the // SqlExecutionRepository // implementation. 
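jOOQ's queryTimeout, which the next hunk threads through the repository, is backed by JDBC's Statement.setQueryTimeout, so the guard behaves roughly like this plain-JDBC sketch (query shape from the comments above; names illustrative):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class QueryTimeoutSketch {
  // Bound the execution-body query so a slow scan fails fast instead of
  // holding a pooled connection indefinitely.
  static ResultSet fetchBodiesWithTimeout(Connection conn, String application, int timeoutSeconds)
      throws SQLException {
    PreparedStatement stmt =
        conn.prepareStatement("SELECT id, body FROM pipelines WHERE application = ?");
    stmt.setString(1, application);
    stmt.setQueryTimeout(timeoutSeconds); // the driver cancels the statement after this many seconds
    return stmt.executeQuery();
  }
}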
diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index 134fa6acb0..889e439507 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -553,7 +553,9 @@ class SqlExecutionRepository( */ override fun retrievePipelineExecutionDetailsForApplication( application: String, - pipelineExecutions: List): Collection { + pipelineExecutions: List, + queryTimeoutSeconds: Int + ): Collection { withPool(poolName) { val baseQuery = jooq.select(selectExecutionFields(compressionProperties)) .from( @@ -565,6 +567,7 @@ class SqlExecutionRepository( field("application").eq(application) .and(field("id").`in`(*pipelineExecutions.toTypedArray())) ) + .queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever .fetch() log.info("getting stage information for all the executions found so far") diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy index abfa53da2b..49c12bcbed 100644 --- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy +++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy @@ -49,11 +49,9 @@ import java.nio.charset.Charset import java.time.Clock import java.time.ZoneOffset import java.util.concurrent.Callable -import java.util.concurrent.CancellationException import java.util.concurrent.Executors import java.util.concurrent.ExecutorService import java.util.concurrent.Future -import java.util.concurrent.TimeoutException import java.util.concurrent.TimeUnit import java.util.stream.Collectors @@ -889,7 +887,7 @@ class TaskController { * *
<p>
* 3. It then processes n pipeline executions at a time to retrieve the complete execution details. In addition, - * we make use of a configured thread pool so that multiple batches of n executions can be processed parallelly. + * we make use of a configured thread pool to process multiple batches of n executions in parallel. */ private List optimizedGetPipelineExecutions(String application, List front50PipelineConfigIds, ExecutionCriteria executionCriteria) { @@ -946,34 +944,33 @@ class TaskController { filteredPipelineExecutionIds .collate(this.configurationProperties.getMaxNumberOfPipelineExecutionsToProcess()) .each { List chunkedExecutions -> - futures.add( - executorService.submit({ - List result = executionRepository.retrievePipelineExecutionDetailsForApplication( - application, chunkedExecutions - ) - log.debug("completed execution retrieval for ${result.size()} executions") - return result - } as Callable>) - ) + futures.add(executorService.submit({ + try { + List result = executionRepository.retrievePipelineExecutionDetailsForApplication( + application, + chunkedExecutions, + this.configurationProperties.getExecutionRetrievalTimeoutSeconds() + ) + log.debug("completed execution retrieval for ${result.size()} executions") + return result + } catch (Exception e) { // handle exceptions such as query timeouts etc. + log.error("error occurred while retrieving these executions: ${chunkedExecutions.toString()} " + + "for application: ${application}.", e) + // in case of errors, this will return partial results. We are going with this best-effort approach + // because the UI keeps refreshing the executions view frequently. Hence, the user will eventually see + // these executions via one of the subsequent calls. Partial data is better than an exception at this + // point since the latter will result in a UI devoid of any executions. + // + return [] + } + } as Callable>)) } - futures.each { - Future> future -> - try { - finalResult.addAll( - future.get(this.configurationProperties.getExecutionRetrievalTimeoutSeconds(), TimeUnit.SECONDS) - ) - } catch (Exception e) { - // no need to fail the entire thing if one thread fails. This means the final output will simply not - // contain any of these failed executions. - log.error("Task failed with error", e) - } - } + futures.each { Future> future -> finalResult.addAll(future.get()) } return finalResult } finally { - // attempt to shutdown the executor service try { - executorService.shutdownNow() + executorService.shutdownNow() // attempt to shutdown the executor service } catch (Exception e) { log.error("shutting down the executor service failed", e) } diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java index a2ff97691a..8407cb5598 100644 --- a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java +++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java @@ -54,10 +54,10 @@ public class TaskControllerConfigurationProperties { int maxNumberOfPipelineExecutionsToProcess = 150; /** - * only applicable if optimizeExecutionRetrieval = true. No retrieval thread should take more than - * 60s to complete. + * only applicable if optimizeExecutionRetrieval = true. It specifies the max time after which the + * execution retrieval query will timeout. 
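The best-effort behaviour described in the comments above distills to a small wrapper: each chunk either returns its executions or, on any failure such as a query timeout, an empty list, so one bad chunk never fails the whole request. A minimal Java sketch (names illustrative):

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

class BestEffortChunkSketch {
  // Wrap a chunk fetch so failures degrade to partial results instead of an error page.
  static <T> Callable<List<T>> bestEffort(Callable<List<T>> chunkFetch) {
    return () -> {
      try {
        return chunkFetch.call();
      } catch (Exception e) {
        // e.g. a query timeout: skip this chunk; the UI's next refresh can fill the gap
        return Collections.emptyList();
      }
    };
  }
}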
*/ - long executionRetrievalTimeoutSeconds = 60; + int executionRetrievalTimeoutSeconds = 60; /** moved this to here. Earlier definition was in the {@link TaskController} class */ int daysOfExecutionHistory = 14; diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt index dc85b3e881..7c13c15dc8 100644 --- a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt +++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt @@ -30,28 +30,31 @@ import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepositor import com.nhaarman.mockito_kotlin.mock import dev.minutest.junit.JUnit5Minutests import dev.minutest.rootContext +import org.jooq.exception.DataAccessException import org.jooq.impl.DSL.field import org.jooq.impl.DSL.table +import org.junit.Assert.assertThrows +import org.junit.jupiter.api.assertThrows import org.mockito.Mockito import org.springframework.test.web.servlet.MockMvc import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get import org.springframework.test.web.servlet.setup.MockMvcBuilders +import strikt.api.expectCatching import strikt.api.expectThat +import strikt.assertions.isA import strikt.assertions.isEqualTo -import strikt.assertions.isTrue +import strikt.assertions.isFailure import java.time.Clock import java.time.Instant import java.time.ZoneId import java.time.temporal.ChronoUnit class TaskControllerTest : JUnit5Minutests { - data class Fixture(val optimizeExecution: Boolean, val timeout: Double = 60.0) { + data class Fixture(val optimizeExecution: Boolean) { private val clock: Clock = Clock.fixed(Instant.now(), ZoneId.systemDefault()) val database: SqlTestUtil.TestDatabase = SqlTestUtil.initTcMysqlDatabase()!! 
diff --git a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt
index dc85b3e881..7c13c15dc8 100644
--- a/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt
+++ b/orca-web/src/test/kotlin/com/netflix/spinnaker/orca/controllers/TaskControllerTest.kt
@@ -30,28 +30,31 @@ import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepositor
 import com.nhaarman.mockito_kotlin.mock
 import dev.minutest.junit.JUnit5Minutests
 import dev.minutest.rootContext
+import org.jooq.exception.DataAccessException
 import org.jooq.impl.DSL.field
 import org.jooq.impl.DSL.table
+import org.junit.Assert.assertThrows
+import org.junit.jupiter.api.assertThrows
 import org.mockito.Mockito
 import org.springframework.test.web.servlet.MockMvc
 import org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get
 import org.springframework.test.web.servlet.setup.MockMvcBuilders
+import strikt.api.expectCatching
 import strikt.api.expectThat
+import strikt.assertions.isA
 import strikt.assertions.isEqualTo
-import strikt.assertions.isTrue
+import strikt.assertions.isFailure
 import java.time.Clock
 import java.time.Instant
 import java.time.ZoneId
 import java.time.temporal.ChronoUnit
 
 class TaskControllerTest : JUnit5Minutests {
-  data class Fixture(val optimizeExecution: Boolean, val timeout: Double = 60.0) {
+  data class Fixture(val optimizeExecution: Boolean) {
     private val clock: Clock = Clock.fixed(Instant.now(), ZoneId.systemDefault())
     val database: SqlTestUtil.TestDatabase = SqlTestUtil.initTcMysqlDatabase()!!
-
-
     private val executionRepository: SqlExecutionRepository = SqlExecutionRepository(
       partitionName = "test",
       jooq = database.context,
@@ -64,7 +67,6 @@ class TaskControllerTest : JUnit5Minutests {
     private val taskControllerConfigurationProperties: TaskControllerConfigurationProperties =
       TaskControllerConfigurationProperties()
         .apply {
           optimizeExecutionRetrieval = optimizeExecution
-          executionRetrievalTimeoutSeconds = timeout.toLong()
         }
     private val daysOfExecutionHistory: Long = taskControllerConfigurationProperties.daysOfExecutionHistory.toLong()
@@ -238,28 +240,20 @@ class TaskControllerTest : JUnit5Minutests {
       }
     }
 
-    context("execution retrieval with optimization having timeouts") {
+    context("test query having explicit query timeouts") {
       fixture {
-        Fixture(true, 0.1)
+        Fixture(true)
       }
 
       before { setup() }
       after { cleanUp() }
 
-      test("retrieve executions with limit = 2 & expand = false") {
-        expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5)
-        val response = subject.perform(get("/applications/test-app/pipelines?limit=2&expand=false")).andReturn().response
-        val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference<List<Map<String, Any>>>() {})
-        expectThat(results.isEmpty()).isTrue()
-      }
-
-      test("retrieve executions with limit = 2 & expand = false with statuses") {
-        expectThat(database.context.fetchCount(table("pipelines"))).isEqualTo(5)
-        val response = subject.perform(get(
-          "/applications/test-app/pipelines?limit=2&expand=false&statuses=RUNNING,SUSPENDED,PAUSED,NOT_STARTED")
-        ).andReturn().response
-        val results = OrcaObjectMapper.getInstance().readValue(response.contentAsString, object : TypeReference<List<Map<String, Any>>>() {})
-        expectThat(results.isEmpty()).isTrue()
+      test("it returns a DataAccessException on query timeout") {
+        expectCatching {
+          database.context.select(field("sleep(10)")).queryTimeout(1).execute()
+        }
+          .isFailure()
+          .isA<DataAccessException>()
       }
     }
   }

From ddf724e4cd81ba6fe32eb27165e3a39eb566656f Mon Sep 17 00:00:00 2001
From: kirangodishala
Date: Tue, 10 Dec 2024 23:25:10 +0530
Subject: [PATCH 7/8] fix(log): minor log refactor

---
 .../sql/pipeline/persistence/SqlExecutionRepository.kt | 3 +--
 .../spinnaker/orca/controllers/TaskController.groovy   | 8 ++++----
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt
index 889e439507..abd49437d5 100644
--- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt
+++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt
@@ -512,7 +512,6 @@ class SqlExecutionRepository(
 
     val finalResult: MutableList<PipelineExecution> = mutableListOf()
 
-    log.info("getting execution ids")
     withPool(poolName) {
       val baseQuery = jooq.select(field("config_id"), field("id"))
         .from(table)
@@ -570,7 +569,7 @@
         .queryTimeout(queryTimeoutSeconds) // add an explicit timeout so that the query doesn't run forever
         .fetch()
 
-      log.info("getting stage information for all the executions found so far")
+      log.debug("getting stage information for all the executions found so far")
       return ExecutionMapper(mapper, stageReadSize, compressionProperties, pipelineRefEnabled).map(baseQuery.intoResultSet(), jooq)
     }
   }
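The queryTimeout(queryTimeoutSeconds) call retained above is stock jOOQ: it delegates to JDBC's Statement.setQueryTimeout, and an overrun surfaces as an org.jooq.exception.DataAccessException instead of a query that runs forever, which is exactly what the rewritten test asserts via sleep(10). A small sketch of that failure mode (the sleep() function is MySQL-specific and purely illustrative):

    import org.jooq.DSLContext;
    import org.jooq.exception.DataAccessException;

    import static org.jooq.impl.DSL.field;

    class QueryTimeoutSketch {
      // Run a deliberately slow query with a 1-second cap; jOOQ rethrows the
      // driver's timeout as a DataAccessException rather than blocking the caller.
      static void demo(DSLContext jooq) {
        try {
          jooq.select(field("sleep(10)"))  // MySQL sleep(), slow on purpose
              .queryTimeout(1)             // seconds; delegates to Statement.setQueryTimeout
              .execute();
        } catch (DataAccessException e) {
          // expected on timeout: log it and treat the batch as skipped,
          // matching the best-effort handling in TaskController
        }
      }
    }

In TaskController this exception is what the per-chunk catch block turns into an empty partial result.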
diff --git a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy
index 49c12bcbed..51c7b41c1a 100644
--- a/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy
+++ b/orca-web/src/main/groovy/com/netflix/spinnaker/orca/controllers/TaskController.groovy
@@ -607,9 +607,9 @@ class TaskController {
 
     // get all relevant pipeline and strategy configs from front50
     def pipelineConfigIds = front50Service.getPipelines(application, false)*.id as List<String>
-    log.info("received ${pipelineConfigIds.size()} pipelines for application: $application from front50")
+    log.debug("received ${pipelineConfigIds.size()} pipelines for application: $application from front50")
     def strategyConfigIds = front50Service.getStrategies(application)*.id as List<String>
-    log.info("received ${strategyConfigIds.size()} strategies for application: $application from front50")
+    log.debug("received ${strategyConfigIds.size()} strategies for application: $application from front50")
 
     def allFront50PipelineConfigIds = pipelineConfigIds + strategyConfigIds
 
@@ -628,11 +628,11 @@ class TaskController {
     allPipelineExecutions.sort(startTimeOrId)
 
     if (!expand) {
-      log.info("unexpanding pipeline executions")
+      log.debug("unexpanding pipeline executions")
       unexpandPipelineExecutions(allPipelineExecutions)
     }
 
-    log.info("filtering pipelines by history")
+    log.debug("filtering pipelines by history")
     return filterPipelinesByHistoryCutoff(allPipelineExecutions, limit)
   }

From 28ca6e406a867897c4e287891a2f485737938235 Mon Sep 17 00:00:00 2001
From: kirangodishala
Date: Wed, 18 Dec 2024 14:34:43 +0530
Subject: [PATCH 8/8] fix(taskController): adjusted default config values

---
 .../TaskControllerConfigurationProperties.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java
index 8407cb5598..658666bd15 100644
--- a/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java
+++ b/orca-web/src/main/java/com/netflix/spinnaker/config/TaskControllerConfigurationProperties.java
@@ -39,19 +39,19 @@ public class TaskControllerConfigurationProperties {
    * process the queries to retrieve the executions. Needs to be tuned appropriately since this has
    * the potential to exhaust the connection pool size for the database.
    */
-  int maxExecutionRetrievalThreads = 10;
+  int maxExecutionRetrievalThreads = 4;
 
   /**
    * only applicable if optimizeExecutionRetrieval = true. It specifies how many pipeline executions
-   * should be processed at a time. 150 pipeline executions was selected as the default after
-   * testing this number against an orca sql db that contained lots of pipelines and executions for
-   * a single application (about 1200 pipelines and 1500 executions). Each execution was 1 MB or
-   * more in size.
+   * should be processed at a time. 150 worked with an orca sql db that contained lots of pipelines
+   * and executions for a single application (about 1200 pipelines and 1500 executions, with each
+   * execution of size >= 1 MB). 50 is kept as the default since most applications have far fewer
+   * executions.
    *
    * <p>It can be further tuned, depending on your setup, since 150 executions work well for some
    * applications but a higher number may be appropriate for others.
    */
-  int maxNumberOfPipelineExecutionsToProcess = 150;
+  int maxNumberOfPipelineExecutionsToProcess = 50;
 
   /**
    * only applicable if optimizeExecutionRetrieval = true. It specifies the max time after which the