diff --git a/.chloggen/prometheusremotewrite-optimize-createattributes.yaml b/.chloggen/prometheusremotewrite-optimize-createattributes.yaml
new file mode 100644
index 0000000000000..e98e09b17fd22
--- /dev/null
+++ b/.chloggen/prometheusremotewrite-optimize-createattributes.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: 'breaking'
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: prometheusremotewriteexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: reduce allocations in createAttributes
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [35184]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: The exported `FromMetrics` function of the prometheusremotewrite translator is replaced by the `PrometheusConverter` type and its `Convert`/`TimeSeries` methods.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user, api]
diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go
index 1a9b9a31f8987..72e204d9f0284 100644
--- a/exporter/prometheusremotewriteexporter/exporter.go
+++ b/exporter/prometheusremotewriteexporter/exporter.go
@@ -64,6 +64,12 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS
 	p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
 }
 
+var converterPool = sync.Pool{
+	New: func() any {
+		return prometheusremotewrite.NewPrometheusConverter()
+	},
+}
+
 type buffer struct {
 	protobuf *proto.Buffer
 	snappy   []byte
@@ -211,9 +217,14 @@ func (prwe *prwExporter) Shutdown(context.Context) error {
 }
 
 func (prwe *prwExporter) pushMetricsV1(ctx context.Context, md pmetric.Metrics) error {
-	tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
+	converter := converterPool.Get().(*prometheusremotewrite.PrometheusConverter)
+	converter.Reset()
+	defer converterPool.Put(converter)
+
+	err := converter.Convert(md, prwe.exporterSettings)
+	tss := converter.TimeSeries()
 
-	prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap))
+	prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tss))
 
 	var m []*prompb.MetricMetadata
 	if prwe.exporterSettings.SendMetadata {
@@ -221,10 +232,11 @@ func (prwe *prwExporter) pushMetricsV1(ctx context.Context, md pmetric.Metrics)
 	}
 	if err != nil {
 		prwe.telemetry.recordTranslationFailure(ctx)
-		prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
+		prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tss)))
 	}
 
