From 9466fb255f89d0ea7245db7ea75a56c3a0ea239d Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 19 Nov 2024 12:06:50 +0100 Subject: [PATCH] processor/deltatocumulative: telemetry tests (#35742) #### Description Tests internal telemetry (metadata.TelemetryBuilder) is recorded as expected. Introduces `internal/testing/sdktest` for this. Introduces `-- telemetry --` section to testdata. #### Testing Existing tests were extended to have a `-- telemetry --` section that specifies expected meter readings in `sdktest.Format` #### Documentation not needed --- .../internal/data/datatest/equal.go | 2 +- .../internal/lineartelemetry/metrics.go | 9 +- .../datatest => testing}/compare/compare.go | 21 ++- .../internal/testing/sdktest/compare.go | 136 ++++++++++++++++++ .../internal/testing/sdktest/example_test.go | 80 +++++++++++ .../internal/testing/sdktest/into.go | 68 +++++++++ .../internal/testing/sdktest/metrics.go | 125 ++++++++++++++++ .../internal/{ => testing}/testar/decode.go | 8 +- .../{ => testing}/testar/read_test.go | 4 +- .../processor_test.go | 39 +++-- .../testdata/exponential/1.test | 96 +++++++++++++ .../testdata/histograms/1.test | 53 +++++++ .../testdata/limit/1.test | 8 ++ .../testdata/limit/2.test | 10 ++ .../testdata/notemporality-ignored/1.test | 19 +++ .../testdata/timestamps/1.test | 8 ++ .../testdata/tracking/1.test | 3 + 17 files changed, 665 insertions(+), 24 deletions(-) rename processor/deltatocumulativeprocessor/internal/{data/datatest => testing}/compare/compare.go (54%) create mode 100644 processor/deltatocumulativeprocessor/internal/testing/sdktest/compare.go create mode 100644 processor/deltatocumulativeprocessor/internal/testing/sdktest/example_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/testing/sdktest/into.go create mode 100644 processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go rename processor/deltatocumulativeprocessor/internal/{ => testing}/testar/decode.go (93%) rename 
processor/deltatocumulativeprocessor/internal/{ => testing}/testar/read_test.go (93%) create mode 100644 processor/deltatocumulativeprocessor/testdata/exponential/1.test create mode 100644 processor/deltatocumulativeprocessor/testdata/histograms/1.test diff --git a/processor/deltatocumulativeprocessor/internal/data/datatest/equal.go b/processor/deltatocumulativeprocessor/internal/data/datatest/equal.go index 6e0ed0f7fcc1..a5aabb91b59e 100644 --- a/processor/deltatocumulativeprocessor/internal/data/datatest/equal.go +++ b/processor/deltatocumulativeprocessor/internal/data/datatest/equal.go @@ -11,8 +11,8 @@ import ( "github.com/stretchr/testify/require" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare" ) // T is the testing helper. 
Most notably it provides [T.Equal] diff --git a/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go index c81068d75c79..7576883075c5 100644 --- a/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go @@ -16,12 +16,13 @@ import ( ) func New(set component.TelemetrySettings) (Metrics, error) { + zero := func() int { return -1 } m := Metrics{ - tracked: func() int { return 0 }, + tracked: &zero, } trackedCb := metadata.WithDeltatocumulativeStreamsTrackedLinearCallback(func() int64 { - return int64(m.tracked()) + return int64((*m.tracked)()) }) telb, err := metadata.NewTelemetryBuilder(set, trackedCb) @@ -36,7 +37,7 @@ func New(set component.TelemetrySettings) (Metrics, error) { type Metrics struct { metadata.TelemetryBuilder - tracked func() int + tracked *func() int } func (m Metrics) Datapoints() Counter { @@ -44,7 +45,7 @@ func (m Metrics) Datapoints() Counter { } func (m *Metrics) WithTracked(streams func() int) { - m.tracked = streams + *m.tracked = streams } func Error(msg string) attribute.KeyValue { diff --git a/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go b/processor/deltatocumulativeprocessor/internal/testing/compare/compare.go similarity index 54% rename from processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go rename to processor/deltatocumulativeprocessor/internal/testing/compare/compare.go index eb8c0f11174a..d3c6c927566a 100644 --- a/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go +++ b/processor/deltatocumulativeprocessor/internal/testing/compare/compare.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package compare // import 
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare" +package compare // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare" import ( "reflect" @@ -11,17 +11,28 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" ) -var Opts = []cmp.Option{ +var allow = []string{ + "go.opentelemetry.io/collector/pdata", + "go.opentelemetry.io/otel", + "github.com/open-telemetry/opentelemetry-collector-contrib", +} + +var Opts = cmp.Options{ cmpopts.EquateApprox(0, 1e-9), cmp.Exporter(func(ty reflect.Type) bool { - return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") || strings.HasPrefix(ty.PkgPath(), "github.com/open-telemetry/opentelemetry-collector-contrib") + for _, prefix := range allow { + if strings.HasPrefix(ty.PkgPath(), prefix) { + return true + } + } + return false }), } func Equal[T any](a, b T, opts ...cmp.Option) bool { - return cmp.Equal(a, b, append(Opts, opts...)...) + return cmp.Equal(a, b, Opts, cmp.Options(opts)) } func Diff[T any](a, b T, opts ...cmp.Option) string { - return cmp.Diff(a, b, append(Opts, opts...)...) + return cmp.Diff(a, b, Opts, cmp.Options(opts)) } diff --git a/processor/deltatocumulativeprocessor/internal/testing/sdktest/compare.go b/processor/deltatocumulativeprocessor/internal/testing/sdktest/compare.go new file mode 100644 index 000000000000..d6af542b1dd9 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/testing/sdktest/compare.go @@ -0,0 +1,136 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// sdktest performs partial comparison of [sdk.ResourceMetrics] to a [Spec]. 
+package sdktest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" + +import ( + "context" + "fmt" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric" + sdk "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare" +) + +type Option = cmp.Option + +// Test the metrics returned by [metric.ManualReader.Collect] against the [Spec] +func Test(spec Spec, mr *metric.ManualReader, opts ...Option) error { + var rm sdk.ResourceMetrics + if err := mr.Collect(context.Background(), &rm); err != nil { + return err + } + return Compare(spec, rm, opts...) +} + +// Compare the [sdk.ResourceMetrics] against the [Spec] +func Compare(spec Spec, rm sdk.ResourceMetrics, opts ...Option) error { + got := Flatten(rm) + want := Metrics(spec) + + diff := compare.Diff(want, got, + IgnoreUnspec(spec), + IgnoreTime(), + IgnoreMetadata(), + cmpopts.EquateEmpty(), + Transform(), + Sort(), + cmp.Options(opts), + ) + + if diff != "" { + return fmt.Errorf("\n%s", diff) + } + return nil +} + +// IgnoreTime ignores [sdk.DataPoint.Time] and [sdk.DataPoint.StartTime], +// because those are changing per run and typically not of interest. 
+func IgnoreTime() Option { + return cmp.Options{ + cmpopts.IgnoreFields(sdk.DataPoint[int64]{}, "StartTime", "Time"), + cmpopts.IgnoreFields(sdk.DataPoint[float64]{}, "StartTime", "Time"), + } +} + +// IgnoreMetadata ignores [sdk.Metrics.Unit] and [sdk.Metrics.Description], +// because those are usually static +func IgnoreMetadata() Option { + return cmpopts.IgnoreFields(sdk.Metrics{}, "Description", "Unit") +} + +// IgnoreUnspec ignores any Metrics not present in the [Spec] +func IgnoreUnspec(spec Spec) Option { + return cmpopts.IgnoreSliceElements(func(m sdk.Metrics) bool { + _, ok := spec[m.Name] + return !ok + }) +} + +// Sort [sdk.Metrics] by name and [sdk.DataPoint] by their [attribute.Set] +func Sort() Option { + return cmp.Options{ + cmpopts.SortSlices(func(a, b sdk.Metrics) bool { + return a.Name < b.Name + }), + sort[int64](), sort[float64](), + } +} + +func sort[N int64 | float64]() Option { + return cmpopts.SortSlices(func(a, b DataPoint[N]) bool { + as := a.DataPoint.Attributes.Encoded(attribute.DefaultEncoder()) + bs := b.DataPoint.Attributes.Encoded(attribute.DefaultEncoder()) + return as < bs + }) +} + +// DataPoint holds a [sdk.DataPoint] and its attributes as a plain map. +// See [Transform] +type DataPoint[N int64 | float64] struct { + Attributes map[string]any + sdk.DataPoint[N] +} + +// Transform turns []sdk.DataPoint[N] into []DataPoint[N]. +// +// Primarily done to have DataPoint.Attributes as a flat, diffable map instead +// of the hard to understand internal structure of [attribute.Set], which is +// being truncated by go-cmp before reaching the depth where attribute values +// appear. +// +// This must happen on the slice level, transforming the values is not +// sufficient because when entire DataPoints are added / removed, go-cmp does +// not apply transformers on the fields.
+func Transform() Option { + return cmp.Options{ + transform[int64](), + transform[float64](), + cmpopts.IgnoreTypes(attribute.Set{}), + } +} + +func transform[N int64 | float64]() Option { + return cmpopts.AcyclicTransformer(fmt.Sprintf("sdktest.Transform.%T", *new(N)), + func(dps []sdk.DataPoint[N]) []DataPoint[N] { + out := make([]DataPoint[N], len(dps)) + for i, dp := range dps { + out[i] = DataPoint[N]{DataPoint: dp, Attributes: attrMap(dp.Attributes)} + } + return out + }, + ) +} + +func attrMap(set attribute.Set) map[string]any { + m := make(map[string]any) + for _, kv := range set.ToSlice() { + m[string(kv.Key)] = kv.Value.AsInterface() + } + return m +} diff --git a/processor/deltatocumulativeprocessor/internal/testing/sdktest/example_test.go b/processor/deltatocumulativeprocessor/internal/testing/sdktest/example_test.go new file mode 100644 index 000000000000..e7dc5ba5d067 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/testing/sdktest/example_test.go @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sdktest + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + sdk "go.opentelemetry.io/otel/sdk/metric" +) + +// The output of [Test] and [Compare] is similar to the following: +// +// []metricdata.Metrics{ +// - { +// - Name: "not.exist", +// - Data: metricdata.Sum[float64]{ +// - DataPoints: []metricdata.DataPoint[float64]{{...}}, +// - Temporality: s"CumulativeTemporality", +// - }, +// - }, +// { +// Name: "requests.total", +// Description: "I will be inherited", +// Unit: "", +// Data: metricdata.Sum[int64]{ +// DataPoints: []metricdata.DataPoint[int64](Inverse(sdktest.Transform.int64, []sdktest.DataPoint[int64]{ +// {DataPoint: {StartTime: s"2024-10-11 11:23:37.966150738 +0200 CEST m=+0.001489569", Time: s"2024-10-11 11:23:37.966174238 +0200 CEST m=+0.001513070", Value: 20, ...}, Attributes: {}}, +// { +// DataPoint: 
metricdata.DataPoint[int64]{ +// ... // 1 ignored field +// StartTime: s"2024-10-11 11:23:37.966150738 +0200 CEST m=+0.001489569", +// Time: s"2024-10-11 11:23:37.966174238 +0200 CEST m=+0.001513070", +// - Value: 4, +// + Value: 3, +// Exemplars: nil, +// }, +// Attributes: {"error": string("limit")}, +// }, +// })), +// Temporality: s"CumulativeTemporality", +// IsMonotonic: true, +// }, +// }, +// } +// +// Which is used as follows: +func Example() { + var spec Spec + _ = Unmarshal([]byte(` +gauge streams.tracked: + - int: 40 + +counter requests.total: + - int: 20 + - int: 4 + attr: {error: "limit"} + +updown not.exist: + - float: 33.3 +`), &spec) + + mr := sdk.NewManualReader() + meter := sdk.NewMeterProvider(sdk.WithReader(mr)).Meter("test") + + ctx := context.TODO() + + gauge, _ := meter.Int64Gauge("streams.tracked") + gauge.Record(ctx, 40) + + count, _ := meter.Int64Counter("requests.total", metric.WithDescription("I will be inherited")) + count.Add(ctx, 20) + count.Add(ctx, 3, metric.WithAttributes(attribute.String("error", "limit"))) + + err := Test(spec, mr) + fmt.Println(err) +} diff --git a/processor/deltatocumulativeprocessor/internal/testing/sdktest/into.go b/processor/deltatocumulativeprocessor/internal/testing/sdktest/into.go new file mode 100644 index 000000000000..ca65c55512ab --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/testing/sdktest/into.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sdktest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" + +import ( + sdk "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// Metrics returns the [sdk.Metrics] defined by this [Spec] +func Metrics(spec Spec) []sdk.Metrics { + md := make([]sdk.Metrics, 0, len(spec)) + for _, spec := range spec { + md = append(md, spec.Into()) + } + return md +} + +func (spec Metric) Into() sdk.Metrics { + m := 
sdk.Metrics{Name: spec.Name} + if len(spec.Numbers) == 0 { + return m + } + + var ( + ints []sdk.DataPoint[int64] + floats []sdk.DataPoint[float64] + ) + for _, n := range spec.Numbers { + attr := n.Attr.Into() + switch { + case n.Int != nil: + ints = append(ints, sdk.DataPoint[int64]{Attributes: attr, Value: *n.Int}) + case n.Float != nil: + floats = append(floats, sdk.DataPoint[float64]{Attributes: attr, Value: *n.Float}) + } + } + + switch { + case spec.Type == TypeGauge && ints != nil: + m.Data = sdk.Gauge[int64]{DataPoints: ints} + case spec.Type == TypeGauge && floats != nil: + m.Data = sdk.Gauge[float64]{DataPoints: floats} + case spec.Type == TypeSum && ints != nil: + m.Data = sdk.Sum[int64]{DataPoints: ints, Temporality: spec.Temporality, IsMonotonic: spec.Monotonic} + case spec.Type == TypeSum && floats != nil: + m.Data = sdk.Sum[float64]{DataPoints: floats, Temporality: spec.Temporality, IsMonotonic: spec.Monotonic} + } + + return m +} + +// Flatten turns the nested [sdk.ResourceMetrics] structure into a flat +// [sdk.Metrics] slice. If a metric is present multiple times in different scopes +// / resources, the last occurrence is used.
+func Flatten(rm sdk.ResourceMetrics) []sdk.Metrics { + set := make(map[string]sdk.Metrics) + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + set[m.Name] = m + } + } + md := make([]sdk.Metrics, 0, len(set)) + for _, m := range set { + md = append(md, m) + } + return md +} diff --git a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go new file mode 100644 index 000000000000..440d094ba274 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go @@ -0,0 +1,125 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sdktest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" + +import ( + "fmt" + "strings" + + "go.opentelemetry.io/otel/attribute" + sdk "go.opentelemetry.io/otel/sdk/metric/metricdata" + "gopkg.in/yaml.v3" +) + +// Spec is the partial metric specification. To be used with [Compare] +type Spec = map[string]Metric + +type Type string + +const ( + TypeSum Type = "sum" + TypeGauge Type = "gauge" +) + +type Metric struct { + Type + Name string + + Numbers []Number + Monotonic bool + Temporality sdk.Temporality +} + +type Number struct { + Int *int64 + Float *float64 + Attr attributes +} + +// Unmarshal specification in [Format] into the given [Spec]. 
+func Unmarshal(data Format, into *Spec) error { + var doc map[string]yaml.Node + if err := yaml.Unmarshal(data, &doc); err != nil { + return err + } + + if *into == nil { + *into = make(map[string]Metric, len(doc)) + } + md := *into + + for key, node := range doc { + args := strings.Fields(key) + if len(args) < 2 { + return fmt.Errorf("key must of form ' ', but got %q", key) + } + + m := Metric{Name: args[1]} + switch args[0] { + case "counter": + m.Type = TypeSum + m.Monotonic = true + case "updown": + m.Type = TypeSum + m.Monotonic = false + case "gauge": + m.Type = TypeGauge + default: + return fmt.Errorf("no such instrument type: %q", args[0]) + } + + m.Temporality = sdk.CumulativeTemporality + for _, arg := range args[2:] { + switch arg { + case "delta": + m.Temporality = sdk.DeltaTemporality + case "cumulative": + m.Temporality = sdk.CumulativeTemporality + } + } + + var into any + switch m.Type { + case TypeGauge, TypeSum: + into = &m.Numbers + default: + panic("unreachable") + } + + if err := node.Decode(into); err != nil { + return err + } + + md[m.Name] = m + } + + return nil +} + +type attributes map[string]string + +func (attr attributes) Into() attribute.Set { + kvs := make([]attribute.KeyValue, 0, len(attr)) + for k, v := range attr { + kvs = append(kvs, attribute.String(k, v)) + } + return attribute.NewSet(kvs...) +} + +// Format defines the yaml-based format to be used with [Unmarshal] for specifying [Spec]. 
+// +// It looks as follows: +// +// <type> <name> [ delta|cumulative ]: +// - int: <int64> | float: <float64> +// attr: +// [string]: <string> +// +// The supported instruments are: +// - counter: [TypeSum], monotonic +// - updown: [TypeSum], non-monotonic +// - gauge: [TypeGauge] +// +// Temporality is optional and defaults to [sdk.CumulativeTemporality] +type Format = []byte diff --git a/processor/deltatocumulativeprocessor/internal/testar/decode.go b/processor/deltatocumulativeprocessor/internal/testing/testar/decode.go similarity index 93% rename from processor/deltatocumulativeprocessor/internal/testar/decode.go rename to processor/deltatocumulativeprocessor/internal/testing/testar/decode.go index 13c6e42ab2d9..2d01f34174c2 100644 --- a/processor/deltatocumulativeprocessor/internal/testar/decode.go +++ b/processor/deltatocumulativeprocessor/internal/testing/testar/decode.go @@ -16,7 +16,7 @@ // // err := Read(data, &into) // // See [Read] and [Parser] for examples. -package testar // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testar" +package testar // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/testar" import ( "fmt" @@ -98,8 +98,10 @@ type Format struct { parse func(file []byte, into any) error } -func Parser(name string, fn func(data []byte, into any) error) Format { - return Format{name: name, parse: fn} +func Parser[T any](name string, fn func([]byte, *T) error) Format { + return Format{name: name, parse: func(file []byte, ptr any) error { + return fn(file, ptr.(*T)) + }} } // LiteralParser sets data unaltered into a []byte or string diff --git a/processor/deltatocumulativeprocessor/internal/testar/read_test.go b/processor/deltatocumulativeprocessor/internal/testing/testar/read_test.go similarity index 93% rename from processor/deltatocumulativeprocessor/internal/testar/read_test.go rename to
processor/deltatocumulativeprocessor/internal/testing/testar/read_test.go index 99b59bb9249d..ffbe38215efb 100644 --- a/processor/deltatocumulativeprocessor/internal/testar/read_test.go +++ b/processor/deltatocumulativeprocessor/internal/testing/testar/read_test.go @@ -45,12 +45,12 @@ func ExampleParser() { Foobar int `testar:"foobar,atoi"` } - _ = Read(data, &into, Parser("atoi", func(file []byte, into any) error { + _ = Read(data, &into, Parser("atoi", func(file []byte, into *int) error { n, err := strconv.Atoi(strings.TrimSpace(string(file))) if err != nil { return err } - *(into.(*int)) = n + *into = n return nil })) diff --git a/processor/deltatocumulativeprocessor/processor_test.go b/processor/deltatocumulativeprocessor/processor_test.go index 506cd4a7a511..df5257d86d86 100644 --- a/processor/deltatocumulativeprocessor/processor_test.go +++ b/processor/deltatocumulativeprocessor/processor_test.go @@ -18,12 +18,12 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/otel/sdk/metric/metricdata" "gopkg.in/yaml.v3" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testar" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/compare" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/testar" ) func TestProcessor(t *testing.T) { @@ -38,11 +38,14 @@ func TestProcessor(t *testing.T) { type Stage struct { In pmetric.Metrics `testar:"in,pmetric"` Out 
pmetric.Metrics `testar:"out,pmetric"` + + Sdk sdktest.Spec `testar:"telemetry,sdk"` } read := func(file string, into *Stage) error { return testar.ReadFile(file, into, testar.Parser("pmetric", unmarshalMetrics), + testar.Parser("sdk", sdktest.Unmarshal), ) } @@ -54,7 +57,9 @@ func TestProcessor(t *testing.T) { ctx := context.Background() cfg := config(t, file("config.yaml")) - proc, sink := setup(t, cfg) + + st := setup(t, cfg) + proc, sink := st.proc, st.sink stages, _ := filepath.Glob(file("*.test")) for _, file := range stages { @@ -70,6 +75,10 @@ func TestProcessor(t *testing.T) { if diff := compare.Diff(out, sink.AllMetrics()); diff != "" { t.Fatal(diff) } + + if err := sdktest.Test(stage.Sdk, st.tel.reader); err != nil { + t.Fatal(err) + } } }) } @@ -88,7 +97,7 @@ func config(t *testing.T, file string) *Config { return cfg } -func setup(t *testing.T, cfg *Config) (processor.Metrics, *consumertest.MetricsSink) { +func setup(t *testing.T, cfg *Config) State { t.Helper() next := &consumertest.MetricsSink{} @@ -96,18 +105,30 @@ func setup(t *testing.T, cfg *Config) (processor.Metrics, *consumertest.MetricsS cfg = &Config{MaxStale: 0, MaxStreams: math.MaxInt} } + tt := setupTestTelemetry() proc, err := NewFactory().CreateMetrics( context.Background(), - processortest.NewNopSettings(), + tt.NewSettings(), cfg, next, ) require.NoError(t, err) - return proc, next + return State{ + proc: proc, + sink: next, + tel: tt, + } +} + +type State struct { + proc processor.Metrics + sink *consumertest.MetricsSink + + tel componentTestTelemetry } -func unmarshalMetrics(data []byte, into any) error { +func unmarshalMetrics(data []byte, into *pmetric.Metrics) error { var tmp any if err := yaml.Unmarshal(data, &tmp); err != nil { return err @@ -120,7 +141,7 @@ func unmarshalMetrics(data []byte, into any) error { if err != nil { return err } - *(into.(*pmetric.Metrics)) = md + *into = md return nil } diff --git a/processor/deltatocumulativeprocessor/testdata/exponential/1.test 
b/processor/deltatocumulativeprocessor/testdata/exponential/1.test new file mode 100644 index 000000000000..a8c82e51c009 --- /dev/null +++ b/processor/deltatocumulativeprocessor/testdata/exponential/1.test @@ -0,0 +1,96 @@ +-- in -- +resourceMetrics: + - schemaUrl: https://test.com/resource + scopeMetrics: + - schemaUrl: https://test.com/scope + scope: + name: Test + version: 1.2.3 + metrics: + - name: expo.simple + exponential_histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 10 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [4, 7, 9, 6, 25] + negative: + offset: 6 + bucketCounts: [2, 13, 7, 12, 4] + - timeUnixNano: 20 + scale: 4 + zeroCount: 2 + positive: + offset: 2 + bucketCounts: [1, 2, 1, 2, 1] + negative: + offset: 6 + bucketCounts: [1, 2, 1, 2, 1] + - name: expo.downscale + exponential_histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 10 + scale: 2 + zeroCount: 1 + positive: + bucketCounts: [1,2,3,4,5] + - timeUnixNano: 20 + scale: 1 + zeroCount: 0 + positive: + bucketCounts: [0,0,0,0,0] + +-- out -- +resourceMetrics: + - schemaUrl: https://test.com/resource + scopeMetrics: + - schemaUrl: https://test.com/scope + scope: + name: Test + version: 1.2.3 + metrics: + - name: expo.simple + exponential_histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 10 + scale: 4 + zeroCount: 5 + positive: + offset: 2 + bucketCounts: [4, 7, 9, 6, 25] + negative: + offset: 6 + bucketCounts: [2, 13, 7, 12, 4] + - timeUnixNano: 20 + scale: 4 + zeroCount: 7 + positive: + offset: 2 + bucketCounts: [5, 9, 10, 8, 26] + negative: + offset: 6 + bucketCounts: [3, 15, 8, 14, 5] + + - name: expo.downscale + exponential_histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 10 + scale: 2 + zeroCount: 1 + positive: + bucketCounts: [1,2,3,4,5] + - timeUnixNano: 20 + scale: 1 + zeroCount: 1 + positive: + bucketCounts: [3,7,5,0,0] + +-- telemetry -- +updown otelcol_deltatocumulative.streams.tracked: 
+- int: 2 diff --git a/processor/deltatocumulativeprocessor/testdata/histograms/1.test b/processor/deltatocumulativeprocessor/testdata/histograms/1.test new file mode 100644 index 000000000000..6b63c17275b9 --- /dev/null +++ b/processor/deltatocumulativeprocessor/testdata/histograms/1.test @@ -0,0 +1,53 @@ +-- in -- +resourceMetrics: + - schemaUrl: https://test.com/resource + scopeMetrics: + - schemaUrl: https://test.com/scope + scope: + name: test + version: 1.2.3 + metrics: + - name: some.histogram + histogram: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 10 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [1, 2, 3, 4, 5] + - timeUnixNano: 20 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [1, 0, 1, 0, 1] + + # bounds change + - timeUnixNano: 30 + explicitBounds: [ 0.1, 1, 10, 100] + bucketCounts: [ 1, 2, 3, 4] + +-- out -- +resourceMetrics: + - schemaUrl: https://test.com/resource + scopeMetrics: + - schemaUrl: https://test.com/scope + scope: + name: test + version: 1.2.3 + metrics: + - name: some.histogram + histogram: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 10 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [1, 2, 3, 4, 5] + - timeUnixNano: 20 + explicitBounds: [0.01, 0.1, 1, 10, 100] + bucketCounts: [2, 2, 4, 4, 6] + + # bounds change: reset + - timeUnixNano: 30 + explicitBounds: [ 0.1, 1, 10, 100] + bucketCounts: [ 1, 2, 3, 4] + +-- telemetry -- +updown otelcol_deltatocumulative.streams.tracked: +- int: 1 diff --git a/processor/deltatocumulativeprocessor/testdata/limit/1.test b/processor/deltatocumulativeprocessor/testdata/limit/1.test index 0acad04bf3c6..cdc6d8a97f39 100644 --- a/processor/deltatocumulativeprocessor/testdata/limit/1.test +++ b/processor/deltatocumulativeprocessor/testdata/limit/1.test @@ -45,3 +45,11 @@ resourceMetrics: - {timeUnixNano: 1, asDouble: 1, attributes: [{key: series, value: {stringValue: "7"}}]} - {timeUnixNano: 1, asDouble: 1, attributes: [{key: series, value: 
{stringValue: "8"}}]} - {timeUnixNano: 1, asDouble: 1, attributes: [{key: series, value: {stringValue: "9"}}]} + +-- telemetry -- +counter otelcol_deltatocumulative.datapoints.linear: + - int: 10 + attr: {} + +updown otelcol_deltatocumulative.streams.tracked.linear: + - int: 10 diff --git a/processor/deltatocumulativeprocessor/testdata/limit/2.test b/processor/deltatocumulativeprocessor/testdata/limit/2.test index 20cd03a7db41..236e518aee0a 100644 --- a/processor/deltatocumulativeprocessor/testdata/limit/2.test +++ b/processor/deltatocumulativeprocessor/testdata/limit/2.test @@ -47,3 +47,13 @@ resourceMetrics: - {timeUnixNano: 2, asDouble: 2, attributes: [{key: series, value: {stringValue: "8"}}]} - {timeUnixNano: 2, asDouble: 2, attributes: [{key: series, value: {stringValue: "9"}}]} # - {timeUnixNano: 2, asDouble: 2, attributes: [{key: series, value: {stringValue: "x"}}]} # dropped + +-- telemetry -- +counter otelcol_deltatocumulative.datapoints.linear: + - int: 20 + attr: {} + - int: 1 + attr: {error: "limit"} + +updown otelcol_deltatocumulative.streams.tracked.linear: + - int: 10 diff --git a/processor/deltatocumulativeprocessor/testdata/notemporality-ignored/1.test b/processor/deltatocumulativeprocessor/testdata/notemporality-ignored/1.test index c7c743bcde30..556a59bae0f5 100644 --- a/processor/deltatocumulativeprocessor/testdata/notemporality-ignored/1.test +++ b/processor/deltatocumulativeprocessor/testdata/notemporality-ignored/1.test @@ -14,6 +14,12 @@ resourceMetrics: - key: scopeattr value: { stringValue: string } metrics: + - name: test.sum + sum: + aggregationTemporality: 1 + dataPoints: + - timeUnixNano: 1 + asInt: 3 - name: test.gauge gauge: dataPoints: @@ -43,6 +49,12 @@ resourceMetrics: - key: scopeattr value: { stringValue: string } metrics: + - name: test.sum + sum: + aggregationTemporality: 2 + dataPoints: + - timeUnixNano: 1 + asInt: 3 - name: test.gauge gauge: dataPoints: @@ -55,3 +67,10 @@ resourceMetrics: quantileValues: - quantile: 0.25 
value: 25 + +-- telemetry -- +counter otelcol_deltatocumulative.datapoints.linear: +- int: 1 + +updown otelcol_deltatocumulative.streams.tracked.linear: +- int: 1 diff --git a/processor/deltatocumulativeprocessor/testdata/timestamps/1.test b/processor/deltatocumulativeprocessor/testdata/timestamps/1.test index 4f6d48c54e36..cae1f8af87ab 100644 --- a/processor/deltatocumulativeprocessor/testdata/timestamps/1.test +++ b/processor/deltatocumulativeprocessor/testdata/timestamps/1.test @@ -34,3 +34,11 @@ resourceMetrics: - {startTimeUnixNano: 1000, timeUnixNano: 1100, asDouble: 0} - {startTimeUnixNano: 1000, timeUnixNano: 1200, asDouble: 0} - {startTimeUnixNano: 1000, timeUnixNano: 1400, asDouble: 0} + +-- telemetry -- +counter otelcol_deltatocumulative.datapoints.linear: +- int: 3 +- attr: {error: "delta.ErrOutOfOrder"} + int: 1 +- attr: {error: "delta.ErrOlderStart"} + int: 1 diff --git a/processor/deltatocumulativeprocessor/testdata/tracking/1.test b/processor/deltatocumulativeprocessor/testdata/tracking/1.test index 76ab437989c2..152fee2b5934 100644 --- a/processor/deltatocumulativeprocessor/testdata/tracking/1.test +++ b/processor/deltatocumulativeprocessor/testdata/tracking/1.test @@ -374,3 +374,6 @@ resourceMetrics: - {key: "67ef", value: {stringValue: "4299"}} name: 58a7 version: 1cd0 + +-- telemetry -- +# skip