From f345d1dd0e865f7e7851303db83a654d22b72c57 Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Thu, 3 Aug 2023 16:31:40 +0400 Subject: [PATCH] Add centrifuge observer --- examples/console/bin/main.dart | 8 ---- lib/src/client/centrifuge.dart | 39 +++++++------------ lib/src/client/centrifuge_interface.dart | 7 ---- lib/src/client/observer.dart | 22 +++++++++++ .../client_subscription_impl.dart | 23 +---------- .../server_subscription_impl.dart | 24 +----------- lib/src/subscription/subscription.dart | 5 --- 7 files changed, 39 insertions(+), 89 deletions(-) create mode 100644 lib/src/client/observer.dart diff --git a/examples/console/bin/main.dart b/examples/console/bin/main.dart index f28933e..231120f 100644 --- a/examples/console/bin/main.dart +++ b/examples/console/bin/main.dart @@ -37,14 +37,6 @@ void main([List? args]) { // e.g. `client.states.connected` client.states.listen((state) => print('State changed to: $state')); - // Handle all centrifuge exceptions. - client.errors.listen( - (error) => print( - 'Exception: ${error.exception}, ' - 'Stack trace: ${error.stackTrace}', - ), - ); - // TODO(plugfox): Read from stdin and send to channel. /* // Close client diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 444e419..6a89c88 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:centrifuge_dart/src/client/centrifuge_interface.dart'; import 'package:centrifuge_dart/src/client/config.dart'; import 'package:centrifuge_dart/src/client/disconnect_code.dart'; +import 'package:centrifuge_dart/src/client/observer.dart'; import 'package:centrifuge_dart/src/client/state.dart'; import 'package:centrifuge_dart/src/client/states_stream.dart'; import 'package:centrifuge_dart/src/model/channel_presence.dart'; @@ -28,7 +29,6 @@ import 'package:centrifuge_dart/src/transport/ws_protobuf_transport.dart'; import 'package:centrifuge_dart/src/util/event_queue.dart'; import 'package:centrifuge_dart/src/util/logger.dart' as logger; import 'package:meta/meta.dart'; -import 'package:stack_trace/stack_trace.dart' as st; /// {@template centrifuge} /// Centrifuge client. @@ -54,6 +54,9 @@ final class Centrifuge extends CentrifugeBase /// {@macro centrifuge} factory Centrifuge.connect(String url, [CentrifugeConfig? config]) => Centrifuge(config)..connect(url); + + /// The current [CentrifugeObserver] instance. + static CentrifugeObserver? observer; } /// {@nodoc} @@ -93,7 +96,9 @@ abstract base class CentrifugeBase implements ICentrifuge { /// {@nodoc} @protected @mustCallSuper - void _initCentrifuge() {} + void _initCentrifuge() { + Centrifuge.observer?.onCreate(this); + } /// Called when connection established. /// Right before [CentrifugeState$Connected] state. @@ -102,6 +107,7 @@ abstract base class CentrifugeBase implements ICentrifuge { @mustCallSuper void _onConnected(CentrifugeState$Connected state) { logger.fine('Connection established'); + Centrifuge.observer?.onConnected(this, state); } /// Called when connection lost. @@ -111,11 +117,15 @@ abstract base class CentrifugeBase implements ICentrifuge { @mustCallSuper void _onDisconnected(CentrifugeState$Disconnected state) { logger.fine('Connection lost'); + Centrifuge.observer?.onDisconnected(this, state); } @override @mustCallSuper - Future close() async {} + Future close() async { + await _transport.close(); + Centrifuge.observer?.onClose(this); + } } /// Mixin responsible for event receiving and distribution by controllers @@ -349,27 +359,7 @@ base mixin CentrifugeErrorsMixin on CentrifugeBase { @protected @nonVirtual void _emitError(CentrifugeException exception, StackTrace stackTrace) => - _errorsController.add( - ( - exception: exception, - stackTrace: st.Trace.from(stackTrace).terse, - ), - ); - - late final StreamController< - ({CentrifugeException exception, StackTrace stackTrace})> - _errorsController = StreamController< - ({CentrifugeException exception, StackTrace stackTrace})>.broadcast(); - - @override - late final Stream<({CentrifugeException exception, StackTrace stackTrace})> - errors = _errorsController.stream; - - @override - Future close() async { - await super.close(); - _errorsController.close().ignore(); - } + Centrifuge.observer?.onError(exception, stackTrace); } /// Mixin responsible for connection. @@ -465,7 +455,6 @@ base mixin CentrifugeConnectionMixin Future close() async { logger.fine('Interactively closing'); await super.close(); - await _transport.close(); } } diff --git a/lib/src/client/centrifuge_interface.dart b/lib/src/client/centrifuge_interface.dart index b09634e..6123359 100644 --- a/lib/src/client/centrifuge_interface.dart +++ b/lib/src/client/centrifuge_interface.dart @@ -16,10 +16,6 @@ abstract interface class ICentrifuge ICentrifugeEventReceiver, ICentrifugeClientSubscriptionsManager, ICentrifugePresenceOwner { - /// Stream of errors. - abstract final Stream< - ({CentrifugeException exception, StackTrace stackTrace})> errors; - /// Connect to the server. /// [url] is a URL of endpoint. Future connect(String url); @@ -42,9 +38,6 @@ abstract interface class ICentrifuge /// Send arbitrary RPC and wait for response. /* Future rpc(String method, data); */ - /// Publish data to the channel. - /* Future publish(String channel, List data); */ - /// Send History command. /* Future history(String channel, {int limit = 0, StreamPosition? since, bool reverse = false}); */ diff --git a/lib/src/client/observer.dart b/lib/src/client/observer.dart new file mode 100644 index 0000000..9a2f032 --- /dev/null +++ b/lib/src/client/observer.dart @@ -0,0 +1,22 @@ +import 'package:centrifuge_dart/centrifuge.dart'; +import 'package:centrifuge_dart/src/client/centrifuge_interface.dart'; + +/// An interface for observing the behavior of Centrifuge instances. +abstract class CentrifugeObserver { + /// Called whenever a [ICentrifuge] is instantiated. + void onCreate(ICentrifuge client) {} + + /// Called whenever a [ICentrifuge] client changes its state + /// to [CentrifugeState$Connecting]. + void onConnected(ICentrifuge client, CentrifugeState$Connected state) {} + + /// Called whenever a [ICentrifuge] client changes its state + /// to [CentrifugeState$Disconnected]. + void onDisconnected(ICentrifuge client, CentrifugeState$Disconnected state) {} + + /// Called whenever an error is thrown in any Centrifuge client. + void onError(CentrifugeException error, StackTrace stackTrace) {} + + /// Called whenever a [ICentrifuge] is closed. + void onClose(ICentrifuge client) {} +} diff --git a/lib/src/subscription/client_subscription_impl.dart b/lib/src/subscription/client_subscription_impl.dart index 14eada5..cf1f454 100644 --- a/lib/src/subscription/client_subscription_impl.dart +++ b/lib/src/subscription/client_subscription_impl.dart @@ -22,7 +22,6 @@ import 'package:centrifuge_dart/src/util/event_queue.dart'; import 'package:centrifuge_dart/src/util/logger.dart' as logger; import 'package:fixnum/fixnum.dart' as fixnum; import 'package:meta/meta.dart'; -import 'package:stack_trace/stack_trace.dart' as st; /// Client-side subscription implementation. /// {@nodoc} @@ -253,27 +252,7 @@ base mixin CentrifugeClientSubscriptionErrorsMixin @protected @nonVirtual void _emitError(CentrifugeException exception, StackTrace stackTrace) => - _errorsController.add( - ( - exception: exception, - stackTrace: st.Trace.from(stackTrace).terse, - ), - ); - - late final StreamController< - ({CentrifugeException exception, StackTrace stackTrace})> - _errorsController = StreamController< - ({CentrifugeException exception, StackTrace stackTrace})>.broadcast(); - - @override - late final Stream<({CentrifugeException exception, StackTrace stackTrace})> - errors = _errorsController.stream; - - @override - Future close([int code = 0, String reason = 'closed']) async { - await super.close(code, reason); - _errorsController.close().ignore(); - } + Centrifuge.observer?.onError(exception, stackTrace); } /// Mixin responsible for subscribing. diff --git a/lib/src/subscription/server_subscription_impl.dart b/lib/src/subscription/server_subscription_impl.dart index 7c3e733..4fe784f 100644 --- a/lib/src/subscription/server_subscription_impl.dart +++ b/lib/src/subscription/server_subscription_impl.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:centrifuge_dart/src/client/centrifuge.dart'; import 'package:centrifuge_dart/src/model/channel_presence.dart'; import 'package:centrifuge_dart/src/model/channel_push.dart'; import 'package:centrifuge_dart/src/model/connect.dart'; @@ -24,7 +25,6 @@ import 'package:centrifuge_dart/src/util/event_queue.dart'; import 'package:centrifuge_dart/src/util/logger.dart' as logger; import 'package:fixnum/fixnum.dart' as fixnum; import 'package:meta/meta.dart'; -import 'package:stack_trace/stack_trace.dart' as st; /// Server-side subscription implementation. /// {@nodoc} @@ -258,27 +258,7 @@ base mixin CentrifugeServerSubscriptionErrorsMixin @protected @nonVirtual void _emitError(CentrifugeException exception, StackTrace stackTrace) => - _errorsController.add( - ( - exception: exception, - stackTrace: st.Trace.from(stackTrace).terse, - ), - ); - - late final StreamController< - ({CentrifugeException exception, StackTrace stackTrace})> - _errorsController = StreamController< - ({CentrifugeException exception, StackTrace stackTrace})>.broadcast(); - - @override - late final Stream<({CentrifugeException exception, StackTrace stackTrace})> - errors = _errorsController.stream; - - @override - Future close([int code = 0, String reason = 'closed']) async { - await super.close(code, reason); - _errorsController.close().ignore(); - } + Centrifuge.observer?.onError(exception, stackTrace); } /// Mixin responsible for ready method. diff --git a/lib/src/subscription/subscription.dart b/lib/src/subscription/subscription.dart index cd87cda..dd751c8 100644 --- a/lib/src/subscription/subscription.dart +++ b/lib/src/subscription/subscription.dart @@ -1,6 +1,5 @@ import 'dart:async'; -import 'package:centrifuge_dart/src/model/exception.dart'; import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; @@ -30,10 +29,6 @@ abstract interface class ICentrifugeSubscription { /// Stream of received pushes from Centrifugo server for a channel. abstract final CentrifugePushesStream stream; - /// Errors stream. - abstract final Stream< - ({CentrifugeException exception, StackTrace stackTrace})> errors; - /// Await for subscription to be ready. /// Ready resolves when subscription successfully subscribed. /// Throws exceptions if called not in subscribing or subscribed state.