Skip to content

[exporter/prometheusremotewrite] Fix data race in batch series state by removing state altogether #36597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ type prwExporter struct {
wal *prweWAL
exporterSettings prometheusremotewrite.Settings
telemetry prwTelemetry
batchTimeSeriesState batchTimeSeriesState
}

func newPRWTelemetry(set exporter.Settings) (prwTelemetry, error) {
Expand Down Expand Up @@ -140,7 +139,6 @@ func newPRWExporter(cfg *Config, set exporter.Settings) (*prwExporter, error) {
SendMetadata: cfg.SendMetadata,
},
telemetry: prwTelemetry,
batchTimeSeriesState: newBatchTimeSericesState(),
}

if prwe.exporterSettings.ExportCreatedMetric {
Expand Down Expand Up @@ -229,7 +227,7 @@ func (prwe *prwExporter) handleExport(ctx context.Context, tsMap map[string]*pro
}

// Calls the helper function to convert and batch the TsMap to the desired format
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m, &prwe.batchTimeSeriesState)
requests, err := batchTimeSeries(tsMap, prwe.maxBatchSizeBytes, m)
if err != nil {
return err
}
Expand Down
138 changes: 138 additions & 0 deletions exporter/prometheusremotewriteexporter/exporter_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package prometheusremotewriteexporter

import (
"context"
"io"
"net/http"
"net/http/httptest"
"strconv"
"sync"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata"
)

// Test everything works when there is more than one goroutine calling PushMetrics.
// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes.
func Test_PushMetricsConcurrent(t *testing.T) {
n := 1000
ms := make([]pmetric.Metrics, n)
testIDKey := "test_id"
for i := 0; i < n; i++ {
m := testdata.GenerateMetricsOneMetric()
dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints()
for j := 0; j < dps.Len(); j++ {
dp := dps.At(j)
dp.Attributes().PutInt(testIDKey, int64(i))
}
ms[i] = m
}
received := make(map[int]prompb.TimeSeries)
var mu sync.Mutex

server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
t.Fatal(err)
}
assert.NotNil(t, body)
// Receives the http requests and unzip, unmarshalls, and extracts TimeSeries
assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version"))
assert.Equal(t, "snappy", r.Header.Get("Content-Encoding"))
var unzipped []byte

dest, err := snappy.Decode(unzipped, body)
assert.NoError(t, err)

wr := &prompb.WriteRequest{}
ok := proto.Unmarshal(dest, wr)
assert.NoError(t, ok)
assert.Len(t, wr.Timeseries, 2)
ts := wr.Timeseries[0]
foundLabel := false
for _, label := range ts.Labels {
if label.Name == testIDKey {
id, err := strconv.Atoi(label.Value)
assert.NoError(t, err)
mu.Lock()
_, ok := received[id]
assert.False(t, ok) // fail if we already saw it
received[id] = ts
mu.Unlock()
foundLabel = true
break
}
}
assert.True(t, foundLabel)
w.WriteHeader(http.StatusOK)
}))

defer server.Close()

// Adjusted retry settings for faster testing
retrySettings := configretry.BackOffConfig{
Enabled: true,
InitialInterval: 100 * time.Millisecond, // Shorter initial interval
MaxInterval: 1 * time.Second, // Shorter max interval
MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time
}
clientConfig := confighttp.NewDefaultClientConfig()
clientConfig.Endpoint = server.URL
clientConfig.ReadBufferSize = 0
clientConfig.WriteBufferSize = 512 * 1024
cfg := &Config{
Namespace: "",
ClientConfig: clientConfig,
MaxBatchSizeBytes: 3000000,
RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1},
TargetInfo: &TargetInfo{
Enabled: true,
},
CreatedMetric: &CreatedMetric{
Enabled: false,
},
BackOffConfig: retrySettings,
}

assert.NotNil(t, cfg)
set := exportertest.NewNopSettings()
set.MetricsLevel = configtelemetry.LevelBasic

prwe, nErr := newPRWExporter(cfg, set)

require.NoError(t, nErr)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost()))
defer func() {
require.NoError(t, prwe.Shutdown(ctx))
}()

var wg sync.WaitGroup
wg.Add(n)
for _, m := range ms {
go func() {
err := prwe.PushMetrics(ctx, m)
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
assert.Len(t, received, n)
}
16 changes: 14 additions & 2 deletions exporter/prometheusremotewriteexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"net/url"
Expand Down Expand Up @@ -1192,6 +1193,13 @@ func BenchmarkExecute(b *testing.B) {
}
}

func BenchmarkExecuteUnevenBatches(b *testing.B) {
benchmarkExecute(b, -1)
}

