
Conversation


kpavlov commented Dec 9, 2025

Add `RawSource.asFlow` transformer

  • Introduced a converter from `RawSource` to Kotlin `Flow`s.
  • Added a `StreamingDecoder` interface and its implementation `DelimitingByteStreamDecoder` for processing byte streams with a specified delimiter, inspired by Netty. Decoders can be quite generic: they can handle decoding (bytes to String, gzip), splitting, buffering, and so on.
  • Added tests for decoder and flow behavior cases.

Example:

val source = Buffer().apply {
    writeString("name:Alice\n")
    writeString("age:30\n")
    writeString("city:NewYork\n")
}

val decoder = DelimitingByteStreamDecoder()

// READ_BUFFER_SIZE is the PR's default read-buffer constant;
// parseKeyValue is a helper that splits "key:value" into a Pair.
val entries = source.asFlow(decoder, READ_BUFFER_SIZE)
    .map { bytes -> bytes.decodeToString() }
    .map { line -> parseKeyValue(line) }
    .toList()

assertEquals(3, entries.size)
assertEquals("name" to "Alice", entries[0])
assertEquals("age" to "30", entries[1])
assertEquals("city" to "NewYork", entries[2])

Related to: #486, #447, #454, #421

kpavlov requested review from fzhinkin and whyoleg on December 9, 2025 10:21

kpavlov commented Dec 9, 2025

I need some help with `updateLegacyAbi`. It doesn't work for the kotlinx-coroutines-core dependency. I need some advice here: should it be kotlinx-io-coroutines?

- Introduced converter from `RawSource` to Kotlin Flows.
- Added `StreamingDecoder` interface and its implementation `DelimitingByteStreamDecoder` for processing byte streams with a specified delimiter.
- Added tests for decoder and flow behavior cases.
kpavlov force-pushed the kpavlov/RawSource-asFlow branch from f21bd51 to 246f77b on December 9, 2025 10:53
kpavlov changed the base branch from master to develop on December 9, 2025 11:18

whyoleg commented Dec 9, 2025

(Note: I'm not a maintainer of kotlinx-io, so my thoughts here are mostly based on my experience working with kotlinx-io in cryptography-kotlin and rsocket-kotlin)


Do you mind sharing more use cases apart from delimiter-based decoding, and how they would be implemented with this new API? To be honest, I don't think that compression and buffering should be handled via this API.

It feels like the existing readLine and indexOf operations on Source are enough to build any reasonable abstraction (including one using Flow).
E.g., without any new APIs, we can do the same:

inline fun Source.forEachLine(block: (String) -> Unit): Unit = use {
    while (true) block(it.readLine() ?: break)
}
val source = Buffer().apply {
    writeString("name:Alice\n")
    writeString("age:30\n")
    writeString("city:NewYork\n")
}

source.forEachLine {
    println("received: $it")
}
// or if flow is needed - NOTE: the flow can be consumed only once!!!
flow {
    source.forEachLine {
        emit(it)
    }
}.map { line -> anyTransformation(line) }.collect { transformed ->
    println("received: $transformed")
}

Or, if you need delimiter-based splitting, it's possible to use indexOf and write a function similar to readLine:

// those could be rather easily implemented via `indexOf` inside or outside of kotlinx-io
public fun Source.readUntilDelimiter(delimiter: Char): String? = TODO()
public fun Source.readUntilDelimiter(delimiter: Byte): ByteString? = TODO()
// and `forEach` variant
public fun Source.forEachDelimited(delimiter: Char, block: (String) -> Unit): Unit = use {
    while (true) block(it.readUntilDelimiter(delimiter) ?: break)
}
// and use it
Buffer().apply {
    writeString("first|second|third|")
}.forEachDelimited('|') {
    println("received: $it")
}
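
For reference, the Byte overload falls out of indexOf in a few lines; a minimal sketch assuming kotlinx-io's Source.indexOf, readByteString, and skip:

// Minimal sketch: returns the bytes up to (and consuming) the delimiter,
// the remaining tail if no delimiter is left, or null once exhausted.
public fun Source.readUntilDelimiter(delimiter: Byte): ByteString? {
    if (exhausted()) return null
    val index = indexOf(delimiter)
    if (index == -1L) return readByteString() // tail without a trailing delimiter
    val result = readByteString(index.toInt())
    skip(1) // drop the delimiter itself
    return result
}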

It's even possible to convert this into a Flow using a nice extension function.
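
A possible shape for that extension (hypothetical name; note the resulting flow is still single-collection, since collecting it drains the source):

// Hypothetical helper: collect at most once, as each collection drains the source.
public fun Source.splitAsFlow(delimiter: Byte): Flow<ByteString> = flow {
    use { source ->
        while (true) emit(source.readUntilDelimiter(delimiter) ?: break)
    }
}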


In all of these examples (and in the PR), the most problematic part is that a Source is single-use while a Flow is multi-use, so additional care is needed to avoid collecting the flow more than once. That constraint is not easy to express in the API itself, not to mention the other concerns from #421.
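
One way to make that single-use constraint explicit is a guard that fails fast on a second collection; a JVM-only sketch with a hypothetical name:

// Hypothetical guard: a second collector fails immediately instead of
// silently reading from an already-drained source (JVM-only for brevity).
fun <T> Flow<T>.collectableOnce(): Flow<T> {
    val collected = java.util.concurrent.atomic.AtomicBoolean(false)
    return flow {
        check(collected.compareAndSet(false, true)) { "This flow can be collected only once" }
        emitAll(this@collectableOnce)
    }
}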


bjhham commented Dec 9, 2025

I think that for introducing coroutines to kotlinx-io we'd want a sub-library, or something more integrated with async I/O calls on the OS, and we'd want to introduce something similar to the coroutine I/O types in Ktor. I do like the idea of supporting search and transformations for RawSource and building out a functional API, though.

JakeWharton left a comment

I'm not sure what the motivating case is here, but this design feels both too inflexible and too inefficient in its current iteration. I also do not think the core artifact should expose a coroutines dependency.

decoder: StreamingDecoder<T>,
readBufferSize: Long = READ_BUFFER_SIZE
): Flow<T> =
channelFlow {

This seems unnecessary as opposed to a normal flow { }.
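
For comparison, a sequential read loop fits the cold flow builder directly; a sketch with assumed names, since nothing here emits from multiple coroutines:

// Sketch: a single coroutine reads and emits in order, so flow {} suffices;
// channelFlow is only needed for concurrent or callback-based producers.
fun RawSource.chunkedFlow(readBufferSize: Long): Flow<ByteArray> = flow {
    val buffer = Buffer()
    while (readAtMostTo(buffer, readBufferSize) != -1L) {
        emit(buffer.readByteArray())
    }
}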

decoderClosed = true
} catch (exception: IOException) {
// IO error: try to emit remaining data, then close with error
runCatching { decoder.onClose { send(it) } }.onSuccess { decoderClosed = true }

I'm not sure closing resources should be conflated with completion, since nothing differentiates the normal case vs. the error case.

* Defaults to the newline character (`'\n'`).
*/
public class DelimitingByteStreamDecoder(
public val delimiter: Byte = '\n'.code.toByte(),

Supporting only a single byte makes this pretty inflexible. It should probably take a ByteString, at the very least.
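
That is, something along these lines (sketch of the suggested signature):

// Sketch: a multi-byte delimiter, still defaulting to a newline.
public class DelimitingByteStreamDecoder(
    public val delimiter: ByteString = "\n".encodeToByteString(),
)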

if (buffer.size > 0) {
byteConsumer.invoke(buffer.readByteArray())
}
buffer.close()

This is a no-op; closing a Buffer does nothing.

var startIndex = 0
for (i in bytes.indices) {
if (bytes[i] == delimiter) {
buffer.write(bytes, startIndex, i)

The design of this API causes a lot of data copies, undermining the overall design of the library, which is built around avoiding copies.

Here, the flow function creates a ByteArray and copies data out of the segment to invoke this function. This function then copies the data back into a new segment, and then creates a new ByteArray and copies all the data back out. This is a very inefficient use of the backing mechanism.

kpavlov marked this pull request as draft on December 9, 2025 15:22

kpavlov commented Dec 9, 2025

First of all, thank you all for feedback! 🤗

Do you mind sharing more use cases apart from delimiter-based decoding

As noted in the description, decoders can be quite general and may handle tasks such as converting bytes to strings, applying gzip, buffering, and splitting. This is similar in spirit to Netty’s ByteToMessageDecoder. My goal is to separate reading from the source from the transformation and consumption of the byte stream, each in its own coroutine context, so building pipelines becomes simpler.
While the current API can support this, users shouldn’t have to repeat the same boilerplate each time, and I’d like to explore how we can improve that.
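
For instance, the reading stage could be pinned to an I/O dispatcher while the transformation runs in the collector's context; a sketch on top of this PR's asFlow (Dispatchers.IO assumed available on the target):

// Sketch: flowOn moves the upstream read loop onto Dispatchers.IO;
// map and collect run wherever the flow is collected.
source.asFlow(DelimitingByteStreamDecoder(), READ_BUFFER_SIZE)
    .flowOn(Dispatchers.IO)
    .map { bytes -> bytes.decodeToString() }
    .collect { line -> println(line) }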

You’re right that resource usage isn’t fully optimized yet — the Buffer could be used more efficiently.

And I agree that this API belongs in a separate artifact so the core doesn’t depend on coroutines.

kpavlov force-pushed the kpavlov/RawSource-asFlow branch from eabbb19 to f09e06c on December 11, 2025 14:42