Skip to content

Commit

Permalink
output/cloudv2: Compact histogram (#3169)
Browse files Browse the repository at this point in the history
The protocol for Trend metrics conversion now uses a more compact solution for storing the histogram's distribution. It tracks only the significant buckets.

The encoding largely mirrors the Prometheus Native Histogram Protobuf encoding.
  • Loading branch information
codebien committed Jul 10, 2023
1 parent d25e858 commit 3906dcf
Show file tree
Hide file tree
Showing 7 changed files with 451 additions and 395 deletions.
163 changes: 59 additions & 104 deletions output/cloud/expv2/hdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package expv2
import (
"math"
"math/bits"
"sort"

"go.k6.io/k6/output/cloud/expv2/pbcloud"
)
Expand Down Expand Up @@ -37,14 +38,9 @@ const (
// The current version is: f(N = 25, m = 7) = 3200.
type histogram struct {
// Buckets stores the counters for each bin of the histogram.
// It does not include the first and the last absolute bucket,
// because they contain exception cases
// and they requires to be tracked in a dedicated way.
//
// It is expected to start and end with a non-zero bucket,
// in this way we can avoid extra allocation for not significant buckets.
// All the zero buckets in between are preserved.
Buckets []uint32
// It does not include counters for the untrackable values,
// because those are exceptional cases that must be tracked in a dedicated way.
Buckets map[uint32]uint32

// ExtraLowBucket counts occurrences of observed values smaller
// than the minimum trackable value.
Expand All @@ -54,16 +50,6 @@ type histogram struct {
// than the maximum trackable value.
ExtraHighBucket uint32

// FirstNotZeroBucket represents the index of the first bucket
// with a significant counter in the Buckets slice (a not zero value).
// In this way, all the buckets before can be omitted.
FirstNotZeroBucket uint32

// LastNotZeroBucket represents the index of the last bucket
// with a significant counter in the Buckets slice (a not zero value).
// In this way, all the buckets after can be omitted.
LastNotZeroBucket uint32

// Max is the absolute maximum observed value.
Max float64

Expand All @@ -77,19 +63,23 @@ type histogram struct {
Count uint32
}

// newHistogram allocates an empty histogram ready to receive observations.
//
// The Buckets map is created up front so it can be written to immediately.
// Min is seeded with math.MaxFloat64 and Max with -math.MaxFloat64, so the
// comparisons done by addToBucket replace both with the first observed value.
func newHistogram() *histogram {
	h := new(histogram)
	h.Buckets = make(map[uint32]uint32)
	h.Min, h.Max = math.MaxFloat64, -math.MaxFloat64
	return h
}

// addToBucket increments the counter of the bucket of the provided value.
// If the value is lower or higher than the trackable limits
// then it is counted into specific buckets. All the stats are also updated accordingly.
func (h *histogram) addToBucket(v float64) {
if h.Count == 0 {
h.Max, h.Min = v, v
} else {
if v > h.Max {
h.Max = v
}
if v < h.Min {
h.Min = v
}
if v > h.Max {
h.Max = v
}
if v < h.Min {
h.Min = v
}

h.Count++
Expand All @@ -104,92 +94,56 @@ func (h *histogram) addToBucket(v float64) {
return
}

index := resolveBucketIndex(v)

// they grow the current Buckets slice if there isn't enough capacity.
//
// An example with growRight:
// With Buckets [4, 1] and index equals to 5
// then we expect a slice like [4,1,0,0,0,0]
// then the counter at 5th position will be incremented
// generating the final slice [4,1,0,0,0,1]
switch {
case len(h.Buckets) == 0:
h.init(index)
case index < h.FirstNotZeroBucket:
h.prependBuckets(index)
case index > h.LastNotZeroBucket:
h.appendBuckets(index)
default:
h.Buckets[index-h.FirstNotZeroBucket]++
}
}

func (h *histogram) init(index uint32) {
h.FirstNotZeroBucket = index
h.LastNotZeroBucket = index
h.Buckets = make([]uint32, 1, 32)
h.Buckets[0] = 1
h.Buckets[resolveBucketIndex(v)]++
}

// prependBuckets expands the buckets slice with zeros up to the required index,
// then it increments the required bucket.
func (h *histogram) prependBuckets(index uint32) {
if h.FirstNotZeroBucket <= index {
panic("buckets is already contains the requested index")
// histogramAsProto converts the histogram into the equivalent Protobuf version.
func histogramAsProto(h *histogram, time int64) *pbcloud.TrendHdrValue {
var (
indexes []uint32
counters []uint32
spans []*pbcloud.BucketSpan
)

// Allocate only if at least one bucket is available. When only
// untrackable values were observed, Indexes and Buckets are expected to be empty.
if len(h.Buckets) > 0 {
indexes = make([]uint32, 0, len(h.Buckets))
for index := range h.Buckets {
indexes = append(indexes, index)
}
sort.Slice(indexes, func(i, j int) bool {
return indexes[i] < indexes[j]
})

// init the counters
counters = make([]uint32, 1, len(h.Buckets))
counters[0] = h.Buckets[indexes[0]]
// open the first span
spans = append(spans, &pbcloud.BucketSpan{Offset: indexes[0], Length: 1})
}

newLen := (h.FirstNotZeroBucket - index) + uint32(len(h.Buckets))

// TODO: we may consider to swap by sub-groups
// e.g [4, 1] => [4, 1, 0, 0] => [0, 0, 4, 1]
// It requires a benchmark if it is better than just copy it.

newBuckets := make([]uint32, newLen)
copy(newBuckets[h.FirstNotZeroBucket-index:], h.Buckets)
h.Buckets = newBuckets

// Update the stats
h.Buckets[0] = 1
h.FirstNotZeroBucket = index
}

// appendBuckets expands the buckets slice with zeros buckets till the required index,
// then it increments the required bucket.
// If the slice has enough capacity then it reuses it without allocate.
func (h *histogram) appendBuckets(index uint32) {
if h.LastNotZeroBucket >= index {
panic("buckets is already bigger than requested index")
}
for i := 1; i < len(indexes); i++ {
counters = append(counters, h.Buckets[indexes[i]])

newLen := index - h.FirstNotZeroBucket + 1
// If the current and the previous indexes are not consecutive,
// consider the ongoing span closed and start a new one.
if diff := indexes[i] - indexes[i-1]; diff > 1 {
spans = append(spans, &pbcloud.BucketSpan{Offset: diff, Length: 1})
continue
}

if uint32(cap(h.Buckets)) > newLen {
// See https://go.dev/ref/spec#Slice_expressions
// "For slices, the upper index bound is
// the slice capacity cap(a) rather than the length"
h.Buckets = h.Buckets[:newLen]
} else {
newBuckets := make([]uint32, newLen)
copy(newBuckets, h.Buckets)
h.Buckets = newBuckets
spans[len(spans)-1].Length++
}

// Update the stats
h.Buckets[len(h.Buckets)-1] = 1
h.LastNotZeroBucket = index
}

// histogramAsProto converts the histogram into the equivalent Protobuf version.
func histogramAsProto(h *histogram, time int64) *pbcloud.TrendHdrValue {
hval := &pbcloud.TrendHdrValue{
Time: timestampAsProto(time),
LowerCounterIndex: h.FirstNotZeroBucket,
MinValue: h.Min,
MaxValue: h.Max,
Sum: h.Sum,
Count: h.Count,
Counters: h.Buckets,
Time: timestampAsProto(time),
MinValue: h.Min,
MaxValue: h.Max,
Sum: h.Sum,
Count: h.Count,
Counters: counters,
Spans: spans,
}
if h.ExtraLowBucket > 0 {
hval.ExtraLowValuesCounter = &h.ExtraLowBucket
Expand Down Expand Up @@ -255,6 +209,7 @@ func resolveBucketIndex(val float64) uint32 {
return (nkdiff << k) + (upscaled >> nkdiff)
}

// Add implements the metricValue interface.
//
// It records the observed value v by delegating to addToBucket,
// which updates the histogram's statistics and bucket counters.
func (h *histogram) Add(v float64) {
h.addToBucket(v)
}
Loading

0 comments on commit 3906dcf

Please sign in to comment.