diff --git a/internal/beater/beater.go b/internal/beater/beater.go index 8b61c044790..fa7e7e4128c 100644 --- a/internal/beater/beater.go +++ b/internal/beater/beater.go @@ -307,6 +307,18 @@ func (s *Runner) Run(ctx context.Context) error { } otel.SetTracerProvider(tracerProvider) + exporter, err := apmotel.NewGatherer() + if err != nil { + return err + } + localExporter := telemetry.NewMetricExporter() + meterProvider := metric.NewMeterProvider( + metric.WithReader(exporter), + metric.WithReader(metric.NewPeriodicReader(localExporter)), + ) + otel.SetMeterProvider(meterProvider) + tracer.RegisterMetricsGatherer(exporter) + // Ensure the libbeat output and go-elasticsearch clients do not index // any events to Elasticsearch before the integration is ready. publishReady := make(chan struct{}) @@ -420,18 +432,7 @@ func (s *Runner) Run(ctx context.Context) error { }), finalBatchProcessor, }) - - exporter, err := apmotel.NewGatherer() - if err != nil { - return err - } - localExporter := telemetry.NewMetricExporter(batchProcessor) - meterProvider := metric.NewMeterProvider( - metric.WithReader(exporter), - metric.WithReader(metric.NewPeriodicReader(localExporter)), - ) - otel.SetMeterProvider(meterProvider) - tracer.RegisterMetricsGatherer(exporter) + localExporter.SetBatchProcessor(batchProcessor) agentConfigFetcher, fetcherRunFunc, err := newAgentConfigFetcher( ctx, diff --git a/internal/telemetry/metric_exporter.go b/internal/telemetry/metric_exporter.go index 4fb1fb42518..c260ed56adf 100644 --- a/internal/telemetry/metric_exporter.go +++ b/internal/telemetry/metric_exporter.go @@ -34,12 +34,11 @@ import ( // NewMetricExporter initializes a new MetricExporter // This export logic is heavily inspired from the OTLP input in apm-data. // https://github.com/elastic/apm-data/blob/main/input/otlp/metrics.go -func NewMetricExporter(p modelpb.BatchProcessor, opts ...ConfigOption) *MetricExporter { +func NewMetricExporter(opts ...ConfigOption) *MetricExporter { cfg := newConfig(opts...) return &MetricExporter{ - processor: p, - + processor: cfg.processor, metricFilter: cfg.MetricFilter, temporalitySelector: cfg.TemporalitySelector, aggregationSelector: cfg.AggregationSelector, @@ -56,6 +55,11 @@ type MetricExporter struct { aggregationSelector metric.AggregationSelector } +// SetBatchProcessor sets a batch processor on the exporter +func (e *MetricExporter) SetBatchProcessor(p modelpb.BatchProcessor) { + e.processor = p +} + // Temporality returns the Temporality to use for an instrument kind. func (e *MetricExporter) Temporality(k metric.InstrumentKind) metricdata.Temporality { return e.temporalitySelector(k) @@ -67,6 +71,10 @@ func (e *MetricExporter) Aggregation(k metric.InstrumentKind) metric.Aggregation } func (e *MetricExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error { + if e.processor == nil { + return nil + } + batch := modelpb.Batch{} now := time.Now() diff --git a/internal/telemetry/metric_exporter_config.go b/internal/telemetry/metric_exporter_config.go index a968b765c40..cd5b1f21264 100644 --- a/internal/telemetry/metric_exporter_config.go +++ b/internal/telemetry/metric_exporter_config.go @@ -20,6 +20,8 @@ package telemetry import ( "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/elastic/apm-data/model/modelpb" ) // Override default otel/prometheus boundaries, as we skip empty buckets and therefore able to use more accurate and higher range boundaries. @@ -34,6 +36,7 @@ var customHistogramBoundaries = []float64{ } type Config struct { + processor modelpb.BatchProcessor MetricFilter []string TemporalitySelector metric.TemporalitySelector AggregationSelector metric.AggregationSelector @@ -76,6 +79,18 @@ func defaultAggregationSelector(ik metric.InstrumentKind) metric.Aggregation { } } +// WithBatchProcessor configures the batch processor that will be used by the +// metric exporter. +// Using this option is the equivalent of using `SetBatchProcessor`. +// +// Defaults to not running any batch processing. +func WithBatchProcessor(b modelpb.BatchProcessor) ConfigOption { + return func(cfg Config) Config { + cfg.processor = b + return cfg + } +} + // WithMetricFilter configured the metrics filter. Any metric filtered here // will be the only ones to be exported. All other metrics will be ignored. // diff --git a/internal/telemetry/metric_exporter_test.go b/internal/telemetry/metric_exporter_test.go index 6c260171546..f1425a863c0 100644 --- a/internal/telemetry/metric_exporter_test.go +++ b/internal/telemetry/metric_exporter_test.go @@ -30,11 +30,19 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "google.golang.org/protobuf/testing/protocmp" "github.com/elastic/apm-data/model/modelpb" ) +func TestMetricExporterWithNoBatchProcessor(t *testing.T) { + e := NewMetricExporter() + var rm metricdata.ResourceMetrics + err := e.Export(context.Background(), &rm) + assert.NoError(t, err) +} + func TestMetricExporter(t *testing.T) { service := modelpb.Service{Name: "apm-server", Language: &modelpb.Language{Name: "go"}} @@ -293,7 +301,8 @@ func TestMetricExporter(t *testing.T) { batch = append(batch, (*b)...) return nil }) - e := NewMetricExporter(p, tt.exporterConfig...) + tt.exporterConfig = append(tt.exporterConfig, WithBatchProcessor(p)) + e := NewMetricExporter(tt.exporterConfig...) provider := sdkmetric.NewMeterProvider( sdkmetric.WithReader(sdkmetric.NewPeriodicReader(e)), @@ -309,6 +318,16 @@ func TestMetricExporter(t *testing.T) { } } +func TestMetricExporterSetBatchProcessor(t *testing.T) { + p := modelpb.ProcessBatchFunc(func(ctx context.Context, b *modelpb.Batch) error { + return nil + }) + e := NewMetricExporter() + assert.Nil(t, e.processor) + e.SetBatchProcessor(p) + assert.NotNil(t, e.processor) +} + func assertEventsMatch(t *testing.T, expected []*modelpb.APMEvent, actual []*modelpb.APMEvent) { t.Helper() sort.Slice(expected, func(i, j int) bool {