Skip to content

Commit

Permalink
bug tod
Browse files Browse the repository at this point in the history
  • Loading branch information
zaynt4606 committed Nov 11, 2024
1 parent 2d18124 commit 5d628b9
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ class CelebornShuffleReader[K, C](
if (handle.numMappers > 0) {
val startFetchWait = System.nanoTime()
var inputStream: CelebornInputStream = streams.get(partitionId)
// todo bug fix: split后,fetch时inputStream一直为空
while (inputStream == null) {
if (exceptionRef.get() != null) {
exceptionRef.get() match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,9 +650,10 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
}

// First, request to get allocated slots from Primary
val numGroupTask = math.ceil(numMappers.toDouble / groupMapTaskGroupSize).toInt
var numGroupTask = 1
var groupNumPartitions = numPartitions
if (partitionType.getValue.equals(PartitionType.REDUCE.getValue) && groupMapTaskEnabled) {
numGroupTask = math.ceil(numMappers.toDouble / groupMapTaskGroupSize).toInt
groupNumPartitions = numPartitions * numGroupTask
}

Expand Down

0 comments on commit 5d628b9

Please sign in to comment.