diff --git a/processor/tailsamplingprocessor/factory.go b/processor/tailsamplingprocessor/factory.go index c078a86278870..df267f296ff81 100644 --- a/processor/tailsamplingprocessor/factory.go +++ b/processor/tailsamplingprocessor/factory.go @@ -7,38 +7,17 @@ package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry import ( "context" - "sync" "time" - "go.opencensus.io/stats/view" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/consumer" - "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/processor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata" ) -var onceMetrics sync.Once - -var metricStatCountSpansSampledFeatureGate = featuregate.GlobalRegistry().MustRegister( - "processor.tailsamplingprocessor.metricstatcountspanssampled", - featuregate.StageAlpha, - featuregate.WithRegisterDescription("When enabled, a new metric stat_count_spans_sampled will be available in the tail sampling processor. Differently from stat_count_traces_sampled, this metric will count the number of spans sampled or not per sampling policy, where the original counts traces."), -) - -func isMetricStatCountSpansSampledEnabled() bool { - return metricStatCountSpansSampledFeatureGate.IsEnabled() -} - // NewFactory returns a new factory for the Tail Sampling processor. func NewFactory() processor.Factory { - onceMetrics.Do(func() { - // TODO: this is hardcoding the metrics level and skips error handling - _ = view.Register(samplingProcessorMetricViews(configtelemetry.LevelNormal)...) - }) - return processor.NewFactory( metadata.Type, createDefaultConfig, diff --git a/processor/tailsamplingprocessor/internal/sampling/package_test.go b/processor/tailsamplingprocessor/internal/sampling/package_test.go deleted file mode 100644 index 73b9f3d9f0ab1..0000000000000 --- a/processor/tailsamplingprocessor/internal/sampling/package_test.go +++ /dev/null @@ -1,17 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package sampling - -import ( - "testing" - - "go.uber.org/goleak" -) - -// The IgnoreTopFunction call prevents catching the leak generated by opencensus -// defaultWorker.Start which at this time is part of the package's init call. -// See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information. -func TestMain(m *testing.M) { - goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) -} diff --git a/processor/tailsamplingprocessor/internal/telemetry/featureflag.go b/processor/tailsamplingprocessor/internal/telemetry/featureflag.go new file mode 100644 index 0000000000000..f8e4239c01eb9 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/telemetry/featureflag.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" + +import "go.opentelemetry.io/collector/featuregate" + +var metricStatCountSpansSampledFeatureGate = featuregate.GlobalRegistry().MustRegister( + "processor.tailsamplingprocessor.metricstatcountspanssampled", + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, a new metric stat_count_spans_sampled will be available in the tail sampling processor. Differently from stat_count_traces_sampled, this metric will count the number of spans sampled or not per sampling policy, where the original counts traces."), +) + +func isMetricStatCountSpansSampledEnabled() bool { + return metricStatCountSpansSampledFeatureGate.IsEnabled() +} diff --git a/processor/tailsamplingprocessor/metrics.go b/processor/tailsamplingprocessor/internal/telemetry/metrics_oc.go similarity index 73% rename from processor/tailsamplingprocessor/metrics.go rename to processor/tailsamplingprocessor/internal/telemetry/metrics_oc.go index 9a7e320063fe5..40c006bb49406 100644 --- a/processor/tailsamplingprocessor/metrics.go +++ b/processor/tailsamplingprocessor/internal/telemetry/metrics_oc.go @@ -1,9 +1,11 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package tailsamplingprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor" +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" import ( + "context" + "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" @@ -11,6 +13,7 @@ import ( "go.opentelemetry.io/collector/processor/processorhelper" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" ) // Variables related to metrics specific to tail sampling. @@ -19,6 +22,9 @@ var ( tagSampledKey, _ = tag.NewKey("sampled") tagSourceFormat, _ = tag.NewKey("source_format") + tagMutatorSampled = []tag.Mutator{tag.Upsert(tagSampledKey, "true")} + tagMutatorNotSampled = []tag.Mutator{tag.Upsert(tagSampledKey, "false")} + statDecisionLatencyMicroSec = stats.Int64("sampling_decision_latency", "Latency (in microseconds) of a given sampling policy", "µs") statOverallDecisionLatencyUs = stats.Int64("sampling_decision_timer_latency", "Latency (in microseconds) of each run of the sampling decision timer", "µs") @@ -36,6 +42,74 @@ var ( statTracesOnMemoryGauge = stats.Int64("sampling_traces_on_memory", "Tracks the number of traces current on memory", stats.UnitDimensionless) ) +func contextForPolicyOC(ctx context.Context, configName, format string) (context.Context, error) { + return tag.New(ctx, tag.Upsert(tagPolicyKey, configName), tag.Upsert(tagSourceFormat, format)) +} + +func recordFinalDecisionOC(ctx context.Context, latencyMicroSec, droppedTooEarly, evaluationErrors, tracesOnMemory int64, decision sampling.Decision) { + stats.Record(ctx, + statOverallDecisionLatencyUs.M(latencyMicroSec), + statDroppedTooEarlyCount.M(droppedTooEarly), + statPolicyEvaluationErrorCount.M(evaluationErrors), + statTracesOnMemoryGauge.M(tracesOnMemory), + ) + + var mutators []tag.Mutator + switch decision { + case sampling.Sampled: + mutators = tagMutatorSampled + case sampling.NotSampled: + mutators = tagMutatorNotSampled + } + + _ = stats.RecordWithTags( + ctx, + mutators, + statCountGlobalTracesSampled.M(int64(1)), + ) +} + +func recordPolicyLatencyOC(ctx context.Context, latencyMicroSec int64) { + stats.Record(ctx, + statDecisionLatencyMicroSec.M(latencyMicroSec), + ) +} + +func recordPolicyDecisionOC(ctx context.Context, sampled bool, numSpans int64) { + var mutators []tag.Mutator + if sampled { + mutators = tagMutatorSampled + } else { + mutators = tagMutatorNotSampled + } + + _ = stats.RecordWithTags( + ctx, + mutators, + statCountTracesSampled.M(int64(1)), + ) + if isMetricStatCountSpansSampledEnabled() { + _ = stats.RecordWithTags( + ctx, + mutators, + statCountSpansSampled.M(numSpans), + ) + } + +} + +func recordNewTraceIDsOC(ctx context.Context, count int64) { + stats.Record(ctx, statNewTraceIDReceivedCount.M(count)) +} + +func recordLateSpanOC(ctx context.Context, ageSec int64) { + stats.Record(ctx, statLateSpanArrivalAfterDecision.M(ageSec)) +} + +func recordTraceRemovalAgeOC(ctx context.Context, ageSec int64) { + stats.Record(ctx, statTraceRemovalAgeSec.M(ageSec)) +} + // samplingProcessorMetricViews return the metrics views according to given telemetry level. func samplingProcessorMetricViews(level configtelemetry.Level) []*view.View { if level == configtelemetry.LevelNone { diff --git a/processor/tailsamplingprocessor/internal/telemetry/telemetry.go b/processor/tailsamplingprocessor/internal/telemetry/telemetry.go new file mode 100644 index 0000000000000..a31d9b8b6bcd3 --- /dev/null +++ b/processor/tailsamplingprocessor/internal/telemetry/telemetry.go @@ -0,0 +1,42 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" + +import ( + "context" + "sync" + + "go.opencensus.io/stats/view" + "go.opentelemetry.io/collector/config/configtelemetry" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" +) + +var onceTelemetry sync.Once + +type T struct { + ContextForPolicy func(ctx context.Context, configName, format string) (context.Context, error) + RecordPolicyLatency func(ctx context.Context, latencyMicroSec int64) + RecordPolicyDecision func(ctx context.Context, sampled bool, numSpans int64) + RecordNewTraceIDs func(ctx context.Context, count int64) + RecordLateSpan func(ctx context.Context, ageSec int64) + RecordTraceRemovalAge func(ctx context.Context, ageSec int64) + RecordFinalDecision func(ctx context.Context, latencyMicroSec, droppedTooEarly, evaluationErrors, tracesOnMemory int64, decision sampling.Decision) +} + +func New() *T { + onceTelemetry.Do(func() { + _ = view.Register(samplingProcessorMetricViews(configtelemetry.LevelNormal)...) + }) + + return &T{ + ContextForPolicy: contextForPolicyOC, + RecordPolicyLatency: recordPolicyLatencyOC, + RecordPolicyDecision: recordPolicyDecisionOC, + RecordNewTraceIDs: recordNewTraceIDsOC, + RecordLateSpan: recordLateSpanOC, + RecordTraceRemovalAge: recordTraceRemovalAgeOC, + RecordFinalDecision: recordFinalDecisionOC, + } +} diff --git a/processor/tailsamplingprocessor/processor.go b/processor/tailsamplingprocessor/processor.go index 979c790839e35..faee80217d451 100644 --- a/processor/tailsamplingprocessor/processor.go +++ b/processor/tailsamplingprocessor/processor.go @@ -12,8 +12,6 @@ import ( "sync/atomic" "time" - "go.opencensus.io/stats" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" @@ -24,11 +22,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/idbatcher" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" -) - -var ( - tagUpsertSampled = tag.Upsert(tagSampledKey, "true") - tagUpsertNotSampled = tag.Upsert(tagSampledKey, "false") + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" ) // policy combines a sampling policy evaluator with the destinations to be @@ -45,6 +39,7 @@ type policy struct { // tailSamplingSpanProcessor handles the incoming trace data and uses the given sampling // policy to sample traces. type tailSamplingSpanProcessor struct { + *telemetry.T ctx context.Context nextConsumer consumer.Traces maxNumTraces uint64 @@ -56,10 +51,6 @@ type tailSamplingSpanProcessor struct { decisionBatcher idbatcher.Batcher deleteChan chan pcommon.TraceID numTracesOnMap *atomic.Uint64 - - // This is for reusing the slice by each call of `makeDecision`. This - // was previously identified to be a bottleneck using profiling. - mutatorsBuf []tag.Mutator } // spanAndScope a structure for holding information about span and its instrumentation scope. @@ -77,6 +68,7 @@ const ( // newTracesProcessor returns a processor.TracesProcessor that will perform tail sampling according to the given // configuration. func newTracesProcessor(ctx context.Context, settings component.TelemetrySettings, nextConsumer consumer.Traces, cfg Config) (processor.Traces, error) { + telemetry := telemetry.New() policyNames := map[string]bool{} policies := make([]*policy, len(cfg.PolicyCfgs)) for i := range cfg.PolicyCfgs { @@ -87,7 +79,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting } policyNames[policyCfg.Name] = true - policyCtx, err := tag.New(ctx, tag.Upsert(tagPolicyKey, policyCfg.Name), tag.Upsert(tagSourceFormat, sourceFormat)) + policyCtx, err := telemetry.ContextForPolicy(ctx, policyCfg.Name, sourceFormat) if err != nil { return nil, err } @@ -120,10 +112,7 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting policies: policies, tickerFrequency: time.Second, numTracesOnMap: &atomic.Uint64{}, - - // We allocate exactly 1 element, because that's the exact amount - // used in any place. - mutatorsBuf: make([]tag.Mutator, 1), + T: telemetry, } tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick} @@ -206,6 +195,13 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { trace.DecisionTime = time.Now() decision, policy := tsp.makeDecision(id, trace, &metrics) + tsp.RecordFinalDecision(tsp.ctx, + int64(time.Since(startTime)/time.Microsecond), + metrics.idNotFoundOnMapCount, + metrics.evaluateErrorCount, + int64(tsp.numTracesOnMap.Load()), + decision, + ) // Sampled or not, remove the batches trace.Lock() @@ -219,12 +215,6 @@ func (tsp *tailSamplingSpanProcessor) samplingPolicyOnTick() { } } - stats.Record(tsp.ctx, - statOverallDecisionLatencyUs.M(int64(time.Since(startTime)/time.Microsecond)), - statDroppedTooEarlyCount.M(metrics.idNotFoundOnMapCount), - statPolicyEvaluationErrorCount.M(metrics.evaluateErrorCount), - statTracesOnMemoryGauge.M(int64(tsp.numTracesOnMap.Load()))) - tsp.logger.Debug("Sampling policy evaluation completed", zap.Int("batch.len", batchLen), zap.Int64("sampled", metrics.decisionSampled), @@ -249,9 +239,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa for i, p := range tsp.policies { policyEvaluateStartTime := time.Now() decision, err := p.evaluator.Evaluate(p.ctx, id, trace) - stats.Record( - p.ctx, - statDecisionLatencyMicroSec.M(int64(time.Since(policyEvaluateStartTime)/time.Microsecond))) + tsp.RecordPolicyLatency(p.ctx, int64(time.Since(policyEvaluateStartTime)/time.Microsecond)) if err != nil { samplingDecision[sampling.Error] = true trace.Decisions[i] = sampling.NotSampled @@ -288,7 +276,6 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa finalDecision = sampling.Sampled } - mutators := tsp.mutatorsBuf for i, p := range tsp.policies { switch trace.Decisions[i] { case sampling.Sampled: @@ -298,56 +285,14 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa matchingPolicy = p } - mutators[0] = tagUpsertSampled - _ = stats.RecordWithTags( - p.ctx, - mutators, - statCountTracesSampled.M(int64(1)), - ) - if isMetricStatCountSpansSampledEnabled() { - _ = stats.RecordWithTags( - p.ctx, - mutators, - statCountSpansSampled.M(trace.SpanCount.Load()), - ) - } + tsp.RecordPolicyDecision(p.ctx, true, trace.SpanCount.Load()) metrics.decisionSampled++ case sampling.NotSampled: - mutators[0] = tagUpsertNotSampled - _ = stats.RecordWithTags( - p.ctx, - mutators, - statCountTracesSampled.M(int64(1)), - ) - if isMetricStatCountSpansSampledEnabled() { - _ = stats.RecordWithTags( - p.ctx, - mutators, - statCountSpansSampled.M(trace.SpanCount.Load()), - ) - } + tsp.RecordPolicyDecision(p.ctx, false, trace.SpanCount.Load()) metrics.decisionNotSampled++ } } - - switch finalDecision { - case sampling.Sampled: - mutators[0] = tagUpsertSampled - _ = stats.RecordWithTags( - tsp.ctx, - mutators, - statCountGlobalTracesSampled.M(int64(1)), - ) - case sampling.NotSampled: - mutators[0] = tagUpsertNotSampled - _ = stats.RecordWithTags( - tsp.ctx, - mutators, - statCountGlobalTracesSampled.M(int64(1)), - ) - } - return finalDecision, matchingPolicy } @@ -444,7 +389,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc zap.Error(err)) } case sampling.NotSampled: - stats.Record(tsp.ctx, statLateSpanArrivalAfterDecision.M(int64(time.Since(actualData.DecisionTime)/time.Second))) + tsp.RecordLateSpan(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second)) default: tsp.logger.Warn("Encountered unexpected sampling decision", zap.Int("decision", int(finalDecision))) @@ -452,7 +397,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc } } - stats.Record(tsp.ctx, statNewTraceIDReceivedCount.M(newTraceIDs)) + tsp.RecordNewTraceIDs(tsp.ctx, newTraceIDs) } func (tsp *tailSamplingSpanProcessor) Capabilities() consumer.Capabilities { @@ -485,7 +430,7 @@ func (tsp *tailSamplingSpanProcessor) dropTrace(traceID pcommon.TraceID, deletio return } - stats.Record(tsp.ctx, statTraceRemovalAgeSec.M(int64(deletionTime.Sub(trace.ArrivalTime)/time.Second))) + tsp.RecordTraceRemovalAge(tsp.ctx, int64(deletionTime.Sub(trace.ArrivalTime)/time.Second)) } func appendToTraces(dest ptrace.Traces, rss ptrace.ResourceSpans, spanAndScopes []spanAndScope) { diff --git a/processor/tailsamplingprocessor/processor_test.go b/processor/tailsamplingprocessor/processor_test.go index 45a2740722f90..82dd049e275a2 100644 --- a/processor/tailsamplingprocessor/processor_test.go +++ b/processor/tailsamplingprocessor/processor_test.go @@ -15,7 +15,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opencensus.io/tag" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pcommon" @@ -26,6 +25,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/timeutils" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/idbatcher" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/telemetry" ) const ( @@ -119,6 +119,7 @@ func TestTraceIntegrity(t *testing.T) { mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ + T: telemetry.New(), ctx: context.Background(), nextConsumer: msp, maxNumTraces: spanCount, @@ -129,7 +130,6 @@ func TestTraceIntegrity(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, - mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -360,6 +360,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ + T: telemetry.New(), ctx: context.Background(), nextConsumer: msp, maxNumTraces: maxSize, @@ -370,7 +371,6 @@ func TestSamplingPolicyTypicalPath(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, - mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -422,6 +422,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ + T: telemetry.New(), ctx: context.Background(), nextConsumer: msp, maxNumTraces: maxSize, @@ -432,7 +433,6 @@ func TestSamplingPolicyInvertSampled(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, - mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -485,6 +485,7 @@ func TestSamplingMultiplePolicies(t *testing.T) { mpe2 := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ + T: telemetry.New(), ctx: context.Background(), nextConsumer: msp, maxNumTraces: maxSize, @@ -501,7 +502,6 @@ func TestSamplingMultiplePolicies(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, - mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -555,6 +555,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ + T: telemetry.New(), ctx: context.Background(), nextConsumer: msp, maxNumTraces: maxSize, @@ -565,7 +566,6 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, - mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -619,6 +619,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ + T: telemetry.New(), ctx: context.Background(), nextConsumer: msp, maxNumTraces: maxSize, @@ -628,7 +629,6 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) { deleteChan: make(chan pcommon.TraceID, maxSize), policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, - mutatorsBuf: make([]tag.Mutator, 1), numTracesOnMap: &atomic.Uint64{}, } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) @@ -680,6 +680,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { mpe1 := &mockPolicyEvaluator{} mpe2 := &mockPolicyEvaluator{} tsp := &tailSamplingSpanProcessor{ + T: telemetry.New(), ctx: context.Background(), nextConsumer: nextConsumer, maxNumTraces: maxSize, @@ -693,7 +694,6 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) { policyTicker: &manualTTicker{}, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, - mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() { @@ -751,6 +751,7 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { mpe := &mockPolicyEvaluator{} mtt := &manualTTicker{} tsp := &tailSamplingSpanProcessor{ + T: telemetry.New(), ctx: context.Background(), nextConsumer: msp, maxNumTraces: maxSize, @@ -761,7 +762,6 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) { policyTicker: mtt, tickerFrequency: 100 * time.Millisecond, numTracesOnMap: &atomic.Uint64{}, - mutatorsBuf: make([]tag.Mutator, 1), } require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost())) defer func() {