Skip to content

Commit 5786c62

Browse files
committed
Merge pull request #16 from cy6erGn0m/multihandlers
Multiple handlers in functional subscriber
2 parents 049d8d9 + 69573ea commit 5786c62

File tree

2 files changed

+72
-17
lines changed

2 files changed

+72
-17
lines changed
Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,47 @@
11
package rx.lang.kotlin
22

33
import rx.Subscriber
4-
import rx.observers.SerializedSubscriber
54
import rx.exceptions.OnErrorNotImplementedException
5+
import rx.observers.SerializedSubscriber
6+
import java.util.ArrayList
7+
8+
public class FunctionSubscriber<T>() : Subscriber<T>() {
9+
private val onCompletedFunctions = ArrayList<() -> Unit>()
10+
private val onErrorFunctions = ArrayList<(e: Throwable) -> Unit>()
11+
private val onNextFunctions = ArrayList<(value: T) -> Unit>()
12+
private val onStartFunctions = ArrayList<() -> Unit>()
13+
14+
override fun onCompleted() = onCompletedFunctions.forEach { it() }
15+
16+
override fun onError(e: Throwable?) = (e ?: RuntimeException("exception is unknown")).let { ex ->
17+
if (onErrorFunctions.isEmpty()) {
18+
throw OnErrorNotImplementedException(ex)
19+
} else {
20+
onErrorFunctions.forEach { it(ex) }
21+
}
22+
}
623

7-
public class FunctionSubscriber<T>(onCompletedFunction: () -> Unit, onErrorFunction: (e : Throwable) -> Unit, onNextFunction: (value : T) -> Unit, onStartFunction : () -> Unit) : Subscriber<T>() {
8-
private val onCompletedFunction: () -> Unit = onCompletedFunction
9-
private val onErrorFunction: (e : Throwable) -> Unit = onErrorFunction
10-
private val onNextFunction: (value : T) -> Unit = onNextFunction
11-
private val onStartFunction : () -> Unit = onStartFunction
24+
override fun onNext(t: T) = onNextFunctions.forEach { it(t) }
1225

13-
override fun onCompleted() = onCompletedFunction()
26+
override fun onStart() = onStartFunctions.forEach { it() }
1427

15-
override fun onError(e: Throwable?) = onErrorFunction(e ?: RuntimeException("exception is unknown"))
28+
fun onCompleted(onCompletedFunction: () -> Unit): FunctionSubscriber<T> = copy { onCompletedFunctions.add(onCompletedFunction) }
29+
fun onError(onErrorFunction: (t: Throwable) -> Unit): FunctionSubscriber<T> = copy { onErrorFunctions.add(onErrorFunction) }
30+
fun onNext(onNextFunction: (t: T) -> Unit): FunctionSubscriber<T> = copy { onNextFunctions.add(onNextFunction) }
31+
fun onStart(onStartFunction : () -> Unit) : FunctionSubscriber<T> = copy { onStartFunctions.add(onStartFunction) }
1632

17-
override fun onNext(t: T) = onNextFunction(t)
33+
private fun copy(block: FunctionSubscriber<T>.() -> Unit): FunctionSubscriber<T> {
34+
val newSubscriber = FunctionSubscriber<T>()
35+
newSubscriber.onCompletedFunctions.addAll(onCompletedFunctions)
36+
newSubscriber.onErrorFunctions.addAll(onErrorFunctions)
37+
newSubscriber.onNextFunctions.addAll(onNextFunctions)
38+
newSubscriber.onStartFunctions.addAll(onStartFunctions)
1839

19-
override fun onStart() = onStartFunction()
40+
newSubscriber.block()
2041

21-
fun onCompleted(onCompletedFunction: () -> Unit) : FunctionSubscriber<T> = FunctionSubscriber(onCompletedFunction, this.onErrorFunction, this.onNextFunction, this.onStartFunction)
22-
fun onError(onErrorFunction: (t : Throwable) -> Unit) : FunctionSubscriber<T> = FunctionSubscriber(this.onCompletedFunction, onErrorFunction, this.onNextFunction, this.onStartFunction)
23-
fun onNext(onNextFunction: (t : T) -> Unit) : FunctionSubscriber<T> = FunctionSubscriber(this.onCompletedFunction, this.onErrorFunction, onNextFunction, this.onStartFunction)
24-
fun onStart(onStartFunction : () -> Unit) : FunctionSubscriber<T> = FunctionSubscriber(this.onCompletedFunction, this.onErrorFunction, this.onNextFunction, onStartFunction)
42+
return newSubscriber
43+
}
2544
}
2645

27-
public fun <T> subscriber(): FunctionSubscriber<T> = FunctionSubscriber({}, {throw OnErrorNotImplementedException(it)}, {}, {})
28-
public fun <T> Subscriber<T>.synchronized() : Subscriber<T> = SerializedSubscriber(this)
46+
public fun <T> subscriber(): FunctionSubscriber<T> = FunctionSubscriber()
47+
public fun <T> Subscriber<T>.synchronized(): Subscriber<T> = SerializedSubscriber(this)

src/test/kotlin/rx/lang/kotlin/SubscribersTest.kt

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package rx.lang.kotlin
22

3-
import org.junit.Test as test
43
import rx.Subscriber
4+
import rx.exceptions.OnErrorNotImplementedException
55
import java.util.ArrayList
66
import kotlin.test.assertEquals
7+
import kotlin.test.assertTrue
8+
import org.junit.Test as test
79

810
public class SubscribersTest {
911
test fun testEmptySubscriber() {
@@ -39,7 +41,41 @@ public class SubscribersTest {
3941
events.clear()
4042
}
4143

44+
test(expected = javaClass<OnErrorNotImplementedException>())
45+
fun testNoErrorHandlers() {
46+
subscriber<Int>().onError(Exception("expected"))
47+
}
48+
49+
test fun testErrorHandlers() {
50+
var errorsCaught = 0
51+
52+
subscriber<Int>().
53+
onError { errorsCaught++ }.
54+
onError { errorsCaught++ }.
55+
onError(Exception("expected"))
56+
57+
assertEquals(2, errorsCaught)
58+
}
59+
60+
test fun testMultipleOnNextHandlers() {
61+
var nextCaught = 0
62+
63+
subscriber<Int>().
64+
onNext { nextCaught ++ }.
65+
onNext { nextCaught ++ }.
66+
onNext(1)
67+
68+
assertEquals(2, nextCaught)
69+
}
70+
71+
test fun testOnStart() {
72+
var started = false
73+
subscriber<Int>().onStart { started = true }.onStart()
74+
assertTrue(started)
75+
}
76+
4277
private fun callSubscriberMethods(hasOnError : Boolean, s: Subscriber<Int>) {
78+
s.onStart()
4379
s.onNext(1)
4480
try {
4581
s.onError(RuntimeException())

0 commit comments

Comments
 (0)