Skip to content

Commit

Permalink
fix: sanitize OTEL datastream (#369)
Browse files Browse the repository at this point in the history
  • Loading branch information
rubvs authored Sep 26, 2024
1 parent 8d96aae commit bd5e341
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 19 deletions.
4 changes: 2 additions & 2 deletions input/otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,12 @@ func (c *Consumer) convertLogRecord(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
default:
setLabel(replaceDots(k), event, v)
}
Expand Down
14 changes: 14 additions & 0 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ package otlp_test
import (
"context"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -532,6 +533,10 @@ func processLogEvents(t *testing.T, logs plog.Logs) modelpb.Batch {
}

func TestConsumerConsumeLogsDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLenNamespace := otlp.MaxDataStreamBytes - len(otlp.DisallowedNamespaceRunes)
maxLenDataset := otlp.MaxDataStreamBytes - len(otlp.DisallowedDatasetRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -567,6 +572,15 @@ func TestConsumerConsumeLogsDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDatasetRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedNamespaceRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDatasetRunes)) + randomString[:maxLenDataset],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedNamespaceRunes)) + randomString[:maxLenNamespace],
},
} {
tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
t.Run(tcName, func(t *testing.T) {
Expand Down
44 changes: 39 additions & 5 deletions input/otlp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"regexp"
"strconv"
"strings"
"unicode"

"go.opentelemetry.io/collector/pdata/pcommon"
semconv "go.opentelemetry.io/collector/semconv/v1.25.0"
Expand All @@ -30,7 +31,10 @@ import (
)

const (
// AgentNameJaeger holds the literal agent name "Jaeger".
// NOTE(review): presumably used to tag events that originate from Jaeger
// clients — confirm against its callers.
AgentNameJaeger = "Jaeger"
AgentNameJaeger = "Jaeger"
// MaxDataStreamBytes is the upper bound, in bytes, applied to sanitized
// data stream dataset and namespace values.
MaxDataStreamBytes = 100
// DisallowedNamespaceRunes lists the runes that are replaced with '_'
// when sanitizing a data stream namespace.
DisallowedNamespaceRunes = "\\/*?\"<>| ,#:"
// DisallowedDatasetRunes lists the runes that are replaced with '_' when
// sanitizing a data stream dataset. It is the namespace set plus '-'.
DisallowedDatasetRunes = "-\\/*?\"<>| ,#:"
)

var (
Expand Down Expand Up @@ -316,12 +320,12 @@ func translateResourceMetadata(resource pcommon.Resource, out *modelpb.APMEvent)
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Dataset = v.Str()
out.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Namespace = v.Str()
out.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
default:
if out.Labels == nil {
out.Labels = make(modelpb.Labels)
Expand Down Expand Up @@ -459,12 +463,12 @@ func translateScopeMetadata(scope pcommon.InstrumentationScope, out *modelpb.APM
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Dataset = v.Str()
out.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if out.DataStream == nil {
out.DataStream = &modelpb.DataStream{}
}
out.DataStream.Namespace = v.Str()
out.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
}
return true
})
Expand Down Expand Up @@ -545,3 +549,33 @@ func setLabel(key string, event *modelpb.APMEvent, v pcommon.Value) {
}
}
}

// sanitizeDataStreamDataset rewrites field so that it can be stored as the
// data stream dataset, applying the restrictions outlined in
// https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
//
// Every rune listed in DisallowedDatasetRunes is replaced with '_', every
// other rune is lowercased, and the result is capped at MaxDataStreamBytes
// bytes. The cap never leaves a partial multi-byte UTF-8 sequence behind: a
// rune that straddles the limit is dropped entirely, so the returned string
// may be a few bytes shorter than MaxDataStreamBytes.
func sanitizeDataStreamDataset(field string) string {
	field = strings.Map(replaceReservedRune(DisallowedDatasetRunes), field)
	if len(field) <= MaxDataStreamBytes {
		return field
	}

	// Slicing at a byte offset can cut a multi-byte rune in half;
	// ToValidUTF8 drops the resulting partial sequence so the value stays
	// valid UTF-8.
	return strings.ToValidUTF8(field[:MaxDataStreamBytes], "")
}

// sanitizeDataStreamNamespace rewrites field so that it can be stored as the
// data stream namespace, applying the restrictions outlined in
// https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
//
// Every rune listed in DisallowedNamespaceRunes is replaced with '_', every
// other rune is lowercased, and the result is capped at MaxDataStreamBytes
// bytes. The cap never leaves a partial multi-byte UTF-8 sequence behind: a
// rune that straddles the limit is dropped entirely, so the returned string
// may be a few bytes shorter than MaxDataStreamBytes.
func sanitizeDataStreamNamespace(field string) string {
	field = strings.Map(replaceReservedRune(DisallowedNamespaceRunes), field)
	if len(field) <= MaxDataStreamBytes {
		return field
	}

	// Slicing at a byte offset can cut a multi-byte rune in half;
	// ToValidUTF8 drops the resulting partial sequence so the value stays
	// valid UTF-8.
	return strings.ToValidUTF8(field[:MaxDataStreamBytes], "")
}

// replaceReservedRune builds a rune-mapping function suitable for
// strings.Map. The returned function yields '_' for any rune that occurs in
// disallowedRunes and the unicode.ToLower form of every other rune.
func replaceReservedRune(disallowedRunes string) func(r rune) rune {
	sanitize := func(candidate rune) rune {
		switch {
		case strings.IndexRune(disallowedRunes, candidate) >= 0:
			// Reserved rune: substitute the placeholder.
			return '_'
		default:
			return unicode.ToLower(candidate)
		}
	}
	return sanitize
}
4 changes: 2 additions & 2 deletions input/otlp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ func (c *Consumer) handleScopeMetrics(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

// The below fields are required by the Processes tab of the
// curated Kibana's hostmetrics UI. These fields are
Expand Down
13 changes: 13 additions & 0 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,10 @@ func TestConsumeMetricsExportTimestamp(t *testing.T) {
}

func TestConsumeMetricsDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLenNamespace := otlp.MaxDataStreamBytes - len(otlp.DisallowedNamespaceRunes)
maxLenDataset := otlp.MaxDataStreamBytes - len(otlp.DisallowedDatasetRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -764,6 +768,15 @@ func TestConsumeMetricsDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDatasetRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedNamespaceRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDatasetRunes)) + randomString[:maxLenDataset],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedNamespaceRunes)) + randomString[:maxLenNamespace],
},
} {
tcName := fmt.Sprintf("%s,%s", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace)
t.Run(tcName, func(t *testing.T) {
Expand Down
20 changes: 10 additions & 10 deletions input/otlp/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,12 @@ func TranslateTransaction(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = stringval
event.DataStream.Dataset = sanitizeDataStreamDataset(stringval)
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = stringval
event.DataStream.Namespace = sanitizeDataStreamNamespace(stringval)
default:
modelpb.Labels(event.Labels).Set(k, stringval)
}
Expand Down Expand Up @@ -824,12 +824,12 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = stringval
event.DataStream.Dataset = sanitizeDataStreamDataset(stringval)
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = stringval
event.DataStream.Namespace = sanitizeDataStreamNamespace(stringval)
default:
setLabel(k, event, v)
}
Expand Down Expand Up @@ -1103,12 +1103,12 @@ func (c *Consumer) convertSpanEvent(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

default:
setLabel(replaceDots(k), event, v)
Expand Down Expand Up @@ -1144,12 +1144,12 @@ func (c *Consumer) convertSpanEvent(
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Dataset = v.Str()
event.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if event.DataStream == nil {
event.DataStream = &modelpb.DataStream{}
}
event.DataStream.Namespace = v.Str()
event.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())
default:
k = replaceDots(k)
if isJaeger && k == "message" {
Expand Down Expand Up @@ -1202,12 +1202,12 @@ func (c *Consumer) convertJaegerErrorSpanEvent(event ptrace.SpanEvent, apmEvent
if apmEvent.DataStream == nil {
apmEvent.DataStream = &modelpb.DataStream{}
}
apmEvent.DataStream.Dataset = v.Str()
apmEvent.DataStream.Dataset = sanitizeDataStreamDataset(v.Str())
case attributeDataStreamNamespace:
if apmEvent.DataStream == nil {
apmEvent.DataStream = &modelpb.DataStream{}
}
apmEvent.DataStream.Namespace = v.Str()
apmEvent.DataStream.Namespace = sanitizeDataStreamNamespace(v.Str())

default:
setLabel(replaceDots(k), apmEvent, v)
Expand Down
13 changes: 13 additions & 0 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,10 @@ func TestSpanNetworkAttributes(t *testing.T) {
}

func TestSpanDataStream(t *testing.T) {
randomString := strings.Repeat("abcdefghijklmnopqrstuvwxyz0123456789", 10)
maxLenNamespace := otlp.MaxDataStreamBytes - len(otlp.DisallowedNamespaceRunes)
maxLenDataset := otlp.MaxDataStreamBytes - len(otlp.DisallowedDatasetRunes)

for _, tc := range []struct {
resourceDataStreamDataset string
resourceDataStreamNamespace string
Expand Down Expand Up @@ -908,6 +912,15 @@ func TestSpanDataStream(t *testing.T) {
expectedDataStreamDataset: "1",
expectedDataStreamNamespace: "2",
},
// Test data sanitization: https://www.elastic.co/guide/en/ecs/current/ecs-data_stream.html
// 1. Replace all disallowed runes with _
// 2. Datastream length should not exceed otlp.MaxDataStreamBytes
{
resourceDataStreamDataset: otlp.DisallowedDatasetRunes + randomString,
resourceDataStreamNamespace: otlp.DisallowedNamespaceRunes + randomString,
expectedDataStreamDataset: strings.Repeat("_", len(otlp.DisallowedDatasetRunes)) + randomString[:maxLenDataset],
expectedDataStreamNamespace: strings.Repeat("_", len(otlp.DisallowedNamespaceRunes)) + randomString[:maxLenNamespace],
},
} {
for _, isTxn := range []bool{false, true} {
tcName := fmt.Sprintf("%s,%s,txn=%v", tc.expectedDataStreamDataset, tc.expectedDataStreamNamespace, isTxn)
Expand Down

0 comments on commit bd5e341

Please sign in to comment.