Skip to content

Commit

Permalink
Try to fix invalid sessions received when reconnecting to the Gateway (
Browse files Browse the repository at this point in the history
…#592)

* Adjust log level for unresumable sessions

* Add delay when catching errors on the shard runner

* Fix logging plugin not directing logs to the right output at stderrLevel

* Remove duplicate error logging

* Add logging for payloads sent by shard

* Use error close code when reconnecting
  • Loading branch information
abitofevrything authored Nov 26, 2023
1 parent 573c9bc commit 46f128c
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 22 deletions.
2 changes: 1 addition & 1 deletion lib/nyxx.dart
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ export 'src/http/managers/interaction_manager.dart' show InteractionManager;
export 'src/http/managers/entitlement_manager.dart' show EntitlementManager;

export 'src/gateway/gateway.dart' show Gateway;
export 'src/gateway/message.dart' show Disconnecting, Dispose, ErrorReceived, EventReceived, GatewayMessage, Send, ShardData, ShardMessage;
export 'src/gateway/message.dart' show Disconnecting, Dispose, ErrorReceived, EventReceived, GatewayMessage, Send, Sent, ShardData, ShardMessage;
export 'src/gateway/shard.dart' show Shard;

export 'src/models/discord_color.dart' show DiscordColor;
Expand Down
9 changes: 2 additions & 7 deletions lib/src/gateway/gateway.dart
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,7 @@ class Gateway extends GatewayManager with EventParser {
Gateway(this.client, this.gatewayBot, this.shards, this.totalShards, this.shardIds) : super.create() {
for (final shard in shards) {
shard.listen(
(message) {
if (message is ErrorReceived) {
shard.logger.warning('Received error: ${message.error}', message.error, message.stackTrace);
}

_messagesController.add(message);
},
_messagesController.add,
onError: _messagesController.addError,
onDone: () async {
if (_closing) {
Expand All @@ -109,6 +103,7 @@ class Gateway extends GatewayManager with EventParser {

throw ShardDisconnectedError(shard);
},
cancelOnError: false,
);
}

Expand Down
9 changes: 9 additions & 0 deletions lib/src/gateway/message.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ class Disconnecting extends ShardMessage {
Disconnecting({required this.reason});
}

/// A shard message sent when the shard adds a payload to the connection.
class Sent extends ShardMessage {
/// The payload that was sent.
final Send payload;

/// Create a new [Sent].
Sent({required this.payload});
}

/// The base class for all control messages sent from the client to the shard.
abstract class GatewayMessage with ToStringHelper {}

Expand Down
10 changes: 7 additions & 3 deletions lib/src/gateway/shard.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
/// Create a new [Shard].
Shard(this.id, this.isolate, this.receiveStream, this.sendPort, this.client) {
final subscription = listen((message) {
if (message is ErrorReceived) {
if (message is Sent) {
logger
..fine('Sent payload: ${message.payload.opcode.name}')
..finer('Opcode: ${message.payload.opcode.value}, Data: ${message.payload.data}');
} else if (message is ErrorReceived) {
logger.warning('Error: ${message.error}', message.error, message.stackTrace);
} else if (message is Disconnecting) {
logger.info('Disconnecting: ${message.reason}');
Expand All @@ -61,7 +65,7 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
if (isResumable) {
logger.info('Reconnecting: invalid session');
} else {
logger.severe('Unresumable invalid session, disconnecting');
logger.warning('Reconnecting: unresumable invalid session');
}
case HelloEvent(:final heartbeatInterval):
logger.finest('Heartbeat Interval: $heartbeatInterval');
Expand Down Expand Up @@ -141,7 +145,7 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
void add(GatewayMessage event) {
if (event is Send) {
logger
..fine('Send: ${event.opcode.name}')
..fine('Sending: ${event.opcode.name}')
..finer('Opcode: ${event.opcode.value}, Data: ${event.data}');
} else if (event is Dispose) {
logger.info('Disposing');
Expand Down
29 changes: 19 additions & 10 deletions lib/src/gateway/shard_runner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class ShardRunner {
final controller = StreamController<ShardMessage>();

// The subscription to the control messages stream.
// This subscription is paused whenever the shard is not successfully connected,.
// This subscription is paused whenever the shard is not successfully connected.
final controlSubscription = messages.listen((message) {
if (message is Send) {
connection!.add(message);
Expand Down Expand Up @@ -87,6 +87,7 @@ class ShardRunner {

// Open the websocket connection.
connection = await ShardConnection.connect(gatewayUri.toString(), this);
connection!.onSent.listen(controller.add);

// Obtain the heartbeat interval from the HELLO event and start heartbeating.
final hello = await connection!.first;
Expand All @@ -105,7 +106,7 @@ class ShardRunner {
sendIdentify();
}

canResume = false;
canResume = true;

// We are connected, start handling control messages.
controlSubscription.resume();
Expand All @@ -127,7 +128,7 @@ class ShardRunner {
}
} else if (event is ReconnectEvent) {
canResume = true;
connection!.close();
connection!.close(4000);
} else if (event is InvalidSessionEvent) {
if (event.isResumable) {
canResume = true;
Expand All @@ -136,7 +137,8 @@ class ShardRunner {
gatewayUri = originalGatewayUri;
}

connection!.close();
// Don't use 4000 as it will always try to resume
connection!.close(4999);
} else if (event is HeartbeatAckEvent) {
lastHeartbeatAcked = true;
heartbeatStopwatch = null;
Expand All @@ -159,8 +161,7 @@ class ShardRunner {
// Check if we can resume based on close code.
// A manual close where we set closeCode earlier would have a close code of 1000, so this
// doesn't change closeCode if we set it manually.
// 1001 is the close code used for a ping failure, so include it in the resumable codes.
const resumableCodes = [null, 1001, 4000, 4001, 4002, 4003, 4007, 4008, 4009];
const resumableCodes = [null, 4000, 4001, 4002, 4003, 4007, 4008, 4009];
final closeCode = connection!.websocket.closeCode;
canResume = canResume || resumableCodes.contains(closeCode);

Expand All @@ -171,9 +172,11 @@ class ShardRunner {
}
} catch (error, stackTrace) {
controller.add(ErrorReceived(error: error, stackTrace: stackTrace));
// Prevents the while-true loop from looping too often when no internet is available.
await Future.delayed(Duration(milliseconds: 100));
} finally {
// Reset connection properties.
connection?.close();
connection?.close(4000);
connection = null;
heartbeatTimer?.cancel();
heartbeatTimer = null;
Expand Down Expand Up @@ -246,11 +249,13 @@ class ShardConnection extends Stream<GatewayEvent> implements StreamSink<Send> {
final Stream<GatewayEvent> events;
final ShardRunner runner;

final StreamController<Sent> _sentController = StreamController();
Stream<Sent> get onSent => _sentController.stream;

ShardConnection(this.websocket, this.events, this.runner);

static Future<ShardConnection> connect(String gatewayUri, ShardRunner runner) async {
final connection = await WebSocket.connect(gatewayUri);
connection.pingInterval = const Duration(seconds: 20);

final uncompressedStream = switch (runner.data.apiOptions.compression) {
GatewayCompression.transport => decompressTransport(connection.cast<List<int>>()),
Expand Down Expand Up @@ -293,6 +298,7 @@ class ShardConnection extends Stream<GatewayEvent> implements StreamSink<Send> {
};

websocket.add(encoded);
_sentController.add(Sent(payload: event));
}

@override
Expand All @@ -302,10 +308,13 @@ class ShardConnection extends Stream<GatewayEvent> implements StreamSink<Send> {
Future<void> addStream(Stream<Send> stream) => stream.forEach(add);

@override
Future<void> close([int? code]) => websocket.close(code ?? 1000);
Future<void> close([int? code]) async {
await websocket.close(code ?? 1000);
await _sentController.close();
}

@override
Future<void> get done => websocket.done;
Future<void> get done => websocket.done.then((_) => _sentController.done);
}

Stream<dynamic> decompressTransport(Stream<List<int>> raw) {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/plugin/logging.dart
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class Logging extends NyxxPlugin {
}
}

final outSink = rec.level > stderrLevel ? stderr : stdout;
final outSink = rec.level >= stderrLevel ? stderr : stdout;
outSink.write(messageString);
});
}
Expand Down

0 comments on commit 46f128c

Please sign in to comment.