Commit 03528fd

Use delivery channel to get kafka producer metrics (#162)
Confluent provides a channel interface for delivery reports; use it to collect Kafka producer metrics. Also add metrics for other failure modes, and disable reliable acks since they will not work as expected.
1 parent f36e2e5 commit 03528fd
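For reference, the sketch below (not code from this repository) illustrates the confluent-kafka-go pattern this commit adopts: pass a dedicated delivery channel to Produce and read delivery reports from that channel, instead of the producer-wide Events() stream, to drive ack and failure metrics. The broker address, topic name, and Opaque value are placeholder assumptions, and the import path may need the "/v2" module suffix depending on the library version in use.

// A minimal, self-contained sketch of the delivery-channel pattern; not code
// from this repository. Broker address, topic name and Opaque value are
// placeholder assumptions.
package main

import (
    "fmt"
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // One dedicated channel receives the delivery reports for every message
    // produced with it, instead of mixing them into producer.Events().
    deliveryChan := make(chan kafka.Event)

    go func() {
        for e := range deliveryChan {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    // Failed delivery: this is where a failure metric would be incremented.
                    fmt.Println("delivery failed:", ev.TopicPartition.Error)
                    continue
                }
                // Successful delivery: the Opaque value set at Produce time comes
                // back here, so per-record ack metrics can be updated.
                fmt.Println("acked:", ev.Opaque)
            case kafka.Error:
                // Client/broker-level errors can also surface as events.
                fmt.Println("producer error:", ev)
            }
        }
    }()

    topic := "example_topic"
    msg := &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte("payload"),
        Opaque:         "record-id-1", // round-tripped back in the delivery report
    }
    if err := producer.Produce(msg, deliveryChan); err != nil {
        log.Fatal(err)
    }

    // Wait up to 5s for outstanding delivery reports before exiting.
    producer.Flush(5000)
}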

File tree

7 files changed: +55 additions, -38 deletions


config/config.go

Lines changed: 5 additions & 1 deletion
@@ -245,6 +245,10 @@ func (c *Config) prometheusEnabled() bool {
     return false
 }
 
+func (c *Config) ReliableAcksDisabled() bool {
+    return c.ReliableAck == false && c.ReliableAckWorkers == 0
+}
+
 // ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
 func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (map[string][]telemetry.Producer, error) {
     producers := make(map[telemetry.Dispatcher]telemetry.Producer)
@@ -262,7 +266,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
         return nil, errors.New("Expected Kafka to be configured")
     }
     convertKafkaConfig(c.Kafka)
-    kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.ReliableAckWorkers, c.AckChan, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
+    kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
     if err != nil {
         return nil, err
     }

config/config_initializer.go

Lines changed: 5 additions & 0 deletions
@@ -2,6 +2,7 @@ package config
 
 import (
     "encoding/json"
+    "errors"
     "flag"
     "log"
     "os"
@@ -54,6 +55,10 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
     }
     config.MetricCollector = metrics.NewCollector(config.Monitoring, logger)
 
+    // TODO disable this check when reliable acks are properly supported
+    if !config.ReliableAcksDisabled() {
+        return nil, errors.New("reliable acks not supported yet. Unset `reliable_ack` and `reliable_ack_workers` in the config file")
+    }
     config.AckChan = make(chan *telemetry.Record)
     return config, err
 }

config/config_initializer_test.go

Lines changed: 7 additions & 2 deletions
@@ -23,8 +23,8 @@ var _ = Describe("Test application config initialization", func() {
         Namespace: "tesla_telemetry",
         TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"},
         RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30},
-        ReliableAck: true,
-        ReliableAckWorkers: 15,
+        ReliableAck: false,
+        ReliableAckWorkers: 0,
         Kafka: &confluent.ConfigMap{
             "bootstrap.servers": "some.broker1:9093,some.broker1:9093",
             "ssl.ca.location": "kafka.ca",
@@ -73,6 +73,11 @@ var _ = Describe("Test application config initialization", func() {
         Expect(loadedConfig).To(Equal(expectedConfig))
     })
 
+    It("fails when reliable acks are set", func() {
+        _, err := loadTestApplicationConfig(TestReliableAckConfig)
+        Expect(err).Should(MatchError("reliable acks not supported yet. Unset `reliable_ack` and `reliable_ack_workers` in the config file"))
+    })
+
     It("returns an error if config is not appropriate", func() {
         _, err := loadTestApplicationConfig(BadTopicConfig)
         Expect(err).To(MatchError("invalid character '}' looking for beginning of object key string"))

config/test_configs_test.go

Lines changed: 26 additions & 2 deletions
@@ -7,8 +7,6 @@ const TestConfig = `{
   "log_level": "info",
   "json_log_enable": true,
   "namespace": "tesla_telemetry",
-  "reliable_ack": true,
-  "reliable_ack_workers": 15,
   "kafka": {
     "bootstrap.servers": "some.broker1:9093,some.broker1:9093",
     "ssl.ca.location": "kafka.ca",
@@ -61,6 +59,32 @@ const TestSmallConfig = `
 }
 `
 
+const TestReliableAckConfig = `
+{
+  "host": "127.0.0.1",
+  "port": 443,
+  "status_port": 8080,
+  "namespace": "tesla_telemetry",
+  "reliable_ack": true,
+  "reliable_ack_workers": 15,
+  "kafka": {
+    "bootstrap.servers": "some.broker1:9093,some.broker1:9093",
+    "ssl.ca.location": "kafka.ca",
+    "ssl.certificate.location": "kafka.crt",
+    "ssl.key.location": "kafka.key",
+    "queue.buffering.max.messages": 1000000
+  },
+  "records": {
+    "FS": ["kafka"]
+  },
+  "tls": {
+    "ca_file": "tesla.ca",
+    "server_cert": "your_own_cert.crt",
+    "server_key": "your_own_key.key"
+  }
+}
+`
+
 const TestPubsubConfig = `
 {
   "host": "127.0.0.1",

datastore/kafka/kafka.go

Lines changed: 12 additions & 11 deletions
@@ -22,7 +22,7 @@ type Producer struct {
     metricsCollector metrics.MetricCollector
     logger *logrus.Logger
     airbrakeHandler *airbrake.AirbrakeHandler
-    reliableAck bool
+    deliveryChan chan kafka.Event
 }
 
 // Metrics stores metrics reported from this package
@@ -41,8 +41,7 @@ var (
 )
 
 // NewProducer establishes the kafka connection and define the dispatch method
-func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers int,
-    ackChan chan (*telemetry.Record), prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
+func NewProducer(config *kafka.ConfigMap, namespace string, prometheusEnabled bool, metricsCollector metrics.MetricCollector, airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (telemetry.Producer, error) {
     registerMetricsOnce(metricsCollector)
 
     kafkaProducer, err := kafka.NewProducer(config)
@@ -57,10 +56,10 @@ func NewProducer(config *kafka.ConfigMap, namespace string, reliableAckWorkers i
         prometheusEnabled: prometheusEnabled,
         logger: logger,
         airbrakeHandler: airbrakeHandler,
-        reliableAck: reliableAckWorkers > 0,
+        deliveryChan: make(chan kafka.Event),
     }
 
-    go producer.handleProducerEvents(ackChan)
+    go producer.handleProducerEvents()
     go producer.reportProducerMetrics()
     producer.logger.ActivityLog("kafka_registered", logrus.LogInfo{"namespace": namespace})
     return producer, nil
@@ -82,7 +81,7 @@ func (p *Producer) Produce(entry *telemetry.Record) {
     // Note: confluent kafka supports the concept of one channel per connection, so we could add those here and get rid of reliableAckWorkers
    // ex.: https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/producer_custom_channel_example/producer_custom_channel_example.go#L79
     entry.ProduceTime = time.Now()
-    if err := p.kafkaProducer.Produce(msg, nil); err != nil {
+    if err := p.kafkaProducer.Produce(msg, p.deliveryChan); err != nil {
         p.logError(err)
         return
     }
@@ -106,21 +105,23 @@ func headersFromRecord(record *telemetry.Record) (headers []kafka.Header) {
     return
 }
 
-func (p *Producer) handleProducerEvents(ackChan chan (*telemetry.Record)) {
-    for e := range p.kafkaProducer.Events() {
+func (p *Producer) handleProducerEvents() {
+    for e := range p.deliveryChan {
         switch ev := e.(type) {
         case kafka.Error:
             p.logError(fmt.Errorf("producer_error %v", ev))
         case *kafka.Message:
+            if ev.TopicPartition.Error != nil {
+                p.logError(fmt.Errorf("topic_partition_error %v", ev))
+                continue
+            }
             entry, ok := ev.Opaque.(*telemetry.Record)
             if !ok {
+                p.logError(fmt.Errorf("opaque_record_missing %v", ev))
                 continue
             }
             metricsRegistry.producerAckCount.Inc(map[string]string{"record_type": entry.TxType})
             metricsRegistry.bytesAckTotal.Add(int64(entry.Length()), map[string]string{"record_type": entry.TxType})
-            if p.reliableAck {
-                ackChan <- entry
-            }
         default:
             p.logger.ActivityLog("kafka_event_ignored", logrus.LogInfo{"event": ev.String()})
         }

server/streaming/socket.go

Lines changed: 0 additions & 20 deletions
@@ -324,26 +324,6 @@ func (sm SocketManager) ReportMetricBytesPerRecords(recordType string, byteSize
     metricsRegistry.recordCount.Inc(map[string]string{"record_type": recordType})
 }
 
-// DatastoreAckProcessor records metrics after acking records
-func (sm SocketManager) DatastoreAckProcessor(ackChan chan (*telemetry.Record)) {
-    for record := range ackChan {
-        durationMs := time.Since(record.ProduceTime) / time.Millisecond
-
-        metricsRegistry.kafkaWriteMs.Observe(int64(durationMs), map[string]string{})
-        metricsRegistry.kafkaWriteBytesTotal.Add(int64(record.Length()), map[string]string{"record_type": record.TxType})
-        metricsRegistry.kafkaWriteCount.Inc(map[string]string{"record_type": record.TxType})
-
-        if record.Serializer != nil && record.Serializer.ReliableAck() {
-            if socket := sm.registry.GetSocket(record.SocketID); socket != nil {
-                metricsRegistry.reliableAckCount.Inc(map[string]string{"record_type": record.TxType})
-                socket.respondToVehicle(record, nil)
-            } else {
-                metricsRegistry.reliableAckMissCount.Inc(map[string]string{"record_type": record.TxType})
-            }
-        }
-    }
-}
-
 func registerMetricsOnce(metricsCollector metrics.MetricCollector) {
     metricsOnce.Do(func() { registerMetrics(metricsCollector) })
 }

test/integration/config.json

Lines changed: 0 additions & 2 deletions
@@ -5,8 +5,6 @@
   "log_level": "info",
   "json_log_enable": true,
   "namespace": "tesla_telemetry",
-  "reliable_ack": true,
-  "reliable_ack_workers": 15,
   "kafka": {
     "bootstrap.servers": "kafka:9092",
     "queue.buffering.max.messages": 1000000,
