-
Notifications
You must be signed in to change notification settings - Fork 276
Transport level retry #954
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2aad9b0
7843f17
4eb7ed0
cf02e28
74dd40a
8012980
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package co.elastic.clients.transport.rest_client; | ||
|
||
import org.elasticsearch.client.Cancellable; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
|
||
/** | ||
* The {@code Future} implementation returned by async requests. | ||
* It wraps the RestClient's cancellable and propagates cancellation. | ||
*/ | ||
public class RequestFuture<T> extends CompletableFuture<T> { | ||
private volatile Cancellable cancellable; | ||
|
||
@Override | ||
public boolean cancel(boolean mayInterruptIfRunning) { | ||
boolean cancelled = super.cancel(mayInterruptIfRunning); | ||
if (cancelled && cancellable != null) { | ||
cancellable.cancel(); | ||
} | ||
return cancelled; | ||
} | ||
|
||
public void setCancellable(Cancellable cancellable) { | ||
this.cancellable = cancellable; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
package co.elastic.clients.transport.rest_client; | ||
|
||
import co.elastic.clients.transport.BackoffPolicy; | ||
import co.elastic.clients.transport.TransportOptions; | ||
import co.elastic.clients.transport.http.TransportHttpClient; | ||
import org.elasticsearch.client.ResponseException; | ||
|
||
import javax.annotation.Nullable; | ||
import java.io.IOException; | ||
import java.util.Iterator; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
public class RetryRestClientHttpClient implements TransportHttpClient { | ||
|
||
private TransportHttpClient delegate; | ||
private BackoffPolicy backoffPolicy; | ||
|
||
public RetryRestClientHttpClient(TransportHttpClient delegate, BackoffPolicy backoffPolicy) { | ||
this.delegate = delegate; | ||
this.backoffPolicy = backoffPolicy; | ||
} | ||
|
||
@Override | ||
public Response performRequest(String endpointId, @Nullable Node node, Request request, | ||
TransportOptions options) throws IOException { | ||
return performRequestRetry(endpointId, node, request, options, backoffPolicy.iterator()); | ||
} | ||
|
||
public Response performRequestRetry(String endpointId, @Nullable Node node, Request request, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need a separate public method? |
||
TransportOptions options, Iterator<Long> backoffIter) throws IOException { | ||
try { | ||
return delegate.performRequest(endpointId, node, request, options); | ||
} catch (ResponseException e) { | ||
if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be independent of the http client here, and the fact that we have an exception with an actual response is specific to So the logic could be:
|
||
// synchronous retry | ||
if (backoffIter.hasNext()) { | ||
try { | ||
Thread.sleep(backoffIter.next()); | ||
} catch (InterruptedException ie) { | ||
throw e; // TODO okay with masking IE and just returning original exception? | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep, |
||
} | ||
System.out.println("Retrying"); | ||
return performRequestRetry(endpointId, node, request, options, backoffIter); | ||
} | ||
} | ||
// error not retryable | ||
throw e; | ||
} | ||
} | ||
|
||
@Override | ||
public CompletableFuture<Response> performRequestAsync(String endpointId, @Nullable Node node, | ||
Request request, TransportOptions options) { | ||
RequestFuture<Response> futureResult = new RequestFuture<>(); | ||
return performRequestAsyncRetry(endpointId, node, request, options, backoffPolicy.iterator(), | ||
futureResult); | ||
} | ||
|
||
public CompletableFuture<Response> performRequestAsyncRetry(String endpointId, @Nullable Node node, | ||
Request request, | ||
TransportOptions options, | ||
Iterator<Long> backoffIter, | ||
CompletableFuture<Response> futureResult) { | ||
CompletableFuture<Response> res = delegate.performRequestAsync(endpointId, node, request, options); | ||
|
||
res.whenComplete((resp, e) -> { | ||
if (e != null) { | ||
if (e instanceof ResponseException) { | ||
if (((ResponseException) e).getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded? | ||
if (backoffIter.hasNext()) { | ||
try { | ||
Thread.sleep(backoffIter.next()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid calling A caveat of |
||
} catch (InterruptedException ie) { | ||
// TODO okay with masking IE and just returning original exception? | ||
futureResult.completeExceptionally(e); | ||
} | ||
System.out.println("Retrying"); | ||
performRequestAsyncRetry(endpointId, node, request, options, backoffIter,futureResult); | ||
} | ||
} | ||
} | ||
} | ||
else { | ||
futureResult.complete(resp); | ||
} | ||
}); | ||
|
||
return futureResult; | ||
} | ||
|
||
@Override | ||
public void close() throws IOException { | ||
delegate.close(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class should be
co.elastic.transport.http.RetryingHttpClient
and be independent fromorg.elasticsearch.client
so that it can be independent of the http client implementation (see also below on exception handling)