Skip to content

Commit

Permalink
Add the ability to configure connection timeout, keep-alive settings,…
Browse files Browse the repository at this point in the history
… and advanced SocketOptions on the AwsCrtAsyncHttpClient. This is necessary for any clients with long-running connections that exceed default socket timeouts of services along the call path, and need to enable keep-alive settings which the CRT client supports, but the Java client wasn't exposing to callers
  • Loading branch information
nikp committed Oct 5, 2022
1 parent d6b02d5 commit 3f0ec56
Show file tree
Hide file tree
Showing 7 changed files with 535 additions and 8 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-c2b1dfc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "nikp",
"description": "Add the ability to configure connection timeout, keep-alive settings, and other advanced SocketOptions on the AwsCrtAsyncHttpClient to support long-running connections"
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,27 @@ public final class AwsCrtAsyncHttpClient implements SdkAsyncHttpClient {
private final int maxConnectionsPerEndpoint;
private boolean isClosed = false;

private static final Duration CRT_SDK_DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(3);
// Override default connection timeout for Crt client to be in line with the CRT default:
// https://github.com/awslabs/aws-crt-java/blob/main/src/main/java/software/amazon/awssdk/crt/io/SocketOptions.java#L79
private static final AttributeMap CRT_HTTP_DEFAULTS =
AttributeMap.builder()
.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, CRT_SDK_DEFAULT_CONNECTION_TIMEOUT)
.build();

private AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) {
int maxConns = config.get(SdkHttpConfigurationOption.MAX_CONNECTIONS);

Validate.isPositive(maxConns, "maxConns");
Validate.notNull(builder.cipherPreference, "cipherPreference");
Validate.isPositive(builder.readBufferSize, "readBufferSize");

if (Boolean.TRUE.equals(builder.standardOptions.get(SdkHttpConfigurationOption.TCP_KEEPALIVE))) {
Validate.notNull(builder.tcpKeepAliveConfiguration, "tcpKeepAliveConfiguration must be provided when tcpKeepAlive is enabled");
}

try (ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = new SocketOptions();
SocketOptions clientSocketOptions = buildSocketOptions(builder, config);
TlsContextOptions clientTlsContextOptions = TlsContextOptions.createDefaultClient() // NOSONAR
.withCipherPreference(builder.cipherPreference)
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
Expand Down Expand Up @@ -138,6 +150,34 @@ private HttpProxyOptions buildProxyOptions(ProxyConfiguration proxyConfiguration
return clientProxyOptions;
}

private SocketOptions buildSocketOptions(DefaultBuilder builder, AttributeMap config) {
SocketOptions clientSocketOptions = new SocketOptions();

Duration connectionTimeout = config.get(SdkHttpConfigurationOption.CONNECTION_TIMEOUT);
if (connectionTimeout != null) {
clientSocketOptions.connectTimeoutMs = (int) Long.min(connectionTimeout.toMillis(), Integer.MAX_VALUE);
}

TcpKeepAliveConfiguration tcpKeepAliveConfiguration = builder.tcpKeepAliveConfiguration;
if (tcpKeepAliveConfiguration != null) {
long keepAliveIntervalSecs = tcpKeepAliveConfiguration.keepAliveInterval().getSeconds();
long keepAliveTimeoutSecs = tcpKeepAliveConfiguration.keepAliveTimeout().getSeconds();

clientSocketOptions.keepAliveIntervalSecs = (int) Long.min(keepAliveIntervalSecs, Integer.MAX_VALUE);
clientSocketOptions.keepAliveTimeoutSecs = (int) Long.min(keepAliveTimeoutSecs, Integer.MAX_VALUE);

}

SocketOptionsConfiguration socketOptionsConfiguration = builder.socketOptionsConfiguration;
if (socketOptionsConfiguration != null) {
clientSocketOptions.domain = socketOptionsConfiguration.domain();
clientSocketOptions.type = socketOptionsConfiguration.type();
}


return clientSocketOptions;
}

/**
* Marks a Native CrtResource as owned by the current Java Object.
*
Expand Down Expand Up @@ -312,10 +352,10 @@ public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClien
Builder proxyConfiguration(Consumer<ProxyConfiguration.Builder> proxyConfigurationBuilderConsumer);

/**
* Configure the health checks for for all connections established by this client.
* Configure the health checks for all connections established by this client.
*
* <p>
* eg: you can set a throughput threshold for the a connection to be considered healthy.
* eg: you can set a throughput threshold for a connection to be considered healthy.
* If the connection falls below this threshold for a configurable amount of time,
* then the connection is considered unhealthy and will be shut down.
*
Expand All @@ -325,10 +365,10 @@ public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClien
Builder connectionHealthChecksConfiguration(ConnectionHealthChecksConfiguration healthChecksConfiguration);

/**
* A convenience method to configure the health checks for for all connections established by this client.
* A convenience method to configure the health checks for all connections established by this client.
*
* <p>
* eg: you can set a throughput threshold for the a connection to be considered healthy.
* eg: you can set a throughput threshold for a connection to be considered healthy.
* If the connection falls below this threshold for a configurable amount of time,
* then the connection is considered unhealthy and will be shut down.
*
Expand All @@ -340,9 +380,75 @@ Builder connectionHealthChecksConfiguration(Consumer<ConnectionHealthChecksConfi
healthChecksConfigurationBuilder);

/**
* Configure the maximum amount of time that a connection should be allowed to remain open while idle.
* The amount of time to wait when initially establishing a connection before giving up and timing out. The maximum
* possible value, in ms, is the value of {@link Integer#MAX_VALUE}, any longer duration will be reduced to the maximum
* possible value. If not specified, the connection timeout duration will be set to value defined in
* {@link AwsCrtAsyncHttpClient#CRT_SDK_DEFAULT_CONNECTION_TIMEOUT}.
*/
Builder connectionMaxIdleTime(Duration connectionMaxIdleTime);

/**
* Configure connection socket timeout
*/
Builder connectionTimeout(Duration connectionTimeout);

/**
* Configure whether to enable or disable TCP KeepAlive.
* The configuration will be passed to the socket option {@link java.net.SocketOptions#SO_KEEPALIVE}.
* <p>
* By default, this is disabled.
* <p>
* When enabled, the actual KeepAlive mechanism is dependent on the Operating System and therefore additional TCP
* KeepAlive values (like timeout, number of packets, etc) must be configured via the Operating System (sysctl on
* Linux/Mac, and Registry values on Windows).
*
* To enable, keepAlive interval and timeout must be additionally configured via {@link TcpKeepAliveConfiguration}
*/
Builder tcpKeepAlive(Boolean keepConnectionAlive);

/**
* Configure TCP Keep-alive configuration for all connections established by this client.
*
* <p>
* If tcpKeepAlive is enabled, this is required configuration
* and specify periodic keepalive packet intervals and including timeouts
* This may be required for certain connections for longer durations than default socket timeouts
*
* @param tcpKeepAliveConfiguration The TCP keep-alive configuration to use
* @return The builder of the method chaining.
*/
Builder tcpKeepAliveConfiguration(TcpKeepAliveConfiguration tcpKeepAliveConfiguration);

/**
* Configure TCP Keep-alive configuration for all connections established by this client.
*
* <p>
* If tcpKeepAlive is enabled, this is required configuration
* and specify periodic keepalive packet intervals and including timeouts
* This may be required for certain connections for longer durations than default socket timeouts
*
* @param tcpKeepAliveConfigurationBuilder The TCP keep-alive configuration builder to use
* @return The builder of the method chaining.
*/
Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
tcpKeepAliveConfigurationBuilder);

/**
* Configure socket options configuration for alternative transports via socket domains and ty pes
*
* @param socketOptionsConfiguration The socket configuration to use
* @return The builder of the method chaining.
*/
Builder socketOptionsConfiguration(SocketOptionsConfiguration socketOptionsConfiguration);

/**
* Configure socket options configuration for alternative transports via socket domains and ty pes
*
* @param socketOptionsConfigurationBuilder The socket configuration builder to use
* @return The builder of the method chaining.
*/
Builder socketOptionsConfiguration(Consumer<SocketOptionsConfiguration.Builder>
socketOptionsConfigurationBuilder);
}

