Skip to content

Commit

Permalink
Merge pull request #171 from ably/152-implement-async-room-get-spec
Browse files Browse the repository at this point in the history
[ECO-5148] Implement “async room get” spec points
  • Loading branch information
lawrence-forooghian authored Dec 4, 2024
2 parents ed4c028 + e7612c9 commit 5ea07db
Show file tree
Hide file tree
Showing 5 changed files with 564 additions and 47 deletions.
14 changes: 13 additions & 1 deletion Sources/AblyChat/Errors.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public enum ErrorCode: Int {
case roomIsReleasing = 102_102
case roomIsReleased = 102_103

case roomReleasedBeforeOperationCompleted = 102_106

case roomInInvalidState = 102_107

/// Has a case for each of the ``ErrorCode`` cases that imply a fixed status code.
Expand All @@ -50,6 +52,7 @@ public enum ErrorCode: Int {
case roomInFailedState
case roomIsReleasing
case roomIsReleased
case roomReleasedBeforeOperationCompleted

internal var toNumericErrorCode: ErrorCode {
switch self {
Expand Down Expand Up @@ -81,6 +84,8 @@ public enum ErrorCode: Int {
.roomIsReleasing
case .roomIsReleased:
.roomIsReleased
case .roomReleasedBeforeOperationCompleted:
.roomReleasedBeforeOperationCompleted
}
}

Expand All @@ -91,7 +96,8 @@ public enum ErrorCode: Int {
case .inconsistentRoomOptions,
.roomInFailedState,
.roomIsReleasing,
.roomIsReleased:
.roomIsReleased,
.roomReleasedBeforeOperationCompleted:
400
case
.messagesAttachmentFailed,
Expand Down Expand Up @@ -162,6 +168,7 @@ internal enum ChatError {
case roomInFailedState
case roomIsReleasing
case roomIsReleased
case roomReleasedBeforeOperationCompleted
case presenceOperationRequiresRoomAttach(feature: RoomFeature)
case roomTransitionedToInvalidStateForPresenceOperation(cause: ARTErrorInfo?)

Expand Down Expand Up @@ -201,6 +208,8 @@ internal enum ChatError {
.fixedStatusCode(.roomIsReleasing)
case .roomIsReleased:
.fixedStatusCode(.roomIsReleased)
case .roomReleasedBeforeOperationCompleted:
.fixedStatusCode(.roomReleasedBeforeOperationCompleted)
case .roomTransitionedToInvalidStateForPresenceOperation:
// CHA-RL9c
.variableStatusCode(.roomInInvalidState, statusCode: 500)
Expand Down Expand Up @@ -260,6 +269,8 @@ internal enum ChatError {
"Cannot perform operation because the room is in a releasing state."
case .roomIsReleased:
"Cannot perform operation because the room is in a released state."
case .roomReleasedBeforeOperationCompleted:
"Room was released before the operation could complete."
case let .presenceOperationRequiresRoomAttach(feature):
"To perform this \(Self.descriptionOfFeature(feature)) operation, you must first attach the room."
case .roomTransitionedToInvalidStateForPresenceOperation:
Expand All @@ -280,6 +291,7 @@ internal enum ChatError {
.roomInFailedState,
.roomIsReleasing,
.roomIsReleased,
.roomReleasedBeforeOperationCompleted,
.presenceOperationRequiresRoomAttach:
nil
}
Expand Down
272 changes: 251 additions & 21 deletions Sources/AblyChat/Rooms.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,55 @@ internal actor DefaultRooms<RoomFactory: AblyChat.RoomFactory>: Rooms {
private let logger: InternalLogger
private let roomFactory: RoomFactory

/// The set of rooms, keyed by room ID.
private var rooms: [String: RoomFactory.Room] = [:]
/// All the state that a `DefaultRooms` instance might hold for a given room ID.
private enum RoomState {
/// There is no room map entry (see ``RoomMapEntry`` for meaning of this term) for this room ID, but a CHA-RC1g release operation is in progress.
case releaseOperationInProgress(releaseTask: Task<Void, Never>)

/// There is a room map entry for this room ID.
case roomMapEntry(RoomMapEntry)
}

/// An entry in the “room map” that CHA-RC1f and CHA-RC1g refer to.
private enum RoomMapEntry {
/// The room has been requested, but is awaiting the completion of a CHA-RC1g release operation.
case requestAwaitingRelease(
// A task which provides the result of the pending release operation.
releaseTask: Task<Void, Never>,
// The options with which the room was requested.
requestedOptions: RoomOptions,
// A task that will return the result of this room fetch request.
creationTask: Task<RoomFactory.Room, Error>,
// Calling this function will cause `creationTask` to fail with the given error.
failCreation: @Sendable (Error) -> Void
)

/// The room has been created.
case created(room: RoomFactory.Room)

/// The room options that correspond to this room map entry (either the options that were passed to the pending room fetch request, or the options of the created room).
var roomOptions: RoomOptions {
switch self {
case let .requestAwaitingRelease(_, requestedOptions: options, _, _):
options
case let .created(room):
room.options
}
}

/// Returns the room which this room map entry corresponds to. If the room map entry represents a pending request, it will return or throw with the result of this request.
func waitForRoom() async throws -> RoomFactory.Room {
switch self {
case let .requestAwaitingRelease(_, _, creationTask: creationTask, _):
try await creationTask.value
case let .created(room):
room
}
}
}

/// The value for a given room ID is the state that corresponds to that room ID.
private var roomStates: [String: RoomState] = [:]

internal init(realtime: RealtimeClient, clientOptions: ClientOptions, logger: InternalLogger, roomFactory: RoomFactory) {
self.realtime = realtime
Expand All @@ -32,39 +79,222 @@ internal actor DefaultRooms<RoomFactory: AblyChat.RoomFactory>: Rooms {
chatAPI = ChatAPI(realtime: realtime)
}

/// The types of operation that this instance can perform.
internal enum OperationType {
/// A call to ``get(roomID:options:)``.
case get
/// A call to ``release(roomID:)``.
case release
}

#if DEBUG
internal struct OperationWaitEvent {
internal var waitingOperationType: OperationType
internal var waitedOperationType: OperationType
}

// TODO: clean up old subscriptions (https://github.com/ably-labs/ably-chat-swift/issues/36)
/// Supports the ``testsOnly_subscribeToOperationWaitEvents()`` method.
private var operationWaitEventSubscriptions: [Subscription<OperationWaitEvent>] = []

/// Returns a subscription which emits an event each time one operation is going to wait for another to complete.
internal func testsOnly_subscribeToOperationWaitEvents() -> Subscription<OperationWaitEvent> {
let subscription = Subscription<OperationWaitEvent>(bufferingPolicy: .unbounded)
operationWaitEventSubscriptions.append(subscription)
return subscription
}

private func emitOperationWaitEvent(waitingOperationType: OperationType, waitedOperationType: OperationType) {
let operationWaitEvent = OperationWaitEvent(waitingOperationType: waitingOperationType, waitedOperationType: waitedOperationType)
for subscription in operationWaitEventSubscriptions {
subscription.emit(operationWaitEvent)
}
}
#endif

internal func get(roomID: String, options: RoomOptions) async throws -> any Room {
// CHA-RC1b
if let existingRoom = rooms[roomID] {
if existingRoom.options != options {
throw ARTErrorInfo(
chatError: .inconsistentRoomOptions(requested: options, existing: existingRoom.options)
if let existingRoomState = roomStates[roomID] {
switch existingRoomState {
case let .roomMapEntry(existingRoomMapEntry):
// CHA-RC1f1
if existingRoomMapEntry.roomOptions != options {
throw ARTErrorInfo(
chatError: .inconsistentRoomOptions(requested: options, existing: existingRoomMapEntry.roomOptions)
)
}

// CHA-RC1f2
logger.log(message: "Waiting for room from existing room map entry \(existingRoomMapEntry)", level: .debug)

#if DEBUG
emitOperationWaitEvent(waitingOperationType: .get, waitedOperationType: .get)
#endif

do {
let room = try await existingRoomMapEntry.waitForRoom()
logger.log(message: "Completed waiting for room from existing room map entry \(existingRoomMapEntry)", level: .debug)
return room
} catch {
logger.log(message: "Got error \(error) waiting for room from existing room map entry \(existingRoomMapEntry)", level: .debug)
throw error
}
case let .releaseOperationInProgress(releaseTask: releaseTask):
let creationFailureFunctions = makeCreationFailureFunctions()

let creationTask = Task {
logger.log(message: "At start of room creation task", level: .debug)

// We wait for the first of the following events:
//
// - a creation failure is externally signalled, in which case we throw the corresponding error
// - the in-progress release operation completes
try await withThrowingTaskGroup(of: Void.self) { group in
group.addTask {
try await creationFailureFunctions.throwAnySignalledCreationFailure()
}

group.addTask { [logger] in
// This task is rather messy but its aim can be summarised as the following:
//
// - if releaseTask completes, then complete
// - if the task is cancelled, then do not propagate the cancellation to releaseTask (because we haven’t properly thought through whether it can handle task cancellation; see existing TODO: https://github.com/ably/ably-chat-swift/issues/29), and do not wait for releaseTask to complete (because the CHA-RC1g4 failure is meant to happen immediately, not only once the release operation completes)

logger.log(message: "Room creation waiting for completion of release operation", level: .debug)
#if DEBUG
await self.emitOperationWaitEvent(waitingOperationType: .get, waitedOperationType: .release)
#endif

let (stream, continuation) = AsyncStream<Void>.makeStream()
Task.detached { // detached so as not to propagate task cancellation
// CHA-RC1f4
await releaseTask.value
continuation.yield(())
continuation.finish()
}

if await (stream.contains { _ in true }) {
logger.log(message: "Room creation completed waiting for completion of release operation", level: .debug)
} else {
// Task was cancelled
logger.log(message: "Room creation stopped waiting for completion of release operation", level: .debug)
}
}

// This pattern for waiting for the first of multiple tasks to complete is taken from here:
// https://forums.swift.org/t/accept-the-first-task-to-complete/54386
defer { group.cancelAll() }
try await group.next()
}

return try await createRoom(roomID: roomID, options: options)
}

roomStates[roomID] = .roomMapEntry(
.requestAwaitingRelease(
releaseTask: releaseTask,
requestedOptions: options,
creationTask: creationTask,
failCreation: creationFailureFunctions.failCreation
)
)
}

return existingRoom
} else {
let room = try await roomFactory.createRoom(realtime: realtime, chatAPI: chatAPI, roomID: roomID, options: options, logger: logger)
rooms[roomID] = room
return room
return try await creationTask.value
}
}

// CHA-RC1f3
return try await createRoom(roomID: roomID, options: options)
}

/// Creates two functions, `failCreation` and `throwAnySignalledCreationFailure`. The latter is an async function that waits until the former is called with an error as an argument; it then throws this error.
private func makeCreationFailureFunctions() -> (failCreation: @Sendable (Error) -> Void, throwAnySignalledCreationFailure: @Sendable () async throws -> Void) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Void.self, throwing: Error.self)

return (
failCreation: { @Sendable [logger] (error: Error) in
logger.log(message: "Recieved request to fail room creation with error \(error)", level: .debug)
continuation.finish(throwing: error)
},
throwAnySignalledCreationFailure: { @Sendable [logger] in
logger.log(message: "Waiting for room creation failure request", level: .debug)
do {
try await stream.first { _ in true }
} catch {
logger.log(message: "Wait for room creation failure request gave error \(error)", level: .debug)
throw error
}
logger.log(message: "Wait for room creation failure request completed without error", level: .debug)
}
)
}

private func waitForOperation(_ operationTask: Task<Void, Never>, waitingOperationType: OperationType, waitedOperationType: OperationType) async {
logger.log(message: "\(waitingOperationType) operation waiting for in-progress \(waitedOperationType) operation to complete", level: .debug)
#if DEBUG
emitOperationWaitEvent(waitingOperationType: waitingOperationType, waitedOperationType: waitedOperationType)
#endif
await operationTask.value
logger.log(message: "\(waitingOperationType) operation completed waiting for in-progress \(waitedOperationType) operation to complete", level: .debug)
}

private func createRoom(roomID: String, options: RoomOptions) async throws -> RoomFactory.Room {
logger.log(message: "Creating room with ID \(roomID), options \(options)", level: .debug)
let room = try await roomFactory.createRoom(realtime: realtime, chatAPI: chatAPI, roomID: roomID, options: options, logger: logger)
roomStates[roomID] = .roomMapEntry(.created(room: room))
return room
}

#if DEBUG
internal func testsOnly_hasExistingRoomWithID(_ roomID: String) -> Bool {
rooms[roomID] != nil
internal func testsOnly_hasRoomMapEntryWithID(_ roomID: String) -> Bool {
guard let roomState = roomStates[roomID] else {
return false
}

return if case .roomMapEntry = roomState {
true
} else {
false
}
}
#endif

internal func release(roomID: String) async throws {
guard let room = rooms[roomID] else {
// TODO: what to do here? (https://github.com/ably/specification/pull/200/files#r1837154563) — Andy replied that it’s a no-op but that this is going to be specified in an upcoming PR when we make room-getting async
guard let roomState = roomStates[roomID] else {
// CHA-RC1g2 (no-op)
return
}

// CHA-RC1d
rooms.removeValue(forKey: roomID)
switch roomState {
case let .releaseOperationInProgress(releaseTask):
// CHA-RC1g3
await waitForOperation(releaseTask, waitingOperationType: .release, waitedOperationType: .release)
case let .roomMapEntry(
.requestAwaitingRelease(
releaseTask: releaseTask,
_,
_,
failCreation: failCreation
)
):
// CHA-RC1g4
logger.log(message: "Release operation requesting failure of in-progress room creation request", level: .debug)
failCreation(ARTErrorInfo(chatError: .roomReleasedBeforeOperationCompleted))
await waitForOperation(releaseTask, waitingOperationType: .release, waitedOperationType: .release)
case let .roomMapEntry(.created(room: room)):
let releaseTask = Task {
logger.log(message: "Release operation waiting for room release operation to complete", level: .debug)
// Clear the `.releaseOperationInProgress` state (written in a `defer` in case `room.release()` becomes throwing in the future)
defer { roomStates.removeValue(forKey: roomID) }
await room.release()
logger.log(message: "Release operation completed waiting for room release operation to complete", level: .debug)
}

// Note that, since we’re in an actor, we expect `releaseTask` to always be executed _after_ this synchronous code section, meaning that the `roomStates` mutations happen in the correct order

// This also achieves CHA-RC1g5 (remove room from room map)
roomStates[roomID] = .releaseOperationInProgress(releaseTask: releaseTask)

// CHA-RL1e
await room.release()
await releaseTask.value
}
}
}
Loading

0 comments on commit 5ea07db

Please sign in to comment.