metrics.go

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"

	dd "github.com/zorkian/go-datadog-api"
)

// Both functions here fetch 2x the span in duration to handle lagging
// metrics from the DD API. This should result in a max of two rollup values
// per timeseries. We then choose the latest non-nil value.
//
// It's also OK to ignore errors beyond the metrics query. Sometimes one or more
// timeseries may not have been returned, but if we're not using them, it doesn't
// matter. Error handling for required but missing metrics is handled elsewhere
// in metricsfetcher.
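//
// For example (hypothetical numbers): with c.Span set to 3600, each query below
// covers the trailing 7200 seconds, which per the note above should yield at
// most two rollup points per timeseries.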
func partitionMetrics(c *Config) (map[string]map[string]map[string]float64, error) {
	start := time.Now().Add(-time.Duration(c.Span*2) * time.Second).Unix()
	o, err := c.Client.QueryMetrics(start, time.Now().Unix(), c.PartnQuery)
	if err != nil {
		return nil, err
	}

	d := map[string]map[string]map[string]float64{}

	for _, ts := range o {
		topic := tagValFromScope(ts.GetScope(), "topic")
		// The topic tag comes back with the leading double underscore deduped
		// to a single one; restore the real __consumer_offsets topic name.
		if topic == "_consumer_offsets" {
			topic = "__consumer_offsets"
		}

		partition := tagValFromScope(ts.GetScope(), "partition")

		// Get the latest value.
		val, err := latestValue(ts.Points)
		if err != nil {
			continue
		}

		if _, exists := d[topic]; !exists {
			d[topic] = map[string]map[string]float64{}
		}

		d[topic][partition] = map[string]float64{}
		d[topic][partition]["Size"] = val
	}

	return d, nil
}
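
// For illustration (hypothetical values), the map returned above has the shape
// topic -> partition -> metric name -> value, e.g.:
//
//	d["mytopic"]["0"]["Size"] == 1073741824.0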

func brokerMetrics(c *Config) (map[string]map[string]float64, error) {
	start := time.Now().Add(-time.Duration(c.Span*2) * time.Second).Unix()
	o, err := c.Client.QueryMetrics(start, time.Now().Unix(), c.BrokerQuery)
	if err != nil {
		return nil, err
	}

	// Populate.
	d := map[string]map[string]float64{}

	for _, ts := range o {
		broker := tagValFromScope(ts.GetScope(), c.BrokerIDTag)

		// Check that the tag value is actually a broker ID. An improperly tagged or
		// untagged broker may have "N/A" or some other invalid value.
		if _, err := strconv.Atoi(broker); err != nil {
			continue
		}

		// Get the latest value.
		val, err := latestValue(ts.Points)
		if err != nil {
			continue
		}

		if _, exists := d[broker]; !exists {
			d[broker] = map[string]float64{}
		}

		d[broker]["StorageFree"] = val
	}

	return d, nil
}
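
// As above, the returned map is keyed broker ID -> metric name -> value, e.g.
// (hypothetical values) d["1001"]["StorageFree"] == 250000000000.0.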

// latestValue takes a []dd.DataPoint and returns the value of the most recent
// (in time) non-nil datapoint.
func latestValue(points []dd.DataPoint) (float64, error) {
	for i := len(points) - 1; i >= 0; i-- {
		val := points[i][1]
		if val != nil {
			return *val, nil
		}
	}

	return 0, fmt.Errorf("no value found")
}
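
// Example (hypothetical points): for [(t0, 5.0), (t1, 7.5), (t2, nil)], the
// trailing nil from a still-lagging rollup is skipped and 7.5 is returned.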

// tagValFromScope takes a metric scope string and a tag and returns that tag's value.
func tagValFromScope(scope, tag string) string {
	ts := strings.Split(scope, ",")

	return valFromTags(ts, tag)
}
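
// For example (hypothetical scope string):
// tagValFromScope("host:b1,topic:mytopic,partition:0", "partition") returns "0".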

// valFromTags takes a []string of tags and a key, returning the value for the key.
func valFromTags(tags []string, key string) string {
	var v []string

	for _, tag := range tags {
		if strings.HasPrefix(tag, key+":") {
			v = strings.Split(tag, ":")
			break
		}
	}

	if len(v) > 1 {
		return v[1]
	}

	return ""
}
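
// exampleUsage is a sketch, not part of the original file, showing how the two
// fetchers above might be called. It assumes a *Config populated elsewhere in
// metricsfetcher with Client, Span, PartnQuery, BrokerQuery and BrokerIDTag,
// i.e. the fields referenced above.
func exampleUsage(cfg *Config) {
	pm, err := partitionMetrics(cfg)
	if err != nil {
		fmt.Println("partition metrics query failed:", err)
		return
	}

	bm, err := brokerMetrics(cfg)
	if err != nil {
		fmt.Println("broker metrics query failed:", err)
		return
	}

	// Hypothetical topic, partition, and broker ID, purely for illustration.
	fmt.Println(pm["mytopic"]["0"]["Size"], bm["1001"]["StorageFree"])
}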