Skip to content

Commit

Permalink
feat(flows): add map and flatMap operators
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Mar 3, 2024
1 parent f1b9d32 commit 10ef545
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ private class IncrementalAnalyzer(repositoryService: RepositoryService) extends
)(using Async): Either[String, Seq[RepositoryReport]] = either:
val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName)
var allReports = Seq[RepositoryReport]()
var fs = Seq[Future[Unit]]()
var futures = Seq[Future[Unit]]()
reposInfo.foreach { repository =>
val f = Future:
futures = futures :+ Future:
val report = repository.?.performAnalysis.awaitResult.?
updateResults(report)
allReports = allReports :+ report
fs = fs :+ f
}
fs.awaitAllOrCancel
futures.awaitAllOrCancel
allReports
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.github.tassiLuca.analyzer.lib

import gears.async.{Async, ReadableChannel}
import gears.async.Async
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
import io.github.tassiLuca.pimping.TerminableChannel

Expand Down
41 changes: 38 additions & 3 deletions commons/src/main/scala/io/github/tassiLuca/pimping/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,25 @@ import scala.compiletime.uninitialized
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

/** An asynchronous cold data stream that emits values, inspired to Kotlin Flows. */
trait Flow[+T]:

/** Start the flowing of data which can be collected reacting through the given [[collector]] function. */
def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit

/** An interface modeling an entity capable of [[emit]]ting [[Flow]]s values. */
trait FlowCollector[-T]:

/** Emits a value to the flow. */
def emit(value: T)(using Async): Unit

object Flow:
def apply[T: ClassTag](body: FlowCollector[T] ?=> Unit)(using Async): Flow[T] =

/** Creates a new asynchronous cold [[Flow]] from the given [[body]].
* Since it is cold, it starts emitting values only when the [[Flow.collect]] method is called.
* To emit a value use the [[FlowCollector]] given instance.
*/
def apply[T](body: (it: FlowCollector[T]) ?=> Unit): Flow[T] =
val flow = FlowImpl[T]()
flow.task = Task:
val channel = flow.channel
Expand All @@ -26,7 +37,7 @@ object Flow:
catch case e: Exception => channel.send(Failure(e))
flow

private class FlowImpl[T: ClassTag] extends Flow[T]:
private class FlowImpl[T] extends Flow[T]:
private[Flow] var task: Task[Unit] = uninitialized
private[Flow] var channel: TerminableChannel[Try[T]] = uninitialized
private[Flow] val sync = Semaphore(0)
Expand All @@ -36,5 +47,29 @@ object Flow:
synchronized:
channel = myChannel
task.run.onComplete(() => myChannel.terminate())
// Ensure to leave the synchronized block after the task has been initialized
// with the correct channel instance.
sync.acquire()
myChannel.foreach(t => collector(t)) // blocking!
myChannel.foreach(t => collector(t))

object FlowOps:

extension [T](flow: Flow[T])

/** @return a new [[Flow]] whose values has been transformed according to [[f]]. */
def map[R](f: T => R): Flow[R] = new Flow[R]:
override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit =
catchFailure(collector):
flow.collect(item => collector(Success(f(item.get))))

/** @return a new [[Flow]] whose values are created by flattening the flows generated
* by the given function [[f]] applied to each emitted value of this.
*/
def flatMap[R](f: T => Flow[R]): Flow[R] = new Flow[R]:
override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit =
catchFailure(collector):
flow.collect(item => f(item.get).collect(x => collector(Success(x.get))))

private inline def catchFailure[X](collector: Try[X] => Unit)(inline body: => Unit): Unit =
try body
catch case e: Exception => collector(Failure(e))
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ object TerminableChannel:

override def close(): Unit = c.close()

override def terminate()(using Async): Unit = uninterruptible:
override def terminate()(using Async): Unit =
try send(Terminated)
// It happens only at the close of the channel due to the call (inside Gears library) of
// a CellBuf.dequeue(channels.scala:239) which is empty!
catch case e: NoSuchElementException => e.printStackTrace()
catch case _: NoSuchElementException => () // e.printStackTrace()

object TerminableChannelOps:

Expand Down
96 changes: 83 additions & 13 deletions commons/src/test/scala/io/github/tassiLuca/pimping/FlowTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import gears.async.default.given
import gears.async.{Async, AsyncOperations, Future}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import io.github.tassiLuca.pimping.FlowOps.{map, flatMap}

import scala.collection
import scala.collection.immutable
import scala.util.{Try, Success, Failure}
import scala.util.{Try, Success}

class FlowTest extends AnyFunSpec with Matchers:

