diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/CachedValuesHistogram.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/CachedValuesHistogram.java index 09691cc51..8bff8623a 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/metric/CachedValuesHistogram.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/CachedValuesHistogram.java @@ -17,9 +17,18 @@ import org.HdrHistogram.Histogram; +import java.util.concurrent.ConcurrentLinkedQueue; + public class CachedValuesHistogram { - private final Histogram underlying; + static int POOL_SIZE = 1000; + static ConcurrentLinkedQueue HISTOGRAM_POOL = new ConcurrentLinkedQueue(); + + static { + for (int i = 0; i < POOL_SIZE; i++) { + HISTOGRAM_POOL.add(new Histogram(3)); + } + } private final int mean; private final int p0; @@ -49,13 +58,13 @@ public class CachedValuesHistogram { private final int p99_99; private final int p100; + private final long totalCount; + public static CachedValuesHistogram backedBy(Histogram underlying) { return new CachedValuesHistogram(underlying); } private CachedValuesHistogram(Histogram underlying) { - this.underlying = underlying; - /** * Single thread calculates a variety of commonly-accessed quantities. * This way, all threads can access the cached values without synchronization @@ -89,10 +98,10 @@ private CachedValuesHistogram(Histogram underlying) { p99_95 = (int) underlying.getValueAtPercentile(99.95); p99_99 = (int) underlying.getValueAtPercentile(99.99); p100 = (int) underlying.getValueAtPercentile(100); - } - public Histogram getUnderlying() { - return underlying; + totalCount = underlying.getTotalCount(); + + release(underlying); } /** @@ -138,20 +147,24 @@ public int getValueAtPercentile(double percentile) { case 9995: return p99_95; case 9999: return p99_99; case 10000: return p100; - default: return getArbitraryPercentile(percentile); + default: throw new IllegalArgumentException("Percentile (" + percentile + ") is not currently cached"); } } - /** - * Since this can be accessed by any thread, need external synchronization - * @param percentile percentile of distribution - * @return value at percentile - */ - private synchronized int getArbitraryPercentile(double percentile) { - return (int) underlying.getValueAtPercentile(percentile); + public long getTotalCount() { + return totalCount; } - public synchronized long getTotalCount() { - return underlying.getTotalCount(); + private static void release(Histogram histogram) { + histogram.reset(); + HISTOGRAM_POOL.offer(histogram); + } + + public static Histogram getNewHistogram() { + Histogram histogram = HISTOGRAM_POOL.poll(); + if (histogram == null) { + histogram = new Histogram(3); + } + return histogram; } } diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/RollingDistributionStream.java b/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/RollingDistributionStream.java index 08e45fee2..37d617c46 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/RollingDistributionStream.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/metric/consumer/RollingDistributionStream.java @@ -22,16 +22,13 @@ import org.HdrHistogram.Histogram; import rx.Observable; import rx.Subscription; -import rx.functions.Action1; import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; -import rx.observers.Subscribers; import rx.subjects.BehaviorSubject; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -49,7 +46,7 @@ */ public class RollingDistributionStream { private AtomicReference rollingDistributionSubscription = new AtomicReference(null); - private final BehaviorSubject rollingDistribution = BehaviorSubject.create(CachedValuesHistogram.backedBy(getNewHistogram())); + private final BehaviorSubject rollingDistribution = BehaviorSubject.create(CachedValuesHistogram.backedBy(CachedValuesHistogram.getNewHistogram())); private final Observable rollingDistributionStream; private static final Func2 distributionAggregator = new Func2() { @@ -67,15 +64,6 @@ public Observable call(Observable window) { } }; - private static final Action1> releaseOlderOfTwoDistributions = new Action1>() { - @Override - public void call(List histograms) { - if (histograms != null && histograms.size() == 2) { - releaseHistogram(histograms.get(0).getUnderlying()); - } - } - }; - private static final Func1 cacheHistogramValues = new Func1() { @Override public CachedValuesHistogram call(Histogram histogram) { @@ -95,13 +83,13 @@ protected RollingDistributionStream(final HystrixEventStream stream, fina final Func2 addValuesToBucket) { final List emptyDistributionsToStart = new ArrayList(); for (int i = 0; i < numBuckets; i++) { - emptyDistributionsToStart.add(getNewHistogram()); + emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram()); } final Func1, Observable> reduceBucketToSingleDistribution = new Func1, Observable>() { @Override public Observable call(Observable bucket) { - return bucket.reduce(getNewHistogram(), addValuesToBucket); + return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket); } }; @@ -148,13 +136,6 @@ public void startCachingStreamValuesIfUnstarted() { Subscription candidateSubscription = observe().subscribe(rollingDistribution); if (rollingDistributionSubscription.compareAndSet(null, candidateSubscription)) { //won the race to set the subscription - - //as soon as subject receives a new Histogram, old one may be released - rollingDistribution - .window(2, 1) //subject is used as a single-value, but can be viewed as a stream. Here, get the latest 2 values of the subject - .flatMap(convertToList) //convert to list (of length 2) - .doOnNext(releaseOlderOfTwoDistributions) //if there are 2, then the oldest one will never be read, so we can reclaim its memory - .unsafeSubscribe(Subscribers.empty()); //no need to emit anywhere, this is side-effect only (release the reference to the old Histogram) } else { //lost the race to set the subscription, so we need to cancel this one candidateSubscription.unsubscribe(); @@ -178,26 +159,4 @@ public void unsubscribe() { rollingDistributionSubscription.compareAndSet(s, null); } } - - private static Histogram getNewHistogram() { - Histogram histogram = HISTOGRAM_POOL.poll(); - if (histogram == null) { - histogram = new Histogram(3); - } - return histogram; - } - - private static void releaseHistogram(Histogram histogram) { - histogram.reset(); - HISTOGRAM_POOL.offer(histogram); - } - - static int POOL_SIZE = 1000; - static ConcurrentLinkedQueue HISTOGRAM_POOL = new ConcurrentLinkedQueue(); - - static { - for (int i = 0; i < POOL_SIZE; i++) { - HISTOGRAM_POOL.add(new Histogram(3)); - } - } }