archunit-violations store file:
@@ -23,11 +23,11 @@ Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourc
Method <org.apache.flink.connector.kafka.dynamic.source.reader.DynamicKafkaSourceReader.syncAvailabilityHelperWithReaders()> calls method <org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper.anyOf(int, java.util.concurrent.CompletableFuture)> in (DynamicKafkaSourceReader.java:500)
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getProducerPool()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
Method <org.apache.flink.connector.kafka.sink.ExactlyOnceKafkaWriter.getTransactionalIdPrefix()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (ExactlyOnceKafkaWriter.java:0)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:178)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:181)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:177)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:180)
-Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:177)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getCoLocationGroupKey()> in (KafkaSink.java:183)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getInputs()> in (KafkaSink.java:186)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.getOutputType()> in (KafkaSink.java:182)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> calls method <org.apache.flink.api.dag.Transformation.setCoLocationGroupKey(java.lang.String)> in (KafkaSink.java:185)
+Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> checks instanceof <org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo> in (KafkaSink.java:182)
Method <org.apache.flink.connector.kafka.sink.KafkaSink.addPostCommitTopology(org.apache.flink.streaming.api.datastream.DataStream)> has generic parameter type <org.apache.flink.streaming.api.datastream.DataStream<org.apache.flink.streaming.api.connector.sink2.CommittableMessage<org.apache.flink.connector.kafka.sink.KafkaCommittable>>> with type argument depending on <org.apache.flink.streaming.api.connector.sink2.CommittableMessage> in (KafkaSink.java:0)
Method <org.apache.flink.connector.kafka.sink.KafkaSink.getKafkaProducerConfig()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSink.java:0)
Method <org.apache.flink.connector.kafka.sink.KafkaSinkBuilder.setRecordSerializer(org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema)> calls method <org.apache.flink.api.java.ClosureCleaner.clean(java.lang.Object, org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel, boolean)> in (KafkaSinkBuilder.java:154)
@@ -39,7 +39,9 @@ Method <org.apache.flink.connector.kafka.source.KafkaSource.createReader(org.apa
Method <org.apache.flink.connector.kafka.source.KafkaSource.getConfiguration()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
Method <org.apache.flink.connector.kafka.source.KafkaSource.getKafkaSubscriber()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
Method <org.apache.flink.connector.kafka.source.KafkaSource.getStoppingOffsetsInitializer()> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSource.java:0)
-Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeTopicPartitions(java.util.Collection)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
+Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV1(java.util.Collection)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
+Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV2(java.util.Collection, boolean)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
+Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer.serializeV3(org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumStateSerializer.java:0)
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.deepCopyProperties(java.util.Properties, java.util.Properties)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getPartitionChange(java.util.Set)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
Method <org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSplitOwner(org.apache.kafka.common.TopicPartition, int)> is annotated with <org.apache.flink.annotation.VisibleForTesting> in (KafkaSourceEnumerator.java:0)
DynamicKafkaSourceEnumerator.java:
@@ -35,14 +35,13 @@
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
-import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
+import org.apache.flink.connector.kafka.source.enumerator.SplitAndAssignmentStatus;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.Preconditions;

import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -174,8 +173,8 @@ public DynamicKafkaSourceEnumerator(
dynamicKafkaSourceEnumState.getClusterEnumeratorStates().entrySet()) {
this.latestClusterTopicsMap.put(
clusterEnumState.getKey(),
-clusterEnumState.getValue().assignedPartitions().stream()
-.map(TopicPartition::topic)
+clusterEnumState.getValue().assignedSplits().stream()
+.map(KafkaPartitionSplit::getTopic)
.collect(Collectors.toSet()));

createEnumeratorWithAssignedTopicPartitions(
@@ -291,9 +290,9 @@ private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams
final Set<String> activeTopics = activeClusterTopics.getValue();

// filter out removed topics
-Set<TopicPartitionAndAssignmentStatus> partitions =
-kafkaSourceEnumState.partitions().stream()
-.filter(tp -> activeTopics.contains(tp.topicPartition().topic()))
+Set<SplitAndAssignmentStatus> partitions =
+kafkaSourceEnumState.splits().stream()
+.filter(tp -> activeTopics.contains(tp.split().getTopic()))
.collect(Collectors.toSet());

newKafkaSourceEnumState =
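For readers following the rename: below is a minimal sketch (not part of this PR) of what the change above amounts to — per-cluster topic sets are now derived from checkpointed splits rather than bare TopicPartitions. The surrounding class and method names are hypothetical; only KafkaSourceEnumState.assignedSplits() and KafkaPartitionSplit.getTopic() come from the diff.

```java
// Sketch only: derive the topic set from a restored enumerator state, as the
// enumerator above now does. The class and method names here are hypothetical.
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

import java.util.Set;
import java.util.stream.Collectors;

class TopicExtractionSketch {
    static Set<String> topicsOf(KafkaSourceEnumState enumState) {
        // Pre-PR: assignedPartitions() returned Set<TopicPartition> and the
        // topic came from TopicPartition::topic. Post-PR: assignedSplits()
        // returns Collection<KafkaPartitionSplit>, which carries the topic.
        return enumState.assignedSplits().stream()
                .map(KafkaPartitionSplit::getTopic)
                .collect(Collectors.toSet());
    }
}
```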
AssignmentStatus.java:
@@ -26,11 +26,8 @@ public enum AssignmentStatus {

/** Partitions that have been assigned to readers. */
ASSIGNED(0),
-/**
-* The partitions that have been discovered during initialization but not assigned to readers
-* yet.
-*/
-UNASSIGNED_INITIAL(1);
+/** The partitions that have been discovered but not assigned to readers yet. */
+UNASSIGNED(1);
private final int statusCode;

AssignmentStatus(int statusCode) {
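Note that the rename keeps the wire value 1, so state serialized with UNASSIGNED_INITIAL still decodes as UNASSIGNED. A hedged sketch of the code-to-enum lookup such a serializer needs follows; fromStatusCode is a hypothetical helper, not necessarily the connector's actual API.

```java
// Sketch only: map a serialized status code back to the enum. The helper name
// is hypothetical; the connector may resolve codes differently. getStatusCode()
// is assumed to expose the int stored in the enum above.
import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;

class AssignmentStatusLookupSketch {
    static AssignmentStatus fromStatusCode(int code) {
        for (AssignmentStatus status : AssignmentStatus.values()) {
            if (status.getStatusCode() == code) {
                return status;
            }
        }
        throw new IllegalArgumentException("Unknown assignment status code: " + code);
    }
}
```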
KafkaSourceEnumState.java:
@@ -19,76 +19,73 @@
package org.apache.flink.connector.kafka.source.enumerator;

import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

-import org.apache.kafka.common.TopicPartition;
-
+import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

/** The state of Kafka source enumerator. */
@Internal
public class KafkaSourceEnumState {
-/** Partitions with status: ASSIGNED or UNASSIGNED_INITIAL. */
-private final Set<TopicPartitionAndAssignmentStatus> partitions;
+/** Splits with status: ASSIGNED or UNASSIGNED. */
+private final Set<SplitAndAssignmentStatus> splits;
/**
* This flag is set to true once the initial partitions have been discovered after the
* enumerator starts.
*/
private final boolean initialDiscoveryFinished;

public KafkaSourceEnumState(
-Set<TopicPartitionAndAssignmentStatus> partitions, boolean initialDiscoveryFinished) {
-this.partitions = partitions;
+Set<SplitAndAssignmentStatus> splits, boolean initialDiscoveryFinished) {
+this.splits = splits;
this.initialDiscoveryFinished = initialDiscoveryFinished;
}

public KafkaSourceEnumState(
-Set<TopicPartition> assignPartitions,
-Set<TopicPartition> unassignedInitialPartitions,
+Collection<KafkaPartitionSplit> assignedSplits,
+Collection<KafkaPartitionSplit> unassignedSplits,
boolean initialDiscoveryFinished) {
-this.partitions = new HashSet<>();
-partitions.addAll(
-assignPartitions.stream()
+this.splits = new HashSet<>();
+splits.addAll(
+assignedSplits.stream()
.map(
topicPartition ->
-new TopicPartitionAndAssignmentStatus(
+new SplitAndAssignmentStatus(
topicPartition, AssignmentStatus.ASSIGNED))
.collect(Collectors.toSet()));
-partitions.addAll(
-unassignedInitialPartitions.stream()
+splits.addAll(
+unassignedSplits.stream()
.map(
topicPartition ->
-new TopicPartitionAndAssignmentStatus(
-topicPartition,
-AssignmentStatus.UNASSIGNED_INITIAL))
+new SplitAndAssignmentStatus(
+topicPartition, AssignmentStatus.UNASSIGNED))
.collect(Collectors.toSet()));
this.initialDiscoveryFinished = initialDiscoveryFinished;
}

-public Set<TopicPartitionAndAssignmentStatus> partitions() {
-return partitions;
+public Set<SplitAndAssignmentStatus> splits() {
+return splits;
}

-public Set<TopicPartition> assignedPartitions() {
-return filterPartitionsByAssignmentStatus(AssignmentStatus.ASSIGNED);
+public Collection<KafkaPartitionSplit> assignedSplits() {
+return filterByAssignmentStatus(AssignmentStatus.ASSIGNED);
}

-public Set<TopicPartition> unassignedInitialPartitions() {
-return filterPartitionsByAssignmentStatus(AssignmentStatus.UNASSIGNED_INITIAL);
+public Collection<KafkaPartitionSplit> unassignedSplits() {
+return filterByAssignmentStatus(AssignmentStatus.UNASSIGNED);
}

public boolean initialDiscoveryFinished() {
return initialDiscoveryFinished;
}

-private Set<TopicPartition> filterPartitionsByAssignmentStatus(
+private Collection<KafkaPartitionSplit> filterByAssignmentStatus(
AssignmentStatus assignmentStatus) {
-return partitions.stream()
-.filter(
-partitionWithStatus ->
-partitionWithStatus.assignmentStatus().equals(assignmentStatus))
-.map(TopicPartitionAndAssignmentStatus::topicPartition)
-.collect(Collectors.toSet());
+return splits.stream()
+.filter(split -> split.assignmentStatus().equals(assignmentStatus))
+.map(SplitAndAssignmentStatus::split)
+.collect(Collectors.toList());
}
}
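To illustrate the reworked state API end to end, here is a minimal usage sketch. It assumes KafkaPartitionSplit's public (TopicPartition, startingOffset) constructor and its EARLIEST_OFFSET constant; the topic names, partitions, and offsets are made up.

```java
// Sketch only: build and query the reworked enumerator state. Topic names,
// partitions, and offsets below are illustrative.
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;

class EnumStateUsageSketch {
    public static void main(String[] args) {
        // A split that has already been handed to a reader, starting at offset 42.
        KafkaPartitionSplit assigned =
                new KafkaPartitionSplit(new TopicPartition("orders", 0), 42L);
        // A discovered-but-unassigned split, starting from the earliest offset.
        KafkaPartitionSplit pending =
                new KafkaPartitionSplit(
                        new TopicPartition("orders", 1), KafkaPartitionSplit.EARLIEST_OFFSET);

        KafkaSourceEnumState state =
                new KafkaSourceEnumState(
                        Collections.singletonList(assigned),
                        Collections.singletonList(pending),
                        /* initialDiscoveryFinished */ true);

        // Both getters now return Collection<KafkaPartitionSplit>.
        System.out.println(state.assignedSplits().size()); // 1
        System.out.println(state.unassignedSplits().size()); // 1
    }
}
```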