Commit c90f289

feat: record metrics from rules and export to remote
1 parent 9d3e08d

7 files changed: +364 −10 lines changed

pkg/experiment/block/compaction.go

Lines changed: 57 additions & 8 deletions
@@ -10,14 +10,18 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/cespare/xxhash/v2"
+	"github.com/google/uuid"
 	"github.com/grafana/dskit/multierror"
+	"github.com/oklog/ulid"
 	"github.com/parquet-go/parquet-go"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/storage"
 	"golang.org/x/sync/errgroup"
 
 	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
 	"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
+	"github.com/grafana/pyroscope/pkg/experiment/metrics"
 	phlaremodel "github.com/grafana/pyroscope/pkg/model"
 	"github.com/grafana/pyroscope/pkg/objstore"
 	"github.com/grafana/pyroscope/pkg/phlaredb/block"
@@ -62,6 +66,7 @@ func Compact(
 	ctx context.Context,
 	blocks []*metastorev1.BlockMeta,
 	storage objstore.Bucket,
+	workerId uuid.UUID,
 	options ...CompactionOption,
 ) (m []*metastorev1.BlockMeta, err error) {
 	c := &compactionConfig{
@@ -74,7 +79,7 @@ func Compact(
 	}
 
 	objects := ObjectsFromMetas(storage, blocks, c.objectOptions...)
-	plan, err := PlanCompaction(objects)
+	plan, err := PlanCompaction(objects, workerId)
 	if err != nil {
 		return nil, err
 	}
@@ -93,12 +98,20 @@
 			return nil, compactionErr
 		}
 		compacted = append(compacted, md)
+
+		if p.metricsExporter != nil {
+			go func() {
+				if sendErr := p.SendRecordedMetrics(); sendErr != nil {
+					println("ERROR", sendErr) // TODO
+				}
+			}()
+		}
 	}
 
 	return compacted, nil
 }
 
-func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
+func PlanCompaction(objects Objects, workerId uuid.UUID) ([]*CompactionPlan, error) {
 	if len(objects) == 0 {
 		// Even if there's just a single object, we still need to rewrite it.
 		return nil, ErrNoBlocksToMerge
@@ -125,6 +138,7 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
 				obj.meta.StringTable[s.Tenant],
 				r.meta.Shard,
 				level,
+				workerId,
 			)
 			m[obj.meta.StringTable[s.Tenant]] = tm
 		}
@@ -149,24 +163,28 @@
 }
 
 type CompactionPlan struct {
-	tenant     string
-	path       string
-	datasetMap map[int32]*datasetCompaction
-	datasets   []*datasetCompaction
-	meta       *metastorev1.BlockMeta
-	strings    *metadata.StringTable
+	tenant          string
+	path            string
+	datasetMap      map[int32]*datasetCompaction
+	datasets        []*datasetCompaction
+	meta            *metastorev1.BlockMeta
+	strings         *metadata.StringTable
+	metricsExporter *metrics.Exporter
+	workerId        uuid.UUID
 }
 
 func newBlockCompaction(
 	id string,
 	tenant string,
 	shard uint32,
 	compactionLevel uint32,
+	workerId uuid.UUID,
 ) *CompactionPlan {
 	p := &CompactionPlan{
 		tenant:     tenant,
 		datasetMap: make(map[int32]*datasetCompaction),
 		strings:    metadata.NewStringTable(),
+		workerId:   workerId,
 	}
 	p.path = BuildObjectPath(tenant, shard, compactionLevel, id)
 	p.meta = &metastorev1.BlockMeta{
@@ -176,6 +194,9 @@ func newBlockCompaction(
 		Shard:           shard,
 		CompactionLevel: compactionLevel,
 	}
+	if compactionLevel == 1 {
+		p.metricsExporter = metrics.NewExporter(tenant)
+	}
 	return p
 }
 
@@ -189,6 +210,9 @@ func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdi
 		if err = s.compact(ctx, w); err != nil {
 			return nil, fmt.Errorf("compacting block: %w", err)
 		}
+		if b.metricsExporter != nil {
+			b.metricsExporter.AppendMetrics(s.metricsRecorder.Recordings)
+		}
 		b.meta.Datasets = append(b.meta.Datasets, s.meta)
 	}
 	if err = w.Flush(ctx); err != nil {
@@ -217,6 +241,10 @@ func (b *CompactionPlan) addDataset(md *metastorev1.BlockMeta, s *metastorev1.Da
 	return sm
 }
 
+func (c *CompactionPlan) SendRecordedMetrics() error {
+	return c.metricsExporter.Send()
+}
+
 type datasetCompaction struct {
 	// Dataset name.
 	name string
@@ -236,6 +264,9 @@ type datasetCompaction struct {
 	profiles uint64
 
 	flushOnce sync.Once
+
+	workerId        uuid.UUID
+	metricsRecorder *metrics.Recorder
 }
 
 func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {
@@ -254,6 +285,7 @@ func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompac
 			Size:   0,
 			Labels: nil,
 		},
+		workerId: b.workerId,
 	}
 }
 
@@ -309,6 +341,13 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
 	m.indexRewriter = newIndexRewriter(m.path)
 	m.symbolsRewriter = newSymbolsRewriter(m.path)
 
+	if m.parent.meta.CompactionLevel == 1 {
+		recordingTime := int64(ulid.MustParse(m.parent.meta.Id).Time())
+		rules := metrics.RecordingRulesFromTenant(m.parent.tenant)
+		pyroscopeInstance := pyroscopeInstanceHash(m.parent.meta.Shard, m.workerId)
+		m.metricsRecorder = metrics.NewRecorder(rules, recordingTime, pyroscopeInstance)
+	}
+
 	g, ctx := errgroup.WithContext(ctx)
 	for _, s := range m.datasets {
 		s := s
@@ -330,6 +369,13 @@
 	return nil
 }
 
+func pyroscopeInstanceHash(shard uint32, id uuid.UUID) string {
+	buf := make([]byte, 0, 40)
+	buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
+	buf = append(buf, id.String()...)
+	return fmt.Sprintf("%x", xxhash.Sum64(buf))
+}
+
 func (m *datasetCompaction) mergeAndClose(ctx context.Context) (err error) {
 	defer func() {
 		err = multierror.New(err, m.close()).Err()
@@ -366,6 +412,9 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) {
 	if err = m.symbolsRewriter.rewriteRow(r); err != nil {
 		return err
 	}
+	if m.metricsRecorder != nil {
+		m.metricsRecorder.RecordRow(r.Fingerprint, r.Labels, r.Row.TotalValue())
+	}
 	return m.profilesWriter.writeRow(r)
 }
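
The most subtle addition above is `pyroscopeInstanceHash`, which stamps every recorded series with an identifier derived from the shard and the compaction worker's UUID. Here is a minimal, self-contained sketch of the same function so its behavior can be checked in isolation; only the two packages already imported by the diff are assumed:

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
	"github.com/google/uuid"
)

// Reproduced from the diff above: 4 big-endian shard bytes followed by the
// 36-character UUID string exactly fill the 40-byte buffer before hashing.
func pyroscopeInstanceHash(shard uint32, id uuid.UUID) string {
	buf := make([]byte, 0, 40)
	buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
	buf = append(buf, id.String()...)
	return fmt.Sprintf("%x", xxhash.Sum64(buf))
}

func main() {
	workerId := uuid.New()
	// Same worker, different shards: two distinct but stable instance values.
	fmt.Println(pyroscopeInstanceHash(1, workerId))
	fmt.Println(pyroscopeInstanceHash(2, workerId))
}
```

Since the worker UUID is generated once per process (see compaction_worker.go below), restarting a compactor yields a new instance value; the apparent intent is to keep series produced by different workers for the same shard from colliding at the remote-write endpoint.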

pkg/experiment/block/compaction_test.go

Lines changed: 2 additions & 1 deletion
@@ -7,6 +7,7 @@ import (
 	"path/filepath"
 	"testing"
 
+	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/encoding/protojson"
@@ -26,7 +27,7 @@ func Test_CompactBlocks(t *testing.T) {
 	require.NoError(t, err)
 
 	dst, tempdir := testutil.NewFilesystemBucket(t, ctx, t.TempDir())
-	compactedBlocks, err := Compact(ctx, resp.Blocks, bucket,
+	compactedBlocks, err := Compact(ctx, resp.Blocks, bucket, uuid.New(),
 		WithCompactionDestination(dst),
 		WithCompactionTempDir(tempdir),
 		WithCompactionObjectOptions(

pkg/experiment/compactor/compaction_worker.go

Lines changed: 5 additions & 1 deletion
@@ -15,6 +15,7 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
+	"github.com/google/uuid"
 	"github.com/grafana/dskit/services"
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
@@ -46,6 +47,8 @@ type Worker struct {
 	stopped   atomic.Bool
 	closeOnce sync.Once
 	wg        sync.WaitGroup
+
+	id uuid.UUID
}
 
 type Config struct {
@@ -96,6 +99,7 @@ func New(
 		client:  client,
 		storage: storage,
 		metrics: newMetrics(reg),
+		id:      uuid.New(),
 	}
 	w.threads = config.JobConcurrency
 	if w.threads < 1 {
@@ -363,7 +367,7 @@ func (w *Worker) runCompaction(job *compactionJob) {
 
 	tempdir := filepath.Join(w.config.TempDir, job.Name)
 	sourcedir := filepath.Join(tempdir, "source")
-	compacted, err := block.Compact(ctx, job.blocks, w.storage,
+	compacted, err := block.Compact(ctx, job.blocks, w.storage, w.id,
 		block.WithCompactionTempDir(tempdir),
 		block.WithCompactionObjectOptions(
 			block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
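
The worker's UUID is assigned once in `New` and reused for every job it runs, so the identity is per process, not per job. A tiny standalone sketch (with a stand-in `worker` type, not the real one) illustrates the resulting model:

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

// Hypothetical stand-in for the compactor Worker: the UUID is generated once
// at construction, mirroring how New sets w.id in the diff above.
type worker struct{ id uuid.UUID }

func newWorker() *worker { return &worker{id: uuid.New()} }

func main() {
	w1, w2 := newWorker(), newWorker()
	for job := 0; job < 2; job++ {
		// Both jobs on w1 report the same id; w2 always differs.
		fmt.Printf("job %d: w1=%s w2=%s\n", job, w1.id, w2.id)
	}
}
```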

pkg/experiment/metrics/exporter.go

Lines changed: 108 additions & 0 deletions
@@ -0,0 +1,108 @@
+package metrics
+
+import (
+	"context"
+	"net/url"
+	"time"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/klauspost/compress/snappy"
+	"github.com/prometheus/common/config"
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/prompb"
+	"github.com/prometheus/prometheus/storage/remote"
+)
+
+type Exporter struct {
+	config Config
+	client remote.WriteClient
+	data   map[AggregatedFingerprint]*TimeSeries
+}
+
+type Config struct {
+	url      string
+	username string
+	password config.Secret
+}
+
+func NewExporter(tenant string) *Exporter {
+	cfg := configFromTenant(tenant)
+	return &Exporter{
+		config: cfg,
+		data:   map[AggregatedFingerprint]*TimeSeries{},
+	}
+}
+
+func (e *Exporter) AppendMetrics(recordings []*Recording) {
+	for _, r := range recordings {
+		for fp, ts := range r.data {
+			e.data[fp] = ts
+		}
+	}
+}
+
+func (e *Exporter) Send() error {
+	if e.client == nil {
+		e.client = newClient(e.config)
+	}
+
+	p := &prompb.WriteRequest{Timeseries: make([]prompb.TimeSeries, 0, len(e.data))}
+	for _, ts := range e.data {
+		pts := prompb.TimeSeries{
+			Labels: make([]prompb.Label, 0, len(ts.Labels)),
+		}
+		for _, l := range ts.Labels {
+			pts.Labels = append(pts.Labels, prompb.Label{
+				Name:  l.Name,
+				Value: l.Value,
+			})
+		}
+		for _, s := range ts.Samples {
+			pts.Samples = append(pts.Samples, prompb.Sample{
+				Value:     s.Value,
+				Timestamp: s.Timestamp,
+			})
+		}
+		p.Timeseries = append(p.Timeseries, pts)
+	}
+	buf := proto.NewBuffer(nil)
+	if err := buf.Marshal(p); err != nil {
+		return err
+	}
+	return e.client.Store(context.Background(), snappy.Encode(nil, buf.Bytes()), 0)
+}
+
+func newClient(cfg Config) remote.WriteClient {
+	wURL, err := url.Parse(cfg.url)
+	if err != nil {
+		panic(err)
+	}
+
+	c, err := remote.NewWriteClient("exporter", &remote.ClientConfig{
+		URL:     &config.URL{URL: wURL},
+		Timeout: model.Duration(time.Second * 10),
+		HTTPClientConfig: config.HTTPClientConfig{
+			BasicAuth: &config.BasicAuth{
+				Username: cfg.username,
+				Password: cfg.password,
+			},
+		},
+		SigV4Config:      nil,
+		AzureADConfig:    nil,
+		Headers:          nil,
+		RetryOnRateLimit: false,
+	})
+	if err != nil {
+		panic(err)
+	}
+	return c
+}
+
+func configFromTenant(tenant string) Config {
+	// TODO
+	return Config{
+		url:      "omitted",
+		username: "omitted",
+		password: "omitted",
+	}
+}
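
Since `configFromTenant` is still a TODO stub with omitted credentials, the exporter cannot be exercised end to end from this commit alone, but the wire format `Send` produces is standard Prometheus remote write: a `prompb.WriteRequest`, protobuf-marshaled and snappy-compressed. A small standalone sketch of that encoding path, with an invented metric name, labels, and sample:

```go
package main

import (
	"fmt"

	"github.com/gogo/protobuf/proto"
	"github.com/klauspost/compress/snappy"
	"github.com/prometheus/prometheus/prompb"
)

func main() {
	// Assemble a WriteRequest the way Exporter.Send does; the series content
	// below is made up for illustration.
	req := &prompb.WriteRequest{
		Timeseries: []prompb.TimeSeries{{
			Labels: []prompb.Label{
				{Name: "__name__", Value: "example_recorded_metric"},
				{Name: "pyroscope_instance", Value: "deadbeefcafe"},
			},
			Samples: []prompb.Sample{{Value: 42, Timestamp: 1700000000000}},
		}},
	}
	buf := proto.NewBuffer(nil)
	if err := buf.Marshal(req); err != nil {
		panic(err)
	}
	// The snappy-compressed protobuf is the payload WriteClient.Store ships
	// to the remote-write endpoint (Send passes attempt number 0).
	payload := snappy.Encode(nil, buf.Bytes())
	fmt.Printf("remote-write payload: %d bytes\n", len(payload))
}
```

Two rough edges the TODOs already flag: `newClient` panics on an invalid URL or client config, and `Send` runs in a goroutine in `Compact` whose error is only println'd, so a failed export currently leaves no trace beyond stderr.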
