From 68e7273f0ebb235fd601a46822d4ff2b109bef4a Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 21 Nov 2024 10:41:45 +0100 Subject: [PATCH 1/4] deltatocumulative: linear (exponential) histograms expands the linear architecture to do exponential and fixed-width histograms. --- .../internal/delta/delta.go | 22 ++- .../internal/lineartelemetry/attr.go | 9 + .../internal/metrics/data.go | 2 + .../internal/metrics/metrics.go | 48 +++++- .../deltatocumulativeprocessor/linear.go | 159 +++++++++--------- .../testdata/exponential/1.test | 2 +- .../testdata/histograms/1.test | 2 +- 7 files changed, 161 insertions(+), 83 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index e8d71d669f12..3320d44f2724 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -7,6 +7,7 @@ import ( "fmt" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" @@ -83,10 +84,17 @@ func (e ErrGap) Error() string { return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To) } +type Type interface { + pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint + + StartTimestamp() pcommon.Timestamp + Timestamp() pcommon.Timestamp +} + // AccumulateInto adds state and dp, storing the result in state // // state = state + dp -func AccumulateInto[P data.Point[P]](state P, dp P) error { +func AccumulateInto[T Type](state, dp T) error { switch { case dp.StartTimestamp() < state.StartTimestamp(): // belongs to older series @@ -96,6 +104,16 @@ func AccumulateInto[P data.Point[P]](state P, dp P) error { return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()} } - state.Add(dp) + switch dp := any(dp).(type) { + case pmetric.NumberDataPoint: + state := any(state).(pmetric.NumberDataPoint) + data.Number{NumberDataPoint: state}.Add(data.Number{NumberDataPoint: dp}) + case pmetric.HistogramDataPoint: + state := any(state).(pmetric.HistogramDataPoint) + data.Histogram{HistogramDataPoint: state}.Add(data.Histogram{HistogramDataPoint: dp}) + case pmetric.ExponentialHistogramDataPoint: + state := any(state).(pmetric.ExponentialHistogramDataPoint) + data.ExpHistogram{DataPoint: state}.Add(data.ExpHistogram{DataPoint: dp}) + } return nil } diff --git a/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go b/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go new file mode 100644 index 000000000000..a2a2e224a0ca --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go @@ -0,0 +1,9 @@ +package telemetry + +import "go.opentelemetry.io/otel/attribute" + +type Attributes []attribute.KeyValue + +func (a *Attributes) Set(attr attribute.KeyValue) { + *a = append(*a, attr) +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go index 9fa1df07eb1d..08e1aa4b8ae8 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/data.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -115,6 +115,7 @@ func (s Gauge) Filter(expr func(data.Number) bool) { return !expr(data.Number{NumberDataPoint: dp}) }) } +func (s Gauge) SetAggregationTemporality(pmetric.AggregationTemporality) {} type Summary Metric @@ -136,3 +137,4 @@ func (s Summary) Filter(expr func(data.Summary) bool) { return !expr(data.Summary{SummaryDataPoint: dp}) }) } +func (s Summary) SetAggregationTemporality(pmetric.AggregationTemporality) {} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go index 98388dbf5eb6..b19b03f1a1c7 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -47,7 +47,7 @@ func (m Metric) AggregationTemporality() pmetric.AggregationTemporality { return pmetric.AggregationTemporalityUnspecified } -func (m Metric) Typed() any { +func (m Metric) Typed() Any { //exhaustive:enforce switch m.Type() { case pmetric.MetricTypeSum: @@ -63,3 +63,49 @@ func (m Metric) Typed() any { } panic("unreachable") } + +var ( + _ Any = Sum{} + _ Any = Gauge{} + _ Any = ExpHistogram{} + _ Any = Histogram{} + _ Any = Summary{} +) + +type Any interface { + Len() int + Ident() identity.Metric + + SetAggregationTemporality(pmetric.AggregationTemporality) +} + +func (m Metric) Filter(ok func(id identity.Stream, dp any) bool) { + mid := m.Ident() + switch m.Type() { + case pmetric.MetricTypeSum: + m.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeGauge: + m.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeHistogram: + m.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeExponentialHistogram: + m.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeSummary: + m.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + } +} diff --git a/processor/deltatocumulativeprocessor/linear.go b/processor/deltatocumulativeprocessor/linear.go index 0b547fe5145d..2b725b7dc78d 100644 --- a/processor/deltatocumulativeprocessor/linear.go +++ b/processor/deltatocumulativeprocessor/linear.go @@ -5,7 +5,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "context" - "errors" "sync" "time" @@ -16,22 +15,19 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" - exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) var _ processor.Metrics = (*Linear)(nil) type Linear struct { - next processor.Metrics + next consumer.Metrics cfg Config - state state - mtx sync.Mutex + last state + mtx sync.Mutex ctx context.Context cancel context.CancelFunc @@ -40,16 +36,16 @@ type Linear struct { tel telemetry.Metrics } -func newLinear(cfg *Config, tel telemetry.Metrics, next processor.Metrics) *Linear { +func newLinear(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Linear { ctx, cancel := context.WithCancel(context.Background()) proc := Linear{ next: next, cfg: *cfg, - state: state{ - nums: make(exp.HashMap[data.Number]), - hist: make(exp.HashMap[data.Histogram]), - expo: make(exp.HashMap[data.ExpHistogram]), + last: state{ + nums: make(map[identity.Stream]pmetric.NumberDataPoint), + hist: make(map[identity.Stream]pmetric.HistogramDataPoint), + expo: make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint), }, ctx: ctx, cancel: cancel, @@ -58,7 +54,7 @@ func newLinear(cfg *Config, tel telemetry.Metrics, next processor.Metrics) *Line tel: tel, } - tel.WithTracked(proc.state.Len) + tel.WithTracked(proc.last.Len) cfg.Metrics(tel) return &proc @@ -75,68 +71,68 @@ func (p *Linear) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { drop = false ) - // possible errors encountered while aggregating. - // errors.Join-ed []streams.Error - var errs error - metrics.Filter(md, func(m metrics.Metric) bool { if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta { return keep } - // NOTE: to make review and migration easier, below only does sums for now. - // all other datatypes are handled by older code, which is called after this. - // - // TODO: implement other datatypes here - if m.Type() != pmetric.MetricTypeSum { - return keep - } - - sum := metrics.Sum(m) - state := p.state.nums - - // apply fn to each dp in stream. if fn's err != nil, dp is removed from stream - err := streams.Apply(sum, func(id identity.Stream, dp data.Number) (data.Number, error) { - acc, ok := state.Load(id) - // if at stream limit and stream not seen before, reject - if !ok && p.state.Len() >= p.cfg.MaxStreams { - p.tel.Datapoints().Inc(ctx, telemetry.Error("limit")) - return dp, streams.Drop + // aggregate the datapoints. + // using filter here, as the pmetric.*DataPoint are reference types so + // we can modify them using their "value". + m.Filter(func(id identity.Stream, dp any) bool { + // count the processed datatype. + // uses whatever value of attrs has at return-time + var attrs telemetry.Attributes + defer func() { p.tel.Datapoints().Inc(ctx, attrs...) }() + + // if stream new and state capacity reached, reject + exist := p.last.Has(id) + if !exist && p.last.Len() >= p.cfg.MaxStreams { + attrs.Set(telemetry.Error("limit")) + return drop } - // stream is alive, update stale tracker + // stream is ok and active, update stale tracker p.stale.Refresh(now, id) - acc, err := func() (data.Number, error) { - if !ok { - // new stream: there is no existing aggregation, so start new with current dp - return dp.Clone(), nil - } - // tracked stream: add incoming delta dp to existing cumulative aggregation - return acc, delta.AccumulateInto(acc, dp) - }() - // aggregation failed, record as metric and drop datapoint + // this is the first sample of the stream. there is nothing to + // aggregate with, so clone this value into the state and done + if !exist { + p.last.BeginWith(id, dp) + return keep + } + + // aggregate with state from previous requests. + // delta.AccumulateInto(state, dp) stores result in `state`. + // this is then copied into `dp` (the value passed onto the pipeline) + var err error + switch dp := dp.(type) { + case pmetric.NumberDataPoint: + state := p.last.nums[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + case pmetric.HistogramDataPoint: + state := p.last.hist[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + case pmetric.ExponentialHistogramDataPoint: + state := p.last.expo[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + } if err != nil { - p.tel.Datapoints().Inc(ctx, telemetry.Cause(err)) - return acc, streams.Drop + attrs.Set(telemetry.Cause(err)) + return drop } - // store aggregated result in state and return - p.tel.Datapoints().Inc(ctx) - _ = state.Store(id, acc) - return acc, nil + return keep }) - errs = errors.Join(errs, err) - - // all remaining datapoints are cumulative - sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - // if no datapoints remain, drop now-empty metric - return sum.Len() > 0 + // all remaining datapoints of this metric are now cumulative + m.Typed().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + // if no datapoints remain, drop empty metric + return m.Typed().Len() > 0 }) - if errs != nil { - return errs - } // no need to continue pipeline if we dropped all metrics if md.MetricCount() == 0 { @@ -159,7 +155,7 @@ func (p *Linear) Start(_ context.Context, _ component.Host) error { p.mtx.Lock() stale := p.stale.Collect(p.cfg.MaxStale) for _, id := range stale { - p.state.Delete(id) + p.last.Delete(id) } p.mtx.Unlock() } @@ -179,33 +175,40 @@ func (p *Linear) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } -type Metric[T data.Point[T]] interface { - metrics.Filterable[T] - SetAggregationTemporality(pmetric.AggregationTemporality) -} - // state keeps a cumulative value, aggregated over time, per stream type state struct { - nums streams.Map[data.Number] - - // future use - hist streams.Map[data.Histogram] - expo streams.Map[data.ExpHistogram] + nums map[identity.Stream]pmetric.NumberDataPoint + hist map[identity.Stream]pmetric.HistogramDataPoint + expo map[identity.Stream]pmetric.ExponentialHistogramDataPoint } func (m state) Len() int { - return m.nums.Len() + m.hist.Len() + m.expo.Len() + return len(m.nums) + len(m.hist) + len(m.expo) } func (m state) Has(id identity.Stream) bool { - _, nok := m.nums.Load(id) - _, hok := m.hist.Load(id) - _, eok := m.expo.Load(id) + _, nok := m.nums[id] + _, hok := m.hist[id] + _, eok := m.expo[id] return nok || hok || eok } func (m state) Delete(id identity.Stream) { - m.nums.Delete(id) - m.hist.Delete(id) - m.expo.Delete(id) + delete(m.nums, id) + delete(m.hist, id) + delete(m.expo, id) +} + +func (m state) BeginWith(id identity.Stream, dp any) { + switch dp := dp.(type) { + case pmetric.NumberDataPoint: + m.nums[id] = pmetric.NewNumberDataPoint() + dp.CopyTo(m.nums[id]) + case pmetric.HistogramDataPoint: + m.hist[id] = pmetric.NewHistogramDataPoint() + dp.CopyTo(m.hist[id]) + case pmetric.ExponentialHistogramDataPoint: + m.expo[id] = pmetric.NewExponentialHistogramDataPoint() + dp.CopyTo(m.expo[id]) + } } diff --git a/processor/deltatocumulativeprocessor/testdata/exponential/1.test b/processor/deltatocumulativeprocessor/testdata/exponential/1.test index a8c82e51c009..8aa87775ae80 100644 --- a/processor/deltatocumulativeprocessor/testdata/exponential/1.test +++ b/processor/deltatocumulativeprocessor/testdata/exponential/1.test @@ -92,5 +92,5 @@ resourceMetrics: bucketCounts: [3,7,5,0,0] -- telemetry -- -updown otelcol_deltatocumulative.streams.tracked: +updown otelcol_deltatocumulative.streams.tracked.linear: - int: 2 diff --git a/processor/deltatocumulativeprocessor/testdata/histograms/1.test b/processor/deltatocumulativeprocessor/testdata/histograms/1.test index 6b63c17275b9..ed1265db7713 100644 --- a/processor/deltatocumulativeprocessor/testdata/histograms/1.test +++ b/processor/deltatocumulativeprocessor/testdata/histograms/1.test @@ -49,5 +49,5 @@ resourceMetrics: bucketCounts: [ 1, 2, 3, 4] -- telemetry -- -updown otelcol_deltatocumulative.streams.tracked: +updown otelcol_deltatocumulative.streams.tracked.linear: - int: 1 From c6720dd971d9eb08bfccb9ebded2c8e44e25f572 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 21 Nov 2024 16:09:25 +0100 Subject: [PATCH 2/4] deltatocumulative: benchmark --- .../benchmark_test.go | 181 ++++++++++++++++++ .../internal/lineartelemetry/attr.go | 5 +- .../internal/testing/sdktest/metrics.go | 11 ++ .../processor_test.go | 2 +- 4 files changed, 197 insertions(+), 2 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/benchmark_test.go diff --git a/processor/deltatocumulativeprocessor/benchmark_test.go b/processor/deltatocumulativeprocessor/benchmark_test.go new file mode 100644 index 000000000000..f0ac5c987359 --- /dev/null +++ b/processor/deltatocumulativeprocessor/benchmark_test.go @@ -0,0 +1,181 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor + +import ( + "context" + "math/rand/v2" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" +) + +var out consumertest.MetricsSink + +func BenchmarkProcessor(gb *testing.B) { + const ( + metrics = 5 + streams = 10 + ) + + type Case struct { + name string + fill func(m pmetric.Metric) + next func(m pmetric.Metric) + } + + run := func(b *testing.B, proc consumer.Metrics, cs Case) { + md := pmetric.NewMetrics() + ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + for i := range metrics { + m := ms.AppendEmpty() + m.SetName(strconv.Itoa(i)) + cs.fill(m) + } + + b.ReportAllocs() + b.ResetTimer() + b.StopTimer() + + ctx := context.Background() + for range b.N { + for i := range ms.Len() { + cs.next(ms.At(i)) + } + req := pmetric.NewMetrics() + md.CopyTo(req) + + b.StartTimer() + err := proc.ConsumeMetrics(ctx, req) + b.StopTimer() + require.NoError(b, err) + } + } + + now := time.Now() + start := pcommon.NewTimestampFromTime(now) + ts := pcommon.NewTimestampFromTime(now.Add(time.Minute)) + + cases := []Case{{ + name: "sums", + fill: func(m pmetric.Metric) { + sum := m.SetEmptySum() + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := sum.DataPoints().AppendEmpty() + dp.SetIntValue(int64(rand.IntN(10))) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + } + }, + next: next(pmetric.Metric.Sum), + }, { + name: "histogram", + fill: func(m pmetric.Metric) { + hist := m.SetEmptyHistogram() + hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := hist.DataPoints().AppendEmpty() + histo.DefaultBounds.Observe( + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + ).CopyTo(dp) + + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + } + }, + next: next(pmetric.Metric.Histogram), + }, { + name: "exponential", + fill: func(m pmetric.Metric) { + ex := m.SetEmptyExponentialHistogram() + ex.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := ex.DataPoints().AppendEmpty() + o := expotest.Observe(expo.Scale(2), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + ) + o.CopyTo(dp.Positive()) + o.CopyTo(dp.Negative()) + + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + } + }, + next: next(pmetric.Metric.ExponentialHistogram), + }} + + tel := func(n int) sdktest.Spec { + total := int64(n * metrics * streams) + tracked := int64(metrics * streams) + return sdktest.Expect(map[string]sdktest.Metric{ + "otelcol_deltatocumulative.datapoints.linear": { + Type: sdktest.TypeSum, + Numbers: []sdktest.Number{{Int: &total}}, + Monotonic: true, + }, + "otelcol_deltatocumulative.streams.tracked.linear": { + Type: sdktest.TypeSum, + Numbers: []sdktest.Number{{Int: &tracked}}, + }, + }) + } + + for _, cs := range cases { + gb.Run(cs.name, func(b *testing.B) { + st := setup(b, nil) + run(b, st.proc, cs) + + // verify all dps are processed without error + b.StopTimer() + if err := sdktest.Test(tel(b.N), st.tel.reader); err != nil { + b.Fatal(err) + } + }) + } +} + +func next[ + T interface{ DataPoints() Ps }, + Ps interface { + At(int) P + Len() int + }, + P interface { + Timestamp() pcommon.Timestamp + SetStartTimestamp(pcommon.Timestamp) + SetTimestamp(pcommon.Timestamp) + }, +](sel func(pmetric.Metric) T) func(m pmetric.Metric) { + return func(m pmetric.Metric) { + dps := sel(m).DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + } +} diff --git a/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go b/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go index a2a2e224a0ca..cdd68a75b76c 100644 --- a/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go +++ b/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go @@ -1,4 +1,7 @@ -package telemetry +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" import "go.opentelemetry.io/otel/attribute" diff --git a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go index 440d094ba274..cf9ca23b317b 100644 --- a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go @@ -123,3 +123,14 @@ func (attr attributes) Into() attribute.Set { // // Temporality is optional and defaults to [sdk.CumulativeTemporality] type Format = []byte + +func Expect(metrics map[string]Metric) Spec { + for name, m := range metrics { + m.Name = name + if m.Temporality == 0 { + m.Temporality = sdk.CumulativeTemporality + } + metrics[name] = m + } + return Spec(metrics) +} diff --git a/processor/deltatocumulativeprocessor/processor_test.go b/processor/deltatocumulativeprocessor/processor_test.go index df5257d86d86..cbddc68ef5d5 100644 --- a/processor/deltatocumulativeprocessor/processor_test.go +++ b/processor/deltatocumulativeprocessor/processor_test.go @@ -97,7 +97,7 @@ func config(t *testing.T, file string) *Config { return cfg } -func setup(t *testing.T, cfg *Config) State { +func setup(t testing.TB, cfg *Config) State { t.Helper() next := &consumertest.MetricsSink{} From 78ca576256ee40ce3f3423b25003fea3f9bb4a6f Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 21 Nov 2024 17:07:45 +0100 Subject: [PATCH 3/4] *: linter fixes --- processor/deltatocumulativeprocessor/benchmark_test.go | 3 ++- .../internal/testing/sdktest/metrics.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/processor/deltatocumulativeprocessor/benchmark_test.go b/processor/deltatocumulativeprocessor/benchmark_test.go index f0ac5c987359..fd71734ddd39 100644 --- a/processor/deltatocumulativeprocessor/benchmark_test.go +++ b/processor/deltatocumulativeprocessor/benchmark_test.go @@ -22,7 +22,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" ) -var out consumertest.MetricsSink +var out *consumertest.MetricsSink func BenchmarkProcessor(gb *testing.B) { const ( @@ -145,6 +145,7 @@ func BenchmarkProcessor(gb *testing.B) { for _, cs := range cases { gb.Run(cs.name, func(b *testing.B) { st := setup(b, nil) + out = st.sink run(b, st.proc, cs) // verify all dps are processed without error diff --git a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go index cf9ca23b317b..e2c5a77d293a 100644 --- a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go @@ -132,5 +132,5 @@ func Expect(metrics map[string]Metric) Spec { } metrics[name] = m } - return Spec(metrics) + return metrics } From 0df2648cae639cda18362c9bd3c89994755ef5d5 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 28 Nov 2024 18:26:06 +0100 Subject: [PATCH 4/4] *: review feedback --- .../benchmark_test.go | 143 ++++++++---------- .../internal/testing/sdktest/metrics.go | 11 -- 2 files changed, 61 insertions(+), 93 deletions(-) diff --git a/processor/deltatocumulativeprocessor/benchmark_test.go b/processor/deltatocumulativeprocessor/benchmark_test.go index fd71734ddd39..cfe6e5146c8d 100644 --- a/processor/deltatocumulativeprocessor/benchmark_test.go +++ b/processor/deltatocumulativeprocessor/benchmark_test.go @@ -11,7 +11,6 @@ import ( "time" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -19,7 +18,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testing/sdktest" ) var out *consumertest.MetricsSink @@ -30,44 +28,15 @@ func BenchmarkProcessor(gb *testing.B) { streams = 10 ) + now := time.Now() + start := pcommon.NewTimestampFromTime(now) + ts := pcommon.NewTimestampFromTime(now.Add(time.Minute)) + type Case struct { name string fill func(m pmetric.Metric) next func(m pmetric.Metric) } - - run := func(b *testing.B, proc consumer.Metrics, cs Case) { - md := pmetric.NewMetrics() - ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() - for i := range metrics { - m := ms.AppendEmpty() - m.SetName(strconv.Itoa(i)) - cs.fill(m) - } - - b.ReportAllocs() - b.ResetTimer() - b.StopTimer() - - ctx := context.Background() - for range b.N { - for i := range ms.Len() { - cs.next(ms.At(i)) - } - req := pmetric.NewMetrics() - md.CopyTo(req) - - b.StartTimer() - err := proc.ConsumeMetrics(ctx, req) - b.StopTimer() - require.NoError(b, err) - } - } - - now := time.Now() - start := pcommon.NewTimestampFromTime(now) - ts := pcommon.NewTimestampFromTime(now.Add(time.Minute)) - cases := []Case{{ name: "sums", fill: func(m pmetric.Metric) { @@ -81,7 +50,16 @@ func BenchmarkProcessor(gb *testing.B) { dp.SetTimestamp(ts) } }, - next: next(pmetric.Metric.Sum), + next: func(m pmetric.Metric) { + dps := m.Sum().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, }, { name: "histogram", fill: func(m pmetric.Metric) { @@ -101,7 +79,16 @@ func BenchmarkProcessor(gb *testing.B) { dp.Attributes().PutStr("idx", strconv.Itoa(i)) } }, - next: next(pmetric.Metric.Histogram), + next: func(m pmetric.Metric) { + dps := m.Histogram().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, }, { name: "exponential", fill: func(m pmetric.Metric) { @@ -123,60 +110,52 @@ func BenchmarkProcessor(gb *testing.B) { dp.Attributes().PutStr("idx", strconv.Itoa(i)) } }, - next: next(pmetric.Metric.ExponentialHistogram), + next: func(m pmetric.Metric) { + dps := m.ExponentialHistogram().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, }} - tel := func(n int) sdktest.Spec { - total := int64(n * metrics * streams) - tracked := int64(metrics * streams) - return sdktest.Expect(map[string]sdktest.Metric{ - "otelcol_deltatocumulative.datapoints.linear": { - Type: sdktest.TypeSum, - Numbers: []sdktest.Number{{Int: &total}}, - Monotonic: true, - }, - "otelcol_deltatocumulative.streams.tracked.linear": { - Type: sdktest.TypeSum, - Numbers: []sdktest.Number{{Int: &tracked}}, - }, - }) - } - for _, cs := range cases { gb.Run(cs.name, func(b *testing.B) { st := setup(b, nil) out = st.sink - run(b, st.proc, cs) - // verify all dps are processed without error + md := pmetric.NewMetrics() + ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + for i := range metrics { + m := ms.AppendEmpty() + m.SetName(strconv.Itoa(i)) + cs.fill(m) + } + + b.ReportAllocs() + b.ResetTimer() b.StopTimer() - if err := sdktest.Test(tel(b.N), st.tel.reader); err != nil { - b.Fatal(err) + + ctx := context.Background() + for range b.N { + for i := range ms.Len() { + cs.next(ms.At(i)) + } + req := pmetric.NewMetrics() + md.CopyTo(req) + + b.StartTimer() + err := st.proc.ConsumeMetrics(ctx, req) + b.StopTimer() + require.NoError(b, err) } - }) - } -} -func next[ - T interface{ DataPoints() Ps }, - Ps interface { - At(int) P - Len() int - }, - P interface { - Timestamp() pcommon.Timestamp - SetStartTimestamp(pcommon.Timestamp) - SetTimestamp(pcommon.Timestamp) - }, -](sel func(pmetric.Metric) T) func(m pmetric.Metric) { - return func(m pmetric.Metric) { - dps := sel(m).DataPoints() - for i := range dps.Len() { - dp := dps.At(i) - dp.SetStartTimestamp(dp.Timestamp()) - dp.SetTimestamp(pcommon.NewTimestampFromTime( - dp.Timestamp().AsTime().Add(time.Minute), - )) - } + // verify all dps are processed without error + b.StopTimer() + require.Equal(b, b.N*metrics*streams, st.sink.DataPointCount()) + }) } } diff --git a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go index e2c5a77d293a..440d094ba274 100644 --- a/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/testing/sdktest/metrics.go @@ -123,14 +123,3 @@ func (attr attributes) Into() attribute.Set { // // Temporality is optional and defaults to [sdk.CumulativeTemporality] type Format = []byte - -func Expect(metrics map[string]Metric) Spec { - for name, m := range metrics { - m.Name = name - if m.Temporality == 0 { - m.Temporality = sdk.CumulativeTemporality - } - metrics[name] = m - } - return metrics -}