diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 3c5fba0701..2861ffa6d7 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -192,6 +192,8 @@ private ReceiveMessageRequest createReceiveMessageRequest() { private List processS3EventNotificationRecords(final Collection s3EventNotificationRecords) { final List deleteMessageBatchRequestEntryCollection = new ArrayList<>(); final List parsedMessagesToRead = new ArrayList<>(); + final Map messageAcknowledgementSetMap = new HashMap<>(); + final Map> messageWaitingForAcknowledgementsMap = new HashMap<>(); for (ParsedMessage parsedMessage : s3EventNotificationRecords) { if (parsedMessage.isFailedParsing()) { @@ -224,7 +226,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } LOG.info("Received {} messages from SQS. Processing {} messages.", s3EventNotificationRecords.size(), parsedMessagesToRead.size()); - + for (ParsedMessage parsedMessage : parsedMessagesToRead) { List waitingForAcknowledgements = new ArrayList<>(); AcknowledgementSet acknowledgementSet = null; @@ -262,7 +264,19 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { }, Duration.ofSeconds(progressCheckInterval)); } + messageAcknowledgementSetMap.put(parsedMessage, acknowledgementSet); + messageWaitingForAcknowledgementsMap.put(parsedMessage, waitingForAcknowledgements); } + } + + if (endToEndAcknowledgementsEnabled) { + LOG.debug("Created acknowledgement sets for {} messages.", parsedMessagesToRead.size()); + } + + // Use a separate loop for processing the S3 objects + for (ParsedMessage parsedMessage : parsedMessagesToRead) { + final AcknowledgementSet acknowledgementSet = messageAcknowledgementSetMap.get(parsedMessage); + final List waitingForAcknowledgements = messageWaitingForAcknowledgementsMap.get(parsedMessage); final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey()); final Optional deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet); if (endToEndAcknowledgementsEnabled) { @@ -271,7 +285,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } else { deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add); } - } + } return deleteMessageBatchRequestEntryCollection; }