Skip to content

Commit

Permalink
Refactor code to update imports and remove @internal annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed May 4, 2024
1 parent d9f89da commit e163a26
Show file tree
Hide file tree
Showing 10 changed files with 492 additions and 211 deletions.
6 changes: 4 additions & 2 deletions benchmark/event_bus_benchmark.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ void main() => Future<void>(() async {
await SpinifyEventBus$Benchmark().report();
});

enum _BenchmarkEvent { fake }

class SpinifyEventBus$Benchmark extends AsyncBenchmarkBase {
SpinifyEventBus$Benchmark() : super(r'SpinifyEventBus$Benchmark');

Expand All @@ -22,13 +24,13 @@ class SpinifyEventBus$Benchmark extends AsyncBenchmarkBase {
await super.setup();
_client = Spinify();
_bucket = SpinifyEventBus.instance.getBucket(_client);
_bucket.subscribe('benchmark_fake', (_) async => _received++);
_bucket.subscribe(_BenchmarkEvent.fake, (_) async => _received++);
}

@override
Future<void> run() async {
_pushed++;
await _bucket.pushEvent('benchmark_fake');
await _bucket.push(_BenchmarkEvent.fake);
}

@override
Expand Down
135 changes: 39 additions & 96 deletions lib/src/event_bus.dart
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,16 @@ abstract interface class ISpinifyEventBus$Bucket {
abstract final ISpinify? client;

/// Push an event to the client
Future<void> pushEvent(String event, [Object? data]);
Future<void> push(Enum event, [Object? data]);

/// Push an event to the client with priority
Future<void> pushPriority(Enum event, [Object? data]);

/// Subscribe to an event
void subscribe(String event, Future<void> Function(Object? data) callback);
void subscribe(Enum event, Future<void> Function(Object? data) callback);

/// Unsubscribe from an event
void unsubscribe(String event, Future<void> Function() callback);
void unsubscribe(Enum event, Future<void> Function() callback);

/// Dispose the bucket
/// Do not use it directly
Expand Down Expand Up @@ -110,39 +113,58 @@ final class SpinifyEventBus$Bucket$QueueImpl
<String, List<Future<void> Function(Object?)>>{};

/// The tasks queue, mutex of the events
final Queue<_SpinifyEventBus$Task> _queue = Queue<_SpinifyEventBus$Task>();
final Queue<_SpinifyEventBus$Task> _events = Queue<_SpinifyEventBus$Task>();

/// The priority queue, mutex of the events
final Queue<_SpinifyEventBus$Task> _priority = Queue<_SpinifyEventBus$Task>();

@override
Future<void> push(Enum event, [Object? data]) async {
final completer = Completer<void>.sync();
_events.add(_SpinifyEventBus$Task(completer, event.name, data));
log.fine('$_debugLabel pushing event $event');
if (!_processing) scheduleMicrotask(_processTasks);
return completer.future;
}

@override
Future<void> pushEvent(String event, [Object? data]) async {
Future<void> pushPriority(Enum event, [Object? data]) {
final completer = Completer<void>.sync();
_queue.add(_SpinifyEventBus$Task(completer, event, data));
log.fine('$_debugLabel Pushing event $event');
_priority.add(_SpinifyEventBus$Task(completer, event.name, data));
log.fine('$_debugLabel pushing priority event $event');
if (!_processing) scheduleMicrotask(_processTasks);
return completer.future;
}

@override
void subscribe(String event, Future<void> Function(Object? data) callback) {
void subscribe(Enum event, Future<void> Function(Object? data) callback) {
_subscribers
.putIfAbsent(event, () => <Future<void> Function(Object?)>[])
.putIfAbsent(event.name, () => <Future<void> Function(Object?)>[])
.add(callback);
}

@override
void unsubscribe(String event, Future<void> Function() callback) {
final subs = _subscribers[event];
void unsubscribe(Enum event, Future<void> Function() callback) {
final subs = _subscribers[event.name];
if (subs == null) return;
subs.remove(callback);
}

_SpinifyEventBus$Task? _getNext() {
if (_priority.isNotEmpty) return _priority.removeFirst();
if (_events.isNotEmpty) return _events.removeFirst();
return null;
}

bool _processing = false;
Future<void> _processTasks() async {
if (_processing) return;
_processing = true;
//dev.Timeline.instantSync('$_debugLabel _processEvents() start');
log.fine('$_debugLabel start processing events');
while (_queue.isNotEmpty) {
var task = _queue.removeFirst();
//log.fine('$_debugLabel start processing events');
while (true) {
var task = _getNext();
if (task == null) break;
final event = task.event;
try {
// Notify subscribers
Expand All @@ -159,94 +181,15 @@ final class SpinifyEventBus$Bucket$QueueImpl
}
}
_processing = false;
log.fine('$_debugLabel end processing events');
//log.fine('$_debugLabel end processing events');
//dev.Timeline.instantSync('$_debugLabel _processEvents() end');
}

@override
void dispose() {
_subscribers.clear();
final error = StateError('$_debugLabel client closed');
for (final task in _queue) task.completer.completeError(error);
_queue.clear();
}
}

/*
/// SpinifyEventBus$Bucket$StreamControllerImpl class
final class SpinifyEventBus$Bucket$StreamControllerImpl
implements ISpinifyEventBus$Bucket {
/// Create a new SpinifyEventBus$Bucket$StreamControllerImpl
SpinifyEventBus$Bucket$StreamControllerImpl(ISpinify client,
{String? debugLabel})
: _clientWR = WeakReference<ISpinify>(client),
_debugLabel = debugLabel ?? '[Spinify#${client.id}]' {
_subscription = _controller.stream.asyncMap(_processTask).listen((_) {});
}
final String _debugLabel;
/// The client weak reference
final WeakReference<ISpinify> _clientWR;
@override
ISpinify? get client => _clientWR.target;
/// The subscribers of the events
final Map<String, List<Future<void> Function(Object?)>> _subscribers =
<String, List<Future<void> Function(Object?)>>{};
/// The tasks queue, mutex of the events
final StreamController<_SpinifyEventBus$Task> _controller =
StreamController<_SpinifyEventBus$Task>(sync: true);
late final StreamSubscription<void> _subscription;
@override
Future<void> pushEvent(String event, [Object? data]) async {
final completer = Completer<void>.sync();
_controller.add(_SpinifyEventBus$Task(completer, event, data));
log.fine('$_debugLabel Pushing event $event');
return completer.future;
}
@override
void subscribe(String event, Future<void> Function(Object? data) callback) {
_subscribers
.putIfAbsent(event, () => <Future<void> Function(Object?)>[])
.add(callback);
}
@override
void unsubscribe(String event, Future<void> Function() callback) {
final subs = _subscribers[event];
if (subs == null) return;
subs.remove(callback);
}
Future<void> _processTask(_SpinifyEventBus$Task task) async {
final event = task.event;
try {
// Notify subscribers
final subs = _subscribers[event];
if (subs != null) for (final sub in subs) await sub(task.data);
task.completer.complete();
//dev.Timeline.instantSync('$_debugLabel $event');
log.fine('$_debugLabel $event');
} on Object catch (error, stackTrace) {
final reason = '$_debugLabel $event error';
//dev.Timeline.instantSync(reason);
log.warning(error, stackTrace, reason);
task.completer.completeError(error, stackTrace);
}
}
@override
void dispose() {
_subscribers.clear();
_subscription.cancel();
//final error = StateError('$_debugLabel client closed');
//for (final task in _queue) task.completer.completeError(error);
for (final task in _events) task.completer.completeError(error);
_events.clear();
}
}
*/
86 changes: 86 additions & 0 deletions lib/src/model/codes.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/// Server may send custom disconnect codes to a client.
/// Custom disconnect codes must be in range [3000, 4999].
///
/// Client automatically reconnects upon receiving code
/// in range 3000-3499, 4000-4499
/// (i.e. Client goes to connecting state).
/// Other codes result into going to disconnected state.
///
/// Client implementation can use codes < 3000 for client-side
/// specific disconnect reasons.
sealed class SpinifyDisconnectedCode {
/// Disconnect called.
static const int disconnectCalled = 0;

/// Connection closed by the server.
static const int unauthorized = 1;

/// Connection closed by the server.
static const int badProtocol = 2;

/// Connection closed by the server.
static const int messageSizeLimit = 3;
}

/// Close code for connecting.
sealed class SpinifyConnectingCode {
/// Connect called.
static const int connectCalled = 0;

/// Transport closed.
static const int transportClosed = 1;

/// No ping received.
static const int noPing = 2;

/// Subscribe timeout.
static const int subscribeTimeout = 3;

/// Unsubscribe timeout.
static const int unsubscribeError = 4;
}

/// Close code for subscribing.
sealed class SpinifySubscribingCode {
/// Subscribe called.
static const int subscribeCalled = 0;

/// Transport closed.
static const int transportClosed = 1;
}

/// Close code for unsubscribing.
///
/// Server may return unsubscribe codes.
/// Server unsubscribe codes must be in range [2000, 2999].
///
/// Unsubscribe codes >= 2500 coming from server to client result
/// into automatic resubscribe attempt
/// (i.e. client goes to subscribing state).
/// Codes < 2500 result into going to unsubscribed state.
///
/// Client implementation can use codes < 2000 for client-side
/// specific unsubscribe reasons.
sealed class SpinifyUnsubscribedCode {
/// Unsubscribe called.
static const int unsubscribeCalled = 0;

/// Unauthorized.
static const int unauthorized = 1;

/// Client closed.
static const int clientClosed = 2;
}

/// Server can return error codes in range 100-1999.
/// Error codes in interval 0-399 reserved by Centrifuge/Centrifugo server.
/// Codes in range [400, 1999] may be returned by application code built
/// on top of Centrifuge/Centrifugo.
///
/// Server errors contain a temporary boolean flag which works as a signal
/// that error may be fixed by a later retry.
///
/// Errors with codes 0-100 can be used by client-side implementation.
/// Client-side errors may not have code attached at all since in many
/// languages error can be distinguished by its type.
sealed class SpinifyErrorCode {}
63 changes: 63 additions & 0 deletions lib/src/model/command.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:typed_data';

import 'package:meta/meta.dart';

/// {@template command}
Expand Down Expand Up @@ -52,10 +54,30 @@ final class SpinifyConnectRequest extends SpinifyCommand {
const SpinifyConnectRequest({
required super.id,
required super.timestamp,
required this.token,
required this.data,
required this.subs,
required this.name,
required this.version,
});

@override
String get type => 'ConnectRequest';

/// Token to authenticate.
final String? token;

/// Data to send.
final List<int>? data;

/// Subscriptions to subscribe.
final Map<String, SpinifySubscribeRequest>? subs;

/// Name of client.
final String name;

/// Version of client.
final String version;
}

/// {@macro command}
Expand All @@ -64,10 +86,51 @@ final class SpinifySubscribeRequest extends SpinifyCommand {
const SpinifySubscribeRequest({
required super.id,
required super.timestamp,
required this.channel,
required this.token,
required this.recover,
required this.epoch,
required this.offset,
required this.data,
required this.positioned,
required this.recoverable,
required this.joinLeave,
});

@override
String get type => 'SubscribeRequest';

/// Channel to subscribe.
final String channel;

/// Subscription token and callback to get
/// subscription token upon expiration
final String? token;

/// Option to ask server to make subscription recoverable
final bool? recover;

/// Epoch to start subscription from
final String? epoch;

/// Offset to start subscription from
final int? offset;

/// Subscription data
/// (attached to every subscribe/resubscribe request)
final Uint8List? data;

/// Option to ask server to make subscription positioned
/// (if not forced by a server)
final bool? positioned;

/// Option to ask server to make subscription recoverable
/// (if not forced by a server)
final bool? recoverable;

/// Option to ask server to push Join/Leave messages
/// (if not forced by a server)
final bool? joinLeave;
}

/// {@macro command}
Expand Down
Loading

0 comments on commit e163a26

Please sign in to comment.