Skip to content

Commit

Permalink
docs: more on rears
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Mar 1, 2024
1 parent 6e272f9 commit 4e8ac99
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 49 deletions.
33 changes: 30 additions & 3 deletions docs/content/docs/02-basics.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ trait PostsServiceComponent:
def all()(using Async): Either[String, LazyList[Post]]
```

As you can see, `Future`s are gone and the return type it's just the result of their intent (expressed with `Either` to return a meaningful message in case of failure). The fact they are _suspendable_ is expressed by means of the `Async` context, which is required to invoke those function.
As you can see, `Future`s are gone and the return type it's just the result of their intent (expressed with `Either` to return a meaningful message in case of failure). The fact they are _suspendable_ is expressed by means of the `Async` context, which is required to invoke those functions.

> Key inspiring principle (actually, "stolen" by Kotlin)
>
Expand All @@ -326,9 +326,8 @@ The other important key feature of the library is the support to **structured co
```scala
val f = Future {
// this can be interrupted
uninterruptible {
uninterruptible:
// this cannot be interrupted *immediately*
}
// this can be interrupted
}
```
Expand Down Expand Up @@ -396,4 +395,32 @@ how suspension is implemented

---

## Best practices

Some of the best practices of Kotlin Coroutines that can be applied to Scala's direct style Gears as well:

- **Do not use `Future`/`async` with an immediate `await`**: it makes no sense to define an asynchronous computation if we have to immediately wait for its result without doing anything else in the meantime:

```scala
// DON'T
val f = Future:
// some suspending operation...
service.postByTitle("Direct style guidelines")
val post = f.await

