Skip to content

Commit

Permalink
fix write path router dual ingestion (#3843)
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae authored Jan 16, 2025
1 parent 2d19e31 commit 6730d86
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 52 deletions.
26 changes: 26 additions & 0 deletions pkg/distributor/model/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,29 @@ func getProfileLanguageFromSpy(spyName string) string {
return "rust"
}
}

// Clone returns a copy of the request that does not share series, samples,
// label slices, or profile messages with the receiver, so the copy can be
// handed to another consumer while the original is still in use.
//
// Only the fields listed below are carried over. In particular, RawProfile
// is deliberately left nil on every cloned sample: the serialized bytes are
// not copied. Profiles are duplicated with CloneVT (presumably the
// vtprotobuf-generated deep copy — confirm against the generated code).
//
// The receiver, its series, its samples, and each sample's Profile must be
// non-nil; no nil checks are performed.
func (req *PushRequest) Clone() *PushRequest {
	series := make([]*ProfileSeries, len(req.Series))
	for i, src := range req.Series {
		dst := &ProfileSeries{
			Labels:   phlaremodel.Labels(src.Labels).Clone(),
			Samples:  make([]*ProfileSample, len(src.Samples)),
			Language: src.Language,
		}
		for j, sample := range src.Samples {
			dst.Samples[j] = &ProfileSample{
				Profile: &pprof.Profile{Profile: sample.Profile.Profile.CloneVT()},
				// The raw (serialized) profile is intentionally dropped.
				RawProfile: nil,
				ID:         sample.ID,
			}
		}
		series[i] = dst
	}
	return &PushRequest{
		TenantID:               req.TenantID,
		RawProfileSize:         req.RawProfileSize,
		RawProfileType:         req.RawProfileType,
		Series:                 series,
		TotalProfiles:          req.TotalProfiles,
		TotalBytesUncompressed: req.TotalBytesUncompressed,
	}
}
84 changes: 45 additions & 39 deletions pkg/distributor/write_path/router.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package writepath

