Skip to content
This repository was archived by the owner on Oct 25, 2023. It is now read-only.

Commit 8d66630

Browse files
authored
Add Schema Registry Metrics (#77)
1 parent 110df29 commit 8d66630

File tree

7 files changed

+305
-38
lines changed

7 files changed

+305
-38
lines changed

README.md

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ To use the exporter, the following environment variables need to be specified:
1313
## Usage
1414

1515
```shell
16-
./ccloudexporter [-cluster <cluster_id>] [-connector <connector_id>] [-ksqlDB <app_id>]
16+
./ccloudexporter [-cluster <cluster_id>] [-connector <connector_id>] [-ksqlDB <app_id>] [-schemaRegistry <sr_id>]
1717
```
1818

1919
### Options
@@ -34,6 +34,8 @@ Usage of ./ccloudexporter:
3434
Granularity for the metrics query, by default set to 1 minutes (default "PT1M")
3535
-ksqlDB string
3636
Comma separated list of ksqlDB application to fetch metric for. If not specified, the environment variable CCLOUD_KSQL will be used
37+
-schemaRegistry string
38+
Comma separated list of Schema Registry ID to fetch metric for. If not specified, the environment variable CCLOUD_SCHEMA_REGISTRY will be used
3739
-listener string
3840
Listener for the HTTP interface (default ":2112")
3941
-log-pretty-print
@@ -122,14 +124,15 @@ If you do not provide a configuration file, the exporter creates one from the pr
122124

123125
#### Rule configuration
124126

125-
| Key | Description |
126-
|--------------------|---------------------------------------------------------------------------------------------------------------|
127-
| rules.clusters | List of Kafka clusters to fetch metrics for |
128-
| rules.connectors | List of connectors to fetch metrics for |
129-
| rules.ksqls | List of ksqlDB applications to fetch metrics for |
130-
| rules.labels | Labels to exposed to Prometheus and group by in the query |
131-
| rules.topics | Optional list of topics to filter the metrics |
132-
| rules.metrics | List of metrics to gather |
127+
| Key | Description |
128+
|------------------------|---------------------------------------------------------------------------------------------------------------|
129+
| rules.clusters | List of Kafka clusters to fetch metrics for |
130+
| rules.connectors | List of connectors to fetch metrics for |
131+
| rules.ksqls | List of ksqlDB applications to fetch metrics for |
132+
| rules.schemaRegistries | List of Schema Registry IDs to fetch metrics for |
133+
| rules.labels | Labels to expose to Prometheus and to group by in the query |
134+
| rules.topics | Optional list of topics to filter the metrics |
135+
| rules.metrics | List of metrics to gather |
133136

134137
### Examples of configuration files
135138

@@ -154,6 +157,8 @@ rules:
154157
- $CCLOUD_CONNECTOR
155158
ksqls:
156159
- $CCLOUD_KSQL
160+
schemaRegistries:
161+
- $CCLOUD_SCHEMA_REGISTRY
157162
metrics:
158163
- io.confluent.kafka.server/received_bytes
159164
- io.confluent.kafka.server/sent_bytes
@@ -170,6 +175,7 @@ rules:
170175
- io.confluent.kafka.connect/sent_records
171176
- io.confluent.kafka.connect/dead_letter_queue_records
172177
- io.confluent.kafka.ksql/streaming_unit_count
178+
- io.confluent.kafka.schema_registry/schema_count
173179
labels:
174180
- kafka_id
175181
- topic

cmd/internal/collector/collector.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ type CCloudCollectorMetric struct {
2929
// CCloudCollector is a custom prometheus collector to collect data from
3030
// Confluent Cloud Metrics API
3131
type CCloudCollector struct {
32-
metrics map[string]CCloudCollectorMetric
33-
rules []Rule
34-
kafkaCollector *KafkaCCloudCollector
35-
connectorCollector *ConnectorCCloudCollector
36-
ksqlCollector *KsqlCCloudCollector
32+
metrics map[string]CCloudCollectorMetric
33+
rules []Rule
34+
kafkaCollector *KafkaCCloudCollector
35+
connectorCollector *ConnectorCCloudCollector
36+
ksqlCollector *KsqlCCloudCollector
37+
schemaRegistryCollector *SchemaRegistryCCloudCollector
3738
}
3839

3940
var (
@@ -45,6 +46,7 @@ func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) {
4546
cc.kafkaCollector.Describe(ch)
4647
cc.connectorCollector.Describe(ch)
4748
cc.ksqlCollector.Describe(ch)
49+
cc.schemaRegistryCollector.Describe(ch)
4850
}
4951

5052
// Collect all metrics for Prometheus
@@ -54,6 +56,7 @@ func (cc CCloudCollector) Collect(ch chan<- prometheus.Metric) {
5456
cc.kafkaCollector.Collect(ch, &wg)
5557
cc.connectorCollector.Collect(ch, &wg)
5658
cc.ksqlCollector.Collect(ch, &wg)
59+
cc.schemaRegistryCollector.Collect(ch, &wg)
5760
wg.Wait()
5861
}
5962

@@ -68,9 +71,10 @@ func NewCCloudCollector() CCloudCollector {
6871
}
6972

7073
var (
71-
connectorResource ResourceDescription
72-
kafkaResource ResourceDescription
73-
ksqlResource ResourceDescription
74+
connectorResource ResourceDescription
75+
kafkaResource ResourceDescription
76+
ksqlResource ResourceDescription
77+
schemaRegistryResource ResourceDescription
7478
)
7579
resourceDescription := SendResourceDescriptorQuery()
7680
for _, resource := range resourceDescription.Data {
@@ -80,6 +84,8 @@ func NewCCloudCollector() CCloudCollector {
8084
kafkaResource = resource
8185
} else if resource.Type == "ksql" {
8286
ksqlResource = resource
87+
} else if resource.Type == "schema_registry" {
88+
schemaRegistryResource = resource
8389
}
8490
}
8591

@@ -95,14 +101,20 @@ func NewCCloudCollector() CCloudCollector {
95101
log.WithField("descriptorResponse", resourceDescription).Fatalln("No ksqlDB resource available")
96102
}
97103

104+
if schemaRegistryResource.Type == "" {
105+
log.WithField("descriptorResponse", resourceDescription).Fatalln("No SchemaRegistry resource available")
106+
}
107+
98108
collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)}
99109
kafkaCollector := NewKafkaCCloudCollector(collector, kafkaResource)
100110
connectorCollector := NewConnectorCCloudCollector(collector, connectorResource)
101111
ksqlCollector := NewKsqlCCloudCollector(collector, ksqlResource)
112+
schemaRegistryCollector := NewSchemaRegistryCCloudCollector(collector, schemaRegistryResource)
102113

