Skip to content

Commit

Permalink
Re-implement java kafka metrics collection using registry, kudos to Z…
Browse files Browse the repository at this point in the history
…1kkurat
  • Loading branch information
Denys Fakhritdinov committed Jul 12, 2024
1 parent 1da4c59 commit eac1fdd
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.evolutiongaming.skafka.consumer

import cats.effect.{Resource, Sync}
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition}
import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector
import com.evolutiongaming.skafka.{Topic, TopicPartition}
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import io.prometheus.client.CollectorRegistry

import scala.concurrent.duration.FiniteDuration
Expand All @@ -14,16 +14,18 @@ object ConsumerMetricsOf {
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
* @param source original [[ConsumerMetrics]]
* @param prefix function that provides _unique_ prefix for each client
* @param prefix metric name prefix
* @param prometheus instance of Prometheus registry
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry](
source: ConsumerMetrics[F],
prefix: ClientId => String,
prefix: Option[String],
prometheus: CollectorRegistry
): ConsumerMetrics[F] =
new ConsumerMetrics[F] {
): Resource[F, ConsumerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
source.call(name, topic, latency, success)

Expand All @@ -39,21 +41,12 @@ object ConsumerMetricsOf {
override def topics(latency: FiniteDuration): F[Unit] =
source.topics(latency)

override def exposeJavaMetrics[K, V](
consumer: Consumer[F, K, V],
clientId: Option[ClientId]
): Resource[F, Unit] = {
val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, clientId.map(prefix))
Resource.make {
Sync[F].delay { prometheus.register(collector) }
} { _ =>
Sync[F].delay { prometheus.unregister(collector) }
}
}
override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] =
registry.register(consumer.clientMetrics)

}

