Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sorted slices to represent HDR histogram #52

Merged
merged 11 commits into from
Jul 31, 2023
274 changes: 101 additions & 173 deletions aggregationpb/aggregation.pb.go

Large diffs are not rendered by default.

792 changes: 303 additions & 489 deletions aggregationpb/aggregation_vtproto.pb.go

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions aggregators/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,9 +427,7 @@ func histogramFromProto(h *hdrhistogram.HistogramRepresentation, pb *aggregation
h.CountsRep.Reset()

for i := 0; i < len(pb.Buckets); i++ {
bucket := pb.Buckets[i]
counts := pb.Counts[i]
h.CountsRep.Add(bucket, counts)
h.CountsRep.Add(pb.Buckets[i], pb.Counts[i])
}
}

Expand All @@ -455,9 +453,9 @@ func setHistogramProto(h *hdrhistogram.HistogramRepresentation, pb *aggregationp
if countsLen > cap(pb.Counts) {
pb.Counts = make([]int64, 0, countsLen)
}
h.CountsRep.ForEach(func(bucket int32, value int64) {
h.CountsRep.ForEach(func(bucket int32, count int64) {
pb.Buckets = append(pb.Buckets, bucket)
pb.Counts = append(pb.Counts, value)
pb.Counts = append(pb.Counts, count)
})
}

Expand Down
98 changes: 62 additions & 36 deletions aggregators/internal/hdrhistogram/hdrhistogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"math"
"math/bits"
"time"

"golang.org/x/exp/slices"
)

const (
Expand Down Expand Up @@ -120,24 +122,21 @@ func (h *HistogramRepresentation) Buckets() (int64, []int64, []float64) {
values := make([]float64, 0, h.CountsRep.Len())

var totalCount int64
var bucketsSeen int
var prevBucket int32
iter := h.iterator()
// TODO @lahsivjar: Assuming sparse representation, we might be better off
// by sorting CountsRep rather than what we do now to avoid sorting.
for idx := 0; iter.next(); idx++ {
if bucketsSeen == h.CountsRep.Len() {
break
iter.nextCountAtIdx()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if the iterator type is still needed, and whether we could simplify things. Would it be possible to store the bucket value, rather than the index, in CountsRep? Then Buckets would simply iterate through and copy into the slices.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to store the bucket value, rather than index, in CountsRep?

Do you mean save the highestEquivalentValue for each bucket instead of index? That value is currently calculated based on the bucket and sub bucket index and I think it would require some re-implementation of the core logic.

Copy link
Contributor Author

@lahsivjar lahsivjar Jul 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be able to use the bucketIdx to derive the highestEquivalentValue directly without requiring the subBucket (which is the whole reason I kept the iterator). Giving this a shot now.

This doesn't work because we require the subBucketIdx even in this case. I think we might be able to do some refactoring to achieve a simpler state but maybe in a future PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 let's take it to a followup

h.CountsRep.ForEach(func(bucket int32, scaledCounts int64) {
if scaledCounts <= 0 {
return
}
scaledCount, ok := h.CountsRep.Get(int32(idx))
if !ok || scaledCount <= 0 {
continue
if iter.advance(int(bucket - prevBucket)) {
count := int64(math.Round(float64(scaledCounts) / histogramCountScale))
counts = append(counts, count)
values = append(values, float64(iter.highestEquivalentValue))
totalCount += count
}
bucketsSeen++
count := int64(math.Round(float64(scaledCount) / histogramCountScale))
counts = append(counts, count)
values = append(values, float64(iter.highestEquivalentValue))
totalCount += count
}
prevBucket = bucket
})
return totalCount, counts, values
}

Expand Down Expand Up @@ -203,9 +202,12 @@ type iterator struct {
highestEquivalentValue int64
}

func (i *iterator) next() bool {
if !i.nextCountAtIdx() {
return false
// advance advances the iterator by count steps, returning false if the
// iterator is exhausted before all steps complete.
func (i *iterator) advance(count int) bool {
for c := 0; c < count; c++ {
if !i.nextCountAtIdx() {
return false
}
}
i.highestEquivalentValue = i.h.highestEquivalentValue(i.valueFromIdx)
return true
Expand Down Expand Up @@ -280,46 +282,59 @@ func getBucketCount() int32 {
return bucketsNeeded
}

// bar represents a single bar of the histogram. Bucket identifies where
// the bar falls within the histogram's range, and Count is the number of
// recorded values in that bucket. Bars are kept sorted by Bucket in the
// slice-backed representation.
type bar struct {
	Bucket int32
	Count  int64
}

// HybridCountsRep represents a hybrid counts representation for
// sparse histogram. It is optimized to record a single value in
// scalar fields, promoting to a sorted slice when more buckets are added.
type HybridCountsRep struct {
bucket int32
value int64
m map[int32]int64
s []bar
}

// Add adds a value to the bucket with the given index.
func (c *HybridCountsRep) Add(bucket int32, value int64) {
if c.m == nil && c.bucket == 0 && c.value == 0 {
if c.s == nil && c.bucket == 0 && c.value == 0 {
c.bucket = bucket
c.value = value
return
}
if c.m == nil {
c.m = make(map[int32]int64)
// automatic promotion to map
c.m[c.bucket] = c.value
if c.s == nil {
// automatic promotion to slice
c.s = make([]bar, 0, 128) // TODO: Use pool
c.s = slices.Insert(c.s, 0, bar{Bucket: c.bucket, Count: c.value})
c.bucket, c.value = 0, 0
}
c.m[bucket] += value
at, found := slices.BinarySearchFunc(c.s, bar{Bucket: bucket}, compareBar)
if found {
c.s[at].Count += value
return
}
c.s = slices.Insert(c.s, at, bar{Bucket: bucket, Count: value})
}

// ForEach iterates over each bucket and calls the given function.
func (c *HybridCountsRep) ForEach(f func(int32, int64)) {
if c.m == nil && (c.bucket != 0 || c.value != 0) {
if c.s == nil && (c.bucket != 0 || c.value != 0) {
f(c.bucket, c.value)
return
}
for k, v := range c.m {
f(k, v)
for i := range c.s {
f(c.s[i].Bucket, c.s[i].Count)
}
}

// Len returns the number of buckets currently recorded.
func (c *HybridCountsRep) Len() int {
if c.m != nil {
return len(c.m)
if c.s != nil {
return len(c.s)
}
if c.bucket != 0 || c.value != 0 {
return 1
Expand All @@ -330,23 +345,24 @@ func (c *HybridCountsRep) Len() int {
// Get returns the count of values in a given bucket along with a bool
// which is false if the bucket is not found.
func (c *HybridCountsRep) Get(bucket int32) (int64, bool) {
if c.m == nil {
if c.s == nil {
if c.bucket == bucket {
return c.value, true
}
return 0, false
}
val, ok := c.m[bucket]
return val, ok
at, found := slices.BinarySearchFunc(c.s, bar{Bucket: bucket}, compareBar)
if found {
return c.s[at].Count, true
}
return 0, false
}

// Reset resets the values recorded.
func (c *HybridCountsRep) Reset() {
c.bucket = 0
c.value = 0
for k := range c.m {
delete(c.m, k)
}
c.s = c.s[:0]
}

// Equal returns true if same bucket and count is recorded in both.
Expand All @@ -366,3 +382,13 @@ func (c *HybridCountsRep) Equal(h *HybridCountsRep) bool {
})
return equal
}

// compareBar is the ordering function for bars in the sorted slice
// representation. It compares two bars by bucket and returns -1, 0, or 1
// as a.Bucket is less than, equal to, or greater than b.Bucket, matching
// the contract expected by slices.BinarySearchFunc.
func compareBar(a, b bar) int {
	switch {
	case a.Bucket < b.Bucket:
		return -1
	case a.Bucket > b.Bucket:
		return 1
	default:
		return 0
	}
}
83 changes: 56 additions & 27 deletions aggregators/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ package aggregators

import (
"io"
"sync"

"github.com/axiomhq/hyperloglog"
"golang.org/x/exp/slices"

"github.com/elastic/apm-aggregation/aggregationpb"
)
Expand Down Expand Up @@ -407,39 +407,68 @@ func mergeSpanMetrics(to, from *aggregationpb.SpanMetrics) {
to.Sum += from.Sum
}

// mapPool is a pool of maps to facilitate histogram merges.
var mapPool = sync.Pool{New: func() interface{} {
return make(map[int32]int64)
}}

// mergeHistogram merges two proto representations of HDRHistogram. The
// merge assumes both histograms are created with the same arguments and
// that their representations are sorted by bucket.
func mergeHistogram(to, from *aggregationpb.HDRHistogram) {
// Assume both histograms are created with same arguments
m := mapPool.Get().(map[int32]int64)
defer mapPool.Put(m)
for k := range m {
delete(m, k)
if len(from.Buckets) == 0 {
return
}

for i := 0; i < len(to.Buckets); i++ {
m[to.Buckets[i]] = to.Counts[i]
}
for i := 0; i < len(from.Buckets); i++ {
m[from.Buckets[i]] += from.Counts[i]
if len(to.Buckets) == 0 {
to.Buckets = append(to.Buckets, from.Buckets...)
to.Counts = append(to.Counts, from.Counts...)
return
}

if cap(to.Buckets) < len(m) {
to.Buckets = make([]int32, len(m))
}
if cap(to.Counts) < len(m) {
to.Counts = make([]int64, len(m))
requiredLen := len(to.Buckets) + len(from.Buckets)
for toIdx, fromIdx := 0, 0; toIdx < len(to.Buckets) && fromIdx < len(from.Buckets); {
v := to.Buckets[toIdx] - from.Buckets[fromIdx]
switch {
case v == 0:
// For every bucket that is common, we need one less bucket in the final slice
requiredLen--
toIdx++
fromIdx++
case v < 0:
toIdx++
case v > 0:
fromIdx++
}
}

to.Buckets = to.Buckets[:0]
to.Counts = to.Counts[:0]

for b, c := range m {
to.Buckets = append(to.Buckets, b)
to.Counts = append(to.Counts, c)
toIdx, fromIdx := len(to.Buckets)-1, len(from.Buckets)-1
to.Buckets = slices.Grow(to.Buckets, requiredLen)
to.Counts = slices.Grow(to.Counts, requiredLen)
Comment on lines +441 to +442
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe there's a slight issue with slices.Grow here, since slices.Grow(s, n) ensures there's extra space for n more elements. The right call here should be to.Buckets = slices.Grow(to.Buckets, requiredLen-len(to.Buckets)).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added #56

to.Buckets = to.Buckets[:requiredLen]
to.Counts = to.Counts[:requiredLen]
for idx := len(to.Buckets) - 1; idx >= 0; idx-- {
if fromIdx < 0 {
break
}
if toIdx < 0 {
to.Counts[idx] = from.Counts[fromIdx]
to.Buckets[idx] = from.Buckets[fromIdx]
fromIdx--
continue
}
v := to.Buckets[toIdx] - from.Buckets[fromIdx]
switch {
case v == 0:
to.Counts[toIdx] += from.Counts[fromIdx]
to.Counts[idx] = to.Counts[toIdx]
to.Buckets[idx] = to.Buckets[toIdx]
toIdx--
fromIdx--
case v > 0:
to.Counts[idx] = to.Counts[toIdx]
to.Buckets[idx] = to.Buckets[toIdx]
toIdx--
case v < 0:
to.Counts[idx] = from.Counts[fromIdx]
to.Buckets[idx] = from.Buckets[fromIdx]
fromIdx--
}
}
}

Expand Down
80 changes: 60 additions & 20 deletions aggregators/merger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,26 +999,66 @@ func TestCardinalityEstimationOnSubKeyCollision(t *testing.T) {
}

func TestMergeHistogram(t *testing.T) {
// Test assumes histogram representation Merge is correct
hist1, hist2 := hdrhistogram.New(), hdrhistogram.New()

for i := 0; i < 1_000_000; i++ {
v1, v2 := rand.Int63n(3_600_000_000), rand.Int63n(3_600_000_000)
c1, c2 := rand.Int63n(1_000), rand.Int63n(1_000)
hist1.RecordValues(v1, c1)
hist2.RecordValues(v2, c2)
}
for _, tc := range []struct {
name string
recordFunc func(h1, h2 *hdrhistogram.HistogramRepresentation)
}{
{
name: "zero_values",
recordFunc: func(h1, h2 *hdrhistogram.HistogramRepresentation) {
h1.RecordValues(0, 0)
h2.RecordValues(0, 0)
},
},
{
name: "random_only_to",
recordFunc: func(h1, h2 *hdrhistogram.HistogramRepresentation) {
for i := 0; i < 1_000_000; i++ {
v := rand.Int63n(3_600_000_000)
c := rand.Int63n(1_000)
h1.RecordValues(v, c)
}
},
},
{
name: "random_only_from",
recordFunc: func(h1, h2 *hdrhistogram.HistogramRepresentation) {
for i := 0; i < 1_000_000; i++ {
v := rand.Int63n(3_600_000_000)
c := rand.Int63n(1_000)
h2.RecordValues(v, c)
}
},
},
{
name: "random_both",
recordFunc: func(h1, h2 *hdrhistogram.HistogramRepresentation) {
for i := 0; i < 1_000_000; i++ {
v1, v2 := rand.Int63n(3_600_000_000), rand.Int63n(3_600_000_000)
c1, c2 := rand.Int63n(1_000), rand.Int63n(1_000)
h1.RecordValues(v1, c1)
h2.RecordValues(v2, c2)
}
},
},
} {
t.Run(tc.name, func(t *testing.T) {
// Test assumes histogram representation Merge is correct
hist1, hist2 := hdrhistogram.New(), hdrhistogram.New()

histproto1, histproto2 := histogramToProto(hist1), histogramToProto(hist2)
hist1.Merge(hist2)
mergeHistogram(histproto1, histproto2)
histActual := hdrhistogram.New()
histogramFromProto(histActual, histproto1)
tc.recordFunc(hist1, hist2)
histproto1, histproto2 := histogramToProto(hist1), histogramToProto(hist2)
hist1.Merge(hist2)
mergeHistogram(histproto1, histproto2)
histActual := hdrhistogram.New()
histogramFromProto(histActual, histproto1)

assert.Empty(t, cmp.Diff(
hist1,
histActual,
cmp.AllowUnexported(hdrhistogram.HistogramRepresentation{}),
cmp.AllowUnexported(hdrhistogram.HybridCountsRep{}),
))
assert.Empty(t, cmp.Diff(
hist1,
histActual,
cmp.AllowUnexported(hdrhistogram.HistogramRepresentation{}),
cmp.AllowUnexported(hdrhistogram.HybridCountsRep{}),
))
})
}
}
Loading