Skip to content

Commit

Permalink
Add protection for the I/O reactor and troubleshooting docs
Browse files Browse the repository at this point in the history
  • Loading branch information
swallez committed Oct 12, 2023
1 parent 4d555ba commit 1d4508c
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 2 deletions.
3 changes: 3 additions & 0 deletions docs/troubleshooting/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

* <<missing-required-property>>
* <<no-such-method-request-options>>
* <<io-reactor-errors>>

[discrete]
=== Miscellaneous
Expand All @@ -23,4 +24,6 @@

include::missing-required-property.asciidoc[]
include::no-such-method-request-options.asciidoc[]
include::io-reactor-errors.asciidoc[]

include::serialize-without-typed-keys.asciidoc[]
33 changes: 33 additions & 0 deletions docs/troubleshooting/io-reactor-errors.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[[io-reactor-errors]]
=== Apache http-client I/O reactor errors

Sending requests can sometimes fail with one of the following errors, coming from the Apache http-client library:

* `Request cannot be executed; I/O reactor status: STOPPED`
* `I/O reactor terminated abnormally`
* `I/O reactor has been shut down`

The I/O Reactor is the internal event loop in the http client library. It can terminate when an application callback throws an `Error`, like an `OutOfMemoryError` or a `StackOverflowError`. Remember that `Error` is different from a regular `Exception` and – https://docs.oracle.com/javase/8/docs/api/?java/lang/Error.html[quoting the Java documentation] – _indicates serious problems that a reasonable application should not try to catch_.

In the context of the Elasticsearch Java clients, this can happen on two occasions:

* the application calls the low level `RestClient` directly, using the asynchronous `performRequestAsync` method, and an `Error` is thrown in the `ResponseListener` provided by the application.
* an `OutOfMemoryError` happens while buffering the body of an http response.

In the first case, it is the application's responsibility to catch `Error` in its `ResponseListener` and decide what to do when these errors happen.

The second case is taken care of in the {java-client} since version 8.12: the error is wrapped in a `RuntimeException` that is reported to the application.

In previous versions of the {java-client}, you can copy/paste the `SafeResponseConsumer` class in your project and initialize the `RestClientTransport` as follows:

["source","java"]
------
RestClient restClient = ...
JsonpMapper mapper = ...
RestClientOptions options = new RestClientOptions(
SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS
);
RestClientTransport transport = new RestClientTransport(
restClient, mapper, options
);
------
5 changes: 4 additions & 1 deletion java-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,11 @@ dependencies {
testImplementation("org.testcontainers", "testcontainers", "1.17.3")
testImplementation("org.testcontainers", "elasticsearch", "1.17.3")


testImplementation("io.opentelemetry", "opentelemetry-sdk", openTelemetryVersion)

// Apache-2.0
// https://github.com/awaitility/awaitility
testImplementation("org.awaitility", "awaitility", "4.2.0")
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public RestClientOptions build() {
}

static RestClientOptions initialOptions() {
return new RestClientOptions(RequestOptions.DEFAULT);
return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS);
}

private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.transport.rest_client;

import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.protocol.HttpContext;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;

import java.io.IOException;

/**
* A response consumer that will propagate Errors as RuntimeExceptions to avoid crashing the IOReactor.
*/
public class SafeResponseConsumer<T> implements HttpAsyncResponseConsumer<T> {

private final HttpAsyncResponseConsumer<T> delegate;

/**
* A consumer factory that safely wraps the one provided by {@code RequestOptions.DEFAULT}.
*/
public static final HttpAsyncResponseConsumerFactory DEFAULT_FACTORY = () -> new SafeResponseConsumer<>(
RequestOptions.DEFAULT.getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer()
);

/**
* Same as {@code RequestOptions.DEFAULT} with a safe consumer factory
*/
public static final RequestOptions DEFAULT_REQUEST_OPTIONS;

static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
builder.setHttpAsyncResponseConsumerFactory(DEFAULT_FACTORY);
DEFAULT_REQUEST_OPTIONS = builder.build();
}

public SafeResponseConsumer(HttpAsyncResponseConsumer<T> delegate) {
this.delegate = delegate;
}

@SuppressWarnings("unchecked")
private static <T extends Throwable> void throwUnchecked(Throwable thr) throws T {
throw (T) thr;
}

@Override
public void responseReceived(HttpResponse response) throws IOException, HttpException {
try {
delegate.responseReceived(response);
} catch(Exception e) {
throwUnchecked(e);
} catch(Throwable e) {
throw new RuntimeException("Error receiving response", e);
}
}

@Override
public void consumeContent(ContentDecoder decoder, IOControl ioControl) throws IOException {
try {
delegate.consumeContent(decoder, ioControl);
} catch(Exception e) {
throwUnchecked(e);
} catch(Throwable e) {
throw new RuntimeException("Error consuming content", e);
}
}

@Override
public void responseCompleted(HttpContext context) {
try {
delegate.responseCompleted(context);
} catch(Exception e) {
throwUnchecked(e);
} catch(Throwable e) {
throw new RuntimeException("Error completing response", e);
}
}

@Override
public void failed(Exception ex) {
try {
delegate.failed(ex);
} catch(Exception e) {
throwUnchecked(e);
} catch(Throwable e) {
throw new RuntimeException("Error handling failure", e);
}
}

@Override
public Exception getException() {
return delegate.getException();
}

@Override
public T getResult() {
return delegate.getResult();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public boolean cancel() {
return delegate.cancel();
}
}
Loading

0 comments on commit 1d4508c

Please sign in to comment.