Skip to content

Commit a625370

Browse files
Added merging of projections preparing parallel segment evaluation.
1 parent b9e2791 commit a625370

20 files changed

+480
-205
lines changed

stars-core/src/main/kotlin/tools/aqua/stars/core/evaluation/TSCEvaluation.kt

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ package tools.aqua.stars.core.evaluation
2020
import java.util.logging.Logger
2121
import kotlin.time.measureTime
2222
import kotlinx.coroutines.*
23-
import tools.aqua.stars.core.metric.metrics.Metrics
23+
import tools.aqua.stars.core.metric.metrics.EvaluationMetrics
24+
import tools.aqua.stars.core.metric.metrics.PostEvaluationMetrics
2425
import tools.aqua.stars.core.metric.providers.*
2526
import tools.aqua.stars.core.tsc.TSC
2627
import tools.aqua.stars.core.tsc.instance.TSCInstance
@@ -55,22 +56,50 @@ class TSCEvaluation<E : EntityType<E, T, S>, T : TickDataType<E, T, S>, S : Segm
5556
private val tscProjections: MutableList<TSCProjection<E, T, S>> = mutableListOf()
5657

5758
/** Holds a [List] of all [MetricProvider]s registered by [registerMetricProviders]. */
58-
private val metrics: Metrics<E, T, S> = Metrics()
59+
private val evaluationMetrics: EvaluationMetrics<E, T, S> = EvaluationMetrics()
60+
61+
/** Holds a [List] of all [MetricProvider]s registered by [registerMetricProviders]. */
62+
private val postEvaluationMetrics: PostEvaluationMetrics<E, T, S> = PostEvaluationMetrics()
5963

6064
/** Hold all [Deferred] instances returned by [evaluateSegment]. */
61-
private val segmentEvaluationJobs: MutableList<Deferred<Metrics<E, T, S>>> = mutableListOf()
65+
private val segmentEvaluationJobs: MutableList<Deferred<EvaluationMetrics<E, T, S>>> =
66+
mutableListOf()
6267

6368
/** Coroutine scope for segment evaluations. */
6469
val scope: CoroutineScope = CoroutineScope(Dispatchers.Unconfined)
6570

6671
/**
67-
* Registers a new [MetricProvider] to the list of metrics that should be called during
68-
* evaluation.
72+
* Registers new [MetricProvider]s to the list of metrics that should be called during evaluation.
6973
*
7074
* @param metricProviders The [MetricProvider]s that should be registered.
7175
*/
7276
fun registerMetricProviders(vararg metricProviders: MetricProvider<E, T, S>) {
73-
this.metrics.register(*metricProviders)
77+
evaluationMetrics.register(
78+
metricProviders.filterIsInstance<EvaluationMetricProvider<E, T, S>>())
79+
postEvaluationMetrics.register(
80+
metricProviders.filterIsInstance<PostEvaluationMetricProvider<E, T, S>>())
81+
}
82+
83+
/**
84+
* Registers new [EvaluationMetricProvider]s to the list of metrics that should be called during
85+
* evaluation.
86+
*
87+
* @param metricProviders The [EvaluationMetricProvider]s that should be registered.
88+
*/
89+
fun registerEvaluationMetricProviders(vararg metricProviders: EvaluationMetricProvider<E, T, S>) {
90+
this.evaluationMetrics.register(metricProviders.toList())
91+
}
92+
93+
/**
94+
* Registers new [PostEvaluationMetricProvider]s to the list of metrics that should be called
95+
* during evaluation.
96+
*
97+
* @param metricProviders The [PostEvaluationMetricProvider]s that should be registered.
98+
*/
99+
fun registerPostEvaluationMetricProviders(
100+
vararg metricProviders: PostEvaluationMetricProvider<E, T, S>
101+
) {
102+
this.postEvaluationMetrics.register(metricProviders.toList())
74103
}
75104

76105
/**
@@ -80,7 +109,9 @@ class TSCEvaluation<E : EntityType<E, T, S>, T : TickDataType<E, T, S>, S : Segm
80109
* @throws IllegalArgumentException When there are no [MetricProvider]s registered.
81110
*/
82111
fun prepare() {
83-
require(metrics.any()) { "There needs to be at least one registered MetricProviders." }
112+
require(evaluationMetrics.any() || postEvaluationMetrics.any()) {
113+
"There needs to be at least one registered MetricProviders."
114+
}
84115
require(tscProjections.isEmpty()) { "TSCEvaluation.prepare() has been called before." }
85116

86117
// Build all projections of the base TSC
@@ -108,8 +139,8 @@ class TSCEvaluation<E : EntityType<E, T, S>, T : TickDataType<E, T, S>, S : Segm
108139
segments.map { scope.async { evaluateSegment(it) }.also { it.start() } })
109140
}
110141

