Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deltatocumulativeprocessor: enforce max bucket count for exphistograms #34157

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/deltatocumulative-cap-exphisto.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulativeprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: cap the number of exponential histogram buckets to 160

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33277]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
4 changes: 2 additions & 2 deletions processor/deltatocumulativeprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ processors:
deltatocumulative:
# how long until a series not receiving new samples is removed
[ max_stale: <duration> | default = 5m ]

# upper limit of streams to track. new streams exceeding this limit
# will be dropped
[ max_streams: <int> | default = 9223372036854775807 (max int) ]
Expand All @@ -39,4 +39,4 @@ There is no further configuration required. All delta samples are converted to c
## Troubleshooting

When [Telemetry is
enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry), this component exports [several metrics](./documentation.md).
enabled](https://opentelemetry.io/docs/collector/configuration/#telemetry), this component exports [several metrics](./documentation.md).
90 changes: 80 additions & 10 deletions processor/deltatocumulativeprocessor/internal/data/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,95 @@ func (dp Histogram) Add(in Histogram) Histogram {
return dp
}

// bounds is a pair of bucket indices marking the low and high end of the
// index range covered by a set of buckets. The zero value is what empty()
// reports as "no buckets".
type bounds struct {
	lower, upper int32
}

// with returns the smallest bounds that cover both b and o. It is used while
// working out the scale two sets of buckets need to share when combined.
//
// A side that reports empty() contributes nothing: the other side is
// returned unchanged (which is itself the zero bounds if both are empty).
func (b bounds) with(o bounds) bounds {
	switch {
	case o.empty():
		return b
	case b.empty():
		return o
	}

	union := b
	if o.lower < union.lower {
		union.lower = o.lower
	}
	if o.upper > union.upper {
		union.upper = o.upper
	}
	return union
}

// empty reports whether b is the zero value, which is how "no buckets" is
// represented.
//
// NOTE(review): a non-empty range whose lower and upper are both 0 (for
// example after being shifted down in boundsAtScale) is indistinguishable from
// an empty one here; confirm that case cannot occur or track emptiness
// explicitly.
func (b bounds) empty() bool {
	return b.lower == 0 && b.upper == 0
}

// boundsAtScale is an accessory for Add() to calculate ideal combined scale.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we be more descriptive here? as a reader, I'm mostly interested in how this works, which is not explained.

i'd like to read something along the lines of "computes the bucket boundaries at given scale. it does so by dividing the bounds by two (>> operation) as many times as the scales differ."

// boundsAtScale computes the index range that the buckets b would occupy if
// they were expressed at the given scale instead of dp's current scale.
//
// It takes the absolute lower and upper index of b (via expo.Abs) and halves
// each of them (arithmetic right shift) once per step of difference between
// dp.Scale() and scale. If b holds no bucket counts the zero bounds are
// returned.
//
// scale must not be greater than dp.Scale(): the shift count would be
// negative, which panics at runtime in Go.
func (dp ExpHistogram) boundsAtScale(b expo.Buckets, scale int32) bounds {
	var out bounds
	if b.BucketCounts().Len() == 0 {
		return out
	}

	abs := expo.Abs(b)
	steps := dp.Scale() - scale
	out.lower = int32(abs.Lower()) >> steps
	out.upper = int32(abs.Upper()) >> steps
	return out
}

// downscaleNeeded computes how much downscaling is needed by shifting the
// upper and lower bounds until they are separated by no more than size.
func downscaleNeeded(b bounds, size int) int32 {
var change int32

for b.upper-b.lower > int32(size) {
b.upper >>= 1
b.lower >>= 1
Comment on lines +111 to +112
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
b.upper >>= 1
b.lower >>= 1
b.upper /= 2
b.lower /= 2

it's the same and reads easier if you're not super familier with bitshifts

change++
}
return change
}

func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
type H = ExpHistogram

if dp.Scale() != in.Scale() {
hi, lo := expo.HiLo(dp, in, H.Scale)
from, to := expo.Scale(hi.Scale()), expo.Scale(lo.Scale())
expo.Downscale(hi.Positive(), from, to)
expo.Downscale(hi.Negative(), from, to)
hi.SetScale(lo.Scale())
}
minScale := min(dp.Scale(), in.Scale())

// logic is adapted from lightstep's algorithm for enforcing max buckets:
// https://github.com/lightstep/go-expohisto/blob/4375bf4ef2858552204edb8b4572330c94a4a755/structure/exponential.go#L542
// first, calculate the highest and lowest indices for each bucket, given the candidate min scale.
// then, calculate how much downscaling is needed to fit the merged range within max bucket count.
// finally, perform the actual downscaling.
posBounds := dp.boundsAtScale(dp.Positive(), minScale)
posBounds = posBounds.with(in.boundsAtScale(in.Positive(), minScale))

negBounds := dp.boundsAtScale(dp.Negative(), minScale)
negBounds = negBounds.with(in.boundsAtScale(in.Negative(), minScale))

minScale = min(
minScale-downscaleNeeded(posBounds, dp.MaxSize),
minScale-downscaleNeeded(negBounds, dp.MaxSize),
)
Comment on lines +121 to +137
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move all of this logic into the expo package? This is generally useful and fits that package's purpose better.

maybe:

package expo

// Limit returns the Scale a and b need to be downscaled to so that merging does
// not exceed the given max bucket length
func Limit(a, b DataPoint, max int) Scale {}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.


from, to := expo.Scale(dp.Scale()), expo.Scale(minScale)
expo.Downscale(dp.Positive(), from, to)
expo.Downscale(dp.Negative(), from, to)
dp.SetScale(minScale)

from = expo.Scale(in.Scale())
expo.Downscale(in.Positive(), from, to)
expo.Downscale(in.Negative(), from, to)
in.SetScale(minScale)

expo.Merge(dp.Positive(), in.Positive())
expo.Merge(dp.Negative(), in.Negative())

if dp.ZeroThreshold() != in.ZeroThreshold() {
hi, lo := expo.HiLo(dp, in, H.ZeroThreshold)
expo.WidenZero(lo.DataPoint, hi.ZeroThreshold())
}

expo.Merge(dp.Positive(), in.Positive())
expo.Merge(dp.Negative(), in.Negative())
Comment on lines -84 to -85
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the reason to do merging before widening the zero bucket?

Afaict widening must happen first, or we might merge buckets with a different zero threshold, which is almost certainly wrong

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I don't remember any specific reason. This was probably an oversight and you are correct - we need to widen the zero first because it affects the bucket counts.


dp.SetTimestamp(in.Timestamp())
dp.SetCount(dp.Count() + in.Count())
dp.SetZeroCount(dp.ZeroCount() + in.ZeroCount())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func (dp Histogram) CopyTo(dst Histogram) {

type ExpHistogram struct {
expo.DataPoint
MaxSize int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doubles the size of each ExpHistogram value from 8 to 16.

Given there is likely no need to configure this at runtime anyways, can we just have a constant on the expo package?

expo.Limit(dp, in, expo.DefaultLimit)

}

func (dp ExpHistogram) Clone() ExpHistogram {
clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()}
clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint(), MaxSize: dp.MaxSize}
if dp.DataPoint != (expo.DataPoint{}) {
dp.CopyTo(clone)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,5 @@ func Collapse(bs Buckets) {
counts.SetAt(i, counts.At(k)+counts.At(k+1))
}
}

// zero the excess area. its not needed to represent the observation
// anymore, but kept for two reasons:
// 1. future observations may need it, no need to re-alloc then if kept
// 2. [pcommon.Uint64Slice] can not, in fact, be sliced, so getting rid
// of it would alloc ¯\_(ツ)_/¯
for i := size; i < counts.Len(); i++ {
counts.SetAt(i, 0)
}
counts.FromRaw(counts.AsRaw()[:size])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a tricky one.

This change makes us reallocate all bucket counts every time we downscale, regardless of whether we are close to the limit or not.
Worse, it allocs twice because AsRaw copies, and FromRaw copies again.

Generally, we should keep allocated memory even if we have no counts in there, because those might arrive in the future. This is fine because we enforce our limit by downscaling beforehand, so we will never exceed it.

Copy link
Contributor Author

@edma2 edma2 Nov 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC I had to do this because we don't explicitly store the upper bound of the bucket, so we rely on the length of the slice itself. We must know the upper bound of the bucket to calculate the new scale. We could explicitly track the "real" length somewhere but it adds complexity.

FWIW I didn't see any noticeable hit to performance when this optimization was removed, so I went with the simpler approach.

}
46 changes: 38 additions & 8 deletions processor/deltatocumulativeprocessor/internal/data/expo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ func TestExpoAdd(t *testing.T) {
obs0 := expotest.Observe0

cases := []struct {
name string
dp, in expdp
want expdp
flip bool
name string
dp, in expdp
want expdp
flip bool
maxSize int
}{{
name: "noop",
dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0},
Expand All @@ -37,6 +38,30 @@ func TestExpoAdd(t *testing.T) {
dp: expdp{PosNeg: bins{0, 0, 0, 0, 0, 0, 0, 0}.Into(), Count: 0},
in: expdp{PosNeg: bins{1, 2, 3, 4, 5, 6, 7, 8}.Into(), Count: 2 * (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8)},
want: expdp{PosNeg: bins{1, 2, 3, 4, 5, 6, 7, 8}.Into(), Count: 2 * (0 + (1 + 2 + 3 + 4 + 5 + 6 + 7 + 8))},
}, {}, {
name: "maxsize/1",
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0},
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)},
want: expdp{PosNeg: bins{ø, ø, 0, 10, ø}.Into(), Scale: -3, Count: 2 * (0 + (1 + 2 + 3 + 4))},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this test case.

