diff --git a/pkg/distributor/model/push.go b/pkg/distributor/model/push.go index efff9d30a2..65141a836c 100644 --- a/pkg/distributor/model/push.go +++ b/pkg/distributor/model/push.go @@ -10,6 +10,7 @@ type RawProfileType string const RawProfileTypePPROF = RawProfileType("pprof") const RawProfileTypeJFR = RawProfileType("jfr") +const RawProfileTypeOTEL = RawProfileType("otel") type PushRequest struct { TenantID string diff --git a/pkg/ingester/otlp/ingest_handler.go b/pkg/ingester/otlp/ingest_handler.go index faff1d127e..085c3d5b32 100644 --- a/pkg/ingester/otlp/ingest_handler.go +++ b/pkg/ingester/otlp/ingest_handler.go @@ -3,11 +3,13 @@ package otlp import ( "context" "fmt" - "github.com/grafana/pyroscope/pkg/model" - model2 "github.com/prometheus/common/model" "net/http" "strings" + distirbutormodel "github.com/grafana/pyroscope/pkg/distributor/model" + pyromodel "github.com/grafana/pyroscope/pkg/model" + "github.com/grafana/pyroscope/pkg/pprof" + "connectrpc.com/connect" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -36,7 +38,7 @@ type Handler interface { } type PushService interface { - Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) + PushParsed(ctx context.Context, req *distirbutormodel.PushRequest) (*connect.Response[pushv1.PushResponse], error) } func NewOTLPIngestHandler(svc PushService, l log.Logger, me bool) Handler { @@ -112,24 +114,24 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi Value: svc, }) - pprofBytes, err := pprofProfile.MarshalVT() - if err != nil { - return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to marshal pprof profile: %w", err) - } - - req := &pushv1.PushRequest{ - Series: []*pushv1.RawProfileSeries{ + req := &distirbutormodel.PushRequest{ + RawProfileSize: p.Profile.Size(), + RawProfileType: distirbutormodel.RawProfileTypeOTEL, + Series: []*distirbutormodel.ProfileSeries{ { Labels: labels, - Samples: []*pushv1.RawSample{{ - RawProfile: pprofBytes, - ID: uuid.New().String(), - }}, + Samples: []*distirbutormodel.ProfileSample{ + { + RawProfile: nil, + Profile: pprof.RawFromProto(pprofProfile), + ID: uuid.New().String(), + }, + }, }, }, } - _, err = h.svc.Push(ctx, connect.NewRequest(req)) + _, err := h.svc.PushParsed(ctx, req) if err != nil { h.log.Log("msg", "failed to push profile", "err", err) return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to make a GRPC request: %w", err) @@ -161,15 +163,15 @@ func getServiceNameFromAttributes(attrs []v1.KeyValue) string { func getDefaultLabels() []*typesv1.LabelPair { return []*typesv1.LabelPair{ { - Name: model2.MetricNameLabel, + Name: pyromodel.LabelNameProfileName, Value: "process_cpu", }, { - Name: model.LabelNameDelta, + Name: pyromodel.LabelNameDelta, Value: "false", }, { - Name: model.LabelNameOTEL, + Name: pyromodel.LabelNameOTEL, Value: "true", }, { diff --git a/pkg/ingester/otlp/ingest_handler_test.go b/pkg/ingester/otlp/ingest_handler_test.go index 7f76399758..09d6325080 100644 --- a/pkg/ingester/otlp/ingest_handler_test.go +++ b/pkg/ingester/otlp/ingest_handler_test.go @@ -6,13 +6,12 @@ import ( "strings" "testing" - "connectrpc.com/connect" + "github.com/grafana/pyroscope/pkg/distributor/model" + "github.com/prometheus/prometheus/util/testutil" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1" - pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" v1experimental2 "github.com/grafana/pyroscope/api/otlp/collector/profiles/v1experimental" "github.com/grafana/pyroscope/api/otlp/profiles/v1experimental" "github.com/grafana/pyroscope/pkg/og/convert/pprof/bench" @@ -165,10 +164,10 @@ func TestSymbolizedFunctionNames(t *testing.T) { // Create two unsymbolized locations at 0x1e0 and 0x2f0 // Expect both of them to be present in the converted pprof svc := mockotlp.NewMockPushService(t) - var profiles [][]byte - svc.On("Push", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - c := (args.Get(1)).(*connect.Request[pushv1.PushRequest]) - profiles = append(profiles, c.Msg.Series[0].Samples[0].RawProfile) + var profiles []*model.PushRequest + svc.On("PushParsed", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + c := (args.Get(1)).(*model.PushRequest) + profiles = append(profiles, c) }).Return(nil, nil) otlpb := new(otlpbuilder) @@ -204,9 +203,7 @@ func TestSymbolizedFunctionNames(t *testing.T) { assert.NoError(t, err) require.Equal(t, 1, len(profiles)) - gp := new(googlev1.Profile) - err = gp.UnmarshalVT(profiles[0]) - require.NoError(t, err) + gp := profiles[0].Series[0].Samples[0].Profile.Profile ss := bench.StackCollapseProtoWithOptions(gp, bench.StackCollapseOptions{ ValueIdx: 0, @@ -222,10 +219,10 @@ func TestSampleAttributes(t *testing.T) { // one process=firefox, the other process=chrome // expect both of them to be present in the converted pprof as labels, but not series labels svc := mockotlp.NewMockPushService(t) - var profiles []*pushv1.PushRequest - svc.On("Push", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - c := (args.Get(1)).(*connect.Request[pushv1.PushRequest]) - profiles = append(profiles, c.Msg) + var profiles []*model.PushRequest + svc.On("PushParsed", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + c := (args.Get(1)).(*model.PushRequest) + profiles = append(profiles, c) }).Return(nil, nil) otlpb := new(otlpbuilder) @@ -300,9 +297,7 @@ func TestSampleAttributes(t *testing.T) { assert.Equal(t, "", seriesLabelsMap["process"]) assert.NotContains(t, seriesLabelsMap, "service.name") - gp := new(googlev1.Profile) - err = gp.UnmarshalVT(profiles[0].Series[0].Samples[0].RawProfile) - require.NoError(t, err) + gp := profiles[0].Series[0].Samples[0].Profile.Profile ss := bench.StackCollapseProtoWithOptions(gp, bench.StackCollapseOptions{ ValueIdx: 0, @@ -319,10 +314,10 @@ func TestDifferentServiceNames(t *testing.T) { // Create a profile with two samples having different service.name attributes // Expect them to be pushed as separate profiles svc := mockotlp.NewMockPushService(t) - var profiles []*pushv1.PushRequest - svc.On("Push", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - c := (args.Get(1)).(*connect.Request[pushv1.PushRequest]) - profiles = append(profiles, c.Msg) + var profiles []*model.PushRequest + svc.On("PushParsed", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + c := (args.Get(1)).(*model.PushRequest) + profiles = append(profiles, c) }).Return(nil, nil) otlpb := new(otlpbuilder) @@ -462,10 +457,7 @@ func TestDifferentServiceNames(t *testing.T) { require.Contains(t, []string{"service-a", "service-b"}, serviceName) assert.NotContains(t, seriesLabelsMap, "service.name") - // Verify the profile contents - gp := new(googlev1.Profile) - err = gp.UnmarshalVT(p.Series[0].Samples[0].RawProfile) - require.NoError(t, err) + gp := p.Series[0].Samples[0].Profile.Profile // Verify sample types require.Equal(t, 1, len(gp.SampleType)) diff --git a/pkg/test/mocks/mockotlp/mock_push_service.go b/pkg/test/mocks/mockotlp/mock_push_service.go index 49c9a770ba..1201953486 100644 --- a/pkg/test/mocks/mockotlp/mock_push_service.go +++ b/pkg/test/mocks/mockotlp/mock_push_service.go @@ -9,6 +9,8 @@ import ( mock "github.com/stretchr/testify/mock" + model "github.com/grafana/pyroscope/pkg/distributor/model" + pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1" ) @@ -25,20 +27,20 @@ func (_m *MockPushService) EXPECT() *MockPushService_Expecter { return &MockPushService_Expecter{mock: &_m.Mock} } -// Push provides a mock function with given fields: ctx, req -func (_m *MockPushService) Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error) { +// PushParsed provides a mock function with given fields: ctx, req +func (_m *MockPushService) PushParsed(ctx context.Context, req *model.PushRequest) (*connect.Response[pushv1.PushResponse], error) { ret := _m.Called(ctx, req) if len(ret) == 0 { - panic("no return value specified for Push") + panic("no return value specified for PushParsed") } var r0 *connect.Response[pushv1.PushResponse] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error)); ok { + if rf, ok := ret.Get(0).(func(context.Context, *model.PushRequest) (*connect.Response[pushv1.PushResponse], error)); ok { return rf(ctx, req) } - if rf, ok := ret.Get(0).(func(context.Context, *connect.Request[pushv1.PushRequest]) *connect.Response[pushv1.PushResponse]); ok { + if rf, ok := ret.Get(0).(func(context.Context, *model.PushRequest) *connect.Response[pushv1.PushResponse]); ok { r0 = rf(ctx, req) } else { if ret.Get(0) != nil { @@ -46,7 +48,7 @@ func (_m *MockPushService) Push(ctx context.Context, req *connect.Request[pushv1 } } - if rf, ok := ret.Get(1).(func(context.Context, *connect.Request[pushv1.PushRequest]) error); ok { + if rf, ok := ret.Get(1).(func(context.Context, *model.PushRequest) error); ok { r1 = rf(ctx, req) } else { r1 = ret.Error(1) @@ -55,31 +57,31 @@ func (_m *MockPushService) Push(ctx context.Context, req *connect.Request[pushv1 return r0, r1 } -// MockPushService_Push_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Push' -type MockPushService_Push_Call struct { +// MockPushService_PushParsed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PushParsed' +type MockPushService_PushParsed_Call struct { *mock.Call } -// Push is a helper method to define mock.On call +// PushParsed is a helper method to define mock.On call // - ctx context.Context -// - req *connect.Request[pushv1.PushRequest] -func (_e *MockPushService_Expecter) Push(ctx interface{}, req interface{}) *MockPushService_Push_Call { - return &MockPushService_Push_Call{Call: _e.mock.On("Push", ctx, req)} +// - req *model.PushRequest +func (_e *MockPushService_Expecter) PushParsed(ctx interface{}, req interface{}) *MockPushService_PushParsed_Call { + return &MockPushService_PushParsed_Call{Call: _e.mock.On("PushParsed", ctx, req)} } -func (_c *MockPushService_Push_Call) Run(run func(ctx context.Context, req *connect.Request[pushv1.PushRequest])) *MockPushService_Push_Call { +func (_c *MockPushService_PushParsed_Call) Run(run func(ctx context.Context, req *model.PushRequest)) *MockPushService_PushParsed_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*connect.Request[pushv1.PushRequest])) + run(args[0].(context.Context), args[1].(*model.PushRequest)) }) return _c } -func (_c *MockPushService_Push_Call) Return(_a0 *connect.Response[pushv1.PushResponse], _a1 error) *MockPushService_Push_Call { +func (_c *MockPushService_PushParsed_Call) Return(_a0 *connect.Response[pushv1.PushResponse], _a1 error) *MockPushService_PushParsed_Call { _c.Call.Return(_a0, _a1) return _c } -func (_c *MockPushService_Push_Call) RunAndReturn(run func(context.Context, *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error)) *MockPushService_Push_Call { +func (_c *MockPushService_PushParsed_Call) RunAndReturn(run func(context.Context, *model.PushRequest) (*connect.Response[pushv1.PushResponse], error)) *MockPushService_PushParsed_Call { _c.Call.Return(run) return _c }