Skip to content

Commit 696bce0

Browse files
committed
feat(llc): Add message delivery receipts
This commit introduces message delivery receipts, allowing clients to acknowledge when a message has been successfully delivered. Senders can now be notified that their messages have reached the recipient's device. **Key Changes:** * **Delivery Status in `Read` Model:** The `Read` model is extended to include `lastDeliveredAt` and `lastDeliveredMessageId`, enabling tracking of message delivery alongside read status. * **New Event Type:** A `message.delivered` event is introduced to broadcast delivery acknowledgments. * **`ChannelDeliveryReporter`:** A new `ChannelDeliveryReporter` class is added to manage, batch, and throttle the sending of delivery receipts to the backend, ensuring efficient network usage. * **Message Validation Logic:** A `MessageRules` utility class is created to centralize validation logic, determining if a message is eligible for delivery receipts, can be counted as unread, or can be sent. * **API and Model Updates:** New API endpoints (`markChannelsDelivered`) and corresponding data models (`MessageDeliveryInfo`) have been added to support this feature. * **Refactoring:** Logic for determining unread counts and message validity has been refactored from `Channel` and `ChannelClientState` into the new `MessageRules` class for better separation of concerns. Helper extensions have been added to `Read` and `ChannelClientState` to simplify querying for read and delivery statuses.
1 parent 5207134 commit 696bce0

17 files changed

+809
-151
lines changed

packages/stream_chat/lib/src/client/channel.dart

