Skip to content

Commit

Permalink
perf: 支持流水线批量取消运行中构建 TencentBlueKing#10858 增加批量取消处理事件
Browse files Browse the repository at this point in the history
  • Loading branch information
royalhuang committed Aug 22, 2024
1 parent 5fd8da8 commit cd32648
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ object MQ {
const val ROUTE_PIPELINE_BUILD_FINISH = "r.engine.pipeline.build.finish"
const val QUEUE_PIPELINE_BUILD_FINISH = "q.engine.pipeline.build.finish"

const val ROUTE_PIPELINE_BUILD_BATCH_CANCEL = "r.engine.pipeline.build.cancel"
const val QUEUE_PIPELINE_BUILD_BATCH_CANCEL = "q.engine.pipeline.build.cancel"
const val ROUTE_PIPELINE_BUILD_BATCH_FINISH = "r.engine.pipeline.build.finish"
const val QUEUE_PIPELINE_BUILD_BATCH_FINISH = "q.engine.pipeline.build.finish"

const val ROUTE_PIPELINE_PAUSE_TASK_EXECUTE = "r.engine.pipeline.pause.task.execute"
const val QUEUE_PIPELINE_PAUSE_TASK_EXECUTE = "q.engine.pipeline.pause.task.execute"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.devops.process.engine.pojo.event

import com.tencent.devops.common.event.annotation.Event
import com.tencent.devops.common.event.dispatcher.pipeline.mq.MQ
import com.tencent.devops.common.event.enums.ActionType
import com.tencent.devops.common.event.pojo.pipeline.IPipelineEvent
import com.tencent.devops.common.pipeline.enums.BuildStatus

/**
*
*
* @version 1.0
*/
@Event(MQ.ENGINE_PROCESS_LISTENER_EXCHANGE, MQ.ROUTE_PIPELINE_BUILD_BATCH_CANCEL, 2000)
data class PipelineBuildBatchCancelEvent(
override val source: String,
override val projectId: String,
override val pipelineId: String,
override val userId: String,
val buildIds: List<String>,
val status: BuildStatus = BuildStatus.CANCELED,
val buildNum: Int? = null,
override var actionType: ActionType = ActionType.END,
override var delayMills: Int = 2000
) : IPipelineEvent(actionType, source, projectId, pipelineId, userId, delayMills)
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.devops.process.engine.pojo.event

import com.tencent.devops.common.event.annotation.Event
import com.tencent.devops.common.event.dispatcher.pipeline.mq.MQ
import com.tencent.devops.common.event.enums.ActionType
import com.tencent.devops.common.event.pojo.pipeline.IPipelineEvent
import com.tencent.devops.common.pipeline.enums.BuildStatus
import com.tencent.devops.common.api.pojo.ErrorType

/**
*
*
* @version 1.0
*/
@Event(MQ.ENGINE_PROCESS_LISTENER_EXCHANGE, MQ.ROUTE_PIPELINE_BUILD_BATCH_FINISH)
data class PipelineBuildBatchFinishEvent(
override val source: String,
override val projectId: String,
override val pipelineId: String,
override val userId: String,
val buildIds: List<String>,
val status: BuildStatus,
override var actionType: ActionType = ActionType.END,
override var delayMills: Int = 0,
val errorType: ErrorType? = null,
val errorCode: Int? = null,
val errorMsg: String? = null
) : IPipelineEvent(actionType, source, projectId, pipelineId, userId, delayMills)
Original file line number Diff line number Diff line change
Expand Up @@ -91,24 +91,30 @@ class BuildCancelControl @Autowired constructor(
}

@BkTimed
fun handle(event: PipelineBuildCancelEvent) {
fun handle(event: PipelineBuildCancelEvent, batchCancel: Boolean = false): Boolean {
val watcher = Watcher(id = "ENGINE|BuildCancel|${event.traceId}|${event.buildId}|${event.status}")
val redisLock = BuildIdLock(redisOperation = redisOperation, buildId = event.buildId)
try {
watcher.start("lock")
redisLock.lock()
watcher.start("execute")
execute(event)
// 如果不是批量取消的请求,则直接下发流水线完成时间
val needFinish = cancelAndNeedFinish(event)
if (needFinish && !batchCancel) {
sendBuildFinishEvent(event)
}
return needFinish
} catch (ignored: Exception) {
LOG.error("ENGINE|${event.buildId}|{${event.source}}|build finish fail: $ignored", ignored)
} finally {
redisLock.unlock()
watcher.stop()
LogUtils.printCostTimeWE(watcher = watcher)
}
return false
}

private fun execute(event: PipelineBuildCancelEvent): Boolean {
private fun cancelAndNeedFinish(event: PipelineBuildCancelEvent): Boolean {
val buildId = event.buildId
val buildInfo = pipelineRuntimeService.getBuildInfo(projectId = event.projectId, buildId = buildId)
// 已经结束的构建,不再受理,抛弃消息
Expand All @@ -118,57 +124,53 @@ class BuildCancelControl @Autowired constructor(
}

val model = pipelineBuildDetailService.getBuildModel(projectId = event.projectId, buildId = buildId)
return if (model != null) {
LOG.info("ENGINE|${event.buildId}|${event.source}|CANCEL|status=${event.status}")
if (event.actionType != ActionType.TERMINATE) {
// 往redis中设置取消构建标识以防止重复提交
setBuildCancelActionRedisFlag(buildId)
?: return false
LOG.info("ENGINE|${event.buildId}|${event.source}|CANCEL|status=${event.status}")
if (event.actionType != ActionType.TERMINATE) {
// 往redis中设置取消构建标识以防止重复提交
setBuildCancelActionRedisFlag(buildId)
}
cancelAllPendingTask(event = event, model = model)
if (event.actionType == ActionType.TERMINATE) {
// 修改detail model
pipelineBuildRecordService.buildCancel(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = event.buildId,
buildStatus = event.status,
cancelUser = event.userId,
executeCount = buildInfo.executeCount ?: 1
)
}

// 排队的则不再获取Pending Stage,防止Final Stage被执行
val pendingStage: PipelineBuildStage? =
if (buildInfo.status.isReadyToRun() || buildInfo.status.isNeverRun()) {
null
} else {
pipelineStageService.getPendingStage(event.projectId, buildId)
}
cancelAllPendingTask(event = event, model = model)
if (event.actionType == ActionType.TERMINATE) {
// 修改detail model
pipelineBuildRecordService.buildCancel(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = event.buildId,
buildStatus = event.status,
cancelUser = event.userId,
executeCount = buildInfo.executeCount ?: 1
measureService?.postCancelData(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = buildId,
userId = event.userId
)
return if (pendingStage != null) {
if (pendingStage.status.isPause()) { // 处于审核暂停的Stage需要走取消Stage逻辑
pipelineStageService.cancelStageBySystem(
userId = event.userId,
buildInfo = buildInfo,
buildStage = pendingStage,
timeout = false
)
}

// 排队的则不再获取Pending Stage,防止Final Stage被执行
val pendingStage: PipelineBuildStage? =
if (buildInfo.status.isReadyToRun() || buildInfo.status.isNeverRun()) {
null
} else {
pipelineStageService.getPendingStage(event.projectId, buildId)
}

if (pendingStage != null) {
if (pendingStage.status.isPause()) { // 处于审核暂停的Stage需要走取消Stage逻辑
pipelineStageService.cancelStageBySystem(
userId = event.userId,
buildInfo = buildInfo,
buildStage = pendingStage,
timeout = false
)
} else {
pendingStage.dispatchEvent(event)
}
} else {
sendBuildFinishEvent(event)
pendingStage.dispatchEvent(event)
}

measureService?.postCancelData(
projectId = event.projectId,
pipelineId = event.pipelineId,
buildId = buildId,
userId = event.userId
)
true
} else {
false
} else {
// 如果没有暂停的stage可以直接发送流水线finish事件
true
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.tencent.devops.common.event.dispatcher.pipeline.mq.MQ
import com.tencent.devops.common.event.dispatcher.pipeline.mq.Tools
import com.tencent.devops.process.engine.listener.run.PipelineBuildStartListener
import com.tencent.devops.process.engine.listener.run.finish.PipelineBuildBatchCancelListener
import com.tencent.devops.process.engine.listener.run.finish.PipelineBuildBatchFinishListener
import com.tencent.devops.process.engine.listener.run.finish.PipelineBuildCancelListener
import com.tencent.devops.process.engine.listener.run.finish.PipelineBuildFinishListener
import org.springframework.amqp.core.Binding
Expand Down Expand Up @@ -135,6 +137,46 @@ class BuildEngineCoreBuildConfiguration {
)
}

@Value("\${queueConcurrency.buildBatchFinish:5}")
private val buildBatchFinishConcurrency: Int? = null

/**
* 构建结束队列--- 并发一般,与Stage一致
*/
@Bean
fun pipelineBuildBatchFinishQueue() = Queue(MQ.QUEUE_PIPELINE_BUILD_BATCH_FINISH)

@Bean
fun pipelineBuildBatchFinishQueueBind(
@Autowired pipelineBuildFinishQueue: Queue,
@Autowired pipelineCoreExchange: DirectExchange
): Binding {
return BindingBuilder.bind(pipelineBuildFinishQueue)
.to(pipelineCoreExchange)
.with(MQ.ROUTE_PIPELINE_BUILD_BATCH_FINISH)
}

@Bean
fun pipelineBuildBatchFinishListenerContainer(
@Autowired connectionFactory: ConnectionFactory,
@Autowired pipelineBuildFinishQueue: Queue,
@Autowired rabbitAdmin: RabbitAdmin,
@Autowired buildListener: PipelineBuildBatchFinishListener,
@Autowired messageConverter: Jackson2JsonMessageConverter
): SimpleMessageListenerContainer {
return Tools.createSimpleMessageListenerContainer(
connectionFactory = connectionFactory,
queue = pipelineBuildFinishQueue,
rabbitAdmin = rabbitAdmin,
buildListener = buildListener,
messageConverter = messageConverter,
startConsumerMinInterval = 5000,
consecutiveActiveTrigger = 5,
concurrency = buildBatchFinishConcurrency!!,
maxConcurrency = 20
)
}

@Value("\${queueConcurrency.buildCancel:5}")
private val buildCancelConcurrency: Int? = null

Expand Down Expand Up @@ -175,4 +217,45 @@ class BuildEngineCoreBuildConfiguration {
maxConcurrency = 50
)
}

@Value("\${queueConcurrency.buildBatchCancel:5}")
private val buildBatchCancelConcurrency: Int? = null

/**
* 构建取消队列--- 并发一般,与Stage一致
*/
@Bean
fun pipelineBuildBatchCancelQueue() = Queue(MQ.QUEUE_PIPELINE_BUILD_BATCH_CANCEL)

@Bean
fun pipelineBuildBatchCancelQueueBind(
@Autowired pipelineBuildCancelQueue: Queue,
@Autowired pipelineCoreExchange: DirectExchange
): Binding {
return BindingBuilder.bind(pipelineBuildCancelQueue)
.to(pipelineCoreExchange)
.with(MQ.ROUTE_PIPELINE_BUILD_BATCH_CANCEL)
}

@Bean
fun pipelineBuildBatchCancelListenerContainer(
@Autowired connectionFactory: ConnectionFactory,
@Autowired pipelineBuildCancelQueue: Queue,
@Autowired rabbitAdmin: RabbitAdmin,
@Autowired buildListener: PipelineBuildBatchCancelListener,
@Autowired messageConverter: Jackson2JsonMessageConverter
): SimpleMessageListenerContainer {

return Tools.createSimpleMessageListenerContainer(
connectionFactory = connectionFactory,
queue = pipelineBuildCancelQueue,
rabbitAdmin = rabbitAdmin,
buildListener = buildListener,
messageConverter = messageConverter,
startConsumerMinInterval = 5000,
consecutiveActiveTrigger = 5,
concurrency = buildBatchCancelConcurrency!!,
maxConcurrency = 20
)
}
}
Loading

0 comments on commit cd32648

Please sign in to comment.