Skip to content

Commit

Permalink
querier: check size of whole frontend response before sending it (#10154
Browse files Browse the repository at this point in the history
)

* Add test v1

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* querier: check size of whole frontend response before sending it

### Background

Previously we'd check the size only of the body before sending a response to the query-frontend. The reason for this check is so that we don't leave it to the gRPC library. The library returns an error and doesn't tell the server anything. This results in timeouts at the query-frontend.

### Problem

Because we only check the length of the body there are edge cases where the size of the whole response (including stats & headers) exceeds the limit. The gRPC library would still refuse to send the response and the frontend would time out waiting for it.

### Is it really happening?

I discovered this in tests with very small limits. But there are probably realistic cases where this is happening: the body is just below the limit, add stats and headers, and it's over the limit.

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Add CHANGELOG.md entry

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove unused param

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* remove unused param

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Update test

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove headers

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Undo changing the flag

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Include actual size and limit in error

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Cancel processor context in tests

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Increase test timeout to 1m

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

* Remove comment

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>

---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov authored Dec 16, 2024
1 parent 9b6bc36 commit ab55f55
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 21 deletions.
9 changes: 4 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
# Changelog

## 2.15.0-rc.0
## main / unreleased

### Grafana Mimir

* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145

### Grafana Mimir

* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185

* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154

### Mixin

Expand Down
3 changes: 1 addition & 2 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,8 +786,7 @@ func TestQueryFrontendWithQueryShardingAndTooLargeEntityRequest(t *testing.T) {
querySchedulerEnabled: false,
setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
flags = mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{
// Set the maximum entity size to 100 bytes.
// The query result payload is 107 bytes, so it will be too large for the configured limit.
// The query result payload is 202 bytes, so it will be too large for the configured limit.
"-querier.frontend-client.grpc-max-send-msg-size": "100",
})

Expand Down
28 changes: 15 additions & 13 deletions pkg/querier/worker/frontend_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,22 @@ func (fp *frontendProcessor) process(execCtx context.Context, c frontendv1pb.Fro
go fp.runRequest(ctx, request.HttpRequest, request.StatsEnabled, time.Duration(request.QueueTimeNanos), func(response *httpgrpc.HTTPResponse, stats *querier_stats.Stats) error {
defer inflightQuery.Store(false)

return c.Send(&frontendv1pb.ClientToFrontend{
// Ensure responses that are too big are not retried.
msgToFrontend := &frontendv1pb.ClientToFrontend{
HttpResponse: response,
Stats: stats,
})
}

if msgSize := msgToFrontend.Size(); msgSize >= fp.maxMessageSize {
errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", msgSize, fp.maxMessageSize)
msgToFrontend.HttpResponse = &httpgrpc.HTTPResponse{
Code: http.StatusRequestEntityTooLarge,
Body: []byte(errMsg),
}
level.Error(fp.log).Log("msg", "query response larger than limit", "err", errMsg, "response_size", msgSize, "limit", fp.maxMessageSize)
}

return c.Send(msgToFrontend)
})

case frontendv1pb.GET_ID:
Expand Down Expand Up @@ -169,17 +181,7 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H
}
}

// Ensure responses that are too big are not retried.
if len(response.Body) >= fp.maxMessageSize {
errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize)
response = &httpgrpc.HTTPResponse{
Code: http.StatusRequestEntityTooLarge,
Body: []byte(errMsg),
}
level.Error(fp.log).Log("msg", "error processing query", "err", errMsg)
}

if err := sendHTTPResponse(response, stats); err != nil {
level.Error(fp.log).Log("msg", "error processing requests", "err", err)
level.Error(fp.log).Log("msg", "error sending query response", "err", err)
}
}
170 changes: 169 additions & 1 deletion pkg/querier/worker/frontend_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@ package worker

