diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 8cad87a5329e..29532caf3f12 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -70,6 +70,7 @@ type prwExporter struct { exporterSettings prometheusremotewrite.Settings telemetry prwTelemetry batchTimeSeriesState batchTimeSeriesState + converter prometheusremotewrite.PrometheusConverter } func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) { @@ -126,6 +127,7 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) { }, telemetry: prwTelemetry, batchTimeSeriesState: newBatchTimeSericesState(), + converter: *prometheusremotewrite.NewPrometheusConverter(), } prwe.wal = newWAL(cfg.WAL, prwe.export) @@ -173,7 +175,8 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er return errors.New("shutdown has been called") default: - tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings) + tsMap, err := prwe.converter.FromMetrics(md, prwe.exporterSettings) + defer prwe.converter.Reset() if err != nil { prwe.telemetry.recordTranslationFailure(ctx) prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap))) diff --git a/pkg/translator/prometheusremotewrite/helper.go b/pkg/translator/prometheusremotewrite/helper.go index cb03a7c32959..59e2880405ae 100644 --- a/pkg/translator/prometheusremotewrite/helper.go +++ b/pkg/translator/prometheusremotewrite/helper.go @@ -11,6 +11,7 @@ import ( "slices" "sort" "strconv" + "strings" "time" "unicode/utf8" @@ -96,46 +97,40 @@ var seps = []byte{'\xff'} // createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. // Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and // if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. -func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, +func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, ignoreAttrs []string, logOnOverwrite bool, extras ...string) []prompb.Label { resourceAttrs := resource.Attributes() serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) - // Calculate the maximum possible number of labels we could return so we can preallocate l - maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 - - if haveServiceName { - maxLabelCount++ - } - - if haveInstanceID { - maxLabelCount++ - } - // map ensures no duplicate label name - l := make(map[string]string, maxLabelCount) + l := c.labelsMap + clear(l) + + // store duplicate labels separately in a throwaway map + // assuming this is the less common case + collisions := make(map[string][]string) - // Ensure attributes are sorted by key for consistent merging of keys which - // collide when sanitized. - labels := make([]prompb.Label, 0, maxLabelCount) // XXX: Should we always drop service namespace/service name/service instance ID from the labels // (as they get mapped to other Prometheus labels)? attributes.Range(func(key string, value pcommon.Value) bool { if !slices.Contains(ignoreAttrs, key) { - labels = append(labels, prompb.Label{Name: key, Value: value.AsString()}) + var finalKey = prometheustranslator.NormalizeLabel(key) + if _, alreadyExists := l[finalKey]; alreadyExists { + collisions[finalKey] = append(collisions[finalKey], value.AsString()) + } else { + l[finalKey] = value.AsString() + } } return true }) - sort.Stable(ByLabelName(labels)) - for _, label := range labels { - var finalKey = prometheustranslator.NormalizeLabel(label.Name) - if existingValue, alreadyExists := l[finalKey]; alreadyExists { - l[finalKey] = existingValue + ";" + label.Value - } else { - l[finalKey] = label.Value - } + for key, values := range collisions { + values = append(values, l[key]) + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. + sort.Strings(values) + l[key] = strings.Join(values, ";") } // Map service.name + service.namespace to job @@ -175,12 +170,12 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa l[name] = extras[i+1] } - labels = labels[:0] + startIndex := len(c.labels) for k, v := range l { - labels = append(labels, prompb.Label{Name: k, Value: v}) + c.labels = append(c.labels, prompb.Label{Name: k, Value: v}) } - return labels + return c.labels[startIndex:] } // isValidAggregationTemporality checks whether an OTel metric has a valid @@ -200,12 +195,12 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { return false } -func (c *prometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, +func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, resource pcommon.Resource, settings Settings, baseName string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) // If the sum is unset, it indicates the _sum metric point should be // omitted @@ -383,12 +378,12 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp { return b } -func (c *prometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, +func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, settings Settings, baseName string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) // treat sum as a sample in an individual TimeSeries sum := &prompb.Sample{ @@ -456,7 +451,7 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr // getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. // Otherwise it creates a new one and returns that, and true. -func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) { +func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) { h := timeSeriesSignature(lbls) ts := c.unique[h] if ts != nil { @@ -492,7 +487,7 @@ func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*promp // addTimeSeriesIfNeeded adds a corresponding time series if it doesn't already exist. // If the time series doesn't already exist, it gets added with startTimestamp for its value and timestamp for its timestamp, // both converted to milliseconds. -func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) { +func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) { ts, created := c.getOrCreateTimeSeries(lbls) if created { ts.Samples = []prompb.Sample{ @@ -506,7 +501,7 @@ func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi } // addResourceTargetInfo converts the resource to the target info metric. -func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *prometheusConverter) { +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) { if settings.DisableTargetInfo || timestamp == 0 { return } @@ -534,7 +529,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta name = settings.Namespace + "_" + name } - labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) + labels := converter.createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) haveIdentifier := false for _, l := range labels { if l.Name == model.JobLabel || l.Name == model.InstanceLabel { diff --git a/pkg/translator/prometheusremotewrite/helper_test.go b/pkg/translator/prometheusremotewrite/helper_test.go index 8894c0f7a27d..19f120a6026d 100644 --- a/pkg/translator/prometheusremotewrite/helper_test.go +++ b/pkg/translator/prometheusremotewrite/helper_test.go @@ -115,7 +115,7 @@ func TestPrometheusConverter_addSample(t *testing.T) { } t.Run("empty_case", func(t *testing.T) { - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addSample(nil, nil) assert.Empty(t, converter.unique) assert.Empty(t, converter.conflicts) @@ -159,7 +159,7 @@ func TestPrometheusConverter_addSample(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addSample(&tt.testCase[0].sample, tt.testCase[0].labels) converter.addSample(&tt.testCase[1].sample, tt.testCase[1].labels) assert.Exactly(t, tt.want, converter.unique) @@ -359,8 +359,9 @@ func Test_createLabelSet(t *testing.T) { } // run tests for _, tt := range tests { + c := NewPrometheusConverter() t.Run(tt.name, func(t *testing.T) { - assert.ElementsMatch(t, tt.want, createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...)) + assert.ElementsMatch(t, tt.want, c.createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...)) }) } } @@ -375,10 +376,15 @@ func BenchmarkCreateAttributes(b *testing.B) { m.PutInt("test-int-key", 123) m.PutBool("test-bool-key", true) + c := NewPrometheusConverter() + // preallocate slice to simulate a fully-grown buffer + c.labels = make([]prompb.Label, 0, b.N*m.Len()) + b.ReportAllocs() b.ResetTimer() + for i := 0; i < b.N; i++ { - createAttributes(r, m, ext, nil, true) + c.createAttributes(r, m, ext, nil, true) } } @@ -439,7 +445,7 @@ func TestPrometheusConverter_addExemplars(t *testing.T) { // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - converter := &prometheusConverter{ + converter := &PrometheusConverter{ unique: tt.orig, } converter.addExemplars(tt.dataPoint, tt.bucketBounds) @@ -620,7 +626,7 @@ func TestAddResourceTargetInfo(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - converter := newPrometheusConverter() + converter := NewPrometheusConverter() addResourceTargetInfo(tc.resource, tc.settings, tc.timestamp, converter) @@ -765,7 +771,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addSummaryDataPoints( metric.Summary().DataPoints(), @@ -875,7 +881,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addHistogramDataPoints( metric.Histogram().DataPoints(), @@ -893,7 +899,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { } func TestPrometheusConverter_getOrCreateTimeSeries(t *testing.T) { - converter := newPrometheusConverter() + converter := NewPrometheusConverter() lbls := []prompb.Label{ { Name: "key1", diff --git a/pkg/translator/prometheusremotewrite/histograms.go b/pkg/translator/prometheusremotewrite/histograms.go index 35ec21089177..5ddc692a363d 100644 --- a/pkg/translator/prometheusremotewrite/histograms.go +++ b/pkg/translator/prometheusremotewrite/histograms.go @@ -16,11 +16,11 @@ import ( const defaultZeroThreshold = 1e-128 -func (c *prometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, +func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, resource pcommon.Resource, settings Settings, baseName string) error { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) - lbls := createAttributes( + lbls := c.createAttributes( resource, pt.Attributes(), settings.ExternalLabels, diff --git a/pkg/translator/prometheusremotewrite/histograms_test.go b/pkg/translator/prometheusremotewrite/histograms_test.go index d2b3cba24ae2..1433ec09c554 100644 --- a/pkg/translator/prometheusremotewrite/histograms_test.go +++ b/pkg/translator/prometheusremotewrite/histograms_test.go @@ -738,7 +738,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() require.NoError(t, converter.addExponentialHistogramDataPoints( metric.ExponentialHistogram().DataPoints(), pcommon.NewResource(), diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw.go b/pkg/translator/prometheusremotewrite/metrics_to_prw.go index 05d62498548a..b51310464b77 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw.go @@ -27,8 +27,7 @@ type Settings struct { } // FromMetrics converts pmetric.Metrics to Prometheus remote write format. -func FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) { - c := newPrometheusConverter() +func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) { errs := c.fromMetrics(md, settings) tss := c.timeSeries() out := make(map[string]*prompb.TimeSeries, len(tss)) @@ -39,21 +38,37 @@ func FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.Time return out, errs } -// prometheusConverter converts from OTel write format to Prometheus write format. -type prometheusConverter struct { +// PrometheusConverter converts from OTel write format to Prometheus write format. +// Internally it keeps a buffer of labels to avoid expensive allocations, so it is +// best to keep it around for the lifetime of the Go process. Due to this shared +// state, PrometheusConverter is NOT thread-safe and is only intended to be used by +// a single go-routine at a time. +// Each FromMetrics call should be followed by a Reset when the metrics can be safely +// discarded. +type PrometheusConverter struct { unique map[uint64]*prompb.TimeSeries conflicts map[uint64][]*prompb.TimeSeries + labels []prompb.Label + labelsMap map[string]string } -func newPrometheusConverter() *prometheusConverter { - return &prometheusConverter{ +func NewPrometheusConverter() *PrometheusConverter { + return &PrometheusConverter{ unique: map[uint64]*prompb.TimeSeries{}, conflicts: map[uint64][]*prompb.TimeSeries{}, + labelsMap: make(map[string]string), } } +func (c *PrometheusConverter) Reset() { + clear(c.labels) + c.labels = c.labels[:0] + clear(c.unique) + clear(c.conflicts) +} + // fromMetrics converts pmetric.Metrics to Prometheus remote write format. -func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) { +func (c *PrometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) { resourceMetricsSlice := md.ResourceMetrics() for i := 0; i < resourceMetricsSlice.Len(); i++ { resourceMetrics := resourceMetricsSlice.At(i) @@ -132,7 +147,7 @@ func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) } // timeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format. -func (c *prometheusConverter) timeSeries() []prompb.TimeSeries { +func (c *PrometheusConverter) timeSeries() []prompb.TimeSeries { conflicts := 0 for _, ts := range c.conflicts { conflicts += len(ts) @@ -164,7 +179,7 @@ func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool { // addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value, // the exemplar is added to the bucket bound's time series, provided that the time series' has samples. -func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { +func (c *PrometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { if len(bucketBounds) == 0 { return } @@ -189,7 +204,7 @@ func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, // If there is no corresponding TimeSeries already, it's created. // The corresponding TimeSeries is returned. // If either lbls is nil/empty or sample is nil, nothing is done. -func (c *prometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries { +func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries { if sample == nil || len(lbls) == 0 { // This shouldn't happen return nil diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go index f6171b2c2233..7447ea749b3f 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go @@ -34,10 +34,12 @@ func BenchmarkFromMetrics(b *testing.B) { b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries) + c := NewPrometheusConverter() for i := 0; i < b.N; i++ { - tsMap, err := FromMetrics(payload.Metrics(), Settings{}) + tsMap, err := c.FromMetrics(payload.Metrics(), Settings{}) require.NoError(b, err) require.NotNil(b, tsMap) + c.Reset() } }) } @@ -71,10 +73,11 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries) + c := NewPrometheusConverter() for i := 0; i < b.N; i++ { - converter := newPrometheusConverter() - require.NoError(b, converter.fromMetrics(payload.Metrics(), Settings{})) - require.NotNil(b, converter.timeSeries()) + require.NoError(b, c.fromMetrics(payload.Metrics(), Settings{})) + require.NotNil(b, c.timeSeries()) + c.Reset() } }) } diff --git a/pkg/translator/prometheusremotewrite/number_data_points.go b/pkg/translator/prometheusremotewrite/number_data_points.go index d128359fef80..f0dd67e56cd2 100644 --- a/pkg/translator/prometheusremotewrite/number_data_points.go +++ b/pkg/translator/prometheusremotewrite/number_data_points.go @@ -13,11 +13,11 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) -func (c *prometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, +func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, resource pcommon.Resource, settings Settings, name string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) - labels := createAttributes( + labels := c.createAttributes( resource, pt.Attributes(), settings.ExternalLabels, @@ -43,11 +43,11 @@ func (c *prometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number } } -func (c *prometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, +func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, resource pcommon.Resource, metric pmetric.Metric, settings Settings, name string) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) - lbls := createAttributes( + lbls := c.createAttributes( resource, pt.Attributes(), settings.ExternalLabels, diff --git a/pkg/translator/prometheusremotewrite/number_data_points_test.go b/pkg/translator/prometheusremotewrite/number_data_points_test.go index 31dd796ae8d2..e848682d409b 100644 --- a/pkg/translator/prometheusremotewrite/number_data_points_test.go +++ b/pkg/translator/prometheusremotewrite/number_data_points_test.go @@ -50,7 +50,7 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addGaugeNumberDataPoints( metric.Gauge().DataPoints(), @@ -224,7 +224,7 @@ func TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addSumNumberDataPoints( metric.Sum().DataPoints(),