[FLINK-25509][connector/kafka] Support to use RecordEvaluator to filter records in kafka connector
ruanhang1993 committed Oct 18, 2024
1 parent 0fed445 commit c7faaca
Showing 21 changed files with 654 additions and 65 deletions.
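For orientation, the sketch below shows the kind of evaluator this change lets users supply. It is not part of the commit: it assumes that the flink-connector-base interface RecordEvaluator<T> exposes a single isEndOfStream(T) method, and the class name SentinelRecordEvaluator, the String element type, and the "EOF" sentinel are illustrative only.

import org.apache.flink.connector.base.source.reader.RecordEvaluator;

/**
 * Hypothetical evaluator: once a record carrying the sentinel value "EOF" is seen,
 * the split that produced it is treated as finished and no later records from that
 * split are emitted.
 */
public class SentinelRecordEvaluator implements RecordEvaluator<String> {

    @Override
    public boolean isEndOfStream(String record) {
        // Returning true marks the enclosing split as having reached end of stream.
        return "EOF".equals(record);
    }
}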
DynamicKafkaSource.java
@@ -29,6 +29,7 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumStateSerializer;
@@ -42,6 +43,8 @@
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import javax.annotation.Nullable;

import java.util.Properties;

/**
@@ -87,6 +90,7 @@ public class DynamicKafkaSource<T>
private final OffsetsInitializer stoppingOffsetsInitializer;
private final Properties properties;
private final Boundedness boundedness;
@Nullable private final RecordEvaluator<T> eofRecordEvaluator;

DynamicKafkaSource(
KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -95,14 +99,16 @@
OffsetsInitializer startingOffsetsInitializer,
OffsetsInitializer stoppingOffsetsInitializer,
Properties properties,
Boundedness boundedness) {
Boundedness boundedness,
@Nullable RecordEvaluator<T> eofRecordEvaluator) {
this.kafkaStreamSubscriber = kafkaStreamSubscriber;
this.deserializationSchema = deserializationSchema;
this.properties = properties;
this.kafkaMetadataService = kafkaMetadataService;
this.startingOffsetsInitializer = startingOffsetsInitializer;
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
this.boundedness = boundedness;
this.eofRecordEvaluator = eofRecordEvaluator;
}

/**
@@ -134,7 +140,8 @@ public Boundedness getBoundedness() {
@Override
public SourceReader<T, DynamicKafkaSourceSplit> createReader(
SourceReaderContext readerContext) {
return new DynamicKafkaSourceReader<>(readerContext, deserializationSchema, properties);
return new DynamicKafkaSourceReader<>(
readerContext, deserializationSchema, properties, eofRecordEvaluator);
}

/**
DynamicKafkaSourceBuilder.java
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
@@ -52,6 +53,7 @@ public class DynamicKafkaSourceBuilder<T> {
private OffsetsInitializer stoppingOffsetsInitializer;
private Boundedness boundedness;
private final Properties props;
private RecordEvaluator<T> eofRecordEvaluator;

DynamicKafkaSourceBuilder() {
this.kafkaStreamSubscriber = null;
@@ -140,6 +142,18 @@ public DynamicKafkaSourceBuilder<T> setDeserializer(
return this;
}

/**
* Set the {@link RecordEvaluator}.
*
* @param eofRecordEvaluator the {@link RecordEvaluator}.
* @return the builder.
*/
public DynamicKafkaSourceBuilder<T> setEofRecordEvaluator(
RecordEvaluator<T> eofRecordEvaluator) {
this.eofRecordEvaluator = eofRecordEvaluator;
return this;
}

/**
* Set the starting offsets of the stream. This will be applied to all clusters.
*
@@ -217,7 +231,8 @@ public DynamicKafkaSource<T> build() {
startingOffsetsInitializer,
stoppingOffsetsInitializer,
props,
boundedness);
boundedness,
eofRecordEvaluator);
}

// Below are utility methods, code and structure are mostly copied over from KafkaSourceBuilder
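A usage sketch for the builder option added above, not taken from the commit: it assumes the builder is obtained via a DynamicKafkaSource.builder() factory method, omits the stream subscription, metadata service, offset and property settings a real dynamic source also needs, and reuses the hypothetical SentinelRecordEvaluator sketched under the commit summary.

DynamicKafkaSource<String> source =
        DynamicKafkaSource.<String>builder()
                // ... stream subscription, Kafka metadata service, offsets, properties ...
                .setDeserializer(
                        KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema()))
                .setEofRecordEvaluator(new SentinelRecordEvaluator())
                .build();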
DynamicKafkaSourceReader.java
@@ -26,6 +26,7 @@
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
@@ -54,6 +55,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -95,11 +98,13 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
private boolean isActivelyConsumingSplits;
private boolean isNoMoreSplits;
private AtomicBoolean restartingReaders;
@Nullable private final RecordEvaluator<T> eofRecordEvaluator;

public DynamicKafkaSourceReader(
SourceReaderContext readerContext,
KafkaRecordDeserializationSchema<T> deserializationSchema,
Properties properties) {
Properties properties,
@Nullable RecordEvaluator<T> eofRecordEvaluator) {
this.readerContext = readerContext;
this.clusterReaderMap = new TreeMap<>();
this.deserializationSchema = deserializationSchema;
@@ -116,6 +121,7 @@ public DynamicKafkaSourceReader(
this.isActivelyConsumingSplits = false;
this.restartingReaders = new AtomicBoolean();
this.clustersProperties = new HashMap<>();
this.eofRecordEvaluator = eofRecordEvaluator;
}

/**
@@ -448,7 +454,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}
});

KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
KafkaRecordEmitter<T> recordEmitter =
new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator);
return new KafkaSourceReader<>(
elementsQueue,
new KafkaSourceFetcherManager(
@@ -463,7 +470,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
recordEmitter,
toConfiguration(readerSpecificProperties),
readerContext,
kafkaSourceReaderMetrics);
kafkaSourceReaderMetrics,
eofRecordEvaluator);
}

/**
KafkaSource.java
@@ -31,6 +31,7 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
@@ -102,11 +103,13 @@ public class KafkaSource<OUT>
private final Properties props;
// Client rackId callback
private final SerializableSupplier<String> rackIdSupplier;
@Nullable private RecordEvaluator<OUT> eofRecordEvaluator;

KafkaSource(
KafkaSubscriber subscriber,
OffsetsInitializer startingOffsetsInitializer,
@Nullable OffsetsInitializer stoppingOffsetsInitializer,
@Nullable RecordEvaluator<OUT> eofRecordEvaluator,
Boundedness boundedness,
KafkaRecordDeserializationSchema<OUT> deserializationSchema,
Properties props,
@@ -118,6 +121,7 @@ public class KafkaSource<OUT>
this.deserializationSchema = deserializationSchema;
this.props = props;
this.rackIdSupplier = rackIdSupplier;
this.eofRecordEvaluator = eofRecordEvaluator;
}

/**
@@ -171,7 +175,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
Optional.ofNullable(rackIdSupplier)
.map(Supplier::get)
.orElse(null));
KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
KafkaRecordEmitter<OUT> recordEmitter =
new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator);

return new KafkaSourceReader<>(
elementsQueue,
@@ -180,7 +185,8 @@
recordEmitter,
toConfiguration(props),
readerContext,
kafkaSourceReaderMetrics);
kafkaSourceReaderMetrics,
eofRecordEvaluator);
}

@Internal
@@ -251,4 +257,10 @@ KafkaSubscriber getKafkaSubscriber() {
OffsetsInitializer getStoppingOffsetsInitializer() {
return stoppingOffsetsInitializer;
}

@VisibleForTesting
@Nullable
RecordEvaluator<OUT> getEofRecordEvaluator() {
return eofRecordEvaluator;
}
}
KafkaSourceBuilder.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
@@ -107,6 +108,7 @@ public class KafkaSourceBuilder<OUT> {
protected Properties props;
// Client rackId supplier
private SerializableSupplier<String> rackIdSupplier;
private RecordEvaluator<OUT> eofRecordEvaluator;

KafkaSourceBuilder() {
this.subscriber = null;
@@ -353,6 +355,26 @@ public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
return this;
}

/**
* Sets the optional {@link RecordEvaluator eofRecordEvaluator} for KafkaSource.
*
* <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
* whether the corresponding split has reached end of stream. If a record is matched by the
* evaluator, the source will not emit that record, nor any records that follow it in the same
* split.
*
* <p>Note that the evaluator works jointly with the stopping offsets specified by the {@link
* #setBounded(OffsetsInitializer)} or the {@link #setUnbounded(OffsetsInitializer)}. The source
* stops consuming from a split when any of these conditions is met.
*
* @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
* @return this KafkaSourceBuilder.
*/
public KafkaSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
this.eofRecordEvaluator = eofRecordEvaluator;
return this;
}

/**
* Sets the client id prefix of this KafkaSource.
*
@@ -435,6 +457,7 @@ public KafkaSource<OUT> build() {
subscriber,
startingOffsetsInitializer,
stoppingOffsetsInitializer,
eofRecordEvaluator,
boundedness,
deserializationSchema,
props,
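A usage sketch for the plain KafkaSource, also not part of the commit. Apart from setEofRecordEvaluator, the builder calls shown here (setBootstrapServers, setTopics, setGroupId, setStartingOffsets, setValueOnlyDeserializer) already exist on KafkaSourceBuilder; the broker address, topic, group id and "EOF" sentinel are illustrative, and the lambda form assumes RecordEvaluator is a single-method interface.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")             // illustrative broker address
                .setTopics("input-topic")                          // illustrative topic
                .setGroupId("eof-evaluator-demo")                  // illustrative group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Stops a split as soon as a record equals the sentinel, in addition to any
                // stopping offsets configured via setBounded/setUnbounded.
                .setEofRecordEvaluator(record -> "EOF".equals(record))
                .build();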
KafkaPartitionSplitReader.java
@@ -25,6 +25,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsRemoval;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
@@ -76,6 +77,9 @@ public class KafkaPartitionSplitReader
// Tracking empty splits that have not been added to finished splits in fetch()
private final Set<String> emptySplits = new HashSet<>();

// Tracking removed splits that have not been added to finished splits in fetch()
private final Set<String> removedSplits = new HashSet<>();

public KafkaPartitionSplitReader(
Properties props,
SourceReaderContext context,
@@ -116,7 +120,7 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
KafkaPartitionSplitRecords recordsBySplits =
new KafkaPartitionSplitRecords(
ConsumerRecords.empty(), kafkaSourceReaderMetrics);
markEmptySplitsAsFinished(recordsBySplits);
markSplitsAsFinished(recordsBySplits);
return recordsBySplits;
}
KafkaPartitionSplitRecords recordsBySplits =
@@ -148,7 +152,7 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp);
});

markEmptySplitsAsFinished(recordsBySplits);
markSplitsAsFinished(recordsBySplits);

// Unassign the partitions that has finished.
if (!finishedPartitions.isEmpty()) {
@@ -162,25 +166,55 @@
return recordsBySplits;
}

private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) {
private void markSplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) {
// Some splits are discovered as empty when handling split additions. These splits should be
// added to finished splits to clean up states in split fetcher and source reader.
markSplitsAsFinished(emptySplits, recordsBySplits);

// Some splits are removed when handling split changes. These splits should be
// added to finished splits to clean up states in split fetcher and source reader.
markSplitsAsFinished(removedSplits, recordsBySplits);
}

private void markSplitsAsFinished(
Set<String> splits, KafkaPartitionSplitRecords recordsBySplits) {
// Some splits are discovered as empty when handling split additions. These splits should be
// added to finished splits to clean up states in split fetcher and source reader.
if (!emptySplits.isEmpty()) {
recordsBySplits.finishedSplits.addAll(emptySplits);
emptySplits.clear();
if (!splits.isEmpty()) {
recordsBySplits.finishedSplits.addAll(splits);
splits.clear();
}
}

@Override
public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
// Get all the partition assignments and stopping offsets.
if (!(splitsChange instanceof SplitsAddition)) {
if (splitsChange instanceof SplitsAddition) {
// Get all the partition assignments and stopping offsets.
handleSplitsAddition(splitsChange);
} else if (splitsChange instanceof SplitsRemoval) {
handleSplitsRemoval(splitsChange);
} else {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChange.getClass()));
}
}

private void handleSplitsRemoval(SplitsChange<KafkaPartitionSplit> splitsRemoval) {
removedSplits.addAll(
splitsRemoval.splits().stream()
.map(KafkaPartitionSplit::splitId)
.collect(Collectors.toSet()));
List<TopicPartition> finishedPartitions =
splitsRemoval.splits().stream()
.map(KafkaPartitionSplit::getTopicPartition)
.collect(Collectors.toList());
finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
unassignPartitions(finishedPartitions);
}

private void handleSplitsAddition(SplitsChange<KafkaPartitionSplit> splitsAddition) {
// Assignment.
List<TopicPartition> newPartitionAssignments = new ArrayList<>();
// Starting offsets.
Expand All @@ -192,7 +226,7 @@ public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange)
Set<TopicPartition> partitionsStoppingAtCommitted = new HashSet<>();

// Parse the starting and stopping offsets.
splitsChange
splitsAddition
.splits()
.forEach(
s -> {
Expand Down Expand Up @@ -223,7 +257,7 @@ public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange)
// After acquiring the starting and stopping offsets, remove the empty splits if necessary.
removeEmptySplits();

maybeLogSplitChangesHandlingResult(splitsChange);
maybeLogSplitChangesHandlingResult(splitsAddition);
}

@Override
Expand Down
(Diffs for the remaining changed files are not shown.)