The max length is 1, but want specifies 2 buckets (0, 10). why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. When downscaling, there is an edge case where we may need to extend the bucket by 1 in order to fit all the counts. So the actual bucket size might be 1 larger than max size.

maxSize: 1,
}, {}, {
name: "maxsize/2",
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0},
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)},
want: expdp{PosNeg: bins{ø, ø, 0, 6, 4, ø}.Into(), Scale: -2, Count: 2 * (0 + (1 + 2 + 3 + 4))},
maxSize: 2,
}, {
name: "maxsize/4",
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0},
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)},
want: expdp{PosNeg: bins{ø, 0, 0, 1, 5, 4, ø}.Into(), Scale: -1, Count: 2 * (0 + (1 + 2 + 3 + 4))},
maxSize: 4,
}, {
name: "maxsize/8",
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0},
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)},
want: expdp{PosNeg: bins{0, 0, 0, 0, 1, 2, 3, 4}.Into(), Scale: 0, Count: 2 * (0 + (1 + 2 + 3 + 4))},
maxSize: 8,
}, {
name: "lower+shorter",
dp: expdp{PosNeg: bins{ø, ø, ø, ø, ø, 1, 1, 1}.Into(), Count: 2 * 3},
Expand Down Expand Up @@ -85,7 +110,6 @@ func TestExpoAdd(t *testing.T) {
bs := pmetric.NewExponentialHistogramDataPointBuckets()
expotest.ObserveInto(bs, expo.Scale(0), 1, 2, 3, 4)
expotest.ObserveInto(bs, expo.Scale(0), 4, 3, 2, 1)
bs.BucketCounts().Append([]uint64{0, 0}...) // rescaling leaves zeroed memory. this is expected
return bs
}()},
}}
Expand All @@ -95,10 +119,15 @@ func TestExpoAdd(t *testing.T) {
return func(t *testing.T) {
is := datatest.New(t)

maxSize := 160
if cs.maxSize > 0 {
maxSize = cs.maxSize
}

var (
dp = ExpHistogram{dp.Into()}
in = ExpHistogram{in.Into()}
want = ExpHistogram{cs.want.Into()}
dp = ExpHistogram{dp.Into(), maxSize}
in = ExpHistogram{in.Into(), maxSize}
want = ExpHistogram{cs.want.Into(), maxSize}
)

dp.SetTimestamp(0)
Expand All @@ -107,6 +136,7 @@ func TestExpoAdd(t *testing.T) {

got := dp.Add(in)
is.Equal(want.DataPoint, got.DataPoint)
is.Equalf(want.MaxSize, got.MaxSize, "MaxSize")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ func (s Histogram) SetAggregationTemporality(at pmetric.AggregationTemporality)

type ExpHistogram Metric

const expHistogramMaxSize = 160

func (s ExpHistogram) At(i int) data.ExpHistogram {
dp := Metric(s).ExponentialHistogram().DataPoints().At(i)
return data.ExpHistogram{DataPoint: dp}
dp := s.Metric.ExponentialHistogram().DataPoints().At(i)
return data.ExpHistogram{DataPoint: dp, MaxSize: expHistogramMaxSize}
}

func (s ExpHistogram) Len() int {
Expand All @@ -87,7 +89,7 @@ func (s ExpHistogram) Ident() Ident {

func (s ExpHistogram) Filter(expr func(data.ExpHistogram) bool) {
s.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
return !expr(data.ExpHistogram{DataPoint: dp})
return !expr(data.ExpHistogram{DataPoint: dp, MaxSize: expHistogramMaxSize})
})
}

Expand Down