Skip to content

Commit

Permalink
Merge pull request #196 from JetBrains-Research/optimize-coroutines
Browse files Browse the repository at this point in the history
Memory restrictions and limited parallelism for model prediction
  • Loading branch information
dmitriyb authored Jul 31, 2024
2 parents b6c2e7e + 502cbba commit c83a315
Show file tree
Hide file tree
Showing 34 changed files with 279 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import io.kinference.core.optimizer.rules.OptimizerRuleSet
import io.kinference.data.ONNXData
import io.kinference.data.ONNXDataType
import io.kinference.model.IrOptimizableEngine
import io.kinference.ndarray.arrays.memory.ArrayDispatcher
import io.kinference.ndarray.arrays.memory.MemoryLimiter
import io.kinference.ndarray.arrays.memory.MemoryLimiters
import io.kinference.optimizer.GraphOptimizer
import io.kinference.optimizer.OptimizerRule
import io.kinference.protobuf.*
import io.kinference.protobuf.message.*
import io.kinference.utils.CommonDataLoader
import io.kinference.utils.PlatformUtils
import okio.Buffer
import okio.Path
import okio.Path.Companion.toPath
Expand Down Expand Up @@ -49,37 +51,37 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {

fun protoReader(bytes: ByteArray) = ProtobufReader(Buffer().write(bytes), KI_READER_CONFIG)

suspend fun loadModel(bytes: ByteArray, optimize: Boolean, useAllocator: Boolean): KIModel {
suspend fun loadModel(bytes: ByteArray, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
val rules = if (optimize) OptimizerRuleSet.DEFAULT_OPT_RULES else emptyList()
return loadModel(bytes, rules, useAllocator)
return loadModel(bytes, rules, memoryLimiter, parallelismLimit)
}

override suspend fun loadModel(bytes: ByteArray, optimize: Boolean): KIModel {
return loadModel(bytes, optimize, true)
return loadModel(bytes, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
}

override suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>): KIModel = loadModel(bytes, rules, true)
override suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>): KIModel = loadModel(bytes, rules, MemoryLimiters.NoAllocator, PlatformUtils.cores)

suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>, useAllocator: Boolean): KIModel {
suspend fun loadModel(bytes: ByteArray, rules: List<OptimizerRule<KIONNXData<*>>>, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
val modelScheme = ModelProto.decode(protoReader(bytes))
val model = KIModel(modelScheme, useAllocator)
val model = KIModel(modelScheme, memoryLimiter)

return if (rules.isNotEmpty()) {
val newGraph = GraphOptimizer(model.graph).run(rules) as KIGraph
KIModel(model.id, model.name, model.opSet, newGraph, useAllocator)
KIModel(model.id, model.name, model.opSet, newGraph, memoryLimiter, parallelismLimit)
} else {
model
}
}

override suspend fun loadModel(bytes: ByteArray): KIModel = loadModel(bytes, optimize = true)

suspend fun loadModel(path: Path, optimize: Boolean, useAllocator: Boolean): KIModel {
return loadModel(CommonDataLoader.bytes(path), optimize, useAllocator)
suspend fun loadModel(path: Path, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
return loadModel(CommonDataLoader.bytes(path), optimize, memoryLimiter, parallelismLimit)
}

override suspend fun loadModel(path: Path, optimize: Boolean): KIModel {
return loadModel(path, optimize, true)
return loadModel(path, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
}

override suspend fun loadModel(path: Path): KIModel = loadModel(path, optimize = true)
Expand All @@ -88,12 +90,12 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {
return loadModel(CommonDataLoader.bytes(path), rules)
}

suspend fun loadModel(path: String, optimize: Boolean, useAllocator: Boolean): KIModel {
return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, useAllocator)
suspend fun loadModel(path: String, optimize: Boolean, memoryLimiter: MemoryLimiter, parallelismLimit: Int): KIModel {
return loadModel(CommonDataLoader.bytes(path.toPath()), optimize, memoryLimiter, parallelismLimit)
}

override suspend fun loadModel(path: String, optimize: Boolean): KIModel {
return loadModel(path, optimize, true)
return loadModel(path, optimize, MemoryLimiters.NoAllocator, PlatformUtils.cores)
}

override suspend fun loadModel(path: String): KIModel = loadModel(path, optimize = true)
Expand All @@ -117,8 +119,4 @@ object KIEngine : IrOptimizableEngine<KIONNXData<*>> {
override suspend fun loadData(path: String, type: ONNXDataType): KIONNXData<*> {
return loadData(path.toPath(), type)
}

fun clearCache() {
ArrayDispatcher.clearCache()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,28 @@ import io.kinference.core.graph.KIGraph
import io.kinference.core.markOutput
import io.kinference.graph.Contexts
import io.kinference.model.Model
import io.kinference.ndarray.arrays.memory.*
import io.kinference.operator.OperatorSetRegistry
import io.kinference.profiler.*
import io.kinference.protobuf.message.ModelProto
import io.kinference.ndarray.arrays.memory.AllocatorContext
import kotlinx.coroutines.withContext
import io.kinference.utils.*
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.*

class KIModel(val id: String, val name: String, val opSet: OperatorSetRegistry, val graph: KIGraph, val useAllocator: Boolean = true) : Model<KIONNXData<*>>, Profilable {
private val inferenceCycleCounter = atomic(0L)
class KIModel(
val id: String,
val name: String,
val opSet: OperatorSetRegistry,
val graph: KIGraph,
memoryLimiter: MemoryLimiter = MemoryLimiters.NoAllocator,
parallelismLimit: Int = PlatformUtils.cores,
) : Model<KIONNXData<*>>, Profilable, Cacheable {
private val profiles: MutableList<ProfilingContext> = ArrayList()

@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(parallelismLimit)
private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(memoryLimiter)

override fun addProfilingContext(name: String): ProfilingContext = ProfilingContext(name).apply { profiles.add(this) }
override fun analyzeProfilingResults(): ProfileAnalysisEntry = profiles.analyze("Model $name")
override fun resetProfiles() = profiles.clear()
Expand All @@ -25,39 +37,57 @@ class KIModel(val id: String, val name: String, val opSet: OperatorSetRegistry,
if (profile) addProfilingContext("Model $name") else null
)

val results = if (useAllocator) {
withContext(AllocatorContext(id, getInferenceCycleId())) {
val limiterContext = ParallelismLimiterContext(dispatcher)
var coreReserved = false
val results = try {
withContext(NonCancellable) {
ResourcesDispatcher.reserveCore()
coreReserved = true
}

val allocatorContext = modelArrayStorage.createAllocatorContext()
val mixedContext = allocatorContext + limiterContext

withContext(mixedContext) {
val coroutineContext = coroutineContext[AllocatorContext.Key]!!
val execResult = graph.execute(input, contexts)
execResult.forEach { it.markOutput() }
coroutineContext.closeAllocated()
execResult
}
} else {
graph.execute(input, contexts)
} finally {
if (coreReserved) {
ResourcesDispatcher.releaseCore()
}
}

return results.associateBy { it.name!! }
}


override suspend fun close() {
graph.close()
modelArrayStorage.close()
}

private fun getInferenceCycleId(): Long = inferenceCycleCounter.incrementAndGet()
override fun clearCache() {
modelArrayStorage.clearCache()
}

companion object {
private val modelCounter = atomic(0)

private fun generateModelId(): Int = modelCounter.incrementAndGet()

suspend operator fun invoke(proto: ModelProto, useAllocator: Boolean = true): KIModel {
suspend operator fun invoke(
proto: ModelProto,
memoryLimiter: MemoryLimiter = MemoryLimiters.NoAllocator,
limiterParallelismCounter: Int = PlatformUtils.cores,
): KIModel {
val name = "${proto.domain}:${proto.modelVersion}"
val id = "$name:${generateModelId()}"
val opSet = OperatorSetRegistry(proto.opSetImport)
val graph = KIGraph(proto.graph!!, opSet)
return KIModel(id, name, opSet, graph, useAllocator)
return KIModel(id, name, opSet, graph, memoryLimiter, limiterParallelismCounter)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import io.kinference.operator.*
import io.kinference.optimizer.GraphOptimizer.Companion.isOpt
import io.kinference.protobuf.message.AttributeProto
import io.kinference.protobuf.message.TensorProto
import io.kinference.utils.launchWithLimitOrDefault
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.math.min
import kotlin.math.sqrt

Expand All @@ -34,7 +34,7 @@ sealed class Attention(name: String, info: OperatorInfo, attributes: Map<String,
coroutineScope {
for (batchNum in 0 until batchSize) {
for (numHead in 0 until numHeads) {
launch {
launchWithLimitOrDefault {
val tempScores = scores.view(batchNum, numHead) as NumberNDArrayCore
val tempOutput = output.viewMutable(batchNum, numHead) as MutableNumberNDArray

Expand Down Expand Up @@ -144,7 +144,7 @@ sealed class Attention(name: String, info: OperatorInfo, attributes: Map<String,
coroutineScope {
for (batchNum in 0 until batchSize) {
for (numHead in 0 until numHeads) {
launch {
launchWithLimitOrDefault {
val queryMatrix = queries.view(batchNum, numHead)
val presentMatrix = present.view(0, batchNum, numHead) as NumberNDArray
val scoresMatrix = scores.viewMutable(batchNum, numHead) as MutableNumberNDArray
Expand Down Expand Up @@ -257,7 +257,7 @@ class AttentionVer1(name: String, attributes: Map<String, Attribute<Any>>, input
val output = qkv[qkvIdx]
for (batchNum in 0 until batchSize) {
val inputMatrix = input.view(batchNum)
for (numHead in 0 until numHeads) launch {
for (numHead in 0 until numHeads) launchWithLimitOrDefault {
val weightsMatrix = weights.view(qkvIdx, numHead) as NumberNDArrayCore
val biasMatrix = bias.view(qkvIdx, numHead) as NumberNDArray

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import io.kinference.core.operators.layer.recurrent.LayerDirection
import io.kinference.ndarray.arrays.*
import io.kinference.ndarray.extensions.allocateNDArray
import io.kinference.primitives.types.DataType
import io.kinference.utils.launchWithLimitOrDefault
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

class GRULayer(hiddenSize: Int, activations: List<String>, direction: LayerDirection): GRULayerBase(hiddenSize, activations, direction) {
init {
Expand Down Expand Up @@ -75,7 +75,7 @@ class GRULayer(hiddenSize: Int, activations: List<String>, direction: LayerDirec
//TODO: research optimal batchSize for run with coroutines
for (seqNum in seqRange) {
if (batchSize > 1) {
coroutineScope { wrapper(seqNum) { launch { it() } } }
coroutineScope { wrapper(seqNum) { launchWithLimitOrDefault { it() } } }
} else {
wrapper(seqNum)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import io.kinference.ndarray.arrays.*
import io.kinference.ndarray.arrays.tiled.FloatTiledArray
import io.kinference.primitives.types.DataType
import io.kinference.trees.*
import io.kinference.utils.LoggerFactory
import io.kinference.utils.PlatformUtils
import io.kinference.utils.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.math.*

class KICoreTreeEnsemble(
Expand Down Expand Up @@ -43,7 +41,7 @@ class KICoreTreeEnsemble(

coroutineScope {
for (i in 0 until leadDim step batchSize) {
launch {
launchWithLimitOrDefault {
for (j in i until min(i + batchSize, leadDim))
applyEntry(arrayBlocks[j], outputBlocks[j])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import io.kinference.runners.AccuracyRunner
import io.kinference.runners.PerformanceRunner
import io.kinference.utils.*

object KITestEngine : TestEngine<KIONNXData<*>>(KIEngine), Cacheable {
object KITestEngine : TestEngine<KIONNXData<*>>(KIEngine) {
override fun checkEquals(expected: KIONNXData<*>, actual: KIONNXData<*>, delta: Double) {
KIAssertions.assertEquals(expected, actual, delta)
}
Expand All @@ -26,10 +26,6 @@ object KITestEngine : TestEngine<KIONNXData<*>>(KIEngine), Cacheable {
}
}

override fun clearCache() {
KIEngine.clearCache()
}

val KIAccuracyRunner = AccuracyRunner(KITestEngine)
val KIPerformanceRunner = PerformanceRunner(KITestEngine)
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ package io.kinference.ndarray.arrays

typealias StateMarker = () -> Unit

enum class ArrayTypes(val index: Int) {
ByteArray(0),
UByteArray(1),
ShortArray(2),
UShortArray(3),
IntArray(4),
UIntArray(5),
LongArray(6),
ULongArray(7),
FloatArray(8),
DoubleArray(9),
BooleanArray(10);
enum class ArrayTypes(val index: Int, val size: Int) {
ByteArray(0, Byte.SIZE_BYTES),
UByteArray(1, UByte.SIZE_BYTES),
ShortArray(2, Short.SIZE_BYTES),
UShortArray(3, UShort.SIZE_BYTES),
IntArray(4, Int.SIZE_BYTES),
UIntArray(5, UInt.SIZE_BYTES),
LongArray(6, Long.SIZE_BYTES),
ULongArray(7, ULong.SIZE_BYTES),
FloatArray(8, Float.SIZE_BYTES),
DoubleArray(9, Double.SIZE_BYTES),
BooleanArray(10, 1);
}

interface MemoryControlledArray {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.kinference.ndarray

import io.kinference.ndarray.arrays.Strides
import io.kinference.utils.launchWithLimitOrDefault
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.math.min

fun Double.toUShort() = this.toInt().toUShort()
Expand Down Expand Up @@ -93,7 +93,7 @@ suspend fun parallelizeByBlocks(blockSize: Int,
} else {
coroutineScope {
for ((index, blockStart) in (0 until countBlocks step batchSize).withIndex()) {
launch {
launchWithLimitOrDefault {
body(blockStart, min(blockStart + batchSize, countBlocks), index)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,33 @@ package io.kinference.ndarray.arrays.memory
import io.kinference.ndarray.arrays.*
import kotlin.coroutines.CoroutineContext

data class AllocatorContext(val modelName: String, val cycleId: Long) : CoroutineContext.Element {
data class AllocatorContext internal constructor(
private val unusedContainers: ArrayStorage,
private val limiter: MemoryLimiter,
private val returnStorageFn: (ArrayStorage) -> Unit
) : CoroutineContext.Element {
private val usedContainers: ArrayDeque<ArrayContainer> = ArrayDeque()
private val unusedContainers: ArrayStorage = ArrayDispatcher.getStorage()

companion object Key : CoroutineContext.Key<AllocatorContext>
override val key: CoroutineContext.Key<*> get() = Key

internal fun getArrayContainers(type: ArrayTypes, size: Int, count: Int): Array<ArrayContainer> {
val arrayContainers = Array(count) { unusedContainers.getArrayContainer(type, size) }
usedContainers.addAll(arrayContainers)
return arrayContainers
return if (limiter !is NoAllocatorMemoryLimiter) {
val result = Array(count) { unusedContainers.getArrayContainer(type, size) }
usedContainers.addAll(result)
result
} else {
Array(count) { ArrayContainer(type, size) }
}
}


fun closeAllocated() {
usedContainers.forEach {
if (!it.isOutput) {
if (!it.isOutput && limiter.checkMemoryLimitAndAdd(it.sizeBytes.toLong())) {
unusedContainers[it.arrayTypeIndex, it.arraySizeIndex].addLast(it)
}
}
ArrayDispatcher.returnStorage(unusedContainers)
usedContainers.clear()
returnStorageFn(unusedContainers)
}
}
Loading

0 comments on commit c83a315

Please sign in to comment.