From a1fa9070dc3b9da8d917f1d71d5e006918b08217 Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Fri, 28 Jul 2023 11:17:10 +0400 Subject: [PATCH] Update centrifuge states --- lib/src/client/centrifuge.dart | 54 ++++++++++++++++---- lib/src/transport/transport_interface.dart | 9 ++-- lib/src/transport/ws_protobuf_transport.dart | 44 +++------------- lib/src/util/notifier.dart | 36 +++++++++---- 4 files changed, 83 insertions(+), 60 deletions(-) diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 9341895..a2b364c 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -20,6 +20,7 @@ import 'package:stack_trace/stack_trace.dart' as st; /// {@endtemplate} final class Centrifuge extends CentrifugeBase with + CentrifugeStateMixin, CentrifugeErrorsMixin, CentrifugeConnectionMixin, CentrifugeSendMixin, @@ -43,7 +44,6 @@ abstract base class CentrifugeBase implements ICentrifuge { CentrifugeBase(CentrifugeConfig config) : _config = config { _transport = CentrifugeWSPBTransport( config: config, - disconnectCallback: _onDisconnect, ); _initCentrifuge(); } @@ -54,14 +54,6 @@ abstract base class CentrifugeBase implements ICentrifuge { @nonVirtual late final ICentrifugeTransport _transport; - @override - @nonVirtual - CentrifugeState get state => _transport.state; - - @override - late final CentrifugeStatesStream states = - CentrifugeStatesStream(_transport.states); - /// Centrifuge config. /// {@nodoc} @nonVirtual @@ -88,6 +80,50 @@ abstract base class CentrifugeBase implements ICentrifuge { Future close() async {} } +/// Mixin responsible for centrifuge states +/// {@nodoc} +@internal +base mixin CentrifugeStateMixin on CentrifugeBase { + @protected + @nonVirtual + late CentrifugeState _state = _transport.states.value; + + @override + @nonVirtual + CentrifugeState get state => _state; + + @override + @nonVirtual + late final CentrifugeStatesStream states = + CentrifugeStatesStream(_statesController.stream); + + @override + void _initCentrifuge() { + _transport.states.addListener(_onStateChange); + super._initCentrifuge(); + } + + @protected + @nonVirtual + void _onStateChange(CentrifugeState newState) { + logger.info('State changed: ${_state.type} -> ${state.type}'); + _statesController.add(_state = newState); + if (newState is CentrifugeState$Disconnected) _onDisconnect(); + } + + @protected + @nonVirtual + final StreamController _statesController = + StreamController.broadcast(); + + @override + Future close() async { + await super.close(); + _transport.states.removeListener(_onStateChange); + await _statesController.close(); + } +} + /// Mixin responsible for errors stream. /// {@nodoc} @internal diff --git a/lib/src/transport/transport_interface.dart b/lib/src/transport/transport_interface.dart index 826d2f4..bbd4772 100644 --- a/lib/src/transport/transport_interface.dart +++ b/lib/src/transport/transport_interface.dart @@ -4,19 +4,16 @@ import 'package:centrifuge_dart/src/client/state.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart'; import 'package:centrifuge_dart/src/subscription/subscription_config.dart'; +import 'package:centrifuge_dart/src/util/notifier.dart'; import 'package:meta/meta.dart'; /// Class responsible for sending and receiving data from the server. /// {@nodoc} @internal abstract interface class ICentrifugeTransport { - /// State of client. + /// State observable. /// {@nodoc} - CentrifugeState get state; - - /// Stream of client states. - /// {@nodoc} - abstract final Stream states; + abstract final CentrifugeValueListenable states; /// Connect to the server. /// [url] is a URL of endpoint. diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index 51ced74..f074f24 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -9,6 +9,7 @@ import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart'; import 'package:centrifuge_dart/src/transport/transport_interface.dart'; import 'package:centrifuge_dart/src/transport/transport_protobuf_codec.dart'; import 'package:centrifuge_dart/src/util/logger.dart' as logger; +import 'package:centrifuge_dart/src/util/notifier.dart'; import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart' as pb; import 'package:ws/ws.dart'; @@ -20,9 +21,7 @@ abstract base class CentrifugeWSPBTransportBase /// {@nodoc} CentrifugeWSPBTransportBase({ required CentrifugeConfig config, - required void Function() disconnectCallback, }) : _config = config, - _disconnectCallback = disconnectCallback, _webSocket = WebSocketClient( WebSocketOptions.selector( js: () => WebSocketOptions.js( @@ -52,10 +51,6 @@ abstract base class CentrifugeWSPBTransportBase /// {@nodoc} final CentrifugeConfig _config; - /// Callback for disconnect. - /// {@nodoc} - final void Function() _disconnectCallback; - /// Init transport, override this method to add custom logic. /// {@nodoc} @protected @@ -364,38 +359,18 @@ base mixin CentrifugeWSPBStateHandlerMixin /// {@nodoc} StreamSubscription? _webSocketClosedStateSubscription; - /// Current state of client. - /// {@nodoc} - @override - @nonVirtual - CentrifugeState get state => _state; - /// {@nodoc} @override @nonVirtual - Stream get states => _stateController.stream; - - /// {@nodoc} - @protected - @nonVirtual - CentrifugeState _state = CentrifugeState$Disconnected( + late final CentrifugeValueNotifier states = + CentrifugeValueNotifier(CentrifugeState$Disconnected( timestamp: DateTime.now(), closeCode: null, closeReason: 'Not connected yet', - ); - - /// State controller. - /// {@nodoc} - @protected - @nonVirtual - late final StreamController _stateController; + )); @override void _initTransport() { - // Init state controller. - _stateController = StreamController.broadcast( - /* onListen: () => _stateController.add(_state), */ - ); super._initTransport(); } @@ -404,9 +379,7 @@ base mixin CentrifugeWSPBStateHandlerMixin @protected @nonVirtual void _setState(CentrifugeState state) { - if (_state.type == state.type) return; - logger.info('State changed: ${_state.type} -> ${state.type}'); - _stateController.add(_state = state); + states.notify(state); } @protected @@ -414,7 +387,6 @@ base mixin CentrifugeWSPBStateHandlerMixin @pragma('vm:prefer-inline') @pragma('dart2js:tryInline') void _handleWebSocketClosedStates(WebSocketClientState$Closed state) { - _disconnectCallback(); _setState( CentrifugeState$Disconnected( timestamp: DateTime.now(), @@ -506,7 +478,7 @@ base mixin CentrifugeWSPBHandlerMixin @pragma('dart2js:tryInline') void _onPing() { _restartPingTimer(); - if (state case CentrifugeState$Connected(:bool? sendPong)) { + if (states.value case CentrifugeState$Connected(:bool? sendPong)) { if (sendPong != true) return; _sendAsyncMessage(pb.PingRequest()).ignore(); logger.fine('Pong message sent'); @@ -553,7 +525,7 @@ base mixin CentrifugeWSPBSubscription CentrifugeSubscriptionConfig config, CentrifugeStreamPosition? since, ) async { - if (!state.isConnected) { + if (!states.value.isConnected) { throw CentrifugeSubscriptionException( channel: channel, message: 'Centrifuge client is not connected', @@ -658,7 +630,7 @@ base mixin CentrifugeWSPBPingPongMixin on CentrifugeWSPBTransportBase { @nonVirtual void _restartPingTimer() { _tearDownPingTimer(); - if (state case CentrifugeState$Connected(:Duration pingInterval)) { + if (states.value case CentrifugeState$Connected(:Duration pingInterval)) { _pingTimer = Timer( pingInterval + _config.serverPingDelay, () => disconnect( diff --git a/lib/src/util/notifier.dart b/lib/src/util/notifier.dart index deeddc1..fb15ff9 100644 --- a/lib/src/util/notifier.dart +++ b/lib/src/util/notifier.dart @@ -1,36 +1,54 @@ +import 'package:meta/meta.dart'; + /// Notify about value changes. /// {@nodoc} typedef ValueChanged = void Function(T value); /// Notify about value changes. /// {@nodoc} -final class CentrifugeValueNotifier { +@internal +abstract interface class CentrifugeValueListenable { + /// Current value. + /// {@nodoc} + T get value; + + /// Add listener. + /// {@nodoc} + void addListener(ValueChanged listener); + + /// Remove listener. + /// {@nodoc} + void removeListener(ValueChanged listener); +} + +/// Notify about value changes. +/// {@nodoc} +@internal +final class CentrifugeValueNotifier implements CentrifugeValueListenable { /// Notify about value changes. /// {@nodoc} CentrifugeValueNotifier(this._value); - /// Current value. - /// {@nodoc} + @override T get value => _value; T _value; /// Notify about value changes. /// {@nodoc} - void notify(T value) { - if (_value == value) return; + bool notify(T value) { + if (_value == value) return false; _value = value; for (var i = 0; i < _listeners.length; i++) _listeners[i](value); + return true; } /// Listeners. /// {@nodoc} final List> _listeners = >[]; - /// Add listener. - /// {@nodoc} + @override void addListener(ValueChanged listener) => _listeners.add(listener); - /// Remove listener. - /// {@nodoc} + @override void removeListener(ValueChanged listener) => _listeners.remove(listener); }