Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make reactive state variables lazy #271

Merged
merged 43 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
344c5ae
make reactive state variables lazy
cornerman Dec 4, 2022
7eacf42
add Var.none/some/subjectSync
cornerman Jan 9, 2023
e1cdcf3
format
cornerman Jan 9, 2023
87b2dcb
update readme about reactive variables
cornerman Jan 9, 2023
bb812b5
remove Rx#scan without seed
cornerman Jan 10, 2023
457d1f3
fix tapLater
cornerman Jan 10, 2023
5802427
Revert "remove Rx#scan without seed"
cornerman Jan 10, 2023
788bda4
add RxEvent and VarEvent for (mostly) shared event streams
cornerman Jan 13, 2023
c77a13b
add tests
cornerman Jan 13, 2023
5ddf178
format
cornerman Jan 13, 2023
31f2a1c
update readme
cornerman Jan 13, 2023
7af27ca
wip
cornerman Jan 14, 2023
1a8b5bc
delete duplicate distinct
cornerman Jan 14, 2023
5933f80
wip
cornerman Jan 16, 2023
d12cb0b
wip
cornerman Jan 16, 2023
8cedd91
wip
cornerman Jan 16, 2023
bc4fe34
rename
cornerman Jan 16, 2023
2fb358c
format
cornerman Jan 16, 2023
2587261
fix
cornerman Jan 16, 2023
24eabdd
remove unnecessary type params for observer flatten methods
cornerman Jan 16, 2023
ef6fa70
fix stateful transform with Var.from (renamed from combine)
cornerman Jan 16, 2023
a1972ef
simplify
cornerman Jan 16, 2023
bedefe6
again
cornerman Jan 16, 2023
3ea7b6e
ok
cornerman Jan 16, 2023
8bbd80b
readme
cornerman Jan 16, 2023
0a91198
generic self types
cornerman Jan 17, 2023
558d815
rename
cornerman Jan 18, 2023
248fa06
format
cornerman Jan 18, 2023
6dd2d50
fix rxsource object
cornerman Jan 18, 2023
b514a8f
methods
cornerman Jan 18, 2023
b9b8746
format
cornerman Jan 20, 2023
0c981c2
Merge branch 'master' into lazy-rx
cornerman Aug 18, 2023
0e4fcfe
improvements
cornerman Aug 22, 2023
c8f667b
Update README.md
cornerman Sep 20, 2023
a891bf0
Merge branch 'master' into lazy-rx
cornerman Nov 6, 2023
809e29c
Update README.md
cornerman Nov 6, 2023
453eb88
add void
cornerman Nov 6, 2023
29351ce
undeprecate to again
cornerman Nov 6, 2023
4383662
fix Cancelable.Builder if it is cancelled while subscribing
cornerman Nov 8, 2023
e8454a4
own subscription for now() in lens()
cornerman Nov 8, 2023
de02047
fix
cornerman Nov 8, 2023
6021446
minor refactoring
cornerman Nov 8, 2023
0c385e8
Revert "minor refactoring"
cornerman Nov 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 62 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ This library includes:

Reactive core library with typeclasses:
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri" % "0.8.0"
```

```scala
import colibri._
```

Reactive variables with hot distinct observables (a bit like scala-rx):
Reactive variables with lazy, distinct, shared state variables (a bit like scala-rx, but lazy):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-reactive" % "0.8.0"
```

```scala
Expand All @@ -32,7 +32,7 @@ import colibri.reactive._

For jsdom-based operations in the browser (`EventObservable`, `Storage`):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-jsdom" % "0.8.0"
```

```scala
Expand All @@ -41,7 +41,7 @@ import colibri.jsdom._

For scala.rx support (only Scala 2.x):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-rx" % "0.8.0"
```

```scala
Expand All @@ -50,7 +50,7 @@ import colibri.ext.rx._

For airstream support:
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-airstream" % "0.8.0"
```

```scala
Expand All @@ -59,7 +59,7 @@ import colibri.ext.airstream._

For zio support:
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-zio" % "0.8.0"
```

