Skip to content

Commit 1fa1d65

Browse files
Improve Http2 ping timeout logic (#4027)
* 1/2 Improve HTTP2/PING timeout logic: improve PING timeout logic to avoid premature closing of the connection that can occur due to scheduling or flushing delays.
* 2/2 Improve HTTP2/PING timeout logic: improve PING timeout logic by allowing data frame packets to reset the timer. Since the goal is to identify and tear down a dead connection, receiving data packets serves as activity as well. This will help in case of scheduling and network delays if the PING ACK is not received in time.
* Add changelog
* Fix tests based on new ping timeout logic
* Address review comments
1 parent bdbd872 commit 1fa1d65

File tree

6 files changed

+221
-26
lines changed

6 files changed

+221
-26
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "akidambisrinivasan",
5+
"description": "Improve HTTP2/PING timeout logic. Improvements are to reset the ack timers on incoming data packets because it supports liveness of the connection. Also avoiding premature timeouts due to delays in flushing or scheduling."
6+
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/http/timers/HttpClientApiCallTimeoutTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public class HttpClientApiCallTimeoutTest {
5656

5757
private AmazonSyncHttpClient httpClient;
5858

59+
private final static int EPSILON_MILLIS = 10;
60+
5961
@Before
6062
public void setup() {
6163
httpClient = testClientBuilder()
@@ -70,7 +72,7 @@ public void successfulResponse_SlowResponseHandler_ThrowsApiCallTimeoutException
7072
.willReturn(aResponse().withStatus(200).withBody("{}")));
7173

7274
assertThatThrownBy(() -> requestBuilder().execute(combinedSyncResponseHandler(
73-
superSlowResponseHandler(API_CALL_TIMEOUT.toMillis()), null)))
75+
superSlowResponseHandler(API_CALL_TIMEOUT.toMillis() + EPSILON_MILLIS), null)))
7476
.isInstanceOf(ApiCallTimeoutException.class);
7577
}
7678

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2PingHandler.java

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import io.netty.channel.Channel;
2121
import io.netty.channel.ChannelHandlerContext;
22+
import io.netty.channel.ChannelInboundHandlerAdapter;
2223
import io.netty.channel.ChannelPipeline;
23-
import io.netty.channel.SimpleChannelInboundHandler;
2424
import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
2525
import io.netty.handler.codec.http2.Http2PingFrame;
2626
import io.netty.util.concurrent.ScheduledFuture;
@@ -37,18 +37,24 @@
3737
* If a channel is found to be unhealthy, this will invoke {@link ChannelPipeline#fireExceptionCaught(Throwable)}.
3838
*/
3939
@SdkInternalApi
40-
public class Http2PingHandler extends SimpleChannelInboundHandler<Http2PingFrame> {
40+
public class Http2PingHandler extends ChannelInboundHandlerAdapter {
4141
private static final NettyClientLogger log = NettyClientLogger.getLogger(Http2PingHandler.class);
4242
private static final Http2PingFrame DEFAULT_PING_FRAME = new DefaultHttp2PingFrame(0);
4343

44+
/**
45+
* Delay threshold in ms; a scheduling or flushing delay that exceeds it results in a warning message being logged.
46+
*/
47+
private final long delayWarningTimeLimitMs;
48+
4449
private final long pingTimeoutMillis;
4550

4651
private ScheduledFuture<?> periodicPing;
4752
private long lastPingSendTime = 0;
48-
private long lastPingAckTime = 0;
53+
private long lastMsgRcvdTime = 0;
4954

5055
public Http2PingHandler(int pingTimeoutMillis) {
5156
this.pingTimeoutMillis = pingTimeoutMillis;
57+
delayWarningTimeLimitMs = Math.min(100, pingTimeoutMillis / 10);
5258
}
5359

5460
@Override
@@ -62,7 +68,7 @@ private void start(Protocol protocol, ChannelHandlerContext ctx) {
6268
if (protocol == Protocol.HTTP2 && periodicPing == null) {
6369
periodicPing = ctx.channel()
6470
.eventLoop()
65-
.scheduleAtFixedRate(() -> doPeriodicPing(ctx.channel()), 0, pingTimeoutMillis, MILLISECONDS);
71+
.schedule(() -> doPeriodicPing(ctx.channel()), 0, MILLISECONDS);
6672
}
6773
}
6874

@@ -78,33 +84,68 @@ public void channelInactive(ChannelHandlerContext ctx) {
7884
}
7985

8086
@Override
81-
protected void channelRead0(ChannelHandlerContext ctx, Http2PingFrame frame) {
82-
if (frame.ack()) {
83-
log.debug(ctx.channel(), () -> "Received PING ACK from channel " + ctx.channel());
84-
lastPingAckTime = System.currentTimeMillis();
87+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
88+
// Any message read on this connection also indicates that the channel is
89+
// active, so record its arrival time as channel activity for the ping timeout logic.
90+
lastMsgRcvdTime = System.currentTimeMillis();
91+
if (msg instanceof Http2PingFrame) {
92+
doChannelRead(ctx, (Http2PingFrame) msg);
8593
} else {
94+
ctx.fireChannelRead(msg);
95+
}
96+
}
97+
98+
private void doChannelRead(ChannelHandlerContext ctx, Http2PingFrame frame) {
99+
log.debug(ctx.channel(), () -> "Received PING from channel, ack=" + frame.ack());
100+
if (!frame.ack()) {
86101
ctx.fireChannelRead(frame);
87102
}
88103
}
89104

90105
private void doPeriodicPing(Channel channel) {
91-
if (lastPingAckTime <= lastPingSendTime - pingTimeoutMillis) {
106+
if (lastMsgRcvdTime <= lastPingSendTime - pingTimeoutMillis) {
107+
log.warn(channel, () -> "PING timeout occurred");
92108
long timeSinceLastPingSend = System.currentTimeMillis() - lastPingSendTime;
93109
channelIsUnhealthy(channel, new PingFailedException("Server did not respond to PING after " +
94110
timeSinceLastPingSend + "ms (limit: " +
95111
pingTimeoutMillis + "ms)"));
96112
} else {
113+
log.debug(channel, () -> "Sending HTTP2/PING frame");
114+
long scheduleTime = lastPingSendTime == 0 ? 0 : System.currentTimeMillis() - lastPingSendTime;
115+
if (scheduleTime - pingTimeoutMillis > delayWarningTimeLimitMs) {
116+
log.warn(channel, () -> "PING timer scheduled after " + scheduleTime + "ms");
117+
}
97118
sendPing(channel);
98119
}
99120
}
100121

101122
private void sendPing(Channel channel) {
123+
long writeMs = System.currentTimeMillis();
102124
channel.writeAndFlush(DEFAULT_PING_FRAME).addListener(res -> {
103125
if (!res.isSuccess()) {
104126
log.debug(channel, () -> "Failed to write and flush PING frame to connection", res.cause());
105127
channelIsUnhealthy(channel, new PingFailedException("Failed to send PING to the service", res.cause()));
106128
} else {
129+
log.debug(channel, () -> "Successfully flushed PING frame to connection");
107130
lastPingSendTime = System.currentTimeMillis();
131+
long flushTime = lastPingSendTime - writeMs;
132+
if (flushTime > delayWarningTimeLimitMs) {
133+
log.warn(channel, () -> "Flushing PING frame took " + flushTime + "ms");
134+
}
135+
// ping frame was flushed to the socket, schedule to send the next ping now to avoid premature timeout.
136+
//
137+
// The scenario we want to avoid is shown below (NOTE: ptm - pingTimeoutMillis, Wx - Write Ping x,
138+
// Fx - Flush Ping x, Rx - Receive ack x, T - Timeout).
139+
// When the timer is scheduled periodically, even though ack2 arrives < ptm after ping2 is flushed to the socket, we
140+
// will still time out at 2ptm.
141+
// 0 1ptm 2ptm 3ptm
142+
// |----------|----------|----------|-------> time
143+
// W1F1 R1 W2 F2 T R2
144+
// When the timer is scheduled after flushing, we allow time for the ack to come back and don't time out prematurely.
145+
// 0 ptm1 ptm2
146+
// |-|----------|------|----------|-----------> time
147+
// W1F1 R1 W2 F2 R2 W3
148+
periodicPing = channel.eventLoop().schedule(() -> doPeriodicPing(channel), pingTimeoutMillis, MILLISECONDS);
108149
}
109150
});
110151
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/PingTimeoutTest.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,23 +99,37 @@ public void methodTeardown() throws InterruptedException {
9999
netty = null;
100100
}
101101

102+
/**
103+
* The default ping timeout value is 5000ms, so when the channel has no activity (no data
104+
* packets and no ping acks received) for the timeout period, the connection will be
105+
* closed. During connection setup for testing, a Settings frame and a SettingsAck frame
106+
* are exchanged, which counts as activity on the channel, so the channel should time out
107+
* during the second interval, i.e. after 10 seconds.
108+
*/
102109
@Test
103-
public void pingHealthCheck_null_shouldThrowExceptionAfter5Sec() {
110+
public void pingHealthCheck_null_shouldThrowExceptionAfter10Sec() {
104111
Instant a = Instant.now();
105112
assertThatThrownBy(() -> makeRequest(null).join())
106113
.hasMessageContaining("An error occurred on the connection")
107114
.hasCauseInstanceOf(IOException.class)
108115
.hasRootCauseInstanceOf(PingFailedException.class);
109-
assertThat(Duration.between(a, Instant.now())).isBetween(Duration.ofSeconds(5), Duration.ofSeconds(7));
116+
assertThat(Duration.between(a, Instant.now())).isBetween(Duration.ofSeconds(10), Duration.ofSeconds(12));
110117
}
111118

119+
/**
120+
* Test with a ping timeout value of 3000ms. When the channel has no activity (no data
121+
* packets and no ping acks received) for the timeout period, the connection will be
122+
* closed. During connection setup for testing, a Settings frame and a SettingsAck frame
123+
* are exchanged, which counts as activity on the channel, so the channel should time out
124+
* during the second interval, i.e. after 6 seconds.
125+
*/
112126
@Test
113-
public void pingHealthCheck_10sec_shouldThrowExceptionAfter10Secs() {
127+
public void pingHealthCheck_3sec_shouldThrowExceptionAfter6Secs() {
114128
Instant a = Instant.now();
115-
assertThatThrownBy(() -> makeRequest(Duration.ofSeconds(10)).join()).hasCauseInstanceOf(IOException.class)
129+
assertThatThrownBy(() -> makeRequest(Duration.ofSeconds(3)).join()).hasCauseInstanceOf(IOException.class)
116130
.hasMessageContaining("An error occurred on the connection")
117131
.hasRootCauseInstanceOf(PingFailedException.class);
118-
assertThat(Duration.between(a, Instant.now())).isBetween(Duration.ofSeconds(10), Duration.ofSeconds(12));
132+
assertThat(Duration.between(a, Instant.now())).isBetween(Duration.ofSeconds(6), Duration.ofSeconds(8));
119133
}
120134

121135
@Test

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/ServerNotRespondingTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import java.time.Duration;
5555
import java.util.concurrent.CompletableFuture;
5656
import java.util.concurrent.atomic.AtomicInteger;
57+
import java.util.logging.Level;
5758
import org.junit.jupiter.api.AfterEach;
5859
import org.junit.jupiter.api.BeforeEach;
5960
import org.junit.jupiter.api.Test;
@@ -114,8 +115,9 @@ public void connectionNotAckPing_newRequestShouldUseNewConnection() throws Inter
114115
// First request should succeed
115116
firstRequest.join();
116117

117-
// Wait for Ping to close the connection
118-
Thread.sleep(200);
118+
// Wait for Ping to close the connection, it takes 2 * healthCheckPingPeriod (200ms)
119+
// to identify a silent connection to be closed.
120+
Thread.sleep(400);
119121
server.notRespondOnFirstChannel = false;
120122
sendGetRequest().join();
121123
assertThat(server.h2ConnectionCount.get()).isEqualTo(2);

0 commit comments

Comments
 (0)