diff --git a/.idea/.gitignore b/.idea/.gitignore
index 13566b81..a9d7db9c 100644
--- a/.idea/.gitignore
+++ b/.idea/.gitignore
@@ -6,3 +6,5 @@
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
+# GitHub Copilot persisted chat sessions
+/copilot/chatSessions
diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml
index 8d81632f..fe63bb67 100644
--- a/.idea/kotlinc.xml
+++ b/.idea/kotlinc.xml
@@ -1,6 +1,6 @@
-
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
index e9b9da12..7b9b47d6 100644
--- a/.idea/modules.xml
+++ b/.idea/modules.xml
@@ -5,23 +5,10 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/modules/analyzer-commons/direct-style-experiments.analyzer-commons.main.iml b/.idea/modules/analyzer-commons/direct-style-experiments.analyzer-commons.main.iml
index 0bed05a7..aa1ae7b4 100644
--- a/.idea/modules/analyzer-commons/direct-style-experiments.analyzer-commons.main.iml
+++ b/.idea/modules/analyzer-commons/direct-style-experiments.analyzer-commons.main.iml
@@ -3,7 +3,6 @@
-
\ No newline at end of file
diff --git a/.idea/modules/commons/direct-style-experiments.commons.main.iml b/.idea/modules/commons/direct-style-experiments.commons.main.iml
index 11670010..8046ae9e 100644
--- a/.idea/modules/commons/direct-style-experiments.commons.main.iml
+++ b/.idea/modules/commons/direct-style-experiments.commons.main.iml
@@ -3,7 +3,6 @@
-
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
index 44b0b39c..92a317a9 100644
--- a/.idea/vcs.xml
+++ b/.idea/vcs.xml
@@ -3,5 +3,6 @@
+
\ No newline at end of file
diff --git a/README.md b/README.md
index b27aa557..cd0a5f3d 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Direct style for Functional Reactive Programming: an analysis in Scala & Kotlin
+# Analysis of direct style (for asynchronous reactive programming) in Scala
## Goals of the project
diff --git a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/client/AppController.scala b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/client/AppController.scala
index d539cb4f..049e65a6 100644
--- a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/client/AppController.scala
+++ b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/client/AppController.scala
@@ -6,9 +6,9 @@ import io.github.tassiLuca.analyzer.commons.lib.RepositoryReport
import io.github.tassiLuca.analyzer.lib.{Analyzer, RepositoryService}
object AppController:
- def direct(using Async, AsyncOperations): AppController = DirectAppController()
+ def direct(using Async.Spawn, AsyncOperations): AppController = DirectAppController()
- private class DirectAppController(using Async, AsyncOperations) extends AppController:
+ private class DirectAppController(using Async.Spawn, AsyncOperations) extends AppController:
private val view = AnalyzerView.gui(this)
private val analyzer = Analyzer.incremental(RepositoryService.ofGitHub())
private var currentComputation: Option[Future[Unit]] = None
@@ -18,10 +18,10 @@ object AppController:
override def runSession(organizationName: String): Unit =
var organizationReport: OrganizationReport = (Map(), Set())
val f = Future:
- analyzer.analyze(organizationName) { report =>
+ analyzer.analyze(organizationName): report =>
organizationReport = organizationReport.mergeWith(report)
view.update(organizationReport)
- } match { case Left(e) => view.error(e); case _ => view.endComputation() }
+ match { case Left(e) => view.error(e); case _ => view.endComputation() }
currentComputation = Some(f)
override def stopSession(): Unit = currentComputation.foreach(_.cancel())
diff --git a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/Analyzer.scala b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/Analyzer.scala
index 3bc88f29..80ee69e4 100644
--- a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/Analyzer.scala
+++ b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/Analyzer.scala
@@ -25,3 +25,5 @@ object Analyzer:
* making the computation faster when dealing with a high number of repositories.
*/
def incremental(service: RepositoryService): Analyzer = IncrementalAnalyzer(service)
+
+ def flowing(service: RepositoryService): Analyzer = FlowingAnalyzer(service)
diff --git a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/Analyzers.scala b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/Analyzers.scala
index 12844c13..a4a11d10 100644
--- a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/Analyzers.scala
+++ b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/Analyzers.scala
@@ -8,6 +8,7 @@ import io.github.tassiLuca.dse.boundaries.either
import io.github.tassiLuca.dse.boundaries.either.?
import io.github.tassiLuca.dse.pimping.TerminableChannelOps.foreach
import io.github.tassiLuca.dse.pimping.asTry
+import io.github.tassiLuca.dse.pimping.FlowOps.{map, toSeq}
import scala.util.boundary.Label
@@ -15,32 +16,49 @@ abstract class AbstractAnalyzer(repositoryService: RepositoryService) extends An
extension (r: Repository)
protected def performAnalysis(using Async): Task[RepositoryReport] = Task:
- val contributions = Future:
- repositoryService.contributorsOf(r.organization, r.name)
- val release = repositoryService.lastReleaseOf(r.organization, r.name)
- RepositoryReport(r.name, r.issues, r.stars, contributions.await.getOrElse(Seq()), release.toOption)
+ Async.group:
+ val contributions = Future:
+ repositoryService.contributorsOf(r.organization, r.name)
+ val release = repositoryService.lastReleaseOf(r.organization, r.name)
+ RepositoryReport(r.name, r.issues, r.stars, contributions.await.getOrElse(Seq()), release.toOption)
private class BasicAnalyzer(repositoryService: RepositoryService) extends AbstractAnalyzer(repositoryService):
override def analyze(organizationName: String)(
updateResults: RepositoryReport => Unit,
)(using Async, AsyncOperations): Either[String, Seq[RepositoryReport]] = either:
- val reposInfo = repositoryService.repositoriesOf(organizationName).?.map(_.performAnalysis.run)
- val collector = Collector[RepositoryReport](reposInfo.toList*)
- reposInfo.foreach(_ => updateResults(collector.results.read().asTry.?.awaitResult.?))
- reposInfo.awaitAll
+ Async.group:
+ val reposInfo = repositoryService.repositoriesOf(organizationName).?
+ .map(_.performAnalysis.start())
+ val collector = Collector[RepositoryReport](reposInfo.toList*)
+ reposInfo.foreach: _ =>
+ updateResults(collector.results.read().asTry.?.awaitResult.?)
+ reposInfo.awaitAll
private class IncrementalAnalyzer(repositoryService: RepositoryService) extends AbstractAnalyzer(repositoryService):
override def analyze(organizationName: String)(
updateResults: RepositoryReport => Unit,
)(using Async, AsyncOperations): Either[String, Seq[RepositoryReport]] = either:
- val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName)
- var futures = Seq[Future[RepositoryReport]]()
- reposInfo.foreach { repository =>
- futures = futures :+ Future:
- val report = repository.?.performAnalysis.run.awaitResult.?
- synchronized(updateResults(report))
- report
- }
- futures.awaitAll
+ Async.group:
+ val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName)
+ var futureResults = Seq[Future[RepositoryReport]]()
+ reposInfo.foreach: repository =>
+ futureResults = futureResults :+ Future:
+ val report = repository.?.performAnalysis.start().awaitResult.?
+ synchronized(updateResults(report))
+ report
+ futureResults.awaitAllOrCancel
+
+private class FlowingAnalyzer(repositoryService: RepositoryService) extends AbstractAnalyzer(repositoryService):
+
+ override def analyze(organizationName: String)(
+ updateResults: RepositoryReport => Unit,
+ )(using Async, AsyncOperations): Either[String, Seq[RepositoryReport]] = either:
+ Async.group:
+ repositoryService.flowingRepositoriesOf(organizationName).map: repository =>
+ Future:
+ val report = repository.performAnalysis.start().awaitResult.?
+ synchronized(updateResults(report))
+ report
+ .toSeq.?.awaitAllOrCancel
diff --git a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/GitHubRepositoryService.scala b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/GitHubRepositoryService.scala
index 39910498..af04f4b1 100644
--- a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/GitHubRepositoryService.scala
+++ b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/GitHubRepositoryService.scala
@@ -1,15 +1,15 @@
package io.github.tassiLuca.analyzer.lib
-import gears.async.{Async, Future, Listener, ReadableChannel, UnboundedChannel}
+import gears.async.{Async, Future}
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
-import io.github.tassiLuca.dse.pimping.{TerminableChannel, Terminated}
+import io.github.tassiLuca.dse.pimping.{Flow, TerminableChannel}
import scala.annotation.tailrec
private class GitHubRepositoryService extends RepositoryService:
import sttp.model.Uri
- import sttp.client3.{SimpleHttpClient, UriContext, basicRequest, Response}
+ import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest, Response}
import upickle.default.{read, Reader}
private val baseUrl = "https://api.github.com"
@@ -20,9 +20,12 @@ private class GitHubRepositoryService extends RepositoryService:
override def incrementalRepositoriesOf(
organizationName: String,
- )(using Async): TerminableChannel[Either[String, Repository]] =
+ )(using Async.Spawn): TerminableChannel[Either[String, Repository]] =
incrementalPaginatedRequest(uri"$baseUrl/orgs/$organizationName/repos")
+ override def flowingRepositoriesOf(organizationName: String)(using Async): Flow[Repository] =
+ flowingPaginatedRequest(uri"$baseUrl/orgs/$organizationName/repos")
+
override def contributorsOf(
organizationName: String,
repositoryName: String,
@@ -32,9 +35,9 @@ private class GitHubRepositoryService extends RepositoryService:
override def incrementalContributorsOf(
organizationName: String,
repositoryName: String,
- )(using Async): TerminableChannel[Either[String, Contribution]] =
+ )(using Async.Spawn): TerminableChannel[Either[String, Contribution]] =
incrementalPaginatedRequest(uri"$baseUrl/repos/$organizationName/$repositoryName/contributors")
-
+
override def lastReleaseOf(organizationName: String, repositoryName: String)(using Async): Either[String, Release] =
plainRequest[Release](uri"$baseUrl/repos/$organizationName/$repositoryName/releases/latest")
@@ -54,7 +57,7 @@ private class GitHubRepositoryService extends RepositoryService:
private def incrementalPaginatedRequest[T](
endpoint: Uri,
- )(using Reader[T], Async): TerminableChannel[Either[String, T]] =
+ )(using Reader[T], Async.Spawn): TerminableChannel[Either[String, T]] =
val channel = TerminableChannel.ofUnbounded[Either[String, T]]
@tailrec
def withPagination(next: Option[Uri]): Unit = next match
@@ -69,9 +72,24 @@ private class GitHubRepositoryService extends RepositoryService:
Future(withPagination(Some(endpoint)))
channel
+ private def flowingPaginatedRequest[T](endpoint: Uri)(using Reader[T], Async): Flow[T] = Flow:
+ @tailrec
+ def withPagination(next: Option[Uri]): Unit = next match
+ case None => ()
+ case Some(uri) =>
+ val response = doRequest(uri)
+ response.body.map(read[Seq[T]](_)).fold(
+ errorMessage => failWith(errorMessage),
+ results => results.foreach(it.emit(_)),
+ )
+ withPagination(nextPage(response))
+ withPagination(Some(endpoint))
+
private def doRequest(endpoint: Uri): Response[Either[String, String]] =
- SimpleHttpClient().send(request.get(endpoint))
+ HttpClientSyncBackend().send(request.get(endpoint))
private def nextPage(response: Response[Either[String, String]]): Option[Uri] = response.headers("Link")
.flatMap(_.split(",")).find(_.contains("rel=\"next\""))
.map(_.takeWhile(_ != ';').trim.stripPrefix("<").stripSuffix(">")).flatMap(Uri.parse(_).toOption)
+
+ private def failWith(errorMessage: String): Nothing = throw Exception(errorMessage)
diff --git a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/RepositoryService.scala b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/RepositoryService.scala
index a7787d2a..daabe9b6 100644
--- a/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/RepositoryService.scala
+++ b/analyzer-direct/src/main/scala/io/github/tassiLuca/analyzer/lib/RepositoryService.scala
@@ -2,7 +2,7 @@ package io.github.tassiLuca.analyzer.lib
import gears.async.Async
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
-import io.github.tassiLuca.dse.pimping.TerminableChannel
+import io.github.tassiLuca.dse.pimping.{Flow, TerminableChannel}
/** A service exposing functions to retrieve data from a central hosting repository service. */
trait RepositoryService:
@@ -17,7 +17,9 @@ trait RepositoryService:
*/
def incrementalRepositoriesOf(
organizationName: String,
- )(using Async): TerminableChannel[Either[String, Repository]]
+ )(using Async.Spawn): TerminableChannel[Either[String, Repository]]
+
+ def flowingRepositoriesOf(organizationName: String)(using Async): Flow[Repository]
/** @return [[Right]] with the [[Seq]]uence of [[Contribution]] for the given [[repositoryName]] owned by
* the given [[organizationName]] or a [[Left]] with a explanatory message in case of errors.
@@ -30,7 +32,7 @@ trait RepositoryService:
def incrementalContributorsOf(
organizationName: String,
repositoryName: String,
- )(using Async): TerminableChannel[Either[String, Contribution]]
+ )(using Async.Spawn): TerminableChannel[Either[String, Contribution]]
/** @return a [[Right]] with the last [[Release]] of the given [[repositoryName]] owned by [[organizationName]]
* if it exists, or a [[Left]] with a explanatory message in case of errors.
diff --git a/analyzer-direct/src/test/scala/io/github/tassiLuca/analyzer/lib/AnalyzerTest.scala b/analyzer-direct/src/test/scala/io/github/tassiLuca/analyzer/lib/AnalyzerTest.scala
index 61f5597d..25200865 100644
--- a/analyzer-direct/src/test/scala/io/github/tassiLuca/analyzer/lib/AnalyzerTest.scala
+++ b/analyzer-direct/src/test/scala/io/github/tassiLuca/analyzer/lib/AnalyzerTest.scala
@@ -18,9 +18,7 @@ abstract class AnalyzerTest extends AnyFlatSpec with Matchers with MockFactory:
"Analyzer" should "return the correct results if given in input an existing organization" in {
var incrementalResults = Set[RepositoryReport]()
Async.blocking:
- val allResults = successfulService.analyze("dse") { report =>
- incrementalResults += report
- }
+ val allResults = successfulService.analyze("dse")(incrementalResults += _)
incrementalResults shouldBe expectedResults
allResults.isRight shouldBe true
allResults.foreach(_ should contain theSameElementsAs expectedResults)
@@ -29,9 +27,7 @@ abstract class AnalyzerTest extends AnyFlatSpec with Matchers with MockFactory:
"Analyzer" should "return a failure in case the given organization doesn't exists" in {
var incrementalResults = Set[RepositoryReport]()
Async.blocking:
- val allResults = failingService.analyze("non-existing") { report =>
- incrementalResults += report
- }
+ val allResults = failingService.analyze("non-existing")(incrementalResults += _)
incrementalResults shouldBe empty
allResults.isLeft shouldBe true
}
@@ -45,12 +41,13 @@ abstract class AnalyzerTest extends AnyFlatSpec with Matchers with MockFactory:
def successfulService(using Async): Analyzer =
val gitHubService: RepositoryService = mock[RepositoryService]
registerSuccessfulRepositoriesResult(gitHubService)
- dummiesData.foreach { (repo, data) =>
- when(gitHubService.contributorsOf(_: String, _: String)(using _: Async)).expects(repo.organization, repo.name, *)
+ dummiesData.foreach: (repo, data) =>
+ when(gitHubService.contributorsOf(_: String, _: String)(using _: Async))
+ .expects(repo.organization, repo.name, *)
.returning(Right(data._1))
- when(gitHubService.lastReleaseOf(_: String, _: String)(using _: Async)).expects(repo.organization, repo.name, *)
+ when(gitHubService.lastReleaseOf(_: String, _: String)(using _: Async))
+ .expects(repo.organization, repo.name, *)
.returning(data._2.toRight("404, not found"))
- }
analyzerProvider(gitHubService)
def registerSuccessfulRepositoriesResult(service: RepositoryService)(using Async): Any
@@ -68,10 +65,14 @@ class BasicAnalyzerTest extends AnalyzerTest:
override val analyzerProvider: RepositoryService => Analyzer = Analyzer.basic
override def registerSuccessfulRepositoriesResult(service: RepositoryService)(using Async): Any =
- when(service.repositoriesOf(_: String)(using _: Async)).expects("dse", *).returning(Right(dummiesData.keys.toSeq))
+ when(service.repositoriesOf(_: String)(using _: Async))
+ .expects("dse", *)
+ .returning(Right(dummiesData.keys.toSeq))
override def registerFailureRepositoriesResult(service: RepositoryService)(using Async): Any =
- when(service.repositoriesOf(_: String)(using _: Async)).expects("non-existing", *).returning(Left("404, not found"))
+ when(service.repositoriesOf(_: String)(using _: Async))
+ .expects("non-existing", *)
+ .returning(Left("404, not found"))
end BasicAnalyzerTest
class IncrementalAnalyzerTest extends AnalyzerTest:
@@ -82,12 +83,14 @@ class IncrementalAnalyzerTest extends AnalyzerTest:
val repositoriesResult = TerminableChannel.ofUnbounded[Either[String, Repository]]
dummiesData.keys.foreach(repo => repositoriesResult.send(Right(repo)))
repositoriesResult.terminate()
- when(service.incrementalRepositoriesOf(_: String)(using _: Async)).expects("dse", *).returning(repositoriesResult)
+ when(service.incrementalRepositoriesOf(_: String)(using _: Async.Spawn))
+ .expects("dse", *).returning(repositoriesResult)
override def registerFailureRepositoriesResult(service: RepositoryService)(using Async): Any =
val repositoriesResult = TerminableChannel.ofUnbounded[Either[String, Repository]]
repositoriesResult.send(Left("404, not found"))
repositoriesResult.terminate()
- when(service.incrementalRepositoriesOf(_: String)(using _: Async)).expects("non-existing", *)
+ when(service.incrementalRepositoriesOf(_: String)(using _: Async.Spawn))
+ .expects("non-existing", *)
.returning(repositoriesResult)
end IncrementalAnalyzerTest
diff --git a/analyzer-direct/src/test/scala/io/github/tassiLuca/analyzer/lib/GitHubServiceTest.scala b/analyzer-direct/src/test/scala/io/github/tassiLuca/analyzer/lib/GitHubServiceTest.scala
index a13b3b04..a8ad780f 100644
--- a/analyzer-direct/src/test/scala/io/github/tassiLuca/analyzer/lib/GitHubServiceTest.scala
+++ b/analyzer-direct/src/test/scala/io/github/tassiLuca/analyzer/lib/GitHubServiceTest.scala
@@ -1,6 +1,7 @@
package io.github.tassiLuca.analyzer.lib
import gears.async.Async
+import gears.async.AsyncOperations.sleep
import gears.async.default.given
import io.github.tassiLuca.analyzer.commons.lib.Repository
import io.github.tassiLuca.dse.pimping.TerminableChannelOps.toSeq
@@ -82,6 +83,45 @@ class GitHubServiceTest extends AnyFunSpec with Matchers {
}
}
+ describe("with flowing results"):
+ describe("when asked for repositories"):
+ it("of an existing organization should return all of them"):
+ var repos: Seq[Repository] = Seq.empty
+ Async.blocking:
+ val reposFlow = gitHubService.flowingRepositoriesOf(organization)
+ reposFlow.collect: r =>
+ r.isSuccess shouldBe true
+ repos = repos :+ r.get
+ repos.size should be > defaultNumberOfResultsPerPage
+ repos.foreach(_.organization shouldBe organization)
+ repos.count(_.name == repository) shouldBe 1
+
+ it("of a non-existing organization should fail"):
+ Async.blocking:
+ val reposFlow = gitHubService.flowingRepositoriesOf(nonExistingOrganization)
+ reposFlow.collect:
+ _.isFailure shouldBe true
+
+ it("for showcasing / 1") {
+ Async.blocking:
+ val reposFlow = gitHubService.flowingRepositoriesOf(organization)
+ log("Still not collecting...")
+ sleep(1000)
+ log("Starting collecting...")
+ reposFlow.collect(log)
+ log("Done!")
+ }
+
+ it("for showcasing / 2") {
+ Async.blocking:
+ val reposFlow = gitHubService.flowingRepositoriesOf(nonExistingOrganization)
+ log("Still not collecting...")
+ sleep(1000)
+ log("Starting collecting...")
+ reposFlow.collect(log)
+ log("Done!")
+ }
+
describe("when asked for the last release of an existing repository") {
it("should return it if it exists") {
Async.blocking:
@@ -96,4 +136,6 @@ class GitHubServiceTest extends AnyFunSpec with Matchers {
}
}
}
+
+ private def log(x: Any): Unit = println(s"[${System.currentTimeMillis()}] $x")
}
diff --git a/blog-ws-direct/src/main/scala/io/github/tassiLuca/dse/blog/PostsRepositoryComponent.scala b/blog-ws-direct/src/main/scala/io/github/tassiLuca/dse/blog/PostsRepositoryComponent.scala
index e4b57916..008e0881 100644
--- a/blog-ws-direct/src/main/scala/io/github/tassiLuca/dse/blog/PostsRepositoryComponent.scala
+++ b/blog-ws-direct/src/main/scala/io/github/tassiLuca/dse/blog/PostsRepositoryComponent.scala
@@ -3,7 +3,7 @@ package io.github.tassiLuca.dse.blog
import gears.async.Async
import io.github.tassiLuca.dse.blog.core.{PostsModel, simulates}
import io.github.tassiLuca.dse.boundaries.either
-import io.github.tassiLuca.dse.boundaries.either.{?, left}
+import io.github.tassiLuca.dse.boundaries.either.{?, leave}
/** The component exposing blog posts repositories. */
trait PostsRepositoryComponent:
@@ -34,7 +34,7 @@ trait PostsRepositoryComponent:
private var posts: Set[Post] = Set()
override def save(post: Post)(using Async): Either[String, Post] = either:
- if exists(post.title).? then left("A post with same title has already been saved")
+ if exists(post.title).? then leave("A post with same title has already been saved")
"PostsRepository".simulates(s"saving post '${post.title}'")
synchronized { posts = posts + post }
post
diff --git a/blog-ws-direct/src/main/scala/io/github/tassiLuca/dse/blog/PostsServiceComponent.scala b/blog-ws-direct/src/main/scala/io/github/tassiLuca/dse/blog/PostsServiceComponent.scala
index 2a3e0264..8e71a60a 100644
--- a/blog-ws-direct/src/main/scala/io/github/tassiLuca/dse/blog/PostsServiceComponent.scala
+++ b/blog-ws-direct/src/main/scala/io/github/tassiLuca/dse/blog/PostsServiceComponent.scala
@@ -5,7 +5,7 @@ import gears.async.{Async, Future, Task}
import io.github.tassiLuca.dse.blog.core.{PostsModel, simulates}
import io.github.tassiLuca.dse.boundaries.EitherConversions.given
import io.github.tassiLuca.dse.boundaries.either
-import io.github.tassiLuca.dse.boundaries.either.{?, left}
+import io.github.tassiLuca.dse.boundaries.either.{?, leave}
import java.util.Date
import scala.util.{Failure, Try}
@@ -40,12 +40,11 @@ trait PostsServiceComponent:
) extends PostsService:
override def create(authorId: AuthorId, title: Title, body: Body)(using Async): Either[String, Post] = either:
- if context.repository.exists(title).? then left(s"A post entitled $title already exists")
- val f = Future:
- val content = verifyContent(title, body).run
- val author = authorBy(authorId).run
- content.zip(author).await
- val (post, author) = f.awaitResult.?
+ if context.repository.exists(title).? then leave(s"A post entitled $title already exists")
+ val (post, author) = Async.group:
+ val content = verifyContent(title, body).start()
+ val author = authorBy(authorId).start()
+ content.zip(author).awaitResult.?
context.repository.save(Post(author.?, post.?._1, post.?._2, Date())).?
/* Pretending to make a call to the Authorship Service that keeps track of authorized authors. */
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/boundaries/either.scala b/commons/src/main/scala/io/github/tassiLuca/dse/boundaries/either.scala
index 0799c84d..696c41d4 100644
--- a/commons/src/main/scala/io/github/tassiLuca/dse/boundaries/either.scala
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/boundaries/either.scala
@@ -11,7 +11,7 @@ object either:
boundary(Right(body))
/** Quickly break to the enclosing boundary with a [[Left]] filled with [[l]]. */
- inline def left[L, R](l: L)(using Label[Left[L, Nothing]]): Nothing = break(Left(l))
+ inline def leave[L, R](l: L)(using Label[Left[L, R]]): R = break(Left(l))
extension [L, R](e: Either[L, R])
/** @return this [[Right]] value or break to the enclosing boundary with the [[Left]] value. */
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/effects/EffectsShowcase.scala b/commons/src/main/scala/io/github/tassiLuca/dse/effects/EffectsShowcase.scala
new file mode 100644
index 00000000..f081aac5
--- /dev/null
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/effects/EffectsShowcase.scala
@@ -0,0 +1,38 @@
+package io.github.tassiLuca.dse.effects
+
+import gears.async.AsyncOperations.sleep
+import gears.async.default.given
+import gears.async.{Async, AsyncOperations, Future}
+import io.github.tassiLuca.dse.boundaries.either
+import io.github.tassiLuca.dse.boundaries.either.{?, leave}
+import io.github.tassiLuca.dse.boundaries.EitherConversions.given
+
+import scala.util.boundary.Label
+
+object EffectsShowcase extends App:
+
+ type CanFail = Label[Left[String, Nothing]]
+
+ def f(using Async, CanFail): String =
+ Async.group:
+ val f1 = Future:
+ sleep(1_000)
+ "hello"
+ val f2 = Future:
+ "world"
+ f1.awaitResult.? + " " + f2.awaitResult.?
+
+ def g(f: => String): String =
+ "g: " + f
+
+ @main def useEffectfulComputation(): Unit =
+ Async.blocking:
+ print:
+ either:
+ f
+
+ @main def useG(): Unit =
+ Async.blocking:
+ print:
+ either:
+ g(f)
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/effects/Generator.scala b/commons/src/main/scala/io/github/tassiLuca/dse/effects/Generator.scala
new file mode 100644
index 00000000..9e24397c
--- /dev/null
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/effects/Generator.scala
@@ -0,0 +1,53 @@
+package io.github.tassiLuca.dse.effects
+
+import gears.async.VThreadSupport.{boundary, suspend}
+import io.github.tassiLuca.dse.effects.Tree.{Inner, Leaf}
+
+enum Tree[T]:
+ case Leaf(x: T)
+ case Inner(xs: List[Tree[T]])
+
+trait Generator[T]:
+ def nextOption: Option[T]
+
+// What in Koka we would call effect produce
+trait Produce[-T]:
+ def produce(x: T): Unit
+
+def generate[T](body: (it: Produce[T]) ?=> Unit) = new Generator[T]:
+ override def nextOption: Option[T] = step()
+
+ var step: () => Option[T] = () =>
+ boundary:
+ // what in Koka we would call handler
+ given Produce[T] with
+ override def produce(x: T): Unit =
+ suspend[Unit, Option[T]]: k =>
+ step = () => k.resume(())
+ Some(x)
+ body
+ None
+
+object Tree:
+ extension [T](t: Tree[T])
+ def leafs: Generator[T] = generate:
+ def recur(t: Tree[T]): Unit = t match
+ case Leaf(x) => it.produce(x)
+ case Inner(xs) => xs.foreach(recur)
+ recur(t)
+
+@main def useTree(): Unit =
+ /*
+ *
+ / | \
+ 1 * 5
+ / \
+ 2 3
+ */
+ val tree = Tree.Inner(
+ Leaf(1) :: Inner(
+ Leaf(2) :: Leaf(3) :: Nil
+ ) :: Leaf(5) :: Nil
+ )
+ val leafs = tree.leafs
+ for _ <- 0 to 4 do println(leafs.nextOption)
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/effects/SaferExceptions.scala b/commons/src/main/scala/io/github/tassiLuca/dse/effects/SaferExceptions.scala
new file mode 100644
index 00000000..ac147ac6
--- /dev/null
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/effects/SaferExceptions.scala
@@ -0,0 +1,23 @@
+package io.github.tassiLuca.dse.effects
+
+import language.experimental.saferExceptions
+import scala.annotation.experimental
+
+@experimental
+object SaferExceptions extends App:
+
+ class DivisionByZero extends Exception
+
+ // or equivalently, CanThrow[DivisionByZero] ?=> Int
+ def div(n: Int, m: Int)(using CanThrow[DivisionByZero]): Int = m match
+ case 0 => throw DivisionByZero()
+ case _ => n / m
+
+ println:
+ try div(10, 0)
+ catch case _: DivisionByZero => "Division by zero"
+
+ val values = (10, 1) :: (5, 2) :: (4, 2) :: (5, 1) :: Nil
+ println:
+ try values.map(div)
+ catch case _: DivisionByZero => "Division by zero"
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/examples/ShowcasingFlows.scala b/commons/src/main/scala/io/github/tassiLuca/dse/examples/ShowcasingFlows.scala
index 10bf7169..060c638d 100644
--- a/commons/src/main/scala/io/github/tassiLuca/dse/examples/ShowcasingFlows.scala
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/examples/ShowcasingFlows.scala
@@ -22,13 +22,14 @@ object ShowcasingFlows:
)
def allWriters(using Async): Flow[Writer] = Flow:
- users.foreach { u =>
- sleep(2_000)
+ users.foreach: u =>
+ sleep(2_000) // something meaningful
it.emit(u)
- }
def booksByWriter(writer: WriterId)(using Async): Flow[Book] = Flow:
- books(writer).foreach(it.emit)
+ books(writer).foreach: w =>
+ sleep(2_000) // something meaningful
+ it.emit(w)
def failingWriters(using Async): Flow[Writer] = Flow:
throw IllegalStateException("The library is closed")
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/examples/UseChannelMultiplexer.scala b/commons/src/main/scala/io/github/tassiLuca/dse/examples/UseChannelMultiplexer.scala
index 517d302e..ba12774f 100644
--- a/commons/src/main/scala/io/github/tassiLuca/dse/examples/UseChannelMultiplexer.scala
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/examples/UseChannelMultiplexer.scala
@@ -42,26 +42,23 @@ object UseChannelMultiplexer:
def loopedConsumer(name: String): (Task[Unit], Channel[Try[Item]]) =
val channel = BufferedChannel[Try[Item]](bufferSize)
val consumingTask = Task:
- while (true) {
+ while (true)
println(s"[CONSUMER-$name - ${Thread.currentThread()} @ ${LocalTime.now()}] Waiting for a new item...")
val item = channel.read() // blocking operation
println(s"[CONSUMER-$name - ${Thread.currentThread()} @ ${LocalTime.now()}] received $item")
- }
(consumingTask, channel)
@main def useMultiplexer(): Unit = Async.blocking:
val multiplexer = ChannelMultiplexer[Item]()
- Future {
- // blocking call until the multiplexer is closed => needs to be called on a new thread
- multiplexer.run()
- }
+ Future:
+ multiplexer.run() // blocking call until the multiplexer is closed => needs to be called on a new thread
for i <- 0 until producers do
val (producer, producerChannel) = scheduledProducer(i.toString, Random.nextLong(4_000))
multiplexer.addPublisher(producerChannel.asReadable)
- producer.run
+ producer.start()
Thread.sleep(10_000)
for i <- 0 until consumers do
val (consumer, consumerChannel) = loopedConsumer(i.toString)
multiplexer.addSubscriber(consumerChannel.asSendable)
- consumer.run
+ consumer.start()
Thread.sleep(60_000)
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/examples/UseSimpleChannels.scala b/commons/src/main/scala/io/github/tassiLuca/dse/examples/UseSimpleChannels.scala
index 009a7468..8ea952d9 100644
--- a/commons/src/main/scala/io/github/tassiLuca/dse/examples/UseSimpleChannels.scala
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/examples/UseSimpleChannels.scala
@@ -40,14 +40,13 @@ object UseSimpleChannels:
/** A generic consumer of items. */
def consumer(c: ReadableChannel[Int]): Task[Unit] = Task:
- while (true) {
+ while (true)
println(s"[CONSUMER - ${Thread.currentThread()} @ ${LocalTime.now()}] Waiting for a new item...")
val item = c.read() // blocking operation
println(s"[CONSUMER - ${Thread.currentThread()} @ ${LocalTime.now()}] received $item")
- }
@main def useChannels(): Unit = Async.blocking:
- for _ <- 0 until consumers do consumer(channel.asReadable).run
+ for _ <- 0 until consumers do consumer(channel.asReadable).start()
sleep(10_000)
- scheduledProducer(channel.asSendable).run
+ scheduledProducer(channel.asSendable).start()
sleep(30_000)
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/pimping/Flow.scala b/commons/src/main/scala/io/github/tassiLuca/dse/pimping/Flow.scala
index b08ee0ff..d8fdb0b4 100644
--- a/commons/src/main/scala/io/github/tassiLuca/dse/pimping/Flow.scala
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/pimping/Flow.scala
@@ -7,7 +7,8 @@ import io.github.tassiLuca.dse.pimping.TerminableChannelOps.foreach
import java.util.concurrent.Semaphore
import scala.compiletime.uninitialized
import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
+import scala.util.boundary.break
+import scala.util.{Failure, Success, Try, boundary}
/** An asynchronous cold data stream that emits values, inspired to Kotlin Flows. */
trait Flow[+T]:
@@ -15,7 +16,7 @@ trait Flow[+T]:
/** Start the flowing of data which can be collected reacting through the given [[collector]] function. */
def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit
-/** An interface modeling an entity capable of [[emit]]ting [[Flow]]s values. */
+/** An interface modeling an entity capable of [[emit]]ting [[Flow]]able values. */
trait FlowCollector[-T]:
/** Emits a value to the flow. */
@@ -32,9 +33,9 @@ object Flow:
flow.task = Task:
val channel = flow.channel
flow.sync.release()
- val collector: FlowCollector[T] = new FlowCollector[T]:
+ given FlowCollector[T] with
override def emit(value: T)(using Async): Unit = channel.send(Success(value))
- try body(using collector)
+ try body
catch case e: Exception => channel.send(Failure(e))
flow
@@ -43,11 +44,11 @@ object Flow:
private[Flow] var channel: TerminableChannel[Try[T]] = uninitialized
private[Flow] val sync = Semaphore(0)
- override def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit =
+ override def collect(collector: Try[T] => Unit)(using Async, AsyncOperations): Unit = Async.group:
val myChannel = TerminableChannel.ofUnbounded[Try[T]]
synchronized:
channel = myChannel
- task.run.onComplete(() => myChannel.terminate())
+ task.start().onComplete(() => myChannel.terminate())
// Ensure to leave the synchronized block after the task has been initialized
// with the correct channel instance.
sync.acquire()
@@ -70,6 +71,13 @@ object FlowOps:
override def collect(collector: Try[R] => Unit)(using Async, AsyncOperations): Unit =
catchFailure(collector):
flow.collect(item => f(item.get).collect(x => collector(Success(x.get))))
+
+ def toSeq(using Async, AsyncOperations): Try[Seq[T]] = boundary:
+ var result = Seq.empty[T]
+ flow.collect:
+ case Success(value) => result = result :+ value
+ case e => break(e.asInstanceOf[Try[Seq[T]]])
+ Success(result)
private inline def catchFailure[X](collector: Try[X] => Unit)(inline body: => Unit): Unit =
try body
diff --git a/commons/src/main/scala/io/github/tassiLuca/dse/pimping/TerminableChannel.scala b/commons/src/main/scala/io/github/tassiLuca/dse/pimping/TerminableChannel.scala
index 30037eb6..413f1833 100644
--- a/commons/src/main/scala/io/github/tassiLuca/dse/pimping/TerminableChannel.scala
+++ b/commons/src/main/scala/io/github/tassiLuca/dse/pimping/TerminableChannel.scala
@@ -44,10 +44,9 @@ object TerminableChannel:
private var _terminated: Boolean = false
override val readSource: Async.Source[Res[Terminable[T]]] =
- c.readSource.transformValuesWith {
+ c.readSource.transformValuesWith:
case Right(Terminated) => c.close(); Left(Channel.Closed)
case v @ _ => v
- }
override def sendSource(x: Terminable[T]): Async.Source[Res[Unit]] =
synchronized:
@@ -67,7 +66,7 @@ object TerminableChannelOps:
extension [T: ClassTag](c: TerminableChannel[T])
- /** Blocking consume channel items, executing the given function [[f]] for each element. */
+ /** Consume channel items, executing the given function [[f]] for each element. This is a blocking operation. */
@tailrec
def foreach[U](f: T => U)(using Async): Unit = c.read() match
case Left(Channel.Closed) => ()
diff --git a/commons/src/test/scala/io/github/tassiLuca/dse/ChannelsContextTest.scala b/commons/src/test/scala/io/github/tassiLuca/dse/ChannelsContextTest.scala
index 2eff2f45..7bb038f4 100644
--- a/commons/src/test/scala/io/github/tassiLuca/dse/ChannelsContextTest.scala
+++ b/commons/src/test/scala/io/github/tassiLuca/dse/ChannelsContextTest.scala
@@ -20,12 +20,11 @@ class ChannelsContextTest extends AnyFunSpec with Matchers {
var i = 0
val channel = BufferedChannel[Item](items)
Async.blocking:
- channel.consume {
+ channel.consume:
case Left(_) => ()
case Right(_) => i = i + 1
- }
Async.blocking:
- produceOn(channel).run.await
+ produceOn(channel).start().await
i shouldBe 0
}
diff --git a/commons/src/test/scala/io/github/tassiLuca/dse/StructuredConcurrencyTest.scala b/commons/src/test/scala/io/github/tassiLuca/dse/StructuredConcurrencyTest.scala
index fb5615ad..46bb1105 100644
--- a/commons/src/test/scala/io/github/tassiLuca/dse/StructuredConcurrencyTest.scala
+++ b/commons/src/test/scala/io/github/tassiLuca/dse/StructuredConcurrencyTest.scala
@@ -14,14 +14,28 @@ class StructuredConcurrencyTest extends AnyFunSpec with Matchers {
it("ensure all nested computations are contained within the lifetime of the enclosing one") {
Async.blocking:
val before = System.currentTimeMillis()
- val f = Future:
+ val result = Async.group:
val f1 = Future { "hello" }
val f2 = Future { sleep(2_000); "gears!" }
f1.await + " " + f2.await
- f.await shouldBe "hello gears!"
+ result shouldBe "hello gears!"
val now = System.currentTimeMillis()
now - before should be > 2_000L
}
+
+ describe("Async.group") {
+ it("test") {
+ Async.blocking:
+ val before = System.currentTimeMillis()
+ Async.group:
+ Future { println("hello") }
+ Future { sleep(2_000); println("gears!") }
+ val now = System.currentTimeMillis()
+ println(now - before)
+ sleep(5_000)
+ println("done")
+ }
+ }
describe("in case of failures") {
it("if the first nested computation we wait fails with an exception the other is cancelled") {
@@ -76,7 +90,7 @@ class StructuredConcurrencyTest extends AnyFunSpec with Matchers {
val before = System.currentTimeMillis()
val f1 = Future { sleep(1_000); "faster won" }
val f2 = Future { sleep(2_000); stillAlive = true }
- val result = f1.altWithCancel(f2).awaitResult
+ val result = f1.orWithCancel(f2).awaitResult
val now = System.currentTimeMillis()
now - before should (be >= 1_000L and be < 2_000L)
result.isSuccess shouldBe true
diff --git a/commons/src/test/scala/io/github/tassiLuca/dse/TasksTest.scala b/commons/src/test/scala/io/github/tassiLuca/dse/TasksTest.scala
index 4458444a..0a374a06 100644
--- a/commons/src/test/scala/io/github/tassiLuca/dse/TasksTest.scala
+++ b/commons/src/test/scala/io/github/tassiLuca/dse/TasksTest.scala
@@ -19,10 +19,10 @@ class TasksTest extends AnyFunSpec with Matchers {
it("do not leave the Async context if millis = 0 and no suspending calls are performed") {
var i = 0
Async.blocking:
- Task {
+ Task:
i = i + 1
if i == items then Failure(Error()) else i
- }.schedule(TaskSchedule.RepeatUntilFailure()).run
+ .schedule(TaskSchedule.RepeatUntilFailure()).start()
// millis = 0 is the default --------------Ʌ
i shouldBe 5
}
@@ -31,20 +31,20 @@ class TasksTest extends AnyFunSpec with Matchers {
it("leaves the Async context") {
var i = 0
Async.blocking:
- Task {
+ Task:
i = i + 1
if i == items then Failure(Error()) else i
- }.schedule(TaskSchedule.RepeatUntilFailure(millis = 1)).run
+ .schedule(TaskSchedule.RepeatUntilFailure(millis = 1)).start()
i should be < items
}
it("unless an await is called on the future") {
var i = 0
Async.blocking:
- Task {
+ Task:
i = i + 1
if i == items then Failure(Error()) else i
- }.schedule(TaskSchedule.RepeatUntilFailure(millis = 1)).run.await
+ .schedule(TaskSchedule.RepeatUntilFailure(millis = 1)).start().await
i shouldBe 5
}
}
@@ -55,11 +55,10 @@ class TasksTest extends AnyFunSpec with Matchers {
Async.blocking:
val timer = Timer(2.seconds)
Future(timer.run())
- produce { _ =>
+ produce: _ =>
timer.src.awaitResult
consumedItems = consumedItems + 1
if consumedItems == items then Failure(Error()) else Success(())
- }
consumedItems shouldBe items
}
@@ -68,28 +67,27 @@ class TasksTest extends AnyFunSpec with Matchers {
Async.blocking:
val timer = Timer(2.seconds)
Future(timer.run())
- produceWithLabel { _ =>
+ produceWithLabel: _ =>
timer.src.awaitResult
consumedItems = consumedItems + 1
if consumedItems == items then Failure(Error()) else Success(())
- }
consumedItems should be < items
}
}
}
}
- def produce[T](action: SendableChannel[T] => Try[Unit])(using Async): ReadableChannel[T] =
+ def produce[T](action: SendableChannel[T] => Try[Unit])(using Async.Spawn): ReadableChannel[T] =
val channel = UnboundedChannel[T]()
- Task {
+ Task:
action(channel.asSendable)
- }.schedule(RepeatUntilFailure()).run
+ .schedule(RepeatUntilFailure()).start()
channel.asReadable
- def produceWithLabel[T](action: Async ?=> SendableChannel[T] => Try[Unit])(using Async): ReadableChannel[T] =
+ def produceWithLabel[T](action: Async ?=> SendableChannel[T] => Try[Unit])(using Async.Spawn): ReadableChannel[T] =
val channel = UnboundedChannel[T]()
- Task {
+ Task:
action(channel.asSendable)
- }.schedule(RepeatUntilFailure()).run
+ .schedule(RepeatUntilFailure()).start()
channel.asReadable
}
diff --git a/commons/src/test/scala/io/github/tassiLuca/dse/pimping/FlowTest.scala b/commons/src/test/scala/io/github/tassiLuca/dse/pimping/FlowTest.scala
index 0dc35884..568a686a 100644
--- a/commons/src/test/scala/io/github/tassiLuca/dse/pimping/FlowTest.scala
+++ b/commons/src/test/scala/io/github/tassiLuca/dse/pimping/FlowTest.scala
@@ -5,7 +5,7 @@ import gears.async.{Async, AsyncOperations, Future}
import io.github.tassiLuca.dse.pimping.Flow
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
-import io.github.tassiLuca.dse.pimping.FlowOps.{flatMap, map}
+import io.github.tassiLuca.dse.pimping.FlowOps.{flatMap, map, toSeq}
import scala.collection
import scala.collection.immutable
@@ -143,6 +143,21 @@ class FlowTest extends AnyFunSpec with Matchers:
intercept[IllegalStateException](collected.head.get)
}
}
+
+ describe("Flows `toSeq") {
+ it("should return a Sequence wrapped inside a Success if no error occurrs") {
+ Async.blocking:
+ val result = simpleFlow.toSeq
+ result shouldBe Success(Seq.range(0, items))
+ }
+
+ it("should return a Failure if an error occurrs") {
+ Async.blocking:
+ val result = failingFlow.toSeq
+ result.isFailure shouldBe true
+ intercept[IllegalStateException](result.get)
+ }
+ }
}
def simpleFlow(using Async): Flow[Item] = Flow:
diff --git a/commons/src/test/scala/io/github/tassiLuca/dse/pimping/TerminableChannelTest.scala b/commons/src/test/scala/io/github/tassiLuca/dse/pimping/TerminableChannelTest.scala
index 8858e22e..cc85bc2a 100644
--- a/commons/src/test/scala/io/github/tassiLuca/dse/pimping/TerminableChannelTest.scala
+++ b/commons/src/test/scala/io/github/tassiLuca/dse/pimping/TerminableChannelTest.scala
@@ -40,7 +40,7 @@ class TerminableChannelTest extends AnyFunSpec with Matchers {
Async.blocking:
var collectedItems = Seq[Item]()
val channel = TerminableChannel.ofUnbounded[Item]
- produceOn(channel).run.onComplete(Listener((_, _) => channel.terminate()))
+ produceOn(channel).start().onComplete(Listener((_, _) => channel.terminate()))
channel.foreach(res => collectedItems = collectedItems :+ res)
collectedItems shouldBe Seq.range(0, itemsProduced)
}
@@ -48,7 +48,7 @@ class TerminableChannelTest extends AnyFunSpec with Matchers {
it("once closed should be possible to transform it into a Sequence") {
Async.blocking:
val channel = TerminableChannel.ofUnbounded[Item]
- produceOn(channel).run.onComplete(Listener((_, _) => channel.terminate()))
+ produceOn(channel).start().onComplete(Listener((_, _) => channel.terminate()))
channel.toSeq shouldBe Seq.range(0, itemsProduced)
}
}
diff --git a/docs/presentation/archetypes/default.md b/docs/presentation/archetypes/default.md
deleted file mode 100644
index c6f3fcef..00000000
--- a/docs/presentation/archetypes/default.md
+++ /dev/null
@@ -1,5 +0,0 @@
-+++
-title = '{{ replace .File.ContentBaseName "-" " " | title }}'
-date = {{ .Date }}
-draft = true
-+++
diff --git a/docs/presentation/assets/custom-theme.scss b/docs/presentation/assets/custom-theme.scss
deleted file mode 100644
index d2c9cfa2..00000000
--- a/docs/presentation/assets/custom-theme.scss
+++ /dev/null
@@ -1,154 +0,0 @@
-@import "reveal-js/dist/theme/template/mixins";
-@import "reveal-js/dist/theme/template/settings";
-
-// Include theme-specific fonts
-@import url('https://fonts.googleapis.com/css?family=Signika Negative');
-
-$mainFontSize: 2em;
-$mainFont: 'Signika Negative', sans-serif;
-$headingFont: 'Signika Negative', sans-serif;
-$headingLetterSpacing: normal;
-$headingFontWeight: 600;
-
-$heading1Size: 2.5em;
-$heading2Size: 1.6em;
-$heading3Size: 1.3em;
-$heading4Size: 1.0em;
-$heading5Size: 0.8em;
-$heading6Size: 0.6em;
-
-// Solarized color theme
-/**
- * Solarized colors by Ethan Schoonover
- */
-
-// Solarized colors
-$base03: #002b36;
-$base02: #073642;
-$base01: #586e75;
-$base00: #657b83;
-$base0: #839496;
-$base1: #93a1a1;
-$base2: #eee8d5;
-$base3: #fdf6e3;
-$base4: #fcfcfc;
-$yellow: #b58900;
-$orange: #cb4b16;
-$red: #dc322f;
-$magenta: #d33682;
-$violet: #6c71c4;
-$blue: #268bd2;
-$cyan: #118b81;
-$green: #859900;
-
-// Override theme settings (see ../template/settings.scss)
-$mainColor: $base02;
-$headingColor: $blue;
-$headingTextShadow: none;
-$backgroundColor: $base4;
-$linkColor: $blue;
-$linkColorHover: lighten( $linkColor, 20% );
-$selectionBackgroundColor: $cyan;
-
-@import "reveal-js/dist/theme/template/theme";
-
-.reveal pre {
- margin: 1em auto;
- font-size: 1em;
-}
-
-.reveal strong,
-.reveal b {
- font-weight: bold;
-}
-
-.reveal em {
- font-style: italic;
-}
-
-.reveal section img {
- border: 0em;
- box-shadow: none;
-}
-
-.reveal ul {
- margin: .2em .2em .2em;
-}
-
-.gist-it-gist .gist-file .gist-data pre, .gister-gist .gist-file .gist-data pre {
- box-shadow: none !important;
-}
-
-.container {
- display: flex;
- justify-content: center;
- align-items: flex-start;
- margin-bottom: 0.4em;
-}
-
-.col {
- flex: 1;
- margin: 0.1em;
-}
-
-.white-text {
- color: white !important;
-}
-
-.highlight {
- color: $cyan !important;
-}
-
-.accent {
- color: $magenta !important;
-}
-
-.reveal .progress {
- color: $magenta;
-}
-
-.bad {
- color: $red !important;
-}
-
-.ok {
- color: $green !important;
-}
-
-.no-bullet {
- list-style-type: none !important;
-}
-
-.inverse {
- color: $base2 !important;
-}
-
-em {
- color: $cyan;
-}
-
-strong {
- color: $magenta;
-}
-
-.reveal code {
- color: rgb(209, 28, 124);
- font-family: "JetBrains Mono", "Oxygen Mono", "Roboto Mono", "Ubuntu Mono", "Courier New", Courier, monospace;
- // font-weight: bold;
-}
-
-.reveal pre {
- margin: 1em auto;
-}
-
-.w50 {
- width: 50%
-}
-
-.w75 {
- width: 75%
-}
-
-.w100 {
- width: 100%
-}
\ No newline at end of file
diff --git a/docs/presentation/content/_index.md b/docs/presentation/content/_index.md
deleted file mode 100644
index cfacbc65..00000000
--- a/docs/presentation/content/_index.md
+++ /dev/null
@@ -1,41 +0,0 @@
-+++
-title = "Direct style experiments presentation"
-outputs = ["Reveal"]
-+++
-
-# Direct Style for **Functional** **Reactive** Programming in *Scala*
-
-
-
-Tassinari Luca
-
----
-
-## Effects, Monads and Capabilities
-
-- Most of the time when we say *effect* we mean *monads*
-- Monads do have cons:
- - syntactic and pedagogical overhead
- - awkward to integrate with regular control structures
- - composing monads is tricky even with monad transformers, MLT, ...
-
-- *Research-field*: instead of pushing effects and resources into frameworks, upgrade the _type system_ to track them directly in the program {{< math "\Rightarrow" />}} **CAPabilities for RESources and Effects (CAPRESE)** @ Programming Methods Laboratory EPFL
-
- - to have an effect you need the capability to have that effect
- - **Capability**: a value that is passed (usually implicitly) to the function that need to perform the effect the capability enables
-
----
-
-## Suspension Effect
-
----
-
-## Aside: Boundary & Break
-
----
-
-## Scala Gears
-
----
-
-
diff --git a/docs/presentation/go.mod b/docs/presentation/go.mod
deleted file mode 100644
index cdb5baea..00000000
--- a/docs/presentation/go.mod
+++ /dev/null
@@ -1,5 +0,0 @@
-module github.com/tassiLuca/direct-style-experiments/presentation
-
-go 1.22.1
-
-require github.com/dzello/reveal-hugo v0.0.0-20240121164136-5e38035fe41f // indirect
diff --git a/docs/presentation/go.sum b/docs/presentation/go.sum
deleted file mode 100644
index 40af216c..00000000
--- a/docs/presentation/go.sum
+++ /dev/null
@@ -1,2 +0,0 @@
-github.com/dzello/reveal-hugo v0.0.0-20240121164136-5e38035fe41f h1:UPALL8009oW6MhGR7iwp5Ew/wcSmLILh3e9MsSsz6vg=
-github.com/dzello/reveal-hugo v0.0.0-20240121164136-5e38035fe41f/go.mod h1:9mgTYgEkMfnc36vaNSvk3xRedXIv2qXjejlugfwyb3o=
diff --git a/docs/presentation/hugo.toml b/docs/presentation/hugo.toml
deleted file mode 100644
index d14e947e..00000000
--- a/docs/presentation/hugo.toml
+++ /dev/null
@@ -1,35 +0,0 @@
-baseURL = 'https://tassiluca.github.io/direct-style-experiments/presentation/'
-languageCode = 'en-us'
-title = 'My New Hugo Site'
-theme = "github.com/dzello/reveal-hugo"
-
-[markup.goldmark.renderer]
-unsafe = true
-
-[outputFormats.Reveal]
-baseName = "index"
-mediaType = "text/html"
-isHTML = true
-
-[markup.highlight]
-codeFences = true
-style = "catppuccin-latte"
-
-[params.reveal_hugo]
-history = true
-theme = "serif"
-slide_number = true
-transition = "slide"
-transition_speed = "normal"
-custom_theme = "custom-theme.scss"
-custom_theme_compile = true
-width = 1440
-height = 900
-
-[params.reveal_hugo.custom_theme_options]
-targetPath = "css/custom-theme.css"
-enableSourceMap = true
-
-[[params.reveal_hugo.mermaid]]
-startOnLoad = false
-theme = "default"
diff --git a/docs/site/content/res/img/rears copy.png b/docs/site/content/res/img/rears copy.png
new file mode 100644
index 00000000..a417dccb
Binary files /dev/null and b/docs/site/content/res/img/rears copy.png differ
diff --git a/docs/site/content/res/img/rears.png b/docs/site/content/res/img/rears.png
new file mode 100644
index 00000000..1eb7c721
Binary files /dev/null and b/docs/site/content/res/img/rears.png differ
diff --git a/docs/site/content/res/schemas/.$diagrams.drawio.bkp b/docs/site/content/res/schemas/.$diagrams.drawio.bkp
index 38479850..a00607bc 100644
--- a/docs/site/content/res/schemas/.$diagrams.drawio.bkp
+++ b/docs/site/content/res/schemas/.$diagrams.drawio.bkp
@@ -1,6 +1,6 @@
-
+
-
+
@@ -102,7 +102,7 @@
-
+
@@ -112,63 +112,63 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
@@ -178,41 +178,41 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
@@ -396,4 +396,81 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/site/content/res/schemas/diagrams.drawio b/docs/site/content/res/schemas/diagrams.drawio
index c07ece63..e79647ce 100644
--- a/docs/site/content/res/schemas/diagrams.drawio
+++ b/docs/site/content/res/schemas/diagrams.drawio
@@ -1,6 +1,6 @@
-
+
-
+
@@ -134,41 +134,41 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
@@ -178,41 +178,41 @@
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
-
+
@@ -396,4 +396,92 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/libs/gears b/libs/gears
index eb172470..dc313af1 160000
--- a/libs/gears
+++ b/libs/gears
@@ -1 +1 @@
-Subproject commit eb1724705b3cbe83cba61cfa5f35efe11aeee88f
+Subproject commit dc313af11bdb22a2b55f2d405aec3af139467b56
diff --git a/libs/gears_3-javadoc.jar b/libs/gears_3-javadoc.jar
index 9a1f06b2..b089ad1e 100644
Binary files a/libs/gears_3-javadoc.jar and b/libs/gears_3-javadoc.jar differ
diff --git a/libs/gears_3-sources.jar b/libs/gears_3-sources.jar
index ea76a89c..5fa86649 100644
Binary files a/libs/gears_3-sources.jar and b/libs/gears_3-sources.jar differ
diff --git a/libs/gears_3.jar b/libs/gears_3.jar
index 25744322..0a846687 100644
Binary files a/libs/gears_3.jar and b/libs/gears_3.jar differ
diff --git a/rears/src/main/scala/io/github/tassiLuca/rears/Boundary.scala b/rears/src/main/scala/io/github/tassiLuca/rears/Boundary.scala
index 5c0c2e86..60e1d4bc 100644
--- a/rears/src/main/scala/io/github/tassiLuca/rears/Boundary.scala
+++ b/rears/src/main/scala/io/github/tassiLuca/rears/Boundary.scala
@@ -1,5 +1,6 @@
package io.github.tassiLuca.rears
+import gears.async.Async.Spawn
import gears.async.{Async, Channel, ReadableChannel, SendableChannel, Task, UnboundedChannel}
import gears.async.TaskSchedule.RepeatUntilFailure
@@ -23,14 +24,14 @@ trait Consumer[E, S]:
val listeningChannel: SendableChannel[Try[E]] = UnboundedChannel()
/** @return a runnable [[Task]]. */
- def asRunnable: Task[Unit] = Task {
+ def asRunnable(using Async.Spawn): Task[Unit] = Task:
listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach(react)
- }.schedule(RepeatUntilFailure())
+ .schedule(RepeatUntilFailure())
/** The suspendable reaction triggered upon a new read of an item succeeds. */
- protected def react(e: Try[E])(using Async): S
+ protected def react(e: Try[E])(using Async.Spawn): S
-/** A mixin to make consumer stateful. Its state is updated with the result of the [[react]]ion.
+/** A mixin to turn consumer stateful. Its state is updated with the result of the [[react]]ion.
* Initially its state is set to [[initialValue]].
*/
trait State[E, S](initialValue: S):
@@ -41,8 +42,8 @@ trait State[E, S](initialValue: S):
/** @return the current state of the consumer. */
def state: S = synchronized(_state)
- override def asRunnable: Task[Unit] = Task {
- listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach { e =>
- synchronized { _state = react(e) }
- }
- }.schedule(RepeatUntilFailure())
+ override def asRunnable(using Async.Spawn): Task[Unit] = Task:
+ listeningChannel.asInstanceOf[Channel[Try[E]]].read().foreach: e =>
+ synchronized:
+ _state = react(e)
+ .schedule(RepeatUntilFailure())
diff --git a/rears/src/main/scala/io/github/tassiLuca/rears/Controller.scala b/rears/src/main/scala/io/github/tassiLuca/rears/Controller.scala
index b51db125..3ebb7430 100644
--- a/rears/src/main/scala/io/github/tassiLuca/rears/Controller.scala
+++ b/rears/src/main/scala/io/github/tassiLuca/rears/Controller.scala
@@ -15,9 +15,9 @@ object Controller:
transformation: PipelineTransformation[T, R] = identity,
): Task[Unit] =
val transformedChannel = transformation(publisherChannel)
- Task {
+ Task:
consumer.listeningChannel.send(transformedChannel.read())
- }.schedule(RepeatUntilFailure())
+ .schedule(RepeatUntilFailure())
/** Creates a runnable [[Task]] forwarding the items read from the [[publisherChannel]] to
* all consumers' channels, after having it transformed with the given [[transformation]].
diff --git a/rears/src/main/scala/io/github/tassiLuca/rears/PipelineTransformation.scala b/rears/src/main/scala/io/github/tassiLuca/rears/PipelineTransformation.scala
index f0df83a3..d05f24e8 100644
--- a/rears/src/main/scala/io/github/tassiLuca/rears/PipelineTransformation.scala
+++ b/rears/src/main/scala/io/github/tassiLuca/rears/PipelineTransformation.scala
@@ -15,7 +15,7 @@ import scala.util.{Failure, Success, Try}
*/
type PipelineTransformation[T, R] = ReadableChannel[T] => ReadableChannel[R]
-extension [T](r: ReadableChannel[T])(using Async)
+extension [T](r: ReadableChannel[T])(using Async.Spawn)
/** @return a new [[ReadableChannel]] whose elements passes the given predicate [[p]].
*
@@ -30,10 +30,10 @@ extension [T](r: ReadableChannel[T])(using Async)
* --------2--------------4------6-------8--------10->
*
*/
- def filter(p: T => Boolean): ReadableChannel[T] = fromNew[T] { emitter =>
- val value = r.read().get
- if p(value) then emitter.send(value)
- }
+ def filter(p: T => Boolean): ReadableChannel[T] =
+ fromNew[T]: emitter =>
+ val value = r.read().get
+ if p(value) then emitter.send(value)
/** @return a new [[ReadableChannel]] whose values are transformed accordingly to the given function [[f]].
*
@@ -48,9 +48,8 @@ extension [T](r: ReadableChannel[T])(using Async)
* ----1---4-------9----16--25-----36-------49------->
*
*/
- def map[R](f: T => R): ReadableChannel[R] = fromNew[R] { emitter =>
+ def map[R](f: T => R): ReadableChannel[R] = fromNew[R]: emitter =>
emitter.send(f(r.read().get))
- }
/** @return a new [[ReadableChannel]] whose elements are emitted only after
* the given [[timespan]] has elapsed since the last emission.
@@ -70,13 +69,12 @@ extension [T](r: ReadableChannel[T])(using Async)
*/
def debounce(timespan: Duration): ReadableChannel[T] =
var lastEmission: Option[Long] = None
- fromNew[T] { emitter =>
+ fromNew[T]: emitter =>
val value = r.read().get
val now = System.currentTimeMillis()
if lastEmission.isEmpty || now - lastEmission.get >= timespan.toMillis then
emitter.send(value)
lastEmission = Some(now)
- }
/** Groups the items emitted by a [[ReadableChannel]] according to the given [[keySelector]].
* @return key-value pairs, where the keys are the set of results obtained from applying the
@@ -103,14 +101,13 @@ extension [T](r: ReadableChannel[T])(using Async)
*/
def groupBy[K](keySelector: T => K): ReadableChannel[(K, ReadableChannel[T])] =
var channels = Map[K, UnboundedChannel[T]]()
- fromNew[(K, UnboundedChannel[T])] { emitter =>
+ fromNew[(K, UnboundedChannel[T])]: emitter =>
val value = r.read().get
val key = keySelector(value)
if !channels.contains(key) then
channels = channels + (key -> UnboundedChannel[T]())
emitter.send(key -> channels(key))
channels(key).send(value)
- }
/** @return a new [[ReadableChannel]] whose elements are buffered in a [[List]] of size [[n]].
* If [[timespan]] duration is elapsed since last read the list is emitted
@@ -130,16 +127,15 @@ extension [T](r: ReadableChannel[T])(using Async)
*/
def buffer(n: Int, timespan: Duration = 5 seconds): ReadableChannel[List[T]] =
var buffer = List[T]()
- fromNew[List[T]] { emitter =>
+ fromNew[List[T]]: emitter =>
val timer = Timer(timespan)
- Future { timer.run() }
+ Future(timer.run())
val value = Async.raceWithOrigin(r.readSource, timer.src).awaitResult
timer.cancel()
if value._2 != timer.src then buffer = buffer :+ value._1.asInstanceOf[Either[Closed, T]].get
if value._2 == timer.src || buffer.size == n then
emitter.send(buffer)
buffer = List.empty
- }
/** @return a new [[ReadableChannel]] whose elements are buffered in a [[List]] of items
* if emitted within [[timespan]] duration after the first one (default: 5 seconds).
@@ -158,31 +154,27 @@ extension [T](r: ReadableChannel[T])(using Async)
*/
def bufferWithin(timespan: Duration = 5 seconds): ReadableChannel[List[T]] =
var buffer = List[T]()
- fromNew[List[T]] { emitter =>
+ fromNew[List[T]]: emitter =>
val timer = Timer(timespan)
buffer = buffer :+ r.read().get
- Future { timer.run() }
+ Future(timer.run())
Async.group:
- val tf = Future { timer.src.awaitResult }
- val tr = Task {
+ val tf = Future(timer.src.awaitResult)
+ val tr = Task:
buffer = buffer :+ r.read().get
- }.schedule(RepeatUntilFailure()).run
- tr.altWithCancel(tf).awaitResult
+ .schedule(RepeatUntilFailure()).start()
+ tr.orWithCancel(tf).awaitResult
emitter.send(buffer)
buffer = List.empty
timer.cancel()
- }
-// IMPORTANT REMARK: if Async ?=> is omitted the body of the task is intended to be **not**
-// suspendable, leading to the block of the context until the task fails!
-// See `TasksTest` in commons tests for more about the task scheduling behavior.
private def fromNew[T](
- transformation: Async ?=> SendableChannel[T] => Unit,
-)(using Async): ReadableChannel[T] =
+ transformation: Async.Spawn ?=> SendableChannel[T] => Unit,
+)(using Async.Spawn): ReadableChannel[T] =
val channel = UnboundedChannel[T]()
- Task {
+ Task:
Try(transformation(channel.asSendable)) match
case s @ Success(_) => s
case f @ Failure(_) => channel.close(); f
- }.schedule(RepeatUntilFailure()).run
+ .schedule(RepeatUntilFailure()).start()
channel.asReadable
diff --git a/rears/src/test/scala/io/github/tassiLuca/rears/ControllerTest.scala b/rears/src/test/scala/io/github/tassiLuca/rears/ControllerTest.scala
index 49658b38..22264e43 100644
--- a/rears/src/test/scala/io/github/tassiLuca/rears/ControllerTest.scala
+++ b/rears/src/test/scala/io/github/tassiLuca/rears/ControllerTest.scala
@@ -23,11 +23,9 @@ class ControllerTest extends AnyFlatSpec with Matchers:
consumer(e => consumerBValues = consumerBValues :+ e),
)
Async.blocking:
- Controller.oneToMany(producer.publishingChannel, consumers, identity).run
- consumers.foreach(_.asRunnable.run)
- producer.asRunnable.run.await
- // TODO: improve with an extension method that wait for a certain amount of time,
- // at the expiration of which the channel are closed and stop blocking!
+ Controller.oneToMany(producer.publishingChannel, consumers, identity).start()
+ consumers.foreach(_.asRunnable.start())
+ producer.asRunnable.start().await
AsyncOperations.sleep(2_000) // Ensure consumers have completed their reaction to publisher's events
consumerAValues shouldEqual consumerBValues
consumerAValues shouldBe Seq.range(0, items).map(Success(_))
@@ -43,4 +41,4 @@ class ControllerTest extends AnyFlatSpec with Matchers:
def consumer(action: Try[Item] => Unit): Consumer[Int, Unit] = new Consumer[Int, Unit]:
override val listeningChannel: SendableChannel[Try[Item]] = UnboundedChannel[Try[Int]]()
- override def react(e: Try[Item])(using Async): Unit = action(e)
+ override def react(e: Try[Item])(using Async.Spawn): Unit = action(e)
diff --git a/rears/src/test/scala/io/github/tassiLuca/rears/PipelineTransformationsTest.scala b/rears/src/test/scala/io/github/tassiLuca/rears/PipelineTransformationsTest.scala
index 23d0cc03..d646ff13 100644
--- a/rears/src/test/scala/io/github/tassiLuca/rears/PipelineTransformationsTest.scala
+++ b/rears/src/test/scala/io/github/tassiLuca/rears/PipelineTransformationsTest.scala
@@ -15,17 +15,19 @@ class PipelineTransformationsTest extends AnyFunSpec with Matchers {
describe("Filtering a channel") {
it("return a new channel with only the elements passing the predicate") {
Async.blocking:
- val filtered = producer.filter(_ % 2 == 0)
- for i <- 2 to 10 by 2 do filtered.read() shouldBe Right(i)
+ withResource(producer): c =>
+ val filtered = c.filter(_ % 2 == 0)
+ for i <- 2 to 10 by 2 do filtered.read() shouldBe Right(i)
}
}
describe("Mapping a channel") {
it("return a new channel whose values are transformed accordingly to the given function") {
+ val f: Int => Int = x => x * x
Async.blocking:
- val f: Int => Int = x => x * x
- val mapped = producer.map(f)
- for i <- 1 to 10 do mapped.read() shouldBe Right(f(i))
+ withResource(producer): c =>
+ val mapped = c.map(f)
+ for i <- 1 to 10 do mapped.read() shouldBe Right(f(i))
}
}
@@ -33,24 +35,26 @@ class PipelineTransformationsTest extends AnyFunSpec with Matchers {
it("return a new channel whose first item is emitted immediately") {
val span = 1.seconds
Async.blocking:
- val debounced = infiniteProducer().debounce(span)
- val before = System.currentTimeMillis()
- debounced.read()
- val now = System.currentTimeMillis()
- now - before should be < span.toMillis
+ withResource(infiniteProducer()): c =>
+ val debounced = c.debounce(span)
+ val before = System.currentTimeMillis()
+ debounced.read()
+ val now = System.currentTimeMillis()
+ now - before should be < span.toMillis
}
it("return a new channel that emit an item if the given timespan has passed without emitting anything") {
val span = 2.seconds
val tolerance = 10.milliseconds
Async.blocking:
- val debounced = infiniteProducer().debounce(span)
- debounced.read()
- val before = System.currentTimeMillis()
- for _ <- 1 to 4 do
+ withResource(infiniteProducer()): c =>
+ val debounced = c.debounce(span)
debounced.read()
- val now = System.currentTimeMillis()
- now - before should be > (span.toMillis - tolerance.toMillis)
+ val before = System.currentTimeMillis()
+ for _ <- 1 to 4 do
+ debounced.read()
+ val now = System.currentTimeMillis()
+ now - before should be > (span.toMillis - tolerance.toMillis)
}
}
@@ -58,16 +62,18 @@ class PipelineTransformationsTest extends AnyFunSpec with Matchers {
it("return a new channel that periodically gather items into bundles and emit them") {
val step = 2
Async.blocking:
- val buffered = producer.buffer(step)
- for i <- 1 to 10 by step do buffered.read() shouldBe Right(List.range(i, i + step))
+ withResource(producer): c =>
+ val buffered = c.buffer(step)
+ for i <- 1 to 10 by step do buffered.read() shouldBe Right(List.range(i, i + step))
}
it("group fewer items if the nth element is not read within the given timespan") {
val step = 3
Async.blocking:
- val buffered = producer.buffer(n = step, timespan = 2.seconds)
- for i <- 1 to 9 by step do buffered.read() shouldBe Right(List.range(i, i + step))
- buffered.read() shouldBe Right(List(10))
+ withResource(producer): c =>
+ val buffered = c.buffer(n = step, timespan = 2.seconds)
+ for i <- 1 to 9 by step do buffered.read() shouldBe Right(List.range(i, i + step))
+ buffered.read() shouldBe Right(List(10))
}
}
@@ -79,47 +85,56 @@ class PipelineTransformationsTest extends AnyFunSpec with Matchers {
infiniteProducer(every = 3500 milliseconds, channel = c)
val buffered = c.bufferWithin(2 seconds)
for i <- 0 to 3 do buffered.read() shouldBe Right(List(i, i))
+ c.close()
}
}
describe("Grouping a channel on an element selector") {
it("return a Map with the correct group of channel") {
Async.blocking:
- val grouped = producer.groupBy(_ % 2 == 0)
- for _ <- 0 until 2 do
- val group = grouped.read()
- group.isRight shouldBe true
- group.toOption.get match
- case (false, c) => for i <- 1 to 10 by 2 do c.read() shouldBe Right(i)
- case (true, c) => for i <- 2 to 10 by 2 do c.read() shouldBe Right(i)
+ withResource(producer): c =>
+ val grouped = c.groupBy(_ % 2 == 0)
+ for _ <- 0 until 2 do
+ val group = grouped.read()
+ group.isRight shouldBe true
+ group.toOption.get match
+ case (false, c) => for i <- 1 to 10 by 2 do c.read() shouldBe Right(i)
+ case (true, c) => for i <- 2 to 10 by 2 do c.read() shouldBe Right(i)
}
}
- describe("Transforming a channel already closed should determine the closing of the new channel") {
- Async.blocking:
- val c = UnboundedChannel[Int]()
- c.close()
- c.filter(_ % 2 == 0).read() shouldBe Left(Channel.Closed)
- c.map(_ * 2).read() shouldBe Left(Channel.Closed)
- c.debounce(1 second).read() shouldBe Left(Channel.Closed)
- c.buffer(2).read() shouldBe Left(Channel.Closed)
- c.bufferWithin(2 seconds).read() shouldBe Left(Channel.Closed)
- c.groupBy(_ % 2 == 0).read() shouldBe Left(Channel.Closed)
+ describe("Transforming a channel already closed") {
+ it("determine the closing of the new channel") {
+ Async.blocking:
+ val c = UnboundedChannel[Int]()
+ c.close()
+ c.filter(_ % 2 == 0).read() shouldBe Left(Channel.Closed)
+ c.map(_ * 2).read() shouldBe Left(Channel.Closed)
+ c.debounce(1 second).read() shouldBe Left(Channel.Closed)
+ c.buffer(2).read() shouldBe Left(Channel.Closed)
+ c.bufferWithin(2 seconds).read() shouldBe Left(Channel.Closed)
+ c.groupBy(_ % 2 == 0).read() shouldBe Left(Channel.Closed)
+ }
}
- def producer(using Async): ReadableChannel[Int] =
+ def withResource(channel: Channel[Int])(test: Channel[Int] => Unit): Unit =
+ test(channel)
+ channel.close()
+
+ def producer(using Async.Spawn): Channel[Int] =
val channel = UnboundedChannel[Int]()
- Future { for i <- 1 to 10 do channel.send(i) }
- channel.asReadable
+ Future:
+ for i <- 1 to 10 do channel.send(i)
+ channel
def infiniteProducer(
every: Duration = 500 milliseconds,
channel: Channel[Int] = UnboundedChannel[Int](),
- )(using Async): ReadableChannel[Int] =
+ )(using Async.Spawn): Channel[Int] =
var i = 0
- Task {
+ Task:
channel.send(i)
i = i + 1
- }.schedule(TaskSchedule.Every(every.toMillis)).run
- channel.asReadable
+ .schedule(TaskSchedule.Every(every.toMillis)).start()
+ channel
}
diff --git a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/Launcher.scala b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/Launcher.scala
index ef143ad0..8de1a454 100644
--- a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/Launcher.scala
+++ b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/Launcher.scala
@@ -5,6 +5,5 @@ import gears.async.Async
import io.github.tassiLuca.hub.adapters.MockedHubManager
/** The launcher of a mocked version of the application, using UI simulators. */
-@main def launchUIMockedHub(): Unit =
- Async.blocking:
- MockedHubManager().run()
+@main def launchUIMockedHub(): Unit = Async.blocking:
+ MockedHubManager().run()
diff --git a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/adapters/GraphicalSource.scala b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/adapters/GraphicalSource.scala
index 72827f73..3b14eb78 100644
--- a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/adapters/GraphicalSource.scala
+++ b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/adapters/GraphicalSource.scala
@@ -6,7 +6,7 @@ import gears.async.{Async, Future, Task}
import io.github.tassiLuca.hub.adapters.ui.SourceUI
import io.github.tassiLuca.hub.core.{LuminosityEntry, SensorSource, TemperatureEntry}
-class GraphicalSource(using Async) extends SensorSource:
+class GraphicalSource(using Async.Spawn) extends SensorSource:
private val sources = Set(
SourceUI("temperature", publishTemperatureEntry),
diff --git a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/adapters/MockedHubManager.scala b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/adapters/MockedHubManager.scala
index a416e741..e9db19d7 100644
--- a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/adapters/MockedHubManager.scala
+++ b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/adapters/MockedHubManager.scala
@@ -9,7 +9,7 @@ import io.github.tassiLuca.rears.groupBy
import scala.language.postfixOps
/** A concrete hub manager, mocking sources with graphical views. */
-class MockedHubManager(using Async, AsyncOperations):
+class MockedHubManager(using Async.Spawn, AsyncOperations):
private val ui = DashboardUI()
private val sensorsSource = GraphicalSource()
@@ -21,12 +21,12 @@ class MockedHubManager(using Async, AsyncOperations):
thermostatManager.thermostat.scheduler.schedule.map((d, t) => (s"${d._1}", s"${d._2}") -> s"$t"),
)
val channelBySensor = sensorsSource.publishingChannel.groupBy(_.getClass)
- Task {
+ Task:
channelBySensor.read() match
case Right((clazz, c)) if clazz == classOf[TemperatureEntry] =>
thermostatManager.run(c.asInstanceOf[ReadableChannel[TemperatureEntry]])
case Right((clazz, c)) if clazz == classOf[LuminosityEntry] =>
lightingManager.run(c.asInstanceOf[ReadableChannel[LuminosityEntry]])
case _ => ()
- }.schedule(RepeatUntilFailure()).run
- sensorsSource.asRunnable.run.await
+ .schedule(RepeatUntilFailure()).start()
+ sensorsSource.asRunnable.start().await
diff --git a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/application/LightingManager.scala b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/application/LightingManager.scala
index 5f5b27ee..e36383f3 100644
--- a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/application/LightingManager.scala
+++ b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/application/LightingManager.scala
@@ -14,10 +14,10 @@ trait LightingManager extends LightingSystemComponent with LampsComponent with D
private val samplingWindow = 10 seconds
/** Runs the manager, spawning a new controller consuming the given [[source]] of events. */
- def run(source: ReadableChannel[LuminosityEntry])(using Async, AsyncOperations): Unit =
- lightingSystem.asRunnable.run
+ def run(source: ReadableChannel[LuminosityEntry])(using Async.Spawn, AsyncOperations): Unit =
+ lightingSystem.asRunnable.start()
Controller.oneToOne(
publisherChannel = source,
consumer = lightingSystem,
- transformation = r => r.bufferWithin(samplingWindow),
- ).run
+ transformation = _.bufferWithin(samplingWindow),
+ ).start()
diff --git a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/application/ThermostatManager.scala b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/application/ThermostatManager.scala
index 2e1121ce..1bbe8323 100644
--- a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/application/ThermostatManager.scala
+++ b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/application/ThermostatManager.scala
@@ -26,11 +26,11 @@ trait ThermostatManager
private val samplingWindow = 10 seconds
/** Runs the manager, spawning a new controller consuming the given [[source]] of events. */
- def run(source: ReadableChannel[TemperatureEntry])(using Async, AsyncOperations): Unit =
- thermostat.asRunnable.run
- sensorHealthChecker.asRunnable.run
+ def run(source: ReadableChannel[TemperatureEntry])(using Async.Spawn, AsyncOperations): Unit =
+ thermostat.asRunnable.start()
+ sensorHealthChecker.asRunnable.start()
Controller.oneToMany(
publisherChannel = source,
consumers = Set(thermostat, sensorHealthChecker),
- transformation = r => r.bufferWithin(samplingWindow),
- ).run
+ transformation = _.bufferWithin(samplingWindow),
+ ).start()
diff --git a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/LightingSystemComponent.scala b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/LightingSystemComponent.scala
index 05f1b7ad..baa73aac 100644
--- a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/LightingSystemComponent.scala
+++ b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/LightingSystemComponent.scala
@@ -20,6 +20,6 @@ trait LightingSystemComponent:
def apply(): LightingSystem = LightingSystemImpl()
private class LightingSystemImpl extends LightingSystem:
- override protected def react(e: Try[Seq[LuminosityEntry]])(using Async): Unit =
+ override protected def react(e: Try[Seq[LuminosityEntry]])(using Async.Spawn): Unit =
val averageLuminosity = e.map { entries => entries.map(_.luminosity).sum / entries.size }.toOption
averageLuminosity.foreach(context.dashboard.luminosityUpdate(_))
diff --git a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/SensorHealthCheckerComponent.scala b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/SensorHealthCheckerComponent.scala
index c7ae3efe..b4178a9b 100644
--- a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/SensorHealthCheckerComponent.scala
+++ b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/SensorHealthCheckerComponent.scala
@@ -25,14 +25,14 @@ trait SensorHealthCheckerComponent[E <: SensorEvent]:
private class SensorHealthCheckerImpl extends SensorHealthChecker with State[Seq[E], Seq[E]](Seq()):
- override protected def react(e: Try[Seq[E]])(using Async): Seq[E] = e match
+ override protected def react(e: Try[Seq[E]])(using Async.Spawn): Seq[E] = e match
case Success(current) =>
val noMoreActive = state.map(_.name).toSet -- current.map(_.name).toSet
if noMoreActive.nonEmpty then sendAlert(s"[$currentTime] ${noMoreActive.mkString(", ")} no more active!")
current
case Failure(es) => sendAlert(es.getMessage); Seq()
- private def sendAlert(message: String)(using Async): Unit = Future:
+ private def sendAlert(message: String)(using Async.Spawn): Unit = Future:
context.alertSystem.notify(message)
context.dashboard.alertNotified(message)
diff --git a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/ThermostatComponent.scala b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/ThermostatComponent.scala
index 1f2c06f4..b070eac0 100644
--- a/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/ThermostatComponent.scala
+++ b/smart-hub-direct/src/main/scala/io/github/tassiLuca/hub/core/ThermostatComponent.scala
@@ -31,22 +31,22 @@ trait ThermostatComponent[T <: ThermostatScheduler]:
private val hysteresis = 1.5
- override protected def react(e: Try[Seq[TemperatureEntry]])(using Async): Option[Temperature] =
+ override protected def react(e: Try[Seq[TemperatureEntry]])(using Async.Spawn): Option[Temperature] =
for
averageTemperature <- e.map { entries => entries.map(_.temperature).sum / entries.size }.toOption
_ = averageTemperature.evaluate()
yield averageTemperature
extension (t: Temperature)
- private def evaluate()(using Async): Unit =
+ private def evaluate()(using Async.Spawn): Unit =
val target = scheduler.currentTarget
if t > target + hysteresis then offHeater() else if t < target then onHeater()
context.dashboard.temperatureUpdated(t)
- private def offHeater()(using Async): Unit = Future:
+ private def offHeater()(using Async.Spawn): Unit = Future:
context.heater.off()
context.dashboard.offHeaterNotified()
- private def onHeater()(using Async): Unit = Future:
+ private def onHeater()(using Async.Spawn): Unit = Future:
context.heater.on()
context.dashboard.onHeaterNotified()
diff --git a/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/LightingManagerTest.scala b/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/LightingManagerTest.scala
index 4e488d7a..e767dc33 100644
--- a/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/LightingManagerTest.scala
+++ b/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/LightingManagerTest.scala
@@ -1,7 +1,7 @@
package io.github.tassiLuca.hub.application
import gears.async.default.given
-import gears.async.{Async, AsyncOperations, ReadableChannel, SendableChannel, Task, UnboundedChannel}
+import gears.async.{Async, AsyncOperations, Channel, ReadableChannel, SendableChannel, Task, UnboundedChannel}
import io.github.tassiLuca.hub.core.LuminosityEntry
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@@ -10,21 +10,15 @@ class LightingManagerTest extends AnyFlatSpec with Matchers {
private val thermostatManager = TestableLightingManager
- private def sensorSource(
- publishLogic: Async ?=> SendableChannel[LuminosityEntry] => Unit,
- )(using Async): (ReadableChannel[LuminosityEntry], Task[Unit]) =
- val channel = UnboundedChannel[LuminosityEntry]()
- (channel.asReadable, Task { publishLogic(channel.asSendable) })
-
"The lighting system" should "receive event from the source" in {
Async.blocking:
- val (channel, task) = sensorSource { c =>
+ val (channel, task) = sensorSource[LuminosityEntry]: c =>
c.send(LuminosityEntry("l1", 500))
c.send(LuminosityEntry("l2", 700L))
- }
thermostatManager.run(channel)
- task.run.awaitResult
+ task.start().awaitResult
AsyncOperations.sleep(samplingWindow.toMillis + 1_000)
thermostatManager.dashboardMessages should contain(Message.LuminosityUpdate)
+ channel.close()
}
}
diff --git a/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/ThermostatManagerTest.scala b/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/ThermostatManagerTest.scala
index 42b3c8a8..5af763e0 100644
--- a/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/ThermostatManagerTest.scala
+++ b/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/ThermostatManagerTest.scala
@@ -2,7 +2,7 @@ package io.github.tassiLuca.hub.application
import gears.async.TaskSchedule.Every
import gears.async.default.given
-import gears.async.{Async, AsyncOperations, ReadableChannel, SendableChannel, Task, UnboundedChannel}
+import gears.async.{Async, AsyncOperations, Channel, ReadableChannel, SendableChannel, Task, UnboundedChannel}
import io.github.tassiLuca.hub.core.TemperatureEntry
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
@@ -10,36 +10,31 @@ import org.scalatest.matchers.should.Matchers
class ThermostatManagerTest extends AnyFlatSpec with Matchers {
private val thermostatManager = TestableThermostatManager
- private def sensorSource(
- publishLogic: Async ?=> SendableChannel[TemperatureEntry] => Unit,
- )(using Async): (ReadableChannel[TemperatureEntry], Task[Unit]) =
- val channel = UnboundedChannel[TemperatureEntry]()
- (channel.asReadable, Task { publishLogic(channel.asSendable) })
"The thermostat" should "receive event from the source" in {
Async.blocking:
- val (channel, task) = sensorSource { c =>
+ val (channel, task) = sensorSource[TemperatureEntry]: c =>
c.send(TemperatureEntry("t1", 0L))
c.send(TemperatureEntry("t2", 2L))
- }
- thermostatManager.run(channel)
- task.run.awaitResult
+ thermostatManager.run(channel.asReadable)
+ task.start().awaitResult
AsyncOperations.sleep(samplingWindow.toMillis + 1_000)
thermostatManager.heaterActions should contain(Message.HeaterOn)
thermostatManager.thermostat.state should be(Some(1.0))
thermostatManager.alerts should not contain Message.Alert
+ channel.close()
}
"The checker" should "note possible malfunctioning" in {
val sensorNames = scala.collection.mutable.Set("t1", "t2", "t3", "t4")
Async.blocking:
- val (channel, task) = sensorSource { c =>
+ val (channel, task) = sensorSource[TemperatureEntry]: c =>
c.send(TemperatureEntry(sensorNames.head, 0L))
c.send(TemperatureEntry(sensorNames.head, 1L))
- }
- thermostatManager.run(channel)
- task.schedule(Every(samplingWindow.toMillis, maxRepetitions = 2)).run.awaitResult
+ thermostatManager.run(channel.asReadable)
+ task.schedule(Every(samplingWindow.toMillis, maxRepetitions = 2)).start().awaitResult
AsyncOperations.sleep(samplingWindow.toMillis + 1_000)
thermostatManager.alerts should contain(Message.Alert)
+ channel.close()
}
}
diff --git a/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/Utils.scala b/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/Utils.scala
new file mode 100644
index 00000000..3e91fafd
--- /dev/null
+++ b/smart-hub-direct/src/test/scala/io/github/tassiLuca/hub/application/Utils.scala
@@ -0,0 +1,10 @@
+package io.github.tassiLuca.hub.application
+
+import gears.async.{Async, Channel, SendableChannel, Task, UnboundedChannel}
+import io.github.tassiLuca.hub.core.TemperatureEntry
+
+def sensorSource[T](
+ publishLogic: Async ?=> SendableChannel[T] => Unit,
+)(using Async): (Channel[T], Task[Unit]) =
+ val channel = UnboundedChannel[T]()
+ (channel, Task { publishLogic(channel.asSendable) })