From 2157f976b3d2a5257efedd9498a3f3d5c046afdd Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Sun, 21 Jul 2024 11:18:41 +0200 Subject: [PATCH 1/3] Expose Java client metrics via factory from ClientId --- .../skafka/consumer/ConsumerMetricsOf.scala | 40 +++++++++++++++ .../skafka/producer/ProducerMetricsOf.scala | 49 ++++++++++++++++++- version.sbt | 2 +- 3 files changed, 89 insertions(+), 2 deletions(-) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index ef02d20b..7c6cb0a1 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -8,9 +8,49 @@ import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry import scala.concurrent.duration.FiniteDuration +import com.evolutiongaming.skafka.ClientId object ConsumerMetricsOf { + /** + * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. + * + * @param sourceOf original [[ConsumerMetrics]] factory + * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix + * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics + */ + def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen]( + sourceOf: ClientId => ConsumerMetrics[F], + prometheus: CollectorRegistry, + prefix: Option[String], + ): Resource[F, ClientId => ConsumerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield { (clientId: ClientId) => + val source = sourceOf(clientId) + + new ConsumerMetrics[F] { + override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = + source.call(name, topic, latency, success) + + override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = + source.poll(topic, bytes, records, age) + + override def count(name: String, topic: Topic): F[Unit] = + source.count(name, topic) + + override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = + source.rebalance(name, topicPartition) + + override def topics(latency: FiniteDuration): F[Unit] = + source.topics(latency) + + override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = + registry.register(consumer.clientMetrics) + } + } + /** * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. * diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index ee3ce3a8..c7870879 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -3,7 +3,7 @@ package com.evolutiongaming.skafka.producer import cats.effect.{Resource, Sync} import cats.effect.std.UUIDGen import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.Topic +import com.evolutiongaming.skafka.{ClientId, Topic} import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry @@ -11,6 +11,53 @@ import scala.concurrent.duration.FiniteDuration object ProducerMetricsOf { + /** + * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. + * + * @param sourceOf original [[ProducerMetrics]] factory + * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix + * @return [[ProducerMetrics]] that exposes Java Kafka client metrics + */ + def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen]( + sourceOf: ClientId => ProducerMetrics[F], + prometheus: CollectorRegistry, + prefix: Option[String], + ): Resource[F, ClientId => ProducerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield { (clientId: ClientId) => + val source = sourceOf(clientId) + + new ProducerMetrics[F] { + override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) + + override def beginTransaction: F[Unit] = source.beginTransaction + + override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] = + source.sendOffsetsToTransaction(latency) + + override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency) + + override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency) + + override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] = + source.send(topic, latency, bytes) + + override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency) + + override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency) + + override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency) + + override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) + + override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = + registry.register(producer.clientMetrics) + + } + } + /** * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. * diff --git a/version.sbt b/version.sbt index 80f5c980..41d07dfb 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "16.2.1-SNAPSHOT" +ThisBuild / version := "16.3.0" From 941e551753e78995b6d286420699db129bf25030 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Sun, 21 Jul 2024 22:08:01 +0200 Subject: [PATCH 2/3] imports refactoring --- .../evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 7c6cb0a1..4b21c489 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -3,12 +3,11 @@ package com.evolutiongaming.skafka.consumer import cats.effect.{Resource, Sync} import cats.effect.std.UUIDGen import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.{Topic, TopicPartition} +import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition} import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry import scala.concurrent.duration.FiniteDuration -import com.evolutiongaming.skafka.ClientId object ConsumerMetricsOf { From 087e9e7b027cd6c6c3b2f656b6b18da42f359f49 Mon Sep 17 00:00:00 2001 From: Denys Fakhritdinov Date: Wed, 24 Jul 2024 11:27:42 +0200 Subject: [PATCH 3/3] extract consumer/producer metrics creation in separate method --- .../skafka/consumer/ConsumerMetricsOf.scala | 29 ++++----------- .../skafka/producer/ProducerMetricsOf.scala | 37 ++++--------------- 2 files changed, 16 insertions(+), 50 deletions(-) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index 4b21c489..f4ea311d 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -28,26 +28,7 @@ object ConsumerMetricsOf { registry <- KafkaMetricsRegistry.of(prometheus, prefix) } yield { (clientId: ClientId) => val source = sourceOf(clientId) - - new ConsumerMetrics[F] { - override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = - source.call(name, topic, latency, success) - - override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] = - source.poll(topic, bytes, records, age) - - override def count(name: String, topic: Topic): F[Unit] = - source.count(name, topic) - - override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] = - source.rebalance(name, topicPartition) - - override def topics(latency: FiniteDuration): F[Unit] = - source.topics(latency) - - override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] = - registry.register(consumer.clientMetrics) - } + consumerMetricsOf(source, registry) } /** @@ -65,7 +46,13 @@ object ConsumerMetricsOf { ): Resource[F, ConsumerMetrics[F]] = for { registry <- KafkaMetricsRegistry.of(prometheus, prefix) - } yield new ConsumerMetrics[F] { + } yield consumerMetricsOf(source, registry) + + private def consumerMetricsOf[F[_]]( + source: ConsumerMetrics[F], + registry: KafkaMetricsRegistry[F], + ): ConsumerMetrics[F] = + new ConsumerMetrics[F] { override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = source.call(name, topic, latency, success) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index c7870879..b45891f3 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -28,34 +28,7 @@ object ProducerMetricsOf { registry <- KafkaMetricsRegistry.of(prometheus, prefix) } yield { (clientId: ClientId) => val source = sourceOf(clientId) - - new ProducerMetrics[F] { - override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) - - override def beginTransaction: F[Unit] = source.beginTransaction - - override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] = - source.sendOffsetsToTransaction(latency) - - override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency) - - override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency) - - override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] = - source.send(topic, latency, bytes) - - override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency) - - override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency) - - override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency) - - override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency) - - override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] = - registry.register(producer.clientMetrics) - - } + producerMetricsOf(source, registry) } /** @@ -73,7 +46,13 @@ object ProducerMetricsOf { ): Resource[F, ProducerMetrics[F]] = for { registry <- KafkaMetricsRegistry.of(prometheus, prefix) - } yield new ProducerMetrics[F] { + } yield producerMetricsOf(source, registry) + + private def producerMetricsOf[F[_]]( + source: ProducerMetrics[F], + registry: KafkaMetricsRegistry[F], + ): ProducerMetrics[F] = + new ProducerMetrics[F] { override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) override def beginTransaction: F[Unit] = source.beginTransaction