Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions .github/workflows/build_and_test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@

name: Build And Test

on: [pull_request]
Expand All @@ -8,12 +8,28 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- uses: ./docker/gh-build

- name: Build
uses: ./docker/gh-build
env:
DBX_TEST_CLIENT_ID: ${{ secrets.DBX_TEST_CLIENT_ID }}
DBX_TEST_ACCESS_TOKEN: ${{ secrets.DBX_TEST_ACCESS_TOKEN }}
with:
args: ./gradlew clean build -x test -x jvmTest -x jsTest -x jsNodeTest -x jsBrowserTest --info --max-workers 1 --no-daemon

- name: JVM Tests
uses: ./docker/gh-build
env:
DBX_TEST_CLIENT_ID: ${{ secrets.DBX_TEST_CLIENT_ID }}
DBX_TEST_ACCESS_TOKEN: ${{ secrets.DBX_TEST_ACCESS_TOKEN }}
with:
args: ./gradlew build --info --max-workers 1 --no-daemon
args: ./gradlew test jvmTest --info --max-workers 1 --no-daemon

- name: JS (Browser/Node) Tests
uses: ./docker/gh-build
with:
args: ./gradlew jsBrowserTest jsNodeTest --info --max-workers 1 --no-daemon

# Upload HTML test reports
- name: Upload Test Reports
uses: actions/upload-artifact@v4
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ ds
.DS_Store
logs/
.kotlin/
kotlin-js-store/
.output.txt
debug.log
4 changes: 4 additions & 0 deletions .release/migration-off-fn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
* Migrated the `lib` module away from the custom `Fn` class to standard Kotlin lambdas.
* Introduced `ExecutionScope` to provide contextual parameters to lambdas during execution, especially in distributed environments.
* Added custom serializers in the `exe` module for all migrated components to maintain backward compatibility with existing distributed execution infrastructure.
* Migrated major components including `map`, `merge`, `window`, `resample`, `flatten`, `flatMap`, `toCsv`, `toWav`, `toTable`, and `out`.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ import io.wavebeans.lib.stream.*
import java.io.File

