Skip to content

Commit

Permalink
Merge pull request #19 from jyates/11_export-partition-status
Browse files Browse the repository at this point in the history
#11 Export partition and group status as metrics
  • Loading branch information
jirwin authored Jul 9, 2018
2 parents 278ac5b + d313781 commit aacae76
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 9 deletions.
11 changes: 10 additions & 1 deletion burrow-exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ func main() {
Usage: "Burrow API version to leverage",
Value: 2,
},
cli.BoolFlag{
Name: "skip-partition-status",
Usage: "Skip exporting the per-partition status",
},
cli.BoolFlag{
Name: "skip-group-status",
Usage: "Skip exporting the per-group status",
},
}

app.Action = func(c *cli.Context) error {
Expand All @@ -62,7 +70,8 @@ func main() {

ctx, cancel := context.WithCancel(context.Background())

exporter := burrow_exporter.MakeBurrowExporter(c.String("burrow-addr"), c.Int("api-version"), c.String("metrics-addr"), c.Int("interval"))
exporter := burrow_exporter.MakeBurrowExporter(c.String("burrow-addr"), c.Int("api-version"),
c.String("metrics-addr"), c.Int("interval"), c.Bool("skip-partition-status"), c.Bool("skip-group-status"))
go exporter.Start(ctx)

<-done
Expand Down
37 changes: 29 additions & 8 deletions burrow_exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
)

type BurrowExporter struct {
client *BurrowClient
metricsListenAddr string
interval int
wg sync.WaitGroup
client *BurrowClient
metricsListenAddr string
interval int
wg sync.WaitGroup
skipPartitionStatus bool
skipConsumerStatus bool
}

func (be *BurrowExporter) processGroup(cluster, group string) {
Expand All @@ -46,6 +48,15 @@ func (be *BurrowExporter) processGroup(cluster, group string) {
"partition": strconv.Itoa(int(partition.Partition)),
}).Set(float64(partition.End.Offset))

if !be.skipPartitionStatus {
KafkaConsumerPartitionCurrentStatus.With(prometheus.Labels{
"cluster": status.Status.Cluster,
"group": status.Status.Group,
"topic": partition.Topic,
"partition": strconv.Itoa(int(partition.Partition)),
}).Set(float64(Status[partition.Status]))
}

KafkaConsumerPartitionMaxOffset.With(prometheus.Labels{
"cluster": status.Status.Cluster,
"group": status.Status.Group,
Expand All @@ -58,6 +69,13 @@ func (be *BurrowExporter) processGroup(cluster, group string) {
"cluster": status.Status.Cluster,
"group": status.Status.Group,
}).Set(float64(status.Status.TotalLag))

if !be.skipConsumerStatus {
KafkaConsumerStatus.With(prometheus.Labels{
"cluster": status.Status.Cluster,
"group": status.Status.Group,
}).Set(float64(Status[status.Status.Status]))
}
}

func (be *BurrowExporter) processTopic(cluster, topic string) {
Expand Down Expand Up @@ -189,10 +207,13 @@ func (be *BurrowExporter) mainLoop(ctx context.Context) {
}
}

func MakeBurrowExporter(burrowUrl string, apiVersion int, metricsAddr string, interval int) *BurrowExporter {
func MakeBurrowExporter(burrowUrl string, apiVersion int, metricsAddr string, interval int, skipPartitionStatus bool,
skipConsumerStatus bool) *BurrowExporter {
return &BurrowExporter{
client: MakeBurrowClient(burrowUrl, apiVersion),
metricsListenAddr: metricsAddr,
interval: interval,
client: MakeBurrowClient(burrowUrl, apiVersion),
metricsListenAddr: metricsAddr,
interval: interval,
skipPartitionStatus: skipPartitionStatus,
skipConsumerStatus: skipConsumerStatus,
}
}
27 changes: 27 additions & 0 deletions burrow_exporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ package burrow_exporter

import "github.com/prometheus/client_golang/prometheus"

// If we are missing a status, it will return 0
var Status = map[string]int{
"NOTFOUND": 1,
"OK": 2,
"WARN": 3,
"ERR": 4,
"STOP": 5,
"STALL": 6,
"REWIND": 7,
}

var (
KafkaConsumerPartitionLag = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand All @@ -17,6 +28,13 @@ var (
},
[]string{"cluster", "group", "topic", "partition"},
)
KafkaConsumerPartitionCurrentStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kafka_burrow_partition_status",
Help: "The status of a partition as reported by burrow.",
},
[]string{"cluster", "group", "topic", "partition"},
)
KafkaConsumerPartitionMaxOffset = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kafka_burrow_partition_max_offset",
Expand All @@ -31,6 +49,13 @@ var (
},
[]string{"cluster", "group"},
)
KafkaConsumerStatus = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kafka_burrow_status",
Help: "The status of a partition as reported by burrow.",
},
[]string{"cluster", "group"},
)
KafkaTopicPartitionOffset = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kafka_burrow_topic_partition_offset",
Expand All @@ -43,7 +68,9 @@ var (
func init() {
prometheus.MustRegister(KafkaConsumerPartitionLag)
prometheus.MustRegister(KafkaConsumerPartitionCurrentOffset)
prometheus.MustRegister(KafkaConsumerPartitionCurrentStatus)
prometheus.MustRegister(KafkaConsumerPartitionMaxOffset)
prometheus.MustRegister(KafkaConsumerTotalLag)
prometheus.MustRegister(KafkaConsumerStatus)
prometheus.MustRegister(KafkaTopicPartitionOffset)
}

0 comments on commit aacae76

Please sign in to comment.