From b03e1ac4adea2fa9239811c85a903059c45c16c5 Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Sun, 10 May 2020 11:14:14 -0700 Subject: [PATCH 1/5] Add benchmarks and reduce time spent on heap operations in parallelFor. --- .../Benchmarks/NonBlockingThreadPool.swift | 65 ++++++++++++++++--- .../NonBlockingThreadPool.swift | 43 ++++++++++++ 2 files changed, 99 insertions(+), 9 deletions(-) diff --git a/Sources/Benchmarks/NonBlockingThreadPool.swift b/Sources/Benchmarks/NonBlockingThreadPool.swift index 2f440ef1..0ccfbd96 100644 --- a/Sources/Benchmarks/NonBlockingThreadPool.swift +++ b/Sources/Benchmarks/NonBlockingThreadPool.swift @@ -14,6 +14,7 @@ import Benchmark import PenguinParallelWithFoundation +import Dispatch let nonBlockingThreadPool = BenchmarkSuite(name: "NonBlockingThreadPool") { suite in @@ -38,25 +39,71 @@ let nonBlockingThreadPool = BenchmarkSuite(name: "NonBlockingThreadPool") { suit { pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }) } - suite.benchmark("join, four levels, three on thread pool thread") { + suite.benchmark("join, four levels, three on thread pool thread", settings: .iterations(1000)) { pool.join( {}, { - pool.join( - { pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }, - { pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }) + pool.join({ pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }, + { pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }) }) } - suite.benchmark("parallel for, one level") { + let pool2 = Pool(name: "benchmark-pool2", threadCount: 12) + + suite.benchmark("parallel for, one level", settings: .iterations(100)) { let buffer1 = helpers.buffer1 - pool.parallelFor(n: buffer1.count) { (i, n) in buffer1[i] = true } + pool2.parallelFor(n: buffer1.count) { (i, n) in buffer1[i] = true } + } + + suite.benchmark("parallel for, two levels", settings: .iterations(100)) { + let buffer2 = helpers.buffer2 + pool2.parallelFor(n: buffer2.count) { (i, n) in + pool2.parallelFor(n: 
buffer2[i].count) { (j, _) in buffer2[i][j] = true } + } + } + + + for grainSize in [10, 100, 1000, 2000, 5000] { + suite.benchmark("parallel for, one level, grain size \(grainSize)", settings: .iterations(100)) { + let buffer1 = helpers.buffer1 + pool2.parallelFor(n: buffer1.count, grainSize: grainSize) { (i, n) in buffer1[i] = true } + } + } + + suite.benchmark("parallel for, two levels, grain size 10 & 100", settings: .iterations(100)) { + let buffer2 = helpers.buffer2 + pool2.parallelFor(n: buffer2.count, grainSize: 10) { (i, n) in + pool2.parallelFor(n: buffer2[i].count, grainSize: 100) { (j, _) in buffer2[i][j] = true } + } + } + + suite.benchmark("dispatch concurrent perform, one level", settings: .iterations(100)) { + let buffer1 = helpers.buffer1 + DispatchQueue.concurrentPerform(iterations: buffer1.count) { i in + buffer1[i] = true + } + } + + suite.benchmark("dispatch concurrent perform, two levels", settings: .iterations(100)) { + let buffer2 = helpers.buffer2 + DispatchQueue.concurrentPerform(iterations: buffer2.count) { i in + DispatchQueue.concurrentPerform(iterations: buffer2[i].count) { j in buffer2[i][j] = true } + } + } + + suite.benchmark("sequential one level") { + let buffer1 = helpers.buffer1 + for i in 0..: ComputeThr if let e = err { throw e } } + public func parallelFor(n: Int, grainSize: Int, _ fn: (Int, Int) -> Void) { + withoutActuallyEscaping(fn) { fn in + var holder = ParallelForFunctionHolder { (start, end, total) in + for i in start..: ComputeThr } } +/// Holds a parallel for function; this is used to avoid extra refcount overheads on the function +/// itself. +fileprivate struct ParallelForFunctionHolder { + /// A vectorized function that takes `start`, `end`, and `total`. + typealias VectorizedFunction = (Int, Int, Int) -> Void + var fn: VectorizedFunction +} + +/// Uses `ComputeThreadPool.join` to execute `fn` in parallel. 
+fileprivate func runParallelFor( + pool: C, + start: Int, + end: Int, + grainSize: Int, + total: Int, + fn: UnsafePointer +) { + assert(end > start, "Unexpected start & end; \(start) -> \(end)") + if end - start <= grainSize { + fn.pointee.fn(start, end, total) + } else { + assert(end > start) + let distance = end - start + let midpoint = start + (distance / 2) + pool.join( + { runParallelFor(pool: pool, start: start, end: midpoint, grainSize: grainSize, total: total, fn: fn) }, + { runParallelFor(pool: pool, start: midpoint, end: end, grainSize: grainSize, total: total, fn: fn) }) + } +} + extension NonBlockingThreadPool { /// Controls whether the thread pool threads should exit and shut down. From 5ed0864b38e71faf16ed713e7269acb55ad7784b Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Wed, 13 May 2020 13:17:59 -0700 Subject: [PATCH 2/5] Remove `.iterations` from Benchmarks, as the auto-tuning works better. --- Sources/Benchmarks/NonBlockingThreadPool.swift | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Sources/Benchmarks/NonBlockingThreadPool.swift b/Sources/Benchmarks/NonBlockingThreadPool.swift index 0ccfbd96..c98b7c1c 100644 --- a/Sources/Benchmarks/NonBlockingThreadPool.swift +++ b/Sources/Benchmarks/NonBlockingThreadPool.swift @@ -50,12 +50,12 @@ let nonBlockingThreadPool = BenchmarkSuite(name: "NonBlockingThreadPool") { suit let pool2 = Pool(name: "benchmark-pool2", threadCount: 12) - suite.benchmark("parallel for, one level", settings: .iterations(100)) { + suite.benchmark("parallel for, one level") { let buffer1 = helpers.buffer1 pool2.parallelFor(n: buffer1.count) { (i, n) in buffer1[i] = true } } - suite.benchmark("parallel for, two levels", settings: .iterations(100)) { + suite.benchmark("parallel for, two levels") { let buffer2 = helpers.buffer2 pool2.parallelFor(n: buffer2.count) { (i, n) in pool2.parallelFor(n: buffer2[i].count) { (j, _) in buffer2[i][j] = true } @@ -64,27 +64,27 @@ let nonBlockingThreadPool = 
BenchmarkSuite(name: "NonBlockingThreadPool") { suit for grainSize in [10, 100, 1000, 2000, 5000] { - suite.benchmark("parallel for, one level, grain size \(grainSize)", settings: .iterations(100)) { + suite.benchmark("parallel for, one level, grain size \(grainSize)") { let buffer1 = helpers.buffer1 pool2.parallelFor(n: buffer1.count, grainSize: grainSize) { (i, n) in buffer1[i] = true } } } - suite.benchmark("parallel for, two levels, grain size 10 & 100", settings: .iterations(100)) { + suite.benchmark("parallel for, two levels, grain size 10 & 100") { let buffer2 = helpers.buffer2 pool2.parallelFor(n: buffer2.count, grainSize: 10) { (i, n) in pool2.parallelFor(n: buffer2[i].count, grainSize: 100) { (j, _) in buffer2[i][j] = true } } } - suite.benchmark("dispatch concurrent perform, one level", settings: .iterations(100)) { + suite.benchmark("dispatch concurrent perform, one level") { let buffer1 = helpers.buffer1 DispatchQueue.concurrentPerform(iterations: buffer1.count) { i in buffer1[i] = true } } - suite.benchmark("dispatch concurrent perform, two levels", settings: .iterations(100)) { + suite.benchmark("dispatch concurrent perform, two levels") { let buffer2 = helpers.buffer2 DispatchQueue.concurrentPerform(iterations: buffer2.count) { i in DispatchQueue.concurrentPerform(iterations: buffer2[i].count) { j in buffer2[i][j] = true } From ba46a39017d605f9bab1ffabf9e683da07ee0747 Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Wed, 13 May 2020 13:38:58 -0700 Subject: [PATCH 3/5] Remove last explicit iterations setting. 
--- Sources/Benchmarks/NonBlockingThreadPool.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Benchmarks/NonBlockingThreadPool.swift b/Sources/Benchmarks/NonBlockingThreadPool.swift index c98b7c1c..7c0b3fbd 100644 --- a/Sources/Benchmarks/NonBlockingThreadPool.swift +++ b/Sources/Benchmarks/NonBlockingThreadPool.swift @@ -39,7 +39,7 @@ let nonBlockingThreadPool = BenchmarkSuite(name: "NonBlockingThreadPool") { suit { pool.join({ pool.join({}, {}) }, { pool.join({}, {}) }) }) } - suite.benchmark("join, four levels, three on thread pool thread", settings: .iterations(1000)) { + suite.benchmark("join, four levels, three on thread pool thread") { pool.join( {}, { From 394f97ed36eebae2beda97b8a170ffb632b412e7 Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Wed, 13 May 2020 13:42:09 -0700 Subject: [PATCH 4/5] Optimize parallelFor further (also adds a test for grainSize!) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Before: name time std iterations --------------------------------------------------------------------------------------------------------------------- NonBlockingThreadPool: join, one level 690.0 ns ± 163522.04859994436 76147 NonBlockingThreadPool: join, two levels 2064.0 ns ± 316859.10411173315 15613 NonBlockingThreadPool: join, three levels 4941.0 ns ± 351192.3836080291 11462 NonBlockingThreadPool: join, four levels, three on thread pool thread 6369.0 ns ± 400867.0104087665 8426 NonBlockingThreadPool: parallel for, one level 2862068.0 ns ± 395500.70262353314 509 NonBlockingThreadPool: parallel for, two levels 2609616.5 ns ± 251679.60080760892 486 NonBlockingThreadPool: parallel for, one level, grain size 10 345706.0 ns ± 247728.9607701072 2859 NonBlockingThreadPool: parallel for, one level, grain size 100 142316.0 ns ± 292455.2283264145 4417 NonBlockingThreadPool: parallel for, one level, grain size 1000 78977.0 ns ± 214052.89967520352 6321 NonBlockingThreadPool: parallel for, one 
level, grain size 2000 50729.0 ns ± 164390.21843606833 7247 NonBlockingThreadPool: parallel for, one level, grain size 5000 39656.0 ns ± 190176.9821963313 8649 NonBlockingThreadPool: parallel for, two levels, grain size 10 & 100 1657159.5 ns ± 806196.7324534839 842 NonBlockingThreadPool: dispatch concurrent perform, one level 237865.0 ns ± 21098.405428758302 6223 NonBlockingThreadPool: dispatch concurrent perform, two levels 40180.0 ns ± 5290.912573146778 38178 NonBlockingThreadPool: sequential one level 370.0 ns ± 224.22220630175732 1000000 NonBlockingThreadPool: sequential two levels 485.0 ns ± 279.54687747682283 1000000 After: name time std iterations --------------------------------------------------------------------------------------------------------------------- NonBlockingThreadPool: join, one level 484.0 ns ± 161129.49945627034 78655 NonBlockingThreadPool: join, two levels 1446.0 ns ± 255823.17176138048 29165 NonBlockingThreadPool: join, three levels 3369.0 ns ± 244942.08954270577 21109 NonBlockingThreadPool: join, four levels, three on thread pool thread 3833.0 ns ± 270662.97199023387 15008 NonBlockingThreadPool: parallel for, one level 2661959.0 ns ± 360213.96138696355 543 NonBlockingThreadPool: parallel for, two levels 2469215.5 ns ± 334972.7168585615 538 NonBlockingThreadPool: parallel for, one level, grain size 10 327907.0 ns ± 473609.1412896736 2708 NonBlockingThreadPool: parallel for, one level, grain size 100 125174.5 ns ± 329418.62320991984 3934 NonBlockingThreadPool: parallel for, one level, grain size 1000 73999.0 ns ± 238626.6115465731 6940 NonBlockingThreadPool: parallel for, one level, grain size 2000 50771.0 ns ± 205223.8350497277 7089 NonBlockingThreadPool: parallel for, one level, grain size 5000 26571.5 ns ± 220587.47147580027 8398 NonBlockingThreadPool: parallel for, two levels, grain size 10 & 100 1162189.0 ns ± 940733.3616910677 991 NonBlockingThreadPool: dispatch concurrent perform, one level 247549.0 ns ± 23605.70739546625 5815 
NonBlockingThreadPool: dispatch concurrent perform, two levels 41744.0 ns ± 6449.1291628621575 33719 NonBlockingThreadPool: sequential one level 402.0 ns ± 170.32164151542221 1000000 NonBlockingThreadPool: sequential two levels 466.0 ns ± 195.13074674728864 1000000 --- .../NonBlockingThreadPool.swift | 333 ++++++++++++------ .../NonBlockingThreadPoolTests.swift | 18 + 2 files changed, 238 insertions(+), 113 deletions(-) diff --git a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift index 1c54847a..e4d3b101 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift @@ -58,14 +58,14 @@ import PenguinStructures public class NonBlockingThreadPool: ComputeThreadPool { public typealias Task = () -> Void public typealias ThrowingTask = () throws -> Void - typealias Queue = TaskDeque + fileprivate typealias Queue = TaskDeque let allowNonFastPathThreads: Bool let totalThreadCount: Int let externalFastPathThreadCount: Int var externalFastPathThreadSeenCount: Int = 0 let coprimes: [Int] - let queues: [Queue] + fileprivate let queues: [Queue] var cancelledStorage: AtomicUInt64 var blockedCountStorage: AtomicUInt64 var spinningState: AtomicUInt64 @@ -135,12 +135,12 @@ public class NonBlockingThreadPool: ComputeThr perThreadKey.localValue = state } - public func dispatch(_ fn: @escaping Task) { + public func dispatch(_ fn: @escaping () -> Void) { if let local = perThreadKey.localValue { // Push onto local queue. - if let bounced = queues[local.threadId].pushFront(fn) { + if let bounced = queues[local.threadId].pushFront(.dispatch(fn)) { // If local queue is full, execute immediately. 
- bounced() + execute(bounced, localQueue: queues[local.threadId]) } else { wakeupWorkerIfRequired() } @@ -148,9 +148,9 @@ public class NonBlockingThreadPool: ComputeThr // Called not from within the threadpool; pick a victim thread from the pool at random. // TODO: use a faster RNG! let victim = Int.random(in: 0..: ComputeThr // TODO: Add API to allow expressing parallelFor without requiring closure allocations & test // to see if that improves performance or not. - public func join(_ a: Task, _ b: Task) { + public func join(_ a: () -> Void, _ b: () -> Void) { // add `b` to the work queue (and execute it immediately if queue is full). // if added to the queue, maybe wakeup worker if required. // @@ -174,18 +174,15 @@ public class NonBlockingThreadPool: ComputeThr // work we can do ourselves, we wait on the current thread's ConditionMutex. When `b` is // finally available, the completer must trigger the ConditionMutex. withoutActuallyEscaping(b) { b in - var workItem = WorkItem(b) - let unretainedPool = Unmanaged.passUnretained(self) - withUnsafeMutablePointer(to: &workItem) { workItem in + var joinTask = JoinTask(b) + withUnsafeMutablePointer(to: &joinTask) { joinTask in let perThread = perThreadKey.localValue // Stash in stack variable for performance. // Enqueue `b` into a work queue. if let localThreadIndex = perThread?.threadId { // Push to front of local queue. - if let bounced = queues[localThreadIndex].pushFront( - { runWorkItem(workItem, pool: unretainedPool) } - ) { - bounced() + if let bounced = queues[localThreadIndex].pushFront(.join(joinTask)) { + execute(bounced, localQueue: queues[localThreadIndex]) } else { wakeupWorkerIfRequired() } @@ -199,10 +196,8 @@ public class NonBlockingThreadPool: ComputeThr """) let victim = Int.random(in: 0..: ComputeThr // Thread pool thread... execute work on the threadpool. 
let q = queues[perThread.threadId] // While `b` is not done, try and be useful - while !workItem.pointee.isDoneAcquiring() { + while !joinTask.pointee.isDoneAcquiring() { if let task = q.popFront() ?? perThread.steal() ?? perThread.spin() { - task() + execute(task, localQueue: q) } else { // No work to be done without blocking, so we block ourselves specially. // This state occurs when another thread stole `b`, but hasn't finished and there's // nothing else useful for us to do. waitingMutex[perThread.threadId].lock() - // Set our handle in the workItem's state. - var state = WorkItemState(workItem.pointee.stateStorage.valueRelaxed) + // Set our handle in the joinTask's state. + var state = JoinTaskState(joinTask.pointee.stateStorage.valueRelaxed) while !state.isDone { let newState = state.settingWakeupThread(perThread.threadId) - if workItem.pointee.stateStorage.cmpxchgAcqRel( + if joinTask.pointee.stateStorage.cmpxchgAcqRel( original: &state.underlying, newValue: newState.underlying) { break @@ -235,7 +230,7 @@ public class NonBlockingThreadPool: ComputeThr } if !state.isDone { waitingMutex[perThread.threadId].await { - workItem.pointee.isDoneAcquiring() // What about cancellation? + joinTask.pointee.isDoneAcquiring() // What about cancellation? } } waitingMutex[perThread.threadId].unlock() @@ -243,14 +238,14 @@ public class NonBlockingThreadPool: ComputeThr } } else { // Do a quick check to see if we can fast-path return... - if !workItem.pointee.isDoneAcquiring() { + if !joinTask.pointee.isDoneAcquiring() { // We ran on the user's thread, so we now wait on the pool's global lock. externalWaitingMutex.lock() // Set the sentinal thread index. 
- var state = WorkItemState(workItem.pointee.stateStorage.valueRelaxed) + var state = JoinTaskState(joinTask.pointee.stateStorage.valueRelaxed) while !state.isDone { let newState = state.settingWakeupThread(-1) - if workItem.pointee.stateStorage.cmpxchgAcqRel( + if joinTask.pointee.stateStorage.cmpxchgAcqRel( original: &state.underlying, newValue: newState.underlying) { break @@ -258,7 +253,7 @@ public class NonBlockingThreadPool: ComputeThr } if !state.isDone { externalWaitingMutex.await { - workItem.pointee.isDoneAcquiring() // What about cancellation? + joinTask.pointee.isDoneAcquiring() // What about cancellation? } } externalWaitingMutex.unlock() @@ -293,14 +288,42 @@ public class NonBlockingThreadPool: ComputeThr } public func parallelFor(n: Int, grainSize: Int, _ fn: (Int, Int) -> Void) { + parallelFor(n: n, grainSize: grainSize) { start, end, total in + for i in start.. Void) { withoutActuallyEscaping(fn) { fn in - var holder = ParallelForFunctionHolder { (start, end, total) in - for i in start..: ComputeThr } } -/// Holds a parallel for function; this is used to avoid extra refcount overheads on the function -/// itself. -fileprivate struct ParallelForFunctionHolder { - /// A vectorized function that takes `start`, `end`, and `total`. - typealias VectorizedFunction = (Int, Int, Int) -> Void - var fn: VectorizedFunction -} - -/// Uses `ComputeThreadPool.join` to execute `fn` in parallel. 
-fileprivate func runParallelFor( - pool: C, - start: Int, - end: Int, - grainSize: Int, - total: Int, - fn: UnsafePointer -) { - assert(end > start, "Unexpected start & end; \(start) -> \(end)") - if end - start <= grainSize { - fn.pointee.fn(start, end, total) - } else { - assert(end > start) - let distance = end - start - let midpoint = start + (distance / 2) - pool.join( - { runParallelFor(pool: pool, start: start, end: midpoint, grainSize: grainSize, total: total, fn: fn) }, - { runParallelFor(pool: pool, start: midpoint, end: end, grainSize: grainSize, total: total, fn: fn) }) +extension NonBlockingThreadPool where Environment: DefaultInitializable { + /// Creates `self` using a default-initialized `Environment`, and the specified `name` and + /// `threadCount`. + public convenience init(name: String, threadCount: Int) { + self.init(name: name, threadCount: threadCount, environment: Environment()) } + + // TODO: add a convenience initializer that automatically figures out the number of threads to + // use based on available processor threads. } extension NonBlockingThreadPool { @@ -427,6 +431,121 @@ extension NonBlockingThreadPool { } } + fileprivate func execute(_ task: PoolTask, localQueue: Queue) { + switch task { + case let .dispatch(op): op() + case let .join(task): executeJoinTask(task) + case let .parallelFor(slice): executeParallelForTaskSlice(slice, localQueue: localQueue) + } + } + + fileprivate func executeJoinTask(_ task: UnsafeMutablePointer) { + assert(!task.pointee.isDoneAcquiring(), "Join task done before even starting execution?!?") + task.pointee.op() // Execute this function. 
+ var state = JoinTaskState(task.pointee.stateStorage.valueRelaxed) + while true { + assert(!state.isDone, "state: \(state)") + let newState = state.markingDone() + if task.pointee.stateStorage.cmpxchgAcqRel( + original: &state.underlying, newValue: newState.underlying) + { + if let wakeupThread = state.wakeupThread { + // Do a lock & unlock on the corresponding thread lock. + if wakeupThread != -1 { + waitingMutex[wakeupThread].lock() + waitingMutex[wakeupThread].unlock() + } else { + externalWaitingMutex.lock() + externalWaitingMutex.unlock() + } + } + return + } + } + } + + fileprivate func executeParallelForTaskSlice(_ task: UnsafeMutablePointer, localQueue: Queue) { + executeParallelForTask(task.pointee.task, start: task.pointee.start, end: task.pointee.end, grainSize: task.pointee.grainSize, localQueue: localQueue) + // Mark the task as done. + var state = JoinTaskState(task.pointee.stateStorage.valueRelaxed) + while true { + assert(!state.isDone, "state: \(state)") + let newState = state.markingDone() + if task.pointee.stateStorage.cmpxchgAcqRel( + original: &state.underlying, newValue: newState.underlying) + { + if let wakeupThread = state.wakeupThread { + // Do a lock & unlock on the corresponding thread lock. + if wakeupThread != -1 { + waitingMutex[wakeupThread].lock() + waitingMutex[wakeupThread].unlock() + } else { + externalWaitingMutex.lock() + externalWaitingMutex.unlock() + } + } + return + } + } + } + + fileprivate func executeParallelForTask(_ task: UnsafePointer, start: Int, end: Int, grainSize: Int, localQueue: Queue) { + assert(end > start, "Unexpected start (\(start)) and end (\(end)).") + if end - start <= grainSize { + // Just execute the function. 
+ task.pointee.parallelForFunction(start, end, task.pointee.total) + } else { + let distance = end - start + let midpoint = start + (distance / 2) + + var slice = ParallelForTaskSlice( + task: task, + start: midpoint, + end: end, + grainSize: grainSize) + withUnsafeMutablePointer(to: &slice) { slice in + // Add slice to pool, attempt to execute our half, wait / steal until done. + if let bounced = localQueue.pushFront(.parallelFor(slice)) { + assert(bounced.isParallelFor, "Bounced was: \(bounced)") + // If we bounce, we just do the whole task inline. + task.pointee.parallelForFunction(start, end, task.pointee.total) + } else { + wakeupWorkerIfRequired() + + // Execute local half using recursion. + executeParallelForTask(task, start: start, end: midpoint, grainSize: grainSize, localQueue: localQueue) + + // Do work until the other half is done. + let perThread = perThreadKey.localValue! + while !slice.pointee.isDoneAcquiring() { + if let task = localQueue.popFront() ?? perThread.steal() ?? perThread.spin() { + execute(task, localQueue: localQueue) + } else { + // No work to be done without blocking, so we block ourselves specially. + waitingMutex[perThread.threadId].lock() + // Set our handle in the parallelForSlice's state. + var state = JoinTaskState(slice.pointee.stateStorage.valueRelaxed) + while !state.isDone { + let newState = state.settingWakeupThread(perThread.threadId) + if slice.pointee.stateStorage.cmpxchgAcqRel( + original: &state.underlying, newValue: newState.underlying) + { + break + } + } + if !state.isDone { + waitingMutex[perThread.threadId].await { + slice.pointee.isDoneAcquiring() + } + } + waitingMutex[perThread.threadId].unlock() + } + } + } + } + } + } + + /// The worker thread's run loop. private static func workerThread(state: PerThreadState) { state.pool.perThreadKey.localValue = state
state.parkUntilWorkAvailable() { - task() // Execute the task. + state.pool.execute(task, localQueue: q) // Execute the task. } } } } -extension NonBlockingThreadPool where Environment: DefaultInitializable { - /// Creates `self` using a default-initialized `Environment`, and the specified `name` and - /// `threadCount`. - public convenience init(name: String, threadCount: Int) { - self.init(name: name, threadCount: threadCount, environment: Environment()) - } - - // TODO: add a convenience initializer that automatically figures out the number of threads to - // use based on available processor threads. -} - fileprivate final class PerThreadState { - typealias Task = NonBlockingThreadPool.Task - init(threadId: Int, pool: NonBlockingThreadPool) { self.threadId = threadId self.pool = pool @@ -473,7 +579,7 @@ fileprivate final class PerThreadState { var isCancelled: Bool { pool.cancelled } - func steal() -> Task? { + func steal() -> PoolTask? { let r = Int(rng.next()) var selectedThreadId = fastFit(r, into: pool.totalThreadCount) let step = pool.coprimes[fastFit(r, into: pool.coprimes.count)] @@ -496,7 +602,7 @@ fileprivate final class PerThreadState { return nil } - func spin() -> Task? { + func spin() -> PoolTask? { let spinCount = pool.threads.count > 0 ? Constants.spinCount / pool.threads.count : 0 if pool.shouldStartSpinning() { @@ -516,7 +622,7 @@ fileprivate final class PerThreadState { return nil } - func parkUntilWorkAvailable() -> Task? { + func parkUntilWorkAvailable() -> PoolTask? { // Already did a best-effort emptiness check in steal, so prepare for blocking. pool.condition.preWait() // Now we do a reliable emptiness check. 
@@ -650,7 +756,37 @@ extension NonblockingSpinningState: CustomStringConvertible { } } -fileprivate struct WorkItem { +fileprivate enum PoolTask { + case dispatch(() -> ()) + case join(UnsafeMutablePointer) + case parallelFor(UnsafeMutablePointer) + + var isParallelFor: Bool { + switch self { + case .parallelFor: return true + default: return false + } + } +} + +fileprivate struct ParallelForTask { + let parallelForFunction: (Int, Int, Int) -> Void + let total: Int +} + +fileprivate struct ParallelForTaskSlice { + let task: UnsafePointer + let start: Int + let end: Int + let grainSize: Int + var stateStorage: AtomicUInt64 = AtomicUInt64() + + mutating func isDoneAcquiring() -> Bool { + JoinTaskState(stateStorage.valueAcquire).isDone + } +} + +fileprivate struct JoinTask { let op: () -> Void var stateStorage: AtomicUInt64 @@ -660,40 +796,11 @@ fileprivate struct WorkItem { } mutating func isDoneAcquiring() -> Bool { - WorkItemState(stateStorage.valueAcquire).isDone - } -} - -fileprivate func runWorkItem( - _ item: UnsafeMutablePointer, - pool poolUnmanaged: Unmanaged> // Avoid refcount traffic. -) { - assert(!item.pointee.isDoneAcquiring(), "Work item done before even starting execution?!?") - item.pointee.op() // Execute the function. - var state = WorkItemState(item.pointee.stateStorage.valueRelaxed) - while true { - assert(!state.isDone, "state: \(state)") - let newState = state.markingDone() - if item.pointee.stateStorage.cmpxchgAcqRel( - original: &state.underlying, newValue: newState.underlying) - { - if let wakeupThread = state.wakeupThread { - let pool = poolUnmanaged.takeUnretainedValue() - // Do a lock & unlock on the corresponding thread lock. 
- if wakeupThread != -1 { - pool.waitingMutex[wakeupThread].lock() - pool.waitingMutex[wakeupThread].unlock() - } else { - pool.externalWaitingMutex.lock() - pool.externalWaitingMutex.unlock() - } - } - return - } + JoinTaskState(stateStorage.valueAcquire).isDone } } -fileprivate struct WorkItemState { +fileprivate struct JoinTaskState { var underlying: UInt64 init(_ underlying: UInt64) { self.underlying = underlying } @@ -737,7 +844,7 @@ fileprivate struct WorkItemState { static let externalThreadValue: UInt64 = wakeupMask } -extension WorkItemState: CustomStringConvertible { +extension JoinTaskState: CustomStringConvertible { public var description: String { let wakeupThreadStr: String if let wakeupThread = wakeupThread { @@ -745,7 +852,7 @@ extension WorkItemState: CustomStringConvertible { } else { wakeupThreadStr = "" } - return "WorkItemState(isDone: \(isDone), wakeupThread: \(wakeupThreadStr)))" + return "JoinTaskState(isDone: \(isDone), wakeupThread: \(wakeupThreadStr)))" } } diff --git a/Tests/PenguinParallelTests/NonBlockingThreadPoolTests.swift b/Tests/PenguinParallelTests/NonBlockingThreadPoolTests.swift index 450d6951..6c23c1ea 100644 --- a/Tests/PenguinParallelTests/NonBlockingThreadPoolTests.swift +++ b/Tests/PenguinParallelTests/NonBlockingThreadPoolTests.swift @@ -62,6 +62,23 @@ final class NonBlockingThreadPoolTests: XCTestCase { XCTAssert(seenIndices.allSatisfy { $0 }) } + func testThreadIndexParallelForGrainSize() { + let threadCount = 18 + let pool = Pool(name: "testThreadIndexParallelFor", threadCount: threadCount) + + let condition = NSCondition() + var seenIndices = Array(repeating: false, count: 10000) // guarded by condition. 
+ + pool.parallelFor(n: seenIndices.count, grainSize: 3) { (i, _) in + condition.lock() + XCTAssertFalse(seenIndices[i]) + seenIndices[i] = true + condition.unlock() + } + XCTAssert(seenIndices.allSatisfy { $0 }) + } + + func testGracefulShutdown() { typealias Platform = ThreadCountingPlatform typealias Pool = NonBlockingThreadPool @@ -105,6 +122,7 @@ final class NonBlockingThreadPoolTests: XCTestCase { static var allTests = [ ("testThreadIndexDispatching", testThreadIndexDispatching), ("testThreadIndexParallelFor", testThreadIndexParallelFor), + ("testThreadIndexParallelForGrainSize", testThreadIndexParallelForGrainSize), ("testGracefulShutdown", testGracefulShutdown), ] } From 9ddfef44bc6578101d5b90f86a15a8cde100a4c7 Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Wed, 13 May 2020 15:41:38 -0700 Subject: [PATCH 5/5] Update Benchmark package. --- Package.resolved | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Package.resolved b/Package.resolved index 6ca17be3..79795895 100644 --- a/Package.resolved +++ b/Package.resolved @@ -15,7 +15,7 @@ "repositoryURL": "https://github.com/google/swift-benchmark.git", "state": { "branch": "master", - "revision": "db61b35d92fc975828bb32f5fa4acf2dedcb0d21", + "revision": "baad2fcb7e563716f2f24974d537aa768eef6bde", "version": null } }