Lines changed: 170 additions & 132 deletions
Large diffs are not rendered by default.
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
import 'package:logging/logging.dart';
2+
import 'package:rate_limiter/rate_limiter.dart';
3+
import 'package:stream_chat/src/client/channel.dart';
4+
import 'package:stream_chat/src/core/models/message.dart';
5+
import 'package:stream_chat/src/core/models/message_delivery_info.dart';
6+
import 'package:stream_chat/src/core/util/message_rules.dart';
7+
import 'package:synchronized/synchronized.dart';
8+
9+
/// A callback that sends delivery receipts for multiple channels.
10+
///
11+
/// Each [MessageDeliveryInfo] represents an acknowledgment that the current
12+
/// user has received a message.
13+
typedef MarkChannelsDelivered = Future<void> Function(
14+
Iterable<MessageDeliveryInfo> messages,
15+
);
16+
17+
/// Manages the delivery reporting for channel messages.
18+
///
19+
/// Collects channels that need delivery acknowledgments and efficiently
20+
/// reports them to the server.
21+
class ChannelDeliveryReporter {
22+
/// Creates a new channel delivery reporter.
23+
///
24+
/// The [onMarkChannelsDelivered] callback is invoked when delivery receipts
25+
/// are ready to be sent to the server.
26+
///
27+
/// The [throttleDuration] controls how frequently delivery receipts are sent.
28+
///
29+
/// The optional [logger] logs warnings and errors during operation.
30+
ChannelDeliveryReporter({
31+
Logger? logger,
32+
required this.onMarkChannelsDelivered,
33+
Duration throttleDuration = const Duration(seconds: 1),
34+
}) : _logger = logger,
35+
_markAsDeliveredThrottleDuration = throttleDuration;
36+
37+
final Logger? _logger;
38+
final Duration _markAsDeliveredThrottleDuration;
39+
40+
/// The callback invoked to send delivery receipts.
41+
///
42+
/// Receives delivery receipts acknowledging that messages were received.
43+
final MarkChannelsDelivered onMarkChannelsDelivered;
44+
45+
final _deliveryCandidatesLock = Lock();
46+
final _deliveryCandidates = <String /* cid */, Message /* message */ >{};
47+
48+
/// Submits [channels] for delivery reporting.
49+
///
50+
/// Marks each channel's last message as delivered if it meets the delivery
51+
/// requirements according to [MessageRules.canMarkAsDelivered]. Channels
52+
/// without a valid cid or last message are skipped.
53+
///
54+
/// Typically used after message.new events or initial channel queries. For
55+
/// read/delivered events see [reconcileDelivery], for hidden/left channels
56+
/// see [cancelDelivery].
57+
Future<void> submitForDelivery(Iterable<Channel> channels) async {
58+
await _deliveryCandidatesLock.synchronized(() {
59+
for (final channel in channels) {
60+
final channelCid = channel.cid;
61+
if (channelCid == null) continue;
62+
63+
final lastMessage = channel.state?.lastMessage;
64+
if (lastMessage == null) continue;
65+
66+
// Only submit for delivery if the message can be marked as delivered.
67+
if (!MessageRules.canMarkAsDelivered(lastMessage, channel)) continue;
68+
69+
_logger?.fine(
70+
'Submitted channel $channelCid for delivery '
71+
'(message: ${lastMessage.id})',
72+
);
73+
74+
// Update the latest message for the channel
75+
_deliveryCandidates[channelCid] = lastMessage;
76+
}
77+
});
78+
79+
// Trigger mark channels delivered request
80+
_throttledMarkCandidatesAsDelivered.call();
81+
}
82+
83+
/// Reconciles delivery reporting for [channels] with their current state.
84+
///
85+
/// Re-evaluates whether messages still need to be marked as delivered based
86+
/// on the channel's current state. Stops tracking messages that are already
87+
/// read, delivered, or otherwise don't need delivery reporting.
88+
///
89+
/// This prevents duplicate delivery reports when a message is marked
90+
/// delivered on another device, and avoids unnecessary reports when a user
91+
/// reads a channel (since read supersedes delivered).
92+
///
93+
/// Typically used after message.read or message.delivered events. See
94+
/// [cancelDelivery] to remove channels entirely, or [submitForDelivery]
95+
/// to add new messages.
96+
///
97+
/// ```dart
98+
/// // After a message.read or message.delivered event
99+
/// reporter.reconcileDelivery([channel]);
100+
/// ```
101+
Future<void> reconcileDelivery(Iterable<Channel> channels) async {
102+
return _deliveryCandidatesLock.synchronized(() {
103+
for (final channel in channels) {
104+
final channelCid = channel.cid;
105+
if (channelCid == null) continue;
106+
107+
// Get the existing candidate message
108+
final message = _deliveryCandidates[channelCid];
109+
if (message == null) continue;
110+
111+
// If the message can still be marked as delivered, keep it
112+
if (MessageRules.canMarkAsDelivered(message, channel)) continue;
113+
114+
_logger?.fine(
115+
'Reconciled delivery for channel $channelCid '
116+
'(message: ${message.id}), removing from candidates',
117+
);
118+
119+
// Otherwise, remove it from the candidates
120+
_deliveryCandidates.remove(channelCid);
121+
}
122+
});
123+
}
124+
125+
/// Cancels pending delivery reports for [channels].
126+
///
127+
/// Prevents the specified channels from being marked as delivered. Typically
128+
/// used when channels are hidden, left, or removed from view.
129+
///
130+
/// See [reconcileDelivery] to re-evaluate based on current read/delivered
131+
/// state instead of removing channels entirely.
132+
Future<void> cancelDelivery(Iterable<String> channels) {
133+
return _deliveryCandidatesLock.synchronized(() {
134+
for (final channelCid in channels) {
135+
if (!_deliveryCandidates.containsKey(channelCid)) continue;
136+
137+
final message = _deliveryCandidates.remove(channelCid);
138+
139+
_logger?.fine(
140+
'Canceled delivery for channel $channelCid '
141+
'(message: ${message?.id})',
142+
);
143+
}
144+
});
145+
}
146+
147+
late final _throttledMarkCandidatesAsDelivered = Throttle(
148+
leading: false,
149+
_markCandidatesAsDelivered,
150+
_markAsDeliveredThrottleDuration,
151+
);
152+
153+
static const _maxCandidatesPerBatch = 100;
154+
Future<void> _markCandidatesAsDelivered() async {
155+
// We only process at-most 100 channels at a time to avoid large payloads.
156+
final batch = {..._deliveryCandidates}.entries.take(_maxCandidatesPerBatch);
157+
final messageInfoPayload = batch.map(
158+
(it) => MessageDeliveryInfo(channelCid: it.key, messageId: it.value.id),
159+
);
160+
161+
if (messageInfoPayload.isEmpty) return;
162+
163+
_logger?.info('Marking ${messageInfoPayload.length} channels as delivered');
164+
165+
try {
166+
await onMarkChannelsDelivered(messageInfoPayload);
167+
168+
// Clear the successfully delivered candidates. If a channel's message ID
169+
// has changed since we started delivery, keep it for the next batch.
170+
await _deliveryCandidatesLock.synchronized(() {
171+
for (final messageInfo in messageInfoPayload) {
172+
final deliveredChannelCid = messageInfo.channelCid;
173+
final deliveredMessageId = messageInfo.messageId;
174+
175+
final currentMessage = _deliveryCandidates[deliveredChannelCid];
176+
// Skip removal if a newer message has been added while we were
177+
// processing the current batch.
178+
if (currentMessage?.id != deliveredMessageId) continue;
179+
_deliveryCandidates.remove(deliveredChannelCid);
180+
}
181+
182+
// Schedule the next batch if there are remaining candidates.
183+
if (_deliveryCandidates.isNotEmpty) {
184+
_throttledMarkCandidatesAsDelivered.call();
185+
}
186+
});
187+
} catch (e, stk) {
188+
_logger?.warning('Failed to mark channels as delivered', e, stk);
189+
}
190+
}
191+
192+
/// Cancels all pending delivery reports.
193+
///
194+
/// Typically used when shutting down the reporter or permanently stopping
195+
/// delivery reporting.
196+
void cancel() => _throttledMarkCandidatesAsDelivered.cancel();
197+
}

