Skip to content

Commit

Permalink
add Observable#replaceWith
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Nov 25, 2022
1 parent 0f8c99c commit 301b739
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
17 changes: 17 additions & 0 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1688,6 +1688,23 @@ object Observable {
}
}

def replaceWith(obs: Observable[A]): Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = {
val replacedSubscription = Cancelable.variable()

val subscription = obs
.tap(_ => replacedSubscription.unsafeCancel())
.unsafeSubscribe(sink)

replacedSubscription.unsafeAdd(() => source.unsafeSubscribe(sink))

Cancelable.composite(
replacedSubscription,
subscription
)
}
}

def drop(num: Int): Observable[A] = {
if (num <= 0) source
else
Expand Down
68 changes: 68 additions & 0 deletions colibri/src/test/scala/colibri/ObservableSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1755,4 +1755,72 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
errors shouldBe 0
cancelable.isEmpty() shouldBe false
}

it should "replaceWith" in {
var mappedA = List.empty[Unit]
var mappedB = List.empty[Unit]
var received = List.empty[Unit]
var errors = 0
val a = Subject.publish[Unit]()
val b = Subject.publish[Unit]()
val stream = (a: Observable[Unit]).tap(mappedA ::= _).replaceWith((b: Observable[Unit]).tap(mappedB ::= _))

val cancelable = stream.unsafeSubscribe(
Observer.create[Unit](
received ::= _,
_ => errors += 1,
),
)

mappedA shouldBe List()
mappedB shouldBe List()
received shouldBe List()
errors shouldBe 0
cancelable.isEmpty() shouldBe false

a.unsafeOnNext(())

mappedA shouldBe List(())
mappedB shouldBe List()
received shouldBe List(())
errors shouldBe 0
cancelable.isEmpty() shouldBe false

b.unsafeOnNext(())

mappedA shouldBe List(())
mappedB shouldBe List(())
received shouldBe List((), ())
errors shouldBe 0
cancelable.isEmpty() shouldBe false

a.unsafeOnNext(())

mappedA shouldBe List(())
mappedB shouldBe List(())
received shouldBe List((), ())
errors shouldBe 0
cancelable.isEmpty() shouldBe false
}

it should "replaceWith sync" in {
var mappedA = List.empty[Unit]
var mappedB = List.empty[Unit]
var received = List.empty[Unit]
var errors = 0
val stream = Observable.pure(()).tap(mappedA ::= _).replaceWith(Observable.pure(()).tap(mappedB ::= _))

val cancelable = stream.unsafeSubscribe(
Observer.create[Unit](
received ::= _,
_ => errors += 1,
),
)

mappedA shouldBe List()
mappedB shouldBe List(())
received shouldBe List(())
errors shouldBe 0
cancelable.isEmpty() shouldBe true
}
}

0 comments on commit 301b739

Please sign in to comment.