Skip to content

Commit a428dbd

Browse files
committed
Updated atomicCoroutineScope to run operations under supplied scope,
updated tests for the same
1 parent 6718da2 commit a428dbd

File tree

2 files changed

+117
-46
lines changed

2 files changed

+117
-46
lines changed

chat-android/src/main/java/com/ably/chat/AtomicCoroutineScope.kt

Lines changed: 24 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,22 @@ package com.ably.chat
22

33
import io.ably.annotation.Experimental
44
import java.util.concurrent.PriorityBlockingQueue
5-
import java.util.concurrent.atomic.AtomicInteger
65
import kotlinx.coroutines.CompletableDeferred
76
import kotlinx.coroutines.CoroutineScope
87
import kotlinx.coroutines.Dispatchers
9-
import kotlinx.coroutines.coroutineScope
108
import kotlinx.coroutines.launch
119

1210
/**
13-
* AtomicCoroutineScope makes sure all operations are atomic and run with given priority.
14-
* Accepts scope as an optional parameter to run atomic operations under the given scope.
11+
* AtomicCoroutineScope is a thread safe wrapper to run multiple operations mutually exclusive.
12+
* All operations are atomic and run with given priority.
13+
* Accepts scope as a constructor parameter to run operations under the given scope.
1514
* See [Kotlin Dispatchers](https://kt.academy/article/cc-dispatchers) for more information.
1615
*/
1716
class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default)) {
1817

1918
private val sequentialScope: CoroutineScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))
2019

21-
private class Job private constructor(
20+
private class Job(
2221
private val priority: Int,
2322
val coroutineBlock: suspend CoroutineScope.() -> Any,
2423
val deferredResult: CompletableDeferred<Any>,
@@ -31,31 +30,28 @@ class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Di
3130
}
3231
return this.priority.compareTo(other.priority)
3332
}
34-
35-
companion object {
36-
var counter: AtomicInteger = AtomicInteger()
37-
fun create(priority: Int, coroutineBlock: suspend CoroutineScope.() -> Any, deferredResult: CompletableDeferred<Any>): Job {
38-
return Job(priority, coroutineBlock, deferredResult, counter.getAndIncrement())
39-
}
40-
}
4133
}
4234

43-
private val jobs: PriorityBlockingQueue<Job> = PriorityBlockingQueue()
44-
private var isRunning = false
35+
private val jobs: PriorityBlockingQueue<Job> = PriorityBlockingQueue() // Accessed from both sequentialScope and async method
36+
private var isRunning = false // Only accessed from sequentialScope
37+
private var queueCounter = 0 // Only accessed from synchronized async method
4538

