Skip to content

Commit

Permalink
Added test to perform operation mutually exclusive with given priority
Browse files Browse the repository at this point in the history
  • Loading branch information
sacOO7 committed Oct 18, 2024
1 parent e4d0870 commit 2fb5092
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
9 changes: 5 additions & 4 deletions chat-android/src/main/java/com/ably/chat/AtomicExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ private class Task(
val result: TaskResult<Any>)
: Comparable<Task> {
override fun compareTo(other: Task): Int {
return other.priority - this.priority
return this.priority.compareTo(other.priority)
}
suspend fun setResult(result: Result<Any>) {
this.result.channel.send(result)

suspend fun setResult(res: Result<Any>) {
result.channel.send(res)
}
}

Expand Down Expand Up @@ -47,8 +48,8 @@ class AtomicExecutor(private val scope: CoroutineScope) {
*/
suspend fun <T : Any>execute(priority:Int = 0, coroutineBlock: suspend CoroutineScope.() -> T) : TaskResult<T> {
val taskResult = TaskResult<Any>()
tasks.add(Task(priority, coroutineBlock, taskResult))
scope.launch {
tasks.add(Task(priority, coroutineBlock, taskResult))
if (!isRunning) {
isRunning = true
while (tasks.isNotEmpty()) {
Expand Down
31 changes: 31 additions & 0 deletions chat-android/src/test/java/com/ably/chat/AtomicExecutorTest.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.ably.chat

import java.util.concurrent.Executors
import kotlin.time.DurationUnit
import kotlin.time.toDuration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
Expand All @@ -23,4 +25,33 @@ class AtomicExecutorTest {
Assert.assertFalse(result.isFailure)
Assert.assertEquals("Operation Success!", result.getOrNull())
}

@Test
fun `should perform mutually exclusive operations with given priority`() = runTest {
val singleThreadedDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
val atomicExecutor = AtomicExecutor(CoroutineScope(singleThreadedDispatcher))
val taskResults = mutableListOf<TaskResult<Int>>()
var operationInProgress = false
var counter = 0

repeat(20) {
val result = atomicExecutor.execute(it) {
if (operationInProgress) {
throw IllegalStateException("Can't perform operation when other operation is going on")
}
operationInProgress = true
delay((200..600).random().toDuration(DurationUnit.MILLISECONDS))
operationInProgress = false
return@execute counter++
}
taskResults.add(result)
}

val results = taskResults.map { it.await() }

repeat(20) {
Assert.assertTrue(results[it].isSuccess)
Assert.assertEquals(it, results[it].getOrNull())
}
}
}

0 comments on commit 2fb5092

Please sign in to comment.