Skip to content

Commit

Permalink
Update centrifuge states
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 28, 2023
1 parent bbee0f5 commit a1fa907
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 60 deletions.
54 changes: 45 additions & 9 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import 'package:stack_trace/stack_trace.dart' as st;
/// {@endtemplate}
final class Centrifuge extends CentrifugeBase
with
CentrifugeStateMixin,
CentrifugeErrorsMixin,
CentrifugeConnectionMixin,
CentrifugeSendMixin,
Expand All @@ -43,7 +44,6 @@ abstract base class CentrifugeBase implements ICentrifuge {
CentrifugeBase(CentrifugeConfig config) : _config = config {
_transport = CentrifugeWSPBTransport(
config: config,
disconnectCallback: _onDisconnect,
);
_initCentrifuge();
}
Expand All @@ -54,14 +54,6 @@ abstract base class CentrifugeBase implements ICentrifuge {
@nonVirtual
late final ICentrifugeTransport _transport;

@override
@nonVirtual
CentrifugeState get state => _transport.state;

@override
late final CentrifugeStatesStream states =
CentrifugeStatesStream(_transport.states);

/// Centrifuge config.
/// {@nodoc}
@nonVirtual
Expand All @@ -88,6 +80,50 @@ abstract base class CentrifugeBase implements ICentrifuge {
Future<void> close() async {}
}

/// Mixin responsible for centrifuge states
/// {@nodoc}
@internal
base mixin CentrifugeStateMixin on CentrifugeBase {
@protected
@nonVirtual
late CentrifugeState _state = _transport.states.value;

@override
@nonVirtual
CentrifugeState get state => _state;

@override
@nonVirtual
late final CentrifugeStatesStream states =
CentrifugeStatesStream(_statesController.stream);

@override
void _initCentrifuge() {
_transport.states.addListener(_onStateChange);
super._initCentrifuge();
}

@protected
@nonVirtual
void _onStateChange(CentrifugeState newState) {
logger.info('State changed: ${_state.type} -> ${state.type}');
_statesController.add(_state = newState);
if (newState is CentrifugeState$Disconnected) _onDisconnect();
}

@protected
@nonVirtual
final StreamController<CentrifugeState> _statesController =
StreamController<CentrifugeState>.broadcast();

@override
Future<void> close() async {
await super.close();
_transport.states.removeListener(_onStateChange);
await _statesController.close();
}
}

/// Mixin responsible for errors stream.
/// {@nodoc}
@internal
Expand Down
9 changes: 3 additions & 6 deletions lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ import 'package:centrifuge_dart/src/client/state.dart';
import 'package:centrifuge_dart/src/model/stream_position.dart';
import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart';
import 'package:centrifuge_dart/src/subscription/subscription_config.dart';
import 'package:centrifuge_dart/src/util/notifier.dart';
import 'package:meta/meta.dart';

/// Class responsible for sending and receiving data from the server.
/// {@nodoc}
@internal
abstract interface class ICentrifugeTransport {
/// State of client.
/// State observable.
/// {@nodoc}
CentrifugeState get state;

/// Stream of client states.
/// {@nodoc}
abstract final Stream<CentrifugeState> states;
abstract final CentrifugeValueListenable<CentrifugeState> states;

/// Connect to the server.
/// [url] is a URL of endpoint.
Expand Down
44 changes: 8 additions & 36 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
import 'package:centrifuge_dart/src/transport/transport_protobuf_codec.dart';
import 'package:centrifuge_dart/src/util/logger.dart' as logger;
import 'package:centrifuge_dart/src/util/notifier.dart';
import 'package:meta/meta.dart';
import 'package:protobuf/protobuf.dart' as pb;
import 'package:ws/ws.dart';
Expand All @@ -20,9 +21,7 @@ abstract base class CentrifugeWSPBTransportBase
/// {@nodoc}
CentrifugeWSPBTransportBase({
required CentrifugeConfig config,
required void Function() disconnectCallback,
}) : _config = config,
_disconnectCallback = disconnectCallback,
_webSocket = WebSocketClient(
WebSocketOptions.selector(
js: () => WebSocketOptions.js(
Expand Down Expand Up @@ -52,10 +51,6 @@ abstract base class CentrifugeWSPBTransportBase
/// {@nodoc}
final CentrifugeConfig _config;

/// Callback for disconnect.
/// {@nodoc}
final void Function() _disconnectCallback;

/// Init transport, override this method to add custom logic.
/// {@nodoc}
@protected
Expand Down Expand Up @@ -364,38 +359,18 @@ base mixin CentrifugeWSPBStateHandlerMixin
/// {@nodoc}
StreamSubscription<WebSocketClientState>? _webSocketClosedStateSubscription;

/// Current state of client.
/// {@nodoc}
@override
@nonVirtual
CentrifugeState get state => _state;

/// {@nodoc}
@override
@nonVirtual
Stream<CentrifugeState> get states => _stateController.stream;

/// {@nodoc}
@protected
@nonVirtual
CentrifugeState _state = CentrifugeState$Disconnected(
late final CentrifugeValueNotifier<CentrifugeState> states =
CentrifugeValueNotifier(CentrifugeState$Disconnected(
timestamp: DateTime.now(),
closeCode: null,
closeReason: 'Not connected yet',
);

/// State controller.
/// {@nodoc}
@protected
@nonVirtual
late final StreamController<CentrifugeState> _stateController;
));

@override
void _initTransport() {
// Init state controller.
_stateController = StreamController<CentrifugeState>.broadcast(
/* onListen: () => _stateController.add(_state), */
);
super._initTransport();
}

Expand All @@ -404,17 +379,14 @@ base mixin CentrifugeWSPBStateHandlerMixin
@protected
@nonVirtual
void _setState(CentrifugeState state) {
if (_state.type == state.type) return;
logger.info('State changed: ${_state.type} -> ${state.type}');
_stateController.add(_state = state);
states.notify(state);
}

@protected
@nonVirtual
@pragma('vm:prefer-inline')
@pragma('dart2js:tryInline')
void _handleWebSocketClosedStates(WebSocketClientState$Closed state) {
_disconnectCallback();
_setState(
CentrifugeState$Disconnected(
timestamp: DateTime.now(),
Expand Down Expand Up @@ -506,7 +478,7 @@ base mixin CentrifugeWSPBHandlerMixin
@pragma('dart2js:tryInline')
void _onPing() {
_restartPingTimer();
if (state case CentrifugeState$Connected(:bool? sendPong)) {
if (states.value case CentrifugeState$Connected(:bool? sendPong)) {
if (sendPong != true) return;
_sendAsyncMessage(pb.PingRequest()).ignore();
logger.fine('Pong message sent');
Expand Down Expand Up @@ -553,7 +525,7 @@ base mixin CentrifugeWSPBSubscription
CentrifugeSubscriptionConfig config,
CentrifugeStreamPosition? since,
) async {
if (!state.isConnected) {
if (!states.value.isConnected) {
throw CentrifugeSubscriptionException(
channel: channel,
message: 'Centrifuge client is not connected',
Expand Down Expand Up @@ -658,7 +630,7 @@ base mixin CentrifugeWSPBPingPongMixin on CentrifugeWSPBTransportBase {
@nonVirtual
void _restartPingTimer() {
_tearDownPingTimer();
if (state case CentrifugeState$Connected(:Duration pingInterval)) {
if (states.value case CentrifugeState$Connected(:Duration pingInterval)) {
_pingTimer = Timer(
pingInterval + _config.serverPingDelay,
() => disconnect(
Expand Down
36 changes: 27 additions & 9 deletions lib/src/util/notifier.dart
Original file line number Diff line number Diff line change
@@ -1,36 +1,54 @@
import 'package:meta/meta.dart';

/// Notify about value changes.
/// {@nodoc}
typedef ValueChanged<T> = void Function(T value);

/// Notify about value changes.
/// {@nodoc}
final class CentrifugeValueNotifier<T> {
@internal
abstract interface class CentrifugeValueListenable<T> {
/// Current value.
/// {@nodoc}
T get value;

/// Add listener.
/// {@nodoc}
void addListener(ValueChanged<T> listener);

/// Remove listener.
/// {@nodoc}
void removeListener(ValueChanged<T> listener);
}

/// Notify about value changes.
/// {@nodoc}
@internal
final class CentrifugeValueNotifier<T> implements CentrifugeValueListenable<T> {
/// Notify about value changes.
/// {@nodoc}
CentrifugeValueNotifier(this._value);

/// Current value.
/// {@nodoc}
@override
T get value => _value;
T _value;

/// Notify about value changes.
/// {@nodoc}
void notify(T value) {
if (_value == value) return;
bool notify(T value) {
if (_value == value) return false;
_value = value;
for (var i = 0; i < _listeners.length; i++) _listeners[i](value);
return true;
}

/// Listeners.
/// {@nodoc}
final List<ValueChanged<T>> _listeners = <ValueChanged<T>>[];

/// Add listener.
/// {@nodoc}
@override
void addListener(ValueChanged<T> listener) => _listeners.add(listener);

/// Remove listener.
/// {@nodoc}
@override
void removeListener(ValueChanged<T> listener) => _listeners.remove(listener);
}

0 comments on commit a1fa907

Please sign in to comment.