From 4abd68a0aee9cb329651eeb4dd3d205934c1ee90 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Fri, 24 Jan 2025 02:37:53 +0100 Subject: [PATCH] [processor/cumulativetodelta] Add metric type filter (#34407) **Description:** Add metric type filter for cumulativetodelta processor **Link to tracking Issue:** #33673 **Testing:** Added unit tests **Documentation:** Extended the readme of this component to describe this new filter --------- Signed-off-by: Florian Bacher --- ...to-delta-processor-metric-type-filter.yaml | 27 ++ .../cumulativetodeltaprocessor/README.md | 46 +++- .../cumulativetodeltaprocessor/config.go | 31 +++ .../cumulativetodeltaprocessor/config_test.go | 40 +++ .../cumulativetodeltaprocessor/factory.go | 5 +- .../factory_test.go | 7 + processor/cumulativetodeltaprocessor/go.mod | 1 + processor/cumulativetodeltaprocessor/go.sum | 2 + .../cumulativetodeltaprocessor/processor.go | 68 ++++- .../processor_test.go | 258 ++++++++++++++---- .../testdata/config.yaml | 43 +++ 11 files changed, 455 insertions(+), 73 deletions(-) create mode 100644 .chloggen/cumulative-to-delta-processor-metric-type-filter.yaml diff --git a/.chloggen/cumulative-to-delta-processor-metric-type-filter.yaml b/.chloggen/cumulative-to-delta-processor-metric-type-filter.yaml new file mode 100644 index 000000000000..79f1c9d82175 --- /dev/null +++ b/.chloggen/cumulative-to-delta-processor-metric-type-filter.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cumulativetodeltaprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add metric type filter for cumulativetodelta processor + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33673] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/cumulativetodeltaprocessor/README.md b/processor/cumulativetodeltaprocessor/README.md index 6963e47000ae..ed6de34eaf4e 100644 --- a/processor/cumulativetodeltaprocessor/README.md +++ b/processor/cumulativetodeltaprocessor/README.md @@ -23,8 +23,8 @@ Configuration is specified through a list of metrics. The processor uses metric The following settings can be optionally configured: -- `include`: List of metrics names or patterns to convert to delta. -- `exclude`: List of metrics names or patterns to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** +- `include`: List of metrics names (case-insensitive), patterns or metric types to convert to delta. Valid values are: `sum`, `histogram`. 
+- `exclude`: List of metrics names (case-insensitive), patterns or metric types to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** Valid values are: `sum`, `histogram`. - `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0 - `initial_value`: Handling of the first observed point for a given metric identity. When the collector (re)starts, there's no record of how much of a given cumulative counter has already been converted to delta values. @@ -56,6 +56,17 @@ processors: match_type: strict ``` +```yaml +processors: + # processor name: cumulativetodelta + cumulativetodelta: + + # Convert all sum metrics + include: + metric_types: + - sum +``` + ```yaml processors: # processor name: cumulativetodelta @@ -69,6 +80,21 @@ processors: match_type: regexp ``` +```yaml +processors: + # processor name: cumulativetodelta + cumulativetodelta: + + # Convert cumulative sum metrics to delta + # if and only if 'metric' is in the name + include: + metrics: + - ".*metric.*" + match_type: regexp + metric_types: + - sum +``` + ```yaml processors: # processor name: cumulativetodelta @@ -82,6 +108,22 @@ processors: match_type: regexp ``` +```yaml +processors: + # processor name: cumulativetodelta + cumulativetodelta: + + # Convert cumulative sum metrics with 'metric' in their name, + # but exclude histogram metrics + include: + metrics: + - ".*metric.*" + match_type: regexp + exclude: + metric_types: + - histogram +``` + ```yaml processors: # processor name: cumulativetodelta diff --git a/processor/cumulativetodeltaprocessor/config.go b/processor/cumulativetodeltaprocessor/config.go index dcba656c838d..adcc81090f2d 100644 --- a/processor/cumulativetodeltaprocessor/config.go +++ b/processor/cumulativetodeltaprocessor/config.go @@ -5,14 +5,24 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "fmt" + "strings" "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pmetric" + "golang.org/x/exp/maps" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterset" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor/internal/tracking" ) +var validMetricTypes = map[string]bool{ + strings.ToLower(pmetric.MetricTypeSum.String()): true, + strings.ToLower(pmetric.MetricTypeHistogram.String()): true, +} + +var validMetricTypeList = maps.Keys(validMetricTypes) + // Config defines the configuration for the processor. type Config struct { // MaxStaleness is the total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. @@ -37,6 +47,8 @@ type MatchMetrics struct { filterset.Config `mapstructure:",squash"` Metrics []string `mapstructure:"metrics"` + + MetricTypes []string `mapstructure:"metric_types"` } var _ component.Config = (*Config)(nil) @@ -52,5 +64,24 @@ func (config *Config) Validate() error { (len(config.Exclude.MatchType) > 0 && len(config.Exclude.Metrics) == 0) { return fmt.Errorf("metrics must be supplied if match_type is set") } + + for _, metricType := range config.Exclude.MetricTypes { + if valid := validMetricTypes[strings.ToLower(metricType)]; !valid { + return fmt.Errorf( + "found invalid metric type in exclude.metric_types: %s. 
Valid values are %s", + metricType, + validMetricTypeList, + ) + } + } + for _, metricType := range config.Include.MetricTypes { + if valid := validMetricTypes[strings.ToLower(metricType)]; !valid { + return fmt.Errorf( + "found invalid metric type in include.metric_types: %s. Valid values are %s", + metricType, + validMetricTypeList, + ) + } + } return nil } diff --git a/processor/cumulativetodeltaprocessor/config_test.go b/processor/cumulativetodeltaprocessor/config_test.go index 97c3f8952077..337c2fc13c37 100644 --- a/processor/cumulativetodeltaprocessor/config_test.go +++ b/processor/cumulativetodeltaprocessor/config_test.go @@ -4,6 +4,7 @@ package cumulativetodeltaprocessor import ( + "fmt" "path/filepath" "testing" "time" @@ -82,6 +83,45 @@ func TestLoadConfig(t *testing.T) { InitialValue: tracking.InitialValueAuto, }, }, + { + id: component.NewIDWithName(metadata.Type, "metric_type_filter"), + expected: &Config{ + Include: MatchMetrics{ + Metrics: []string{ + "a*", + }, + Config: filterset.Config{ + MatchType: "regexp", + RegexpConfig: nil, + }, + MetricTypes: []string{ + "sum", + }, + }, + Exclude: MatchMetrics{ + Metrics: []string{ + "b*", + }, + Config: filterset.Config{ + MatchType: "regexp", + RegexpConfig: nil, + }, + MetricTypes: []string{ + "histogram", + }, + }, + MaxStaleness: 10 * time.Second, + InitialValue: tracking.InitialValueAuto, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "invalid_include_metric_type_filter"), + errorMessage: fmt.Sprintf("found invalid metric type in include.metric_types: gauge. Valid values are %s", validMetricTypeList), + }, + { + id: component.NewIDWithName(metadata.Type, "invalid_exclude_metric_type_filter"), + errorMessage: fmt.Sprintf("found invalid metric type in exclude.metric_types: Invalid. 
Valid values are %s", validMetricTypeList), + }, { id: component.NewIDWithName(metadata.Type, "missing_match_type"), errorMessage: "match_type must be set if metrics are supplied", diff --git a/processor/cumulativetodeltaprocessor/factory.go b/processor/cumulativetodeltaprocessor/factory.go index 24ffc9c3e334..21a0af4a04b5 100644 --- a/processor/cumulativetodeltaprocessor/factory.go +++ b/processor/cumulativetodeltaprocessor/factory.go @@ -40,7 +40,10 @@ func createMetricsProcessor( return nil, fmt.Errorf("configuration parsing error") } - metricsProcessor := newCumulativeToDeltaProcessor(processorConfig, set.Logger) + metricsProcessor, err := newCumulativeToDeltaProcessor(processorConfig, set.Logger) + if err != nil { + return nil, err + } return processorhelper.NewMetrics( ctx, diff --git a/processor/cumulativetodeltaprocessor/factory_test.go b/processor/cumulativetodeltaprocessor/factory_test.go index 6926b4257d6d..b309bc430396 100644 --- a/processor/cumulativetodeltaprocessor/factory_test.go +++ b/processor/cumulativetodeltaprocessor/factory_test.go @@ -6,6 +6,7 @@ package cumulativetodeltaprocessor import ( "context" "path/filepath" + "strings" "testing" "github.com/stretchr/testify/assert" @@ -61,6 +62,12 @@ func TestCreateProcessors(t *testing.T) { processortest.NewNopSettings(), cfg, consumertest.NewNop()) + + if strings.Contains(k, "invalid") { + assert.Error(t, mErr) + assert.Nil(t, mp) + return + } assert.NotNil(t, mp) assert.NoError(t, mErr) assert.NoError(t, mp.Shutdown(context.Background())) diff --git a/processor/cumulativetodeltaprocessor/go.mod b/processor/cumulativetodeltaprocessor/go.mod index 50c40922c46e..033c5cbbf8ee 100644 --- a/processor/cumulativetodeltaprocessor/go.mod +++ b/processor/cumulativetodeltaprocessor/go.mod @@ -16,6 +16,7 @@ require ( go.opentelemetry.io/collector/processor/processortest v0.118.1-0.20250121185328-fbefb22cc2b3 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 ) require ( diff --git a/processor/cumulativetodeltaprocessor/go.sum b/processor/cumulativetodeltaprocessor/go.sum index 227311851b1e..a80ae09f6ab8 100644 --- a/processor/cumulativetodeltaprocessor/go.sum +++ b/processor/cumulativetodeltaprocessor/go.sum @@ -105,6 +105,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= diff --git a/processor/cumulativetodeltaprocessor/processor.go b/processor/cumulativetodeltaprocessor/processor.go index 0c7673a9a169..78bfbaf3fd1c 100644 --- a/processor/cumulativetodeltaprocessor/processor.go +++ b/processor/cumulativetodeltaprocessor/processor.go @@ -5,7 +5,9 @@ package cumulativetodeltaprocessor // import "github.com/open-telemetry/opentele import ( "context" + "fmt" "math" 
+ "strings" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -15,19 +17,21 @@ import ( ) type cumulativeToDeltaProcessor struct { - includeFS filterset.FilterSet - excludeFS filterset.FilterSet - logger *zap.Logger - deltaCalculator *tracking.MetricTracker - cancelFunc context.CancelFunc + includeFS filterset.FilterSet + excludeFS filterset.FilterSet + includeMetricTypes map[pmetric.MetricType]bool + excludeMetricTypes map[pmetric.MetricType]bool + logger *zap.Logger + deltaCalculator *tracking.MetricTracker + cancelFunc context.CancelFunc } -func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulativeToDeltaProcessor { +func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) (*cumulativeToDeltaProcessor, error) { ctx, cancel := context.WithCancel(context.Background()) + p := &cumulativeToDeltaProcessor{ - logger: logger, - deltaCalculator: tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue), - cancelFunc: cancel, + logger: logger, + cancelFunc: cancel, } if len(config.Include.Metrics) > 0 { p.includeFS, _ = filterset.CreateFilterSet(config.Include.Metrics, &config.Include.Config) @@ -35,7 +39,41 @@ func newCumulativeToDeltaProcessor(config *Config, logger *zap.Logger) *cumulati if len(config.Exclude.Metrics) > 0 { p.excludeFS, _ = filterset.CreateFilterSet(config.Exclude.Metrics, &config.Exclude.Config) } - return p + + if len(config.Include.MetricTypes) > 0 { + includeMetricTypeFilter, err := getMetricTypeFilter(config.Include.MetricTypes) + if err != nil { + return nil, err + } + p.includeMetricTypes = includeMetricTypeFilter + } + + if len(config.Exclude.MetricTypes) > 0 { + excludeMetricTypeFilter, err := getMetricTypeFilter(config.Exclude.MetricTypes) + if err != nil { + return nil, err + } + p.excludeMetricTypes = excludeMetricTypeFilter + } + + p.deltaCalculator = tracking.NewMetricTracker(ctx, logger, config.MaxStaleness, config.InitialValue) + + return p, nil +} + +func getMetricTypeFilter(types []string) (map[pmetric.MetricType]bool, error) { + res := map[pmetric.MetricType]bool{} + for _, t := range types { + switch strings.ToLower(t) { + case strings.ToLower(pmetric.MetricTypeSum.String()): + res[pmetric.MetricTypeSum] = true + case strings.ToLower(pmetric.MetricTypeHistogram.String()): + res[pmetric.MetricTypeHistogram] = true + default: + return nil, fmt.Errorf("unsupported metric type filter: %s", t) + } + } + return res, nil } // processMetrics implements the ProcessMetricsFunc type. 
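Note on how the pieces above fit together: `getMetricTypeFilter` accepts only `sum` and `histogram` (case-insensitively) and produces the two sets consulted by `shouldConvertMetric` in the next hunk, where a metric is converted only when it passes every configured include filter and fails every configured exclude filter. The following standalone sketch (not part of the patch; `buildTypeFilter` and `shouldConvert` are simplified stand-ins for `getMetricTypeFilter` and `shouldConvertMetric`, and plain strings stand in for `pmetric.MetricType`) illustrates that precedence:

```go
package main

import (
	"fmt"
	"strings"
)

// metricType is a stand-in for pmetric.MetricType; only the two types the
// processor can convert are modelled here.
type metricType string

const (
	metricTypeSum       metricType = "sum"
	metricTypeHistogram metricType = "histogram"
)

// buildTypeFilter mirrors getMetricTypeFilter: entries are matched
// case-insensitively and anything other than sum or histogram is rejected.
func buildTypeFilter(types []string) (map[metricType]bool, error) {
	res := map[metricType]bool{}
	for _, t := range types {
		lt := metricType(strings.ToLower(t))
		switch lt {
		case metricTypeSum, metricTypeHistogram:
			res[lt] = true
		default:
			return nil, fmt.Errorf("unsupported metric type filter: %s", t)
		}
	}
	return res, nil
}

// shouldConvert reproduces the precedence used by shouldConvertMetric:
// convert only if the metric passes every include filter and fails every
// exclude filter (an empty filter matches everything).
func shouldConvert(include, exclude map[metricType]bool, t metricType) bool {
	return (len(include) == 0 || include[t]) &&
		(len(exclude) == 0 || !exclude[t])
}

func main() {
	include, _ := buildTypeFilter([]string{"Sum"}) // case does not matter
	exclude, _ := buildTypeFilter([]string{"histogram"})

	fmt.Println(shouldConvert(include, exclude, metricTypeSum))       // true
	fmt.Println(shouldConvert(include, exclude, metricTypeHistogram)) // false

	if _, err := buildTypeFilter([]string{"summary"}); err != nil {
		fmt.Println(err) // unsupported metric type filter: summary
	}
}
```

Because the exclude conditions are AND-ed in as negations, a metric matching both include and exclude is never converted, consistent with the README note that exclude takes precedence.
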
@@ -43,7 +81,7 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { rm.ScopeMetrics().RemoveIf(func(ilm pmetric.ScopeMetrics) bool { ilm.Metrics().RemoveIf(func(m pmetric.Metric) bool { - if !ctdp.shouldConvertMetric(m.Name()) { + if !ctdp.shouldConvertMetric(m) { return false } switch m.Type() { @@ -111,9 +149,11 @@ func (ctdp *cumulativeToDeltaProcessor) shutdown(context.Context) error { return nil } -func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metricName string) bool { - return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metricName)) && - (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metricName)) +func (ctdp *cumulativeToDeltaProcessor) shouldConvertMetric(metric pmetric.Metric) bool { + return (ctdp.includeFS == nil || ctdp.includeFS.Matches(metric.Name())) && + (len(ctdp.includeMetricTypes) == 0 || ctdp.includeMetricTypes[metric.Type()]) && + (ctdp.excludeFS == nil || !ctdp.excludeFS.Matches(metric.Name())) && + (len(ctdp.excludeMetricTypes) == 0 || !ctdp.excludeMetricTypes[metric.Type()]) } func (ctdp *cumulativeToDeltaProcessor) convertNumberDataPoints(dps pmetric.NumberDataPointSlice, baseIdentity tracking.MetricIdentity) { diff --git a/processor/cumulativetodeltaprocessor/processor_test.go b/processor/cumulativetodeltaprocessor/processor_test.go index d7b0a19ab6cc..c36a8f06cb0b 100644 --- a/processor/cumulativetodeltaprocessor/processor_test.go +++ b/processor/cumulativetodeltaprocessor/processor_test.go @@ -5,6 +5,7 @@ package cumulativetodeltaprocessor import ( "context" + "errors" "math" "testing" "time" @@ -35,6 +36,30 @@ type testSumMetric struct { flags [][]pmetric.DataPointFlags } +func (tm testSumMetric) addToMetrics(ms pmetric.MetricSlice, now time.Time) { + for i, name := range tm.metricNames { + m := ms.AppendEmpty() + m.SetName(name) + sum := m.SetEmptySum() + sum.SetIsMonotonic(tm.isMonotonic[i]) + + if tm.isCumulative[i] { + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } else { + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + } + + for index, value := range tm.metricValues[i] { + dp := m.Sum().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) + dp.SetDoubleValue(value) + if len(tm.flags) > i && len(tm.flags[i]) > index { + dp.SetFlags(tm.flags[i][index]) + } + } + } +} + type testHistogramMetric struct { metricNames []string metricCounts [][]uint64 @@ -46,12 +71,54 @@ type testHistogramMetric struct { flags [][]pmetric.DataPointFlags } +func (tm testHistogramMetric) addToMetrics(ms pmetric.MetricSlice, now time.Time) { + for i, name := range tm.metricNames { + m := ms.AppendEmpty() + m.SetName(name) + hist := m.SetEmptyHistogram() + + if tm.isCumulative[i] { + hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + } else { + hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + } + + for index, count := range tm.metricCounts[i] { + dp := m.Histogram().DataPoints().AppendEmpty() + dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) + dp.SetCount(count) + + sums := tm.metricSums[i] + if len(sums) > 0 { + dp.SetSum(sums[index]) + } + if tm.metricMins != nil { + mins := tm.metricMins[i] + if len(mins) > 0 { + dp.SetMin(mins[index]) + } + } + if tm.metricMaxes != nil { + maxes := tm.metricMaxes[i] + if len(maxes) > 0 { + dp.SetMax(maxes[index]) + } + } + 
dp.BucketCounts().FromRaw(tm.metricBuckets[i][index]) + if len(tm.flags) > i && len(tm.flags[i]) > index { + dp.SetFlags(tm.flags[i][index]) + } + } + } +} + type cumulativeToDeltaTest struct { name string include MatchMetrics exclude MatchMetrics inMetrics pmetric.Metrics outMetrics pmetric.Metrics + wantError error } func TestCumulativeToDeltaProcessor(t *testing.T) { @@ -436,6 +503,123 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { isMonotonic: []bool{true}, }), }, + { + name: "cumulative_to_delta_exclude_sum_metrics", + include: MatchMetrics{}, + exclude: MatchMetrics{ + MetricTypes: []string{"sum"}, + }, + inMetrics: generateMixedTestMetrics( + testSumMetric{ + metricNames: []string{"metric_1"}, + metricValues: [][]float64{{0, 100, 200, 500}}, + isCumulative: []bool{true, true}, + isMonotonic: []bool{true, true}, + }, + testHistogramMetric{ + metricNames: []string{"metric_2"}, + metricCounts: [][]uint64{{0, 100, 200, 500}}, + metricSums: [][]float64{{0, 100, 200, 500}}, + metricBuckets: [][][]uint64{ + {{0, 0, 0}, {50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + }, + metricMins: [][]float64{ + {0, 5.0, 2.0, 3.0}, + }, + metricMaxes: [][]float64{ + {0, 800.0, 825.0, 800.0}, + }, + isCumulative: []bool{true}, + }, + ), + outMetrics: generateMixedTestMetrics( + testSumMetric{ + metricNames: []string{"metric_1"}, + metricValues: [][]float64{{0, 100, 200, 500}}, + isCumulative: []bool{true}, + isMonotonic: []bool{true}, + }, + testHistogramMetric{ + metricNames: []string{"metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}}, + metricSums: [][]float64{{100, 100, 300}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + }, + metricMins: [][]float64{ + nil, + }, + metricMaxes: [][]float64{ + nil, + }, + isCumulative: []bool{false}, + }), + }, + { + name: "cumulative_to_delta_include_histogram_metrics", + include: MatchMetrics{ + MetricTypes: []string{"histogram"}, + }, + inMetrics: generateMixedTestMetrics( + testSumMetric{ + metricNames: []string{"metric_1"}, + metricValues: [][]float64{{0, 100, 200, 500}}, + isCumulative: []bool{true, true}, + isMonotonic: []bool{true, true}, + }, + testHistogramMetric{ + metricNames: []string{"metric_2"}, + metricCounts: [][]uint64{{0, 100, 200, 500}}, + metricSums: [][]float64{{0, 100, 200, 500}}, + metricBuckets: [][][]uint64{ + {{0, 0, 0}, {50, 25, 25}, {100, 50, 50}, {250, 125, 125}}, + }, + metricMins: [][]float64{ + {0, 5.0, 2.0, 3.0}, + }, + metricMaxes: [][]float64{ + {0, 800.0, 825.0, 800.0}, + }, + isCumulative: []bool{true}, + }, + ), + outMetrics: generateMixedTestMetrics( + testSumMetric{ + metricNames: []string{"metric_1"}, + metricValues: [][]float64{{0, 100, 200, 500}}, + isCumulative: []bool{true}, + isMonotonic: []bool{true}, + }, + testHistogramMetric{ + metricNames: []string{"metric_2"}, + metricCounts: [][]uint64{{100, 100, 300}}, + metricSums: [][]float64{{100, 100, 300}}, + metricBuckets: [][][]uint64{ + {{50, 25, 25}, {50, 25, 25}, {150, 75, 75}}, + }, + metricMins: [][]float64{ + nil, + }, + metricMaxes: [][]float64{ + nil, + }, + isCumulative: []bool{false}, + }), + }, + { + name: "cumulative_to_delta_unsupported_include_metric_type", + include: MatchMetrics{ + MetricTypes: []string{"summary"}, + }, + wantError: errors.New("unsupported metric type filter: summary"), + }, + { + name: "cumulative_to_delta_unsupported_exclude_metric_type", + include: MatchMetrics{ + MetricTypes: []string{"summary"}, + }, + wantError: errors.New("unsupported metric type filter: summary"), + }, } for _, test := 
range testCases { @@ -453,6 +637,12 @@ func TestCumulativeToDeltaProcessor(t *testing.T) { cfg, next, ) + + if test.wantError != nil { + require.ErrorContains(t, err, test.wantError.Error()) + require.Nil(t, mgp) + return + } assert.NotNil(t, mgp) assert.NoError(t, err) @@ -540,27 +730,7 @@ func generateTestSumMetrics(tm testSumMetric) pmetric.Metrics { rm := md.ResourceMetrics().AppendEmpty() ms := rm.ScopeMetrics().AppendEmpty().Metrics() - for i, name := range tm.metricNames { - m := ms.AppendEmpty() - m.SetName(name) - sum := m.SetEmptySum() - sum.SetIsMonotonic(tm.isMonotonic[i]) - - if tm.isCumulative[i] { - sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - } else { - sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) - } - - for index, value := range tm.metricValues[i] { - dp := m.Sum().DataPoints().AppendEmpty() - dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) - dp.SetDoubleValue(value) - if len(tm.flags) > i && len(tm.flags[i]) > index { - dp.SetFlags(tm.flags[i][index]) - } - } - } + tm.addToMetrics(ms, now) return md } @@ -571,44 +741,20 @@ func generateTestHistogramMetrics(tm testHistogramMetric) pmetric.Metrics { rm := md.ResourceMetrics().AppendEmpty() ms := rm.ScopeMetrics().AppendEmpty().Metrics() - for i, name := range tm.metricNames { - m := ms.AppendEmpty() - m.SetName(name) - hist := m.SetEmptyHistogram() + tm.addToMetrics(ms, now) - if tm.isCumulative[i] { - hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - } else { - hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) - } + return md +} - for index, count := range tm.metricCounts[i] { - dp := m.Histogram().DataPoints().AppendEmpty() - dp.SetTimestamp(pcommon.NewTimestampFromTime(now.Add(10 * time.Second))) - dp.SetCount(count) +func generateMixedTestMetrics(tsm testSumMetric, thm testHistogramMetric) pmetric.Metrics { + md := pmetric.NewMetrics() + now := time.Now() - sums := tm.metricSums[i] - if len(sums) > 0 { - dp.SetSum(sums[index]) - } - if tm.metricMins != nil { - mins := tm.metricMins[i] - if len(mins) > 0 { - dp.SetMin(mins[index]) - } - } - if tm.metricMaxes != nil { - maxes := tm.metricMaxes[i] - if len(maxes) > 0 { - dp.SetMax(maxes[index]) - } - } - dp.BucketCounts().FromRaw(tm.metricBuckets[i][index]) - if len(tm.flags) > i && len(tm.flags[i]) > index { - dp.SetFlags(tm.flags[i][index]) - } - } - } + rm := md.ResourceMetrics().AppendEmpty() + ms := rm.ScopeMetrics().AppendEmpty().Metrics() + + tsm.addToMetrics(ms, now) + thm.addToMetrics(ms, now) return md } diff --git a/processor/cumulativetodeltaprocessor/testdata/config.yaml b/processor/cumulativetodeltaprocessor/testdata/config.yaml index 31775d239adb..07945488d5c5 100644 --- a/processor/cumulativetodeltaprocessor/testdata/config.yaml +++ b/processor/cumulativetodeltaprocessor/testdata/config.yaml @@ -42,6 +42,49 @@ cumulativetodelta/regexp: - b* max_staleness: 10s +cumulativetodelta/metric_type_filter: + include: + match_type: regexp + metrics: + - a* + metric_types: + - sum + exclude: + match_type: regexp + metrics: + - b* + metric_types: + - histogram + max_staleness: 10s + +cumulativetodelta/invalid_include_metric_type_filter: + include: + match_type: regexp + metrics: + - a* + metric_types: + - gauge + exclude: + match_type: regexp + metrics: + - b* + metric_types: + - histogram + max_staleness: 10s + +cumulativetodelta/invalid_exclude_metric_type_filter: + include: + match_type: regexp + metrics: + - a* + exclude: + match_type: regexp + 
metrics: + - b* + metric_types: + - Invalid + max_staleness: 10s + cumulativetodelta/auto: initial_value: auto
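
For context on the expected values in the new mixed-metric tests: when a series is selected for conversion, cumulative points become point-to-point differences, with the first observed point serving only as the baseline, which is why four cumulative histogram counts `{0, 100, 200, 500}` produce the three deltas `{100, 100, 300}` in the expected output, while excluded series pass through unchanged. Below is a minimal arithmetic sketch of that behaviour; it ignores timestamps, metric identity tracking, `max_staleness`, and the `initial_value` modes, and `toDelta` is an illustrative helper rather than processor code:

```go
package main

import "fmt"

// toDelta shows the arithmetic behind the conversion the new test cases
// expect: the first cumulative point only establishes a baseline, and each
// later point is emitted as the difference from the previous one.
func toDelta(cumulative []float64) []float64 {
	if len(cumulative) < 2 {
		return nil
	}
	deltas := make([]float64, 0, len(cumulative)-1)
	prev := cumulative[0]
	for _, v := range cumulative[1:] {
		deltas = append(deltas, v-prev)
		prev = v
	}
	return deltas
}

func main() {
	// The cumulative histogram counts used in the mixed test metrics.
	fmt.Println(toDelta([]float64{0, 100, 200, 500})) // [100 100 300]
}
```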