Skip to content

Commit

Permalink
Add since to state
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Aug 2, 2023
1 parent 3672a95 commit 4f31fc4
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 37 deletions.
28 changes: 21 additions & 7 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,10 @@ base mixin CentrifugeEventReceiverMixin
'and user ${leave.info.user}');
_presenceController.add(leave);
_leaveController.add(leave);
case CentrifugeSubscribe _:
break; // For server side subscriptions.
case CentrifugeUnsubscribe _:
break; // For server side subscriptions.
case CentrifugeSubscribe subscribe:
_serverSubscriptionManager.subscribe(subscribe);
case CentrifugeUnsubscribe unsubscribe:
_serverSubscriptionManager.unsubscribe(unsubscribe);
case CentrifugeConnect _:
break;
case CentrifugeDisconnect event:
Expand Down Expand Up @@ -383,7 +383,9 @@ base mixin CentrifugeConnectionMixin
try {
_refreshTimer?.cancel();
_refreshTimer = null;
await _transport.connect(url);
final subs = _serverSubscriptionManager.subscriptions.values
.toList(growable: false);
await _transport.connect(url, subs);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
Expand Down Expand Up @@ -544,18 +546,30 @@ base mixin CentrifugeClientSubscriptionMixin
@override
Future<void> close() async {
await super.close();
_clientSubscriptionManager.removeAll();
_clientSubscriptionManager.close();
}
}

/// Mixin responsible for server-side subscriptions.
/// {@nodoc}
@internal
base mixin CentrifugeServerSubscriptionMixin on CentrifugeBase {
@override
void _onConnected(CentrifugeState$Connected state) {
super._onConnected(state);
_serverSubscriptionManager.setSubscribedAll();
}

@override
void _onDisconnected(CentrifugeState$Disconnected state) {
super._onDisconnected(state);
_serverSubscriptionManager.setSubscribingAll();
}

@override
Future<void> close() async {
await super.close();
_serverSubscriptionManager.setUnsubscribed();
_serverSubscriptionManager.close();
}
}

Expand Down
16 changes: 13 additions & 3 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ abstract base class CentrifugeClientSubscriptionBase
/// Offset of last received publication.
late fixnum.Int64 _offset;

@override
CentrifugeStreamPosition? get since => switch (state.since?.epoch) {
String epoch => (epoch: epoch, offset: _offset),
_ => null,
};

