Skip to content

Commit

Permalink
Ensure QuicStreamChannel.shutdownOutput() is only called once all pre…
Browse files Browse the repository at this point in the history
…vious writes were processed.

Motivation:
We need to ensure QuicStreamChannel.shutdownOutput() is only called once all previous writes were processed. This is necessary as otherwise shutdownOutput() might be called while some writes are still queued (due flowcontrol).

Modifications:
- Always do the shutdownOutput() via a ChannelFutureListener
- Adjust tests

Result:
Always drain write queue first before shutdown the output.
  • Loading branch information
normanmaurer committed Jan 16, 2024
1 parent 572300d commit 2b1dbb9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
Expand Down Expand Up @@ -50,7 +51,7 @@
* and back. It can be used as an adapter in conjunction with {@link
* Http3ServerConnectionHandler} or {@link Http3ClientConnectionHandler} to make http/3 connections
* backward-compatible with {@link ChannelHandler}s expecting {@link HttpObject}.
*
* <p>
* For simplicity, it converts to chunked encoding unless the entire stream
* is a single header.
*/
Expand Down Expand Up @@ -148,7 +149,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
return;
} else {
throw new EncoderException(
HttpResponseStatus.CONTINUE.toString() + " must be a FullHttpResponse");
HttpResponseStatus.CONTINUE + " must be a FullHttpResponse");
}
}
}
Expand Down Expand Up @@ -187,18 +188,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
Http3Headers headers = HttpConversionUtil.toHttp3Headers(last.trailingHeaders(), validateHeaders);
promise = writeWithOptionalCombiner(ctx,
new DefaultHttp3HeadersFrame(headers), promise, combiner, true);
}
if (!readable) {
} else if (!readable) {
// Release the data and just use EMPTY_BUFFER. This might allow us to give back memory to the allocator
// faster.
last.release();
if (combiner == null) {
// We only need to write something if there was no write before.
promise = writeWithOptionalCombiner(ctx,
new DefaultHttp3DataFrame(Unpooled.EMPTY_BUFFER), promise, combiner, true);
}
}

if (!readable && !hasTrailers && combiner == null) {
// we had to write nothing. happy days!
((QuicStreamChannel) ctx.channel()).shutdownOutput();
promise.trySuccess();
} else {
promise.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
}
// The shutdown is always done via the listener to ensure previous written data is correctly drained
// before QuicStreamChannel.shutdownOutput() is called. Missing to do so might cause previous queued data
// to be failed with a ClosedChannelException.
promise.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
} else if (msg instanceof HttpContent) {
promise = writeWithOptionalCombiner(ctx,
new DefaultHttp3DataFrame(((HttpContent) msg).content()), promise, combiner, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.util.CharsetUtil;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
Expand All @@ -71,6 +76,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down Expand Up @@ -221,6 +227,13 @@ public void testUpgradeEmptyEnd() {
ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT);

assertTrue(ch.isOutputShutdown());
Http3DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().readableBytes(), is(0));
} finally {
dataFrame.release();
}

assertFalse(ch.finish());
}

Expand Down Expand Up @@ -510,6 +523,13 @@ public void testEncodeEmptyEndAsClient() {
ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT);

assertTrue(ch.isOutputShutdown());
Http3DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().readableBytes(), is(0));
} finally {
dataFrame.release();
}

assertFalse(ch.finish());
}

Expand Down Expand Up @@ -606,6 +626,13 @@ public void testEncodeEmptyLastPromiseCompletes() {
assertThat(headers.path().toString(), is("/hello/world"));
assertTrue(ch.isOutputShutdown());

Http3DataFrame dataFrame = ch.readOutbound();
try {
assertThat(dataFrame.content().readableBytes(), is(0));
} finally {
dataFrame.release();
}

assertFalse(ch.finish());
}

Expand Down Expand Up @@ -684,31 +711,30 @@ public void testEncodeVoidPromise() {
assertFalse(ch.finish());
}

@Test
public void testEncodeCombinations() {
// this test goes through all the branches of Http3FrameToHttpObjectCodec and ensures right functionality

for (boolean headers : new boolean[]{false, true}) {
for (boolean last : new boolean[]{false, true}) {
for (boolean nonEmptyContent : new boolean[]{false, true}) {
for (boolean hasTrailers : new boolean[]{false, true}) {
for (boolean voidPromise : new boolean[]{false, true}) {
testEncodeCombination(headers, last, nonEmptyContent, hasTrailers, voidPromise);
private static final class EncodeCombinationsArgumentsProvider implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
List<Arguments> arguments = new ArrayList<>();
for (boolean headers : new boolean[]{false, true}) {
for (boolean last : new boolean[]{false, true}) {
for (boolean nonEmptyContent : new boolean[]{false, true}) {
for (boolean hasTrailers : new boolean[]{false, true}) {
for (boolean voidPromise : new boolean[]{false, true}) {
// this test goes through all the branches of Http3FrameToHttpObjectCodec
// and ensures right functionality
arguments.add(Arguments.of(headers, last, nonEmptyContent, hasTrailers, voidPromise));
}
}
}
}
}
return arguments.stream();
}
}

/**
* @param headers Should this be an initial message, with headers ({@link HttpRequest})?
* @param last Should this be a last message ({@link LastHttpContent})?
* @param nonEmptyContent Should this message have non-empty content?
* @param hasTrailers Should this {@code last} message have trailers?
* @param voidPromise Should the write operation use a void promise?
*/
private static void testEncodeCombination(
@ParameterizedTest(name = "headers: {0}, last: {1}, nonEmptyContent: {2}, hasTrailers: {3}, voidPromise: {4}")
@ArgumentsSource(value = EncodeCombinationsArgumentsProvider.class)
public void testEncodeCombination(
boolean headers,
boolean last,
boolean nonEmptyContent,
Expand Down Expand Up @@ -772,31 +798,28 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
Http3DataFrame dataFrame = ch.readOutbound();
assertThat(dataFrame.content().readableBytes(), is(1));
dataFrame.release();
} else if (!headers && !hasTrailers && !last) {
ch.<Http3DataFrame>readOutbound().release();
}
if (hasTrailers) {
Http3HeadersFrame trailersFrame = ch.readOutbound();
assertThat(trailersFrame.headers().get("foo"), is("bar"));
} else if (!nonEmptyContent && !headers) {
Http3DataFrame dataFrame = ch.readOutbound();
assertThat(dataFrame.content().readableBytes(), is(0));
dataFrame.release();
}
// empty LastHttpContent has no data written and will complete the promise immediately
boolean anyData = hasTrailers || nonEmptyContent || headers || !last;

if (!voidPromise) {
if (anyData) {
assertFalse(fullPromise.isDone());
} else {
// nothing to write, immediately complete
assertTrue(fullPromise.isDone());
}
}
if (!last || anyData) {
assertFalse(ch.isOutputShutdown());
assertFalse(fullPromise.isDone());
}

assertFalse(ch.isOutputShutdown());
for (ChannelPromise framePromise : framePromises) {
framePromise.trySuccess();
}
if (last) {
assertTrue(ch.isOutputShutdown());
} else {
assertFalse(ch.isOutputShutdown());
}
if (!voidPromise) {
assertTrue(fullPromise.isDone());
Expand Down

0 comments on commit 2b1dbb9

Please sign in to comment.