diff --git a/pkg/streamingpromql/types/fpoint_ring_buffer.go b/pkg/streamingpromql/types/fpoint_ring_buffer.go index 53dbd1d1387..2303e7d5568 100644 --- a/pkg/streamingpromql/types/fpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/fpoint_ring_buffer.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/util/pool" ) // FPointRingBuffer and HPointRingBuffer are nearly identical, but exist for each @@ -61,7 +62,7 @@ func (b *FPointRingBuffer) Append(p promql.FPoint) error { return err } - if !isPowerOfTwo(cap(newSlice)) { + if !pool.IsPowerOfTwo(cap(newSlice)) { // We rely on the capacity being a power of two for the pointsIndexMask optimisation below. // If we can guarantee that newSlice has a capacity that is a power of two in the future, then we can drop this check. return fmt.Errorf("pool returned slice of capacity %v (requested %v), but wanted a power of two", cap(newSlice), newSize) @@ -148,7 +149,7 @@ func (b *FPointRingBuffer) Release() { // should not return s to the pool themselves. // s must have a capacity that is a power of two. func (b *FPointRingBuffer) Use(s []promql.FPoint) error { - if !isPowerOfTwo(cap(s)) { + if !pool.IsPowerOfTwo(cap(s)) { // We rely on the capacity being a power of two for the pointsIndexMask optimisation below. return fmt.Errorf("slice capacity must be a power of two, but is %v", cap(s)) } @@ -261,7 +262,3 @@ func (v FPointRingBufferView) Any() bool { // These hooks exist so we can override them during unit tests. var getFPointSliceForRingBuffer = FPointSlicePool.Get var putFPointSliceForRingBuffer = FPointSlicePool.Put - -func isPowerOfTwo(n int) bool { - return (n & (n - 1)) == 0 -} diff --git a/pkg/streamingpromql/types/hpoint_ring_buffer.go b/pkg/streamingpromql/types/hpoint_ring_buffer.go index a34805d399b..f5b49b6193f 100644 --- a/pkg/streamingpromql/types/hpoint_ring_buffer.go +++ b/pkg/streamingpromql/types/hpoint_ring_buffer.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/grafana/mimir/pkg/streamingpromql/limiting" + "github.com/grafana/mimir/pkg/util/pool" ) // FPointRingBuffer and HPointRingBuffer are nearly identical, but exist for each @@ -121,7 +122,7 @@ func (b *HPointRingBuffer) NextPoint() (*promql.HPoint, error) { return nil, err } - if !isPowerOfTwo(cap(newSlice)) { + if !pool.IsPowerOfTwo(cap(newSlice)) { // We rely on the capacity being a power of two for the pointsIndexMask optimisation below. // If we can guarantee that newSlice has a capacity that is a power of two in the future, then we can drop this check. return nil, fmt.Errorf("pool returned slice of capacity %v (requested %v), but wanted a power of two", cap(newSlice), newSize) @@ -185,7 +186,7 @@ func (b *HPointRingBuffer) Release() { // should not return s to the pool themselves. // s must have a capacity that is a power of two. func (b *HPointRingBuffer) Use(s []promql.HPoint) error { - if !isPowerOfTwo(cap(s)) { + if !pool.IsPowerOfTwo(cap(s)) { // We rely on the capacity being a power of two for the pointsIndexMask optimisation below. return fmt.Errorf("slice capacity must be a power of two, but is %v", cap(s)) } diff --git a/pkg/streamingpromql/types/limiting_pool.go b/pkg/streamingpromql/types/limiting_pool.go index 2a614f4b798..6e2a01af8e8 100644 --- a/pkg/streamingpromql/types/limiting_pool.go +++ b/pkg/streamingpromql/types/limiting_pool.go @@ -13,7 +13,9 @@ import ( ) const ( - MaxExpectedPointsPerSeries = 131_072 // There's not too much science behind this number: 100,000 points allows for a point per minute for just under 70 days. Then we use the next power of two. + // There's not too much science behind this number: 100,000 points allows for a point per minute for just under 70 days. + // Then we use the next power of two, given the pools always return slices with capacity equal to a power of two. + MaxExpectedPointsPerSeries = 131_072 // Treat a native histogram sample as equivalent to this many float samples when considering max in-memory bytes limit. // Keep in mind that float sample = timestamp + float value, so 5x this is equivalent to five timestamps and five floats. diff --git a/pkg/streamingpromql/types/limiting_pool_test.go b/pkg/streamingpromql/types/limiting_pool_test.go index d8340b392b4..94212fe1581 100644 --- a/pkg/streamingpromql/types/limiting_pool_test.go +++ b/pkg/streamingpromql/types/limiting_pool_test.go @@ -25,7 +25,7 @@ func TestLimitingBucketedPool_Unlimited(t *testing.T) { tracker := limiting.NewMemoryConsumptionTracker(0, metric) p := NewLimitingBucketedPool( - pool.NewBucketedPool(1000, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }), + pool.NewBucketedPool(1024, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }), FPointSize, false, nil, @@ -78,7 +78,7 @@ func TestLimitingPool_Limited(t *testing.T) { tracker := limiting.NewMemoryConsumptionTracker(limit, metric) p := NewLimitingBucketedPool( - pool.NewBucketedPool(1000, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }), + pool.NewBucketedPool(1024, func(size int) []promql.FPoint { return make([]promql.FPoint, 0, size) }), FPointSize, false, nil, @@ -203,7 +203,7 @@ func TestLimitingPool_Mangling(t *testing.T) { tracker := limiting.NewMemoryConsumptionTracker(0, metric) p := NewLimitingBucketedPool( - pool.NewBucketedPool(1000, func(size int) []int { return make([]int, 0, size) }), + pool.NewBucketedPool(1024, func(size int) []int { return make([]int, 0, size) }), 1, false, func(_ int) int { return 123 }, @@ -228,99 +228,10 @@ func TestLimitingPool_Mangling(t *testing.T) { require.Equal(t, []int{123, 123, 123, 123}, s, "returned slice should be mangled when mangling is enabled") } -func TestLimitingBucketedPool_PowerOfTwoCapacities(t *testing.T) { - memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil) - - pool := NewLimitingBucketedPool( - pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }), - 1, - false, - nil, - ) - - cases := []struct { - requestedSize int - expectedCap int - }{ - {3, 4}, - {5, 8}, - {10, 16}, - {65_000, 65_536}, - {100_001, 131_072}, // Exceeds max, expect next power of two - } - - for _, c := range cases { - slice, err := pool.Get(c.requestedSize, memoryConsumptionTracker) - require.NoError(t, err, "Unexpected error when requesting size %d", c.requestedSize) - require.Equal(t, c.expectedCap, cap(slice), - "LimitingBucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap) - pool.Put(slice, memoryConsumptionTracker) - } -} - -func TestLimitingBucketedPool_UnreasonableSizeRequest(t *testing.T) { - const maxMemoryLimit = 1_000_000 * FPointSize - - reg, metric := createRejectedMetric() - memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(uint64(maxMemoryLimit), metric) - - pool := NewLimitingBucketedPool( - pool.NewBucketedPool(100_000, func(size int) []int { return make([]int, 0, size) }), - 1, - false, - nil, - ) - - // Request a reasonable size - slice, err := pool.Get(500_000, memoryConsumptionTracker) - require.NoError(t, err, "Expected to succeed for reasonable size request") - require.Equal(t, 524_288, cap(slice), "Capacity should be next power of two") - assertRejectedQueryCount(t, reg, 0) - - pool.Put(slice, memoryConsumptionTracker) - - // Request an unreasonable size - _, err = pool.Get(10_000_000, memoryConsumptionTracker) - require.Error(t, err, "Expected an error for unreasonably large size request") - require.Contains(t, err.Error(), "exceeded", "Error message should indicate memory consumption limit exceeded") - assertRejectedQueryCount(t, reg, 1) - - require.Equal(t, uint64(0), memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes, - "Current memory consumption should remain at 0 after rejected request") -} - func TestLimitingBucketedPool_MaxExpectedPointsPerSeriesConstantIsPowerOfTwo(t *testing.T) { // Although not strictly required (as the code should handle MaxExpectedPointsPerSeries not being a power of two correctly), // it is best that we keep it as one for now. - require.True(t, isPowerOfTwo(MaxExpectedPointsPerSeries), "MaxExpectedPointsPerSeries must be a power of two") -} - -func TestIsPowerOfTwo(t *testing.T) { - cases := []struct { - input int - expected bool - }{ - {-2, false}, - {1, true}, - {2, true}, - {3, false}, - {4, true}, - {5, false}, - {6, false}, - {7, false}, - {8, true}, - {16, true}, - {32, true}, - {1023, false}, - {1024, true}, - {1<<12 - 1, false}, - {1 << 12, true}, - } - - for _, c := range cases { - result := isPowerOfTwo(c.input) - require.Equalf(t, c.expected, result, "isPowerOfTwo(%d) should return %v", c.input, c.expected) - } + require.True(t, pool.IsPowerOfTwo(MaxExpectedPointsPerSeries), "MaxExpectedPointsPerSeries must be a power of two") } func assertRejectedQueryCount(t *testing.T, reg *prometheus.Registry, expectedRejectionCount int) { diff --git a/pkg/streamingpromql/types/pool.go b/pkg/streamingpromql/types/pool.go index 4734524d81f..7eed11885a8 100644 --- a/pkg/streamingpromql/types/pool.go +++ b/pkg/streamingpromql/types/pool.go @@ -9,7 +9,9 @@ import ( ) const ( - maxExpectedSeriesPerResult = 10_000_000 // There's not too much science behind this number: this is the based on examining the largest queries seen at Grafana Labs. + // There's not too much science behind this number: this is the based on examining the largest queries seen at Grafana Labs. + // The number must also align with a power of two for our pools. + maxExpectedSeriesPerResult = 8_388_608 ) var ( diff --git a/pkg/streamingpromql/types/pool_test.go b/pkg/streamingpromql/types/pool_test.go new file mode 100644 index 00000000000..fa5242f3b3d --- /dev/null +++ b/pkg/streamingpromql/types/pool_test.go @@ -0,0 +1,17 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package types + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/util/pool" +) + +func TestMaxExpectedSeriesPerResultConstantIsPowerOfTwo(t *testing.T) { + // Although not strictly required (as the code should handle maxExpectedSeriesPerResult not being a power of two correctly), + // it is best that we keep it as one for now. + require.True(t, pool.IsPowerOfTwo(maxExpectedSeriesPerResult), "maxExpectedSeriesPerResult must be a power of two") +} diff --git a/pkg/streamingpromql/types/ring_buffer_test.go b/pkg/streamingpromql/types/ring_buffer_test.go index 87b44f4487b..cb42d45ac65 100644 --- a/pkg/streamingpromql/types/ring_buffer_test.go +++ b/pkg/streamingpromql/types/ring_buffer_test.go @@ -135,6 +135,11 @@ func testRingBuffer[T any](t *testing.T, buf ringBuffer[T], points []T) { err = buf.Use(subsliceWithPowerOfTwoCapacity) require.NoError(t, err) shouldHavePoints(t, buf, points[4:]...) + + nonPowerOfTwoSlice := make([]T, 0, 15) + err = buf.Use(nonPowerOfTwoSlice) + require.EqualError(t, err, "slice capacity must be a power of two, but is 15", + "Error message should indicate the invalid capacity") } func TestRingBuffer_DiscardPointsBefore_ThroughWrapAround(t *testing.T) { @@ -502,27 +507,3 @@ func setupRingBufferTestingPools(t *testing.T) { putHPointSliceForRingBuffer = originalPutHPointSlice }) } - -func TestFPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) { - memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil) - buf := NewFPointRingBuffer(memoryConsumptionTracker) - - nonPowerOfTwoSlice := make([]promql.FPoint, 0, 15) - - err := buf.Use(nonPowerOfTwoSlice) - require.Error(t, err, "Use() should return an error for a non-power-of-two slice") - require.EqualError(t, err, "slice capacity must be a power of two, but is 15", - "Error message should indicate the invalid capacity") -} - -func TestHPointRingBuffer_UseReturnsErrorOnNonPowerOfTwoSlice(t *testing.T) { - memoryConsumptionTracker := limiting.NewMemoryConsumptionTracker(0, nil) - buf := NewHPointRingBuffer(memoryConsumptionTracker) - - nonPowerOfTwoSlice := make([]promql.HPoint, 0, 15) - - err := buf.Use(nonPowerOfTwoSlice) - require.Error(t, err, "Use() should return an error for a non-power-of-two slice") - require.EqualError(t, err, "slice capacity must be a power of two, but is 15", - "Error message should indicate the invalid capacity") -} diff --git a/pkg/util/pool/bucketed_pool.go b/pkg/util/pool/bucketed_pool.go index 37a01f096b9..8bb382b95e9 100644 --- a/pkg/util/pool/bucketed_pool.go +++ b/pkg/util/pool/bucketed_pool.go @@ -28,6 +28,8 @@ type BucketedPool[T ~[]E, E any] struct { func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *BucketedPool[T, E] { if maxSize <= 1 { panic("invalid maximum pool size") + } else if !IsPowerOfTwo(int(maxSize)) { + panic("bucket maxSize is not a power of two") } bucketCount := bits.Len(maxSize) @@ -42,9 +44,8 @@ func NewBucketedPool[T ~[]E, E any](maxSize uint, makeFunc func(int) T) *Buckete } // Get returns a new slice with capacity greater than or equal to size. -// If no bucket large enough exists, a slice larger than the requested size -// of the next power of two is returned. -// Get guarantees the resulting slice always has a capacity in power of twos. +// The resulting slice always has a capacity that is a power of two. +// If size is greater than maxSize, then a slice is still returned, however it may not be drawn from a pool. func (p *BucketedPool[T, E]) Get(size int) T { if size < 0 { panic(fmt.Sprintf("BucketedPool.Get with negative size %v", size)) @@ -56,7 +57,7 @@ func (p *BucketedPool[T, E]) Get(size int) T { bucketIndex := bits.Len(uint(size - 1)) - // If bucketIndex exceeds the number of available buckets, return a slice of the next power of two. + // If the requested size is larger than the size of the largest bucket, return a slice of the next power of two greater than or equal to size. if bucketIndex >= len(p.buckets) { nextPowerOfTwo := 1 << bucketIndex return p.make(nextPowerOfTwo) @@ -83,10 +84,11 @@ func (p *BucketedPool[T, E]) Put(s T) { bucketIndex := bits.Len(size - 1) if bucketIndex >= len(p.buckets) { + // This should never happen as maxSize is checked above, and enforced to be a power of 2 return // Ignore slices larger than the largest bucket } - // Ignore slices that do not align to the current power of 2 + // Ignore slices with capacity that is not a power of 2 // (this will only happen where a slice did not originally come from the pool). if size != (1 << bucketIndex) { return @@ -94,3 +96,7 @@ func (p *BucketedPool[T, E]) Put(s T) { p.buckets[bucketIndex].Put(s[0:0]) } + +func IsPowerOfTwo(n int) bool { + return (n & (n - 1)) == 0 +} diff --git a/pkg/util/pool/bucketed_pool_test.go b/pkg/util/pool/bucketed_pool_test.go index a7183694d13..373bd430b3a 100644 --- a/pkg/util/pool/bucketed_pool_test.go +++ b/pkg/util/pool/bucketed_pool_test.go @@ -64,7 +64,7 @@ func TestBucketedPool_HappyPath(t *testing.T) { } runTests := func(t *testing.T, returnToPool bool) { - testPool := NewBucketedPool(19, makeFunc) + testPool := NewBucketedPool(16, makeFunc) for _, c := range cases { ret := testPool.Get(c.size) require.Equal(t, c.expectedCap, cap(ret)) @@ -91,7 +91,7 @@ func TestBucketedPool_HappyPath(t *testing.T) { } func TestBucketedPool_SliceNotAlignedToBuckets(t *testing.T) { - pool := NewBucketedPool(1000, makeFunc) + pool := NewBucketedPool(1024, makeFunc) pool.Put(make([]int, 0, 5)) s := pool.Get(6) require.Equal(t, 8, cap(s)) @@ -99,7 +99,7 @@ func TestBucketedPool_SliceNotAlignedToBuckets(t *testing.T) { } func TestBucketedPool_PutEmptySlice(t *testing.T) { - pool := NewBucketedPool(1000, makeFunc) + pool := NewBucketedPool(1024, makeFunc) pool.Put([]int{}) s := pool.Get(1) require.Equal(t, 1, cap(s)) @@ -107,7 +107,7 @@ func TestBucketedPool_PutEmptySlice(t *testing.T) { } func TestBucketedPool_PutNilSlice(t *testing.T) { - pool := NewBucketedPool(1000, makeFunc) + pool := NewBucketedPool(1024, makeFunc) pool.Put(nil) s := pool.Get(1) require.Equal(t, 1, cap(s)) @@ -115,7 +115,7 @@ func TestBucketedPool_PutNilSlice(t *testing.T) { } func TestBucketedPool_PutSliceLargerThanMaximum(t *testing.T) { - pool := NewBucketedPool(100, makeFunc) + pool := NewBucketedPool(64, makeFunc) s1 := make([]int, 101) pool.Put(s1) s2 := pool.Get(101)[:101] @@ -124,10 +124,10 @@ func TestBucketedPool_PutSliceLargerThanMaximum(t *testing.T) { } func TestBucketedPool_GetSizeCloseToMax(t *testing.T) { - maxSize := 100000 + maxSize := 131072 pool := NewBucketedPool(uint(maxSize), makeFunc) - // Request a size that triggers the last bucket boundary. + // Request a slice with size that will be drawn from the last bucket in the pool. s := pool.Get(86401) // Check that we still get a slice with the correct size. @@ -135,45 +135,30 @@ func TestBucketedPool_GetSizeCloseToMax(t *testing.T) { require.Len(t, s, 0) } -func TestBucketedPool_AlwaysReturnsPowerOfTwoCapacities(t *testing.T) { - pool := NewBucketedPool(100_000, makeFunc) - +func TestIsPowerOfTwo(t *testing.T) { cases := []struct { - requestedSize int - expectedCap int + input int + expected bool }{ - {3, 4}, - {5, 8}, - {10, 16}, - {20, 32}, - {65_000, 65_536}, - {100_001, 131_072}, // Exceeds max bucket: next power of two is 131,072 + {-2, false}, + {1, true}, + {2, true}, + {3, false}, + {4, true}, + {5, false}, + {6, false}, + {7, false}, + {8, true}, + {16, true}, + {32, true}, + {1023, false}, + {1024, true}, + {1<<12 - 1, false}, + {1 << 12, true}, } for _, c := range cases { - slice := pool.Get(c.requestedSize) - - require.Equal(t, c.expectedCap, cap(slice), - "BucketedPool.Get() returned slice with capacity %d; expected %d", cap(slice), c.expectedCap) - - pool.Put(slice) + result := IsPowerOfTwo(c.input) + require.Equalf(t, c.expected, result, "isPowerOfTwo(%d) should return %v", c.input, c.expected) } } - -func TestBucketedPool_PutSizeCloseToMax(t *testing.T) { - maxSize := 100000 - pool := NewBucketedPool(uint(maxSize), makeFunc) - - // Create a slice with capacity that triggers the upper edge case - s := make([]int, 0, 65_000) // 86401 is close to maxSize but not aligned to power of 2 - - // Ensure Put does not panic when adding this slice - require.NotPanics(t, func() { - pool.Put(s) - }, "Put should not panic for sizes close to maxSize") - - // Validate that a subsequent Get for a smaller size works fine - ret := pool.Get(1) - require.Equal(t, 1, cap(ret)) - require.Len(t, ret, 0) -}