```scala
Expand All @@ -68,7 +68,7 @@ import colibri.ext.zio._

For fs2 support (`Source` only):
```scala
libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.5.0"
libraryDependencies += "com.github.cornerman" %%% "colibri-fs2" % "0.8.0"
```

```scala
Expand Down Expand Up @@ -146,79 +146,99 @@ You can convert any `Source` into an `Observable` with `Observable.lift(source)`

## Reactive variables

The module `colibri-reactive` exposes reactive variables. This is hot, distinct observables that always have a value. These reactive variables are meant for managing state - opposed to managing events which is a perfect fit for lazy `Observable` in the core `colibri` library.
The module `colibri-reactive` exposes reactive variables. This is lazy, distinct shared state variables (internally using observables) that always have a value. These reactive variables are meant for managing state - opposed to managing events which is a perfect fit for lazy `Observable` in the core `colibri` library.

This module behaves very similar to scala-rx - just built on top of colibri Observables for seamless integration and powerful operators. It is not entirely glitch-free because invalid state can appear in operators like map or foreach, but you always have a consistent state in `now()` and it reduces the number of intermediate triggers or glitches. You can become completely glitch-free by converting back to observable and using `dropSyncGlitches` which will introduce an async boundary (micro-task).
This module behaves similar to scala-rx - though variables are not hot and it is built on top of colibri Observables for seamless integration and powerful operators.

The whole thing is not entirely glitch-free, as invalid state can appear in operators like map or foreach. But you always have a consistent state in `now()` and it reduces the number of intermediate triggers or glitches. You can become completely glitch-free by converting back to observable and using `dropSyncGlitches` which will introduce an async boundary (micro-task).

A state variable is of type `Var[A] extends Rx[A] with RxWriter[A]`.

The laziness of variables means that the current value is only tracked if anyone subscribes to the `Rx[A]`. So an Rx does not compute anything on its own. You can still always call `now()` on it - if it is currently not subscribed, it will lazily calculate the current value.

Example:

```scala

import colibri.reactive._

import colibri.owner.unsafeImplicits._ // dangerous. This never cancels subscriptions. See below!

val variable = Var(1)
val variable2 = Var("Test")

val rx = Rx {
s"${variable()} - ${variable2()}"
}

rx.foreach(println(_))
val cancelable = rx.unsafeForeach(println(_))

println(variable.now()) // 1
println(variable2.now()) // "Test"
println(rx.now()) // "1 - Test"

variable.set(2)
variable.set(2) // println("2 - Test")

println(variable.now()) // 2
println(variable2.now()) // "Test"
println(rx.now()) // "2 - Test"

variable2.set("Foo")
variable2.set("Foo") // println("2 - Foo")

println(variable.now()) // 2
println(variable2.now()) // "Foo"
println(rx.now()) // "2 - Foo"

```
cancelable.unsafeCancel()

If you want to work with reactive variables (hot observable), then someone need to cleanup the subscriptions. We call this concept an `Owner`. We use an *unsafe* owner in the above example. It actually never cleans up. It should only ever be used in your main method or for global state.
println(variable.now()) // 2
println(variable2.now()) // "Foo"
println(rx.now()) // "2 - Foo"

You can even work without ever using the unsafe owner or having to pass it implictly. You can use `Owned` blocks instead. Inside an `Owned` block, you will have to return a type that has a `SubscriptionOwner` instance. Example:
variable.set(3) // no println

```scala
// now calculates new value lazily
println(variable.now()) // 3
println(variable2.now()) // "Foo"
println(rx.now()) // "3 - Foo"
```

import colibri._
Apart from `Rx` which always has an initial value, there is `RxLater` (and `VarLater`) which will eventually have a value (both extend RxState which extends RxSource). It also meant for representing state just without an initial state. It is lazy, distinct and has shared execution just like `Rx`.

