diff --git a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConsistentHashingRouter.java b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConsistentHashingRouter.java index 4a90ab476..f622cb993 100644 --- a/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConsistentHashingRouter.java +++ b/mantis-network/src/main/java/io/reactivex/mantis/network/push/ConsistentHashingRouter.java @@ -16,6 +16,10 @@ package io.reactivex.mantis.network.push; +import io.mantisrx.common.metrics.Counter; +import io.mantisrx.common.metrics.Metrics; +import io.mantisrx.common.metrics.MetricsRegistry; +import io.mantisrx.common.metrics.spectator.MetricGroupId; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -37,12 +41,23 @@ public class ConsistentHashingRouter extends Router> { private static long validCacheAgeMSec = 5000; private HashFunction hashFunction; private AtomicReference>>>> cachedRingRef = new AtomicReference<>(); + private final MetricGroupId metricGroup = new MetricGroupId("ConsistentHashingRouter"); + private final Metrics metrics; + private final Counter collisionsCounter; public ConsistentHashingRouter(String name, Func1, byte[]> dataEncoder, HashFunction hashFunction) { super("ConsistentHashingRouter_" + name, dataEncoder); this.hashFunction = hashFunction; + + Metrics metrics = new Metrics.Builder() + .id(metricGroup) + .addCounter("numHashCollisions") + .build(); + + this.metrics = MetricsRegistry.getInstance().registerAndGet(metrics); + this.collisionsCounter = this.metrics.getCounter("numHashCollisions"); } @Override @@ -98,6 +113,7 @@ private AsyncConnection> lookupConnection(long hash, SortedMa private void computeRing(Set>> connections) { SortedMap>> ring = new TreeMap>>(); + Map> collisions = new HashMap<>(); for (AsyncConnection> connection : connections) { for (int i = 0; i < connectionRepetitionOnRing; i++) { // hash node on ring @@ -108,12 +124,21 @@ private void computeRing(Set>> connections) { byte[] connectionBytes = (connectionId + "-" + i).getBytes(); long hash = hashFunction.computeHash(connectionBytes); if (ring.containsKey(hash)) { - logger.error("Hash collision when computing ring. {} hashed to a value already in the ring.", connectionId + "-" + i); + this.collisionsCounter.increment(); + if(!collisions.containsKey(hash)) { + collisions.put(hash, new ArrayList<>()); + collisions.get(hash).add(ring.get(hash).getSlotId()); + } + + collisions.get(hash).add(connection.toString()); } ring.put(hash, connection); } } cachedRingRef.set(new SnapshotCache>>>(ring)); + if(!collisions.isEmpty()) { + logger.debug("hash collisions were found while recomputing ring: {}", collisions); + } } private SortedMap>> hashConnections(Set>> connections) {