diff --git a/Sources/Discord/Discord.swift b/Sources/Discord/Discord.swift index 5ca9815c0..88798292b 100644 --- a/Sources/Discord/Discord.swift +++ b/Sources/Discord/Discord.swift @@ -348,7 +348,6 @@ public class Bot { try! app.eventLoopGroup.any().makeFutureWithTask { try! await self.gw.startNewSession() }.wait() - isConnected = true if let onceExecute { Task { await onceExecute() } self.onceExecute = nil @@ -419,7 +418,7 @@ public class Bot { public func disconnect() { if isConnected { try! gw.ws.close(code: .normalClosure).wait() - gw.resetGatewayValues() + gw.resetGatewayValues(withCancel: true) clearCache() isConnected = false } diff --git a/Sources/Discord/Gateway.swift b/Sources/Discord/Gateway.swift index 7a6d3510c..754eb9e9f 100644 --- a/Sources/Discord/Gateway.swift +++ b/Sources/Discord/Gateway.swift @@ -148,8 +148,16 @@ class Gateway { private var shards: Int? private var heartbeatInterval = 0 private var receivedReconnectRequest = false + private var nilClose = false + + // Client heartbeat ACKs have 3 values: + // n > 0 heartbeat was sent and ACK'd by Discord + // n == -1 client heartbeat ACK was reset after a successful heartbeat ACK (or initial value at startup) + // n == 0 heartbeat ACK was NOT sent by Discord after a heartbeat was sent + private var clientHeartbeatAcks = -1 private let elg: EventLoopGroup + private let danglingClose: UInt16 = 9900 private let settings = ( port: 443, query: "v=10&encoding=json", @@ -161,6 +169,13 @@ class Gateway { self.elg = elg } + /// Close the connection with anything other than code 1000 or 1001 and reconnect. + private func danglingReconnect(log: String) async throws { + Log.message(log) + try await ws.close(code: .unknown(danglingClose)) + await reconnect() + } + /// Sends the RESUME payload for reconnects. private func reconnect() async { let resumePayload: JSON = [ @@ -219,6 +234,20 @@ class Gateway { "d": wsResume == nil ? 
NIL : wsResume!.sequence ] sendFrame(payload) + + // Verify the heartbeat was ACK'd. Discord ACKs are pretty much instant + // so a 1.1 second delay should be more than enough. + elg.any().scheduleTask(in: .milliseconds(1100)) { + if self.clientHeartbeatAcks > 0 { + self.clientHeartbeatAcks = -1 + Log.message("heartbeat ACK (client)") + } else { + self.clientHeartbeatAcks = 0 + Task { + try await self.danglingReconnect(log: "client did not receive HEARTBEAT ACK after a heartbeat was sent - attempting dangling reconnecting...") + } + } + } } /// Shortcut for sending data (as `ByteBuffer`) through the websocket. @@ -227,9 +256,13 @@ class Gateway { } /// Reset core gateway values so a proper reconnect can occur. - func resetGatewayValues() { + func resetGatewayValues(withCancel: Bool) { receivedReconnectRequest = false - heartbeatTask?.cancel() + nilClose = false + clientHeartbeatAcks = -1 + if withCancel { + heartbeatTask?.cancel() + } } /// Simply splits the scheme and host from the standard get gateway bot/resume URLs. @@ -242,9 +275,9 @@ class Gateway { let results = try await bot.http.getGatewayBot() let cmps = extractURLComponents(results.info) - resetGatewayValues() + resetGatewayValues(withCancel: true) shards = results.shards - + try await WebSocket.connect( scheme: cmps.scheme, host: cmps.host, @@ -260,65 +293,75 @@ class Gateway { } /// Handles when the websocket is closed by Discord. - func websocketClosed() throws { + private func websocketClosed() throws { + + func new(log: String) { + Log.message(log + " - starting new session...") + Task { try await startNewSession() } + } + if let code = ws.closeCode { - let end = " - starting new session..." 
switch code { case .unknown(let cc): switch cc { case 4000: - throw GatewayError.unknownError("Something went wrong") + throw GatewayError.unknownError("something went wrong") case 4001: - throw GatewayError.unknownOpcode("An invalid opcode was sent") + throw GatewayError.unknownOpcode("an invalid opcode was sent") case 4002: - throw GatewayError.decodeError("An invalid payload was sent") + throw GatewayError.decodeError("an invalid payload was sent") case 4003: - throw GatewayError.notAuthenticated("A payload was sent prior to identifying") + throw GatewayError.notAuthenticated("a payload was sent prior to identifying") case 4004: - throw GatewayError.authenticationFailed("The token sent with the identify payload was incorrect") + throw GatewayError.authenticationFailed("the token sent with the identify payload was incorrect") case 4005: - throw GatewayError.alreadyAuthenticated("More than one identify payload was sent") + throw GatewayError.alreadyAuthenticated("more than one identify payload was sent") case 4007: // GatewayError.invalidSequence can reconnect with a new session - Log.message("gateway error: Invalid sequence" + end) - Task { try await self.startNewSession() } + new(log: "gateway error: Invalid sequence") case 4008: - throw GatewayError.rateLimited("Too many payloads are being sent") + throw GatewayError.rateLimited("too many payloads are being sent") case 4009: // GatewayError.sessionTimedOut can reconnect with a new session - Log.message("gateway error: Session timed out" + end) - Task { try await self.startNewSession() } + new(log: "gateway error: Session timed out") case 4010: - throw GatewayError.invalidShard("An invalid shard was sent when identifying") + throw GatewayError.invalidShard("an invalid shard was sent when identifying") case 4011: - throw GatewayError.shardingRequired("Sharding your connection is required in order to connect") + throw GatewayError.shardingRequired("sharding your connection is required in order to connect") case 
4012: - throw GatewayError.invalidApiVersion("An invalid version of the gateway was sent") + throw GatewayError.invalidApiVersion("an invalid version of the gateway was sent") case 4013: - throw GatewayError.invalidIntents("Invalid intents were sent") + throw GatewayError.invalidIntents("invalid intents were sent") case 4014: let message = """ A disallowed intent was sent. An intent may have been specified that you have \ not enabled or are not approved for. Verify your privileged intents are enabled in your developer portal. """ throw GatewayError.disallowedIntents(message) + case danglingClose: + // Ignore dangling close code + break default: - return + Log.fatal("unhandled Discord websocket close code (\(cc))") } default: - // The only "unknown" error that *should* happen is `.normalClosure` via `Bot.disconnect()` or an - // external connection issue Everything else is unexpected. - if code != .normalClosure { - Log.message("unexpected gateway error (\(code)) dispatched by WebSocket" + end) - Task { try await self.startNewSession() } + break + } + } else { + // This must be here because WebSocket.onClose() dispatches this method on every Discord reconnect request + if bot.isConnected && !receivedReconnectRequest { + nilClose = true + Task { + try await self.danglingReconnect(log: "gateway disconnected with nil close code - bot connected/no reconnect request - attempting dangling reconnect...") } - bot.listeners.forEachAsync { await $0.onDisconnect() } + } else { + Log.fatal("gateway disconnected with nil close code - unknown/unhandled reason") } } } /// Handle all incoming gateway events. - func receive() { + private func receive() { ws.onText { [unowned self] (_, message) in defer { receive() } @@ -329,7 +372,8 @@ class Gateway { case Opcode.dispatch: if let event = GatewayEvent(rawValue: gatewayPayload.t!) 
{ if event == .ready { - Log.message("READY - initializing state") + bot.isConnected = true + Log.message("received READY - initializing cache") wsResume = ResumePayload(data: resumePayload) wsResume?.resumeGatewayURL = gatewayPayload.d?["resume_gateway_url"] as? String bot.user = ClientUser(bot: bot, clientUserData: gatewayPayload.d!["user"] as! JSON) @@ -344,11 +388,14 @@ class Gateway { } case Opcode.hello: - // Discord states there's no need to re-identify after a RECONNECT - guard !receivedReconnectRequest else { - Log.message("received HELLO from RECONNECT - ignoring") - return + let reset = { (log: String) -> Void in + Log.message(log) + self.resetGatewayValues(withCancel: false) } + // Discord states there's no need to re-identify after a RECONNECT + if receivedReconnectRequest { reset("received HELLO from RECONNECT - ignoring"); return } + else if nilClose { reset("received HELLO from NIL CLOSE RECONNECT - ignoring"); return } + else if clientHeartbeatAcks == 0 { reset("received HELLO from FAILED HEARTBEAT ACK RECONNECT - ignoring"); return } Log.message("received HELLO - identifying...") let opHello = HTTPClient.strJsonToDict(message) @@ -400,6 +447,7 @@ class Gateway { case Opcode.heartbeatAck: Log.message("heartbeat ACK") + clientHeartbeatAcks += 2 default: break @@ -437,11 +485,8 @@ class Gateway { private func dispatchAndUpdate(event: GatewayEvent, data: JSON) async { func getGuildId(_ data: JSON) -> Snowflake { - if data.keys.contains(where: { $0 == "guild_id" }) { - return Conversions.snowflakeToUInt(data["guild_id"]) - } else { - Log.fatal("guild_id was not found in JSON response") - } + precondition(data.keys.contains("guild_id"), "guild_id was not found in JSON response") + return Conversions.snowflakeToUInt(data["guild_id"]) } let dispatch = bot.listeners.forEachAsync @@ -465,6 +510,7 @@ class Gateway { case .reconnect: Log.message("received RECONNECT via DISPATCH - reconnecting...") + receivedReconnectRequest = true await reconnect() case 
.invalidSession: