From a966bace718471ec296b675f0e3bbef77a81b2a8 Mon Sep 17 00:00:00 2001 From: Daniel Li <68623003+danhli@users.noreply.github.com> Date: Fri, 23 Aug 2024 11:59:12 -0700 Subject: [PATCH] Fix visibility timeout errors (#4812) (#4831) Fix visibility timeout errors (#4812) Signed-off-by: Daniel Li (cherry picked from commit 910533abb35d4fd7e8ad7a64e358fa1140f24e6f) --- .../plugins/source/s3/SqsWorker.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 3c5fba0701..2861ffa6d7 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -192,6 +192,8 @@ private ReceiveMessageRequest createReceiveMessageRequest() { private List processS3EventNotificationRecords(final Collection s3EventNotificationRecords) { final List deleteMessageBatchRequestEntryCollection = new ArrayList<>(); final List parsedMessagesToRead = new ArrayList<>(); + final Map messageAcknowledgementSetMap = new HashMap<>(); + final Map> messageWaitingForAcknowledgementsMap = new HashMap<>(); for (ParsedMessage parsedMessage : s3EventNotificationRecords) { if (parsedMessage.isFailedParsing()) { @@ -224,7 +226,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } LOG.info("Received {} messages from SQS. Processing {} messages.", s3EventNotificationRecords.size(), parsedMessagesToRead.size()); - + for (ParsedMessage parsedMessage : parsedMessagesToRead) { List waitingForAcknowledgements = new ArrayList<>(); AcknowledgementSet acknowledgementSet = null; @@ -262,7 +264,19 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { }, Duration.ofSeconds(progressCheckInterval)); } + messageAcknowledgementSetMap.put(parsedMessage, acknowledgementSet); + messageWaitingForAcknowledgementsMap.put(parsedMessage, waitingForAcknowledgements); } + } + + if (endToEndAcknowledgementsEnabled) { + LOG.debug("Created acknowledgement sets for {} messages.", parsedMessagesToRead.size()); + } + + // Use a separate loop for processing the S3 objects + for (ParsedMessage parsedMessage : parsedMessagesToRead) { + final AcknowledgementSet acknowledgementSet = messageAcknowledgementSetMap.get(parsedMessage); + final List waitingForAcknowledgements = messageWaitingForAcknowledgementsMap.get(parsedMessage); final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey()); final Optional deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet); if (endToEndAcknowledgementsEnabled) { @@ -271,7 +285,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } else { deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add); } - } + } return deleteMessageBatchRequestEntryCollection; }