forked from echojc/kafka-offset-exporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmetrics.go
95 lines (86 loc) · 1.83 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"context"
"fmt"
"math"
"net/http"
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Prometheus gauge vectors exposed by this exporter.
var (
	// metricOffsetOldest is the kafka_offset_oldest gauge, labelled by
	// topic and partition.
	metricOffsetOldest = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "kafka_offset_oldest", Help: "Oldest offset for a partition"},
		[]string{"topic", "partition"},
	)

	// metricOffsetNewest is the kafka_offset_newest gauge, labelled by
	// topic and partition.
	metricOffsetNewest = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "kafka_offset_newest", Help: "Newest offset for a partition"},
		[]string{"topic", "partition"},
	)

	// metricOffsetConsumer is the kafka_offset_consumer gauge, labelled by
	// topic, partition and consumer group.
	metricOffsetConsumer = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "kafka_offset_consumer", Help: "Current offset for a consumer group"},
		[]string{"topic", "partition", "group"},
	)
)
// init registers every offset gauge with the default Prometheus registry.
// MustRegister panics if any of the collectors cannot be registered.
func init() {
	prometheus.MustRegister(
		metricOffsetOldest,
		metricOffsetNewest,
		metricOffsetConsumer,
	)
}
// serverConfig holds the settings for the metrics HTTP server.
type serverConfig struct {
	// port is the TCP port the server listens on.
	port int
	// path is the URL pattern under which the metrics handler is mounted.
	path string
}
// mustNewServerConfig builds a serverConfig from port and path.
//
// port must lie within [0, math.MaxUint16]; otherwise the error is reported
// through log.Fatal. path is stored as given, without validation.
func mustNewServerConfig(port int, path string) serverConfig {
	inRange := port >= 0 && port <= math.MaxUint16
	if !inRange {
		log.Fatal("Invalid port number")
	}

	var cfg serverConfig
	cfg.port = port
	cfg.path = path
	return cfg
}
// startMetricsServer starts the metrics HTTP server in the background and
// returns immediately.
//
// The server listens on cfg.port and serves promhttp.Handler() at cfg.path.
// Once a value is received from shutdown (or the channel is closed), the
// server is shut down gracefully with a 3 second deadline.
//
// wg is incremented before the background goroutine is started and
// decremented after shutdown has finished, so callers can use wg.Wait() to
// block until the server has stopped.
func startMetricsServer(wg *sync.WaitGroup, shutdown chan struct{}, cfg serverConfig) {
	// Add must run before the goroutine is spawned: doing it inside the
	// goroutine races with the caller's wg.Wait(), which could then return
	// before the server was ever counted.
	wg.Add(1)
	go func() {
		defer wg.Done()

		mux := http.NewServeMux()
		mux.Handle(cfg.path, promhttp.Handler())
		srv := &http.Server{
			Addr:    fmt.Sprintf(":%d", cfg.port),
			Handler: mux,
		}

		go func() {
			log.WithField("port", cfg.port).
				WithField("path", cfg.path).
				Info("Starting metrics HTTP server")
			// ListenAndServe always returns a non-nil error;
			// http.ErrServerClosed is the expected result of Shutdown,
			// anything else (e.g. the port is already in use) is a real
			// failure that must not be swallowed.
			if err := srv.ListenAndServe(); err != http.ErrServerClosed {
				log.WithError(err).Error("Metrics HTTP server stopped unexpectedly")
			}
		}()

		<-shutdown
		log.Info("Shutting down metrics HTTP server")
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		// Shutdown returns the context's error if the deadline expires
		// before all connections have drained.
		if err := srv.Shutdown(ctx); err != nil {
			log.WithError(err).Warn("Metrics HTTP server did not shut down cleanly")
		}
	}()
}