Skip to content

Commit

Permalink
Update refresh events
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Aug 1, 2023
1 parent 5bd8df2 commit dca0d68
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 24 deletions.
47 changes: 38 additions & 9 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import 'package:stack_trace/stack_trace.dart' as st;
/// {@endtemplate}
final class Centrifuge extends CentrifugeBase
with
CentrifugeEventReceiverMixin,
CentrifugeErrorsMixin,
CentrifugeStateMixin,
CentrifugeEventReceiverMixin,
CentrifugeConnectionMixin,
CentrifugeSendMixin,
CentrifugeClientSubscriptionMixin,
Expand Down Expand Up @@ -114,7 +114,8 @@ abstract base class CentrifugeBase implements ICentrifuge {
/// Mixin responsible for event receiving and distribution by controllers
/// and streams to subscribers.
/// {@nodoc}
base mixin CentrifugeEventReceiverMixin on CentrifugeBase {
base mixin CentrifugeEventReceiverMixin
on CentrifugeBase, CentrifugeStateMixin {
@protected
@nonVirtual
final StreamController<CentrifugeChannelPush> _pushController =
Expand Down Expand Up @@ -175,24 +176,45 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase {
_pushController.add(event);
switch (event) {
case CentrifugePublication publication:
logger.fine(
'Publication event received for channel ${publication.channel}');
_publicationsController.add(publication);
case CentrifugeMessage message:
logger.fine('Message event received for channel ${message.channel}');
_messagesController.add(message);
case CentrifugeJoin join:
logger.fine('Join event received for channel ${join.channel} '
'and user ${join.info.user}');
_presenceController.add(join);
_joinController.add(join);
case CentrifugeLeave leave:
logger.fine('Leave event received for channel ${leave.channel} '
'and user ${leave.info.user}');
_presenceController.add(leave);
_leaveController.add(leave);
case CentrifugeSubscribe _:
break;
break; // For server side subscriptions.
case CentrifugeUnsubscribe _:
break;
break; // For server side subscriptions.
case CentrifugeConnect _:
break;
case CentrifugeDisconnect _:
case CentrifugeDisconnect event:
final code = event.code;
final reconnect =
code < 3500 || code >= 5000 || (code >= 4000 && code < 4500);
if (reconnect) {
logger.fine('Disconnect transport by server push '
'and reconnect after backoff delay');
_transport.disconnect(code, event.reason).ignore();
} else {
logger
.fine('Disconnect interactive by server push, without reconnect');
disconnect().ignore();
}
break;
case CentrifugeRefresh _:
logger.fine('Refresh connection token by server push');
_refreshToken();
break;
}
}
Expand Down Expand Up @@ -412,10 +434,13 @@ base mixin CentrifugeConnectionMixin
}

@override
Future<void> disconnect() async {
Future<void> disconnect([
int code = 0,
String reason = 'Disconnect called',
]) async {
logger.fine('Interactively disconnecting');
try {
await _transport.disconnect(0, 'Disconnect called');
await _transport.disconnect(code, reason);
} on CentrifugeException catch (error, stackTrace) {
_emitError(error, stackTrace);
rethrow;
Expand Down Expand Up @@ -621,8 +646,12 @@ base mixin CentrifugeQueueMixin on CentrifugeBase {
);

@override
Future<void> disconnect() =>
_eventQueue.push<void>('disconnect', super.disconnect);
Future<void> disconnect([
int code = 0,
String reason = 'Disconnect called',
]) =>
_eventQueue.push<void>(
'disconnect', () => super.disconnect(code, reason));

@override
Future<void> close() => _eventQueue
Expand Down
5 changes: 4 additions & 1 deletion lib/src/client/centrifuge_interface.dart
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ abstract interface class ICentrifuge
FutureOr<void> ready();

/// Disconnect from the server.
Future<void> disconnect();
Future<void> disconnect([
int code = 0,
String reason = 'Disconnect called',
]);

/// Client if not needed anymore.
/// Permanent close connection to the server and
Expand Down
6 changes: 3 additions & 3 deletions lib/src/model/channel_push.dart
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import 'package:centrifuge_dart/src/model/event.dart';

/// {@template centrifuge_channel_event}
/// Base class for all channel events.
/// {@template centrifuge_channel_push}
/// Base class for all channel push events.
/// {@endtemplate}
abstract base class CentrifugeChannelPush extends CentrifugeEvent {
/// {@template centrifuge_channel_event}
/// {@template centrifuge_channel_push}
const CentrifugeChannelPush({
required super.timestamp,
required this.channel,
Expand Down
4 changes: 2 additions & 2 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin
_presenceController.add(leave);
_leaveController.add(leave);
case CentrifugeSubscribe _:
break;
break; // For server side subscriptions.
case CentrifugeUnsubscribe _:
break;
break; // For server side subscriptions.
case CentrifugeConnect _:
break;
case CentrifugeDisconnect _:
Expand Down
21 changes: 12 additions & 9 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -561,29 +561,30 @@ base mixin CentrifugeWSPBHandlerMixin
@pragma('dart2js:tryInline')
void _onPush(pb.Push push) {
final now = DateTime.now();
final channel = push.hasChannel() ? push.channel : '';
if (push.hasPub()) {
events.notify($publicationDecode(push.channel)(push.pub));
} else if (push.hasMessage()) {
events.notify(
CentrifugeMessage(
timestamp: now,
channel: push.channel,
channel: channel,
data: push.message.hasData() ? push.message.data : <int>[],
),
);
} else if (push.hasJoin()) {
events.notify(
CentrifugeJoin(
timestamp: now,
channel: push.channel,
channel: channel,
info: $decodeClientInfo(push.join.info),
),
);
} else if (push.hasLeave()) {
events.notify(
CentrifugeLeave(
timestamp: now,
channel: push.channel,
channel: channel,
info: $decodeClientInfo(push.join.info),
),
);
Expand All @@ -595,7 +596,7 @@ base mixin CentrifugeWSPBHandlerMixin
events.notify(
CentrifugeSubscribe(
timestamp: now,
channel: push.channel,
channel: channel,
positioned: positioned,
recoverable: recoverable,
data: push.subscribe.hasData() ? push.subscribe.data : <int>[],
Expand All @@ -610,7 +611,7 @@ base mixin CentrifugeWSPBHandlerMixin
events.notify(
CentrifugeUnsubscribe(
timestamp: now,
channel: push.channel,
channel: channel,
code: push.unsubscribe.hasCode() ? push.unsubscribe.code : 0,
reason: push.unsubscribe.hasReason() ? push.unsubscribe.reason : 'OK',
),
Expand All @@ -622,7 +623,7 @@ base mixin CentrifugeWSPBHandlerMixin
events.notify(
CentrifugeConnect(
timestamp: now,
channel: push.channel,
channel: channel,
data: push.message.hasData() ? push.message.data : <int>[],
client: connect.hasClient() ? connect.client : '',
version: connect.hasVersion() ? connect.version : '',
Expand All @@ -639,17 +640,19 @@ base mixin CentrifugeWSPBHandlerMixin
events.notify(
CentrifugeDisconnect(
timestamp: now,
channel: push.channel,
channel: channel,
code: push.disconnect.hasCode() ? push.disconnect.code : 0,
reason: push.disconnect.hasReason() ? push.disconnect.reason : 'OK',
reason: push.disconnect.hasReason()
? push.disconnect.reason
: 'disconnect from server',
reconnect:
push.disconnect.hasReconnect() && push.disconnect.reconnect,
),
);
} else if (push.hasRefresh()) {
events.notify(CentrifugeRefresh(
timestamp: now,
channel: push.channel,
channel: channel,
expires: push.refresh.hasExpires() && push.refresh.expires,
ttl: push.refresh.hasTtl()
? now.add(Duration(seconds: push.refresh.ttl))
Expand Down

0 comments on commit dca0d68

Please sign in to comment.