|
4 | 4 | //
|
5 | 5 | // At some point we should define how this thing behaves when you iterate over it from multiple loops, or when you pass it around. I’m not yet sufficiently experienced with `AsyncSequence` to know what’s idiomatic. I tried the same thing out with `AsyncStream` (two tasks iterating over a single stream) and it appears that each element is delivered to precisely one consumer. But we can leave that for later. On a similar note consider whether it makes a difference whether this is a struct or a class.
|
6 | 6 | //
|
7 |
| -// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier. |
8 |
| -public struct Subscription<Element>: Sendable, AsyncSequence { |
9 |
| - // This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. |
10 |
| - public init<T: AsyncSequence>(mockAsyncSequence _: T) where T.Element == Element { |
11 |
| - fatalError("Not implemented") |
| 7 | +// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier; see https://github.com/ably-labs/ably-chat-swift/issues/21. |
| 8 | +public struct Subscription<Element: Sendable>: Sendable, AsyncSequence { |
| 9 | + private enum Mode: Sendable { |
| 10 | + case `default`(stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) |
| 11 | + case mockAsyncSequence(AnyNonThrowingAsyncSequence) |
12 | 12 | }
|
13 | 13 |
|
14 |
| - // (The below is just necessary boilerplate to get this to compile; the key point is that `next()` does not have a `throws` annotation.) |
| 14 | + /// A type-erased AsyncSequence that doesn’t throw any errors. |
| 15 | + fileprivate struct AnyNonThrowingAsyncSequence: AsyncSequence, Sendable { |
| 16 | + private var makeAsyncIteratorImpl: @Sendable () -> AsyncIterator |
| 17 | + |
| 18 | + init<T: AsyncSequence & Sendable>(asyncSequence: T) where T.Element == Element { |
| 19 | + makeAsyncIteratorImpl = { |
| 20 | + AsyncIterator(asyncIterator: asyncSequence.makeAsyncIterator()) |
| 21 | + } |
| 22 | + } |
| 23 | + |
| 24 | + fileprivate struct AsyncIterator: AsyncIteratorProtocol { |
| 25 | + private var nextImpl: () async -> Element? |
| 26 | + |
| 27 | + init<T: AsyncIteratorProtocol>(asyncIterator: T) where T.Element == Element { |
| 28 | + var iterator = asyncIterator |
| 29 | + nextImpl = { () async -> Element? in |
| 30 | + do { |
| 31 | + return try await iterator.next() |
| 32 | + } catch { |
| 33 | + fatalError("The AsyncSequence passed to Subscription.init(mockAsyncSequence:) threw an error: \(error). This is not supported.") |
| 34 | + } |
| 35 | + } |
| 36 | + } |
| 37 | + |
| 38 | + mutating func next() async -> Element? { |
| 39 | + await nextImpl() |
| 40 | + } |
| 41 | + } |
| 42 | + |
| 43 | + func makeAsyncIterator() -> AsyncIterator { |
| 44 | + makeAsyncIteratorImpl() |
| 45 | + } |
| 46 | + } |
| 47 | + |
| 48 | + private let mode: Mode |
| 49 | + |
| 50 | + internal init(bufferingPolicy: BufferingPolicy) { |
| 51 | + let (stream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy.asAsyncStreamBufferingPolicy()) |
| 52 | + mode = .default(stream: stream, continuation: continuation) |
| 53 | + } |
| 54 | + |
| 55 | + // This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. It is a programmer error to pass a throwing AsyncSequence. |
| 56 | + public init<T: AsyncSequence & Sendable>(mockAsyncSequence: T) where T.Element == Element { |
| 57 | + mode = .mockAsyncSequence(.init(asyncSequence: mockAsyncSequence)) |
| 58 | + } |
| 59 | + |
| 60 | + /** |
| 61 | + Causes the subscription to make a new element available on its `AsyncSequence` interface. |
| 62 | + |
| 63 | + It is a programmer error to call this when the receiver was created using ``init(mockAsyncSequence:)``. |
| 64 | + */ |
| 65 | + internal func emit(_ element: Element) { |
| 66 | + switch mode { |
| 67 | + case let .default(_, continuation): |
| 68 | + continuation.yield(element) |
| 69 | + case .mockAsyncSequence: |
| 70 | + fatalError("`emit` cannot be called on a Subscription that was created using init(mockAsyncSequence:)") |
| 71 | + } |
| 72 | + } |
15 | 73 |
|
16 | 74 | public struct AsyncIterator: AsyncIteratorProtocol {
|
| 75 | + fileprivate enum Mode { |
| 76 | + case `default`(iterator: AsyncStream<Element>.AsyncIterator) |
| 77 | + case mockAsyncSequence(iterator: AnyNonThrowingAsyncSequence.AsyncIterator) |
| 78 | + |
| 79 | + mutating func next() async -> Element? { |
| 80 | + switch self { |
| 81 | + case var .default(iterator: iterator): |
| 82 | + let next = await iterator.next() |
| 83 | + self = .default(iterator: iterator) |
| 84 | + return next |
| 85 | + case var .mockAsyncSequence(iterator: iterator): |
| 86 | + let next = await iterator.next() |
| 87 | + self = .mockAsyncSequence(iterator: iterator) |
| 88 | + return next |
| 89 | + } |
| 90 | + } |
| 91 | + } |
| 92 | + |
| 93 | + private var mode: Mode |
| 94 | + |
| 95 | + fileprivate init(mode: Mode) { |
| 96 | + self.mode = mode |
| 97 | + } |
| 98 | + |
17 | 99 | public mutating func next() async -> Element? {
|
18 |
| - fatalError("Not implemented") |
| 100 | + await mode.next() |
19 | 101 | }
|
20 | 102 | }
|
21 | 103 |
|
22 | 104 | public func makeAsyncIterator() -> AsyncIterator {
|
23 |
| - fatalError("Not implemented") |
| 105 | + let iteratorMode: AsyncIterator.Mode = switch mode { |
| 106 | + case let .default(stream: stream, continuation: _): |
| 107 | + .default(iterator: stream.makeAsyncIterator()) |
| 108 | + case let .mockAsyncSequence(asyncSequence): |
| 109 | + .mockAsyncSequence(iterator: asyncSequence.makeAsyncIterator()) |
| 110 | + } |
| 111 | + |
| 112 | + return .init(mode: iteratorMode) |
24 | 113 | }
|
25 | 114 | }
|
0 commit comments