Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receiver/prometheusreceiver: add option to fallback to collector starttime #36365

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/starttime-fallback.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `UseCollectorStartTimeFallback` option for the start time metric adjuster to use the collector start time as an approximation of process start time as a fallback.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36364]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
3 changes: 2 additions & 1 deletion receiver/prometheusreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ The prometheus receiver also supports additional top-level options:
- **trim_metric_suffixes**: [**Experimental**] When set to true, this enables trimming unit and some counter type suffixes from metric names. For example, it would cause `singing_duration_seconds_total` to be trimmed to `singing_duration`. This can be useful when trying to restore the original metric names used in OpenTelemetry instrumentation. Defaults to false.
- **use_start_time_metric**: When set to true, this enables retrieving the start time of all counter metrics from the process_start_time_seconds metric. This is only correct if all counters on that endpoint started after the process start time, and the process is the only actor exporting the metric after the process started. It should not be used in "exporters" which export counters that may have started before the process itself. Use only if you know what you are doing, as this may result in incorrect rate calculations. Defaults to false.
- **start_time_metric_regex**: The regular expression for the start time metric, and is only applied when use_start_time_metric is enabled. Defaults to process_start_time_seconds.

- **use_collector_start_time_fallback**: When set to true, this option enables using the collector start time as the metric start time if the process_start_time_seconds metric yields no result (for example if targets expose no process_start_time_seconds metric). This is useful when the collector start time is a good approximation of the process start time - for example in serverless workloads when the collector is deployed as a sidecar. This is only applied when use_start_time_metric is enabled. Defaults to false.
For example,

```yaml
Expand All @@ -128,6 +128,7 @@ receivers:
trim_metric_suffixes: true
use_start_time_metric: true
start_time_metric_regex: foo_bar_.*
use_collector_start_time_fallback: true
dashpole marked this conversation as resolved.
Show resolved Hide resolved
config:
scrape_configs:
- job_name: 'otel-collector'
Expand Down
38 changes: 30 additions & 8 deletions receiver/prometheusreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,43 @@ import (
type Config struct {
PrometheusConfig *PromConfig `mapstructure:"config"`
TrimMetricSuffixes bool `mapstructure:"trim_metric_suffixes"`
// UseStartTimeMetric enables retrieving the start time of all counter metrics
// from the process_start_time_seconds metric. This is only correct if all counters on that endpoint
// started after the process start time, and the process is the only actor exporting the metric after
// the process started. It should not be used in "exporters" which export counters that may have
// started before the process itself. Use only if you know what you are doing, as this may result
// in incorrect rate calculations.
UseStartTimeMetric bool `mapstructure:"use_start_time_metric"`
StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"`

// Settings for adjusting metrics. Will default to using an InitialPointAdjuster
// which will use the first scraped point to define the start time for the timeseries.
AdjustOpts MetricAdjusterOpts `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.

// ReportExtraScrapeMetrics - enables reporting of additional metrics for Prometheus client like scrape_body_size_bytes
ReportExtraScrapeMetrics bool `mapstructure:"report_extra_scrape_metrics"`

TargetAllocator *targetallocator.Config `mapstructure:"target_allocator"`
}

type MetricAdjusterOpts struct {
// UseStartTimeMetric enables retrieving the start time of all counter
// metrics from the process_start_time_seconds metric. This is only correct
// if all counters on that endpoint started after the process start time,
// and the process is the only actor exporting the metric after the process
// started. It should not be used in "exporters" which export counters that
// may have started before the process itself. Use only if you know what you
// are doing, as this may result in incorrect rate calculations.
UseStartTimeMetric bool `mapstructure:"use_start_time_metric"`
StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"`

// UseCollectorStartTimeFallback enables using a fallback start time if a
// start time is otherwise unavailable when adjusting metrics. This would
// happen if the UseStartTimeMetric is used but the application doesn't emit
// a process_start_time_seconds metric or a metric that matches the
// StartTimeMetricRegex provided.
//
// If enabled, the fallback start time used for adjusted metrics is an
// approximation of the collector start time.
//
// This option should be used when the collector start time is a good
// approximation of the process start time - for example in serverless
// workloads when the collector is deployed as a sidecar.
UseCollectorStartTimeFallback bool `mapstructure:"use_collector_start_time_fallback"`
}

