From f310728094f3eb68da0dcc42b424041d35973155 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 18 Dec 2024 18:30:38 +1030 Subject: [PATCH] x-pack/filebeat/input/awss3: fix document ID construction when using csv decoder (#42019) --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/awss3/decoding.go | 5 ++++- x-pack/filebeat/input/awss3/decoding_csv.go | 12 +++++++----- x-pack/filebeat/input/awss3/s3_objects.go | 15 +++++++++++---- 4 files changed, 23 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 05a6a09516c..3bd5716080a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -196,6 +196,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920] - Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036] +- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019] *Heartbeat* diff --git a/x-pack/filebeat/input/awss3/decoding.go b/x-pack/filebeat/input/awss3/decoding.go index 2fb5b396d1a..95be6493192 100644 --- a/x-pack/filebeat/input/awss3/decoding.go +++ b/x-pack/filebeat/input/awss3/decoding.go @@ -25,7 +25,10 @@ type decoder interface { type valueDecoder interface { decoder - decodeValue() (any, error) + // decodeValue returns the current value, and its offset + // in the stream. If the receiver is unable to provide + // a unique offset for the value, offset will be negative. + decodeValue() (offset int64, val any, _ error) } // newDecoder creates a new decoder based on the codec type. diff --git a/x-pack/filebeat/input/awss3/decoding_csv.go b/x-pack/filebeat/input/awss3/decoding_csv.go index 050063517e1..b76a97d20d3 100644 --- a/x-pack/filebeat/input/awss3/decoding_csv.go +++ b/x-pack/filebeat/input/awss3/decoding_csv.go @@ -17,6 +17,7 @@ type csvDecoder struct { r *csv.Reader header []string + offset int64 current []string err error @@ -51,6 +52,7 @@ func (d *csvDecoder) next() bool { if d.err != nil { return false } + d.offset = d.r.InputOffset() d.current, d.err = d.r.Read() return d.err == nil } @@ -58,7 +60,7 @@ func (d *csvDecoder) next() bool { // decode returns the JSON encoded value of the current CSV line. next must // have been called before any calls to decode. func (d *csvDecoder) decode() ([]byte, error) { - v, err := d.decodeValue() + _, v, err := d.decodeValue() if err != nil { return nil, err } @@ -68,12 +70,12 @@ func (d *csvDecoder) decode() ([]byte, error) { // decodeValue returns the value of the current CSV line interpreted as // an object with fields based on the header held by the receiver. next must // have been called before any calls to decode. -func (d *csvDecoder) decodeValue() (any, error) { +func (d *csvDecoder) decodeValue() (offset int64, val any, _ error) { if d.err != nil { - return nil, d.err + return d.offset, nil, d.err } if len(d.current) == 0 { - return nil, fmt.Errorf("decode called before next") + return d.offset, nil, fmt.Errorf("decode called before next") } m := make(map[string]string, len(d.header)) // By the time we are here, current must be the same @@ -83,7 +85,7 @@ func (d *csvDecoder) decodeValue() (any, error) { for i, n := range d.header { m[n] = d.current[i] } - return m, nil + return d.offset, m, nil } // close closes the parquet decoder and releases the resources. diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index acd4d173439..c36bd7858f9 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -158,13 +158,12 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func if err != nil { return err } - var evtOffset int64 switch dec := dec.(type) { case valueDecoder: defer dec.close() for dec.next() { - val, err := dec.decodeValue() + evtOffset, val, err := dec.decodeValue() if err != nil { if errors.Is(err, io.EOF) { return nil @@ -183,6 +182,7 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func case decoder: defer dec.close() + var evtOffset int64 for dec.next() { data, err := dec.decode() if err != nil { @@ -420,13 +420,17 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { return nil } +// createEvent constructs a beat.Event from message and offset. The value of +// message populates the event message field, and offset is used to set the +// log.offset field and, with the object's ARN and key, the @metadata._id field. +// If offset is negative, it is ignored. No @metadata._id field is added to +// the event and the log.offset field is not set. func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event { event := beat.Event{ Timestamp: time.Now().UTC(), Fields: mapstr.M{ "message": message, "log": mapstr.M{ - "offset": offset, "file": mapstr.M{ "path": p.s3RequestURL, }, @@ -448,7 +452,10 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event }, }, } - event.SetID(objectID(p.s3ObjHash, offset)) + if offset >= 0 { + event.Fields.Put("log.offset", offset) + event.SetID(objectID(p.s3ObjHash, offset)) + } if len(p.s3Metadata) > 0 { _, _ = event.Fields.Put("aws.s3.metadata", p.s3Metadata)