From 1ae770d518118307b2b304b3674db01a1aa26431 Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Mon, 25 May 2020 00:18:46 -0700 Subject: [PATCH 1/2] Copies references to data structures to avoid extra ARC traffic. Note: this change (by itself) does not reduce ARC traffic, but in concert with `UnmanagedBuffer` (#41), we see a ~15x performance improvement in the parallelFor benchmark. --- .../NonBlockingThreadPool.swift | 55 +++++++++++-------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift index 9a8e7d94..23dbb97c 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift @@ -69,9 +69,9 @@ public class NonBlockingThreadPool: ComputeThr var cancelledStorage: AtomicUInt64 var blockedCountStorage: AtomicUInt64 var spinningState: AtomicUInt64 - var condition: NonblockingCondition - var waitingMutex: [Environment.ConditionMutex] // TODO: modify condition to add per-thread wakeup - var externalWaitingMutex: Environment.ConditionMutex + let condition: NonblockingCondition + let waitingMutex: [Environment.ConditionMutex] // TODO: modify condition to add per-thread wakeup + let externalWaitingMutex: Environment.ConditionMutex var threads: [Environment.Thread] private let perThreadKey = Environment.ThreadLocalStorage.makeKey( @@ -451,6 +451,10 @@ fileprivate final class PerThreadState { init(threadId: Int, pool: NonBlockingThreadPool) { self.threadId = threadId self.pool = pool + self.totalThreadCount = pool.totalThreadCount + self.coprimes = pool.coprimes + self.queues = pool.queues + self.condition = pool.condition self.rng = PCGRandomNumberGenerator(state: UInt64(threadId)) } let threadId: Int @@ -462,35 +466,40 @@ fileprivate final class PerThreadState { // possible to provide a safer API that doesn't leak by default without inducing an extra pointer // dereference on critical paths. :-( + let totalThreadCount: Int + let coprimes: [Int] + let queues: [NonBlockingThreadPool.Queue] + let condition: NonblockingCondition + var rng: PCGRandomNumberGenerator var isCancelled: Bool { pool.cancelled } func steal() -> Task? { let r = Int(rng.next()) - var selectedThreadId = fastFit(r, into: pool.totalThreadCount) - let step = pool.coprimes[fastFit(r, into: pool.coprimes.count)] + var selectedThreadId = fastFit(r, into: totalThreadCount) + let step = coprimes[fastFit(r, into: coprimes.count)] assert( - step < pool.totalThreadCount, "step: \(step), pool threadcount: \(pool.totalThreadCount)") + step < totalThreadCount, "step: \(step), pool threadcount: \(totalThreadCount)") - for i in 0..= pool.totalThreadCount { - selectedThreadId -= pool.totalThreadCount + if selectedThreadId >= totalThreadCount { + selectedThreadId -= totalThreadCount } } return nil } func spin() -> Task? { - let spinCount = pool.threads.count > 0 ? Constants.spinCount / pool.threads.count : 0 + let spinCount = totalThreadCount > 0 ? Constants.spinCount / totalThreadCount : 0 if pool.shouldStartSpinning() { // Call steal spin_count times; break if steal returns something. @@ -511,12 +520,12 @@ fileprivate final class PerThreadState { func parkUntilWorkAvailable() -> Task? { // Already did a best-effort emptiness check in steal, so prepare for blocking. - pool.condition.preWait() + condition.preWait() // Now we do a reliable emptiness check. if let nonEmptyQueueIndex = findNonEmptyQueueIndex() { - pool.condition.cancelWait() + condition.cancelWait() // Steal from `nonEmptyQueueIndex`. - return pool.queues[nonEmptyQueueIndex].popBack() + return queues[nonEmptyQueueIndex].popBack() } let blockedCount = pool.blockedCountStorage.increment() + 1 // increment returns old value. if blockedCount == pool.threads.count { @@ -526,7 +535,7 @@ fileprivate final class PerThreadState { pool.condition.cancelWait() return nil } - pool.condition.commitWait(threadId) + condition.commitWait(threadId) _ = pool.blockedCountStorage.decrement() return nil } @@ -534,13 +543,13 @@ fileprivate final class PerThreadState { private func findNonEmptyQueueIndex() -> Int? { let r = Int(rng.next()) let increment = - pool.totalThreadCount == 1 ? 1 : pool.coprimes[fastFit(r, into: pool.coprimes.count)] - var threadIndex = fastFit(r, into: pool.totalThreadCount) - for _ in 0..= pool.totalThreadCount { - threadIndex -= pool.totalThreadCount + if threadIndex >= totalThreadCount { + threadIndex -= totalThreadCount } } return nil From 7c31bec12870cd58050ad0c5928c1e360d25765c Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Tue, 9 Jun 2020 20:49:59 -0700 Subject: [PATCH 2/2] Use workerThreadCount instead of totalThreadCount. --- .../NonblockingThreadPool/NonBlockingThreadPool.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift index f2531c72..446a319a 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift @@ -452,6 +452,7 @@ fileprivate final class PerThreadState { self.threadId = threadId self.pool = pool self.totalThreadCount = pool.totalThreadCount + self.workerThreadCount = pool.totalThreadCount - pool.externalFastPathThreadCount self.coprimes = pool.coprimes self.queues = pool.queues self.condition = pool.condition @@ -467,6 +468,7 @@ fileprivate final class PerThreadState { // dereference on critical paths. :-( let totalThreadCount: Int + let workerThreadCount: Int let coprimes: [Int] let queues: [NonBlockingThreadPool.Queue] let condition: NonblockingCondition @@ -499,7 +501,7 @@ fileprivate final class PerThreadState { } func spin() -> Task? { - let spinCount = totalThreadCount > 0 ? Constants.spinCount / totalThreadCount : 0 + let spinCount = workerThreadCount > 0 ? Constants.spinCount / workerThreadCount : 0 if pool.shouldStartSpinning() { // Call steal spin_count times; break if steal returns something.