Skip to content

Commit

Permalink
Fix bug where process worker would shut down if a processor drops all… (
Browse files Browse the repository at this point in the history
#4262)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Mar 12, 2024
1 parent aa40256 commit ed3f75e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.processor.Processor;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.pipeline.common.FutureHelperResult;
import org.slf4j.Logger;
Expand All @@ -23,7 +23,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -129,7 +128,7 @@ private void doRun() {

List<Event> inputEvents = null;
if (acknowledgementsEnabled) {
inputEvents = ((ArrayList<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList());
inputEvents = ((List<Record<Event>>) records).stream().map(Record::getData).collect(Collectors.toList());
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,47 @@ void testProcessWorkerWithProcessorThrowingExceptionAndAcknowledgmentsEnabledIsH

verify(skippedProcessor, never()).execute(any());
}

@Test
void testProcessWorkerWithProcessorDroppingAllRecordsAndAcknowledgmentsEnabledIsHandledProperly() {

when(source.areAcknowledgementsEnabled()).thenReturn(true);

final List<Record<Event>> records = new ArrayList<>();
final Record<Event> mockRecord = mock(Record.class);
final Event mockEvent = mock(Event.class);
final EventHandle eventHandle = mock(DefaultEventHandle.class);
when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class));
doNothing().when(eventHandle).release(true);
when(mockRecord.getData()).thenReturn(mockEvent);
when(mockEvent.getEventHandle()).thenReturn(eventHandle);

records.add(mockRecord);

final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor = mock(Processor.class);
when(processor.execute(records)).thenReturn(Collections.emptyList());
when(processor.isReadyForShutdown()).thenReturn(true);

final Processor secondProcessor = mock(Processor.class);
when(secondProcessor.isReadyForShutdown()).thenReturn(true);
when(secondProcessor.execute(Collections.emptyList())).thenReturn(Collections.emptyList());
processors = List.of(processor, secondProcessor);

final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());


try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
.thenReturn(futureHelperResult);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();
}
}
}

0 comments on commit ed3f75e

Please sign in to comment.