From 2165fa4443b09390d372aa6b649be8fd69c58e58 Mon Sep 17 00:00:00 2001 From: sebco59 Date: Thu, 3 Jun 2021 18:00:20 +0200 Subject: [PATCH] Add Schema Registry Metrics --- README.md | 24 ++- cmd/internal/collector/collector.go | 28 ++- .../collector/collector_schemaregistry.go | 176 ++++++++++++++++++ cmd/internal/collector/context.go | 14 ++ cmd/internal/collector/descriptor.go | 22 ++- cmd/internal/collector/option.go | 19 +- cmd/internal/collector/query.go | 60 +++++- 7 files changed, 305 insertions(+), 38 deletions(-) create mode 100644 cmd/internal/collector/collector_schemaregistry.go diff --git a/README.md b/README.md index 357ca98..9107965 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ To use the exporter, the following environment variables need to be specified: ## Usage ```shell -./ccloudexporter [-cluster ] [-connector ] [-ksqlDB ] +./ccloudexporter [-cluster ] [-connector ] [-ksqlDB ] [-schemaRegistry ] ``` ### Options @@ -34,6 +34,8 @@ Usage of ./ccloudexporter: Granularity for the metrics query, by default set to 1 minutes (default "PT1M") -ksqlDB string Comma separated list of ksqlDB application to fetch metric for. If not specified, the environment variable CCLOUD_KSQL will be used + -schemaRegistry string + Comma separated list of Schema Registry ID to fetch metric for. If not specified, the environment variable CCLOUD_SCHEMA_REGISTRY will be used -listener string Listener for the HTTP interface (default ":2112") -log-pretty-print @@ -122,14 +124,15 @@ If you do not provide a configuration file, the exporter creates one from the pr #### Rule configuration -| Key | Description | -|--------------------|---------------------------------------------------------------------------------------------------------------| -| rules.clusters | List of Kafka clusters to fetch metrics for | -| rules.connectors | List of connectors to fetch metrics for | -| rules.ksqls | List of ksqlDB applications to fetch metrics for | -| rules.labels | Labels to exposed to Prometheus and group by in the query | -| rules.topics | Optional list of topics to filter the metrics | -| rules.metrics | List of metrics to gather | +| Key | Description | +|------------------------|---------------------------------------------------------------------------------------------------------------| +| rules.clusters | List of Kafka clusters to fetch metrics for | +| rules.connectors | List of connectors to fetch metrics for | +| rules.ksqls | List of ksqlDB applications to fetch metrics for | +| rules.schemaRegistries | List of Schema Registries id to fetch metrics for | +| rules.labels | Labels to exposed to Prometheus and group by in the query | +| rules.topics | Optional list of topics to filter the metrics | +| rules.metrics | List of metrics to gather | ### Examples of configuration files @@ -154,6 +157,8 @@ rules: - $CCLOUD_CONNECTOR ksqls: - $CCLOUD_KSQL + schemaRegistries: + - $CCLOUD_SCHEMA_REGISTRY metrics: - io.confluent.kafka.server/received_bytes - io.confluent.kafka.server/sent_bytes @@ -170,6 +175,7 @@ rules: - io.confluent.kafka.connect/sent_records - io.confluent.kafka.connect/dead_letter_queue_records - io.confluent.kafka.ksql/streaming_unit_count + - io.confluent.kafka.schema_registry/schema_count labels: - kafka_id - topic diff --git a/cmd/internal/collector/collector.go b/cmd/internal/collector/collector.go index 730351f..6e52786 100644 --- a/cmd/internal/collector/collector.go +++ b/cmd/internal/collector/collector.go @@ -29,11 +29,12 @@ type CCloudCollectorMetric struct { // CCloudCollector is a custom prometheu collector to collect data from // Confluent Cloud Metrics API type CCloudCollector struct { - metrics map[string]CCloudCollectorMetric - rules []Rule - kafkaCollector *KafkaCCloudCollector - connectorCollector *ConnectorCCloudCollector - ksqlCollector *KsqlCCloudCollector + metrics map[string]CCloudCollectorMetric + rules []Rule + kafkaCollector *KafkaCCloudCollector + connectorCollector *ConnectorCCloudCollector + ksqlCollector *KsqlCCloudCollector + schemaRegistryCollector *SchemaRegistryCCloudCollector } var ( @@ -45,6 +46,7 @@ func (cc CCloudCollector) Describe(ch chan<- *prometheus.Desc) { cc.kafkaCollector.Describe(ch) cc.connectorCollector.Describe(ch) cc.ksqlCollector.Describe(ch) + cc.schemaRegistryCollector.Describe(ch) } // Collect all metrics for Prometheus @@ -54,6 +56,7 @@ func (cc CCloudCollector) Collect(ch chan<- prometheus.Metric) { cc.kafkaCollector.Collect(ch, &wg) cc.connectorCollector.Collect(ch, &wg) cc.ksqlCollector.Collect(ch, &wg) + cc.schemaRegistryCollector.Collect(ch, &wg) wg.Wait() } @@ -68,9 +71,10 @@ func NewCCloudCollector() CCloudCollector { } var ( - connectorResource ResourceDescription - kafkaResource ResourceDescription - ksqlResource ResourceDescription + connectorResource ResourceDescription + kafkaResource ResourceDescription + ksqlResource ResourceDescription + schemaRegistryResource ResourceDescription ) resourceDescription := SendResourceDescriptorQuery() for _, resource := range resourceDescription.Data { @@ -80,6 +84,8 @@ func NewCCloudCollector() CCloudCollector { kafkaResource = resource } else if resource.Type == "ksql" { ksqlResource = resource + } else if resource.Type == "schema_registry" { + schemaRegistryResource = resource } } @@ -95,14 +101,20 @@ func NewCCloudCollector() CCloudCollector { log.WithField("descriptorResponse", resourceDescription).Fatalln("No ksqlDB resource available") } + if schemaRegistryResource.Type == "" { + log.WithField("descriptorResponse", resourceDescription).Fatalln("No SchemaRegistry resource available") + } + collector := CCloudCollector{rules: Context.Rules, metrics: make(map[string]CCloudCollectorMetric)} kafkaCollector := NewKafkaCCloudCollector(collector, kafkaResource) connectorCollector := NewConnectorCCloudCollector(collector, connectorResource) ksqlCollector := NewKsqlCCloudCollector(collector, ksqlResource) + schemaRegistryCollector := NewSchemaRegistryCCloudCollector(collector, schemaRegistryResource) collector.kafkaCollector = &kafkaCollector collector.connectorCollector = &connectorCollector collector.ksqlCollector = &ksqlCollector + collector.schemaRegistryCollector = &schemaRegistryCollector return collector } diff --git a/cmd/internal/collector/collector_schemaregistry.go b/cmd/internal/collector/collector_schemaregistry.go new file mode 100644 index 0000000..694019a --- /dev/null +++ b/cmd/internal/collector/collector_schemaregistry.go @@ -0,0 +1,176 @@ +package collector + +// +// collector.go +// Copyright (C) 2020 gaspar_d +// +// Distributed under terms of the MIT license. +// + +import ( + "fmt" + "strconv" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" +) + +// SchemaRegistryCCloudCollector is a custom prometheus collector to collect data from +// Confluent Cloud Metrics API. It fetches schema_registry resources types metrics +type SchemaRegistryCCloudCollector struct { + metrics map[string]CCloudCollectorMetric + rules []Rule + ccloud CCloudCollector + resource ResourceDescription +} + +// Describe collect all metrics for ccloudexporter +func (cc SchemaRegistryCCloudCollector) Describe(ch chan<- *prometheus.Desc) { + for _, desc := range cc.metrics { + ch <- desc.desc + desc.duration.Describe(ch) + } +} + +// Collect all metrics for Prometheus +// to avoid reaching the scrape_timeout, metrics are fetched in multiple goroutine +func (cc SchemaRegistryCCloudCollector) Collect(ch chan<- prometheus.Metric, wg *sync.WaitGroup) { + for _, rule := range cc.rules { + for _, metric := range rule.Metrics { + _, present := cc.metrics[metric] + if !present { + continue + } + + if len(rule.SchemaRegistries) <= 0 { + log.WithFields(log.Fields{"rule": rule}).Errorln("SchemaRegistries rule has no SchemaRegistry ID specified") + continue + } + + wg.Add(1) + go cc.CollectMetricsForRule(wg, ch, rule, cc.metrics[metric]) + } + } +} + +// CollectMetricsForRule collects all metrics for a specific rule +func (cc SchemaRegistryCCloudCollector) CollectMetricsForRule(wg *sync.WaitGroup, ch chan<- prometheus.Metric, rule Rule, ccmetric CCloudCollectorMetric) { + defer wg.Done() + query := BuildSchemaRegistryQuery(ccmetric.metric, rule.SchemaRegistries, cc.resource) + log.WithFields(log.Fields{"query": query}).Traceln("The following query has been created") + optimizedQuery, additionalLabels := OptimizeQuery(query) + log.WithFields(log.Fields{"optimizedQuery": optimizedQuery, "additionalLabels": additionalLabels}).Traceln("Query has been optimized") + durationMetric, _ := ccmetric.duration.GetMetricWithLabelValues(strconv.Itoa(rule.id)) + timer := prometheus.NewTimer(prometheus.ObserverFunc(durationMetric.Set)) + response, err := SendQuery(optimizedQuery) + timer.ObserveDuration() + ch <- durationMetric + if err != nil { + log.WithError(err).WithFields(log.Fields{"optimizedQuery": optimizedQuery, "response": response}).Errorln("Query did not succeed") + return + } + log.WithFields(log.Fields{"response": response}).Traceln("Response has been received") + cc.handleResponse(response, ccmetric, ch, rule, additionalLabels) +} + +func (cc SchemaRegistryCCloudCollector) handleResponse(response QueryResponse, ccmetric CCloudCollectorMetric, ch chan<- prometheus.Metric, rule Rule, additionalLabels map[string]string) { + desc := ccmetric.desc + for _, dataPoint := range response.Data { + value, ok := dataPoint["value"].(float64) + if !ok { + log.WithField("datapoint", dataPoint["value"]).Errorln("Can not convert result to float") + return + } + + labels := []string{} + for _, label := range ccmetric.labels { + name := cc.resource.datapointFieldNameForLabel(label) + + // Could be remove when fix is done in descriptor.go line 95 + if name == "resource.schema.registry.id" { + name = "resource.schema_registry.id" + } + + labelValue, labelValuePresent := dataPoint[name].(string) + if !labelValuePresent { + labelValue, labelValuePresent = additionalLabels[name] + } + labels = append(labels, labelValue) + } + metric := prometheus.MustNewConstMetric( + desc, + prometheus.GaugeValue, + value, + labels..., + ) + + if Context.NoTimestamp { + ch <- metric + } else { + timestamp, err := time.Parse(time.RFC3339, fmt.Sprint(dataPoint["timestamp"])) + if err != nil { + log.WithError(err).Errorln("Can not parse timestamp, ignoring the response") + return + } + metricWithTime := prometheus.NewMetricWithTimestamp(timestamp, metric) + ch <- metricWithTime + } + } +} + +// NewSchemaRegistryCCloudCollector create a new Confluent Cloud SchemaRegistry collector +func NewSchemaRegistryCCloudCollector(ccloudcollecter CCloudCollector, resource ResourceDescription) SchemaRegistryCCloudCollector { + collector := SchemaRegistryCCloudCollector{ + rules: Context.GetSchemaRegistryRules(), + metrics: make(map[string]CCloudCollectorMetric), + ccloud: ccloudcollecter, + resource: resource, + } + descriptorResponse := SendDescriptorQuery(resource.Type) + log.WithField("descriptor response", descriptorResponse).Traceln("The following response for the descriptor endpoint has been received") + mapOfWhiteListedMetrics := Context.GetMapOfMetrics("io.confluent.kafka.schema_registry") + + for _, metr := range descriptorResponse.Data { + _, metricPresent := mapOfWhiteListedMetrics[metr.Name] + if !metricPresent { + continue + } + delete(mapOfWhiteListedMetrics, metr.Name) + var labels []string + for _, metrLabel := range metr.Labels { + labels = append(labels, metrLabel.Key) + } + + for _, rsrcLabel := range resource.Labels { + labels = append(labels, GetPrometheusNameForLabel(rsrcLabel.Key)) + } + desc := prometheus.NewDesc( + "ccloud_metric_schema_registry_"+GetNiceNameForMetric(metr), + metr.Description, + labels, + nil, + ) + + requestDuration := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ccloud_metrics_api_request_latency", + Help: "Metrics API request latency", + ConstLabels: map[string]string{"metric": metr.Name}, + }, []string{"ruleNumber"}) + + metric := CCloudCollectorMetric{ + metric: metr, + desc: desc, + duration: requestDuration, + labels: labels, + } + collector.metrics[metr.Name] = metric + } + + if len(mapOfWhiteListedMetrics) > 0 { + log.WithField("Ignored metrics", mapOfWhiteListedMetrics).Warnln("The following metrics will not be gathered as they are not exposed by the Metrics API") + } + + return collector +} diff --git a/cmd/internal/collector/context.go b/cmd/internal/collector/context.go index a5578e8..f3e7725 100644 --- a/cmd/internal/collector/context.go +++ b/cmd/internal/collector/context.go @@ -29,6 +29,7 @@ type Rule struct { Clusters []string `mapstructure:"clusters"` Connectors []string `mapstructure:"connectors"` Ksql []string `mapstructure:"ksqls"` + SchemaRegistries []string `mapstructure:"schemaregistries"` Metrics []string `mapstructure:"metrics"` GroupByLabels []string `mapstructure:"labels"` cachedIgnoreGlobalResultForTopic map[TopicClusterMetric]bool @@ -72,6 +73,7 @@ var DefaultMetrics = []string{ "io.confluent.kafka.connect/sent_records", "io.confluent.kafka.connect/dead_letter_queue_records", "io.confluent.kafka.ksql/streaming_unit_count", + "io.confluent.kafka.schema_registry/schema_count", } // GetMapOfMetrics returns the whitelist of metrics in a map @@ -140,6 +142,18 @@ func (context ExporterContext) GetKsqlRules() []Rule { return ksqlRules } +// GetSchemaRegistryRules return all rules associated to at least one Schema Registry instance +func (context ExporterContext) GetSchemaRegistryRules() []Rule { + schemaRegistryRules := make([]Rule, 0) + for _, irule := range Context.Rules { + if len(irule.SchemaRegistries) > 0 { + schemaRegistryRules = append(schemaRegistryRules, irule) + } + } + + return schemaRegistryRules +} + // ShouldIgnoreResultForRule returns true if the result for this topic need to be ignored for this rule. // Some results might be ignored as they are defined in another rule, thus global and override result // could conflict if we do not ignore the global result diff --git a/cmd/internal/collector/descriptor.go b/cmd/internal/collector/descriptor.go index e38f0d4..607804d 100644 --- a/cmd/internal/collector/descriptor.go +++ b/cmd/internal/collector/descriptor.go @@ -7,10 +7,13 @@ package collector // Distributed under terms of the MIT license. // -import "strings" -import "encoding/json" -import "io/ioutil" -import log "github.com/sirupsen/logrus" +import ( + "encoding/json" + "io/ioutil" + "strings" + + log "github.com/sirupsen/logrus" +) // DescriptorMetricResponse is the response from Confluent Cloud API metric endpoint // This is the JSON structure for the endpoint @@ -54,10 +57,11 @@ type ResourceDescription struct { var ( excludeListForMetric = map[string]string{ - "io.confluent.kafka.server": "", - "io.confluent.kafka.connect": "", - "io.confluent.kafka.ksql": "", - "delta": "", + "io.confluent.kafka.server": "", + "io.confluent.kafka.connect": "", + "io.confluent.kafka.ksql": "", + "io.confluent.kafka.schema_registry": "", + "delta": "", } descriptorURI = "v2/metrics/cloud/descriptors/metrics" descriptorResourceURI = "v2/metrics/cloud/descriptors/resources" @@ -88,6 +92,8 @@ func (resource ResourceDescription) hasLabel(label string) bool { func (resource ResourceDescription) datapointFieldNameForLabel(label string) string { if resource.hasLabel(label) { return "resource." + strings.Replace(label, "_", ".", -1) + // TODO fix it to work with schema_registry_id in param. Must return resource.schema_registry.id + // When fix, could remove hard coded conversion in collector_schemaregistry.go line 91->93 } return "metric." + label } diff --git a/cmd/internal/collector/option.go b/cmd/internal/collector/option.go index 997ac9a..cd38809 100644 --- a/cmd/internal/collector/option.go +++ b/cmd/internal/collector/option.go @@ -24,6 +24,7 @@ func ParseOption() { var clusters string var connectors string var ksqlApplications string + var schemaRegistries string var configPath string flag.StringVar(&configPath, "config", "", "Path to configuration file used to override default behavior of ccloudexporter") @@ -34,6 +35,7 @@ func ParseOption() { flag.StringVar(&clusters, "cluster", "", "Comma separated list of cluster ID to fetch metric for. If not specified, the environment variable CCLOUD_CLUSTER will be used") flag.StringVar(&connectors, "connector", "", "Comma separated list of connector ID to fetch metric for. If not specified, the environment variable CCLOUD_CONNECTOR will be used") flag.StringVar(&ksqlApplications, "ksqlDB", "", "Comma separated list of ksqlDB application to fetch metric for. If not specified, the environment variable CCLOUD_KSQL will be used") + flag.StringVar(&schemaRegistries, "schemaRegistry", "", "Comma separated list of Schema Registry ID to fetch metric for. If not specified, the environment variable CCLOUD_SCHEMA_REGISTRY will be used") flag.StringVar(&Context.Listener, "listener", ":2112", "Listener for the HTTP interface") flag.BoolVar(&Context.NoTimestamp, "no-timestamp", false, "Do not propagate the timestamp from the the metrics API to prometheus") versionFlag := flag.Bool("version", false, "Print the current version and exit") @@ -58,6 +60,7 @@ func ParseOption() { clusters = getFromEnvIfEmpty(clusters, "CCLOUD_CLUSTER") connectors = getFromEnvIfEmpty(connectors, "CCLOUD_CONNECTOR") ksqlApplications = getFromEnvIfEmpty(ksqlApplications, "CCLOUD_KSQL") + schemaRegistries = getFromEnvIfEmpty(schemaRegistries, "CCLOUD_SCHEMA_REGISTRY") if configPath != "" { parseConfigFile(configPath) @@ -66,6 +69,7 @@ func ParseOption() { splitEnv(clusters), splitEnv(connectors), splitEnv(ksqlApplications), + splitEnv(schemaRegistries), ) } validateConfiguration() @@ -156,15 +160,16 @@ func parseConfigFile(configPath string) { } } -func createDefaultRule(clusters []string, connectors []string, ksqlDBApplications []string) { +func createDefaultRule(clusters []string, connectors []string, ksqlDBApplications []string, schemaRegistries []string) { Context.Rules = make([]Rule, 1) Context.Rules[0] = Rule{ - id: 0, - Clusters: clusters, - Connectors: connectors, - Ksql: ksqlDBApplications, - Metrics: DefaultMetrics, - GroupByLabels: DefaultGroupingLabels, + id: 0, + Clusters: clusters, + Connectors: connectors, + Ksql: ksqlDBApplications, + SchemaRegistries: schemaRegistries, + Metrics: DefaultMetrics, + GroupByLabels: DefaultGroupingLabels, } } diff --git a/cmd/internal/collector/query.go b/cmd/internal/collector/query.go index ce069fa..cd69c50 100644 --- a/cmd/internal/collector/query.go +++ b/cmd/internal/collector/query.go @@ -8,15 +8,16 @@ package collector // import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" "strings" "time" + + log "github.com/sirupsen/logrus" ) -import "fmt" -import "bytes" -import "errors" -import "io/ioutil" -import "encoding/json" -import log "github.com/sirupsen/logrus" // Query to Confluent Cloud API metric endpoint // This is the JSON structure for the endpoint @@ -236,6 +237,53 @@ func BuildKsqlQuery(metric MetricDescription, ksqlAppIds []string, resource Reso } } +// BuildSchemaRegistryQuery creates a new Query for a metric for a specific schema registry id +// This function will return the main global query, override queries will not be generated +func BuildSchemaRegistryQuery(metric MetricDescription, schemaregistries []string, resource ResourceDescription) Query { + timeFrom := time.Now().Add(time.Duration(-Context.Delay) * time.Second) // the last minute might contains data that is not yet finalized + timeFrom = timeFrom.Add(time.Duration(-timeFrom.Second()) * time.Second) // the seconds need to be stripped to have an effective delay + + aggregation := Aggregation{ + Agg: "SUM", + Metric: metric.Name, + } + + filters := make([]Filter, 0) + + connectorFilters := make([]Filter, 0) + for _, schemaRegistryID := range schemaregistries { + connectorFilters = append(connectorFilters, Filter{ + Field: "resource.schema_registry.id", + Op: "EQ", + Value: schemaRegistryID, + }) + } + + filters = append(filters, Filter{ + Op: "OR", + Filters: connectorFilters, + }) + + filterHeader := FilterHeader{ + Op: "AND", + Filters: filters, + } + + groupBy := make([]string, len(resource.Labels)) + for i, rsrcLabel := range resource.Labels { + groupBy[i] = "resource." + rsrcLabel.Key + } + + return Query{ + Aggreations: []Aggregation{aggregation}, + Filter: filterHeader, + Granularity: Context.Granularity, + GroupBy: groupBy, + Limit: 1000, + Intervals: []string{fmt.Sprintf("%s/%s", timeFrom.Format(time.RFC3339), Context.Granularity)}, + } +} + // SendQuery sends a query to Confluent Cloud API metrics and wait for the response synchronously func SendQuery(query Query) (QueryResponse, error) { jsonQuery, err := json.Marshal(query)