diff --git a/CHANGELOG.md b/CHANGELOG.md index c82a36355db..2a12fd7e0e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,16 +1,15 @@ # Changelog -## 2.15.0-rc.0 +## main / unreleased + +### Grafana Mimir * [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236 * [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256 * [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168 * [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145 - -### Grafana Mimir - * [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185 - +* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154 ### Mixin diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 4cfdc35d643..75a55de735e 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -786,8 +786,7 @@ func TestQueryFrontendWithQueryShardingAndTooLargeEntityRequest(t *testing.T) { querySchedulerEnabled: false, setup: func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { flags = mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags(), map[string]string{ - // Set the maximum entity size to 100 bytes. - // The query result payload is 107 bytes, so it will be too large for the configured limit. + // The query result payload is 202 bytes, so it will be too large for the configured limit. "-querier.frontend-client.grpc-max-send-msg-size": "100", }) diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 31805530bbe..d87d110205e 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -122,10 +122,22 @@ func (fp *frontendProcessor) process(execCtx context.Context, c frontendv1pb.Fro go fp.runRequest(ctx, request.HttpRequest, request.StatsEnabled, time.Duration(request.QueueTimeNanos), func(response *httpgrpc.HTTPResponse, stats *querier_stats.Stats) error { defer inflightQuery.Store(false) - return c.Send(&frontendv1pb.ClientToFrontend{ + // Ensure responses that are too big are not retried. + msgToFrontend := &frontendv1pb.ClientToFrontend{ HttpResponse: response, Stats: stats, - }) + } + + if msgSize := msgToFrontend.Size(); msgSize >= fp.maxMessageSize { + errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", msgSize, fp.maxMessageSize) + msgToFrontend.HttpResponse = &httpgrpc.HTTPResponse{ + Code: http.StatusRequestEntityTooLarge, + Body: []byte(errMsg), + } + level.Error(fp.log).Log("msg", "query response larger than limit", "err", errMsg, "response_size", msgSize, "limit", fp.maxMessageSize) + } + + return c.Send(msgToFrontend) }) case frontendv1pb.GET_ID: @@ -169,17 +181,7 @@ func (fp *frontendProcessor) runRequest(ctx context.Context, request *httpgrpc.H } } - // Ensure responses that are too big are not retried. - if len(response.Body) >= fp.maxMessageSize { - errMsg := fmt.Sprintf("response larger than the max (%d vs %d)", len(response.Body), fp.maxMessageSize) - response = &httpgrpc.HTTPResponse{ - Code: http.StatusRequestEntityTooLarge, - Body: []byte(errMsg), - } - level.Error(fp.log).Log("msg", "error processing query", "err", errMsg) - } - if err := sendHTTPResponse(response, stats); err != nil { - level.Error(fp.log).Log("msg", "error processing requests", "err", err) + level.Error(fp.log).Log("msg", "error sending query response", "err", err) } } diff --git a/pkg/querier/worker/frontend_processor_test.go b/pkg/querier/worker/frontend_processor_test.go index 2cd8244aa43..60f729e64ee 100644 --- a/pkg/querier/worker/frontend_processor_test.go +++ b/pkg/querier/worker/frontend_processor_test.go @@ -7,10 +7,16 @@ package worker import ( "context" + "fmt" + "net" + "os" + "strings" "testing" "time" "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/test" "github.com/stretchr/testify/assert" @@ -225,7 +231,7 @@ func prepareFrontendProcessor() (*frontendProcessor, *frontendProcessClientMock, requestHandler := &requestHandlerMock{} - fp := newFrontendProcessor(Config{QuerierID: "test-querier-id"}, requestHandler, log.NewNopLogger()) + fp := newFrontendProcessor(Config{QuerierID: "test-querier-id", QueryFrontendGRPCClientConfig: grpcclient.Config{MaxSendMsgSize: 1}}, requestHandler, log.NewNopLogger()) fp.frontendClientFactory = func(_ *grpc.ClientConn) frontendv1pb.FrontendClient { return frontendClient } @@ -302,3 +308,165 @@ func (m *frontendProcessClientMock) RecvMsg(msg interface{}) error { args := m.Called(msg) return args.Error(0) } + +type mockFrontendServer struct { + frontendv1pb.UnimplementedFrontendServer + receiveFunc func(*frontendv1pb.ClientToFrontend) error +} + +func (m *mockFrontendServer) Process(srv frontendv1pb.Frontend_ProcessServer) error { + // Send test HTTP request + err := srv.Send(&frontendv1pb.FrontendToClient{ + Type: frontendv1pb.HTTP_REQUEST, + HttpRequest: &httpgrpc.HTTPRequest{ + Method: "GET", + Url: "/test", + }, + StatsEnabled: true, + }) + if err != nil { + return err + } + + // Receive response + resp, err := srv.Recv() + if err != nil { + return err + } + + return m.receiveFunc(resp) +} + +type mockHandlerFunc func(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) + +func (m mockHandlerFunc) Handle(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + return m(ctx, req) +} + +func TestFrontendProcessor(t *testing.T) { + logger := log.NewLogfmtLogger(os.Stdout) + + tests := []struct { + name string + customizeConfig func(*Config) + handlerResponse *httpgrpc.HTTPResponse + handlerError error + expectedResponse *httpgrpc.HTTPResponse + }{ + { + name: "success case", + handlerResponse: &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte("success"), + }, + expectedResponse: &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte("success"), + }, + }, + { + name: "response too large", + customizeConfig: func(cfg *Config) { + cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize = 100 + }, + handlerResponse: &httpgrpc.HTTPResponse{ + Code: 200, + Body: []byte(strings.Repeat("some very large response", 100)), + }, + expectedResponse: &httpgrpc.HTTPResponse{ + Code: 413, + Body: []byte("response larger than the max (2417 vs 100)"), + }, + }, + { + name: "small body but large headers", + customizeConfig: func(cfg *Config) { + cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize = 1000 + }, + handlerResponse: &httpgrpc.HTTPResponse{ + Code: 200, + Headers: []*httpgrpc.Header{ + {Key: "Header1", Values: []string{strings.Repeat("x", 500)}}, + }, + Body: []byte(strings.Repeat("x", 500)), + }, + expectedResponse: &httpgrpc.HTTPResponse{ + Code: 413, + Body: []byte("response larger than the max (1032 vs 1000)"), + }, + }, + { + name: "handler error", + handlerError: fmt.Errorf("handler error"), + expectedResponse: &httpgrpc.HTTPResponse{ + Code: 500, + Body: []byte("handler error"), + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + lis, err := net.Listen("tcp", "localhost:0") + require.NoError(t, err) + + srv := grpc.NewServer() + + receivedResponse := make(chan *httpgrpc.HTTPResponse, 1) + + // Setup mock frontend server + mockFrontend := &mockFrontendServer{ + receiveFunc: func(resp *frontendv1pb.ClientToFrontend) error { + receivedResponse <- resp.HttpResponse + return nil + }, + } + frontendv1pb.RegisterFrontendServer(srv, mockFrontend) + + // Start server + go func() { + _ = srv.Serve(lis) + }() + t.Cleanup(srv.Stop) + + // Create client connection + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + cfg := Config{} + flagext.DefaultValues(&cfg) + if tc.customizeConfig != nil { + tc.customizeConfig(&cfg) + } + + dialOpts, err := cfg.QueryFrontendGRPCClientConfig.DialOption(nil, nil) + require.NoError(t, err) + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + + conn, err := grpc.NewClient(lis.Addr().String(), dialOpts...) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, conn.Close()) + }) + + mockHandler := mockHandlerFunc(func(_ context.Context, _ *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + if tc.handlerError != nil { + return nil, tc.handlerError + } + return tc.handlerResponse, nil + }) + + // Create frontend processor + processor := newFrontendProcessor(cfg, mockHandler, logger) + go processor.processQueriesOnSingleStream(ctx, conn, lis.Addr().String()) + + // Wait for response and verify + select { + case resp := <-receivedResponse: + require.Equal(t, *tc.expectedResponse, *resp) + case <-time.After(time.Minute): + t.Fatal("timeout waiting for response") + } + }) + } +}