@@ -180,6 +180,11 @@ data class SendMessageParams(
180
180
)
181
181
182
182
interface MessagesSubscription : Subscription {
183
+ /* *
184
+ * (CHA-M5j)
185
+ * Get the previous messages that were sent to the room before the listener was subscribed.
186
+ * @return paginated result of messages, in newest-to-oldest order.
187
+ */
183
188
suspend fun getPreviousMessages (start : Long? = null, end : Long? = null, limit : Int = 100): PaginatedResult <Message >
184
189
}
185
190
@@ -195,6 +200,18 @@ internal class DefaultMessagesSubscription(
195
200
196
201
override suspend fun getPreviousMessages (start : Long? , end : Long? , limit : Int ): PaginatedResult <Message > {
197
202
val fromSerial = fromSerialProvider().await()
203
+
204
+ // (CHA-M5j)
205
+ if (end != null && end > Timeserial .parse(fromSerial).timestamp) {
206
+ throw AblyException .fromErrorInfo(
207
+ ErrorInfo (
208
+ " The `end` parameter is specified and is more recent than the subscription point timeserial" ,
209
+ HttpStatusCodes .BadRequest ,
210
+ ErrorCodes .BadRequest ,
211
+ ),
212
+ )
213
+ }
214
+
198
215
val queryOptions = QueryOptions (start = start, end = end, limit = limit, orderBy = NewestFirst )
199
216
return chatApi.getMessages(
200
217
roomId = roomId,
@@ -217,6 +234,7 @@ internal class DefaultMessages(
217
234
private var lock = Any ()
218
235
219
236
/* *
237
+ * (CHA-M1)
220
238
* the channel name for the chat messages channel.
221
239
*/
222
240
private val messagesChannelName = " $roomId ::\$ chat::\$ chatMessages"
@@ -249,8 +267,9 @@ internal class DefaultMessages(
249
267
)
250
268
listener.onEvent(MessageEvent (type = MessageEventType .Created , message = chatMessage))
251
269
}
252
-
270
+ // (CHA-M4d)
253
271
channel.subscribe(MessageEventType .Created .eventName, messageListener)
272
+ // (CHA-M5) setting subscription point
254
273
associateWithCurrentChannelSerial(deferredChannelSerial)
255
274
256
275
return DefaultMessagesSubscription (
@@ -293,10 +312,11 @@ internal class DefaultMessages(
293
312
private fun associateWithCurrentChannelSerial (channelSerialProvider : DeferredValue <String >) {
294
313
if (channel.state == = ChannelState .attached) {
295
314
channelSerialProvider.completeWith(requireChannelSerial())
315
+ return
296
316
}
297
317
298
318
channel.once(ChannelState .attached) {
299
- channelSerialProvider.completeWith(requireChannelSerial ())
319
+ channelSerialProvider.completeWith(requireAttachSerial ())
300
320
}
301
321
}
302
322
@@ -307,6 +327,13 @@ internal class DefaultMessages(
307
327
)
308
328
}
309
329
330
+ private fun requireAttachSerial (): String {
331
+ return channel.properties.attachSerial
332
+ ? : throw AblyException .fromErrorInfo(
333
+ ErrorInfo (" Channel has been attached, but attachSerial is not defined" , HttpStatusCodes .BadRequest , ErrorCodes .BadRequest ),
334
+ )
335
+ }
336
+
310
337
private fun addListener (listener : Messages .Listener , deferredChannelSerial : DeferredValue <String >) {
311
338
synchronized(lock) {
312
339
listeners + = listener to deferredChannelSerial
@@ -319,9 +346,12 @@ internal class DefaultMessages(
319
346
}
320
347
}
321
348
349
+ /* *
350
+ * (CHA-M5c), (CHA-M5d)
351
+ */
322
352
private fun updateChannelSerialsAfterDiscontinuity () {
323
353
val deferredChannelSerial = DeferredValue <String >()
324
- associateWithCurrentChannelSerial( deferredChannelSerial)
354
+ deferredChannelSerial.completeWith(requireAttachSerial() )
325
355
326
356
synchronized(lock) {
327
357
listeners = listeners.mapValues {
0 commit comments