diff --git a/NOTICES.txt b/NOTICES.txt index 11221c3e0..61ce63d4e 100644 --- a/NOTICES.txt +++ b/NOTICES.txt @@ -17,20 +17,6 @@ under the License. ------------------------------------------------------------------------------- -This product uses scripts derived from SwiftNIO's integration testing -framework: 'test_01_allocation_counts.sh', 'run-nio-alloc-counter-tests.sh' and -'test_functions.sh'. - -It also uses derivations of SwiftNIO's lock 'NIOLock.swift' and locked value box -'NIOLockedValueBox.swift'. - - * LICENSE (Apache License 2.0): - * https://github.com/apple/swift-nio/blob/main/LICENSE.txt - * HOMEPAGE: - * https://github.com/apple/swift-nio - ---- - This product uses derivations of SwiftNIOHTTP2's implementation of case insensitive comparison of strings, found in 'HPACKHeader.swift'. diff --git a/Sources/GRPCCore/Internal/Concurrency Primitives/Lock.swift b/Sources/GRPCCore/Internal/Concurrency Primitives/Lock.swift deleted file mode 100644 index 2867e9982..000000000 --- a/Sources/GRPCCore/Internal/Concurrency Primitives/Lock.swift +++ /dev/null @@ -1,291 +0,0 @@ -/* - * Copyright 2023, gRPC Authors All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -#if canImport(Darwin) -public import Darwin // should be @usableFromInline -#elseif canImport(Glibc) -public import Glibc // should be @usableFromInline -#endif - -@usableFromInline -typealias LockPrimitive = pthread_mutex_t - -@usableFromInline -enum LockOperations {} - -extension LockOperations { - @inlinable - static func create(_ mutex: UnsafeMutablePointer) { - mutex.assertValidAlignment() - - var attr = pthread_mutexattr_t() - pthread_mutexattr_init(&attr) - - let err = pthread_mutex_init(mutex, &attr) - precondition(err == 0, "\(#function) failed in pthread_mutex with error \(err)") - } - - @inlinable - static func destroy(_ mutex: UnsafeMutablePointer) { - mutex.assertValidAlignment() - - let err = pthread_mutex_destroy(mutex) - precondition(err == 0, "\(#function) failed in pthread_mutex with error \(err)") - } - - @inlinable - static func lock(_ mutex: UnsafeMutablePointer) { - mutex.assertValidAlignment() - - let err = pthread_mutex_lock(mutex) - precondition(err == 0, "\(#function) failed in pthread_mutex with error \(err)") - } - - @inlinable - static func unlock(_ mutex: UnsafeMutablePointer) { - mutex.assertValidAlignment() - - let err = pthread_mutex_unlock(mutex) - precondition(err == 0, "\(#function) failed in pthread_mutex with error \(err)") - } -} - -// Tail allocate both the mutex and a generic value using ManagedBuffer. -// Both the header pointer and the elements pointer are stable for -// the class's entire lifetime. -// -// However, for safety reasons, we elect to place the lock in the "elements" -// section of the buffer instead of the head. The reasoning here is subtle, -// so buckle in. -// -// _As a practical matter_, the implementation of ManagedBuffer ensures that -// the pointer to the header is stable across the lifetime of the class, and so -// each time you call `withUnsafeMutablePointers` or `withUnsafeMutablePointerToHeader` -// the value of the header pointer will be the same. This is because ManagedBuffer uses -// `Builtin.addressOf` to load the value of the header, and that does ~magic~ to ensure -// that it does not invoke any weird Swift accessors that might copy the value. -// -// _However_, the header is also available via the `.header` field on the ManagedBuffer. -// This presents a problem! The reason there's an issue is that `Builtin.addressOf` and friends -// do not interact with Swift's exclusivity model. That is, the various `with` functions do not -// conceptually trigger a mutating access to `.header`. For elements this isn't a concern because -// there's literally no other way to perform the access, but for `.header` it's entirely possible -// to accidentally recursively read it. -// -// Our implementation is free from these issues, so we don't _really_ need to worry about it. -// However, out of an abundance of caution, we store the Value in the header, and the LockPrimitive -// in the trailing elements. We still don't use `.header`, but it's better to be safe than sorry, -// and future maintainers will be happier that we were cautious. -// -// See also: https://github.com/apple/swift/pull/40000 -@usableFromInline -final class LockStorage: ManagedBuffer { - - @inlinable - static func create(value: Value) -> Self { - let buffer = Self.create(minimumCapacity: 1) { _ in - return value - } - let storage = unsafeDowncast(buffer, to: Self.self) - - storage.withUnsafeMutablePointers { _, lockPtr in - LockOperations.create(lockPtr) - } - - return storage - } - - @inlinable - func lock() { - self.withUnsafeMutablePointerToElements { lockPtr in - LockOperations.lock(lockPtr) - } - } - - @inlinable - func unlock() { - self.withUnsafeMutablePointerToElements { lockPtr in - LockOperations.unlock(lockPtr) - } - } - - @inlinable - deinit { - self.withUnsafeMutablePointerToElements { lockPtr in - LockOperations.destroy(lockPtr) - } - } - - @inlinable - func withLockPrimitive( - _ body: (UnsafeMutablePointer) throws -> T - ) rethrows -> T { - try self.withUnsafeMutablePointerToElements { lockPtr in - return try body(lockPtr) - } - } - - @inlinable - func withLockedValue(_ mutate: (inout Value) throws -> T) rethrows -> T { - try self.withUnsafeMutablePointers { valuePtr, lockPtr in - LockOperations.lock(lockPtr) - defer { LockOperations.unlock(lockPtr) } - return try mutate(&valuePtr.pointee) - } - } -} - -extension LockStorage: @unchecked Sendable {} - -/// A threading lock based on `libpthread` instead of `libdispatch`. -/// -/// - note: ``Lock`` has reference semantics. -/// -/// This object provides a lock on top of a single `pthread_mutex_t`. This kind -/// of lock is safe to use with `libpthread`-based threading models, such as the -/// one used by NIO. On Windows, the lock is based on the substantially similar -/// `SRWLOCK` type. -@usableFromInline -struct Lock { - @usableFromInline - internal let _storage: LockStorage - - /// Create a new lock. - @inlinable - init() { - self._storage = .create(value: ()) - } - - /// Acquire the lock. - /// - /// Whenever possible, consider using `withLock` instead of this method and - /// `unlock`, to simplify lock handling. - @inlinable - func lock() { - self._storage.lock() - } - - /// Release the lock. - /// - /// Whenever possible, consider using `withLock` instead of this method and - /// `lock`, to simplify lock handling. - @inlinable - func unlock() { - self._storage.unlock() - } - - @inlinable - internal func withLockPrimitive( - _ body: (UnsafeMutablePointer) throws -> T - ) rethrows -> T { - return try self._storage.withLockPrimitive(body) - } -} - -extension Lock { - /// Acquire the lock for the duration of the given block. - /// - /// This convenience method should be preferred to `lock` and `unlock` in - /// most situations, as it ensures that the lock will be released regardless - /// of how `body` exits. - /// - /// - Parameter body: The block to execute while holding the lock. - /// - Returns: The value returned by the block. - @inlinable - func withLock(_ body: () throws -> T) rethrows -> T { - self.lock() - defer { - self.unlock() - } - return try body() - } -} - -extension Lock: Sendable {} - -extension UnsafeMutablePointer { - @inlinable - func assertValidAlignment() { - assert(UInt(bitPattern: self) % UInt(MemoryLayout.alignment) == 0) - } -} - -@usableFromInline -struct LockedValueBox { - @usableFromInline - let storage: LockStorage - - @inlinable - init(_ value: Value) { - self.storage = .create(value: value) - } - - @inlinable - func withLockedValue(_ mutate: (inout Value) throws -> T) rethrows -> T { - return try self.storage.withLockedValue(mutate) - } - - /// An unsafe view over the locked value box. - /// - /// Prefer ``withLockedValue(_:)`` where possible. - @usableFromInline - var unsafe: Unsafe { - Unsafe(storage: self.storage) - } - - @usableFromInline - struct Unsafe { - @usableFromInline - let storage: LockStorage - - /// Manually acquire the lock. - @inlinable - func lock() { - self.storage.lock() - } - - /// Manually release the lock. - @inlinable - func unlock() { - self.storage.unlock() - } - - /// Mutate the value, assuming the lock has been acquired manually. - @inlinable - func withValueAssumingLockIsAcquired( - _ mutate: (inout Value) throws -> T - ) rethrows -> T { - return try self.storage.withUnsafeMutablePointerToHeader { value in - try mutate(&value.pointee) - } - } - } -} - -extension LockedValueBox: Sendable where Value: Sendable {} diff --git a/Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift b/Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift index ac7e6c084..4e4860508 100644 --- a/Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift +++ b/Sources/GRPCCore/Streaming/Internal/BroadcastAsyncSequence.swift @@ -15,6 +15,7 @@ */ public import DequeModule // should be @usableFromInline +public import Synchronization // should be @usableFromInline /// An `AsyncSequence` which can broadcast its values to multiple consumers concurrently. /// @@ -156,15 +157,15 @@ enum BroadcastAsyncSequenceError: Error { @usableFromInline final class _BroadcastSequenceStorage: Sendable { @usableFromInline - let _state: LockedValueBox<_BroadcastSequenceStateMachine> + let _state: Mutex<_BroadcastSequenceStateMachine> @inlinable init(bufferSize: Int) { - self._state = LockedValueBox(_BroadcastSequenceStateMachine(bufferSize: bufferSize)) + self._state = Mutex(_BroadcastSequenceStateMachine(bufferSize: bufferSize)) } deinit { - let onDrop = self._state.withLockedValue { state in + let onDrop = self._state.withLock { state in state.dropResources() } @@ -184,7 +185,7 @@ final class _BroadcastSequenceStorage: Sendable { /// - Parameter element: The element to write. @inlinable func yield(_ element: Element) async throws { - let onYield = self._state.withLockedValue { state in state.yield(element) } + let onYield = self._state.withLock { state in state.yield(element) } switch onYield { case .none: @@ -196,7 +197,7 @@ final class _BroadcastSequenceStorage: Sendable { case .suspend(let token): try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { continuation in - let onProduceMore = self._state.withLockedValue { state in + let onProduceMore = self._state.withLock { state in state.waitToProduceMore(continuation: continuation, token: token) } @@ -208,7 +209,7 @@ final class _BroadcastSequenceStorage: Sendable { } } } onCancel: { - let onCancel = self._state.withLockedValue { state in + let onCancel = self._state.withLock { state in state.cancelProducer(withToken: token) } @@ -230,7 +231,7 @@ final class _BroadcastSequenceStorage: Sendable { /// - Parameter result: Whether the stream is finishing cleanly or because of an error. @inlinable func finish(_ result: Result) { - let action = self._state.withLockedValue { state in state.finish(result: result) } + let action = self._state.withLock { state in state.finish(result: result) } switch action { case .none: () @@ -247,7 +248,7 @@ final class _BroadcastSequenceStorage: Sendable { /// - Returns: Returns a unique subscription ID. @inlinable func subscribe() -> _BroadcastSequenceStateMachine.Subscriptions.ID { - self._state.withLockedValue { $0.subscribe() } + self._state.withLock { $0.subscribe() } } /// Returns the next element for the given subscriber, if it is available. @@ -259,35 +260,32 @@ final class _BroadcastSequenceStorage: Sendable { forSubscriber id: _BroadcastSequenceStateMachine.Subscriptions.ID ) async throws -> Element? { return try await withTaskCancellationHandler { - self._state.unsafe.lock() - let onNext = self._state.unsafe.withValueAssumingLockIsAcquired { + let onNext = self._state.withLock { $0.nextElement(forSubscriber: id) } switch onNext { case .return(let returnAndProduceMore): - self._state.unsafe.unlock() returnAndProduceMore.producers.resume() return try returnAndProduceMore.nextResult.get() case .suspend: return try await withCheckedThrowingContinuation { continuation in - let onSetContinuation = self._state.unsafe.withValueAssumingLockIsAcquired { state in + let onSetContinuation = self._state.withLock { state in state.setContinuation(continuation, forSubscription: id) } - self._state.unsafe.unlock() - switch onSetContinuation { - case .resume(let continuation, let result): + case .resume(let continuation, let result, let producerContinuations): continuation.resume(with: result) + producerContinuations?.resume() case .none: () } } } } onCancel: { - let onCancel = self._state.withLockedValue { state in + let onCancel = self._state.withLock { state in state.cancelSubscription(withID: id) } @@ -304,7 +302,7 @@ final class _BroadcastSequenceStorage: Sendable { /// elements. @inlinable var isKnownSafeForNextSubscriber: Bool { - self._state.withLockedValue { state in + self._state.withLock { state in state.nextSubscriptionIsValid } } @@ -312,7 +310,7 @@ final class _BroadcastSequenceStorage: Sendable { /// Invalidates all active subscriptions. @inlinable func invalidateAllSubscriptions() { - let action = self._state.withLockedValue { state in + let action = self._state.withLock { state in state.invalidateAllSubscriptions() } @@ -467,10 +465,17 @@ struct _BroadcastSequenceStateMachine: Sendable { _ continuation: ConsumerContinuation, forSubscription id: _BroadcastSequenceStateMachine.Subscriptions.ID ) -> OnSetContinuation { - if self.subscriptions.setContinuation(continuation, forSubscriber: id) { - return .none - } else { - return .resume(continuation, .failure(CancellationError())) + // 'next(id)' must be checked first: an element might've been provided between the lock + // being dropped and a continuation being created and the lock being acquired again. + switch self.next(id) { + case .return(let resultAndProducers): + return .resume(continuation, resultAndProducers.nextResult, resultAndProducers.producers) + case .suspend: + if self.subscriptions.setContinuation(continuation, forSubscriber: id) { + return .none + } else { + return .resume(continuation, .failure(CancellationError()), nil) + } } } @@ -697,10 +702,17 @@ struct _BroadcastSequenceStateMachine: Sendable { _ continuation: ConsumerContinuation, forSubscription id: _BroadcastSequenceStateMachine.Subscriptions.ID ) -> OnSetContinuation { - if self.subscriptions.setContinuation(continuation, forSubscriber: id) { - return .none - } else { - return .resume(continuation, .failure(CancellationError())) + // 'next(id)' must be checked first: an element might've been provided between the lock + // being dropped and a continuation being created and the lock being acquired again. + switch self.next(id) { + case .return(let resultAndProducers): + return .resume(continuation, resultAndProducers.nextResult, resultAndProducers.producers) + case .suspend: + if self.subscriptions.setContinuation(continuation, forSubscriber: id) { + return .none + } else { + return .resume(continuation, .failure(CancellationError()), nil) + } } } @@ -1149,7 +1161,7 @@ struct _BroadcastSequenceStateMachine: Sendable { @usableFromInline enum OnSetContinuation { case none - case resume(ConsumerContinuation, Result) + case resume(ConsumerContinuation, Result, ProducerContinuations?) } @inlinable @@ -1175,7 +1187,7 @@ struct _BroadcastSequenceStateMachine: Sendable { self._state = .streaming(state) case .finished(let state): - onSetContinuation = .resume(continuation, state.result.map { _ in nil }) + onSetContinuation = .resume(continuation, state.result.map { _ in nil }, nil) case ._modifying: // All values must have been produced, nothing to wait for.