Skip to content

Commit

Permalink
Use the new AwakeableIdentifier format
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Aug 15, 2023
1 parent 00a3cb3 commit ab4b604
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package dev.restate.sdk.blocking;

import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.sdk.core.syscalls.DeferredResult;
import dev.restate.sdk.core.syscalls.Syscalls;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -14,22 +13,22 @@
*
* <p>For example you can send a Kafka record including the {@link Awakeable#id()}, and then let
* another service consume from Kafka the responses of given external system interaction by using
* {@link RestateContext#awakeableHandle(AwakeableIdentifier)}.
* {@link RestateContext#awakeableHandle(String)}.
*/
@NotThreadSafe
public final class Awakeable<T> extends Awaitable<T> {

private final AwakeableIdentifier identifier;
private final String identifier;

Awakeable(Syscalls syscalls, DeferredResult<T> deferredResult, AwakeableIdentifier identifier) {
Awakeable(Syscalls syscalls, DeferredResult<T> deferredResult, String identifier) {
super(syscalls, deferredResult);
this.identifier = identifier;
}

/**
* @return the unique identifier of this {@link Awakeable} instance.
*/
public AwakeableIdentifier id() {
public String id() {
return identifier;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.sdk.blocking;

import com.google.protobuf.MessageLite;
import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.sdk.core.StateKey;
import dev.restate.sdk.core.TypeTag;
import dev.restate.sdk.core.serde.Serde;
Expand Down Expand Up @@ -128,7 +127,7 @@ default <T> Awakeable<T> awakeable(Class<T> type) {
* <p>You can use this feature to implement external asynchronous systems interactions, for
* example you can send a Kafka record including the {@link Awakeable#id()}, and then let another
* service consume from Kafka the responses of given external system interaction by using {@link
* #awakeableHandle(AwakeableIdentifier)}.
* #awakeableHandle(String)}.
*
* @param typeTag the response type tag to use for deserializing
* @return the result value of the external system interaction
Expand All @@ -137,10 +136,10 @@ default <T> Awakeable<T> awakeable(Class<T> type) {
<T> Awakeable<T> awakeable(TypeTag<T> typeTag);

/**
* Create a new {@link AwakeableHandle} for the provided {@link AwakeableIdentifier}. You can use
* it to {@link AwakeableHandle#complete(TypeTag, Object)} the linked {@link Awakeable}.
* Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link
* AwakeableHandle#complete(TypeTag, Object)} the linked {@link Awakeable}.
*
* @see Awakeable
*/
AwakeableHandle awakeableHandle(AwakeableIdentifier id);
AwakeableHandle awakeableHandle(String id);
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.sdk.blocking;

import com.google.protobuf.MessageLite;
import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.sdk.core.StateKey;
import dev.restate.sdk.core.TypeTag;
import dev.restate.sdk.core.syscalls.*;
Expand Down Expand Up @@ -144,14 +143,14 @@ public void onCancel(@Nullable Throwable t) {
@Override
public <T> Awakeable<T> awakeable(TypeTag<T> typeTag) throws StatusRuntimeException {
// Retrieve the awakeable
Map.Entry<AwakeableIdentifier, DeferredResult<T>> awakeable =
Map.Entry<String, DeferredResult<T>> awakeable =
Util.blockOnSyscall(cb -> syscalls.awakeable(typeTag, cb));

return new Awakeable<>(syscalls, awakeable.getValue(), awakeable.getKey());
}

@Override
public AwakeableHandle awakeableHandle(AwakeableIdentifier id) {
public AwakeableHandle awakeableHandle(String id) {
return new AwakeableHandle() {
@Override
public <T> void complete(TypeTag<T> typeTag, @Nonnull T payload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.MessageLite;
import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.sdk.core.TypeTag;
import dev.restate.sdk.core.syscalls.*;
import io.grpc.MethodDescriptor;
Expand Down Expand Up @@ -164,17 +163,13 @@ public void exitSideEffectBlockWithException(

@Override
public <T> void awakeable(
TypeTag<T> typeTag,
SyscallCallback<Map.Entry<AwakeableIdentifier, DeferredResult<T>>> callback) {
TypeTag<T> typeTag, SyscallCallback<Map.Entry<String, DeferredResult<T>>> callback) {
syscallsExecutor.execute(() -> syscalls.awakeable(typeTag, callback));
}

@Override
public <T> void completeAwakeable(
AwakeableIdentifier id,
TypeTag<T> typeTag,
@Nonnull T payload,
SyscallCallback<Void> requestCallback) {
String id, TypeTag<T> typeTag, @Nonnull T payload, SyscallCallback<Void> requestCallback) {
syscallsExecutor.execute(
() -> syscalls.completeAwakeable(id, typeTag, payload, requestCallback));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static dev.restate.sdk.core.impl.Util.toProtocolFailure;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.generated.sdk.java.Java;
Expand Down Expand Up @@ -222,30 +223,40 @@ public void exitSideEffectBlockWithException(

@Override
public <T> void awakeable(
TypeTag<T> typeTag,
SyscallCallback<Map.Entry<AwakeableIdentifier, DeferredResult<T>>> callback) {
TypeTag<T> typeTag, SyscallCallback<Map.Entry<String, DeferredResult<T>>> callback) {
LOG.trace("callback");
this.stateMachine.processCompletableJournalEntry(
Protocol.AwakeableEntryMessage.getDefaultInstance(),
new AwakeableEntry<>(serdeDeserializer(typeTag)),
SyscallCallback.mappingTo(
deferredResult ->
new AbstractMap.SimpleImmutableEntry<>(
AwakeableIdentifier.newBuilder()
.setServiceName(stateMachine.getServiceName())
.setInstanceKey(stateMachine.getInstanceKey())
.setInvocationId(stateMachine.getInvocationId())
.setEntryIndex(
((SingleDeferredResultInternal<T>) deferredResult).entryIndex())
.build(),
Base64.getUrlEncoder()
.encodeToString(
AwakeableIdentifier.newBuilder()
.setServiceName(stateMachine.getServiceName())
.setInstanceKey(stateMachine.getInstanceKey())
.setInvocationId(stateMachine.getInvocationId())
.setEntryIndex(
((SingleDeferredResultInternal<T>) deferredResult).entryIndex())
.build()
.toByteArray()),
deferredResult),
callback));
}

@Override
public <T> void completeAwakeable(
AwakeableIdentifier id, TypeTag<T> ty, @Nonnull T payload, SyscallCallback<Void> callback) {
String serializedId, TypeTag<T> ty, @Nonnull T payload, SyscallCallback<Void> callback) {
LOG.trace("completeAwakeable");

Protocol.AwakeableIdentifier id;
try {
id = Protocol.AwakeableIdentifier.parseFrom(Base64.getUrlDecoder().decode(serializedId));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Cannot decode AwakeableIdentifier", e);
}

Objects.requireNonNull(payload);
ByteString serialized = serialize(ty, payload);

Expand All @@ -255,7 +266,7 @@ public <T> void completeAwakeable(
.setInstanceKey(id.getInstanceKey())
.setInvocationId(id.getInvocationId())
.setEntryIndex(id.getEntryIndex())
.setPayload(serialized)
.setValue(serialized)
.build();
this.stateMachine.processJournalEntry(expectedEntry, CompleteAwakeableEntry.INSTANCE, callback);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.MessageLite;
import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.sdk.core.TypeTag;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
Expand Down Expand Up @@ -78,14 +77,10 @@ void exitSideEffectBlockWithException(
Throwable toWrite, ExitSideEffectSyscallCallback<?> callback);

<T> void awakeable(
TypeTag<T> typeTag,
SyscallCallback<Map.Entry<AwakeableIdentifier, DeferredResult<T>>> callback);
TypeTag<T> typeTag, SyscallCallback<Map.Entry<String, DeferredResult<T>>> callback);

<T> void completeAwakeable(
AwakeableIdentifier id,
TypeTag<T> ty,
@Nonnull T payload,
SyscallCallback<Void> requestCallback);
String id, TypeTag<T> ty, @Nonnull T payload, SyscallCallback<Void> requestCallback);

// ----- Deferred

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package dev.restate.sdk.kotlin

import dev.restate.generated.core.AwakeableIdentifier
import dev.restate.sdk.core.TypeTag
import dev.restate.sdk.core.syscalls.DeferredResult
import dev.restate.sdk.core.syscalls.Syscalls
Expand All @@ -12,7 +11,7 @@ sealed interface Awaitable<T> {
}

sealed interface Awakeable<T> : Awaitable<T> {
val id: AwakeableIdentifier
val id: String
}

internal abstract class BaseAwaitableImpl<JAVA_T, KT_T>
Expand Down Expand Up @@ -61,11 +60,10 @@ internal class AwakeableImpl<T>
internal constructor(
syscalls: Syscalls,
deferredResult: DeferredResult<T>,
override val id: AwakeableIdentifier
override val id: String
) : NonNullAwaitableImpl<T>(syscalls, deferredResult), Awakeable<T>

internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: AwakeableIdentifier) :
AwakeableHandle {
internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: String) : AwakeableHandle {
override suspend fun <T : Any> complete(typeTag: TypeTag<T>, payload: T) {
return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.completeAwakeable(id, typeTag, payload, completingUnitContinuation(cont))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.sdk.kotlin

import com.google.protobuf.MessageLite
import dev.restate.generated.core.AwakeableIdentifier
import dev.restate.sdk.core.StateKey
import dev.restate.sdk.core.TypeTag
import io.grpc.MethodDescriptor
Expand Down Expand Up @@ -55,7 +54,7 @@ sealed interface RestateContext {

suspend fun <T> awakeable(typeTag: TypeTag<T>): Awakeable<T>

fun awakeableHandle(id: AwakeableIdentifier): AwakeableHandle
fun awakeableHandle(id: String): AwakeableHandle
}

sealed interface AwakeableHandle {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.sdk.kotlin

import com.google.protobuf.MessageLite
import dev.restate.generated.core.AwakeableIdentifier
import dev.restate.sdk.core.*
import dev.restate.sdk.core.syscalls.DeferredResult
import dev.restate.sdk.core.syscalls.EnterSideEffectSyscallCallback
Expand Down Expand Up @@ -158,14 +157,14 @@ internal class RestateContextImpl internal constructor(private val syscalls: Sys
override suspend fun <T> awakeable(typeTag: TypeTag<T>): Awakeable<T> {
val (aid, deferredResult) =
suspendCancellableCoroutine {
cont: CancellableContinuation<Map.Entry<AwakeableIdentifier, DeferredResult<T>>> ->
cont: CancellableContinuation<Map.Entry<String, DeferredResult<T>>> ->
syscalls.awakeable(typeTag, completingContinuation(cont))
}

return AwakeableImpl(syscalls, deferredResult, aid)
}

override fun awakeableHandle(id: AwakeableIdentifier): AwakeableHandle {
override fun awakeableHandle(id: String): AwakeableHandle {
return AwakeableHandleImpl(syscalls, id)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public void handleAwakeableEntryMessage() {
routeMessage(
Protocol.CompletionMessage.newBuilder()
.setEntryIndex(completeAwakeMsg.getEntryIndex())
.setValue(completeAwakeMsg.getPayload())
.setValue(completeAwakeMsg.getValue())
.build());
return null;
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.sdk.testing.services;

import com.google.protobuf.Empty;
import dev.restate.generated.core.AwakeableIdentifier;
import dev.restate.sdk.blocking.AwakeableHandle;
import dev.restate.sdk.blocking.RestateBlockingService;
import dev.restate.sdk.blocking.RestateContext;
Expand All @@ -21,15 +20,8 @@ public class AwakeService extends AwakeServiceGrpc.AwakeServiceImplBase
public void awake(AwakeServiceRequest request, StreamObserver<Empty> responseObserver) {
LOG.debug("Executing the GreeterTwo.awakeTheOtherService method");
RestateContext ctx = restateContext();
AwakeableIdentifier identifier =
AwakeableIdentifier.newBuilder()
.setServiceName(request.getServiceName())
.setInstanceKey(request.getInstanceKey())
.setInvocationId(request.getInvocationId())
.setEntryIndex(request.getEntryIndex())
.build();

AwakeableHandle awakeableHandle = ctx.awakeableHandle(identifier);
AwakeableHandle awakeableHandle = ctx.awakeableHandle(request.getId());
awakeableHandle.complete(TypeTag.STRING_UTF8, "Wake up!");

responseObserver.onNext(Empty.getDefaultInstance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,7 @@ public void sleepAndGetWokenUp(
Awakeable<String> a1 = ctx.awakeable(TypeTag.STRING_UTF8);

// Tell GreeterTwo to wake us up with the awakeable identifier.
AwakeServiceRequest info =
AwakeServiceRequest.newBuilder()
.setServiceName(a1.id().getServiceName())
.setInstanceKey(a1.id().getInstanceKey())
.setEntryIndex(a1.id().getEntryIndex())
.setInvocationId(a1.id().getInvocationId())
.build();
AwakeServiceRequest info = AwakeServiceRequest.newBuilder().setId(a1.id()).build();
ctx.backgroundCall(AwakeServiceGrpc.getAwakeMethod(), info);

// Suspend until GreeterTwo wakes us up.
Expand Down
5 changes: 1 addition & 4 deletions sdk-testing/src/test/proto/awake-service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,5 @@ service AwakeService {
}

message AwakeServiceRequest {
string service_name = 1;
bytes instance_key = 2;
bytes invocation_id = 3;
uint32 entry_index = 4;
string id = 1;
}

0 comments on commit ab4b604

Please sign in to comment.