Skip to content

Commit

Permalink
JBAI-4832 [ndarray] add maxHeap property and revise memory handling i…
Browse files Browse the repository at this point in the history
…n KIModel
  • Loading branch information
dmitriyb committed Jul 23, 2024
1 parent 3a5df7b commit 9ca67a5
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ class KIModel(
val opSet: OperatorSetRegistry,
val graph: KIGraph,
private val useAllocator: Boolean = true,
limiterParallelismCounter: Int = PlatformUtils.cores
limiterParallelismCounter: Int = PlatformUtils.cores,
arrayStorageLimit: Long = Long.MAX_VALUE
) : Model<KIONNXData<*>>, Profilable {
private val profiles: MutableList<ProfilingContext> = ArrayList()

@OptIn(ExperimentalCoroutinesApi::class)
private val dispatcher: CoroutineDispatcher = Dispatchers.Default.limitedParallelism(limiterParallelismCounter)
// private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(800 * 1024 * 1024)
private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage()
private val modelArrayStorage: ModelArrayStorage = ModelArrayStorage(arrayStorageLimit)

override fun addProfilingContext(name: String): ProfilingContext = ProfilingContext(name).apply { profiles.add(this) }
override fun analyzeProfilingResults(): ProfileAnalysisEntry = profiles.analyze("Model $name")
Expand All @@ -49,7 +49,7 @@ class KIModel(
coreReserved = true
}

val allocatorContext = modelArrayStorage.createAllocatorContext() // AllocatorContext(id, getInferenceCycleId())
val allocatorContext = modelArrayStorage.createAllocatorContext()
val mixedContext = allocatorContext + limiterContext

withContext(mixedContext) {
Expand Down Expand Up @@ -86,13 +86,14 @@ class KIModel(
suspend operator fun invoke(
proto: ModelProto,
useAllocator: Boolean = true,
limiterParallelismCounter: Int = PlatformUtils.cores
limiterParallelismCounter: Int = PlatformUtils.cores,
arrayStorageLimit: Long = Long.MAX_VALUE
): KIModel {
val name = "${proto.domain}:${proto.modelVersion}"
val id = "$name:${generateModelId()}"
val opSet = OperatorSetRegistry(proto.opSetImport)
val graph = KIGraph(proto.graph!!, opSet)
return KIModel(id, name, opSet, graph, useAllocator, limiterParallelismCounter)
return KIModel(id, name, opSet, graph, useAllocator, limiterParallelismCounter, arrayStorageLimit)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,16 @@ data class AllocatorContext internal constructor(
for (i in 0 until count) {
val container = unusedContainers.getArrayContainer(type, size)
if (!container.isNewlyCreated)
limiter.freeMemory(container.size)
limiter.freeMemory(container.size.toLong())
arrayContainers[i] = container
usedContainers.add(container)
}
// val arrayContainers = Array(count) { unusedContainers.getArrayContainer(type, size) }
// usedContainers.addAll(arrayContainers)
return arrayContainers as Array<ArrayContainer>
}

fun closeAllocated() {
usedContainers.forEach {
if (!it.isOutput && limiter.checkMemoryLimitAndAdd(it.size)) {
if (!it.isOutput && limiter.checkMemoryLimitAndAdd(it.size.toLong())) {
unusedContainers[it.arrayTypeIndex, it.arraySizeIndex].addLast(it)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,20 @@
package io.kinference.ndarray.arrays.memory

import kotlinx.atomicfu.AtomicInt
import kotlinx.atomicfu.AtomicLong
import kotlinx.atomicfu.atomic

interface MemoryLimiter {
fun checkMemoryLimitAndAdd(returned: Int): Boolean
fun freeMemory(deducted: Int)
}

object UnlimitedMemoryLimiter : MemoryLimiter {
override fun checkMemoryLimitAndAdd(returned: Int): Boolean {
return true
}

override fun freeMemory(deducted: Int) {

}
}

class LimitedMemoryLimiter(val memoryLimit: Int) : MemoryLimiter {
private var usedMemory: AtomicInt = atomic(0)
/**
 * Keeps an atomic running total of memory and compares it against [memoryLimit].
 *
 * Since the total is a `Long`, a [memoryLimit] of `Long.MAX_VALUE` can never be exceeded,
 * so [checkMemoryLimitAndAdd] always returns `true` with that value.
 */
class MemoryLimiter(private val memoryLimit: Long) {
// Running total; increased by checkMemoryLimitAndAdd and decreased by freeMemory.
// NOTE(review): this reference is never reassigned in the visible code, so `val` looks sufficient — confirm.
private var usedMemory: AtomicLong = atomic(0L)

/**
 * Adds [returned] to the running total and reports whether the total stays within [memoryLimit].
 *
 * @param returned amount to add to the running total
 * @return `true` if the addition was kept; `false` if it pushed the total above the limit,
 * in which case the addition is undone before returning
 */
override fun checkMemoryLimitAndAdd(returned: Int): Boolean {
fun checkMemoryLimitAndAdd(returned: Long): Boolean {
// Add first, then compare the resulting total against the limit.
val currentMemory = usedMemory.addAndGet(returned)
return if (currentMemory > memoryLimit) {
// Over the limit: roll the tentative addition back.
usedMemory.addAndGet(-returned)
false
} else true
}

/**
 * Subtracts [deducted] from the running total.
 *
 * @param deducted amount to remove from the running total
 */
override fun freeMemory(deducted: Int) {
fun freeMemory(deducted: Long) {
usedMemory.addAndGet(-deducted)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import io.kinference.utils.Closeable

import io.kinference.utils.ConcurrentQueue

class ModelArrayStorage(private val limiter: MemoryLimiter = UnlimitedMemoryLimiter) : Closeable {
class ModelArrayStorage(memoryLimit: Long = Long.MAX_VALUE) : Closeable {
private val limiter: MemoryLimiter = MemoryLimiter(memoryLimit)
private val unusedArrays: ConcurrentQueue<ArrayStorage> = ConcurrentQueue()

constructor(memoryLimit: Int) : this(LimitedMemoryLimiter(memoryLimit))

companion object {
private const val INIT_SIZE_VALUE: Int = 2
private val typeSize: Int = ArrayTypes.entries.size
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ expect object PlatformUtils {
* Number of threads based on the number of cores (usually cores - 1)
*/
val threads: Int

/**
* Maximum amount of heap memory that is available
*/
val maxHeap: Long
}

fun <T> PlatformUtils.forPlatform(jsValue: T, jvmValue: T) = when (platform) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,7 @@ actual object PlatformUtils {
//we don't support multithreading for JS yet
actual val cores: Int = 1
actual val threads: Int = 1

// we don't limit memory for JS
actual val maxHeap: Long = Long.MAX_VALUE
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ actual object PlatformUtils {
actual val cores: Int by lazy { Runtime.getRuntime().availableProcessors() }

actual val threads: Int by lazy { if (cores == 1) 1 else cores - 1 }

// Read once, on first access, from Runtime.maxMemory().
actual val maxHeap: Long by lazy { Runtime.getRuntime().maxMemory() }
}

0 comments on commit 9ca67a5

Please sign in to comment.