From ed3f75ed3beedfb6d72af1f4b63ae0d085f72a0b Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Tue, 12 Mar 2024 10:48:38 -0500 Subject: [PATCH] =?UTF-8?q?Fix=20bug=20where=20process=20worker=20would=20?= =?UTF-8?q?shut=20down=20if=20a=20processor=20drops=20all=E2=80=A6=20(#426?= =?UTF-8?q?2)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Taylor Gray --- .../dataprepper/pipeline/ProcessWorker.java | 9 ++-- .../pipeline/ProcessWorkerTest.java | 43 +++++++++++++++++++ 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java index 7e27db0afd..141eee1829 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/pipeline/ProcessWorker.java @@ -9,12 +9,12 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.CheckpointState; import org.opensearch.dataprepper.model.buffer.Buffer; -import org.opensearch.dataprepper.model.processor.Processor; -import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; -import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.InternalEventHandle; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.pipeline.common.FutureHelper; import org.opensearch.dataprepper.pipeline.common.FutureHelperResult; import org.slf4j.Logger; @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.ArrayList; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; @@ -129,7 +128,7 @@ private void doRun() { List inputEvents = null; if (acknowledgementsEnabled) { - inputEvents = ((ArrayList>) records).stream().map(Record::getData).collect(Collectors.toList()); + inputEvents = ((List>) records).stream().map(Record::getData).collect(Collectors.toList()); } try { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java index 9b31b20691..3d13c0d49f 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/pipeline/ProcessWorkerTest.java @@ -208,4 +208,47 @@ void testProcessWorkerWithProcessorThrowingExceptionAndAcknowledgmentsEnabledIsH verify(skippedProcessor, never()).execute(any()); } + + @Test + void testProcessWorkerWithProcessorDroppingAllRecordsAndAcknowledgmentsEnabledIsHandledProperly() { + + when(source.areAcknowledgementsEnabled()).thenReturn(true); + + final List> records = new ArrayList<>(); + final Record mockRecord = mock(Record.class); + final Event mockEvent = mock(Event.class); + final EventHandle eventHandle = mock(DefaultEventHandle.class); + when(((DefaultEventHandle) eventHandle).getAcknowledgementSet()).thenReturn(mock(AcknowledgementSet.class)); + doNothing().when(eventHandle).release(true); + when(mockRecord.getData()).thenReturn(mockEvent); + when(mockEvent.getEventHandle()).thenReturn(eventHandle); + + records.add(mockRecord); + + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor = mock(Processor.class); + when(processor.execute(records)).thenReturn(Collections.emptyList()); + when(processor.isReadyForShutdown()).thenReturn(true); + + final Processor secondProcessor = mock(Processor.class); + when(secondProcessor.isReadyForShutdown()).thenReturn(true); + when(secondProcessor.execute(Collections.emptyList())).thenReturn(Collections.emptyList()); + processors = List.of(processor, secondProcessor); + + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + + + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + } + } }