2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
@@ -6,9 +6,11 @@ jreleaser = "org.jreleaser:org.jreleaser.gradle.plugin:1.20.0"
jspecify = "org.jspecify:jspecify:1.0.0"
junit = "org.junit.jupiter:junit-jupiter:5.14.0"
junit-launcher = "org.junit.platform:junit-platform-launcher:1.14.0"
kotlin-logging-jvm = "io.github.oshai:kotlin-logging-jvm:7.0.13"
kotlinx-coroutines-core = "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2"
slf4j = "org.slf4j:slf4j-api:2.0.17"
logback-classic = "ch.qos.logback:logback-classic:1.5.21"
trove4j-core = "net.sf.trove4j:core:3.1.0"
zstd-jni = "com.github.luben:zstd-jni:1.5.7-6"

[bundles]
11 changes: 11 additions & 0 deletions live-metrics-processor/build.gradle.kts
@@ -0,0 +1,11 @@
plugins {
    `java-conventions`
    kotlin("jvm") version "2.3.0"
}

dependencies {
    implementation(libs.kotlinx.coroutines.core)
    implementation(libs.trove4j.core)
    implementation(libs.kotlin.logging.jvm)
    implementation(libs.logback.classic)
}
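For orientation (standard Gradle behavior, not specific to this change): the libs.* accessors above are generated from gradle/libs.versions.toml, with dashes in catalog aliases mapped to dots in the type-safe accessors:

    // catalog alias            -> generated accessor
    // kotlin-logging-jvm       -> libs.kotlin.logging.jvm
    // kotlinx-coroutines-core  -> libs.kotlinx.coroutines.core
    // logback-classic          -> libs.logback.classic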
7 changes: 7 additions & 0 deletions live-metrics-processor/src/main/kotlin/DecompressionEntry.kt
@@ -0,0 +1,7 @@
import kotlin.time.Duration

data class DecompressionEntry(
    val logIndex: UByte,
    val entryIndex: UInt,
    val timeToDecompress: Duration,
    val compressedSize: Int,
    val decompressedSize: Int,
)

/** Throughput in bytes per microsecond (B/µs). */
val DecompressionEntry.throughput: Double
    get() = decompressedSize / timeToDecompress.inWholeNanoseconds.toDouble() * 1000.0
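A quick worked example of the unit conversion (illustrative only, not part of the change): bytes divided by elapsed nanoseconds, scaled by 1000 ns/µs, gives bytes per microsecond.

    import kotlin.time.Duration.Companion.microseconds

    fun main() {
        // 128 KiB decompressed in 500 µs -> 131072 B / 500000 ns * 1000 ≈ 262.1 B/µs
        val entry = DecompressionEntry(
            logIndex = 0u,
            entryIndex = 0u,
            timeToDecompress = 500.microseconds,
            compressedSize = 40_000,
            decompressedSize = 131_072,
        )
        println(entry.throughput)
    }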
106 changes: 106 additions & 0 deletions live-metrics-processor/src/main/kotlin/DecompressionMetrics.kt
@@ -0,0 +1,106 @@
import gnu.trove.list.array.TIntArrayList
import gnu.trove.list.array.TLongArrayList
import kotlin.time.Duration
import kotlin.time.Duration.Companion.nanoseconds

class DecompressionMetrics(private val entryAmount: Int) {
    var addedEntries = 0

    var totalCompressed: Long = 0
    var totalDecompressed: Long = 0
    var totalTimeToDecompress: Duration = Duration.ZERO

    lateinit var minDecompressTime: DecompressionEntry
    var averageDecompressTime: Duration = Duration.ZERO
    val decompressTimes = TLongArrayList(entryAmount)
    val medianDecompressTime: Duration get() = decompressTimes[decompressTimes.size() / 2].nanoseconds
    lateinit var maxDecompressTime: DecompressionEntry

    // bytes per microsecond
    var minThroughput: Double = 0.0
    var averageThroughput: Double = 0.0
    var maxThroughput: Double = 0.0

    lateinit var minCompressedSize: DecompressionEntry
    var averageCompressedSize: Double = 0.0
    val compressedSizes = TIntArrayList(entryAmount)
    val medianCompressedSize: Int get() = compressedSizes[compressedSizes.size() / 2]
    lateinit var maxCompressedSize: DecompressionEntry

    lateinit var minDecompressedSize: DecompressionEntry
    var averageDecompressedSize: Double = 0.0
    val decompressedSizes = TIntArrayList(entryAmount)
    val medianDecompressedSize: Int get() = decompressedSizes[decompressedSizes.size() / 2]
    lateinit var maxDecompressedSize: DecompressionEntry

    @Synchronized
    fun accept(entry: DecompressionEntry) {
        val entryIndex = ++addedEntries

        totalCompressed += entry.compressedSize
        totalDecompressed += entry.decompressedSize
        totalTimeToDecompress += entry.timeToDecompress

        decompressTimes.add(entry.timeToDecompress.inWholeNanoseconds)
        decompressedSizes.add(entry.decompressedSize)
        compressedSizes.add(entry.compressedSize)

        // https://en.wikipedia.org/wiki/Moving_average#Cumulative_average
        fun newAverage(newEntryValue: Duration, currentAverage: Duration): Duration {
            return currentAverage + ((newEntryValue - currentAverage) / entryIndex)
        }

        fun newAverage(newEntryValue: Double, currentAverage: Double): Double {
            return currentAverage + ((newEntryValue - currentAverage) / entryIndex)
        }

        if (entryIndex == 1) {
            minDecompressTime = entry
            maxDecompressTime = entry
            averageDecompressTime = entry.timeToDecompress

            minThroughput = entry.throughput
            averageThroughput = minThroughput
            maxThroughput = minThroughput

            minCompressedSize = entry
            maxCompressedSize = entry
            averageCompressedSize = entry.compressedSize.toDouble()

            minDecompressedSize = entry
            maxDecompressedSize = entry
            averageDecompressedSize = entry.decompressedSize.toDouble()
        } else {
            minDecompressTime = if (minDecompressTime.timeToDecompress > entry.timeToDecompress) entry else minDecompressTime
            maxDecompressTime = if (maxDecompressTime.timeToDecompress < entry.timeToDecompress) entry else maxDecompressTime
            averageDecompressTime = newAverage(entry.timeToDecompress, averageDecompressTime)

            val newThroughput = entry.throughput
            minThroughput = if (minThroughput > newThroughput) newThroughput else minThroughput
            maxThroughput = if (maxThroughput < newThroughput) newThroughput else maxThroughput
            averageThroughput = newAverage(newThroughput, averageThroughput)

            minCompressedSize = if (minCompressedSize.compressedSize > entry.compressedSize) entry else minCompressedSize
            maxCompressedSize = if (maxCompressedSize.compressedSize < entry.compressedSize) entry else maxCompressedSize
            averageCompressedSize = newAverage(entry.compressedSize.toDouble(), averageCompressedSize)

            minDecompressedSize = if (minDecompressedSize.decompressedSize > entry.decompressedSize) entry else minDecompressedSize
            maxDecompressedSize = if (maxDecompressedSize.decompressedSize < entry.decompressedSize) entry else maxDecompressedSize
            averageDecompressedSize = newAverage(entry.decompressedSize.toDouble(), averageDecompressedSize)
        }
    }

    fun finish() {
        if (decompressTimes.size() != entryAmount) {
            System.err.println("Mismatched entry amount, found ${decompressTimes.size()}, expected $entryAmount")
        }

        decompressTimes.sort()
        decompressedSizes.sort()
        compressedSizes.sort()
    }

    override fun toString(): String {
        return "DecompressionMetrics(" +
            "\n\taddedEntries=$addedEntries," +
            "\n\ttotalCompressed=$totalCompressed," +
            "\n\ttotalDecompressed=$totalDecompressed," +
            "\n\ttotalTimeToDecompress=$totalTimeToDecompress," +
            "\n\n\tminDecompressTime=$minDecompressTime," +
            "\n\taverageDecompressTime=$averageDecompressTime," +
            "\n\tmedianDecompressTime=$medianDecompressTime," +
            "\n\tmaxDecompressTime=$maxDecompressTime," +
            "\n\n\tminThroughput=$minThroughput," +
            "\n\taverageThroughput=$averageThroughput," +
            "\n\tmaxThroughput=$maxThroughput," +
            "\n\n\tminCompressedSize=$minCompressedSize," +
            "\n\taverageCompressedSize=$averageCompressedSize," +
            "\n\tmedianCompressedSize=$medianCompressedSize," +
            "\n\tmaxCompressedSize=$maxCompressedSize," +
            "\n\n\tminDecompressedSize=$minDecompressedSize," +
            "\n\taverageDecompressedSize=$averageDecompressedSize," +
            "\n\tmedianDecompressedSize=$medianDecompressedSize," +
            "\n\tmaxDecompressedSize=$maxDecompressedSize\n)"
    }
}
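The running averages use the cumulative moving average identity CA_n = CA_(n-1) + (x_n - CA_(n-1)) / n, so no running sum needs to be kept. A minimal usage sketch (illustrative only; assumes the two classes above are on the classpath):

    import kotlin.time.Duration.Companion.microseconds

    fun main() {
        val metrics = DecompressionMetrics(entryAmount = 2)
        metrics.accept(DecompressionEntry(0u, 0u, 100.microseconds, 512, 4096))
        metrics.accept(DecompressionEntry(0u, 1u, 300.microseconds, 1024, 8192))
        metrics.finish() // sorts the backing arrays so the median getters index correctly
        println(metrics.averageDecompressTime) // 200us: 100 + (300 - 100) / 2
        println(metrics.medianDecompressTime)  // 300us: index size() / 2 = 1 picks the upper of two values
    }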
219 changes: 219 additions & 0 deletions live-metrics-processor/src/main/kotlin/Main.kt
@@ -0,0 +1,219 @@
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import java.io.DataInputStream
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.Path
import kotlin.io.path.inputStream
import kotlin.io.path.name
import kotlin.io.path.walk
import kotlin.time.Duration.Companion.nanoseconds

private const val entrySize = 8 + 4 + 4 + 2

private val logger = KotlinLogging.logger { }

private val shardFolderRegex = Regex("""shard-(\d+)-(?:zstd|zlib)""")
private val logFileRegex = Regex("""log-(\d+)\.bin""")

private const val minThroughput: Double = 100.0

suspend fun main(args: Array<String>) {
    require(args.size == 1) {
        "One argument must be present for the logs input directory (decompression-logs)"
    }

    val dispatcher = Dispatchers.IO.limitedParallelism(12)

    val zstdSmallShards = arrayListOf<List<Path>>()
    val zstdBigShards = arrayListOf<List<Path>>()
    val zlibShards = arrayListOf<List<Path>>()

    val logsDirectory = Path(args[0])
    withContext(Dispatchers.IO) {
        Files.walk(logsDirectory, 1)
            .filter { it.name.matches(shardFolderRegex) }
            .sorted(Comparator.comparingInt { shardFolder ->
                shardFolderRegex.matchEntire(shardFolder.name)!!.groupValues[1].toInt()
            })
            .forEach { shardFolder ->
                val shardId = shardFolderRegex.matchEntire(shardFolder.name)!!.groupValues[1].toInt()
                val shards = when (shardId % 3) {
                    0 -> zstdSmallShards
                    1 -> zstdBigShards
                    2 -> zlibShards
                    else -> error("Unhandled shard $shardId")
                }

                shardFolder.walk()
                    .filter { it.name.matches(logFileRegex) }
                    .sortedBy { logFile ->
                        // The numeric suffix is a timestamp, not an index
                        logFileRegex.matchEntire(logFile.name)!!.groupValues[1].toLong()
                    }
                    .toList()
                    .also(shards::add)
            }
    }

    fun List<List<Path>>.computeEntryCount(): Int {
        return sumOf { shard ->
            shard.sumOf { logFile ->
                var retainedEntries = 0
                readLogThroughputs(logFile) { throughput ->
                    if (throughput > minThroughput) {
                        retainedEntries++
                    }
                }
                retainedEntries
            }
        }
    }

    val zstdSmallMetrics = DecompressionMetrics(zstdSmallShards.computeEntryCount())
    val zstdBigMetrics = DecompressionMetrics(zstdBigShards.computeEntryCount())
    val zlibMetrics = DecompressionMetrics(zlibShards.computeEntryCount())

    coroutineScope {
        launch(dispatcher) { processShard(zstdSmallShards, zstdSmallMetrics) }
        launch(dispatcher) { processShard(zstdBigShards, zstdBigMetrics) }
        launch(dispatcher) { processShard(zlibShards, zlibMetrics) }
    }

    coroutineScope {
        launch(dispatcher) { zstdSmallMetrics.finish() }
        launch(dispatcher) { zstdBigMetrics.finish() }
        launch(dispatcher) { zlibMetrics.finish() }
    }

    println("zstdSmall = $zstdSmallMetrics")
    println("zstdBig = $zstdBigMetrics")
    println("zlib = $zlibMetrics")

    fun DecompressionEntry.decompressTimeStats(): String = "$timeToDecompress<br>${compressedSize.prettySize()} -> ${decompressedSize.prettySize()}"
    fun DecompressionEntry.compressedStats(): String = "**${compressedSize.prettySize()}** -> ${decompressedSize.prettySize()}<br>$timeToDecompress"
    fun DecompressionEntry.decompressedStats(): String = "${compressedSize.prettySize()} -> **${decompressedSize.prettySize()}**<br>$timeToDecompress"

    println("""
        | Stat | Zlib | Zstd (8K buf) | Zstd (128K buf) |
        |------|------|---------------|-----------------|
        | Entries | ${zlibMetrics.addedEntries.pretty()} | ${zstdSmallMetrics.addedEntries.pretty()} | ${zstdBigMetrics.addedEntries.pretty()} |
        | Total compressed | ${zlibMetrics.totalCompressed.prettySize()} | ${zstdSmallMetrics.totalCompressed.prettySize()} | ${zstdBigMetrics.totalCompressed.prettySize()} |
        | Total decompressed | ${zlibMetrics.totalDecompressed.prettySize()} | ${zstdSmallMetrics.totalDecompressed.prettySize()} | ${zstdBigMetrics.totalDecompressed.prettySize()} |
        | Total time to decompress | ${zlibMetrics.totalTimeToDecompress} | ${zstdSmallMetrics.totalTimeToDecompress} | ${zstdBigMetrics.totalTimeToDecompress} |
        | Min decompress time | ${zlibMetrics.minDecompressTime.decompressTimeStats()} | ${zstdSmallMetrics.minDecompressTime.decompressTimeStats()} | ${zstdBigMetrics.minDecompressTime.decompressTimeStats()} |
        | Average decompress time | ${zlibMetrics.averageDecompressTime} | ${zstdSmallMetrics.averageDecompressTime} | ${zstdBigMetrics.averageDecompressTime} |
        | Median decompress time | ${zlibMetrics.medianDecompressTime} | ${zstdSmallMetrics.medianDecompressTime} | ${zstdBigMetrics.medianDecompressTime} |
        | Max decompress time | ${zlibMetrics.maxDecompressTime.decompressTimeStats()} | ${zstdSmallMetrics.maxDecompressTime.decompressTimeStats()} | ${zstdBigMetrics.maxDecompressTime.decompressTimeStats()} |
        | Min throughput (B/µs) | ${zlibMetrics.minThroughput} | ${zstdSmallMetrics.minThroughput} | ${zstdBigMetrics.minThroughput} |
        | Average throughput (B/µs) | ${zlibMetrics.averageThroughput} | ${zstdSmallMetrics.averageThroughput} | ${zstdBigMetrics.averageThroughput} |
        | Max throughput (B/µs) | ${zlibMetrics.maxThroughput} | ${zstdSmallMetrics.maxThroughput} | ${zstdBigMetrics.maxThroughput} |
        | Min compressed size | ${zlibMetrics.minCompressedSize.compressedStats()} | ${zstdSmallMetrics.minCompressedSize.compressedStats()} | ${zstdBigMetrics.minCompressedSize.compressedStats()} |
        | Average compressed size (B) | ${zlibMetrics.averageCompressedSize} | ${zstdSmallMetrics.averageCompressedSize} | ${zstdBigMetrics.averageCompressedSize} |
        | Median compressed size (B) | ${zlibMetrics.medianCompressedSize} | ${zstdSmallMetrics.medianCompressedSize} | ${zstdBigMetrics.medianCompressedSize} |
        | Max compressed size | ${zlibMetrics.maxCompressedSize.compressedStats()} | ${zstdSmallMetrics.maxCompressedSize.compressedStats()} | ${zstdBigMetrics.maxCompressedSize.compressedStats()} |
        | Min decompressed size | ${zlibMetrics.minDecompressedSize.decompressedStats()} | ${zstdSmallMetrics.minDecompressedSize.decompressedStats()} | ${zstdBigMetrics.minDecompressedSize.decompressedStats()} |
        | Average decompressed size (B) | ${zlibMetrics.averageDecompressedSize} | ${zstdSmallMetrics.averageDecompressedSize} | ${zstdBigMetrics.averageDecompressedSize} |
        | Median decompressed size (B) | ${zlibMetrics.medianDecompressedSize} | ${zstdSmallMetrics.medianDecompressedSize} | ${zstdBigMetrics.medianDecompressedSize} |
        | Max decompressed size | ${zlibMetrics.maxDecompressedSize.decompressedStats()} | ${zstdSmallMetrics.maxDecompressedSize.decompressedStats()} | ${zstdBigMetrics.maxDecompressedSize.decompressedStats()} |
    """.trimIndent())
}

private fun CoroutineScope.processShard(shards: List<List<Path>>, metrics: DecompressionMetrics) {
    var i = 0
    for (shard in shards) {
        val shardId = i++

        shard.forEachIndexed { logIndex, logFile ->
            launch {
                logger.info { "Reading shard $shardId file $logIndex" }

                val logIndexByte = logIndex.toUByte()
                readLogFile(logFile, logIndexByte) { entry ->
                    // Keep only entries with a throughput above 100 bytes per microsecond
                    if (entry.throughput <= minThroughput) return@readLogFile
                    metrics.accept(entry)
                }
            }
        }
    }
}

private fun Int.pretty(): String {
    return toLong().pretty()
}

private fun Long.pretty(): String {
    val str = toString()
    if (str.length <= 3) return str

    return buildString {
        str.reversed().forEachIndexed { index, ch ->
            if (index != 0 && index % 3 == 0) {
                append('_')
            }
            append(ch)
        }
    }.reversed()
}

private fun Int.prettySize(): String {
    return toLong().prettySize()
}

private fun Long.prettySize(): String {
    if (this > 10000) {
        var prettySize = this.toDouble() / 1024.0
        var prettyUnit = "KB"
        if (prettySize > 1024.0) {
            prettySize /= 1024.0
            prettyUnit = "MB"
        }
        if (prettySize > 1024.0) {
            prettySize /= 1024.0
            prettyUnit = "GB"
        }
        return "$this B (${"%.1f".format(prettySize)} ${prettyUnit})"
    } else {
        return "$this B"
    }
}

private fun readLogFile(logFile: Path, logIndex: UByte, entryConsumer: (DecompressionEntry) -> Unit) {
    logFile.inputStream().buffered().let(::DataInputStream).use { input ->
        var entryIndex = 0u
        var available = input.available()
        while (available > 0) {
            val timeToDecompress = input.readLong()
            val compressedSize = input.readInt()
            val decompressedSize = input.readInt()
            input.skipBytes(2) // Separator

            entryConsumer(DecompressionEntry(logIndex, entryIndex++, timeToDecompress.nanoseconds, compressedSize, decompressedSize))

            available -= entrySize
            if (available <= 0) {
                available = input.available()
            }
        }
    }
}

private fun readLogThroughputs(logFile: Path, consumer: (throughput: Double) -> Unit) {
    logFile.inputStream().buffered().let(::DataInputStream).use { input ->
        var available = input.available()
        while (available > 0) {
            val timeToDecompress = input.readLong()
            input.skipBytes(4) // Compressed size
            val decompressedSize = input.readInt()
            input.skipBytes(2) // Separator

            consumer(decompressedSize / timeToDecompress.toDouble() * 1000.0)

            available -= entrySize
            if (available <= 0) {
                available = input.available()
            }
        }
    }
}
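readLogFile implies a fixed 18-byte record layout (entrySize = 8 + 4 + 4 + 2), big-endian to match DataInputStream: an 8-byte nanosecond duration, two 4-byte sizes, and a 2-byte separator. A sketch of a matching writer, derived purely from the reader above; the separator value is an assumption, since the reader skips those bytes without checking them:

    import java.io.DataOutputStream
    import java.nio.file.Path
    import kotlin.io.path.outputStream

    // Hypothetical helper: writes one record in the layout readLogFile expects.
    fun writeLogEntry(out: DataOutputStream, nanos: Long, compressed: Int, decompressed: Int) {
        out.writeLong(nanos)       // timeToDecompress in nanoseconds
        out.writeInt(compressed)   // compressedSize
        out.writeInt(decompressed) // decompressedSize
        out.writeShort(0)          // 2-byte separator; actual value unknown, the reader skips it
    }

    fun writeLogFile(logFile: Path, entries: List<Triple<Long, Int, Int>>) {
        DataOutputStream(logFile.outputStream().buffered()).use { out ->
            entries.forEach { (nanos, c, d) -> writeLogEntry(out, nanos, c, d) }
        }
    }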
1 change: 1 addition & 0 deletions settings.gradle.kts
@@ -8,3 +8,4 @@ include(":api")
include(":test-data")
include(":test-data-generator")
include(":benchmarks", ":benchmarks:results-converter")
include(":live-metrics-processor")