diff --git a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java index 828f70029..c8e15cb17 100644 --- a/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java +++ b/connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java @@ -11,6 +11,7 @@ import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable; import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext; import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta; +import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset; import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset; import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord; @@ -89,6 +90,18 @@ public RecordsIterable> apply(final RecordsIterable> flushed = node + .flush(newContextFor(FileObjectOffset::empty, fileObjectObject.metadata())); + results.addAll(flushed); + node = node.onSuccess; + } + } + return new RecordsIterable<>(results); } diff --git a/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java b/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java index 5061a3cb5..d05ffb606 100644 --- a/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java +++ b/connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.kafka.common.config.ConfigDef; import org.junit.Test; +import org.junit.jupiter.api.Assertions; public class DefaultRecordFilterPipelineTest { @@ -190,6 +191,101 @@ public void shouldNotFlushBufferedRecordsGivenNoAcceptFilterAndThereIsNoRemainin assertEquals(record2, records.collect().get(0)); } + @Test + public void shouldFlushBufferedRecordsGivenAcceptFilterEmptyRecordsIterableAndNoRemainingRecords() { + + final FileRecord record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1"); + final FileRecord record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2"); + + List> bufferedRecords = List.of(record1, record2); + TestFilter filter1 = new TestFilter() + .setBuffer(bufferedRecords); + + DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(Collections.singletonList(filter1)); + pipeline.init(context); + + RecordsIterable> records = pipeline.apply(new RecordsIterable<>(), false); + + assertNotNull(records); + List> filteredRecords = records.collect(); + Assertions.assertIterableEquals(bufferedRecords, filteredRecords); + } + + @Test + public void shouldFlushBufferedRecordsFromFirstFilterGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() { + + final FileRecord record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1"); + final FileRecord record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2"); + + List> bufferedRecords = List.of(record1, record2); + TestFilter filter1 = new TestFilter() + .setBuffer(bufferedRecords); + TestFilter filter2 = new TestFilter() + .setFunction(((context1, record, hasNext) -> RecordsIterable.of(record))); + + DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2)); + pipeline.init(context); + + RecordsIterable> records = pipeline.apply(new RecordsIterable<>(), false); + + assertNotNull(records); + List> filteredRecords = records.collect(); + Assertions.assertIterableEquals(bufferedRecords, filteredRecords); + } + + @Test + public void shouldFlushBufferedRecordsFromLastFilterGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() { + + final FileRecord record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1"); + final FileRecord record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2"); + + List> bufferedRecords = List.of(record1, record2); + TestFilter filter1 = new TestFilter(); + TestFilter filter2 = new TestFilter() + .setBuffer(bufferedRecords); + DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2)); + pipeline.init(context); + + RecordsIterable> records = pipeline.apply(new RecordsIterable<>(), false); + + assertNotNull(records); + List> filteredRecords = records.collect(); + Assertions.assertIterableEquals(bufferedRecords, filteredRecords); + } + + @Test + public void shouldFlushBufferedRecordsFromAllFiltersGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() { + + final FileRecord record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1"); + final FileRecord record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2"); + final FileRecord record3 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value3"); + final FileRecord record4 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value4"); + final FileRecord record5 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value5"); + + List> allBuffered = List.of(record1, record2, record3, record4, record5); + + List> bufferedRecords1 = List.of(record1); + List> bufferedRecords2 = List.of(record2, record3); + List> bufferedRecords3 = List.of(record4, record5); + TestFilter filter1 = new TestFilter() + .setFunction(((context1, record, hasNext) -> RecordsIterable.of(record))) + .setBuffer(bufferedRecords1); + TestFilter filter2 = new TestFilter() + .setFunction(((context1, record, hasNext) -> RecordsIterable.of(record))) + .setBuffer(bufferedRecords2); + TestFilter filter3 = new TestFilter() + .setFunction(((context1, record, hasNext) -> RecordsIterable.of(record))) + .setBuffer(bufferedRecords3); + DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2, filter3)); + pipeline.init(context); + + RecordsIterable> records = pipeline.apply(new RecordsIterable<>(), false); + + assertNotNull(records); + List> filteredRecords = records.collect(); + Assertions.assertIterableEquals(allBuffered, filteredRecords); + } + @Test public void shouldReturnRecordUnchangedGivenNoFilter() {