This repository has been archived by the owner on Oct 25, 2023. It is now read-only.

Add schema registry metrics #77

Merged 2 commits on Jun 7, 2021
24 changes: 15 additions & 9 deletions README.md
@@ -13,7 +13,7 @@ To use the exporter, the following environment variables need to be specified:
## Usage

```shell
-./ccloudexporter [-cluster <cluster_id>] [-connector <connector_id>] [-ksqlDB <app_id>]
+./ccloudexporter [-cluster <cluster_id>] [-connector <connector_id>] [-ksqlDB <app_id>] [-schemaRegistry <sr_id>]
```
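
For example, to export metrics for one Kafka cluster and one Schema Registry in the same run (the `lkc-abc123` and `lsrc-abc123` IDs below are hypothetical placeholders):

```shell
./ccloudexporter -cluster lkc-abc123 -schemaRegistry lsrc-abc123
```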

### Options
@@ -34,6 +34,8 @@ Usage of ./ccloudexporter:
Granularity for the metrics query, by default set to 1 minute (default "PT1M")
-ksqlDB string
Comma-separated list of ksqlDB applications to fetch metrics for. If not specified, the environment variable CCLOUD_KSQL will be used
-schemaRegistry string
Comma-separated list of Schema Registry IDs to fetch metrics for. If not specified, the environment variable CCLOUD_SCHEMA_REGISTRY will be used
-listener string
Listener for the HTTP interface (default ":2112")
-log-pretty-print
@@ -122,14 +124,15 @@ If you do not provide a configuration file, the exporter creates one from the pr

#### Rule configuration

-| Key                | Description                                                |
-|--------------------|------------------------------------------------------------|
-| rules.clusters     | List of Kafka clusters to fetch metrics for                |
-| rules.connectors   | List of connectors to fetch metrics for                    |
-| rules.ksqls        | List of ksqlDB applications to fetch metrics for           |
-| rules.labels       | Labels to exposed to Prometheus and group by in the query  |
-| rules.topics       | Optional list of topics to filter the metrics              |
-| rules.metrics      | List of metrics to gather                                  |
+| Key                    | Description                                                 |
+|------------------------|-------------------------------------------------------------|
+| rules.clusters         | List of Kafka clusters to fetch metrics for                 |
+| rules.connectors       | List of connectors to fetch metrics for                     |
+| rules.ksqls            | List of ksqlDB applications to fetch metrics for            |
+| rules.schemaRegistries | List of Schema Registry IDs to fetch metrics for            |
+| rules.labels           | Labels to expose to Prometheus and to group by in the query |
+| rules.topics           | Optional list of topics to filter the metrics               |
+| rules.metrics          | List of metrics to gather                                   |

### Examples of configuration files

@@ -154,6 +157,8 @@ rules:
- $CCLOUD_CONNECTOR
ksqls:
- $CCLOUD_KSQL
schemaRegistries:
- $CCLOUD_SCHEMA_REGISTRY
metrics:
- io.confluent.kafka.server/received_bytes
- io.confluent.kafka.server/sent_bytes
@@ -170,6 +175,7 @@ rules:
- io.confluent.kafka.connect/sent_records
- io.confluent.kafka.connect/dead_letter_queue_records
- io.confluent.kafka.ksql/streaming_unit_count
- io.confluent.kafka.schema_registry/schema_count
labels:
- kafka_id
- topic
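As a minimal sketch, a standalone rule that gathers only the new Schema Registry metric could look like the following (the `schema_registry_id` label name is an assumption based on the resource labels returned by the Metrics API descriptor endpoint):

```yaml
rules:
  - schemaRegistries:
      - $CCLOUD_SCHEMA_REGISTRY
    metrics:
      - io.confluent.kafka.schema_registry/schema_count
    labels:
      - schema_registry_id
```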
28 changes: 20 additions & 8 deletions cmd/internal/collector/collector.go
@@ -29,11 +29,12 @@ type CCloudCollectorMetric struct {
// CCloudCollector is a custom prometheus collector to collect data from
// Confluent Cloud Metrics API
type CCloudCollector struct {
-metrics            map[string]CCloudCollectorMetric
-rules              []Rule
-kafkaCollector     *KafkaCCloudCollector
-connectorCollector *ConnectorCCloudCollector
-ksqlCollector      *KsqlCCloudCollector
+metrics                 map[string]CCloudCollectorMetric
+rules                   []Rule
+kafkaCollector          *KafkaCCloudCollector
+connectorCollector      *ConnectorCCloudCollector
+ksqlCollector           *KsqlCCloudCollector
+schemaRegistryCollector *SchemaRegistryCCloudCollector
}

var (
@@ -45,6 +46,7 @@ func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) {
cc.kafkaCollector.Describe(ch)
cc.connectorCollector.Describe(ch)
cc.ksqlCollector.Describe(ch)
cc.schemaRegistryCollector.Describe(ch)
}

// Collect all metrics for Prometheus
@@ -54,6 +56,7 @@ func (cc CCloudCollector) Collect(ch chan<- prometheus.Metric) {
cc.kafkaCollector.Collect(ch, &wg)
cc.connectorCollector.Collect(ch, &wg)
cc.ksqlCollector.Collect(ch, &wg)
cc.schemaRegistryCollector.Collect(ch, &wg)
wg.Wait()
}

@@ -68,9 +71,10 @@ func NewCCloudCollector() CCloudCollector {
}

var (
-connectorResource ResourceDescription
-kafkaResource     ResourceDescription
-ksqlResource      ResourceDescription
+connectorResource      ResourceDescription
+kafkaResource          ResourceDescription
+ksqlResource           ResourceDescription
+schemaRegistryResource ResourceDescription
)
resourceDescription := SendResourceDescriptorQuery()
for _, resource := range resourceDescription.Data {
@@ -80,6 +84,8 @@ func NewCCloudCollector() CCloudCollector {
kafkaResource = resource
} else if resource.Type == "ksql" {
ksqlResource = resource
} else if resource.Type == "schema_registry" {
schemaRegistryResource = resource
}
}

@@ -95,14 +101,20 @@ func NewCCloudCollector() CCloudCollector {
log.WithField("descriptorResponse", resourceDescription).Fatalln("No ksqlDB resource available")
}

if schemaRegistryResource.Type == "" {
log.WithField("descriptorResponse", resourceDescription).Fatalln("No SchemaRegistry resource available")
}

collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)}
kafkaCollector := NewKafkaCCloudCollector(collector, kafkaResource)
connectorCollector := NewConnectorCCloudCollector(collector, connectorResource)
ksqlCollector := NewKsqlCCloudCollector(collector, ksqlResource)
schemaRegistryCollector := NewSchemaRegistryCCloudCollector(collector, schemaRegistryResource)

collector.kafkaCollector = &kafkaCollector
collector.connectorCollector = &connectorCollector
collector.ksqlCollector = &ksqlCollector
collector.schemaRegistryCollector = &schemaRegistryCollector

return collector
}
176 changes: 176 additions & 0 deletions cmd/internal/collector/collector_schemaregistry.go
@@ -0,0 +1,176 @@
package collector

//
// collector_schemaregistry.go
// Copyright (C) 2020 gaspar_d </var/spool/mail/gaspar_d>
//
// Distributed under terms of the MIT license.
//

import (
"fmt"
"strconv"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
)

// SchemaRegistryCCloudCollector is a custom prometheus collector to collect data from
// Confluent Cloud Metrics API. It fetches metrics for schema_registry resource types
type SchemaRegistryCCloudCollector struct {
metrics map[string]CCloudCollectorMetric
rules []Rule
ccloud CCloudCollector
resource ResourceDescription
}

// Describe collects all metric descriptors for ccloudexporter
func (cc SchemaRegistryCCloudCollector) Describe(ch chan<- *prometheus.Desc) {
for _, desc := range cc.metrics {
ch <- desc.desc
desc.duration.Describe(ch)
}
}

// Collect all metrics for Prometheus
// to avoid reaching the scrape_timeout, metrics are fetched in multiple goroutines
func (cc SchemaRegistryCCloudCollector) Collect(ch chan<- prometheus.Metric, wg *sync.WaitGroup) {
for _, rule := range cc.rules {
for _, metric := range rule.Metrics {
_, present := cc.metrics[metric]
if !present {
continue
}

if len(rule.SchemaRegistries) <= 0 {
log.WithFields(log.Fields{"rule": rule}).Errorln("SchemaRegistries rule has no SchemaRegistry ID specified")
continue
}

wg.Add(1)
go cc.CollectMetricsForRule(wg, ch, rule, cc.metrics[metric])
}
}
}

// CollectMetricsForRule collects all metrics for a specific rule
func (cc SchemaRegistryCCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) {
defer wg.Done()
query := BuildSchemaRegistryQuery(ccmetric.metric, rule.SchemaRegistries, cc.resource)
log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created")
optimizedQuery, additionalLabels := OptimizeQuery(query)
log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized")
durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id))
timer := prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set))
response, err := SendQuery(optimizedQuery)
timer.ObserveDuration()
ch <- durationMetric
if err != nil {
log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed")
return
}
log.WithFields(log.Fields{"response": response}).Traceln("Response has been received")
cc.handleResponse(response, ccmetric, ch, rule, additionalLabels)
}

func (cc SchemaRegistryCCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) {
desc := ccmetric.desc
for _, dataPoint := range response.Data {
value, ok := dataPoint["value"].(float64)
if !ok {
log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float")
return
}

labels := []string{}
for _, label := range ccmetric.labels {
name := cc.resource.datapointFieldNameForLabel(label)

// Could be removed once the fix is done in descriptor.go line 95
if name == "resource.schema.registry.id" {
name = "resource.schema_registry.id"
}

labelValue, labelValuePresent := dataPoint[name].(string)
if !labelValuePresent {
labelValue, labelValuePresent = additionalLabels[name]
}
labels = append(labels, labelValue)
}
metric := prometheus.MustNewConstMetric(
desc,
prometheus.GaugeValue,
value,
labels...,
)

if Context.NoTimestamp {
ch <- metric
} else {
timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"]))
if err != nil {
log.WithError(err).Errorln("Can not parse timestamp, ignoring the response")
return
}
metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric)
ch <- metricWithTime
}
}
}

// NewSchemaRegistryCCloudCollector creates a new Confluent Cloud SchemaRegistry collector
func NewSchemaRegistryCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) SchemaRegistryCCloudCollector {
collector := SchemaRegistryCCloudCollector{
rules: Context.GetSchemaRegistryRules(),
metrics: make(map[string]CCloudCollectorMetric),
ccloud: ccloudcollecter,
resource: resource,
}
descriptorResponse := SendDescriptorQuery(resource.Type)
log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received")
mapOfWhiteListedMetrics := Context.GetMapOfMetrics("io.confluent.kafka.schema_registry")

for _, metr := range descriptorResponse.Data {
_, metricPresent := mapOfWhiteListedMetrics[metr.Name]
if !metricPresent {
continue
}
delete(mapOfWhiteListedMetrics, metr.Name)
var labels []string
for _, metrLabel := range metr.Labels {
labels = append(labels, metrLabel.Key)
}

for _, rsrcLabel := range resource.Labels {
labels = append(labels, GetPrometheusNameForLabel(rsrcLabel.Key))
}
desc := prometheus.NewDesc(
"ccloud_metric_schema_registry_"+GetNiceNameForMetric(metr),
metr.Description,
labels,
nil,
)

requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "ccloud_metrics_api_request_latency",
Help: "Metrics API request latency",
ConstLabels: map[string]string{"metric": metr.Name},
}, []string{"ruleNumber"})

metric := CCloudCollectorMetric{
metric: metr,
desc: desc,
duration: requestDuration,
labels: labels,
}
collector.metrics[metr.Name] = metric
}

if len(mapOfWhiteListedMetrics) > 0 {
log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API")
}

return collector
}
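
With this collector registered, a scrape of the exporter should expose one gauge per data point, along these lines (a hypothetical sample: the metric name assumes GetNiceNameForMetric yields `schema_count`, and the label, help text, and value are illustrative):

```
# HELP ccloud_metric_schema_registry_schema_count Number of registered schemas
# TYPE ccloud_metric_schema_registry_schema_count gauge
ccloud_metric_schema_registry_schema_count{schema_registry_id="lsrc-abc123"} 42
```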
14 changes: 14 additions & 0 deletions cmd/internal/collector/context.go
@@ -29,6 +29,7 @@ type Rule struct {
Clusters []string `mapstructure:"clusters"`
Connectors []string `mapstructure:"connectors"`
Ksql []string `mapstructure:"ksqls"`
SchemaRegistries []string `mapstructure:"schemaregistries"`
Metrics []string `mapstructure:"metrics"`
GroupByLabels []string `mapstructure:"labels"`
cachedIgnoreGlobalResultForTopic map[TopicClusterMetric]bool
@@ -72,6 +73,7 @@ var DefaultMetrics = []string{
"io.confluent.kafka.connect/sent_records",
"io.confluent.kafka.connect/dead_letter_queue_records",
"io.confluent.kafka.ksql/streaming_unit_count",
"io.confluent.kafka.schema_registry/schema_count",
}

// GetMapOfMetrics returns the whitelist of metrics in a map
@@ -140,6 +142,18 @@ func (context ExporterContext) GetKsqlRules() []Rule {
return ksqlRules
}

// GetSchemaRegistryRules returns all rules associated with at least one Schema Registry instance
func (context ExporterContext) GetSchemaRegistryRules() []Rule {
schemaRegistryRules := make([]Rule, 0)
for _, irule := range Context.Rules {
if len(irule.SchemaRegistries) > 0 {
schemaRegistryRules = append(schemaRegistryRules, irule)
}
}

return schemaRegistryRules
}

// ShouldIgnoreResultForRule returns true if the result for this topic need to be ignored for this rule.
// Some results might be ignored as they are defined in another rule, thus global and override result
// could conflict if we do not ignore the global result
22 changes: 14 additions & 8 deletions cmd/internal/collector/descriptor.go
@@ -7,10 +7,13 @@ package collector
// Distributed under terms of the MIT license.
//

import "strings"
import "encoding/json"
import "io/ioutil"
import log "github.com/sirupsen/logrus"
import (
"encoding/json"
"io/ioutil"
"strings"

log "github.com/sirupsen/logrus"
)

// DescriptorMetricResponse is the response from Confluent Cloud API metric endpoint
// This is the JSON structure for the endpoint
@@ -54,10 +57,11 @@ type ResourceDescription struct {

var (
excludeListForMetric = map[string]string{
"io.confluent.kafka.server": "",
"io.confluent.kafka.connect": "",
"io.confluent.kafka.ksql": "",
"delta": "",
"io.confluent.kafka.server": "",
"io.confluent.kafka.connect": "",
"io.confluent.kafka.ksql": "",
"io.confluent.kafka.schema_registry": "",
"delta": "",
}
descriptorURI = "v2/metrics/cloud/descriptors/metrics"
descriptorResourceURI = "v2/metrics/cloud/descriptors/resources"
@@ -88,6 +92,8 @@ func (resource ResourceDescription) hasLabel(label string) bool {
func (resource ResourceDescription) datapointFieldNameForLabel(label string) string {
if resource.hasLabel(label) {
return "resource." + strings.Replace(label, "_", ".", -1)
// TODO: fix this to work with schema_registry_id as the param; it must return resource.schema_registry.id
// Once fixed, the hard-coded conversion in collector_schemaregistry.go line 91->93 can be removed
}
return "metric." + label
}
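
To make the TODO above concrete, here is a minimal standalone sketch of why the generic underscore-to-dot replacement over-splits the schema registry label (the label value is assumed):

```go
package main

import (
    "fmt"
    "strings"
)

func main() {
    // datapointFieldNameForLabel turns every underscore into a dot, so the
    // schema registry label splits one underscore too many.
    label := "schema_registry_id"
    fmt.Println("resource." + strings.Replace(label, "_", ".", -1))
    // Prints: resource.schema.registry.id
    // The Metrics API field is resource.schema_registry.id, hence the
    // hard-coded correction in collector_schemaregistry.go.
}
```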