From cae0d3bb3c56808d46f9d8a8cbeb917bb81d44d7 Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Thu, 21 Mar 2024 10:22:53 +0100 Subject: [PATCH] Add resolve/reject awakeables from ingress. Fix #246 (#254) --- .../sdk/client/DefaultIngressClient.java | 70 +++++++++++++++++++ .../dev/restate/sdk/client/IngressClient.java | 29 ++++++++ 2 files changed, 99 insertions(+) diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java index 50289c54..2357999d 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java @@ -22,6 +22,7 @@ import java.net.http.HttpResponse; import java.util.Map; import java.util.concurrent.CompletableFuture; +import org.jspecify.annotations.NonNull; public class DefaultIngressClient implements IngressClient { @@ -92,6 +93,75 @@ public CompletableFuture sendAsync( }); } + @Override + public AwakeableHandle awakeableHandle(String id) { + return new AwakeableHandle() { + @Override + public CompletableFuture resolve(Serde serde, @NonNull T payload) { + // Prepare request + var reqBuilder = + HttpRequest.newBuilder().uri(URI.create("/restate/awakeables/" + id + "/resolve")); + + // Add content-type + if (serde.contentType() != null) { + reqBuilder.header("content-type", serde.contentType()); + } + + // Add headers + headers.forEach(reqBuilder::header); + + // Build and Send request + HttpRequest request = + reqBuilder + .POST(HttpRequest.BodyPublishers.ofByteArray(serde.serialize(payload))) + .build(); + return httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) + .handle( + (response, throwable) -> { + if (throwable != null) { + throw new IngressException("Error when executing the request", throwable); + } + + if (response.statusCode() >= 300) { + handleNonSuccessResponse(response); + } + + return null; + }); + } + + @Override + public CompletableFuture reject(String reason) { + // Prepare request + var reqBuilder = + HttpRequest.newBuilder() + .uri(URI.create("/restate/awakeables/" + id + "/reject")) + .header("content-type", "text-plain"); + + // Add headers + headers.forEach(reqBuilder::header); + + // Build and Send request + HttpRequest request = reqBuilder.POST(HttpRequest.BodyPublishers.ofString(reason)).build(); + return httpClient + .sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) + .handle( + (response, throwable) -> { + if (throwable != null) { + throw new IngressException("Error when executing the request", throwable); + } + + if (response.statusCode() >= 300) { + handleNonSuccessResponse(response); + } + + return null; + }); + } + }; + } + private URI toRequestURI(Target target, boolean isSend) { StringBuilder builder = new StringBuilder(); builder.append("/").append(target.getComponent()); diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java index 9105bcae..1e927a97 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java @@ -15,6 +15,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import org.jspecify.annotations.NonNull; public interface IngressClient { @@ -67,6 +68,34 @@ default String send(Target target, Serde reqSerde, Req req) throws In return send(target, reqSerde, req, RequestOptions.DEFAULT); } + /** + * Create a new {@link AwakeableHandle} for the provided identifier. You can use it to {@link + * AwakeableHandle#resolve(Serde, Object)} or {@link AwakeableHandle#reject(String)} an Awakeable + * from the ingress. + */ + AwakeableHandle awakeableHandle(String id); + + /** + * This class represents a handle to an Awakeable. It can be used to complete awakeables from the + * ingress + */ + interface AwakeableHandle { + /** + * Complete with success the Awakeable. + * + * @param serde used to serialize the Awakeable result payload. + * @param payload the result payload. MUST NOT be null. + */ + CompletableFuture resolve(Serde serde, @NonNull T payload); + + /** + * Complete with failure the Awakeable. + * + * @param reason the rejection reason. MUST NOT be null. + */ + CompletableFuture reject(String reason); + } + static IngressClient defaultClient(String baseUri) { return defaultClient(baseUri, Collections.emptyMap()); }