Skip to content

Commit

Permalink
Simpler error strategy (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Aug 14, 2023
1 parent e1a6389 commit d9ebad4
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,21 @@
import dev.restate.sdk.core.BindableBlockingService;
import dev.restate.sdk.core.syscalls.Syscalls;

/** Marker interface for Restate blocking services. */
/**
* Marker interface for Restate blocking services.
*
* <h3>Error handling</h3>
*
* The error handling of Restate services works as follows:
*
* <ul>
* <li>When throwing {@link io.grpc.StatusException} or {@link io.grpc.StatusRuntimeException},
* the failure is considered "terminal" and will be used as invocation output
* <li>When throwing any other type of exception, the failure is considered "non-terminal" and the
* runtime will retry it, according to its configuration
* <li>In case {@code StreamObserver#onError} is invoked, the failure is considered "terminal"
* </ul>
*/
public interface RestateBlockingService extends BindableBlockingService {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.Status;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -68,7 +67,7 @@ private void closeWithException(Throwable e) {
serverCall.close(Util.SUSPENDED_STATUS, new Metadata());
} else {
LOG.warn("Error when processing the invocation", e);
serverCall.close(Status.fromThrowable(e), new Metadata());
serverCall.close(Util.toGrpcStatusWrappingUncaught(e), new Metadata());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ public void close(Status status, Metadata trailers) {
// Let's cancel the listener first
listener.onCancel();

if (status.getCode() == Status.Code.UNKNOWN) {
// If no cause, just propagate a generic runtime exception
syscalls.fail(status.getCause() != null ? status.getCause() : status.asRuntimeException());
if (status.getCause() instanceof UncaughtException) {
// This is the case where we have uncaught exceptions from GrpcServerCallListenerAdaptor
syscalls.fail(status.getCause().getCause());
} else {
syscalls.writeOutput(
status.asRuntimeException(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dev.restate.sdk.core.impl;

/**
* Just a marker exception used to mark an exception as uncaught in {@link
* GrpcServerCallListenerAdaptor}.
*/
class UncaughtException extends RuntimeException {

UncaughtException(Throwable t) {
super(t);
}
}
18 changes: 15 additions & 3 deletions sdk-core-impl/src/main/java/dev/restate/sdk/core/impl/Util.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package dev.restate.sdk.core.impl;

import com.google.protobuf.MessageLite;
import com.google.rpc.Code;
import dev.restate.generated.sdk.java.Java;
import dev.restate.generated.service.protocol.Protocol;
import dev.restate.sdk.core.SuspendedException;
Expand Down Expand Up @@ -43,6 +42,20 @@ static <T extends Throwable> Optional<T> findCause(
return Optional.empty();
}

public static Status toGrpcStatusWrappingUncaught(Throwable t) {
Throwable cause = Objects.requireNonNull(t);
while (cause != null) {
if (cause instanceof StatusException) {
return ((StatusException) cause).getStatus();
} else if (cause instanceof StatusRuntimeException) {
return ((StatusRuntimeException) cause).getStatus();
}
cause = cause.getCause();
}
// Couldn't find a cause with a Status
return Status.UNKNOWN.withCause(new UncaughtException(t));
}

public static Optional<ProtocolException> findProtocolException(Throwable throwable) {
return findCause(throwable, t -> t instanceof ProtocolException);
}
Expand Down Expand Up @@ -81,8 +94,7 @@ static Status toGrpcStatusErasingCause(Throwable throwable) {
}

static boolean isTerminalException(Throwable throwable) {
return throwable instanceof StatusRuntimeException
&& ((StatusRuntimeException) throwable).getStatus().getCode().value() != Code.UNKNOWN_VALUE;
return throwable instanceof StatusRuntimeException || throwable instanceof StatusException;
}

static void assertIsEntry(MessageLite msg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,6 @@ Stream<TestDefinition> definitions() {
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
.assertingOutput(containsOnlyExactErrorMessage(new IllegalStateException("Whatever"))),
testInvocation(new ThrowUnknownStatusRuntimeException(), GreeterGrpc.getGreetMethod())
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
.assertingOutput(
containsOnlyExactErrorMessage(
Status.UNKNOWN.withDescription("Whatever").asRuntimeException())),
testInvocation(
new ResponseObserverOnErrorIllegalStateException(), GreeterGrpc.getGreetMethod())
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
.assertingOutput(containsOnlyExactErrorMessage(new IllegalStateException("Whatever"))),
testInvocation(new SideEffectThrowIllegalStateException(), GreeterGrpc.getGreetMethod())
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
Expand All @@ -108,11 +97,20 @@ Stream<TestDefinition> definitions() {
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
.expectingOutput(outputMessage(MY_ERROR)),
testInvocation(new ThrowUnknownStatusRuntimeException(), GreeterGrpc.getGreetMethod())
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
.expectingOutput(outputMessage(Status.UNKNOWN.withDescription("Whatever"))),
testInvocation(
new ResponseObserverOnErrorStatusRuntimeException(), GreeterGrpc.getGreetMethod())
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
.expectingOutput(outputMessage(MY_ERROR)),
testInvocation(
new ResponseObserverOnErrorIllegalStateException(), GreeterGrpc.getGreetMethod())
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
.expectingOutput(outputMessage(Status.UNKNOWN)),
testInvocation(new SideEffectThrowStatusRuntimeException(), GreeterGrpc.getGreetMethod())
.withInput(startMessage(1), inputMessage(GreetingRequest.getDefaultInstance()))
.usingAllThreadingModels()
Expand Down

0 comments on commit d9ebad4

Please sign in to comment.