diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 2c629e32a6..9db9a11edc 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -9,6 +9,7 @@ import com.google.common.collect.ImmutableList; import com.linecorp.armeria.client.retry.Backoff; import io.micrometer.core.instrument.Counter; +import org.opensearch.client.opensearch._types.ErrorCause; import org.opensearch.client.opensearch._types.OpenSearchException; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.BulkResponse; @@ -214,7 +215,7 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte public boolean canRetry(final BulkResponse response) { for (final BulkResponseItem bulkItemResponse : response.items()) { - if (bulkItemResponse.error() != null && !NON_RETRY_STATUS.contains(bulkItemResponse.status())) { + if (isItemInError(bulkItemResponse) && !NON_RETRY_STATUS.contains(bulkItemResponse.status())) { return true; } } @@ -234,7 +235,7 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating final boolean doRetry = (Objects.isNull(exceptionFromRequest)) ? canRetry(bulkResponse) : canRetry(exceptionFromRequest); if (!Objects.isNull(bulkResponse) && retryCount == 1) { // first attempt for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { - if (bulkItemResponse.error() == null) { + if (!isItemInError(bulkItemResponse)) { sentDocumentsOnFirstAttemptCounter.increment(); } } @@ -244,8 +245,9 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating LOG.warn("Bulk Operation Failed. Number of retries {}. Retrying... ", retryCount, exceptionFromRequest); if (exceptionFromRequest == null) { for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { - if (bulkItemResponse.error() != null) { - LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason()); + if(isItemInError(bulkItemResponse)) { + final ErrorCause error = bulkItemResponse.error(); + LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), error != null ? error.reason() : ""); } } } @@ -261,9 +263,13 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating private void handleFailures(final AccumulatingBulkRequest bulkRequest, final BulkResponse bulkResponse, final Throwable failure) { if (failure == null) { for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { - // Skip logging the error for version conflicts - if (bulkItemResponse.error() != null && !VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { - LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.error().reason()); + if(isItemInError(bulkItemResponse)) { + // Skip logging the error for version conflicts + final ErrorCause error = bulkItemResponse.error(); + if (error != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(error.type())) { + continue; + } + LOG.warn("operation = {}, status = {}, error = {}", bulkItemResponse.operationType(), bulkItemResponse.status(), error != null ? error.reason() : ""); } } handleFailures(bulkRequest, bulkResponse.items()); @@ -315,10 +321,10 @@ private AccumulatingBulkRequest createBulkReq for (final BulkResponseItem bulkItemResponse : response.items()) { BulkOperationWrapper bulkOperation = (BulkOperationWrapper)request.getOperationAt(index); - if (bulkItemResponse.error() != null) { + if (isItemInError(bulkItemResponse)) { if (!NON_RETRY_STATUS.contains(bulkItemResponse.status())) { requestToReissue.addOperation(bulkOperation); - } else if (VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { + } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason()); bulkOperation.releaseEventHandle(true); @@ -349,8 +355,8 @@ private void handleFailures(final AccumulatingBulkRequest= 300 || item.error() != null; + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java index 7bb1acec1e..ae8eaefa7e 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategyTests.java @@ -484,7 +484,7 @@ public void testExecuteNonRetryableResponse() throws Exception { } @Test - void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status() throws Exception { + void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status_when_error_is_provided() throws Exception { final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); final Supplier bulkRequestSupplier = mock(Supplier.class); @@ -503,6 +503,74 @@ void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retrya final BulkResponseItem responseItem = mock(BulkResponseItem.class); when(responseItem.error()).thenReturn(mock(ErrorCause.class)); + responseItems.add(responseItem); + } + final AccumulatingBulkRequest bulkRequest = mock(AccumulatingBulkRequest.class); + when(bulkRequest.getOperationsCount()).thenReturn(operations.size()); + when(bulkRequest.getOperationAt(anyInt())).thenAnswer(a -> operations.get(a.getArgument(0))); + + final BulkResponse allFailingItemsResponse = mock(BulkResponse.class); + when(allFailingItemsResponse.errors()).thenReturn(true); + when(allFailingItemsResponse.items()).thenReturn(responseItems); + + + final AccumulatingBulkRequest reissueRequest = mock(AccumulatingBulkRequest.class); + when(reissueRequest.getOperationsCount()).thenReturn(operations.size()); + when(reissueRequest.getOperationAt(anyInt())).thenAnswer(a -> operations.get(a.getArgument(0))); + when(reissueRequest.getOperations()).thenReturn(operations); + + when(requestFunction.apply(bulkRequest)).thenReturn(allFailingItemsResponse); + when(requestFunction.apply(reissueRequest)).thenReturn(allFailingItemsResponse); + + when(bulkRequestSupplier.get()).thenReturn(reissueRequest); + + objectUnderTest.execute(bulkRequest); + + for (int i = 0; i < operations.size(); i++) { + final BulkOperationWrapper operation = operations.get(i); + verify(reissueRequest, times(maxRetries - 1)).addOperation(operation); + verify(reissueRequest, times(1)).getOperationAt(i); + } + verify(reissueRequest, times(maxRetries)).getOperationsCount(); + verify(reissueRequest).getOperations(); + verifyNoMoreInteractions(reissueRequest); + + final ArgumentCaptor> actualFailedOperationsCaptor = ArgumentCaptor.forClass(List.class); + verify(logFailureConsumer).accept(actualFailedOperationsCaptor.capture(), any()); + + final List failedBulkOperations = actualFailedOperationsCaptor.getValue(); + assertThat(failedBulkOperations.size(), equalTo(operations.size())); + for (int i = 0; i < operations.size(); i++) { + final FailedBulkOperation failedBulkOperation = failedBulkOperations.get(i); + final BulkOperationWrapper operation = operations.get(i); + + assertThat(failedBulkOperation, notNullValue()); + assertThat(failedBulkOperation.getBulkOperation(), equalTo(operation)); + assertThat(failedBulkOperation.getFailure(), notNullValue()); + } + + verifyNoMoreInteractions(logFailureConsumer); + } + + @Test + void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retryable_status_when_no_error() throws Exception { + final RequestFunction, BulkResponse> requestFunction = mock(RequestFunction.class); + final Supplier bulkRequestSupplier = mock(Supplier.class); + + final int maxRetries = 3; + final BulkRetryStrategy objectUnderTest = createObjectUnderTest( + requestFunction, logFailureConsumer, maxRetries, + bulkRequestSupplier); + + final List operations = new ArrayList<>(); + final List responseItems = new ArrayList<>(); + + for (int i = 0; i < 5; i++) { + final BulkOperationWrapper bulkOperationWrapper = mock(BulkOperationWrapper.class); + operations.add(bulkOperationWrapper); + + final BulkResponseItem responseItem = mock(BulkResponseItem.class); + when(responseItem.status()).thenReturn(403); responseItems.add(responseItem); } @@ -554,7 +622,9 @@ void execute_will_not_send_messages_to_logWriter_when_all_items_fail_with_retrya } private static BulkResponseItem successItemResponse(final String index) { - return mock(BulkResponseItem.class); + final BulkResponseItem response = mock(BulkResponseItem.class); + lenient().when(response.status()).thenReturn(200); + return response; } private static BulkResponseItem badRequestItemResponse(final String index) {