Skip to content

Commit

Permalink
Refactor SpinifySubscriptionMixin to use specific implementation clas…
Browse files Browse the repository at this point in the history
…ses for client and server subscriptions
  • Loading branch information
PlugFox committed Jun 11, 2024
1 parent 53ae274 commit 86cd2fc
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 35 deletions.
15 changes: 12 additions & 3 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
<String, SpinifyClientSubscriptionImpl>{};

/// Registry of server subscriptions.
final Map<String, SpinifyServerSubscription> _serverSubscriptionRegistry =
<String, SpinifyServerSubscription>{};
final Map<String, SpinifyServerSubscriptionImpl> _serverSubscriptionRegistry =
<String, SpinifyServerSubscriptionImpl>{};

@override
SpinifyClientSubscription? getSubscription(String channel) =>
Expand Down Expand Up @@ -387,6 +387,8 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
),
stackTrace,
);
} finally {
subFromRegistry?.close();
}
}

Expand All @@ -396,6 +398,9 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
if (reply is SpinifyPush) {
// Add push to the stream.
_pushesController.add(reply.event);
final sub = _clientSubscriptionRegistry[reply.event.channel] ??
_serverSubscriptionRegistry[reply.event.channel];
sub?.onEvent(reply.event);
config.logger?.call(
const SpinifyLogLevel.debug(),
'push_received',
Expand Down Expand Up @@ -431,7 +436,11 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
@override
Future<void> close() async {
await super.close();
await _pushesController.close();
for (final sub in _clientSubscriptionRegistry.values) sub.close();
for (final sub in _serverSubscriptionRegistry.values) sub.close();
_clientSubscriptionRegistry.clear();
_serverSubscriptionRegistry.clear();
_pushesController.close().ignore();
}
}

Expand Down
74 changes: 42 additions & 32 deletions lib/src/subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,15 @@ import 'spinify_interface.dart';
import 'subscription_interface.dart';

@internal
final class SpinifyClientSubscriptionImpl implements SpinifyClientSubscription {
SpinifyClientSubscriptionImpl({
abstract base class SpinifySubscriptionBase implements SpinifySubscription {
SpinifySubscriptionBase({
required ISpinify client,
required this.channel,
required this.config,
}) : _clientWR = WeakReference<ISpinify>(client);

@override
final String channel;

@override
final SpinifySubscriptionConfig config;

/// Spinify client weak reference.
final WeakReference<ISpinify> _clientWR;
ISpinify get _client {
Expand All @@ -41,21 +37,50 @@ final class SpinifyClientSubscriptionImpl implements SpinifyClientSubscription {
return target;
}

// TODO(plugfox): set from client
@override
SpinifyStreamPosition? get since => throw UnimplementedError();
final StreamController<SpinifyChannelEvent> _stateController =
StreamController<SpinifyChannelEvent>.broadcast();

final StreamController<SpinifyChannelEvent> _eventController =
StreamController<SpinifyChannelEvent>.broadcast();

// TODO(plugfox): set from client
@override
SpinifySubscriptionState get state => throw UnimplementedError();

// TODO(plugfox): get from client
@override
SpinifySubscriptionStateStream get states => throw UnimplementedError();

@override
ChannelEvents<SpinifyChannelEvent> get stream =>
_client.stream.filter(channel: channel);
ChannelEvents(_eventController.stream);

@mustCallSuper
void onEvent(SpinifyChannelEvent event) {
_eventController.add(event);
// TODO(plugfox): update since position
}

@mustCallSuper
void close() {
_stateController.close().ignore();
_eventController.close().ignore();
}
}

@internal
final class SpinifyClientSubscriptionImpl extends SpinifySubscriptionBase
implements SpinifyClientSubscription {
SpinifyClientSubscriptionImpl({
required super.client,
required super.channel,
required this.config,
});

@override
final SpinifySubscriptionConfig config;

// TODO(plugfox): set from client
@override
SpinifyStreamPosition? get since => throw UnimplementedError();

@override
Future<SpinifyHistory> history({
Expand Down Expand Up @@ -101,27 +126,12 @@ final class SpinifyClientSubscriptionImpl implements SpinifyClientSubscription {
}

@internal
final class SpinifyServerSubscriptionImpl implements SpinifyServerSubscription {
final class SpinifyServerSubscriptionImpl extends SpinifySubscriptionBase
implements SpinifyServerSubscription {
SpinifyServerSubscriptionImpl({
required ISpinify client,
required this.channel,
}) : _clientWR = WeakReference<ISpinify>(client);

@override
final String channel;

/// Spinify client weak reference.
final WeakReference<ISpinify> _clientWR;
ISpinify get _client {
final target = _clientWR.target;
if (target == null) {
throw SpinifySubscriptionException(
channel: channel,
message: 'Client is closed',
);
}
return target;
}
required super.client,
required super.channel,
});

// TODO(plugfox): set from client
@override
Expand Down

0 comments on commit 86cd2fc

Please sign in to comment.