// Validate checks the receiver configuration is valid.
func (cfg *Config) Validate() error {
if !containsScrapeConfig(cfg) && cfg.TargetAllocator == nil {
Expand Down
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestLoadConfig(t *testing.T) {
r1 := cfg.(*Config)
assert.Equal(t, "demo", r1.PrometheusConfig.ScrapeConfigs[0].JobName)
assert.Equal(t, 5*time.Second, time.Duration(r1.PrometheusConfig.ScrapeConfigs[0].ScrapeInterval))
assert.True(t, r1.UseStartTimeMetric)
assert.True(t, r1.AdjustOpts.UseStartTimeMetric)
assert.True(t, r1.TrimMetricSuffixes)
assert.Equal(t, "^(.+_)*process_start_time_seconds$", r1.StartTimeMetricRegex)
assert.Equal(t, "^(.+_)*process_start_time_seconds$", r1.AdjustOpts.StartTimeMetricRegex)
assert.True(t, r1.ReportExtraScrapeMetrics)

assert.Equal(t, "http://my-targetallocator-service", r1.TargetAllocator.Endpoint)
Expand Down
3 changes: 2 additions & 1 deletion receiver/prometheusreceiver/internal/appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func NewAppendable(
useStartTimeMetric bool,
startTimeMetricRegex *regexp.Regexp,
useCreatedMetric bool,
useCollectorStartTimeFallback bool,
enableNativeHistograms bool,
externalLabels labels.Labels,
trimSuffixes bool,
Expand All @@ -45,7 +46,7 @@ func NewAppendable(
if !useStartTimeMetric {
metricAdjuster = NewInitialPointAdjuster(set.Logger, gcInterval, useCreatedMetric)
} else {
metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex)
metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex, useCollectorStartTimeFallback)
}

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverID: set.ID, Transport: transport, ReceiverCreateSettings: set})
Expand Down
16 changes: 14 additions & 2 deletions receiver/prometheusreceiver/internal/starttimemetricadjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"errors"
"regexp"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
Expand All @@ -19,21 +20,32 @@ var (

type startTimeMetricAdjuster struct {
startTimeMetricRegex *regexp.Regexp
fallbackStartTime *time.Time
logger *zap.Logger
}

// NewStartTimeMetricAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on a start time metric.
func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp) MetricsAdjuster {
func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp, useCollectorStartTimeFallback bool) MetricsAdjuster {
var fallbackStartTime *time.Time
if useCollectorStartTimeFallback {
now := time.Now()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than assume that this is always called at/near start time of collector, should we use the github.com/shirou/gopsutil/v4/host package to request uptime like hostmetricsreceiver does for boottime?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that correct in a containerized environment, or would it give the start time of the host?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know the answer to that question, would need to be tested (I don't currently have capacity to test myself).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't be the collector start time, but the start time of this instance of the component. If the pipeline is stopped and restarted then this will result in a different timestamp even though the process has not restarted. This is a subtlety, but could be very confusing for someone using OpAMP to manage collector instances. I'm not sure whether there's a way to get a reliable process start time from the collector host. The system uptime is almost certainly not the correct value to use here. Perhaps the best option is just to clarify in the description of the new configuration field that the approximated start time will be relative to the component start time and not necessarily the collector start time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we populate it using an init function, or a variable outside of metric adjuster?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Populating a variable in an init function could work. It would be executed once near the start of the process.

fallbackStartTime = &now
}
return &startTimeMetricAdjuster{
startTimeMetricRegex: startTimeMetricRegex,
fallbackStartTime: fallbackStartTime,
logger: logger,
}
}

func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) error {
startTime, err := stma.getStartTime(metrics)
if err != nil {
return err
if stma.fallbackStartTime == nil {
return err
}
stma.logger.Warn("Couldn't get start time for metrics. Using fallback start time.", zap.Error(err))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be at the Warn level? Wouldn't this be a fairly high-rate log entry if none of the processed metrics have a start time?

startTime = float64(stma.fallbackStartTime.Unix())
}

startTimeTs := timestampFromFloat64(startTime)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package internal
import (
"regexp"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestStartTimeMetricMatch(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex)
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, false)
if tt.expectedErr != nil {
assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr)
return
Expand Down Expand Up @@ -154,3 +155,100 @@ func TestStartTimeMetricMatch(t *testing.T) {
})
}
}

func TestStartTimeMetricFallback(t *testing.T) {
const startTime = pcommon.Timestamp(123 * 1e9)
const currentTime = pcommon.Timestamp(126 * 1e9)
mockStartTime := time.Now().Add(-10 * time.Hour)
mockStartTimeSeconds := float64(mockStartTime.Unix())
processStartTime := mockStartTime.Add(-10 * time.Hour)
processStartTimeSeconds := float64(processStartTime.Unix())

tests := []struct {
name string
inputs pmetric.Metrics
startTimeMetricRegex *regexp.Regexp
expectedStartTime pcommon.Timestamp
expectedErr error
}{
{
name: "regexp_match_sum_metric_no_fallback",
inputs: metrics(
sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)),
histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})),
summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})),
sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, processStartTimeSeconds)),
sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, processStartTimeSeconds)),
exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)),
),
startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"),
expectedStartTime: timestampFromFloat64(processStartTimeSeconds),
},
{
name: "regexp_match_sum_metric_fallback",
inputs: metrics(
sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)),
histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})),
summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})),
),
startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"),
expectedStartTime: timestampFromFloat64(mockStartTimeSeconds),
},
{
name: "match_default_sum_start_time_metric_fallback",
inputs: metrics(
sumMetric("test_sum_metric", doublePoint(nil, startTime, currentTime, 16)),
histogramMetric("test_histogram_metric", histogramPoint(nil, startTime, currentTime, []float64{1, 2}, []uint64{2, 3, 4})),
summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})),
),
expectedStartTime: timestampFromFloat64(mockStartTimeSeconds),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, true)
if tt.expectedErr != nil {
assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr)
return
}

