diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala index 2113fec05f..c7a5b57b8f 100644 --- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala +++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala @@ -261,6 +261,7 @@ class CelebornShuffleReader[K, C]( if (handle.numMappers > 0) { val startFetchWait = System.nanoTime() var inputStream: CelebornInputStream = streams.get(partitionId) + // todo bug fix: split后,fetch时inputStream一直为空 while (inputStream == null) { if (exceptionRef.get() != null) { exceptionRef.get() match { diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index bbfcc2e0ed..6642d9df0a 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -650,9 +650,10 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends } // First, request to get allocated slots from Primary - val numGroupTask = math.ceil(numMappers.toDouble / groupMapTaskGroupSize).toInt + var numGroupTask = 1 var groupNumPartitions = numPartitions if (partitionType.getValue.equals(PartitionType.REDUCE.getValue) && groupMapTaskEnabled) { + numGroupTask = math.ceil(numMappers.toDouble / groupMapTaskGroupSize).toInt groupNumPartitions = numPartitions * numGroupTask }