
feat: fetch recording rules from tenant-settings #3874


Closed · wants to merge 6 commits
56 changes: 50 additions & 6 deletions pkg/experiment/block/compaction.go
@@ -51,11 +51,39 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
}
}

func WithSampleObserver(observer SampleObserver) CompactionOption {
return func(p *compactionConfig) {
p.sampleObserver = observer
}
}

type compactionConfig struct {
	objectOptions  []ObjectOption
	tempdir        string
	source         objstore.BucketReader
	destination    objstore.Bucket
	sampleObserver SampleObserver
}

type SampleObserver interface {
// Observe is called before the compactor appends the entry
// to the output block. This method must not modify the entry.
Observe(ProfileEntry)

// Flush is called before the compactor flushes the output dataset.
// This call invalidates all references (such as symbols) to the source
// and output blocks. Any error returned by the call terminates the
// compaction job: it is the caller's responsibility to suppress errors.
Flush() error
}

type NoOpObserver struct{}

func (o *NoOpObserver) Observe(row ProfileEntry) {
}

func (o *NoOpObserver) Flush() error {
return nil
}

func Compact(
@@ -88,7 +116,7 @@ func Compact(

compacted := make([]*metastorev1.BlockMeta, 0, len(plan))
for _, p := range plan {
md, compactionErr := p.Compact(ctx, c.destination, c.tempdir, c.sampleObserver)
if compactionErr != nil {
return nil, compactionErr
}
@@ -179,18 +207,27 @@ func newBlockCompaction(
return p
}

func (b *CompactionPlan) Compact(
ctx context.Context,
dst objstore.Bucket,
tmpdir string,
observer SampleObserver,
) (m *metastorev1.BlockMeta, err error) {
w := NewBlockWriter(dst, b.path, tmpdir)
defer func() {
err = multierror.New(err, w.Close()).Err()
}()
// Datasets are compacted in a strict order.
for _, s := range b.datasets {
s.registerSampleObserver(observer)
if err = s.compact(ctx, w); err != nil {
return nil, fmt.Errorf("compacting block: %w", err)
}
b.meta.Datasets = append(b.meta.Datasets, s.meta)
}
if err = observer.Flush(); err != nil {
return nil, fmt.Errorf("flushing sample observer: %w", err)
}
if err = w.Flush(ctx); err != nil {
return nil, fmt.Errorf("flushing block writer: %w", err)
}
@@ -236,6 +273,8 @@ type datasetCompaction struct
profiles uint64

flushOnce sync.Once

observer SampleObserver
}

func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
@@ -284,6 +323,10 @@ func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error)
return nil
}

func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) {
m.observer = observer
}

func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
m.path = path
defer func() {
@@ -366,6 +409,7 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) {
if err = m.symbolsRewriter.rewriteRow(r); err != nil {
return err
}
m.observer.Observe(r)
return m.profilesWriter.writeRow(r)
}

1 change: 1 addition & 0 deletions pkg/experiment/block/compaction_test.go
@@ -32,6 +32,7 @@ func Test_CompactBlocks(t *testing.T) {
WithCompactionObjectOptions(
WithObjectDownload(filepath.Join(tempdir, "source")),
WithObjectMaxSizeLoadInMemory(0)), // Force download.
WithSampleObserver(&NoOpObserver{}),
)

require.NoError(t, err)
12 changes: 11 additions & 1 deletion pkg/experiment/compactor/compaction_worker.go
@@ -25,6 +25,7 @@ import (
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
metricsexport "github.com/grafana/pyroscope/pkg/experiment/metrics"
"github.com/grafana/pyroscope/pkg/objstore"
"github.com/grafana/pyroscope/pkg/util"
)
@@ -399,7 +400,9 @@ func (w *Worker) runCompaction(job *compactionJob) {
block.WithCompactionObjectOptions(
block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
block.WithObjectDownload(sourcedir),
),
block.WithSampleObserver(newSampleObserver(job)),
)
defer func() {
if err = os.RemoveAll(tempdir); err != nil {
level.Warn(logger).Log("msg", "failed to remove compaction directory", "path", tempdir, "err", err)
@@ -458,6 +461,13 @@
_ = deleteGroup.Wait()
}

func newSampleObserver(job *compactionJob) block.SampleObserver {
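// Only first-level compaction jobs feed the metrics pipeline; rows in
// higher-level jobs were already observed when their source blocks were
// first compacted (rationale assumed, not stated in the diff).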
if job.CompactionLevel == 0 {
return metricsexport.NewMetricsObserver(job.Tenant, job.blocks[0])
}
return &block.NoOpObserver{}
}

func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error {
ctx, cancel := context.WithTimeout(job.ctx, w.config.RequestTimeout)
defer cancel()
108 changes: 108 additions & 0 deletions pkg/experiment/metrics/exporter.go
@@ -0,0 +1,108 @@
package metrics

import (
"context"
"net/url"
"time"

"github.com/gogo/protobuf/proto"
"github.com/klauspost/compress/snappy"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
)

type Exporter struct {
config Config
client remote.WriteClient
data map[AggregatedFingerprint]*TimeSeries
}

type Config struct {
url string
username string
password config.Secret
}

func NewExporter(tenant string) *Exporter {
cfg := configFromTenant(tenant)
return &Exporter{
config: cfg,
data: map[AggregatedFingerprint]*TimeSeries{},
}
}

func (e *Exporter) AppendMetrics(recordings []*Recording) {
for _, r := range recordings {
for fp, ts := range r.data {
e.data[fp] = ts
}
}
}

func (e *Exporter) Send() error {
if e.client == nil {
e.client = newClient(e.config)
}

p := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(e.data))}
for _, ts := range e.data {
pts := prompb.TimeSeries{
Labels: make([]prompb.Label, 0, len(ts.Labels)),
}
for _, l := range ts.Labels {
pts.Labels = append(pts.Labels, prompb.Label{
Name: l.Name,
Value: l.Value,
})
}
for _, s := range ts.Samples {
pts.Samples = append(pts.Samples, prompb.Sample{
Value: s.Value,
Timestamp: s.Timestamp,
})
}
p.Timeseries = append(p.Timeseries, pts)
}
buf := proto.NewBuffer(nil)
if err := buf.Marshal(p); err != nil {
return err
}
return e.client.Store(context.Background(), snappy.Encode(nil, buf.Bytes()), 0)
}

func newClient(cfg Config) remote.WriteClient {
wURL, err := url.Parse(cfg.url)
if err != nil {
panic(err)
}

c, err := remote.NewWriteClient("exporter", &remote.ClientConfig{
URL: &config.URL{URL: wURL},
Timeout: model.Duration(time.Second * 10),
HTTPClientConfig: config.HTTPClientConfig{
BasicAuth: &config.BasicAuth{
Username: cfg.username,
Password: cfg.password,
},
},
SigV4Config: nil,
AzureADConfig: nil,
Headers: nil,
RetryOnRateLimit: false,
})
if err != nil {
panic(err)
}
return c
}

func configFromTenant(tenant string) Config {
// TODO
return Config{
url: "omitted",
username: "omitted",
password: "omitted",
}
}
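
A minimal lifecycle sketch of the exporter, assuming the API above; flushRecordings is a hypothetical helper, and the Recorder with its Recordings field comes from observer.go below:

// flushRecordings shows the intended flow: collect the recordings
// produced during compaction, then push them in one remote-write request.
func flushRecordings(tenant string, recorder *Recorder) error {
	exporter := NewExporter(tenant)
	exporter.AppendMetrics(recorder.Recordings)
	// Send marshals the series into a snappy-compressed remote-write
	// request; its error surfaces through MetricsObserver.Flush and
	// aborts the compaction job.
	return exporter.Send()
}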
43 changes: 43 additions & 0 deletions pkg/experiment/metrics/observer.go
@@ -0,0 +1,43 @@
package metrics

import (
"fmt"

"github.com/cespare/xxhash/v2"
"github.com/oklog/ulid"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/block"
)

type MetricsObserver struct {
tenant string
recorder *Recorder
}

func NewMetricsObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsObserver {
recordingTime := int64(ulid.MustParse(meta.Id).Time())
rules := recordingRulesFromTenant(tenant)
pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
return &MetricsObserver{
tenant: tenant,
recorder: NewRecorder(rules, recordingTime, pyroscopeInstance),
}
}

func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
buf := make([]byte, 0, 8)
buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

func (o *MetricsObserver) Observe(row block.ProfileEntry) {
o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue())
}

func (o *MetricsObserver) Flush() error {
exporter := NewExporter(o.tenant)
exporter.AppendMetrics(o.recorder.Recordings)
return exporter.Send()
}
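
As a side note, pyroscopeInstanceHash packs shard and createdBy big-endian into eight bytes before hashing; an equivalent sketch using encoding/binary (illustrative only, assuming encoding/binary is imported alongside the existing imports):

// pyroscopeInstanceHashAlt is a hypothetical equivalent of the function
// above: shard (4 bytes) followed by createdBy (4 bytes), both big-endian,
// hashed with xxhash and rendered as hex.
func pyroscopeInstanceHashAlt(shard uint32, createdBy int32) string {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint32(buf[0:4], shard)
	binary.BigEndian.PutUint32(buf[4:8], uint32(createdBy))
	return fmt.Sprintf("%x", xxhash.Sum64(buf))
}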