Skip to content

Commit

Permalink
Refactor SpinifySubscriptionMixin to use specific implementation clas…
Browse files Browse the repository at this point in the history
…ses for client and server subscriptions
  • Loading branch information
PlugFox committed Jun 11, 2024
1 parent 0d3f7bb commit a47f9b6
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 71 deletions.
2 changes: 1 addition & 1 deletion lib/spinify.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export 'src/model/states_stream.dart';
export 'src/model/stream_position.dart';
export 'src/model/subscription_config.dart';
export 'src/model/subscription_state.dart';
export 'src/model/subscription_states_stream.dart';
export 'src/model/subscription_states.dart';
export 'src/model/transport_interface.dart';
export 'src/spinify_impl.dart' show Spinify;
export 'src/spinify_interface.dart';
Expand Down
25 changes: 13 additions & 12 deletions lib/src/model/channel_events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,43 +8,44 @@ import 'channel_event.dart';
/// {@category Subscription}
/// {@subCategory Push}
/// {@subCategory Channel}
extension type ChannelEvents<T extends SpinifyChannelEvent>(Stream<T> stream)
implements Stream<T> {
extension type SpinifyChannelEvents<T extends SpinifyChannelEvent>(
Stream<T> stream) implements Stream<T> {
/// Stream of publication events.
ChannelEvents<SpinifyPublication> publication({String? channel}) =>
SpinifyChannelEvents<SpinifyPublication> publication({String? channel}) =>
filter<SpinifyPublication>(channel: channel);

/// Stream of presence events.
ChannelEvents<SpinifyPresence> presence({String? channel}) =>
SpinifyChannelEvents<SpinifyPresence> presence({String? channel}) =>
filter<SpinifyPresence>(channel: channel);

/// Stream of unsubscribe events.
ChannelEvents<SpinifyUnsubscribe> unsubscribe({String? channel}) =>
SpinifyChannelEvents<SpinifyUnsubscribe> unsubscribe({String? channel}) =>
filter<SpinifyUnsubscribe>(channel: channel);

/// Stream of message events.
ChannelEvents<SpinifyMessage> message({String? channel}) =>
SpinifyChannelEvents<SpinifyMessage> message({String? channel}) =>
filter<SpinifyMessage>(channel: channel);

/// Stream of subscribe events.
ChannelEvents<SpinifySubscribe> subscribe({String? channel}) =>
SpinifyChannelEvents<SpinifySubscribe> subscribe({String? channel}) =>
filter<SpinifySubscribe>(channel: channel);

/// Stream of connect events.
ChannelEvents<SpinifyConnect> connect({String? channel}) =>
SpinifyChannelEvents<SpinifyConnect> connect({String? channel}) =>
filter<SpinifyConnect>(channel: channel);

/// Stream of disconnect events.
ChannelEvents<SpinifyDisconnect> disconnect({String? channel}) =>
SpinifyChannelEvents<SpinifyDisconnect> disconnect({String? channel}) =>
filter<SpinifyDisconnect>(channel: channel);

/// Stream of refresh events.
ChannelEvents<SpinifyRefresh> refresh({String? channel}) =>
SpinifyChannelEvents<SpinifyRefresh> refresh({String? channel}) =>
filter<SpinifyRefresh>(channel: channel);

/// Filtered stream of data of [SpinifyChannelEvent].
ChannelEvents<S> filter<S extends SpinifyChannelEvent>({String? channel}) =>
ChannelEvents<S>(transform<S>(StreamTransformer<T, S>.fromHandlers(
SpinifyChannelEvents<S> filter<S extends SpinifyChannelEvent>(
{String? channel}) =>
SpinifyChannelEvents<S>(transform<S>(StreamTransformer<T, S>.fromHandlers(
handleData: (data, sink) => switch (data) {
S valid when channel == null || valid.channel == channel =>
sink.add(valid),
Expand Down
35 changes: 35 additions & 0 deletions lib/src/model/subscription_states.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import 'dart:async';

import 'subscription_state.dart';

/// Stream of Spinify's [SpinifySubscriptionState] changes.
/// {@category State}
/// {@category Client}
/// {@category Subscription}
extension type SpinifySubscriptionStates<T extends SpinifySubscriptionState>(
Stream<T> stream) implements Stream<T> {
/// Unsubscribed
SpinifySubscriptionStates<SpinifySubscriptionState$Unsubscribed> unsubscribed(
{String? channel}) =>
filter<SpinifySubscriptionState$Unsubscribed>();

/// Subscribing
SpinifySubscriptionStates<SpinifySubscriptionState$Subscribing> subscribing(
{String? channel}) =>
filter<SpinifySubscriptionState$Subscribing>();

/// Subscribed
SpinifySubscriptionStates<SpinifySubscriptionState$Subscribed> subscribed(
{String? channel}) =>
filter<SpinifySubscriptionState$Subscribed>();

/// Filtered stream of [SpinifySubscriptionState].
SpinifySubscriptionStates<S> filter<S extends SpinifySubscriptionState>() =>
SpinifySubscriptionStates<S>(
transform<S>(StreamTransformer<T, S>.fromHandlers(
handleData: (data, sink) => switch (data) {
S valid => sink.add(valid),
_ => null,
},
)));
}
39 changes: 0 additions & 39 deletions lib/src/model/subscription_states_stream.dart

This file was deleted.

4 changes: 2 additions & 2 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
StreamController<SpinifyChannelEvent>.broadcast();

@override
late final ChannelEvents<SpinifyChannelEvent> stream =
ChannelEvents<SpinifyChannelEvent>(_pushesController.stream);
late final SpinifyChannelEvents<SpinifyChannelEvent> stream =
SpinifyChannelEvents<SpinifyChannelEvent>(_pushesController.stream);

@override
({
Expand Down
2 changes: 1 addition & 1 deletion lib/src/spinify_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ abstract interface class ISpinifyAsyncMessageSender {
/// Spinify event receiver interface.
abstract interface class ISpinifyEventReceiver {
/// Stream of received pushes from Centrifugo server for a channel.
abstract final ChannelEvents<SpinifyChannelEvent> stream;
abstract final SpinifyChannelEvents<SpinifyChannelEvent> stream;
}

/// Spinify client subscriptions manager interface.
Expand Down
18 changes: 5 additions & 13 deletions lib/src/subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import 'model/presence_stats.dart';
import 'model/stream_position.dart';
import 'model/subscription_config.dart';
import 'model/subscription_state.dart';
import 'model/subscription_states_stream.dart';
import 'model/subscription_states.dart';
import 'spinify_interface.dart';
import 'subscription_interface.dart';

Expand Down Expand Up @@ -49,11 +49,11 @@ abstract base class SpinifySubscriptionBase implements SpinifySubscription {
SpinifySubscriptionState get state => _state;

@override
SpinifySubscriptionStateStream get states => throw UnimplementedError();
SpinifySubscriptionStates get states => throw UnimplementedError();

@override
ChannelEvents<SpinifyChannelEvent> get stream =>
ChannelEvents(_eventController.stream);
SpinifyChannelEvents<SpinifyChannelEvent> get stream =>
SpinifyChannelEvents(_eventController.stream);

@mustCallSuper
void onEvent(SpinifyChannelEvent event) {
Expand Down Expand Up @@ -144,16 +144,8 @@ final class SpinifyServerSubscriptionImpl extends SpinifySubscriptionBase
@override
SpinifyStreamPosition? get since => throw UnimplementedError();

// TODO(plugfox): set from client
@override
SpinifySubscriptionState get state => throw UnimplementedError();

// TODO(plugfox): get from client
@override
SpinifySubscriptionStateStream get states => throw UnimplementedError();

@override
ChannelEvents<SpinifyChannelEvent> get stream =>
SpinifyChannelEvents<SpinifyChannelEvent> get stream =>
_client.stream.filter(channel: channel);

@override
Expand Down
6 changes: 3 additions & 3 deletions lib/src/subscription_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import 'model/presence_stats.dart';
import 'model/stream_position.dart';
import 'model/subscription_config.dart';
import 'model/subscription_state.dart';
import 'model/subscription_states_stream.dart';
import 'model/subscription_states.dart';

/// {@template subscription}
/// Spinify subscription interface.
Expand Down Expand Up @@ -48,10 +48,10 @@ abstract interface class SpinifySubscription {
abstract final SpinifyStreamPosition? since;

/// Stream of subscription states.
abstract final SpinifySubscriptionStateStream states;
abstract final SpinifySubscriptionStates states;

/// Stream of received pushes from Centrifugo server for a channel.
abstract final ChannelEvents<SpinifyChannelEvent> stream;
abstract final SpinifyChannelEvents<SpinifyChannelEvent> stream;

/// Await for subscription to be ready.
/// Ready resolves when subscription successfully subscribed.
Expand Down

0 comments on commit a47f9b6

Please sign in to comment.