From 2708ea6a52b311fa3b15888cbb27f2feb0cee118 Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Fri, 22 May 2020 10:32:39 -0700 Subject: [PATCH 1/4] ThreadPool cleanup (3/n): Switch to vectorized API & remove unused/confusing extensions and implementations. This change takes a first^H^H^H^H^H^Hthird whack at a bunch of tech debt: 1. Removes the Naive thread pool implementation from `PJoin`. 2. Removes the unnecessary `TypedComputeThreadPool` protocol refinement. 3. Removes the badly implemented extensions that implemented `parallelFor` in terms of `join`. 4. Removes use of `rethrows`, as the rethrows language feature is not expressive enough to allow the performance optimizations for the non-throwing case. --- .../NonBlockingThreadPool.swift | 34 ++ Sources/PenguinParallel/ThreadPool.swift | 172 ++++----- .../PenguinParallelWithFoundation/PJoin.swift | 328 ------------------ .../VertexParallelTests.swift | 29 +- .../NaiveThreadPoolTests.swift | 102 ------ .../XCTestManifests.swift | 1 - 6 files changed, 132 insertions(+), 534 deletions(-) delete mode 100644 Sources/PenguinParallelWithFoundation/PJoin.swift delete mode 100644 Tests/PenguinParallelTests/NaiveThreadPoolTests.swift diff --git a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift index b4014150..7131f7f7 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift @@ -292,6 +292,40 @@ public class NonBlockingThreadPool: ComputeThr if let e = err { throw e } } + public func parallelFor(n: Int, _ fn: VectorizedParallelForFunction) { + let grainSize = n / parallelism // TODO: Make adaptive! + + func executeParallelFor(_ start: Int, _ end: Int) { + if start + grainSize >= end { + fn(start, end, n) + } else { + // Divide into 2 & recurse. + let rangeSize = end - start + let midPoint = start + (rangeSize / 2) + self.join({ executeParallelFor(start, midPoint) }, { executeParallelFor(midPoint, end)}) + } + } + + executeParallelFor(0, n) + } + + public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForFunction) throws { + let grainSize = n / parallelism // TODO: Make adaptive! + + func executeParallelFor(_ start: Int, _ end: Int) throws { + if start + grainSize >= end { + try fn(start, end, n) + } else { + // Divide into 2 & recurse. + let rangeSize = end - start + let midPoint = start + (rangeSize / 2) + try self.join({ try executeParallelFor(start, midPoint) }, { try executeParallelFor(midPoint, end) }) + } + } + + try executeParallelFor(0, n) + } + /// Shuts down the thread pool. public func shutDown() { cancelled = true diff --git a/Sources/PenguinParallel/ThreadPool.swift b/Sources/PenguinParallel/ThreadPool.swift index bc5af71f..388f1efe 100644 --- a/Sources/PenguinParallel/ThreadPool.swift +++ b/Sources/PenguinParallel/ThreadPool.swift @@ -60,16 +60,52 @@ public protocol ComputeThreadPool { /// This is the throwing overload func join(_ a: () throws -> Void, _ b: () throws -> Void) throws + /// A function that can be executed in parallel. + /// + /// The first argument is the index of the invocation, and the second argument is the total number + /// of invocations. + typealias ParallelForFunction = (Int, Int) -> Void + /// A function that can be executed in parallel. /// /// The first argument is the index of the copy, and the second argument is the total number of /// copies being executed. - typealias ParallelForFunc = (Int, Int) throws -> Void + typealias ThrowingParallelForFunction = (Int, Int) throws -> Void + + /// A vectorized function that can be executed in parallel. + /// + /// The first argument is the start index for the vectorized operation, and the second argument + /// corresponds to the end of the range. The third argument contains the total size of the range. + typealias VectorizedParallelForFunction = (Int, Int, Int) -> Void + + /// A vectorized function that can be executed in parallel. + /// + /// The first argument is the start index for the vectorized operation, and the second argument + /// corresponds to the end of the range. The third argument contains the total size of the range. + typealias ThrowingVectorizedParallelForFunction = (Int, Int, Int) throws -> Void /// Returns after executing `fn` `n` times. /// /// - Parameter n: The total times to execute `fn`. - func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows + func parallelFor(n: Int, _ fn: ParallelForFunction) + + /// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been + /// called with parameters that perfectly cover of the range `0.. Void, _ b: () -> Void) { - withoutActuallyEscaping(a) { a in - let throwing: () throws -> Void = a - try! join(throwing, b) - } - } -} -/// Holds a parallel for function; this is used to avoid extra refcount overheads on the function -/// itself. -fileprivate struct ParallelForFunctionHolder { - var fn: ComputeThreadPool.ParallelForFunc -} - -/// Uses `ComputeThreadPool.join` to execute `fn` in parallel. -fileprivate func runParallelFor( - pool: C, - start: Int, - end: Int, - total: Int, - fn: UnsafePointer -) throws { - if start + 1 == end { - try fn.pointee.fn(start, total) - } else { - assert(end > start) - let distance = end - start - let midpoint = start + (distance / 2) - try pool.join( - { try runParallelFor(pool: pool, start: start, end: midpoint, total: total, fn: fn) }, - { try runParallelFor(pool: pool, start: midpoint, end: end, total: total, fn: fn) }) - } -} - -extension ComputeThreadPool { - public func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows { - try withoutActuallyEscaping(fn) { fn in - var holder = ParallelForFunctionHolder(fn: fn) - try withUnsafePointer(to: &holder) { holder in - try runParallelFor(pool: self, start: 0, end: n, total: n, fn: holder) + /// Convert a non-vectorized operation to a vectorized operation. + public func parallelFor(n: Int, _ fn: ParallelForFunction) { + parallelFor(n: n) { start, end, total in + for i in start.. Void) - - /// Run two tasks (optionally) in parallel. - /// - /// Fork-join parallelism allows for efficient work-stealing parallelism. The two non-escaping - /// functions will have finished executing before `pJoin` returns. The first function will execute on - /// the local thread immediately, and the second function will execute on another thread if resources - /// are available, or on the local thread if there are not available other resources. - func join(_ a: (Self) -> Void, _ b: (Self) -> Void) - - /// Run two throwing tasks (optionally) in parallel; if one task throws, it is unspecified - /// whether the second task is even started. - /// - /// This is the throwing overloaded variation. - func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws -} - -extension TypedComputeThreadPool { - /// Implement the non-throwing variation in terms of the throwing one. - public func join(_ a: (Self) -> Void, _ b: (Self) -> Void) { - withoutActuallyEscaping(a) { a in - let throwing: (Self) throws -> Void = a - // Implement the non-throwing in terms of the throwing implementation. - try! join(throwing, b) + /// Convert a non-vectorized operation to a vectorized operation. + public func parallelFor(n: Int, _ fn: ThrowingParallelForFunction) throws { + try parallelFor(n: n) { start, end, total in + for i in start.. Void) { - dispatch { _ in fn() } - } - - public func join(_ a: () -> Void, _ b: () -> Void) { - join({ _ in a() }, { _ in b() }) - } - - public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws { - try join({ _ in try a() }, { _ in try b() }) - } -} - /// A `ComputeThreadPool` that executes everything immediately on the current thread. /// /// This threadpool implementation is useful for testing correctness, as well as avoiding context /// switches when a computation is designed to be parallelized at a coarser level. -public struct InlineComputeThreadPool: TypedComputeThreadPool { +public struct InlineComputeThreadPool: ComputeThreadPool { /// Initializes `self`. public init() {} @@ -202,14 +164,32 @@ public struct InlineComputeThreadPool: TypedComputeThreadPool { /// Dispatch `fn` to be run at some point in the future (immediately). /// /// Note: this implementation just executes `fn` immediately. - public func dispatch(_ fn: (Self) -> Void) { - fn(self) + public func dispatch(_ fn: () -> Void) { + fn() + } + + /// Executes `a` and `b` optionally in parallel, and returns when both are complete. + /// + /// Note: this implementation simply executes them serially. + public func join(_ a: () -> Void, _ b: () -> Void) { + a() + b() + } + + /// Executes `a` and `b` optionally in parallel, and returns when both are complete. + /// + /// Note: this implementation simply executes them serially. + public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws { + try a() + try b() + } + + public func parallelFor(n: Int, _ fn: VectorizedParallelForFunction) { + fn(0, n, n) } - /// Executes `a` and `b` and returns when both are complete. - public func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws { - try a(self) - try b(self) + public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForFunction) throws { + try fn(0, n, n) } } diff --git a/Sources/PenguinParallelWithFoundation/PJoin.swift b/Sources/PenguinParallelWithFoundation/PJoin.swift deleted file mode 100644 index c6adcecc..00000000 --- a/Sources/PenguinParallelWithFoundation/PJoin.swift +++ /dev/null @@ -1,328 +0,0 @@ -// Copyright 2020 Penguin Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import Foundation - -#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS) - import Darwin -#else - import Glibc -#endif - -/// A Naive ThreadPool. -/// -/// It has well-known performance problems, but is used as a reference implementation to: (1) test -/// correctness of alternate implementations, and (2) to allow higher levels of abstraction to be -/// developed (and tested) in parallel with an efficient implementation of `ThreadPool`. -final public class NaiveThreadPool: TypedComputeThreadPool { - init(workerCount: Int) { - workers.reserveCapacity(workerCount) - for i in 0.. Void) { - // TODO: Implement me! - fatalError("SORRY NOT YET IMPLEMENTED!") - } - - public func join(_ a: (NaiveThreadPool) throws -> Void, _ b: (NaiveThreadPool) throws -> Void) - throws - { - // TODO: Avoid extra closure construction! - try withoutActuallyEscaping({ try b(self) }) { b in - var item = WorkItem(op: b) - var aError: Error? = nil - contexts.addLocal(item: &item) - defer { - let tmp = contexts.popLocal() - assert(tmp == &item, "Popped something other than item!") - } - do { - try a(self) - } catch { - if item.tryTake() { - throw error - } else { - // Another thread is executing item; can't return! - aError = error - } - } - if item.tryTake() { - item.execute() - } - // In case it was stolen by a background thread, steal other work. - while !item.isFinished { - // Prefer local work over remote work. - if let work = contexts.lookForWorkLocal() { - work.pointee.execute() - continue - } - // Look for local work. - let ctxs = contexts.allContexts() - for ctx in ctxs { - if let work = ctx.lookForWork() { - work.pointee.execute() - break - } - } - } - if let error = aError { - throw error - } - if let error = item.error { - throw error - } - } - } - - private func allContexts() -> AllContexts { - return contexts - } - - private var contexts = AllContexts() - private var workers = [Worker]() - - private final class Worker: Thread { - - init(name: String, index: Int, allContexts: AllContexts) { - self.allContexts = allContexts - self.index = index - super.init() - self.name = name - } - - override final func main() { - // Touch the thread-local context to create it & add it to the AllContext's list. - _ = Context.local(index: index, allContexts: allContexts) - var foundWork = false // If we're finding work, don't wait on the condition variable. - - // Loop, looking for work. - while true { - let ctxs: [Context] - if !foundWork { - ctxs = allContexts.wait() - } else { - ctxs = allContexts.allContexts() - } - - foundWork = false - for ctx in ctxs { - if let item = ctx.lookForWork() { - item.pointee.execute() - foundWork = true - } - } - } - // TODO: have a good way for a clean shutdown. - } - - let allContexts: AllContexts - let index: Int - } - - // Note: this cheap-and-dirty implementation is nowhere close to optimal! - private final class AllContexts { - func append(_ context: Context) { - cond.lock() - defer { cond.unlock() } - cond.signal() - contexts.append(context) - } - - func addLocal(item: UnsafeMutablePointer) { - Context.local(index: -1, allContexts: self).add(item) - notify() - } - - func lookForWorkLocal() -> UnsafeMutablePointer? { - Context.local(index: -1, allContexts: self).lookForWorkLocal() - } - - func popLocal() -> UnsafeMutablePointer? { - Context.local(index: -1, allContexts: self).popLast() - } - - func allContexts() -> [Context] { - cond.lock() - defer { cond.unlock() } - return contexts - } - - func wait() -> [Context] { - cond.lock() - defer { cond.unlock() } - cond.wait() - return contexts - } - - func notify() { - cond.signal() - } - - func broadcast() { - cond.broadcast() - } - - // TODO: Keep track of contexts that might have useful things in order to avoid - // doing unnecessary work when looking for work if this turns out to be a - // performance problem. - private var contexts = [Context]() - private let cond = NSCondition() - } - - /// Thread-local contexts - private final class Context { - private var lock = NSLock() - private var workItems = [UnsafeMutablePointer]() - let index: Int - - init(index: Int) { - self.index = index - } - - // Adds a workitem to the list. - // - // Note: the caller must notify potential AllContext's waiters. (This should not be directly called, and - // only called within `AllContexts.addLocal`.) - func add(_ item: UnsafeMutablePointer) { - lock.lock() - defer { lock.unlock() } - workItems.append(item) - } - - // This should also only be called when - func popLast() -> UnsafeMutablePointer? { - lock.lock() - defer { lock.unlock() } - return workItems.popLast() - } - - func lookForWork() -> UnsafeMutablePointer? { - lock.lock() - defer { lock.unlock() } - for elem in workItems { - if elem.pointee.tryTake() { - return elem - } - } - return nil - } - - func lookForWorkLocal() -> UnsafeMutablePointer? { - lock.lock() - defer { lock.unlock() } - for elem in workItems.reversed() { - if elem.pointee.tryTake() { - return elem - } - } - return nil - } - - /// The data key for the singleton `Context` in the current thread. - /// - /// TODO: figure out what to do vis-a-vis multiple thread pools? Maybe re-structure to avoid using - /// threadlocal variables, and instead create a map keyed by thread id? - static let key: pthread_key_t = { - var key = pthread_key_t() - pthread_key_create(&key) { obj in - #if !(os(macOS) || os(iOS) || os(watchOS) || os(tvOS)) - let obj = obj! - #endif - Unmanaged.fromOpaque(obj).release() - } - return key - }() - - /// The thread-local singleton. - static func local(index: Int, allContexts: AllContexts) -> Context { - if let address = pthread_getspecific(key) { - return Unmanaged.fromOpaque(address).takeUnretainedValue() - } - let context = Context(index: index) - allContexts.append(context) - pthread_setspecific(key, Unmanaged.passRetained(context).toOpaque()) - return context - } - } - - public static let global = NaiveThreadPool() -} - -private struct WorkItem { - enum State { - case pre - case ongoing - case finished - } - var op: () throws -> Void // guarded by lock. - var error: Error? // guarded by lock. - var state: State = .pre - let lock = NSLock() // TODO: use atomic operations on State. (No need for a lock.) - - /// Returns true if this thread should execute op, false otherwise. - mutating func tryTake() -> Bool { - lock.lock() - defer { lock.unlock() } - if state == .pre { - state = .ongoing - return true - } - return false - } - - mutating func execute() { - do { - try op() - } catch { - self.error = error - } - markFinished() - } - - mutating func markFinished() { - lock.lock() - defer { lock.unlock() } - assert(state == .ongoing) - state = .finished - } - - var isFinished: Bool { - lock.lock() - defer { lock.unlock() } - return state == .finished - } -} diff --git a/Tests/PenguinGraphTests/VertexParallelTests.swift b/Tests/PenguinGraphTests/VertexParallelTests.swift index e5972859..f59f2a3f 100644 --- a/Tests/PenguinGraphTests/VertexParallelTests.swift +++ b/Tests/PenguinGraphTests/VertexParallelTests.swift @@ -271,7 +271,9 @@ final class VertexParallelTests: XCTestCase { } func testPerThreadMailboxesMultiThreaded() { - ComputeThreadPools.withPool(NaiveThreadPool.global) { + // TODO: Don't create a new thread pool in the test. + let pool = PosixNonBlockingThreadPool(name: "per-thread-mailboxes-multi-threaded") + ComputeThreadPools.withPool(pool) { XCTAssert(ComputeThreadPools.parallelism > 1) var g = makeDistanceGraph() let vIds = g.vertices @@ -501,19 +503,32 @@ fileprivate struct TestMessage: Equatable, MergeableMessage { /// A test thread pool that doesn't have parallelism, but makes it easy to pretend as if multiple /// threads are sequentially performing operations. -fileprivate struct TestSequentialThreadPool: TypedComputeThreadPool { +fileprivate struct TestSequentialThreadPool: ComputeThreadPool { /// The amount of parallelism to simulate in this thread pool. public let parallelism: Int /// Set this to define the thread this simulation should be running on. public var currentThreadIndex: Int? = nil - public func dispatch(_ fn: (Self) -> Void) { - fn(self) + public func dispatch(_ fn: @escaping () -> Void) { + fn() } - public func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws { - try a(self) - try b(self) + public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws { + try a() + try b() + } + + public func join(_ a: () -> Void, _ b: () -> Void) { + a() + b() + } + + public func parallelFor(n: Int, _ fn: VectorizedParallelForFunction) { + fn(0, n, n) + } + + public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForFunction) throws { + try fn(0, n, n) } } diff --git a/Tests/PenguinParallelTests/NaiveThreadPoolTests.swift b/Tests/PenguinParallelTests/NaiveThreadPoolTests.swift deleted file mode 100644 index fd796ade..00000000 --- a/Tests/PenguinParallelTests/NaiveThreadPoolTests.swift +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright 2020 Penguin Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import Foundation -import PenguinParallelWithFoundation -import XCTest - -final class NaiveThreadPoolTests: XCTestCase { - - func testThrowingJoin() throws { - do { - try NaiveThreadPool.global.join({ _ in }, { _ in throw TestError() }) - XCTFail("Should have thrown!") - } catch is TestError {} // Pass - - do { - try NaiveThreadPool.global.join({ _ in throw TestError() }, { _ in }) - XCTFail("Should have thrown!") - } catch is TestError {} // Pass - } - - func testThrowingParallelFor() throws { - do { - try NaiveThreadPool.global.parallelFor(n: 100) { (i, _) in - if i == 57 { - throw TestError() - } - } - XCTFail("Should have thrown!") - } catch is TestError { - // Pass! - return - } - } - - func testThreadIndex() { - let threadCount = NaiveThreadPool.global.parallelism - // Run `threadCount` tasks, and block on `condition` until all of them have consumed the - // thread pool resources. Check the thread index on each one to ensure we've seen them all. - // The main thread waits on `doneCondition` which is notified when `threadsSeenCount` - // returns to 0. - let condition = NSCondition() - - // Because the thread pool is intentionally racy (for performance reasons), and assumes work - // does not block the thread pool, we intentionally do not block indefinitely upon - // `condition`, and instead try a few times to ensure we appropriately satisfy the test - // conditions. - for attempt in 0..<10 { - var threadsSeenCount = 0 // guarded by condition - var seenMainThread = false // guarded by condition - var seenThreadIds = Set() // guarded by condition - var successfulRun = true // guarded by condition - - NaiveThreadPool.global.parallelFor(n: threadCount + 1) { (i, _) in - condition.lock() - if let threadId = NaiveThreadPool.global.currentThreadIndex { - threadsSeenCount += 1 - seenThreadIds.insert(threadId) - } else { - // Either we've not seen the main thread, or this must be an unsuccessful run. - XCTAssert( - !seenMainThread || !successfulRun, - "\(attempt): Main thread: \(seenMainThread), successful: \(successfulRun)") - seenMainThread = true - } - if threadsSeenCount == threadCount { - condition.broadcast() // Wake up all waiters. - } else { - if !condition.wait(until: Date() + 0.5) { // Wait no more than 500 ms. - successfulRun = false - } - } - condition.unlock() - } - // Finished. - if !successfulRun { continue } // try again. - XCTAssert(seenMainThread) - XCTAssertEqual(Set(0.. Date: Sat, 23 May 2020 16:47:23 -0700 Subject: [PATCH 2/4] Update Sources/PenguinParallel/ThreadPool.swift Co-authored-by: Dave Abrahams --- Sources/PenguinParallel/ThreadPool.swift | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Sources/PenguinParallel/ThreadPool.swift b/Sources/PenguinParallel/ThreadPool.swift index 388f1efe..a9a06c51 100644 --- a/Sources/PenguinParallel/ThreadPool.swift +++ b/Sources/PenguinParallel/ThreadPool.swift @@ -60,11 +60,13 @@ public protocol ComputeThreadPool { /// This is the throwing overload func join(_ a: () throws -> Void, _ b: () throws -> Void) throws - /// A function that can be executed in parallel. + /// A function to be invoked in parallel a specified number of times by `parallelFor`. /// - /// The first argument is the index of the invocation, and the second argument is the total number - /// of invocations. - typealias ParallelForFunction = (Int, Int) -> Void + /// - Parameter currentInvocationIndex: the index of the invocation executing + /// in the current thread. + /// - Parameter requestedInvocationCount: the number of parallel invocations requested. + typealias ParallelForBody + = (_ currentInvocationIndex: Int, _ requestedInvocationCount: Int) -> Void /// A function that can be executed in parallel. /// From 590ebd0eee76430ca3d6caf85dd022b5f8d5dba7 Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Sat, 23 May 2020 17:04:46 -0700 Subject: [PATCH 3/4] Improve the code based on feedback. --- .../NonBlockingThreadPool.swift | 12 +++++++++-- Sources/PenguinParallel/ThreadPool.swift | 20 +++++++++---------- 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift index 93eef44a..7b42cef2 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift @@ -289,6 +289,7 @@ public class NonBlockingThreadPool: ComputeThr if let e = err { throw e } } + /// Executes `fn`, optionally in parallel, spanning the range `0..: ComputeThr executeParallelFor(0, n) } + /// Executes `fn`, optionally in parallel, spanning the range `0..: ComputeThr try executeParallelFor(0, n) } - /// Shuts down the thread pool. + /// Requests that all threads in the threadpool exit and cleans up their associated resources. + /// + /// This function returns only once all threads have exited and their resources have been + /// deallocated. + /// + /// Note: if a work item was submitted to the threadpool that never completes (i.e. has an + /// infinite loop), this function will never return. public func shutDown() { cancelled = true condition.notify(all: true) @@ -334,7 +342,7 @@ public class NonBlockingThreadPool: ComputeThr threads.removeAll() // Remove threads that have been shut down. } - public var parallelism: Int { totalThreadCount } + public var maxParallelism: Int { totalThreadCount } public var currentThreadIndex: Int? { perThreadKey.localValue?.threadId diff --git a/Sources/PenguinParallel/ThreadPool.swift b/Sources/PenguinParallel/ThreadPool.swift index 388f1efe..6786fc44 100644 --- a/Sources/PenguinParallel/ThreadPool.swift +++ b/Sources/PenguinParallel/ThreadPool.swift @@ -62,14 +62,14 @@ public protocol ComputeThreadPool { /// A function that can be executed in parallel. /// - /// The first argument is the index of the invocation, and the second argument is the total number - /// of invocations. + /// The first argument is the index of the invocation, and the second argument is total number + /// of invocations (`n`) requested in the `parallelFor(n:_)` call. typealias ParallelForFunction = (Int, Int) -> Void /// A function that can be executed in parallel. /// - /// The first argument is the index of the copy, and the second argument is the total number of - /// copies being executed. + /// The first argument is the index of the invocation, and the second argument is total number + /// of invocations (`n`) requested in the `parallelFor(n:_)` call. typealias ThrowingParallelForFunction = (Int, Int) throws -> Void /// A vectorized function that can be executed in parallel. @@ -116,8 +116,8 @@ public protocol ComputeThreadPool { // func parallelFor(blockingUpTo n: Int, blocksPerThread: Int, _ fn: ParallelForFunction) // func parallelFor(blockingUpTo n: Int, _ fn: ParallelForFunction) - /// The maximum amount of parallelism possible within this thread pool. - var parallelism: Int { get } + /// The maximum number of concurrent threads of execution supported by this thread pool. + var maxParallelism: Int { get } /// Returns the index of the current thread in the pool, if running on a thread-pool thread, /// nil otherwise. @@ -128,7 +128,7 @@ public protocol ComputeThreadPool { extension ComputeThreadPool { - /// Convert a non-vectorized operation to a vectorized operation. + /// Implements `parallelFor(n:_:)` (scalar) in terms of `parallelFor(n:_:)` (vectorized). public func parallelFor(n: Int, _ fn: ParallelForFunction) { parallelFor(n: n) { start, end, total in for i in start.. Date: Sat, 23 May 2020 17:13:37 -0700 Subject: [PATCH 4/4] Fix up tests. --- .../PenguinGraphTests/VertexParallelTests.swift | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Tests/PenguinGraphTests/VertexParallelTests.swift b/Tests/PenguinGraphTests/VertexParallelTests.swift index f59f2a3f..7c28d0df 100644 --- a/Tests/PenguinGraphTests/VertexParallelTests.swift +++ b/Tests/PenguinGraphTests/VertexParallelTests.swift @@ -219,18 +219,18 @@ final class VertexParallelTests: XCTestCase { } func testPerThreadMailboxesShortestPathsUserThread() { - let testPool = TestSequentialThreadPool(parallelism: 10) + let testPool = TestSequentialThreadPool(maxParallelism: 10) runParallelMailboxesTest(testPool) } func testPerThreadMailboxesShortestPathsPoolThread() { - var testPool = TestSequentialThreadPool(parallelism: 10) + var testPool = TestSequentialThreadPool(maxParallelism: 10) testPool.currentThreadIndex = 3 runParallelMailboxesTest(testPool) } func testPerThreadMailboxesWonkyMessagePatterns() { - var testPool = TestSequentialThreadPool(parallelism: 10) + var testPool = TestSequentialThreadPool(maxParallelism: 10) let g = makeDistanceGraph() let vIds = g.vertices ComputeThreadPools.withPool(testPool) { @@ -274,7 +274,7 @@ final class VertexParallelTests: XCTestCase { // TODO: Don't create a new thread pool in the test. let pool = PosixNonBlockingThreadPool(name: "per-thread-mailboxes-multi-threaded") ComputeThreadPools.withPool(pool) { - XCTAssert(ComputeThreadPools.parallelism > 1) + XCTAssert(ComputeThreadPools.maxParallelism > 1) var g = makeDistanceGraph() let vIds = g.vertices var mailboxes = PerThreadMailboxes( @@ -310,7 +310,7 @@ final class VertexParallelTests: XCTestCase { } func testPerThreadMailboxesDelivery() { - var testPool = TestSequentialThreadPool(parallelism: 10, currentThreadIndex: 3) + var testPool = TestSequentialThreadPool(maxParallelism: 10, currentThreadIndex: 3) let mailboxes = PerThreadMailboxes(vertexCount: 5, threadCount: 10) ComputeThreadPools.withPool(testPool) { @@ -505,7 +505,7 @@ fileprivate struct TestMessage: Equatable, MergeableMessage { /// threads are sequentially performing operations. fileprivate struct TestSequentialThreadPool: ComputeThreadPool { /// The amount of parallelism to simulate in this thread pool. - public let parallelism: Int + public let maxParallelism: Int /// Set this to define the thread this simulation should be running on. public var currentThreadIndex: Int? = nil @@ -524,11 +524,11 @@ fileprivate struct TestSequentialThreadPool: ComputeThreadPool { b() } - public func parallelFor(n: Int, _ fn: VectorizedParallelForFunction) { + public func parallelFor(n: Int, _ fn: VectorizedParallelForBody) { fn(0, n, n) } - public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForFunction) throws { + public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws { try fn(0, n, n) } }