From 6ba54bd1c689b97b73699a988f526252178be1d7 Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Wed, 12 Jun 2024 12:38:42 +0400 Subject: [PATCH] Refactor SpinifySubscriptionMixin to use specific implementation classes for client and server subscriptions --- lib/src/model/subscription_config.dart | 5 +- lib/src/model/subscription_state.dart | 80 ++++---------------------- lib/src/spinify_impl.dart | 75 +++++++++++++++--------- lib/src/subscription_impl.dart | 41 ++++++++----- lib/src/subscription_interface.dart | 10 +++- 5 files changed, 99 insertions(+), 112 deletions(-) diff --git a/lib/src/model/subscription_config.dart b/lib/src/model/subscription_config.dart index 259e98a..b2eeecb 100644 --- a/lib/src/model/subscription_config.dart +++ b/lib/src/model/subscription_config.dart @@ -61,7 +61,10 @@ class SpinifySubscriptionConfig { this.recoverable = false, this.joinLeave = false, this.timeout = const Duration(seconds: 15), - }); + }) : assert( + (recoverable == false && since == null) || + (recoverable == true && since != null), + 'recoverable and since must be set together'); /// Create a default config /// diff --git a/lib/src/model/subscription_state.dart b/lib/src/model/subscription_state.dart index 05d9096..5d07d06 100644 --- a/lib/src/model/subscription_state.dart +++ b/lib/src/model/subscription_state.dart @@ -1,8 +1,5 @@ -import 'package:fixnum/fixnum.dart' as fixnum; import 'package:meta/meta.dart'; -import 'stream_position.dart'; - /// {@template subscription_state} /// Subscription has 3 states: /// @@ -17,36 +14,24 @@ import 'stream_position.dart'; @immutable sealed class SpinifySubscriptionState extends _$SpinifySubscriptionStateBase { /// {@macro subscription_state} - const SpinifySubscriptionState( - {required super.timestamp, - required super.since, - required super.recoverable}); + const SpinifySubscriptionState({required super.timestamp}); /// Unsubscribed /// {@macro subscription_state} factory SpinifySubscriptionState.unsubscribed({ - required int code, - required String reason, DateTime? timestamp, - SpinifyStreamPosition? since, - bool recoverable, }) = SpinifySubscriptionState$Unsubscribed; /// Subscribing /// {@macro subscription_state} factory SpinifySubscriptionState.subscribing({ DateTime? timestamp, - SpinifyStreamPosition? since, - bool recoverable, }) = SpinifySubscriptionState$Subscribing; /// Subscribed /// {@macro subscription_state} factory SpinifySubscriptionState.subscribed({ DateTime? timestamp, - SpinifyStreamPosition? since, - bool recoverable, - DateTime? ttl, }) = SpinifySubscriptionState$Subscribed; } @@ -59,22 +44,12 @@ final class SpinifySubscriptionState$Unsubscribed extends SpinifySubscriptionState { /// {@macro subscription_state} SpinifySubscriptionState$Unsubscribed({ - required this.code, - required this.reason, DateTime? timestamp, - super.since, - super.recoverable = false, }) : super(timestamp: timestamp ?? DateTime.now()); @override String get type => 'unsubscribed'; - /// Unsubscribe code. - final int code; - - /// Unsubscribe reason. - final String reason; - @override bool get isUnsubscribed => true; @@ -99,17 +74,11 @@ final class SpinifySubscriptionState$Unsubscribed unsubscribed(this); @override - Map toJson() => { - ...super.toJson(), - 'code': code, - 'reason': reason, - }; - - @override - int get hashCode => Object.hash(0, timestamp, since); + int get hashCode => Object.hash(0, timestamp); @override - bool operator ==(Object other) => identical(this, other); + bool operator ==(Object other) => + identical(this, other) || other is SpinifySubscriptionState$Unsubscribed; @override String toString() => r'SpinifySubscriptionState$Unsubscribed{}'; @@ -125,8 +94,6 @@ final class SpinifySubscriptionState$Subscribing /// {@macro subscription_state} SpinifySubscriptionState$Subscribing({ DateTime? timestamp, - super.since, - super.recoverable = false, }) : super(timestamp: timestamp ?? DateTime.now()); @override @@ -156,10 +123,11 @@ final class SpinifySubscriptionState$Subscribing subscribing(this); @override - int get hashCode => Object.hash(1, timestamp, since); + int get hashCode => Object.hash(1, timestamp); @override - bool operator ==(Object other) => identical(this, other); + bool operator ==(Object other) => + identical(this, other) || other is SpinifySubscriptionState$Subscribing; @override String toString() => r'SpinifySubscriptionState$Subscribing{}'; @@ -175,17 +143,11 @@ final class SpinifySubscriptionState$Subscribed /// {@macro subscription_state} SpinifySubscriptionState$Subscribed({ DateTime? timestamp, - super.since, - super.recoverable = false, - this.ttl, }) : super(timestamp: timestamp ?? DateTime.now()); @override String get type => 'subscribed'; - /// Time to live in seconds. - final DateTime? ttl; - @override bool get isUnsubscribed => false; @@ -210,16 +172,11 @@ final class SpinifySubscriptionState$Subscribed subscribed(this); @override - Map toJson() => { - ...super.toJson(), - if (ttl != null) 'ttl': ttl?.toUtc().toIso8601String(), - }; + int get hashCode => Object.hash(2, timestamp); @override - int get hashCode => Object.hash(2, timestamp, since, recoverable, ttl); - - @override - bool operator ==(Object other) => identical(this, other); + bool operator ==(Object other) => + identical(this, other) || other is SpinifySubscriptionState$Subscribed; @override String toString() => r'SpinifySubscriptionState$Subscribed{}'; @@ -234,8 +191,6 @@ typedef SpinifySubscriptionStateMatch = R abstract base class _$SpinifySubscriptionStateBase { const _$SpinifySubscriptionStateBase({ required this.timestamp, - required this.since, - required this.recoverable, }); /// Represents the current state type. @@ -244,12 +199,6 @@ abstract base class _$SpinifySubscriptionStateBase { /// Timestamp of state change. final DateTime timestamp; - /// Stream Position - final SpinifyStreamPosition? since; - - /// Whether channel is recoverable. - final bool recoverable; - /// Whether channel is unsubscribed. abstract final bool isUnsubscribed; @@ -306,14 +255,5 @@ abstract base class _$SpinifySubscriptionStateBase { Map toJson() => { 'type': type, 'timestamp': timestamp.toUtc().toIso8601String(), - if (since != null) - 'since': switch (since) { - (:fixnum.Int64 offset, :String epoch) => { - 'offset': offset, - 'epoch': epoch, - }, - _ => null, - }, - 'recoverable': recoverable, }; } diff --git a/lib/src/spinify_impl.dart b/lib/src/spinify_impl.dart index 80aec84..503580a 100644 --- a/lib/src/spinify_impl.dart +++ b/lib/src/spinify_impl.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:collection'; import 'package:meta/meta.dart'; @@ -16,6 +17,7 @@ import 'model/state.dart'; import 'model/states_stream.dart'; import 'model/stream_position.dart'; import 'model/subscription_config.dart'; +import 'model/subscription_state.dart'; import 'model/transport_interface.dart'; import 'spinify_interface.dart'; import 'subscription_impl.dart'; @@ -302,18 +304,23 @@ base mixin SpinifyCommandMixin on SpinifyBase { /// Base mixin for Spinify subscription management. base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { - final StreamController _pushesController = + final StreamController _eventController = StreamController.broadcast(); @override late final SpinifyChannelEvents stream = - SpinifyChannelEvents(_pushesController.stream); + SpinifyChannelEvents(_eventController.stream); @override ({ Map client, Map server - }) get subscriptions => throw UnimplementedError(); + }) get subscriptions => ( + client: UnmodifiableMapView( + _clientSubscriptionRegistry), + server: UnmodifiableMapView( + _serverSubscriptionRegistry), + ); /// Registry of client subscriptions. final Map _clientSubscriptionRegistry = @@ -397,31 +404,45 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { await super._onReply(reply); if (reply is SpinifyPush) { // Add push to the stream. - _pushesController.add(reply.event); + _eventController.add(reply.event); final sub = _clientSubscriptionRegistry[reply.event.channel] ?? _serverSubscriptionRegistry[reply.event.channel]; sub?.onEvent(reply.event); } else if (reply is SpinifyConnectResult) { - // Update subscriptions state. - //final entries = reply.subs?.entries; - // TODO(plugfox): implement subscription state update - /* for (final entry in entries) { - final MapEntry(key: channel, value: sub) - = entry; - final subState = reply.subs[channel]; - if (subState != null) { - sub.state = subState; - config.logger?.call( - const SpinifyLogLevel.debug(), - 'subscription_state_updated', - 'Subscription state updated', - { - 'subscription': sub, - 'state': subState, - }, - ); + // Update server subscriptions. + final newServerSubs = reply.subs ?? {}; + for (final entry in newServerSubs.entries) { + final MapEntry( + key: channel, + value: value + ) = entry; + final sub = _serverSubscriptionRegistry.putIfAbsent( + channel, + () => SpinifyServerSubscriptionImpl( + client: this, + channel: channel, + recoverable: value.recoverable, + epoch: value.since.epoch, + offset: value.since.offset, + )) + ..recoverable = value.recoverable + ..epoch = value.since.epoch + ..offset = value.since.offset + ..setState(SpinifySubscriptionState.subscribed()); + // Notify about new publications. + for (final publication in value.publications) { + _eventController.add(publication); + sub.onEvent(publication); } - } */ + } + final currentServerSubs = _serverSubscriptionRegistry.keys.toSet(); + for (final key in currentServerSubs) { + if (newServerSubs.containsKey(key)) continue; + _serverSubscriptionRegistry + .remove(key) + ?.setState(SpinifySubscriptionState.unsubscribed()); + } + // TODO(plugfox): Resubscribe client subscriptions on connect } } @@ -432,7 +453,7 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { for (final sub in _serverSubscriptionRegistry.values) sub.close(); _clientSubscriptionRegistry.clear(); _serverSubscriptionRegistry.clear(); - _pushesController.close().ignore(); + _eventController.close().ignore(); } } @@ -490,9 +511,9 @@ base mixin SpinifyConnectionMixin id: id, timestamp: now, channel: sub.channel, - recover: sub.state.recoverable, - epoch: sub.state.since?.epoch, - offset: sub.state.since?.offset, + recover: sub.recoverable, + epoch: sub.epoch, + offset: sub.offset, token: null, data: null, positioned: null, diff --git a/lib/src/subscription_impl.dart b/lib/src/subscription_impl.dart index de8bba4..0419ce9 100644 --- a/lib/src/subscription_impl.dart +++ b/lib/src/subscription_impl.dart @@ -1,5 +1,6 @@ import 'dart:async'; +import 'package:fixnum/fixnum.dart' as fixnum; import 'package:meta/meta.dart'; import 'model/channel_event.dart'; @@ -20,6 +21,9 @@ abstract base class SpinifySubscriptionBase implements SpinifySubscription { SpinifySubscriptionBase({ required ISpinify client, required this.channel, + required this.recoverable, + required this.epoch, + required this.offset, }) : _clientWR = WeakReference(client); @override @@ -40,14 +44,23 @@ abstract base class SpinifySubscriptionBase implements SpinifySubscription { SpinifyLogger? get _logger => _client.config.logger; - SpinifySubscriptionState _state = - SpinifySubscriptionState.unsubscribed(code: 0, reason: 'initial state'); + SpinifySubscriptionState _state = SpinifySubscriptionState.unsubscribed(); + final StreamController _stateController = StreamController.broadcast(); final StreamController _eventController = StreamController.broadcast(); + @override + bool recoverable; + + @override + String epoch; + + @override + fixnum.Int64 offset; + @override SpinifySubscriptionState get state => _state; @@ -62,8 +75,10 @@ abstract base class SpinifySubscriptionBase implements SpinifySubscription { @mustCallSuper void onEvent(SpinifyChannelEvent event) { _eventController.add(event); - // TODO(plugfox): update since position - + if (event is SpinifyPublication && recoverable) { + if (event.offset case fixnum.Int64 newOffset when newOffset > 0) + offset = newOffset; + } _logger?.call( const SpinifyLogLevel.debug(), 'channel_event_received', @@ -78,6 +93,7 @@ abstract base class SpinifySubscriptionBase implements SpinifySubscription { @mustCallSuper void setState(SpinifySubscriptionState state) { + if (_state == state) return; _stateController.add(_state = state); _logger?.call( const SpinifyLogLevel.debug(), @@ -105,15 +121,15 @@ final class SpinifyClientSubscriptionImpl extends SpinifySubscriptionBase required super.client, required super.channel, required this.config, - }); + }) : super( + recoverable: config.recoverable, + epoch: config.since?.epoch ?? '', + offset: config.since?.offset ?? fixnum.Int64(0), + ); @override final SpinifySubscriptionConfig config; - // TODO(plugfox): set from client - @override - SpinifyStreamPosition? get since => throw UnimplementedError(); - @override Future history({ int? limit, @@ -163,12 +179,11 @@ final class SpinifyServerSubscriptionImpl extends SpinifySubscriptionBase SpinifyServerSubscriptionImpl({ required super.client, required super.channel, + required super.recoverable, + required super.epoch, + required super.offset, }); - // TODO(plugfox): set from client - @override - SpinifyStreamPosition? get since => throw UnimplementedError(); - @override SpinifyChannelEvents get stream => _client.stream.filter(channel: channel); diff --git a/lib/src/subscription_interface.dart b/lib/src/subscription_interface.dart index 8542e1c..0782847 100644 --- a/lib/src/subscription_interface.dart +++ b/lib/src/subscription_interface.dart @@ -1,5 +1,7 @@ import 'dart:async'; +import 'package:fixnum/fixnum.dart' as fixnum; + import 'model/channel_event.dart'; import 'model/channel_events.dart'; import 'model/history.dart'; @@ -44,8 +46,14 @@ abstract interface class SpinifySubscription { /// Current subscription state. abstract final SpinifySubscriptionState state; + /// Whether subscription is recoverable using epoch and offset. + abstract final bool recoverable; + + /// Epoch of last successfully received message. + abstract final String epoch; + /// Offset of last successfully received message. - abstract final SpinifyStreamPosition? since; + abstract final fixnum.Int64 offset; /// Stream of subscription states. abstract final SpinifySubscriptionStates states;