+	// Call export even if there was a conversion error, since there may be points that were successfully converted.
- return prwe.handleExport(ctx, tsMap, m) + return prwe.handleExport(ctx, tss, m) } // PushMetrics converts metrics to Prometheus remote write TimeSeries and send to remote endpoint. It maintain a map of @@ -269,16 +281,16 @@ func validateAndSanitizeExternalLabels(cfg *Config) (map[string]string, error) { return sanitizedLabels, nil } -func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*prompb.TimeSeries, m []*prompb.MetricMetadata) error { +func (prwe *prwExporter) handleExport(ctx context.Context, tss []prompb.TimeSeries, m []*prompb.MetricMetadata) error { // There are no metrics to export, so return. - if len(tsMap) == 0 { + if len(tss) == 0 { return nil } state := prwe.batchStatePool.Get().(*batchTimeSeriesState) defer prwe.batchStatePool.Put(state) // Calls the helper function to convert and batch the TsMap to the desired format - requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, state) + requests, err := batchTimeSeries(tss, prwe.maxBatchSizeBytes, m, state) if err != nil { return err } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 62cf5511818b2..d53edbe34c5d3 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -282,7 +282,7 @@ func Test_export(t *testing.T) { assert.Len(t, writeReq.Timeseries, 1) require.NotNil(t, writeReq.GetTimeseries()) - assert.Equal(t, *ts1, writeReq.GetTimeseries()[0]) + assert.Equal(t, ts1, writeReq.GetTimeseries()[0]) w.WriteHeader(code) } @@ -297,21 +297,21 @@ func Test_export(t *testing.T) { }{ { "success_case", - *ts1, + ts1, true, http.StatusAccepted, false, }, { "server_no_response_case", - *ts1, + ts1, false, http.StatusAccepted, true, }, { "error_status_code_case", - *ts1, + ts1, true, http.StatusForbidden, true, @@ -331,7 +331,7 @@ func Test_export(t *testing.T) { if !tt.serverUp { server.Close() } - err := 
runExportPipeline(ts1, serverURL) + err := runExportPipeline(&ts1, serverURL) if tt.returnErrorOnCreate { assert.Error(t, err) return @@ -353,9 +353,9 @@ func TestNoMetricsNoError(t *testing.T) { func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error { // First we will construct a TimeSeries array from the testutils package - testmap := make(map[string]*prompb.TimeSeries) + tss := []prompb.TimeSeries{} if ts != nil { - testmap["test"] = ts + tss = append(tss, *ts) } cfg := createDefaultConfig().(*Config) @@ -384,7 +384,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error { return err } - return prwe.handleExport(context.Background(), testmap, nil) + return prwe.handleExport(context.Background(), tss, nil) } // Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as @@ -998,19 +998,16 @@ func TestWALOnExporterRoundTrip(t *testing.T) { }) require.NotNil(t, prwe.wal) - ts1 := &prompb.TimeSeries{ + ts1 := prompb.TimeSeries{ Labels: []prompb.Label{{Name: "ts1l1", Value: "ts1k1"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 100}}, } - ts2 := &prompb.TimeSeries{ + ts2 := prompb.TimeSeries{ Labels: []prompb.Label{{Name: "ts2l1", Value: "ts2k1"}}, Samples: []prompb.Sample{{Value: 2, Timestamp: 200}}, } - tsMap := map[string]*prompb.TimeSeries{ - "timeseries1": ts1, - "timeseries2": ts2, - } - errs := prwe.handleExport(ctx, tsMap, nil) + tss := []prompb.TimeSeries{ts1, ts2} + errs := prwe.handleExport(ctx, tss, nil) assert.NoError(t, errs) // Shutdown after we've written to the WAL. This ensures that our // exported data in-flight will be flushed to the WAL before exiting. 
@@ -1046,9 +1043,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) { gotFromWAL := reqs[0] assert.Len(t, gotFromWAL.Timeseries, 2) want := &prompb.WriteRequest{ - Timeseries: orderBySampleTimestamp([]prompb.TimeSeries{ - *ts1, *ts2, - }), + Timeseries: orderBySampleTimestamp([]prompb.TimeSeries{ts1, ts2}), } // Even after sorting timeseries, we need to sort them diff --git a/exporter/prometheusremotewriteexporter/helper.go b/exporter/prometheusremotewriteexporter/helper.go index e073099b98e07..e6445ffaf68e4 100644 --- a/exporter/prometheusremotewriteexporter/helper.go +++ b/exporter/prometheusremotewriteexporter/helper.go @@ -28,20 +28,20 @@ func newBatchTimeServicesState() *batchTimeSeriesState { } // batchTimeSeries splits series into multiple batch write requests. -func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata, state *batchTimeSeriesState) ([]*prompb.WriteRequest, error) { - if len(tsMap) == 0 { - return nil, errors.New("invalid tsMap: cannot be empty map") +func batchTimeSeries(tss []prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata, state *batchTimeSeriesState) ([]*prompb.WriteRequest, error) { + if len(tss) == 0 { + return nil, errors.New("invalid tss: cannot be empty slice") } // Allocate a buffer size of at least 10, or twice the last # of requests we sent requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize)) // Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller - tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap))) + tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tss))) sizeOfCurrentBatch := 0 i := 0 - for _, v := range tsMap { + for _, v := range tss { sizeOfSeries := v.Size() if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize { @@ -49,11 +49,11 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, 
wrapped := convertTimeseriesToRequest(tsArray) requests = append(requests, wrapped) - tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i)) + tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tss)-i)) sizeOfCurrentBatch = 0 } - tsArray = append(tsArray, *v) + tsArray = append(tsArray, v) sizeOfCurrentBatch += sizeOfSeries i++ } diff --git a/exporter/prometheusremotewriteexporter/helper_test.go b/exporter/prometheusremotewriteexporter/helper_test.go index 46a61735a1c71..fb17aa284d833 100644 --- a/exporter/prometheusremotewriteexporter/helper_test.go +++ b/exporter/prometheusremotewriteexporter/helper_test.go @@ -22,34 +22,34 @@ func Test_batchTimeSeries(t *testing.T) { ts1 := getTimeSeries(labels, sample1, sample2) ts2 := getTimeSeries(labels, sample1, sample2, sample3) - tsMap1 := getTimeseriesMap([]*prompb.TimeSeries{}) - tsMap2 := getTimeseriesMap([]*prompb.TimeSeries{ts1}) - tsMap3 := getTimeseriesMap([]*prompb.TimeSeries{ts1, ts2}) + tss1 := []prompb.TimeSeries{} + tss2 := []prompb.TimeSeries{ts1} + tss3 := []prompb.TimeSeries{ts1, ts2} tests := []struct { name string - tsMap map[string]*prompb.TimeSeries + tss []prompb.TimeSeries maxBatchByteSize int numExpectedRequests int returnErr bool }{ { "no_timeseries", - tsMap1, + tss1, 100, -1, true, }, { "normal_case", - tsMap2, + tss2, 300, 1, false, }, { "two_requests", - tsMap3, + tss3, 300, 2, false, @@ -59,7 +59,7 @@ func Test_batchTimeSeries(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { state := newBatchTimeServicesState() - requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, state) + requests, err := batchTimeSeries(tt.tss, tt.maxBatchByteSize, nil, state) if tt.returnErr { assert.Error(t, err) return @@ -88,16 +88,14 @@ func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) { // Benchmark for large data sizes // First allocate 100k time series - tsArray := make([]*prompb.TimeSeries, 
0, 100000) + tsArray := make([]prompb.TimeSeries, 0, 100000) for i := 0; i < 100000; i++ { ts := getTimeSeries(labels, sample1, sample2, sample3) tsArray = append(tsArray, ts) } - tsMap1 := getTimeseriesMap(tsArray) - state := newBatchTimeServicesState() - requests, err := batchTimeSeries(tsMap1, 1000000, nil, state) + requests, err := batchTimeSeries(tsArray, 1000000, nil, state) assert.NoError(t, err) assert.Len(t, requests, 18) @@ -118,21 +116,19 @@ func Benchmark_batchTimeSeries(b *testing.B) { // Benchmark for large data sizes // First allocate 100k time series - tsArray := make([]*prompb.TimeSeries, 0, 100000) + tsArray := make([]prompb.TimeSeries, 0, 100000) for i := 0; i < 100000; i++ { ts := getTimeSeries(labels, sample1, sample2, sample3) tsArray = append(tsArray, ts) } - tsMap1 := getTimeseriesMap(tsArray) - b.ReportAllocs() b.ResetTimer() state := newBatchTimeServicesState() // Run batchTimeSeries 100 times with a 1mb max request size for i := 0; i < b.N; i++ { - requests, err := batchTimeSeries(tsMap1, 1000000, nil, state) + requests, err := batchTimeSeries(tsArray, 1000000, nil, state) assert.NoError(b, err) assert.Len(b, requests, 18) } diff --git a/exporter/prometheusremotewriteexporter/testutil_test.go b/exporter/prometheusremotewriteexporter/testutil_test.go index 11caf1af0cefe..88610eba9b517 100644 --- a/exporter/prometheusremotewriteexporter/testutil_test.go +++ b/exporter/prometheusremotewriteexporter/testutil_test.go @@ -4,7 +4,6 @@ package prometheusremotewriteexporter import ( - "fmt" "strings" "time" @@ -164,8 +163,8 @@ func getSample(v float64, t int64) prompb.Sample { } } -func getTimeSeries(labels []prompb.Label, samples ...prompb.Sample) *prompb.TimeSeries { - return &prompb.TimeSeries{ +func getTimeSeries(labels []prompb.Label, samples ...prompb.Sample) prompb.TimeSeries { + return prompb.TimeSeries{ Labels: labels, Samples: samples, } @@ -383,11 +382,3 @@ func getQuantiles(bounds []float64, values []float64) pmetric.SummaryDataPointVa 
return quantiles } - -func getTimeseriesMap(timeseries []*prompb.TimeSeries) map[string]*prompb.TimeSeries { - tsMap := make(map[string]*prompb.TimeSeries) - for i, v := range timeseries { - tsMap[fmt.Sprintf("%s%d", "timeseries_name", i)] = v - } - return tsMap -} diff --git a/exporter/prometheusremotewriteexporter/wal_test.go b/exporter/prometheusremotewriteexporter/wal_test.go index 821a3975e45c2..c90f5e8e52e6a 100644 --- a/exporter/prometheusremotewriteexporter/wal_test.go +++ b/exporter/prometheusremotewriteexporter/wal_test.go @@ -206,8 +206,8 @@ func TestExportWithWALEnabled(t *testing.T) { require.NoError(t, err) assert.NotNil(t, prwe.client) - metrics := map[string]*prompb.TimeSeries{ - "test_metric": { + metrics := []prompb.TimeSeries{ + { Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 100}}, }, diff --git a/pkg/translator/prometheusremotewrite/helper.go b/pkg/translator/prometheusremotewrite/helper.go index 0f7d5cf08049d..ecfaff21ca620 100644 --- a/pkg/translator/prometheusremotewrite/helper.go +++ b/pkg/translator/prometheusremotewrite/helper.go @@ -11,6 +11,7 @@ import ( "slices" "sort" "strconv" + "strings" "time" "unicode/utf8" @@ -105,51 +106,44 @@ var seps = []byte{'\xff'} // createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values. // Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and // if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized. 
-func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, +func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string, ignoreAttrs []string, logOnOverwrite bool, extras ...string, ) []prompb.Label { resourceAttrs := resource.Attributes() serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName) instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID) - // Calculate the maximum possible number of labels we could return so we can preallocate l - maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2 - - if haveServiceName { - maxLabelCount++ - } - - if haveInstanceID { - maxLabelCount++ - } - // map ensures no duplicate label name - l := make(map[string]string, maxLabelCount) + l := c.labelsMap + clear(l) + + // store duplicate labels separately in a throwaway map + // assuming this is the less common case + collisions := make(map[string][]string) - // Ensure attributes are sorted by key for consistent merging of keys which - // collide when sanitized. - labels := make([]prompb.Label, 0, maxLabelCount) // XXX: Should we always drop service namespace/service name/service instance ID from the labels // (as they get mapped to other Prometheus labels)? 
for key, value := range attributes.All() { if !slices.Contains(ignoreAttrs, key) { - labels = append(labels, prompb.Label{Name: key, Value: value.AsString()}) - } - } - sort.Stable(ByLabelName(labels)) - - for _, label := range labels { - finalKey := prometheustranslator.NormalizeLabel(label.Name) - if existingValue, alreadyExists := l[finalKey]; alreadyExists { - // Only append to existing value if the new value is different - if existingValue != label.Value { - l[finalKey] = existingValue + ";" + label.Value + finalKey := prometheustranslator.NormalizeLabel(key) + if existingValue, alreadyExists := l[finalKey]; alreadyExists { + if existingValue != value.AsString() { + collisions[finalKey] = append(collisions[finalKey], value.AsString()) + } + } else { + l[finalKey] = value.AsString() } - } else { - l[finalKey] = label.Value } } + for key, values := range collisions { + values = append(values, l[key]) + // Ensure attributes are sorted by key for consistent merging of keys which + // collide when sanitized. 
+ sort.Strings(values) + l[key] = strings.Join(values, ";") + } + // Map service.name + service.namespace to job if haveServiceName { val := serviceName.AsString() @@ -187,12 +181,12 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa l[name] = extras[i+1] } - labels = labels[:0] + startIndex := len(c.labels) for k, v := range l { - labels = append(labels, prompb.Label{Name: k, Value: v}) + c.labels = append(c.labels, prompb.Label{Name: k, Value: v}) } - return labels + return c.labels[startIndex:] } // isValidAggregationTemporality checks whether an OTel metric has a valid @@ -212,13 +206,13 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool { return false } -func (c *prometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, +func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice, resource pcommon.Resource, settings Settings, baseName string, ) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) // If the sum is unset, it indicates the _sum metric point should be // omitted @@ -389,13 +383,13 @@ func mostRecentTimestampInMetric(metric pmetric.Metric) pcommon.Timestamp { return ts } -func (c *prometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, +func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource, settings Settings, baseName string, ) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) timestamp := convertTimeStamp(pt.Timestamp()) - baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) + baseLabels := 
c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false) // treat sum as a sample in an individual TimeSeries sum := &prompb.Sample{ @@ -457,7 +451,7 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr // getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false. // Otherwise it creates a new one and returns that, and true. -func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) { +func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) { h := timeSeriesSignature(lbls) ts := c.unique[h] if ts != nil { @@ -491,7 +485,7 @@ func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*promp } // addResourceTargetInfo converts the resource to the target info metric. -func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *prometheusConverter) { +func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) { if settings.DisableTargetInfo || timestamp == 0 { return } @@ -519,7 +513,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta name = settings.Namespace + "_" + name } - labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) + labels := converter.createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name) haveIdentifier := false for _, l := range labels { if l.Name == model.JobLabel || l.Name == model.InstanceLabel { diff --git a/pkg/translator/prometheusremotewrite/helper_test.go b/pkg/translator/prometheusremotewrite/helper_test.go index d3474d88bb213..2b73dbb9723b3 100644 --- a/pkg/translator/prometheusremotewrite/helper_test.go +++ 
b/pkg/translator/prometheusremotewrite/helper_test.go @@ -117,7 +117,7 @@ func TestPrometheusConverter_addSample(t *testing.T) { } t.Run("empty_case", func(t *testing.T) { - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addSample(nil, nil) assert.Empty(t, converter.unique) assert.Empty(t, converter.conflicts) @@ -161,7 +161,7 @@ func TestPrometheusConverter_addSample(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addSample(&tt.testCase[0].sample, tt.testCase[0].labels) converter.addSample(&tt.testCase[1].sample, tt.testCase[1].labels) assert.Exactly(t, tt.want, converter.unique) @@ -369,8 +369,9 @@ func Test_createLabelSet(t *testing.T) { } // run tests for _, tt := range tests { + c := NewPrometheusConverter() t.Run(tt.name, func(t *testing.T) { - assert.ElementsMatch(t, tt.want, createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...)) + assert.ElementsMatch(t, tt.want, c.createAttributes(tt.resource, tt.orig, tt.externalLabels, nil, true, tt.extras...)) }) } } @@ -385,10 +386,15 @@ func BenchmarkCreateAttributes(b *testing.B) { m.PutInt("test-int-key", 123) m.PutBool("test-bool-key", true) + c := NewPrometheusConverter() + // preallocate slice to simulate a fully-grown buffer + c.labels = make([]prompb.Label, 0, b.N*m.Len()) + b.ReportAllocs() b.ResetTimer() + for i := 0; i < b.N; i++ { - createAttributes(r, m, ext, nil, true) + c.createAttributes(r, m, ext, nil, true) } } @@ -449,7 +455,7 @@ func TestPrometheusConverter_addExemplars(t *testing.T) { // run tests for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - converter := &prometheusConverter{ + converter := &PrometheusConverter{ unique: tt.orig, } converter.addExemplars(tt.dataPoint, tt.bucketBounds) @@ -681,12 +687,12 @@ func TestAddResourceTargetInfo(t *testing.T) { }, } { t.Run(tc.desc, func(t 
*testing.T) { - converter := newPrometheusConverter() + converter := NewPrometheusConverter() addResourceTargetInfo(tc.resource, tc.settings, tc.timestamp, converter) if len(tc.wantLabels) == 0 || tc.settings.DisableTargetInfo { - assert.Empty(t, converter.timeSeries()) + assert.Empty(t, converter.TimeSeries()) return } @@ -857,7 +863,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { defer testutil.SetFeatureGateForTest(t, exportCreatedMetricGate, oldValue) metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addSummaryDataPoints( metric.Summary().DataPoints(), @@ -997,7 +1003,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { defer testutil.SetFeatureGateForTest(t, exportCreatedMetricGate, oldValue) metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addHistogramDataPoints( metric.Histogram().DataPoints(), @@ -1013,7 +1019,7 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { } func TestPrometheusConverter_getOrCreateTimeSeries(t *testing.T) { - converter := newPrometheusConverter() + converter := NewPrometheusConverter() lbls := []prompb.Label{ { Name: "key1", diff --git a/pkg/translator/prometheusremotewrite/histograms.go b/pkg/translator/prometheusremotewrite/histograms.go index 4ac4d58a96d41..e9c9de785489c 100644 --- a/pkg/translator/prometheusremotewrite/histograms.go +++ b/pkg/translator/prometheusremotewrite/histograms.go @@ -16,12 +16,12 @@ import ( const defaultZeroThreshold = 1e-128 -func (c *prometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, +func (c *PrometheusConverter) addExponentialHistogramDataPoints(dataPoints pmetric.ExponentialHistogramDataPointSlice, resource pcommon.Resource, settings Settings, baseName string, ) error { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) - lbls := 
createAttributes( + lbls := c.createAttributes( resource, pt.Attributes(), settings.ExternalLabels, diff --git a/pkg/translator/prometheusremotewrite/histograms_test.go b/pkg/translator/prometheusremotewrite/histograms_test.go index d2b3cba24ae24..1433ec09c5541 100644 --- a/pkg/translator/prometheusremotewrite/histograms_test.go +++ b/pkg/translator/prometheusremotewrite/histograms_test.go @@ -738,7 +738,7 @@ func TestPrometheusConverter_addExponentialHistogramDataPoints(t *testing.T) { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() require.NoError(t, converter.addExponentialHistogramDataPoints( metric.ExponentialHistogram().DataPoints(), pcommon.NewResource(), diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw.go b/pkg/translator/prometheusremotewrite/metrics_to_prw.go index 2729880e3238d..966c0a41ffcb8 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "sort" - "strconv" "github.com/prometheus/prometheus/prompb" "go.opentelemetry.io/collector/pdata/pcommon" @@ -25,34 +24,39 @@ type Settings struct { SendMetadata bool } -// FromMetrics converts pmetric.Metrics to Prometheus remote write format. -func FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) { - c := newPrometheusConverter() - errs := c.fromMetrics(md, settings) - tss := c.timeSeries() - out := make(map[string]*prompb.TimeSeries, len(tss)) - for i := range tss { - out[strconv.Itoa(i)] = &tss[i] - } - - return out, errs -} - -// prometheusConverter converts from OTel write format to Prometheus write format. -type prometheusConverter struct { +// PrometheusConverter converts from OTel write format to Prometheus write format. 
+//
+// Internally it keeps a buffer of labels to avoid expensive allocations, so it is
+// best to keep it around for the lifetime of the Go process. Due to this shared
+// state, PrometheusConverter is NOT thread-safe and is only intended to be used by
+// a single go-routine at a time. To support thread-safe concurrent access to a pool of
+// converters, use a sync.Pool.
+type PrometheusConverter struct {
 	unique    map[uint64]*prompb.TimeSeries
 	conflicts map[uint64][]*prompb.TimeSeries
+	labels    []prompb.Label
+	labelsMap map[string]string
 }
 
-func newPrometheusConverter() *prometheusConverter {
-	return &prometheusConverter{
+func NewPrometheusConverter() *PrometheusConverter {
+	return &PrometheusConverter{
 		unique:    map[uint64]*prompb.TimeSeries{},
 		conflicts: map[uint64][]*prompb.TimeSeries{},
+		labelsMap: make(map[string]string),
 	}
 }
 
-// fromMetrics converts pmetric.Metrics to Prometheus remote write format.
-func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings) (errs error) {
+// Reset clears the internal state of the PrometheusConverter.
+// It is only safe to reset when the previously returned value of TimeSeries() is no longer needed.
+func (c *PrometheusConverter) Reset() {
+	clear(c.labels)
+	c.labels = c.labels[:0]
+	clear(c.unique)
+	clear(c.conflicts)
+}
+
+// Convert converts pmetric.Metrics to Prometheus remote write format.
+func (c *PrometheusConverter) Convert(md pmetric.Metrics, settings Settings) (errs error) {
 	resourceMetricsSlice := md.ResourceMetrics()
 	for i := 0; i < resourceMetricsSlice.Len(); i++ {
 		resourceMetrics := resourceMetricsSlice.At(i)
@@ -130,8 +134,8 @@ func (c *prometheusConverter) fromMetrics(md pmetric.Metrics, settings Settings)
 	return
 }
 
-// timeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format.
-func (c *prometheusConverter) timeSeries() []prompb.TimeSeries {
+// TimeSeries returns a slice of the prompb.TimeSeries that were converted from OTel format.
+func (c *PrometheusConverter) TimeSeries() []prompb.TimeSeries { conflicts := 0 for _, ts := range c.conflicts { conflicts += len(ts) @@ -163,7 +167,7 @@ func isSameMetric(ts *prompb.TimeSeries, lbls []prompb.Label) bool { // addExemplars adds exemplars for the dataPoint. For each exemplar, if it can find a bucket bound corresponding to its value, // the exemplar is added to the bucket bound's time series, provided that the time series' has samples. -func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { +func (c *PrometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, bucketBounds []bucketBoundsData) { if len(bucketBounds) == 0 { return } @@ -188,7 +192,7 @@ func (c *prometheusConverter) addExemplars(dataPoint pmetric.HistogramDataPoint, // If there is no corresponding TimeSeries already, it's created. // The corresponding TimeSeries is returned. // If either lbls is nil/empty or sample is nil, nothing is done. 
-func (c *prometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries { +func (c *PrometheusConverter) addSample(sample *prompb.Sample, lbls []prompb.Label) *prompb.TimeSeries { if sample == nil || len(lbls) == 0 { // This shouldn't happen return nil diff --git a/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go b/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go index e79bd24f49d4a..ed4450f37e258 100644 --- a/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go +++ b/pkg/translator/prometheusremotewrite/metrics_to_prw_test.go @@ -36,10 +36,12 @@ func BenchmarkFromMetrics(b *testing.B) { b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries, pcommon.Timestamp(uint64(time.Now().UnixNano()))) + c := NewPrometheusConverter() for i := 0; i < b.N; i++ { - tsMap, err := FromMetrics(payload.Metrics(), Settings{}) + err := c.Convert(payload.Metrics(), Settings{}) require.NoError(b, err) - require.NotNil(b, tsMap) + tss := c.TimeSeries() + require.NotNil(b, tss) } }) } @@ -73,10 +75,11 @@ func BenchmarkPrometheusConverter_FromMetrics(b *testing.B) { b.Run(fmt.Sprintf("exemplars per series: %v", exemplarsPerSeries), func(b *testing.B) { payload := createExportRequest(resourceAttributeCount, histogramCount, nonHistogramCount, labelsPerMetric, exemplarsPerSeries, pcommon.Timestamp(uint64(time.Now().UnixNano()))) + c := NewPrometheusConverter() for i := 0; i < b.N; i++ { - converter := newPrometheusConverter() - require.NoError(b, converter.fromMetrics(payload.Metrics(), Settings{})) - require.NotNil(b, converter.timeSeries()) + require.NoError(b, c.Convert(payload.Metrics(), Settings{})) + require.NotNil(b, c.TimeSeries()) + c.Reset() } }) } diff --git a/pkg/translator/prometheusremotewrite/number_data_points.go 
b/pkg/translator/prometheusremotewrite/number_data_points.go index 7376fc57da684..ccefb757a3c98 100644 --- a/pkg/translator/prometheusremotewrite/number_data_points.go +++ b/pkg/translator/prometheusremotewrite/number_data_points.go @@ -13,12 +13,12 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" ) -func (c *prometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, +func (c *PrometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, resource pcommon.Resource, settings Settings, name string, ) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) - labels := createAttributes( + labels := c.createAttributes( resource, pt.Attributes(), settings.ExternalLabels, @@ -44,12 +44,12 @@ func (c *prometheusConverter) addGaugeNumberDataPoints(dataPoints pmetric.Number } } -func (c *prometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, +func (c *PrometheusConverter) addSumNumberDataPoints(dataPoints pmetric.NumberDataPointSlice, resource pcommon.Resource, _ pmetric.Metric, settings Settings, name string, ) { for x := 0; x < dataPoints.Len(); x++ { pt := dataPoints.At(x) - lbls := createAttributes( + lbls := c.createAttributes( resource, pt.Attributes(), settings.ExternalLabels, diff --git a/pkg/translator/prometheusremotewrite/number_data_points_test.go b/pkg/translator/prometheusremotewrite/number_data_points_test.go index f9eb51f6cfe70..3bfa4e02ea887 100644 --- a/pkg/translator/prometheusremotewrite/number_data_points_test.go +++ b/pkg/translator/prometheusremotewrite/number_data_points_test.go @@ -51,7 +51,7 @@ func TestPrometheusConverter_addGaugeNumberDataPoints(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { metric := tt.metric() - converter := newPrometheusConverter() + converter := NewPrometheusConverter() converter.addGaugeNumberDataPoints( metric.Gauge().DataPoints(), @@ -217,7 +217,7 @@ func 
TestPrometheusConverter_addSumNumberDataPoints(t *testing.T) {
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			metric := tt.metric()
-			converter := newPrometheusConverter()
+			converter := NewPrometheusConverter()
 
 			converter.addSumNumberDataPoints(
 				metric.Sum().DataPoints(),
diff --git a/pkg/translator/prometheusremotewrite/number_data_points_v2.go b/pkg/translator/prometheusremotewrite/number_data_points_v2.go
index e84cfefa1cc71..6f31b95564f4d 100644
--- a/pkg/translator/prometheusremotewrite/number_data_points_v2.go
+++ b/pkg/translator/prometheusremotewrite/number_data_points_v2.go
@@ -20,7 +20,9 @@ func (c *prometheusConverterV2) addGaugeNumberDataPoints(dataPoints pmetric.Numb
 ) {
+	// Reuse a single converter for all points to avoid per-datapoint map allocations.
+	converter := NewPrometheusConverter()
 	for x := 0; x < dataPoints.Len(); x++ {
 		pt := dataPoints.At(x)
-		labels := createAttributes(
+		labels := converter.createAttributes(
 			resource,
 			pt.Attributes(),
 			settings.ExternalLabels,
@@ -52,7 +54,9 @@ func (c *prometheusConverterV2) addSumNumberDataPoints(dataPoints pmetric.Number
 ) {
+	// Reuse a single converter for all points to avoid per-datapoint map allocations.
+	converter := NewPrometheusConverter()
 	for x := 0; x < dataPoints.Len(); x++ {
 		pt := dataPoints.At(x)
-		lbls := createAttributes(
+		lbls := converter.createAttributes(
 			resource,
 			pt.Attributes(),
 			settings.ExternalLabels,