fun main() {
// register the driver
WbFileDriver.registerDriver("file", LocalWbFileDriver)

// describe what you want compute
val out = 440.sine()
.trim(1000)
Expand Down
45 changes: 20 additions & 25 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,36 +1,31 @@
import org.gradle.kotlin.dsl.compileKotlin
import org.gradle.kotlin.dsl.kotlin
import org.gradle.kotlin.dsl.support.kotlinCompilerOptions

plugins {
alias(libs.plugins.kotlin.jvm)
alias(libs.plugins.retry)
alias(libs.plugins.retry) apply false
alias(libs.plugins.kotlin.multiplatform) apply false
alias(libs.plugins.kotlin.jvm) apply false

`java-library`
`maven-publish`
signing
}

allprojects {
apply {
plugin("kotlin")
plugin("org.gradle.test-retry")
}

repositories {
mavenCentral()
}

kotlin {
compilerOptions {
freeCompilerArgs.add("-Xlambdas=class")
}
jvmToolchain(11)
}
}

subprojects {

if (name == "lib") {
// it's a multiplatform project and defined independently
return@subprojects
}

apply(plugin = "kotlin")
apply(plugin = "org.gradle.test-retry")

group = "io.wavebeans"

dependencies {
Expand Down Expand Up @@ -75,15 +70,15 @@ subprojects {

publishing {
publications {
create<MavenPublication>("lib") {
from(subprojects.first { it.name == "lib" }.components["java"])
groupId = "io.wavebeans"
artifactId = "lib"
populatePom(
"WaveBeans Lib",
"WaveBeans API library. Provides the way to define bean streams and basic execution functionality."
)
}
// create<MavenPublication>("lib") {
// from(subprojects.first { it.name == "lib" }.components["java"])
// groupId = "io.wavebeans"
// artifactId = "lib"
// populatePom(
// "WaveBeans Lib",
// "WaveBeans API library. Provides the way to define bean streams and basic execution functionality."
// )
// }
create<MavenPublication>("exe") {
from(subprojects.first { it.name == "exe" }.components["java"])
groupId = "io.wavebeans"
Expand Down
8 changes: 8 additions & 0 deletions cli/src/main/kotlin/io/wavebeans/cli/WaveBeansCli.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import ch.qos.logback.classic.Level
import ch.qos.logback.classic.LoggerContext
import io.wavebeans.cli.script.RunMode
import io.wavebeans.cli.script.ScriptRunner
import io.wavebeans.fs.local.LocalWbFileDriver
import io.wavebeans.http.WbHttpService
import io.wavebeans.lib.io.WbFileDriver
import io.wavebeans.lib.table.TableRegistry
import io.wavebeans.lib.table.TableRegistryImpl
import org.apache.commons.cli.CommandLine
Expand Down Expand Up @@ -87,6 +89,12 @@ class WaveBeansCli(
runOptions["httpLocations"] = cli.getRequired(httpCommunicator) { listOf("127.0.0.1:$it") }
}
}

// register local file driver by default
try {
WbFileDriver.registerDriver("file", LocalWbFileDriver)
} catch (ignore: IllegalStateException) {}

val sampleRate = cli.get(s) { it.toFloat() } ?: 44100.0f

val httpWait = cli.get(httpWait) { it.toLong() } ?: 0
Expand Down
3 changes: 2 additions & 1 deletion cli/src/main/kotlin/io/wavebeans/cli/script/ScriptRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.jetbrains.kotlin.scripting.compiler.plugin.impl.KJvmCompiledModuleInM
import java.io.Closeable
import java.io.File
import java.util.concurrent.*
import kotlin.io.path.createTempDirectory
import kotlin.math.absoluteValue
import kotlin.math.log10
import kotlin.reflect.jvm.jvmName
Expand Down Expand Up @@ -80,7 +81,7 @@ class ScriptRunner(
.toList()
val cleanedContent = content.replace(importsRegex, "")

val additionalClassesDir = createTempDir("wavebeans-cli", "").also { it.deleteOnExit() }
val additionalClassesDir = createTempDirectory("wavebeans-cli").toFile().also { it.deleteOnExit() }

val scriptContent = """package io.wavebeans.script

Expand Down
8 changes: 4 additions & 4 deletions cli/src/test/kotlin/io/wavebeans/cli/WaveBeansCliSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,10 @@ class WaveBeansCliSpec : DescribeSpec({
val portRange = createPorts(2)
val facilitators = portRange.map {
Facilitator(
communicatorPort = it,
threadsNumber = 2,
communicatorPort = it,
onServerShutdownTimeoutMillis = 100,
podDiscovery = object : PodDiscovery() {}
podDiscovery = object : PodDiscovery() {},
)
}
facilitators.forEach { it.start() }
Expand Down Expand Up @@ -178,10 +178,10 @@ class WaveBeansCliSpec : DescribeSpec({
val httpCommunicatorPort = findFreePort()
val gardeners = portRange.map {
Facilitator(
communicatorPort = it,
threadsNumber = 2,
communicatorPort = it,
onServerShutdownTimeoutMillis = 100,
podDiscovery = object : PodDiscovery() {}
podDiscovery = object : PodDiscovery() {},
)
}

Expand Down
37 changes: 11 additions & 26 deletions cli/src/test/kotlin/io/wavebeans/cli/script/ScriptRunnerSpec.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import io.kotest.core.spec.style.DescribeSpec
import io.kotest.datatest.withData
import io.wavebeans.execution.PodDiscovery
import io.wavebeans.execution.distributed.Facilitator
import io.wavebeans.execution.distributed.FacilitatorConfig
import io.wavebeans.fs.local.LocalWbFileDriver
import io.wavebeans.lib.WaveBeansClassLoader
import io.wavebeans.lib.io.WbFileDriver
import io.wavebeans.tests.createPorts
import java.io.File
import java.lang.Thread.sleep
Expand All @@ -20,10 +23,13 @@ class ScriptRunnerSpec : DescribeSpec({
val facilitators = portRange
.map {
Facilitator(
communicatorPort = it,
threadsNumber = 2,
communicatorPort = it,
onServerShutdownTimeoutMillis = 100,
podDiscovery = object : PodDiscovery() {}
podDiscovery = object : PodDiscovery() {},
fileSystems = listOf(
FacilitatorConfig.FileSystemDescriptor("file", LocalWbFileDriver::class.java.canonicalName),
)
)
}

Expand Down Expand Up @@ -114,7 +120,7 @@ class ScriptRunnerSpec : DescribeSpec({
assertThat(runner.interrupt(true), "there was something to interrupt")
.isTrue()

assertThat { runner.awaitForResult(timeout = 100) }
assertThat(runCatching { runner.awaitForResult(timeout = 100) })
.isFailure()
.isInstanceOf(CancellationException::class)

Expand All @@ -132,7 +138,7 @@ class ScriptRunnerSpec : DescribeSpec({
noSuchMethod()
""".trimIndent()

assertThat { mode.eval(script) }
assertThat(runCatching { mode.eval(script) })
.isFailure()
.message().isNotNull().contains("noSuchMethod")
}
Expand Down Expand Up @@ -190,31 +196,10 @@ class ScriptRunnerSpec : DescribeSpec({
}
}

context("Defining function as class") {
withData(modes) { mode ->
val script = """
class InputFn: Fn<Pair<Long, Float>, Sample?>() {
override fun apply(argument: Pair<Long, Float>): Sample? {
return sampleOf(argument.first)
}
}

input(InputFn())
.map { it }
.trim(1)
.toDevNull()
.out()
""".trimIndent()

assertThat(mode.eval(script)).isNull()
}

}

context("Defining function as lambda") {
withData(modes) { mode ->
val script = """
input { (i, _) -> sampleOf(i) }
input { i, _ -> sampleOf(i) }
.map { it }
.trim(1)
.toDevNull()
Expand Down
Loading
Loading