From 4ab0a44e9c3b6f4577bcd4529612c023526f0b4c Mon Sep 17 00:00:00 2001 From: Defxult <52070034+Defxult@users.noreply.github.com> Date: Mon, 16 Oct 2023 18:24:35 -0400 Subject: [PATCH] Additional gateway disconnect updates This also adds parameter `clearCache` to method `Bot.disconnect()` --- Sources/Discord/Discord.swift | 14 ++-- Sources/Discord/Gateway.swift | 154 +++++++++++++++++----------------- 2 files changed, 83 insertions(+), 85 deletions(-) diff --git a/Sources/Discord/Discord.swift b/Sources/Discord/Discord.swift index f84025d25..e2526b061 100644 --- a/Sources/Discord/Discord.swift +++ b/Sources/Discord/Discord.swift @@ -347,7 +347,7 @@ public class Bot { /// Connect to Discord. public func connect() { if !isConnected { - try! app.eventLoopGroup.any().makeFutureWithTask { + try! loop.makeFutureWithTask { try! await self.gw.startNewSession() }.wait() if let onceExecute { @@ -427,13 +427,15 @@ public class Bot { } } - /// Disconnect from Discord and clears the cache. - public func disconnect() { + /// Disconnect from Discord. + /// - Parameter clearCache: Whether to clear the cache once disconnected. + public func disconnect(clearCache: Bool = true) { if isConnected { - try! gw.ws.close(code: .normalClosure).wait() + isConnected = false // needs to be before ws.close() + gw.ws.close(code: .normalClosure, promise: nil) gw.resetGatewayValues(withCancel: true) - clearCache() - isConnected = false + gw.heartbeatTask = nil + if clearCache { self.clearCache() } } } diff --git a/Sources/Discord/Gateway.swift b/Sources/Discord/Gateway.swift index 1c09c3438..9625b457b 100644 --- a/Sources/Discord/Gateway.swift +++ b/Sources/Discord/Gateway.swift @@ -137,37 +137,29 @@ extension Array { } } -fileprivate enum HeartbeatResponse { - - /// Heartbeat was sent and ACK by Discord. - case acknowledged - - /// Heartbeat was sent but NOT ACK by Discord. - case noResponse - - /// Response was reset after a heartbeat was sent and ACK by Discord (or initial value at startup) - case reset +fileprivate enum ConnectionType { + case reconnect + case session } class Gateway { var ws: WebSocket! - var heartbeatTask: RepeatedTask? = nil + var heartbeatTask: RepeatedTask? var initialState: InitialState? private var bot: Bot private var wsResume: ResumePayload? private var gp: GatewayPayload? private var shards: Int? + private var clientHeartbearScheduler: Scheduled? private var heartbeatInterval = 0 - private var receivedReconnectRequest = false - private var nilClose = false - private var heartbeatAllowed = true - private var clientHeartbeatResponse = HeartbeatResponse.reset + private var idenitifyAllowed = true + private var connectionOpen = false private let elg: EventLoopGroup private let loop: EventLoop - private let danglingClose: UInt16 = 9900 + private let codeDelayedClose: UInt16 = 9900 private let settings = ( port: 443, query: "v=10&encoding=json", @@ -180,13 +172,6 @@ class Gateway { self.loop = elg.any() } - /// Close the connection with anything other than code 1000 or 1001 and reconnect. - private func danglingReconnect(log: String) async throws { - Log.message(log) - try await ws.close(code: .unknown(danglingClose)) - await reconnect() - } - /// Sends the RESUME payload for reconnects. private func reconnect() async { let resumePayload: JSON = [ @@ -221,6 +206,7 @@ class Gateway { private func websocketSetup(websocket: WebSocket) { ws = websocket receive() + connectionOpen = true ws.onClose.whenComplete { _ in try! self.websocketClosed() } } @@ -228,7 +214,7 @@ class Gateway { private func startHeartbeat(log: String) { Log.message(log) heartbeatTask?.cancel() - heartbeatTask = loop.scheduleRepeatedTask(initialDelay: .seconds(20), delay: .milliseconds(Int64(heartbeatInterval)), { _ in + heartbeatTask = loop.scheduleRepeatedTask(initialDelay: .seconds(10), delay: .milliseconds(Int64(heartbeatInterval)), { _ in self.sendHeartbeat() }) } @@ -242,19 +228,21 @@ class Gateway { sendFrame(payload) Log.message("heartbeat sent") - // Verify the heartbeat was ACK'd. Discord ACKs are pretty much instant - // so a 1.1 second delay should be more than enough. - loop.scheduleTask(in: .milliseconds(1100)) { [self] in - if clientHeartbeatResponse == .acknowledged { - clientHeartbeatResponse = .reset - Log.message("heartbeat ACK (client)") - } else { - clientHeartbeatResponse = .noResponse - Task { - try await danglingReconnect(log: "client did not receive HEARTBEAT ACK after a heartbeat was sent - attempting dangling reconnecting...") - } + let promise = loop.makePromise(of: Void.self) + let task = promise.completeWithTask { + await sleep(5000) + guard let _ = try? Task.checkCancellation() else { return } + // There's a chance the connection could be closed mid sleep, so only execute if it's open + if self.connectionOpen { + Log.message("client did not receive HEARTBEAT ACK after a heartbeat was sent - reconnecting...") + self.newConnection(type: .reconnect) } } + + clientHeartbearScheduler = .init(promise: promise, cancellationTask: { + task.cancel() + Log.message("heartbeat ACK (client)") + }) } private func suspendHeartbeat(log: String) { @@ -267,11 +255,22 @@ class Gateway { ws.send(data.serialize) } - /// Reset core gateway values so a proper reconnect can occur. + /// Open a new connection. Either a reconnect or a new session. + private func newConnection(type: ConnectionType) { + _ = loop.makeFutureWithTask { + switch type { + case .reconnect: + await self.reconnect() + case .session: + try! await self.startNewSession() + } + } + } + + /// Reset core gateway values so a proper reconnect/new session can occur. func resetGatewayValues(withCancel: Bool) { - receivedReconnectRequest = false - nilClose = false - clientHeartbeatResponse = .reset + idenitifyAllowed = true + connectionOpen = false if withCancel { heartbeatTask?.cancel() } @@ -304,15 +303,11 @@ class Gateway { ).get() } - /// Handles when the websocket is closed by Discord. + /// Handles when the websocket is closed. private func websocketClosed() throws { + connectionOpen = false suspendHeartbeat(log: "gateway disconnected") - func new(log: String) { - Log.message(log + " - starting new session...") - Task { try await startNewSession() } - } - if let code = ws.closeCode { switch code { case .unknown(let cc): @@ -331,12 +326,16 @@ class Gateway { throw GatewayError.alreadyAuthenticated("more than one identify payload was sent") case 4007: // GatewayError.invalidSequence can reconnect with a new session - new(log: "gateway error: Invalid sequence") + Log.message("gateway error: Invalid sequence - starting new session") + newConnection(type: .session) + return case 4008: throw GatewayError.rateLimited("too many payloads are being sent") case 4009: // GatewayError.sessionTimedOut can reconnect with a new session - new(log: "gateway error: Session timed out") + Log.message("gateway error: Session timed out - starting new session") + newConnection(type: .session) + return case 4010: throw GatewayError.invalidShard("an invalid shard was sent when identifying") case 4011: @@ -351,24 +350,29 @@ class Gateway { not enabled or are not approved for. Verify your privileged intents are enabled in your developer portal. """ throw GatewayError.disallowedIntents(message) - case danglingClose: - // Ignore dangling close code - break default: - Log.fatal("unhandled Discord websocket close code (\(cc))") + Log.fatal("unhandled websocket close code (\(cc))") } default: - break + Log.message("close code (\(code)) - ignoring") } + } + + if ws.closeCode == nil { + // A new session must be started here. If the connection is closed with a nil close code, + // vapor disconnected (for whatever reason). If that occurs, simply reconnecting somehow makes + // the stability of the connection even worse by having it disconnect randomly more frequently, + // sometimes significantly more frequent. Creating a brand new connection "refreshes" connection + // stability. + Log.message("connection closed with nil close code - starting new session...") + newConnection(type: .session) } else { - // This must be here because WebSocket.onClose() dispatches this method on every Discord reconnect request - if bot.isConnected && !receivedReconnectRequest { - nilClose = true - Task { - try await self.danglingReconnect(log: "gateway disconnected with nil close code - bot connected/no reconnect request - attempting dangling reconnect...") - } - } else { - Log.fatal("gateway disconnected with nil close code - unknown/unhandled reason") + // If the end user did not terminate the connection, we should always reconnect. + // Also, if this is reached, that means it did not `return`. If it returned, a new + // session was required, not a reconnect. Additionally, .isConnected will never be + // set to `false` unless done so by the end user via Bot.disconnect() or Bot.close() + if bot.isConnected { + newConnection(type: .reconnect) } } } @@ -401,15 +405,11 @@ class Gateway { } case Opcode.hello: - let reset = { [self] (log: String) -> Void in - Log.message(log) - resetGatewayValues(withCancel: false) - startHeartbeat(log: "heartbeat reset after reconnect") + guard idenitifyAllowed else { + Log.message("received HELLO when IDENTIFY is not allowed - ignoring") + startHeartbeat(log: "heartbeat resumed from previous disconnect") + return } - // Discord states there's no need to re-identify after a RECONNECT - if receivedReconnectRequest { reset("received HELLO from RECONNECT REQUEST - ignoring"); return } - else if nilClose { reset("received HELLO from NIL CLOSE RECONNECT - ignoring"); return } - else if clientHeartbeatResponse == .noResponse { reset("received HELLO from FAILED HEARTBEAT ACK RECONNECT - ignoring"); return } Log.message("received HELLO - identifying...") let opHello = HTTPClient.strJsonToDict(message) @@ -437,16 +437,14 @@ class Gateway { ] sendFrame(identity) + idenitifyAllowed = false startHeartbeat(log: "starting INDEFINITE HEARTBEAT from IDENTIFY") case Opcode.invalidSession: - Log.message("gateway SESSION INVALIDATED - starting new session...") - Task { try await startNewSession() } + Log.message("gateway SESSION INVALIDATED - waiting for disconnect and automatic new session...") case Opcode.reconnect: - Log.message("gateway requested RECONNECT - reconnecting...") - receivedReconnectRequest = true - Task { await reconnect() } + Log.message("gateway requested RECONNECT - waiting for disconnect and automatic reconnect...") case Opcode.heartbeat: Log.message("gateway requested HEARTBEAT - sending heartbeat...") @@ -454,7 +452,8 @@ class Gateway { case Opcode.heartbeatAck: Log.message("heartbeat ACK") - clientHeartbeatResponse = .acknowledged + clientHeartbearScheduler?.cancel() + clientHeartbearScheduler = nil default: break @@ -516,13 +515,10 @@ class Gateway { Log.message("gateway resumed") case .reconnect: - Log.message("received RECONNECT via DISPATCH - reconnecting...") - receivedReconnectRequest = true - await reconnect() + Log.message("received RECONNECT via DISPATCH - waiting for disconnect and automatic reconnect...") case .invalidSession: - Log.message("received SESSION INVALIDATED via DISPATCH - starting new session...") - try! await startNewSession() + Log.message("received SESSION INVALIDATED via DISPATCH - waiting for disconnect and automatic new session...") case .applicationCommandPermissionsUpdate: let permissions = GuildApplicationCommandPermissions(guildAppCommandPermData: data)