Simple native RPC with higher-order function support.
Inspired by @altavir and Communicator.
- λRPC does not use standalone declarations (IDL) or code generation, so λRPC declarations are native to the user's code. Instead, it works with library-specific data structures and default or custom serializers.
- λRPC functions can receive and return other functions as first-class objects. A passed lambda is executed on the client side, so it can easily capture state (even mutable state) and be "sent" to a process written in another language.
Suppose we have code that loads data and fits a machine learning model (represented, for example, in the ONNX format). It also calls a given function on each epoch and continues learning while that function returns true.
```kotlin
typealias DataLoader = suspend () -> Data

suspend fun fit(
    model: Model, loader: DataLoader,
    continueLearning: suspend (Epoch, Metric) -> Boolean
): Model
```
To turn this code into a service, just add facade declarations and a main function.
A declaration is a collection of coders for all function arguments and for the result. A coder is a generalization of a data serializer and the specific way λRPC works with functions as first-class objects (discussed later).
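To make the coder idea concrete, here is a minimal sketch of what such an abstraction could look like. The `Coder` interface and `IntCoder` object below are illustrative inventions for this sketch, not λRPC's actual API; the real library also covers functions and uses its own wire format.

```kotlin
// Hypothetical simplification: only the data-serialization side of a coder.
interface Coder<T> {
    fun encode(value: T): String
    fun decode(encoded: String): T
}

// A trivial coder for Int, analogous in spirit to a custom data coder
// like ModelCoder in the example below.
object IntCoder : Coder<Int> {
    override fun encode(value: Int): String = value.toString()
    override fun decode(encoded: String): Int = encoded.toInt()
}

fun main() {
    val wire = IntCoder.encode(42) // "42" on the wire
    println(IntCoder.decode(wire)) // prints 42
}
```

A declaration then simply bundles one such coder per argument plus one for the result.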
```kotlin
val loader = f( // Coder for the function type: suspend () -> Data
    j<Data>() // kotlinx.serialization JSON coder for the @Serializable Data
)

val fit by mlServiceId.def( // Declaration for the fit function
    ModelCoder, // Model may not be @Serializable, so λRPC allows custom data coders
    loader, f(j<Epoch>(), j<Metric>(), j<Boolean>()), // Coder for suspend (Epoch, Metric) -> Boolean
    ModelCoder
)
```
```kotlin
fun main() {
    // A service that looks like a library is called a libservice
    val service = LibService(mlServiceId, mlEndpoint) {
        fit of ::fit // Bind the declaration to its implementation
    }
    service.start()
    service.awaitTermination()
}
```
To make calls from the client, add a ServiceDispatcher
to the coroutine context and just invoke the declaration:
```kotlin
// Import the libservice facade like a common library
import org.mlservice.facade.fit
import org.dataservice.facade.dataloader

val serviceDispatcher = ServiceDispatcher(
    mlServiceId to mlEndpoint, // Endpoint of the service with a GPU for fitting
    dataServiceId to dataEndpoint // Endpoint of the service that provides the data
)
```
```kotlin
fun main() = runBlocking(serviceDispatcher) {
    // Keep track of the loss function values
    val history = mutableListOf<Metric>()
    var lastEpoch = 0
    val rawModel = Model(layers = 1050)
    // Bind dataloader to dataEndpoint, so mlservice communicates directly
    // with the dataservice on dataEndpoint without the client in the middle
    val boundLoader = dataloader.toBound()
    val model = fit(rawModel, boundLoader) { epoch, metric ->
        // The lambda is executed on the client side -- the λRPC magic
        println("Epoch = $epoch, metric = $metric")
        val continueLearning = if (epoch < 300) true else {
            val max = history.takeLast(50).maxOf { it }
            metric < max
        }
        lastEpoch = epoch
        history += metric
        continueLearning
    }
    println("Learning finished! Epoch = $lastEpoch, metric = ${history.last()}")
    model.save("my/experiments")
}
```
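The early-stopping rule inside `continueLearning` above can be factored into a plain function and tested independently of λRPC. The `shouldContinue` helper below is illustrative, not part of the library (metrics are modeled as `Double` for simplicity):

```kotlin
// Continue unconditionally for the first 300 epochs; afterwards continue
// only while the metric is still below the worst of the last 50 values.
fun shouldContinue(epoch: Int, metric: Double, history: List<Double>): Boolean =
    epoch < 300 || metric < history.takeLast(50).maxOf { it }

fun main() {
    println(shouldContinue(10, 1.0, emptyList()))       // true: warm-up phase
    println(shouldContinue(301, 1.0, listOf(2.0, 3.0))) // true: still improving
    println(shouldContinue(301, 4.0, listOf(2.0, 3.0))) // false: metric got worse
}
```

Because the whole rule lives in a client-side closure, it can be changed, or made interactive, without touching the service.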
λRPC does not serialize lambdas; it executes them on the client side. Thus closures, even ones with mutable state, can be passed to services (and the services themselves can be written in other languages).
```kotlin
// Service
suspend fun f(g: suspend (Int) -> Int) = g(5) + 1

// Client
val m = 36
f { it + m }
```
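Since the lambda runs in the client process, it can freely mutate client state between calls. The self-contained sketch below mimics that behavior with a local higher-order function standing in for the remote `f` (no networking, non-suspend for brevity; `service` is a stand-in name, not a library API):

```kotlin
// Stand-in for the service function: each call to g models λRPC
// forwarding an invocation back to the client process.
fun service(g: (Int) -> Int) = g(5) + 1

fun main() {
    var calls = 0 // mutable client-side state captured by the closure
    val results = (1..3).map { service { x -> calls++; x + it } }
    println(results) // [7, 8, 9]
    println(calls)   // 3 -- the closure mutated local state on each call
}
```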
- Code execution in different containers or on various hardware (a GPU, for instance).
- Parallel execution of independent tasks.
- Communication with code written in other languages.
- Rerunning subtasks in case of failure, or resuming them from a state snapshot.
- Running tasks that live longer than a single process.
- Microservice architecture.
- Communication protocol simplification:
  - A service function can easily request additional information when needed.
  - Reduced service code duplication: expose a higher-order function and receive the specific operations from the client.
- Interactive computations: the computing function receives a closure as a parameter and calls it periodically, providing computation status information and receiving further directives.
- Security:
  - Send closures that operate on the sensitive data instead of sending the data itself.
  - Provide computational resources as a library of functions parametrized by client lambdas, instead of receiving the client's code and executing it.
- Dynamically choosing the computation location: compute something that uses a large amount of data on the client, or send the data to the server and compute there.
- Load balancing: once a task is finished, request a new one via the client's lambda.
- Stateful streaming computations: nodes provide their lambdas to a mapper.
- The backend part contains the programming-language closure and coders for its arguments and result.
- The frontend part is a callable proxy object that communicates with its backend part on each call and waits for the result.

A frontend function is fully described by a serializable prototype object, so it can be sent to other services as a prototype and recreated there as a callable proxy. λRPC then provides efficient communication with the corresponding backend part.
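The split can be imitated in a few lines of plain Kotlin. The registry, prototype, and proxy below are illustrative toys (single-process, no networking, functions fixed to `(Int) -> Int`), not λRPC's actual classes:

```kotlin
// Backend side: registered closures, addressable by access name.
val registry = mutableMapOf<String, (Int) -> Int>()

// Serializable description of a frontend function: enough to recreate
// a callable proxy anywhere (in λRPC it would travel over the wire).
data class Prototype(val accessName: String)

// Frontend side: a callable proxy that forwards calls to the backend part.
class FrontendFunction(private val prototype: Prototype) : (Int) -> Int {
    override fun invoke(arg: Int): Int =
        registry.getValue(prototype.accessName)(arg) // stands in for a network call
}

fun main() {
    registry["inc"] = { it + 1 }                   // "encode": register a closure
    val proxy = FrontendFunction(Prototype("inc")) // "decode": recreate the proxy
    println(proxy(41)) // prints 42
}
```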
```shell
$ ./gradlew build
$ ./gradlew :lambdarpc-core:slow
$ ./gradlew :lambdarpc-core:dokkaHtml
$ cd ./lambdarpc-core/build/dokka/html
```
- interactive_ml — the README example.

  ```shell
  $ cd LambdaRPC.kt
  $ ./gradlew :examples:interactive-ml:dataservice
  $ ./gradlew :examples:interactive-ml:mlservice
  $ ./gradlew :examples:interactive-ml:client
  ```
- promise_pipeline — an example that shows how to build lazy data-processing pipelines using common λRPC functionality.

  ```shell
  $ cd LambdaRPC.kt
  $ ./gradlew :examples:promise-pipeline:service --args=8090
  $ ./gradlew :examples:promise-pipeline:service --args=8091  # Any number of services on different ports
  $ ./gradlew :examples:promise-pipeline:client --args='8090 8091'  # Ports of all services
  ```
- dsl — a domain-specific language for λRPC library users.
- functions — λRPC functions: backend and frontend parts.
- coding — contains the Coder definition: a thing that can serialize data and work with functions.
  - λRPC provides some default data coders based on kotlinx.serialization, but users can also implement their own.
  - Function encoding saves a language closure as a backend function in the registry under some access name. Function decoding creates a frontend function that communicates with the corresponding backend function.
- service — the libservice implementation.
- transport — service and connection interfaces, internal protobuf serialization for λRPC protocol needs, and extensions and implementations related to the gRPC backend.