Skip to content

Preserve original meta fields during early encoding of events #42559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
- Fix autodiscovery memory leak related to metadata of start events {pull}41748[41748]
- All standard queue metrics are now included in metrics monitoring, including: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`. {pull}42439[42439]
- The following output latency metrics are now included in metrics monitoring: `output.latency.{count, max, median, p99}`. {pull}42439[42439]
- Restored event Meta fields in the Elasticsearch output's error logs. {pull}42559[42559]

*Auditbeat*

Expand Down
14 changes: 12 additions & 2 deletions libbeat/outputs/elasticsearch/event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type encodedEvent struct {
// there's an ingestion error.
timestamp time.Time

// The meta fields from the original event (which aren't included in the
// encoding but may still need to be logged if there is an error).
meta mapstr.M

id string
opType events.OpType
pipeline string
Expand Down Expand Up @@ -130,6 +134,7 @@ func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent {
copy(bytes, bufBytes)
return &encodedEvent{
id: id,
meta: e.Meta,
timestamp: e.Timestamp,
opType: opType,
pipeline: pipeline,
Expand All @@ -152,9 +157,14 @@ func (e *encodedEvent) setDeadLetter(
e.encoding = []byte(deadLetterReencoding.String())
}

// String converts e.encoding to string and returns it.
// String converts e.encoding (and meta fields if present)
// to string and returns it.
// The goal of this method is to provide an easy way to log
// the event encoded.
func (e *encodedEvent) String() string {
return string(e.encoding)
metaString := "none"
if e.meta != nil {
metaString = e.meta.String()
}
return string(e.encoding) + ", Meta: " + metaString
}
19 changes: 13 additions & 6 deletions libbeat/outputs/elasticsearch/event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ func TestEncodeEntry(t *testing.T) {

encoder := newEventEncoder(true, indexSelector, nil)

metaFields := mapstr.M{
events.FieldMetaOpType: "create",
events.FieldMetaPipeline: "TEST_PIPELINE",
events.FieldMetaID: "test_id",
}

timestamp := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC)
pubEvent := publisher.Event{
Content: beat.Event{
Expand All @@ -53,11 +59,7 @@ func TestEncodeEntry(t *testing.T) {
"nested_field": "nested_value",
},
},
Meta: mapstr.M{
events.FieldMetaOpType: "create",
events.FieldMetaPipeline: "TEST_PIPELINE",
events.FieldMetaID: "test_id",
},
Meta: metaFields,
},
}

Expand All @@ -81,6 +83,7 @@ func TestEncodeEntry(t *testing.T) {
assert.Equal(t, timestamp, encBeatEvent.timestamp, "encodedEvent.timestamp should match the original event")
assert.Equal(t, events.OpTypeCreate, encBeatEvent.opType, "encoded opType should match the original metadata")
assert.False(t, encBeatEvent.deadLetter, "encoded event shouldn't have deadLetter flag set")
assert.Equal(t, encBeatEvent.meta, metaFields, "encoded event struct should include original event's meta fields")

// Check encoded fields
var eventContent struct {
Expand All @@ -97,6 +100,9 @@ func TestEncodeEntry(t *testing.T) {
assert.Equal(t, "test_value", eventContent.TestField, "Encoded field should match original")
assert.Equal(t, 5, eventContent.NumberField, "Encoded field should match original")
assert.Equal(t, "nested_value", eventContent.Nested.NestedField, "Encoded field should match original")

// Check string representation includes meta fields
assert.Contains(t, encBeatEvent.String(), `"pipeline":"TEST_PIPELINE"`, "String representation of encoded event should include the original event's meta fields")
}

// encodeBatch encodes a publisher.Batch so it can be provided to
Expand Down Expand Up @@ -124,13 +130,14 @@ func encodeEvents(client *Client, events []publisher.Event) []publisher.Event {
// Skip encoding if there's already encoded data present
if events[i].EncodedEvent == nil {
encoded, _ := encoder.EncodeEntry(events[i])
event := encoded.(publisher.Event)
event, _ := encoded.(publisher.Event)
events[i] = event
}
}
return events
}

//nolint:unused // False positive caused by varying build tags, this is used in client_test
func encodeEvent(client *Client, event publisher.Event) publisher.Event {
encoder := newEventEncoder(
client.conn.EscapeHTML,
Expand Down
Loading