-
Notifications
You must be signed in to change notification settings - Fork 15
ThreadPool cleanup (3/n): Switch to vectorized API & remove unused/co… #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2708ea6
43f726b
35e0208
590ebd0
a91c527
ac6afb3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -289,7 +289,49 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr | |
| if let e = err { throw e } | ||
| } | ||
|
|
||
| /// Shuts down the thread pool. | ||
| /// Executes `fn`, optionally in parallel, spanning the range `0..<n`. | ||
| public func parallelFor(n: Int, _ fn: VectorizedParallelForBody) { | ||
| let grainSize = n / maxParallelism // TODO: Make adaptive! | ||
|
|
||
| func executeParallelFor(_ start: Int, _ end: Int) { | ||
| if start + grainSize >= end { | ||
| fn(start, end, n) | ||
| } else { | ||
| // Divide into 2 & recurse. | ||
| let rangeSize = end - start | ||
| let midPoint = start + (rangeSize / 2) | ||
| self.join({ executeParallelFor(start, midPoint) }, { executeParallelFor(midPoint, end)}) | ||
| } | ||
| } | ||
|
|
||
| executeParallelFor(0, n) | ||
| } | ||
|
|
||
| /// Executes `fn`, optionally in parallel, spanning the range `0..<n`. | ||
| public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws { | ||
| let grainSize = n / maxParallelism // TODO: Make adaptive! | ||
|
|
||
| func executeParallelFor(_ start: Int, _ end: Int) throws { | ||
| if start + grainSize >= end { | ||
| try fn(start, end, n) | ||
| } else { | ||
| // Divide into 2 & recurse. | ||
| let rangeSize = end - start | ||
| let midPoint = start + (rangeSize / 2) | ||
| try self.join({ try executeParallelFor(start, midPoint) }, { try executeParallelFor(midPoint, end) }) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ? Your change description gives the impression you are removing the implementation of parallelFor in terms of
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, good point. That description is getting ahead of the actual implementation in this patch set. I'll update the description in the PR shortly. |
||
| } | ||
| } | ||
|
|
||
| try executeParallelFor(0, n) | ||
| } | ||
|
|
||
| /// Requests that all threads in the threadpool exit and cleans up their associated resources. | ||
| /// | ||
| /// This function returns only once all threads have exited and their resources have been | ||
| /// deallocated. | ||
| /// | ||
| /// Note: if a work item was submitted to the threadpool that never completes (i.e. has an | ||
| /// infinite loop), this function will never return. | ||
| public func shutDown() { | ||
| cancelled = true | ||
| condition.notify(all: true) | ||
|
|
@@ -300,7 +342,7 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr | |
| threads.removeAll() // Remove threads that have been shut down. | ||
| } | ||
|
|
||
| public var parallelism: Int { totalThreadCount } | ||
| public var maxParallelism: Int { totalThreadCount } | ||
|
|
||
| public var currentThreadIndex: Int? { | ||
| perThreadKey.localValue?.threadId | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -60,28 +60,68 @@ public protocol ComputeThreadPool { | |||||
| /// This is the throwing overload | ||||||
| func join(_ a: () throws -> Void, _ b: () throws -> Void) throws | ||||||
|
|
||||||
| /// A function to be invoked in parallel a specified number of times by `parallelFor`. | ||||||
| /// | ||||||
| /// - Parameter currentInvocationIndex: the index of the invocation executing | ||||||
| /// in the current thread. | ||||||
| /// - Parameter requestedInvocationCount: the number of parallel invocations requested. | ||||||
| typealias ParallelForBody | ||||||
| = (_ currentInvocationIndex: Int, _ requestedInvocationCount: Int) -> Void | ||||||
|
|
||||||
| /// A function that can be executed in parallel. | ||||||
| /// | ||||||
| /// The first argument is the index of the copy, and the second argument is the total number of | ||||||
| /// copies being executed. | ||||||
| typealias ParallelForFunc = (Int, Int) throws -> Void | ||||||
| /// - Parameter currentInvocationIndex: the index of the invocation executing | ||||||
| /// in the current thread. | ||||||
| /// - Parameter requestedInvocationCount: the number of parallel invocations requested. | ||||||
| typealias ThrowingParallelForBody | ||||||
| = (_ currentInvocationIndex: Int, _ requestedInvocationCount: Int) throws -> Void | ||||||
|
|
||||||
| /// A vectorized function that can be executed in parallel. | ||||||
| /// | ||||||
| /// The first argument is the start index for the vectorized operation, and the second argument | ||||||
| /// corresponds to the end of the range. The third argument contains the total size of the range. | ||||||
| typealias VectorizedParallelForBody = (Int, Int, Int) -> Void | ||||||
|
|
||||||
| /// A vectorized function that can be executed in parallel. | ||||||
| /// | ||||||
| /// The first argument is the start index for the vectorized operation, and the second argument | ||||||
| /// corresponds to the end of the range. The third argument contains the total size of the range. | ||||||
| typealias ThrowingVectorizedParallelForBody = (Int, Int, Int) throws -> Void | ||||||
|
|
||||||
| /// Returns after executing `fn` `n` times. | ||||||
| /// | ||||||
| /// - Parameter n: The total times to execute `fn`. | ||||||
| func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows | ||||||
| func parallelFor(n: Int, _ fn: ParallelForBody) | ||||||
|
|
||||||
| /// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been | ||||||
| /// called with parameters that perfectly cover of the range `0..<n`. | ||||||
| /// | ||||||
| /// - Parameter n: The range of numbers `0..<n` to cover. | ||||||
| func parallelFor(n: Int, _ fn: VectorizedParallelForBody) | ||||||
|
|
||||||
| /// Returns after executing `fn` `n` times. | ||||||
| /// | ||||||
| /// - Parameter n: The total times to execute `fn`. | ||||||
| func parallelFor(n: Int, _ fn: ThrowingParallelForBody) throws | ||||||
|
|
||||||
| /// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been | ||||||
| /// called with parameters that perfectly cover of the range `0..<n`. | ||||||
| /// | ||||||
| /// - Parameter n: The range of numbers `0..<n` to cover. | ||||||
| func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws | ||||||
|
|
||||||
|
|
||||||
| // TODO: Add this & a default implementation! | ||||||
| // /// Returns after executing `fn` `n` times. | ||||||
| // /// | ||||||
| // /// - Parameter n: The total times to execute `fn`. | ||||||
| // /// - Parameter blocksPerThread: The minimum block size to subdivide. If unspecified, a good | ||||||
| // /// value will be chosen based on the amount of available parallelism. | ||||||
| // func parallelFor(blockingUpTo n: Int, blocksPerThread: Int, _ fn: ParallelForFunc) | ||||||
| // func parallelFor(blockingUpTo n: Int, _ fn: ParallelForFunc) | ||||||
| // func parallelFor(blockingUpTo n: Int, blocksPerThread: Int, _ fn: ParallelForBody) | ||||||
| // func parallelFor(blockingUpTo n: Int, _ fn: ParallelForBody) | ||||||
|
|
||||||
| /// The maximum amount of parallelism possible within this thread pool. | ||||||
| var parallelism: Int { get } | ||||||
| /// The maximum number of concurrent threads of execution supported by this thread pool. | ||||||
| var maxParallelism: Int { get } | ||||||
|
|
||||||
| /// Returns the index of the current thread in the pool, if running on a thread-pool thread, | ||||||
| /// nil otherwise. | ||||||
|
|
@@ -91,125 +131,69 @@ public protocol ComputeThreadPool { | |||||
| } | ||||||
|
|
||||||
| extension ComputeThreadPool { | ||||||
| /// A default implementation of the non-throwing variation in terms of the throwing one. | ||||||
| public func join(_ a: () -> Void, _ b: () -> Void) { | ||||||
| withoutActuallyEscaping(a) { a in | ||||||
| let throwing: () throws -> Void = a | ||||||
| try! join(throwing, b) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// Holds a parallel for function; this is used to avoid extra refcount overheads on the function | ||||||
| /// itself. | ||||||
| fileprivate struct ParallelForFunctionHolder { | ||||||
| var fn: ComputeThreadPool.ParallelForFunc | ||||||
| } | ||||||
|
|
||||||
| /// Uses `ComputeThreadPool.join` to execute `fn` in parallel. | ||||||
| fileprivate func runParallelFor<C: ComputeThreadPool>( | ||||||
| pool: C, | ||||||
| start: Int, | ||||||
| end: Int, | ||||||
| total: Int, | ||||||
| fn: UnsafePointer<ParallelForFunctionHolder> | ||||||
| ) throws { | ||||||
| if start + 1 == end { | ||||||
| try fn.pointee.fn(start, total) | ||||||
| } else { | ||||||
| assert(end > start) | ||||||
| let distance = end - start | ||||||
| let midpoint = start + (distance / 2) | ||||||
| try pool.join( | ||||||
| { try runParallelFor(pool: pool, start: start, end: midpoint, total: total, fn: fn) }, | ||||||
| { try runParallelFor(pool: pool, start: midpoint, end: end, total: total, fn: fn) }) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| extension ComputeThreadPool { | ||||||
| public func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows { | ||||||
| try withoutActuallyEscaping(fn) { fn in | ||||||
| var holder = ParallelForFunctionHolder(fn: fn) | ||||||
| try withUnsafePointer(to: &holder) { holder in | ||||||
| try runParallelFor(pool: self, start: 0, end: n, total: n, fn: holder) | ||||||
| /// Implements `parallelFor(n:_:)` (scalar) in terms of `parallelFor(n:_:)` (vectorized). | ||||||
| public func parallelFor(n: Int, _ fn: ParallelForBody) { | ||||||
| parallelFor(n: n) { start, end, total in | ||||||
| for i in start..<end { | ||||||
| fn(i, total) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// Typed compute threadpools support additional sophisticated operations. | ||||||
| public protocol TypedComputeThreadPool: ComputeThreadPool { | ||||||
| /// Submit a task to be executed on the threadpool. | ||||||
| /// | ||||||
| /// `pRun` will execute task in parallel on the threadpool and it will complete at a future time. | ||||||
| /// `pRun` returns immediately. | ||||||
| func dispatch(_ task: (Self) -> Void) | ||||||
|
|
||||||
| /// Run two tasks (optionally) in parallel. | ||||||
| /// | ||||||
| /// Fork-join parallelism allows for efficient work-stealing parallelism. The two non-escaping | ||||||
| /// functions will have finished executing before `pJoin` returns. The first function will execute on | ||||||
| /// the local thread immediately, and the second function will execute on another thread if resources | ||||||
| /// are available, or on the local thread if there are not available other resources. | ||||||
| func join(_ a: (Self) -> Void, _ b: (Self) -> Void) | ||||||
|
|
||||||
| /// Run two throwing tasks (optionally) in parallel; if one task throws, it is unspecified | ||||||
| /// whether the second task is even started. | ||||||
| /// | ||||||
| /// This is the throwing overloaded variation. | ||||||
| func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws | ||||||
| } | ||||||
|
|
||||||
| extension TypedComputeThreadPool { | ||||||
| /// Implement the non-throwing variation in terms of the throwing one. | ||||||
| public func join(_ a: (Self) -> Void, _ b: (Self) -> Void) { | ||||||
| withoutActuallyEscaping(a) { a in | ||||||
| let throwing: (Self) throws -> Void = a | ||||||
| // Implement the non-throwing in terms of the throwing implementation. | ||||||
| try! join(throwing, b) | ||||||
| /// Implements `parallelFor(n:_:)` (scalar) in terms of `parallelFor(n:_:)` (vectorized). | ||||||
| public func parallelFor(n: Int, _ fn: ThrowingParallelForBody) throws { | ||||||
| try parallelFor(n: n) { start, end, total in | ||||||
| for i in start..<end { | ||||||
| try fn(i, total) | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| extension TypedComputeThreadPool { | ||||||
| public func dispatch(_ fn: @escaping () -> Void) { | ||||||
| dispatch { _ in fn() } | ||||||
| } | ||||||
|
|
||||||
| public func join(_ a: () -> Void, _ b: () -> Void) { | ||||||
| join({ _ in a() }, { _ in b() }) | ||||||
| } | ||||||
|
|
||||||
| public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws { | ||||||
| try join({ _ in try a() }, { _ in try b() }) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| /// A `ComputeThreadPool` that executes everything immediately on the current thread. | ||||||
| /// | ||||||
| /// This threadpool implementation is useful for testing correctness, as well as avoiding context | ||||||
| /// switches when a computation is designed to be parallelized at a coarser level. | ||||||
| public struct InlineComputeThreadPool: TypedComputeThreadPool { | ||||||
| public struct InlineComputeThreadPool: ComputeThreadPool { | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Consider the protocol name an initial suggestion. When we do mention
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In terms of thread-pools, there can be a number of different designs with different properties. In the same way that you can implement a random access collection in terms of a collection (just really inefficiently), I wanted to clearly distinguish what properties the thread-pool has. Concretely, there are I/O-focused thread-pools, where you can blocking and/or non-blocking I/O. This thread pool abstraction is focused on compute-bound tasks, and is tuned / structured with APIs focused on that domain. Does that make sense? Happy to ponder the names further... related work also uses
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Naturally.
You mean like
Also naturally. I don't get the connection to random access, though.
In principle. Are they really separate abstractions though, or at least, isn't the IO one a refinement of this one? Wouldn't you want to write most algorithms once rather than replicate them for different kinds of pools? |
||||||
| /// Initializes `self`. | ||||||
| public init() {} | ||||||
|
|
||||||
| /// The amount of parallelism available in this thread pool. | ||||||
| public var parallelism: Int { 1 } | ||||||
| /// The maximum number of concurrent threads of execution supported by this thread pool. | ||||||
| public var maxParallelism: Int { 1 } | ||||||
|
|
||||||
| /// The index of the current thread. | ||||||
| public var currentThreadIndex: Int? { 0 } | ||||||
|
|
||||||
| /// Dispatch `fn` to be run at some point in the future (immediately). | ||||||
| /// | ||||||
| /// Note: this implementation just executes `fn` immediately. | ||||||
| public func dispatch(_ fn: (Self) -> Void) { | ||||||
| fn(self) | ||||||
| public func dispatch(_ fn: () -> Void) { | ||||||
dabrahams marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| fn() | ||||||
| } | ||||||
|
|
||||||
| /// Executes `a` and `b` optionally in parallel, and returns when both are complete. | ||||||
| /// | ||||||
| /// Note: this implementation simply executes them serially. | ||||||
| public func join(_ a: () -> Void, _ b: () -> Void) { | ||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think of
Suggested change
Just an idea.
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For context: I picked I think that it would be good to go over this API and think hard about naming & how the abstractions compose, but only once we understand the performance limitations & constraints. (Concretely, some of the (internal) abstractions are being re-written due to performance limitations in the current structure of things.)
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is an established term-of-art, which means we should use it in the way that has been established. You join stuff after you've forked it, in my experience, Rayon notwithstanding. And IME when you fork, you're running the same closure in both threads of execution, with a parameter passed to indicate which invocation of the closure you got, similar to what you did with I'm not sure the concurrency is optional from the programming model P.O.V., which is what matters here. There may or may not be any actual parallelism between Also the word “optional” tends to imply it's up to the user, but it's not; this is up to the library. As for putting off talking about naming and abstractions of “this API” until we know it's performance, I think if we don't do both at once, we don't know what “this API” is. You don't want to design yourself into performance constraints based on assumptions about the programming model that don't actually apply. |
||||||
| a() | ||||||
| b() | ||||||
| } | ||||||
|
|
||||||
| /// Executes `a` and `b` optionally in parallel, and returns when both are complete. | ||||||
| /// | ||||||
| /// Note: this implementation simply executes them serially. | ||||||
| public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws { | ||||||
| try a() | ||||||
| try b() | ||||||
| } | ||||||
|
|
||||||
| public func parallelFor(n: Int, _ fn: VectorizedParallelForBody) { | ||||||
| fn(0, n, n) | ||||||
| } | ||||||
|
|
||||||
| /// Executes `a` and `b` and returns when both are complete. | ||||||
| public func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws { | ||||||
| try a(self) | ||||||
| try b(self) | ||||||
| public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws { | ||||||
| try fn(0, n, n) | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
|
|
@@ -267,8 +251,8 @@ extension ComputeThreadPools { | |||||
| } | ||||||
|
|
||||||
| /// The amount of parallelism provided by the current thread-local compute pool. | ||||||
| public static var parallelism: Int { | ||||||
| local.parallelism | ||||||
| public static var maxParallelism: Int { | ||||||
| local.maxParallelism | ||||||
| } | ||||||
|
|
||||||
| /// Sets `pool` to `local` for the duration of `body`. | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a fan of this doc. When you say “optionally” it implies the caller gets to pass an option, but that's not the case. Also, I have no idea what it means to “execute a function spanning a range.” Was going to make some suggestions but you've already merged.