Skip to content

Commit

Permalink
minor refactoring + fix docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Jul 15, 2024
1 parent 2a454d5 commit 3b47961
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,20 @@ val records: IO[ConsumerRecords[String, String]] = consumer.use { consumer =>

The example below demonstrates creation of `Consumer`, but same can be done for `Producer` as well.

> :warning: using `ConsumerMetricsOf.withJavaClientMetrics` registers new Prometheus collector
> under the hood. Please use unique prefixes for each collector to avoid duplicated metrics
> in Prometheus (i.e. runtime exception on registration). Prefix can be set as parameter in:
> `ConsumerMetricsOf.withJavaClientMetrics(registry, Some("the_prefix"))`
> :warning: using `ConsumerMetricsOf.withJavaClientMetrics` (or its alternative `metrics.exposeJavaClientMetrics`)
> registers new Prometheus collector under the hood. Please use unique prefixes for each collector
> to avoid duplicated metrics in Prometheus (i.e. runtime exception on registration).
> Prefix can be set as parameter in: `ConsumerMetricsOf.withJavaClientMetrics(prometheus, Some("the_prefix"))`
```scala
import ConsumerMetricsOf.*

val config: ConsumerConfig = ???
val registry: CollectorRegistry = ??? // Prometheus client
val prometheus: CollectorRegistry = ???
val metrics: ConsumerMetrics[IO] = ???

for {
metrics <- metrics.exposeJavaClientMetrics(registry)
metrics <- metrics.exposeJavaClientMetrics(prometheus)
consumerOf = ConsumerOf.apply1(metrics1.some)
consumer <- consumerOf(config)
} yield ???
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.evolutiongaming.skafka.consumer

import cats.effect.{Resource, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.{Topic, TopicPartition}
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
Expand All @@ -14,14 +15,14 @@ object ConsumerMetricsOf {
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
* @param source original [[ConsumerMetrics]]
* @param prefix metric name prefix
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry](
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
source: ConsumerMetrics[F],
prometheus: CollectorRegistry,
prefix: Option[String],
prometheus: CollectorRegistry
): Resource[F, ConsumerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
Expand Down Expand Up @@ -51,15 +52,15 @@ object ConsumerMetricsOf {
/**
* Construct [[ConsumerMetrics]] that will expose Java Kafka client metrics.
*
* @param prefix function that provides _unique_ prefix for each client
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ConsumerMetrics]] that exposes Java Kafka client metrics
*/
def exposeJavaClientMetrics(
prefix: Option[String],
prometheus: CollectorRegistry
prometheus: CollectorRegistry,
prefix: Option[String] = None,
)(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ConsumerMetrics[F]] =
withJavaClientMetrics(source, prefix, prometheus)
withJavaClientMetrics(source, prometheus, prefix)

}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.evolutiongaming.skafka.producer

import cats.effect.{Resource, Sync}
import cats.effect.std.UUIDGen
import com.evolutiongaming.catshelper.ToTry
import com.evolutiongaming.skafka.Topic
import com.evolutiongaming.skafka.metrics.KafkaMetricsRegistry
Expand All @@ -14,14 +15,14 @@ object ProducerMetricsOf {
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param source original [[ProducerMetrics]]
* @param prefix metric name prefix
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def withJavaClientMetrics[F[_]: Sync: ToTry](
def withJavaClientMetrics[F[_]: Sync: ToTry: UUIDGen](
source: ProducerMetrics[F],
prometheus: CollectorRegistry,
prefix: Option[String],
prometheus: CollectorRegistry
): Resource[F, ProducerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
Expand Down Expand Up @@ -56,15 +57,15 @@ object ProducerMetricsOf {
/**
* Construct [[ProducerMetrics]] that will expose Java Kafka client metrics.
*
* @param prefix metric name prefix
* @param prometheus instance of Prometheus registry
* @param prefix metric name prefix
* @return [[ProducerMetrics]] that exposes Java Kafka client metrics
*/
def exposeJavaClientMetrics(
prometheus: CollectorRegistry,
prefix: Option[String],
prometheus: CollectorRegistry
)(implicit F: Sync[F], toTry: ToTry[F]): Resource[F, ProducerMetrics[F]] =
withJavaClientMetrics(source, prefix, prometheus)
withJavaClientMetrics(source, prometheus, prefix)

}

Expand Down

0 comments on commit 3b47961

Please sign in to comment.