From 1788237e65c30f6e36cc9031b56bdc05c1e2d23f Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Thu, 3 Aug 2023 15:14:23 +0400 Subject: [PATCH] Update subscriptions --- lib/src/client/centrifuge.dart | 4 +- .../client_subscription_impl.dart | 45 ++++++--- .../server_subscription_impl.dart | 67 ++++++++++++-- .../server_subscription_manager.dart | 43 ++++++++- lib/src/transport/transport_interface.dart | 6 +- lib/src/transport/ws_protobuf_transport.dart | 91 +++++++++++-------- 6 files changed, 185 insertions(+), 71 deletions(-) diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index b471c87..444e419 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -383,9 +383,7 @@ base mixin CentrifugeConnectionMixin try { _refreshTimer?.cancel(); _refreshTimer = null; - final subs = _serverSubscriptionManager.subscriptions.values - .toList(growable: false); - await _transport.connect(url, subs); + await _transport.connect(url, _serverSubscriptionManager); } on CentrifugeException catch (error, stackTrace) { _emitError(error, stackTrace); rethrow; diff --git a/lib/src/subscription/client_subscription_impl.dart b/lib/src/subscription/client_subscription_impl.dart index 9a21569..14eada5 100644 --- a/lib/src/subscription/client_subscription_impl.dart +++ b/lib/src/subscription/client_subscription_impl.dart @@ -68,7 +68,7 @@ abstract base class CentrifugeClientSubscriptionBase @override CentrifugeStreamPosition? get since => switch (state.since?.epoch) { String epoch => (epoch: epoch, offset: _offset), - _ => null, + _ => state.since, }; /// Weak reference to transport. @@ -133,7 +133,14 @@ abstract base class CentrifugeClientSubscriptionBase /// {@nodoc} @internal @mustCallSuper - Future close() async { + Future close([int code = 0, String reason = 'closed']) async { + if (!_state.isUnsubscribed) + _setState(CentrifugeSubscriptionState.unsubscribed( + code: code, + reason: reason, + recoverable: false, + since: since, + )); _stateController.close().ignore(); } } @@ -223,8 +230,8 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin } @override - Future close() async { - await super.close(); + Future close([int code = 0, String reason = 'closed']) async { + await super.close(code, reason); for (final controller in >[ _pushController, _publicationsController, @@ -263,8 +270,8 @@ base mixin CentrifugeClientSubscriptionErrorsMixin errors = _errorsController.stream; @override - Future close() async { - await super.close(); + Future close([int code = 0, String reason = 'closed']) async { + await super.close(code, reason); _errorsController.close().ignore(); } } @@ -294,7 +301,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin _refreshTimer?.cancel(); _refreshTimer = null; _setState(CentrifugeSubscriptionState$Subscribing( - since: since ?? state.since, + since: since, recoverable: state.recoverable, )); final subscribed = await _transport.subscribe( @@ -311,7 +318,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin final offset = subscribed.since?.offset; if (offset != null && offset > _offset) _offset = offset; _setState(CentrifugeSubscriptionState$Subscribed( - since: subscribed.since ?? since ?? state.since, + since: subscribed.since ?? since, recoverable: subscribed.recoverable, ttl: subscribed.ttl, )); @@ -389,9 +396,10 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin _setState(CentrifugeSubscriptionState.unsubscribed( code: code, reason: reason, - since: since ?? state.since, + since: since, recoverable: state.recoverable, )); + if (_transport.state.isClosed) return; try { await _transport.unsubscribe(channel, _config); } on Object catch (error, stackTrace) { @@ -451,12 +459,21 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin }).ignore(); @override - Future close() async { + Future close([int code = 0, String reason = 'closed']) async { logger.fine('Closing subscription to $channel'); _refreshTimer?.cancel(); _refreshTimer = null; - await super.close(); - await _transport.close(); + try { + if (!state.isUnsubscribed) await unsubscribe(code, reason); + } on Object catch (error, stackTrace) { + final centrifugeException = CentrifugeSubscriptionException( + message: 'Error while unsubscribing from channel $channel', + channel: channel, + error: error, + ); + _emitError(centrifugeException, stackTrace); + } + await super.close(code, reason); } } @@ -617,7 +634,7 @@ base mixin CentrifugeClientSubscriptionQueueMixin .push('presenceStats', super.presenceStats); @override - Future close() => _eventQueue - .push('close', super.close) + Future close([int code = 0, String reason = 'closed']) => _eventQueue + .push('close', () => super.close(code, reason)) .whenComplete(_eventQueue.close); } diff --git a/lib/src/subscription/server_subscription_impl.dart b/lib/src/subscription/server_subscription_impl.dart index 8eb9d33..7c3e733 100644 --- a/lib/src/subscription/server_subscription_impl.dart +++ b/lib/src/subscription/server_subscription_impl.dart @@ -21,6 +21,7 @@ import 'package:centrifuge_dart/src/subscription/subscription_state.dart'; import 'package:centrifuge_dart/src/subscription/subscription_states_stream.dart'; import 'package:centrifuge_dart/src/transport/transport_interface.dart'; import 'package:centrifuge_dart/src/util/event_queue.dart'; +import 'package:centrifuge_dart/src/util/logger.dart' as logger; import 'package:fixnum/fixnum.dart' as fixnum; import 'package:meta/meta.dart'; import 'package:stack_trace/stack_trace.dart' as st; @@ -64,7 +65,7 @@ abstract base class CentrifugeServerSubscriptionBase @override CentrifugeStreamPosition? get since => switch (state.since?.epoch) { String epoch => (epoch: epoch, offset: _offset), - _ => null, + _ => state.since, }; /// Offset of last received publication. @@ -127,7 +128,14 @@ abstract base class CentrifugeServerSubscriptionBase /// {@nodoc} @internal @mustCallSuper - Future close() async { + Future close([int code = 0, String reason = 'closed']) async { + if (!_state.isUnsubscribed) + _setState(CentrifugeSubscriptionState.unsubscribed( + code: 0, + reason: 'closed', + recoverable: false, + since: since, + )); _stateController.close().ignore(); } } @@ -207,7 +215,7 @@ base mixin CentrifugeServerSubscriptionEventReceiverMixin final offset = sub.streamPosition?.offset; if (offset != null && offset > _offset) _offset = offset; _setState(CentrifugeSubscriptionState.subscribed( - since: sub.streamPosition ?? since ?? state.since, + since: sub.streamPosition ?? since, recoverable: sub.recoverable, )); case CentrifugeUnsubscribe unsub: @@ -215,7 +223,7 @@ base mixin CentrifugeServerSubscriptionEventReceiverMixin code: unsub.code, reason: unsub.reason, recoverable: state.recoverable, - since: since ?? state.since, + since: since, )); case CentrifugeConnect _: break; @@ -227,8 +235,8 @@ base mixin CentrifugeServerSubscriptionEventReceiverMixin } @override - Future close() async { - await super.close(); + Future close([int code = 0, String reason = 'closed']) async { + await super.close(code, reason); for (final controller in >[ _pushController, _publicationsController, @@ -267,8 +275,8 @@ base mixin CentrifugeServerSubscriptionErrorsMixin errors = _errorsController.stream; @override - Future close() async { - await super.close(); + Future close([int code = 0, String reason = 'closed']) async { + await super.close(code, reason); _errorsController.close().ignore(); } } @@ -309,6 +317,45 @@ base mixin CentrifugeServerSubscriptionReadyMixin Error.throwWithStackTrace(centrifugeException, stackTrace); } } + + /// Mark subscription as ready. + /// {@nodoc} + void setSubscribed() { + if (!state.isSubscribed) + _setState(CentrifugeSubscriptionState.subscribed( + since: since, + recoverable: state.recoverable, + )); + } + + /// Mark subscription as subscribing. + /// {@nodoc} + void setSubscribing() { + if (!state.isSubscribing) + _setState(CentrifugeSubscriptionState.subscribing( + since: since, + recoverable: state.recoverable, + )); + } + + /// Mark subscription as unsubscribed. + /// {@nodoc} + void setUnsubscribed(int code, String reason) { + if (!state.isUnsubscribed) + _setState(CentrifugeSubscriptionState.unsubscribed( + code: code, + reason: reason, + recoverable: state.recoverable, + since: since, + )); + } + + @override + Future close([int code = 0, String reason = 'closed']) async { + logger.fine('Closing subscription to $channel'); + if (!state.isUnsubscribed) setUnsubscribed(code, reason); + await super.close(code, reason); + } } /// Mixin responsible for publishing. @@ -452,7 +499,7 @@ base mixin CentrifugeServerSubscriptionQueueMixin .push('presenceStats', super.presenceStats); @override - Future close() => _eventQueue - .push('close', super.close) + Future close([int code = 0, String reason = 'closed']) => _eventQueue + .push('close', () => super.close(code, reason)) .whenComplete(_eventQueue.close); } diff --git a/lib/src/subscription/server_subscription_manager.dart b/lib/src/subscription/server_subscription_manager.dart index a5418ab..4383ebb 100644 --- a/lib/src/subscription/server_subscription_manager.dart +++ b/lib/src/subscription/server_subscription_manager.dart @@ -43,12 +43,34 @@ final class ServerSubscriptionManager { /// Called on [CentrifugeUnsubscribe] push from server. void unsubscribe(CentrifugeUnsubscribe subscribe) {} + /// Called when client finished connection handshake with server. + /// Add non existing subscriptions to registry and mark all connected. + /// Remove subscriptions which are not in [subs] argument. + void upsert(List subs) { + final currentChannels = _channelSubscriptions.keys.toSet(); + // Remove subscriptions which are not in subs argument. + for (final channel in currentChannels) { + if (subs.any((e) => e.channel == channel)) continue; + _channelSubscriptions.remove(channel)?.close(); + } + // Add non existing subscriptions to registry and mark all connected. + for (final sub in subs) { + (_channelSubscriptions[sub.channel] ??= CentrifugeServerSubscriptionImpl( + channel: sub.channel, + transportWeakRef: _transportWeakRef, + )) + .onPush(sub); + } + } + /// Called when subscribed to a server-side channel upon Client moving to /// connected state or during connection lifetime if server sends Subscribe /// push message. /// {@nodoc} void setSubscribedAll() { - for (final entry in _channelSubscriptions.values) {} + for (final entry in _channelSubscriptions.values) { + if (entry.state.isSubscribed) continue; + } } /// Called when existing connection lost (Client reconnects) or Client @@ -56,18 +78,29 @@ final class ServerSubscriptionManager { /// registry with stream position information where applicable. /// {@nodoc} void setSubscribingAll() { - for (final entry in _channelSubscriptions.values) {} + for (final entry in _channelSubscriptions.values) { + if (entry.state.isSubscribing) continue; + } } /// Called when server sent unsubscribe push or server-side subscription /// previously existed in SDK registry disappeared upon Client reconnect. /// {@nodoc} void setUnsubscribedAll() { - for (final entry in _channelSubscriptions.values) {} + for (final entry in _channelSubscriptions.values) { + if (entry.state.isUnsubscribed) continue; + } } - void close() { - setUnsubscribedAll(); + /// Close all subscriptions. + /// {@nodoc} + void close([ + int code = 0, + String reason = 'client closed', + ]) { + for (final entry in _channelSubscriptions.values) { + entry.close(code, reason).ignore(); + } _channelSubscriptions.clear(); } diff --git a/lib/src/transport/transport_interface.dart b/lib/src/transport/transport_interface.dart index 26e5803..4ad628a 100644 --- a/lib/src/transport/transport_interface.dart +++ b/lib/src/transport/transport_interface.dart @@ -7,6 +7,7 @@ import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; import 'package:centrifuge_dart/src/model/refresh.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; +import 'package:centrifuge_dart/src/subscription/server_subscription_manager.dart'; import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart'; import 'package:centrifuge_dart/src/util/notifier.dart'; import 'package:meta/meta.dart'; @@ -31,7 +32,10 @@ abstract interface class ICentrifugeTransport { /// [url] is a URL of endpoint. /// [subs] is a list of server-side subscriptions to subscribe on connect. /// {@nodoc} - Future connect(String url, List subs); + Future connect( + String url, + ServerSubscriptionManager serverSubscriptionManager, + ); /// Send asynchronous message to a server. This method makes sense /// only when using Centrifuge library for Go on a server side. In Centrifuge diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index 9969ce3..f5f689c 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -17,6 +17,7 @@ import 'package:centrifuge_dart/src/model/refresh.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; import 'package:centrifuge_dart/src/model/subscribe.dart'; import 'package:centrifuge_dart/src/model/unsubscribe.dart'; +import 'package:centrifuge_dart/src/subscription/server_subscription_manager.dart'; import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart'; import 'package:centrifuge_dart/src/transport/transport_interface.dart'; import 'package:centrifuge_dart/src/transport/transport_protobuf_codec.dart'; @@ -81,7 +82,9 @@ abstract base class CentrifugeWSPBTransportBase @override @mustCallSuper Future connect( - String url, List subs) async {} + String url, + ServerSubscriptionManager serverSubscriptionManager, + ) async {} @override @mustCallSuper @@ -319,9 +322,11 @@ base mixin CentrifugeWSPBConnectionMixin CentrifugeWSPBStateHandlerMixin { @override Future connect( - String url, List subs) async { + String url, + ServerSubscriptionManager serverSubscriptionManager, + ) async { try { - await super.connect(url, subs); + await super.connect(url, serverSubscriptionManager); await _webSocket.connect(url); final request = pb.ConnectRequest(); final token = await _config.getToken?.call(); @@ -333,16 +338,19 @@ base mixin CentrifugeWSPBConnectionMixin ..name = _config.client.name ..version = _config.client.version; // Add server-side subscriptions to connect request. - for (final sub in subs) { - final subRequest = pb.SubscribeRequest() - ..recover = sub.state.recoverable; - final since = sub.state.since; - if (since != null) { - subRequest + { + final subs = serverSubscriptionManager.subscriptions.values; + for (final CentrifugeServerSubscription( + channel: String channel, + state: CentrifugeSubscriptionState(:recoverable, :since), + ) in subs) { + if (since == null) continue; + final subRequest = pb.SubscribeRequest() + ..recover = recoverable ..offset = since.offset ..epoch = since.epoch; + request.subs.putIfAbsent(channel, () => subRequest); } - request.subs.putIfAbsent(sub.channel, () => subRequest); } final pb.ConnectResult result; try { @@ -362,28 +370,27 @@ base mixin CentrifugeWSPBConnectionMixin final now = DateTime.now(); final expires = result.hasExpires() && result.expires && result.hasTtl(); - // TODO(plugfox): implement server-side subscriptions. - /* result.subs.forEach((key, value) { - _serverSubs[key] = ServerSubscription( - key, value.recoverable, value.offset, value.epoch); - final event = ServerSubscribedEvent.fromSubscribeResult(key, value); - _subscribedController.add(event); - value.publications.forEach((element) { - final event = ServerPublicationEvent.from(key, element); - _publicationController.add(event); - if (_serverSubs[key]!.recoverable && element.offset > 0) { - _serverSubs[key]!.offset = element.offset; - } - }); - }); - - _serverSubs.forEach((key, value) { - if (!result.subs.containsKey(key)) { - final event = ServerUnsubscribedEvent.from(key); - _unsubscribedController.add(event); - } - }); - _serverSubs.removeWhere((key, value) => !result.subs.containsKey(key)); */ + // Update server-side subscriptions. + { + final subs = result.subs.entries.map((e) { + final channel = e.key; + final sub = e.value; + final positioned = sub.hasPositioned() && sub.positioned; + final recoverable = sub.hasRecoverable() && sub.recoverable; + return CentrifugeSubscribe( + timestamp: now, + channel: channel, + positioned: positioned, + recoverable: recoverable, + data: sub.hasData() ? sub.data : [], + streamPosition: + (positioned || recoverable) && sub.hasOffset() && sub.hasEpoch() + ? (offset: sub.offset, epoch: sub.epoch) + : null, + ); + }).toList(growable: false); + serverSubscriptionManager.upsert(subs); + } _setState(CentrifugeState$Connected( url: url, @@ -507,7 +514,10 @@ base mixin CentrifugeWSPBStateHandlerMixin } @override - Future connect(String url, List subs) { + Future connect( + String url, + ServerSubscriptionManager serverSubscriptionManager, + ) { // Change state to connecting before connection. _setState(CentrifugeState$Connecting(url: url)); // Subscribe to websocket state after initialization. @@ -515,7 +525,7 @@ base mixin CentrifugeWSPBStateHandlerMixin _handleWebSocketClosedStates, cancelOnError: false, ); - return super.connect(url, subs); + return super.connect(url, serverSubscriptionManager); } @override @@ -545,13 +555,16 @@ base mixin CentrifugeWSPBHandlerMixin StreamSubscription>? _webSocketMessageSubscription; @override - Future connect(String url, List subs) { + Future connect( + String url, + ServerSubscriptionManager serverSubscriptionManager, + ) { // Subscribe to websocket messages after first connection. _webSocketMessageSubscription ??= _webSocket.stream.bytes.listen( _handleWebSocketMessage, cancelOnError: false, ); - return super.connect(url, subs); + return super.connect(url, serverSubscriptionManager); } /// {@nodoc} @@ -910,9 +923,11 @@ base mixin CentrifugeWSPBPingPongMixin on CentrifugeWSPBTransportBase { @override Future connect( - String url, List subs) async { + String url, + ServerSubscriptionManager serverSubscriptionManager, + ) async { _tearDownPingTimer(); - await super.connect(url, subs); + await super.connect(url, serverSubscriptionManager); _restartPingTimer(); }