diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index cc9c41f103..aca748b3ef 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -128,9 +128,11 @@ public final class BulkRetryStrategy { static class BulkOperationRequestResponse { final AccumulatingBulkRequest bulkRequest; final BulkResponse response; - public BulkOperationRequestResponse(final AccumulatingBulkRequest bulkRequest, final BulkResponse response) { + final Exception exception; + public BulkOperationRequestResponse(final AccumulatingBulkRequest bulkRequest, final BulkResponse response, final Exception exception) { this.bulkRequest = bulkRequest; this.response = response; + this.exception = exception; } AccumulatingBulkRequest getBulkRequest() { return bulkRequest; @@ -138,6 +140,9 @@ AccumulatingBulkRequest getBulkRequest() { BulkResponse getResponse() { return response; } + String getExceptionMessage() { + return exception != null ? exception.getMessage() : "-"; + } } public BulkRetryStrategy(final RequestFunction, BulkResponse> requestFunction, @@ -200,10 +205,12 @@ public void execute(final AccumulatingBulkRequest bulkRequest) throws Interrupte operationResponse = handleRetry(request, response, attempt); if (operationResponse != null) { final long delayMillis = backoff.nextDelayMillis(attempt++); + String exceptionMessage = ""; request = operationResponse.getBulkRequest(); response = operationResponse.getResponse(); + exceptionMessage = operationResponse.getExceptionMessage(); if (delayMillis < 0) { - RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d)", maxRetries)); + RuntimeException e = new RuntimeException(String.format("Number of retries reached the limit of max retries (configured value %d. Last exception message: %s)", maxRetries, exceptionMessage)); handleFailures(request, null, e); break; } @@ -251,13 +258,13 @@ private BulkOperationRequestResponse handleRetriesAndFailures(final Accumulating for (final BulkResponseItem bulkItemResponse : bulkResponse.items()) { if(isItemInError(bulkItemResponse)) { final ErrorCause error = bulkItemResponse.error(); - LOG.warn("operation = {}, error = {}", bulkItemResponse.operationType(), error != null ? error.reason() : ""); + LOG.warn("index = {} operation = {}, error = {}", bulkItemResponse.index(), bulkItemResponse.operationType(), error != null ? error.reason() : ""); } } } } bulkRequestNumberOfRetries.increment(); - return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse); + return new BulkOperationRequestResponse(bulkRequestForRetry, bulkResponse, exceptionFromRequest); } else { handleFailures(bulkRequestForRetry, bulkResponse, exceptionFromRequest); } @@ -273,7 +280,7 @@ private void handleFailures(final AccumulatingBulkRequest createBulkReq requestToReissue.addOperation(bulkOperation); } else if (bulkItemResponse.error() != null && VERSION_CONFLICT_EXCEPTION_TYPE.equals(bulkItemResponse.error().type())) { documentsVersionConflictErrors.increment(); - LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason()); + LOG.debug("Index: {}, Received version conflict from OpenSearch: {}", bulkItemResponse.index(), bulkItemResponse.error().reason()); bulkOperation.releaseEventHandle(true); } else { nonRetryableFailures.add(FailedBulkOperation.builder() @@ -368,7 +375,7 @@ private void handleFailures(final AccumulatingBulkRequest