diff --git a/commons/src/main/scala/io/github/tassiLuca/UseTimerSource.scala b/commons/src/main/scala/io/github/tassiLuca/UseTimerSource.scala
deleted file mode 100644
index 27789389..00000000
--- a/commons/src/main/scala/io/github/tassiLuca/UseTimerSource.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package io.github.tassiLuca
-
-import gears.async.TaskSchedule.RepeatUntilFailure
-import gears.async.default.given
-import gears.async.*
-
-import java.lang.Thread.sleep
-import java.time.LocalTime
-import scala.concurrent.duration.DurationInt
-
-object UseTimerSource extends App:
-
- def timerConsumer(c: ReadableChannel[Any]): Task[Unit] = Task {
- println(s"[CONSUMER] Waiting for a new item...")
- val item = c.read() // blocking!
- println(s"[CONSUMER - ${LocalTime.now()}] received $item")
- }.schedule(RepeatUntilFailure())
-
- def timerProducer(c: SendableChannel[Any])(using async: Async): Task[Unit] =
- val timer = Timer(5.seconds)
- Future { timer.run() }
- Task(c.send(timer.src.awaitResult)).schedule(RepeatUntilFailure())
-
- Async.blocking:
- val channel = BufferedChannel[Any](10)
- timerConsumer(channel.asReadable).run
- timerProducer(channel.asSendable).run
- sleep(21.seconds.toMillis)
diff --git a/commons/src/main/scala/io/github/tassiLuca/boundaries/either.scala b/commons/src/main/scala/io/github/tassiLuca/boundaries/either.scala
index c2580d8a..5a5ab44a 100644
--- a/commons/src/main/scala/io/github/tassiLuca/boundaries/either.scala
+++ b/commons/src/main/scala/io/github/tassiLuca/boundaries/either.scala
@@ -1,7 +1,5 @@
package io.github.tassiLuca.boundaries
-import gears.async.Channel
-
import scala.util.{Failure, Success, Try, boundary}
import scala.util.boundary.{Label, break}
diff --git a/docs/assets/_custom.scss b/docs/assets/_custom.scss
new file mode 100644
index 00000000..2d50c2a3
--- /dev/null
+++ b/docs/assets/_custom.scss
@@ -0,0 +1,17 @@
+.mermaid {
+ margin: 0 auto;
+ text-align: center;
+ width: auto;
+
+ svg {
+ max-width: 100%;
+ }
+
+ &.smaller {
+ width: 90%;
+ }
+
+ &.smallest {
+ width: 55%;
+ }
+}
diff --git a/docs/content/_index.md b/docs/content/_index.md
index f6744527..3d3c6548 100644
--- a/docs/content/_index.md
+++ b/docs/content/_index.md
@@ -13,8 +13,8 @@ Analyze aspects such as:
## Table of contents
-1. [Boundary and break](./docs/01-boundaries)
-2. [Blog posts service example: a direct-style vs monadic comparison](./docs/02-basics)
-3. [Channels](./docs/03-channels.md)
-4. [IoT example](./docs/04-rears.md)
-5. [Going further](./docs/05-going-further.md)
+1. [`Boundary` and `break`](./docs/01-boundaries)
+2. [Basic asynchronous constructs](./docs/02-basics)
+3. [Channels as a communication primitive](./docs/03-channels.md)
+4. [Reactive-like direct style](./docs/04-rears.md)
+5. [Conclusions](./docs/05-going-further.md)
diff --git a/docs/content/docs/02-basics.md b/docs/content/docs/02-basics.md
index 9210711d..46df6596 100644
--- a/docs/content/docs/02-basics.md
+++ b/docs/content/docs/02-basics.md
@@ -16,7 +16,7 @@ To show these weaknesses in practice, a simple example of the core of a web serv
**Idea**: develop a very simple (mocked) service which allows to retrieve and store from a repository blog posts, performing checks on the content and author before the actual storage.
-{{}}
+{{< /hint >}}
The example has been implemented using:
@@ -389,3 +389,7 @@ w.r.t. kotlin coroutines:
- "Finally, about function coloring: Capabilities are actually much better here than other language's proposals such as suspend or async which feel clunky in comparison. This becomes obvious when you consider higher order functions. Capabilities let us define a single map (with no change in signature compared to now!) that works for sync as well as async function arguments. That's the real breakthrough here, which will make everything work so much smoother. I have talked about this elsewhere and this response is already very long, so I will leave it at that."
how suspension is implemented
+
+---
+
+## Conclusions
diff --git a/docs/content/docs/03-channels.md b/docs/content/docs/03-channels.md
index 6d02b99e..be5f33f0 100644
--- a/docs/content/docs/03-channels.md
+++ b/docs/content/docs/03-channels.md
@@ -52,7 +52,7 @@ Three types of channels exist:
> Multiple producers can send data to the channel, as well as multiple consumers can read them, **but each element is handled only _once_, by _one_ of them**, i.e. consumers **compete** with each other for sent values. Once the element is handled, it is immediately removed from the channel.
-## Organization analyzer example
+## GitHub organization analyzer example
To show channels in action an example has been prepared:
@@ -66,7 +66,7 @@ Final result:
As usual, the example has been implemented using monadic `Future`s, as well as Scala gears and Kotlin Coroutines.
-### Analyzer and App controller
+### Analyzer and App Controller
The direct version in Scala gears exposes the following interface, taking in input an organization name and a function through which is possible to react to results while they are computed.
@@ -154,3 +154,5 @@ With respect to reactive programming, they are still quite less reach in terms o
- better closable
---
+
+## Conclusions
diff --git a/docs/content/docs/04-rears.md b/docs/content/docs/04-rears.md
index 2cc7c59c..2867d453 100644
--- a/docs/content/docs/04-rears.md
+++ b/docs/content/docs/04-rears.md
@@ -1,6 +1,23 @@
# An attempt to bring reactivity principles in gears
-{{< mermaid >}}
+So far, we've explored the basics of asynchronous abstraction mechanisms provided by the direct style of the Scala Gears and Kotlin Coroutines frameworks.
+
+The goal of this last example is to investigate, using a simple example, whether these two frameworks offers sufficient idiomatic abstractions to deal with **reactive-like systems**.
+
+## Smart Hub System example
+
+{{< hint info >}}
+**Idea**: in an IoT context a multitude of sensors of different types, each replicated to ensure accurate measurements, transmit their measurements to a central hub, which in turns needs to react, in real-time, forwarding to the appropriate controller the data, possibly running some kind of transformation, enabling controllers to make decisions based on their respective logic.
+{{< /hint >}}
+
+### Scala Gears version
+
+Before delving into the example, two abstractions of Gears, yet not covered, are introduced:
+
+- `Task`s provide a way, not only to run asynchronous computation, essentially wrapping a `Future`, but also to schedule it, possibly repeating it. Different scheduling strategies are available: `Every`, `ExponentialBackoff`, `FibonacciBackoff`, `RepeatUntilFailure`, `RepeatUntilSuccess`.
+ - This allows implementing easily proactive like systems, like a game loop.
+
+{{< mermaid class="smaller" >}}
classDiagram
class `Task[+T]` {
+apply(body: (Async, AsyncOperations) ?=> T) Task[T]$
@@ -20,7 +37,12 @@ classDiagram
}
{{< /mermaid >}}
-{{< mermaid >}}
+- To avoid the _work stealing behavior_ of channels consumers, a `ChannelMultiplexer` can be used. It is essentially a container of producing and consuming channels, which can be added and removed at runtime. Internally, it is implemented with a thread that continuously races the set of publishers and once it reads a value, it forwards it to each subscriber channel.
+ - Order is guaranteed only per producer;
+ - Typically, the consumer creates a channel and adds it to the multiplexer, then start reading from it, possibly using a scheduled task.
+ - if the consumer attaches the channel after the producer started, the values sent during this interval are lost, like _hot observables_ in Rx.
+
+{{< mermaid class="smallest" >}}
classDiagram
namespace javaio {
class Closeable {
@@ -41,20 +63,90 @@ classDiagram
Closeable <|-- `ChannelMultiplexer[T]`
{{< /mermaid >}}
-![expected result](../../res/img/rears.svg)
+In the proposed strawman Scala Gears library, there are no other kind of abstractions, neither a way to manipulate channels with functions inspired by Rx.
+
+The attempt was to somehow extend this framework adding first class support for the concept of `Producer` and `Consumer` and implement some of the most common Rx operators, just as a proof of concept, completely leaving out performances concerns.
+
+[Sources can be found in the `rears` submodule]
+
+```scala
+/** A publisher, i.e. a runnable active entity producing items on a channel. */
+trait Publisher[E]:
+ /** The [[Channel]] to send items to. */
+ protected val channel: Channel[E] = UnboundedChannel()
+
+ /** @return a runnable [[Task]]. */
+ def asRunnable: Task[Unit]
+
+ /** @return a [[ReadableChannel]] where produced items are placed. */
+ def publishingChannel: ReadableChannel[E] = channel.asReadable
+
+/** A consumer, i.e. a runnable active entity devoted to consuming data from a channel. */
+trait Consumer[E]:
+ /** The [[SendableChannel]] to send items to. */
+ val listeningChannel: SendableChannel[Try[E]] = UnboundedChannel()
+
+ /** @return a runnable [[Task]]. */
+ def asRunnable: Task[Unit] = Task {
+ listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach(react)
+ }.schedule(RepeatUntilFailure())
+
+ /** The suspendable reaction triggered upon a new read of an item succeeds. */
+ protected def react(e: Try[E])(using Async): Unit
+
+/** A mixin to make consumers stateful. */
+trait State[E]:
+ consumer: Consumer[E] =>
+
+ private var _state: Option[E] = None
+
+ def state: Option[E] = synchronized(_state)
+
+ override def asRunnable: Task[Unit] = Task {
+ listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach { e =>
+ react(e)
+ synchronized { _state = e.toOption }
+ }
+ }.schedule(RepeatUntilFailure())
+```
+
+```scala
+object Controller:
+
+ def oneToMany[T, R](
+ publisherChannel: ReadableChannel[T],
+ consumers: Set[Consumer[R]],
+ transformation: PipelineTransformation[T, R] = identity,
+ ): Task[Unit] = Task:
+ val multiplexer = ChannelMultiplexer[R]()
+ consumers.foreach(c => multiplexer.addSubscriber(c.listeningChannel))
+ multiplexer.addPublisher(transformation(publisherChannel))
+ // blocking call: the virtual thread on top of which this task is
+ // executed needs to block to continue publishing publisher's events
+ // towards the consumer by means of the multiplexer.
+ multiplexer.run()
+```
---
-Gears:
+Implemented transformation functions:
-- Task scheduling
- - Pro and cons
- - proactiveness
- - `Producer` + `Consumer` design
-- Manipulation of channels with functions inspired by Rx
+- `filter`
+- `debounce`
+- `groupBy`
+- `buffer`
+- `bufferWithin`
-Kotlin:
+Pay attention to: Async ?=>
+
+---
-- flows
+Going back to the example, here is presented a schema summarizing the proposed design of the system.
+
+![system design of the example](../../res/img/rears.svg)
---
+
+### Kotlin Coroutines version
+
+## Conclusions
diff --git a/docs/content/res/img/rears.svg b/docs/content/res/img/rears.svg
index f43bcad0..7b9d829b 100644
--- a/docs/content/res/img/rears.svg
+++ b/docs/content/res/img/rears.svg
@@ -1,4 +1,4 @@
-
\ No newline at end of file
+
HUB Receiver
Temperature
Temperature
Luminosity
groupBy
Multiplexer
buffer
pipeline of transformations
Lamps Controller
Thermostat
Sensor Health Checker
\ No newline at end of file
diff --git a/docs/content/res/schemas/.$diagrams.drawio.bkp b/docs/content/res/schemas/.$diagrams.drawio.bkp
index bc74c7ae..40c998b2 100644
--- a/docs/content/res/schemas/.$diagrams.drawio.bkp
+++ b/docs/content/res/schemas/.$diagrams.drawio.bkp
@@ -1,4 +1,4 @@
-
+
@@ -108,11 +108,11 @@
-
+
-
+
@@ -124,7 +124,7 @@
-
+
@@ -143,7 +143,7 @@
-
+
@@ -163,92 +163,103 @@
-
-
+
+
-
+
-
-
+
+
-
+
-
+
-
+
-
+
-
+
-
-
+
+
-
-
+
+
-
-
+
+
-
+
-
-
+
+
-
-
+
+
-
+
-
+
-
+
-
+
-
+
-
+
-
-
+
+
-
-
+
+
-
-
+
+
-
+
-
-
+
+
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/content/res/schemas/diagrams.drawio b/docs/content/res/schemas/diagrams.drawio
index 882328e2..7af3561e 100644
--- a/docs/content/res/schemas/diagrams.drawio
+++ b/docs/content/res/schemas/diagrams.drawio
@@ -1,4 +1,4 @@
-
+
@@ -108,29 +108,29 @@
-
+
-
+
-
+
-
+
-
+
-
-
+
+
@@ -143,7 +143,7 @@
-
+
@@ -163,42 +163,42 @@
-
+
-
+
-
+
-
-
+
+
-
-
+
+
-
-
+
+
-
+
-
+
-
+
-
+
-
+
-
+
@@ -208,38 +208,38 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
@@ -249,15 +249,15 @@
-
+
-
+
-
+
diff --git a/docs/static/analyzer-e2e.png b/docs/static/analyzer-e2e.png
deleted file mode 100644
index 14104145..00000000
Binary files a/docs/static/analyzer-e2e.png and /dev/null differ
diff --git a/rears-core/src/main/scala/io/github/tassiLuca/rears/Boundary.scala b/rears-core/src/main/scala/io/github/tassiLuca/rears/Boundary.scala
index eb7b9fb1..d405232e 100644
--- a/rears-core/src/main/scala/io/github/tassiLuca/rears/Boundary.scala
+++ b/rears-core/src/main/scala/io/github/tassiLuca/rears/Boundary.scala
@@ -5,23 +5,39 @@ import gears.async.TaskSchedule.RepeatUntilFailure
import scala.util.Try
+/** A publisher, i.e. a runnable active entity producing items on a channel. */
trait Publisher[E]:
- protected val channel: Channel[E] = UnboundedChannel[E]()
+ /** The [[Channel]] to send items to. */
+ protected val channel: Channel[E] = UnboundedChannel()
+
+ /** @return a runnable [[Task]]. */
def asRunnable: Task[Unit]
+
+ /** @return a [[ReadableChannel]] where produced items are placed. */
def publishingChannel: ReadableChannel[E] = channel.asReadable
+/** A consumer, i.e. a runnable active entity devoted to consuming data from a channel. */
trait Consumer[E]:
+
+ /** The [[SendableChannel]] to send items to. */
val listeningChannel: SendableChannel[Try[E]] = UnboundedChannel()
+
+ /** @return a runnable [[Task]]. */
def asRunnable: Task[Unit] = Task {
listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach(react)
}.schedule(RepeatUntilFailure())
+
+ /** The suspendable reaction triggered upon a new read of an item succeeds. */
protected def react(e: Try[E])(using Async): Unit
+/** A mixin to make consumers stateful. */
trait State[E]:
consumer: Consumer[E] =>
private var _state: Option[E] = None
+
def state: Option[E] = synchronized(_state)
+
override def asRunnable: Task[Unit] = Task {
listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach { e =>
react(e)