Skip to content

Commit

Permalink
Merge pull request #433 from evolution-gaming/df/java-client-metrics-of
Browse files Browse the repository at this point in the history
Expose Java client metrics via factory from ClientId
  • Loading branch information
dfakhritdinov committed Jul 24, 2024
2 parents 004160d + 087e9e7 commit 6c8dfd6
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,34 @@ package com.evolutiongaming.skafka.consumer
import cats.effect.{Resource, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.{Topic, TopicPartition}
import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition}
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import io.prometheus.client.CollectorRegistry

import scala.concurrent.duration.FiniteDuration

object ConsumerMetricsOf {

/**
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
* @param sourceOf original [[ConsumerMetrics]] factory
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
sourceOf: ClientId => ConsumerMetrics[F],
prometheus: CollectorRegistry,
prefix: Option[String],
): Resource[F, ClientId => ConsumerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield { (clientId: ClientId) =>
val source = sourceOf(clientId)
consumerMetricsOf(source, registry)
}

/**
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
Expand All @@ -26,7 +46,13 @@ object ConsumerMetricsOf {
): Resource[F, ConsumerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ConsumerMetrics[F] {
} yield consumerMetricsOf(source, registry)

private def consumerMetricsOf[F[_]](
source: ConsumerMetrics[F],
registry: KafkaMetricsRegistry[F],
): ConsumerMetrics[F] =
new ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
source.call(name, topic, latency, success)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,34 @@ package com.evolutiongaming.skafka.producer
import cats.effect.{Resource, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.Topic
import com.evolutiongaming.skafka.{ClientId, Topic}
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import io.prometheus.client.CollectorRegistry

import scala.concurrent.duration.FiniteDuration

object ProducerMetricsOf {

/**
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param sourceOf original [[ProducerMetrics]] factory
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
sourceOf: ClientId => ProducerMetrics[F],
prometheus: CollectorRegistry,
prefix: Option[String],
): Resource[F, ClientId => ProducerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield { (clientId: ClientId) =>
val source = sourceOf(clientId)
producerMetricsOf(source, registry)
}

/**
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
Expand All @@ -26,7 +46,13 @@ object ProducerMetricsOf {
): Resource[F, ProducerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ProducerMetrics[F] {
} yield producerMetricsOf(source, registry)

private def producerMetricsOf[F[_]](
source: ProducerMetrics[F],
registry: KafkaMetricsRegistry[F],
): ProducerMetrics[F] =
new ProducerMetrics[F] {
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)

override def beginTransaction: F[Unit] = source.beginTransaction
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ThisBuild / version := "16.2.1-SNAPSHOT"
ThisBuild / version := "16.3.0"

0 comments on commit 6c8dfd6

Please sign in to comment.