/// Weak reference to transport.
/// {@nodoc}
@nonVirtual
Expand Down Expand Up @@ -287,7 +293,10 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
}
_refreshTimer?.cancel();
_refreshTimer = null;
_setState(CentrifugeSubscriptionState$Subscribing(since: state.since));
_setState(CentrifugeSubscriptionState$Subscribing(
since: since ?? state.since,
recoverable: state.recoverable,
));
final subscribed = await _transport.subscribe(
channel,
_config,
Expand All @@ -302,7 +311,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
final offset = subscribed.since?.offset;
if (offset != null && offset > _offset) _offset = offset;
_setState(CentrifugeSubscriptionState$Subscribed(
since: subscribed.since,
since: subscribed.since ?? since ?? state.since,
recoverable: subscribed.recoverable,
ttl: subscribed.ttl,
));
Expand Down Expand Up @@ -380,7 +389,8 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
_setState(CentrifugeSubscriptionState.unsubscribed(
code: code,
reason: reason,
since: state.since,
since: since ?? state.since,
recoverable: state.recoverable,
));
try {
await _transport.unsubscribe(channel, _config);
Expand Down
2 changes: 1 addition & 1 deletion lib/src/subscription/client_subscription_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ final class ClientSubscriptionManager {

/// Remove all subscriptions for the specific client from internal registry.
/// {@nodoc}
void removeAll([
void close([
int code = 0,
String reason = 'client closed',
]) {
Expand Down
13 changes: 11 additions & 2 deletions lib/src/subscription/server_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ abstract base class CentrifugeServerSubscriptionBase
@override
final String channel;

@override
CentrifugeStreamPosition? get since => switch (state.since?.epoch) {
String epoch => (epoch: epoch, offset: _offset),
_ => null,
};

/// Offset of last received publication.
fixnum.Int64 _offset = fixnum.Int64.ZERO;

Expand Down Expand Up @@ -198,15 +204,18 @@ base mixin CentrifugeServerSubscriptionEventReceiverMixin
_presenceController.add(leave);
_leaveController.add(leave);
case CentrifugeSubscribe sub:
final offset = sub.streamPosition?.offset;
if (offset != null && offset > _offset) _offset = offset;
_setState(CentrifugeSubscriptionState.subscribed(
since: sub.streamPosition,
since: sub.streamPosition ?? since ?? state.since,
recoverable: sub.recoverable,
));
case CentrifugeUnsubscribe unsub:
_setState(CentrifugeSubscriptionState.unsubscribed(
code: unsub.code,
reason: unsub.reason,
since: state.since,
recoverable: state.recoverable,
since: since ?? state.since,
));
case CentrifugeConnect _:
break;
Expand Down
16 changes: 12 additions & 4 deletions lib/src/subscription/server_subscription_manager.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import 'dart:collection';

import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/subscribe.dart';
import 'package:centrifuge_dart/src/model/unsubscribe.dart';
import 'package:centrifuge_dart/src/subscription/server_subscription_impl.dart';
import 'package:centrifuge_dart/src/subscription/subscription.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
Expand Down Expand Up @@ -35,31 +37,37 @@ final class ServerSubscriptionManager {
entry.key: entry.value,
});

/// Called on [CentrifugeSubscribe] push from server.
void subscribe(CentrifugeSubscribe subscribe) {}

/// Called on [CentrifugeUnsubscribe] push from server.
void unsubscribe(CentrifugeUnsubscribe subscribe) {}

/// Called when subscribed to a server-side channel upon Client moving to
/// connected state or during connection lifetime if server sends Subscribe
/// push message.
/// {@nodoc}
void setSubscribed() {
void setSubscribedAll() {
for (final entry in _channelSubscriptions.values) {}
}

/// Called when existing connection lost (Client reconnects) or Client
/// explicitly disconnected. Client continue keeping server-side subscription
/// registry with stream position information where applicable.
/// {@nodoc}
void setSubscribing() {
void setSubscribingAll() {
for (final entry in _channelSubscriptions.values) {}
}

/// Called when server sent unsubscribe push or server-side subscription
/// previously existed in SDK registry disappeared upon Client reconnect.
/// {@nodoc}
void setUnsubscribed() {
void setUnsubscribedAll() {
for (final entry in _channelSubscriptions.values) {}
}

void close() {
setUnsubscribed();
setUnsubscribedAll();
_channelSubscriptions.clear();
}

Expand Down
3 changes: 3 additions & 0 deletions lib/src/subscription/subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ abstract interface class ICentrifugeSubscription {
/// Current subscription state.
abstract final CentrifugeSubscriptionState state;

/// Offset of last successfully received message.
abstract final CentrifugeStreamPosition? since;

/// Stream of subscription states.
abstract final CentrifugeSubscriptionStateStream states;

Expand Down
38 changes: 29 additions & 9 deletions lib/src/subscription/subscription_state.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import 'package:meta/meta.dart';
sealed class CentrifugeSubscriptionState
extends _$CentrifugeSubscriptionStateBase {
/// {@macro subscription_state}
const CentrifugeSubscriptionState(super.timestamp, super.since);
const CentrifugeSubscriptionState(
{required super.timestamp,
required super.since,
required super.recoverable});

/// Unsubscribed
/// {@macro subscription_state}
Expand All @@ -25,13 +28,15 @@ sealed class CentrifugeSubscriptionState
required String reason,
DateTime? timestamp,
({fixnum.Int64 offset, String epoch})? since,
bool recoverable,
}) = CentrifugeSubscriptionState$Unsubscribed;

/// Subscribing
/// {@macro subscription_state}
factory CentrifugeSubscriptionState.subscribing({
DateTime? timestamp,
({fixnum.Int64 offset, String epoch})? since,
bool recoverable,
}) = CentrifugeSubscriptionState$Subscribing;

/// Subscribed
Expand All @@ -57,7 +62,11 @@ final class CentrifugeSubscriptionState$Unsubscribed
required this.reason,
DateTime? timestamp,
({fixnum.Int64 offset, String epoch})? since,
}) : super(timestamp ?? DateTime.now(), since);
bool recoverable = false,
}) : super(
timestamp: timestamp ?? DateTime.now(),
since: since,
recoverable: recoverable);

/// Unsubscribe code.
final int code;
Expand Down Expand Up @@ -108,7 +117,11 @@ final class CentrifugeSubscriptionState$Subscribing
CentrifugeSubscriptionState$Subscribing({
DateTime? timestamp,
({fixnum.Int64 offset, String epoch})? since,
}) : super(timestamp ?? DateTime.now(), since);
bool recoverable = false,
}) : super(
timestamp: timestamp ?? DateTime.now(),
since: since,
recoverable: recoverable);

@override
bool get isUnsubscribed => false;
Expand Down Expand Up @@ -153,12 +166,12 @@ final class CentrifugeSubscriptionState$Subscribed
CentrifugeSubscriptionState$Subscribed({
DateTime? timestamp,
({fixnum.Int64 offset, String epoch})? since,
this.recoverable = false,
bool recoverable = false,
this.ttl,
}) : super(timestamp ?? DateTime.now(), since);

/// Whether channel is recoverable.
final bool recoverable;
}) : super(
timestamp: timestamp ?? DateTime.now(),
since: since,
recoverable: recoverable);

/// Time to live in seconds.
final DateTime? ttl;
Expand Down Expand Up @@ -208,14 +221,21 @@ typedef CentrifugeSubscriptionStateMatch<R,
@immutable
abstract base class _$CentrifugeSubscriptionStateBase {
/// {@nodoc}
const _$CentrifugeSubscriptionStateBase(this.timestamp, this.since);
const _$CentrifugeSubscriptionStateBase({
required this.timestamp,
required this.since,
required this.recoverable,
});

/// Timestamp of state change.
final DateTime timestamp;

/// Stream Position
final ({fixnum.Int64 offset, String epoch})? since;

/// Whether channel is recoverable.
final bool recoverable;

/// Whether channel is unsubscribed.
abstract final bool isUnsubscribed;

Expand Down
3 changes: 2 additions & 1 deletion lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ abstract interface class ICentrifugeTransport {

/// Connect to the server.
/// [url] is a URL of endpoint.
/// [subs] is a list of server-side subscriptions to subscribe on connect.
/// {@nodoc}
Future<void> connect(String url);
Future<void> connect(String url, List<CentrifugeServerSubscription> subs);

/// Send asynchronous message to a server. This method makes sense
/// only when using Centrifuge library for Go on a server side. In Centrifuge
Expand Down
Loading

0 comments on commit 4f31fc4

Please sign in to comment.