From 4e8f4586e0e6259ecf1713fed40554d4b7eb1437 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 31 Jan 2025 18:56:18 +0000 Subject: [PATCH] Include full queue metrics in the monitoring index (#42439) (#42476) Add queue metrics to the Metricbeat monitoring schema so they can be included in standard Agent dashboards. This is the Beats-side half of https://github.com/elastic/beats/issues/42093. Affected metrics are: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`, all within `monitoring.metrics.libbeat.pipeline.queue`. (cherry picked from commit 7eb2bdd28c04f0f66638e2e4feea8fe32159c795) Co-authored-by: Fae Charlton --- CHANGELOG.next.asciidoc | 2 ++ libbeat/publisher/pipeline/monitoring.go | 18 ------------------ metricbeat/module/beat/stats/data.go | 23 ++++++++++++++++++++++- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index da572f3d1c9a..5327da99cb5d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -118,6 +118,8 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - Fix setting unique registry for non beat receivers {issue}42288[42288] {pull}42292[42292] - The Kafka output now drops events when there is an authorisation error {issue}42343[42343] {pull}42401[42401] - Fix autodiscovery memory leak related to metadata of start events {pull}41748[41748] +- All standard queue metrics are now included in metrics monitoring, including: `added.{events, bytes}`, `consumed.{events, bytes}`, `removed.{events, bytes}`, and `filled.{events, bytes, pct}`. {pull}42439[42439] +- The following output latency metrics are now included in metrics monitoring: `output.latency.{count, max, median, p99}`. {pull}42439[42439] *Auditbeat* diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go index 4a1e5ad76a1a..50a32ad13fbe 100644 --- a/libbeat/publisher/pipeline/monitoring.go +++ b/libbeat/publisher/pipeline/monitoring.go @@ -74,11 +74,6 @@ type metricsObserverVars struct { eventsTotal, eventsFiltered, eventsPublished, eventsFailed *monitoring.Uint eventsDropped, eventsRetry *monitoring.Uint // (retryer) drop/retry counters activeEvents *monitoring.Uint - - // queue metrics - queueACKed *monitoring.Uint - queueMaxEvents *monitoring.Uint - percentQueueFull *monitoring.Float } func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver { @@ -118,19 +113,6 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver { // events.dropped counts events that were dropped because errors from // the output workers exceeded the configured maximum retry count. eventsDropped: monitoring.NewUint(reg, "events.dropped"), - - // (Gauge) queue.max_events measures the maximum number of events the - // queue will accept, or 0 if there is none. - queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"), - - // queue.acked counts events that have been acknowledged by the output - // workers. This includes events that were dropped for fatal errors, - // which are also reported in events.dropped. - queueACKed: monitoring.NewUint(reg, "queue.acked"), - - // (Gauge) queue.filled.pct.events measures the fraction (from 0 to 1) - // of the queue's event capacity that is currently filled. - percentQueueFull: monitoring.NewFloat(reg, "queue.filled.pct.events"), }, } } diff --git a/metricbeat/module/beat/stats/data.go b/metricbeat/module/beat/stats/data.go index f8b63c4d7c42..b6500903c9e7 100644 --- a/metricbeat/module/beat/stats/data.go +++ b/metricbeat/module/beat/stats/data.go @@ -85,8 +85,29 @@ var ( "pipeline": c.Dict("pipeline", s.Schema{ "clients": c.Int("clients"), "queue": c.Dict("queue", s.Schema{ - "acked": c.Int("acked"), "max_events": c.Int("max_events"), + + "added": c.Dict("added", s.Schema{ + "events": c.Int("events"), + "bytes": c.Int("bytes"), + }), + "consumed": c.Dict("consumed", s.Schema{ + "events": c.Int("events"), + "bytes": c.Int("bytes"), + }), + "removed": c.Dict("removed", s.Schema{ + "events": c.Int("events"), + "bytes": c.Int("bytes"), + }), + "filled": c.Dict("filled", s.Schema{ + "events": c.Int("events"), + "bytes": c.Int("bytes"), + "pct": c.Float("pct"), + }), + + // Backwards compatibility: "acked" is the old name for + // "removed.events" and should not be used by new code/dashboards. + "acked": c.Int("acked"), }), "events": c.Dict("events", s.Schema{ "active": c.Int("active"),