Skip to content

Commit

Permalink
tailsamplingprocessor: Optimize tag mutator memory allocations (open-…
Browse files Browse the repository at this point in the history
…telemetry#27889)

**Description:**

Since each `tailSamplingSpanProcessor`'s instance is not concurrently
called by the ticker worker (it's a 1-to-1 relationship) we can safely
reuse a slice for the tag mutators used in `makeDecision`. Additionally
the tag mutators themselves were causing a lot of allocations and since
they are static, we created constants for them preventing allocations on
each execution of `makeDecision`.

This improved the `makeDecision` benchmark by ~31%.

```
benchstat old.txt new.txt
name         old time/op  new time/op  delta
Sampling-10  51.8µs ± 1%  35.7µs ± 1%  -30.94%  (p=0.008 n=5+5)
```

**Testing:** Unit tests unchanged; added a benchmark

**Documentation:** Perf improvement so no documentation changes needed.

This was all based on production profiling data at Polar Signals running
the collector. Here is a snapshot of the original profiling data we
started with: https://pprof.me/52a7fab/

Judging by the production profiling data, a 31% improvement on the
`makeDecision` codepath, should translate roughly into a 6% baseline CPU
improvement our production deployment of the opentelemetry collector.

The profiling data after improving: https://pprof.me/58c0e84/

This improvement was done as part of the Let's Profile Livestream where
we optimize popular open-source projects live:
https://www.youtube.com/watch?v=vkMQRjiNTHM

---------

Co-authored-by: Jiekun <zhujiekun@52tt.com>
  • Loading branch information
2 people authored and sigilioso committed Oct 27, 2023
1 parent fdb86a6 commit 1e9d8c1
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 4 deletions.
27 changes: 27 additions & 0 deletions .chloggen/optimize-tailsamplingprocessor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: tailsamplingprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Optimize performance of tailsamplingprocessor

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27889]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
26 changes: 22 additions & 4 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/tailsamplingprocessor/internal/sampling"
)

var (
tagUpsertSampled = tag.Upsert(tagSampledKey, "true")
tagUpsertNotSampled = tag.Upsert(tagSampledKey, "false")
)

// policy combines a sampling policy evaluator with the destinations to be
// used for that policy.
type policy struct {
Expand All @@ -51,6 +56,10 @@ type tailSamplingSpanProcessor struct {
decisionBatcher idbatcher.Batcher
deleteChan chan pcommon.TraceID
numTracesOnMap *atomic.Uint64

// This is for reusing the slice by each call of `makeDecision`. This
// was previously identified to be a bottleneck using profiling.
mutatorsBuf []tag.Mutator
}

// spanAndScope a structure for holding information about span and its instrumentation scope.
Expand Down Expand Up @@ -115,6 +124,10 @@ func newTracesProcessor(ctx context.Context, settings component.TelemetrySetting
policies: policies,
tickerFrequency: time.Second,
numTracesOnMap: &atomic.Uint64{},

// We allocate exactly 1 element, because that's the exact amount
// used in any place.
mutatorsBuf: make([]tag.Mutator, 1),
}

tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick}
Expand Down Expand Up @@ -279,6 +292,7 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
finalDecision = sampling.Sampled
}

