Skip to content

Commit

Permalink
Refactor imports to use 'channel_event.dart' instead of 'channel_push…
Browse files Browse the repository at this point in the history
….dart'
  • Loading branch information
PlugFox committed Jun 11, 2024
1 parent 3f8a09f commit e0cfde5
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ Future<void> _desktopInitialization() async {
center: true,
backgroundColor:
PlatformDispatcher.instance.platformBrightness == Brightness.dark
? ThemeData.dark().colorScheme.background
: ThemeData.light().colorScheme.background,
? ThemeData.dark().colorScheme.surface
: ThemeData.light().colorScheme.surface,
skipTaskbar: false,
titleBarStyle: TitleBarStyle.hidden,
/* alwaysOnTop: true, */
Expand Down
4 changes: 2 additions & 2 deletions lib/spinify.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ export 'src/model/jwt.dart';
export 'src/model/metric.dart';
export 'src/model/presence_stats.dart';
export 'src/model/reply.dart';
export 'src/model/spinify_interface.dart';
export 'src/model/state.dart';
export 'src/model/states_stream.dart';
export 'src/model/stream_position.dart';
export 'src/model/subscription.dart';
export 'src/model/subscription_config.dart';
export 'src/model/subscription_state.dart';
export 'src/model/subscription_states_stream.dart';
export 'src/model/transport_interface.dart';
export 'src/spinify_impl.dart' show Spinify;
export 'src/spinify_interface.dart';
export 'src/subscription_interface.dart';
export 'src/transport_fake.dart';
228 changes: 168 additions & 60 deletions lib/src/spinify_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@ import 'model/channel_events.dart';
import 'model/command.dart';
import 'model/config.dart';
import 'model/constant.dart';
import 'model/exception.dart';
import 'model/history.dart';
import 'model/metric.dart';
import 'model/presence_stats.dart';
import 'model/reply.dart';
import 'model/spinify_interface.dart';
import 'model/state.dart';
import 'model/states_stream.dart';
import 'model/stream_position.dart';
import 'model/subscription.dart';
import 'model/subscription_config.dart';
import 'model/transport_interface.dart';
import 'spinify_interface.dart';
import 'subscription_impl.dart';
import 'subscription_interface.dart';
import 'transport_ws_pb_stub.dart'
// ignore: uri_does_not_exist
if (dart.library.js_util) 'transport_ws_pb_js.dart'
Expand Down Expand Up @@ -298,9 +300,150 @@ base mixin SpinifyCommandMixin on SpinifyBase {
}
}

/// Base mixin for Spinify subscription management.
base mixin SpinifySubscriptionMixin on SpinifyBase, SpinifyCommandMixin {
final StreamController<SpinifyChannelEvent> _pushesController =
StreamController<SpinifyChannelEvent>.broadcast();

@override
late final ChannelEvents<SpinifyChannelEvent> stream =
ChannelEvents<SpinifyChannelEvent>(_pushesController.stream);

@override
({
Map<String, SpinifyClientSubscription> client,
Map<String, SpinifyServerSubscription> server
}) get subscriptions => throw UnimplementedError();

/// Registry of client subscriptions.
final Map<String, SpinifyClientSubscriptionImpl> _clientSubscriptionRegistry =
<String, SpinifyClientSubscriptionImpl>{};

/// Registry of server subscriptions.
final Map<String, SpinifyServerSubscription> _serverSubscriptionRegistry =
<String, SpinifyServerSubscription>{};

@override
SpinifyClientSubscription? getSubscription(String channel) =>
_clientSubscriptionRegistry[channel];

@override
SpinifyClientSubscription newSubscription(String channel,
[SpinifySubscriptionConfig? config]) {
final sub = _clientSubscriptionRegistry[channel] ??
_serverSubscriptionRegistry[channel];
if (sub != null) {
this.config.logger?.call(
const SpinifyLogLevel.warning(),
'subscription_exists_error',
'Subscription already exists',
<String, Object?>{
'channel': channel,
'subscription': sub,
},
);
throw SpinifySubscriptionException(
channel: channel,
message: 'Subscription already exists',
);
}
return _clientSubscriptionRegistry[channel] = SpinifyClientSubscriptionImpl(
client: this,
channel: channel,
config: config ?? const SpinifySubscriptionConfig.byDefault(),
);
}

@override
Future<void> removeSubscription(
SpinifyClientSubscription subscription) async {
final subFromRegistry =
_clientSubscriptionRegistry.remove(subscription.channel);
try {
await subFromRegistry?.unsubscribe();
assert(
subFromRegistry != null,
'Subscription not found in the registry',
);
assert(
identical(subFromRegistry, subscription),
'Subscription should be the same instance as in the registry',
);
} on Object catch (error, stackTrace) {
config.logger?.call(
const SpinifyLogLevel.warning(),
'subscription_remove_error',
'Error removing subscription',
<String, Object?>{
'channel': subscription.channel,
'subscription': subscription,
},
);
Error.throwWithStackTrace(
SpinifySubscriptionException(
channel: subscription.channel,
message: 'Error while unsubscribing',
error: error,
),
stackTrace,
);
} finally {
subFromRegistry?.close().ignore();
}
}

@override
Future<void> _onReply(SpinifyReply reply) async {
await super._onReply(reply);
if (reply is SpinifyPush) {
// Add push to the stream.
_pushesController.add(reply.event);
config.logger?.call(
const SpinifyLogLevel.debug(),
'push_received',
'Push received',
<String, Object?>{
'push': reply,
'event': reply.event,
},
);
} else if (reply is SpinifyConnectResult) {
// Update subscriptions state.
final entries = reply.subs?.entries;
// TODO(plugfox): implement subscription state update
/* for (final entry in entries) {
final MapEntry<String, SpinifySubscribeResult>(key: channel, value: sub) = entry;
final subState = reply.subs[channel];
if (subState != null) {
sub.state = subState;
config.logger?.call(
const SpinifyLogLevel.debug(),
'subscription_state_updated',
'Subscription state updated',
<String, Object?>{
'subscription': sub,
'state': subState,
},
);
}
} */
}
}

@override
Future<void> close() async {
await super.close();
await _pushesController.close();
}
}

