Skip to content

Commit

Permalink
MQE: track number of processed samples in each query (#10232)
Browse files Browse the repository at this point in the history
* MQE: track number of processed samples in each query

* Update how native histograms (NH) are counted as samples, and update tests to check NaNs and native histograms

* Address PR feedback and reduce duplication

* Address PR feedback: add test case for stale markers

* Clarify variable name in `runMixedMetricsTests`

---------

Co-authored-by: Martin Valiente Ainz <64830185+tinitiuset@users.noreply.github.com>
  • Loading branch information
charleskorn and tinitiuset authored Jan 8, 2025
1 parent 8d784cb commit 4ec3018
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 6 deletions.
140 changes: 138 additions & 2 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2651,14 +2651,14 @@ func runMixedMetricsTests(t *testing.T, expressions []string, pointsPerSeries in
q, err := prometheusEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval)
require.NoError(t, err)
defer q.Close()
expectedResults := q.Exec(context.Background())
prometheusResults := q.Exec(context.Background())

q, err = mimirEngine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, tr.interval)
require.NoError(t, err)
defer q.Close()
mimirResults := q.Exec(context.Background())

testutils.RequireEqualResults(t, expr, expectedResults, mimirResults, skipAnnotationComparison)
testutils.RequireEqualResults(t, expr, prometheusResults, mimirResults, skipAnnotationComparison)
})
}
}
Expand Down Expand Up @@ -2847,3 +2847,139 @@ func TestCompareVariousMixedMetricsComparisonOps(t *testing.T) {

runMixedMetricsTests(t, expressions, pointsPerSeries, seriesData, false)
}

