Skip to content

Commit

Permalink
Merge pull request #273 from art-community/feature/keep-alive
Browse files Browse the repository at this point in the history
Feature/keep alive
  • Loading branch information
Anton Bashirov authored Apr 19, 2020
2 parents 42b2070 + 01847fd commit ce2c506
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ static GrpcCommunicator grpcCommunicator(String host, int port, String path) {

GrpcCommunicator secured();

GrpcCommunicator keepAliveTimeNanos(long time);

GrpcCommunicator keepAliveTimeOutNanos(long timeOut);

GrpcCommunicator keepAliveWithoutCalls(boolean keepAliveWithoutCalls);

void shutdownChannel();

GrpcAsynchronousCommunicator asynchronous();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,15 @@ public class GrpcCommunicatorImplementation implements GrpcCommunicator, GrpcCom
GrpcCommunicatorImplementation(GrpcCommunicationTargetConfiguration targetConfiguration) {
configuration.setPath(validator.notEmptyField(targetConfiguration.path(), "path"));
deadlineTimeout(targetConfiguration.timeout());
keepAliveTimeNanos(targetConfiguration.keepAliveTimeNanos());
keepAliveTimeNanos(targetConfiguration.keepAliveTimeOutNanos());
keepAliveWithoutCalls(targetConfiguration.keepAliveWithoutCalls());
if (targetConfiguration.secured()) {
secured();
}
if (targetConfiguration.waitForReady()) {
waitForReady();
}
if (isNotEmpty(targetConfiguration.url())) {
configuration.setUrl(targetConfiguration.url());
return;
Expand Down Expand Up @@ -141,6 +147,24 @@ public GrpcCommunicator secured() {
return this;
}

@Override
public GrpcCommunicator keepAliveTimeNanos(long time) {
configuration.setKeepAliveTimeNanos(time);
return this;
}

@Override
public GrpcCommunicator keepAliveTimeOutNanos(long timeOut) {
configuration.setKeepAliveTimeNanos(timeOut);
return this;
}

@Override
public GrpcCommunicator keepAliveWithoutCalls(boolean keepAliveWithoutCalls) {
configuration.setKeepAliveWithoutCalls(keepAliveWithoutCalls);
return null;
}

@Override
public void shutdownChannel() {
ManagedChannel channel = this.channel.safeValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class HttpCommunicationConfiguration {
private HttpCommunicationCancellationHandler<?> cancellationHandler;
private boolean chunkedBody;
private boolean gzipCompressedBody;
private ConnectionClosingPolicy connectionClosingPolicy = CLOSE_AFTER_RESPONSE;
private RequestConfig requestConfig = httpClientModule().getRequestConfig();
private HttpVersion httpProtocolVersion = httpClientModule().getHttpVersion();
private MimeToContentTypeMapper producesMimeType = httpClientModule().getProducesMimeTypeMapper();
Expand All @@ -68,4 +67,6 @@ class HttpCommunicationConfiguration {
private CloseableHttpAsyncClient asynchronousClient;
private List<ValueInterceptor<Value, Value>> requestValueInterceptors = linkedListOf(httpClientModule().getRequestValueInterceptors());
private List<ValueInterceptor<Value, Value>> responseValueInterceptors = linkedListOf(httpClientModule().getResponseValueInterceptors());
private ConnectionClosingPolicy connectionClosingPolicy = CLOSE_AFTER_RESPONSE;
private boolean enableKeepAlive;
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@
import static ru.art.http.client.builder.HttpUriBuilder.*;
import static ru.art.http.client.constants.HttpClientExceptionMessages.*;
import static ru.art.http.client.constants.HttpClientModuleConstants.ConnectionClosingPolicy.CLOSE_AFTER_RESPONSE;
import static ru.art.http.client.constants.HttpClientModuleConstants.HTTP_HEADER_CONNECTION_CLOSE;
import static ru.art.http.client.constants.HttpClientModuleConstants.HTTP_HEADER_CONNECTION_KEEP_ALIVE;
import static ru.art.http.client.module.HttpClientModule.*;
import static ru.art.http.constants.HttpHeaders.CONNECTION;
import static ru.art.http.constants.HttpHeaders.KEEP_ALIVE;
import static ru.art.logging.LoggingModule.*;
import javax.annotation.*;
Expand Down Expand Up @@ -159,6 +162,7 @@ private static HttpUriRequest buildRequest(HttpCommunicationConfiguration config
.setCharset(configuration.getRequestContentCharset())
.setVersion(configuration.getHttpProtocolVersion());
configuration.getHeaders().forEach(requestBuilder::addHeader);
requestBuilder.addHeader(CONNECTION, configuration.isEnableKeepAlive() ? HTTP_HEADER_CONNECTION_KEEP_ALIVE : HTTP_HEADER_CONNECTION_CLOSE);
if (isNull(request)) {
return requestBuilder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ static HttpCommunicator httpCommunicator(HttpCommunicationTargetConfiguration ta

HttpCommunicator requestEncoding(String encoding);

HttpCommunicator enableKeepAlive();

<RequestType, ResponseType> Optional<ResponseType> execute(RequestType request);

<ResponseType> Optional<ResponseType> execute();
Expand All @@ -108,6 +110,7 @@ static HttpCommunicator httpCommunicator(HttpCommunicationTargetConfiguration ta

HttpAsynchronousCommunicator asynchronous();


interface HttpAsynchronousCommunicator {
void closeAsynchronousClient();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,12 @@ public HttpCommunicator requestEncoding(String encoding) {
return this;
}

@Override
public HttpCommunicator enableKeepAlive() {
configuration.setEnableKeepAlive(true);
return this;
}

@Override
public HttpCommunicator addRequestValueInterceptor(ValueInterceptor<Value, Value> interceptor) {
configuration.getRequestValueInterceptors().add(validator.notNullField(interceptor, "requestValueInterceptor"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public interface HttpClientModuleConstants {
int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 2;
int DEFAULT_MAX_CONNECTIONS_TOTAL = 20;
int DEFAULT_VALIDATE_AFTER_INACTIVITY_MILLIS = 4000;
String HTTP_HEADER_CONNECTION_CLOSE = "Close";
String HTTP_HEADER_CONNECTION_KEEP_ALIVE = "Keep-Alive";

enum ConnectionClosingPolicy {
CLOSE_AFTER_RESPONSE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class SoapCommunicationConfiguration {
private List<ValueInterceptor<XmlEntity, XmlEntity>> responseValueInterceptors = linkedListOf();
private OperationIdSource operationIdSource = REQUEST;
private ConnectionClosingPolicy connectionClosingPolicy = CLOSE_AFTER_RESPONSE;
private boolean enableKeepAlive;

void validateRequiredFields() {
boolean urlIsEmpty = isEmpty(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ static <RequestType, ResponseType> Optional<ResponseType> execute(SoapCommunicat
configuration.getResponseInterceptors().forEach(httpCommunicator::addResponseInterceptor);
configuration.getRequestValueInterceptors().forEach(interceptor -> httpCommunicator.addRequestValueInterceptor(cast(interceptor)));
configuration.getResponseValueInterceptors().forEach(interceptor -> httpCommunicator.addResponseValueInterceptor(cast(interceptor)));
if (configuration.isEnableKeepAlive()) {
httpCommunicator.enableKeepAlive();
}
httpCommunicator
.connectionClosingPolicy(configuration.getConnectionClosingPolicy())
.version(configuration.getHttpVersion())
Expand Down Expand Up @@ -68,6 +71,9 @@ static <RequestType, ResponseType> CompletableFuture<Optional<ResponseType>> exe
configuration.getRequestInterceptors().forEach(httpCommunicator::addResponseInterceptor);
configuration.getRequestValueInterceptors().forEach(interceptor -> httpCommunicator.addRequestValueInterceptor(cast(interceptor)));
configuration.getResponseValueInterceptors().forEach(interceptor -> httpCommunicator.addResponseValueInterceptor(cast(interceptor)));
if (configuration.isEnableKeepAlive()) {
httpCommunicator.enableKeepAlive();
}
httpCommunicator
.connectionClosingPolicy(configuration.getConnectionClosingPolicy())
.version(configuration.getHttpVersion())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ static SoapCommunicator soapCommunicator(HttpCommunicationTargetConfiguration ta

SoapCommunicator version(HttpVersion httpVersion);

SoapCommunicator enableKeepAlive();

<RequestType, ResponseType> Optional<ResponseType> execute(RequestType request);

SoapAsynchronousCommunicator asynchronous();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ public SoapCommunicator version(HttpVersion httpVersion) {
return this;
}

@Override
public SoapCommunicator enableKeepAlive() {
configuration.setEnableKeepAlive(true);
return this;
}

@Override
public <RequestType, ResponseType> Optional<ResponseType> execute(RequestType request) {
request = validator.notNullField(request, "request");
Expand Down
9 changes: 9 additions & 0 deletions application-test/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.FULL
import org.gradle.api.tasks.testing.logging.TestLogEvent.*
import ru.art.gradle.constants.lombok

/*
Expand Down Expand Up @@ -36,4 +38,11 @@ dependencies {
annotationProcessor(lombok().inGradleNotation())
testAnnotationProcessor(lombok().inGradleNotation())
testImplementation("org.hsqldb", "hsqldb", "2+")
}

tasks.withType<Test> {
testLogging {
events = setOf(PASSED, FAILED, SKIPPED)
exceptionFormat = FULL
}
}

0 comments on commit ce2c506

Please sign in to comment.