From b582e11262392927789cffd0b8f92e19968b1425 Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Fri, 21 Jul 2023 23:40:53 +0400 Subject: [PATCH] Update connection --- examples/console/bin/main.dart | 3 +- lib/src/client/centrifuge.dart | 131 ++++++++++++++++++++--- lib/src/client/centrifuge_interface.dart | 11 +- lib/src/model/config.dart | 38 +++++-- lib/src/model/messages.dart | 1 + 5 files changed, 148 insertions(+), 36 deletions(-) create mode 100644 lib/src/model/messages.dart diff --git a/examples/console/bin/main.dart b/examples/console/bin/main.dart index 0c8a458..702dbe8 100644 --- a/examples/console/bin/main.dart +++ b/examples/console/bin/main.dart @@ -31,8 +31,7 @@ void main([List? args]) => runZonedGuarded(() async { }, ); }, (error, stackTrace) { - print('Error: $error'); - print('Stacktrace: $stackTrace'); + print('Critical error: $error'); io.exit(1); }); diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 3564cad..202e4a7 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -1,8 +1,10 @@ +// TODO(plugfox): extract transport from Centrifuge client. import 'dart:async'; import 'package:centrifuge_dart/src/client/centrifuge_interface.dart'; import 'package:centrifuge_dart/src/model/config.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; +import 'package:centrifuge_dart/src/model/protobuf/client.pb.dart' as pb; import 'package:centrifuge_dart/src/model/state.dart'; import 'package:meta/meta.dart'; import 'package:ws/ws.dart'; @@ -10,7 +12,11 @@ import 'package:ws/ws.dart'; /// {@template centrifuge} /// Centrifuge client. /// {@endtemplate} -final class Centrifuge extends CentrifugeBase with CentrifugeConnectionMixin { +final class Centrifuge extends CentrifugeBase + with + CentrifugePingMixin, + CentrifugeConnectionMixin, + CentrifugeHandlerMixin { /// {@macro centrifuge} Centrifuge([CentrifugeConfig? config]) : super(config ?? CentrifugeConfig.defaultConfig()); @@ -97,14 +103,12 @@ abstract base class CentrifugeBase implements ICentrifuge { /// Mixin responsible for connection. /// {@nodoc} @internal -base mixin CentrifugeConnectionMixin on CentrifugeBase { +base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugePingMixin { StreamSubscription? _webSocketStateSubscription; /// {@nodoc} @protected @nonVirtual - @pragma('vm:prefer-inline') - @pragma('dart2js:tryInline') void _setState(CentrifugeState state) { if (_state.type == state.type) return; _stateController.add(_state = state); @@ -112,18 +116,23 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase { @override void _initCentrifuge() { - /// Listen to websocket state changes and update current client state. - _webSocketStateSubscription = _webSocket.stateChanges.listen((state) { - switch (state) { - case WebSocketClientState$Connecting state: - _setState(CentrifugeState$Connecting(url: state.url)); - case WebSocketClientState$Open _: - _setState(CentrifugeState$Connected(url: state.url)); - case WebSocketClientState$Disconnecting _: - case WebSocketClientState$Closed _: - _setState(CentrifugeState$Disconnected()); - } - }); + // Listen to websocket state changes and update current client state. + _webSocketStateSubscription = _webSocket.stateChanges.listen( + (state) { + switch (state) { + case WebSocketClientState$Connecting state: + _setState(CentrifugeState$Connecting(url: state.url)); + case WebSocketClientState$Open _: + _setState(CentrifugeState$Connected(url: state.url)); + _setUpPingTimer(); + case WebSocketClientState$Disconnecting _: + case WebSocketClientState$Closed _: + _tearDownPingTimer(); + _setState(CentrifugeState$Disconnected()); + } + }, + cancelOnError: false, + ); super._initCentrifuge(); } @@ -132,6 +141,27 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase { _setState(CentrifugeState$Connecting(url: url)); try { await _webSocket.connect(url); + } on Object catch (error, stackTrace) { + _webSocket.disconnect().ignore(); + Error.throwWithStackTrace( + CentrifugoConnectionException(error), + stackTrace, + ); + } + pb.ConnectRequest request; + try { + request = pb.ConnectRequest(); + final token = await _config.getToken?.call(); + assert(token == null || token.length > 5, 'Centrifuge JWT is too short'); + if (token != null) request.token = token; + final payload = await _config.getPayload?.call(); + if (payload != null) request.data = payload; + request + ..name = _config.client.name + ..version = _config.client.version; + // TODO(plugfox): add subscriptions. + + // TODO(plugfox): Send request. } on Object catch (error, stackTrace) { Error.throwWithStackTrace( CentrifugoConnectionException(error), @@ -153,10 +183,77 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase { } @override - @mustCallSuper Future close() async { await super.close(); await _webSocket.close(); _webSocketStateSubscription?.cancel().ignore(); } } + +/// Mixin responsible for responses/push/results from server. +/// {@nodoc} +@internal +base mixin CentrifugeHandlerMixin on CentrifugeBase { + StreamSubscription? _webSocketDataSubscription; + @override + void _initCentrifuge() { + // Listen to websocket data and handle it. + _webSocketDataSubscription = + _webSocket.stream.listen(_handleMessage, cancelOnError: false); + super._initCentrifuge(); + } + + @override + Future close() async { + await super.close(); + _webSocketDataSubscription?.cancel().ignore(); + } + + /// {@nodoc} + @protected + @nonVirtual + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') + void _handleMessage(Object response) { + print('Received data: $response'); + } +} + +/// Mixin responsible for sending ping to server. +/// {@nodoc} +base mixin CentrifugePingMixin on CentrifugeBase { + Timer? _pingTimer; + + @nonVirtual + void _setUpPingTimer() {} + + @nonVirtual + void _tearDownPingTimer() { + _pingTimer?.cancel(); + } + + /* @override + Future send(List data) async { + final request = pb.Message()..data = data; + final command = _createCommand( + request, + true, + ); + await _webSocket.add(request); + /* try { + final request = protocol.Message()..data = data; + await _webSocket.add(data); + } on Object catch (error, stackTrace) { + Error.throwWithStackTrace( + CentrifugoSendException(error), + stackTrace, + ); + } */ + } */ + + @override + Future close() { + _tearDownPingTimer(); + return super.close(); + } +} diff --git a/lib/src/client/centrifuge_interface.dart b/lib/src/client/centrifuge_interface.dart index 37112fa..4cd73ba 100644 --- a/lib/src/client/centrifuge_interface.dart +++ b/lib/src/client/centrifuge_interface.dart @@ -14,10 +14,10 @@ abstract interface class ICentrifuge { /// [url] is a URL of endpoint. Future connect(String url); - // Send asynchronous message to a server. This method makes sense - // only when using Centrifuge library for Go on a server side. In Centrifugo - // asynchronous message handler does not exist. - //Future send(List data); + /// Send asynchronous message to a server. This method makes sense + /// only when using Centrifuge library for Go on a server side. In Centrifugo + /// asynchronous message handler does not exist. + /* Future send(List data); */ /// Publish data to the channel. /* Future publish(String channel, List data); */ @@ -30,9 +30,6 @@ abstract interface class ICentrifuge { /// free all allocated resources. Future close(); - /// Send asynchronous message to the server. - /* Future send( data); */ - /// Send arbitrary RPC and wait for response. /* Future rpc(String method, data); */ } diff --git a/lib/src/model/config.dart b/lib/src/model/config.dart index dc58d0f..f0aa173 100644 --- a/lib/src/model/config.dart +++ b/lib/src/model/config.dart @@ -1,11 +1,23 @@ +import 'dart:async'; + import 'package:centrifuge_dart/src/model/pubspec.yaml.g.dart'; import 'package:meta/meta.dart'; /// Token used for authentication -//typedef CentrifugeToken = String; +typedef CentrifugeToken = String; /// Callback to get/refresh tokens -//typedef CentrifugeTokenCallback = Future Function(); +/// This callback is used for initial connection +/// and for refreshing expired tokens. +/// +/// If method returns null then connection will be established without token. +typedef CentrifugeTokenCallback = FutureOr Function(); + +/// Callback to get initial connection payload data. +/// For example to send JWT or other auth data. +/// +/// If method returns null then no payload will be sent at connect time. +typedef CentrifugeConnectionPayloadCallback = FutureOr?> Function(); /// {@template centrifuge_config} /// Centrifuge client common options. @@ -21,6 +33,8 @@ import 'package:meta/meta.dart'; final class CentrifugeConfig { /// {@macro centrifuge_config} CentrifugeConfig({ + this.getToken, + this.getPayload, ({Duration min, Duration max})? connectionRetryInterval, ({String name, String version})? client, this.timeout = const Duration(seconds: 15), @@ -41,15 +55,19 @@ final class CentrifugeConfig { /// {@macro centrifuge_config} factory CentrifugeConfig.defaultConfig() = CentrifugeConfig; - // TODO(plugfox): Add support for the following options. - /// The data send for the first request - //final List? data; - - /// The initial token used for authentication - //CentrifugeToken token; - /// Callback to get/refresh tokens - //final CentrifugeTokenCallback? getToken; + /// This callback is used for initial connection + /// and for refreshing expired tokens. + /// + /// If method returns null then connection will be established without token. + final CentrifugeTokenCallback? getToken; + + /// The data send for the first request + /// Callback to get initial connection payload data. + /// For example to send JWT or other auth data. + /// + /// If method returns null then no payload will be sent at connect time. + final CentrifugeConnectionPayloadCallback? getPayload; /// The [connectionRetryInterval] argument is specifying the /// [backoff full jitter strategy](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) for reconnecting. diff --git a/lib/src/model/messages.dart b/lib/src/model/messages.dart new file mode 100644 index 0000000..5724c4d --- /dev/null +++ b/lib/src/model/messages.dart @@ -0,0 +1 @@ +abstract interface class Message {}