diff --git a/example/lib/src/feature/dependencies/initialization/platform/initialization_vm.dart b/example/lib/src/feature/dependencies/initialization/platform/initialization_vm.dart index 76ba8d0..bab36ca 100644 --- a/example/lib/src/feature/dependencies/initialization/platform/initialization_vm.dart +++ b/example/lib/src/feature/dependencies/initialization/platform/initialization_vm.dart @@ -21,8 +21,8 @@ Future _desktopInitialization() async { center: true, backgroundColor: PlatformDispatcher.instance.platformBrightness == Brightness.dark - ? ThemeData.dark().colorScheme.background - : ThemeData.light().colorScheme.background, + ? ThemeData.dark().colorScheme.surface + : ThemeData.light().colorScheme.surface, skipTaskbar: false, titleBarStyle: TitleBarStyle.hidden, /* alwaysOnTop: true, */ diff --git a/lib/spinify.dart b/lib/spinify.dart index 87deab3..80ad766 100644 --- a/lib/spinify.dart +++ b/lib/spinify.dart @@ -12,14 +12,14 @@ export 'src/model/jwt.dart'; export 'src/model/metric.dart'; export 'src/model/presence_stats.dart'; export 'src/model/reply.dart'; -export 'src/model/spinify_interface.dart'; export 'src/model/state.dart'; export 'src/model/states_stream.dart'; export 'src/model/stream_position.dart'; -export 'src/model/subscription.dart'; export 'src/model/subscription_config.dart'; export 'src/model/subscription_state.dart'; export 'src/model/subscription_states_stream.dart'; export 'src/model/transport_interface.dart'; export 'src/spinify_impl.dart' show Spinify; +export 'src/spinify_interface.dart'; +export 'src/subscription_interface.dart'; export 'src/transport_fake.dart'; diff --git a/lib/src/spinify_impl.dart b/lib/src/spinify_impl.dart index ea7c60f..ac5e6ce 100644 --- a/lib/src/spinify_impl.dart +++ b/lib/src/spinify_impl.dart @@ -7,17 +7,19 @@ import 'model/channel_events.dart'; import 'model/command.dart'; import 'model/config.dart'; import 'model/constant.dart'; +import 'model/exception.dart'; import 'model/history.dart'; import 'model/metric.dart'; import 'model/presence_stats.dart'; import 'model/reply.dart'; -import 'model/spinify_interface.dart'; import 'model/state.dart'; import 'model/states_stream.dart'; import 'model/stream_position.dart'; -import 'model/subscription.dart'; import 'model/subscription_config.dart'; import 'model/transport_interface.dart'; +import 'spinify_interface.dart'; +import 'subscription_impl.dart'; +import 'subscription_interface.dart'; import 'transport_ws_pb_stub.dart' // ignore: uri_does_not_exist if (dart.library.js_util) 'transport_ws_pb_js.dart' @@ -298,9 +300,150 @@ base mixin SpinifyCommandMixin on SpinifyBase { } } +/// Base mixin for Spinify subscription management. +base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { + final StreamController _pushesController = + StreamController.broadcast(); + + @override + late final ChannelEvents stream = + ChannelEvents(_pushesController.stream); + + @override + ({ + Map client, + Map server + }) get subscriptions => throw UnimplementedError(); + + /// Registry of client subscriptions. + final Map _clientSubscriptionRegistry = + {}; + + /// Registry of server subscriptions. + final Map _serverSubscriptionRegistry = + {}; + + @override + SpinifyClientSubscription? getSubscription(String channel) => + _clientSubscriptionRegistry[channel]; + + @override + SpinifyClientSubscription newSubscription(String channel, + [SpinifySubscriptionConfig? config]) { + final sub = _clientSubscriptionRegistry[channel] ?? + _serverSubscriptionRegistry[channel]; + if (sub != null) { + this.config.logger?.call( + const SpinifyLogLevel.warning(), + 'subscription_exists_error', + 'Subscription already exists', + { + 'channel': channel, + 'subscription': sub, + }, + ); + throw SpinifySubscriptionException( + channel: channel, + message: 'Subscription already exists', + ); + } + return _clientSubscriptionRegistry[channel] = SpinifyClientSubscriptionImpl( + client: this, + channel: channel, + config: config ?? const SpinifySubscriptionConfig.byDefault(), + ); + } + + @override + Future removeSubscription( + SpinifyClientSubscription subscription) async { + final subFromRegistry = + _clientSubscriptionRegistry.remove(subscription.channel); + try { + await subFromRegistry?.unsubscribe(); + assert( + subFromRegistry != null, + 'Subscription not found in the registry', + ); + assert( + identical(subFromRegistry, subscription), + 'Subscription should be the same instance as in the registry', + ); + } on Object catch (error, stackTrace) { + config.logger?.call( + const SpinifyLogLevel.warning(), + 'subscription_remove_error', + 'Error removing subscription', + { + 'channel': subscription.channel, + 'subscription': subscription, + }, + ); + Error.throwWithStackTrace( + SpinifySubscriptionException( + channel: subscription.channel, + message: 'Error while unsubscribing', + error: error, + ), + stackTrace, + ); + } finally { + subFromRegistry?.close().ignore(); + } + } + + @override + Future _onReply(SpinifyReply reply) async { + await super._onReply(reply); + if (reply is SpinifyPush) { + // Add push to the stream. + _pushesController.add(reply.event); + config.logger?.call( + const SpinifyLogLevel.debug(), + 'push_received', + 'Push received', + { + 'push': reply, + 'event': reply.event, + }, + ); + } else if (reply is SpinifyConnectResult) { + // Update subscriptions state. + final entries = reply.subs?.entries; + // TODO(plugfox): implement subscription state update + /* for (final entry in entries) { + final MapEntry(key: channel, value: sub) = entry; + final subState = reply.subs[channel]; + if (subState != null) { + sub.state = subState; + config.logger?.call( + const SpinifyLogLevel.debug(), + 'subscription_state_updated', + 'Subscription state updated', + { + 'subscription': sub, + 'state': subState, + }, + ); + } + } */ + } + } + + @override + Future close() async { + await super.close(); + await _pushesController.close(); + } +} + /// Base mixin for Spinify client connection management (connect & disconnect). base mixin SpinifyConnectionMixin - on SpinifyBase, SpinifyCommandMixin, SpinifyStateMixin { + on + SpinifyBase, + SpinifyCommandMixin, + SpinifyStateMixin, + SpinifySubscriptionMixin { Timer? _reconnectTimer; Completer? _readyCompleter; @@ -335,19 +478,36 @@ base mixin SpinifyConnectionMixin final token = await config.getToken?.call(); assert(token == null || token.length > 5, 'Spinify JWT is too short'); final payload = await config.getPayload?.call(); + final id = _getNextCommandId(); + final now = DateTime.now(); request = SpinifyConnectRequest( - id: _getNextCommandId(), - timestamp: DateTime.now(), + id: id, + timestamp: now, token: token, data: payload, - // TODO(plugfox): Implement subscriptions. - subs: const {}, + subs: { + for (final sub in _serverSubscriptionRegistry.values) + sub.channel: SpinifySubscribeRequest( + id: id, + timestamp: now, + channel: sub.channel, + recover: sub.state.recoverable, + epoch: sub.state.since?.epoch, + offset: sub.state.since?.offset, + token: null, + data: null, + positioned: null, + recoverable: null, + joinLeave: null, + ), + }, name: config.client.name, version: config.client.version, ); } final reply = await _sendCommand(request); + _setState(SpinifyState$Connected( url: url, client: reply.client, @@ -690,58 +850,6 @@ base mixin SpinifyPingPongMixin } } -/// Base mixin for Spinify subscription management. -base mixin SpinifySubscriptionMixin on SpinifyBase { - final StreamController _pushesController = - StreamController.broadcast(); - - @override - late final ChannelEvents stream = - ChannelEvents(_pushesController.stream); - - @override - ({ - Map client, - Map server - }) get subscriptions => throw UnimplementedError(); - - @override - SpinifyClientSubscription? getSubscription(String channel) => - throw UnimplementedError(); - - @override - SpinifyClientSubscription newSubscription(String channel, - [SpinifySubscriptionConfig? config]) => - throw UnimplementedError(); - - @override - Future removeSubscription(SpinifyClientSubscription subscription) => - throw UnimplementedError(); - - @override - Future _onReply(SpinifyReply reply) async { - await super._onReply(reply); - if (!reply.isResult && reply is SpinifyPush) { - _pushesController.add(reply.event); - config.logger?.call( - const SpinifyLogLevel.debug(), - 'push_received', - 'Push received', - { - 'push': reply, - 'event': reply.event, - }, - ); - } - } - - @override - Future close() async { - await super.close(); - await _pushesController.close(); - } -} - /// Base mixin for Spinify client publications management. base mixin SpinifyPublicationsMixin on SpinifyBase { @override @@ -816,9 +924,9 @@ final class Spinify extends SpinifyBase with SpinifyStateMixin, SpinifyCommandMixin, + SpinifySubscriptionMixin, SpinifyConnectionMixin, SpinifyPingPongMixin, - SpinifySubscriptionMixin, SpinifyPublicationsMixin, SpinifyPresenceMixin, SpinifyHistoryMixin, diff --git a/lib/src/model/spinify_interface.dart b/lib/src/spinify_interface.dart similarity index 92% rename from lib/src/model/spinify_interface.dart rename to lib/src/spinify_interface.dart index a358b73..cb27faf 100644 --- a/lib/src/model/spinify_interface.dart +++ b/lib/src/spinify_interface.dart @@ -2,17 +2,17 @@ import 'dart:async'; -import 'channel_event.dart'; -import 'channel_events.dart'; -import 'config.dart'; -import 'history.dart'; -import 'metric.dart'; -import 'presence_stats.dart'; -import 'state.dart'; -import 'states_stream.dart'; -import 'stream_position.dart'; -import 'subscription.dart'; -import 'subscription_config.dart'; +import 'model/channel_event.dart'; +import 'model/channel_events.dart'; +import 'model/config.dart'; +import 'model/history.dart'; +import 'model/metric.dart'; +import 'model/presence_stats.dart'; +import 'model/state.dart'; +import 'model/states_stream.dart'; +import 'model/stream_position.dart'; +import 'model/subscription_config.dart'; +import 'subscription_interface.dart'; /// Spinify client interface. abstract interface class ISpinify diff --git a/lib/src/subscription_impl.dart b/lib/src/subscription_impl.dart new file mode 100644 index 0000000..db76db4 --- /dev/null +++ b/lib/src/subscription_impl.dart @@ -0,0 +1,100 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +import 'model/channel_event.dart'; +import 'model/channel_events.dart'; +import 'model/exception.dart'; +import 'model/history.dart'; +import 'model/presence_stats.dart'; +import 'model/stream_position.dart'; +import 'model/subscription_config.dart'; +import 'model/subscription_state.dart'; +import 'model/subscription_states_stream.dart'; +import 'spinify_interface.dart'; +import 'subscription_interface.dart'; + +@internal +final class SpinifyClientSubscriptionImpl implements SpinifyClientSubscription { + SpinifyClientSubscriptionImpl({ + required ISpinify client, + required this.channel, + required this.config, + }) : _clientWR = WeakReference(client); + + @override + final String channel; + + @override + final SpinifySubscriptionConfig config; + + /// Spinify client weak reference. + final WeakReference _clientWR; + ISpinify get _client { + final target = _clientWR.target; + if (target == null) { + throw SpinifySubscriptionException( + channel: channel, + message: 'Client is closed', + ); + } + return target; + } + + @override + SpinifyStreamPosition? get since => throw UnimplementedError(); + + @override + SpinifySubscriptionState get state => throw UnimplementedError(); + + @override + SpinifySubscriptionStateStream get states => throw UnimplementedError(); + + @override + ChannelEvents get stream => + _client.stream.filter(channel: channel); + + @override + Future history({ + int? limit, + SpinifyStreamPosition? since, + bool? reverse, + }) { + throw UnimplementedError(); + } + + @override + Future presence() { + throw UnimplementedError(); + } + + @override + Future presenceStats() { + throw UnimplementedError(); + } + + @override + Future publish(List data) { + throw UnimplementedError(); + } + + @override + FutureOr ready() { + throw UnimplementedError(); + } + + @override + Future subscribe() { + throw UnimplementedError(); + } + + @override + Future unsubscribe([ + int code = 0, + String reason = 'unsubscribe called', + ]) { + throw UnimplementedError(); + } + + Future close() async {} +} diff --git a/lib/src/model/subscription.dart b/lib/src/subscription_interface.dart similarity index 89% rename from lib/src/model/subscription.dart rename to lib/src/subscription_interface.dart index bbc8d91..685015b 100644 --- a/lib/src/model/subscription.dart +++ b/lib/src/subscription_interface.dart @@ -1,12 +1,13 @@ import 'dart:async'; -import 'channel_event.dart'; -import 'channel_events.dart'; -import 'history.dart'; -import 'presence_stats.dart'; -import 'stream_position.dart'; -import 'subscription_state.dart'; -import 'subscription_states_stream.dart'; +import 'model/channel_event.dart'; +import 'model/channel_events.dart'; +import 'model/history.dart'; +import 'model/presence_stats.dart'; +import 'model/stream_position.dart'; +import 'model/subscription_config.dart'; +import 'model/subscription_state.dart'; +import 'model/subscription_states_stream.dart'; /// {@template subscription} /// Spinify subscription interface. @@ -36,7 +37,7 @@ import 'subscription_states_stream.dart'; /// - For server-side subscriptions see [SpinifyServerSubscription]. /// {@endtemplate} /// {@category Subscription} -sealed class SpinifySubscription { +abstract interface class SpinifySubscription { /// Channel name. abstract final String channel; @@ -129,7 +130,11 @@ sealed class SpinifySubscription { /// {@endtemplate} /// {@category Subscription} /// {@subCategory Client-side} -abstract class SpinifyClientSubscription extends SpinifySubscription { +abstract interface class SpinifyClientSubscription + implements SpinifySubscription { + /// Subscription configuration. + abstract final SpinifySubscriptionConfig config; + /// Start subscribing to a channel Future subscribe(); @@ -154,4 +159,5 @@ abstract class SpinifyClientSubscription extends SpinifySubscription { /// {@endtemplate} /// {@category Subscription} /// {@subCategory Server-side} -abstract class SpinifyServerSubscription extends SpinifySubscription {} +abstract interface class SpinifyServerSubscription + implements SpinifySubscription {} diff --git a/tool/echo/echo.go b/tool/echo/echo.go index 761fe2d..aad5d8e 100644 --- a/tool/echo/echo.go +++ b/tool/echo/echo.go @@ -49,7 +49,7 @@ func waitExitSignal(n *centrifuge.Node, s *http.Server, sigCh chan os.Signal) { <-done } -var channels = []string{"public:index", "chat:index", "notification:index"} +var channels = []string{"public:index", "chat:index"} // Check whether channel is allowed for subscribing. In real case permission // check will probably be more complex than in this example. @@ -97,9 +97,15 @@ func Centrifuge() (*centrifuge.Node, error) { Subscriptions: map[string]centrifuge.SubscribeOptions{ "#" + cred.UserID: { EnableRecovery: true, - EmitPresence: true, - EmitJoinLeave: true, - PushJoinLeave: true, + EmitPresence: false, + EmitJoinLeave: false, + PushJoinLeave: false, + }, + "notification:index": { + EnableRecovery: true, + EmitPresence: false, + EmitJoinLeave: false, + PushJoinLeave: false, }, }, }, nil