diff --git a/lib/src/client/centrifuge.dart b/lib/src/client/centrifuge.dart index 94ee1b2..038b465 100644 --- a/lib/src/client/centrifuge.dart +++ b/lib/src/client/centrifuge.dart @@ -7,12 +7,18 @@ import 'package:centrifuge_dart/src/client/state.dart'; import 'package:centrifuge_dart/src/client/states_stream.dart'; import 'package:centrifuge_dart/src/model/channel_presence.dart'; import 'package:centrifuge_dart/src/model/channel_push.dart'; +import 'package:centrifuge_dart/src/model/connect.dart'; +import 'package:centrifuge_dart/src/model/disconnect.dart'; import 'package:centrifuge_dart/src/model/event.dart'; import 'package:centrifuge_dart/src/model/exception.dart'; +import 'package:centrifuge_dart/src/model/message.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; import 'package:centrifuge_dart/src/model/publication.dart'; import 'package:centrifuge_dart/src/model/pushes_stream.dart'; +import 'package:centrifuge_dart/src/model/refresh.dart'; +import 'package:centrifuge_dart/src/model/subscribe.dart'; +import 'package:centrifuge_dart/src/model/unsubscribe.dart'; import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart'; import 'package:centrifuge_dart/src/subscription/subscription.dart'; import 'package:centrifuge_dart/src/subscription/subscription_config.dart'; @@ -119,6 +125,11 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase { final StreamController _publicationsController = StreamController.broadcast(); + @protected + @nonVirtual + final StreamController _messagesController = + StreamController.broadcast(); + @protected @nonVirtual final StreamController _joinController = @@ -139,6 +150,7 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase { late final CentrifugePushesStream stream = CentrifugePushesStream( pushes: _pushController.stream, publications: _publicationsController.stream, + messages: _messagesController.stream, presenceEvents: _presenceController.stream, joinEvents: _joinController.stream, leaveEvents: _leaveController.stream, @@ -164,12 +176,24 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase { switch (event) { case CentrifugePublication publication: _publicationsController.add(publication); + case CentrifugeMessage message: + _messagesController.add(message); case CentrifugeJoin join: _presenceController.add(join); _joinController.add(join); case CentrifugeLeave leave: _presenceController.add(leave); _leaveController.add(leave); + case CentrifugeSubscribe _: + break; + case CentrifugeUnsubscribe _: + break; + case CentrifugeConnect _: + break; + case CentrifugeDisconnect _: + break; + case CentrifugeRefresh _: + break; } } @@ -180,6 +204,7 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase { for (final controller in >[ _pushController, _publicationsController, + _messagesController, _joinController, _leaveController, _presenceController, diff --git a/lib/src/model/pushes_stream.dart b/lib/src/model/pushes_stream.dart index 3ad7269..a8f5af7 100644 --- a/lib/src/model/pushes_stream.dart +++ b/lib/src/model/pushes_stream.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:centrifuge_dart/src/model/channel_presence.dart'; import 'package:centrifuge_dart/src/model/channel_push.dart'; import 'package:centrifuge_dart/src/model/event.dart'; +import 'package:centrifuge_dart/src/model/message.dart'; import 'package:centrifuge_dart/src/model/publication.dart'; /// Stream of received pushes from Centrifugo server for a channel. @@ -14,6 +15,7 @@ final class CentrifugePushesStream extends StreamView { CentrifugePushesStream({ required Stream pushes, required this.publications, + required this.messages, required this.presenceEvents, required this.joinEvents, required this.leaveEvents, @@ -22,6 +24,9 @@ final class CentrifugePushesStream extends StreamView { /// Publications stream. final Stream publications; + /// Messages stream. + final Stream messages; + /// Stream of presence (join & leave) events. final Stream presenceEvents; diff --git a/lib/src/subscription/client_subscription_impl.dart b/lib/src/subscription/client_subscription_impl.dart index 169a525..de8b84a 100644 --- a/lib/src/subscription/client_subscription_impl.dart +++ b/lib/src/subscription/client_subscription_impl.dart @@ -4,12 +4,18 @@ import 'package:centrifuge_dart/centrifuge.dart'; import 'package:centrifuge_dart/src/client/disconnect_code.dart'; import 'package:centrifuge_dart/src/model/channel_presence.dart'; import 'package:centrifuge_dart/src/model/channel_push.dart'; +import 'package:centrifuge_dart/src/model/connect.dart'; +import 'package:centrifuge_dart/src/model/disconnect.dart'; import 'package:centrifuge_dart/src/model/event.dart'; import 'package:centrifuge_dart/src/model/history.dart'; +import 'package:centrifuge_dart/src/model/message.dart'; import 'package:centrifuge_dart/src/model/presence.dart'; import 'package:centrifuge_dart/src/model/presence_stats.dart'; import 'package:centrifuge_dart/src/model/pushes_stream.dart'; +import 'package:centrifuge_dart/src/model/refresh.dart'; import 'package:centrifuge_dart/src/model/stream_position.dart'; +import 'package:centrifuge_dart/src/model/subscribe.dart'; +import 'package:centrifuge_dart/src/model/unsubscribe.dart'; import 'package:centrifuge_dart/src/subscription/subscription_states_stream.dart'; import 'package:centrifuge_dart/src/transport/transport_interface.dart'; import 'package:centrifuge_dart/src/util/event_queue.dart'; @@ -139,6 +145,11 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin final StreamController _publicationsController = StreamController.broadcast(); + @protected + @nonVirtual + final StreamController _messagesController = + StreamController.broadcast(); + @protected @nonVirtual final StreamController _joinController = @@ -159,6 +170,7 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin late final CentrifugePushesStream stream = CentrifugePushesStream( pushes: _pushController.stream, publications: _publicationsController.stream, + messages: _messagesController.stream, presenceEvents: _presenceController.stream, joinEvents: _joinController.stream, leaveEvents: _leaveController.stream, @@ -181,12 +193,24 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin case CentrifugePublication publication: _handlePublication(publication); _publicationsController.add(publication); + case CentrifugeMessage message: + _messagesController.add(message); case CentrifugeJoin join: _presenceController.add(join); _joinController.add(join); case CentrifugeLeave leave: _presenceController.add(leave); _leaveController.add(leave); + case CentrifugeSubscribe _: + break; + case CentrifugeUnsubscribe _: + break; + case CentrifugeConnect _: + break; + case CentrifugeDisconnect _: + break; + case CentrifugeRefresh _: + break; } } @@ -196,6 +220,7 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin for (final controller in >[ _pushController, _publicationsController, + _messagesController, _joinController, _leaveController, _presenceController, diff --git a/lib/src/transport/ws_protobuf_transport.dart b/lib/src/transport/ws_protobuf_transport.dart index dcdaaaf..6cfb0bb 100644 --- a/lib/src/transport/ws_protobuf_transport.dart +++ b/lib/src/transport/ws_protobuf_transport.dart @@ -563,6 +563,14 @@ base mixin CentrifugeWSPBHandlerMixin final now = DateTime.now(); if (push.hasPub()) { events.notify($publicationDecode(push.channel)(push.pub)); + } else if (push.hasMessage()) { + events.notify( + CentrifugeMessage( + timestamp: now, + channel: push.channel, + data: push.message.hasData() ? push.message.data : [], + ), + ); } else if (push.hasJoin()) { events.notify( CentrifugeJoin( @@ -607,25 +615,6 @@ base mixin CentrifugeWSPBHandlerMixin reason: push.unsubscribe.hasReason() ? push.unsubscribe.reason : 'OK', ), ); - } else if (push.hasMessage()) { - events.notify( - CentrifugeMessage( - timestamp: now, - channel: push.channel, - data: push.message.hasData() ? push.message.data : [], - ), - ); - } else if (push.hasDisconnect()) { - events.notify( - CentrifugeDisconnect( - timestamp: now, - channel: push.channel, - code: push.disconnect.hasCode() ? push.disconnect.code : 0, - reason: push.disconnect.hasReason() ? push.disconnect.reason : 'OK', - reconnect: - push.disconnect.hasReconnect() && push.disconnect.reconnect, - ), - ); } else if (push.hasConnect()) { final connect = push.connect; final expires = @@ -646,6 +635,17 @@ base mixin CentrifugeWSPBHandlerMixin session: connect.hasSession() ? connect.session : null, ), ); + } else if (push.hasDisconnect()) { + events.notify( + CentrifugeDisconnect( + timestamp: now, + channel: push.channel, + code: push.disconnect.hasCode() ? push.disconnect.code : 0, + reason: push.disconnect.hasReason() ? push.disconnect.reason : 'OK', + reconnect: + push.disconnect.hasReconnect() && push.disconnect.reconnect, + ), + ); } else if (push.hasRefresh()) { events.notify(CentrifugeRefresh( timestamp: now,