diff --git a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java index 3cddb1a2e8..471a0efa3d 100644 --- a/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/ScanObjectWorker.java @@ -280,8 +280,6 @@ private void processFolderPartition(final SourcePartition sourceCoordinator.saveProgressStateForPartition(folderPartition.getPartitionKey(), folderPartitionState.get()); processObjectsForFolderPartition(objectsToProcess, folderPartition); - - sourceCoordinator.updatePartitionForAcknowledgmentWait(folderPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT); } private List getObjectsForPrefix(final String bucket, final String s3Prefix) { @@ -364,7 +362,8 @@ private void processObjectsForFolderPartition(final List obje objectIndex++; } - // Complete the final acknowledgment set + sourceCoordinator.updatePartitionForAcknowledgmentWait(folderPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT); + if (acknowledgementSet != null) { acknowledgementSet.complete(); } diff --git a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java index a02fec1af4..fa1645db8d 100644 --- a/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerTest.java @@ -460,8 +460,8 @@ void processing_with_folder_partition_processes_objects_in_folder_and_deletes_th inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, firstObject.key()); inOrder.verify(acknowledgementSet1).complete(); inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, secondObject.key()); - inOrder.verify(acknowledgementSet2).complete(); inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT); + inOrder.verify(acknowledgementSet2).complete(); final Consumer firstAckCallback = ackCallbacks.get(0); firstAckCallback.accept(true); @@ -532,8 +532,8 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet1, s3ObjectDeleteWorker); inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, firstObject.key()); - inOrder.verify(acknowledgementSet1).complete(); inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT); + inOrder.verify(acknowledgementSet1).complete(); final Consumer ackCallback = consumerArgumentCaptor.getValue(); ackCallback.accept(true);