From f38b98f149092d55733d3a34a52f48ab0576062c Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Sun, 23 Jul 2023 16:56:23 +0400 Subject: [PATCH] Update --- .vscode/tasks.json | 95 ++++++++-- examples/console/README.md | 4 +- examples/console/bin/main.dart | 28 ++- lib/centrifuge.dart | 11 +- lib/src/client/centrifuge.dart | 54 ++++-- lib/src/client/centrifuge_interface.dart | 8 +- lib/src/model/state.dart | 13 ++ lib/src/model/states_stream.dart | 36 ++++ lib/src/model/subscription.dart | 89 +++++++++ lib/src/model/subscription_state.dart | 189 +++++++++++++++++++ lib/src/transport/ws_protobuf_transport.dart | 16 +- 11 files changed, 499 insertions(+), 44 deletions(-) create mode 100644 lib/src/model/states_stream.dart create mode 100644 lib/src/model/subscription.dart create mode 100644 lib/src/model/subscription_state.dart diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 69cda69..2e20a5c 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -14,21 +14,60 @@ { "label": "Start Centrifugo Server", "type": "shell", - "command": "docker", - "args": [ - "run", - "-d", - "--rm", - "--ulimit=nofile=65536:65536", - "-p=8000:8000", - "--name=centrifugo", - "centrifugo/centrifugo:latest", - "centrifugo", - "--client_insecure", - "--admin", - "--admin_insecure", - "--log_level=debug" - ], + "windows": { + "command": "docker", + "args": [ + "run", + "-d", + "--rm", + "--ulimit=nofile=65536:65536", + "-p=8000:8000/tcp", + "--volume=${PWD}/config.json:/centrifugo/config.json:ro", + "--name=centrifugo", + "centrifugo/centrifugo:latest", + "centrifugo", + //"--client_insecure", + "--admin", + "--admin_insecure", + "--log_level=debug" + ] + }, + "linux": { + "command": "docker", + "args": [ + "run", + "-d", + "--rm", + "--ulimit=nofile=65536:65536", + "-p=8000:8000/tcp", + "--volume=${PWD}/config.json:/centrifugo/config.json:ro", + "--name=centrifugo", + "centrifugo/centrifugo:latest", + "centrifugo", + //"--client_insecure", + "--admin", + "--admin_insecure", + "--log_level=debug" + ] + }, + "osx": { + "command": "docker", + "args": [ + "run", + "-d", + "--rm", + "--ulimit=nofile=65536:65536", + "-p=8000:8000/tcp", + "--volume=${PWD}/config.json:/centrifugo/config.json:ro", + "--name=centrifugo", + "centrifugo/centrifugo:latest", + "centrifugo", + "--client_insecure", + "--admin", + "--admin_insecure", + "--log_level=debug" + ] + }, "group": { "kind": "none", "isDefault": true @@ -53,6 +92,32 @@ "reveal": "always", "panel": "dedicated" } + }, + { + "label": "Generate new user token", + "type": "shell", + "command": "docker", + "args": [ + "run", + "-it", + "--rm", + "--volume=${PWD}/config.json:/centrifugo/config.json:ro", + "--name=centrifugo-cli", + "centrifugo/centrifugo:latest", + "centrifugo", + "gentoken", + "--user=dart", + "--ttl=604800000" // 604800 + ], + "group": { + "kind": "none", + "isDefault": true + }, + "problemMatcher": [], + "presentation": { + "reveal": "always", + "panel": "dedicated" + } } ] } diff --git a/examples/console/README.md b/examples/console/README.md index 4613744..f90a9df 100644 --- a/examples/console/README.md +++ b/examples/console/README.md @@ -89,7 +89,7 @@ Bash: ```bash docker run -it --rm --volume ${PWD}/config.json:/centrifugo/config.json:ro \ --name centrifugo-cli centrifugo/centrifugo:latest \ - centrifugo gensubtoken --user dart --channel chat:index + centrifugo gentoken --user dart ``` PowerShell: @@ -97,7 +97,7 @@ PowerShell: ```powershell docker run -it --rm --volume ${PWD}/config.json:/centrifugo/config.json:ro ` --name centrifugo-cli centrifugo/centrifugo:latest ` - centrifugo gensubtoken --user dart --channel chat:index + centrifugo gentoken --user dart ``` ### Run Centrifugo diff --git a/examples/console/bin/main.dart b/examples/console/bin/main.dart index de5fb47..4a0f628 100644 --- a/examples/console/bin/main.dart +++ b/examples/console/bin/main.dart @@ -6,22 +6,44 @@ import 'dart:io' as io show exit, Platform; import 'package:args/args.dart' show ArgParser; import 'package:centrifuge_dart/centrifuge.dart'; +const url = 'ws://localhost:8000/connection/websocket?format=protobuf'; + void main([List? args]) { final options = _extractOptions(args ?? const []); runZonedGuarded( () async { + // Create centrifuge client. final client = Centrifuge( CentrifugeConfig( client: ( name: 'Centrifuge Console Example', version: '0.0.1', ), + getToken: () => + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJkYXJ0IiwiZXhwIj' + 'oyMjk0OTE1MTMyLCJpYXQiOjE2OTAxMTUxMzJ9.hIGDXKn-eMdsdj57wn6-4y5p' + 'k0tZcKoJCu0qxuuWSoQ', ), ); - await client - .connect('ws://localhost:8000/connection/websocket?format=protobuf'); - await Future.delayed(const Duration(seconds: 3)); + // Connect to centrifuge server using provided URL. + await client.connect(url); + + // Output current client state. + print('Current state after connect: ${client.state}'); + + // State changes. + // Or you can observe specific state changes. + // e.g. `client.states.connected` + client.states.listen((state) => print('State changed to: $state')); + + // Handle all centrifuge errors. + client.errors.listen( + (error) => print( + 'Exception: ${error.exception}, ' + 'Stack trace: ${error.stackTrace}', + ), + ); // TODO(plugfox): Read from stdin and send to channel. diff --git a/lib/centrifuge.dart b/lib/centrifuge.dart index 6285446..93b550e 100644 --- a/lib/centrifuge.dart +++ b/lib/centrifuge.dart @@ -1,7 +1,8 @@ library centrifuge; -export 'src/client/centrifuge.dart' show Centrifuge; -export 'src/model/config.dart'; -export 'src/model/exception.dart'; -export 'src/model/jwt.dart'; -export 'src/model/state.dart'; +export 'package:centrifuge_dart/src/client/centrifuge.dart' show Centrifuge; +export 'package:centrifuge_dart/src/model/config.dart'; +export 'package:centrifuge_dart/src/model/exception.dart'; +export 'package:centrifuge_dart/src/model/jwt.dart'; +export 'package:centrifuge_dart/src/model/state.dart'; +export 'package:centrifuge_dart/src/model/states_stream.dart'; diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 440df7f..25938ab 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -4,6 +4,7 @@ import 'package:centrifuge_dart/src/client/centrifuge_interface.dart'; import 'package:centrifuge_dart/src/model/config.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; import 'package:centrifuge_dart/src/model/state.dart'; +import 'package:centrifuge_dart/src/model/states_stream.dart'; import 'package:centrifuge_dart/src/transport/transport_interface.dart'; import 'package:centrifuge_dart/src/transport/ws_protobuf_transport.dart'; import 'package:centrifuge_dart/src/util/logger.dart' as logger; @@ -12,7 +13,8 @@ import 'package:meta/meta.dart'; /// {@template centrifuge} /// Centrifuge client. /// {@endtemplate} -final class Centrifuge extends CentrifugeBase with CentrifugeConnectionMixin { +final class Centrifuge extends CentrifugeBase + with CentrifugeErrorsMixin, CentrifugeConnectionMixin { /// {@macro centrifuge} Centrifuge([CentrifugeConfig? config]) : super(config ?? CentrifugeConfig.defaultConfig()); @@ -45,7 +47,8 @@ abstract base class CentrifugeBase implements ICentrifuge { CentrifugeState get state => _transport.state; @override - Stream get states => _transport.states; + late final CentrifugeStatesStream states = + CentrifugeStatesStream(_transport.states); /// Centrifuge config. /// {@nodoc} @@ -67,19 +70,44 @@ abstract base class CentrifugeBase implements ICentrifuge { /// Mixin responsible for connection. /// {@nodoc} @internal -base mixin CentrifugeConnectionMixin on CentrifugeBase { +base mixin CentrifugeErrorsMixin on CentrifugeBase { + @protected + @nonVirtual + void _emitError(CentrifugeException exception, StackTrace stackTrace) => + _errorsController.add((exception: exception, stackTrace: stackTrace)); + + late final StreamController< + ({CentrifugeException exception, StackTrace stackTrace})> + _errorsController = StreamController< + ({CentrifugeException exception, StackTrace stackTrace})>.broadcast(); + + @override + late final Stream<({CentrifugeException exception, StackTrace stackTrace})> + errors = _errorsController.stream; + + @override + Future close() async { + await super.close(); + _errorsController.close().ignore(); + } +} + +/// Mixin responsible for connection. +/// {@nodoc} +@internal +base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin { @override Future connect(String url) async { logger.fine('Interactively connecting to $url'); try { await _transport.connect(url); - } on CentrifugeException { + } on CentrifugeException catch (error, stackTrace) { + _emitError(error, stackTrace); rethrow; } on Object catch (error, stackTrace) { - Error.throwWithStackTrace( - CentrifugeConnectionException(error), - stackTrace, - ); + final centrifugeException = CentrifugeConnectionException(error); + _emitError(centrifugeException, stackTrace); + Error.throwWithStackTrace(centrifugeException, stackTrace); } } @@ -88,13 +116,13 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase { logger.fine('Interactively disconnecting'); try { await _transport.disconnect(0, 'Disconnect called'); - } on CentrifugeException { + } on CentrifugeException catch (error, stackTrace) { + _emitError(error, stackTrace); rethrow; } on Object catch (error, stackTrace) { - Error.throwWithStackTrace( - CentrifugeDisconnectionException(error), - stackTrace, - ); + final centrifugeException = CentrifugeConnectionException(error); + _emitError(centrifugeException, stackTrace); + Error.throwWithStackTrace(centrifugeException, stackTrace); } } diff --git a/lib/src/client/centrifuge_interface.dart b/lib/src/client/centrifuge_interface.dart index f89b1ee..5c4403f 100644 --- a/lib/src/client/centrifuge_interface.dart +++ b/lib/src/client/centrifuge_interface.dart @@ -1,4 +1,4 @@ -import 'package:centrifuge_dart/src/model/state.dart'; +import 'package:centrifuge_dart/centrifuge.dart'; /// Centrifuge client interface. abstract interface class ICentrifuge { @@ -6,7 +6,11 @@ abstract interface class ICentrifuge { CentrifugeState get state; /// Stream of client states. - abstract final Stream states; + abstract final CentrifugeStatesStream states; + + /// Stream of errors. + abstract final Stream< + ({CentrifugeException exception, StackTrace stackTrace})> errors; /* abstract final Stream publications; */ diff --git a/lib/src/model/state.dart b/lib/src/model/state.dart index 00ca5e2..0f957da 100644 --- a/lib/src/model/state.dart +++ b/lib/src/model/state.dart @@ -1,3 +1,5 @@ +import 'dart:convert'; + import 'package:meta/meta.dart'; /// {@template state} @@ -29,6 +31,7 @@ import 'package:meta/meta.dart'; /// Also, this can happen due to server advice from a server, /// or due to a terminal problem that happened on the client-side. /// {@endtemplate} +@immutable sealed class CentrifugeState extends _$CentrifugeStateBase { /// {@macro state} const CentrifugeState(super.timestamp); @@ -59,6 +62,7 @@ sealed class CentrifugeState extends _$CentrifugeStateBase { bool? sendPong, String? session, String? node, + List? data, }) = CentrifugeState$Connected; /// Permanently closed @@ -111,6 +115,10 @@ sealed class CentrifugeState extends _$CentrifugeStateBase { }, session: json['session']?.toString(), node: json['node']?.toString(), + data: switch (json['data']) { + String data when data.isNotEmpty => base64Decode(data), + _ => null, + }, ), ('closed', int timestamp, _) => CentrifugeState.closed( timestamp: DateTime.fromMicrosecondsSinceEpoch(timestamp), @@ -261,6 +269,7 @@ final class CentrifugeState$Connected extends CentrifugeState this.sendPong, this.session, this.node, + this.data, }) : super(timestamp ?? DateTime.now()); @override @@ -296,6 +305,9 @@ final class CentrifugeState$Connected extends CentrifugeState /// Server node ID. final String? node; + /// Additional data returned from server on connect. + final List? data; + @override bool get isDisconnected => false; @@ -328,6 +340,7 @@ final class CentrifugeState$Connected extends CentrifugeState if (sendPong != null) 'sendPong': sendPong, if (session != null) 'session': session, if (node != null) 'node': node, + if (data != null) 'data': base64Encode(data!), }; @override diff --git a/lib/src/model/states_stream.dart b/lib/src/model/states_stream.dart new file mode 100644 index 0000000..183f79b --- /dev/null +++ b/lib/src/model/states_stream.dart @@ -0,0 +1,36 @@ +import 'dart:async'; + +import 'package:centrifuge_dart/src/model/state.dart'; + +/// Stream of Centrifuge's [CentrifugeState] changes. +/// {@category Client} +/// {@category Entity} +final class CentrifugeStatesStream extends StreamView { + /// Stream of Centrifuge's [CentrifugeState] changes. + CentrifugeStatesStream(super.stream); + + /// Connection has not yet been established, but the WebSocket is trying. + late final Stream disconnected = + whereType(); + + /// Disconnected state + late final Stream connecting = + whereType(); + + /// Connected + late final Stream connected = + whereType(); + + /// Permanently closed + late final Stream closed = + whereType(); + + /// Filtered stream of data of [CentrifugeState]. + Stream whereType() => + transform(StreamTransformer.fromHandlers( + handleData: (data, sink) => switch (data) { + T valid => sink.add(valid), + _ => null, + }, + )).asBroadcastStream(); +} diff --git a/lib/src/model/subscription.dart b/lib/src/model/subscription.dart new file mode 100644 index 0000000..109a9c6 --- /dev/null +++ b/lib/src/model/subscription.dart @@ -0,0 +1,89 @@ +import 'package:fixnum/fixnum.dart' as fixnum; +import 'package:meta/meta.dart'; + +/// {@template subscription} +/// Centrifuge subscription interface. +/// {@endtemplate} +abstract interface class ICentrifugeSubscription { + /// Channel name. + abstract final String channel; +} + +/// {@template client_subscription} +/// Centrifuge client-side subscription representation. +/// +/// Client allows subscribing on channels. +/// This can be done by creating Subscription object. +/// +/// When a newSubscription method is called Client allocates a new Subscription +/// instance and saves it in the internal subscription registry. +/// Having a registry of allocated subscriptions allows SDK to manage +/// resubscribes upon reconnecting to a server. +/// +/// Centrifugo connectors do not allow creating two subscriptions +/// to the same channel – in this case, newSubscription can throw an exception. +/// +/// Subscription has 3 states: +/// +/// - `unsubscribed` +/// - `subscribing` +/// - `subscribed` +/// +/// When a new Subscription is created it has an `unsubscribed` state. +/// {@endtemplate} +@immutable +final class CentrifugeClientSubscription implements ICentrifugeSubscription { + /// {@macro client_subscription} + const CentrifugeClientSubscription({ + required this.channel, + }); + + @override + final String channel; + + @override + String toString() => 'CentrifugeClientSubscription{channel: $channel}'; +} + +/// {@template server_subscription} +/// Centrifuge server-side subscription representation. +/// +/// We encourage using client-side subscriptions where possible +/// as they provide a better control and isolation from connection. +/// But in some cases you may want to use server-side subscriptions +/// (i.e. subscriptions created by server upon connection establishment). +/// +/// Technically, client SDK keeps server-side subscriptions +/// in internal registry, similar to client-side subscriptions +/// but without possibility to control them. +/// {@endtemplate} +@immutable +final class CentrifugeServerSubscription implements ICentrifugeSubscription { + /// {@macro server_subscription} + const CentrifugeServerSubscription({ + required this.channel, + required this.recoverable, + required this.offset, + required this.epoch, + }); + + @override + final String channel; + + /// Recoverable flag. + final bool recoverable; + + /// Offset. + final fixnum.Int64 offset; + + /// Epoch. + final String epoch; + + /* publish(channel, data) + history(channel, options) + presence(channel) + presenceStats(channel) */ + + @override + String toString() => 'CentrifugeServerSubscription{channel: $channel}'; +} diff --git a/lib/src/model/subscription_state.dart b/lib/src/model/subscription_state.dart new file mode 100644 index 0000000..ad62aef --- /dev/null +++ b/lib/src/model/subscription_state.dart @@ -0,0 +1,189 @@ +import 'package:meta/meta.dart'; + +/// {@template subscription_state} +/// Subscription has 3 states: +/// +/// - `unsubscribed` +/// - `subscribing` +/// - `subscribed` +/// +/// When a new Subscription is created it has an `unsubscribed` state. +/// {@endtemplate} +@immutable +sealed class CentrifugeSubscriptionState + extends _$CentrifugeSubscriptionStateBase { + /// {@macro subscription_state} + const CentrifugeSubscriptionState(); + + /// Unsubscribed state + /// {@macro subscription_state} + const factory CentrifugeSubscriptionState.unsubscribed() = + CentrifugeSubscriptionState$Unsubscribed; + + /// Subscribing + /// {@macro subscription_state} + const factory CentrifugeSubscriptionState.subscribing() = + CentrifugeSubscriptionState$Subscribing; + + /// Subscribed + /// {@macro subscription_state} + const factory CentrifugeSubscriptionState.subscribed() = + CentrifugeSubscriptionState$Subscribed; +} + +/// Unsubscribed state +/// +/// {@nodoc} +final class CentrifugeSubscriptionState$Unsubscribed + extends CentrifugeSubscriptionState with _$CentrifugeSubscriptionState { + /// {@nodoc} + const CentrifugeSubscriptionState$Unsubscribed(); + + @override + R map({ + required CentrifugeSubscriptionStateMatch + unsubscribed, + required CentrifugeSubscriptionStateMatch + subscribing, + required CentrifugeSubscriptionStateMatch + subscribed, + }) => + unsubscribed(this); + + @override + int get hashCode => 0; + + @override + bool operator ==(Object other) => identical(this, other); + + @override + String toString() => 'unsubscribed'; +} + +/// Subscribing +/// {@nodoc} +final class CentrifugeSubscriptionState$Subscribing + extends CentrifugeSubscriptionState with _$CentrifugeSubscriptionState { + /// {@nodoc} + const CentrifugeSubscriptionState$Subscribing(); + + @override + R map({ + required CentrifugeSubscriptionStateMatch + unsubscribed, + required CentrifugeSubscriptionStateMatch + subscribing, + required CentrifugeSubscriptionStateMatch + subscribed, + }) => + subscribing(this); + + @override + int get hashCode => 1; + + @override + bool operator ==(Object other) => identical(this, other); + + @override + String toString() => 'subscribing'; +} + +/// Subscribed +/// {@nodoc} +final class CentrifugeSubscriptionState$Subscribed + extends CentrifugeSubscriptionState with _$CentrifugeSubscriptionState { + /// {@nodoc} + const CentrifugeSubscriptionState$Subscribed(); + + @override + R map({ + required CentrifugeSubscriptionStateMatch + unsubscribed, + required CentrifugeSubscriptionStateMatch + subscribing, + required CentrifugeSubscriptionStateMatch + subscribed, + }) => + subscribed(this); + + @override + int get hashCode => 2; + + @override + bool operator ==(Object other) => identical(this, other); + + @override + String toString() => 'subscribed'; +} + +/// {@nodoc} +base mixin _$CentrifugeSubscriptionState on CentrifugeSubscriptionState {} + +/// Pattern matching for [CentrifugeSubscriptionState]. +typedef CentrifugeSubscriptionStateMatch + = R Function(S state); + +/// {@nodoc} +@immutable +abstract base class _$CentrifugeSubscriptionStateBase { + /// {@nodoc} + const _$CentrifugeSubscriptionStateBase(); + + /// Pattern matching for [CentrifugeSubscriptionState]. + R map({ + required CentrifugeSubscriptionStateMatch + unsubscribed, + required CentrifugeSubscriptionStateMatch + subscribing, + required CentrifugeSubscriptionStateMatch + subscribed, + }); + + /// Pattern matching for [CentrifugeSubscriptionState]. + R maybeMap({ + required R Function() orElse, + CentrifugeSubscriptionStateMatch? + unsubscribed, + CentrifugeSubscriptionStateMatch? + subscribing, + CentrifugeSubscriptionStateMatch? + subscribed, + }) => + map( + unsubscribed: unsubscribed ?? (_) => orElse(), + subscribing: subscribing ?? (_) => orElse(), + subscribed: subscribed ?? (_) => orElse(), + ); + + /// Pattern matching for [CentrifugeSubscriptionState]. + R? mapOrNull({ + CentrifugeSubscriptionStateMatch? + unsubscribed, + CentrifugeSubscriptionStateMatch? + subscribing, + CentrifugeSubscriptionStateMatch? + subscribed, + }) => + map( + unsubscribed: unsubscribed ?? (_) => null, + subscribing: subscribing ?? (_) => null, + subscribed: subscribed ?? (_) => null, + ); +} diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index aeb084b..be405ab 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -305,7 +305,15 @@ base mixin CentrifugeWebSocketConnectionMixin ..version = _config.client.version; // TODO(plugfox): add subscriptions. // TODO(plugfox): Send request. - final result = await _sendMessage(request, pb.ConnectResult()); + final pb.ConnectResult result; + try { + result = await _sendMessage(request, pb.ConnectResult()); + } on Object catch (error, stackTrace) { + Error.throwWithStackTrace( + CentrifugeConnectionException(error), + stackTrace, + ); + } if (!_webSocket.state.readyState.isOpen) { throw StateError('Connection closed during connection process'); } @@ -321,6 +329,7 @@ base mixin CentrifugeWebSocketConnectionMixin pingInterval: result.hasPing() ? Duration(seconds: result.ping) : null, sendPong: result.hasPong() ? result.pong : null, session: result.hasSession() ? result.session : null, + data: result.hasData() ? result.data : null, )); } on Object { _setState(CentrifugeState$Disconnected()); @@ -371,9 +380,8 @@ base mixin CentrifugeWebSocketStateHandlerMixin void _initTransport() { // Init state controller. _stateController = StreamController.broadcast( - onListen: () => _stateController.add(_state), - onCancel: () => _stateController.close(), - ); + /* onListen: () => _stateController.add(_state), */ + ); super._initTransport(); }