From f18439337d9be7d32597fc61a93dd118ae9d9955 Mon Sep 17 00:00:00 2001 From: Eugene Ma Date: Wed, 20 Nov 2024 12:59:48 -0800 Subject: [PATCH] move test to a separate file due to unrelated race condition --- .../exporter_concurrency_test.go | 153 ++++++++++++++++++ .../exporter_test.go | 109 ------------- 2 files changed, 153 insertions(+), 109 deletions(-) create mode 100644 exporter/prometheusremotewriteexporter/exporter_concurrency_test.go diff --git a/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go b/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go new file mode 100644 index 000000000000..29d834af03a6 --- /dev/null +++ b/exporter/prometheusremotewriteexporter/exporter_concurrency_test.go @@ -0,0 +1,153 @@ +//go:build !race +// +build !race + +// note: this test doesn't pass currently due to a race condition in batchTimeSeries +// WARNING: DATA RACE +// Write at 0x00c0001e9550 by goroutine 34: +// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.batchTimeSeries() +// helper.go:92 +0xf8b +// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.(*prwExporter).handleExport() +// exporter.go:240 +0xe4 +// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.(*prwExporter).PushMetrics() +// exporter.go:217 +0x70f +// github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter.Test_PushMetricsConcurrent.func3() +// exporter_test.go:905 +0x78 + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package prometheusremotewriteexporter + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strconv" + "sync" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/prometheus/prompb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" +) + +// Test everything works when there is more than one goroutine calling PushMetrics. +// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes. +func Test_PushMetricsConcurrent(t *testing.T) { + n := 1000 + ms := make([]pmetric.Metrics, n) + testIDKey := "test_id" + for i := 0; i < n; i++ { + m := testdata.GenerateMetricsOneMetric() + dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints() + for j := 0; j < dps.Len(); j++ { + dp := dps.At(j) + dp.Attributes().PutInt(testIDKey, int64(i)) + } + ms[i] = m + } + received := make(map[int]prompb.TimeSeries) + var mu sync.Mutex + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err != nil { + t.Fatal(err) + } + assert.NotNil(t, body) + // Receives the http requests and unzip, unmarshalls, and extracts TimeSeries + assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version")) + assert.Equal(t, "snappy", r.Header.Get("Content-Encoding")) + var unzipped []byte + + dest, err := snappy.Decode(unzipped, body) + assert.NoError(t, err) + + wr := &prompb.WriteRequest{} + ok := proto.Unmarshal(dest, wr) + assert.NoError(t, ok) + assert.Len(t, wr.Timeseries, 2) + ts := wr.Timeseries[0] + foundLabel := false + for _, label := range ts.Labels { + if label.Name == testIDKey { + id, err := strconv.Atoi(label.Value) + assert.NoError(t, err) + mu.Lock() + _, ok := received[id] + assert.False(t, ok) // fail if we already saw it + received[id] = ts + mu.Unlock() + foundLabel = true + break + } + } + assert.True(t, foundLabel) + w.WriteHeader(http.StatusOK) + })) + + defer server.Close() + + // Adjusted retry settings for faster testing + retrySettings := configretry.BackOffConfig{ + Enabled: true, + InitialInterval: 100 * time.Millisecond, // Shorter initial interval + MaxInterval: 1 * time.Second, // Shorter max interval + MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time + } + clientConfig := confighttp.NewDefaultClientConfig() + clientConfig.Endpoint = server.URL + clientConfig.ReadBufferSize = 0 + clientConfig.WriteBufferSize = 512 * 1024 + cfg := &Config{ + Namespace: "", + ClientConfig: clientConfig, + MaxBatchSizeBytes: 3000000, + RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1}, + TargetInfo: &TargetInfo{ + Enabled: true, + }, + CreatedMetric: &CreatedMetric{ + Enabled: false, + }, + BackOffConfig: retrySettings, + } + + assert.NotNil(t, cfg) + set := exportertest.NewNopSettings() + set.MetricsLevel = configtelemetry.LevelBasic + + prwe, nErr := newPRWExporter(cfg, set) + + require.NoError(t, nErr) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost())) + defer func() { + require.NoError(t, prwe.Shutdown(ctx)) + }() + + var wg sync.WaitGroup + wg.Add(n) + for _, m := range ms { + go func() { + err := prwe.PushMetrics(ctx, m) + assert.NoError(t, err) + wg.Done() + }() + } + wg.Wait() + assert.Len(t, received, n) +} diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index d0fbe4225637..5fbe3ef237ab 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -802,115 +802,6 @@ func Test_PushMetrics(t *testing.T) { } } -// Test everything works when there is more than one goroutine calling PushMetrics. -// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes. -func Test_PushMetricsConcurrent(t *testing.T) { - n := 1000 - ms := make([]pmetric.Metrics, n) - testIDKey := "test_id" - for i := 0; i < n; i++ { - m := testdata.GenerateMetricsOneMetric() - dps := m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints() - for j := 0; j < dps.Len(); j++ { - dp := dps.At(j) - dp.Attributes().PutInt(testIDKey, int64(i)) - } - ms[i] = m - } - received := make(map[int]prompb.TimeSeries) - var mu sync.Mutex - - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - body, err := io.ReadAll(r.Body) - if err != nil { - t.Fatal(err) - } - assert.NotNil(t, body) - // Receives the http requests and unzip, unmarshalls, and extracts TimeSeries - assert.Equal(t, "0.1.0", r.Header.Get("X-Prometheus-Remote-Write-Version")) - assert.Equal(t, "snappy", r.Header.Get("Content-Encoding")) - var unzipped []byte - - dest, err := snappy.Decode(unzipped, body) - assert.NoError(t, err) - - wr := &prompb.WriteRequest{} - ok := proto.Unmarshal(dest, wr) - assert.NoError(t, ok) - assert.Len(t, wr.Timeseries, 2) - ts := wr.Timeseries[0] - foundLabel := false - for _, label := range ts.Labels { - if label.Name == testIDKey { - id, err := strconv.Atoi(label.Value) - assert.NoError(t, err) - mu.Lock() - _, ok := received[id] - assert.False(t, ok) // fail if we already saw it - received[id] = ts - mu.Unlock() - foundLabel = true - break - } - } - assert.True(t, foundLabel) - w.WriteHeader(http.StatusOK) - })) - - defer server.Close() - - // Adjusted retry settings for faster testing - retrySettings := configretry.BackOffConfig{ - Enabled: true, - InitialInterval: 100 * time.Millisecond, // Shorter initial interval - MaxInterval: 1 * time.Second, // Shorter max interval - MaxElapsedTime: 2 * time.Second, // Shorter max elapsed time - } - clientConfig := confighttp.NewDefaultClientConfig() - clientConfig.Endpoint = server.URL - clientConfig.ReadBufferSize = 0 - clientConfig.WriteBufferSize = 512 * 1024 - cfg := &Config{ - Namespace: "", - ClientConfig: clientConfig, - MaxBatchSizeBytes: 3000000, - RemoteWriteQueue: RemoteWriteQueue{NumConsumers: 1}, - TargetInfo: &TargetInfo{ - Enabled: true, - }, - CreatedMetric: &CreatedMetric{ - Enabled: false, - }, - BackOffConfig: retrySettings, - } - - assert.NotNil(t, cfg) - set := exportertest.NewNopSettings() - set.MetricsLevel = configtelemetry.LevelBasic - - prwe, nErr := newPRWExporter(cfg, set) - - require.NoError(t, nErr) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - require.NoError(t, prwe.Start(ctx, componenttest.NewNopHost())) - defer func() { - require.NoError(t, prwe.Shutdown(ctx)) - }() - - var wg sync.WaitGroup - wg.Add(n) - for _, m := range ms { - go func() { - err := prwe.PushMetrics(ctx, m) - assert.NoError(t, err) - wg.Done() - }() - } - wg.Wait() - assert.Len(t, received, n) -} - func Test_validateAndSanitizeExternalLabels(t *testing.T) { tests := []struct { name string