Skip to content

Commit 795e487

Browse files
committed
merge
2 parents 165fb71 + b5880d8 commit 795e487

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

src/main/kotlin/rx/lang/kotlin/observables.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,5 @@ public inline fun <T> Observable<T>.subscribeWith( body : FunctionSubscriberModi
106106
modifier.body()
107107
return subscribe(modifier.subscriber)
108108
}
109+
110+
public fun <T> Observable<Observable<T>>.switchOnNext(): Observable<T> = Observable.switchOnNext(this)

src/test/kotlin/rx/lang/kotlin/ExtensionTests.kt

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import org.mockito.Mockito.*
2424
import rx.Notification
2525
import rx.Observable
2626
import rx.Subscriber
27+
import rx.schedulers.TestScheduler
28+
import java.util.concurrent.TimeUnit
2729
import kotlin.concurrent.thread
2830

2931
/**
@@ -236,6 +238,41 @@ public class ExtensionTests : KotlinTests() {
236238
assertEquals(listOf(3, 6, 9), values[2])
237239
}
238240

241+
@Test
242+
public fun testSwitchOnNext() {
243+
val testScheduler = TestScheduler()
244+
val worker = testScheduler.createWorker()
245+
246+
val observable = observable<Observable<Long>> { s ->
247+
fun at(delay: Long, func : () -> Unit){
248+
worker.schedule({
249+
func()
250+
}, delay, TimeUnit.MILLISECONDS)
251+
}
252+
253+
val first = Observable.interval(5, TimeUnit.MILLISECONDS, testScheduler).take(3)
254+
at(0, { s.onNext(first) })
255+
256+
val second = Observable.interval(5, TimeUnit.MILLISECONDS, testScheduler).take(3)
257+
at(11, { s.onNext(second) })
258+
259+
at(40, { s.onCompleted() })
260+
}
261+
262+
observable.switchOnNext().subscribe(received())
263+
264+
val inOrder = inOrder(a)
265+
testScheduler.advanceTimeTo(10, TimeUnit.MILLISECONDS)
266+
inOrder.verify(a, times(1)).received(0L)
267+
inOrder.verify(a, times(1)).received(1L)
268+
269+
testScheduler.advanceTimeTo(40, TimeUnit.MILLISECONDS)
270+
inOrder.verify(a, times(1)).received(0L)
271+
inOrder.verify(a, times(1)).received(1L)
272+
inOrder.verify(a, times(1)).received(2L)
273+
inOrder.verifyNoMoreInteractions()
274+
}
275+
239276
val funOnSubscribe: (Int, Subscriber<in String>) -> Unit = { counter, subscriber ->
240277
subscriber.onNext("hello_$counter")
241278
subscriber.onCompleted()

0 commit comments

Comments
 (0)