```
import colibri.reactive._
import cats.effect.SyncIO

sealed trait Modifier
object Modifier {
case class ReactiveModifier(rx: Rx[String]) extends Modifier
case class SubscriptionModifier(subscription: () => Cancelable) extends Modifier
case class CombineModifier(modifierA: Modifier, modifierB: Modifier) extends Modifier
val variable = VarLater[Int]()

implicit object subcriptionOwner extends SubscriptionOwner[Modifier] {
def own(owner: Modifier)(subscription: () => Cancelable): Modifier = CombineModifier(owner, SubscriptionModifier(subscription))
}
}
val stream1 = RxLater.empty
val stream2 = RxLater.future(Future.successful(1)).map(_ + 1)

val component: SyncIO[Modifier] = Owned {
val variable = Var(1)
val mapped = rx.map(_ + 1)
val cancelable = variable.unsafeForeach(println(_))
val cancelable1 = stream1.unsafeForeach(println(_))
val cancelable2 = stream2.unsafeForeach(println(_))

val rx = Rx {
"Hallo: ${mapped()}"
}
println(variable.toRx.now()) // None
println(stream1.toRx.now()) // None
println(stream2.toRx.now()) // Some(2)

ReactiveModifier(rx)
}
variable.set(13)

println(variable.toRx.now()) // Some(13)
```

There also exist `RxEvent` and `VarEvent`, which are event observables with shared execution. That is they behave like `Rx` and `Var` such that transformations are only applied once and not per subscription. But `RxEvent` and `VarEvent` are not distinct and have no current value. They should be used for event streams.

```
import colibri.reactive._

val variable = VarEvent[Int]()

val stream = RxEvent.empty

val mapped = RxEvent.merge(variable.tap(println(_)).map(_ + 1), stream)

val cancelable = mapped.unsafeForeach(println(_))
```

