diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/Awakeable.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/Awakeable.java index b24e9c12..46ad1a4a 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/Awakeable.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/Awakeable.java @@ -1,6 +1,5 @@ package dev.restate.sdk.blocking; -import dev.restate.generated.core.AwakeableIdentifier; import dev.restate.sdk.core.syscalls.DeferredResult; import dev.restate.sdk.core.syscalls.Syscalls; import javax.annotation.concurrent.NotThreadSafe; @@ -14,14 +13,14 @@ * *

For example you can send a Kafka record including the {@link Awakeable#id()}, and then let * another service consume from Kafka the responses of given external system interaction by using - * {@link RestateContext#awakeableHandle(AwakeableIdentifier)}. + * {@link RestateContext#awakeableHandle(String)}. */ @NotThreadSafe public final class Awakeable extends Awaitable { - private final AwakeableIdentifier identifier; + private final String identifier; - Awakeable(Syscalls syscalls, DeferredResult deferredResult, AwakeableIdentifier identifier) { + Awakeable(Syscalls syscalls, DeferredResult deferredResult, String identifier) { super(syscalls, deferredResult); this.identifier = identifier; } @@ -29,7 +28,7 @@ public final class Awakeable extends Awaitable { /** * @return the unique identifier of this {@link Awakeable} instance. */ - public AwakeableIdentifier id() { + public String id() { return identifier; } } diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java index 2f808ad3..807b87ed 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContext.java @@ -1,7 +1,6 @@ package dev.restate.sdk.blocking; import com.google.protobuf.MessageLite; -import dev.restate.generated.core.AwakeableIdentifier; import dev.restate.sdk.core.StateKey; import dev.restate.sdk.core.TypeTag; import dev.restate.sdk.core.serde.Serde; @@ -128,7 +127,7 @@ default Awakeable awakeable(Class type) { *

You can use this feature to implement external asynchronous systems interactions, for * example you can send a Kafka record including the {@link Awakeable#id()}, and then let another * service consume from Kafka the responses of given external system interaction by using {@link - * #awakeableHandle(AwakeableIdentifier)}. + * #awakeableHandle(String)}. * * @param typeTag the response type tag to use for deserializing * @return the result value of the external system interaction @@ -137,10 +136,10 @@ default Awakeable awakeable(Class type) { Awakeable awakeable(TypeTag typeTag); /** - * Create a new {@link AwakeableHandle} for the provided {@link AwakeableIdentifier}. You can use - * it to {@link AwakeableHandle#complete(TypeTag, Object)} the linked {@link Awakeable}. + * Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link + * AwakeableHandle#complete(TypeTag, Object)} the linked {@link Awakeable}. * * @see Awakeable */ - AwakeableHandle awakeableHandle(AwakeableIdentifier id); + AwakeableHandle awakeableHandle(String id); } diff --git a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java index e8d72365..cb381399 100644 --- a/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java +++ b/sdk-blocking/src/main/java/dev/restate/sdk/blocking/RestateContextImpl.java @@ -1,7 +1,6 @@ package dev.restate.sdk.blocking; import com.google.protobuf.MessageLite; -import dev.restate.generated.core.AwakeableIdentifier; import dev.restate.sdk.core.StateKey; import dev.restate.sdk.core.TypeTag; import dev.restate.sdk.core.syscalls.*; @@ -144,14 +143,14 @@ public void onCancel(@Nullable Throwable t) { @Override public Awakeable awakeable(TypeTag typeTag) throws StatusRuntimeException { // Retrieve the awakeable - Map.Entry> awakeable = + Map.Entry> awakeable = Util.blockOnSyscall(cb -> syscalls.awakeable(typeTag, cb)); return new Awakeable<>(syscalls, awakeable.getValue(), awakeable.getKey()); } @Override - public AwakeableHandle awakeableHandle(AwakeableIdentifier id) { + public AwakeableHandle awakeableHandle(String id) { return new AwakeableHandle() { @Override public void complete(TypeTag typeTag, @Nonnull T payload) { diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java index 9026e09d..b9f549cc 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/ExecutorSwitchingWrappers.java @@ -2,7 +2,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.MessageLite; -import dev.restate.generated.core.AwakeableIdentifier; import dev.restate.sdk.core.TypeTag; import dev.restate.sdk.core.syscalls.*; import io.grpc.MethodDescriptor; @@ -164,17 +163,13 @@ public void exitSideEffectBlockWithException( @Override public void awakeable( - TypeTag typeTag, - SyscallCallback>> callback) { + TypeTag typeTag, SyscallCallback>> callback) { syscallsExecutor.execute(() -> syscalls.awakeable(typeTag, callback)); } @Override public void completeAwakeable( - AwakeableIdentifier id, - TypeTag typeTag, - @Nonnull T payload, - SyscallCallback requestCallback) { + String id, TypeTag typeTag, @Nonnull T payload, SyscallCallback requestCallback) { syscallsExecutor.execute( () -> syscalls.completeAwakeable(id, typeTag, payload, requestCallback)); } diff --git a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java index a9695875..0f288f16 100644 --- a/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java +++ b/sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/SyscallsImpl.java @@ -4,6 +4,7 @@ import static dev.restate.sdk.core.impl.Util.toProtocolFailure; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; import dev.restate.generated.core.AwakeableIdentifier; import dev.restate.generated.sdk.java.Java; @@ -222,8 +223,7 @@ public void exitSideEffectBlockWithException( @Override public void awakeable( - TypeTag typeTag, - SyscallCallback>> callback) { + TypeTag typeTag, SyscallCallback>> callback) { LOG.trace("callback"); this.stateMachine.processCompletableJournalEntry( Protocol.AwakeableEntryMessage.getDefaultInstance(), @@ -231,21 +231,32 @@ public void awakeable( SyscallCallback.mappingTo( deferredResult -> new AbstractMap.SimpleImmutableEntry<>( - AwakeableIdentifier.newBuilder() - .setServiceName(stateMachine.getServiceName()) - .setInstanceKey(stateMachine.getInstanceKey()) - .setInvocationId(stateMachine.getInvocationId()) - .setEntryIndex( - ((SingleDeferredResultInternal) deferredResult).entryIndex()) - .build(), + Base64.getUrlEncoder() + .encodeToString( + AwakeableIdentifier.newBuilder() + .setServiceName(stateMachine.getServiceName()) + .setInstanceKey(stateMachine.getInstanceKey()) + .setInvocationId(stateMachine.getInvocationId()) + .setEntryIndex( + ((SingleDeferredResultInternal) deferredResult).entryIndex()) + .build() + .toByteArray()), deferredResult), callback)); } @Override public void completeAwakeable( - AwakeableIdentifier id, TypeTag ty, @Nonnull T payload, SyscallCallback callback) { + String serializedId, TypeTag ty, @Nonnull T payload, SyscallCallback callback) { LOG.trace("completeAwakeable"); + + Protocol.AwakeableIdentifier id; + try { + id = Protocol.AwakeableIdentifier.parseFrom(Base64.getUrlDecoder().decode(serializedId)); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Cannot decode AwakeableIdentifier", e); + } + Objects.requireNonNull(payload); ByteString serialized = serialize(ty, payload); @@ -255,7 +266,7 @@ public void completeAwakeable( .setInstanceKey(id.getInstanceKey()) .setInvocationId(id.getInvocationId()) .setEntryIndex(id.getEntryIndex()) - .setPayload(serialized) + .setValue(serialized) .build(); this.stateMachine.processJournalEntry(expectedEntry, CompleteAwakeableEntry.INSTANCE, callback); } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java index 9db7649b..c41f01fd 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/syscalls/Syscalls.java @@ -2,7 +2,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.MessageLite; -import dev.restate.generated.core.AwakeableIdentifier; import dev.restate.sdk.core.TypeTag; import io.grpc.Context; import io.grpc.MethodDescriptor; @@ -78,14 +77,10 @@ void exitSideEffectBlockWithException( Throwable toWrite, ExitSideEffectSyscallCallback callback); void awakeable( - TypeTag typeTag, - SyscallCallback>> callback); + TypeTag typeTag, SyscallCallback>> callback); void completeAwakeable( - AwakeableIdentifier id, - TypeTag ty, - @Nonnull T payload, - SyscallCallback requestCallback); + String id, TypeTag ty, @Nonnull T payload, SyscallCallback requestCallback); // ----- Deferred diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt index 71538071..dc8c4db7 100644 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/Awaitables.kt @@ -1,6 +1,5 @@ package dev.restate.sdk.kotlin -import dev.restate.generated.core.AwakeableIdentifier import dev.restate.sdk.core.TypeTag import dev.restate.sdk.core.syscalls.DeferredResult import dev.restate.sdk.core.syscalls.Syscalls @@ -12,7 +11,7 @@ sealed interface Awaitable { } sealed interface Awakeable : Awaitable { - val id: AwakeableIdentifier + val id: String } internal abstract class BaseAwaitableImpl @@ -61,11 +60,10 @@ internal class AwakeableImpl internal constructor( syscalls: Syscalls, deferredResult: DeferredResult, - override val id: AwakeableIdentifier + override val id: String ) : NonNullAwaitableImpl(syscalls, deferredResult), Awakeable -internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: AwakeableIdentifier) : - AwakeableHandle { +internal class AwakeableHandleImpl(val syscalls: Syscalls, val id: String) : AwakeableHandle { override suspend fun complete(typeTag: TypeTag, payload: T) { return suspendCancellableCoroutine { cont: CancellableContinuation -> syscalls.completeAwakeable(id, typeTag, payload, completingUnitContinuation(cont)) diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContext.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContext.kt index 35fd1542..6813d03d 100644 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContext.kt +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContext.kt @@ -1,7 +1,6 @@ package dev.restate.sdk.kotlin import com.google.protobuf.MessageLite -import dev.restate.generated.core.AwakeableIdentifier import dev.restate.sdk.core.StateKey import dev.restate.sdk.core.TypeTag import io.grpc.MethodDescriptor @@ -55,7 +54,7 @@ sealed interface RestateContext { suspend fun awakeable(typeTag: TypeTag): Awakeable - fun awakeableHandle(id: AwakeableIdentifier): AwakeableHandle + fun awakeableHandle(id: String): AwakeableHandle } sealed interface AwakeableHandle { diff --git a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt index baf444be..3cf3e5b1 100644 --- a/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt +++ b/sdk-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RestateContextImpl.kt @@ -1,7 +1,6 @@ package dev.restate.sdk.kotlin import com.google.protobuf.MessageLite -import dev.restate.generated.core.AwakeableIdentifier import dev.restate.sdk.core.* import dev.restate.sdk.core.syscalls.DeferredResult import dev.restate.sdk.core.syscalls.EnterSideEffectSyscallCallback @@ -158,14 +157,14 @@ internal class RestateContextImpl internal constructor(private val syscalls: Sys override suspend fun awakeable(typeTag: TypeTag): Awakeable { val (aid, deferredResult) = suspendCancellableCoroutine { - cont: CancellableContinuation>> -> + cont: CancellableContinuation>> -> syscalls.awakeable(typeTag, completingContinuation(cont)) } return AwakeableImpl(syscalls, deferredResult, aid) } - override fun awakeableHandle(id: AwakeableIdentifier): AwakeableHandle { + override fun awakeableHandle(id: String): AwakeableHandle { return AwakeableHandleImpl(syscalls, id) } } diff --git a/sdk-testing/src/main/java/dev/restate/sdk/testing/TestRestateRuntime.java b/sdk-testing/src/main/java/dev/restate/sdk/testing/TestRestateRuntime.java index 39360361..43de1fba 100644 --- a/sdk-testing/src/main/java/dev/restate/sdk/testing/TestRestateRuntime.java +++ b/sdk-testing/src/main/java/dev/restate/sdk/testing/TestRestateRuntime.java @@ -420,7 +420,7 @@ public void handleAwakeableEntryMessage() { routeMessage( Protocol.CompletionMessage.newBuilder() .setEntryIndex(completeAwakeMsg.getEntryIndex()) - .setValue(completeAwakeMsg.getPayload()) + .setValue(completeAwakeMsg.getValue()) .build()); return null; } else { diff --git a/sdk-testing/src/test/java/dev/restate/sdk/testing/services/AwakeService.java b/sdk-testing/src/test/java/dev/restate/sdk/testing/services/AwakeService.java index 89f00780..469103ca 100644 --- a/sdk-testing/src/test/java/dev/restate/sdk/testing/services/AwakeService.java +++ b/sdk-testing/src/test/java/dev/restate/sdk/testing/services/AwakeService.java @@ -1,7 +1,6 @@ package dev.restate.sdk.testing.services; import com.google.protobuf.Empty; -import dev.restate.generated.core.AwakeableIdentifier; import dev.restate.sdk.blocking.AwakeableHandle; import dev.restate.sdk.blocking.RestateBlockingService; import dev.restate.sdk.blocking.RestateContext; @@ -21,15 +20,8 @@ public class AwakeService extends AwakeServiceGrpc.AwakeServiceImplBase public void awake(AwakeServiceRequest request, StreamObserver responseObserver) { LOG.debug("Executing the GreeterTwo.awakeTheOtherService method"); RestateContext ctx = restateContext(); - AwakeableIdentifier identifier = - AwakeableIdentifier.newBuilder() - .setServiceName(request.getServiceName()) - .setInstanceKey(request.getInstanceKey()) - .setInvocationId(request.getInvocationId()) - .setEntryIndex(request.getEntryIndex()) - .build(); - AwakeableHandle awakeableHandle = ctx.awakeableHandle(identifier); + AwakeableHandle awakeableHandle = ctx.awakeableHandle(request.getId()); awakeableHandle.complete(TypeTag.STRING_UTF8, "Wake up!"); responseObserver.onNext(Empty.getDefaultInstance()); diff --git a/sdk-testing/src/test/java/dev/restate/sdk/testing/services/GreeterOne.java b/sdk-testing/src/test/java/dev/restate/sdk/testing/services/GreeterOne.java index 74999dc4..6472b712 100644 --- a/sdk-testing/src/test/java/dev/restate/sdk/testing/services/GreeterOne.java +++ b/sdk-testing/src/test/java/dev/restate/sdk/testing/services/GreeterOne.java @@ -205,13 +205,7 @@ public void sleepAndGetWokenUp( Awakeable a1 = ctx.awakeable(TypeTag.STRING_UTF8); // Tell GreeterTwo to wake us up with the awakeable identifier. - AwakeServiceRequest info = - AwakeServiceRequest.newBuilder() - .setServiceName(a1.id().getServiceName()) - .setInstanceKey(a1.id().getInstanceKey()) - .setEntryIndex(a1.id().getEntryIndex()) - .setInvocationId(a1.id().getInvocationId()) - .build(); + AwakeServiceRequest info = AwakeServiceRequest.newBuilder().setId(a1.id()).build(); ctx.backgroundCall(AwakeServiceGrpc.getAwakeMethod(), info); // Suspend until GreeterTwo wakes us up. diff --git a/sdk-testing/src/test/proto/awake-service.proto b/sdk-testing/src/test/proto/awake-service.proto index 2548e325..b1329e8a 100644 --- a/sdk-testing/src/test/proto/awake-service.proto +++ b/sdk-testing/src/test/proto/awake-service.proto @@ -16,8 +16,5 @@ service AwakeService { } message AwakeServiceRequest { - string service_name = 1; - bytes instance_key = 2; - bytes invocation_id = 3; - uint32 entry_index = 4; + string id = 1; } \ No newline at end of file