diff --git a/colibri/src/main/scala/colibri/CanCancel.scala b/colibri/src/main/scala/colibri/CanCancel.scala index 4b8ab9a7..dc39a28f 100644 --- a/colibri/src/main/scala/colibri/CanCancel.scala +++ b/colibri/src/main/scala/colibri/CanCancel.scala @@ -2,9 +2,6 @@ package colibri trait CanCancel[-T] { def unsafeCancel(cancelable: T): Unit - - @deprecated("Use unsafeCancel instead", "0.2.7") - @inline final def cancel(cancelable: T): Unit = unsafeCancel(cancelable) } object CanCancel { @inline def apply[T](implicit unsafeCancel: CanCancel[T]): CanCancel[T] = unsafeCancel diff --git a/colibri/src/main/scala/colibri/Cancelable.scala b/colibri/src/main/scala/colibri/Cancelable.scala index 977b8896..1979752e 100644 --- a/colibri/src/main/scala/colibri/Cancelable.scala +++ b/colibri/src/main/scala/colibri/Cancelable.scala @@ -10,9 +10,6 @@ trait Cancelable { def isEmpty(): Boolean def unsafeCancel(): Unit - @deprecated("Use unsafeCancel() instead", "0.2.7") - @inline final def cancel(): Unit = unsafeCancel() - final def cancelF[F[_]: Sync]: F[Unit] = Sync[F].delay(unsafeCancel()) final def cancelIO: IO[Unit] = cancelF[IO] final def cancelSyncIO: SyncIO[Unit] = cancelF[SyncIO] diff --git a/colibri/src/main/scala/colibri/Connectable.scala b/colibri/src/main/scala/colibri/Connectable.scala index e8a2f1f4..03859e4f 100644 --- a/colibri/src/main/scala/colibri/Connectable.scala +++ b/colibri/src/main/scala/colibri/Connectable.scala @@ -1,55 +1,25 @@ package colibri -final class Connectable[+T] private (val value: T, val connect: () => Cancelable) { - def map[A](f: T => A): Connectable[A] = new Connectable(f(value), connect) +final class Connectable[+T] private (val value: T, val unsafeConnect: () => Cancelable) { + def map[A](f: T => A): Connectable[A] = new Connectable(f(value), unsafeConnect) def flatMap[A](f: T => Connectable[A]): Connectable[A] = { val connectable = f(value) - new Connectable(connectable.value, () => Cancelable.composite(connect(), connectable.connect())) + new Connectable(connectable.value, () => Cancelable.composite(unsafeConnect(), connectable.unsafeConnect())) } } object Connectable { - def apply[T](value: T, connect: () => Cancelable) = { - val cancelable = Cancelable.refCount(connect) + def apply[T](value: T, unsafeConnect: () => Cancelable) = { + val cancelable = Cancelable.refCount(unsafeConnect) new Connectable(value, cancelable.ref) } @inline implicit class ConnectableObservableOperations[A](val source: Connectable[Observable[A]]) extends AnyVal { def refCount: Observable[A] = new Observable[A] { - def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.connect()) + def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect()) } - @deprecated("Use unsafeHot instead", "0.5.0") - def hot: Observable.Hot[A] = unsafeHot() - def unsafeHot(): Observable.Hot[A] = new Observable.Hot[A] { - val cancelable = source.connect() - def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink) - } - } - - @inline implicit class ConnectableObservableValueOperations[A](val source: Connectable[Observable.Value[A]]) extends AnyVal { - def refCount: Observable.Value[A] = new Observable.Value[A] { - def now() = source.value.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.connect()) - } - @deprecated("Use unsafeHot instead", "0.7.8") - def hot: Observable.HotValue[A] = unsafeHot() - def unsafeHot(): Observable.HotValue[A] = new Observable.HotValue[A] { - val cancelable = source.connect() - def now() = source.value.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink) - } - } - - @inline implicit class ConnectableObservableMaybeValueOperations[A](val source: Connectable[Observable.MaybeValue[A]]) extends AnyVal { - def refCount: Observable.MaybeValue[A] = new Observable.MaybeValue[A] { - def now() = source.value.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.connect()) - } - @deprecated("Use unsafeHot instead", "0.7.8") - def hot: Observable.HotMaybeValue[A] = unsafeHot() - def unsafeHot(): Observable.HotMaybeValue[A] = new Observable.HotMaybeValue[A] { - val cancelable = source.connect() - def now() = source.value.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink) + def unsafeHot(): Observable[A] = { + val _ = source.unsafeConnect() + source.value } } } diff --git a/colibri/src/main/scala/colibri/Observable.scala b/colibri/src/main/scala/colibri/Observable.scala index 45242b39..762fcaab 100644 --- a/colibri/src/main/scala/colibri/Observable.scala +++ b/colibri/src/main/scala/colibri/Observable.scala @@ -2,7 +2,7 @@ package colibri import cats._ import cats.implicits._ -import colibri.effect.{RunSyncEffect, RunEffect} +import colibri.effect.RunEffect import cats.effect.{Sync, SyncIO, Async, IO, Resource} import scala.scalajs.js @@ -85,21 +85,6 @@ object Observable { def flatMap[B](f: A => Observable[B]): Observable[B] } - trait Value[+A] extends Observable[A] { - def now(): A - } - trait MaybeValue[+A] extends Observable[A] { - def now(): Option[A] - } - - trait HasCancelable { - def cancelable: Cancelable - } - - trait Hot[+A] extends Observable[A] with HasCancelable - trait HotValue[+A] extends Value[A] with HasCancelable - trait HotMaybeValue[+A] extends MaybeValue[A] with HasCancelable - object Empty extends Observable[Nothing] { @inline def unsafeSubscribe(sink: Observer[Nothing]): Cancelable = Cancelable.empty } @@ -129,8 +114,6 @@ object Observable { def unsafeSubscribe(sink: Observer[T]): Cancelable = value.unsafeSubscribe(sink) } - @deprecated("Use Observable.raiseError instead", "0.3.0") - def failure[T](error: Throwable): Observable[T] = raiseError(error) def raiseError[T](error: Throwable): Observable[T] = new Observable[T] { def unsafeSubscribe(sink: Observer[T]): Cancelable = { sink.unsafeOnError(error) @@ -173,10 +156,6 @@ object Observable { } } - @deprecated("Use fromEffect instead", "0.3.0") - def fromSync[F[_]: RunSyncEffect, A](effect: F[A]): Observable[A] = fromEffect(effect) - @deprecated("Use fromEffect instead", "0.3.0") - def fromAsync[F[_]: RunEffect, A](effect: F[A]): Observable[A] = fromEffect(effect) def fromEffect[F[_]: RunEffect, A](effect: F[A]): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = RunEffect[F].unsafeRunSyncOrAsyncCancelable[A](effect)(_.fold(sink.unsafeOnError, sink.unsafeOnNext)) @@ -212,11 +191,6 @@ object Observable { def like[H[_]: ObservableLike, A](observableLike: H[A]): Observable[A] = ObservableLike[H].toObservable(observableLike) - @deprecated("Use concatEffect instead", "0.3.0") - def concatSync[F[_]: RunSyncEffect, T](effects: F[T]*): Observable[T] = concatEffect(effects: _*) - @deprecated("Use concatEffect instead", "0.3.0") - def concatAsync[F[_]: RunEffect, T](effects: F[T]*): Observable[T] = concatEffect(effects: _*) - def concatEffect[F[_]: RunEffect, T](effects: F[T]*): Observable[T] = fromIterable(effects).mapEffect(identity) def concatFuture[T](value1: => Future[T]): Observable[T] = concatEffect(IO.fromFuture(IO(value1))) @@ -240,10 +214,6 @@ object Observable { IO.fromFuture(IO(value5)), ) - @deprecated("Use concatEffect instead", "0.3.0") - def concatSync[F[_]: RunSyncEffect, T](effect: F[T], source: Observable[T]): Observable[T] = concatEffect(effect, source) - @deprecated("Use concatEffect instead", "0.3.0") - def concatAsync[F[_]: RunEffect, T](effect: F[T], source: Observable[T]): Observable[T] = concatEffect(effect, source) def concatEffect[F[_]: RunEffect, T](effect: F[T], source: Observable[T]): Observable[T] = new Observable[T] { def unsafeSubscribe(sink: Observer[T]): Cancelable = { val consecutive = Cancelable.consecutive() @@ -263,8 +233,6 @@ object Observable { @inline def merge[A](sources: Observable[A]*): Observable[A] = mergeIterable(sources) - @deprecated("Use mergeIterable instead", "0.4.5") - def mergeSeq[A](sources: Seq[Observable[A]]): Observable[A] = mergeIterable(sources) def mergeIterable[A](sources: Iterable[Observable[A]]): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { val subscriptions = sources.map { source => @@ -277,8 +245,6 @@ object Observable { @inline def switch[A](sources: Observable[A]*): Observable[A] = switchIterable(sources) - @deprecated("Use switchIterable instead", "0.4.5") - def switchSeq[A](sources: Seq[Observable[A]]): Observable[A] = switchIterable(sources) def switchIterable[A](sources: Iterable[Observable[A]]): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { val variable = Cancelable.variable() @@ -293,8 +259,6 @@ object Observable { @inline def concat[A](sources: Observable[A]*): Observable[A] = concatIterable(sources) - @deprecated("Use concatIterable instead", "0.4.5") - def concatSeq[A](sources: Seq[Observable[A]]): Observable[A] = concatIterable(sources) def concatIterable[A](sources: Iterable[Observable[A]]): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { val consecutive = Cancelable.consecutive() @@ -564,9 +528,6 @@ object Observable { source.unsafeSubscribe(Observer.createFromEither(sink.unsafeOnNext)) } - @deprecated("Use attempt instead", "0.3.0") - def recoverToEither: Observable[Either[Throwable, A]] = source.attempt - def recoverMap(f: Throwable => A): Observable[A] = recover { case t => f(t) } def recover(f: PartialFunction[Throwable, A]): Observable[A] = recoverOption(f andThen (Some(_))) @@ -614,8 +575,6 @@ object Observable { def tapFailedEffect[F[_]: RunEffect: Applicative](f: Throwable => F[Unit]): Observable[A] = attempt.tapEffect(_.swap.traverseTap(f).void).flattenEither - @deprecated("Use .tapSubscribe(f) instead", "0.3.4") - def doOnSubscribe(f: () => Cancelable): Observable[A] = tapSubscribe(f) def tapSubscribe(f: () => Cancelable): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { val cancelable = f() @@ -637,8 +596,6 @@ object Observable { } } - @deprecated("Use .tap(f) instead", "0.3.4") - def doOnNext(f: A => Unit): Observable[A] = tap(f) def tap(f: A => Unit): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { source.unsafeSubscribe(sink.doOnNext { value => @@ -648,8 +605,6 @@ object Observable { } } - @deprecated("Use .tapFailed(f) instead", "0.3.4") - def doOnError(f: Throwable => Unit): Observable[A] = tapFailed(f) def tapFailed(f: Throwable => Unit): Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { source.unsafeSubscribe(sink.doOnError { error => @@ -696,11 +651,6 @@ object Observable { } } - @deprecated("Use mapEffect instead", "0.3.0") - @inline def mapSync[F[_]: RunSyncEffect, B](f: A => F[B]): Observable[B] = mapEffect(f) - @deprecated("Use mapEffect instead", "0.3.0") - def mapAsync[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = mapEffect(f) - @inline def mapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = concatMapEffect(f) def concatMapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = new Observable[B] { def unsafeSubscribe(sink: Observer[B]): Cancelable = { @@ -783,10 +733,6 @@ object Observable { } } - @deprecated("Use singleMapEffect instead", "0.3.0") - def mapAsyncSingleOrDrop[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = singleMapEffect(f) - @deprecated("Use singleMapEffect instead", "0.7.2") - def mapEffectSingleOrDrop[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = singleMapEffect(f) def singleMapEffect[F[_]: RunEffect, B](f: A => F[B]): Observable[B] = new Observable[B] { def unsafeSubscribe(sink: Observer[B]): Cancelable = { val single = Cancelable.singleOrDrop() @@ -817,8 +763,6 @@ object Observable { } } - @deprecated("Use singleMapFuture instead", "0.7.2") - @inline def mapFutureSingleOrDrop[B](f: A => Future[B]): Observable[B] = singleMapFuture(f) @inline def singleMapFuture[B](f: A => Future[B]): Observable[B] = singleMapEffect(v => IO.fromFuture(IO(f(v)))) @@ -1377,21 +1321,6 @@ object Observable { @inline def distinctByOnEquals[B](f: A => B): Observable[A] = distinctBy(f)(Eq.fromUniversalEquals) @inline def distinctOnEquals: Observable[A] = distinct(Eq.fromUniversalEquals) - @deprecated("Manage subscriptions directly with subscribe, via, etc.", "0.4.3") - def withDefaultSubscription(sink: Observer[A]): Observable[A] = new Observable[A] { - private var defaultSubscription = source.unsafeSubscribe(sink) - - def unsafeSubscribe(sink: Observer[A]): Cancelable = { - // stop the default subscription. - if (defaultSubscription != null) { - defaultSubscription.unsafeCancel() - defaultSubscription = null - } - - source.unsafeSubscribe(sink) - } - } - @inline def transformSource[B](transform: Observable[A] => Observable[B]): Observable[B] = new Observable[B] { def unsafeSubscribe(sink: Observer[B]): Cancelable = transform(source).unsafeSubscribe(sink) } @@ -1401,25 +1330,21 @@ object Observable { } @inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]()) - @deprecated("Use replayLatest instead", "0.3.4") - @inline def replay: Connectable[Observable.MaybeValue[A]] = replayLatest - @inline def replayLatest: Connectable[Observable.MaybeValue[A]] = multicastMaybeValue(Subject.replayLatest[A]()) + @inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]()) @inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]()) - @inline def behavior(seed: A): Connectable[Observable.Value[A]] = multicastValue(Subject.behavior(seed)) + @inline def behavior(seed: A): Connectable[Observable[A]] = multicast(Subject.behavior(seed)) @inline def publishShare: Observable[A] = publish.refCount - @inline def replayLatestShare: Observable.MaybeValue[A] = replayLatest.refCount + @inline def replayLatestShare: Observable[A] = replayLatest.refCount @inline def replayAllShare: Observable[A] = replayAll.refCount - @inline def behaviorShare(seed: A): Observable.Value[A] = behavior(seed).refCount + @inline def behaviorShare(seed: A): Observable[A] = behavior(seed).refCount @inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount)) - @deprecated("Use replayLatestSelector instead", "0.3.4") - @inline def replaySelector[B](f: Observable.MaybeValue[A] => Observable[B]): Observable[B] = replayLatestSelector(f) - @inline def replayLatestSelector[B](f: Observable.MaybeValue[A] => Observable[B]): Observable[B] = + @inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.replayLatest.refCount)) @inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.replayAll.refCount)) - @inline def behaviorSelector[B](value: A)(f: Observable.Value[A] => Observable[B]): Observable[B] = + @inline def behaviorSelector[B](value: A)(f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.behavior(value).refCount)) def multicast(pipe: Subject[A]): Connectable[Observable[A]] = Connectable( @@ -1429,32 +1354,11 @@ object Observable { () => source.unsafeSubscribe(pipe), ) - def multicastValue(pipe: Subject.Value[A]): Connectable[Observable.Value[A]] = Connectable( - new Value[A] { - def now(): A = pipe.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = pipe.unsafeSubscribe(sink) - }, - () => source.unsafeSubscribe(pipe), - ) - - def multicastMaybeValue(pipe: Subject.MaybeValue[A]): Connectable[Observable.MaybeValue[A]] = Connectable( - new MaybeValue[A] { - def now(): Option[A] = pipe.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = pipe.unsafeSubscribe(sink) - }, - () => source.unsafeSubscribe(pipe), - ) - def fold[B](seed: B)(f: (B, A) => B): Observable[B] = scan(seed)(f).last def foldF[F[_]: Async, B](seed: B)(f: (B, A) => B): F[B] = scan(seed)(f).lastF[F] def foldIO[B](seed: B)(f: (B, A) => B): IO[B] = scan(seed)(f).lastIO def unsafeFoldFuture[B](seed: B)(f: (B, A) => B): Future[B] = scan(seed)(f).unsafeLastFuture() - @deprecated("Use prependEffect instead", "0.3.0") - @inline def prependSync[F[_]: RunSyncEffect](value: F[A]): Observable[A] = prependEffect(value) - @deprecated("Use prependEffect instead", "0.3.0") - @inline def prependAsync[F[_]: RunEffect](value: F[A]): Observable[A] = prependEffect(value) - @inline def prependEffect[F[_]: RunEffect](value: F[A]): Observable[A] = concatEffect[F, A](value, source) @inline def prependFuture(value: => Future[A]): Observable[A] = concatFuture[A](value, source) @@ -1767,13 +1671,6 @@ object Observable { @inline def unsafeSubscribe(): Cancelable = source.unsafeSubscribe(Observer.empty) @inline def unsafeForeach(f: A => Unit): Cancelable = source.unsafeSubscribe(Observer.create(f)) - - @deprecated("Use unsafeSubscribe(sink) or to(sink).unsafeSubscribe() or to(sink).subscribeF[F] instead", "0.3.0") - @inline def subscribe(sink: Observer[A]): Cancelable = source.unsafeSubscribe(sink) - @deprecated("Use unsafeSubscribe() or subscribeF[F] instead", "0.3.0") - @inline def subscribe(): Cancelable = unsafeSubscribe() - @deprecated("Use unsafeForeach(f) or foreach_(f).subscribeF[F] instead", "0.3.0") - @inline def foreach(f: A => Unit): Cancelable = unsafeForeach(f) } @inline implicit class ThrowableOperations(private val source: Observable[Throwable]) extends AnyVal { @@ -1810,24 +1707,6 @@ object Observable { @inline def flattenSwitch: Observable[A] = source.switchMap(o => ObservableLike[F].toObservable(o)) } - @inline implicit class SubjectValueOperations[A](val handler: Subject.Value[A]) extends AnyVal { - def lens[B](read: A => B)(write: (A, B) => A): Subject.Value[B] = new Observer[B] with Observable.Value[B] { - @inline def now() = read(handler.now()) - @inline def unsafeOnNext(value: B): Unit = handler.unsafeOnNext(write(handler.now(), value)) - @inline def unsafeOnError(error: Throwable): Unit = handler.unsafeOnError(error) - @inline def unsafeSubscribe(sink: Observer[B]): Cancelable = handler.map(read).unsafeSubscribe(sink) - } - } - - @inline implicit class SubjectMaybeValueOperations[A](val handler: Subject.MaybeValue[A]) extends AnyVal { - def lens[B](seed: => A)(read: A => B)(write: (A, B) => A): Subject.MaybeValue[B] = new Observer[B] with Observable.MaybeValue[B] { - @inline def now() = handler.now().map(read) - @inline def unsafeOnNext(value: B): Unit = handler.unsafeOnNext(write(handler.now().getOrElse(seed), value)) - @inline def unsafeOnError(error: Throwable): Unit = handler.unsafeOnError(error) - @inline def unsafeSubscribe(sink: Observer[B]): Cancelable = handler.map(read).unsafeSubscribe(sink) - } - } - @inline implicit class ProSubjectOperations[I, O](val handler: ProSubject[I, O]) extends AnyVal { @inline def transformSubjectSource[O2](g: Observable[O] => Observable[O2]): ProSubject[I, O2] = ProSubject.from[I, O2](handler, g(handler)) @@ -1844,15 +1723,13 @@ object Observable { } @inline implicit class ListSubjectOperations[A](val handler: Subject[Seq[A]]) extends AnyVal { - def sequence: Observable[Seq[Subject.Value[A]]] = new Observable[Seq[Subject.Value[A]]] { - def unsafeSubscribe(sink: Observer[Seq[Subject.Value[A]]]): Cancelable = { + def sequence: Observable[Seq[Subject[A]]] = new Observable[Seq[Subject[A]]] { + def unsafeSubscribe(sink: Observer[Seq[Subject[A]]]): Cancelable = { handler.unsafeSubscribe( Observer.create( { sequence => sink.unsafeOnNext(sequence.zipWithIndex.map { case (a, idx) => - new Observer[A] with Observable.Value[A] { - def now(): A = a - + new Observer[A] with Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { sink.unsafeOnNext(a) Cancelable.empty diff --git a/colibri/src/main/scala/colibri/Observer.scala b/colibri/src/main/scala/colibri/Observer.scala index daa6ca40..3ca0e24d 100644 --- a/colibri/src/main/scala/colibri/Observer.scala +++ b/colibri/src/main/scala/colibri/Observer.scala @@ -29,11 +29,6 @@ object Observer { } } - @deprecated("Use createUnrecovered instead", "") - @inline def unsafeCreate[A]( - consume: A => Unit, - failure: Throwable => Unit = UnhandledErrorReporter.errorSubject.unsafeOnNext, - ): Observer[A] = createUnrecovered(consume, failure) @inline def createUnrecovered[A]( consume: A => Unit, failure: Throwable => Unit = UnhandledErrorReporter.errorSubject.unsafeOnNext, @@ -90,8 +85,6 @@ object Observer { @inline def combine[A](sinks: Observer[A]*): Observer[A] = combineIterable(sinks) - @deprecated("Use combineIterable instead", "0.5.0") - def combineSeq[A](sinks: Seq[Observer[A]]): Observer[A] = combineIterable(sinks) def combineIterable[A](sinks: Iterable[Observer[A]]): Observer[A] = new Observer[A] { def unsafeOnNext(value: A): Unit = sinks.foreach(_.unsafeOnNext(value)) def unsafeOnError(error: Throwable): Unit = sinks.foreach(_.unsafeOnError(error)) @@ -217,11 +210,6 @@ object Observer { Connectable(handler, () => source.unsafeSubscribe(sink)) } - @deprecated("Use unsafeOnNext instead", "") - def onNext(value: A): Unit = sink.unsafeOnNext(value) - @deprecated("Use unsafeOnError instead", "") - def onError(error: Throwable): Unit = sink.unsafeOnError(error) - def onNextF[F[_]: Sync](value: A): F[Unit] = Sync[F].delay(sink.unsafeOnNext(value)) def onNextIO(value: A): IO[Unit] = onNextF[IO](value) def onNextSyncIO(value: A): SyncIO[Unit] = onNextF[SyncIO](value) @@ -232,7 +220,7 @@ object Observer { } @inline implicit class UnitOperations(private val sink: Observer[Unit]) extends AnyVal { - @inline def void: Observer[Any] = sink.contramap(_ => ()) + @inline def void: Observer[Any] = sink.as(()) } @inline implicit class ThrowableOperations(private val sink: Observer[Throwable]) extends AnyVal { diff --git a/colibri/src/main/scala/colibri/Source.scala b/colibri/src/main/scala/colibri/Source.scala index 13efd067..1e6c86ba 100644 --- a/colibri/src/main/scala/colibri/Source.scala +++ b/colibri/src/main/scala/colibri/Source.scala @@ -2,9 +2,6 @@ package colibri trait Source[-H[_]] { def unsafeSubscribe[A](source: H[A])(sink: Observer[A]): Cancelable - - @deprecated("Use unsafeSubscribe instead", "0.2.7") - @inline final def subscribe[A](source: H[A])(sink: Observer[A]): Cancelable = unsafeSubscribe(source)(sink) } object Source { @inline def apply[H[_]](implicit source: Source[H]): Source[H] = source diff --git a/colibri/src/main/scala/colibri/Subject.scala b/colibri/src/main/scala/colibri/Subject.scala index 330ce243..5f67d4d6 100644 --- a/colibri/src/main/scala/colibri/Subject.scala +++ b/colibri/src/main/scala/colibri/Subject.scala @@ -3,7 +3,7 @@ package colibri import scala.scalajs.js import colibri.helpers._ -final class ReplayLatestSubject[A] extends Observer[A] with Observable.MaybeValue[A] { +final class ReplayLatestSubject[A] extends Observer[A] with Observable[A] { private val state = new PublishSubject[A] @@ -59,7 +59,7 @@ final class ReplayAllSubject[A] extends Observer[A] with Observable[A] { } } -final class BehaviorSubject[A](private var current: A) extends Observer[A] with Observable.Value[A] { +final class BehaviorSubject[A](private var current: A) extends Observer[A] with Observable[A] { private val state = new PublishSubject[A] @@ -111,13 +111,6 @@ final class PublishSubject[A] extends Observer[A] with Observable[A] { } object Subject { - type Value[A] = Observer[A] with Observable.Value[A] - type MaybeValue[A] = Observer[A] with Observable.MaybeValue[A] - - @deprecated("Use replayLatest instead", "0.3.4") - def replay[O](): ReplayLatestSubject[O] = replayLatest[O]() - @deprecated("Use replayLatest instead", "0.4.0") - def replayLast[O](): ReplayLatestSubject[O] = replayLatest[O]() def replayLatest[O](): ReplayLatestSubject[O] = new ReplayLatestSubject[O] def replayAll[O](): ReplayAllSubject[O] = new ReplayAllSubject[O] @@ -131,9 +124,6 @@ object Subject { } object ProSubject { - type Value[-I, +O] = Observer[I] with Observable.Value[O] - type MaybeValue[-I, +O] = Observer[I] with Observable.MaybeValue[O] - def from[I, O](sink: Observer[I], source: Observable[O]): ProSubject[I, O] = new Observer[I] with Observable[O] { @inline def unsafeOnNext(value: I): Unit = sink.unsafeOnNext(value) @inline def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error) diff --git a/colibri/src/test/scala/colibri/ObservableSpec.scala b/colibri/src/test/scala/colibri/ObservableSpec.scala index aca4ae23..9914a969 100644 --- a/colibri/src/test/scala/colibri/ObservableSpec.scala +++ b/colibri/src/test/scala/colibri/ObservableSpec.scala @@ -1824,52 +1824,4 @@ class ObservableSpec extends AsyncFlatSpec with Matchers { errors shouldBe 0 cancelable.isEmpty() shouldBe true } - - it should "lens" in { - case class Recipe(user: String, age: Int) - - var receivedString = List.empty[String] - var receivedRecipe = List.empty[Recipe] - var errors = 0 - - val hdlRecipe = Subject.behavior[Recipe](Recipe("hans", 12)) - - val hdlUser = hdlRecipe.lens[String](_.user)((state, newState) => state.copy(user = newState)) - - hdlRecipe.unsafeSubscribe( - Observer.create[Recipe]( - receivedRecipe ::= _, - _ => errors += 1, - ), - ) - - hdlUser.unsafeSubscribe( - Observer.create[String]( - receivedString ::= _, - _ => errors += 1, - ), - ) - - receivedString shouldBe List("hans") - receivedRecipe shouldBe List(Recipe("hans", 12)) - errors shouldBe 0 - - hdlRecipe.unsafeOnNext(Recipe("hans", 13)) - - receivedString shouldBe List("hans", "hans") - receivedRecipe shouldBe List(Recipe("hans", 13), Recipe("hans", 12)) - errors shouldBe 0 - - hdlRecipe.unsafeOnNext(Recipe("gisela", 14)) - - receivedString shouldBe List("gisela", "hans", "hans") - receivedRecipe shouldBe List(Recipe("gisela", 14), Recipe("hans", 13), Recipe("hans", 12)) - errors shouldBe 0 - - hdlUser.unsafeOnNext("dieter") - - receivedString shouldBe List("dieter", "gisela", "hans", "hans") - receivedRecipe shouldBe List(Recipe("dieter", 14), Recipe("gisela", 14), Recipe("hans", 13), Recipe("hans", 12)) - errors shouldBe 0 - } }