KafkaSource

Note
Kafka topics are checked for new records only at every trigger, so there is a noticeable delay between when records arrive in Kafka topics and when a Spark application processes them.
Note

Structured Streaming support for Kafka is in a separate spark-sql-kafka-0-10 module (aka library dependency).

The spark-sql-kafka-0-10 module is not included by default, so you have to start spark-submit (and "derivatives" like spark-shell) with the --packages command-line option to "install" it.

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

Replace the version of the spark-sql-kafka-0-10 module (e.g. 2.2.0 above) with one of the versions available at The Central Repository’s search that matches your version of Spark.

KafkaSource is created for kafka format (that is registered by KafkaSourceProvider).

val kafkaSource = spark.
  readStream.
  format("kafka"). // <-- use KafkaSource
  option("subscribe", "input").
  option("kafka.bootstrap.servers", "localhost:9092").
  load
Figure 1. KafkaSource Is Created for kafka Format by KafkaSourceProvider
Table 1. KafkaSource’s Options

kafkaConsumer.pollTimeoutMs

Timeout (in milliseconds) for Kafka consumers on executors to poll records from Kafka.

maxOffsetsPerTrigger

Default: (empty)

Maximum number of records to fetch per trigger.

Note
Use the maxOffsetsPerTrigger option to limit the number of records fetched per trigger (and hence the size of a micro-batch).

Unless defined, KafkaSource requests KafkaOffsetReader for the latest offsets.

startingoffsets

Where to start reading from when a query has no committed offsets yet. Possible values:

  • latest

  • earliest

  • JSON with topics, partitions and their offsets, e.g.

    {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}

    (in the JSON, -1 denotes the latest and -2 the earliest offset of a partition)
Tip

Use Scala’s triple quotes for the JSON with topics, partitions and offsets.

option(
  "startingoffsets",
  """{"topic1":{"0":5,"4":-1},"topic2":{"0":-2}}""")

assign

Topic subscription strategy that accepts a JSON with topic names and partitions, e.g.

{"topicA":[0,1],"topicB":[0,1]}
Note
Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).
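
For example (using Scala’s triple quotes for the JSON, as with the startingoffsets tip above):

option("assign", """{"topicA":[0,1],"topicB":[0,1]}""")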

subscribe

Topic subscription strategy that accepts topic names as a comma-separated string, e.g.

topic1,topic2,topic3
Note
Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).

subscribepattern

Topic subscription strategy that accepts a Java regular expression (java.util.regex.Pattern) with the names of the topics to subscribe to, e.g.

topic\d
Tip

Use Scala’s triple quotes for the topic subscription regex pattern.

option("subscribepattern", """topic\d""")
Note
Exactly one topic subscription strategy is allowed (that KafkaSourceProvider validates before creating KafkaSource).
/**
  ./bin/kafka-console-producer.sh \
    --topic topic1 \
    --broker-list localhost:9092 \
    --property parse.key=true \
    --property key.separator=,
*/
// Extract
val records = spark.
  readStream.
  format("kafka").
  option("subscribepattern", """topic\d"""). // <-- topics with a digit at the end
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingoffsets", "latest").
  option("maxOffsetsPerTrigger", 1).
  load
// Transform
val result = records.
  select(
    $"key" cast "string",   // deserialize keys
    $"value" cast "string", // deserialize values
    $"topic",
    $"partition",
    $"offset")
// Load
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = result.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Append).
  queryName("from-kafka-to-console").
  start

// In the end, stop the streaming query
sq.stop

KafkaSource uses a predefined (fixed) schema that cannot be changed.

scala> records.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
Table 2. KafkaSource’s Dataset Schema (in the positional order)

Name           Type
key            BinaryType
value          BinaryType
topic          StringType
partition      IntegerType
offset         LongType
timestamp      TimestampType
timestampType  IntegerType

Tip

Use cast method (of Column) to cast BinaryType to a string (for key and value columns).

$"value" cast "string"

KafkaSource also supports batch Datasets.

val topic1 = spark
  .read // <-- read one batch only
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
scala> topic1.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
Table 3. KafkaSource’s Internal Registries and Counters
Name Description

currentPartitionOffsets

Current partition offsets (as Map[TopicPartition, Long])

Initially NONE and set when KafkaSource is requested to get the maximum available offsets or generate a DataFrame with records from Kafka for a batch.

initialPartitionOffsets

Initial partition offsets (as Map[TopicPartition, Long])

Set when KafkaSource is first requested to get the available offsets (from metadata log or Kafka directly).

Used when KafkaSource is requested to generate a DataFrame with records from Kafka for a streaming batch (when the start offsets are not defined, i.e. before StreamExecution commits the first streaming batch and so nothing is in committedOffsets registry for a KafkaSource data source yet).

While being initialized, initialPartitionOffsets creates a custom HDFSMetadataLog (with KafkaSourceOffset) and gets the 0th batch’s metadata (as KafkaSourceOffset) if available.

Note
initialPartitionOffsets uses a HDFSMetadataLog with custom serialize and deserialize methods to write to and read serialized metadata from the log.

Otherwise, if the 0th batch’s metadata is not available, initialPartitionOffsets uses KafkaOffsetReader to fetch offsets per KafkaOffsetRangeLimit input parameter.

initialPartitionOffsets adds the offsets to the metadata log as the 0th batch.

Note
The 0th batch is persisted in the streaming metadata log unless stored already.

You should see the following INFO message in the logs:

INFO KafkaSource: Initial offsets: [offsets]
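
The initialization described above can be sketched as follows (a minimal sketch with hypothetical helper functions standing in for the HDFSMetadataLog and KafkaOffsetReader interactions; not the actual implementation):

import org.apache.kafka.common.TopicPartition

// A sketch only: 0th batch from the metadata log if available,
// otherwise offsets fetched from Kafka and persisted as the 0th batch.
def resolveInitialOffsets(
    readBatch0FromMetadataLog: () => Option[Map[TopicPartition, Long]],
    fetchOffsetsFromKafka: () => Map[TopicPartition, Long], // per the KafkaOffsetRangeLimit
    persistAsBatch0: Map[TopicPartition, Long] => Unit): Map[TopicPartition, Long] =
  readBatch0FromMetadataLog().getOrElse {
    val offsets = fetchOffsetsFromKafka()
    persistAsBatch0(offsets)                // unless stored already
    println(s"Initial offsets: $offsets")   // corresponds to the INFO message above
    offsets
  }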
Tip

Enable INFO or DEBUG logging levels for org.apache.spark.sql.kafka010.KafkaSource to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaSource=DEBUG

Refer to Logging.

rateLimit Internal Method

rateLimit(
  limit: Long,
  from: Map[TopicPartition, Long],
  until: Map[TopicPartition, Long]): Map[TopicPartition, Long]

rateLimit requests KafkaOffsetReader to fetchEarliestOffsets.

Caution
FIXME
Note
rateLimit is used exclusively when KafkaSource gets available offsets (when maxOffsetsPerTrigger option is specified).
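
The FIXME above leaves the algorithm undescribed. As a hedged sketch, the limit is (presumably) distributed across partitions in proportion to how far each partition is behind; the real rateLimit also asks KafkaOffsetReader for the earliest offsets of brand-new partitions (as mentioned above), which the sketch glosses over:

import org.apache.kafka.common.TopicPartition

// A sketch only, not the actual implementation.
def rateLimitSketch(
    limit: Long,
    from: Map[TopicPartition, Long],
    until: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  // lag per partition = records available beyond the `from` offset
  val lags = until.map { case (tp, end) =>
    tp -> math.max(end - from.getOrElse(tp, end), 0L)
  }
  val totalLag = lags.values.sum
  if (totalLag <= limit) until // under the limit, take everything
  else until.map { case (tp, end) =>
    val begin = from.getOrElse(tp, end)
    val share = (limit * lags(tp).toDouble / totalLag).toLong // proportional share of the limit
    tp -> math.min(begin + share, end)
  }
}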

getSortedExecutorList Method

Caution
FIXME
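
FIXME notwithstanding, getBatch below describes the result as the executors sorted by executorId and host of the registered block managers. A minimal sketch (the case class, the sort order and the string format are assumptions):

// A sketch only: produce a stable, sorted list of executor locations
// so partitions map to the same executors (and cached Kafka consumers) across batches.
final case class ExecutorLocation(host: String, executorId: String)

def getSortedExecutorListSketch(executors: Seq[ExecutorLocation]): Seq[String] =
  executors
    .sortBy(e => (e.host, e.executorId))
    .map(e => s"executor_${e.host}_${e.executorId}")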

reportDataLoss Internal Method

Caution
FIXME
Note

reportDataLoss is used when KafkaSource generates a DataFrame with records from Kafka for a streaming batch (getBatch) and when it fetches and verifies specific offsets (fetchAndVerify).
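
The exact behaviour is still a FIXME above, but a minimal sketch of the usual semantics, assuming it is governed by the kafka source’s failOnDataLoss option (an assumption, as the option is not covered on this page), could look like this:

// Illustrative only: either fail the query or log a warning, depending on failOnDataLoss
def reportDataLossSketch(message: String, failOnDataLoss: Boolean): Unit =
  if (failOnDataLoss) {
    throw new IllegalStateException(message)
  } else {
    println(s"WARN $message. Continuing processing since failOnDataLoss is disabled.")
  }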

Generating DataFrame with Records From Kafka for Streaming Batch — getBatch Method

getBatch(start: Option[Offset], end: Offset): DataFrame
Note
getBatch is a part of Source Contract.

getBatch initializes initial partition offsets (unless initialized already).

You should see the following INFO message in the logs:

INFO KafkaSource: GetBatch called with start = [start], end = [end]

getBatch requests KafkaSourceOffset for end partition offsets for the input end offset (known as untilPartitionOffsets).

getBatch requests KafkaSourceOffset for start partition offsets for the input start offset (if defined) or uses initial partition offsets (known as fromPartitionOffsets).

getBatch finds the new partitions (as the difference between the topic partitions in untilPartitionOffsets and fromPartitionOffsets) and requests KafkaOffsetReader to fetch their earliest offsets.

getBatch reports a data loss if the new partitions don’t match what KafkaOffsetReader fetched.

Cannot find earliest offsets of [partitions]. Some data may have been missed

You should see the following INFO message in the logs:

INFO KafkaSource: Partitions added: [partitionOffsets]

getBatch reports a data loss if any of the new partitions does not start from offset 0.

Added partition [partition] starts from [offset] instead of 0. Some data may have been missed

getBatch reports a data loss if some partitions in fromPartitionOffsets are no longer among the untilPartitionOffsets partitions (i.e. are gone).

[partitions] are gone. Some data may have been missed

You should see the following DEBUG message in the logs:

DEBUG KafkaSource: TopicPartitions: [comma-separated topicPartitions]

getBatch gets the executors (sorted by executorId and host of the registered block managers).

Important
This is where getBatch goes low-level so that the cached Kafka consumers on executors can be re-used to read the same partition in every batch (aka location preference).

You should see the following DEBUG message in the logs:

DEBUG KafkaSource: Sorted executors: [comma-separated sortedExecutors]

getBatch creates a KafkaSourceRDDOffsetRange per TopicPartition.

getBatch filters out the KafkaSourceRDDOffsetRanges whose until offsets are smaller than their from offsets, and reports a data loss if any are found.

Partition [topicPartition]'s offset was changed from [fromOffset] to [untilOffset], some data may have been missed

getBatch creates a KafkaSourceRDD (with executorKafkaParams, pollTimeoutMs and reuseKafkaConsumer flag enabled) and maps it to an RDD of InternalRow.

Important
getBatch creates a KafkaSourceRDD with reuseKafkaConsumer flag enabled.

You should see the following INFO message in the logs:

INFO KafkaSource: GetBatch generating RDD of offset range: [comma-separated offsetRanges sorted by topicPartition]

getBatch sets currentPartitionOffsets if it was empty (which is when…​FIXME)

In the end, getBatch creates a DataFrame from the RDD of InternalRow and schema.
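
The offset-range calculation described above can be summarized with the following sketch (OffsetRange and the helper stand in for the internal KafkaSourceRDDOffsetRange machinery; illustrative only):

import org.apache.kafka.common.TopicPartition

// Illustrative stand-in for the internal KafkaSourceRDDOffsetRange
final case class OffsetRange(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)

def offsetRanges(
    fromPartitionOffsets: Map[TopicPartition, Long],
    untilPartitionOffsets: Map[TopicPartition, Long],
    newPartitionInitialOffsets: Map[TopicPartition, Long]): Seq[OffsetRange] =
  untilPartitionOffsets.toSeq.flatMap { case (tp, until) =>
    fromPartitionOffsets.get(tp)
      .orElse(newPartitionInitialOffsets.get(tp)) // new partitions start at their earliest offsets
      .collect {
        // ranges with until < from are dropped (and reported as a data loss)
        case from if until >= from => OffsetRange(tp, from, until)
      }
  }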

Fetching Offsets (From Metadata Log or Kafka Directly) — getOffset Method

getOffset: Option[Offset]
Note
getOffset is a part of the Source Contract.

Internally, getOffset fetches the initial partition offsets (from the metadata log or Kafka directly).

Figure 2. KafkaSource Initializing initialPartitionOffsets While Fetching Initial Offsets
Note
initialPartitionOffsets is a lazy value and is initialized the very first time getOffset is called (which is when StreamExecution constructs a streaming batch).
scala> spark.version
res0: String = 2.3.0-SNAPSHOT

// Case 1: Checkpoint directory undefined
// initialPartitionOffsets read from Kafka directly
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load
// Start the streaming query
// dump records to the console every 10 seconds
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start
// Note the temporary checkpoint directory
17/08/07 11:09:29 INFO StreamExecution: Starting [id = 75dd261d-6b62-40fc-a368-9d95d3cb6f5f, runId = f18a5eb5-ccab-4d9d-8a81-befed41a72bd] with file:///private/var/folders/0w/kb0d3rqn4zb9fcc91pxhgn8w0000gn/T/temporary-d0055630-24e4-4d9a-8f36-7a12a0f11bc0 to store the query checkpoint.
...
INFO KafkaSource: Initial offsets: {"topic1":{"0":1}}

// Stop the streaming query
q.stop

// Case 2: Checkpoint directory defined
// initialPartitionOffsets read from Kafka directly
// since the checkpoint directory does not exist yet
// (it will be used the next time the query is started)
val records = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  select($"value" cast "string", $"topic", $"partition", $"offset")
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start
// Note the checkpoint directory in use
17/08/07 11:21:25 INFO StreamExecution: Starting [id = b8f59854-61c1-4c2f-931d-62bbaf90ee3b, runId = 70d06a3b-f2b1-4fa8-a518-15df4cf59130] with file:///tmp/checkpoint to store the query checkpoint.
...
INFO KafkaSource: Initial offsets: {"topic1":{"0":1}}
...
INFO StreamExecution: Stored offsets for batch 0. Metadata OffsetSeqMetadata(0,1502098526848,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))

// Review the checkpoint location
// $ ls -ltr /tmp/checkpoint/offsets
// total 8
// -rw-r--r--  1 jacek  wheel  248  7 sie 11:21 0
// $ tail -2 /tmp/checkpoint/offsets/0 | jq

// Produce messages to Kafka so the latest offset changes
// And more importantly the offset gets stored to the checkpoint location
-------------------------------------------
Batch: 1
-------------------------------------------
+---------------------------+------+---------+------+
|value                      |topic |partition|offset|
+---------------------------+------+---------+------+
|testing checkpoint location|topic1|0        |2     |
+---------------------------+------+---------+------+

// and one more
// Note the offset
-------------------------------------------
Batch: 2
-------------------------------------------
+------------+------+---------+------+
|value       |topic |partition|offset|
+------------+------+---------+------+
|another test|topic1|0        |3     |
+------------+------+---------+------+

// See what was checkpointed
// $ ls -ltr /tmp/checkpoint/offsets
// total 24
// -rw-r--r--  1 jacek  wheel  248  7 sie 11:35 0
// -rw-r--r--  1 jacek  wheel  248  7 sie 11:37 1
// -rw-r--r--  1 jacek  wheel  248  7 sie 11:38 2
// $ tail -2 /tmp/checkpoint/offsets/2 | jq

// Stop the streaming query
q.stop

// And start over to see what offset the query starts from
// Checkpoint location should have the offsets
val q = records.
  writeStream.
  format("console").
  option("truncate", false).
  option("checkpointLocation", "/tmp/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).
  start
// Whoops...console format does not support recovery (!)
// Reported as https://issues.apache.org/jira/browse/SPARK-21667
org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint/offsets to start over.;
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
  ... 61 elided

// Change the sink (= output format) to JSON
val q = records.
  writeStream.
  format("json").
  option("path", "/tmp/json-sink").
  option("checkpointLocation", "/tmp/checkpoint"). // <-- checkpoint directory
  trigger(Trigger.ProcessingTime(10.seconds)).
  start
// Note the checkpoint directory in use
17/08/07 12:09:02 INFO StreamExecution: Starting [id = 02e00924-5f0d-4501-bcb8-80be8a8be385, runId = 5eba2576-dad6-4f95-9031-e72514475edc] with file:///tmp/checkpoint to store the query checkpoint.
...
17/08/07 12:09:02 INFO KafkaSource: GetBatch called with start = Some({"topic1":{"0":3}}), end = {"topic1":{"0":4}}
17/08/07 12:09:02 INFO KafkaSource: Partitions added: Map()
17/08/07 12:09:02 DEBUG KafkaSource: TopicPartitions: topic1-0
17/08/07 12:09:02 DEBUG KafkaSource: Sorted executors:
17/08/07 12:09:02 INFO KafkaSource: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(topic1-0,3,4,None)
17/08/07 12:09:03 DEBUG KafkaOffsetReader: Partitions assigned to consumer: [topic1-0]. Seeking to the end.
17/08/07 12:09:03 DEBUG KafkaOffsetReader: Got latest offsets for partition : Map(topic1-0 -> 4)
17/08/07 12:09:03 DEBUG KafkaSource: GetOffset: ArrayBuffer((topic1-0,4))
17/08/07 12:09:03 DEBUG StreamExecution: getOffset took 122 ms
17/08/07 12:09:03 DEBUG StreamExecution: Resuming at batch 3 with committed offsets {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}} and available offsets {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}}
17/08/07 12:09:03 DEBUG StreamExecution: Stream running from {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}} to {KafkaSource[Subscribe[topic1]]: {"topic1":{"0":4}}}

getOffset requests KafkaOffsetReader to fetchLatestOffsets (known later as latest).

Note
(Possible performance degradation?) It is possible that getOffset requests the latest offsets from Kafka twice, i.e. while initializing initialPartitionOffsets (when no metadata log is available and KafkaSource’s KafkaOffsetRangeLimit is LatestOffsetRangeLimit) and again as part of getOffset itself.

getOffset then calculates currentPartitionOffsets based on the maxOffsetsPerTrigger option.

Table 4. getOffset’s Offset Calculation per maxOffsetsPerTrigger

maxOffsetsPerTrigger                                    Offsets
Unspecified (i.e. None)                                 latest
Defined, but currentPartitionOffsets is empty           rateLimit with limit as limit, initialPartitionOffsets as from and latest as until
Defined, and currentPartitionOffsets contains offsets   rateLimit with limit as limit, currentPartitionOffsets as from and latest as until

You should see the following DEBUG message in the logs:

DEBUG KafkaSource: GetOffset: [offsets]

In the end, getOffset creates a KafkaSourceOffset with offsets (as Map[TopicPartition, Long]).
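
Table 4 can be condensed into the following sketch (a sketch only; rateLimit is the internal method described above, passed in as a function here):

import org.apache.kafka.common.TopicPartition

// A sketch only: how maxOffsetsPerTrigger selects the offsets of the next batch
def nextBatchOffsets(
    maxOffsetsPerTrigger: Option[Long],
    latest: Map[TopicPartition, Long],
    initialPartitionOffsets: Map[TopicPartition, Long],
    currentPartitionOffsets: Option[Map[TopicPartition, Long]],
    rateLimit: (Long, Map[TopicPartition, Long], Map[TopicPartition, Long]) => Map[TopicPartition, Long]): Map[TopicPartition, Long] =
  maxOffsetsPerTrigger match {
    case None => latest
    case Some(limit) if currentPartitionOffsets.isEmpty =>
      rateLimit(limit, initialPartitionOffsets, latest)
    case Some(limit) =>
      rateLimit(limit, currentPartitionOffsets.get, latest)
  }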

Creating KafkaSource Instance

KafkaSource takes the following when created:

  • SQLContext

  • KafkaOffsetReader

  • Kafka parameters for executors (executorKafkaParams)

  • Source options

  • Metadata path (the path of the metadata log)

  • KafkaOffsetRangeLimit with the starting offsets

  • failOnDataLoss flag

KafkaSource initializes the internal registries and counters.

Fetching and Verifying Specific Offsets — fetchAndVerify Internal Method

fetchAndVerify(specificOffsets: Map[TopicPartition, Long]): KafkaSourceOffset

fetchAndVerify requests KafkaOffsetReader to fetchSpecificOffsets for the given specificOffsets.

fetchAndVerify makes sure that the starting offsets in specificOffsets are the same as in Kafka and reports a data loss otherwise.

startingOffsets for [tp] was [off] but consumer reset to [result(tp)]

In the end, fetchAndVerify creates a KafkaSourceOffset (with the result of KafkaOffsetReader).
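
A minimal sketch of the verification step (assuming result is the offsets map fetched by KafkaOffsetReader and that the -1/-2 markers for latest/earliest are skipped; illustrative only):

import org.apache.kafka.common.TopicPartition

// A sketch only: report a data loss when Kafka reset a requested starting offset
def verifyStartingOffsets(
    specificOffsets: Map[TopicPartition, Long],
    result: Map[TopicPartition, Long],
    reportDataLoss: String => Unit): Unit =
  specificOffsets.foreach { case (tp, off) =>
    // -1 (latest) and -2 (earliest) are markers, not concrete offsets, so there is nothing to verify
    if (off != -1L && off != -2L && result.get(tp).exists(_ != off)) {
      reportDataLoss(s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
    }
  }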

Note
fetchAndVerify is used exclusively when KafkaSource initializes initial partition offsets.