Skip to content

Commit

Permalink
Support named entries, and allow users to set side effects name.
Browse files Browse the repository at this point in the history
This commit adds support in the state machine to propagate additional entry info about "failing entries" in the EndMessage, and allows the user to set the name for side effect entries.
  • Loading branch information
slinkydeveloper committed Apr 16, 2024
1 parent 4868e29 commit 4657365
Show file tree
Hide file tree
Showing 18 changed files with 333 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,16 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}
}

override suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T {
override suspend fun <T : Any?> runBlock(
serde: Serde<T>,
name: String,
block: suspend () -> T
): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteString>>
->
syscalls.enterSideEffectBlock(
name,
object : EnterSideEffectSyscallCallback {
override fun onSuccess(t: ByteString?) {
val deferred: CompletableDeferred<ByteString> = CompletableDeferred()
Expand Down
13 changes: 10 additions & 3 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ sealed interface Context {
* suspension point) without re-executing the closure. Use this feature if you want to perform
* <b>non-deterministic operations</b>.
*
* You can name this closure using the `name` parameter. This name will be available in the
* observability tools.
*
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
* times until it records a result.
*
Expand Down Expand Up @@ -138,11 +141,12 @@ sealed interface Context {
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
*
* @param serde the type tag of the return value, used to serialize/deserialize it.
* @param name the name of the side effect.
* @param block closure to execute.
* @param T type of the return value.
* @return value of the runBlock operation.
*/
suspend fun <T : Any?> runBlock(serde: Serde<T>, block: suspend () -> T): T
suspend fun <T : Any?> runBlock(serde: Serde<T>, name: String = "", block: suspend () -> T): T

/**
* Create an [Awakeable], addressable through [Awakeable.id].
Expand Down Expand Up @@ -221,8 +225,11 @@ sealed interface Context {
* @param T type of the return value.
* @return value of the runBlock operation.
*/
suspend inline fun <reified T : Any> Context.runBlock(noinline block: suspend () -> T): T {
return this.runBlock(KtSerdes.json(), block)
suspend inline fun <reified T : Any> Context.runBlock(
name: String = "",
noinline block: suspend () -> T
): T {
return this.runBlock(KtSerdes.json(), name, block)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ class SideEffectTest : SideEffectTestSuite() {
"Hello $result"
}

override fun namedSideEffect(name: String, sideEffectOutput: String): TestInvocationBuilder =
testDefinitionForService("SideEffect") { ctx, _: Unit ->
val result = ctx.runBlock(name) { sideEffectOutput }
"Hello $result"
}

override fun consecutiveSideEffect(sideEffectOutput: String): TestInvocationBuilder =
testDefinitionForService("ConsecutiveSideEffect") { ctx, _: Unit ->
val firstResult = ctx.runBlock { sideEffectOutput }
Expand Down Expand Up @@ -54,4 +60,9 @@ class SideEffectTest : SideEffectTestSuite() {
ctx.runBlock { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
throw IllegalStateException("This point should not be reached")
}

override fun failingSideEffect(name: String, reason: String): TestInvocationBuilder =
testDefinitionForService<Unit, String>("FailingSideEffect") { ctx, _: Unit ->
ctx.runBlock(name) { throw IllegalStateException(reason) }
}
}
21 changes: 18 additions & 3 deletions sdk-api/src/main/java/dev/restate/sdk/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ default void sleep(Duration duration) {
* suspension point) without re-executing the closure. Use this feature if you want to perform
* <b>non-deterministic operations</b>.
*
* <p>You can name this closure using the {@code name} parameter. This name will be available in
* the observability tools.
*
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
* times until it records a result.
*
Expand Down Expand Up @@ -133,23 +136,35 @@ default void sleep(Duration duration) {
* To propagate run failures to the call-site, make sure to wrap them in {@link
* TerminalException}.
*
* @param name name of the side effect.
* @param serde the type tag of the return value, used to serialize/deserialize it.
* @param action closure to execute.
* @param <T> type of the return value.
* @return value of the run operation.
*/
<T> T run(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException;
<T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException;

/** Like {@link #run(Serde, ThrowingSupplier)}, but without returning a value. */
default void run(ThrowingRunnable runnable) throws TerminalException {
/** Like {@link #run(String, Serde, ThrowingSupplier)}, but without returning a value. */
default void run(String name, ThrowingRunnable runnable) throws TerminalException {
run(
name,
CoreSerdes.VOID,
() -> {
runnable.run();
return null;
});
}

/** Like {@link #run(String, Serde, ThrowingSupplier)}, but without a name. */
default <T> T run(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException {
return run(null, serde, action);
}

/** Like {@link #run(String, ThrowingRunnable)}, but without a name. */
default void run(ThrowingRunnable runnable) throws TerminalException {
run(null, runnable);
}

/**
* Create an {@link Awakeable}, addressable through {@link Awakeable#id()}.
*
Expand Down
3 changes: 2 additions & 1 deletion sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,10 @@ public <T> void send(Target target, Serde<T> inputSerde, T parameter, Duration d
}

@Override
public <T> T run(Serde<T> serde, ThrowingSupplier<T> action) {
public <T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action) {
CompletableFuture<CompletableFuture<ByteString>> enterFut = new CompletableFuture<>();
syscalls.enterSideEffectBlock(
name,
new EnterSideEffectSyscallCallback() {
@Override
public void onNotExecuted() {
Expand Down
32 changes: 32 additions & 0 deletions sdk-api/src/test/java/dev/restate/sdk/SideEffectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

public class SideEffectTest extends SideEffectTestSuite {

@Override
protected TestInvocationBuilder sideEffect(String sideEffectOutput) {
return testDefinitionForService(
"SideEffect",
Expand All @@ -29,6 +30,19 @@ protected TestInvocationBuilder sideEffect(String sideEffectOutput) {
});
}

@Override
protected TestInvocationBuilder namedSideEffect(String name, String sideEffectOutput) {
return testDefinitionForService(
"SideEffect",
CoreSerdes.VOID,
CoreSerdes.JSON_STRING,
(ctx, unused) -> {
String result = ctx.run(name, CoreSerdes.JSON_STRING, () -> sideEffectOutput);
return "Hello " + result;
});
}

@Override
protected TestInvocationBuilder consecutiveSideEffect(String sideEffectOutput) {
return testDefinitionForService(
"ConsecutiveSideEffect",
Expand All @@ -42,6 +56,7 @@ protected TestInvocationBuilder consecutiveSideEffect(String sideEffectOutput) {
});
}

@Override
protected TestInvocationBuilder checkContextSwitching() {
return testDefinitionForService(
"CheckContextSwitching",
Expand All @@ -65,6 +80,7 @@ protected TestInvocationBuilder checkContextSwitching() {
});
}

@Override
protected TestInvocationBuilder sideEffectGuard() {
return testDefinitionForService(
"SideEffectGuard",
Expand All @@ -75,4 +91,20 @@ protected TestInvocationBuilder sideEffectGuard() {
throw new IllegalStateException("This point should not be reached");
});
}

@Override
protected TestInvocationBuilder failingSideEffect(String name, String reason) {
return testDefinitionForService(
"FailingSideEffect",
CoreSerdes.VOID,
CoreSerdes.JSON_STRING,
(ctx, unused) -> {
ctx.run(
name,
() -> {
throw new IllegalStateException(reason);
});
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void send(
@Nullable Duration delay,
SyscallCallback<Void> requestCallback);

void enterSideEffectBlock(EnterSideEffectSyscallCallback callback);
void enterSideEffectBlock(@Nullable String name, EnterSideEffectSyscallCallback callback);

void exitSideEffectBlock(ByteString toWrite, ExitSideEffectSyscallCallback callback);

Expand Down
57 changes: 57 additions & 0 deletions sdk-core/src/main/java/dev/restate/sdk/core/Entries.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ final class Entries {
private Entries() {}

abstract static class JournalEntry<E extends MessageLite> {
abstract String getName(E expected);

void checkEntryHeader(E expected, MessageLite actual) throws ProtocolException {}

abstract void trace(E expected, Span span);
Expand Down Expand Up @@ -57,6 +59,11 @@ static final class OutputEntry extends JournalEntry<OutputEntryMessage> {

private OutputEntry() {}

@Override
String getName(OutputEntryMessage expected) {
return expected.getName();
}

@Override
public void trace(OutputEntryMessage expected, Span span) {
span.addEvent("Output");
Expand All @@ -81,6 +88,11 @@ public boolean hasResult(GetStateEntryMessage actual) {
return actual.getResultCase() != GetStateEntryMessage.ResultCase.RESULT_NOT_SET;
}

@Override
String getName(GetStateEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(GetStateEntryMessage expected, MessageLite actual)
throws ProtocolException {
Expand Down Expand Up @@ -163,6 +175,11 @@ public boolean hasResult(GetStateKeysEntryMessage actual) {
return actual.getResultCase() != GetStateKeysEntryMessage.ResultCase.RESULT_NOT_SET;
}

@Override
String getName(GetStateKeysEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(GetStateKeysEntryMessage expected, MessageLite actual)
throws ProtocolException {
Expand Down Expand Up @@ -232,6 +249,11 @@ public void trace(ClearStateEntryMessage expected, Span span) {
"ClearState", Attributes.of(Tracing.RESTATE_STATE_KEY, expected.getKey().toString()));
}

@Override
String getName(ClearStateEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(ClearStateEntryMessage expected, MessageLite actual)
throws ProtocolException {
Expand All @@ -256,6 +278,11 @@ public void trace(ClearAllStateEntryMessage expected, Span span) {
span.addEvent("ClearAllState");
}

@Override
String getName(ClearAllStateEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(ClearAllStateEntryMessage expected, MessageLite actual)
throws ProtocolException {
Expand All @@ -281,6 +308,11 @@ public void trace(SetStateEntryMessage expected, Span span) {
"SetState", Attributes.of(Tracing.RESTATE_STATE_KEY, expected.getKey().toString()));
}

@Override
String getName(SetStateEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(SetStateEntryMessage expected, MessageLite actual)
throws ProtocolException {
Expand All @@ -305,6 +337,11 @@ static final class SleepEntry extends CompletableJournalEntry<SleepEntryMessage,

private SleepEntry() {}

@Override
String getName(SleepEntryMessage expected) {
return expected.getName();
}

@Override
void trace(SleepEntryMessage expected, Span span) {
span.addEvent(
Expand Down Expand Up @@ -362,6 +399,11 @@ public boolean hasResult(InvokeEntryMessage actual) {
return actual.getResultCase() != Protocol.InvokeEntryMessage.ResultCase.RESULT_NOT_SET;
}

@Override
String getName(InvokeEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(InvokeEntryMessage expected, MessageLite actual)
throws ProtocolException {
Expand Down Expand Up @@ -414,6 +456,11 @@ public void trace(BackgroundInvokeEntryMessage expected, Span span) {
expected.getMethodName()));
}

@Override
String getName(BackgroundInvokeEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(BackgroundInvokeEntryMessage expected, MessageLite actual)
throws ProtocolException {
Expand All @@ -427,6 +474,11 @@ static final class AwakeableEntry

private AwakeableEntry() {}

@Override
String getName(AwakeableEntryMessage expected) {
return expected.getName();
}

@Override
void trace(AwakeableEntryMessage expected, Span span) {
span.addEvent("Awakeable");
Expand Down Expand Up @@ -468,6 +520,11 @@ public void trace(CompleteAwakeableEntryMessage expected, Span span) {
span.addEvent("CompleteAwakeable");
}

@Override
String getName(CompleteAwakeableEntryMessage expected) {
return expected.getName();
}

@Override
void checkEntryHeader(CompleteAwakeableEntryMessage expected, MessageLite actual)
throws ProtocolException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public void send(
}

@Override
public void enterSideEffectBlock(EnterSideEffectSyscallCallback callback) {
syscallsExecutor.execute(() -> syscalls.enterSideEffectBlock(callback));
public void enterSideEffectBlock(String name, EnterSideEffectSyscallCallback callback) {
syscallsExecutor.execute(() -> syscalls.enterSideEffectBlock(name, callback));
}

@Override
Expand Down
Loading

0 comments on commit 4657365

Please sign in to comment.