Skip to content

Commit

Permalink
Add the ability to configure SocketOptions on the AwsCrtAsyncHttpClie…
Browse files Browse the repository at this point in the history
…nt. This is necessary for any clients with long-running connections that exceed default socket timeouts of services along the call path, and need to enable keep-alive settings which the CRT client supports, but the Java client wasn't exposing to callers
  • Loading branch information
nikp committed Oct 3, 2022
1 parent b775951 commit 96b08f5
Show file tree
Hide file tree
Showing 5 changed files with 396 additions and 14 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-c2b1dfc.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "nikp",
"description": "Add the ability to configure SocketOptions on the AwsCrtAsyncHttpClient to allow explicit enabling of keep-alive signals for long-running connections"
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private AwsCrtAsyncHttpClient(DefaultBuilder builder, AttributeMap config) {
Validate.isPositive(builder.readBufferSize, "readBufferSize");

try (ClientBootstrap clientBootstrap = new ClientBootstrap(null, null);
SocketOptions clientSocketOptions = new SocketOptions();
SocketOptions clientSocketOptions = buildSocketOptions(builder.socketOptionsConfiguration);
TlsContextOptions clientTlsContextOptions = TlsContextOptions.createDefaultClient() // NOSONAR
.withCipherPreference(builder.cipherPreference)
.withVerifyPeer(!config.get(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES));
Expand Down Expand Up @@ -138,13 +138,29 @@ private HttpProxyOptions buildProxyOptions(ProxyConfiguration proxyConfiguration
return clientProxyOptions;
}

/**
* Marks a Native CrtResource as owned by the current Java Object.
*
* @param subresource The Resource to own.
* @param <T> The CrtResource Type
* @return The CrtResource passed in
*/
private SocketOptions buildSocketOptions(SocketOptionsConfiguration socketOptionsConfiguration) {
SocketOptions clientSocketOptions = new SocketOptions();

if (socketOptionsConfiguration == null) {
return clientSocketOptions;
}

clientSocketOptions.domain = socketOptionsConfiguration.domain();
clientSocketOptions.type = socketOptionsConfiguration.type();
clientSocketOptions.connectTimeoutMs = socketOptionsConfiguration.connectTimeoutMs();
clientSocketOptions.keepAliveIntervalSecs = socketOptionsConfiguration.keepAliveIntervalSecs();
clientSocketOptions.keepAliveTimeoutSecs = socketOptionsConfiguration.keepAliveTimeoutSecs();

return clientSocketOptions;
}

/**
* Marks a Native CrtResource as owned by the current Java Object.
*
* @param subresource The Resource to own.
* @param <T> The CrtResource Type
* @return The CrtResource passed in
*/
private <T extends CrtResource> T registerOwnedResource(T subresource) {
if (subresource != null) {
subresource.addRef();
Expand Down Expand Up @@ -312,10 +328,10 @@ public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClien
Builder proxyConfiguration(Consumer<ProxyConfiguration.Builder> proxyConfigurationBuilderConsumer);

/**
* Configure the health checks for for all connections established by this client.
* Configure the health checks for all connections established by this client.
*
* <p>
* eg: you can set a throughput threshold for the a connection to be considered healthy.
* eg: you can set a throughput threshold for a connection to be considered healthy.
* If the connection falls below this threshold for a configurable amount of time,
* then the connection is considered unhealthy and will be shut down.
*
Expand All @@ -325,10 +341,10 @@ public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClien
Builder connectionHealthChecksConfiguration(ConnectionHealthChecksConfiguration healthChecksConfiguration);

/**
* A convenience method to configure the health checks for for all connections established by this client.
* A convenience method to configure the health checks for all connections established by this client.
*
* <p>
* eg: you can set a throughput threshold for the a connection to be considered healthy.
* eg: you can set a throughput threshold for a connection to be considered healthy.
* If the connection falls below this threshold for a configurable amount of time,
* then the connection is considered unhealthy and will be shut down.
*
Expand All @@ -343,6 +359,34 @@ Builder connectionHealthChecksConfiguration(Consumer<ConnectionHealthChecksConfi
* Configure the maximum amount of time that a connection should be allowed to remain open while idle.
*/
Builder connectionMaxIdleTime(Duration connectionMaxIdleTime);

/**
* Configure socket options configuration for all connections established by this client.
*
* <p>
* eg: you can override socket domains and types, connection timeouts,
* and enable periodic keepalive packets which are disabled out of the box, including keepalive timeouts
* This may be required for certain connections for longer durations than default socket timeouts
*
* @param socketOptionsConfiguration The socket configuration to use
* @return The builder of the method chaining.
*/
Builder socketOptionsConfiguration(SocketOptionsConfiguration socketOptionsConfiguration);

/**
* A convenience method to configure socket options configuration for all connections established by this client.
*
* <p>
* eg: you can override socket domains and types, connection timeouts,
* and enable periodic keepalive packets which are disabled out of the box, including keepalive timeouts
* This may be required for certain connections for longer durations than default socket timeouts
*
* @param socketOptionsConfigurationBuilder The socket configuration builder to use
* @return The builder of the method chaining.
* @see #socketOptionsConfiguration(SocketOptionsConfiguration)
*/
Builder socketOptionsConfiguration(Consumer<SocketOptionsConfiguration.Builder>
socketOptionsConfigurationBuilder);
}

/**
Expand All @@ -355,6 +399,7 @@ private static final class DefaultBuilder implements Builder {
private int readBufferSize = DEFAULT_STREAM_WINDOW_SIZE;
private ProxyConfiguration proxyConfiguration;
private ConnectionHealthChecksConfiguration connectionHealthChecksConfiguration;
private SocketOptionsConfiguration socketOptionsConfiguration;

private DefaultBuilder() {
}
Expand Down Expand Up @@ -427,5 +472,20 @@ public Builder proxyConfiguration(Consumer<ProxyConfiguration.Builder> proxyConf
proxyConfigurationBuilderConsumer.accept(builder);
return proxyConfiguration(builder.build());
}


@Override
public Builder socketOptionsConfiguration(SocketOptionsConfiguration socketOptionsConfiguration) {
this.socketOptionsConfiguration = socketOptionsConfiguration;
return this;
}

@Override
public Builder socketOptionsConfiguration(Consumer<SocketOptionsConfiguration.Builder>
socketOptionsConfigurationBuilder) {
SocketOptionsConfiguration.Builder builder = SocketOptionsConfiguration.builder();
socketOptionsConfigurationBuilder.accept(builder);
return socketOptionsConfiguration(builder.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import software.amazon.awssdk.utils.Validate;

/**
* Configuration that defines health checks for for all connections established by
* the{@link ConnectionHealthChecksConfiguration}.
* Configuration that defines health checks for all connections established by
* the {@link ConnectionHealthChecksConfiguration}.
*
* <b>NOTE:</b> This is a Preview API and is subject to change so it should not be used in production.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.crt;

import software.amazon.awssdk.annotations.SdkPreviewApi;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.utils.Validate;

/**
* Configuration that defines socket options for all connections established by
* the {@link SocketOptionsConfiguration}.
*
* <b>NOTE:</b> This is a Preview API and is subject to change so it should not be used in production.
*/
@SdkPublicApi
@SdkPreviewApi
public final class SocketOptionsConfiguration {

private final SocketOptions.SocketDomain domain;
private final SocketOptions.SocketType type;
private final int connectTimeoutMs;
private final int keepAliveIntervalSecs;
private final int keepAliveTimeoutSecs;

private SocketOptionsConfiguration(DefaultSocketOptionsConfigurationBuilder builder) {
this.domain = Validate.paramNotNull(builder.domain,
"domain");
this.type = Validate.paramNotNull(builder.type,
"type");
this.connectTimeoutMs = Validate.isPositive(builder.connectTimeoutMs,
"connectTimeoutMs");
this.keepAliveIntervalSecs = Validate.isNotNegative(builder.keepAliveIntervalSecs,
"keepAliveIntervalSecs");
this.keepAliveTimeoutSecs = Validate.isNotNegative(builder.keepAliveTimeoutSecs,
"keepAliveTimeoutSecs");
}

/**
* @return socket domain
*/
public SocketOptions.SocketDomain domain() {
return domain;
}

/**
* @return socket type
*/
public SocketOptions.SocketType type() {
return type;
}

/**
* @return number of milliseconds before a connection will be considered timed out
*/
public int connectTimeoutMs() {
return connectTimeoutMs;
}

/**
* @return number of seconds between TCP keepalive packets being sent to the peer
*/
public int keepAliveIntervalSecs() {
return keepAliveIntervalSecs;
}

/**
* @return number of seconds to wait for a keepalive response before considering the connection timed out
*/
public int keepAliveTimeoutSecs() {
return keepAliveTimeoutSecs;
}

public static Builder builder() {
return new DefaultSocketOptionsConfigurationBuilder();
}

/**
* A builder for {@link SocketOptionsConfiguration}.
*
* <p>All implementations of this interface are mutable and not thread safe.</p>
*/
public interface Builder {

/**
* Sets the socket domain
* @param domain socket domain
* @return Builder
*/
Builder domain(SocketOptions.SocketDomain domain);

/**
* Sets the socket type
* @param type socket type
* @return Builder
*/
Builder type(SocketOptions.SocketType type);

/**
* Sets the number of milliseconds before a connection will be considered timed out
* @param connectTimeoutMs number of milliseconds before a connection will be considered timed out
* @return Builder
*/
Builder connectTimeoutMs(int connectTimeoutMs);

/**
* Sets the number of seconds between TCP keepalive packets being sent to the peer
* @param keepAliveIntervalSecs number of seconds between TCP keepalive packets being sent to the peer
* @return Builder
*/
Builder keepAliveIntervalSecs(int keepAliveIntervalSecs);

/**
* Sets the number of seconds to wait for a keepalive response before considering the connection timed out
* @param keepAliveTimeoutSecs number of seconds to wait for a keepalive response before considering the connection timed out
* @return Builder
*/
Builder keepAliveTimeoutSecs(int keepAliveTimeoutSecs);

SocketOptionsConfiguration build();
}

/**
* An SDK-internal implementation of {@link Builder}.
*/
private static final class DefaultSocketOptionsConfigurationBuilder implements Builder {

private SocketOptions.SocketDomain domain = SocketOptions.SocketDomain.IPv6;
private SocketOptions.SocketType type = SocketOptions.SocketType.STREAM;
private int connectTimeoutMs = 3000;
private int keepAliveIntervalSecs = 0;
private int keepAliveTimeoutSecs = 0;

private DefaultSocketOptionsConfigurationBuilder() {
}

/**
* Sets the socket domain
* Default: {@link SocketOptions.SocketDomain#IPv6}
* @param domain socket domain
* @return Builder
*/
@Override
public Builder domain(SocketOptions.SocketDomain domain) {
this.domain = domain;
return this;
}

/**
* Sets the socket type
* Default: {@link SocketOptions.SocketType#STREAM}
* @param type socket type
* @return Builder
*/
@Override
public Builder type(SocketOptions.SocketType type) {
this.type = type;
return this;
}

/**
* Sets the number of milliseconds before a connection will be considered timed out
* Default: 3000ms
* @param connectTimeoutMs number of milliseconds before a connection will be considered timed out
* @return Builder
*/
@Override
public Builder connectTimeoutMs(int connectTimeoutMs) {
this.connectTimeoutMs = connectTimeoutMs;
return this;
}

/**
* Sets the number of seconds between TCP keepalive packets being sent to the peer
* Default: 0 disables keepalive
* @param keepAliveIntervalSecs number of seconds between TCP keepalive packets being sent to the peer
* @return Builder
*/
@Override
public Builder keepAliveIntervalSecs(int keepAliveIntervalSecs) {
this.keepAliveIntervalSecs = keepAliveIntervalSecs;
return this;
}

/**
* Sets the number of seconds to wait for a keepalive response before considering the connection timed out
* Default: 0 disables keepalive
* @param keepAliveTimeoutSecs number of seconds to wait for a keepalive response before considering the connection timed out
* @return Builder
*/
@Override
public Builder keepAliveTimeoutSecs(int keepAliveTimeoutSecs) {
this.keepAliveTimeoutSecs = keepAliveTimeoutSecs;
return this;
}

@Override
public SocketOptionsConfiguration build() {
return new SocketOptionsConfiguration(this);
}
}
}
Loading

0 comments on commit 96b08f5

Please sign in to comment.