Skip to content

Commit

Permalink
Add DevTools extension, set onDisconnect handler in transport_interfa…
Browse files Browse the repository at this point in the history
…ce.dart, and use ping instead of send in smoke_test.dart
  • Loading branch information
PlugFox committed May 5, 2024
1 parent 26e6a7d commit 2fa209a
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 26 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Connection related features
- ❌ Optimistic subscriptions
- ❌ Run in separate isolate
- ❌ JSON transport
- ❌ DevTools extension
## Example
Expand Down
15 changes: 10 additions & 5 deletions lib/src/model/spinify_interface.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// ignore_for_file: one_member_abstracts

import 'dart:async';

import 'config.dart';
Expand All @@ -23,7 +25,8 @@ abstract interface class ISpinify
ISpinifyPresenceOwner,
ISpinifyHistoryOwner,
ISpinifyRemoteProcedureCall,
ISpinifyMetricsOwner {
ISpinifyMetricsOwner,
ISpinifyPing {
/// Unique client identifier.
abstract final int id;

Expand Down Expand Up @@ -63,14 +66,12 @@ abstract interface class ISpinifyStateOwner {
}

/// Spinify send publication interface.
// ignore: one_member_abstracts
abstract interface class ISpinifyPublicationSender {
/// Publish data to specific subscription channel
Future<void> publish(String channel, List<int> data);
}

/// Spinify send asynchronous message interface.
// ignore: one_member_abstracts
abstract interface class ISpinifyAsyncMessageSender {
/// Send asynchronous message to a server. This method makes sense
/// only when using Centrifuge library for Go on a server side. In Centrifugo
Expand Down Expand Up @@ -132,7 +133,6 @@ abstract interface class ISpinifyPresenceOwner {
}

/// Spinify history owner interface.
// ignore: one_member_abstracts
abstract interface class ISpinifyHistoryOwner {
/// Fetch publication history inside a channel.
/// Only for channels where history is enabled.
Expand All @@ -145,7 +145,6 @@ abstract interface class ISpinifyHistoryOwner {
}

/// Spinify remote procedure call interface.
// ignore: one_member_abstracts
abstract interface class ISpinifyRemoteProcedureCall {
/// Send arbitrary RPC and wait for response.
Future<List<int>> rpc(String method, List<int> data);
Expand All @@ -156,3 +155,9 @@ abstract interface class ISpinifyMetricsOwner {
/// Get metrics of Spinify client.
SpinifyMetrics get metrics;
}

/// Spinify ping interface.
abstract interface class ISpinifyPing {
/// Send ping to server.
Future<void> ping();
}
4 changes: 4 additions & 0 deletions lib/src/model/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ abstract interface class ISpinifyTransport {
// ignore: avoid_setters_without_getters
set onReply(void Function(SpinifyReply reply) handler);

/// Set handler for connection close event.
// ignore: avoid_setters_without_getters
set onDisconnect(void Function() handler);

/// Disconnect from the server.
/// Client if not needed anymore.
Future<void> disconnect([int? code, String? reason]);
Expand Down
42 changes: 33 additions & 9 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,16 @@ base mixin SpinifyCommandMixin on SpinifyBase {
}

Future<T> _sendCommand<T extends SpinifyReply>(SpinifyCommand command) async {
final completer = _replies[command.id] = Completer<T>();
await _sendCommandAsync(command);
return completer.future;
try {
final completer = _replies[command.id] = Completer<T>();
await _sendCommandAsync(command);
return await completer.future.timeout(config.timeout);
} on Object catch (error, stackTrace) {
final completer = _replies.remove(command.id);
if (completer != null && !completer.isCompleted)
completer.completeError(error, stackTrace);
rethrow;
}
}

Future<void> _sendCommandAsync(SpinifyCommand command) async {
Expand Down Expand Up @@ -273,7 +280,11 @@ base mixin SpinifyConnectionMixin

// Create new transport.
_transport = await _createTransport(url, config.headers)
..onReply = _onReply;
..onReply = _onReply
..onDisconnect = () => _onDisconnect((
code: SpinifyConnectingCode.transportClosed,
reason: 'Transport closed',
)).ignore();

// Prepare connect request.
final request = await _prepareConnectRequest();
Expand Down Expand Up @@ -355,6 +366,14 @@ base mixin SpinifyPingPongMixin
@nonVirtual
Timer? _pingTimer;

@override
Future<void> ping() => _bucket.push(
ClientEvent.command,
(int id, DateTime timestamp) => SpinifyPingRequest(
id: id,
timestamp: timestamp,
));

/// Stop keepalive timer.
@protected
@nonVirtual
Expand All @@ -369,13 +388,18 @@ base mixin SpinifyPingPongMixin
_tearDownPingTimer();
assert(!_isClosed, 'Client is closed');
assert(state.isConnected, 'Invalid state');
if (state case SpinifyState$Connected(:Duration pingInterval)) {
if (state case SpinifyState$Connected(:Duration? pingInterval)
when pingInterval != null && pingInterval > Duration.zero) {
_pingTimer = Timer(
pingInterval + config.serverPingDelay,
() => disconnect(
SpinifyConnectingCode.noPing,
'No ping from server',
),
() {
// Reconnect if no pong received.
if (state case SpinifyState$Connected(:String url)) connect(url);
/* disconnect(
SpinifyConnectingCode.noPing,
'No ping from server',
); */
},
);
}
}
Expand Down
15 changes: 10 additions & 5 deletions lib/src/transport_fake.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// ignore_for_file: avoid_setters_without_getters

import 'dart:async';

import 'package:fixnum/fixnum.dart';
Expand Down Expand Up @@ -120,18 +118,25 @@ class SpinifyTransportFake implements ISpinifyTransport {
Duration(milliseconds: _delay),
() {
if (!_isConnected) return;
_handler?.call(reply(DateTime.now()));
_onReply?.call(reply(DateTime.now()));
},
);

@override
set onReply(void Function(SpinifyReply reply) handler) => _handler = handler;
void Function(SpinifyReply reply)? _handler;
// ignore: avoid_setters_without_getters
set onReply(void Function(SpinifyReply reply) handler) => _onReply = handler;
void Function(SpinifyReply reply)? _onReply;

@override
// ignore: avoid_setters_without_getters
set onDisconnect(void Function() handler) => _onDisconnect = handler;
void Function()? _onDisconnect;

@override
Future<void> disconnect([int? code, String? reason]) async {
if (!_isConnected) return;
await _sleep();
_onDisconnect?.call();
_timer?.cancel();
_timer = null;
}
Expand Down
17 changes: 13 additions & 4 deletions lib/src/transport_ws_pb_vm.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,26 @@ final class SpinifyTransport$WS$PB$VM implements ISpinifyTransport {
_subscription = _socket.listen(
_onData,
cancelOnError: false,
onDone: () {
assert(_onDisconnect != null, 'Disconnect handler is not set');
_onDisconnect?.call();
},
);
}

final io.WebSocket _socket;
late final StreamSubscription<dynamic> _subscription;

void Function(SpinifyReply reply)? _handler;
void Function(SpinifyReply reply)? _onReply;

@override
// ignore: avoid_setters_without_getters
set onReply(void Function(SpinifyReply reply) handler) => _handler = handler;
set onReply(void Function(SpinifyReply reply) handler) => _onReply = handler;

@override
// ignore: avoid_setters_without_getters
set onDisconnect(void Function() handler) => _onDisconnect = handler;
void Function()? _onDisconnect;

void _onData(Object? bytes) {
const decoder = ProtobufReplyDecoder();
Expand All @@ -55,8 +64,8 @@ final class SpinifyTransport$WS$PB$VM implements ISpinifyTransport {
return;
}
final reply = decoder.convert(bytes);
assert(_handler != null, 'Handler is not set');
_handler?.call(reply);
assert(_onReply != null, 'Reply handler is not set');
_onReply?.call(reply);
}

@override
Expand Down
5 changes: 2 additions & 3 deletions test/smoke/smoke_test.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import 'dart:convert';

import 'package:spinify/spinify.dart';
import 'package:test/test.dart';

Expand All @@ -10,7 +8,8 @@ void main() {
final client = Spinify();
await client.connect(url);
expect(client.state, isA<SpinifyState$Connected>());
await client.send(utf8.encode('Hello, Spinify!'));
await client.ping();
//await client.send(utf8.encode('Hello, Spinify!'));
await client.disconnect();
expect(client.state, isA<SpinifyState$Disconnected>());
await client.close();
Expand Down

0 comments on commit 2fa209a

Please sign in to comment.