
A TaskScheduler schedules tasks for a single Spark application according to the scheduling mode.

Figure 1. TaskScheduler works for a single SparkContext

A TaskScheduler gets sets of tasks (as TaskSets) submitted to it from the DAGScheduler for each stage, and is responsible for sending the tasks to the cluster, running them, retrying if there are failures, and mitigating stragglers.

TaskScheduler is a private[spark] Scala trait. You can find the sources in org.apache.spark.scheduler.TaskScheduler.

TaskScheduler Contract

Every TaskScheduler implementation follows the contract made up of the methods described in the sections below.
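As a rough orientation, the methods covered on this page can be gathered into one self-contained Scala trait. This is only a sketch: `TaskSet`, `DAGScheduler`, `ExecutorLossReason`, `BlockManagerId` and `AccumulatorV2` are simplified stand-ins (assumptions) so that it compiles without Spark on the classpath, and the real `private[spark]` trait may declare other members depending on the Spark version.

```scala
object TaskSchedulerContractSketch {
  // Simplified stand-ins (assumptions) for Spark's own types.
  final case class TaskSet(stageId: Int, taskIds: Seq[Long])
  final case class BlockManagerId(executorId: String, host: String, port: Int)
  final case class ExecutorLossReason(message: String)
  trait AccumulatorV2[IN, OUT]
  trait DAGScheduler

  // The methods described on this page, collected in one trait.
  trait TaskScheduler {
    // Computed once so that repeated calls return the same id.
    private val appId = "spark-application-" + System.currentTimeMillis

    def start(): Unit
    def postStartHook(): Unit = {} // does nothing by default
    def stop(): Unit
    def submitTasks(taskSet: TaskSet): Unit
    def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
    def setDAGScheduler(dagScheduler: DAGScheduler): Unit
    def defaultParallelism(): Int
    def applicationId(): String = appId
    def applicationAttemptId(): Option[String]
    def executorHeartbeatReceived(
        execId: String,
        accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
        blockManagerId: BlockManagerId): Boolean
    def executorLost(executorId: String, reason: ExecutorLossReason): Unit
  }
}
```

Only `postStartHook` and `applicationId` carry default bodies here, matching the defaults described in their sections; everything else is left abstract for an implementation to fill in.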

FIXME Have an exercise to create a SchedulerBackend.

TaskScheduler’s Lifecycle

A TaskScheduler is created while SparkContext is being created (by calling SparkContext.createTaskScheduler for a given master URL and deploy mode).

Figure 2. TaskScheduler uses SchedulerBackend to support different clusters

At this point in SparkContext’s lifecycle, the internal _taskScheduler points at the TaskScheduler (and it is "announced" by sending a blocking TaskSchedulerIsSet message to the HeartbeatReceiver RPC endpoint).

The TaskScheduler is started right after the blocking TaskSchedulerIsSet message receives a response.

The application ID and the application’s attempt ID are set at this point (and SparkContext uses the application ID to set up SparkUI and BlockManager).

FIXME The application id is described as "associated with the job." in TaskScheduler, but I think it is "associated with the application" and you can have many jobs per application.

Right before SparkContext is fully initialized, TaskScheduler.postStartHook is called.

The internal _taskScheduler is cleared (i.e. set to null) while SparkContext is being stopped.

FIXME If it is SparkContext that starts a TaskScheduler, shouldn’t SparkContext stop it too? Why is this the way it is now?
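The order of the lifecycle steps above can be traced with a small simulation. `RecordingTaskScheduler` and `ContextSketch` are hypothetical stand-ins invented for this sketch; they only record the sequence of calls and do not reproduce what SparkContext actually does internally.

```scala
import scala.collection.mutable.ArrayBuffer

object TaskSchedulerLifecycleSketch {
  // Hypothetical recorder standing in for a real TaskScheduler.
  final class RecordingTaskScheduler {
    val events = ArrayBuffer.empty[String]
    private val appId = "spark-application-" + System.currentTimeMillis
    def start(): Unit = { events += "start" }
    def postStartHook(): Unit = { events += "postStartHook" }
    def applicationId(): String = appId
  }

  // Hypothetical, heavily simplified stand-in for SparkContext.
  final class ContextSketch {
    private var _taskScheduler: RecordingTaskScheduler = null
    var applicationId: String = null

    def init(): RecordingTaskScheduler = {
      val ts = new RecordingTaskScheduler   // 1. created for a master URL
      _taskScheduler = ts                   // 2. internal reference is set
      ts.events += "TaskSchedulerIsSet"     // 3. announced with a blocking message
      ts.start()                            // 4. started once the message is answered
      applicationId = ts.applicationId()    // 5. application id becomes available
      ts.postStartHook()                    // 6. called right before init finishes
      ts
    }

    // On stop the context only clears its reference.
    def stop(): Unit = { _taskScheduler = null }
    def taskSchedulerCleared: Boolean = _taskScheduler == null
  }
}
```

Calling `init()` and inspecting `events` gives the announce, start, post-start order described in this section.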

Starting TaskScheduler

start(): Unit

start is currently called while SparkContext is being created.

Stopping TaskScheduler

stop(): Unit

stop is currently called while DAGScheduler is being stopped.

Post-Start Initialization (postStartHook method)

postStartHook() {}

postStartHook does nothing by default, but allows custom implementations to do some post-start initialization.

It is currently called right before SparkContext’s initialization finishes.
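A custom implementation could use the hook as sketched below. The trait is a cut-down stand-in for the contract, and `WarmUpTaskScheduler` is a hypothetical name used only for illustration.

```scala
object PostStartHookSketch {
  // Cut-down stand-in for the relevant part of the contract.
  trait TaskScheduler {
    def start(): Unit
    def postStartHook(): Unit = {} // no-op default
  }

  // Hypothetical scheduler that finishes its setup in the hook.
  final class WarmUpTaskScheduler extends TaskScheduler {
    var started = false
    var warmedUp = false

    override def start(): Unit = { started = true }

    override def postStartHook(): Unit = {
      // The hook runs after start, so started state can be relied on here.
      require(started, "postStartHook is expected to run after start")
      warmedUp = true
    }
  }
}
```

Schedulers that need no extra initialization simply inherit the empty default.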

Submitting TaskSets for Execution

submitTasks(taskSet: TaskSet): Unit

submitTasks accepts a TaskSet for execution.

Cancelling Tasks for Stage (cancelTasks method)

cancelTasks(stageId: Int, interruptThread: Boolean): Unit

cancelTasks cancels all tasks submitted for execution in a stage stageId.

It is currently called by DAGScheduler when it cancels a stage.
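How submitTasks and cancelTasks relate can be sketched with an in-memory scheduler that only tracks pending task sets per stage. `InMemoryTaskScheduler`, `pendingTaskIds` and the simplified `TaskSet` are assumptions made for this example; no tasks are actually run.

```scala
import scala.collection.mutable

object CancelTasksSketch {
  // Simplified stand-in for Spark's TaskSet: the tasks of one stage.
  final case class TaskSet(stageId: Int, taskIds: Seq[Long])

  // Hypothetical scheduler that only keeps track of pending task sets.
  final class InMemoryTaskScheduler {
    private val pending = mutable.Map.empty[Int, Vector[TaskSet]]

    def submitTasks(taskSet: TaskSet): Unit = {
      val soFar = pending.getOrElse(taskSet.stageId, Vector.empty)
      pending(taskSet.stageId) = soFar :+ taskSet
    }

    // interruptThread mirrors the signature only; nothing runs in this
    // sketch, so there is no thread to interrupt.
    def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {
      pending.remove(stageId)
    }

    def pendingTaskIds(stageId: Int): Seq[Long] =
      pending.getOrElse(stageId, Vector.empty).flatMap(_.taskIds)
  }
}
```

Cancelling a stage drops every task set submitted for it and leaves other stages untouched.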

Setting Custom DAGScheduler

setDAGScheduler(dagScheduler: DAGScheduler): Unit

setDAGScheduler sets a custom DAGScheduler.

It is currently called by DAGScheduler when it is created.
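The wiring can be pictured as a DAGScheduler registering itself with the TaskScheduler during construction. Both classes below are simplified stand-ins, and `currentDAGScheduler` is a hypothetical accessor added so the result can be inspected.

```scala
object SetDAGSchedulerSketch {
  // Stand-in TaskScheduler that only remembers the DAGScheduler it was given.
  final class TaskScheduler {
    private var dagScheduler: Option[DAGScheduler] = None

    def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {
      this.dagScheduler = Some(dagScheduler)
    }

    def currentDAGScheduler: Option[DAGScheduler] = dagScheduler
  }

  // Stand-in DAGScheduler that registers itself when it is created.
  final class DAGScheduler(val taskScheduler: TaskScheduler) {
    taskScheduler.setDAGScheduler(this)
  }
}
```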

Calculating Default Level of Parallelism (defaultParallelism method)

defaultParallelism(): Int

defaultParallelism calculates the default level of parallelism to use in a Spark application as the number of partitions in RDDs and also as a hint for sizing jobs.

Read more in Calculating Default Level of Parallelism (defaultParallelism method) for TaskSchedulerImpl, the one and only implementation of the TaskScheduler contract.
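To illustrate the "number of partitions" role, the sketch below falls back to defaultParallelism when no explicit number of slices is requested. `FixedTaskScheduler` and `slice` are hypothetical helpers written for this example and are not how Spark itself partitions data.

```scala
object DefaultParallelismSketch {
  // Hypothetical scheduler reporting a fixed level of parallelism.
  final class FixedTaskScheduler(cores: Int) {
    def defaultParallelism(): Int = cores
  }

  // Splits data into contiguous chunks; uses the scheduler's default
  // parallelism when the caller does not ask for a specific number.
  def slice[T](
      data: Seq[T],
      scheduler: FixedTaskScheduler,
      numSlices: Option[Int] = None): Seq[Seq[T]] = {
    val n = numSlices.getOrElse(scheduler.defaultParallelism())
    require(n > 0, "number of slices must be positive")
    (0 until n).map { i =>
      val start = ((i * data.length.toLong) / n).toInt
      val end = (((i + 1) * data.length.toLong) / n).toInt
      data.slice(start, end)
    }
  }
}
```

With a scheduler reporting 4, `slice(1 to 10, scheduler)` produces four chunks, while passing `Some(2)` overrides the default.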

Calculating Application ID (applicationId method)

applicationId(): String

applicationId gives the current application’s id. It is in the format spark-application-[System.currentTimeMillis] by default.

It is currently used in SparkContext while it is being initialized.
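The default format can be reproduced in a few lines. The trait below is a stand-in for the contract; computing the id once in a private value is a choice made in this sketch so that repeated calls agree.

```scala
object ApplicationIdSketch {
  trait TaskScheduler {
    // Timestamp captured once, when the scheduler instance is created.
    private val appId = "spark-application-" + System.currentTimeMillis

    def applicationId(): String = appId
  }
}
```

An implementation that obtains its id from a cluster manager would override `applicationId` instead of relying on this timestamp-based default.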

Calculating Application Attempt ID (applicationAttemptId method)

applicationAttemptId(): Option[String]

applicationAttemptId gives the current application’s attempt id.

It is currently used in SparkContext while it is being initialized.

Handling Executor’s Heartbeats (executorHeartbeatReceived method)

executorHeartbeatReceived(
  execId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId): Boolean

executorHeartbeatReceived handles heartbeats from an executor execId with the partial values of accumulators and BlockManagerId.

It is expected to be positive (i.e. return true) when the executor execId is managed by the TaskScheduler.
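The "return true only for managed executors" expectation might look like the sketch below. `HeartbeatAwareScheduler` and `registerExecutor` are hypothetical, `BlockManagerId` and `AccumulatorV2` are simplified stand-ins, and the accumulator updates are ignored for brevity.

```scala
import scala.collection.mutable

object HeartbeatSketch {
  // Simplified stand-ins (assumptions) for Spark's types.
  final case class BlockManagerId(executorId: String, host: String, port: Int)
  trait AccumulatorV2[IN, OUT]

  final class HeartbeatAwareScheduler {
    private val knownExecutors = mutable.Set.empty[String]

    def registerExecutor(execId: String): Unit = { knownExecutors += execId }

    // true only when the heartbeat comes from an executor this scheduler manages.
    def executorHeartbeatReceived(
        execId: String,
        accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
        blockManagerId: BlockManagerId): Boolean =
      knownExecutors.contains(execId)
  }
}
```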

Handling Executor Lost Events (executorLost method)

executorLost(executorId: String, reason: ExecutorLossReason): Unit

executorLost handles events about an executor executorId being lost for a given reason.

It is currently used in the HeartbeatReceiver RPC endpoint in SparkContext to process host expiration events and to remove executors in scheduler backends.
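A minimal sketch of reacting to a lost executor follows. The loss reason hierarchy, `ExecutorTrackingScheduler`, `lossLog` and `isManaged` are all hypothetical names for this example; Spark defines its own ExecutorLossReason types and does considerably more work when an executor disappears.

```scala
import scala.collection.mutable

object ExecutorLostSketch {
  // Hypothetical loss reasons used only in this sketch.
  sealed trait ExecutorLossReason { def message: String }
  final case class HeartbeatTimeout(millis: Long) extends ExecutorLossReason {
    def message: String = s"no heartbeat for $millis ms"
  }

  final class ExecutorTrackingScheduler {
    private val executors = mutable.Set("exec-1", "exec-2")
    val lossLog = mutable.ArrayBuffer.empty[String]

    def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
      // Forget the executor and record why it went away; unknown ids are ignored.
      if (executors.remove(executorId)) {
        lossLog += s"$executorId lost: ${reason.message}"
      }
    }

    def isManaged(executorId: String): Boolean = executors.contains(executorId)
  }
}
```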

Available Implementations

Spark comes with the following task schedulers: