Skip to content

Commit

Permalink
cloudv2: Use a compact version for histogram
Browse files Browse the repository at this point in the history
The histogram now uses a more compact solution for storing the
distribution. It tracks only the significant buckets.
  • Loading branch information
codebien committed Jul 6, 2023
1 parent 6f7548d commit 0f676c5
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 228 deletions.
149 changes: 60 additions & 89 deletions output/cloud/expv2/hdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package expv2
import (
"math"
"math/bits"
"sort"

"go.k6.io/k6/output/cloud/expv2/pbcloud"
)
Expand Down Expand Up @@ -37,14 +38,13 @@ const (
// The current version is: f(N = 25, m = 7) = 3200.
type histogram struct {
// Buckets stores the counters for each bin of the histogram.
// It does not include the first and the last absolute bucket,
// because they contain exception cases
// and they requires to be tracked in a dedicated way.
//
// It is expected to start and end with a non-zero bucket,
// in this way we can avoid extra allocation for not significant buckets.
// All the zero buckets in between are preserved.
Buckets []uint32
// It does not include counters for the untrackable values,
// because they contain exception cases and require to be tracked in a dedicated way.
Buckets map[uint32]uint32

// Indexes keeps an ordered slice of unique-seen buckets' indexes.
// It allows to iterate the buckets in order. It uses an ascendent order.
Indexes []uint32

// ExtraLowBucket counts occurrences of observed values smaller
// than the minimum trackable value.
Expand All @@ -54,16 +54,6 @@ type histogram struct {
// than the maximum trackable value.
ExtraHighBucket uint32

// FirstNotZeroBucket represents the index of the first bucket
// with a significant counter in the Buckets slice (a not zero value).
// In this way, all the buckets before can be omitted.
FirstNotZeroBucket uint32

// LastNotZeroBucket represents the index of the last bucket
// with a significant counter in the Buckets slice (a not zero value).
// In this way, all the buckets after can be omitted.
LastNotZeroBucket uint32

// Max is the absolute maximum observed value.
Max float64

Expand Down Expand Up @@ -104,92 +94,72 @@ func (h *histogram) addToBucket(v float64) {
return
}

index := resolveBucketIndex(v)
ix := resolveBucketIndex(v)
if _, contains := h.Buckets[ix]; !contains {
h.trackBucket(ix)
}

// they grow the current Buckets slice if there isn't enough capacity.
//
// An example with growRight:
// With Buckets [4, 1] and index equals to 5
// then we expect a slice like [4,1,0,0,0,0]
// then the counter at 5th position will be incremented
// generating the final slice [4,1,0,0,0,1]
switch {
case len(h.Buckets) == 0:
h.init(index)
case index < h.FirstNotZeroBucket:
h.prependBuckets(index)
case index > h.LastNotZeroBucket:
h.appendBuckets(index)
default:
h.Buckets[index-h.FirstNotZeroBucket]++
if h.Buckets == nil {
h.Buckets = make(map[uint32]uint32)
}
h.Buckets[ix]++
}

func (h *histogram) init(index uint32) {
h.FirstNotZeroBucket = index
h.LastNotZeroBucket = index
h.Buckets = make([]uint32, 1, 32)
h.Buckets[0] = 1
}
// trackBucket stores the unique seen buckets.
func (h *histogram) trackBucket(index uint32) {
i := sort.Search(len(h.Indexes), func(i int) bool {
return h.Indexes[i] > index
})

// prependBuckets expands the buckets slice with zeros up to the required index,
// then it increments the required bucket.
func (h *histogram) prependBuckets(index uint32) {
if h.FirstNotZeroBucket <= index {
panic("buckets is already contains the requested index")
// insert at the end
if len(h.Indexes) == i {
h.Indexes = append(h.Indexes, index)
return
}

newLen := (h.FirstNotZeroBucket - index) + uint32(len(h.Buckets))

// TODO: we may consider to swap by sub-groups
// e.g [4, 1] => [4, 1, 0, 0] => [0, 0, 4, 1]
// It requires a benchmark if it is better than just copy it.

newBuckets := make([]uint32, newLen)
copy(newBuckets[h.FirstNotZeroBucket-index:], h.Buckets)
h.Buckets = newBuckets

// Update the stats
h.Buckets[0] = 1
h.FirstNotZeroBucket = index
// insert in the middle
h.Indexes = append(h.Indexes, 0) // expand the slice
copy(h.Indexes[i+1:], h.Indexes[i:]) // make the space
h.Indexes[i] = index // set the index
}

// appendBuckets expands the buckets slice with zeros buckets till the required index,
// then it increments the required bucket.
// If the slice has enough capacity then it reuses it without allocate.
func (h *histogram) appendBuckets(index uint32) {
if h.LastNotZeroBucket >= index {
panic("buckets is already bigger than requested index")
// histogramAsProto converts the histogram into the equivalent Protobuf version.
func histogramAsProto(h *histogram, time int64) *pbcloud.TrendHdrValue {
var (
counters []uint32
spans []*pbcloud.BucketSpan
)

// allocate only if at least one item is available
if len(h.Indexes) > 0 {
// init the counters
counters = make([]uint32, 1, len(h.Indexes))
counters[0] = h.Buckets[h.Indexes[0]]
// open the first span
spans = append(spans, &pbcloud.BucketSpan{Offset: h.Indexes[0], Length: 1})
}

newLen := index - h.FirstNotZeroBucket + 1
for i := 1; i < len(h.Buckets); i++ {
counters = append(counters, h.Buckets[h.Indexes[i]])

if uint32(cap(h.Buckets)) > newLen {
// See https://go.dev/ref/spec#Slice_expressions
// "For slices, the upper index bound is
// the slice capacity cap(a) rather than the length"
h.Buckets = h.Buckets[:newLen]
} else {
newBuckets := make([]uint32, newLen)
copy(newBuckets, h.Buckets)
h.Buckets = newBuckets
}
// if the current and the previous indexes are not consecutive
// consider as closed the current on-going span and start a new one.
if diff := h.Indexes[i] - h.Indexes[i-1]; diff > 1 {
spans = append(spans, &pbcloud.BucketSpan{Offset: diff, Length: 1})
continue
}

// Update the stats
h.Buckets[len(h.Buckets)-1] = 1
h.LastNotZeroBucket = index
}
spans[len(spans)-1].Length++
}

// histogramAsProto converts the histogram into the equivalent Protobuf version.
func histogramAsProto(h *histogram, time int64) *pbcloud.TrendHdrValue {
hval := &pbcloud.TrendHdrValue{
Time: timestampAsProto(time),
LowerCounterIndex: h.FirstNotZeroBucket,
MinValue: h.Min,
MaxValue: h.Max,
Sum: h.Sum,
Count: h.Count,
Counters: h.Buckets,
Time: timestampAsProto(time),
MinValue: h.Min,
MaxValue: h.Max,
Sum: h.Sum,
Count: h.Count,
Counters: counters,
Spans: spans,
}
if h.ExtraLowBucket > 0 {
hval.ExtraLowValuesCounter = &h.ExtraLowBucket
Expand Down Expand Up @@ -255,6 +225,7 @@ func resolveBucketIndex(val float64) uint32 {
return (nkdiff << k) + (upscaled >> nkdiff)
}

// Add implements the metricValue interface.
func (h *histogram) Add(v float64) {
h.addToBucket(v)
}
Loading

0 comments on commit 0f676c5

Please sign in to comment.