diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/SingleThreadedChunker.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/SingleThreadedChunker.java index 9c8fb8140..532cd546d 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/SingleThreadedChunker.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/SingleThreadedChunker.java @@ -16,6 +16,10 @@ package io.reactivex.mantis.network.push; +import io.mantisrx.common.metrics.Counter; +import io.mantisrx.common.metrics.Metrics; +import io.mantisrx.common.metrics.MetricsRegistry; +import io.mantisrx.common.metrics.spectator.MetricGroupId; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; @@ -33,6 +37,11 @@ public class SingleThreadedChunker implements Callable { int iteration = 0; private int index = 0; + private final Counter numEventsDrained; + private final Counter drainTriggeredByTimer; + private final Counter drainTriggeredByBatch; + + public SingleThreadedChunker(ChunkProcessor processor, MonitoredQueue iQ, int chunkSize, long maxChunkInterval, ConnectionManager connMgr) { this.inputQueue = iQ; this.chunkSize = chunkSize; @@ -41,6 +50,17 @@ public SingleThreadedChunker(ChunkProcessor processor, MonitoredQueue iQ, this.connectionManager = connMgr; chunk = new Object[this.chunkSize]; + MetricGroupId metricsGroup = new MetricGroupId("SingleThreadedChunker"); + Metrics metrics = new Metrics.Builder() + .id(metricsGroup) + .addCounter("numEventsDrained") + .addCounter("drainTriggeredByTimer") + .addCounter("drainTriggeredByBatch") + .build(); + numEventsDrained = metrics.getCounter("numEventsDrained"); + drainTriggeredByTimer = metrics.getCounter("drainTriggeredByTimer"); + drainTriggeredByBatch = metrics.getCounter("drainTriggeredByBatch"); + MetricsRegistry.getInstance().registerAndGet(metrics); } private boolean stopCondition() { @@ -58,6 +78,7 @@ public Void call() throws Exception { long currTime = System.currentTimeMillis(); if (currTime - maxChunkInterval > chunkStartTime) { + drainTriggeredByTimer.increment(); drain(); } iteration = 0; @@ -73,6 +94,7 @@ public Void call() throws Exception { chunk[index++] = ele; } } else { + drainTriggeredByBatch.increment(); drain(); chunkStartTime = System.currentTimeMillis(); if (stopCondition()) { @@ -93,6 +115,7 @@ private void drain() { } processor.process(connectionManager, copy); + numEventsDrained.increment(copy.size()); index = 0; } }