Skip to content

Commit

Permalink
refactor(rears-core): rename Observable -> Publisher and move test to…
Browse files Browse the repository at this point in the history
… describe AnyFunSpec
  • Loading branch information
tassiluca committed Feb 7, 2024
1 parent 4626bf8 commit 21dc9d9
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package io.github.tassiLuca.rears

import gears.async.{Async, Channel, ReadableChannel, SendableChannel, Task, UnboundedChannel}
import gears.async.TaskSchedule.RepeatUntilFailure
import gears.async.*

import scala.util.Try

// TODO: maybe rename into `Publisher`.
trait Observable[E]:
trait Publisher[E]:
def source: Async.Source[E]
def asRunnable: Task[Unit]
def publishingChannel(using Async): ReadableChannel[E] = source.toChannel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package io.github.tassiLuca.rears

import gears.async.{ChannelMultiplexer, Future, ReadableChannel, Task}
import gears.async.{Async, ChannelMultiplexer, ReadableChannel, Task}

object Controller:

def oneToMany[T, R](
publisher: Observable[T],
publisherChannel: ReadableChannel[T],
consumers: Set[Consumer[R]],
transformation: PipelineTransformation[T, R] = identity,
): Task[Unit] = Task:
val multiplexer = ChannelMultiplexer[R]()
consumers.foreach(c => multiplexer.addSubscriber(c.listeningChannel))
multiplexer.addPublisher(transformation(publisher.publishingChannel))
multiplexer.addPublisher(transformation(publisherChannel))
// blocking call: the virtual thread on top of which this task is executed needs to block
// to continue publishing publisher's events towards the consumer by means of the multiplexer.
multiplexer.run()
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ControllerTest extends AnyFlatSpec with Matchers:
consumer(e => consumerBValues = consumerBValues :+ e),
)
Async.blocking:
Controller.oneToMany(producer, consumers, identity).run
Controller.oneToMany(producer.source.toChannel, consumers, identity).run
consumers.foreach(_.asRunnable.run)
producer.asRunnable.run.await
// TODO: improve with an extension method that wait for a certain amount of time,
Expand All @@ -37,7 +37,7 @@ class ControllerTest extends AnyFlatSpec with Matchers:
// consumerBValues.size shouldBe items
}

def publisher: Observable[Item] = new Observable[Int]:
def publisher: Publisher[Item] = new Publisher[Int]:
private var i = 0
private val boundarySource = BoundarySource[Int]()
override def source: Async.Source[Item] = boundarySource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,76 @@ import gears.async.TaskSchedule.Every
import gears.async.default.given
import gears.async.{Async, Future, Listener, ReadableChannel, Task, TaskSchedule, UnboundedChannel}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers

import scala.concurrent.duration.Duration
import concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.util.Random

