Skip to content

Commit

Permalink
Merge pull request #430 from evolution-gaming/df/expose-java-client-m…
Browse files Browse the repository at this point in the history
…etrics

Exposing native metrics of multiple clients in the same JVM
  • Loading branch information
dfakhritdinov authored Jul 15, 2024
2 parents 1da5c3f + 3b47961 commit 7a7a5d3
Show file tree
Hide file tree
Showing 10 changed files with 363 additions and 23 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,29 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>
}
```

## Java client metrics example

The example below demonstrates creation of `Consumer`, but same can be done for `Producer` as well.

> :warning: using `ConsumerMetricsOf.withJavaClientMetrics` (or its alternative `metrics.exposeJavaClientMetrics`)
> registers new Prometheus collector under the hood. Please use unique prefixes for each collector
> to avoid duplicated metrics in Prometheus (i.e. runtime exception on registration).
> Prefix can be set as parameter in: `ConsumerMetricsOf.withJavaClientMetrics(prometheus, Some("the_prefix"))`
```scala
import ConsumerMetricsOf.*

val config: ConsumerConfig = ???
val prometheus: CollectorRegistry = ???
val metrics: ConsumerMetrics[IO] = ???

for {
metrics <- metrics.exposeJavaClientMetrics(prometheus)
consumerOf = ConsumerOf.apply1(metrics1.some)
consumer <- consumerOf(config)
} yield ???
```

## Setup

```scala
Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ lazy val commonSettings = Seq(
ProblemFilters.exclude[ReversedMissingMethodProblem]("com.evolutiongaming.skafka.consumer.Consumer.subscribe"),
ProblemFilters.exclude[DirectMissingMethodProblem](
"com.evolutiongaming.skafka.Converters#MapJOps.asScalaMap$extension"
)
),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.skafka.consumer.ConsumerMetrics#ConsumerMetricsOps.mapK$extension"),
ProblemFilters.exclude[DirectMissingMethodProblem]("com.evolutiongaming.skafka.producer.ProducerMetrics#ProducerMetricsOps.mapK$extension"),
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.evolutiongaming.skafka.consumer

