Skip to content

Commit

Permalink
refactor(rears): allow stateful consumer to have a state as a transfo…
Browse files Browse the repository at this point in the history
…rmation of the received data
  • Loading branch information
tassiluca committed Feb 23, 2024
1 parent c3de9c9 commit 8188cdb
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 22 deletions.
18 changes: 9 additions & 9 deletions rears/src/main/scala/io/github/tassiLuca/rears/Boundary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ trait Publisher[E]:
def publishingChannel: ReadableChannel[E] = channel.asReadable

/** A consumer, i.e. a runnable entity devoted to consume data from a channel. */
trait Consumer[E]:
trait Consumer[E, S]:

/** The [[SendableChannel]] to send items to, where consumers listen for new items. */
val listeningChannel: SendableChannel[Try[E]] = UnboundedChannel()
Expand All @@ -28,19 +28,19 @@ trait Consumer[E]:
}.schedule(RepeatUntilFailure())

/** The suspendable reaction triggered upon a new read of an item succeeds. */
protected def react(e: Try[E])(using Async): Unit
protected def react(e: Try[E])(using Async): S

/** A mixin to make consumers stateful. */
trait State[E]:
consumer: Consumer[E] =>
/** A mixin to make consumer stateful. Its state is updated with the result of the [[react]]ion. */
trait State[E, S]:
consumer: Consumer[E, S] =>

private var _state: Option[E] = None
private var _state: Option[S] = None

def state: Option[E] = synchronized(_state)
/** @return the current state of the consumer, wrapped within an [[Option]]. */
def state: Option[S] = synchronized(_state)

override def asRunnable: Task[Unit] = Task {
listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach { e =>
react(e)
synchronized { _state = e.toOption }
synchronized { _state = Some(react(e)) }
}
}.schedule(RepeatUntilFailure())
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ object Controller:

def oneToMany[T, R](
publisherChannel: ReadableChannel[T],
consumers: Set[Consumer[R]],
consumers: Set[Consumer[R, ?]],
transformation: PipelineTransformation[T, R] = identity,
): Task[Unit] = Task:
val multiplexer = ChannelMultiplexer[R]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ControllerTest extends AnyFlatSpec with Matchers:
var consumerAValues = Seq[Try[Int]]()
var consumerBValues = Seq[Try[Int]]()
val producer = publisher
val consumers = Set(
val consumers = Set[Consumer[Int, ?]](
consumer(e => consumerAValues = consumerAValues :+ e),
consumer(e => consumerBValues = consumerBValues :+ e),
)
Expand All @@ -41,6 +41,6 @@ class ControllerTest extends AnyFlatSpec with Matchers:
i = i + 1
}.schedule(Every(1_000, maxRepetitions = items))

def consumer(action: Try[Item] => Unit): Consumer[Int] = new Consumer[Int]:
def consumer(action: Try[Item] => Unit): Consumer[Int, Unit] = new Consumer[Int, Unit]:
override val listeningChannel: SendableChannel[Try[Item]] = UnboundedChannel[Try[Int]]()
override def react(e: Try[Item])(using Async): Unit = action(e)
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ trait SensorHealthCheckerComponent[E <: SensorEvent]:
val sensorHealthChecker: SensorHealthChecker

/** A generic consumer of [[SensorEvent]] that detects */
trait SensorHealthChecker extends Consumer[Seq[E]] with State[Seq[E]]
trait SensorHealthChecker extends Consumer[Seq[E], Seq[E]] with State[Seq[E], Seq[E]]

object SensorHealthChecker:

def apply(): SensorHealthChecker = SensorHealthCheckerImpl()

private class SensorHealthCheckerImpl extends SensorHealthChecker:

override protected def react(e: Try[Seq[E]])(using Async): Unit = e match
override protected def react(e: Try[Seq[E]])(using Async): Seq[E] = e match
case Success(es) =>
if state.isDefined && es.toSet.map(_.name) != state.map(_.toSet.map(_.name)).get then
val alertMessage =
Expand All @@ -33,4 +33,5 @@ trait SensorHealthCheckerComponent[E <: SensorEvent]:
|""".stripMargin
context.alertSystem.notify(alertMessage)
context.dashboard.alertNotified(alertMessage)
case Failure(es) => context.alertSystem.notify(es.getMessage)
es
case Failure(es) => context.alertSystem.notify(es.getMessage); Seq()
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ trait ThermostatComponent:
val thermostat: Thermostat

/** The entity in charge of controlling the heater and conditioner actuators based on read [[TemperatureEntry]]s. */
trait Thermostat extends Consumer[Seq[TemperatureEntry]] with State[Seq[TemperatureEntry]]:
trait Thermostat
extends Consumer[Seq[TemperatureEntry], Option[Temperature]]
with State[Seq[TemperatureEntry], Option[Temperature]]:
val scheduler: ThermostatScheduler

object Thermostat:
Expand All @@ -22,16 +24,16 @@ trait ThermostatComponent:

private val hysteresis = 1.5

override protected def react(e: Try[Seq[TemperatureEntry]])(using Async): Unit =
for
entries <- e
average = entries.map(_.temperature).sum / entries.size
yield average.evaluate()
override protected def react(e: Try[Seq[TemperatureEntry]])(using Async): Option[Temperature] =
e.map { entries => entries.map(_.temperature).sum / entries.size }
.map { avg => avg.evaluate(); avg }
.toOption

extension (t: Temperature)
private def evaluate()(using Async): Unit =
val target = scheduler.currentTarget
context.dashboard.temperatureUpdated(t)
if t > scheduler.currentTarget + hysteresis then offHeater() else onHeater()
if t > target + hysteresis then offHeater() else if t < target then onHeater()

private def offHeater()(using Async): Unit =
context.heater.off(); context.dashboard.offHeaterNotified()
Expand Down

0 comments on commit 8188cdb

Please sign in to comment.