Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve original meta fields during early encoding of events #42559

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions libbeat/outputs/elasticsearch/event_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type encodedEvent struct {
// there's an ingestion error.
timestamp time.Time

// The meta fields from the original event (which aren't included in the
// encoding but may still need to be logged if there is an error).
meta mapstr.M

id string
opType events.OpType
pipeline string
Expand Down Expand Up @@ -130,6 +134,7 @@ func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent {
copy(bytes, bufBytes)
return &encodedEvent{
id: id,
meta: e.Meta,
timestamp: e.Timestamp,
opType: opType,
pipeline: pipeline,
Expand All @@ -152,9 +157,14 @@ func (e *encodedEvent) setDeadLetter(
e.encoding = []byte(deadLetterReencoding.String())
}

// String converts e.encoding to string and returns it.
// String converts e.encoding (and meta fields if present)
// to string and returns it.
// The goal of this method is to provide an easy way to log
// the event encoded.
func (e *encodedEvent) String() string {
return string(e.encoding)
metaString := "none"
if e.meta != nil {
metaString = e.meta.String()
}
return string(e.encoding) + ", Meta: " + metaString
}
16 changes: 11 additions & 5 deletions libbeat/outputs/elasticsearch/event_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@

encoder := newEventEncoder(true, indexSelector, nil)

metaFields := mapstr.M{
events.FieldMetaOpType: "create",
events.FieldMetaPipeline: "TEST_PIPELINE",
events.FieldMetaID: "test_id",
}

timestamp := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC)
pubEvent := publisher.Event{
Content: beat.Event{
Expand All @@ -53,11 +59,7 @@
"nested_field": "nested_value",
},
},
Meta: mapstr.M{
events.FieldMetaOpType: "create",
events.FieldMetaPipeline: "TEST_PIPELINE",
events.FieldMetaID: "test_id",
},
Meta: metaFields,
},
}

Expand All @@ -81,6 +83,7 @@
assert.Equal(t, timestamp, encBeatEvent.timestamp, "encodedEvent.timestamp should match the original event")
assert.Equal(t, events.OpTypeCreate, encBeatEvent.opType, "encoded opType should match the original metadata")
assert.False(t, encBeatEvent.deadLetter, "encoded event shouldn't have deadLetter flag set")
assert.Equal(t, encBeatEvent.meta, metaFields, "encoded event struct should include original event's meta fields")

// Check encoded fields
var eventContent struct {
Expand All @@ -97,6 +100,9 @@
assert.Equal(t, "test_value", eventContent.TestField, "Encoded field should match original")
assert.Equal(t, 5, eventContent.NumberField, "Encoded field should match original")
assert.Equal(t, "nested_value", eventContent.Nested.NestedField, "Encoded field should match original")

// Check string representation includes meta fields
assert.Contains(t, encBeatEvent.String(), `"pipeline":"TEST_PIPELINE"`, "String representation of encoded event should include the original event's meta fields")
}

// encodeBatch encodes a publisher.Batch so it can be provided to
Expand Down Expand Up @@ -124,14 +130,14 @@
// Skip encoding if there's already encoded data present
if events[i].EncodedEvent == nil {
encoded, _ := encoder.EncodeEntry(events[i])
event := encoded.(publisher.Event)

Check failure on line 133 in libbeat/outputs/elasticsearch/event_encoder_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value is not checked (errcheck)
events[i] = event
}
}
return events
}

func encodeEvent(client *Client, event publisher.Event) publisher.Event {

Check failure on line 140 in libbeat/outputs/elasticsearch/event_encoder_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

func `encodeEvent` is unused (unused)
encoder := newEventEncoder(
client.conn.EscapeHTML,
client.indexSelector,
Expand Down
Loading