Skip to content

Commit

Permalink
refactor: update gears to 0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Mar 17, 2024
1 parent b5d768f commit 01dab97
Show file tree
Hide file tree
Showing 60 changed files with 696 additions and 553 deletions.
2 changes: 2 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion .idea/kotlinc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 0 additions & 13 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Direct style for Functional Reactive Programming: an analysis in Scala & Kotlin
# Analysis of direct style (for asynchronous reactive programming) in Scala

## Goals of the project

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import io.github.tassiLuca.analyzer.commons.lib.RepositoryReport
import io.github.tassiLuca.analyzer.lib.{Analyzer, RepositoryService}

object AppController:
def direct(using Async, AsyncOperations): AppController = DirectAppController()
def direct(using Async.Spawn, AsyncOperations): AppController = DirectAppController()

private class DirectAppController(using Async, AsyncOperations) extends AppController:
private class DirectAppController(using Async.Spawn, AsyncOperations) extends AppController:
private val view = AnalyzerView.gui(this)
private val analyzer = Analyzer.incremental(RepositoryService.ofGitHub())
private var currentComputation: Option[Future[Unit]] = None
Expand All @@ -18,10 +18,10 @@ object AppController:
override def runSession(organizationName: String): Unit =
var organizationReport: OrganizationReport = (Map(), Set())
val f = Future:
analyzer.analyze(organizationName) { report =>
analyzer.analyze(organizationName): report =>
organizationReport = organizationReport.mergeWith(report)
view.update(organizationReport)
} match { case Left(e) => view.error(e); case _ => view.endComputation() }
match { case Left(e) => view.error(e); case _ => view.endComputation() }
currentComputation = Some(f)

override def stopSession(): Unit = currentComputation.foreach(_.cancel())
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ object Analyzer:
* making the computation faster when dealing with a high number of repositories.
*/
def incremental(service: RepositoryService): Analyzer = IncrementalAnalyzer(service)

def flowing(service: RepositoryService): Analyzer = FlowingAnalyzer(service)
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,57 @@ import io.github.tassiLuca.dse.boundaries.either
import io.github.tassiLuca.dse.boundaries.either.?
import io.github.tassiLuca.dse.pimping.TerminableChannelOps.foreach
import io.github.tassiLuca.dse.pimping.asTry
import io.github.tassiLuca.dse.pimping.FlowOps.{map, toSeq}

import scala.util.boundary.Label

abstract class AbstractAnalyzer(repositoryService: RepositoryService) extends Analyzer:

extension (r: Repository)
protected def performAnalysis(using Async): Task[RepositoryReport] = Task:
val contributions = Future:
repositoryService.contributorsOf(r.organization, r.name)
val release = repositoryService.lastReleaseOf(r.organization, r.name)
RepositoryReport(r.name, r.issues, r.stars, contributions.await.getOrElse(Seq()), release.toOption)
Async.group:
val contributions = Future:
repositoryService.contributorsOf(r.organization, r.name)
val release = repositoryService.lastReleaseOf(r.organization, r.name)
RepositoryReport(r.name, r.issues, r.stars, contributions.await.getOrElse(Seq()), release.toOption)

private class BasicAnalyzer(repositoryService: RepositoryService) extends AbstractAnalyzer(repositoryService):

override def analyze(organizationName: String)(
updateResults: RepositoryReport => Unit,
)(using Async, AsyncOperations): Either[String, Seq[RepositoryReport]] = either:
val reposInfo = repositoryService.repositoriesOf(organizationName).?.map(_.performAnalysis.run)
val collector = Collector[RepositoryReport](reposInfo.toList*)
reposInfo.foreach(_ => updateResults(collector.results.read().asTry.?.awaitResult.?))
reposInfo.awaitAll
Async.group:
val reposInfo = repositoryService.repositoriesOf(organizationName).?
.map(_.performAnalysis.start())
val collector = Collector[RepositoryReport](reposInfo.toList*)
reposInfo.foreach: _ =>
updateResults(collector.results.read().asTry.?.awaitResult.?)
reposInfo.awaitAll

private class IncrementalAnalyzer(repositoryService: RepositoryService) extends AbstractAnalyzer(repositoryService):

override def analyze(organizationName: String)(
updateResults: RepositoryReport => Unit,
)(using Async, AsyncOperations): Either[String, Seq[RepositoryReport]] = either:
val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName)
var futures = Seq[Future[RepositoryReport]]()
reposInfo.foreach { repository =>
futures = futures :+ Future:
val report = repository.?.performAnalysis.run.awaitResult.?
synchronized(updateResults(report))
report
}
futures.awaitAll
Async.group:
val reposInfo = repositoryService.incrementalRepositoriesOf(organizationName)
var futureResults = Seq[Future[RepositoryReport]]()
reposInfo.foreach: repository =>
futureResults = futureResults :+ Future:
val report = repository.?.performAnalysis.start().awaitResult.?
synchronized(updateResults(report))
report
futureResults.awaitAllOrCancel

private class FlowingAnalyzer(repositoryService: RepositoryService) extends AbstractAnalyzer(repositoryService):

override def analyze(organizationName: String)(
updateResults: RepositoryReport => Unit,
)(using Async, AsyncOperations): Either[String, Seq[RepositoryReport]] = either:
Async.group:
repositoryService.flowingRepositoriesOf(organizationName).map: repository =>
Future:
val report = repository.performAnalysis.start().awaitResult.?
synchronized(updateResults(report))
report
.toSeq.?.awaitAllOrCancel
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package io.github.tassiLuca.analyzer.lib