implicit final class Syntax[F[_]](val source: ConsumerMetrics[F]) extends AnyVal {
implicit final class ConsumerMetricsOps[F[_]](val source: ConsumerMetrics[F]) extends AnyVal {

/**
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
Expand All @@ -63,9 +56,10 @@ object ConsumerMetricsOf {
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def exposeJavaClientMetrics(
prefix: ClientId => String,
prefix: Option[String],
prometheus: CollectorRegistry
)(implicit F: Sync[F], toTry: ToTry[F]): ConsumerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus)
)(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ConsumerMetrics[F]] =
withJavaClientMetrics(source, prefix, prometheus)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.evolutiongaming.skafka.metrics

import cats.syntax.all._
import cats.effect.syntax.resource._
import cats.effect.{Resource, Ref, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.ClientMetric
import io.prometheus.client.CollectorRegistry

import java.util.UUID

/**
* Allows reporting metrics of multiple Kafka clients inside a single VM.
*
* Example:
* {{{
* val prometheus: CollectorRegistry = ???
* val consumerOf: ConsumerOf[F] = ???
* val producerOf: ProducerOf[F] = ???
*
* for {
* registry <- KafkaMetricsRegistry.of(prometheus)
*
* consumer <- consumerOf(consumerConfig)
* _ <- registry.register(consumer.clientMetrics)
*
* producer <- producerOf(producerConfig)
* _ <- registry.register(producer.clientMetrics)
* } yield ()
* }}}
*/
trait KafkaMetricsRegistry[F[_]] {

/**
* Register a function to obtain a list of client metrics.
* Normally, you would pass
* [[com.evolutiongaming.skafka.consumer.Consumer#clientMetrics]] or
* [[com.evolutiongaming.skafka.producer.Producer#clientMetrics]]
*/
def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit]
}

object KafkaMetricsRegistry {

def of[F[_]: Sync: ToTry: UUIDGen](
prometheus: CollectorRegistry,
prefix: Option[String] = None,
): Resource[F, KafkaMetricsRegistry[F]] =
for {
sources <- Ref[F].of(Map.empty[UUID, F[Seq[ClientMetric[F]]]]).toResource

metrics = sources.get.flatMap { sources =>
sources.toSeq.flatTraverse {
case (uuid, metrics) =>
metrics.map { metrics =>
metrics.map { metric => metric.copy(tags = metric.tags + ("uuid" -> uuid.toString)) }
}
}
}

collector = new KafkaMetricsCollector[F](metrics, prefix)
allocate = Sync[F].delay { prometheus.register(collector) }
release = Sync[F].delay { prometheus.unregister(collector) }

_ <- Resource.make(allocate)(_ => release)
} yield new KafkaMetricsRegistry[F] {

def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] =
for {
uuid <- UUIDGen[F].randomUUID.toResource

allocate = sources.update { sources => sources.updated(uuid, metrics) }
release = sources.update { sources => sources - uuid }

_ <- Resource.make(allocate)(_ => release)
} yield {}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.evolutiongaming.skafka.producer

import cats.effect.{Resource, Sync}
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.{ClientId, Topic}
import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector
import com.evolutiongaming.skafka.Topic
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import io.prometheus.client.CollectorRegistry

import scala.concurrent.duration.FiniteDuration
Expand All @@ -14,16 +14,18 @@ object ProducerMetricsOf {
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param source original [[ProducerMetrics]]
* @param prefix function that provides _unique_ prefix for each client
* @param prefix metric name prefix
* @param prometheus instance of Prometheus registry
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry](
source: ProducerMetrics[F],
prefix: ClientId => String,
prefix: Option[String],
prometheus: CollectorRegistry
): ProducerMetrics[F] =
new ProducerMetrics[F] {
): Resource[F, ProducerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ProducerMetrics[F] {
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)

override def beginTransaction: F[Unit] = source.beginTransaction
Expand All @@ -44,30 +46,25 @@ object ProducerMetricsOf {

override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency)

override def exposeJavaMetrics(producer: Producer[F], clientId: Option[ClientId]): Resource[F, Unit] = {
val collector = new KafkaMetricsCollector[F](producer.clientMetrics, clientId.map(prefix))
Resource.make {
Sync[F].delay { prometheus.register(collector) }
} { _ =>
Sync[F].delay { prometheus.unregister(collector) }
}
}
override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] =
registry.register(producer.clientMetrics)

}

implicit final class Syntax[F[_]](val source: ProducerMetrics[F]) extends AnyVal {
implicit final class ProducerMetricsOps[F[_]](val source: ProducerMetrics[F]) extends AnyVal {

/**
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param prefix function that provides _unique_ prefix for each client
* @param prefix metric name prefix
* @param prometheus instance of Prometheus registry
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def exposeJavaClientMetrics(
prefix: ClientId => String,
prefix: Option[String],
prometheus: CollectorRegistry
)(implicit F: Sync[F], toTry: ToTry[F]): ProducerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus)
)(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ProducerMetrics[F]] =
withJavaClientMetrics(source, prefix, prometheus)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ trait ConsumerMetrics[F[_]] {

def topics(latency: FiniteDuration): F[Unit]

private[consumer] def exposeJavaMetrics[K, V](
@nowarn consumer: Consumer[F, K, V],
@nowarn clientId: Option[ClientId],
): Resource[F, Unit] = Resource.unit[F]
private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] =
Resource.unit[F]
}

object ConsumerMetrics {
Expand Down Expand Up @@ -220,8 +218,8 @@ object ConsumerMetrics {
fg(self.topics(latency))
}

override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V], clientId: Option[ClientId]) =
self.exposeJavaMetrics(consumer.mapK(gf, fg), clientId).mapK(fg)
override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) =
self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object ConsumerOf {

case Some(metrics) =>
for {
_ <- metrics.exposeJavaMetrics[K, V](consumer, config.common.clientId)
_ <- metrics.exposeJavaMetrics[K, V](consumer)
} yield {
consumer.withMetrics1[Throwable](metrics)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ trait ProducerMetrics[F[_]] {

def flush(latency: FiniteDuration): F[Unit]

private[producer] def exposeJavaMetrics(
@nowarn producer: Producer[F],
@nowarn clientId: Option[ClientId],
): Resource[F, Unit] = Resource.unit[F]
private[producer] def exposeJavaMetrics(@nowarn producer: Producer[F]): Resource[F, Unit] = Resource.unit[F]
}

object ProducerMetrics {
Expand Down Expand Up @@ -233,8 +230,8 @@ object ProducerMetrics {

def flush(latency: FiniteDuration) = fg(self.flush(latency))

override def exposeJavaMetrics(producer: Producer[G], clientId: Option[ClientId]) =
self.exposeJavaMetrics(producer.mapK[F](gf, fg), clientId).mapK(fg)
override def exposeJavaMetrics(producer: Producer[G]) =
self.exposeJavaMetrics(producer.mapK[F](gf, fg)).mapK(fg)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object ProducerOf {

case Some(metrics) =>
for {
_ <- metrics.exposeJavaMetrics(producer, config.common.clientId)
_ <- metrics.exposeJavaMetrics(producer)
} yield {
producer.withMetrics[Throwable](metrics)
}
Expand Down

0 comments on commit eac1fdd

Please sign in to comment.