diff --git a/Sources/StateGraph/Observation/GraphTracking.swift b/Sources/StateGraph/Observation/GraphTracking.swift new file mode 100644 index 0000000..163cb59 --- /dev/null +++ b/Sources/StateGraph/Observation/GraphTracking.swift @@ -0,0 +1,308 @@ +import os + +/// An asynchronous sequence that tracks changes to StateGraph nodes. +/// +/// `GraphTrackings` provides an AsyncSequence-based API for observing changes to multiple nodes, +/// similar to `withGraphTrackingMap` but using Swift's async/await concurrency model. +/// +/// ## Usage +/// ```swift +/// let firstName = Stored(wrappedValue: "John") +/// let lastName = Stored(wrappedValue: "Doe") +/// +/// for await fullName in GraphTrackings({ +/// "\(firstName.wrappedValue) \(lastName.wrappedValue)" +/// }) { +/// print("Full name: \(fullName)") +/// } +/// ``` +/// +/// ## Features +/// - Emits the current value immediately on first iteration (startWith behavior) +/// - Dynamically tracks nodes accessed during each emission +/// - Supports task cancellation +/// - Thread-safe using OSAllocatedUnfairLock +/// - Respects actor isolation context +/// +/// This implementation is inspired by Apple's Observations pattern. +/// Reference: https://github.com/swiftlang/swift/blob/main/stdlib/public/Observation/Sources/Observation/Observations.swift +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) +public struct GraphTrackings: AsyncSequence, + Sendable +{ + + public typealias AsyncIterator = Iterator + + public enum Iteration: Sendable { + case next(Element) + case finish + } + + enum Emit: Sendable { + case iteration(@isolated(any) @Sendable () throws(Failure) -> Iteration) + case element(@isolated(any) @Sendable () throws(Failure) -> Element) + + var isolation: (any Actor)? { + switch self { + case .iteration(let closure): + return closure.isolation + case .element(let closure): + return closure.isolation + } + } + } + + private let emit: Emit + + public init( + _ emit: @escaping @isolated(any) @Sendable () throws(Failure) -> Element + ) { + self.emit = .element(emit) + } + + /// Creates a GraphTrackings that iterates until the closure returns `.finish`. + /// + /// Use this when you need programmatic control over when the sequence terminates. + /// + /// ## Usage + /// ```swift + /// var count = 0 + /// for await value in GraphTrackings.untilFinished({ + /// count += 1 + /// if count > 5 { + /// return .finish + /// } + /// return .next(someNode.wrappedValue) + /// }) { + /// print(value) + /// } + /// ``` + public static func untilFinished( + @_inheritActorContext _ emit: + @escaping @isolated(any) @Sendable () throws(Failure) -> Iteration + ) -> GraphTrackings { + .init(emit: .iteration(emit)) + } + + private init(emit: Emit) { + self.emit = emit + } + + public func makeAsyncIterator() -> Iterator { + Iterator(emit: emit) + } + + // State matches Apple's _ManagedCriticalState pattern + struct State { + enum Continuation { + case cancelled + case active(UnsafeContinuation) + + func resume() { + switch self { + case .cancelled: + break + case .active(let continuation): + continuation.resume() + } + } + } + + var id: Int = 0 + var continuations: [Int: Continuation] = [:] + var dirty: Bool = false + + // create a generation id for the unique identification of the continuations + // this allows the shared awaiting of the willSets. + // Most likely, there wont be more than a handful of active iterations + // so this only needs to be unique for those active iterations + // that are in the process of calling next. + static func generation(_ state: OSAllocatedUnfairLock) -> Int { + state.withLock { state in + defer { state.id &+= 1 } + return state.id + } + } + + // the cancellation of awaiting on willSet only ferries in resuming early + // it is the responsability of the caller to check if the task is actually + // cancelled after awaiting the willSet to act accordingly. + static func cancel(_ state: OSAllocatedUnfairLock, id: Int) { + state.withLock { state in + guard let continuation = state.continuations.removeValue(forKey: id) + else { + // if there was no continuation yet active (e.g. it was cancelled at + // the start of the invocation, then put a tombstone in to gate that + // resuming later + state.continuations[id] = .cancelled + return nil as Continuation? + } + return continuation + }?.resume() + } + + // fire off ALL awaiting willChange continuations such that they are no + // longer pending. + static func emitWillChange(_ state: OSAllocatedUnfairLock) { + let continuations = state.withLock { state in + // if there are no continuations present then we have to set the state as dirty + // else if this is uncondiitonally set the state might produce duplicate events + // one for the dirty and one for the continuation. + if state.continuations.count == 0 { + state.dirty = true + } + defer { + state.continuations.removeAll() + } + return state.continuations.values + } + for continuation in continuations { + continuation.resume() + } + } + + // install a willChange continuation into the set of continuations + // this must take a locally unique id (to the active calls of next) + static func willChange( + isolation iterationIsolation: isolated (any Actor)? = #isolation, + state: OSAllocatedUnfairLock, + id: Int + ) async { + return await withUnsafeContinuation(isolation: iterationIsolation) { + continuation in + state.withLock { state in + defer { state.dirty = false } + switch state.continuations[id] { + case .cancelled: + return continuation as UnsafeContinuation? + case .active: + // the Iterator itself cannot be shared across isolations so any call to next that may share an id is a misbehavior + // or an internal book-keeping failure + fatalError("Iterator incorrectly shared across task isolations") + case .none: + if state.dirty { + return continuation + } else { + state.continuations[id] = .active(continuation) + return nil + } + } + }?.resume() + } + } + } + + public struct Iterator: AsyncIteratorProtocol { + // OSAllocatedUnfairLock pattern (Apple's _ManagedCriticalState equivalent) + private var state: OSAllocatedUnfairLock? + private let emit: Emit + private var started = false + + init(emit: Emit) { + self.emit = emit + self.state = OSAllocatedUnfairLock(initialState: State()) + } + + fileprivate mutating func terminate( + throwing failure: Failure? = nil, + id: Int + ) throws(Failure) -> Element? { + // this is purely defensive to any leaking out of iteration generation ids + state?.withLock { state in + state.continuations.removeValue(forKey: id) + }?.resume() + // flag the sequence as terminal by nil'ing out the state + state = nil + if let failure { + throw failure + } else { + return nil + } + } + + // this is the primary implementation of the tracking + // it is bound to be called on the specified isolation of the construction + fileprivate static func trackEmission( + isolation trackingIsolation: isolated (any Actor)?, + state: OSAllocatedUnfairLock, + emit: Emit + ) throws(Failure) -> Iteration { + // this ferries in an intermediate form with Result to skip over `withObservationTracking` not handling errors being thrown + // particularly this case is that the error is also an iteration state transition data point (it terminates the sequence) + // so we need to hold that to get a chance to catch and clean-up + let result = withStateGraphTracking { + switch emit { + case .element(let element): + Result(catching: element).map { Iteration.next($0) } + case .iteration(let iteration): + Result(catching: iteration) + } + } didChange: { [state] in + // resume all cases where the awaiting continuations are awaiting a willSet + State.emitWillChange(state) + } + return try result.get() + } + + fileprivate mutating func trackEmission( + isolation iterationIsolation: isolated (any Actor)?, + state: OSAllocatedUnfairLock, + id: Int + ) async throws(Failure) -> Element? { + guard !Task.isCancelled else { + // the task was cancelled while awaiting a willChange so ensure a proper termination + return try terminate(id: id) + } + // start by directly tracking the emission via a withObservation tracking on the isolation specified from the init + switch try await Iterator.trackEmission( + isolation: emit.isolation, + state: state, + emit: emit + ) { + case .finish: return try terminate(id: id) + case .next(let element): return element + } + } + + public mutating func next( + isolation actor: isolated (any Actor)? + ) async throws(Failure) -> Element? { + + guard let state else { return nil } + + // Get a unique generation ID for this wait + let id = State.generation(state) + + do { + // First call: emit immediately (startWith behavior) + if started == false { + started = true + return try await trackEmission(isolation: actor, state: state, id: id) + } else { + // wait for the willChange (and NOT the value itself) + // since this is going to be on the isolation of the object (e.g. the isolation specified in the initialization) + // this will mean our next await for the emission will ensure the suspension return of the willChange context + // back to the trailing edges of the mutations. In short, this enables the transactionality bounded by the + // isolation of the mutation. + await withTaskCancellationHandler( + operation: { + await State.willChange(isolation: actor, state: state, id: id) + }, + onCancel: { + // ensure to clean out our continuation uon cancellation + State.cancel(state, id: id) + }, + isolation: actor + ) + return try await trackEmission(isolation: actor, state: state, id: id) + } + + } catch { + // the user threw a failure in the closure so propigate that outwards and terminate the sequence + return try terminate(throwing: error, id: id) + } + + } + + } +} diff --git a/Sources/StateGraph/Observation/withTracking.swift b/Sources/StateGraph/Observation/withTracking.swift index 76ce0a3..9cec6b7 100644 --- a/Sources/StateGraph/Observation/withTracking.swift +++ b/Sources/StateGraph/Observation/withTracking.swift @@ -61,6 +61,73 @@ func withContinuousStateGraphTracking( } } +/// Creates an `AsyncStream` that emits projected values whenever tracked StateGraph nodes change. +/// +/// This function provides a convenient way to observe StateGraph node changes using Swift's +/// async/await concurrency model. The stream emits the current value immediately upon iteration, +/// then emits new values whenever any accessed node changes. +/// +/// ## Basic Usage +/// ```swift +/// let counter = Stored(wrappedValue: 0) +/// +/// for await value in withStateGraphTrackingStream(apply: { +/// counter.wrappedValue +/// }) { +/// print("Counter: \(value)") +/// } +/// ``` +/// +/// ## Important: Single-Consumer Stream +/// +/// This function returns an `AsyncStream`, which is **single-consumer**. +/// When multiple iterators consume the same stream, values are distributed between them +/// in a racing manner rather than being duplicated to each iterator. +/// +/// ```swift +/// let stream = withStateGraphTrackingStream { model.counter } +/// +/// // ⚠️ Values are NOT duplicated - they compete for values +/// let taskA = Task { for await v in stream { print("A: \(v)") } } +/// let taskB = Task { for await v in stream { print("B: \(v)") } } +/// // Output might be: A: 0, B: 1, A: 2 (racing behavior) +/// ``` +/// +/// ## Multi-Consumer Alternative +/// +/// If you need multiple independent consumers that each receive all values, +/// use ``GraphTrackings`` instead (available on iOS 18+): +/// +/// ```swift +/// // Each iterator gets its own independent stream of all values +/// let trackings = GraphTrackings { model.counter } +/// +/// let taskA = Task { for await v in trackings { print("A: \(v)") } } +/// let taskB = Task { for await v in trackings { print("B: \(v)") } } +/// // Output: A: 0, B: 0, A: 1, B: 1, A: 2, B: 2 (both receive all values) +/// ``` +/// +/// ## Comparison Table +/// +/// | Feature | `withStateGraphTrackingStream` | `GraphTrackings` | +/// |---------|-------------------------------|------------------| +/// | Return Type | `AsyncStream` | `AsyncSequence` | +/// | Consumer Model | Single-consumer | Multi-consumer | +/// | Value Distribution | Racing (values split) | Duplicated (all receive) | +/// | iOS Availability | iOS 13+ | iOS 18+ | +/// +/// - Parameters: +/// - apply: A closure that accesses StateGraph nodes and returns a projected value. +/// This closure is called initially and whenever tracked nodes change. +/// - isolation: The actor isolation context for the tracking. Defaults to the caller's isolation. +/// +/// - Returns: An `AsyncStream` that emits the projected value from `apply` whenever tracked nodes change. +/// +/// - Note: The stream automatically handles cancellation. When the consuming task is cancelled, +/// the internal tracking stops. +/// +/// - SeeAlso: ``GraphTrackings`` for multi-consumer scenarios +/// - SeeAlso: ``withContinuousStateGraphTracking(_:didChange:isolation:)`` for callback-based tracking public func withStateGraphTrackingStream( apply: @escaping () -> T, isolation: isolated (any Actor)? = #isolation diff --git a/Tests/StateGraphTests/GraphTrackingsTests.swift b/Tests/StateGraphTests/GraphTrackingsTests.swift new file mode 100644 index 0000000..470ab23 --- /dev/null +++ b/Tests/StateGraphTests/GraphTrackingsTests.swift @@ -0,0 +1,237 @@ +import Foundation +import Testing + +@testable import StateGraph + +@Suite("GraphTrackings Tests") +struct GraphTrackingsTests { + + final class ValueCollector: @unchecked Sendable { + private let lock = NSLock() + private var _values: [T] = [] + + var values: [T] { + lock.lock() + defer { lock.unlock() } + return _values + } + + var count: Int { + lock.lock() + defer { lock.unlock() } + return _values.count + } + + func append(_ value: T) { + lock.lock() + defer { lock.unlock() } + _values.append(value) + } + } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + @Test + func basicAsyncSequence() async throws { + let firstName = Stored(wrappedValue: "John") + let lastName = Stored(wrappedValue: "Doe") + let collector = ValueCollector() + + let task = Task { + for await fullName in GraphTrackings({ + "\(firstName.wrappedValue) \(lastName.wrappedValue)" + }) { + collector.append(fullName) + if collector.count >= 3 { + break + } + } + } + + try await Task.sleep(nanoseconds: 50_000_000) + + firstName.wrappedValue = "Jane" + try await Task.sleep(nanoseconds: 100_000_000) + + lastName.wrappedValue = "Smith" + try await Task.sleep(nanoseconds: 100_000_000) + + try await task.value + + #expect(collector.values == ["John Doe", "Jane Doe", "Jane Smith"]) + } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + @Test + func startWithBehavior() async throws { + let value = Stored(wrappedValue: 42) + let collector = ValueCollector() + + let task = Task { + for await v in GraphTrackings({ value.wrappedValue }) { + collector.append(v) + break + } + } + + try await task.value + + #expect(collector.values == [42]) + } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + @Test + func dynamicTracking() async throws { + let useA = Stored(wrappedValue: true) + let valueA = Stored(wrappedValue: "A") + let valueB = Stored(wrappedValue: "B") + let collector = ValueCollector() + + let task = Task { + for await v in GraphTrackings({ + useA.wrappedValue ? valueA.wrappedValue : valueB.wrappedValue + }) { + collector.append(v) + if collector.count >= 3 { + break + } + } + } + + try await Task.sleep(nanoseconds: 50_000_000) + + // Change valueB (not tracked when useA is true) + valueB.wrappedValue = "B2" + try await Task.sleep(nanoseconds: 100_000_000) + + // Switch to B + useA.wrappedValue = false + try await Task.sleep(nanoseconds: 100_000_000) + + // Now valueB is tracked + valueB.wrappedValue = "B3" + try await Task.sleep(nanoseconds: 100_000_000) + + try await task.value + + #expect(collector.values == ["A", "B2", "B3"]) + } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + @Test + func taskCancellation() async throws { + let value = Stored(wrappedValue: 0) + let collector = ValueCollector() + + let task = Task { + for await v in GraphTrackings({ value.wrappedValue }) { + collector.append(v) + } + } + + try await Task.sleep(nanoseconds: 50_000_000) + + value.wrappedValue = 1 + try await Task.sleep(nanoseconds: 100_000_000) + + task.cancel() + try await Task.sleep(nanoseconds: 50_000_000) + + value.wrappedValue = 2 + try await Task.sleep(nanoseconds: 100_000_000) + + #expect(collector.count <= 2) + } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + @Test + func untilFinished() async throws { + let iterationCounter = Stored(wrappedValue: 0) + let value = Stored(wrappedValue: 0) + let collector = ValueCollector() + + let task = Task { + for await v in GraphTrackings.untilFinished({ + iterationCounter.wrappedValue += 1 + if iterationCounter.wrappedValue > 3 { + return .finish + } + return .next(value.wrappedValue) + }) { + collector.append(v) + } + } + + try await Task.sleep(nanoseconds: 50_000_000) + + value.wrappedValue = 10 + try await Task.sleep(nanoseconds: 100_000_000) + + value.wrappedValue = 20 + try await Task.sleep(nanoseconds: 100_000_000) + + value.wrappedValue = 30 + try await Task.sleep(nanoseconds: 100_000_000) + + try await task.value + + #expect(collector.values == [0, 10, 20]) + } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + @Test + func multipleNodes() async throws { + let x = Stored(wrappedValue: 1) + let y = Stored(wrappedValue: 2) + let z = Stored(wrappedValue: 3) + let collector = ValueCollector() + + let task = Task { + for await sum in GraphTrackings({ x.wrappedValue + y.wrappedValue }) { + collector.append(sum) + if collector.count >= 3 { + break + } + } + } + + try await Task.sleep(nanoseconds: 50_000_000) + + x.wrappedValue = 5 + try await Task.sleep(nanoseconds: 100_000_000) + + // Change z (not tracked) + z.wrappedValue = 100 + try await Task.sleep(nanoseconds: 50_000_000) + + y.wrappedValue = 10 + try await Task.sleep(nanoseconds: 100_000_000) + + try await task.value + + #expect(collector.values == [3, 7, 15]) + } + + +} + +import Observation + +@available(macOS 26, iOS 26, tvOS 26, watchOS 26, *) +struct Syntax { + + + func basic() { + + let s = Observations { + + } + + Task { + for await e in s { + + } + } + + } + +} diff --git a/Tests/StateGraphTests/Tests.swift b/Tests/StateGraphTests/Tests.swift index aa13d9a..fb63ff4 100644 --- a/Tests/StateGraphTests/Tests.swift +++ b/Tests/StateGraphTests/Tests.swift @@ -545,7 +545,7 @@ struct StreamTests { _ = model.counter }) - Task.detached { + await Task.detached { for await _ in stream { print(model.counter) #expect(model.counter == expectation.withLock { $0 }) @@ -555,6 +555,7 @@ struct StreamTests { } } } + .value } @@ -580,4 +581,79 @@ struct StreamTests { } + /// Test that AsyncStream from withStateGraphTrackingStream is single-consumer. + /// Only the first iterator receives values, the second iterator gets nothing. + @Test func single_consumer_stream() async { + let model = NestedModel() + + let receivedByA = OSAllocatedUnfairLock<[Int]>(initialState: []) + let receivedByB = OSAllocatedUnfairLock<[Int]>(initialState: []) + + // Create a single stream + let stream = withStateGraphTrackingStream(apply: { + model.counter + }) + + // Iterator A - will receive values + let taskA = Task { + for await value in stream { + receivedByA.withLock { $0.append(value) } + if value == 2 { + break + } + } + } + + // Iterator B - trying to consume the same stream + let taskB = Task { + for await value in stream { + receivedByB.withLock { $0.append(value) } + if value == 2 { + break + } + } + } + + try! await Task.sleep(for: .milliseconds(100)) + + model.counter = 1 + try! await Task.sleep(for: .milliseconds(100)) + + model.counter = 2 + try! await Task.sleep(for: .milliseconds(100)) + + // Cancel tasks after a short wait to prevent hanging + // (one iterator will never receive value 2 because AsyncStream is single-consumer) + taskA.cancel() + taskB.cancel() + + // Wait a bit for cancellation to propagate + try! await Task.sleep(for: .milliseconds(50)) + + let valuesA = receivedByA.withLock { $0 } + let valuesB = receivedByB.withLock { $0 } + + print("Iterator A received: \(valuesA)") + print("Iterator B received: \(valuesB)") + + // AsyncStream is single-consumer: values are NOT duplicated + // Multiple iterators compete for values (racing behavior) + #expect(valuesA.count > 0 || valuesB.count > 0, "At least one iterator should receive values") + + // Verify single-consumer behavior: + // 1. Values are NOT duplicated (each value goes to exactly one iterator) + // 2. Together, both iterators receive all values + let allReceivedValues = Set(valuesA + valuesB) + let expectedValues = Set([0, 1, 2]) + #expect(allReceivedValues == expectedValues, "Together, iterators should receive all values: \(allReceivedValues)") + + // Verify NO duplication - if values were duplicated, combined count would be > 3 + let combinedCount = valuesA.count + valuesB.count + #expect(combinedCount == 3, "Each value should be delivered exactly once (no duplication): combined=\(combinedCount)") + + // This demonstrates that AsyncStream is single-consumer: + // - Values are NOT duplicated between iterators (unlike GraphTrackings) + // - Multiple iterators compete for values in a racing manner + } + }