Skip to content

Commit

Permalink
fix(api): storing cancelablles with actor methods in AppSyncRTC
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed Jul 29, 2024
1 parent b3fe944 commit d130d12
Showing 1 changed file with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,15 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
// Placing the actual subscription work in a deferred task and
// promptly returning the filtered publisher for downstream consumption of all error messages.
defer {
Task { [weak self] in
let task = Task { [weak self] in
guard let self = self else { return }
if !(await self.isConnected) {
try await connect()
try await waitForState(.connected)
}
await self.bindCancellableToConnection(try await self.startSubscription(id))
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
await self.storeInConnectionCancellables(try await self.startSubscription(id))
}
self.storeInConnectionCancellables(task.toAnyCancellable)
}

return filterAppSyncSubscriptionEvent(with: id)
Expand Down Expand Up @@ -236,24 +237,29 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
}

private func subscribeToWebSocketEvent() async {
await self.webSocketClient.publisher.sink { [weak self] _ in
let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in
self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
} receiveValue: { webSocketEvent in
Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}
await self?.storeInCancellables(task.toAnyCancellable)
}
}
.store(in: &cancellables)
self.storeInCancellables(cancellable)
}

private func resumeExistingSubscriptions() {
log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions")
for (id, _) in self.subscriptions {
Task {
Task { [weak self] in
do {
try await self.startSubscription(id).store(in: &cancellablesBindToConnection)
if let cancellable = try await self?.startSubscription(id) {
await self?.storeInConnectionCancellables(cancellable)
}
} catch {
log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))")
}
}
}
Expand Down Expand Up @@ -340,10 +346,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:))
}

private func bindCancellableToConnection(_ cancellable: AnyCancellable) {
cancellable.store(in: &cancellablesBindToConnection)
}

}

// MARK: - On WebSocket Events
Expand All @@ -356,8 +358,11 @@ extension AppSyncRealTimeClient {
if self.state.value == .connectionDropped {
log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop")
Task { [weak self] in
try? await self?.connect()
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
let task = Task { [weak self] in
try? await self?.connect()
}
await self?.storeInConnectionCancellables(task.toAnyCancellable)
}
}

case let .disconnected(closeCode, reason): //
Expand Down Expand Up @@ -418,21 +423,34 @@ extension AppSyncRealTimeClient {
private func monitorHeartBeats(_ connectionAck: JSONValue?) {
let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0
log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms")
heartBeats.eraseToAnyPublisher()
let cancellable = heartBeats.eraseToAnyPublisher()
.debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global())
.first()
.sink(receiveValue: {
self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
.sink(receiveValue: { [weak self] in
Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
Task { [weak self] in
await self?.reconnect()
}.toAnyCancellable.store(in: &self.cancellables)
let task = Task { [weak self] in
await self?.reconnect()
}
await self?.storeInCancellables(task.toAnyCancellable)
}
})
.store(in: &cancellablesBindToConnection)
self.storeInConnectionCancellables(cancellable)
// start counting down
heartBeats.send(())
}
}

extension AppSyncRealTimeClient {
private func storeInCancellables(_ cancellable: AnyCancellable) {
self.cancellables.insert(cancellable)
}

private func storeInConnectionCancellables(_ cancellable: AnyCancellable) {
self.cancellablesBindToConnection.insert(cancellable)
}
}

extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never {
func toAppSyncSubscriptionEventStream() -> AnyPublisher<AppSyncSubscriptionEvent, Never> {
self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in
Expand Down

0 comments on commit d130d12

Please sign in to comment.