KafkaMicroBatchReader

KafkaMicroBatchReader is a MicroBatchReader for the Kafka data source in Micro-Batch Stream Processing.
KafkaMicroBatchReader is created exclusively when KafkaSourceProvider is requested to create a MicroBatchReader.
KafkaMicroBatchReader uses the DataSourceOptions to access the kafkaConsumer.pollTimeoutMs option (default: spark.network.timeout or 120s).

KafkaMicroBatchReader uses the DataSourceOptions to access the maxOffsetsPerTrigger option (default: (undefined)).
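A minimal sketch of setting both options on a Kafka streaming query (the bootstrap servers and topic name are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// kafkaConsumer.pollTimeoutMs and maxOffsetsPerTrigger are read by
// KafkaMicroBatchReader through the DataSourceOptions
val records = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "topic1")                       // placeholder
  .option("kafkaConsumer.pollTimeoutMs", "5000")       // poll timeout on executors
  .option("maxOffsetsPerTrigger", "1000")              // cap records per micro-batch
  .load()
```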
KafkaMicroBatchReader uses the Kafka properties for executors to create KafkaMicroBatchInputPartitions when requested to planInputPartitions.
KafkaMicroBatchReader uses the following internal properties:

Name | Description
---|---
endPartitionOffsets | Ending offsets (per TopicPartition) of the current micro-batch
rangeCalculator | KafkaOffsetRangeCalculator (for the given DataSourceOptions). Used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (and calculates offset ranges)
startPartitionOffsets | Starting offsets (per TopicPartition) of the current micro-batch
Tip: Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaMicroBatchReader logger to see what happens inside. Add the following line to conf/log4j.properties:

```
log4j.logger.org.apache.spark.sql.kafka010.KafkaMicroBatchReader=ALL
```

Refer to Logging.
KafkaMicroBatchReader takes the following to be created:

- KafkaOffsetReader
- Kafka properties for executors
- DataSourceOptions
- Desired starting KafkaOffsetRangeLimit
- failOnDataLoss option
KafkaMicroBatchReader initializes the internal registries and counters.
```scala
readSchema(): StructType
```
Note: readSchema is part of the DataSourceReader contract to…FIXME.
readSchema simply returns the predefined fixed schema.
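The fixed schema can be observed on any Kafka streaming Dataset. A minimal sketch (the bootstrap servers and topic are placeholders; spark is an active SparkSession as in the earlier sketch):

```scala
val kafka = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "topic1")                       // placeholder
  .load()

// readSchema gives the predefined fixed schema of the Kafka data source
kafka.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
```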
```scala
stop(): Unit
```
Note: stop is part of the BaseStreamingSource Contract to stop a streaming reader.
stop simply requests the KafkaOffsetReader to close.
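As a sketch based on the description above (kafkaOffsetReader is the KafkaOffsetReader this reader was created with):

```scala
override def stop(): Unit = {
  // stop simply closes the Kafka offset reader
  kafkaOffsetReader.close()
}
```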
```scala
planInputPartitions(): java.util.List[InputPartition[InternalRow]]
```
Note: planInputPartitions is part of the DataSourceReader contract to…FIXME.
planInputPartitions first finds the new partitions (TopicPartitions that are in the endPartitionOffsets but not in the startPartitionOffsets) and requests the KafkaOffsetReader to fetch their earliest offsets.
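A sketch of that step, assuming the offset registries are Map[TopicPartition, Long] (an assumption; the names follow the prose above):

```scala
// partitions present in the end offsets but not in the start offsets are new
val newPartitions =
  endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)

// fetch the earliest offsets for the newly-discovered partitions
val newPartitionInitialOffsets =
  kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
```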
planInputPartitions prints out the following INFO message to the logs:

```
Partitions added: [newPartitionInitialOffsets]
```
planInputPartitions then prints out the following DEBUG message to the logs:

```
TopicPartitions: [comma-separated list of TopicPartitions]
```
planInputPartitions requests the KafkaOffsetRangeCalculator for offset ranges (given the startPartitionOffsets and the newly-calculated newPartitionInitialOffsets as the fromOffsets, the endPartitionOffsets as the untilOffsets, and the available executors (sorted in descending order)).
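Sketched as a call (the parameter names follow the prose above; treat them as assumptions):

```scala
val offsetRanges = rangeCalculator.getRanges(
  fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
  untilOffsets = endPartitionOffsets,
  executorLocations = getSortedExecutorList())
```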
In the end, planInputPartitions creates a KafkaMicroBatchInputPartition for every offset range (with the Kafka properties for executors, the pollTimeoutMs, the failOnDataLoss flag and whether to reuse a Kafka consumer among Spark tasks).
Note: A KafkaMicroBatchInputPartition uses a shared Kafka consumer only when all the offset ranges have distinct TopicPartitions, so concurrent tasks (of a stage in a Spark job) will not interfere and read the same TopicPartitions.
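The reuse decision can be sketched as a distinctness check over the offset ranges (assuming every range carries its TopicPartition):

```scala
// a Kafka consumer can be shared among tasks only when no two
// offset ranges point at the same TopicPartition
val reuseKafkaConsumer =
  offsetRanges.map(_.topicPartition).distinct.size == offsetRanges.size
```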
planInputPartitions reports data loss when…FIXME
Available Executors in Spark Cluster (Sorted By Host and Executor ID in Descending Order) — getSortedExecutorList Internal Method

```scala
getSortedExecutorList(): Array[String]
```
getSortedExecutorList requests the BlockManager to request the BlockManagerMaster to get the peers (the other nodes in a Spark cluster), creates an ExecutorCacheTaskLocation for every pair of host and executor ID, and in the end, sorts them in descending order.
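A sketch of the method under the description above (the compare helper and the use of SparkEnv are assumptions of the sketch):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.scheduler.ExecutorCacheTaskLocation

def getSortedExecutorList(): Array[String] = {
  // descending order: first by host, then by executor ID
  def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean =
    if (a.host == b.host) a.executorId > b.executorId else a.host > b.host

  val bm = SparkEnv.get.blockManager
  bm.master
    .getPeers(bm.blockManagerId)  // the other BlockManagers in the cluster
    .toArray
    .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
    .sortWith(compare)
    .map(_.toString)
}
```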
Note: getSortedExecutorList is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions (and calculates offset ranges).