Skip to content

Commit

Permalink
Add acknowledgment progress check for s3 source with folder partitions (
Browse files Browse the repository at this point in the history
opensearch-project#4957)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Sep 19, 2024
1 parent ed9f0c8 commit ce9cd64
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,16 +216,7 @@ private void startProcessingObject(final long waitTimeMillis) {
partitionKeys.remove(objectToProcess.get().getPartitionKey());
}, ACKNOWLEDGEMENT_SET_TIMEOUT);

acknowledgementSet.addProgressCheck(
(ratio) -> {
try {
sourceCoordinator.renewPartitionOwnership(objectToProcess.get().getPartitionKey());
} catch (final PartitionUpdateException | PartitionNotOwnedException | PartitionNotFoundException e) {
LOG.debug("Failed to update partition ownership for {} in the acknowledgment progress check", objectToProcess.get().getPartitionKey());
partitionOwnershipUpdateFailures.increment();
}
},
CHECKPOINT_OWNERSHIP_INTERVAL);
addProgressCheck(acknowledgementSet, objectToProcess.get());
}


Expand Down Expand Up @@ -360,6 +351,7 @@ private void processObjectsForFolderPartition(final List<S3ObjectReference> obje
activeAcknowledgmentSetId = acknowledgmentSetId;

acknowledgementSet = createAcknowledgmentSetForFolderPartition(folderPartition, acknowledgmentSetId);
addProgressCheck(acknowledgementSet, folderPartition);

objectsToDeleteForAcknowledgmentSets.put(acknowledgmentSetId, new HashSet<>());

Expand Down Expand Up @@ -412,4 +404,17 @@ private AcknowledgementSet createAcknowledgmentSetForFolderPartition(final Sourc
}
}, ACKNOWLEDGEMENT_SET_TIMEOUT);
}

private void addProgressCheck(final AcknowledgementSet acknowledgementSet, final SourcePartition<S3SourceProgressState> objectToProcess) {
acknowledgementSet.addProgressCheck(
(ratio) -> {
try {
sourceCoordinator.renewPartitionOwnership(objectToProcess.getPartitionKey());
} catch (final PartitionUpdateException | PartitionNotOwnedException | PartitionNotFoundException e) {
LOG.debug("Failed to update partition ownership for {} in the acknowledgment progress check", objectToProcess.getPartitionKey());
partitionOwnershipUpdateFailures.increment();
}
},
CHECKPOINT_OWNERSHIP_INTERVAL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,9 @@ void processing_with_folder_partition_processes_objects_in_folder_and_deletes_th
.thenReturn(acknowledgementSet1)
.thenReturn(acknowledgementSet2);

doNothing().when(acknowledgementSet1).addProgressCheck(any(Consumer.class), eq(CHECKPOINT_OWNERSHIP_INTERVAL));
doNothing().when(acknowledgementSet2).addProgressCheck(any(Consumer.class), eq(CHECKPOINT_OWNERSHIP_INTERVAL));

doNothing().when(s3ObjectDeleteWorker).deleteS3Object(any(DeleteObjectRequest.class));
doNothing().when(s3ObjectHandler).parseS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class), eq(sourceCoordinator), eq(partitionKey));

Expand Down Expand Up @@ -589,6 +592,7 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje

when(acknowledgementSetManager.create(any(Consumer.class), any(Duration.class)))
.thenReturn(acknowledgementSet1);
doNothing().when(acknowledgementSet1).addProgressCheck(any(Consumer.class), eq(CHECKPOINT_OWNERSHIP_INTERVAL));

doNothing().when(s3ObjectDeleteWorker).deleteS3Object(any(DeleteObjectRequest.class));
doNothing().when(s3ObjectHandler).parseS3Object(any(S3ObjectReference.class), any(AcknowledgementSet.class), eq(sourceCoordinator), eq(partitionKey));
Expand All @@ -601,6 +605,13 @@ void processing_with_folder_partition_processes_objects_in_folder_until_max_obje
final ArgumentCaptor<Consumer> consumerArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(acknowledgementSetManager, times(1)).create(consumerArgumentCaptor.capture(), any(Duration.class));

final ArgumentCaptor<Consumer> progressCheckArgumentCaptor = ArgumentCaptor.forClass(Consumer.class);
verify(acknowledgementSet1).addProgressCheck(progressCheckArgumentCaptor.capture(), eq(CHECKPOINT_OWNERSHIP_INTERVAL));

final Consumer<ProgressCheck> progressCheckConsumer = progressCheckArgumentCaptor.getValue();
progressCheckConsumer.accept(mock(ProgressCheck.class));
verify(sourceCoordinator).renewPartitionOwnership(partitionKey);


final InOrder inOrder = inOrder(sourceCoordinator, acknowledgementSet1, s3ObjectDeleteWorker);

Expand Down

0 comments on commit ce9cd64

Please sign in to comment.