Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent indexing of event.received field #11602

Closed
wants to merge 12 commits into from
3 changes: 3 additions & 0 deletions apmpackage/apm/changelog.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
- description: Remove unused `processor.event` field from logs data streams
type: enhancement
link: https://github.com/elastic/apm-server/pull/11494
- description: Remove `event.received` fields from logs, metrics and traces data streams
type: enhancement
link: https://github.com/elastic/apm-server/pull/11602
- version: "8.10.0"
changes:
- description: Add permissions to reroute to dedicated datasets for logs, metrics and traces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ processors:
name: observer_ids
- pipeline:
name: ecs_version
- pipeline:
name: remove_event_received
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ processors:
name: client_geoip
- pipeline:
name: set_metrics
- pipeline:
name: remove_event_received
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ processors:
if: ctx.error?.log?.message != null
field: error.grouping_name
copy_from: error.log.message
- pipeline:
name: remove_event_received
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,5 @@ processors:
- remove:
field: _dynamic_templates
ignore_missing: true
- pipeline:
name: remove_event_received
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ processors:
- service.framework
ignore_missing: true
ignore_failure: true

- pipeline:
name: remove_event_received
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ processors:
value: 0
ignore_failure: true
# end of event.success_count logic
- pipeline:
name: remove_event_received
26 changes: 18 additions & 8 deletions apmpackage/cmd/genpackage/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ import (
// - ...
func getCommonPipeline(name string, version *version.V) []map[string]interface{} {
commonPipelines := map[string][]map[string]interface{}{
"observer_version": getObserverVersionPipeline(version),
"observer_ids": observerIDsPipeline,
"ecs_version": ecsVersionPipeline,
"user_agent": userAgentPipeline,
"process_ppid": processPpidPipeline,
"client_geoip": clientGeoIPPipeline,
"event_duration": eventDurationPipeline,
"set_metrics": setMetricsPipeline,
"observer_version": getObserverVersionPipeline(version),
"observer_ids": observerIDsPipeline,
"ecs_version": ecsVersionPipeline,
"user_agent": userAgentPipeline,
"process_ppid": processPpidPipeline,
"client_geoip": clientGeoIPPipeline,
"event_duration": eventDurationPipeline,
"set_metrics": setMetricsPipeline,
"remove_event_received": removeEventReceivedPipeline,
}
return commonPipelines[name]
}
Expand Down Expand Up @@ -144,6 +145,15 @@ var clientGeoIPPipeline = []map[string]interface{}{{
},
}}

var removeEventReceivedPipeline = []map[string]interface{}{{
"remove": map[string]interface{}{
"field": "event.received",
"ignore_missing": true,
"ignore_failure": true,
"description": "The field is used for internal statistics but does not need to be indexed.",
},
}}

// This pipeline translates `event.duration` (defaulting to zero if not
// found) to `transaction.duration.us` or `span.duration.us` depending on
// the event type, and then removes `event.duration`. Older versions of
Expand Down
4 changes: 0 additions & 4 deletions internal/beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,6 @@ func (s *Runner) Run(ctx context.Context) error {
modelpb.ProcessBatchFunc(rateLimitBatchProcessor),
modelpb.ProcessBatchFunc(authorizeEventIngestProcessor),

// Add a model processor that removes `event.received`, which is added by
// apm-data, but which we don't yet map.
modelpb.ProcessBatchFunc(removeEventReceivedBatchProcessor),

// Pre-process events before they are sent to the final processors for
// aggregation, sampling, and indexing.
modelprocessor.SetHostHostname{},
Expand Down
11 changes: 0 additions & 11 deletions internal/beater/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,6 @@ func newObserverBatchProcessor() modelpb.ProcessBatchFunc {
}
}

// TODO remove this once we have added `event.received` to the
// data stream mappings, in 8.10.
func removeEventReceivedBatchProcessor(ctx context.Context, batch *modelpb.Batch) error {
for _, event := range *batch {
if event.Event != nil {
event.Event.Received = 0
}
}
return nil
}

func newDocappenderBatchProcessor(a *docappender.Appender) modelpb.ProcessBatchFunc {
var pool sync.Pool
pool.New = func() any {
Expand Down