Skip to content

Commit

Permalink
Refactor SpinifyImpl to handle missing subscriptions in SpinifySubscr…
Browse files Browse the repository at this point in the history
…iptionMixin
  • Loading branch information
PlugFox committed Jun 13, 2024
1 parent c34dfe0 commit 5816d7b
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 46 deletions.
16 changes: 8 additions & 8 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"version": "0.2.0",
"configurations": [
{
/* {
"name": "[Flutter] Example (Local)",
"request": "launch",
"type": "dart",
Expand All @@ -17,8 +17,8 @@
"args": [
"--dart-define-from-file=config/local.json"
]
},
{
}, */
/* {
"name": "[Flutter] Example (Development)",
"request": "launch",
"type": "dart",
Expand All @@ -34,11 +34,11 @@
"args": [
"--dart-define-from-file=config/development.json"
]
},
}, */
// https://pub.dev/packages/test
// dart test test/unit_test.dart --color --platform=vm
{
"name": "[Dart] Unit test (VM)",
"name": "[Dart] Unit test (vm)",
"request": "launch",
"type": "dart",
"program": "test/unit_test.dart",
Expand Down Expand Up @@ -115,7 +115,7 @@
},
// dart test test/smoke_test.dart --color --platform=vm
{
"name": "[Dart] Smoke Test (VM)",
"name": "[Dart] Smoke Test (vm)",
"request": "launch",
"type": "dart",
"program": "test/smoke_test.dart",
Expand All @@ -136,8 +136,8 @@
"--concurrency=12"
],
"args": [],
"preLaunchTask": "echo-server:start",
"postDebugTask": "echo-server:stop"
"preLaunchTask": "echo:start",
"postDebugTask": "echo:stop"
},
// dart run server/bin/server.dart
{
Expand Down
13 changes: 13 additions & 0 deletions lib/src/model/channel_event.dart
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ final class SpinifyPublication extends SpinifyChannelEvent {
/// Optional tags, this is a map with string keys and string values
final Map<String, String>? tags;

/// Copy this publication with a new channel.
SpinifyPublication copyWith({required String channel}) =>
channel == this.channel
? this
: SpinifyPublication(
timestamp: timestamp,
channel: channel,
data: data,
offset: offset,
info: info,
tags: tags,
);

@override
bool get isConnect => false;

Expand Down
2 changes: 1 addition & 1 deletion lib/src/model/command.dart
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ final class SpinifyPublishRequest extends SpinifyCommand {
final String channel;

/// Data to publish.
final Uint8List data;
final List<int> data;
}

/// {@macro command}
Expand Down
8 changes: 6 additions & 2 deletions lib/src/model/reply.dart
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ final class SpinifyPresenceResult extends SpinifyReply
@override
String get type => 'PresenceResult';

/// Presence
/// { Channel : ClientInfo }
/// Contains presence information - a map client IDs as keys
/// and client information as values.
final Map<String, SpinifyClientInfo> presence;
}

Expand Down Expand Up @@ -298,13 +298,17 @@ final class SpinifyHistoryResult extends SpinifyReply
required super.id,
required super.timestamp,
required this.since,
required this.publications,
});

@override
String get type => 'HistoryResult';

/// Offset
final SpinifyStreamPosition since;

/// Publications
final List<SpinifyPublication> publications;
}

/// {@macro reply}
Expand Down
16 changes: 15 additions & 1 deletion lib/src/protobuf/protobuf_codec.dart
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ final class ProtobufReplyDecoder extends Converter<pb.Reply, SpinifyReply> {
for (final pub in sub.publications)
SpinifyPublication(
timestamp: now,
// TODO(plugfox): SpinifyPublication in SubscribeResult do not
// SpinifyPublication in SubscribeResult do not
// have the "channel" field - I should fill it in manually
// by copying the channel from the SubscribeRequest
channel: '',
Expand Down Expand Up @@ -486,6 +486,20 @@ final class ProtobufReplyDecoder extends Converter<pb.Reply, SpinifyReply> {
offset: history.offset,
epoch: history.epoch,
),
publications: <SpinifyPublication>[
for (final pub in history.publications)
SpinifyPublication(
timestamp: now,
// SpinifyPublication in HistoryResult do not
// have the "channel" field - I should fill it in manually
// by copying the channel from the SubscribeRequest
channel: '',
data: pub.data,
info: _decodeClientInfo(pub.info),
offset: pub.offset,
tags: pub.tags,
),
],
);
} else if (reply.hasRpc()) {
final rpc = reply.rpc;
Expand Down
123 changes: 92 additions & 31 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'package:meta/meta.dart';

import 'model/channel_event.dart';
import 'model/channel_events.dart';
import 'model/client_info.dart';
import 'model/command.dart';
import 'model/config.dart';
import 'model/constant.dart';
Expand Down Expand Up @@ -99,6 +100,11 @@ abstract base class SpinifyBase implements ISpinify {
);
}

