Skip to content

Commit

Permalink
remove observables types for Hot/Value/MaybeValue
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Dec 7, 2022
1 parent 6b5d722 commit 3b64d25
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 112 deletions.
36 changes: 3 additions & 33 deletions colibri/src/main/scala/colibri/Connectable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,9 @@ object Connectable
def refCount: Observable[A] = new Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect())
}
@deprecated("Use unsafeHot instead", "0.5.0")
def hot: Observable.Hot[A] = unsafeHot()
def unsafeHot(): Observable.Hot[A] = new Observable.Hot[A] {
val cancelable = source.unsafeConnect()
def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink)
}
}

@inline implicit class ConnectableObservableValueOperations[A](val source: Connectable[Observable.Value[A]]) extends AnyVal {
def refCount: Observable.Value[A] = new Observable.Value[A] {
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect())
}
@deprecated("Use unsafeHot instead", "0.7.8")
def hot: Observable.HotValue[A] = unsafeHot()
def unsafeHot(): Observable.HotValue[A] = new Observable.HotValue[A] {
val cancelable = source.unsafeConnect()
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink)
}
}

@inline implicit class ConnectableObservableMaybeValueOperations[A](val source: Connectable[Observable.MaybeValue[A]]) extends AnyVal {
def refCount: Observable.MaybeValue[A] = new Observable.MaybeValue[A] {
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.unsafeConnect())
}
@deprecated("Use unsafeHot instead", "0.7.8")
def hot: Observable.HotMaybeValue[A] = unsafeHot()
def unsafeHot(): Observable.HotMaybeValue[A] = new Observable.HotMaybeValue[A] {
val cancelable = source.unsafeConnect()
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink)
def unsafeHot(): Observable[A] = {
val _ = source.unsafeConnect()
source.value
}
}
}
73 changes: 9 additions & 64 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,6 @@ object Observable {
def flatMap[B](f: A => Observable[B]): Observable[B]
}

trait Value[+A] extends Observable[A] {
def now(): A
}
trait MaybeValue[+A] extends Observable[A] {
def now(): Option[A]
}

trait HasCancelable {
def cancelable: Cancelable
}

trait Hot[+A] extends Observable[A] with HasCancelable
trait HotValue[+A] extends Value[A] with HasCancelable
trait HotMaybeValue[+A] extends MaybeValue[A] with HasCancelable

object Empty extends Observable[Nothing] {
@inline def unsafeSubscribe(sink: Observer[Nothing]): Cancelable = Cancelable.empty
}
Expand Down Expand Up @@ -1401,25 +1386,21 @@ object Observable {
}

@inline def publish: Connectable[Observable[A]] = multicast(Subject.publish[A]())
@deprecated("Use replayLatest instead", "0.3.4")
@inline def replay: Connectable[Observable.MaybeValue[A]] = replayLatest
@inline def replayLatest: Connectable[Observable.MaybeValue[A]] = multicastMaybeValue(Subject.replayLatest[A]())
@inline def replayLatest: Connectable[Observable[A]] = multicast(Subject.replayLatest[A]())
@inline def replayAll: Connectable[Observable[A]] = multicast(Subject.replayAll[A]())
@inline def behavior(seed: A): Connectable[Observable.Value[A]] = multicastValue(Subject.behavior(seed))
@inline def behavior(seed: A): Connectable[Observable[A]] = multicast(Subject.behavior(seed))

@inline def publishShare: Observable[A] = publish.refCount
@inline def replayLatestShare: Observable.MaybeValue[A] = replayLatest.refCount
@inline def replayLatestShare: Observable[A] = replayLatest.refCount
@inline def replayAllShare: Observable[A] = replayAll.refCount
@inline def behaviorShare(seed: A): Observable.Value[A] = behavior(seed).refCount
@inline def behaviorShare(seed: A): Observable[A] = behavior(seed).refCount

