A MapOutputTracker is a Spark service that tracks the locations of the (shuffle) map outputs of a stage. Internally it uses a mapStatuses registry that holds, for every shuffle id, an array of MapStatus entries, one per map partition.
There are two versions of MapOutputTracker:
- MapOutputTrackerMaster for a driver
- MapOutputTrackerWorker for executors
MapOutputTracker is available under SparkEnv.get.mapOutputTracker. It is also available as MapOutputTracker in the driver’s RPC Environment.
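For example, code running on the driver or inside an executor can get hold of the tracker through the current SparkEnv (a minimal sketch; SparkEnv is a developer API):

import org.apache.spark.SparkEnv

// The MapOutputTracker of the current JVM: MapOutputTrackerMaster on the
// driver, MapOutputTrackerWorker inside executors.
val tracker = SparkEnv.get.mapOutputTracker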
Tip: Enable DEBUG logging level for the org.apache.spark.MapOutputTracker logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.MapOutputTracker=DEBUG
It works with ShuffledRDD, which asks for the preferred locations of a shuffle partition using tracker.getPreferredLocationsForShuffle. It is also used for mapOutputTracker.containsShuffle and MapOutputTrackerMaster.registerShuffle when a new ShuffleMapStage is created.
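A simplified sketch of that lookup, modeled on ShuffledRDD.getPreferredLocations (the cast to MapOutputTrackerMaster works because preferred locations are computed on the driver):

// Inside ShuffledRDD (simplified): ask the driver-side tracker for the
// preferred hosts of one reduce partition of this shuffle.
override protected def getPreferredLocations(partition: Partition): Seq[String] = {
  val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  tracker.getPreferredLocationsForShuffle(dep, partition.index)
}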
Caution: FIXME DAGScheduler.mapOutputTracker
MapOutputTrackerMaster.getStatistics(dependency) returns MapOutputStatistics, which becomes the result of JobWaiter.taskSucceeded for a ShuffleMapStage if it is the final stage in a job.
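For reference, MapOutputStatistics is a small holder of per-partition output sizes (a simplified view of the class):

// Simplified shape: the shuffle id plus the total number of output bytes per
// reduce partition, summed across all map outputs.
class MapOutputStatistics(val shuffleId: Int, val bytesByPartitionId: Array[Long])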
MapOutputTrackerMaster.registerMapOutputs is called with a shuffle id and a list of MapStatus when a ShuffleMapStage is finished.
A MapStatus is the result returned by a ShuffleMapTask to DAGScheduler that includes:
- the location where the ShuffleMapTask ran (as def location: BlockManagerId)
- an estimated size for the reduce block, in bytes (as def getSizeForBlock(reduceId: Int): Long).
There are two types of MapStatus:
- CompressedMapStatus that compresses the estimated map output size to 8 bits (Byte) for efficient reporting.
- HighlyCompressedMapStatus that stores the average size of non-empty blocks, and a compressed bitmap for tracking which blocks are empty.
When the number of blocks (the size of uncompressedSizes) is greater than 2000, HighlyCompressedMapStatus is chosen.
Caution: FIXME What exactly is 2000? Is this the number of tasks in a job?
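A simplified sketch of the selection, modeled on MapStatus.apply (uncompressedSizes holds one estimated size per block):

// More than 2000 blocks: use the highly compressed representation (average
// size plus an empty-block bitmap); otherwise one compressed byte per block.
def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
  if (uncompressedSizes.length > 2000) {
    HighlyCompressedMapStatus(loc, uncompressedSizes)
  } else {
    new CompressedMapStatus(loc, uncompressedSizes)
  }
}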
Caution: FIXME Review ShuffleManager
A MapOutputTrackerMaster is the MapOutputTracker for a driver.
A MapOutputTrackerMaster is the source of truth for the collection of MapStatus objects (map output locations) per shuffle id (as recorded from ShuffleMapTasks).
MapOutputTrackerMaster uses Spark’s org.apache.spark.util.TimeStampedHashMap for mapStatuses.
Note: There is currently a hardcoded limit on the number of map and reduce tasks above which Spark does not assign preferred locations (aka locality preferences) based on map output sizes: 1000 for map and reduce tasks each.
It uses MetadataCleaner with MetadataCleanerType.MAP_OUTPUT_TRACKER as cleanerType and a cleanup function to drop entries in mapStatuses.
You should see the following INFO message when the MapOutputTrackerMaster is created (FIXME it uses MapOutputTrackerMasterEndpoint):
INFO SparkEnv: Registering MapOutputTracker
The cleanup(cleanupTime: Long) method removes old entries in mapStatuses and cachedSerializedStatuses that have a timestamp earlier than cleanupTime. It uses the org.apache.spark.util.TimeStampedHashMap.clearOldValues method.
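The idea is plain time-based eviction. A minimal, self-contained sketch of the mechanism (not Spark's actual TimeStampedHashMap):

import scala.collection.concurrent.TrieMap

// Each value is stored together with its insertion timestamp; cleanup drops
// every entry whose timestamp is older than the given cutoff.
class TimestampedCache[K, V] {
  private val entries = TrieMap.empty[K, (V, Long)]

  def put(key: K, value: V): Unit =
    entries.put(key, (value, System.currentTimeMillis()))

  def get(key: K): Option[V] = entries.get(key).map(_._1)

  def clearOldValues(cutoffTime: Long): Unit =
    entries.retain { (_, value) => value._2 >= cutoffTime }
}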
Tip: Enable DEBUG logging level for the org.apache.spark.util.TimeStampedHashMap logger to see entries being removed. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.util.TimeStampedHashMap=DEBUG
You should see the following DEBUG message in the logs for entries being removed:
DEBUG Removing key [entry.getKey]
- spark.shuffle.reduceLocality.enabled (default: true) controls whether to compute locality preferences for reduce tasks. If true, MapOutputTrackerMaster computes the preferred hosts on which to run a given map output partition in a given shuffle, i.e. the nodes that most of the outputs for that partition are on (as sketched below).
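A simplified sketch of the guard, modeled on getPreferredLocationsForShuffle, that combines this setting with the hardcoded 1000 limits from the note above (names are illustrative):

// Locality preferences for reduce tasks are only computed when the flag is on
// and both the map and the reduce side are below the hardcoded 1000 limit.
val SHUFFLE_PREF_MAP_THRESHOLD = 1000
val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000

def shouldComputeReduceLocality(
    reduceLocalityEnabled: Boolean,  // spark.shuffle.reduceLocality.enabled
    numMapOutputs: Int,
    numReducePartitions: Int): Boolean = {
  reduceLocalityEnabled &&
    numMapOutputs < SHUFFLE_PREF_MAP_THRESHOLD &&
    numReducePartitions < SHUFFLE_PREF_REDUCE_THRESHOLD
}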
A MapOutputTrackerWorker is the MapOutputTracker for executors. The internal mapStatuses map serves as a cache and any miss triggers a fetch from the driver’s MapOutputTrackerMaster.
Note: The only difference between MapOutputTrackerWorker and the base abstract class MapOutputTracker is that the internal mapStatuses mapping between ints and an array of MapStatus objects is an instance of the thread-safe java.util.concurrent.ConcurrentHashMap.
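A minimal sketch of the worker-side cache-or-fetch behaviour (not Spark's actual code; the fetch function stands in for the RPC call to the driver):

import java.util.concurrent.ConcurrentHashMap

// Executors look a shuffle's map statuses up locally first and only ask the
// driver's MapOutputTrackerMaster on a cache miss.
class LocalStatusCache[S](fetchFromDriver: Int => Array[S]) {
  private val mapStatuses = new ConcurrentHashMap[Int, Array[S]]()

  def getStatuses(shuffleId: Int): Array[S] = {
    val cached = mapStatuses.get(shuffleId)
    if (cached != null) {
      cached
    } else {
      // Cache miss: fetch from the driver and remember the result.
      val fetched = fetchFromDriver(shuffleId)
      mapStatuses.putIfAbsent(shuffleId, fetched)
      fetched
    }
  }
}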