103114
collector.kafkaCollector = &kafkaCollector
104115
collector.connectorCollector = &connectorCollector
105116
collector.ksqlCollector = &ksqlCollector
117+
collector.schemaRegistryCollector = &schemaRegistryCollector
106118

107119
return collector
108120
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package collector
2+
3+
//
4+
// collector_schemaregistry.go
5+
// Copyright (C) 2020 gaspar_d </var/spool/mail/gaspar_d>
6+
//
7+
// Distributed under terms of the MIT license.
8+
//
9+
10+
import (
11+
"fmt"
12+
"strconv"
13+
"sync"
14+
"time"
15+
16+
"github.com/prometheus/client_golang/prometheus"
17+
log "github.com/sirupsen/logrus"
18+
)
19+
20+
// SchemaRegistryCCloudCollector is a custom prometheus collector to collect data from
21+
// Confluent Cloud Metrics API. It fetches schema_registry resources types metrics
22+
type SchemaRegistryCCloudCollector struct {
23+
metrics map[string]CCloudCollectorMetric
24+
rules []Rule
25+
ccloud CCloudCollector
26+
resource ResourceDescription
27+
}
28+
29+
// Describe collect all metrics for ccloudexporter
30+
func (cc SchemaRegistryCCloudCollector) Describe(ch chan<- *prometheus.Desc) {
31+
for _, desc := range cc.metrics {
32+
ch <- desc.desc
33+
desc.duration.Describe(ch)
34+
}
35+
}
36+
37+
// Collect all metrics for Prometheus
38+
// to avoid reaching the scrape_timeout, metrics are fetched in multiple goroutine
39+
func (cc SchemaRegistryCCloudCollector) Collect(ch chan<- prometheus.Metric, wg *sync.WaitGroup) {
40+
for _, rule := range cc.rules {
41+
for _, metric := range rule.Metrics {
42+
_, present := cc.metrics[metric]
43+
if !present {
44+
continue
45+
}
46+
47+
if len(rule.SchemaRegistries) <= 0 {
48+
log.WithFields(log.Fields{"rule": rule}).Errorln("SchemaRegistries rule has no SchemaRegistry ID specified")
49+
continue
50+
}
51+
52+
wg.Add(1)
53+
go cc.CollectMetricsForRule(wg, ch, rule, cc.metrics[metric])
54+
}
55+
}
56+
}
57+
58+
// CollectMetricsForRule collects all metrics for a specific rule
59+
func (cc SchemaRegistryCCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) {
60+
defer wg.Done()
61+
query := BuildSchemaRegistryQuery(ccmetric.metric, rule.SchemaRegistries, cc.resource)
62+
log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created")
63+
optimizedQuery, additionalLabels := OptimizeQuery(query)
64+
log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized")
65+
durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id))
66+
timer := prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set))
67+
response, err := SendQuery(optimizedQuery)
68+
timer.ObserveDuration()
69+
ch <- durationMetric
70+
if err != nil {
71+
log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed")
72+
return
73+
}
74+
log.WithFields(log.Fields{"response": response}).Traceln("Response has been received")
75+
cc.handleResponse(response, ccmetric, ch, rule, additionalLabels)
76+
}
77+
78+
func (cc SchemaRegistryCCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) {
79+
desc := ccmetric.desc
80+
for _, dataPoint := range response.Data {
81+
value, ok := dataPoint["value"].(float64)
82+
if !ok {
83+
log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float")
84+
return
85+
}
86+
87+
labels := []string{}
88+
for _, label := range ccmetric.labels {
89+
name := cc.resource.datapointFieldNameForLabel(label)
90+
91+
// Could be remove when fix is done in descriptor.go line 95
92+
if name == "resource.schema.registry.id" {
93+
name = "resource.schema_registry.id"
94+
}
95+
96+
labelValue, labelValuePresent := dataPoint[name].(string)
97+
if !labelValuePresent {
98+
labelValue, labelValuePresent = additionalLabels[name]
99+
}
100+
labels = append(labels, labelValue)
101+
}
102+
metric := prometheus.MustNewConstMetric(
103+
desc,
104+
prometheus.GaugeValue,
105+
value,
106+
labels...,
107+
)
108+
109+
if Context.NoTimestamp {
110+
ch <- metric
111+
} else {
112+
timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"]))
113+
if err != nil {
114+
log.WithError(err).Errorln("Can not parse timestamp, ignoring the response")
115+
return
116+
}
117+
metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric)
118+
ch <- metricWithTime
119+
}
120+
}
121+
}
122+
123+
// NewSchemaRegistryCCloudCollector create a new Confluent Cloud SchemaRegistry collector
124+
func NewSchemaRegistryCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) SchemaRegistryCCloudCollector {
125+
collector := SchemaRegistryCCloudCollector{
126+
rules: Context.GetSchemaRegistryRules(),
127+
metrics: make(map[string]CCloudCollectorMetric),
128+
ccloud: ccloudcollecter,
129+
resource: resource,
130+
}
131+
descriptorResponse := SendDescriptorQuery(resource.Type)
132+
log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received")
133+
mapOfWhiteListedMetrics := Context.GetMapOfMetrics("io.confluent.kafka.schema_registry")
134+
135+
for _, metr := range descriptorResponse.Data {
136+
_, metricPresent := mapOfWhiteListedMetrics[metr.Name]
137+
if !metricPresent {
138+
continue
139+
}
140+
delete(mapOfWhiteListedMetrics, metr.Name)
141+
var labels []string
142+
for _, metrLabel := range metr.Labels {
143+
labels = append(labels, metrLabel.Key)
144+
}
145+
146+
for _, rsrcLabel := range resource.Labels {
147+
labels = append(labels, GetPrometheusNameForLabel(rsrcLabel.Key))
148+
}
149+
desc := prometheus.NewDesc(
150+
"ccloud_metric_schema_registry_"+GetNiceNameForMetric(metr),
151+
metr.Description,
152+
labels,
153+
nil,
154+
)
155+
156+
requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{
157+
Name: "ccloud_metrics_api_request_latency",
158+
Help: "Metrics API request latency",
159+
ConstLabels: map[string]string{"metric": metr.Name},
160+
}, []string{"ruleNumber"})
161+
162+
metric := CCloudCollectorMetric{
163+
metric: metr,
164+
desc: desc,
165+
duration: requestDuration,
166+
labels: labels,
167+
}
168+
collector.metrics[metr.Name] = metric
169+
}
170+
171+
if len(mapOfWhiteListedMetrics) > 0 {
172+
log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API")
173+
}
174+
175+
return collector
176+
}

