Skip to content

Commit

Permalink
Extract interval method to make it available for recording metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nvollmar committed Feb 21, 2024
1 parent d7b7174 commit 5957aa1
Showing 1 changed file with 18 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
package org.apache.pekko.remote

import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import com.typesafe.config.Config

import org.apache.pekko
import org.apache.pekko.annotation.InternalApi
import pekko.event.EventStream
import pekko.event.Logging
import pekko.event.Logging.Warning
Expand Down Expand Up @@ -141,7 +139,6 @@ class PhiAccrualFailureDetector(

@tailrec
final override def heartbeat(): Unit = {

val timestamp = clock()
val oldState = state.get

Expand All @@ -155,16 +152,10 @@ class PhiAccrualFailureDetector(
val interval = timestamp - latestTimestamp
// don't use the first heartbeat after failure for the history, since a long pause will skew the stats
if (isAvailable(timestamp)) {
if (interval >= (acceptableHeartbeatPauseMillis / 3 * 2) && eventStream.isDefined)
eventStream.get.publish(
Warning(
this.toString,
getClass,
s"heartbeat interval is growing too large for address $address: $interval millis",
Logging.emptyMDC,
RemoteLogMarker.failureDetectorGrowing(address)))
oldState.history :+ interval
} else oldState.history
recordInterval(interval)
} else {
oldState.history
}
}

// record new timestamp and possibly-amended history
Expand All @@ -174,6 +165,19 @@ class PhiAccrualFailureDetector(
if (!state.compareAndSet(oldState, newState)) heartbeat() // recur
}

@InternalApi
protected def recordInterval(interval: Long): HeartbeatHistory = {
if (interval >= (acceptableHeartbeatPauseMillis / 3 * 2) && eventStream.isDefined)
eventStream.get.publish(
Warning(
this.toString,
getClass,
s"heartbeat interval is growing too large for address $address: $interval millis",
Logging.emptyMDC,
RemoteLogMarker.failureDetectorGrowing(address)))
state.get.history :+ interval
}

/**
* The suspicion level of the accrual failure detector.
*
Expand Down

0 comments on commit 5957aa1

Please sign in to comment.