Skip to content

Commit

Permalink
Refactor SpinifySubscriptionMixin to use specific implementation clas…
Browse files Browse the repository at this point in the history
…ses for client and server subscriptions
  • Loading branch information
PlugFox committed Jun 12, 2024
1 parent 07994aa commit 6ba54bd
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 112 deletions.
5 changes: 4 additions & 1 deletion lib/src/model/subscription_config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ class SpinifySubscriptionConfig {
this.recoverable = false,
this.joinLeave = false,
this.timeout = const Duration(seconds: 15),
});
}) : assert(
(recoverable == false && since == null) ||
(recoverable == true && since != null),
'recoverable and since must be set together');

/// Create a default config
///
Expand Down
80 changes: 10 additions & 70 deletions lib/src/model/subscription_state.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import 'package:fixnum/fixnum.dart' as fixnum;
import 'package:meta/meta.dart';

import 'stream_position.dart';

/// {@template subscription_state}
/// Subscription has 3 states:
///
Expand All @@ -17,36 +14,24 @@ import 'stream_position.dart';
@immutable
sealed class SpinifySubscriptionState extends _$SpinifySubscriptionStateBase {
/// {@macro subscription_state}
const SpinifySubscriptionState(
{required super.timestamp,
required super.since,
required super.recoverable});
const SpinifySubscriptionState({required super.timestamp});

/// Unsubscribed
/// {@macro subscription_state}
factory SpinifySubscriptionState.unsubscribed({
required int code,
required String reason,
DateTime? timestamp,
SpinifyStreamPosition? since,
bool recoverable,
}) = SpinifySubscriptionState$Unsubscribed;

/// Subscribing
/// {@macro subscription_state}
factory SpinifySubscriptionState.subscribing({
DateTime? timestamp,
SpinifyStreamPosition? since,
bool recoverable,
}) = SpinifySubscriptionState$Subscribing;

/// Subscribed
/// {@macro subscription_state}
factory SpinifySubscriptionState.subscribed({
DateTime? timestamp,
SpinifyStreamPosition? since,
bool recoverable,
DateTime? ttl,
}) = SpinifySubscriptionState$Subscribed;
}

