diff --git a/README.md b/README.md index ac69208..e448ed0 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ Connection related features - ❌ Optimistic subscriptions - ❌ Run in separate isolate - ❌ JSON transport +- ❌ DevTools extension ## Example diff --git a/lib/src/model/spinify_interface.dart b/lib/src/model/spinify_interface.dart index cf04344..8d03cdb 100644 --- a/lib/src/model/spinify_interface.dart +++ b/lib/src/model/spinify_interface.dart @@ -1,3 +1,5 @@ +// ignore_for_file: one_member_abstracts + import 'dart:async'; import 'config.dart'; @@ -23,7 +25,8 @@ abstract interface class ISpinify ISpinifyPresenceOwner, ISpinifyHistoryOwner, ISpinifyRemoteProcedureCall, - ISpinifyMetricsOwner { + ISpinifyMetricsOwner, + ISpinifyPing { /// Unique client identifier. abstract final int id; @@ -63,14 +66,12 @@ abstract interface class ISpinifyStateOwner { } /// Spinify send publication interface. -// ignore: one_member_abstracts abstract interface class ISpinifyPublicationSender { /// Publish data to specific subscription channel Future publish(String channel, List data); } /// Spinify send asynchronous message interface. -// ignore: one_member_abstracts abstract interface class ISpinifyAsyncMessageSender { /// Send asynchronous message to a server. This method makes sense /// only when using Centrifuge library for Go on a server side. In Centrifugo @@ -132,7 +133,6 @@ abstract interface class ISpinifyPresenceOwner { } /// Spinify history owner interface. -// ignore: one_member_abstracts abstract interface class ISpinifyHistoryOwner { /// Fetch publication history inside a channel. /// Only for channels where history is enabled. @@ -145,7 +145,6 @@ abstract interface class ISpinifyHistoryOwner { } /// Spinify remote procedure call interface. -// ignore: one_member_abstracts abstract interface class ISpinifyRemoteProcedureCall { /// Send arbitrary RPC and wait for response. Future> rpc(String method, List data); @@ -156,3 +155,9 @@ abstract interface class ISpinifyMetricsOwner { /// Get metrics of Spinify client. SpinifyMetrics get metrics; } + +/// Spinify ping interface. +abstract interface class ISpinifyPing { + /// Send ping to server. + Future ping(); +} diff --git a/lib/src/model/transport_interface.dart b/lib/src/model/transport_interface.dart index a2fdb40..cd6af24 100644 --- a/lib/src/model/transport_interface.dart +++ b/lib/src/model/transport_interface.dart @@ -20,6 +20,10 @@ abstract interface class ISpinifyTransport { // ignore: avoid_setters_without_getters set onReply(void Function(SpinifyReply reply) handler); + /// Set handler for connection close event. + // ignore: avoid_setters_without_getters + set onDisconnect(void Function() handler); + /// Disconnect from the server. /// Client if not needed anymore. Future disconnect([int? code, String? reason]); diff --git a/lib/src/spinify_impl.dart b/lib/src/spinify_impl.dart index 151bbd7..516be23 100644 --- a/lib/src/spinify_impl.dart +++ b/lib/src/spinify_impl.dart @@ -230,9 +230,16 @@ base mixin SpinifyCommandMixin on SpinifyBase { } Future _sendCommand(SpinifyCommand command) async { - final completer = _replies[command.id] = Completer(); - await _sendCommandAsync(command); - return completer.future; + try { + final completer = _replies[command.id] = Completer(); + await _sendCommandAsync(command); + return await completer.future.timeout(config.timeout); + } on Object catch (error, stackTrace) { + final completer = _replies.remove(command.id); + if (completer != null && !completer.isCompleted) + completer.completeError(error, stackTrace); + rethrow; + } } Future _sendCommandAsync(SpinifyCommand command) async { @@ -273,7 +280,11 @@ base mixin SpinifyConnectionMixin // Create new transport. _transport = await _createTransport(url, config.headers) - ..onReply = _onReply; + ..onReply = _onReply + ..onDisconnect = () => _onDisconnect(( + code: SpinifyConnectingCode.transportClosed, + reason: 'Transport closed', + )).ignore(); // Prepare connect request. final request = await _prepareConnectRequest(); @@ -355,6 +366,14 @@ base mixin SpinifyPingPongMixin @nonVirtual Timer? _pingTimer; + @override + Future ping() => _bucket.push( + ClientEvent.command, + (int id, DateTime timestamp) => SpinifyPingRequest( + id: id, + timestamp: timestamp, + )); + /// Stop keepalive timer. @protected @nonVirtual @@ -369,13 +388,18 @@ base mixin SpinifyPingPongMixin _tearDownPingTimer(); assert(!_isClosed, 'Client is closed'); assert(state.isConnected, 'Invalid state'); - if (state case SpinifyState$Connected(:Duration pingInterval)) { + if (state case SpinifyState$Connected(:Duration? pingInterval) + when pingInterval != null && pingInterval > Duration.zero) { _pingTimer = Timer( pingInterval + config.serverPingDelay, - () => disconnect( - SpinifyConnectingCode.noPing, - 'No ping from server', - ), + () { + // Reconnect if no pong received. + if (state case SpinifyState$Connected(:String url)) connect(url); + /* disconnect( + SpinifyConnectingCode.noPing, + 'No ping from server', + ); */ + }, ); } } diff --git a/lib/src/transport_fake.dart b/lib/src/transport_fake.dart index cbc6d38..98cb771 100644 --- a/lib/src/transport_fake.dart +++ b/lib/src/transport_fake.dart @@ -1,5 +1,3 @@ -// ignore_for_file: avoid_setters_without_getters - import 'dart:async'; import 'package:fixnum/fixnum.dart'; @@ -120,18 +118,25 @@ class SpinifyTransportFake implements ISpinifyTransport { Duration(milliseconds: _delay), () { if (!_isConnected) return; - _handler?.call(reply(DateTime.now())); + _onReply?.call(reply(DateTime.now())); }, ); @override - set onReply(void Function(SpinifyReply reply) handler) => _handler = handler; - void Function(SpinifyReply reply)? _handler; + // ignore: avoid_setters_without_getters + set onReply(void Function(SpinifyReply reply) handler) => _onReply = handler; + void Function(SpinifyReply reply)? _onReply; + + @override + // ignore: avoid_setters_without_getters + set onDisconnect(void Function() handler) => _onDisconnect = handler; + void Function()? _onDisconnect; @override Future disconnect([int? code, String? reason]) async { if (!_isConnected) return; await _sleep(); + _onDisconnect?.call(); _timer?.cancel(); _timer = null; } diff --git a/lib/src/transport_ws_pb_vm.dart b/lib/src/transport_ws_pb_vm.dart index 3bdc910..6818803 100644 --- a/lib/src/transport_ws_pb_vm.dart +++ b/lib/src/transport_ws_pb_vm.dart @@ -36,17 +36,26 @@ final class SpinifyTransport$WS$PB$VM implements ISpinifyTransport { _subscription = _socket.listen( _onData, cancelOnError: false, + onDone: () { + assert(_onDisconnect != null, 'Disconnect handler is not set'); + _onDisconnect?.call(); + }, ); } final io.WebSocket _socket; late final StreamSubscription _subscription; - void Function(SpinifyReply reply)? _handler; + void Function(SpinifyReply reply)? _onReply; @override // ignore: avoid_setters_without_getters - set onReply(void Function(SpinifyReply reply) handler) => _handler = handler; + set onReply(void Function(SpinifyReply reply) handler) => _onReply = handler; + + @override + // ignore: avoid_setters_without_getters + set onDisconnect(void Function() handler) => _onDisconnect = handler; + void Function()? _onDisconnect; void _onData(Object? bytes) { const decoder = ProtobufReplyDecoder(); @@ -55,8 +64,8 @@ final class SpinifyTransport$WS$PB$VM implements ISpinifyTransport { return; } final reply = decoder.convert(bytes); - assert(_handler != null, 'Handler is not set'); - _handler?.call(reply); + assert(_onReply != null, 'Reply handler is not set'); + _onReply?.call(reply); } @override diff --git a/test/smoke/smoke_test.dart b/test/smoke/smoke_test.dart index 694cdb6..982fd59 100644 --- a/test/smoke/smoke_test.dart +++ b/test/smoke/smoke_test.dart @@ -1,5 +1,3 @@ -import 'dart:convert'; - import 'package:spinify/spinify.dart'; import 'package:test/test.dart'; @@ -10,7 +8,8 @@ void main() { final client = Spinify(); await client.connect(url); expect(client.state, isA()); - await client.send(utf8.encode('Hello, Spinify!')); + await client.ping(); + //await client.send(utf8.encode('Hello, Spinify!')); await client.disconnect(); expect(client.state, isA()); await client.close();