diff --git a/Sources/.DS_Store b/Sources/.DS_Store new file mode 100644 index 0000000..d6f5935 Binary files /dev/null and b/Sources/.DS_Store differ diff --git a/Sources/FoundationExtensions/Combine/Publisher+Stream.swift b/Sources/FoundationExtensions/Combine/Publisher+Stream.swift new file mode 100644 index 0000000..3bb1247 --- /dev/null +++ b/Sources/FoundationExtensions/Combine/Publisher+Stream.swift @@ -0,0 +1,305 @@ +// Copyright © 2023 Lautsprecher Teufel GmbH. All rights reserved. + +import Combine + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension Publisher where Failure == Error { + /// Apple's `.values` implementation doesn't throw `Error`. Therefore, please use + /// `.stream` computed property over `.values`. + public var stream: AsyncThrowingStream { + return CombineAsyncStream(self).eraseToThrowingStream() + } +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension Publisher where Failure == Never { + public var stream: AsyncStream { + if #available(iOS 15.0, *) { + return values.eraseToStream() + } else { + return CombineAsyncStream(self).eraseToStream() + } + } +} + +// MARK: - Helpers +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +fileprivate class CombineAsyncStream: AsyncSequence { + typealias Element = Upstream.Output + typealias AsyncIterator = CombineAsyncStream + + private var stream: AsyncThrowingStream + private var cancellable: AnyCancellable? + private lazy var iterator = stream.makeAsyncIterator() + + fileprivate init(_ upstream: Upstream) { + stream = .init { _ in } + cancellable = nil + stream = .init { continuation in + continuation.onTermination = { [weak self] _ in + self?.cancellable?.cancel() + } + + cancellable = upstream + .handleEvents( + receiveCancel: { [weak self] in + continuation.finish(throwing: nil) + self?.cancellable = nil + } + ) + .sink(receiveCompletion: { [weak self] completion in + switch completion { + case .failure(let error): + continuation.finish(throwing: error) + case .finished: + continuation.finish(throwing: nil) + } + self?.cancellable = nil + }, receiveValue: { value in + continuation.yield(value) + }) + } } + + fileprivate func makeAsyncIterator() -> Self { + return self + } +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension CombineAsyncStream: AsyncIteratorProtocol { + fileprivate func next() async throws -> Upstream.Output? { + return try await iterator.next() + } +} + +// MARK: Below extensions copied from https://github.com/pointfreeco/swift-dependencies v0.2.0 +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncThrowingStream where Failure == Error { + /// Produces an `AsyncThrowingStream` from an `AsyncSequence` by consuming the sequence till it + /// terminates, rethrowing any failure. + /// + /// - Parameter sequence: An async sequence. + fileprivate init(_ sequence: S) where S.Element == Element { + var iterator: S.AsyncIterator? + self.init { + if iterator == nil { + iterator = sequence.makeAsyncIterator() + } + return try await iterator?.next() + } + } + + /// Constructs and returns a stream along with its backing continuation. + /// + /// This is handy for immediately escaping the continuation from an async stream, which typically + /// requires multiple steps: + /// + /// ```swift + /// var _continuation: AsyncThrowingStream.Continuation! + /// let stream = AsyncThrowingStream { continuation = $0 } + /// let continuation = _continuation! + /// + /// // vs. + /// + /// let (stream, continuation) = AsyncThrowingStream.streamWithContinuation() + /// ``` + /// + /// This tool is usually used for tests where we need to supply an async sequence to a dependency + /// endpoint and get access to its continuation so that we can emulate the dependency emitting + /// data. For example, suppose you have a dependency exposing an async sequence for listening to + /// notifications. To test this you can use `streamWithContinuation`: + /// + /// ```swift + /// func testScreenshots() { + /// let screenshots = AsyncThrowingStream.streamWithContinuation() + /// + /// let model = withDependencies { + /// $0.screenshots = { screenshots.stream } + /// } operation: { + /// FeatureModel() + /// } + /// + /// XCTAssertEqual(model.screenshotCount, 0) + /// screenshots.continuation.yield() // Simulate a screenshot being taken. + /// XCTAssertEqual(model.screenshotCount, 1) + /// } + /// ``` + /// + /// > Warning: ⚠️ `AsyncStream` does not support multiple subscribers, therefore you can only use + /// > this helper to test features that do not subscribe multiple times to the dependency + /// > endpoint. + /// + /// - Parameters: + /// - elementType: The type of element the `AsyncThrowingStream` produces. + /// - limit: A Continuation.BufferingPolicy value to set the stream’s buffering behavior. By + /// default, the stream buffers an unlimited number of elements. You can also set the policy + /// to buffer a specified number of oldest or newest elements. + /// - Returns: An `AsyncThrowingStream`. + fileprivate static func streamWithContinuation( + _ elementType: Element.Type = Element.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: Self, continuation: Continuation) { + var continuation: Continuation! + return (Self(elementType, bufferingPolicy: limit) { continuation = $0 }, continuation) + } + + /// An `AsyncThrowingStream` that never emits and never completes unless cancelled. + fileprivate static var never: Self { + Self { _ in } + } + + /// An `AsyncThrowingStream` that completes immediately. + /// + /// - Parameter error: An optional error the stream completes with. + fileprivate static func finished(throwing error: Failure? = nil) -> Self { + Self { $0.finish(throwing: error) } + } +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncSequence { + /// Erases this async sequence to an async throwing stream that produces elements till this + /// sequence terminates, rethrowing any error on failure. + fileprivate func eraseToThrowingStream() -> AsyncThrowingStream { + AsyncThrowingStream(self) + } +} + +// Below extension copied from https://github.com/pointfreeco/swift-dependencies v0.2.0 +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncStream { + /// Produces an `AsyncStream` from an `AsyncSequence` by consuming the sequence till it + /// terminates, ignoring any failure. + /// + /// Useful as a kind of type eraser for live `AsyncSequence`-based dependencies. + /// + /// For example, your feature may want to subscribe to screenshot notifications. You can model + /// this as a dependency client that returns an `AsyncStream`: + /// + /// ```swift + /// struct ScreenshotsClient { + /// var screenshots: () -> AsyncStream + /// func callAsFunction() -> AsyncStream { self.screenshots() } + /// } + /// ``` + /// + /// The "live" implementation of the dependency can supply a stream by erasing the appropriate + /// `NotificationCenter.Notifications` async sequence: + /// + /// ```swift + /// extension ScreenshotsClient { + /// static let live = Self( + /// screenshots: { + /// AsyncStream( + /// NotificationCenter.default + /// .notifications(named: UIApplication.userDidTakeScreenshotNotification) + /// .map { _ in } + /// ) + /// } + /// ) + /// } + /// ``` + /// + /// While your tests can use `AsyncStream.streamWithContinuation` to spin up a controllable stream + /// for tests: + /// + /// ```swift + /// func testScreenshots() { + /// let screenshots = AsyncStream.streamWithContinuation() + /// + /// let model = withDependencies { + /// $0.screenshots = { screenshots.stream } + /// } operation: { + /// FeatureModel() + /// } + /// + /// XCTAssertEqual(model.screenshotCount, 0) + /// screenshots.continuation.yield() // Simulate a screenshot being taken. + /// XCTAssertEqual(model.screenshotCount, 1) + /// } + /// ``` + /// + /// - Parameter sequence: An async sequence. + fileprivate init(_ sequence: S) where S.Element == Element { + var iterator: S.AsyncIterator? + self.init { + if iterator == nil { + iterator = sequence.makeAsyncIterator() + } + return try? await iterator?.next() + } + } + + /// Constructs and returns a stream along with its backing continuation. + /// + /// This is handy for immediately escaping the continuation from an async stream, which typically + /// requires multiple steps: + /// + /// ```swift + /// var _continuation: AsyncStream.Continuation! + /// let stream = AsyncStream { continuation = $0 } + /// let continuation = _continuation! + /// + /// // vs. + /// + /// let (stream, continuation) = AsyncStream.streamWithContinuation() + /// ``` + /// + /// This tool is usually used for tests where we need to supply an async sequence to a dependency + /// endpoint and get access to its continuation so that we can emulate the dependency emitting + /// data. For example, suppose you have a dependency exposing an async sequence for listening to + /// notifications. To test this you can use `streamWithContinuation`: + /// + /// ```swift + /// func testScreenshots() { + /// let screenshots = AsyncStream.streamWithContinuation() + /// + /// let model = withDependencies { + /// $0.screenshots = { screenshots.stream } + /// } operation: { + /// FeatureModel() + /// } + /// + /// XCTAssertEqual(model.screenshotCount, 0) + /// screenshots.continuation.yield() // Simulate a screenshot being taken. + /// XCTAssertEqual(model.screenshotCount, 1) + /// } + /// ``` + /// + /// > Warning: ⚠️ `AsyncStream` does not support multiple subscribers, therefore you can only use + /// > this helper to test features that do not subscribe multiple times to the dependency + /// > endpoint. + /// + /// - Parameters: + /// - elementType: The type of element the `AsyncStream` produces. + /// - limit: A Continuation.BufferingPolicy value to set the stream’s buffering behavior. By + /// default, the stream buffers an unlimited number of elements. You can also set the policy + /// to buffer a specified number of oldest or newest elements. + /// - Returns: An `AsyncStream`. + fileprivate static func streamWithContinuation( + _ elementType: Element.Type = Element.self, + bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded + ) -> (stream: Self, continuation: Continuation) { + var continuation: Continuation! + return (Self(elementType, bufferingPolicy: limit) { continuation = $0 }, continuation) + } + + /// An `AsyncStream` that never emits and never completes unless cancelled. + fileprivate static var never: Self { + Self { _ in } + } + + /// An `AsyncStream` that never emits and completes immediately. + fileprivate static var finished: Self { + Self { $0.finish() } + } +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncSequence { + /// Erases this async sequence to an async stream that produces elements till this sequence + /// terminates (or fails). + fileprivate func eraseToStream() -> AsyncStream { + AsyncStream(self) + } +} diff --git a/Sources/FoundationExtensions/Promise/Promise+Value.swift b/Sources/FoundationExtensions/Promise/Promise+Value.swift new file mode 100644 index 0000000..f25af56 --- /dev/null +++ b/Sources/FoundationExtensions/Promise/Promise+Value.swift @@ -0,0 +1,32 @@ +// Copyright © 2023 Lautsprecher Teufel GmbH. All rights reserved. + +import Combine + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension Publishers.Promise { + /// Convert a `Promise` to `async throws -> Output`. + /// + /// Usage + /// + /// Publishers.Promise(value: 1).value + public var value: Output { + get async throws { + let asyncPromise = AsyncPromise() + return try await withTaskCancellationHandler(operation: { try await asyncPromise.value(from: self) }, + onCancel: { asyncPromise.cancel }) + } + } +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +fileprivate class AsyncPromise { + private var cancellable: AnyCancellable? + fileprivate var cancel: Void { cancellable = nil } + fileprivate func value(from promise: Publishers.Promise) async throws -> Success { + try await withCheckedThrowingContinuation { [weak self] continuation in + self?.cancellable = promise + .run(onSuccess: { success in continuation.resume(returning: success) }, + onFailure: { error in continuation.resume(throwing: error) }) + } + } +} diff --git a/Tests/FoundationExtensionsTests/Combine/Combine+Stream.swift b/Tests/FoundationExtensionsTests/Combine/Combine+Stream.swift new file mode 100644 index 0000000..d64a86b --- /dev/null +++ b/Tests/FoundationExtensionsTests/Combine/Combine+Stream.swift @@ -0,0 +1,131 @@ +// Copyright © 2023 Lautsprecher Teufel GmbH. All rights reserved. + +#if !os(watchOS) && canImport(Combine) +import FoundationExtensions +import Combine +import XCTest + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +class PublishersCombineValuesTests: XCTestCase { + func test_AsyncStream_WhenCurrentValueSubjectPublishesInt1_StreamPublishesInt1() async throws { + // given + let testStore = TestStore(nil) + let result = 1 + let publisher = CurrentValueSubject(nil) + _ = Task { + for try await value in publisher.stream { + await testStore.setValue(value) + } + } + + // when + publisher.send(result) + + // then + try await then + let receivedValue = await testStore.value + XCTAssertEqual(receivedValue, result) + } + + func test_AsyncStream_WhenCurrentValueSubjectFinishes_StreamReturnsVoid() async { + // given + let publisher = CurrentValueSubject(nil) + let sut = Task { + for await _ in publisher.stream { } + } + + // when + publisher.send(completion: .finished) + + // then + let isCompletionSuccessful = await sut.result.isSuccess + XCTAssertTrue(isCompletionSuccessful) + } + + func test_AsyncThrowingStream_WhenCurrentValueSubjectPublishesInt1_StreamPublishesInt1() async throws { + // given + let testStore = TestStore(nil) + let result = 1 + let publisher = CurrentValueSubject(nil) + _ = Task { + for try await value in publisher.stream { + await testStore.setValue(value) + } + } + + // when + publisher.send(result) + + // then + try await then + let receivedValue = await testStore.value + XCTAssertEqual(receivedValue, result) + } + + func test_AsyncThrowingStream_WhenCurrentValueSubjectFinishesWithError_StreamThrows() async { + // given + let result = TestFailure.foo + let publisher = CurrentValueSubject(nil) + let sut = Task { + for try await _ in publisher.stream { } + } + + // when + publisher.send(completion: .failure(TestFailure.foo)) + + // then + let error = await returnError { try await sut.value } as? TestFailure + XCTAssertEqual(error, result) + } + + func test_AsyncThrowingStream_WhenCurrentValueSubjectFinishes_StreamReturnsVoid() async { + // given + let publisher = CurrentValueSubject(nil) + let sut = Task { + for try await _ in publisher.stream { } + } + + // when + publisher.send(completion: .finished) + + // then + let isCompletionSuccessful = await sut.result.isSuccess + XCTAssertTrue(isCompletionSuccessful) + } +} + +// MARK: - Helpers +private enum TestFailure: Error, Equatable { + case foo +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private func returnError(_ context: @escaping () async throws -> Void) async -> Error? { + do { + try await context() + return nil + } catch { + return error + } +} + +// This is necessary to sync tasks from background threads. +// `actor` will queue all write / read process from any thread. +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private actor TestStore { + private(set) var value: V + init(_ value: V) { self.value = value } + func setValue(_ value: V) { + self.value = value + } +} + +// Workaround! +// This is needed before accessing actor's property otherwise we can't get updated value. +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private var then: Void { + get async throws { + try await Task.sleep(nanoseconds: 0) + } +} +#endif diff --git a/Tests/FoundationExtensionsTests/Promise/PromiseTests.swift b/Tests/FoundationExtensionsTests/Promise/PromiseTests.swift index 258f1b1..1abad73 100644 --- a/Tests/FoundationExtensionsTests/Promise/PromiseTests.swift +++ b/Tests/FoundationExtensionsTests/Promise/PromiseTests.swift @@ -308,6 +308,56 @@ extension PromiseTests { timeout: 0.1) waiter() } + + func test_PromiseValueComputedProperty_WhenPromisePublishesInt1After1SecondDelay_TryAwaitValueReturns1() async throws { + // given + let result = 1 + let sut = Publishers.Promise { promise in + Task { + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + promise(.success(result)) + } + return AnyCancellable {} + } + + // when + let promisedValue = try await sut.value + + // then + XCTAssertEqual(promisedValue, result) + } + + func test_PromiseValueComputedProperty_WhenPromisePublishesErrorAfter1SecondDelay_TryAwaitValueThrows() async throws { + // given + let result = TestFailure.foo + let sut = Publishers.Promise { promise in + Task { + try await Task.sleep(nanoseconds: NSEC_PER_SEC) + promise(.failure(result)) + } + return AnyCancellable {} + } + + // when + let error = await returnError { _ = try await sut.value } as? TestFailure + + XCTAssertEqual(error, result) + } +} + +// MARK: - Helpers +private enum TestFailure: Error { + case foo +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private func returnError(_ context: @escaping () async throws -> Void) async -> Error? { + do { + try await context() + return nil + } catch { + return error + } } extension XCTestCase {