From 29a9085274e720437f7367f5d578b3d0ad1d5e75 Mon Sep 17 00:00:00 2001 From: Ivan Topolnjak Date: Thu, 4 Aug 2022 13:08:32 +0200 Subject: [PATCH] publish gauges with akka cluster members per status (#1189) * publish gauges with cluster members per status * only apply cluster instrumentation on Akka 2.5/2.6 * remove the bintray plugin * publish status and reachability gauges for Cluster members * use backwards-compatible version of Cluster.subscribe(...) * avoid registering member metrics without tags * put the cluster metrics behind a feature flag --- build.sbt | 4 - .../src/common/resources/reference.conf | 15 +- .../akka/AkkaInstrumentation.scala | 7 +- .../ClusterInstrumentation.scala | 174 ++++++++++++++++++ project/plugins.sbt | 1 - 5 files changed, 193 insertions(+), 8 deletions(-) create mode 100644 instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/instrumentations/ClusterInstrumentation.scala diff --git a/build.sbt b/build.sbt index 384d58971..142194096 100644 --- a/build.sbt +++ b/build.sbt @@ -196,7 +196,6 @@ lazy val `kamon-twitter-future` = (project in file("instrumentation/kamon-twitte .enablePlugins(JavaAgent) .settings(instrumentationSettings) .settings( - bintrayPackage := "kamon-futures", libraryDependencies ++= Seq( kanelaAgent % "provided", "com.twitter" %% "util-core" % "20.3.0" % "provided", @@ -211,7 +210,6 @@ lazy val `kamon-scalaz-future` = (project in file("instrumentation/kamon-scalaz- .enablePlugins(JavaAgent) .settings(instrumentationSettings) .settings( - bintrayPackage := "kamon-futures", libraryDependencies ++= Seq( kanelaAgent % "provided", "org.scalaz" %% "scalaz-concurrent" % "7.2.28" % "provided", @@ -226,7 +224,6 @@ lazy val `kamon-scala-future` = (project in file("instrumentation/kamon-scala-fu .enablePlugins(JavaAgent) .settings(instrumentationSettings) .settings( - bintrayPackage := "kamon-futures", libraryDependencies ++=Seq( kanelaAgent % "provided", scalatest % "test", @@ -240,7 +237,6 @@ lazy val 
`kamon-cats-io` = (project in file("instrumentation/kamon-cats-io")) .enablePlugins(JavaAgent) .settings(instrumentationSettings) .settings( - bintrayPackage := "kamon-futures", libraryDependencies ++= Seq( kanelaAgent % "provided", { diff --git a/instrumentation/kamon-akka/src/common/resources/reference.conf b/instrumentation/kamon-akka/src/common/resources/reference.conf index ce6d686c6..be3029c48 100644 --- a/instrumentation/kamon-akka/src/common/resources/reference.conf +++ b/instrumentation/kamon-akka/src/common/resources/reference.conf @@ -122,6 +122,17 @@ kamon.instrumentation.akka { # shards) will be sampled. shard-metrics-sample-interval = ${kamon.metric.tick-interval} } + + cluster { + + # !! EXPERIMENTAL !! + # + # Decides whether to expose the akka.cluster.[members|datacenters] metrics. These metrics are considered + # experimental and must be explicitly enabled until a future release when they graduate to stable. The name of + # this setting might change in the future. + track-cluster-metrics = no + + } } # Signals to akka that it should load KamonRemoteInstrument akka.remote.artery.advanced.instruments += "akka.remote.artery.KamonRemoteInstrument" @@ -144,7 +155,8 @@ kanela.modules { "kamon.instrumentation.akka.instrumentations.akka_25.DispatcherInstrumentation", "kamon.instrumentation.akka.instrumentations.akka_26.DispatcherInstrumentation", "kamon.instrumentation.akka.instrumentations.akka_26.ActorMonitorInstrumentation", - "kamon.instrumentation.akka.instrumentations.SchedulerInstrumentation" + "kamon.instrumentation.akka.instrumentations.SchedulerInstrumentation", + "kamon.instrumentation.akka.instrumentations.ClusterInstrumentation" ] within = [ @@ -152,6 +164,7 @@ kanela.modules { "^akka.event..*", "^akka.actor..*", "^akka.pattern..*", + "^akka.cluster..*", "^akka.routing..*", "kamon.instrumentation.akka.instrumentations..*" ] diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaInstrumentation.scala 
b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaInstrumentation.scala index 4120262eb..22579873b 100644 --- a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaInstrumentation.scala +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/AkkaInstrumentation.scala @@ -84,7 +84,8 @@ object AkkaInstrumentation { autoGrouping: Boolean, allowDoomsdayWildcards: Boolean, safeActorTrackFilter: Filter, - safeActorStartTraceFilter: Filter + safeActorStartTraceFilter: Filter, + exposeClusterMetrics: Boolean ) object Settings { @@ -92,6 +93,7 @@ object AkkaInstrumentation { def from(config: Config): Settings = { val akkaConfig = config.getConfig("kamon.instrumentation.akka") val allowDoomsdayWildcards = akkaConfig.getBoolean("filters.actors.doomsday-wildcard") + val exposeClusterMetrics = akkaConfig.getBoolean("cluster.track-cluster-metrics") val askPatternWarning = akkaConfig.getString("ask-pattern-timeout-warning") match { case "off" => Off @@ -105,7 +107,8 @@ object AkkaInstrumentation { akkaConfig.getBoolean("auto-grouping"), allowDoomsdayWildcards, safeFilter(config.getConfig(TrackActorFilterName), allowDoomsdayWildcards), - safeFilter(config.getConfig(StartTraceActorFilterName), allowDoomsdayWildcards) + safeFilter(config.getConfig(StartTraceActorFilterName), allowDoomsdayWildcards), + exposeClusterMetrics ) } diff --git a/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/instrumentations/ClusterInstrumentation.scala b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/instrumentations/ClusterInstrumentation.scala new file mode 100644 index 000000000..07bff8cb3 --- /dev/null +++ b/instrumentation/kamon-akka/src/common/scala/kamon/instrumentation/akka/instrumentations/ClusterInstrumentation.scala @@ -0,0 +1,174 @@ +package kamon.instrumentation +package akka.instrumentations + +import _root_.akka.actor.{Actor, Address, ExtendedActorSystem, Props} +import 
_root_.akka.cluster.{Cluster, ClusterEvent, MemberStatus} +import kamon.Kamon +import kamon.instrumentation.akka.AkkaInstrumentation +import kamon.metric.Gauge +import kamon.tag.TagSet +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + +import scala.collection.mutable + +class ClusterInstrumentation extends InstrumentationBuilder with VersionFiltering { + + onAkka("2.5", "2.6") { + onType("akka.cluster.Cluster$") + .advise(method("createExtension").and(takesArguments(1)), AfterClusterInitializationAdvice) + } +} + +object AfterClusterInitializationAdvice { + + @Advice.OnMethodExit + def onClusterExtensionCreated(@Advice.Argument(0) system: ExtendedActorSystem, @Advice.Return clusterExtension: Cluster): Unit = { + val settings = AkkaInstrumentation.settings() + if(settings.exposeClusterMetrics) { + val stateExporter = system.systemActorOf(Props[ClusterInstrumentation.ClusterStateExporter], "kamon-cluster-state-exporter") + clusterExtension.subscribe(stateExporter, classOf[ClusterEvent.ClusterDomainEvent]) + } + } +} + +object ClusterInstrumentation { + + class ClusterStateExporter extends Actor { + private val clusterExtension = Cluster(context.system) + private val clusterTags = TagSet.of("akka.system.name", context.system.name) + + private val joiningMembers = ClusterMembersJoining.withTags(clusterTags) + private val weaklyUpMembers = ClusterMembersWeaklyUp.withTags(clusterTags) + private val upMembers = ClusterMembersUp.withTags(clusterTags) + private val leavingMembers = ClusterMembersLeaving.withTags(clusterTags) + private val exitingMembers = ClusterMembersExiting.withTags(clusterTags) + private val downMembers = ClusterMembersDown.withTags(clusterTags) + private val removedMembers = ClusterMembersRemoved.withTags(clusterTags) + private val totalMembers = ClusterMembersTotal.withTags(clusterTags) + private val unreachableMembers = ClusterMembersUnreachable.withTags(clusterTags) + private val 
unreachableDatacenters = ClusterDatacentersUnreachable.withTags(clusterTags) + private val monitoredNodes = mutable.HashMap.empty[Address, (Gauge, Gauge)] + + override def receive: Receive = { + case _: ClusterEvent.ClusterDomainEvent => updateAllStates(clusterExtension.state) + case initialState: ClusterEvent.CurrentClusterState => updateAllStates(initialState) + } + + private def updateAllStates(clusterState: ClusterEvent.CurrentClusterState): Unit = { + val membersPerStatus = clusterState.members.groupBy(_.status) + joiningMembers.update(membersPerStatus.getOrElse(MemberStatus.Joining, Set.empty).size) + weaklyUpMembers.update(membersPerStatus.getOrElse(MemberStatus.WeaklyUp, Set.empty).size) + upMembers.update(membersPerStatus.getOrElse(MemberStatus.Up, Set.empty).size) + leavingMembers.update(membersPerStatus.getOrElse(MemberStatus.Leaving, Set.empty).size) + exitingMembers.update(membersPerStatus.getOrElse(MemberStatus.Exiting, Set.empty).size) + downMembers.update(membersPerStatus.getOrElse(MemberStatus.Down, Set.empty).size) + + val removedMembersCount = membersPerStatus.getOrElse(MemberStatus.Removed, Set.empty).size + val totalMembersCount = clusterState.members.size - removedMembersCount + removedMembers.update(removedMembersCount) + totalMembers.update(totalMembersCount) + + unreachableMembers.update(clusterState.unreachable.size) + unreachableDatacenters.update(clusterState.unreachableDataCenters.size) + + // The status and reachability gauges will only be published for the subset of members that are currently being + // monitored by this node. 
+ val currentlyMonitoredMembers = clusterState.members.filter(m => clusterExtension.failureDetector.isMonitoring(m.address)) + val currentlyMonitoredAddresses = currentlyMonitoredMembers.map { member => + val (statusGauge, reachabilityGauge) = monitoredNodes.getOrElseUpdate(member.address, { + val memberTags = clusterTags.withTag("member", member.address.toString) + + ( + ClusterMemberStatus.withTags(memberTags), + ClusterMemberReachability.withTags(memberTags) + ) + }) + + statusGauge.update(statusToGaugeValue(member.status)) + reachabilityGauge.update(if(clusterState.unreachable(member)) 1D else 0D) + member.address + } + + // Remove any cached Gauges for members that we might not be monitoring anymore + monitoredNodes.keys.filterNot(a => currentlyMonitoredAddresses(a)).foreach { addressToRemove => + monitoredNodes.remove(addressToRemove).foreach { + case (statusGauge, reachabilityGauge) => + statusGauge.remove() + reachabilityGauge.remove() + } + } + } + + private def statusToGaugeValue(memberStatus: MemberStatus): Double = memberStatus match { + case MemberStatus.Joining => 1 + case MemberStatus.WeaklyUp => 2 + case MemberStatus.Up => 3 + case MemberStatus.Leaving => 4 + case MemberStatus.Exiting => 5 + case MemberStatus.Down => 6 + case MemberStatus.Removed => 7 + case _ => 0 // This should never happen, but covering the bases here + } + } + + val ClusterMembersJoining = Kamon.gauge( + name = "akka.cluster.members.joining.count", + description = "Tracks the number of cluster members in the Joining state" + ) + + val ClusterMembersWeaklyUp = Kamon.gauge( + name = "akka.cluster.members.weakly-up.count", + description = "Tracks the number of cluster members in the Weakly-Up state" + ) + + val ClusterMembersUp = Kamon.gauge( + name = "akka.cluster.members.up.count", + description = "Tracks the number of cluster members in the Up state" + ) + + val ClusterMembersLeaving = Kamon.gauge( + name = "akka.cluster.members.leaving.count", + description = "Tracks the number 
of cluster members in the Leaving state" + ) + + val ClusterMembersExiting = Kamon.gauge( + name = "akka.cluster.members.exiting.count", + description = "Tracks the number of cluster members in the Exiting state" + ) + + val ClusterMembersDown = Kamon.gauge( + name = "akka.cluster.members.down.count", + description = "Tracks the number of cluster members in the Down state" + ) + + val ClusterMembersRemoved = Kamon.gauge( + name = "akka.cluster.members.removed.count", + description = "Tracks the number of cluster members in the Removed state" + ) + + val ClusterMembersTotal = Kamon.gauge( + name = "akka.cluster.members.total.count", + description = "Tracks the total number of cluster members, without including Removed members" + ) + + val ClusterMembersUnreachable = Kamon.gauge( + name = "akka.cluster.members.unreachable.count", + description = "Tracks the total number of cluster members marked as unreachable" + ) + + val ClusterDatacentersUnreachable = Kamon.gauge( + name = "akka.cluster.datacenters.unreachable.count", + description = "Tracks the total number of datacenters marked as unreachable" + ) + + val ClusterMemberStatus = Kamon.gauge( + name = "akka.cluster.members.status", + description = "Tracks the current status of all monitored nodes by a cluster member" + ) + + val ClusterMemberReachability = Kamon.gauge( + name = "akka.cluster.members.reachability", + description = "Tracks the current reachability status of all monitored nodes by a cluster member" + ) +} diff --git a/project/plugins.sbt b/project/plugins.sbt index fcc501bef..adb3bc44c 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -8,7 +8,6 @@ addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.10") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.6") addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.9.2") addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4") -addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.4") addSbtPlugin("de.heikoseeberger" % 
"sbt-header" % "5.2.0") addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.8.1") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")