From 246f77b2758449fe721ed3574ad8b6bef4ca9aa0 Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Tue, 9 Dec 2025 12:06:58 +0200 Subject: [PATCH 1/3] Add RawSource.asFlow transformer - Introduced converter from `RawSource` to Kotlin Flows. - Added `StreamingDecoder` interface and its implementation `DelimitingByteStreamDecoder` for processing byte streams with a specified delimiter. - Added tests for decoder, and flow behavior cases. --- core/build.gradle.kts | 5 +- .../coroutines/DelimitingByteStreamDecoder.kt | 68 ++++ core/common/src/coroutines/SourceFlow.kt | 88 +++++ .../common/src/coroutines/StreamingDecoder.kt | 49 +++ .../DelimitingByteStreamDecoderTest.kt | 231 +++++++++++ .../KeyValueStreamIntegrationTest.kt | 360 ++++++++++++++++++ .../test/coroutines/RawSourceAsFlowTest.kt | 214 +++++++++++ gradle/libs.versions.toml | 1 + 8 files changed, 1014 insertions(+), 2 deletions(-) create mode 100644 core/common/src/coroutines/DelimitingByteStreamDecoder.kt create mode 100644 core/common/src/coroutines/SourceFlow.kt create mode 100644 core/common/src/coroutines/StreamingDecoder.kt create mode 100644 core/common/test/coroutines/DelimitingByteStreamDecoderTest.kt create mode 100644 core/common/test/coroutines/KeyValueStreamIntegrationTest.kt create mode 100644 core/common/test/coroutines/RawSourceAsFlowTest.kt diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 2799ec44..6cee5ced 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -48,9 +48,10 @@ kotlin { sourceSets { commonMain.dependencies { api(project(":kotlinx-io-bytestring")) + api(libs.kotlinx.coroutines.core) } - appleTest.dependencies { - implementation(libs.kotlinx.coroutines.core) + commonTest.dependencies { + implementation(libs.kotlinx.coroutines.test) } } } diff --git a/core/common/src/coroutines/DelimitingByteStreamDecoder.kt b/core/common/src/coroutines/DelimitingByteStreamDecoder.kt new file mode 100644 index 
00000000..af418a1d --- /dev/null +++ b/core/common/src/coroutines/DelimitingByteStreamDecoder.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ +package kotlinx.io.coroutines + +import kotlinx.io.Buffer +import kotlinx.io.readByteArray + +/** + * A streaming decoder that reads a continuous stream of bytes and separates it into discrete + * chunks based on a specified delimiter. The default delimiter is the newline character (`'\n'`). + * + * This class buffers incoming byte arrays and emits individual byte arrays once a delimiter is + * encountered. Any remaining bytes in the buffer are emitted when [onClose] is called. + * + * ## Example + * + * ```kotlin + * val decoder = DelimitingByteStreamDecoder() + * val source: RawSource = // ... + * source.asFlow(decoder).collect { line -> + * println("Received: ${line.decodeToString()}") + * } + * ``` + * + * ## Thread Safety + * + * This class is **not thread-safe**. Each instance maintains internal mutable state and must + * not be shared across multiple flows or concurrent coroutines. + * + * ## Lifecycle + * + * After [onClose] is called, this decoder **cannot be reused**. The internal buffer is closed + * and the decoder should be discarded. + * + * @property delimiter The byte value used as a delimiter to separate the stream into chunks. + * Defaults to the newline character (`'\n'`). 
+ */ +public class DelimitingByteStreamDecoder( + public val delimiter: Byte = '\n'.code.toByte(), +) : StreamingDecoder<ByteArray> { + + private val buffer = Buffer() + + override suspend fun decode(bytes: ByteArray, byteConsumer: suspend (ByteArray) -> Unit) { + var startIndex = 0 + for (i in bytes.indices) { + if (bytes[i] == delimiter) { + buffer.write(bytes, startIndex, i) + // flush and clear buffer + byteConsumer.invoke(buffer.readByteArray()) + startIndex = i + 1 + } + } + // Buffer any remaining bytes after the last delimiter + if (startIndex < bytes.size) { + buffer.write(bytes, startIndex, bytes.size) + } + } + + override suspend fun onClose(byteConsumer: suspend (ByteArray) -> Unit) { + if (buffer.size > 0) { + byteConsumer.invoke(buffer.readByteArray()) + } + buffer.close() + } +} \ No newline at end of file diff --git a/core/common/src/coroutines/SourceFlow.kt b/core/common/src/coroutines/SourceFlow.kt new file mode 100644 index 00000000..f3ff51d2 --- /dev/null +++ b/core/common/src/coroutines/SourceFlow.kt @@ -0,0 +1,88 @@ +/* + * Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ +package kotlinx.io.coroutines + +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.withContext +import kotlinx.coroutines.yield +import kotlinx.io.Buffer +import kotlinx.io.IOException +import kotlinx.io.RawSource +import kotlinx.io.readByteArray + +public const val READ_BUFFER_SIZE: Long = 8196 + +/** + * Converts this [RawSource] into a Kotlin [Flow], emitting decoded data using the provided [StreamingDecoder]. + * + * This function reads data from the source in chunks, decodes it using the provided decoder, and emits + * the decoded elements downstream.
The returned flow is cold and will start reading from the source + * when collected. + * + * ## Lifecycle and Resource Management + * + * - The source is automatically closed when the flow completes, fails, or is cancelled + * - The decoder's [StreamingDecoder.onClose] is always called for cleanup, even on cancellation + * - On normal completion or [IOException], any remaining buffered data in the decoder is emitted + * - On cancellation, the decoder is cleaned up but remaining data is discarded + * + * ## Backpressure + * + * The flow respects structured concurrency and backpressure. Reading from the source is suspended + * when the downstream collector cannot keep up. + * + * @param T The type of elements emitted by the Flow after decoding. + * @param decoder The [StreamingDecoder] used to decode data read from this source. + * @param readBufferSize The size of the buffer used for reading from the source. Defaults to [READ_BUFFER_SIZE]. + * @return A cold [Flow] that emits decoded elements of type [T]. + * @throws IOException if an I/O error occurs while reading from the source. 
+ */ +public fun <T> RawSource.asFlow( + decoder: StreamingDecoder<T>, + readBufferSize: Long = READ_BUFFER_SIZE +): Flow<T> = + channelFlow { + val source = this@asFlow + val buffer = Buffer() + var decoderClosed = false + try { + source.use { source -> + while (isActive) { + val bytesRead = source.readAtMostTo(buffer, readBufferSize) + if (bytesRead == -1L) { + break + } + + if (bytesRead > 0L) { + val bytes = buffer.readByteArray() + buffer.clear() + decoder.decode(bytes) { + send(it) + } + } + + yield() // Giving other coroutines a chance to run + } + } + // Normal completion: emit any remaining buffered data + decoder.onClose { send(it) } + decoderClosed = true + } catch (exception: IOException) { + // IO error: try to emit remaining data, then close with error + runCatching { decoder.onClose { send(it) } }.onSuccess { decoderClosed = true } + throw exception + } finally { + // Ensure decoder cleanup even on cancellation or other exceptions + if (!decoderClosed) { + withContext(NonCancellable) { + runCatching { decoder.onClose { /* discard data, cleanup only */ } } + } + } + buffer.clear() + } + } \ No newline at end of file diff --git a/core/common/src/coroutines/StreamingDecoder.kt b/core/common/src/coroutines/StreamingDecoder.kt new file mode 100644 index 00000000..cb169294 --- /dev/null +++ b/core/common/src/coroutines/StreamingDecoder.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ +package kotlinx.io.coroutines + +/** + * A generic interface for decoding a stream of bytes into discrete elements of type [T]. + * + * Implementations of this interface are responsible for processing input byte arrays, decoding + * them into meaningful elements, and delivering them to the provided `byteConsumer` function in + * sequential order.
This allows for efficient handling of streaming data and enables + * processing without requiring the entire stream to be loaded into memory. + * + * ## Lifecycle + * + * The decoder processes a stream through repeated calls to [decode], followed by a final call + * to [onClose] when the stream ends. After [onClose] is called, the decoder should not be reused. + * + * ## Thread Safety + * + * Implementations are not required to be thread-safe. Each decoder instance should be used with + * a single stream and should not be shared across concurrent coroutines. + * + * @param T The type of elements produced by the decoder. + */ +public interface StreamingDecoder<T> { + /** + * Decodes a chunk of bytes from the input stream. + * + * This method may be called multiple times as data arrives. Implementations should buffer + * incomplete elements internally and emit complete elements via [byteConsumer]. + * + * @param bytes The input byte array to decode. + * @param byteConsumer A suspend function that receives decoded elements. + */ + public suspend fun decode(bytes: ByteArray, byteConsumer: suspend (T) -> Unit) + + /** + * Called when the input stream ends, allowing the decoder to emit any remaining buffered data + * and perform cleanup. + * + * After this method is called, the decoder should not be used again. + * + * @param byteConsumer A suspend function that receives any final decoded elements. + */ + public suspend fun onClose(byteConsumer: suspend (T) -> Unit) +} + diff --git a/core/common/test/coroutines/DelimitingByteStreamDecoderTest.kt b/core/common/test/coroutines/DelimitingByteStreamDecoderTest.kt new file mode 100644 index 00000000..7b4bc573 --- /dev/null +++ b/core/common/test/coroutines/DelimitingByteStreamDecoderTest.kt @@ -0,0 +1,231 @@ +/* + * Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
+ */ + +package kotlinx.io.coroutines + +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertContentEquals +import kotlin.test.assertEquals + +class DelimitingByteStreamDecoderTest { + + @Test + fun singleMessageWithNewline() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + decoder.decode("hello\n".encodeToByteArray()) { results.add(it) } + + assertEquals(1, results.size) + assertContentEquals("hello".encodeToByteArray(), results[0]) + } + + @Test + fun multipleMessagesInSingleBatch() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + decoder.decode("first\nsecond\nthird\n".encodeToByteArray()) { results.add(it) } + + assertEquals(3, results.size) + assertContentEquals("first".encodeToByteArray(), results[0]) + assertContentEquals("second".encodeToByteArray(), results[1]) + assertContentEquals("third".encodeToByteArray(), results[2]) + } + + @Test + fun fragmentedMessageAcrossMultipleCalls() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + // Message split across three decode calls + decoder.decode("hel".encodeToByteArray()) { results.add(it) } + assertEquals(0, results.size, "No complete message yet") + + decoder.decode("lo wor".encodeToByteArray()) { results.add(it) } + assertEquals(0, results.size, "Still no complete message") + + decoder.decode("ld\n".encodeToByteArray()) { results.add(it) } + assertEquals(1, results.size) + assertContentEquals("hello world".encodeToByteArray(), results[0]) + } + + @Test + fun partialMessageFollowedByCompleteMessages() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + // First call: incomplete message + decoder.decode("partial".encodeToByteArray()) { results.add(it) } + assertEquals(0, results.size) + + // Second call: complete the first and add two more + decoder.decode(" message\nfull 
message\nanother\n".encodeToByteArray()) { results.add(it) } + + assertEquals(3, results.size) + assertContentEquals("partial message".encodeToByteArray(), results[0]) + assertContentEquals("full message".encodeToByteArray(), results[1]) + assertContentEquals("another".encodeToByteArray(), results[2]) + } + + @Test + fun emptyMessages() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + decoder.decode("\n\n\n".encodeToByteArray()) { results.add(it) } + + assertEquals(3, results.size) + assertEquals(0, results[0].size, "Empty message should produce empty byte array") + assertEquals(0, results[1].size) + assertEquals(0, results[2].size) + } + + @Test + fun noDelimiterLeavesDataInBuffer() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + // Data without delimiter stays in buffer + decoder.decode("no delimiter here".encodeToByteArray()) { results.add(it) } + assertEquals(0, results.size, "Should buffer incomplete message") + + // Adding delimiter flushes the buffer + decoder.decode(" - now it ends\n".encodeToByteArray()) { results.add(it) } + assertEquals(1, results.size) + assertContentEquals("no delimiter here - now it ends".encodeToByteArray(), results[0]) + } + + @Test + fun customDelimiter() = runTest { + val decoder = DelimitingByteStreamDecoder(delimiter = '|'.code.toByte()) + val results = mutableListOf() + + decoder.decode("first|second|third|".encodeToByteArray()) { results.add(it) } + + assertEquals(3, results.size) + assertContentEquals("first".encodeToByteArray(), results[0]) + assertContentEquals("second".encodeToByteArray(), results[1]) + assertContentEquals("third".encodeToByteArray(), results[2]) + } + + @Test + fun emptyInput() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + decoder.decode(ByteArray(0)) { results.add(it) } + + assertEquals(0, results.size) + } + + @Test + fun singleDelimiterOnly() = runTest { + val 
decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + decoder.decode("\n".encodeToByteArray()) { results.add(it) } + + assertEquals(1, results.size) + assertEquals(0, results[0].size) + } + + @Test + fun byteByByteDecoding() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + val message = "test\n" + + // Decode one byte at a time + message.forEach { char -> + decoder.decode(byteArrayOf(char.code.toByte())) { results.add(it) } + } + + assertEquals(1, results.size) + assertContentEquals("test".encodeToByteArray(), results[0]) + } + + @Test + fun largeMessage() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + val largeContent = "x".repeat(10000) + + decoder.decode("$largeContent\n".encodeToByteArray()) { results.add(it) } + + assertEquals(1, results.size) + assertContentEquals(largeContent.encodeToByteArray(), results[0]) + } + + @Test + fun mixedCompleteAndIncompleteMessages() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + // Complete message followed by incomplete + decoder.decode("complete\nincomp".encodeToByteArray()) { results.add(it) } + assertEquals(1, results.size) + assertContentEquals("complete".encodeToByteArray(), results[0]) + + // Complete the incomplete message and add new incomplete + decoder.decode("lete\nanother incomplete".encodeToByteArray()) { results.add(it) } + assertEquals(2, results.size) + assertContentEquals("incomplete".encodeToByteArray(), results[1]) + + // Finish with complete message + decoder.decode(" message\nfinal\n".encodeToByteArray()) { results.add(it) } + assertEquals(4, results.size) + assertContentEquals("another incomplete message".encodeToByteArray(), results[2]) + assertContentEquals("final".encodeToByteArray(), results[3]) + } + + @Test + fun delimiterAtBufferBoundary() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + // 
Delimiter exactly at the end of input + decoder.decode("message".encodeToByteArray()) { results.add(it) } + decoder.decode("\n".encodeToByteArray()) { results.add(it) } + + assertEquals(1, results.size) + assertContentEquals("message".encodeToByteArray(), results[0]) + } + + @Test + fun consecutiveDelimiters() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + decoder.decode("a\n\n\nb".encodeToByteArray()) { results.add(it) } + + // Should produce: "a", "", "", and buffered "b" + assertEquals(3, results.size) + assertContentEquals("a".encodeToByteArray(), results[0]) + assertEquals(0, results[1].size) + assertEquals(0, results[2].size) + + // Complete the last message + decoder.decode("\n".encodeToByteArray()) { results.add(it) } + assertEquals(4, results.size) + assertContentEquals("b".encodeToByteArray(), results[3]) + } + + @Test + fun binaryDataWithNewlineDelimiter() = runTest { + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + // Binary data that happens to contain our delimiter byte + val binaryData = byteArrayOf(0x00, 0x01, 0x02, '\n'.code.toByte(), 0x03, 0x04, '\n'.code.toByte()) + + decoder.decode(binaryData) { results.add(it) } + + assertEquals(2, results.size) + assertContentEquals(byteArrayOf(0x00, 0x01, 0x02), results[0]) + assertContentEquals(byteArrayOf(0x03, 0x04), results[1]) + } +} diff --git a/core/common/test/coroutines/KeyValueStreamIntegrationTest.kt b/core/common/test/coroutines/KeyValueStreamIntegrationTest.kt new file mode 100644 index 00000000..51a32382 --- /dev/null +++ b/core/common/test/coroutines/KeyValueStreamIntegrationTest.kt @@ -0,0 +1,360 @@ +/* + * Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. 
+ */ + +package kotlinx.io.coroutines + +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import kotlinx.io.Buffer +import kotlinx.io.Source +import kotlinx.io.writeString +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class KeyValueStreamIntegrationTest { + + private fun parseKeyValue(line: String): Pair { + val parts = line.split(":", limit = 2) + return if (parts.size == 2) { + parts[0].trim() to parts[1].trim() + } else { + parts[0].trim() to "" + } + } + + @Test + fun basicKeyValueStreaming() = runTest { + val source = Buffer().apply { + writeString("name:Alice\n") + writeString("age:30\n") + writeString("city:NewYork\n") + } + + val decoder = DelimitingByteStreamDecoder() + + val entries = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + + assertEquals(3, entries.size) + assertEquals("name" to "Alice", entries[0]) + assertEquals("age" to "30", entries[1]) + assertEquals("city" to "NewYork", entries[2]) + } + + @Test + fun filteringKeyValueStream() = runTest { + val source = Buffer().apply { + writeString("metric:cpu\n") + writeString("value:45.2\n") + writeString("metric:memory\n") + writeString("value:2048\n") + writeString("metric:disk\n") + writeString("value:512\n") + } + + val decoder = DelimitingByteStreamDecoder() + + // Filter only metric keys + val metrics = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .filter { (key, _) -> key == "metric" } + .toList() + + assertEquals(3, metrics.size) + assertEquals("cpu", metrics[0].second) + assertEquals("memory", metrics[1].second) + assertEquals("disk", metrics[2].second) + } + + @Test + fun largeKeyValueStream() = runTest { + val source = Buffer().apply { + repeat(1000) { index -> + 
writeString("record_$index:value_$index\n") + } + } + + val decoder = DelimitingByteStreamDecoder() + + val entries = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + + assertEquals(1000, entries.size) + assertEquals("record_0" to "value_0", entries[0]) + assertEquals("record_500" to "value_500", entries[500]) + assertEquals("record_999" to "value_999", entries[999]) + } + + @Test + fun keyValueStreamWithEmptyLines() = runTest { + val source = Buffer().apply { + writeString("status:starting\n") + writeString("\n") // Empty line + writeString("status:running\n") + writeString("\n") // Another empty line + writeString("status:complete\n") + } + + val decoder = DelimitingByteStreamDecoder() + + val entries = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .filter { it.isNotBlank() } // Filter empty lines + .map { line -> parseKeyValue(line) } + .toList() + + assertEquals(3, entries.size) + assertEquals("starting", entries[0].second) + assertEquals("running", entries[1].second) + assertEquals("complete", entries[2].second) + } + + @Test + fun valueWithColons() = runTest { + val source = Buffer().apply { + writeString("timestamp:2025-12-09T10:00:00Z\n") + writeString("url:https://example.com:8080/path\n") + writeString("message:Error: Connection failed\n") + } + + val decoder = DelimitingByteStreamDecoder() + + val entries = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + + assertEquals(3, entries.size) + assertEquals("timestamp" to "2025-12-09T10:00:00Z", entries[0]) + assertEquals("url" to "https://example.com:8080/path", entries[1]) + assertEquals("message" to "Error: Connection failed", entries[2]) + } + + @Test + fun buildingMapFromStream() = runTest { + val source = Buffer().apply { + writeString("host:localhost\n") + writeString("port:8080\n") + 
writeString("protocol:https\n") + writeString("timeout:30\n") + writeString("retries:3\n") + } + + val decoder = DelimitingByteStreamDecoder() + + val config = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + .toMap() + + assertEquals(5, config.size) + assertEquals("localhost", config["host"]) + assertEquals("8080", config["port"]) + assertEquals("https", config["protocol"]) + assertEquals("30", config["timeout"]) + assertEquals("3", config["retries"]) + } + + @Test + fun multipleSourcesAggregation() = runTest { + fun createMetricSource(sourceId: String, count: Int): Source = Buffer().apply { + repeat(count) { index -> + writeString("source:$sourceId\n") + writeString("seq:$index\n") + } + } + + val decoder1 = DelimitingByteStreamDecoder() + val decoder2 = DelimitingByteStreamDecoder() + + val source1Data = createMetricSource("server1", 10) + .asFlow(decoder1, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + + val source2Data = createMetricSource("server2", 15) + .asFlow(decoder2, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + + assertEquals(20, source1Data.size) // 10 source + 10 seq + assertEquals(30, source2Data.size) // 15 source + 15 seq + + val allData = source1Data + source2Data + assertEquals(50, allData.size) + } + + @Test + fun streamingCounterMetrics() = runTest { + val source = Buffer().apply { + writeString("requests:100\n") + writeString("errors:5\n") + writeString("requests:150\n") + writeString("errors:8\n") + writeString("requests:200\n") + writeString("errors:3\n") + } + + val decoder = DelimitingByteStreamDecoder() + + val metrics = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + + // Sum all requests + val totalRequests = metrics + .filter { 
(key, _) -> key == "requests" } + .sumOf { (_, value) -> value.toIntOrNull() ?: 0 } + + // Sum all errors + val totalErrors = metrics + .filter { (key, _) -> key == "errors" } + .sumOf { (_, value) -> value.toIntOrNull() ?: 0 } + + assertEquals(450, totalRequests) + assertEquals(16, totalErrors) + } + + @Test + fun veryLongValues() = runTest { + val longValue = "x".repeat(10000) + val source = Buffer().apply { + writeString("short:value\n") + writeString("long:$longValue\n") + writeString("another:data\n") + } + + val decoder = DelimitingByteStreamDecoder() + + val entries = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + + assertEquals(3, entries.size) + assertEquals("short" to "value", entries[0]) + assertEquals("long", entries[1].first) + assertEquals(10000, entries[1].second.length) + assertEquals("another" to "data", entries[2]) + } + + @Test + fun keyValuePipelineWithTransformation() = runTest { + val source = Buffer().apply { + writeString("temperature:20.5\n") + writeString("humidity:65\n") + writeString("temperature:22.3\n") + writeString("humidity:70\n") + writeString("temperature:19.8\n") + writeString("humidity:60\n") + } + + val decoder = DelimitingByteStreamDecoder() + + data class Measurement(val type: String, val value: Double) + + val measurements = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .map { (key, value) -> Measurement(key, value.toDoubleOrNull() ?: 0.0) } + .toList() + + assertEquals(6, measurements.size) + + // Calculate average temperature + val avgTemp = measurements + .filter { it.type == "temperature" } + .map { it.value } + .average() + + assertTrue(avgTemp > 20.0 && avgTemp < 22.0) + + // Calculate average humidity + val avgHumidity = measurements + .filter { it.type == "humidity" } + .map { it.value } + .average() + + assertEquals(65.0, avgHumidity) + } + + @Test + 
fun propertyFileStyleParsing() = runTest { + // Similar to Java .properties files + val source = Buffer().apply { + writeString("app.name:MyApplication\n") + writeString("app.version:1.0.0\n") + writeString("app.debug:true\n") + writeString("db.host:localhost\n") + writeString("db.port:5432\n") + writeString("db.name:mydb\n") + writeString("db.user:admin\n") + } + + val decoder = DelimitingByteStreamDecoder() + + val properties = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + .toMap() + + // Verify app properties + assertEquals("MyApplication", properties["app.name"]) + assertEquals("1.0.0", properties["app.version"]) + assertEquals("true", properties["app.debug"]) + + // Verify db properties + val dbProperties = properties.filterKeys { it.startsWith("db.") } + assertEquals(4, dbProperties.size) + assertEquals("localhost", dbProperties["db.host"]) + assertEquals("5432", dbProperties["db.port"]) + } + + @Test + fun eventStreamProcessing() = runTest { + val source = Buffer().apply { + writeString("event:login\n") + writeString("user:alice\n") + writeString("event:purchase\n") + writeString("amount:99.99\n") + writeString("event:logout\n") + writeString("user:alice\n") + writeString("event:login\n") + writeString("user:bob\n") + } + + val decoder = DelimitingByteStreamDecoder() + + val entries = source.asFlow(decoder, READ_BUFFER_SIZE) + .map { bytes -> bytes.decodeToString() } + .map { line -> parseKeyValue(line) } + .toList() + + // Count events + val events = entries.filter { (key, _) -> key == "event" } + assertEquals(4, events.size) + + // Get all event types + val eventTypes = events.map { (_, value) -> value }.toSet() + assertEquals(setOf("login", "purchase", "logout"), eventTypes) + + // Count logins + val loginCount = events.count { (_, value) -> value == "login" } + assertEquals(2, loginCount) + } +} diff --git a/core/common/test/coroutines/RawSourceAsFlowTest.kt 
b/core/common/test/coroutines/RawSourceAsFlowTest.kt new file mode 100644 index 00000000..fb2246ba --- /dev/null +++ b/core/common/test/coroutines/RawSourceAsFlowTest.kt @@ -0,0 +1,214 @@ +/* + * Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io.coroutines + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.test.runTest +import kotlinx.io.Buffer +import kotlinx.io.IOException +import kotlinx.io.RawSource +import kotlinx.io.writeString +import kotlin.test.Test +import kotlin.test.assertContentEquals +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class RawSourceAsFlowTest { + + @Test + fun consumeDelimitedStream() = runTest { + val source = Buffer().apply { + writeString("line1\nline2\nline3") + } + + val decoder = DelimitingByteStreamDecoder() + val results = source.asFlow(decoder, READ_BUFFER_SIZE).toList() + + assertEquals(3, results.size) + assertContentEquals("line1".encodeToByteArray(), results[0]) + assertContentEquals("line2".encodeToByteArray(), results[1]) + assertContentEquals("line3".encodeToByteArray(), results[2]) + } + + @Test + fun emptySource() = runTest { + val source = Buffer() + val decoder = DelimitingByteStreamDecoder() + val results = source.asFlow(decoder, READ_BUFFER_SIZE).toList() + + assertEquals(0, results.size) + } + + @Test + fun largeStreamWithMultipleChunks() = runTest { + // Create a large source that will require multiple read operations + val source = Buffer().apply { + repeat(1000) { i -> + writeString("message_$i\n") + } + } + + val decoder = DelimitingByteStreamDecoder() + val results = source.asFlow(decoder, READ_BUFFER_SIZE).toList() + + assertEquals(1000, results.size) + results.forEachIndexed { index, bytes -> + 
assertContentEquals("message_$index".encodeToByteArray(), bytes) + } + } + + @Test + fun flowCancellationClosesSource() = runTest { + var sourceClosed = false + val source = object : RawSource { + val buffer = Buffer().apply { + repeat(100) { writeString("data\n") } + } + + override fun readAtMostTo(sink: Buffer, byteCount: Long): Long { + return buffer.readAtMostTo(sink, byteCount) + } + + override fun close() { + sourceClosed = true + } + } + + val decoder = DelimitingByteStreamDecoder() + + // Take only the first 5 items, cancelling the flow early + val results = source.asFlow(decoder, READ_BUFFER_SIZE).take(5).toList() + + assertEquals(5, results.size) + assertTrue(sourceClosed, "Source should be closed when flow is cancelled") + } + + @Test + fun ioExceptionPropagatedAsFlowFailure() = runTest { + val expectedMessage = "Simulated I/O error" + val failingSource = object : RawSource { + override fun readAtMostTo(sink: Buffer, byteCount: Long): Long { + throw IOException(expectedMessage) + } + + override fun close() { + // no-op + } + } + + val decoder = DelimitingByteStreamDecoder() + var caughtException: Throwable? 
= null + + failingSource.asFlow(decoder) + .catch { e -> caughtException = e } + .toList() + + assertTrue(caughtException is IOException) + assertEquals(expectedMessage, caughtException?.message) + } + + @Test + fun partialMessageHandlingAcrossReadBoundaries() = runTest { + // Create a source where messages span across READ_BUFFER_SIZE boundaries + val longMessage = "x".repeat(READ_BUFFER_SIZE.toInt() + 100) + val source = Buffer().apply { + writeString("short\n") + writeString("$longMessage\n") + writeString("another\n") + } + + val decoder = DelimitingByteStreamDecoder() + val results = source.asFlow(decoder, READ_BUFFER_SIZE).toList() + + assertEquals(3, results.size) + assertContentEquals("short".encodeToByteArray(), results[0]) + assertContentEquals(longMessage.encodeToByteArray(), results[1]) + assertContentEquals("another".encodeToByteArray(), results[2]) + } + + @Test + fun customDecoderWithCustomLogic() = runTest { + // Custom decoder that counts bytes and emits the count + val countingDecoder = object : StreamingDecoder { + override suspend fun decode(bytes: ByteArray, byteConsumer: suspend (Int) -> Unit) { + byteConsumer(bytes.size) + } + + override suspend fun onClose(byteConsumer: suspend (Int) -> Unit) { + byteConsumer(0) + } + } + + val source = Buffer().apply { + write(ByteArray(100)) + write(ByteArray(200)) + write(ByteArray(150)) + } + + val results = source.asFlow(countingDecoder, READ_BUFFER_SIZE).toList() + + assertTrue(results.isNotEmpty()) + assertEquals(results.sum(), 450) + } + + @Test + fun sourceExhaustionHandledGracefully() = runTest { + val source = Buffer().apply { + writeString("first\nsecond\n") + } + + val decoder = DelimitingByteStreamDecoder() + + // Collect flow twice to ensure it properly handles exhaustion + val firstCollection = source.asFlow(decoder, READ_BUFFER_SIZE).toList() + + assertEquals(2, firstCollection.size) + + // Source is now exhausted, second collection should be empty + assertEquals(0, source.asFlow(decoder, 
READ_BUFFER_SIZE).toList().size) + } + + @Test + fun coroutineCancellationDuringRead() = runTest { + val source = Buffer().apply { + repeat(1000) { writeString("message $it\n") } + } + + val decoder = DelimitingByteStreamDecoder() + val results = mutableListOf() + + try { + source.asFlow(decoder, READ_BUFFER_SIZE).collect { bytes -> + results.add(bytes) + if (results.size == 10) { + throw CancellationException("Test cancellation") + } + } + } catch (_: CancellationException) { + // Expected + } + + assertEquals(10, results.size, "Should collect exactly 10 items before cancellation") + } + + @Test + fun veryLargeMessagesHandledCorrectly() = runTest { + // Message larger than multiple READ_BUFFER_SIZE chunks + val veryLargeMessage = "A".repeat(READ_BUFFER_SIZE.toInt() * 3) + val source = Buffer().apply { + writeString("$veryLargeMessage\n") + } + + val decoder = DelimitingByteStreamDecoder() + val results = source.asFlow(decoder, READ_BUFFER_SIZE).toList() + + assertEquals(1, results.size) + assertContentEquals(veryLargeMessage.encodeToByteArray(), results[0]) + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 72367a19..58aa62dd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -18,6 +18,7 @@ kotlin-gradle-plugin = { group = "org.jetbrains.kotlin", name = "kotlin-gradle-p dokka-gradle-plugin = { group = "org.jetbrains.dokka", name = "dokka-gradle-plugin", version.ref = "dokka" } power-assert-plugin = { group = "org.jetbrains.kotlin.plugin.power-assert", name = "org.jetbrains.kotlin.plugin.power-assert.gradle.plugin", version.ref = "kotlin" } kotlinx-coroutines-core = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-core", version.ref = "coroutines" } +kotlinx-coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutines" } animalsniffer-gradle-plugin = { group = "ru.vyarus", name = "gradle-animalsniffer-plugin", version.ref = "animalsniffer" } okio = { 
group = "com.squareup.okio", name = "okio", version.ref = "okio" } From bddb29737d639b19d16a999cb98a262c3e282d23 Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Wed, 10 Dec 2025 00:04:58 +0200 Subject: [PATCH 2/3] Introduce kotlinx-io-coroutines module --- core/build.gradle.kts | 5 +- coroutines/api/kotlinx-io-coroutines.api | 20 ++++++++ coroutines/build.gradle.kts | 49 +++++++++++++++++++ .../coroutines/DelimitingByteStreamDecoder.kt | 0 .../kotlin}/coroutines/SourceFlow.kt | 0 .../kotlin}/coroutines/StreamingDecoder.kt | 0 .../DelimitingByteStreamDecoderTest.kt | 0 .../KeyValueStreamIntegrationTest.kt | 0 .../kotlin}/coroutines/RawSourceAsFlowTest.kt | 0 settings.gradle.kts | 2 + 10 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 coroutines/api/kotlinx-io-coroutines.api create mode 100644 coroutines/build.gradle.kts rename {core/common/src => coroutines/src/commonMain/kotlin}/coroutines/DelimitingByteStreamDecoder.kt (100%) rename {core/common/src => coroutines/src/commonMain/kotlin}/coroutines/SourceFlow.kt (100%) rename {core/common/src => coroutines/src/commonMain/kotlin}/coroutines/StreamingDecoder.kt (100%) rename {core/common/test => coroutines/src/commonTest/kotlin}/coroutines/DelimitingByteStreamDecoderTest.kt (100%) rename {core/common/test => coroutines/src/commonTest/kotlin}/coroutines/KeyValueStreamIntegrationTest.kt (100%) rename {core/common/test => coroutines/src/commonTest/kotlin}/coroutines/RawSourceAsFlowTest.kt (100%) diff --git a/core/build.gradle.kts b/core/build.gradle.kts index 6cee5ced..2799ec44 100644 --- a/core/build.gradle.kts +++ b/core/build.gradle.kts @@ -48,10 +48,9 @@ kotlin { sourceSets { commonMain.dependencies { api(project(":kotlinx-io-bytestring")) - api(libs.kotlinx.coroutines.core) } - commonTest.dependencies { - implementation(libs.kotlinx.coroutines.test) + appleTest.dependencies { + implementation(libs.kotlinx.coroutines.core) } } } diff --git 
a/coroutines/api/kotlinx-io-coroutines.api b/coroutines/api/kotlinx-io-coroutines.api new file mode 100644 index 00000000..35740e41 --- /dev/null +++ b/coroutines/api/kotlinx-io-coroutines.api @@ -0,0 +1,20 @@ +public final class kotlinx/io/coroutines/DelimitingByteStreamDecoder : kotlinx/io/coroutines/StreamingDecoder { + public fun ()V + public fun (B)V + public synthetic fun (BILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun decode ([BLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public final fun getDelimiter ()B + public fun onClose (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public final class kotlinx/io/coroutines/SourceFlowKt { + public static final field READ_BUFFER_SIZE J + public static final fun asFlow (Lkotlinx/io/RawSource;Lkotlinx/io/coroutines/StreamingDecoder;J)Lkotlinx/coroutines/flow/Flow; + public static synthetic fun asFlow$default (Lkotlinx/io/RawSource;Lkotlinx/io/coroutines/StreamingDecoder;JILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; +} + +public abstract interface class kotlinx/io/coroutines/StreamingDecoder { + public abstract fun decode ([BLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun onClose (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + diff --git a/coroutines/build.gradle.kts b/coroutines/build.gradle.kts new file mode 100644 index 00000000..b109f025 --- /dev/null +++ b/coroutines/build.gradle.kts @@ -0,0 +1,49 @@ +/* + * Copyright 2010-2024 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. 
+ */ + +plugins { + kotlin("multiplatform") +// id("kotlinx-io-multiplatform") + id("kotlinx-io-publish") + id("kotlinx-io-dokka") + id("kotlinx-io-compatibility") + alias(libs.plugins.kover) +} + +kotlin { + sourceSets { + commonMain.dependencies { + api(project(":kotlinx-io-core")) + api(libs.kotlinx.coroutines.core) + } + commonTest.dependencies { + implementation(kotlin("test")) + implementation(libs.kotlinx.coroutines.test) + } + } + + jvm() + + explicitApi() + + withSourcesJar() + +// js { +// nodejs { +// testTask { +// useMocha { +// timeout = "300s" +// } +// } +// } +// browser { +// testTask { +// useMocha { +// timeout = "300s" +// } +// } +// } +// } +} diff --git a/core/common/src/coroutines/DelimitingByteStreamDecoder.kt b/coroutines/src/commonMain/kotlin/coroutines/DelimitingByteStreamDecoder.kt similarity index 100% rename from core/common/src/coroutines/DelimitingByteStreamDecoder.kt rename to coroutines/src/commonMain/kotlin/coroutines/DelimitingByteStreamDecoder.kt diff --git a/core/common/src/coroutines/SourceFlow.kt b/coroutines/src/commonMain/kotlin/coroutines/SourceFlow.kt similarity index 100% rename from core/common/src/coroutines/SourceFlow.kt rename to coroutines/src/commonMain/kotlin/coroutines/SourceFlow.kt diff --git a/core/common/src/coroutines/StreamingDecoder.kt b/coroutines/src/commonMain/kotlin/coroutines/StreamingDecoder.kt similarity index 100% rename from core/common/src/coroutines/StreamingDecoder.kt rename to coroutines/src/commonMain/kotlin/coroutines/StreamingDecoder.kt diff --git a/core/common/test/coroutines/DelimitingByteStreamDecoderTest.kt b/coroutines/src/commonTest/kotlin/coroutines/DelimitingByteStreamDecoderTest.kt similarity index 100% rename from core/common/test/coroutines/DelimitingByteStreamDecoderTest.kt rename to coroutines/src/commonTest/kotlin/coroutines/DelimitingByteStreamDecoderTest.kt diff --git a/core/common/test/coroutines/KeyValueStreamIntegrationTest.kt 
b/coroutines/src/commonTest/kotlin/coroutines/KeyValueStreamIntegrationTest.kt similarity index 100% rename from core/common/test/coroutines/KeyValueStreamIntegrationTest.kt rename to coroutines/src/commonTest/kotlin/coroutines/KeyValueStreamIntegrationTest.kt diff --git a/core/common/test/coroutines/RawSourceAsFlowTest.kt b/coroutines/src/commonTest/kotlin/coroutines/RawSourceAsFlowTest.kt similarity index 100% rename from core/common/test/coroutines/RawSourceAsFlowTest.kt rename to coroutines/src/commonTest/kotlin/coroutines/RawSourceAsFlowTest.kt diff --git a/settings.gradle.kts b/settings.gradle.kts index 79515254..e07053a0 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -20,12 +20,14 @@ plugins { rootProject.name = "kotlinx-io" include(":kotlinx-io-core") +include(":kotlinx-io-coroutines") include(":kotlinx-io-benchmarks") include(":kotlinx-io-bytestring") include(":kotlinx-io-smoke-tests") include(":kotlinx-io-okio") project(":kotlinx-io-core").projectDir = file("./core") +project(":kotlinx-io-coroutines").projectDir = file("./coroutines") project(":kotlinx-io-benchmarks").projectDir = file("./benchmarks") project(":kotlinx-io-bytestring").projectDir = file("./bytestring") project(":kotlinx-io-smoke-tests").projectDir = file("./smoke-tests") From f09e06cd6db4724eddf579a266883d1f498bec3b Mon Sep 17 00:00:00 2001 From: Konstantin Pavlov <1517853+kpavlov@users.noreply.github.com> Date: Thu, 11 Dec 2025 15:14:15 +0100 Subject: [PATCH 3/3] Refactor SourceFlow --- .../coroutines/DelimitingByteStreamDecoder.kt | 0 .../{ => kotlinx/io}/coroutines/SourceFlow.kt | 74 +++++++++---------- .../io}/coroutines/StreamingDecoder.kt | 0 .../DelimitingByteStreamDecoderTest.kt | 0 .../KeyValueStreamIntegrationTest.kt | 0 .../io}/coroutines/RawSourceAsFlowTest.kt | 0 6 files changed, 37 insertions(+), 37 deletions(-) rename coroutines/src/commonMain/kotlin/{ => kotlinx/io}/coroutines/DelimitingByteStreamDecoder.kt (100%) rename coroutines/src/commonMain/kotlin/{ => 
kotlinx/io}/coroutines/SourceFlow.kt (58%) rename coroutines/src/commonMain/kotlin/{ => kotlinx/io}/coroutines/StreamingDecoder.kt (100%) rename coroutines/src/commonTest/kotlin/{ => kotlinx/io}/coroutines/DelimitingByteStreamDecoderTest.kt (100%) rename coroutines/src/commonTest/kotlin/{ => kotlinx/io}/coroutines/KeyValueStreamIntegrationTest.kt (100%) rename coroutines/src/commonTest/kotlin/{ => kotlinx/io}/coroutines/RawSourceAsFlowTest.kt (100%) diff --git a/coroutines/src/commonMain/kotlin/coroutines/DelimitingByteStreamDecoder.kt b/coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/DelimitingByteStreamDecoder.kt similarity index 100% rename from coroutines/src/commonMain/kotlin/coroutines/DelimitingByteStreamDecoder.kt rename to coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/DelimitingByteStreamDecoder.kt diff --git a/coroutines/src/commonMain/kotlin/coroutines/SourceFlow.kt b/coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/SourceFlow.kt similarity index 58% rename from coroutines/src/commonMain/kotlin/coroutines/SourceFlow.kt rename to coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/SourceFlow.kt index f3ff51d2..a814360d 100644 --- a/coroutines/src/commonMain/kotlin/coroutines/SourceFlow.kt +++ b/coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/SourceFlow.kt @@ -4,10 +4,10 @@ */ package kotlinx.io.coroutines +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.channelFlow -import kotlinx.coroutines.isActive +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.withContext import kotlinx.coroutines.yield import kotlinx.io.Buffer @@ -45,44 +45,44 @@ public const val READ_BUFFER_SIZE: Long = 8196 public fun RawSource.asFlow( decoder: StreamingDecoder, readBufferSize: Long = READ_BUFFER_SIZE -): Flow = - channelFlow { - val source = this@asFlow - val buffer = Buffer() - var decoderClosed = false - try { - source.use { 
source -> - while (isActive) { - val bytesRead = source.readAtMostTo(buffer, readBufferSize) - if (bytesRead == -1L) { - break - } +): Flow = flow { + val source = this@asFlow + val buffer = Buffer() + var decoderClosed = false + try { + source.use { source -> + while (true) { + val bytesRead = source.readAtMostTo(buffer, readBufferSize) + if (bytesRead == -1L) { + break + } - if (bytesRead > 0L) { - val bytes = buffer.readByteArray() - buffer.clear() - decoder.decode(bytes) { - send(it) - } + if (bytesRead > 0L) { + val bytes = buffer.readByteArray() + decoder.decode(bytes) { + emit(it) } - - yield() // Giving other coroutines a chance to run } + + yield() // Giving other coroutines a chance to run } - // Normal completion: emit any remaining buffered data - decoder.onClose { send(it) } - decoderClosed = true - } catch (exception: IOException) { - // IO error: try to emit remaining data, then close with error - runCatching { decoder.onClose { send(it) } }.onSuccess { decoderClosed = true } - throw exception - } finally { - // Ensure decoder cleanup even on cancellation or other exceptions - if (!decoderClosed) { - withContext(NonCancellable) { - runCatching { decoder.onClose { /* discard data, cleanup only */ } } - } + } + // Normal completion: emit any remaining buffered data + decoder.onClose { emit(it) } + decoderClosed = true + } catch (e: CancellationException) { + throw e + } catch (exception: IOException) { + // IO error: try to emit remaining data, then close with error + runCatching { decoder.onClose { emit(it) } }.onSuccess { decoderClosed = true } + throw exception + } finally { + // Ensure decoder cleanup even on cancellation or other exceptions + if (!decoderClosed) { + withContext(NonCancellable) { + runCatching { decoder.onClose { /* discard data, cleanup only */ } } } - buffer.clear() } - } \ No newline at end of file + buffer.clear() + } +} \ No newline at end of file diff --git a/coroutines/src/commonMain/kotlin/coroutines/StreamingDecoder.kt 
b/coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/StreamingDecoder.kt similarity index 100% rename from coroutines/src/commonMain/kotlin/coroutines/StreamingDecoder.kt rename to coroutines/src/commonMain/kotlin/kotlinx/io/coroutines/StreamingDecoder.kt diff --git a/coroutines/src/commonTest/kotlin/coroutines/DelimitingByteStreamDecoderTest.kt b/coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/DelimitingByteStreamDecoderTest.kt similarity index 100% rename from coroutines/src/commonTest/kotlin/coroutines/DelimitingByteStreamDecoderTest.kt rename to coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/DelimitingByteStreamDecoderTest.kt diff --git a/coroutines/src/commonTest/kotlin/coroutines/KeyValueStreamIntegrationTest.kt b/coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/KeyValueStreamIntegrationTest.kt similarity index 100% rename from coroutines/src/commonTest/kotlin/coroutines/KeyValueStreamIntegrationTest.kt rename to coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/KeyValueStreamIntegrationTest.kt diff --git a/coroutines/src/commonTest/kotlin/coroutines/RawSourceAsFlowTest.kt b/coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/RawSourceAsFlowTest.kt similarity index 100% rename from coroutines/src/commonTest/kotlin/coroutines/RawSourceAsFlowTest.kt rename to coroutines/src/commonTest/kotlin/kotlinx/io/coroutines/RawSourceAsFlowTest.kt