Skip to content

Commit 2981566

Browse files
Return server-side total bytes processed statistics as a header through query frontend (grafana#9645)
* Return server-side total bytes processed statistics as a header through the query frontend * update chanlog.md * lint error fix in changelog.md * Add bytes processed stats to existing Server-Timing header * Fix failing integration tests
1 parent 7fd21a9 commit 2981566

File tree

4 files changed

+32
-1
lines changed

4 files changed

+32
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* `-memberlist.max-concurrent-writes`
4545
* `-memberlist.acquire-writer-timeout`
4646
* [ENHANCEMENT] memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storm CPU activity in large clusters. #9594
47+
* [ENHANCEMENT] Return server-side total bytes processed statistics as a header through query frontend. #9645
4748
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
4849
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
4950
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508

integration/query_frontend_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) {
383383
if userID == 0 && cfg.queryStatsEnabled {
384384
res, _, err := c.QueryRaw("{instance=~\"hello.*\"}")
385385
require.NoError(t, err)
386-
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0])
386+
require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*, bytes_processed=[0-9.]*$", res.Header.Values("Server-Timing")[0])
387387
}
388388

389389
// Beyond the range of -querier.query-ingesters-within should return nothing. No need to repeat it for each user.

pkg/frontend/transport/handler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Head
484484
parts := make([]string, 0)
485485
parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime()))
486486
parts = append(parts, statsValue("response_time", queryResponseTime))
487+
parts = append(parts, statsBytesProcessedValue("bytes_processed", stats.LoadFetchedChunkBytes()+stats.LoadFetchedIndexBytes()))
487488
headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", "))
488489
}
489490
}
@@ -493,6 +494,10 @@ func statsValue(name string, d time.Duration) string {
493494
return name + ";dur=" + durationInMs
494495
}
495496

497+
func statsBytesProcessedValue(name string, value uint64) string {
498+
return name + "=" + strconv.FormatUint(value, 10)
499+
}
500+
496501
func httpRequestActivity(request *http.Request, userAgent string, requestParams url.Values) string {
497502
tenantID := "(unknown)"
498503
if tenantIDs, err := tenant.TenantIDs(request.Context()); err == nil {

pkg/frontend/transport/handler_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
9494
expectedMetrics int
9595
expectedActivity string
9696
expectedReadConsistency string
97+
assertHeaders func(t *testing.T, headers http.Header)
9798
}{
9899
{
99100
name: "handler with stats enabled, POST request with params",
@@ -284,6 +285,27 @@ func TestHandler_ServeHTTP(t *testing.T) {
284285
expectedActivity: "user:12345 UA: req:GET /api/v1/query query=some_metric&time=42",
285286
expectedReadConsistency: "",
286287
},
288+
{
289+
name: "handler with stats enabled, check ServiceTimingHeader",
290+
cfg: HandlerConfig{QueryStatsEnabled: true, MaxBodySize: 1024},
291+
request: func() *http.Request {
292+
req := httptest.NewRequest(http.MethodPost, "/api/v1/query", strings.NewReader("query=some_metric&time=42"))
293+
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
294+
return req
295+
},
296+
downstreamResponse: makeSuccessfulDownstreamResponse(),
297+
expectedStatusCode: 200,
298+
expectedParams: url.Values{
299+
"query": []string{"some_metric"},
300+
"time": []string{"42"},
301+
},
302+
expectedMetrics: 5,
303+
expectedActivity: "user:12345 UA: req:POST /api/v1/query query=some_metric&time=42",
304+
expectedReadConsistency: "",
305+
assertHeaders: func(t *testing.T, headers http.Header) {
306+
assert.Contains(t, headers.Get(ServiceTimingHeaderName), "bytes_processed=0")
307+
},
308+
},
287309
} {
288310
t.Run(tt.name, func(t *testing.T) {
289311
activityFile := filepath.Join(t.TempDir(), "activity-tracker")
@@ -370,6 +392,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
370392
if tt.expectedStatusCode >= 200 && tt.expectedStatusCode < 300 {
371393
require.Equal(t, int64(len(responseData)), msg["response_size_bytes"])
372394
}
395+
if tt.assertHeaders != nil {
396+
tt.assertHeaders(t, resp.Header())
397+
}
373398

374399
// Check that the HTTP or Protobuf request parameters are logged.
375400
paramsLogged := 0

0 commit comments

Comments
 (0)