Commit e1221a0

observer pattern

1 parent: 75e37e3
5 files changed: +111 additions, -56 deletions

pkg/experiment/block/compaction.go

Lines changed: 46 additions & 52 deletions

@@ -10,17 +10,14 @@ import (
 	"strings"
 	"sync"

-	"github.com/cespare/xxhash/v2"
 	"github.com/grafana/dskit/multierror"
-	"github.com/oklog/ulid"
 	"github.com/parquet-go/parquet-go"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/storage"
 	"golang.org/x/sync/errgroup"

 	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
 	"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
-	"github.com/grafana/pyroscope/pkg/experiment/metrics"
 	phlaremodel "github.com/grafana/pyroscope/pkg/model"
 	"github.com/grafana/pyroscope/pkg/objstore"
 	"github.com/grafana/pyroscope/pkg/phlaredb/block"

@@ -54,11 +51,30 @@ func WithCompactionDestination(storage objstore.Bucket) CompactionOption {
 	}
 }

+func WithSampleObserver(observer SampleObserver) CompactionOption {
+	return func(p *compactionConfig) {
+		p.sampleObserver = observer
+	}
+}
+
 type compactionConfig struct {
-	objectOptions []ObjectOption
-	tempdir       string
-	source        objstore.BucketReader
-	destination   objstore.Bucket
+	objectOptions  []ObjectOption
+	tempdir        string
+	source         objstore.BucketReader
+	destination    objstore.Bucket
+	sampleObserver SampleObserver
+}
+
+type SampleObserver interface {
+	// Observe is called before the compactor appends the entry
+	// to the output block. This method must not modify the entry.
+	Observe(ProfileEntry)
+
+	// Flush is called before the compactor flushes the output dataset.
+	// This call invalidates all references (such as symbols) to the source
+	// and output blocks. Any error returned by the call terminates the
+	// compaction job: it is the caller's responsibility to suppress errors.
+	Flush() error
 }

 func Compact(

@@ -91,19 +107,11 @@ func Compact(

 	compacted := make([]*metastorev1.BlockMeta, 0, len(plan))
 	for _, p := range plan {
-		md, compactionErr := p.Compact(ctx, c.destination, c.tempdir)
+		md, compactionErr := p.Compact(ctx, c.destination, c.tempdir, c.sampleObserver)
 		if compactionErr != nil {
 			return nil, compactionErr
 		}
 		compacted = append(compacted, md)
-
-		if p.metricsExporter != nil {
-			go func() {
-				if sendErr := p.SendRecordedMetrics(); sendErr != nil {
-					println("ERROR", sendErr) // TODO
-				}
-			}()
-		}
 	}

 	return compacted, nil

@@ -160,13 +168,12 @@ func PlanCompaction(objects Objects) ([]*CompactionPlan, error) {
 }

 type CompactionPlan struct {
-	tenant          string
-	path            string
-	datasetMap      map[int32]*datasetCompaction
-	datasets        []*datasetCompaction
-	meta            *metastorev1.BlockMeta
-	strings         *metadata.StringTable
-	metricsExporter *metrics.Exporter
+	tenant     string
+	path       string
+	datasetMap map[int32]*datasetCompaction
+	datasets   []*datasetCompaction
+	meta       *metastorev1.BlockMeta
+	strings    *metadata.StringTable
 }

 func newBlockCompaction(

@@ -188,27 +195,30 @@ func newBlockCompaction(
 		Shard:           shard,
 		CompactionLevel: compactionLevel,
 	}
-	if compactionLevel == 1 {
-		p.metricsExporter = metrics.NewExporter(tenant)
-	}
 	return p
 }

-func (b *CompactionPlan) Compact(ctx context.Context, dst objstore.Bucket, tmpdir string) (m *metastorev1.BlockMeta, err error) {
+func (b *CompactionPlan) Compact(
+	ctx context.Context,
+	dst objstore.Bucket,
+	tmpdir string,
+	observer SampleObserver,
+) (m *metastorev1.BlockMeta, err error) {
 	w := NewBlockWriter(dst, b.path, tmpdir)
 	defer func() {
 		err = multierror.New(err, w.Close()).Err()
 	}()
 	// Datasets are compacted in a strict order.
 	for _, s := range b.datasets {
+		s.registerSampleObserver(observer)
 		if err = s.compact(ctx, w); err != nil {
 			return nil, fmt.Errorf("compacting block: %w", err)
 		}
-		if b.metricsExporter != nil {
-			b.metricsExporter.AppendMetrics(s.metricsRecorder.Recordings)
-		}
 		b.meta.Datasets = append(b.meta.Datasets, s.meta)
 	}
+	if err = observer.Flush(); err != nil {
+		return nil, fmt.Errorf("flushing sample observer: %w", err)
+	}
 	if err = w.Flush(ctx); err != nil {
 		return nil, fmt.Errorf("flushing block writer: %w", err)
 	}

@@ -235,10 +245,6 @@ func (b *CompactionPlan) addDataset(md *metastorev1.BlockMeta, s *metastorev1.Da
 	return sm
 }

-func (c *CompactionPlan) SendRecordedMetrics() error {
-	return c.metricsExporter.Send()
-}
-
 type datasetCompaction struct {
 	// Dataset name.
 	name string

@@ -259,7 +265,7 @@ type datasetCompaction struct {

 	flushOnce sync.Once

-	metricsRecorder *metrics.Recorder
+	observer SampleObserver
 }

 func (b *CompactionPlan) newDatasetCompaction(tenant, name int32) *datasetCompaction {

@@ -308,6 +314,10 @@ func (m *datasetCompaction) compact(ctx context.Context, w *Writer) (err error)
 	return nil
 }

+func (m *datasetCompaction) registerSampleObserver(observer SampleObserver) {
+	m.observer = observer
+}
+
 func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
 	m.path = path
 	defer func() {

@@ -333,13 +343,6 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
 	m.indexRewriter = newIndexRewriter(m.path)
 	m.symbolsRewriter = newSymbolsRewriter(m.path)

-	if m.parent.meta.CompactionLevel == 1 {
-		recordingTime := int64(ulid.MustParse(m.parent.meta.Id).Time())
-		rules := metrics.RecordingRulesFromTenant(m.parent.tenant)
-		pyroscopeInstance := pyroscopeInstanceHash(m.parent.meta.Shard, m.parent.meta.CreatedBy)
-		m.metricsRecorder = metrics.NewRecorder(rules, recordingTime, pyroscopeInstance)
-	}
-
 	g, ctx := errgroup.WithContext(ctx)
 	for _, s := range m.datasets {
 		s := s

@@ -361,13 +364,6 @@ func (m *datasetCompaction) open(ctx context.Context, path string) (err error) {
 	return nil
 }

-func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
-	buf := make([]byte, 0, 8)
-	buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
-	buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
-	return fmt.Sprintf("%x", xxhash.Sum64(buf))
-}
-
 func (m *datasetCompaction) mergeAndClose(ctx context.Context) (err error) {
 	defer func() {
 		err = multierror.New(err, m.close()).Err()

@@ -404,9 +400,7 @@ func (m *datasetCompaction) writeRow(r ProfileEntry) (err error) {
 	if err = m.symbolsRewriter.rewriteRow(r); err != nil {
 		return err
 	}
-	if m.metricsRecorder != nil {
-		m.metricsRecorder.RecordRow(r.Fingerprint, r.Labels, r.Row.TotalValue())
-	}
+	m.observer.Observe(r)
 	return m.profilesWriter.writeRow(r)
 }
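The SampleObserver interface above is the new extension point: every profile row passes through Observe just before it is written to the output block (see writeRow), and Flush runs once per compacted block after all datasets. A minimal sketch of a custom observer against that contract; countingObserver is hypothetical and only illustrates the read-only Observe / fallible Flush split:

package example

import "github.com/grafana/pyroscope/pkg/experiment/block"

// countingObserver tallies rows flowing through a compaction. Observe must
// not modify the entry, and any references taken from it (labels, symbols)
// become invalid once Flush returns.
type countingObserver struct {
	rows uint64
}

func (o *countingObserver) Observe(e block.ProfileEntry) { o.rows++ }

// Flush may fail the whole compaction job, so real implementations should
// suppress errors that are not worth aborting for.
func (o *countingObserver) Flush() error { return nil }

Note that CompactionPlan.Compact now calls observer.Flush with no nil check, so callers that do not want observation must pass a no-op observer rather than nil; the worker change below does exactly that.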

pkg/experiment/compactor/compaction_worker.go

Lines changed: 11 additions & 1 deletion

@@ -25,6 +25,7 @@ import (
 	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
 	"github.com/grafana/pyroscope/pkg/experiment/block"
 	"github.com/grafana/pyroscope/pkg/experiment/block/metadata"
+	metrics2 "github.com/grafana/pyroscope/pkg/experiment/metrics"
 	"github.com/grafana/pyroscope/pkg/objstore"
 	"github.com/grafana/pyroscope/pkg/util"
 )

@@ -368,7 +369,9 @@ func (w *Worker) runCompaction(job *compactionJob) {
 		block.WithCompactionObjectOptions(
 			block.WithObjectMaxSizeLoadInMemory(w.config.SmallObjectSize),
 			block.WithObjectDownload(sourcedir),
-		))
+		),
+		block.WithSampleObserver(newSampleObserver(job)),
+	)

 	switch {
 	case err == nil:

@@ -417,6 +420,13 @@ func (w *Worker) runCompaction(job *compactionJob) {
 	_ = deleteGroup.Wait()
 }

+func newSampleObserver(job *compactionJob) block.SampleObserver {
+	if job.CompactionLevel == 0 {
+		return metrics2.NewMetricsObserver(job.Tenant, job.blocks[0])
+	}
+	return &metrics2.NoOpObserver{}
+}
+
 func (w *Worker) getBlockMetadata(logger log.Logger, job *compactionJob) error {
 	ctx, cancel := context.WithTimeout(job.ctx, w.config.RequestTimeout)
 	defer cancel()
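newSampleObserver gates recording on the compaction level: only level-0 jobs, the first compaction of freshly flushed segments, record metrics, so each sample is observed exactly once as blocks move up the hierarchy; higher levels receive the no-op observer. The recording timestamp is derived from the first source block (job.blocks[0]), so level-0 jobs are assumed to have at least one block. Since both return paths must satisfy block.SampleObserver, a compile-time assertion (a common Go idiom, not present in this commit) would pin both implementations to the interface:

// Hypothetical compile-time checks, placed in the metrics package (which
// already imports block): a build failure here flags an implementation
// drifting away from block.SampleObserver.
var (
	_ block.SampleObserver = (*MetricsObserver)(nil)
	_ block.SampleObserver = (*NoOpObserver)(nil)
)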

pkg/experiment/metrics/observer.go

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
+package metrics
+
+import (
+	"fmt"
+
+	"github.com/cespare/xxhash/v2"
+	"github.com/oklog/ulid"
+
+	metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
+	"github.com/grafana/pyroscope/pkg/experiment/block"
+)
+
+type MetricsObserver struct {
+	tenant   string
+	recorder *Recorder
+}
+
+func NewMetricsObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsObserver {
+	recordingTime := int64(ulid.MustParse(meta.Id).Time())
+	rules := recordingRulesFromTenant(tenant)
+	pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
+	return &MetricsObserver{
+		tenant:   tenant,
+		recorder: NewRecorder(rules, recordingTime, pyroscopeInstance),
+	}
+}
+
+func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
+	buf := make([]byte, 0, 8)
+	buf = append(buf, byte(shard>>24), byte(shard>>16), byte(shard>>8), byte(shard))
+	buf = append(buf, byte(createdBy>>24), byte(createdBy>>16), byte(createdBy>>8), byte(createdBy))
+	return fmt.Sprintf("%x", xxhash.Sum64(buf))
+}
+
+func (o *MetricsObserver) Observe(row block.ProfileEntry) {
+	o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue())
+}
+
+func (o *MetricsObserver) Flush() error {
+	exporter := NewExporter(o.tenant)
+	exporter.AppendMetrics(o.recorder.Recordings)
+	return exporter.Send()
+}
+
+type NoOpObserver struct{}
+
+func (o *NoOpObserver) Observe(row block.ProfileEntry) {
+}
+
+func (o *NoOpObserver) Flush() error {
+	return nil
+}
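observer.go consolidates what compaction.go previously inlined: the recording time is the ULID timestamp of the source block, the instance identifier is an xxhash of the big-endian-packed (shard, createdBy) pair, and the exporter is only constructed at Flush time. A hedged usage sketch from inside the metrics package; exportExample, meta, and rows are hypothetical placeholders:

// exportExample mirrors the observer lifecycle the compactor drives:
// Observe aggregates each row into the recorder keyed by fingerprint,
// then Flush builds an exporter, appends the recordings, and sends them.
func exportExample(meta *metastorev1.BlockMeta, rows []block.ProfileEntry) error {
	obs := NewMetricsObserver("tenant-a", meta)
	for _, row := range rows {
		obs.Observe(row)
	}
	// In the compactor, an error from Flush aborts the whole job.
	return obs.Flush()
}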

pkg/experiment/metrics/recorder.go

Lines changed: 1 addition & 2 deletions

@@ -66,7 +66,6 @@ func NewRecorder(recordingRules []*RecordingRule, recordingTime int64, pyroscope
 	for i, rule := range recordingRules {
 		recordings[i] = &Recording{
 			rule: *rule,
-			// fps:  make(map[model.Fingerprint]*AggregatedFingerprint),
 			data: make(map[AggregatedFingerprint]*TimeSeries),
 			state: &recordingState{
 				fp: nil,

@@ -111,7 +110,7 @@ func generateExportedLabels(labelsMap map[string]string, rec *Recording, pyrosco
 			Value: rec.rule.metricName,
 		},
 		labels.Label{
-			Name:  "__pyroscope_instance__",
+			Name:  "pyroscope_instance",
 			Value: pyroscopeInstance,
 		},
 	}
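The label rename is more than cosmetic: Prometheus reserves label names beginning with a double underscore for internal use, and ingestion paths commonly drop or reject every such label except __name__, which would silently strip the instance label from the exported series. A sketch of that kind of filtering; visibleLabels is illustrative and not from this repository:

import (
	"strings"

	"github.com/prometheus/prometheus/model/labels"
)

// visibleLabels drops reserved "__"-prefixed labels while keeping __name__,
// mimicking what many ingestion paths do before storing a series.
func visibleLabels(in labels.Labels) labels.Labels {
	out := make(labels.Labels, 0, len(in))
	for _, l := range in {
		if strings.HasPrefix(l.Name, "__") && l.Name != labels.MetricName {
			continue // "__pyroscope_instance__" would vanish here
		}
		out = append(out, l)
	}
	return out
}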

pkg/experiment/metrics/rules.go

Lines changed: 1 addition & 1 deletion

@@ -11,7 +11,7 @@ type RecordingRule struct {
 	keepLabels []string
 }

-func RecordingRulesFromTenant(tenant string) []*RecordingRule {
+func recordingRulesFromTenant(tenant string) []*RecordingRule {
 	// TODO
 	return []*RecordingRule{
 		{
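With rule lookup now reached only through NewMetricsObserver, RecordingRulesFromTenant no longer needs to be exported and drops to package scope. The rules themselves are still hard-coded (note the TODO); a hedged sketch of the shape this function returns, using only the two fields visible in this diff, with invented values:

// Hypothetical rule: metricName feeds the exported __name__ label (see
// recorder.go) and keepLabels presumably selects which profile labels
// survive aggregation.
func exampleRules() []*RecordingRule {
	return []*RecordingRule{
		{
			metricName: "pyroscope_cpu_seconds_total",         // invented
			keepLabels: []string{"service_name", "namespace"}, // invented
		},
	}
}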