// Make sure the right adjuster is used and one that has the fallback time set.
metricAdjuster, ok := stma.(*startTimeMetricAdjuster)
assert.True(t, ok)
assert.NotNil(t, metricAdjuster.fallbackStartTime)

// To test that the adjuster is using the fallback correctly, override the fallback time to use
// directly.
metricAdjuster.fallbackStartTime = &mockStartTime

assert.NoError(t, stma.AdjustMetrics(tt.inputs))
for i := 0; i < tt.inputs.ResourceMetrics().Len(); i++ {
rm := tt.inputs.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
ilm := rm.ScopeMetrics().At(j)
for k := 0; k < ilm.Metrics().Len(); k++ {
metric := ilm.Metrics().At(k)
switch metric.Type() {
case pmetric.MetricTypeSum:
dps := metric.Sum().DataPoints()
for l := 0; l < dps.Len(); l++ {
assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp())
}
case pmetric.MetricTypeSummary:
dps := metric.Summary().DataPoints()
for l := 0; l < dps.Len(); l++ {
assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp())
}
case pmetric.MetricTypeHistogram:
dps := metric.Histogram().DataPoints()
for l := 0; l < dps.Len(); l++ {
assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp())
}
}
}
}
}
})
}
}
7 changes: 4 additions & 3 deletions receiver/prometheusreceiver/metrics_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Log
}()

var startTimeMetricRegex *regexp.Regexp
if r.cfg.StartTimeMetricRegex != "" {
startTimeMetricRegex, err = regexp.Compile(r.cfg.StartTimeMetricRegex)
if r.cfg.AdjustOpts.StartTimeMetricRegex != "" {
startTimeMetricRegex, err = regexp.Compile(r.cfg.AdjustOpts.StartTimeMetricRegex)
if err != nil {
return err
}
Expand All @@ -134,9 +134,10 @@ func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Log
r.consumer,
r.settings,
gcInterval(r.cfg.PrometheusConfig),
r.cfg.UseStartTimeMetric,
r.cfg.AdjustOpts.UseStartTimeMetric,
startTimeMetricRegex,
useCreatedMetricGate.IsEnabled(),
r.cfg.AdjustOpts.UseCollectorStartTimeFallback,
enableNativeHistogramsGate.IsEnabled(),
r.cfg.PrometheusConfig.GlobalConfig.ExternalLabels,
r.cfg.TrimMetricSuffixes,
Expand Down
4 changes: 2 additions & 2 deletions receiver/prometheusreceiver/metrics_receiver_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,8 @@ func testComponent(t *testing.T, targets []*testData, alterConfig func(*Config),
defer mp.Close()

config := &Config{
PrometheusConfig: cfg,
StartTimeMetricRegex: "",
PrometheusConfig: cfg,
AdjustOpts: MetricAdjusterOpts{StartTimeMetricRegex: ""},
}
if alterConfig != nil {
alterConfig(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ func testScraperMetrics(t *testing.T, targets []*testData, reportExtraScrapeMetr

cms := new(consumertest.MetricsSink)
receiver := newPrometheusReceiver(receivertest.NewNopSettings(), &Config{
PrometheusConfig: cfg,
UseStartTimeMetric: false,
StartTimeMetricRegex: "",
PrometheusConfig: cfg,
AdjustOpts: MetricAdjusterOpts{
UseStartTimeMetric: false,
StartTimeMetricRegex: "",
},
ReportExtraScrapeMetrics: reportExtraScrapeMetrics,
}, cms)

Expand Down
6 changes: 3 additions & 3 deletions receiver/prometheusreceiver/metrics_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ func TestStartTimeMetric(t *testing.T) {
},
}
testComponent(t, targets, func(c *Config) {
c.UseStartTimeMetric = true
c.AdjustOpts.UseStartTimeMetric = true
})
}

Expand Down Expand Up @@ -1475,8 +1475,8 @@ func TestStartTimeMetricRegex(t *testing.T) {
},
}
testComponent(t, targets, func(c *Config) {
c.StartTimeMetricRegex = "^(.+_)*process_start_time_seconds$"
c.UseStartTimeMetric = true
c.AdjustOpts.StartTimeMetricRegex = "^(.+_)*process_start_time_seconds$"
c.AdjustOpts.UseStartTimeMetric = true
})
}

Expand Down