diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 038b465..41291c8 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -34,9 +34,9 @@ import 'package:stack_trace/stack_trace.dart' as st; /// {@endtemplate} final class Centrifuge extends CentrifugeBase with - CentrifugeEventReceiverMixin, CentrifugeErrorsMixin, CentrifugeStateMixin, + CentrifugeEventReceiverMixin, CentrifugeConnectionMixin, CentrifugeSendMixin, CentrifugeClientSubscriptionMixin, @@ -114,7 +114,8 @@ abstract base class CentrifugeBase implements ICentrifuge { /// Mixin responsible for event receiving and distribution by controllers /// and streams to subscribers. /// {@nodoc} -base mixin CentrifugeEventReceiverMixin on CentrifugeBase { +base mixin CentrifugeEventReceiverMixin + on CentrifugeBase, CentrifugeStateMixin { @protected @nonVirtual final StreamController _pushController = @@ -175,24 +176,45 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase { _pushController.add(event); switch (event) { case CentrifugePublication publication: + logger.fine( + 'Publication event received for channel ${publication.channel}'); _publicationsController.add(publication); case CentrifugeMessage message: + logger.fine('Message event received for channel ${message.channel}'); _messagesController.add(message); case CentrifugeJoin join: + logger.fine('Join event received for channel ${join.channel} ' + 'and user ${join.info.user}'); _presenceController.add(join); _joinController.add(join); case CentrifugeLeave leave: + logger.fine('Leave event received for channel ${leave.channel} ' + 'and user ${leave.info.user}'); _presenceController.add(leave); _leaveController.add(leave); case CentrifugeSubscribe _: - break; + break; // For server side subscriptions. case CentrifugeUnsubscribe _: - break; + break; // For server side subscriptions. case CentrifugeConnect _: break; - case CentrifugeDisconnect _: + case CentrifugeDisconnect event: + final code = event.code; + final reconnect = + code < 3500 || code >= 5000 || (code >= 4000 && code < 4500); + if (reconnect) { + logger.fine('Disconnect transport by server push ' + 'and reconnect after backoff delay'); + _transport.disconnect(code, event.reason).ignore(); + } else { + logger + .fine('Disconnect interactive by server push, without reconnect'); + disconnect().ignore(); + } break; case CentrifugeRefresh _: + logger.fine('Refresh connection token by server push'); + _refreshToken(); break; } } @@ -412,10 +434,13 @@ base mixin CentrifugeConnectionMixin } @override - Future disconnect() async { + Future disconnect([ + int code = 0, + String reason = 'Disconnect called', + ]) async { logger.fine('Interactively disconnecting'); try { - await _transport.disconnect(0, 'Disconnect called'); + await _transport.disconnect(code, reason); } on CentrifugeException catch (error, stackTrace) { _emitError(error, stackTrace); rethrow; @@ -621,8 +646,12 @@ base mixin CentrifugeQueueMixin on CentrifugeBase { ); @override - Future disconnect() => - _eventQueue.push('disconnect', super.disconnect); + Future disconnect([ + int code = 0, + String reason = 'Disconnect called', + ]) => + _eventQueue.push( + 'disconnect', () => super.disconnect(code, reason)); @override Future close() => _eventQueue diff --git a/lib/src/client/centrifuge_interface.dart b/lib/src/client/centrifuge_interface.dart index 2fb955b..b09634e 100644 --- a/lib/src/client/centrifuge_interface.dart +++ b/lib/src/client/centrifuge_interface.dart @@ -29,7 +29,10 @@ abstract interface class ICentrifuge FutureOr ready(); /// Disconnect from the server. - Future disconnect(); + Future disconnect([ + int code = 0, + String reason = 'Disconnect called', + ]); /// Client if not needed anymore. /// Permanent close connection to the server and diff --git a/lib/src/model/channel_push.dart b/lib/src/model/channel_push.dart index 6f50286..e6f8873 100644 --- a/lib/src/model/channel_push.dart +++ b/lib/src/model/channel_push.dart @@ -1,10 +1,10 @@ import 'package:centrifuge_dart/src/model/event.dart'; -/// {@template centrifuge_channel_event} -/// Base class for all channel events. +/// {@template centrifuge_channel_push} +/// Base class for all channel push events. /// {@endtemplate} abstract base class CentrifugeChannelPush extends CentrifugeEvent { - /// {@template centrifuge_channel_event} + /// {@template centrifuge_channel_push} const CentrifugeChannelPush({ required super.timestamp, required this.channel, diff --git a/lib/src/subscription/client_subscription_impl.dart b/lib/src/subscription/client_subscription_impl.dart index de8b84a..748f074 100644 --- a/lib/src/subscription/client_subscription_impl.dart +++ b/lib/src/subscription/client_subscription_impl.dart @@ -202,9 +202,9 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin _presenceController.add(leave); _leaveController.add(leave); case CentrifugeSubscribe _: - break; + break; // For server side subscriptions. case CentrifugeUnsubscribe _: - break; + break; // For server side subscriptions. case CentrifugeConnect _: break; case CentrifugeDisconnect _: diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index 6cfb0bb..dd27f74 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -561,13 +561,14 @@ base mixin CentrifugeWSPBHandlerMixin @pragma('dart2js:tryInline') void _onPush(pb.Push push) { final now = DateTime.now(); + final channel = push.hasChannel() ? push.channel : ''; if (push.hasPub()) { events.notify($publicationDecode(push.channel)(push.pub)); } else if (push.hasMessage()) { events.notify( CentrifugeMessage( timestamp: now, - channel: push.channel, + channel: channel, data: push.message.hasData() ? push.message.data : [], ), ); @@ -575,7 +576,7 @@ base mixin CentrifugeWSPBHandlerMixin events.notify( CentrifugeJoin( timestamp: now, - channel: push.channel, + channel: channel, info: $decodeClientInfo(push.join.info), ), ); @@ -583,7 +584,7 @@ base mixin CentrifugeWSPBHandlerMixin events.notify( CentrifugeLeave( timestamp: now, - channel: push.channel, + channel: channel, info: $decodeClientInfo(push.join.info), ), ); @@ -595,7 +596,7 @@ base mixin CentrifugeWSPBHandlerMixin events.notify( CentrifugeSubscribe( timestamp: now, - channel: push.channel, + channel: channel, positioned: positioned, recoverable: recoverable, data: push.subscribe.hasData() ? push.subscribe.data : [], @@ -610,7 +611,7 @@ base mixin CentrifugeWSPBHandlerMixin events.notify( CentrifugeUnsubscribe( timestamp: now, - channel: push.channel, + channel: channel, code: push.unsubscribe.hasCode() ? push.unsubscribe.code : 0, reason: push.unsubscribe.hasReason() ? push.unsubscribe.reason : 'OK', ), @@ -622,7 +623,7 @@ base mixin CentrifugeWSPBHandlerMixin events.notify( CentrifugeConnect( timestamp: now, - channel: push.channel, + channel: channel, data: push.message.hasData() ? push.message.data : [], client: connect.hasClient() ? connect.client : '', version: connect.hasVersion() ? connect.version : '', @@ -639,9 +640,11 @@ base mixin CentrifugeWSPBHandlerMixin events.notify( CentrifugeDisconnect( timestamp: now, - channel: push.channel, + channel: channel, code: push.disconnect.hasCode() ? push.disconnect.code : 0, - reason: push.disconnect.hasReason() ? push.disconnect.reason : 'OK', + reason: push.disconnect.hasReason() + ? push.disconnect.reason + : 'disconnect from server', reconnect: push.disconnect.hasReconnect() && push.disconnect.reconnect, ), @@ -649,7 +652,7 @@ base mixin CentrifugeWSPBHandlerMixin } else if (push.hasRefresh()) { events.notify(CentrifugeRefresh( timestamp: now, - channel: push.channel, + channel: channel, expires: push.refresh.hasExpires() && push.refresh.expires, ttl: push.refresh.hasTtl() ? now.add(Duration(seconds: push.refresh.ttl))