From 1d4508cbd16e14fd800fedc96d33bb26aa16624e Mon Sep 17 00:00:00 2001 From: Sylvain Wallez Date: Thu, 12 Oct 2023 19:49:55 +0200 Subject: [PATCH] Add protection for the I/O reactor and troubleshooting docs --- docs/troubleshooting/index.asciidoc | 3 + .../io-reactor-errors.asciidoc | 33 +++ java-client/build.gradle.kts | 5 +- .../rest_client/RestClientOptions.java | 2 +- .../rest_client/SafeResponseConsumer.java | 135 ++++++++++++ .../rest_client/SafeResponseConsumerTest.java | 206 ++++++++++++++++++ 6 files changed, 382 insertions(+), 2 deletions(-) create mode 100644 docs/troubleshooting/io-reactor-errors.asciidoc create mode 100644 java-client/src/main/java/co/elastic/clients/transport/rest_client/SafeResponseConsumer.java create mode 100644 java-client/src/test/java/co/elastic/clients/transport/rest_client/SafeResponseConsumerTest.java diff --git a/docs/troubleshooting/index.asciidoc b/docs/troubleshooting/index.asciidoc index f011fbf94..0fb11a41d 100644 --- a/docs/troubleshooting/index.asciidoc +++ b/docs/troubleshooting/index.asciidoc @@ -9,6 +9,7 @@ * <> * <> +* <> [discrete] === Miscellaneous @@ -23,4 +24,6 @@ include::missing-required-property.asciidoc[] include::no-such-method-request-options.asciidoc[] +include::io-reactor-errors.asciidoc[] + include::serialize-without-typed-keys.asciidoc[] diff --git a/docs/troubleshooting/io-reactor-errors.asciidoc b/docs/troubleshooting/io-reactor-errors.asciidoc new file mode 100644 index 000000000..8146646ff --- /dev/null +++ b/docs/troubleshooting/io-reactor-errors.asciidoc @@ -0,0 +1,33 @@ +[[io-reactor-errors]] +=== Apache http-client I/O reactor errors + +Sending requests can sometimes fail with one of the following errors, coming from the Apache http-client library: + +* `Request cannot be executed; I/O reactor status: STOPPED` +* `I/O reactor terminated abnormally` +* `I/O reactor has been shut down` + +The I/O Reactor is the internal event loop in the http client library. It can terminate when an application callback throws an `Error`, like an `OutOfMemoryError` or a `StackOverflowError`. Remember that `Error` is different from a regular `Exception` and – https://docs.oracle.com/javase/8/docs/api/?java/lang/Error.html[quoting the Java documentation] – _indicates serious problems that a reasonable application should not try to catch_. + +In the context of the Elasticsearch Java clients, this can happen on two occasions: + +* the application calls the low level `RestClient` directly, using the asynchronous `performRequestAsync` method, and an `Error` is thrown in the `ResponseListener` provided by the application. +* an `OutOfMemoryError` happens while buffering the body of an http response. + +In the first case, it is the application's responsibility to catch `Error` in its `ResponseListener` and decide what to do when these errors happen. + +The second case is taken care of in the {java-client} since version 8.12: the error is wrapped in a `RuntimeException` that is reported to the application. + +In previous versions of the {java-client}, you can copy/paste the `SafeResponseConsumer` class in your project and initialize the `RestClientTransport` as follows: + +["source","java"] +------ +RestClient restClient = ... +JsonpMapper mapper = ... +RestClientOptions options = new RestClientOptions( + SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS +); +RestClientTransport transport = new RestClientTransport( + restClient, mapper, options +); +------ diff --git a/java-client/build.gradle.kts b/java-client/build.gradle.kts index 337ad7fd6..31fa9b31c 100644 --- a/java-client/build.gradle.kts +++ b/java-client/build.gradle.kts @@ -251,8 +251,11 @@ dependencies { testImplementation("org.testcontainers", "testcontainers", "1.17.3") testImplementation("org.testcontainers", "elasticsearch", "1.17.3") - testImplementation("io.opentelemetry", "opentelemetry-sdk", openTelemetryVersion) + + // Apache-2.0 + // https://github.com/awaitility/awaitility + testImplementation("org.awaitility", "awaitility", "4.2.0") } diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java index 455459c18..9de6da07e 100644 --- a/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/RestClientOptions.java @@ -188,7 +188,7 @@ public RestClientOptions build() { } static RestClientOptions initialOptions() { - return new RestClientOptions(RequestOptions.DEFAULT); + return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS); } private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) { diff --git a/java-client/src/main/java/co/elastic/clients/transport/rest_client/SafeResponseConsumer.java b/java-client/src/main/java/co/elastic/clients/transport/rest_client/SafeResponseConsumer.java new file mode 100644 index 000000000..c7d6cfde0 --- /dev/null +++ b/java-client/src/main/java/co/elastic/clients/transport/rest_client/SafeResponseConsumer.java @@ -0,0 +1,135 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest_client; + +import org.apache.http.HttpException; +import org.apache.http.HttpResponse; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; +import org.apache.http.protocol.HttpContext; +import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; +import org.elasticsearch.client.RequestOptions; + +import java.io.IOException; + +/** + * A response consumer that will propagate Errors as RuntimeExceptions to avoid crashing the IOReactor. + */ +public class SafeResponseConsumer implements HttpAsyncResponseConsumer { + + private final HttpAsyncResponseConsumer delegate; + + /** + * A consumer factory that safely wraps the one provided by {@code RequestOptions.DEFAULT}. + */ + public static final HttpAsyncResponseConsumerFactory DEFAULT_FACTORY = () -> new SafeResponseConsumer<>( + RequestOptions.DEFAULT.getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer() + ); + + /** + * Same as {@code RequestOptions.DEFAULT} with a safe consumer factory + */ + public static final RequestOptions DEFAULT_REQUEST_OPTIONS; + + static { + RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); + builder.setHttpAsyncResponseConsumerFactory(DEFAULT_FACTORY); + DEFAULT_REQUEST_OPTIONS = builder.build(); + } + + public SafeResponseConsumer(HttpAsyncResponseConsumer delegate) { + this.delegate = delegate; + } + + @SuppressWarnings("unchecked") + private static void throwUnchecked(Throwable thr) throws T { + throw (T) thr; + } + + @Override + public void responseReceived(HttpResponse response) throws IOException, HttpException { + try { + delegate.responseReceived(response); + } catch(Exception e) { + throwUnchecked(e); + } catch(Throwable e) { + throw new RuntimeException("Error receiving response", e); + } + } + + @Override + public void consumeContent(ContentDecoder decoder, IOControl ioControl) throws IOException { + try { + delegate.consumeContent(decoder, ioControl); + } catch(Exception e) { + throwUnchecked(e); + } catch(Throwable e) { + throw new RuntimeException("Error consuming content", e); + } + } + + @Override + public void responseCompleted(HttpContext context) { + try { + delegate.responseCompleted(context); + } catch(Exception e) { + throwUnchecked(e); + } catch(Throwable e) { + throw new RuntimeException("Error completing response", e); + } + } + + @Override + public void failed(Exception ex) { + try { + delegate.failed(ex); + } catch(Exception e) { + throwUnchecked(e); + } catch(Throwable e) { + throw new RuntimeException("Error handling failure", e); + } + } + + @Override + public Exception getException() { + return delegate.getException(); + } + + @Override + public T getResult() { + return delegate.getResult(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public boolean cancel() { + return delegate.cancel(); + } +} diff --git a/java-client/src/test/java/co/elastic/clients/transport/rest_client/SafeResponseConsumerTest.java b/java-client/src/test/java/co/elastic/clients/transport/rest_client/SafeResponseConsumerTest.java new file mode 100644 index 000000000..2910ed5d5 --- /dev/null +++ b/java-client/src/test/java/co/elastic/clients/transport/rest_client/SafeResponseConsumerTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package co.elastic.clients.transport.rest_client; + +import com.sun.net.httpserver.HttpServer; +import org.apache.http.HttpEntity; +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.entity.ContentType; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.IOControl; +import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer; +import org.elasticsearch.client.HttpAsyncResponseConsumerFactory; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +public class SafeResponseConsumerTest { + + static HttpServer Server; + static HttpHost ESHost; + + // A consumer factory that throws an Error, to simulate the effect of an OOME + static HttpAsyncResponseConsumerFactory FailingConsumerFactory = () -> new HeapBufferedAsyncResponseConsumer(100 * 1024 * 1024) { + @Override + protected void onResponseReceived(HttpResponse httpResponse) throws HttpException, IOException { + super.onResponseReceived(httpResponse); + } + + @Override + protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException { + super.onContentReceived(decoder, ioctrl); + throw new Error("Error in onContentReceived"); + } + + @Override + protected void onEntityEnclosed(HttpEntity entity, ContentType contentType) throws IOException { + super.onEntityEnclosed(entity, contentType); + } + }; + + @BeforeAll + public static void setup() throws Exception { + Server = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + Server.start(); + + Server.createContext("/", exchange -> { + String path = exchange.getRequestURI().getPath(); + exchange.getResponseHeaders().set("Content-Type", "application/json"); + exchange.getResponseHeaders().set("X-Elastic-Product", "Elasticsearch"); + + if (path.equals("/")) { + byte[] bytes = Info.getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(200, bytes.length); + exchange.getResponseBody().write(bytes); + exchange.close(); + return; + } + + exchange.sendResponseHeaders(404, -1); + exchange.close(); + }); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + Server.stop(1); + } catch (Exception e) { + // Ignore + } + })); + + ESHost = new HttpHost(Server.getAddress().getAddress(), Server.getAddress().getPort()); + } + + @AfterAll + public static void tearDown() { + Server.stop(0); + } + + @Test + public void testReactorDeath() throws Exception { + + // Request options that will simulate an OOME and cause the reactor to die + RequestOptions.Builder failingOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + failingOptionsBuilder.setHttpAsyncResponseConsumerFactory(FailingConsumerFactory); + RequestOptions failingOptions = failingOptionsBuilder.build(); + + RestClient restClient = RestClient.builder(ESHost).build(); + + // First request, to warm things up. + // An "indice exists" request, that has no response body + Request existsReq = new Request("HEAD", "/index-name"); + restClient.performRequest(existsReq); + + try { + Request infoReq = new Request("GET", "/"); + infoReq.setOptions(failingOptions); + + restClient.performRequest(infoReq); + Assertions.fail("First request should not succeed"); + } catch(Exception t) { +// System.err.println("Request 1 error"); +// t.printStackTrace(); + } + + Thread.sleep(1000); + + try { + // 2nd request with no specific options + Request infoReq = new Request("GET", "/"); + restClient.performRequest(infoReq); + Assertions.fail("Second request should not succeed"); + } catch(Exception t) { +// System.err.println("Request 2 error"); +// t.printStackTrace(); + } + + restClient.close(); + } + + @Test + public void testReactorSurvival() throws Exception { + + // Request options that will simulate an OOME and wrapped in the safe consumer that will + // avoid the reactor's death + RequestOptions.Builder protectedFailingOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + protectedFailingOptionsBuilder.setHttpAsyncResponseConsumerFactory(() -> + new SafeResponseConsumer<>(FailingConsumerFactory.createHttpAsyncResponseConsumer()) + ); + RequestOptions protectedFailingOptions = protectedFailingOptionsBuilder.build(); + + RestClient restClient = RestClient.builder(ESHost).build(); + + // First request, to warm things up. + // An "indice exists" request, that has no response body + Request existsReq = new Request("HEAD", "/index-name"); + restClient.performRequest(existsReq); + + try { + Request infoReq = new Request("GET", "/"); + infoReq.setOptions(protectedFailingOptions); + + restClient.performRequest(infoReq); + Assertions.fail("First request should not succeed"); + } catch(Exception t) { + // System.err.println("Request 1 error"); + // t.printStackTrace(); + } + + { + // 2nd request with no specific options + Request infoReq = new Request("GET", "/"); + + Response resp = restClient.performRequest(infoReq); + Assertions.assertEquals(200, resp.getStatusLine().getStatusCode()); + } + + restClient.close(); + } + + private static final String Info = "{\n" + + " \"cluster_name\": \"foo\",\n" + + " \"cluster_uuid\": \"bar\",\n" + + " \"version\": {\n" + + " \"build_date\": \"2022-01-28T08:36:04.875279988Z\",\n" + + " \"minimum_wire_compatibility_version\": \"6.8.0\",\n" + + " \"build_hash\": \"bee86328705acaa9a6daede7140defd4d9ec56bd\",\n" + + " \"number\": \"7.17.0\",\n" + + " \"lucene_version\": \"8.11.1\",\n" + + " \"minimum_index_compatibility_version\": \"6.0.0-beta1\",\n" + + " \"build_flavor\": \"default\",\n" + + " \"build_snapshot\": false,\n" + + " \"build_type\": \"docker\"\n" + + " },\n" + + " \"name\": \"instance-0000000000\",\n" + + " \"tagline\": \"You Know, for Search\"\n" + + "}"; +}