Skip to content

Commit 0634d79

Browse files
committed
Merged with #16
2 parents 355e27b + 5786c62 commit 0634d79

File tree

2 files changed

+71
-17
lines changed

2 files changed

+71
-17
lines changed
Lines changed: 35 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,46 @@
11
package rx.lang.kotlin
22

33
import rx.Subscriber
4-
import rx.observers.SerializedSubscriber
54
import rx.exceptions.OnErrorNotImplementedException
5+
import rx.observers.SerializedSubscriber
6+
import java.util.ArrayList
7+
8+
public class FunctionSubscriber<T>() : Subscriber<T>() {
9+
private val onCompletedFunctions = ArrayList<() -> Unit>()
10+
private val onErrorFunctions = ArrayList<(e: Throwable) -> Unit>()
11+
private val onNextFunctions = ArrayList<(value: T) -> Unit>()
12+
private val onStartFunctions = ArrayList<() -> Unit>()
13+
14+
override fun onCompleted() = onCompletedFunctions.forEach { it() }
15+
16+
override fun onError(e: Throwable?) = (e ?: RuntimeException("exception is unknown")).let { ex ->
17+
if (onErrorFunctions.isEmpty()) {
18+
throw OnErrorNotImplementedException(ex)
19+
} else {
20+
onErrorFunctions.forEach { it(ex) }
21+
}
22+
}
623

7-
public class FunctionSubscriber<T>(onCompletedFunction: () -> Unit, onErrorFunction: (e : Throwable) -> Unit, onNextFunction: (value : T) -> Unit, onStartFunction : () -> Unit) : Subscriber<T>() {
8-
private val onCompletedFunction: () -> Unit = onCompletedFunction
9-
private val onErrorFunction: (e : Throwable) -> Unit = onErrorFunction
10-
private val onNextFunction: (value : T) -> Unit = onNextFunction
11-
private val onStartFunction : () -> Unit = onStartFunction
24+
override fun onNext(t: T) = onNextFunctions.forEach { it(t) }
1225

13-
override fun onCompleted() = onCompletedFunction()
26+
override fun onStart() = onStartFunctions.forEach { it() }
1427

15-
override fun onError(e: Throwable?) = onErrorFunction(e ?: RuntimeException("exception is unknown"))
28+
fun onCompleted(onCompletedFunction: () -> Unit): FunctionSubscriber<T> = copy { onCompletedFunctions.add(onCompletedFunction) }
29+
fun onError(onErrorFunction: (t: Throwable) -> Unit): FunctionSubscriber<T> = copy { onErrorFunctions.add(onErrorFunction) }
30+
fun onNext(onNextFunction: (t: T) -> Unit): FunctionSubscriber<T> = copy { onNextFunctions.add(onNextFunction) }
31+
fun onStart(onStartFunction : () -> Unit) : FunctionSubscriber<T> = copy { onStartFunctions.add(onStartFunction) }
1632

17-
override fun onNext(t: T) = onNextFunction(t)
33+
private fun copy(block: FunctionSubscriber<T>.() -> Unit): FunctionSubscriber<T> {
34+
val newSubscriber = FunctionSubscriber<T>()
35+
newSubscriber.onCompletedFunctions.addAll(onCompletedFunctions)
36+
newSubscriber.onErrorFunctions.addAll(onErrorFunctions)
37+
newSubscriber.onNextFunctions.addAll(onNextFunctions)
38+
newSubscriber.onStartFunctions.addAll(onStartFunctions)
1839

19-
override fun onStart() = onStartFunction()
40+
newSubscriber.block()
2041

21-
fun onCompleted(onCompletedFunction: () -> Unit) : FunctionSubscriber<T> = FunctionSubscriber(onCompletedFunction, this.onErrorFunction, this.onNextFunction, this.onStartFunction)
22-
fun onError(onErrorFunction: (t : Throwable) -> Unit) : FunctionSubscriber<T> = FunctionSubscriber(this.onCompletedFunction, onErrorFunction, this.onNextFunction, this.onStartFunction)
23-
fun onNext(onNextFunction: (t : T) -> Unit) : FunctionSubscriber<T> = FunctionSubscriber(this.onCompletedFunction, this.onErrorFunction, onNextFunction, this.onStartFunction)
24-
fun onStart(onStartFunction : () -> Unit) : FunctionSubscriber<T> = FunctionSubscriber(this.onCompletedFunction, this.onErrorFunction, this.onNextFunction, onStartFunction)
42+
return newSubscriber
43+
}
2544
}
2645

2746
public class FunctionSubscriberModifier<T>(init: FunctionSubscriber<T> = subscriber()) {
@@ -34,5 +53,5 @@ public class FunctionSubscriberModifier<T>(init: FunctionSubscriber<T> = subscri
3453
fun onStart(onStartFunction : () -> Unit) : Unit { subscriber = subscriber.onStart(onStartFunction) }
3554
}
3655

37-
public fun <T> subscriber(): FunctionSubscriber<T> = FunctionSubscriber({}, {throw OnErrorNotImplementedException(it)}, {}, {})
38-
public fun <T> Subscriber<T>.synchronized() : Subscriber<T> = SerializedSubscriber(this)
56+
public fun <T> subscriber(): FunctionSubscriber<T> = FunctionSubscriber()
57+
public fun <T> Subscriber<T>.synchronized(): Subscriber<T> = SerializedSubscriber(this)

src/test/kotlin/rx/lang/kotlin/SubscribersTest.kt

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package rx.lang.kotlin
22

3-
import org.junit.Test as test
43
import rx.Subscriber
54
import rx.exceptions.OnErrorNotImplementedException
65
import java.util.ArrayList
76
import kotlin.test.assertEquals
7+
import kotlin.test.assertTrue
8+
import org.junit.Test as test
89

910
public class SubscribersTest {
1011
test fun testEmptySubscriber() {
@@ -40,7 +41,41 @@ public class SubscribersTest {
4041
events.clear()
4142
}
4243

44+
test(expected = javaClass<OnErrorNotImplementedException>())
45+
fun testNoErrorHandlers() {
46+
subscriber<Int>().onError(Exception("expected"))
47+
}
48+
49+
test fun testErrorHandlers() {
50+
var errorsCaught = 0
51+
52+
subscriber<Int>().
53+
onError { errorsCaught++ }.
54+
onError { errorsCaught++ }.
55+
onError(Exception("expected"))
56+
57+
assertEquals(2, errorsCaught)
58+
}
59+
60+
test fun testMultipleOnNextHandlers() {
61+
var nextCaught = 0
62+
63+
subscriber<Int>().
64+
onNext { nextCaught ++ }.
65+
onNext { nextCaught ++ }.
66+
onNext(1)
67+
68+
assertEquals(2, nextCaught)
69+
}
70+
71+
test fun testOnStart() {
72+
var started = false
73+
subscriber<Int>().onStart { started = true }.onStart()
74+
assertTrue(started)
75+
}
76+
4377
private fun callSubscriberMethods(hasOnError : Boolean, s: Subscriber<Int>) {
78+
s.onStart()
4479
s.onNext(1)
4580
try {
4681
s.onError(RuntimeException())

0 commit comments

Comments
 (0)