Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow setting the metric batch exporter early #11545

Merged
merged 3 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,18 @@ func (s *Runner) Run(ctx context.Context) error {
}
otel.SetTracerProvider(tracerProvider)

exporter, err := apmotel.NewGatherer()
if err != nil {
return err
}
localExporter := telemetry.NewMetricExporter()
meterProvider := metric.NewMeterProvider(
metric.WithReader(exporter),
metric.WithReader(metric.NewPeriodicReader(localExporter)),
)
otel.SetMeterProvider(meterProvider)
tracer.RegisterMetricsGatherer(exporter)

// Ensure the libbeat output and go-elasticsearch clients do not index
// any events to Elasticsearch before the integration is ready.
publishReady := make(chan struct{})
Expand Down Expand Up @@ -420,18 +432,7 @@ func (s *Runner) Run(ctx context.Context) error {
}),
finalBatchProcessor,
})

exporter, err := apmotel.NewGatherer()
if err != nil {
return err
}
localExporter := telemetry.NewMetricExporter(batchProcessor)
meterProvider := metric.NewMeterProvider(
metric.WithReader(exporter),
metric.WithReader(metric.NewPeriodicReader(localExporter)),
)
otel.SetMeterProvider(meterProvider)
tracer.RegisterMetricsGatherer(exporter)
localExporter.SetBatchProcessor(batchProcessor)

agentConfigFetcher, fetcherRunFunc, err := newAgentConfigFetcher(
ctx,
Expand Down
14 changes: 11 additions & 3 deletions internal/telemetry/metric_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ import (
// NewMetricExporter initializes a new MetricExporter
// This export logic is heavily inspired from the OTLP input in apm-data.
// https://github.com/elastic/apm-data/blob/main/input/otlp/metrics.go
func NewMetricExporter(p modelpb.BatchProcessor, opts ...ConfigOption) *MetricExporter {
func NewMetricExporter(opts ...ConfigOption) *MetricExporter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: tied to the question below but if the processor is required we should leave it as part of the method instead of moving it to the config.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then it goes back to the issue we currently have, which is that we cannot create the metrics exporter (and therefore get a valid meter) until we have a batch processor.
But we also need a meter earlier, when we create the middlewares/interceptors.

We can't add a new exporter on an existing meter provider either. So this PR makes the batch processor optional to allow us configuring telemetry early, and extending it once we have a full boot.

cfg := newConfig(opts...)

return &MetricExporter{
processor: p,

processor: cfg.processor,
metricFilter: cfg.MetricFilter,
temporalitySelector: cfg.TemporalitySelector,
aggregationSelector: cfg.AggregationSelector,
Expand All @@ -56,6 +55,11 @@ type MetricExporter struct {
aggregationSelector metric.AggregationSelector
}

// SetBatchProcessor sets a batch processor on the exporter
func (e *MetricExporter) SetBatchProcessor(p modelpb.BatchProcessor) {
e.processor = p
}

// Temporality returns the Temporality to use for an instrument kind.
func (e *MetricExporter) Temporality(k metric.InstrumentKind) metricdata.Temporality {
return e.temporalitySelector(k)
Expand All @@ -67,6 +71,10 @@ func (e *MetricExporter) Aggregation(k metric.InstrumentKind) metric.Aggregation
}

func (e *MetricExporter) Export(ctx context.Context, rm *metricdata.ResourceMetrics) error {
if e.processor == nil {
return nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: is it considered a valid state to run the exporter without a processor ? If not we should return an error here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may then end up with errors at boot time just because one export happened before the batch processor was setup. I don't think that's a good behavior either.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I might be missing some context. If we end up with errors for early exports, why are we creating the exporter early ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant we may end up with early errors if we don't expect the processor to be nil, hence this check

}

batch := modelpb.Batch{}
now := time.Now()

Expand Down
15 changes: 15 additions & 0 deletions internal/telemetry/metric_exporter_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package telemetry
import (
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"

"github.com/elastic/apm-data/model/modelpb"
)

// Override default otel/prometheus boundaries, as we skip empty buckets and therefore able to use more accurate and higher range boundaries.
Expand All @@ -34,6 +36,7 @@ var customHistogramBoundaries = []float64{
}

type Config struct {
processor modelpb.BatchProcessor
MetricFilter []string
TemporalitySelector metric.TemporalitySelector
AggregationSelector metric.AggregationSelector
Expand Down Expand Up @@ -76,6 +79,18 @@ func defaultAggregationSelector(ik metric.InstrumentKind) metric.Aggregation {
}
}

// WithBatchProcessor configures the batch processor that will be used by the
// metric exporter.
// Using this option is the equivalent of using `SetBatchProcessor`.
//
// Defaults to not running any batch processing.
func WithBatchProcessor(b modelpb.BatchProcessor) ConfigOption {
return func(cfg Config) Config {
cfg.processor = b
return cfg
}
}

// WithMetricFilter configured the metrics filter. Any metric filtered here
// will be the only ones to be exported. All other metrics will be ignored.
//
Expand Down
21 changes: 20 additions & 1 deletion internal/telemetry/metric_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,19 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/model/modelpb"
)

func TestMetricExporterWithNoBatchProcessor(t *testing.T) {
e := NewMetricExporter()
var rm metricdata.ResourceMetrics
err := e.Export(context.Background(), &rm)
assert.NoError(t, err)
}

func TestMetricExporter(t *testing.T) {
service := modelpb.Service{Name: "apm-server", Language: &modelpb.Language{Name: "go"}}

Expand Down Expand Up @@ -293,7 +301,8 @@ func TestMetricExporter(t *testing.T) {
batch = append(batch, (*b)...)
return nil
})
e := NewMetricExporter(p, tt.exporterConfig...)
tt.exporterConfig = append(tt.exporterConfig, WithBatchProcessor(p))
e := NewMetricExporter(tt.exporterConfig...)

provider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(e)),
Expand All @@ -309,6 +318,16 @@ func TestMetricExporter(t *testing.T) {
}
}

func TestMetricExporterSetBatchProcessor(t *testing.T) {
p := modelpb.ProcessBatchFunc(func(ctx context.Context, b *modelpb.Batch) error {
return nil
})
e := NewMetricExporter()
assert.Nil(t, e.processor)
e.SetBatchProcessor(p)
assert.NotNil(t, e.processor)
}

func assertEventsMatch(t *testing.T, expected []*modelpb.APMEvent, actual []*modelpb.APMEvent) {
t.Helper()
sort.Slice(expected, func(i, j int) bool {
Expand Down