import gears.async.{Async, Future, Listener, ReadableChannel, UnboundedChannel}
import gears.async.{Async, Future}
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
import io.github.tassiLuca.dse.pimping.{TerminableChannel, Terminated}
import io.github.tassiLuca.dse.pimping.{Flow, TerminableChannel}

import scala.annotation.tailrec

private class GitHubRepositoryService extends RepositoryService:

import sttp.model.Uri
import sttp.client3.{SimpleHttpClient, UriContext, basicRequest, Response}
import sttp.client3.{HttpClientSyncBackend, UriContext, basicRequest, Response}
import upickle.default.{read, Reader}

private val baseUrl = "https://api.github.com"
Expand All @@ -20,9 +20,12 @@ private class GitHubRepositoryService extends RepositoryService:

override def incrementalRepositoriesOf(
organizationName: String,
)(using Async): TerminableChannel[Either[String, Repository]] =
)(using Async.Spawn): TerminableChannel[Either[String, Repository]] =
incrementalPaginatedRequest(uri"$baseUrl/orgs/$organizationName/repos")

override def flowingRepositoriesOf(organizationName: String)(using Async): Flow[Repository] =
flowingPaginatedRequest(uri"$baseUrl/orgs/$organizationName/repos")

override def contributorsOf(
organizationName: String,
repositoryName: String,
Expand All @@ -32,9 +35,9 @@ private class GitHubRepositoryService extends RepositoryService:
override def incrementalContributorsOf(
organizationName: String,
repositoryName: String,
)(using Async): TerminableChannel[Either[String, Contribution]] =
)(using Async.Spawn): TerminableChannel[Either[String, Contribution]] =
incrementalPaginatedRequest(uri"$baseUrl/repos/$organizationName/$repositoryName/contributors")

override def lastReleaseOf(organizationName: String, repositoryName: String)(using Async): Either[String, Release] =
plainRequest[Release](uri"$baseUrl/repos/$organizationName/$repositoryName/releases/latest")

Expand All @@ -54,7 +57,7 @@ private class GitHubRepositoryService extends RepositoryService:

private def incrementalPaginatedRequest[T](
endpoint: Uri,
)(using Reader[T], Async): TerminableChannel[Either[String, T]] =
)(using Reader[T], Async.Spawn): TerminableChannel[Either[String, T]] =
val channel = TerminableChannel.ofUnbounded[Either[String, T]]
@tailrec
def withPagination(next: Option[Uri]): Unit = next match
Expand All @@ -69,9 +72,24 @@ private class GitHubRepositoryService extends RepositoryService:
Future(withPagination(Some(endpoint)))
channel

private def flowingPaginatedRequest[T](endpoint: Uri)(using Reader[T], Async): Flow[T] = Flow:
@tailrec
def withPagination(next: Option[Uri]): Unit = next match
case None => ()
case Some(uri) =>
val response = doRequest(uri)
response.body.map(read[Seq[T]](_)).fold(
errorMessage => failWith(errorMessage),
results => results.foreach(it.emit(_)),
)
withPagination(nextPage(response))
withPagination(Some(endpoint))

private def doRequest(endpoint: Uri): Response[Either[String, String]] =
SimpleHttpClient().send(request.get(endpoint))
HttpClientSyncBackend().send(request.get(endpoint))

private def nextPage(response: Response[Either[String, String]]): Option[Uri] = response.headers("Link")
.flatMap(_.split(",")).find(_.contains("rel=\"next\""))
.map(_.takeWhile(_ != ';').trim.stripPrefix("<").stripSuffix(">")).flatMap(Uri.parse(_).toOption)

private def failWith(errorMessage: String): Nothing = throw Exception(errorMessage)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.github.tassiLuca.analyzer.lib

import gears.async.Async
import io.github.tassiLuca.analyzer.commons.lib.{Contribution, Release, Repository}
import io.github.tassiLuca.dse.pimping.TerminableChannel
import io.github.tassiLuca.dse.pimping.{Flow, TerminableChannel}

/** A service exposing functions to retrieve data from a central hosting repository service. */
trait RepositoryService:
Expand All @@ -17,7 +17,9 @@ trait RepositoryService:
*/
def incrementalRepositoriesOf(
organizationName: String,
)(using Async): TerminableChannel[Either[String, Repository]]
)(using Async.Spawn): TerminableChannel[Either[String, Repository]]

def flowingRepositoriesOf(organizationName: String)(using Async): Flow[Repository]

/** @return [[Right]] with the [[Seq]]uence of [[Contribution]] for the given [[repositoryName]] owned by
* the given [[organizationName]] or a [[Left]] with a explanatory message in case of errors.
Expand All @@ -30,7 +32,7 @@ trait RepositoryService:
def incrementalContributorsOf(
organizationName: String,
repositoryName: String,
)(using Async): TerminableChannel[Either[String, Contribution]]
)(using Async.Spawn): TerminableChannel[Either[String, Contribution]]

/** @return a [[Right]] with the last [[Release]] of the given [[repositoryName]] owned by [[organizationName]]
* if it exists, or a [[Left]] with a explanatory message in case of errors.
Expand Down
Loading

0 comments on commit 01dab97

Please sign in to comment.