From a2591f7674b14d62fbdfc452c60bab100a3fa087 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 2 Dec 2024 09:47:44 +0100 Subject: [PATCH] [chore] [deltatocumulative]: linear histograms (#36486) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #### Description Finishes work started in https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35048 That PR only partially introduced a less complex processor architecture by only using it for Sums. Back then I was not sure of the best way to do it for multiple datatypes, as generics seemed to introduce a lot of complexity regardless of usage. I since then did of a lot of perf analysis and due to the way Go works (see gcshapes), we do not really gain anything at runtime from using generics, given method calls are still dynamic. This implementation uses regular Go interfaces and a good old type switch in the hot path (ConsumeMetrics), which lowers mental complexity quite a lot imo. The value of the new architecture is backed up by the following benchmark: ``` goos: linux goarch: arm64 pkg: github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor │ sums.nested │ sums.linear │ │ sec/op │ sec/op vs base │ Processor/sums-8 56.35µ ± 1% 39.99µ ± 1% -29.04% (p=0.000 n=10) │ sums.nested │ sums.linear │ │ B/op │ B/op vs base │ Processor/sums-8 11.520Ki ± 0% 3.683Ki ± 0% -68.03% (p=0.000 n=10) │ sums.nested │ sums.linear │ │ allocs/op │ allocs/op vs base │ Processor/sums-8 365.0 ± 0% 260.0 ± 0% -28.77% (p=0.000 n=10) ``` #### Testing This is a refactor, existing tests pass unaltered. #### Documentation not needed --- .../benchmark_test.go | 161 ++++++++++++++++++ .../internal/delta/delta.go | 22 ++- .../internal/lineartelemetry/attr.go | 12 ++ .../internal/metrics/data.go | 2 + .../internal/metrics/metrics.go | 48 +++++- .../deltatocumulativeprocessor/linear.go | 159 ++++++++--------- .../processor_test.go | 2 +- .../testdata/exponential/1.test | 2 +- .../testdata/histograms/1.test | 2 +- 9 files changed, 326 insertions(+), 84 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/benchmark_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go diff --git a/processor/deltatocumulativeprocessor/benchmark_test.go b/processor/deltatocumulativeprocessor/benchmark_test.go new file mode 100644 index 000000000000..cfe6e5146c8d --- /dev/null +++ b/processor/deltatocumulativeprocessor/benchmark_test.go @@ -0,0 +1,161 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor + +import ( + "context" + "math/rand/v2" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/histo" +) + +var out *consumertest.MetricsSink + +func BenchmarkProcessor(gb *testing.B) { + const ( + metrics = 5 + streams = 10 + ) + + now := time.Now() + start := pcommon.NewTimestampFromTime(now) + ts := pcommon.NewTimestampFromTime(now.Add(time.Minute)) + + type Case struct { + name string + fill func(m pmetric.Metric) + next func(m pmetric.Metric) + } + cases := []Case{{ + name: "sums", + fill: func(m pmetric.Metric) { + sum := m.SetEmptySum() + sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := sum.DataPoints().AppendEmpty() + dp.SetIntValue(int64(rand.IntN(10))) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + } + }, + next: func(m pmetric.Metric) { + dps := m.Sum().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, + }, { + name: "histogram", + fill: func(m pmetric.Metric) { + hist := m.SetEmptyHistogram() + hist.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := hist.DataPoints().AppendEmpty() + histo.DefaultBounds.Observe( + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + float64(rand.IntN(1000)), + ).CopyTo(dp) + + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + } + }, + next: func(m pmetric.Metric) { + dps := m.Histogram().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, + }, { + name: "exponential", + fill: func(m pmetric.Metric) { + ex := m.SetEmptyExponentialHistogram() + ex.SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + for i := range streams { + dp := ex.DataPoints().AppendEmpty() + o := expotest.Observe(expo.Scale(2), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + float64(rand.IntN(31)+1), + ) + o.CopyTo(dp.Positive()) + o.CopyTo(dp.Negative()) + + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.Attributes().PutStr("idx", strconv.Itoa(i)) + } + }, + next: func(m pmetric.Metric) { + dps := m.ExponentialHistogram().DataPoints() + for i := range dps.Len() { + dp := dps.At(i) + dp.SetStartTimestamp(dp.Timestamp()) + dp.SetTimestamp(pcommon.NewTimestampFromTime( + dp.Timestamp().AsTime().Add(time.Minute), + )) + } + }, + }} + + for _, cs := range cases { + gb.Run(cs.name, func(b *testing.B) { + st := setup(b, nil) + out = st.sink + + md := pmetric.NewMetrics() + ms := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + for i := range metrics { + m := ms.AppendEmpty() + m.SetName(strconv.Itoa(i)) + cs.fill(m) + } + + b.ReportAllocs() + b.ResetTimer() + b.StopTimer() + + ctx := context.Background() + for range b.N { + for i := range ms.Len() { + cs.next(ms.At(i)) + } + req := pmetric.NewMetrics() + md.CopyTo(req) + + b.StartTimer() + err := st.proc.ConsumeMetrics(ctx, req) + b.StopTimer() + require.NoError(b, err) + } + + // verify all dps are processed without error + b.StopTimer() + require.Equal(b, b.N*metrics*streams, st.sink.DataPointCount()) + }) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index e8d71d669f12..3320d44f2724 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -7,6 +7,7 @@ import ( "fmt" "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" @@ -83,10 +84,17 @@ func (e ErrGap) Error() string { return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To) } +type Type interface { + pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint + + StartTimestamp() pcommon.Timestamp + Timestamp() pcommon.Timestamp +} + // AccumulateInto adds state and dp, storing the result in state // // state = state + dp -func AccumulateInto[P data.Point[P]](state P, dp P) error { +func AccumulateInto[T Type](state, dp T) error { switch { case dp.StartTimestamp() < state.StartTimestamp(): // belongs to older series @@ -96,6 +104,16 @@ func AccumulateInto[P data.Point[P]](state P, dp P) error { return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()} } - state.Add(dp) + switch dp := any(dp).(type) { + case pmetric.NumberDataPoint: + state := any(state).(pmetric.NumberDataPoint) + data.Number{NumberDataPoint: state}.Add(data.Number{NumberDataPoint: dp}) + case pmetric.HistogramDataPoint: + state := any(state).(pmetric.HistogramDataPoint) + data.Histogram{HistogramDataPoint: state}.Add(data.Histogram{HistogramDataPoint: dp}) + case pmetric.ExponentialHistogramDataPoint: + state := any(state).(pmetric.ExponentialHistogramDataPoint) + data.ExpHistogram{DataPoint: state}.Add(data.ExpHistogram{DataPoint: dp}) + } return nil } diff --git a/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go b/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go new file mode 100644 index 000000000000..cdd68a75b76c --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/lineartelemetry/attr.go @@ -0,0 +1,12 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" + +import "go.opentelemetry.io/otel/attribute" + +type Attributes []attribute.KeyValue + +func (a *Attributes) Set(attr attribute.KeyValue) { + *a = append(*a, attr) +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go index 9fa1df07eb1d..08e1aa4b8ae8 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/data.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -115,6 +115,7 @@ func (s Gauge) Filter(expr func(data.Number) bool) { return !expr(data.Number{NumberDataPoint: dp}) }) } +func (s Gauge) SetAggregationTemporality(pmetric.AggregationTemporality) {} type Summary Metric @@ -136,3 +137,4 @@ func (s Summary) Filter(expr func(data.Summary) bool) { return !expr(data.Summary{SummaryDataPoint: dp}) }) } +func (s Summary) SetAggregationTemporality(pmetric.AggregationTemporality) {} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go index 98388dbf5eb6..b19b03f1a1c7 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -47,7 +47,7 @@ func (m Metric) AggregationTemporality() pmetric.AggregationTemporality { return pmetric.AggregationTemporalityUnspecified } -func (m Metric) Typed() any { +func (m Metric) Typed() Any { //exhaustive:enforce switch m.Type() { case pmetric.MetricTypeSum: @@ -63,3 +63,49 @@ func (m Metric) Typed() any { } panic("unreachable") } + +var ( + _ Any = Sum{} + _ Any = Gauge{} + _ Any = ExpHistogram{} + _ Any = Histogram{} + _ Any = Summary{} +) + +type Any interface { + Len() int + Ident() identity.Metric + + SetAggregationTemporality(pmetric.AggregationTemporality) +} + +func (m Metric) Filter(ok func(id identity.Stream, dp any) bool) { + mid := m.Ident() + switch m.Type() { + case pmetric.MetricTypeSum: + m.Sum().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeGauge: + m.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeHistogram: + m.Histogram().DataPoints().RemoveIf(func(dp pmetric.HistogramDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeExponentialHistogram: + m.ExponentialHistogram().DataPoints().RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + case pmetric.MetricTypeSummary: + m.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool { + id := identity.OfStream(mid, dp) + return !ok(id, dp) + }) + } +} diff --git a/processor/deltatocumulativeprocessor/linear.go b/processor/deltatocumulativeprocessor/linear.go index 0b547fe5145d..2b725b7dc78d 100644 --- a/processor/deltatocumulativeprocessor/linear.go +++ b/processor/deltatocumulativeprocessor/linear.go @@ -5,7 +5,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "context" - "errors" "sync" "time" @@ -16,22 +15,19 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" - exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) var _ processor.Metrics = (*Linear)(nil) type Linear struct { - next processor.Metrics + next consumer.Metrics cfg Config - state state - mtx sync.Mutex + last state + mtx sync.Mutex ctx context.Context cancel context.CancelFunc @@ -40,16 +36,16 @@ type Linear struct { tel telemetry.Metrics } -func newLinear(cfg *Config, tel telemetry.Metrics, next processor.Metrics) *Linear { +func newLinear(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Linear { ctx, cancel := context.WithCancel(context.Background()) proc := Linear{ next: next, cfg: *cfg, - state: state{ - nums: make(exp.HashMap[data.Number]), - hist: make(exp.HashMap[data.Histogram]), - expo: make(exp.HashMap[data.ExpHistogram]), + last: state{ + nums: make(map[identity.Stream]pmetric.NumberDataPoint), + hist: make(map[identity.Stream]pmetric.HistogramDataPoint), + expo: make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint), }, ctx: ctx, cancel: cancel, @@ -58,7 +54,7 @@ func newLinear(cfg *Config, tel telemetry.Metrics, next processor.Metrics) *Line tel: tel, } - tel.WithTracked(proc.state.Len) + tel.WithTracked(proc.last.Len) cfg.Metrics(tel) return &proc @@ -75,68 +71,68 @@ func (p *Linear) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { drop = false ) - // possible errors encountered while aggregating. - // errors.Join-ed []streams.Error - var errs error - metrics.Filter(md, func(m metrics.Metric) bool { if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta { return keep } - // NOTE: to make review and migration easier, below only does sums for now. - // all other datatypes are handled by older code, which is called after this. - // - // TODO: implement other datatypes here - if m.Type() != pmetric.MetricTypeSum { - return keep - } - - sum := metrics.Sum(m) - state := p.state.nums - - // apply fn to each dp in stream. if fn's err != nil, dp is removed from stream - err := streams.Apply(sum, func(id identity.Stream, dp data.Number) (data.Number, error) { - acc, ok := state.Load(id) - // if at stream limit and stream not seen before, reject - if !ok && p.state.Len() >= p.cfg.MaxStreams { - p.tel.Datapoints().Inc(ctx, telemetry.Error("limit")) - return dp, streams.Drop + // aggregate the datapoints. + // using filter here, as the pmetric.*DataPoint are reference types so + // we can modify them using their "value". + m.Filter(func(id identity.Stream, dp any) bool { + // count the processed datatype. + // uses whatever value of attrs has at return-time + var attrs telemetry.Attributes + defer func() { p.tel.Datapoints().Inc(ctx, attrs...) }() + + // if stream new and state capacity reached, reject + exist := p.last.Has(id) + if !exist && p.last.Len() >= p.cfg.MaxStreams { + attrs.Set(telemetry.Error("limit")) + return drop } - // stream is alive, update stale tracker + // stream is ok and active, update stale tracker p.stale.Refresh(now, id) - acc, err := func() (data.Number, error) { - if !ok { - // new stream: there is no existing aggregation, so start new with current dp - return dp.Clone(), nil - } - // tracked stream: add incoming delta dp to existing cumulative aggregation - return acc, delta.AccumulateInto(acc, dp) - }() - // aggregation failed, record as metric and drop datapoint + // this is the first sample of the stream. there is nothing to + // aggregate with, so clone this value into the state and done + if !exist { + p.last.BeginWith(id, dp) + return keep + } + + // aggregate with state from previous requests. + // delta.AccumulateInto(state, dp) stores result in `state`. + // this is then copied into `dp` (the value passed onto the pipeline) + var err error + switch dp := dp.(type) { + case pmetric.NumberDataPoint: + state := p.last.nums[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + case pmetric.HistogramDataPoint: + state := p.last.hist[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + case pmetric.ExponentialHistogramDataPoint: + state := p.last.expo[id] + err = delta.AccumulateInto(state, dp) + state.CopyTo(dp) + } if err != nil { - p.tel.Datapoints().Inc(ctx, telemetry.Cause(err)) - return acc, streams.Drop + attrs.Set(telemetry.Cause(err)) + return drop } - // store aggregated result in state and return - p.tel.Datapoints().Inc(ctx) - _ = state.Store(id, acc) - return acc, nil + return keep }) - errs = errors.Join(errs, err) - - // all remaining datapoints are cumulative - sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - // if no datapoints remain, drop now-empty metric - return sum.Len() > 0 + // all remaining datapoints of this metric are now cumulative + m.Typed().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + // if no datapoints remain, drop empty metric + return m.Typed().Len() > 0 }) - if errs != nil { - return errs - } // no need to continue pipeline if we dropped all metrics if md.MetricCount() == 0 { @@ -159,7 +155,7 @@ func (p *Linear) Start(_ context.Context, _ component.Host) error { p.mtx.Lock() stale := p.stale.Collect(p.cfg.MaxStale) for _, id := range stale { - p.state.Delete(id) + p.last.Delete(id) } p.mtx.Unlock() } @@ -179,33 +175,40 @@ func (p *Linear) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } -type Metric[T data.Point[T]] interface { - metrics.Filterable[T] - SetAggregationTemporality(pmetric.AggregationTemporality) -} - // state keeps a cumulative value, aggregated over time, per stream type state struct { - nums streams.Map[data.Number] - - // future use - hist streams.Map[data.Histogram] - expo streams.Map[data.ExpHistogram] + nums map[identity.Stream]pmetric.NumberDataPoint + hist map[identity.Stream]pmetric.HistogramDataPoint + expo map[identity.Stream]pmetric.ExponentialHistogramDataPoint } func (m state) Len() int { - return m.nums.Len() + m.hist.Len() + m.expo.Len() + return len(m.nums) + len(m.hist) + len(m.expo) } func (m state) Has(id identity.Stream) bool { - _, nok := m.nums.Load(id) - _, hok := m.hist.Load(id) - _, eok := m.expo.Load(id) + _, nok := m.nums[id] + _, hok := m.hist[id] + _, eok := m.expo[id] return nok || hok || eok } func (m state) Delete(id identity.Stream) { - m.nums.Delete(id) - m.hist.Delete(id) - m.expo.Delete(id) + delete(m.nums, id) + delete(m.hist, id) + delete(m.expo, id) +} + +func (m state) BeginWith(id identity.Stream, dp any) { + switch dp := dp.(type) { + case pmetric.NumberDataPoint: + m.nums[id] = pmetric.NewNumberDataPoint() + dp.CopyTo(m.nums[id]) + case pmetric.HistogramDataPoint: + m.hist[id] = pmetric.NewHistogramDataPoint() + dp.CopyTo(m.hist[id]) + case pmetric.ExponentialHistogramDataPoint: + m.expo[id] = pmetric.NewExponentialHistogramDataPoint() + dp.CopyTo(m.expo[id]) + } } diff --git a/processor/deltatocumulativeprocessor/processor_test.go b/processor/deltatocumulativeprocessor/processor_test.go index df5257d86d86..cbddc68ef5d5 100644 --- a/processor/deltatocumulativeprocessor/processor_test.go +++ b/processor/deltatocumulativeprocessor/processor_test.go @@ -97,7 +97,7 @@ func config(t *testing.T, file string) *Config { return cfg } -func setup(t *testing.T, cfg *Config) State { +func setup(t testing.TB, cfg *Config) State { t.Helper() next := &consumertest.MetricsSink{} diff --git a/processor/deltatocumulativeprocessor/testdata/exponential/1.test b/processor/deltatocumulativeprocessor/testdata/exponential/1.test index a8c82e51c009..8aa87775ae80 100644 --- a/processor/deltatocumulativeprocessor/testdata/exponential/1.test +++ b/processor/deltatocumulativeprocessor/testdata/exponential/1.test @@ -92,5 +92,5 @@ resourceMetrics: bucketCounts: [3,7,5,0,0] -- telemetry -- -updown otelcol_deltatocumulative.streams.tracked: +updown otelcol_deltatocumulative.streams.tracked.linear: - int: 2 diff --git a/processor/deltatocumulativeprocessor/testdata/histograms/1.test b/processor/deltatocumulativeprocessor/testdata/histograms/1.test index 6b63c17275b9..ed1265db7713 100644 --- a/processor/deltatocumulativeprocessor/testdata/histograms/1.test +++ b/processor/deltatocumulativeprocessor/testdata/histograms/1.test @@ -49,5 +49,5 @@ resourceMetrics: bucketCounts: [ 1, 2, 3, 4] -- telemetry -- -updown otelcol_deltatocumulative.streams.tracked: +updown otelcol_deltatocumulative.streams.tracked.linear: - int: 1