Skip to content

Commit 2bb97ad

Browse files
committed
flush async, rename MetricsObserver to SampleObserver, agent name
1 parent 56bc260 commit 2bb97ad

File tree

3 files changed

+18
-16
lines changed

3 files changed

+18
-16
lines changed

pkg/experiment/compactor/compaction_worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ func (w *Worker) runCompaction(job *compactionJob) {
463463

464464
func newSampleObserver(job *compactionJob) block.SampleObserver {
465465
if job.CompactionLevel == 0 {
466-
return metricsexport.NewMetricsObserver(job.Tenant, job.blocks[0])
466+
return metricsexport.NewSampleObserver(job.Tenant, job.blocks[0])
467467
}
468468
return &block.NoOpObserver{}
469469
}

pkg/experiment/metrics/exporter.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,24 @@ type Config struct {
2525
password config.Secret
2626
}
2727

28-
func NewExporter(tenant string) *Exporter {
28+
func NewExporter(tenant string, recordings []*Recording) *Exporter {
2929
cfg := configFromTenant(tenant)
30-
return &Exporter{
30+
exporter := &Exporter{
3131
config: cfg,
3232
data: map[AggregatedFingerprint]*TimeSeries{},
3333
}
34-
}
35-
36-
func (e *Exporter) AppendMetrics(recordings []*Recording) {
3734
for _, r := range recordings {
3835
for fp, ts := range r.data {
39-
e.data[fp] = ts
36+
exporter.data[fp] = ts
4037
}
4138
}
39+
return exporter
4240
}
4341

4442
func (e *Exporter) Send() error {
43+
if len(e.data) == 0 {
44+
return nil
45+
}
4546
if e.client == nil {
4647
e.client = newClient(e.config)
4748
}
@@ -78,7 +79,7 @@ func newClient(cfg Config) remote.WriteClient {
7879
panic(err)
7980
}
8081

81-
c, err := remote.NewWriteClient("exporter", &remote.ClientConfig{
82+
c, err := remote.NewWriteClient("pyroscope-metrics-exporter", &remote.ClientConfig{
8283
URL: &config.URL{URL: wURL},
8384
Timeout: model.Duration(time.Second * 10),
8485
HTTPClientConfig: config.HTTPClientConfig{

pkg/experiment/metrics/observer.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,16 @@ import (
1010
"github.com/grafana/pyroscope/pkg/experiment/block"
1111
)
1212

13-
type MetricsObserver struct {
13+
type SampleObserver struct {
1414
tenant string
1515
recorder *Recorder
1616
}
1717

18-
func NewMetricsObserver(tenant string, meta *metastorev1.BlockMeta) *MetricsObserver {
18+
func NewSampleObserver(tenant string, meta *metastorev1.BlockMeta) *SampleObserver {
1919
recordingTime := int64(ulid.MustParse(meta.Id).Time())
2020
rules := recordingRulesFromTenant(tenant)
2121
pyroscopeInstance := pyroscopeInstanceHash(meta.Shard, meta.CreatedBy)
22-
return &MetricsObserver{
22+
return &SampleObserver{
2323
tenant: tenant,
2424
recorder: NewRecorder(rules, recordingTime, pyroscopeInstance),
2525
}
@@ -32,12 +32,13 @@ func pyroscopeInstanceHash(shard uint32, createdBy int32) string {
3232
return fmt.Sprintf("%x", xxhash.Sum64(buf))
3333
}
3434

35-
func (o *MetricsObserver) Observe(row block.ProfileEntry) {
35+
func (o *SampleObserver) Observe(row block.ProfileEntry) {
3636
o.recorder.RecordRow(row.Fingerprint, row.Labels, row.Row.TotalValue())
3737
}
3838

39-
func (o *MetricsObserver) Flush() error {
40-
exporter := NewExporter(o.tenant)
41-
exporter.AppendMetrics(o.recorder.Recordings)
42-
return exporter.Send()
39+
func (o *SampleObserver) Flush() error {
40+
go func() {
41+
NewExporter(o.tenant, o.recorder.Recordings).Send() // TODO log error
42+
}()
43+
return nil
4344
}

0 commit comments

Comments
 (0)