Skip to content
This repository has been archived by the owner on Jan 3, 2024. It is now read-only.

Commit

Permalink
Fix random gateway disconnects
Browse files Browse the repository at this point in the history
  • Loading branch information
Defxult committed Oct 8, 2023
1 parent a68bef0 commit 16cf349
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 41 deletions.
3 changes: 1 addition & 2 deletions Sources/Discord/Discord.swift
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,6 @@ public class Bot {
try! app.eventLoopGroup.any().makeFutureWithTask {
try! await self.gw.startNewSession()
}.wait()
isConnected = true
if let onceExecute {
Task { await onceExecute() }
self.onceExecute = nil
Expand Down Expand Up @@ -419,7 +418,7 @@ public class Bot {
public func disconnect() {
if isConnected {
try! gw.ws.close(code: .normalClosure).wait()
gw.resetGatewayValues()
gw.resetGatewayValues(withCancel: true)
clearCache()
isConnected = false
}
Expand Down
124 changes: 85 additions & 39 deletions Sources/Discord/Gateway.swift
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,16 @@ class Gateway {
private var shards: Int?
private var heartbeatInterval = 0
private var receivedReconnectRequest = false
private var nilClose = false

// Client heartbeat ACKs have 3 values:
// n > 0 heartbeat was sent and ACK by Discord
// n == -1 client heartbeat ACK was reset after a successfull heartbeat ACK (or initial value at startup)
// n == 0 heartbeat ACK was NOT sent by Discord after a heartbeat was sent
private var clientHeartbeatAcks = -1

private let elg: EventLoopGroup
private let danglingClose: UInt16 = 9900
private let settings = (
port: 443,
query: "v=10&encoding=json",
Expand All @@ -161,6 +169,13 @@ class Gateway {
self.elg = elg
}

/// Close the connection with anything other than code 1000 or 1001 and reconnect.
private func danglingReconnect(log: String) async throws {
Log.message(log)
try await ws.close(code: .unknown(danglingClose))
await reconnect()
}

/// Sends the RESUME payload for reconnects.
private func reconnect() async {
let resumePayload: JSON = [
Expand Down Expand Up @@ -219,6 +234,20 @@ class Gateway {
"d": wsResume == nil ? NIL : wsResume!.sequence
]
sendFrame(payload)

// Verify the heartbeat was ACK'd. Discord ACKs are pretty much instant
// so a 1.1 second delay should be more than enough.
elg.any().scheduleTask(in: .milliseconds(1100)) {
if self.clientHeartbeatAcks > 0 {
self.clientHeartbeatAcks = -1
Log.message("heartbeat ACK (client)")
} else {
self.clientHeartbeatAcks = 0
Task {
try await self.danglingReconnect(log: "client did not receive HEARTBEAT ACK after a heartbeat was sent - attempting dangling reconnecting...")
}
}
}
}

/// Shortcut for sending data (as `ByteBuffer`) through the websocket.
Expand All @@ -227,9 +256,13 @@ class Gateway {
}

/// Reset core gateway values so a proper reconnect can occur.
func resetGatewayValues() {
func resetGatewayValues(withCancel: Bool) {
receivedReconnectRequest = false
heartbeatTask?.cancel()
nilClose = false
clientHeartbeatAcks = -1
if withCancel {
heartbeatTask?.cancel()
}
}

/// Simply splits the scheme and host from the standard get gateway bot/resume URLs.
Expand All @@ -242,9 +275,9 @@ class Gateway {
let results = try await bot.http.getGatewayBot()
let cmps = extractURLComponents(results.info)

resetGatewayValues()
resetGatewayValues(withCancel: true)
shards = results.shards

try await WebSocket.connect(
scheme: cmps.scheme,
host: cmps.host,
Expand All @@ -260,65 +293,75 @@ class Gateway {
}

/// Handles when the websocket is closed by Discord.
func websocketClosed() throws {
private func websocketClosed() throws {

func new(log: String) {
Log.message(log + " - starting new session...")
Task { try await startNewSession() }
}

if let code = ws.closeCode {
let end = " - starting new session..."
switch code {
case .unknown(let cc):
switch cc {
case 4000:
throw GatewayError.unknownError("Something went wrong")
throw GatewayError.unknownError("something went wrong")
case 4001:
throw GatewayError.unknownOpcode("An invalid opcode was sent")
throw GatewayError.unknownOpcode("an invalid opcode was sent")
case 4002:
throw GatewayError.decodeError("An invalid payload was sent")
throw GatewayError.decodeError("an invalid payload was sent")
case 4003:
throw GatewayError.notAuthenticated("A payload was sent prior to identifying")
throw GatewayError.notAuthenticated("a payload was sent prior to identifying")
case 4004:
throw GatewayError.authenticationFailed("The token sent with the identify payload was incorrect")
throw GatewayError.authenticationFailed("the token sent with the identify payload was incorrect")
case 4005:
throw GatewayError.alreadyAuthenticated("More than one identify payload was sent")
throw GatewayError.alreadyAuthenticated("more than one identify payload was sent")
case 4007:
// GatewayError.invalidSequence can reconnect with a new session
Log.message("gateway error: Invalid sequence" + end)
Task { try await self.startNewSession() }
new(log: "gateway error: Invalid sequence")
case 4008:
throw GatewayError.rateLimited("Too many payloads are being sent")
throw GatewayError.rateLimited("too many payloads are being sent")
case 4009:
// GatewayError.sessionTimedOut can reconnect with a new session
Log.message("gateway error: Session timed out" + end)
Task { try await self.startNewSession() }
new(log: "gateway error: Session timed out")
case 4010:
throw GatewayError.invalidShard("An invalid shard was sent when identifying")
throw GatewayError.invalidShard("an invalid shard was sent when identifying")
case 4011:
throw GatewayError.shardingRequired("Sharding your connection is required in order to connect")
throw GatewayError.shardingRequired("sharding your connection is required in order to connect")
case 4012:
throw GatewayError.invalidApiVersion("An invalid version of the gateway was sent")
throw GatewayError.invalidApiVersion("an invalid version of the gateway was sent")
case 4013:
throw GatewayError.invalidIntents("Invalid intents were sent")
throw GatewayError.invalidIntents("invalid intents were sent")
case 4014:
let message = """
A disallowed intent was sent. An intent may have been specified that you have \
not enabled or are not approved for. Verify your privileged intents are enabled in your developer portal.
"""
throw GatewayError.disallowedIntents(message)
case danglingClose:
// Ignore dangling close code
break
default:
return
Log.fatal("unhandled Discord websocket close code (\(cc))")
}
default:
// The only "unknown" error that *should* happen is `.normalClosure` via `Bot.disconnect()` or an
// external connection issue Everything else is unexpected.
if code != .normalClosure {
Log.message("unexpected gateway error (\(code)) dispatched by WebSocket" + end)
Task { try await self.startNewSession() }
break
}
} else {
// This must be here because WebSocket.onClose() dispatches this method on every Discord reconnect request
if bot.isConnected && !receivedReconnectRequest {
nilClose = true
Task {
try await self.danglingReconnect(log: "gateway disconnected with nil close code - bot connected/no reconnect request - attempting dangling reconnect...")
}
bot.listeners.forEachAsync { await $0.onDisconnect() }
} else {
Log.fatal("gateway disconnected with nil close code - unknown/unhandled reason")
}
}
}

/// Handle all incoming gateway events.
func receive() {
private func receive() {
ws.onText { [unowned self] (_, message) in
defer { receive() }

Expand All @@ -329,7 +372,8 @@ class Gateway {
case Opcode.dispatch:
if let event = GatewayEvent(rawValue: gatewayPayload.t!) {
if event == .ready {
Log.message("READY - initializing state")
bot.isConnected = true
Log.message("received READY - initializing cache")
wsResume = ResumePayload(data: resumePayload)
wsResume?.resumeGatewayURL = gatewayPayload.d?["resume_gateway_url"] as? String
bot.user = ClientUser(bot: bot, clientUserData: gatewayPayload.d!["user"] as! JSON)
Expand All @@ -344,11 +388,14 @@ class Gateway {
}

case Opcode.hello:
// Discord states there's no need to re-identify after a RECONNECT
guard !receivedReconnectRequest else {
Log.message("received HELLO from RECONNECT - ignoring")
return
let reset = { (log: String) -> Void in
Log.message(log)
self.resetGatewayValues(withCancel: false)
}
// Discord states there's no need to re-identify after a RECONNECT
if receivedReconnectRequest { reset("received HELLO from RECONNECT - ignoring"); return }
else if nilClose { reset("received HELLO from NIL CLOSE RECONNECT - ignoring"); return }
else if clientHeartbeatAcks == 0 { reset("received HELLO from FAILED HEARTBEAT ACK RECONNECT - ignoring"); return }

Log.message("received HELLO - identifying...")
let opHello = HTTPClient.strJsonToDict(message)
Expand Down Expand Up @@ -400,6 +447,7 @@ class Gateway {

case Opcode.heartbeatAck:
Log.message("heartbeat ACK")
clientHeartbeatAcks += 2

default:
break
Expand Down Expand Up @@ -437,11 +485,8 @@ class Gateway {
private func dispatchAndUpdate(event: GatewayEvent, data: JSON) async {

func getGuildId(_ data: JSON) -> Snowflake {
if data.keys.contains(where: { $0 == "guild_id" }) {
return Conversions.snowflakeToUInt(data["guild_id"])
} else {
Log.fatal("guild_id was not found in JSON response")
}
precondition(data.keys.contains("guild_id"), "guild_id was not found in JSON response")
return Conversions.snowflakeToUInt(data["guild_id"])
}

let dispatch = bot.listeners.forEachAsync
Expand All @@ -465,6 +510,7 @@ class Gateway {

case .reconnect:
Log.message("received RECONNECT via DISPATCH - reconnecting...")
receivedReconnectRequest = true
await reconnect()

case .invalidSession:
Expand Down

0 comments on commit 16cf349

Please sign in to comment.