Skip to content

Commit

Permalink
Extract interval method to make it available for recording metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nvollmar committed Feb 22, 2024
1 parent d7b7174 commit a549ce5
Showing 1 changed file with 15 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
package org.apache.pekko.remote

import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import com.typesafe.config.Config

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.event.EventStream
import pekko.event.Logging
import pekko.event.Logging.Warning
Expand Down Expand Up @@ -141,7 +139,6 @@ class PhiAccrualFailureDetector(

@tailrec
final override def heartbeat(): Unit = {

val timestamp = clock()
val oldState = state.get

Expand All @@ -155,15 +152,7 @@ class PhiAccrualFailureDetector(
val interval = timestamp - latestTimestamp
// don't use the first heartbeat after failure for the history, since a long pause will skew the stats
if (isAvailable(timestamp)) {
if (interval >= (acceptableHeartbeatPauseMillis / 3 * 2) && eventStream.isDefined)
eventStream.get.publish(
Warning(
this.toString,
getClass,
s"heartbeat interval is growing too large for address $address: $interval millis",
Logging.emptyMDC,
RemoteLogMarker.failureDetectorGrowing(address)))
oldState.history :+ interval
recordInterval(interval)
} else oldState.history
}

Expand All @@ -174,6 +163,19 @@ class PhiAccrualFailureDetector(
if (!state.compareAndSet(oldState, newState)) heartbeat() // recur
}

@InternalApi
protected def recordInterval(interval: Long): HeartbeatHistory = {
if (interval >= (acceptableHeartbeatPauseMillis / 3 * 2) && eventStream.isDefined)
eventStream.get.publish(
Warning(
this.toString,
getClass,
s"heartbeat interval is growing too large for address $address: $interval millis",
Logging.emptyMDC,
RemoteLogMarker.failureDetectorGrowing(address)))
state.get.history :+ interval
}

/**
* The suspicion level of the accrual failure detector.
*
Expand Down

0 comments on commit a549ce5

Please sign in to comment.