Skip to content

Commit

Permalink
Add the ability to configure connection timeout, keep-alive settings,…
Browse files Browse the repository at this point in the history
… and advanced SocketOptions on the AwsCrtAsyncHttpClient. This is necessary for any clients with long-running connections that exceed default socket timeouts of services along the call path, and need to enable keep-alive settings which the CRT client supports, but the Java client wasn't exposing to callers
  • Loading branch information
nikp committed Oct 5, 2022
1 parent d6b02d5 commit 2af287f
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 8 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-c2b1dfc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "nikp",
"description": "Add the ability to configure connection timeout, keep-alive settings, and other advanced SocketOptions on the AwsCrtAsyncHttpClient to support long-running connections"
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import software.amazon.awssdk.utils.AttributeMap;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.NumericUtils;
import software.amazon.awssdk.utils.Validate;

/**
Expand Down Expand Up @@ -76,15 +77,27 @@ public final class AwsCrtAsyncHttpClient implements SdkAsyncHttpClient {
private final int maxConnectionsPerEndpoint;
private boolean isClosed = false;

private static final Duration CRT_SDK_DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(3);
// Override default connection timeout for Crt client to be in line with the CRT default:
// https://github.com/awslabs/aws-crt-java/blob/main/src/main/java/software/amazon/awssdk/crt/io/SocketOptions.java#L79
private static final AttributeMap CRT_HTTP_DEFAULTS =
AttributeMap.builder()
.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, CRT_SDK_DEFAULT_CONNECTION_TIMEOUT)
.build();

private AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) {
int maxConns = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);

Validate.isPositive(maxConns, "maxConns");
Validate.notNull(builder.cipherPreference, "cipherPreference");
Validate.isPositive(builder.readBufferSize, "readBufferSize");

if (Boolean.TRUE.equals(builder.standardOptions.get(SdkHttpConfigurationOption.TCP_KEEPALIVE))) {
Validate.notNull(builder.tcpKeepAliveConfiguration, "tcpKeepAliveConfiguration must be provided when tcpKeepAlive is enabled");
}

try (ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = new SocketOptions();
SocketOptions clientSocketOptions = buildSocketOptions(builder, config);
TlsContextOptions clientTlsContextOptions = TlsContextOptions.createDefaultClient() // NOSONAR
.withCipherPreference(builder.cipherPreference)
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
Expand Down Expand Up @@ -138,6 +151,24 @@ private HttpProxyOptions buildProxyOptions(ProxyConfiguration proxyConfiguration
return clientProxyOptions;
}

private SocketOptions buildSocketOptions(DefaultBuilder builder, AttributeMap config) {
SocketOptions clientSocketOptions = new SocketOptions();

Duration connectionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT);
if (connectionTimeout != null) {
clientSocketOptions.connectTimeoutMs = NumericUtils.saturatedCast(connectionTimeout.toMillis());
}

TcpKeepAliveConfiguration tcpKeepAliveConfiguration = builder.tcpKeepAliveConfiguration;
if (tcpKeepAliveConfiguration != null) {
clientSocketOptions.keepAliveIntervalSecs = NumericUtils.saturatedCast(tcpKeepAliveConfiguration.keepAliveInterval().getSeconds());
clientSocketOptions.keepAliveTimeoutSecs = NumericUtils.saturatedCast(tcpKeepAliveConfiguration.keepAliveTimeout().getSeconds());

}

return clientSocketOptions;
}

/**
* Marks a Native CrtResource as owned by the current Java Object.
*
Expand Down Expand Up @@ -312,10 +343,10 @@ public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClien
Builder proxyConfiguration(Consumer<ProxyConfiguration.Builder> proxyConfigurationBuilderConsumer);

/**
* Configure the health checks for for all connections established by this client.
* Configure the health checks for all connections established by this client.
*
* <p>
* eg: you can set a throughput threshold for the a connection to be considered healthy.
* eg: you can set a throughput threshold for a connection to be considered healthy.
* If the connection falls below this threshold for a configurable amount of time,
* then the connection is considered unhealthy and will be shut down.
*
Expand All @@ -325,10 +356,10 @@ public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClien
Builder connectionHealthChecksConfiguration(ConnectionHealthChecksConfiguration healthChecksConfiguration);

/**
* A convenience method to configure the health checks for for all connections established by this client.
* A convenience method to configure the health checks for all connections established by this client.
*
* <p>
* eg: you can set a throughput threshold for the a connection to be considered healthy.
* eg: you can set a throughput threshold for a connection to be considered healthy.
* If the connection falls below this threshold for a configurable amount of time,
* then the connection is considered unhealthy and will be shut down.
*
Expand All @@ -340,9 +371,58 @@ Builder connectionHealthChecksConfiguration(Consumer<ConnectionHealthChecksConfi
healthChecksConfigurationBuilder);

/**
* Configure the maximum amount of time that a connection should be allowed to remain open while idle.
* The amount of time to wait when initially establishing a connection before giving up and timing out. The maximum
* possible value, in ms, is the value of {@link Integer#MAX_VALUE}, any longer duration will be reduced to the maximum
* possible value. If not specified, the connection timeout duration will be set to value defined in
* {@link AwsCrtAsyncHttpClient#CRT_SDK_DEFAULT_CONNECTION_TIMEOUT}.
*/
Builder connectionMaxIdleTime(Duration connectionMaxIdleTime);

