
Commit

docs: add more on flows
tassiluca committed Mar 3, 2024
1 parent 10ef545 commit 800f2b4
Showing 2 changed files with 135 additions and 41 deletions.
@@ -32,7 +32,6 @@ internal class GitHubAnalyzerByFlows(private val provider: GitHubRepositoryProvi
}
}

@Suppress("InjectDispatcher")
private fun analyzeAll(repositories: List<Repository>): Flow<RepositoryReport> = channelFlow {
repositories.forEach { repository ->
launch {
@@ -42,5 +41,5 @@ internal class GitHubAnalyzerByFlows(private val provider: GitHubRepositoryProvi
}
}
}
}.flowOn(Dispatchers.Default)
}
}
173 changes: 134 additions & 39 deletions docs/content/docs/03-channels.md
@@ -120,7 +120,7 @@ object TerminableChannel:

override def close(): Unit = c.close()

override def terminate()(using Async): Unit = uninterruptible:
override def terminate()(using Async): Unit =
try send(Terminated)
// It happens only at the close of the channel due to the call (inside Gears
// library) of a CellBuf.dequeue(channels.scala:239) which is empty!
@@ -280,8 +280,7 @@ class MonadicAppController extends AppController:

The interfaces of the Direct Style with Gears differ from the monadic ones by their return type, which is a simpler `Either` data type, and by the fact that they are **suspendable functions**, hence they require an Async context to be executed.
This is the first important difference: the `analyze` method, unlike the monadic version, doesn't immediately return control; instead, it suspends the execution of the client until the result is available (though offering the opportunity to react to each update).
This obeys the principle of **explicit asynchrony**: if the client wants to perform this operation asynchronously, it has to opt in explicitly, either using a `Future` or any other asynchronous construct (depending on the library used).
Moreover, this interface is library-agnostic, meaning that it doesn't depend on any specific asynchronous library.
This obeys the principle of **explicit asynchrony**: if the client wants to perform this operation asynchronously, it has to opt in explicitly, using a `Future`.
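
For instance, a client that does not want to suspend can opt in explicitly by spawning the computation in a Gears `Future`. The following is a minimal, hypothetical sketch (the `analyzeInBackground` helper, the organization name, and the callback are illustrative, not part of the documented sources):

```scala
// Hedged sketch: explicit opt-in to asynchrony by wrapping the suspendable call in a Future.
// Assumes an Async.Spawn capability in scope, as provided e.g. by Async.blocking / Async.group.
def analyzeInBackground(analyzer: Analyzer)(using Async.Spawn): Future[Either[String, Seq[RepositoryReport]]] =
  Future:
    analyzer.analyze("dse")(report => println(s"partial result: $report"))
```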

```scala
trait Analyzer:
Expand Down Expand Up @@ -331,18 +330,18 @@ Indeed, the GitHub API, like many ReSTful APIs, implements _pagination_: if the
Until now, the `RepositoryService` has been implemented to return all the results in one shot, leading to suspension until all pages are retrieved.
It would be desirable, instead, to start performing the analysis as soon as one page is obtained from the API.

To do so, the interface of the `RepositoryService` has been extended with new methods, `incremental***`, returning a terminable channel of results:
To do so, the interface of the `RepositoryService` has been extended with new methods, `incremental***`, returning a `TerminableChannel` of results:

```scala
trait RepositoryService:
def incrementalRepositoriesOf(
organizationName: String,
)(using Async): ReadableChannel[Terminable[Either[String, Repository]]]
)(using Async): TerminableChannel[Either[String, Repository]]

def incrementalContributorsOf(
organizationName: String,
repositoryName: String,
)(using Async): ReadableChannel[Terminable[Either[String, Contribution]]]
)(using Async): TerminableChannel[Either[String, Contribution]]

// ...

@@ -354,28 +353,37 @@ Then, the implementation of the `analyze` method becomes:
override def analyze(organizationName: String)(
updateResults: RepositoryReport => Unit,
)(using Async): Either[String, Seq[RepositoryReport]] = either:
val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName)
val collector = MutableCollector[RepositoryReport]()
var collectedRepositories = 0
// suspend until all are retrieved
reposInfo.foreach { repository =>
collector += repository.?.performAnalysis
collectedRepositories = collectedRepositories + 1
}
(0 until collectedRepositories).map { _ =>
val report = collector.results.read().?.awaitResult.?
updateResults(report)
report
val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName) // 1
var allReports = Seq[RepositoryReport]()
var futures = Seq[Future[Unit]]()
reposInfo.foreach { repository => // 2
futures = futures :+ Future: // 3
val report = repository.?.performAnalysis.awaitResult.?
updateResults(report)
allReports = allReports :+ report
}
futures.awaitAllOrCancel // 4
allReports
```

Note in this implementation the `foreach` method has been used to iterate over all the returned `TerminableChannel` as soon as they are retrieved by the service and start the analysis in a corresponding `Future`. These are gathered in a `MutableCollector` (a mutable version of the previous `Collector`) and their results are read from the channel as they come.
1. we get the channel of repositories from the repository service;
2. the `foreach` method of `TerminableChannel` is used to iterate over the repositories as they are sent over the channel by the service. This operation suspends the caller until the channel is terminated, i.e. until all the repositories have been retrieved;
3. each analysis is started in a separate `Future` (i.e. a separate thread): this way the analysis of a repository begins as soon as it is read from the channel, instead of waiting for the previous analysis to finish;
4. once all the repositories have been retrieved, i.e. the `foreach` terminates, we wait for the completion of all the started `Future`s. Indeed, when the `foreach` terminates we only have the guarantee that all the futures have been started, not that they have completed!

---

To start the application:

Despite the improvement, this is not yet the best solution: as soon as the repositories are retrieved the corresponding analysis is started, but the update of the results is performed only when all the repositories have been analyzed.
```bash
./gradlew analyzer-direct:run
```

---

### Kotlin Coroutines version

The analyzer interface reflects the Scala one: a `Result` in place of `Either` is used, and the suspendable function `udateResults` is marked with the `suspend` keyword in place of the `using Async` context.
The analyzer interface reflects the Scala Gears one: a `Result` is used in place of `Either`, and the `updateResults` function is marked with the `suspend` keyword in place of requiring the `using Async` context.

```kotlin
interface Analyzer {
@@ -388,7 +396,7 @@

Its channel-based implementation, despite syntactic differences, is also very similar to that of Scala Gears, at least conceptually:

1. get all the repositories;
1. we get all the repositories;
2. for each of them, an analysis is started to retrieve the contributors and the last release;
* each analysis is started in a separate coroutine whose results are sent to a channel;
* as usual, the contributors and the last release are retrieved concurrently, using the `async` coroutine builder;
Expand Down Expand Up @@ -439,7 +447,21 @@ Where, instead, Kotlin Coroutines shine is the implementation of the `Repository

Indeed, Kotlin has built-in support for cold streams, called **`Flow`**. They are very similar to (and actually inspired by) cold observables in reactive programming, and **they are the perfect fit for functions that need to return a stream of asynchronously computed values**.

They offer several useful operators for transforming and combining them functionally. An overview of the most common operators is provided in the following section.
The `RepositoryService` has been extended here with new methods, `flowing***`, returning a `Flow` of results:

```kotlin
class GitHubRepositoryProvider {

fun flowingRepositoriesOf(organizationName: String): Flow<List<Repository>>

fun flowingContributorsOf(organizationName: String, repositoryName: String): Flow<List<Contribution>>
}
```

As already mentioned, a `Flow` is a **cold stream**, meaning that it is **not** started until it is **`collect`ed**. Once the `collect` method is called, a new stream is created and data starts to "flow".
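
To make the cold semantics concrete, the following is a small illustrative snippet (not taken from the project): the `flow` builder's block is executed only when `collect` is invoked, and it is executed again for every new collector.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Illustrative only: the builder block runs once per collection, not at construction time.
fun numbers(): Flow<Int> = flow {
    println("Flow started")
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val numbersFlow = numbers()          // nothing is executed yet
    numbersFlow.collect { println(it) }  // prints "Flow started", 1, 2
    numbersFlow.collect { println(it) }  // the block runs again: "Flow started", 1, 2
}
```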


They offer several useful operators for transforming and combining them functionally (the list below is not exhaustive; a short usage sketch follows it):

{{< columns >}}

Expand Down Expand Up @@ -469,31 +491,19 @@ They offer several useful operators for transforming and combining them function
- `combine`
- `flatMapConcat` / `flatMapMerge` to transform each value into a flow and then concatenate/merge them;


{{< /columns >}}
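
As a rough, illustrative example of composing some of these operators (the data and transformations below are made up and not part of the project):

```kotlin
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val labels = flowOf("a", "b", "c")
    (1..4).asFlow()
        .map { it * it }                    // 1, 4, 9, 16
        .filter { it > 1 }                  // 4, 9, 16
        .zip(labels) { n, l -> "$l -> $n" } // pairs elements of the two flows: "a -> 4", "b -> 9", "c -> 16"
        .collect { println(it) }
}
```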

The `RepositoryService` has been here extended with new methods, `flowing***`, returning a `Flow` of results:

```kotlin
class GitHubRepositoryProvider {

fun flowingRepositoriesOf(organizationName: String): Flow<List<Repository>>

fun flowingContributorsOf(organizationName: String, repositoryName: String): Flow<List<Contribution>>
}
```

As already mentioned, the `Flow` is a **cold stream**, meaning that it is **not** started until it is **`collect`ed**. Once the `collect` is called a new cold stream is created and data starts to "flow".
Moreover, like in Rx, it is possible to control the context in which the flow is executed using the `flowOn` operator, which changes the context for all the steps above it (so it is typically used as the last step in a function).

```kotlin
override suspend fun analyze(
organizationName: String,
updateResults: suspend (RepositoryReport) -> Unit,
): Result<Set<RepositoryReport>> = coroutineScope {
runCatching {
val reports = provider.flowingRepositoriesOf(organizationName) // 1
.flatMapConcat { analyzeAll(it) } // 2
.flowOn(Dispatchers.Default) // 3
val reports = provider.flowingRepositoriesOf(organizationName)
.flatMapConcat { analyzeAll(it) }
.flowOn(Dispatchers.Default)
var allReports = emptySet<RepositoryReport>()
// until here just "configuration"
reports.collect {
@@ -505,6 +515,91 @@
}
```

## Introducing `Flow`s in Gears

{{< hint info >}}

A similar abstraction of Kotlin `Flow`s can be implemented in Scala Gears leveraging `Task`s and `TerminableChannel`s.
The following section describes the attempt made to implement it and what has been achieved.

{{< /hint >}}

- When building the `Flow`, the client provides a block of code through which it emits values; this block is wrapped inside a `Task` that is started only when the `collect` method is called;
- The values are sent (emitted) on a `TerminableChannel` which is created when the `collect` method is called;
- The behavior of the `emit` method is defined inside the `apply` method of `Flow` and injected into the caller code via the context parameter `(it: FlowCollector[T]) ?=>`;
- Once the task has finished, the channel is terminated.

[[Source code can be found in `commons` submodule, `pimping` package](https://github.com/tassiLuca/PPS-22-direct-style-experiments/blob/master/commons/src/main/scala/io/github/tassiLuca/pimping/Flow.scala).]

```scala
/** An asynchronous cold data stream that emits values, inspired by Kotlin Flows. */
trait Flow[+T]:

/** Start the flowing of data which can be collected reacting through the given [[collector]] function. */
def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit

/** An interface modeling an entity capable of [[emit]]ting [[Flow]]s values. */
trait FlowCollector[-T]:

/** Emits a value to the flow. */
def emit(value: T)(using Async): Unit

object Flow:

/** Creates a new asynchronous cold [[Flow]] from the given [[body]].
* Since it is cold, it starts emitting values only when the [[Flow.collect]] method is called.
* To emit a value use the [[FlowCollector]] given instance.
*/
def apply[T](body: (it: FlowCollector[T]) ?=> Unit): Flow[T] =
val flow = FlowImpl[T]()
flow.task = Task:
val channel = flow.channel
flow.sync.release()
val collector: FlowCollector[T] = new FlowCollector[T]:
override def emit(value: T)(using Async): Unit = channel.send(Success(value))
try body(using collector)
catch case e: Exception => channel.send(Failure(e))
flow

private class FlowImpl[T] extends Flow[T]:
private[Flow] var task: Task[Unit] = uninitialized
private[Flow] var channel: TerminableChannel[Try[T]] = uninitialized
private[Flow] val sync = Semaphore(0)

override def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit =
val myChannel = TerminableChannel.ofUnbounded[Try[T]]
synchronized:
channel = myChannel
task.run.onComplete(() => myChannel.terminate())
// Ensure to leave the synchronized block after the task has been initialized
// with the correct channel instance.
sync.acquire()
myChannel.foreach(t => collector(t))
```

`map` and `flatMap` have been implemented on top of `Flow`:

```scala
object FlowOps:

extension [T](flow: Flow[T])

/** @return a new [[Flow]] whose values have been transformed according to [[f]]. */
def map[R](f: T => R): Flow[R] = new Flow[R]:
override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit =
catchFailure(collector):
flow.collect(item => collector(Success(f(item.get))))

/** @return a new [[Flow]] whose values are created by flattening the flows generated
* by the given function [[f]] applied to each emitted value of this.
*/
def flatMap[R](f: T => Flow[R]): Flow[R] = new Flow[R]:
override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit =
catchFailure(collector):
flow.collect(item => f(item.get).collect(x => collector(Success(x.get))))
```
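
A minimal usage sketch of this abstraction follows. It is hypothetical and not part of the documented sources: the imports from the `pimping` package and the Gears `Async.blocking` entry point (with its default support givens) are assumed.

```scala
import gears.async.*
import gears.async.default.given
import io.github.tassiLuca.pimping.{Flow, FlowCollector}
import io.github.tassiLuca.pimping.FlowOps.*

@main def flowExample(): Unit = Async.blocking:
  // Cold flow: the body below does not run until `collect` is called.
  val squares: Flow[Int] = Flow { (it: FlowCollector[Int]) ?=>
    (1 to 3).foreach(n => it.emit(n * n))
  }
  // `map` transforms each emitted value; `collect` starts the emission and
  // receives each item wrapped in a Try (Success(10), Success(40), Success(90)).
  squares.map(_ * 10).collect(println)
```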


## Conclusions

> - `Channel`s are the basic communication and synchronization primitive for exchanging data between `Future`s/`Coroutines`.
