Skip to content

Commit

Permalink
Fix bug where race condition on ack callback could cause S3 folder pa…
Browse files Browse the repository at this point in the history
…rtition to not be given up (#4835)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Aug 19, 2024
1 parent 04098b3 commit 9a82590
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,6 @@ private void processFolderPartition(final SourcePartition<S3SourceProgressState>
sourceCoordinator.saveProgressStateForPartition(folderPartition.getPartitionKey(), folderPartitionState.get());

processObjectsForFolderPartition(objectsToProcess, folderPartition);

sourceCoordinator.updatePartitionForAcknowledgmentWait(folderPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT);
}

private List<S3ObjectReference> getObjectsForPrefix(final String bucket, final String s3Prefix) {
Expand Down Expand Up @@ -364,7 +362,8 @@ private void processObjectsForFolderPartition(final List<S3ObjectReference> obje
objectIndex++;
}

// Complete the final acknowledgment set
sourceCoordinator.updatePartitionForAcknowledgmentWait(folderPartition.getPartitionKey(), ACKNOWLEDGEMENT_SET_TIMEOUT);

if (acknowledgementSet != null) {
acknowledgementSet.complete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ void processing_with_folder_partition_processes_objects_in_folder_and_deletes_th
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, firstObject.key());
inOrder.verify(acknowledgementSet1).complete();
inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, secondObject.key());
inOrder.verify(acknowledgementSet2).complete();
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
inOrder.verify(acknowledgementSet2).complete();

final Consumer<Boolean> firstAckCallback = ackCallbacks.get(0);
firstAckCallback.accept(true);
Expand Down Expand Up @@ -532,8 +532,8 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje
final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet1, s3ObjectDeleteWorker);

inOrder.verify(s3ObjectDeleteWorker).buildDeleteObjectRequest(bucket, firstObject.key());
inOrder.verify(acknowledgementSet1).complete();
inOrder.verify(sourceCoordinator).updatePartitionForAcknowledgmentWait(partitionKey, ACKNOWLEDGEMENT_SET_TIMEOUT);
inOrder.verify(acknowledgementSet1).complete();

final Consumer<Boolean> ackCallback = consumerArgumentCaptor.getValue();
ackCallback.accept(true);
Expand Down

0 comments on commit 9a82590

Please sign in to comment.