Skip to content

Commit

Permalink
SparkContext.submitMapStage
Browse files Browse the repository at this point in the history
  • Loading branch information
jaceklaskowski committed Nov 14, 2021
1 parent adc8dec commit 715264c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 70 deletions.
29 changes: 13 additions & 16 deletions docs/SparkContext.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,19 @@ getCheckpointDir: Option[String]

* `ReliableRDDCheckpointData` is requested for the [checkpoint path](rdd/ReliableRDDCheckpointData.md#checkpointPath)

## <span id="submitMapStage"> Submitting MapStage for Execution

```scala
submitMapStage[K, V, C](
dependency: ShuffleDependency[K, V, C]): SimpleFutureAction[MapOutputStatistics]
```

`submitMapStage` requests the [DAGScheduler](#dagScheduler) to [submit](scheduler/DAGScheduler.md#submitMapStage) the given [ShuffleDependency](rdd/ShuffleDependency.md) for execution (that eventually produces a [MapOutputStatistics](scheduler/MapOutputStatistics.md)).

`submitMapStage` is used when:

* `ShuffleExchangeExec` ([Spark SQL]({{ book.spark_sql }}/physical-operators/ShuffleExchangeExec#mapOutputStatisticsFuture)) unary physical operator is executed

## <span id="ExecutorMetricsSource"><span id="_executorMetricsSource"> ExecutorMetricsSource

`SparkContext` creates an [ExecutorMetricsSource](executor/ExecutorMetricsSource.md) when [created](#creating-instance) with [spark.metrics.executorMetricsSource.enabled](metrics/configuration-properties.md#spark.metrics.executorMetricsSource.enabled) enabled.
Expand Down Expand Up @@ -1149,22 +1162,6 @@ scala> sc.startTime
res0: Long = 1464425605653
----

== [[submitMapStage]] Submitting `ShuffleDependency` for Execution -- `submitMapStage` Internal Method

[source, scala]
----
submitMapStage[K, V, C](
dependency: ShuffleDependency[K, V, C]): SimpleFutureAction[MapOutputStatistics]
----

`submitMapStage` scheduler:DAGScheduler.md#submitMapStage[submits the input `ShuffleDependency` to `DAGScheduler` for execution] and returns a `SimpleFutureAction`.

Internally, `submitMapStage` <<getCallSite, calculates the call site>> first and submits it with `localProperties`.

NOTE: Interestingly, `submitMapStage` is used exclusively when Spark SQL's spark-sql-SparkPlan-ShuffleExchange.md[ShuffleExchange] physical operator is executed.

NOTE: `submitMapStage` _seems_ related to scheduler:DAGScheduler.md#adaptive-query-planning[Adaptive Query Planning / Adaptive Scheduling].

== [[cancelJobGroup]] Cancelling Job Group -- `cancelJobGroup` Method

[source, scala]
Expand Down
57 changes: 38 additions & 19 deletions docs/scheduler/DAGScheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ When DAGScheduler schedules a job as a result of rdd/index.md#actions[executing

While being created, `DAGScheduler` requests the [TaskScheduler](#taskScheduler) to [associate itself with](TaskScheduler.md#setDAGScheduler) and requests [DAGScheduler Event Bus](#eventProcessLoop) to start accepting events.

## <span id="submitMapStage"> Submitting MapStage for Execution (Posting MapStageSubmitted)

```scala
submitMapStage[K, V, C](
dependency: ShuffleDependency[K, V, C],
callback: MapOutputStatistics => Unit,
callSite: CallSite,
properties: Properties): JobWaiter[MapOutputStatistics]
```

`submitMapStage` requests the given [ShuffleDependency](../rdd/ShuffleDependency.md) for the [RDD](../rdd/ShuffleDependency.md#rdd).

`submitMapStage` gets the [job ID](#nextJobId) and increments it (for future submissions).

`submitMapStage` creates a [JobWaiter](JobWaiter.md) to wait for a [MapOutputStatistics](MapOutputStatistics.md). The `JobWaiter` waits for 1 task and, when completed successfully, executes the given `callback` function with the computed `MapOutputStatistics`.

In the end, `submitMapStage` posts a [MapStageSubmitted](DAGSchedulerEvent.md#MapStageSubmitted) and returns the `JobWaiter`.

Used when:

* `SparkContext` is requested to [submit a MapStage for execution](../SparkContext.md#submitMapStage)

## <span id="getShuffleDependenciesAndResourceProfiles"> Shuffle Dependencies and ResourceProfiles

```scala
Expand Down Expand Up @@ -1310,7 +1332,7 @@ In the end, `handleJobSubmitted` posts a [SparkListenerJobStart](../SparkListene

`handleJobSubmitted` is used when `DAGSchedulerEventProcessLoop` is requested to handle a [JobSubmitted](DAGSchedulerEvent.md#JobSubmitted) event.

### <span id="handleMapStageSubmitted"> MapStageSubmitted Event Handler
### <span id="handleMapStageSubmitted"> MapStageSubmitted

```scala
handleMapStageSubmitted(
Expand All @@ -1324,17 +1346,14 @@ handleMapStageSubmitted(
![MapStageSubmitted Event Handling](../images/scheduler/scheduler-handlemapstagesubmitted.png)

!!! note
`MapStageSubmitted` event processing is very similar to <<JobSubmitted, JobSubmitted>> events.
`MapStageSubmitted` event processing is very similar to [JobSubmitted](#JobSubmitted) event's.

`handleMapStageSubmitted` [finds or creates a new `ShuffleMapStage`](#getOrCreateShuffleMapStage) for the input [ShuffleDependency](../rdd/ShuffleDependency.md) and `jobId`.
`handleMapStageSubmitted` [finds or creates a new ShuffleMapStage](#getOrCreateShuffleMapStage) for the given [ShuffleDependency](../rdd/ShuffleDependency.md) and `jobId`.

`handleMapStageSubmitted` creates an [ActiveJob](ActiveJob.md).
`handleMapStageSubmitted` creates an [ActiveJob](ActiveJob.md) (with the given `jobId`, the `ShuffleMapStage`, the given `JobListener`).

`handleMapStageSubmitted` [clears the internal cache of RDD partition locations](#clearCacheLocs).

!!! important
FIXME Why is this clearing here so important?

`handleMapStageSubmitted` prints out the following INFO messages to the logs:

```text
Expand All @@ -1344,26 +1363,32 @@ Parents of final stage: [parents]
Missing parents: [missingParentStages]
```

`handleMapStageSubmitted` registers the new job in [jobIdToActiveJob](#jobIdToActiveJob) and [activeJobs](#activeJobs) internal registries, and [with the final `ShuffleMapStage`](ShuffleMapStage.md#addActiveJob).
`handleMapStageSubmitted` adds the new `ActiveJob` to [jobIdToActiveJob](#jobIdToActiveJob) and [activeJobs](#activeJobs) internal registries, and the [ShuffleMapStage](ShuffleMapStage.md#addActiveJob).

!!! note
`ShuffleMapStage` can have multiple ``ActiveJob``s registered.
`ShuffleMapStage` can have multiple `ActiveJob`s registered.

`handleMapStageSubmitted` [finds all the registered stages for the input `jobId`](#jobIdToStageIds) and collects [their latest `StageInfo`](Stage.md#latestInfo).

In the end, `handleMapStageSubmitted` posts [SparkListenerJobStart](../SparkListener.md#SparkListenerJobStart) message to [LiveListenerBus](LiveListenerBus.md) and [submits the `ShuffleMapStage`](#submitStage).
In the end, `handleMapStageSubmitted` posts a [SparkListenerJobStart](../SparkListener.md#SparkListenerJobStart) event to the [LiveListenerBus](#listenerBus) and [submits the ShuffleMapStage](#submitStage).

When the [`ShuffleMapStage` is available](ShuffleMapStage.md#isAvailable) already, `handleMapStageSubmitted` [marks the job finished](#markMapStageJobAsFinished).
When the [ShuffleMapStage is available](ShuffleMapStage.md#isAvailable) already, `handleMapStageSubmitted` [marks the job finished](#markMapStageJobAsFinished).

---

When `handleMapStageSubmitted` could not find or create a `ShuffleMapStage`, `handleMapStageSubmitted` prints out the following WARN message to the logs.

```text
Creating new stage failed due to exception - job: [id]
```

`handleMapStageSubmitted` notifies [`listener` about the job failure](JobListener.md#jobFailed) and exits.
`handleMapStageSubmitted` [notifies the JobListener about the job failure](JobListener.md#jobFailed) and exits.

`handleMapStageSubmitted` is used when `DAGSchedulerEventProcessLoop` is requested to handle a [MapStageSubmitted](DAGSchedulerEvent.md#MapStageSubmitted) event.
---

`handleMapStageSubmitted` is used when:

* [DAGSchedulerEventProcessLoop](DAGSchedulerEventProcessLoop.md) is requested to handle a [MapStageSubmitted](DAGSchedulerEvent.md#MapStageSubmitted) event

### <span id="resubmitFailedStages"> ResubmitFailedStages Event Handler

Expand Down Expand Up @@ -1540,12 +1565,6 @@ Posts a [SpeculativeTaskSubmitted](DAGSchedulerEvent.md#SpeculativeTaskSubmitted

Used when `TaskSetManager` is requested to [checkAndSubmitSpeculatableTask](TaskSetManager.md#checkAndSubmitSpeculatableTask)

### <span id="submitMapStage"> Posting MapStageSubmitted (submitMapStage)

Posts a [MapStageSubmitted](DAGSchedulerEvent.md#MapStageSubmitted)

Used when `SparkContext` is requested to [submit a MapStage for execution](../SparkContext.md#submitMapStage)

### <span id="taskEnded"> Posting CompletionEvent

Posts a [CompletionEvent](DAGSchedulerEvent.md#CompletionEvent)
Expand Down
4 changes: 3 additions & 1 deletion docs/scheduler/DAGSchedulerEvent.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ Carries the following:
* [JobListener](JobListener.md)
* Execution properties

Posted when `DAGScheduler` is requested to [submitMapStage](DAGScheduler.md#submitMapStage)
Posted when:

* `DAGScheduler` is requested to [submit a MapStage for execution](DAGScheduler.md#submitMapStage)

Event handler: [handleMapStageSubmitted](DAGScheduler.md#handleMapStageSubmitted)

Expand Down
4 changes: 3 additions & 1 deletion docs/scheduler/DAGSchedulerEventProcessLoop.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

* <span id="dagScheduler"> [DAGScheduler](DAGScheduler.md)

`DAGSchedulerEventProcessLoop` is created when `DAGScheduler` is [created](DAGScheduler.md#eventProcessLoop).
`DAGSchedulerEventProcessLoop` is created when:

* `DAGScheduler` is [created](DAGScheduler.md#eventProcessLoop)

## <span id="onReceive"><span id="doOnReceive"> Processing Event

Expand Down
66 changes: 33 additions & 33 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -217,51 +217,51 @@ nav:
- OutputCommitCoordinator: OutputCommitCoordinator.md
- Scheduler:
- scheduler/index.md
- DAGScheduler: scheduler/DAGScheduler.md
- Stage: scheduler/Stage.md
- ResultStage: scheduler/ResultStage.md
- ShuffleMapStage: scheduler/ShuffleMapStage.md
- ActiveJob: scheduler/ActiveJob.md
- BarrierTaskContext: scheduler/BarrierTaskContext.md
- BlacklistTracker: scheduler/BlacklistTracker.md
- StageInfo: scheduler/StageInfo.md
- JobListener: scheduler/JobListener.md
- JobWaiter: scheduler/JobWaiter.md
- TaskScheduler: scheduler/TaskScheduler.md
- TaskSchedulerImpl: scheduler/TaskSchedulerImpl.md
- SchedulerBackend: scheduler/SchedulerBackend.md
- CoarseGrainedSchedulerBackend: scheduler/CoarseGrainedSchedulerBackend.md
- DAGScheduler: scheduler/DAGScheduler.md
- DAGSchedulerEvent: scheduler/DAGSchedulerEvent.md
- DAGSchedulerEventProcessLoop: scheduler/DAGSchedulerEventProcessLoop.md
- DAGSchedulerSource: scheduler/DAGSchedulerSource.md
- DriverEndpoint: scheduler/DriverEndpoint.md
- SchedulerBackendUtils: scheduler/SchedulerBackendUtils.md
- Task: scheduler/Task.md
- ShuffleMapTask: scheduler/ShuffleMapTask.md
- ResultTask: scheduler/ResultTask.md
- TaskSet: scheduler/TaskSet.md
- TaskSetManager: scheduler/TaskSetManager.md
- ActiveJob: scheduler/ActiveJob.md
- SchedulableBuilder: scheduler/SchedulableBuilder.md
- FIFOSchedulableBuilder: scheduler/FIFOSchedulableBuilder.md
- ExecutorData: scheduler/ExecutorData.md
- ExternalClusterManager: scheduler/ExternalClusterManager.md
- FairSchedulableBuilder: scheduler/FairSchedulableBuilder.md
- Schedulable: scheduler/Schedulable.md
- Pool: scheduler/Pool.md
- SchedulingMode: scheduler/SchedulingMode.md
- TaskInfo: scheduler/TaskInfo.md
- FIFOSchedulableBuilder: scheduler/FIFOSchedulableBuilder.md
- JobListener: scheduler/JobListener.md
- JobWaiter: scheduler/JobWaiter.md
- LiveListenerBus: scheduler/LiveListenerBus.md
- MapStatuses:
- MapStatus: scheduler/MapStatus.md
- CompressedMapStatus: scheduler/CompressedMapStatus.md
- HighlyCompressedMapStatus: scheduler/HighlyCompressedMapStatus.md
- TaskDescription: scheduler/TaskDescription.md
- TaskResultGetter: scheduler/TaskResultGetter.md
- Pool: scheduler/Pool.md
- ResultStage: scheduler/ResultStage.md
- ResultTask: scheduler/ResultTask.md
- Schedulable: scheduler/Schedulable.md
- SchedulableBuilder: scheduler/SchedulableBuilder.md
- SchedulerBackend: scheduler/SchedulerBackend.md
- SchedulerBackendUtils: scheduler/SchedulerBackendUtils.md
- SchedulingMode: scheduler/SchedulingMode.md
- ShuffleMapStage: scheduler/ShuffleMapStage.md
- ShuffleMapTask: scheduler/ShuffleMapTask.md
- Stage: scheduler/Stage.md
- StageInfo: scheduler/StageInfo.md
- TaskScheduler: scheduler/TaskScheduler.md
- TaskSchedulerImpl: scheduler/TaskSchedulerImpl.md
- Task: scheduler/Task.md
- TaskContext: scheduler/TaskContext.md
- BarrierTaskContext: scheduler/BarrierTaskContext.md
- TaskContextImpl: scheduler/TaskContextImpl.md
- TaskDescription: scheduler/TaskDescription.md
- TaskInfo: scheduler/TaskInfo.md
- TaskLocation: scheduler/TaskLocation.md
- TaskResult: scheduler/TaskResult.md
- TaskResultGetter: scheduler/TaskResultGetter.md
- TaskSet: scheduler/TaskSet.md
- TaskSetBlacklist: scheduler/TaskSetBlacklist.md
- TaskLocation: scheduler/TaskLocation.md
- ExternalClusterManager: scheduler/ExternalClusterManager.md
- LiveListenerBus: scheduler/LiveListenerBus.md
- DAGSchedulerEvent: scheduler/DAGSchedulerEvent.md
- DAGSchedulerEventProcessLoop: scheduler/DAGSchedulerEventProcessLoop.md
- DAGSchedulerSource: scheduler/DAGSchedulerSource.md
- ExecutorData: scheduler/ExecutorData.md
- TaskSetManager: scheduler/TaskSetManager.md
- RPC:
- rpc/index.md
- RpcEnv: rpc/RpcEnv.md
Expand Down

0 comments on commit 715264c

Please sign in to comment.