Skip to content

Commit

Permalink
Rework PhiAccrualFailureDetector to enable monitoring of interval (ap…
Browse files Browse the repository at this point in the history
…ache#1137)

* Introduce trait to avoid matching concrete subclass
* Extract interval method to make it available for recording metrics

---------
Co-authored-by: Matthew de Detrich <matthew.dedetrich@aiven.io>
  • Loading branch information
nvollmar committed Feb 26, 2024
1 parent 6c57720 commit ff5bfbc
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

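# PhiAccrualFailureDetector.address was private[pekko] and is now private (set via setAddress),
# so its generated setter is no longer part of the binary API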
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.remote.PhiAccrualFailureDetector.address_=")
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.{ Lock, ReentrantLock }

import scala.annotation.tailrec
import scala.collection.immutable.Map

/**
* A lock-less thread-safe implementation of [[org.apache.pekko.remote.FailureDetectorRegistry]].
Expand Down Expand Up @@ -59,8 +58,8 @@ class DefaultFailureDetectorRegistry[A](detectorFactory: () => FailureDetector)

// address below was introduced as a var because of binary compatibility constraints
newDetector match {
case phi: PhiAccrualFailureDetector => phi.address = resource.toString
case _ =>
case dwa: FailureDetectorWithAddress => dwa.setAddress(resource.toString)
case _ =>
}

newDetector.heartbeat()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ trait FailureDetector {

}

/**
 * Capability of a failure detector that can be told the address of the host it observes.
 *
 * This trait declares only the setter; it does not itself extend `FailureDetector`, so an
 * implementation mixes it in alongside its detector type. Callers can match on this trait
 * instead of on a concrete detector class.
 */
trait FailureDetectorWithAddress {

  /**
   * Address of observed host will be set after detector creation.
   *
   * @param addr textual representation of the observed host's address
   */
  def setAddress(addr: String): Unit
}

object FailureDetector {

/**
Expand All @@ -50,6 +58,6 @@ object FailureDetector {
abstract class Clock extends (() => Long)

implicit val defaultClock: Clock = new Clock {
def apply() = NANOSECONDS.toMillis(System.nanoTime)
def apply(): Long = NANOSECONDS.toMillis(System.nanoTime)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
package org.apache.pekko.remote

import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration

import com.typesafe.config.Config

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.event.EventStream
import pekko.event.Logging
import pekko.event.Logging.Warning
Expand Down Expand Up @@ -73,7 +71,7 @@ class PhiAccrualFailureDetector(
eventStream: Option[EventStream])(
implicit
clock: Clock)
extends FailureDetector {
extends FailureDetector with FailureDetectorWithAddress {

/**
* Constructor without eventStream to support backwards compatibility
Expand Down Expand Up @@ -118,8 +116,9 @@ class PhiAccrualFailureDetector(

private val acceptableHeartbeatPauseMillis = acceptableHeartbeatPause.toMillis

// address below was introduced as a var because of binary compatibility constraints
private[pekko] var address: String = "N/A"
// NOTE: address below was introduced as a var because of binary compatibility constraints
private var address: String = "N/A"
/**
 * Stores the address of the observed host.
 *
 * The stored value is interpolated into the "heartbeat interval is growing too large"
 * warning and passed to its log marker.
 *
 * @param addr textual representation of the observed host's address
 */
def setAddress(addr: String): Unit = {
  address = addr
}

/**
* Implement using optimistic lockless concurrency, all state is represented
Expand All @@ -139,7 +138,6 @@ class PhiAccrualFailureDetector(

@tailrec
final override def heartbeat(): Unit = {

val timestamp = clock()
val oldState = state.get

Expand All @@ -152,17 +150,10 @@ class PhiAccrualFailureDetector(
// this is a known connection
val interval = timestamp - latestTimestamp
// don't use the first heartbeat after failure for the history, since a long pause will skew the stats
if (isAvailable(timestamp)) {
if (interval >= (acceptableHeartbeatPauseMillis / 3 * 2) && eventStream.isDefined)
eventStream.get.publish(
Warning(
this.toString,
getClass,
s"heartbeat interval is growing too large for address $address: $interval millis",
Logging.emptyMDC,
RemoteLogMarker.failureDetectorGrowing(address)))
oldState.history :+ interval
} else oldState.history
if (isAvailable(timestamp))
recordInterval(interval)
else
oldState.history
}

// record new timestamp and possibly-amended history
Expand All @@ -172,6 +163,19 @@ class PhiAccrualFailureDetector(
if (!state.compareAndSet(oldState, newState)) heartbeat() // recur
}

/**
 * INTERNAL API
 *
 * Returns the heartbeat history currently held in `state` with `interval` appended.
 *
 * As a side effect, when `interval` has reached the warning threshold (two thirds of the
 * acceptable heartbeat pause) and an event stream was supplied, a `Warning` naming the
 * observed address is published to that stream.
 *
 * `heartbeat()` calls this inside its compare-and-set retry loop, so it may run more than
 * once for a single heartbeat when the state is updated concurrently.
 *
 * @param interval time elapsed since the previous heartbeat (reported as millis in the warning)
 * @return the current history extended with `interval`
 */
@InternalApi
protected def recordInterval(interval: Long): HeartbeatHistory = {
  // The division is evaluated before the multiplication; presumably on integral millis,
  // so the threshold is truncated rather than rounded.
  val warningThresholdMillis = acceptableHeartbeatPauseMillis / 3 * 2
  if (interval >= warningThresholdMillis)
    // foreach replaces the isDefined/get pair: nothing is built or published without a stream
    eventStream.foreach { stream =>
      stream.publish(
        Warning(
          this.toString,
          getClass,
          s"heartbeat interval is growing too large for address $address: $interval millis",
          Logging.emptyMDC,
          RemoteLogMarker.failureDetectorGrowing(address)))
    }
  state.get.history :+ interval
}

/**
* The suspicion level of the accrual failure detector.
*
Expand Down

0 comments on commit ff5bfbc

Please sign in to comment.