Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[elasticsearchexporter] Support for complex attributes for log records in OTel mode #37021

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/elasticsearchexporter_complex-log-attributes.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: elasticsearchexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support for complex attributes for log records in OTel mode

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37021]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
30 changes: 30 additions & 0 deletions exporter/elasticsearchexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,36 @@ func TestExporterLogs(t *testing.T) {
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"a":"a","a.b":"a.b"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})

t.Run("otel mode attribute complex value", func(t *testing.T) {
rec := newBulkRecorder()
server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) {
rec.Record(docs)
return itemsAllOK(docs)
})

exporter := newTestLogsExporter(t, server.URL, func(cfg *Config) {
cfg.Mapping.Mode = "otel"
})

logs := plog.NewLogs()
resourceLog := logs.ResourceLogs().AppendEmpty()
resourceLog.Resource().Attributes().PutEmptyMap("some.resource.attribute").PutEmptyMap("foo").PutStr("bar", "baz")
Copy link
Contributor Author

@felixbarny felixbarny Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this kinda works, there are some limitations due to the way obj model gets serialized and de-dotted in OTel mode. The test would fail if, for example changing to

resourceLog.Resource().Attributes().PutEmptyMap("some.resource.attribute").PutEmptyMap("foo.bar").PutStr("bar", "baz")

I think we should merge the change despite this limitation but think about longer-term alternatives.

I'm currently doing a POC of serializing pdata events directly to JSON for the OTel mode, without first converting them do an objmodel.Document. This seems to work out quite well so far and I'd also suspect it to yield performance improvements due to one less layer of abstraction and avoiding the allocation of temporary objects. WDYT of that approach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the draft PR for that POC: #37032

scopeLog := resourceLog.ScopeLogs().AppendEmpty()
scopeLog.Scope().Attributes().PutEmptyMap("some.scope.attribute").PutEmptyMap("foo").PutStr("bar", "baz")
logRecord := scopeLog.LogRecords().AppendEmpty()
logRecord.Attributes().PutEmptyMap("some.record.attribute").PutEmptyMap("foo").PutStr("bar", "baz")

mustSendLogs(t, exporter, logs)

rec.WaitItems(1)

assert.Len(t, rec.Items(), 1)
doc := rec.Items()[0].Document
assert.JSONEq(t, `{"some.record.attribute":{"foo":{"bar":"baz"}}}`, gjson.GetBytes(doc, `attributes`).Raw)
assert.JSONEq(t, `{"some.scope.attribute.foo.bar":"baz"}`, gjson.GetBytes(doc, `scope.attributes`).Raw)
assert.JSONEq(t, `{"some.resource.attribute.foo.bar":"baz"}`, gjson.GetBytes(doc, `resource.attributes`).Raw)
})
}

func TestExporterMetrics(t *testing.T) {
Expand Down
28 changes: 17 additions & 11 deletions exporter/elasticsearchexporter/internal/objmodel/objmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func DocumentFromAttributesWithPath(path string, am pcommon.Map) Document {
}

fields := make([]field, 0, am.Len())
fields = appendAttributeFields(fields, path, am)
fields = appendAttributeFields(fields, path, am, true)
return Document{fields: fields}
}

Expand Down Expand Up @@ -175,10 +175,16 @@ func (doc *Document) AddUInt(key string, value uint64) {
doc.Add(key, UIntValue(value))
}