Future<T> _doOnReady<T>(Future<T> Function() action) {
if (state.isConnected) return action();
return ready().then<T>((_) => action());
}

@override
Future<void> close() async {
config.logger?.call(
Expand Down Expand Up @@ -161,10 +167,14 @@ base mixin SpinifyCommandMixin on SpinifyBase {
<int, ({SpinifyCommand command, Completer<SpinifyReply> completer})>{};

@override
Future<void> send(List<int> data) => _sendCommandAsync(SpinifySendRequest(
timestamp: DateTime.now(),
data: data,
));
Future<void> send(List<int> data) => _doOnReady(
() => _sendCommandAsync(
SpinifySendRequest(
timestamp: DateTime.now(),
data: data,
),
),
);

Future<T> _sendCommand<T extends SpinifyReply>(SpinifyCommand command) async {
config.logger?.call(
Expand Down Expand Up @@ -434,6 +444,9 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
..setState(SpinifySubscriptionState.unsubscribed());
// TODO(plugfox): Resubscribe client subscriptions on unsubscribe
// if unsubscribe.code >= 2500
} else if (event is SpinifyMessage && event.channel.isEmpty) {
// Notify about new message from the server (without channel).
_eventController.add(event);
} else {
// Notify subscription about new event.
final sub = _serverSubscriptionRegistry[event.channel] ??
Expand Down Expand Up @@ -490,14 +503,7 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
publication.channel.isEmpty,
'Publication contains wrong channel',
);
publication = SpinifyPublication(
channel: channel,
data: publication.data,
info: publication.info,
timestamp: publication.timestamp,
tags: publication.tags,
offset: publication.offset,
);
publication = publication.copyWith(channel: channel);
}
_eventController.add(publication);
sub.onEvent(publication);
Expand Down Expand Up @@ -824,6 +830,14 @@ base mixin SpinifyConnectionMixin
@override
Future<void> ready() async {
if (state.isConnected) return;
if (state.isClosed)
throw const SpinifyConnectionException(
message: 'Connection is closed permanently',
);
if (!state.isConnecting)
throw const SpinifyConnectionException(
message: 'Is not connecting to the server',
);
return (_readyCompleter ??= Completer<void>()).future;
}

Expand Down Expand Up @@ -951,47 +965,94 @@ base mixin SpinifyPingPongMixin
}

/// Base mixin for Spinify client publications management.
base mixin SpinifyPublicationsMixin on SpinifyBase {
base mixin SpinifyPublicationsMixin on SpinifyBase, SpinifyCommandMixin {
@override
Future<void> publish(String channel, List<int> data) =>
throw UnimplementedError();
Future<void> publish(String channel, List<int> data) => _doOnReady(
() => _sendCommand<SpinifyPublishResult>(
SpinifyPublishRequest(
id: _getNextCommandId(),
channel: channel,
timestamp: DateTime.now(),
data: data,
),
),
);
}

/// Base mixin for Spinify client presence management.
base mixin SpinifyPresenceMixin on SpinifyBase {
base mixin SpinifyPresenceMixin on SpinifyBase, SpinifyCommandMixin {
@override
Future<SpinifyPresence> presence(String channel) =>
throw UnimplementedError();
Future<Map<String, SpinifyClientInfo>> presence(String channel) => _doOnReady(
() => _sendCommand<SpinifyPresenceResult>(
SpinifyPresenceRequest(
id: _getNextCommandId(),
channel: channel,
timestamp: DateTime.now(),
),
).then<Map<String, SpinifyClientInfo>>((reply) => reply.presence),
);

@override
Future<SpinifyPresenceStats> presenceStats(String channel) =>
throw UnimplementedError();
Future<SpinifyPresenceStats> presenceStats(String channel) => _doOnReady(
() => _sendCommand<SpinifyPresenceStatsResult>(
SpinifyPresenceStatsRequest(
id: _getNextCommandId(),
channel: channel,
timestamp: DateTime.now(),
),
).then<SpinifyPresenceStats>(
(reply) => SpinifyPresenceStats(
channel: channel,
clients: reply.numClients,
users: reply.numUsers,
),
),
);
}

/// Base mixin for Spinify client history management.
base mixin SpinifyHistoryMixin on SpinifyBase {
base mixin SpinifyHistoryMixin on SpinifyBase, SpinifyCommandMixin {
@override
Future<SpinifyHistory> history(
String channel, {
int? limit,
SpinifyStreamPosition? since,
bool? reverse,
}) =>
throw UnimplementedError();
_doOnReady(
() => _sendCommand<SpinifyHistoryResult>(
SpinifyHistoryRequest(
id: _getNextCommandId(),
channel: channel,
timestamp: DateTime.now(),
limit: limit,
since: since,
reverse: reverse,
),
).then<SpinifyHistory>(
(reply) => SpinifyHistory(
publications: List<SpinifyPublication>.unmodifiable(reply
.publications
.map((pub) => pub.copyWith(channel: channel))),
since: reply.since,
),
),
);
}

/// Base mixin for Spinify client RPC management.
base mixin SpinifyRPCMixin on SpinifyBase, SpinifyCommandMixin {
@override
Future<List<int>> rpc(String method, List<int> data) =>
_sendCommand<SpinifyRPCResult>(
SpinifyRPCRequest(
id: _getNextCommandId(),
timestamp: DateTime.now(),
method: method,
data: data,
),
).then<List<int>>((reply) => reply.data);
Future<List<int>> rpc(String method, List<int> data) => _doOnReady(
() => _sendCommand<SpinifyRPCResult>(
SpinifyRPCRequest(
id: _getNextCommandId(),
timestamp: DateTime.now(),
method: method,
data: data,
),
).then<List<int>>((reply) => reply.data),
);
}

/// Base mixin for Spinify client metrics management.
Expand Down
5 changes: 4 additions & 1 deletion lib/src/spinify_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:async';

import 'model/channel_event.dart';
import 'model/channel_events.dart';
import 'model/client_info.dart';
import 'model/config.dart';
import 'model/history.dart';
import 'model/metric.dart';
Expand Down Expand Up @@ -119,7 +120,9 @@ abstract interface class ISpinifySubscriptionsManager {
/// Spinify presence owner interface.
abstract interface class ISpinifyPresenceOwner {
/// Fetch presence information inside a channel.
Future<SpinifyPresence> presence(String channel);
/// Contains presence information - a map client IDs as keys
/// and client information as values.
Future<Map<String, SpinifyClientInfo>> presence(String channel);

/// Fetch presence stats information inside a channel.
Future<SpinifyPresenceStats> presenceStats(String channel);
Expand Down
3 changes: 2 additions & 1 deletion lib/src/subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:meta/meta.dart';

import 'model/channel_event.dart';
import 'model/channel_events.dart';
import 'model/client_info.dart';
import 'model/config.dart';
import 'model/exception.dart';
import 'model/history.dart';
Expand Down Expand Up @@ -153,7 +154,7 @@ abstract base class SpinifySubscriptionBase implements SpinifySubscription {
}

@override
Future<SpinifyPresence> presence() async {
Future<Map<String, SpinifyClientInfo>> presence() async {
await ready().timeout(_client.config.timeout);
return _client.presence(channel);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/src/subscription_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'package:fixnum/fixnum.dart' as fixnum;

import 'model/channel_event.dart';
import 'model/channel_events.dart';
import 'model/client_info.dart';
import 'model/history.dart';
import 'model/presence_stats.dart';
import 'model/stream_position.dart';
Expand Down Expand Up @@ -78,7 +79,7 @@ abstract interface class SpinifySubscription {
});

/// Fetch presence information inside a channel.
Future<SpinifyPresence> presence();
Future<Map<String, SpinifyClientInfo>> presence();

/// Fetch presence stats information inside a channel.
Future<SpinifyPresenceStats> presenceStats();
Expand Down
Loading

0 comments on commit 5816d7b

Please sign in to comment.