Skip to content

Commit

Permalink
fix(api): potential fix for crash in AppSyncRealTimeClient actor
Browse files Browse the repository at this point in the history
  • Loading branch information
5d committed Jun 14, 2024
1 parent b9a7baa commit cb8ce68
Showing 1 changed file with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,14 @@ actor AppSyncRealTimeClient: AppSyncRealTimeClientProtocol {
}

private func subscribeToWebSocketEvent() async {
await self.webSocketClient.publisher.sink { [weak self] _ in
self?.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
} receiveValue: { webSocketEvent in
Task { [weak self] in
await self.webSocketClient.publisher
.flatMap { [weak self] webSocketEvent in Future { [weak self] in
await self?.onWebSocketEvent(webSocketEvent)
}.toAnyCancellable.store(in: &self.cancellables)
}
.store(in: &cancellables)
}}
.sink(receiveCompletion: { _ in
Self.log.debug("[AppSyncRealTimeClient] WebSocketClient terminated")
}, receiveValue: { _ in })
.store(in: &cancellables)
}

private func resumeExistingSubscriptions() {
Expand Down Expand Up @@ -356,7 +356,11 @@ extension AppSyncRealTimeClient {
if self.state.value == .connectionDropped {
log.debug("[AppSyncRealTimeClient] reconnecting appSyncClient after connection drop")
Task { [weak self] in
try? await self?.connect()
do {
try await self?.connect()
} catch {
Self.log.debug("[AppSyncRealTimeClient] failed to connect due to error \(error)")
}
}.toAnyCancellable.store(in: &cancellablesBindToConnection)
}

Expand Down Expand Up @@ -421,12 +425,11 @@ extension AppSyncRealTimeClient {
heartBeats.eraseToAnyPublisher()
.debounce(for: .milliseconds(timeoutMs), scheduler: DispatchQueue.global())
.first()
.sink(receiveValue: {
self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, disconnecting")
Task { [weak self] in
await self?.reconnect()
}.toAnyCancellable.store(in: &self.cancellables)
})
.flatMap { [weak self] in Future { [weak self] in
Self.log.debug("[AppSyncRealTimeClient] KeepAlive timed out, reconnecting")
await self?.reconnect()
}}
.sink(receiveValue: { })
.store(in: &cancellablesBindToConnection)
// start counting down
heartBeats.send(())
Expand Down Expand Up @@ -476,3 +479,13 @@ fileprivate extension Task {
}
}
}

fileprivate extension Future where Failure == Never {
convenience init(asyncTask: @escaping () async -> Output) {
self.init { promise in
Task {
promise(.success(await asyncTask()))
}
}
}
}

0 comments on commit cb8ce68

Please sign in to comment.