deltatocumulativeprocessor: enforce max bucket count for exphistograms #34157
Conversation
I'm probably missing some nuance, but I'm a bit surprised to see this responsibility added to deltatocumulativeprocessor. Is the problem specific to this processor? Could it also be a problem for intervalprocessor, e.g. merging multiple sparsely populated cumulative histograms? I suppose the idea is that we should avoid downscaling unless max buckets is reached, in which case it wouldn't help to have a prior processor in the pipeline that downscales statelessly. Maybe in the long term there should be a single stateful aggregation processor that does both delta-to-cumulative and aggregation of cumulative metrics?
While the problem might not be specific to this processor alone, I'd prefer to get a complete working solution here first, and then refactor and identify which parts can be reused. I understand some parts of this processor were already built with this in mind, and deltafill is an example of that too.
@RichieSams @jpkrohling do we need additional reviews for this PR?
cc @sh0rez
This PR was marked stale due to lack of activity. It will be closed in 14 days.
Closed as inactive. Feel free to reopen if this PR is still being worked on.
hey! thank you for contributing and so sorry for the late review.
I generally like your approach, but found some minor items :)
@@ -30,6 +30,9 @@ processors:
      # will be dropped
      [ max_streams: <int> | default = 0 (off) ]

      # desired maximum number of buckets to represent in exponential histograms.
      # histograms will downscale as necessary to accommodate this limit
      [ max_exponential_histogram_buckets: <int> | default = 160 ]
This comment was marked as resolved.
no longer relevant. removed configurability as suggested in another comment
return highLow{
	low:  0,
	high: -1,
}
wouldn't the zero value be just as meaningful here, or is there a reason for specifically returning -1?
hmm, I can't remember if there was a reason but using the zero value is more idiomatic in Go so I'll use that
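For illustration, the zero-value variant might look roughly like this (a sketch, not the final code; the emptyHighLow name is hypothetical, and the empty check mirrors the b == bounds{} comparison quoted later in this review):

func emptyHighLow() highLow {
	return highLow{} // the zero value now doubles as the "empty" sentinel
}

// empty reports whether the range holds no buckets
func (h highLow) empty() bool {
	return h == highLow{}
}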
type highLow struct {
	low  int32
	high int32
}
This appears to be always used for upper and lower bounds, so how about:
type Bounds struct {
Upper int32
Lower int32
}
👍
// with is an accessory for Merge() to calculate ideal combined scale.
func (h *highLow) with(o highLow) highLow {
not modifying h, so no need for a pointer ;) also, the width of 2x int32 is equal to a uintptr, so there's literally no difference
thanks for the suggestion, fixed
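The value-receiver version might read as follows (a sketch assuming with computes the union of both index ranges, which is what Merge() needs; min and max are the Go 1.21 builtins):

// with returns the union of both ranges; value receiver, since h is not modified
func (h highLow) with(o highLow) highLow {
	if o.empty() {
		return h
	}
	if h.empty() {
		return o
	}
	return highLow{
		low:  min(h.low, o.low),
		high: max(h.high, o.high),
	}
}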
// empty indicates whether there are any values in a highLow.
func (h *highLow) empty() bool {
again no need for a pointer receiver
fixed
// changeScale computes how much downscaling is needed by shifting the
// high and low values until they are separated by no more than size.
func changeScale(hl highLow, size int) int32 {
it's not changing any scale, but returning a computed scale change, so let's reflect that in the naming :)
-func changeScale(hl highLow, size int) int32 {
+func scaleChange(hl highLow, size int) int32 {
renamed to downscaleNeeded for clarity
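A sketch of the renamed helper, following the lightstep algorithm linked later in this review (the exact body may differ from the PR):

// downscaleNeeded returns how many scale reductions are required so that the
// index range fits within size buckets; each reduction halves the indices
func downscaleNeeded(hl highLow, size int) int32 {
	if hl.empty() {
		return 0
	}
	var change int32
	for int(hl.high)-int(hl.low) >= size {
		hl.high >>= 1
		hl.low >>= 1
		change++
	}
	return change
}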
-type ExpHistogram Metric
+type ExpHistogram struct {
+	Metric
+	MaxSize int
+}
Making this a struct instead of a type alias has certain consequences (it's no longer castable) that I wish to avoid for a future refactor I've planned for this code.
I know it's hard to include configurability with the current codebase, so I suggest leaving this out for now and always using the recommended default. Would that cause a major headache?
I think that should be okay if it aligns better long-term. I haven't tried anything other than the default tbh.
force-pushed from 907742c to 6f19845
@sh0rez thanks for the review! apologies for my delayed response as well
This PR was marked stale due to lack of activity. It will be closed in 14 days.
@sh0rez do you mind taking another look at this PR when you have a chance? Thanks!
@edma2 I'm reviewing again now. Sorry you had to wait so long, and thanks for sticking around
expo.Merge(dp.Positive(), in.Positive())
expo.Merge(dp.Negative(), in.Negative())
What's the reason to do merging before widening the zero bucket?
Afaict widening must happen first, or we might merge buckets with a different zero threshold, which is almost certainly wrong
Hm, I don't remember any specific reason. This was probably an oversight and you are correct - we need to widen the zero first because it affects the bucket counts.
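The corrected ordering might look like this (a sketch; it assumes an expo.WidenZero-style helper that raises a datapoint's zero threshold, which may differ from the actual expo API):

// align zero thresholds first, so both datapoints count the same values as "zero"
zero := max(dp.ZeroThreshold(), in.ZeroThreshold())
expo.WidenZero(dp, zero)
expo.WidenZero(in, zero)

// only then merge the buckets
expo.Merge(dp.Positive(), in.Positive())
expo.Merge(dp.Negative(), in.Negative())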
	return b == bounds{}
}

// boundsAtScale is an accessory for Add() to calculate ideal combined scale.
can we be more descriptive here? as a reader, I'm mostly interested in how this works, which is not explained.
i'd like to read something along the lines of "computes the bucket boundaries at given scale. it does so by dividing the bounds by two (>> operation) as many times as the scales differ."
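The requested comment plus body might read roughly like this (a sketch; names follow the fragments quoted in this review, and pmetric is go.opentelemetry.io/collector/pdata/pmetric):

// boundsAtScale computes the bucket index bounds at the given scale. It does
// so by halving the indices (>> operation) once per scale step between the
// datapoint's own scale and the target scale.
func (dp ExpHistogram) boundsAtScale(b pmetric.ExponentialHistogramDataPointBuckets, scale int32) bounds {
	if b.BucketCounts().Len() == 0 {
		return bounds{} // zero value means empty
	}
	shift := dp.Scale() - scale
	return bounds{
		lower: b.Offset() >> shift,
		upper: (b.Offset() + int32(b.BucketCounts().Len()) - 1) >> shift,
	}
}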
b.upper >>= 1
b.lower >>= 1
-b.upper >>= 1
-b.lower >>= 1
+b.upper /= 2
+b.lower /= 2

it's the same and reads easier if you're not super familiar with bitshifts
minScale := min(dp.Scale(), in.Scale())

// logic is adapted from lightstep's algorithm for enforcing max buckets:
// https://github.com/lightstep/go-expohisto/blob/4375bf4ef2858552204edb8b4572330c94a4a755/structure/exponential.go#L542
// first, calculate the highest and lowest indices for each bucket, given the candidate min scale.
// then, calculate how much downscaling is needed to fit the merged range within max bucket count.
// finally, perform the actual downscaling.
posBounds := dp.boundsAtScale(dp.Positive(), minScale)
posBounds = posBounds.with(in.boundsAtScale(in.Positive(), minScale))

negBounds := dp.boundsAtScale(dp.Negative(), minScale)
negBounds = negBounds.with(in.boundsAtScale(in.Negative(), minScale))

minScale = min(
	minScale-downscaleNeeded(posBounds, dp.MaxSize),
	minScale-downscaleNeeded(negBounds, dp.MaxSize),
)
can we move all of this logic into the expo package? This is generally useful and fits that package's purpose better.
maybe:

package expo

// Limit returns the Scale a and b need to be downscaled to so that merging does
// not exceed the given max bucket length
func Limit(a, b DataPoint, max int) Scale {}
Sounds good.
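Moved into the expo package, the helper might end up roughly like this (a sketch: boundsAtScale is reimagined here as a free function taking both scales, and downscaleNeeded is the helper discussed earlier; real signatures may differ):

// Limit returns the Scale a and b need to be downscaled to so that merging
// does not exceed max buckets per sign
func Limit(a, b DataPoint, max int) Scale {
	minScale := min(a.Scale(), b.Scale())

	pos := boundsAtScale(a.Positive(), a.Scale(), minScale)
	pos = pos.with(boundsAtScale(b.Positive(), b.Scale(), minScale))

	neg := boundsAtScale(a.Negative(), a.Scale(), minScale)
	neg = neg.with(boundsAtScale(b.Negative(), b.Scale(), minScale))

	return Scale(min(
		minScale-downscaleNeeded(pos, max),
		minScale-downscaleNeeded(neg, max),
	))
}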
@@ -80,10 +80,11 @@ func (dp Histogram) CopyTo(dst Histogram) {

type ExpHistogram struct {
	expo.DataPoint
	MaxSize int
This doubles the size of each ExpHistogram value from 8 to 16 bytes.
Given there is likely no need to configure this at runtime anyway, can we just have a constant on the expo package?
expo.Limit(dp, in, expo.DefaultLimit)
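i.e., roughly (a sketch; the 160 default mirrors the README diff earlier in this thread):

package expo

// DefaultLimit is the recommended maximum bucket count for exponential
// histograms; merges downscale as needed to stay within it
const DefaultLimit = 160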
for i := size; i < counts.Len(); i++ {
	counts.SetAt(i, 0)
}
counts.FromRaw(counts.AsRaw()[:size])
This is a tricky one.
This change makes us reallocate all bucket counts every time we downscale, regardless of whether we are close to the limit or not. Worse, it allocates twice, because AsRaw copies and FromRaw copies again.
Generally, we should keep allocated memory even if we have no counts in there, because those might arrive in the future. This is fine because we enforce our limit by downscaling beforehand, so we will never exceed it.
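In other words, something like (a sketch of the in-place alternative, reusing the quoted loop):

// zero the tail in place; keep the slice's backing memory for future merges
for i := size; i < counts.Len(); i++ {
	counts.SetAt(i, 0)
}
// note: no counts.FromRaw(counts.AsRaw()[:size]); the extra zeroed length is
// harmless because the limit is already enforced by downscaling beforehand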
IIRC I had to do this because we don't explicitly store the upper bound of the bucket, so we rely on the length of the slice itself. We must know the upper bound of the bucket to calculate the new scale. We could explicitly track the "real" length somewhere but it adds complexity.
FWIW I didn't see any noticeable hit to performance when this optimization was removed, so I went with the simpler approach.
name: "maxsize/1", | ||
dp: expdp{PosNeg: bins{0, 0, 0, ø}.Into(), Count: 0}, | ||
in: expdp{PosNeg: bins{ø, ø, ø, ø, 1, 2, 3, 4}.Into(), Count: 2 * (1 + 2 + 3 + 4)}, | ||
want: expdp{PosNeg: bins{ø, ø, 0, 10, ø}.Into(), Scale: -3, Count: 2 * (0 + (1 + 2 + 3 + 4))}, |
I do not understand this test case.
The max length is 1, but want specifies 2 buckets (0, 10). Why?
Good question. When downscaling, there is an edge case where we may need to extend the bucket array by 1 in order to fit all the counts, so the actual bucket count can be 1 larger than the configured max size.
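A toy illustration (not from the PR) of why a downscaled range can still straddle a merge boundary:

package main

import "fmt"

// downscaling by one scale step merges index pairs (2k, 2k+1) into k; the
// range [1, 2] still spans two buckets afterwards, since 1>>1 = 0 but 2>>1 = 1
func main() {
	low, high := int32(1), int32(2) // two adjacent bucket indices
	fmt.Println(low>>1, high>>1)    // prints "0 1": still two buckets
}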
This PR was marked stale due to lack of activity. It will be closed in 14 days.
Closed as inactive. Feel free to reopen if this PR is still being worked on.
Description:
When merging exponential histograms, maintain a max bucket count by scaling down as necessary. Without a max bucket count the number of buckets is unbounded and can cause an OOM.
Link to tracking Issue: #33277
Testing:
Unit tests; the fix was also validated in real-world environments.
Documentation:
Added flag to README.md.