From 9e31153761351c76b153b9893808e447998e6e8d Mon Sep 17 00:00:00 2001 From: Brennan Saeta Date: Sun, 24 May 2020 16:34:54 -0700 Subject: [PATCH] Implement `UnmanagedBuffer` and switch `TaskDeque` to use it. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, `TaskDeque` was implemented in terms of `ManagedBuffer`. While `ManagedBuffer` implements the semantics we'd like, it is implemented as a class. This can induce a significant amount of reference counting traffic, especially when stored in `Array`s. `UnmanagedBuffer` implements a similar interface to `ManagedBuffer`, but instead uses manual pointer allocation and management. This allows us to avoid all reference counting traffic, at the cost of requiring explicit destruction. Switching `TaskDeque` from `ManagedBuffer` to `UnmanagedBuffer` yields between a 2x and 6x performance improvement for key workloads that stress the `TaskDeque` data structure within the `NonBlockingThreadPool`. Below are performance numbers across 2 machines & operating systems demonstrating performance improvements. Note: because this change has been extracted from a stack of related performance improvements, if you benchmark this PR itself, you will not see the expected performance improvements. Instead, this PR has been separated out to facilitate easier reviewing. 
Benchmark numbers on machine A: ------------------------------- Before (from previous commit): ``` name time std iterations -------------------------------------------------------------------------------------------------------------------- NonBlockingThreadPool: join, one level 700.0 ns ± 70289.84998218225 127457 NonBlockingThreadPool: join, two levels 2107.0 ns ± 131041.5070696377 31115 NonBlockingThreadPool: join, three levels 4960.0 ns ± 178122.9562964306 15849 NonBlockingThreadPool: join, four levels, three on thread pool thread 5893.0 ns ± 224021.47900401088 13763 NonBlockingThreadPool: parallel for, one level 22420.0 ns ± 203689.69689780468 7581 NonBlockingThreadPool: parallel for, two levels 500985.5 ns ± 642136.0139757036 1390 ``` After: ``` name time std iterations -------------------------------------------------------------------------------------------------------------------- NonBlockingThreadPool: join, one level 429.0 ns ± 59095.36128834737 223050 NonBlockingThreadPool: join, two levels 1270.0 ns ± 101587.48601579959 64903 NonBlockingThreadPool: join, three levels 3098.0 ns ± 165407.1669656578 28572 NonBlockingThreadPool: join, four levels, three on thread pool thread 3990.5 ns ± 227217.34017343252 10000 NonBlockingThreadPool: parallel for, one level 16853.0 ns ± 260015.39296821563 8660 NonBlockingThreadPool: parallel for, two levels 563926.0 ns ± 609298.6358076902 2189 ``` Benchmark numbers from machine B: --------------------------------- Before: ``` name time std iterations --------------------------------------------------------------------------------------------------------------------- NonBlockingThreadPool: join, one level 3022.0 ns ± 366686.3050127019 21717 NonBlockingThreadPool: join, two levels 13313.5 ns ± 550429.476815564 5970 NonBlockingThreadPool: join, three levels 39009.5 ns ± 716172.9687807652 3546 NonBlockingThreadPool: join, four levels, three on thread pool thread 341631.0 ns ± 767483.9227743072 2367 NonBlockingThreadPool: 
parallel for, one level 404375.0 ns ± 590178.6724299589 3123 NonBlockingThreadPool: parallel for, two levels 1000872.0 ns ± 1592704.2766365155 805 ``` After: ``` name time std iterations --------------------------------------------------------------------------------------------------------------------- NonBlockingThreadPool: join, one level 749.0 ns ± 174096.69284101788 91247 NonBlockingThreadPool: join, two levels 12046.5 ns ± 414670.5686344325 5920 NonBlockingThreadPool: join, three levels 46975.0 ns ± 543858.2306554643 3561 NonBlockingThreadPool: join, four levels, three on thread pool thread 559837.0 ns ± 591477.1893574063 2795 NonBlockingThreadPool: parallel for, one level 66446.0 ns ± 627245.5098742851 2236 NonBlockingThreadPool: parallel for, two levels 1668739.0 ns ± 1536323.375783659 765 ``` --- .../NonBlockingThreadPool.swift | 6 +- .../NonblockingThreadPool/TaskDeque.swift | 141 +++++++++--------- .../PenguinStructures/UnmanagedBuffer.swift | 123 +++++++++++++++ .../PenguinParallelTests/TaskDequeTests.swift | 6 +- .../UnmanagedBufferTests.swift | 84 +++++++++++ .../XCTestManifests.swift | 1 + 6 files changed, 287 insertions(+), 74 deletions(-) create mode 100644 Sources/PenguinStructures/UnmanagedBuffer.swift create mode 100644 Tests/PenguinStructuresTests/UnmanagedBufferTests.swift diff --git a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift index 9fe54f58..774e9503 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift @@ -100,7 +100,7 @@ public class NonBlockingThreadPool: ComputeThr self.totalThreadCount = totalThreadCount self.externalFastPathThreadCount = externalFastPathThreadCount self.coprimes = positiveCoprimes(totalThreadCount) - self.queues = (0..: ComputeThr deinit { // Shut ourselves down, just in case. 
shutDown() + // Deallocate queues. + for var q in queues { + q.deallocate() + } } /// Registers the current thread with the thread pool for fast-path operation. diff --git a/Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift b/Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift index 8282f466..d3c33bfb 100644 --- a/Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift +++ b/Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift @@ -12,38 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. +import PenguinStructures + /// A fixed-size, partially non-blocking deque of `Element`s. /// /// Operations on the front of the deque must be done by a single thread (the "owner" thread), and /// these operations never block. Operations on the back of the queue can be done by multiple /// threads concurrently (however they are serialized through a mutex). -internal class TaskDeque: ManagedBuffer< - TaskDequeHeader, - TaskDequeElement -> -{ - +internal struct TaskDeque { // TaskDeque keeps all non-empty elements in a contiguous buffer. + typealias Buffer = UnmanagedBuffer, TaskDequeElement> + var buffer: Buffer - class func make() -> Self { + init() { precondition( Constants.capacity > 3 && Constants.capacity <= 65536, "capacity must be between [4, 65536].") precondition( Constants.capacity & (Constants.capacity - 1) == 0, - "capacity must be a power of two for fast masking.") - let deque = Self.create(minimumCapacity: Constants.capacity) { _ in TaskDequeHeader() } as! 
Self - deque.withUnsafeMutablePointerToElements { elems in - elems.initialize(repeating: TaskDequeElement(), count: Constants.capacity) + "capacity must be a power of two for fast masking.") + buffer = Buffer(capacity: Constants.capacity) { elements in + elements.baseAddress!.initialize(repeating: TaskDequeElement(), count: elements.count) + return TaskDequeHeader() } - return deque + } + private var header: TaskDequeHeader { + _read { yield buffer.header } + nonmutating _modify { yield &buffer.header } } - deinit { + mutating func deallocate() { assert( TaskDequeIndex(header.front.valueRelaxed).index == TaskDequeIndex(header.back.valueRelaxed).index, - "TaskDeque not empty; \(self)") + "TaskDeque not empty; \(self)") + buffer.deallocate { elements, _ in + elements.baseAddress!.deinitialize(count: elements.count) + } } /// Add a new element to the front of the queue. @@ -52,20 +57,18 @@ internal class TaskDeque: ManagedBuff /// - Returns: an `Element` if the queue is full; it is up to the caller to appropriately execute /// the returned element. func pushFront(_ elem: Element) -> Element? 
{ - withUnsafeMutablePointerToElements { elems in - let front = TaskDequeIndex(header.front.valueRelaxed) - var state = elems[front.index].state.valueRelaxed - if TaskState(rawValue: state) != .empty - || !elems[front.index].state.cmpxchgStrongAcquire( - original: &state, newValue: TaskState.busy.rawValue) - { - return elem - } - header.front.setRelaxed(front.movedForward().underlying) - elems[front.index].element = elem - elems[front.index].state.setRelease(TaskState.ready.rawValue) - return nil + let front = TaskDequeIndex(header.front.valueRelaxed) + var state = buffer[front.index].state.valueRelaxed + if TaskState(rawValue: state) != .empty + || !buffer[front.index].state.cmpxchgStrongAcquire( + original: &state, newValue: TaskState.busy.rawValue) + { + return elem } + header.front.setRelaxed(front.movedForward().underlying) + buffer[front.index].element = elem + buffer[front.index].state.setRelease(TaskState.ready.rawValue) + return nil } /// Removes one element from the front of the queue. @@ -73,21 +76,19 @@ internal class TaskDeque: ManagedBuff /// - Invariant: this function must only ever be called by the "owner" thread. /// - Returns: an `Element` if the queue is non-empty. func popFront() -> Element? { - withUnsafeMutablePointerToElements { elems in - let front = TaskDequeIndex(header.front.valueRelaxed).movedBackward() - var state = elems[front.index].state.valueRelaxed - if TaskState(rawValue: state) != .ready - || !elems[front.index].state.cmpxchgStrongAcquire( - original: &state, newValue: TaskState.busy.rawValue) - { - return nil - } - var elem: Element? 
= nil - swap(&elems[front.index].element, &elem) - elems[front.index].state.setRelease(TaskState.empty.rawValue) - header.front.setRelaxed(front.underlying) - return elem + let front = TaskDequeIndex(header.front.valueRelaxed).movedBackward() + var state = buffer[front.index].state.valueRelaxed + if TaskState(rawValue: state) != .ready + || !buffer[front.index].state.cmpxchgStrongAcquire( + original: &state, newValue: TaskState.busy.rawValue) + { + return nil } + var elem: Element? = nil + swap(&buffer[front.index].element, &elem) + buffer[front.index].state.setRelease(TaskState.empty.rawValue) + header.front.setRelaxed(front.underlying) + return elem } /// Add a new element to the back of the queue. @@ -96,23 +97,21 @@ internal class TaskDeque: ManagedBuff /// - Returns: an `Element` if the queue is full; it is up to the caller to appropriately execute /// the returned element. func pushBack(_ elem: Element) -> Element? { - withUnsafeMutablePointerToElements { elems in - header.lock.lock() - defer { header.lock.unlock() } + header.lock.lock() + defer { header.lock.unlock() } - let back = TaskDequeIndex(header.back.valueRelaxed).movedBackward() - var state = elems[back.index].state.valueRelaxed - if TaskState(rawValue: state) != .empty - || !elems[back.index].state.cmpxchgStrongAcquire( - original: &state, newValue: TaskState.busy.rawValue) - { - return elem - } - header.back.setRelaxed(back.underlying) - elems[back.index].element = elem - elems[back.index].state.setRelease(TaskState.ready.rawValue) - return nil + let back = TaskDequeIndex(header.back.valueRelaxed).movedBackward() + var state = buffer[back.index].state.valueRelaxed + if TaskState(rawValue: state) != .empty + || !buffer[back.index].state.cmpxchgStrongAcquire( + original: &state, newValue: TaskState.busy.rawValue) + { + return elem } + header.back.setRelaxed(back.underlying) + buffer[back.index].element = elem + buffer[back.index].state.setRelease(TaskState.ready.rawValue) + return nil } /// Removes 
one element from the back of the queue. @@ -122,24 +121,22 @@ internal class TaskDeque: ManagedBuff func popBack() -> Element? { if isEmpty { return nil } // Fast-path to avoid taking lock. - return withUnsafeMutablePointerToElements { elems in - header.lock.lock() - defer { header.lock.unlock() } + header.lock.lock() + defer { header.lock.unlock() } - let back = TaskDequeIndex(header.back.valueRelaxed) - var state = elems[back.index].state.valueRelaxed - if TaskState(rawValue: state) != .ready - || !elems[back.index].state.cmpxchgStrongAcquire( - original: &state, newValue: TaskState.busy.rawValue) - { - return nil - } - var elem: Element? = nil - swap(&elems[back.index].element, &elem) - elems[back.index].state.setRelease(TaskState.empty.rawValue) - header.back.setRelaxed(back.movedForward().underlying) - return elem + let back = TaskDequeIndex(header.back.valueRelaxed) + var state = buffer[back.index].state.valueRelaxed + if TaskState(rawValue: state) != .ready + || !buffer[back.index].state.cmpxchgStrongAcquire( + original: &state, newValue: TaskState.busy.rawValue) + { + return nil } + var elem: Element? = nil + swap(&buffer[back.index].element, &elem) + buffer[back.index].state.setRelease(TaskState.empty.rawValue) + header.back.setRelaxed(back.movedForward().underlying) + return elem } /// False iff the queue contains at least one entry. diff --git a/Sources/PenguinStructures/UnmanagedBuffer.swift b/Sources/PenguinStructures/UnmanagedBuffer.swift new file mode 100644 index 00000000..19813a7f --- /dev/null +++ b/Sources/PenguinStructures/UnmanagedBuffer.swift @@ -0,0 +1,123 @@ +// Copyright 2020 Penguin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// A dynamically-sized, contiguous array of `Element`s prefixed by `Header`. +/// +/// `UnmanagedBuffer` is similar to the standard library's `ManagedBuffer`, with the key difference +/// in that `UnmanagedBuffer` does not induce reference counting overheads. `UnmanagedBuffer` +/// encapsulates a pointer onto the heap that is allocated during `init`. It is the user's +/// responsibility to call `deallocate` exactly once. +/// +/// Although `header` is initialized at `UnmanagedBuffer` initialization, `UnmanagedBuffer` (like +/// `ManagedBuffer`) supports partial initialization of the `Element`s array. +public struct UnmanagedBuffer { + /// The pointer to the heap memory. + private let basePointer: UnsafeMutableRawPointer + + /// The offset from the `basePointer` to the start of the contiguous array of `Element`s. + // Note: a previous implementation dynamically computed the `elementOffsetInBytes`, but in certain + // workloads, ~25% of the CPU time was spent in `swift_getGenericMetadata`!! Since we can't + // guarantee full specialization (where this turns into a compile-time constant), we instead + // materialize it explicitly. + private let elementOffsetInBytes: UInt32 + + /// The count of `Element`s capable of being stored in the trailing dynamically-sized buffer. + // Note: we limit the capacity of `UnmanagedBuffer` to be < UInt32.max to stuff the + // UnmanagedBuffer struct itself into 2 words for performance. 
Future work could consider removing + // this field, and bit-stuffing `elementOffsetInBytes` into the unused bits of `basePointer`, + // making `UnmanagedBuffer` take only a single machine word. + // + // We set elementCapacity to 0 when deallocated as a sentinal. + private var elementCapacity: UInt32 + + // NOTE FOR REVIEWER: Should this be a static function `.allocate()` instead? + /// Allocates an `UnmanagedBuffer` to store up to `capacity` `Element`s, initializing + /// memory with `initializer`. + public init(capacity: Int, initializer: (UnsafeMutableBufferPointer) -> Header) { + precondition( + capacity < UInt32.max, + "Cannot allocate an UnmanagedBuffer with capacity: \(capacity).") + precondition(capacity > 0, "capacity must be > 0!") + precondition(MemoryLayout
.size < UInt32.max) + + let layout = Self.offsetsAndAlignments(capacity: capacity) + + basePointer = UnsafeMutableRawPointer.allocate( + byteCount: layout.totalBytes, + alignment: layout.bufferAlignment) + elementOffsetInBytes = UInt32(layout.elementsOffset) + elementCapacity = UInt32(capacity) + + basePointer.bindMemory(to: Header.self, capacity: 1) + (basePointer + Int(elementOffsetInBytes)).bindMemory(to: Element.self, capacity: capacity) + // (Partially) initialize memory. + headerPointer.initialize(to: initializer(elementsPointer)) + } + + // Note: you must deinitialize yourself first if required (using headerPointer, + // and elementsPointer). + public mutating func deallocate(deinitalizingElements: (UnsafeMutableBufferPointer, Header) -> Void) { + deinitalizingElements(elementsPointer, header) + headerPointer.deinitialize(count: 1) + basePointer.deallocate() + elementCapacity = 0 // Set sentinal. + } + + /// A pointer to the header. + public var headerPointer: UnsafeMutablePointer
{ + assert(elementCapacity > 0, "Attempting to access a deallocate'd UnmanagedBuffer.") + return basePointer.assumingMemoryBound(to: Header.self) + } + + /// A pointer to the collection of elements that comprise the body. + public var elementsPointer: UnsafeMutableBufferPointer { + assert(elementCapacity > 0, "Attempting to access a deallocate'd UnmanagedBuffer.") + let start = (basePointer + Int(elementOffsetInBytes)).assumingMemoryBound(to: Element.self) + return UnsafeMutableBufferPointer(start: start, count: Int(elementCapacity)) + } + + /// The header of the `UnmanagedBuffer`. + public var header: Header { + _read { yield headerPointer.pointee } + nonmutating _modify { yield &headerPointer.pointee } + } + + /// Access elements in `self`. + /// + /// Important note: indexing must only be performed when the buffer's underlying memory has been + /// initialized! + public subscript(index: Int) -> Element { + _read { + precondition(index < capacity, "Index out of bounds (\(index) >= \(capacity)).") + yield elementsPointer[index] + } + nonmutating _modify { + precondition(index < capacity, "Index out of bounds (\(index) >= \(capacity)).") + yield &elementsPointer[index] + } + } + + /// The maximum number of `Element`s that can be stored in `self`. + public var capacity: Int { Int(elementCapacity) } + + internal static func offsetsAndAlignments(capacity: Int) -> (totalBytes: Int, elementsOffset: Int, bufferAlignment: Int) { + // Compute the next alignment offset after MemoryLayout
.size bytes. (Leverages int + // division does truncation, and that MemoryLayout.alignment == 1!) + let offsetSteps = (MemoryLayout
.size + MemoryLayout.alignment - 1) / MemoryLayout.alignment + let offset = offsetSteps * MemoryLayout.alignment + let totalBytes = offset + (capacity * MemoryLayout.stride) + let alignment = max(MemoryLayout.alignment, MemoryLayout
.alignment) + return (totalBytes, offset, alignment) + } +} diff --git a/Tests/PenguinParallelTests/TaskDequeTests.swift b/Tests/PenguinParallelTests/TaskDequeTests.swift index 2bba18e7..529467b8 100644 --- a/Tests/PenguinParallelTests/TaskDequeTests.swift +++ b/Tests/PenguinParallelTests/TaskDequeTests.swift @@ -19,7 +19,11 @@ import XCTest final class TaskDequeTests: XCTestCase { func testSimplePushAndPop() { - let deque = TaskDeque.make() + let deque = TaskDeque() + defer { + var dequeCopy = deque // Make a copy in order to deallocate the backing pointer. + dequeCopy.deallocate() + } XCTAssert(deque.isEmpty) XCTAssertNil(deque.pushFront(1)) diff --git a/Tests/PenguinStructuresTests/UnmanagedBufferTests.swift b/Tests/PenguinStructuresTests/UnmanagedBufferTests.swift new file mode 100644 index 00000000..b17b3ed4 --- /dev/null +++ b/Tests/PenguinStructuresTests/UnmanagedBufferTests.swift @@ -0,0 +1,84 @@ +// Copyright 2020 Penguin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +@testable import PenguinStructures +import XCTest + +final class UnmanagedBufferTests: XCTestCase { + func testOffsetAndAlignments() { + assert(header: Void.self, elements: Int.self, capacity: 10, totalBytes: 80, elementsOffset: 0, alignment: 8) + assert(header: Int.self, elements: Void.self, capacity: 10, totalBytes: 18, elementsOffset: 8, alignment: 8) + assert(header: Int.self, elements: Int.self, capacity: 10, totalBytes: 88, elementsOffset: 8, alignment: 8) + assert(header: (Int, Int32).self, elements: Int.self, capacity: 10, totalBytes: 96, elementsOffset: 16, alignment: 8) + assert(header: Int32.self, elements: (Int.self), capacity: 10, totalBytes: 88, elementsOffset: 8, alignment: 8) + } + + func testMemoryReclaiming() { + var trackCount = 0 + var buffer = UnmanagedBuffer, Int>(capacity: 3) { _ in + // Don't initialize elements. + Tracked(3) { trackCount += $0 } + } + XCTAssertEqual(1, trackCount) + buffer.deallocate { _, _ in } // No need to de-initalize. + XCTAssertEqual(0, trackCount) + } + + func testStoringAndRetrievingElements() { + var trackCount = 0 + var buffer = UnmanagedBuffer>(capacity: 10) { buff in + for i in 0..( + header: H.Type, + elements: E.Type, + capacity: Int, + totalBytes: Int, + elementsOffset: Int, + alignment: Int, + file: StaticString = #file, + line: UInt = #line +) { + let results = UnmanagedBuffer.offsetsAndAlignments(capacity: capacity) + XCTAssertEqual(results.totalBytes, totalBytes, + "Incorrect total bytes; expected: \(totalBytes), computed: \(results.totalBytes)", + file: file, line: line) + XCTAssertEqual(results.elementsOffset, elementsOffset, + "Incorrect elements offset; expected: \(elementsOffset), computed: \(results.elementsOffset)", + file: file, line: line) + XCTAssertEqual(results.bufferAlignment, alignment, + "Incorrect buffer alignment; expected: \(alignment), computed: \(results.bufferAlignment)", + file: file, line: line) +} diff --git a/Tests/PenguinStructuresTests/XCTestManifests.swift 
b/Tests/PenguinStructuresTests/XCTestManifests.swift index 27abeb95..5876d163 100644 --- a/Tests/PenguinStructuresTests/XCTestManifests.swift +++ b/Tests/PenguinStructuresTests/XCTestManifests.swift @@ -25,6 +25,7 @@ import XCTest testCase(ArrayStorageTests.allTests), testCase(ArrayStorageExtensionTests.allTests), testCase(ArrayBufferTests.allTests), + testCase(UnmanagedBufferTests.allTests), ] } #endif