Skip to content

Commit f34493f

Browse files
committed
Merge pull request #17 from SalomonBrys/0.x
Proposing Observable.subscribeWith { ... }
2 parents 5786c62 + 0634d79 commit f34493f

File tree

3 files changed

+69
-1
lines changed

3 files changed

+69
-1
lines changed

src/main/kotlin/rx/lang/kotlin/observables.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rx.lang.kotlin
22

33
import rx.Observable
44
import rx.Subscriber
5+
import rx.Subscription
56
import rx.observables.BlockingObservable
67

78
public fun <T> emptyObservable() : Observable<T> = Observable.empty()
@@ -96,3 +97,12 @@ public fun <T> Observable<T>.withIndex() : Observable<IndexedValue<T>> = lift {
9697
* @returns Observable that merges all [Sequence]s produced by [body] functions
9798
*/
9899
public fun <T, R> Observable<T>.flatMapSequence( body : (T) -> Sequence<R> ) : Observable<R> = flatMap { body(it).toObservable() }
100+
101+
/**
102+
* Subscribe with a subscriber that is configured inside body
103+
*/
104+
public inline fun <T> Observable<T>.subscribeWith( body : FunctionSubscriberModifier<T>.() -> Unit ) : Subscription {
105+
val modifier = FunctionSubscriberModifier(subscriber<T>())
106+
modifier.body()
107+
return subscribe(modifier.subscriber)
108+
}

src/main/kotlin/rx/lang/kotlin/subscribers.kt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,5 +43,15 @@ public class FunctionSubscriber<T>() : Subscriber<T>() {
4343
}
4444
}
4545

46+
public class FunctionSubscriberModifier<T>(init: FunctionSubscriber<T> = subscriber()) {
47+
public var subscriber: FunctionSubscriber<T> = init
48+
private set
49+
50+
fun onCompleted(onCompletedFunction: () -> Unit) : Unit { subscriber = subscriber.onCompleted(onCompletedFunction) }
51+
fun onError(onErrorFunction: (t : Throwable) -> Unit) : Unit { subscriber = subscriber.onError(onErrorFunction) }
52+
fun onNext(onNextFunction: (t : T) -> Unit) : Unit { subscriber = subscriber.onNext(onNextFunction) }
53+
fun onStart(onStartFunction : () -> Unit) : Unit { subscriber = subscriber.onStart(onStartFunction) }
54+
}
55+
4656
public fun <T> subscriber(): FunctionSubscriber<T> = FunctionSubscriber()
4757
public fun <T> Subscriber<T>.synchronized(): Subscriber<T> = SerializedSubscriber(this)

src/test/kotlin/rx/lang/kotlin/SubscribersTest.kt

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,52 @@ public class SubscribersTest {
8686
}
8787
s.onCompleted()
8888
}
89-
}
89+
90+
test fun testSubscribeWith() {
91+
val completeObservable = observable<Int> {
92+
it.onNext(1)
93+
it.onCompleted()
94+
}
95+
96+
val events = ArrayList<String>()
97+
98+
completeObservable.subscribeWith {
99+
onNext { events.add("onNext($it)") }
100+
}
101+
102+
assertEquals(listOf("onNext(1)"), events)
103+
events.clear()
104+
105+
completeObservable.subscribeWith {
106+
onNext { events.add("onNext($it)") }
107+
onCompleted { events.add("onCompleted") }
108+
}
109+
110+
assertEquals(listOf("onNext(1)", "onCompleted"), events)
111+
events.clear()
112+
113+
val errorObservable = observable<Int> {
114+
it.onNext(1)
115+
it.onError(RuntimeException())
116+
}
117+
118+
errorObservable.subscribeWith {
119+
onNext { events.add("onNext($it)") }
120+
onError { events.add("onError(${it.javaClass.getSimpleName()})") }
121+
}
122+
123+
assertEquals(listOf("onNext(1)", "onError(RuntimeException)"), events)
124+
events.clear()
125+
126+
try {
127+
errorObservable.subscribeWith {
128+
onNext { events.add("onNext($it)") }
129+
}
130+
} catch (t: Throwable) {
131+
events.add("catch(${t.javaClass.getSimpleName()})")
132+
}
133+
134+
assertEquals(listOf("onNext(1)", "catch(OnErrorNotImplementedException)"), events)
135+
events.clear()
136+
}
137+
}

0 commit comments

Comments
 (0)