From 3e9a8b0bb4f1719558ab8f5c4afa2398db63d09c Mon Sep 17 00:00:00 2001 From: dmitriyb Date: Thu, 25 Jul 2024 14:44:06 +0200 Subject: [PATCH] JBAI-4832 [core] Refactored memory management in model loading: Default limiter for the 0.3 of the max heap size, NoAllocator limiter when user doesn't want to reuse any allocated arrays. --- .../kotlin/io/kinference.core/KIEngine.kt | 29 +++++----- .../io/kinference.core/model/KIModel.kt | 58 ++++++++----------- .../ndarray/arrays/memory/AllocatorContext.kt | 20 ++++--- .../ndarray/arrays/memory/MemoryLimiter.kt | 27 ++++++++- .../arrays/memory/ModelArrayStorage.kt | 3 +- 5 files changed, 77 insertions(+), 60 deletions(-) diff --git a/inference/inference-core/src/jvmMain/kotlin/io/kinference.core/KIEngine.kt b/inference/inference-core/src/jvmMain/kotlin/io/kinference.core/KIEngine.kt index 85ad2ec33..7a743afd1 100644 --- a/inference/inference-core/src/jvmMain/kotlin/io/kinference.core/KIEngine.kt +++ b/inference/inference-core/src/jvmMain/kotlin/io/kinference.core/KIEngine.kt @@ -10,11 +10,14 @@ import io.kinference.core.optimizer.rules.OptimizerRuleSet import io.kinference.data.ONNXData import io.kinference.data.ONNXDataType import io.kinference.model.IrOptimizableEngine +import io.kinference.ndarray.arrays.memory.MemoryLimiter +import io.kinference.ndarray.arrays.memory.MemoryLimiters import io.kinference.optimizer.GraphOptimizer import io.kinference.optimizer.OptimizerRule import io.kinference.protobuf.* import io.kinference.protobuf.message.* import io.kinference.utils.CommonDataLoader +import io.kinference.utils.PlatformUtils import okio.Buffer import okio.Path import okio.Path.Companion.toPath @@ -48,24 +51,24 @@ object KIEngine : IrOptimizableEngine> { fun protoReader(bytes: ByteArray) = ProtobufReader(Buffer().write(bytes), KI_READER_CONFIG) - suspend fun loadModel(bytes: ByteArray, optimize: Boolean, useAllocator: Boolean): KIModel { + suspend fun loadModel(bytes: ByteArray, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel { val rules = if (optimize) OptimizerRuleSet.DEFAULT_OPT_RULES else emptyList() - return loadModel(bytes, rules, useAllocator) + return loadModel(bytes, rules, memoryLimiter, parallelismLimit) } override suspend fun loadModel(bytes: ByteArray, optimize: Boolean): KIModel { - return loadModel(bytes, optimize, true) + return loadModel(bytes, optimize, MemoryLimiters.Default, PlatformUtils.cores) } - override suspend fun loadModel(bytes: ByteArray, rules: List>>): KIModel = loadModel(bytes, rules, true) + override suspend fun loadModel(bytes: ByteArray, rules: List>>): KIModel = loadModel(bytes, rules, MemoryLimiters.Default, PlatformUtils.cores) - suspend fun loadModel(bytes: ByteArray, rules: List>>, useAllocator: Boolean): KIModel { + suspend fun loadModel(bytes: ByteArray, rules: List>>, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel { val modelScheme = ModelProto.decode(protoReader(bytes)) - val model = KIModel(modelScheme, useAllocator) + val model = KIModel(modelScheme, memoryLimiter) return if (rules.isNotEmpty()) { val newGraph = GraphOptimizer(model.graph).run(rules) as KIGraph - KIModel(model.id, model.name, model.opSet, newGraph, useAllocator) + KIModel(model.id, model.name, model.opSet, newGraph, memoryLimiter, parallelismLimit) } else { model } @@ -73,12 +76,12 @@ object KIEngine : IrOptimizableEngine> { override suspend fun loadModel(bytes: ByteArray): KIModel = loadModel(bytes, optimize = true) - suspend fun loadModel(path: Path, optimize: Boolean, useAllocator: Boolean): KIModel { - return loadModel(CommonDataLoader.bytes(path), optimize, useAllocator) + suspend fun loadModel(path: Path, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel { + return loadModel(CommonDataLoader.bytes(path), optimize, memoryLimiter, parallelismLimit) } override suspend fun loadModel(path: Path, optimize: Boolean): KIModel { - return loadModel(path, optimize, true) + return loadModel(path, optimize, MemoryLimiters.Default, PlatformUtils.cores) } override suspend fun loadModel(path: Path): KIModel = loadModel(path, optimize = true) @@ -87,12 +90,12 @@ object KIEngine : IrOptimizableEngine> { return loadModel(CommonDataLoader.bytes(path), rules) } - suspend fun loadModel(path: String, optimize: Boolean, useAllocator: Boolean): KIModel { - return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, useAllocator) + suspend fun loadModel(path: String, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel { + return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, memoryLimiter, parallelismLimit) } override suspend fun loadModel(path: String, optimize: Boolean): KIModel { - return loadModel(path, optimize, true) + return loadModel(path, optimize, MemoryLimiters.Default, PlatformUtils.cores) } override suspend fun loadModel(path: String): KIModel = loadModel(path, optimize = true) diff --git a/inference/inference-core/src/jvmMain/kotlin/io/kinference.core/model/KIModel.kt b/inference/inference-core/src/jvmMain/kotlin/io/kinference.core/model/KIModel.kt index 0fd5ea42c..b969738b3 100644 --- a/inference/inference-core/src/jvmMain/kotlin/io/kinference.core/model/KIModel.kt +++ b/inference/inference-core/src/jvmMain/kotlin/io/kinference.core/model/KIModel.kt @@ -5,11 +5,10 @@ import io.kinference.core.graph.KIGraph import io.kinference.core.markOutput import io.kinference.graph.Contexts import io.kinference.model.Model +import io.kinference.ndarray.arrays.memory.* import io.kinference.operator.OperatorSetRegistry import io.kinference.profiler.* import io.kinference.protobuf.message.ModelProto -import io.kinference.ndarray.arrays.memory.AllocatorContext -import io.kinference.ndarray.arrays.memory.ModelArrayStorage import io.kinference.utils.* import kotlinx.atomicfu.atomic import kotlinx.coroutines.* @@ -19,15 +18,14 @@ class KIModel( val name: String, val opSet: OperatorSetRegistry, val graph: KIGraph, - private val useAllocator: Boolean = true, - limiterParallelismCounter: Int = PlatformUtils.cores, - arrayStorageLimit: Long = Long.MAX_VALUE + memoryLimiter: MemoryLimiter = MemoryLimiters.Default, + parallelismLimit: Int = PlatformUtils.cores, ) : Model>, Profilable { private val profiles: MutableList = ArrayList() @OptIn(ExperimentalCoroutinesApi::class) - private val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(limiterParallelismCounter) - private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(arrayStorageLimit) + private val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(parallelismLimit) + private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(memoryLimiter) override fun addProfilingContext(name: String): ProfilingContext = ProfilingContext(name).apply { profiles.add(this) } override fun analyzeProfilingResults(): ProfileAnalysisEntry = profiles.analyze("Model $name") @@ -40,33 +38,26 @@ class KIModel( ) val limiterContext = ParallelismLimiterContext(dispatcher) - val results = if (useAllocator) { - var coreReserved = false - - try { - withContext(NonCancellable) { - ResourcesDispatcher.reserveCore() - coreReserved = true - } + var coreReserved = false + val results = try { + withContext(NonCancellable) { + ResourcesDispatcher.reserveCore() + coreReserved = true + } - val allocatorContext = modelArrayStorage.createAllocatorContext() - val mixedContext = allocatorContext + limiterContext + val allocatorContext = modelArrayStorage.createAllocatorContext() + val mixedContext = allocatorContext + limiterContext - withContext(mixedContext) { - val coroutineContext = coroutineContext[AllocatorContext.Key]!! - val execResult = graph.execute(input, contexts) - execResult.forEach { it.markOutput() } - coroutineContext.closeAllocated() - execResult - } - } finally { - if (coreReserved) { - ResourcesDispatcher.releaseCore() - } + withContext(mixedContext) { + val coroutineContext = coroutineContext[AllocatorContext.Key]!! + val execResult = graph.execute(input, contexts) + execResult.forEach { it.markOutput() } + coroutineContext.closeAllocated() + execResult } - } else { - withContext(limiterContext) { - graph.execute(input, contexts) + } finally { + if (coreReserved) { + ResourcesDispatcher.releaseCore() } } @@ -85,15 +76,14 @@ class KIModel( suspend operator fun invoke( proto: ModelProto, - useAllocator: Boolean = true, + memoryLimiter: MemoryLimiter = MemoryLimiters.Default, limiterParallelismCounter: Int = PlatformUtils.cores, - arrayStorageLimit: Long = Long.MAX_VALUE ): KIModel { val name = "${proto.domain}:${proto.modelVersion}" val id = "$name:${generateModelId()}" val opSet = OperatorSetRegistry(proto.opSetImport) val graph = KIGraph(proto.graph!!, opSet) - return KIModel(id, name, opSet, graph, useAllocator, limiterParallelismCounter, arrayStorageLimit) + return KIModel(id, name, opSet, graph, memoryLimiter, limiterParallelismCounter) } } } diff --git a/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/AllocatorContext.kt b/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/AllocatorContext.kt index 8a9740832..45aec0583 100644 --- a/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/AllocatorContext.kt +++ b/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/AllocatorContext.kt @@ -14,15 +14,19 @@ data class AllocatorContext internal constructor( override val key: CoroutineContext.Key<*> get() = Key internal fun getArrayContainers(type: ArrayTypes, size: Int, count: Int): Array { - val arrayContainers = arrayOfNulls(count) - for (i in 0 until count) { - val container = unusedContainers.getArrayContainer(type, size) - if (!container.isNewlyCreated) - limiter.freeMemory(container.size.toLong()) - arrayContainers[i] = container - usedContainers.add(container) + if (limiter !is NoAllocatorMemoryLimiter) { + val arrayContainers = arrayOfNulls(count) + for (i in 0 until count) { + val container = unusedContainers.getArrayContainer(type, size) + if (!container.isNewlyCreated) + limiter.freeMemory(container.size.toLong()) + arrayContainers[i] = container + usedContainers.add(container) + } + return arrayContainers as Array + } else { + return Array(count) { ArrayContainer(type, size) } } - return arrayContainers as Array } fun closeAllocated() { diff --git a/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/MemoryLimiter.kt b/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/MemoryLimiter.kt index bedf42d2e..3a99faeb7 100644 --- a/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/MemoryLimiter.kt +++ b/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/MemoryLimiter.kt @@ -1,12 +1,18 @@ package io.kinference.ndarray.arrays.memory +import io.kinference.utils.PlatformUtils import kotlinx.atomicfu.AtomicLong import kotlinx.atomicfu.atomic -class MemoryLimiter(private val memoryLimit: Long) { +interface MemoryLimiter { + fun checkMemoryLimitAndAdd(returned: Long): Boolean + fun freeMemory(deducted: Long) +} + +class BaseMemoryLimiter(private val memoryLimit: Long) : MemoryLimiter { private var usedMemory: AtomicLong = atomic(0L) - fun checkMemoryLimitAndAdd(returned: Long): Boolean { + override fun checkMemoryLimitAndAdd(returned: Long): Boolean { val currentMemory = usedMemory.addAndGet(returned) return if (currentMemory > memoryLimit) { usedMemory.addAndGet(-returned) @@ -14,7 +20,22 @@ class MemoryLimiter(private val memoryLimit: Long) { } else true } - fun freeMemory(deducted: Long) { + override fun freeMemory(deducted: Long) { usedMemory.addAndGet(-deducted) } } + +object MemoryLimiters { + val Default: MemoryLimiter = BaseMemoryLimiter((PlatformUtils.maxHeap * 0.3).toLong()) + val NoAllocator: MemoryLimiter = NoAllocatorMemoryLimiter +} + +internal object NoAllocatorMemoryLimiter : MemoryLimiter { + override fun checkMemoryLimitAndAdd(returned: Long): Boolean { + return false + } + + override fun freeMemory(deducted: Long) { + + } +} diff --git a/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/ModelArrayStorage.kt b/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/ModelArrayStorage.kt index a3a64c927..fa03be260 100644 --- a/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/ModelArrayStorage.kt +++ b/ndarray/ndarray-core/src/jvmMain/kotlin/io/kinference/ndarray/arrays/memory/ModelArrayStorage.kt @@ -5,8 +5,7 @@ import io.kinference.utils.Closeable import io.kinference.utils.ConcurrentQueue -class ModelArrayStorage(memoryLimit: Long = Long.MAX_VALUE) : Closeable { - private val limiter: MemoryLimiter = MemoryLimiter(memoryLimit) +class ModelArrayStorage(private val limiter: MemoryLimiter = MemoryLimiters.Default) : Closeable { private val unusedArrays: ConcurrentQueue = ConcurrentQueue() companion object {