/**
* Configure connection socket timeout
*/
Builder connectionTimeout(Duration connectionTimeout);

/**
* Configure whether to enable or disable TCP KeepAlive.
* The configuration will be passed to the socket option {@link java.net.SocketOptions#SO_KEEPALIVE}.
* <p>
* By default, this is disabled.
* <p>
* When enabled, the actual KeepAlive mechanism is dependent on the Operating System and therefore additional TCP
* KeepAlive values (like timeout, number of packets, etc) must be configured via the Operating System (sysctl on
* Linux/Mac, and Registry values on Windows).
*
* To enable, keepAlive interval and timeout must be additionally configured via {@link TcpKeepAliveConfiguration}
*/
Builder tcpKeepAlive(Boolean keepConnectionAlive);

/**
* Configure TCP Keep-alive configuration for all connections established by this client.
*
* <p>
* If tcpKeepAlive is enabled, this is required configuration
* and specify periodic keepalive packet intervals and including timeouts
* This may be required for certain connections for longer durations than default socket timeouts
*
* @param tcpKeepAliveConfiguration The TCP keep-alive configuration to use
* @return The builder of the method chaining.
*/
Builder tcpKeepAliveConfiguration(TcpKeepAliveConfiguration tcpKeepAliveConfiguration);

/**
* Configure TCP Keep-alive configuration for all connections established by this client.
*
* <p>
* If tcpKeepAlive is enabled, this is required configuration
* and specify periodic keepalive packet intervals and including timeouts
* This may be required for certain connections for longer durations than default socket timeouts
*
* @param tcpKeepAliveConfigurationBuilder The TCP keep-alive configuration builder to use
* @return The builder of the method chaining.
*/
Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
tcpKeepAliveConfigurationBuilder);
}