class PipelineTransformationsTest extends AnyFlatSpec with Matchers {
class PipelineTransformationsTest extends AnyFunSpec with Matchers {

"Filtering a channel" should "return a new channel with only the elements passing the predicate" in {
Async.blocking:
val filtered = producer.filter(_ % 2 == 0)
for i <- 2 to 10 by 2 do filtered.read() shouldBe Right(i)
describe("Filtering a channel") {
it("return a new channel with only the elements passing the predicate") {
Async.blocking:
val filtered = producer.filter(_ % 2 == 0)
for i <- 2 to 10 by 2 do filtered.read() shouldBe Right(i)
}
}

"Debouncing a channel" should "emit the first item immediately" in {
val span = 1.seconds
Async.blocking:
val debounced = infiniteProducer().debounce(span)
val before = System.currentTimeMillis()
debounced.read()
val now = System.currentTimeMillis()
now - before should be < span.toMillis
}

"Debouncing a channel" should "only emit an item if the given timespan has passed without emitting another value" in {
val span = 2.seconds
Async.blocking:
val debounced = infiniteProducer().debounce(span)
debounced.read()
for _ <- 1 to 4 do
describe("Debouncing a channel") {
it("return a new channel whose first item is emitted immediately") {
val span = 1.seconds
Async.blocking:
val debounced = infiniteProducer().debounce(span)
val before = System.currentTimeMillis()
debounced.read()
val now = System.currentTimeMillis()
now - before should be > span.toMillis
}
now - before should be < span.toMillis
}

"Buffering a channel" should "periodically gather items emitted by the channel into bundles and emit them" in {
val step = 2
Async.blocking:
val buffered = producer.buffer(step)
for i <- 1 to 10 by step do buffered.read() shouldBe Right(List.range(i, i + step))
it("return a new channel that emit an item if the given timespan has passed without emitting anything") {
val span = 2.seconds
Async.blocking:
val debounced = infiniteProducer().debounce(span)
debounced.read()
for _ <- 1 to 4 do
val before = System.currentTimeMillis()
debounced.read()
val now = System.currentTimeMillis()
now - before should be > span.toMillis
}
}

"Buffering a channel with size not multiple of elements" should "return fewer element" in {
val step = 3
Async.blocking:
val buffered = producer.buffer(step)
for i <- 1 to 9 by step do buffered.read() shouldBe Right(List.range(i, i + step))
buffered.read() shouldBe Right(List(10))
describe("Buffering a channel") {
it("return a new channel that periodically gather items into bundles and emit them") {
val step = 2
Async.blocking:
val buffered = producer.buffer(step)
for i <- 1 to 10 by step do buffered.read() shouldBe Right(List.range(i, i + step))
}

it("group fewer items if the nth element is not read within the given timespan") {
val step = 3
Async.blocking:
val buffered = producer.buffer(n = step, timespan = 2.seconds)
for i <- 1 to 9 by step do buffered.read() shouldBe Right(List.range(i, i + step))
buffered.read() shouldBe Right(List(10))
}
}

"Grouping channel elements on a element selector" should "return a Map with the correct group of channel" in {
Async.blocking:
val grouped = producer.groupBy(_ % 2 == 0)
for _ <- 0 until 2 do
val group = grouped.read()
group.isRight shouldBe true
group.toOption.get match
case (false, c) => for i <- 1 to 10 by 2 do c.read() shouldBe Right(i)
case (true, c) => for i <- 2 to 10 by 2 do c.read() shouldBe Right(i)
describe("Grouping a channel on an element selector") {
it("return a Map with the correct group of channel") {
Async.blocking:
val grouped = producer.groupBy(_ % 2 == 0)
for _ <- 0 until 2 do
val group = grouped.read()
group.isRight shouldBe true
group.toOption.get match
case (false, c) => for i <- 1 to 10 by 2 do c.read() shouldBe Right(i)
case (true, c) => for i <- 2 to 10 by 2 do c.read() shouldBe Right(i)
}
}

def producer(using Async): ReadableChannel[Int] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.github.tassiLuca.smarthome.application

import gears.async.{Async, AsyncOperations, ReadableChannel, Task}
import io.github.tassiLuca.rears.{Controller, Observable}
import gears.async.{Async, AsyncOperations, ReadableChannel}
import io.github.tassiLuca.rears.Controller
import io.github.tassiLuca.smarthome.core.{
HACControllerComponent,
TemperatureEntry,
Expand All @@ -16,8 +16,8 @@ trait ThermostatHubManager extends ThermostatComponent with ThermostatSchedulerC
def run(source: ReadableChannel[TemperatureEntry])(using Async, AsyncOperations): Unit =
thermostat.asRunnable.run
Controller
.oneToManyR(
source = source,
.oneToMany(
source,
consumers = Set(thermostat),
transformation = identity,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package io.github.tassiLuca.smarthome.core

import io.github.tassiLuca.rears.Observable
import io.github.tassiLuca.rears.Publisher

/** A generic source of [[SensorEvent]]. */
trait SensorSource:
val source: Observable[SensorEvent]
trait SensorSource extends Publisher[SensorEvent]

/** A detection performe by a sensing unit. */
/** A detection performed by a sensing unit. */
sealed trait SensorEvent(val name: String)
case class TemperatureEntry(temperature: Double) extends SensorEvent("temperature")
case class LuminosityEntry(luminosity: Double) extends SensorEvent("luminosity")
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package io.github.tassiLuca.smarthome.application

import gears.async.default.given
import io.github.tassiLuca.rears.filter
import io.github.tassiLuca.rears.groupBy
import gears.async.TaskSchedule.Every
import gears.async.{Async, ReadableChannel, Task}
import io.github.tassiLuca.rears.{BoundarySource, Observable}
import io.github.tassiLuca.rears.toChannel
import gears.async.default.given
import gears.async.{Async, Future, ReadableChannel, Task}
import io.github.tassiLuca.rears.{BoundarySource, groupBy}
import io.github.tassiLuca.smarthome.core.{SensorEvent, SensorSource, TemperatureEntry}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -15,29 +12,26 @@ import scala.util.Random

class ThermostatHubManagerTest extends AnyFlatSpec with Matchers {

val thermostatHubManager = new ThermostatHubManager:
val thermostatHubManager: ThermostatHubManager = new ThermostatHubManager:
override val haccController: HACController = new HACController:
override def onHeater(): Unit = ???
override def offHeather(): Unit = ???
override def onAirConditioner(): Unit = ???
override def offAirConditioner(): Unit = ???

val sensorSource: SensorSource = new SensorSource:
override val source: Observable[SensorEvent] = new Observable[SensorEvent]:
private val boundarySource = BoundarySource[SensorEvent]()
override def source: Async.Source[SensorEvent] = boundarySource
override def asRunnable: Task[Unit] = Task {
boundarySource.notifyListeners(TemperatureEntry(Random.nextDouble()))
}.schedule(Every(1_000))
private val boundarySource = BoundarySource[SensorEvent]()
override def source: Async.Source[SensorEvent] = boundarySource
override def asRunnable: Task[Unit] = Task {
boundarySource.notifyListeners(TemperatureEntry(Random.nextDouble()))
}.schedule(Every(1_000))

"The thermostat hub manager" should "receive event from the source" in {
Async.blocking:
val s = sensorSource.source.source.toChannel
val t = sensorSource.source.asRunnable.run
s.groupBy(_.name).read() match
case Right(("temperature", c)) => thermostatHubManager.run(c.asInstanceOf[ReadableChannel[TemperatureEntry]])
case _ => println("Boh")
// t.await
Future:
sensorSource.publishingChannel.groupBy(_.name).read() match
case Right(("temperature", c)) => thermostatHubManager.run(c.asInstanceOf[ReadableChannel[TemperatureEntry]])
case _ => println("Boh")
sensorSource.asRunnable.run
}

}

0 comments on commit 21dc9d9

Please sign in to comment.