Skip to content

Commit

Permalink
Handle errors from OpenSearch by checking status field as well as err…
Browse files Browse the repository at this point in the history
…or (#4335)

Handle errors from OpenSearch by checking both the status field and the error body for each bulk response item.

Signed-off-by: David Venable <dlv@amazon.com>
  • Loading branch information
dlvenable committed Mar 26, 2024
1 parent 533b995 commit 3c7cb8e
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.google.common.collect.ImmutableList;
import com.linecorp.armeria.client.retry.Backoff;
import io.micrometer.core.instrument.Counter;
import org.opensearch.client.opensearch._types.ErrorCause;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
Expand Down Expand Up @@ -214,7 +215,7 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte

public boolean canRetry(final BulkResponse response) {
for (final BulkResponseItem bulkItemResponse : response.items()) {
if (bulkItemResponse.error() != null && !NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
if (isItemInError(bulkItemResponse) && !NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
return true;
}
}
Expand All @@ -234,7 +235,7 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest);
if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (bulkItemResponse.error() == null) {
if (!isItemInError(bulkItemResponse)) {
sentDocumentsOnFirstAttemptCounter.increment();
}
}
Expand All @@ -244,8 +245,9 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, exceptionFromRequest);
if (exceptionFromRequest == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
if (bulkItemResponse.error() != null) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
if(isItemInError(bulkItemResponse)) {
final ErrorCause error = bulkItemResponse.error();
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), error != null ? error.reason() : "");
}
}
}
Expand All @@ -261,9 +263,13 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating
private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest, final BulkResponse bulkResponse, final Throwable failure) {
if (failure == null) {
for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) {
// Skip logging the error for version conflicts
if (bulkItemResponse.error() != null && !VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason());
if(isItemInError(bulkItemResponse)) {
// Skip logging the error for version conflicts
final ErrorCause error = bulkItemResponse.error();
if (error != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(error.type())) {
continue;
}
LOG.warn("operation = {}, status = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.status(), error != null ? error.reason() : "");
}
}
handleFailures(bulkRequest, bulkResponse.items());
Expand Down Expand Up @@ -315,10 +321,10 @@ private AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> createBulkReq
for (final BulkResponseItem bulkItemResponse : response.items()) {
BulkOperationWrapper bulkOperation =
(BulkOperationWrapper)request.getOperationAt(index);
if (bulkItemResponse.error() != null) {
if (isItemInError(bulkItemResponse)) {
if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) {
requestToReissue.addOperation(bulkOperation);
} else if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
} else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
Expand Down Expand Up @@ -349,8 +355,8 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
for (int i = 0; i < itemResponses.size(); i++) {
final BulkResponseItem bulkItemResponse = itemResponses.get(i);
final BulkOperationWrapper bulkOperation = accumulatingBulkRequest.getOperationAt(i);
if (bulkItemResponse.error() != null) {
if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
if (isItemInError(bulkItemResponse)) {
if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) {
documentsVersionConflictErrors.increment();
LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason());
bulkOperation.releaseEventHandle(true);
Expand Down Expand Up @@ -384,4 +390,7 @@ private void handleFailures(final AccumulatingBulkRequest<BulkOperationWrapper,
logFailure.accept(failures.build(), failure);
}

/**
 * Decides whether a single bulk response item should be treated as failed.
 *
 * An item counts as failed when its status is 300 or above, or when it
 * carries a non-null error. The status is inspected first; the error is
 * only consulted when the status alone does not indicate a failure.
 *
 * @param item the bulk response item to inspect
 * @return {@code true} if the item's status is at least 300 or its error is present
 */
private static boolean isItemInError(final BulkResponseItem item) {
    final boolean hasFailureStatus = item.status() >= 300;
    if (hasFailureStatus) {
        return true;
    }
    return item.error() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ public void testExecuteNonRetryableResponse() throws Exception {
}

@Test
void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status() throws Exception {
void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status_when_error_is_provided() throws Exception {
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction = mock(RequestFunction.class);
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier = mock(Supplier.class);

Expand All @@ -503,6 +503,74 @@ void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retrya
final BulkResponseItem responseItem = mock(BulkResponseItem.class);

when(responseItem.error()).thenReturn(mock(ErrorCause.class));
responseItems.add(responseItem);
}
final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> bulkRequest = mock(AccumulatingBulkRequest.class);
when(bulkRequest.getOperationsCount()).thenReturn(operations.size());
when(bulkRequest.getOperationAt(anyInt())).thenAnswer(a -> operations.get(a.getArgument(0)));

final BulkResponse allFailingItemsResponse = mock(BulkResponse.class);
when(allFailingItemsResponse.errors()).thenReturn(true);
when(allFailingItemsResponse.items()).thenReturn(responseItems);


final AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest> reissueRequest = mock(AccumulatingBulkRequest.class);
when(reissueRequest.getOperationsCount()).thenReturn(operations.size());
when(reissueRequest.getOperationAt(anyInt())).thenAnswer(a -> operations.get(a.getArgument(0)));
when(reissueRequest.getOperations()).thenReturn(operations);

when(requestFunction.apply(bulkRequest)).thenReturn(allFailingItemsResponse);
when(requestFunction.apply(reissueRequest)).thenReturn(allFailingItemsResponse);

when(bulkRequestSupplier.get()).thenReturn(reissueRequest);

objectUnderTest.execute(bulkRequest);

for (int i = 0; i < operations.size(); i++) {
final BulkOperationWrapper operation = operations.get(i);
verify(reissueRequest, times(maxRetries - 1)).addOperation(operation);
verify(reissueRequest, times(1)).getOperationAt(i);
}
verify(reissueRequest, times(maxRetries)).getOperationsCount();
verify(reissueRequest).getOperations();
verifyNoMoreInteractions(reissueRequest);

final ArgumentCaptor<List<FailedBulkOperation>> actualFailedOperationsCaptor = ArgumentCaptor.forClass(List.class);
verify(logFailureConsumer).accept(actualFailedOperationsCaptor.capture(), any());

final List<FailedBulkOperation> failedBulkOperations = actualFailedOperationsCaptor.getValue();
assertThat(failedBulkOperations.size(), equalTo(operations.size()));
for (int i = 0; i < operations.size(); i++) {
final FailedBulkOperation failedBulkOperation = failedBulkOperations.get(i);
final BulkOperationWrapper operation = operations.get(i);

assertThat(failedBulkOperation, notNullValue());
assertThat(failedBulkOperation.getBulkOperation(), equalTo(operation));
assertThat(failedBulkOperation.getFailure(), notNullValue());
}

verifyNoMoreInteractions(logFailureConsumer);
}

@Test
void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status_when_no_error() throws Exception {
final RequestFunction<AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>, BulkResponse> requestFunction = mock(RequestFunction.class);
final Supplier<AccumulatingBulkRequest> bulkRequestSupplier = mock(Supplier.class);

final int maxRetries = 3;
final BulkRetryStrategy objectUnderTest = createObjectUnderTest(
requestFunction, logFailureConsumer, maxRetries,
bulkRequestSupplier);

final List<BulkOperationWrapper> operations = new ArrayList<>();
final List<BulkResponseItem> responseItems = new ArrayList<>();

for (int i = 0; i < 5; i++) {
final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class);
operations.add(bulkOperationWrapper);

final BulkResponseItem responseItem = mock(BulkResponseItem.class);

when(responseItem.status()).thenReturn(403);
responseItems.add(responseItem);
}
Expand Down Expand Up @@ -554,7 +622,9 @@ void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retrya
}

private static BulkResponseItem successItemResponse(final String index) {
return mock(BulkResponseItem.class);
final BulkResponseItem response = mock(BulkResponseItem.class);
lenient().when(response.status()).thenReturn(200);
return response;
}

private static BulkResponseItem badRequestItemResponse(final String index) {
Expand Down

0 comments on commit 3c7cb8e

Please sign in to comment.