Skip to content

Commit

Permalink
JBAI-4832 [core] Refactored memory management in model loading: Default limiter set to 0.3 of the max heap size; NoAllocator limiter for when the user doesn't want to reuse any allocated arrays.
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitriyb committed Jul 25, 2024
1 parent 9ca67a5 commit 3e9a8b0
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import io.kinference.core.optimizer.rules.OptimizerRuleSet
import io.kinference.data.ONNXData
import io.kinference.data.ONNXDataType
import io.kinference.model.IrOptimizableEngine
import io.kinference.ndarray.arrays.memory.MemoryLimiter
import io.kinference.ndarray.arrays.memory.MemoryLimiters
import io.kinference.optimizer.GraphOptimizer
import io.kinference.optimizer.OptimizerRule
import io.kinference.protobuf.*
import io.kinference.protobuf.message.*
import io.kinference.utils.CommonDataLoader
import io.kinference.utils.PlatformUtils
import okio.Buffer
import okio.Path
import okio.Path.Companion.toPath
Expand Down Expand Up @@ -48,37 +51,37 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {

fun protoReader(bytes: ByteArray) = ProtobufReader(Buffer().write(bytes), KI_READER_CONFIG)

/**
 * Loads a model from [bytes] with explicit memory and parallelism settings.
 *
 * @param optimize when true, applies [OptimizerRuleSet.DEFAULT_OPT_RULES]; otherwise no rules
 * @param memoryLimiter controls how much memory may be retained for array reuse
 * @param parallelismLimit parallelism cap passed through to the created model
 */
suspend fun loadModel(bytes: ByteArray, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
    // 'optimize' is just a shorthand for the default rule set; explicit rules use the overload below.
    val rules = if (optimize) OptimizerRuleSet.DEFAULT_OPT_RULES else emptyList()
    return loadModel(bytes, rules, memoryLimiter, parallelismLimit)
}

/**
 * Loads a model from [bytes], using [MemoryLimiters.Default] (0.3 of max heap, per commit message)
 * and a parallelism limit of [PlatformUtils.cores].
 */
override suspend fun loadModel(bytes: ByteArray, optimize: Boolean): KIModel {
    return loadModel(bytes, optimize, MemoryLimiters.Default, PlatformUtils.cores)
}

/** Loads a model from [bytes] with the given optimizer [rules] and default memory/parallelism settings. */
override suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>): KIModel =
    loadModel(bytes, rules, MemoryLimiters.Default, PlatformUtils.cores)

/**
 * Decodes [bytes] into a model, optionally optimizing its graph with [rules].
 *
 * @param memoryLimiter controls how much memory may be retained for array reuse
 * @param parallelismLimit parallelism cap for the model's execution
 */
suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
    val modelScheme = ModelProto.decode(protoReader(bytes))
    // Pass parallelismLimit here as well: previously the no-rules path dropped the caller's
    // value and silently fell back to the default core count.
    val model = KIModel(modelScheme, memoryLimiter, parallelismLimit)

    return if (rules.isNotEmpty()) {
        val newGraph = GraphOptimizer(model.graph).run(rules) as KIGraph
        // Rebuild the model around the optimized graph, keeping identity and settings.
        KIModel(model.id, model.name, model.opSet, newGraph, memoryLimiter, parallelismLimit)
    } else {
        model
    }
}

override suspend fun loadModel(bytes: ByteArray): KIModel = loadModel(bytes, optimize = true)

/**
 * Loads a model from the file at [path] with explicit memory and parallelism settings.
 * Reads the whole file into memory via [CommonDataLoader] before decoding.
 */
suspend fun loadModel(path: Path, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
    return loadModel(CommonDataLoader.bytes(path), optimize, memoryLimiter, parallelismLimit)
}

/** Loads a model from the file at [path] with default memory and parallelism settings. */
override suspend fun loadModel(path: Path, optimize: Boolean): KIModel {
    return loadModel(path, optimize, MemoryLimiters.Default, PlatformUtils.cores)
}

override suspend fun loadModel(path: Path): KIModel = loadModel(path, optimize = true)
Expand All @@ -87,12 +90,12 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {
return loadModel(CommonDataLoader.bytes(path), rules)
}

/**
 * Loads a model from the file at string [path] (converted via [toPath]) with explicit
 * memory and parallelism settings.
 */
