diff --git a/.vscode/launch.json b/.vscode/launch.json index cce9597..d903e9d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,7 +1,7 @@ { "version": "0.2.0", "configurations": [ - { + /* { "name": "[Flutter] Example (Local)", "request": "launch", "type": "dart", @@ -17,8 +17,8 @@ "args": [ "--dart-define-from-file=config/local.json" ] - }, - { + }, */ + /* { "name": "[Flutter] Example (Development)", "request": "launch", "type": "dart", @@ -34,11 +34,11 @@ "args": [ "--dart-define-from-file=config/development.json" ] - }, + }, */ // https://pub.dev/packages/test // dart test test/unit_test.dart --color --platform=vm { - "name": "[Dart] Unit test (VM)", + "name": "[Dart] Unit test (vm)", "request": "launch", "type": "dart", "program": "test/unit_test.dart", @@ -115,7 +115,7 @@ }, // dart test test/smoke_test.dart --color --platform=vm { - "name": "[Dart] Smoke Test (VM)", + "name": "[Dart] Smoke Test (vm)", "request": "launch", "type": "dart", "program": "test/smoke_test.dart", @@ -136,8 +136,8 @@ "--concurrency=12" ], "args": [], - "preLaunchTask": "echo-server:start", - "postDebugTask": "echo-server:stop" + "preLaunchTask": "echo:start", + "postDebugTask": "echo:stop" }, // dart run server/bin/server.dart { diff --git a/lib/src/model/channel_event.dart b/lib/src/model/channel_event.dart index bc0907a..448a972 100644 --- a/lib/src/model/channel_event.dart +++ b/lib/src/model/channel_event.dart @@ -160,6 +160,19 @@ final class SpinifyPublication extends SpinifyChannelEvent { /// Optional tags, this is a map with string keys and string values final Map? tags; + /// Copy this publication with a new channel. + SpinifyPublication copyWith({required String channel}) => + channel == this.channel + ? this + : SpinifyPublication( + timestamp: timestamp, + channel: channel, + data: data, + offset: offset, + info: info, + tags: tags, + ); + @override bool get isConnect => false; diff --git a/lib/src/model/command.dart b/lib/src/model/command.dart index ee5885b..21b30fe 100644 --- a/lib/src/model/command.dart +++ b/lib/src/model/command.dart @@ -169,7 +169,7 @@ final class SpinifyPublishRequest extends SpinifyCommand { final String channel; /// Data to publish. - final Uint8List data; + final List data; } /// {@macro command} diff --git a/lib/src/model/reply.dart b/lib/src/model/reply.dart index d8d739b..dc9967f 100644 --- a/lib/src/model/reply.dart +++ b/lib/src/model/reply.dart @@ -264,8 +264,8 @@ final class SpinifyPresenceResult extends SpinifyReply @override String get type => 'PresenceResult'; - /// Presence - /// { Channel : ClientInfo } + /// Contains presence information - a map client IDs as keys + /// and client information as values. final Map presence; } @@ -298,6 +298,7 @@ final class SpinifyHistoryResult extends SpinifyReply required super.id, required super.timestamp, required this.since, + required this.publications, }); @override @@ -305,6 +306,9 @@ final class SpinifyHistoryResult extends SpinifyReply /// Offset final SpinifyStreamPosition since; + + /// Publications + final List publications; } /// {@macro reply} diff --git a/lib/src/protobuf/protobuf_codec.dart b/lib/src/protobuf/protobuf_codec.dart index edf9dbe..87296c8 100644 --- a/lib/src/protobuf/protobuf_codec.dart +++ b/lib/src/protobuf/protobuf_codec.dart @@ -386,7 +386,7 @@ final class ProtobufReplyDecoder extends Converter { for (final pub in sub.publications) SpinifyPublication( timestamp: now, - // TODO(plugfox): SpinifyPublication in SubscribeResult do not + // SpinifyPublication in SubscribeResult do not // have the "channel" field - I should fill it in manually // by copying the channel from the SubscribeRequest channel: '', @@ -486,6 +486,20 @@ final class ProtobufReplyDecoder extends Converter { offset: history.offset, epoch: history.epoch, ), + publications: [ + for (final pub in history.publications) + SpinifyPublication( + timestamp: now, + // SpinifyPublication in HistoryResult do not + // have the "channel" field - I should fill it in manually + // by copying the channel from the SubscribeRequest + channel: '', + data: pub.data, + info: _decodeClientInfo(pub.info), + offset: pub.offset, + tags: pub.tags, + ), + ], ); } else if (reply.hasRpc()) { final rpc = reply.rpc; diff --git a/lib/src/spinify_impl.dart b/lib/src/spinify_impl.dart index 3799a9b..748d5d3 100644 --- a/lib/src/spinify_impl.dart +++ b/lib/src/spinify_impl.dart @@ -6,6 +6,7 @@ import 'package:meta/meta.dart'; import 'model/channel_event.dart'; import 'model/channel_events.dart'; +import 'model/client_info.dart'; import 'model/command.dart'; import 'model/config.dart'; import 'model/constant.dart'; @@ -99,6 +100,11 @@ abstract base class SpinifyBase implements ISpinify { ); } + Future _doOnReady(Future Function() action) { + if (state.isConnected) return action(); + return ready().then((_) => action()); + } + @override Future close() async { config.logger?.call( @@ -161,10 +167,14 @@ base mixin SpinifyCommandMixin on SpinifyBase { completer})>{}; @override - Future send(List data) => _sendCommandAsync(SpinifySendRequest( - timestamp: DateTime.now(), - data: data, - )); + Future send(List data) => _doOnReady( + () => _sendCommandAsync( + SpinifySendRequest( + timestamp: DateTime.now(), + data: data, + ), + ), + ); Future _sendCommand(SpinifyCommand command) async { config.logger?.call( @@ -434,6 +444,9 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { ..setState(SpinifySubscriptionState.unsubscribed()); // TODO(plugfox): Resubscribe client subscriptions on unsubscribe // if unsubscribe.code >= 2500 + } else if (event is SpinifyMessage && event.channel.isEmpty) { + // Notify about new message from the server (without channel). + _eventController.add(event); } else { // Notify subscription about new event. final sub = _serverSubscriptionRegistry[event.channel] ?? @@ -490,14 +503,7 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin { publication.channel.isEmpty, 'Publication contains wrong channel', ); - publication = SpinifyPublication( - channel: channel, - data: publication.data, - info: publication.info, - timestamp: publication.timestamp, - tags: publication.tags, - offset: publication.offset, - ); + publication = publication.copyWith(channel: channel); } _eventController.add(publication); sub.onEvent(publication); @@ -824,6 +830,14 @@ base mixin SpinifyConnectionMixin @override Future ready() async { if (state.isConnected) return; + if (state.isClosed) + throw const SpinifyConnectionException( + message: 'Connection is closed permanently', + ); + if (!state.isConnecting) + throw const SpinifyConnectionException( + message: 'Is not connecting to the server', + ); return (_readyCompleter ??= Completer()).future; } @@ -951,25 +965,53 @@ base mixin SpinifyPingPongMixin } /// Base mixin for Spinify client publications management. -base mixin SpinifyPublicationsMixin on SpinifyBase { +base mixin SpinifyPublicationsMixin on SpinifyBase, SpinifyCommandMixin { @override - Future publish(String channel, List data) => - throw UnimplementedError(); + Future publish(String channel, List data) => _doOnReady( + () => _sendCommand( + SpinifyPublishRequest( + id: _getNextCommandId(), + channel: channel, + timestamp: DateTime.now(), + data: data, + ), + ), + ); } /// Base mixin for Spinify client presence management. -base mixin SpinifyPresenceMixin on SpinifyBase { +base mixin SpinifyPresenceMixin on SpinifyBase, SpinifyCommandMixin { @override - Future presence(String channel) => - throw UnimplementedError(); + Future> presence(String channel) => _doOnReady( + () => _sendCommand( + SpinifyPresenceRequest( + id: _getNextCommandId(), + channel: channel, + timestamp: DateTime.now(), + ), + ).then>((reply) => reply.presence), + ); @override - Future presenceStats(String channel) => - throw UnimplementedError(); + Future presenceStats(String channel) => _doOnReady( + () => _sendCommand( + SpinifyPresenceStatsRequest( + id: _getNextCommandId(), + channel: channel, + timestamp: DateTime.now(), + ), + ).then( + (reply) => SpinifyPresenceStats( + channel: channel, + clients: reply.numClients, + users: reply.numUsers, + ), + ), + ); } /// Base mixin for Spinify client history management. -base mixin SpinifyHistoryMixin on SpinifyBase { +base mixin SpinifyHistoryMixin on SpinifyBase, SpinifyCommandMixin { @override Future history( String channel, { @@ -977,21 +1019,40 @@ base mixin SpinifyHistoryMixin on SpinifyBase { SpinifyStreamPosition? since, bool? reverse, }) => - throw UnimplementedError(); + _doOnReady( + () => _sendCommand( + SpinifyHistoryRequest( + id: _getNextCommandId(), + channel: channel, + timestamp: DateTime.now(), + limit: limit, + since: since, + reverse: reverse, + ), + ).then( + (reply) => SpinifyHistory( + publications: List.unmodifiable(reply + .publications + .map((pub) => pub.copyWith(channel: channel))), + since: reply.since, + ), + ), + ); } /// Base mixin for Spinify client RPC management. base mixin SpinifyRPCMixin on SpinifyBase, SpinifyCommandMixin { @override - Future> rpc(String method, List data) => - _sendCommand( - SpinifyRPCRequest( - id: _getNextCommandId(), - timestamp: DateTime.now(), - method: method, - data: data, - ), - ).then>((reply) => reply.data); + Future> rpc(String method, List data) => _doOnReady( + () => _sendCommand( + SpinifyRPCRequest( + id: _getNextCommandId(), + timestamp: DateTime.now(), + method: method, + data: data, + ), + ).then>((reply) => reply.data), + ); } /// Base mixin for Spinify client metrics management. diff --git a/lib/src/spinify_interface.dart b/lib/src/spinify_interface.dart index e4c94ab..5978cc3 100644 --- a/lib/src/spinify_interface.dart +++ b/lib/src/spinify_interface.dart @@ -4,6 +4,7 @@ import 'dart:async'; import 'model/channel_event.dart'; import 'model/channel_events.dart'; +import 'model/client_info.dart'; import 'model/config.dart'; import 'model/history.dart'; import 'model/metric.dart'; @@ -119,7 +120,9 @@ abstract interface class ISpinifySubscriptionsManager { /// Spinify presence owner interface. abstract interface class ISpinifyPresenceOwner { /// Fetch presence information inside a channel. - Future presence(String channel); + /// Contains presence information - a map client IDs as keys + /// and client information as values. + Future> presence(String channel); /// Fetch presence stats information inside a channel. Future presenceStats(String channel); diff --git a/lib/src/subscription_impl.dart b/lib/src/subscription_impl.dart index bfbd6e3..fdc1a8d 100644 --- a/lib/src/subscription_impl.dart +++ b/lib/src/subscription_impl.dart @@ -5,6 +5,7 @@ import 'package:meta/meta.dart'; import 'model/channel_event.dart'; import 'model/channel_events.dart'; +import 'model/client_info.dart'; import 'model/config.dart'; import 'model/exception.dart'; import 'model/history.dart'; @@ -153,7 +154,7 @@ abstract base class SpinifySubscriptionBase implements SpinifySubscription { } @override - Future presence() async { + Future> presence() async { await ready().timeout(_client.config.timeout); return _client.presence(channel); } diff --git a/lib/src/subscription_interface.dart b/lib/src/subscription_interface.dart index c6b9e11..d8910b8 100644 --- a/lib/src/subscription_interface.dart +++ b/lib/src/subscription_interface.dart @@ -4,6 +4,7 @@ import 'package:fixnum/fixnum.dart' as fixnum; import 'model/channel_event.dart'; import 'model/channel_events.dart'; +import 'model/client_info.dart'; import 'model/history.dart'; import 'model/presence_stats.dart'; import 'model/stream_position.dart'; @@ -78,7 +79,7 @@ abstract interface class SpinifySubscription { }); /// Fetch presence information inside a channel. - Future presence(); + Future> presence(); /// Fetch presence stats information inside a channel. Future presenceStats(); diff --git a/lib/src/transport_fake.dart b/lib/src/transport_fake.dart index 940de2f..cea2bf8 100644 --- a/lib/src/transport_fake.dart +++ b/lib/src/transport_fake.dart @@ -6,6 +6,7 @@ import 'dart:math' as math; import 'package:fixnum/fixnum.dart'; +import 'model/channel_event.dart'; import 'model/command.dart'; import 'model/metric.dart'; import 'model/reply.dart'; @@ -181,6 +182,7 @@ class SpinifyTransportFake implements ISpinifyTransport { id: id, timestamp: now, since: (epoch: '...', offset: Int64.ZERO), + publications: const [], ), ); case SpinifyRPCRequest(:int id, :String method, :List data):