From 6881c4ad6d85a1e556fee4b1d878851153111fc3 Mon Sep 17 00:00:00 2001 From: Plague Fox Date: Sat, 4 May 2024 02:38:18 +0400 Subject: [PATCH] Add basic event queue implementation --- lib/spinify.dart | 2 + lib/src/client/spinify.dart | 0 lib/src/event_bus.dart | 157 ++++++ lib/src/logger.dart | 49 ++ lib/src/model/channel_presence.dart | 79 +++ lib/src/model/channel_push.dart | 27 + lib/src/model/client_info.dart | 53 ++ lib/src/model/config.dart | 117 +++++ lib/src/model/connect.dart | 59 +++ lib/src/model/disconnect.dart | 32 ++ lib/src/model/event.dart | 28 + lib/src/model/exception.dart | 137 +++++ lib/src/model/history.dart | 25 + lib/src/model/jwt.dart | 409 ++++++++++++++ lib/src/model/message.dart | 24 + lib/src/model/metrics.dart | 149 ++++++ lib/src/model/presence.dart | 24 + lib/src/model/presence_stats.dart | 27 + lib/src/model/publication.dart | 41 ++ lib/src/model/pubspec.yaml.g.dart | 497 ++++++++++++++++++ lib/src/model/pushes_stream.dart | 54 ++ lib/src/model/refresh.dart | 28 + lib/src/model/refresh_result.dart | 40 ++ lib/src/model/state.dart | 486 +++++++++++++++++ lib/src/model/states_stream.dart | 36 ++ lib/src/model/stream_position.dart | 5 + lib/src/model/subscribe.dart | 37 ++ lib/src/model/subscription_config.dart | 102 ++++ lib/src/model/subscription_state.dart | 317 +++++++++++ lib/src/model/subscription_states_stream.dart | 39 ++ lib/src/model/unsubscribe.dart | 28 + lib/src/spinify_impl.dart | 226 ++++++++ lib/src/{client => }/spinify_interface.dart | 36 +- lib/src/subscription_interface.dart | 157 ++++++ pubspec.yaml | 2 +- test/unit/spinify_test.dart | 26 +- 36 files changed, 3537 insertions(+), 18 deletions(-) delete mode 100644 lib/src/client/spinify.dart create mode 100644 lib/src/event_bus.dart create mode 100644 lib/src/logger.dart create mode 100644 lib/src/model/channel_presence.dart create mode 100644 lib/src/model/channel_push.dart create mode 100644 lib/src/model/client_info.dart create mode 100644 lib/src/model/config.dart create mode 100644 lib/src/model/connect.dart create mode 100644 lib/src/model/disconnect.dart create mode 100644 lib/src/model/event.dart create mode 100644 lib/src/model/exception.dart create mode 100644 lib/src/model/history.dart create mode 100644 lib/src/model/jwt.dart create mode 100644 lib/src/model/message.dart create mode 100644 lib/src/model/metrics.dart create mode 100644 lib/src/model/presence.dart create mode 100644 lib/src/model/presence_stats.dart create mode 100644 lib/src/model/publication.dart create mode 100644 lib/src/model/pubspec.yaml.g.dart create mode 100644 lib/src/model/pushes_stream.dart create mode 100644 lib/src/model/refresh.dart create mode 100644 lib/src/model/refresh_result.dart create mode 100644 lib/src/model/state.dart create mode 100644 lib/src/model/states_stream.dart create mode 100644 lib/src/model/stream_position.dart create mode 100644 lib/src/model/subscribe.dart create mode 100644 lib/src/model/subscription_config.dart create mode 100644 lib/src/model/subscription_state.dart create mode 100644 lib/src/model/subscription_states_stream.dart create mode 100644 lib/src/model/unsubscribe.dart create mode 100644 lib/src/spinify_impl.dart rename lib/src/{client => }/spinify_interface.dart (86%) create mode 100644 lib/src/subscription_interface.dart diff --git a/lib/spinify.dart b/lib/spinify.dart index 5e15f82..84e6c92 100644 --- a/lib/spinify.dart +++ b/lib/spinify.dart @@ -1 +1,3 @@ library spinify; + +export 'src/spinify_impl.dart' show Spinify; diff --git a/lib/src/client/spinify.dart b/lib/src/client/spinify.dart deleted file mode 100644 index e69de29..0000000 diff --git a/lib/src/event_bus.dart b/lib/src/event_bus.dart new file mode 100644 index 0000000..7a9e1d3 --- /dev/null +++ b/lib/src/event_bus.dart @@ -0,0 +1,157 @@ +import 'dart:async'; +import 'dart:collection'; +import 'dart:developer' as dev; + +import 'package:meta/meta.dart'; + +import 'logger.dart' as log; +import 'spinify_interface.dart'; + +/// SpinifyBus Singleton class +/// That class is used to manage the event queue and work as a singleton +/// event bus to process, dispatch and manage all the events +/// in the Spinify clients. +@internal +@immutable +final class SpinifyEventBus { + SpinifyEventBus._internal(); + static final SpinifyEventBus _internalSingleton = SpinifyEventBus._internal(); + + /// Get the instance of the SpinifyEventBus + static SpinifyEventBus get instance => _internalSingleton; + + /// Error when client not found + static Never _clientNotFound(int clientId) => + throw StateError('Client $clientId not found'); + + /// The buckets of the clients + final Expando _buckets = + Expando('SpinifyEventBus'); + + /// Register a new client to the SpinifyBus + SpinifyEventBus$Bucket registerClient(ISpinify client) => + _buckets[client] = SpinifyEventBus$Bucket(client); + + /// Unregister a client from the SpinifyBus + void unregisterClient(ISpinify client) { + _buckets[client]?.dispose(); + _buckets[client] = null; + } + + /// Get the bucket for the client + SpinifyEventBus$Bucket getBucket(ISpinify client) => + _buckets[client] ?? _clientNotFound(client.id); + + @override + int get hashCode => 0; + + @override + bool operator ==(Object other) => identical(this, other); + + @override + String toString() => 'SpinifyEventBus{}'; +} + +/// SpinifyEventBus$Event class +@immutable +final class _SpinifyEventBus$Task { + /// Create a new SpinifyEventBus$Event + const _SpinifyEventBus$Task(this.completer, this.event, this.data); + + /// The completer + final Completer completer; + + /// The event name + final String event; + + /// The event data + final Object? data; +} + +/// SpinifyEventBus$Bucket class +final class SpinifyEventBus$Bucket { + /// Create a new SpinifyEventBus$Bucket + SpinifyEventBus$Bucket(ISpinify client, {String? debugLabel}) + : _clientWR = WeakReference(client), + _debugLabel = debugLabel ?? '[Spinify#${client.id}]'; + + final String _debugLabel; + + /// The client weak reference + final WeakReference _clientWR; + + /// The current client instance + ISpinify? get client => _clientWR.target; + + /// The tasks queue, mutex of the events + final Queue<_SpinifyEventBus$Task> _queue = Queue<_SpinifyEventBus$Task>(); + final Map Function(Object?)>> _subscribers = + Function(Object?)>>{}; + + /// Push an event to the client + Future pushEvent(String event, [Object? data]) async { + final completer = Completer.sync(); + _queue.add(_SpinifyEventBus$Task(completer, event, data)); + log.fine('$_debugLabel Pushing event $event'); + if (!_processing) scheduleMicrotask(_processEvents); + return completer.future; + } + + /// Subscribe to an event + void subscribe(String event, Future Function(Object? data) callback) { + _subscribers + .putIfAbsent(event, () => Function(Object?)>[]) + .add(callback); + } + + /// Unsubscribe from an event + void unsubscribe(String event, Future Function() callback) { + final subs = _subscribers[event]; + if (subs == null) return; + subs.remove(callback); + } + + bool _processing = false; + Future _processEvents() async { + if (_processing) return; + _processing = true; + dev.Timeline.instantSync('$_debugLabel _processEvents() start'); + log.fine('$_debugLabel start processing events'); + while (_queue.isNotEmpty) { + var task = _queue.removeFirst(); + final event = task.event; + log.fine('$_debugLabel processing "$event"'); + try { + await _notifySubscribers(event, task.data); + task.completer.complete(null); + //dev.Timeline.instantSync('$_debugLabel event "$event" processed'); + //log.fine('$_debugLabel event "$event" processed'); + } on Object catch (error, stackTrace) { + final reason = '$_debugLabel error processing event "$event"'; + dev.Timeline.instantSync(reason); + log.warning(error, stackTrace, reason); + task.completer.completeError(error, stackTrace); + } + } + _processing = false; + log.fine('$_debugLabel end processing events'); + dev.Timeline.instantSync('$_debugLabel _processEvents() end'); + } + + /// Notify the subscribers + Future _notifySubscribers(String event, Object? data) async { + final subs = _subscribers[event]; + if (subs == null) return; + for (final sub in subs) await sub(data); + } + + /// Dispose the bucket + @protected + @visibleForTesting + void dispose() { + _subscribers.clear(); + final error = StateError('$_debugLabel client closed'); + for (final task in _queue) task.completer.completeError(error); + _queue.clear(); + } +} diff --git a/lib/src/logger.dart b/lib/src/logger.dart new file mode 100644 index 0000000..d497062 --- /dev/null +++ b/lib/src/logger.dart @@ -0,0 +1,49 @@ +import 'dart:developer' as dev; + +import 'package:meta/meta.dart'; + +/// Constants used to debug the Spinify client. +/// --dart-define=dev.plugfox.spinify.debug=true +const bool $enableLogging = bool.fromEnvironment( + 'dev.plugfox.spinify.log', + defaultValue: false, +); + +/// Tracing information +@internal +final void Function(Object? message) fine = _logAll('FINE', 500); + +/// Static configuration messages +@internal +final void Function(Object? message) config = _logAll('CONF', 700); + +/// Iformational messages +@internal +final void Function(Object? message) info = _logAll('INFO', 800); + +/// Potential problems +@internal +final void Function(Object exception, [StackTrace? stackTrace, String? reason]) + warning = _logAll('WARN', 900); + +/// Serious failures +@internal +final void Function(Object error, [StackTrace stackTrace, String? reason]) + severe = _logAll('ERR!', 1000); + +void Function( + Object? message, [ + StackTrace? stackTrace, + String? reason, +]) _logAll(String prefix, int level) => (message, [stackTrace, reason]) { + // coverage:ignore-start + if (!$enableLogging) return; + dev.log( + reason ?? message?.toString() ?? '', + level: level, + name: 'spinify', + error: message is Exception || message is Error ? message : null, + stackTrace: stackTrace, + ); + // coverage:ignore-end + }; diff --git a/lib/src/model/channel_presence.dart b/lib/src/model/channel_presence.dart new file mode 100644 index 0000000..8637a3d --- /dev/null +++ b/lib/src/model/channel_presence.dart @@ -0,0 +1,79 @@ +import 'package:meta/meta.dart'; + +import 'channel_push.dart'; +import 'client_info.dart'; + +/// {@template channel_presence} +/// Channel presence. +/// Join / Leave events. +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +@immutable +sealed class SpinifyChannelPresence extends SpinifyChannelPush { + /// {@macro channel_presence} + const SpinifyChannelPresence({ + required super.timestamp, + required super.channel, + required this.info, + }); + + /// Client info + final SpinifyClientInfo info; + + /// Whether this is a join event + abstract final bool isJoin; + + /// Whether this is a leave event + abstract final bool isLeave; +} + +/// {@macro channel_presence} +/// {@category Event} +/// {@subCategory Push} +/// {@subCategory Presence} +final class SpinifyJoin extends SpinifyChannelPresence { + /// {@macro channel_presence} + const SpinifyJoin({ + required super.timestamp, + required super.channel, + required super.info, + }); + + @override + String get type => 'join'; + + @override + bool get isJoin => true; + + @override + bool get isLeave => false; + + @override + String toString() => 'SpinifyJoin{channel: $channel}'; +} + +/// {@macro channel_presence} +/// {@category Event} +/// {@subCategory Push} +/// {@subCategory Presence} +final class SpinifyLeave extends SpinifyChannelPresence { + /// {@macro channel_presence} + const SpinifyLeave({ + required super.timestamp, + required super.channel, + required super.info, + }); + + @override + String get type => 'leave'; + + @override + bool get isJoin => false; + + @override + bool get isLeave => true; + + @override + String toString() => 'SpinifyLeave{channel: $channel}'; +} diff --git a/lib/src/model/channel_push.dart b/lib/src/model/channel_push.dart new file mode 100644 index 0000000..48c579f --- /dev/null +++ b/lib/src/model/channel_push.dart @@ -0,0 +1,27 @@ +import 'package:meta/meta.dart'; + +import 'event.dart'; + +/// {@template spinify_channel_push} +/// Base class for all channel push events. +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +@immutable +abstract base class SpinifyChannelPush extends SpinifyEvent { + /// {@macro spinify_channel_push} + const SpinifyChannelPush({ + required super.timestamp, + required this.channel, + }); + + /// Channel + final String channel; + + @override + @nonVirtual + bool get isPush => true; + + @override + String toString() => 'SpinifyChannelPush{channel: $channel}'; +} diff --git a/lib/src/model/client_info.dart b/lib/src/model/client_info.dart new file mode 100644 index 0000000..d2e683c --- /dev/null +++ b/lib/src/model/client_info.dart @@ -0,0 +1,53 @@ +import 'package:meta/meta.dart'; + +/// {@template client_info} +/// Client information. +/// {@endtemplate} +/// {@category Entity} +@immutable +final class SpinifyClientInfo { + /// {@macro client_info} + const SpinifyClientInfo({ + required this.user, + required this.client, + required this.connectionInfo, + required this.channelInfo, + }); + + /// User + final String user; + + /// Client + final String client; + + /// Connection information + final List? connectionInfo; + + /// Channel information + final List? channelInfo; + + @override + int get hashCode => Object.hashAll([ + user, + client, + connectionInfo, + channelInfo, + ]); + + @override + bool operator ==(Object other) => + identical(this, other) || + other is SpinifyClientInfo && + user == other.client && + client == other.client && + connectionInfo == other.connectionInfo && + channelInfo == other.channelInfo; + + @override + String toString() => 'SpinifyClientInfo{' + 'user: $user, ' + 'client: $client, ' + 'connectionInfo: ${connectionInfo == null ? 'null' : 'bytes'}, ' + 'channelInfo: ${channelInfo == null ? 'null' : 'bytes'}' + '}'; +} diff --git a/lib/src/model/config.dart b/lib/src/model/config.dart new file mode 100644 index 0000000..2182932 --- /dev/null +++ b/lib/src/model/config.dart @@ -0,0 +1,117 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +import 'pubspec.yaml.g.dart'; + +/// Token used for authentication +/// +/// {@category Client} +/// {@category Entity} +typedef SpinifyToken = String; + +/// Callback to get/refresh tokens +/// This callback is used for initial connection +/// and for refreshing expired tokens. +/// +/// If method returns null then connection will be established without token. +/// +/// {@category Client} +/// {@category Entity} +typedef SpinifyTokenCallback = FutureOr Function(); + +/// Callback to get initial connection payload data. +/// +/// If method returns null then no payload will be sent at connect time. +/// +/// {@category Client} +/// {@category Entity} +typedef SpinifyConnectionPayloadCallback = FutureOr?> Function(); + +/// {@template spinify_config} +/// Spinify client common options. +/// +/// There are several common options available when creating Client instance. +/// +/// - [connectionRetryInterval] - tweaks for reconnect backoff +/// - [client] - the user's client name and version +/// - [headers] - headers that are set when connecting the web socket +/// - [timeout] - maximum time to wait for the connection to be established +/// {@endtemplate} +/// {@category Client} +/// {@category Entity} +@immutable +final class SpinifyConfig { + /// {@macro spinify_config} + SpinifyConfig({ + this.getToken, + this.getPayload, + this.connectionRetryInterval = ( + min: const Duration(milliseconds: 500), + max: const Duration(seconds: 20), + ), + ({String name, String version})? client, + this.timeout = const Duration(seconds: 15), + this.serverPingDelay = const Duration(seconds: 8), + Map? headers, + }) : headers = Map.unmodifiable( + headers ?? const {}), + client = client ?? + ( + name: Pubspec.name, + version: Pubspec.version.canonical, + ); + + /// Create a default config + /// + /// {@macro spinify_config} + factory SpinifyConfig.byDefault() = SpinifyConfig; + + /// Callback to get/refresh tokens + /// This callback is used for initial connection + /// and for refreshing expired tokens. + /// + /// If method returns null then connection will be established without token. + final SpinifyTokenCallback? getToken; + + /// Callback to get connection payload data. + /// The resulted data send with every connect request. + /// + /// If method returns null then no payload will be sent at connect time. + final SpinifyConnectionPayloadCallback? getPayload; + + /// The additional delay between expected server heartbeat pings. + /// + /// Centrifugo server periodically sends pings to clients and expects pong + /// from clients that works over bidirectional transports. + /// Sending ping and receiving pong allows to find broken connections faster. + /// Centrifugo sends pings on the Centrifugo client protocol level, + /// thus it's possible for clients to handle ping messages + /// on the client side to make sure connection is not broken. + /// + /// Centrifugo expects pong message + /// from bidirectional client SDK after sending ping to it. + /// By default, it waits no more than 8 seconds before closing a connection. + final Duration serverPingDelay; + + /// The [connectionRetryInterval] argument is specifying the + /// [backoff full jitter strategy](https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/) for reconnecting. + /// Tweaks for reconnect backoff algorithm (min delay, max delay) + /// If not specified, the reconnecting will be disabled. + final ({Duration min, Duration max}) connectionRetryInterval; + + /// The user's client name and version. + final ({String name, String version}) client; + + /// Headers that are set when connecting the web socket on dart:io platforms. + /// + /// Note that headers are ignored on the web platform. + final Map headers; + + /// Maximum time to wait for the connection to be established. + /// If not specified, the timeout will be 15 seconds. + final Duration timeout; + + @override + String toString() => 'SpinifyConfig{}'; +} diff --git a/lib/src/model/connect.dart b/lib/src/model/connect.dart new file mode 100644 index 0000000..7af4476 --- /dev/null +++ b/lib/src/model/connect.dart @@ -0,0 +1,59 @@ +import 'channel_push.dart'; + +/// {@template connect} +/// Connect push from Centrifugo server. +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +final class SpinifyConnect extends SpinifyChannelPush { + /// {@macro connect} + const SpinifyConnect({ + required super.timestamp, + required super.channel, + required this.client, + required this.version, + required this.data, + this.expires, + this.ttl, + this.pingInterval, + this.sendPong, + this.session, + this.node, + }); + + @override + String get type => 'connect'; + + /// Unique client connection ID server issued to this connection + final String client; + + /// Server version + final String version; + + /// Whether a server will expire connection at some point + final bool? expires; + + /// Time when connection will be expired + final DateTime? ttl; + + /// Client must periodically (once in 25 secs, configurable) send + /// ping messages to server. If pong has not beed received in 5 secs + /// (configurable) then client must disconnect from server + /// and try to reconnect with backoff strategy. + final Duration? pingInterval; + + /// Whether to send asynchronous message when pong received. + final bool? sendPong; + + /// Session ID. + final String? session; + + /// Server node ID. + final String? node; + + /// Payload of connected push. + final List data; + + @override + String toString() => 'SpinifyConnect{channel: $channel}'; +} diff --git a/lib/src/model/disconnect.dart b/lib/src/model/disconnect.dart new file mode 100644 index 0000000..d29f24b --- /dev/null +++ b/lib/src/model/disconnect.dart @@ -0,0 +1,32 @@ +import 'channel_push.dart'; + +/// {@template disconnect} +/// Disconnect push from Centrifugo server. +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +final class SpinifyDisconnect extends SpinifyChannelPush { + /// {@macro disconnect} + const SpinifyDisconnect({ + required super.timestamp, + required super.channel, + required this.code, + required this.reason, + required this.reconnect, + }); + + @override + String get type => 'disconnect'; + + /// Code of disconnect. + final int code; + + /// Reason of disconnect. + final String reason; + + /// Reconnect flag. + final bool reconnect; + + @override + String toString() => 'SpinifyDisconnect{channel: $channel}'; +} diff --git a/lib/src/model/event.dart b/lib/src/model/event.dart new file mode 100644 index 0000000..837e025 --- /dev/null +++ b/lib/src/model/event.dart @@ -0,0 +1,28 @@ +import 'package:meta/meta.dart'; + +/// {@template spinify_event} +/// Base class for all channel events. +/// {@endtemplate} +/// {@category Event} +@immutable +abstract base class SpinifyEvent implements Comparable { + /// {@macro spinify_event} + const SpinifyEvent({ + required this.timestamp, + }); + + /// Event type. + abstract final String type; + + /// Timestamp of event + final DateTime timestamp; + + /// Whether this event is a push event. + bool get isPush; + + @override + int compareTo(SpinifyEvent other) => timestamp.compareTo(other.timestamp); + + @override + String toString() => 'SpinifyEvent{type: $type}'; +} diff --git a/lib/src/model/exception.dart b/lib/src/model/exception.dart new file mode 100644 index 0000000..faba366 --- /dev/null +++ b/lib/src/model/exception.dart @@ -0,0 +1,137 @@ +import 'package:meta/meta.dart'; + +/// {@template exception} +/// Spinify exception. +/// {@endtemplate} +/// {@category Exception} +@immutable +sealed class SpinifyException implements Exception { + /// {@macro exception} + const SpinifyException( + this.code, + this.message, [ + this.error, + ]); + + /// Error code. + final String code; + + /// Error message. + final String message; + + /// Source error of exception if exists. + final Object? error; + + @override + int get hashCode => code.hashCode; + + @override + bool operator ==(Object other) => identical(this, other); + + @override + String toString() => message; +} + +/// {@macro exception} +/// {@category Exception} +final class SpinifyConnectionException extends SpinifyException { + /// {@macro exception} + const SpinifyConnectionException({String? message, Object? error}) + : super( + 'spinify_connection_exception', + message ?? 'Connection problem', + error, + ); +} + +/// {@macro exception} +/// {@category Exception} +final class SpinifyReplyException extends SpinifyException { + /// {@macro exception} + const SpinifyReplyException({ + required this.replyCode, + required String replyMessage, + required this.temporary, + }) : super( + 'spinify_reply_exception', + replyMessage, + ); + + /// Reply code. + final int replyCode; + + /// Is reply error final. + final bool temporary; +} + +/// {@macro exception} +/// {@category Exception} +final class SpinifyPingException extends SpinifyException { + /// {@macro exception} + const SpinifyPingException([Object? error]) + : super( + 'spinify_ping_exception', + 'Ping error', + error, + ); +} + +/// {@macro exception} +/// {@category Exception} +final class SpinifySubscriptionException extends SpinifyException { + /// {@macro exception} + const SpinifySubscriptionException({ + required this.channel, + required String message, + Object? error, + }) : super( + 'spinify_subscription_exception', + message, + error, + ); + + /// Subscription channel. + final String channel; +} + +/// {@macro exception} +/// {@category Exception} +final class SpinifySendException extends SpinifyException { + /// {@macro exception} + const SpinifySendException({ + String? message, + Object? error, + }) : super( + 'spinify_send_exception', + message ?? 'Failed to send message', + error, + ); +} + +/// {@macro exception} +/// {@category Exception} +final class SpinifyFetchException extends SpinifyException { + /// {@macro exception} + const SpinifyFetchException({ + String? message, + Object? error, + }) : super( + 'spinify_fetch_exception', + message ?? 'Failed to fetch data', + error, + ); +} + +/// {@macro exception} +/// {@category Exception} +final class SpinifyRefreshException extends SpinifyException { + /// {@macro exception} + const SpinifyRefreshException({ + String? message, + Object? error, + }) : super( + 'spinify_refresh_exception', + message ?? 'Error while refreshing connection token', + error, + ); +} diff --git a/lib/src/model/history.dart b/lib/src/model/history.dart new file mode 100644 index 0000000..256e531 --- /dev/null +++ b/lib/src/model/history.dart @@ -0,0 +1,25 @@ +import 'package:meta/meta.dart'; +import 'publication.dart'; +import 'stream_position.dart'; + +/// {@template history} +/// History +/// {@endtemplate} +/// {@category Entity} +@immutable +final class SpinifyHistory { + /// {@macro history} + const SpinifyHistory({ + required this.publications, + required this.since, + }); + + /// Publications + final List publications; + + /// Offset and epoch of last publication in publications list + final SpinifyStreamPosition since; + + @override + String toString() => 'SpinifyHistory{}'; +} diff --git a/lib/src/model/jwt.dart b/lib/src/model/jwt.dart new file mode 100644 index 0000000..0787190 --- /dev/null +++ b/lib/src/model/jwt.dart @@ -0,0 +1,409 @@ +import 'dart:convert'; + +import 'package:crypto/crypto.dart'; +import 'package:meta/meta.dart'; + +/// {@template jwt} +/// A JWT token consists of three parts: the header, +/// the payload, and the signature or encryption data. +/// The first two elements are JSON objects of a specific structure. +/// The third element is calculated based on the first two +/// and depends on the chosen algorithm +/// (in the case of using an unsigned JWT, it can be omitted). +/// Tokens can be re-encoded into a compact representation +/// (JWS/JWE Compact Serialization): +/// the header and payload are subjected to Base64-URL encoding, +/// after which the signature is added, +/// and all three elements are separated by periods ("."). +/// +/// https://centrifugal.dev/docs/server/authentication#connection-jwt-claims +/// {@endtemplate} +/// {@category Entity} +/// {@subCategory JWT} +@immutable +sealed class SpinifyJWT { + /// {@macro jwt} + /// + /// Creates JWT from [secret] (with HMAC-SHA256 algorithm) + const factory SpinifyJWT({ + required String sub, + int? exp, + int? iat, + String? jti, + String? aud, + String? iss, + Map? info, + String? b64info, + List? channels, + Map? subs, + Map? meta, + int? expireAt, + }) = _SpinifyJWTImpl; + + /// {@macro jwt} + /// + /// Parses JWT, if [secret] is provided + /// then checks signature by HMAC-SHA256 algorithm. + factory SpinifyJWT.decode(String jwt, [String? secret]) = + _SpinifyJWTImpl.decode; + + const SpinifyJWT._(); + + /// This is a standard JWT claim which must contain + /// an ID of the current application user (as string). + /// + /// If a user is not currently authenticated in an application, + /// but you want to let him connect anyway – you can use + /// an empty string as a user ID in sub claim. + /// This is called anonymous access. + abstract final String sub; + + /// This is a UNIX timestamp seconds when the token will expire. + /// This is a standard JWT claim - all JWT libraries + /// for different languages provide an API to set it. + /// + /// If exp claim is not provided then Centrifugo won't expire connection. + /// When provided special algorithm will find connections with exp in the past + /// and activate the connection refresh mechanism. + /// Refresh mechanism allows connection to survive and be prolonged. + /// In case of refresh failure, the client connection + /// will be eventually closed by Centrifugo + /// and won't be accepted until new valid and actual + /// credentials are provided in the connection token. + /// + /// You can use the connection expiration mechanism in + /// cases when you don't want users of your app + /// to be subscribed on channels after being banned/deactivated in the application. + /// Or to protect your users from token leakage + /// (providing a reasonably short time of expiration). + /// + /// Choose exp value wisely, you don't need small + /// values because the refresh mechanism + /// will hit your application often with refresh requests. + /// But setting this value too large can lead + /// to slow user connection deactivation. This is a trade-off. + /// + /// Read more about connection expiration below. + abstract final int? exp; + + /// This is a UNIX time when token was issued (seconds). + /// See [definition in RFC](https://datatracker.ietf.org/doc/html/rfc7519#section-4.1.6). + /// This claim is optional + abstract final int? iat; + + /// This is a token unique ID. See [definition in RFC](https://datatracker.ietf.org/doc/html/rfc7519#section-4.1.7). + /// This claim is optional. + abstract final String? jti; + + /// Audience. + /// [rfc7519 aud claim](https://datatracker.ietf.org/doc/html/rfc7519#section-4.1.3) + /// By default, Centrifugo does not check JWT audience. + /// + /// But you can force this check by setting token_audience string option: + /// ```json + /// { + /// "token_audience": "centrifugo" + /// } + /// ``` + /// + /// Setting token_audience will also affect subscription tokens + /// (used for channel token authorization). + /// If you need to separate connection token configuration + /// and subscription token configuration + /// check out separate subscription token config feature. + /// + /// This claim is optional. + abstract final String? aud; + + /// Issuer. + /// The "iss" (issuer) claim identifies the principal that issued the JWT. + /// [rfc7519 iss claim](https://datatracker.ietf.org/doc/html/rfc7519#section-4.1.1) + /// By default, Centrifugo does not check JWT issuer (rfc7519 iss claim). + /// But you can force this check by setting token_issuer string option: + /// ```json + /// { + /// "token_issuer": "my_app" + /// } + /// ``` + /// + /// Setting token_issuer will also affect subscription tokens + /// (used for channel token authorization). + /// If you need to separate connection token configuration + /// and subscription token configuration + /// check out separate subscription token config feature. + /// + /// This claim is optional. + abstract final String? iss; + + /// This claim is optional - this is additional information + /// about client connection + /// that can be provided for Centrifugo. + /// This information will be included in presence information, + /// join/leave events, and channel publication if it was published from a client-side. + abstract final Map? info; + + /// If you are using binary Protobuf protocol you may want info + /// to be custom bytes. Use this field in this case. + /// + /// This field contains a `base64` representation of your bytes. + /// After receiving Centrifugo will decode base64 back to bytes + /// and will embed the result into various places described above. + abstract final String? b64info; + + /// An optional array of strings with server-side channels + /// to subscribe a client to. + /// See more details about [server-side subscriptions](https://centrifugal.dev/docs/server/server_subs). + abstract final List? channels; + + /// Subscriptions + /// An optional map of channels with options. This is like a channels claim + /// but allows more control over server-side subscription since every channel + /// can be annotated with info, data, and so on using options. + /// The claim sub described above is a standart JWT claim to provide a user ID + /// (it's a shortcut from subject). + /// While claims have similar names they have + /// different purpose in a connection JWT. + /// + /// Example: + /// ```json + /// { + /// ... + /// "subs": { + /// "channel1": { + /// "data": {"welcome": "welcome to channel1"} + /// }, + /// "channel2": { + /// "data": {"welcome": "welcome to channel2"} + /// } + /// } + /// } + /// ``` + /// + /// Subscribe options: + /// - (optional) info - JSON object - Custom channel info + /// - (optional) b64info - string - Custom channel info in Base64 + /// - (optional) data - JSON object - Custom JSON data + /// - (optional) b64data - string - Same as `data` but in Base64 + /// - (optional) override - Override object - Override some channel options. + /// + /// Override object: + /// - (optional) presence - BoolValue - Override presence + /// - (optional) join_leave - BoolValue - Override join_leave + /// - (optional) position - BoolValue - Override position + /// - (optional) recover - BoolValue - Override recover + /// + /// BoolValue is an object like this: + /// ```json + /// { + /// "value": true + /// } + /// ``` + abstract final Map? subs; + + /// Meta is an additional JSON object (ex. `{"key": "value"}`) + /// that will be attached to a connection. + /// Unlike `info` it's never exposed to clients inside presence + /// and join/leave payloads + /// and only accessible on a backend side. It may be included + /// in proxy calls from Centrifugo + /// to the application backend (see `proxy_include_connection_meta` option). + /// Also, there is a `connections` API method in Centrifugo PRO that returns + /// this data in the connection description object. + abstract final Map? meta; + + /// By default, Centrifugo looks on `exp` claim + /// to configure connection expiration. + /// In most cases this is fine, but there could be situations + /// where you wish to decouple token expiration + /// check with connection expiration time. + /// As soon as the `expire_at` claim is provided (set) + /// in JWT Centrifugo relies on it for setting + /// connection expiration time + /// (JWT expiration still checked over `exp` though). + /// + /// `expire_at` is a UNIX timestamp seconds when the connection should expire. + /// + /// Set it to the future time for expiring connection at some point + /// Set it to 0 to disable connection expiration + /// (but still check token exp claim). + abstract final int? expireAt; + + /// Creates JWT from [secret] (with HMAC-SHA256 algorithm) + /// and current payload. + String encode(String secret); +} + +final class _SpinifyJWTImpl extends SpinifyJWT { + const _SpinifyJWTImpl({ + required this.sub, + this.exp, + this.iat, + this.jti, + this.aud, + this.iss, + this.info, + this.b64info, + this.channels, + this.subs, + this.meta, + this.expireAt, + }) : super._(); + + factory _SpinifyJWTImpl.decode(String jwt, [String? secret]) { + // Разделение токена на составляющие части + var parts = jwt.split('.'); + if (parts.length != 3) { + throw const FormatException( + 'Invalid token format, expected 3 parts separated by "."'); + } + final [encodedHeader, encodedPayload, encodedSignature] = parts; + + if (secret != null) { + // Вычисление подписи + final key = utf8.encode(secret); // Your 256 bit secret key + final bytes = utf8.encode('$encodedHeader.$encodedPayload'); + var hmacSha256 = Hmac(sha256, key); // HMAC-SHA256 + var digest = hmacSha256.convert(bytes); + + // Кодирование подписи + var computedSignature = base64Url.encode(digest.bytes); + + // Сравнение подписи в токене с вычисленной подписью + if (computedSignature != encodedSignature) { + throw const FormatException('Invalid token signature'); + } + } + + Map payload; + try { + payload = const Base64Decoder() + .fuse(const Utf8Decoder()) + .fuse>( + const JsonDecoder().cast>()) + .convert(encodedPayload); + } on Object catch (_, stackTrace) { + Error.throwWithStackTrace( + const FormatException('Can\'t decode token payload'), stackTrace); + } + try { + return _SpinifyJWTImpl( + sub: payload['sub'] as String, + exp: payload['exp'] as int?, + iat: payload['iat'] as int?, + jti: payload['jti'] as String?, + aud: payload['aud'] as String?, + iss: payload['iss'] as String?, + info: payload['info'] as Map?, + b64info: payload['b64info'] as String?, + channels: (payload['channels'] as Iterable?) + ?.whereType() + .toList(), + subs: payload['subs'] as Map?, + meta: payload['meta'] as Map?, + expireAt: payload['expire_at'] as int?, + ); + } on Object catch (_, stackTrace) { + Error.throwWithStackTrace( + const FormatException('Invalid token payload data'), stackTrace); + } + } + + static final Converter, String> _$encoder = + const JsonEncoder() + .cast, String>() + .fuse>(const Utf8Encoder()) + .fuse(const Base64Encoder.urlSafe()) + .fuse(const _UnpaddedBase64Converter()); + + static final String _$headerHmacSha256 = _$encoder.convert({ + 'alg': 'HS256', + 'typ': 'JWT', + }); + + @override + final String sub; + + @override + final int? exp; + + @override + final int? iat; + + @override + final String? jti; + + @override + final String? aud; + + @override + final String? iss; + + @override + final Map? info; + + @override + final String? b64info; + + @override + final List? channels; + + @override + final Map? subs; + + @override + final Map? meta; + + @override + final int? expireAt; + + @override + String encode(String secret) { + // Encode header and payload + final encodedHeader = _$headerHmacSha256; + final encodedPayload = _$encoder.convert({ + 'sub': sub, + if (exp != null) 'exp': exp, + if (iat != null) 'iat': iat, + if (jti != null) 'jti': jti, + if (aud != null) 'aud': aud, + if (iss != null) 'iss': iss, + if (info != null) 'info': info, + if (b64info != null) 'b64info': b64info, + if (channels != null) 'channels': channels, + if (subs != null) 'subs': subs, + if (meta != null) 'meta': meta, + if (expireAt != null) 'expire_at': expireAt, + }); + + // Payload signature + final key = utf8.encode(secret); // Your 256 bit secret key + final bytes = utf8.encode('$encodedHeader.$encodedPayload'); + + final hmacSha256 = Hmac(sha256, key); // HMAC-SHA256 + final digest = hmacSha256.convert(bytes); + + // Encode signature + final encodedSignature = const Base64Encoder.urlSafe() + .fuse(const _UnpaddedBase64Converter()) + .convert(digest.bytes); + + // Return JWT + return '$encodedHeader.$encodedPayload.$encodedSignature'; + } + + @override + String toString() => 'SpinifyJWT{sub: $sub}'; +} + +/// A converter that converts Base64-encoded strings +/// to unpadded Base64-encoded strings. +class _UnpaddedBase64Converter extends Converter { + const _UnpaddedBase64Converter(); + + @override + String convert(String input) { + final padding = input.indexOf('=', input.length - 2); + if (padding != -1) return input.substring(0, padding); + return input; + } +} diff --git a/lib/src/model/message.dart b/lib/src/model/message.dart new file mode 100644 index 0000000..7066365 --- /dev/null +++ b/lib/src/model/message.dart @@ -0,0 +1,24 @@ +import 'channel_push.dart'; + +/// {@template message} +/// Message push from Centrifugo server. +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +final class SpinifyMessage extends SpinifyChannelPush { + /// {@macro message} + const SpinifyMessage({ + required super.timestamp, + required super.channel, + required this.data, + }); + + @override + String get type => 'message'; + + /// Payload of message. + final List data; + + @override + String toString() => 'SpinifyMessage{channel: $channel}'; +} diff --git a/lib/src/model/metrics.dart b/lib/src/model/metrics.dart new file mode 100644 index 0000000..e7f5863 --- /dev/null +++ b/lib/src/model/metrics.dart @@ -0,0 +1,149 @@ +import 'package:meta/meta.dart'; + +import 'state.dart'; + +/// Subscription count +/// - total +/// - unsubscribed +/// - subscribing +/// - subscribed +/// +/// {@category Metrics} +/// {@category Entity} +typedef SpinifySubscriptionCount = ({ + int total, + int unsubscribed, + int subscribing, + int subscribed +}); + +/// {@template metrics} +/// Metrics of Spinify client. +/// {@endtemplate} +/// +/// {@category Metrics} +/// {@category Entity} +@immutable +final class SpinifyMetrics implements Comparable { + /// {@macro metrics} + const SpinifyMetrics({ + required this.timestamp, + required this.initializedAt, + required this.state, + required this.transferred, + required this.received, + required this.reconnects, + required this.subscriptions, + required this.speed, + required this.lastUrl, + required this.lastConnectTime, + required this.lastDisconnectTime, + required this.disconnects, + required this.lastDisconnect, + required this.isRefreshActive, + }); + + /// Timestamp of the metrics. + final DateTime timestamp; + + /// The time when the client was initialized. + final DateTime initializedAt; + + /// The current state of the client. + final SpinifyState state; + + /// The total number of messages & size of bytes sent. + final ({BigInt count, BigInt size}) transferred; + + /// The total number of messages & size of bytes received. + final ({BigInt count, BigInt size}) received; + + /// The total number of times the connection has been re-established. + final ({int successful, int total}) reconnects; + + /// The number of subscriptions. + final ({ + SpinifySubscriptionCount client, + SpinifySubscriptionCount server + }) subscriptions; + + /// The speed of the request/response in milliseconds. + /// - min - minimum speed + /// - avg - average speed + /// - max - maximum speed + final ({int min, int avg, int max}) speed; + + /// The last URL used to connect. + final String? lastUrl; + + /// The time of the last connect. + final DateTime? lastConnectTime; + + /// The time of the last disconnect. + final DateTime? lastDisconnectTime; + + /// The total number of times the connection has been disconnected. + final int disconnects; + + /// The last disconnect reason. + final ({int? code, String? reason})? lastDisconnect; + + /// Is refresh active. + final bool isRefreshActive; + + @override + int compareTo(SpinifyMetrics other) => timestamp.compareTo(other.timestamp); + + /// Convert metrics to JSON. + Map toJson() => { + 'timestamp': timestamp.toIso8601String(), + 'initializedAt': initializedAt.toIso8601String(), + 'lastConnectTime': lastConnectTime?.toIso8601String(), + 'lastDisconnectTime': lastDisconnectTime?.toIso8601String(), + 'state': state.toJson(), + 'lastUrl': lastUrl, + 'reconnects': { + 'successful': reconnects.successful, + 'total': reconnects.total, + }, + 'subscriptions': >{ + 'client': { + 'total': subscriptions.client.total, + 'unsubscribed': subscriptions.client.unsubscribed, + 'subscribing': subscriptions.client.subscribing, + 'subscribed': subscriptions.client.subscribed, + }, + 'server': { + 'total': subscriptions.server.total, + 'unsubscribed': subscriptions.server.unsubscribed, + 'subscribing': subscriptions.server.subscribing, + 'subscribed': subscriptions.server.subscribed, + }, + }, + 'speed': { + 'min': speed.min, + 'avg': speed.avg, + 'max': speed.max, + }, + 'transferred': { + 'count': transferred.count, + 'size': transferred.size, + }, + 'received': { + 'count': received.count, + 'size': received.size, + }, + 'isRefreshActive': isRefreshActive, + 'disconnects': disconnects, + 'lastDisconnect': switch (lastDisconnect) { + (:int? code, :String? reason) => { + 'code': code, + 'reason': reason, + }, + _ => null, + }, + }; + + @override + String toString() => 'SpinifyMetrics{}'; +} diff --git a/lib/src/model/presence.dart b/lib/src/model/presence.dart new file mode 100644 index 0000000..65fd7e2 --- /dev/null +++ b/lib/src/model/presence.dart @@ -0,0 +1,24 @@ +import 'package:meta/meta.dart'; +import 'client_info.dart'; + +/// {@template presence} +/// Presence +/// {@endtemplate} +/// {@category Entity} +@immutable +final class SpinifyPresence { + /// {@macro presence} + const SpinifyPresence({ + required this.channel, + required this.clients, + }); + + /// Channel + final String channel; + + /// Publications + final Map clients; + + @override + String toString() => 'SpinifyPresence{channel: $channel}'; +} diff --git a/lib/src/model/presence_stats.dart b/lib/src/model/presence_stats.dart new file mode 100644 index 0000000..eb3bc04 --- /dev/null +++ b/lib/src/model/presence_stats.dart @@ -0,0 +1,27 @@ +import 'package:meta/meta.dart'; + +/// {@template presence_stats} +/// Presence stats +/// {@endtemplate} +/// {@category Entity} +@immutable +final class SpinifyPresenceStats { + /// {@macro presence_stats} + const SpinifyPresenceStats({ + required this.channel, + required this.clients, + required this.users, + }); + + /// Channel + final String channel; + + /// Clients count + final int clients; + + /// Users count + final int users; + + @override + String toString() => 'SpinifyPresenceStats{channel: $channel}'; +} diff --git a/lib/src/model/publication.dart b/lib/src/model/publication.dart new file mode 100644 index 0000000..2c9e27f --- /dev/null +++ b/lib/src/model/publication.dart @@ -0,0 +1,41 @@ +import 'package:fixnum/fixnum.dart' as fixnum; +import 'package:meta/meta.dart'; +import 'channel_push.dart'; +import 'client_info.dart'; + +/// {@template publication} +/// Publication context +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +@immutable +final class SpinifyPublication extends SpinifyChannelPush { + /// {@macro publication} + const SpinifyPublication({ + required super.timestamp, + required super.channel, + required this.data, + this.offset, + this.info, + this.tags, + }); + + @override + String get type => 'publication'; + + /// Publication payload + final List data; + + /// Optional offset inside history stream, this is an incremental number + final fixnum.Int64? offset; + + /// Optional information about client connection who published this + /// (only exists if publication comes from client-side publish() API). + final SpinifyClientInfo? info; + + /// Optional tags, this is a map with string keys and string values + final Map? tags; + + @override + String toString() => 'SpinifyPublication{channel: $channel}'; +} diff --git a/lib/src/model/pubspec.yaml.g.dart b/lib/src/model/pubspec.yaml.g.dart new file mode 100644 index 0000000..5fdca7f --- /dev/null +++ b/lib/src/model/pubspec.yaml.g.dart @@ -0,0 +1,497 @@ +// ignore_for_file: lines_longer_than_80_chars, unnecessary_raw_strings +// ignore_for_file: use_raw_strings, avoid_classes_with_only_static_members +// ignore_for_file: avoid_escaping_inner_quotes, prefer_single_quotes + +/// GENERATED CODE - DO NOT MODIFY BY HAND + +library pubspec; + +// ***************************************************************************** +// * pubspec_generator * +// ***************************************************************************** + +/* + + MIT License + + Copyright (c) 2023 Plague Fox + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. + + */ + +/// Given a version number MAJOR.MINOR.PATCH, increment the: +/// +/// 1. MAJOR version when you make incompatible API changes +/// 2. MINOR version when you add functionality in a backward compatible manner +/// 3. PATCH version when you make backward compatible bug fixes +/// +/// Additional labels for pre-release and build metadata are available +/// as extensions to the MAJOR.MINOR.PATCH format. +typedef PubspecVersion = ({ + String representation, + String canonical, + int major, + int minor, + int patch, + List preRelease, + List build +}); + +/// # The pubspec file +/// +/// Code generated pubspec.yaml.g.dart from pubspec.yaml +/// This class is generated from pubspec.yaml, do not edit directly. +/// +/// Every pub package needs some metadata so it can specify its dependencies. +/// Pub packages that are shared with others also need to provide some other +/// information so users can discover them. All of this metadata goes +/// in the package’s pubspec: +/// a file named pubspec.yaml that’s written in the YAML language. +/// +/// Read more: +/// - https://pub.dev/packages/pubspec_generator +/// - https://dart.dev/tools/pub/pubspec +sealed class Pubspec { + /// Version + /// + /// Current app [version] + /// + /// Every package has a version. + /// A version number is required to host your package on the pub.dev site, + /// but can be omitted for local-only packages. + /// If you omit it, your package is implicitly versioned 0.0.0. + /// + /// Versioning is necessary for reusing code while letting it evolve quickly. + /// A version number is three numbers separated by dots, like 0.2.43. + /// It can also optionally have a build ( +1, +2, +hotfix.oopsie) + /// or prerelease (-dev.4, -alpha.12, -beta.7, -rc.5) suffix. + /// + /// Each time you publish your package, you publish it at a specific version. + /// Once that’s been done, consider it hermetically sealed: + /// you can’t touch it anymore. To make more changes, + /// you’ll need a new version. + /// + /// When you select a version, + /// follow [semantic versioning](https://semver.org/). + static const PubspecVersion version = ( + /// Non-canonical string representation of the version as provided + /// in the pubspec.yaml file. + representation: r'0.0.1-pre.6', + + /// Returns a 'canonicalized' representation + /// of the application version. + /// This represents the version string in accordance with + /// Semantic Versioning (SemVer) standards. + canonical: r'0.0.1-pre.6', + + /// MAJOR version when you make incompatible API changes. + /// The major version number: 1 in "1.2.3". + major: 0, + + /// MINOR version when you add functionality + /// in a backward compatible manner. + /// The minor version number: 2 in "1.2.3". + minor: 0, + + /// PATCH version when you make backward compatible bug fixes. + /// The patch version number: 3 in "1.2.3". + patch: 1, + + /// The pre-release identifier: "foo" in "1.2.3-foo". + preRelease: [r'pre', r'6'], + + /// The build identifier: "foo" in "1.2.3+foo". + build: [], + ); + + /// Build date and time (UTC) + static final DateTime timestamp = DateTime.utc( + 2023, + 8, + 4, + 8, + 56, + 57, + 323, + 753, + ); + + /// Name + /// + /// Current app [name] + /// + /// Every package needs a name. + /// It’s how other packages refer to yours, and how it appears to the world, + /// should you publish it. + /// + /// The name should be all lowercase, with underscores to separate words, + /// just_like_this. Use only basic Latin letters and Arabic digits: + /// [a-z0-9_]. Also, make sure the name is a valid Dart identifier—that + /// it doesn’t start with digits + /// and isn’t a [reserved word](https://dart.dev/language/keywords). + /// + /// Try to pick a name that is clear, terse, and not already in use. + /// A quick search of packages on the [pub.dev site](https://pub.dev/packages) + /// to make sure that nothing else is using your name is recommended. + static const String name = r'spinify'; + + /// Description + /// + /// Current app [description] + /// + /// This is optional for your own personal packages, + /// but if you intend to publish your package you must provide a description, + /// which should be in English. + /// The description should be relatively short, from 60 to 180 characters + /// and tell a casual reader what they might want to know about your package. + /// + /// Think of the description as the sales pitch for your package. + /// Users see it when they [browse for packages](https://pub.dev/packages). + /// The description is plain text: no markdown or HTML. + static const String description = + r'Dart client to communicate with Centrifuge and Centrifugo from Flutter and VM over WebSockets'; + + /// Homepage + /// + /// Current app [homepage] + /// + /// This should be a URL pointing to the website for your package. + /// For [hosted packages](https://dart.dev/tools/pub/dependencies#hosted-packages), + /// this URL is linked from the package’s page. + /// While providing a homepage is optional, + /// please provide it or repository (or both). + /// It helps users understand where your package is coming from. + static const String homepage = r'https://centrifugal.dev'; + + /// Repository + /// + /// Current app [repository] + /// + /// Repository + /// The optional repository field should contain the URL for your package’s + /// source code repository—for example, + /// https://github.com//. + /// If you publish your package to the pub.dev site, + /// then your package’s page displays the repository URL. + /// While providing a repository is optional, + /// please provide it or homepage (or both). + /// It helps users understand where your package is coming from. + static const String repository = r'https://github.com/PlugFox/spinify'; + + /// Issue tracker + /// + /// Current app [issueTracker] + /// + /// The optional issue_tracker field should contain a URL for the package’s + /// issue tracker, where existing bugs can be viewed and new bugs can be filed. + /// The pub.dev site attempts to display a link + /// to each package’s issue tracker, using the value of this field. + /// If issue_tracker is missing but repository is present and points to GitHub, + /// then the pub.dev site uses the default issue tracker + /// (https://github.com///issues). + static const String issueTracker = + r'https://github.com/PlugFox/spinify/issues'; + + /// Documentation + /// + /// Current app [documentation] + /// + /// Some packages have a site that hosts documentation, + /// separate from the main homepage and from the Pub-generated API reference. + /// If your package has additional documentation, add a documentation: + /// field with that URL; pub shows a link to this documentation + /// on your package’s page. + static const String documentation = r''; + + /// Publish_to + /// + /// Current app [publishTo] + /// + /// The default uses the [pub.dev](https://pub.dev/) site. + /// Specify none to prevent a package from being published. + /// This setting can be used to specify a custom pub package server to publish. + /// + /// ```yaml + /// publish_to: none + /// ``` + static const String publishTo = r'https://pub.dev/'; + + /// Funding + /// + /// Current app [funding] + /// + /// Package authors can use the funding property to specify + /// a list of URLs that provide information on how users + /// can help fund the development of the package. For example: + /// + /// ```yaml + /// funding: + /// - https://www.buymeacoffee.com/example_user + /// - https://www.patreon.com/some-account + /// ``` + /// + /// If published to [pub.dev](https://pub.dev/) the links are displayed on the package page. + /// This aims to help users fund the development of their dependencies. + static const List funding = [ + r'https://www.buymeacoffee.com/plugfox', + r'https://www.patreon.com/plugfox', + r'https://boosty.to/plugfox', + ]; + + /// False_secrets + /// + /// Current app [falseSecrets] + /// + /// When you try to publish a package, + /// pub conducts a search for potential leaks of secret credentials, + /// API keys, or cryptographic keys. + /// If pub detects a potential leak in a file that would be published, + /// then pub warns you and refuses to publish the package. + /// + /// Leak detection isn’t perfect. To avoid false positives, + /// you can tell pub not to search for leaks in certain files, + /// by creating an allowlist using gitignore + /// patterns under false_secrets in the pubspec. + /// + /// For example, the following entry causes pub not to look + /// for leaks in the file lib/src/hardcoded_api_key.dart + /// and in all .pem files in the test/localhost_certificates/ directory: + /// + /// ```yaml + /// false_secrets: + /// - /lib/src/hardcoded_api_key.dart + /// - /test/localhost_certificates/*.pem + /// ``` + /// + /// Starting a gitignore pattern with slash (/) ensures + /// that the pattern is considered relative to the package’s root directory. + static const List falseSecrets = []; + + /// Screenshots + /// + /// Current app [screenshots] + /// + /// Packages can showcase their widgets or other visual elements + /// using screenshots displayed on their pub.dev page. + /// To specify screenshots for the package to display, + /// use the screenshots field. + /// + /// A package can list up to 10 screenshots under the screenshots field. + /// Don’t include logos or other branding imagery in this section. + /// Each screenshot includes one description and one path. + /// The description explains what the screenshot depicts + /// in no more than 160 characters. For example: + /// + /// ```yaml + /// screenshots: + /// - description: 'This screenshot shows the transformation of a number of bytes + /// to a human-readable expression.' + /// path: path/to/image/in/package/500x500.webp + /// - description: 'This screenshot shows a stack trace returning a human-readable + /// representation.' + /// path: path/to/image/in/package.png + /// ``` + /// + /// Pub.dev limits screenshots to the following specifications: + /// + /// - File size: max 4 MB per image. + /// - File types: png, jpg, gif, or webp. + /// - Static and animated images are both allowed. + /// + /// Keep screenshot files small. Each download of the package + /// includes all screenshot files. + /// + /// Pub.dev generates the package’s thumbnail image from the first screenshot. + /// If this screenshot uses animation, pub.dev uses its first frame. + static const List screenshots = []; + + /// Topics + /// + /// Current app [topics] + /// + /// Package authors can use the topics field to categorize their package. Topics can be used to assist discoverability during search with filters on pub.dev. Pub.dev displays the topics on the package page as well as in the search results. + /// + /// The field consists of a list of names. For example: + /// + /// ```yaml + /// topics: + /// - network + /// - http + /// ``` + /// + /// Pub.dev requires topics to follow these specifications: + /// + /// - Tag each package with at most 5 topics. + /// - Write the topic name following these requirements: + /// 1) Use between 2 and 32 characters. + /// 2) Use only lowercase alphanumeric characters or hyphens (a-z, 0-9, -). + /// 3) Don’t use two consecutive hyphens (--). + /// 4) Start the name with lowercase alphabet characters (a-z). + /// 5) End with alphanumeric characters (a-z or 0-9). + /// + /// When choosing topics, consider if existing topics are relevant. + /// Tagging with existing topics helps users discover your package. + static const List topics = [ + r'spinify', + r'centrifugo', + r'centrifuge', + r'websocket', + r'cross-platform', + ]; + + /// Environment + static const Map environment = { + 'sdk': '>=3.0.0 <4.0.0', + }; + + /// Platforms + /// + /// Current app [platforms] + /// + /// When you [publish a package](https://dart.dev/tools/pub/publishing), + /// pub.dev automatically detects the platforms that the package supports. + /// If this platform-support list is incorrect, + /// use platforms to explicitly declare which platforms your package supports. + /// + /// For example, the following platforms entry causes + /// pub.dev to list the package as supporting + /// Android, iOS, Linux, macOS, Web, and Windows: + /// + /// ```yaml + /// # This package supports all platforms listed below. + /// platforms: + /// android: + /// ios: + /// linux: + /// macos: + /// web: + /// windows: + /// ``` + /// + /// Here is an example of declaring that the package supports only Linux and macOS (and not, for example, Windows): + /// + /// ```yaml + /// # This package supports only Linux and macOS. + /// platforms: + /// linux: + /// macos: + /// ``` + static const Map platforms = { + 'android': r'', + 'ios': r'', + 'linux': r'', + 'macos': r'', + 'web': r'', + 'windows': r'', + }; + + /// Dependencies + /// + /// Current app [dependencies] + /// + /// [Dependencies](https://dart.dev/tools/pub/glossary#dependency) + /// are the pubspec’s `raison d’être`. + /// In this section you list each package that + /// your package needs in order to work. + /// + /// Dependencies fall into one of two types. + /// Regular dependencies are listed under dependencies: + /// these are packages that anyone using your package will also need. + /// Dependencies that are only needed in + /// the development phase of the package itself + /// are listed under dev_dependencies. + /// + /// During the development process, + /// you might need to temporarily override a dependency. + /// You can do so using dependency_overrides. + /// + /// For more information, + /// see [Package dependencies](https://dart.dev/tools/pub/dependencies). + static const Map dependencies = { + 'meta': r'^1.9.1', + 'ws': r'^1.0.0-pre.6', + 'protobuf': r'^3.0.0', + 'crypto': r'^3.0.3', + 'fixnum': r'^1.1.0', + 'stack_trace': r'^1.11.0', + }; + + /// Developer dependencies + static const Map devDependencies = { + 'build_runner': r'^2.4.6', + 'pubspec_generator': r'^4.0.0', + 'lints': r'^2.0.1', + 'test': r'^1.24.2', + }; + + /// Dependency overrides + static const Map dependencyOverrides = {}; + + /// Executables + /// + /// Current app [executables] + /// + /// A package may expose one or more of its scripts as executables + /// that can be run directly from the command line. + /// To make a script publicly available, + /// list it under the executables field. + /// Entries are listed as key/value pairs: + /// + /// ```yaml + /// : + /// ``` + /// + /// For example, the following pubspec entry lists two scripts: + /// + /// ```yaml + /// executables: + /// slidy: main + /// fvm: + /// ``` + /// + /// Once the package is activated using pub global activate, + /// typing `slidy` executes `bin/main.dart`. + /// Typing `fvm` executes `bin/fvm.dart`. + /// If you don’t specify the value, it is inferred from the key. + /// + /// For more information, see pub global. + static const Map executables = {}; + + /// Source data from pubspec.yaml + static const Map source = { + 'name': name, + 'description': description, + 'repository': repository, + 'issue_tracker': issueTracker, + 'homepage': homepage, + 'documentation': documentation, + 'publish_to': publishTo, + 'version': version, + 'funding': funding, + 'false_secrets': falseSecrets, + 'screenshots': screenshots, + 'topics': topics, + 'platforms': platforms, + 'environment': environment, + 'dependencies': dependencies, + 'dev_dependencies': devDependencies, + 'dependency_overrides': dependencyOverrides, + }; +} diff --git a/lib/src/model/pushes_stream.dart b/lib/src/model/pushes_stream.dart new file mode 100644 index 0000000..acdaf95 --- /dev/null +++ b/lib/src/model/pushes_stream.dart @@ -0,0 +1,54 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; +import 'channel_presence.dart'; +import 'channel_push.dart'; +import 'event.dart'; +import 'message.dart'; +import 'publication.dart'; + +/// Stream of received pushes from Centrifugo server for a channel. +/// {@category Event} +/// {@category Client} +/// {@category Subscription} +/// {@subCategory Push} +/// {@subCategory Channel} +@immutable +final class SpinifyPushesStream extends StreamView { + /// Stream of received events. + const SpinifyPushesStream({ + required Stream pushes, + required this.publications, + required this.messages, + required this.presenceEvents, + required this.joinEvents, + required this.leaveEvents, + }) : super(pushes); + + /// Publications stream. + final Stream publications; + + /// Messages stream. + final Stream messages; + + /// Stream of presence (join & leave) events. + final Stream presenceEvents; + + /// Join events + final Stream joinEvents; + + /// Leave events + final Stream leaveEvents; + + /// Filtered stream of data of [SpinifyEvent]. + Stream whereType() => + transform(StreamTransformer.fromHandlers( + handleData: (data, sink) => switch (data) { + T valid => sink.add(valid), + _ => null, + }, + )).asBroadcastStream(); + + @override + String toString() => 'SpinifyPushesStream{}'; +} diff --git a/lib/src/model/refresh.dart b/lib/src/model/refresh.dart new file mode 100644 index 0000000..fb635d5 --- /dev/null +++ b/lib/src/model/refresh.dart @@ -0,0 +1,28 @@ +import 'channel_push.dart'; + +/// {@template refresh} +/// Refresh push from Centrifugo server. +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +final class SpinifyRefresh extends SpinifyChannelPush { + /// {@macro refresh} + const SpinifyRefresh({ + required super.timestamp, + required super.channel, + required this.expires, + this.ttl, + }); + + @override + String get type => 'refresh'; + + /// Whether a server will expire connection at some point + final bool expires; + + /// Time when connection will be expired + final DateTime? ttl; + + @override + String toString() => 'SpinifyRefresh{channel: $channel}'; +} diff --git a/lib/src/model/refresh_result.dart b/lib/src/model/refresh_result.dart new file mode 100644 index 0000000..37ef0b0 --- /dev/null +++ b/lib/src/model/refresh_result.dart @@ -0,0 +1,40 @@ +import 'package:meta/meta.dart'; + +/// Result of connection refresh +final class SpinifyRefreshResult { + /// Result of connection refresh + const SpinifyRefreshResult({ + required this.expires, + this.client, + this.version, + this.ttl, + }); + + /// Unique client connection ID server issued to this connection + final String? client; + + /// Server version + final String? version; + + /// Whether a server will expire connection at some point + final bool expires; + + /// Time when connection will be expired + final DateTime? ttl; +} + +/// Result of subscription refresh +@immutable +final class SpinifySubRefreshResult { + /// Result of connection refresh + const SpinifySubRefreshResult({ + required this.expires, + this.ttl, + }); + + /// Whether a server will expire subscription at some point + final bool expires; + + /// Time when subscription will be expired + final DateTime? ttl; +} diff --git a/lib/src/model/state.dart b/lib/src/model/state.dart new file mode 100644 index 0000000..2dea951 --- /dev/null +++ b/lib/src/model/state.dart @@ -0,0 +1,486 @@ +import 'dart:convert'; + +import 'package:meta/meta.dart'; + +/// {@template state} +/// Spinify client connection states +/// +/// Client connection has 4 states: +/// +/// - disconnected +/// - connecting +/// - connected +/// - closed +/// +/// When a new Client is created it has a disconnected state. +/// To connect to a server connect() method must be called. +/// After calling connect Client moves to the connecting state. +/// If a Client can't connect to a server it attempts to create +/// a connection with an exponential backoff algorithm (with full jitter). +/// If a connection to a server is connected then the state becomes connected. +/// +/// If a connection is lost (due to a missing network for example, +/// or due to reconnect advice received from a server, +/// or due to some client-side closed that can't be recovered +/// without reconnecting) Client goes to the connecting state again. +/// In this state Client tries to reconnect +/// (again, with an exponential backoff algorithm). +/// +/// The Client's state can become disconnected. +/// This happens when Client's disconnect() method was called by a developer. +/// Also, this can happen due to server advice from a server, +/// or due to a terminal problem that happened on the client-side. +/// {@endtemplate} +/// {@category Client} +/// {@category Entity} +@immutable +sealed class SpinifyState extends _$SpinifyStateBase { + /// {@macro state} + const SpinifyState(super.timestamp); + + /// Disconnected state + /// {@macro state} + factory SpinifyState.disconnected({ + DateTime? timestamp, + int? closeCode, + String? closeReason, + }) = SpinifyState$Disconnected; + + /// Connecting + /// {@macro state} + factory SpinifyState.connecting({required String url, DateTime? timestamp}) = + SpinifyState$Connecting; + + /// Connected + /// {@macro state} + factory SpinifyState.connected({ + required String url, + DateTime? timestamp, + String? client, + String? version, + bool? expires, + DateTime? ttl, + Duration? pingInterval, + bool? sendPong, + String? session, + String? node, + List? data, + }) = SpinifyState$Connected; + + /// Permanently closed + /// {@macro state} + factory SpinifyState.closed({DateTime? timestamp}) = SpinifyState$Closed; + + /// Restore state from JSON + /// {@macro state} + factory SpinifyState.fromJson(Map json) => switch (( + json['type']?.toString().trim().toLowerCase(), + json['timestamp'] ?? DateTime.now().microsecondsSinceEpoch, + json['url'], + )) { + ('disconnected', int timestamp, _) => SpinifyState.disconnected( + timestamp: DateTime.fromMicrosecondsSinceEpoch(timestamp), + closeCode: switch (json['closeCode']) { + int closeCode => closeCode, + _ => null, + }, + closeReason: switch (json['closeReason']) { + String closeReason => closeReason, + _ => null, + }, + ), + ('connecting', int timestamp, String url) => SpinifyState.connecting( + url: url, + timestamp: DateTime.fromMicrosecondsSinceEpoch(timestamp), + ), + ('connected', int timestamp, String url) => SpinifyState.connected( + url: url, + timestamp: DateTime.fromMicrosecondsSinceEpoch(timestamp), + client: json['client']?.toString(), + version: json['version']?.toString(), + expires: switch (json['expires']) { + bool expires => expires, + _ => null, + }, + ttl: switch (json['ttl']) { + int ttl => DateTime.fromMicrosecondsSinceEpoch(ttl), + _ => null, + }, + pingInterval: switch (json['pingInterval']) { + int pingInterval => Duration(seconds: pingInterval), + _ => null, + }, + sendPong: switch (json['sendPong']) { + bool sendPong => sendPong, + _ => null, + }, + session: json['session']?.toString(), + node: json['node']?.toString(), + data: switch (json['data']) { + String data when data.isNotEmpty => base64Decode(data), + _ => null, + }, + ), + ('closed', int timestamp, _) => SpinifyState.closed( + timestamp: DateTime.fromMicrosecondsSinceEpoch(timestamp), + ), + _ => throw FormatException('Unknown state: $json'), + }; +} + +/// Disconnected +/// Client should handle disconnect advices from server. +/// In websocket case disconnect advice is sent in CLOSE Websocket frame. +/// Disconnect advice contains uint32 code and human-readable string reason. +/// +/// {@macro state} +/// {@category Client} +/// {@category Entity} +final class SpinifyState$Disconnected extends SpinifyState { + /// Disconnected + /// + /// {@macro state} + SpinifyState$Disconnected({ + DateTime? timestamp, + this.closeCode, + this.closeReason, + }) : super(timestamp ?? DateTime.now()); + + @override + String get type => 'disconnected'; + + @override + String? get url => null; + + /// The close code set when the WebSocket connection is closed. + /// If there is no close code available this property will be null. + final int? closeCode; + + /// The close reason set when the WebSocket connection is closed. + /// If there is no close reason available this property will be null. + final String? closeReason; + + @override + bool get isDisconnected => true; + + @override + bool get isConnecting => false; + + @override + bool get isConnected => false; + + @override + bool get isClosed => false; + + @override + R map({ + required SpinifyStateMatch disconnected, + required SpinifyStateMatch connecting, + required SpinifyStateMatch connected, + required SpinifyStateMatch closed, + }) => + disconnected(this); + + @override + Map toJson() => { + ...super.toJson(), + if (closeCode != null) 'closeCode': closeCode, + if (closeReason != null) 'closeReason': closeReason, + }; + + @override + int get hashCode => 0; + + @override + bool operator ==(Object other) => + identical(this, other) || + (other is SpinifyState$Disconnected && + other.timestamp.isAtSameMomentAs(timestamp)); + + @override + String toString() => r'SpinifyState$Disconnected{}'; +} + +/// Connecting +/// +/// {@macro state} +/// {@category Client} +/// {@category Entity} +final class SpinifyState$Connecting extends SpinifyState { + /// Connecting + /// + /// {@macro state} + SpinifyState$Connecting({required this.url, DateTime? timestamp}) + : super(timestamp ?? DateTime.now()); + + @override + String get type => 'connecting'; + + @override + final String url; + + @override + bool get isDisconnected => false; + + @override + bool get isConnecting => true; + + @override + bool get isConnected => false; + + @override + bool get isClosed => false; + + @override + R map({ + required SpinifyStateMatch disconnected, + required SpinifyStateMatch connecting, + required SpinifyStateMatch connected, + required SpinifyStateMatch closed, + }) => + connecting(this); + + @override + int get hashCode => 1; + + @override + bool operator ==(Object other) => + identical(this, other) || + (other is SpinifyState$Connecting && + other.timestamp.isAtSameMomentAs(timestamp)); + + @override + String toString() => r'SpinifyState$Connecting{}'; +} + +/// Connected +/// +/// {@macro state} +/// {@category Client} +/// {@category Entity} +final class SpinifyState$Connected extends SpinifyState { + /// Connected + /// + /// {@macro state} + SpinifyState$Connected({ + required this.url, + DateTime? timestamp, + this.client, + this.version, + this.ttl, + this.expires, + this.pingInterval, + this.sendPong, + this.session, + this.node, + this.data, + }) : super(timestamp ?? DateTime.now()); + + @override + String get type => 'connected'; + + @override + final String url; + + /// Unique client connection ID server issued to this connection + final String? client; + + /// Server version + final String? version; + + /// Whether a server will expire connection at some point + final bool? expires; + + /// Time when connection will be expired + final DateTime? ttl; + + /// Client must periodically (once in 25 secs, configurable) send + /// ping messages to server. If pong has not beed received in 5 secs + /// (configurable) then client must disconnect from server + /// and try to reconnect with backoff strategy. + final Duration? pingInterval; + + /// Whether to send asynchronous message when pong received. + final bool? sendPong; + + /// Session ID. + final String? session; + + /// Server node ID. + final String? node; + + /// Additional data returned from server on connect. + final List? data; + + @override + bool get isDisconnected => false; + + @override + bool get isConnecting => false; + + @override + bool get isConnected => true; + + @override + bool get isClosed => false; + + @override + R map({ + required SpinifyStateMatch disconnected, + required SpinifyStateMatch connecting, + required SpinifyStateMatch connected, + required SpinifyStateMatch closed, + }) => + connected(this); + + @override + Map toJson() => { + ...super.toJson(), + if (client != null) 'client': client, + if (version != null) 'version': version, + if (expires != null) 'expires': expires, + if (ttl != null) 'ttl': ttl?.microsecondsSinceEpoch, + if (pingInterval != null) 'pingInterval': pingInterval?.inSeconds, + if (sendPong != null) 'sendPong': sendPong, + if (session != null) 'session': session, + if (node != null) 'node': node, + if (data != null) 'data': base64Encode(data!), + }; + + @override + int get hashCode => 2; + + @override + bool operator ==(Object other) => + identical(this, other) || + (other is SpinifyState$Connected && + other.timestamp.isAtSameMomentAs(timestamp)); + + @override + String toString() => r'SpinifyState$Connected{}'; +} + +/// Permanently closed +/// +/// {@macro state} +/// {@category Client} +/// {@category Entity} +final class SpinifyState$Closed extends SpinifyState { + /// Permanently closed + /// + /// {@macro state} + SpinifyState$Closed({DateTime? timestamp}) + : super(timestamp ?? DateTime.now()); + + @override + String get type => 'closed'; + + @override + String? get url => null; + + @override + bool get isDisconnected => false; + + @override + bool get isConnecting => false; + + @override + bool get isConnected => true; + + @override + bool get isClosed => false; + + @override + R map({ + required SpinifyStateMatch disconnected, + required SpinifyStateMatch connecting, + required SpinifyStateMatch connected, + required SpinifyStateMatch closed, + }) => + closed(this); + + @override + int get hashCode => 3; + + @override + bool operator ==(Object other) => + identical(this, other) || + (other is SpinifyState$Closed && + other.timestamp.isAtSameMomentAs(timestamp)); + + @override + String toString() => r'SpinifyState$Closed{}'; +} + +/// Pattern matching for [SpinifyState]. +/// {@category Entity} +typedef SpinifyStateMatch = R Function(S state); + +@immutable +abstract base class _$SpinifyStateBase { + const _$SpinifyStateBase(this.timestamp); + + /// Represents the current state type. + abstract final String type; + + /// URL of endpoint. + abstract final String? url; + + /// Disconnected state + abstract final bool isDisconnected; + + /// Connecting state + abstract final bool isConnecting; + + /// Connected state + abstract final bool isConnected; + + /// Closed state + abstract final bool isClosed; + + /// Timestamp of state change. + final DateTime timestamp; + + /// Pattern matching for [SpinifyState]. + R map({ + required SpinifyStateMatch disconnected, + required SpinifyStateMatch connecting, + required SpinifyStateMatch connected, + required SpinifyStateMatch closed, + }); + + /// Pattern matching for [SpinifyState]. + R maybeMap({ + required R Function() orElse, + SpinifyStateMatch? disconnected, + SpinifyStateMatch? connecting, + SpinifyStateMatch? connected, + SpinifyStateMatch? closed, + }) => + map( + disconnected: disconnected ?? (_) => orElse(), + connecting: connecting ?? (_) => orElse(), + connected: connected ?? (_) => orElse(), + closed: closed ?? (_) => orElse(), + ); + + /// Pattern matching for [SpinifyState]. + R? mapOrNull({ + SpinifyStateMatch? disconnected, + SpinifyStateMatch? connecting, + SpinifyStateMatch? connected, + SpinifyStateMatch? closed, + }) => + map( + disconnected: disconnected ?? (_) => null, + connecting: connecting ?? (_) => null, + connected: connected ?? (_) => null, + closed: closed ?? (_) => null, + ); + + Map toJson() => { + 'type': type, + 'timestamp': timestamp.toUtc().toIso8601String(), + if (url != null) 'url': url, + }; +} diff --git a/lib/src/model/states_stream.dart b/lib/src/model/states_stream.dart new file mode 100644 index 0000000..5c2996c --- /dev/null +++ b/lib/src/model/states_stream.dart @@ -0,0 +1,36 @@ +import 'dart:async'; + +import 'state.dart'; + +/// Stream of Spinify's [SpinifyState] changes. +/// {@category Client} +/// {@category Entity} +final class SpinifyStatesStream extends StreamView { + /// Stream of Spinify's [SpinifyState] changes. + SpinifyStatesStream(super.stream); + + /// Connection has not yet been established, but the WebSocket is trying. + late final Stream disconnected = + whereType(); + + /// Disconnected state + late final Stream connecting = + whereType(); + + /// Connected + late final Stream connected = + whereType(); + + /// Permanently closed + late final Stream closed = + whereType(); + + /// Filtered stream of data of [SpinifyState]. + Stream whereType() => + transform(StreamTransformer.fromHandlers( + handleData: (data, sink) => switch (data) { + T valid => sink.add(valid), + _ => null, + }, + )).asBroadcastStream(); +} diff --git a/lib/src/model/stream_position.dart b/lib/src/model/stream_position.dart new file mode 100644 index 0000000..c9047ab --- /dev/null +++ b/lib/src/model/stream_position.dart @@ -0,0 +1,5 @@ +import 'package:fixnum/fixnum.dart' as fixnum; + +/// Stream position. +/// {@category Entity} +typedef SpinifyStreamPosition = ({fixnum.Int64 offset, String epoch}); diff --git a/lib/src/model/subscribe.dart b/lib/src/model/subscribe.dart new file mode 100644 index 0000000..b855747 --- /dev/null +++ b/lib/src/model/subscribe.dart @@ -0,0 +1,37 @@ +import 'channel_push.dart'; +import 'stream_position.dart'; + +/// {@template subscribe} +/// Subscribe push from Centrifugo server. +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +final class SpinifySubscribe extends SpinifyChannelPush { + /// {@macro subscribe} + const SpinifySubscribe({ + required super.timestamp, + required super.channel, + required this.positioned, + required this.recoverable, + required this.data, + required this.streamPosition, + }); + + @override + String get type => 'subscribe'; + + /// Whether subscription is positioned. + final bool positioned; + + /// Whether subscription is recoverable. + final bool recoverable; + + /// Data attached to subscription. + final List data; + + /// Stream position. + final SpinifyStreamPosition? streamPosition; + + @override + String toString() => 'SpinifySubscribe{channel: $channel}'; +} diff --git a/lib/src/model/subscription_config.dart b/lib/src/model/subscription_config.dart new file mode 100644 index 0000000..306a503 --- /dev/null +++ b/lib/src/model/subscription_config.dart @@ -0,0 +1,102 @@ +import 'dart:async'; + +import 'package:fixnum/fixnum.dart' as fixnum; +import 'package:meta/meta.dart'; + +/// Token used for subscription. +/// {@category Subscription} +/// {@category Entity} +typedef SpinifySubscriptionToken = String; + +/// Callback to get token for subscription. +/// If method returns null then subscription will be established without token. +/// {@category Subscription} +/// {@category Entity} +typedef SpinifySubscriptionTokenCallback = FutureOr + Function(); + +/// Callback to set subscription payload data. +/// +/// If method returns null then no payload will be sent at subscribe time. + +/// {@category Subscription} +/// {@category Entity} +typedef SpinifySubscribePayloadCallback = FutureOr?> Function(); + +/// {@template subscription_config} +/// Subscription common options +/// +/// There are several common options available when +/// creating Subscription instance: +/// +/// - option to set subscription token and callback to get subscription token +/// upon expiration (see below more details) +/// - option to set subscription data +/// (attached to every subscribe/resubscribe request) +/// - options to tweak resubscribe backoff algorithm +/// - option to start Subscription since known +/// Stream Position (i.e. attempt recovery on first subscribe) +/// - option to ask server to make subscription positioned +/// (if not forced by a server) +/// - option to ask server to make subscription recoverable +/// (if not forced by a server) +/// - option to ask server to push Join/Leave messages +/// (if not forced by a server) +/// {@endtemplate} +/// {@category Subscription} +/// {@category Entity} +@immutable +class SpinifySubscriptionConfig { + /// {@macro subscription_config} + const SpinifySubscriptionConfig({ + this.getToken, + this.getPayload, + this.resubscribeInterval = ( + min: const Duration(milliseconds: 500), + max: const Duration(seconds: 20), + ), + this.since, + this.positioned = false, + this.recoverable = false, + this.joinLeave = false, + this.timeout = const Duration(seconds: 15), + }); + + /// Create a default config + /// + /// {@macro subscription_config} + @literal + const factory SpinifySubscriptionConfig.byDefault() = + SpinifySubscriptionConfig; + + /// Callback to get token for subscription + /// and get updated token upon expiration. + final SpinifySubscriptionTokenCallback? getToken; + + /// Data to send with subscription request. + /// Subscription `data` is attached to every subscribe/resubscribe request. + final SpinifySubscribePayloadCallback? getPayload; + + /// Resubscribe backoff algorithm + final ({Duration min, Duration max}) resubscribeInterval; + + /// Start Subscription [since] known Stream Position + /// (i.e. attempt recovery on first subscribe) + final ({fixnum.Int64 offset, String epoch})? since; + + /// Ask server to make subscription [positioned] (if not forced by a server) + final bool positioned; + + /// Ask server to make subscription [recoverable] (if not forced by a server) + final bool recoverable; + + /// Ask server to push Join/Leave messages (if not forced by a server) + final bool joinLeave; + + /// Maximum time to wait for the subscription to be established. + /// If not specified, the timeout will be 15 seconds. + final Duration timeout; + + @override + String toString() => 'SpinifySubscriptionConfig{}'; +} diff --git a/lib/src/model/subscription_state.dart b/lib/src/model/subscription_state.dart new file mode 100644 index 0000000..31a1f66 --- /dev/null +++ b/lib/src/model/subscription_state.dart @@ -0,0 +1,317 @@ +import 'package:fixnum/fixnum.dart' as fixnum; +import 'package:meta/meta.dart'; + +/// {@template subscription_state} +/// Subscription has 3 states: +/// +/// - `unsubscribed` +/// - `subscribing` +/// - `subscribed` +/// +/// When a new Subscription is created it has an `unsubscribed` state. +/// {@endtemplate} +/// {@category Subscription} +/// {@category Entity} +@immutable +sealed class SpinifySubscriptionState extends _$SpinifySubscriptionStateBase { + /// {@macro subscription_state} + const SpinifySubscriptionState( + {required super.timestamp, + required super.since, + required super.recoverable}); + + /// Unsubscribed + /// {@macro subscription_state} + factory SpinifySubscriptionState.unsubscribed({ + required int code, + required String reason, + DateTime? timestamp, + ({fixnum.Int64 offset, String epoch})? since, + bool recoverable, + }) = SpinifySubscriptionState$Unsubscribed; + + /// Subscribing + /// {@macro subscription_state} + factory SpinifySubscriptionState.subscribing({ + DateTime? timestamp, + ({fixnum.Int64 offset, String epoch})? since, + bool recoverable, + }) = SpinifySubscriptionState$Subscribing; + + /// Subscribed + /// {@macro subscription_state} + factory SpinifySubscriptionState.subscribed({ + DateTime? timestamp, + ({fixnum.Int64 offset, String epoch})? since, + bool recoverable, + DateTime? ttl, + }) = SpinifySubscriptionState$Subscribed; +} + +/// Unsubscribed state +/// +/// {@macro subscription_state} +/// {@category Subscription} +/// {@category Entity} +final class SpinifySubscriptionState$Unsubscribed + extends SpinifySubscriptionState { + /// {@macro subscription_state} + SpinifySubscriptionState$Unsubscribed({ + required this.code, + required this.reason, + DateTime? timestamp, + super.since, + super.recoverable = false, + }) : super(timestamp: timestamp ?? DateTime.now()); + + @override + String get type => 'unsubscribed'; + + /// Unsubscribe code. + final int code; + + /// Unsubscribe reason. + final String reason; + + @override + bool get isUnsubscribed => true; + + @override + bool get isSubscribing => false; + + @override + bool get isSubscribed => false; + + @override + R map({ + required SpinifySubscriptionStateMatch + unsubscribed, + required SpinifySubscriptionStateMatch + subscribing, + required SpinifySubscriptionStateMatch + subscribed, + }) => + unsubscribed(this); + + @override + Map toJson() => { + ...super.toJson(), + 'code': code, + 'reason': reason, + }; + + @override + int get hashCode => Object.hash(0, timestamp, since); + + @override + bool operator ==(Object other) => identical(this, other); + + @override + String toString() => r'SpinifySubscriptionState$Unsubscribed{}'; +} + +/// Subscribing state +/// +/// {@macro subscription_state} +/// {@category Subscription} +/// {@category Entity} +final class SpinifySubscriptionState$Subscribing + extends SpinifySubscriptionState { + /// {@macro subscription_state} + SpinifySubscriptionState$Subscribing({ + DateTime? timestamp, + super.since, + super.recoverable = false, + }) : super(timestamp: timestamp ?? DateTime.now()); + + @override + String get type => 'subscribing'; + + @override + bool get isUnsubscribed => false; + + @override + bool get isSubscribing => true; + + @override + bool get isSubscribed => false; + + @override + R map({ + required SpinifySubscriptionStateMatch + unsubscribed, + required SpinifySubscriptionStateMatch + subscribing, + required SpinifySubscriptionStateMatch + subscribed, + }) => + subscribing(this); + + @override + int get hashCode => Object.hash(1, timestamp, since); + + @override + bool operator ==(Object other) => identical(this, other); + + @override + String toString() => r'SpinifySubscriptionState$Subscribing{}'; +} + +/// Subscribed state +/// +/// {@macro subscription_state} +/// {@category Subscription} +/// {@category Entity} +final class SpinifySubscriptionState$Subscribed + extends SpinifySubscriptionState { + /// {@macro subscription_state} + SpinifySubscriptionState$Subscribed({ + DateTime? timestamp, + super.since, + super.recoverable = false, + this.ttl, + }) : super(timestamp: timestamp ?? DateTime.now()); + + @override + String get type => 'subscribed'; + + /// Time to live in seconds. + final DateTime? ttl; + + @override + bool get isUnsubscribed => false; + + @override + bool get isSubscribing => false; + + @override + bool get isSubscribed => true; + + @override + R map({ + required SpinifySubscriptionStateMatch + unsubscribed, + required SpinifySubscriptionStateMatch + subscribing, + required SpinifySubscriptionStateMatch + subscribed, + }) => + subscribed(this); + + @override + Map toJson() => { + ...super.toJson(), + if (ttl != null) 'ttl': ttl?.toUtc().toIso8601String(), + }; + + @override + int get hashCode => Object.hash(2, timestamp, since, recoverable, ttl); + + @override + bool operator ==(Object other) => identical(this, other); + + @override + String toString() => r'SpinifySubscriptionState$Subscribed{}'; +} + +/// Pattern matching for [SpinifySubscriptionState]. +/// {@category Entity} +typedef SpinifySubscriptionStateMatch = R + Function(S state); + +@immutable +abstract base class _$SpinifySubscriptionStateBase { + const _$SpinifySubscriptionStateBase({ + required this.timestamp, + required this.since, + required this.recoverable, + }); + + /// Represents the current state type. + abstract final String type; + + /// Timestamp of state change. + final DateTime timestamp; + + /// Stream Position + final ({fixnum.Int64 offset, String epoch})? since; + + /// Whether channel is recoverable. + final bool recoverable; + + /// Whether channel is unsubscribed. + abstract final bool isUnsubscribed; + + /// Whether channel is subscribing. + abstract final bool isSubscribing; + + /// Whether channel is subscribed. + abstract final bool isSubscribed; + + /// Pattern matching for [SpinifySubscriptionState]. + R map({ + required SpinifySubscriptionStateMatch + unsubscribed, + required SpinifySubscriptionStateMatch + subscribing, + required SpinifySubscriptionStateMatch + subscribed, + }); + + /// Pattern matching for [SpinifySubscriptionState]. + R maybeMap({ + required R Function() orElse, + SpinifySubscriptionStateMatch? + unsubscribed, + SpinifySubscriptionStateMatch? + subscribing, + SpinifySubscriptionStateMatch? + subscribed, + }) => + map( + unsubscribed: unsubscribed ?? (_) => orElse(), + subscribing: subscribing ?? (_) => orElse(), + subscribed: subscribed ?? (_) => orElse(), + ); + + /// Pattern matching for [SpinifySubscriptionState]. + R? mapOrNull({ + SpinifySubscriptionStateMatch? + unsubscribed, + SpinifySubscriptionStateMatch? + subscribing, + SpinifySubscriptionStateMatch? + subscribed, + }) => + map( + unsubscribed: unsubscribed ?? (_) => null, + subscribing: subscribing ?? (_) => null, + subscribed: subscribed ?? (_) => null, + ); + + Map toJson() => { + 'type': type, + 'timestamp': timestamp.toUtc().toIso8601String(), + if (since != null) + 'since': switch (since) { + (:fixnum.Int64 offset, :String epoch) => { + 'offset': offset, + 'epoch': epoch, + }, + _ => null, + }, + 'recoverable': recoverable, + }; +} diff --git a/lib/src/model/subscription_states_stream.dart b/lib/src/model/subscription_states_stream.dart new file mode 100644 index 0000000..abdd01c --- /dev/null +++ b/lib/src/model/subscription_states_stream.dart @@ -0,0 +1,39 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +import 'subscription_state.dart'; + +/// Stream of Spinify's [SpinifySubscriptionState] changes. +/// {@category Subscription} +/// {@category Entity} +@immutable +final class SpinifySubscriptionStateStream + extends StreamView { + /// Stream of Spinify's [SpinifySubscriptionState] changes. + SpinifySubscriptionStateStream(super.stream); + + /// Unsubscribed + late final Stream unsubscribed = + whereType(); + + /// Subscribing + late final Stream subscribing = + whereType(); + + /// Subscribed + late final Stream subscribed = + whereType(); + + /// Filtered stream of data of [SpinifySubscriptionState]. + Stream whereType() => + transform(StreamTransformer.fromHandlers( + handleData: (data, sink) => switch (data) { + T valid => sink.add(valid), + _ => null, + }, + )).asBroadcastStream(); + + @override + String toString() => 'SpinifySubscriptionStateStream{}'; +} diff --git a/lib/src/model/unsubscribe.dart b/lib/src/model/unsubscribe.dart new file mode 100644 index 0000000..c312cb0 --- /dev/null +++ b/lib/src/model/unsubscribe.dart @@ -0,0 +1,28 @@ +import 'channel_push.dart'; + +/// {@template unsubscribe} +/// Unsubscribe push from Centrifugo server. +/// {@endtemplate} +/// {@category Event} +/// {@subCategory Push} +final class SpinifyUnsubscribe extends SpinifyChannelPush { + /// {@macro unsubscribe} + const SpinifyUnsubscribe({ + required super.timestamp, + required super.channel, + required this.code, + required this.reason, + }); + + @override + String get type => 'unsubscribe'; + + /// Code of unsubscribe. + final int code; + + /// Reason of unsubscribe. + final String reason; + + @override + String toString() => 'SpinifyUnsubscribe{channel: $channel}'; +} diff --git a/lib/src/spinify_impl.dart b/lib/src/spinify_impl.dart new file mode 100644 index 0000000..1c709c8 --- /dev/null +++ b/lib/src/spinify_impl.dart @@ -0,0 +1,226 @@ +import 'dart:async'; + +import 'package:meta/meta.dart'; + +import '../src.old/subscription/subscription.dart'; +import 'event_bus.dart'; +import 'model/config.dart'; +import 'model/history.dart'; +import 'model/metrics.dart'; +import 'model/presence.dart'; +import 'model/presence_stats.dart'; +import 'model/pushes_stream.dart'; +import 'model/state.dart'; +import 'model/states_stream.dart'; +import 'model/stream_position.dart'; +import 'model/subscription_config.dart'; +import 'spinify_interface.dart'; + +/// Spinify client events. +@internal +abstract interface class ClientEvents { + static const String prefix = 'client'; + static const String init = '${prefix}_init'; + static const String close = '${prefix}_close'; +} + +/// Base class for Spinify client. +abstract base class SpinifyBase implements ISpinify { + /// Create a new Spinify client. + SpinifyBase(this.config) : id = _idCounter++ { + _bucket = SpinifyEventBus.instance.registerClient(this); + _init(); + } + + /// Unique client ID counter for Spinify clients. + static int _idCounter = 0; + + @override + final int id; + + @override + bool get isClosed => _isClosed; + bool _isClosed = false; + + /// Spinify config. + @override + @nonVirtual + final SpinifyConfig config; + + /// Event Bus Bucket for client events and event subscriptions. + late final SpinifyEventBus$Bucket _bucket; + + @mustCallSuper + void _init() { + _bucket + ..pushEvent(ClientEvents.init) + ..subscribe(ClientEvents.close, _onClose); + } + + @mustCallSuper + Future _onClose(_) async { + _isClosed = true; + SpinifyEventBus.instance.unregisterClient(this); + } + + @override + @mustCallSuper + Future close() async { + if (_isClosed) return; + await _bucket.pushEvent(ClientEvents.close); + } + + @override + // ignore: avoid_equals_and_hash_code_on_mutable_classes + int get hashCode => id; + + @override + // ignore: avoid_equals_and_hash_code_on_mutable_classes + bool operator ==(Object other) => + identical(this, other) || other is SpinifyBase && id == other.id; + + @override + String toString() => 'Spinify{}'; +} + +base mixin SpinifyStateMixin on SpinifyBase { + @override + SpinifyState get state => throw UnimplementedError(); + + @override + SpinifyStatesStream get states => throw UnimplementedError(); +} + +base mixin SpinifyConnectionMixin on SpinifyBase { + @override + Future connect(String url) { + throw UnimplementedError(); + } + + @override + FutureOr ready() { + throw UnimplementedError(); + } + + @override + Future disconnect([int code = 0, String reason = 'Disconnect called']) { + throw UnimplementedError(); + } +} + +base mixin SpinifySendMixin on SpinifyBase { + @override + Future send(List data) { + throw UnimplementedError(); + } +} + +base mixin SpinifyClientSubscriptionMixin on SpinifyBase { + @override + ({ + Map client, + Map server + }) get subscriptions => throw UnimplementedError(); + + @override + SpinifyClientSubscription? getSubscription(String channel) { + throw UnimplementedError(); + } + + @override + SpinifyClientSubscription newSubscription(String channel, + [SpinifySubscriptionConfig? config]) { + throw UnimplementedError(); + } + + @override + Future removeSubscription(SpinifyClientSubscription subscription) { + throw UnimplementedError(); + } +} + +base mixin SpinifyServerSubscriptionMixin on SpinifyBase {} + +base mixin SpinifyPublicationsMixin on SpinifyBase { + @override + Future publish(String channel, List data) { + throw UnimplementedError(); + } +} + +base mixin SpinifyPresenceMixin on SpinifyBase { + @override + Future presence(String channel) { + throw UnimplementedError(); + } + + @override + Future presenceStats(String channel) { + throw UnimplementedError(); + } +} + +base mixin SpinifyHistoryMixin on SpinifyBase { + @override + Future history(String channel, + {int? limit, SpinifyStreamPosition? since, bool? reverse}) { + throw UnimplementedError(); + } +} + +base mixin SpinifyRPCMixin on SpinifyBase { + @override + Future> rpc(String method, List data) { + throw UnimplementedError(); + } +} + +base mixin SpinifyMetricsMixin on SpinifyBase { + @override + SpinifyMetrics get metrics => throw UnimplementedError(); +} + +/// {@template spinify} +/// Spinify client for Centrifuge. +/// +/// Centrifugo SDKs use WebSocket as the main data transport and send/receive +/// messages encoded according to our bidirectional protocol. +/// That protocol is built on top of the Protobuf schema +/// (both JSON and binary Protobuf formats are supported). +/// It provides asynchronous communication, sending RPC, +/// multiplexing subscriptions to channels, etc. +/// +/// Client SDK wraps the protocol and exposes a set of APIs to developers. +/// +/// Client connection has 4 states: +/// - [SpinifyState$Disconnected] +/// - [SpinifyState$Connecting] +/// - [SpinifyState$Connected] +/// - [SpinifyState$Closed] +/// +/// {@endtemplate} +/// {@category Client} +final class Spinify extends SpinifyBase + with + SpinifyStateMixin, + SpinifyConnectionMixin, + SpinifySendMixin, + SpinifyClientSubscriptionMixin, + SpinifyServerSubscriptionMixin, + SpinifyPublicationsMixin, + SpinifyPresenceMixin, + SpinifyHistoryMixin, + SpinifyRPCMixin, + SpinifyMetricsMixin { + /// {@macro spinify} + Spinify([SpinifyConfig? config]) : super(config ?? SpinifyConfig.byDefault()); + + /// Create client and connect. + /// + /// {@macro spinify} + factory Spinify.connect(String url, [SpinifyConfig? config]) => + Spinify(config)..connect(url); + + @override + SpinifyPushesStream get stream => throw UnimplementedError(); +} diff --git a/lib/src/client/spinify_interface.dart b/lib/src/spinify_interface.dart similarity index 86% rename from lib/src/client/spinify_interface.dart rename to lib/src/spinify_interface.dart index 52db27e..98de412 100644 --- a/lib/src/client/spinify_interface.dart +++ b/lib/src/spinify_interface.dart @@ -1,17 +1,16 @@ -// ignore_for_file: one_member_abstracts - import 'dart:async'; -import '../../src.old/client/state.dart'; -import '../../src.old/client/states_stream.dart'; -import '../../src.old/model/history.dart'; -import '../../src.old/model/metrics.dart'; -import '../../src.old/model/presence.dart'; -import '../../src.old/model/presence_stats.dart'; -import '../../src.old/model/pushes_stream.dart'; -import '../../src.old/model/stream_position.dart'; -import '../../src.old/subscription/subscription.dart'; -import '../../src.old/subscription/subscription_config.dart'; +import '../src.old/subscription/subscription.dart'; +import 'model/config.dart'; +import 'model/history.dart'; +import 'model/metrics.dart'; +import 'model/presence.dart'; +import 'model/presence_stats.dart'; +import 'model/pushes_stream.dart'; +import 'model/state.dart'; +import 'model/states_stream.dart'; +import 'model/stream_position.dart'; +import 'model/subscription_config.dart'; /// Spinify client interface. abstract interface class ISpinify @@ -25,6 +24,15 @@ abstract interface class ISpinify ISpinifyHistoryOwner, ISpinifyRemoteProcedureCall, ISpinifyMetricsOwner { + /// Unique client identifier. + abstract final int id; + + /// Spinify configuration. + abstract final SpinifyConfig config; + + /// True if client is closed. + bool get isClosed; + /// Connect to the server. /// [url] is a URL of endpoint. Future connect(String url); @@ -55,12 +63,14 @@ abstract interface class ISpinifyStateOwner { } /// Spinify send publication interface. +// ignore: one_member_abstracts abstract interface class ISpinifyPublicationSender { /// Publish data to specific subscription channel Future publish(String channel, List data); } /// Spinify send asynchronous message interface. +// ignore: one_member_abstracts abstract interface class ISpinifyAsyncMessageSender { /// Send asynchronous message to a server. This method makes sense /// only when using Centrifuge library for Go on a server side. In Centrifugo @@ -122,6 +132,7 @@ abstract interface class ISpinifyPresenceOwner { } /// Spinify history owner interface. +// ignore: one_member_abstracts abstract interface class ISpinifyHistoryOwner { /// Fetch publication history inside a channel. /// Only for channels where history is enabled. @@ -134,6 +145,7 @@ abstract interface class ISpinifyHistoryOwner { } /// Spinify remote procedure call interface. +// ignore: one_member_abstracts abstract interface class ISpinifyRemoteProcedureCall { /// Send arbitrary RPC and wait for response. Future> rpc(String method, List data); diff --git a/lib/src/subscription_interface.dart b/lib/src/subscription_interface.dart new file mode 100644 index 0000000..c5e26d3 --- /dev/null +++ b/lib/src/subscription_interface.dart @@ -0,0 +1,157 @@ +import 'dart:async'; + +import 'model/history.dart'; +import 'model/presence.dart'; +import 'model/presence_stats.dart'; +import 'model/pushes_stream.dart'; +import 'model/stream_position.dart'; +import 'model/subscription_state.dart'; +import 'model/subscription_states_stream.dart'; + +/// {@template subscription} +/// Spinify subscription interface. +/// +/// Client allows subscribing on channels. +/// This can be done by creating Subscription object. +/// +/// ```dart +/// final subscription = client.newSubscription('chat'); +/// await subscription.subscribe(); +/// ``` +/// When anewSubscription method is called Client allocates a new +/// Subscription instance and saves it in the internal subscription registry. +/// Having a registry of allocated subscriptions allows SDK to manage +/// resubscribes upon reconnecting to a server. +/// Centrifugo connectors do not allow creating two subscriptions to the +/// same channel – in this case, newSubscription can throw an exception. +/// +/// Subscription has 3 states: +/// - [SpinifySubscriptionState$Unsubscribed] +/// - [SpinifySubscriptionState$Subscribing] +/// - [SpinifySubscriptionState$Subscribed] +/// +/// When a new Subscription is created it has an unsubscribed state. +/// +/// - For client-side subscriptions see [SpinifyClientSubscription]. +/// - For server-side subscriptions see [SpinifyServerSubscription]. +/// {@endtemplate} +/// {@category Subscription} +sealed class SpinifySubscription { + /// Channel name. + abstract final String channel; + + /// Current subscription state. + abstract final SpinifySubscriptionState state; + + /// Offset of last successfully received message. + abstract final SpinifyStreamPosition? since; + + /// Stream of subscription states. + abstract final SpinifySubscriptionStateStream states; + + /// Stream of received pushes from Centrifugo server for a channel. + abstract final SpinifyPushesStream stream; + + /// Await for subscription to be ready. + /// Ready resolves when subscription successfully subscribed. + /// Throws exceptions if called not in subscribing or subscribed state. + FutureOr ready(); + + /// Publish data to current Subscription channel + Future publish(List data); + + /// Fetch publication history inside a channel. + /// Only for channels where history is enabled. + Future history({ + int? limit, + SpinifyStreamPosition? since, + bool? reverse, + }); + + /// Fetch presence information inside a channel. + Future presence(); + + /// Fetch presence stats information inside a channel. + Future presenceStats(); +} + +/// {@template client_subscription} +/// # Centrifuge client-side subscription representation. +/// +/// Client allows subscribing on channels. +/// This can be done by creating Subscription object. +/// +/// When a newSubscription method is called Client allocates a new Subscription +/// instance and saves it in the internal subscription registry. +/// Having a registry of allocated subscriptions allows SDK to manage +/// resubscribes upon reconnecting to a server. +/// +/// Centrifugo connectors do not allow creating two subscriptions +/// to the same channel – in this case, newSubscription can throw an exception. +/// +/// ## Subscription has 3 states: +/// +/// - `unsubscribed` +/// - `subscribing` +/// - `subscribed` +/// +/// When a new Subscription is created it has an `unsubscribed` state. +/// +/// ## Subscription common options +/// +/// There are several common options available when +/// creating Subscription instance: +/// +/// - option to set subscription token and callback to get subscription token +/// upon expiration (see below more details) +/// - option to set subscription data +/// (attached to every subscribe/resubscribe request) +/// - options to tweak resubscribe backoff algorithm +/// - option to start Subscription since known +/// Stream Position (i.e. attempt recovery on first subscribe) +/// - option to ask server to make subscription positioned +/// (if not forced by a server) +/// - option to ask server to make subscription recoverable +/// (if not forced by a server) +/// - option to ask server to push Join/Leave messages +/// (if not forced by a server) +/// +/// ## Subscription methods +/// +/// - subscribe() – start subscribing to a channel +/// - unsubscribe() - unsubscribe from a channel +/// - publish(data) - publish data to Subscription channel +/// - history(options) - request Subscription channel history +/// - presence() - request Subscription channel online presence information +/// - presenceStats() - request Subscription channel online presence stats +/// information (number of client connections and unique users in a channel). +/// +/// {@endtemplate} +/// {@category Subscription} +/// {@subCategory Client-side} +abstract class SpinifyClientSubscription extends SpinifySubscription { + /// Start subscribing to a channel + Future subscribe(); + + /// Unsubscribe from a channel + Future unsubscribe([ + int code = 0, + String reason = 'unsubscribe called', + ]); +} + +/// {@template server_subscription} +/// Centrifuge server-side subscription representation. +/// +/// We encourage using client-side subscriptions where possible +/// as they provide a better control and isolation from connection. +/// But in some cases you may want to use server-side subscriptions +/// (i.e. subscriptions created by server upon connection establishment). +/// +/// Technically, client SDK keeps server-side subscriptions +/// in internal registry, similar to client-side subscriptions +/// but without possibility to control them. +/// {@endtemplate} +/// {@category Subscription} +/// {@subCategory Server-side} +abstract class SpinifyServerSubscription extends SpinifySubscription {} diff --git a/pubspec.yaml b/pubspec.yaml index 17e4f48..003286a 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -38,7 +38,7 @@ platforms: environment: - sdk: '>=3.0.0 <4.0.0' + sdk: '>=3.3.0 <4.0.0' dependencies: diff --git a/test/unit/spinify_test.dart b/test/unit/spinify_test.dart index 8275090..fbe32a9 100644 --- a/test/unit/spinify_test.dart +++ b/test/unit/spinify_test.dart @@ -1,13 +1,29 @@ -import 'package:spinify/spinify.old.dart'; +import 'package:spinify/spinify.dart'; import 'package:test/test.dart'; -void main() => group('Spinify', () { - const url = 'ws://localhost:8000/connection/websocket'; +void main() { + group('Spinify', () { + test('Create_and_close_client', () async { + final client = Spinify(); + expect(client.isClosed, isFalse); + await client.close(); + expect(client.isClosed, isTrue); + }); + + test('Create_and_close_multiple_clients', () async { + final clients = List.generate(10, (_) => Spinify()); + expect(clients.every((client) => !client.isClosed), isTrue); + await Future.wait(clients.map((client) => client.close())); + expect(clients.every((client) => client.isClosed), isTrue); + }); + + /* const url = 'ws://localhost:8000/connection/websocket'; test('Connection', () async { final client = Spinify(); await client.connect(url); expect(client.state, isA()); await client.disconnect(); expect(client.state, isA()); - }); - }); + }); */ + }); +}