diff --git a/chat-android/src/main/java/com/ably/chat/Emitter.kt b/chat-android/src/main/java/com/ably/chat/Emitter.kt new file mode 100644 index 0000000..8321677 --- /dev/null +++ b/chat-android/src/main/java/com/ably/chat/Emitter.kt @@ -0,0 +1,132 @@ +package com.ably.chat + +import java.util.TreeSet +import java.util.concurrent.LinkedBlockingQueue +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch + +/** + * Kotlin Emitter interface for supplied value + * Spec: RTE1 + */ +internal interface Emitter { + fun emit(value: V) + fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription + fun once(block: suspend CoroutineScope.(V) -> Unit): Subscription + fun offAll() +} + +/** + * ScopedEmitter is a thread-safe, non-blocking emitter implementation for Kotlin. + * It ensures that all subscribers receive events asynchronously in the same order under given scope. + * + * @param V The type of value to be emitted. + * @param subscriberScope The CoroutineScope in which the subscribers will run. Defaults to Dispatchers.Default. + * @param logger An optional logger for logging errors during event processing. + */ +internal class ScopedEmitter ( + private val subscriberScope: CoroutineScope = CoroutineScope(Dispatchers.Default), + private val logger: Logger? = null, +) : Emitter { + + // Sorted list of unique subscribers based on supplied block + private val subscribers = TreeSet>() + + // Emitter scope to make sure all subscribers receive events in same order. + // Will be automatically garbage collected once all jobs are performed. + private val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1)) + + val finishedProcessing: Boolean + get() = subscribers.all { it.values.isEmpty() && !it.isSubscriberRunning } + + @get:Synchronized + val subscribersCount: Int + get() = subscribers.size + + @Synchronized + override fun emit(value: V) { + for (subscriber in subscribers.toList()) { + subscriber.inform(value) + if (subscriber.once) { + off(subscriber) + } + } + } + + private fun register(subscriber: AsyncSubscriber): Subscription { + subscribers.add(subscriber) + return Subscription { + off(subscriber) + } + } + + @Synchronized + override fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription { + val subscriber = AsyncSubscriber(sequentialScope, subscriberScope, block, false, logger) + return register(subscriber) + } + + @Synchronized + override fun once(block: suspend CoroutineScope.(V) -> Unit): Subscription { + val subscriber = AsyncSubscriber(sequentialScope, subscriberScope, block, true, logger) + return register(subscriber) + } + + @Synchronized + override fun offAll() { + subscribers.clear() + } + + @Synchronized + private fun off(subscriber: AsyncSubscriber) { + subscribers.remove(subscriber) + } +} + +private class AsyncSubscriber( + private val emitterSequentialScope: CoroutineScope, + private val subscriberScope: CoroutineScope, + private val subscriberBlock: (suspend CoroutineScope.(V) -> Unit), + val once: Boolean, + private val logger: Logger? = null, +) : Comparable { + val values = LinkedBlockingQueue() // Accessed by both Emitter#emit and emitterSequentialScope + var isSubscriberRunning = false // Only accessed as a part of emitterSequentialScope + + fun inform(value: V) { + values.add(value) + emitterSequentialScope.launch { + if (!isSubscriberRunning) { + isSubscriberRunning = true + while (values.isNotEmpty()) { + val valueTobeEmitted = values.poll() + safelyPublish(valueTobeEmitted as V) // Process sequentially, similar to core ably eventEmitter + } + isSubscriberRunning = false + } + } + } + + private suspend fun safelyPublish(value: V) { + runCatching { + subscriberScope.launch { + try { + subscriberBlock(value) + } catch (t: Throwable) { + // Catching exception to avoid error propagation to parent + logger?.warn("Error processing value $value", t) + } + }.join() + } + } + + override fun compareTo(other: V): Int { + // Avoid registering duplicate anonymous subscriber block with same instance id + // Common scenario when Android activity is refreshed or some app components refresh + if (other is AsyncSubscriber<*>) { + return this.subscriberBlock.hashCode().compareTo(other.subscriberBlock.hashCode()) + } + return this.hashCode().compareTo(other.hashCode()) + } +} diff --git a/chat-android/src/test/java/com/ably/chat/EmitterTest.kt b/chat-android/src/test/java/com/ably/chat/EmitterTest.kt new file mode 100644 index 0000000..d355970 --- /dev/null +++ b/chat-android/src/test/java/com/ably/chat/EmitterTest.kt @@ -0,0 +1,383 @@ +package com.ably.chat + +import java.util.concurrent.LinkedBlockingQueue +import kotlin.time.DurationUnit +import kotlin.time.toDuration +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext +import org.hamcrest.CoreMatchers.containsString +import org.hamcrest.MatcherAssert +import org.junit.Assert +import org.junit.Test + +class EmitterTest { + + @Test + fun `should be able to emit and listen to the values in the same order`() = runTest { + val emitter = ScopedEmitter() + val receivedValues = mutableListOf() + + emitter.on { received: Int -> + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues.add(received) + } + + repeat(10) { + emitter.emit(it) + } + + assertWaiter { receivedValues.size == 10 } + Assert.assertTrue(emitter.finishedProcessing) + + Assert.assertEquals((0..9).toList(), receivedValues) + } + + @Test + fun `should receive event only once with emitter once`() = runTest { + val emitter = ScopedEmitter() + val receivedValues = mutableListOf() + val allReceivedValues = mutableListOf() + + emitter.once { received: Int -> + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues.add(received) + } + + emitter.on { received: Int -> + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + allReceivedValues.add(received) + } + + repeat(10) { + emitter.emit(it) + } + + assertWaiter { receivedValues.size == 1 } + assertWaiter { allReceivedValues.size == 10 } + Assert.assertTrue(emitter.finishedProcessing) + + Assert.assertEquals(listOf(0), receivedValues) + Assert.assertEquals((0..9).toList(), allReceivedValues) + } + + @Test + fun `should start listening to events when subscribed and stop when unsubscribed`() = runTest { + val emitter = ScopedEmitter() + val receivedValues1 = mutableListOf() + val receivedValues2 = mutableListOf() + + emitter.emit("1") + emitter.emit("10") + Assert.assertTrue(emitter.finishedProcessing) // Since no subscribers, returns true + + val subscription1 = emitter.on { received: String -> + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues1.add(received) + } + + val subscription2 = emitter.on { received: String -> + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues2.add(received) + } + + emitter.emit("2") + emitter.emit("3") + emitter.emit("4") + Assert.assertFalse(emitter.finishedProcessing) // Both subscribers are processing + + subscription1.unsubscribe() + + emitter.emit("5") + Assert.assertFalse(emitter.finishedProcessing) // second subscriber is processing + + subscription2.unsubscribe() + + emitter.emit("6") + + assertWaiter { receivedValues1.size == 3 } + Assert.assertEquals(listOf("2", "3", "4"), receivedValues1) + + assertWaiter { receivedValues2.size == 4 } + Assert.assertEquals(listOf("2", "3", "4", "5"), receivedValues2) + + Assert.assertTrue(emitter.finishedProcessing) + } + + @Test + fun `should be able to handle sequential emits and listen them in same order by multiple subscribers`() = runTest { + val emitter = ScopedEmitter() + val emittedValues = mutableListOf() + val receivedValues1 = mutableListOf() + val receivedValues2 = mutableListOf() + val receivedValues3 = mutableListOf() + + emitter.on { received -> + delay((10..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues1.add(received) + } + + emitter.on { received -> + delay((20..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues2.add(received) + } + + emitter.on { received -> + delay((30..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues3.add(received) + } + + // emit 100 events from same thread + repeat(100) { + emitter.emit(it) + emittedValues.add(it) + } + + Assert.assertFalse(emitter.finishedProcessing) // Processing events + + assertWaiter { emittedValues.size == 100 } + assertWaiter { receivedValues1.size == 100 } + assertWaiter { receivedValues2.size == 100 } + assertWaiter { receivedValues3.size == 100 } + + Assert.assertEquals(emittedValues, receivedValues1) + Assert.assertEquals(emittedValues, receivedValues2) + Assert.assertEquals(emittedValues, receivedValues3) + + Assert.assertTrue(emitter.finishedProcessing) // Finished processing + } + + @Test + fun `all subscribers should receive events in custom (room) scope`() = runTest { + val roomScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName("roomId")) + val emitter = ScopedEmitter(roomScope) + + val contexts1 = mutableListOf() + val contextNames1 = mutableListOf() + + val contexts2 = mutableListOf() + val contextNames2 = mutableListOf() + + val contexts3 = mutableListOf() + val contextNames3 = mutableListOf() + + emitter.on { + contexts1.add(coroutineContext.toString()) + contextNames1.add(coroutineContext[CoroutineName]!!.name) + } + emitter.on { + contexts2.add(coroutineContext.toString()) + contextNames2.add(coroutineContext[CoroutineName]!!.name) + } + emitter.on { + contexts3.add(coroutineContext.toString()) + contextNames3.add(coroutineContext[CoroutineName]!!.name) + } + + // emit 10000 concurrent events + withContext(Dispatchers.IO) { + repeat(10_000) { + launch { + emitter.emit(it) + } + } + } + + Assert.assertFalse(emitter.finishedProcessing) // Processing events + + assertWaiter { contextNames1.size == 10_000 } + assertWaiter { contextNames2.size == 10_000 } + assertWaiter { contextNames3.size == 10_000 } + + repeat(10_000) { + Assert.assertEquals("roomId", contextNames1[it]) + Assert.assertEquals("roomId", contextNames2[it]) + Assert.assertEquals("roomId", contextNames3[it]) + + MatcherAssert.assertThat(contexts1[it], containsString("Dispatchers.Default.limitedParallelism(1)")) + MatcherAssert.assertThat(contexts2[it], containsString("Dispatchers.Default.limitedParallelism(1)")) + MatcherAssert.assertThat(contexts3[it], containsString("Dispatchers.Default.limitedParallelism(1)")) + } + + Assert.assertTrue(emitter.finishedProcessing) // Finished processing + } + + @Test + fun `should be able to handle concurrent emits and all subscribers should receive them in the same order`() = runTest { + val emitter = ScopedEmitter() + val emitted = LinkedBlockingQueue() + val receivedValues1 = mutableListOf() + val receivedValues2 = mutableListOf() + val receivedValues3 = mutableListOf() + + emitter.on { received -> + receivedValues1.add(received) + } + + emitter.on { received -> + receivedValues2.add(received) + } + + emitter.on { received -> + receivedValues3.add(received) + } + + // Concurrently emit 100000 events from multiple threads + withContext(Dispatchers.IO) { + repeat(1_00_000) { + launch { + emitter.emit(it) + emitted.add(it) + } + } + } + + Assert.assertFalse(emitter.finishedProcessing) + + assertWaiter { emitted.size == 1_00_000 } + assertWaiter { receivedValues1.size == 1_00_000 } + assertWaiter { receivedValues2.size == 1_00_000 } + assertWaiter { receivedValues3.size == 1_00_000 } + + // Due to concurrent emits, emit order is not guaranteed + // i.e. assertEquals(emittedValues, receivedValues1) will fail + // But order of received messages will be same across all subscribers + Assert.assertEquals(receivedValues1, receivedValues2) + Assert.assertEquals(receivedValues1, receivedValues3) + + Assert.assertTrue(emitter.finishedProcessing) + } + + @Test + fun `should be able to handle concurrent emits and all async subscribers should receive them in the same order`() = runTest { + val emitter = ScopedEmitter() + val emitted = LinkedBlockingQueue() + val receivedValues1 = mutableListOf() + val receivedValues2 = mutableListOf() + val receivedValues3 = mutableListOf() + + emitter.on { received -> + delay((30..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues1.add(received) + } + + emitter.on { received -> + delay((30..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues2.add(received) + } + + emitter.on { received -> + delay((30..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues3.add(received) + } + + // Concurrently emit 100 events from multiple threads + withContext(Dispatchers.IO) { + repeat(100) { + launch { + emitter.emit(it) + emitted.add(it) + } + } + } + + Assert.assertFalse(emitter.finishedProcessing) + + assertWaiter { emitted.size == 100 } + assertWaiter { receivedValues1.size == 100 } + assertWaiter { receivedValues2.size == 100 } + assertWaiter { receivedValues3.size == 100 } + + // Due to concurrent emits, emit order is not guaranteed + // i.e. assertEquals(emittedValues, receivedValues1) will fail + // But order of received messages will be same across all subscribers + Assert.assertEquals(receivedValues1, receivedValues2) + Assert.assertEquals(receivedValues1, receivedValues3) + + Assert.assertTrue(emitter.finishedProcessing) + } + + @Test + fun `shouldn't register same subscriber block twice`() = runTest { + val emitter = ScopedEmitter() + val receivedValues = mutableListOf() + + val block: suspend CoroutineScope.(Int) -> Unit = { + delay((200..800).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues.add(it) + } + + emitter.on(block) + emitter.on(block) + emitter.on(block) + + Assert.assertEquals(1, emitter.subscribersCount) + + emitter.emit(1) + + assertWaiter { receivedValues.size == 1 } + Assert.assertTrue(emitter.finishedProcessing) + + Assert.assertEquals(1, receivedValues[0]) + } + + @Test + fun `Ignore subscriber errors while processing events`() = runTest { + val emitter = ScopedEmitter() + val emittedValues = mutableListOf() + val receivedValues1 = mutableListOf() + val receivedValues2 = mutableListOf() + val receivedValues3 = mutableListOf() + + emitter.on { received -> + if (received % 2 == 0) { + throw UnsupportedOperationException("Can't process integers divisible by 2") + } + delay((20..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues1.add(received) + } + + emitter.on { received -> + if (received % 5 == 0) { + throw UnsupportedOperationException("Can't process integers divisible by 5") + } + delay((20..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues2.add(received) + } + + emitter.on { received -> + if (received % 7 == 0) { + throw UnsupportedOperationException("Can't process integers divisible by 7") + } + delay((30..100).random().toDuration(DurationUnit.MILLISECONDS)) + receivedValues3.add(received) + } + + // emit 100 events from same thread + repeat(100) { + emitter.emit(it) + emittedValues.add(it) + } + + Assert.assertFalse(emitter.finishedProcessing) // Processing events + + val expectedReceivedValues1 = (0..99).toList().filter { it % 2 != 0 } + val expectedReceivedValues2 = (0..99).toList().filter { it % 5 != 0 } + val expectedReceivedValues3 = (0..99).toList().filter { it % 7 != 0 } + + assertWaiter { emittedValues.size == 100 } + assertWaiter { receivedValues1.size == expectedReceivedValues1.size } + assertWaiter { receivedValues2.size == expectedReceivedValues2.size } + assertWaiter { receivedValues3.size == expectedReceivedValues3.size } + + Assert.assertEquals(expectedReceivedValues1, receivedValues1) + Assert.assertEquals(expectedReceivedValues2, receivedValues2) + Assert.assertEquals(expectedReceivedValues3, receivedValues3) + + Assert.assertTrue(emitter.finishedProcessing) // Finished processing + } +}