Skip to content

Commit

Permalink
Fixed asyncEmitter, marked interface methods as Synchronized to restrict
Browse files Browse the repository at this point in the history
single threaded access
  • Loading branch information
sacOO7 committed Oct 26, 2024
1 parent bd72b74 commit dc4125d
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 21 deletions.
46 changes: 25 additions & 21 deletions chat-android/src/main/java/com/ably/chat/Emitter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.ably.chat

import io.ably.lib.util.Log.ERROR
import io.ably.lib.util.Log.LogHandler
import java.util.LinkedList
import java.util.concurrent.CopyOnWriteArrayList
import java.util.TreeSet
import java.util.concurrent.LinkedBlockingQueue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
Expand All @@ -19,29 +19,33 @@ interface Emitter<V> {
}

/**
* AsyncEmitter is thread safe, highly performant async emitter implementation for kotlin.
* AsyncEmitter is thread safe, async emitter implementation for kotlin.
* Currently, use-case is limited to handle internal events.
* This can be modified in the future to handle external listeners, events etc
*/
class AsyncEmitter<V> (private val collectorScope: CoroutineScope = CoroutineScope(Dispatchers.Default)) : Emitter<V> {

// Read more on https://www.codejava.net/java-core/concurrency/java-concurrent-collection-copyonwritearraylist-examples
private val subscribers = CopyOnWriteArrayList<AsyncSubscriber<V>>()
private val subscribers = TreeSet<AsyncSubscriber<V>>()

@Synchronized
override fun emit(value: V) {
for (subscriber in subscribers) {
subscriber.notifyAsync(value)
subscriber.notify(value)
}
}

@Synchronized
override fun on(block: suspend CoroutineScope.(V) -> Unit): Subscription {
val subscriber = AsyncSubscriber(collectorScope, block)
subscribers.addIfAbsent(subscriber)
subscribers.add(subscriber)
return Subscription {
subscribers.remove(subscriber)
synchronized(this) {
subscribers.remove(subscriber)
}
}
}

@Synchronized
override fun offAll() {
subscribers.clear()
}
Expand All @@ -51,13 +55,13 @@ private class AsyncSubscriber<V>(
private val scope: CoroutineScope,
private val subscriberBlock: (suspend CoroutineScope.(V) -> Unit),
private val logger: LogHandler? = null,
) {
) : Comparable<V> {
private val values = LinkedBlockingQueue<V>()
private var isSubscriberRunning = false
private val values = LinkedList<V>()

fun notifyAsync(value: V) {
fun notify(value: V) {
values.add(value)
sequentialScope.launch {
values.add(value)
if (!isSubscriberRunning) {
isSubscriberRunning = true
while (values.isNotEmpty()) {
Expand All @@ -75,16 +79,16 @@ private class AsyncSubscriber<V>(
}
}

override fun equals(other: Any?): Boolean {
if (other is AsyncSubscriber<*>) {
// Avoid registering duplicate anonymous subscriber block with same instance id
// Common scenario when Android activity is refreshed or some app components refresh
return this.subscriberBlock.hashCode() == other.subscriberBlock.hashCode()
}
return super.equals(other)
}

companion object {
val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))
}

override fun compareTo(other: V): Int {
// Avoid registering duplicate anonymous subscriber block with same instance id
// Common scenario when Android activity is refreshed or some app components refresh
if (other is AsyncSubscriber<*>) {
return this.subscriberBlock.hashCode().compareTo(other.subscriberBlock.hashCode())
}
return this.hashCode().compareTo(other.hashCode())
}
}
53 changes: 53 additions & 0 deletions chat-android/src/test/java/com/ably/chat/EmitterTest.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
package com.ably.chat

import java.util.concurrent.LinkedBlockingQueue
import kotlin.time.DurationUnit
import kotlin.time.toDuration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.Assert
import org.junit.Test

Expand All @@ -14,6 +21,7 @@ class AsyncEmitterTest {
asyncEmitter.emit("1")

val subscription = asyncEmitter.on { received: String ->
delay((2000..3000).random().toDuration(DurationUnit.MILLISECONDS))
receivedValues.add(received)
}

Expand All @@ -27,7 +35,52 @@ class AsyncEmitterTest {
asyncEmitter.emit("6")

assertWaiter { receivedValues.size == 3 }.join()
Assert.assertEquals(3, receivedValues.size)

Assert.assertEquals(listOf("2", "3", "4"), receivedValues)
}

@Test
fun `should be able to handle concurrent emits and listen to them in the same order`() = runTest {
val asyncEmitter = AsyncEmitter<Int>()
val emitted = LinkedBlockingQueue<Int>()
val receivedValues1 = mutableListOf<Int>()
val receivedValues2 = mutableListOf<Int>()
val receivedValues3 = mutableListOf<Int>()


asyncEmitter.on { received ->
receivedValues1.add(received)
}

asyncEmitter.on { received ->
receivedValues2.add(received)
}

asyncEmitter.on { received ->
receivedValues3.add(received)
}

// Concurrently emit 100000 events from multiple threads
withContext(Dispatchers.IO) {
repeat(100000) {
launch {
asyncEmitter.emit(it)
emitted.add(it)
}
}
}

assertWaiter { emitted.size == 100000 }.join()
assertWaiter { receivedValues1.size == 100000 }.join()
assertWaiter { receivedValues2.size == 100000 }.join()

Assert.assertEquals(receivedValues1, receivedValues2)
Assert.assertEquals(receivedValues2, receivedValues3)

Assert.assertEquals(100000, emitted.size)
Assert.assertEquals(100000, receivedValues1.size)
Assert.assertEquals(100000, receivedValues2.size)
Assert.assertEquals(100000, receivedValues3.size)
}
}

0 comments on commit dc4125d

Please sign in to comment.