import (
"context"
"fmt"
"net"
"os"
"strings"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/test"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -225,7 +231,7 @@ func prepareFrontendProcessor() (*frontendProcessor, *frontendProcessClientMock,

requestHandler := &requestHandlerMock{}

fp := newFrontendProcessor(Config{QuerierID: "test-querier-id"}, requestHandler, log.NewNopLogger())
fp := newFrontendProcessor(Config{QuerierID: "test-querier-id", QueryFrontendGRPCClientConfig: grpcclient.Config{MaxSendMsgSize: 1}}, requestHandler, log.NewNopLogger())
fp.frontendClientFactory = func(_ *grpc.ClientConn) frontendv1pb.FrontendClient {
return frontendClient
}
Expand Down Expand Up @@ -302,3 +308,165 @@ func (m *frontendProcessClientMock) RecvMsg(msg interface{}) error {
args := m.Called(msg)
return args.Error(0)
}

type mockFrontendServer struct {
frontendv1pb.UnimplementedFrontendServer
receiveFunc func(*frontendv1pb.ClientToFrontend) error
}

func (m *mockFrontendServer) Process(srv frontendv1pb.Frontend_ProcessServer) error {
// Send test HTTP request
err := srv.Send(&frontendv1pb.FrontendToClient{
Type: frontendv1pb.HTTP_REQUEST,
HttpRequest: &httpgrpc.HTTPRequest{
Method: "GET",
Url: "/test",
},
StatsEnabled: true,
})
if err != nil {
return err
}

// Receive response
resp, err := srv.Recv()
if err != nil {
return err
}

return m.receiveFunc(resp)
}

type mockHandlerFunc func(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error)

func (m mockHandlerFunc) Handle(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
return m(ctx, req)
}

func TestFrontendProcessor(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stdout)

tests := []struct {
name string
customizeConfig func(*Config)
handlerResponse *httpgrpc.HTTPResponse
handlerError error
expectedResponse *httpgrpc.HTTPResponse
}{
{
name: "success case",
handlerResponse: &httpgrpc.HTTPResponse{
Code: 200,
Body: []byte("success"),
},
expectedResponse: &httpgrpc.HTTPResponse{
Code: 200,
Body: []byte("success"),
},
},
{
name: "response too large",
customizeConfig: func(cfg *Config) {
cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize = 100
},
handlerResponse: &httpgrpc.HTTPResponse{
Code: 200,
Body: []byte(strings.Repeat("some very large response", 100)),
},
expectedResponse: &httpgrpc.HTTPResponse{
Code: 413,
Body: []byte("response larger than the max (2417 vs 100)"),
},
},
{
name: "small body but large headers",
customizeConfig: func(cfg *Config) {
cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize = 1000
},
handlerResponse: &httpgrpc.HTTPResponse{
Code: 200,
Headers: []*httpgrpc.Header{
{Key: "Header1", Values: []string{strings.Repeat("x", 500)}},
},
Body: []byte(strings.Repeat("x", 500)),
},
expectedResponse: &httpgrpc.HTTPResponse{
Code: 413,
Body: []byte("response larger than the max (1032 vs 1000)"),
},
},
{
name: "handler error",
handlerError: fmt.Errorf("handler error"),
expectedResponse: &httpgrpc.HTTPResponse{
Code: 500,
Body: []byte("handler error"),
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

srv := grpc.NewServer()

receivedResponse := make(chan *httpgrpc.HTTPResponse, 1)

// Setup mock frontend server
mockFrontend := &mockFrontendServer{
receiveFunc: func(resp *frontendv1pb.ClientToFrontend) error {
receivedResponse <- resp.HttpResponse
return nil
},
}
frontendv1pb.RegisterFrontendServer(srv, mockFrontend)

// Start server
go func() {
_ = srv.Serve(lis)
}()
t.Cleanup(srv.Stop)

// Create client connection
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

cfg := Config{}
flagext.DefaultValues(&cfg)
if tc.customizeConfig != nil {
tc.customizeConfig(&cfg)
}

dialOpts, err := cfg.QueryFrontendGRPCClientConfig.DialOption(nil, nil)
require.NoError(t, err)
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))

conn, err := grpc.NewClient(lis.Addr().String(), dialOpts...)
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, conn.Close())
})

mockHandler := mockHandlerFunc(func(_ context.Context, _ *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
if tc.handlerError != nil {
return nil, tc.handlerError
}
return tc.handlerResponse, nil
})

// Create frontend processor
processor := newFrontendProcessor(cfg, mockHandler, logger)
go processor.processQueriesOnSingleStream(ctx, conn, lis.Addr().String())

// Wait for response and verify
select {
case resp := <-receivedResponse:
require.Equal(t, *tc.expectedResponse, *resp)
case <-time.After(time.Minute):
t.Fatal("timeout waiting for response")
}
})
}
}

0 comments on commit ab55f55

Please sign in to comment.