From f8244e2c68a4d13ff067cb43269f2c1686c21bcc Mon Sep 17 00:00:00 2001 From: Francesco Guardiani Date: Tue, 9 Apr 2024 11:34:23 +0200 Subject: [PATCH] Add support for Send delay in ingress client (#275) --- .../src/main/resources/templates/Client.hbs | 15 ++++++++++- .../src/main/resources/templates/Client.hbs | 7 ++--- .../kotlin/dev/restate/sdk/kotlin/ingress.kt | 5 +++- .../sdk/client/DefaultIngressClient.java | 21 ++++++++++----- .../dev/restate/sdk/client/IngressClient.java | 27 +++++++++++++++---- .../workflow/impl/WorkflowCodegenUtil.java | 1 + 6 files changed, 60 insertions(+), 16 deletions(-) diff --git a/sdk-api-gen/src/main/resources/templates/Client.hbs b/sdk-api-gen/src/main/resources/templates/Client.hbs index b3022fc9..077175c3 100644 --- a/sdk-api-gen/src/main/resources/templates/Client.hbs +++ b/sdk-api-gen/src/main/resources/templates/Client.hbs @@ -117,10 +117,21 @@ public class {{generatedClassSimpleName}} { }{{/handlers}} public Send send() { - return new Send(); + return new Send(null); + } + + public Send send(Duration delay) { + return new Send(delay); } public class Send { + + private final Duration delay; + + Send(Duration delay) { + this.delay = delay; + } + {{#handlers}} public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) { return this.{{name}}( @@ -133,6 +144,7 @@ public class {{generatedClassSimpleName}} { {{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}}, {{inputSerdeFieldName}}, {{#if inputEmpty}}null{{else}}req{{/if}}, + this.delay, requestOptions); } @@ -147,6 +159,7 @@ public class {{generatedClassSimpleName}} { {{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}}, {{inputSerdeFieldName}}, {{#if inputEmpty}}null{{else}}req{{/if}}, + this.delay, requestOptions); }{{/handlers}} } diff --git a/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs b/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs index ae74ac81..e0b21d14 100644 --- a/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs +++ b/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs @@ -68,17 +68,18 @@ object {{generatedClassSimpleName}} { requestOptions); }{{/handlers}} - fun send(): Send { - return Send() + fun send(delay: Duration = Duration.ZERO): Send { + return Send(delay) } - inner class Send { + inner class Send(private val delay: Duration) { {{#handlers}} suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String { return this@IngressClient.ingressClient.sendSuspend( {{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@IngressClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}}, {{inputSerdeFieldName}}, {{#if inputEmpty}}Unit{{else}}req{{/if}}, + delay, requestOptions); }{{/handlers}} } diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt index 33089173..df1accb6 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt @@ -12,6 +12,8 @@ import dev.restate.sdk.client.IngressClient import dev.restate.sdk.client.RequestOptions import dev.restate.sdk.common.Serde import dev.restate.sdk.common.Target +import kotlin.time.Duration +import kotlin.time.toJavaDuration import kotlinx.coroutines.future.await // Extension methods for the IngressClient @@ -30,9 +32,10 @@ suspend fun IngressClient.sendSuspend( target: Target, reqSerde: Serde, req: Req, + delay: Duration = Duration.ZERO, options: RequestOptions = RequestOptions.DEFAULT ): String { - return this.sendAsync(target, reqSerde, req, options).await() + return this.sendAsync(target, reqSerde, req, delay.toJavaDuration(), options).await() } suspend fun IngressClient.AwakeableHandle.resolveSuspend(serde: Serde, payload: T) { diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java index 55d0e8a0..0b35669a 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java @@ -20,6 +20,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.time.Duration; import java.util.Map; import java.util.concurrent.CompletableFuture; import org.jspecify.annotations.NonNull; @@ -45,7 +46,7 @@ public CompletableFuture callAsync( Serde resSerde, Req req, RequestOptions requestOptions) { - HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, requestOptions); + HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, null, requestOptions); return httpClient .sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) .handle( @@ -69,8 +70,8 @@ public CompletableFuture callAsync( @Override public CompletableFuture sendAsync( - Target target, Serde reqSerde, Req req, RequestOptions options) { - HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, options); + Target target, Serde reqSerde, Req req, Duration delay, RequestOptions options) { + HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, delay, options); return httpClient .sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) .handle( @@ -162,7 +163,7 @@ public CompletableFuture rejectAsync(String reason) { }; } - private URI toRequestURI(Target target, boolean isSend) { + private URI toRequestURI(Target target, boolean isSend, Duration delay) { StringBuilder builder = new StringBuilder(); builder.append("/").append(target.getComponent()); if (target.getKey() != null) { @@ -172,13 +173,21 @@ private URI toRequestURI(Target target, boolean isSend) { if (isSend) { builder.append("/send"); } + if (delay != null && !delay.isZero() && !delay.isNegative()) { + builder.append("?delay=").append(delay); + } return this.baseUri.resolve(builder.toString()); } private HttpRequest prepareHttpRequest( - Target target, boolean isSend, Serde reqSerde, Req req, RequestOptions options) { - var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend)); + Target target, + boolean isSend, + Serde reqSerde, + Req req, + Duration delay, + RequestOptions options) { + var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend, delay)); // Add content-type if (reqSerde.contentType() != null) { diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java index 3064252c..a82510b1 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java @@ -11,11 +11,13 @@ import dev.restate.sdk.common.Serde; import dev.restate.sdk.common.Target; import java.net.http.HttpClient; +import java.time.Duration; import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.jspecify.annotations.NonNull; +import org.jspecify.annotations.Nullable; public interface IngressClient { @@ -46,16 +48,26 @@ default Res call(Target target, Serde reqSerde, Serde resSe } CompletableFuture sendAsync( - Target target, Serde reqSerde, Req req, RequestOptions options); + Target target, + Serde reqSerde, + Req req, + @Nullable Duration delay, + RequestOptions options); + + default CompletableFuture sendAsync( + Target target, Serde reqSerde, Req req, @Nullable Duration delay) { + return sendAsync(target, reqSerde, req, delay, RequestOptions.DEFAULT); + } default CompletableFuture sendAsync(Target target, Serde reqSerde, Req req) { - return sendAsync(target, reqSerde, req, RequestOptions.DEFAULT); + return sendAsync(target, reqSerde, req, null, RequestOptions.DEFAULT); } - default String send(Target target, Serde reqSerde, Req req, RequestOptions options) + default String send( + Target target, Serde reqSerde, Req req, @Nullable Duration delay, RequestOptions options) throws IngressException { try { - return sendAsync(target, reqSerde, req, options).join(); + return sendAsync(target, reqSerde, req, delay, options).join(); } catch (CompletionException e) { if (e.getCause() instanceof RuntimeException) { throw (RuntimeException) e.getCause(); @@ -64,8 +76,13 @@ default String send(Target target, Serde reqSerde, Req req, RequestOp } } + default String send(Target target, Serde reqSerde, Req req, @Nullable Duration delay) + throws IngressException { + return send(target, reqSerde, req, delay, RequestOptions.DEFAULT); + } + default String send(Target target, Serde reqSerde, Req req) throws IngressException { - return send(target, reqSerde, req, RequestOptions.DEFAULT); + return send(target, reqSerde, req, null, RequestOptions.DEFAULT); } /** diff --git a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java index f2875cbe..25070acc 100644 --- a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java +++ b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java @@ -218,6 +218,7 @@ public static void invokeSharedSend( Target.service(workflowName, handlerName), WorkflowImpl.INVOKE_REQUEST_SERDE, InvokeRequest.fromAny(workflowKey, payload), + null, RequestOptions.DEFAULT); }