Skip to content

Commit

Permalink
use finer grained locks in changePartitionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
CodingCat committed Apr 16, 2024
1 parent 3121828 commit 263a4c3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class ChangePartitionManager(
// shuffleId -> (partitionId -> set of ChangePartition)
private val changePartitionRequests =
JavaUtils.newConcurrentHashMap[Int, ConcurrentHashMap[Integer, JSet[ChangePartitionRequest]]]()
private val locks = Array.fill(conf.batchHandleChangePartitionParallelism)(new AnyRef())

// shuffleId -> set of partition id
private val inBatchPartitions = JavaUtils.newConcurrentHashMap[Int, JSet[Integer]]()

Expand Down Expand Up @@ -79,14 +81,17 @@ class ChangePartitionManager(
batchHandleChangePartitionExecutors.submit {
new Runnable {
override def run(): Unit = {
val distinctPartitions = requests.synchronized {
// For each partition only need handle one request
requests.asScala.filter { case (partitionId, _) =>
!inBatchPartitions.get(shuffleId).contains(partitionId)
}.map { case (partitionId, request) =>
inBatchPartitions.get(shuffleId).add(partitionId)
request.asScala.toArray.maxBy(_.epoch)
}.toArray
val distinctPartitions = {
requests.asScala.map { case (partitionId, request) =>
locks(partitionId % locks.length).synchronized {
if (!inBatchPartitions.contains(partitionId)) {
inBatchPartitions.get(shuffleId).add(partitionId)
Some(request.asScala.toArray.maxBy(_.epoch))
} else {
None
}
}
}.filter(_.isDefined).map(_.get).toArray
}
if (distinctPartitions.nonEmpty) {
handleRequestPartitions(
Expand Down Expand Up @@ -151,7 +156,7 @@ class ChangePartitionManager(
oldPartition,
cause)

requests.synchronized {
locks(partitionId % locks.length).synchronized {
if (requests.containsKey(partitionId)) {
requests.get(partitionId).add(changePartition)
logTrace(s"[handleRequestPartitionLocation] For $shuffleId, request for same partition" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
PartitionSplitMode.valueOf(get(SHUFFLE_PARTITION_SPLIT_MODE))
def shufflePartitionSplitThreshold: Long = get(SHUFFLE_PARTITION_SPLIT_THRESHOLD)
def batchHandleChangePartitionEnabled: Boolean = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_ENABLED)
def batchHandleChangePartitionParallelism: Int =
get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_PARALLELISM)
def batchHandleChangePartitionNumThreads: Int = get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS)
def batchHandleChangePartitionRequestInterval: Long =
get(CLIENT_BATCH_HANDLE_CHANGE_PARTITION_INTERVAL)
Expand Down Expand Up @@ -3899,6 +3901,15 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(true)

val CLIENT_BATCH_HANDLE_CHANGE_PARTITION_PARALLELISM: ConfigEntry[Int] =
buildConf("celeborn.client.shuffle.batchHandleChangePartition.parallelism")
.categories("client")
.internal
.doc("max number of change partition requests which can be concurrently processed ")
.version("0.5.0")
.intConf
.createWithDefault(256)

val CLIENT_BATCH_HANDLE_CHANGE_PARTITION_THREADS: ConfigEntry[Int] =
buildConf("celeborn.client.shuffle.batchHandleChangePartition.threads")
.withAlternative("celeborn.shuffle.batchHandleChangePartition.threads")
Expand Down

0 comments on commit 263a4c3

Please sign in to comment.