diff --git a/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByFlows.kt b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByFlows.kt index ed8baec7..7aad814e 100644 --- a/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByFlows.kt +++ b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByFlows.kt @@ -32,7 +32,6 @@ internal class GitHubAnalyzerByFlows(private val provider: GitHubRepositoryProvi } } - @Suppress("InjectDispatcher") private fun analyzeAll(repositories: List): Flow = channelFlow { repositories.forEach { repository -> launch { @@ -42,5 +41,5 @@ internal class GitHubAnalyzerByFlows(private val provider: GitHubRepositoryProvi } } } - }.flowOn(Dispatchers.Default) + } } diff --git a/docs/content/docs/03-channels.md b/docs/content/docs/03-channels.md index fbc1998c..4d962d1a 100644 --- a/docs/content/docs/03-channels.md +++ b/docs/content/docs/03-channels.md @@ -120,7 +120,7 @@ object TerminableChannel: override def close(): Unit = c.close() - override def terminate()(using Async): Unit = uninterruptible: + override def terminate()(using Async): Unit = try send(Terminated) // It happens only at the close of the channel due to the call (inside Gears // library) of a CellBuf.dequeue(channels.scala:239) which is empty! @@ -280,8 +280,7 @@ class MonadicAppController extends AppController: The interfaces of the Direct Style with Gears differ from the monadic one by their return type, which is a simpler `Either` data type, and by the fact they are **suspendable functions**, hence they require an Async context to be executed. This is the first important difference: the `analyze` method, differently from the monadic version, doesn't return immediately the control; instead, it suspends the execution of the client until the result is available (though offering the opportunity to react to each update). -This obeys the principle of **explicit asynchrony**: if the client wants to perform this operation asynchronously, it has to opt in explicitly, either using a `Future` or any other asynchronous construct (depending on the library used). -Moreover, this interface is library-agnostic, meaning that it doesn't depend on any specific asynchronous library. +This obeys the principle of **explicit asynchrony**: if the client wants to perform this operation asynchronously, it has to opt in explicitly, using a `Future`. ```scala trait Analyzer: @@ -331,18 +330,18 @@ Indeed, the GitHub API, like many ReSTful APIs, implements _pagination_: if the Until now, the `RepositoryService` has been implemented to return the whole results in one shot, leading to suspension until all pages are retrieved. It would be desirable, instead, to start performing the analysis as soon as one page is obtained from the API. -To do so, the interface of the `RepositoryService` has been extended with new methods, `incremental***`, returning a terminable channel of results: +To do so, the interface of the `RepositoryService` has been extended with new methods, `incremental***`, returning a `TerminableChannel` of results: ```scala trait RepositoryService: def incrementalRepositoriesOf( organizationName: String, - )(using Async): ReadableChannel[Terminable[Either[String, Repository]]] + )(using Async): TerminableChannel[Either[String, Repository]] def incrementalContributorsOf( organizationName: String, repositoryName: String, - )(using Async): ReadableChannel[Terminable[Either[String, Contribution]]] + )(using Async): TerminableChannel[Either[String, Contribution]] // ... @@ -354,28 +353,37 @@ Then, the implementation of the `analyze` method becomes: override def analyze(organizationName: String)( updateResults: RepositoryReport => Unit, )(using Async): Either[String, Seq[RepositoryReport]] = either: - val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName) - val collector = MutableCollector[RepositoryReport]() - var collectedRepositories = 0 - // suspend until all are retrieved - reposInfo.foreach { repository => - collector += repository.?.performAnalysis - collectedRepositories = collectedRepositories + 1 - } - (0 until collectedRepositories).map { _ => - val report = collector.results.read().?.awaitResult.? - updateResults(report) - report + val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName) // 1 + var allReports = Seq[RepositoryReport]() + var futures = Seq[Future[Unit]]() + reposInfo.foreach { repository => // 2 + futures = futures :+ Future: // 3 + val report = repository.?.performAnalysis.awaitResult.? + updateResults(report) + allReports = allReports :+ report } + futures.awaitAllOrCancel // 4 + allReports ``` -Note in this implementation the `foreach` method has been used to iterate over all the returned `TerminableChannel` as soon as they are retrieved by the service and start the analysis in a corresponding `Future`. These are gathered in a `MutableCollector` (a mutable version of the previous `Collector`) and their results are read from the channel as they come. +1. we get the channel of repositories from the repository service; +2. the `foreach` method of `TerminableChannel` is used to iterate over all the repositories sent over the channel as soon as they are retrieved by the service. This is a blocking operation, i.e. it suspends until all the repositories are retrieved; +3. we start the analysis in a separate `Future` (i.e. thread): this allows you to start the analysis as soon as a repository is fetched by the channel, preventing starting the analysis of the next repository only when the previous one is finished; +4. once all the repositories are retrieved, i.e. the `foreach` terminates, we wait for the completion of all the started `Future`s. Indeed, when the `foreach` terminates, we have the guarantee that all started futures have been started, but not yet completed! + +--- + +To start the application: -Despite the improvement, this is not yet the best solution: as soon as the repositories are retrieved the corresponding analysis is started, but the update of the results is performed only when all the repositories have been analyzed. +```bash +./gradlew analyzer-direct:run +``` + +--- ### Kotlin Coroutines version -The analyzer interface reflects the Scala one: a `Result` in place of `Either` is used, and the suspendable function `udateResults` is marked with the `suspend` keyword in place of the `using Async` context. +The analyzer interface reflects the Scala Gears one: a `Result` is used in place of `Either`, and the suspendable function `udateResults` is marked with the `suspend` keyword in place of the `using Async` context. ```kotlin interface Analyzer { @@ -388,7 +396,7 @@ interface Analyzer { Its channel-based implementation, despite syntactic differences, is also very similar to that of Scala Gears, at least conceptually: -1. get all the repositories; +1. we get all the repositories; 2. for each of them, an analysis is started to retrieve the contributors and the last release; * each analysis is started in a separate coroutine whose results are sent to a channel; * as usual, the contributors and the last release are retrieved concurrently, using the `async` coroutine builder; @@ -439,7 +447,21 @@ Where, instead, Kotlin Coroutines shine is the implementation of the `Repository Indeed, Kotlin has a built-in support for cold streams, called **`Flow`**. They are very similar (actually they have been inspired to) cold observable in reactive programming, and **they are the perfect fit for functions that need to return a stream of asynchronously computed values**. -They offer several useful operators for transforming and combining them functionally. An overview of the most common operators is provided in the following section. +The `RepositoryService` has been here extended with new methods, `flowing***`, returning a `Flow` of results: + +```kotlin +class GitHubRepositoryProvider { + + fun flowingRepositoriesOf(organizationName: String): Flow> + + fun flowingContributorsOf(organizationName: String, repositoryName: String): Flow> +} +``` + +As already mentioned, the `Flow` is a **cold stream**, meaning that it is **not** started until it is **`collect`ed**. Once the `collect` method is called a new stream is created and data starts to "flow". + + +They offer several useful operators for transforming and combining them functionally (not a complete list): {{< columns >}} @@ -469,21 +491,9 @@ They offer several useful operators for transforming and combining them function - `combine` - `flatMapConcat` / `flatMapMerge` to transform each value into a flow and then concatenate/merge them; - {{< /columns >}} -The `RepositoryService` has been here extended with new methods, `flowing***`, returning a `Flow` of results: - -```kotlin -class GitHubRepositoryProvider { - - fun flowingRepositoriesOf(organizationName: String): Flow> - - fun flowingContributorsOf(organizationName: String, repositoryName: String): Flow> -} -``` - -As already mentioned, the `Flow` is a **cold stream**, meaning that it is **not** started until it is **`collect`ed**. Once the `collect` is called a new cold stream is created and data starts to "flow". +Moreover, like in Rx, it is possible to control the context in which the flow is executed using the `flowOn` operator, which changes the context for all the steps above it (so it is typically used as the last step in a function). ```kotlin override suspend fun analyze( @@ -491,9 +501,9 @@ override suspend fun analyze( updateResults: suspend (RepositoryReport) -> Unit, ): Result> = coroutineScope { runCatching { - val reports = provider.flowingRepositoriesOf(organizationName) // 1 - .flatMapConcat { analyzeAll(it) } // 2 - .flowOn(Dispatchers.Default) // 3 + val reports = provider.flowingRepositoriesOf(organizationName) + .flatMapConcat { analyzeAll(it) } + .flowOn(Dispatchers.Default) var allReports = emptySet() // until here just "configuration" reports.collect { @@ -505,6 +515,91 @@ override suspend fun analyze( } ``` +## Introducing `Flow`s in Gears + +{{< hint info >}} + +A similar abstraction of Kotlin `Flow`s can be implemented in Scala Gears leveraging `Task`s and `TerminableChannel`s. +The following section describes the attempt made to implement it and what has been achieved. + +{{< /hint >}} + +- When building the `Flow`, the client provides a block of code through which emits values, which is wrapped inside a `Task` that is started only when the `collect` method is called; +- The values are sent (emitted) on a `TerminableChannel` which is created when the `collect` method is called; + - the behavior of the `emit` method is defined inside the `apply` method of `Flow` and injected inside caller code via the context parameter `(it: FlowCollector[T]) ?=>`. +- Once the task has finished, the channel is terminated. + +[[Source code can be found in `commons` submodule, `pimpimg` package](https://github.com/tassiLuca/PPS-22-direct-style-experiments/blob/master/commons/src/main/scala/io/github/tassiLuca/pimping/Flow.scala).] + +```scala +/** An asynchronous cold data stream that emits values, inspired to Kotlin Flows. */ +trait Flow[+T]: + + /** Start the flowing of data which can be collected reacting through the given [[collector]] function. */ + def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit + +/** An interface modeling an entity capable of [[emit]]ting [[Flow]]s values. */ +trait FlowCollector[-T]: + + /** Emits a value to the flow. */ + def emit(value: T)(using Async): Unit + +object Flow: + + /** Creates a new asynchronous cold [[Flow]] from the given [[body]]. + * Since it is cold, it starts emitting values only when the [[Flow.collect]] method is called. + * To emit a value use the [[FlowCollector]] given instance. + */ + def apply[T](body: (it: FlowCollector[T]) ?=> Unit): Flow[T] = + val flow = FlowImpl[T]() + flow.task = Task: + val channel = flow.channel + flow.sync.release() + val collector: FlowCollector[T] = new FlowCollector[T]: + override def emit(value: T)(using Async): Unit = channel.send(Success(value)) + try body(using collector) + catch case e: Exception => channel.send(Failure(e)) + flow + + private class FlowImpl[T] extends Flow[T]: + private[Flow] var task: Task[Unit] = uninitialized + private[Flow] var channel: TerminableChannel[Try[T]] = uninitialized + private[Flow] val sync = Semaphore(0) + + override def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit = + val myChannel = TerminableChannel.ofUnbounded[Try[T]] + synchronized: + channel = myChannel + task.run.onComplete(() => myChannel.terminate()) + // Ensure to leave the synchronized block after the task has been initialized + // with the correct channel instance. + sync.acquire() + myChannel.foreach(t => collector(t)) +``` + +`map` and `flatMap` have been implemented on top of `Flow`: + +```scala +object FlowOps: + + extension [T](flow: Flow[T]) + + /** @return a new [[Flow]] whose values has been transformed according to [[f]]. */ + def map[R](f: T => R): Flow[R] = new Flow[R]: + override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit = + catchFailure(collector): + flow.collect(item => collector(Success(f(item.get)))) + + /** @return a new [[Flow]] whose values are created by flattening the flows generated + * by the given function [[f]] applied to each emitted value of this. + */ + def flatMap[R](f: T => Flow[R]): Flow[R] = new Flow[R]: + override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit = + catchFailure(collector): + flow.collect(item => f(item.get).collect(x => collector(Success(x.get)))) +``` + + ## Conclusions > - `Channel`s are the basic communication and synchronization primitive for exchanging data between `Future`s/`Coroutines`.