HeartbeatReceiver RPC Endpoint

HeartbeatReceiver RPC endpoint is a ThreadSafeRpcEndpoint and a SparkListener.

It keeps track of executors (through messages) and informs TaskScheduler and SparkContext about lost executors.

When created, it requires a SparkContext and a Clock. Later, it uses the SparkContext to register itself as a SparkListener and to look up the TaskScheduler (as scheduler).

Note
HeartbeatReceiver RPC endpoint is registered while SparkContext is being created.
Tip

Enable DEBUG or TRACE logging levels for org.apache.spark.HeartbeatReceiver to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.HeartbeatReceiver=TRACE

Refer to Logging.

Starting (onStart method)

Note
onStart is part of the RpcEndpoint Contract

Stopping (onStop method)

Note
onStop is part of the RpcEndpoint Contract

When called, HeartbeatReceiver cancels the checking task and shuts down the eventLoopThread and killExecutorThread executors. The checking task is the one that sends a blocking ExpireDeadHosts message every spark.network.timeoutInterval on eventLoopThread (see eventLoopThread - Heartbeat Receiver Event Loop Thread and Starting (onStart method)).
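The lifecycle of the checking task can be sketched with plain JDK classes. This is a minimal illustration, not Spark's code: CheckingTaskSketch and runAndStop are hypothetical names, the counter stands in for sending ExpireDeadHosts, and the use of cancel(true) followed by shutdownNow() is an assumption about how the cancellation and shutdown are carried out.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object CheckingTaskSketch {
  // Runs a periodic task for roughly `runForMs`, then stops everything.
  // Returns (number of ticks, task cancelled?, pool shut down?).
  def runAndStop(intervalMs: Long, runForMs: Long): (Int, Boolean, Boolean) = {
    val pool = new ScheduledThreadPoolExecutor(1)
    val ticks = new AtomicInteger(0)

    // Stand-in for the checking task that would send ExpireDeadHosts.
    val checkingTask = pool.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = ticks.incrementAndGet() },
      0L, intervalMs, TimeUnit.MILLISECONDS)

    Thread.sleep(runForMs)

    // Stopping: cancel the periodic task first, then shut the pool down.
    checkingTask.cancel(true)
    pool.shutdownNow()

    (ticks.get(), checkingTask.isCancelled, pool.isShutdown)
  }
}
```

Cancelling the task before shutting the pool down means no further check is scheduled while the executor is being torn down.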

killExecutorThread - Kill Executor Thread

killExecutorThread is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is kill-executor-thread.

Note
It is used to request SparkContext to kill the executor.

eventLoopThread - Heartbeat Receiver Event Loop Thread

eventLoopThread is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is heartbeat-receiver-event-loop-thread.
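Both killExecutorThread and eventLoopThread are described as daemon, single-threaded ScheduledThreadPoolExecutors with a fixed thread name. A pool of that shape could be built with the JDK alone, as sketched here. DaemonPools and singleDaemonScheduler are hypothetical names used for illustration; Spark creates these pools through its own utilities.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

object DaemonPools {
  // Hypothetical helper: a scheduled pool with exactly one thread,
  // where that thread is a daemon and carries the given name.
  def singleDaemonScheduler(threadName: String): ScheduledThreadPoolExecutor = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, threadName)
        t.setDaemon(true) // a daemon thread does not keep the JVM alive
        t
      }
    }
    new ScheduledThreadPoolExecutor(1, factory)
  }
}
```

For example, DaemonPools.singleDaemonScheduler("heartbeat-receiver-event-loop-thread") would give a pool whose only worker thread has that name.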

Messages

ExecutorRegistered

ExecutorRegistered(executorId: String)

When ExecutorRegistered arrives, executorId is simply added to the executorLastSeen internal registry.

Note
HeartbeatReceiver sends an ExecutorRegistered message to itself (from the addExecutor internal method). It is a follow-up to SparkListener.onExecutorAdded when a driver announces a new executor registration.
Note
It is an internal message.

ExecutorRemoved

ExecutorRemoved(executorId: String)

When ExecutorRemoved arrives, executorId is simply removed from the executorLastSeen internal registry.

Note
HeartbeatReceiver sends an ExecutorRemoved message to itself (from the removeExecutor internal method). It is a follow-up to SparkListener.onExecutorRemoved when a driver removes an executor.
Note
It is an internal message.
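The effect of ExecutorRegistered and ExecutorRemoved on the registry can be sketched as follows. This is a simplified model rather than Spark's implementation: RegistrySketch and receive are hypothetical names, the case classes are local stand-ins for the real messages, and storing the registration time as the initial last-seen value is an assumption.

```scala
import scala.collection.mutable

object RegistrySketch {
  // Local stand-ins for the two internal messages.
  sealed trait Message
  final case class ExecutorRegistered(executorId: String) extends Message
  final case class ExecutorRemoved(executorId: String) extends Message

  // executor id -> time (in ms) the executor was last seen
  val executorLastSeen = mutable.HashMap.empty[String, Long]

  // nowMs is passed in explicitly so the sketch does not depend on a clock.
  def receive(msg: Message, nowMs: Long): Unit = msg match {
    case ExecutorRegistered(id) => executorLastSeen(id) = nowMs // assumed initial value
    case ExecutorRemoved(id)    => executorLastSeen.remove(id)
  }
}
```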

ExpireDeadHosts

ExpireDeadHosts

When ExpireDeadHosts arrives, the following TRACE message is printed out to the logs:

TRACE HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.

Each executor (in the executorLastSeen registry) is checked to see whether the time since it was last seen exceeds spark.network.timeout.

For any executor that exceeds the timeout, the following WARN message is printed out to the logs:

WARN HeartbeatReceiver: Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms

TaskScheduler.executorLost is called (with SlaveLost("Executor heartbeat timed out after [timeout] ms")).

SparkContext.killAndReplaceExecutor is asynchronously called for the executor (i.e. on killExecutorThread).

The executor is removed from executorLastSeen.

Note
It is an internal message.
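The expiry check itself is a filter over the registry. The sketch below models only that part, under stated assumptions: ExpirySketch, expired and warning are hypothetical names, the registry is an immutable Map, and the current time and timeout are passed in as plain millisecond values. The calls to TaskScheduler.executorLost and SparkContext.killAndReplaceExecutor are left out.

```scala
object ExpirySketch {
  // Returns executor id -> elapsed ms for every executor whose last
  // heartbeat is older than timeoutMs.
  def expired(lastSeen: Map[String, Long], nowMs: Long, timeoutMs: Long): Map[String, Long] =
    lastSeen.collect {
      case (id, seenMs) if nowMs - seenMs > timeoutMs => id -> (nowMs - seenMs)
    }

  // Formats the WARN message quoted in the text for one expired executor.
  def warning(id: String, elapsedMs: Long, timeoutMs: Long): String =
    s"Removing executor $id with no recent heartbeats: $elapsedMs ms exceeds timeout $timeoutMs ms"
}
```

With a timeout of 120000 ms and a current time of 130000 ms, an executor last seen at 0 ms is expired, while one last seen at 100000 ms is not.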

Heartbeat

Heartbeat(executorId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId)

When Heartbeat arrives and the internal scheduler is not set yet (i.e. no TaskSchedulerIsSet message has been received), the following WARN message is printed out to the logs:

WARN HeartbeatReceiver: Dropping [heartbeat] because TaskScheduler is not ready yet

And the response is HeartbeatResponse(reregisterBlockManager = true).

If however the internal scheduler was set already, HeartbeatReceiver checks whether the executor executorId is known (in executorLastSeen).

If the executor is not recognized, the following DEBUG message is printed out to the logs:

DEBUG HeartbeatReceiver: Received heartbeat from unknown executor [executorId]

And the response is HeartbeatResponse(reregisterBlockManager = true).

If however the internal scheduler is set and the executor is recognized (in executorLastSeen), the current time is recorded in executorLastSeen and TaskScheduler.executorHeartbeatReceived is called asynchronously (i.e. on a separate thread) on eventLoopThread.

The response is HeartbeatResponse(reregisterBlockManager = unknownExecutor) where unknownExecutor is the negation of the result of calling TaskScheduler.executorHeartbeatReceived, i.e. the block manager is asked to re-register when the scheduler does not know the executor.
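The three cases above reduce to a small decision function. This sketch covers only the choice of response and leaves out logging, the registry update and the asynchronous execution on eventLoopThread. HeartbeatDecisionSketch and respond are hypothetical names, HeartbeatResponse is a local stand-in for the real class, and schedulerKnowsExecutor stands for the result of TaskScheduler.executorHeartbeatReceived, assumed to be true when the scheduler knows the executor.

```scala
object HeartbeatDecisionSketch {
  // Local stand-in for the real response message.
  final case class HeartbeatResponse(reregisterBlockManager: Boolean)

  // schedulerSet: has TaskSchedulerIsSet been handled already?
  // knownExecutor: is the executor id present in executorLastSeen?
  // schedulerKnowsExecutor: assumed result of executorHeartbeatReceived;
  //   by-name, so it is evaluated only when the first two checks pass.
  def respond(
      schedulerSet: Boolean,
      knownExecutor: Boolean,
      schedulerKnowsExecutor: => Boolean): HeartbeatResponse =
    if (!schedulerSet) HeartbeatResponse(reregisterBlockManager = true)
    else if (!knownExecutor) HeartbeatResponse(reregisterBlockManager = true)
    else HeartbeatResponse(reregisterBlockManager = !schedulerKnowsExecutor)
}
```

In this model the only path that returns reregisterBlockManager = false is the one where the scheduler is set, the executor is in the registry, and the scheduler also knows the executor.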

Caution
FIXME Figure

TaskSchedulerIsSet

When TaskSchedulerIsSet arrives, HeartbeatReceiver sets the scheduler internal attribute (using SparkContext.taskScheduler).

Note
It is an internal message.

Internal Registries

  • executorLastSeen - a registry of executor ids and the timestamps of when the last heartbeat was received.

Settings

spark.storage.blockManagerSlaveTimeoutMs

spark.storage.blockManagerSlaveTimeoutMs (default: 120s)

spark.network.timeout

spark.network.timeout (default: spark.storage.blockManagerSlaveTimeoutMs)

Other

  • spark.storage.blockManagerTimeoutIntervalMs (default: 60s)

  • spark.network.timeoutInterval (default: spark.storage.blockManagerTimeoutIntervalMs)
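The defaults listed above form two fallback chains: each spark.network.* setting falls back to its spark.storage.* counterpart, which in turn falls back to a fixed value. The sketch below shows that resolution order over a plain Map instead of a real SparkConf. TimeoutSettingsSketch, networkTimeout and checkInterval are hypothetical names, and values are kept as raw strings without parsing the time units.

```scala
object TimeoutSettingsSketch {
  // spark.network.timeout -> spark.storage.blockManagerSlaveTimeoutMs -> 120s
  def networkTimeout(conf: Map[String, String]): String =
    conf.get("spark.network.timeout")
      .orElse(conf.get("spark.storage.blockManagerSlaveTimeoutMs"))
      .getOrElse("120s")

  // spark.network.timeoutInterval -> spark.storage.blockManagerTimeoutIntervalMs -> 60s
  def checkInterval(conf: Map[String, String]): String =
    conf.get("spark.network.timeoutInterval")
      .orElse(conf.get("spark.storage.blockManagerTimeoutIntervalMs"))
      .getOrElse("60s")
}
```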