Skip to content

Commit

Permalink
docs: more on terminable channels
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Mar 3, 2024
1 parent 63be07a commit 64e4647
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 11 deletions.
124 changes: 114 additions & 10 deletions docs/content/docs/03-channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ classDiagram

The channel is defined through three distinct interfaces: `SendableChannel[-T]`, `ReadableChannel[+T]` and `Channel[T]`, where the latter extends from both `SendableChannel` and `ReadableChannel`. Typically, a `Channel` is created and a `SendableChannel` and `ReadableChannel` instances are respectively provided to the producer and the consumer, restricting their access to it. The same, almost identical, design is present also in Kotlin Coroutines where `SendChannel` and `ReceiveChannel` take over, respectively, the Gears `SendableChannel` and `ReadableChannel`.

Moreover, `Channel` inherits from `java.io.Closable`, making them closable objects: once closed, they raise `ChannelClosedException` when attempting to write to them and immediately return a `Left(Closed)` when attempting to read from them, preventing the consumer from finishing reading all the values sent on the channel before its closing.
{{< hint warning >}}

`Channel` inherits from `java.io.Closable`, making them closable objects: once closed, they raise `ChannelClosedException` when attempting to write to them and immediately return a `Left(Closed)` when attempting to read from them, preventing the consumer from finishing reading all the values sent on the channel before its closing.
This is not the case for Kotlin Coroutines where closing a channel indicates that no more values are coming, but doesn't prevent consuming already sent values. Moreover, in Kotlin is possible to use a regular for loop to receive elements from a channel (blocking the coroutine):

{{< /hint >}}

```kotlin
val channel = Channel<Int>()
launch {
Expand All @@ -57,9 +61,107 @@ for (y in channel) println(y) // blocks until channel is closed
println("Done!")
```

A similar behavior can be achieved also in Gears pimping the framework with the concept of `Terminable` channel. After all, closing a channel in coroutines is a matter of sending a special token to it, allowing stop the iteration as soon as this token is received.
{{< hint info >}}

Similar behavior can be achieved also in Gears extending the framework with the concept of **`Terminable`** channel. After all, closing a channel in coroutines is a matter of sending a special token to it, allowing stop the iteration as soon as this token is received.

{{< /hint >}}