/// Base mixin for Spinify client connection management (connect & disconnect).
base mixin SpinifyConnectionMixin
on SpinifyBase, SpinifyCommandMixin, SpinifyStateMixin {
on
SpinifyBase,
SpinifyCommandMixin,
SpinifyStateMixin,
SpinifySubscriptionMixin {
Timer? _reconnectTimer;
Completer<void>? _readyCompleter;

Expand Down Expand Up @@ -335,19 +478,36 @@ base mixin SpinifyConnectionMixin
final token = await config.getToken?.call();
assert(token == null || token.length > 5, 'Spinify JWT is too short');
final payload = await config.getPayload?.call();
final id = _getNextCommandId();
final now = DateTime.now();
request = SpinifyConnectRequest(
id: _getNextCommandId(),
timestamp: DateTime.now(),
id: id,
timestamp: now,
token: token,
data: payload,
// TODO(plugfox): Implement subscriptions.
subs: const <String, SpinifySubscribeRequest>{},
subs: <String, SpinifySubscribeRequest>{
for (final sub in _serverSubscriptionRegistry.values)
sub.channel: SpinifySubscribeRequest(
id: id,
timestamp: now,
channel: sub.channel,
recover: sub.state.recoverable,
epoch: sub.state.since?.epoch,
offset: sub.state.since?.offset,
token: null,
data: null,
positioned: null,
recoverable: null,
joinLeave: null,
),
},
name: config.client.name,
version: config.client.version,
);
}

final reply = await _sendCommand<SpinifyConnectResult>(request);

_setState(SpinifyState$Connected(
url: url,
client: reply.client,
Expand Down Expand Up @@ -690,58 +850,6 @@ base mixin SpinifyPingPongMixin
}
}

/// Base mixin for Spinify subscription management.
base mixin SpinifySubscriptionMixin on SpinifyBase {
final StreamController<SpinifyChannelEvent> _pushesController =
StreamController<SpinifyChannelEvent>.broadcast();

@override
late final ChannelEvents<SpinifyChannelEvent> stream =
ChannelEvents<SpinifyChannelEvent>(_pushesController.stream);

@override
({
Map<String, SpinifyClientSubscription> client,
Map<String, SpinifyServerSubscription> server
}) get subscriptions => throw UnimplementedError();

@override
SpinifyClientSubscription? getSubscription(String channel) =>
throw UnimplementedError();

@override
SpinifyClientSubscription newSubscription(String channel,
[SpinifySubscriptionConfig? config]) =>
throw UnimplementedError();

@override
Future<void> removeSubscription(SpinifyClientSubscription subscription) =>
throw UnimplementedError();

@override
Future<void> _onReply(SpinifyReply reply) async {
await super._onReply(reply);
if (!reply.isResult && reply is SpinifyPush) {
_pushesController.add(reply.event);
config.logger?.call(
const SpinifyLogLevel.debug(),
'push_received',
'Push received',
<String, Object?>{
'push': reply,
'event': reply.event,
},
);
}
}

@override
Future<void> close() async {
await super.close();
await _pushesController.close();
}
}

/// Base mixin for Spinify client publications management.
base mixin SpinifyPublicationsMixin on SpinifyBase {
@override
Expand Down Expand Up @@ -816,9 +924,9 @@ final class Spinify extends SpinifyBase
with
SpinifyStateMixin,
SpinifyCommandMixin,
SpinifySubscriptionMixin,
SpinifyConnectionMixin,
SpinifyPingPongMixin,
SpinifySubscriptionMixin,
SpinifyPublicationsMixin,
SpinifyPresenceMixin,
SpinifyHistoryMixin,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@

import 'dart:async';

import 'channel_event.dart';
import 'channel_events.dart';
import 'config.dart';
import 'history.dart';
import 'metric.dart';
import 'presence_stats.dart';
import 'state.dart';
import 'states_stream.dart';
import 'stream_position.dart';
import 'subscription.dart';
import 'subscription_config.dart';
import 'model/channel_event.dart';
import 'model/channel_events.dart';
import 'model/config.dart';
import 'model/history.dart';
import 'model/metric.dart';
import 'model/presence_stats.dart';
import 'model/state.dart';
import 'model/states_stream.dart';
import 'model/stream_position.dart';
import 'model/subscription_config.dart';
import 'subscription_interface.dart';

/// Spinify client interface.
abstract interface class ISpinify
Expand Down
Loading

0 comments on commit e0cfde5

Please sign in to comment.