diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala index ef02d20b..f4ea311d 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetricsOf.scala @@ -3,7 +3,7 @@ package com.evolutiongaming.skafka.consumer import cats.effect.{Resource, Sync} import cats.effect.std.UUIDGen import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.{Topic, TopicPartition} +import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition} import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry @@ -11,6 +11,26 @@ import scala.concurrent.duration.FiniteDuration object ConsumerMetricsOf { + /** + * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. + * + * @param sourceOf original [[ConsumerMetrics]] factory + * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix + * @return [[ConsumerMetrics]] that exposes Java Kafka client metrics + */ + def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen]( + sourceOf: ClientId => ConsumerMetrics[F], + prometheus: CollectorRegistry, + prefix: Option[String], + ): Resource[F, ClientId => ConsumerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield { (clientId: ClientId) => + val source = sourceOf(clientId) + consumerMetricsOf(source, registry) + } + /** * Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics. * @@ -26,7 +46,13 @@ object ConsumerMetricsOf { ): Resource[F, ConsumerMetrics[F]] = for { registry <- KafkaMetricsRegistry.of(prometheus, prefix) - } yield new ConsumerMetrics[F] { + } yield consumerMetricsOf(source, registry) + + private def consumerMetricsOf[F[_]]( + source: ConsumerMetrics[F], + registry: KafkaMetricsRegistry[F], + ): ConsumerMetrics[F] = + new ConsumerMetrics[F] { override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] = source.call(name, topic, latency, success) diff --git a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala index ee3ce3a8..b45891f3 100644 --- a/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala +++ b/modules/metrics/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetricsOf.scala @@ -3,7 +3,7 @@ package com.evolutiongaming.skafka.producer import cats.effect.{Resource, Sync} import cats.effect.std.UUIDGen import com.evolutiongaming.catshelper.ToTry -import com.evolutiongaming.skafka.Topic +import com.evolutiongaming.skafka.{ClientId, Topic} import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry import io.prometheus.client.CollectorRegistry @@ -11,6 +11,26 @@ import scala.concurrent.duration.FiniteDuration object ProducerMetricsOf { + /** + * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. + * + * @param sourceOf original [[ProducerMetrics]] factory + * @param prometheus instance of Prometheus registry + * @param prefix metric name prefix + * @return [[ProducerMetrics]] that exposes Java Kafka client metrics + */ + def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen]( + sourceOf: ClientId => ProducerMetrics[F], + prometheus: CollectorRegistry, + prefix: Option[String], + ): Resource[F, ClientId => ProducerMetrics[F]] = + for { + registry <- KafkaMetricsRegistry.of(prometheus, prefix) + } yield { (clientId: ClientId) => + val source = sourceOf(clientId) + producerMetricsOf(source, registry) + } + /** * Construct [[ProducerMetrics]] that will expose Java Kafka client metrics. * @@ -26,7 +46,13 @@ object ProducerMetricsOf { ): Resource[F, ProducerMetrics[F]] = for { registry <- KafkaMetricsRegistry.of(prometheus, prefix) - } yield new ProducerMetrics[F] { + } yield producerMetricsOf(source, registry) + + private def producerMetricsOf[F[_]]( + source: ProducerMetrics[F], + registry: KafkaMetricsRegistry[F], + ): ProducerMetrics[F] = + new ProducerMetrics[F] { override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency) override def beginTransaction: F[Unit] = source.beginTransaction diff --git a/version.sbt b/version.sbt index 80f5c980..41d07dfb 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "16.2.1-SNAPSHOT" +ThisBuild / version := "16.3.0"