Commit

Stage
jaceklaskowski committed Nov 13, 2021
1 parent 666088a commit adc8dec
Showing 4 changed files with 61 additions and 131 deletions.
docs/scheduler/ResultStage.md (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
# ResultStage

`ResultStage` is the final stage in a job that applies a function on one or many partitions of the target RDD to compute the result of an action.
`ResultStage` is the final stage in a job that applies a function to one or many partitions of the target RDD to compute the result of an action.

![Job creates ResultStage as the first stage](../images/scheduler/dagscheduler-job-resultstage.png)
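
For illustration, a minimal sketch (not part of this commit): a job whose lineage contains only narrow transformations has no shuffle boundary, so the action submits a single `ResultStage` that runs one task per partition of the target RDD.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Standalone sketch; in spark-shell, use the provided `sc` instead.
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("result-stage-sketch"))

// map and filter are narrow transformations, so there is no shuffle boundary:
// count() submits a job with exactly one stage (a ResultStage) that applies
// the action's function to every partition of the target RDD.
val rdd = sc.parallelize(1 to 100, numSlices = 4).map(_ * 2).filter(_ % 3 == 0)
rdd.count()
```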

docs/scheduler/ShuffleMapStage.md (13 changes: 6 additions & 7 deletions)
@@ -1,15 +1,10 @@
# ShuffleMapStage

`ShuffleMapStage` is a [Stage](Stage.md).

`ShuffleMapStage` (_shuffle map stage_ or simply _map stage_) is one of the two types of [Stage](Stage.md)s in a physical execution DAG (beside a [ResultStage](ResultStage.md)).

!!! note
The **logical DAG** or **logical execution plan** is the [RDD lineage](../rdd/lineage.md).
`ShuffleMapStage` (_shuffle map stage_ or simply _map stage_) is a [Stage](Stage.md).

`ShuffleMapStage` corresponds to (and is associated with) a [ShuffleDependency](#shuffleDep).

`ShuffleMapStage` can be [submitted independently](DAGScheduler.md#submitMapStage) (from a [ResultStage](ResultStage.md)).
`ShuffleMapStage` can be [submitted independently](DAGScheduler.md#submitMapStage) but it is usually an intermediate step in a physical execution plan (with the final step being a [ResultStage](ResultStage.md)).

## Creating Instance

@@ -140,3 +135,7 @@ rdd.count // (3)
1. Shuffle at `sortByKey()`
1. Submits a job with two stages (and two to be executed)
1. Intentionally repeats the last action, which submits a new job with two stages, one of which is shared as already computed

## Map Output Files

`ShuffleMapStage` writes out **map output files** (for a shuffle).
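
A hedged example (not part of this commit; assumes spark-shell where `sc` is the `SparkContext`): any shuffle-producing transformation gives the job a `ShuffleMapStage` whose tasks write the map output files that the next stage fetches.

```scala
// reduceByKey introduces a ShuffleDependency, so this job runs as two stages:
// a ShuffleMapStage whose ShuffleMapTasks write map output files (one set per task),
// then a ResultStage whose tasks fetch those files to compute collect().
val pairs = sc.parallelize(1 to 100, 4).map(n => (n % 10, n))
pairs.reduceByKey(_ + _).collect()
```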
docs/scheduler/Stage.md (157 changes: 34 additions & 123 deletions)
@@ -1,78 +1,60 @@
# Stage

`Stage` is a unit of execution (_step_) in a physical execution plan.
`Stage` is an [abstraction](#contract) of [steps](#implementations) in a physical execution plan.

A stage is a set of parallel tasks -- one task per partition (of an RDD that computes partial results of a function executed as part of a Spark job).

![Stage, tasks and submitting a job](../images/scheduler/stage-tasks.png)

In other words, a Spark job is a computation with that computation sliced into stages.

A stage is uniquely identified by `id`. When a stage is created, DAGScheduler.md[DAGScheduler] increments internal counter `nextStageId` to track the number of DAGScheduler.md#submitStage[stage submissions].

[[rdd]]
A stage can only work on the partitions of a single RDD (identified by `rdd`), but can be associated with many other dependent parent stages (via internal field `parents`), with the boundary of a stage marked by shuffle dependencies.

Submitting a stage can therefore trigger execution of a series of dependent parent stages (refer to DAGScheduler.md#runJob[RDDs, Job Execution, Stages, and Partitions]).

![Submitting a job triggers execution of the stage and its parent stages](../images/scheduler/job-stage.png)

Finally, every stage has a `firstJobId` that is the id of the job that submitted the stage.

There are two types of stages:

* ShuffleMapStage.md[ShuffleMapStage] is an intermediate stage (in the execution DAG) that produces data for other stage(s). It writes *map output files* for a shuffle. It can also be the final stage in a job in DAGScheduler.md#adaptive-query-planning[Adaptive Query Planning / Adaptive Scheduling].
* ResultStage.md[ResultStage] is the final stage that executes rdd:index.md#actions[a Spark action] in a user program by running a function on an RDD.

When a job is submitted, a new stage is created with the parent ShuffleMapStage.md[ShuffleMapStage] linked -- they can be created from scratch or linked to, i.e. shared, if other jobs use them already.
!!! note
The **logical DAG** or **logical execution plan** is the [RDD lineage](../rdd/lineage.md).

![DAGScheduler and Stages for a job](../images/scheduler/scheduler-job-shuffles-result-stages.png)
Indirectly, a `Stage` is a set of parallel tasks - one task per partition (of an RDD that computes partial results of a function executed as part of a Spark job).

A stage tracks the jobs (their ids) it belongs to (using the internal `jobIds` registry).
![Stage, tasks and submitting a job](../images/scheduler/stage-tasks.png)

`DAGScheduler` splits up a job into a collection of stages. Each stage contains a sequence of [narrow transformations](../rdd/index.md) that can be completed without shuffling the entire data set, separated at **shuffle boundaries** (where shuffle occurs). Stages are thus a result of breaking the RDD graph at shuffle boundaries.
In other words, a Spark job is a computation "sliced" (not to use the reserved term _partitioned_) into stages.

![Graph of Stages](../images/scheduler/dagscheduler-stages.png)
## Contract

Shuffle boundaries introduce a barrier where stages/tasks must wait for the previous stage to finish before they fetch map outputs.
### <span id="findMissingPartitions"> Missing Partitions

![DAGScheduler splits a job into stages](../images/scheduler/scheduler-job-splits-into-stages.png)
```scala
findMissingPartitions(): Seq[Int]
```

RDD operations with rdd:index.md[narrow dependencies], like `map()` and `filter()`, are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages, i.e. one to write a set of map output files, and another to read those files after a barrier.
Missing partitions (IDs of the partitions of the [RDD](#rdd) that are missing and need to be computed)

In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the `RDD.compute()` functions of various RDDs, e.g. `MappedRDD`, `FilteredRDD`, etc.
Used when:

At some point of time in a stage's life, every partition of the stage gets transformed into a task - ShuffleMapTask.md[ShuffleMapTask] or ResultTask.md[ResultTask] for ShuffleMapStage.md[ShuffleMapStage] and ResultStage.md[ResultStage], respectively.
* `DAGScheduler` is requested to [submit missing tasks](DAGScheduler.md#submitMissingTasks)

Partitions are computed in jobs, and result stages may not always need to compute all partitions in their target RDD, e.g. for actions like `first()` and `lookup()`.
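
A minimal sketch of the idea behind the contract (illustrative only; the bookkeeping in the real implementations differs, see below): a stage reports as missing the partitions that have not been computed yet, and `DAGScheduler` submits tasks only for those.

```scala
// Illustrative only: tracks finished partitions and reports the rest as missing.
class SketchStage(numPartitions: Int) {
  private val finished = Array.fill(numPartitions)(false)

  def markPartitionFinished(partitionId: Int): Unit =
    finished(partitionId) = true

  def findMissingPartitions(): Seq[Int] =
    (0 until numPartitions).filter(id => !finished(id))
}
```
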
## Implementations

`DAGScheduler` prints the following INFO message when there are tasks to submit:
* [ResultStage](ResultStage.md)
* [ShuffleMapStage](ShuffleMapStage.md)

```
Submitting 1 missing tasks from ResultStage 36 (ShuffledRDD[86] at reduceByKey at <console>:24)
```
## Creating Instance

There is also the following DEBUG message with pending partitions:
`Stage` takes the following to be created:

```
New pending partitions: Set(0)
```
* [Stage ID](#id)
* [RDD](#rdd)
* <span id="numTasks"> Number of tasks
* <span id="parents"> Parent `Stage`s
* <span id="firstJobId"> First Job ID
* <span id="callSite"> `CallSite`
* <span id="resourceProfileId"> Resource Profile ID

Tasks are later submitted to TaskScheduler.md[Task Scheduler] (via `taskScheduler.submitTasks`).
!!! note "Abstract Class"
`Stage` is an abstract class and cannot be created directly. It is created indirectly for the [concrete Stages](#implementations).
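
An outline of the abstraction, as a hedged sketch (parameter and field types are simplified here; consult the links above for the actual definitions):

```scala
// Simplified shape of the abstract class: the constructor mirrors the parameters
// listed above and the only abstract method is findMissingPartitions.
abstract class StageOutline(
    val id: Int,
    val rdd: AnyRef,               // the target RDD (type simplified in this sketch)
    val numTasks: Int,
    val parents: List[StageOutline],
    val firstJobId: Int,
    val callSite: String,          // stands in for CallSite
    val resourceProfileId: Int) {

  def findMissingPartitions(): Seq[Int]
}
```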

When no tasks in a stage can be submitted, the following DEBUG message shows in the logs:
## <span id="rdd"> RDD

```
FIXME
```
`Stage` is given an [RDD](../rdd/RDD.md) when [created](#creating-instance).

## <span id="_latestInfo"> Latest StageInfo Registry
## <span id="id"> Stage ID

```scala
_latestInfo: StageInfo
```
`Stage` is given a unique ID when [created](#creating-instance).

`Stage` uses `_latestInfo` internal registry for...FIXME
!!! note
`DAGScheduler` uses [nextStageId](DAGScheduler.md#nextStageId) internal counter to track the number of [stage submissions](DAGScheduler.md#submitStage).

## <span id="makeNewStageAttempt"> Making New Stage Attempt

@@ -94,74 +76,3 @@ In the end, `makeNewStageAttempt` increments the [nextAttemptId](#nextAttemptId)
`makeNewStageAttempt` is used when:

* `DAGScheduler` is requested to [submit the missing tasks of a stage](DAGScheduler.md#submitMissingTasks)

## Others to be Reviewed

== [[findMissingPartitions]] Finding Missing Partitions

[source, scala]
----
findMissingPartitions(): Seq[Int]
----

findMissingPartitions gives the partition ids that are missing and need to be computed.

findMissingPartitions is used when DAGScheduler is requested to DAGScheduler.md#submitMissingTasks[submitMissingTasks] and DAGScheduler.md#handleTaskCompletion[handleTaskCompletion].

== [[failedOnFetchAndShouldAbort]] `failedOnFetchAndShouldAbort` Method

`Stage.failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean` checks whether the number of fetch failed attempts (using `fetchFailedAttemptIds`) exceeds the number of consecutive failures allowed for a given stage (that should then be aborted)

NOTE: The number of consecutive failures for a stage is not configurable.

== [[latestInfo]] Getting StageInfo For Most Recent Attempt

[source, scala]
----
latestInfo: StageInfo
----

`latestInfo` simply returns the <<_latestInfo, most recent `StageInfo`>> (i.e. makes it accessible).

== [[internal-properties]] Internal Properties

[cols="30m,70",options="header",width="100%"]
|===
| Name
| Description

| [[details]] `details`
| Long description of the stage

Used when...FIXME

| [[fetchFailedAttemptIds]] `fetchFailedAttemptIds`
| FIXME

Used when...FIXME

| [[jobIds]] `jobIds`
| Set of spark-scheduler-ActiveJob.md[jobs] the stage belongs to.

Used when...FIXME

| [[name]] `name`
| Name of the stage

Used when...FIXME

| [[nextAttemptId]] `nextAttemptId`
| The ID for the next attempt of the stage.

Used when...FIXME

| [[numPartitions]] `numPartitions`
| Number of partitions

Used when...FIXME

| [[pendingPartitions]] `pendingPartitions`
| Set of pending spark-rdd-partitions.md[partitions]

Used when...FIXME
|===
docs/scheduler/index.md (20 changes: 20 additions & 0 deletions)
@@ -4,6 +4,26 @@

Spark Scheduler uses the high-level stage-oriented [DAGScheduler](DAGScheduler.md) and the low-level task-oriented [TaskScheduler](TaskScheduler.md).

## Stage Execution

Every partition of a [Stage](Stage.md) is transformed into a [Task](Task.md) ([ShuffleMapTask](ShuffleMapTask.md) or [ResultTask](ResultTask.md) for [ShuffleMapStage](ShuffleMapStage.md) and [ResultStage](ResultStage.md), respectively).

Submitting a stage can therefore trigger execution of a series of dependent parent stages.

![Submitting a job triggers execution of the stage and its parent stages](../images/scheduler/job-stage.png)

When a Spark job is submitted, a new stage is created (either from scratch or linked to, i.e. shared, if other jobs already use it).

![DAGScheduler and Stages for a job](../images/scheduler/scheduler-job-shuffles-result-stages.png)

`DAGScheduler` splits up a job into a collection of [Stage](Stage.md)s. A `Stage` contains a sequence of [narrow transformations](../rdd/index.md) that can be completed without shuffling the entire data set, separated at **shuffle boundaries** (where a shuffle occurs). Stages are thus a result of breaking the RDD graph at shuffle boundaries.

![Graph of Stages](../images/scheduler/dagscheduler-stages.png)
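
As a sketch (not part of the original page; assumes spark-shell with `sc` available), the shuffle boundary is visible in the RDD lineage before the job even runs:

```scala
val pairs  = sc.parallelize(1 to 1000, 8).map(n => (n % 100, n))  // narrow: pipelined
val counts = pairs.reduceByKey(_ + _)                             // shuffle boundary

// toDebugString prints the RDD lineage; the indentation change marks where
// DAGScheduler will break the graph into a ShuffleMapStage and a ResultStage.
println(counts.toDebugString)

counts.count()  // submits a job with two stages
```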

Shuffle boundaries introduce a barrier where stages/tasks must wait for the previous stage to finish before they fetch map outputs.

![DAGScheduler splits a job into stages](../images/scheduler/scheduler-job-splits-into-stages.png)

## Resources

* [Deep Dive into the Apache Spark Scheduler](https://databricks.com/session/apache-spark-scheduler) by Xingbo Jiang (Databricks)
