-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
deltatocumulativeprocessor: enforce max bucket count for exphistograms #34157
Changes from 13 commits
fd2d9a5
f8c3f2f
3646903
6f19845
4cd2e97
0a8d2f9
e44a578
b4eb99c
68718cc
12338d3
0909365
bbce1fd
ee00b32
ddb3817
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: enhancement | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: deltatocumulativeprocessor | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: cap the number of exponential histogram buckets to 160 | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [33277] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -65,25 +65,95 @@ func (dp Histogram) Add(in Histogram) Histogram { | |||||||||
return dp | ||||||||||
} | ||||||||||
|
||||||||||
type bounds struct { | ||||||||||
lower int32 | ||||||||||
upper int32 | ||||||||||
} | ||||||||||
|
||||||||||
// with is an accessory for Merge() to calculate ideal combined scale. | ||||||||||
func (b bounds) with(o bounds) bounds { | ||||||||||
if o.empty() { | ||||||||||
return b | ||||||||||
} | ||||||||||
if b.empty() { | ||||||||||
return o | ||||||||||
} | ||||||||||
return bounds{ | ||||||||||
lower: min(b.lower, o.lower), | ||||||||||
upper: max(b.upper, o.upper), | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// empty indicates whether there are any values in a bounds. | ||||||||||
func (b bounds) empty() bool { | ||||||||||
return b == bounds{} | ||||||||||
} | ||||||||||
|
||||||||||
// boundsAtScale is an accessory for Add() to calculate ideal combined scale. | ||||||||||
func (dp ExpHistogram) boundsAtScale(b expo.Buckets, scale int32) bounds { | ||||||||||
if b.BucketCounts().Len() == 0 { | ||||||||||
return bounds{} | ||||||||||
} | ||||||||||
shift := dp.Scale() - scale | ||||||||||
a := expo.Abs(b) | ||||||||||
return bounds{ | ||||||||||
lower: int32(a.Lower()) >> shift, | ||||||||||
upper: int32(a.Upper()) >> shift, | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// downscaleNeeded computes how much downscaling is needed by shifting the | ||||||||||
// upper and lower bounds until they are separated by no more than size. | ||||||||||
func downscaleNeeded(b bounds, size int) int32 { | ||||||||||
var change int32 | ||||||||||
|
||||||||||
for b.upper-b.lower > int32(size) { | ||||||||||
b.upper >>= 1 | ||||||||||
b.lower >>= 1 | ||||||||||
Comment on lines
+111
to
+112
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
it's the same and reads easier if you're not super familier with bitshifts |
||||||||||
change++ | ||||||||||
} | ||||||||||
return change | ||||||||||
} | ||||||||||
|
||||||||||
func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram { | ||||||||||
type H = ExpHistogram | ||||||||||
|
||||||||||
if dp.Scale() != in.Scale() { | ||||||||||
hi, lo := expo.HiLo(dp, in, H.Scale) | ||||||||||
from, to := expo.Scale(hi.Scale()), expo.Scale(lo.Scale()) | ||||||||||
expo.Downscale(hi.Positive(), from, to) | ||||||||||
expo.Downscale(hi.Negative(), from, to) | ||||||||||
hi.SetScale(lo.Scale()) | ||||||||||
} | ||||||||||
minScale := min(dp.Scale(), in.Scale()) | ||||||||||
|
||||||||||
// logic is adapted from lightstep's algorithm for enforcing max buckets: | ||||||||||
// https://github.com/lightstep/go-expohisto/blob/4375bf4ef2858552204edb8b4572330c94a4a755/structure/exponential.go#L542 | ||||||||||
// first, calculate the highest and lowest indices for each bucket, given the candidate min scale. | ||||||||||
// then, calculate how much downscaling is needed to fit the merged range within max bucket count. | ||||||||||
// finally, perform the actual downscaling. | ||||||||||
posBounds := dp.boundsAtScale(dp.Positive(), minScale) | ||||||||||
posBounds = posBounds.with(in.boundsAtScale(in.Positive(), minScale)) | ||||||||||
|
||||||||||
negBounds := dp.boundsAtScale(dp.Negative(), minScale) | ||||||||||
negBounds = negBounds.with(in.boundsAtScale(in.Negative(), minScale)) | ||||||||||
|
||||||||||
minScale = min( | ||||||||||
minScale-downscaleNeeded(posBounds, dp.MaxSize), | ||||||||||
minScale-downscaleNeeded(negBounds, dp.MaxSize), | ||||||||||
) | ||||||||||
Comment on lines
+121
to
+137
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we move all of this logic into the maybe: package expo
// Limit returns the Scale a and b need to be downscaled to so that merging does
// not exceed the given max bucket length
func Limit(a, b DataPoint, max int) Scale {} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good. |
||||||||||
|
||||||||||
from, to := expo.Scale(dp.Scale()), expo.Scale(minScale) | ||||||||||
expo.Downscale(dp.Positive(), from, to) | ||||||||||
expo.Downscale(dp.Negative(), from, to) | ||||||||||
dp.SetScale(minScale) | ||||||||||
|
||||||||||
from = expo.Scale(in.Scale()) | ||||||||||
expo.Downscale(in.Positive(), from, to) | ||||||||||
expo.Downscale(in.Negative(), from, to) | ||||||||||
in.SetScale(minScale) | ||||||||||
|
||||||||||
expo.Merge(dp.Positive(), in.Positive()) | ||||||||||
expo.Merge(dp.Negative(), in.Negative()) | ||||||||||
|
||||||||||
if dp.ZeroThreshold() != in.ZeroThreshold() { | ||||||||||
hi, lo := expo.HiLo(dp, in, H.ZeroThreshold) | ||||||||||
expo.WidenZero(lo.DataPoint, hi.ZeroThreshold()) | ||||||||||
} | ||||||||||
|
||||||||||
expo.Merge(dp.Positive(), in.Positive()) | ||||||||||
expo.Merge(dp.Negative(), in.Negative()) | ||||||||||
Comment on lines
-84
to
-85
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the reason to do merging before widening the zero bucket? Afaict widening must happen first, or we might merge buckets with a different zero threshhold, which is almost certainly wrong There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, I don't remember any specific reason. This was probably an oversight and you are correct - we need to widen the zero first because it affects the bucket counts. |
||||||||||
|
||||||||||
dp.SetTimestamp(in.Timestamp()) | ||||||||||
dp.SetCount(dp.Count() + in.Count()) | ||||||||||
dp.SetZeroCount(dp.ZeroCount() + in.ZeroCount()) | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,10 +80,11 @@ func (dp Histogram) CopyTo(dst Histogram) { | |
|
||
type ExpHistogram struct { | ||
expo.DataPoint | ||
MaxSize int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doubles the size of each Given there is likely no need to configure this at runtime anyways, can we just have a constant on the expo package?
|
||
} | ||
|
||
func (dp ExpHistogram) Clone() ExpHistogram { | ||
clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()} | ||
clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint(), MaxSize: dp.MaxSize} | ||
if dp.DataPoint != (expo.DataPoint{}) { | ||
dp.CopyTo(clone) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,13 +103,5 @@ func Collapse(bs Buckets) { | |
counts.SetAt(i, counts.At(k)+counts.At(k+1)) | ||
} | ||
} | ||
|
||
// zero the excess area. its not needed to represent the observation | ||
// anymore, but kept for two reasons: | ||
// 1. future observations may need it, no need to re-alloc then if kept | ||
// 2. [pcommon.Uint64Slice] can not, in fact, be sliced, so getting rid | ||
// of it would alloc ¯\_(ツ)_/¯ | ||
for i := size; i < counts.Len(); i++ { | ||
counts.SetAt(i, 0) | ||
} | ||
counts.FromRaw(counts.AsRaw()[:size]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a tricky one. This change makes us reallocate all bucket counts every time we downscale, regardless of whether we are close to the limit or not. Generally, we should keep allocated memory even if we have no counts in there, because those might arrive in the future. This is fine because we enfore our limit by downscaling before, so we will never exceed it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IIRC I had to do this because we don't explicitly store the upper bound of the bucket, so we rely on the length of the slice itself. We must know the upper bound of the bucket to calculate the new scale. We could explicitly track the "real" length somewhere but it adds complexity. FWIW I didn't see any noticeable hit to performance when this optimization was removed, so I went with the simpler approach. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,10 +23,11 @@ func TestExpoAdd(t *testing.T) { | |
var obs0 = expotest.Observe0 | ||
|
||
cases := []struct { | ||
name string | ||
dp, in expdp | ||
want expdp | ||
flip bool | ||
name string | ||
dp, in expdp | ||
want expdp | ||
flip bool | ||
maxSize int | ||
}{{ | ||
name: "noop", | ||
dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, | ||
|
@@ -37,6 +38,30 @@ func TestExpoAdd(t *testing.T) { | |
dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0}, | ||
in: expdp{PosNeg: bins{1, 2, 3, 4, 5, 6, 7, 8}.Into(), Count: 2 * (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8)}, | ||
want: expdp{PosNeg: bins{1, 2, 3, 4, 5, 6, 7, 8}.Into(), Count: 2 * (0 + (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8))}, | ||
}, {}, { | ||
name: "maxsize/1", | ||
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0}, | ||
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)}, | ||
want: expdp{PosNeg: bins{ø, ø, 0, 10, ø}.Into(), Scale: -3, Count: 2 * (0 + (1 + 2 + 3 + 4))}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not understand this test case. The max length is 1, but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. When downscaling, there is an edge case where we may need to extend the bucket by 1 in order to fit all the counts. So the actual bucket size might be 1 larger than max scale. |
||
maxSize: 1, | ||
}, {}, { | ||
name: "maxsize/2", | ||
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0}, | ||
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)}, | ||
want: expdp{PosNeg: bins{ø, ø, 0, 6, 4, ø}.Into(), Scale: -2, Count: 2 * (0 + (1 + 2 + 3 + 4))}, | ||
maxSize: 2, | ||
}, { | ||
name: "maxsize/4", | ||
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0}, | ||
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)}, | ||
want: expdp{PosNeg: bins{ø, 0, 0, 1, 5, 4, ø}.Into(), Scale: -1, Count: 2 * (0 + (1 + 2 + 3 + 4))}, | ||
maxSize: 4, | ||
}, { | ||
name: "maxsize/8", | ||
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0}, | ||
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)}, | ||
want: expdp{PosNeg: bins{0, 0, 0, 0, 1, 2, 3, 4}.Into(), Scale: 0, Count: 2 * (0 + (1 + 2 + 3 + 4))}, | ||
maxSize: 8, | ||
}, { | ||
name: "lower+shorter", | ||
dp: expdp{PosNeg: bins{ø, ø, ø, ø, ø, 1, 1, 1}.Into(), Count: 2 * 3}, | ||
|
@@ -85,7 +110,6 @@ func TestExpoAdd(t *testing.T) { | |
bs := pmetric.NewExponentialHistogramDataPointBuckets() | ||
expotest.ObserveInto(bs, expo.Scale(0), 1, 2, 3, 4) | ||
expotest.ObserveInto(bs, expo.Scale(0), 4, 3, 2, 1) | ||
bs.BucketCounts().Append([]uint64{0, 0}...) // rescaling leaves zeroed memory. this is expected | ||
return bs | ||
}()}, | ||
}} | ||
|
@@ -95,10 +119,15 @@ func TestExpoAdd(t *testing.T) { | |
return func(t *testing.T) { | ||
is := datatest.New(t) | ||
|
||
maxSize := 160 | ||
if cs.maxSize > 0 { | ||
maxSize = cs.maxSize | ||
} | ||
|
||
var ( | ||
dp = ExpHistogram{dp.Into()} | ||
in = ExpHistogram{in.Into()} | ||
want = ExpHistogram{cs.want.Into()} | ||
dp = ExpHistogram{dp.Into(), maxSize} | ||
in = ExpHistogram{in.Into(), maxSize} | ||
want = ExpHistogram{cs.want.Into(), maxSize} | ||
) | ||
|
||
dp.SetTimestamp(0) | ||
|
@@ -107,6 +136,7 @@ func TestExpoAdd(t *testing.T) { | |
|
||
got := dp.Add(in) | ||
is.Equal(want.DataPoint, got.DataPoint) | ||
is.Equalf(want.MaxSize, got.MaxSize, "MaxSize") | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we be more descriptive here? as a reader, I'm mostly interested in how this works, which is not explained.
i'd like to read something along the lines of "computes the bucket boundaries at given scale. it does so by dividing the bounds by two (>> operation) as many times as the scales differ."