From d67a52690710ffac27f0940fac8387975ea90b44 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif Date: Thu, 14 Nov 2024 02:26:01 +0000 Subject: [PATCH] receiver/prometheusreceiver: add option to fallback to collector starttime This change adds an option to the metric adjuster to use an approximation of the collector starttime as a fallback for the start time of scraped cumulative metrics. This is useful when no start time is found and when the collector starts up alongside its targets (like in serverless environments or sidecar approaches). Signed-off-by: Ridwan Sharif --- .chloggen/starttime-fallback.yaml | 27 +++++ receiver/prometheusreceiver/README.md | 3 +- receiver/prometheusreceiver/config.go | 38 +++++-- receiver/prometheusreceiver/config_test.go | 4 +- .../prometheusreceiver/internal/appendable.go | 3 +- .../internal/starttimemetricadjuster.go | 16 ++- .../internal/starttimemetricadjuster_test.go | 100 +++++++++++++++++- .../prometheusreceiver/metrics_receiver.go | 7 +- .../metrics_receiver_helper_test.go | 4 +- ...ceiver_report_extra_scrape_metrics_test.go | 8 +- .../metrics_receiver_test.go | 6 +- 11 files changed, 190 insertions(+), 26 deletions(-) create mode 100644 .chloggen/starttime-fallback.yaml diff --git a/.chloggen/starttime-fallback.yaml b/.chloggen/starttime-fallback.yaml new file mode 100644 index 0000000000000..59ee2665165b1 --- /dev/null +++ b/.chloggen/starttime-fallback.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add `UseCollectorStartTimeFallback` option for the start time metric adjuster to use the collector start time as an approximation of process start time as a fallback. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36364] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/prometheusreceiver/README.md b/receiver/prometheusreceiver/README.md index cfa7472eb824f..0cbe82c544562 100644 --- a/receiver/prometheusreceiver/README.md +++ b/receiver/prometheusreceiver/README.md @@ -119,7 +119,7 @@ The prometheus receiver also supports additional top-level options: - **trim_metric_suffixes**: [**Experimental**] When set to true, this enables trimming unit and some counter type suffixes from metric names. For example, it would cause `singing_duration_seconds_total` to be trimmed to `singing_duration`. This can be useful when trying to restore the original metric names used in OpenTelemetry instrumentation. Defaults to false. - **use_start_time_metric**: When set to true, this enables retrieving the start time of all counter metrics from the process_start_time_seconds metric. This is only correct if all counters on that endpoint started after the process start time, and the process is the only actor exporting the metric after the process started. It should not be used in "exporters" which export counters that may have started before the process itself. Use only if you know what you are doing, as this may result in incorrect rate calculations. Defaults to false. - **start_time_metric_regex**: The regular expression for the start time metric, and is only applied when use_start_time_metric is enabled. Defaults to process_start_time_seconds. - +- **use_collector_start_time_fallback**: When set to true, this option enables using the collector start time as the metric start time if the process_start_time_seconds metric yields no result (for example if targets expose no process_start_time_seconds metric). This is useful when the collector start time is a good approximation of the process start time - for example in serverless workloads when the collector is deployed as a sidecar. This is only applied when use_start_time_metric is enabled. Defaults to false. For example, ```yaml @@ -128,6 +128,7 @@ receivers: trim_metric_suffixes: true use_start_time_metric: true start_time_metric_regex: foo_bar_.* + use_collector_start_time_fallback: true config: scrape_configs: - job_name: 'otel-collector' diff --git a/receiver/prometheusreceiver/config.go b/receiver/prometheusreceiver/config.go index 556dd42445187..3ad64974be120 100644 --- a/receiver/prometheusreceiver/config.go +++ b/receiver/prometheusreceiver/config.go @@ -23,14 +23,10 @@ import ( type Config struct { PrometheusConfig *PromConfig `mapstructure:"config"` TrimMetricSuffixes bool `mapstructure:"trim_metric_suffixes"` - // UseStartTimeMetric enables retrieving the start time of all counter metrics - // from the process_start_time_seconds metric. This is only correct if all counters on that endpoint - // started after the process start time, and the process is the only actor exporting the metric after - // the process started. It should not be used in "exporters" which export counters that may have - // started before the process itself. Use only if you know what you are doing, as this may result - // in incorrect rate calculations. - UseStartTimeMetric bool `mapstructure:"use_start_time_metric"` - StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"` + + // Settings for adjusting metrics. Will default to using an InitialPointAdjuster + // which will use the first scraped point to define the start time for the timeseries. + AdjustOpts MetricAdjusterOpts `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. // ReportExtraScrapeMetrics - enables reporting of additional metrics for Prometheus client like scrape_body_size_bytes ReportExtraScrapeMetrics bool `mapstructure:"report_extra_scrape_metrics"` @@ -38,6 +34,32 @@ type Config struct { TargetAllocator *targetallocator.Config `mapstructure:"target_allocator"` } +type MetricAdjusterOpts struct { + // UseStartTimeMetric enables retrieving the start time of all counter + // metrics from the process_start_time_seconds metric. This is only correct + // if all counters on that endpoint started after the process start time, + // and the process is the only actor exporting the metric after the process + // started. It should not be used in "exporters" which export counters that + // may have started before the process itself. Use only if you know what you + // are doing, as this may result in incorrect rate calculations. + UseStartTimeMetric bool `mapstructure:"use_start_time_metric"` + StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"` + + // UseCollectorStartTimeFallback enables using a fallback start time if a + // start time is otherwise unavailable when adjusting metrics. This would + // happen if the UseStartTimeMetric is used but the application doesn't emit + // a process_start_time_seconds metric or a metric that matches the + // StartTimeMetricRegex provided. + // + // If enabled, the fallback start time used for adjusted metrics is an + // approximation of the collector start time. + // + // This option should be used when the collector start time is a good + // approximation of the process start time - for example in serverless + // workloads when the collector is deployed as a sidecar. + UseCollectorStartTimeFallback bool `mapstructure:"use_collector_start_time_fallback"` +} + // Validate checks the receiver configuration is valid. func (cfg *Config) Validate() error { if !containsScrapeConfig(cfg) && cfg.TargetAllocator == nil { diff --git a/receiver/prometheusreceiver/config_test.go b/receiver/prometheusreceiver/config_test.go index 2c0fe018270a6..f675b1adc8c2c 100644 --- a/receiver/prometheusreceiver/config_test.go +++ b/receiver/prometheusreceiver/config_test.go @@ -43,9 +43,9 @@ func TestLoadConfig(t *testing.T) { r1 := cfg.(*Config) assert.Equal(t, "demo", r1.PrometheusConfig.ScrapeConfigs[0].JobName) assert.Equal(t, 5*time.Second, time.Duration(r1.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval)) - assert.True(t, r1.UseStartTimeMetric) + assert.True(t, r1.AdjustOpts.UseStartTimeMetric) assert.True(t, r1.TrimMetricSuffixes) - assert.Equal(t, "^(.+_)*process_start_time_seconds$", r1.StartTimeMetricRegex) + assert.Equal(t, "^(.+_)*process_start_time_seconds$", r1.AdjustOpts.StartTimeMetricRegex) assert.True(t, r1.ReportExtraScrapeMetrics) assert.Equal(t, "http://my-targetallocator-service", r1.TargetAllocator.Endpoint) diff --git a/receiver/prometheusreceiver/internal/appendable.go b/receiver/prometheusreceiver/internal/appendable.go index 086d2d639a291..0b88603bd4e64 100644 --- a/receiver/prometheusreceiver/internal/appendable.go +++ b/receiver/prometheusreceiver/internal/appendable.go @@ -37,6 +37,7 @@ func NewAppendable( useStartTimeMetric bool, startTimeMetricRegex *regexp.Regexp, useCreatedMetric bool, + useCollectorStartTimeFallback bool, enableNativeHistograms bool, externalLabels labels.Labels, trimSuffixes bool, @@ -45,7 +46,7 @@ func NewAppendable( if !useStartTimeMetric { metricAdjuster = NewInitialPointAdjuster(set.Logger, gcInterval, useCreatedMetric) } else { - metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex) + metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex, useCollectorStartTimeFallback) } obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverID: set.ID, Transport: transport, ReceiverCreateSettings: set}) diff --git a/receiver/prometheusreceiver/internal/starttimemetricadjuster.go b/receiver/prometheusreceiver/internal/starttimemetricadjuster.go index 1b3eb51529f59..2c964ab71cefa 100644 --- a/receiver/prometheusreceiver/internal/starttimemetricadjuster.go +++ b/receiver/prometheusreceiver/internal/starttimemetricadjuster.go @@ -6,6 +6,7 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co import ( "errors" "regexp" + "time" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -19,13 +20,20 @@ var ( type startTimeMetricAdjuster struct { startTimeMetricRegex *regexp.Regexp + fallbackStartTime *time.Time logger *zap.Logger } // NewStartTimeMetricAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on a start time metric. -func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp) MetricsAdjuster { +func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp, useCollectorStartTimeFallback bool) MetricsAdjuster { + var fallbackStartTime *time.Time + if useCollectorStartTimeFallback { + now := time.Now() + fallbackStartTime = &now + } return &startTimeMetricAdjuster{ startTimeMetricRegex: startTimeMetricRegex, + fallbackStartTime: fallbackStartTime, logger: logger, } } @@ -33,7 +41,11 @@ func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { startTime, err := stma.getStartTime(metrics) if err != nil { - return err + if stma.fallbackStartTime == nil { + return err + } + stma.logger.Warn("Couldn't get start time for metrics. Using fallback start time.", zap.Error(err)) + startTime = float64(stma.fallbackStartTime.Unix()) } startTimeTs := timestampFromFloat64(startTime) diff --git a/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go b/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go index 84bdc2756ed5b..e7c42e9c64f69 100644 --- a/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go +++ b/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go @@ -6,6 +6,7 @@ package internal import ( "regexp" "testing" + "time" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" @@ -113,7 +114,7 @@ func TestStartTimeMetricMatch(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex) + stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, false) if tt.expectedErr != nil { assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr) return @@ -154,3 +155,100 @@ func TestStartTimeMetricMatch(t *testing.T) { }) } } + +func TestStartTimeMetricFallback(t *testing.T) { + const startTime = pcommon.Timestamp(123 * 1e9) + const currentTime = pcommon.Timestamp(126 * 1e9) + mockStartTime := time.Now().Add(-10 * time.Hour) + mockStartTimeSeconds := float64(mockStartTime.Unix()) + processStartTime := mockStartTime.Add(-10 * time.Hour) + processStartTimeSeconds := float64(processStartTime.Unix()) + + tests := []struct { + name string + inputs pmetric.Metrics + startTimeMetricRegex *regexp.Regexp + expectedStartTime pcommon.Timestamp + expectedErr error + }{ + { + name: "regexp_match_sum_metric_no_fallback", + inputs: metrics( + sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)), + histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})), + summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), + sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, processStartTimeSeconds)), + sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, processStartTimeSeconds)), + exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)), + ), + startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"), + expectedStartTime: timestampFromFloat64(processStartTimeSeconds), + }, + { + name: "regexp_match_sum_metric_fallback", + inputs: metrics( + sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)), + histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})), + summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), + ), + startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"), + expectedStartTime: timestampFromFloat64(mockStartTimeSeconds), + }, + { + name: "match_default_sum_start_time_metric_fallback", + inputs: metrics( + sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)), + histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})), + summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), + ), + expectedStartTime: timestampFromFloat64(mockStartTimeSeconds), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, true) + if tt.expectedErr != nil { + assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr) + return + } + + // Make sure the right adjuster is used and one that has the fallback time set. + metricAdjuster, ok := stma.(*startTimeMetricAdjuster) + assert.True(t, ok) + assert.NotNil(t, metricAdjuster.fallbackStartTime) + + // To test that the adjuster is using the fallback correctly, override the fallback time to use + // directly. + metricAdjuster.fallbackStartTime = &mockStartTime + + assert.NoError(t, stma.AdjustMetrics(tt.inputs)) + for i := 0; i < tt.inputs.ResourceMetrics().Len(); i++ { + rm := tt.inputs.ResourceMetrics().At(i) + for j := 0; j < rm.ScopeMetrics().Len(); j++ { + ilm := rm.ScopeMetrics().At(j) + for k := 0; k < ilm.Metrics().Len(); k++ { + metric := ilm.Metrics().At(k) + switch metric.Type() { + case pmetric.MetricTypeSum: + dps := metric.Sum().DataPoints() + for l := 0; l < dps.Len(); l++ { + assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) + } + case pmetric.MetricTypeSummary: + dps := metric.Summary().DataPoints() + for l := 0; l < dps.Len(); l++ { + assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) + } + case pmetric.MetricTypeHistogram: + dps := metric.Histogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) + } + } + } + } + } + }) + } +} diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 239a6172c4666..fdde0f5c76093 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -123,8 +123,8 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Log }() var startTimeMetricRegex *regexp.Regexp - if r.cfg.StartTimeMetricRegex != "" { - startTimeMetricRegex, err = regexp.Compile(r.cfg.StartTimeMetricRegex) + if r.cfg.AdjustOpts.StartTimeMetricRegex != "" { + startTimeMetricRegex, err = regexp.Compile(r.cfg.AdjustOpts.StartTimeMetricRegex) if err != nil { return err } @@ -134,9 +134,10 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Log r.consumer, r.settings, gcInterval(r.cfg.PrometheusConfig), - r.cfg.UseStartTimeMetric, + r.cfg.AdjustOpts.UseStartTimeMetric, startTimeMetricRegex, useCreatedMetricGate.IsEnabled(), + r.cfg.AdjustOpts.UseCollectorStartTimeFallback, enableNativeHistogramsGate.IsEnabled(), r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels, r.cfg.TrimMetricSuffixes, diff --git a/receiver/prometheusreceiver/metrics_receiver_helper_test.go b/receiver/prometheusreceiver/metrics_receiver_helper_test.go index 1bda7ac42e905..b1b006d57a2a6 100644 --- a/receiver/prometheusreceiver/metrics_receiver_helper_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_helper_test.go @@ -687,8 +687,8 @@ func testComponent(t *testing.T, targets []*testData, alterConfig func(*Config), defer mp.Close() config := &Config{ - PrometheusConfig: cfg, - StartTimeMetricRegex: "", + PrometheusConfig: cfg, + AdjustOpts: MetricAdjusterOpts{StartTimeMetricRegex: ""}, } if alterConfig != nil { alterConfig(config) diff --git a/receiver/prometheusreceiver/metrics_receiver_report_extra_scrape_metrics_test.go b/receiver/prometheusreceiver/metrics_receiver_report_extra_scrape_metrics_test.go index 9b9fbc5a04c1d..d176cbbeef0bb 100644 --- a/receiver/prometheusreceiver/metrics_receiver_report_extra_scrape_metrics_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_report_extra_scrape_metrics_test.go @@ -52,9 +52,11 @@ func testScraperMetrics(t *testing.T, targets []*testData, reportExtraScrapeMetr cms := new(consumertest.MetricsSink) receiver := newPrometheusReceiver(receivertest.NewNopSettings(), &Config{ - PrometheusConfig: cfg, - UseStartTimeMetric: false, - StartTimeMetricRegex: "", + PrometheusConfig: cfg, + AdjustOpts: MetricAdjusterOpts{ + UseStartTimeMetric: false, + StartTimeMetricRegex: "", + }, ReportExtraScrapeMetrics: reportExtraScrapeMetrics, }, cms) diff --git a/receiver/prometheusreceiver/metrics_receiver_test.go b/receiver/prometheusreceiver/metrics_receiver_test.go index b0c1ed784ce18..31c69d75b4790 100644 --- a/receiver/prometheusreceiver/metrics_receiver_test.go +++ b/receiver/prometheusreceiver/metrics_receiver_test.go @@ -1424,7 +1424,7 @@ func TestStartTimeMetric(t *testing.T) { }, } testComponent(t, targets, func(c *Config) { - c.UseStartTimeMetric = true + c.AdjustOpts.UseStartTimeMetric = true }) } @@ -1475,8 +1475,8 @@ func TestStartTimeMetricRegex(t *testing.T) { }, } testComponent(t, targets, func(c *Config) { - c.StartTimeMetricRegex = "^(.+_)*process_start_time_seconds$" - c.UseStartTimeMetric = true + c.AdjustOpts.StartTimeMetricRegex = "^(.+_)*process_start_time_seconds$" + c.AdjustOpts.UseStartTimeMetric = true }) }