Skip to content

Commit

Permalink
extract consumer/producer metrics creation in separate method
Browse files Browse the repository at this point in the history
  • Loading branch information
Denys Fakhritdinov committed Jul 24, 2024
1 parent 941e551 commit 087e9e7
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,7 @@ object ConsumerMetricsOf {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield { (clientId: ClientId) =>
val source = sourceOf(clientId)

new ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
source.call(name, topic, latency, success)

override def poll(topic: Topic, bytes: Int, records: Int, age: Option[FiniteDuration]): F[Unit] =
source.poll(topic, bytes, records, age)

override def count(name: String, topic: Topic): F[Unit] =
source.count(name, topic)

override def rebalance(name: String, topicPartition: TopicPartition): F[Unit] =
source.rebalance(name, topicPartition)

override def topics(latency: FiniteDuration): F[Unit] =
source.topics(latency)

override def exposeJavaMetrics[K, V](consumer: Consumer[F, K, V]): Resource[F, Unit] =
registry.register(consumer.clientMetrics)
}
consumerMetricsOf(source, registry)
}

/**
Expand All @@ -65,7 +46,13 @@ object ConsumerMetricsOf {
): Resource[F, ConsumerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ConsumerMetrics[F] {
} yield consumerMetricsOf(source, registry)

private def consumerMetricsOf[F[_]](
source: ConsumerMetrics[F],
registry: KafkaMetricsRegistry[F],
): ConsumerMetrics[F] =
new ConsumerMetrics[F] {
override def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean): F[Unit] =
source.call(name, topic, latency, success)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,7 @@ object ProducerMetricsOf {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield { (clientId: ClientId) =>
val source = sourceOf(clientId)

new ProducerMetrics[F] {
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)

override def beginTransaction: F[Unit] = source.beginTransaction

override def sendOffsetsToTransaction(latency: FiniteDuration): F[Unit] =
source.sendOffsetsToTransaction(latency)

override def commitTransaction(latency: FiniteDuration): F[Unit] = source.commitTransaction(latency)

override def abortTransaction(latency: FiniteDuration): F[Unit] = source.abortTransaction(latency)

override def send(topic: Topic, latency: FiniteDuration, bytes: Int): F[Unit] =
source.send(topic, latency, bytes)

override def block(topic: Topic, latency: FiniteDuration): F[Unit] = source.block(topic, latency)

override def failure(topic: Topic, latency: FiniteDuration): F[Unit] = source.failure(topic, latency)

override def partitions(topic: Topic, latency: FiniteDuration): F[Unit] = source.partitions(topic, latency)

override def flush(latency: FiniteDuration): F[Unit] = source.flush(latency)

override def exposeJavaMetrics(producer: Producer[F]): Resource[F, Unit] =
registry.register(producer.clientMetrics)

}
producerMetricsOf(source, registry)
}

/**
Expand All @@ -73,7 +46,13 @@ object ProducerMetricsOf {
): Resource[F, ProducerMetrics[F]] =
for {
registry <- KafkaMetricsRegistry.of(prometheus, prefix)
} yield new ProducerMetrics[F] {
} yield producerMetricsOf(source, registry)

private def producerMetricsOf[F[_]](
source: ProducerMetrics[F],
registry: KafkaMetricsRegistry[F],
): ProducerMetrics[F] =
new ProducerMetrics[F] {
override def initTransactions(latency: FiniteDuration): F[Unit] = source.initTransactions(latency)

override def beginTransaction: F[Unit] = source.beginTransaction
Expand Down

0 comments on commit 087e9e7

Please sign in to comment.