Skip to content

Commit

Permalink
Add basic event queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed May 3, 2024
1 parent 200b2f9 commit 6881c4a
Show file tree
Hide file tree
Showing 36 changed files with 3,537 additions and 18 deletions.
2 changes: 2 additions & 0 deletions lib/spinify.dart
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
library spinify;

export 'src/spinify_impl.dart' show Spinify;
Empty file removed lib/src/client/spinify.dart
Empty file.
157 changes: 157 additions & 0 deletions lib/src/event_bus.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import 'dart:async';
import 'dart:collection';
import 'dart:developer' as dev;

import 'package:meta/meta.dart';

import 'logger.dart' as log;
import 'spinify_interface.dart';

/// SpinifyBus Singleton class
/// That class is used to manage the event queue and work as a singleton
/// event bus to process, dispatch and manage all the events
/// in the Spinify clients.
@internal
@immutable
final class SpinifyEventBus {
SpinifyEventBus._internal();
static final SpinifyEventBus _internalSingleton = SpinifyEventBus._internal();

/// Get the instance of the SpinifyEventBus
static SpinifyEventBus get instance => _internalSingleton;

/// Error when client not found
static Never _clientNotFound(int clientId) =>
throw StateError('Client $clientId not found');

/// The buckets of the clients
final Expando<SpinifyEventBus$Bucket> _buckets =
Expando<SpinifyEventBus$Bucket>('SpinifyEventBus');

/// Register a new client to the SpinifyBus
SpinifyEventBus$Bucket registerClient(ISpinify client) =>
_buckets[client] = SpinifyEventBus$Bucket(client);

/// Unregister a client from the SpinifyBus
void unregisterClient(ISpinify client) {
_buckets[client]?.dispose();
_buckets[client] = null;
}

/// Get the bucket for the client
SpinifyEventBus$Bucket getBucket(ISpinify client) =>
_buckets[client] ?? _clientNotFound(client.id);

@override
int get hashCode => 0;

@override
bool operator ==(Object other) => identical(this, other);

@override
String toString() => 'SpinifyEventBus{}';
}

/// SpinifyEventBus$Event class
@immutable
final class _SpinifyEventBus$Task {
/// Create a new SpinifyEventBus$Event
const _SpinifyEventBus$Task(this.completer, this.event, this.data);

/// The completer
final Completer<void> completer;

/// The event name
final String event;

/// The event data
final Object? data;
}

/// SpinifyEventBus$Bucket class
final class SpinifyEventBus$Bucket {
/// Create a new SpinifyEventBus$Bucket
SpinifyEventBus$Bucket(ISpinify client, {String? debugLabel})
: _clientWR = WeakReference<ISpinify>(client),
_debugLabel = debugLabel ?? '[Spinify#${client.id}]';

final String _debugLabel;

/// The client weak reference
final WeakReference<ISpinify> _clientWR;

/// The current client instance
ISpinify? get client => _clientWR.target;

/// The tasks queue, mutex of the events
final Queue<_SpinifyEventBus$Task> _queue = Queue<_SpinifyEventBus$Task>();
final Map<String, List<Future<void> Function(Object?)>> _subscribers =
<String, List<Future<void> Function(Object?)>>{};

/// Push an event to the client
Future<void> pushEvent(String event, [Object? data]) async {
final completer = Completer<void>.sync();
_queue.add(_SpinifyEventBus$Task(completer, event, data));
log.fine('$_debugLabel Pushing event $event');
if (!_processing) scheduleMicrotask(_processEvents);
return completer.future;
}

/// Subscribe to an event
void subscribe(String event, Future<void> Function(Object? data) callback) {
_subscribers
.putIfAbsent(event, () => <Future<void> Function(Object?)>[])
.add(callback);
}

/// Unsubscribe from an event
void unsubscribe(String event, Future<void> Function() callback) {
final subs = _subscribers[event];
if (subs == null) return;
subs.remove(callback);
}

bool _processing = false;
Future<void> _processEvents() async {
if (_processing) return;
_processing = true;
dev.Timeline.instantSync('$_debugLabel _processEvents() start');
log.fine('$_debugLabel start processing events');
while (_queue.isNotEmpty) {
var task = _queue.removeFirst();
final event = task.event;
log.fine('$_debugLabel processing "$event"');
try {
await _notifySubscribers(event, task.data);
task.completer.complete(null);
//dev.Timeline.instantSync('$_debugLabel event "$event" processed');
//log.fine('$_debugLabel event "$event" processed');
} on Object catch (error, stackTrace) {
final reason = '$_debugLabel error processing event "$event"';
dev.Timeline.instantSync(reason);
log.warning(error, stackTrace, reason);
task.completer.completeError(error, stackTrace);
}
}
_processing = false;
log.fine('$_debugLabel end processing events');
dev.Timeline.instantSync('$_debugLabel _processEvents() end');
}

/// Notify the subscribers
Future<void> _notifySubscribers(String event, Object? data) async {
final subs = _subscribers[event];
if (subs == null) return;
for (final sub in subs) await sub(data);
}

/// Dispose the bucket
@protected
@visibleForTesting
void dispose() {
_subscribers.clear();
final error = StateError('$_debugLabel client closed');
for (final task in _queue) task.completer.completeError(error);
_queue.clear();
}
}
49 changes: 49 additions & 0 deletions lib/src/logger.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import 'dart:developer' as dev;

