Skip to content

Commit

Permalink
Migrate all processors to using the EventEditor
Browse files Browse the repository at this point in the history
  • Loading branch information
rdner committed Oct 18, 2023
1 parent 6bc2131 commit b236659
Show file tree
Hide file tree
Showing 67 changed files with 1,598 additions and 852 deletions.
70 changes: 41 additions & 29 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ var (
const FlagField = "log.flags"

const (
timestampFieldKey = "@timestamp"
metadataFieldKey = "@metadata"
metadataKeyPrefix = metadataFieldKey + "."
TimestampFieldKey = "@timestamp"
MetadataFieldKey = "@metadata"
ErrorFieldKey = "error"
TypeFieldKey = "type"
metadataKeyPrefix = MetadataFieldKey + "."
metadataKeyOffset = len(metadataKeyPrefix)
)

Expand All @@ -59,9 +61,9 @@ type Event struct {
var (
ErrValueNotTimestamp = errors.New("value is not a timestamp")
ErrValueNotMapStr = errors.New("value is not `mapstr.M` or `map[string]interface{}` type")
ErrAlterMetadataKey = fmt.Errorf("deleting/replacing %q key is not supported", metadataFieldKey)
ErrMetadataAccess = fmt.Errorf("accessing %q key directly is not supported, try nested keys", metadataFieldKey)
ErrDeleteTimestamp = fmt.Errorf("deleting %q key is not supported", timestampFieldKey)
ErrAlterMetadataKey = fmt.Errorf("deleting/replacing %q key is not supported", MetadataFieldKey)
ErrMetadataAccess = fmt.Errorf("accessing %q key directly is not supported, try nested keys", MetadataFieldKey)
ErrDeleteTimestamp = fmt.Errorf("deleting %q key is not supported", TimestampFieldKey)
)

// SetID overwrites the "id" field in the events metadata.
Expand All @@ -71,7 +73,7 @@ func (e *Event) SetID(id string) {
}

func (e *Event) HasKey(key string) (bool, error) {
if key == timestampFieldKey || key == metadataFieldKey {
if key == TimestampFieldKey || key == MetadataFieldKey {
return true, nil
}

Expand All @@ -90,10 +92,10 @@ func (e *Event) HasKey(key string) (bool, error) {
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == timestampFieldKey {
if key == TimestampFieldKey {
return e.Timestamp, nil
}
if key == metadataFieldKey {
if key == MetadataFieldKey {
return nil, ErrMetadataAccess
}

Expand All @@ -113,15 +115,15 @@ func (e *Event) GetValue(key string) (interface{}, error) {

// Clone creates an exact copy of the event
// TODO DELETE
func (e *Event) Clone() *Event {
return &Event{
Timestamp: e.Timestamp,
Meta: e.Meta.Clone(),
Fields: e.Fields.Clone(),
Private: e.Private,
TimeSeries: e.TimeSeries,
}
}
// func (e *Event) Clone() *Event {
// return &Event{
// Timestamp: e.Timestamp,
// Meta: e.Meta.Clone(),
// Fields: e.Fields.Clone(),
// Private: e.Private,
// TimeSeries: e.TimeSeries,
// }
// }

// DeepUpdate recursively copies the key-value pairs from `d` to various properties of the event.
// When the key equals `@timestamp` it's set as the `Timestamp` property of the event.
Expand All @@ -147,10 +149,10 @@ func (e *Event) DeepUpdateNoOverwrite(d mapstr.M) {
}

func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
if key == timestampFieldKey {
if key == TimestampFieldKey {
return e.setTimestamp(v)
}
if key == metadataFieldKey {
if key == MetadataFieldKey {
return nil, ErrAlterMetadataKey
}

Expand All @@ -170,10 +172,10 @@ func (e *Event) PutValue(key string, v interface{}) (interface{}, error) {
}

func (e *Event) Delete(key string) error {
if key == timestampFieldKey {
if key == TimestampFieldKey {
return ErrDeleteTimestamp
}
if key == metadataFieldKey {
if key == MetadataFieldKey {
return ErrAlterMetadataKey
}
if subKey, ok := e.metadataSubKey(key); ok {
Expand All @@ -194,7 +196,7 @@ func (e *Event) Delete(key string) error {
// error as fields with the corresponding name.
func (e *Event) SetErrorWithOption(message string, addErrKey bool, data string, field string) {
if addErrKey {
errorField := mapstr.M{"message": message, "type": "json"}
errorField := mapstr.M{"message": message, TypeFieldKey: "json"}
if data != "" {
errorField["data"] = data
}
Expand All @@ -205,14 +207,24 @@ func (e *Event) SetErrorWithOption(message string, addErrKey bool, data string,
}
}

// String returns a string representation of the event.
func (e *Event) String() string {
m := mapstr.M{
TimestampFieldKey: e.Timestamp,
MetadataFieldKey: e.Meta,
}
m.DeepUpdate(e.Fields)
return m.String()
}

func (e *Event) deepUpdate(d mapstr.M, mode updateMode) {
if len(d) == 0 {
return
}

// It's supported to update the timestamp using this function.
// However, we must handle it separately since it's a separate field of the event.
timestampValue, timestampExists := d[timestampFieldKey]
timestampValue, timestampExists := d[TimestampFieldKey]
if timestampExists {
if mode == updateModeOverwrite {
_, _ = e.setTimestamp(timestampValue)
Expand All @@ -221,15 +233,15 @@ func (e *Event) deepUpdate(d mapstr.M, mode updateMode) {
// Temporary delete it from the update map,
// so we can do `e.Fields.DeepUpdate(d)` or
// `e.Fields.DeepUpdateNoOverwrite(d)` later
delete(d, timestampFieldKey)
delete(d, TimestampFieldKey)
defer func() {
d[timestampFieldKey] = timestampValue
d[TimestampFieldKey] = timestampValue
}()
}

// It's supported to update the metadata using this function.
// However, we must handle it separately since it's a separate field of the event.
metaValue, metaExists := d[metadataFieldKey]
metaValue, metaExists := d[MetadataFieldKey]
if metaExists {
var metaUpdate mapstr.M

Expand All @@ -255,9 +267,9 @@ func (e *Event) deepUpdate(d mapstr.M, mode updateMode) {
// Temporary delete it from the update map,
// so we can do `e.Fields.DeepUpdate(d)` or
// `e.Fields.DeepUpdateNoOverwrite(d)` later
delete(d, metadataFieldKey)
delete(d, MetadataFieldKey)
defer func() {
d[metadataFieldKey] = metaValue
d[MetadataFieldKey] = metaValue
}()
}

Expand Down
Loading

0 comments on commit b236659

Please sign in to comment.