Skip to content

Commit

Permalink
bug: 修复StorageManager创建Node超时后误删存储 TencentBlueKing#2133
Browse files Browse the repository at this point in the history
* feat: 记录存储中的文件用于存储失败回滚 TencentBlueKing#2133

* feat: 记录存储中的文件用于存储失败回滚 TencentBlueKing#2133

* feat: 支持删除制品的同时删除staging文件 TencentBlueKing#2133

* feat: 增加存储回滚定时任务 TencentBlueKing#2133

* feat: 增加存储回滚定时任务 TencentBlueKing#2133

* feat: 移除创建node超时回滚操作 TencentBlueKing#2133

* feat: 增加存储回滚定时任务 TencentBlueKing#2133

* feat: 增加存储回滚定时任务 TencentBlueKing#2133

* feat: 增加存储回滚定时任务 TencentBlueKing#2133

* feat: 使用findAndModify创建引用,增加单元测试 TencentBlueKing#2133

* feat: 增加存储回滚任务单元测试 TencentBlueKing#2133

* feat: 修复代码检查错误 TencentBlueKing#2133

* feat: 移除FileReference创建方法 TencentBlueKing#2133

* feat: 移除FileReference创建方法 TencentBlueKing#2133

* feat: 恢复NODE_CREATE_TIMEOUT TencentBlueKing#2133

* feat: 移除存储回滚相关代码 TencentBlueKing#2133

* feat: 移除存储回滚相关代码 TencentBlueKing#2133

* feat: 移除存储回滚相关代码 TencentBlueKing#2133

* feat: 增加StorageManager单元测试 TencentBlueKing#2133
  • Loading branch information
