From 86cd2fcd8e563e66a45531d5b13e4c210b9f24c5 Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Wed, 12 Jun 2024 01:50:49 +0400 Subject: [PATCH] Refactor SpinifySubscriptionMixin to use specific implementation classes for client and server subscriptions --- lib/src/spinify_impl.dart | 15 +++++-- lib/src/subscription_impl.dart | 74 +++++++++++++++++++--------------- 2 files changed, 54 insertions(+), 35 deletions(-) diff --git a/lib/src/spinify_impl.dart b/lib/src/spinify_impl.dart index fc835b2..05b87ff 100644 --- a/lib/src/spinify_impl.dart +++ b/lib/src/spinify_impl.dart @@ -320,8 +320,8 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { {}; /// Registry of server subscriptions. - final Map _serverSubscriptionRegistry = - {}; + final Map _serverSubscriptionRegistry = + {}; @override SpinifyClientSubscription? getSubscription(String channel) => @@ -387,6 +387,8 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { ), stackTrace, ); + } finally { + subFromRegistry?.close(); } } @@ -396,6 +398,9 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { if (reply is SpinifyPush) { // Add push to the stream. _pushesController.add(reply.event); + final sub = _clientSubscriptionRegistry[reply.event.channel] ?? + _serverSubscriptionRegistry[reply.event.channel]; + sub?.onEvent(reply.event); config.logger?.call( const SpinifyLogLevel.debug(), 'push_received', @@ -431,7 +436,11 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { @override Future close() async { await super.close(); - await _pushesController.close(); + for (final sub in _clientSubscriptionRegistry.values) sub.close(); + for (final sub in _serverSubscriptionRegistry.values) sub.close(); + _clientSubscriptionRegistry.clear(); + _serverSubscriptionRegistry.clear(); + _pushesController.close().ignore(); } } diff --git a/lib/src/subscription_impl.dart b/lib/src/subscription_impl.dart index e177063..a745704 100644 --- a/lib/src/subscription_impl.dart +++ b/lib/src/subscription_impl.dart @@ -15,19 +15,15 @@ import 'spinify_interface.dart'; import 'subscription_interface.dart'; @internal -final class SpinifyClientSubscriptionImpl implements SpinifyClientSubscription { - SpinifyClientSubscriptionImpl({ +abstract base class SpinifySubscriptionBase implements SpinifySubscription { + SpinifySubscriptionBase({ required ISpinify client, required this.channel, - required this.config, }) : _clientWR = WeakReference(client); @override final String channel; - @override - final SpinifySubscriptionConfig config; - /// Spinify client weak reference. final WeakReference _clientWR; ISpinify get _client { @@ -41,21 +37,50 @@ final class SpinifyClientSubscriptionImpl implements SpinifyClientSubscription { return target; } - // TODO(plugfox): set from client - @override - SpinifyStreamPosition? get since => throw UnimplementedError(); + final StreamController _stateController = + StreamController.broadcast(); + + final StreamController _eventController = + StreamController.broadcast(); - // TODO(plugfox): set from client @override SpinifySubscriptionState get state => throw UnimplementedError(); - // TODO(plugfox): get from client @override SpinifySubscriptionStateStream get states => throw UnimplementedError(); @override ChannelEvents get stream => - _client.stream.filter(channel: channel); + ChannelEvents(_eventController.stream); + + @mustCallSuper + void onEvent(SpinifyChannelEvent event) { + _eventController.add(event); + // TODO(plugfox): update since position + } + + @mustCallSuper + void close() { + _stateController.close().ignore(); + _eventController.close().ignore(); + } +} + +@internal +final class SpinifyClientSubscriptionImpl extends SpinifySubscriptionBase + implements SpinifyClientSubscription { + SpinifyClientSubscriptionImpl({ + required super.client, + required super.channel, + required this.config, + }); + + @override + final SpinifySubscriptionConfig config; + + // TODO(plugfox): set from client + @override + SpinifyStreamPosition? get since => throw UnimplementedError(); @override Future history({ @@ -101,27 +126,12 @@ final class SpinifyClientSubscriptionImpl implements SpinifyClientSubscription { } @internal -final class SpinifyServerSubscriptionImpl implements SpinifyServerSubscription { +final class SpinifyServerSubscriptionImpl extends SpinifySubscriptionBase + implements SpinifyServerSubscription { SpinifyServerSubscriptionImpl({ - required ISpinify client, - required this.channel, - }) : _clientWR = WeakReference(client); - - @override - final String channel; - - /// Spinify client weak reference. - final WeakReference _clientWR; - ISpinify get _client { - final target = _clientWR.target; - if (target == null) { - throw SpinifySubscriptionException( - channel: channel, - message: 'Client is closed', - ); - } - return target; - } + required super.client, + required super.channel, + }); // TODO(plugfox): set from client @override