Skip to content

Commit

Permalink
feat(analyzer): flow based implementation in kotlin
Browse files Browse the repository at this point in the history
  • Loading branch information
tassiluca committed Feb 28, 2024
1 parent c4f0ac4 commit c9c8060
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlin.coroutines.CoroutineContext
/** The analyzer application controller. */
class AnalyzerAppController : AppController, CoroutineScope {
private val view = AnalyzerGUI(this)
private val analyzer = Analyzer.ofGitHub(GitHubRepositoryProvider())
private val analyzer = Analyzer.ofGithubByFlows(GitHubRepositoryProvider())
private var currentComputation: Job? = null

init {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,5 @@
package io.github.tassiLuca.analyzer.lib

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

/** A generic analyzer of organization/group/workspace repositories. */
interface Analyzer {

Expand All @@ -19,51 +13,12 @@ interface Analyzer {
): Result<Set<RepositoryReport>>

companion object {
/** Creates a new GitHub organization [Analyzer]. */
fun ofGitHub(gitHubProvider: GitHubRepositoryProvider): Analyzer = GitHubAnalyzer(gitHubProvider)
}
}

private class GitHubAnalyzer(private val gitHubProvider: GitHubRepositoryProvider) : Analyzer {

override suspend fun analyze(
organizationName: String,
updateResults: suspend (RepositoryReport) -> Unit,
): Result<Set<RepositoryReport>> = coroutineScope {
runCatching {
val repositories = gitHubProvider.repositoriesOf(organizationName).getOrThrow()
val resultsChannel = analyzeAll(organizationName, repositories)
collectResults(resultsChannel, repositories.size, updateResults)
}
}

private fun CoroutineScope.analyzeAll(
organizationName: String,
repositories: List<Repository>,
): Channel<RepositoryReport> {
val channel = Channel<RepositoryReport>()
repositories.map {
launch {
val contributors = async { gitHubProvider.contributorsOf(organizationName, it.name).getOrThrow() }
val release = async { gitHubProvider.lastReleaseOf(organizationName, it.name).getOrThrow() }
channel.send(RepositoryReport(it.name, it.issues, it.stars, contributors.await(), release.await()))
}
}
return channel
}
/** Creates a new GitHub organization [Analyzer] based on Coroutines `Channel`s. */
fun ofGitHubByChannels(gitHubProvider: GitHubRepositoryProvider): Analyzer =
GitHubAnalyzerByChannels(gitHubProvider)

private suspend fun collectResults(
resultsChannel: Channel<RepositoryReport>,
expectedResults: Int,
updateResults: suspend (RepositoryReport) -> Unit,
): Set<RepositoryReport> {
var allReports = emptySet<RepositoryReport>()
repeat(expectedResults) {
val report = resultsChannel.receive()
allReports = allReports + report
updateResults(report)
}
resultsChannel.close()
return allReports
/** Creates a new GitHub organization [Analyzer] based on Coroutines `Flow`s. */
fun ofGithubByFlows(gitHubProvider: GitHubRepositoryProvider): Analyzer =
GitHubAnalyzerByFlows(gitHubProvider)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.github.tassiLuca.analyzer.lib

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

internal class GitHubAnalyzerByChannels(private val provider: GitHubRepositoryProvider) : Analyzer {

override suspend fun analyze(
organizationName: String,
updateResults: suspend (RepositoryReport) -> Unit,
): Result<Set<RepositoryReport>> = coroutineScope {
runCatching {
val repositories = provider.repositoriesOf(organizationName).getOrThrow()
val resultsChannel = analyzeAll(organizationName, repositories)
collectResults(resultsChannel, repositories.size, updateResults)
}
}

private fun CoroutineScope.analyzeAll(
organizationName: String,
repositories: List<Repository>,
): Channel<RepositoryReport> {
val channel = Channel<RepositoryReport>()
repositories.map {
launch {
val contributors = async { provider.contributorsOf(organizationName, it.name).getOrThrow() }
val release = async { provider.lastReleaseOf(organizationName, it.name).getOrThrow() }
channel.send(RepositoryReport(it.name, it.issues, it.stars, contributors.await(), release.await()))
}
}
return channel
}

private suspend fun collectResults(
resultsChannel: Channel<RepositoryReport>,
expectedResults: Int,
updateResults: suspend (RepositoryReport) -> Unit,
): Set<RepositoryReport> {
var allReports = emptySet<RepositoryReport>()
repeat(expectedResults) {
val report = resultsChannel.receive()
allReports = allReports + report
updateResults(report)
}
resultsChannel.close()
return allReports
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.github.tassiLuca.analyzer.lib

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch

internal class GitHubAnalyzerByFlows(private val provider: GitHubRepositoryProvider) : Analyzer {

@OptIn(ExperimentalCoroutinesApi::class)
@SuppressWarnings("InjectDispatcher")
override suspend fun analyze(
organizationName: String,
updateResults: suspend (RepositoryReport) -> Unit,
): Result<Set<RepositoryReport>> = coroutineScope {
runCatching {
val reports = provider.flowingRepositoriesOf(organizationName)
.flatMapConcat { analyzeAll(it) }
.flowOn(Dispatchers.Default)
var allReports = emptySet<RepositoryReport>()
reports.collect {
updateResults(it)
allReports = allReports + it
}
allReports
}
}

@SuppressWarnings("InjectDispatcher")
private fun analyzeAll(repositories: List<Repository>): Flow<RepositoryReport> = channelFlow {
repositories.forEach { repository ->
launch {
val release = async { provider.lastReleaseOf(repository.organization, repository.name).getOrThrow() }
provider.flowingContributorsOf(repository.organization, repository.name).toList().forEach {
send(RepositoryReport(repository.name, repository.issues, repository.stars, it, release.await()))
}
}
}
}.flowOn(Dispatchers.Default)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class AnalyzerTest : FunSpec() {
)

init {
test("Analyzer should return the correct results if given in input an existing organization") {
test("Channel based analyzer should return the correct results if no errors occur") {
var incrementalResults = emptySet<RepositoryReport>()
runBlocking {
val service = successfulService()
Expand All @@ -31,7 +31,7 @@ class AnalyzerTest : FunSpec() {
}
}

test("Analyzer should return a failure if given in input a non-existing organization") {
test("Channel based analyzer should return a failure if given in input a non-existing organization") {
var incrementalResults = emptySet<RepositoryReport>()
runBlocking {
val service = failingService()
Expand All @@ -58,13 +58,13 @@ class AnalyzerTest : FunSpec() {
`when`(gitHubProvider.lastReleaseOf(repo.organization, repo.name))
.thenReturn(Result.success(data.second))
}
return Analyzer.ofGitHub(gitHubProvider)
return Analyzer.ofGitHubByChannels(gitHubProvider)
}

private suspend fun failingService(): Analyzer {
val gitHubProvider = mock<GitHubRepositoryProvider>()
`when`(gitHubProvider.repositoriesOf("dse"))
.thenReturn(Result.failure(RuntimeException("404, not found")))
return Analyzer.ofGitHub(gitHubProvider)
return Analyzer.ofGitHubByChannels(gitHubProvider)
}
}

0 comments on commit c9c8060

Please sign in to comment.