/**
Expand All @@ -355,20 +461,24 @@ private static final class DefaultBuilder implements Builder {
private int readBufferSize = DEFAULT_STREAM_WINDOW_SIZE;
private ProxyConfiguration proxyConfiguration;
private ConnectionHealthChecksConfiguration connectionHealthChecksConfiguration;
private TcpKeepAliveConfiguration tcpKeepAliveConfiguration;
private SocketOptionsConfiguration socketOptionsConfiguration;

private DefaultBuilder() {
}

@Override
public SdkAsyncHttpClient build() {
return new AwsCrtAsyncHttpClient(this, standardOptions.build()
.merge(CRT_HTTP_DEFAULTS)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}

@Override
public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
return new AwsCrtAsyncHttpClient(this, standardOptions.build()
.merge(serviceDefaults)
.merge(CRT_HTTP_DEFAULTS)
.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS));
}

Expand Down Expand Up @@ -417,15 +527,59 @@ public Builder connectionHealthChecksConfiguration(Consumer<ConnectionHealthChec

@Override
public Builder connectionMaxIdleTime(Duration connectionMaxIdleTime) {
Validate.isPositive(connectionMaxIdleTime, "connectionMaxIdleTime");
standardOptions.put(SdkHttpConfigurationOption.CONNECTION_MAX_IDLE_TIMEOUT, connectionMaxIdleTime);
return this;
}

@Override
public Builder connectionTimeout(Duration connectionTimeout) {
Validate.isPositive(connectionTimeout, "connectionTimeout");
standardOptions.put(SdkHttpConfigurationOption.CONNECTION_TIMEOUT, connectionTimeout);
return this;
}

@Override
public Builder tcpKeepAlive(Boolean keepConnectionAlive) {
Validate.notNull(keepConnectionAlive, "keepConnectionAlive");
standardOptions.put(SdkHttpConfigurationOption.TCP_KEEPALIVE, keepConnectionAlive);
return this;
}

@Override
public Builder tcpKeepAliveConfiguration(TcpKeepAliveConfiguration tcpKeepAliveConfiguration) {
this.tcpKeepAliveConfiguration = tcpKeepAliveConfiguration;
return this;
}

@Override
public Builder tcpKeepAliveConfiguration(Consumer<TcpKeepAliveConfiguration.Builder>
tcpKeepAliveConfigurationBuilder) {
TcpKeepAliveConfiguration.Builder builder = TcpKeepAliveConfiguration.builder();
tcpKeepAliveConfigurationBuilder.accept(builder);
return tcpKeepAliveConfiguration(builder.build());
}

@Override
public Builder proxyConfiguration(Consumer<ProxyConfiguration.Builder> proxyConfigurationBuilderConsumer) {
ProxyConfiguration.Builder builder = ProxyConfiguration.builder();
proxyConfigurationBuilderConsumer.accept(builder);
return proxyConfiguration(builder.build());
}


@Override
public Builder socketOptionsConfiguration(SocketOptionsConfiguration socketOptionsConfiguration) {
this.socketOptionsConfiguration = socketOptionsConfiguration;
return this;
}

@Override
public Builder socketOptionsConfiguration(Consumer<SocketOptionsConfiguration.Builder>
socketOptionsConfigurationBuilder) {
SocketOptionsConfiguration.Builder builder = SocketOptionsConfiguration.builder();
socketOptionsConfigurationBuilder.accept(builder);
return socketOptionsConfiguration(builder.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import software.amazon.awssdk.utils.Validate;

/**
* Configuration that defines health checks for for all connections established by
* the{@link ConnectionHealthChecksConfiguration}.
* Configuration that defines health checks for all connections established by
* the {@link ConnectionHealthChecksConfiguration}.
*
* <b>NOTE:</b> This is a Preview API and is subject to change so it should not be used in production.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt;

import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.utils.Validate;

/**
* Configuration that defines socket options for all connections established by
* the {@link SocketOptionsConfiguration}.
*
* <b>NOTE:</b> This is a Preview API and is subject to change so it should not be used in production.
*/
@SdkPublicApi
@SdkPreviewApi
public final class SocketOptionsConfiguration {

private final SocketOptions.SocketDomain domain;
private final SocketOptions.SocketType type;

private SocketOptionsConfiguration(DefaultSocketOptionsConfigurationBuilder builder) {
this.domain = Validate.paramNotNull(builder.domain,
"domain");
this.type = Validate.paramNotNull(builder.type,
"type");
}

/**
* @return socket domain
*/
public SocketOptions.SocketDomain domain() {
return domain;
}

/**
* @return socket type
*/
public SocketOptions.SocketType type() {
return type;
}

public static Builder builder() {
return new DefaultSocketOptionsConfigurationBuilder();
}

/**
* A builder for {@link SocketOptionsConfiguration}.
*
* <p>All implementations of this interface are mutable and not thread safe.</p>
*/
public interface Builder {

/**
* Sets the socket domain
* @param domain socket domain
* @return Builder
*/
Builder domain(SocketOptions.SocketDomain domain);

/**
* Sets the socket type
* @param type socket type
* @return Builder
*/
Builder type(SocketOptions.SocketType type);

SocketOptionsConfiguration build();
}

/**
* An SDK-internal implementation of {@link Builder}.
*/
private static final class DefaultSocketOptionsConfigurationBuilder implements Builder {

private SocketOptions.SocketDomain domain = SocketOptions.SocketDomain.IPv6;
private SocketOptions.SocketType type = SocketOptions.SocketType.STREAM;

private DefaultSocketOptionsConfigurationBuilder() {
}

/**
* Sets the socket domain
* Default: {@link SocketOptions.SocketDomain#IPv6}
* @param domain socket domain
* @return Builder
*/
@Override
public Builder domain(SocketOptions.SocketDomain domain) {
this.domain = domain;
return this;
}

/**
* Sets the socket type
* Default: {@link SocketOptions.SocketType#STREAM}
* @param type socket type
* @return Builder
*/
@Override
public Builder type(SocketOptions.SocketType type) {
this.type = type;
return this;
}

@Override
public SocketOptionsConfiguration build() {
return new SocketOptionsConfiguration(this);
}
}
}
Loading

0 comments on commit 3f0ec56

Please sign in to comment.