From c502f20477ee724220228809f7572fa8b7ab6ce2 Mon Sep 17 00:00:00 2001 From: William Easton Date: Tue, 4 Feb 2025 00:43:14 -0600 Subject: [PATCH] Add latency metrics for logstash async output (#42565) * Add latency metrics for logstash async output * Properly handle per-batch latency (cherry picked from commit accc5e147cdbd028f3bcb3c7a03340ea60f7469f) --- libbeat/outputs/logstash/async.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 760066b2f7c3..4abef08d1dfd 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -221,7 +221,8 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { window[i] = &events[i].Content } ref.count.Add(1) - return client.Send(ref.callback, window) + + return client.Send(ref.customizedCallback(), window) } func (c *asyncClient) getClient() *v2.AsyncClient { @@ -231,7 +232,15 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } -func (r *msgRef) callback(n uint32, err error) { +func (r *msgRef) customizedCallback() func(uint32, error) { + start := time.Now() + + return func(n uint32, err error) { + r.callback(start, n, err) + } +} + +func (r *msgRef) callback(start time.Time, n uint32, err error) { r.client.observer.AckedEvents(int(n)) r.slice = r.slice[n:] r.deadlockListener.ack(int(n)) @@ -246,6 +255,11 @@ func (r *msgRef) callback(n uint32, err error) { r.win.tryGrowWindow(r.batchSize) } } + + // Report the latency for the batch of events + duration := time.Since(start) + r.client.observer.ReportLatency(duration) + r.dec() }