packages/stream_chat/lib/src/client/client.dart

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import 'package:logging/logging.dart';
66
import 'package:meta/meta.dart';
77
import 'package:rxdart/rxdart.dart';
88
import 'package:stream_chat/src/client/channel.dart';
9+
import 'package:stream_chat/src/client/channel_delivery_reporter.dart';
910
import 'package:stream_chat/src/client/retry_policy.dart';
1011
import 'package:stream_chat/src/core/api/attachment_file_uploader.dart';
1112
import 'package:stream_chat/src/core/api/requests.dart';
@@ -27,6 +28,7 @@ import 'package:stream_chat/src/core/models/event.dart';
2728
import 'package:stream_chat/src/core/models/filter.dart';
2829
import 'package:stream_chat/src/core/models/member.dart';
2930
import 'package:stream_chat/src/core/models/message.dart';
31+
import 'package:stream_chat/src/core/models/message_delivery_info.dart';
3032
import 'package:stream_chat/src/core/models/message_reminder.dart';
3133
import 'package:stream_chat/src/core/models/own_user.dart';
3234
import 'package:stream_chat/src/core/models/poll.dart';
@@ -228,6 +230,15 @@ class StreamChatClient {
228230

229231
StreamSubscription<List<ConnectionStatus>>? _connectionStatusSubscription;
230232

233+
/// Manages delivery receipt reporting for channel messages.
234+
///
235+
/// Collects and batches delivery receipts to acknowledge message delivery
236+
/// to senders across multiple channels.
237+
late final channelDeliveryReporter = ChannelDeliveryReporter(
238+
logger: detachedLogger('🧾'),
239+
onMarkChannelsDelivered: markChannelsDelivered,
240+
);
241+
231242
final _eventController = PublishSubject<Event>();
232243

233244
/// Stream of [Event] coming from [_ws] connection
@@ -776,6 +787,8 @@ class StreamChatClient {
776787
logger.info('Got ${res.channels.length} channels from api');
777788

778789
final updateData = _mapChannelStateToChannel(channels);
790+
// Submit delivery report for the channels fetched in this query.
791+
await channelDeliveryReporter.submitForDelivery(updateData.value);
779792

780793
await chatPersistenceClient?.updateChannelQueries(
781794
filter,
@@ -1661,6 +1674,29 @@ class StreamChatClient {
16611674
/// Mark all channels for this user as read
16621675
Future<EmptyResponse> markAllRead() => _chatApi.channel.markAllRead();
16631676

1677+
/// Sends delivery receipts for the latest messages in multiple channels.
1678+
///
1679+
/// Useful when receiving messages through push notifications where only
1680+
/// channel IDs and message IDs are available, without full channel/message
1681+
/// objects. For in-app message delivery, use [channelDeliveryReporter]
1682+
/// which handles this automatically.
1683+
///
1684+
/// ```dart
1685+
/// // From notification payload
1686+
/// final receipt = MessageDeliveryInfo(
1687+
/// channelCid: notificationData['channel_id'],
1688+
/// messageId: notificationData['message_id'],
1689+
/// );
1690+
/// await client.markChannelsDelivered([receipt]);
1691+
/// ```
1692+
///
1693+
/// Accepts up to 100 channels per call.
1694+
Future<EmptyResponse> markChannelsDelivered(
1695+
Iterable<MessageDeliveryInfo> messages,
1696+
) {
1697+
return _chatApi.channel.markChannelsDelivered(messages);
1698+
}
1699+
16641700
/// Send an event to a particular channel
16651701
Future<EmptyResponse> sendEvent(
16661702
String channelId,
@@ -2097,6 +2133,9 @@ class StreamChatClient {
20972133
Future<void> disconnectUser({bool flushChatPersistence = false}) async {
20982134
logger.info('Disconnecting user : ${state.currentUser?.id}');
20992135

2136+
// Cancelling delivery reporter.
2137+
channelDeliveryReporter.cancel();
2138+
21002139
// closing web-socket connection
21012140
closeConnection();
21022141

@@ -2236,10 +2275,12 @@ class ClientState {
22362275
void _listenAllChannelsRead() {
22372276
_eventsSubscription?.add(
22382277
_client.on(EventType.notificationMarkRead).listen((event) {
2239-
if (event.cid == null) {
2240-
channels.forEach((key, value) {
2241-
value.state?.unreadCount = 0;
2242-
});
2278+
// If a cid is provided, it means it's for a specific channel.
2279+
if (event.cid != null) return;
2280+
2281+
// Update all channels' unread count to 0.
2282+
for (final channel in channels.values) {
2283+
channel.state?.unreadCount = 0;
22432284
}
22442285
}),
22452286
);
@@ -2343,10 +2384,7 @@ class ClientState {
23432384

23442385
/// Adds a list of channels to the current list of cached channels
23452386
void addChannels(Map<String, Channel> channelMap) {
2346-
final newChannels = {
2347-
...channels,
2348-
...channelMap,
2349-
};
2387+
final newChannels = {...channels, ...channelMap};
23502388
channels = newChannels;
23512389
}
23522390

packages/stream_chat/lib/src/core/api/channel_api.dart

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import 'package:stream_chat/src/core/models/channel_state.dart';
88
import 'package:stream_chat/src/core/models/event.dart';
99
import 'package:stream_chat/src/core/models/filter.dart';
1010
import 'package:stream_chat/src/core/models/message.dart';
11+
import 'package:stream_chat/src/core/models/message_delivery_info.dart';
1112

1213
/// Defines the api dedicated to channel operations
1314
class ChannelApi {
@@ -84,10 +85,7 @@ class ChannelApi {
8485

8586
/// Mark all channels for this user as read
8687
Future<EmptyResponse> markAllRead() async {
87-
final response = await _client.post(
88-
'/channels/read',
89-
data: {},
90-
);
88+
final response = await _client.post('/channels/read', data: {});
9189
return EmptyResponse.fromJson(response.data);
9290
}
9391

@@ -395,4 +393,17 @@ class ChannelApi {
395393
);
396394
return PartialUpdateMemberResponse.fromJson(response.data);
397395
}
396+
397+
/// Sends delivery receipts for the latest messages in multiple channels.
398+
///
399+
/// Accepts up to 100 channels per call.
400+
Future<EmptyResponse> markChannelsDelivered(
401+
Iterable<MessageDeliveryInfo> messages,
402+
) async {
403+
final response = await _client.post(
404+
'/channels/delivered',
405+
data: jsonEncode({'latest_delivered_messages': messages}),
406+
);
407+
return EmptyResponse.fromJson(response.data);
408+
}
398409
}

packages/stream_chat/lib/src/core/models/channel_config.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ class ChannelConfig {
2727
this.skipLastMsgUpdateForSystemMsgs = false,
2828
this.userMessageReminders = false,
2929
this.markMessagesPending = false,
30+
this.deliveryEvents = false,
3031
}) : createdAt = createdAt ?? DateTime.now(),
3132
updatedAt = updatedAt ?? DateTime.now();
3233

@@ -95,6 +96,9 @@ class ChannelConfig {
9596
/// Whether pending messages are enabled for this channel.
9697
final bool markMessagesPending;
9798

99+
/// Whether delivery events are enabled for this channel.
100+
final bool deliveryEvents;
101+
98102
/// Serialize to json
99103
Map<String, dynamic> toJson() => _$ChannelConfigToJson(this);
100104
}

packages/stream_chat/lib/src/core/models/channel_config.g.dart

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/stream_chat/lib/src/core/models/channel_model.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,9 @@ extension type const ChannelCapability(String capability) implements String {
357357
/// Ability to receive read events.
358358
static const readEvents = ChannelCapability('read-events');
359359

360+
/// Ability to receive delivery events.
361+
static const deliveryEvents = ChannelCapability('delivery-events');
362+
360363
/// Ability to receive connect events.
361364
static const connectEvents = ChannelCapability('connect-events');
362365

0 commit comments

Comments
 (0)