suspend fun loadModel(path: String, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
    return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, memoryLimiter, parallelismLimit)
}

/** Loads a model from the file at string [path] with default memory and parallelism settings. */
override suspend fun loadModel(path: String, optimize: Boolean): KIModel {
    return loadModel(path, optimize, MemoryLimiters.Default, PlatformUtils.cores)
}

override suspend fun loadModel(path: String): KIModel = loadModel(path, optimize = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import io.kinference.core.graph.KIGraph
import io.kinference.core.markOutput
import io.kinference.graph.Contexts
import io.kinference.model.Model
import io.kinference.ndarray.arrays.memory.*
import io.kinference.operator.OperatorSetRegistry
import io.kinference.profiler.*
import io.kinference.protobuf.message.ModelProto
import io.kinference.ndarray.arrays.memory.AllocatorContext
import io.kinference.ndarray.arrays.memory.ModelArrayStorage
import io.kinference.utils.*
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*
Expand All @@ -19,15 +18,14 @@ class KIModel(
val name: String,
val opSet: OperatorSetRegistry,
val graph: KIGraph,
private val useAllocator: Boolean = true,
limiterParallelismCounter: Int = PlatformUtils.cores,
arrayStorageLimit: Long = Long.MAX_VALUE
memoryLimiter: MemoryLimiter = MemoryLimiters.Default,
parallelismLimit: Int = PlatformUtils.cores,
) : Model<KIONNXData<*>>, Profilable {
private val profiles: MutableList<ProfilingContext> = ArrayList()

@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(limiterParallelismCounter)
private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(arrayStorageLimit)
private val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(parallelismLimit)
private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(memoryLimiter)

override fun addProfilingContext(name: String): ProfilingContext = ProfilingContext(name).apply { profiles.add(this) }
override fun analyzeProfilingResults(): ProfileAnalysisEntry = profiles.analyze("Model $name")
Expand All @@ -40,33 +38,26 @@ class KIModel(
)

val limiterContext = ParallelismLimiterContext(dispatcher)
val results = if (useAllocator) {
var coreReserved = false

try {
withContext(NonCancellable) {
ResourcesDispatcher.reserveCore()
coreReserved = true
}
var coreReserved = false
val results = try {
withContext(NonCancellable) {
ResourcesDispatcher.reserveCore()
coreReserved = true
}

val allocatorContext = modelArrayStorage.createAllocatorContext()
val mixedContext = allocatorContext + limiterContext
val allocatorContext = modelArrayStorage.createAllocatorContext()
val mixedContext = allocatorContext + limiterContext

withContext(mixedContext) {
val coroutineContext = coroutineContext[AllocatorContext.Key]!!
val execResult = graph.execute(input, contexts)
execResult.forEach { it.markOutput() }
coroutineContext.closeAllocated()
execResult
}
} finally {
if (coreReserved) {
ResourcesDispatcher.releaseCore()
}
withContext(mixedContext) {
val coroutineContext = coroutineContext[AllocatorContext.Key]!!
val execResult = graph.execute(input, contexts)
execResult.forEach { it.markOutput() }
coroutineContext.closeAllocated()
execResult
}
} else {
withContext(limiterContext) {
graph.execute(input, contexts)
} finally {
if (coreReserved) {
ResourcesDispatcher.releaseCore()
}
}

Expand All @@ -85,15 +76,14 @@ class KIModel(

/**
 * Factory: builds a [KIModel] from a decoded ONNX [proto].
 *
 * @param proto decoded model prototype; its graph must be present (fails with NPE otherwise)
 * @param memoryLimiter controls how much memory may be retained for array reuse
 * @param limiterParallelismCounter parallelism cap for the model's dispatcher
 */
suspend operator fun invoke(
    proto: ModelProto,
    memoryLimiter: MemoryLimiter = MemoryLimiters.Default,
    limiterParallelismCounter: Int = PlatformUtils.cores,
): KIModel {
    val name = "${proto.domain}:${proto.modelVersion}"
    val id = "$name:${generateModelId()}"
    val opSet = OperatorSetRegistry(proto.opSetImport)
    val graph = KIGraph(proto.graph!!, opSet)
    return KIModel(id, name, opSet, graph, memoryLimiter, limiterParallelismCounter)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ data class AllocatorContext internal constructor(
override val key: CoroutineContext.Key<*> get() = Key

/**
 * Returns [count] containers of the given [type] and [size].
 *
 * With a real limiter, containers come from the reuse cache ([unusedContainers]) and are
 * tracked in [usedContainers] for later release. With [NoAllocatorMemoryLimiter], fresh
 * containers are handed out and nothing is tracked — no reuse at all.
 */
internal fun getArrayContainers(type: ArrayTypes, size: Int, count: Int): Array<ArrayContainer> {
    // Reuse disabled: allocate fresh containers and skip all accounting.
    if (limiter is NoAllocatorMemoryLimiter) {
        return Array(count) { ArrayContainer(type, size) }
    }

    val arrayContainers = arrayOfNulls<ArrayContainer>(count)
    for (i in 0 until count) {
        val container = unusedContainers.getArrayContainer(type, size)
        // A recycled container's bytes were accounted when it was cached; release that
        // share now that it is back in use.
        if (!container.isNewlyCreated)
            limiter.freeMemory(container.size.toLong())
        arrayContainers[i] = container
        usedContainers.add(container)
    }
    @Suppress("UNCHECKED_CAST")
    return arrayContainers as Array<ArrayContainer>
}

fun closeAllocated() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
package io.kinference.ndarray.arrays.memory

import io.kinference.utils.PlatformUtils
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic

class MemoryLimiter(private val memoryLimit: Long) {
/**
 * Accounting contract for memory retained by reusable array storage.
 * Implementations decide whether returned arrays may be kept for later reuse.
 */
interface MemoryLimiter {
    /**
     * Tries to account [returned] bytes against the limit.
     * Returns true when the bytes were accepted (the array may be cached),
     * false when caching would exceed the limit.
     */
    fun checkMemoryLimitAndAdd(returned: Long): Boolean
    /** Removes [deducted] bytes from the accounted total. */
    fun freeMemory(deducted: Long)
}

/**
 * [MemoryLimiter] enforcing a fixed byte budget [memoryLimit] with an atomic counter,
 * so it is safe to share across coroutines.
 */
class BaseMemoryLimiter(private val memoryLimit: Long) : MemoryLimiter {
    // Bytes currently accounted. 'val': the reference is assigned once; the AtomicLong mutates.
    private val usedMemory: AtomicLong = atomic(0L)

    override fun checkMemoryLimitAndAdd(returned: Long): Boolean {
        // Optimistically add, then roll back if the budget was exceeded.
        val currentMemory = usedMemory.addAndGet(returned)
        return if (currentMemory > memoryLimit) {
            usedMemory.addAndGet(-returned)
            false
        } else true
    }

    override fun freeMemory(deducted: Long) {
        usedMemory.addAndGet(-deducted)
    }
}

/** Predefined [MemoryLimiter] instances. */
object MemoryLimiters {
    /** Budgets cached-array memory at 0.3 of the max heap size. */
    val Default: MemoryLimiter = BaseMemoryLimiter((PlatformUtils.maxHeap * 0.3).toLong())
    /** Disables array caching entirely; see [NoAllocatorMemoryLimiter]. */
    val NoAllocator: MemoryLimiter = NoAllocatorMemoryLimiter
}

/**
 * [MemoryLimiter] that refuses to account any memory, so no arrays are ever
 * cached for reuse.
 */
internal object NoAllocatorMemoryLimiter : MemoryLimiter {
    /** Always rejects: nothing may be retained. */
    override fun checkMemoryLimitAndAdd(returned: Long): Boolean {
        return false
    }

    /** No-op: nothing is ever accounted, so there is nothing to free. */
    override fun freeMemory(deducted: Long) {

    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import io.kinference.utils.Closeable

import io.kinference.utils.ConcurrentQueue

class ModelArrayStorage(memoryLimit: Long = Long.MAX_VALUE) : Closeable {
private val limiter: MemoryLimiter = MemoryLimiter(memoryLimit)
class ModelArrayStorage(private val limiter: MemoryLimiter = MemoryLimiters.Default) : Closeable {
private val unusedArrays: ConcurrentQueue<ArrayStorage> = ConcurrentQueue()

companion object {
Expand Down

0 comments on commit 3e9a8b0

Please sign in to comment.