diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift index 9e70c658a5..670f130f70 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift @@ -165,14 +165,15 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { // Placing the actual subscription work in a deferred task and // promptly returning the filtered publisher for downstream consumption of all error messages. defer { - Task { [weak self] in + let task = Task { [weak self] in guard let self = self else { return } if !(await self.isConnected) { try await connect() try await waitForState(.connected) } - await self.bindCancellableToConnection(try await self.startSubscription(id)) - }.toAnyCancellable.store(in: &cancellablesBindToConnection) + await self.storeInConnectionCancellables(try await self.startSubscription(id)) + } + self.storeInConnectionCancellables(task.toAnyCancellable) } return filterAppSyncSubscriptionEvent(with: id) @@ -236,24 +237,29 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { } private func subscribeToWebSocketEvent() async { - await self.webSocketClient.publisher.sink { [weak self] _ in + let cancellable = await self.webSocketClient.publisher.sink { [weak self] _ in self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated") } receiveValue: { webSocketEvent in Task { [weak self] in - await self?.onWebSocketEvent(webSocketEvent) - }.toAnyCancellable.store(in: &self.cancellables) + let task = Task { [weak self] in + await self?.onWebSocketEvent(webSocketEvent) + } + await self?.storeInCancellables(task.toAnyCancellable) + } } - .store(in: &cancellables) + self.storeInCancellables(cancellable) } private func resumeExistingSubscriptions() { log.debug("[AppSyncRealTimeClient] Resuming existing subscriptions") for (id, _) in self.subscriptions { - Task { + Task { [weak self] in do { - try await self.startSubscription(id).store(in: &cancellablesBindToConnection) + if let cancellable = try await self?.startSubscription(id) { + await self?.storeInConnectionCancellables(cancellable) + } } catch { - log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))") + Self.log.debug("[AppSyncRealTimeClient] Failed to resume existing subscription with id: (\(id))") } } } @@ -340,10 +346,6 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { return errors.compactMap(AppSyncRealTimeRequest.parseResponseError(error:)) } - private func bindCancellableToConnection(_ cancellable: AnyCancellable) { - cancellable.store(in: &cancellablesBindToConnection) - } - } // MARK: - On WebSocket Events @@ -356,8 +358,11 @@ extension AppSyncRealTimeClient { if self.state.value == .connectionDropped { log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop") Task { [weak self] in - try? await self?.connect() - }.toAnyCancellable.store(in: &cancellablesBindToConnection) + let task = Task { [weak self] in + try? await self?.connect() + } + await self?.storeInConnectionCancellables(task.toAnyCancellable) + } } case let .disconnected(closeCode, reason): // @@ -418,21 +423,34 @@ extension AppSyncRealTimeClient { private func monitorHeartBeats(_ connectionAck: JSONValue?) { let timeoutMs = connectionAck?.connectionTimeoutMs?.intValue ?? 0 log.debug("[AppSyncRealTimeClient] Starting heart beat monitor with interval \(timeoutMs) ms") - heartBeats.eraseToAnyPublisher() + let cancellable = heartBeats.eraseToAnyPublisher() .debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global()) .first() - .sink(receiveValue: { - self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting") + .sink(receiveValue: { [weak self] in + Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting") Task { [weak self] in - await self?.reconnect() - }.toAnyCancellable.store(in: &self.cancellables) + let task = Task { [weak self] in + await self?.reconnect() + } + await self?.storeInCancellables(task.toAnyCancellable) + } }) - .store(in: &cancellablesBindToConnection) + self.storeInConnectionCancellables(cancellable) // start counting down heartBeats.send(()) } } +extension AppSyncRealTimeClient { + private func storeInCancellables(_ cancellable: AnyCancellable) { + self.cancellables.insert(cancellable) + } + + private func storeInConnectionCancellables(_ cancellable: AnyCancellable) { + self.cancellablesBindToConnection.insert(cancellable) + } +} + extension Publisher where Output == AppSyncRealTimeSubscription.State, Failure == Never { func toAppSyncSubscriptionEventStream() -> AnyPublisher { self.compactMap { subscriptionState -> AppSyncSubscriptionEvent? in