Skip to content

Commit

Permalink
Make bufferingPolicy parameter optional in public API
Browse files Browse the repository at this point in the history
When I introduced the public API in 20e7f5f, I didn’t want users to be
in a situation where there was an buffer of unbounded size growing
without them even knowing about it, and I hence forced them to be
explicit about their choice of buffering policy.

But, a couple of things:

- Marat pointed out that this makes our code examples and README look
  quite cluttered, with `bufferingPolicy` arguments all over the place.
  This distracts from demonstrating the functionality of the library.

- Swift’s own `AsyncStream` defaults to a buffering policy of
  `.unbounded`. If doing so is good enough for them, then it’s probably
  good enough for us.

So, default to `.unbounded`.

Resolves #172.
  • Loading branch information
lawrence-forooghian committed Dec 5, 2024
1 parent 2703782 commit 403292c
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 20 deletions.
14 changes: 7 additions & 7 deletions Example/AblyChatExample/ContentView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ struct ContentView: View {
}

func showMessages() async throws {
let messagesSubscription = try await room().messages.subscribe(bufferingPolicy: .unbounded)
let messagesSubscription = try await room().messages.subscribe()
let previousMessages = try await messagesSubscription.getPreviousMessages(params: .init())

for message in previousMessages.items {
Expand All @@ -189,7 +189,7 @@ struct ContentView: View {
}

func showReactions() async throws {
let reactionSubscription = try await room().reactions.subscribe(bufferingPolicy: .unbounded)
let reactionSubscription = try await room().reactions.subscribe()

// Continue listening for reactions on a background task so this function can return
Task {
Expand All @@ -206,7 +206,7 @@ struct ContentView: View {

// Continue listening for new presence events on a background task so this function can return
Task {
for await event in try await room().presence.subscribe(events: [.enter, .leave, .update], bufferingPolicy: .unbounded) {
for await event in try await room().presence.subscribe(events: [.enter, .leave, .update]) {
withAnimation {
let status = event.data?.userCustomData?["status"]?.value as? String
let clientPresenceChangeMessage = "\(event.clientID) \(event.action.displayedText)"
Expand All @@ -219,7 +219,7 @@ struct ContentView: View {
}

func showTypings() async throws {
let typingSubscription = try await room().typing.subscribe(bufferingPolicy: .unbounded)
let typingSubscription = try await room().typing.subscribe()
// Continue listening for typing events on a background task so this function can return
Task {
for await typing in typingSubscription {
Expand All @@ -241,7 +241,7 @@ struct ContentView: View {
}

Task {
for await event in try await room().occupancy.subscribe(bufferingPolicy: .unbounded) {
for await event in try await room().occupancy.subscribe() {
withAnimation {
occupancyInfo = "Connections: \(event.presenceMembers) (\(event.connections))"
}
Expand All @@ -250,7 +250,7 @@ struct ContentView: View {
}

func printConnectionStatusChange() async {
let connectionSubsciption = chatClient.connection.onStatusChange(bufferingPolicy: .unbounded)
let connectionSubsciption = chatClient.connection.onStatusChange()

// Continue listening for connection status change on a background task so this function can return
Task {
Expand All @@ -263,7 +263,7 @@ struct ContentView: View {
func showRoomStatus() async throws {
// Continue listening for status change events on a background task so this function can return
Task {
for await status in try await room().onStatusChange(bufferingPolicy: .unbounded) {
for await status in try await room().onStatusChange() {
withAnimation {
if status.current.isAttaching {
statusInfo = "\(status.current)...".capitalized
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ public protocol Connection: AnyObject, Sendable {
// TODO: (https://github.com/ably-labs/ably-chat-swift/issues/12): consider how to avoid the need for an unwrap
var error: ARTErrorInfo? { get async }
func onStatusChange(bufferingPolicy: BufferingPolicy) -> Subscription<ConnectionStatusChange>
/// Same as calling ``onStatusChange(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Connection` protocol provides a default implementation of this method.
func onStatusChange() -> Subscription<ConnectionStatusChange>
}

public extension Connection {
func onStatusChange() -> Subscription<ConnectionStatusChange> {
onStatusChange(bufferingPolicy: .unbounded)
}
}

public enum ConnectionStatus: Sendable {
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/EmitsDiscontinuities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,14 @@ public struct DiscontinuityEvent: Sendable, Equatable {

public protocol EmitsDiscontinuities {
func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent>
/// Same as calling ``subscribeToDiscontinuities(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `EmitsDiscontinuities` protocol provides a default implementation of this method.
func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent>
}

public extension EmitsDiscontinuities {
func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent> {
await subscribeToDiscontinuities(bufferingPolicy: .unbounded)
}
}
10 changes: 10 additions & 0 deletions Sources/AblyChat/Messages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@ import Ably

public protocol Messages: AnyObject, Sendable, EmitsDiscontinuities {
func subscribe(bufferingPolicy: BufferingPolicy) async throws -> MessageSubscription
/// Same as calling ``subscribe(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Messages` protocol provides a default implementation of this method.
func subscribe() async throws -> MessageSubscription
func get(options: QueryOptions) async throws -> any PaginatedResult<Message>
func send(params: SendMessageParams) async throws -> Message
var channel: RealtimeChannelProtocol { get }
}

public extension Messages {
func subscribe() async throws -> MessageSubscription {
try await subscribe(bufferingPolicy: .unbounded)
}
}

public struct SendMessageParams: Sendable {
public var text: String
public var metadata: MessageMetadata?
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/Occupancy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@ import Ably

public protocol Occupancy: AnyObject, Sendable, EmitsDiscontinuities {
func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription<OccupancyEvent>
/// Same as calling ``subscribe(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Occupancy` protocol provides a default implementation of this method.
func subscribe() async -> Subscription<OccupancyEvent>
func get() async throws -> OccupancyEvent
var channel: RealtimeChannelProtocol { get }
}

public extension Occupancy {
func subscribe() async -> Subscription<OccupancyEvent> {
await subscribe(bufferingPolicy: .unbounded)
}
}

// (CHA-O2) The occupancy event format is shown here (https://sdk.ably.com/builds/ably/specification/main/chat-features/#chat-structs-occupancy-event)
public struct OccupancyEvent: Sendable, Encodable, Decodable {
public var connections: Int
Expand Down
18 changes: 18 additions & 0 deletions Sources/AblyChat/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,25 @@ public protocol Presence: AnyObject, Sendable, EmitsDiscontinuities {
func update(data: PresenceData?) async throws
func leave(data: PresenceData?) async throws
func subscribe(event: PresenceEventType, bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent>
/// Same as calling ``subscribe(event:bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Presence` protocol provides a default implementation of this method.
func subscribe(event: PresenceEventType) async -> Subscription<PresenceEvent>
func subscribe(events: [PresenceEventType], bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent>
/// Same as calling ``subscribe(events:bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Presence` protocol provides a default implementation of this method.
func subscribe(events: [PresenceEventType]) async -> Subscription<PresenceEvent>
}

public extension Presence {
func subscribe(event: PresenceEventType) async -> Subscription<PresenceEvent> {
await subscribe(event: event, bufferingPolicy: .unbounded)
}

func subscribe(events: [PresenceEventType]) async -> Subscription<PresenceEvent> {
await subscribe(events: events, bufferingPolicy: .unbounded)
}
}

public struct PresenceMember: Sendable {
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ public protocol Room: AnyObject, Sendable {
// TODO: change to `status`
var status: RoomStatus { get async }
func onStatusChange(bufferingPolicy: BufferingPolicy) async -> Subscription<RoomStatusChange>
/// Same as calling ``onStatusChange(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Room` protocol provides a default implementation of this method.
func onStatusChange() async -> Subscription<RoomStatusChange>
func attach() async throws
func detach() async throws
var options: RoomOptions { get }
}

public extension Room {
func onStatusChange() async -> Subscription<RoomStatusChange> {
await onStatusChange(bufferingPolicy: .unbounded)
}
}

/// A ``Room`` that exposes additional functionality for use within the SDK.
internal protocol InternalRoom: Room {
func release() async
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/RoomReactions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ public protocol RoomReactions: AnyObject, Sendable, EmitsDiscontinuities {
func send(params: SendReactionParams) async throws
var channel: RealtimeChannelProtocol { get }
func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription<Reaction>
/// Same as calling ``subscribe(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `RoomReactions` protocol provides a default implementation of this method.
func subscribe() async -> Subscription<Reaction>
}

public extension RoomReactions {
func subscribe() async -> Subscription<Reaction> {
await subscribe(bufferingPolicy: .unbounded)
}
}

public struct SendReactionParams: Sendable {
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/Typing.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@ import Ably

public protocol Typing: AnyObject, Sendable, EmitsDiscontinuities {
func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription<TypingEvent>
/// Same as calling ``subscribe(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Typing` protocol provides a default implementation of this method.
func subscribe() async -> Subscription<TypingEvent>
func get() async throws -> Set<String>
func start() async throws
func stop() async throws
var channel: RealtimeChannelProtocol { get }
}

public extension Typing {
func subscribe() async -> Subscription<TypingEvent> {
await subscribe(bufferingPolicy: .unbounded)
}
}

public struct TypingEvent: Sendable {
public var currentlyTyping: Set<String>

Expand Down
6 changes: 3 additions & 3 deletions Tests/AblyChatTests/DefaultMessagesTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ struct DefaultMessagesTests {
// Then
await #expect(throws: ARTErrorInfo.create(withCode: 40000, status: 400, message: "channel is attached, but channelSerial is not defined"), performing: {
// When
try await defaultMessages.subscribe(bufferingPolicy: .unbounded)
try await defaultMessages.subscribe()
})
}

Expand Down Expand Up @@ -56,7 +56,7 @@ struct DefaultMessagesTests {
)
let featureChannel = MockFeatureChannel(channel: channel)
let defaultMessages = await DefaultMessages(featureChannel: featureChannel, chatAPI: chatAPI, roomID: "basketball", clientID: "clientId", logger: TestLogger())
let subscription = try await defaultMessages.subscribe(bufferingPolicy: .unbounded)
let subscription = try await defaultMessages.subscribe()
let expectedPaginatedResult = PaginatedResultWrapper<Message>(
paginatedResponse: MockHTTPPaginatedResponse.successGetMessagesWithNoItems,
items: []
Expand All @@ -81,7 +81,7 @@ struct DefaultMessagesTests {

// When: The feature channel emits a discontinuity through `subscribeToDiscontinuities`
let featureChannelDiscontinuity = DiscontinuityEvent(error: ARTErrorInfo.createUnknownError() /* arbitrary */ )
let messagesDiscontinuitySubscription = await messages.subscribeToDiscontinuities(bufferingPolicy: .unbounded)
let messagesDiscontinuitySubscription = await messages.subscribeToDiscontinuities()
await featureChannel.emitDiscontinuity(featureChannelDiscontinuity)

// Then: The DefaultMessages instance emits this discontinuity through `subscribeToDiscontinuities`
Expand Down
4 changes: 2 additions & 2 deletions Tests/AblyChatTests/DefaultRoomReactionsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct DefaultRoomReactionsTests {
let defaultRoomReactions = await DefaultRoomReactions(featureChannel: featureChannel, clientID: "mockClientId", roomID: "basketball", logger: TestLogger())

// When
let subscription: Subscription<Reaction>? = await defaultRoomReactions.subscribe(bufferingPolicy: .unbounded)
let subscription: Subscription<Reaction>? = await defaultRoomReactions.subscribe()

// Then
#expect(subscription != nil)
Expand All @@ -72,7 +72,7 @@ struct DefaultRoomReactionsTests {

// When: The feature channel emits a discontinuity through `subscribeToDiscontinuities`
let featureChannelDiscontinuity = DiscontinuityEvent(error: ARTErrorInfo.createUnknownError() /* arbitrary */ )
let messagesDiscontinuitySubscription = await roomReactions.subscribeToDiscontinuities(bufferingPolicy: .unbounded)
let messagesDiscontinuitySubscription = await roomReactions.subscribeToDiscontinuities()
await featureChannel.emitDiscontinuity(featureChannelDiscontinuity)

// Then: The DefaultRoomReactions instance emits this discontinuity through `subscribeToDiscontinuities`
Expand Down
2 changes: 1 addition & 1 deletion Tests/AblyChatTests/DefaultRoomTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ struct DefaultRoomTests {

// When: The room lifecycle manager emits a status change through `subscribeToState`
let managerStatusChange = RoomStatusChange(current: .detached, previous: .detaching) // arbitrary
let roomStatusSubscription = await room.onStatusChange(bufferingPolicy: .unbounded)
let roomStatusSubscription = await room.onStatusChange()
await lifecycleManager.emitStatusChange(managerStatusChange)

// Then: The room emits this status change through `onStatusChange`
Expand Down
14 changes: 7 additions & 7 deletions Tests/AblyChatTests/IntegrationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ struct IntegrationTests {
)

// (3) Subscribe to room status
let rxRoomStatusSubscription = await rxRoom.onStatusChange(bufferingPolicy: .unbounded)
let rxRoomStatusSubscription = await rxRoom.onStatusChange()

// (4) Attach the room so we can receive messages on it
try await rxRoom.attach()
Expand All @@ -98,7 +98,7 @@ struct IntegrationTests {
// (1) Send a message before subscribing to messages, so that later on we can check history works.

// (2) Create a throwaway subscription and wait for it to receive a message. This is to make sure that rxRoom has seen the message that we send here, so that the first message we receive on the subscription created in (5) is that which we’ll send in (6), and not that which we send here.
let throwawayRxMessageSubscription = try await rxRoom.messages.subscribe(bufferingPolicy: .unbounded)
let throwawayRxMessageSubscription = try await rxRoom.messages.subscribe()

// (3) Send the message
let txMessageBeforeRxSubscribe = try await txRoom.messages.send(params: .init(text: "Hello from txRoom, before rxRoom subscribe"))
Expand All @@ -108,7 +108,7 @@ struct IntegrationTests {
#expect(throwawayRxMessage == txMessageBeforeRxSubscribe)

// (5) Subscribe to messages
let rxMessageSubscription = try await rxRoom.messages.subscribe(bufferingPolicy: .unbounded)
let rxMessageSubscription = try await rxRoom.messages.subscribe()

// (6) Now that we’re subscribed to messages, send a message on the other client and check that we receive it on the subscription
let txMessageAfterRxSubscribe = try await txRoom.messages.send(params: .init(text: "Hello from txRoom, after rxRoom subscribe"))
Expand Down Expand Up @@ -152,7 +152,7 @@ struct IntegrationTests {
// MARK: - Reactions

// (1) Subscribe to reactions
let rxReactionSubscription = await rxRoom.reactions.subscribe(bufferingPolicy: .unbounded)
let rxReactionSubscription = await rxRoom.reactions.subscribe()

// (2) Now that we’re subscribed to reactions, send a reaction on the other client and check that we receive it on the subscription
try await txRoom.reactions.send(params: .init(type: "heart"))
Expand All @@ -170,7 +170,7 @@ struct IntegrationTests {
#expect(currentOccupancy.presenceMembers == 0) // not yet entered presence

// (2) Subscribe to occupancy
let rxOccupancySubscription = await rxRoom.occupancy.subscribe(bufferingPolicy: .unbounded)
let rxOccupancySubscription = await rxRoom.occupancy.subscribe()

// (3) Attach the room so we can perform presence operations
try await txRoom.attach()
Expand Down Expand Up @@ -202,7 +202,7 @@ struct IntegrationTests {
// MARK: - Presence

// (1) Subscribe to presence
let rxPresenceSubscription = await rxRoom.presence.subscribe(events: [.enter, .leave, .update], bufferingPolicy: .unbounded)
let rxPresenceSubscription = await rxRoom.presence.subscribe(events: [.enter, .leave, .update])

// (2) Send `.enter` presence event with custom data on the other client and check that we receive it on the subscription
try await txRoom.presence.enter(data: .init(userCustomData: ["randomData": .string("randomValue")]))
Expand Down Expand Up @@ -243,7 +243,7 @@ struct IntegrationTests {
// MARK: - Typing Indicators

// (1) Subscribe to typing indicators
let rxTypingSubscription = await rxRoom.typing.subscribe(bufferingPolicy: .unbounded)
let rxTypingSubscription = await rxRoom.typing.subscribe()

// (2) Start typing on txRoom and check that we receive the typing event on the subscription
try await txRoom.typing.start()
Expand Down

0 comments on commit 403292c

Please sign in to comment.