From 6730d86df96d7e8da3a229e6161b6e54866f01c1 Mon Sep 17 00:00:00 2001 From: Anton Kolesnikov Date: Thu, 16 Jan 2025 13:51:21 +0800 Subject: [PATCH] fix write path router dual ingestion (#3843) --- pkg/distributor/model/push.go | 26 ++++++ pkg/distributor/write_path/router.go | 84 ++++++++++--------- pkg/distributor/write_path/router_metrics.go | 11 ++- pkg/distributor/write_path/write_path.go | 11 ++- pkg/distributor/write_path/write_path_test.go | 25 ++++-- pkg/frontend/read_path/read_path.go | 2 +- 6 files changed, 107 insertions(+), 52 deletions(-) diff --git a/pkg/distributor/model/push.go b/pkg/distributor/model/push.go index 65141a836c..e8304abfc5 100644 --- a/pkg/distributor/model/push.go +++ b/pkg/distributor/model/push.go @@ -68,3 +68,29 @@ func getProfileLanguageFromSpy(spyName string) string { return "rust" } } + +func (req *PushRequest) Clone() *PushRequest { + c := &PushRequest{ + TenantID: req.TenantID, + RawProfileSize: req.RawProfileSize, + RawProfileType: req.RawProfileType, + Series: make([]*ProfileSeries, len(req.Series)), + TotalProfiles: req.TotalProfiles, + TotalBytesUncompressed: req.TotalBytesUncompressed, + } + for i, s := range req.Series { + c.Series[i] = &ProfileSeries{ + Labels: phlaremodel.Labels(s.Labels).Clone(), + Samples: make([]*ProfileSample, len(s.Samples)), + Language: s.Language, + } + for j, p := range s.Samples { + c.Series[i].Samples[j] = &ProfileSample{ + Profile: &pprof.Profile{Profile: p.Profile.Profile.CloneVT()}, + RawProfile: nil, + ID: p.ID, + } + } + } + return c +} diff --git a/pkg/distributor/write_path/router.go b/pkg/distributor/write_path/router.go index ced6570dc1..8e5facc5be 100644 --- a/pkg/distributor/write_path/router.go +++ b/pkg/distributor/write_path/router.go @@ -1,7 +1,6 @@ package writepath import ( - "bytes" "context" "fmt" "math/rand" @@ -13,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/google/uuid" "github.com/grafana/dskit/services" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "golang.org/x/sync/errgroup" @@ -21,7 +21,8 @@ import ( segmentwriterv1 "github.com/grafana/pyroscope/api/gen/proto/go/segmentwriter/v1" typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" distributormodel "github.com/grafana/pyroscope/pkg/distributor/model" - phlaremodel "github.com/grafana/pyroscope/pkg/model" + "github.com/grafana/pyroscope/pkg/pprof" + "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/util" "github.com/grafana/pyroscope/pkg/util/connectgrpc" httputil "github.com/grafana/pyroscope/pkg/util/http" @@ -124,11 +125,7 @@ func (m *Router) segwriterRoute(primary bool) *route { path: SegmentWriterPath, primary: primary, send: func(ctx context.Context, req *distributormodel.PushRequest) error { - // Prepare the requests: we're trying to avoid allocating extra - // memory for serialized profiles by reusing the source request - // capacities, iff the request won't be sent to ingester. - requests := convertRequest(req, !primary) - return m.sendRequestsToSegmentWriter(ctx, requests) + return m.sendRequestsToSegmentWriter(ctx, convertRequest(req)) }, } } @@ -153,14 +150,14 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque } if shouldSegwriter { segwriter = m.segwriterRoute(!shouldIngester) - if segwriter.primary { - // If the request is sent to segment-writer exclusively: - // the response returns to the client when the new write path - // returns. + if segwriter.primary && !config.AsyncIngest { + // The request is sent to segment-writer exclusively, and the client + // must block until the response returns. // Failure of the new write is returned to the client. // Failure of the old write path is NOT returned to the client. return m.send(segwriter)(ctx, req) } + // Request to the segment writer will be sent asynchronously. } // No write routes. This is possible if the write path is configured @@ -169,20 +166,38 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque return nil } - // If we ended up here, ingester is the primary route, - // and segment-writer is the secondary route. - c := m.sendAsync(ctx, req, ingester) - // We do not wait for the secondary request to complete. - // On shutdown, however, we will wait for all inflight - // requests to complete. - m.sendAsync(ctx, req, segwriter) + if segwriter != nil && ingester != nil { + // The request is to be sent to both asynchronously, therefore we're cloning it. + reqClone := req.Clone() + segwriterSend := segwriter.send + segwriter.send = func(context.Context, *distributormodel.PushRequest) error { + // We do not wait for the secondary request to complete. + // On shutdown, however, we will wait for all inflight + // requests to complete. + localCtx, cancel := context.WithTimeout(context.Background(), config.SegmentWriterTimeout) + localCtx = tenant.InjectTenantID(localCtx, req.TenantID) + if sp := opentracing.SpanFromContext(ctx); sp != nil { + localCtx = opentracing.ContextWithSpan(localCtx, sp) + } + defer cancel() + return segwriterSend(localCtx, reqClone) + } + } - select { - case err := <-c: - return err - case <-ctx.Done(): - return ctx.Err() + if segwriter != nil { + m.sendAsync(ctx, req, segwriter) } + + if ingester != nil { + select { + case err := <-m.sendAsync(ctx, req, ingester): + return err + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil } type sendFunc func(context.Context, *distributormodel.PushRequest) error @@ -257,11 +272,11 @@ func (m *Router) sendRequestsToSegmentWriter(ctx context.Context, requests []*se return g.Wait() } -func convertRequest(req *distributormodel.PushRequest, copy bool) []*segmentwriterv1.PushRequest { +func convertRequest(req *distributormodel.PushRequest) []*segmentwriterv1.PushRequest { r := make([]*segmentwriterv1.PushRequest, 0, len(req.Series)*2) for _, s := range req.Series { for _, p := range s.Samples { - r = append(r, convertProfile(p, s.Labels, req.TenantID, copy)) + r = append(r, convertProfile(p, s.Labels, req.TenantID)) } } return r @@ -271,25 +286,16 @@ func convertProfile( sample *distributormodel.ProfileSample, labels []*typesv1.LabelPair, tenantID string, - copy bool, ) *segmentwriterv1.PushRequest { - var b *bytes.Buffer - if copy { - b = bytes.NewBuffer(make([]byte, 0, cap(sample.RawProfile))) - } else { - b = bytes.NewBuffer(sample.RawProfile[:0]) - } - if _, err := sample.Profile.WriteTo(b); err != nil { + buf, err := pprof.Marshal(sample.Profile.Profile, true) + if err != nil { panic(fmt.Sprintf("failed to marshal profile: %v", err)) } profileID := uuid.New() return &segmentwriterv1.PushRequest{ - TenantId: tenantID, - // Note that labels are always copied because - // the API allows multiple profiles to refer to - // the same label set. - Labels: phlaremodel.Labels(labels).Clone(), - Profile: b.Bytes(), + TenantId: tenantID, + Labels: labels, + Profile: buf, ProfileId: profileID[:], } } diff --git a/pkg/distributor/write_path/router_metrics.go b/pkg/distributor/write_path/router_metrics.go index 9aac63808a..82a6586a3d 100644 --- a/pkg/distributor/write_path/router_metrics.go +++ b/pkg/distributor/write_path/router_metrics.go @@ -2,6 +2,7 @@ package writepath import ( "strconv" + "time" "github.com/prometheus/client_golang/prometheus" ) @@ -13,9 +14,13 @@ type metrics struct { func newMetrics(reg prometheus.Registerer) *metrics { m := &metrics{ durationHistogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Name: "pyroscope_write_path_downstream_request_duration_seconds", - Buckets: prometheus.ExponentialBucketsRange(0.001, 10, 30), - Help: "Duration of downstream requests made by the write path router.", + Name: "pyroscope_write_path_downstream_request_duration_seconds", + Help: "Duration of downstream requests made by the write path router.", + + Buckets: prometheus.ExponentialBucketsRange(0.001, 10, 30), + NativeHistogramBucketFactor: 1.1, + NativeHistogramMaxBucketNumber: 32, + NativeHistogramMinResetDuration: time.Hour, }, []string{"route", "primary", "status"}), } if reg != nil { diff --git a/pkg/distributor/write_path/write_path.go b/pkg/distributor/write_path/write_path.go index 91f0dd5a7d..23ef7da064 100644 --- a/pkg/distributor/write_path/write_path.go +++ b/pkg/distributor/write_path/write_path.go @@ -4,6 +4,7 @@ import ( "errors" "flag" "fmt" + "time" ) // WritePath controls the write path. @@ -53,9 +54,11 @@ func (m *WritePath) Set(text string) error { func (m *WritePath) String() string { return string(*m) } type Config struct { - WritePath WritePath `yaml:"write_path" json:"write_path" doc:"hidden"` - IngesterWeight float64 `yaml:"write_path_ingester_weight" json:"write_path_ingester_weight" doc:"hidden"` - SegmentWriterWeight float64 `yaml:"write_path_segment_writer_weight" json:"write_path_segment_writer_weight" doc:"hidden"` + WritePath WritePath `yaml:"write_path" json:"write_path" doc:"hidden"` + IngesterWeight float64 `yaml:"write_path_ingester_weight" json:"write_path_ingester_weight" doc:"hidden"` + SegmentWriterWeight float64 `yaml:"write_path_segment_writer_weight" json:"write_path_segment_writer_weight" doc:"hidden"` + SegmentWriterTimeout time.Duration `yaml:"write_path_segment_writer_timeout" json:"write_path_segment_writer_timeout" doc:"hidden"` + AsyncIngest bool `yaml:"async_ingest" json:"async_ingest" doc:"hidden"` } func (o *Config) RegisterFlags(f *flag.FlagSet) { @@ -65,4 +68,6 @@ func (o *Config) RegisterFlags(f *flag.FlagSet) { "Specifies the fraction [0:1] that should be send to ingester in combined mode. 0 means no traffics is sent to ingester. 1 means 100% of requests are sent to ingester.") f.Float64Var(&o.SegmentWriterWeight, "write-path.segment-writer-weight", 0, "Specifies the fraction [0:1] that should be send to segment-writer in combined mode. 0 means no traffics is sent to segment-writer. 1 means 100% of requests are sent to segment-writer.") + f.DurationVar(&o.SegmentWriterTimeout, "write-path.segment-writer-timeout", 5*time.Second, "Timeout for segment writer requests.") + f.BoolVar(&o.AsyncIngest, "async-ingest", false, "If true, the write path will not wait for the segment-writer to finish processing the request. Writes to ingester always synchronous.") } diff --git a/pkg/distributor/write_path/write_path_test.go b/pkg/distributor/write_path/write_path_test.go index 8a7df18719..1536eec8ce 100644 --- a/pkg/distributor/write_path/write_path_test.go +++ b/pkg/distributor/write_path/write_path_test.go @@ -122,6 +122,7 @@ func (s *routerTestSuite) Test_SegmentWriterPath() { func (s *routerTestSuite) Test_CombinedPath() { const ( N = 100 + w = 10 // Concurrent workers. f = 0.5 d = 0.3 // Allowed delta: note that f is just a probability. ) @@ -134,22 +135,34 @@ func (s *routerTestSuite) Test_CombinedPath() { var sentIngester atomic.Uint32 s.ingester.On("Push", mock.Anything, mock.Anything). - Run(func(mock.Arguments) { sentIngester.Add(1) }). + Run(func(m mock.Arguments) { + sentIngester.Add(1) + // Assert that no race condition occurs: we delete series + // attempting to access it concurrently with segment writer + // that should convert the distributor request to a segment + // writer request. + m.Get(1).(*distributormodel.PushRequest).Series = nil + }). Return(new(connect.Response[pushv1.PushResponse]), nil) var sentSegwriter atomic.Uint32 s.segwriter.On("Push", mock.Anything, mock.Anything). - Run(func(mock.Arguments) { sentSegwriter.Add(1) }). + Run(func(m mock.Arguments) { + sentSegwriter.Add(1) + m.Get(1).(*segmentwriterv1.PushRequest).Profile = nil + }). Return(new(segmentwriterv1.PushResponse), nil) - for i := 0; i < N; i++ { - s.Assert().NoError(s.router.Send(context.Background(), s.request)) + for i := 0; i < w; i++ { + for j := 0; j < N; j++ { + s.Assert().NoError(s.router.Send(context.Background(), s.request.Clone())) + } } s.router.inflight.Wait() - expected := N * f + expected := N * f * w delta := expected * d - s.Assert().Equal(N, int(sentIngester.Load())) + s.Assert().Equal(N*w, int(sentIngester.Load())) s.Assert().Greater(int(sentSegwriter.Load()), int(expected-delta)) s.Assert().Less(int(sentSegwriter.Load()), int(expected+delta)) } diff --git a/pkg/frontend/read_path/read_path.go b/pkg/frontend/read_path/read_path.go index 673f191047..930b351305 100644 --- a/pkg/frontend/read_path/read_path.go +++ b/pkg/frontend/read_path/read_path.go @@ -16,5 +16,5 @@ func (o *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&o.EnableQueryBackend, "enable-query-backend", false, "This parameter specifies whether the new query backend is enabled.") f.Var((*flagext.Time)(&o.EnableQueryBackendFrom), "enable-query-backend-from", - "This parameter specifies the point in time from which data is queried from the new query backend.") + "This parameter specifies the point in time from which data is queried from the new query backend. The format if RFC3339 (2020-10-20T00:00:00Z)") }