Skip to content

[8.x](backport #42412) otelconsumer: set document id attribute for elasticsearchexporter #42535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
)

const (
// esDocumentIDAttribute is the attribute key used to store the document ID in the log record.
esDocumentIDAttribute = "elasticsearch.document_id"
)

func init() {
outputs.RegisterType("otelconsumer", makeOtelConsumer)
}
Expand Down Expand Up @@ -84,9 +89,30 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)
sourceLogs := resourceLogs.ScopeLogs().AppendEmpty()
logRecords := sourceLogs.LogRecords()

// Convert the batch of events to Otel plog.Logs. The encoding we
// choose here is to set all fields in a Map in the Body of the log
// record. Each log record encodes a single beats event.
// This way we have full control over the final structure of the log in the
// destination, as long as the exporter allows it.
// For example, the elasticsearchexporter has an encoding specifically for this.
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/35444.
events := batch.Events()
for _, event := range events {
logRecord := logRecords.AppendEmpty()

if id, ok := event.Content.Meta["_id"]; ok {
// Specify the id as an attribute used by the elasticsearchexporter
// to set the final document ID in Elasticsearch.
// When using the bodymap encoding in the exporter all attributes
// are stripped out of the final Elasticsearch document.
//
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36882.
switch id := id.(type) {
case string:
logRecord.Attributes().PutStr(esDocumentIDAttribute, id)
}
}

beatEvent := event.Content.Fields.Clone()
beatEvent["@timestamp"] = event.Content.Timestamp
logRecord.SetTimestamp(pcommon.NewTimestampFromTime(event.Content.Timestamp))
Expand Down
21 changes: 21 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestPublish(t *testing.T) {
event1 := beat.Event{Fields: mapstr.M{"field": 1}}
event2 := beat.Event{Fields: mapstr.M{"field": 2}}
event3 := beat.Event{Fields: mapstr.M{"field": 3}}
event4 := beat.Event{Meta: mapstr.M{"_id": "abc123"}}

makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer {
t.Helper()
Expand Down Expand Up @@ -118,6 +119,26 @@ func TestPublish(t *testing.T) {
assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag)
})

t.Run("sets the elasticsearchexporter doc id attribute from metadata", func(t *testing.T) {
batch := outest.NewBatch(event4)

var docID string
otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
record := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
attr, ok := record.Attributes().Get(esDocumentIDAttribute)
assert.True(t, ok, "document ID attribute should be set")
docID = attr.AsString()

return nil
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag)
assert.Equal(t, event4.Meta["_id"], docID)
})

t.Run("sets the @timestamp field with the correct format", func(t *testing.T) {
batch := outest.NewBatch(event3)
batch.Events()[0].Content.Timestamp = time.Date(2025, time.January, 29, 9, 2, 39, 0, time.UTC)
Expand Down
Loading