From 0ab795aa54da26c52dfdf45afc81a1fca58aa8b8 Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Tue, 13 Aug 2024 21:52:01 +0000 Subject: [PATCH 1/2] Fix visibility timeout errors (#4812) Signed-off-by: Daniel Li --- .../plugins/source/s3/SqsWorker.java | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 3c5fba0701..15abb6a288 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -192,6 +192,8 @@ private ReceiveMessageRequest createReceiveMessageRequest() { private List processS3EventNotificationRecords(final Collection s3EventNotificationRecords) { final List deleteMessageBatchRequestEntryCollection = new ArrayList<>(); final List parsedMessagesToRead = new ArrayList<>(); + final Map messageAcknowledgementSetMap = new HashMap<>(); + final Map> messageWaitingForAcknowledgementsMap = new HashMap<>(); for (ParsedMessage parsedMessage : s3EventNotificationRecords) { if (parsedMessage.isFailedParsing()) { @@ -224,7 +226,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } LOG.info("Received {} messages from SQS. Processing {} messages.", s3EventNotificationRecords.size(), parsedMessagesToRead.size()); - + for (ParsedMessage parsedMessage : parsedMessagesToRead) { List waitingForAcknowledgements = new ArrayList<>(); AcknowledgementSet acknowledgementSet = null; @@ -262,7 +264,19 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { }, Duration.ofSeconds(progressCheckInterval)); } + messageAcknowledgementSetMap.put(parsedMessage, acknowledgementSet); + messageWaitingForAcknowledgementsMap.put(parsedMessage, waitingForAcknowledgements); } + } + + if (endToEndAcknowledgementsEnabled) { + LOG.info("Created acknowledgement sets for {} messages.", parsedMessagesToRead.size()); + } + + // Use a separate loop for processing the S3 objects + for (ParsedMessage parsedMessage : parsedMessagesToRead) { + final AcknowledgementSet acknowledgementSet = messageAcknowledgementSetMap.get(parsedMessage); + final List waitingForAcknowledgements = messageWaitingForAcknowledgementsMap.get(parsedMessage); final S3ObjectReference s3ObjectReference = populateS3Reference(parsedMessage.getBucketName(), parsedMessage.getObjectKey()); final Optional deleteMessageBatchRequestEntry = processS3Object(parsedMessage, s3ObjectReference, acknowledgementSet); if (endToEndAcknowledgementsEnabled) { @@ -271,7 +285,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } else { deleteMessageBatchRequestEntry.ifPresent(deleteMessageBatchRequestEntryCollection::add); } - } + } return deleteMessageBatchRequestEntryCollection; } From a63230b62dc44ffeb3808d0172ef298766c650e2 Mon Sep 17 00:00:00 2001 From: Daniel Li Date: Mon, 19 Aug 2024 23:08:26 +0000 Subject: [PATCH 2/2] Change log level from info to debug (#4812) Signed-off-by: Daniel Li --- .../org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java index 15abb6a288..2861ffa6d7 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorker.java @@ -270,7 +270,7 @@ && isEventBridgeEventTypeCreated(parsedMessage)) { } if (endToEndAcknowledgementsEnabled) { - LOG.info("Created acknowledgement sets for {} messages.", parsedMessagesToRead.size()); + LOG.debug("Created acknowledgement sets for {} messages.", parsedMessagesToRead.size()); } // Use a separate loop for processing the S3 objects