// AddAttributes expands and flattens all key-value pairs from the input attribute map into
// AddFlattenedAttributes expands and flattens all key-value pairs from the input attribute map into
// the document.
func (doc *Document) AddAttributes(key string, attributes pcommon.Map) {
doc.fields = appendAttributeFields(doc.fields, key, attributes)
func (doc *Document) AddFlattenedAttributes(key string, attributes pcommon.Map) {
doc.AddAttributes(key, attributes, true)
}

// AddAttributes optionally expands and flattens all key-value pairs from the input attribute map into
// the document.
func (doc *Document) AddAttributes(key string, attributes pcommon.Map, flattenValues bool) {
doc.fields = appendAttributeFields(doc.fields, key, attributes, flattenValues)
}

// AddAttribute converts and adds a AttributeValue to the document. If the attribute represents a map,
Expand All @@ -188,7 +194,7 @@ func (doc *Document) AddAttribute(key string, attribute pcommon.Value) {
case pcommon.ValueTypeEmpty:
// do not add 'null'
case pcommon.ValueTypeMap:
doc.AddAttributes(key, attribute.Map())
doc.AddFlattenedAttributes(key, attribute.Map())
default:
doc.Add(key, ValueFromAttribute(attribute))
}
Expand All @@ -199,7 +205,7 @@ func (doc *Document) AddEvents(key string, events ptrace.SpanEventSlice) {
for i := 0; i < events.Len(); i++ {
e := events.At(i)
doc.AddTimestamp(flattenKey(key, e.Name()+".time"), e.Timestamp())
doc.AddAttributes(flattenKey(key, e.Name()), e.Attributes())
doc.AddFlattenedAttributes(flattenKey(key, e.Name()), e.Attributes())
}
}

Expand Down Expand Up @@ -584,21 +590,21 @@ func arrFromAttributes(aa pcommon.Slice) []Value {
return values
}

func appendAttributeFields(fields []field, path string, am pcommon.Map) []field {
func appendAttributeFields(fields []field, path string, am pcommon.Map, flattenValues bool) []field {
am.Range(func(k string, val pcommon.Value) bool {
fields = appendAttributeValue(fields, path, k, val)
fields = appendAttributeValue(fields, path, k, val, flattenValues)
return true
})
return fields
}

func appendAttributeValue(fields []field, path string, key string, attr pcommon.Value) []field {
func appendAttributeValue(fields []field, path string, key string, attr pcommon.Value, flattenValues bool) []field {
if attr.Type() == pcommon.ValueTypeEmpty {
return fields
}

if attr.Type() == pcommon.ValueTypeMap {
return appendAttributeFields(fields, flattenKey(path, key), attr.Map())
if flattenValues && attr.Type() == pcommon.ValueTypeMap {
return appendAttributeFields(fields, flattenKey(path, key), attr.Map(), true)
}

return append(fields, field{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestObjectModel_CreateMap(t *testing.T) {
m := pcommon.NewMap()
m.PutInt("i", 42)
m.PutStr("str", "test")
doc.AddAttributes("prefix", m)
doc.AddFlattenedAttributes("prefix", m)
return doc
},
want: Document{fields: []field{{"prefix.i", IntValue(42)}, {"prefix.str", StringValue("test")}}},
Expand Down
26 changes: 13 additions & 13 deletions exporter/elasticsearchexporter/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func (m *encodeModel) encodeLogDefaultMode(resource pcommon.Resource, record plo
document.AddInt("SeverityNumber", int64(record.SeverityNumber()))
document.AddAttribute("Body", record.Body())
m.encodeAttributes(&document, record.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddAttributes("Scope", scopeToAttributes(scope))
document.AddFlattenedAttributes("Resource", resource.Attributes())
document.AddFlattenedAttributes("Scope", scopeToAttributes(scope))

return document
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (m *encodeModel) encodeLogOTelMode(resource pcommon.Resource, resourceSchem
document.AddInt("severity_number", int64(record.SeverityNumber()))
document.AddInt("dropped_attributes_count", int64(record.DroppedAttributesCount()))

m.encodeAttributesOTelMode(&document, record.Attributes())
m.encodeAttributesOTelMode(&document, record.Attributes(), false)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

Expand Down Expand Up @@ -314,7 +314,7 @@ func (m *encodeModel) upsertMetricDataPointValueECSMode(documents map[uint32]obj
if document, ok = documents[hash]; !ok {
encodeAttributesECSMode(&document, resource.Attributes(), resourceAttrsConversionMap, resourceAttrsToPreserve)
document.AddTimestamp("@timestamp", dp.Timestamp())
document.AddAttributes("", dp.Attributes())
document.AddFlattenedAttributes("", dp.Attributes())
}

document.AddAttribute(metric.Name(), value)
Expand Down Expand Up @@ -342,7 +342,7 @@ func (m *encodeModel) upsertMetricDataPointValueOTelMode(documents map[uint32]ob
}
document.AddString("unit", metric.Unit())

m.encodeAttributesOTelMode(&document, dp.Attributes())
m.encodeAttributesOTelMode(&document, dp.Attributes(), true)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)
}
Expand Down Expand Up @@ -630,7 +630,7 @@ func (m *encodeModel) encodeScopeOTelMode(document *objmodel.Document, scope pco
document.Add("scope", objmodel.ValueFromAttribute(scopeMapVal))
}

func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map) {
func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attributeMap pcommon.Map, flattenValues bool) {
attrsCopy := pcommon.NewMap() // Copy to avoid mutating original map
attributeMap.CopyTo(attrsCopy)
attrsCopy.RemoveIf(func(key string, val pcommon.Value) bool {
Expand All @@ -647,7 +647,7 @@ func (m *encodeModel) encodeAttributesOTelMode(document *objmodel.Document, attr
return false
})
mergeGeolocation(attrsCopy)
document.AddAttributes("attributes", attrsCopy)
document.AddAttributes("attributes", attrsCopy, flattenValues)
}

func (m *encodeModel) encodeSpan(resource pcommon.Resource, resourceSchemaURL string, span ptrace.Span, scope pcommon.InstrumentationScope, scopeSchemaURL string) ([]byte, error) {
Expand Down Expand Up @@ -676,7 +676,7 @@ func (m *encodeModel) encodeSpanOTelMode(resource pcommon.Resource, resourceSche
document.AddString("kind", span.Kind().String())
document.AddUInt("duration", uint64(span.EndTimestamp()-span.StartTimestamp()))

m.encodeAttributesOTelMode(&document, span.Attributes())
m.encodeAttributesOTelMode(&document, span.Attributes(), true)

document.AddInt("dropped_attributes_count", int64(span.DroppedAttributesCount()))
document.AddInt("dropped_events_count", int64(span.DroppedEventsCount()))
Expand Down Expand Up @@ -719,10 +719,10 @@ func (m *encodeModel) encodeSpanDefaultMode(resource pcommon.Resource, span ptra
document.AddString("TraceStatusDescription", span.Status().Message())
document.AddString("Link", spanLinksToString(span.Links()))
m.encodeAttributes(&document, span.Attributes())
document.AddAttributes("Resource", resource.Attributes())
document.AddFlattenedAttributes("Resource", resource.Attributes())
m.encodeEvents(&document, span.Events())
document.AddInt("Duration", durationAsMicroseconds(span.StartTimestamp().AsTime(), span.EndTimestamp().AsTime())) // unit is microseconds
document.AddAttributes("Scope", scopeToAttributes(scope))
document.AddFlattenedAttributes("Scope", scopeToAttributes(scope))
return document
}

Expand All @@ -739,7 +739,7 @@ func (m *encodeModel) encodeSpanEvent(resource pcommon.Resource, resourceSchemaU
document.AddTraceID("trace_id", span.TraceID())
document.AddInt("dropped_attributes_count", int64(spanEvent.DroppedAttributesCount()))

m.encodeAttributesOTelMode(&document, spanEvent.Attributes())
m.encodeAttributesOTelMode(&document, spanEvent.Attributes(), true)
m.encodeResourceOTelMode(&document, resource, resourceSchemaURL)
m.encodeScopeOTelMode(&document, scope, scopeSchemaURL)

Expand All @@ -751,7 +751,7 @@ func (m *encodeModel) encodeAttributes(document *objmodel.Document, attributes p
if m.mode == MappingRaw {
key = ""
}
document.AddAttributes(key, attributes)
document.AddFlattenedAttributes(key, attributes)
}

func (m *encodeModel) encodeEvents(document *objmodel.Document, events ptrace.SpanEventSlice) {
Expand Down Expand Up @@ -796,7 +796,7 @@ func encodeAttributesECSMode(document *objmodel.Document, attrs pcommon.Map, con
if len(conversionMap) == 0 {
// No conversions to be done; add all attributes at top level of
// document.
document.AddAttributes("", attrs)
document.AddFlattenedAttributes("", attrs)
return
}

Expand Down
4 changes: 2 additions & 2 deletions exporter/elasticsearchexporter/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@ func TestEncodeAttributes(t *testing.T) {
mappingMode: MappingNone,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddAttributes("Attributes", attributes)
doc.AddFlattenedAttributes("Attributes", attributes)
return doc
},
},
"ecs": {
mappingMode: MappingECS,
want: func() objmodel.Document {
doc := objmodel.Document{}
doc.AddAttributes("Attributes", attributes)
doc.AddFlattenedAttributes("Attributes", attributes)
return doc
},
},
Expand Down
Loading