diff --git a/colibri/src/main/scala/colibri/Connectable.scala b/colibri/src/main/scala/colibri/Connectable.scala index 80a3b0fb..03859e4f 100644 --- a/colibri/src/main/scala/colibri/Connectable.scala +++ b/colibri/src/main/scala/colibri/Connectable.scala @@ -17,39 +17,9 @@ object Connectable def refCount: Observable[A] = new Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect()) } - @deprecated("Use unsafeHot instead", "0.5.0") - def hot: Observable.Hot[A] = unsafeHot() - def unsafeHot(): Observable.Hot[A] = new Observable.Hot[A] { - val cancelable = source.unsafeConnect() - def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink) - } - } - - @inline implicit class ConnectableObservableValueOperations[A](val source: Connectable[Observable.Value[A]]) extends AnyVal { - def refCount: Observable.Value[A] = new Observable.Value[A] { - def now() = source.value.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect()) - } - @deprecated("Use unsafeHot instead", "0.7.8") - def hot: Observable.HotValue[A] = unsafeHot() - def unsafeHot(): Observable.HotValue[A] = new Observable.HotValue[A] { - val cancelable = source.unsafeConnect() - def now() = source.value.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink) - } - } - - @inline implicit class ConnectableObservableMaybeValueOperations[A](val source: Connectable[Observable.MaybeValue[A]]) extends AnyVal { - def refCount: Observable.MaybeValue[A] = new Observable.MaybeValue[A] { - def now() = source.value.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect()) - } - @deprecated("Use unsafeHot instead", "0.7.8") - def hot: Observable.HotMaybeValue[A] = unsafeHot() - def unsafeHot(): Observable.HotMaybeValue[A] = new Observable.HotMaybeValue[A] { - val cancelable = source.unsafeConnect() - def now() = source.value.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink) + def unsafeHot(): Observable[A] = { + val _ = source.unsafeConnect() + source.value } } } diff --git a/colibri/src/main/scala/colibri/Observable.scala b/colibri/src/main/scala/colibri/Observable.scala index 1b7114a4..40c4cb3e 100644 --- a/colibri/src/main/scala/colibri/Observable.scala +++ b/colibri/src/main/scala/colibri/Observable.scala @@ -85,21 +85,6 @@ object Observable { def flatMap[B](f: A => Observable[B]): Observable[B] } - trait Value[+A] extends Observable[A] { - def now(): A - } - trait MaybeValue[+A] extends Observable[A] { - def now(): Option[A] - } - - trait HasCancelable { - def cancelable: Cancelable - } - - trait Hot[+A] extends Observable[A] with HasCancelable - trait HotValue[+A] extends Value[A] with HasCancelable - trait HotMaybeValue[+A] extends MaybeValue[A] with HasCancelable - object Empty extends Observable[Nothing] { @inline def unsafeSubscribe(sink: Observer[Nothing]): Cancelable = Cancelable.empty } @@ -1401,25 +1386,21 @@ object Observable { } @inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]()) - @deprecated("Use replayLatest instead", "0.3.4") - @inline def replay: Connectable[Observable.MaybeValue[A]] = replayLatest - @inline def replayLatest: Connectable[Observable.MaybeValue[A]] = multicastMaybeValue(Subject.replayLatest[A]()) + @inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]()) @inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]()) - @inline def behavior(seed: A): Connectable[Observable.Value[A]] = multicastValue(Subject.behavior(seed)) + @inline def behavior(seed: A): Connectable[Observable[A]] = multicast(Subject.behavior(seed)) @inline def publishShare: Observable[A] = publish.refCount - @inline def replayLatestShare: Observable.MaybeValue[A] = replayLatest.refCount + @inline def replayLatestShare: Observable[A] = replayLatest.refCount @inline def replayAllShare: Observable[A] = replayAll.refCount - @inline def behaviorShare(seed: A): Observable.Value[A] = behavior(seed).refCount + @inline def behaviorShare(seed: A): Observable[A] = behavior(seed).refCount @inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount)) - @deprecated("Use replayLatestSelector instead", "0.3.4") - @inline def replaySelector[B](f: Observable.MaybeValue[A] => Observable[B]): Observable[B] = replayLatestSelector(f) - @inline def replayLatestSelector[B](f: Observable.MaybeValue[A] => Observable[B]): Observable[B] = + @inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.replayLatest.refCount)) @inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.replayAll.refCount)) - @inline def behaviorSelector[B](value: A)(f: Observable.Value[A] => Observable[B]): Observable[B] = + @inline def behaviorSelector[B](value: A)(f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.behavior(value).refCount)) def multicast(pipe: Subject[A]): Connectable[Observable[A]] = Connectable( @@ -1429,22 +1410,6 @@ object Observable { () => source.unsafeSubscribe(pipe), ) - def multicastValue(pipe: Subject.Value[A]): Connectable[Observable.Value[A]] = Connectable( - new Value[A] { - def now(): A = pipe.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = pipe.unsafeSubscribe(sink) - }, - () => source.unsafeSubscribe(pipe), - ) - - def multicastMaybeValue(pipe: Subject.MaybeValue[A]): Connectable[Observable.MaybeValue[A]] = Connectable( - new MaybeValue[A] { - def now(): Option[A] = pipe.now() - def unsafeSubscribe(sink: Observer[A]): Cancelable = pipe.unsafeSubscribe(sink) - }, - () => source.unsafeSubscribe(pipe), - ) - def fold[B](seed: B)(f: (B, A) => B): Observable[B] = scan(seed)(f).last def foldF[F[_]: Async, B](seed: B)(f: (B, A) => B): F[B] = scan(seed)(f).lastF[F] def foldIO[B](seed: B)(f: (B, A) => B): IO[B] = scan(seed)(f).lastIO @@ -1810,24 +1775,6 @@ object Observable { @inline def flattenSwitch: Observable[A] = source.switchMap(o => ObservableLike[F].toObservable(o)) } - @inline implicit class SubjectValueOperations[A](val handler: Subject.Value[A]) extends AnyVal { - def lens[B](read: A => B)(write: (A, B) => A): Subject.Value[B] = new Observer[B] with Observable.Value[B] { - @inline def now() = read(handler.now()) - @inline def unsafeOnNext(value: B): Unit = handler.unsafeOnNext(write(handler.now(), value)) - @inline def unsafeOnError(error: Throwable): Unit = handler.unsafeOnError(error) - @inline def unsafeSubscribe(sink: Observer[B]): Cancelable = handler.map(read).unsafeSubscribe(sink) - } - } - - @inline implicit class SubjectMaybeValueOperations[A](val handler: Subject.MaybeValue[A]) extends AnyVal { - def lens[B](seed: => A)(read: A => B)(write: (A, B) => A): Subject.MaybeValue[B] = new Observer[B] with Observable.MaybeValue[B] { - @inline def now() = handler.now().map(read) - @inline def unsafeOnNext(value: B): Unit = handler.unsafeOnNext(write(handler.now().getOrElse(seed), value)) - @inline def unsafeOnError(error: Throwable): Unit = handler.unsafeOnError(error) - @inline def unsafeSubscribe(sink: Observer[B]): Cancelable = handler.map(read).unsafeSubscribe(sink) - } - } - @inline implicit class ProSubjectOperations[I, O](val handler: ProSubject[I, O]) extends AnyVal { @inline def transformSubjectSource[O2](g: Observable[O] => Observable[O2]): ProSubject[I, O2] = ProSubject.from[I, O2](handler, g(handler)) @@ -1844,15 +1791,13 @@ object Observable { } @inline implicit class ListSubjectOperations[A](val handler: Subject[Seq[A]]) extends AnyVal { - def sequence: Observable[Seq[Subject.Value[A]]] = new Observable[Seq[Subject.Value[A]]] { - def unsafeSubscribe(sink: Observer[Seq[Subject.Value[A]]]): Cancelable = { + def sequence: Observable[Seq[Subject[A]]] = new Observable[Seq[Subject[A]]] { + def unsafeSubscribe(sink: Observer[Seq[Subject[A]]]): Cancelable = { handler.unsafeSubscribe( Observer.create( { sequence => sink.unsafeOnNext(sequence.zipWithIndex.map { case (a, idx) => - new Observer[A] with Observable.Value[A] { - def now(): A = a - + new Observer[A] with Observable[A] { def unsafeSubscribe(sink: Observer[A]): Cancelable = { sink.unsafeOnNext(a) Cancelable.empty diff --git a/colibri/src/main/scala/colibri/Observer.scala b/colibri/src/main/scala/colibri/Observer.scala index 73bf7a17..29395afb 100644 --- a/colibri/src/main/scala/colibri/Observer.scala +++ b/colibri/src/main/scala/colibri/Observer.scala @@ -90,8 +90,6 @@ object Observer { @inline def combine[A](sinks: Observer[A]*): Observer[A] = combineIterable(sinks) - @deprecated("Use combineIterable instead", "0.5.0") - def combineSeq[A](sinks: Seq[Observer[A]]): Observer[A] = combineIterable(sinks) def combineIterable[A](sinks: Iterable[Observer[A]]): Observer[A] = new Observer[A] { def unsafeOnNext(value: A): Unit = sinks.foreach(_.unsafeOnNext(value)) def unsafeOnError(error: Throwable): Unit = sinks.foreach(_.unsafeOnError(error)) @@ -172,6 +170,8 @@ object Observer { def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error) } + def as[T](value: A): Observer[Any] = sink.contramap(_ => value) + def tap(f: A => Unit): Observer[A] = new Observer[A] { def unsafeOnNext(value: A): Unit = { f(value) @@ -230,7 +230,7 @@ object Observer { } @inline implicit class UnitOperations(private val sink: Observer[Unit]) extends AnyVal { - @inline def void: Observer[Any] = sink.contramap(_ => ()) + @inline def void: Observer[Any] = sink.as(()) } @inline implicit class ThrowableOperations(private val sink: Observer[Throwable]) extends AnyVal { diff --git a/colibri/src/main/scala/colibri/Subject.scala b/colibri/src/main/scala/colibri/Subject.scala index 330ce243..5f67d4d6 100644 --- a/colibri/src/main/scala/colibri/Subject.scala +++ b/colibri/src/main/scala/colibri/Subject.scala @@ -3,7 +3,7 @@ package colibri import scala.scalajs.js import colibri.helpers._ -final class ReplayLatestSubject[A] extends Observer[A] with Observable.MaybeValue[A] { +final class ReplayLatestSubject[A] extends Observer[A] with Observable[A] { private val state = new PublishSubject[A] @@ -59,7 +59,7 @@ final class ReplayAllSubject[A] extends Observer[A] with Observable[A] { } } -final class BehaviorSubject[A](private var current: A) extends Observer[A] with Observable.Value[A] { +final class BehaviorSubject[A](private var current: A) extends Observer[A] with Observable[A] { private val state = new PublishSubject[A] @@ -111,13 +111,6 @@ final class PublishSubject[A] extends Observer[A] with Observable[A] { } object Subject { - type Value[A] = Observer[A] with Observable.Value[A] - type MaybeValue[A] = Observer[A] with Observable.MaybeValue[A] - - @deprecated("Use replayLatest instead", "0.3.4") - def replay[O](): ReplayLatestSubject[O] = replayLatest[O]() - @deprecated("Use replayLatest instead", "0.4.0") - def replayLast[O](): ReplayLatestSubject[O] = replayLatest[O]() def replayLatest[O](): ReplayLatestSubject[O] = new ReplayLatestSubject[O] def replayAll[O](): ReplayAllSubject[O] = new ReplayAllSubject[O] @@ -131,9 +124,6 @@ object Subject { } object ProSubject { - type Value[-I, +O] = Observer[I] with Observable.Value[O] - type MaybeValue[-I, +O] = Observer[I] with Observable.MaybeValue[O] - def from[I, O](sink: Observer[I], source: Observable[O]): ProSubject[I, O] = new Observer[I] with Observable[O] { @inline def unsafeOnNext(value: I): Unit = sink.unsafeOnNext(value) @inline def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error)