Skip to content

Commit

Permalink
Netty Expect 100-Continue fix
Browse files Browse the repository at this point in the history
Signed-off-by: Maxim Nesen <maxim.nesen@oracle.com>
  • Loading branch information
senivam committed Jan 29, 2025
1 parent 9d5fbe8 commit 814dd35
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.glassfish.jersey.netty.connector;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
Expand All @@ -33,114 +31,98 @@
import javax.ws.rs.ProcessingException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JerseyExpectContinueHandler extends ChannelInboundHandlerAdapter {

private boolean expectationAllowed = false;

private boolean handshakeStarted = false;

private boolean handshakeDone = false;
private ExpectationState currentState = ExpectationState.IDLE;

private static final List<HttpResponseStatus> statusesToBeConsidered = Arrays.asList(HttpResponseStatus.CONTINUE,
HttpResponseStatus.UNAUTHORIZED, HttpResponseStatus.EXPECTATION_FAILED,
HttpResponseStatus.METHOD_NOT_ALLOWED, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);

private CompletableFuture<HttpResponseStatus> expectedFuture = new CompletableFuture<>();
private static final List<HttpResponseStatus> expectationStatuses = Arrays.asList(HttpResponseStatus.CONTINUE,
HttpResponseStatus.EXPECTATION_FAILED, HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE);

private HttpResponseStatus status = null;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!expectationAllowed) {
ctx.fireChannelRead(msg);
return;
if (checkExpectResponse(msg)) {
currentState = ExpectationState.AWAITING;
}
resetHandler();
if (msg instanceof HttpResponse) {
final HttpResponse response = (HttpResponse) msg;
if (statusesToBeConsidered.contains(response.status())) {
expectedFuture.complete(response.status());
if (!HttpResponseStatus.CONTINUE.equals(response.status())) {
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
switch (currentState) {
case AWAITING:
final HttpResponse response = (HttpResponse) msg;
status = response.status();

boolean handshakeDone = HttpUtil.isContentLengthSet(response) || msg instanceof FullHttpMessage;
currentState = (handshakeDone) ? ExpectationState.IDLE : ExpectationState.FINISHING;

return;
case FINISHING:
if (msg instanceof LastHttpContent) {
currentState = ExpectationState.IDLE;
}
handshakeStarted = true;
handshakeDone = HttpUtil.isContentLengthSet(response) || msg instanceof FullHttpMessage;
} else {
return;
default:
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
}
}
}

} else if (msg instanceof LastHttpContent) {
if (handshakeStarted) {
handshakeDone = true;
} else {
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
}
} else {
ctx.fireChannelRead(msg); //bypass the message to the next handler in line
private boolean checkExpectResponse(Object msg) {
if (msg instanceof HttpResponse) {
return expectationStatuses.contains(((HttpResponse) msg).status());
}
return false;
}

CompletableFuture<HttpResponseStatus> processExpect100ContinueRequest(HttpRequest nettyRequest,
ClientRequest jerseyRequest,
Channel ch,
Integer timeout)
HttpRequest prepare100ContinueRequest(HttpRequest nettyRequest,
ClientRequest jerseyRequest)
throws InterruptedException, ExecutionException, TimeoutException {
//check for 100-Continue presence/availability
final Expect100ContinueConnectorExtension expect100ContinueExtension
= new Expect100ContinueConnectorExtension();

final DefaultFullHttpRequest nettyRequestHeaders =
final DefaultFullHttpRequest preparedNettyHeadersRequest =
new DefaultFullHttpRequest(nettyRequest.protocolVersion(), nettyRequest.method(), nettyRequest.uri());
nettyRequestHeaders.headers().setAll(nettyRequest.headers());
preparedNettyHeadersRequest.headers().setAll(nettyRequest.headers());

if (!nettyRequestHeaders.headers().contains(HttpHeaderNames.HOST)) {
nettyRequestHeaders.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
if (!preparedNettyHeadersRequest.headers().contains(HttpHeaderNames.HOST)) {
preparedNettyHeadersRequest.headers().add(HttpHeaderNames.HOST, jerseyRequest.getUri().getHost());
}

//If Expect:100-continue feature is enabled and client supports it, the nettyRequestHeaders will be
//enriched with the 'Expect:100-continue' header.
expect100ContinueExtension.invoke(jerseyRequest, nettyRequestHeaders);

expectationAllowed = HttpUtil.is100ContinueExpected(nettyRequestHeaders);

if (expectationAllowed) {
ch.writeAndFlush(nettyRequestHeaders).sync();
final HttpResponseStatus status = expectedFuture
.get(timeout, TimeUnit.MILLISECONDS);
expect100ContinueExtension.invoke(jerseyRequest, preparedNettyHeadersRequest);

processExpectationStatus(status);
}
return expectedFuture;
}

private void resetHandler() {
if (handshakeDone
&& handshakeStarted
&& expectationAllowed
&& expectedFuture.isDone()) {
handshakeDone =
handshakeStarted =
expectationAllowed = false;
expectedFuture = new CompletableFuture<>();
}
return preparedNettyHeadersRequest;
}

private void processExpectationStatus(HttpResponseStatus status)
void processExpectationStatus()
throws TimeoutException {
System.out.println(status);
if (status == null) {
throw new TimeoutException(); // continue without expectations
}
if (!statusesToBeConsidered.contains(status)) {
throw new ProcessingException(LocalizationMessages
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
}
if (!expectedFuture.isDone() || HttpResponseStatus.EXPECTATION_FAILED.equals(status)) {
if (HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE.equals(status)) {
throw new ProcessingException(LocalizationMessages
.REQUEST_ENTITY_TOO_LARGE(), null);
}
if (HttpResponseStatus.EXPECTATION_FAILED.equals(status)) {
throw new TimeoutException(); // continue without expectations
}
if (!HttpResponseStatus.CONTINUE.equals(status)) {
throw new ProcessingException(LocalizationMessages
.UNEXPECTED_VALUE_FOR_EXPECT_100_CONTINUE_STATUSES(status.code()), null);
}
}

private enum ExpectationState {
AWAITING,
FINISHING,
IDLE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
final Integer maxInitialLineLength = ClientProperties.getValue(config.getProperties(),
NettyClientProperties.MAX_INITIAL_LINE_LENGTH,
NettyClientProperties.DEFAULT_INITIAL_LINE_LENGTH);
/*
final Long aggregatorContentLength = ClientProperties.getValue(config.getProperties(),
ClientProperties.EXPECT_100_CONTINUE_THRESHOLD_SIZE,
ClientProperties.DEFAULT_EXPECT_100_CONTINUE_THRESHOLD_SIZE);
*/

p.addLast(new HttpClientCodec(maxInitialLineLength, maxHeaderSize, maxChunkSize));
// p.addLast(new HttpObjectAggregator(aggregatorContentLength.intValue()));
p.addLast(EXPECT_100_CONTINUE_HANDLER, expect100ContinueHandler);
p.addLast(new ChunkedWriteHandler());
p.addLast(new HttpContentDecompressor());
Expand Down Expand Up @@ -451,22 +444,27 @@ public void operationComplete(io.netty.util.concurrent.Future<? super Void> futu
// // Set later after the entity is "written"
// break;
}

final CountDownLatch headersSet = new CountDownLatch(1);
final CountDownLatch contentLengthSet = new CountDownLatch(1);

try {
expect100ContinueHandler.processExpect100ContinueRequest(nettyRequest, jerseyRequest,
ch, expect100ContinueTimeout);
final HttpRequest expect100ContinueRequest =
expect100ContinueHandler.prepare100ContinueRequest(nettyRequest, jerseyRequest);
if (HttpUtil.is100ContinueExpected(expect100ContinueRequest)) {
//send expect request, sync and wait till either response or timeout received
boolean completed = ch.writeAndFlush(expect100ContinueRequest)
.sync().await(expect100ContinueTimeout, TimeUnit.MILLISECONDS);
if (completed) {
expect100ContinueHandler.processExpectationStatus();
} //if not we just continue without expectations
}
} catch (ExecutionException e) {
responseDone.completeExceptionally(e);
} catch (TimeoutException e) {
//Expect:100-continue allows timeouts by the spec
//just removing the pipeline from processing
if (ch.pipeline().context(JerseyExpectContinueHandler.class) != null) {
ch.pipeline().remove(EXPECT_100_CONTINUE_HANDLER);
}
}

final CountDownLatch headersSet = new CountDownLatch(1);
final CountDownLatch contentLengthSet = new CountDownLatch(1);

jerseyRequest.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(int contentLength) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2016, 2023 Oracle and/or its affiliates. All rights reserved.
# Copyright (c) 2016, 2025 Oracle and/or its affiliates. All rights reserved.
#
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -23,3 +23,4 @@ redirect.error.determining.location="Error determining redirect location: ({0}).
redirect.infinite.loop="Infinite loop in chained redirects detected."
redirect.limit.reached="Max chained redirect limit ({0}) exceeded."
unexpected.value.for.expect.100.continue.statuses=Unexpected value: ("{0}").
request.entity.too.large=Request Entity Too Large.

0 comments on commit 814dd35

Please sign in to comment.