Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set a configurable maximum payload size for the message decoder and add an invalid message handler to catch invalid messages #114

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ public class GrpcClientOptions {
*/
public static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.SECONDS;

/**
* The default maximum message size in bytes accepted from a server = {@code 256KB}
*/
public static final long DEFAULT_MAX_MESSAGE_SIZE = 256 * 1024;

private boolean scheduleDeadlineAutomatically;
private int timeout;
private TimeUnit timeoutUnit;
private long maxMessageSize;

/**
* Default constructor.
Expand All @@ -47,6 +53,7 @@ public GrpcClientOptions() {
scheduleDeadlineAutomatically = DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY;
timeout = DEFAULT_TIMEOUT;
timeoutUnit = DEFAULT_TIMEOUT_UNIT;
this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
}

/**
Expand All @@ -58,6 +65,7 @@ public GrpcClientOptions(GrpcClientOptions other) {
scheduleDeadlineAutomatically = other.scheduleDeadlineAutomatically;
timeout = other.timeout;
timeoutUnit = other.timeoutUnit;
maxMessageSize = other.maxMessageSize;
}

/**
Expand Down Expand Up @@ -127,4 +135,27 @@ public GrpcClientOptions setTimeoutUnit(TimeUnit timeoutUnit) {
this.timeoutUnit = Objects.requireNonNull(timeoutUnit);
return this;
}

/**
* @return the maximum message size in bytes accepted by the client
*/
public long getMaxMessageSize() {
return maxMessageSize;
}

/**
* Set the maximum message size in bytes accepted from a server, the maximum value is {@code 0xFFFFFFFF}
* @param maxMessageSize the size
* @return a reference to this, so the API can be used fluently
*/
public GrpcClientOptions setMaxMessageSize(long maxMessageSize) {
if (maxMessageSize <= 0) {
throw new IllegalArgumentException("Max message size must be > 0");
}
if (maxMessageSize > 0xFFFFFFFFL) {
throw new IllegalArgumentException("Max message size must be <= 0xFFFFFFFF");
}
this.maxMessageSize = maxMessageSize;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class GrpcClientImpl implements GrpcClient {
private HttpClient client;
private boolean closeClient;
private final boolean scheduleDeadlineAutomatically;
private final long maxMessageSize;
private final int timeout;
private final TimeUnit timeoutUnit;

Expand All @@ -51,6 +52,7 @@ protected GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient
this.vertx = vertx;
this.client = client;
this.scheduleDeadlineAutomatically = grpcOptions.getScheduleDeadlineAutomatically();
this.maxMessageSize = grpcOptions.getMaxMessageSize();;
this.timeout = grpcOptions.getTimeout();
this.timeoutUnit = grpcOptions.getTimeoutUnit();
this.closeClient = close;
Expand All @@ -59,7 +61,12 @@ protected GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient
public Future<GrpcClientRequest<Buffer, Buffer>> request(RequestOptions options) {
return client.request(options)
.map(httpRequest -> {
GrpcClientRequestImpl<Buffer, Buffer> grpcRequest = new GrpcClientRequestImpl<>(httpRequest, scheduleDeadlineAutomatically, GrpcMessageEncoder.IDENTITY, GrpcMessageDecoder.IDENTITY);
GrpcClientRequestImpl<Buffer, Buffer> grpcRequest = new GrpcClientRequestImpl<>(
httpRequest,
maxMessageSize,
scheduleDeadlineAutomatically,
GrpcMessageEncoder.IDENTITY,
GrpcMessageDecoder.IDENTITY);
grpcRequest.init();
configureTimeout(grpcRequest);
return grpcRequest;
Expand Down Expand Up @@ -107,7 +114,12 @@ public <Req, Resp> Future<GrpcClientRequest<Req, Resp>> request(Address server,
private <Req, Resp> Future<GrpcClientRequest<Req, Resp>> request(RequestOptions options, ServiceMethod<Resp, Req> method) {
return client.request(options)
.map(request -> {
GrpcClientRequestImpl<Req, Resp> call = new GrpcClientRequestImpl<>(request, scheduleDeadlineAutomatically, method.encoder(), method.decoder());
GrpcClientRequestImpl<Req, Resp> call = new GrpcClientRequestImpl<>(
request,
maxMessageSize,
scheduleDeadlineAutomatically,
method.encoder(),
method.decoder());
call.init();
call.serviceName(method.serviceName());
call.methodName(method.methodName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class GrpcClientRequestImpl<Req, Resp> extends GrpcWriteStreamBase<GrpcCl
private boolean cancelled;

public GrpcClientRequestImpl(HttpClientRequest httpRequest,
long maxMessageSize,
boolean scheduleDeadline,
GrpcMessageEncoder<Req> messageEncoder,
GrpcMessageDecoder<Resp> messageDecoder) {
Expand Down Expand Up @@ -76,8 +77,18 @@ public GrpcClientRequestImpl(HttpClientRequest httpRequest,
}
}
if (format != null || status != null) {
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, this, format, status, httpResponse, messageDecoder);
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(
context,
this,
format,
maxMessageSize,
status,
httpResponse,
messageDecoder);
grpcResponse.init(this);
grpcResponse.invalidMessageHandler(invalidMsg -> {
cancel();
});
return Future.succeededFuture(grpcResponse);
}
httpResponse.request().reset(GrpcError.CANCELLED.http2ResetCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public class GrpcClientResponseImpl<Req, Resp> extends GrpcReadStreamBase<GrpcCl
public GrpcClientResponseImpl(ContextInternal context,
GrpcClientRequestImpl<Req, Resp> request,
WireFormat format,
long maxMessageSize,
GrpcStatus status,
HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), format, messageDecoder);
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), format, maxMessageSize, messageDecoder);
this.request = request;
this.httpResponse = httpResponse;
this.status = status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.StreamingGrpc;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpHeaders;
Expand All @@ -36,7 +37,10 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -699,4 +703,81 @@ public void testJsonMessageFormat(TestContext should) throws Exception {

done.awaitSuccess();
}

@Test
public void testDefaultMessageSizeOverflow(TestContext should) throws Exception {

Async test = should.async();

Item item = Item.newBuilder().setValue("Asmoranomardicadaistinaculdacar").build();
int itemLen = item.getSerializedSize();

StreamingGrpc.StreamingImplBase called = new StreamingGrpc.StreamingImplBase() {
@Override
public void source(Empty request, StreamObserver<Item> responseObserver) {
ServerCallStreamObserver callStreamObserver = (ServerCallStreamObserver) responseObserver;
callStreamObserver.setOnCancelHandler(() -> {
test.complete();
});
responseObserver.onNext(item);
}
};
startServer(called);

GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions().setMaxMessageSize(itemLen - 1));
client.request(SocketAddress.inetSocketAddress(port, "localhost"), STREAMING_SOURCE)
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.response().onComplete(should.asyncAssertSuccess(callResponse -> {
callResponse.handler(msg -> {
should.fail();
});
}));
callRequest.end(Empty.getDefaultInstance());
}));

test.awaitSuccess(20_000);
}

@Test
public void testInvalidMessageHandlerStream(TestContext should) throws Exception {

Async test = should.async();

List<Item> items = Arrays.asList(
Item.newBuilder().setValue("msg1").build(),
Item.newBuilder().setValue("Asmoranomardicadaistinaculdacar").build(),
Item.newBuilder().setValue("msg3").build()
);

int itemLen = items.get(1).getSerializedSize();

StreamingGrpc.StreamingImplBase called = new StreamingGrpc.StreamingImplBase() {
@Override
public void source(Empty request, StreamObserver<Item> responseObserver) {
items.forEach(item -> responseObserver.onNext(item));
responseObserver.onCompleted();
}
};
startServer(called);

GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions().setMaxMessageSize(itemLen - 1));
client.request(SocketAddress.inetSocketAddress(port, "localhost"), STREAMING_SOURCE)
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.response().onComplete(should.asyncAssertSuccess(callResponse -> {
List<Object> received = new ArrayList<>();
callResponse.invalidMessageHandler(received::add);
callResponse.handler(received::add);
callResponse.endHandler(v -> {
should.assertEquals(Item.class, received.get(0).getClass());
should.assertEquals(MessageSizeOverflowException.class, received.get(1).getClass());
should.assertEquals(Item.class, received.get(2).getClass());
should.assertEquals(3, received.size());
test.complete();
});
}));
callRequest.end(Empty.getDefaultInstance());
}));

test.awaitSuccess(20_000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public T decode(GrpcMessage msg) throws CodecException {
try {
return parser.parseFrom(msg.payload().getBytes());
} catch (InvalidProtocolBufferException e) {
return null;
throw new CodecException(e);
}
}
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vertx.grpc.common;

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
Expand Down Expand Up @@ -36,6 +37,19 @@ public interface GrpcReadStream<T> extends ReadStream<T> {
@Fluent
GrpcReadStream<T> messageHandler(@Nullable Handler<GrpcMessage> handler);

/**
* Set a message handler that is reported with invalid message errors.
*
* <p>Warning: setting this handler overwrite the default handler which takes appropriate measure
* when an invalid message is encountered such as cancelling the stream. This handler should be set
* when control over invalid messages is required.</p>
*
* @param handler the invalid message handler
* @return a reference to this, so the API can be used fluently
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
GrpcReadStream<T> invalidMessageHandler(@Nullable Handler<InvalidMessageException> handler);

/**
* Set a handler to be notified with gRPC errors.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.common;

import io.vertx.core.VertxException;

/**
* Signals an invalid message.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public abstract class InvalidMessageException extends VertxException {

InvalidMessageException() {
super((String) null, true);
}

InvalidMessageException(Throwable cause) {
super(cause, true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.common;

/**
* Signals a message with an invalid payload, i.e. that could not be decoded by the protobuf codec.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public final class InvalidMessagePayloadException extends InvalidMessageException {

private GrpcMessage message;

public InvalidMessagePayloadException(GrpcMessage message, Throwable cause) {
super(cause);
this.message = message;
}

/**
* @return the invalid message that could not be decoded.
*/
public GrpcMessage message() {
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.common;

/**
* Signals a message that is longer than the maximum configured size.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public final class MessageSizeOverflowException extends InvalidMessageException {

private final long messageSize;

public MessageSizeOverflowException(long messageSize) {
this.messageSize = messageSize;
}

public long messageSize() {
return messageSize;
}
}
Loading
Loading