For example, [Outwatch](https://github.com/outwatch/outwatch) supports `Owned`:
[Outwatch](https://github.com/outwatch/outwatch) works perfectly with Rx (or RxLater, RxEvent which all extend RxSource) - just like Observable.

```scala

Expand All @@ -227,7 +247,7 @@ import outwatch.dsl._
import colibri.reactive._
import cats.effect.SyncIO

val component: SyncIO[VModifier] = Owned {
val component: VModifier = {
val variable = Var(1)
val mapped = rx.map(_ + 1)

Expand All @@ -241,9 +261,9 @@ val component: SyncIO[VModifier] = Owned {

### Memory management

Every subscription that is created inside of colibri-reactive methods is owned by an implicit `Owner`. For example `map` or `foreach` take an implicit `Owner`. As long as the `Owner` is cancelled when it is not needed anymore, all subscriptions will be cleaned up. The exception is the `Owner.unsafeGlobal` that never cleans up and is meant for global state.
The same principles as for Observables hold. Any cancelable that is returned from the API needs to be handled by the the caller. Best practice: use subscribe/foreach as seldomly as possible - only in selected spots or within a library.

If you are working with `Outwatch`, you can just use `Owned`-blocks returning `VModifier` and everything is handled automatically for you. No memory leaks.
If you are working with `Outwatch`, you can just use `Rx` without ever subscribing yourself. Then all memory management is handled for you automatically. No memory leaks.

## Information

Expand Down
3 changes: 2 additions & 1 deletion colibri/src/main/scala/colibri/Cancelable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ object Cancelable {

def unsafeAdd(subscription: () => Cancelable): Unit = if (buffer != null) {
val cancelable = subscription()
buffer.push(cancelable)
if (buffer == null) cancelable.unsafeCancel()
else buffer.push(cancelable)
()
}

Expand Down
5 changes: 2 additions & 3 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,6 @@ object Observable {
def unsafeSubscribe(sink2: Observer[A]): Cancelable = source.unsafeSubscribe(Observer.combine(sink, sink2))
}

@deprecated("Use via instead", "0.7.8")
def to(sink: Observer[A]): Observable[Unit] = via(sink).void

@deprecated("Use tap instead", "0.7.8")
Expand Down Expand Up @@ -512,9 +511,9 @@ object Observable {
@deprecated("Use scan0 instead", "0.7.8")
def scan0ToList: Observable[List[A]] = scan0(List.empty[A])((list, x) => x :: list)

def scan0[B](seed: B)(f: (B, A) => B): Observable[B] = scan(seed)(f).prepend(seed)
def scan0[B](seed: => B)(f: (B, A) => B): Observable[B] = scan(seed)(f).prependEval(seed)

def scan[B](seed: B)(f: (B, A) => B): Observable[B] = new Observable[B] {
def scan[B](seed: => B)(f: (B, A) => B): Observable[B] = new Observable[B] {
def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(sink.contrascan(seed)(f))
}

Expand Down
6 changes: 3 additions & 3 deletions colibri/src/main/scala/colibri/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ object Observer {
def unsafeOnError(error: Throwable): Unit = sink.unsafeOnError(error)
}

def contraflattenIterable[B]: Observer[Iterable[A]] = contramapIterable(identity)
def contraflattenEither[B]: Observer[Either[Throwable, A]] = contramapEither(identity)
def contraflattenOption[B]: Observer[Option[A]] = contramapFilter(identity)
def contraflattenIterable: Observer[Iterable[A]] = contramapIterable(identity)
def contraflattenEither: Observer[Either[Throwable, A]] = contramapEither(identity)
def contraflattenOption: Observer[Option[A]] = contramapFilter(identity)

// TODO return effect
def contrascan[B](seed: A)(f: (A, B) => A): Observer[B] = new Observer[B] {
Expand Down
8 changes: 8 additions & 0 deletions colibri/src/main/scala/colibri/Subject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ final class ReplayLatestSubject[A] extends Observer[A] with Observable.MaybeValu

@inline def now(): Option[A] = current

def unsafeResetState(): Unit = {
current = None
}

def unsafeOnNext(value: A): Unit = {
current = Some(value)
state.unsafeOnNext(value)
Expand All @@ -37,6 +41,10 @@ final class ReplayAllSubject[A] extends Observer[A] with Observable[A] {

@inline def now(): Seq[A] = current.toSeq

def unsafeResetState(): Unit = {
current.clear()
}

def unsafeOnNext(value: A): Unit = {
current += value
state.unsafeOnNext(value)
Expand Down

This file was deleted.

20 changes: 4 additions & 16 deletions reactive/src/main/scala-2/colibri/reactive/OwnerPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,14 @@ package colibri.reactive

import colibri.{Observable, Cancelable}

trait OwnerPlatform {
@annotation.compileTimeOnly(
"No implicit Owner is available here! Wrap inside `Owned { <code> }`, or provide an implicit `Owner`, or `import Owner.unsafeImplicits._` (dangerous).",
)
implicit object compileTimeMock extends Owner {
def unsafeSubscribe(): Cancelable = ???
def unsafeOwn(subscription: () => Cancelable): Unit = ???
def cancelable: Cancelable = ???
}
}

trait LiveOwnerPlatform {
@annotation.compileTimeOnly(
"No implicit LiveOwner is available here! Wrap inside `Rx { <code> }`, or provide an implicit `LiveOwner`.",
)
implicit object compileTimeMock extends LiveOwner {
def unsafeSubscribe(): Cancelable = ???
def unsafeOwn(subscription: () => Cancelable): Unit = ???
def cancelable: Cancelable = ???
def unsafeLive[A](rx: Rx[A]): A = ???
def liveObservable: Observable[Any] = ???
def cancelable: Cancelable = ???
def unsafeNow[A](rx: Rx[A]): A = ???
def unsafeLive[A](rx: Rx[A]): A = ???
def liveObservable: Observable[Any] = ???
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package colibri.reactive
import colibri.reactive.internal.MacroUtils

trait RxPlatform {
def apply[R](f: R)(implicit owner: Owner): Rx[R] = macro MacroUtils.rxImpl[R]
def apply[R](f: R): Rx[R] = macro MacroUtils.rxImpl[R]
}
Loading
Loading