111-
private suspend fun evaluateSegment(segment: S): Metrics<E, T, S> {
112-
val metricHolder = metrics.copy()
142+
private suspend fun evaluateSegment(segment: S): EvaluationMetrics<E, T, S> {
143+
val metricHolder = evaluationMetrics.copy()
113144
val segmentEvaluationTime = measureTime {
114145
// Run the "evaluate" function for all SegmentMetricProviders on the current segment
115146
metricHolder.evaluateSegmentMetrics(segment)
@@ -150,9 +181,14 @@ class TSCEvaluation<E : EntityType<E, T, S>, T : TickDataType<E, T, S>, S : Segm
150181
tscProjections.clear()
151182

152183
runBlocking {
153-
Metrics.merge(segmentEvaluationJobs.awaitAll()).apply {
184+
EvaluationMetrics.merge(segmentEvaluationJobs.awaitAll()).apply {
154185
printState()
155-
evaluatePostEvaluationMetrics()
186+
plotData()
187+
close()
188+
}
189+
190+
postEvaluationMetrics.apply {
191+
evaluate()
156192
plotData()
157193
close()
158194
}

stars-core/src/main/kotlin/tools/aqua/stars/core/ListExtensions.kt renamed to stars-core/src/main/kotlin/tools/aqua/stars/core/extensions.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package tools.aqua.stars.core
2121

22+
import kotlin.contracts.ExperimentalContracts
23+
import kotlin.contracts.contract
24+
2225
/** Creates TxT cross product. */
2326
fun <T> List<T>.x2(): List<Pair<T, T>> = this.flatMap { a -> this.map { b -> a to b } }
2427

@@ -74,3 +77,13 @@ fun <T> List<List<List<T>>>.crossProduct(): List<List<T>> {
7477

7578
return if (size == 2) nextLevelList else (listOf(nextLevelList) + subList(2, size)).crossProduct()
7679
}
80+
81+
/**
82+
* Throws an [IllegalArgumentException] with the result of calling [lazyMessage] if the [obj] is not
83+
* of type [T]. Contracts type [T] on successful return.
84+
*/
85+
@OptIn(ExperimentalContracts::class)
86+
inline fun <reified T> requireIsInstance(obj: Any, lazyMessage: () -> Any) {
87+
contract { returns() implies (obj is T) }
88+
require(obj is T, lazyMessage)
89+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2023 The STARS Project Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package tools.aqua.stars.core.metric.metrics
19+
20+
import kotlinx.coroutines.*
21+
import tools.aqua.stars.core.metric.providers.*
22+
import tools.aqua.stars.core.tsc.instance.TSCInstance
23+
import tools.aqua.stars.core.tsc.projection.TSCProjection
24+
import tools.aqua.stars.core.types.EntityType
25+
import tools.aqua.stars.core.types.SegmentType
26+
import tools.aqua.stars.core.types.TickDataType
27+
28+
/** Wrapper class for [EvaluationMetricProvider]s. */
29+
class EvaluationMetrics<
30+
E : EntityType<E, T, S>, T : TickDataType<E, T, S>, S : SegmentType<E, T, S>>(
31+
private val segmentMetrics:
32+
MutableMap<Class<SegmentMetricProvider<E, T, S>>, SegmentMetricProvider<E, T, S>> =
33+
mutableMapOf(),
34+
private val projectionMetrics:
35+
MutableMap<Class<ProjectionMetricProvider<E, T, S>>, ProjectionMetricProvider<E, T, S>> =
36+
mutableMapOf(),
37+
private val tscInstanceMetrics:
38+
MutableMap<Class<TSCInstanceMetricProvider<E, T, S>>, TSCInstanceMetricProvider<E, T, S>> =
39+
mutableMapOf(),
40+
private val tscInstanceAndProjectionMetrics:
41+
MutableMap<
42+
Class<TSCInstanceAndProjectionNodeMetricProvider<E, T, S>>,
43+
TSCInstanceAndProjectionNodeMetricProvider<E, T, S>> =
44+
mutableMapOf()
45+
) : Metrics<EvaluationMetricProvider<E, T, S>, E, T, S>() {
46+
47+
/** Returns all registered [EvaluationMetricProvider]s. */
48+
override fun all():
49+
Map<Class<out EvaluationMetricProvider<E, T, S>>, EvaluationMetricProvider<E, T, S>> =
50+
segmentMetrics + projectionMetrics + tscInstanceMetrics + tscInstanceAndProjectionMetrics
51+
52+
/** Registers metric providers. */
53+
fun register(metrics: List<EvaluationMetricProvider<E, T, S>>) {
54+
metrics.forEach {
55+
when (it) {
56+
is SegmentMetricProvider -> segmentMetrics[it.javaClass] = it
57+
is ProjectionMetricProvider -> projectionMetrics[it.javaClass] = it
58+
is TSCInstanceMetricProvider -> tscInstanceMetrics[it.javaClass] = it
59+
is TSCInstanceAndProjectionNodeMetricProvider ->
60+
tscInstanceAndProjectionMetrics[it.javaClass] = it
61+
}
62+
}
63+
}
64+
65+
/**
66+
* Run the "evaluate" function for all [SegmentMetricProvider]s on the current [segment].
67+
*
68+
* @param segment The current [SegmentType].
69+
*/
70+
fun evaluateSegmentMetrics(segment: SegmentType<E, T, S>) {
71+
segmentMetrics.values.forEach { it.evaluate(segment) }
72+
}
73+
74+
/**
75+
* Run the "evaluate" function for all [ProjectionMetricProvider]s on the current [projection].
76+
*
77+
* @param projection The current [TSCProjection].
78+
*/
79+
fun evaluateProjectionMetrics(projection: TSCProjection<E, T, S>) {
80+
projectionMetrics.values.forEach { it.evaluate(projection) }
81+
}
82+
83+
/**
84+
* Run the "evaluate" function for all [TSCInstanceMetricProvider]s on the current [instance].
85+
*
86+
* @param instance The current [TSCInstance].
87+
*/
88+
fun evaluateTSCInstanceMetrics(instance: TSCInstance<E, T, S>) {
89+
tscInstanceMetrics.values.forEach { it.evaluate(instance) }
90+
}
91+
92+
/**
93+
* Run the "evaluate" function for all [TSCInstanceAndProjectionNodeMetricProvider]s on the
94+
* current [instance] and [projection].
95+
*
96+
* @param instance The current [TSCInstance].
97+
* @param projection The current [TSCProjection].
98+
*/
99+
fun evaluateTSCInstanceAndProjectionMetrics(
100+
instance: TSCInstance<E, T, S>,
101+
projection: TSCProjection<E, T, S>
102+
) {
103+
tscInstanceAndProjectionMetrics.values.forEach { it.evaluate(instance, projection) }
104+
}
105+
106+
/** Deeply copies the [EvaluationMetrics] object resetting all saved instances. */
107+
override fun copy(): EvaluationMetrics<E, T, S> =
108+
EvaluationMetrics(
109+
segmentMetrics.map { (k, v) -> Pair(k, v.copy()) }.toMap(mutableMapOf()),
110+
projectionMetrics.map { (k, v) -> Pair(k, v.copy()) }.toMap(mutableMapOf()),
111+
tscInstanceMetrics.map { (k, v) -> Pair(k, v.copy()) }.toMap(mutableMapOf()),
112+
tscInstanceAndProjectionMetrics.map { (k, v) -> Pair(k, v.copy()) }.toMap(mutableMapOf()))
113+
114+
companion object {
115+
/**
116+
* Merges given list of [EvaluationMetrics] in parallel. Returns a new [EvaluationMetrics]
117+
* instance.
118+
*/
119+
fun <E : EntityType<E, T, S>, T : TickDataType<E, T, S>, S : SegmentType<E, T, S>> merge(
120+
metrics: List<EvaluationMetrics<E, T, S>>
121+
): EvaluationMetrics<E, T, S> =
122+
EvaluationMetrics<E, T, S>().apply {
123+
register(
124+
runBlocking {
125+
metrics
126+
// Receive all provider maps
127+
.map { it.all() }
128+
.asSequence()
129+
// Flatmap and group to one map
130+
.flatMap { it.asSequence() }
131+
.groupBy({ it.key }, { it.value })
132+
// Create async jobs
133+
.map { async { combineResults(it.value.toMutableList()) } }
134+
.awaitAll() // Await completion
135+
})
136+
}
137+
138+
private fun <
139+
K : EvaluationMetricProvider<E, T, S>,
140+
E : EntityType<E, T, S>,
141+
T : TickDataType<E, T, S>,
142+
S : SegmentType<E, T, S>> combineResults(providers: MutableList<K>): K {
143+
check(providers.isNotEmpty()) { "Empty list of metric providers encountered." }
144+
145+
val instances = providers.toMutableList()
146+
val instance = instances.removeFirst()
147+
148+
instances.forEach { instance.merge(it) }
149+
150+
return instance
151+
}
152+
}
153+
}

0 commit comments

Comments
 (0)