diff --git a/docs/scheduler/ResultStage.md b/docs/scheduler/ResultStage.md
index 0b77cf1efa..a0cb04e8b1 100644
--- a/docs/scheduler/ResultStage.md
+++ b/docs/scheduler/ResultStage.md
@@ -1,6 +1,6 @@
 # ResultStage
 
-`ResultStage` is the final stage in a job that applies a function on one or many partitions of the target RDD to compute the result of an action.
+`ResultStage` is the final stage in a job that applies a function to one or more partitions of the target RDD to compute the result of an action.
 
 ![Job creates ResultStage as the first stage](../images/scheduler/dagscheduler-job-resultstage.png)
diff --git a/docs/scheduler/ShuffleMapStage.md b/docs/scheduler/ShuffleMapStage.md
index a8351c1d6b..bb7bc29f4a 100644
--- a/docs/scheduler/ShuffleMapStage.md
+++ b/docs/scheduler/ShuffleMapStage.md
@@ -1,15 +1,10 @@
 # ShuffleMapStage
 
-`ShuffleMapStage` is a [Stage](Stage.md).
-
-`ShuffleMapStage` (_shuffle map stage_ or simply _map stage_) is one of the two types of [Stage](Stage.md)s in a physical execution DAG (beside a [ResultStage](ResultStage.md)).
-
-!!! note
-    The **logical DAG** or **logical execution plan** is the [RDD lineage](../rdd/lineage.md).
+`ShuffleMapStage` (_shuffle map stage_ or simply _map stage_) is a [Stage](Stage.md).
 
 `ShuffleMapStage` corresponds to (and is associated with) a [ShuffleDependency](#shuffleDep).
 
-`ShuffleMapStage` can be [submitted independently](DAGScheduler.md#submitMapStage) (from a [ResultStage](ResultStage.md)).
+`ShuffleMapStage` can be [submitted independently](DAGScheduler.md#submitMapStage), but it is usually an intermediate step in a physical execution plan (with the final step being a [ResultStage](ResultStage.md)).
 
 ## Creating Instance
@@ -140,3 +135,7 @@ rdd.count // (3)
 1. Shuffle at `sortByKey()`
 1. Submits a job with two stages (and two to be executed)
 1. Intentionally repeat the last action that submits a new job with two stages with one being shared as already-computed
+
+## Map Output Files
+
+`ShuffleMapStage` writes out **map output files** for a shuffle.
diff --git a/docs/scheduler/Stage.md b/docs/scheduler/Stage.md
index 389920c712..6fb2d9bba7 100644
--- a/docs/scheduler/Stage.md
+++ b/docs/scheduler/Stage.md
@@ -1,78 +1,60 @@
 # Stage
 
-`Stage` is a unit of execution (_step_) in a physical execution plan.
+`Stage` is an [abstraction](#contract) of [steps](#implementations) in a physical execution plan.
 
-A stage is a set of parallel tasks -- one task per partition (of an RDD that computes partial results of a function executed as part of a Spark job).
-
-![Stage, tasks and submitting a job](../images/scheduler/stage-tasks.png)
-
-In other words, a Spark job is a computation with that computation sliced into stages.
-
-A stage is uniquely identified by `id`. When a stage is created, DAGScheduler.md[DAGScheduler] increments internal counter `nextStageId` to track the number of DAGScheduler.md#submitStage[stage submissions].
-
-[[rdd]]
-A stage can only work on the partitions of a single RDD (identified by `rdd`), but can be associated with many other dependent parent stages (via internal field `parents`), with the boundary of a stage marked by shuffle dependencies.
-
-Submitting a stage can therefore trigger execution of a series of dependent parent stages (refer to DAGScheduler.md#runJob[RDDs, Job Execution, Stages, and Partitions]).
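+For example, the following spark-shell session (a minimal sketch; it assumes the `sc` of `spark-shell`, and the exact RDD ids and partition counts depend on your environment) runs a job whose physical execution plan has two steps: a [ShuffleMapStage](ShuffleMapStage.md) followed by a [ResultStage](ResultStage.md).
+
+```scala
+val pairs  = sc.parallelize(0 to 9).map(n => (n % 3, 1)) // narrow transformation (no shuffle)
+val counts = pairs.reduceByKey(_ + _)                    // shuffle dependency (marks a stage boundary)
+
+counts.toDebugString // indentation marks the shuffle boundary (the parent map stage)
+counts.count         // submits a job with two stages
+```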
-
-![Submitting a job triggers execution of the stage and its parent stages](../images/scheduler/job-stage.png)
-
-Finally, every stage has a `firstJobId` that is the id of the job that submitted the stage.
-
-There are two types of stages:
-
-* ShuffleMapStage.md[ShuffleMapStage] is an intermediate stage (in the execution DAG) that produces data for other stage(s). It writes *map output files* for a shuffle. It can also be the final stage in a job in DAGScheduler.md#adaptive-query-planning[Adaptive Query Planning / Adaptive Scheduling].
-* ResultStage.md[ResultStage] is the final stage that executes rdd:index.md#actions[a Spark action] in a user program by running a function on an RDD.
-
-When a job is submitted, a new stage is created with the parent ShuffleMapStage.md[ShuffleMapStage] linked -- they can be created from scratch or linked to, i.e. shared, if other jobs use them already.
+!!! note
+    The **logical DAG** or **logical execution plan** is the [RDD lineage](../rdd/lineage.md).
 
-![DAGScheduler and Stages for a job](../images/scheduler/scheduler-job-shuffles-result-stages.png)
+In practice, a `Stage` is a set of parallel tasks - one task per partition (of an RDD that computes partial results of a function executed as part of a Spark job).
 
-A stage tracks the jobs (their ids) it belongs to (using the internal `jobIds` registry).
+![Stage, tasks and submitting a job](../images/scheduler/stage-tasks.png)
 
-`DAGScheduler` splits up a job into a collection of stages. Each stage contains a sequence of [narrow transformations](../rdd/index.md) that can be completed without shuffling the entire data set, separated at **shuffle boundaries** (where shuffle occurs). Stages are thus a result of breaking the RDD graph at shuffle boundaries.
+In other words, a Spark job is a computation "sliced" into stages (_partitioned_ being a term reserved for RDDs).
 
-![Graph of Stages](../images/scheduler/dagscheduler-stages.png)
+## Contract
 
-Shuffle boundaries introduce a barrier where stages/tasks must wait for the previous stage to finish before they fetch map outputs.
+### Missing Partitions
 
-![DAGScheduler splits a job into stages](../images/scheduler/scheduler-job-splits-into-stages.png)
+```scala
+findMissingPartitions(): Seq[Int]
+```
 
-RDD operations with rdd:index.md[narrow dependencies], like `map()` and `filter()`, are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages, i.e. one to write a set of map output files, and another to read those files after a barrier.
+Missing partitions (IDs of the partitions of the [RDD](#rdd) that are missing and need to be computed)
 
-In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the `RDD.compute()` functions of various RDDs, e.g. `MappedRDD`, `FilteredRDD`, etc.
+Used when:
 
-At some point of time in a stage's life, every partition of the stage gets transformed into a task - ShuffleMapTask.md[ShuffleMapTask] or ResultTask.md[ResultTask] for ShuffleMapStage.md[ShuffleMapStage] and ResultStage.md[ResultStage], respectively.
+* `DAGScheduler` is requested to [submit missing tasks](DAGScheduler.md#submitMissingTasks)
 
-Partitions are computed in jobs, and result stages may not always need to compute all partitions in their target RDD, e.g. for actions like `first()` and `lookup()`.
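+The sketch below is for illustration only (it is not the Spark sources, and all names are made up); it shows the intent of the contract: a stage reports the partitions that still have to be computed so that `DAGScheduler` can create tasks only for those.
+
+```scala
+// Hypothetical, simplified model of the contract
+abstract class SketchStage(val numPartitions: Int) {
+  def findMissingPartitions(): Seq[Int]
+}
+
+// A result-like stage could consider a partition missing until its result has been produced
+class SketchResultStage(numPartitions: Int, finished: Array[Boolean])
+  extends SketchStage(numPartitions) {
+  override def findMissingPartitions(): Seq[Int] =
+    (0 until numPartitions).filter(id => !finished(id))
+}
+```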
+## Implementations -`DAGScheduler` prints the following INFO message when there are tasks to submit: +* [ResultStage](ResultStage.md) +* [ShuffleMapStage](ShuffleMapStage.md) -``` -Submitting 1 missing tasks from ResultStage 36 (ShuffledRDD[86] at reduceByKey at :24) -``` +## Creating Instance -There is also the following DEBUG message with pending partitions: +`Stage` takes the following to be created: -``` -New pending partitions: Set(0) -``` +* [Stage ID](#id) +* [RDD](#rdd) +* Number of tasks +* Parent `Stage`s +* First Job ID +* `CallSite` +* Resource Profile ID -Tasks are later submitted to TaskScheduler.md[Task Scheduler] (via `taskScheduler.submitTasks`). +!!! note "Abstract Class" + `Stage` is an abstract class and cannot be created directly. It is created indirectly for the [concrete Stages](#implementations). -When no tasks in a stage can be submitted, the following DEBUG message shows in the logs: +## RDD -``` -FIXME -``` +`Stage` is given a [RDD](../rdd/RDD.md) when [created](#creating-instance). -## Latest StageInfo Registry +## Stage ID -```scala -_latestInfo: StageInfo -``` +`Stage` is given an unique ID when [created](#creating-instance). -`Stage` uses `_latestInfo` internal registry for...FIXME +!!! note + `DAGScheduler` uses [nextStageId](DAGScheduler.md#nextStageId) internal counter to track the number of [stage submissions](DAGScheduler.md#submitStage). ## Making New Stage Attempt @@ -94,74 +76,3 @@ In the end, `makeNewStageAttempt` increments the [nextAttemptId](#nextAttemptId) `makeNewStageAttempt` is used when: * `DAGScheduler` is requested to [submit the missing tasks of a stage](DAGScheduler.md#submitMissingTasks) - -## Others to be Reviewed - -== [[findMissingPartitions]] Finding Missing Partitions - -[source, scala] ----- -findMissingPartitions(): Seq[Int] ----- - -findMissingPartitions gives the partition ids that are missing and need to be computed. - -findMissingPartitions is used when DAGScheduler is requested to DAGScheduler.md#submitMissingTasks[submitMissingTasks] and DAGScheduler.md#handleTaskCompletion[handleTaskCompletion]. - -== [[failedOnFetchAndShouldAbort]] `failedOnFetchAndShouldAbort` Method - -`Stage.failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean` checks whether the number of fetch failed attempts (using `fetchFailedAttemptIds`) exceeds the number of consecutive failures allowed for a given stage (that should then be aborted) - -NOTE: The number of consecutive failures for a stage is not configurable. - -== [[latestInfo]] Getting StageInfo For Most Recent Attempt - -[source, scala] ----- -latestInfo: StageInfo ----- - -`latestInfo` simply returns the <<_latestInfo, most recent `StageInfo`>> (i.e. makes it accessible). - -== [[internal-properties]] Internal Properties - -[cols="30m,70",options="header",width="100%"] -|=== -| Name -| Description - -| [[details]] `details` -| Long description of the stage - -Used when...FIXME - -| [[fetchFailedAttemptIds]] `fetchFailedAttemptIds` -| FIXME - -Used when...FIXME - -| [[jobIds]] `jobIds` -| Set of spark-scheduler-ActiveJob.md[jobs] the stage belongs to. - -Used when...FIXME - -| [[name]] `name` -| Name of the stage - -Used when...FIXME - -| [[nextAttemptId]] `nextAttemptId` -| The ID for the next attempt of the stage. 
-
-Used when...FIXME
-
-| [[numPartitions]] `numPartitions`
-| Number of partitions
-
-Used when...FIXME
-
-| [[pendingPartitions]] `pendingPartitions`
-| Set of pending spark-rdd-partitions.md[partitions]
-
-Used when...FIXME
-|===
diff --git a/docs/scheduler/index.md b/docs/scheduler/index.md
index c65c0406e3..976e00cf03 100644
--- a/docs/scheduler/index.md
+++ b/docs/scheduler/index.md
@@ -4,6 +4,26 @@
 
 Spark Scheduler uses the high-level stage-oriented [DAGScheduler](DAGScheduler.md) and the low-level task-oriented [TaskScheduler](TaskScheduler.md).
 
+## Stage Execution
+
+Every partition of a [Stage](Stage.md) is transformed into a [Task](Task.md) ([ShuffleMapTask](ShuffleMapTask.md) or [ResultTask](ResultTask.md) for [ShuffleMapStage](ShuffleMapStage.md) and [ResultStage](ResultStage.md), respectively).
+
+Submitting a stage can therefore trigger execution of a series of dependent parent stages.
+
+![Submitting a job triggers execution of the stage and its parent stages](../images/scheduler/job-stage.png)
+
+When a Spark job is submitted, new stages are created (they can be created from scratch or linked to, i.e. shared, if other jobs use them already).
+
+![DAGScheduler and Stages for a job](../images/scheduler/scheduler-job-shuffles-result-stages.png)
+
+`DAGScheduler` splits up a job into a collection of [Stage](Stage.md)s. A `Stage` contains a sequence of [narrow transformations](../rdd/index.md) that can be completed without shuffling the entire data set, separated at **shuffle boundaries** (where a shuffle occurs). Stages are thus a result of breaking the RDD graph at shuffle boundaries.
+
+![Graph of Stages](../images/scheduler/dagscheduler-stages.png)
+
+Shuffle boundaries introduce a barrier where stages/tasks must wait for the previous stage to finish before they fetch map outputs.
+
+![DAGScheduler splits a job into stages](../images/scheduler/scheduler-job-splits-into-stages.png)
+
 ## Resources
 
 * [Deep Dive into the Apache Spark Scheduler](https://databricks.com/session/apache-spark-scheduler) by Xingbo Jiang (Databricks)
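+
+## Demo: Watching Stage Execution
+
+The following spark-shell snippet is a minimal sketch of the [Stage Execution](#stage-execution) flow described above (it assumes an active `SparkContext` available as `sc`; stage ids, names and task counts depend on your environment). A registered `SparkListener` prints every completed stage together with its number of tasks (one task per partition).
+
+```scala
+import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
+
+sc.addSparkListener(new SparkListener {
+  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    val info = event.stageInfo
+    println(s"Stage ${info.stageId} (${info.name}) completed with ${info.numTasks} tasks")
+  }
+})
+
+// distinct() requires a shuffle, so expect two stages:
+// a ShuffleMapStage writing map output files, followed by a ResultStage computing count().
+sc.parallelize(1 to 100, 4).distinct().count()
+```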