Kafka Data Source is the streaming data source for Apache Kafka in Spark Structured Streaming.
Kafka Data Source can be used as a streaming source and a streaming sink in micro-batch and continuous stream processing.
Kafka Data Source is part of the spark-sql-kafka-0-10 external module that is distributed with the official distribution of Apache Spark, but it is not included in the CLASSPATH by default.
You should define the spark-sql-kafka-0-10 module as part of the build definition in your Spark project, e.g. as a libraryDependency in build.sbt for sbt:
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "{{ book.version }}"
For Spark environments like spark-submit (and "derivatives" like spark-shell), you should use the --packages command-line option:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:{{ book.version }}
Note: Replace the version of the spark-sql-kafka-0-10 module (e.g. {{ book.version }} above) with one of the available versions found at The Central Repository’s Search that matches your version of Apache Spark.
With spark-sql-kafka-0-10 module you can use kafka data source format for reading records from one or more Kafka topics as a streaming Dataset.
val records = spark
.readStream
.format("kafka")
.option("subscribePattern", """topic-\d{2}""") // topics with two digits at the end
.option("kafka.bootstrap.servers", ":9092")
.load
Internally, the kafka data source format for reading is available through KafkaSourceProvider that is a MicroBatchReadSupport and ContinuousReadSupport for micro-batch and continuous stream processing, respectively.
Kafka Data Source uses a predefined (fixed) schema.
Name | Type
---|---
key | binary
value | binary
topic | string
partition | integer
offset | long
timestamp | timestamp
timestampType | integer
scala> records.printSchema
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
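Besides the binary key and value payload columns, the fixed schema gives access to per-record Kafka metadata (topic, partition, offset, timestamp). A minimal sketch that keeps the metadata next to a human-readable value (column names come straight from the schema above):

val withMetadata = records
  .select(
    $"topic",
    $"partition",
    $"offset",
    $"timestamp",
    $"value" cast "string" as "value") // payload deserialized to a string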
Internally, the fixed schema is defined as part of the DataSourceReader
contract through MicroBatchReader and ContinuousReader extension contracts for micro-batch and continuous stream processing, respectively.
Tip: Read up on DataSourceReader in The Internals of Spark SQL book.
Tip: Use the cast operator (of Column) to deserialize the binary value column to a string:

scala> :type records
org.apache.spark.sql.DataFrame

val values = records
  .select($"value" cast "string") // deserializing values

scala> values.printSchema
root
 |-- value: string (nullable = true)
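When the value payload is JSON-encoded, the string can be parsed further with from_json. A minimal sketch, assuming a hypothetical message schema with id and name fields:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Hypothetical schema of the JSON-encoded message values
val valueSchema = new StructType()
  .add("id", LongType)
  .add("name", StringType)

val parsed = records
  .select(from_json($"value" cast "string", valueSchema) as "v") // parse the JSON payload
  .select("v.*")                                                 // flatten into top-level columns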
With spark-sql-kafka-0-10 module you can use kafka data source format for writing the result of executing a streaming query (a streaming Dataset) to one or more Kafka topics.
val sq = records
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", ":9092")
.option("topic", "kafka2console-output")
.option("checkpointLocation", "checkpointLocation-kafka2console")
.start
Internally, the kafka data source format for writing is available through KafkaSourceProvider that is a StreamWriteSupport.
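The streaming Dataset written out this way is expected to have a value column (string or binary); key and topic columns are optional (the topic can also be set with the topic option as above). A minimal sketch of preparing such columns, with to_json as just one possible serialization:

import org.apache.spark.sql.functions.{struct, to_json}

val forKafka = records
  .select(
    $"key",                                                        // optional message key
    to_json(struct($"topic", $"partition", $"offset")) as "value") // required message value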
Kafka Data Source supports Micro-Batch Stream Processing (i.e. Trigger.Once and Trigger.ProcessingTime triggers) via KafkaMicroBatchReader.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
.readStream
.format("kafka")
.option("subscribepattern", "kafka2console.*")
.option("kafka.bootstrap.servers", ":9092")
.load
.withColumn("value", $"value" cast "string") // deserializing values
.writeStream
.format("console")
.option("truncate", false) // format-specific option
.option("checkpointLocation", "checkpointLocation-kafka2console") // generic query option
.trigger(Trigger.ProcessingTime(30.seconds))
.queryName("kafka2console-microbatch")
.start
// In the end, stop the streaming query
sq.stop
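A one-off micro-batch run of the same pipeline could use Trigger.Once instead (a minimal variation of the example above):

import org.apache.spark.sql.streaming.Trigger

val once = spark
  .readStream
  .format("kafka")
  .option("subscribepattern", "kafka2console.*")
  .option("kafka.bootstrap.servers", ":9092")
  .load
  .withColumn("value", $"value" cast "string") // deserializing values
  .writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "checkpointLocation-kafka2console-once")
  .trigger(Trigger.Once()) // process all available offsets in a single micro-batch, then stop
  .queryName("kafka2console-once")
  .start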
Kafka Data Source can assign a single task per Kafka partition (using KafkaOffsetRangeCalculator in Micro-Batch Stream Processing).
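The one-task-per-partition default can be changed with the minPartitions option (described in the table below): when it is greater than the number of Kafka partitions, KafkaOffsetRangeCalculator splits offset ranges across more Spark partitions. A minimal sketch (the topic name is made up):

val balanced = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", ":9092")
  .option("minPartitions", "16") // ask for at least 16 Spark partitions
  .load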
Kafka Data Source can reuse a Kafka consumer (using KafkaMicroBatchReader in Micro-Batch Stream Processing).
Kafka Data Source supports Continuous Stream Processing (i.e. Trigger.Continuous trigger) via KafkaContinuousReader.
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark
.readStream
.format("kafka")
.option("subscribepattern", "kafka2console.*")
.option("kafka.bootstrap.servers", ":9092")
.load
.withColumn("value", $"value" cast "string") // convert bytes to string for display purposes
.writeStream
.format("console")
.option("truncate", false) // format-specific option
.option("checkpointLocation", "checkpointLocation-kafka2console") // generic query option
.queryName("kafka2console-continuous")
.trigger(Trigger.Continuous(10.seconds))
.start
// In the end, stop the streaming query
sq.stop
Note: Options with the kafka. prefix (e.g. kafka.bootstrap.servers) are considered configuration properties for the Kafka consumers used on the driver and executors.
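For example, any kafka.-prefixed option is handed (with the prefix stripped) to the Kafka consumers; security.protocol and max.partition.fetch.bytes below are standard Kafka consumer properties used only for illustration:

val tuned = spark
  .readStream
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", ":9092")
  .option("kafka.security.protocol", "PLAINTEXT")       // becomes security.protocol
  .option("kafka.max.partition.fetch.bytes", "1048576") // becomes max.partition.fetch.bytes
  .load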
Option | Description
---|---
assign | Topic subscription strategy that accepts a JSON with topic names and partitions, e.g. {"topicA":[0,1],"topicB":[0,1]}
failOnDataLoss | Flag to control whether a streaming query should fail when it is possible that data was lost (e.g. topics are deleted or offsets are out of range). Default: true
kafka.bootstrap.servers | (required) Comma-separated list of host:port entries of Kafka brokers, i.e. the bootstrap.servers configuration property of the Kafka consumers
kafkaConsumer.pollTimeoutMs | Timeout (in milliseconds) to poll records from Kafka. Default: spark.network.timeout (or 120s)
maxOffsetsPerTrigger | Maximum number of records to read per trigger (micro-batch). Default: (undefined)
minPartitions | Minimum number of partitions of the streaming Dataset. Default: (undefined). Must be undefined (default) or greater than 0. When undefined (default) or smaller than the number of Kafka TopicPartitions, one Spark partition is used per TopicPartition; otherwise offset ranges are split to reach the requested number of partitions
startingOffsets | Starting offsets to read from. Default: latest. Possible values: latest, earliest, or a JSON with a starting offset per TopicPartition
subscribe | Topic subscription strategy that accepts topic names as a comma-separated string, e.g. topic1,topic2
subscribePattern | Topic subscription strategy that uses Java’s java.util.regex.Pattern for the topic subscription regex pattern of topics to subscribe to, e.g. topic\d
topic | Kafka topic to write records to (for the Kafka sink); optional when every record carries a topic column
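The three topic subscription strategies (assign, subscribe, subscribePattern) are mutually exclusive and exactly one is expected. A minimal sketch of assign together with JSON-based startingOffsets (topic names, partitions and offsets are made up; -2 stands for earliest and -1 for latest):

val assigned = spark
  .readStream
  .format("kafka")
  .option("assign", """{"topicA":[0,1],"topicB":[0]}""")
  .option("startingOffsets", """{"topicA":{"0":23,"1":-2},"topicB":{"0":-1}}""")
  .option("kafka.bootstrap.servers", ":9092")
  .load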
When DataStreamReader
is requested to load a dataset with kafka data source format, it creates a DataFrame with a StreamingRelationV2 leaf logical operator.
scala> records.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@1a366d0, kafka, Map(maxOffsetsPerTrigger -> 1, startingOffsets -> latest, subscribepattern -> topic\d, kafka.bootstrap.servers -> :9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@39b3de87,kafka,List(),None,List(),None,Map(maxOffsetsPerTrigger -> 1, startingOffsets -> latest, subscribepattern -> topic\d, kafka.bootstrap.servers -> :9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
...
When DataStreamWriter
is requested to start a streaming query with kafka data source format for writing, it requests the StreamingQueryManager
to create a streaming query that in turn creates (a StreamingQueryWrapper with) a ContinuousExecution or a MicroBatchExecution for continuous and micro-batch stream processing, respectively.
scala> sq.explain(extended = true)
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@bf98b73
+- Project [key#28 AS key#7, value#29 AS value#8, topic#30 AS topic#9, partition#31 AS partition#10, offset#32L AS offset#11L, timestamp#33 AS timestamp#12, timestampType#34 AS timestampType#13]
+- Streaming RelationV2 kafka[key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34] (Options: [subscribepattern=kafka2console.*,kafka.bootstrap.servers=:9092])