Expand All @@ -59,22 +44,12 @@ final class SpinifySubscriptionState$Unsubscribed
extends SpinifySubscriptionState {
/// {@macro subscription_state}
SpinifySubscriptionState$Unsubscribed({
required this.code,
required this.reason,
DateTime? timestamp,
super.since,
super.recoverable = false,
}) : super(timestamp: timestamp ?? DateTime.now());

@override
String get type => 'unsubscribed';

/// Unsubscribe code.
final int code;

/// Unsubscribe reason.
final String reason;

@override
bool get isUnsubscribed => true;

Expand All @@ -99,17 +74,11 @@ final class SpinifySubscriptionState$Unsubscribed
unsubscribed(this);

@override
Map<String, Object?> toJson() => <String, Object?>{
...super.toJson(),
'code': code,
'reason': reason,
};

@override
int get hashCode => Object.hash(0, timestamp, since);
int get hashCode => Object.hash(0, timestamp);

@override
bool operator ==(Object other) => identical(this, other);
bool operator ==(Object other) =>
identical(this, other) || other is SpinifySubscriptionState$Unsubscribed;

@override
String toString() => r'SpinifySubscriptionState$Unsubscribed{}';
Expand All @@ -125,8 +94,6 @@ final class SpinifySubscriptionState$Subscribing
/// {@macro subscription_state}
SpinifySubscriptionState$Subscribing({
DateTime? timestamp,
super.since,
super.recoverable = false,
}) : super(timestamp: timestamp ?? DateTime.now());

@override
Expand Down Expand Up @@ -156,10 +123,11 @@ final class SpinifySubscriptionState$Subscribing
subscribing(this);

@override
int get hashCode => Object.hash(1, timestamp, since);
int get hashCode => Object.hash(1, timestamp);

@override
bool operator ==(Object other) => identical(this, other);
bool operator ==(Object other) =>
identical(this, other) || other is SpinifySubscriptionState$Subscribing;

@override
String toString() => r'SpinifySubscriptionState$Subscribing{}';
Expand All @@ -175,17 +143,11 @@ final class SpinifySubscriptionState$Subscribed
/// {@macro subscription_state}
SpinifySubscriptionState$Subscribed({
DateTime? timestamp,
super.since,
super.recoverable = false,
this.ttl,
}) : super(timestamp: timestamp ?? DateTime.now());

@override
String get type => 'subscribed';

/// Time to live in seconds.
final DateTime? ttl;

@override
bool get isUnsubscribed => false;

Expand All @@ -210,16 +172,11 @@ final class SpinifySubscriptionState$Subscribed
subscribed(this);

@override
Map<String, Object?> toJson() => <String, Object?>{
...super.toJson(),
if (ttl != null) 'ttl': ttl?.toUtc().toIso8601String(),
};
int get hashCode => Object.hash(2, timestamp);

@override
int get hashCode => Object.hash(2, timestamp, since, recoverable, ttl);

@override
bool operator ==(Object other) => identical(this, other);
bool operator ==(Object other) =>
identical(this, other) || other is SpinifySubscriptionState$Subscribed;

@override
String toString() => r'SpinifySubscriptionState$Subscribed{}';
Expand All @@ -234,8 +191,6 @@ typedef SpinifySubscriptionStateMatch<R, S extends SpinifySubscriptionState> = R
abstract base class _$SpinifySubscriptionStateBase {
const _$SpinifySubscriptionStateBase({
required this.timestamp,
required this.since,
required this.recoverable,
});

/// Represents the current state type.
Expand All @@ -244,12 +199,6 @@ abstract base class _$SpinifySubscriptionStateBase {
/// Timestamp of state change.
final DateTime timestamp;

/// Stream Position
final SpinifyStreamPosition? since;

/// Whether channel is recoverable.
final bool recoverable;

/// Whether channel is unsubscribed.
abstract final bool isUnsubscribed;

Expand Down Expand Up @@ -306,14 +255,5 @@ abstract base class _$SpinifySubscriptionStateBase {
Map<String, Object?> toJson() => <String, Object?>{
'type': type,
'timestamp': timestamp.toUtc().toIso8601String(),
if (since != null)
'since': switch (since) {
(:fixnum.Int64 offset, :String epoch) => <String, Object>{
'offset': offset,
'epoch': epoch,
},
_ => null,
},
'recoverable': recoverable,
};
}
75 changes: 48 additions & 27 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:collection';

import 'package:meta/meta.dart';

Expand All @@ -16,6 +17,7 @@ import 'model/state.dart';
import 'model/states_stream.dart';
import 'model/stream_position.dart';
import 'model/subscription_config.dart';
import 'model/subscription_state.dart';
import 'model/transport_interface.dart';
import 'spinify_interface.dart';
import 'subscription_impl.dart';
Expand Down Expand Up @@ -302,18 +304,23 @@ base mixin SpinifyCommandMixin on SpinifyBase {

/// Base mixin for Spinify subscription management.
base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
final StreamController<SpinifyChannelEvent> _pushesController =
final StreamController<SpinifyChannelEvent> _eventController =
StreamController<SpinifyChannelEvent>.broadcast();

@override
late final SpinifyChannelEvents<SpinifyChannelEvent> stream =
SpinifyChannelEvents<SpinifyChannelEvent>(_pushesController.stream);
SpinifyChannelEvents<SpinifyChannelEvent>(_eventController.stream);

@override
({
Map<String, SpinifyClientSubscription> client,
Map<String, SpinifyServerSubscription> server
}) get subscriptions => throw UnimplementedError();
}) get subscriptions => (
client: UnmodifiableMapView<String, SpinifyClientSubscription>(
_clientSubscriptionRegistry),
server: UnmodifiableMapView<String, SpinifyServerSubscription>(
_serverSubscriptionRegistry),
);

/// Registry of client subscriptions.
final Map<String, SpinifyClientSubscriptionImpl> _clientSubscriptionRegistry =
Expand Down Expand Up @@ -397,31 +404,45 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
await super._onReply(reply);
if (reply is SpinifyPush) {
// Add push to the stream.
_pushesController.add(reply.event);
_eventController.add(reply.event);
final sub = _clientSubscriptionRegistry[reply.event.channel] ??
_serverSubscriptionRegistry[reply.event.channel];
sub?.onEvent(reply.event);
} else if (reply is SpinifyConnectResult) {
// Update subscriptions state.
//final entries = reply.subs?.entries;
// TODO(plugfox): implement subscription state update
/* for (final entry in entries) {
final MapEntry<String, SpinifySubscribeResult>(key: channel, value: sub)
= entry;
final subState = reply.subs[channel];
if (subState != null) {
sub.state = subState;
config.logger?.call(
const SpinifyLogLevel.debug(),
'subscription_state_updated',
'Subscription state updated',
<String, Object?>{
'subscription': sub,
'state': subState,
},
);
// Update server subscriptions.
final newServerSubs = reply.subs ?? <String, SpinifySubscribeResult>{};
for (final entry in newServerSubs.entries) {
final MapEntry<String, SpinifySubscribeResult>(
key: channel,
value: value
) = entry;
final sub = _serverSubscriptionRegistry.putIfAbsent(
channel,
() => SpinifyServerSubscriptionImpl(
client: this,
channel: channel,
recoverable: value.recoverable,
epoch: value.since.epoch,
offset: value.since.offset,
))
..recoverable = value.recoverable
..epoch = value.since.epoch
..offset = value.since.offset
..setState(SpinifySubscriptionState.subscribed());
// Notify about new publications.
for (final publication in value.publications) {
_eventController.add(publication);
sub.onEvent(publication);
}
} */
}
final currentServerSubs = _serverSubscriptionRegistry.keys.toSet();
for (final key in currentServerSubs) {
if (newServerSubs.containsKey(key)) continue;
_serverSubscriptionRegistry
.remove(key)
?.setState(SpinifySubscriptionState.unsubscribed());
}
// TODO(plugfox): Resubscribe client subscriptions on connect
}
}

Expand All @@ -432,7 +453,7 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
for (final sub in _serverSubscriptionRegistry.values) sub.close();
_clientSubscriptionRegistry.clear();
_serverSubscriptionRegistry.clear();
_pushesController.close().ignore();
_eventController.close().ignore();
}
}

Expand Down Expand Up @@ -490,9 +511,9 @@ base mixin SpinifyConnectionMixin
id: id,
timestamp: now,
channel: sub.channel,
recover: sub.state.recoverable,
epoch: sub.state.since?.epoch,
offset: sub.state.since?.offset,
recover: sub.recoverable,
epoch: sub.epoch,
offset: sub.offset,
token: null,
data: null,
positioned: null,
Expand Down
Loading

0 comments on commit 6ba54bd

Please sign in to comment.