Skip to content

Commit

Permalink
atlas: support parallel evaluation (#1125)
Browse files Browse the repository at this point in the history
Update evaluator for streaming expressions to allow them
to be evaluated in parallel. This can be useful when run
as a centralized service with thousands of expressions.
  • Loading branch information
brharrington authored Mar 18, 2024
1 parent 48bfb38 commit 576c9f2
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ void sendToLWC() {
if (config.lwcEnabled()) {
logger.debug("sending to LWC for time: {}", t);
try {
EvalPayload payload = evaluator.eval(t);
EvalPayload payload = evaluator.eval(t, parallelPolling);
if (!payload.getMetrics().isEmpty()) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
payload.consumeBatches(batchSize, p -> futures.add(publisher.publish(p)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -31,10 +32,12 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

/**
* Evaluates all the expressions for a set of subscriptions.
Expand Down Expand Up @@ -154,8 +157,24 @@ public void update(Id id, long t, double v) {
* Payload representing the results of the evaluation.
*/
public EvalPayload eval(long timestamp) {
List<EvalPayload.Metric> metrics = new ArrayList<>();
subscriptions.values().forEach(subEntry -> {
return eval(timestamp, false);
}

/**
* Evaluate the expressions for all subscriptions against the data available for the provided
* timestamp. The data must be populated by calling {@link #update(Id, long, double)} prior to
* performing the evaluation.
*
* @param timestamp
* Timestamp for the interval to evaluate.
* @param parallel
* Should the subscriptions be evaluated in parallel?
* @return
* Payload representing the results of the evaluation.
*/
public EvalPayload eval(long timestamp, boolean parallel) {
Collection<EvalPayload.Metric> metrics = new ConcurrentLinkedQueue<>();
StreamSupport.stream(subscriptions.values().spliterator(), parallel).forEach(subEntry -> {
final String subId = subEntry.subscription.getId();
final long step = subEntry.subscription.getFrequency();

Expand Down Expand Up @@ -215,7 +234,7 @@ public EvalPayload eval(long timestamp) {
}
});

return new EvalPayload(timestamp, metrics);
return new EvalPayload(timestamp, new ArrayList<>(metrics));
}

private void putCommonTags(Map<String, String> dst, Set<String> keys) {
Expand Down

0 comments on commit 576c9f2

Please sign in to comment.