diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java index 267a1ee50..5c8a3a271 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/AtlasRegistry.java @@ -305,7 +305,7 @@ void sendToLWC() { if (config.lwcEnabled()) { logger.debug("sending to LWC for time: {}", t); try { - EvalPayload payload = evaluator.eval(t); + EvalPayload payload = evaluator.eval(t, parallelPolling); if (!payload.getMetrics().isEmpty()) { List> futures = new ArrayList<>(); payload.consumeBatches(batchSize, p -> futures.add(publisher.publish(p))); diff --git a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java index 252b1c10f..c41c6e9fb 100644 --- a/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java +++ b/spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/Evaluator.java @@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -31,10 +32,12 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.stream.StreamSupport; /** * Evaluates all the expressions for a set of subscriptions. @@ -154,8 +157,24 @@ public void update(Id id, long t, double v) { * Payload representing the results of the evaluation. */ public EvalPayload eval(long timestamp) { - List metrics = new ArrayList<>(); - subscriptions.values().forEach(subEntry -> { + return eval(timestamp, false); + } + + /** + * Evaluate the expressions for all subscriptions against the data available for the provided + * timestamp. The data must be populated by calling {@link #update(Id, long, double)} prior to + * performing the evaluation. + * + * @param timestamp + * Timestamp for the interval to evaluate. + * @param parallel + * Should the subscriptions be evaluated in parallel? + * @return + * Payload representing the results of the evaluation. + */ + public EvalPayload eval(long timestamp, boolean parallel) { + Collection metrics = new ConcurrentLinkedQueue<>(); + StreamSupport.stream(subscriptions.values().spliterator(), parallel).forEach(subEntry -> { final String subId = subEntry.subscription.getId(); final long step = subEntry.subscription.getFrequency(); @@ -215,7 +234,7 @@ public EvalPayload eval(long timestamp) { } }); - return new EvalPayload(timestamp, metrics); + return new EvalPayload(timestamp, new ArrayList<>(metrics)); } private void putCommonTags(Map dst, Set keys) {