Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prometheusremotewriteexporter] reduce allocations in createAttributes #35184

27 changes: 27 additions & 0 deletions .chloggen/prometheusremotewrite-optimize-createattributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an enhancement, right? Not a breaking change

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dashpole suggested this should be considered breaking, unless I misread the comment #35184 (comment).

I agree it's more of an enhancement though it is technically changing a public function (FromMetrics).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FromMetrics doesn't need to change to achieve this, does it? Rather than storing a converter pool in the PRW exporter it could be stored in the translator package and used within FromMetrics since a single invocation of that function encompasses a complete usage of the converter. Alternately, we can complete the effort that was started to formalize that interface and add a Reset() method to it, if required.

I think that should probably happen separately from optimization efforts, though. It already seems like there are several different threads being pulled at in this area. Can we work through what this API should look like to enable those desired changes, change the API, then deal with concurrency within the exporter and optimization within the translator independently?


# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusremotewriteexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: reduce allocations in createAttributes

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35184]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 9 additions & 1 deletion exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS
p.telemetryBuilder.ExporterPrometheusremotewriteTranslatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...))
}

var converterPool = sync.Pool{
New: func() any {
return prometheusremotewrite.NewPrometheusConverter()
},
}

type buffer struct {
protobuf *proto.Buffer
snappy []byte
Expand Down Expand Up @@ -191,8 +197,10 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er
case <-prwe.closeChan:
return errors.New("shutdown has been called")
default:
converter := converterPool.Get().(*prometheusremotewrite.PrometheusConverter)
defer converterPool.Put(converter)

tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings)
tsMap, err := converter.FromMetrics(md, prwe.exporterSettings)
if err != nil {
prwe.telemetry.recordTranslationFailure(ctx)
prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap)))
Expand Down
153 changes: 153 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
//go:build !race
// +build !race

// note: this test doesn't pass currently due to a race condition in batchTimeSeries
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ArthurSens I discovered a race condition when multiple goroutines run which seems unrelated to my change. The batching logic uses shared state which won't work with multiple goroutines. I added some build tags above to exclude this test until it's fixed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, I've run your test on the main branch, and indeed, we have a data race like you said :O

I am not sure how to proceed here. To be honest, I think we need to fix the test first, so we're safe to merge this one, too

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opened a PR trying to address the issue: #36524

Wanna share your opinion? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, there was a holiday and I didn't get a chance to look at this. I'll take a look this week.

// WARNING: DATA RACE
// Write at 0x00c0001e9550 by goroutine 34:
// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.batchTimeSeries()
// helper.go:92 +0xf8b
// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.(*prwExporter).handleExport()
// exporter.go:240 +0xe4
// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.(*prwExporter).PushMetrics()
// exporter.go:217 +0x70f
// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.Test_PushMetricsConcurrent.func3()
// exporter_test.go:905 +0x78

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
)

// Test everything works when there is more than one goroutine calling PushMetrics.
// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes.
func Test_PushMetricsConcurrent(t *testing.T) {
n := 1000
ms := make([]pmetric.Metrics, n)
testIDKey := "test_id"
for i := 0; i < n; i++ {
m := testdata.GenerateMetricsOneMetric()
dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints()
for j := 0; j < dps.Len(); j++ {
dp := dps.At(j)
dp.Attributes().PutInt(testIDKey, int64(i))
}
ms[i] = m
}
received := make(map[int]prompb.TimeSeries)
var mu sync.Mutex

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
assert.NotNil(t, body)
// Receives the http requests and unzip, unmarshalls, and extracts TimeSeries
assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
var unzipped []byte

dest, err := snappy.Decode(unzipped, body)
assert.NoError(t, err)

wr := &prompb.WriteRequest{}
ok := proto.Unmarshal(dest, wr)
assert.NoError(t, ok)
assert.Len(t, wr.Timeseries, 2)
ts := wr.Timeseries[0]
foundLabel := false
for _, label := range ts.Labels {
if label.Name == testIDKey {
id, err := strconv.Atoi(label.Value)
assert.NoError(t, err)
mu.Lock()
_, ok := received[id]
assert.False(t, ok) // fail if we already saw it
received[id] = ts
mu.Unlock()
foundLabel = true
break
}
}
assert.True(t, foundLabel)
w.WriteHeader(http.StatusOK)
}))

defer server.Close()

