Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
cornerman committed Jan 9, 2023
1 parent 972b15b commit 18d6318
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 82 deletions.
12 changes: 6 additions & 6 deletions colibri/src/main/scala/colibri/Connectable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@ object Connectable
}

@inline implicit class ConnectableObservableValueOperations[A](val source: Connectable[Observable.Value[A]]) extends AnyVal {
def refCount: Observable.Value[A] = new Observable.Value[A] {
def refCount: Observable.Value[A] = new Observable.Value[A] {
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.connect())
}
@deprecated("Use unsafeHot instead", "0.7.8")
def hot: Observable.HotValue[A] = unsafeHot()
def unsafeHot(): Observable.HotValue[A] = new Observable.HotValue[A] {
def hot: Observable.HotValue[A] = unsafeHot()
def unsafeHot(): Observable.HotValue[A] = new Observable.HotValue[A] {
val cancelable = source.connect()
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink)
}
}

@inline implicit class ConnectableObservableMaybeValueOperations[A](val source: Connectable[Observable.MaybeValue[A]]) extends AnyVal {
def refCount: Observable.MaybeValue[A] = new Observable.MaybeValue[A] {
def refCount: Observable.MaybeValue[A] = new Observable.MaybeValue[A] {
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = Cancelable.composite(source.value.unsafeSubscribe(sink), source.connect())
}
@deprecated("Use unsafeHot instead", "0.7.8")
def hot: Observable.HotMaybeValue[A] = unsafeHot()
def unsafeHot(): Observable.HotMaybeValue[A] = new Observable.HotMaybeValue[A] {
def hot: Observable.HotMaybeValue[A] = unsafeHot()
def unsafeHot(): Observable.HotMaybeValue[A] = new Observable.HotMaybeValue[A] {
val cancelable = source.connect()
def now() = source.value.now()
def unsafeSubscribe(sink: Observer[A]): Cancelable = source.value.unsafeSubscribe(sink)
Expand Down
21 changes: 11 additions & 10 deletions colibri/src/main/scala/colibri/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,8 @@ object Observable {
def unsafeSubscribe(sink: Observer[B]): Cancelable = source.unsafeSubscribe(sink.contrascan(seed)(f))
}

def scanReduce(f: (A, A) => A): Observable[A] = scan[Option[A]](None)((previous, current) => Some(previous.fold(current)(f(_, current)))).flattenOption
def scanReduce(f: (A, A) => A): Observable[A] =
scan[Option[A]](None)((previous, current) => Some(previous.fold(current)(f(_, current)))).flattenOption

def switchScan[B](seed: B)(f: (B, A) => Observable[B]): Observable[B] = new Observable[B] {
override def unsafeSubscribe(sink: Observer[B]): Cancelable = {
Expand Down Expand Up @@ -875,17 +876,17 @@ object Observable {
}

def forMerge: ForSemantic[A] = new ForSemantic[A] {
def map[B](f: A => B): Observable[B] = source.map(f)
def map[B](f: A => B): Observable[B] = source.map(f)
def flatMap[B](f: A => Observable[B]): Observable[B] = source.mergeMap(f)
}

def forSwitch: ForSemantic[A] = new ForSemantic[A] {
def map[B](f: A => B): Observable[B] = source.map(f)
def map[B](f: A => B): Observable[B] = source.map(f)
def flatMap[B](f: A => Observable[B]): Observable[B] = source.switchMap(f)
}

def forConcat: ForSemantic[A] = new ForSemantic[A] {
def map[B](f: A => B): Observable[B] = source.map(f)
def map[B](f: A => B): Observable[B] = source.map(f)
def flatMap[B](f: A => Observable[B]): Observable[B] = source.concatMap(f)
}

Expand Down Expand Up @@ -1515,16 +1516,16 @@ object Observable {

@inline def syncAll: Observable[A] = Observable.fromEffect(syncAllSyncIO).flattenIterable

@inline def collectFirst[B](f: PartialFunction[A, B]): Observable[B] = Observable.fromEffect(collectFirstIO(f))
@inline def collectFirstIO[B](f: PartialFunction[A, B]): IO[B] = collectFirstF[IO, B](f)
@inline def collectFirst[B](f: PartialFunction[A, B]): Observable[B] = Observable.fromEffect(collectFirstIO(f))
@inline def collectFirstIO[B](f: PartialFunction[A, B]): IO[B] = collectFirstF[IO, B](f)
@inline def collectFirstF[F[_]: Async, B](f: PartialFunction[A, B]): F[B] = mapFilterFirstF(f.lift)

@inline def mapFilterFirst[B](f: A => Option[B]): Observable[B] = Observable.fromEffect(mapFilterFirstIO(f))
@inline def mapFilterFirstIO[B](f: A => Option[B]): IO[B] = mapFilterFirstF[IO, B](f)
@inline def mapFilterFirst[B](f: A => Option[B]): Observable[B] = Observable.fromEffect(mapFilterFirstIO(f))
@inline def mapFilterFirstIO[B](f: A => Option[B]): IO[B] = mapFilterFirstF[IO, B](f)
@inline def mapFilterFirstF[F[_]: Async, B](f: A => Option[B]): F[B] = mapFilter(f).headF

@inline def collectWhile[B](f: PartialFunction[A, B]): Observable[B] = mapFilterWhile(f.lift)
@inline def mapFilterWhile[B](f: A => Option[B]): Observable[B] = map(f).takeWhile(_.isDefined).flattenOption
@inline def mapFilterWhile[B](f: A => Option[B]): Observable[B] = map(f).takeWhile(_.isDefined).flattenOption

def headF[F[_]: Async]: F[A] = Async[F].async[A] { callback =>
Async[F].delay {
Expand Down Expand Up @@ -1701,7 +1702,7 @@ object Observable {

Cancelable.composite(
replacedSubscription,
subscription
subscription,
)
}
}
Expand Down
37 changes: 19 additions & 18 deletions colibri/src/test/scala/colibri/ObservableSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,8 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
it should "lastIO async complex" in {
val last = Observable(2)
.prependEffect(IO.cede *> IO.pure(1))
.concatMap(x => Observable(x,x).prependEffect(IO.cede *> IO.pure(0))).take(100)
.concatMap(x => Observable(x, x).prependEffect(IO.cede *> IO.pure(0)))
.take(100)
.dropSyncAll
.prepend(1000)
.switchMap(x => Observable(x).delayMillis(40))
Expand Down Expand Up @@ -1606,7 +1607,7 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
}

it should "mapFilterFirst" in {
val result = Observable(1, 2, 3).mapFilterFirstIO(v => Option.when(v == 2)(v))
val result = Observable(1, 2, 3).mapFilterFirstIO(v => Option.when(v == 2)(v))

val test = result.map { v =>
v shouldBe 2
Expand Down Expand Up @@ -1635,7 +1636,7 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
it should "forSemantic" in {
var received = List.empty[Int]
var errors = 0
val stream = for {
val stream = for {
a <- Observable(1).concat(Observable(2).delayMillis(1)).forSwitch
b <- Observable(10).concat(Observable(20).delayMillis(2)).forMerge
c <- Observable(100).concat(Observable(200).delayMillis(3)).forConcat
Expand Down Expand Up @@ -1666,9 +1667,9 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
it should "sampleWith" in {
var received = List.empty[Int]
var errors = 0
val trigger = Subject.publish[Unit]()
val subject = Subject.publish[Int]()
val stream = subject.sampleWith(trigger)
val trigger = Subject.publish[Unit]()
val subject = Subject.publish[Int]()
val stream = subject.sampleWith(trigger)

val cancelable = stream.unsafeSubscribe(
Observer.create[Int](
Expand Down Expand Up @@ -1722,9 +1723,9 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
it should "sampleWith initial" in {
var received = List.empty[Int]
var errors = 0
val trigger = Subject.publish[Unit]()
val subject = Subject.publish[Int]()
val stream = subject.prepend(-100).sampleWith(trigger.prepend(()))
val trigger = Subject.publish[Unit]()
val subject = Subject.publish[Int]()
val stream = subject.prepend(-100).sampleWith(trigger.prepend(()))

val cancelable = stream.unsafeSubscribe(
Observer.create[Int](
Expand Down Expand Up @@ -1757,13 +1758,13 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
}

it should "replaceWith" in {
var mappedA = List.empty[Unit]
var mappedB = List.empty[Unit]
var mappedA = List.empty[Unit]
var mappedB = List.empty[Unit]
var received = List.empty[Unit]
var errors = 0
val a = Subject.publish[Unit]()
val b = Subject.publish[Unit]()
val stream = (a: Observable[Unit]).tap(mappedA ::= _).replaceWith((b: Observable[Unit]).tap(mappedB ::= _))
val a = Subject.publish[Unit]()
val b = Subject.publish[Unit]()
val stream = (a: Observable[Unit]).tap(mappedA ::= _).replaceWith((b: Observable[Unit]).tap(mappedB ::= _))

val cancelable = stream.unsafeSubscribe(
Observer.create[Unit](
Expand Down Expand Up @@ -1804,11 +1805,11 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {
}

it should "replaceWith sync" in {
var mappedA = List.empty[Unit]
var mappedB = List.empty[Unit]
var mappedA = List.empty[Unit]
var mappedB = List.empty[Unit]
var received = List.empty[Unit]
var errors = 0
val stream = Observable.pure(()).tap(mappedA ::= _).replaceWith(Observable.pure(()).tap(mappedB ::= _))
val stream = Observable.pure(()).tap(mappedA ::= _).replaceWith(Observable.pure(()).tap(mappedB ::= _))

val cancelable = stream.unsafeSubscribe(
Observer.create[Unit](
Expand All @@ -1829,7 +1830,7 @@ class ObservableSpec extends AsyncFlatSpec with Matchers {

var receivedString = List.empty[String]
var receivedRecipe = List.empty[Recipe]
var errors = 0
var errors = 0

val hdlRecipe = Subject.behavior[Recipe](Recipe("hans", 12))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package colibri.reactive

trait RxPlatform {
def apply[R](f: LiveOwner ?=> R): Rx[R] = Rx.function(implicit owner => f)
def apply[R](f: LiveOwner ?=> R): Rx[R] = Rx.function(implicit owner => f)
}
7 changes: 4 additions & 3 deletions reactive/src/main/scala/colibri/reactive/Owner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,23 @@ package colibri.reactive

import colibri._

trait NowOwner {
trait NowOwner {
def unsafeNow[A](rx: Rx[A]): A
}
object NowOwner {
implicit object global extends NowOwner {
def unsafeNow[A](rx: Rx[A]): A = {
val cancelable = rx.unsafeSubscribe()
try(rx.nowIfSubscribed()) finally(cancelable.unsafeCancel())
try (rx.nowIfSubscribed())
finally (cancelable.unsafeCancel())
}
}
}

@annotation.implicitNotFound(
"No implicit LiveOwner is available here! Wrap inside `Rx { <code> }`, or provide an implicit `LiveOwner`.",
)
trait LiveOwner extends NowOwner {
trait LiveOwner extends NowOwner {
def cancelable: Cancelable

def liveObservable: Observable[Any]
Expand Down
43 changes: 22 additions & 21 deletions reactive/src/main/scala/colibri/reactive/Reactive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ trait Rx[+A] {
final def nowIfSubscribed(): A = nowOption().getOrElse(throw RxMissingNowException)

final def collect[B](f: PartialFunction[A, B])(seed: => B): Rx[B] = transformRx(_.collect(f))(seed)
final def map[B](f: A => B): Rx[B] = transformRxSync(_.map(f))
final def mapEither[B](f: A => Either[Throwable, B]): Rx[B] = transformRxSync(_.mapEither(f))
final def tap(f: A => Unit): Rx[A] = transformRxSync(_.tap(f))
final def tapLater(f: A => Unit): Rx[A] = transformRxSync(_.tail.tap(f))
final def map[B](f: A => B): Rx[B] = transformRxSync(_.map(f))
final def mapEither[B](f: A => Either[Throwable, B]): Rx[B] = transformRxSync(_.mapEither(f))
final def tap(f: A => Unit): Rx[A] = transformRxSync(_.tap(f))
final def tapLater(f: A => Unit): Rx[A] = transformRxSync(_.tail.tap(f))

final def mapSyncEffect[F[_]: RunSyncEffect, B](f: A => F[B]): Rx[B] = transformRxSync(_.mapEffect(f))
final def mapEffect[F[_]: RunEffect, B](f: A => F[B])(seed: => B): Rx[B] = transformRx(_.mapEffect(f))(seed)
Expand All @@ -47,7 +47,7 @@ trait Rx[+A] {
final def transformRx[B](f: Observable[A] => Observable[B])(seed: => B): Rx[B] = Rx.observable(f(observable))(seed)
final def transformRxSync[B](f: Observable[A] => Observable[B]): Rx[B] = Rx.observableSync(f(observable))

final def hot: SyncIO[Rx[A]] = SyncIO(unsafeHot())
final def hot: SyncIO[Rx[A]] = SyncIO(unsafeHot())

final def unsafeHot(): Rx[A] = {
val _ = unsafeSubscribe()
Expand Down Expand Up @@ -151,7 +151,7 @@ object RxWriter {

trait Var[A] extends RxWriter[A] with Rx[A] {
final def updateIfSubscribed(f: A => A): Unit = set(f(nowIfSubscribed()))
final def update(f: A => A): Unit = set(f(now()))
final def update(f: A => A): Unit = set(f(now()))

final def transformVar[A2](f: RxWriter[A] => RxWriter[A2])(g: Rx[A] => Rx[A2]): Var[A2] = Var.combine(g(this), f(this))
final def transformVarRx(g: Rx[A] => Rx[A]): Var[A] = Var.combine(g(this), this)
Expand All @@ -169,16 +169,16 @@ trait Var[A] extends RxWriter[A] with Rx[A] {
case _ => None
}(seed)

final def imapO[B](optic: Iso[A, B]): Var[B] = imap(optic.reverseGet(_))(optic.get(_))
final def lensO[B](optic: Lens[A, B]): Var[B] = lens(optic.get(_))((base, zoomed) => optic.replace(zoomed)(base))
final def imapO[B](optic: Iso[A, B]): Var[B] = imap(optic.reverseGet(_))(optic.get(_))
final def lensO[B](optic: Lens[A, B]): Var[B] = lens(optic.get(_))((base, zoomed) => optic.replace(zoomed)(base))
final def prismO[B](optic: Prism[A, B])(seed: => B): Var[B] =
prism(optic.reverseGet(_))(optic.getOption(_))(seed)
}

object Var {
def apply[A](seed: A): Var[A] = new VarSubject(seed)

def none[A](): Var[Option[A]] = new VarSubject(None)
def none[A](): Var[Option[A]] = new VarSubject(None)
def some[A](seed: A): Var[Option[A]] = new VarSubject(Some(seed))

def subjectSync[A](read: Subject[A]): Var[A] = combine(Rx.observableSync(read), RxWriter.observer(read))
Expand Down Expand Up @@ -250,38 +250,39 @@ object Var {
}

private final class RxConst[A](value: A) extends Rx[A] {
val observable: Observable[A] = Observable.pure(value)
def nowOption(): Option[A] = Some(value)
def now()(implicit owner: NowOwner): A = value
val observable: Observable[A] = Observable.pure(value)
def nowOption(): Option[A] = Some(value)
def now()(implicit owner: NowOwner): A = value
def apply()(implicit owner: LiveOwner): A = value
}

private final class RxObservableSync[A](inner: Observable[A]) extends Rx[A] {
private val state = new ReplayLatestSubject[A]()

val observable: Observable[A] = inner.dropUntilSyncLatest.distinctOnEquals.tapCancel(state.unsafeResetState).multicast(state).refCount.distinctOnEquals
val observable: Observable[A] =
inner.dropUntilSyncLatest.distinctOnEquals.tapCancel(state.unsafeResetState).multicast(state).refCount.distinctOnEquals

def nowOption() = state.now()
def now()(implicit owner: NowOwner) = owner.unsafeNow(this)
def nowOption() = state.now()
def now()(implicit owner: NowOwner) = owner.unsafeNow(this)
def apply()(implicit owner: LiveOwner): A = owner.unsafeLive(this)
}

private final class RxWriterObserver[A](val observer: Observer[A]) extends RxWriter[A]

private final class VarSubject[A](seed: A) extends Var[A] {
private final class VarSubject[A](seed: A) extends Var[A] {
private val state = new BehaviorSubject[A](seed)

val observable: Observable[A] = state.distinctOnEquals
val observer: Observer[A] = state

def nowOption() = Some(state.now())
def now()(implicit owner: NowOwner) = state.now()
def nowOption() = Some(state.now())
def now()(implicit owner: NowOwner) = state.now()
def apply()(implicit owner: LiveOwner): A = owner.unsafeLive(this)
}
private final class VarCombine[A](innerRead: Rx[A], innerWrite: RxWriter[A]) extends Var[A] {
def nowOption() = innerRead.nowOption()
def nowOption() = innerRead.nowOption()
def now()(implicit owner: NowOwner) = innerRead.now()
def apply()(implicit owner: LiveOwner): A = innerRead()
val observable = innerRead.observable
val observer = innerWrite.observer
val observable = innerRead.observable
val observer = innerWrite.observer
}
Loading

0 comments on commit 18d6318

Please sign in to comment.