From 21dc9d908f8f2aa3285130553b4c310b901b2800 Mon Sep 17 00:00:00 2001 From: Luca Tassinari Date: Wed, 7 Feb 2024 17:45:41 +0100 Subject: [PATCH] refactor(rears-core): rename Observable -> Publisher and move test to describe AnyFunSpec --- .../io/github/tassiLuca/rears/Boundary.scala | 5 +- .../github/tassiLuca/rears/Controller.scala | 6 +- .../tassiLuca/rears/ControllerTest.scala | 4 +- .../rears/PipelineTransformationsTest.scala | 95 ++++++++++--------- .../application/ThermostatHubManager.scala | 8 +- .../smarthome/core/SensorSource.scala | 7 +- .../ThermostatHubManagerTest.scala | 34 +++---- 7 files changed, 80 insertions(+), 79 deletions(-) diff --git a/rears-core/src/main/scala/io/github/tassiLuca/rears/Boundary.scala b/rears-core/src/main/scala/io/github/tassiLuca/rears/Boundary.scala index b947ed97..8ccc9d29 100644 --- a/rears-core/src/main/scala/io/github/tassiLuca/rears/Boundary.scala +++ b/rears-core/src/main/scala/io/github/tassiLuca/rears/Boundary.scala @@ -1,12 +1,11 @@ package io.github.tassiLuca.rears +import gears.async.{Async, Channel, ReadableChannel, SendableChannel, Task, UnboundedChannel} import gears.async.TaskSchedule.RepeatUntilFailure -import gears.async.* import scala.util.Try -// TODO: maybe rename into `Publisher`. -trait Observable[E]: +trait Publisher[E]: def source: Async.Source[E] def asRunnable: Task[Unit] def publishingChannel(using Async): ReadableChannel[E] = source.toChannel diff --git a/rears-core/src/main/scala/io/github/tassiLuca/rears/Controller.scala b/rears-core/src/main/scala/io/github/tassiLuca/rears/Controller.scala index 6aefeee8..ad31f8c3 100644 --- a/rears-core/src/main/scala/io/github/tassiLuca/rears/Controller.scala +++ b/rears-core/src/main/scala/io/github/tassiLuca/rears/Controller.scala @@ -1,17 +1,17 @@ package io.github.tassiLuca.rears -import gears.async.{ChannelMultiplexer, Future, ReadableChannel, Task} +import gears.async.{Async, ChannelMultiplexer, ReadableChannel, Task} object Controller: def oneToMany[T, R]( - publisher: Observable[T], + publisherChannel: ReadableChannel[T], consumers: Set[Consumer[R]], transformation: PipelineTransformation[T, R] = identity, ): Task[Unit] = Task: val multiplexer = ChannelMultiplexer[R]() consumers.foreach(c => multiplexer.addSubscriber(c.listeningChannel)) - multiplexer.addPublisher(transformation(publisher.publishingChannel)) + multiplexer.addPublisher(transformation(publisherChannel)) // blocking call: the virtual thread on top of which this task is executed needs to block // to continue publishing publisher's events towards the consumer by means of the multiplexer. multiplexer.run() diff --git a/rears-core/src/test/scala/io/github/tassiLuca/rears/ControllerTest.scala b/rears-core/src/test/scala/io/github/tassiLuca/rears/ControllerTest.scala index bdc5dd5a..85c9cdf8 100644 --- a/rears-core/src/test/scala/io/github/tassiLuca/rears/ControllerTest.scala +++ b/rears-core/src/test/scala/io/github/tassiLuca/rears/ControllerTest.scala @@ -23,7 +23,7 @@ class ControllerTest extends AnyFlatSpec with Matchers: consumer(e => consumerBValues = consumerBValues :+ e), ) Async.blocking: - Controller.oneToMany(producer, consumers, identity).run + Controller.oneToMany(producer.source.toChannel, consumers, identity).run consumers.foreach(_.asRunnable.run) producer.asRunnable.run.await // TODO: improve with an extension method that wait for a certain amount of time, @@ -37,7 +37,7 @@ class ControllerTest extends AnyFlatSpec with Matchers: // consumerBValues.size shouldBe items } - def publisher: Observable[Item] = new Observable[Int]: + def publisher: Publisher[Item] = new Publisher[Int]: private var i = 0 private val boundarySource = BoundarySource[Int]() override def source: Async.Source[Item] = boundarySource diff --git a/rears-core/src/test/scala/io/github/tassiLuca/rears/PipelineTransformationsTest.scala b/rears-core/src/test/scala/io/github/tassiLuca/rears/PipelineTransformationsTest.scala index 6510a073..409a26b8 100644 --- a/rears-core/src/test/scala/io/github/tassiLuca/rears/PipelineTransformationsTest.scala +++ b/rears-core/src/test/scala/io/github/tassiLuca/rears/PipelineTransformationsTest.scala @@ -4,6 +4,7 @@ import gears.async.TaskSchedule.Every import gears.async.default.given import gears.async.{Async, Future, Listener, ReadableChannel, Task, TaskSchedule, UnboundedChannel} import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.funspec.AnyFunSpec import org.scalatest.matchers.should.Matchers import scala.concurrent.duration.Duration @@ -11,60 +12,68 @@ import concurrent.duration.DurationInt import scala.language.postfixOps import scala.util.Random -class PipelineTransformationsTest extends AnyFlatSpec with Matchers { +class PipelineTransformationsTest extends AnyFunSpec with Matchers { - "Filtering a channel" should "return a new channel with only the elements passing the predicate" in { - Async.blocking: - val filtered = producer.filter(_ % 2 == 0) - for i <- 2 to 10 by 2 do filtered.read() shouldBe Right(i) + describe("Filtering a channel") { + it("return a new channel with only the elements passing the predicate") { + Async.blocking: + val filtered = producer.filter(_ % 2 == 0) + for i <- 2 to 10 by 2 do filtered.read() shouldBe Right(i) + } } - "Debouncing a channel" should "emit the first item immediately" in { - val span = 1.seconds - Async.blocking: - val debounced = infiniteProducer().debounce(span) - val before = System.currentTimeMillis() - debounced.read() - val now = System.currentTimeMillis() - now - before should be < span.toMillis - } - - "Debouncing a channel" should "only emit an item if the given timespan has passed without emitting another value" in { - val span = 2.seconds - Async.blocking: - val debounced = infiniteProducer().debounce(span) - debounced.read() - for _ <- 1 to 4 do + describe("Debouncing a channel") { + it("return a new channel whose first item is emitted immediately") { + val span = 1.seconds + Async.blocking: + val debounced = infiniteProducer().debounce(span) val before = System.currentTimeMillis() debounced.read() val now = System.currentTimeMillis() - now - before should be > span.toMillis - } + now - before should be < span.toMillis + } - "Buffering a channel" should "periodically gather items emitted by the channel into bundles and emit them" in { - val step = 2 - Async.blocking: - val buffered = producer.buffer(step) - for i <- 1 to 10 by step do buffered.read() shouldBe Right(List.range(i, i + step)) + it("return a new channel that emit an item if the given timespan has passed without emitting anything") { + val span = 2.seconds + Async.blocking: + val debounced = infiniteProducer().debounce(span) + debounced.read() + for _ <- 1 to 4 do + val before = System.currentTimeMillis() + debounced.read() + val now = System.currentTimeMillis() + now - before should be > span.toMillis + } } - "Buffering a channel with size not multiple of elements" should "return fewer element" in { - val step = 3 - Async.blocking: - val buffered = producer.buffer(step) - for i <- 1 to 9 by step do buffered.read() shouldBe Right(List.range(i, i + step)) - buffered.read() shouldBe Right(List(10)) + describe("Buffering a channel") { + it("return a new channel that periodically gather items into bundles and emit them") { + val step = 2 + Async.blocking: + val buffered = producer.buffer(step) + for i <- 1 to 10 by step do buffered.read() shouldBe Right(List.range(i, i + step)) + } + + it("group fewer items if the nth element is not read within the given timespan") { + val step = 3 + Async.blocking: + val buffered = producer.buffer(n = step, timespan = 2.seconds) + for i <- 1 to 9 by step do buffered.read() shouldBe Right(List.range(i, i + step)) + buffered.read() shouldBe Right(List(10)) + } } - "Grouping channel elements on a element selector" should "return a Map with the correct group of channel" in { - Async.blocking: - val grouped = producer.groupBy(_ % 2 == 0) - for _ <- 0 until 2 do - val group = grouped.read() - group.isRight shouldBe true - group.toOption.get match - case (false, c) => for i <- 1 to 10 by 2 do c.read() shouldBe Right(i) - case (true, c) => for i <- 2 to 10 by 2 do c.read() shouldBe Right(i) + describe("Grouping a channel on an element selector") { + it("return a Map with the correct group of channel") { + Async.blocking: + val grouped = producer.groupBy(_ % 2 == 0) + for _ <- 0 until 2 do + val group = grouped.read() + group.isRight shouldBe true + group.toOption.get match + case (false, c) => for i <- 1 to 10 by 2 do c.read() shouldBe Right(i) + case (true, c) => for i <- 2 to 10 by 2 do c.read() shouldBe Right(i) + } } def producer(using Async): ReadableChannel[Int] = diff --git a/smart-home/src/main/scala/io/github/tassiLuca/smarthome/application/ThermostatHubManager.scala b/smart-home/src/main/scala/io/github/tassiLuca/smarthome/application/ThermostatHubManager.scala index 7b07dabb..b59f527d 100644 --- a/smart-home/src/main/scala/io/github/tassiLuca/smarthome/application/ThermostatHubManager.scala +++ b/smart-home/src/main/scala/io/github/tassiLuca/smarthome/application/ThermostatHubManager.scala @@ -1,7 +1,7 @@ package io.github.tassiLuca.smarthome.application -import gears.async.{Async, AsyncOperations, ReadableChannel, Task} -import io.github.tassiLuca.rears.{Controller, Observable} +import gears.async.{Async, AsyncOperations, ReadableChannel} +import io.github.tassiLuca.rears.Controller import io.github.tassiLuca.smarthome.core.{ HACControllerComponent, TemperatureEntry, @@ -16,8 +16,8 @@ trait ThermostatHubManager extends ThermostatComponent with ThermostatSchedulerC def run(source: ReadableChannel[TemperatureEntry])(using Async, AsyncOperations): Unit = thermostat.asRunnable.run Controller - .oneToManyR( - source = source, + .oneToMany( + source, consumers = Set(thermostat), transformation = identity, ) diff --git a/smart-home/src/main/scala/io/github/tassiLuca/smarthome/core/SensorSource.scala b/smart-home/src/main/scala/io/github/tassiLuca/smarthome/core/SensorSource.scala index bc503974..dd942c7a 100644 --- a/smart-home/src/main/scala/io/github/tassiLuca/smarthome/core/SensorSource.scala +++ b/smart-home/src/main/scala/io/github/tassiLuca/smarthome/core/SensorSource.scala @@ -1,12 +1,11 @@ package io.github.tassiLuca.smarthome.core -import io.github.tassiLuca.rears.Observable +import io.github.tassiLuca.rears.Publisher /** A generic source of [[SensorEvent]]. */ -trait SensorSource: - val source: Observable[SensorEvent] +trait SensorSource extends Publisher[SensorEvent] -/** A detection performe by a sensing unit. */ +/** A detection performed by a sensing unit. */ sealed trait SensorEvent(val name: String) case class TemperatureEntry(temperature: Double) extends SensorEvent("temperature") case class LuminosityEntry(luminosity: Double) extends SensorEvent("luminosity") diff --git a/smart-home/src/test/scala/io/github/tassiLuca/smarthome/application/ThermostatHubManagerTest.scala b/smart-home/src/test/scala/io/github/tassiLuca/smarthome/application/ThermostatHubManagerTest.scala index 3525a841..c34afc96 100644 --- a/smart-home/src/test/scala/io/github/tassiLuca/smarthome/application/ThermostatHubManagerTest.scala +++ b/smart-home/src/test/scala/io/github/tassiLuca/smarthome/application/ThermostatHubManagerTest.scala @@ -1,12 +1,9 @@ package io.github.tassiLuca.smarthome.application -import gears.async.default.given -import io.github.tassiLuca.rears.filter -import io.github.tassiLuca.rears.groupBy import gears.async.TaskSchedule.Every -import gears.async.{Async, ReadableChannel, Task} -import io.github.tassiLuca.rears.{BoundarySource, Observable} -import io.github.tassiLuca.rears.toChannel +import gears.async.default.given +import gears.async.{Async, Future, ReadableChannel, Task} +import io.github.tassiLuca.rears.{BoundarySource, groupBy} import io.github.tassiLuca.smarthome.core.{SensorEvent, SensorSource, TemperatureEntry} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -15,7 +12,7 @@ import scala.util.Random class ThermostatHubManagerTest extends AnyFlatSpec with Matchers { - val thermostatHubManager = new ThermostatHubManager: + val thermostatHubManager: ThermostatHubManager = new ThermostatHubManager: override val haccController: HACController = new HACController: override def onHeater(): Unit = ??? override def offHeather(): Unit = ??? @@ -23,21 +20,18 @@ class ThermostatHubManagerTest extends AnyFlatSpec with Matchers { override def offAirConditioner(): Unit = ??? val sensorSource: SensorSource = new SensorSource: - override val source: Observable[SensorEvent] = new Observable[SensorEvent]: - private val boundarySource = BoundarySource[SensorEvent]() - override def source: Async.Source[SensorEvent] = boundarySource - override def asRunnable: Task[Unit] = Task { - boundarySource.notifyListeners(TemperatureEntry(Random.nextDouble())) - }.schedule(Every(1_000)) + private val boundarySource = BoundarySource[SensorEvent]() + override def source: Async.Source[SensorEvent] = boundarySource + override def asRunnable: Task[Unit] = Task { + boundarySource.notifyListeners(TemperatureEntry(Random.nextDouble())) + }.schedule(Every(1_000)) "The thermostat hub manager" should "receive event from the source" in { Async.blocking: - val s = sensorSource.source.source.toChannel - val t = sensorSource.source.asRunnable.run - s.groupBy(_.name).read() match - case Right(("temperature", c)) => thermostatHubManager.run(c.asInstanceOf[ReadableChannel[TemperatureEntry]]) - case _ => println("Boh") - // t.await + Future: + sensorSource.publishingChannel.groupBy(_.name).read() match + case Right(("temperature", c)) => thermostatHubManager.run(c.asInstanceOf[ReadableChannel[TemperatureEntry]]) + case _ => println("Boh") + sensorSource.asRunnable.run } - }