Skip to content
This repository has been archived by the owner on Jan 3, 2024. It is now read-only.

Commit

Permalink
Additional gateway disconnect updates
Browse files Browse the repository at this point in the history
This also adds parameter `clearCache` to method `Bot.disconnect()`
  • Loading branch information
Defxult committed Oct 16, 2023
1 parent 09292fd commit 4ab0a44
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 85 deletions.
14 changes: 8 additions & 6 deletions Sources/Discord/Discord.swift
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public class Bot {
/// Connect to Discord.
public func connect() {
if !isConnected {
try! app.eventLoopGroup.any().makeFutureWithTask {
try! loop.makeFutureWithTask {
try! await self.gw.startNewSession()
}.wait()
if let onceExecute {
Expand Down Expand Up @@ -427,13 +427,15 @@ public class Bot {
}
}

/// Disconnect from Discord and clears the cache.
public func disconnect() {
/// Disconnect from Discord.
/// - Parameter clearCache: Whether to clear the cache once disconnected.
public func disconnect(clearCache: Bool = true) {
if isConnected {
try! gw.ws.close(code: .normalClosure).wait()
isConnected = false // needs to be before ws.close()
gw.ws.close(code: .normalClosure, promise: nil)
gw.resetGatewayValues(withCancel: true)
clearCache()
isConnected = false
gw.heartbeatTask = nil
if clearCache { self.clearCache() }
}
}

Expand Down
154 changes: 75 additions & 79 deletions Sources/Discord/Gateway.swift
Original file line number Diff line number Diff line change
Expand Up @@ -137,37 +137,29 @@ extension Array<JSON> {
}
}

fileprivate enum HeartbeatResponse {

/// Heartbeat was sent and ACK by Discord.
case acknowledged

/// Heartbeat was sent but NOT ACK by Discord.
case noResponse

/// Response was reset after a heartbeat was sent and ACK by Discord (or initial value at startup)
case reset
fileprivate enum ConnectionType {
case reconnect
case session
}

class Gateway {

var ws: WebSocket!
var heartbeatTask: RepeatedTask? = nil
var heartbeatTask: RepeatedTask?
var initialState: InitialState?

private var bot: Bot
private var wsResume: ResumePayload?
private var gp: GatewayPayload?
private var shards: Int?
private var clientHeartbearScheduler: Scheduled<Void>?
private var heartbeatInterval = 0
private var receivedReconnectRequest = false
private var nilClose = false
private var heartbeatAllowed = true
private var clientHeartbeatResponse = HeartbeatResponse.reset
private var idenitifyAllowed = true
private var connectionOpen = false

private let elg: EventLoopGroup
private let loop: EventLoop
private let danglingClose: UInt16 = 9900
private let codeDelayedClose: UInt16 = 9900
private let settings = (
port: 443,
query: "v=10&encoding=json",
Expand All @@ -180,13 +172,6 @@ class Gateway {
self.loop = elg.any()
}

/// Close the connection with anything other than code 1000 or 1001 and reconnect.
private func danglingReconnect(log: String) async throws {
Log.message(log)
try await ws.close(code: .unknown(danglingClose))
await reconnect()
}

/// Sends the RESUME payload for reconnects.
private func reconnect() async {
let resumePayload: JSON = [
Expand Down Expand Up @@ -221,14 +206,15 @@ class Gateway {
private func websocketSetup(websocket: WebSocket) {
ws = websocket
receive()
connectionOpen = true
ws.onClose.whenComplete { _ in try! self.websocketClosed() }
}

/// Cancels the existing heartbeat (if any) and starts a new one.
private func startHeartbeat(log: String) {
Log.message(log)
heartbeatTask?.cancel()
heartbeatTask = loop.scheduleRepeatedTask(initialDelay: .seconds(20), delay: .milliseconds(Int64(heartbeatInterval)), { _ in
heartbeatTask = loop.scheduleRepeatedTask(initialDelay: .seconds(10), delay: .milliseconds(Int64(heartbeatInterval)), { _ in
self.sendHeartbeat()
})
}
Expand All @@ -242,19 +228,21 @@ class Gateway {
sendFrame(payload)
Log.message("heartbeat sent")

// Verify the heartbeat was ACK'd. Discord ACKs are pretty much instant
// so a 1.1 second delay should be more than enough.
loop.scheduleTask(in: .milliseconds(1100)) { [self] in
if clientHeartbeatResponse == .acknowledged {
clientHeartbeatResponse = .reset
Log.message("heartbeat ACK (client)")
} else {
clientHeartbeatResponse = .noResponse
Task {
try await danglingReconnect(log: "client did not receive HEARTBEAT ACK after a heartbeat was sent - attempting dangling reconnecting...")
}
let promise = loop.makePromise(of: Void.self)
let task = promise.completeWithTask {
await sleep(5000)
guard let _ = try? Task.checkCancellation() else { return }
// There's a chance the connection could be closed mid sleep, so only execute if it's open
if self.connectionOpen {
Log.message("client did not receive HEARTBEAT ACK after a heartbeat was sent - reconnecting...")
self.newConnection(type: .reconnect)
}
}

clientHeartbearScheduler = .init(promise: promise, cancellationTask: {
task.cancel()
Log.message("heartbeat ACK (client)")
})
}

private func suspendHeartbeat(log: String) {
Expand All @@ -267,11 +255,22 @@ class Gateway {
ws.send(data.serialize)
}

/// Reset core gateway values so a proper reconnect can occur.
/// Open a new connection. Either a reconnect or a new session.
private func newConnection(type: ConnectionType) {
_ = loop.makeFutureWithTask {
switch type {
case .reconnect:
await self.reconnect()
case .session:
try! await self.startNewSession()
}
}
}

/// Reset core gateway values so a proper reconnect/new session can occur.
func resetGatewayValues(withCancel: Bool) {
receivedReconnectRequest = false
nilClose = false
clientHeartbeatResponse = .reset
idenitifyAllowed = true
connectionOpen = false
if withCancel {
heartbeatTask?.cancel()
}
Expand Down Expand Up @@ -304,15 +303,11 @@ class Gateway {
).get()
}

/// Handles when the websocket is closed by Discord.
/// Handles when the websocket is closed.
private func websocketClosed() throws {
connectionOpen = false
suspendHeartbeat(log: "gateway disconnected")

func new(log: String) {
Log.message(log + " - starting new session...")
Task { try await startNewSession() }
}

if let code = ws.closeCode {
switch code {
case .unknown(let cc):
Expand All @@ -331,12 +326,16 @@ class Gateway {
throw GatewayError.alreadyAuthenticated("more than one identify payload was sent")
case 4007:
// GatewayError.invalidSequence can reconnect with a new session
new(log: "gateway error: Invalid sequence")
Log.message("gateway error: Invalid sequence - starting new session")
newConnection(type: .session)
return
case 4008:
throw GatewayError.rateLimited("too many payloads are being sent")
case 4009:
// GatewayError.sessionTimedOut can reconnect with a new session
new(log: "gateway error: Session timed out")
Log.message("gateway error: Session timed out - starting new session")
newConnection(type: .session)
return
case 4010:
throw GatewayError.invalidShard("an invalid shard was sent when identifying")
case 4011:
Expand All @@ -351,24 +350,29 @@ class Gateway {
not enabled or are not approved for. Verify your privileged intents are enabled in your developer portal.
"""
throw GatewayError.disallowedIntents(message)
case danglingClose:
// Ignore dangling close code
break
default:
Log.fatal("unhandled Discord websocket close code (\(cc))")
Log.fatal("unhandled websocket close code (\(cc))")
}
default:
break
Log.message("close code (\(code)) - ignoring")
}
}

if ws.closeCode == nil {
// A new session must be started here. If the connection is closed with a nil close code,
// vapor disconnected (for whatever reason). If that occurs, simply reconnecting somehow makes
// the stability of the connection even worse by having it disconnect randomly more frequently,
// sometimes significantly more frequent. Creating a brand new connection "refreshes" connection
// stability.
Log.message("connection closed with nil close code - starting new session...")
newConnection(type: .session)
} else {
// This must be here because WebSocket.onClose() dispatches this method on every Discord reconnect request
if bot.isConnected && !receivedReconnectRequest {
nilClose = true
Task {
try await self.danglingReconnect(log: "gateway disconnected with nil close code - bot connected/no reconnect request - attempting dangling reconnect...")
}
} else {
Log.fatal("gateway disconnected with nil close code - unknown/unhandled reason")
// If the end user did not terminate the connection, we should always reconnect.
// Also, if this is reached, that means it did not `return`. If it returned, a new
// session was required, not a reconnect. Additionally, .isConnected will never be
// set to `false` unless done so by the end user via Bot.disconnect() or Bot.close()
if bot.isConnected {
newConnection(type: .reconnect)
}
}
}
Expand Down Expand Up @@ -401,15 +405,11 @@ class Gateway {
}

case Opcode.hello:
let reset = { [self] (log: String) -> Void in
Log.message(log)
resetGatewayValues(withCancel: false)
startHeartbeat(log: "heartbeat reset after reconnect")
guard idenitifyAllowed else {
Log.message("received HELLO when IDENTIFY is not allowed - ignoring")
startHeartbeat(log: "heartbeat resumed from previous disconnect")
return
}
// Discord states there's no need to re-identify after a RECONNECT
if receivedReconnectRequest { reset("received HELLO from RECONNECT REQUEST - ignoring"); return }
else if nilClose { reset("received HELLO from NIL CLOSE RECONNECT - ignoring"); return }
else if clientHeartbeatResponse == .noResponse { reset("received HELLO from FAILED HEARTBEAT ACK RECONNECT - ignoring"); return }

Log.message("received HELLO - identifying...")
let opHello = HTTPClient.strJsonToDict(message)
Expand Down Expand Up @@ -437,24 +437,23 @@ class Gateway {
]

sendFrame(identity)
idenitifyAllowed = false
startHeartbeat(log: "starting INDEFINITE HEARTBEAT from IDENTIFY")

case Opcode.invalidSession:
Log.message("gateway SESSION INVALIDATED - starting new session...")
Task { try await startNewSession() }
Log.message("gateway SESSION INVALIDATED - waiting for disconnect and automatic new session...")

case Opcode.reconnect:
Log.message("gateway requested RECONNECT - reconnecting...")
receivedReconnectRequest = true
Task { await reconnect() }
Log.message("gateway requested RECONNECT - waiting for disconnect and automatic reconnect...")

case Opcode.heartbeat:
Log.message("gateway requested HEARTBEAT - sending heartbeat...")
sendHeartbeat()

case Opcode.heartbeatAck:
Log.message("heartbeat ACK")
clientHeartbeatResponse = .acknowledged
clientHeartbearScheduler?.cancel()
clientHeartbearScheduler = nil

default:
break
Expand Down Expand Up @@ -516,13 +515,10 @@ class Gateway {
Log.message("gateway resumed")

case .reconnect:
Log.message("received RECONNECT via DISPATCH - reconnecting...")
receivedReconnectRequest = true
await reconnect()
Log.message("received RECONNECT via DISPATCH - waiting for disconnect and automatic reconnect...")

case .invalidSession:
Log.message("received SESSION INVALIDATED via DISPATCH - starting new session...")
try! await startNewSession()
Log.message("received SESSION INVALIDATED via DISPATCH - waiting for disconnect and automatic new session...")

case .applicationCommandPermissionsUpdate:
let permissions = GuildApplicationCommandPermissions(guildAppCommandPermData: data)
Expand Down

0 comments on commit 4ab0a44

Please sign in to comment.