Skip to content

Commit

Permalink
Avoid int overflow when receive large MAX_CONCURRENT_STREAMS value (#…
Browse files Browse the repository at this point in the history
…2572)

Motivation:

RFC defines that settings values are represented as unsigned 32-bit
numbers: https://datatracker.ietf.org/doc/html/rfc9113#section-6.5.1
Since it doesn't explicitly describe any additional restrictions for
`SETTINGS_MAX_CONCURRENT_STREAMS` value, it can be as high as
4,294,967,295. When this happens, our concurrency controller receives
`-1` and it forces the LB to create a new connection for every HTTP/2
request.

Modifications:

- `H2ClientParentConnectionContext` should adjust high
`maxConcurrentStreams` values to `Integer.MAX_VALUE`. This is an
equivalent of unbounded concurrency. Netty's
`DefaultHttp2ConnectionDecoder` and `DefaultHttp2ConnectionEncoder` do
the same.
- Add a reproducer;

Result:

Large `SETTINGS_MAX_CONCURRENT_STREAMS` values adjusted to
`Integer.MAX_VALUE`.
  • Loading branch information
idelpivnitskiy committed May 2, 2023
1 parent 825f235 commit 6da8c12
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import static io.servicetalk.transport.netty.internal.ChannelSet.CHANNEL_CLOSEABLE_KEY;
import static io.servicetalk.transport.netty.internal.CloseHandler.forNonPipelined;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

final class H2ClientParentConnectionContext extends H2ParentConnectionContext {
Expand Down Expand Up @@ -225,7 +226,7 @@ boolean ackSettings(final ChannelHandlerContext ctx, final Http2SettingsFrame se
}

maxConcurrencyProcessor.onNext(new MaxConcurrencyConsumableEvent(
maxConcurrentStreams.intValue(), ctx.channel()));
(int) min(maxConcurrentStreams, Integer.MAX_VALUE), ctx.channel()));
return false;
}

Expand Down Expand Up @@ -533,6 +534,7 @@ private static final class MaxConcurrencyConsumableEvent implements ConsumableEv
private final Channel channel;

