Send RetryInfo on OTel Timeouts (#4294)
Data Prepper sends `RESOURCE_EXHAUSTED` gRPC responses
whenever a buffer is full or a circuit breaker is active. These statuses do
not contain retry information. In the OpenTelemetry protocol, this implies a
non-retryable error, which leads to message drops, e.g. in the OTel
collector. To apply proper back pressure in these scenarios, a `RetryInfo`
detail is added to the status.

The implementation uses exponential backoff. The idea is to start with the
minimum delay on the first timeout or circuit breaker activation. If the next
such event happens within twice the last delay after the previous event, the
delay is doubled until the maximum delay is reached. The maximum delay is then
used until a sufficiently long period (the maximum delay) passes without an
event, at which point the delay is reset to the minimum.
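
A minimal, single-threaded sketch of that delay calculation (class and method
names here are illustrative; the thread-safe version added by this commit is
`GrpcRetryInfoCalculator` below):

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the backoff described above: reset to the minimum after a quiet
// period, otherwise double the delay up to the maximum.
class BackoffSketch {
    private final Duration minimumDelay;
    private final Duration maximumDelay;
    private Instant lastEvent;
    private Duration nextDelay;

    BackoffSketch(Duration minimumDelay, Duration maximumDelay) {
        this.minimumDelay = minimumDelay;
        this.maximumDelay = maximumDelay;
        // Pretend the last event was long ago so the first event is not backed off.
        this.lastEvent = Instant.now().minus(maximumDelay);
        this.nextDelay = minimumDelay;
    }

    Duration nextRetryDelay() {
        Instant now = Instant.now();
        boolean quietPeriodElapsed = lastEvent.isBefore(now.minus(nextDelay));
        lastEvent = now;
        if (quietPeriodElapsed) {
            // Enough time without an event: start over at the minimum delay.
            nextDelay = minimumDelay;
            return minimumDelay;
        }
        // Events are arriving quickly: return the current delay, then double it, capped at the maximum.
        Duration delay = nextDelay;
        Duration doubled = nextDelay.multipliedBy(2);
        nextDelay = doubled.compareTo(maximumDelay) <= 0 ? doubled : maximumDelay;
        return delay;
    }
}
```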

---------

Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
Signed-off-by: Tomas Longo <tomas.longo@sap.com>
Co-authored-by: David Venable <dlv@amazon.com>
KarstenSchnitter and dlvenable authored Nov 13, 2024
1 parent 059e1c5 commit 2595076
Showing 25 changed files with 1,009 additions and 42 deletions.
@@ -5,9 +5,10 @@

package org.opensearch.dataprepper;

import com.google.protobuf.Any;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.grpc.GrpcStatusFunction;
import com.linecorp.armeria.common.grpc.GoogleGrpcExceptionHandlerFunction;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
import io.grpc.Status;
@@ -22,9 +23,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
public class GrpcRequestExceptionHandler implements GoogleGrpcExceptionHandlerFunction {
private static final Logger LOG = LoggerFactory.getLogger(GrpcRequestExceptionHandler.class);
static final String ARMERIA_REQUEST_TIMEOUT_MESSAGE = "Timeout waiting for request to be served. This is usually due to the buffer being full.";

@@ -37,53 +39,57 @@ public class GrpcRequestExceptionHandler implements GrpcStatusFunction {
private final Counter badRequestsCounter;
private final Counter requestsTooLargeCounter;
private final Counter internalServerErrorCounter;
private final GrpcRetryInfoCalculator retryInfoCalculator;

public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics) {
public GrpcRequestExceptionHandler(final PluginMetrics pluginMetrics, Duration retryInfoMinDelay, Duration retryInfoMaxDelay) {
requestTimeoutsCounter = pluginMetrics.counter(REQUEST_TIMEOUTS);
badRequestsCounter = pluginMetrics.counter(BAD_REQUESTS);
requestsTooLargeCounter = pluginMetrics.counter(REQUESTS_TOO_LARGE);
internalServerErrorCounter = pluginMetrics.counter(INTERNAL_SERVER_ERROR);
retryInfoCalculator = new GrpcRetryInfoCalculator(retryInfoMinDelay, retryInfoMaxDelay);
}

@Override
public @Nullable Status apply(final RequestContext context, final Throwable exception, final Metadata metadata) {
final Throwable exceptionCause = exception instanceof BufferWriteException ? exception.getCause() : exception;

public com.google.rpc.@Nullable Status applyStatusProto(RequestContext ctx, Throwable throwable,
Metadata metadata) {
final Throwable exceptionCause = throwable instanceof BufferWriteException ? throwable.getCause() : throwable;
return handleExceptions(exceptionCause);
}

private Status handleExceptions(final Throwable e) {
private com.google.rpc.Status handleExceptions(final Throwable e) {
String message = e.getMessage();
if (e instanceof RequestTimeoutException || e instanceof TimeoutException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
return createStatus(e, Status.Code.RESOURCE_EXHAUSTED);
} else if (e instanceof SizeOverflowException) {
requestsTooLargeCounter.increment();
return createStatus(e, Status.RESOURCE_EXHAUSTED);
return createStatus(e, Status.Code.RESOURCE_EXHAUSTED);
} else if (e instanceof BadRequestException) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
return createStatus(e, Status.Code.INVALID_ARGUMENT);
} else if ((e instanceof StatusRuntimeException) && (message.contains("Invalid protobuf byte sequence") || message.contains("Can't decode compressed frame"))) {
badRequestsCounter.increment();
return createStatus(e, Status.INVALID_ARGUMENT);
return createStatus(e, Status.Code.INVALID_ARGUMENT);
} else if (e instanceof RequestCancelledException) {
requestTimeoutsCounter.increment();
return createStatus(e, Status.CANCELLED);
return createStatus(e, Status.Code.CANCELLED);
}

internalServerErrorCounter.increment();
LOG.error("Unexpected exception handling gRPC request", e);
return createStatus(e, Status.INTERNAL);
return createStatus(e, Status.Code.INTERNAL);
}

private Status createStatus(final Throwable e, final Status status) {
final String message;
private com.google.rpc.Status createStatus(final Throwable e, final Status.Code code) {
com.google.rpc.Status.Builder builder = com.google.rpc.Status.newBuilder().setCode(code.value());
if (e instanceof RequestTimeoutException) {
message = ARMERIA_REQUEST_TIMEOUT_MESSAGE;
builder.setMessage(ARMERIA_REQUEST_TIMEOUT_MESSAGE);
} else {
message = e.getMessage() == null ? status.getCode().name() : e.getMessage();
builder.setMessage(e.getMessage() == null ? code.name() : e.getMessage());
}

return status.withDescription(message);
if (code == Status.Code.RESOURCE_EXHAUSTED) {
builder.addDetails(Any.pack(retryInfoCalculator.createRetryInfo()));
}
return builder.build();
}
}
@@ -0,0 +1,49 @@
package org.opensearch.dataprepper;

import com.google.rpc.RetryInfo;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;

class GrpcRetryInfoCalculator {

private final Duration minimumDelay;
private final Duration maximumDelay;

private final AtomicReference<Instant> lastTimeCalled;
private final AtomicReference<Duration> nextDelay;

GrpcRetryInfoCalculator(Duration minimumDelay, Duration maximumDelay) {
this.minimumDelay = minimumDelay;
this.maximumDelay = maximumDelay;
// Create a cushion so that the calculator treats a first quick exception (after prepper startup) as a normal request (i.e., it does not calculate a backoff)
this.lastTimeCalled = new AtomicReference<>(Instant.now().minus(maximumDelay));
this.nextDelay = new AtomicReference<>(minimumDelay);
}

private static RetryInfo createProtoResult(Duration delay) {
return RetryInfo.newBuilder().setRetryDelay(mapDuration(delay)).build();
}

private static Duration minDuration(Duration left, Duration right) {
return left.compareTo(right) <= 0 ? left : right;
}

private static com.google.protobuf.Duration.Builder mapDuration(Duration duration) {
return com.google.protobuf.Duration.newBuilder().setSeconds(duration.getSeconds()).setNanos(duration.getNano());
}

RetryInfo createRetryInfo() {
Instant now = Instant.now();
// Is the last time we got called longer ago than the next delay?
if (lastTimeCalled.getAndSet(now).isBefore(now.minus(nextDelay.get()))) {
// Use minimum delay and reset the saved delay
nextDelay.set(minimumDelay);
return createProtoResult(minimumDelay);
}
Duration delay = nextDelay.getAndUpdate(d -> minDuration(maximumDelay, d.multipliedBy(2)));
return createProtoResult(delay);
}

}
@@ -5,6 +5,8 @@

package org.opensearch.dataprepper;

import com.google.protobuf.Any;
import com.google.rpc.RetryInfo;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.server.RequestTimeoutException;
import io.grpc.Metadata;
@@ -13,6 +15,9 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.exceptions.BadRequestException;
@@ -22,11 +27,15 @@
import org.opensearch.dataprepper.model.buffer.SizeOverflowException;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import static com.linecorp.armeria.internal.common.grpc.MetadataUtil.GRPC_STATUS_DETAILS_BIN_KEY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -55,6 +64,9 @@ public class GrpcRequestExceptionHandlerTest {
@Mock
private Metadata metadata;

@Captor
private ArgumentCaptor<com.google.rpc.Status> status;

private GrpcRequestExceptionHandler grpcRequestExceptionHandler;

@BeforeEach
@@ -64,7 +76,7 @@ public void setUp() {
when(pluginMetrics.counter(HttpRequestExceptionHandler.REQUESTS_TOO_LARGE)).thenReturn(requestsTooLargeCounter);
when(pluginMetrics.counter(HttpRequestExceptionHandler.INTERNAL_SERVER_ERROR)).thenReturn(internalServerErrorCounter);

grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
grpcRequestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics, Duration.ofMillis(100), Duration.ofSeconds(2));
}

@Test
@@ -99,6 +111,12 @@ public void testHandleTimeoutException() {
assertThat(messageStatus.getDescription(), equalTo(exceptionMessage));

verify(requestTimeoutsCounter, times(2)).increment();

verify(metadata, times(2)).put(ArgumentMatchers.eq(GRPC_STATUS_DETAILS_BIN_KEY), status.capture());
for (com.google.rpc.Status currentStatus : status.getAllValues()) {
Optional<Any> retryInfo = currentStatus.getDetailsList().stream().filter(d -> d.is(RetryInfo.class)).findFirst();
assertTrue(retryInfo.isPresent(), "No RetryInfo at status:\n" + currentStatus.toString());
}
}

@Test
@@ -0,0 +1,83 @@
package org.opensearch.dataprepper;

import com.google.rpc.RetryInfo;
import org.junit.jupiter.api.Test;

import java.time.Duration;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

public class GrpcRetryInfoCalculatorTest {

@Test
public void testMinimumDelayOnFirstCall() {
RetryInfo retryInfo = new GrpcRetryInfoCalculator(Duration.ofMillis(100), Duration.ofSeconds(1)).createRetryInfo();

assertThat(retryInfo.getRetryDelay().getNanos(), equalTo(100_000_000));
assertThat(retryInfo.getRetryDelay().getSeconds(), equalTo(0L));
}

@Test
public void testExponentialBackoff() {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(10));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();
RetryInfo retryInfo4 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(2L));
assertThat(retryInfo4.getRetryDelay().getSeconds(), equalTo(4L));
}

@Test
public void testUsesMaximumAsLongestDelay() {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(1), Duration.ofSeconds(2));
RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(1L));
assertThat(retryInfo3.getRetryDelay().getSeconds(), equalTo(2L));
}

@Test
public void testResetAfterDelayWearsOff() throws InterruptedException {
int minDelayNanos = 1_000_000;
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofNanos(minDelayNanos), Duration.ofSeconds(1));

RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();
RetryInfo retryInfo3 = calculator.createRetryInfo();
sleep(retryInfo3);
RetryInfo retryInfo4 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getNanos(), equalTo(minDelayNanos));
assertThat(retryInfo2.getRetryDelay().getNanos(), equalTo(minDelayNanos));
assertThat(retryInfo3.getRetryDelay().getNanos(), equalTo(minDelayNanos * 2));
assertThat(retryInfo4.getRetryDelay().getNanos(), equalTo(minDelayNanos));
}

@Test
public void testQuickFirstExceptionDoesNotTriggerBackoffCalculationEvenWithLongMinDelay() throws InterruptedException {
GrpcRetryInfoCalculator calculator =
new GrpcRetryInfoCalculator(Duration.ofSeconds(10), Duration.ofSeconds(20));

RetryInfo retryInfo1 = calculator.createRetryInfo();
RetryInfo retryInfo2 = calculator.createRetryInfo();

assertThat(retryInfo1.getRetryDelay().getSeconds(), equalTo(10L));
assertThat(retryInfo2.getRetryDelay().getSeconds(), equalTo(10L));
}

private void sleep(RetryInfo retryInfo) throws InterruptedException {
// make sure enough time passes by adding a couple hundred milliseconds on top
Thread.sleep((retryInfo.getRetryDelay().getNanos() / 1_000_000) + 200);
}
}
12 changes: 12 additions & 0 deletions data-prepper-plugins/otel-logs-source/README.md
@@ -26,6 +26,18 @@ source:
* `none`: no compression
* `gzip`: apply GZip de-compression on the incoming request.

### Retry Information

When backpressure builds up, Data Prepper replies with a `RetryInfo` in the gRPC status that tells the client how long to wait before sending the next request. The delay grows with exponential backoff, starting at `retry_info.min_delay` and capped at `retry_info.max_delay`.

```yaml
source:
  otel_logs_source:
retry_info:
min_delay: 1000ms # defaults to 100ms
max_delay: 5s # defaults to 2s
```
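
For illustration only, a gRPC client could read the suggested delay from a failed
export roughly as follows. This helper is an assumption for this README and not
part of Data Prepper; clients such as the OTel Collector typically honor
`RetryInfo` automatically.

```java
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.rpc.RetryInfo;
import io.grpc.StatusRuntimeException;
import io.grpc.protobuf.StatusProto;

import java.time.Duration;

class RetryDelayReader {
    // Returns the server-suggested delay, or a client-chosen fallback if none is present.
    static Duration suggestedRetryDelay(StatusRuntimeException e, Duration fallback) {
        com.google.rpc.Status status = StatusProto.fromThrowable(e);
        if (status != null) {
            for (Any detail : status.getDetailsList()) {
                if (detail.is(RetryInfo.class)) {
                    try {
                        com.google.protobuf.Duration d = detail.unpack(RetryInfo.class).getRetryDelay();
                        return Duration.ofSeconds(d.getSeconds(), d.getNanos());
                    } catch (InvalidProtocolBufferException ignored) {
                        // Malformed detail; fall back to the default below.
                    }
                }
            }
        }
        return fallback;
    }
}
```
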
### SSL
* ssl(Optional) => A boolean that enables TLS/SSL. Default is ```true```.
@@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.otellogs;

import com.linecorp.armeria.common.grpc.GrpcExceptionHandlerFunction;
import com.linecorp.armeria.server.encoding.DecodingService;
import org.opensearch.dataprepper.GrpcRequestExceptionHandler;
import org.opensearch.dataprepper.plugins.codec.CompressionOption;
@@ -43,6 +44,7 @@

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -54,14 +56,16 @@ public class OTelLogsSource implements Source<Record<Object>> {
private static final String PIPELINE_NAME_PLACEHOLDER = "${pipelineName}";
static final String SERVER_CONNECTIONS = "serverConnections";

// Default RetryInfo with minimum 100ms and maximum 2s
private static final RetryInfoConfig DEFAULT_RETRY_INFO = new RetryInfoConfig(Duration.ofMillis(100), Duration.ofMillis(2000));

private final OTelLogsSourceConfig oTelLogsSourceConfig;
private final String pipelineName;
private final PluginMetrics pluginMetrics;
private final GrpcAuthenticationProvider authenticationProvider;
private final CertificateProviderFactory certificateProviderFactory;
private final GrpcRequestExceptionHandler requestExceptionHandler;
private final ByteDecoder byteDecoder;
private Server server;
private ByteDecoder byteDecoder;

@DataPrepperPluginConstructor
public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
@@ -80,7 +84,6 @@ public OTelLogsSource(final OTelLogsSourceConfig oTelLogsSourceConfig,
this.certificateProviderFactory = certificateProviderFactory;
this.pipelineName = pipelineDescription.getPipelineName();
this.authenticationProvider = createAuthenticationProvider(pluginFactory);
this.requestExceptionHandler = new GrpcRequestExceptionHandler(pluginMetrics);
this.byteDecoder = new OTelLogsDecoder();
}

@@ -110,7 +113,7 @@ public void start(Buffer<Record<Object>> buffer) {
.builder()
.useClientTimeoutHeader(false)
.useBlockingTaskExecutor(true)
.exceptionMapping(requestExceptionHandler);
.exceptionHandler(createGrpExceptionHandler());

final MethodDescriptor<ExportLogsServiceRequest, ExportLogsServiceResponse> methodDescriptor = LogsServiceGrpc.getExportMethod();
final String oTelLogsSourcePath = oTelLogsSourceConfig.getPath();
@@ -205,6 +208,14 @@ public void stop() {
LOG.info("Stopped otel_logs_source.");
}

private GrpcExceptionHandlerFunction createGrpExceptionHandler() {
RetryInfoConfig retryInfo = oTelLogsSourceConfig.getRetryInfo() != null
? oTelLogsSourceConfig.getRetryInfo()
: DEFAULT_RETRY_INFO;

return new GrpcRequestExceptionHandler(pluginMetrics, retryInfo.getMinDelay(), retryInfo.getMaxDelay());
}

private List<ServerInterceptor> getAuthenticationInterceptor() {
final ServerInterceptor authenticationInterceptor = authenticationProvider.getAuthenticationInterceptor();
if (authenticationInterceptor == null) {