mutators := tsp.mutatorsBuf
for i, p := range tsp.policies {
switch trace.Decisions[i] {
case sampling.Sampled:
Expand All @@ -288,17 +302,19 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa
matchingPolicy = p
}

mutators[0] = tagUpsertSampled
_ = stats.RecordWithTags(
p.ctx,
[]tag.Mutator{tag.Upsert(tagSampledKey, "true")},
mutators,
statCountTracesSampled.M(int64(1)),
)
metrics.decisionSampled++

case sampling.NotSampled:
mutators[0] = tagUpsertNotSampled
_ = stats.RecordWithTags(
p.ctx,
[]tag.Mutator{tag.Upsert(tagSampledKey, "false")},
mutators,
statCountTracesSampled.M(int64(1)),
)
metrics.decisionNotSampled++
Expand All @@ -307,15 +323,17 @@ func (tsp *tailSamplingSpanProcessor) makeDecision(id pcommon.TraceID, trace *sa

switch finalDecision {
case sampling.Sampled:
mutators[0] = tagUpsertSampled
_ = stats.RecordWithTags(
tsp.ctx,
[]tag.Mutator{tag.Upsert(tagSampledKey, "true")},
mutators,
statCountGlobalTracesSampled.M(int64(1)),
)
case sampling.NotSampled:
mutators[0] = tagUpsertNotSampled
_ = stats.RecordWithTags(
tsp.ctx,
[]tag.Mutator{tag.Upsert(tagSampledKey, "false")},
mutators,
statCountGlobalTracesSampled.M(int64(1)),
)
}
Expand Down
43 changes: 43 additions & 0 deletions processor/tailsamplingprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/tag"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -128,6 +129,7 @@ func TestTraceIntegrity(t *testing.T) {
policyTicker: mtt,
tickerFrequency: 100 * time.Millisecond,
numTracesOnMap: &atomic.Uint64{},
mutatorsBuf: make([]tag.Mutator, 1),
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
Expand Down Expand Up @@ -360,6 +362,7 @@ func TestSamplingPolicyTypicalPath(t *testing.T) {
policyTicker: mtt,
tickerFrequency: 100 * time.Millisecond,
numTracesOnMap: &atomic.Uint64{},
mutatorsBuf: make([]tag.Mutator, 1),
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
Expand Down Expand Up @@ -421,6 +424,7 @@ func TestSamplingPolicyInvertSampled(t *testing.T) {
policyTicker: mtt,
tickerFrequency: 100 * time.Millisecond,
numTracesOnMap: &atomic.Uint64{},
mutatorsBuf: make([]tag.Mutator, 1),
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
Expand Down Expand Up @@ -489,6 +493,7 @@ func TestSamplingMultiplePolicies(t *testing.T) {
policyTicker: mtt,
tickerFrequency: 100 * time.Millisecond,
numTracesOnMap: &atomic.Uint64{},
mutatorsBuf: make([]tag.Mutator, 1),
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
Expand Down Expand Up @@ -552,6 +557,7 @@ func TestSamplingPolicyDecisionNotSampled(t *testing.T) {
policyTicker: mtt,
tickerFrequency: 100 * time.Millisecond,
numTracesOnMap: &atomic.Uint64{},
mutatorsBuf: make([]tag.Mutator, 1),
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
Expand Down Expand Up @@ -614,6 +620,7 @@ func TestSamplingPolicyDecisionInvertNotSampled(t *testing.T) {
deleteChan: make(chan pcommon.TraceID, maxSize),
policyTicker: mtt,
tickerFrequency: 100 * time.Millisecond,
mutatorsBuf: make([]tag.Mutator, 1),
numTracesOnMap: &atomic.Uint64{},
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
Expand Down Expand Up @@ -678,6 +685,7 @@ func TestLateArrivingSpansAssignedOriginalDecision(t *testing.T) {
policyTicker: &manualTTicker{},
tickerFrequency: 100 * time.Millisecond,
numTracesOnMap: &atomic.Uint64{},
mutatorsBuf: make([]tag.Mutator, 1),
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
Expand Down Expand Up @@ -745,6 +753,7 @@ func TestMultipleBatchesAreCombinedIntoOne(t *testing.T) {
policyTicker: mtt,
tickerFrequency: 100 * time.Millisecond,
numTracesOnMap: &atomic.Uint64{},
mutatorsBuf: make([]tag.Mutator, 1),
}
require.NoError(t, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
Expand Down Expand Up @@ -1013,3 +1022,37 @@ func simpleTracesWithID(traceID pcommon.TraceID) ptrace.Traces {
traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty().SetTraceID(traceID)
return traces
}

func BenchmarkSampling(b *testing.B) {
traceIds, batches := generateIdsAndBatches(128)
cfg := Config{
DecisionWait: defaultTestDecisionWait,
NumTraces: uint64(2 * len(traceIds)),
ExpectedNewTracesPerSec: 64,
PolicyCfgs: testPolicy,
}

sp, _ := newTracesProcessor(context.Background(), componenttest.NewNopTelemetrySettings(), consumertest.NewNop(), cfg)
tsp := sp.(*tailSamplingSpanProcessor)
require.NoError(b, tsp.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
require.NoError(b, tsp.Shutdown(context.Background()))
}()
metrics := &policyMetrics{}
sampleBatches := make([]*sampling.TraceData, 0, len(batches))

for i := 0; i < len(batches); i++ {
sampleBatches = append(sampleBatches, &sampling.TraceData{
Decisions: []sampling.Decision{sampling.Pending},
ArrivalTime: time.Now(),
//SpanCount: spanCount,
ReceivedBatches: ptrace.NewTraces(),
})
}

for i := 0; i < b.N; i++ {
for i, id := range traceIds {
_, _ = tsp.makeDecision(id, sampleBatches[i], metrics)
}
}
}

0 comments on commit 1e9d8c1

Please sign in to comment.