diff --git a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift index 9a8e7d94..3a09cde7 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift @@ -100,7 +100,7 @@ public class NonBlockingThreadPool: ComputeThr self.totalThreadCount = totalThreadCount self.externalFastPathThreadCount = externalFastPathThreadCount self.coprimes = positiveCoprimes(totalThreadCount) - self.queues = (0..: ComputeThr deinit { // Shut ourselves down, just in case. shutDown() + // Deallocate queues. + for var q in queues { + q.deallocate() + } } /// Registers the current thread with the thread pool for fast-path operation. diff --git a/Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift b/Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift index 8282f466..d3c33bfb 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift @@ -12,38 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. +import PenguinStructures + /// A fixed-size, partially non-blocking deque of `Element`s. /// /// Operations on the front of the deque must be done by a single thread (the "owner" thread), and /// these operations never block. Operations on the back of the queue can be done by multiple /// threads concurrently (however they are serialized through a mutex). -internal class TaskDeque: ManagedBuffer< - TaskDequeHeader, - TaskDequeElement -> -{ - +internal struct TaskDeque { // TaskDeque keeps all non-empty elements in a contiguous buffer. 
+ typealias Buffer = UnmanagedBuffer, TaskDequeElement> + var buffer: Buffer - class func make() -> Self { + init() { precondition( Constants.capacity > 3 && Constants.capacity <= 65536, "capacity must be between [4, 65536].") precondition( Constants.capacity & (Constants.capacity - 1) == 0, - "capacity must be a power of two for fast masking.") - let deque = Self.create(minimumCapacity: Constants.capacity) { _ in TaskDequeHeader() } as! Self - deque.withUnsafeMutablePointerToElements { elems in - elems.initialize(repeating: TaskDequeElement(), count: Constants.capacity) + "capacity must be a power of two for fast masking.") + buffer = Buffer(capacity: Constants.capacity) { elements in + elements.baseAddress!.initialize(repeating: TaskDequeElement(), count: elements.count) + return TaskDequeHeader() } - return deque + } + private var header: TaskDequeHeader { + _read { yield buffer.header } + nonmutating _modify { yield &buffer.header } } - deinit { + mutating func deallocate() { assert( TaskDequeIndex(header.front.valueRelaxed).index == TaskDequeIndex(header.back.valueRelaxed).index, - "TaskDeque not empty; \(self)") + "TaskDeque not empty; \(self)") + buffer.deallocate { elements, _ in + elements.baseAddress!.deinitialize(count: elements.count) + } } /// Add a new element to the front of the queue. @@ -52,20 +57,18 @@ internal class TaskDeque: ManagedBuff /// - Returns: an `Element` if the queue is full; it is up to the caller to appropriately execute /// the returned element. func pushFront(_ elem: Element) -> Element? 
{ - withUnsafeMutablePointerToElements { elems in - let front = TaskDequeIndex(header.front.valueRelaxed) - var state = elems[front.index].state.valueRelaxed - if TaskState(rawValue: state) != .empty - || !elems[front.index].state.cmpxchgStrongAcquire( - original: &state, newValue: TaskState.busy.rawValue) - { - return elem - } - header.front.setRelaxed(front.movedForward().underlying) - elems[front.index].element = elem - elems[front.index].state.setRelease(TaskState.ready.rawValue) - return nil + let front = TaskDequeIndex(header.front.valueRelaxed) + var state = buffer[front.index].state.valueRelaxed + if TaskState(rawValue: state) != .empty + || !buffer[front.index].state.cmpxchgStrongAcquire( + original: &state, newValue: TaskState.busy.rawValue) + { + return elem } + header.front.setRelaxed(front.movedForward().underlying) + buffer[front.index].element = elem + buffer[front.index].state.setRelease(TaskState.ready.rawValue) + return nil } /// Removes one element from the front of the queue. @@ -73,21 +76,19 @@ internal class TaskDeque: ManagedBuff /// - Invariant: this function must only ever be called by the "owner" thread. /// - Returns: an `Element` if the queue is non-empty. func popFront() -> Element? { - withUnsafeMutablePointerToElements { elems in - let front = TaskDequeIndex(header.front.valueRelaxed).movedBackward() - var state = elems[front.index].state.valueRelaxed - if TaskState(rawValue: state) != .ready - || !elems[front.index].state.cmpxchgStrongAcquire( - original: &state, newValue: TaskState.busy.rawValue) - { - return nil - } - var elem: Element? 
= nil - swap(&elems[front.index].element, &elem) - elems[front.index].state.setRelease(TaskState.empty.rawValue) - header.front.setRelaxed(front.underlying) - return elem + let front = TaskDequeIndex(header.front.valueRelaxed).movedBackward() + var state = buffer[front.index].state.valueRelaxed + if TaskState(rawValue: state) != .ready + || !buffer[front.index].state.cmpxchgStrongAcquire( + original: &state, newValue: TaskState.busy.rawValue) + { + return nil } + var elem: Element? = nil + swap(&buffer[front.index].element, &elem) + buffer[front.index].state.setRelease(TaskState.empty.rawValue) + header.front.setRelaxed(front.underlying) + return elem } /// Add a new element to the back of the queue. @@ -96,23 +97,21 @@ internal class TaskDeque: ManagedBuff /// - Returns: an `Element` if the queue is full; it is up to the caller to appropriately execute /// the returned element. func pushBack(_ elem: Element) -> Element? { - withUnsafeMutablePointerToElements { elems in - header.lock.lock() - defer { header.lock.unlock() } + header.lock.lock() + defer { header.lock.unlock() } - let back = TaskDequeIndex(header.back.valueRelaxed).movedBackward() - var state = elems[back.index].state.valueRelaxed - if TaskState(rawValue: state) != .empty - || !elems[back.index].state.cmpxchgStrongAcquire( - original: &state, newValue: TaskState.busy.rawValue) - { - return elem - } - header.back.setRelaxed(back.underlying) - elems[back.index].element = elem - elems[back.index].state.setRelease(TaskState.ready.rawValue) - return nil + let back = TaskDequeIndex(header.back.valueRelaxed).movedBackward() + var state = buffer[back.index].state.valueRelaxed + if TaskState(rawValue: state) != .empty + || !buffer[back.index].state.cmpxchgStrongAcquire( + original: &state, newValue: TaskState.busy.rawValue) + { + return elem } + header.back.setRelaxed(back.underlying) + buffer[back.index].element = elem + buffer[back.index].state.setRelease(TaskState.ready.rawValue) + return nil } /// Removes 
one element from the back of the queue. @@ -122,24 +121,22 @@ internal class TaskDeque: ManagedBuff func popBack() -> Element? { if isEmpty { return nil } // Fast-path to avoid taking lock. - return withUnsafeMutablePointerToElements { elems in - header.lock.lock() - defer { header.lock.unlock() } + header.lock.lock() + defer { header.lock.unlock() } - let back = TaskDequeIndex(header.back.valueRelaxed) - var state = elems[back.index].state.valueRelaxed - if TaskState(rawValue: state) != .ready - || !elems[back.index].state.cmpxchgStrongAcquire( - original: &state, newValue: TaskState.busy.rawValue) - { - return nil - } - var elem: Element? = nil - swap(&elems[back.index].element, &elem) - elems[back.index].state.setRelease(TaskState.empty.rawValue) - header.back.setRelaxed(back.movedForward().underlying) - return elem + let back = TaskDequeIndex(header.back.valueRelaxed) + var state = buffer[back.index].state.valueRelaxed + if TaskState(rawValue: state) != .ready + || !buffer[back.index].state.cmpxchgStrongAcquire( + original: &state, newValue: TaskState.busy.rawValue) + { + return nil } + var elem: Element? = nil + swap(&buffer[back.index].element, &elem) + buffer[back.index].state.setRelease(TaskState.empty.rawValue) + header.back.setRelaxed(back.movedForward().underlying) + return elem } /// False iff the queue contains at least one entry. diff --git a/Sources/PenguinStructures/UnmanagedBuffer.swift b/Sources/PenguinStructures/UnmanagedBuffer.swift new file mode 100644 index 00000000..19813a7f --- /dev/null +++ b/Sources/PenguinStructures/UnmanagedBuffer.swift @@ -0,0 +1,123 @@ +// Copyright 2020 Penguin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// A dynamically-sized, contiguous array of `Element`s prefixed by `Header`. +/// +/// `UnmanagedBuffer` is similar to the standard library's `ManagedBuffer`, with the key difference +/// being that `UnmanagedBuffer` does not induce reference counting overheads. `UnmanagedBuffer` +/// encapsulates a pointer onto the heap that is allocated during `init`. It is the user's +/// responsibility to call `deallocate` exactly once. +/// +/// Although `header` is initialized at `UnmanagedBuffer` initialization, `UnmanagedBuffer` (like +/// `ManagedBuffer`) supports partial initialization of the `Element`s array. +public struct UnmanagedBuffer { + /// The pointer to the heap memory. + private let basePointer: UnsafeMutableRawPointer + + /// The offset from the `basePointer` to the start of the contiguous array of `Element`s. + // Note: a previous implementation dynamically computed the `elementOffsetInBytes`, but in certain + // workloads, ~25% of the CPU time was spent in `swift_getGenericMetadata`!! Since we can't + // guarantee full specialization (where this turns into a compile-time constant), we instead + // materialize it explicitly. + private let elementOffsetInBytes: UInt32 + + /// The count of `Element`s capable of being stored in the trailing dynamically-sized buffer. + // Note: we limit the capacity of `UnmanagedBuffer` to be < UInt32.max to stuff the + // UnmanagedBuffer struct itself into 2 words for performance.
Future work could consider removing + // this field, and bit-stuffing `elementOffsetInBytes` into the unused bits of `basePointer`, + // making `UnmanagedBuffer` take only a single machine word. + // + // We set elementCapacity to 0 when deallocated as a sentinel. + private var elementCapacity: UInt32 + + // NOTE FOR REVIEWER: Should this be a static function `.allocate()` instead? + /// Allocates an `UnmanagedBuffer` to store up to `capacity` `Element`s, initializing + /// memory with `initializer`. + public init(capacity: Int, initializer: (UnsafeMutableBufferPointer) -> Header) { + precondition( + capacity < UInt32.max, + "Cannot allocate an UnmanagedBuffer with capacity: \(capacity).") + precondition(capacity > 0, "capacity must be > 0!") + precondition(MemoryLayout
.size < UInt32.max) + + let layout = Self.offsetsAndAlignments(capacity: capacity) + + basePointer = UnsafeMutableRawPointer.allocate( + byteCount: layout.totalBytes, + alignment: layout.bufferAlignment) + elementOffsetInBytes = UInt32(layout.elementsOffset) + elementCapacity = UInt32(capacity) + + basePointer.bindMemory(to: Header.self, capacity: 1) + (basePointer + Int(elementOffsetInBytes)).bindMemory(to: Element.self, capacity: capacity) + // (Partially) initialize memory. + headerPointer.initialize(to: initializer(elementsPointer)) + } + + // Note: `deinitalizingElements` must deinitialize any initialized elements; the header is + // deinitialized and the memory is freed by this method. + public mutating func deallocate(deinitalizingElements: (UnsafeMutableBufferPointer, Header) -> Void) { + deinitalizingElements(elementsPointer, header) + headerPointer.deinitialize(count: 1) + basePointer.deallocate() + elementCapacity = 0 // Set sentinel. + } + + /// A pointer to the header. + public var headerPointer: UnsafeMutablePointer
{ + assert(elementCapacity > 0, "Attempting to access a deallocate'd UnmanagedBuffer.") + return basePointer.assumingMemoryBound(to: Header.self) + } + + /// A pointer to the collection of elements that comprise the body. + public var elementsPointer: UnsafeMutableBufferPointer { + assert(elementCapacity > 0, "Attempting to access a deallocate'd UnmanagedBuffer.") + let start = (basePointer + Int(elementOffsetInBytes)).assumingMemoryBound(to: Element.self) + return UnsafeMutableBufferPointer(start: start, count: Int(elementCapacity)) + } + + /// The header of the `UnmanagedBuffer`. + public var header: Header { + _read { yield headerPointer.pointee } + nonmutating _modify { yield &headerPointer.pointee } + } + + /// Access elements in `self`. + /// + /// Important note: indexing must only be performed when the buffer's underlying memory has been + /// initialized! + public subscript(index: Int) -> Element { + _read { + precondition(index < capacity, "Index out of bounds (\(index) >= \(capacity)).") + yield elementsPointer[index] + } + nonmutating _modify { + precondition(index < capacity, "Index out of bounds (\(index) >= \(capacity)).") + yield &elementsPointer[index] + } + } + + /// The maximum number of `Element`s that can be stored in `self`. + public var capacity: Int { Int(elementCapacity) } + + internal static func offsetsAndAlignments(capacity: Int) -> (totalBytes: Int, elementsOffset: Int, bufferAlignment: Int) { + // Compute the next alignment offset after MemoryLayout
.size bytes. (Relies on integer + // division truncating, and on MemoryLayout<Void>.alignment == 1!) + let offsetSteps = (MemoryLayout
.size + MemoryLayout.alignment - 1) / MemoryLayout.alignment + let offset = offsetSteps * MemoryLayout.alignment + let totalBytes = offset + (capacity * MemoryLayout.stride) + let alignment = max(MemoryLayout.alignment, MemoryLayout
.alignment) + return (totalBytes, offset, alignment) + } +} diff --git a/Tests/PenguinParallelTests/TaskDequeTests.swift b/Tests/PenguinParallelTests/TaskDequeTests.swift index 2bba18e7..529467b8 100644 --- a/Tests/PenguinParallelTests/TaskDequeTests.swift +++ b/Tests/PenguinParallelTests/TaskDequeTests.swift @@ -19,7 +19,11 @@ import XCTest final class TaskDequeTests: XCTestCase { func testSimplePushAndPop() { - let deque = TaskDeque.make() + let deque = TaskDeque() + defer { + var dequeCopy = deque // Make a copy in order to deallocate the backing pointer. + dequeCopy.deallocate() + } XCTAssert(deque.isEmpty) XCTAssertNil(deque.pushFront(1)) diff --git a/Tests/PenguinStructuresTests/UnmanagedBufferTests.swift b/Tests/PenguinStructuresTests/UnmanagedBufferTests.swift new file mode 100644 index 00000000..b17b3ed4 --- /dev/null +++ b/Tests/PenguinStructuresTests/UnmanagedBufferTests.swift @@ -0,0 +1,84 @@ +// Copyright 2020 Penguin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +@testable import PenguinStructures +import XCTest + +final class UnmanagedBufferTests: XCTestCase { + func testOffsetAndAlignments() { + assert(header: Void.self, elements: Int.self, capacity: 10, totalBytes: 80, elementsOffset: 0, alignment: 8) + assert(header: Int.self, elements: Void.self, capacity: 10, totalBytes: 18, elementsOffset: 8, alignment: 8) + assert(header: Int.self, elements: Int.self, capacity: 10, totalBytes: 88, elementsOffset: 8, alignment: 8) + assert(header: (Int, Int32).self, elements: Int.self, capacity: 10, totalBytes: 96, elementsOffset: 16, alignment: 8) + assert(header: Int32.self, elements: (Int.self), capacity: 10, totalBytes: 88, elementsOffset: 8, alignment: 8) + } + + func testMemoryReclaiming() { + var trackCount = 0 + var buffer = UnmanagedBuffer, Int>(capacity: 3) { _ in + // Don't initialize elements. + Tracked(3) { trackCount += $0 } + } + XCTAssertEqual(1, trackCount) + buffer.deallocate { _, _ in } // No need to de-initalize. + XCTAssertEqual(0, trackCount) + } + + func testStoringAndRetrievingElements() { + var trackCount = 0 + var buffer = UnmanagedBuffer>(capacity: 10) { buff in + for i in 0..( + header: H.Type, + elements: E.Type, + capacity: Int, + totalBytes: Int, + elementsOffset: Int, + alignment: Int, + file: StaticString = #file, + line: UInt = #line +) { + let results = UnmanagedBuffer.offsetsAndAlignments(capacity: capacity) + XCTAssertEqual(results.totalBytes, totalBytes, + "Incorrect total bytes; expected: \(totalBytes), computed: \(results.totalBytes)", + file: file, line: line) + XCTAssertEqual(results.elementsOffset, elementsOffset, + "Incorrect elements offset; expected: \(elementsOffset), computed: \(results.elementsOffset)", + file: file, line: line) + XCTAssertEqual(results.bufferAlignment, alignment, + "Incorrect buffer alignment; expected: \(alignment), computed: \(results.bufferAlignment)", + file: file, line: line) +} diff --git a/Tests/PenguinStructuresTests/XCTestManifests.swift 
b/Tests/PenguinStructuresTests/XCTestManifests.swift index cd892a8c..3e5de58f 100644 --- a/Tests/PenguinStructuresTests/XCTestManifests.swift +++ b/Tests/PenguinStructuresTests/XCTestManifests.swift @@ -27,6 +27,7 @@ import XCTest testCase(TupleTests.allTests), testCase(HeapTests.allTests), testCase(HierarchicalCollectionTests.allTests), + testCase(UnmanagedBufferTests.allTests), ] } #endif