From dc4125d85b723bf3881d42f636afaeaee6d2d8fc Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Sun, 27 Oct 2024 00:06:37 +0530 Subject: [PATCH] Fixed asyncEmitter, marked interface methods as Synchronized to restrict single threaded access --- .../src/main/java/com/ably/chat/Emitter.kt | 46 ++++++++-------- .../test/java/com/ably/chat/EmitterTest.kt | 53 +++++++++++++++++++ 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/chat-android/src/main/java/com/ably/chat/Emitter.kt b/chat-android/src/main/java/com/ably/chat/Emitter.kt index 9c222143..65c8e235 100644 --- a/chat-android/src/main/java/com/ably/chat/Emitter.kt +++ b/chat-android/src/main/java/com/ably/chat/Emitter.kt @@ -2,8 +2,8 @@ package com.ably.chat import io.ably.lib.util.Log.ERROR import io.ably.lib.util.Log.LogHandler -import java.util.LinkedList -import java.util.concurrent.CopyOnWriteArrayList +import java.util.TreeSet +import java.util.concurrent.LinkedBlockingQueue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch @@ -19,29 +19,33 @@ interface Emitter { } /** - * AsyncEmitter is thread safe, highly performant async emitter implementation for kotlin. + * AsyncEmitter is thread safe, async emitter implementation for kotlin. * Currently, use-case is limited to handle internal events. * This can be modified in the future to handle external listeners, events etc */ class AsyncEmitter (private val collectorScope: CoroutineScope = CoroutineScope(Dispatchers.Default)) : Emitter { - // Read more on https://www.codejava.net/java-core/concurrency/java-concurrent-collection-copyonwritearraylist-examples - private val subscribers = CopyOnWriteArrayList>() + private val subscribers = TreeSet>() + @Synchronized override fun emit(value: V) { for (subscriber in subscribers) { - subscriber.notifyAsync(value) + subscriber.notify(value) } } + @Synchronized override fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription { val subscriber = AsyncSubscriber(collectorScope, block) - subscribers.addIfAbsent(subscriber) + subscribers.add(subscriber) return Subscription { - subscribers.remove(subscriber) + synchronized(this) { + subscribers.remove(subscriber) + } } } + @Synchronized override fun offAll() { subscribers.clear() } @@ -51,13 +55,13 @@ private class AsyncSubscriber( private val scope: CoroutineScope, private val subscriberBlock: (suspend CoroutineScope.(V) -> Unit), private val logger: LogHandler? = null, -) { +) : Comparable { + private val values = LinkedBlockingQueue() private var isSubscriberRunning = false - private val values = LinkedList() - fun notifyAsync(value: V) { + fun notify(value: V) { + values.add(value) sequentialScope.launch { - values.add(value) if (!isSubscriberRunning) { isSubscriberRunning = true while (values.isNotEmpty()) { @@ -75,16 +79,16 @@ private class AsyncSubscriber( } } - override fun equals(other: Any?): Boolean { - if (other is AsyncSubscriber<*>) { - // Avoid registering duplicate anonymous subscriber block with same instance id - // Common scenario when Android activity is refreshed or some app components refresh - return this.subscriberBlock.hashCode() == other.subscriberBlock.hashCode() - } - return super.equals(other) - } - companion object { val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1)) } + + override fun compareTo(other: V): Int { + // Avoid registering duplicate anonymous subscriber block with same instance id + // Common scenario when Android activity is refreshed or some app components refresh + if (other is AsyncSubscriber<*>) { + return this.subscriberBlock.hashCode().compareTo(other.subscriberBlock.hashCode()) + } + return this.hashCode().compareTo(other.hashCode()) + } } diff --git a/chat-android/src/test/java/com/ably/chat/EmitterTest.kt b/chat-android/src/test/java/com/ably/chat/EmitterTest.kt index c3408526..49f6a49f 100644 --- a/chat-android/src/test/java/com/ably/chat/EmitterTest.kt +++ b/chat-android/src/test/java/com/ably/chat/EmitterTest.kt @@ -1,6 +1,13 @@ package com.ably.chat +import java.util.concurrent.LinkedBlockingQueue +import kotlin.time.DurationUnit +import kotlin.time.toDuration +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withContext import org.junit.Assert import org.junit.Test @@ -14,6 +21,7 @@ class AsyncEmitterTest { asyncEmitter.emit("1") val subscription = asyncEmitter.on { received: String -> + delay((2000..3000).random().toDuration(DurationUnit.MILLISECONDS)) receivedValues.add(received) } @@ -27,7 +35,52 @@ class AsyncEmitterTest { asyncEmitter.emit("6") assertWaiter { receivedValues.size == 3 }.join() + Assert.assertEquals(3, receivedValues.size) Assert.assertEquals(listOf("2", "3", "4"), receivedValues) } + + @Test + fun `should be able to handle concurrent emits and listen to them in the same order`() = runTest { + val asyncEmitter = AsyncEmitter() + val emitted = LinkedBlockingQueue() + val receivedValues1 = mutableListOf() + val receivedValues2 = mutableListOf() + val receivedValues3 = mutableListOf() + + + asyncEmitter.on { received -> + receivedValues1.add(received) + } + + asyncEmitter.on { received -> + receivedValues2.add(received) + } + + asyncEmitter.on { received -> + receivedValues3.add(received) + } + + // Concurrently emit 100000 events from multiple threads + withContext(Dispatchers.IO) { + repeat(100000) { + launch { + asyncEmitter.emit(it) + emitted.add(it) + } + } + } + + assertWaiter { emitted.size == 100000 }.join() + assertWaiter { receivedValues1.size == 100000 }.join() + assertWaiter { receivedValues2.size == 100000 }.join() + + Assert.assertEquals(receivedValues1, receivedValues2) + Assert.assertEquals(receivedValues2, receivedValues3) + + Assert.assertEquals(100000, emitted.size) + Assert.assertEquals(100000, receivedValues1.size) + Assert.assertEquals(100000, receivedValues2.size) + Assert.assertEquals(100000, receivedValues3.size) + } }