Expand All @@ -21,9 +22,7 @@ class FlowTest extends AnyFunSpec with Matchers:
val flow = simpleFlow
AsyncOperations.sleep(2_000)
emitted should be(empty)
flow.collect { value =>
emitted = emitted :+ value
}
flow.collect(value => emitted = emitted :+ value)
emitted shouldBe Seq.range(0, items).map(Success(_))
}

Expand All @@ -32,8 +31,8 @@ class FlowTest extends AnyFunSpec with Matchers:
var emitted2: Seq[Try[Item]] = Seq()
Async.blocking:
val flow = simpleFlow
flow.collect { value => emitted1 = emitted1 :+ value }
flow.collect { value => emitted2 = emitted2 :+ value }
flow.collect(value => emitted1 = emitted1 :+ value)
flow.collect(value => emitted2 = emitted2 :+ value)
emitted1 shouldBe Seq.range(0, items).map(Success(_))
emitted2 shouldBe Seq.range(0, items).map(Success(_))
}
Expand All @@ -44,9 +43,9 @@ class FlowTest extends AnyFunSpec with Matchers:
var emitted2: Seq[Try[Item]] = Seq()
val flow = simpleFlow
val f1 = Future:
flow.collect { value => emitted1 = emitted1 :+ value }
flow.collect(value => emitted1 = emitted1 :+ value)
val f2 = Future:
flow.collect { value => emitted2 = emitted2 :+ value }
flow.collect(value => emitted2 = emitted2 :+ value)
(f1 :: f2 :: Nil).awaitAll
emitted1 shouldBe Seq.range(0, items).map(Success(_))
emitted2 shouldBe Seq.range(0, items).map(Success(_))
Expand All @@ -55,7 +54,7 @@ class FlowTest extends AnyFunSpec with Matchers:
it("when throwing an exception inside the `body` should emit a failure and stop flowing") {
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
failingFlow.collect { value => emitted = emitted :+ value }
failingFlow.collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
Expand All @@ -65,16 +64,87 @@ class FlowTest extends AnyFunSpec with Matchers:
Async.blocking:
var fs = Seq[Future[Int]]()
simpleFlow.collect { v =>
fs = fs :+ Future { AsyncOperations.sleep(2_000); v.getOrElse(-1) }
fs = fs :+ Future:
AsyncOperations.sleep(2_000)
v.getOrElse(-1)
}
fs.awaitAll
fs.map(_.await) shouldBe Seq.range(0, items)
}

it("allows to cancel the task execution inside a collect") {
Async.blocking:
val longLastingFlow = Flow:
(0 until items).foreach { x =>
AsyncOperations.sleep(1_000); it.emit(x)
}
longLastingFlow.collect(v => if v == Success(3) then Async.current.group.cancel() else println(v))
}

describe("Flows `map`") {
it("should work") {
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
val f = (x: Item) => x * x
simpleFlow.map(f).collect(value => emitted = emitted :+ value)
emitted shouldBe Seq.range(0, items).map(f).map(Success(_))
}

it("should emit a `Failure` if an exception is thrown in the original flow") {
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
failingFlow.map(_ * 2).collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
}

it("should emit a `Failure` if an exception is thrown in the given function") {
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
val failingFunction = (_: Item) => throw IllegalStateException()
simpleFlow.map(failingFunction).collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
}
}

describe("Flows `flatMap`") {
it("should work") {
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
val f = (x: Item) => Flow { it.emit(x); it.emit(x + 1) }
alternatingFlow.flatMap(f).collect(value => emitted = emitted :+ value)
emitted shouldBe Seq.range(0, items).map(Success(_))
}

it("should emit a `Failure` if an exception is thrown in the original flow") {
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
failingFlow.flatMap(_ => alternatingFlow).collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
}

it("should emit a `Failure` if an exception is thrown in the given function") {
Async.blocking:
var emitted: Seq[Try[Item]] = Seq()
failingFlow.flatMap(_ => throw IllegalStateException()).collect(value => emitted = emitted :+ value)
emitted.size shouldBe 1
emitted.head.isFailure shouldBe true
intercept[IllegalStateException](emitted.head.get)
}
}
}

def simpleFlow(using Async): Flow[Item] = Flow:
(0 until items).foreach(i => summon[FlowCollector[Item]].emit(i))
(0 until items).foreach(it.emit(_))

def alternatingFlow(using Async): Flow[Item] = Flow:
(0 until items by 2).foreach(it.emit(_))

def failingFlow(using Async): Flow[Item] = Flow:
throw IllegalStateException("Something went wrong...")
summon[FlowCollector[Item]].emit(10)
throw IllegalStateException()
it.emit(10)

0 comments on commit 10ef545

Please sign in to comment.