Skip to content

Commit

Permalink
Merge pull request #63 from MarshallWace/metrics
Browse files Browse the repository at this point in the history
Metrics
  • Loading branch information
mw-lb authored Oct 9, 2024
2 parents 20030ee + de5e33a commit a82c2ac
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* SPDX-FileCopyrightText: 2024 Marshall Wace <opensource@mwam.com>
*
* SPDX-License-Identifier: Apache-2.0
*/

package com.mwam.kafkakewl.common.metrics

fun metricsName(name: String): String {
return "kafkakewl_vext_$name";
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package com.mwam.kafkakewl.common.plugins

import com.auth0.jwt.JWT
import com.auth0.jwt.algorithms.Algorithm
import com.mwam.kafkakewl.common.metrics.metricsName
import io.github.smiley4.ktorswaggerui.dsl.*
import io.ktor.http.*
import io.ktor.serialization.kotlinx.json.json
Expand All @@ -24,31 +25,60 @@ import io.ktor.server.request.httpMethod
import io.ktor.server.request.path
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.micrometer.core.instrument.Gauge
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.config.MeterFilter
import io.micrometer.prometheus.*
import kotlinx.serialization.json.Json
import org.koin.core.module.Module
import org.koin.dsl.module
import org.koin.ktor.ext.inject
import org.slf4j.event.Level

/** initializes the logging, sets some logback variables */
fun initializeLogging(kafkaClusterName: String) {
System.setProperty("LOGBACK_KAFKA_CLUSTER", kafkaClusterName)
}

/** creates a koin module with the metrics' PrometheusMeterRegistry */
fun koinModuleForMetrics(): Module {
val meterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
return module {
single<MeterRegistry> { meterRegistry }
single<PrometheusMeterRegistry> { meterRegistry }
}
}

fun Application.configureMonitoring() {
val appMicrometerRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
val meterRegistry by inject<PrometheusMeterRegistry>()

install(MicrometerMetrics) {
registry = appMicrometerRegistry
registry = meterRegistry
// ...
}
routing {
get("/metrics", {
hidden = true
}) {
call.respond(appMicrometerRegistry.scrape())
call.respond(meterRegistry.scrape())
}
}
}

fun Application.configureCoreMetrics(kafkaClusterName: String) {
val meterRegistry by inject<PrometheusMeterRegistry>()

// registering the kafka-cluster name as a common metrics tag
meterRegistry.config().meterFilter(MeterFilter.commonTags(listOf(Tag.of("kafka_cluster", kafkaClusterName))))

var startTimeMillis = System.currentTimeMillis()

Gauge.builder(metricsName("uptime"), { System.currentTimeMillis() - startTimeMillis })
.description("the up-time of the service in milliseconds")
.register(meterRegistry)
}

fun Application.configureHealthCheck(name: String, isHealth: () -> Boolean) {
routing {
get(name, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ fun main() {
fun Application.module(config: Config) {
configureSecurity()
configureHTTP()
configureMonitoring()
configureSerialization()
configureCallLogging()
configureFrameworks(config)
configureMonitoring()
configureCoreMetrics(config.kafkaCluster.name)

environment.monitor.subscribe(ApplicationStopped) { application ->
application.environment.log.info("Ktor server has stopped")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import com.mwam.kafkakewl.common.config.KafkaClientConfig
import com.mwam.kafkakewl.common.config.KafkaPersistentStoreConfig
import com.mwam.kafkakewl.common.persistence.KafkaPersistentStore
import com.mwam.kafkakewl.common.persistence.PersistentStore
import com.mwam.kafkakewl.common.plugins.koinModuleForMetrics
import com.mwam.kafkakewl.deploy.Config
import com.mwam.kafkakewl.deploy.services.TopologyDeploymentsService
import com.mwam.kafkakewl.deploy.services.TopologyDeploymentsServiceImpl
import com.mwam.kafkakewl.deploy.services.*
import io.ktor.server.application.*
import org.koin.dsl.module
import org.koin.ktor.plugin.Koin
Expand All @@ -21,12 +21,15 @@ import org.koin.logger.slf4jLogger
fun Application.configureFrameworks(config: Config) {
install(Koin) {
slf4jLogger()
modules(module {
single<Config> { config }
single<KafkaClientConfig> { (get<Config>().kafkaCluster.client) }
single<KafkaPersistentStoreConfig> { config.kafkaPersistentStore }
single<PersistentStore> { KafkaPersistentStore(get(), get()) }
single<TopologyDeploymentsService>(createdAtStart=true) { TopologyDeploymentsServiceImpl.create(get()) }
})
modules(
koinModuleForMetrics(),
module {
single<Config> { config }
single<KafkaClientConfig> { (get<Config>().kafkaCluster.client) }
single<KafkaPersistentStoreConfig> { config.kafkaPersistentStore }
single<PersistentStore> { KafkaPersistentStore(get(), get()) }
single<TopologyDeploymentsService>(createdAtStart=true) { TopologyDeploymentsServiceImpl.create(get()) }
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ fun main() {
fun Application.module(config: Config) {
configureSecurity()
configureHTTP()
configureMonitoring()
configureSerialization()
configureCallLogging()
configureFrameworks(config)
configureMonitoring()
configureCoreMetrics(config.kafkaCluster.name)

val kafkaTopicInfoSource by inject<KafkaTopicInfoSource>()
kafkaTopicInfoSource.startPublishing()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.mwam.kafkakewl.common.config.KafkaClientConfig
import com.mwam.kafkakewl.common.config.KafkaPersistentStoreConfig
import com.mwam.kafkakewl.common.persistence.KafkaPersistentStore
import com.mwam.kafkakewl.common.persistence.PersistentStore
import com.mwam.kafkakewl.common.plugins.koinModuleForMetrics
import com.mwam.kafkakewl.metrics.Config
import com.mwam.kafkakewl.metrics.services.*
import io.ktor.server.application.*
Expand All @@ -20,13 +21,16 @@ import org.koin.logger.slf4jLogger
fun Application.configureFrameworks(config: Config) {
install(Koin) {
slf4jLogger()
modules(module {
single<Config> { config }
single<KafkaClientConfig> { (get<Config>().kafkaCluster.client) }
single<KafkaPersistentStoreConfig> { config.kafkaPersistentStore }
single<PersistentStore> { KafkaPersistentStore(get(), get()) }
single<KafkaTopicInfoSource> { KafkaTopicInfoSourceImpl(get(), config.topicInfoSource) }
single<KafkaTopicInfoCache>(createdAtStart=true) { KafkaTopicInfoCacheImpl(get()) }
})
modules(
koinModuleForMetrics(),
module {
single<Config> { config }
single<KafkaClientConfig> { (get<Config>().kafkaCluster.client) }
single<KafkaPersistentStoreConfig> { config.kafkaPersistentStore }
single<PersistentStore> { KafkaPersistentStore(get(), get()) }
single<KafkaTopicInfoSource> { KafkaTopicInfoSourceImpl(get(), config.topicInfoSource) }
single<KafkaTopicInfoCache>(createdAtStart=true) { KafkaTopicInfoCacheImpl(get()) }
}
)
}
}

0 comments on commit a82c2ac

Please sign in to comment.