diff --git a/.github/workflows/changelog.yml b/.github/workflows/changelog.yml
index a223a996a..c56e97652 100644
--- a/.github/workflows/changelog.yml
+++ b/.github/workflows/changelog.yml
@@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Sources
- uses: actions/checkout@v4
+ uses: actions/checkout@v5
with:
fetch-depth: 0
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index 8f42936af..44cb3d251 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -30,7 +30,7 @@ env:
ALGOLIA_INDEX_NAME: 'prod_kotlin_rpc'
ALGOLIA_KEY: '${{ secrets.ALGOLIA_KEY }}'
CONFIG_JSON_PRODUCT: 'kotlinx-rpc'
- CONFIG_JSON_VERSION: '0.9.1'
+ CONFIG_JSON_VERSION: '0.10.0'
DOKKA_ARTIFACT: 'dokka.zip'
ASSEMBLE_DIR: '__docs_assembled'
ASSEMBLE_ARTIFACT: 'assembled.zip'
@@ -40,7 +40,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
- uses: actions/checkout@v4
+ uses: actions/checkout@v5
with:
fetch-depth: 0
@@ -67,7 +67,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Download artifacts
- uses: actions/download-artifact@v4
+ uses: actions/download-artifact@v5
with:
name: kotlinx-rpc
path: artifacts
@@ -83,12 +83,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
- uses: actions/checkout@v4
+ uses: actions/checkout@v5
with:
fetch-depth: 0
- name: Download Writerside artifacts
- uses: actions/download-artifact@v4
+ uses: actions/download-artifact@v5
with:
name: kotlinx-rpc
@@ -140,7 +140,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Download artifacts
- uses: actions/download-artifact@v4
+ uses: actions/download-artifact@v5
with:
name: kotlinx-rpc-assembled
@@ -151,7 +151,7 @@ jobs:
uses: actions/configure-pages@v5
- name: Package and upload Pages artifact
- uses: actions/upload-pages-artifact@v3
+ uses: actions/upload-pages-artifact@v4
with:
path: publish
@@ -167,7 +167,7 @@ jobs:
image: registry.jetbrains.team/p/writerside/builder/algolia-publisher:2.0.32-3
steps:
- name: Download artifact
- uses: actions/download-artifact@v4
+ uses: actions/download-artifact@v5
with:
name: kotlinx-rpc
- name: Unzip artifact
diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml
index 0d2a414e5..d3bb26090 100644
--- a/.github/workflows/platforms.yml
+++ b/.github/workflows/platforms.yml
@@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Sources
- uses: actions/checkout@v4
+ uses: actions/checkout@v5
with:
fetch-depth: 0
- name: Setup Gradle
diff --git a/CHANGELOG.md b/CHANGELOG.md
index dd87b584e..e958bee1d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,54 @@
+# 0.10.0
+> Published 1 October 2025
+
+## Overview
+This release brings a lot of changes, work:
+- Kotlin 2.2.20 and 2.2.10 support
+- kRPC: Backpressure
+
+To read about the backpressure feature,
+see the updated [kRPC Configuration](https://kotlin.github.io/kotlinx-rpc/configuration.html#connector-dsl) page.
+
+### Breaking Changes ๐ด
+* Allow suspend calls inside ktor rpc builder #433 by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/439
+
+### Features ๐
+* Kotlin 2.2.20 by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/478
+* Kotlin 2.2.10 by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/456
+* kRPC: Backpressure by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/462
+* Add support for Wasm/Wasi to krpc #465 by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/480
+
+### Bug fixes ๐
+* Add collect once check for client streams by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/431
+* Fix diagnostic rendering for compiler plugins checkers by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/432
+* fix wrong unchecked null cast (potential NPE) by @y9maly in https://github.com/Kotlin/kotlinx-rpc/pull/445
+
+### Documentation ๐
+* Docs for gRPC with Ktor by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/394
+* Add a doc for KMP source sets with gRPC by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/405
+* Update strict-mode.topic by @BierDav in https://github.com/Kotlin/kotlinx-rpc/pull/440
+* Update grpc-configuration.topic by @flockbastian in https://github.com/Kotlin/kotlinx-rpc/pull/450
+* Added docs for release by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/482
+
+### Infra ๐ง
+* Fix docs yaml and signing tasks by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/404
+* Fix jdk resolution problems on CI by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/406
+* Use compat-patrouille for compatibility settings by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/438
+
+### Other Changes ๐งน
+* Fix how we create 'publishMavenArtifact' tasks by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/416
+* Update grpc-sample app by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/425
+* Fix LV and signing by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/424
+* Update ktor-all-platforms-app sample to sync service creation by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/455
+* Added Ktor closure tests and Cancellation tests, + minor fixes by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/479
+* Fix flaky tests by @Mr3zee in https://github.com/Kotlin/kotlinx-rpc/pull/481
+
+## New Contributors
+* @flockbastian made their first contribution in https://github.com/Kotlin/kotlinx-rpc/pull/450
+* @y9maly made their first contribution in https://github.com/Kotlin/kotlinx-rpc/pull/445
+
+**Full Changelog**: https://github.com/Kotlin/kotlinx-rpc/compare/0.9.1...0.10.0
+
# 0.9.1
> Published 17 July 2025
diff --git a/README.md b/README.md
index 23ac5ee66..b0e7369f4 100644
--- a/README.md
+++ b/README.md
@@ -136,7 +136,7 @@ Example of a setup in a project's `build.gradle.kts`:
plugins {
kotlin("multiplatform") version "2.2.20"
kotlin("plugin.serialization") version "2.2.20"
- id("org.jetbrains.kotlinx.rpc.plugin") version "0.9.1"
+ id("org.jetbrains.kotlinx.rpc.plugin") version "0.10.0"
}
```
@@ -151,15 +151,15 @@ And now you can add dependencies to your project:
```kotlin
dependencies {
// Client API
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-client:0.9.1")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-client:0.10.0")
// Server API
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-server:0.9.1")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-server:0.10.0")
// Serialization module. Also, protobuf and cbor are provided
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-serialization-json:0.9.1")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-serialization-json:0.10.0")
// Transport implementation for Ktor
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-ktor-client:0.9.1")
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-ktor-server:0.9.1")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-ktor-client:0.10.0")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-ktor-server:0.10.0")
// Ktor API
implementation("io.ktor:ktor-client-cio-jvm:$ktor_version")
diff --git a/docs/pages/kotlinx-rpc/help-versions.json b/docs/pages/kotlinx-rpc/help-versions.json
index 3416f8e7b..396332ba2 100644
--- a/docs/pages/kotlinx-rpc/help-versions.json
+++ b/docs/pages/kotlinx-rpc/help-versions.json
@@ -1,3 +1,3 @@
[
- {"version":"0.9.1","url":"/kotlinx-rpc/0.9.1/","isCurrent":true}
+ {"version":"0.10.0","url":"/kotlinx-rpc/0.10.0/","isCurrent":true}
]
diff --git a/docs/pages/kotlinx-rpc/topics/changelog.md b/docs/pages/kotlinx-rpc/topics/changelog.md
index 8486c19d4..845992918 100644
--- a/docs/pages/kotlinx-rpc/topics/changelog.md
+++ b/docs/pages/kotlinx-rpc/topics/changelog.md
@@ -2,6 +2,58 @@
This page contains all changes throughout releases of the library.
+## 0.10.0
+> Published 1 October 2025
+
+**Full Changelog**: [0.9.1...0.10.0](https://github.com/Kotlin/kotlinx-rpc/compare/0.9.1...0.10.0)
+
+### Overview {id=Overview_0_10_0}
+This release brings a lot of changes, work:
+- Kotlin 2.2.20 and 2.2.10 support
+- kRPC: Backpressure
+
+To read about the backpressure feature,
+see the updated [kRPC Configuration](https://kotlin.github.io/kotlinx-rpc/configuration.html#connector-dsl) page.
+
+#### Breaking Changes ๐ด {id=Breaking_Changes_0_10_0}
+* Allow suspend calls inside ktor rpc builder #433 by [@Mr3zee](https://github.com/Mr3zee) in [#439](https://github.com/Kotlin/kotlinx-rpc/pull/439)
+
+#### Features ๐ {id=Features_0_10_0}
+* Kotlin 2.2.20 by [@Mr3zee](https://github.com/Mr3zee) in [#478](https://github.com/Kotlin/kotlinx-rpc/pull/478)
+* Kotlin 2.2.10 by [@Mr3zee](https://github.com/Mr3zee) in [#456](https://github.com/Kotlin/kotlinx-rpc/pull/456)
+* kRPC: Backpressure by [@Mr3zee](https://github.com/Mr3zee) in [#462](https://github.com/Kotlin/kotlinx-rpc/pull/462)
+* Add support for Wasm/Wasi to krpc #465 by [@Mr3zee](https://github.com/Mr3zee) in [#480](https://github.com/Kotlin/kotlinx-rpc/pull/480)
+
+#### Bug fixes ๐ {id=Bug_fixes_0_10_0}
+* Add collect once check for client streams by [@Mr3zee](https://github.com/Mr3zee) in [#431](https://github.com/Kotlin/kotlinx-rpc/pull/431)
+* Fix diagnostic rendering for compiler plugins checkers by [@Mr3zee](https://github.com/Mr3zee) in [#432](https://github.com/Kotlin/kotlinx-rpc/pull/432)
+* fix wrong unchecked null cast (potential NPE) by [@y9maly](https://github.com/y9maly) in [#445](https://github.com/Kotlin/kotlinx-rpc/pull/445)
+
+#### Documentation ๐ {id=Documentation_0_10_0}
+* Docs for gRPC with Ktor by [@Mr3zee](https://github.com/Mr3zee) in [#394](https://github.com/Kotlin/kotlinx-rpc/pull/394)
+* Add a doc for KMP source sets with gRPC by [@Mr3zee](https://github.com/Mr3zee) in [#405](https://github.com/Kotlin/kotlinx-rpc/pull/405)
+* Update strict-mode.topic by [@BierDav](https://github.com/BierDav) in [#440](https://github.com/Kotlin/kotlinx-rpc/pull/440)
+* Update grpc-configuration.topic by [@flockbastian](https://github.com/flockbastian) in [#450](https://github.com/Kotlin/kotlinx-rpc/pull/450)
+* Added docs for release by [@Mr3zee](https://github.com/Mr3zee) in [#482](https://github.com/Kotlin/kotlinx-rpc/pull/482)
+
+#### Infra ๐ง {id=Infra_0_10_0}
+* Fix docs yaml and signing tasks by [@Mr3zee](https://github.com/Mr3zee) in [#404](https://github.com/Kotlin/kotlinx-rpc/pull/404)
+* Fix jdk resolution problems on CI by [@Mr3zee](https://github.com/Mr3zee) in [#406](https://github.com/Kotlin/kotlinx-rpc/pull/406)
+* Use compat-patrouille for compatibility settings by [@Mr3zee](https://github.com/Mr3zee) in [#438](https://github.com/Kotlin/kotlinx-rpc/pull/438)
+
+#### Other Changes ๐งน {id=Other_Changes_0_10_0}
+* Fix how we create 'publishMavenArtifact' tasks by [@Mr3zee](https://github.com/Mr3zee) in [#416](https://github.com/Kotlin/kotlinx-rpc/pull/416)
+* Update grpc-sample app by [@Mr3zee](https://github.com/Mr3zee) in [#425](https://github.com/Kotlin/kotlinx-rpc/pull/425)
+* Fix LV and signing by [@Mr3zee](https://github.com/Mr3zee) in [#424](https://github.com/Kotlin/kotlinx-rpc/pull/424)
+* Update ktor-all-platforms-app sample to sync service creation by [@Mr3zee](https://github.com/Mr3zee) in [#455](https://github.com/Kotlin/kotlinx-rpc/pull/455)
+* Added Ktor closure tests and Cancellation tests, + minor fixes by [@Mr3zee](https://github.com/Mr3zee) in [#479](https://github.com/Kotlin/kotlinx-rpc/pull/479)
+* Fix flaky tests by [@Mr3zee](https://github.com/Mr3zee) in [#481](https://github.com/Kotlin/kotlinx-rpc/pull/481)
+
+### New Contributors {id=New_Contributors_0_10_0}
+* [@flockbastian](https://github.com/flockbastian) made their first contribution in [#450](https://github.com/Kotlin/kotlinx-rpc/pull/450)
+* [@y9maly](https://github.com/y9maly) made their first contribution in [#445](https://github.com/Kotlin/kotlinx-rpc/pull/445)
+
+
## 0.9.1
> Published 17 July 2025
diff --git a/docs/pages/kotlinx-rpc/topics/configuration.topic b/docs/pages/kotlinx-rpc/topics/configuration.topic
index 64a721052..c12745a25 100644
--- a/docs/pages/kotlinx-rpc/topics/configuration.topic
+++ b/docs/pages/kotlinx-rpc/topics/configuration.topic
@@ -101,7 +101,7 @@
Note that this is per call, not per connection.
- The default value is 1
.
+ The default value is 1000
.
diff --git a/docs/pages/kotlinx-rpc/v.list b/docs/pages/kotlinx-rpc/v.list
index c50758c72..551f6a781 100644
--- a/docs/pages/kotlinx-rpc/v.list
+++ b/docs/pages/kotlinx-rpc/v.list
@@ -14,6 +14,6 @@
-
+
diff --git a/docs/pages/kotlinx-rpc/writerside.cfg b/docs/pages/kotlinx-rpc/writerside.cfg
index bc8a87839..ce3acc170 100644
--- a/docs/pages/kotlinx-rpc/writerside.cfg
+++ b/docs/pages/kotlinx-rpc/writerside.cfg
@@ -12,5 +12,5 @@
-
+
diff --git a/gradle-conventions/src/main/kotlin/util/targets/configure.kt b/gradle-conventions/src/main/kotlin/util/targets/configure.kt
index 77346f401..72c1f4d10 100644
--- a/gradle-conventions/src/main/kotlin/util/targets/configure.kt
+++ b/gradle-conventions/src/main/kotlin/util/targets/configure.kt
@@ -7,7 +7,6 @@ package util.targets
import io.gitlab.arturbosch.detekt.extensions.DetektExtension
import org.gradle.api.Action
import org.gradle.api.Project
-import org.gradle.jvm.toolchain.JavaLanguageVersion
import org.gradle.kotlin.dsl.the
import org.jetbrains.kotlin.gradle.dsl.KotlinMultiplatformExtension
import org.jetbrains.kotlin.gradle.plugin.KotlinTarget
@@ -31,7 +30,9 @@ private fun KotlinMultiplatformExtension.configureTargets(config: KmpConfig): Li
if (config.js) {
js(IR) {
nodejs()
- browser()
+ if (!config.kotlinMasterBuild) {
+ browser()
+ }
binaries.library()
}.also { targets.add(it) }
diff --git a/gradle-conventions/src/main/kotlin/util/targets/wasm.kt b/gradle-conventions/src/main/kotlin/util/targets/wasm.kt
index 30e52376a..5a3a6ab80 100644
--- a/gradle-conventions/src/main/kotlin/util/targets/wasm.kt
+++ b/gradle-conventions/src/main/kotlin/util/targets/wasm.kt
@@ -24,7 +24,10 @@ fun KmpConfig.configureWasm() {
wasmJs {
configureJsAndWasmJsTasks()
- browser()
+ if (!kotlinMasterBuild) {
+ browser()
+ }
+
nodejs()
if (wasmJsD8) {
// this platform needs some care KRPC-210
diff --git a/krpc/krpc-client/api/krpc-client.klib.api b/krpc/krpc-client/api/krpc-client.klib.api
index 53c218490..cd79fd924 100644
--- a/krpc/krpc-client/api/krpc-client.klib.api
+++ b/krpc/krpc-client/api/krpc-client.klib.api
@@ -1,5 +1,5 @@
// Klib ABI Dump
-// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm64, watchosSimulatorArm64, watchosX64]
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
diff --git a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt
index c3a8f27a3..7c90ae57a 100644
--- a/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt
+++ b/krpc/krpc-client/src/commonMain/kotlin/kotlinx/rpc/krpc/client/KrpcClient.kt
@@ -9,16 +9,11 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.DelicateCoroutinesApi
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
@@ -81,7 +76,6 @@ public abstract class InitializedKrpcClient(
* serializing data, tracking streams, processing exceptions, and other protocol responsibilities.
* Leaves out the delivery of encoded messages to the specific implementations with [KrpcTransport].
*/
-@OptIn(InternalCoroutinesApi::class)
public abstract class KrpcClient : RpcClient, KrpcEndpoint {
/**
* Called once to provide [KrpcTransport] for this client.
@@ -138,6 +132,9 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
@Volatile
private var clientCancelled = false
+ @Volatile
+ private var clientCancelledByServer = false
+
private fun checkTransportReadiness() {
if (!isTransportReady) {
error(
@@ -153,15 +150,22 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
val context = SupervisorJob(transport.coroutineContext.job)
- context.job.invokeOnCompletion(onCancelling = true) {
- clientCancelled = true
+ context.job.invokeOnCompletion {
+ try {
+ if (!clientCancelled && !clientCancelledByServer) {
+ sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
+ }
- sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
+ clientCancelled = true
+ } catch (_ : Exception) {
+ // ignore, we are already cancelled
+ } finally {
+ requestChannels.values.forEach {
+ val cause = CancellationException("Client cancelled")
+ it.close(cause)
+ it.cancel(cause)
+ }
- @OptIn(DelicateCoroutinesApi::class)
- @Suppress("detekt.GlobalCoroutineUsage")
- GlobalScope.launch(CoroutineName("client-request-channels-closing")) {
- requestChannels.values.forEach { it.close(CancellationException("Client cancelled")) }
requestChannels.clear()
}
}
@@ -256,7 +260,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
final override fun callServerStreaming(call: RpcCall): Flow {
return flow {
if (clientCancelled) {
- error("Client cancelled")
+ error("RpcClient was cancelled")
}
initializeAndAwaitHandshakeCompletion()
@@ -272,6 +276,10 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
try {
@Suppress("UNCHECKED_CAST")
requestChannels[callId] = channel as Channel>
+ if (clientCancelled) {
+ requestChannels.remove(callId)
+ error("RpcClient was cancelled")
+ }
val request = serializeRequest(
callId = callId,
@@ -322,6 +330,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
throw e
} finally {
channel.close()
+ channel.cancel()
requestChannels.remove(callId)
connector.unsubscribeFromMessages(call.descriptor.fqName, callId)
}
@@ -363,6 +372,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
is KrpcCallMessage.CallException -> {
val cause = message.cause.deserialize()
channel.close(cause)
+ channel.cancel(CancellationException("Call failed", cause))
}
is KrpcCallMessage.CallSuccess, is KrpcCallMessage.StreamMessage -> {
@@ -384,6 +394,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
is KrpcCallMessage.StreamCancel -> {
val cause = message.cause.deserialize()
channel.close(cause)
+ channel.cancel(CancellationException("Stream cancelled", cause))
}
}
}
@@ -392,6 +403,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
final override suspend fun handleCancellation(message: KrpcGenericMessage) {
when (val type = message.cancellationType()) {
CancellationType.ENDPOINT -> {
+ clientCancelledByServer = true
internalScope.cancel("Closing client after server cancellation") // we cancel this client
}
@@ -459,6 +471,7 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
serialFormat: SerialFormat,
serviceTypeString: String,
) {
+ var failure: Throwable? = null
try {
collectAndSendOutgoingStream(
serialFormat = serialFormat,
@@ -467,39 +480,31 @@ public abstract class KrpcClient : RpcClient, KrpcEndpoint {
serviceTypeString = serviceTypeString,
)
} catch (e: CancellationException) {
- currentCoroutineContext().ensureActive()
-
- val wrapped = ManualCancellationException(e)
- val serializedReason = serializeException(wrapped)
- val message = KrpcCallMessage.StreamCancel(
- callId = outgoingStream.callId,
- serviceType = serviceTypeString,
- streamId = outgoingStream.streamId,
- cause = serializedReason,
- connectionId = outgoingStream.connectionId,
- serviceId = outgoingStream.serviceId,
- )
- connector.sendMessageChecked(message) {
- // ignore, we are already cancelled and have a cause
- }
+ internalScope.ensureActive()
+
+ failure = e
// stop the flow and its coroutine, other flows are not affected
throw e
} catch (cause: Throwable) {
- val serializedReason = serializeException(cause)
- val message = KrpcCallMessage.StreamCancel(
- callId = outgoingStream.callId,
- serviceType = serviceTypeString,
- streamId = outgoingStream.streamId,
- cause = serializedReason,
- connectionId = outgoingStream.connectionId,
- serviceId = outgoingStream.serviceId,
- )
- connector.sendMessageChecked(message) {
- // ignore, we are already cancelled and have a cause
- }
+ failure = cause
throw cause
+ } finally {
+ if (failure != null) {
+ val serializedReason = serializeException(failure)
+ val message = KrpcCallMessage.StreamCancel(
+ callId = outgoingStream.callId,
+ serviceType = serviceTypeString,
+ streamId = outgoingStream.streamId,
+ cause = serializedReason,
+ connectionId = outgoingStream.connectionId,
+ serviceId = outgoingStream.serviceId,
+ )
+ connector.sendMessageChecked(message) {
+ // ignore, we are already cancelled and have a cause
+ }
+ }
}
val message = KrpcCallMessage.StreamFinished(
diff --git a/krpc/krpc-core/api/krpc-core.klib.api b/krpc/krpc-core/api/krpc-core.klib.api
index ac0c04c74..016eba789 100644
--- a/krpc/krpc-core/api/krpc-core.klib.api
+++ b/krpc/krpc-core/api/krpc-core.klib.api
@@ -1,5 +1,5 @@
// Klib ABI Dump
-// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm64, watchosSimulatorArm64, watchosX64]
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
diff --git a/krpc/krpc-core/build.gradle.kts b/krpc/krpc-core/build.gradle.kts
index 36889112f..70dae0153 100644
--- a/krpc/krpc-core/build.gradle.kts
+++ b/krpc/krpc-core/build.gradle.kts
@@ -51,4 +51,10 @@ kotlin {
tasks.withType {
// lincheck agent
jvmArgs("-XX:+EnableDynamicAgentLoading")
+
+ if (project.hasProperty("stressTests") && project.property("stressTests") == "true") {
+ include("kotlinx/rpc/krpc/stress/**")
+ } else {
+ exclude("kotlinx/rpc/krpc/stress/**")
+ }
}
diff --git a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/KrpcConfig.kt b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/KrpcConfig.kt
index 3ba88ceae..9ed87ade2 100644
--- a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/KrpcConfig.kt
+++ b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/KrpcConfig.kt
@@ -79,12 +79,12 @@ public sealed class KrpcConfigBuilder protected constructor() {
/**
* A buffer size for a single call.
*
- * The default value is 1,
+ * The default value is 1000,
* meaning that only after one message is handled - the next one will be sent.
*
* This buffer also applies to how many messages are cached with [waitTimeout]
*/
- public var perCallBufferSize: Int = 1
+ public var perCallBufferSize: Int = 1000
}
@Deprecated("Use connector { } instead", level = DeprecationLevel.ERROR)
diff --git a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.kt b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.kt
index 10666295c..478daa3a0 100644
--- a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.kt
+++ b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.kt
@@ -13,7 +13,11 @@ public fun serializeException(cause: Throwable): SerializedException {
val message = cause.message ?: "Unknown exception"
val stacktrace = cause.stackElements()
val serializedCause = cause.cause?.let { serializeException(it) }
- val className = cause::class.rpcInternalTypeName ?: ""
+ val className = if (cause is CancellationException || cause is kotlin.coroutines.cancellation.CancellationException) {
+ CancellationException::class.rpcInternalTypeName ?: "kotlinx.coroutines.CancellationException"
+ } else {
+ cause::class.rpcInternalTypeName ?: ""
+ }
return SerializedException(cause.toString(), message, stacktrace, serializedCause, className)
}
@@ -22,16 +26,15 @@ internal expect fun Throwable.stackElements(): List
internal expect fun SerializedException.deserializeUnsafe(): Throwable
-internal fun SerializedException.nonJvmManualCancellationExceptionDeserialize(): ManualCancellationException? {
- if (className == ManualCancellationException::class.rpcInternalTypeName) {
- val cancellation = cause?.deserializeUnsafe()
- ?: error("ManualCancellationException must have a cause")
+internal fun SerializedException.cancellationExceptionDeserialize(): CancellationException? {
+ if (className == CancellationException::class.rpcInternalTypeName
+ || className == kotlin.coroutines.cancellation.CancellationException::class.rpcInternalTypeName
+ ) {
+ val cause = this@cancellationExceptionDeserialize.cause?.deserializeUnsafe()
- return ManualCancellationException(
- CancellationException(
- message = cancellation.message,
- cause = cancellation.cause,
- )
+ return CancellationException(
+ message = message,
+ cause = cause,
)
}
@@ -44,29 +47,19 @@ public fun SerializedException.deserialize(): Throwable {
deserializeUnsafe()
}
- val result = if (cause.isFailure) {
+ return if (cause.isFailure) {
cause.exceptionOrNull()!!
} else {
- val ex = cause.getOrNull()!!
- if (ex is ManualCancellationException) {
- ex.cause
- } else {
- ex
- }
+ cause.getOrNull()!!
}
-
- return result
}
-@InternalRpcApi
-public class ManualCancellationException(override val cause: CancellationException): RuntimeException()
-
internal expect class DeserializedException(
toStringMessage: String,
message: String,
stacktrace: List,
cause: SerializedException?,
- className: String
+ className: String,
) : Throwable {
override val message: String
}
diff --git a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt
index 0ac4d2251..fd43bf693 100644
--- a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt
+++ b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcEndpoint.kt
@@ -5,7 +5,6 @@
package kotlinx.rpc.krpc.internal
import kotlinx.coroutines.CoroutineName
-import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch
import kotlinx.rpc.internal.utils.InternalRpcApi
diff --git a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt
index be276f6a8..a6143c5fa 100644
--- a/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt
+++ b/krpc/krpc-core/src/commonMain/kotlin/kotlinx/rpc/krpc/internal/KrpcReceiveHandler.kt
@@ -8,7 +8,6 @@ import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineStart
-import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.rpc.internal.utils.InternalRpcApi
diff --git a/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.kt b/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.kt
index 0806ba99f..21bb80a8f 100644
--- a/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.kt
+++ b/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.kt
@@ -6,19 +6,12 @@ package kotlinx.rpc.krpc
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
-import kotlinx.coroutines.delay
import kotlinx.coroutines.job
-import kotlinx.coroutines.joinAll
-import kotlinx.coroutines.launch
import kotlinx.coroutines.test.TestResult
import kotlinx.coroutines.test.TestScope
-import kotlinx.coroutines.withContext
-import kotlinx.coroutines.yield
import kotlinx.rpc.krpc.internal.HandlerKey
import kotlinx.rpc.krpc.internal.KrpcActingReceiveHandler
import kotlinx.rpc.krpc.internal.KrpcCallMessage
@@ -29,20 +22,16 @@ import kotlinx.rpc.krpc.internal.KrpcMessageSender
import kotlinx.rpc.krpc.internal.KrpcMessageSubscription
import kotlinx.rpc.krpc.internal.KrpcPluginKey
import kotlinx.rpc.krpc.internal.KrpcReceiveBuffer
-import kotlinx.rpc.krpc.internal.KrpcSendHandler
import kotlinx.rpc.krpc.internal.KrpcStoringReceiveHandler
import kotlinx.rpc.krpc.internal.WindowResult
import kotlinx.rpc.krpc.internal.decodeWindow
import kotlinx.rpc.krpc.internal.deserialize
import kotlinx.rpc.krpc.internal.isFailure
import kotlinx.rpc.krpc.internal.isSuccess
-import kotlinx.rpc.krpc.internal.onClosed
-import kotlinx.rpc.krpc.internal.onFailure
import kotlinx.rpc.test.runTestWithCoroutinesProbes
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
-import kotlin.test.fail
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
@@ -71,15 +60,15 @@ internal class KrpcReceiveHandlerTest : KrpcReceiveHandlerBaseTest() {
acting.broadcastWindowUpdate(-1, null, "service", "callId")
val windowResult2 = decodeWindow(channel.receive() as KrpcGenericMessage)
- assertEquals(-1, (windowResult2 as WindowResult.Success).update)
+ assertEquals(-1, (windowResult2 as WindowResult.Success).update)
acting.broadcastWindowUpdate(Int.MAX_VALUE, null, "service", "callId")
val windowResult3 = decodeWindow(channel.receive() as KrpcGenericMessage)
- assertEquals(Int.MAX_VALUE, (windowResult3 as WindowResult.Success).update)
+ assertEquals(Int.MAX_VALUE, (windowResult3 as WindowResult.Success).update)
acting.broadcastWindowUpdate(Int.MIN_VALUE, null, "service", "callId")
val windowResult4 = decodeWindow(channel.receive() as KrpcGenericMessage)
- assertEquals(Int.MIN_VALUE, (windowResult4 as WindowResult.Success).update)
+ assertEquals(Int.MIN_VALUE, (windowResult4 as WindowResult.Success).update)
}
@Test
@@ -107,97 +96,6 @@ internal class KrpcReceiveHandlerTest : KrpcReceiveHandlerBaseTest() {
val message2 = (channel.receive() as KrpcCallMessage.CallException).cause.deserialize().message.orEmpty()
assertTrue(message2.contains("1 messages were unprocessed"), message2)
}
-
- @OptIn(ExperimentalCoroutinesApi::class)
- @Test
- fun stressActing() {
- val actorJob = Job()
- val collected = mutableListOf()
- val bufferSize = stressBufferSize
-
- runActingTest(
- callTimeOut = 10.seconds,
- bufferSize = bufferSize,
- callHandler = { collected.add(it) },
- timeout = 360.seconds,
- ) { acting ->
- val sendChannel = Channel(Channel.UNLIMITED)
- val sender = KrpcSendHandler(sendChannel)
- sender.updateWindowSize(bufferSize)
-
- val windowJob = launch {
- while (true) {
- val window = when (val message = channel.receive()) {
- is KrpcCallMessage.CallException -> fail(
- "Unexpected call exception",
- message.cause.deserialize()
- )
-
- is KrpcGenericMessage -> decodeWindow(message)
- else -> fail("Unexpected message: $message")
- }
-
- sender.updateWindowSize((window as WindowResult.Success).update)
- }
- }
-
- val senderJob = launch {
- while (true) {
- val message = sendChannel.receive() as KrpcTransportMessage.StringMessage
-
- acting.handle(message.value.asCallMessage("1")) {
- fail(
- "Unexpected onMessageFailure call, " +
- "window: ${sender.window}, collected: ${collected.size}\"",
- it
- )
- }.onFailure {
- fail(
- "Unexpected onFailure call, " +
- "window: ${sender.window}, collected: ${collected.size}"
- )
- }.onClosed {
- fail(
- "Unexpected onClosed call, " +
- "window: ${sender.window}, collected: ${collected.size}\"",
- it
- )
- }
- }
- }
-
- val counter = Counter()
- val printJob = launch {
- while (true) {
- withContext(Dispatchers.Default) {
- delay(5.seconds)
- }
- println("Collected: ${collected.size}, launches: ${counter.launches.value}, total: ${counter.total.value}")
- }
- }
-
- val iterations = stressIterations
- List(iterations) {
- launch {
- repeat(100) {
- sender.sendMessage(KrpcTransportMessage.StringMessage("Hello"))
- counter.total.incrementAndGet()
- }
- counter.launches.incrementAndGet()
- }
- }.joinAll()
-
- while (!buffer.channel.isEmpty && sender.window != bufferSize) {
- yield()
- }
-
- assertEquals(iterations * 100, collected.size)
- actorJob.cancelAndJoin()
- senderJob.cancelAndJoin()
- windowJob.cancelAndJoin()
- printJob.cancelAndJoin()
- }
- }
}
internal abstract class KrpcReceiveHandlerBaseTest {
diff --git a/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcSendHandlerTest.kt b/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcSendHandlerTest.kt
index 759c276e4..c17880590 100644
--- a/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcSendHandlerTest.kt
+++ b/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/KrpcSendHandlerTest.kt
@@ -111,7 +111,7 @@ internal abstract class KrpcSendHandlerBaseTest {
}
protected fun runTest(
- timeout: Duration = 10.seconds,
+ timeout: Duration = 30.seconds,
body: suspend TestScope.(Channel, KrpcSendHandler) -> Unit,
) = runTestWithCoroutinesProbes(timeout = timeout) {
val channel = Channel(
diff --git a/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/stress/KrpcReceiveHandlerStressTest.kt b/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/stress/KrpcReceiveHandlerStressTest.kt
new file mode 100644
index 000000000..ccea1e936
--- /dev/null
+++ b/krpc/krpc-core/src/commonTest/kotlin/kotlinx/rpc/krpc/stress/KrpcReceiveHandlerStressTest.kt
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2023-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.rpc.krpc.stress
+
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.cancelAndJoin
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.joinAll
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.withContext
+import kotlinx.coroutines.yield
+import kotlinx.rpc.krpc.KrpcReceiveHandlerBaseTest
+import kotlinx.rpc.krpc.KrpcTransportMessage
+import kotlinx.rpc.krpc.internal.KrpcCallMessage
+import kotlinx.rpc.krpc.internal.KrpcGenericMessage
+import kotlinx.rpc.krpc.internal.KrpcMessage
+import kotlinx.rpc.krpc.internal.KrpcSendHandler
+import kotlinx.rpc.krpc.internal.WindowResult
+import kotlinx.rpc.krpc.internal.decodeWindow
+import kotlinx.rpc.krpc.internal.deserialize
+import kotlinx.rpc.krpc.internal.onClosed
+import kotlinx.rpc.krpc.internal.onFailure
+import kotlinx.rpc.krpc.stressBufferSize
+import kotlinx.rpc.krpc.stressIterations
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.fail
+import kotlin.time.Duration.Companion.minutes
+import kotlin.time.Duration.Companion.seconds
+
+internal class KrpcReceiveHandlerStressTest : KrpcReceiveHandlerBaseTest() {
+ @OptIn(ExperimentalCoroutinesApi::class)
+ @Test
+ fun stressActing() {
+ val actorJob = Job()
+ val collected = mutableListOf()
+ val bufferSize = stressBufferSize
+
+ runActingTest(
+ callTimeOut = 10.seconds,
+ bufferSize = bufferSize,
+ callHandler = { collected.add(it) },
+ timeout = 10.minutes,
+ ) { acting ->
+ val sendChannel = Channel(Channel.UNLIMITED)
+ val sender = KrpcSendHandler(sendChannel)
+ sender.updateWindowSize(bufferSize)
+
+ val windowJob = launch {
+ while (true) {
+ val window = when (val message = channel.receive()) {
+ is KrpcCallMessage.CallException -> fail(
+ "Unexpected call exception",
+ message.cause.deserialize()
+ )
+
+ is KrpcGenericMessage -> decodeWindow(message)
+ else -> fail("Unexpected message: $message")
+ }
+
+ sender.updateWindowSize((window as WindowResult.Success).update)
+ }
+ }
+
+ val senderJob = launch {
+ while (true) {
+ val message = sendChannel.receive() as KrpcTransportMessage.StringMessage
+
+ acting.handle(message.value.asCallMessage("1")) {
+ fail(
+ "Unexpected onMessageFailure call, " +
+ "window: ${sender.window}, collected: ${collected.size}\"",
+ it
+ )
+ }.onFailure {
+ fail(
+ "Unexpected onFailure call, " +
+ "window: ${sender.window}, collected: ${collected.size}"
+ )
+ }.onClosed {
+ fail(
+ "Unexpected onClosed call, " +
+ "window: ${sender.window}, collected: ${collected.size}\"",
+ it
+ )
+ }
+ }
+ }
+
+ val counter = Counter()
+ val printJob = launch {
+ while (true) {
+ withContext(Dispatchers.Default) {
+ delay(5.seconds)
+ }
+ println(
+ "Collected: ${collected.size}, " +
+ "launches: ${counter.launches.value}, " +
+ "total: ${counter.total.value}"
+ )
+ }
+ }
+
+ val iterations = stressIterations
+ List(iterations) {
+ launch {
+ repeat(100) {
+ sender.sendMessage(KrpcTransportMessage.StringMessage("Hello"))
+ counter.total.incrementAndGet()
+ }
+ counter.launches.incrementAndGet()
+ }
+ }.joinAll()
+
+ while (!buffer.channel.isEmpty && sender.window != bufferSize) {
+ yield()
+ }
+
+ assertEquals(iterations * 100, collected.size)
+ actorJob.cancelAndJoin()
+ senderJob.cancelAndJoin()
+ windowJob.cancelAndJoin()
+ printJob.cancelAndJoin()
+ }
+ }
+}
diff --git a/krpc/krpc-core/src/jsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.js.kt b/krpc/krpc-core/src/jsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.js.kt
index 030017bfa..50e78407f 100644
--- a/krpc/krpc-core/src/jsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.js.kt
+++ b/krpc/krpc-core/src/jsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.js.kt
@@ -23,6 +23,6 @@ internal actual class DeserializedException actual constructor(
internal actual fun Throwable.stackElements(): List = emptyList()
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
- return nonJvmManualCancellationExceptionDeserialize()
+ return cancellationExceptionDeserialize()
?: DeserializedException(toStringMessage, message, stacktrace, cause, className)
}
diff --git a/krpc/krpc-core/src/jvmMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.jvm.kt b/krpc/krpc-core/src/jvmMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.jvm.kt
index 9c2341324..cb30e46dc 100644
--- a/krpc/krpc-core/src/jvmMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.jvm.kt
+++ b/krpc/krpc-core/src/jvmMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.jvm.kt
@@ -37,6 +37,11 @@ internal actual fun Throwable.stackElements(): List = stackTrace.m
}
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
+ val cancellationException = cancellationExceptionDeserialize()
+ if (cancellationException != null) {
+ return cancellationException
+ }
+
try {
val clazz = Class.forName(className)
val fieldsCount = clazz.fieldsCountOrDefault(throwableFields)
diff --git a/krpc/krpc-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.jvm.kt b/krpc/krpc-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.jvm.kt
index e348e5539..ee56447be 100644
--- a/krpc/krpc-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.jvm.kt
+++ b/krpc/krpc-core/src/jvmTest/kotlin/kotlinx/rpc/krpc/KrpcReceiveHandlerTest.jvm.kt
@@ -4,5 +4,5 @@
package kotlinx.rpc.krpc
-internal actual val stressIterations: Int = 10_000
-internal actual val stressBufferSize: Int = 500
+internal actual val stressIterations: Int = 8_000
+internal actual val stressBufferSize: Int = 1000
diff --git a/krpc/krpc-core/src/nativeMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.native.kt b/krpc/krpc-core/src/nativeMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.native.kt
index 07abac13c..c2d90980a 100644
--- a/krpc/krpc-core/src/nativeMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.native.kt
+++ b/krpc/krpc-core/src/nativeMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.native.kt
@@ -22,6 +22,6 @@ internal actual class DeserializedException actual constructor(
internal actual fun Throwable.stackElements(): List = emptyList()
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
- return nonJvmManualCancellationExceptionDeserialize()
+ return cancellationExceptionDeserialize()
?: DeserializedException(toStringMessage, message, stacktrace, cause, className)
}
diff --git a/krpc/krpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasm.kt b/krpc/krpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasm.kt
index 030017bfa..50e78407f 100644
--- a/krpc/krpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasm.kt
+++ b/krpc/krpc-core/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasm.kt
@@ -23,6 +23,6 @@ internal actual class DeserializedException actual constructor(
internal actual fun Throwable.stackElements(): List = emptyList()
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
- return nonJvmManualCancellationExceptionDeserialize()
+ return cancellationExceptionDeserialize()
?: DeserializedException(toStringMessage, message, stacktrace, cause, className)
}
diff --git a/krpc/krpc-core/src/wasmWasiMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasmWasi.kt b/krpc/krpc-core/src/wasmWasiMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasmWasi.kt
index f8d294438..0ac5301c2 100644
--- a/krpc/krpc-core/src/wasmWasiMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasmWasi.kt
+++ b/krpc/krpc-core/src/wasmWasiMain/kotlin/kotlinx/rpc/krpc/internal/ExceptionUtils.wasmWasi.kt
@@ -20,6 +20,6 @@ internal actual class DeserializedException actual constructor(
internal actual fun Throwable.stackElements(): List = emptyList()
internal actual fun SerializedException.deserializeUnsafe(): Throwable {
- return nonJvmManualCancellationExceptionDeserialize()
+ return cancellationExceptionDeserialize()
?: DeserializedException(toStringMessage, message, stacktrace, cause, className)
}
diff --git a/krpc/krpc-logging/api/krpc-logging.klib.api b/krpc/krpc-logging/api/krpc-logging.klib.api
index 7d61e2883..39de30275 100644
--- a/krpc/krpc-logging/api/krpc-logging.klib.api
+++ b/krpc/krpc-logging/api/krpc-logging.klib.api
@@ -1,5 +1,5 @@
// Klib ABI Dump
-// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm64, watchosSimulatorArm64, watchosX64]
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
diff --git a/krpc/krpc-serialization/krpc-serialization-cbor/api/krpc-serialization-cbor.klib.api b/krpc/krpc-serialization/krpc-serialization-cbor/api/krpc-serialization-cbor.klib.api
index 58969ac66..7fdcb807a 100644
--- a/krpc/krpc-serialization/krpc-serialization-cbor/api/krpc-serialization-cbor.klib.api
+++ b/krpc/krpc-serialization/krpc-serialization-cbor/api/krpc-serialization-cbor.klib.api
@@ -1,5 +1,5 @@
// Klib ABI Dump
-// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm64, watchosSimulatorArm64, watchosX64]
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
diff --git a/krpc/krpc-serialization/krpc-serialization-core/api/krpc-serialization-core.klib.api b/krpc/krpc-serialization/krpc-serialization-core/api/krpc-serialization-core.klib.api
index 61c60f2c3..af5851dad 100644
--- a/krpc/krpc-serialization/krpc-serialization-core/api/krpc-serialization-core.klib.api
+++ b/krpc/krpc-serialization/krpc-serialization-core/api/krpc-serialization-core.klib.api
@@ -1,5 +1,5 @@
// Klib ABI Dump
-// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm64, watchosSimulatorArm64, watchosX64]
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
diff --git a/krpc/krpc-serialization/krpc-serialization-json/api/krpc-serialization-json.klib.api b/krpc/krpc-serialization/krpc-serialization-json/api/krpc-serialization-json.klib.api
index ef64b100d..13c68f391 100644
--- a/krpc/krpc-serialization/krpc-serialization-json/api/krpc-serialization-json.klib.api
+++ b/krpc/krpc-serialization/krpc-serialization-json/api/krpc-serialization-json.klib.api
@@ -1,5 +1,5 @@
// Klib ABI Dump
-// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm64, watchosSimulatorArm64, watchosX64]
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
diff --git a/krpc/krpc-serialization/krpc-serialization-protobuf/api/krpc-serialization-protobuf.klib.api b/krpc/krpc-serialization/krpc-serialization-protobuf/api/krpc-serialization-protobuf.klib.api
index beda43f84..746e0b1b3 100644
--- a/krpc/krpc-serialization/krpc-serialization-protobuf/api/krpc-serialization-protobuf.klib.api
+++ b/krpc/krpc-serialization/krpc-serialization-protobuf/api/krpc-serialization-protobuf.klib.api
@@ -1,5 +1,5 @@
// Klib ABI Dump
-// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm64, watchosSimulatorArm64, watchosX64]
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
diff --git a/krpc/krpc-server/api/krpc-server.klib.api b/krpc/krpc-server/api/krpc-server.klib.api
index 05eca67ef..36a889bf3 100644
--- a/krpc/krpc-server/api/krpc-server.klib.api
+++ b/krpc/krpc-server/api/krpc-server.klib.api
@@ -1,5 +1,5 @@
// Klib ABI Dump
-// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, watchosArm64, watchosSimulatorArm64, watchosX64]
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm64, watchosSimulatorArm64, watchosX64]
// Rendering settings:
// - Signature version: 2
// - Show manifest properties: true
diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt
index 14d66b16e..a21f4e56e 100644
--- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt
+++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/KrpcServer.kt
@@ -31,7 +31,6 @@ import kotlin.reflect.KClass
* @param transport [KrpcTransport] instance that will be used to send and receive RPC messages.
* IMPORTANT: Must be exclusive to this server, otherwise unexpected behavior may occur.
*/
-@OptIn(InternalCoroutinesApi::class)
public abstract class KrpcServer(
private val config: KrpcConfig.Server,
transport: KrpcTransport,
@@ -87,7 +86,7 @@ public abstract class KrpcServer(
private var cancelledByClient = false
init {
- internalScope.coroutineContext.job.invokeOnCompletion(onCancelling = true) {
+ internalScope.coroutineContext.job.invokeOnCompletion {
if (!cancelledByClient) {
sendCancellation(CancellationType.ENDPOINT, null, null, closeTransportAfterSending = true)
}
diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt
index 3eee4b6f1..9b41bd762 100644
--- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt
+++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/KrpcServerService.kt
@@ -5,7 +5,6 @@
package kotlinx.rpc.krpc.server.internal
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.rpc.annotations.Rpc
import kotlinx.rpc.descriptor.RpcInvokator
@@ -142,6 +141,7 @@ internal class KrpcServerService<@Rpc T : Any>(
var failure: Throwable? = null
val requestJob = serverScope.launch(start = CoroutineStart.LAZY) {
+ var startedCollecting = false
try {
val markedNonSuspending = callData.pluginParams.orEmpty()
.contains(KrpcPluginKey.NON_SUSPENDING_SERVER_FLOW_MARKER)
@@ -181,31 +181,39 @@ internal class KrpcServerService<@Rpc T : Any>(
)
}
+ startedCollecting = true
sendFlowMessages(serialFormat, returnSerializer, value, callData)
} else {
sendMessageValue(serialFormat, returnSerializer, value, callData)
}
} catch (cause: CancellationException) {
- currentCoroutineContext().ensureActive()
+ serverScope.ensureActive()
+ val request = requestMap[callId]
+ if (request == null || request.serviceCancelled) {
+ throw cause
+ }
- val wrapped = ManualCancellationException(cause)
+ failure = cause
- failure = wrapped
+ throw cause
} catch (cause: Throwable) {
failure = cause
} finally {
if (failure != null) {
- val serializedCause = serializeException(failure)
- val exceptionMessage = KrpcCallMessage.CallException(
- callId = callId,
- serviceType = descriptor.fqName,
- cause = serializedCause,
- connectionId = callData.connectionId,
- serviceId = callData.serviceId,
- )
+ // flow cancellations are handled by the sendFlowMessages function
+ if (!startedCollecting || !callable.isNonSuspendFunction) {
+ val serializedCause = serializeException(failure)
+ val exceptionMessage = KrpcCallMessage.CallException(
+ callId = callId,
+ serviceType = descriptor.fqName,
+ cause = serializedCause,
+ connectionId = callData.connectionId,
+ serviceId = callData.serviceId,
+ )
- connector.sendMessageChecked(exceptionMessage) {
- // ignore, the client probably already disconnected
+ connector.sendMessageChecked(exceptionMessage) {
+ // ignore, the client probably already disconnected
+ }
}
closeReceiving(callId, "Server request failed", failure, fromJob = true)
@@ -263,12 +271,14 @@ internal class KrpcServerService<@Rpc T : Any>(
connector.sendMessage(result)
}
+ @Suppress("detekt.ThrowsCount")
private suspend fun sendFlowMessages(
serialFormat: SerialFormat,
returnSerializer: KSerializer,
flow: Flow,
callData: KrpcCallMessage.CallData,
) {
+ var failure: Throwable? = null
try {
flow.collect { value ->
val result = when (serialFormat) {
@@ -316,20 +326,34 @@ internal class KrpcServerService<@Rpc T : Any>(
// do nothing
}
} catch (cause: CancellationException) {
+ serverScope.ensureActive()
+ val request = requestMap[callData.callId]
+ if (request == null || request.serviceCancelled) {
+ throw cause
+ }
+
+ failure = cause
+
throw cause
} catch (cause: Throwable) {
- val serializedCause = serializeException(cause)
- connector.sendMessageChecked(
- KrpcCallMessage.StreamCancel(
- callId = callData.callId,
- serviceType = descriptor.fqName,
- connectionId = callData.connectionId,
- serviceId = callData.serviceId,
- streamId = SINGLE_STREAM_ID,
- cause = serializedCause,
- )
- ) {
- // do nothing
+ failure = cause
+
+ throw cause
+ } finally {
+ if (failure != null) {
+ val serializedCause = serializeException(failure)
+ connector.sendMessageChecked(
+ KrpcCallMessage.StreamCancel(
+ callId = callData.callId,
+ serviceType = descriptor.fqName,
+ connectionId = callData.connectionId,
+ serviceId = callData.serviceId,
+ streamId = SINGLE_STREAM_ID,
+ cause = serializedCause,
+ )
+ ) {
+ // do nothing
+ }
}
}
}
@@ -394,12 +418,18 @@ internal class KrpcServerService<@Rpc T : Any>(
}
internal class RpcRequest(val handlerJob: Job, val streamContext: ServerStreamContext) {
+ // not user cancelled
+ var serviceCancelled: Boolean = false
+ private set
+
fun cancelAndClose(
callId: String,
message: String? = null,
cause: Throwable? = null,
fromJob: Boolean = false,
) {
+ serviceCancelled = true
+
if (!handlerJob.isCompleted && !fromJob) {
when {
message != null && cause != null -> handlerJob.cancel(message, cause)
diff --git a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt
index 7d99e1ff9..15c4ef9b8 100644
--- a/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt
+++ b/krpc/krpc-server/src/commonMain/kotlin/kotlinx/rpc/krpc/server/internal/ServerStreamContext.kt
@@ -5,6 +5,7 @@
package kotlinx.rpc.krpc.server.internal
import kotlinx.atomicfu.atomic
+import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
@@ -49,6 +50,7 @@ internal class ServerStreamContext {
fun removeCall(callId: String, cause: Throwable?) {
streams.remove(callId)?.values?.forEach {
it.channel.close(cause)
+ it.channel.cancel(cause?.let { e -> e as? CancellationException ?: CancellationException(null, e) })
}
}
diff --git a/krpc/krpc-test/build.gradle.kts b/krpc/krpc-test/build.gradle.kts
index 092aefdaa..5588cfd26 100644
--- a/krpc/krpc-test/build.gradle.kts
+++ b/krpc/krpc-test/build.gradle.kts
@@ -92,6 +92,11 @@ tasks.withType {
}
}
+// sporadic failures in CI, probably not kRPC related
+tasks.withType {
+ onlyIf { !name.contains("browser", ignoreCase = true) }
+}
+
val resourcesPath = projectDir.resolve("src/jvmTest/resources")
val tmpExt = "tmp"
val goldExt = "gold"
diff --git a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt
index bc9fb16c3..3f98c13f4 100644
--- a/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt
+++ b/krpc/krpc-test/src/commonMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.kt
@@ -113,17 +113,19 @@ abstract class KrpcTransportTestBase {
@Test
fun nonSuspendErrorOnEmit() = runTest {
- val flow = client.nonSuspendFlowErrorOnReturn()
- assertFails {
+ val flow = client.nonSuspendFlowErrorOnEmit()
+ val failure = assertFails {
flow.toList()
}
+ assertFalse(failure is CancellationException)
}
@Test
fun nonSuspendErrorOnReturn() = runTest {
- assertFails {
+ val failure = assertFails {
client.nonSuspendFlowErrorOnReturn().toList()
}
+ assertFalse(failure is CancellationException)
}
@Test
@@ -336,7 +338,9 @@ abstract class KrpcTransportTestBase {
}
@Test
- fun RPC_should_be_able_to_receive_100_000_ints_with_batching_in_reasonable_time() = runTest(timeout = EXTENDED_TIMEOUT) {
+ fun RPC_should_be_able_to_receive_100_000_ints_with_batching_in_reasonable_time() = runTest(
+ timeout = EXTENDED_TIMEOUT,
+ ) {
val n = iterations_100_000
assertEquals(client.getNIntsBatched(n).last().last(), n)
}
@@ -440,8 +444,8 @@ abstract class KrpcTransportTestBase {
@Test
fun rpc_continuation_is_called_in_the_correct_scope_and_doesnt_block_other_rpcs() = runTest {
- if (isJs) {
- println("Test is skipped on JS, because it doesn't support multiple threads.")
+ if (platform.isJs() || platform == Platform.WASI) {
+ println("Test is skipped on JS/WASM, because they don't support multiple threads.")
return@runTest
}
@@ -506,7 +510,14 @@ abstract class KrpcTransportTestBase {
}
}
-private val EXTENDED_TIMEOUT = if (isJs) 500.seconds else 200.seconds
+private val EXTENDED_TIMEOUT = if (platform.isJs()) 500.seconds else 200.seconds
+
+@Suppress("unused")
+internal enum class Platform {
+ JVM, JS, NATIVE, WASM_JS, WASI;
+
+ fun isJs(): Boolean = this == JS || this == WASM_JS
+}
-internal expect val isJs: Boolean
+internal expect val platform: Platform
internal expect val iterations_100_000 : Int
diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt
index 947cb81ef..0e26294b5 100644
--- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt
+++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationService.kt
@@ -21,7 +21,7 @@ interface CancellationService {
fun cancellationInIncomingStream(): Flow
- suspend fun cancellationInOutgoingStream(stream: Flow, cancelled: Flow)
+ suspend fun cancellationInOutgoingStream(cancelled: Flow)
suspend fun outgoingStream(stream: Flow)
@@ -74,16 +74,12 @@ class CancellationServiceImpl : CancellationService {
}
}
- override suspend fun cancellationInOutgoingStream(stream: Flow, cancelled: Flow) {
+ override suspend fun cancellationInOutgoingStream(cancelled: Flow) {
supervisorScope {
- launch {
- consume(stream)
- }
-
launch {
try {
cancelled.collect {
- if (it == 0) {
+ if (it == 1) {
firstIncomingConsumed.complete(it)
}
}
@@ -116,6 +112,7 @@ class CancellationServiceImpl : CancellationService {
override suspend fun outgoingStreamWithDelayedResponse(stream: Flow) {
try {
+ waitCounter.increment()
consume(stream)
fence.await()
diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt
index d314c7729..0453ac3be 100644
--- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt
+++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationTest.kt
@@ -10,6 +10,8 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.toList
+import kotlinx.rpc.krpc.test.Platform
+import kotlinx.rpc.krpc.test.platform
import kotlinx.rpc.withService
import kotlin.test.Ignore
import kotlin.test.Test
@@ -99,6 +101,10 @@ class CancellationTest {
@Test
fun testCancellationInServerStream() = runCancellationTest {
+ if (platform.isJs() || platform == Platform.WASI) {
+ return@runCancellationTest
+ }
+
supervisorScope {
var ex: CancellationException? = null
val requestJob = launch {
@@ -125,13 +131,7 @@ class CancellationTest {
supervisorScope {
val requestJob = launch {
service.cancellationInOutgoingStream(
- stream = flow {
- emit(42)
- println("[testCancellationInClientStream] emit 42")
- emit(43)
- println("[testCancellationInClientStream] emit 43")
- },
- cancelled = flow {
+ flow {
emit(1)
println("[testCancellationInClientStream] emit 1")
serverInstance().firstIncomingConsumed.await()
@@ -143,11 +143,8 @@ class CancellationTest {
requestJob.join()
println("[testCancellationInClientStream] Request job finished")
- serverInstance().consumedAll.await()
- println("[testCancellationInClientStream] Server consumed all")
assertFalse(requestJob.isCancelled, "Expected requestJob not to be cancelled")
- assertContentEquals(listOf(42, 43), serverInstance().consumedIncomingValues)
}
println("[testCancellationInClientStream] Scope finished")
@@ -174,7 +171,7 @@ class CancellationTest {
}
val clientFlowJob = launch {
- service.outgoingStream(flow {
+ service.outgoingStreamWithDelayedResponse(flow {
emit(0)
println("[testCancelClient] emit 0")
serverInstance().fence.await()
@@ -244,7 +241,7 @@ class CancellationTest {
}
val clientFlowJob = launch {
- service.outgoingStream(flow {
+ service.outgoingStreamWithDelayedResponse(flow {
emit(0)
println("[testCancelServer] emit 0")
serverInstance().fence.await()
@@ -354,6 +351,10 @@ class CancellationTest {
@Test
fun testRequestCancellationCancelsStream() = runCancellationTest {
+ if (platform.isJs() || platform == Platform.WASI) {
+ return@runCancellationTest
+ }
+
val fence = CompletableDeferred()
val job = launch {
@@ -378,6 +379,10 @@ class CancellationTest {
@Test
fun testRequestCancellationCancelsStreamButNotOthers() = runCancellationTest {
+ if (platform.isJs() || platform == Platform.WASI) {
+ return@runCancellationTest
+ }
+
val fence = CompletableDeferred()
val job = launch {
service.outgoingStreamWithDelayedResponse(resumableFlow(fence))
diff --git a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt
index 2002ef9dc..097567b3c 100644
--- a/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt
+++ b/krpc/krpc-test/src/commonTest/kotlin/kotlinx/rpc/krpc/test/cancellation/CancellationToolkit.kt
@@ -22,7 +22,7 @@ import kotlinx.rpc.withService
import kotlin.time.Duration.Companion.seconds
fun runCancellationTest(body: suspend CancellationToolkit.() -> Unit): TestResult {
- return runTestWithCoroutinesProbes(timeout = 15.seconds) {
+ return runTestWithCoroutinesProbes(timeout = 30.seconds) {
val toolkit = CancellationToolkit(this)
try {
body(toolkit)
diff --git a/krpc/krpc-test/src/jsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.js.kt b/krpc/krpc-test/src/jsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.js.kt
index a28ddd4d4..3e5d9022d 100644
--- a/krpc/krpc-test/src/jsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.js.kt
+++ b/krpc/krpc-test/src/jsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.js.kt
@@ -4,5 +4,5 @@
package kotlinx.rpc.krpc.test
-actual val isJs: Boolean = true
+internal actual val platform: Platform = Platform.JS
internal actual val iterations_100_000: Int = 10_000
diff --git a/krpc/krpc-test/src/jvmMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.jvm.kt b/krpc/krpc-test/src/jvmMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.jvm.kt
index caf04c984..b697e3fc4 100644
--- a/krpc/krpc-test/src/jvmMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.jvm.kt
+++ b/krpc/krpc-test/src/jvmMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.jvm.kt
@@ -4,5 +4,5 @@
package kotlinx.rpc.krpc.test
-actual val isJs: Boolean = false
+internal actual val platform: Platform = Platform.JVM
internal actual val iterations_100_000: Int = 100_000
diff --git a/krpc/krpc-test/src/nativeMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.native.kt b/krpc/krpc-test/src/nativeMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.native.kt
index caf04c984..983a97ca0 100644
--- a/krpc/krpc-test/src/nativeMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.native.kt
+++ b/krpc/krpc-test/src/nativeMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.native.kt
@@ -4,5 +4,5 @@
package kotlinx.rpc.krpc.test
-actual val isJs: Boolean = false
+internal actual val platform: Platform = Platform.NATIVE
internal actual val iterations_100_000: Int = 100_000
diff --git a/krpc/krpc-test/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.wasmJs.kt b/krpc/krpc-test/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.wasmJs.kt
index a28ddd4d4..11022d9a2 100644
--- a/krpc/krpc-test/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.wasmJs.kt
+++ b/krpc/krpc-test/src/wasmJsMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.wasmJs.kt
@@ -4,5 +4,5 @@
package kotlinx.rpc.krpc.test
-actual val isJs: Boolean = true
+internal actual val platform: Platform = Platform.WASM_JS
internal actual val iterations_100_000: Int = 10_000
diff --git a/krpc/krpc-test/src/wasmWasiMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.wasmWasi.kt b/krpc/krpc-test/src/wasmWasiMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.wasmWasi.kt
index 14eaab9e7..b9439ad64 100644
--- a/krpc/krpc-test/src/wasmWasiMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.wasmWasi.kt
+++ b/krpc/krpc-test/src/wasmWasiMain/kotlin/kotlinx/rpc/krpc/test/KrpcTransportTestBase.wasmWasi.kt
@@ -4,5 +4,5 @@
package kotlinx.rpc.krpc.test
-internal actual val isJs: Boolean = true
-internal actual val iterations_100_000: Int = 100_000
+internal actual val platform: Platform = Platform.WASI
+internal actual val iterations_100_000: Int = 10_000
diff --git a/samples/grpc-app/build.gradle.kts b/samples/grpc-app/build.gradle.kts
index ebcbf809a..7b612ffe9 100644
--- a/samples/grpc-app/build.gradle.kts
+++ b/samples/grpc-app/build.gradle.kts
@@ -4,7 +4,7 @@
plugins {
kotlin("jvm") version "2.2.0"
- id("org.jetbrains.kotlinx.rpc.plugin") version "0.10.0-grpc-121"
+ id("org.jetbrains.kotlinx.rpc.plugin") version "0.10.0-grpc-127"
}
group = "kotlinx.rpc.sample"
@@ -20,10 +20,10 @@ kotlin {
}
dependencies {
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-grpc-core:0.10.0-grpc-121")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-grpc-core:0.10.0-grpc-127")
implementation("ch.qos.logback:logback-classic:1.5.18")
- implementation("io.grpc:grpc-netty:1.73.0")
- implementation("io.grpc:grpc-kotlin-stub:1.4.1")
+ implementation("io.grpc:grpc-netty:1.75.0")
+ implementation("io.grpc:grpc-kotlin-stub:1.5.0")
}
rpc {
diff --git a/samples/grpc-app/gradle/wrapper/gradle-wrapper.jar b/samples/grpc-app/gradle/wrapper/gradle-wrapper.jar
index 9bbc975c7..1b33c55ba 100644
Binary files a/samples/grpc-app/gradle/wrapper/gradle-wrapper.jar and b/samples/grpc-app/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/samples/grpc-app/gradle/wrapper/gradle-wrapper.properties b/samples/grpc-app/gradle/wrapper/gradle-wrapper.properties
index ff23a68d7..d4081da47 100644
--- a/samples/grpc-app/gradle/wrapper/gradle-wrapper.properties
+++ b/samples/grpc-app/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
diff --git a/samples/grpc-app/gradlew b/samples/grpc-app/gradlew
index faf93008b..23d15a936 100755
--- a/samples/grpc-app/gradlew
+++ b/samples/grpc-app/gradlew
@@ -114,7 +114,7 @@ case "$( uname )" in #(
NONSTOP* ) nonstop=true ;;
esac
-CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+CLASSPATH="\\\"\\\""
# Determine the Java command to use to start the JVM.
@@ -213,7 +213,7 @@ DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
set -- \
"-Dorg.gradle.appname=$APP_BASE_NAME" \
-classpath "$CLASSPATH" \
- org.gradle.wrapper.GradleWrapperMain \
+ -jar "$APP_HOME/gradle/wrapper/gradle-wrapper.jar" \
"$@"
# Stop when "xargs" is not available.
diff --git a/samples/grpc-app/gradlew.bat b/samples/grpc-app/gradlew.bat
index 9d21a2183..db3a6ac20 100644
--- a/samples/grpc-app/gradlew.bat
+++ b/samples/grpc-app/gradlew.bat
@@ -70,11 +70,11 @@ goto fail
:execute
@rem Setup the command line
-set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+set CLASSPATH=
@rem Execute Gradle
-"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %*
:end
@rem End local scope for the variables with windows NT shell
diff --git a/samples/ktor-all-platforms-app/gradle/libs.versions.toml b/samples/ktor-all-platforms-app/gradle/libs.versions.toml
index 10bbab110..418201666 100644
--- a/samples/ktor-all-platforms-app/gradle/libs.versions.toml
+++ b/samples/ktor-all-platforms-app/gradle/libs.versions.toml
@@ -1,25 +1,25 @@
[versions]
kotlin = "2.2.0"
-agp = "8.11.0"
+agp = "8.13.0"
android-compileSdk = "36"
android-minSdk = "24"
android-targetSdk = "36"
-androidx-activityCompose = "1.10.1"
+androidx-activityCompose = "1.11.0"
androidx-appcompat = "1.7.1"
androidx-constraintlayout = "2.2.1"
-androidx-core-ktx = "1.16.0"
-androidx-espresso-core = "3.6.1"
-androidx-material = "1.12.0"
-androidx-test-junit = "1.2.1"
-compose = "1.8.3"
-compose-plugin = "1.8.2"
+androidx-core-ktx = "1.17.0"
+androidx-espresso-core = "3.7.0"
+androidx-material = "1.13.0"
+androidx-test-junit = "1.3.0"
+compose = "1.9.1"
+compose-plugin = "1.9.0"
junit = "4.13.2"
-ktor = "3.2.1"
+ktor = "3.3.0"
logback = "1.5.18"
-serialization = "1.8.1"
+serialization = "1.9.0"
coroutines = "1.10.2"
-kotlinx-rpc = "0.9.1"
+kotlinx-rpc = "0.10.0"
[libraries]
# kotlin
diff --git a/samples/ktor-all-platforms-app/gradle/wrapper/gradle-wrapper.properties b/samples/ktor-all-platforms-app/gradle/wrapper/gradle-wrapper.properties
index ff23a68d7..d4081da47 100644
--- a/samples/ktor-all-platforms-app/gradle/wrapper/gradle-wrapper.properties
+++ b/samples/ktor-all-platforms-app/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
diff --git a/samples/ktor-android-app/gradle/libs.versions.toml b/samples/ktor-android-app/gradle/libs.versions.toml
index 284370740..7b912abc0 100644
--- a/samples/ktor-android-app/gradle/libs.versions.toml
+++ b/samples/ktor-android-app/gradle/libs.versions.toml
@@ -1,21 +1,21 @@
[versions]
-agp = "8.11.0-alpha07"
+agp = "8.13.0"
kotlin = "2.2.0"
-androidx-activityCompose = "1.10.1"
+androidx-activityCompose = "1.11.0"
androidx-appcompat = "1.7.1"
androidx-constraintlayout = "2.2.1"
-androidx-core-ktx = "1.16.0"
-androidx-test-junit = "1.2.1"
-compose = "1.8.3"
+androidx-core-ktx = "1.17.0"
+androidx-test-junit = "1.3.0"
+compose = "1.9.1"
compose-plugin = "1.5.14" # https://mvnrepository.com/artifact/androidx.compose.compiler/compiler
-compose-bom = "2025.06.01"
+compose-bom = "2025.09.00"
material3 = "1.3.2"
junit = "4.13.2"
-ktor = "3.2.1"
-kotlinx-serialization-json = "1.8.1"
+ktor = "3.3.0"
+kotlinx-serialization-json = "1.9.0"
kotlinx-coroutines-core = "1.10.2"
logback = "1.5.18"
-kotlinx-rpc = "0.9.1"
+kotlinx-rpc = "0.10.0"
[libraries]
# kotlin
diff --git a/samples/ktor-android-app/gradle/wrapper/gradle-wrapper.properties b/samples/ktor-android-app/gradle/wrapper/gradle-wrapper.properties
index ff23a68d7..d4081da47 100644
--- a/samples/ktor-android-app/gradle/wrapper/gradle-wrapper.properties
+++ b/samples/ktor-android-app/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
diff --git a/samples/ktor-web-app/gradle/libs.versions.toml b/samples/ktor-web-app/gradle/libs.versions.toml
index 27366548f..a25b6d425 100644
--- a/samples/ktor-web-app/gradle/libs.versions.toml
+++ b/samples/ktor-web-app/gradle/libs.versions.toml
@@ -1,11 +1,11 @@
[versions]
kotlin = "2.2.0"
-kotlin-wrappers-bom = "2025.6.11"
-ktor = "3.2.1"
-kotlinx-serialization-json = "1.8.1"
+kotlin-wrappers-bom = "2025.9.8"
+ktor = "3.3.0"
+kotlinx-serialization-json = "1.9.0"
kotlinx-coroutines-core = "1.10.2"
logback = "1.5.18"
-kotlinx-rpc = "0.9.1"
+kotlinx-rpc = "0.10.0"
[libraries]
# kotlin
diff --git a/samples/ktor-web-app/gradle/wrapper/gradle-wrapper.properties b/samples/ktor-web-app/gradle/wrapper/gradle-wrapper.properties
index ff23a68d7..d4081da47 100644
--- a/samples/ktor-web-app/gradle/wrapper/gradle-wrapper.properties
+++ b/samples/ktor-web-app/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
diff --git a/samples/simple-ktor-app/build.gradle.kts b/samples/simple-ktor-app/build.gradle.kts
index 431b01f68..dd9b94ec5 100644
--- a/samples/simple-ktor-app/build.gradle.kts
+++ b/samples/simple-ktor-app/build.gradle.kts
@@ -5,8 +5,8 @@
plugins {
kotlin("jvm") version "2.2.0"
kotlin("plugin.serialization") version "2.2.0"
- id("io.ktor.plugin") version "3.2.1"
- id("org.jetbrains.kotlinx.rpc.plugin") version "0.9.1"
+ id("io.ktor.plugin") version "3.3.0"
+ id("org.jetbrains.kotlinx.rpc.plugin") version "0.10.0"
}
group = "kotlinx.rpc.sample"
@@ -28,12 +28,12 @@ kotlin {
}
dependencies {
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-client:0.9.1")
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-server:0.9.1")
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-serialization-json:0.9.1")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-client:0.10.0")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-server:0.10.0")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-serialization-json:0.10.0")
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-ktor-client:0.9.1")
- implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-ktor-server:0.9.1")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-ktor-client:0.10.0")
+ implementation("org.jetbrains.kotlinx:kotlinx-rpc-krpc-ktor-server:0.10.0")
implementation("io.ktor:ktor-client-cio")
implementation("io.ktor:ktor-server-netty-jvm")
diff --git a/samples/simple-ktor-app/gradle/wrapper/gradle-wrapper.properties b/samples/simple-ktor-app/gradle/wrapper/gradle-wrapper.properties
index ff23a68d7..d4081da47 100644
--- a/samples/simple-ktor-app/gradle/wrapper/gradle-wrapper.properties
+++ b/samples/simple-ktor-app/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.2-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
diff --git a/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTests.kt b/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTests.kt
index 96c0e2210..cac4b4fbb 100644
--- a/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTests.kt
+++ b/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTests.kt
@@ -13,6 +13,8 @@ import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import org.junit.jupiter.api.TestFactory
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.seconds
@@ -122,34 +124,54 @@ class KrpcProtocolCompatibilityTests : KrpcProtocolCompatibilityTestsBase() {
}
@TestFactory
- fun clientStreamCancellation() = matrixTest { service, impl ->
+ fun clientStreamCancellation() = matrixTest(
+ exclude = listOf(
+ Versions.v0_9.client,
+ Versions.v0_9.server,
+ Versions.v0_8.client,
+ Versions.v0_8.server,
+ ),
+ ) { service, impl ->
val job = launch {
+ println("[clientStreamCancellation] launching")
service.clientStreamCancellation(flow {
emit(1)
+ println("[clientStreamCancellation] emit 1")
impl.fence.await()
+ println("[clientStreamCancellation] after fence")
})
+ println("[clientStreamCancellation] after service call")
}
impl.entered.await()
+ println("[clientStreamCancellation] entered")
job.cancelAndJoin()
+ println("[clientStreamCancellation] cancelled")
impl.cancelled.await(1)
+ println("[clientStreamCancellation] awaited cancellation")
assertNoErrorsInLogs()
}
@TestFactory
- fun fastProducer() = matrixTest(timeout = 60.seconds) { service, impl ->
+ fun fastProducer() = matrixTest(timeout = 240.seconds) { service, impl ->
+ val root = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME) as ch.qos.logback.classic.Logger
+
val async = async {
service.fastServerProduce(1000).map {
// long produce
impl.entered.complete(Unit)
impl.fence.await()
+ root.info("Consumed $it")
it * it
}.toList()
}
impl.entered.await()
- repeat(10_000) {
+ repeat(5_000) {
+ if (it % 10 == 0) {
+ root.info("Parallel iteration #$it")
+ }
assertEquals(1, service.unary(1))
assertEquals(55, service.serverStreaming(10).toList().sum())
}
diff --git a/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt b/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt
index 5dc70aad3..a705a5259 100644
--- a/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt
+++ b/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/KrpcProtocolCompatibilityTestsBase.kt
@@ -6,9 +6,11 @@ package kotlinx.rpc.krpc.test.compat
import ch.qos.logback.classic.Logger
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.debug.DebugProbes
import kotlinx.coroutines.test.TestScope
+import kotlinx.coroutines.withContext
+import kotlinx.rpc.krpc.test.compat.service.TestStarter
import kotlinx.rpc.test.runTestWithCoroutinesProbes
import org.junit.jupiter.api.DynamicTest
import org.slf4j.LoggerFactory
@@ -22,6 +24,7 @@ import kotlin.time.Duration.Companion.seconds
enum class Versions {
v0_9,
v0_8,
+ Latest
;
}
@@ -36,29 +39,48 @@ class VersionRolePair(
@Suppress("unused")
val Versions.client get() = VersionRolePair(this, Role.Client)
+
@Suppress("unused")
val Versions.server get() = VersionRolePair(this, Role.Server)
abstract class KrpcProtocolCompatibilityTestsBase {
- class LoadedStarter(val version: Versions, val classLoader: URLClassLoader) {
- val starter = classLoader
+ interface LoadedStarter {
+ val version: Versions
+ val starter: Starter
+ suspend fun close()
+ }
+
+ class LoadedStarterImpl(override val version: Versions, val classLoader: URLClassLoader) : LoadedStarter {
+ override val starter = classLoader
.loadClass("kotlinx.rpc.krpc.test.compat.service.TestStarter")
.getDeclaredConstructor()
.newInstance() as Starter
- suspend fun close() {
- classLoader.close()
+ override suspend fun close() {
+ withContext(Dispatchers.IO) {
+ classLoader.close()
+ }
starter.stopClient()
starter.stopServer()
}
}
private fun prepareStarters(exclude: List): List {
- return Versions.entries.filter { it !in exclude }.map { version ->
+ return Versions.entries.filter { it !in exclude && it != Versions.Latest }.map { version ->
val versionResourcePath = javaClass.classLoader.getResource(version.name)!!
val versionClassLoader = URLClassLoader(arrayOf(versionResourcePath), javaClass.classLoader)
- LoadedStarter(version, versionClassLoader)
+ LoadedStarterImpl(version, versionClassLoader)
+ } + latestStarter()
+ }
+
+ private fun latestStarter() = object : LoadedStarter {
+ override val version: Versions = Versions.Latest
+ override val starter: Starter = TestStarter()
+
+ override suspend fun close() {
+ starter.stopClient()
+ starter.stopServer()
}
}
@@ -80,20 +102,20 @@ abstract class KrpcProtocolCompatibilityTestsBase {
timeout: Duration = 10.seconds,
body: suspend TestEnv.() -> Unit,
): Stream {
- return prepareStarters(exclude).map {
- DynamicTest.dynamicTest("$role ${it.version}") {
+ return prepareStarters(exclude).map { old ->
+ DynamicTest.dynamicTest("$role ${old.version}") {
runTestWithCoroutinesProbes(timeout = timeout) {
- DebugProbes.withDebugProbes {
- val root = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger
- val testAppender = root.getAppender("TEST") as TestLogAppender
+ val root = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger
+ val testAppender = root.getAppender("TEST") as TestLogAppender
+ testAppender.events.clear()
+ val new = latestStarter()
+ try {
+ val env = TestEnv(old.starter, new.starter, testAppender, this)
+ body(env)
+ } finally {
testAppender.events.clear()
- try {
- val env = TestEnv(it.starter, it.starter, testAppender, this)
- body(env)
- } finally {
- testAppender.events.clear()
- it.close()
- }
+ old.close()
+ new.close()
}
}
}
diff --git a/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/service/TestService.kt b/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/service/TestService.kt
index de6b12b38..b522f51c8 100644
--- a/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/service/TestService.kt
+++ b/tests/krpc-protocol-compatibility-tests/src/test/kotlin/kotlinx/rpc/krpc/test/compat/service/TestService.kt
@@ -78,13 +78,20 @@ class TestServiceImpl : TestService, CompatServiceImpl {
override suspend fun clientStreamCancellation(n: Flow) {
try {
n.collect {
+ println("[clientStreamCancellation] collected $it")
if (it != 0) {
entered.complete(Unit)
}
}
} catch (e: CancellationException) {
+ println("[clientStreamCancellation] cancelled on server")
cancelled.increment()
throw e
+ } catch (e: Throwable) {
+ println("[clientStreamCancellation] caught $e")
+ throw e
+ } finally {
+ println("[clientStreamCancellation] finally")
}
}
diff --git a/tests/test-utils/api/test-utils.api b/tests/test-utils/api/test-utils.api
new file mode 100644
index 000000000..372b1d62d
--- /dev/null
+++ b/tests/test-utils/api/test-utils.api
@@ -0,0 +1,19 @@
+public final class kotlinx/rpc/test/RunTestKt {
+ public static final fun runTestWithCoroutinesProbes-VtjQ1oo (JLkotlin/jvm/functions/Function2;)V
+}
+
+public final class kotlinx/rpc/test/RunTest_jvmKt {
+ public static final fun withDebugProbes (Lkotlin/jvm/functions/Function0;)Ljava/lang/Object;
+}
+
+public final class kotlinx/rpc/test/RunThreadIfPossible_jvmKt {
+ public static final fun runThreadIfPossible (Lkotlin/jvm/functions/Function0;)V
+}
+
+public final class kotlinx/rpc/test/WaitCounter {
+ public fun ()V
+ public final fun await (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public final fun getValue ()I
+ public final fun increment ()V
+}
+
diff --git a/tests/test-utils/api/test-utils.klib.api b/tests/test-utils/api/test-utils.klib.api
new file mode 100644
index 000000000..e0b6ed37c
--- /dev/null
+++ b/tests/test-utils/api/test-utils.klib.api
@@ -0,0 +1,32 @@
+// Klib ABI Dump
+// Targets: [iosArm64, iosSimulatorArm64, iosX64, js, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64]
+// Alias: native => [iosArm64, iosSimulatorArm64, iosX64, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64]
+// Rendering settings:
+// - Signature version: 2
+// - Show manifest properties: true
+// - Show declarations: true
+
+// Library unique name:
+final class kotlinx.rpc.test/WaitCounter { // kotlinx.rpc.test/WaitCounter|null[0]
+ constructor () // kotlinx.rpc.test/WaitCounter.|(){}[0]
+
+ final val value // kotlinx.rpc.test/WaitCounter.value|{}value[0]
+ final fun (): kotlin/Int // kotlinx.rpc.test/WaitCounter.value.|(){}[0]
+
+ final fun increment() // kotlinx.rpc.test/WaitCounter.increment|increment(){}[0]
+ final suspend fun await(kotlin/Int) // kotlinx.rpc.test/WaitCounter.await|await(kotlin.Int){}[0]
+}
+
+final inline fun <#A: kotlin/Any?> kotlinx.rpc.test/withDebugProbes(kotlin/Function0<#A>): #A // kotlinx.rpc.test/withDebugProbes|withDebugProbes(kotlin.Function0<0:0>){0ยง}[0]
+
+// Targets: [native, wasmWasi]
+final fun kotlinx.rpc.test/runTestWithCoroutinesProbes(kotlin.time/Duration, kotlin.coroutines/SuspendFunction1) // kotlinx.rpc.test/runTestWithCoroutinesProbes|runTestWithCoroutinesProbes(kotlin.time.Duration;kotlin.coroutines.SuspendFunction1){}[0]
+
+// Targets: [native]
+final fun kotlinx.rpc.test/runThreadIfPossible(kotlin/Function0) // kotlinx.rpc.test/runThreadIfPossible|runThreadIfPossible(kotlin.Function0){}[0]
+
+// Targets: [js, wasmJs, wasmWasi]
+final inline fun kotlinx.rpc.test/runThreadIfPossible(kotlin/Function0) // kotlinx.rpc.test/runThreadIfPossible|runThreadIfPossible(kotlin.Function0){}[0]
+
+// Targets: [js, wasmJs]
+final fun kotlinx.rpc.test/runTestWithCoroutinesProbes(kotlin.time/Duration, kotlin.coroutines/SuspendFunction1): kotlinx.coroutines.test.internal/JsPromiseInterfaceForTesting // kotlinx.rpc.test/runTestWithCoroutinesProbes|runTestWithCoroutinesProbes(kotlin.time.Duration;kotlin.coroutines.SuspendFunction1){}[0]
diff --git a/tests/test-utils/src/commonMain/kotlin/kotlinx/rpc/test/waitCounter.kt b/tests/test-utils/src/commonMain/kotlin/kotlinx/rpc/test/WaitCounter.kt
similarity index 80%
rename from tests/test-utils/src/commonMain/kotlin/kotlinx/rpc/test/waitCounter.kt
rename to tests/test-utils/src/commonMain/kotlin/kotlinx/rpc/test/WaitCounter.kt
index 06668ebc1..4c2436603 100644
--- a/tests/test-utils/src/commonMain/kotlin/kotlinx/rpc/test/waitCounter.kt
+++ b/tests/test-utils/src/commonMain/kotlin/kotlinx/rpc/test/WaitCounter.kt
@@ -22,13 +22,19 @@ class WaitCounter {
fun increment() {
lock.withLock {
val current = counter.incrementAndGet()
- waiters[current]?.forEach { it.resume(Unit) }
+ (0..current).forEach {
+ waiters[it]?.forEach { continuation ->
+ continuation.resume(Unit)
+ }
+
+ waiters.remove(it)
+ }
}
}
suspend fun await(value: Int) = suspendCancellableCoroutine {
lock.withLock {
- if (counter.value == value) {
+ if (counter.value >= value) {
it.resume(Unit)
} else {
waiters[value] = waiters[value].orEmpty() + it
diff --git a/versions-root/libs.versions.toml b/versions-root/libs.versions.toml
index 0f12a7ab3..b04626e7a 100644
--- a/versions-root/libs.versions.toml
+++ b/versions-root/libs.versions.toml
@@ -1,6 +1,6 @@
[versions]
# core library version
-kotlinx-rpc = "0.10.0-SNAPSHOT"
+kotlinx-rpc = "0.10.0"
# kotlin
kotlin-lang = "2.2.20" # or env.KOTLIN_VERSION
@@ -13,21 +13,21 @@ ktor = "3.3.0"
kotlin-logging = "7.0.13"
slf4j = "2.0.17"
logback = "1.3.14"
-gradle-plugin-publish = "1.3.1"
-kotlin-wrappers = "2025.6.11"
+gradle-plugin-publish = "2.0.0"
+kotlin-wrappers = "2025.9.8"
junit4 = "4.13.2"
-junit5 = "5.13.2"
+junit5 = "5.13.4"
intellij = "241.19416.19"
-kotlinx-browser = "0.3"
+kotlinx-browser = "0.5.0"
dokka = "2.0.0"
puppeteer = "24.9.0"
atomicfu = "0.29.0"
-serialization = "1.8.1"
+serialization = "1.9.0"
detekt-gradle-plugin = "1.23.8"
-kover = "0.9.1"
+kover = "0.9.2"
develocity = "3.19.2"
-common-custom-user-data = "2.3"
-compat-patrouille = "0.0.1"
+common-custom-user-data = "2.4.0"
+compat-patrouille = "0.0.2"
lincheck = "3.2"
[libraries]