From 4f31fc4196af4bcdb72980b2097e377721f15e1d Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Thu, 3 Aug 2023 01:11:17 +0400 Subject: [PATCH] Add since to state --- lib/src/client/centrifuge.dart | 28 ++++++++++---- .../client_subscription_impl.dart | 16 ++++++-- .../client_subscription_manager.dart | 2 +- .../server_subscription_impl.dart | 13 ++++++- .../server_subscription_manager.dart | 16 ++++++-- lib/src/subscription/subscription.dart | 3 ++ lib/src/subscription/subscription_state.dart | 38 ++++++++++++++----- lib/src/transport/transport_interface.dart | 3 +- lib/src/transport/ws_protobuf_transport.dart | 34 ++++++++++++----- 9 files changed, 116 insertions(+), 37 deletions(-) diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index d27596b..7cc60ca 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -199,10 +199,10 @@ base mixin CentrifugeEventReceiverMixin 'and user ${leave.info.user}'); _presenceController.add(leave); _leaveController.add(leave); - case CentrifugeSubscribe _: - break; // For server side subscriptions. - case CentrifugeUnsubscribe _: - break; // For server side subscriptions. + case CentrifugeSubscribe subscribe: + _serverSubscriptionManager.subscribe(subscribe); + case CentrifugeUnsubscribe unsubscribe: + _serverSubscriptionManager.unsubscribe(unsubscribe); case CentrifugeConnect _: break; case CentrifugeDisconnect event: @@ -383,7 +383,9 @@ base mixin CentrifugeConnectionMixin try { _refreshTimer?.cancel(); _refreshTimer = null; - await _transport.connect(url); + final subs = _serverSubscriptionManager.subscriptions.values + .toList(growable: false); + await _transport.connect(url, subs); } on CentrifugeException catch (error, stackTrace) { _emitError(error, stackTrace); rethrow; @@ -544,7 +546,7 @@ base mixin CentrifugeClientSubscriptionMixin @override Future close() async { await super.close(); - _clientSubscriptionManager.removeAll(); + _clientSubscriptionManager.close(); } } @@ -552,10 +554,22 @@ base mixin CentrifugeClientSubscriptionMixin /// {@nodoc} @internal base mixin CentrifugeServerSubscriptionMixin on CentrifugeBase { + @override + void _onConnected(CentrifugeState$Connected state) { + super._onConnected(state); + _serverSubscriptionManager.setSubscribedAll(); + } + + @override + void _onDisconnected(CentrifugeState$Disconnected state) { + super._onDisconnected(state); + _serverSubscriptionManager.setSubscribingAll(); + } + @override Future close() async { await super.close(); - _serverSubscriptionManager.setUnsubscribed(); + _serverSubscriptionManager.close(); } } diff --git a/lib/src/subscription/client_subscription_impl.dart b/lib/src/subscription/client_subscription_impl.dart index 82630f7..9a21569 100644 --- a/lib/src/subscription/client_subscription_impl.dart +++ b/lib/src/subscription/client_subscription_impl.dart @@ -65,6 +65,12 @@ abstract base class CentrifugeClientSubscriptionBase /// Offset of last received publication. late fixnum.Int64 _offset; + @override + CentrifugeStreamPosition? get since => switch (state.since?.epoch) { + String epoch => (epoch: epoch, offset: _offset), + _ => null, + }; + /// Weak reference to transport. /// {@nodoc} @nonVirtual @@ -287,7 +293,10 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin } _refreshTimer?.cancel(); _refreshTimer = null; - _setState(CentrifugeSubscriptionState$Subscribing(since: state.since)); + _setState(CentrifugeSubscriptionState$Subscribing( + since: since ?? state.since, + recoverable: state.recoverable, + )); final subscribed = await _transport.subscribe( channel, _config, @@ -302,7 +311,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin final offset = subscribed.since?.offset; if (offset != null && offset > _offset) _offset = offset; _setState(CentrifugeSubscriptionState$Subscribed( - since: subscribed.since, + since: subscribed.since ?? since ?? state.since, recoverable: subscribed.recoverable, ttl: subscribed.ttl, )); @@ -380,7 +389,8 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin _setState(CentrifugeSubscriptionState.unsubscribed( code: code, reason: reason, - since: state.since, + since: since ?? state.since, + recoverable: state.recoverable, )); try { await _transport.unsubscribe(channel, _config); diff --git a/lib/src/subscription/client_subscription_manager.dart b/lib/src/subscription/client_subscription_manager.dart index 7cc3aea..4e5e73e 100644 --- a/lib/src/subscription/client_subscription_manager.dart +++ b/lib/src/subscription/client_subscription_manager.dart @@ -112,7 +112,7 @@ final class ClientSubscriptionManager { /// Remove all subscriptions for the specific client from internal registry. /// {@nodoc} - void removeAll([ + void close([ int code = 0, String reason = 'client closed', ]) { diff --git a/lib/src/subscription/server_subscription_impl.dart b/lib/src/subscription/server_subscription_impl.dart index 53afe67..8eb9d33 100644 --- a/lib/src/subscription/server_subscription_impl.dart +++ b/lib/src/subscription/server_subscription_impl.dart @@ -61,6 +61,12 @@ abstract base class CentrifugeServerSubscriptionBase @override final String channel; + @override + CentrifugeStreamPosition? get since => switch (state.since?.epoch) { + String epoch => (epoch: epoch, offset: _offset), + _ => null, + }; + /// Offset of last received publication. fixnum.Int64 _offset = fixnum.Int64.ZERO; @@ -198,15 +204,18 @@ base mixin CentrifugeServerSubscriptionEventReceiverMixin _presenceController.add(leave); _leaveController.add(leave); case CentrifugeSubscribe sub: + final offset = sub.streamPosition?.offset; + if (offset != null && offset > _offset) _offset = offset; _setState(CentrifugeSubscriptionState.subscribed( - since: sub.streamPosition, + since: sub.streamPosition ?? since ?? state.since, recoverable: sub.recoverable, )); case CentrifugeUnsubscribe unsub: _setState(CentrifugeSubscriptionState.unsubscribed( code: unsub.code, reason: unsub.reason, - since: state.since, + recoverable: state.recoverable, + since: since ?? state.since, )); case CentrifugeConnect _: break; diff --git a/lib/src/subscription/server_subscription_manager.dart b/lib/src/subscription/server_subscription_manager.dart index 3971ccb..a5418ab 100644 --- a/lib/src/subscription/server_subscription_manager.dart +++ b/lib/src/subscription/server_subscription_manager.dart @@ -1,6 +1,8 @@ import 'dart:collection'; import 'package:centrifuge_dart/src/model/channel_push.dart'; +import 'package:centrifuge_dart/src/model/subscribe.dart'; +import 'package:centrifuge_dart/src/model/unsubscribe.dart'; import 'package:centrifuge_dart/src/subscription/server_subscription_impl.dart'; import 'package:centrifuge_dart/src/subscription/subscription.dart'; import 'package:centrifuge_dart/src/transport/transport_interface.dart'; @@ -35,11 +37,17 @@ final class ServerSubscriptionManager { entry.key: entry.value, }); + /// Called on [CentrifugeSubscribe] push from server. + void subscribe(CentrifugeSubscribe subscribe) {} + + /// Called on [CentrifugeUnsubscribe] push from server. + void unsubscribe(CentrifugeUnsubscribe subscribe) {} + /// Called when subscribed to a server-side channel upon Client moving to /// connected state or during connection lifetime if server sends Subscribe /// push message. /// {@nodoc} - void setSubscribed() { + void setSubscribedAll() { for (final entry in _channelSubscriptions.values) {} } @@ -47,19 +55,19 @@ final class ServerSubscriptionManager { /// explicitly disconnected. Client continue keeping server-side subscription /// registry with stream position information where applicable. /// {@nodoc} - void setSubscribing() { + void setSubscribingAll() { for (final entry in _channelSubscriptions.values) {} } /// Called when server sent unsubscribe push or server-side subscription /// previously existed in SDK registry disappeared upon Client reconnect. /// {@nodoc} - void setUnsubscribed() { + void setUnsubscribedAll() { for (final entry in _channelSubscriptions.values) {} } void close() { - setUnsubscribed(); + setUnsubscribedAll(); _channelSubscriptions.clear(); } diff --git a/lib/src/subscription/subscription.dart b/lib/src/subscription/subscription.dart index 28ad582..cd87cda 100644 --- a/lib/src/subscription/subscription.dart +++ b/lib/src/subscription/subscription.dart @@ -21,6 +21,9 @@ abstract interface class ICentrifugeSubscription { /// Current subscription state. abstract final CentrifugeSubscriptionState state; + /// Offset of last successfully received message. + abstract final CentrifugeStreamPosition? since; + /// Stream of subscription states. abstract final CentrifugeSubscriptionStateStream states; diff --git a/lib/src/subscription/subscription_state.dart b/lib/src/subscription/subscription_state.dart index 62301e2..69fde7e 100644 --- a/lib/src/subscription/subscription_state.dart +++ b/lib/src/subscription/subscription_state.dart @@ -16,7 +16,10 @@ import 'package:meta/meta.dart'; sealed class CentrifugeSubscriptionState extends _$CentrifugeSubscriptionStateBase { /// {@macro subscription_state} - const CentrifugeSubscriptionState(super.timestamp, super.since); + const CentrifugeSubscriptionState( + {required super.timestamp, + required super.since, + required super.recoverable}); /// Unsubscribed /// {@macro subscription_state} @@ -25,6 +28,7 @@ sealed class CentrifugeSubscriptionState required String reason, DateTime? timestamp, ({fixnum.Int64 offset, String epoch})? since, + bool recoverable, }) = CentrifugeSubscriptionState$Unsubscribed; /// Subscribing @@ -32,6 +36,7 @@ sealed class CentrifugeSubscriptionState factory CentrifugeSubscriptionState.subscribing({ DateTime? timestamp, ({fixnum.Int64 offset, String epoch})? since, + bool recoverable, }) = CentrifugeSubscriptionState$Subscribing; /// Subscribed @@ -57,7 +62,11 @@ final class CentrifugeSubscriptionState$Unsubscribed required this.reason, DateTime? timestamp, ({fixnum.Int64 offset, String epoch})? since, - }) : super(timestamp ?? DateTime.now(), since); + bool recoverable = false, + }) : super( + timestamp: timestamp ?? DateTime.now(), + since: since, + recoverable: recoverable); /// Unsubscribe code. final int code; @@ -108,7 +117,11 @@ final class CentrifugeSubscriptionState$Subscribing CentrifugeSubscriptionState$Subscribing({ DateTime? timestamp, ({fixnum.Int64 offset, String epoch})? since, - }) : super(timestamp ?? DateTime.now(), since); + bool recoverable = false, + }) : super( + timestamp: timestamp ?? DateTime.now(), + since: since, + recoverable: recoverable); @override bool get isUnsubscribed => false; @@ -153,12 +166,12 @@ final class CentrifugeSubscriptionState$Subscribed CentrifugeSubscriptionState$Subscribed({ DateTime? timestamp, ({fixnum.Int64 offset, String epoch})? since, - this.recoverable = false, + bool recoverable = false, this.ttl, - }) : super(timestamp ?? DateTime.now(), since); - - /// Whether channel is recoverable. - final bool recoverable; + }) : super( + timestamp: timestamp ?? DateTime.now(), + since: since, + recoverable: recoverable); /// Time to live in seconds. final DateTime? ttl; @@ -208,7 +221,11 @@ typedef CentrifugeSubscriptionStateMatch connect(String url); + Future connect(String url, List subs); /// Send asynchronous message to a server. This method makes sense /// only when using Centrifuge library for Go on a server side. In Centrifuge diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index dd27f74..542ad54 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -80,7 +80,8 @@ abstract base class CentrifugeWSPBTransportBase @override @mustCallSuper - Future connect(String url) async {} + Future connect( + String url, List subs) async {} @override @mustCallSuper @@ -317,9 +318,10 @@ base mixin CentrifugeWSPBConnectionMixin CentrifugeWSPBSenderMixin, CentrifugeWSPBStateHandlerMixin { @override - Future connect(String url) async { + Future connect( + String url, List subs) async { try { - await super.connect(url); + await super.connect(url, subs); await _webSocket.connect(url); final request = pb.ConnectRequest(); final token = await _config.getToken?.call(); @@ -330,7 +332,18 @@ base mixin CentrifugeWSPBConnectionMixin request ..name = _config.client.name ..version = _config.client.version; - // TODO(plugfox): add subscriptions. + // Add server-side subscriptions to connect request. + for (final sub in subs) { + final subRequest = pb.SubscribeRequest() + ..recover = sub.state.recoverable; + final since = sub.state.since; + if (since != null) { + subRequest + ..offset = since.offset + ..epoch = since.epoch; + } + request.subs.putIfAbsent(sub.channel, () => subRequest); + } final pb.ConnectResult result; try { result = await _sendMessage(request, pb.ConnectResult()); @@ -470,7 +483,7 @@ base mixin CentrifugeWSPBStateHandlerMixin } @override - Future connect(String url) { + Future connect(String url, List subs) { // Change state to connecting before connection. _setState(CentrifugeState$Connecting(url: url)); // Subscribe to websocket state after initialization. @@ -478,7 +491,7 @@ base mixin CentrifugeWSPBStateHandlerMixin _handleWebSocketClosedStates, cancelOnError: false, ); - return super.connect(url); + return super.connect(url, subs); } @override @@ -508,13 +521,13 @@ base mixin CentrifugeWSPBHandlerMixin StreamSubscription>? _webSocketMessageSubscription; @override - Future connect(String url) { + Future connect(String url, List subs) { // Subscribe to websocket messages after first connection. _webSocketMessageSubscription ??= _webSocket.stream.bytes.listen( _handleWebSocketMessage, cancelOnError: false, ); - return super.connect(url); + return super.connect(url, subs); } /// {@nodoc} @@ -872,9 +885,10 @@ base mixin CentrifugeWSPBPingPongMixin on CentrifugeWSPBTransportBase { Timer? _pingTimer; @override - Future connect(String url) async { + Future connect( + String url, List subs) async { _tearDownPingTimer(); - await super.connect(url); + await super.connect(url, subs); _restartPingTimer(); }