Skip to content

Commit

Permalink
[receiver/datadog] Implement support for span links
Browse files Browse the repository at this point in the history
  • Loading branch information
lopes-felipe committed Jan 23, 2025
1 parent 231aa7f commit 8e20de7
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 17 deletions.
14 changes: 7 additions & 7 deletions pkg/translator/zipkin/zipkinv2/to_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ func zTagsToSpanLinks(tags map[string]string, dest ptrace.SpanLinkSlice) error {

// Convert trace id.
rawTrace := [16]byte{}
errTrace := unmarshalJSON(rawTrace[:], []byte(parts[0]))
errTrace := UnmarshalJSON(rawTrace[:], []byte(parts[0]))
if errTrace != nil {
return errTrace
}
link.SetTraceID(rawTrace)

// Convert span id.
rawSpan := [8]byte{}
errSpan := unmarshalJSON(rawSpan[:], []byte(parts[1]))
errSpan := UnmarshalJSON(rawSpan[:], []byte(parts[1]))
if errSpan != nil {
return errSpan
}
Expand All @@ -228,7 +228,7 @@ func zTagsToSpanLinks(tags map[string]string, dest ptrace.SpanLinkSlice) error {
if err := json.Unmarshal([]byte(jsonStr), &attrs); err != nil {
return err
}
if err := jsonMapToAttributeMap(attrs, link.Attributes()); err != nil {
if err := JsonMapToAttributeMap(attrs, link.Attributes()); err != nil {
return err
}

Expand Down Expand Up @@ -265,7 +265,7 @@ func populateSpanEvents(zspan *zipkinmodel.SpanModel, events ptrace.SpanEventSli
if err := json.Unmarshal([]byte(jsonStr), &attrs); err != nil {
return err
}
if err := jsonMapToAttributeMap(attrs, event.Attributes()); err != nil {
if err := JsonMapToAttributeMap(attrs, event.Attributes()); err != nil {
return err
}

Expand All @@ -278,7 +278,7 @@ func populateSpanEvents(zspan *zipkinmodel.SpanModel, events ptrace.SpanEventSli
return nil
}

func jsonMapToAttributeMap(attrs map[string]any, dest pcommon.Map) error {
func JsonMapToAttributeMap(attrs map[string]any, dest pcommon.Map) error {
for key, val := range attrs {
if s, ok := val.(string); ok {
dest.PutStr(key, s)
Expand Down Expand Up @@ -437,9 +437,9 @@ func setTimestampsV2(zspan *zipkinmodel.SpanModel, dest ptrace.Span, destAttrs p
}
}

// unmarshalJSON inflates trace id from hex string, possibly enclosed in quotes.
// UnmarshalJSON inflates trace id from hex string, possibly enclosed in quotes.
// TODO: Find a way to avoid this duplicate code. Consider to expose this in pdata.
func unmarshalJSON(dst []byte, src []byte) error {
func UnmarshalJSON(dst []byte, src []byte) error {
if l := len(src); l >= 2 && src[0] == '"' && src[l-1] == '"' {
src = src[1 : l-1]
}
Expand Down
5 changes: 5 additions & 0 deletions receiver/datadogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ https://github.com/open-telemetry/opentelemetry-collector/tree/main/config/confi
- `dd.span.Resource`: The datadog resource name (as distinct from the span name)

### Optional Attributes

- `_dd.span_links`: This receiver supports DD Agent's `_dd.span_links` attribute for span links creation, as produced by Datadog's tracing libraries.
Format example can be found [here](./internal/translator/traces_translator_test.go).

### Datadog's API support

**Traces**
Expand Down
14 changes: 10 additions & 4 deletions receiver/datadogreceiver/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver

go 1.22.0
go 1.22.7

toolchain go1.23.0

require (
github.com/DataDog/agent-payload/v5 v5.0.140
Expand All @@ -12,6 +14,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.118.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.118.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.118.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.118.0
github.com/stretchr/testify v1.10.0
github.com/tinylib/msgp v1.2.5
github.com/vmihailenco/msgpack/v5 v5.4.1
Expand Down Expand Up @@ -44,7 +47,7 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.8.0 // indirect
Expand All @@ -68,11 +71,12 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.118.0 // indirect
github.com/openzipkin/zipkin-go v0.4.3 // indirect
github.com/outcaste-io/ristretto v0.2.1 // indirect
github.com/philhofer/fwd v1.1.3-0.20240916144458-20a13a1f6b7c // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect
github.com/rs/cors v1.11.1 // indirect
github.com/secure-systems-lab/go-securesystemslib v0.7.0 // indirect
Expand Down Expand Up @@ -108,7 +112,7 @@ require (
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241216192217-9240e9c98484 // indirect
google.golang.org/grpc v1.69.4 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand All @@ -131,3 +135,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden =>
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal => ../../internal/coreinternal

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin => ../../pkg/translator/zipkin
12 changes: 8 additions & 4 deletions receiver/datadogreceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions receiver/datadogreceiver/internal/translator/traces_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
semconv "go.opentelemetry.io/collector/semconv/v1.16.0"
"google.golang.org/protobuf/proto"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/datadogreceiver/internal/translator/header"
)

Expand Down Expand Up @@ -98,6 +99,8 @@ func ToTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
}
newSpan := slice.AppendEmpty()

_ = tagsToSpanLinks(span.GetMeta(), newSpan.Links())

newSpan.SetTraceID(uInt64ToTraceID(0, span.TraceID))
newSpan.SetSpanID(uInt64ToSpanID(span.SpanID))
newSpan.SetStartTimestamp(pcommon.Timestamp(span.Start))
Expand Down Expand Up @@ -165,6 +168,56 @@ func ToTraces(payload *pb.TracerPayload, req *http.Request) ptrace.Traces {
return results
}

// DDSpanLink represents the structure of each JSON object
type DDSpanLink struct {
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
Tracestate string `json:"tracestate"`
Attributes map[string]any `json:"attributes"`
}

func tagsToSpanLinks(tags map[string]string, dest ptrace.SpanLinkSlice) error {
key := "_dd.span_links"
val, ok := tags[key]
if !ok {
return nil
}
delete(tags, key)

var spans []DDSpanLink
err := json.Unmarshal([]byte(val), &spans)
if err != nil {
return err
}

for i := 0; i < len(spans); i++ {
span := spans[i]
link := dest.AppendEmpty()

// Convert trace id.
rawTrace := [16]byte{}
errTrace := zipkinv2.UnmarshalJSON(rawTrace[:], []byte(span.TraceID))
if errTrace != nil {
return errTrace
}
link.SetTraceID(rawTrace)

// Convert span id.
rawSpan := [8]byte{}
errSpan := zipkinv2.UnmarshalJSON(rawSpan[:], []byte(span.SpanID))
if errSpan != nil {
return errSpan
}
link.SetSpanID(rawSpan)

link.TraceState().FromRaw(span.Tracestate)

_ = zipkinv2.JsonMapToAttributeMap(span.Attributes, link.Attributes())
}

return nil
}

var bufferPool = sync.Pool{
New: func() any {
return new(bytes.Buffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ var data = [2]any{
11: "service.name",
12: "1.0.1",
13: "version",
14: "_dd.span_links",
15: `[{"attributes":{"attr1":"val1","attr2":"val2"},"span_id":"70666bf9dee4a3fe","trace_id":"0eacdb57bebc935038bf5b4802ccabd5","tracestate":"dd=k:v"}]`,
},
1: [][][12]any{
{
Expand All @@ -57,6 +59,7 @@ var data = [2]any{
2: 3,
11: 6,
13: 12,
14: 15,
},
map[any]float64{
5: 1.2,
Expand Down Expand Up @@ -107,6 +110,13 @@ func TestTracePayloadV05Unmarshalling(t *testing.T) {
numericAttributeValue, _ := span.Attributes().Get("numeric_attribute")
numericAttributeFloat, _ := strconv.ParseFloat(numericAttributeValue.AsString(), 64)
assert.Equal(t, 1.2, numericAttributeFloat)

spanLink := span.Links().At(0)
assert.Equal(t, "70666bf9dee4a3fe", spanLink.SpanID().String())
assert.Equal(t, "0eacdb57bebc935038bf5b4802ccabd5", spanLink.TraceID().String())
assert.Equal(t, "dd=k:v", spanLink.TraceState().AsRaw())
spanLinkAttrVal, _ := spanLink.Attributes().Get("attr1")
assert.Equal(t, "val1", spanLinkAttrVal.Str())
}

func TestTracePayloadV07Unmarshalling(t *testing.T) {
Expand All @@ -126,7 +136,7 @@ func TestTracePayloadV07Unmarshalling(t *testing.T) {
translated := translatedPayloads[0]
span := translated.GetChunks()[0].GetSpans()[0]
assert.NotNil(t, span)
assert.Len(t, span.GetMeta(), 5, "missing attributes")
assert.Len(t, span.GetMeta(), 6, "missing attributes")
value, exists := span.GetMeta()["service.name"]
assert.True(t, exists, "service.name missing")
assert.Equal(t, "my-service", value, "service.name attribute value incorrect")
Expand Down Expand Up @@ -166,7 +176,7 @@ func TestTracePayloadApiV02Unmarshalling(t *testing.T) {
span := translated.Chunks[0].Spans[0]

assert.NotNil(t, span)
assert.Len(t, span.Meta, 5, "missing attributes")
assert.Len(t, span.Meta, 6, "missing attributes")
assert.Equal(t, "my-service", span.Meta["service.name"])
assert.Equal(t, "my-name", span.Name)
assert.Equal(t, "my-resource", span.Resource)
Expand Down

0 comments on commit 8e20de7

Please sign in to comment.