FlatMapGroupsWithStateExec Unary Physical Operator

FlatMapGroupsWithStateExec is a unary physical operator that represents FlatMapGroupsWithState logical operator at execution time.

Note

A unary physical operator (UnaryExecNode) is a physical operator with a single child physical operator.

Read up on UnaryExecNode (and physical operators in general) in The Internals of Spark SQL book.

Note
FlatMapGroupsWithState unary logical operator represents KeyValueGroupedDataset.mapGroupsWithState and KeyValueGroupedDataset.flatMapGroupsWithState operators in a logical query plan.
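mapGroupsWithState produces exactly one value per group, so it can be expressed as a flatMapGroupsWithState whose function wraps that single result in a one-element iterator. KeyValueGroupedDataset.mapGroupsWithState does essentially this before it creates the FlatMapGroupsWithState logical operator. The plain-Scala sketch below shows only the adaptation. SimpleState and toFlatMapFunc are hypothetical stand-ins for illustration and are not Spark's GroupState or API.

```scala
// Hypothetical stand-in for GroupState: a mutable holder of an optional state value
final class SimpleState[S](var value: Option[S] = None)

object MapToFlatMap {
  // Adapts a mapGroupsWithState-style function (one output per group)
  // into a flatMapGroupsWithState-style function (an iterator of outputs per group)
  def toFlatMapFunc[K, V, S, U](
      f: (K, Iterator[V], SimpleState[S]) => U): (K, Iterator[V], SimpleState[S]) => Iterator[U] =
    (key, values, state) => Iterator(f(key, values, state))

  def main(args: Array[String]): Unit = {
    // mapGroupsWithState-style function: running count of values per key
    val countFunc = (key: Long, values: Iterator[Int], state: SimpleState[Long]) => {
      val newCount = state.value.getOrElse(0L) + values.size
      state.value = Some(newCount)
      (key, newCount)
    }
    val flatFunc = toFlatMapFunc(countFunc)
    val state = new SimpleState[Long]()
    println(flatFunc(1L, Iterator(10, 20, 30), state).toList) // List((1,3))
    println(flatFunc(1L, Iterator(40), state).toList)         // List((1,4))
  }
}
```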

FlatMapGroupsWithStateExec is created exclusively when the FlatMapGroupsWithStateStrategy execution planning strategy is requested to plan a streaming query with FlatMapGroupsWithState logical operators for execution.

FlatMapGroupsWithStateExec is a stateful physical operator that can write to a state store. MicroBatchExecution asks it whether to run another batch or not, and the answer depends on the GroupStateTimeout.

FlatMapGroupsWithStateExec is an ObjectProducerExec physical operator with the output object attribute.

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._  // auto-imported in spark-shell; needed for as[(Timestamp, Long)]

// State function: emits every key with the number of values in its group (the state is not used)
val stateFunc = (key: Long, values: Iterator[(Timestamp, Long)], state: GroupState[Long]) => {
  Iterator((key, values.size))
}

val rateGroups = spark.
  readStream.
  format("rate").
  load.
  withWatermark(eventTime = "timestamp", delayThreshold = "10 seconds").  // required for EventTimeTimeout
  as[(Timestamp, Long)].  // switch from DataFrame to Dataset[(Timestamp, Long)]
  groupByKey { case (time, value) => value % 2 }. // creates two groups
  flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.EventTimeTimeout)(stateFunc)  // EventTimeTimeout requires watermark (defined above)

// Check out the physical plan with FlatMapGroupsWithStateExec
scala> rateGroups.explain
== Physical Plan ==
*SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1 AS _1#35L, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#36]
+- FlatMapGroupsWithState <function3>, value#30: bigint, newInstance(class scala.Tuple2), [value#30L], [timestamp#20-T10000ms, value#21L], obj#34: scala.Tuple2, StatefulOperatorStateInfo(<unknown>,63491721-8724-4631-b6bc-3bb1edeb4baf,0,0), class[value[0]: bigint], Update, EventTimeTimeout, 0, 0
   +- *Sort [value#30L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#30L, 200)
         +- AppendColumns <function1>, newInstance(class scala.Tuple2), [input[0, bigint, false] AS value#30L]
            +- EventTimeWatermark timestamp#20: timestamp, interval 10 seconds
               +- StreamingRelation rate, [timestamp#20, value#21L]

// Execute the streaming query
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val sq = rateGroups.
  writeStream.
  format("console").
  trigger(Trigger.ProcessingTime(10.seconds)).
  outputMode(OutputMode.Update).  // must match the Update mode given to flatMapGroupsWithState (Append fails)
  start

// Eventually...
sq.stop

FlatMapGroupsWithStateExec uses the performance metrics of StateStoreWriter.

Figure 1. FlatMapGroupsWithStateExec in web UI (Details for Query)
Tip

Enable ALL logging level for org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec=ALL

Refer to Logging.

FlatMapGroupsWithStateExec as StateStoreWriter

FlatMapGroupsWithStateExec uses the GroupStateTimeout (and possibly the updated metadata) when asked whether to run another batch or not, i.e. when MicroBatchExecution is requested to construct the next streaming micro-batch while running the activated streaming query.

FlatMapGroupsWithStateExec with Streaming Event-Time Watermark Support (WatermarkSupport)

FlatMapGroupsWithStateExec is given the optional event time watermark when created.

The event-time watermark is initially undefined (None) when the operator is planned for execution (in the FlatMapGroupsWithStateStrategy execution planning strategy).

Note

FlatMapGroupsWithStateStrategy converts FlatMapGroupsWithState unary logical operator to FlatMapGroupsWithStateExec physical operator with undefined StatefulOperatorStateInfo, batchTimestampMs, and eventTimeWatermark.

The event-time watermark is only set (to the current event-time watermark of the given OffsetSeqMetadata), together with the StatefulOperatorStateInfo and the batchTimestampMs, when the IncrementalExecution query execution pipeline is requested to apply the state preparation rule (as part of the preparations rules).
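The two-step lifecycle (planned with undefined fields, then filled in by the state preparation rule) can be modeled with a case class and copy, which mirrors how IncrementalExecution updates a FlatMapGroupsWithStateExec. This is a plain-Scala sketch under simplifying assumptions. BatchMetadata, StateInfo, FlatMapExecModel and the checkpoint path are hypothetical and only approximate OffsetSeqMetadata, StatefulOperatorStateInfo and the physical operator.

```scala
// Hypothetical, simplified stand-ins; none of these classes are Spark's
final case class BatchMetadata(batchWatermarkMs: Long, batchTimestampMs: Long)

final case class StateInfo(checkpointLocation: String, operatorId: Long, storeVersion: Long)

final case class FlatMapExecModel(
    stateInfo: Option[StateInfo],
    batchTimestampMs: Option[Long],
    eventTimeWatermark: Option[Long])

object StatePreparationSketch {
  // Planning time: the strategy leaves all three fields undefined
  def plan(): FlatMapExecModel = FlatMapExecModel(None, None, None)

  // Preparation time: fill the fields in from the current batch metadata
  def prepare(op: FlatMapExecModel, info: StateInfo, meta: BatchMetadata): FlatMapExecModel =
    op.copy(
      stateInfo = Some(info),
      batchTimestampMs = Some(meta.batchTimestampMs),
      eventTimeWatermark = Some(meta.batchWatermarkMs))

  def main(args: Array[String]): Unit = {
    val planned = plan()
    // "/tmp/checkpoint/state" is a made-up checkpoint location
    val prepared = prepare(planned, StateInfo("/tmp/checkpoint/state", 0L, 1L), BatchMetadata(5000L, 12000L))
    println(planned.eventTimeWatermark)  // None
    println(prepared.eventTimeWatermark) // Some(5000)
  }
}
```

Using an immutable case class with copy keeps the planned operator untouched, so every micro-batch gets its own prepared copy.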

Note

The preparations rules are executed (applied to a physical query plan) at the executedPlan phase of the Structured Query Execution Pipeline to generate an optimized physical query plan ready for execution.

IncrementalExecution is used as the lastExecution of the available streaming query execution engines. It is created in the queryPlanning phase (of the MicroBatchExecution and ContinuousExecution execution engines) based on the current OffsetSeqMetadata.

Note
The optional event-time watermark can only be defined when the state preparation rule is executed, i.e. at the executedPlan phase of the Structured Query Execution Pipeline, which is also part of the queryPlanning phase.

keyExpressions Method

keyExpressions: Seq[Attribute]
Note
keyExpressions is part of the WatermarkSupport Contract to…FIXME.

keyExpressions simply returns the grouping attributes.

Executing Physical Operator (Generating RDD[InternalRow]) — doExecute Method

doExecute(): RDD[InternalRow]
Note
doExecute is part of the SparkPlan Contract to generate the runtime representation of a physical operator as a distributed computation over internal binary rows on Apache Spark (i.e. RDD[InternalRow]).

Internally, doExecute initializes metrics.

doExecute then executes the child physical operator and creates a StateStoreRDD with a storeUpdateFunction that:

  1. Creates a StateStoreUpdater

  2. Filters out rows from Iterator[InternalRow] that match watermarkPredicateForData (when defined and timeoutConf is EventTimeTimeout)

  3. Generates an output Iterator[InternalRow] with elements from StateStoreUpdater's updateStateForKeysWithData and updateStateForTimedOutKeys

  4. In the end, storeUpdateFunction creates a CompletionIterator that executes a completion function (aka completionFunction) once all the elements have been iterated over (i.e. when a client has consumed all the rows). The completion function requests the StateStore to commit and then updates the numTotalStateRows metric with the number of keys in the state store.
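The four storeUpdateFunction steps can be illustrated in plain Scala. This is a sketch, not Spark's implementation. InputRow, CompletingIterator and the mutable maps are hypothetical stand-ins for InternalRow, Spark's internal CompletionIterator, the StateStore and the metrics. The timeout rule in step 3b, where keys with no data in the batch time out, is also made up to keep the example short. The sketch does preserve two properties of the described design: output for timed-out keys is produced lazily, only after the keys with data are exhausted, and the "commit" runs only once the consumer has drained the iterator.

```scala
import scala.collection.mutable

// Hypothetical input row: a grouping key plus an event time in milliseconds
final case class InputRow(key: String, eventTimeMs: Long)

// Runs onComplete exactly once, when the consumer finds no more elements
// (a stand-in for Spark's internal CompletionIterator)
final class CompletingIterator[A](underlying: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false
  def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) { completed = true; onComplete() }
    more
  }
  def next(): A = underlying.next()
}

object StoreUpdateSketch {
  def storeUpdate(
      rows: Iterator[InputRow],
      watermarkMs: Option[Long],        // None: no watermark predicate for data
      store: mutable.Map[String, Long], // key -> count, standing in for the StateStore
      metrics: mutable.Map[String, Long]): Iterator[String] = {
    // Step 2: filter out late rows (event time at or before the watermark)
    val filtered = watermarkMs.fold(rows)(wm => rows.filter(_.eventTimeMs > wm))
    val grouped = filtered.toList.groupBy(_.key)

    // Step 3a: update the state of every key that has data in this batch
    val withData = grouped.iterator.map { case (k, rs) =>
      store(k) = store.getOrElse(k, 0L) + rs.size
      s"$k=${store(k)}"
    }
    // Step 3b: hypothetical timeout rule: keys without data in this batch time out
    def timedOut: Iterator[String] =
      store.keys.toList.filterNot(grouped.contains).iterator.map { k =>
        store.remove(k)
        s"$k timed out"
      }

    // Step 4: once all output is consumed, "commit" and record the number of keys
    new CompletingIterator[String](withData ++ timedOut, () => {
      metrics("numTotalStateRows") = store.size.toLong
    })
  }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map("c" -> 5L)
    val metrics = mutable.Map.empty[String, Long]
    val rows = Iterator(InputRow("a", 1000L), InputRow("a", 3000L), InputRow("b", 500L))
    val out = storeUpdate(rows, Some(800L), store, metrics).toList
    println(out.sorted)                   // List(a=2, c timed out)
    println(metrics("numTotalStateRows")) // 1
  }
}
```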

Creating FlatMapGroupsWithStateExec Instance

FlatMapGroupsWithStateExec takes the following to be created:

  • State function ((Any, Iterator[Any], LogicalGroupState[Any]) ⇒ Iterator[Any])

  • Key deserializer expression

  • Value deserializer expression

  • Grouping attributes (as used for grouping in KeyValueGroupedDataset for mapGroupsWithState or flatMapGroupsWithState operators)

  • Data attributes

  • Output object attribute (that is the reference to the single object field this operator outputs)

  • StatefulOperatorStateInfo

  • State encoder (ExpressionEncoder[Any])

  • State format version

  • OutputMode

  • GroupStateTimeout

  • batchTimestampMs

  • Optional event time watermark

  • Child physical operator

FlatMapGroupsWithStateExec initializes the internal properties.

shouldRunAnotherBatch Method

shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean
Note
shouldRunAnotherBatch is part of the StateStoreWriter Contract to check whether MicroBatchExecution should run another batch (based on the updated metadata).

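shouldRunAnotherBatch branches off per the GroupStateTimeout as follows:

  • ProcessingTimeTimeout: always true

  • EventTimeTimeout: true only when the event-time watermark is defined and the event-time watermark of the given OffsetSeqMetadata (batchWatermarkMs) is greater than it

  • NoTimeout: false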
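The per-timeout decision can be modeled in plain Scala as shown below. This is a sketch, not Spark's code. TimeoutConf and its case objects are hypothetical stand-ins for GroupStateTimeout. The EventTimeTimeout branch assumes Spark compares the new batch watermark strictly against the watermark the operator was prepared with.

```scala
// Hypothetical stand-ins for GroupStateTimeout; not Spark's classes
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

object ShouldRunAnotherBatchSketch {
  // eventTimeWatermark: the watermark this operator was prepared with (if any)
  // newBatchWatermarkMs: the event-time watermark of the updated batch metadata
  def shouldRunAnotherBatch(
      timeoutConf: TimeoutConf,
      eventTimeWatermark: Option[Long],
      newBatchWatermarkMs: Long): Boolean = timeoutConf match {
    case ProcessingTimeTimeout => true // timeouts may fire even without new data
    case EventTimeTimeout      => eventTimeWatermark.exists(newBatchWatermarkMs > _)
    case NoTimeout             => false
  }

  def main(args: Array[String]): Unit = {
    println(shouldRunAnotherBatch(EventTimeTimeout, Some(1000L), 2000L)) // true
    println(shouldRunAnotherBatch(EventTimeTimeout, Some(2000L), 2000L)) // false
    println(shouldRunAnotherBatch(NoTimeout, Some(1000L), 2000L))        // false
  }
}
```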

Internal Properties

  • isTimeoutEnabled

  • stateAttributes

  • stateDeserializer

  • stateManager

  • stateSerializer

  • timestampTimeoutAttribute

  • watermarkPresent: flag that says whether the child physical operator has a watermark attribute (among the output attributes). Used exclusively when InputProcessor is requested to callFunctionAndUpdateState.
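The watermarkPresent check can be modeled as a scan over the child's output attributes. This plain-Scala sketch assumes that an attribute carries a watermark when its metadata contains the watermark delay key, with the key name "spark.watermarkDelayMs" taken from Spark's EventTimeWatermark.delayKey. Attr and WatermarkPresentSketch are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-in for an output attribute with its metadata
final case class Attr(name: String, metadata: Map[String, Long])

object WatermarkPresentSketch {
  val DelayKey = "spark.watermarkDelayMs" // assumed name of the delay metadata key

  // true when any output attribute of the child carries the watermark delay metadata
  def watermarkPresent(childOutput: Seq[Attr]): Boolean =
    childOutput.exists(_.metadata.contains(DelayKey))

  def main(args: Array[String]): Unit = {
    val withWm = Seq(Attr("timestamp", Map(DelayKey -> 10000L)), Attr("value", Map.empty))
    val withoutWm = Seq(Attr("timestamp", Map.empty), Attr("value", Map.empty))
    println(watermarkPresent(withWm))    // true
    println(watermarkPresent(withoutWm)) // false
  }
}
```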