diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 6abcd01..a16dc9f 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -6,9 +6,11 @@ jreleaser = "org.jreleaser:org.jreleaser.gradle.plugin:1.20.0"
 jspecify = "org.jspecify:jspecify:1.0.0"
 junit = "org.junit.jupiter:junit-jupiter:5.14.0"
 junit-launcher = "org.junit.platform:junit-platform-launcher:1.14.0"
+kotlin-logging-jvm = "io.github.oshai:kotlin-logging-jvm:7.0.13"
 kotlinx-coroutines-core = "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2"
 slf4j = "org.slf4j:slf4j-api:2.0.17"
 logback-classic = "ch.qos.logback:logback-classic:1.5.21"
+trove4j-core = "net.sf.trove4j:core:3.1.0"
 zstd-jni = "com.github.luben:zstd-jni:1.5.7-6"
 
 [bundles]
diff --git a/live-metrics-processor/build.gradle.kts b/live-metrics-processor/build.gradle.kts
new file mode 100644
index 0000000..856d685
--- /dev/null
+++ b/live-metrics-processor/build.gradle.kts
@@ -0,0 +1,11 @@
+plugins {
+    `java-conventions`
+    kotlin("jvm") version "2.3.0"
+}
+
+dependencies {
+    implementation(libs.kotlinx.coroutines.core)
+    implementation(libs.trove4j.core)
+    implementation(libs.kotlin.logging.jvm)
+    implementation(libs.logback.classic)
+}
diff --git a/live-metrics-processor/src/main/kotlin/DecompressionEntry.kt b/live-metrics-processor/src/main/kotlin/DecompressionEntry.kt
new file mode 100644
index 0000000..efe40f7
--- /dev/null
+++ b/live-metrics-processor/src/main/kotlin/DecompressionEntry.kt
@@ -0,0 +1,7 @@
+import kotlin.time.Duration
+
+data class DecompressionEntry(val logIndex: UByte, val entryIndex: UInt, val timeToDecompress: Duration, val compressedSize: Int, val decompressedSize: Int)
+
+/** B/µs */
+val DecompressionEntry.throughput: Double
+    get() = decompressedSize / timeToDecompress.inWholeNanoseconds.toDouble() * 1000.0
diff --git a/live-metrics-processor/src/main/kotlin/DecompressionMetrics.kt b/live-metrics-processor/src/main/kotlin/DecompressionMetrics.kt
new file mode 100644
index 0000000..86b1516
--- /dev/null
+++ b/live-metrics-processor/src/main/kotlin/DecompressionMetrics.kt
@@ -0,0 +1,112 @@
+import gnu.trove.list.array.TIntArrayList
+import gnu.trove.list.array.TLongArrayList
+import kotlin.time.Duration
+import kotlin.time.Duration.Companion.nanoseconds
+
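+/**
+ * Accumulates decompression statistics (totals plus min/average/median/max) for one
+ * compressor configuration. [accept] is synchronized so entries can be fed from several
+ * coroutines at once; [finish] sorts the recorded samples so the median getters are valid.
+ */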
+class DecompressionMetrics(private val entryAmount: Int) {
+    var addedEntries = 0
+
+    var totalCompressed: Long = 0
+    var totalDecompressed: Long = 0
+    var totalTimeToDecompress: Duration = Duration.ZERO
+
+    lateinit var minDecompressTime: DecompressionEntry
+    var averageDecompressTime: Duration = Duration.ZERO
+    val decompressTimes = TLongArrayList(entryAmount)
+    val medianDecompressTime: Duration get() = decompressTimes[decompressTimes.size() / 2].nanoseconds
+    lateinit var maxDecompressTime: DecompressionEntry
+
+    // bytes per microsecond
+    var minThroughput: Double = 0.0
+    var averageThroughput: Double = 0.0
+    var maxThroughput: Double = 0.0
+
+    lateinit var minCompressedSize: DecompressionEntry
+    var averageCompressedSize: Double = 0.0
+    val compressedSizes = TIntArrayList(entryAmount)
+    val medianCompressedSize: Int get() = compressedSizes[compressedSizes.size() / 2]
+    lateinit var maxCompressedSize: DecompressionEntry
+
+    lateinit var minDecompressedSize: DecompressionEntry
+    var averageDecompressedSize: Double = 0.0
+    val decompressedSizes = TIntArrayList(entryAmount)
+    val medianDecompressedSize: Int get() = decompressedSizes[decompressedSizes.size() / 2]
+    lateinit var maxDecompressedSize: DecompressionEntry
+
+    @Synchronized
+    fun accept(entry: DecompressionEntry) {
+        val entryIndex = ++addedEntries
+
+        totalCompressed += entry.compressedSize
+        totalDecompressed += entry.decompressedSize
+        totalTimeToDecompress += entry.timeToDecompress
+
+        decompressTimes.add(entry.timeToDecompress.inWholeNanoseconds)
+        decompressedSizes.add(entry.decompressedSize)
+        compressedSizes.add(entry.compressedSize)
+
+        // https://en.wikipedia.org/wiki/Moving_average#Cumulative_average
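+        // i.e. CA(n) = CA(n-1) + (x(n) - CA(n-1)) / n, so no running sum is needed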
"DecompressionMetrics(\n\taddedEntries=$addedEntries,\n\ttotalCompressed=$totalCompressed,\n\ttotalDecompressed=$totalDecompressed,\n\ttotalTimeToDecompress=$totalTimeToDecompress,\n\n\tminDecompressTime=$minDecompressTime,\n\taverageDecompressTime=$averageDecompressTime,\n\tmedianDecompressTime=$medianDecompressTime,\n\tmaxDecompressTime=$maxDecompressTime,\n\n\tminThroughput=$minThroughput,\n\taverageThroughput=$averageThroughput,\n\tmaxThroughput=$maxThroughput,\n\n\tminCompressedSize=$minCompressedSize,\n\taverageCompressedSize=$averageCompressedSize,\n\tmedianCompressedSize=$medianCompressedSize,\n\tmaxCompressedSize=$maxCompressedSize,\n\n\tminDecompressedSize=$minDecompressedSize,\n\taverageDecompressedSize=$averageDecompressedSize,\n\tmedianDecompressedSize=$medianDecompressedSize,\n\tmaxDecompressedSize=$maxDecompressedSize\n)" + } +} diff --git a/live-metrics-processor/src/main/kotlin/Main.kt b/live-metrics-processor/src/main/kotlin/Main.kt new file mode 100644 index 0000000..007e5e5 --- /dev/null +++ b/live-metrics-processor/src/main/kotlin/Main.kt @@ -0,0 +1,219 @@ +import io.github.oshai.kotlinlogging.KotlinLogging +import kotlinx.coroutines.* +import java.io.DataInputStream +import java.nio.file.Files +import java.nio.file.Path +import kotlin.io.path.Path +import kotlin.io.path.inputStream +import kotlin.io.path.name +import kotlin.io.path.walk +import kotlin.time.Duration.Companion.nanoseconds + +private const val entrySize = 8 + 4 + 4 + 2 + +private val logger = KotlinLogging.logger { } + +private val shardFolderRegex = Regex("""shard-(\d+)-(?:zstd|zlib)""") +private val logFileRegex = Regex("""log-(\d+)\.bin""") + +private const val minThroughput: Double = 100.0 + +suspend fun main(args: Array) { + require(args.size == 1) { + "One argument must be present for the logs input directory (decompression-logs)" + } + + val dispatcher = Dispatchers.IO.limitedParallelism(12) + + val zstdSmallShards = arrayListOf>() + val zstdBigShards = arrayListOf>() + val zlibShards = arrayListOf>() + + val logsDirectory = Path(args[0]) + withContext(Dispatchers.IO) { + Files.walk(logsDirectory, 1) + .filter { it.name.matches(shardFolderRegex) } + .sorted(Comparator.comparingInt { shardFolder -> + shardFolderRegex.matchEntire(shardFolder.name)!!.groupValues[1].toInt() + }) + .forEach { shardFolder -> + val shardId = shardFolderRegex.matchEntire(shardFolder.name)!!.groupValues[1].toInt() + val shards = when (shardId % 3) { + 0 -> zstdSmallShards + 1 -> zstdBigShards + 2 -> zlibShards + else -> error("Unhandled shard $shardId") + } + + shardFolder.walk() + .filter { it.name.matches(logFileRegex) } + .sortedBy { logFile -> + // Uses a timestamp not an index + logFileRegex.matchEntire(logFile.name)!!.groupValues[1].toLong() + } + .toList() + .also(shards::add) + } + } + + fun List>.computeEntryCount(): Int { + return sumOf { shard -> + shard.sumOf { logFile -> + var retainedEntries = 0 + readLogThroughputs(logFile) { throughput -> + if (throughput > minThroughput) { + retainedEntries++ + } + } + retainedEntries + } + } + } + + val zstdSmallMetrics = DecompressionMetrics(zstdSmallShards.computeEntryCount()) + val zstdBigMetrics = DecompressionMetrics(zstdBigShards.computeEntryCount()) + val zlibMetrics = DecompressionMetrics(zlibShards.computeEntryCount()) + + coroutineScope { + launch(dispatcher) { processShard(zstdSmallShards, zstdSmallMetrics) } + launch(dispatcher) { processShard(zstdBigShards, zstdBigMetrics) } + launch(dispatcher) { processShard(zlibShards, zlibMetrics) } + } + + coroutineScope 
+private const val entrySize = 8 + 4 + 4 + 2
+
+private val logger = KotlinLogging.logger { }
+
+private val shardFolderRegex = Regex("""shard-(\d+)-(?:zstd|zlib)""")
+private val logFileRegex = Regex("""log-(\d+)\.bin""")
+
+private const val minThroughput: Double = 100.0
+
+suspend fun main(args: Array<String>) {
+    require(args.size == 1) {
+        "One argument must be present: the logs input directory (decompression-logs)"
+    }
+
+    val dispatcher = Dispatchers.IO.limitedParallelism(12)
+
+    val zstdSmallShards = arrayListOf<List<Path>>()
+    val zstdBigShards = arrayListOf<List<Path>>()
+    val zlibShards = arrayListOf<List<Path>>()
+
+    val logsDirectory = Path(args[0])
+    withContext(Dispatchers.IO) {
+        Files.walk(logsDirectory, 1)
+            .filter { it.name.matches(shardFolderRegex) }
+            .sorted(Comparator.comparingInt<Path> { shardFolder ->
+                shardFolderRegex.matchEntire(shardFolder.name)!!.groupValues[1].toInt()
+            })
+            .forEach { shardFolder ->
+                val shardId = shardFolderRegex.matchEntire(shardFolder.name)!!.groupValues[1].toInt()
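+                // Shards alternate compressor configurations: 0 -> Zstd (small buffer), 1 -> Zstd (big buffer), 2 -> zlib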
+                val shards = when (shardId % 3) {
+                    0 -> zstdSmallShards
+                    1 -> zstdBigShards
+                    2 -> zlibShards
+                    else -> error("Unhandled shard $shardId")
+                }
+
+                shardFolder.walk()
+                    .filter { it.name.matches(logFileRegex) }
+                    .sortedBy { logFile ->
+                        // Uses a timestamp, not an index
+                        logFileRegex.matchEntire(logFile.name)!!.groupValues[1].toLong()
+                    }
+                    .toList()
+                    .also(shards::add)
+            }
+    }
+
+    fun List<List<Path>>.computeEntryCount(): Int {
+        return sumOf { shard ->
+            shard.sumOf { logFile ->
+                var retainedEntries = 0
+                readLogThroughputs(logFile) { throughput ->
+                    if (throughput > minThroughput) {
+                        retainedEntries++
+                    }
+                }
+                retainedEntries
+            }
+        }
+    }
+
+    val zstdSmallMetrics = DecompressionMetrics(zstdSmallShards.computeEntryCount())
+    val zstdBigMetrics = DecompressionMetrics(zstdBigShards.computeEntryCount())
+    val zlibMetrics = DecompressionMetrics(zlibShards.computeEntryCount())
+
+    coroutineScope {
+        launch(dispatcher) { processShard(zstdSmallShards, zstdSmallMetrics) }
+        launch(dispatcher) { processShard(zstdBigShards, zstdBigMetrics) }
+        launch(dispatcher) { processShard(zlibShards, zlibMetrics) }
+    }
+
+    coroutineScope {
+        launch(dispatcher) { zstdSmallMetrics.finish() }
+        launch(dispatcher) { zstdBigMetrics.finish() }
+        launch(dispatcher) { zlibMetrics.finish() }
+    }
+
+    println("zstdSmall = $zstdSmallMetrics")
+    println("zstdBig = $zstdBigMetrics")
+    println("zlib = $zlibMetrics")
+
+    fun DecompressionEntry.decompressTimeStats(): String = "$timeToDecompress<br>${compressedSize.prettySize()} -> ${decompressedSize.prettySize()}"
+    fun DecompressionEntry.compressedStats(): String = "**${compressedSize.prettySize()}** -> ${decompressedSize.prettySize()}<br>$timeToDecompress"
+    fun DecompressionEntry.decompressedStats(): String = "${compressedSize.prettySize()} -> **${decompressedSize.prettySize()}**<br>$timeToDecompress"
+
+    println("""
+        | Stat | Zlib | Zstd (8K buf) | Zstd (128K buf) |
+        |------|------|---------------|-----------------|
+        | Entries | ${zlibMetrics.addedEntries.pretty()} | ${zstdSmallMetrics.addedEntries.pretty()} | ${zstdBigMetrics.addedEntries.pretty()} |
+        | Total compressed | ${zlibMetrics.totalCompressed.prettySize()} | ${zstdSmallMetrics.totalCompressed.prettySize()} | ${zstdBigMetrics.totalCompressed.prettySize()} |
+        | Total decompressed | ${zlibMetrics.totalDecompressed.prettySize()} | ${zstdSmallMetrics.totalDecompressed.prettySize()} | ${zstdBigMetrics.totalDecompressed.prettySize()} |
+        | Total time to decompress | ${zlibMetrics.totalTimeToDecompress} | ${zstdSmallMetrics.totalTimeToDecompress} | ${zstdBigMetrics.totalTimeToDecompress} |
+        | Min decompress time | ${zlibMetrics.minDecompressTime.decompressTimeStats()} | ${zstdSmallMetrics.minDecompressTime.decompressTimeStats()} | ${zstdBigMetrics.minDecompressTime.decompressTimeStats()} |
+        | Average decompress time | ${zlibMetrics.averageDecompressTime} | ${zstdSmallMetrics.averageDecompressTime} | ${zstdBigMetrics.averageDecompressTime} |
+        | Median decompress time | ${zlibMetrics.medianDecompressTime} | ${zstdSmallMetrics.medianDecompressTime} | ${zstdBigMetrics.medianDecompressTime} |
+        | Max decompress time | ${zlibMetrics.maxDecompressTime.decompressTimeStats()} | ${zstdSmallMetrics.maxDecompressTime.decompressTimeStats()} | ${zstdBigMetrics.maxDecompressTime.decompressTimeStats()} |
+        | Min throughput (B/µs) | ${zlibMetrics.minThroughput} | ${zstdSmallMetrics.minThroughput} | ${zstdBigMetrics.minThroughput} |
+        | Average throughput (B/µs) | ${zlibMetrics.averageThroughput} | ${zstdSmallMetrics.averageThroughput} | ${zstdBigMetrics.averageThroughput} |
+        | Max throughput (B/µs) | ${zlibMetrics.maxThroughput} | ${zstdSmallMetrics.maxThroughput} | ${zstdBigMetrics.maxThroughput} |
+        | Min compressed size | ${zlibMetrics.minCompressedSize.compressedStats()} | ${zstdSmallMetrics.minCompressedSize.compressedStats()} | ${zstdBigMetrics.minCompressedSize.compressedStats()} |
+        | Average compressed size (B) | ${zlibMetrics.averageCompressedSize} | ${zstdSmallMetrics.averageCompressedSize} | ${zstdBigMetrics.averageCompressedSize} |
+        | Median compressed size (B) | ${zlibMetrics.medianCompressedSize} | ${zstdSmallMetrics.medianCompressedSize} | ${zstdBigMetrics.medianCompressedSize} |
+        | Max compressed size | ${zlibMetrics.maxCompressedSize.compressedStats()} | ${zstdSmallMetrics.maxCompressedSize.compressedStats()} | ${zstdBigMetrics.maxCompressedSize.compressedStats()} |
+        | Min decompressed size | ${zlibMetrics.minDecompressedSize.decompressedStats()} | ${zstdSmallMetrics.minDecompressedSize.decompressedStats()} | ${zstdBigMetrics.minDecompressedSize.decompressedStats()} |
+        | Average decompressed size (B) | ${zlibMetrics.averageDecompressedSize} | ${zstdSmallMetrics.averageDecompressedSize} | ${zstdBigMetrics.averageDecompressedSize} |
+        | Median decompressed size (B) | ${zlibMetrics.medianDecompressedSize} | ${zstdSmallMetrics.medianDecompressedSize} | ${zstdBigMetrics.medianDecompressedSize} |
+        | Max decompressed size | ${zlibMetrics.maxDecompressedSize.decompressedStats()} | ${zstdSmallMetrics.maxDecompressedSize.decompressedStats()} | ${zstdBigMetrics.maxDecompressedSize.decompressedStats()} |
+    """.trimIndent())
+}
+
+private fun CoroutineScope.processShard(shards: List<List<Path>>, metrics: DecompressionMetrics) {
+    var i = 0
+    for (shard in shards) {
+        val shardId = i++
+
+        shard.forEachIndexed { logIndex, logFile ->
+            launch {
+                logger.info { "Reading shard $shardId file $logIndex" }
+
+                val logIndexByte = logIndex.toUByte()
+                readLogFile(logFile, logIndexByte) { entry ->
+                    // Keep only entries above minThroughput (100 B/µs), matching computeEntryCount's filter
+                    if (entry.throughput <= minThroughput) return@readLogFile
+                    metrics.accept(entry)
+                }
+            }
+        }
+    }
+}
+
+private fun Int.pretty(): String {
+    return toLong().pretty()
+}
+
+private fun Long.pretty(): String {
+    val str = toString()
+    if (str.length <= 3) return str
+
+    return buildString {
+        str.reversed().forEachIndexed { index, ch ->
+            if (index != 0 && index % 3 == 0) {
+                append('_')
+            }
+            append(ch)
+        }
+    }.reversed()
+}
+
+private fun Int.prettySize(): String {
+    return toLong().prettySize()
+}
+
+private fun Long.prettySize(): String {
+    if (this > 10000) {
+        var prettySize = this.toDouble() / 1024.0
+        var prettyUnit = "KB"
+        if (prettySize > 1024.0) {
+            prettySize /= 1024.0
+            prettyUnit = "MB"
+        }
+        if (prettySize > 1024.0) {
+            prettySize /= 1024.0
+            prettyUnit = "GB"
+        }
+        return "$this B (${"%.1f".format(prettySize)} ${prettyUnit})"
+    } else {
+        return "$this B"
+    }
+}
+
+private fun readLogFile(logFile: Path, logIndex: UByte, entryConsumer: (DecompressionEntry) -> Unit) {
+    logFile.inputStream().buffered().let(::DataInputStream).use { input ->
+        var entryIndex = 0u
+        var available = input.available()
+        while (available > 0) {
+            val timeToDecompress = input.readLong()
+            val compressedSize = input.readInt()
+            val decompressedSize = input.readInt()
+            input.skipBytes(2) // Separator
+
+            entryConsumer(DecompressionEntry(logIndex, entryIndex++, timeToDecompress.nanoseconds, compressedSize, decompressedSize))
+
+            available -= entrySize
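+            // available() is only an estimate; once it runs out, re-query the stream before ending the loop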
$timeToDecompress" + fun DecompressionEntry.decompressedStats(): String = "${compressedSize.prettySize()} -> **${decompressedSize.prettySize()}**
$timeToDecompress" + + println(""" + | Stat | Zlib | Zstd (8K buf) | Zstd (128K buf) | + |------|------|---------------|-----------------| + | Entries | ${zlibMetrics.addedEntries.pretty()} | ${zstdSmallMetrics.addedEntries.pretty()} | ${zstdBigMetrics.addedEntries.pretty()} | + | Total compressed | ${zlibMetrics.totalCompressed.prettySize()} | ${zstdSmallMetrics.totalCompressed.prettySize()} | ${zstdBigMetrics.totalCompressed.prettySize()} | + | Total decompressed | ${zlibMetrics.totalDecompressed.prettySize()} | ${zstdSmallMetrics.totalDecompressed.prettySize()} | ${zstdBigMetrics.totalDecompressed.prettySize()} | + | Total time to decompress | ${zlibMetrics.totalTimeToDecompress} | ${zstdSmallMetrics.totalTimeToDecompress} | ${zstdBigMetrics.totalTimeToDecompress} | + | Min decompress time | ${zlibMetrics.minDecompressTime.decompressTimeStats()} | ${zstdSmallMetrics.minDecompressTime.decompressTimeStats()} | ${zstdBigMetrics.minDecompressTime.decompressTimeStats()} | + | Average decompress time | ${zlibMetrics.averageDecompressTime} | ${zstdSmallMetrics.averageDecompressTime} | ${zstdBigMetrics.averageDecompressTime} | + | Median decompress time | ${zlibMetrics.medianDecompressTime} | ${zstdSmallMetrics.medianDecompressTime} | ${zstdBigMetrics.medianDecompressTime} | + | Max decompress time | ${zlibMetrics.maxDecompressTime.decompressTimeStats()} | ${zstdSmallMetrics.maxDecompressTime.decompressTimeStats()} | ${zstdBigMetrics.maxDecompressTime.decompressTimeStats()} | + | Min throughput (B/µs) | ${zlibMetrics.minThroughput} | ${zstdSmallMetrics.minThroughput} | ${zstdBigMetrics.minThroughput} | + | Average throughput (B/µs) | ${zlibMetrics.averageThroughput} | ${zstdSmallMetrics.averageThroughput} | ${zstdBigMetrics.averageThroughput} | + | Max throughput (B/µs) | ${zlibMetrics.maxThroughput} | ${zstdSmallMetrics.maxThroughput} | ${zstdBigMetrics.maxThroughput} | + | Min compressed size | ${zlibMetrics.minCompressedSize.compressedStats()} | ${zstdSmallMetrics.minCompressedSize.compressedStats()} | ${zstdBigMetrics.minCompressedSize.compressedStats()} | + | Average compressed size (B) | ${zlibMetrics.averageCompressedSize} | ${zstdSmallMetrics.averageCompressedSize} | ${zstdBigMetrics.averageCompressedSize} | + | Median compressed size (B) | ${zlibMetrics.medianCompressedSize} | ${zstdSmallMetrics.medianCompressedSize} | ${zstdBigMetrics.medianCompressedSize} | + | Max compressed size | ${zlibMetrics.maxCompressedSize.compressedStats()} | ${zstdSmallMetrics.maxCompressedSize.compressedStats()} | ${zstdBigMetrics.maxCompressedSize.compressedStats()} | + | Min decompressed size | ${zlibMetrics.minDecompressedSize.decompressedStats()} | ${zstdSmallMetrics.minDecompressedSize.decompressedStats()} | ${zstdBigMetrics.minDecompressedSize.decompressedStats()} | + | Average decompressed size (B) | ${zlibMetrics.averageDecompressedSize} | ${zstdSmallMetrics.averageDecompressedSize} | ${zstdBigMetrics.averageDecompressedSize} | + | Median decompressed size (B) | ${zlibMetrics.medianDecompressedSize} | ${zstdSmallMetrics.medianDecompressedSize} | ${zstdBigMetrics.medianDecompressedSize} | + | Max decompressed size | ${zlibMetrics.maxDecompressedSize.decompressedStats()} | ${zstdSmallMetrics.maxDecompressedSize.decompressedStats()} | ${zstdBigMetrics.maxDecompressedSize.decompressedStats()} | + """.trimIndent()) +} + +private fun CoroutineScope.processShard(shards: List>, metrics: DecompressionMetrics) { + var i = 0 + for (shard in shards) { + val shardId = i++ + + shard.forEachIndexed { logIndex, 
logFile -> + launch { + logger.info { "Reading shard $shardId file $logIndex" } + + val logIndexByte = logIndex.toUByte() + readLogFile(logFile, logIndexByte) { entry -> + // Take only those with a throughput of 100 bytes per microsecond + if (entry.throughput <= minThroughput) return@readLogFile + metrics.accept(entry) + } + } + } + } +} + +private fun Int.pretty(): String { + return toLong().pretty() +} + +private fun Long.pretty(): String { + val str = toString() + if (str.length <= 3) return str + + return buildString { + str.reversed().forEachIndexed { index, ch -> + if (index != 0 && index % 3 == 0) { + append('_') + } + append(ch) + } + }.reversed() +} + +private fun Int.prettySize(): String { + return toLong().prettySize() +} + +private fun Long.prettySize(): String { + if (this > 10000) { + var prettySize = this.toDouble() / 1024.0 + var prettyUnit = "KB" + if (prettySize > 1024.0) { + prettySize /= 1024.0 + prettyUnit = "MB" + } + if (prettySize > 1024.0) { + prettySize /= 1024.0 + prettyUnit = "GB" + } + return "$this B (${"%.1f".format(prettySize)} ${prettyUnit})" + } else { + return "$this B" + } +} + +private fun readLogFile(logFile: Path, logIndex: UByte, entryConsumer: (DecompressionEntry) -> Unit) { + logFile.inputStream().buffered().let(::DataInputStream).use { input -> + var entryIndex = 0u + var available = input.available() + while (available > 0) { + val timeToDecompress = input.readLong() + val compressedSize = input.readInt() + val decompressedSize = input.readInt() + input.skipBytes(2) // Separator + + entryConsumer(DecompressionEntry(logIndex, entryIndex++, timeToDecompress.nanoseconds, compressedSize, decompressedSize)) + + available -= entrySize + if (available <= 0) { + available = input.available() + } + } + } +} + +private fun readLogThroughputs(logFile: Path, consumer: (throughput: Double) -> Unit) { + logFile.inputStream().buffered().let(::DataInputStream).use { input -> + var available = input.available() + while (available > 0) { + val timeToDecompress = input.readLong() + input.skipBytes(4) // Compressed size + val decompressedSize = input.readInt() + input.skipBytes(2) // Separator + + consumer(decompressedSize / timeToDecompress.toDouble() * 1000.0) + + available -= entrySize + if (available <= 0) { + available = input.available() + } + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 108418c..76008de 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -8,3 +8,4 @@ include(":api") include(":test-data") include(":test-data-generator") include(":benchmarks", ":benchmarks:results-converter") +include(":live-metrics-processor")