From ab55f558b0c8959dd09965041e97cf1ffe9d63d5 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov Date: Mon, 16 Dec 2024 16:31:58 +0100 Subject: [PATCH] querier: check size of whole frontend response before sending it (#10154) * Add test v1 Signed-off-by: Dimitar Dimitrov * querier: check size of whole frontend response before sending it ### Background Previously we'd check only the size of the body before sending a response to the query-frontend. We do this check ourselves rather than leaving it to the gRPC library because, when a message is too large, the library only returns an error to the sender and doesn't tell the server (the query-frontend) anything. This results in timeouts at the query-frontend. ### Problem Because we only check the length of the body, there are edge cases where the size of the whole response (including stats & headers) exceeds the limit. The gRPC library would still refuse to send the response and the frontend would time out waiting for it. ### Is it really happening? I discovered this in tests with very small limits, but there are probably realistic cases where this happens too: the body is just below the limit, and adding stats and headers pushes the whole response over it. 
Signed-off-by: Dimitar Dimitrov * Add CHANGELOG.md entry Signed-off-by: Dimitar Dimitrov * Remove unused param Signed-off-by: Dimitar Dimitrov * remove unused param Signed-off-by: Dimitar Dimitrov * Update test Signed-off-by: Dimitar Dimitrov * Remove headers Signed-off-by: Dimitar Dimitrov * Undo changing the flag Signed-off-by: Dimitar Dimitrov * Include actual size and limit in error Signed-off-by: Dimitar Dimitrov * Cancel processor context in tests Signed-off-by: Dimitar Dimitrov * Increase test timeout to 1m Signed-off-by: Dimitar Dimitrov * Remove comment Signed-off-by: Dimitar Dimitrov --------- Signed-off-by: Dimitar Dimitrov --- CHANGELOG.md | 9 +- integration/query_frontend_test.go | 3 +- pkg/querier/worker/frontend_processor.go | 28 +-- pkg/querier/worker/frontend_processor_test.go | 170 +++++++++++++++++- 4 files changed, 189 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c82a36355db..2a12fd7e0e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,16 +1,15 @@ # Changelog -## 2.15.0-rc.0 +## main / unreleased + +### Grafana Mimir * [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236 * [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256 * [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168 * [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145 - -### Grafana Mimir - * [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. 
#10185 - +* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154 ### Mixin diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 4cfdc35d643..75a55de735e 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -786,8 +786,7 @@ func TestQueryFrontendWithQueryShardingAndTooLargeEntityRequest(t *testing.T) { querySchedulerEnabled: false, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { flags = mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{ - // Set the maximum entity size to 100 bytes. - // The query result payload is 107 bytes, so it will be too large for the configured limit. + // The query result payload is 202 bytes, so it will be too large for the configured limit. "-querier.frontend-client.grpc-max-send-msg-size": "100", }) diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 31805530bbe..d87d110205e 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -122,10 +122,22 @@ func (fp *frontendProcessor) process(execCtx context.Context, c frontendv1pb.Fro go fp.runRequest(ctx, request.HttpRequest, request.StatsEnabled, time.Duration(request.QueueTimeNanos), func(response *httpgrpc.HTTPResponse, stats *querier_stats.Stats) error { defer inflightQuery.Store(false) - return c.Send(&frontendv1pb.ClientToFrontend{ + // Ensure responses that are too big are not retried. 
+ msgToFrontend := &frontendv1pb.ClientToFrontend{ HttpResponse: response, Stats: stats, - }) + } + + if msgSize := msgToFrontend.Size(); msgSize >= fp.maxMessageSize { + errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", msgSize, fp.maxMessageSize) + msgToFrontend.HttpResponse = &httpgrpc.HTTPResponse{ + Code: http.StatusRequestEntityTooLarge, + Body: []byte(errMsg), + } + level.Error(fp.log).Log("msg", "query response larger than limit", "err", errMsg, "response_size", msgSize, "limit", fp.maxMessageSize) + } + + return c.Send(msgToFrontend) }) case frontendv1pb.GET_ID: @@ -169,17 +181,7 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H } } - // Ensure responses that are too big are not retried. - if len(response.Body) >= fp.maxMessageSize { - errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize) - response = &httpgrpc.HTTPResponse{ - Code: http.StatusRequestEntityTooLarge, - Body: []byte(errMsg), - } - level.Error(fp.log).Log("msg", "error processing query", "err", errMsg) - } - if err := sendHTTPResponse(response, stats); err != nil { - level.Error(fp.log).Log("msg", "error processing requests", "err", err) + level.Error(fp.log).Log("msg", "error sending query response", "err", err) } } diff --git a/pkg/querier/worker/frontend_processor_test.go b/pkg/querier/worker/frontend_processor_test.go index 2cd8244aa43..60f729e64ee 100644 --- a/pkg/querier/worker/frontend_processor_test.go +++ b/pkg/querier/worker/frontend_processor_test.go @@ -7,10 +7,16 @@ package worker import ( "context" + "fmt" + "net" + "os" + "strings" "testing" "time" "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/test" "github.com/stretchr/testify/assert" @@ -225,7 +231,7 @@ func prepareFrontendProcessor() (*frontendProcessor, *frontendProcessClientMock, requestHandler := 
&requestHandlerMock{} - fp := newFrontendProcessor(Config{QuerierID: "test-querier-id"}, requestHandler, log.NewNopLogger()) + fp := newFrontendProcessor(Config{QuerierID: "test-querier-id", QueryFrontendGRPCClientConfig: grpcclient.Config{MaxSendMsgSize: 1}}, requestHandler, log.NewNopLogger()) fp.frontendClientFactory = func(_ *grpc.ClientConn) frontendv1pb.FrontendClient { return frontendClient } @@ -302,3 +308,165 @@ func (m *frontendProcessClientMock) RecvMsg(msg interface{}) error { args := m.Called(msg) return args.Error(0) } + +type mockFrontendServer struct { + frontendv1pb.UnimplementedFrontendServer + receiveFunc func(*frontendv1pb.ClientToFrontend) error +} + +func (m *mockFrontendServer) Process(srv frontendv1pb.Frontend_ProcessServer) error { + // Send test HTTP request + err := srv.Send(&frontendv1pb.FrontendToClient{ + Type: frontendv1pb.HTTP_REQUEST, + HttpRequest: &httpgrpc.HTTPRequest{ + Method: "GET", + Url: "/test", + }, + StatsEnabled: true, + }) + if err != nil { + return err + } + + // Receive response + resp, err := srv.Recv() + if err != nil { + return err + } + + return m.receiveFunc(resp) +} + +type mockHandlerFunc func(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) + +func (m mockHandlerFunc) Handle(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + return m(ctx, req) +} + +func TestFrontendProcessor(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + + tests := []struct { + name string + customizeConfig func(*Config) + handlerResponse *httpgrpc.HTTPResponse + handlerError error + expectedResponse *httpgrpc.HTTPResponse + }{ + { + name: "success case", + handlerResponse: &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte("success"), + }, + expectedResponse: &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte("success"), + }, + }, + { + name: "response too large", + customizeConfig: func(cfg *Config) { + cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize = 100 + }, + 
handlerResponse: &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte(strings.Repeat("some very large response", 100)), + }, + expectedResponse: &httpgrpc.HTTPResponse{ + Code: 413, + Body: []byte("response larger than the max (2417 vs 100)"), + }, + }, + { + name: "small body but large headers", + customizeConfig: func(cfg *Config) { + cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize = 1000 + }, + handlerResponse: &httpgrpc.HTTPResponse{ + Code: 200, + Headers: []*httpgrpc.Header{ + {Key: "Header1", Values: []string{strings.Repeat("x", 500)}}, + }, + Body: []byte(strings.Repeat("x", 500)), + }, + expectedResponse: &httpgrpc.HTTPResponse{ + Code: 413, + Body: []byte("response larger than the max (1032 vs 1000)"), + }, + }, + { + name: "handler error", + handlerError: fmt.Errorf("handler error"), + expectedResponse: &httpgrpc.HTTPResponse{ + Code: 500, + Body: []byte("handler error"), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + + srv := grpc.NewServer() + + receivedResponse := make(chan *httpgrpc.HTTPResponse, 1) + + // Setup mock frontend server + mockFrontend := &mockFrontendServer{ + receiveFunc: func(resp *frontendv1pb.ClientToFrontend) error { + receivedResponse <- resp.HttpResponse + return nil + }, + } + frontendv1pb.RegisterFrontendServer(srv, mockFrontend) + + // Start server + go func() { + _ = srv.Serve(lis) + }() + t.Cleanup(srv.Stop) + + // Create client connection + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + cfg := Config{} + flagext.DefaultValues(&cfg) + if tc.customizeConfig != nil { + tc.customizeConfig(&cfg) + } + + dialOpts, err := cfg.QueryFrontendGRPCClientConfig.DialOption(nil, nil) + require.NoError(t, err) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + + conn, err := grpc.NewClient(lis.Addr().String(), dialOpts...) 
+ require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn.Close()) + }) + + mockHandler := mockHandlerFunc(func(_ context.Context, _ *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + if tc.handlerError != nil { + return nil, tc.handlerError + } + return tc.handlerResponse, nil + }) + + // Create frontend processor + processor := newFrontendProcessor(cfg, mockHandler, logger) + go processor.processQueriesOnSingleStream(ctx, conn, lis.Addr().String()) + + // Wait for response and verify + select { + case resp := <-receivedResponse: + require.Equal(t, *tc.expectedResponse, *resp) + case <-time.After(time.Minute): + t.Fatal("timeout waiting for response") + } + }) + } +}