From 46f128c9d165775ec1cee6d129238ee8ec8ae0d8 Mon Sep 17 00:00:00 2001 From: Abitofevrything <54505189+abitofevrything@users.noreply.github.com> Date: Sun, 26 Nov 2023 20:09:43 +0100 Subject: [PATCH] Try to fix invalid sessions received when reconnecting to the Gateway (#592) * Adjust log level for unresumable sessions * Add delay when catching errors on the shard runner * Fix logging plugin not directing logs to the right output at stderrLevel * Remove duplicate error logging * Add logging for payloads sent by shard * Use error close code when reconnecting --- lib/nyxx.dart | 2 +- lib/src/gateway/gateway.dart | 9 ++------- lib/src/gateway/message.dart | 9 +++++++++ lib/src/gateway/shard.dart | 10 +++++++--- lib/src/gateway/shard_runner.dart | 29 +++++++++++++++++++---------- lib/src/plugin/logging.dart | 2 +- 6 files changed, 39 insertions(+), 22 deletions(-) diff --git a/lib/nyxx.dart b/lib/nyxx.dart index 3c6582746..870b1d66a 100644 --- a/lib/nyxx.dart +++ b/lib/nyxx.dart @@ -100,7 +100,7 @@ export 'src/http/managers/interaction_manager.dart' show InteractionManager; export 'src/http/managers/entitlement_manager.dart' show EntitlementManager; export 'src/gateway/gateway.dart' show Gateway; -export 'src/gateway/message.dart' show Disconnecting, Dispose, ErrorReceived, EventReceived, GatewayMessage, Send, ShardData, ShardMessage; +export 'src/gateway/message.dart' show Disconnecting, Dispose, ErrorReceived, EventReceived, GatewayMessage, Send, Sent, ShardData, ShardMessage; export 'src/gateway/shard.dart' show Shard; export 'src/models/discord_color.dart' show DiscordColor; diff --git a/lib/src/gateway/gateway.dart b/lib/src/gateway/gateway.dart index ca886858e..e20a8cc2b 100644 --- a/lib/src/gateway/gateway.dart +++ b/lib/src/gateway/gateway.dart @@ -92,13 +92,7 @@ class Gateway extends GatewayManager with EventParser { Gateway(this.client, this.gatewayBot, this.shards, this.totalShards, this.shardIds) : super.create() { for (final shard in shards) { shard.listen( - (message) { - if (message is ErrorReceived) { - shard.logger.warning('Received error: ${message.error}', message.error, message.stackTrace); - } - - _messagesController.add(message); - }, + _messagesController.add, onError: _messagesController.addError, onDone: () async { if (_closing) { @@ -109,6 +103,7 @@ class Gateway extends GatewayManager with EventParser { throw ShardDisconnectedError(shard); }, + cancelOnError: false, ); } diff --git a/lib/src/gateway/message.dart b/lib/src/gateway/message.dart index 1da7da02e..098d36d35 100644 --- a/lib/src/gateway/message.dart +++ b/lib/src/gateway/message.dart @@ -61,6 +61,15 @@ class Disconnecting extends ShardMessage { Disconnecting({required this.reason}); } +/// A shard message sent when the shard adds a payload to the connection. +class Sent extends ShardMessage { + /// The payload that was sent. + final Send payload; + + /// Create a new [Sent]. + Sent({required this.payload}); +} + /// The base class for all control messages sent from the client to the shard. abstract class GatewayMessage with ToStringHelper {} diff --git a/lib/src/gateway/shard.dart b/lib/src/gateway/shard.dart index f70697f8a..1261abc51 100644 --- a/lib/src/gateway/shard.dart +++ b/lib/src/gateway/shard.dart @@ -45,7 +45,11 @@ class Shard extends Stream implements StreamSink { /// Create a new [Shard]. Shard(this.id, this.isolate, this.receiveStream, this.sendPort, this.client) { final subscription = listen((message) { - if (message is ErrorReceived) { + if (message is Sent) { + logger + ..fine('Sent payload: ${message.payload.opcode.name}') + ..finer('Opcode: ${message.payload.opcode.value}, Data: ${message.payload.data}'); + } else if (message is ErrorReceived) { logger.warning('Error: ${message.error}', message.error, message.stackTrace); } else if (message is Disconnecting) { logger.info('Disconnecting: ${message.reason}'); @@ -61,7 +65,7 @@ class Shard extends Stream implements StreamSink { if (isResumable) { logger.info('Reconnecting: invalid session'); } else { - logger.severe('Unresumable invalid session, disconnecting'); + logger.warning('Reconnecting: unresumable invalid session'); } case HelloEvent(:final heartbeatInterval): logger.finest('Heartbeat Interval: $heartbeatInterval'); @@ -141,7 +145,7 @@ class Shard extends Stream implements StreamSink { void add(GatewayMessage event) { if (event is Send) { logger - ..fine('Send: ${event.opcode.name}') + ..fine('Sending: ${event.opcode.name}') ..finer('Opcode: ${event.opcode.value}, Data: ${event.data}'); } else if (event is Dispose) { logger.info('Disposing'); diff --git a/lib/src/gateway/shard_runner.dart b/lib/src/gateway/shard_runner.dart index 80655bdab..6dc85d44f 100644 --- a/lib/src/gateway/shard_runner.dart +++ b/lib/src/gateway/shard_runner.dart @@ -58,7 +58,7 @@ class ShardRunner { final controller = StreamController(); // The subscription to the control messages stream. - // This subscription is paused whenever the shard is not successfully connected,. + // This subscription is paused whenever the shard is not successfully connected. final controlSubscription = messages.listen((message) { if (message is Send) { connection!.add(message); @@ -87,6 +87,7 @@ class ShardRunner { // Open the websocket connection. connection = await ShardConnection.connect(gatewayUri.toString(), this); + connection!.onSent.listen(controller.add); // Obtain the heartbeat interval from the HELLO event and start heartbeating. final hello = await connection!.first; @@ -105,7 +106,7 @@ class ShardRunner { sendIdentify(); } - canResume = false; + canResume = true; // We are connected, start handling control messages. controlSubscription.resume(); @@ -127,7 +128,7 @@ class ShardRunner { } } else if (event is ReconnectEvent) { canResume = true; - connection!.close(); + connection!.close(4000); } else if (event is InvalidSessionEvent) { if (event.isResumable) { canResume = true; @@ -136,7 +137,8 @@ class ShardRunner { gatewayUri = originalGatewayUri; } - connection!.close(); + // Don't use 4000 as it will always try to resume + connection!.close(4999); } else if (event is HeartbeatAckEvent) { lastHeartbeatAcked = true; heartbeatStopwatch = null; @@ -159,8 +161,7 @@ class ShardRunner { // Check if we can resume based on close code. // A manual close where we set closeCode earlier would have a close code of 1000, so this // doesn't change closeCode if we set it manually. - // 1001 is the close code used for a ping failure, so include it in the resumable codes. - const resumableCodes = [null, 1001, 4000, 4001, 4002, 4003, 4007, 4008, 4009]; + const resumableCodes = [null, 4000, 4001, 4002, 4003, 4007, 4008, 4009]; final closeCode = connection!.websocket.closeCode; canResume = canResume || resumableCodes.contains(closeCode); @@ -171,9 +172,11 @@ class ShardRunner { } } catch (error, stackTrace) { controller.add(ErrorReceived(error: error, stackTrace: stackTrace)); + // Prevents the while-true loop from looping too often when no internet is available. + await Future.delayed(Duration(milliseconds: 100)); } finally { // Reset connection properties. - connection?.close(); + connection?.close(4000); connection = null; heartbeatTimer?.cancel(); heartbeatTimer = null; @@ -246,11 +249,13 @@ class ShardConnection extends Stream implements StreamSink { final Stream events; final ShardRunner runner; + final StreamController _sentController = StreamController(); + Stream get onSent => _sentController.stream; + ShardConnection(this.websocket, this.events, this.runner); static Future connect(String gatewayUri, ShardRunner runner) async { final connection = await WebSocket.connect(gatewayUri); - connection.pingInterval = const Duration(seconds: 20); final uncompressedStream = switch (runner.data.apiOptions.compression) { GatewayCompression.transport => decompressTransport(connection.cast>()), @@ -293,6 +298,7 @@ class ShardConnection extends Stream implements StreamSink { }; websocket.add(encoded); + _sentController.add(Sent(payload: event)); } @override @@ -302,10 +308,13 @@ class ShardConnection extends Stream implements StreamSink { Future addStream(Stream stream) => stream.forEach(add); @override - Future close([int? code]) => websocket.close(code ?? 1000); + Future close([int? code]) async { + await websocket.close(code ?? 1000); + await _sentController.close(); + } @override - Future get done => websocket.done; + Future get done => websocket.done.then((_) => _sentController.done); } Stream decompressTransport(Stream> raw) { diff --git a/lib/src/plugin/logging.dart b/lib/src/plugin/logging.dart index a2ce512a2..7567ef32e 100644 --- a/lib/src/plugin/logging.dart +++ b/lib/src/plugin/logging.dart @@ -108,7 +108,7 @@ class Logging extends NyxxPlugin { } } - final outSink = rec.level > stderrLevel ? stderr : stdout; + final outSink = rec.level >= stderrLevel ? stderr : stdout; outSink.write(messageString); }); }