diff --git a/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/client/AnalyzerAppController.kt b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/client/AnalyzerAppController.kt index 0ab63bc8..4baf319a 100644 --- a/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/client/AnalyzerAppController.kt +++ b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/client/AnalyzerAppController.kt @@ -14,7 +14,7 @@ import kotlin.coroutines.CoroutineContext /** The analyzer application controller. */ class AnalyzerAppController : AppController, CoroutineScope { private val view = AnalyzerGUI(this) - private val analyzer = Analyzer.ofGitHub(GitHubRepositoryProvider()) + private val analyzer = Analyzer.ofGithubByFlows(GitHubRepositoryProvider()) private var currentComputation: Job? = null init { diff --git a/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/Analyzer.kt b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/Analyzer.kt index f29fdd8f..6b7229e6 100644 --- a/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/Analyzer.kt +++ b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/Analyzer.kt @@ -1,11 +1,5 @@ package io.github.tassiLuca.analyzer.lib -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.async -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch - /** A generic analyzer of organization/group/workspace repositories. */ interface Analyzer { @@ -19,51 +13,12 @@ interface Analyzer { ): Result> companion object { - /** Creates a new GitHub organization [Analyzer]. */ - fun ofGitHub(gitHubProvider: GitHubRepositoryProvider): Analyzer = GitHubAnalyzer(gitHubProvider) - } -} - -private class GitHubAnalyzer(private val gitHubProvider: GitHubRepositoryProvider) : Analyzer { - - override suspend fun analyze( - organizationName: String, - updateResults: suspend (RepositoryReport) -> Unit, - ): Result> = coroutineScope { - runCatching { - val repositories = gitHubProvider.repositoriesOf(organizationName).getOrThrow() - val resultsChannel = analyzeAll(organizationName, repositories) - collectResults(resultsChannel, repositories.size, updateResults) - } - } - - private fun CoroutineScope.analyzeAll( - organizationName: String, - repositories: List, - ): Channel { - val channel = Channel() - repositories.map { - launch { - val contributors = async { gitHubProvider.contributorsOf(organizationName, it.name).getOrThrow() } - val release = async { gitHubProvider.lastReleaseOf(organizationName, it.name).getOrThrow() } - channel.send(RepositoryReport(it.name, it.issues, it.stars, contributors.await(), release.await())) - } - } - return channel - } + /** Creates a new GitHub organization [Analyzer] based on Coroutines `Channel`s. */ + fun ofGitHubByChannels(gitHubProvider: GitHubRepositoryProvider): Analyzer = + GitHubAnalyzerByChannels(gitHubProvider) - private suspend fun collectResults( - resultsChannel: Channel, - expectedResults: Int, - updateResults: suspend (RepositoryReport) -> Unit, - ): Set { - var allReports = emptySet() - repeat(expectedResults) { - val report = resultsChannel.receive() - allReports = allReports + report - updateResults(report) - } - resultsChannel.close() - return allReports + /** Creates a new GitHub organization [Analyzer] based on Coroutines `Flow`s. */ + fun ofGithubByFlows(gitHubProvider: GitHubRepositoryProvider): Analyzer = + GitHubAnalyzerByFlows(gitHubProvider) } } diff --git a/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByChannels.kt b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByChannels.kt new file mode 100644 index 00000000..74c8d008 --- /dev/null +++ b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByChannels.kt @@ -0,0 +1,51 @@ +package io.github.tassiLuca.analyzer.lib + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.async +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch + +internal class GitHubAnalyzerByChannels(private val provider: GitHubRepositoryProvider) : Analyzer { + + override suspend fun analyze( + organizationName: String, + updateResults: suspend (RepositoryReport) -> Unit, + ): Result> = coroutineScope { + runCatching { + val repositories = provider.repositoriesOf(organizationName).getOrThrow() + val resultsChannel = analyzeAll(organizationName, repositories) + collectResults(resultsChannel, repositories.size, updateResults) + } + } + + private fun CoroutineScope.analyzeAll( + organizationName: String, + repositories: List, + ): Channel { + val channel = Channel() + repositories.map { + launch { + val contributors = async { provider.contributorsOf(organizationName, it.name).getOrThrow() } + val release = async { provider.lastReleaseOf(organizationName, it.name).getOrThrow() } + channel.send(RepositoryReport(it.name, it.issues, it.stars, contributors.await(), release.await())) + } + } + return channel + } + + private suspend fun collectResults( + resultsChannel: Channel, + expectedResults: Int, + updateResults: suspend (RepositoryReport) -> Unit, + ): Set { + var allReports = emptySet() + repeat(expectedResults) { + val report = resultsChannel.receive() + allReports = allReports + report + updateResults(report) + } + resultsChannel.close() + return allReports + } +} diff --git a/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByFlows.kt b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByFlows.kt new file mode 100644 index 00000000..3d1f783c --- /dev/null +++ b/analyzer-direct-kt/src/main/kotlin/io/github/tassiLuca/analyzer/lib/GitHubAnalyzerByFlows.kt @@ -0,0 +1,46 @@ +package io.github.tassiLuca.analyzer.lib + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.channelFlow +import kotlinx.coroutines.flow.flatMapConcat +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.launch + +internal class GitHubAnalyzerByFlows(private val provider: GitHubRepositoryProvider) : Analyzer { + + @OptIn(ExperimentalCoroutinesApi::class) + @SuppressWarnings("InjectDispatcher") + override suspend fun analyze( + organizationName: String, + updateResults: suspend (RepositoryReport) -> Unit, + ): Result> = coroutineScope { + runCatching { + val reports = provider.flowingRepositoriesOf(organizationName) + .flatMapConcat { analyzeAll(it) } + .flowOn(Dispatchers.Default) + var allReports = emptySet() + reports.collect { + updateResults(it) + allReports = allReports + it + } + allReports + } + } + + @SuppressWarnings("InjectDispatcher") + private fun analyzeAll(repositories: List): Flow = channelFlow { + repositories.forEach { repository -> + launch { + val release = async { provider.lastReleaseOf(repository.organization, repository.name).getOrThrow() } + provider.flowingContributorsOf(repository.organization, repository.name).toList().forEach { + send(RepositoryReport(repository.name, repository.issues, repository.stars, it, release.await())) + } + } + } + }.flowOn(Dispatchers.Default) +} diff --git a/analyzer-direct-kt/src/test/kotlin/io/github/tassiLuca/analyzer/lib/AnalyzerTest.kt b/analyzer-direct-kt/src/test/kotlin/io/github/tassiLuca/analyzer/lib/AnalyzerTest.kt index 6300ffc3..12ec5688 100644 --- a/analyzer-direct-kt/src/test/kotlin/io/github/tassiLuca/analyzer/lib/AnalyzerTest.kt +++ b/analyzer-direct-kt/src/test/kotlin/io/github/tassiLuca/analyzer/lib/AnalyzerTest.kt @@ -18,7 +18,7 @@ class AnalyzerTest : FunSpec() { ) init { - test("Analyzer should return the correct results if given in input an existing organization") { + test("Channel based analyzer should return the correct results if no errors occur") { var incrementalResults = emptySet() runBlocking { val service = successfulService() @@ -31,7 +31,7 @@ class AnalyzerTest : FunSpec() { } } - test("Analyzer should return a failure if given in input a non-existing organization") { + test("Channel based analyzer should return a failure if given in input a non-existing organization") { var incrementalResults = emptySet() runBlocking { val service = failingService() @@ -58,13 +58,13 @@ class AnalyzerTest : FunSpec() { `when`(gitHubProvider.lastReleaseOf(repo.organization, repo.name)) .thenReturn(Result.success(data.second)) } - return Analyzer.ofGitHub(gitHubProvider) + return Analyzer.ofGitHubByChannels(gitHubProvider) } private suspend fun failingService(): Analyzer { val gitHubProvider = mock() `when`(gitHubProvider.repositoriesOf("dse")) .thenReturn(Result.failure(RuntimeException("404, not found"))) - return Analyzer.ofGitHub(gitHubProvider) + return Analyzer.ofGitHubByChannels(gitHubProvider) } }