diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java index 9a93d7b88..f0d3bc604 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSource.java @@ -29,6 +29,7 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService; import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState; import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumStateSerializer; @@ -42,6 +43,8 @@ import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.core.io.SimpleVersionedSerializer; +import javax.annotation.Nullable; + import java.util.Properties; /** @@ -87,6 +90,7 @@ public class DynamicKafkaSource private final OffsetsInitializer stoppingOffsetsInitializer; private final Properties properties; private final Boundedness boundedness; + @Nullable private final RecordEvaluator eofRecordEvaluator; DynamicKafkaSource( KafkaStreamSubscriber kafkaStreamSubscriber, @@ -95,7 +99,8 @@ public class DynamicKafkaSource OffsetsInitializer startingOffsetsInitializer, OffsetsInitializer stoppingOffsetsInitializer, Properties properties, - Boundedness boundedness) { + Boundedness boundedness, + @Nullable RecordEvaluator eofRecordEvaluator) { this.kafkaStreamSubscriber = kafkaStreamSubscriber; this.deserializationSchema = deserializationSchema; this.properties = properties; @@ -103,6 +108,7 @@ public class DynamicKafkaSource this.startingOffsetsInitializer = startingOffsetsInitializer; this.stoppingOffsetsInitializer = stoppingOffsetsInitializer; this.boundedness = boundedness; + this.eofRecordEvaluator = eofRecordEvaluator; } /** @@ -134,7 +140,8 @@ public Boundedness getBoundedness() { @Override public SourceReader createReader( SourceReaderContext readerContext) { - return new DynamicKafkaSourceReader<>(readerContext, deserializationSchema, properties); + return new DynamicKafkaSourceReader<>( + readerContext, deserializationSchema, properties, eofRecordEvaluator); } /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java index eab37c4ee..6de4f9d49 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService; import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber; import 
org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber; @@ -52,6 +53,7 @@ public class DynamicKafkaSourceBuilder { private OffsetsInitializer stoppingOffsetsInitializer; private Boundedness boundedness; private final Properties props; + private RecordEvaluator eofRecordEvaluator; DynamicKafkaSourceBuilder() { this.kafkaStreamSubscriber = null; @@ -140,6 +142,18 @@ public DynamicKafkaSourceBuilder setDeserializer( return this; } + /** + * Set the {@link RecordEvaluator}. + * + * @param eofRecordEvaluator the {@link RecordEvaluator}. + * @return the builder. + */ + public DynamicKafkaSourceBuilder setEofRecordEvaluator( + RecordEvaluator eofRecordEvaluator) { + this.eofRecordEvaluator = eofRecordEvaluator; + return this; + } + /** * Set the starting offsets of the stream. This will be applied to all clusters. * @@ -217,7 +231,8 @@ public DynamicKafkaSource build() { startingOffsetsInitializer, stoppingOffsetsInitializer, props, - boundedness); + boundedness, + eofRecordEvaluator); } // Below are utility methods, code and structure are mostly copied over from KafkaSourceBuilder diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java index 8220ea14c..902bd39cf 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java @@ -26,6 +26,7 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata; @@ -54,6 +55,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -95,11 +98,13 @@ public class DynamicKafkaSourceReader implements SourceReader eofRecordEvaluator; public DynamicKafkaSourceReader( SourceReaderContext readerContext, KafkaRecordDeserializationSchema deserializationSchema, - Properties properties) { + Properties properties, + @Nullable RecordEvaluator eofRecordEvaluator) { this.readerContext = readerContext; this.clusterReaderMap = new TreeMap<>(); this.deserializationSchema = deserializationSchema; @@ -116,6 +121,7 @@ public DynamicKafkaSourceReader( this.isActivelyConsumingSplits = false; this.restartingReaders = new AtomicBoolean(); this.clustersProperties = new HashMap<>(); + this.eofRecordEvaluator = eofRecordEvaluator; } /** @@ -448,7 +454,8 @@ public UserCodeClassLoader getUserCodeClassLoader() { } }); - KafkaRecordEmitter recordEmitter = new KafkaRecordEmitter<>(deserializationSchema); + KafkaRecordEmitter recordEmitter = + new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator); return new KafkaSourceReader<>( elementsQueue, new KafkaSourceFetcherManager( @@ -463,7 +470,8 @@ public UserCodeClassLoader getUserCodeClassLoader() { recordEmitter, 
toConfiguration(readerSpecificProperties), readerContext, - kafkaSourceReaderMetrics); + kafkaSourceReaderMetrics, + eofRecordEvaluator); } /** diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java index 54f5f856c..a17b54425 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java @@ -31,6 +31,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState; @@ -102,11 +103,13 @@ public class KafkaSource private final Properties props; // Client rackId callback private final SerializableSupplier rackIdSupplier; + @Nullable private RecordEvaluator eofRecordEvaluator; KafkaSource( KafkaSubscriber subscriber, OffsetsInitializer startingOffsetsInitializer, @Nullable OffsetsInitializer stoppingOffsetsInitializer, + @Nullable RecordEvaluator eofRecordEvaluator, Boundedness boundedness, KafkaRecordDeserializationSchema deserializationSchema, Properties props, @@ -118,6 +121,7 @@ public class KafkaSource this.deserializationSchema = deserializationSchema; this.props = props; this.rackIdSupplier = rackIdSupplier; + this.eofRecordEvaluator = eofRecordEvaluator; } /** @@ -171,7 +175,8 @@ public UserCodeClassLoader getUserCodeClassLoader() { Optional.ofNullable(rackIdSupplier) .map(Supplier::get) .orElse(null)); - KafkaRecordEmitter recordEmitter = new KafkaRecordEmitter<>(deserializationSchema); + KafkaRecordEmitter recordEmitter = + new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator); return new KafkaSourceReader<>( elementsQueue, @@ -180,7 +185,8 @@ public UserCodeClassLoader getUserCodeClassLoader() { recordEmitter, toConfiguration(props), readerContext, - kafkaSourceReaderMetrics); + kafkaSourceReaderMetrics, + eofRecordEvaluator); } @Internal @@ -251,4 +257,10 @@ KafkaSubscriber getKafkaSubscriber() { OffsetsInitializer getStoppingOffsetsInitializer() { return stoppingOffsetsInitializer; } + + @VisibleForTesting + @Nullable + RecordEvaluator getEofRecordEvaluator() { + return eofRecordEvaluator; + } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java index 78a4b0b60..bf7c4beb7 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator; @@ -107,6 +108,7 @@ public class KafkaSourceBuilder { protected Properties props; // Client rackId supplier private SerializableSupplier rackIdSupplier; + private RecordEvaluator eofRecordEvaluator; KafkaSourceBuilder() { this.subscriber = null; @@ -353,6 +355,26 @@ public KafkaSourceBuilder setValueOnlyDeserializer( return this; } + /** + * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for KafkaSource. + * + *

<p>When the evaluator is specified, it is invoked for each deserialized record to determine + * whether the corresponding split has reached the end of the stream. If a record matches the + * evaluator, the source will not emit that record or any subsequent records from the same + * split. + * + *
<p>
Note that the evaluator works jointly with the stopping offsets specified by the {@link + * #setBounded(OffsetsInitializer)} or the {@link #setUnbounded(OffsetsInitializer)}. The source + * stops consuming from a split when any of these conditions is met. + * + * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator} + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder setEofRecordEvaluator(RecordEvaluator eofRecordEvaluator) { + this.eofRecordEvaluator = eofRecordEvaluator; + return this; + } + /** * Sets the client id prefix of this KafkaSource. * @@ -435,6 +457,7 @@ public KafkaSource build() { subscriber, startingOffsetsInitializer, stoppingOffsetsInitializer, + eofRecordEvaluator, boundedness, deserializationSchema, props, diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java index 23956f5d5..677198f16 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java @@ -25,6 +25,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsRemoval; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; @@ -76,6 +77,9 @@ public class KafkaPartitionSplitReader // Tracking empty splits that has not been added to finished splits in fetch() private final Set emptySplits = new HashSet<>(); + // Tracking removed splits that has not been added to finished splits in fetch() + private final Set removedSplits = new HashSet<>(); + public KafkaPartitionSplitReader( Properties props, SourceReaderContext context, @@ -116,7 +120,7 @@ public RecordsWithSplitIds> fetch() throws IOExce KafkaPartitionSplitRecords recordsBySplits = new KafkaPartitionSplitRecords( ConsumerRecords.empty(), kafkaSourceReaderMetrics); - markEmptySplitsAsFinished(recordsBySplits); + markSplitsAsFinished(recordsBySplits); return recordsBySplits; } KafkaPartitionSplitRecords recordsBySplits = @@ -148,7 +152,7 @@ public RecordsWithSplitIds> fetch() throws IOExce kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp); }); - markEmptySplitsAsFinished(recordsBySplits); + markSplitsAsFinished(recordsBySplits); // Unassign the partitions that has finished. if (!finishedPartitions.isEmpty()) { @@ -162,25 +166,55 @@ public RecordsWithSplitIds> fetch() throws IOExce return recordsBySplits; } - private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) { + private void markSplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) { + // Some splits are discovered as empty when handling split additions. These splits should be + // added to finished splits to clean up states in split fetcher and source reader. + markSplitsAsFinished(emptySplits, recordsBySplits); + + // Some splits are removed when handling split changes. 
These splits should be + // added to finished splits to clean up states in split fetcher and source reader. + markSplitsAsFinished(removedSplits, recordsBySplits); + } + + private void markSplitsAsFinished( + Set splits, KafkaPartitionSplitRecords recordsBySplits) { // Some splits are discovered as empty when handling split additions. These splits should be // added to finished splits to clean up states in split fetcher and source reader. - if (!emptySplits.isEmpty()) { - recordsBySplits.finishedSplits.addAll(emptySplits); - emptySplits.clear(); + if (!splits.isEmpty()) { + recordsBySplits.finishedSplits.addAll(splits); + splits.clear(); } } @Override public void handleSplitsChanges(SplitsChange splitsChange) { - // Get all the partition assignments and stopping offsets. - if (!(splitsChange instanceof SplitsAddition)) { + if (splitsChange instanceof SplitsAddition) { + // Get all the partition assignments and stopping offsets. + handleSplitsAddition(splitsChange); + } else if (splitsChange instanceof SplitsRemoval) { + handleSplitsRemoval(splitsChange); + } else { throw new UnsupportedOperationException( String.format( "The SplitChange type of %s is not supported.", splitsChange.getClass())); } + } + + private void handleSplitsRemoval(SplitsChange splitsRemoval) { + removedSplits.addAll( + splitsRemoval.splits().stream() + .map(KafkaPartitionSplit::splitId) + .collect(Collectors.toSet())); + List finishedPartitions = + splitsRemoval.splits().stream() + .map(KafkaPartitionSplit::getTopicPartition) + .collect(Collectors.toList()); + finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric); + unassignPartitions(finishedPartitions); + } + private void handleSplitsAddition(SplitsChange splitsAddition) { // Assignment. List newPartitionAssignments = new ArrayList<>(); // Starting offsets. @@ -192,7 +226,7 @@ public void handleSplitsChanges(SplitsChange splitsChange) Set partitionsStoppingAtCommitted = new HashSet<>(); // Parse the starting and stopping offsets. - splitsChange + splitsAddition .splits() .forEach( s -> { @@ -223,7 +257,7 @@ public void handleSplitsChanges(SplitsChange splitsChange) // After acquiring the starting and stopping offsets, remove the empty splits if necessary. 
removeEmptySplits(); - maybeLogSplitChangesHandlingResult(splitsChange); + maybeLogSplitChangesHandlingResult(splitsAddition); } @Override diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java index 3a1320a72..939c8e6bc 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java @@ -21,13 +21,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.ConsumerRecord; +import javax.annotation.Nullable; + import java.io.IOException; +import java.util.HashSet; +import java.util.Set; /** The {@link RecordEmitter} implementation for {@link KafkaSourceReader}. */ @Internal @@ -35,10 +40,17 @@ public class KafkaRecordEmitter implements RecordEmitter, T, KafkaPartitionSplitState> { private final KafkaRecordDeserializationSchema deserializationSchema; - private final SourceOutputWrapper sourceOutputWrapper = new SourceOutputWrapper<>(); + private final SourceOutputWrapper sourceOutputWrapper; + @Nullable private final RecordEvaluator eofRecordEvaluator; + private final Set finishedSplits; - public KafkaRecordEmitter(KafkaRecordDeserializationSchema deserializationSchema) { + public KafkaRecordEmitter( + KafkaRecordDeserializationSchema deserializationSchema, + @Nullable RecordEvaluator eofRecordEvaluator) { this.deserializationSchema = deserializationSchema; + this.sourceOutputWrapper = new SourceOutputWrapper<>(eofRecordEvaluator); + this.eofRecordEvaluator = eofRecordEvaluator; + this.finishedSplits = new HashSet<>(); } @Override @@ -51,20 +63,35 @@ public void emitRecord( sourceOutputWrapper.setSourceOutput(output); sourceOutputWrapper.setTimestamp(consumerRecord.timestamp()); deserializationSchema.deserialize(consumerRecord, sourceOutputWrapper); - splitState.setCurrentOffset(consumerRecord.offset() + 1); + + if (sourceOutputWrapper.isEofRecord()) { + finishedSplits.add(splitState.splitId()); + } + if (eofRecordEvaluator == null || !finishedSplits.contains(splitState.splitId())) { + splitState.setCurrentOffset(consumerRecord.offset() + 1); + } } catch (Exception e) { throw new IOException("Failed to deserialize consumer record due to", e); } } private static class SourceOutputWrapper implements Collector { + @Nullable private final RecordEvaluator eofRecordEvaluator; private SourceOutput sourceOutput; private long timestamp; + private boolean isEofRecord = false; + + public SourceOutputWrapper(@Nullable RecordEvaluator eofRecordEvaluator) { + this.eofRecordEvaluator = eofRecordEvaluator; + } @Override public void collect(T record) { sourceOutput.collect(record, timestamp); + if (eofRecordEvaluator != null) { + isEofRecord = eofRecordEvaluator.isEndOfStream(record); + } } @Override @@ -77,5 +104,10 @@ private void setSourceOutput(SourceOutput sourceOutput) { private void setTimestamp(long timestamp) { 
this.timestamp = timestamp; } + + /** Whether the previous sent record is an eof record. */ + public boolean isEofRecord() { + return isEofRecord; + } } } diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java index cb85fc89d..1d0fc7466 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; @@ -38,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -68,8 +71,15 @@ public KafkaSourceReader( recordEmitter, Configuration config, SourceReaderContext context, - KafkaSourceReaderMetrics kafkaSourceReaderMetrics) { - super(elementsQueue, kafkaSourceFetcherManager, recordEmitter, config, context); + KafkaSourceReaderMetrics kafkaSourceReaderMetrics, + @Nullable RecordEvaluator recordEvaluator) { + super( + elementsQueue, + kafkaSourceFetcherManager, + recordEmitter, + recordEvaluator, + config, + context); this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); this.offsetsOfFinishedSplits = new ConcurrentHashMap<>(); this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics; diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java index 11d3c659f..e72998233 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java @@ -194,6 +194,12 @@ public class KafkaConnectorOptions { + "The value 0 disables the partition discovery." 
+ "The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka."); + public static final ConfigOption SCAN_RECORD_EVALUATOR_CLASS = + ConfigOptions.key("scan.record.evaluator.class") + .stringType() + .noDefaultValue() + .withDescription("Record evaluator to decide the end of the Kafka stream."); + // -------------------------------------------------------------------------------------------- // Sink specific options // -------------------------------------------------------------------------------------------- diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java index c963da762..d789c347c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java @@ -19,10 +19,12 @@ package org.apache.flink.streaming.connectors.kafka.table; import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; @@ -171,6 +173,8 @@ public class KafkaDynamicSource protected final String tableIdentifier; + @Nullable protected final RecordEvaluator recordEvaluator; + public KafkaDynamicSource( DataType physicalDataType, @Nullable DecodingFormat> keyDecodingFormat, @@ -188,7 +192,8 @@ public KafkaDynamicSource( Map specificBoundedOffsets, long boundedTimestampMillis, boolean upsertMode, - String tableIdentifier) { + String tableIdentifier, + @Nullable RecordEvaluator recordEvaluator) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -228,6 +233,7 @@ public KafkaDynamicSource( this.boundedTimestampMillis = boundedTimestampMillis; this.upsertMode = upsertMode; this.tableIdentifier = tableIdentifier; + this.recordEvaluator = recordEvaluator; } @Override @@ -344,7 +350,8 @@ public DynamicTableSource copy() { specificBoundedOffsets, boundedTimestampMillis, upsertMode, - tableIdentifier); + tableIdentifier, + recordEvaluator); copy.producedDataType = producedDataType; copy.metadataKeys = metadataKeys; copy.watermarkStrategy = watermarkStrategy; @@ -486,6 +493,10 @@ protected KafkaSource createKafkaSource( break; } + if (recordEvaluator != null) { + kafkaSourceBuilder.setEofRecordEvaluator(recordEvaluator); + } + kafkaSourceBuilder .setProperties(properties) .setDeserializer(KafkaRecordDeserializationSchema.of(kafkaDeserializer)); @@ -568,6 +579,12 @@ private KafkaDeserializationSchema createKafkaDeserializationSchema( return format.createRuntimeDecoder(context, physicalFormatDataType); } + @VisibleForTesting + @Nullable + public RecordEvaluator getRecordEvaluator() { + return recordEvaluator; + } + // -------------------------------------------------------------------------------------------- // Metadata handling // 
-------------------------------------------------------------------------------------------- diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java index 8124691a5..d7aeddd89 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.sink.KafkaPartitioner; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; import org.apache.flink.streaming.connectors.kafka.config.BoundedMode; @@ -74,6 +75,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS; +import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_RECORD_EVALUATOR_CLASS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_MODE; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_TIMESTAMP_MILLIS; @@ -144,6 +146,7 @@ public Set> optionalOptions() { options.add(SCAN_STARTUP_SPECIFIC_OFFSETS); options.add(SCAN_TOPIC_PARTITION_DISCOVERY); options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); + options.add(SCAN_RECORD_EVALUATOR_CLASS); options.add(SINK_PARTITIONER); options.add(SINK_PARALLELISM); options.add(DELIVERY_GUARANTEE); @@ -166,6 +169,7 @@ public Set> forwardOptions() { SCAN_STARTUP_SPECIFIC_OFFSETS, SCAN_TOPIC_PARTITION_DISCOVERY, SCAN_STARTUP_TIMESTAMP_MILLIS, + SCAN_RECORD_EVALUATOR_CLASS, SINK_PARTITIONER, SINK_PARALLELISM, TRANSACTIONAL_ID_PREFIX) @@ -215,6 +219,15 @@ public DynamicTableSource createDynamicTableSource(Context context) { final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null); + final RecordEvaluator recordEvaluator; + try { + recordEvaluator = + loadRecordEvaluator( + tableOptions.getOptional(SCAN_RECORD_EVALUATOR_CLASS).orElse(null)); + } catch (Exception e) { + throw new IllegalArgumentException("Fail to load the RecordEvaluator class.", e); + } + return createKafkaTableSource( physicalDataType, keyDecodingFormat.orElse(null), @@ -231,7 +244,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.boundedMode, boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, - context.getObjectIdentifier().asSummaryString()); + context.getObjectIdentifier().asSummaryString(), + recordEvaluator); } @Override @@ -378,6 +392,16 @@ private static DeliveryGuarantee validateDeprecatedSemantic(ReadableConfig table return tableOptions.get(DELIVERY_GUARANTEE); } + private RecordEvaluator loadRecordEvaluator(String recordEvaluatorClassName) + throws Exception { + 
if (recordEvaluatorClassName == null) { + return null; + } + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + Class recordEvaluatorClass = classLoader.loadClass(recordEvaluatorClassName); + return (RecordEvaluator) recordEvaluatorClass.newInstance(); + } + // -------------------------------------------------------------------------------------------- protected KafkaDynamicSource createKafkaTableSource( @@ -396,7 +420,8 @@ protected KafkaDynamicSource createKafkaTableSource( BoundedMode boundedMode, Map specificEndOffsets, long endTimestampMillis, - String tableIdentifier) { + String tableIdentifier, + @Nullable RecordEvaluator recordEvaluator) { return new KafkaDynamicSource( physicalDataType, keyDecodingFormat, @@ -414,7 +439,8 @@ protected KafkaDynamicSource createKafkaTableSource( specificEndOffsets, endTimestampMillis, false, - tableIdentifier); + tableIdentifier, + recordEvaluator); } protected KafkaDynamicSink createKafkaTableSink( diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java index 78debc175..2d83d4442 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java @@ -167,7 +167,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { boundedOptions.specificOffsets, boundedOptions.boundedTimestampMillis, true, - context.getObjectIdentifier().asSummaryString()); + context.getObjectIdentifier().asSummaryString(), + null); } @Override diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java index 5094e0151..08da327f3 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReaderTest.java @@ -271,7 +271,8 @@ private DynamicKafkaSourceReader createReaderWithoutStart( return new DynamicKafkaSourceReader<>( context, KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class), - properties); + properties, + null); } private SourceReader startReader( diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java index 2829f01e0..93fec9cd0 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilderTest.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.kafka.source; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber; import 
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; @@ -217,6 +218,16 @@ public void testSettingInvalidCustomDeserializers( .hasMessageContaining(expectedError); } + @Test + public void testSettingRecordEvaluator() { + assertThat(getBasicBuilder().build().getEofRecordEvaluator()).isNull(); + + RecordEvaluator recordEvaluator = s -> s.contains("test"); + final KafkaSource kafkaSource = + getBasicBuilder().setEofRecordEvaluator(recordEvaluator).build(); + assertThat(kafkaSource.getEofRecordEvaluator()).isEqualTo(recordEvaluator); + } + private KafkaSourceBuilder getBasicBuilder() { return new KafkaSourceBuilder() .setBootstrapServers("testServer") diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java index 6c0bd7e50..ec62d1f46 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java @@ -27,6 +27,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.connector.kafka.testutils.DockerImageVersions; @@ -80,6 +81,7 @@ import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.PARTITION; import static org.apache.flink.connector.kafka.testutils.KafkaSourceExternalContext.SplitMappingMode.TOPIC; +import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION; import static org.apache.flink.streaming.connectors.kafka.KafkaTestBase.kafkaServer; import static org.assertj.core.api.Assertions.assertThat; @@ -168,6 +170,45 @@ public void testBasicRead(boolean enableObjectReuse) throws Exception { executeAndVerify(env, stream); } + @Test + public void testEndWithRecordEvaluator() throws Throwable { + KafkaSource source = + KafkaSource.builder() + .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) + .setGroupId("testEndWithRecordEvaluator") + .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + .setDeserializer(new TestingKafkaRecordDeserializationSchema(false)) + .setStartingOffsets(OffsetsInitializer.earliest()) + .setBounded(new NoStoppingOffsetsInitializer()) + .setEofRecordEvaluator( + pav -> { + String tp = pav.tp; + int expectedValue = + Integer.parseInt(tp.substring(tp.lastIndexOf('-'))); + return pav.value != expectedValue; + }) + .build(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + DataStream stream = + env.fromSource( + source, WatermarkStrategy.noWatermarks(), "testEndWithRecordEvaluator"); + + Map> resultPerPartition = + executeAndGetResultPerPartition(env, stream); + resultPerPartition.forEach( + (tp, values) -> { + int expectedValue = Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1)); + assertThat(values.size()).isEqualTo(1); + assertThat((int) values.get(0)) + .as( + String.format( + "The value for partition %s should be only %d", + tp, 
expectedValue)) + .isEqualTo(expectedValue); + }); + } + @Test public void testValueOnlyDeserializer() throws Exception { KafkaSource source = @@ -202,9 +243,7 @@ public void testValueOnlyDeserializer() throws Exception { for (int partition = 0; partition < KafkaSourceTestEnv.NUM_PARTITIONS; partition++) { - for (int value = partition; - value < KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION; - value++) { + for (int value = partition; value < NUM_RECORDS_PER_PARTITION; value++) { expectedSum += value; } } @@ -516,6 +555,30 @@ public void processElement(StreamRecord element) { private void executeAndVerify( StreamExecutionEnvironment env, DataStream stream) throws Exception { + Map> resultPerPartition = + executeAndGetResultPerPartition(env, stream); + + // Expected elements from partition P should be an integer sequence from P to + // NUM_RECORDS_PER_PARTITION. + resultPerPartition.forEach( + (tp, values) -> { + int firstExpectedValue = + Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1)); + assertThat(values.size()) + .isEqualTo(NUM_RECORDS_PER_PARTITION - firstExpectedValue); + for (int i = 0; i < values.size(); i++) { + assertThat((int) values.get(i)) + .as( + String.format( + "The %d-th value for partition %s should be %d", + i, tp, firstExpectedValue + i)) + .isEqualTo(firstExpectedValue + i); + } + }); + } + + private Map> executeAndGetResultPerPartition( + StreamExecutionEnvironment env, DataStream stream) throws Exception { stream.addSink( new RichSinkFunction() { @Override @@ -536,22 +599,7 @@ public void invoke(PartitionAndValue value, Context context) { resultPerPartition .computeIfAbsent(partitionAndValue.tp, ignored -> new ArrayList<>()) .add(partitionAndValue.value)); - - // Expected elements from partition P should be an integer sequence from P to - // NUM_RECORDS_PER_PARTITION. 
- resultPerPartition.forEach( - (tp, values) -> { - int firstExpectedValue = - Integer.parseInt(tp.substring(tp.lastIndexOf('-') + 1)); - for (int i = 0; i < values.size(); i++) { - assertThat((int) values.get(i)) - .as( - String.format( - "The %d-th value for partition %s should be %d", - i, tp, firstExpectedValue + i)) - .isEqualTo(firstExpectedValue + i); - } - }); + return resultPerPartition; } private static class OnEventWatermarkGenerator diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java index b592a6917..97eb16989 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReaderTest.java @@ -22,6 +22,7 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange; +import org.apache.flink.connector.base.source.reader.splitreader.SplitsRemoval; import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics; import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit; import org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv; @@ -100,12 +101,23 @@ public static void tearDown() throws Exception { } @Test - public void testHandleSplitChangesAndFetch() throws Exception { + public void testHandleSplitsAdditionAndFetch() throws Exception { KafkaPartitionSplitReader reader = createReader(); assignSplitsAndFetchUntilFinish(reader, 0); assignSplitsAndFetchUntilFinish(reader, 1); } + @Test + public void testHandleSplitsRemovalAndFetch() throws Exception { + KafkaPartitionSplitReader reader = createReader(); + + Map splits = assignAndRemoveSplits(reader, 0); + assignSplitsAndFetchUntilFinish(reader, splits); + + splits = assignAndRemoveSplits(reader, 1); + assignSplitsAndFetchUntilFinish(reader, splits); + } + @Test public void testWakeUp() throws Exception { KafkaPartitionSplitReader reader = createReader(); @@ -358,7 +370,12 @@ private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, i throws IOException { Map splits = assignSplits(reader, splitsByOwners.get(readerId)); + assignSplitsAndFetchUntilFinish(reader, splits); + } + private void assignSplitsAndFetchUntilFinish( + KafkaPartitionSplitReader reader, Map splits) + throws IOException { Map numConsumedRecords = new HashMap<>(); Set finishedSplits = new HashSet<>(); while (finishedSplits.size() < splits.size()) { @@ -407,6 +424,19 @@ private void assignSplitsAndFetchUntilFinish(KafkaPartitionSplitReader reader, i }); } + private Map assignAndRemoveSplits( + KafkaPartitionSplitReader reader, int readerId) { + Map splits = + assignSplits(reader, splitsByOwners.get(readerId)); + Map originalSplits = splitsByOwners.get(readerId); + for (String key : originalSplits.keySet()) { + removeSplits(reader, Collections.singletonList(originalSplits.get(key))); + splits.remove(key); + break; + } + return splits; + } + // ------------------ private KafkaPartitionSplitReader createReader() { @@ -446,6 +476,11 @@ private Map assignSplits( return splits; } + private void removeSplits(KafkaPartitionSplitReader reader, List splits) { + SplitsChange 
splitsChange = new SplitsRemoval<>(splits); + reader.handleSplitsChanges(splitsChange); + } + private boolean verifyConsumed( final KafkaPartitionSplit split, final long expectedStartingOffset, diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java index 5ad87ffc2..5cd477fca 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; @@ -56,6 +57,8 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import javax.annotation.Nullable; + import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -70,6 +73,8 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITS_SUCCEEDED_METRIC_COUNTER; import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.COMMITTED_OFFSET_METRIC_GAUGE; @@ -80,6 +85,9 @@ import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.PARTITION_GROUP; import static org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.TOPIC_GROUP; import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_PARTITIONS; +import static org.apache.flink.connector.kafka.testutils.KafkaSourceTestEnv.NUM_RECORDS_PER_PARTITION; +import static org.apache.flink.core.io.InputStatus.END_OF_INPUT; +import static org.apache.flink.core.io.InputStatus.NOTHING_AVAILABLE; import static org.apache.flink.core.testutils.CommonTestUtils.waitUtil; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.never; @@ -144,7 +152,7 @@ void testCommitOffsetsWithoutAliveFetchers() throws Exception { InputStatus status; do { status = reader.pollNext(output); - } while (status != InputStatus.NOTHING_AVAILABLE); + } while (status != NOTHING_AVAILABLE); pollUntil( reader, output, @@ -296,6 +304,7 @@ void testDisableOffsetCommit() throws Exception { new TestingReaderContext(), (ignore) -> {}, properties, + null, null)) { reader.addSplits( getSplits(numSplits, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)); @@ -517,7 +526,8 @@ public void testThatReaderDoesNotCallRackIdSupplierOnInit() throws Exception { new TestingReaderContext(), (ignore) -> {}, new Properties(), - rackIdSupplier)) { + rackIdSupplier, + null)) { // Do nothing here } @@ -536,7 +546,8 @@ public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Excep new TestingReaderContext(), (ignore) -> {}, new Properties(), - rackIdSupplier)) { + rackIdSupplier, + null)) { reader.addSplits( Collections.singletonList( new KafkaPartitionSplit(new TopicPartition(TOPIC, 1), 
1L))); @@ -545,6 +556,111 @@ public void testThatReaderDoesCallRackIdSupplierOnSplitAssignment() throws Excep verify(rackIdSupplier).get(); } + @Test + public void testReadingWithRecordEvaluatorAndAllSplitsFinished() throws Exception { + final int readRecordNumPerSplit = 9; + final int readSplits = 2; + final Set finishedSplits = new HashSet<>(); + try (final KafkaSourceReader reader = + (KafkaSourceReader) + createReader( + Boundedness.BOUNDED, + "groupId", + new TestingReaderContext(), + finishedSplits::addAll, + r -> (r % NUM_RECORDS_PER_SPLIT) == readRecordNumPerSplit)) { + List splits = new ArrayList<>(); + List excepted = new ArrayList<>(); + for (int i = 0; i < readSplits; i++) { + splits.add( + new KafkaPartitionSplit( + new TopicPartition(TOPIC, i), 0, Integer.MAX_VALUE)); + excepted.addAll( + IntStream.range( + i * NUM_RECORDS_PER_SPLIT, + (i + 1) * NUM_RECORDS_PER_SPLIT - 1) + .boxed() + .collect(Collectors.toList())); + } + + reader.addSplits(splits); + reader.notifyNoMoreSplits(); + + TestingReaderOutput output = new TestingReaderOutput<>(); + pollUntil( + reader, + output, + () -> finishedSplits.size() == splits.size(), + "The reader cannot get the excepted result before timeout."); + InputStatus status = reader.pollNext(output); + assertThat(output.getEmittedRecords().size()) + .isEqualTo(readRecordNumPerSplit * readSplits); + assertThat(finishedSplits) + .containsExactly( + splits.stream() + .map(s -> s.getTopicPartition().toString()) + .toArray(String[]::new)); + assertThat(output.getEmittedRecords()) + .containsExactlyInAnyOrder(excepted.toArray(new Integer[0])); + assertThat(status).isEqualTo(END_OF_INPUT); + } + } + + @Test + public void testReadingWithRecordEvaluatorAndSomeSplitsFinished() throws Exception { + final int finishPartitionIndex = 1; + final int readRecordNumInFinishedSplit = 7; + final int readSplits = 2; + final Set finishedSplits = new HashSet<>(); + + try (final KafkaSourceReader reader = + (KafkaSourceReader) + createReader( + Boundedness.BOUNDED, + "groupId", + new TestingReaderContext(), + finishedSplits::addAll, + r -> + r + == (finishPartitionIndex * NUM_RECORDS_PER_PARTITION + + readRecordNumInFinishedSplit))) { + List splits = new ArrayList<>(); + List excepted = new ArrayList<>(); + for (int i = 0; i < readSplits; i++) { + splits.add( + new KafkaPartitionSplit( + new TopicPartition(TOPIC, i), 0, Integer.MAX_VALUE)); + excepted.addAll( + IntStream.range( + i * NUM_RECORDS_PER_SPLIT, + i * NUM_RECORDS_PER_SPLIT + + (i == finishPartitionIndex + ? 
readRecordNumInFinishedSplit + : NUM_RECORDS_PER_SPLIT)) + .boxed() + .collect(Collectors.toList())); + } + + reader.addSplits(splits); + reader.notifyNoMoreSplits(); + + TestingReaderOutput output = new TestingReaderOutput<>(); + pollUntil( + reader, + output, + () -> output.getEmittedRecords().size() == excepted.size(), + "The reader cannot get the excepted result before timeout."); + assertThat(finishedSplits) + .containsExactly(new TopicPartition(TOPIC, finishPartitionIndex).toString()); + assertThat(output.getEmittedRecords()) + .containsExactlyInAnyOrder(excepted.toArray(new Integer[0])); + + InputStatus status = reader.pollNext(output); + assertThat(output.getEmittedRecords().size()).isEqualTo(excepted.size()); + assertThat(status).isEqualTo(NOTHING_AVAILABLE); + } + } + // ------------------------------------------ @Override @@ -599,9 +715,20 @@ private SourceReader createReader( SourceReaderContext context, Consumer> splitFinishedHook) throws Exception { + return createReader(boundedness, groupId, context, splitFinishedHook, null); + } + + private SourceReader createReader( + Boundedness boundedness, + String groupId, + SourceReaderContext context, + Consumer> splitFinishedHook, + @Nullable RecordEvaluator recordEvaluator) + throws Exception { Properties properties = new Properties(); properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); - return createReader(boundedness, context, splitFinishedHook, properties, null); + return createReader( + boundedness, context, splitFinishedHook, properties, null, recordEvaluator); } private SourceReader createReader( @@ -609,7 +736,8 @@ private SourceReader createReader( SourceReaderContext context, Consumer> splitFinishedHook, Properties props, - SerializableSupplier rackIdSupplier) + SerializableSupplier rackIdSupplier, + @Nullable RecordEvaluator recordEvaluator) throws Exception { KafkaSourceBuilder builder = KafkaSource.builder() @@ -622,7 +750,8 @@ private SourceReader createReader( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaSourceTestEnv.brokerConnectionStrings) .setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") - .setProperties(props); + .setProperties(props) + .setEofRecordEvaluator(recordEvaluator); if (boundedness == Boundedness.BOUNDED) { builder.setBounded(OffsetsInitializer.latest()); } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java index c1d796d08..557c1a577 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.DeliveryGuarantee; +import org.apache.flink.connector.base.source.reader.RecordEvaluator; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceOptions; @@ -47,6 +48,7 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; import 
org.apache.flink.streaming.connectors.kafka.testutils.MockPartitionOffsetsRetriever;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRecordEvaluator;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
@@ -91,6 +93,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -212,7 +215,8 @@ public void testTableSource() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
         ScanTableSource.ScanRuntimeProvider provider =
@@ -254,7 +258,8 @@ public void testTableSourceWithPattern() {
                         KAFKA_SOURCE_PROPERTIES,
                         StartupMode.EARLIEST,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
         assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
 
@@ -295,7 +300,8 @@ public void testTableSourceWithKeyValue() {
                         KAFKA_FINAL_SOURCE_PROPERTIES,
                         StartupMode.GROUP_OFFSETS,
                         Collections.emptyMap(),
-                        0);
+                        0,
+                        null);
 
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
     }
@@ -346,7 +352,8 @@ public void testTableSourceWithKeyValueAndMetadata() {
                         KAFKA_FINAL_SOURCE_PROPERTIES,
                         StartupMode.GROUP_OFFSETS,
                         Collections.emptyMap(),
-                        0);
+                        0,
+                        null);
         expectedKafkaSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
         expectedKafkaSource.metadataKeys = Collections.singletonList("timestamp");
 
@@ -397,6 +404,47 @@ public void testTableSourceSetOffsetResetWithException() {
                                 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, errorStrategy));
     }
 
+    @Test
+    public void testTableSourceSetRecordEvaluator() {
+        Map<String, String> tableOptions = getBasicSourceOptions();
+        tableOptions.put("scan.record.evaluator.class", MockRecordEvaluator.class.getName());
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, tableOptions);
+        final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
+
+        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+                new DecodingFormatMock(",", true);
+
+        // Test scan source equals
+        final KafkaDynamicSource expectedKafkaSource =
+                createExpectedScanSource(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueDecodingFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Collections.singletonList(TOPIC),
+                        null,
+                        KAFKA_SOURCE_PROPERTIES,
+                        StartupMode.SPECIFIC_OFFSETS,
+                        specificOffsets,
+                        0,
+                        new MockRecordEvaluator());
+        assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+        assertThat(
+                        Objects.requireNonNull(actualKafkaSource.getRecordEvaluator())
+                                .isEndOfStream(null))
+                .isTrue();
+
+        ScanTableSource.ScanRuntimeProvider provider =
+                actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
+        assertKafkaSource(provider);
+    }
+
     private void testSetOffsetResetForStartFromGroupOffsets(String value) {
         final Map<String, String> modifiedOptions =
                 getModifiedOptions(
@@ -1188,7 +1236,8 @@ public void testDiscoverPartitionByDefault() {
                         props,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
         ScanTableSource.ScanRuntimeProvider provider =
                 actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1226,7 +1275,8 @@ public void testDisableDiscoverPartition() {
                         props,
                         StartupMode.SPECIFIC_OFFSETS,
                         specificOffsets,
-                        0);
+                        0,
+                        null);
         assertThat(actualSource).isEqualTo(expectedKafkaSource);
         ScanTableSource.ScanRuntimeProvider provider =
                 actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
@@ -1249,7 +1299,8 @@ private static KafkaDynamicSource createExpectedScanSource(
             Properties properties,
             StartupMode startupMode,
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
-            long startupTimestampMillis) {
+            long startupTimestampMillis,
+            @Nullable RecordEvaluator<RowData> recordEvaluator) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -1267,7 +1318,8 @@ private static KafkaDynamicSource createExpectedScanSource(
                 Collections.emptyMap(),
                 0,
                 false,
-                FactoryMocks.IDENTIFIER.asSummaryString());
+                FactoryMocks.IDENTIFIER.asSummaryString(),
+                recordEvaluator);
     }
 
     private static KafkaDynamicSink createExpectedSink(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index acd0550e4..5f72c9bcf 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -29,6 +29,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRecordEvaluator;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
@@ -48,6 +49,8 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -83,6 +86,7 @@
 /** Basic IT cases for the Kafka table source and sink. */
 @RunWith(Parameterized.class)
 public class KafkaTableITCase extends KafkaTableTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTableITCase.class);
 
     private static final String JSON_FORMAT = "json";
     private static final String AVRO_FORMAT = "avro";
@@ -1324,6 +1328,93 @@ public void testStartFromGroupOffsetsNone() {
                 .satisfies(anyCauseMatches(NoOffsetForPartitionException.class));
     }
 
+    @Test
+    public void testKafkaSourceWithRecordEvaluator() throws Throwable {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "recordEvaluator_" + format + "_" + UUID.randomUUID();
+        TableResult tableResult = null;
+        try {
+            createTestTopic(topic, 3, 1);
+
+            // ---------- Produce an event time stream into Kafka -------------------
+            String groupId = getStandardProps().getProperty("group.id");
+            String bootstraps = getBootstrapServers();
+            tEnv.getConfig().set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));
+
+            final String createTable =
+                    String.format(
+                            "CREATE TABLE kafka (\n"
+                                    + "  `partition_id` INT,\n"
+                                    + "  `value` STRING\n"
+                                    + ") WITH (\n"
+                                    + "  'connector' = 'kafka',\n"
+                                    + "  'topic' = '%s',\n"
+                                    + "  'properties.bootstrap.servers' = '%s',\n"
+                                    + "  'properties.group.id' = '%s',\n"
+                                    + "  'scan.startup.mode' = 'earliest-offset',\n"
+                                    + "  'sink.partitioner' = '%s',\n"
+                                    + "  'format' = '%s',\n"
+                                    + "  'scan.record.evaluator.class' = '%s'\n"
+                                    + ")",
+                            topic,
+                            bootstraps,
+                            groupId,
+                            TestPartitioner.class.getName(),
+                            format,
+                            MockRecordEvaluator.class.getName());
+            tEnv.executeSql(createTable);
+
+            env.setParallelism(1);
+            String initialValues =
+                    "INSERT INTO kafka\n"
+                            + "VALUES\n"
+                            + " (0, 'test0'),\n"
+                            + " (1, 'test1'),\n"
+                            + " (2, 'test2'),\n"
+                            + " (3, 'End'),\n"
+                            + " (4, 'End'),\n"
+                            + " (5, 'End'),\n"
+                            + " (6, 'should not send'),\n"
+                            + " (7, 'should not send'),\n"
+                            + " (8, 'should not send')\n";
+            tEnv.executeSql(initialValues).await();
+
+            // ---------- Consume stream from Kafka -------------------
+            String sinkName = "MySink";
+            String createSink =
+                    "CREATE TABLE "
+                            + sinkName
+                            + "(\n"
+                            + "  `partition` INT,\n"
+                            + "  `value` STRING\n"
+                            + ") WITH (\n"
+                            + "  'connector' = 'values'\n"
+                            + ")";
+            tEnv.executeSql(createSink);
+
+            tableResult = tEnv.executeSql("INSERT INTO " + sinkName + " SELECT * FROM kafka");
+            List<String> expected = Arrays.asList("+I[0, test0]", "+I[1, test1]", "+I[2, test2]");
+            KafkaTableTestUtils.waitingExpectedResults(sinkName, expected, Duration.ofSeconds(15));
+
+            // insert some records and make sure that these records will not be sent
+            String insertValues =
+                    "INSERT INTO kafka\n"
+                            + " VALUES\n"
+                            + " (9, 'Insert new values. This should not be sent.'),\n"
+                            + " (10, 'Insert new values. This should not be sent.'),\n"
+                            + " (11, 'Insert new values. This should not be sent.')\n";
+            tEnv.executeSql(insertValues).await();
+            KafkaTableTestUtils.waitingExpectedResults(sinkName, expected, Duration.ofSeconds(15));
+        } finally {
+            // ------------- cleanup -------------------
+            if (tableResult != null) {
+                tableResult.getJobClient().ifPresent(JobClient::cancel);
+            }
+            deleteTestTopic(topic);
+        }
+    }
+
     private List<String> appendNewData(
             String topic, String tableName, String groupId, int targetNum) throws Exception {
         waitUtil(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 1bcd775a1..72e45e356 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -869,7 +869,8 @@ private KafkaDynamicSource createExpectedScanSource(
                 Collections.emptyMap(),
                 0,
                 true,
-                FactoryMocks.IDENTIFIER.asSummaryString());
+                FactoryMocks.IDENTIFIER.asSummaryString(),
+                null);
     }
 
     private static KafkaDynamicSink createExpectedSink(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRecordEvaluator.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRecordEvaluator.java
new file mode 100644
index 000000000..94dc83b4f
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRecordEvaluator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.connector.base.source.reader.RecordEvaluator;
+import org.apache.flink.table.data.RowData;
+
+/** A mock record evaluator. */
+public class MockRecordEvaluator implements RecordEvaluator<RowData> {
+    @Override
+    public boolean isEndOfStream(RowData record) {
+        return record == null || record.getString(1).toString().contains("End");
+    }
+}
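For reference (not part of the patch): the tests above exercise the new 'scan.record.evaluator.class' table option with the MockRecordEvaluator test class. A minimal sketch of a user-supplied evaluator follows; the package com.example, the class name OrderStreamEndEvaluator, and the assumed table schema are illustrative only, while the RecordEvaluator interface and the option key come from the changes above.

// Illustrative only: a user-defined evaluator that ends consumption at a marker record.
// Assumes field 0 of the table schema is a STRING column; package/class names are hypothetical.
package com.example;

import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.table.data.RowData;

public class OrderStreamEndEvaluator implements RecordEvaluator<RowData> {
    @Override
    public boolean isEndOfStream(RowData record) {
        // Treat a null record or a record whose first field equals "END" as end of stream.
        return record == null || "END".equals(record.getString(0).toString());
    }
}

// Referenced from the table DDL, mirroring the IT case above:
//   'scan.record.evaluator.class' = 'com.example.OrderStreamEndEvaluator'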