diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 760066b2f7c3..4abef08d1dfd 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -221,7 +221,8 @@ func (c *asyncClient) sendEvents(ref *msgRef, events []publisher.Event) error { window[i] = &events[i].Content } ref.count.Add(1) - return client.Send(ref.callback, window) + + return client.Send(ref.customizedCallback(), window) } func (c *asyncClient) getClient() *v2.AsyncClient { @@ -231,7 +232,15 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } -func (r *msgRef) callback(n uint32, err error) { +func (r *msgRef) customizedCallback() func(uint32, error) { + start := time.Now() + + return func(n uint32, err error) { + r.callback(start, n, err) + } +} + +func (r *msgRef) callback(start time.Time, n uint32, err error) { r.client.observer.AckedEvents(int(n)) r.slice = r.slice[n:] r.deadlockListener.ack(int(n)) @@ -246,6 +255,11 @@ func (r *msgRef) callback(n uint32, err error) { r.win.tryGrowWindow(r.batchSize) } } + + // Report the latency for the batch of events + duration := time.Since(start) + r.client.observer.ReportLatency(duration) + r.dec() }