From 42c2fd9a02b9ddce14a490e3da20afb51066d80d Mon Sep 17 00:00:00 2001 From: johannes karoff Date: Sun, 4 Dec 2022 03:38:01 +0100 Subject: [PATCH] make reactive state variables lazy --- .../colibri/reactive/LiveOwnerPlatform.scala | 15 + .../colibri/reactive/OwnedPlatform.scala | 9 - .../colibri/reactive/OwnerPlatform.scala | 27 -- .../scala-2/colibri/reactive/RxPlatform.scala | 2 +- .../reactive/internal/MacroUtils.scala | 41 +- ...Platform.scala => LiveOwnerPlatform.scala} | 2 - .../colibri/reactive/OwnedPlatform.scala | 8 - .../scala-3/colibri/reactive/RxPlatform.scala | 2 +- .../scala/colibri/reactive/LiveOwner.scala | 36 ++ .../main/scala/colibri/reactive/Owned.scala | 12 - .../main/scala/colibri/reactive/Owner.scala | 69 --- .../scala/colibri/reactive/Reactive.scala | 134 +++--- .../scala/colibri/reactive/implicits.scala | 16 - .../src/test/scala/colibri/ReactiveSpec.scala | 448 +++++++++--------- 14 files changed, 340 insertions(+), 481 deletions(-) create mode 100644 reactive/src/main/scala-2/colibri/reactive/LiveOwnerPlatform.scala delete mode 100644 reactive/src/main/scala-2/colibri/reactive/OwnedPlatform.scala delete mode 100644 reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala rename reactive/src/main/scala-3/colibri/reactive/{OwnerPlatform.scala => LiveOwnerPlatform.scala} (70%) delete mode 100644 reactive/src/main/scala-3/colibri/reactive/OwnedPlatform.scala create mode 100644 reactive/src/main/scala/colibri/reactive/LiveOwner.scala delete mode 100644 reactive/src/main/scala/colibri/reactive/Owned.scala delete mode 100644 reactive/src/main/scala/colibri/reactive/Owner.scala delete mode 100644 reactive/src/main/scala/colibri/reactive/implicits.scala diff --git a/reactive/src/main/scala-2/colibri/reactive/LiveOwnerPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/LiveOwnerPlatform.scala new file mode 100644 index 00000000..db505c9b --- /dev/null +++ b/reactive/src/main/scala-2/colibri/reactive/LiveOwnerPlatform.scala @@ -0,0 +1,15 @@ +package colibri.reactive + +import colibri.{Observable, Cancelable} + +trait LiveOwnerPlatform { + @annotation.compileTimeOnly( + "No implicit LiveOwner is available here! Wrap inside `Rx { }`, or provide an implicit `LiveOwner`.", + ) + implicit object compileTimeMock extends LiveOwner { + def cancelable: Cancelable = ??? + def unsafeNow[A](rx: Rx[A]): A = ??? + def unsafeLive[A](rx: Rx[A]): A = ??? + def liveObservable: Observable[Any] = ??? + } +} diff --git a/reactive/src/main/scala-2/colibri/reactive/OwnedPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/OwnedPlatform.scala deleted file mode 100644 index 891830a2..00000000 --- a/reactive/src/main/scala-2/colibri/reactive/OwnedPlatform.scala +++ /dev/null @@ -1,9 +0,0 @@ -package colibri.reactive - -import colibri.reactive.internal.MacroUtils -import colibri.effect.SyncEmbed -import colibri.SubscriptionOwner - -trait OwnedPlatform { - def apply[R: SubscriptionOwner: SyncEmbed](f: R): R = macro MacroUtils.ownedImpl[R] -} diff --git a/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala deleted file mode 100644 index ab9ad212..00000000 --- a/reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala +++ /dev/null @@ -1,27 +0,0 @@ -package colibri.reactive - -import colibri.{Observable, Cancelable} - -trait OwnerPlatform { - @annotation.compileTimeOnly( - "No implicit Owner is available here! Wrap inside `Owned { }`, or provide an implicit `Owner`, or `import Owner.unsafeImplicits._` (dangerous).", - ) - implicit object compileTimeMock extends Owner { - def unsafeSubscribe(): Cancelable = ??? - def unsafeOwn(subscription: () => Cancelable): Unit = ??? - def cancelable: Cancelable = ??? - } -} - -trait LiveOwnerPlatform { - @annotation.compileTimeOnly( - "No implicit LiveOwner is available here! Wrap inside `Rx { }`, or provide an implicit `LiveOwner`.", - ) - implicit object compileTimeMock extends LiveOwner { - def unsafeSubscribe(): Cancelable = ??? - def unsafeOwn(subscription: () => Cancelable): Unit = ??? - def cancelable: Cancelable = ??? - def unsafeLive[A](rx: Rx[A]): A = ??? - def liveObservable: Observable[Any] = ??? - } -} diff --git a/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala b/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala index 8aa1c5e2..bea3f090 100644 --- a/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala +++ b/reactive/src/main/scala-2/colibri/reactive/RxPlatform.scala @@ -3,5 +3,5 @@ package colibri.reactive import colibri.reactive.internal.MacroUtils trait RxPlatform { - def apply[R](f: R)(implicit owner: Owner): Rx[R] = macro MacroUtils.rxImpl[R] + def apply[R](f: R): Rx[R] = macro MacroUtils.rxImpl[R] } diff --git a/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala b/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala index 480029ce..cab7942a 100644 --- a/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala +++ b/reactive/src/main/scala-2/colibri/reactive/internal/MacroUtils.scala @@ -1,30 +1,25 @@ package colibri.reactive.internal -import colibri.reactive.{Rx, Owner, LiveOwner} -import colibri.effect.SyncEmbed -import colibri.SubscriptionOwner +import colibri.reactive.{Rx, LiveOwner} import scala.reflect.macros._ // Inspired by scala.rx object MacroUtils { - private val ownerName = "colibriOwner" private val liveOwnerName = "colibriLiveOwner" - def injectOwner[T](c: blackbox.Context)(src: c.Tree, newOwner: c.universe.TermName, exceptOwner: c.Type): c.Tree = { + def injectOwner[T](c: blackbox.Context)(src: c.Tree, newOwner: c.universe.TermName): c.Tree = { import c.universe._ - val implicitOwnerAtCaller = c.inferImplicitValue(typeOf[Owner], silent = false) val implicitLiveOwnerAtCaller = c.inferImplicitValue(typeOf[LiveOwner], silent = false) object transformer extends c.universe.Transformer { override def transform(tree: c.Tree): c.Tree = { val shouldReplaceOwner = tree != null && tree.isTerm && - (tree.tpe =:= implicitOwnerAtCaller.tpe || tree.tpe =:= implicitLiveOwnerAtCaller.tpe) && - tree.tpe <:< typeOf[Owner] && - !(tree.tpe =:= typeOf[Nothing]) && - !(tree.tpe <:< exceptOwner) + tree.tpe =:= implicitLiveOwnerAtCaller.tpe && + tree.tpe <:< typeOf[LiveOwner] && + !(tree.tpe =:= typeOf[Nothing]) if (shouldReplaceOwner) q"$newOwner" else super.transform(tree) @@ -33,37 +28,17 @@ object MacroUtils { transformer.transform(src) } - def ownedImpl[R]( - c: blackbox.Context, - )(f: c.Expr[R])(subscriptionOwner: c.Expr[SubscriptionOwner[R]], syncEmbed: c.Expr[SyncEmbed[R]]): c.Expr[R] = { - import c.universe._ - - val newOwner = c.freshName(TermName(ownerName)) - - val newTree = c.untypecheck(injectOwner(c)(f.tree, newOwner, typeOf[LiveOwner])) - - val tree = q""" - _root_.colibri.reactive.Owned.function { ($newOwner: _root_.colibri.reactive.Owner) => - $newTree - }($subscriptionOwner, $syncEmbed) - """ - - // println(tree) - - c.Expr(tree) - } - - def rxImpl[R](c: blackbox.Context)(f: c.Expr[R])(owner: c.Expr[Owner]): c.Expr[Rx[R]] = { + def rxImpl[R](c: blackbox.Context)(f: c.Expr[R]): c.Expr[Rx[R]] = { import c.universe._ val newOwner = c.freshName(TermName(liveOwnerName)) - val newTree = c.untypecheck(injectOwner(c)(f.tree, newOwner, typeOf[Nothing])) + val newTree = c.untypecheck(injectOwner(c)(f.tree, newOwner)) val tree = q""" _root_.colibri.reactive.Rx.function { ($newOwner: _root_.colibri.reactive.LiveOwner) => $newTree - }($owner) + } """ // println(tree) diff --git a/reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala b/reactive/src/main/scala-3/colibri/reactive/LiveOwnerPlatform.scala similarity index 70% rename from reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala rename to reactive/src/main/scala-3/colibri/reactive/LiveOwnerPlatform.scala index 3c8f4ef2..0248f72f 100644 --- a/reactive/src/main/scala-3/colibri/reactive/OwnerPlatform.scala +++ b/reactive/src/main/scala-3/colibri/reactive/LiveOwnerPlatform.scala @@ -1,5 +1,3 @@ package colibri.reactive -trait OwnerPlatform - trait LiveOwnerPlatform diff --git a/reactive/src/main/scala-3/colibri/reactive/OwnedPlatform.scala b/reactive/src/main/scala-3/colibri/reactive/OwnedPlatform.scala deleted file mode 100644 index 31a4914d..00000000 --- a/reactive/src/main/scala-3/colibri/reactive/OwnedPlatform.scala +++ /dev/null @@ -1,8 +0,0 @@ -package colibri.reactive - -import colibri.effect.SyncEmbed -import colibri.SubscriptionOwner - -trait OwnedPlatform { - def apply[R: SubscriptionOwner: SyncEmbed](f: Owner ?=> R): R = Owned.function(implicit owner => f) -} diff --git a/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala b/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala index 3fc09170..5f97dbed 100644 --- a/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala +++ b/reactive/src/main/scala-3/colibri/reactive/RxPlatform.scala @@ -1,5 +1,5 @@ package colibri.reactive trait RxPlatform { - def apply[R](f: LiveOwner ?=> R)(implicit owner: Owner): Rx[R] = Rx.function(implicit owner => f) + def apply[R](f: LiveOwner ?=> R): Rx[R] = Rx.function(implicit owner => f) } diff --git a/reactive/src/main/scala/colibri/reactive/LiveOwner.scala b/reactive/src/main/scala/colibri/reactive/LiveOwner.scala new file mode 100644 index 00000000..2772deed --- /dev/null +++ b/reactive/src/main/scala/colibri/reactive/LiveOwner.scala @@ -0,0 +1,36 @@ +package colibri.reactive + +import colibri._ + +@annotation.implicitNotFound( + "No implicit LiveOwner is available here! Wrap inside `Rx { }`, or provide an implicit `LiveOwner`.", +) +trait LiveOwner { + def liveObservable: Observable[Any] + + def unsafeLive[A](rx: Rx[A]): A + def unsafeNow[A](rx: Rx[A]): A + + def cancelable: Cancelable +} +object LiveOwner extends LiveOwnerPlatform { + def create(): LiveOwner = new LiveOwner { + private val ref = Cancelable.builder() + + private val subject = Subject.publish[Any]() + + val cancelable: Cancelable = ref + + val liveObservable: Observable[Any] = subject + + def unsafeNow[A](rx: Rx[A]): A = { + ref.unsafeAdd(() => rx.observable.unsafeSubscribe()) + rx.nowGet() + } + + def unsafeLive[A](rx: Rx[A]): A = { + ref.unsafeAdd(() => rx.observable.to(subject).unsafeSubscribe()) + rx.nowGet() + } + } +} diff --git a/reactive/src/main/scala/colibri/reactive/Owned.scala b/reactive/src/main/scala/colibri/reactive/Owned.scala deleted file mode 100644 index 2ee2f8b6..00000000 --- a/reactive/src/main/scala/colibri/reactive/Owned.scala +++ /dev/null @@ -1,12 +0,0 @@ -package colibri.reactive - -import colibri.effect.SyncEmbed -import colibri.SubscriptionOwner - -object Owned extends OwnedPlatform { - def function[R: SubscriptionOwner: SyncEmbed](f: Owner => R): R = SyncEmbed[R].delay { - val owner = Owner.unsafeHotRef() - val result = f(owner) - SubscriptionOwner[R].own(result)(owner.unsafeSubscribe) - } -} diff --git a/reactive/src/main/scala/colibri/reactive/Owner.scala b/reactive/src/main/scala/colibri/reactive/Owner.scala deleted file mode 100644 index b7bea9a7..00000000 --- a/reactive/src/main/scala/colibri/reactive/Owner.scala +++ /dev/null @@ -1,69 +0,0 @@ -package colibri.reactive - -import colibri._ - -@annotation.implicitNotFound( - "No implicit Owner is available here! Wrap inside `Owned { }`, or provide an implicit `Owner`, or `import Owner.unsafeImplicits._` (dangerous).", -) -trait Owner { - def cancelable: Cancelable - def unsafeSubscribe(): Cancelable - def unsafeOwn(subscription: () => Cancelable): Unit -} -object Owner extends OwnerPlatform { - def unsafeHotRef(): Owner = new Owner { - val refCountBuilder = Cancelable.refCountBuilder() - var initialRef = refCountBuilder.ref() - - def cancelable: Cancelable = refCountBuilder - - def unsafeSubscribe(): Cancelable = if (initialRef == null) { - refCountBuilder.ref() - } else { - val result = initialRef - initialRef = null - result - } - - def unsafeOwn(subscription: () => Cancelable): Unit = refCountBuilder.unsafeAdd(subscription) - } - - object unsafeGlobal extends Owner { - def cancelable: Cancelable = Cancelable.empty - def unsafeSubscribe(): Cancelable = Cancelable.empty - def unsafeOwn(subscription: () => Cancelable): Unit = { - subscription() - () - } - } - - object unsafeImplicits { - implicit def unsafeGlobalOwner: Owner = unsafeGlobal - } -} - -@annotation.implicitNotFound( - "No implicit LiveOwner is available here! Wrap inside `Rx { }`, or provide an implicit `LiveOwner`.", -) -trait LiveOwner extends Owner { - def liveObservable: Observable[Any] - def unsafeLive[A](rx: Rx[A]): A -} -object LiveOwner extends LiveOwnerPlatform { - def unsafeHotRef()(implicit parentOwner: Owner): LiveOwner = new LiveOwner { - val owner: Owner = Owner.unsafeHotRef() - parentOwner.unsafeOwn(() => owner.unsafeSubscribe()) - - val liveObservableArray = new scala.scalajs.js.Array[Observable[Any]]() - val liveObservable: Observable[Any] = Observable.mergeIterable(liveObservableArray) - - def unsafeLive[A](rx: Rx[A]): A = { - liveObservableArray.push(rx.observable) - rx.now() - } - - def unsafeSubscribe(): Cancelable = owner.unsafeSubscribe() - def unsafeOwn(subscription: () => Cancelable): Unit = owner.unsafeOwn(subscription) - def cancelable: Cancelable = owner.cancelable - } -} diff --git a/reactive/src/main/scala/colibri/reactive/Reactive.scala b/reactive/src/main/scala/colibri/reactive/Reactive.scala index 5723518c..32e61459 100644 --- a/reactive/src/main/scala/colibri/reactive/Reactive.scala +++ b/reactive/src/main/scala/colibri/reactive/Reactive.scala @@ -1,6 +1,5 @@ package colibri.reactive -import cats.Monoid import colibri._ import colibri.effect._ import monocle.{Iso, Lens, Prism} @@ -10,43 +9,51 @@ import scala.reflect.ClassTag trait Rx[+A] { def observable: Observable[A] - def now(): A + def nowOption(): Option[A] + + final def nowGet(): A = nowOption().get final def apply()(implicit liveOwner: LiveOwner): A = liveOwner.unsafeLive(this) + final def now()(implicit liveOwner: LiveOwner): A = liveOwner.unsafeNow(this) - final def map[B](f: A => B)(implicit owner: Owner): Rx[B] = transformRxSync(_.map(f)) - final def mapEither[B](f: A => Either[Throwable, B])(implicit owner: Owner): Rx[B] = transformRxSync(_.mapEither(f)) - final def tap(f: A => Unit)(implicit owner: Owner): Rx[A] = transformRxSync(_.tap(f)) + final def map[B](f: A => B): Rx[B] = transformRxSync(_.map(f)) + final def mapEither[B](f: A => Either[Throwable, B]): Rx[B] = transformRxSync(_.mapEither(f)) + final def tap(f: A => Unit): Rx[A] = transformRxSync(_.tap(f)) - final def collect[B](f: PartialFunction[A, B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.collect(f))(seed) + final def collect[B](f: PartialFunction[A, B])(seed: => B): Rx[B] = transformRx(_.collect(f))(seed) - final def mapSyncEffect[F[_]: RunSyncEffect, B](f: A => F[B])(implicit owner: Owner): Rx[B] = transformRxSync(_.mapEffect(f)) - final def mapEffect[F[_]: RunEffect, B](f: A => F[B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.mapEffect(f))(seed) - final def mapFuture[B](f: A => Future[B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.mapFuture(f))(seed) + final def mapSyncEffect[F[_]: RunSyncEffect, B](f: A => F[B]): Rx[B] = transformRxSync(_.mapEffect(f)) + final def mapEffect[F[_]: RunEffect, B](f: A => F[B])(seed: => B): Rx[B] = transformRx(_.mapEffect(f))(seed) + final def mapFuture[B](f: A => Future[B])(seed: => B): Rx[B] = transformRx(_.mapFuture(f))(seed) - final def as[B](value: B)(implicit owner: Owner): Rx[B] = transformRxSync(_.as(value)) - final def asEval[B](value: => B)(implicit owner: Owner): Rx[B] = transformRxSync(_.asEval(value)) + final def as[B](value: B): Rx[B] = transformRxSync(_.as(value)) + final def asEval[B](value: => B): Rx[B] = transformRxSync(_.asEval(value)) - final def asSyncEffect[F[_]: RunSyncEffect, B](value: F[B])(implicit owner: Owner): Rx[B] = transformRxSync(_.asEffect(value)) - final def asEffect[F[_]: RunEffect, B](value: F[B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.asEffect(value))(seed) - final def asFuture[B](value: => Future[B])(seed: => B)(implicit owner: Owner): Rx[B] = transformRx(_.asFuture(value))(seed) + final def asSyncEffect[F[_]: RunSyncEffect, B](value: F[B]): Rx[B] = transformRxSync(_.asEffect(value)) + final def asEffect[F[_]: RunEffect, B](value: F[B])(seed: => B): Rx[B] = transformRx(_.asEffect(value))(seed) + final def asFuture[B](value: => Future[B])(seed: => B): Rx[B] = transformRx(_.asFuture(value))(seed) - final def switchMap[B](f: A => Rx[B])(implicit owner: Owner): Rx[B] = transformRxSync(_.switchMap(f andThen (_.observable))) - final def mergeMap[B](f: A => Rx[B])(implicit owner: Owner): Rx[B] = transformRxSync(_.mergeMap(f andThen (_.observable))) + final def switchMap[B](f: A => Rx[B]): Rx[B] = transformRxSync(_.switchMap(f andThen (_.observable))) + final def mergeMap[B](f: A => Rx[B]): Rx[B] = transformRxSync(_.mergeMap(f andThen (_.observable))) - final def foreach(f: A => Unit)(implicit owner: Owner): Unit = owner.unsafeOwn(() => observable.unsafeForeach(f)) - final def foreachLater(f: A => Unit)(implicit owner: Owner): Unit = owner.unsafeOwn(() => observable.tail.unsafeForeach(f)) + final def transformRx[B](f: Observable[A] => Observable[B])(seed: => B): Rx[B] = Rx.observable(f(observable))(seed) + final def transformRxSync[B](f: Observable[A] => Observable[B]): Rx[B] = Rx.observableSync(f(observable)) - final def transformRx[B](f: Observable[A] => Observable[B])(seed: => B)(implicit owner: Owner): Rx[B] = Rx.observable(f(observable))(seed) - final def transformRxSync[B](f: Observable[A] => Observable[B])(implicit owner: Owner): Rx[B] = Rx.observableSync(f(observable)) + final def unsafeSubscribe(): Cancelable = observable.unsafeSubscribe() + final def unsafeForeach(f: A => Unit): Cancelable = observable.unsafeForeach(f) + final def unsafeForeachLater(f: A => Unit): Cancelable = observable.tail.unsafeForeach(f) + final def unsafeHot(): Rx[A] = { + val _ = unsafeSubscribe() + this + } } object Rx extends RxPlatform { - def function[R](f: LiveOwner => R)(implicit owner: Owner): Rx[R] = { + def function[R](f: LiveOwner => R): Rx[R] = { val subject = Subject.behavior[Any](()) val observable = subject.switchMap { _ => - val liveOwner = LiveOwner.unsafeHotRef() + val liveOwner = LiveOwner.create() val result = f(liveOwner) Observable[R](result) .subscribing(liveOwner.liveObservable.dropSyncAll.head.to(subject)) @@ -58,31 +65,23 @@ object Rx extends RxPlatform { def const[A](value: A): Rx[A] = new RxConst(value) - def observable[A](observable: Observable[A])(seed: => A)(implicit owner: Owner): Rx[A] = observableSync(observable.prependEval(seed)) + def observable[A](observable: Observable[A])(seed: => A): Rx[A] = observableSync(observable.prependEval(seed)) - def observableSync[A](observable: Observable[A])(implicit owner: Owner): Rx[A] = new RxObservableSync(observable) + def observableSync[A](observable: Observable[A]): Rx[A] = new RxObservableSync(observable) @inline implicit final class RxOps[A](private val self: Rx[A]) extends AnyVal { - def scan(f: (A, A) => A)(implicit owner: Owner): Rx[A] = scan(self.now())(f) + def scan(f: (A, A) => A): Rx[A] = scan(self.nowGet())(f) - def scan[B](seed: B)(f: (B, A) => B)(implicit owner: Owner): Rx[B] = self.transformRxSync(_.scan0(seed)(f)) + def scan[B](seed: B)(f: (B, A) => B): Rx[B] = self.transformRxSync(_.scan0(seed)(f)) - def filter(f: A => Boolean)(seed: => A)(implicit owner: Owner): Rx[A] = self.transformRx(_.filter(f))(seed) } @inline implicit class RxBooleanOps(private val source: Rx[Boolean]) extends AnyVal { - @inline def toggle[A](ifTrue: => A, ifFalse: A)(implicit owner: Owner): Rx[A] = source.map { + @inline def toggle[A](ifTrue: => A, ifFalse: A): Rx[A] = source.map { case true => ifTrue case false => ifFalse } - @inline def toggle[A: Monoid](ifTrue: => A)(implicit owner: Owner): Rx[A] = toggle(ifTrue, Monoid[A].empty) - - @inline def negated(implicit owner: Owner): Rx[Boolean] = source.map(x => !x) - } - - implicit object source extends Source[Rx] { - def unsafeSubscribe[A](source: Rx[A])(sink: Observer[A]): Cancelable = source.observable.unsafeSubscribe(sink) } } @@ -119,37 +118,28 @@ object RxWriter { } trait Var[A] extends Rx[A] with RxWriter[A] { - final def update(f: A => A) = this.set(f(this.now())) + final def updateGet(f: A => A) = this.set(f(this.nowGet())) final def transformVar[A2](f: RxWriter[A] => RxWriter[A2])(g: Rx[A] => Rx[A2]): Var[A2] = Var.combine(g(this), f(this)) final def transformVarRx(g: Rx[A] => Rx[A]): Var[A] = Var.combine(g(this), this) final def transformVarRxWriter(f: RxWriter[A] => RxWriter[A]): Var[A] = Var.combine(this, f(this)) - final def imap[A2](f: A2 => A)(g: A => A2)(implicit owner: Owner): Var[A2] = transformVar(_.contramap(f))(_.map(g)) - final def lens[B](read: A => B)(write: (A, B) => A)(implicit owner: Owner): Var[B] = - transformVar(_.contramap(write(now(), _)))(_.map(read)) - - final def prismInit[A2](f: A2 => A)(g: A => Option[A2])(initial: A2)(implicit owner: Owner): Var[A2] = - transformVar(_.contramap(f))(rx => Rx.observableSync(rx.observable.mapFilter(g).prepend(initial))) - - final def prism[A2](f: A2 => A)(g: A => Option[A2])(implicit owner: Owner): Option[Var[A2]] = - g(now()).map(prismInit(f)(g)(_)) + final def imap[A2](f: A2 => A)(g: A => A2): Var[A2] = transformVar(_.contramap(f))(_.map(g)) + final def lens[B](read: A => B)(write: (A, B) => A): Var[B] = + transformVar(_.contramap(write(nowGet(), _)))(_.map(read)) - final def subType[A2 <: A: ClassTag](implicit owner: Owner): Option[Var[A2]] = prism[A2]((x: A2) => x) { - case a: A2 => Some(a) - case _ => None - } + final def prism[A2](f: A2 => A)(g: A => Option[A2])(seed: => A2): Var[A2] = + transformVar(_.contramap(f))(rx => Rx.observableSync(rx.observable.mapFilter(g).prependEval(seed))) - final def subTypeInit[A2 <: A: ClassTag](initial: A2)(implicit owner: Owner): Var[A2] = prismInit[A2]((x: A2) => x) { + final def subType[A2 <: A: ClassTag](seed: => A2): Var[A2] = prism[A2]((x: A2) => x) { case a: A2 => Some(a) case _ => None - }(initial) + }(seed) - final def imapO[B](optic: Iso[A, B])(implicit owner: Owner): Var[B] = imap(optic.reverseGet(_))(optic.get(_)) - final def lensO[B](optic: Lens[A, B])(implicit owner: Owner): Var[B] = lens(optic.get(_))((base, zoomed) => optic.replace(zoomed)(base)) - final def prismO[B](optic: Prism[A, B])(implicit owner: Owner): Option[Var[B]] = prism(optic.reverseGet(_))(optic.getOption(_)) - final def prismInitO[B](optic: Prism[A, B])(initial: B)(implicit owner: Owner): Var[B] = - prismInit(optic.reverseGet(_))(optic.getOption(_))(initial) + final def imapO[B](optic: Iso[A, B]): Var[B] = imap(optic.reverseGet(_))(optic.get(_)) + final def lensO[B](optic: Lens[A, B]): Var[B] = lens(optic.get(_))((base, zoomed) => optic.replace(zoomed)(base)) + final def prismO[B](optic: Prism[A, B])(seed: => B): Var[B] = + prism(optic.reverseGet(_))(optic.getOption(_))(seed) } object Var { @@ -158,7 +148,7 @@ object Var { def combine[A](read: Rx[A], write: RxWriter[A]): Var[A] = new VarCombine(read, write) @inline implicit class SeqVarOperations[A](rxvar: Var[Seq[A]]) { - def sequence(implicit owner: Owner): Rx[Seq[Var[A]]] = Rx.observableSync(new Observable[Seq[Var[A]]] { + def sequence: Rx[Seq[Var[A]]] = Rx.observableSync(new Observable[Seq[Var[A]]] { def unsafeSubscribe(sink: Observer[Seq[Var[A]]]): Cancelable = { rxvar.observable.unsafeSubscribe( @@ -174,16 +164,7 @@ object Var { sink.unsafeOnError(error) } } - val observable = new Observable.Value[A] { - def now(): A = a - - def unsafeSubscribe(sink: Observer[A]): Cancelable = { - sink.unsafeOnNext(a) - Cancelable.empty - } - - } - Var.combine(Rx.observable(observable)(seed = a), RxWriter.observer(observer)) + Var.combine(Rx.const(a), RxWriter.observer(observer)) }) }, sink.unsafeOnError, @@ -194,7 +175,7 @@ object Var { } @inline implicit class OptionVarOperations[A](rxvar: Var[Option[A]]) { - def sequence(implicit owner: Owner): Rx[Option[Var[A]]] = Rx.observableSync(new Observable[Option[Var[A]]] { + def sequence: Rx[Option[Var[A]]] = Rx.observableSync(new Observable[Option[Var[A]]] { def unsafeSubscribe(outerSink: Observer[Option[Var[A]]]): Cancelable = { var cache = Option.empty[Var[A]] @@ -232,30 +213,29 @@ object Var { private final class RxConst[A](value: A) extends Rx[A] { val observable: Observable[A] = Observable.pure(value) - def now(): A = value + def nowOption() = Some(value) } -private final class RxObservableSync[A](inner: Observable[A])(implicit owner: Owner) extends Rx[A] { +private final class RxObservableSync[A](inner: Observable[A]) extends Rx[A] { private val state = new ReplayLatestSubject[A]() val observable: Observable[A] = inner.dropUntilSyncLatest.distinctOnEquals.multicast(state).refCount - owner.unsafeOwn(() => observable.unsafeSubscribe()) - def now(): A = state.now().get + def nowOption() = state.now() } private final class RxWriterObserver[A](val observer: Observer[A]) extends RxWriter[A] -private final class VarSubject[A](seed: A) extends Var[A] { +private final class VarSubject[A](seed: A) extends Var[A] { private val state = new BehaviorSubject[A](seed) val observable: Observable[A] = state.distinctOnEquals val observer: Observer[A] = state - def now(): A = state.now() + def nowOption() = Some(state.now()) } private final class VarCombine[A](innerRead: Rx[A], innerWrite: RxWriter[A]) extends Var[A] { - def now() = innerRead.now() - val observable = innerRead.observable - val observer = innerWrite.observer + def nowOption() = innerRead.nowOption() + val observable = innerRead.observable + val observer = innerWrite.observer } diff --git a/reactive/src/main/scala/colibri/reactive/implicits.scala b/reactive/src/main/scala/colibri/reactive/implicits.scala deleted file mode 100644 index 8583c406..00000000 --- a/reactive/src/main/scala/colibri/reactive/implicits.scala +++ /dev/null @@ -1,16 +0,0 @@ -package colibri.reactive - -import colibri.{Observer, Observable} - -object implicits { - @inline class ObservableRxOps[A](private val self: Observable[A]) extends AnyVal { - def foreachOwned(f: A => Unit)(implicit owner: Owner): Unit = - subscribeOwned(Observer.foreach(f)) - - def subscribeOwned(rxWriter: RxWriter[A])(implicit owner: Owner): Unit = - subscribeOwned(rxWriter.observer) - - def subscribeOwned(observer: Observer[A])(implicit owner: Owner): Unit = - owner.unsafeOwn(() => self.unsafeSubscribe(observer)) - } -} diff --git a/reactive/src/test/scala/colibri/ReactiveSpec.scala b/reactive/src/test/scala/colibri/ReactiveSpec.scala index 9428b545..ae1f6b24 100644 --- a/reactive/src/test/scala/colibri/ReactiveSpec.scala +++ b/reactive/src/test/scala/colibri/ReactiveSpec.scala @@ -1,34 +1,25 @@ package colibri.reactive -import colibri._ import cats.implicits._ -import cats.effect.SyncIO import monocle.macros.{GenLens, GenPrism} import org.scalatest.matchers.should.Matchers import org.scalatest.flatspec.AsyncFlatSpec class ReactiveSpec extends AsyncFlatSpec with Matchers { - implicit def unsafeSubscriptionOwner[T]: SubscriptionOwner[SyncIO[T]] = new SubscriptionOwner[SyncIO[T]] { - def own(owner: SyncIO[T])(subscription: () => Cancelable): SyncIO[T] = - owner.flatTap(_ => SyncIO(subscription()).void) - } - - "Rx" should "map with proper subscription lifetime" in Owned(SyncIO { + "Rx" should "map with proper subscription lifetime" in { var mapped = List.empty[Int] var received1 = List.empty[Int] var received2 = List.empty[Int] - val owner = implicitly[Owner] - val variable = Var(1) val stream = variable.map { x => mapped ::= x; x } - mapped shouldBe List(1) + mapped shouldBe List.empty received1 shouldBe List.empty received2 shouldBe List.empty - stream.foreach(received1 ::= _) + val cancelR1 = stream.unsafeForeach(received1 ::= _) mapped shouldBe List(1) received1 shouldBe List(1) @@ -40,7 +31,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(2, 1) received2 shouldBe List.empty - stream.foreach(received2 ::= _) + val cancelR2 = stream.unsafeForeach(received2 ::= _) mapped shouldBe List(2, 1) received1 shouldBe List(2, 1) @@ -52,19 +43,14 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(3, 2, 1) received2 shouldBe List(3, 2) - val cancel = owner.unsafeSubscribe() - - mapped shouldBe List(3, 2, 1) - received1 shouldBe List(3, 2, 1) - received2 shouldBe List(3, 2) - variable.set(4) mapped shouldBe List(4, 3, 2, 1) received1 shouldBe List(4, 3, 2, 1) received2 shouldBe List(4, 3, 2) - cancel.unsafeCancel() + cancelR1.unsafeCancel() + cancelR2.unsafeCancel() mapped shouldBe List(4, 3, 2, 1) received1 shouldBe List(4, 3, 2, 1) @@ -76,56 +62,75 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(4, 3, 2, 1) received2 shouldBe List(4, 3, 2) - val cancel2 = owner.unsafeSubscribe() + val cancelR1b = stream.unsafeForeach(received1 ::= _) + val cancelR2b = stream.unsafeForeach(received2 ::= _) mapped shouldBe List(5, 4, 3, 2, 1) - received1 shouldBe List(5, 4, 3, 2, 1) + received1 shouldBe List(5, 4, 4, 3, 2, 1) received2 shouldBe List(5, 4, 3, 2) variable.set(6) mapped shouldBe List(6, 5, 4, 3, 2, 1) - received1 shouldBe List(6, 5, 4, 3, 2, 1) + received1 shouldBe List(6, 5, 4, 4, 3, 2, 1) received2 shouldBe List(6, 5, 4, 3, 2) - val cancel3 = owner.unsafeSubscribe() + val cancelX = stream.unsafeSubscribe() mapped shouldBe List(6, 5, 4, 3, 2, 1) - received1 shouldBe List(6, 5, 4, 3, 2, 1) + received1 shouldBe List(6, 5, 4, 4, 3, 2, 1) received2 shouldBe List(6, 5, 4, 3, 2) variable.set(7) mapped shouldBe List(7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(7, 6, 5, 4, 3, 2, 1) - received2 shouldBe List(7, 6, 5, 4, 3, 2) - - cancel2.unsafeCancel() - - mapped shouldBe List(7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(7, 6, 5, 4, 4, 3, 2, 1) received2 shouldBe List(7, 6, 5, 4, 3, 2) variable.set(8) mapped shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(8, 7, 6, 5, 4, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) - cancel3.unsafeCancel() + cancelR2b.unsafeCancel() mapped shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(8, 7, 6, 5, 4, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) variable.set(9) - mapped shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) - received1 shouldBe List(8, 7, 6, 5, 4, 3, 2, 1) + mapped shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) - }).unsafeRunSync() - it should "nested owners" in Owned(SyncIO { + cancelR1b.unsafeCancel() + + mapped shouldBe List(9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) + + variable.set(10) + + mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) + + cancelX.unsafeCancel() + + mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) + + variable.set(11) + + mapped shouldBe List(10, 9, 8, 7, 6, 5, 4, 3, 2, 1) + received1 shouldBe List(9, 8, 7, 6, 5, 4, 4, 3, 2, 1) + received2 shouldBe List(8, 7, 6, 5, 4, 3, 2) + } + + it should "nested owners" in { var received1 = List.empty[Int] var innerRx = List.empty[Int] var outerRx = List.empty[Int] @@ -133,7 +138,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val variable = Var(1) val variable2 = Var(2) - def test(x: Int)(implicit owner: Owner) = Rx { + def test(x: Int) = Rx { innerRx ::= x variable2() * x } @@ -143,13 +148,13 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { outerRx ::= curr val result = test(curr) result() - } + }.unsafeHot() innerRx shouldBe List(1) outerRx shouldBe List(1) received1 shouldBe List.empty - rx.foreach(received1 ::= _) + rx.unsafeForeach(received1 ::= _) innerRx shouldBe List(1) outerRx shouldBe List(1) @@ -157,7 +162,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable2.set(3) - innerRx shouldBe List(1, 1, 1) //TODO: triggering too often + innerRx shouldBe List(1, 1, 1) // TODO: triggering too often outerRx shouldBe List(1, 1) received1 shouldBe List(3, 2) @@ -175,12 +180,12 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable2.set(4) - innerRx shouldBe List(3, 3, 3, 2, 1, 1, 1) //TODO: triggering too often + innerRx shouldBe List(3, 3, 3, 2, 1, 1, 1) // TODO: triggering too often outerRx shouldBe List(3, 3, 2, 1, 1) received1 shouldBe List(12, 9, 6, 3, 2) - }).unsafeRunSync() + } - it should "nested owners 2" in Owned.function(ownedOwner => SyncIO { + it should "nested owners 2" in { var received1 = List.empty[Int] var innerRx = List.empty[Int] var outerRx = List.empty[Int] @@ -188,9 +193,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val variable = Var(1) val variable2 = Var(2) - implicit val owner: Owner = ownedOwner - - def test(x: Int)(implicit owner: Owner) = Rx { + def test(x: Int) = Rx { innerRx ::= x variable2() * x } @@ -202,11 +205,11 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { result() } - innerRx shouldBe List(1) - outerRx shouldBe List(1) + innerRx shouldBe List.empty + outerRx shouldBe List.empty received1 shouldBe List.empty - rx.foreach(received1 ::= _) + rx.unsafeForeach(received1 ::= _) innerRx shouldBe List(1) outerRx shouldBe List(1) @@ -214,7 +217,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable2.set(3) - innerRx shouldBe List(1, 1, 1) //TODO: triggering too often + innerRx shouldBe List(1, 1, 1) // TODO: triggering too often outerRx shouldBe List(1, 1) received1 shouldBe List(3, 2) @@ -232,23 +235,23 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable2.set(4) - innerRx shouldBe List(3, 3, 3, 2, 1, 1, 1) //TODO: triggering too often + innerRx shouldBe List(3, 3, 3, 2, 1, 1, 1) // TODO: triggering too often outerRx shouldBe List(3, 3, 2, 1, 1) received1 shouldBe List(12, 9, 6, 3, 2) - }).unsafeRunSync() + } - it should "sequence with nesting" in Owned(SyncIO { + it should "sequence with nesting" in { var received1 = List.empty[Int] - var mapped = List.empty[Boolean] + var mapped = List.empty[Boolean] - val variable = Var[Option[Int]](None) + val variable = Var[Option[Int]](None) val stream = variable.sequence.switchMap { option => mapped ::= option.isDefined option match { case Some(rx) => - val isOdd = rx.map(_ % 2 != 0) + val isOdd = rx.map(_ % 2 != 0) val isEven = rx.map(_ % 2 == 0) isEven.switchMap { isEven => @@ -265,10 +268,10 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { } } - mapped shouldBe List(false) + mapped shouldBe List.empty received1 shouldBe List.empty - stream.foreach(received1 ::= _) + stream.unsafeForeach(received1 ::= _) mapped shouldBe List(false) received1 shouldBe List(-1) @@ -281,20 +284,20 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable.set(Some(2)) mapped shouldBe List(true, false) - received1 shouldBe List(1, 12, 2, -1) - }).unsafeRunSync() + received1 shouldBe List(1, 2, -1) + } - it should "be distinct" in Owned(SyncIO { + it should "be distinct" in { var mapped = List.empty[Int] var received1 = List.empty[Boolean] val variable = Var(1) val stream = variable.map { x => mapped ::= x; x % 2 == 0 } - mapped shouldBe List(1) + mapped shouldBe List.empty received1 shouldBe List.empty - stream.foreach(received1 ::= _) + stream.unsafeForeach(received1 ::= _) mapped shouldBe List(1) received1 shouldBe List(false) @@ -318,9 +321,9 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(5, 4, 2, 1) received1 shouldBe List(false, true, false) - }).unsafeRunSync() + } - it should "work without glitches in chain" in Owned(SyncIO { + it should "work without glitches in chain" in { var liveCounter = 0 var mapped = List.empty[Int] var received1 = List.empty[Boolean] @@ -330,7 +333,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val stream = variable.map { x => mapped ::= x; x % 2 == 0 } - stream.foreach(received1 ::= _) + stream.unsafeForeach(received1 ::= _) mapped shouldBe List(1) received1 shouldBe List(false) @@ -340,12 +343,12 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { s"${variable()}: ${stream()}" } - rx.foreach(receivedRx ::= _) + rx.unsafeForeach(receivedRx ::= _) mapped shouldBe List(1) received1 shouldBe List(false) receivedRx shouldBe List("1: false") - rx.now() shouldBe "1: false" + rx.nowGet() shouldBe "1: false" liveCounter shouldBe 1 variable.set(2) @@ -353,7 +356,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(2, 1) received1 shouldBe List(true, false) receivedRx shouldBe List("2: true", "1: false") - rx.now() shouldBe "2: true" + rx.nowGet() shouldBe "2: true" liveCounter shouldBe 2 variable.set(2) @@ -361,7 +364,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(2, 1) received1 shouldBe List(true, false) receivedRx shouldBe List("2: true", "1: false") - rx.now() shouldBe "2: true" + rx.nowGet() shouldBe "2: true" liveCounter shouldBe 2 variable.set(4) @@ -369,7 +372,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(4, 2, 1) received1 shouldBe List(true, false) receivedRx shouldBe List("4: true", "2: true", "1: false") - rx.now() shouldBe "4: true" + rx.nowGet() shouldBe "4: true" liveCounter shouldBe 3 variable.set(5) @@ -377,11 +380,11 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { mapped shouldBe List(5, 4, 2, 1) received1 shouldBe List(false, true, false) receivedRx shouldBe List("5: false", "4: true", "2: true", "1: false") - rx.now() shouldBe "5: false" + rx.nowGet() shouldBe "5: false" liveCounter shouldBe 4 - }).unsafeRunSync() + } - it should "work nested" in Owned(SyncIO { + it should "work nested" in { var liveCounter = 0 var liveCounter2 = 0 @@ -397,32 +400,32 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { } variable() + nested() - } + }.unsafeHot() - rx.now() shouldBe 3 + rx.nowGet() shouldBe 3 liveCounter shouldBe 1 liveCounter2 shouldBe 1 variable.set(2) - rx.now() shouldBe 4 + rx.nowGet() shouldBe 4 liveCounter shouldBe 2 liveCounter2 shouldBe 2 variable2.set(4) - rx.now() shouldBe 6 + rx.nowGet() shouldBe 6 liveCounter shouldBe 3 liveCounter2 shouldBe 4 // TODO: why do we jump to 4 calculations here instead of 3? variable.set(3) - rx.now() shouldBe 7 + rx.nowGet() shouldBe 7 liveCounter shouldBe 4 liveCounter2 shouldBe 5 - }).unsafeRunSync() + } - it should "work with now" in Owned(SyncIO { + it should "work with now" in { var liveCounter = 0 val variable = Var(1) @@ -432,92 +435,85 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val rx = Rx { liveCounter += 1 s"${variable()}, ${variable2()}, ${variable3.now()}" - } + }.unsafeHot() - rx.now() shouldBe "1, 2, 3" + rx.nowGet() shouldBe "1, 2, 3" liveCounter shouldBe 1 variable.set(2) - rx.now() shouldBe "2, 2, 3" + rx.nowGet() shouldBe "2, 2, 3" liveCounter shouldBe 2 variable.set(2) - rx.now() shouldBe "2, 2, 3" + rx.nowGet() shouldBe "2, 2, 3" liveCounter shouldBe 2 variable2.set(10) - rx.now() shouldBe "2, 10, 3" + rx.nowGet() shouldBe "2, 10, 3" liveCounter shouldBe 3 variable3.set(5) - rx.now() shouldBe "2, 10, 3" + rx.nowGet() shouldBe "2, 10, 3" liveCounter shouldBe 3 variable2.set(100) - rx.now() shouldBe "2, 100, 5" + rx.nowGet() shouldBe "2, 100, 5" liveCounter shouldBe 4 - }).unsafeRunSync() + } - it should "work with multi nesting" in Owned(SyncIO { + it should "work with multi nesting" in { var liveCounter = 0 val variable = Var(1) val variable2 = Var(2) val variable3 = Var(3) - val rx = Owned(SyncIO { - Owned(SyncIO { + val rx = Rx { + liveCounter += 1 + Rx { Rx { - liveCounter += 1 - - Owned(SyncIO { - Rx { - Rx { - s"${variable()}, ${variable2()}, ${variable3.now()}" - } - }(implicitly)() - }).unsafeRunSync()() + s"${variable()}, ${variable2()}, ${variable3.now()}" } - }).unsafeRunSync() - }).unsafeRunSync() + }.apply().apply() + }.unsafeHot() - rx.now() shouldBe "1, 2, 3" + rx.nowGet() shouldBe "1, 2, 3" liveCounter shouldBe 1 variable.set(2) - rx.now() shouldBe "2, 2, 3" + rx.nowGet() shouldBe "2, 2, 3" liveCounter shouldBe 2 variable.set(2) - rx.now() shouldBe "2, 2, 3" + rx.nowGet() shouldBe "2, 2, 3" liveCounter shouldBe 2 variable2.set(10) - rx.now() shouldBe "2, 10, 3" + rx.nowGet() shouldBe "2, 10, 3" liveCounter shouldBe 3 variable3.set(5) - rx.now() shouldBe "2, 10, 3" + rx.nowGet() shouldBe "2, 10, 3" liveCounter shouldBe 3 variable2.set(100) - rx.now() shouldBe "2, 100, 5" + rx.nowGet() shouldBe "2, 100, 5" liveCounter shouldBe 4 - }).unsafeRunSync() + } - it should "diamond" in Owned(SyncIO { + it should "diamond" in { var liveCounter = 0 var mapped1 = List.empty[Int] var mapped2 = List.empty[Int] @@ -530,8 +526,8 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { val stream1 = variable.map { x => mapped1 ::= x; x % 2 == 0 } val stream2 = variable.map { x => mapped2 ::= x; x % 2 == 0 } - stream1.foreach(received1 ::= _) - stream2.foreach(received2 ::= _) + stream1.unsafeForeach(received1 ::= _) + stream2.unsafeForeach(received2 ::= _) mapped1 shouldBe List(1) mapped2 shouldBe List(1) @@ -543,14 +539,14 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { s"${stream1()}:${stream2()}" } - rx.foreach(receivedRx ::= _) + rx.unsafeForeach(receivedRx ::= _) mapped1 shouldBe List(1) mapped2 shouldBe List(1) received1 shouldBe List(false) received2 shouldBe List(false) receivedRx shouldBe List("false:false") - rx.now() shouldBe "false:false" + rx.nowGet() shouldBe "false:false" liveCounter shouldBe 1 variable.set(2) @@ -560,7 +556,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(true, false) received2 shouldBe List(true, false) receivedRx shouldBe List("true:true", "true:false", "false:false") // glitch - rx.now() shouldBe "true:true" + rx.nowGet() shouldBe "true:true" liveCounter shouldBe 3 variable.set(2) @@ -570,7 +566,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(true, false) received2 shouldBe List(true, false) receivedRx shouldBe List("true:true", "true:false", "false:false") - rx.now() shouldBe "true:true" + rx.nowGet() shouldBe "true:true" liveCounter shouldBe 3 variable.set(4) @@ -580,7 +576,7 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(true, false) received2 shouldBe List(true, false) receivedRx shouldBe List("true:true", "true:false", "false:false") - rx.now() shouldBe "true:true" + rx.nowGet() shouldBe "true:true" liveCounter shouldBe 3 variable.set(5) @@ -590,16 +586,16 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { received1 shouldBe List(false, true, false) received2 shouldBe List(false, true, false) receivedRx shouldBe List("false:false", "false:true", "true:true", "true:false", "false:false") // glitch - rx.now() shouldBe "false:false" + rx.nowGet() shouldBe "false:false" liveCounter shouldBe 5 - }).unsafeRunSync() + } - it should "collect" in Owned(SyncIO { + it should "collect" in { val variable = Var[Option[Int]](Some(1)) val collected = variable.collect { case Some(x) => x }(0) var collectedStates = Vector.empty[Int] - collected.foreach(collectedStates :+= _) + collected.unsafeForeach(collectedStates :+= _) collectedStates shouldBe Vector(1) @@ -609,14 +605,14 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable.set(Some(17)) collectedStates shouldBe Vector(1, 17) - }).unsafeRunSync() + } - it should "collect initial none" in Owned(SyncIO { + it should "collect initial none" in { val variable = Var[Option[Int]](None) val collected = variable.collect { case Some(x) => x }(0) var collectedStates = Vector.empty[Int] - collected.foreach(collectedStates :+= _) + collected.unsafeForeach(collectedStates :+= _) collectedStates shouldBe Vector(0) @@ -626,143 +622,147 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { variable.set(Some(17)) collectedStates shouldBe Vector(0, 17) - }).unsafeRunSync() + } - it should "sequence on Var[Seq[T]]" in Owned(SyncIO { + it should "sequence on Var[Seq[T]]" in { { // inner.set on seed value val variable = Var[Seq[Int]](Seq(1)) - val sequence: Rx[Seq[Var[Int]]] = variable.sequence + val sequence: Rx[Seq[Var[Int]]] = variable.sequence.unsafeHot() - variable.now() shouldBe Seq(1) - sequence.now().map(_.now()) shouldBe Seq(1) + variable.nowGet() shouldBe Seq(1) + sequence.nowGet().map(_.nowGet()) shouldBe Seq(1) - sequence.now()(0).set(2) - variable.now() shouldBe Seq(2) + sequence.nowGet().apply(0).set(2) + variable.nowGet() shouldBe Seq(2) } { // inner.set on value after seed val variable = Var[Seq[Int]](Seq.empty) - val sequence: Rx[Seq[Var[Int]]] = variable.sequence + val sequence: Rx[Seq[Var[Int]]] = variable.sequence.unsafeHot() - variable.now() shouldBe Seq.empty - sequence.now().map(_.now()) shouldBe Seq.empty + variable.nowGet() shouldBe Seq.empty + sequence.nowGet().map(_.nowGet()) shouldBe Seq.empty variable.set(Seq(1)) - sequence.now().map(_.now()) shouldBe Seq(1) + sequence.nowGet().map(_.nowGet()) shouldBe Seq(1) - sequence.now()(0).set(2) - variable.now() shouldBe Seq(2) + sequence.nowGet().apply(0).set(2) + variable.nowGet() shouldBe Seq(2) } - }).unsafeRunSync() + } - it should "sequence on Var[Option[T]]" in Owned(SyncIO { + it should "sequence on Var[Option[T]]" in { { // inner.set on seed value val variable = Var[Option[Int]](Some(1)) - val sequence: Rx[Option[Var[Int]]] = variable.sequence + val sequence: Rx[Option[Var[Int]]] = variable.sequence.unsafeHot() - variable.now() shouldBe Some(1) - sequence.now().map(_.now()) shouldBe Some(1) + variable.nowGet() shouldBe Some(1) + sequence.nowGet().map(_.nowGet()) shouldBe Some(1) - sequence.now().get.set(2) - variable.now() shouldBe Some(2) + sequence.nowGet().get.set(2) + variable.nowGet() shouldBe Some(2) } { // inner.set on value after seed val variable = Var[Option[Int]](Option.empty) - val sequence: Rx[Option[Var[Int]]] = variable.sequence + val sequence: Rx[Option[Var[Int]]] = variable.sequence.unsafeHot() - variable.now() shouldBe None - sequence.now().map(_.now()) shouldBe None + variable.nowGet() shouldBe None + sequence.nowGet().map(_.nowGet()) shouldBe None variable.set(Option(1)) - sequence.now().map(_.now()) shouldBe Option(1) + sequence.nowGet().map(_.nowGet()) shouldBe Option(1) - sequence.now().get.set(2) - variable.now() shouldBe Option(2) + sequence.nowGet().get.set(2) + variable.nowGet() shouldBe Option(2) variable.set(None) - sequence.now().map(_.now()) shouldBe None + sequence.nowGet().map(_.nowGet()) shouldBe None } { // inner.set on seed value val variable = Var[Option[Int]](Some(1)) - val sequence: Rx[Option[Var[Int]]] = variable.sequence + val sequence: Rx[Option[Var[Int]]] = variable.sequence.unsafeHot() var outerTriggered = 0 var innerTriggered = 0 - sequence.foreach(_ => outerTriggered += 1) - sequence.now().foreach(_.foreach(_ => innerTriggered += 1)) + sequence.unsafeForeach(_ => outerTriggered += 1) + sequence.nowGet().foreach(_.unsafeForeach(_ => innerTriggered += 1)) - variable.now() shouldBe Some(1) - sequence.now().map(_.now()) shouldBe Some(1) + variable.nowGet() shouldBe Some(1) + sequence.nowGet().map(_.nowGet()) shouldBe Some(1) outerTriggered shouldBe 1 innerTriggered shouldBe 1 - val varRefA = sequence.now().get + val varRefA = sequence.nowGet().get variable.set(Some(2)) - variable.now() shouldBe Some(2) - sequence.now().map(_.now()) shouldBe Some(2) + variable.nowGet() shouldBe Some(2) + sequence.nowGet().map(_.nowGet()) shouldBe Some(2) outerTriggered shouldBe 1 innerTriggered shouldBe 2 - val varRefB = sequence.now().get + val varRefB = sequence.nowGet().get assert(varRefA eq varRefB) } - }).unsafeRunSync() + } - it should "lens" in Owned(SyncIO { + it should "lens" in { val a: Var[(Int, String)] = Var((0, "Wurst")) val b: Var[String] = a.lens(_._2)((a, b) => a.copy(_2 = b)) val c: Rx[String] = b.map(_ + "q") - a.now() shouldBe ((0, "Wurst")) - b.now() shouldBe "Wurst" - c.now() shouldBe "Wurstq" + a.unsafeSubscribe() + b.unsafeSubscribe() + c.unsafeSubscribe() + + a.nowGet() shouldBe ((0, "Wurst")) + b.nowGet() shouldBe "Wurst" + c.nowGet() shouldBe "Wurstq" a.set((1, "hoho")) - a.now() shouldBe ((1, "hoho")) - b.now() shouldBe "hoho" - c.now() shouldBe "hohoq" + a.nowGet() shouldBe ((1, "hoho")) + b.nowGet() shouldBe "hoho" + c.nowGet() shouldBe "hohoq" b.set("Voodoo") - a.now() shouldBe ((1, "Voodoo")) - b.now() shouldBe "Voodoo" - c.now() shouldBe "Voodooq" + a.nowGet() shouldBe ((1, "Voodoo")) + b.nowGet() shouldBe "Voodoo" + c.nowGet() shouldBe "Voodooq" a.set((3, "genau")) - a.now() shouldBe ((3, "genau")) - b.now() shouldBe "genau" - c.now() shouldBe "genauq" + a.nowGet() shouldBe ((3, "genau")) + b.nowGet() shouldBe "genau" + c.nowGet() shouldBe "genauq" b.set("Schwein") - a.now() shouldBe ((3, "Schwein")) - b.now() shouldBe "Schwein" - c.now() shouldBe "Schweinq" - }).unsafeRunSync() + a.nowGet() shouldBe ((3, "Schwein")) + b.nowGet() shouldBe "Schwein" + c.nowGet() shouldBe "Schweinq" + } it should "lens with monocle" in { case class Company(name: String, zipcode: Int) case class Employee(name: String, company: Company) - Owned(SyncIO { - val employee = Var(Employee("jules", Company("wules", 7))) - val zipcode = employee.lensO(GenLens[Employee](_.company.zipcode)) + val employee = Var(Employee("jules", Company("wules", 7))) + val zipcode = employee.lensO(GenLens[Employee](_.company.zipcode)) + + zipcode.unsafeSubscribe() - employee.now() shouldBe Employee("jules", Company("wules", 7)) - zipcode.now() shouldBe 7 + employee.nowGet() shouldBe Employee("jules", Company("wules", 7)) + zipcode.nowGet() shouldBe 7 - zipcode.set(8) - employee.now() shouldBe Employee("jules", Company("wules", 8)) - zipcode.now() shouldBe 8 + zipcode.set(8) + employee.nowGet() shouldBe Employee("jules", Company("wules", 8)) + zipcode.nowGet() shouldBe 8 - employee.set(Employee("gula", Company("bori", 6))) - employee.now() shouldBe Employee("gula", Company("bori", 6)) - zipcode.now() shouldBe 6 - }).unsafeRunSync() + employee.set(Employee("gula", Company("bori", 6))) + employee.nowGet() shouldBe Employee("gula", Company("bori", 6)) + zipcode.nowGet() shouldBe 6 } it should "optics operations" in { @@ -770,48 +770,44 @@ class ReactiveSpec extends AsyncFlatSpec with Matchers { case class EventA(i: Int) extends Event case class EventB(s: String) extends Event - Owned(SyncIO { - val eventVar: Var[Event] = Var[Event](EventA(0)) - val eventNotAVar: Var[Event] = Var[Event](EventB("")) - - val eventAVarOption: Option[Var[EventA]] = eventVar.prismO(GenPrism[Event, EventA]) - val eventAVarOption2: Option[Var[EventA]] = eventVar.subType[EventA] - val eventNotAVarOption: Option[Var[EventA]] = eventNotAVar.prismO(GenPrism[Event, EventA]) + val eventVar: Var[Event] = Var[Event](EventA(0)) + val eventNotVar: Var[Event] = Var[Event](EventB("")) - eventAVarOption.isDefined shouldBe true - eventAVarOption2.isDefined shouldBe true - eventNotAVarOption.isDefined shouldBe false + val eventAVar = eventVar.prismO(GenPrism[Event, EventA])(null) + val eventAVar2 = eventVar.subType[EventA](null) + val eventNotAVar = eventNotVar.prismO(GenPrism[Event, EventA])(null) - val eventAVar = eventAVarOption.get - val eventAVar2 = eventAVarOption2.get + eventAVar.unsafeSubscribe() + eventAVar2.unsafeSubscribe() + eventNotAVar.unsafeSubscribe() - eventVar.now() shouldBe EventA(0) - eventAVar.now() shouldBe EventA(0) - eventAVar2.now() shouldBe EventA(0) + eventVar.nowGet() shouldBe EventA(0) + eventAVar.nowGet() shouldBe EventA(0) + eventAVar2.nowGet() shouldBe EventA(0) + eventNotAVar.nowGet() shouldBe null - eventAVar.set(EventA(1)) + eventAVar.set(EventA(1)) - eventVar.now() shouldBe EventA(1) - eventAVar.now() shouldBe EventA(1) - eventAVar2.now() shouldBe EventA(1) + eventVar.nowGet() shouldBe EventA(1) + eventAVar.nowGet() shouldBe EventA(1) + eventAVar2.nowGet() shouldBe EventA(1) - eventVar.set(EventB("he")) + eventVar.set(EventB("he")) - eventVar.now() shouldBe EventB("he") - eventAVar.now() shouldBe EventA(1) - eventAVar2.now() shouldBe EventA(1) + eventVar.nowGet() shouldBe EventB("he") + eventAVar.nowGet() shouldBe EventA(1) + eventAVar2.nowGet() shouldBe EventA(1) - eventAVar.set(EventA(2)) + eventAVar.set(EventA(2)) - eventVar.now() shouldBe EventA(2) - eventAVar.now() shouldBe EventA(2) - eventAVar2.now() shouldBe EventA(2) + eventVar.nowGet() shouldBe EventA(2) + eventAVar.nowGet() shouldBe EventA(2) + eventAVar2.nowGet() shouldBe EventA(2) - eventVar.set(EventA(3)) + eventVar.set(EventA(3)) - eventVar.now() shouldBe EventA(3) - eventAVar.now() shouldBe EventA(3) - eventAVar2.now() shouldBe EventA(3) - }).unsafeRunSync() + eventVar.nowGet() shouldBe EventA(3) + eventAVar.nowGet() shouldBe EventA(3) + eventAVar2.nowGet() shouldBe EventA(3) } }