Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Apr 18, 2024
1 parent 24fbd9c commit 24c6905
Showing 1 changed file with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ class ChangePartitionManager(
new Runnable {
override def run(): Unit = {
val distinctPartitions = {
val requestSet = inBatchPartitions.get(shuffleId)
requests.asScala.map { case (partitionId, request) =>
locks(partitionId % locks.length).synchronized {
if (!inBatchPartitions.contains(partitionId)) {
inBatchPartitions.get(shuffleId).add(partitionId)
requestSet.add(partitionId)
Some(request.asScala.toArray.maxBy(_.epoch))
} else {
None
Expand Down Expand Up @@ -157,14 +158,17 @@ class ChangePartitionManager(
cause)

locks(partitionId % locks.length).synchronized {
if (requests.containsKey(partitionId)) {
requests.get(partitionId).add(changePartition)
var newEntry = false
val set = requests.computeIfAbsent(partitionId,
(_: Integer) => {
newEntry = true
new util.HashSet[ChangePartitionRequest]()
})

if (newEntry) {
logTrace(s"[handleRequestPartitionLocation] For $shuffleId, request for same partition" +
s"$partitionId-$oldEpoch exists, register context.")
return
} else {
// If new slot for the partition has been allocated, reply and return.
// Else register and allocate for it.
getLatestPartition(shuffleId, partitionId, oldEpoch).foreach { latestLoc =>
context.reply(
partitionId,
Expand All @@ -175,10 +179,8 @@ class ChangePartitionManager(
s" shuffleId: $shuffleId $latestLoc")
return
}
val set = new util.HashSet[ChangePartitionRequest]()
set.add(changePartition)
requests.put(partitionId, set)
}
set.add(changePartition)
}
if (!batchHandleChangePartitionEnabled) {
handleRequestPartitions(shuffleId, Array(changePartition))
Expand Down

0 comments on commit 24c6905

Please sign in to comment.