From eac1fdd89d13dcebda2afe05e9523a13ca96a671 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Fri, 12 Jul 2024 16:12:04 +0200 Subject: [PATCH] Re-implement java kafka metrics collection using registry, kudos to Z1kkurat --- .../skafka/consumer/ConsumerMetricsOf.scala | 34 ++++---- .../skafka/metrics/KafkaMetricsRegistry.scala | 80 +++++++++++++++++++ .../skafka/producer/ProducerMetricsOf.scala | 33 ++++---- .../skafka/consumer/ConsumerMetrics.scala | 10 +-- .../skafka/consumer/ConsumerOf.scala | 2 +- .../skafka/producer/ProducerMetrics.scala | 9 +-- .../skafka/producer/ProducerOf.scala | 2 +- 7 files changed, 118 insertions(+), 52 deletions(-) create mode 100644 modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 91b5c709..e0709fb5 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -2,8 +2,8 @@ package com.evolutiongaming.skafka.consumer import cats.effect.{Resource, Sync} import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition} -import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector +import com.evolutiongaming.skafka.{Topic, TopicPartition} +import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry import scala.concurrent.duration.FiniteDuration @@ -14,16 +14,18 @@ object ConsumerMetricsOf { * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. * * @param source original [[ConsumerMetrics]] - * @param prefix function that provides _unique_ prefix for each client + * @param prefix metric name prefix * @param prometheus instance of Prometheus registry * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics */ def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ConsumerMetrics[F], - prefix: ClientId => String, + prefix: Option[String], prometheus: CollectorRegistry - ): ConsumerMetrics[F] = - new ConsumerMetrics[F] { + ): Resource[F, ConsumerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield new ConsumerMetrics[F] { override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = source.call(name, topic, latency, success) @@ -39,21 +41,12 @@ object ConsumerMetricsOf { override def topics(latency: FiniteDuration): F[Unit] = source.topics(latency) - override def exposeJavaMetrics[K, V]( - consumer: Consumer[F, K, V], - clientId: Option[ClientId] - ): Resource[F, Unit] = { - val collector = new KafkaMetricsCollector[F](consumer.clientMetrics, clientId.map(prefix)) - Resource.make { - Sync[F].delay { prometheus.register(collector) } - } { _ => - Sync[F].delay { prometheus.unregister(collector) } - } - } + override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = + registry.register(consumer.clientMetrics) } - implicit final class Syntax[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { + implicit final class ConsumerMetricsOps[F[_]](val source: ConsumerMetrics[F]) extends AnyVal { /** * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. @@ -63,9 +56,10 @@ object ConsumerMetricsOf { * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics */ def exposeJavaClientMetrics( - prefix: ClientId => String, + prefix: Option[String], prometheus: CollectorRegistry - )(implicit F: Sync[F], toTry: ToTry[F]): ConsumerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) + )(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ConsumerMetrics[F]] = + withJavaClientMetrics(source, prefix, prometheus) } } diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala new file mode 100644 index 00000000..0d55dc49 --- /dev/null +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/metrics/KafkaMetricsRegistry.scala @@ -0,0 +1,80 @@ +package com.evolutiongaming.skafka.metrics + +import cats.syntax.all._ +import cats.effect.syntax.resource._ +import cats.effect.{Resource, Ref, Sync} +import cats.effect.std.UUIDGen +import com.evolutiongaming.catshelper.ToTry +import com.evolutiongaming.skafka.ClientMetric +import io.prometheus.client.CollectorRegistry + +import java.util.UUID + +/** + * Allows reporting metrics of multiple Kafka clients inside a single VM. + * + * Example: + * {{{ + * val prometheus: CollectorRegistry = ??? + * val consumerOf: ConsumerOf[F] = ??? + * val producerOf: ProducerOf[F] = ??? + * + * for { + * registry <- KafkaMetricsRegistry.of(prometheus) + * + * consumer <- consumerOf(consumerConfig) + * _ <- registry.register(consumer.clientMetrics) + * + * producer <- producerOf(producerConfig) + * _ <- registry.register(producer.clientMetrics) + * } yield () + * }}} + */ +trait KafkaMetricsRegistry[F[_]] { + + /** + * Register a function to obtain a list of client metrics. + * Normally, you would pass + * [[com.evolutiongaming.skafka.consumer.Consumer#clientMetrics]] or + * [[com.evolutiongaming.skafka.producer.Producer#clientMetrics]] + */ + def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] +} + +object KafkaMetricsRegistry { + + def of[F[_]: Sync: ToTry: UUIDGen]( + prometheus: CollectorRegistry, + prefix: Option[String] = None, + ): Resource[F, KafkaMetricsRegistry[F]] = + for { + sources <- Ref[F].of(Map.empty[UUID, F[Seq[ClientMetric[F]]]]).toResource + + metrics = sources.get.flatMap { sources => + sources.toSeq.flatTraverse { + case (uuid, metrics) => + metrics.map { metrics => + metrics.map { metric => metric.copy(tags = metric.tags + ("uuid" -> uuid.toString)) } + } + } + } + + collector = new KafkaMetricsCollector[F](metrics, prefix) + allocate = Sync[F].delay { prometheus.register(collector) } + release = Sync[F].delay { prometheus.unregister(collector) } + + _ <- Resource.make(allocate)(_ => release) + } yield new KafkaMetricsRegistry[F] { + + def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] = + for { + uuid <- UUIDGen[F].randomUUID.toResource + + allocate = sources.update { sources => sources.updated(uuid, metrics) } + release = sources.update { sources => sources - uuid } + + _ <- Resource.make(allocate)(_ => release) + } yield {} + } + +} diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index 2e7fd62f..6efba510 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -2,8 +2,8 @@ package com.evolutiongaming.skafka.producer import cats.effect.{Resource, Sync} import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.{ClientId, Topic} -import com.evolutiongaming.skafka.metrics.KafkaMetricsCollector +import com.evolutiongaming.skafka.Topic +import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry import scala.concurrent.duration.FiniteDuration @@ -14,16 +14,18 @@ object ProducerMetricsOf { * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. * * @param source original [[ProducerMetrics]] - * @param prefix function that provides _unique_ prefix for each client + * @param prefix metric name prefix * @param prometheus instance of Prometheus registry * @return [[ProducerMetrics]] that exposes Java Kafka client metrics */ def withJavaClientMetrics[F[_]: Sync: ToTry]( source: ProducerMetrics[F], - prefix: ClientId => String, + prefix: Option[String], prometheus: CollectorRegistry - ): ProducerMetrics[F] = - new ProducerMetrics[F] { + ): Resource[F, ProducerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield new ProducerMetrics[F] { override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) override def beginTransaction: F[Unit] = source.beginTransaction @@ -44,30 +46,25 @@ object ProducerMetricsOf { override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) - override def exposeJavaMetrics(producer: Producer[F], clientId: Option[ClientId]): Resource[F, Unit] = { - val collector = new KafkaMetricsCollector[F](producer.clientMetrics, clientId.map(prefix)) - Resource.make { - Sync[F].delay { prometheus.register(collector) } - } { _ => - Sync[F].delay { prometheus.unregister(collector) } - } - } + override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = + registry.register(producer.clientMetrics) } - implicit final class Syntax[F[_]](val source: ProducerMetrics[F]) extends AnyVal { + implicit final class ProducerMetricsOps[F[_]](val source: ProducerMetrics[F]) extends AnyVal { /** * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. * - * @param prefix function that provides _unique_ prefix for each client + * @param prefix metric name prefix * @param prometheus instance of Prometheus registry * @return [[ProducerMetrics]] that exposes Java Kafka client metrics */ def exposeJavaClientMetrics( - prefix: ClientId => String, + prefix: Option[String], prometheus: CollectorRegistry - )(implicit F: Sync[F], toTry: ToTry[F]): ProducerMetrics[F] = withJavaClientMetrics(source, prefix, prometheus) + )(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ProducerMetrics[F]] = + withJavaClientMetrics(source, prefix, prometheus) } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala index 3c30488f..a2d38504 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala @@ -22,10 +22,8 @@ trait ConsumerMetrics[F[_]] { def topics(latency: FiniteDuration): F[Unit] - private[consumer] def exposeJavaMetrics[K, V]( - @nowarn consumer: Consumer[F, K, V], - @nowarn clientId: Option[ClientId], - ): Resource[F, Unit] = Resource.unit[F] + private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] = + Resource.unit[F] } object ConsumerMetrics { @@ -220,8 +218,8 @@ object ConsumerMetrics { fg(self.topics(latency)) } - override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V], clientId: Option[ClientId]) = - self.exposeJavaMetrics(consumer.mapK(gf, fg), clientId).mapK(fg) + override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) = + self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg) } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala index 1a6911a4..c9a97662 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerOf.scala @@ -39,7 +39,7 @@ object ConsumerOf { case Some(metrics) => for { - _ <- metrics.exposeJavaMetrics[K, V](consumer, config.common.clientId) + _ <- metrics.exposeJavaMetrics[K, V](consumer) } yield { consumer.withMetrics1[Throwable](metrics) } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala index 6911337a..8a1610e0 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala @@ -32,10 +32,7 @@ trait ProducerMetrics[F[_]] { def flush(latency: FiniteDuration): F[Unit] - private[producer] def exposeJavaMetrics( - @nowarn producer: Producer[F], - @nowarn clientId: Option[ClientId], - ): Resource[F, Unit] = Resource.unit[F] + private[producer] def exposeJavaMetrics(@nowarn producer: Producer[F]): Resource[F, Unit] = Resource.unit[F] } object ProducerMetrics { @@ -233,8 +230,8 @@ object ProducerMetrics { def flush(latency: FiniteDuration) = fg(self.flush(latency)) - override def exposeJavaMetrics(producer: Producer[G], clientId: Option[ClientId]) = - self.exposeJavaMetrics(producer.mapK[F](gf, fg), clientId).mapK(fg) + override def exposeJavaMetrics(producer: Producer[G]) = + self.exposeJavaMetrics(producer.mapK[F](gf, fg)).mapK(fg) } } } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala index de5b4547..e431616c 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerOf.scala @@ -33,7 +33,7 @@ object ProducerOf { case Some(metrics) => for { - _ <- metrics.exposeJavaMetrics(producer, config.common.clientId) + _ <- metrics.exposeJavaMetrics(producer) } yield { producer.withMetrics[Throwable](metrics) }