diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index cfc7015..94ee1b2 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -5,11 +5,14 @@ import 'package:centrifuge_dart/src/client/config.dart'; import 'package:centrifuge_dart/src/client/disconnect_code.dart'; import 'package:centrifuge_dart/src/client/state.dart'; import 'package:centrifuge_dart/src/client/states_stream.dart'; +import 'package:centrifuge_dart/src/model/channel_presence.dart'; +import 'package:centrifuge_dart/src/model/channel_push.dart'; import 'package:centrifuge_dart/src/model/event.dart'; -import 'package:centrifuge_dart/src/model/event_stream.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; +import 'package:centrifuge_dart/src/model/publication.dart'; +import 'package:centrifuge_dart/src/model/pushes_stream.dart'; import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart'; import 'package:centrifuge_dart/src/subscription/subscription.dart'; import 'package:centrifuge_dart/src/subscription/subscription_config.dart'; @@ -25,6 +28,7 @@ import 'package:stack_trace/stack_trace.dart' as st; /// {@endtemplate} final class Centrifuge extends CentrifugeBase with + CentrifugeEventReceiverMixin, CentrifugeErrorsMixin, CentrifugeStateMixin, CentrifugeConnectionMixin, @@ -66,24 +70,17 @@ abstract base class CentrifugeBase implements ICentrifuge { @nonVirtual final CentrifugeConfig _config; - @protected - @nonVirtual - final StreamController _eventsController = - StreamController.broadcast(); - - @override - @nonVirtual - late final CentrifugeEventStream events = - CentrifugeEventStream(_eventsController.stream); + /// Manager responsible for client-side subscriptions. + /// {@nodoc} + late final ClientSubscriptionManager _clientSubscriptionManager = + ClientSubscriptionManager(_transport); /// Init centrifuge client, override this method to add custom logic. /// This method is called in constructor. /// {@nodoc} @protected @mustCallSuper - void _initCentrifuge() { - _transport.events.addListener(_onEvent); - } + void _initCentrifuge() {} /// Called when connection established. /// Right before [CentrifugeState$Connected] state. @@ -103,25 +100,92 @@ abstract base class CentrifugeBase implements ICentrifuge { logger.fine('Connection lost'); } + @override + @mustCallSuper + Future close() async {} +} + +/// Mixin responsible for event receiving and distribution by controllers +/// and streams to subscribers. +/// {@nodoc} +base mixin CentrifugeEventReceiverMixin on CentrifugeBase { + @protected + @nonVirtual + final StreamController _pushController = + StreamController.broadcast(); + + @protected + @nonVirtual + final StreamController _publicationsController = + StreamController.broadcast(); + + @protected + @nonVirtual + final StreamController _joinController = + StreamController.broadcast(); + + @protected + @nonVirtual + final StreamController _leaveController = + StreamController.broadcast(); + + @protected + @nonVirtual + final StreamController _presenceController = + StreamController.broadcast(); + + @override + @nonVirtual + late final CentrifugePushesStream stream = CentrifugePushesStream( + pushes: _pushController.stream, + publications: _publicationsController.stream, + presenceEvents: _presenceController.stream, + joinEvents: _joinController.stream, + leaveEvents: _leaveController.stream, + ); + + @override + void _initCentrifuge() { + _transport.events.addListener(_onEvent); + super._initCentrifuge(); + } + /// Router for all events. /// {@nodoc} @protected - @mustCallSuper + @nonVirtual + @pragma('vm:prefer-inline') + @pragma('dart2js:tryInline') void _onEvent(CentrifugeEvent event) { - /* switch (event) { + if (event is! CentrifugeChannelPush) return; + // This is a push to a channel. + _clientSubscriptionManager.onPush(event); + _pushController.add(event); + switch (event) { case CentrifugePublication publication: - _onPublication(publication); - case CentrifugeChannelPresenceEvent event: - _onPresenceEvent(event); - } */ - _eventsController.add(event); + _publicationsController.add(publication); + case CentrifugeJoin join: + _presenceController.add(join); + _joinController.add(join); + case CentrifugeLeave leave: + _presenceController.add(leave); + _leaveController.add(leave); + } } @override - @mustCallSuper Future close() async { + await super.close(); _transport.events.removeListener(_onEvent); - _eventsController.close().ignore(); + for (final controller in >[ + _pushController, + _publicationsController, + _joinController, + _leaveController, + _presenceController, + ]) { + controller.close().ignore(); + } } } @@ -373,9 +437,6 @@ base mixin CentrifugeSendMixin on CentrifugeBase, CentrifugeErrorsMixin { @internal base mixin CentrifugeClientSubscriptionMixin on CentrifugeBase, CentrifugeErrorsMixin { - late final ClientSubscriptionManager _clientSubscriptionManager = - ClientSubscriptionManager(_transport); - @override CentrifugeClientSubscription newSubscription( String channel, [ diff --git a/lib/src/client/centrifuge_interface.dart b/lib/src/client/centrifuge_interface.dart index fb8353f..2fb955b 100644 --- a/lib/src/client/centrifuge_interface.dart +++ b/lib/src/client/centrifuge_interface.dart @@ -3,9 +3,9 @@ import 'dart:async'; import 'package:centrifuge_dart/centrifuge.dart'; -import 'package:centrifuge_dart/src/model/event_stream.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; +import 'package:centrifuge_dart/src/model/pushes_stream.dart'; /// Centrifuge client interface. abstract interface class ICentrifuge @@ -72,8 +72,8 @@ abstract interface class ICentrifugeAsyncMessageSender { /// Centrifuge event receiver interface. abstract interface class ICentrifugeEventReceiver { - /// Stream of centrifuge events. - abstract final CentrifugeEventStream events; + /// Stream of received pushes from Centrifugo server for a channel. + abstract final CentrifugePushesStream stream; } /// Centrifuge client subscriptions manager interface. diff --git a/lib/src/model/channel_presence.dart b/lib/src/model/channel_presence.dart index 72fb37e..3253232 100644 --- a/lib/src/model/channel_presence.dart +++ b/lib/src/model/channel_presence.dart @@ -1,8 +1,8 @@ -import 'package:centrifuge_dart/src/model/channel_event.dart'; +import 'package:centrifuge_dart/src/model/channel_push.dart'; import 'package:centrifuge_dart/src/model/client_info.dart'; import 'package:meta/meta.dart'; -/// {@template channel_presence_event} +/// {@template channel_presence} /// Channel presence. /// Join / Leave events. /// {@endtemplate} @@ -10,9 +10,9 @@ import 'package:meta/meta.dart'; /// {@subCategory Channel} /// {@subCategory Presence} @immutable -sealed class CentrifugeChannelPresenceEvent extends CentrifugeChannelEvent { - /// {@macro channel_presence_event} - const CentrifugeChannelPresenceEvent({ +sealed class CentrifugeChannelPresence extends CentrifugeChannelPush { + /// {@macro channel_presence} + const CentrifugeChannelPresence({ required super.channel, required this.info, }); @@ -27,14 +27,17 @@ sealed class CentrifugeChannelPresenceEvent extends CentrifugeChannelEvent { abstract final bool isLeave; } -/// {@macro channel_presence_event} -final class CentrifugeJoinEvent extends CentrifugeChannelPresenceEvent { - /// {@macro channel_presence_event} - const CentrifugeJoinEvent({ +/// {@macro channel_presence} +final class CentrifugeJoin extends CentrifugeChannelPresence { + /// {@macro channel_presence} + const CentrifugeJoin({ required super.channel, required super.info, }); + @override + String get type => 'join'; + @override bool get isJoin => true; @@ -42,14 +45,17 @@ final class CentrifugeJoinEvent extends CentrifugeChannelPresenceEvent { bool get isLeave => false; } -/// {@macro channel_presence_event} -final class CentrifugeLeaveEvent extends CentrifugeChannelPresenceEvent { - /// {@macro channel_presence_event} - const CentrifugeLeaveEvent({ +/// {@macro channel_presence} +final class CentrifugeLeave extends CentrifugeChannelPresence { + /// {@macro channel_presence} + const CentrifugeLeave({ required super.channel, required super.info, }); + @override + String get type => 'leave'; + @override bool get isJoin => false; diff --git a/lib/src/model/channel_event.dart b/lib/src/model/channel_push.dart similarity index 72% rename from lib/src/model/channel_event.dart rename to lib/src/model/channel_push.dart index 955de29..6285b0c 100644 --- a/lib/src/model/channel_event.dart +++ b/lib/src/model/channel_push.dart @@ -3,9 +3,9 @@ import 'package:centrifuge_dart/src/model/event.dart'; /// {@template centrifuge_channel_event} /// Base class for all channel events. /// {@endtemplate} -abstract base class CentrifugeChannelEvent extends CentrifugeEvent { +abstract base class CentrifugeChannelPush extends CentrifugeEvent { /// {@template centrifuge_channel_event} - const CentrifugeChannelEvent({ + const CentrifugeChannelPush({ required this.channel, }); diff --git a/lib/src/model/event.dart b/lib/src/model/event.dart index 9bf7c05..4bd5169 100644 --- a/lib/src/model/event.dart +++ b/lib/src/model/event.dart @@ -7,4 +7,7 @@ import 'package:meta/meta.dart'; abstract base class CentrifugeEvent { /// {@template centrifuge_event} const CentrifugeEvent(); + + /// Event type. + abstract final String type; } diff --git a/lib/src/model/event_stream.dart b/lib/src/model/event_stream.dart deleted file mode 100644 index 9941aad..0000000 --- a/lib/src/model/event_stream.dart +++ /dev/null @@ -1,39 +0,0 @@ -import 'dart:async'; - -import 'package:centrifuge_dart/src/model/channel_presence.dart'; -import 'package:centrifuge_dart/src/model/event.dart'; -import 'package:centrifuge_dart/src/model/publication.dart'; - -/// Stream of received events. -/// {@category Entity} -/// {@subCategory Event} -/// {@subCategory Channel} -final class CentrifugeEventStream extends StreamView { - /// Stream of received events. - CentrifugeEventStream(super.stream); - - /// Publications stream. - late final Stream publications = - whereType(); - - /// Stream of presence (join & leave) events. - late final Stream presenceEvents = - whereType(); - - /// Join events - late final Stream joinEvents = - whereType(); - - /// Leave events - late final Stream leaveEvents = - whereType(); - - /// Filtered stream of data of [CentrifugeEvent]. - Stream whereType() => - transform(StreamTransformer.fromHandlers( - handleData: (data, sink) => switch (data) { - T valid => sink.add(valid), - _ => null, - }, - )).asBroadcastStream(); -} diff --git a/lib/src/model/publication.dart b/lib/src/model/publication.dart index 89193fa..20c1e1b 100644 --- a/lib/src/model/publication.dart +++ b/lib/src/model/publication.dart @@ -1,4 +1,4 @@ -import 'package:centrifuge_dart/src/model/channel_event.dart'; +import 'package:centrifuge_dart/src/model/channel_push.dart'; import 'package:centrifuge_dart/src/model/client_info.dart'; import 'package:fixnum/fixnum.dart' as fixnum; import 'package:meta/meta.dart'; @@ -8,7 +8,7 @@ import 'package:meta/meta.dart'; /// {@endtemplate} /// {@category Entity} @immutable -final class CentrifugePublication extends CentrifugeChannelEvent { +final class CentrifugePublication extends CentrifugeChannelPush { /// {@macro publication} const CentrifugePublication({ required super.channel, @@ -18,6 +18,9 @@ final class CentrifugePublication extends CentrifugeChannelEvent { this.tags, }); + @override + String get type => 'publication'; + /// Publication payload final List data; diff --git a/lib/src/model/pushes_stream.dart b/lib/src/model/pushes_stream.dart new file mode 100644 index 0000000..3ad7269 --- /dev/null +++ b/lib/src/model/pushes_stream.dart @@ -0,0 +1,42 @@ +import 'dart:async'; + +import 'package:centrifuge_dart/src/model/channel_presence.dart'; +import 'package:centrifuge_dart/src/model/channel_push.dart'; +import 'package:centrifuge_dart/src/model/event.dart'; +import 'package:centrifuge_dart/src/model/publication.dart'; + +/// Stream of received pushes from Centrifugo server for a channel. +/// {@category Entity} +/// {@subCategory Event} +/// {@subCategory Channel} +final class CentrifugePushesStream extends StreamView { + /// Stream of received events. + CentrifugePushesStream({ + required Stream pushes, + required this.publications, + required this.presenceEvents, + required this.joinEvents, + required this.leaveEvents, + }) : super(pushes); + + /// Publications stream. + final Stream publications; + + /// Stream of presence (join & leave) events. + final Stream presenceEvents; + + /// Join events + final Stream joinEvents; + + /// Leave events + final Stream leaveEvents; + + /// Filtered stream of data of [CentrifugeEvent]. + Stream whereType() => + transform(StreamTransformer.fromHandlers( + handleData: (data, sink) => switch (data) { + T valid => sink.add(valid), + _ => null, + }, + )).asBroadcastStream(); +} diff --git a/lib/src/model/subscribe_event.dart b/lib/src/model/subscribe_event.dart new file mode 100644 index 0000000..f8f68c5 --- /dev/null +++ b/lib/src/model/subscribe_event.dart @@ -0,0 +1,12 @@ +import 'package:centrifuge_dart/src/model/channel_push.dart'; + +/// {@macro subscribe_event} +final class CentrifugeSubscribeEvent extends CentrifugeChannelPush { + /// {@macro subscribe_event} + const CentrifugeSubscribeEvent({ + required super.channel, + }); + + @override + String get type => 'subscribe'; +} diff --git a/lib/src/subscription/client_subscription_impl.dart b/lib/src/subscription/client_subscription_impl.dart index 93b4429..169a525 100644 --- a/lib/src/subscription/client_subscription_impl.dart +++ b/lib/src/subscription/client_subscription_impl.dart @@ -2,12 +2,13 @@ import 'dart:async'; import 'package:centrifuge_dart/centrifuge.dart'; import 'package:centrifuge_dart/src/client/disconnect_code.dart'; -import 'package:centrifuge_dart/src/model/channel_event.dart'; +import 'package:centrifuge_dart/src/model/channel_presence.dart'; +import 'package:centrifuge_dart/src/model/channel_push.dart'; import 'package:centrifuge_dart/src/model/event.dart'; -import 'package:centrifuge_dart/src/model/event_stream.dart'; import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; +import 'package:centrifuge_dart/src/model/pushes_stream.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; import 'package:centrifuge_dart/src/subscription/subscription_states_stream.dart'; import 'package:centrifuge_dart/src/transport/transport_interface.dart'; @@ -23,6 +24,7 @@ import 'package:stack_trace/stack_trace.dart' as st; final class CentrifugeClientSubscriptionImpl extends CentrifugeClientSubscriptionBase with + CentrifugeClientSubscriptionEventReceiverMixin, CentrifugeClientSubscriptionErrorsMixin, CentrifugeClientSubscriptionSubscribeMixin, CentrifugeClientSubscriptionPublishingMixin, @@ -71,16 +73,6 @@ abstract base class CentrifugeClientSubscriptionBase /// {@nodoc} final CentrifugeSubscriptionConfig _config; - @protected - @nonVirtual - final StreamController _eventsController = - StreamController.broadcast(); - - @override - @nonVirtual - late final CentrifugeEventStream events = - CentrifugeEventStream(_eventsController.stream); - /// Init subscription. /// {@nodoc} @protected @@ -89,16 +81,6 @@ abstract base class CentrifugeClientSubscriptionBase _state = CentrifugeSubscriptionState.unsubscribed( since: _config.since, code: 0, reason: 'initial state'); _offset = _config.since?.offset ?? fixnum.Int64.ZERO; - _transport.events.addListener(_onEvent); - } - - /// Router for all events. - /// {@nodoc} - @protected - @mustCallSuper - void _onEvent(CentrifugeEvent event) { - if (event is! CentrifugeChannelEvent || event.channel != channel) return; - _eventsController.add(event); } /// Subscription has 3 states: @@ -130,9 +112,8 @@ abstract base class CentrifugeClientSubscriptionBase /// Notify about new publication. /// {@nodoc} - @internal @nonVirtual - void handlePublication(CentrifugePublication publication) { + void _handlePublication(CentrifugePublication publication) { final offset = publication.offset; if (offset != null && offset > _offset) _offset = offset; } @@ -140,8 +121,87 @@ abstract base class CentrifugeClientSubscriptionBase /// {@nodoc} @internal @mustCallSuper + Future close() async {} +} + +/// Mixin responsible for event receiving and distribution by controllers +/// and streams to subscribers. +/// {@nodoc} +base mixin CentrifugeClientSubscriptionEventReceiverMixin + on CentrifugeClientSubscriptionBase { + @protected + @nonVirtual + final StreamController _pushController = + StreamController.broadcast(); + + @protected + @nonVirtual + final StreamController _publicationsController = + StreamController.broadcast(); + + @protected + @nonVirtual + final StreamController _joinController = + StreamController.broadcast(); + + @protected + @nonVirtual + final StreamController _leaveController = + StreamController.broadcast(); + + @protected + @nonVirtual + final StreamController _presenceController = + StreamController.broadcast(); + + @override + @nonVirtual + late final CentrifugePushesStream stream = CentrifugePushesStream( + pushes: _pushController.stream, + publications: _publicationsController.stream, + presenceEvents: _presenceController.stream, + joinEvents: _joinController.stream, + leaveEvents: _leaveController.stream, + ); + + @override + void _initSubscription() { + super._initSubscription(); + } + + /// Handle push event from server for the specific channel. + /// Called from `CentrifugeClientSubscriptionsManager.onPush` + /// {@nodoc} + @internal + @nonVirtual + void onPush(CentrifugeChannelPush push) { + // This is a push to a channel. + _pushController.add(push); + switch (push) { + case CentrifugePublication publication: + _handlePublication(publication); + _publicationsController.add(publication); + case CentrifugeJoin join: + _presenceController.add(join); + _joinController.add(join); + case CentrifugeLeave leave: + _presenceController.add(leave); + _leaveController.add(leave); + } + } + + @override Future close() async { - await _eventsController.close(); + await super.close(); + for (final controller in >[ + _pushController, + _publicationsController, + _joinController, + _leaveController, + _presenceController, + ]) { + controller.close().ignore(); + } } } @@ -220,7 +280,7 @@ base mixin CentrifugeClientSubscriptionSubscribeMixin ttl: subscribed.ttl, )); if (subscribed.publications.isNotEmpty) - subscribed.publications.forEach(handlePublication); + subscribed.publications.forEach(_handlePublication); if (subscribed.expires) _setRefreshTimer(subscribed.ttl); } on CentrifugeException catch (error, stackTrace) { unsubscribe(0, 'error while subscribing').ignore(); diff --git a/lib/src/subscription/client_subscription_manager.dart b/lib/src/subscription/client_subscription_manager.dart index ce29f65..c2c6f91 100644 --- a/lib/src/subscription/client_subscription_manager.dart +++ b/lib/src/subscription/client_subscription_manager.dart @@ -1,5 +1,6 @@ import 'dart:collection'; +import 'package:centrifuge_dart/src/model/channel_push.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; import 'package:centrifuge_dart/src/subscription/client_subscription_impl.dart'; import 'package:centrifuge_dart/src/subscription/subscription.dart'; @@ -122,6 +123,12 @@ final class ClientSubscriptionManager { _channelSubscriptions.clear(); } + /// Handle push event from server for the specific channel. + /// {@nodoc} + @internal + void onPush(CentrifugeChannelPush push) => + _channelSubscriptions[push.channel]?.onPush(push); + /// Get subscription to the channel /// from internal registry or null if not found. /// diff --git a/lib/src/subscription/subscription.dart b/lib/src/subscription/subscription.dart index 7f6587f..3a0f93a 100644 --- a/lib/src/subscription/subscription.dart +++ b/lib/src/subscription/subscription.dart @@ -1,10 +1,10 @@ import 'dart:async'; -import 'package:centrifuge_dart/src/model/event_stream.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; import 'package:centrifuge_dart/src/model/history.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; +import 'package:centrifuge_dart/src/model/pushes_stream.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; import 'package:centrifuge_dart/src/subscription/subscription_state.dart'; import 'package:centrifuge_dart/src/subscription/subscription_states_stream.dart'; @@ -85,9 +85,8 @@ abstract interface class CentrifugeClientSubscription /// Stream of subscription states. abstract final CentrifugeSubscriptionStateStream states; - /// Stream of events (publications, join/leave messages and etc) at - /// this subscription. - abstract final CentrifugeEventStream events; + /// Stream of received pushes from Centrifugo server for a channel. + abstract final CentrifugePushesStream stream; /// Errors stream. abstract final Stream< diff --git a/lib/src/transport/transport_protobuf_codec.dart b/lib/src/transport/transport_protobuf_codec.dart index c45f5de..9438b46 100644 --- a/lib/src/transport/transport_protobuf_codec.dart +++ b/lib/src/transport/transport_protobuf_codec.dart @@ -1,6 +1,7 @@ import 'dart:convert'; import 'package:centrifuge_dart/src/model/protobuf/client.pb.dart' as pb; +import 'package:centrifuge_dart/src/util/logger.dart' as logger; import 'package:meta/meta.dart'; import 'package:protobuf/protobuf.dart' as pb; @@ -50,9 +51,17 @@ final class TransportProtobufDecoder Iterable convert(List input) sync* { final reader = pb.CodedBufferReader(input); while (!reader.isAtEnd()) { - final reply = pb.Reply(); - reader.readMessage(reply, pb.ExtensionRegistry.EMPTY); - yield reply; + try { + final reply = pb.Reply(); + reader.readMessage(reply, pb.ExtensionRegistry.EMPTY); + yield reply; + } on Object catch (error, stackTrace) { + logger.warning( + error, + stackTrace, + 'Failed to decode reply: $error', + ); + } } } } diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index 61ad9ab..53b9491 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -520,7 +520,7 @@ base mixin CentrifugeWSPBHandlerMixin void _handleWebSocketMessage(List response) { final replies = _replyDecoder.convert(response); for (final reply in replies) { - if (reply.id > 0) { + if (reply.hasId() && reply.id > 0) { logger.fine('Reply for command #${reply.id} received'); _completeReply(reply); } else if (reply.hasPush()) { @@ -546,6 +546,10 @@ base mixin CentrifugeWSPBHandlerMixin } } + /// Push can be sent to a client as part of Reply in case of bidirectional + /// transport or without additional wrapping in case of unidirectional + /// transports. ProtocolVersion2 uses channel and one of the possible concrete + /// push messages. @protected @nonVirtual @pragma('vm:prefer-inline') @@ -555,14 +559,14 @@ base mixin CentrifugeWSPBHandlerMixin events.notify($publicationDecode(push.channel)(push.pub)); } else if (push.hasJoin()) { events.notify( - CentrifugeJoinEvent( + CentrifugeJoin( channel: push.channel, info: $decodeClientInfo(push.join.info), ), ); } else if (push.hasLeave()) { events.notify( - CentrifugeLeaveEvent( + CentrifugeLeave( channel: push.channel, info: $decodeClientInfo(push.join.info), ),