From c680f6dc7e379850953fdd93c424d46e77f923a2 Mon Sep 17 00:00:00 2001 From: "xiyu.zk" Date: Wed, 7 Feb 2024 17:33:27 +0800 Subject: [PATCH] [CELEBORN-1267] Add config to control worker check in CelebornShuffleFallbackPolicyRunner --- .../CelebornShuffleFallbackPolicyRunner.scala | 4 ++++ .../org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala index 6248e08ffb..4711903aa2 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleFallbackPolicyRunner.scala @@ -80,6 +80,10 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging { * @return if celeborn cluster has available workers. */ def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = { + if (!conf.forceCheckWorkerEnabled) { + return true + } + val resp = lifecycleManager.checkWorkersAvailable() if (!resp.getAvailable) { logWarning(s"No workers available for current user ${lifecycleManager.getUserIdentifier}.") diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index c4aad0306e..aefa22b053 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -884,6 +884,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def shufflePartitionType: PartitionType = PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE)) def shuffleRangeReadFilterEnabled: Boolean = get(SHUFFLE_RANGE_READ_FILTER_ENABLED) def shuffleForceFallbackEnabled: Boolean = get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED) + def forceCheckWorkerEnabled: Boolean = get(FORCE_CHECK_WORKER_ENABLED) def shuffleForceFallbackPartitionThreshold: Long = get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD) def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL) @@ -4003,6 +4004,16 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val FORCE_CHECK_WORKER_ENABLED: ConfigEntry[Boolean] = + buildConf("celeborn.client.spark.shuffle.forceCheckWorker.enabled") + .categories("client") + .doc("When true, before registering shuffle, LifecycleManager should check " + + "if current cluster have available workers, if cluster don't have available " + + "workers, fallback to Spark's default shuffle") + .version("0.5.0") + .booleanConf + .createWithDefault(true) + val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] = buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold") .withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")