import 'package:meta/meta.dart';

/// Constants used to debug the Spinify client.
/// --dart-define=dev.plugfox.spinify.debug=true
const bool $enableLogging = bool.fromEnvironment(
'dev.plugfox.spinify.log',
defaultValue: false,
);

/// Tracing information
@internal
final void Function(Object? message) fine = _logAll('FINE', 500);

/// Static configuration messages
@internal
final void Function(Object? message) config = _logAll('CONF', 700);

/// Iformational messages
@internal
final void Function(Object? message) info = _logAll('INFO', 800);

/// Potential problems
@internal
final void Function(Object exception, [StackTrace? stackTrace, String? reason])
warning = _logAll('WARN', 900);

/// Serious failures
@internal
final void Function(Object error, [StackTrace stackTrace, String? reason])
severe = _logAll('ERR!', 1000);

void Function(
Object? message, [
StackTrace? stackTrace,
String? reason,
]) _logAll(String prefix, int level) => (message, [stackTrace, reason]) {
// coverage:ignore-start
if (!$enableLogging) return;
dev.log(
reason ?? message?.toString() ?? '',
level: level,
name: 'spinify',
error: message is Exception || message is Error ? message : null,
stackTrace: stackTrace,
);
// coverage:ignore-end
};
79 changes: 79 additions & 0 deletions lib/src/model/channel_presence.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import 'package:meta/meta.dart';

import 'channel_push.dart';
import 'client_info.dart';

/// {@template channel_presence}
/// Channel presence.
/// Join / Leave events.
/// {@endtemplate}
/// {@category Event}
/// {@subCategory Push}
@immutable
sealed class SpinifyChannelPresence extends SpinifyChannelPush {
/// {@macro channel_presence}
const SpinifyChannelPresence({
required super.timestamp,
required super.channel,
required this.info,
});

/// Client info
final SpinifyClientInfo info;

/// Whether this is a join event
abstract final bool isJoin;

/// Whether this is a leave event
abstract final bool isLeave;
}

/// {@macro channel_presence}
/// {@category Event}
/// {@subCategory Push}
/// {@subCategory Presence}
final class SpinifyJoin extends SpinifyChannelPresence {
/// {@macro channel_presence}
const SpinifyJoin({
required super.timestamp,
required super.channel,
required super.info,
});

@override
String get type => 'join';

@override
bool get isJoin => true;

@override
bool get isLeave => false;

@override
String toString() => 'SpinifyJoin{channel: $channel}';
}

/// {@macro channel_presence}
/// {@category Event}
/// {@subCategory Push}
/// {@subCategory Presence}
final class SpinifyLeave extends SpinifyChannelPresence {
/// {@macro channel_presence}
const SpinifyLeave({
required super.timestamp,
required super.channel,
required super.info,
});

@override
String get type => 'leave';

@override
bool get isJoin => false;

@override
bool get isLeave => true;

@override
String toString() => 'SpinifyLeave{channel: $channel}';
}
27 changes: 27 additions & 0 deletions lib/src/model/channel_push.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import 'package:meta/meta.dart';

import 'event.dart';

/// {@template spinify_channel_push}
/// Base class for all channel push events.
/// {@endtemplate}
/// {@category Event}
/// {@subCategory Push}
@immutable
abstract base class SpinifyChannelPush extends SpinifyEvent {
/// {@macro spinify_channel_push}
const SpinifyChannelPush({
required super.timestamp,
required this.channel,
});

/// Channel
final String channel;

@override
@nonVirtual
bool get isPush => true;

@override
String toString() => 'SpinifyChannelPush{channel: $channel}';
}
53 changes: 53 additions & 0 deletions lib/src/model/client_info.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import 'package:meta/meta.dart';

/// {@template client_info}
/// Client information.
/// {@endtemplate}
/// {@category Entity}
@immutable
final class SpinifyClientInfo {
/// {@macro client_info}
const SpinifyClientInfo({
required this.user,
required this.client,
required this.connectionInfo,
required this.channelInfo,
});

/// User
final String user;

/// Client
final String client;

/// Connection information
final List<int>? connectionInfo;

/// Channel information
final List<int>? channelInfo;

@override
int get hashCode => Object.hashAll([
user,
client,
connectionInfo,
channelInfo,
]);

@override
bool operator ==(Object other) =>
identical(this, other) ||
other is SpinifyClientInfo &&
user == other.client &&
client == other.client &&
connectionInfo == other.connectionInfo &&
channelInfo == other.channelInfo;

@override
String toString() => 'SpinifyClientInfo{'
'user: $user, '
'client: $client, '
'connectionInfo: ${connectionInfo == null ? 'null' : 'bytes'}, '
'channelInfo: ${channelInfo == null ? 'null' : 'bytes'}'
'}';
}
Loading

0 comments on commit 6881c4a

Please sign in to comment.