Skip to content

Commit

Permalink
Set a configurable maximum payload size for the message decoder and a…
Browse files Browse the repository at this point in the history
…dd an invalid message handler to catch invalid messages.

Motivation:

The gRPC message decoder uses the default limit allowed by the gRPC HTTP/2 transport (2^32 bytes). The default maximum size should be smaller and configurable.

Changes:

Add options for configuring the maximum message size and use a lower default value (256KB) for both client and server. In addition an invalid message handler can be set on the GrpcReadStream to catch invalid message reports and let the application recover invalid messages. The invalid message handler can be triggered by a capacity overflow or a decoder exception.

Results:

gRPC server and client now uses a smaller default maximum message size which can be configured according to the application needs. Invalid message handler can also be set to catch invalid messages.
  • Loading branch information
vietj committed Sep 4, 2024
1 parent 605ef3b commit 7e048f5
Show file tree
Hide file tree
Showing 15 changed files with 476 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ public class GrpcClientOptions {
*/
public static final TimeUnit DEFAULT_TIMEOUT_UNIT = TimeUnit.SECONDS;

/**
* The default maximum message size in bytes accepted from a server = {@code 256KB}
*/
public static final long DEFAULT_MAX_MESSAGE_SIZE = 256 * 1024;

private boolean scheduleDeadlineAutomatically;
private int timeout;
private TimeUnit timeoutUnit;
private long maxMessageSize;

/**
* Default constructor.
Expand All @@ -47,6 +53,7 @@ public GrpcClientOptions() {
scheduleDeadlineAutomatically = DEFAULT_SCHEDULE_DEADLINE_AUTOMATICALLY;
timeout = DEFAULT_TIMEOUT;
timeoutUnit = DEFAULT_TIMEOUT_UNIT;
this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
}

/**
Expand All @@ -58,6 +65,7 @@ public GrpcClientOptions(GrpcClientOptions other) {
scheduleDeadlineAutomatically = other.scheduleDeadlineAutomatically;
timeout = other.timeout;
timeoutUnit = other.timeoutUnit;
maxMessageSize = other.maxMessageSize;
}

/**
Expand Down Expand Up @@ -127,4 +135,27 @@ public GrpcClientOptions setTimeoutUnit(TimeUnit timeoutUnit) {
this.timeoutUnit = Objects.requireNonNull(timeoutUnit);
return this;
}

/**
* @return the maximum message size in bytes accepted by the client
*/
public long getMaxMessageSize() {
return maxMessageSize;
}

/**
* Set the maximum message size in bytes accepted from a server, the maximum value is {@code 0xFFFFFFFF}
* @param maxMessageSize the size
* @return a reference to this, so the API can be used fluently
*/
public GrpcClientOptions setMaxMessageSize(long maxMessageSize) {
if (maxMessageSize <= 0) {
throw new IllegalArgumentException("Max message size must be > 0");
}
if (maxMessageSize > 0xFFFFFFFFL) {
throw new IllegalArgumentException("Max message size must be <= 0xFFFFFFFF");
}
this.maxMessageSize = maxMessageSize;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class GrpcClientImpl implements GrpcClient {
private HttpClient client;
private boolean closeClient;
private final boolean scheduleDeadlineAutomatically;
private final long maxMessageSize;
private final int timeout;
private final TimeUnit timeoutUnit;

Expand All @@ -51,6 +52,7 @@ protected GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient
this.vertx = vertx;
this.client = client;
this.scheduleDeadlineAutomatically = grpcOptions.getScheduleDeadlineAutomatically();
this.maxMessageSize = grpcOptions.getMaxMessageSize();;
this.timeout = grpcOptions.getTimeout();
this.timeoutUnit = grpcOptions.getTimeoutUnit();
this.closeClient = close;
Expand All @@ -59,7 +61,12 @@ protected GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient
public Future<GrpcClientRequest<Buffer, Buffer>> request(RequestOptions options) {
return client.request(options)
.map(httpRequest -> {
GrpcClientRequestImpl<Buffer, Buffer> grpcRequest = new GrpcClientRequestImpl<>(httpRequest, scheduleDeadlineAutomatically, GrpcMessageEncoder.IDENTITY, GrpcMessageDecoder.IDENTITY);
GrpcClientRequestImpl<Buffer, Buffer> grpcRequest = new GrpcClientRequestImpl<>(
httpRequest,
maxMessageSize,
scheduleDeadlineAutomatically,
GrpcMessageEncoder.IDENTITY,
GrpcMessageDecoder.IDENTITY);
grpcRequest.init();
configureTimeout(grpcRequest);
return grpcRequest;
Expand Down Expand Up @@ -107,7 +114,12 @@ public <Req, Resp> Future<GrpcClientRequest<Req, Resp>> request(Address server,
private <Req, Resp> Future<GrpcClientRequest<Req, Resp>> request(RequestOptions options, ServiceMethod<Resp, Req> method) {
return client.request(options)
.map(request -> {
GrpcClientRequestImpl<Req, Resp> call = new GrpcClientRequestImpl<>(request, scheduleDeadlineAutomatically, method.encoder(), method.decoder());
GrpcClientRequestImpl<Req, Resp> call = new GrpcClientRequestImpl<>(
request,
maxMessageSize,
scheduleDeadlineAutomatically,
method.encoder(),
method.decoder());
call.init();
call.serviceName(method.serviceName());
call.methodName(method.methodName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class GrpcClientRequestImpl<Req, Resp> extends GrpcWriteStreamBase<GrpcCl
private boolean cancelled;

public GrpcClientRequestImpl(HttpClientRequest httpRequest,
long maxMessageSize,
boolean scheduleDeadline,
GrpcMessageEncoder<Req> messageEncoder,
GrpcMessageDecoder<Resp> messageDecoder) {
Expand Down Expand Up @@ -76,8 +77,18 @@ public GrpcClientRequestImpl(HttpClientRequest httpRequest,
}
}
if (format != null || status != null) {
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(context, this, format, status, httpResponse, messageDecoder);
GrpcClientResponseImpl<Req, Resp> grpcResponse = new GrpcClientResponseImpl<>(
context,
this,
format,
maxMessageSize,
status,
httpResponse,
messageDecoder);
grpcResponse.init(this);
grpcResponse.invalidMessageHandler(invalidMsg -> {
cancel();
});
return Future.succeededFuture(grpcResponse);
}
httpResponse.request().reset(GrpcError.CANCELLED.http2ResetCode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public class GrpcClientResponseImpl<Req, Resp> extends GrpcReadStreamBase<GrpcCl
public GrpcClientResponseImpl(ContextInternal context,
GrpcClientRequestImpl<Req, Resp> request,
WireFormat format,
long maxMessageSize,
GrpcStatus status,
HttpClientResponse httpResponse, GrpcMessageDecoder<Resp> messageDecoder) {
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), format, messageDecoder);
super(context, httpResponse, httpResponse.headers().get("grpc-encoding"), format, maxMessageSize, messageDecoder);
this.request = request;
this.httpResponse = httpResponse;
this.status = status;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.grpc.examples.streaming.Empty;
import io.grpc.examples.streaming.Item;
import io.grpc.examples.streaming.StreamingGrpc;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpHeaders;
Expand All @@ -36,7 +37,10 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -699,4 +703,81 @@ public void testJsonMessageFormat(TestContext should) throws Exception {

done.awaitSuccess();
}

@Test
public void testDefaultMessageSizeOverflow(TestContext should) throws Exception {

Async test = should.async();

Item item = Item.newBuilder().setValue("Asmoranomardicadaistinaculdacar").build();
int itemLen = item.getSerializedSize();

StreamingGrpc.StreamingImplBase called = new StreamingGrpc.StreamingImplBase() {
@Override
public void source(Empty request, StreamObserver<Item> responseObserver) {
ServerCallStreamObserver callStreamObserver = (ServerCallStreamObserver) responseObserver;
callStreamObserver.setOnCancelHandler(() -> {
test.complete();
});
responseObserver.onNext(item);
}
};
startServer(called);

GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions().setMaxMessageSize(itemLen - 1));
client.request(SocketAddress.inetSocketAddress(port, "localhost"), STREAMING_SOURCE)
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.response().onComplete(should.asyncAssertSuccess(callResponse -> {
callResponse.handler(msg -> {
should.fail();
});
}));
callRequest.end(Empty.getDefaultInstance());
}));

test.awaitSuccess(20_000);
}

@Test
public void testInvalidMessageHandlerStream(TestContext should) throws Exception {

Async test = should.async();

List<Item> items = Arrays.asList(
Item.newBuilder().setValue("msg1").build(),
Item.newBuilder().setValue("Asmoranomardicadaistinaculdacar").build(),
Item.newBuilder().setValue("msg3").build()
);

int itemLen = items.get(1).getSerializedSize();

StreamingGrpc.StreamingImplBase called = new StreamingGrpc.StreamingImplBase() {
@Override
public void source(Empty request, StreamObserver<Item> responseObserver) {
items.forEach(item -> responseObserver.onNext(item));
responseObserver.onCompleted();
}
};
startServer(called);

GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions().setMaxMessageSize(itemLen - 1));
client.request(SocketAddress.inetSocketAddress(port, "localhost"), STREAMING_SOURCE)
.onComplete(should.asyncAssertSuccess(callRequest -> {
callRequest.response().onComplete(should.asyncAssertSuccess(callResponse -> {
List<Object> received = new ArrayList<>();
callResponse.invalidMessageHandler(received::add);
callResponse.handler(received::add);
callResponse.endHandler(v -> {
should.assertEquals(Item.class, received.get(0).getClass());
should.assertEquals(MessageSizeOverflowException.class, received.get(1).getClass());
should.assertEquals(Item.class, received.get(2).getClass());
should.assertEquals(3, received.size());
test.complete();
});
}));
callRequest.end(Empty.getDefaultInstance());
}));

test.awaitSuccess(20_000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public T decode(GrpcMessage msg) throws CodecException {
try {
return parser.parseFrom(msg.payload().getBytes());
} catch (InvalidProtocolBufferException e) {
return null;
throw new CodecException(e);
}
}
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vertx.grpc.common;

import io.vertx.codegen.annotations.Fluent;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
Expand Down Expand Up @@ -36,6 +37,19 @@ public interface GrpcReadStream<T> extends ReadStream<T> {
@Fluent
GrpcReadStream<T> messageHandler(@Nullable Handler<GrpcMessage> handler);

/**
* Set a message handler that is reported with invalid message errors.
*
* <p>Warning: setting this handler overwrite the default handler which takes appropriate measure
* when an invalid message is encountered such as cancelling the stream. This handler should be set
* when control over invalid messages is required.</p>
*
* @param handler the invalid message handler
* @return a reference to this, so the API can be used fluently
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
GrpcReadStream<T> invalidMessageHandler(@Nullable Handler<InvalidMessageException> handler);

/**
* Set a handler to be notified with gRPC errors.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.common;

import io.vertx.core.VertxException;

/**
* Signals an invalid message.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public abstract class InvalidMessageException extends VertxException {

InvalidMessageException() {
super((String) null, true);
}

InvalidMessageException(Throwable cause) {
super(cause, true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.common;

/**
* Signals a message with an invalid payload, i.e. that could not be decoded by the protobuf codec.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public final class InvalidMessagePayloadException extends InvalidMessageException {

private GrpcMessage message;

public InvalidMessagePayloadException(GrpcMessage message, Throwable cause) {
super(cause);
this.message = message;
}

/**
* @return the invalid message that could not be decoded.
*/
public GrpcMessage message() {
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.grpc.common;

/**
* Signals a message that is longer than the maximum configured size.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public final class MessageSizeOverflowException extends InvalidMessageException {

private final long messageSize;

public MessageSizeOverflowException(long messageSize) {
this.messageSize = messageSize;
}

public long messageSize() {
return messageSize;
}
}
Loading

0 comments on commit 7e048f5

Please sign in to comment.