cnlkl authored May 27, 2024
1 parent b41498a commit e48226d
Show file tree
Hide file tree
Showing 24 changed files with 276 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ artifact.node.not-found=Node [{0}] not found
artifact.node.path.invalid=Invalid node path [{0}]
artifact.node.existed=Node [{0}] existed
artifact.node.conflict=Node [{0}] conflict
artifact.node.create.timeout=Node [{0}] create timeout
artifact.node.list.too-large=Node list count too large
artifact.node.link-folder-unsupported=Link folder[{0}] was unsupported
artifact.stage.upgrade.error=Upgrade artifact stage error: [{0}]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ artifact.node.not-found=节点[{0}]不存在
artifact.node.path.invalid=节点路径[{0}]无效
artifact.node.existed=节点[{0}]已存在
artifact.node.conflict=已存在同名文件,且不允许覆盖
artifact.node.create.timeout=节点[{0}]创建超时
artifact.node.list.too-large=节点列表数量过大
artifact.node.link-folder-unsupported=无法链接到目录[{0}]
artifact.stage.upgrade.error=制品晋级失败: {0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ artifact.node.not-found=節點[{0}]不存在
artifact.node.path.invalid=節點路徑[{0}]無效
artifact.node.existed=節點[{0}]已存在
artifact.node.conflict=已存在同名文件,且不允許覆蓋
artifact.node.create.timeout=節點[{0}]創建超時
artifact.node.list.too-large=節點列表數量過大
artifact.node.link-folder-unsupported=無法連結到目錄[{0}]
artifact.stage.upgrade.error=制品晉級失敗: {0}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ import com.tencent.bkrepo.common.service.util.HttpContextHolder.getRequestOrNull
import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.innercos.http.HttpMethod
import com.tencent.bkrepo.repository.api.FileReferenceClient
import com.tencent.bkrepo.repository.api.NodeClient
import com.tencent.bkrepo.repository.pojo.node.NodeDetail
import com.tencent.bkrepo.repository.pojo.node.NodeInfo
import com.tencent.bkrepo.repository.pojo.node.service.NodeCreateRequest
import com.tencent.devops.plugin.api.PluginManager
import com.tencent.devops.plugin.api.applyExtension
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicBoolean

/**
* 存储管理类
Expand All @@ -66,7 +66,8 @@ import java.util.concurrent.atomic.AtomicBoolean
class StorageManager(
private val storageService: StorageService,
private val nodeClient: NodeClient,
private val nodeResourceFactoryImpl: NodeResourceFactoryImpl,
private val fileReferenceClient: FileReferenceClient,
private val nodeResourceFactory: NodeResourceFactory,
private val pluginManager: PluginManager,
) {

Expand All @@ -79,18 +80,18 @@ class StorageManager(
artifactFile: ArtifactFile,
storageCredentials: StorageCredentials?,
): NodeDetail {
val cancel = AtomicBoolean(false)
val affectedCount = storageService.store(request.sha256!!, artifactFile, storageCredentials, cancel)
val affectedCount = storageService.store(request.sha256!!, artifactFile, storageCredentials)
try {
return nodeClient.createNode(request).data!!
} catch (exception: Exception) {
// 当文件有创建,则删除文件
if (affectedCount == 1) {
try {
cancel.set(true)
storageService.delete(request.sha256!!, storageCredentials)
// 当createNode调用超时,实际node和引用创建成功时不会做任何改变
// 当文件创建成功,但是node创建失败时,则创建一个计数为0的fileReference用于清理任务清理垃圾文件
fileReferenceClient.increment(request.sha256!!, storageCredentials?.key, 0L)
} catch (exception: Exception) {
logger.error("Failed to delete new created file[${request.sha256}]", exception)
// 创建引用失败后会通过定时任务StorageReconcileJob清理垃圾文件
logger.error("Failed to create ref for new created file[${request.sha256}]", exception)
}
}
// 异常往上抛
Expand Down Expand Up @@ -124,7 +125,7 @@ class StorageManager(
if (range.isEmpty() || request?.method == HttpMethod.HEAD.name) {
return ArtifactInputStream(EmptyInputStream.INSTANCE, range)
}
val nodeResource = nodeResourceFactoryImpl.getNodeResource(node, range, storageCredentials)
val nodeResource = nodeResourceFactory.getNodeResource(node, range, storageCredentials)
return nodeResource.getArtifactInputStream()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package com.tencent.bkrepo.common.artifact.manager

import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.api.pojo.Response
import com.tencent.bkrepo.common.artifact.api.ArtifactFile
import com.tencent.bkrepo.common.artifact.api.FileSystemArtifactFile
import com.tencent.bkrepo.common.artifact.util.Constant.UT_PROJECT_ID
import com.tencent.bkrepo.common.artifact.util.Constant.UT_REPO_NAME
import com.tencent.bkrepo.common.artifact.util.Constant.UT_SHA256
import com.tencent.bkrepo.common.artifact.util.Constant.UT_USER
import com.tencent.bkrepo.common.storage.core.StorageService
import com.tencent.bkrepo.repository.api.FileReferenceClient
import com.tencent.bkrepo.repository.api.NodeClient
import com.tencent.bkrepo.repository.pojo.node.NodeDetail
import com.tencent.bkrepo.repository.pojo.node.NodeInfo
import com.tencent.bkrepo.repository.pojo.node.service.NodeCreateRequest
import com.tencent.devops.plugin.api.PluginManager
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.assertThrows
import org.mockito.ArgumentMatchers.anyString
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest
import org.springframework.boot.test.mock.mockito.MockBean
import org.springframework.context.annotation.Import
import org.springframework.test.context.TestPropertySource
import org.springframework.util.ReflectionUtils
import java.io.File

@DisplayName("存储管理器测试")
@DataMongoTest
@SpringBootConfiguration
@EnableAutoConfiguration
@Import(StorageManager::class)
@TestPropertySource(
locations = ["classpath:bootstrap-ut.properties"]
)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class StorageManagerTest @Autowired constructor(
private val storageManager: StorageManager,
) {
@MockBean
private lateinit var fileReferenceClient: FileReferenceClient

@MockBean
private lateinit var nodeClient: NodeClient

@MockBean
private lateinit var pluginManager: PluginManager

@MockBean
private lateinit var storageService: StorageService

@MockBean
private lateinit var nodeResourceFactory: NodeResourceFactory

@BeforeEach
fun beforeEach() {
whenever(storageService.store(anyString(), any(), anyOrNull(), anyOrNull()))
.thenReturn(1)
whenever(nodeClient.createNode(any()))
.thenReturn(Response(code = 0, data = buildNodeDetail(UT_SHA256)))
whenever(fileReferenceClient.increment(anyString(), anyOrNull(), any()))
.thenReturn(Response(code = 0, data = true))
}

@Test
fun testStoreSuccess() {
store()
verify(nodeClient, times(1)).createNode(any())
verify(fileReferenceClient, times(0)).increment(anyString(), anyOrNull(), any())
}

@Test
fun `test store failed`() {
// mock
val storageService = mock<StorageService>()
whenever(storageService.store(anyString(), any(), anyOrNull(), anyOrNull())).then { throw RuntimeException() }
val field = ReflectionUtils.findField(StorageManager::class.java, "storageService")!!
field.isAccessible = true
val oldStorageService = field.get(storageManager)
field.set(storageManager, storageService)

// store failed
assertThrows<RuntimeException> { store() }
verify(nodeClient, times(0)).createNode(any())
verify(fileReferenceClient, times(0)).increment(anyString(), anyOrNull(), any())

// reset mock
field.set(storageManager, oldStorageService)
}

@Test
fun `test create node failed`() {
// mock
whenever(nodeClient.createNode(any())).then { throw RuntimeException() }

// store failed
assertThrows<RuntimeException> { store() }
verify(nodeClient, times(1)).createNode(any())
verify(fileReferenceClient, times(1)).increment(anyString(), anyOrNull(), any())
}

private fun store(): String {
val file = createTempArtifactFile()
val sha256 = file.getFileSha256()
val req = buildNodeCreateRequest("/a/b/c.txt", 10240L, sha256)
try {
storageManager.storeArtifactFile(req, file, null)
} finally {
file.delete()
}
return sha256
}

private fun createTempArtifactFile(size: Long = 10240L): ArtifactFile {
val tempFile = File.createTempFile("tmp", "")
val content = StringPool.randomString(size.toInt())
content.byteInputStream().use { input ->
tempFile.outputStream().use { output ->
input.copyTo(output)
}
}
return FileSystemArtifactFile(tempFile)
}

private fun buildNodeCreateRequest(fullPath: String, size: Long, sha256: String) = NodeCreateRequest(
projectId = UT_PROJECT_ID,
repoName = UT_REPO_NAME,
folder = false,
fullPath = fullPath,
expires = 0,
overwrite = false,
size = size,
sha256 = sha256,
md5 = "md5",
)

private fun buildNodeDetail(sha256: String): NodeDetail {
val nodeInfo = NodeInfo(
createdBy = UT_USER,
createdDate = "",
lastModifiedBy = UT_USER,
lastModifiedDate = "",
folder = false,
sha256 = sha256,
path = "/a/b",
name = "c.txt",
fullPath = "/a/b/c.txt",
size = 10240L,
projectId = UT_PROJECT_ID,
repoName = UT_REPO_NAME
)
return NodeDetail(nodeInfo)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.common.artifact.util

object Constant {
const val UT_PROJECT_ID = "ut-project"
const val UT_REPO_NAME = "ut-repo"
const val UT_USER = "system"
const val UT_SHA256 = "688787d8ff144c502c7f5cffaafe2cc588d86079f9de88304c26b0cb99ce91c6"
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ dependencies {
testImplementation("it.ozimov:embedded-redis:${Versions.EmbeddedRedis}") {
exclude("org.slf4j", "slf4j-simple")
}
testImplementation("org.mockito.kotlin:mockito-kotlin")
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import com.tencent.bkrepo.common.storage.filesystem.check.SynchronizeResult
import com.tencent.bkrepo.common.storage.message.StorageErrorException
import com.tencent.bkrepo.common.storage.message.StorageMessageCode
import com.tencent.bkrepo.common.storage.monitor.Throughput
import java.util.concurrent.atomic.AtomicBoolean
import org.slf4j.LoggerFactory
import kotlin.system.measureNanoTime

Expand All @@ -53,7 +52,6 @@ abstract class AbstractStorageService : CompressSupport() {
digest: String,
artifactFile: ArtifactFile,
storageCredentials: StorageCredentials?,
cancel: AtomicBoolean?,
storageClass: String?,
): Int {
val path = fileLocator.locate(digest)
Expand All @@ -65,7 +63,7 @@ abstract class AbstractStorageService : CompressSupport() {
} else {
val size = artifactFile.getSize()
val nanoTime = measureNanoTime {
doStore(path, digest, artifactFile, credentials, cancel, storageClass)
doStore(path, digest, artifactFile, credentials, storageClass)
}
val throughput = Throughput(size, nanoTime)
logger.info("Success to store artifact file [$digest], $throughput.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationEventPublisher
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicBoolean

/**
* 抽象存储服务辅助类
Expand Down Expand Up @@ -105,7 +104,6 @@ abstract class AbstractStorageSupport : StorageService {
filename: String,
artifactFile: ArtifactFile,
credentials: StorageCredentials,
cancel: AtomicBoolean? = null,
storageClass: String? = null,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import com.tencent.bkrepo.common.storage.core.operation.OverlayOperation
import com.tencent.bkrepo.common.storage.credentials.StorageCredentials
import com.tencent.bkrepo.common.storage.filesystem.check.SynchronizeResult
import java.nio.file.Path
import java.util.concurrent.atomic.AtomicBoolean

/**
* 存储服务接口
Expand All @@ -61,7 +60,6 @@ interface StorageService :
digest: String,
artifactFile: ArtifactFile,
storageCredentials: StorageCredentials?,
cancel: AtomicBoolean? = null,
storageClass: String? = null,
): Int

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import java.io.File
import java.io.FileNotFoundException
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicBoolean

/**
* 支持缓存的存储服务
Expand All @@ -69,7 +68,6 @@ class CacheStorageService(
filename: String,
artifactFile: ArtifactFile,
credentials: StorageCredentials,
cancel: AtomicBoolean?,
storageClass: String?,
) {
when {
Expand All @@ -90,13 +88,12 @@ class CacheStorageService(
else -> {
val cacheFile = getCacheClient(credentials).move(path, filename, artifactFile.flushToFile())
cacheFileEventPublisher.publishCacheFileLoadedEvent(credentials, cacheFile)
async2Store(cancel, filename, credentials, path, cacheFile, storageClass)
async2Store(filename, credentials, path, cacheFile, storageClass)
}
}
}

private fun async2Store(
cancel: AtomicBoolean?,
filename: String,
credentials: StorageCredentials,
path: String,
Expand All @@ -105,16 +102,8 @@ class CacheStorageService(
) {
threadPoolTaskExecutor.execute {
try {
if (cancel?.get() == true) {
logger.info("Cancel store fle [$filename] on [${credentials.key}]")
return@execute
}
fileStorage.store(path, filename, cacheFile, credentials, storageClass)
} catch (ignored: Exception) {
if (cancel?.get() == true) {
logger.info("Cancel store fle [$filename] on [${credentials.key}]")
return@execute
}
// 此处为异步上传,失败后异常不会被外层捕获,所以单独捕获打印error日志
logger.error("Failed to async store file [$filename] on [${credentials.key}]", ignored)
// 失败时把文件放入暂存区,后台任务会进行补偿。
Expand Down
Loading

0 comments on commit e48226d

Please sign in to comment.