[CELEBORN-1388] Use finer grained locks in changePartitionManager #2462
Conversation
@CodingCat, please follow the contribution guide to run
eh? I remember this was part of a unit test to detect any inconsistency between CelebornConf and the doc file, is it removed?
and after I ran it, it didn't generate any new change to the markdown file? anything I missed?
@CodingCat, is
buildConf("celeborn.client.shuffle.batchHandleChangePartition.parallelism") | ||
.categories("client") | ||
.internal | ||
.doc("max number of change partition requests which can be concurrently processed ") |
Please keep the first letter of the document's first word capitalized.
      None
    }
  }
}.filter(_.isDefined).map(_.get).toArray
I am not so sure of this change - it feels like it will have a lot more contention, as each entry in the requests map will need to acquire a lock. While uncontended locks are cheap to acquire, they are still not zero cost.
If you have a test bed to validate perf, how about this?
val batchPartitions = inBatchPartitions.get(shuffleId)
val distinctPartitions = requests.synchronized {
  // For each partition only need handle one request
  requests.asScala.filter { case (partitionId, _) =>
    !batchPartitions.contains(partitionId)
  }.map { case (partitionId, request) =>
    batchPartitions.add(partitionId)
    request.asScala.maxBy(_.epoch)
  }.toArray
}
Essentially minimize the time within the synchronized block itself by removing unnecessary costs.
based on our observation in our production system, it won't bring more contention ...
if we use requests.synchronized, all celeborn-dispatcher threads + all celeborn-client-life-cycle-manager-change-partition-executor threads will compete for the same object for locking, even though they are likely working on different partitions ... check the following screenshots
after this change, with a huge spark application with 300TB of shuffle data, I don't see such intensive lock contention anymore
Fair enough. Thanks for sharing the stack trace.
I would suggest that the changes I gave are relevant irrespective of the locking strategy - they minimize the time spent within the critical section.
it feels like it will have a lot more contention - as each entry in the requests map will need to acquire a lock
To reduce the frequency of acquiring locks, I think we can calculate the lock bucket for each partition id first, then group the partition ids by lock bucket, then acquire the lock and process each group (in random order). Though I'm not sure how beneficial this will be.
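For illustration, a rough sketch of that bucketing idea (hypothetical code; it assumes the locks array introduced in this PR and the per-shuffle requests map, and elides the actual request handling):

import scala.collection.JavaConverters._
import scala.util.Random

// Group pending partition ids by lock bucket, then take each bucket's lock
// once and handle all of that bucket's partitions under it, so each lock is
// acquired once per group instead of once per partition.
val byBucket = requests.asScala.keys.toSeq.groupBy(partitionId => partitionId % locks.length)
Random.shuffle(byBucket.toSeq).foreach { case (bucket, partitionIds) =>
  locks(bucket).synchronized {
    partitionIds.foreach { partitionId =>
      // ... handle the change partition request for partitionId ...
    }
  }
}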
@@ -151,7 +156,7 @@ class ChangePartitionManager(
           oldPartition,
           cause)

-      requests.synchronized {
+      locks(partitionId % locks.length).synchronized {
         if (requests.containsKey(partitionId)) {
           requests.get(partitionId).add(changePartition)
           logTrace(s"[handleRequestPartitionLocation] For $shuffleId, request for same partition" +
We could replace this with a computeIfAbsent?
Something like:
requests.synchronized {
  var newEntry = false
  val set = requests.computeIfAbsent(partitionId, (v1: Integer) => {
    // If new slot for the partition has been allocated, reply and return.
    // Else register and allocate for it.
    getLatestPartition(shuffleId, partitionId, oldEpoch).foreach { latestLoc =>
      context.reply(
        partitionId,
        StatusCode.SUCCESS,
        Some(latestLoc),
        lifecycleManager.workerStatusTracker.workerAvailable(oldPartition))
      logDebug(s"New partition found, old partition $partitionId-$oldEpoch return it." +
        s" shuffleId: $shuffleId $latestLoc")
      return
    }
    newEntry = true
    new util.HashSet[ChangePartitionRequest]()
  })
  set.add(changePartition)
  if (!newEntry) {
    logTrace(s"[handleRequestPartitionLocation] For $shuffleId, request for same partition" +
      s"$partitionId-$oldEpoch exists, register context.")
  }
}
if I understand the suggested code correctly, you essentially create a set in requests for each partition and keep adding requests to it,
I thought the same when iterating on the PR, however it turns out we cannot do it ....
basically it is not what the original code was doing... the original code always adds a new set containing a single request to the hash map, i.e. line 178 - 179
The original code is doing the same.
If the partition exists, it adds to the existing set - else it creates a new set and adds the entry.
(Removing other parts of the code, it is essentially:)
if (requests.containsKey(partitionId)) {
  requests.get(partitionId).add(changePartition)
} else {
  // an early exit condition, followed by:
  val set = new util.HashSet[ChangePartitionRequest]()
  set.add(changePartition)
  requests.put(partitionId, set)
}
It is probing the map multiple times though, which is something we can avoid.
(The return in the getLatestPartition case I suggested looks wrong though - we should return null from the mapping function and exit if set is null.)
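For reference, a hedged sketch of that correction (same hypothetical shape as the snippet above; returning null from the mapping function leaves the map unchanged, so computeIfAbsent returns null):

requests.synchronized {
  val set = requests.computeIfAbsent(partitionId, (_: Integer) => {
    // If a new slot for the partition has already been allocated, reply and
    // return null so no entry is inserted for this partition.
    getLatestPartition(shuffleId, partitionId, oldEpoch) match {
      case Some(latestLoc) =>
        context.reply(
          partitionId,
          StatusCode.SUCCESS,
          Some(latestLoc),
          lifecycleManager.workerStatusTracker.workerAvailable(oldPartition))
        null
      case None =>
        new util.HashSet[ChangePartitionRequest]()
    }
  })
  if (set == null) {
    // Already replied with the latest location; nothing to register.
    return
  }
  set.add(changePartition)
}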
requests.putIfAbsent(partitionId, set)
requests.get(partitionId).synchronized {
  getLatestPartition(shuffleId, partitionId, oldEpoch).foreach { latestLoc =>
    context.reply(
      partitionId,
      StatusCode.SUCCESS,
      Some(latestLoc),
      lifecycleManager.workerStatusTracker.workerAvailable(oldPartition))
    logDebug(s"New partition found, old partition $partitionId-$oldEpoch return it." +
      s" shuffleId: $shuffleId $latestLoc")
    return
  }
  requests.get(partitionId).add(changePartition)
}
this was my original code; somehow it makes the application get stuck, which is why I feel this putIfAbsent approach changed the original semantics in a stealthy way
This is strictly not the same as what exists in the main branch - I have not analyzed it in greater detail, but the critical sections are different.
Note that the changes I proposed above are meant to remove avoidable probes into the map and improve performance while not changing the critical sections ... but if the version I proposed does cause deadlocks/hangs, I would be very curious to know why! (a stack trace would definitely help) thanks.
I have updated the code, will run more tests in our env
val requestSet = inBatchPartitions.get(shuffleId)
requests.asScala.map { case (partitionId, request) =>
  locks(partitionId % locks.length).synchronized {
    if (!inBatchPartitions.contains(partitionId)) {
Do you mean requestSet.contains(partitionId) here?
Since requestSet is just a util.HashSet, it's not thread safe. Multiple threads can concurrently modify requestSet if they are processing different partition ids, which I think may cause undefined behavior. Maybe we need to change it to a ConcurrentHashMap.
yeah, I meant requestSet
@@ -3899,6 +3901,14 @@ object CelebornConf extends Logging {
     .booleanConf
     .createWithDefault(true)

+  val CLIENT_BATCH_HANDLE_CHANGE_PARTITION_PARALLELISM: ConfigEntry[Int] =
+    buildConf("celeborn.client.shuffle.batchHandleChangePartition.parallelism")
Maybe celeborn.client.shuffle.batchHandleChangePartition.partitionBuckets is better?
updated
Just another optimization: in ChangePartitionManager#replySuccess and ChangePartitionManager#replyFailure, we should remove from requestsMap before removing from inBatchPartitions, to avoid redundant change partition requests. And removing from inBatchPartitions should be guarded by locks.
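A hedged sketch of that ordering (requestsMap, inBatchPartitions and the locks array are the names used in the comment above; the reply logic itself is elided):

// In replySuccess / replyFailure: drop the pending requests first, then clear
// the in-batch marker, both under the same per-partition lock, so a concurrent
// handleRequestPartitionLocation cannot see the partition as "not in batch"
// while its old request entry still exists.
locks(partitionId % locks.length).synchronized {
  requestsMap.get(shuffleId).remove(partitionId)
  inBatchPartitions.get(shuffleId).remove(partitionId)
}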
cc @AngersZhuuuu @RexXiong @FMX could you also take a look at this PR?
val requestSet = inBatchPartitions.get(shuffleId)
requests.asScala.map { case (partitionId, request) =>
  locks(partitionId % locks.length).synchronized {
    if (!requestSet.contains(partitionId)) {
contains for ConcurrentHashMap is actually containsValue. It's better to use ConcurrentHashMap.newKeySet() instead of ConcurrentHashMap[Integer, Unit].
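A small standalone illustration of the pitfall and the suggested alternative (not code from the PR):

import java.util.concurrent.ConcurrentHashMap

// contains(Object) on ConcurrentHashMap is a legacy alias for containsValue,
// so it checks values, not keys.
val asMap = new ConcurrentHashMap[Integer, Unit]()
asMap.put(1, ())
asMap.contains(1)     // false: 1 is a key here, not a value
asMap.containsKey(1)  // true

// A concurrent key set reads naturally and avoids the trap.
val inBatch = ConcurrentHashMap.newKeySet[Integer]()
inBatch.add(1)
inBatch.contains(1)   // true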
oops, fixed
-      requests.synchronized {
+      locks(partitionId % locks.length).synchronized {
         if (requests.containsKey(partitionId)) {
           requests.get(partitionId).add(changePartition)
@CodingCat I think the "partition Id" of different shuffles can be repeated. In the previous implementation the lock was scoped to a single "shuffleId", but in your new implementation the same "partition Id" of different stages can contend for the same lock. Although a spark application won't run too many stages concurrently, the spark thrift server might run many stages.
The locks variable can be changed to avoid the lock contention between different stages:
private val locks = JavaUtils.newConcurrentHashMap[Int, Array[AnyRef]]()
I think creating an array of AnyRef won't cost more than the contended locks. 256 AnyRef objects consume about 2 KB of memory, so this suggestion won't introduce memory pressure.
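A hedged sketch of that suggestion (lockBuckets is a hypothetical config value and the helper name is illustrative):

// One small lock array per shuffle, created lazily, so the same partition id
// in different shuffles never contends on the same monitor.
private val locks = JavaUtils.newConcurrentHashMap[Int, Array[AnyRef]]()

private def lockFor(shuffleId: Int, partitionId: Int): AnyRef = {
  val shuffleLocks =
    locks.computeIfAbsent(shuffleId, (_: Int) => Array.fill(lockBuckets)(new AnyRef))
  shuffleLocks(partitionId % shuffleLocks.length)
}

// e.g. lockFor(shuffleId, partitionId).synchronized { ... }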
excellent point! just changed the code
LGTM, thanks! Merging to main(v0.5.0)
What changes were proposed in this pull request?
This PR proposes to use finer-grained locks in ChangePartitionManager when handling requests for different partitions.
Why are the changes needed?
We observed intensive lock contention when many partitions are split: most change-partition-executor threads compete for the ConcurrentHashMap used in ChangePartitionManager. This ConcurrentHashMap holds requests per partition, but we were locking the whole map instead of locking at the per-partition level.
With this change, the driver memory footprint is significantly reduced due to the increased processing throughput.
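In essence, the change replaces one monitor guarding the whole map with a small stripe of per-bucket monitors. A minimal self-contained sketch of the pattern (not the actual Celeborn code; names and types are simplified):

import java.util.concurrent.ConcurrentHashMap

class StripedRequests(buckets: Int = 256) {
  private val requests = new ConcurrentHashMap[Int, java.util.HashSet[String]]()
  private val locks = Array.fill(buckets)(new AnyRef)

  def add(partitionId: Int, request: String): Unit = {
    // Only callers whose partitions fall into the same bucket contend on this
    // monitor, instead of every caller serializing on one map-wide lock.
    locks(partitionId % locks.length).synchronized {
      var set = requests.get(partitionId)
      if (set == null) {
        set = new java.util.HashSet[String]()
        requests.put(partitionId, set)
      }
      set.add(request)
    }
  }
}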
Does this PR introduce any user-facing change?
One more config.
How was this patch tested?
Tested in production.
Closes apache#2462 from CodingCat/finer_grained_locks.
Authored-by: CodingCat <zhunansjtu@gmail.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>