Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr
self.totalThreadCount = totalThreadCount
self.externalFastPathThreadCount = externalFastPathThreadCount
self.coprimes = positiveCoprimes(totalThreadCount)
self.queues = (0..<totalThreadCount).map { _ in Queue.make() }
self.queues = (0..<totalThreadCount).map { _ in Queue() }
self.cancelledStorage = AtomicUInt64()
self.blockedCountStorage = AtomicUInt64()
self.spinningState = AtomicUInt64()
Expand All @@ -123,6 +123,10 @@ public class NonBlockingThreadPool<Environment: ConcurrencyPlatform>: ComputeThr
deinit {
// Shut ourselves down, just in case.
shutDown()
// Deallocate queues.
for var q in queues {
q.deallocate()
}
}

/// Registers the current thread with the thread pool for fast-path operation.
Expand Down
141 changes: 69 additions & 72 deletions Sources/PenguinParallel/NonblockingThreadPool/TaskDeque.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,38 +12,43 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import PenguinStructures

/// A fixed-size, partially non-blocking deque of `Element`s.
///
/// Operations on the front of the deque must be done by a single thread (the "owner" thread), and
/// these operations never block. Operations on the back of the queue can be done by multiple
/// threads concurrently (however they are serialized through a mutex).
internal class TaskDeque<Element, Environment: ConcurrencyPlatform>: ManagedBuffer<
TaskDequeHeader<Environment>,
TaskDequeElement<Element>
>
{

internal struct TaskDeque<Element, Environment: ConcurrencyPlatform> {
// TaskDeque keeps all non-empty elements in a contiguous buffer.
typealias Buffer = UnmanagedBuffer<TaskDequeHeader<Environment>, TaskDequeElement<Element>>
var buffer: Buffer

class func make() -> Self {
init() {
precondition(
Constants.capacity > 3 && Constants.capacity <= 65536,
"capacity must be between [4, 65536].")
precondition(
Constants.capacity & (Constants.capacity - 1) == 0,
"capacity must be a power of two for fast masking.")
let deque = Self.create(minimumCapacity: Constants.capacity) { _ in TaskDequeHeader() } as! Self
deque.withUnsafeMutablePointerToElements { elems in
elems.initialize(repeating: TaskDequeElement(), count: Constants.capacity)
"capacity must be a power of two for fast masking.")
buffer = Buffer(capacity: Constants.capacity) { elements in
elements.baseAddress!.initialize(repeating: TaskDequeElement(), count: elements.count)
return TaskDequeHeader()
}
return deque
}
private var header: TaskDequeHeader<Environment> {
_read { yield buffer.header }
nonmutating _modify { yield &buffer.header }
}

deinit {
mutating func deallocate() {
assert(
TaskDequeIndex(header.front.valueRelaxed).index
== TaskDequeIndex(header.back.valueRelaxed).index,
"TaskDeque not empty; \(self)")
"TaskDeque not empty; \(self)")
buffer.deallocate { elements, _ in
elements.baseAddress!.deinitialize(count: elements.count)
}
}

/// Add a new element to the front of the queue.
Expand All @@ -52,42 +57,38 @@ internal class TaskDeque<Element, Environment: ConcurrencyPlatform>: ManagedBuff
/// - Returns: an `Element` if the queue is full; it is up to the caller to appropriately execute
/// the returned element.
func pushFront(_ elem: Element) -> Element? {
withUnsafeMutablePointerToElements { elems in
let front = TaskDequeIndex(header.front.valueRelaxed)
var state = elems[front.index].state.valueRelaxed
if TaskState(rawValue: state) != .empty
|| !elems[front.index].state.cmpxchgStrongAcquire(
original: &state, newValue: TaskState.busy.rawValue)
{
return elem
}
header.front.setRelaxed(front.movedForward().underlying)
elems[front.index].element = elem
elems[front.index].state.setRelease(TaskState.ready.rawValue)
return nil
let front = TaskDequeIndex(header.front.valueRelaxed)
var state = buffer[front.index].state.valueRelaxed
if TaskState(rawValue: state) != .empty
|| !buffer[front.index].state.cmpxchgStrongAcquire(
original: &state, newValue: TaskState.busy.rawValue)
{
return elem
}
header.front.setRelaxed(front.movedForward().underlying)
buffer[front.index].element = elem
buffer[front.index].state.setRelease(TaskState.ready.rawValue)
return nil
}

/// Removes one element from the front of the queue.
///
/// - Invariant: this function must only ever be called by the "owner" thread.
/// - Returns: an `Element` if the queue is non-empty.
func popFront() -> Element? {
withUnsafeMutablePointerToElements { elems in
let front = TaskDequeIndex(header.front.valueRelaxed).movedBackward()
var state = elems[front.index].state.valueRelaxed
if TaskState(rawValue: state) != .ready
|| !elems[front.index].state.cmpxchgStrongAcquire(
original: &state, newValue: TaskState.busy.rawValue)
{
return nil
}
var elem: Element? = nil
swap(&elems[front.index].element, &elem)
elems[front.index].state.setRelease(TaskState.empty.rawValue)
header.front.setRelaxed(front.underlying)
return elem
let front = TaskDequeIndex(header.front.valueRelaxed).movedBackward()
var state = buffer[front.index].state.valueRelaxed
if TaskState(rawValue: state) != .ready
|| !buffer[front.index].state.cmpxchgStrongAcquire(
original: &state, newValue: TaskState.busy.rawValue)
{
return nil
}
var elem: Element? = nil
swap(&buffer[front.index].element, &elem)
buffer[front.index].state.setRelease(TaskState.empty.rawValue)
header.front.setRelaxed(front.underlying)
return elem
}

/// Add a new element to the back of the queue.
Expand All @@ -96,23 +97,21 @@ internal class TaskDeque<Element, Environment: ConcurrencyPlatform>: ManagedBuff
/// - Returns: an `Element` if the queue is full; it is up to the caller to appropriately execute
/// the returned element.
func pushBack(_ elem: Element) -> Element? {
withUnsafeMutablePointerToElements { elems in
header.lock.lock()
defer { header.lock.unlock() }
header.lock.lock()
defer { header.lock.unlock() }

let back = TaskDequeIndex(header.back.valueRelaxed).movedBackward()
var state = elems[back.index].state.valueRelaxed
if TaskState(rawValue: state) != .empty
|| !elems[back.index].state.cmpxchgStrongAcquire(
original: &state, newValue: TaskState.busy.rawValue)
{
return elem
}
header.back.setRelaxed(back.underlying)
elems[back.index].element = elem
elems[back.index].state.setRelease(TaskState.ready.rawValue)
return nil
let back = TaskDequeIndex(header.back.valueRelaxed).movedBackward()
var state = buffer[back.index].state.valueRelaxed
if TaskState(rawValue: state) != .empty
|| !buffer[back.index].state.cmpxchgStrongAcquire(
original: &state, newValue: TaskState.busy.rawValue)
{
return elem
}
header.back.setRelaxed(back.underlying)
buffer[back.index].element = elem
buffer[back.index].state.setRelease(TaskState.ready.rawValue)
return nil
}

/// Removes one element from the back of the queue.
Expand All @@ -122,24 +121,22 @@ internal class TaskDeque<Element, Environment: ConcurrencyPlatform>: ManagedBuff
func popBack() -> Element? {
if isEmpty { return nil } // Fast-path to avoid taking lock.

return withUnsafeMutablePointerToElements { elems in
header.lock.lock()
defer { header.lock.unlock() }
header.lock.lock()
defer { header.lock.unlock() }

let back = TaskDequeIndex(header.back.valueRelaxed)
var state = elems[back.index].state.valueRelaxed
if TaskState(rawValue: state) != .ready
|| !elems[back.index].state.cmpxchgStrongAcquire(
original: &state, newValue: TaskState.busy.rawValue)
{
return nil
}
var elem: Element? = nil
swap(&elems[back.index].element, &elem)
elems[back.index].state.setRelease(TaskState.empty.rawValue)
header.back.setRelaxed(back.movedForward().underlying)
return elem
let back = TaskDequeIndex(header.back.valueRelaxed)
var state = buffer[back.index].state.valueRelaxed
if TaskState(rawValue: state) != .ready
|| !buffer[back.index].state.cmpxchgStrongAcquire(
original: &state, newValue: TaskState.busy.rawValue)
{
return nil
}
var elem: Element? = nil
swap(&buffer[back.index].element, &elem)
buffer[back.index].state.setRelease(TaskState.empty.rawValue)
header.back.setRelaxed(back.movedForward().underlying)
return elem
}

/// False iff the queue contains at least one entry.
Expand Down
123 changes: 123 additions & 0 deletions Sources/PenguinStructures/UnmanagedBuffer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright 2020 Penguin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// A dynamically-sized, contiguous array of `Element`s prefixed by `Header`.
///
/// `UnmanagedBuffer` is similar to the standard library's `ManagedBuffer`, with the key difference
/// in that `UnmanagedBuffer` does not induce reference counting overheads. `UnmanagedBuffer`
/// encapsulates a pointer onto the heap that is allocated during `init`. It is the user's
/// responsibility to call `deallocate` exactly once.
///
/// Although `header` is initialized at `UnmanagedBuffer` initialization, `UnamanagedBuffer` (like
/// `ManagedBuffer` supports partial initialization of the `Element`s array.
public struct UnmanagedBuffer<Header, Element> {
/// The pointer to the heap memory.
private let basePointer: UnsafeMutableRawPointer

/// The offset from the `basePointer` to the start of the contiguous array of `Element`s.
// Note: a previous implementation dynamically computed the `elementOffsetInBytes`, but in certain
// workloads, ~25% of the CPU time was spent in `swift_getGenericMetadata`!! Since we can't
// guarantee full specialization (where this turns into a compile-time constant), we instead
// materialize it explicitly.
private let elementOffsetInBytes: UInt32

/// The count of `Element`s capable of being stored in the trailing dynamically-sized buffer.
// Note: we limit the capacity of `UnmanagedBuffer` to be < UInt32.max to stuff the
// UnmanagedBuffer struct itself into 2 words for performance. Future work could consider removing
// this field, and bit-stuffing `elementOffsetInBytes` into the unused bits of `basePointer`,
// making `UnmanagedBuffer` take only a single machine word.
//
// We set elementCapacity to 0 when deallocated as a sentinal.
private var elementCapacity: UInt32

// NOTE FOR REVIEWER: Should this be a static function `.allocate()` instead?
/// Allocates an `UnmanagedBuffer` to store up to `capacity` `Element`s, initializing
/// memory with `initializer`.
public init(capacity: Int, initializer: (UnsafeMutableBufferPointer<Element>) -> Header) {
precondition(
capacity < UInt32.max,
"Cannot allocate an UnmanagedBuffer with capacity: \(capacity).")
precondition(capacity > 0, "capacity must be > 0!")
precondition(MemoryLayout<Header>.size < UInt32.max)

let layout = Self.offsetsAndAlignments(capacity: capacity)

basePointer = UnsafeMutableRawPointer.allocate(
byteCount: layout.totalBytes,
alignment: layout.bufferAlignment)
elementOffsetInBytes = UInt32(layout.elementsOffset)
elementCapacity = UInt32(capacity)

basePointer.bindMemory(to: Header.self, capacity: 1)
(basePointer + Int(elementOffsetInBytes)).bindMemory(to: Element.self, capacity: capacity)
// (Partially) initialize memory.
headerPointer.initialize(to: initializer(elementsPointer))
}

// Note: you must deinitialize yourself first if required (using headerPointer,
// and elementsPointer).
public mutating func deallocate(deinitalizingElements: (UnsafeMutableBufferPointer<Element>, Header) -> Void) {
deinitalizingElements(elementsPointer, header)
headerPointer.deinitialize(count: 1)
basePointer.deallocate()
elementCapacity = 0 // Set sentinal.
}

/// A pointer to the header.
public var headerPointer: UnsafeMutablePointer<Header> {
assert(elementCapacity > 0, "Attempting to access a deallocate'd UnmanagedBuffer.")
return basePointer.assumingMemoryBound(to: Header.self)
}

/// A pointer to the collection of elements that comprise the body.
public var elementsPointer: UnsafeMutableBufferPointer<Element> {
assert(elementCapacity > 0, "Attempting to access a deallocate'd UnmanagedBuffer.")
let start = (basePointer + Int(elementOffsetInBytes)).assumingMemoryBound(to: Element.self)
return UnsafeMutableBufferPointer(start: start, count: Int(elementCapacity))
}

/// The header of the `UnmanagedBuffer`.
public var header: Header {
_read { yield headerPointer.pointee }
nonmutating _modify { yield &headerPointer.pointee }
}

/// Access elements in `self`.
///
/// Important note: indexing must only be performed when the buffer's underlying memory has been
/// initialized!
public subscript(index: Int) -> Element {
_read {
precondition(index < capacity, "Index out of bounds (\(index) >= \(capacity)).")
yield elementsPointer[index]
}
nonmutating _modify {
precondition(index < capacity, "Index out of bounds (\(index) >= \(capacity)).")
yield &elementsPointer[index]
}
}

/// The maximum number of `Element`s that can be stored in `self`.
public var capacity: Int { Int(elementCapacity) }

internal static func offsetsAndAlignments(capacity: Int) -> (totalBytes: Int, elementsOffset: Int, bufferAlignment: Int) {
// Compute the next alignment offset after MemoryLayout<Header>.size bytes. (Leverages int
// division does truncation, and that MemoryLayout<ZERO_SIZED_TYPE>.alignment == 1!)
let offsetSteps = (MemoryLayout<Header>.size + MemoryLayout<Element>.alignment - 1) / MemoryLayout<Element>.alignment
let offset = offsetSteps * MemoryLayout<Element>.alignment
let totalBytes = offset + (capacity * MemoryLayout<Element>.stride)
let alignment = max(MemoryLayout<Element>.alignment, MemoryLayout<Header>.alignment)
return (totalBytes, offset, alignment)
}
}
6 changes: 5 additions & 1 deletion Tests/PenguinParallelTests/TaskDequeTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import XCTest

final class TaskDequeTests: XCTestCase {
func testSimplePushAndPop() {
let deque = TaskDeque<Int, PosixConcurrencyPlatform>.make()
let deque = TaskDeque<Int, PosixConcurrencyPlatform>()
defer {
var dequeCopy = deque // Make a copy in order to deallocate the backing pointer.
dequeCopy.deallocate()
}

XCTAssert(deque.isEmpty)
XCTAssertNil(deque.pushFront(1))
Expand Down
Loading