Skip to content

Commit

Permalink
perf: Sort hdrhistogram proto and merge sorted slices
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jul 28, 2023
1 parent fe23835 commit e12de07
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 25 deletions.
1 change: 1 addition & 0 deletions aggregators/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ func histogramToProto(h *hdrhistogram.HistogramRepresentation) *aggregationpb.HD
pb.Buckets = append(pb.Buckets, bucket)
pb.Counts = append(pb.Counts, value)
})
sort.Sort(hdrhistogram.SortBy[int32, int64]{By: pb.Buckets, Other: pb.Counts})
return pb
}

Expand Down
23 changes: 23 additions & 0 deletions aggregators/internal/hdrhistogram/sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package hdrhistogram

import "golang.org/x/exp/constraints"

// SortBy pairs two parallel slices so that they can be sorted together with
// sort.Sort. The ordering is decided solely by the elements of By; every swap
// applied to By is mirrored at the same indices in Other, so each By[i] stays
// associated with the Other[i] it started with.
//
// Other must be at least as long as By, because Swap indexes both slices with
// positions drawn from By's length.
type SortBy[K constraints.Ordered, V any] struct {
	// By holds the keys that determine the sort order.
	By []K
	// Other holds the values that are carried along with the keys in By.
	Other []V
}

// Len reports the number of elements to sort, which is the length of By.
func (s SortBy[T, T2]) Len() int {
	n := len(s.By)
	return n
}

// Swap exchanges the elements at positions i and j in By and then performs
// the same exchange in Other, keeping the two slices aligned.
func (s SortBy[T, T2]) Swap(i, j int) {
	keys, vals := s.By, s.Other
	keys[i], keys[j] = keys[j], keys[i]
	vals[i], vals[j] = vals[j], vals[i]
}

// Less reports whether the key at position i orders before the key at
// position j. Only By is consulted; Other plays no part in the ordering.
func (s SortBy[T, T2]) Less(i, j int) bool {
	a, b := s.By[i], s.By[j]
	return a < b
}
15 changes: 15 additions & 0 deletions aggregators/internal/hdrhistogram/sort_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package hdrhistogram

import (
"fmt"
"sort"
)

// Example sorts a slice of ints with SortBy and shows that the companion
// slice of strings is permuted in step with it.
func Example() {
	keys := []int{3, 2, 1}
	vals := []string{"a", "b", "c"}

	pair := SortBy[int, string]{By: keys, Other: vals}
	sort.Sort(pair)

	fmt.Println(keys, vals)
	// Output:
	// [1 2 3] [c b a]
}
94 changes: 69 additions & 25 deletions aggregators/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package aggregators

import (
"io"
"sync"
"sort"

"github.com/axiomhq/hyperloglog"
"golang.org/x/exp/slices"

"github.com/elastic/apm-aggregation/aggregationpb"
)
Expand Down Expand Up @@ -407,40 +408,83 @@ func mergeSpanMetrics(to, from *aggregationpb.SpanMetrics) {
to.Sum += from.Sum
}

// mapPool is a pool of maps to facilitate histogram merges.
var mapPool = sync.Pool{New: func() interface{} {
return make(map[int32]int64)
}}

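// mergeHistogram merges histogram `from` into `to` in 2 steps.
// It mutates `from`. Both `to.Buckets` and `from.Buckets` are expected to be sorted in ascending order.
// 1. Using binary search, it adds each `from.Counts[j]` to the matching `to.Counts[i]` when `from.Buckets[j]` already exists in `to.Buckets`, and zeroes `from.Counts[j]`.
// 2. If there are buckets in `from.Buckets` that do not exist in `to.Buckets`, it grows `to.Buckets` and `to.Counts` (allocating memory if needed) and merges the remaining buckets and counts in sorted order.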
func mergeHistogram(to, from *aggregationpb.HDRHistogram) {
// Assume both histograms are created with same arguments
m := mapPool.Get().(map[int32]int64)
defer mapPool.Put(m)
for k := range m {
delete(m, k)
li, lj := len(to.Buckets), len(from.Buckets)

var extra int
for j := 0; j < lj; j++ {
i, found := sort.Find(li, func(i int) int {
// Avoid int overflow by comparing instead of subtracting
y := from.Buckets[j]
x := to.Buckets[i]
if y == x {
return 0
} else if y > x {
return 1
} else {
return -1
}
})
if found {
to.Counts[i] += from.Counts[j]
from.Counts[j] = 0
} else {
extra++
}
}

for i := 0; i < len(to.Buckets); i++ {
m[to.Buckets[i]] = to.Counts[i]
}
for i := 0; i < len(from.Buckets); i++ {
m[from.Buckets[i]] += from.Counts[i]
if extra == 0 {
return
}

if cap(to.Buckets) < len(m) {
to.Buckets = make([]int32, len(m))
}
if cap(to.Counts) < len(m) {
to.Counts = make([]int64, len(m))
to.Buckets = slices.Grow(to.Buckets, extra)
to.Counts = slices.Grow(to.Counts, extra)

buckets := to.Buckets[:li+extra]
counts := to.Counts[:li+extra]

i := li - 1
j := lj - 1
k := li + extra - 1
for j >= 0 && i >= 0 {
if from.Buckets[j] == to.Buckets[i] {
// No need to sum up for from.Buckets[j] == to.Buckets[i]
// because from.Counts[j] must be 0
j--
continue
} else if from.Buckets[j] > to.Buckets[i] {
buckets[k] = from.Buckets[j]
counts[k] = from.Counts[j]
j--
} else {
buckets[k] = to.Buckets[i]
counts[k] = to.Counts[i]
i--
}
k--
}

to.Buckets = to.Buckets[:0]
to.Counts = to.Counts[:0]
for i >= 0 {
buckets[k] = to.Buckets[i]
counts[k] = to.Counts[i]
i--
k--
}

for b, c := range m {
to.Buckets = append(to.Buckets, b)
to.Counts = append(to.Counts, c)
for j >= 0 {
buckets[k] = from.Buckets[j]
counts[k] = from.Counts[j]
j--
k--
}

to.Buckets = buckets
to.Counts = counts
}

// getServiceMetrics returns the service metric from a combined metrics based on the
Expand Down

0 comments on commit e12de07

Please sign in to comment.