// benchmarkExecute benchmarks the execute function with a given number of samples.
// If numSample is -1, the batch size will switch between 10, 100, 1000 and 10000 samples
// for each interaction so we benchmark uneven batch sizes.
func benchmarkExecute(b *testing.B, numSample int) {
mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -1232,6 +1240,10 @@ func benchmarkExecute(b *testing.B, numSample int) {
reqs := make([]*prompb.WriteRequest, 0, b.N)
const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte"
for n := 0; n < b.N; n++ {
actualSampleSize := numSample
if numSample == -1 {
actualSampleSize = int(math.Pow(10, float64((n%4)+1)))
}
num := strings.Repeat(strconv.Itoa(n), 16)
req := &prompb.WriteRequest{
Metadata: []prompb.MetricMetadata{
Expand All @@ -1248,14 +1260,14 @@ func benchmarkExecute(b *testing.B, numSample int) {
},
Timeseries: []prompb.TimeSeries{
{
Samples: generateSamples(numSample),
Samples: generateSamples(actualSampleSize),
Labels: []prompb.Label{
{Name: "__name__", Value: "test_metric"},
{Name: "test_label_name_" + num, Value: labelValue + num},
},
},
{
Histograms: generateHistograms(numSample),
Histograms: generateHistograms(actualSampleSize),
Labels: []prompb.Label{
{Name: "__name__", Value: "test_histogram"},
{Name: "test_label_name_" + num, Value: labelValue + num},
Expand Down
52 changes: 16 additions & 36 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,37 @@ package prometheusremotewriteexporter // import "github.com/open-telemetry/opent

import (
"errors"
"math"
"sort"

"github.com/prometheus/prometheus/prompb"
)

type batchTimeSeriesState struct {
// Track batch sizes sent to avoid over allocating huge buffers.
// This helps in the case where large batches are sent to avoid allocating too much unused memory
nextTimeSeriesBufferSize int
nextMetricMetadataBufferSize int
nextRequestBufferSize int
}

func newBatchTimeSericesState() batchTimeSeriesState {
return batchTimeSeriesState{
nextTimeSeriesBufferSize: math.MaxInt,
nextMetricMetadataBufferSize: math.MaxInt,
nextRequestBufferSize: 0,
}
}

// batchTimeSeries splits series into multiple batch write requests.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata, state *batchTimeSeriesState) ([]*prompb.WriteRequest, error) {
// batchTimeSeries splits series into multiple write requests if they exceed the maxBatchByteSize.
func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int, m []*prompb.MetricMetadata) ([]*prompb.WriteRequest, error) {
if len(tsMap) == 0 {
return nil, errors.New("invalid tsMap: cannot be empty map")
}

// Allocate a buffer size of at least 10, or twice the last # of requests we sent
requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize))
// Allocate a buffer size of at least 10.
requests := make([]*prompb.WriteRequest, 0, 10)

// Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller
tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)))
// Allocate a time series buffer with the length of the input.
tsArray := make([]prompb.TimeSeries, 0, len(tsMap))
sizeOfCurrentBatch := 0

i := 0
for _, v := range tsMap {
sizeOfSeries := v.Size()
for _, series := range tsMap {
sizeOfSeries := series.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray))
wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)

tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i))
tsArray = make([]prompb.TimeSeries, 0, len(tsMap)-i)
sizeOfCurrentBatch = 0
}

tsArray = append(tsArray, *v)
tsArray = append(tsArray, *series)
sizeOfCurrentBatch += sizeOfSeries
i++
}
Expand All @@ -63,23 +45,22 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
requests = append(requests, wrapped)
}

// Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller
mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)))
// Allocate a metric metadata with the length of the input.
mArray := make([]prompb.MetricMetadata, 0, len(m))
sizeOfCurrentBatch = 0
i = 0
for _, v := range m {
sizeOfM := v.Size()
for _, metadata := range m {
sizeOfM := metadata.Size()

if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize {
state.nextMetricMetadataBufferSize = max(10, 2*len(mArray))
wrapped := convertMetadataToRequest(mArray)
requests = append(requests, wrapped)

mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i))
mArray = make([]prompb.MetricMetadata, 0, len(m)-i)
sizeOfCurrentBatch = 0
}

mArray = append(mArray, *v)
mArray = append(mArray, *metadata)
sizeOfCurrentBatch += sizeOfM
i++
}
Expand All @@ -89,7 +70,6 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
requests = append(requests, wrapped)
}

state.nextRequestBufferSize = 2 * len(requests)
return requests, nil
}

Expand Down
22 changes: 3 additions & 19 deletions exporter/prometheusremotewriteexporter/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package prometheusremotewriteexporter

import (
"math"
"testing"

"github.com/prometheus/prometheus/prompb"
Expand Down Expand Up @@ -58,24 +57,14 @@ func Test_batchTimeSeries(t *testing.T) {
// run tests
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
state := newBatchTimeSericesState()
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil, &state)
requests, err := batchTimeSeries(tt.tsMap, tt.maxBatchByteSize, nil)
if tt.returnErr {
assert.Error(t, err)
return
}

assert.NoError(t, err)
assert.Len(t, requests, tt.numExpectedRequests)
if tt.numExpectedRequests <= 1 {
assert.Equal(t, math.MaxInt, state.nextTimeSeriesBufferSize)
assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
assert.Equal(t, 2*len(requests), state.nextRequestBufferSize)
} else {
assert.Equal(t, max(10, len(requests[len(requests)-2].Timeseries)*2), state.nextTimeSeriesBufferSize)
assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
assert.Equal(t, 2*len(requests), state.nextRequestBufferSize)
}
})
}
}
Expand All @@ -96,14 +85,10 @@ func Test_batchTimeSeriesUpdatesStateForLargeBatches(t *testing.T) {

tsMap1 := getTimeseriesMap(tsArray)

state := newBatchTimeSericesState()
requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
requests, err := batchTimeSeries(tsMap1, 1000000, nil)

assert.NoError(t, err)
assert.Len(t, requests, 18)
assert.Equal(t, len(requests[len(requests)-2].Timeseries)*2, state.nextTimeSeriesBufferSize)
assert.Equal(t, math.MaxInt, state.nextMetricMetadataBufferSize)
assert.Equal(t, 36, state.nextRequestBufferSize)
}

// Benchmark_batchTimeSeries checks batchTimeSeries
Expand All @@ -129,10 +114,9 @@ func Benchmark_batchTimeSeries(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()

state := newBatchTimeSericesState()
// Run batchTimeSeries 100 times with a 1mb max request size
for i := 0; i < b.N; i++ {
requests, err := batchTimeSeries(tsMap1, 1000000, nil, &state)
requests, err := batchTimeSeries(tsMap1, 1000000, nil)
assert.NoError(b, err)
assert.Len(b, requests, 18)
}
Expand Down
Loading