Merged
31 commits
e058d77
Migrate `input` functions to use standard Kotlin lambdas instead of `…
asubb Dec 20, 2025
48985c7
Deprecate `BeanStream.map` with `Fn` parameter, migrate to lambda-bas…
asubb Dec 20, 2025
d61d726
Refactor stream operations to eliminate `Fn` usage, streamline lambda…
asubb Dec 20, 2025
1536059
Replace `Fn` with Kotlin lambdas in `CsvStreamOutput` implementations…
asubb Dec 20, 2025
3d1dbfb
Refactor `merge` operation to support `ExecutionScope`, update serial…
asubb Dec 20, 2025
3b0f193
Migrate `Fn`-based resampling and output mechanisms to Kotlin lambdas…
asubb Dec 21, 2025
26bedbb
Migrate `FunctionStreamOutput` to use Kotlin lambdas, update serializ…
asubb Dec 21, 2025
6fd6154
Migrate serializers and stream operations to support `ExecutionScope`…
asubb Dec 21, 2025
8b016f0
Add @Suppress("UNCHECKED_CAST") annotations across serializers and st…
asubb Dec 21, 2025
97f5c16
Add `@Suppress("UNCHECKED_CAST")` annotations to serializers and upda…
asubb Dec 21, 2025
f65237b
Migrate remaining `Fn` usages to Kotlin lambdas, refactor related tes…
asubb Dec 21, 2025
006c2ba
Migrate remaining `Fn` references in documentation and code comments …
asubb Dec 21, 2025
0e44f69
Remove `JvmFnWrapper` implementation, migrate all remaining usages of…
asubb Dec 21, 2025
a60cec5
Remove `Fn` and related components, adapt all serializers and tests t…
asubb Dec 22, 2025
33062e1
Update documentation, tests, and stream operations to support `Execut…
asubb Dec 26, 2025
b8a5ba0
Update `build.gradle.kts` to improve browser test task readability an…
asubb Dec 26, 2025
41fd471
Simplify Chromium installation in Dockerfile by removing redundant sy…
asubb Dec 26, 2025
3140a0c
Update Dockerfile to replace Chromium with Google Chrome for tests an…
asubb Dec 26, 2025
f0b82c5
Add custom Karma launcher to support ChromeHeadless in Docker with no…
asubb Dec 26, 2025
17d4830
Simplify Karma configuration by using `useChromeHeadless` and update …
asubb Dec 26, 2025
c3aad2e
Split GitHub Actions workflow into separate build, JVM test, and JS t…
asubb Dec 26, 2025
ca4d931
Add Xvfb setup to GitHub Actions for JS browser tests and replace `se…
asubb Dec 26, 2025
8dbc581
Update GitHub Actions workflow to use custom Docker action for JS tests
asubb Dec 26, 2025
4db67a7
Switch GitHub Actions script from `run` to `with.args` for JS tests step
asubb Dec 26, 2025
295393e
Remove `--disable-setuid-sandbox` flag from ChromeHeadlessNoSandbox l…
asubb Dec 29, 2025
745be04
Run Chrome headless tests as non-root user in Dockerfile for GitHub A…
asubb Dec 29, 2025
cbbfb48
Update Dockerfile: adjust user directory paths for GitHub Actions com…
asubb Dec 29, 2025
3cd4c4d
Comment out build and JVM test steps in GitHub Actions workflow and s…
asubb Dec 29, 2025
d244da5
Update Karma configuration to use ChromeHeadlessNoSandbox launcher
asubb Dec 29, 2025
d6ae2ac
Uncomment build and JVM test steps in GitHub Actions workflow and rem…
asubb Dec 29, 2025
d57e5e1
Remove Xvfb setup from GitHub Actions and revise test specs to enable…
asubb Dec 29, 2025
22 changes: 19 additions & 3 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

name: Build And Test

on: [pull_request]
@@ -8,12 +8,28 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: ./docker/gh-build

- name: Build
uses: ./docker/gh-build
env:
DBX_TEST_CLIENT_ID: ${{ secrets.DBX_TEST_CLIENT_ID }}
DBX_TEST_ACCESS_TOKEN: ${{ secrets.DBX_TEST_ACCESS_TOKEN }}
with:
args: ./gradlew clean build -x test -x jvmTest -x jsTest -x jsNodeTest -x jsBrowserTest --info --max-workers 1 --no-daemon

- name: JVM Tests
uses: ./docker/gh-build
env:
DBX_TEST_CLIENT_ID: ${{ secrets.DBX_TEST_CLIENT_ID }}
DBX_TEST_ACCESS_TOKEN: ${{ secrets.DBX_TEST_ACCESS_TOKEN }}
with:
args: ./gradlew build --info --max-workers 1 --no-daemon
args: ./gradlew test jvmTest --info --max-workers 1 --no-daemon

- name: JS (Browser/Node) Tests
uses: ./docker/gh-build
with:
args: ./gradlew jsBrowserTest jsNodeTest --info --max-workers 1 --no-daemon

# Upload HTML test reports
- name: Upload Test Reports
uses: actions/upload-artifact@v4
2 changes: 2 additions & 0 deletions .gitignore
@@ -9,3 +9,5 @@ ds
logs/
.kotlin/
kotlin-js-store/
.output.txt
debug.log
4 changes: 4 additions & 0 deletions .release/migration-off-fn.md
@@ -0,0 +1,4 @@
* Migrated the `lib` module away from the custom `Fn` class to standard Kotlin lambdas.
* Introduced `ExecutionScope` to provide contextual parameters to lambdas during execution, especially in distributed environments.
* Added custom serializers in the `exe` module for all migrated components to maintain backward compatibility with existing distributed execution infrastructure.
* Migrated major components including `map`, `merge`, `window`, `resample`, `flatten`, `flatMap`, `toCsv`, `toWav`, `toTable`, and `out`.
3 changes: 3 additions & 0 deletions README.md
@@ -77,6 +77,9 @@ import io.wavebeans.lib.stream.*
import java.io.File

fun main() {
// register the driver
WbFileDriver.registerDriver("file", LocalWbFileDriver)

// describe what you want compute
val out = 440.sine()
.trim(1000)
8 changes: 8 additions & 0 deletions cli/src/main/kotlin/io/wavebeans/cli/WaveBeansCli.kt
@@ -4,7 +4,9 @@ import ch.qos.logback.classic.Level
import ch.qos.logback.classic.LoggerContext
import io.wavebeans.cli.script.RunMode
import io.wavebeans.cli.script.ScriptRunner
import io.wavebeans.fs.local.LocalWbFileDriver
import io.wavebeans.http.WbHttpService
import io.wavebeans.lib.io.WbFileDriver
import io.wavebeans.lib.table.TableRegistry
import io.wavebeans.lib.table.TableRegistryImpl
import org.apache.commons.cli.CommandLine
@@ -87,6 +89,12 @@ class WaveBeansCli(
runOptions["httpLocations"] = cli.getRequired(httpCommunicator) { listOf("127.0.0.1:$it") }
}
}

// register local file driver by default
try {
WbFileDriver.registerDriver("file", LocalWbFileDriver)
} catch (ignore: IllegalStateException) {}

val sampleRate = cli.get(s) { it.toFloat() } ?: 44100.0f

val httpWait = cli.get(httpWait) { it.toLong() } ?: 0
8 changes: 4 additions & 4 deletions cli/src/test/kotlin/io/wavebeans/cli/WaveBeansCliSpec.kt
@@ -111,10 +111,10 @@ class WaveBeansCliSpec : DescribeSpec({
val portRange = createPorts(2)
val facilitators = portRange.map {
Facilitator(
communicatorPort = it,
threadsNumber = 2,
communicatorPort = it,
onServerShutdownTimeoutMillis = 100,
podDiscovery = object : PodDiscovery() {}
podDiscovery = object : PodDiscovery() {},
)
}
facilitators.forEach { it.start() }
@@ -178,10 +178,10 @@ class WaveBeansCliSpec : DescribeSpec({
val httpCommunicatorPort = findFreePort()
val gardeners = portRange.map {
Facilitator(
communicatorPort = it,
threadsNumber = 2,
communicatorPort = it,
onServerShutdownTimeoutMillis = 100,
podDiscovery = object : PodDiscovery() {}
podDiscovery = object : PodDiscovery() {},
)
}

33 changes: 9 additions & 24 deletions cli/src/test/kotlin/io/wavebeans/cli/script/ScriptRunnerSpec.kt
@@ -7,7 +7,10 @@ import io.kotest.core.spec.style.DescribeSpec
import io.kotest.datatest.withData
import io.wavebeans.execution.PodDiscovery
import io.wavebeans.execution.distributed.Facilitator
import io.wavebeans.execution.distributed.FacilitatorConfig
import io.wavebeans.fs.local.LocalWbFileDriver
import io.wavebeans.lib.WaveBeansClassLoader
import io.wavebeans.lib.io.WbFileDriver
import io.wavebeans.tests.createPorts
import java.io.File
import java.lang.Thread.sleep
@@ -20,10 +23,13 @@ class ScriptRunnerSpec : DescribeSpec({
val facilitators = portRange
.map {
Facilitator(
communicatorPort = it,
threadsNumber = 2,
communicatorPort = it,
onServerShutdownTimeoutMillis = 100,
podDiscovery = object : PodDiscovery() {}
podDiscovery = object : PodDiscovery() {},
fileSystems = listOf(
FacilitatorConfig.FileSystemDescriptor("file", LocalWbFileDriver::class.java.canonicalName),
)
)
}

@@ -190,31 +196,10 @@ class ScriptRunnerSpec : DescribeSpec({
}
}

context("Defining function as class") {
withData(modes) { mode ->
val script = """
class InputFn: Fn<Pair<Long, Float>, Sample?>() {
override fun apply(argument: Pair<Long, Float>): Sample? {
return sampleOf(argument.first)
}
}

input(InputFn())
.map { it }
.trim(1)
.toDevNull()
.out()
""".trimIndent()

assertThat(mode.eval(script)).isNull()
}

}

context("Defining function as lambda") {
withData(modes) { mode ->
val script = """
input { (i, _) -> sampleOf(i) }
input { i, _ -> sampleOf(i) }
.map { it }
.trim(1)
.toDevNull()
15 changes: 11 additions & 4 deletions docker/gh-build/Dockerfile
@@ -2,14 +2,21 @@ FROM eclipse-temurin:11-jdk

LABEL maintainer="WaveBeans"
LABEL "com.github.actions.name"="JDK 11 with Kotlin 2.2.20"
LABEL "com.github.actions.description"="Can run java app and uses Kotlin SDK"
LABEL "com.github.actions.description"="Can run java app and uses Kotlin SDK, have Google Chrome installed for tests"

RUN apt-get update &&\
apt-get install unzip
RUN apt-get update && \
apt-get install -y wget gnupg unzip && \
wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - && \
echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list && \
apt-get update && \
apt-get install -y google-chrome-stable && \
rm -rf /var/lib/apt/lists/*

ENV CHROME_BIN=/usr/bin/google-chrome

RUN cd /usr/lib && \
wget -q https://github.com/JetBrains/kotlin/releases/download/v2.2.20/kotlin-compiler-2.2.20.zip && \
unzip kotlin-compiler-*.zip && \
rm kotlin-compiler-*.zip

ENV PATH=$PATH:/usr/lib/kotlinc/bin
ENV PATH=$PATH:/usr/lib/kotlinc/bin
9 changes: 9 additions & 0 deletions docs/dev/distributed-execution.md
@@ -13,6 +13,7 @@
- [Registering Bush Endpoints](#registering-bush-endpoints)
- [Starting job and tracking its progress](#starting-job-and-tracking-its-progress)
- [Pods distribution](#pods-distribution)
- [Lambda Serialization and ExecutionScope](#lambda-serialization-and-executionscope)

<!-- END doctoc generated TOC please keep comment here to allow auto update -->

@@ -140,6 +141,14 @@ Distributed overseer, while created, provided with the list of [Facilitators](de

The idea behind this planner is to distribute pods based on Facilitator states, i.e. taking into account current capacity and assignments, as well as Bean-aware deployment: for example, inputs and outputs are better spread across different overseers, as they may have high IO or even require special types of nodes. The Planner can fetch all of this upon start and make a better judgement about what to deploy where. The overseer will then blindly follow its lead.

### Lambda Serialization and ExecutionScope

In distributed mode, the topology is serialized into JSON. Functional beans (like `MapStream`) use `LambdaSerializer` to handle the serialization of Kotlin lambdas.

Since Kotlin lambdas are not natively serializable across different JVM processes without the exact same context, WaveBeans wraps them into an internal `Fn` representation during serialization.

The `ExecutionScope` is serialized alongside the functional bean parameters. When the pod is instantiated on a worker node, the `ExecutionScope` is reconstructed, and the lambda is invoked with this scope as its receiver. This ensures that parameters passed via `executionScope { ... }` are available on all worker nodes.
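
The receiver mechanics described above can be illustrated with a minimal sketch. It does not use the real WaveBeans classes: `Scope`, `MapParams`, and the `double` accessor are hypothetical stand-ins for `ExecutionScope` and a functional bean's parameters, assumed here only to show how a lambda typed with a receiver reads parameters from the scope it is invoked on.

```kotlin
// Hypothetical stand-in for ExecutionScope: a bag of named parameters.
class Scope(private val params: Map<String, Any>) {
    // Hypothetical accessor; the real ExecutionScope API may differ.
    fun double(name: String): Double = params.getValue(name) as Double
}

// Hypothetical stand-in for a functional bean's parameters:
// the scope travels together with the lambda that expects it as receiver.
class MapParams<T, R>(val scope: Scope, val transform: Scope.(T) -> R) {
    // Invokes the lambda with the stored scope as its receiver.
    fun apply(value: T): R = scope.transform(value)
}

fun main() {
    // The lambda reads "multiplier" from the scope instead of capturing a local variable.
    val params = MapParams<Double, Double>(Scope(mapOf("multiplier" to 2.0))) {
        it * double("multiplier")
    }
    println(params.apply(1.5)) // prints 3.0
}
```

Because the lambda depends only on its receiver, a worker node that rebuilds the scope from serialized data can invoke it without access to the caller's local variables.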



[actors-hierarchy]: assets/distributed-execution-actors-hierarchy.png "Actors Hierarchy"
165 changes: 165 additions & 0 deletions docs/migration_off_fn.md
@@ -0,0 +1,165 @@
### Migration off `Fn` within `lib`

This document tracks the progress of migrating away from the `Fn` class and its related infrastructure within the `lib` module. The goal is to replace `Fn` with more standard or efficient functional representations where applicable.

#### Migration Instructions

The goal of this migration is to replace the use of the `Fn` class with standard Kotlin function types (lambdas) in the `lib` module while maintaining serialization compatibility in the `exe` module.

##### Step 1: Update the `lib` module

Modify the classes in the `lib` module to use standard Kotlin function types instead of `Fn`.

- Change constructor parameters and properties from `Fn<T, R>` to `(T) -> R` (or another appropriate function type).
- Update the implementation to call the lambda directly instead of using `.apply()`.
- Keep the `BeanParams` classes and other structures, but update their properties to use lambdas.
- If the component needs to access parameters from the environment (e.g., multiplier in `changeAmplitude`), it should use `ExecutionScope`.
- `ExecutionScope` should be added to the `BeanParams` class and passed to the lambda as a receiver: `ExecutionScope.(T) -> R`.
- If the `BeanParams` had a custom serializer within the `lib` module, it should be moved or replaced by a more general approach, as lambdas cannot be directly serialized by `kotlinx.serialization` without extra help.

Example with `ExecutionScope` (`MapStreamParams` in `io.wavebeans.lib.stream.MapStream`):
```kotlin
class MapStreamParams<T : Any, R : Any>(
val scope: ExecutionScope,
val transform: ExecutionScope.(T) -> R
) : BeanParams
```

Example (`InputParams` in `io.wavebeans.lib.io.FunctionInput`):
```kotlin
// Before
class InputParams<T : Any>(
val generator: Fn<Pair<Long, Float>, T?>,
val sampleRate: Float? = null
) : BeanParams

// After
class InputParams<T : Any>(
val generator: (Long, Float) -> T?,
val sampleRate: Float? = null
) : BeanParams
```
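
At call sites, the change from a single `Pair` argument to two separate parameters means a lambda no longer destructures its argument. The sketch below shows the difference with two hypothetical helper functions, `generateOld` and `generateNew`; they are not part of WaveBeans and only mimic the shape of the generator parameter before and after the migration.

```kotlin
// Hypothetical helper mimicking the old shape: the generator receives one Pair(index, sampleRate).
fun <T> generateOld(count: Int, sampleRate: Float, generator: (Pair<Long, Float>) -> T?): List<T?> =
    (0 until count).map { generator(it.toLong() to sampleRate) }

// Hypothetical helper mimicking the new shape: the generator receives two separate parameters.
fun <T> generateNew(count: Int, sampleRate: Float, generator: (Long, Float) -> T?): List<T?> =
    (0 until count).map { generator(it.toLong(), sampleRate) }

fun main() {
    // Old style: parentheses destructure the single Pair argument.
    val before = generateOld(3, 44100.0f) { (i, _) -> i * 2 }
    // New style: two lambda parameters, no destructuring parentheses.
    val after = generateNew(3, 44100.0f) { i, _ -> i * 2 }
    println(before == after) // prints true
}
```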

##### Step 2: Create a custom serializer in the `exe` module

Since lambdas are not serializable, create a custom `KSerializer` in the `exe` module (typically under `io.wavebeans.execution.serializer`) that wraps the lambda into an `Fn` during serialization and unwraps it during deserialization.

- The `serialize` method should use `io.wavebeans.lib.wrap()` to convert the lambda to an `Fn`.
- If using `ExecutionScope`, ensure it is also serialized (using `ExecutionScope.serializer()`) and passed to `wrap()` if necessary, or handled in the lambda returned by `deserialize`.
- The `deserialize` method should decode the `Fn` and then return a lambda that calls `fn.apply()`.
- Use `FnSerializer` to handle the actual serialization/deserialization of the wrapped `Fn`.
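
The wrap-on-serialize, unwrap-on-deserialize round trip in the bullets above can be sketched in isolation as follows. `SketchFn`, `wrapSketch`, and `unwrapSketch` are hypothetical stand-ins, assumed for illustration; they are not the library's `Fn`, `wrap()`, or `FnSerializer`, and the actual encoding step is left out.

```kotlin
// Hypothetical stand-in for the library's Fn: a class with a single apply method.
abstract class SketchFn<T, R> {
    abstract fun apply(argument: T): R
}

// Stand-in for wrap(): adapts a two-argument lambda to a SketchFn over a Pair,
// which is the form a serializer would hand to the encoder.
fun <A, B, R> wrapSketch(f: (A, B) -> R): SketchFn<Pair<A, B>, R> =
    object : SketchFn<Pair<A, B>, R>() {
        override fun apply(argument: Pair<A, B>): R = f(argument.first, argument.second)
    }

// Reverse direction, as a deserializer would do after decoding:
// expose the SketchFn as a plain lambda again.
fun <A, B, R> unwrapSketch(fn: SketchFn<Pair<A, B>, R>): (A, B) -> R =
    { a, b -> fn.apply(a to b) }

fun main() {
    val generator: (Long, Float) -> Float? = { i, sampleRate -> i / sampleRate }
    val restored = unwrapSketch(wrapSketch(generator))
    println(restored(22050L, 44100.0f)) // prints 0.5
}
```

The `lib` module only ever sees the plain lambda; the wrapper type exists solely at the serialization boundary.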

Example with `ExecutionScope` (`MapStreamParamsSerializer` in `io.wavebeans.execution.serializer`):
```kotlin
object MapStreamParamsSerializer : KSerializer<MapStreamParams<*, *>> {

override val descriptor: SerialDescriptor = buildClassSerialDescriptor(MapStreamParams::class.className()) {
element("scope", ExecutionScope.serializer().descriptor)
element("transformFn", FnSerializer.descriptor)
}

override fun deserialize(decoder: Decoder): MapStreamParams<*, *> {
return decoder.decodeStructure(descriptor) {
lateinit var fn: Fn<Any, Any>
lateinit var scope: ExecutionScope
loop@ while (true) {
when (val i = decodeElementIndex(descriptor)) {
CompositeDecoder.DECODE_DONE -> break@loop
0 -> scope = decodeSerializableElement(descriptor, i, ExecutionScope.serializer())
1 -> fn = decodeSerializableElement(descriptor, i, FnSerializer) as Fn<Any, Any>
else -> throw SerializationException("Unknown index $i")
}
}
MapStreamParams<Any, Any>(scope) { fn.apply(it) }
}
}

override fun serialize(encoder: Encoder, value: MapStreamParams<*, *>) {
encoder.encodeStructure(descriptor) {
encodeSerializableElement(descriptor, 0, ExecutionScope.serializer(), value.scope)
encodeSerializableElement(descriptor, 1, FnSerializer, wrap(value.transform))
}
}
}
```

Example (`InputParamsSerializer` in `io.wavebeans.execution.serializer`):
```kotlin
object InputParamsSerializer : KSerializer<InputParams<*>> {
override val descriptor: SerialDescriptor = buildClassSerialDescriptor(InputParams::class.className()) {
element("generateFn", FnSerializer.descriptor)
element("sampleRate", Float.serializer().nullable.descriptor)
}

override fun deserialize(decoder: Decoder): InputParams<*> {
return decoder.decodeStructure(descriptor) {
var sampleRate: Float? = null
lateinit var func: Fn<Pair<Long, Float>, Any?>
loop@ while (true) {
when (val i = decodeElementIndex(descriptor)) {
CompositeDecoder.DECODE_DONE -> break@loop
0 -> func = decodeSerializableElement(descriptor, i, FnSerializer) as Fn<Pair<Long, Float>, Any?>
1 -> sampleRate = decodeNullableSerializableElement(descriptor, i, Float.serializer().nullable)
else -> throw SerializationException("Unknown index $i")
}
}
InputParams({ a, b -> func.apply(a to b) }, sampleRate)
}
}

override fun serialize(encoder: Encoder, value: InputParams<*>) {
encoder.encodeStructure(descriptor) {
encodeSerializableElement(descriptor, 0, FnSerializer, wrap(value.generator))
encodeNullableSerializableElement(descriptor, 1, Float.serializer().nullable, value.sampleRate)
}
}
}
```

##### Step 3: Register the serializer in `SerializationUtils.kt`

Update `io.wavebeans.execution.SerializationUtils.kt` to register the new serializer in the `beanParams()` method. This ensures that when a `BeanParams` is encountered during topology serialization, it uses your custom serializer.

```kotlin
fun SerializersModuleBuilder.beanParams() {
polymorphic(BeanParams::class) {
// ...
subclass(InputParams::class, InputParamsSerializer)
// ...
}
}
```

#### Technical Debt

The following items are temporary measures introduced during the migration and should be resolved once the migration is complete:

- [ ] Migrate `sincResampleFunc` and `SincResampleFn` to use lambdas instead of `Fn`.
- [ ] Migrate `SimpleResampleFn` to use lambdas instead of `Fn`.

#### Classes to Migrate

- [ ] `io.wavebeans.lib.stream.SincResampleFn`
- [x] `io.wavebeans.lib.io.CsvStreamOutput`
- [x] `io.wavebeans.lib.io.CsvStreamOutputParams`
- [x] `io.wavebeans.lib.io.CsvPartialStreamOutput`
- [x] `io.wavebeans.lib.stream.window.MapWindowFn`
- [x] `io.wavebeans.lib.stream.ResampleStreamParams`
- [x] `io.wavebeans.lib.stream.ResampleBeanStream`
- [x] `io.wavebeans.lib.stream.ResampleFiniteStream`
- [x] `io.wavebeans.lib.stream.AbstractResampleStream`
- [x] `io.wavebeans.lib.io.InputParams` (in `io.wavebeans.lib.io.FunctionInput`)
- [x] `io.wavebeans.lib.io.Input` (in `io.wavebeans.lib.io.FunctionInput`)
- [x] `io.wavebeans.lib.io.FunctionStreamOutput`
- [x] `io.wavebeans.lib.io.FunctionStreamOutputParams`
- [x] `io.wavebeans.lib.stream.FlattenStreamsParams` (in `io.wavebeans.lib.stream.FlattenStream`)
- [x] `io.wavebeans.lib.stream.FlattenStream`
- [x] `io.wavebeans.lib.stream.FlattenWindowStreamsParams` (in `io.wavebeans.lib.stream.FlattenWindowStream`)
- [x] `io.wavebeans.lib.stream.FlattenWindowStream`
- [x] `io.wavebeans.lib.stream.FunctionMergedStreamParams`
- [x] `io.wavebeans.lib.stream.FunctionMergedStream`
- [x] Identify areas for `ExecutionScope` documentation.
- [x] Update `docs/user/api/functions.md` with `ExecutionScope` and `ScopeParameters`.
- [x] Update operation-specific docs (`map`, `merge`, `input`, `out`) with `ExecutionScope` examples.
- [x] Update `distributed-execution.md` with technical details of `ExecutionScope` serialization.
- [x] Update `docs/user/api/readme.md` with `ExecutionScope` as a key concept.