[[The full implementation can be found in `commons` submodule](https://github.com/tassiLuca/PPS-22-direct-style-experiments/blob/master/commons/src/main/scala/io/github/tassiLuca/pimping/TerminableChannel.scala).]

```scala
/** A token to be sent to a channel to signal that it has been terminated. */
case object Terminated

type Terminated = Terminated.type

/** A union type of [[T]] and [[Terminated]]. */
type Terminable[T] = T | Terminated

/** Exception being raised by [[TerminableChannel.send()]] on terminated [[TerminableChannel]]. */
class ChannelTerminatedException extends Exception

/** A [[Channel]] that can be terminated, signalling no more items will be sent,
* still allowing to consumer to read pending values.
* Trying to `send` values after its termination arise a [[ChannelTerminatedException]].
* When one consumer reads the [[Terminated]] token, the channel is closed. Any subsequent
* read will return `Left(Channel.Closed`.
*/
trait TerminableChannel[T] extends Channel[Terminable[T]]:
def terminate()(using Async): Unit

object TerminableChannel:

/** Creates a [[TerminableChannel]] backed to [[SyncChannel]]. */
def ofSync[T: ClassTag]: TerminableChannel[T] = TerminableChannelImpl(SyncChannel())

/** Creates a [[TerminableChannel]] backed to [[BufferedChannel]]. */
def ofBuffered[T: ClassTag]: TerminableChannel[T] = TerminableChannelImpl(BufferedChannel())

/** Creates a [[TerminableChannel]] backed to an [[UnboundedChannel]]. */
def ofUnbounded[T: ClassTag]: TerminableChannel[T] = TerminableChannelImpl(UnboundedChannel())

private class TerminableChannelImpl[T: ClassTag](c: Channel[Terminable[T]]) extends TerminableChannel[T]:
opaque type Res[R] = Either[Channel.Closed, R]

private var _terminated: Boolean = false

override val readSource: Async.Source[Res[Terminable[T]]] =
c.readSource.transformValuesWith {
case Right(Terminated) => c.close(); Left(Channel.Closed)
case v @ _ => v
}

override def sendSource(x: Terminable[T]): Async.Source[Res[Unit]] =
synchronized:
if _terminated then throw ChannelTerminatedException()
else if x == Terminated then _terminated = true
c.sendSource(x)

override def close(): Unit = c.close()

override def terminate()(using Async): Unit = uninterruptible:
try send(Terminated)
// It happens only at the close of the channel due to the call (inside Gears
// library) of a CellBuf.dequeue(channels.scala:239) which is empty!
catch case e: NoSuchElementException => e.printStackTrace()
```

Now, also in Scala with Gears is possible to write:

```scala
val channel = TerminableChannel.ofUnbounded[Int]
Future:
(0 until 10).foreach(x => channel.send(x))
channel.terminate() // we're done sending
channel.foreach(println(_)) // blocks until channel is closed
println("Done!")
```

[[Other tests can be found in `TerminableChannelTest`]().]

`TBD`
On top of this new abstraction is possible to implement, for example, the `foreach` and `toSeq` methods, which can be useful to wait for all the items sent over the channel.

```scala
object TerminableChannelOps:

extension [T: ClassTag](c: TerminableChannel[T])
/** Blocking consume channel items, executing the given function [[f]] for each element. */
@tailrec
def foreach[U](f: T => U)(using Async): Unit = c.read() match
case Left(Channel.Closed) => ()
case Right(value) =>
value match
case Terminated => ()
case v: T => f(v); foreach(f)

/** @return a [[Seq]] containing channel items, after having them read.
* This is a blocking operation! */
def toSeq(using Async): Seq[T] =
var results = Seq[T]()
c.foreach(t => results = results :+ t)
results
```

Three types of channels exist:

Expand All @@ -76,21 +178,23 @@ Three types of channels exist:
- if the programs run out of memory you can get an out-of-memory exception!
- in Kotlin they are called **Unlimited Channel**.

Kotlin offers also a fourth type: the **Conflated Channel**, where every new element sent to it overwirtes the previously sent one, *never blocking*, so that the receiver gets always the latest element.
Kotlin offers also a fourth type: the **Conflated Channel**, where every new element sent to it overwrites the previously sent one, *never blocking*, so that the receiver gets always the latest element.

Concerning channel behavior, it is important to note that:

> 1. Multiple producers can send data to the channel, as well as multiple consumers can read them, **but each element is handled only _once_, by _one_ of them**, i.e. consumers **compete** with each other for sent values;
> 2. Once the element is handled, it is immediately removed from the channel;
> 3. Fairness: `TBD`
{{< hint info >}}

1. Multiple producers can send data to the channel, as well as multiple consumers can read them, **but each element is handled only _once_, by _one_ of them**, i.e. consumers **compete** with each other for sent values;
2. Once the element is handled, it is immediately removed from the channel;
* Channels are fair: `send` and `read` operations to channels are fair w.r.t. the order of their invocations from multiple threads (they are served in first-in first-out order).

{{< /hint >}}

## Analyzer example

To show channels in action an example has been prepared:

{{< hint info >}}
**Idea**: we want to realize a little asynchronous library allowing clients to collect the common statistics about repositories (issues, stars, last release) and contributors of a given GitHub organization.
{{< /hint >}}
> **Idea**: we want to realize a little asynchronous library allowing clients to collect the common statistics about repositories (issues, stars, last release) and contributors of a given GitHub organization.
The final result is a GUI application that, given an organization name, starts the analysis of all its repositories,
listing their information along with all their contributors as soon as they are computed. Moreover, the application allows the user to cancel the current computation at any point in time.
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/04-rears.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ Example:

Going back to the example here is presented a schema summarizing the flows of data and the transformations to apply to them. This is just a simple example used to test the proposed abstractions.

![system design of the example](../../res/img/rears.svg)
{{< figure src="../../res/img/rears.svg" alt="System design of the example" width="90%" class="center" >}}

[[Sources are available in `smart-hub-direct` submodule](https://github.com/tassiLuca/PPS-22-direct-style-experiments/tree/master/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub)].

Expand Down

0 comments on commit 64e4647

Please sign in to comment.