// Adjusted retry settings for faster testing
retrySettings := configretry.BackOffConfig{
Enabled: true,
InitialInterval: 100 * time.Millisecond, // Shorter initial interval
MaxInterval: 1 * time.Second, // Shorter max interval
MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time
}
clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = server.URL
clientConfig.ReadBufferSize = 0
clientConfig.WriteBufferSize = 512 * 1024
cfg := &Config{
Namespace: "",
ClientConfig: clientConfig,
MaxBatchSizeBytes: 3000000,
RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1},
TargetInfo: &TargetInfo{
Enabled: true,
},
CreatedMetric: &CreatedMetric{
Enabled: false,
},
BackOffConfig: retrySettings,
}

assert.NotNil(t, cfg)
set := exportertest.NewNopSettings()
set.MetricsLevel = configtelemetry.LevelBasic

prwe, nErr := newPRWExporter(cfg, set)

require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
defer func() {
require.NoError(t, prwe.Shutdown(ctx))
}()

var wg sync.WaitGroup
wg.Add(n)
for _, m := range ms {
go func() {
err := prwe.PushMetrics(ctx, m)
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
assert.Len(t, received, n)
}
67 changes: 31 additions & 36 deletions pkg/translator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"slices"
"sort"
"strconv"
"strings"
"time"
"unicode/utf8"

Expand Down Expand Up @@ -104,47 +105,41 @@ var seps = []byte{'\xff'}
// createAttributes creates a slice of Prometheus Labels with OTLP attributes and pairs of string values.
// Unpaired string values are ignored. String pairs overwrite OTLP labels if collisions happen and
// if logOnOverwrite is true, the overwrite is logged. Resulting label names are sanitized.
func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
func (c *PrometheusConverter) createAttributes(resource pcommon.Resource, attributes pcommon.Map, externalLabels map[string]string,
ignoreAttrs []string, logOnOverwrite bool, extras ...string,
) []prompb.Label {
resourceAttrs := resource.Attributes()
serviceName, haveServiceName := resourceAttrs.Get(conventions.AttributeServiceName)
instance, haveInstanceID := resourceAttrs.Get(conventions.AttributeServiceInstanceID)

// Calculate the maximum possible number of labels we could return so we can preallocate l
maxLabelCount := attributes.Len() + len(externalLabels) + len(extras)/2

if haveServiceName {
maxLabelCount++
}

if haveInstanceID {
maxLabelCount++
}

// map ensures no duplicate label name
l := make(map[string]string, maxLabelCount)
l := c.labelsMap
clear(l)
Comment on lines +116 to +117
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also clear labelMap when we call reset()? I think it would be cleaner if we reset the state in one single place instead, or is there any particular reason to do it here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the same reason as https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35184/files/928529a1cf587e8e5b29bd4880f2c36157eb8194#r1829677356 we want to isolate the contents of this map between calls to createAttributes, so we do that by clearing it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, thank you! Could we add a comment explaining it?


// store duplicate labels separately in a throwaway map
// assuming this is the less common case
collisions := make(map[string][]string)

// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
labels := make([]prompb.Label, 0, maxLabelCount)
// XXX: Should we always drop service namespace/service name/service instance ID from the labels
// (as they get mapped to other Prometheus labels)?
attributes.Range(func(key string, value pcommon.Value) bool {
if !slices.Contains(ignoreAttrs, key) {
labels = append(labels, prompb.Label{Name: key, Value: value.AsString()})
finalKey := prometheustranslator.NormalizeLabel(key)
if _, alreadyExists := l[finalKey]; alreadyExists {
collisions[finalKey] = append(collisions[finalKey], value.AsString())
} else {
l[finalKey] = value.AsString()
}
}
return true
})
sort.Stable(ByLabelName(labels))

for _, label := range labels {
finalKey := prometheustranslator.NormalizeLabel(label.Name)
if existingValue, alreadyExists := l[finalKey]; alreadyExists {
l[finalKey] = existingValue + ";" + label.Value
} else {
l[finalKey] = label.Value
}
for key, values := range collisions {
values = append(values, l[key])
// Ensure attributes are sorted by key for consistent merging of keys which
// collide when sanitized.
sort.Strings(values)
l[key] = strings.Join(values, ";")
}

// Map service.name + service.namespace to job
Expand Down Expand Up @@ -184,12 +179,12 @@ func createAttributes(resource pcommon.Resource, attributes pcommon.Map, externa
l[name] = extras[i+1]
}

labels = labels[:0]
startIndex := len(c.labels)
for k, v := range l {
labels = append(labels, prompb.Label{Name: k, Value: v})
c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
}

return labels
return c.labels[startIndex:]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility that len(c.labels) is not 0 here? It's reset every time we call FromMetrics and I couldn't find any other place in the code where we write to this array, so why not just return c.labels and not worry about startIndex? I might be missing something but it feels like we're overcomplicating things here

for k, v := range l {
	c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
}
return c.labels

Copy link
Contributor Author

@edma2 edma2 Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startIndex is important for keeping the returned slices isolated from each other while sharing the same underlying array within a single FromMetrics call. It is 0 only for the first series of a batch.

Here is how it works: FromMetrics is called once per batch, and createAttributes for every series within the batch. We want to re-use the backing array of the labels slice for all series within a single batch. We do that by appending the labels of each series to the end of the slice. Finally we return only starting from startIndex so the caller doesn't see labels from other series (while reusing the same backing array which naturally grows up to the size needed to fit a single FromMetrics call).

For example, if X1...X4 are labels from series X and Y1...Y3 are labels from series Y, then the backing array of c.labels will look like [X1, X2, X3, X4, Y1, Y2, Y3] after calling createAttributes twice (this is a simplification as the backing array will probably have excess capacity from resizing or previous calls). Meanwhile, the first call to createAttributes will have returned [X1, X2, X3, X4] and the second call returned [Y1, Y2, Y3]. On the next FromMetrics call the index is reset to 0 and we can re-use the entire array with zero allocations.

Copy link
Member

@ArthurSens ArthurSens Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect, thank you for the explanation :)

Now what I'm thinking is if we have tests that assure concurrency works. Mostly to make sure we don't break the non-thread-safe promise by accident

}

// isValidAggregationTemporality checks whether an OTel metric has a valid
Expand All @@ -209,13 +204,13 @@ func isValidAggregationTemporality(metric pmetric.Metric) bool {
return false
}

func (c *prometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice,
func (c *PrometheusConverter) addHistogramDataPoints(dataPoints pmetric.HistogramDataPointSlice,
resource pcommon.Resource, settings Settings, baseName string,
) {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)

// If the sum is unset, it indicates the _sum metric point should be
// omitted
Expand Down Expand Up @@ -392,13 +387,13 @@ func maxTimestamp(a, b pcommon.Timestamp) pcommon.Timestamp {
return b
}

func (c *prometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
func (c *PrometheusConverter) addSummaryDataPoints(dataPoints pmetric.SummaryDataPointSlice, resource pcommon.Resource,
settings Settings, baseName string,
) {
for x := 0; x < dataPoints.Len(); x++ {
pt := dataPoints.At(x)
timestamp := convertTimeStamp(pt.Timestamp())
baseLabels := createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)
baseLabels := c.createAttributes(resource, pt.Attributes(), settings.ExternalLabels, nil, false)

// treat sum as a sample in an individual TimeSeries
sum := &prompb.Sample{
Expand Down Expand Up @@ -466,7 +461,7 @@ func createLabels(name string, baseLabels []prompb.Label, extras ...string) []pr

// getOrCreateTimeSeries returns the time series corresponding to the label set if existent, and false.
// Otherwise it creates a new one and returns that, and true.
func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) {
func (c *PrometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*prompb.TimeSeries, bool) {
h := timeSeriesSignature(lbls)
ts := c.unique[h]
if ts != nil {
Expand Down Expand Up @@ -502,7 +497,7 @@ func (c *prometheusConverter) getOrCreateTimeSeries(lbls []prompb.Label) (*promp
// addTimeSeriesIfNeeded adds a corresponding time series if it doesn't already exist.
// If the time series doesn't already exist, it gets added with startTimestamp for its value and timestamp for its timestamp,
// both converted to milliseconds.
func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) {
func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTimestamp pcommon.Timestamp, timestamp pcommon.Timestamp) {
ts, created := c.getOrCreateTimeSeries(lbls)
if created {
ts.Samples = []prompb.Sample{
Expand All @@ -516,7 +511,7 @@ func (c *prometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi
}

// addResourceTargetInfo converts the resource to the target info metric.
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *prometheusConverter) {
func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timestamp pcommon.Timestamp, converter *PrometheusConverter) {
if settings.DisableTargetInfo || timestamp == 0 {
return
}
Expand Down Expand Up @@ -544,7 +539,7 @@ func addResourceTargetInfo(resource pcommon.Resource, settings Settings, timesta
name = settings.Namespace + "_" + name
}

labels := createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name)
labels := converter.createAttributes(resource, attributes, settings.ExternalLabels, identifyingAttrs, false, model.MetricNameLabel, name)
haveIdentifier := false
for _, l := range labels {
if l.Name == model.JobLabel || l.Name == model.InstanceLabel {
Expand Down
Loading
Loading