// TestQueryStats checks the total sample count reported by Mimir's query engine
// for a variety of selector shapes.
//
// Each case is first run against Prometheus' engine to confirm that the expected
// value in the test case is itself correct, and then against Mimir's engine.
//
// Range queries run from T=0 to T=10m with a 1 minute step; instant queries are
// evaluated at T=10m.
func TestQueryStats(t *testing.T) {
	opts := NewTestEngineOpts()
	mimirEngine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), log.NewNopLogger())
	require.NoError(t, err)

	prometheusEngine := promql.NewEngine(opts.CommonOpts)

	start := timestamp.Time(0)
	end := start.Add(10 * time.Minute)

	storage := promqltest.LoadedStorage(t, `
load 1m
dense_series 0 1 2 3 4 5 6 7 8 9 10
start_series 0 1 _ _ _ _ _ _ _ _ _
end_series _ _ _ _ _ 5 6 7 8 9 10
sparse_series 0 _ _ _ _ _ _ 7 _ _ _
stale_series 0 1 2 3 4 5 stale 7 8 9 10
nan_series NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN
native_histogram_series {{schema:0 sum:2 count:4 buckets:[1 2 1]}} {{sum:2 count:4 buckets:[1 2 1]}}
`)

	// runQueryAndGetTotalSamples executes expr against engine (as an instant query at
	// end, or as a range query over [start, end] with a 1 minute step) and returns
	// the total sample count reported in the query's statistics.
	runQueryAndGetTotalSamples := func(t *testing.T, engine promql.QueryEngine, expr string, isInstantQuery bool) int64 {
		// Mark this closure as a helper so assertion failures are reported at the
		// calling line in the sub-test rather than inside this function.
		t.Helper()

		var q promql.Query
		var err error

		if isInstantQuery {
			q, err = engine.NewInstantQuery(context.Background(), storage, nil, expr, end)
		} else {
			q, err = engine.NewRangeQuery(context.Background(), storage, nil, expr, start, end, time.Minute)
		}

		require.NoError(t, err)

		defer q.Close()

		res := q.Exec(context.Background())
		require.NoError(t, res.Err)

		// The statistics are read before the deferred Close runs.
		return q.Stats().Samples.TotalSamples
	}

	testCases := map[string]struct {
		expr                 string
		isInstantQuery       bool
		expectedTotalSamples int64
	}{
		"instant vector selector with point at every time step": {
			expr:                 `dense_series{}`,
			expectedTotalSamples: 11,
		},
		"instant vector selector with points only in start of time range": {
			expr:                 `start_series{}`,
			expectedTotalSamples: 2 + 4, // 2 for original points, plus 4 for lookback to last point.
		},
		"instant vector selector with points only at end of time range": {
			expr:                 `end_series{}`,
			expectedTotalSamples: 6,
		},
		"instant vector selector with sparse points": {
			expr:                 `sparse_series{}`,
			expectedTotalSamples: 5 + 4, // 5 for first point at T=0, and 4 for second point at T=7
		},
		"instant vector selector with stale marker": {
			expr:                 `stale_series{}`,
			expectedTotalSamples: 10, // Instant vector selectors ignore stale markers.
		},

		"raw range vector selector with single point": {
			expr:                 `dense_series[45s]`,
			isInstantQuery:       true,
			expectedTotalSamples: 1,
		},
		"raw range vector selector with multiple points": {
			expr:                 `dense_series[3m45s]`,
			isInstantQuery:       true,
			expectedTotalSamples: 4,
		},

		"range vector selector with point at every time step": {
			expr:                 `sum_over_time(dense_series{}[30s])`,
			expectedTotalSamples: 11,
		},
		"range vector selector with points only in start of time range": {
			expr:                 `sum_over_time(start_series{}[30s])`,
			expectedTotalSamples: 2,
		},
		"range vector selector with points only at end of time range": {
			expr:                 `sum_over_time(end_series{}[30s])`,
			expectedTotalSamples: 6,
		},
		"range vector selector with sparse points": {
			expr:                 `sum_over_time(sparse_series{}[30s])`,
			expectedTotalSamples: 2,
		},
		"range vector selector where range overlaps previous step's range": {
			expr:                 `sum_over_time(dense_series{}[1m30s])`,
			expectedTotalSamples: 21, // Each step except the first selects two points.
		},
		"range vector selector with stale marker": {
			expr:                 `count_over_time(stale_series{}[1m30s])`,
			expectedTotalSamples: 19, // Each step except the first selects two points. Range vector selectors ignore stale markers.
		},

		"expression with multiple selectors": {
			expr:                 `dense_series{} + end_series{}`,
			expectedTotalSamples: 11 + 6,
		},
		"instant vector selector with NaNs": {
			expr:                 `nan_series{}`,
			expectedTotalSamples: 11,
		},
		"range vector selector with NaNs": {
			expr:                 `sum_over_time(nan_series{}[1m])`,
			expectedTotalSamples: 11,
		},
		"instant vector selector with native histograms": {
			expr:                 `native_histogram_series{}`,
			expectedTotalSamples: 78,
		},
		"range vector selector with native histograms": {
			expr:                 `sum_over_time(native_histogram_series{}[1m])`,
			expectedTotalSamples: 26,
		},
	}

	for name, testCase := range testCases {
		t.Run(name, func(t *testing.T) {
			// Validate the test case itself against Prometheus' engine before
			// checking Mimir's engine, so a wrong expectation is reported as such.
			prometheusCount := runQueryAndGetTotalSamples(t, prometheusEngine, testCase.expr, testCase.isInstantQuery)
			require.Equal(t, testCase.expectedTotalSamples, prometheusCount, "invalid test case: expected samples does not match value from Prometheus' engine")

			mimirCount := runQueryAndGetTotalSamples(t, mimirEngine, testCase.expr, testCase.isInstantQuery)
			require.Equal(t, testCase.expectedTotalSamples, mimirCount)
		})
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type InstantVectorSelector struct {
Selector *Selector
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
Stats *types.QueryStats

chunkIterator chunkenc.Iterator
memoizedIterator *storage.MemoizedSeriesIterator
Expand Down Expand Up @@ -143,6 +144,9 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
data.Histograms = append(data.Histograms, promql.HPoint{T: stepT, H: h})
lastHistogramT = t
lastHistogram = h

// For consistency with Prometheus' engine, we convert each histogram point to an equivalent number of float points.
v.Stats.TotalSamples += types.EquivalentFloatSampleCount(h)
} else {
// Only create the slice once we know the series is a histogram or not.
if len(data.Floats) == 0 {
Expand All @@ -161,6 +165,8 @@ func (v *InstantVectorSelector) NextSeries(ctx context.Context) (types.InstantVe
return types.InstantVectorSeriesData{}, v.memoizedIterator.Err()
}

v.Stats.TotalSamples += int64(len(data.Floats))

return data, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func TestInstantVectorSelector_NativeHistogramPointerHandling(t *testing.T) {
LookbackDelta: 5 * time.Minute,
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Stats: &types.QueryStats{},
}

ctx := context.Background()
Expand Down Expand Up @@ -239,6 +240,7 @@ func TestInstantVectorSelector_SliceSizing(t *testing.T) {
LookbackDelta: 5 * time.Minute,
},
MemoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(0, nil),
Stats: &types.QueryStats{},
}

ctx := context.Background()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

type RangeVectorSelector struct {
Selector *Selector
Stats *types.QueryStats

rangeMilliseconds int64
chunkIterator chunkenc.Iterator
Expand All @@ -32,9 +33,10 @@ type RangeVectorSelector struct {

var _ types.RangeVectorOperator = &RangeVectorSelector{}

func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker) *RangeVectorSelector {
func NewRangeVectorSelector(selector *Selector, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, stats *types.QueryStats) *RangeVectorSelector {
return &RangeVectorSelector{
Selector: selector,
Stats: stats,
floats: types.NewFPointRingBuffer(memoryConsumptionTracker),
histograms: types.NewHPointRingBuffer(memoryConsumptionTracker),
stepData: &types.RangeVectorStepData{},
Expand Down Expand Up @@ -102,6 +104,8 @@ func (m *RangeVectorSelector) NextStepSamples() (*types.RangeVectorStepData, err
m.stepData.RangeStart = rangeStart
m.stepData.RangeEnd = rangeEnd

m.Stats.TotalSamples += int64(m.stepData.Floats.Count()) + m.stepData.Histograms.EquivalentFloatSampleCount()

return m.stepData, nil
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Query struct {
cancel context.CancelCauseFunc
memoryConsumptionTracker *limiting.MemoryConsumptionTracker
annotations *annotations.Annotations
stats *types.QueryStats

// Time range of the top-level query.
// Subqueries may use a different range.
Expand Down Expand Up @@ -79,6 +80,7 @@ func newQuery(ctx context.Context, queryable storage.Queryable, opts promql.Quer
qs: qs,
memoryConsumptionTracker: limiting.NewMemoryConsumptionTracker(maxEstimatedMemoryConsumptionPerQuery, engine.queriesRejectedDueToPeakMemoryConsumption),
annotations: annotations.New(),
stats: &types.QueryStats{},

statement: &parser.EvalStmt{
Expr: expr,
Expand Down Expand Up @@ -164,6 +166,7 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr, timeRange types

ExpressionPosition: e.PositionRange(),
},
Stats: q.stats,
}, nil
case *parser.AggregateExpr:
if !q.engine.featureToggles.EnableAggregationOperations {
Expand Down Expand Up @@ -350,7 +353,7 @@ func (q *Query) convertToRangeVectorOperator(expr parser.Expr, timeRange types.Q
ExpressionPosition: e.PositionRange(),
}

return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker), nil
return selectors.NewRangeVectorSelector(selector, q.memoryConsumptionTracker, q.stats), nil

case *parser.SubqueryExpr:
if !q.engine.featureToggles.EnableSubqueries {
Expand Down Expand Up @@ -836,8 +839,12 @@ func (q *Query) Statement() parser.Statement {
}

// Stats returns statistics about the execution of this query.
//
// A new stats.Statistics value is built on every call. Samples.TotalSamples is
// copied from the query's tracked stats at the time of the call, and Timers is a
// freshly created set of query timers.
func (q *Query) Stats() *stats.Statistics {
	return &stats.Statistics{
		Timers: stats.NewQueryTimers(),
		Samples: &stats.QuerySamples{
			TotalSamples: q.stats.TotalSamples,
		},
	}
}

func (q *Query) Cancel() {
Expand Down
16 changes: 16 additions & 0 deletions pkg/streamingpromql/types/hpoint_ring_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,22 @@ func (v HPointRingBufferView) Count() int {
return v.size
}

// EquivalentFloatSampleCount sums, over every histogram point visible through
// this view, the number of float samples that point is considered equivalent to
// (as computed by the package-level EquivalentFloatSampleCount).
func (v HPointRingBufferView) EquivalentFloatSampleCount() int64 {
	// UnsafePoints exposes the view's points as two slices; both are walked.
	head, tail := v.UnsafePoints()

	var total int64

	for i := range head {
		total += EquivalentFloatSampleCount(head[i].H)
	}

	for i := range tail {
		total += EquivalentFloatSampleCount(tail[i].H)
	}

	return total
}

// Any returns true if this ring buffer view contains any points.
func (v HPointRingBufferView) Any() bool {
return v.size != 0
Expand Down
30 changes: 30 additions & 0 deletions pkg/streamingpromql/types/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/util/stats/query_stats.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Prometheus Authors

package types

import (
"unsafe"

"github.com/prometheus/prometheus/model/histogram"
)

// QueryStats holds statistics gathered while a single query executes.
//
// Instances must not be used concurrently from more than one goroutine.
type QueryStats struct {
	// TotalSamples is the running count of samples processed by the query.
	//
	// For range vector selectors, a sample contributes once per time step in
	// which it is selected. For instance, with a 30s step and a 45s range, any
	// sample falling in the 15s overlap between consecutive steps is counted
	// twice.
	TotalSamples int64
}

// timestampFieldSize is the size, in bytes, of an int64.
const timestampFieldSize = int64(unsafe.Sizeof(int64(0)))

// EquivalentFloatSampleCount converts a histogram into an equivalent number of
// float samples: the value reported by h.Size() plus timestampFieldSize, divided
// (with integer truncation) by FPointSize.
//
// h must not be nil.
func EquivalentFloatSampleCount(h *histogram.FloatHistogram) int64 {
	histogramPointSize := int64(h.Size()) + timestampFieldSize
	floatPointSize := int64(FPointSize)

	return histogramPointSize / floatPointSize
}

0 comments on commit 4ec3018

Please sign in to comment.