Caution
|
FIXME
|
DataStreamWriter is a part of the Structured Streaming API.
import org.apache.spark.sql.streaming.ProcessingTime
import scala.concurrent.duration._

val df: DataFrame = ...

df.writeStream
  .queryName("textStream")
  .trigger(ProcessingTime(10.seconds))
  .format("console")
  .start
start to begin a continuous write

DataStreamWriter comes with two start methods that return a StreamingQuery object to continually write data:

start(): StreamingQuery
start(path: String): StreamingQuery // (1)
(1) Sets the path option to path and calls start().
Note
|
start uses StreamingQueryManager.startQuery to create a StreamingQuery.
|
Note
|
Whether or not you have to specify the path option depends on the DataSource in use.
|
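Here is a minimal sketch of the two variants. The parquet format and the /tmp paths are made-up examples, and df is assumed to be a streaming Dataset:

// start() with the path given explicitly as an option
df.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("path", "/tmp/output")
  .start()

// start(path) sets the path option and then calls start()
df.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start("/tmp/output")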
Recognized options (a short sketch follows the note below):
- queryName is the name of the active streaming query.
- checkpointLocation is the directory for checkpointing.
Note
|
It is a new feature of Spark 2.0.0.
|
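Since both are plain options, they can also be set through option. A sketch, with a hypothetical checkpoint directory:

df.writeStream
  .format("console")
  .option("queryName", "textStream")
  .option("checkpointLocation", "/tmp/checkpoints/textStream")
  .start()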
outputMode(outputMode: OutputMode): DataStreamWriter[T]
outputMode specifies the output mode of a streaming Dataset, i.e. what gets written to a streaming sink when new data is available.
Currently, the following output modes are supported:
- OutputMode.Append — only the new rows in the streaming dataset will be written to a sink.
- OutputMode.Complete — the entire streaming dataset (with all the rows) will be written to a sink every time there are updates. It is supported only for streaming queries with aggregations.
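For instance, a minimal sketch of OutputMode.Complete with a streaming aggregation, assuming df is a streaming Dataset with a value column:

import org.apache.spark.sql.streaming.OutputMode

df.groupBy("value")
  .count()
  .writeStream
  .outputMode(OutputMode.Complete())
  .format("console")
  .start()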
queryName(queryName: String): DataStreamWriter[T]
queryName sets the name of a streaming query.

Internally, it is just an additional option with the key queryName.
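As a sketch, the name set here is what the started StreamingQuery (and the StreamingQueryManager available as spark.streams) reports back:

val query = df.writeStream
  .queryName("textStream")
  .format("console")
  .start()

query.name                       // "textStream"
spark.streams.active.map(_.name) // includes "textStream"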
trigger(trigger: Trigger): DataStreamWriter[T]
trigger sets the time interval of the trigger (batch) for a streaming query.
Note
|
Trigger specifies how often results should be produced by a StreamingQuery. See Trigger.
|
The default trigger is ProcessingTime(0L), which runs a streaming query as often as possible.
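As a sketch, the following are equivalent ways to request a 10-second processing-time trigger:

import org.apache.spark.sql.streaming.ProcessingTime
import scala.concurrent.duration._

df.writeStream.trigger(ProcessingTime(10.seconds))
df.writeStream.trigger(ProcessingTime("10 seconds"))
df.writeStream.trigger(ProcessingTime(10000L)) // interval in milliseconds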
Tip
|
Consult Trigger to learn about Trigger and ProcessingTime types.
|