MaxConcurrencyConsumableEvent(final int maxConcurrentStreams, final Channel channel) {
assert maxConcurrentStreams >= 0;
this.maxConcurrentStreams = maxConcurrentStreams;
this.channel = channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public void onNext(@Nullable final ConsumableEvent<Integer> event) {
assert event != null : "event can not be null in onNext.";
final int currentConcurrency = lastMaxConcurrency;
final int newConcurrency = event.event();
assert newConcurrency >= 0;
if (currentConcurrency < newConcurrency) {
// When concurrency increases, consume event to notify Netty asap, then update the value to
// allow more requests to go through. Even if this event is offloaded, eventConsumed() will
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2StreamChannel;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand All @@ -61,6 +61,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_UNSIGNED_INT;
import static io.servicetalk.concurrent.api.internal.BlockingUtils.blockingInvocation;
import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder;
import static io.servicetalk.http.api.HttpExecutionStrategies.defaultStrategy;
Expand All @@ -82,14 +83,14 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Named.named;

class H2ConcurrencyControllerTest {

private static final int SERVER_MAX_CONCURRENT_STREAMS_VALUE = 1;
private static final int N_ITERATIONS = 3;
private static final int STEP = 4;
private static final int REPEATED_TEST_ITERATIONS = 16;
Expand All @@ -102,12 +103,14 @@ class H2ConcurrencyControllerTest {
private final CountDownLatch[] latches = new CountDownLatch[N_ITERATIONS];
private final AtomicReference<Channel> serverParentChannel = new AtomicReference<>();
private final AtomicBoolean alwaysEcho = new AtomicBoolean(false);
@Nullable
private EventLoopGroup serverEventLoopGroup;
@Nullable
private Channel serverAcceptorChannel;
@Nullable
private HostAndPort serverAddress;

@BeforeEach
void setUp() throws Exception {
private void setUp(long maxConcurrentStreams) throws Exception {
serverEventLoopGroup = createIoExecutor(1, "server-io").eventLoopGroup();
for (int i = 0; i < N_ITERATIONS; i++) {
latches[i] = new CountDownLatch(1);
Expand Down Expand Up @@ -135,7 +138,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final Http2HeadersF
}, parentPipeline -> {
serverParentChannel.set(parentPipeline.channel());
}, h2Builder -> {
h2Builder.initialSettings().maxConcurrentStreams(SERVER_MAX_CONCURRENT_STREAMS_VALUE);
h2Builder.initialSettings().maxConcurrentStreams(maxConcurrentStreams);
return h2Builder;
});

Expand All @@ -144,12 +147,19 @@ protected void channelRead0(final ChannelHandlerContext ctx, final Http2HeadersF

@AfterEach
void tearDown() throws Exception {
safeSync(() -> serverAcceptorChannel.close().sync());
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync());
if (serverAcceptorChannel != null) {
safeSync(() -> serverAcceptorChannel.close().sync());
}
if (serverEventLoopGroup != null) {
safeSync(() -> serverEventLoopGroup.shutdownGracefully(0, 0, MILLISECONDS).sync());
}
}

@RepeatedTest(REPEATED_TEST_ITERATIONS)
void noMaxActiveStreamsViolatedErrorAfterCancel() throws Exception {
int serverMaxConcurrentStreams = 1;
setUp(serverMaxConcurrentStreams);
assert serverAddress != null;
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.appendClientFilter(disableAutoRetries()) // All exceptions should be propagated
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
Expand Down Expand Up @@ -187,7 +197,7 @@ public void onError(final Throwable t) {
return conn;
}).toFuture().get()) {
awaitMaxConcurrentStreamsSettingsUpdate(connection, maxConcurrentStreams,
SERVER_MAX_CONCURRENT_STREAMS_VALUE);
serverMaxConcurrentStreams);
connection.releaseAsync().toFuture().get();

BlockingQueue<Throwable> exceptions = new LinkedBlockingDeque<>();
Expand Down Expand Up @@ -236,7 +246,10 @@ void noMaxActiveStreamsViolatedErrorWhenLimitDecreases(HttpExecutionStrategy str

private void noMaxActiveStreamsViolatedErrorWhenLimitChanges(boolean increase,
HttpExecutionStrategy strategy) throws Exception {
int serverMaxConcurrentStreams = 1;
setUp(serverMaxConcurrentStreams);
alwaysEcho.set(true); // server should always respond
assert serverAddress != null;
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.executionStrategy(strategy)
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
Expand All @@ -263,7 +276,7 @@ private void noMaxActiveStreamsViolatedErrorWhenLimitChanges(boolean increase,
return conn;
}).toFuture().get()) {
awaitMaxConcurrentStreamsSettingsUpdate(connection, maxConcurrentStreams,
SERVER_MAX_CONCURRENT_STREAMS_VALUE);
serverMaxConcurrentStreams);
connection.releaseAsync().toFuture().get();

final Channel serverParentChannel = this.serverParentChannel.get();
Expand Down Expand Up @@ -305,6 +318,28 @@ private void noMaxActiveStreamsViolatedErrorWhenLimitChanges(boolean increase,
}
}

@Test
void maxActiveStreamsOutsideIntRange() throws Exception {
setUp(MAX_UNSIGNED_INT);
assert serverAddress != null;
assertThat(MAX_UNSIGNED_INT, is(greaterThan((long) Integer.MAX_VALUE)));
try (HttpClient client = newClientBuilder(serverAddress, CLIENT_CTX, HTTP_2)
.appendConnectionFilter(MulticastTransportEventsStreamingHttpConnectionFilter.INSTANCE)
.protocols(HTTP_2.config)
.build()) {

BlockingQueue<Integer> maxConcurrentStreams = new LinkedBlockingDeque<>();
try (ReservedHttpConnection connection = client.reserveConnection(client.get("/")).map(conn -> {
conn.transportEventStream(MAX_CONCURRENCY_NO_OFFLOADING)
.forEach(event -> maxConcurrentStreams.add(event.event()));
return conn;
}).toFuture().get()) {
// The value is expected to be adjusted to avoid int overflow
awaitMaxConcurrentStreamsSettingsUpdate(connection, maxConcurrentStreams, Integer.MAX_VALUE);
}
}
}

private static void awaitMaxConcurrentStreamsSettingsUpdate(ReservedHttpConnection connection,
BlockingQueue<Integer> maxConcurrentStreams, int expectedValue) throws Exception {
// In some cases preface and initial settings won't be sent until after we make the first request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public void onSubscribe(final PublisherSource.Subscription subscription) {
public void onNext(@Nullable final ConsumableEvent<Integer> integerConsumableEvent) {
if (integerConsumableEvent != null) {
final int concurrency = integerConsumableEvent.event();
assert concurrency >= 0;
// Connections may set their max concurrency to 0 before shutting down.
// Map cleanup is done in terminal method.
if (concurrency > 0) {
Expand Down

0 comments on commit 6da8c12

Please sign in to comment.