diff --git a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift index 9e70c658a5..1b1b1c1e92 100644 --- a/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift +++ b/AmplifyPlugins/API/Sources/AWSAPIPlugin/AppSyncRealTimeClient/AppSyncRealTimeClient.swift @@ -236,14 +236,14 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol { } private func subscribeToWebSocketEvent() async { - await self.webSocketClient.publisher.sink { [weak self] _ in - self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated") - } receiveValue: { webSocketEvent in - Task { [weak self] in + await self.webSocketClient.publisher + .flatMap { [weak self] webSocketEvent in Future { [weak self] in await self?.onWebSocketEvent(webSocketEvent) - }.toAnyCancellable.store(in: &self.cancellables) - } - .store(in: &cancellables) + }} + .sink(receiveCompletion: { _ in + Self.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated") + }, receiveValue: { _ in }) + .store(in: &cancellables) } private func resumeExistingSubscriptions() { @@ -356,7 +356,11 @@ extension AppSyncRealTimeClient { if self.state.value == .connectionDropped { log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop") Task { [weak self] in - try? await self?.connect() + do { + try await self?.connect() + } catch { + Self.log.debug("[AppSyncRealTimeClient] failed to connect due to error \(error)") + } }.toAnyCancellable.store(in: &cancellablesBindToConnection) } @@ -421,12 +425,11 @@ extension AppSyncRealTimeClient { heartBeats.eraseToAnyPublisher() .debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global()) .first() - .sink(receiveValue: { - self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting") - Task { [weak self] in - await self?.reconnect() - }.toAnyCancellable.store(in: &self.cancellables) - }) + .flatMap { [weak self] in Future { [weak self] in + Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, reconnecting") + await self?.reconnect() + }} + .sink(receiveValue: { }) .store(in: &cancellablesBindToConnection) // start counting down heartBeats.send(()) @@ -476,3 +479,13 @@ fileprivate extension Task { } } } + +fileprivate extension Future where Failure == Never { + convenience init(asyncTask: @escaping () async -> Output) { + self.init { promise in + Task { + promise(.success(await asyncTask())) + } + } + } +}