diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/ActiveProjectEmptyFolderCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/ActiveProjectEmptyFolderCleanupJob.kt index 9cfa5878c2..64019ee85f 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/ActiveProjectEmptyFolderCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/ActiveProjectEmptyFolderCleanupJob.kt @@ -27,9 +27,11 @@ package com.tencent.bkrepo.job.batch.task.stat +import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.job.batch.base.ActiveProjectService import com.tencent.bkrepo.job.batch.base.JobContext import com.tencent.bkrepo.job.batch.context.EmptyFolderCleanupJobContext +import com.tencent.bkrepo.job.batch.utils.FolderUtils import com.tencent.bkrepo.job.config.properties.ActiveProjectEmptyFolderCleanupJobProperties import org.slf4j.LoggerFactory import org.springframework.boot.context.properties.EnableConfigurationProperties @@ -45,7 +47,7 @@ import java.time.Duration @Component @EnableConfigurationProperties(ActiveProjectEmptyFolderCleanupJobProperties::class) class ActiveProjectEmptyFolderCleanupJob( - private val properties: ActiveProjectEmptyFolderCleanupJobProperties, + val properties: ActiveProjectEmptyFolderCleanupJobProperties, executor: ThreadPoolTaskExecutor, private val activeProjectService: ActiveProjectService, private val mongoTemplate: MongoTemplate, @@ -59,6 +61,15 @@ class ActiveProjectEmptyFolderCleanupJob( logger.info("empty folder cleanup job for active projects finished") } + + override fun beforeRunProject(projectId: String) { + if (properties.userMemory) return + // 每次任务启动前要将redis上对应的key清理, 避免干扰 + val key = KEY_PREFIX + StringPool.COLON + + FolderUtils.buildCacheKey(projectId = projectId, repoName = StringPool.EMPTY) + emptyFolderCleanup.removeRedisKey(key) + } + override fun runRow(row: StatNode, context: JobContext) { require(context is EmptyFolderCleanupJobContext) try { @@ -71,7 +82,12 @@ class ActiveProjectEmptyFolderCleanupJob( folder = row.folder, size = row.size ) - emptyFolderCleanup.collectEmptyFolder(node, context) + emptyFolderCleanup.collectEmptyFolderWithMemory( + row = node, + context = context, + keyPrefix = KEY_PREFIX, + useMemory = properties.userMemory + ) } catch (e: Exception) { logger.error("run empty folder clean for Node $row failed, ${e.message}") } @@ -94,16 +110,38 @@ class ActiveProjectEmptyFolderCleanupJob( override fun onRunProjectFinished(collection: String, projectId: String, context: JobContext) { require(context is EmptyFolderCleanupJobContext) logger.info("will filter empty folder in project $projectId") - emptyFolderCleanup.emptyFolderHandler( - collection = collection, - context = context, - deletedEmptyFolder = properties.deletedEmptyFolder, - projectId = projectId, - deleteFolderRepos = properties.deleteFolderRepos - ) + + if (!properties.userMemory) { + emptyFolderCleanup.collectEmptyFolderWithRedis( + context = context, + force = true, + keyPrefix = KEY_PREFIX, + collectionName = null, + projectId = projectId + ) + } + if (properties.userMemory) { + emptyFolderCleanup.emptyFolderHandlerWithMemory( + collection = collection, + context = context, + deletedEmptyFolder = properties.deletedEmptyFolder, + projectId = projectId, + deleteFolderRepos = properties.deleteFolderRepos + ) + } else { + emptyFolderCleanup.emptyFolderHandlerWithRedis( + collection = collection, + keyPrefix = KEY_PREFIX, + context = context, + deletedEmptyFolder = properties.deletedEmptyFolder, + projectId = projectId, + deleteFolderRepos = properties.deleteFolderRepos + ) + } } companion object { private val logger = LoggerFactory.getLogger(ActiveProjectEmptyFolderCleanupJob::class.java) + private const val KEY_PREFIX = "activeEmptyFolder" } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/ActiveProjectNodeFolderStatJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/ActiveProjectNodeFolderStatJob.kt index c851c155b3..ecf2d6bbc0 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/ActiveProjectNodeFolderStatJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/ActiveProjectNodeFolderStatJob.kt @@ -27,10 +27,12 @@ package com.tencent.bkrepo.job.batch.task.stat +import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.job.FOLDER import com.tencent.bkrepo.job.batch.base.ActiveProjectService import com.tencent.bkrepo.job.batch.base.JobContext import com.tencent.bkrepo.job.batch.context.NodeFolderJobContext +import com.tencent.bkrepo.job.batch.utils.FolderUtils import com.tencent.bkrepo.job.config.properties.ActiveProjectNodeFolderStatJobProperties import org.slf4j.LoggerFactory import org.springframework.boot.context.properties.EnableConfigurationProperties @@ -46,7 +48,7 @@ import java.time.Duration @Component @EnableConfigurationProperties(ActiveProjectNodeFolderStatJobProperties::class) class ActiveProjectNodeFolderStatJob( - properties: ActiveProjectNodeFolderStatJobProperties, + val properties: ActiveProjectNodeFolderStatJobProperties, executor: ThreadPoolTaskExecutor, private val activeProjectService: ActiveProjectService, private val mongoTemplate: MongoTemplate, @@ -65,6 +67,14 @@ class ActiveProjectNodeFolderStatJob( return Criteria().and(FOLDER).`is`(false) } + override fun beforeRunProject(projectId: String) { + if (properties.userMemory) return + // 每次任务启动前要将redis上对应的key清理, 避免干扰 + val key = KEY_PREFIX + StringPool.COLON + + FolderUtils.buildCacheKey(projectId = projectId, repoName = StringPool.EMPTY) + nodeFolderStat.removeRedisKey(key) + } + override fun runRow(row: StatNode, context: JobContext) { require(context is NodeFolderJobContext) val node = nodeFolderStat.buildNode( @@ -76,7 +86,12 @@ class ActiveProjectNodeFolderStatJob( folder = row.folder, size = row.size ) - nodeFolderStat.collectNode(node, context) + nodeFolderStat.collectNode( + node = node, + context = context, + useMemory = properties.userMemory, + keyPrefix = KEY_PREFIX + ) } override fun createJobContext(): NodeFolderJobContext { @@ -94,8 +109,22 @@ class ActiveProjectNodeFolderStatJob( */ override fun onRunProjectFinished(collection: String, projectId: String, context: JobContext) { require(context is NodeFolderJobContext) - logger.info("store memory cache to db with projectId $projectId") - nodeFolderStat.storeMemoryCacheToDB(context, collection, projectId) + if (!properties.userMemory) { + nodeFolderStat.updateRedisCache( + context = context, + force = true, + keyPrefix = KEY_PREFIX, + collectionName = null, + projectId = projectId + ) + } + + logger.info("store cache to db with projectId $projectId") + if (properties.userMemory) { + nodeFolderStat.storeMemoryCacheToDB(context, collection, projectId) + } else { + nodeFolderStat.storeRedisCacheToDB(context, KEY_PREFIX, collection, projectId) + } } /** @@ -105,5 +134,6 @@ class ActiveProjectNodeFolderStatJob( companion object { private val logger = LoggerFactory.getLogger(ActiveProjectNodeFolderStatJob::class.java) + private const val KEY_PREFIX = "activeProjectNode" } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/EmptyFolderCleanup.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/EmptyFolderCleanup.kt index 3458f5848b..d799648340 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/EmptyFolderCleanup.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/EmptyFolderCleanup.kt @@ -34,13 +34,13 @@ import com.tencent.bkrepo.job.DELETED_DATE import com.tencent.bkrepo.job.FOLDER import com.tencent.bkrepo.job.FULL_PATH import com.tencent.bkrepo.job.LAST_MODIFIED_DATE -import com.tencent.bkrepo.job.NODE_NUM import com.tencent.bkrepo.job.PROJECT import com.tencent.bkrepo.job.REPO import com.tencent.bkrepo.job.SIZE import com.tencent.bkrepo.job.batch.context.EmptyFolderCleanupJobContext import com.tencent.bkrepo.job.batch.utils.FolderUtils import com.tencent.bkrepo.job.batch.utils.FolderUtils.extractFolderInfoFromCacheKey +import com.tencent.bkrepo.job.pojo.FolderInfo import org.bson.types.ObjectId import org.slf4j.LoggerFactory import org.springframework.data.mongodb.core.MongoTemplate @@ -48,12 +48,16 @@ import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.Update import org.springframework.data.mongodb.core.query.isEqualTo +import org.springframework.data.redis.core.HashOperations +import org.springframework.data.redis.core.RedisTemplate +import org.springframework.data.redis.core.ScanOptions import org.springframework.stereotype.Component import java.time.LocalDateTime @Component class EmptyFolderCleanup( private val mongoTemplate: MongoTemplate, + private val redisTemplate: RedisTemplate, ) { fun buildNode( @@ -76,9 +80,15 @@ class EmptyFolderCleanup( ) } - fun collectEmptyFolder( + fun removeRedisKey(key: String) { + redisTemplate.delete(key) + } + + fun collectEmptyFolderWithMemory( row: Node, context: EmptyFolderCleanupJobContext, + useMemory: Boolean, + keyPrefix: String, collectionName: String? = null ) { if (row.folder) { @@ -105,9 +115,66 @@ class EmptyFolderCleanup( folderMetric.nodeNum.increment() } } + if (!useMemory) { + collectEmptyFolderWithRedis( + context = context, + keyPrefix = keyPrefix, + projectId = row.projectId, + collectionName = collectionName + ) + } + } + + /** + * 将存储在内存中的临时记录更新到redis + */ + fun collectEmptyFolderWithRedis( + context: EmptyFolderCleanupJobContext, + force: Boolean = false, + keyPrefix: String, + projectId: String = StringPool.EMPTY, + collectionName: String? = null + ) { + if (!force && context.folders.size < 100000) return + if (context.folders.isEmpty()) return + val movedToRedis: MutableList = mutableListOf() + val storedFolderPrefix = if (collectionName.isNullOrEmpty()) { + FolderUtils.buildCacheKey(collectionName = collectionName, projectId = projectId) + StringPool.COLON + } else { + FolderUtils.buildCacheKey(collectionName = collectionName, projectId = StringPool.EMPTY) + } + // 避免每次设置值都创建一个 Redis 连接 + redisTemplate.execute { connection -> + val hashCommands = connection.hashCommands() + for (entry in context.folders) { + if (!entry.key.startsWith(storedFolderPrefix)) continue + val folderInfo = extractFolderInfoFromCacheKey(entry.key, collectionName != null) ?: continue + val nodeNumHKey = FolderUtils.buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = NODE_NUM + ) + + val key = keyPrefix + StringPool.COLON + FolderUtils.buildCacheKey( + collectionName = collectionName, projectId = projectId + ) + hashCommands.hIncrBy(key.toByteArray(), nodeNumHKey.toByteArray(), entry.value.nodeNum.toLong()) + entry.value.id?.let { + val idHKey = FolderUtils.buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = NODE_ID + ) + hashCommands.hSet(key.toByteArray(), idHKey.toByteArray(), entry.value.id!!.toByteArray()) + } + movedToRedis.add(entry.key) + } + null + } + for (key in movedToRedis) { + context.folders.remove(key) + } } - fun emptyFolderHandler( + fun emptyFolderHandlerWithMemory( collection: String, context: EmptyFolderCleanupJobContext, deletedEmptyFolder: Boolean, @@ -124,27 +191,116 @@ class EmptyFolderCleanup( for (entry in context.folders) { if (!entry.key.startsWith(prefix) || entry.value.nodeNum.toLong() > 0) continue val folderInfo = extractFolderInfoFromCacheKey(entry.key, runCollection) ?: continue - if (emptyFolderDoubleCheck( - projectId = folderInfo.projectId, - repoName = folderInfo.repoName, - path = folderInfo.fullPath, - collectionName = collection - )) { - val deletedFlag = deletedFolderFlag( - repoName = folderInfo.repoName, - deletedEmptyFolder = deletedEmptyFolder, - deleteFolderRepos = deleteFolderRepos + emptyFolderHandler( + folderInfo = folderInfo, + context = context, + collection = collection, + deleteFolderRepos = deleteFolderRepos, + deletedEmptyFolder = deletedEmptyFolder, + id = entry.value.id!!, + ) + } + clearContextCache(projectId, context, collection, runCollection) + } + + fun emptyFolderHandlerWithRedis( + collection: String, + keyPrefix: String, + deletedEmptyFolder: Boolean, + deleteFolderRepos: List, + context: EmptyFolderCleanupJobContext, + runCollection: Boolean = false, + projectId: String = StringPool.EMPTY, + ) { + + val keySuffix = if (runCollection) { + FolderUtils.buildCacheKey(collectionName = collection, projectId = projectId) + } else { + FolderUtils.buildCacheKey(collectionName = null, projectId = projectId) + } + val key = keyPrefix + StringPool.COLON + keySuffix + val hashOps = redisTemplate.opsForHash() + val options = ScanOptions.scanOptions().build() + redisTemplate.execute { connection -> + val hashCommands = connection.hashCommands() + val cursor = hashCommands.hScan(key.toByteArray(), options) + while (cursor.hasNext()) { + val entry: Map.Entry = cursor.next() + val keyStr = String(entry.key).substringBeforeLast(StringPool.COLON) + val folderInfo = extractFolderInfoFromCacheKey(keyStr) ?: continue + val statInfo = getFolderStatInfo( + key, entry, folderInfo, hashOps ) - logger.info( - "will delete empty folder ${folderInfo.fullPath}" + - " in repo ${folderInfo.projectId}|${folderInfo.repoName} " + - "with config deletedFlag: $deletedFlag" + if (statInfo.nodeNum > 0) continue + emptyFolderHandler( + folderInfo = folderInfo, + context = context, + collection = collection, + deleteFolderRepos = deleteFolderRepos, + deletedEmptyFolder = deletedEmptyFolder, + id = statInfo.id!!, ) - doEmptyFolderDelete(entry.value.id, collection, deletedFlag) - context.totalDeletedNum.increment() } } - clearContextCache(projectId, context, collection, runCollection) + redisTemplate.delete(key) + } + + private fun emptyFolderHandler( + folderInfo: FolderInfo, + collection: String, + deletedEmptyFolder: Boolean, + deleteFolderRepos: List, + id: String, + context: EmptyFolderCleanupJobContext, + ) { + if (emptyFolderDoubleCheck( + projectId = folderInfo.projectId, + repoName = folderInfo.repoName, + path = folderInfo.fullPath, + collectionName = collection + )) { + val deletedFlag = deletedFolderFlag( + repoName = folderInfo.repoName, + deletedEmptyFolder = deletedEmptyFolder, + deleteFolderRepos = deleteFolderRepos + ) + logger.info( + "will delete empty folder ${folderInfo.fullPath}" + + " in repo ${folderInfo.projectId}|${folderInfo.repoName} " + + "with config deletedFlag: $deletedFlag" + ) + doEmptyFolderDelete(id, collection, deletedFlag) + context.totalDeletedNum.increment() + } + } + + /** + * 从redis中获取对应目录的统计信息 + */ + private fun getFolderStatInfo( + key: String, + entry: Map.Entry, + folderInfo: FolderInfo, + hashOps: HashOperations + ): StatInfo { + val id: String + val nodeNum: Long + if (String(entry.key).endsWith(SIZE)) { + val nodeNumKey = FolderUtils.buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = NODE_NUM + ) + id = String(entry.value) + nodeNum = hashOps.get(key, nodeNumKey)?.toLongOrNull() ?: 0 + } else { + val idKey = FolderUtils.buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = NODE_ID + ) + nodeNum = String(entry.value).toLongOrNull() ?: 0 + id = hashOps.get(key, idKey) ?: StringPool.EMPTY + } + return StatInfo(id, nodeNum) } /** @@ -237,8 +393,17 @@ class EmptyFolderCleanup( val repoName: String, ) + data class StatInfo( + var id: String?, + var nodeNum: Long + ) + + companion object { private val logger = LoggerFactory.getLogger(EmptyFolderCleanup::class.java) const val FULL_PATH_IDX = "projectId_repoName_fullPath_idx" + private const val NODE_NUM = "nodeNum" + private const val NODE_ID = "nodeId" + } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/InactiveProjectEmptyFolderCleanupJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/InactiveProjectEmptyFolderCleanupJob.kt index 6878dd0c76..996918846e 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/InactiveProjectEmptyFolderCleanupJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/InactiveProjectEmptyFolderCleanupJob.kt @@ -27,6 +27,7 @@ package com.tencent.bkrepo.job.batch.task.stat +import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.job.DELETED_DATE import com.tencent.bkrepo.job.REPO import com.tencent.bkrepo.job.SHARDING_COUNT @@ -34,6 +35,7 @@ import com.tencent.bkrepo.job.batch.base.ActiveProjectService import com.tencent.bkrepo.job.batch.base.DefaultContextMongoDbJob import com.tencent.bkrepo.job.batch.base.JobContext import com.tencent.bkrepo.job.batch.context.EmptyFolderCleanupJobContext +import com.tencent.bkrepo.job.batch.utils.FolderUtils import com.tencent.bkrepo.job.batch.utils.StatUtils.specialRepoRunCheck import com.tencent.bkrepo.job.config.properties.InactiveProjectEmptyFolderCleanupJobProperties import org.slf4j.LoggerFactory @@ -98,7 +100,10 @@ class InactiveProjectEmptyFolderCleanupJob( folder = row.folder, size = row.size ) - emptyFolderCleanup.collectEmptyFolder(node, context, collectionName) + emptyFolderCleanup.collectEmptyFolderWithMemory( + row = node, context = context, collectionName = collectionName, + keyPrefix = KEY_PREFIX, useMemory = properties.userMemory + ) } override fun getLockAtMostFor(): Duration { @@ -106,6 +111,7 @@ class InactiveProjectEmptyFolderCleanupJob( } override fun createJobContext(): EmptyFolderCleanupJobContext { + beforeRunCollection() val temp = mutableMapOf() activeProjectService.getActiveProjects().forEach { temp[it] = true @@ -119,13 +125,44 @@ class InactiveProjectEmptyFolderCleanupJob( require(context is EmptyFolderCleanupJobContext) super.onRunCollectionFinished(collectionName, context) logger.info("will filter empty folder in $collectionName") - emptyFolderCleanup.emptyFolderHandler( - collection = collectionName, - context = context, - deletedEmptyFolder = properties.deletedEmptyFolder, - runCollection = true, - deleteFolderRepos = properties.deleteFolderRepos - ) + + if (!properties.userMemory) { + emptyFolderCleanup.collectEmptyFolderWithRedis( + context = context, + force = true, + keyPrefix = KEY_PREFIX, + collectionName = collectionName, + ) + } + if (properties.userMemory) { + emptyFolderCleanup.emptyFolderHandlerWithMemory( + collection = collectionName, + context = context, + deletedEmptyFolder = properties.deletedEmptyFolder, + runCollection = true, + deleteFolderRepos = properties.deleteFolderRepos + ) + } else { + emptyFolderCleanup.emptyFolderHandlerWithRedis( + collection = collectionName, + keyPrefix = KEY_PREFIX, + context = context, + deletedEmptyFolder = properties.deletedEmptyFolder, + runCollection = true, + deleteFolderRepos = properties.deleteFolderRepos + ) + } + } + + private fun beforeRunCollection() { + if (properties.userMemory) return + // 每次任务启动前要将redis上对应的key清理, 避免干扰 + collectionNames().forEach { + val key = KEY_PREFIX + StringPool.COLON + FolderUtils.buildCacheKey( + collectionName = it, projectId = StringPool.EMPTY + ) + emptyFolderCleanup.removeRedisKey(key) + } } data class Node( @@ -152,5 +189,6 @@ class InactiveProjectEmptyFolderCleanupJob( private val logger = LoggerFactory.getLogger(InactiveProjectEmptyFolderCleanupJob::class.java) const val FULL_PATH_IDX = "projectId_repoName_fullPath_idx" private const val COLLECTION_NAME_PREFIX = "node_" + private const val KEY_PREFIX = "inactiveEmptyFolder" } } \ No newline at end of file diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/InactiveProjectNodeFolderStatJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/InactiveProjectNodeFolderStatJob.kt index ef4b3327fe..a3a8c7f24e 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/InactiveProjectNodeFolderStatJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/InactiveProjectNodeFolderStatJob.kt @@ -27,6 +27,7 @@ package com.tencent.bkrepo.job.batch.task.stat +import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.job.DELETED_DATE import com.tencent.bkrepo.job.FOLDER import com.tencent.bkrepo.job.IGNORE_PROJECT_PREFIX_LIST @@ -36,6 +37,7 @@ import com.tencent.bkrepo.job.batch.base.ActiveProjectService import com.tencent.bkrepo.job.batch.base.DefaultContextMongoDbJob import com.tencent.bkrepo.job.batch.base.JobContext import com.tencent.bkrepo.job.batch.context.NodeFolderJobContext +import com.tencent.bkrepo.job.batch.utils.FolderUtils import com.tencent.bkrepo.job.batch.utils.StatUtils.specialRepoRunCheck import com.tencent.bkrepo.job.config.properties.InactiveProjectNodeFolderStatJobProperties import org.slf4j.LoggerFactory @@ -104,10 +106,17 @@ class InactiveProjectNodeFolderStatJob( folder = row.folder, size = row.size ) - nodeFolderStat.collectNode(node, context, collectionName) + nodeFolderStat.collectNode( + node = node, + context = context, + useMemory = properties.userMemory, + keyPrefix = KEY_PREFIX, + collectionName = collectionName + ) } override fun createJobContext(): NodeFolderJobContext { + beforeRunCollection() val temp = mutableMapOf() activeProjectService.getActiveProjects().forEach { temp[it] = true @@ -117,12 +126,38 @@ class InactiveProjectNodeFolderStatJob( ) } + private fun beforeRunCollection() { + if (properties.userMemory) return + // 每次任务启动前要将redis上对应的key清理, 避免干扰 + collectionNames().forEach { + val key = KEY_PREFIX + StringPool.COLON + FolderUtils.buildCacheKey( + collectionName = it, projectId = StringPool.EMPTY + ) + nodeFolderStat.removeRedisKey(key) + } + } + override fun onRunCollectionFinished(collectionName: String, context: JobContext) { super.onRunCollectionFinished(collectionName, context) require(context is NodeFolderJobContext) // 当表执行完成后,将属于该表的所有记录写入数据库 logger.info("store memory cache to db withe table $collectionName") - nodeFolderStat.storeMemoryCacheToDB(context, collectionName, runCollection = true) + if (!properties.userMemory) { + nodeFolderStat.updateRedisCache( + context = context, + force = true, + keyPrefix = KEY_PREFIX, + collectionName = collectionName, + ) + } + + if (properties.userMemory) { + logger.info("store memory cache to db withe table $collectionName") + nodeFolderStat.storeMemoryCacheToDB(context, collectionName, runCollection = true) + } else { + logger.info("store redis cache to db withe table $collectionName") + nodeFolderStat.storeRedisCacheToDB(context, KEY_PREFIX, collectionName, runCollection = true) + } } /** @@ -169,5 +204,7 @@ class InactiveProjectNodeFolderStatJob( companion object { private val logger = LoggerFactory.getLogger(InactiveProjectNodeFolderStatJob::class.java) private const val COLLECTION_NAME_PREFIX = "node_" + private const val KEY_PREFIX = "inactiveProjectNode" + } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/NodeFolderStat.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/NodeFolderStat.kt index c34347ec9f..6790dacc99 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/NodeFolderStat.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/NodeFolderStat.kt @@ -37,6 +37,7 @@ import com.tencent.bkrepo.job.REPO import com.tencent.bkrepo.job.batch.context.NodeFolderJobContext import com.tencent.bkrepo.job.batch.utils.FolderUtils import com.tencent.bkrepo.job.batch.utils.FolderUtils.extractFolderInfoFromCacheKey +import com.tencent.bkrepo.job.pojo.FolderInfo import org.slf4j.LoggerFactory import org.springframework.data.mongodb.core.BulkOperations import org.springframework.data.mongodb.core.MongoTemplate @@ -44,11 +45,15 @@ import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.Update import org.springframework.data.mongodb.core.query.isEqualTo +import org.springframework.data.redis.core.HashOperations +import org.springframework.data.redis.core.RedisTemplate +import org.springframework.data.redis.core.ScanOptions import org.springframework.stereotype.Component @Component class NodeFolderStat( private val mongoTemplate: MongoTemplate, + private val redisTemplate: RedisTemplate, ) { fun buildNode( @@ -74,7 +79,9 @@ class NodeFolderStat( fun collectNode( node: Node, context: NodeFolderJobContext, - collectionName: String? = null + useMemory: Boolean, + keyPrefix: String, + collectionName: String? = null, ) { //只统计非目录类节点;没有根目录这个节点,不需要统计 if (node.path == PathUtils.ROOT) { @@ -92,6 +99,15 @@ class NodeFolderStat( context = context, collectionName = collectionName ) + if (!useMemory) { + // 避免每次请求都去请求redis, 先将数据缓存在本地cache中,到达上限后更新到redis + updateRedisCache( + context = context, + collectionName = collectionName, + keyPrefix = keyPrefix, + projectId = node.projectId + ) + } } } @@ -106,7 +122,6 @@ class NodeFolderStat( context: NodeFolderJobContext, collectionName: String? = null ) { - val key = FolderUtils.buildCacheKey( collectionName = collectionName, projectId = projectId, repoName = repoName, fullPath = fullPath ) @@ -115,6 +130,58 @@ class NodeFolderStat( folderMetrics.nodeNum.increment() } + /** + * 将存储在内存中的临时记录更新到redis + */ + fun updateRedisCache( + context: NodeFolderJobContext, + keyPrefix: String, + projectId: String = StringPool.EMPTY, + force: Boolean = false, + collectionName: String? + ) { + if (!force && context.folderCache.size < 100000) return + if (context.folderCache.isEmpty()) return + val movedToRedis: MutableList = mutableListOf() + val storedFolderPrefix = if (collectionName.isNullOrEmpty()) { + FolderUtils.buildCacheKey(collectionName = collectionName, projectId = projectId) + StringPool.COLON + } else { + FolderUtils.buildCacheKey(collectionName = collectionName, projectId = StringPool.EMPTY) + } + + // 避免每次设置值都创建一个 Redis 连接 + redisTemplate.execute { connection -> + val hashCommands = connection.hashCommands() + for (entry in context.folderCache) { + if (!entry.key.startsWith(storedFolderPrefix)) continue + val folderInfo = extractFolderInfoFromCacheKey(entry.key, collectionName != null) ?: continue + val sizeHKey = FolderUtils.buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = SIZE + ) + val nodeNumHKey = FolderUtils.buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = NODE_NUM + ) + val key = keyPrefix + StringPool.COLON + FolderUtils.buildCacheKey( + collectionName = collectionName, projectId = projectId + ) + // hkey为projectId:repoName:fullPath:size或者nodenum, hvalue为对应值, + hashCommands.hIncrBy(key.toByteArray(), sizeHKey.toByteArray(), entry.value.capSize.toLong()) + hashCommands.hIncrBy(key.toByteArray(), nodeNumHKey.toByteArray(), entry.value.nodeNum.toLong()) + movedToRedis.add(entry.key) + } + null + } + for (key in movedToRedis) { + context.folderCache.remove(key) + } + } + + fun removeRedisKey(key: String) { + redisTemplate.delete(key) + } + /** * 将memory缓存中属于collectionName下的记录写入DB中 */ @@ -165,6 +232,101 @@ class NodeFolderStat( } } + /** + * 将redis缓存中属于collectionName下的记录写入DB中 + */ + fun storeRedisCacheToDB( + context: NodeFolderJobContext, + keyPrefix: String, + collectionName: String, + projectId: String = StringPool.EMPTY, + runCollection: Boolean = false + ) { + val keySuffix = if (runCollection) { + FolderUtils.buildCacheKey(collectionName = collectionName, projectId = projectId) + } else { + FolderUtils.buildCacheKey(collectionName = null, projectId = projectId) + } + val key = keyPrefix + StringPool.COLON + keySuffix + storeRedisCacheToDB(key, collectionName) + } + + /** + * 存储对应项目下缓存在redis下的folder记录 + */ + private fun storeRedisCacheToDB( + key: String, + collectionName: String?, + ) { + val hashOps = redisTemplate.opsForHash() + val updateList = ArrayList>() + val options = ScanOptions.scanOptions().build() + redisTemplate.execute { connection -> + val hashCommands = connection.hashCommands() + val cursor = hashCommands.hScan(key.toByteArray(), options) + while (cursor.hasNext()) { + val entry: Map.Entry = cursor.next() + val keyStr = String(entry.key).substringBeforeLast(StringPool.COLON) + val folderInfo = extractFolderInfoFromCacheKey(keyStr) ?: continue + val statInfo = getFolderStatInfo( + key, entry, folderInfo, hashOps + ) + updateList.add( + buildUpdateClausesForFolder( + projectId = folderInfo.projectId, + repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, + size = statInfo.size, + nodeNum = statInfo.nodeNum + ) + ) + if (updateList.size >= BATCH_LIMIT) { + mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collectionName) + .updateOne(updateList) + .execute() + updateList.clear() + } + } + } + if (updateList.isNotEmpty()) { + mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, collectionName) + .updateOne(updateList) + .execute() + updateList.clear() + } + redisTemplate.delete(key) + } + + + /** + * 从redis中获取对应目录的统计信息 + */ + private fun getFolderStatInfo( + key: String, + entry: Map.Entry, + folderInfo: FolderInfo, + hashOps: HashOperations + ): StatInfo { + val size: Long + val nodeNum: Long + if (String(entry.key).endsWith(SIZE)) { + val nodeNumKey = FolderUtils.buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = NODE_NUM + ) + size = String(entry.value).toLongOrNull() ?: 0 + nodeNum = hashOps.get(key, nodeNumKey)?.toLongOrNull() ?: 0 + } else { + val sizeKey = FolderUtils.buildCacheKey( + projectId = folderInfo.projectId, repoName = folderInfo.repoName, + fullPath = folderInfo.fullPath, tag = SIZE + ) + nodeNum = String(entry.value).toLongOrNull() ?: 0 + size = hashOps.get(key, sizeKey)?.toLongOrNull() ?: 0 + } + return StatInfo(size, nodeNum) + } + /** * 生成db更新语句 */ @@ -197,10 +359,18 @@ class NodeFolderStat( val repoName: String, ) + + data class StatInfo( + var size: Long, + var nodeNum: Long + ) + companion object { private val logger = LoggerFactory.getLogger(NodeFolderStat::class.java) private const val SIZE = "size" private const val NODE_NUM = "nodeNum" private const val BATCH_LIMIT = 500 + private const val STORED = "stored" + private const val REDIS_KEY_PREFIX = "node_folder_stat:" } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/StatBaseJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/StatBaseJob.kt index 49c76b1225..437b9bbd6a 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/StatBaseJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/stat/StatBaseJob.kt @@ -105,6 +105,8 @@ open class StatBaseJob( } } + open fun beforeRunProject(projectId: String) {} + open fun runRow(row: StatNode, context: JobContext) {} open fun onRunProjectFinished(collection: String, projectId: String, context: JobContext) {} @@ -151,6 +153,7 @@ open class StatBaseJob( action: (String) -> Unit, ) { semaphore.acquire() + beforeRunProject(projectId) futureList.add( executor.submit( Callable { diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/FolderUtils.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/FolderUtils.kt index be7a9a544b..36f437f167 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/FolderUtils.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/FolderUtils.kt @@ -80,4 +80,20 @@ object FolderUtils { null } } + + /** + * 从缓存key中解析出collectionName + */ + fun extractCollectionNameFromCacheKey(key: String, runCollection: Boolean = false): String? { + val values = key.split(StringPool.COLON) + return try { + if (runCollection) { + values.firstOrNull() + } else { + null + } + } catch (e: Exception) { + null + } + } } \ No newline at end of file diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StatJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StatJobProperties.kt index 1a86532510..efaad42831 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StatJobProperties.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StatJobProperties.kt @@ -38,4 +38,5 @@ open class StatJobProperties( // 特殊仓库在每周第几天执行,默认周六 var specialDay: Int = 6, var concurrencyNum: Int = 1, + var userMemory: Boolean = true ) : MongodbJobProperties() \ No newline at end of file