@inline def publishSelector[B](f: Observable[A] => Observable[B]): Observable[B] = transformSource(s => f(s.publish.refCount))
@deprecated("Use replayLatestSelector instead", "0.3.4")
@inline def replaySelector[B](f: Observable.MaybeValue[A] => Observable[B]): Observable[B] = replayLatestSelector(f)
@inline def replayLatestSelector[B](f: Observable.MaybeValue[A] => Observable[B]): Observable[B] =
@inline def replayLatestSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.replayLatest.refCount))
@inline def replayAllSelector[B](f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.replayAll.refCount))
@inline def behaviorSelector[B](value: A)(f: Observable.Value[A] => Observable[B]): Observable[B] =
@inline def behaviorSelector[B](value: A)(f: Observable[A] => Observable[B]): Observable[B] =
transformSource(s => f(s.behavior(value).refCount))

def multicast(pipe: Subject[A]): Connectable[Observable[A]] = Connectable(
Expand All @@ -1429,22 +1410,6 @@ object Observable {
() => source.unsafeSubscribe(pipe),
)

def multicastValue(pipe: Subject.Value[A]): Connectable[Observable.Value[A]] = Connectable(
new Value[A] {
def now(): A = pipe.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = pipe.unsafeSubscribe(sink)
},
() => source.unsafeSubscribe(pipe),
)

def multicastMaybeValue(pipe: Subject.MaybeValue[A]): Connectable[Observable.MaybeValue[A]] = Connectable(
new MaybeValue[A] {
def now(): Option[A] = pipe.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = pipe.unsafeSubscribe(sink)
},
() => source.unsafeSubscribe(pipe),
)

def fold[B](seed: B)(f: (B, A) => B): Observable[B] = scan(seed)(f).last
def foldF[F[_]: Async, B](seed: B)(f: (B, A) => B): F[B] = scan(seed)(f).lastF[F]
def foldIO[B](seed: B)(f: (B, A) => B): IO[B] = scan(seed)(f).lastIO
Expand Down Expand Up @@ -1810,24 +1775,6 @@ object Observable {
@inline def flattenSwitch: Observable[A] = source.switchMap(o => ObservableLike[F].toObservable(o))
}

@inline implicit class SubjectValueOperations[A](val handler: Subject.Value[A]) extends AnyVal {
def lens[B](read: A => B)(write: (A, B) => A): Subject.Value[B] = new Observer[B] with Observable.Value[B] {
@inline def now() = read(handler.now())
@inline def unsafeOnNext(value: B): Unit = handler.unsafeOnNext(write(handler.now(), value))
@inline def unsafeOnError(error: Throwable): Unit = handler.unsafeOnError(error)
@inline def unsafeSubscribe(sink: Observer[B]): Cancelable = handler.map(read).unsafeSubscribe(sink)
}
}

@inline implicit class SubjectMaybeValueOperations[A](val handler: Subject.MaybeValue[A]) extends AnyVal {
def lens[B](seed: => A)(read: A => B)(write: (A, B) => A): Subject.MaybeValue[B] = new Observer[B] with Observable.MaybeValue[B] {
@inline def now() = handler.now().map(read)
@inline def unsafeOnNext(value: B): Unit = handler.unsafeOnNext(write(handler.now().getOrElse(seed), value))
@inline def unsafeOnError(error: Throwable): Unit = handler.unsafeOnError(error)
@inline def unsafeSubscribe(sink: Observer[B]): Cancelable = handler.map(read).unsafeSubscribe(sink)
}
}

@inline implicit class ProSubjectOperations[I, O](val handler: ProSubject[I, O]) extends AnyVal {
@inline def transformSubjectSource[O2](g: Observable[O] => Observable[O2]): ProSubject[I, O2] =
ProSubject.from[I, O2](handler, g(handler))
Expand All @@ -1844,15 +1791,13 @@ object Observable {
}

@inline implicit class ListSubjectOperations[A](val handler: Subject[Seq[A]]) extends AnyVal {
def sequence: Observable[Seq[Subject.Value[A]]] = new Observable[Seq[Subject.Value[A]]] {
def unsafeSubscribe(sink: Observer[Seq[Subject.Value[A]]]): Cancelable = {
def sequence: Observable[Seq[Subject[A]]] = new Observable[Seq[Subject[A]]] {
def unsafeSubscribe(sink: Observer[Seq[Subject[A]]]): Cancelable = {
handler.unsafeSubscribe(
Observer.create(
{ sequence =>
sink.unsafeOnNext(sequence.zipWithIndex.map { case (a, idx) =>
new Observer[A] with Observable.Value[A] {
def now(): A = a

new Observer[A] with Observable[A] {
def unsafeSubscribe(sink: Observer[A]): Cancelable = {
sink.unsafeOnNext(a)
Cancelable.empty
Expand Down
6 changes: 3 additions & 3 deletions colibri/src/main/scala/colibri/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ object Observer {

@inline def combine[A](sinks: Observer[A]*): Observer[A] = combineIterable(sinks)

@deprecated("Use combineIterable instead", "0.5.0")
def combineSeq[A](sinks: Seq[Observer[A]]): Observer[A] = combineIterable(sinks)
def combineIterable[A](sinks: Iterable[Observer[A]]): Observer[A] = new Observer[A] {
def unsafeOnNext(value: A): Unit = sinks.foreach(_.unsafeOnNext(value))
def unsafeOnError(error: Throwable): Unit = sinks.foreach(_.unsafeOnError(error))
Expand Down Expand Up @@ -172,6 +170,8 @@ object Observer {
def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error)
}

def as[T](value: A): Observer[Any] = sink.contramap(_ => value)

def tap(f: A => Unit): Observer[A] = new Observer[A] {
def unsafeOnNext(value: A): Unit = {
f(value)
Expand Down Expand Up @@ -230,7 +230,7 @@ object Observer {
}

@inline implicit class UnitOperations(private val sink: Observer[Unit]) extends AnyVal {
@inline def void: Observer[Any] = sink.contramap(_ => ())
@inline def void: Observer[Any] = sink.as(())
}

@inline implicit class ThrowableOperations(private val sink: Observer[Throwable]) extends AnyVal {
Expand Down
14 changes: 2 additions & 12 deletions colibri/src/main/scala/colibri/Subject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package colibri
import scala.scalajs.js
import colibri.helpers._

final class ReplayLatestSubject[A] extends Observer[A] with Observable.MaybeValue[A] {
final class ReplayLatestSubject[A] extends Observer[A] with Observable[A] {

private val state = new PublishSubject[A]

Expand Down Expand Up @@ -59,7 +59,7 @@ final class ReplayAllSubject[A] extends Observer[A] with Observable[A] {
}
}

final class BehaviorSubject[A](private var current: A) extends Observer[A] with Observable.Value[A] {
final class BehaviorSubject[A](private var current: A) extends Observer[A] with Observable[A] {

private val state = new PublishSubject[A]

Expand Down Expand Up @@ -111,13 +111,6 @@ final class PublishSubject[A] extends Observer[A] with Observable[A] {
}

object Subject {
type Value[A] = Observer[A] with Observable.Value[A]
type MaybeValue[A] = Observer[A] with Observable.MaybeValue[A]

@deprecated("Use replayLatest instead", "0.3.4")
def replay[O](): ReplayLatestSubject[O] = replayLatest[O]()
@deprecated("Use replayLatest instead", "0.4.0")
def replayLast[O](): ReplayLatestSubject[O] = replayLatest[O]()
def replayLatest[O](): ReplayLatestSubject[O] = new ReplayLatestSubject[O]
def replayAll[O](): ReplayAllSubject[O] = new ReplayAllSubject[O]

Expand All @@ -131,9 +124,6 @@ object Subject {
}

object ProSubject {
type Value[-I, +O] = Observer[I] with Observable.Value[O]
type MaybeValue[-I, +O] = Observer[I] with Observable.MaybeValue[O]

def from[I, O](sink: Observer[I], source: Observable[O]): ProSubject[I, O] = new Observer[I] with Observable[O] {
@inline def unsafeOnNext(value: I): Unit = sink.unsafeOnNext(value)
@inline def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error)
Expand Down

0 comments on commit 3b64d25

Please sign in to comment.