Skip to content

Commit

Permalink
ClientSubscriptionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 23, 2023
1 parent 9a113de commit 3c17028
Show file tree
Hide file tree
Showing 12 changed files with 472 additions and 34 deletions.
3 changes: 3 additions & 0 deletions lib/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ export 'package:centrifuge_dart/src/model/exception.dart';
export 'package:centrifuge_dart/src/model/jwt.dart';
export 'package:centrifuge_dart/src/model/state.dart';
export 'package:centrifuge_dart/src/model/states_stream.dart';
export 'package:centrifuge_dart/src/model/subscription.dart';
export 'package:centrifuge_dart/src/model/subscription_config.dart';
export 'package:centrifuge_dart/src/model/subscription_state.dart';
4 changes: 4 additions & 0 deletions lib/interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@ export 'src/model/config.dart';
export 'src/model/exception.dart';
export 'src/model/jwt.dart';
export 'src/model/state.dart';
export 'src/model/states_stream.dart';
export 'src/model/subscription.dart';
export 'src/model/subscription_config.dart';
export 'src/model/subscription_state.dart';
84 changes: 78 additions & 6 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import 'package:centrifuge_dart/src/model/config.dart';
import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/state.dart';
import 'package:centrifuge_dart/src/model/states_stream.dart';
import 'package:centrifuge_dart/src/model/subscription.dart';
import 'package:centrifuge_dart/src/model/subscription_config.dart';
import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
import 'package:centrifuge_dart/src/transport/ws_protobuf_transport.dart';
import 'package:centrifuge_dart/src/util/logger.dart' as logger;
Expand All @@ -15,10 +18,13 @@ import 'package:stack_trace/stack_trace.dart' as st;
/// Centrifuge client.
/// {@endtemplate}
final class Centrifuge extends CentrifugeBase
with CentrifugeErrorsMixin, CentrifugeConnectionMixin {
with
CentrifugeErrorsMixin,
CentrifugeConnectionMixin,
CentrifugeClientSubscriptionMixin {
/// {@macro centrifuge}
Centrifuge([CentrifugeConfig? config])
: super(config ?? CentrifugeConfig.defaultConfig());
: super(config ?? CentrifugeConfig.byDefault());

/// Create client and connect.
///
Expand All @@ -31,17 +37,19 @@ final class Centrifuge extends CentrifugeBase
@internal
abstract base class CentrifugeBase implements ICentrifuge {
/// {@nodoc}
CentrifugeBase(CentrifugeConfig config)
: _transport = CentrifugeWebSocketProtobufTransport(config),
_config = config {
CentrifugeBase(CentrifugeConfig config) : _config = config {
_transport = CentrifugeWebSocketProtobufTransport(
config: config,
disconnectCallback: _onDisconnect,
);
_initCentrifuge();
}

/// Internal transport responsible
/// for sending, receiving, encoding and decoding data from the server.
/// {@nodoc}
@nonVirtual
final ICentrifugeTransport _transport;
late final ICentrifugeTransport _transport;

@override
@nonVirtual
Expand All @@ -63,6 +71,15 @@ abstract base class CentrifugeBase implements ICentrifuge {
@mustCallSuper
void _initCentrifuge() {}

/// Called when connection lost.
/// Right before [CentrifugeState$Disconnected] state.
/// {@nodoc}
@protected
@mustCallSuper
void _onDisconnect() {
logger.fine('Connection lost');
}

@override
@mustCallSuper
Future<void> close() async {}
Expand Down Expand Up @@ -139,3 +156,58 @@ base mixin CentrifugeConnectionMixin on CentrifugeBase, CentrifugeErrorsMixin {
await _transport.close();
}
}

/// Mixin responsible for client-side subscriptions.
/// {@nodoc}
base mixin CentrifugeClientSubscriptionMixin
on CentrifugeBase, CentrifugeErrorsMixin {
static final ClientSubscriptionManager _clientSubscriptionManager =
ClientSubscriptionManager();

@override
CentrifugeClientSubscription newSubscription(
String channel, [
CentrifugeSubscriptionConfig? config,
]) =>
_clientSubscriptionManager.newSubscription(channel, config, this);

@override
Map<String, CentrifugeClientSubscription> get subscriptions =>
_clientSubscriptionManager.subscriptions;

@override
CentrifugeClientSubscription? getSubscription(String channel) =>
_clientSubscriptionManager[channel];

@override
Future<void> removeSubscription(
CentrifugeClientSubscription subscription,
) async {
try {
await _clientSubscriptionManager.removeSubscription(subscription);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeSubscriptionException(
subscription: subscription,
message: 'Error while unsubscribing',
error: error,
);
_emitError(centrifugeException, stackTrace);
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}

@override
void _onDisconnect() {
super._onDisconnect();
_clientSubscriptionManager.disconnectAllFor(this).ignore();
}

@override
Future<void> close() async {
await super.close();
_clientSubscriptionManager.removeAllFor(this).ignore();
}
}
50 changes: 43 additions & 7 deletions lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
// ignore_for_file: one_member_abstracts

import 'package:centrifuge_dart/centrifuge.dart';

/// Centrifuge client interface.
abstract interface class ICentrifuge {
/// State of client.
CentrifugeState get state;

/// Stream of client states.
abstract final CentrifugeStatesStream states;

abstract interface class ICentrifuge
implements ICentrifugeStateOwner, ICentrifugeClientSubscriptionsManager {
/// Stream of errors.
abstract final Stream<
({CentrifugeException exception, StackTrace stackTrace})> errors;
Expand Down Expand Up @@ -37,3 +34,42 @@ abstract interface class ICentrifuge {
/// Send arbitrary RPC and wait for response.
/* Future<void> rpc(String method, data); */
}

/// Centrifuge client state owner interface.
abstract interface class ICentrifugeStateOwner {
/// State of client.
CentrifugeState get state;

/// Stream of client states.
abstract final CentrifugeStatesStream states;
}

/// Centrifuge client subscriptions manager interface.
abstract interface class ICentrifugeClientSubscriptionsManager {
/// Create new client-side subscription.
/// `newSubscription(channel, config)` allocates a new Subscription
/// in the registry or throws an exception if the Subscription
/// is already there. We will discuss common Subscription options below.
CentrifugeClientSubscription newSubscription(
String channel, [
CentrifugeSubscriptionConfig? config,
]);

/// Get subscription to the channel
/// from internal registry or null if not found.
///
/// You need to call [CentrifugeClientSubscription.subscribe]
/// to start receiving events
/// in the channel.
CentrifugeClientSubscription? getSubscription(String channel);

/// Remove the [Subscription] from internal registry
/// and unsubscribe from [CentrifugeClientSubscription.channel].
Future<void> removeSubscription(CentrifugeClientSubscription subscription);

/// Get map wirth all registered client-side subscriptions.
/// Returns all registered subscriptions,
/// so you can iterate over all and do some action if required
/// (for example, you want to unsubscribe/remove all subscriptions).
Map<String, CentrifugeClientSubscription> get subscriptions;
}
14 changes: 6 additions & 8 deletions lib/src/model/config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,15 @@ final class CentrifugeConfig {
CentrifugeConfig({
this.getToken,
this.getPayload,
({Duration min, Duration max})? connectionRetryInterval,
this.connectionRetryInterval = (
min: const Duration(milliseconds: 500),
max: const Duration(seconds: 20),
),
({String name, String version})? client,
this.timeout = const Duration(seconds: 15),
this.serverPingDelay = const Duration(seconds: 8),
this.headers,
}) : connectionRetryInterval = connectionRetryInterval ??
(
min: const Duration(milliseconds: 500),
max: const Duration(seconds: 30),
),
client = client ??
}) : client = client ??
(
name: Pubspec.name,
version: Pubspec.version.canonical,
Expand All @@ -54,7 +52,7 @@ final class CentrifugeConfig {
/// Create a default config
///
/// {@macro centrifuge_config}
factory CentrifugeConfig.defaultConfig() = CentrifugeConfig;
factory CentrifugeConfig.byDefault() = CentrifugeConfig;

/// Callback to get/refresh tokens
/// This callback is used for initial connection
Expand Down
36 changes: 36 additions & 0 deletions lib/src/model/disconnect_code.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import 'package:meta/meta.dart';

/// Disconnect codes.
///
/// Server may send custom disconnect codes to a client.
/// Custom disconnect codes must be in range [3000, 4999].
///
/// Client automatically reconnects upon receiving code
/// in range 3000-3499, 4000-4499 (i.e. Client goes to connecting state).
/// Other codes result into going to disconnected state.
///
/// Client implementation can use codes <3000 for client-side
/// specific disconnect reasons.
/// {@nodoc}
@internal
enum DisconnectCodes {
/// Disconnect called
disconnectCalled(0, 'disconnect called'),

/// Unauthorized
unauthorized(1, 'unauthorized'),

/// Bad protocol
badProtocol(2, 'bad protocol'),

/// Client message write error
messageSizeLimit(3, 'message size limit exceeded');

const DisconnectCodes(this.code, this.reason);

/// Disconnect code.
final int code;

/// Disconnect reason.
final String reason;
}
18 changes: 18 additions & 0 deletions lib/src/model/exception.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'package:centrifuge_dart/interface.dart';
import 'package:meta/meta.dart';

/// {@template exception}
Expand Down Expand Up @@ -82,3 +83,20 @@ final class CentrifugePingException extends CentrifugeException {
error,
);
}

/// {@macro exception}
final class CentrifugeSubscriptionException extends CentrifugeException {
/// {@macro exception}
const CentrifugeSubscriptionException({
required this.subscription,
required String message,
Object? error,
}) : super(
'centrifuge_subscription_exception',
message,
error,
);

/// Subscription
final ICentrifugeSubscription subscription;
}
1 change: 0 additions & 1 deletion lib/src/model/messages.dart

This file was deleted.

38 changes: 33 additions & 5 deletions lib/src/model/subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ abstract interface class ICentrifugeSubscription {
}

/// {@template client_subscription}
/// Centrifuge client-side subscription representation.
/// # Centrifuge client-side subscription representation.
///
/// Client allows subscribing on channels.
/// This can be done by creating Subscription object.
Expand All @@ -23,20 +23,48 @@ abstract interface class ICentrifugeSubscription {
/// Centrifugo connectors do not allow creating two subscriptions
/// to the same channel – in this case, newSubscription can throw an exception.
///
/// Subscription has 3 states:
/// ## Subscription has 3 states:
///
/// - `unsubscribed`
/// - `subscribing`
/// - `subscribed`
///
/// When a new Subscription is created it has an `unsubscribed` state.
///
/// ## Subscription common options
///
/// There are several common options available when
/// creating Subscription instance:
///
/// - option to set subscription token and callback to get subscription token
/// upon expiration (see below more details)
/// - option to set subscription data
/// (attached to every subscribe/resubscribe request)
/// - options to tweak resubscribe backoff algorithm
/// - option to start Subscription since known
/// Stream Position (i.e. attempt recovery on first subscribe)
/// - option to ask server to make subscription positioned
/// (if not forced by a server)
/// - option to ask server to make subscription recoverable
/// (if not forced by a server)
/// - option to ask server to push Join/Leave messages
/// (if not forced by a server)
///
/// ## Subscription methods
///
/// - subscribe() – start subscribing to a channel
/// - unsubscribe() - unsubscribe from a channel
/// - publish(data) - publish data to Subscription channel
/// - history(options) - request Subscription channel history
/// - presence() - request Subscription channel online presence information
/// - presenceStats() - request Subscription channel online presence stats
/// information (number of client connections and unique users in a channel).
///
/// {@endtemplate}
@immutable
final class CentrifugeClientSubscription implements ICentrifugeSubscription {
/// {@macro client_subscription}
const CentrifugeClientSubscription({
required this.channel,
});
const CentrifugeClientSubscription({required this.channel});

@override
final String channel;
Expand Down
Loading

0 comments on commit 3c17028

Please sign in to comment.