-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
output/cloudv2: Compact histogram #3169
Changes from 6 commits
ca33225
6f7548d
0f676c5
b7cb2b3
e7bee24
976a8ff
1ec8c97
4f49f36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ package expv2 | |
import ( | ||
"math" | ||
"math/bits" | ||
"sort" | ||
|
||
"go.k6.io/k6/output/cloud/expv2/pbcloud" | ||
) | ||
|
@@ -37,14 +38,13 @@ const ( | |
// The current version is: f(N = 25, m = 7) = 3200. | ||
type histogram struct { | ||
// Buckets stores the counters for each bin of the histogram. | ||
// It does not include the first and the last absolute bucket, | ||
// because they contain exception cases | ||
// and they requires to be tracked in a dedicated way. | ||
// | ||
// It is expected to start and end with a non-zero bucket, | ||
// in this way we can avoid extra allocation for not significant buckets. | ||
// All the zero buckets in between are preserved. | ||
Buckets []uint32 | ||
// It does not include counters for the untrackable values, | ||
// because they contain exception cases and require to be tracked in a dedicated way. | ||
Buckets map[uint32]uint32 | ||
|
||
// Indexes keeps an ordered slice of unique-seen buckets' indexes. | ||
// It allows to iterate the buckets in order. It uses an ascendent order. | ||
Indexes []uint32 | ||
|
||
// ExtraLowBucket counts occurrences of observed values smaller | ||
// than the minimum trackable value. | ||
|
@@ -54,16 +54,6 @@ type histogram struct { | |
// than the maximum trackable value. | ||
ExtraHighBucket uint32 | ||
|
||
// FirstNotZeroBucket represents the index of the first bucket | ||
// with a significant counter in the Buckets slice (a not zero value). | ||
// In this way, all the buckets before can be omitted. | ||
FirstNotZeroBucket uint32 | ||
|
||
// LastNotZeroBucket represents the index of the last bucket | ||
// with a significant counter in the Buckets slice (a not zero value). | ||
// In this way, all the buckets after can be omitted. | ||
LastNotZeroBucket uint32 | ||
|
||
// Max is the absolute maximum observed value. | ||
Max float64 | ||
|
||
|
@@ -77,19 +67,23 @@ type histogram struct { | |
Count uint32 | ||
} | ||
|
||
func newHistogram() *histogram { | ||
return &histogram{ | ||
Buckets: make(map[uint32]uint32), | ||
Max: -math.MaxFloat64, | ||
Min: math.MaxFloat64, | ||
} | ||
} | ||
|
||
// addToBucket increments the counter of the bucket of the provided value. | ||
// If the value is lower or higher than the trackable limits | ||
// then it is counted into specific buckets. All the stats are also updated accordingly. | ||
func (h *histogram) addToBucket(v float64) { | ||
if h.Count == 0 { | ||
h.Max, h.Min = v, v | ||
} else { | ||
if v > h.Max { | ||
h.Max = v | ||
} | ||
if v < h.Min { | ||
h.Min = v | ||
} | ||
if v > h.Max { | ||
h.Max = v | ||
} | ||
if v < h.Min { | ||
h.Min = v | ||
} | ||
|
||
h.Count++ | ||
|
@@ -104,92 +98,70 @@ func (h *histogram) addToBucket(v float64) { | |
return | ||
} | ||
|
||
index := resolveBucketIndex(v) | ||
|
||
// they grow the current Buckets slice if there isn't enough capacity. | ||
// | ||
// An example with growRight: | ||
// With Buckets [4, 1] and index equals to 5 | ||
// then we expect a slice like [4,1,0,0,0,0] | ||
// then the counter at 5th position will be incremented | ||
// generating the final slice [4,1,0,0,0,1] | ||
switch { | ||
case len(h.Buckets) == 0: | ||
h.init(index) | ||
case index < h.FirstNotZeroBucket: | ||
h.prependBuckets(index) | ||
case index > h.LastNotZeroBucket: | ||
h.appendBuckets(index) | ||
default: | ||
h.Buckets[index-h.FirstNotZeroBucket]++ | ||
ix := resolveBucketIndex(v) | ||
if _, contains := h.Buckets[ix]; !contains { | ||
h.trackBucket(ix) | ||
} | ||
} | ||
|
||
func (h *histogram) init(index uint32) { | ||
h.FirstNotZeroBucket = index | ||
h.LastNotZeroBucket = index | ||
h.Buckets = make([]uint32, 1, 32) | ||
h.Buckets[0] = 1 | ||
h.Buckets[ix]++ | ||
} | ||
|
||
// prependBuckets expands the buckets slice with zeros up to the required index, | ||
// then it increments the required bucket. | ||
func (h *histogram) prependBuckets(index uint32) { | ||
if h.FirstNotZeroBucket <= index { | ||
panic("buckets is already contains the requested index") | ||
} | ||
// trackBucket stores the unique seen buckets. | ||
func (h *histogram) trackBucket(index uint32) { | ||
i := sort.Search(len(h.Indexes), func(i int) bool { | ||
return h.Indexes[i] > index | ||
}) | ||
|
||
newLen := (h.FirstNotZeroBucket - index) + uint32(len(h.Buckets)) | ||
|
||
// TODO: we may consider to swap by sub-groups | ||
// e.g [4, 1] => [4, 1, 0, 0] => [0, 0, 4, 1] | ||
// It requires a benchmark if it is better than just copy it. | ||
|
||
newBuckets := make([]uint32, newLen) | ||
copy(newBuckets[h.FirstNotZeroBucket-index:], h.Buckets) | ||
h.Buckets = newBuckets | ||
// insert at the end | ||
if len(h.Indexes) == i { | ||
h.Indexes = append(h.Indexes, index) | ||
return | ||
} | ||
|
||
// Update the stats | ||
h.Buckets[0] = 1 | ||
h.FirstNotZeroBucket = index | ||
// insert at specific `i` | ||
h.Indexes = append(h.Indexes, 0) | ||
copy(h.Indexes[i+1:], h.Indexes[i:]) | ||
h.Indexes[i] = index | ||
} | ||
|
||
// appendBuckets expands the buckets slice with zeros buckets till the required index, | ||
// then it increments the required bucket. | ||
// If the slice has enough capacity then it reuses it without allocate. | ||
func (h *histogram) appendBuckets(index uint32) { | ||
if h.LastNotZeroBucket >= index { | ||
panic("buckets is already bigger than requested index") | ||
// histogramAsProto converts the histogram into the equivalent Protobuf version. | ||
func histogramAsProto(h *histogram, time int64) *pbcloud.TrendHdrValue { | ||
var ( | ||
counters []uint32 | ||
spans []*pbcloud.BucketSpan | ||
) | ||
|
||
// allocate only if at least one item is available, in the case of only | ||
// untrackable values, then Indexes and Buckets are expected to be empty. | ||
if len(h.Indexes) > 0 { | ||
// init the counters | ||
counters = make([]uint32, 1, len(h.Indexes)) | ||
counters[0] = h.Buckets[h.Indexes[0]] | ||
// open the first span | ||
spans = append(spans, &pbcloud.BucketSpan{Offset: h.Indexes[0], Length: 1}) | ||
} | ||
|
||
newLen := index - h.FirstNotZeroBucket + 1 | ||
|
||
if uint32(cap(h.Buckets)) > newLen { | ||
// See https://go.dev/ref/spec#Slice_expressions | ||
// "For slices, the upper index bound is | ||
// the slice capacity cap(a) rather than the length" | ||
h.Buckets = h.Buckets[:newLen] | ||
} else { | ||
newBuckets := make([]uint32, newLen) | ||
copy(newBuckets, h.Buckets) | ||
h.Buckets = newBuckets | ||
} | ||
for i := 1; i < len(h.Buckets); i++ { | ||
counters = append(counters, h.Buckets[h.Indexes[i]]) | ||
|
||
// Update the stats | ||
h.Buckets[len(h.Buckets)-1] = 1 | ||
h.LastNotZeroBucket = index | ||
} | ||
// if the current and the previous indexes are not consecutive | ||
// consider as closed the current on-going span and start a new one. | ||
if diff := h.Indexes[i] - h.Indexes[i-1]; diff > 1 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For more optimal space use this likely should be 3+ as adding 1 span is at least 2 counters (offset and length) . So we can add at least 2 zeros before we became even. "at least" here is as I don't know if there is some more overhead for this in the protobuf protocol edit: This can be done after the original merge as an optimization |
||
spans = append(spans, &pbcloud.BucketSpan{Offset: diff, Length: 1}) | ||
continue | ||
} | ||
|
||
spans[len(spans)-1].Length++ | ||
} | ||
|
||
// histogramAsProto converts the histogram into the equivalent Protobuf version. | ||
func histogramAsProto(h *histogram, time int64) *pbcloud.TrendHdrValue { | ||
hval := &pbcloud.TrendHdrValue{ | ||
Time: timestampAsProto(time), | ||
LowerCounterIndex: h.FirstNotZeroBucket, | ||
MinValue: h.Min, | ||
MaxValue: h.Max, | ||
Sum: h.Sum, | ||
Count: h.Count, | ||
Counters: h.Buckets, | ||
Time: timestampAsProto(time), | ||
MinValue: h.Min, | ||
MaxValue: h.Max, | ||
Sum: h.Sum, | ||
Count: h.Count, | ||
Counters: counters, | ||
Spans: spans, | ||
} | ||
if h.ExtraLowBucket > 0 { | ||
hval.ExtraLowValuesCounter = &h.ExtraLowBucket | ||
|
@@ -255,6 +227,7 @@ func resolveBucketIndex(val float64) uint32 { | |
return (nkdiff << k) + (upscaled >> nkdiff) | ||
} | ||
|
||
// Add implements the metricValue interface. | ||
func (h *histogram) Add(v float64) { | ||
h.addToBucket(v) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may consider using a Btree as an optimization. But I would like to achieve stability in the protocol before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can't just order the buckets indexes at the end 🤷. There is no need to constantly keep track of it -if we only want it to be in order 🤷
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it simplifies the code a lot, good suggestion.