From 99570ee625f837b66eeeb25f99a9122ba86f9219 Mon Sep 17 00:00:00 2001 From: Tim Martin Date: Tue, 7 Jan 2025 17:52:31 -0500 Subject: [PATCH] Add metrics to SingleThreadedChunker Using a similar set of metrics to what the `TimedChunker` uses. It's helpful for diagnosing queue depth issues. When we switched to the SPSC queue, we were surprised the metrics were missing so I figured we could add them in. --- .../network/push/SingleThreadedChunker.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/SingleThreadedChunker.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/SingleThreadedChunker.java index 9c8fb8140..532cd546d 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/SingleThreadedChunker.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/SingleThreadedChunker.java @@ -16,6 +16,10 @@ package io.reactivex.mantis.network.push; +import io.mantisrx.common.metrics.Counter; +import io.mantisrx.common.metrics.Metrics; +import io.mantisrx.common.metrics.MetricsRegistry; +import io.mantisrx.common.metrics.spectator.MetricGroupId; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -33,6 +37,11 @@ public class SingleThreadedChunker implements Callable { int iteration = 0; private int index = 0; + private final Counter numEventsDrained; + private final Counter drainTriggeredByTimer; + private final Counter drainTriggeredByBatch; + + public SingleThreadedChunker(ChunkProcessor processor, MonitoredQueue iQ, int chunkSize, long maxChunkInterval, ConnectionManager connMgr) { this.inputQueue = iQ; this.chunkSize = chunkSize; @@ -41,6 +50,17 @@ public SingleThreadedChunker(ChunkProcessor processor, MonitoredQueue iQ, this.connectionManager = connMgr; chunk = new Object[this.chunkSize]; + MetricGroupId metricsGroup = new MetricGroupId("SingleThreadedChunker"); + Metrics metrics = new Metrics.Builder() + .id(metricsGroup) + .addCounter("numEventsDrained") + .addCounter("drainTriggeredByTimer") + .addCounter("drainTriggeredByBatch") + .build(); + numEventsDrained = metrics.getCounter("numEventsDrained"); + drainTriggeredByTimer = metrics.getCounter("drainTriggeredByTimer"); + drainTriggeredByBatch = metrics.getCounter("drainTriggeredByBatch"); + MetricsRegistry.getInstance().registerAndGet(metrics); } private boolean stopCondition() { @@ -58,6 +78,7 @@ public Void call() throws Exception { long currTime = System.currentTimeMillis(); if (currTime - maxChunkInterval > chunkStartTime) { + drainTriggeredByTimer.increment(); drain(); } iteration = 0; @@ -73,6 +94,7 @@ public Void call() throws Exception { chunk[index++] = ele; } } else { + drainTriggeredByBatch.increment(); drain(); chunkStartTime = System.currentTimeMillis(); if (stopCondition()) { @@ -93,6 +115,7 @@ private void drain() { } processor.process(connectionManager, copy); + numEventsDrained.increment(copy.size()); index = 0; } }