Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions coroutines/api/kotlinx-io-coroutines.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
public final class kotlinx/io/coroutines/DelimitingByteStreamDecoder : kotlinx/io/coroutines/StreamingDecoder {
public fun <init> ()V
public fun <init> (B)V
public synthetic fun <init> (BILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun decode ([BLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getDelimiter ()B
public fun onClose (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/io/coroutines/SourceFlowKt {
public static final field READ_BUFFER_SIZE J
public static final fun asFlow (Lkotlinx/io/RawSource;Lkotlinx/io/coroutines/StreamingDecoder;J)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun asFlow$default (Lkotlinx/io/RawSource;Lkotlinx/io/coroutines/StreamingDecoder;JILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
}

public abstract interface class kotlinx/io/coroutines/StreamingDecoder {
public abstract fun decode ([BLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun onClose (Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

49 changes: 49 additions & 0 deletions coroutines/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2010-2024 JetBrains s.r.o. and respective authors and developers.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
*/

plugins {
kotlin("multiplatform")
// id("kotlinx-io-multiplatform")
id("kotlinx-io-publish")
id("kotlinx-io-dokka")
id("kotlinx-io-compatibility")
alias(libs.plugins.kover)
}

kotlin {
sourceSets {
commonMain.dependencies {
api(project(":kotlinx-io-core"))
api(libs.kotlinx.coroutines.core)
}
commonTest.dependencies {
implementation(kotlin("test"))
implementation(libs.kotlinx.coroutines.test)
}
}

jvm()

explicitApi()

withSourcesJar()

// js {
// nodejs {
// testTask {
// useMocha {
// timeout = "300s"
// }
// }
// }
// browser {
// testTask {
// useMocha {
// timeout = "300s"
// }
// }
// }
// }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
*/
package kotlinx.io.coroutines

import kotlinx.io.Buffer
import kotlinx.io.readByteArray

/**
* A streaming decoder that reads a continuous stream of bytes and separates it into discrete
* chunks based on a specified delimiter. The default delimiter is the newline character (`'\n'`).
*
* This class buffers incoming byte arrays and emits individual byte arrays once a delimiter is
* encountered. Any remaining bytes in the buffer are emitted when [onClose] is called.
*
* ## Example
*
* ```kotlin
* val decoder = DelimitingByteStreamDecoder()
* val source: RawSource = // ...
* source.asFlow(decoder).collect { line ->
* println("Received: ${line.decodeToString()}")
* }
* ```
*
* ## Thread Safety
*
* This class is **not thread-safe**. Each instance maintains internal mutable state and must
* not be shared across multiple flows or concurrent coroutines.
*
* ## Lifecycle
*
* After [onClose] is called, this decoder **cannot be reused**. The internal buffer is closed
* and the decoder should be discarded.
*
* @property delimiter The byte value used as a delimiter to separate the stream into chunks.
* Defaults to the newline character (`'\n'`).
*/
public class DelimitingByteStreamDecoder(
public val delimiter: Byte = '\n'.code.toByte(),
Copy link
Contributor

@JakeWharton JakeWharton Dec 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Supporting only a single byte makes this pretty inflexible. It should probably take a ByteString, at the very least.

) : StreamingDecoder<ByteArray> {

private val buffer = Buffer()

override suspend fun decode(bytes: ByteArray, byteConsumer: suspend (ByteArray) -> Unit) {
var startIndex = 0
for (i in bytes.indices) {
if (bytes[i] == delimiter) {
buffer.write(bytes, startIndex, i)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The design of this API causes a lot of data copies which undermines the overall design of the library which is built around avoiding copies.

Here, the flow function would create a ByteArray and copies data out of the segment to invoke this function. This function then copies the data back into a new segment, and then proceeds to create a new ByteArray and copies all the data back out. This is a very inefficient usage of the backing mechanism.

// flush and clear buffer
byteConsumer.invoke(buffer.readByteArray())
startIndex = i + 1
}
}
// Buffer any remaining bytes after the last delimiter
if (startIndex < bytes.size) {
buffer.write(bytes, startIndex, bytes.size)
}
}

override suspend fun onClose(byteConsumer: suspend (ByteArray) -> Unit) {
if (buffer.size > 0) {
byteConsumer.invoke(buffer.readByteArray())
}
buffer.close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a no-op

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
*/
package kotlinx.io.coroutines

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield
import kotlinx.io.Buffer
import kotlinx.io.IOException
import kotlinx.io.RawSource
import kotlinx.io.readByteArray

public const val READ_BUFFER_SIZE: Long = 8196

/**
* Converts this [RawSource] into a Kotlin [Flow], emitting decoded data using the provided [StreamingDecoder].
*
* This function reads data from the source in chunks, decodes it using the provided decoder, and emits
* the decoded elements downstream. The returned flow is cold and will start reading from the source
* when collected.
*
* ## Lifecycle and Resource Management
*
* - The source is automatically closed when the flow completes, fails, or is cancelled
* - The decoder's [StreamingDecoder.onClose] is always called for cleanup, even on cancellation
* - On normal completion or [IOException], any remaining buffered data in the decoder is emitted
* - On cancellation, the decoder is cleaned up but remaining data is discarded
*
* ## Backpressure
*
* The flow respects structured concurrency and backpressure. Reading from the source is suspended
* when the downstream collector cannot keep up.
*
* @param T The type of elements emitted by the Flow after decoding.
* @param decoder The [StreamingDecoder] used to decode data read from this source.
* @param readBufferSize The size of the buffer used for reading from the source. Defaults to [READ_BUFFER_SIZE].
* @return A cold [Flow] that emits decoded elements of type [T].
* @throws IOException if an I/O error occurs while reading from the source.
*/
public fun <T> RawSource.asFlow(
decoder: StreamingDecoder<T>,
readBufferSize: Long = READ_BUFFER_SIZE
): Flow<T> = flow {
val source = this@asFlow
val buffer = Buffer()
var decoderClosed = false
try {
source.use { source ->
while (true) {
val bytesRead = source.readAtMostTo(buffer, readBufferSize)
if (bytesRead == -1L) {
break
}

if (bytesRead > 0L) {
val bytes = buffer.readByteArray()
decoder.decode(bytes) {
emit(it)
}
}

yield() // Giving other coroutines a chance to run
}
}
// Normal completion: emit any remaining buffered data
decoder.onClose { emit(it) }
decoderClosed = true
} catch (e: CancellationException) {
throw e
} catch (exception: IOException) {
// IO error: try to emit remaining data, then close with error
runCatching { decoder.onClose { emit(it) } }.onSuccess { decoderClosed = true }
throw exception
} finally {
// Ensure decoder cleanup even on cancellation or other exceptions
if (!decoderClosed) {
withContext(NonCancellable) {
runCatching { decoder.onClose { /* discard data, cleanup only */ } }
}
}
buffer.clear()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2017-2025 JetBrains s.r.o. and respective authors and developers.
* Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file.
*/
package kotlinx.io.coroutines

/**
* A generic interface for decoding a stream of bytes into discrete elements of type [T].
*
* Implementations of this interface are responsible for processing input byte arrays, decoding
* them into meaningful elements, and delivering them to the provided `byteConsumer` function in
* sequential order. This allows for efficient handling of streaming data and enables
* processing without requiring the entire stream to be loaded into memory.
*
* ## Lifecycle
*
* The decoder processes a stream through repeated calls to [decode], followed by a final call
* to [onClose] when the stream ends. After [onClose] is called, the decoder should not be reused.
*
* ## Thread Safety
*
* Implementations are not required to be thread-safe. Each decoder instance should be used with
* a single stream and should not be shared across concurrent coroutines.
*
* @param T The type of elements produced by the decoder.
*/
public interface StreamingDecoder<T> {
/**
* Decodes a chunk of bytes from the input stream.
*
* This method may be called multiple times as data arrives. Implementations should buffer
* incomplete elements internally and emit complete elements via [byteConsumer].
*
* @param bytes The input byte array to decode.
* @param byteConsumer A suspend function that receives decoded elements.
*/
public suspend fun decode(bytes: ByteArray, byteConsumer: suspend (T) -> Unit)

/**
* Called when the input stream ends, allowing the decoder to emit any remaining buffered data
* and perform cleanup.
*
* After this method is called, the decoder should not be used again.
*
* @param byteConsumer A suspend function that receives any final decoded elements.
*/
public suspend fun onClose(byteConsumer: suspend (T) -> Unit)
}

Loading