Skip to content

Commit

Permalink
Optimize memory allocations in the event processing pipeline
Browse files Browse the repository at this point in the history
Previously, every time a processor ran on an event we made a clone of
the entire event for two reasons:

1. this event could have some nested maps that are
shared among multiple events.

2. in case a processor fails to make a change it should be able to
revert its partial changes.

This change added a new `EventEditor` wrapper that is used for collecting
pending event changes in processors with an option to `Apply` or `Reset` them.

Additionally, this `EventEditor` takes care of the efficient memory management when
making changes to an event by cloning only the nested maps that
processors access or modify. Most of the processors just put new keys
or delete existing keys on the root-level, so most of the time the nested maps in the
event remain untouched and it does not require the whole event to be cloned.
  • Loading branch information
rdner committed Oct 12, 2023
1 parent a4f8aa1 commit 19dc9e5
Show file tree
Hide file tree
Showing 4 changed files with 733 additions and 43 deletions.
132 changes: 89 additions & 43 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const FlagField = "log.flags"
const (
timestampFieldKey = "@timestamp"
metadataFieldKey = "@metadata"
metadataKeyPrefix = metadataFieldKey + "."
metadataKeyOffset = len(metadataKeyPrefix)
)

// Event is the common event format shared by all beats.
Expand All @@ -47,28 +49,50 @@ type Event struct {
}

var (
errNoTimestamp = errors.New("value is no timestamp")
errNoMapStr = errors.New("value is no map[string]interface{} type")
errNoTimestamp = errors.New("value is no timestamp")
errNoMapStr = errors.New("value is no map[string]interface{} type")
)

// SetID overwrites the "id" field in the events metadata.
// If Meta is nil, a new Meta dictionary is created.
func (e *Event) SetID(id string) {
if e.Meta == nil {
e.Meta = mapstr.M{}
_, _ = e.PutValue("@metadata._id", id)
}

func (e *Event) HasKey(key string) (bool, error) {
if key == timestampFieldKey {
return true, nil
}

if subKey, ok := metadataKey(key); ok {
if subKey == "" || e.Meta == nil {
return e.Meta != nil, nil
}
return e.Meta.HasKey(subKey)
}
e.Meta["_id"] = id

return e.Fields.HasKey(key)
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == timestampFieldKey {
return e.Timestamp, nil
} else if subKey, ok := metadataKey(key); ok {
if subKey == "" || e.Meta == nil {
}

if subKey, ok := metadataKey(key); ok {
if e.Meta == nil {
return nil, mapstr.ErrKeyNotFound
}
if subKey == "" {
return e.Meta, nil
}
return e.Meta.GetValue(subKey)
}

if e.Fields == nil {
return nil, mapstr.ErrKeyNotFound
}

return e.Fields.GetValue(key)
}

Expand Down Expand Up @@ -106,6 +130,36 @@ func (e *Event) DeepUpdateNoOverwrite(d mapstr.M) {
e.deepUpdate(d, false)
}

func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
if key == timestampFieldKey {
return e.setTimestamp(v)
}

if subKey, ok := metadataKey(key); ok {
return e.putMetadataValue(subKey, v)
}

if e.Fields == nil {
e.Fields = mapstr.M{}
}

return e.Fields.Put(key, v)
}

func (e *Event) Delete(key string) error {
if subKey, ok := metadataKey(key); ok {
if subKey == "" {
e.Meta = nil
return nil
}
if e.Meta == nil {
return nil
}
return e.Meta.Delete(subKey)
}
return e.Fields.Delete(key)
}

func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
if len(d) == 0 {
return
Expand All @@ -116,7 +170,7 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
timestampValue, timestampExists := d[timestampFieldKey]
if timestampExists {
if overwrite {
_ = e.setTimestamp(timestampValue)
_, _ = e.setTimestamp(timestampValue)
}

// Temporary delete it from the update map,
Expand All @@ -127,6 +181,7 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {

// It's supported to update the metadata using this function.
// However, we must handle it separately since it's a separate field of the event.
// !!!BUG!!!, no `@metadata` key prefix support
metaValue, metaExists := d[metadataFieldKey]
if metaExists {
var metaUpdate mapstr.M
Expand Down Expand Up @@ -180,54 +235,45 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) {
}
}

func (e *Event) setTimestamp(v interface{}) error {
func (e *Event) setTimestamp(v interface{}) (interface{}, error) {
// to satisfy the PutValue interface, this function
// must return the overwritten value
prevValue := e.Timestamp

switch ts := v.(type) {
case time.Time:
e.Timestamp = ts
return prevValue, nil
case common.Time:
e.Timestamp = time.Time(ts)
return prevValue, nil
default:
return errNoTimestamp
return nil, errNoTimestamp
}

return nil
}

func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
if key == timestampFieldKey {
err := e.setTimestamp(v)
return nil, err
} else if subKey, ok := metadataKey(key); ok {
if subKey == "" {
switch meta := v.(type) {
case mapstr.M:
e.Meta = meta
case map[string]interface{}:
e.Meta = meta
default:
return nil, errNoMapStr
}
} else if e.Meta == nil {
e.Meta = mapstr.M{}
}
return e.Meta.Put(subKey, v)
func (e *Event) putMetadataValue(subKey string, v interface{}) (interface{}, error) {
if e.Meta == nil {
e.Meta = mapstr.M{}
}
if subKey == "" {
// to satisfy the PutValue interface, this function
// must return the overwritten value
prevValue := e.Meta

return e.Fields.Put(key, v)
}

func (e *Event) Delete(key string) error {
if subKey, ok := metadataKey(key); ok {
if subKey == "" {
e.Meta = nil
return nil
}
if e.Meta == nil {
return nil
switch meta := v.(type) {
case mapstr.M:
e.Meta = meta
return prevValue, nil
case map[string]interface{}:
e.Meta = mapstr.M(meta)
return prevValue, nil
default:
return nil, errNoMapStr
}
return e.Meta.Delete(subKey)
}
return e.Fields.Delete(key)

return e.Meta.Put(subKey, v)
}

func metadataKey(key string) (string, bool) {
Expand Down
Loading

0 comments on commit 19dc9e5

Please sign in to comment.