Skip to content

Commit

Permalink
[CELEBORN-1267] Add config to control worker check in CelebornShuffleFallbackPolicyRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
kerwin-zk committed Feb 7, 2024
1 parent ab4c0bc commit c680f6d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ class CelebornShuffleFallbackPolicyRunner(conf: CelebornConf) extends Logging {
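* @return true if the Celeborn cluster has available workers, or if the worker check is
*         disabled via `celeborn.client.spark.shuffle.forceCheckWorker.enabled`.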
*/
def checkWorkersAvailable(lifecycleManager: LifecycleManager): Boolean = {
if (!conf.forceCheckWorkerEnabled) {
return true
}

val resp = lifecycleManager.checkWorkersAvailable()
if (!resp.getAvailable) {
logWarning(s"No workers available for current user ${lifecycleManager.getUserIdentifier}.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
/**
 * Partition type obtained by passing the SHUFFLE_PARTITION_TYPE setting to
 * `PartitionType.valueOf`.
 */
def shufflePartitionType: PartitionType = PartitionType.valueOf(get(SHUFFLE_PARTITION_TYPE))
/** Current value of the SHUFFLE_RANGE_READ_FILTER_ENABLED setting. */
def shuffleRangeReadFilterEnabled: Boolean = get(SHUFFLE_RANGE_READ_FILTER_ENABLED)
/** Current value of the SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED setting. */
def shuffleForceFallbackEnabled: Boolean = get(SPARK_SHUFFLE_FORCE_FALLBACK_ENABLED)
/**
 * Current value of FORCE_CHECK_WORKER_ENABLED
 * (`celeborn.client.spark.shuffle.forceCheckWorker.enabled`). When this is false,
 * `CelebornShuffleFallbackPolicyRunner.checkWorkersAvailable` returns true without asking the
 * LifecycleManager whether workers are available.
 */
def forceCheckWorkerEnabled: Boolean = get(FORCE_CHECK_WORKER_ENABLED)
/**
 * Current value of SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD
 * (`celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold`).
 */
def shuffleForceFallbackPartitionThreshold: Long =
get(SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD)
/**
 * Current value of the SHUFFLE_EXPIRED_CHECK_INTERVAL setting.
 * NOTE(review): presumably milliseconds, going by the method name; confirm against the
 * entry's definition.
 */
def shuffleExpiredCheckIntervalMs: Long = get(SHUFFLE_EXPIRED_CHECK_INTERVAL)
Expand Down Expand Up @@ -4003,6 +4004,16 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)

/**
 * Client-side switch for the worker availability check made by
 * `CelebornShuffleFallbackPolicyRunner.checkWorkersAvailable`. The default is `true`, so the
 * check runs unless it is explicitly turned off; when it is off, that method reports workers
 * as available without querying the LifecycleManager.
 */
val FORCE_CHECK_WORKER_ENABLED: ConfigEntry[Boolean] =
  buildConf("celeborn.client.spark.shuffle.forceCheckWorker.enabled")
    .categories("client")
    .doc(
      "When true, before registering shuffle, LifecycleManager should " +
        "check if current cluster have available workers, " +
        "if cluster don't have available workers, " +
        "fallback to Spark's default shuffle")
    .version("0.5.0")
    .booleanConf
    .createWithDefault(true)

val SPARK_SHUFFLE_FORCE_FALLBACK_PARTITION_THRESHOLD: ConfigEntry[Long] =
buildConf("celeborn.client.spark.shuffle.forceFallback.numPartitionsThreshold")
.withAlternative("celeborn.shuffle.forceFallback.numPartitionsThreshold")
Expand Down

0 comments on commit c680f6d

Please sign in to comment.