diff --git a/Package.resolved b/Package.resolved index 6ca17be3..79795895 100644 --- a/Package.resolved +++ b/Package.resolved @@ -15,7 +15,7 @@ "repositoryURL": "https://github.com/google/swift-benchmark.git", "state": { "branch": "master", - "revision": "db61b35d92fc975828bb32f5fa4acf2dedcb0d21", + "revision": "baad2fcb7e563716f2f24974d537aa768eef6bde", "version": null } } diff --git a/Sources/Benchmarks/NonBlockingThreadPool.swift b/Sources/Benchmarks/NonBlockingThreadPool.swift index 2f440ef1..7c0b3fbd 100644 --- a/Sources/Benchmarks/NonBlockingThreadPool.swift +++ b/Sources/Benchmarks/NonBlockingThreadPool.swift @@ -14,6 +14,7 @@ import Benchmark import PenguinParallelWithFoundation +import Dispatch let nonBlockingThreadPool = BenchmarkSuite(name: "NonBlockingThreadPool") { suite in @@ -42,21 +43,67 @@ let nonBlockingThreadPool = BenchmarkSuite(name: "NonBlockingThreadPool") { suit pool.join( {}, { - pool.join( - { pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }, - { pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }) + pool.join({ pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }, + { pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }) }) } + let pool2 = Pool(name: "benchmark-pool2", threadCount: 12) + suite.benchmark("parallel for, one level") { let buffer1 = helpers.buffer1 - pool.parallelFor(n: buffer1.count) { (i, n) in buffer1[i] = true } + pool2.parallelFor(n: buffer1.count) { (i, n) in buffer1[i] = true } } suite.benchmark("parallel for, two levels") { let buffer2 = helpers.buffer2 - pool.parallelFor(n: buffer2.count) { (i, n) in - pool.parallelFor(n: buffer2[i].count) { (j, _) in buffer2[i][j] = true } + pool2.parallelFor(n: buffer2.count) { (i, n) in + pool2.parallelFor(n: buffer2[i].count) { (j, _) in buffer2[i][j] = true } + } + } + + + for grainSize in [10, 100, 1000, 2000, 5000] { + suite.benchmark("parallel for, one level, grain size \(grainSize)") { + let buffer1 = helpers.buffer1 + pool2.parallelFor(n: buffer1.count, grainSize: grainSize) { (i, n) in buffer1[i] = true } + } + } + + suite.benchmark("parallel for, two levels, grain size 10 & 100") { + let buffer2 = helpers.buffer2 + pool2.parallelFor(n: buffer2.count, grainSize: 10) { (i, n) in + pool2.parallelFor(n: buffer2[i].count, grainSize: 100) { (j, _) in buffer2[i][j] = true } + } + } + + suite.benchmark("dispatch concurrent perform, one level") { + let buffer1 = helpers.buffer1 + DispatchQueue.concurrentPerform(iterations: buffer1.count) { i in + buffer1[i] = true + } + } + + suite.benchmark("dispatch concurrent perform, two levels") { + let buffer2 = helpers.buffer2 + DispatchQueue.concurrentPerform(iterations: buffer2.count) { i in + DispatchQueue.concurrentPerform(iterations: buffer2[i].count) { j in buffer2[i][j] = true } + } + } + + suite.benchmark("sequential one level") { + let buffer1 = helpers.buffer1 + for i in 0..: ComputeThreadPool { public typealias Task = () -> Void public typealias ThrowingTask = () throws -> Void - typealias Queue = TaskDeque + fileprivate typealias Queue = TaskDeque let allowNonFastPathThreads: Bool let totalThreadCount: Int let externalFastPathThreadCount: Int var externalFastPathThreadSeenCount: Int = 0 let coprimes: [Int] - let queues: [Queue] + fileprivate let queues: [Queue] var cancelledStorage: AtomicUInt64 var blockedCountStorage: AtomicUInt64 var spinningState: AtomicUInt64 @@ -135,12 +135,12 @@ public class NonBlockingThreadPool: ComputeThr perThreadKey.localValue = state } - public func dispatch(_ fn: @escaping Task) { + public func dispatch(_ fn: @escaping () -> Void) { if let local = perThreadKey.localValue { // Push onto local queue. - if let bounced = queues[local.threadId].pushFront(fn) { + if let bounced = queues[local.threadId].pushFront(.dispatch(fn)) { // If local queue is full, execute immediately. - bounced() + execute(bounced, localQueue: queues[local.threadId]) } else { wakeupWorkerIfRequired() } @@ -148,9 +148,9 @@ public class NonBlockingThreadPool: ComputeThr // Called not from within the threadpool; pick a victim thread from the pool at random. // TODO: use a faster RNG! let victim = Int.random(in: 0..: ComputeThr // TODO: Add API to allow expressing parallelFor without requiring closure allocations & test // to see if that improves performance or not. - public func join(_ a: Task, _ b: Task) { + public func join(_ a: () -> Void, _ b: () -> Void) { // add `b` to the work queue (and execute it immediately if queue is full). // if added to the queue, maybe wakeup worker if required. // @@ -174,18 +174,15 @@ public class NonBlockingThreadPool: ComputeThr // work we can do ourselves, we wait on the current thread's ConditionMutex. When `b` is // finally available, the completer must trigger the ConditionMutex. withoutActuallyEscaping(b) { b in - var workItem = WorkItem(b) - let unretainedPool = Unmanaged.passUnretained(self) - withUnsafeMutablePointer(to: &workItem) { workItem in + var joinTask = JoinTask(b) + withUnsafeMutablePointer(to: &joinTask) { joinTask in let perThread = perThreadKey.localValue // Stash in stack variable for performance. // Enqueue `b` into a work queue. if let localThreadIndex = perThread?.threadId { // Push to front of local queue. - if let bounced = queues[localThreadIndex].pushFront( - { runWorkItem(workItem, pool: unretainedPool) } - ) { - bounced() + if let bounced = queues[localThreadIndex].pushFront(.join(joinTask)) { + execute(bounced, localQueue: queues[localThreadIndex]) } else { wakeupWorkerIfRequired() } @@ -199,10 +196,8 @@ public class NonBlockingThreadPool: ComputeThr """) let victim = Int.random(in: 0..: ComputeThr // Thread pool thread... execute work on the threadpool. let q = queues[perThread.threadId] // While `b` is not done, try and be useful - while !workItem.pointee.isDoneAcquiring() { + while !joinTask.pointee.isDoneAcquiring() { if let task = q.popFront() ?? perThread.steal() ?? perThread.spin() { - task() + execute(task, localQueue: q) } else { // No work to be done without blocking, so we block ourselves specially. // This state occurs when another thread stole `b`, but hasn't finished and there's // nothing else useful for us to do. waitingMutex[perThread.threadId].lock() - // Set our handle in the workItem's state. - var state = WorkItemState(workItem.pointee.stateStorage.valueRelaxed) + // Set our handle in the joinTask's state. + var state = JoinTaskState(joinTask.pointee.stateStorage.valueRelaxed) while !state.isDone { let newState = state.settingWakeupThread(perThread.threadId) - if workItem.pointee.stateStorage.cmpxchgAcqRel( + if joinTask.pointee.stateStorage.cmpxchgAcqRel( original: &state.underlying, newValue: newState.underlying) { break @@ -235,7 +230,7 @@ public class NonBlockingThreadPool: ComputeThr } if !state.isDone { waitingMutex[perThread.threadId].await { - workItem.pointee.isDoneAcquiring() // What about cancellation? + joinTask.pointee.isDoneAcquiring() // What about cancellation? } } waitingMutex[perThread.threadId].unlock() @@ -243,14 +238,14 @@ public class NonBlockingThreadPool: ComputeThr } } else { // Do a quick check to see if we can fast-path return... - if !workItem.pointee.isDoneAcquiring() { + if !joinTask.pointee.isDoneAcquiring() { // We ran on the user's thread, so we now wait on the pool's global lock. externalWaitingMutex.lock() // Set the sentinal thread index. - var state = WorkItemState(workItem.pointee.stateStorage.valueRelaxed) + var state = JoinTaskState(joinTask.pointee.stateStorage.valueRelaxed) while !state.isDone { let newState = state.settingWakeupThread(-1) - if workItem.pointee.stateStorage.cmpxchgAcqRel( + if joinTask.pointee.stateStorage.cmpxchgAcqRel( original: &state.underlying, newValue: newState.underlying) { break @@ -258,7 +253,7 @@ public class NonBlockingThreadPool: ComputeThr } if !state.isDone { externalWaitingMutex.await { - workItem.pointee.isDoneAcquiring() // What about cancellation? + joinTask.pointee.isDoneAcquiring() // What about cancellation? } } externalWaitingMutex.unlock() @@ -292,6 +287,47 @@ public class NonBlockingThreadPool: ComputeThr if let e = err { throw e } } + public func parallelFor(n: Int, grainSize: Int, _ fn: (Int, Int) -> Void) { + parallelFor(n: n, grainSize: grainSize) { start, end, total in + for i in start.. Void) { + withoutActuallyEscaping(fn) { fn in + if let perThread = perThreadKey.localValue { + var task = ParallelForTask(parallelForFunction: fn, total: n) + withUnsafeMutablePointer(to: &task) { task in + // Run locally. + executeParallelForTask(task, start: 0, end: n, grainSize: grainSize, localQueue: queues[perThread.threadId]) + } + } else { + // Attempt to foist it onto the thread pool & block until it's done. + precondition( + allowNonFastPathThreads, + """ + Non-fast-path thread disallowed. (Set `allowNonFastPathThreads: true` when initializing + \(String(describing: type(of: self))) to allow `parallelFor` to be called from + non-registered threads. Note: this may make debugging performance problems more + difficult.) + """) + let doneCondition = Environment.ConditionMutex() + var isDone = false + doneCondition.lock() + dispatch { [self] in + self.parallelFor(n: n, grainSize: grainSize, fn) + doneCondition.lock() + isDone = true + doneCondition.unlock() + } + doneCondition.await { isDone } + doneCondition.unlock() + } + } + } + /// Shuts down the thread pool. public func shutDown() { cancelled = true @@ -310,6 +346,17 @@ public class NonBlockingThreadPool: ComputeThr } } +extension NonBlockingThreadPool where Environment: DefaultInitializable { + /// Creates `self` using a default-initialized `Environment`, and the specified `name` and + /// `threadCount`. + public convenience init(name: String, threadCount: Int) { + self.init(name: name, threadCount: threadCount, environment: Environment()) + } + + // TODO: add a convenience initializer that automatically figures out the number of threads to + // use based on available processor threads. +} + extension NonBlockingThreadPool { /// Controls whether the thread pool threads should exit and shut down. @@ -384,6 +431,121 @@ extension NonBlockingThreadPool { } } + fileprivate func execute(_ task: PoolTask, localQueue: Queue) { + switch task { + case let .dispatch(op): op() + case let .join(task): executeJoinTask(task) + case let .parallelFor(slice): executeParallelForTaskSlice(slice, localQueue: localQueue) + } + } + + fileprivate func executeJoinTask(_ task: UnsafeMutablePointer) { + assert(!task.pointee.isDoneAcquiring(), "Join task done before even starting execution?!?") + task.pointee.op() // Execute this function. + var state = JoinTaskState(task.pointee.stateStorage.valueRelaxed) + while true { + assert(!state.isDone, "state: \(state)") + let newState = state.markingDone() + if task.pointee.stateStorage.cmpxchgAcqRel( + original: &state.underlying, newValue: newState.underlying) + { + if let wakeupThread = state.wakeupThread { + // Do a lock & unlock on the corresponding thread lock. + if wakeupThread != -1 { + waitingMutex[wakeupThread].lock() + waitingMutex[wakeupThread].unlock() + } else { + externalWaitingMutex.lock() + externalWaitingMutex.unlock() + } + } + return + } + } + } + + fileprivate func executeParallelForTaskSlice(_ task: UnsafeMutablePointer, localQueue: Queue) { + executeParallelForTask(task.pointee.task, start: task.pointee.start, end: task.pointee.end, grainSize: task.pointee.grainSize, localQueue: localQueue) + // Mark the task as done. + var state = JoinTaskState(task.pointee.stateStorage.valueRelaxed) + while true { + assert(!state.isDone, "state: \(state)") + let newState = state.markingDone() + if task.pointee.stateStorage.cmpxchgAcqRel( + original: &state.underlying, newValue: newState.underlying) + { + if let wakeupThread = state.wakeupThread { + // Do a lock & unlock on the corresponding thread lock. + if wakeupThread != -1 { + waitingMutex[wakeupThread].lock() + waitingMutex[wakeupThread].unlock() + } else { + externalWaitingMutex.lock() + externalWaitingMutex.unlock() + } + } + return + } + } + } + + fileprivate func executeParallelForTask(_ task: UnsafePointer, start: Int, end: Int, grainSize: Int, localQueue: Queue) { + assert(end > start, "Unexpected start (\(start)) and end (\(end)).") + if end - start <= grainSize { + // Just execute the function. + task.pointee.parallelForFunction(start, end, task.pointee.total) + } else { + let distance = end - start + let midpoint = start + (distance / 2) + + var slice = ParallelForTaskSlice( + task: task, + start: midpoint, + end: end, + grainSize: grainSize) + withUnsafeMutablePointer(to: &slice) { slice in + // Add slice to pool, attempt to execute our half, wait / steal until done. + if let bounced = localQueue.pushFront(.parallelFor(slice)) { + assert(bounced.isParallelFor, "Bounced was: \(bounced)") + // If we bounce, we just do the whole task inline. + task.pointee.parallelForFunction(start, end, task.pointee.total) + } else { + wakeupWorkerIfRequired() + + // Execute local half using recurion. + executeParallelForTask(task, start: start, end: midpoint, grainSize: grainSize, localQueue: localQueue) + + // Do work until the other half is done. + let perThread = perThreadKey.localValue! + while !slice.pointee.isDoneAcquiring() { + if let task = localQueue.popFront() ?? perThread.steal() ?? perThread.spin() { + execute(task, localQueue: localQueue) + } else { + // No work to be done without blocking, so we block ourselves specially. + waitingMutex[perThread.threadId].lock() + // Set our handle in the parallelForSlice's state. + var state = JoinTaskState(slice.pointee.stateStorage.valueRelaxed) + while !state.isDone { + let newState = state.settingWakeupThread(perThread.threadId) + if slice.pointee.stateStorage.cmpxchgAcqRel( + original: &state.underlying, newValue: newState.underlying) + { + break + } + } + if !state.isDone { + waitingMutex[perThread.threadId].await { + slice.pointee.isDoneAcquiring() + } + } + waitingMutex[perThread.threadId].unlock() + } + } + } + } + } + } + /// The worker thread's run loop. private static func workerThread(state: PerThreadState) { state.pool.perThreadKey.localValue = state @@ -392,26 +554,13 @@ extension NonBlockingThreadPool { while !state.isCancelled { if let task = q.popFront() ?? state.steal() ?? state.spin() ?? state.parkUntilWorkAvailable() { - task() // Execute the task. + state.pool.execute(task, localQueue: q) // Execute the task. } } } } -extension NonBlockingThreadPool where Environment: DefaultInitializable { - /// Creates `self` using a default-initialized `Environment`, and the specified `name` and - /// `threadCount`. - public convenience init(name: String, threadCount: Int) { - self.init(name: name, threadCount: threadCount, environment: Environment()) - } - - // TODO: add a convenience initializer that automatically figures out the number of threads to - // use based on available processor threads. -} - fileprivate final class PerThreadState { - typealias Task = NonBlockingThreadPool.Task - init(threadId: Int, pool: NonBlockingThreadPool) { self.threadId = threadId self.pool = pool @@ -430,7 +579,7 @@ fileprivate final class PerThreadState { var isCancelled: Bool { pool.cancelled } - func steal() -> Task? { + func steal() -> PoolTask? { let r = Int(rng.next()) var selectedThreadId = fastFit(r, into: pool.totalThreadCount) let step = pool.coprimes[fastFit(r, into: pool.coprimes.count)] @@ -453,7 +602,7 @@ fileprivate final class PerThreadState { return nil } - func spin() -> Task? { + func spin() -> PoolTask? { let spinCount = pool.threads.count > 0 ? Constants.spinCount / pool.threads.count : 0 if pool.shouldStartSpinning() { @@ -473,7 +622,7 @@ fileprivate final class PerThreadState { return nil } - func parkUntilWorkAvailable() -> Task? { + func parkUntilWorkAvailable() -> PoolTask? { // Already did a best-effort emptiness check in steal, so prepare for blocking. pool.condition.preWait() // Now we do a reliable emptiness check. @@ -607,7 +756,37 @@ extension NonblockingSpinningState: CustomStringConvertible { } } -fileprivate struct WorkItem { +fileprivate enum PoolTask { + case dispatch(() -> ()) + case join(UnsafeMutablePointer) + case parallelFor(UnsafeMutablePointer) + + var isParallelFor: Bool { + switch self { + case .parallelFor: return true + default: return false + } + } +} + +fileprivate struct ParallelForTask { + let parallelForFunction: (Int, Int, Int) -> Void + let total: Int +} + +fileprivate struct ParallelForTaskSlice { + let task: UnsafePointer + let start: Int + let end: Int + let grainSize: Int + var stateStorage: AtomicUInt64 = AtomicUInt64() + + mutating func isDoneAcquiring() -> Bool { + JoinTaskState(stateStorage.valueAcquire).isDone + } +} + +fileprivate struct JoinTask { let op: () -> Void var stateStorage: AtomicUInt64 @@ -617,40 +796,11 @@ fileprivate struct WorkItem { } mutating func isDoneAcquiring() -> Bool { - WorkItemState(stateStorage.valueAcquire).isDone - } -} - -fileprivate func runWorkItem( - _ item: UnsafeMutablePointer, - pool poolUnmanaged: Unmanaged> // Avoid refcount traffic. -) { - assert(!item.pointee.isDoneAcquiring(), "Work item done before even starting execution?!?") - item.pointee.op() // Execute the function. - var state = WorkItemState(item.pointee.stateStorage.valueRelaxed) - while true { - assert(!state.isDone, "state: \(state)") - let newState = state.markingDone() - if item.pointee.stateStorage.cmpxchgAcqRel( - original: &state.underlying, newValue: newState.underlying) - { - if let wakeupThread = state.wakeupThread { - let pool = poolUnmanaged.takeUnretainedValue() - // Do a lock & unlock on the corresponding thread lock. - if wakeupThread != -1 { - pool.waitingMutex[wakeupThread].lock() - pool.waitingMutex[wakeupThread].unlock() - } else { - pool.externalWaitingMutex.lock() - pool.externalWaitingMutex.unlock() - } - } - return - } + JoinTaskState(stateStorage.valueAcquire).isDone } } -fileprivate struct WorkItemState { +fileprivate struct JoinTaskState { var underlying: UInt64 init(_ underlying: UInt64) { self.underlying = underlying } @@ -694,7 +844,7 @@ fileprivate struct WorkItemState { static let externalThreadValue: UInt64 = wakeupMask } -extension WorkItemState: CustomStringConvertible { +extension JoinTaskState: CustomStringConvertible { public var description: String { let wakeupThreadStr: String if let wakeupThread = wakeupThread { @@ -702,7 +852,7 @@ extension WorkItemState: CustomStringConvertible { } else { wakeupThreadStr = "" } - return "WorkItemState(isDone: \(isDone), wakeupThread: \(wakeupThreadStr)))" + return "JoinTaskState(isDone: \(isDone), wakeupThread: \(wakeupThreadStr)))" } } diff --git a/Tests/PenguinParallelTests/NonBlockingThreadPoolTests.swift b/Tests/PenguinParallelTests/NonBlockingThreadPoolTests.swift index 450d6951..6c23c1ea 100644 --- a/Tests/PenguinParallelTests/NonBlockingThreadPoolTests.swift +++ b/Tests/PenguinParallelTests/NonBlockingThreadPoolTests.swift @@ -62,6 +62,23 @@ final class NonBlockingThreadPoolTests: XCTestCase { XCTAssert(seenIndices.allSatisfy { $0 }) } + func testThreadIndexParallelForGrainSize() { + let threadCount = 18 + let pool = Pool(name: "testThreadIndexParallelFor", threadCount: threadCount) + + let condition = NSCondition() + var seenIndices = Array(repeating: false, count: 10000) // guarded by condition. + + pool.parallelFor(n: seenIndices.count, grainSize: 3) { (i, _) in + condition.lock() + XCTAssertFalse(seenIndices[i]) + seenIndices[i] = true + condition.unlock() + } + XCTAssert(seenIndices.allSatisfy { $0 }) + } + + func testGracefulShutdown() { typealias Platform = ThreadCountingPlatform typealias Pool = NonBlockingThreadPool @@ -105,6 +122,7 @@ final class NonBlockingThreadPoolTests: XCTestCase { static var allTests = [ ("testThreadIndexDispatching", testThreadIndexDispatching), ("testThreadIndexParallelFor", testThreadIndexParallelFor), + ("testThreadIndexParallelForGrainSize", testThreadIndexParallelForGrainSize), ("testGracefulShutdown", testGracefulShutdown), ] }