Skip to content

Commit

Permalink
Merge pull request #182 from ably/172-subscribe-without-buffering-policy
Browse files Browse the repository at this point in the history
[ECO-5167] Make `bufferingPolicy` parameter optional in public API
  • Loading branch information
lawrence-forooghian authored Dec 5, 2024
2 parents 2d7f42a + 403292c commit d6adda5
Show file tree
Hide file tree
Showing 23 changed files with 133 additions and 45 deletions.
12 changes: 6 additions & 6 deletions Example/AblyChatExample/ContentView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ struct ContentView: View {
}

func showMessages() async throws {
let messagesSubscription = try await room().messages.subscribe(bufferingPolicy: .unbounded)
let messagesSubscription = try await room().messages.subscribe()
let previousMessages = try await messagesSubscription.getPreviousMessages(params: .init())

for message in previousMessages.items {
Expand All @@ -189,7 +189,7 @@ struct ContentView: View {
}

func showReactions() async throws {
let reactionSubscription = try await room().reactions.subscribe(bufferingPolicy: .unbounded)
let reactionSubscription = try await room().reactions.subscribe()

// Continue listening for reactions on a background task so this function can return
Task {
Expand Down Expand Up @@ -219,7 +219,7 @@ struct ContentView: View {
}

func showTypings() async throws {
let typingSubscription = try await room().typing.subscribe(bufferingPolicy: .unbounded)
let typingSubscription = try await room().typing.subscribe()
// Continue listening for typing events on a background task so this function can return
Task {
for await typing in typingSubscription {
Expand All @@ -241,7 +241,7 @@ struct ContentView: View {
}

Task {
for await event in try await room().occupancy.subscribe(bufferingPolicy: .unbounded) {
for await event in try await room().occupancy.subscribe() {
withAnimation {
occupancyInfo = "Connections: \(event.presenceMembers) (\(event.connections))"
}
Expand All @@ -250,7 +250,7 @@ struct ContentView: View {
}

func printConnectionStatusChange() async {
let connectionSubsciption = chatClient.connection.onStatusChange(bufferingPolicy: .unbounded)
let connectionSubsciption = chatClient.connection.onStatusChange()

// Continue listening for connection status change on a background task so this function can return
Task {
Expand All @@ -263,7 +263,7 @@ struct ContentView: View {
func showRoomStatus() async throws {
// Continue listening for status change events on a background task so this function can return
Task {
for await status in try await room().onStatusChange(bufferingPolicy: .unbounded) {
for await status in try await room().onStatusChange() {
withAnimation {
if status.current.isAttaching {
statusInfo = "\(status.current)...".capitalized
Expand Down
14 changes: 7 additions & 7 deletions Example/AblyChatExample/Mocks/MockClients.swift
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ actor MockMessages: Messages {
return message
}

func subscribeToDiscontinuities() -> Subscription<DiscontinuityEvent> {
func subscribeToDiscontinuities(bufferingPolicy _: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
fatalError("Not yet implemented")
}
}
Expand Down Expand Up @@ -195,7 +195,7 @@ actor MockRoomReactions: RoomReactions {
.init(mockAsyncSequence: createSubscription())
}

func subscribeToDiscontinuities() -> Subscription<DiscontinuityEvent> {
func subscribeToDiscontinuities(bufferingPolicy _: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
fatalError("Not yet implemented")
}
}
Expand Down Expand Up @@ -244,7 +244,7 @@ actor MockTyping: Typing {
}
}

func subscribeToDiscontinuities() -> Subscription<DiscontinuityEvent> {
func subscribeToDiscontinuities(bufferingPolicy _: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
fatalError("Not yet implemented")
}
}
Expand Down Expand Up @@ -340,15 +340,15 @@ actor MockPresence: Presence {
}
}

func subscribe(event _: PresenceEventType) -> Subscription<PresenceEvent> {
func subscribe(event _: PresenceEventType, bufferingPolicy _: BufferingPolicy) -> Subscription<PresenceEvent> {
.init(mockAsyncSequence: createSubscription())
}

func subscribe(events _: [PresenceEventType]) -> Subscription<PresenceEvent> {
func subscribe(events _: [PresenceEventType], bufferingPolicy _: BufferingPolicy) -> Subscription<PresenceEvent> {
.init(mockAsyncSequence: createSubscription())
}

func subscribeToDiscontinuities() -> Subscription<DiscontinuityEvent> {
func subscribeToDiscontinuities(bufferingPolicy _: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
fatalError("Not yet implemented")
}
}
Expand Down Expand Up @@ -383,7 +383,7 @@ actor MockOccupancy: Occupancy {
OccupancyEvent(connections: 10, presenceMembers: 5)
}

func subscribeToDiscontinuities() -> Subscription<DiscontinuityEvent> {
func subscribeToDiscontinuities(bufferingPolicy _: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
fatalError("Not yet implemented")
}
}
Expand Down
2 changes: 1 addition & 1 deletion Example/AblyChatExample/Mocks/MockSubscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct MockSubscription<T: Sendable>: Sendable, AsyncSequence {
}

init(randomElement: @escaping @Sendable () -> Element, interval: Double) {
let (stream, continuation) = AsyncStream.makeStream(of: Element.self, bufferingPolicy: .unbounded)
let (stream, continuation) = AsyncStream.makeStream(of: Element.self)
self.continuation = continuation
let timer: AsyncTimerSequence<ContinuousClock> = .init(interval: .seconds(interval), clock: .init())
mergedSequence = merge(stream, timer.map { _ in
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ public protocol Connection: AnyObject, Sendable {
// TODO: (https://github.com/ably-labs/ably-chat-swift/issues/12): consider how to avoid the need for an unwrap
var error: ARTErrorInfo? { get async }
func onStatusChange(bufferingPolicy: BufferingPolicy) -> Subscription<ConnectionStatusChange>
/// Same as calling ``onStatusChange(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Connection` protocol provides a default implementation of this method.
func onStatusChange() -> Subscription<ConnectionStatusChange>
}

public extension Connection {
func onStatusChange() -> Subscription<ConnectionStatusChange> {
onStatusChange(bufferingPolicy: .unbounded)
}
}

public enum ConnectionStatus: Sendable {
Expand Down
4 changes: 2 additions & 2 deletions Sources/AblyChat/DefaultMessages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ internal final class DefaultMessages: Messages, EmitsDiscontinuities {
}

// (CHA-M7) Users may subscribe to discontinuity events to know when there’s been a break in messages that they need to resolve. Their listener will be called when a discontinuity event is triggered from the room lifecycle.
internal func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities()
internal func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities(bufferingPolicy: bufferingPolicy)
}

private func getBeforeSubscriptionStart(_ uuid: UUID, params: QueryOptions) async throws -> any PaginatedResult<Message> {
Expand Down
4 changes: 2 additions & 2 deletions Sources/AblyChat/DefaultOccupancy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ internal final class DefaultOccupancy: Occupancy, EmitsDiscontinuities {
}

// (CHA-O5) Users may subscribe to discontinuity events to know when there’s been a break in occupancy. Their listener will be called when a discontinuity event is triggered from the room lifecycle. For occupancy, there shouldn’t need to be user action as most channels will send occupancy updates regularly as clients churn.
internal func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities()
internal func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities(bufferingPolicy: bufferingPolicy)
}
}
12 changes: 6 additions & 6 deletions Sources/AblyChat/DefaultPresence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {

// (CHA-PR7a) Users may provide a listener to subscribe to all presence events in a room.
// (CHA-PR7b) Users may provide a listener and a list of selected presence events, to subscribe to just those events in a room.
internal func subscribe(event: PresenceEventType) async -> Subscription<PresenceEvent> {
internal func subscribe(event: PresenceEventType, bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent> {
logger.log(message: "Subscribing to presence events", level: .debug)
let subscription = Subscription<PresenceEvent>(bufferingPolicy: .unbounded)
let subscription = Subscription<PresenceEvent>(bufferingPolicy: bufferingPolicy)
channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in
logger.log(message: "Received presence message: \(message)", level: .debug)
Task {
Expand All @@ -178,9 +178,9 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
return subscription
}

internal func subscribe(events: [PresenceEventType]) async -> Subscription<PresenceEvent> {
internal func subscribe(events: [PresenceEventType], bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent> {
logger.log(message: "Subscribing to presence events", level: .debug)
let subscription = Subscription<PresenceEvent>(bufferingPolicy: .unbounded)
let subscription = Subscription<PresenceEvent>(bufferingPolicy: bufferingPolicy)
for event in events {
channel.presence.subscribe(event.toARTPresenceAction()) { [processPresenceSubscribe, logger] message in
logger.log(message: "Received presence message: \(message)", level: .debug)
Expand All @@ -194,8 +194,8 @@ internal final class DefaultPresence: Presence, EmitsDiscontinuities {
}

// (CHA-PR8) Users may subscribe to discontinuity events to know when there’s been a break in presence. Their listener will be called when a discontinuity event is triggered from the room lifecycle. For presence, there shouldn’t need to be user action as the underlying core SDK will heal the presence set.
internal func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities()
internal func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities(bufferingPolicy: bufferingPolicy)
}

private func decodePresenceData(from data: Any?) -> PresenceData? {
Expand Down
4 changes: 2 additions & 2 deletions Sources/AblyChat/DefaultRoomLifecycleContributor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ internal actor DefaultRoomLifecycleContributor: RoomLifecycleContributor, EmitsD
}
}

internal func subscribeToDiscontinuities() -> Subscription<DiscontinuityEvent> {
let subscription = Subscription<DiscontinuityEvent>(bufferingPolicy: .unbounded)
internal func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) -> Subscription<DiscontinuityEvent> {
let subscription = Subscription<DiscontinuityEvent>(bufferingPolicy: bufferingPolicy)
// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
discontinuitySubscriptions.append(subscription)
return subscription
Expand Down
4 changes: 2 additions & 2 deletions Sources/AblyChat/DefaultRoomReactions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ internal final class DefaultRoomReactions: RoomReactions, EmitsDiscontinuities {
}

// (CHA-ER5) Users may subscribe to discontinuity events to know when there’s been a break in reactions that they need to resolve. Their listener will be called when a discontinuity event is triggered from the room lifecycle.
internal func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities()
internal func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities(bufferingPolicy: bufferingPolicy)
}

private enum RoomReactionsError: Error {
Expand Down
6 changes: 3 additions & 3 deletions Sources/AblyChat/DefaultTyping.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ internal final class DefaultTyping: Typing {

// (CHA-T6) Users may subscribe to typing events – updates to a set of clientIDs that are typing. This operation, like all subscription operations, has no side-effects in relation to room lifecycle.
internal func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription<TypingEvent> {
let subscription = Subscription<TypingEvent>(bufferingPolicy: .unbounded)
let subscription = Subscription<TypingEvent>(bufferingPolicy: bufferingPolicy)
let eventTracker = EventTracker()

channel.presence.subscribe { [weak self] message in
Expand Down Expand Up @@ -160,8 +160,8 @@ internal final class DefaultTyping: Typing {
}

// (CHA-T7) Users may subscribe to discontinuity events to know when there’s been a break in typing indicators. Their listener will be called when a discontinuity event is triggered from the room lifecycle. For typing, there shouldn’t need to be user action as the underlying core SDK will heal the presence set.
internal func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities()
internal func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent> {
await featureChannel.subscribeToDiscontinuities(bufferingPolicy: bufferingPolicy)
}

private func processPresenceGet(members: [ARTPresenceMessage]?, error: ARTErrorInfo?) throws -> Set<String> {
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/EmitsDiscontinuities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,15 @@ public struct DiscontinuityEvent: Sendable, Equatable {
}

public protocol EmitsDiscontinuities {
func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent>
/// Same as calling ``subscribeToDiscontinuities(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `EmitsDiscontinuities` protocol provides a default implementation of this method.
func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent>
}

public extension EmitsDiscontinuities {
func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent> {
await subscribeToDiscontinuities(bufferingPolicy: .unbounded)
}
}
10 changes: 10 additions & 0 deletions Sources/AblyChat/Messages.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,21 @@ import Ably

public protocol Messages: AnyObject, Sendable, EmitsDiscontinuities {
func subscribe(bufferingPolicy: BufferingPolicy) async throws -> MessageSubscription
/// Same as calling ``subscribe(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Messages` protocol provides a default implementation of this method.
func subscribe() async throws -> MessageSubscription
func get(options: QueryOptions) async throws -> any PaginatedResult<Message>
func send(params: SendMessageParams) async throws -> Message
var channel: RealtimeChannelProtocol { get }
}

public extension Messages {
func subscribe() async throws -> MessageSubscription {
try await subscribe(bufferingPolicy: .unbounded)
}
}

public struct SendMessageParams: Sendable {
public var text: String
public var metadata: MessageMetadata?
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/Occupancy.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@ import Ably

public protocol Occupancy: AnyObject, Sendable, EmitsDiscontinuities {
func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription<OccupancyEvent>
/// Same as calling ``subscribe(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Occupancy` protocol provides a default implementation of this method.
func subscribe() async -> Subscription<OccupancyEvent>
func get() async throws -> OccupancyEvent
var channel: RealtimeChannelProtocol { get }
}

public extension Occupancy {
func subscribe() async -> Subscription<OccupancyEvent> {
await subscribe(bufferingPolicy: .unbounded)
}
}

// (CHA-O2) The occupancy event format is shown here (https://sdk.ably.com/builds/ably/specification/main/chat-features/#chat-structs-occupancy-event)
public struct OccupancyEvent: Sendable, Encodable, Decodable {
public var connections: Int
Expand Down
18 changes: 18 additions & 0 deletions Sources/AblyChat/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,28 @@ public protocol Presence: AnyObject, Sendable, EmitsDiscontinuities {
func enter(data: PresenceData?) async throws
func update(data: PresenceData?) async throws
func leave(data: PresenceData?) async throws
func subscribe(event: PresenceEventType, bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent>
/// Same as calling ``subscribe(event:bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Presence` protocol provides a default implementation of this method.
func subscribe(event: PresenceEventType) async -> Subscription<PresenceEvent>
func subscribe(events: [PresenceEventType], bufferingPolicy: BufferingPolicy) async -> Subscription<PresenceEvent>
/// Same as calling ``subscribe(events:bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Presence` protocol provides a default implementation of this method.
func subscribe(events: [PresenceEventType]) async -> Subscription<PresenceEvent>
}

public extension Presence {
func subscribe(event: PresenceEventType) async -> Subscription<PresenceEvent> {
await subscribe(event: event, bufferingPolicy: .unbounded)
}

func subscribe(events: [PresenceEventType]) async -> Subscription<PresenceEvent> {
await subscribe(events: events, bufferingPolicy: .unbounded)
}
}

public struct PresenceMember: Sendable {
public enum Action: Sendable {
case present
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/Room.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,21 @@ public protocol Room: AnyObject, Sendable {
// TODO: change to `status`
var status: RoomStatus { get async }
func onStatusChange(bufferingPolicy: BufferingPolicy) async -> Subscription<RoomStatusChange>
/// Same as calling ``onStatusChange(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `Room` protocol provides a default implementation of this method.
func onStatusChange() async -> Subscription<RoomStatusChange>
func attach() async throws
func detach() async throws
var options: RoomOptions { get }
}

public extension Room {
func onStatusChange() async -> Subscription<RoomStatusChange> {
await onStatusChange(bufferingPolicy: .unbounded)
}
}

/// A ``Room`` that exposes additional functionality for use within the SDK.
internal protocol InternalRoom: Room {
func release() async
Expand Down
4 changes: 2 additions & 2 deletions Sources/AblyChat/RoomFeature.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ internal struct DefaultFeatureChannel: FeatureChannel {
internal var contributor: DefaultRoomLifecycleContributor
internal var roomLifecycleManager: RoomLifecycleManager

internal func subscribeToDiscontinuities() async -> Subscription<DiscontinuityEvent> {
await contributor.subscribeToDiscontinuities()
internal func subscribeToDiscontinuities(bufferingPolicy: BufferingPolicy) async -> Subscription<DiscontinuityEvent> {
await contributor.subscribeToDiscontinuities(bufferingPolicy: bufferingPolicy)
}

internal func waitToBeAbleToPerformPresenceOperations(requestedByFeature requester: RoomFeature) async throws(ARTErrorInfo) {
Expand Down
10 changes: 10 additions & 0 deletions Sources/AblyChat/RoomReactions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ public protocol RoomReactions: AnyObject, Sendable, EmitsDiscontinuities {
func send(params: SendReactionParams) async throws
var channel: RealtimeChannelProtocol { get }
func subscribe(bufferingPolicy: BufferingPolicy) async -> Subscription<Reaction>
/// Same as calling ``subscribe(bufferingPolicy:)`` with ``BufferingPolicy.unbounded``.
///
/// The `RoomReactions` protocol provides a default implementation of this method.
func subscribe() async -> Subscription<Reaction>
}

public extension RoomReactions {
func subscribe() async -> Subscription<Reaction> {
await subscribe(bufferingPolicy: .unbounded)
}
}

public struct SendReactionParams: Sendable {
Expand Down
Loading

0 comments on commit d6adda5

Please sign in to comment.