4639
/**
4740
* @param priority Defines priority for the operation execution.
4841
* @param coroutineBlock Suspended function that needs to be executed mutually exclusive under given scope.
4942
*/
43+
@Synchronized
5044
fun <T : Any>async(priority: Int = 0, coroutineBlock: suspend CoroutineScope.() -> T): CompletableDeferred<T> {
5145
val deferredResult = CompletableDeferred<Any>()
52-
jobs.add(Job.create(priority, coroutineBlock, deferredResult))
46+
jobs.add(Job(priority, coroutineBlock, deferredResult, queueCounter++))
5347
sequentialScope.launch {
5448
if (!isRunning) {
5549
isRunning = true
5650
while (jobs.isNotEmpty()) {
5751
val job = jobs.poll()
58-
safeExecute(job)
52+
job?.let {
53+
safeExecute(it)
54+
}
5955
}
6056
isRunning = false
6157
}
@@ -65,32 +61,22 @@ class AtomicCoroutineScope(private val scope: CoroutineScope = CoroutineScope(Di
6561
return deferredResult as CompletableDeferred<T>
6662
}
6763

68-
private suspend fun safeExecute(job: Job?) {
69-
job?.let {
70-
runCatching {
71-
scope.launch {
72-
try {
73-
val result = it.coroutineBlock(this)
74-
it.deferredResult.complete(result)
75-
} catch (t: Throwable) {
76-
it.deferredResult.completeExceptionally(t)
77-
}
78-
}.join()
79-
}
64+
private suspend fun safeExecute(job: Job) {
65+
runCatching {
66+
scope.launch {
67+
try {
68+
val result = job.coroutineBlock(this)
69+
job.deferredResult.complete(result)
70+
} catch (t: Throwable) {
71+
job.deferredResult.completeExceptionally(t)
72+
}
73+
}.join()
74+
}.onFailure {
75+
job.deferredResult.completeExceptionally(it)
8076
}
8177
}
8278

8379
@Experimental
8480
val finishedProcessing: Boolean
8581
get() = jobs.isEmpty() && !isRunning
86-
87-
/**
88-
* Clears queuedJobs
89-
*/
90-
fun cancel(message: String? = "Atomic coroutine scope cancelled", cause: Throwable? = null) {
91-
jobs.clear()
92-
Job.counter.set(0)
93-
// Once sequentialScope.cancel called, AtomicCoroutineScope can't be reused
94-
// So, once all jobs are executed, it should be garbage collected.
95-
}
9682
}

chat-android/src/test/java/com/ably/chat/AtomicCoroutineScopeTest.kt

Lines changed: 93 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,20 @@ package com.ably.chat
22

33
import io.ably.lib.types.AblyException
44
import io.ably.lib.types.ErrorInfo
5+
import java.util.concurrent.LinkedBlockingQueue
56
import kotlin.time.DurationUnit
67
import kotlin.time.toDuration
8+
import kotlinx.coroutines.CompletableDeferred
9+
import kotlinx.coroutines.CoroutineName
710
import kotlinx.coroutines.CoroutineScope
811
import kotlinx.coroutines.Deferred
912
import kotlinx.coroutines.Dispatchers
1013
import kotlinx.coroutines.awaitAll
1114
import kotlinx.coroutines.delay
15+
import kotlinx.coroutines.launch
1216
import kotlinx.coroutines.runBlocking
1317
import kotlinx.coroutines.test.runTest
18+
import kotlinx.coroutines.withContext
1419
import org.hamcrest.CoreMatchers.containsString
1520
import org.junit.Assert
1621
import org.junit.Test
@@ -69,8 +74,8 @@ class AtomicCoroutineScopeTest {
6974
}
7075
operationInProgress = true
7176
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
72-
operationInProgress = false
7377
val returnValue = counter++
78+
operationInProgress = false
7479
return@async returnValue
7580
}
7681
deferredResults.add(result)
@@ -86,24 +91,63 @@ class AtomicCoroutineScopeTest {
8691
}
8792

8893
@Test
89-
fun `should perform mutually exclusive operations with custom scope`() = runTest {
90-
val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1))
94+
fun `Concurrently perform mutually exclusive operations`() = runTest {
95+
val atomicCoroutineScope = AtomicCoroutineScope()
96+
val deferredResults = LinkedBlockingQueue<CompletableDeferred<Unit>>()
97+
98+
var operationInProgress = false
99+
var counter = 0
100+
val countedValues = mutableListOf<Int>()
101+
102+
// Concurrently schedule 100000 jobs from multiple threads
103+
withContext(Dispatchers.IO) {
104+
repeat(100000) {
105+
launch {
106+
val result = atomicCoroutineScope.async {
107+
if (operationInProgress) {
108+
error("Can't perform operation when other operation is going on")
109+
}
110+
operationInProgress = true
111+
countedValues.add(counter++)
112+
operationInProgress = false
113+
}
114+
deferredResults.add(result)
115+
}
116+
}
117+
}
118+
119+
Assert.assertFalse(atomicCoroutineScope.finishedProcessing)
120+
assertWaiter { deferredResults.size == 100000 }
121+
122+
deferredResults.awaitAll()
123+
assertWaiter { atomicCoroutineScope.finishedProcessing }
124+
Assert.assertEquals((0..99999).toList(), countedValues)
125+
}
126+
127+
@Test
128+
fun `should perform mutually exclusive operations with custom named scope`() = runTest {
129+
val sequentialScope = CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName("roomId"))
91130
val atomicCoroutineScope = AtomicCoroutineScope(sequentialScope)
92131
val deferredResults = mutableListOf<Deferred<Int>>()
132+
93133
val contexts = mutableListOf<String>()
134+
val contextNames = mutableListOf<String>()
135+
94136
var operationInProgress = false
95137
var counter = 0
96138

97139
repeat(10) {
98140
val result = atomicCoroutineScope.async {
99-
contexts.add(this.coroutineContext.toString())
100141
if (operationInProgress) {
101142
error("Can't perform operation when other operation is going on")
102143
}
103144
operationInProgress = true
145+
contexts.add(coroutineContext.toString())
146+
contextNames.add(coroutineContext[CoroutineName]!!.name)
147+
104148
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
105-
operationInProgress = false
106149
val returnValue = counter++
150+
operationInProgress = false
107151
return@async returnValue
108152
}
109153
deferredResults.add(result)
@@ -113,6 +157,7 @@ class AtomicCoroutineScopeTest {
113157
val results = deferredResults.awaitAll()
114158
repeat(10) {
115159
Assert.assertEquals(it, results[it])
160+
Assert.assertEquals("roomId", contextNames[it])
116161
Assert.assertThat(contexts[it], containsString("Dispatchers.Default.limitedParallelism(1)"))
117162
}
118163
Assert.assertTrue(atomicCoroutineScope.finishedProcessing)
@@ -138,14 +183,14 @@ class AtomicCoroutineScopeTest {
138183
// Add more jobs, will be processed based on priority
139184
repeat(10) {
140185
val result = atomicCoroutineScope.async(10 - it) {
141-
contexts.add(this.coroutineContext.toString())
142186
if (operationInProgress) {
143187
error("Can't perform operation when other operation is going on")
144188
}
145189
operationInProgress = true
190+
contexts.add(this.coroutineContext.toString())
146191
delay((200..800).random().toDuration(DurationUnit.MILLISECONDS))
147-
operationInProgress = false
148192
val returnValue = counter++
193+
operationInProgress = false
149194
return@async returnValue
150195
}
151196
deferredResults.add(result)
@@ -161,6 +206,46 @@ class AtomicCoroutineScopeTest {
161206
}
162207

163208
@Test
164-
fun `reuse AtomicCoroutineScope once cancelled`() = runTest {
209+
fun `Concurrently execute mutually exclusive operations with given priority`() = runTest {
210+
val atomicCoroutineScope = AtomicCoroutineScope()
211+
val deferredResults = LinkedBlockingQueue<Deferred<Unit>>()
212+
213+
var operationInProgress = false
214+
val processedValues = mutableListOf<Int>()
215+
216+
// This will start first internal operation
217+
deferredResults.add(
218+
atomicCoroutineScope.async {
219+
delay(1000)
220+
processedValues.add(1000)
221+
Unit
222+
},
223+
)
224+
225+
// Add more jobs, will be processed based on priority
226+
// Concurrently schedule 1000 jobs with incremental priority from multiple threads
227+
withContext(Dispatchers.IO) {
228+
repeat(1000) {
229+
launch {
230+
val result = atomicCoroutineScope.async(1000 - it) {
231+
if (operationInProgress) {
232+
error("Can't perform operation when other operation is going on")
233+
}
234+
operationInProgress = true
235+
processedValues.add(it)
236+
operationInProgress = false
237+
}
238+
deferredResults.add(result)
239+
}
240+
}
241+
}
242+
243+
Assert.assertFalse(atomicCoroutineScope.finishedProcessing)
244+
deferredResults.awaitAll()
245+
val expectedResults = (1000 downTo 0).toList()
246+
repeat(1001) {
247+
Assert.assertEquals(expectedResults[it], processedValues[it])
248+
}
249+
Assert.assertTrue(atomicCoroutineScope.finishedProcessing)
165250
}
166251
}

0 commit comments

Comments
 (0)