Skip to content

Commit

Permalink
Update subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Aug 3, 2023
1 parent 0f8a39d commit 1788237
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 71 deletions.
4 changes: 1 addition & 3 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,7 @@ base mixin CentrifugeConnectionMixin
try {
_refreshTimer?.cancel();
_refreshTimer = null;
final subs = _serverSubscriptionManager.subscriptions.values
.toList(growable: false);
await _transport.connect(url, subs);
await _transport.connect(url, _serverSubscriptionManager);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
Expand Down
45 changes: 31 additions & 14 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ abstract base class CentrifugeClientSubscriptionBase
@override
CentrifugeStreamPosition? get since => switch (state.since?.epoch) {
String epoch => (epoch: epoch, offset: _offset),
_ => null,
_ => state.since,
};

/// Weak reference to transport.
Expand Down Expand Up @@ -133,7 +133,14 @@ abstract base class CentrifugeClientSubscriptionBase
/// {@nodoc}
@internal
@mustCallSuper
Future<void> close() async {
Future<void> close([int code = 0, String reason = 'closed']) async {
if (!_state.isUnsubscribed)
_setState(CentrifugeSubscriptionState.unsubscribed(
code: code,
reason: reason,
recoverable: false,
since: since,
));
_stateController.close().ignore();
}
}
Expand Down Expand Up @@ -223,8 +230,8 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin
}

