Skip to content

Commit

Permalink
add Observable.evalCancelable and mapCancelable
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Nov 21, 2023
1 parent 95e80bf commit 5d8e170
Showing 1 changed file with 7 additions and 0 deletions.
7 changes: 7 additions & 0 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ object Observable {
}
}

def evalCancelable(value: => Cancelable): Observable[Unit] = new Observable[Unit] {
def unsafeSubscribe(sink: Observer[Unit]): Cancelable = value
}

def evalObservable[T](value: => Observable[T]): Observable[T] = new Observable[T] {
def unsafeSubscribe(sink: Observer[T]): Cancelable = value.unsafeSubscribe(sink)
}
Expand Down Expand Up @@ -711,6 +715,9 @@ object Observable {
@inline def mapFuture[B](f: A => Future[B]): Observable[B] = concatMapFuture(f)
@inline def concatMapFuture[B](f: A => Future[B]): Observable[B] = mapEffect(v => IO.fromFuture(IO(f(v))))

@inline def mapCancelable(f: A => Cancelable): Observable[Unit] =
source.switchMap(a => Observable.evalCancelable(f(a)))

@inline def mapResource[F[_]: RunEffect: Sync, B](f: A => Resource[F, B]): Observable[B] =
mapResourceWithCancelable(f)(Cancelable.builder)

Expand Down

0 comments on commit 5d8e170

Please sign in to comment.