// DO
val post = service.get("Direct style guidelines")
```

In case a few async tasks need to be started the last one does not require to be run in a new `Future`/`async`, though can be beneficial for readability and maintainability reasons:

```scala
Future:
val post = Future { service.postByTitle("Direct style guidelines") }
val users = Future { service.user() } // not necessary here, but useful for readability
showPost(post.await, users.await)
```

- **Suspending functions await completion of their children**

## Conclusions
9 changes: 6 additions & 3 deletions docs/content/docs/03-channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ class MonadicAppController extends AppController:

### Scala Gears version

The interfaces of the Direct Style with Gears differ from the monadic one by their return type, which is a simpler `Either` data type, and by the fact they are **suspendable** functions, hence they require an Async context to be executed.
The interfaces of the Direct Style with Gears differ from the monadic one by their return type, which is a simpler `Either` data type, and by the fact they are **suspendable functions**, hence they require an Async context to be executed.
This is the first important difference: the `analyze` method, differently from the monadic version, doesn't return immediately the control; instead, it suspends the execution of the client until the result is available (though offering the opportunity to react to each update).
This obeys the principle of **explicit asynchrony**: if the client wants to perform this operation asynchronously, it has to opt in explicitly, either using a `Future` or any other asynchronous construct (depending on the library used).
Moreover, this interface is library-agnostic, meaning that it doesn't depend on any specific asynchronous library.

```scala
trait Analyzer:
Expand Down Expand Up @@ -362,7 +365,7 @@ override suspend fun analyze(

## Conclusions

> - `Channel`s are the basic communication and synchronization primitive for exchanging data between `Future`s/`Coroutines`.
> - `Channel`s are the basic communication and synchronization primitive for exchanging data between `Future`s/`Coroutines`.
> - Scala Gears support for `Terminable` channels or a review of the closing mechanism should be considered.
> - The `Flow` abstraction in Kotlin Coroutines is a powerful tool for handling cold streams of data, and it is a perfect fit for functions that need to return a stream of asynchronously computed values by request.
> - The `Flow` abstraction in Kotlin Coroutines is a powerful tool for handling cold streams of data, and it is a perfect fit for functions that need to return a stream of asynchronously computed values **by request**.
>
117 changes: 77 additions & 40 deletions docs/content/docs/04-rears.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,23 @@
bookToc: false
---

# An attempt to bring reactivity principles into gears
# On the reactivity

So far, we've explored the basics of asynchronous abstraction mechanisms provided by the direct style of the Scala Gears and Kotlin Coroutines frameworks.
The goal of this last example is to investigate, using a simple example, whether these two frameworks offer sufficient idiomatic abstractions to deal with event-based reactive systems.

The goal of this last example is to investigate, using a simple example, whether these two frameworks offer sufficient idiomatic abstractions to deal with **reactive-like systems**.

## Smart Hub System example
## Smart Hub example

{{< hint info >}}
**Idea**: in an IoT context, a multitude of sensors of different types, each replicated to ensure accuracy, transmit their measurements to a central hub, which in turn needs to react, in real-time, forwarding to the appropriate controller the data, possibly running some kind of transformation, enabling controllers to make decisions based on their respective logic.
**Idea**: in an IoT context, a multitude of sensors of different types, each replicated to ensure accuracy, transmit their measurements to a central hub, which in turn needs to react, in real-time, forwarding to the appropriate controller the data, possibly performing some kind of transformation.
{{< /hint >}}

### Scala Gears version

Before delving into the example, two abstractions of Gears, yet not covered, are introduced:

- `Task`s provide a way, not only to run asynchronous computation, essentially wrapping a `() => Future[T]`, but also to schedule it, possibly repeating it. Different scheduling strategies are available: `Every`, `ExponentialBackoff`, `FibonacciBackoff`, `RepeatUntilFailure`, `RepeatUntilSuccess`.
- This allows for implementing easily proactive computations, like a game loop.
- This allows for implementing easily proactive computations

{{< mermaid class="smaller" >}}
classDiagram
Expand All @@ -46,11 +45,11 @@ classDiagram

- if the body of a `Task` **does not** perform any suspending operations the `Async.blocking` blocks the current thread until the task is completed (either successfully or not);
- if the body of a `Task` **does** perform suspending operations then the `Async.blocking` **does not wait for the task to complete** and its context is left as soon as reaches its end.
- If we want to wait for the task completion, we need to use `Async.await` (or `awaitResult`)
- If we want to wait for the task completion, we need to use explicitly `Async.await` (or `awaitResult`)
- **Cons**: depending on the content of the block, the behavior is different! This is _error-prone_ and very difficult to debug with high-order functions:
{{< /hint >}}

- To avoid the _work-stealing behavior_ of channel consumers, a `ChannelMultiplexer` can be used. It is essentially a container of producing and consuming channels, which can be added and removed at runtime. Internally, it is implemented with a thread that continuously races the set of publishers and once it reads a value, it forwards it to each subscriber channel.
- To avoid the _work-stealing behavior_ of channel consumers, a `ChannelMultiplexer` can be used. It is essentially a container of `Readable` and `Sendable` channels, which can be added and removed at runtime. Internally, it is implemented with a thread that continuously races the set of publishers and once it reads a value, it forwards it to each subscriber channel.
- Order is guaranteed only per producer;
- Typically, the consumer creates a channel and adds it to the multiplexer, then starts reading from it, possibly using a scheduled task.
- if the consumer attaches to the channel after the producer has started, the values sent during this interval are lost, like _hot observables_ in Rx.
Expand Down Expand Up @@ -78,13 +77,13 @@ classDiagram

In the proposed strawman Scala Gears library, there are no other kinds of abstractions, nor a way to manipulate channels with functions inspired by Rx.

The attempt was to extend this framework adding first-class support for `Producer` and `Consumer`'s concepts and implementing some of the most common Rx operators, just as a proof of concept, completely leaving out performance concerns.
The attempt, described in the following, has been to extend this framework adding first-class support for `Producer` and `Consumer`'s concepts and implementing some of the most common Rx operators, completely leaving out performance concerns.

[Sources can be found in the `rears` submodule](https://github.com/tassiLuca/PPS-22-direct-style-experiments/tree/master/rears/src/main/scala/io/github/tassiLuca/rears).
[[Sources can be found in the `rears` submodule.]](https://github.com/tassiLuca/PPS-22-direct-style-experiments/tree/master/rears/src/main/scala/io/github/tassiLuca/rears)

- A `Producer` is a runnable entity, programmed with a `Task`, producing items on a channel. It exposes the `publishingChannel` method, which returns a `ReadableChannel` through which interested parties can read produced items.
- A `Consumer` is a runnable entity devoted to consuming data from a channel, exposed by the `listeningChannel` method which returns a `SendableChannel`.
- It can be made stateful by mixing it with the `State` trait, keeping track of its state, updated with the result of the reaction.
- A `Producer` is a runnable entity, programmed with a `Task`, producing items on a channel. It exposes the `publishingChannel` method, which returns a `ReadableChannel` through which interested consumers can read produced items.
- A `Consumer` is a runnable entity devoted to consuming data from a channel, exposed by the `listeningChannel` method which returns a `SendableChannel` to send items to.
- It can be made stateful by mixing it with the `State` trait, allowing it to keep track of its state, which is updated with the result of the reaction.

```scala
/** A producer, i.e. a runnable entity producing items on a channel. */
Expand Down Expand Up @@ -132,14 +131,18 @@ trait State[E, S](initialValue: S):
```

- The `Controller` object exposes methods wiring `Producer` and `Consumer`s altogether, possibly performing some kind of transformation on the `publisherChannel`.
- the `oneToOne` method just wires one single consumer to the `publisherChannel` given in input, possibly having it transformed with the provided transformation.
- the `oneToMany` allows many consumers to be wired to the `publisherChannel`, possibly having it transformed.
- to accomplish this, a `ChannelMultiplexer` is used, which is in charge of forwarding the items read from the transformed `publisherChannel` to all consumers' channels.

```scala
/** Simply, a function that, given in input a [[ReadableChannel]], performs some
* kind of transformation, returning, as a result, another [[ReadableChannel]]. */
type PipelineTransformation[T, R] = ReadableChannel[T] => ReadableChannel[R]

object Controller:
/** Creates a runnable [[Task]] forwarding the items read from the
* [[publisherChannel]] to the given [[consumer]], after having it
* transformed with the given [[transformation]].
/** Creates a runnable [[Task]] forwarding the items read from the [[publisherChannel]]
* to the given [[consumer]], after having it transformed with the given [[transformation]].
*/
def oneToOne[T, R](
publisherChannel: ReadableChannel[T],
Expand All @@ -151,9 +154,8 @@ object Controller:
consumer.listeningChannel.send(transformedChannel.read().tryable)
}.schedule(RepeatUntilFailure())

/** Creates a runnable [[Task]] forwarding the items read from the
* [[publisherChannel]] to all consumers' channels, after having it
* transformed with the given [[transformation]].
/** Creates a runnable [[Task]] forwarding the items read from the [[publisherChannel]]
* to all consumers' channels, after having it transformed with the given [[transformation]].
*/
def oneToMany[T, R](
publisherChannel: ReadableChannel[T],
Expand Down Expand Up @@ -186,10 +188,7 @@ The following `PipelineTransformation`s have been implemented (inspired by Rx):
* --------2--------------4------6-------8--------10->
* </pre>
*/
def filter(p: T => Boolean): ReadableChannel[T] = fromNew[T] { c =>
val value = r.read().toOption.get
if p(value) then c.send(value)
}
def filter(p: T => Boolean): ReadableChannel[T]
```

```scala
Expand All @@ -213,8 +212,7 @@ def debounce(timespan: Duration): ReadableChannel[T]
```

```scala
/** Groups the items emitted by a [[ReadableChannel]] according to the given
* [[keySelector]].
/** Groups the items emitted by a [[ReadableChannel]] according to the given [[keySelector]].
* @return key-value pairs, where the keys are the set of results obtained
* from applying the [[keySelector]] coupled to a new [[ReadableChannel]]
* where only items belonging to that grouping are emitted.
Expand Down Expand Up @@ -280,16 +278,17 @@ def buffer(n: Int, timespan: Duration = 5 seconds): ReadableChannel[List[T]]
def bufferWithin(timespan: Duration = 5 seconds): ReadableChannel[List[T]]
```

Going back to the example here is presented a schema summarizing the proposed design of the system:
Going back to the example here is presented a schema summarizing the flows of data and the transformations to apply to them. This is just a simple example used to test the proposed abstractions.

![system design of the example](../../res/img/rears.svg)

- two types of sensors: `TemperatureSensor` and `LuminositySensor`;
- For simplicity, two types of sensors are considered: `TemperatureSensor` and `LuminositySensor`;
- sensors send data to the smart hub `SensorSource` (e.g. via MQTT)
- `SensorSource` is a `Producer[SensorEvent]`, publishing received data on its `publishingChannel`:

```scala
trait SensorSource extends Producer[SensorEvent]

sealed trait SensorEvent(val name: String)
case class TemperatureEntry(sensorName: String, temperature: Temperature)
extends SensorEvent(sensorName)
Expand All @@ -298,7 +297,7 @@ Going back to the example here is presented a schema summarizing the proposed de
```

- three main controllers:
- `SensorHealthChecker` is a stateful consumer of generic `SensorEvent`s that checks the health of the sensors, sending alerts in case of malfunctioning:
- `SensorHealthChecker` is a stateful consumer of generic `SensorEvent`s that checks the health of the sensors, sending alerts in case of malfunctioning. Here the state is necessary to determine the health of the sensors, based on the last detection:

```scala
/** A [[state]]ful consumer of [[SensorEvent]] detecting possible
Expand All @@ -307,7 +306,8 @@ Going back to the example here is presented a schema summarizing the proposed de
trait SensorHealthChecker extends Consumer[Seq[E], Seq[E]] with State[Seq[E], Seq[E]]
```

- The `Thermostat` is a stateful consumer of temperature entries, taking care of controlling the heating system:
- The `Thermostat` is a stateful consumer of temperature entries, taking care of controlling the heating system. The fact the thermostat keeps track of the last average detection could be useful to a ReSTful API, for example.


```scala
/** A [[state]]ful consumer of [[TemperatureEntry]]s in charge
Expand All @@ -320,12 +320,37 @@ Going back to the example here is presented a schema summarizing the proposed de
val scheduler: ThermostatScheduler
```

- `LightingSystem` is a consumer of luminosity entries, taking care of controlling the lighting system;
- `LightingSystem` is a basic consumer of luminosity entries, taking care of controlling the lighting system;

```scala
trait LightingSystem extends Consumer[Seq[LuminosityEntry], Unit]
```

Each of these controllers reacts to the data received based on their logic and their actual state to accomplish a specific task.
For example:

- the sensor checker sends alerts whether, compared with the previous survey, it did not receive data from some sensor:

```scala
override protected def react(e: Try[Seq[E]])(using Async): Seq[E] = e match
case Success(current) =>
val noMoreActive = state.map(_.name).toSet -- current.map(_.name).toSet
if noMoreActive.nonEmpty then
sendAlert(s"[${currentTime}] Detected ${noMoreActive.mkString(", ")} no more active!")
current
case Failure(es) => sendAlert(es.getMessage); Seq()
```

- the thermostat computes the average temperature and, based on a scheduler, decides whether to turn on or off the heating system:

```scala
override protected def react(e: Try[Seq[TemperatureEntry]])(using Async): Option[Temperature] =
for
averageTemperature <- e.map { entries => entries.map(_.temperature).sum / entries.size }.toOption
_ = averageTemperature.evaluate() // here logic to decide whether turn on or off heating system
yield averageTemperature
```

- The `HubManager` takes care of grouping sensor data by their type and forwarding them to the appropriate manager, either `ThermostatManager` or `LightingManager`:

```scala
Expand Down Expand Up @@ -355,17 +380,29 @@ Going back to the example here is presented a schema summarizing the proposed de
).run
```

```scala
// LightingManager
def run(source: ReadableChannel[LuminosityEntry])(using Async, AsyncOperations): Unit =
lightingSystem.asRunnable.run
Controller.oneToOne(
publisherChannel = source,
consumer = lightingSystem,
transformation = r => r.bufferWithin(samplingWindow),
).run
```
[[Sources are available here](https://github.com/tassiLuca/PPS-22-direct-style-experiments/tree/master/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub)].

To produce a testable version of this example, a simulated source of sensor data has been created, backed to a GUI, through which the user can simulate the behavior of the sensors.
The example is runnable via:

```bash
./gradlew smart-hub-direct:run
```

Three panels should pop up, one for each sensor type, and a dashboard showing the state of the system.
Entering some value in the panels and pressing the "Send" button, after 5 seconds (the configured sampling window), the system should react to the data received, updating the dashboard with the new state.

{{< figure src="../../res/img/smart-hub.png" alt="Smart Hub application" width="90%" >}}

### Kotlin Coroutines version

Kotlin Coroutines offers two other abstractions to deal with asynchronous data streams, belonging to the `flow` "family", which are: `SharedFlow` and `StateFlow`.
Despite their names including `flow`, which we've seen are cold streams, they are actually **hot** (the terminology is a bit misleading...):

- `SharedFlow` is a hot flow that allows for multiple collectors to subscribe to it, enabling the broadcasting of values to multiple consumers or having multiple consumers be "attached" to the same stream of data.
- they can be configured to buffer a certain number of previously emitted values for new collectors so that they can catch up with the latest values -- the so-called, `replay` cache;
- `StateFlow` is an extension of the `SharedFlow`: it is a hot flow that maintains a single value representing a state, holding one value at a time. It operates as a conflated flow, meaning that when a new value is emitted, it replaces the previous value and is immediately sent to new collectors
- this type of flow is beneficial for maintaining a single source of truth for a state and automatically updating all collectors with the latest state.


## Conclusions
Loading

0 comments on commit 4e8ac99

Please sign in to comment.