Skip to content

Commit

Permalink
x-pack/filebeat/input/awss3: fix document ID construction when using …
Browse files Browse the repository at this point in the history
…csv decoder (#42019)
  • Loading branch information
efd6 authored Dec 18, 2024
1 parent d115014 commit f310728
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920]
- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]
- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]

*Heartbeat*

Expand Down
5 changes: 4 additions & 1 deletion x-pack/filebeat/input/awss3/decoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ type decoder interface {
type valueDecoder interface {
decoder

decodeValue() (any, error)
// decodeValue returns the current value, and its offset
// in the stream. If the receiver is unable to provide
// a unique offset for the value, offset will be negative.
decodeValue() (offset int64, val any, _ error)
}

// newDecoder creates a new decoder based on the codec type.
Expand Down
12 changes: 7 additions & 5 deletions x-pack/filebeat/input/awss3/decoding_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type csvDecoder struct {
r *csv.Reader

header []string
offset int64
current []string

err error
Expand Down Expand Up @@ -51,14 +52,15 @@ func (d *csvDecoder) next() bool {
if d.err != nil {
return false
}
d.offset = d.r.InputOffset()
d.current, d.err = d.r.Read()
return d.err == nil
}

// decode returns the JSON encoded value of the current CSV line. next must
// have been called before any calls to decode.
func (d *csvDecoder) decode() ([]byte, error) {
v, err := d.decodeValue()
_, v, err := d.decodeValue()
if err != nil {
return nil, err
}
Expand All @@ -68,12 +70,12 @@ func (d *csvDecoder) decode() ([]byte, error) {
// decodeValue returns the offset of the current CSV line and its value
// interpreted as an object with fields based on the header held by the
// receiver. next must have been called before any calls to decodeValue.
func (d *csvDecoder) decodeValue() (any, error) {
func (d *csvDecoder) decodeValue() (offset int64, val any, _ error) {
if d.err != nil {
return nil, d.err
return d.offset, nil, d.err
}
if len(d.current) == 0 {
return nil, fmt.Errorf("decode called before next")
return d.offset, nil, fmt.Errorf("decode called before next")
}
m := make(map[string]string, len(d.header))
// By the time we are here, current must be the same
Expand All @@ -83,7 +85,7 @@ func (d *csvDecoder) decodeValue() (any, error) {
for i, n := range d.header {
m[n] = d.current[i]
}
return m, nil
return d.offset, m, nil
}

// close closes the CSV decoder and releases the resources.
Expand Down
15 changes: 11 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,12 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
if err != nil {
return err
}
var evtOffset int64
switch dec := dec.(type) {
case valueDecoder:
defer dec.close()

for dec.next() {
val, err := dec.decodeValue()
evtOffset, val, err := dec.decodeValue()
if err != nil {
if errors.Is(err, io.EOF) {
return nil
Expand All @@ -183,6 +182,7 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
case decoder:
defer dec.close()

var evtOffset int64
for dec.next() {
data, err := dec.decode()
if err != nil {
Expand Down Expand Up @@ -420,13 +420,17 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error {
return nil
}

// createEvent constructs a beat.Event from message and offset. The value of
// message populates the event message field, and offset is used to set the
// log.offset field and, with the object's ARN and key, the @metadata._id field.
// If offset is negative, it is ignored: no @metadata._id field is added to
// the event and the log.offset field is not set.
func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event {
event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: mapstr.M{
"message": message,
"log": mapstr.M{
"offset": offset,
"file": mapstr.M{
"path": p.s3RequestURL,
},
Expand All @@ -448,7 +452,10 @@ func (p *s3ObjectProcessor) createEvent(message string, offset int64) beat.Event
},
},
}
event.SetID(objectID(p.s3ObjHash, offset))
if offset >= 0 {
event.Fields.Put("log.offset", offset)
event.SetID(objectID(p.s3ObjHash, offset))
}

if len(p.s3Metadata) > 0 {
_, _ = event.Fields.Put("aws.s3.metadata", p.s3Metadata)
Expand Down

0 comments on commit f310728

Please sign in to comment.