Skip to content

Commit

Permalink
Add transport implementation for subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Jul 27, 2023
1 parent cd69c87 commit 5c5a2db
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 31 deletions.
2 changes: 1 addition & 1 deletion lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ base mixin CentrifugeClientSubscriptionMixin
rethrow;
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeSubscriptionException(
subscription: subscription,
channel: subscription.channel,
message: 'Error while unsubscribing',
error: error,
);
Expand Down
6 changes: 2 additions & 4 deletions lib/src/model/config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ typedef CentrifugeToken = String;
typedef CentrifugeTokenCallback = FutureOr<CentrifugeToken?> Function();

/// Callback to get initial connection payload data.
/// For example to send JWT or other auth data.
///
/// If method returns null then no payload will be sent at connect time.
typedef CentrifugeConnectionPayloadCallback = FutureOr<List<int>?> Function();
Expand Down Expand Up @@ -63,9 +62,8 @@ final class CentrifugeConfig {
/// If method returns null then connection will be established without token.
final CentrifugeTokenCallback? getToken;

/// The data send for the first request
/// Callback to get initial connection payload data.
/// For example to send JWT or other auth data.
/// Callback to get connection payload data.
/// The resulted data send with every connect request.
///
/// If method returns null then no payload will be sent at connect time.
final CentrifugeConnectionPayloadCallback? getPayload;
Expand Down
7 changes: 3 additions & 4 deletions lib/src/model/exception.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import 'package:centrifuge_dart/interface.dart';
import 'package:meta/meta.dart';

/// {@template exception}
Expand Down Expand Up @@ -94,7 +93,7 @@ final class CentrifugePingException extends CentrifugeException {
final class CentrifugeSubscriptionException extends CentrifugeException {
/// {@macro exception}
const CentrifugeSubscriptionException({
required this.subscription,
required this.channel,
required String message,
Object? error,
}) : super(
Expand All @@ -103,8 +102,8 @@ final class CentrifugeSubscriptionException extends CentrifugeException {
error,
);

/// Subscription
final ICentrifugeSubscription subscription;
/// Subscription channel.
final String channel;
}

/// {@macro exception}
Expand Down
4 changes: 4 additions & 0 deletions lib/src/model/stream_position.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import 'package:fixnum/fixnum.dart' as fixnum;

/// Stream position.
typedef CentrifugeStreamPosition = ({fixnum.Int64 offset, String epoch});
51 changes: 51 additions & 0 deletions lib/src/model/subcibed_on_channel.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import 'package:centrifuge_dart/src/model/publication.dart';
import 'package:centrifuge_dart/src/model/stream_position.dart';
import 'package:meta/meta.dart';

/// Subscribed on channel message.
@immutable
final class SubcibedOnChannel {
/// Subscribed on channel message.
const SubcibedOnChannel({
required this.channel,
required this.expires,
required this.ttl,
required this.recoverable,
required this.since,
required this.publications,
required this.recovered,
required this.positioned,
required this.wasRecovering,
required this.data,
});

/// Channel name.
final String channel;

/// Whether channel is expired.
final bool expires;

/// Time to live in seconds.
final DateTime? ttl;

/// Whether channel is recoverable.
final bool recoverable;

/// Stream position.
final CentrifugeStreamPosition? since;

/// List of publications since last stream position.
final List<CentrifugePublication> publications;

/// Whether channel is recovered after stream failure.
final bool recovered;

/// Whether channel is positioned at last stream position.
final bool positioned;

/// Whether channel is recovering after stream failure.
final bool wasRecovering;

/// Raw data.
final List<int>? data;
}
13 changes: 9 additions & 4 deletions lib/src/model/subscription_config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ typedef CentrifugeSubscriptionToken = String;
typedef CentrifugeSubscriptionTokenCallback
= FutureOr<CentrifugeSubscriptionToken?> Function();

/// Callback to set subscription payload data.
///
/// If method returns null then no payload will be sent at subscribe time.
typedef CentrifugeSubscribePayloadCallback = FutureOr<List<int>?> Function();

/// {@template subscription_config}
/// Subscription common options
///
Expand Down Expand Up @@ -38,7 +43,7 @@ class CentrifugeSubscriptionConfig {
/// {@macro subscription_config}
const CentrifugeSubscriptionConfig({
this.getToken,
this.data,
this.getPayload,
this.resubscribeInterval = (
min: const Duration(milliseconds: 500),
max: const Duration(seconds: 20),
Expand All @@ -62,13 +67,13 @@ class CentrifugeSubscriptionConfig {
final CentrifugeSubscriptionTokenCallback? getToken;

/// Data to send with subscription request.
/// Subscription [data] is attached to every subscribe/resubscribe request.
final List<int>? data;
/// Subscription `data` is attached to every subscribe/resubscribe request.
final CentrifugeSubscribePayloadCallback? getPayload;

/// Resubscribe backoff algorithm
final ({Duration min, Duration max}) resubscribeInterval;

/// start Subscription [since] known Stream Position
/// Start Subscription [since] known Stream Position
/// (i.e. attempt recovery on first subscribe)
final ({fixnum.Int64 offset, String epoch})? since;

Expand Down
26 changes: 18 additions & 8 deletions lib/src/model/subscription_state.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'package:fixnum/fixnum.dart' as fixnum;
import 'package:meta/meta.dart';

/// {@template subscription_state}
Expand All @@ -15,7 +16,7 @@ import 'package:meta/meta.dart';
sealed class CentrifugeSubscriptionState
extends _$CentrifugeSubscriptionStateBase {
/// {@macro subscription_state}
const CentrifugeSubscriptionState(super.timestamp);
const CentrifugeSubscriptionState(super.timestamp, super.since);

/// Unsubscribed
/// {@macro subscription_state}
Expand All @@ -41,8 +42,10 @@ sealed class CentrifugeSubscriptionState
final class CentrifugeSubscriptionState$Unsubscribed
extends CentrifugeSubscriptionState with _$CentrifugeSubscriptionState {
/// {@nodoc}
CentrifugeSubscriptionState$Unsubscribed({DateTime? timestamp})
: super(timestamp ?? DateTime.now());
CentrifugeSubscriptionState$Unsubscribed({
DateTime? timestamp,
({fixnum.Int64 offset, String epoch})? since,
}) : super(timestamp ?? DateTime.now(), since);

@override
R map<R>({
Expand Down Expand Up @@ -75,8 +78,10 @@ final class CentrifugeSubscriptionState$Unsubscribed
final class CentrifugeSubscriptionState$Subscribing
extends CentrifugeSubscriptionState with _$CentrifugeSubscriptionState {
/// {@nodoc}
CentrifugeSubscriptionState$Subscribing({DateTime? timestamp})
: super(timestamp ?? DateTime.now());
CentrifugeSubscriptionState$Subscribing({
DateTime? timestamp,
({fixnum.Int64 offset, String epoch})? since,
}) : super(timestamp ?? DateTime.now(), since);

@override
R map<R>({
Expand Down Expand Up @@ -109,8 +114,10 @@ final class CentrifugeSubscriptionState$Subscribing
final class CentrifugeSubscriptionState$Subscribed
extends CentrifugeSubscriptionState with _$CentrifugeSubscriptionState {
/// {@nodoc}
CentrifugeSubscriptionState$Subscribed({DateTime? timestamp})
: super(timestamp ?? DateTime.now());
CentrifugeSubscriptionState$Subscribed({
DateTime? timestamp,
({fixnum.Int64 offset, String epoch})? since,
}) : super(timestamp ?? DateTime.now(), since);

@override
R map<R>({
Expand Down Expand Up @@ -148,11 +155,14 @@ typedef CentrifugeSubscriptionStateMatch<R,
@immutable
abstract base class _$CentrifugeSubscriptionStateBase {
/// {@nodoc}
const _$CentrifugeSubscriptionStateBase(this.timestamp);
const _$CentrifugeSubscriptionStateBase(this.timestamp, this.since);

/// Timestamp of state change.
final DateTime timestamp;

/// Stream Position
final ({fixnum.Int64 offset, String epoch})? since;

/// Pattern matching for [CentrifugeSubscriptionState].
R map<R>({
required CentrifugeSubscriptionStateMatch<R,
Expand Down
6 changes: 3 additions & 3 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeSubscriptionException(
message: 'Error while subscribing',
subscription: this,
channel: channel,
error: error,
);
_emitError(centrifugeException, stackTrace);
Expand All @@ -199,7 +199,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
case CentrifugeSubscriptionState$Unsubscribed _:
throw CentrifugeSubscriptionException(
message: 'Subscription is not subscribed',
subscription: this,
channel: channel,
);
case CentrifugeSubscriptionState$Subscribed _:
return;
Expand All @@ -212,7 +212,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin
} on Object catch (error, stackTrace) {
final centrifugeException = CentrifugeSubscriptionException(
message: 'Subscription is not subscribed',
subscription: this,
channel: channel,
error: error,
);
_emitError(centrifugeException, stackTrace);
Expand Down
4 changes: 2 additions & 2 deletions lib/src/subscription/client_subscription_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ final class ClientSubscriptionManager {
) {
if (_channelSubscriptions.containsKey(channel)) {
throw CentrifugeSubscriptionException(
subscription: _channelSubscriptions[channel]!,
channel: channel,
message: 'Subscription to a channel "$channel" already exists '
'in client\'s internal registry',
);
Expand Down Expand Up @@ -80,7 +80,7 @@ final class ClientSubscriptionManager {
} on Object catch (error, stackTrace) {
Error.throwWithStackTrace(
CentrifugeSubscriptionException(
subscription: subscription,
channel: subscription.channel,
message: 'Error while unsubscribing',
error: error,
),
Expand Down
10 changes: 10 additions & 0 deletions lib/src/transport/transport_interface.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import 'dart:async';

import 'package:centrifuge_dart/src/model/state.dart';
import 'package:centrifuge_dart/src/model/stream_position.dart';
import 'package:centrifuge_dart/src/model/subcibed_on_channel.dart';
import 'package:centrifuge_dart/src/model/subscription_config.dart';
import 'package:meta/meta.dart';

/// Class responsible for sending and receiving data from the server.
Expand All @@ -25,6 +28,13 @@ abstract interface class ICentrifugeTransport {
/// asynchronous message handler does not exist.
Future<void> sendAsyncMessage(List<int> data);

/// Subscribe on channel with optional [since] position.
Future<SubcibedOnChannel> subscribe(
String channel,
CentrifugeSubscriptionConfig config,
CentrifugeStreamPosition? since,
);

/// Disconnect from the server.
/// e.g. code: 0, reason: 'disconnect called'
/// {@nodoc}
Expand Down
90 changes: 85 additions & 5 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import 'dart:async';
import 'dart:convert';

import 'package:centrifuge_dart/src/model/config.dart';
import 'package:centrifuge_dart/centrifuge.dart';
import 'package:centrifuge_dart/src/model/disconnect_code.dart';
import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/protobuf/client.pb.dart' as pb;
import 'package:centrifuge_dart/src/model/state.dart';
import 'package:centrifuge_dart/src/model/stream_position.dart';
import 'package:centrifuge_dart/src/model/subcibed_on_channel.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
import 'package:centrifuge_dart/src/transport/transport_protobuf_codec.dart';
import 'package:centrifuge_dart/src/util/logger.dart' as logger;
Expand Down Expand Up @@ -545,8 +545,88 @@ base mixin CentrifugeWSPBHandlerMixin
/// Mixin responsible for centrifuge subscriptions.
/// {@nodoc}
@internal
base mixin CentrifugeWSPBSubscription on CentrifugeWSPBTransportBase {
Future<void> resubscribe() async {}
base mixin CentrifugeWSPBSubscription
on CentrifugeWSPBTransportBase, CentrifugeWSPBSenderMixin {
@override
Future<SubcibedOnChannel> subscribe(
String channel,
CentrifugeSubscriptionConfig config,
CentrifugeStreamPosition? since,
) async {
if (!state.isConnected) {
throw CentrifugeSubscriptionException(
channel: channel,
message: 'Centrifuge client is not connected',
);
}
final request = pb.SubscribeRequest()
..channel = channel
..positioned = config.positioned
..recoverable = config.recoverable
..joinLeave = config.joinLeave;
final token = await config.getToken?.call();
assert(
token == null || token.length > 5,
'Centrifuge Subscription JWT is too short',
);
if (token != null && token.isNotEmpty) request.token = token;
final data = await config.getPayload?.call();
if (data != null) request.data = data;
if (since != null) {
request
..recover = true
..offset = since.offset
..epoch = since.epoch;
} else {
request.recover = false;
}
final pb.SubscribeResult result;
try {
result = await _sendMessage(request, pb.SubscribeResult())
.timeout(config.timeout);
} on Object catch (error, stackTrace) {
Error.throwWithStackTrace(
CentrifugeSubscriptionException(
channel: channel,
message: 'Error while making subscribe request',
error: error,
),
stackTrace,
);
}
final now = DateTime.now();
final publications = <CentrifugePublication>[
for (final pub in result.publications)
CentrifugePublication(
offset: pub.offset,
data: pub.data,
info: pub.hasInfo()
? CentrifugeClientInfo(
channelInfo: pub.info.chanInfo,
client: pub.info.client,
user: pub.info.user,
connectionInfo: pub.info.connInfo,
)
: null,
),
];
final recoverable = result.hasRecoverable() && result.recoverable;
final expires = result.hasExpires() && result.expires && result.hasTtl();
return SubcibedOnChannel(
channel: channel,
expires: expires,
ttl: expires ? now.add(Duration(seconds: result.ttl)) : null,
recoverable: recoverable,
since: recoverable && result.hasOffset() && result.hasEpoch()
? (offset: result.offset, epoch: result.epoch)
: null,
publications: publications,
recovered: result.hasRecovered() && result.recovered,
positioned: result.hasPositioned() && result.positioned,
wasRecovering: result.hasWasRecovering() && result.wasRecovering,
data: result.hasData() ? result.data : null,
);
}
}

/// To maintain connection alive and detect broken connections
Expand Down

0 comments on commit 5c5a2db

Please sign in to comment.