Skip to content

Commit

Permalink
[CELEBORN-1685] ShuffleFallbackPolicy supports ShuffleFallbackCount m…
Browse files Browse the repository at this point in the history
…etric
  • Loading branch information
SteNicholas committed Nov 8, 2024
1 parent 4545cdc commit cfda3f1
Show file tree
Hide file tree
Showing 21 changed files with 305 additions and 96 deletions.
90 changes: 89 additions & 1 deletion assets/grafana/celeborn-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The count of shuffle fallbacks.",
"description": "The total count of shuffle including celeborn shuffle and spark built-in shuffle.",
"fieldConfig": {
"defaults": {
"color": {
Expand Down Expand Up @@ -1544,6 +1544,94 @@
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"expr": "metrics_ShuffleTotalCount_Value{instance=~\"${instance}\"}",
"legendFormat": "${baseLegend}",
"range": true,
"refId": "A"
}
],
"title": "metrics_ShuffleTotalCount_Value",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The count of shuffle fallbacks.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green"
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 34
},
"id": 219,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"maxHeight": 600,
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
String appId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appId);

lifecycleManager.shuffleCount().increment();
if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) {
logger.warn("Fallback to SortShuffleManager!");
sortShuffleIds.add(shuffleId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.shuffle.celeborn

import java.util.function.BiFunction

import scala.collection.JavaConverters._

import org.apache.spark.ShuffleDependency
Expand All @@ -35,12 +37,26 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
def applyFallbackPolicies[K, V, C](
dependency: ShuffleDependency[K, V, C],
lifecycleManager: LifecycleManager): Boolean = {
val needFallback =
shuffleFallbackPolicies.exists(_.needFallback(dependency, conf, lifecycleManager))
if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
throw new CelebornIOException(
"Fallback to spark built-in shuffle implementation is prohibited.")
val fallbackPolicy =
shuffleFallbackPolicies.find(_.needFallback(dependency, conf, lifecycleManager))
if (fallbackPolicy.isDefined) {
if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
throw new CelebornIOException(
"Fallback to spark built-in shuffle implementation is prohibited.")
} else {
// fallbackPolicy is the Option returned by find(...). Calling getClass on the Option itself
// would name the wrapper (scala.Some) rather than the policy, collapsing every fallback into
// a single key. Unwrap it so the counter is keyed by the concrete policy class name.
fallbackPolicy.foreach { policy =>
  lifecycleManager.shuffleFallbackCounts.compute(
    policy.getClass.getName,
    new BiFunction[String, java.lang.Long, java.lang.Long] {
      // v is null the first time a policy triggers a fallback; start its count at 1.
      override def apply(k: String, v: java.lang.Long): java.lang.Long = {
        if (v == null) {
          1L
        } else {
          v + 1L
        }
      }
    })
}
}
}
needFallback
fallbackPolicy.isDefined
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ public <K, V, C> ShuffleHandle registerShuffle(
String appId = SparkUtils.appUniqueId(dependency.rdd().context());
initializeLifecycleManager(appId);

lifecycleManager.shuffleCount().increment();
if (fallbackPolicyRunner.applyFallbackPolicies(dependency, lifecycleManager)) {
lifecycleManager.shuffleFallbackCount().increment();
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)
&& !conf.getBoolean("spark.shuffle.service.enabled", false)) {
logger.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,24 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
def applyFallbackPolicies[K, V, C](
dependency: ShuffleDependency[K, V, C],
lifecycleManager: LifecycleManager): Boolean = {
val needFallback =
shuffleFallbackPolicies.exists(_.needFallback(dependency, conf, lifecycleManager))
if (needFallback && FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
throw new CelebornIOException(
"Fallback to spark built-in shuffle implementation is prohibited.")
val fallbackPolicy =
shuffleFallbackPolicies.find(_.needFallback(dependency, conf, lifecycleManager))
if (fallbackPolicy.isDefined) {
if (FallbackPolicy.NEVER.equals(shuffleFallbackPolicy)) {
throw new CelebornIOException(
"Fallback to spark built-in shuffle implementation is prohibited.")
} else {
// fallbackPolicy is the Option returned by find(...). Calling getClass on the Option itself
// would name the wrapper (scala.Some) rather than the policy, collapsing every fallback into
// a single key. Unwrap it so the counter is keyed by the concrete policy class name.
fallbackPolicy.foreach { policy =>
  lifecycleManager.shuffleFallbackCounts.compute(
    policy.getClass.getName,
    // v is null the first time a policy triggers a fallback; start its count at 1.
    (_, v) => {
      if (v == null) {
        1L
      } else {
        v + 1L
      }
    })
}
}
}
needFallback
fallbackPolicy.isDefined
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ApplicationHeartbeater(
appId: String,
conf: CelebornConf,
masterClient: MasterClient,
shuffleMetrics: () => ((Long, Long), Long),
shuffleMetrics: () => ((Long, Long), (Long, Map[String, java.lang.Long])),
workerStatusTracker: WorkerStatusTracker,
registeredShuffles: ConcurrentHashMap.KeySetView[Int, java.lang.Boolean],
cancelAllActiveStages: String => Unit) extends Logging {
Expand All @@ -59,18 +59,21 @@ class ApplicationHeartbeater(
override def run(): Unit = {
try {
require(masterClient != null, "When sending a heartbeat, client shouldn't be null.")
val ((tmpTotalWritten, tmpTotalFileCount), tmpShuffleFallbackCount) = shuffleMetrics()
val (
(tmpTotalWritten, tmpTotalFileCount),
(tmpShuffleCount, tmpShuffleFallbackCounts)) = shuffleMetrics()
logInfo("Send app heartbeat with " +
s"written: ${Utils.bytesToString(tmpTotalWritten)}, file count: $tmpTotalFileCount, " +
s"shuffle fallback count: $tmpShuffleFallbackCount")
s"shuffle count: $tmpShuffleCount, shuffle fallback count: $tmpShuffleFallbackCounts")
// UserResourceConsumption and DiskInfo are eliminated from WorkerInfo
// during serialization of HeartbeatFromApplication
val appHeartbeat =
HeartbeatFromApplication(
appId,
tmpTotalWritten,
tmpTotalFileCount,
tmpShuffleFallbackCount,
tmpShuffleCount,
tmpShuffleFallbackCounts.asJava,
workerStatusTracker.getNeedCheckedWorkers().toList.asJava,
ZERO_UUID,
true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
private val unregisterShuffleTime = JavaUtils.newConcurrentHashMap[Int, Long]()

val registeredShuffle = ConcurrentHashMap.newKeySet[Int]()
val shuffleFallbackCount = new LongAdder()
val shuffleCount = new LongAdder()
val shuffleFallbackCounts = JavaUtils.newConcurrentHashMap[String, java.lang.Long]()
// maintain each shuffle's map relation of WorkerInfo and partition location
val shuffleAllocatedWorkers = new ShuffleAllocatedWorkers
// shuffle id -> (partitionId -> newest PartitionLocation)
Expand Down Expand Up @@ -210,7 +211,10 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
appUniqueId,
conf,
masterClient,
() => commitManager.commitMetrics() -> shuffleFallbackCount.sumThenReset(),
// Supplies the heartbeat metrics: commit metrics paired with
// (shuffle count, per-policy fallback counts with a positive value).
// NOTE(review): shuffleCount is drained by sumThenReset() on every call, whereas
// shuffleFallbackCounts is only filtered and copied here, not cleared -- so the per-policy
// counts look cumulative while the shuffle count is a delta. Confirm the receiver of the
// heartbeat does not add the fallback counts to a running total.
() =>
commitManager.commitMetrics() ->
(shuffleCount.sumThenReset(),
shuffleFallbackCounts.asScala.filter(_._2 > 0L).toMap),
workerStatusTracker,
registeredShuffle,
reason => cancelAllActiveStages(reason))
Expand Down
6 changes: 4 additions & 2 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ message PbHeartbeatFromApplication {
string requestId = 4;
repeated PbWorkerInfo needCheckedWorkerList = 5;
bool shouldResponse = 6;
int64 shuffleFallbackCount = 7;
int64 shuffleCount = 7;
map<string, int64> shuffleFallbackCounts = 8;
}

message PbHeartbeatFromApplicationResponse {
Expand Down Expand Up @@ -675,7 +676,8 @@ message PbSnapshotMetaInfo {
map<string, PbWorkerEventInfo> workerEventInfos = 15;
map<string, PbApplicationMeta> applicationMetas = 16;
repeated PbWorkerInfo decommissionWorkers = 17;
int64 shuffleTotalFallbackCount = 18;
int64 shuffleTotalCount = 18;
map<string, int64> shuffleFallbackCounts = 19;
}

message PbOpenStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ object ControlMessages extends Logging {
appId: String,
totalWritten: Long,
fileCount: Long,
shuffleFallbackCount: Long,
shuffleCount: Long,
shuffleFallbackCounts: util.Map[String, java.lang.Long],
needCheckedWorkerList: util.List[WorkerInfo],
override var requestId: String = ZERO_UUID,
shouldResponse: Boolean = false) extends MasterRequestMessage
Expand Down Expand Up @@ -810,7 +811,8 @@ object ControlMessages extends Logging {
appId,
totalWritten,
fileCount,
shuffleFallbackCount,
shuffleCount,
shuffleFallbackCounts,
needCheckedWorkerList,
requestId,
shouldResponse) =>
Expand All @@ -819,7 +821,8 @@ object ControlMessages extends Logging {
.setRequestId(requestId)
.setTotalWritten(totalWritten)
.setFileCount(fileCount)
.setShuffleFallbackCount(shuffleFallbackCount)
.setShuffleCount(shuffleCount)
.putAllShuffleFallbackCounts(shuffleFallbackCounts)
.addAllNeedCheckedWorkerList(needCheckedWorkerList.asScala.map(
PbSerDeUtils.toPbWorkerInfo(_, true, true)).toList.asJava)
.setShouldResponse(shouldResponse)
Expand Down Expand Up @@ -1208,11 +1211,16 @@ object ControlMessages extends Logging {

case HEARTBEAT_FROM_APPLICATION_VALUE =>
val pbHeartbeatFromApplication = PbHeartbeatFromApplication.parseFrom(message.getPayload)
val shuffleFallbackCountMap = new util.HashMap[String, java.lang.Long]()
pbHeartbeatFromApplication.getShuffleFallbackCountsMap.asScala.foreach { entry =>
shuffleFallbackCountMap.put(entry._1, entry._2)
}
HeartbeatFromApplication(
pbHeartbeatFromApplication.getAppId,
pbHeartbeatFromApplication.getTotalWritten,
pbHeartbeatFromApplication.getFileCount,
pbHeartbeatFromApplication.getShuffleFallbackCount,
pbHeartbeatFromApplication.getShuffleCount,
shuffleFallbackCountMap,
new util.ArrayList[WorkerInfo](
pbHeartbeatFromApplication.getNeedCheckedWorkerListList.asScala
.map(PbSerDeUtils.fromPbWorkerInfo).toList.asJava),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ object PbSerDeUtils {
workers: java.util.Set[WorkerInfo],
partitionTotalWritten: java.lang.Long,
partitionTotalFileCount: java.lang.Long,
shuffleTotalFallbackCount: java.lang.Long,
shuffleTotalCount: java.lang.Long,
shuffleFallbackCounts: java.util.Map[String, java.lang.Long],
appDiskUsageMetricSnapshots: Array[AppDiskUsageSnapShot],
currentAppDiskUsageMetricsSnapshot: AppDiskUsageSnapShot,
lostWorkers: ConcurrentHashMap[WorkerInfo, java.lang.Long],
Expand All @@ -488,7 +489,8 @@ object PbSerDeUtils {
.addAllWorkers(workers.asScala.map(toPbWorkerInfo(_, true, false)).asJava)
.setPartitionTotalWritten(partitionTotalWritten)
.setPartitionTotalFileCount(partitionTotalFileCount)
.setShuffleTotalFallbackCount(shuffleTotalFallbackCount)
.setShuffleTotalCount(shuffleTotalCount)
.putAllShuffleFallbackCounts(shuffleFallbackCounts)
// appDiskUsageMetricSnapshots can have null values,
// protobuf repeated value can't support null value in list.
.addAllAppDiskUsageMetricSnapshots(appDiskUsageMetricSnapshots.filter(_ != null)
Expand Down
37 changes: 19 additions & 18 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,24 +92,25 @@ These metrics are exposed by Celeborn master.

- namespace=master

| Metric Name | Description |
|--------------------------|---------------------------------------------------------------------------------|
| RegisteredShuffleCount | The count of registered shuffle. |
| DeviceCelebornFreeBytes | The actual usable space of Celeborn for device. |
| DeviceCelebornTotalBytes | The total space of Celeborn for device. |
| RunningApplicationCount | The count of running applications. |
| ActiveShuffleSize | The active shuffle size of workers. |
| ActiveShuffleFileCount | The active shuffle file count of workers. |
| ShuffleFallbackCount | The count of shuffle fallbacks. |
| WorkerCount | The count of active workers. |
| LostWorkerCount | The count of workers in lost list. |
| ExcludedWorkerCount | The count of workers in excluded list. |
| AvailableWorkerCount | The count of workers in available list. |
| ShutdownWorkerCount | The count of workers in shutdown list. |
| DecommissionWorkerCount | The count of workers in decommission list. |
| IsActiveMaster | Whether the current master is active. |
| PartitionSize | The size of estimated shuffle partition. |
| OfferSlotsTime | The time for masters to handle `RequestSlots` request when registering shuffle. |
| Metric Name | Description |
|--------------------------|-----------------------------------------------------------------------------------|
| RegisteredShuffleCount | The count of registered shuffle. |
| DeviceCelebornFreeBytes | The actual usable space of Celeborn for device. |
| DeviceCelebornTotalBytes | The total space of Celeborn for device. |
| RunningApplicationCount | The count of running applications. |
| ActiveShuffleSize | The active shuffle size of workers. |
| ActiveShuffleFileCount | The active shuffle file count of workers. |
| ShuffleTotalCount | The total count of shuffle including celeborn shuffle and spark built-in shuffle. |
| ShuffleFallbackCount | The count of shuffle fallbacks. |
| WorkerCount | The count of active workers. |
| LostWorkerCount | The count of workers in lost list. |
| ExcludedWorkerCount | The count of workers in excluded list. |
| AvailableWorkerCount | The count of workers in available list. |
| ShutdownWorkerCount | The count of workers in shutdown list. |
| DecommissionWorkerCount | The count of workers in decommission list. |
| IsActiveMaster | Whether the current master is active. |
| PartitionSize | The size of estimated shuffle partition. |
| OfferSlotsTime | The time for masters to handle `RequestSlots` request when registering shuffle. |

- namespace=CPU

Expand Down
Loading

0 comments on commit cfda3f1

Please sign in to comment.