From a4018970388f532e06a19fc2a0aec042668b5cab Mon Sep 17 00:00:00 2001 From: Yang Song Date: Thu, 21 Nov 2024 03:54:07 -0500 Subject: [PATCH 1/2] [pkg/datadog] Refactor the API that provides metrics translator (#36474) #### Description Refactor the API that provides metrics translator so that it can be used in the datadog-agent repo. --- .chloggen/dd-config-api.yaml | 27 ++++++++++++++++++ exporter/datadogexporter/metrics_exporter.go | 25 ++++++++++++++-- pkg/datadog/config/metrics.go | 30 ++------------------ pkg/datadog/go.mod | 4 +-- pkg/datadog/go.sum | 4 --- 5 files changed, 54 insertions(+), 36 deletions(-) create mode 100644 .chloggen/dd-config-api.yaml diff --git a/.chloggen/dd-config-api.yaml b/.chloggen/dd-config-api.yaml new file mode 100644 index 000000000000..653aceee4b9e --- /dev/null +++ b/.chloggen/dd-config-api.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/datadog + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Refactor the API that provides metrics translator" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36474] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: "This is API change only and does not affect end users" + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/datadogexporter/metrics_exporter.go b/exporter/datadogexporter/metrics_exporter.go index 18f5b76fe122..ed2938e0a474 100644 --- a/exporter/datadogexporter/metrics_exporter.go +++ b/exporter/datadogexporter/metrics_exporter.go @@ -19,6 +19,7 @@ import ( "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" otlpmetrics "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -29,9 +30,20 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/metrics/sketches" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/datadogexporter/internal/scrub" - datadogconfig "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog/config" ) +var metricRemappingDisableddFeatureGate = featuregate.GlobalRegistry().MustRegister( + "exporter.datadogexporter.metricremappingdisabled", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled the Datadog Exporter remaps OpenTelemetry semantic conventions to Datadog semantic conventions. This feature gate is only for internal use."), + featuregate.WithRegisterReferenceURL("https://docs.datadoghq.com/opentelemetry/schema_semantics/metrics_mapping/"), +) + +// isMetricRemappingDisabled returns true if the datadogexporter should generate Datadog-compliant metrics from OpenTelemetry metrics +func isMetricRemappingDisabled() bool { + return metricRemappingDisableddFeatureGate.IsEnabled() +} + type metricsExporter struct { params exporter.Settings cfg *Config @@ -61,7 +73,16 @@ func newMetricsExporter( metadataReporter *inframetadata.Reporter, statsOut chan []byte, ) (*metricsExporter, error) { - tr, err := datadogconfig.TranslatorFromConfig(params.TelemetrySettings, cfg.Metrics, attrsTranslator, sourceProvider, statsOut) + options := cfg.Metrics.ToTranslatorOpts() + options = append(options, otlpmetrics.WithFallbackSourceProvider(sourceProvider)) + options = append(options, otlpmetrics.WithStatsOut(statsOut)) + if isMetricRemappingDisabled() { + params.TelemetrySettings.Logger.Warn("Metric remapping is disabled in the Datadog exporter. OpenTelemetry metrics must be mapped to Datadog semantics before metrics are exported to Datadog (ex: via a processor).") + } else { + options = append(options, otlpmetrics.WithRemapping()) + } + + tr, err := otlpmetrics.NewTranslator(params.TelemetrySettings, attrsTranslator, options...) if err != nil { return nil, err } diff --git a/pkg/datadog/config/metrics.go b/pkg/datadog/config/metrics.go index fc2c307a1be3..c17a148af8e4 100644 --- a/pkg/datadog/config/metrics.go +++ b/pkg/datadog/config/metrics.go @@ -7,12 +7,8 @@ import ( "encoding" "fmt" - "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes" - "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes/source" otlpmetrics "github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confignet" - "go.opentelemetry.io/collector/featuregate" ) // MetricsConfig defines the metrics exporter specific configuration options @@ -212,29 +208,10 @@ type MetricsExporterConfig struct { InstrumentationScopeMetadataAsTags bool `mapstructure:"instrumentation_scope_metadata_as_tags"` } -var metricRemappingDisableddFeatureGate = featuregate.GlobalRegistry().MustRegister( - "exporter.datadogexporter.metricremappingdisabled", - featuregate.StageAlpha, - featuregate.WithRegisterDescription("When enabled the Datadog Exporter remaps OpenTelemetry semantic conventions to Datadog semantic conventions. This feature gate is only for internal use."), - featuregate.WithRegisterReferenceURL("https://docs.datadoghq.com/opentelemetry/schema_semantics/metrics_mapping/"), -) - -// isMetricRemappingDisabled returns true if the datadogexporter should generate Datadog-compliant metrics from OpenTelemetry metrics -func isMetricRemappingDisabled() bool { - return metricRemappingDisableddFeatureGate.IsEnabled() -} - -// TranslatorFromConfig creates a new metrics translator from the exporter -func TranslatorFromConfig(set component.TelemetrySettings, mcfg MetricsConfig, attrsTranslator *attributes.Translator, sourceProvider source.Provider, statsOut chan []byte) (*otlpmetrics.Translator, error) { +// ToTranslatorOpts returns a list of metrics translator options from the metrics config +func (mcfg MetricsConfig) ToTranslatorOpts() []otlpmetrics.TranslatorOption { options := []otlpmetrics.TranslatorOption{ otlpmetrics.WithDeltaTTL(mcfg.DeltaTTL), - otlpmetrics.WithFallbackSourceProvider(sourceProvider), - } - - if isMetricRemappingDisabled() { - set.Logger.Warn("Metric remapping is disabled in the Datadog exporter. OpenTelemetry metrics must be mapped to Datadog semantics before metrics are exported to Datadog (ex: via a processor).") - } else { - options = append(options, otlpmetrics.WithRemapping()) } if mcfg.HistConfig.SendAggregations { @@ -262,6 +239,5 @@ func TranslatorFromConfig(set component.TelemetrySettings, mcfg MetricsConfig, a options = append(options, otlpmetrics.WithInitialCumulMonoValueMode( otlpmetrics.InitialCumulMonoValueMode(mcfg.SumConfig.InitialCumulativeMonotonicMode))) - options = append(options, otlpmetrics.WithStatsOut(statsOut)) - return otlpmetrics.NewTranslator(set, attrsTranslator, options...) + return options } diff --git a/pkg/datadog/go.mod b/pkg/datadog/go.mod index 49a3971b006c..4a5aa1aa865d 100644 --- a/pkg/datadog/go.mod +++ b/pkg/datadog/go.mod @@ -4,7 +4,6 @@ go 1.22.0 require ( github.com/DataDog/datadog-agent/pkg/util/hostname/validate v0.59.0 - github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.21.0 github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/metrics v0.21.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.114.0 @@ -17,7 +16,6 @@ require ( go.opentelemetry.io/collector/config/configtls v1.20.0 go.opentelemetry.io/collector/confmap v1.20.0 go.opentelemetry.io/collector/exporter v0.114.0 - go.opentelemetry.io/collector/featuregate v1.20.0 go.uber.org/zap v1.27.0 ) @@ -25,6 +23,7 @@ require ( github.com/DataDog/datadog-agent/pkg/proto v0.52.0-devel // indirect github.com/DataDog/datadog-agent/pkg/util/log v0.59.0 // indirect github.com/DataDog/datadog-agent/pkg/util/scrubber v0.59.0 // indirect + github.com/DataDog/opentelemetry-mapping-go/pkg/otlp/attributes v0.21.0 // indirect github.com/DataDog/opentelemetry-mapping-go/pkg/quantile v0.21.0 // indirect github.com/DataDog/sketches-go v1.4.4 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -40,7 +39,6 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect diff --git a/pkg/datadog/go.sum b/pkg/datadog/go.sum index 1b0b76693743..9b3316a54d65 100644 --- a/pkg/datadog/go.sum +++ b/pkg/datadog/go.sum @@ -53,8 +53,6 @@ github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= -github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -160,8 +158,6 @@ go.opentelemetry.io/collector/extension/experimental/storage v0.114.0 h1:hLyX9Uv go.opentelemetry.io/collector/extension/experimental/storage v0.114.0/go.mod h1:WqYRQVJjJLE1rm+y/ks1wPdPRGWePEvE1VO07xm2J2k= go.opentelemetry.io/collector/extension/extensiontest v0.114.0 h1:ibXDms1qrswlvlR6b3d2BeyI8sXUXoFV11yOi9Sop8o= go.opentelemetry.io/collector/extension/extensiontest v0.114.0/go.mod h1:/bOYmqu5yTDfI1bJZUxFqm8ZtmcodpquebiSxiQxtDY= -go.opentelemetry.io/collector/featuregate v1.20.0 h1:Mi7nMy/q52eruI+6jWnMKUOeM55XvwoPnGcdB1++O8c= -go.opentelemetry.io/collector/featuregate v1.20.0/go.mod h1:47xrISO71vJ83LSMm8+yIDsUbKktUp48Ovt7RR6VbRs= go.opentelemetry.io/collector/pdata v1.20.0 h1:ePcwt4bdtISP0loHaE+C9xYoU2ZkIvWv89Fob16o9SM= go.opentelemetry.io/collector/pdata v1.20.0/go.mod h1:Ox1YVLe87cZDB/TL30i4SUz1cA5s6AM6SpFMfY61ICs= go.opentelemetry.io/collector/pdata/pprofile v0.114.0 h1:pUNfTzsI/JUTiE+DScDM4lsrPoxnVNLI2fbTxR/oapo= From b6b8679719326f018885324d260a084083740982 Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Thu, 21 Nov 2024 08:58:06 +0000 Subject: [PATCH 2/2] [chore][exporter/elasticsearch] Test metrics in integration tests (#36455) #### Description Add metrics to integration tests #### Link to tracking issue #### Testing #### Documentation Co-authored-by: Christos Markou --- .../integrationtest/datareceiver.go | 51 ++++++++++++++++--- .../integrationtest/exporter_bench_test.go | 37 +++++++++++++- .../integrationtest/exporter_test.go | 4 +- 3 files changed, 83 insertions(+), 9 deletions(-) diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index 79a2d46dc3d4..0039b1fd893e 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -21,6 +21,7 @@ import ( "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" @@ -32,12 +33,17 @@ import ( ) const ( - // TestLogsIndex is used by the mock ES data receiver to indentify log events. + // TestLogsIndex is used by the mock ES data receiver to identify log events. // Exporter LogsIndex configuration must be configured with TestLogsIndex for // the data receiver to work properly TestLogsIndex = "logs-test-idx" - // TestTracesIndex is used by the mock ES data receiver to indentify trace + // TestMetricsIndex is used by the mock ES data receiver to identify metric events. + // Exporter MetricsIndex configuration must be configured with TestMetricsIndex for + // the data receiver to work properly + TestMetricsIndex = "metrics-test-idx" + + // TestTracesIndex is used by the mock ES data receiver to identify trace // events. Exporter TracesIndex configuration must be configured with // TestTracesIndex for the data receiver to work properly TestTracesIndex = "traces-test-idx" @@ -79,11 +85,12 @@ func withBatcherEnabled(enabled bool) dataReceiverOption { } } -func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error { +func (es *esDataReceiver) Start(tc consumer.Traces, mc consumer.Metrics, lc consumer.Logs) error { factory := receiver.NewFactory( component.MustNewType("mockelasticsearch"), createDefaultConfig, receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment), + receiver.WithMetrics(createMetricsReceiver, component.StabilityLevelDevelopment), receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), ) esURL, err := url.Parse(es.endpoint) @@ -101,6 +108,10 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu if err != nil { return fmt.Errorf("failed to create logs receiver: %w", err) } + metricsReceiver, err := factory.CreateMetrics(context.Background(), set, cfg, mc) + if err != nil { + return fmt.Errorf("failed to create metrics receiver: %w", err) + } tracesReceiver, err := factory.CreateTraces(context.Background(), set, cfg, tc) if err != nil { return fmt.Errorf("failed to create traces receiver: %w", err) @@ -108,6 +119,7 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu // Since we use SharedComponent both receivers should be same require.Same(es.t, logsReceiver, tracesReceiver) + require.Same(es.t, logsReceiver, metricsReceiver) es.receiver = logsReceiver return es.receiver.Start(context.Background(), componenttest.NewNopHost()) @@ -126,7 +138,14 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { elasticsearch: endpoints: [%s] logs_index: %s + logs_dynamic_index: + enabled: false + metrics_index: %s + metrics_dynamic_index: + enabled: false traces_index: %s + traces_dynamic_index: + enabled: false sending_queue: enabled: true retry: @@ -134,7 +153,7 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { initial_interval: 100ms max_interval: 1s max_requests: 10000`, - es.endpoint, TestLogsIndex, TestTracesIndex, + es.endpoint, TestLogsIndex, TestMetricsIndex, TestTracesIndex, ) if es.batcherEnabled == nil { @@ -189,6 +208,19 @@ func createLogsReceiver( return receiver, nil } +func createMetricsReceiver( + _ context.Context, + params receiver.Settings, + rawCfg component.Config, + next consumer.Metrics, +) (receiver.Metrics, error) { + receiver := receivers.GetOrAdd(rawCfg, func() component.Component { + return newMockESReceiver(params, rawCfg.(*config)) + }) + receiver.Unwrap().(*mockESReceiver).metricsConsumer = next + return receiver, nil +} + func createTracesReceiver( _ context.Context, params receiver.Settings, @@ -206,8 +238,9 @@ type mockESReceiver struct { params receiver.Settings config *config - tracesConsumer consumer.Traces - logsConsumer consumer.Logs + tracesConsumer consumer.Traces + logsConsumer consumer.Logs + metricsConsumer consumer.Metrics server *http.Server } @@ -231,10 +264,12 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error // Ideally bulk request items should be converted to the corresponding event record // however, since we only assert count for now there is no need to do the actual - // translation. Instead we use a pre-initialized empty logs and traces model to + // translation. Instead we use a pre-initialized empty models to // reduce allocation impact on tests and benchmarks. emptyLogs := plog.NewLogs() emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + emptyMetrics := pmetric.NewMetrics() + emptyMetrics.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptySum().DataPoints().AppendEmpty() emptyTrace := ptrace.NewTraces() emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() @@ -260,6 +295,8 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error switch item.Index { case TestLogsIndex: consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs) + case TestMetricsIndex: + consumeErr = es.metricsConsumer.ConsumeMetrics(context.Background(), emptyMetrics) case TestTracesIndex: consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace) } diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 50ed649dca0d..49106c7b084f 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -26,7 +26,7 @@ import ( ) func BenchmarkExporter(b *testing.B) { - for _, eventType := range []string{"logs", "traces"} { + for _, eventType := range []string{"logs", "metrics", "traces"} { for _, mappingMode := range []string{"none", "ecs", "raw"} { for _, tc := range []struct { name string @@ -41,6 +41,8 @@ func BenchmarkExporter(b *testing.B) { switch eventType { case "logs": benchmarkLogs(b, tc.batchSize, mappingMode) + case "metrics": + benchmarkMetrics(b, tc.batchSize, mappingMode) case "traces": benchmarkTraces(b, tc.batchSize, mappingMode) } @@ -79,6 +81,35 @@ func benchmarkLogs(b *testing.B, batchSize int, mappingMode string) { require.NoError(b, exporter.Shutdown(ctx)) } +func benchmarkMetrics(b *testing.B, batchSize int, mappingMode string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exporterSettings := exportertest.NewNopSettings() + exporterSettings.TelemetrySettings.Logger = zaptest.NewLogger(b, zaptest.Level(zap.WarnLevel)) + runnerCfg := prepareBenchmark(b, batchSize, mappingMode) + exporter, err := runnerCfg.factory.CreateMetrics( + ctx, exporterSettings, runnerCfg.esCfg, + ) + require.NoError(b, err) + require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) + + b.ReportAllocs() + b.ResetTimer() + b.StopTimer() + for i := 0; i < b.N; i++ { + metrics, _ := runnerCfg.provider.GenerateMetrics() + b.StartTimer() + require.NoError(b, exporter.ConsumeMetrics(ctx, metrics)) + b.StopTimer() + } + b.ReportMetric( + float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), + "events/s", + ) + require.NoError(b, exporter.Shutdown(ctx)) +} + func benchmarkTraces(b *testing.B, batchSize int, mappingMode string) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -134,7 +165,11 @@ func prepareBenchmark( cfg.esCfg.Mapping.Mode = mappingMode cfg.esCfg.Endpoints = []string{receiver.endpoint} cfg.esCfg.LogsIndex = TestLogsIndex + cfg.esCfg.LogsDynamicIndex.Enabled = false + cfg.esCfg.MetricsIndex = TestMetricsIndex + cfg.esCfg.MetricsDynamicIndex.Enabled = false cfg.esCfg.TracesIndex = TestTracesIndex + cfg.esCfg.TracesDynamicIndex.Enabled = false cfg.esCfg.Flush.Interval = 10 * time.Millisecond cfg.esCfg.NumWorkers = 1 diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index 013994898511..57b6f936713f 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -17,7 +17,7 @@ import ( ) func TestExporter(t *testing.T) { - for _, eventType := range []string{"logs", "traces"} { + for _, eventType := range []string{"logs", "metrics", "traces"} { for _, tc := range []struct { name string @@ -68,6 +68,8 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool switch eventType { case "logs": sender = testbed.NewOTLPLogsDataSender(host, port) + case "metrics": + sender = testbed.NewOTLPMetricDataSender(host, port) case "traces": sender = testbed.NewOTLPTraceDataSender(host, port) default: