Skip to content

Commit

Permalink
Handler API (#227)
Browse files Browse the repository at this point in the history
* Codegen for Handler API
* Reuse code between workflow and sdk-api
* Use ServiceLoader to load the ServiceAdapter
* Add explicit key passing
* Put in place basic stuff for the new manifest
* Renamings all around.
  • Loading branch information
slinkydeveloper authored Feb 27, 2024
1 parent b8f05e2 commit bb0612e
Show file tree
Hide file tree
Showing 112 changed files with 2,936 additions and 924 deletions.
10 changes: 5 additions & 5 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ For a sample project configuration and more elaborated examples, check out the t

Available examples:

* [`Counter`](src/main/java/dev/restate/sdk/examples/Counter.java): Shows a simple service using state primitives.
* [`Counter`](src/main/java/dev/restate/sdk/examples/Counter.java): Shows a simple virtual object using state primitives.
* [`VanillaGrpcCounter`](src/main/java/dev/restate/sdk/examples/VanillaGrpcCounter.java): Same as `Counter` but using the vanilla gRPC code generator output.
* [`CounterKt`](src/main/kotlin/dev/restate/sdk/examples/CounterKt.kt): Same as `Counter` but using Kotlin.

Expand All @@ -22,11 +22,11 @@ You'll find the shadowed jar in the `build` directory.

The class to configure in Lambda is `dev.restate.sdk.examples.LambdaHandler`.

By default, the [`dev.restate.sdk.examples.Counter`](src/main/java/dev/restate/sdk/examples/Counter.java) service is deployed. Set the env variable `LAMBDA_FACTORY_SERVICE_CLASS` to one of the available example classes to change the deployed class.
By default, the [`dev.restate.sdk.examples.Counter`](src/main/java/dev/restate/sdk/examples/Counter.java) component is deployed. Set the env variable `LAMBDA_FACTORY_SERVICE_CLASS` to one of the available example classes to change the deployed class.

## Running the examples (HTTP)

You can run the Java counter service via:
You can run the Java counter component via:

```shell
./gradlew :examples:run
Expand All @@ -38,9 +38,9 @@ You can modify the class to run setting `-PmainClass=<FQCN>`, for example, in or
./gradlew :examples:run -PmainClass=dev.restate.sdk.examples.CounterKt
```

## Invoking the counter service
## Invoking the counter component

If you want to invoke the counter service via [grpcurl](https://github.com/fullstorydev/grpcurl):
If you want to invoke the counter component via [grpcurl](https://github.com/fullstorydev/grpcurl):

```shell
grpcurl -plaintext -d '{"counter_name": "my_counter"}' localhost:9090 counter.Counter/Get
Expand Down
10 changes: 5 additions & 5 deletions examples/src/main/java/dev/restate/sdk/examples/Counter.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.examples;

import dev.restate.sdk.KeyedContext;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.examples.generated.*;
Expand All @@ -23,26 +23,26 @@ public class Counter extends CounterRestate.CounterRestateImplBase {
private static final StateKey<Long> TOTAL = StateKey.of("total", CoreSerdes.JSON_LONG);

@Override
public void reset(KeyedContext ctx, CounterRequest request) {
public void reset(ObjectContext ctx, CounterRequest request) {
ctx.clear(TOTAL);
}

@Override
public void add(KeyedContext ctx, CounterAddRequest request) {
public void add(ObjectContext ctx, CounterAddRequest request) {
long currentValue = ctx.get(TOTAL).orElse(0L);
long newValue = currentValue + request.getValue();
ctx.set(TOTAL, newValue);
}

@Override
public GetResponse get(KeyedContext ctx, CounterRequest request) {
public GetResponse get(ObjectContext ctx, CounterRequest request) {
long currentValue = ctx.get(TOTAL).orElse(0L);

return GetResponse.newBuilder().setValue(currentValue).build();
}

@Override
public CounterUpdateResult getAndAdd(KeyedContext ctx, CounterAddRequest request) {
public CounterUpdateResult getAndAdd(ObjectContext ctx, CounterAddRequest request) {
LOG.info("Invoked get and add with " + request.getValue());

long currentValue = ctx.get(TOTAL).orElse(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package dev.restate.sdk.examples;

import com.google.protobuf.Empty;
import dev.restate.sdk.KeyedContext;
import dev.restate.sdk.RestateService;
import dev.restate.sdk.Component;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.examples.generated.*;
Expand All @@ -19,23 +19,23 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class VanillaGrpcCounter extends CounterGrpc.CounterImplBase implements RestateService {
public class VanillaGrpcCounter extends CounterGrpc.CounterImplBase implements Component {

private static final Logger LOG = LogManager.getLogger(VanillaGrpcCounter.class);

private static final StateKey<Long> TOTAL = StateKey.of("total", CoreSerdes.JSON_LONG);

@Override
public void reset(CounterRequest request, StreamObserver<Empty> responseObserver) {
KeyedContext.current().clear(TOTAL);
ObjectContext.current().clear(TOTAL);

responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}

@Override
public void add(CounterAddRequest request, StreamObserver<Empty> responseObserver) {
KeyedContext ctx = KeyedContext.current();
ObjectContext ctx = ObjectContext.current();

long currentValue = ctx.get(TOTAL).orElse(0L);
long newValue = currentValue + request.getValue();
Expand All @@ -47,7 +47,7 @@ public void add(CounterAddRequest request, StreamObserver<Empty> responseObserve

@Override
public void get(CounterRequest request, StreamObserver<GetResponse> responseObserver) {
long currentValue = KeyedContext.current().get(TOTAL).orElse(0L);
long currentValue = ObjectContext.current().get(TOTAL).orElse(0L);

responseObserver.onNext(GetResponse.newBuilder().setValue(currentValue).build());
responseObserver.onCompleted();
Expand All @@ -58,7 +58,7 @@ public void getAndAdd(
CounterAddRequest request, StreamObserver<CounterUpdateResult> responseObserver) {
LOG.info("Invoked get and add with " + request.getValue());

KeyedContext ctx = KeyedContext.current();
ObjectContext ctx = ObjectContext.current();

long currentValue = ctx.get(TOTAL).orElse(0L);
long newValue = currentValue + request.getValue();
Expand Down
76 changes: 76 additions & 0 deletions examples/src/main/java/my/restate/sdk/examples/Counter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package my.restate.sdk.examples;

import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.annotation.Handler;
import dev.restate.sdk.annotation.VirtualObject;
import dev.restate.sdk.common.CoreSerdes;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@VirtualObject
public class Counter {

private static final Logger LOG = LogManager.getLogger(Counter.class);

private static final StateKey<Long> TOTAL = StateKey.of("total", CoreSerdes.JSON_LONG);

@Handler
public void reset(ObjectContext ctx) {
ctx.clearAll();
}

@Handler
public void add(ObjectContext ctx, Long request) {
long currentValue = ctx.get(TOTAL).orElse(0L);
long newValue = currentValue + request;
ctx.set(TOTAL, newValue);
}

@Handler
public Long get(ObjectContext ctx) {
return ctx.get(TOTAL).orElse(0L);
}

@Handler
public CounterUpdateResult getAndAdd(ObjectContext ctx, Long request) {
LOG.info("Invoked get and add with " + request);

long currentValue = ctx.get(TOTAL).orElse(0L);
long newValue = currentValue + request;
ctx.set(TOTAL, newValue);

return new CounterUpdateResult(newValue, currentValue);
}

public static void main(String[] args) {
RestateHttpEndpointBuilder.builder().with(new Counter()).buildAndListen();
}

public static class CounterUpdateResult {
private final Long newValue;
private final Long oldValue;

public CounterUpdateResult(Long newValue, Long oldValue) {
this.newValue = newValue;
this.oldValue = oldValue;
}

public Long getNewValue() {
return newValue;
}

public Long getOldValue() {
return oldValue;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import dev.restate.sdk.Context;
import dev.restate.sdk.annotation.Service;
import dev.restate.sdk.annotation.ServiceType;
import dev.restate.sdk.annotation.Shared;
import dev.restate.sdk.annotation.Workflow;
import dev.restate.sdk.common.CoreSerdes;
Expand All @@ -36,7 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

@Service(ServiceType.WORKFLOW)
@Workflow
public class LoanWorkflow {

// --- Data types used by the Loan Worfklow
Expand Down Expand Up @@ -176,7 +174,8 @@ public static void main(String[] args) {
// To invoke the workflow:
Channel restateChannel =
NettyChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext().build();
LoanWorkflowExternalClient client = new LoanWorkflowExternalClient(restateChannel, "my-loan");
LoanWorkflowClient.IngressClient client =
LoanWorkflowClient.fromIngress(restateChannel, "my-loan");

WorkflowExecutionState state =
client.submit(
Expand Down
12 changes: 6 additions & 6 deletions examples/src/main/kotlin/dev/restate/sdk/examples/CounterKt.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ package dev.restate.sdk.examples
import dev.restate.sdk.common.StateKey
import dev.restate.sdk.examples.generated.*
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder
import dev.restate.sdk.kotlin.KeyedContext
import dev.restate.sdk.kotlin.KtSerdes
import dev.restate.sdk.kotlin.ObjectContext
import org.apache.logging.log4j.LogManager

class CounterKt : CounterRestateKt.CounterRestateKtImplBase() {
Expand All @@ -21,20 +21,20 @@ class CounterKt : CounterRestateKt.CounterRestateKtImplBase() {

private val TOTAL = StateKey.of<Long>("total", KtSerdes.json())

override suspend fun reset(context: KeyedContext, request: CounterRequest) {
override suspend fun reset(context: ObjectContext, request: CounterRequest) {
context.clear(TOTAL)
}

override suspend fun add(context: KeyedContext, request: CounterAddRequest) {
override suspend fun add(context: ObjectContext, request: CounterAddRequest) {
updateCounter(context, request.value)
}

override suspend fun get(context: KeyedContext, request: CounterRequest): GetResponse {
override suspend fun get(context: ObjectContext, request: CounterRequest): GetResponse {
return getResponse { value = context.get(TOTAL) ?: 0L }
}

override suspend fun getAndAdd(
context: KeyedContext,
context: ObjectContext,
request: CounterAddRequest
): CounterUpdateResult {
LOG.info("Invoked get and add with " + request.value)
Expand All @@ -45,7 +45,7 @@ class CounterKt : CounterRestateKt.CounterRestateKtImplBase() {
}
}

private suspend fun updateCounter(context: KeyedContext, add: Long): Pair<Long, Long> {
private suspend fun updateCounter(context: ObjectContext, add: Long): Pair<Long, Long> {
val currentValue = context.get(TOTAL) ?: 0L
val newValue = currentValue + add

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ private ServiceContext buildServiceContext(
serviceContext.contextType =
serviceProto.getOptions().getExtension(Ext.serviceType) == ServiceType.UNKEYED
? "Context"
: "KeyedContext";
: "ObjectContext";

// Resolve javadoc
DescriptorProtos.SourceCodeInfo.Location serviceLocation =
Expand Down
20 changes: 10 additions & 10 deletions protoc-gen-restate/src/main/resources/javaStub.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package {{packageName}};
{{/packageName}}

import dev.restate.sdk.Context;
import dev.restate.sdk.KeyedContext;
import dev.restate.sdk.ObjectContext;
import dev.restate.sdk.Awaitable;
import dev.restate.sdk.common.syscalls.Syscalls;
import java.time.Duration;
Expand All @@ -16,7 +16,7 @@ public class {{className}} {
private {{className}}() {}

/**
* Create a new client from the given {@link KeyedContext}.
* Create a new client from the given {@link ObjectContext}.
*/
public static {{serviceName}}RestateClient newClient(Context ctx) {
return new {{serviceName}}RestateClient(ctx);
Expand Down Expand Up @@ -87,7 +87,7 @@ public class {{className}} {
}

{{{apidoc}}}
public static abstract class {{serviceName}}RestateImplBase implements dev.restate.sdk.RestateService {
public static abstract class {{serviceName}}RestateImplBase implements dev.restate.sdk.Component {
{{#methods}}
{{#deprecated}}
Expand All @@ -114,34 +114,34 @@ public class {{className}} {
private static final class HandlerAdapter<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp> {
private final java.util.function.BiFunction<KeyedContext, Req, Resp> handler;
private final java.util.function.BiFunction<ObjectContext, Req, Resp> handler;
private HandlerAdapter(java.util.function.BiFunction<KeyedContext, Req, Resp> handler) {
private HandlerAdapter(java.util.function.BiFunction<ObjectContext, Req, Resp> handler) {
this.handler = handler;
}

@Override
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
responseObserver.onNext(handler.apply(KeyedContext.fromSyscalls(Syscalls.current()), request));
responseObserver.onNext(handler.apply(ObjectContext.fromSyscalls(Syscalls.current()), request));
responseObserver.onCompleted();
}

private static <Req, Resp> HandlerAdapter<Req, Resp> of(java.util.function.BiFunction<KeyedContext, Req, Resp> handler) {
private static <Req, Resp> HandlerAdapter<Req, Resp> of(java.util.function.BiFunction<ObjectContext, Req, Resp> handler) {
return new HandlerAdapter<>(handler);
}

private static <Resp> HandlerAdapter<com.google.protobuf.Empty, Resp> of(java.util.function.Function<KeyedContext, Resp> handler) {
private static <Resp> HandlerAdapter<com.google.protobuf.Empty, Resp> of(java.util.function.Function<ObjectContext, Resp> handler) {
return new HandlerAdapter<>((ctx, e) -> handler.apply(ctx));
}

private static <Req> HandlerAdapter<Req, com.google.protobuf.Empty> of(java.util.function.BiConsumer<KeyedContext, Req> handler) {
private static <Req> HandlerAdapter<Req, com.google.protobuf.Empty> of(java.util.function.BiConsumer<ObjectContext, Req> handler) {
return new HandlerAdapter<>((ctx, req) -> {
handler.accept(ctx, req);
return com.google.protobuf.Empty.getDefaultInstance();
});
}

private static HandlerAdapter<com.google.protobuf.Empty, com.google.protobuf.Empty> of(java.util.function.Consumer<KeyedContext> handler) {
private static HandlerAdapter<com.google.protobuf.Empty, com.google.protobuf.Empty> of(java.util.function.Consumer<ObjectContext> handler) {
return new HandlerAdapter<>((ctx, req) -> {
handler.accept(ctx);
return com.google.protobuf.Empty.getDefaultInstance();
Expand Down
Loading

0 comments on commit bb0612e

Please sign in to comment.