import cats.effect.{Resource, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.{Topic, TopicPartition}
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import io.prometheus.client.CollectorRegistry

import scala.concurrent.duration.FiniteDuration

object ConsumerMetricsOf {

/**
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
* @param source original [[ConsumerMetrics]]
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
source: ConsumerMetrics[F],
prometheus: CollectorRegistry,
prefix: Option[String],
): Resource[F, ConsumerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
source.call(name, topic, latency, success)

override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] =
source.poll(topic, bytes, records, age)

override def count(name: String, topic: Topic): F[Unit] =
source.count(name, topic)

override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] =
source.rebalance(name, topicPartition)

override def topics(latency: FiniteDuration): F[Unit] =
source.topics(latency)

override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] =
registry.register(consumer.clientMetrics)

}

implicit final class ConsumerMetricsOps[F[_]](val source: ConsumerMetrics[F]) extends AnyVal {

/**
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def exposeJavaClientMetrics(
prometheus: CollectorRegistry,
prefix: Option[String] = None,
)(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ConsumerMetrics[F]] =
withJavaClientMetrics(source, prometheus, prefix)

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.evolutiongaming.skafka.metrics

import cats.syntax.all._
import cats.effect.syntax.resource._
import cats.effect.{Resource, Ref, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.ClientMetric
import io.prometheus.client.CollectorRegistry

import java.util.UUID

/**
* Allows reporting metrics of multiple Kafka clients inside a single VM.
*
* Example:
* {{{
* val prometheus: CollectorRegistry = ???
* val consumerOf: ConsumerOf[F] = ???
* val producerOf: ProducerOf[F] = ???
*
* for {
* registry <- KafkaMetricsRegistry.of(prometheus)
*
* consumer <- consumerOf(consumerConfig)
* _ <- registry.register(consumer.clientMetrics)
*
* producer <- producerOf(producerConfig)
* _ <- registry.register(producer.clientMetrics)
* } yield ()
* }}}
*/
trait KafkaMetricsRegistry[F[_]] {

/**
* Register a function to obtain a list of client metrics.
* Normally, you would pass
* [[com.evolutiongaming.skafka.consumer.Consumer#clientMetrics]] or
* [[com.evolutiongaming.skafka.producer.Producer#clientMetrics]]
*/
def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit]
}

object KafkaMetricsRegistry {

def of[F[_]: Sync: ToTry: UUIDGen](
prometheus: CollectorRegistry,
prefix: Option[String] = None,
): Resource[F, KafkaMetricsRegistry[F]] =
for {
sources <- Ref[F].of(Map.empty[UUID, F[Seq[ClientMetric[F]]]]).toResource

metrics = sources
.get
.flatMap { sources =>
sources
.toList
.flatTraverse {
case (uuid, metrics) =>
metrics.map { metrics =>
metrics.toList.map { metric => metric.copy(tags = metric.tags + ("uuid" -> uuid.toString)) }
}
}
}
.widen[Seq[ClientMetric[F]]]

collector = new KafkaMetricsCollector[F](metrics, prefix)
allocate = Sync[F].delay { prometheus.register(collector) }
release = Sync[F].delay { prometheus.unregister(collector) }

_ <- Resource.make(allocate)(_ => release)
} yield new KafkaMetricsRegistry[F] {

def register(metrics: F[Seq[ClientMetric[F]]]): Resource[F, Unit] =
for {
uuid <- UUIDGen[F].randomUUID.toResource

allocate = sources.update { sources => sources.updated(uuid, metrics) }
release = sources.update { sources => sources - uuid }

_ <- Resource.make(allocate)(_ => release)
} yield {}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.evolutiongaming.skafka.producer

import cats.effect.{Resource, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.Topic
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
import io.prometheus.client.CollectorRegistry

import scala.concurrent.duration.FiniteDuration

object ProducerMetricsOf {

/**
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param source original [[ProducerMetrics]]
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
source: ProducerMetrics[F],
prometheus: CollectorRegistry,
prefix: Option[String],
): Resource[F, ProducerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ProducerMetrics[F] {
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)

override def beginTransaction: F[Unit] = source.beginTransaction

override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] = source.sendOffsetsToTransaction(latency)

override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency)

override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency)

override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] = source.send(topic, latency, bytes)

override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency)

override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency)

override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency)

override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency)

override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] =
registry.register(producer.clientMetrics)

}

implicit final class ProducerMetricsOps[F[_]](val source: ProducerMetrics[F]) extends AnyVal {

/**
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def exposeJavaClientMetrics(
prometheus: CollectorRegistry,
prefix: Option[String],
)(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ProducerMetrics[F]] =
withJavaClientMetrics(source, prometheus, prefix)

}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.evolutiongaming.skafka.consumer

import cats.effect.Resource
import cats.effect.{MonadCancel, Resource}
import cats.implicits._
import cats.{Applicative, Monad, ~>}
import com.evolutiongaming.skafka.{ClientId, Topic, TopicPartition}
import com.evolutiongaming.smetrics.MetricsHelper._
import com.evolutiongaming.smetrics.{CollectorRegistry, LabelNames, Quantiles}

import scala.concurrent.duration.FiniteDuration
import scala.annotation.nowarn

trait ConsumerMetrics[F[_]] {

Expand All @@ -20,6 +21,9 @@ trait ConsumerMetrics[F[_]] {
def rebalance(name: String, topicPartition: TopicPartition): F[Unit]

def topics(latency: FiniteDuration): F[Unit]

private[consumer] def exposeJavaMetrics[K, V](@nowarn consumer: Consumer[F, K, V]): Resource[F, Unit] =
Resource.unit[F]
}

object ConsumerMetrics {
Expand Down Expand Up @@ -104,11 +108,11 @@ object ConsumerMetrics {
bytesSummary <- bytesSummary
rebalancesCounter <- rebalancesCounter
topicsLatency <- topicsLatency
ageSummary <- registry.summary(
name = s"${ prefix }_poll_age",
help = "Poll records age, time since record.timestamp",
ageSummary <- registry.summary(
name = s"${prefix}_poll_age",
help = "Poll records age, time since record.timestamp",
quantiles = Quantiles.Default,
labels = LabelNames("client", "topic")
labels = LabelNames("client", "topic")
)
} yield { (clientId: ClientId) =>
new ConsumerMetrics[F] {
Expand Down Expand Up @@ -162,6 +166,7 @@ object ConsumerMetrics {

implicit class ConsumerMetricsOps[F[_]](val self: ConsumerMetrics[F]) extends AnyVal {

@deprecated("Use mapK(f, g) instead", "16.2.0")
def mapK[G[_]](f: F ~> G): ConsumerMetrics[G] = {
new MapK with ConsumerMetrics[G] {

Expand All @@ -186,5 +191,36 @@ object ConsumerMetrics {
}
}
}

def mapK[G[_]](
fg: F ~> G,
gf: G ~> F
)(implicit F: MonadCancel[F, Throwable], G: MonadCancel[G, Throwable]): ConsumerMetrics[G] = {
new MapK with ConsumerMetrics[G] {

def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean) = {
fg(self.call(name, topic, latency, success))
}

def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]) = {
fg(self.poll(topic, bytes, records, age))
}

def count(name: String, topic: Topic) = {
fg(self.count(name, topic))
}

def rebalance(name: String, topicPartition: TopicPartition) = {
fg(self.rebalance(name, topicPartition))
}

def topics(latency: FiniteDuration) = {
fg(self.topics(latency))
}

override def exposeJavaMetrics[K, V](consumer: Consumer[G, K, V]) =
self.exposeJavaMetrics(consumer.mapK(gf, fg)).mapK(fg)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,31 @@ object ConsumerOf {
def apply[K, V](config: ConsumerConfig)(implicit fromBytesK: FromBytes[F, K], fromBytesV: FromBytes[F, V]) = {
Consumer
.of[F, K, V](config)
.map { consumer =>
metrics.fold { consumer } { consumer.withMetrics1[Throwable] }
.flatMap { consumer =>
metrics match {

case None =>
Resource.pure[F, Consumer[F, K, V]](consumer)

case Some(metrics) =>
for {
_ <- metrics.exposeJavaMetrics[K, V](consumer)
} yield {
consumer.withMetrics1[Throwable](metrics)
}
}
}
}
}
}

/** The sole purpose of this method is to support binary compatibility with an intermediate
* version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics`
* and `apply2` using `MeasureDuration` from `cats-helper`.
* This should not be used and should be removed in a reasonable amount of time.
*/
* version (namely, 15.2.0) which had `apply1` method using `MeasureDuration` from `smetrics`
* and `apply2` using `MeasureDuration` from `cats-helper`.
* This should not be used and should be removed in a reasonable amount of time.
*/
@deprecated("Use `apply1`", since = "16.0.3")
def apply2[F[_] : Async : ToTry : ToFuture : MeasureDuration](
def apply2[F[_]: Async: ToTry: ToFuture: MeasureDuration](
metrics: Option[ConsumerMetrics[F]] = None
): ConsumerOf[F] = apply1(metrics)

Expand Down
Loading

0 comments on commit 7a7a5d3

Please sign in to comment.