Skip to content

Commit

Permalink
chore: Refactor ChatRepositorySpinifyImpl to use lazy loading for eve…
Browse files Browse the repository at this point in the history
…nt streams
  • Loading branch information
PlugFox committed May 23, 2024
1 parent 2772bd2 commit 3f8a09f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
4 changes: 2 additions & 2 deletions example/lib/src/feature/chat/data/chat_repository.dart
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ final class ChatRepositorySpinifyImpl implements IChatRepository {
decoder = const PlainMessageDecoder();
}

return _spinify.stream.publications
.where((event) => event.channel == channel)
return _spinify.stream
.publication(channel: channel)
.map<List<int>>((event) => event.data)
.map<Message>(decoder.convert)
.handleError(ignoreErrors);
Expand Down
27 changes: 16 additions & 11 deletions lib/src/model/channel_events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,36 @@ import 'channel_event.dart';
extension type ChannelEvents<T extends SpinifyChannelEvent>(Stream<T> stream)
implements Stream<T> {
/// Stream of publication events.
ChannelEvents<SpinifyPublication> get publication =>
filter<SpinifyPublication>();
ChannelEvents<SpinifyPublication> publication({String? channel}) =>
filter<SpinifyPublication>(channel: channel);

/// Stream of presence events.
ChannelEvents<SpinifyPresence> get presence => filter<SpinifyPresence>();
ChannelEvents<SpinifyPresence> presence({String? channel}) =>
filter<SpinifyPresence>(channel: channel);

/// Stream of unsubscribe events.
ChannelEvents<SpinifyUnsubscribe> get unsubscribe =>
filter<SpinifyUnsubscribe>();
ChannelEvents<SpinifyUnsubscribe> unsubscribe({String? channel}) =>
filter<SpinifyUnsubscribe>(channel: channel);

/// Stream of message events.
ChannelEvents<SpinifyMessage> get message => filter<SpinifyMessage>();
ChannelEvents<SpinifyMessage> message({String? channel}) =>
filter<SpinifyMessage>(channel: channel);

/// Stream of subscribe events.
ChannelEvents<SpinifySubscribe> get subscribe => filter<SpinifySubscribe>();
ChannelEvents<SpinifySubscribe> subscribe({String? channel}) =>
filter<SpinifySubscribe>(channel: channel);

/// Stream of connect events.
ChannelEvents<SpinifyConnect> get connect => filter<SpinifyConnect>();
ChannelEvents<SpinifyConnect> connect({String? channel}) =>
filter<SpinifyConnect>(channel: channel);

/// Stream of disconnect events.
ChannelEvents<SpinifyDisconnect> get disconnect =>
filter<SpinifyDisconnect>();
ChannelEvents<SpinifyDisconnect> disconnect({String? channel}) =>
filter<SpinifyDisconnect>(channel: channel);

/// Stream of refresh events.
ChannelEvents<SpinifyRefresh> get refresh => filter<SpinifyRefresh>();
ChannelEvents<SpinifyRefresh> refresh({String? channel}) =>
filter<SpinifyRefresh>(channel: channel);

/// Filtered stream of data of [SpinifyChannelEvent].
ChannelEvents<S> filter<S extends SpinifyChannelEvent>({String? channel}) =>
Expand Down

0 comments on commit 3f8a09f

Please sign in to comment.