/**
Expand All @@ -355,20 +435,23 @@ private static final class DefaultBuilder implements Builder {
private int readBufferSize = DEFAULT_STREAM_WINDOW_SIZE;
private ProxyConfiguration proxyConfiguration;
private ConnectionHealthChecksConfiguration connectionHealthChecksConfiguration;
private TcpKeepAliveConfiguration tcpKeepAliveConfiguration;

private DefaultBuilder() {
}

@Override
public SdkAsyncHttpClient build() {
return new AwsCrtAsyncHttpClient(this, standardOptions.build()
.merge(CRT_HTTP_DEFAULTS)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}

@Override
public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
return new AwsCrtAsyncHttpClient(this, standardOptions.build()
.merge(serviceDefaults)
.merge(CRT_HTTP_DEFAULTS)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}

Expand Down Expand Up @@ -417,10 +500,39 @@ public Builder connectionHealthChecksConfiguration(Consumer<ConnectionHealthChec

@Override
public Builder connectionMaxIdleTime(Duration connectionMaxIdleTime) {
Validate.isPositive(connectionMaxIdleTime, "connectionMaxIdleTime");
standardOptions.put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, connectionMaxIdleTime);
return this;
}

@Override
public Builder connectionTimeout(Duration connectionTimeout) {
Validate.isPositive(connectionTimeout, "connectionTimeout");
standardOptions.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, connectionTimeout);
return this;
}

@Override
public Builder tcpKeepAlive(Boolean keepConnectionAlive) {
Validate.notNull(keepConnectionAlive, "keepConnectionAlive");
standardOptions.put(SdkHttpConfigurationOption.TCP_KEEPALIVE, keepConnectionAlive);
return this;
}

@Override
public Builder tcpKeepAliveConfiguration(TcpKeepAliveConfiguration tcpKeepAliveConfiguration) {
this.tcpKeepAliveConfiguration = tcpKeepAliveConfiguration;
return this;
}

@Override
public Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
tcpKeepAliveConfigurationBuilder) {
TcpKeepAliveConfiguration.Builder builder = TcpKeepAliveConfiguration.builder();
tcpKeepAliveConfigurationBuilder.accept(builder);
return tcpKeepAliveConfiguration(builder.build());
}

@Override
public Builder proxyConfiguration(Consumer<ProxyConfiguration.Builder> proxyConfigurationBuilderConsumer) {
ProxyConfiguration.Builder builder = ProxyConfiguration.builder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import software.amazon.awssdk.utils.Validate;

/**
* Configuration that defines health checks for for all connections established by
* the{@link ConnectionHealthChecksConfiguration}.
* Configuration that defines health checks for all connections established by
* the {@link ConnectionHealthChecksConfiguration}.
*
* <b>NOTE:</b> This is a Preview API and is subject to change so it should not be used in production.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt;

import java.time.Duration;
import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.Validate;

/**
* Configuration that defines keep-alive options for all connections established by
* the {@link TcpKeepAliveConfiguration}.
*
* <b>NOTE:</b> This is a Preview API and is subject to change so it should not be used in production.
*/
@SdkPublicApi
@SdkPreviewApi
public final class TcpKeepAliveConfiguration {

private final Duration keepAliveInterval;
private final Duration keepAliveTimeout;

private TcpKeepAliveConfiguration(DefaultTcpKeepAliveConfigurationBuilder builder) {
this.keepAliveInterval = Validate.isPositive(builder.keepAliveInterval,
"keepAliveInterval");
this.keepAliveTimeout = Validate.isPositive(builder.keepAliveTimeout,
"keepAliveTimeout");
}

/**
* @return number of seconds between TCP keepalive packets being sent to the peer
*/
public Duration keepAliveInterval() {
return keepAliveInterval;
}

/**
* @return number of seconds to wait for a keepalive response before considering the connection timed out
*/
public Duration keepAliveTimeout() {
return keepAliveTimeout;
}

public static Builder builder() {
return new DefaultTcpKeepAliveConfigurationBuilder();
}

/**
* A builder for {@link TcpKeepAliveConfiguration}.
*
* <p>All implementations of this interface are mutable and not thread safe.</p>
*/
public interface Builder {
/**
* Sets the Duration between TCP keepalive packets being sent to the peer
* @param keepAliveInterval Duration between TCP keepalive packets being sent to the peer
* @return Builder
*/
Builder keepAliveInterval(Duration keepAliveInterval);

/**
* Sets the Duration to wait for a keepalive response before considering the connection timed out
* @param keepAliveTimeout Duration to wait for a keepalive response before considering the connection timed out
* @return Builder
*/
Builder keepAliveTimeout(Duration keepAliveTimeout);

TcpKeepAliveConfiguration build();
}

/**
* An SDK-internal implementation of {@link Builder}.
*/
private static final class DefaultTcpKeepAliveConfigurationBuilder implements Builder {
private Duration keepAliveInterval;
private Duration keepAliveTimeout;

private DefaultTcpKeepAliveConfigurationBuilder() {
}

/**
* Sets the Duration between TCP keepalive packets being sent to the peer
* @param keepAliveInterval Duration between TCP keepalive packets being sent to the peer
* @return Builder
*/
@Override
public Builder keepAliveInterval(Duration keepAliveInterval) {
this.keepAliveInterval = keepAliveInterval;
return this;
}

/**
* Sets the Duration to wait for a keepalive response before considering the connection timed out
* @param keepAliveTimeout Duration to wait for a keepalive response before considering the connection timed out
* @return Builder
*/
@Override
public Builder keepAliveTimeout(Duration keepAliveTimeout) {
this.keepAliveTimeout = keepAliveTimeout;
return this;
}

@Override
public TcpKeepAliveConfiguration build() {
return new TcpKeepAliveConfiguration(this);
}
}
}
Loading

0 comments on commit 2af287f

Please sign in to comment.