@override
Future<void> close() async {
await super.close();
Future<void> close([int code = 0, String reason = 'closed']) async {
await super.close(code, reason);
for (final controller in <StreamSink<CentrifugeEvent>>[
_pushController,
_publicationsController,
Expand Down Expand Up @@ -263,8 +270,8 @@ base mixin CentrifugeClientSubscriptionErrorsMixin
errors = _errorsController.stream;

@override
Future<void> close() async {
await super.close();
Future<void> close([int code = 0, String reason = 'closed']) async {
await super.close(code, reason);
_errorsController.close().ignore();
}
}
Expand Down Expand Up @@ -294,7 +301,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
_refreshTimer?.cancel();
_refreshTimer = null;
_setState(CentrifugeSubscriptionState$Subscribing(
since: since ?? state.since,
since: since,
recoverable: state.recoverable,
));
final subscribed = await _transport.subscribe(
Expand All @@ -311,7 +318,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
final offset = subscribed.since?.offset;
if (offset != null && offset > _offset) _offset = offset;
_setState(CentrifugeSubscriptionState$Subscribed(
since: subscribed.since ?? since ?? state.since,
since: subscribed.since ?? since,
recoverable: subscribed.recoverable,
ttl: subscribed.ttl,
));
Expand Down Expand Up @@ -389,9 +396,10 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
_setState(CentrifugeSubscriptionState.unsubscribed(
code: code,
reason: reason,
since: since ?? state.since,
since: since,
recoverable: state.recoverable,
));
if (_transport.state.isClosed) return;
try {
await _transport.unsubscribe(channel, _config);
} on Object catch (error, stackTrace) {
Expand Down Expand Up @@ -451,12 +459,21 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
}).ignore();

@override
Future<void> close() async {
Future<void> close([int code = 0, String reason = 'closed']) async {
logger.fine('Closing subscription to $channel');
_refreshTimer?.cancel();
_refreshTimer = null;
await super.close();
await _transport.close();
try {
if (!state.isUnsubscribed) await unsubscribe(code, reason);
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeSubscriptionException(
message: 'Error while unsubscribing from channel $channel',
channel: channel,
error: error,
);
_emitError(centrifugeException, stackTrace);
}
await super.close(code, reason);
}
}

Expand Down Expand Up @@ -617,7 +634,7 @@ base mixin CentrifugeClientSubscriptionQueueMixin
.push<CentrifugePresenceStats>('presenceStats', super.presenceStats);

@override
Future<void> close() => _eventQueue
.push<void>('close', super.close)
Future<void> close([int code = 0, String reason = 'closed']) => _eventQueue
.push<void>('close', () => super.close(code, reason))
.whenComplete(_eventQueue.close);
}
67 changes: 57 additions & 10 deletions lib/src/subscription/server_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import 'package:centrifuge_dart/src/subscription/subscription_state.dart';
import 'package:centrifuge_dart/src/subscription/subscription_states_stream.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
import 'package:centrifuge_dart/src/util/event_queue.dart';
import 'package:centrifuge_dart/src/util/logger.dart' as logger;
import 'package:fixnum/fixnum.dart' as fixnum;
import 'package:meta/meta.dart';
import 'package:stack_trace/stack_trace.dart' as st;
Expand Down Expand Up @@ -64,7 +65,7 @@ abstract base class CentrifugeServerSubscriptionBase
@override
CentrifugeStreamPosition? get since => switch (state.since?.epoch) {
String epoch => (epoch: epoch, offset: _offset),
_ => null,
_ => state.since,
};

/// Offset of last received publication.
Expand Down Expand Up @@ -127,7 +128,14 @@ abstract base class CentrifugeServerSubscriptionBase
/// {@nodoc}
@internal
@mustCallSuper
Future<void> close() async {
Future<void> close([int code = 0, String reason = 'closed']) async {
if (!_state.isUnsubscribed)
_setState(CentrifugeSubscriptionState.unsubscribed(
code: 0,
reason: 'closed',
recoverable: false,
since: since,
));
_stateController.close().ignore();
}
}
Expand Down Expand Up @@ -207,15 +215,15 @@ base mixin CentrifugeServerSubscriptionEventReceiverMixin
final offset = sub.streamPosition?.offset;
if (offset != null && offset > _offset) _offset = offset;
_setState(CentrifugeSubscriptionState.subscribed(
since: sub.streamPosition ?? since ?? state.since,
since: sub.streamPosition ?? since,
recoverable: sub.recoverable,
));
case CentrifugeUnsubscribe unsub:
_setState(CentrifugeSubscriptionState.unsubscribed(
code: unsub.code,
reason: unsub.reason,
recoverable: state.recoverable,
since: since ?? state.since,
since: since,
));
case CentrifugeConnect _:
break;
Expand All @@ -227,8 +235,8 @@ base mixin CentrifugeServerSubscriptionEventReceiverMixin
}

@override
Future<void> close() async {
await super.close();
Future<void> close([int code = 0, String reason = 'closed']) async {
await super.close(code, reason);
for (final controller in <StreamSink<CentrifugeEvent>>[
_pushController,
_publicationsController,
Expand Down Expand Up @@ -267,8 +275,8 @@ base mixin CentrifugeServerSubscriptionErrorsMixin
errors = _errorsController.stream;

@override
Future<void> close() async {
await super.close();
Future<void> close([int code = 0, String reason = 'closed']) async {
await super.close(code, reason);
_errorsController.close().ignore();
}
}
Expand Down Expand Up @@ -309,6 +317,45 @@ base mixin CentrifugeServerSubscriptionReadyMixin
Error.throwWithStackTrace(centrifugeException, stackTrace);
}
}

/// Mark subscription as ready.
/// {@nodoc}
void setSubscribed() {
if (!state.isSubscribed)
_setState(CentrifugeSubscriptionState.subscribed(
since: since,
recoverable: state.recoverable,
));
}

/// Mark subscription as subscribing.
/// {@nodoc}
void setSubscribing() {
if (!state.isSubscribing)
_setState(CentrifugeSubscriptionState.subscribing(
since: since,
recoverable: state.recoverable,
));
}

/// Mark subscription as unsubscribed.
/// {@nodoc}
void setUnsubscribed(int code, String reason) {
if (!state.isUnsubscribed)
_setState(CentrifugeSubscriptionState.unsubscribed(
code: code,
reason: reason,
recoverable: state.recoverable,
since: since,
));
}

@override
Future<void> close([int code = 0, String reason = 'closed']) async {
logger.fine('Closing subscription to $channel');
if (!state.isUnsubscribed) setUnsubscribed(code, reason);
await super.close(code, reason);
}
}

/// Mixin responsible for publishing.
Expand Down Expand Up @@ -452,7 +499,7 @@ base mixin CentrifugeServerSubscriptionQueueMixin
.push<CentrifugePresenceStats>('presenceStats', super.presenceStats);

@override
Future<void> close() => _eventQueue
.push<void>('close', super.close)
Future<void> close([int code = 0, String reason = 'closed']) => _eventQueue
.push<void>('close', () => super.close(code, reason))
.whenComplete(_eventQueue.close);
}
43 changes: 38 additions & 5 deletions lib/src/subscription/server_subscription_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,31 +43,64 @@ final class ServerSubscriptionManager {
/// Called on [CentrifugeUnsubscribe] push from server.
void unsubscribe(CentrifugeUnsubscribe subscribe) {}

/// Called when client finished connection handshake with server.
/// Add non existing subscriptions to registry and mark all connected.
/// Remove subscriptions which are not in [subs] argument.
void upsert(List<CentrifugeSubscribe> subs) {
final currentChannels = _channelSubscriptions.keys.toSet();
// Remove subscriptions which are not in subs argument.
for (final channel in currentChannels) {
if (subs.any((e) => e.channel == channel)) continue;
_channelSubscriptions.remove(channel)?.close();
}
// Add non existing subscriptions to registry and mark all connected.
for (final sub in subs) {
(_channelSubscriptions[sub.channel] ??= CentrifugeServerSubscriptionImpl(
channel: sub.channel,
transportWeakRef: _transportWeakRef,
))
.onPush(sub);
}
}

/// Called when subscribed to a server-side channel upon Client moving to
/// connected state or during connection lifetime if server sends Subscribe
/// push message.
/// {@nodoc}
void setSubscribedAll() {
for (final entry in _channelSubscriptions.values) {}
for (final entry in _channelSubscriptions.values) {
if (entry.state.isSubscribed) continue;
}
}

/// Called when existing connection lost (Client reconnects) or Client
/// explicitly disconnected. Client continue keeping server-side subscription
/// registry with stream position information where applicable.
/// {@nodoc}
void setSubscribingAll() {
for (final entry in _channelSubscriptions.values) {}
for (final entry in _channelSubscriptions.values) {
if (entry.state.isSubscribing) continue;
}
}

/// Called when server sent unsubscribe push or server-side subscription
/// previously existed in SDK registry disappeared upon Client reconnect.
/// {@nodoc}
void setUnsubscribedAll() {
for (final entry in _channelSubscriptions.values) {}
for (final entry in _channelSubscriptions.values) {
if (entry.state.isUnsubscribed) continue;
}
}

void close() {
setUnsubscribedAll();
/// Close all subscriptions.
/// {@nodoc}
void close([
int code = 0,
String reason = 'client closed',
]) {
for (final entry in _channelSubscriptions.values) {
entry.close(code, reason).ignore();
}
_channelSubscriptions.clear();
}

Expand Down
6 changes: 5 additions & 1 deletion lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
import 'package:centrifuge_dart/src/model/refresh.dart';
import 'package:centrifuge_dart/src/model/stream_position.dart';
import 'package:centrifuge_dart/src/subscription/server_subscription_manager.dart';
import 'package:centrifuge_dart/src/subscription/subcibed_on_channel.dart';
import 'package:centrifuge_dart/src/util/notifier.dart';
import 'package:meta/meta.dart';
Expand All @@ -31,7 +32,10 @@ abstract interface class ICentrifugeTransport {
/// [url] is a URL of endpoint.
/// [subs] is a list of server-side subscriptions to subscribe on connect.
/// {@nodoc}
Future<void> connect(String url, List<CentrifugeServerSubscription> subs);
Future<void> connect(
String url,
ServerSubscriptionManager serverSubscriptionManager,
);

/// Send asynchronous message to a server. This method makes sense
/// only when using Centrifuge library for Go on a server side. In Centrifuge
Expand Down
Loading

0 comments on commit 1788237

Please sign in to comment.