cmd/internal/collector/context.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type Rule struct {
2929
Clusters []string `mapstructure:"clusters"`
3030
Connectors []string `mapstructure:"connectors"`
3131
Ksql []string `mapstructure:"ksqls"`
32+
SchemaRegistries []string `mapstructure:"schemaregistries"`
3233
Metrics []string `mapstructure:"metrics"`
3334
GroupByLabels []string `mapstructure:"labels"`
3435
cachedIgnoreGlobalResultForTopic map[TopicClusterMetric]bool
@@ -72,6 +73,7 @@ var DefaultMetrics = []string{
7273
"io.confluent.kafka.connect/sent_records",
7374
"io.confluent.kafka.connect/dead_letter_queue_records",
7475
"io.confluent.kafka.ksql/streaming_unit_count",
76+
"io.confluent.kafka.schema_registry/schema_count",
7577
}
7678

7779
// GetMapOfMetrics returns the whitelist of metrics in a map
@@ -140,6 +142,18 @@ func (context ExporterContext) GetKsqlRules() []Rule {
140142
return ksqlRules
141143
}
142144

145+
// GetSchemaRegistryRules return all rules associated to at least one Schema Registry instance
146+
func (context ExporterContext) GetSchemaRegistryRules() []Rule {
147+
schemaRegistryRules := make([]Rule, 0)
148+
for _, irule := range Context.Rules {
149+
if len(irule.SchemaRegistries) > 0 {
150+
schemaRegistryRules = append(schemaRegistryRules, irule)
151+
}
152+
}
153+
154+
return schemaRegistryRules
155+
}
156+
143157
// ShouldIgnoreResultForRule returns true if the result for this topic need to be ignored for this rule.
144158
// Some results might be ignored as they are defined in another rule, thus global and override result
145159
// could conflict if we do not ignore the global result

cmd/internal/collector/descriptor.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ package collector
77
// Distributed under terms of the MIT license.
88
//
99

10-
import "strings"
11-
import "encoding/json"
12-
import "io/ioutil"
13-
import log "github.com/sirupsen/logrus"
10+
import (
11+
"encoding/json"
12+
"io/ioutil"
13+
"strings"
14+
15+
log "github.com/sirupsen/logrus"
16+
)
1417

1518
// DescriptorMetricResponse is the response from Confluent Cloud API metric endpoint
1619
// This is the JSON structure for the endpoint
@@ -54,10 +57,11 @@ type ResourceDescription struct {
5457

5558
var (
5659
excludeListForMetric = map[string]string{
57-
"io.confluent.kafka.server": "",
58-
"io.confluent.kafka.connect": "",
59-
"io.confluent.kafka.ksql": "",
60-
"delta": "",
60+
"io.confluent.kafka.server": "",
61+
"io.confluent.kafka.connect": "",
62+
"io.confluent.kafka.ksql": "",
63+
"io.confluent.kafka.schema_registry": "",
64+
"delta": "",
6165
}
6266
descriptorURI = "v2/metrics/cloud/descriptors/metrics"
6367
descriptorResourceURI = "v2/metrics/cloud/descriptors/resources"
@@ -88,6 +92,8 @@ func (resource ResourceDescription) hasLabel(label string) bool {
8892
// datapointFieldNameForLabel returns the data point field name for label:
// "resource." followed by the label with every underscore replaced by a dot
// when the resource has that label, "metric." followed by the unchanged label
// otherwise.
func (resource ResourceDescription) datapointFieldNameForLabel(label string) string {
	if !resource.hasLabel(label) {
		return "metric." + label
	}

	// TODO: schema_registry_id should map to resource.schema_registry.id, but
	// replacing every underscore yields resource.schema.registry.id. Once this
	// is fixed, the hard-coded conversion in collector_schemaregistry.go
	// (handleResponse) can be removed.
	dotted := strings.Replace(label, "_", ".", -1)
	return "resource." + dotted
}

0 commit comments

Comments
 (0)