import (
"bytes"
"context"
"fmt"
"math/rand"
Expand All @@ -13,6 +12,7 @@ import (
"github.com/go-kit/log"
"github.com/google/uuid"
"github.com/grafana/dskit/services"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/sync/errgroup"
Expand All @@ -21,7 +21,8 @@ import (
segmentwriterv1 "github.com/grafana/pyroscope/api/gen/proto/go/segmentwriter/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
distributormodel "github.com/grafana/pyroscope/pkg/distributor/model"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/tenant"
"github.com/grafana/pyroscope/pkg/util"
"github.com/grafana/pyroscope/pkg/util/connectgrpc"
httputil "github.com/grafana/pyroscope/pkg/util/http"
Expand Down Expand Up @@ -124,11 +125,7 @@ func (m *Router) segwriterRoute(primary bool) *route {
path: SegmentWriterPath,
primary: primary,
send: func(ctx context.Context, req *distributormodel.PushRequest) error {
// Prepare the requests: we're trying to avoid allocating extra
// memory for serialized profiles by reusing the source request
// capacities, iff the request won't be sent to ingester.
requests := convertRequest(req, !primary)
return m.sendRequestsToSegmentWriter(ctx, requests)
return m.sendRequestsToSegmentWriter(ctx, convertRequest(req))
},
}
}
Expand All @@ -153,14 +150,14 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque
}
if shouldSegwriter {
segwriter = m.segwriterRoute(!shouldIngester)
if segwriter.primary {
// If the request is sent to segment-writer exclusively:
// the response returns to the client when the new write path
// returns.
if segwriter.primary && !config.AsyncIngest {
// The request is sent to segment-writer exclusively, and the client
// must block until the response returns.
// Failure of the new write path is returned to the client.
// Failure of the old write path is NOT returned to the client.
return m.send(segwriter)(ctx, req)
}
// Request to the segment writer will be sent asynchronously.
}

// No write routes. This is possible if the write path is configured
Expand All @@ -169,20 +166,38 @@ func (m *Router) sendToBoth(ctx context.Context, req *distributormodel.PushReque
return nil
}

// If we ended up here, ingester is the primary route,
// and segment-writer is the secondary route.
c := m.sendAsync(ctx, req, ingester)
// We do not wait for the secondary request to complete.
// On shutdown, however, we will wait for all inflight
// requests to complete.
m.sendAsync(ctx, req, segwriter)
if segwriter != nil && ingester != nil {
// The request is to be sent to both asynchronously, therefore we're cloning it.
reqClone := req.Clone()
segwriterSend := segwriter.send
segwriter.send = func(context.Context, *distributormodel.PushRequest) error {
// We do not wait for the secondary request to complete.
// On shutdown, however, we will wait for all inflight
// requests to complete.
localCtx, cancel := context.WithTimeout(context.Background(), config.SegmentWriterTimeout)
localCtx = tenant.InjectTenantID(localCtx, req.TenantID)
if sp := opentracing.SpanFromContext(ctx); sp != nil {
localCtx = opentracing.ContextWithSpan(localCtx, sp)
}
defer cancel()
return segwriterSend(localCtx, reqClone)
}
}

select {
case err := <-c:
return err
case <-ctx.Done():
return ctx.Err()
if segwriter != nil {
m.sendAsync(ctx, req, segwriter)
}

if ingester != nil {
select {
case err := <-m.sendAsync(ctx, req, ingester):
return err
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

type sendFunc func(context.Context, *distributormodel.PushRequest) error
Expand Down Expand Up @@ -257,11 +272,11 @@ func (m *Router) sendRequestsToSegmentWriter(ctx context.Context, requests []*se
return g.Wait()
}

func convertRequest(req *distributormodel.PushRequest, copy bool) []*segmentwriterv1.PushRequest {
func convertRequest(req *distributormodel.PushRequest) []*segmentwriterv1.PushRequest {
r := make([]*segmentwriterv1.PushRequest, 0, len(req.Series)*2)
for _, s := range req.Series {
for _, p := range s.Samples {
r = append(r, convertProfile(p, s.Labels, req.TenantID, copy))
r = append(r, convertProfile(p, s.Labels, req.TenantID))
}
}
return r
Expand All @@ -271,25 +286,16 @@ func convertProfile(
sample *distributormodel.ProfileSample,
labels []*typesv1.LabelPair,
tenantID string,
copy bool,
) *segmentwriterv1.PushRequest {
var b *bytes.Buffer
if copy {
b = bytes.NewBuffer(make([]byte, 0, cap(sample.RawProfile)))
} else {
b = bytes.NewBuffer(sample.RawProfile[:0])
}
if _, err := sample.Profile.WriteTo(b); err != nil {
buf, err := pprof.Marshal(sample.Profile.Profile, true)
if err != nil {
panic(fmt.Sprintf("failed to marshal profile: %v", err))
}
profileID := uuid.New()
return &segmentwriterv1.PushRequest{
TenantId: tenantID,
// Note that labels are always copied because
// the API allows multiple profiles to refer to
// the same label set.
Labels: phlaremodel.Labels(labels).Clone(),
Profile: b.Bytes(),
TenantId: tenantID,
Labels: labels,
Profile: buf,
ProfileId: profileID[:],
}
}
11 changes: 8 additions & 3 deletions pkg/distributor/write_path/router_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package writepath

import (
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -13,9 +14,13 @@ type metrics struct {
func newMetrics(reg prometheus.Registerer) *metrics {
m := &metrics{
durationHistogram: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "pyroscope_write_path_downstream_request_duration_seconds",
Buckets: prometheus.ExponentialBucketsRange(0.001, 10, 30),
Help: "Duration of downstream requests made by the write path router.",
Name: "pyroscope_write_path_downstream_request_duration_seconds",
Help: "Duration of downstream requests made by the write path router.",

Buckets: prometheus.ExponentialBucketsRange(0.001, 10, 30),
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 32,
NativeHistogramMinResetDuration: time.Hour,
}, []string{"route", "primary", "status"}),
}
if reg != nil {
Expand Down
11 changes: 8 additions & 3 deletions pkg/distributor/write_path/write_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"flag"
"fmt"
"time"
)

// WritePath controls the write path.
Expand Down Expand Up @@ -53,9 +54,11 @@ func (m *WritePath) Set(text string) error {
func (m *WritePath) String() string { return string(*m) }

type Config struct {
WritePath WritePath `yaml:"write_path" json:"write_path" doc:"hidden"`
IngesterWeight float64 `yaml:"write_path_ingester_weight" json:"write_path_ingester_weight" doc:"hidden"`
SegmentWriterWeight float64 `yaml:"write_path_segment_writer_weight" json:"write_path_segment_writer_weight" doc:"hidden"`
WritePath WritePath `yaml:"write_path" json:"write_path" doc:"hidden"`
IngesterWeight float64 `yaml:"write_path_ingester_weight" json:"write_path_ingester_weight" doc:"hidden"`
SegmentWriterWeight float64 `yaml:"write_path_segment_writer_weight" json:"write_path_segment_writer_weight" doc:"hidden"`
SegmentWriterTimeout time.Duration `yaml:"write_path_segment_writer_timeout" json:"write_path_segment_writer_timeout" doc:"hidden"`
AsyncIngest bool `yaml:"async_ingest" json:"async_ingest" doc:"hidden"`
}

func (o *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -65,4 +68,6 @@ func (o *Config) RegisterFlags(f *flag.FlagSet) {
"Specifies the fraction [0:1] that should be send to ingester in combined mode. 0 means no traffics is sent to ingester. 1 means 100% of requests are sent to ingester.")
f.Float64Var(&o.SegmentWriterWeight, "write-path.segment-writer-weight", 0,
"Specifies the fraction [0:1] that should be send to segment-writer in combined mode. 0 means no traffics is sent to segment-writer. 1 means 100% of requests are sent to segment-writer.")
f.DurationVar(&o.SegmentWriterTimeout, "write-path.segment-writer-timeout", 5*time.Second, "Timeout for segment writer requests.")
f.BoolVar(&o.AsyncIngest, "async-ingest", false, "If true, the write path will not wait for the segment-writer to finish processing the request. Writes to ingester always synchronous.")
}
25 changes: 19 additions & 6 deletions pkg/distributor/write_path/write_path_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (s *routerTestSuite) Test_SegmentWriterPath() {
func (s *routerTestSuite) Test_CombinedPath() {
const (
N = 100
w = 10 // Concurrent workers.
f = 0.5
d = 0.3 // Allowed delta: note that f is just a probability.
)
Expand All @@ -134,22 +135,34 @@ func (s *routerTestSuite) Test_CombinedPath() {

var sentIngester atomic.Uint32
s.ingester.On("Push", mock.Anything, mock.Anything).
Run(func(mock.Arguments) { sentIngester.Add(1) }).
Run(func(m mock.Arguments) {
sentIngester.Add(1)
// Assert that no race condition occurs: we clear the series of the
// request here, while the segment writer route may be concurrently
// reading the request in order to convert it from a distributor
// request to a segment writer request.
m.Get(1).(*distributormodel.PushRequest).Series = nil
}).
Return(new(connect.Response[pushv1.PushResponse]), nil)

var sentSegwriter atomic.Uint32
s.segwriter.On("Push", mock.Anything, mock.Anything).
Run(func(mock.Arguments) { sentSegwriter.Add(1) }).
Run(func(m mock.Arguments) {
sentSegwriter.Add(1)
m.Get(1).(*segmentwriterv1.PushRequest).Profile = nil
}).
Return(new(segmentwriterv1.PushResponse), nil)

for i := 0; i < N; i++ {
s.Assert().NoError(s.router.Send(context.Background(), s.request))
for i := 0; i < w; i++ {
for j := 0; j < N; j++ {
s.Assert().NoError(s.router.Send(context.Background(), s.request.Clone()))
}
}

s.router.inflight.Wait()
expected := N * f
expected := N * f * w
delta := expected * d
s.Assert().Equal(N, int(sentIngester.Load()))
s.Assert().Equal(N*w, int(sentIngester.Load()))
s.Assert().Greater(int(sentSegwriter.Load()), int(expected-delta))
s.Assert().Less(int(sentSegwriter.Load()), int(expected+delta))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/read_path/read_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ func (o *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&o.EnableQueryBackend, "enable-query-backend", false,
"This parameter specifies whether the new query backend is enabled.")
f.Var((*flagext.Time)(&o.EnableQueryBackendFrom), "enable-query-backend-from",
"This parameter specifies the point in time from which data is queried from the new query backend.")
"This parameter specifies the point in time from which data is queried from the new query backend. The format if RFC3339 (2020-10-20T00:00:00Z)")
}

0 comments on commit 6730d86

Please sign in to comment.