Skip to content

Commit 202f70a

Browse files
Implement the Subscription type
Note that XCTAssertEqual doesn’t allow you to write `await` inside its arguments, hence the indirection to get the result of a couple of `async let`s. Hopefully we’ll be able to migrate to Swift Testing at some point, which will resolve this; see #21. I’ve also implemented MessageSubscription by wrapping Subscription.
1 parent 783865c commit 202f70a

File tree

7 files changed

+224
-14
lines changed

7 files changed

+224
-14
lines changed

Package.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ let package = Package(
5959
name: "AblyChatTests",
6060
dependencies: [
6161
"AblyChat",
62+
.product(
63+
name: "AsyncAlgorithms",
64+
package: "swift-async-algorithms"
65+
),
6266
]
6367
),
6468
.executableTarget(

Package@swift-6.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ let package = Package(
4545
name: "AblyChatTests",
4646
dependencies: [
4747
"AblyChat",
48+
.product(
49+
name: "AsyncAlgorithms",
50+
package: "swift-async-algorithms"
51+
),
4852
]
4953
),
5054
.executableTarget(

Sources/AblyChat/BufferingPolicy.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,15 @@ public enum BufferingPolicy: Sendable {
44
case unbounded
55
case bufferingOldest(Int)
66
case bufferingNewest(Int)
7+
8+
internal func asAsyncStreamBufferingPolicy<T>() -> AsyncStream<T>.Continuation.BufferingPolicy {
9+
switch self {
10+
case let .bufferingNewest(count):
11+
.bufferingNewest(count)
12+
case let .bufferingOldest(count):
13+
.bufferingOldest(count)
14+
case .unbounded:
15+
.unbounded
16+
}
17+
}
718
}

Sources/AblyChat/Messages.swift

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,21 +54,43 @@ public struct QueryOptionsWithoutDirection: Sendable {
5454
public struct MessageSubscription: Sendable, AsyncSequence {
5555
public typealias Element = Message
5656

57-
public init<T: AsyncSequence>(mockAsyncSequence _: T) where T.Element == Element {
58-
fatalError("Not yet implemented")
57+
private var subscription: Subscription<Element>
58+
59+
private typealias MockGetPreviousMessages = @Sendable (QueryOptionsWithoutDirection) async throws -> any PaginatedResult<Message>
60+
private var mockGetPreviousMessages: MockGetPreviousMessages?
61+
62+
internal init(bufferingPolicy: BufferingPolicy) {
63+
subscription = .init(bufferingPolicy: bufferingPolicy)
64+
}
65+
66+
public init<T: AsyncSequence & Sendable>(mockAsyncSequence: T, mockGetPreviousMessages _: (QueryOptionsWithoutDirection) async throws -> any PaginatedResult<Message>) where T.Element == Element {
67+
subscription = .init(mockAsyncSequence: mockAsyncSequence)
68+
}
69+
70+
internal func emit(_ element: Element) {
71+
subscription.emit(element)
5972
}
6073

61-
public func getPreviousMessages(params _: QueryOptionsWithoutDirection) async throws -> any PaginatedResult<Message> {
62-
fatalError("Not yet implemented")
74+
public func getPreviousMessages(params: QueryOptionsWithoutDirection) async throws -> any PaginatedResult<Message> {
75+
guard let mockImplementation = mockGetPreviousMessages else {
76+
fatalError("Not yet implemented")
77+
}
78+
return try await mockImplementation(params)
6379
}
6480

6581
public struct AsyncIterator: AsyncIteratorProtocol {
82+
private var subscriptionIterator: Subscription<Element>.AsyncIterator
83+
84+
fileprivate init(subscriptionIterator: Subscription<Element>.AsyncIterator) {
85+
self.subscriptionIterator = subscriptionIterator
86+
}
87+
6688
public mutating func next() async -> Element? {
67-
fatalError("Not implemented")
89+
await subscriptionIterator.next()
6890
}
6991
}
7092

7193
public func makeAsyncIterator() -> AsyncIterator {
72-
fatalError("Not implemented")
94+
.init(subscriptionIterator: subscription.makeAsyncIterator())
7395
}
7496
}

Sources/AblyChat/Subscription.swift

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,111 @@
44
//
55
// At some point we should define how this thing behaves when you iterate over it from multiple loops, or when you pass it around. I’m not yet sufficiently experienced with `AsyncSequence` to know what’s idiomatic. I tried the same thing out with `AsyncStream` (two tasks iterating over a single stream) and it appears that each element is delivered to precisely one consumer. But we can leave that for later. On a similar note consider whether it makes a difference whether this is a struct or a class.
66
//
7-
// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier.
8-
public struct Subscription<Element>: Sendable, AsyncSequence {
9-
// This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it.
10-
public init<T: AsyncSequence>(mockAsyncSequence _: T) where T.Element == Element {
11-
fatalError("Not implemented")
7+
// TODO: I wanted to implement this as a protocol (from which `MessageSubscription` would then inherit) but struggled to do so, hence the struct. Try again sometime. We can also revisit our implementation of `AsyncSequence` if we migrate to Swift 6, which adds primary types and typed errors to `AsyncSequence` and should make things easier; see https://github.com/ably-labs/ably-chat-swift/issues/21.
8+
public struct Subscription<Element: Sendable>: Sendable, AsyncSequence {
9+
private enum Mode: Sendable {
10+
case `default`(stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation)
11+
case mockAsyncSequence(AnyNonThrowingAsyncSequence)
1212
}
1313

14-
// (The below is just necessary boilerplate to get this to compile; the key point is that `next()` does not have a `throws` annotation.)
14+
/// A type-erased AsyncSequence that doesn’t throw any errors.
15+
fileprivate struct AnyNonThrowingAsyncSequence: AsyncSequence, Sendable {
16+
private var makeAsyncIteratorImpl: @Sendable () -> AsyncIterator
17+
18+
init<T: AsyncSequence & Sendable>(asyncSequence: T) where T.Element == Element {
19+
makeAsyncIteratorImpl = {
20+
AsyncIterator(asyncIterator: asyncSequence.makeAsyncIterator())
21+
}
22+
}
23+
24+
fileprivate struct AsyncIterator: AsyncIteratorProtocol {
25+
private var nextImpl: () async -> Element?
26+
27+
init<T: AsyncIteratorProtocol>(asyncIterator: T) where T.Element == Element {
28+
var iterator = asyncIterator
29+
nextImpl = { () async -> Element? in
30+
do {
31+
return try await iterator.next()
32+
} catch {
33+
fatalError("The AsyncSequence passed to Subscription.init(mockAsyncSequence:) threw an error: \(error). This is not supported.")
34+
}
35+
}
36+
}
37+
38+
mutating func next() async -> Element? {
39+
await nextImpl()
40+
}
41+
}
42+
43+
func makeAsyncIterator() -> AsyncIterator {
44+
makeAsyncIteratorImpl()
45+
}
46+
}
47+
48+
private let mode: Mode
49+
50+
internal init(bufferingPolicy: BufferingPolicy) {
51+
let (stream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: bufferingPolicy.asAsyncStreamBufferingPolicy())
52+
mode = .default(stream: stream, continuation: continuation)
53+
}
54+
55+
// This is a workaround for the fact that, as mentioned above, `Subscription` is a struct when I would have liked it to be a protocol. It allows people mocking our SDK to create a `Subscription` so that they can return it from their mocks. The intention of this initializer is that if you use it, then the created `Subscription` will just replay the sequence that you pass it. It is a programmer error to pass a throwing AsyncSequence.
56+
public init<T: AsyncSequence & Sendable>(mockAsyncSequence: T) where T.Element == Element {
57+
mode = .mockAsyncSequence(.init(asyncSequence: mockAsyncSequence))
58+
}
59+
60+
/**
61+
Causes the subscription to make a new element available on its `AsyncSequence` interface.
62+
63+
It is a programmer error to call this when the receiver was created using ``init(mockAsyncSequence:)``.
64+
*/
65+
internal func emit(_ element: Element) {
66+
switch mode {
67+
case let .default(_, continuation):
68+
continuation.yield(element)
69+
case .mockAsyncSequence:
70+
fatalError("`emit` cannot be called on a Subscription that was created using init(mockAsyncSequence:)")
71+
}
72+
}
1573

1674
public struct AsyncIterator: AsyncIteratorProtocol {
75+
fileprivate enum Mode {
76+
case `default`(iterator: AsyncStream<Element>.AsyncIterator)
77+
case mockAsyncSequence(iterator: AnyNonThrowingAsyncSequence.AsyncIterator)
78+
79+
mutating func next() async -> Element? {
80+
switch self {
81+
case var .default(iterator: iterator):
82+
let next = await iterator.next()
83+
self = .default(iterator: iterator)
84+
return next
85+
case var .mockAsyncSequence(iterator: iterator):
86+
let next = await iterator.next()
87+
self = .mockAsyncSequence(iterator: iterator)
88+
return next
89+
}
90+
}
91+
}
92+
93+
private var mode: Mode
94+
95+
fileprivate init(mode: Mode) {
96+
self.mode = mode
97+
}
98+
1799
public mutating func next() async -> Element? {
18-
fatalError("Not implemented")
100+
await mode.next()
19101
}
20102
}
21103

22104
public func makeAsyncIterator() -> AsyncIterator {
23-
fatalError("Not implemented")
105+
let iteratorMode: AsyncIterator.Mode = switch mode {
106+
case let .default(stream: stream, continuation: _):
107+
.default(iterator: stream.makeAsyncIterator())
108+
case let .mockAsyncSequence(asyncSequence):
109+
.mockAsyncSequence(iterator: asyncSequence.makeAsyncIterator())
110+
}
111+
112+
return .init(mode: iteratorMode)
24113
}
25114
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
@testable import AblyChat
2+
import AsyncAlgorithms
3+
import XCTest
4+
5+
class SubscriptionTests: XCTestCase {
6+
func testWithMockAsyncSequence() async {
7+
let subscription = Subscription(mockAsyncSequence: ["First", "Second"].async)
8+
9+
async let emittedElements = Array(subscription.prefix(2))
10+
11+
let awaitedEmittedElements = await emittedElements
12+
XCTAssertEqual(awaitedEmittedElements, ["First", "Second"])
13+
}
14+
15+
func testEmit() async {
16+
let subscription = Subscription<String>(bufferingPolicy: .unbounded)
17+
18+
async let emittedElements = Array(subscription.prefix(2))
19+
20+
subscription.emit("First")
21+
subscription.emit("Second")
22+
23+
let awaitedEmittedElements = await emittedElements
24+
XCTAssertEqual(awaitedEmittedElements, ["First", "Second"])
25+
}
26+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
@testable import AblyChat
2+
import AsyncAlgorithms
3+
import XCTest
4+
5+
private final class MockPaginatedResult<T>: PaginatedResult {
6+
var items: [T] { fatalError("Not implemented") }
7+
8+
var hasNext: Bool { fatalError("Not implemented") }
9+
10+
var isLast: Bool { fatalError("Not implemented") }
11+
12+
var next: (any AblyChat.PaginatedResult<T>)? { fatalError("Not implemented") }
13+
14+
var first: any AblyChat.PaginatedResult<T> { fatalError("Not implemented") }
15+
16+
var current: any AblyChat.PaginatedResult<T> { fatalError("Not implemented") }
17+
18+
init() {}
19+
}
20+
21+
class MessageSubscriptionTests: XCTestCase {
22+
let messages = ["First", "Second"].map {
23+
Message(timeserial: "", clientID: "", roomID: "", text: $0, createdAt: .now, metadata: [:], headers: [:])
24+
}
25+
26+
func testWithMockAsyncSequence() async {
27+
let subscription = MessageSubscription(mockAsyncSequence: messages.async) { _ in fatalError("Not implemented") }
28+
29+
async let emittedElements = Array(subscription.prefix(2))
30+
31+
let awaitedEmittedElements = await emittedElements
32+
XCTAssertEqual(awaitedEmittedElements.map(\.text), ["First", "Second"])
33+
}
34+
35+
func testEmit() async {
36+
let subscription = MessageSubscription(bufferingPolicy: .unbounded)
37+
38+
async let emittedElements = Array(subscription.prefix(2))
39+
40+
subscription.emit(messages[0])
41+
subscription.emit(messages[1])
42+
43+
let awaitedEmittedElements = await emittedElements
44+
XCTAssertEqual(awaitedEmittedElements.map(\.text), ["First", "Second"])
45+
}
46+
47+
func funcTestMockGetPreviousMessages() async throws {
48+
let mockPaginatedResult = MockPaginatedResult<Message>()
49+
let subscription = MessageSubscription(mockAsyncSequence: [].async) { _ in mockPaginatedResult }
50+
51+
let result = try await subscription.getPreviousMessages(params: .init())
52+
XCTAssertIdentical(result, mockPaginatedResult)
53+
}
54+
}

0 commit comments

Comments
 (0)