Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize memory allocations in the event processing pipeline #36830

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 165 additions & 108 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,31 @@ package beat

import (
"errors"
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type updateMode bool

var (
updateModeOverwrite updateMode = true
updateModeNoOverwrite updateMode = false
)

// FlagField fields used to keep information or errors when events are parsed.
const FlagField = "log.flags"

const (
timestampFieldKey = "@timestamp"
metadataFieldKey = "@metadata"
TimestampFieldKey = "@timestamp"
MetadataFieldKey = "@metadata"
ErrorFieldKey = "error"
TypeFieldKey = "type"
metadataKeyPrefix = MetadataFieldKey + "."
metadataKeyOffset = len(metadataKeyPrefix)
)

// Event is the common event format shared by all beats.
Expand All @@ -47,41 +59,71 @@ type Event struct {
}

var (
errNoTimestamp = errors.New("value is no timestamp")
errNoMapStr = errors.New("value is no map[string]interface{} type")
ErrValueNotTimestamp = errors.New("value is not a timestamp")
ErrValueNotMapStr = errors.New("value is not `mapstr.M` or `map[string]interface{}` type")
ErrAlterMetadataKey = fmt.Errorf("deleting/replacing %q key is not supported", MetadataFieldKey)
ErrMetadataAccess = fmt.Errorf("accessing %q key directly is not supported, try nested keys", MetadataFieldKey)
ErrDeleteTimestamp = fmt.Errorf("deleting %q key is not supported", TimestampFieldKey)
)

// SetID overwrites the "id" field in the events metadata.
// If Meta is nil, a new Meta dictionary is created.
func (e *Event) SetID(id string) {
if e.Meta == nil {
e.Meta = mapstr.M{}
_, _ = e.PutValue("@metadata._id", id)
}

func (e *Event) HasKey(key string) (bool, error) {
if key == TimestampFieldKey || key == MetadataFieldKey {
return true, nil
}
e.Meta["_id"] = id

if subKey, ok := e.metadataSubKey(key); ok {
if e.Meta == nil {
return false, nil
}
return e.Meta.HasKey(subKey)
}

if e.Fields == nil {
return false, nil
}

return e.Fields.HasKey(key)
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == timestampFieldKey {
if key == TimestampFieldKey {
return e.Timestamp, nil
} else if subKey, ok := metadataKey(key); ok {
if subKey == "" || e.Meta == nil {
return e.Meta, nil
}
if key == MetadataFieldKey {
return nil, ErrMetadataAccess
}

if subKey, ok := e.metadataSubKey(key); ok {
if e.Meta == nil {
return nil, mapstr.ErrKeyNotFound
}
return e.Meta.GetValue(subKey)
}

if e.Fields == nil {
return nil, mapstr.ErrKeyNotFound
}

return e.Fields.GetValue(key)
}

// Clone creates an exact copy of the event
func (e *Event) Clone() *Event {
return &Event{
Timestamp: e.Timestamp,
Meta: e.Meta.Clone(),
Fields: e.Fields.Clone(),
Private: e.Private,
TimeSeries: e.TimeSeries,
}
}
// TODO DELETE
// func (e *Event) Clone() *Event {
// return &Event{
// Timestamp: e.Timestamp,
// Meta: e.Meta.Clone(),
// Fields: e.Fields.Clone(),
// Private: e.Private,
// TimeSeries: e.TimeSeries,
// }
// }

// DeepUpdate recursively copies the key-value pairs from `d` to various properties of the event.
// When the key equals `@timestamp` it's set as the `Timestamp` property of the event.
Expand All @@ -92,7 +134,7 @@ func (e *Event) Clone() *Event {
// `DeepUpdateNoOverwrite` is a version of this function that does not
// overwrite existing values.
func (e *Event) DeepUpdate(d mapstr.M) {
e.deepUpdate(d, true)
e.deepUpdate(d, updateModeOverwrite)
}

// DeepUpdateNoOverwrite recursively copies the key-value pairs from `d` to various properties of the event.
Expand All @@ -103,31 +145,103 @@ func (e *Event) DeepUpdate(d mapstr.M) {
// via `DeepUpdateNoOverwrite`.
// `DeepUpdate` is a version of this function that overwrites existing values.
func (e *Event) DeepUpdateNoOverwrite(d mapstr.M) {
e.deepUpdate(d, false)
e.deepUpdate(d, updateModeNoOverwrite)
}

func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
if key == TimestampFieldKey {
return e.setTimestamp(v)
}
if key == MetadataFieldKey {
return nil, ErrAlterMetadataKey
}

if subKey, ok := e.metadataSubKey(key); ok {
if e.Meta == nil {
e.Meta = mapstr.M{}
}

return e.Meta.Put(subKey, v)
}

if e.Fields == nil {
e.Fields = mapstr.M{}
}

return e.Fields.Put(key, v)
}

func (e *Event) Delete(key string) error {
if key == TimestampFieldKey {
return ErrDeleteTimestamp
}
if key == MetadataFieldKey {
return ErrAlterMetadataKey
}
if subKey, ok := e.metadataSubKey(key); ok {
if e.Meta == nil {
return nil
}
return e.Meta.Delete(subKey)
}

if e.Fields == nil {
return nil
}
return e.Fields.Delete(key)
}

// SetErrorWithOption sets the event error field with the message when the addErrKey is set to true.
// If you want to include the data and field you can pass them as parameters and will be appended into the
// error as fields with the corresponding name.
func (e *Event) SetErrorWithOption(message string, addErrKey bool, data string, field string) {
if addErrKey {
errorField := mapstr.M{"message": message, TypeFieldKey: "json"}
if data != "" {
errorField["data"] = data
}
if field != "" {
errorField["field"] = field
}
e.Fields["error"] = errorField
}
}

// String returns a string representation of the event.
func (e *Event) String() string {
m := mapstr.M{
TimestampFieldKey: e.Timestamp,
MetadataFieldKey: e.Meta,
}
m.DeepUpdate(e.Fields)
return m.String()
}

func (e *Event) deepUpdate(d mapstr.M, mode updateMode) {
if len(d) == 0 {
return
}

// It's supported to update the timestamp using this function.
// However, we must handle it separately since it's a separate field of the event.
timestampValue, timestampExists := d[timestampFieldKey]
timestampValue, timestampExists := d[TimestampFieldKey]
if timestampExists {
if overwrite {
_ = e.setTimestamp(timestampValue)
if mode == updateModeOverwrite {
_, _ = e.setTimestamp(timestampValue)
}

// Temporary delete it from the update map,
// so we can do `e.Fields.DeepUpdate(d)` or
// `e.Fields.DeepUpdateNoOverwrite(d)` later
delete(d, timestampFieldKey)
delete(d, TimestampFieldKey)
defer func() {
d[TimestampFieldKey] = timestampValue
}()
}

// It's supported to update the metadata using this function.
// However, we must handle it separately since it's a separate field of the event.
metaValue, metaExists := d[metadataFieldKey]
metaValue, metaExists := d[MetadataFieldKey]
if metaExists {
var metaUpdate mapstr.M

Expand All @@ -142,29 +256,23 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
if e.Meta == nil {
e.Meta = mapstr.M{}
}
if overwrite {
switch mode {
case updateModeOverwrite:
e.Meta.DeepUpdate(metaUpdate)
} else {
case updateModeNoOverwrite:
e.Meta.DeepUpdateNoOverwrite(metaUpdate)
}
}

// Temporary delete it from the update map,
// so we can do `e.Fields.DeepUpdate(d)` or
// `e.Fields.DeepUpdateNoOverwrite(d)` later
delete(d, metadataFieldKey)
delete(d, MetadataFieldKey)
defer func() {
d[MetadataFieldKey] = metaValue
}()
}

// At the end we revert all changes we made to the update map
defer func() {
if timestampExists {
d[timestampFieldKey] = timestampValue
}
if metaExists {
d[metadataFieldKey] = metaValue
}
}()

if len(d) == 0 {
return
}
Expand All @@ -173,90 +281,39 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
e.Fields = mapstr.M{}
}

if overwrite {
switch mode {
case updateModeOverwrite:
e.Fields.DeepUpdate(d)
} else {
case updateModeNoOverwrite:
e.Fields.DeepUpdateNoOverwrite(d)
}
}

func (e *Event) setTimestamp(v interface{}) error {
func (e *Event) setTimestamp(v interface{}) (interface{}, error) {
// to satisfy the PutValue interface, this function
// must return the overwritten value
prevValue := e.Timestamp

switch ts := v.(type) {
case time.Time:
e.Timestamp = ts
return prevValue, nil
case common.Time:
e.Timestamp = time.Time(ts)
return prevValue, nil
default:
return errNoTimestamp
}

return nil
}

func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
if key == timestampFieldKey {
err := e.setTimestamp(v)
return nil, err
} else if subKey, ok := metadataKey(key); ok {
if subKey == "" {
switch meta := v.(type) {
case mapstr.M:
e.Meta = meta
case map[string]interface{}:
e.Meta = meta
default:
return nil, errNoMapStr
}
} else if e.Meta == nil {
e.Meta = mapstr.M{}
}
return e.Meta.Put(subKey, v)
}

return e.Fields.Put(key, v)
}

func (e *Event) Delete(key string) error {
if subKey, ok := metadataKey(key); ok {
if subKey == "" {
e.Meta = nil
return nil
}
if e.Meta == nil {
return nil
}
return e.Meta.Delete(subKey)
return nil, ErrValueNotTimestamp
}
return e.Fields.Delete(key)
}

func metadataKey(key string) (string, bool) {
if !strings.HasPrefix(key, metadataFieldKey) {
func (e *Event) metadataSubKey(key string) (string, bool) {
if !strings.HasPrefix(key, metadataKeyPrefix) {
return "", false
}

subKey := key[len(metadataFieldKey):]
subKey := key[metadataKeyOffset:]
if subKey == "" {
return "", true
}
if subKey[0] == '.' {
return subKey[1:], true
}
return "", false
}

// SetErrorWithOption sets the event error field with the message when the addErrKey is set to true.
// If you want to include the data and field you can pass them as parameters and will be appended into the
// error as fields with the corresponding name.
func (e *Event) SetErrorWithOption(message string, addErrKey bool, data string, field string) {
if addErrKey {
errorField := mapstr.M{"message": message, "type": "json"}
if data != "" {
errorField["data"] = data
}
if field != "" {
errorField["field"] = field
}
e.Fields["error"] = errorField
return "", false
}
return subKey, true
}
Loading
Loading