Skip to content

Commit

Permalink
Update centrifuge pushes
Browse files Browse the repository at this point in the history
  • Loading branch information
PlugFox committed Aug 1, 2023
1 parent 7ae1846 commit 8367d37
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 19 deletions.
25 changes: 25 additions & 0 deletions lib/src/client/centrifuge.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@ import 'package:centrifuge_dart/src/client/state.dart';
import 'package:centrifuge_dart/src/client/states_stream.dart';
import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/connect.dart';
import 'package:centrifuge_dart/src/model/disconnect.dart';
import 'package:centrifuge_dart/src/model/event.dart';
import 'package:centrifuge_dart/src/model/exception.dart';
import 'package:centrifuge_dart/src/model/message.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
import 'package:centrifuge_dart/src/model/publication.dart';
import 'package:centrifuge_dart/src/model/pushes_stream.dart';
import 'package:centrifuge_dart/src/model/refresh.dart';
import 'package:centrifuge_dart/src/model/subscribe.dart';
import 'package:centrifuge_dart/src/model/unsubscribe.dart';
import 'package:centrifuge_dart/src/subscription/client_subscription_manager.dart';
import 'package:centrifuge_dart/src/subscription/subscription.dart';
import 'package:centrifuge_dart/src/subscription/subscription_config.dart';
Expand Down Expand Up @@ -119,6 +125,11 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase {
final StreamController<CentrifugePublication> _publicationsController =
StreamController<CentrifugePublication>.broadcast();

@protected
@nonVirtual
final StreamController<CentrifugeMessage> _messagesController =
StreamController<CentrifugeMessage>.broadcast();

@protected
@nonVirtual
final StreamController<CentrifugeJoin> _joinController =
Expand All @@ -139,6 +150,7 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase {
late final CentrifugePushesStream stream = CentrifugePushesStream(
pushes: _pushController.stream,
publications: _publicationsController.stream,
messages: _messagesController.stream,
presenceEvents: _presenceController.stream,
joinEvents: _joinController.stream,
leaveEvents: _leaveController.stream,
Expand All @@ -164,12 +176,24 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase {
switch (event) {
case CentrifugePublication publication:
_publicationsController.add(publication);
case CentrifugeMessage message:
_messagesController.add(message);
case CentrifugeJoin join:
_presenceController.add(join);
_joinController.add(join);
case CentrifugeLeave leave:
_presenceController.add(leave);
_leaveController.add(leave);
case CentrifugeSubscribe _:
break;
case CentrifugeUnsubscribe _:
break;
case CentrifugeConnect _:
break;
case CentrifugeDisconnect _:
break;
case CentrifugeRefresh _:
break;
}
}

Expand All @@ -180,6 +204,7 @@ base mixin CentrifugeEventReceiverMixin on CentrifugeBase {
for (final controller in <StreamSink<CentrifugeEvent>>[
_pushController,
_publicationsController,
_messagesController,
_joinController,
_leaveController,
_presenceController,
Expand Down
5 changes: 5 additions & 0 deletions lib/src/model/pushes_stream.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/event.dart';
import 'package:centrifuge_dart/src/model/message.dart';
import 'package:centrifuge_dart/src/model/publication.dart';

/// Stream of received pushes from Centrifugo server for a channel.
Expand All @@ -14,6 +15,7 @@ final class CentrifugePushesStream extends StreamView<CentrifugeChannelPush> {
CentrifugePushesStream({
required Stream<CentrifugeChannelPush> pushes,
required this.publications,
required this.messages,
required this.presenceEvents,
required this.joinEvents,
required this.leaveEvents,
Expand All @@ -22,6 +24,9 @@ final class CentrifugePushesStream extends StreamView<CentrifugeChannelPush> {
/// Publications stream.
final Stream<CentrifugePublication> publications;

/// Messages stream.
final Stream<CentrifugeMessage> messages;

/// Stream of presence (join & leave) events.
final Stream<CentrifugeChannelPresence> presenceEvents;

Expand Down
25 changes: 25 additions & 0 deletions lib/src/subscription/client_subscription_impl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,18 @@ import 'package:centrifuge_dart/centrifuge.dart';
import 'package:centrifuge_dart/src/client/disconnect_code.dart';
import 'package:centrifuge_dart/src/model/channel_presence.dart';
import 'package:centrifuge_dart/src/model/channel_push.dart';
import 'package:centrifuge_dart/src/model/connect.dart';
import 'package:centrifuge_dart/src/model/disconnect.dart';
import 'package:centrifuge_dart/src/model/event.dart';
import 'package:centrifuge_dart/src/model/history.dart';
import 'package:centrifuge_dart/src/model/message.dart';
import 'package:centrifuge_dart/src/model/presence.dart';
import 'package:centrifuge_dart/src/model/presence_stats.dart';
import 'package:centrifuge_dart/src/model/pushes_stream.dart';
import 'package:centrifuge_dart/src/model/refresh.dart';
import 'package:centrifuge_dart/src/model/stream_position.dart';
import 'package:centrifuge_dart/src/model/subscribe.dart';
import 'package:centrifuge_dart/src/model/unsubscribe.dart';
import 'package:centrifuge_dart/src/subscription/subscription_states_stream.dart';
import 'package:centrifuge_dart/src/transport/transport_interface.dart';
import 'package:centrifuge_dart/src/util/event_queue.dart';
Expand Down Expand Up @@ -139,6 +145,11 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin
final StreamController<CentrifugePublication> _publicationsController =
StreamController<CentrifugePublication>.broadcast();

@protected
@nonVirtual
final StreamController<CentrifugeMessage> _messagesController =
StreamController<CentrifugeMessage>.broadcast();

@protected
@nonVirtual
final StreamController<CentrifugeJoin> _joinController =
Expand All @@ -159,6 +170,7 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin
late final CentrifugePushesStream stream = CentrifugePushesStream(
pushes: _pushController.stream,
publications: _publicationsController.stream,
messages: _messagesController.stream,
presenceEvents: _presenceController.stream,
joinEvents: _joinController.stream,
leaveEvents: _leaveController.stream,
Expand All @@ -181,12 +193,24 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin
case CentrifugePublication publication:
_handlePublication(publication);
_publicationsController.add(publication);
case CentrifugeMessage message:
_messagesController.add(message);
case CentrifugeJoin join:
_presenceController.add(join);
_joinController.add(join);
case CentrifugeLeave leave:
_presenceController.add(leave);
_leaveController.add(leave);
case CentrifugeSubscribe _:
break;
case CentrifugeUnsubscribe _:
break;
case CentrifugeConnect _:
break;
case CentrifugeDisconnect _:
break;
case CentrifugeRefresh _:
break;
}
}

Expand All @@ -196,6 +220,7 @@ base mixin CentrifugeClientSubscriptionEventReceiverMixin
for (final controller in <StreamSink<CentrifugeEvent>>[
_pushController,
_publicationsController,
_messagesController,
_joinController,
_leaveController,
_presenceController,
Expand Down
38 changes: 19 additions & 19 deletions lib/src/transport/ws_protobuf_transport.dart
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,14 @@ base mixin CentrifugeWSPBHandlerMixin
final now = DateTime.now();
if (push.hasPub()) {
events.notify($publicationDecode(push.channel)(push.pub));
} else if (push.hasMessage()) {
events.notify(
CentrifugeMessage(
timestamp: now,
channel: push.channel,
data: push.message.hasData() ? push.message.data : <int>[],
),
);
} else if (push.hasJoin()) {
events.notify(
CentrifugeJoin(
Expand Down Expand Up @@ -607,25 +615,6 @@ base mixin CentrifugeWSPBHandlerMixin
reason: push.unsubscribe.hasReason() ? push.unsubscribe.reason : 'OK',
),
);
} else if (push.hasMessage()) {
events.notify(
CentrifugeMessage(
timestamp: now,
channel: push.channel,
data: push.message.hasData() ? push.message.data : <int>[],
),
);
} else if (push.hasDisconnect()) {
events.notify(
CentrifugeDisconnect(
timestamp: now,
channel: push.channel,
code: push.disconnect.hasCode() ? push.disconnect.code : 0,
reason: push.disconnect.hasReason() ? push.disconnect.reason : 'OK',
reconnect:
push.disconnect.hasReconnect() && push.disconnect.reconnect,
),
);
} else if (push.hasConnect()) {
final connect = push.connect;
final expires =
Expand All @@ -646,6 +635,17 @@ base mixin CentrifugeWSPBHandlerMixin
session: connect.hasSession() ? connect.session : null,
),
);
} else if (push.hasDisconnect()) {
events.notify(
CentrifugeDisconnect(
timestamp: now,
channel: push.channel,
code: push.disconnect.hasCode() ? push.disconnect.code : 0,
reason: push.disconnect.hasReason() ? push.disconnect.reason : 'OK',
reconnect:
push.disconnect.hasReconnect() && push.disconnect.reconnect,
),
);
} else if (push.hasRefresh()) {
events.notify(CentrifugeRefresh(
timestamp: now,
Expand Down

0 comments on commit 8367d37

Please sign in to comment.