KafkaOffsetRangeCalculator

KafkaOffsetRangeCalculator is created when KafkaMicroBatchReader is created. Its only purpose is to calculate offset ranges (when KafkaMicroBatchReader is requested to planInputPartitions).

KafkaOffsetRangeCalculator takes an optional minimum number of partitions to read from Kafka (minPartitions). The value is either undefined or greater than 0.

When created from DataSourceOptions, KafkaOffsetRangeCalculator uses the minPartitions option as the minimum number of partitions.
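
The constraint on minPartitions can be illustrated with a minimal sketch. This is not Spark's code: `Calculator` and `fromOptions` are hypothetical names, and a plain `Map[String, String]` stands in for DataSourceOptions. It only shows the rule stated above, i.e. the value is either undefined or greater than 0.

```scala
object MinPartitionsSketch {
  // Hypothetical stand-in for the calculator: only the constructor check is modelled.
  final case class Calculator(minPartitions: Option[Int]) {
    require(
      minPartitions.forall(_ > 0),
      "minPartitions must be undefined or greater than 0")
  }

  // Hypothetical factory: a plain Map stands in for DataSourceOptions.
  // A missing "minPartitions" key leaves the value undefined (None).
  def fromOptions(options: Map[String, String]): Calculator =
    Calculator(options.get("minPartitions").map(_.trim.toInt))
}
```

With this sketch, an empty options map gives an undefined minPartitions, while a value such as "0" is rejected when the calculator is created.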

Calculating Offset Ranges — getRanges Method

getRanges(
  fromOffsets: PartitionOffsetMap,
  untilOffsets: PartitionOffsetMap,
  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange]

getRanges finds the common TopicPartitions (i.e. the TopicPartitions included in both the given untilOffsets and fromOffsets).

For every TopicPartition, getRanges creates a KafkaOffsetRange (with the preferredLoc undefined). getRanges filters out the TopicPartitions that have no records to consume.

At this point, getRanges knows the TopicPartitions with records to consume.
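
The first two steps can be sketched as follows. This is an illustrative approximation rather than the Spark source: `TopicPartition` and `KafkaOffsetRange` are simplified stand-ins defined locally, `rangesWithRecords` is a hypothetical helper name, and PartitionOffsetMap is assumed to be a map from TopicPartition to a Long offset.

```scala
object OffsetRangeSketch {
  // Simplified stand-ins (assumptions) for Kafka's TopicPartition
  // and the KafkaOffsetRange case class.
  final case class TopicPartition(topic: String, partition: Int)

  final case class KafkaOffsetRange(
      topicPartition: TopicPartition,
      fromOffset: Long,
      untilOffset: Long,
      preferredLoc: Option[String]) {
    // Number of records in the range
    def size: Long = untilOffset - fromOffset
  }

  type PartitionOffsetMap = Map[TopicPartition, Long]

  // Hypothetical helper covering only the steps described above.
  def rangesWithRecords(
      fromOffsets: PartitionOffsetMap,
      untilOffsets: PartitionOffsetMap): Seq[KafkaOffsetRange] = {
    // Step 1: TopicPartitions present in both maps
    val common = untilOffsets.keySet.intersect(fromOffsets.keySet)
    // Step 2: one range per partition, preferredLoc left undefined,
    // keeping only the ranges that have records to consume
    common.toSeq
      .map(tp => KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None))
      .filter(_.size > 0)
  }
}
```

A partition that appears in only one of the two maps, or whose from and until offsets are equal, does not make it into the result.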

When minPartitions is undefined or smaller than the number of KafkaOffsetRanges (i.e. TopicPartitions to consume records from), getRanges finds the preferred executor for every TopicPartition (using getLocation with the given executorLocations).

Otherwise (with minPartitions defined and greater than or equal to the number of KafkaOffsetRanges), getRanges…FIXME

Note
getRanges is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions.

KafkaOffsetRange Class

KafkaOffsetRange is a case class with the following attributes:

  • TopicPartition

  • fromOffset

  • untilOffset

  • Optional preferred location (preferredLoc)

KafkaOffsetRange knows its size, i.e. the number of records between the fromOffset and untilOffset offsets (untilOffset minus fromOffset).
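
The attributes and the size can be sketched as a small case class. This is a simplified model, not the Spark definition: `TopicPartition` is a local stand-in for Kafka's class, the default value for preferredLoc is an assumption made for brevity, and the topic and executor names in the usage note are made up.

```scala
object KafkaOffsetRangeSketch {
  // Local stand-in (assumption) for org.apache.kafka.common.TopicPartition
  final case class TopicPartition(topic: String, partition: Int)

  final case class KafkaOffsetRange(
      topicPartition: TopicPartition,
      fromOffset: Long,
      untilOffset: Long,
      preferredLoc: Option[String] = None) {
    // Number of records between fromOffset and untilOffset
    lazy val size: Long = untilOffset - fromOffset
  }
}
```

For example, `KafkaOffsetRange(TopicPartition("events", 0), 100L, 250L).size` evaluates to 150, and `copy(preferredLoc = Some("executor-1"))` gives the same range with a preferred location set.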

Selecting Preferred Executor for TopicPartition — getLocation Internal Method

getLocation(
  tp: TopicPartition,
  executorLocations: Seq[String]): Option[String]

getLocation…​FIXME

Note
getLocation is used exclusively when KafkaOffsetRangeCalculator is requested to calculate offset ranges.