Enforce max span attribute size (grafana#4335)
* enforce max span attribute size

* changelog, lint, docs

* updated test and response

* update to truncate instead of discard

* update test

* add metrics to capture truncated attributes count

* remove test logs

* removing appending "_truncated"

* lint

* fix comments

* manifest.md

* fix test

* remove log line
ie-pham authored Jan 2, 2025
1 parent 1b56102 commit d382b58
Showing 8 changed files with 134 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -99,6 +99,7 @@ querier:
* [ENHANCEMENT] Update golang.org/x/crypto [#4474](https://github.com/grafana/tempo/pull/4474) (@javiermolinar)
* [ENHANCEMENT] Distributor shim: add test verifying receiver works (including metrics) [#4477](https://github.com/grafana/tempo/pull/4477) (@yvrhdn)
* [ENHANCEMENT] Reduce goroutines in all non-querier components. [#4484](https://github.com/grafana/tempo/pull/4484) (@joe-elliott)
+* [ENHANCEMENT] Add option to enforce max span attribute size [#4335](https://github.com/grafana/tempo/pull/4335) (@ie-pham)
* [BUGFIX] Handle invalid TraceQL query filter in tag values v2 disk cache [#4392](https://github.com/grafana/tempo/pull/4392) (@electron0zero)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
5 changes: 5 additions & 0 deletions docs/sources/tempo/configuration/_index.md
@@ -232,6 +232,11 @@ distributor:
# instruct the client how to retry.
[retry_after_on_resource_exhausted: <duration> | default = '0' ]

+# Optional.
+# Configures the maximum size, in bytes, of a span or resource attribute. Any key or value that exceeds this limit is truncated before storing.
+# Setting this parameter to '0' disables the check.
+[max_span_attr_byte: <int> | default = '2048']
+
# Optional.
# Configures usage trackers in the distributor which expose metrics of ingested traffic grouped by configurable
# attributes exposed on /usage_metrics.
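
For reference, a minimal distributor block exercising the new option might look like the following sketch; the 2048-byte (2 KB) default and the `0` opt-out come straight from the docs hunk above.

```yaml
distributor:
  # Truncate any span or resource attribute key/value longer than 2 KB.
  max_span_attr_byte: 2048
  # Setting it to 0 instead would disable truncation entirely.
```
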
1 change: 1 addition & 0 deletions docs/sources/tempo/configuration/manifest.md
@@ -185,6 +185,7 @@ distributor:
stale_duration: 15m0s
extend_writes: true
retry_after_on_resource_exhausted: 0s
+max_span_attr_byte: 2048
ingester_client:
pool_config:
checkinterval: 15s
4 changes: 4 additions & 0 deletions modules/distributor/config.go
@@ -51,6 +51,8 @@ type Config struct {

// For testing.
factory ring_client.PoolAddrFunc `yaml:"-"`
+
+MaxSpanAttrByte int `yaml:"max_span_attr_byte"`
}

type LogSpansConfig struct {
@@ -74,6 +76,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.OverrideRingKey = distributorRingKey
cfg.ExtendWrites = true

+cfg.MaxSpanAttrByte = 2048 // 2KB
+
f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
52 changes: 48 additions & 4 deletions modules/distributor/distributor.go
@@ -35,6 +35,7 @@ import (
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
+v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/usagestats"
tempo_util "github.com/grafana/tempo/pkg/util"
@@ -112,6 +113,11 @@
Name: "distributor_metrics_generator_clients",
Help: "The current number of metrics-generator clients.",
})
+metricAttributesTruncated = promauto.NewCounterVec(prometheus.CounterOpts{
+Namespace: "tempo",
+Name: "distributor_attributes_truncated_total",
+Help: "The total number of attribute keys or values truncated per tenant",
+}, []string{"tenant"})

statBytesReceived = usagestats.NewCounter("distributor_bytes_received")
statSpansReceived = usagestats.NewCounter("distributor_spans_received")
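
Since the counter is registered under the `tempo` namespace, it is exported as `tempo_distributor_attributes_truncated_total` with a `tenant` label. A hypothetical Prometheus alerting rule built on it (the rule name and threshold are illustrative, not part of this commit):

```yaml
groups:
  - name: tempo-distributor
    rules:
      - alert: TempoAttributesTruncated
        expr: sum by (tenant) (rate(tempo_distributor_attributes_truncated_total[5m])) > 10
        for: 10m
        annotations:
          summary: "Tenant {{ $labels.tenant }} keeps sending oversized span attributes"
```
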
@@ -378,13 +384,17 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
d.usage.Observe(userID, batches)
}

-keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
+keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrByte)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger)
return nil, err
}

+if truncatedAttributeCount > 0 {
+metricAttributesTruncated.WithLabelValues(userID).Add(float64(truncatedAttributeCount))
+}
+
err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys)
if err != nil {
return nil, err
@@ -525,18 +535,29 @@ func (d *Distributor) UsageTrackerHandler() http.Handler {

// requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring
// and traces to pass onto the ingesters.
-func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*rebatchedTrace, error) {
+func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, int, error) {
const tracesPerBatch = 20 // p50 of internal env
tracesByID := make(map[uint32]*rebatchedTrace, tracesPerBatch)
+truncatedAttributeCount := 0

for _, b := range batches {
spansByILS := make(map[uint32]*v1.ScopeSpans)
+// check resource-level attributes for oversized keys/values
+if maxSpanAttrSize > 0 && b.Resource != nil {
+resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize)
+truncatedAttributeCount += resourceAttrTruncatedCount
+}

for _, ils := range b.ScopeSpans {
for _, span := range ils.Spans {
+// check span-level attributes for oversized keys/values
+if maxSpanAttrSize > 0 {
+spanAttrTruncatedCount := processAttributes(span.Attributes, maxSpanAttrSize)
+truncatedAttributeCount += spanAttrTruncatedCount
+}
traceID := span.TraceId
if !validation.ValidTraceID(traceID) {
-return nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8)
+return nil, nil, 0, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8)
}

traceKey := tempo_util.TokenFor(userID, traceID)
@@ -602,7 +623,30 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int
traces = append(traces, r)
}

-return keys, traces, nil
+return keys, traces, truncatedAttributeCount, nil
}
+
+// find and truncate the span attributes that are too large
+func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int) int {
+count := 0
+for _, attr := range attributes {
+if len(attr.Key) > maxAttrSize {
+attr.Key = attr.Key[:maxAttrSize]
+count++
+}
+
+switch value := attr.GetValue().Value.(type) {
+case *v1_common.AnyValue_StringValue:
+if len(value.StringValue) > maxAttrSize {
+value.StringValue = value.StringValue[:maxAttrSize]
+count++
+}
+default:
+continue
+}
+}
+
+return count
+}

// discardedPredicate determines if a trace is discarded based on the number of successful replications.
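
To make the truncation rule concrete, below is a standalone sketch of what `processAttributes` does to one oversized attribute, reusing the `tempopb` common/v1 types the distributor imports; the attribute key and value here are made up. Note that the slicing is byte-based, so a multi-byte UTF-8 rune straddling the cut point can be split, and only string values are truncated while other value types pass through unchanged.

```go
package main

import (
	"fmt"
	"strings"

	v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
)

func main() {
	const maxAttrSize = 2048 // the default from config.go above

	attr := &v1_common.KeyValue{
		Key: "http.url", // under the limit, left alone
		Value: &v1_common.AnyValue{
			Value: &v1_common.AnyValue_StringValue{StringValue: strings.Repeat("x", 3000)},
		},
	}

	// Same shape as processAttributes: truncate oversized keys and
	// oversized string values at the byte boundary.
	if len(attr.Key) > maxAttrSize {
		attr.Key = attr.Key[:maxAttrSize]
	}
	if v, ok := attr.GetValue().Value.(*v1_common.AnyValue_StringValue); ok && len(v.StringValue) > maxAttrSize {
		v.StringValue = v.StringValue[:maxAttrSize]
	}

	fmt.Println(len(attr.Value.GetStringValue())) // 2048
}
```
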
67 changes: 62 additions & 5 deletions modules/distributor/distributor_test.go
@@ -713,7 +713,7 @@ func TestRequestsByTraceID(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-keys, rebatchedTraces, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1)
+keys, rebatchedTraces, _, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1, 1000)
require.Equal(t, len(keys), len(rebatchedTraces))

for i, expectedKey := range tt.expectedKeys {
@@ -739,9 +739,64 @@
}
}

+func TestProcessAttributes(t *testing.T) {
+spanCount := 10
+batchCount := 3
+trace := test.MakeTraceWithSpanCount(batchCount, spanCount, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F})
+
+maxAttrByte := 1000
+longString := strings.Repeat("t", 1100)
+
+// add long attributes to the resource level
+trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes,
+test.MakeAttribute("long value", longString),
+)
+trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes,
+test.MakeAttribute(longString, "long key"),
+)
+
+// add long attributes to the span level
+trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes,
+test.MakeAttribute("long value", longString),
+)
+trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes,
+test.MakeAttribute(longString, "long key"),
+)
+
+_, rebatchedTrace, truncatedCount, _ := requestsByTraceID(trace.ResourceSpans, "test", spanCount*batchCount, maxAttrByte)
+assert.Equal(t, 4, truncatedCount)
+for _, rT := range rebatchedTrace {
+for _, resource := range rT.trace.ResourceSpans {
+// find large resource attributes
+for _, attr := range resource.Resource.Attributes {
+if attr.Key == "long value" {
+assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue())
+}
+if attr.Value.GetStringValue() == "long key" {
+assert.Equal(t, longString[:maxAttrByte], attr.Key)
+}
+}
+// find large span attributes
+for _, scope := range resource.ScopeSpans {
+for _, span := range scope.Spans {
+for _, attr := range span.Attributes {
+if attr.Key == "long value" {
+assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue())
+}
+if attr.Value.GetStringValue() == "long key" {
+assert.Equal(t, longString[:maxAttrByte], attr.Key)
+}
+}
+}
+}
+
+}
+}
+
func BenchmarkTestsByRequestID(b *testing.B) {
-spansPer := 100
-batches := 10
+spansPer := 5000
+batches := 100
traces := []*tempopb.Trace{
test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}),
test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0B, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}),
@@ -757,14 +812,15 @@ func BenchmarkTestsByRequestID(b *testing.B) {
}

b.ResetTimer()
+b.ReportAllocs()

for i := 0; i < b.N; i++ {
for _, blerg := range ils {
-_, _, err := requestsByTraceID([]*v1.ResourceSpans{
+_, _, _, err := requestsByTraceID([]*v1.ResourceSpans{
{
ScopeSpans: blerg,
},
}, "test", spansPer*len(traces))
}, "test", spansPer*len(traces), 5)
require.NoError(b, err)
}
}
@@ -1631,6 +1687,7 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Dist
})
}

+distributorConfig.MaxSpanAttrByte = 1000
distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond
distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int())
distributorConfig.DistributorRing.KVStore.Mock = nil
4 changes: 2 additions & 2 deletions modules/distributor/forwarder_test.go
@@ -28,7 +28,7 @@ func TestForwarder(t *testing.T) {
require.NoError(t, err)

b := test.MakeBatch(10, id)
-keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10)
+keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
require.NoError(t, err)

o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer)
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestForwarder_shutdown(t *testing.T) {
require.NoError(t, err)

b := test.MakeBatch(10, id)
-keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10)
+keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
require.NoError(t, err)

o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer)
11 changes: 11 additions & 0 deletions pkg/util/test/req.go
@@ -17,6 +17,17 @@ import (
"github.com/stretchr/testify/require"
)

+func MakeAttribute(key, value string) *v1_common.KeyValue {
+return &v1_common.KeyValue{
+Key: key,
+Value: &v1_common.AnyValue{
+Value: &v1_common.AnyValue_StringValue{
+StringValue: value,
+},
+},
+}
+}
+
func MakeSpan(traceID []byte) *v1_trace.Span {
return MakeSpanWithAttributeCount(traceID, rand.Int()%10+1)
}
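
The new helper keeps test setup terse; a short usage sketch mirroring `TestProcessAttributes` above:

```go
// Build a string-valued OTLP attribute for tests.
attr := test.MakeAttribute("long value", strings.Repeat("t", 1100))
// attr.Key == "long value"; attr.Value.GetStringValue() is the 1100-byte string.
```
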
