From 74d6d002f9d322ad7035b2bf648025fdd47422db Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 31 Jan 2025 15:40:45 -0500 Subject: [PATCH 1/5] Preserve original meta fields during early encoding of events --- libbeat/outputs/elasticsearch/client.go | 1 + libbeat/outputs/elasticsearch/event_encoder.go | 14 ++++++++++++-- .../outputs/elasticsearch/event_encoder_test.go | 16 +++++++++++----- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 56f28cdbf30d..6ef87336d391 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -241,6 +241,7 @@ func (client *Client) Clone() *Client { } func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { + span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() span.Context.SetLabel("events_original", len(batch.Events())) diff --git a/libbeat/outputs/elasticsearch/event_encoder.go b/libbeat/outputs/elasticsearch/event_encoder.go index e569ebf3abf3..2f32abe143a0 100644 --- a/libbeat/outputs/elasticsearch/event_encoder.go +++ b/libbeat/outputs/elasticsearch/event_encoder.go @@ -55,6 +55,10 @@ type encodedEvent struct { // there's an ingestion error. timestamp time.Time + // The meta fields from the original event (which aren't included in the + // encoding but may still need to be logged if there is an error). + meta mapstr.M + id string opType events.OpType pipeline string @@ -130,6 +134,7 @@ func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent { copy(bytes, bufBytes) return &encodedEvent{ id: id, + meta: e.Meta, timestamp: e.Timestamp, opType: opType, pipeline: pipeline, @@ -152,9 +157,14 @@ func (e *encodedEvent) setDeadLetter( e.encoding = []byte(deadLetterReencoding.String()) } -// String converts e.encoding to string and returns it. +// String converts e.encoding (and meta fields if present) +// to string and returns it. // The goal of this method is to provide an easy way to log // the event encoded. func (e *encodedEvent) String() string { - return string(e.encoding) + metaString := "none" + if e.meta != nil { + metaString = e.meta.String() + } + return string(e.encoding) + ", Meta: " + metaString } diff --git a/libbeat/outputs/elasticsearch/event_encoder_test.go b/libbeat/outputs/elasticsearch/event_encoder_test.go index a3aef08ca23c..5c3cbc70d69b 100644 --- a/libbeat/outputs/elasticsearch/event_encoder_test.go +++ b/libbeat/outputs/elasticsearch/event_encoder_test.go @@ -42,6 +42,12 @@ func TestEncodeEntry(t *testing.T) { encoder := newEventEncoder(true, indexSelector, nil) + metaFields := mapstr.M{ + events.FieldMetaOpType: "create", + events.FieldMetaPipeline: "TEST_PIPELINE", + events.FieldMetaID: "test_id", + } + timestamp := time.Date(1980, time.January, 1, 0, 0, 0, 0, time.UTC) pubEvent := publisher.Event{ Content: beat.Event{ @@ -53,11 +59,7 @@ func TestEncodeEntry(t *testing.T) { "nested_field": "nested_value", }, }, - Meta: mapstr.M{ - events.FieldMetaOpType: "create", - events.FieldMetaPipeline: "TEST_PIPELINE", - events.FieldMetaID: "test_id", - }, + Meta: metaFields, }, } @@ -81,6 +83,7 @@ func TestEncodeEntry(t *testing.T) { assert.Equal(t, timestamp, encBeatEvent.timestamp, "encodedEvent.timestamp should match the original event") assert.Equal(t, events.OpTypeCreate, encBeatEvent.opType, "encoded opType should match the original metadata") assert.False(t, encBeatEvent.deadLetter, "encoded event shouldn't have deadLetter flag set") + assert.Equal(t, encBeatEvent.meta, metaFields, "encoded event struct should include original event's meta fields") // Check encoded fields var eventContent struct { @@ -97,6 +100,9 @@ func TestEncodeEntry(t *testing.T) { assert.Equal(t, "test_value", eventContent.TestField, "Encoded field should match original") assert.Equal(t, 5, eventContent.NumberField, "Encoded field should match original") assert.Equal(t, "nested_value", eventContent.Nested.NestedField, "Encoded field should match original") + + // Check string representation includes meta fields + assert.Contains(t, encBeatEvent.String(), `"pipeline":"TEST_PIPELINE"`, "String representation of encoded event should include the original event's meta fields") } // encodeBatch encodes a publisher.Batch so it can be provided to From 9673d9b7ec9a0cde5d818d07fbdd2bb9b8954d0c Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 31 Jan 2025 15:44:04 -0500 Subject: [PATCH 2/5] remove accidental white space --- libbeat/outputs/elasticsearch/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 6ef87336d391..56f28cdbf30d 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -241,7 +241,6 @@ func (client *Client) Clone() *Client { } func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { - span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() span.Context.SetLabel("events_original", len(batch.Events())) From 106170440be880e4edc0abe5b79ff63938cda803 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 31 Jan 2025 16:44:42 -0500 Subject: [PATCH 3/5] lint --- libbeat/outputs/elasticsearch/event_encoder_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libbeat/outputs/elasticsearch/event_encoder_test.go b/libbeat/outputs/elasticsearch/event_encoder_test.go index 5c3cbc70d69b..9d40bead75be 100644 --- a/libbeat/outputs/elasticsearch/event_encoder_test.go +++ b/libbeat/outputs/elasticsearch/event_encoder_test.go @@ -130,13 +130,15 @@ func encodeEvents(client *Client, events []publisher.Event) []publisher.Event { // Skip encoding if there's already encoded data present if events[i].EncodedEvent == nil { encoded, _ := encoder.EncodeEntry(events[i]) - event := encoded.(publisher.Event) + event, ok := encoded.(publisher.Event) + require.True(t, ok, "EncodeEntry should return a publisher.Event") events[i] = event } } return events } +//nolint:unused // False positive caused by varying build tags, this is used in client_test func encodeEvent(client *Client, event publisher.Event) publisher.Event { encoder := newEventEncoder( client.conn.EscapeHTML, From d6f63e3962c773fb4ac4387fa8448f2220378d89 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 31 Jan 2025 16:49:19 -0500 Subject: [PATCH 4/5] update changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 944d8ece4806..50cff7028324 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -138,6 +138,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - Fix autodiscovery memory leak related to metadata of start events {pull}41748[41748] - All standard queue metrics are now included in metrics monitoring, including: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`. {pull}42439[42439] - The following output latency metrics are now included in metrics monitoring: `output.latency.{count, max, median, p99}`. {pull}42439[42439] +- Restored event Meta fields in the Elasticsearch output's error logs. {pull}42559[42559] *Auditbeat* From 6dd6c702ab62806147a5b0f98064aed637d1152a Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 31 Jan 2025 17:16:14 -0500 Subject: [PATCH 5/5] fix lint for real --- libbeat/outputs/elasticsearch/event_encoder_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libbeat/outputs/elasticsearch/event_encoder_test.go b/libbeat/outputs/elasticsearch/event_encoder_test.go index 9d40bead75be..0a9fa5561e77 100644 --- a/libbeat/outputs/elasticsearch/event_encoder_test.go +++ b/libbeat/outputs/elasticsearch/event_encoder_test.go @@ -130,8 +130,7 @@ func encodeEvents(client *Client, events []publisher.Event) []publisher.Event { // Skip encoding if there's already encoded data present if events[i].EncodedEvent == nil { encoded, _ := encoder.EncodeEntry(events[i]) - event, ok := encoded.(publisher.Event) - require.True(t, ok, "EncodeEntry should return a publisher.Event") + event, _ := encoded.(publisher.Event) events[i] = event } }