Skip to content

Commit

Permalink
Add support for Send delay in ingress client (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Apr 9, 2024
1 parent ed651fd commit f8244e2
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 16 deletions.
15 changes: 14 additions & 1 deletion sdk-api-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,21 @@ public class {{generatedClassSimpleName}} {
}{{/handlers}}

public Send send() {
return new Send();
return new Send(null);
}

public Send send(Duration delay) {
return new Send(delay);
}

public class Send {

private final Duration delay;

Send(Duration delay) {
this.delay = delay;
}

{{#handlers}}
public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
return this.{{name}}(
Expand All @@ -133,6 +144,7 @@ public class {{generatedClassSimpleName}} {
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}null{{else}}req{{/if}},
this.delay,
requestOptions);
}

Expand All @@ -147,6 +159,7 @@ public class {{generatedClassSimpleName}} {
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}null{{else}}req{{/if}},
this.delay,
requestOptions);
}{{/handlers}}
}
Expand Down
7 changes: 4 additions & 3 deletions sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,18 @@ object {{generatedClassSimpleName}} {
requestOptions);
}{{/handlers}}

fun send(): Send {
return Send()
fun send(delay: Duration = Duration.ZERO): Send {
return Send(delay)
}

inner class Send {
inner class Send(private val delay: Duration) {
{{#handlers}}
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String {
return this@IngressClient.ingressClient.sendSuspend(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@IngressClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}Unit{{else}}req{{/if}},
delay,
requestOptions);
}{{/handlers}}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import dev.restate.sdk.client.IngressClient
import dev.restate.sdk.client.RequestOptions
import dev.restate.sdk.common.Serde
import dev.restate.sdk.common.Target
import kotlin.time.Duration
import kotlin.time.toJavaDuration
import kotlinx.coroutines.future.await

// Extension methods for the IngressClient
Expand All @@ -30,9 +32,10 @@ suspend fun <Req> IngressClient.sendSuspend(
target: Target,
reqSerde: Serde<Req>,
req: Req,
delay: Duration = Duration.ZERO,
options: RequestOptions = RequestOptions.DEFAULT
): String {
return this.sendAsync(target, reqSerde, req, options).await()
return this.sendAsync(target, reqSerde, req, delay.toJavaDuration(), options).await()
}

suspend fun <T> IngressClient.AwakeableHandle.resolveSuspend(serde: Serde<T>, payload: T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.jspecify.annotations.NonNull;
Expand All @@ -45,7 +46,7 @@ public <Req, Res> CompletableFuture<Res> callAsync(
Serde<Res> resSerde,
Req req,
RequestOptions requestOptions) {
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, requestOptions);
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, null, requestOptions);
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.handle(
Expand All @@ -69,8 +70,8 @@ public <Req, Res> CompletableFuture<Res> callAsync(

@Override
public <Req> CompletableFuture<String> sendAsync(
Target target, Serde<Req> reqSerde, Req req, RequestOptions options) {
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, options);
Target target, Serde<Req> reqSerde, Req req, Duration delay, RequestOptions options) {
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, delay, options);
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.handle(
Expand Down Expand Up @@ -162,7 +163,7 @@ public CompletableFuture<Void> rejectAsync(String reason) {
};
}

private URI toRequestURI(Target target, boolean isSend) {
private URI toRequestURI(Target target, boolean isSend, Duration delay) {
StringBuilder builder = new StringBuilder();
builder.append("/").append(target.getComponent());
if (target.getKey() != null) {
Expand All @@ -172,13 +173,21 @@ private URI toRequestURI(Target target, boolean isSend) {
if (isSend) {
builder.append("/send");
}
if (delay != null && !delay.isZero() && !delay.isNegative()) {
builder.append("?delay=").append(delay);
}

return this.baseUri.resolve(builder.toString());
}

private <Req> HttpRequest prepareHttpRequest(
Target target, boolean isSend, Serde<Req> reqSerde, Req req, RequestOptions options) {
var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend));
Target target,
boolean isSend,
Serde<Req> reqSerde,
Req req,
Duration delay,
RequestOptions options) {
var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend, delay));

// Add content-type
if (reqSerde.contentType() != null) {
Expand Down
27 changes: 22 additions & 5 deletions sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.Target;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

public interface IngressClient {

Expand Down Expand Up @@ -46,16 +48,26 @@ default <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSe
}

<Req> CompletableFuture<String> sendAsync(
Target target, Serde<Req> reqSerde, Req req, RequestOptions options);
Target target,
Serde<Req> reqSerde,
Req req,
@Nullable Duration delay,
RequestOptions options);

default <Req> CompletableFuture<String> sendAsync(
Target target, Serde<Req> reqSerde, Req req, @Nullable Duration delay) {
return sendAsync(target, reqSerde, req, delay, RequestOptions.DEFAULT);
}

default <Req> CompletableFuture<String> sendAsync(Target target, Serde<Req> reqSerde, Req req) {
return sendAsync(target, reqSerde, req, RequestOptions.DEFAULT);
return sendAsync(target, reqSerde, req, null, RequestOptions.DEFAULT);
}

default <Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options)
default <Req> String send(
Target target, Serde<Req> reqSerde, Req req, @Nullable Duration delay, RequestOptions options)
throws IngressException {
try {
return sendAsync(target, reqSerde, req, options).join();
return sendAsync(target, reqSerde, req, delay, options).join();
} catch (CompletionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
Expand All @@ -64,8 +76,13 @@ default <Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOp
}
}

default <Req> String send(Target target, Serde<Req> reqSerde, Req req, @Nullable Duration delay)
throws IngressException {
return send(target, reqSerde, req, delay, RequestOptions.DEFAULT);
}

default <Req> String send(Target target, Serde<Req> reqSerde, Req req) throws IngressException {
return send(target, reqSerde, req, RequestOptions.DEFAULT);
return send(target, reqSerde, req, null, RequestOptions.DEFAULT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public static void invokeSharedSend(
Target.service(workflowName, handlerName),
WorkflowImpl.INVOKE_REQUEST_SERDE,
InvokeRequest.fromAny(workflowKey, payload),
null,
RequestOptions.DEFAULT);
}

Expand Down

0 comments on commit f8244e2

Please sign in to comment.