forked from echojc/kafka-offset-exporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
92 lines (77 loc) · 2.55 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package main
import (
"flag"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/Shopify/sarama"
log "github.com/Sirupsen/logrus"
)
// main reads the command-line flags, configures logging, builds the server and
// scrape configurations, connects to Kafka, and then starts the scraper and the
// metrics server under enforceGracefulShutdown.
func main() {
	var (
		brokerString = flag.String("brokers", "", "Kafka brokers to connect to, comma-separated")
		topics       = flag.String("topics", "", "Only fetch offsets for topics matching this regex (default all)")
		groups       = flag.String("groups", "", "Also fetch offsets for consumer groups matching this regex (default none)")
		port         = flag.Int("port", 9000, "Port to export metrics on")
		path         = flag.String("path", "/", "Path to export metrics on")
		refresh      = flag.Duration("refresh", 1*time.Minute, "Time between refreshing cluster metadata")
		fetchMin     = flag.Duration("fetchMin", 15*time.Second, "Min time before requesting updates from broker")
		fetchMax     = flag.Duration("fetchMax", 40*time.Second, "Max time before requesting updates from broker")
		level        = flag.String("level", "info", "Logger level")
	)
	flag.Parse()

	// The logger is configured first so the must* helpers below report through it.
	mustSetupLogger(*level)

	srvCfg := mustNewServerConfig(*port, *path)
	scrapeCfg := mustNewScrapeConfig(*refresh, *fetchMin, *fetchMax, *topics, *groups)

	client := mustNewKafka(*brokerString)
	defer client.Close()

	// Both start* helpers receive the same WaitGroup and shutdown channel.
	enforceGracefulShutdown(func(pending *sync.WaitGroup, done chan struct{}) {
		startKafkaScraper(pending, done, client, scrapeCfg)
		startMetricsServer(pending, done, srvCfg)
	})
}
// enforceGracefulShutdown calls f with a fresh WaitGroup and shutdown channel,
// then blocks until a termination signal has been received and the WaitGroup
// has drained.
//
// The shutdown channel is closed the first time the process receives SIGTERM,
// SIGINT or SIGHUP. f runs synchronously on the calling goroutine; presumably
// it registers whatever work it starts on wg and has that work watch shutdown,
// but nothing in this function enforces that.
func enforceGracefulShutdown(f func(wg *sync.WaitGroup, shutdown chan struct{})) {
	wg := &sync.WaitGroup{}
	shutdown := make(chan struct{})

	// signal.Notify does not block when delivering: with an unbuffered channel
	// a signal that arrives before the goroutine below is parked on the receive
	// would be dropped and shutdown would never fire. One slot is enough
	// because only the first signal is consumed.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP)

	go func() {
		<-signals
		close(shutdown)
	}()

	log.Info("Graceful shutdown enabled")

	f(wg, shutdown)

	// Wait for the signal first, then for everything registered on wg.
	<-shutdown
	wg.Wait()
}
// mustNewKafka builds a sarama client from the comma-separated broker list in
// brokerString and returns it, calling log.Fatal if the client cannot be
// created.
//
// Each entry is stripped of surrounding whitespace, and any entry that contains
// no ':' gets ":9092" appended. The bootstrap list and the broker addresses
// reported by the client after connecting are both logged.
func mustNewKafka(brokerString string) sarama.Client {
	bootstrap := strings.Split(brokerString, ",")
	for idx, raw := range bootstrap {
		host := strings.TrimSpace(raw)
		if strings.IndexByte(host, ':') < 0 {
			host += ":9092"
		}
		bootstrap[idx] = host
	}
	log.WithField("brokers.bootstrap", bootstrap).Info("connecting to cluster with bootstrap hosts")

	conf := sarama.NewConfig()
	conf.Version = sarama.V0_10_0_0

	client, err := sarama.NewClient(bootstrap, conf)
	if err != nil {
		log.Fatal(err)
	}

	// Left as a nil slice when the client reports no brokers, so the logged
	// field keeps the same shape as before.
	var connected []string
	for _, broker := range client.Brokers() {
		connected = append(connected, broker.Addr())
	}
	log.WithField("brokers", connected).Info("connected to cluster")

	return client
}
// mustSetupLogger parses level and applies it to the package-level logger,
// which is also switched to the JSON formatter. If level cannot be parsed the
// error is passed to log.Fatal.
func mustSetupLogger(level string) {
	parsed, err := log.ParseLevel(level)
	if err != nil {
		log.Fatal(err)
	}

	// Both setters run only after the level is known to be valid, so a fatal
	// parse error is still emitted with the formatter that was active before.
	log.SetFormatter(&log.JSONFormatter{})
	log.SetLevel(parsed)
}