Skip to content

Commit

Permalink
fix(otel): use PushParsed instead of Push
Browse files Browse the repository at this point in the history
  • Loading branch information
korniltsev committed Dec 19, 2024
1 parent f718fd3 commit 02dab4e
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 59 deletions.
1 change: 1 addition & 0 deletions pkg/distributor/model/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type RawProfileType string

const RawProfileTypePPROF = RawProfileType("pprof")
const RawProfileTypeJFR = RawProfileType("jfr")
const RawProfileTypeOTEL = RawProfileType("otel")

type PushRequest struct {
TenantID string
Expand Down
38 changes: 20 additions & 18 deletions pkg/ingester/otlp/ingest_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package otlp
import (
"context"
"fmt"
"github.com/grafana/pyroscope/pkg/model"
model2 "github.com/prometheus/common/model"
"net/http"
"strings"

distirbutormodel "github.com/grafana/pyroscope/pkg/distributor/model"
pyromodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/pprof"

"connectrpc.com/connect"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -36,7 +38,7 @@ type Handler interface {
}

type PushService interface {
Push(ctx context.Context, req *connect.Request[pushv1.PushRequest]) (*connect.Response[pushv1.PushResponse], error)
PushParsed(ctx context.Context, req *distirbutormodel.PushRequest) (*connect.Response[pushv1.PushResponse], error)
}

func NewOTLPIngestHandler(svc PushService, l log.Logger, me bool) Handler {
Expand Down Expand Up @@ -112,24 +114,24 @@ func (h *ingestHandler) Export(ctx context.Context, er *pprofileotlp.ExportProfi
Value: svc,
})

pprofBytes, err := pprofProfile.MarshalVT()
if err != nil {
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to marshal pprof profile: %w", err)
}

req := &pushv1.PushRequest{
Series: []*pushv1.RawProfileSeries{
req := &distirbutormodel.PushRequest{
RawProfileSize: p.Profile.Size(),
RawProfileType: distirbutormodel.RawProfileTypeOTEL,
Series: []*distirbutormodel.ProfileSeries{
{
Labels: labels,
Samples: []*pushv1.RawSample{{
RawProfile: pprofBytes,
ID: uuid.New().String(),
}},
Samples: []*distirbutormodel.ProfileSample{
{
RawProfile: nil,
Profile: pprof.RawFromProto(pprofProfile),
ID: uuid.New().String(),
},
},
},
},
}

_, err = h.svc.Push(ctx, connect.NewRequest(req))
_, err := h.svc.PushParsed(ctx, req)
if err != nil {
h.log.Log("msg", "failed to push profile", "err", err)
return &pprofileotlp.ExportProfilesServiceResponse{}, fmt.Errorf("failed to make a GRPC request: %w", err)
Expand Down Expand Up @@ -161,15 +163,15 @@ func getServiceNameFromAttributes(attrs []v1.KeyValue) string {
func getDefaultLabels() []*typesv1.LabelPair {
return []*typesv1.LabelPair{
{
Name: model2.MetricNameLabel,
Name: pyromodel.LabelNameProfileName,
Value: "process_cpu",
},
{
Name: model.LabelNameDelta,
Name: pyromodel.LabelNameDelta,
Value: "false",
},
{
Name: model.LabelNameOTEL,
Name: pyromodel.LabelNameOTEL,
Value: "true",
},
{
Expand Down
42 changes: 17 additions & 25 deletions pkg/ingester/otlp/ingest_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ import (
"strings"
"testing"

"connectrpc.com/connect"
"github.com/grafana/pyroscope/pkg/distributor/model"

"github.com/prometheus/prometheus/util/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

googlev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
pushv1 "github.com/grafana/pyroscope/api/gen/proto/go/push/v1"
v1experimental2 "github.com/grafana/pyroscope/api/otlp/collector/profiles/v1experimental"
"github.com/grafana/pyroscope/api/otlp/profiles/v1experimental"
"github.com/grafana/pyroscope/pkg/og/convert/pprof/bench"
Expand Down Expand Up @@ -165,10 +164,10 @@ func TestSymbolizedFunctionNames(t *testing.T) {
// Create two unsymbolized locations at 0x1e0 and 0x2f0
// Expect both of them to be present in the converted pprof
svc := mockotlp.NewMockPushService(t)
var profiles [][]byte
svc.On("Push", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
c := (args.Get(1)).(*connect.Request[pushv1.PushRequest])
profiles = append(profiles, c.Msg.Series[0].Samples[0].RawProfile)
var profiles []*model.PushRequest
svc.On("PushParsed", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
c := (args.Get(1)).(*model.PushRequest)
profiles = append(profiles, c)
}).Return(nil, nil)

otlpb := new(otlpbuilder)
Expand Down Expand Up @@ -204,9 +203,7 @@ func TestSymbolizedFunctionNames(t *testing.T) {
assert.NoError(t, err)
require.Equal(t, 1, len(profiles))

gp := new(googlev1.Profile)
err = gp.UnmarshalVT(profiles[0])
require.NoError(t, err)
gp := profiles[0].Series[0].Samples[0].Profile.Profile

ss := bench.StackCollapseProtoWithOptions(gp, bench.StackCollapseOptions{
ValueIdx: 0,
Expand All @@ -222,10 +219,10 @@ func TestSampleAttributes(t *testing.T) {
// one process=firefox, the other process=chrome
// expect both of them to be present in the converted pprof as labels, but not series labels
svc := mockotlp.NewMockPushService(t)
var profiles []*pushv1.PushRequest
svc.On("Push", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
c := (args.Get(1)).(*connect.Request[pushv1.PushRequest])
profiles = append(profiles, c.Msg)
var profiles []*model.PushRequest
svc.On("PushParsed", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
c := (args.Get(1)).(*model.PushRequest)
profiles = append(profiles, c)
}).Return(nil, nil)

otlpb := new(otlpbuilder)
Expand Down Expand Up @@ -300,9 +297,7 @@ func TestSampleAttributes(t *testing.T) {
assert.Equal(t, "", seriesLabelsMap["process"])
assert.NotContains(t, seriesLabelsMap, "service.name")

gp := new(googlev1.Profile)
err = gp.UnmarshalVT(profiles[0].Series[0].Samples[0].RawProfile)
require.NoError(t, err)
gp := profiles[0].Series[0].Samples[0].Profile.Profile

ss := bench.StackCollapseProtoWithOptions(gp, bench.StackCollapseOptions{
ValueIdx: 0,
Expand All @@ -319,10 +314,10 @@ func TestDifferentServiceNames(t *testing.T) {
// Create a profile with two samples having different service.name attributes
// Expect them to be pushed as separate profiles
svc := mockotlp.NewMockPushService(t)
var profiles []*pushv1.PushRequest
svc.On("Push", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
c := (args.Get(1)).(*connect.Request[pushv1.PushRequest])
profiles = append(profiles, c.Msg)
var profiles []*model.PushRequest
svc.On("PushParsed", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
c := (args.Get(1)).(*model.PushRequest)
profiles = append(profiles, c)
}).Return(nil, nil)

otlpb := new(otlpbuilder)
Expand Down Expand Up @@ -462,10 +457,7 @@ func TestDifferentServiceNames(t *testing.T) {
require.Contains(t, []string{"service-a", "service-b"}, serviceName)
assert.NotContains(t, seriesLabelsMap, "service.name")

// Verify the profile contents
gp := new(googlev1.Profile)
err = gp.UnmarshalVT(p.Series[0].Samples[0].RawProfile)
require.NoError(t, err)
gp := p.Series[0].Samples[0].Profile.Profile

// Verify sample types
require.Equal(t, 1, len(gp.SampleType))
Expand Down
34 changes: 18 additions & 16 deletions pkg/test/mocks/mockotlp/mock_push_service.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 02dab4e

Please sign in to comment.