Skip to content

Commit be065fb

Browse files
authored
Add support for reliable acks based on configuration (#167)
Implement reliable acks in fleet telemetry, which allow the server to signal back to the vehicle that a request was processed successfully
1 parent 3f39c7c commit be065fb

21 files changed

+412
-193
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,10 @@ The following [dispatchers](./telemetry/producer.go#L10-L19) are supported
158158
* ZMQ: Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
159159
* Logger: This is a simple STDOUT logger that serializes the protos to json.
160160

161-
>NOTE: To add a new dispatcher, please provide integration tests and updated documentation. To serialize dispatcher data as json instead of protobufs, add a config `transmit_decoded_records` and set value to `true` as shown [here](config/test_configs_test.go#L104)
161+
>NOTE: To add a new dispatcher, please provide integration tests and updated documentation. To serialize dispatcher data as json instead of protobufs, add a config `transmit_decoded_records` and set value to `true` as shown [here](config/test_configs_test.go#L186)
162+
163+
## Reliable Acks
164+
Fleet telemetry allows you to send ack messages back to the vehicle. This is useful for applications that need to ensure the data was received and processed. To enable this feature, set `reliable_ack_sources` in the config file to a mapping from record type to one of the dispatchers configured for that record type (`kafka`, `kinesis`, `pubsub`, `zmq`). Each record type can have only one reliable ack dispatcher, the record type must already be present in `records`, and `logger` cannot be used as a reliable ack source. See [here](./test/integration/config.json#L8) for a sample config.
162165

163166
## Metrics
164167
Configure and use Prometheus or a StatsD-interface supporting data store for metrics. The integration test runs fleet telemetry with [grafana](https://grafana.com/docs/grafana/latest/datasources/google-cloud-monitoring/), which is compatible with prometheus. It also has an example dashboard which tracks important metrics related to the hosted server. Sample screenshot for the [sample dashboard](./test/integration/grafana/provisioning/dashboards/dashboard.json):-

config/config.go

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,8 @@ type Config struct {
5353
// RateLimit is a configuration for the ratelimit
5454
RateLimit *RateLimit `json:"rate_limit,omitempty"`
5555

56-
// ReliableAck if true, the server will send an ack back to the client only when the message has been stored in a datastore
57-
ReliableAck bool `json:"reliable_ack,omitempty"`
58-
59-
// ReliableAckWorkers is the number of workers that will handle the acknowledgment
60-
ReliableAckWorkers int `json:"reliable_ack_workers,omitempty"`
56+
// ReliableAckSources is a mapping of record types to a dispatcher that will be used for reliable ack
57+
ReliableAckSources map[string]telemetry.Dispatcher `json:"reliable_ack_sources,omitempty"`
6158

6259
// Kafka is a configuration for the standard librdkafka configuration properties
6360
// seen here: https://raw.githubusercontent.com/confluentinc/librdkafka/master/CONFIGURATION.md
@@ -245,12 +242,13 @@ func (c *Config) prometheusEnabled() bool {
245242
return false
246243
}
247244

248-
func (c *Config) ReliableAcksDisabled() bool {
249-
return c.ReliableAck == false && c.ReliableAckWorkers == 0
250-
}
251-
252245
// ConfigureProducers validates and establishes connections to the producers (kafka/pubsub/logger)
253246
func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, logger *logrus.Logger) (map[string][]telemetry.Producer, error) {
247+
reliableAckSources, err := c.configureReliableAckSources()
248+
if err != nil {
249+
return nil, err
250+
}
251+
254252
producers := make(map[telemetry.Dispatcher]telemetry.Producer)
255253
producers[telemetry.Logger] = simple.NewProtoLogger(logger)
256254

@@ -266,7 +264,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
266264
return nil, errors.New("Expected Kafka to be configured")
267265
}
268266
convertKafkaConfig(c.Kafka)
269-
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
267+
kafkaProducer, err := kafka.NewProducer(c.Kafka, c.Namespace, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kafka], logger)
270268
if err != nil {
271269
return nil, err
272270
}
@@ -277,7 +275,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
277275
if c.Pubsub == nil {
278276
return nil, errors.New("Expected Pubsub to be configured")
279277
}
280-
googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, logger)
278+
googleProducer, err := googlepubsub.NewProducer(context.Background(), c.prometheusEnabled(), c.Pubsub.ProjectID, c.Namespace, c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Pubsub], logger)
281279
if err != nil {
282280
return nil, err
283281
}
@@ -293,7 +291,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
293291
maxRetries = *c.Kinesis.MaxRetries
294292
}
295293
streamMapping := c.CreateKinesisStreamMapping(recordNames)
296-
kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, logger)
294+
kinesis, err := kinesis.NewProducer(maxRetries, streamMapping, c.Kinesis.OverrideHost, c.prometheusEnabled(), c.MetricCollector, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.Kinesis], logger)
297295
if err != nil {
298296
return nil, err
299297
}
@@ -304,7 +302,7 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
304302
if c.ZMQ == nil {
305303
return nil, errors.New("Expected ZMQ to be configured")
306304
}
307-
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, logger)
305+
zmqProducer, err := zmq.NewProducer(context.Background(), c.ZMQ, c.MetricCollector, c.Namespace, airbrakeHandler, c.AckChan, reliableAckSources[telemetry.ZMQ], logger)
308306
if err != nil {
309307
return nil, err
310308
}
@@ -327,6 +325,43 @@ func (c *Config) ConfigureProducers(airbrakeHandler *airbrake.AirbrakeHandler, l
327325
return dispatchProducerRules, nil
328326
}
329327

328+
func (c *Config) configureReliableAckSources() (map[telemetry.Dispatcher]map[string]interface{}, error) {
329+
reliableAckSources := make(map[telemetry.Dispatcher]map[string]interface{}, 0)
330+
for txType, dispatchRule := range c.ReliableAckSources {
331+
if dispatchRule == telemetry.Logger {
332+
return nil, fmt.Errorf("logger cannot be configured as reliable ack for record: %s", txType)
333+
}
334+
dispatchers, ok := c.Records[txType]
335+
if !ok {
336+
return nil, fmt.Errorf("%s cannot be configured as reliable ack for record: %s since no record mapping exists", dispatchRule, txType)
337+
}
338+
dispatchRuleFound := false
339+
validDispatchers := parseValidDispatchers(dispatchers)
340+
for _, dispatcher := range validDispatchers {
341+
if dispatcher == dispatchRule {
342+
dispatchRuleFound = true
343+
reliableAckSources[dispatchRule] = map[string]interface{}{txType: true}
344+
break
345+
}
346+
}
347+
if !dispatchRuleFound {
348+
return nil, fmt.Errorf("%s cannot be configured as reliable ack for record: %s. Valid datastores configured %v", dispatchRule, txType, validDispatchers)
349+
}
350+
}
351+
return reliableAckSources, nil
352+
}
353+
354+
// parseValidDispatchers removes no-op dispatcher from the input i.e. Logger
355+
func parseValidDispatchers(input []telemetry.Dispatcher) []telemetry.Dispatcher {
356+
var result []telemetry.Dispatcher
357+
for _, v := range input {
358+
if v != telemetry.Logger {
359+
result = append(result, v)
360+
}
361+
}
362+
return result
363+
}
364+
330365
// convertKafkaConfig will prioritize int over float
331366
// see: https://github.com/confluentinc/confluent-kafka-go/blob/cde2827bc49655eca0f9ce3fc1cda13cb6cdabc9/kafka/config.go#L108-L125
332367
func convertKafkaConfig(input *confluent.ConfigMap) {

config/config_initializer.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package config
22

33
import (
44
"encoding/json"
5-
"errors"
65
"flag"
76
"log"
87
"os"
@@ -54,11 +53,6 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
5453
return nil, err
5554
}
5655
config.MetricCollector = metrics.NewCollector(config.Monitoring, logger)
57-
58-
// TODO disble this check when reliable acks are properly supported
59-
if !config.ReliableAcksDisabled() {
60-
return nil, errors.New("reliable acks not support yet. Unset `reliable_ack` and `reliable_ack_workers` in the config file")
61-
}
6256
config.AckChan = make(chan *telemetry.Record)
6357
return config, err
6458
}

config/config_initializer_test.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ var _ = Describe("Test application config initialization", func() {
2323
Namespace: "tesla_telemetry",
2424
TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"},
2525
RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30},
26-
ReliableAck: false,
27-
ReliableAckWorkers: 0,
26+
ReliableAckSources: map[string]telemetry.Dispatcher{"V": telemetry.Kafka},
2827
Kafka: &confluent.ConfigMap{
2928
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
3029
"ssl.ca.location": "kafka.ca",
@@ -36,7 +35,7 @@ var _ = Describe("Test application config initialization", func() {
3635
MetricCollector: prometheus.NewCollector(),
3736
LogLevel: "info",
3837
JSONLogEnable: true,
39-
Records: map[string][]telemetry.Dispatcher{"FS": {"kafka"}},
38+
Records: map[string][]telemetry.Dispatcher{"V": {"kafka"}},
4039
}
4140

4241
loadedConfig, err := loadTestApplicationConfig(TestConfig)
@@ -62,7 +61,7 @@ var _ = Describe("Test application config initialization", func() {
6261
"queue.buffering.max.messages": float64(1000000),
6362
},
6463
MetricCollector: noop.NewCollector(),
65-
Records: map[string][]telemetry.Dispatcher{"FS": {"kafka"}},
64+
Records: map[string][]telemetry.Dispatcher{"V": {"kafka"}},
6665
}
6766

6867
loadedConfig, err := loadTestApplicationConfig(TestSmallConfig)
@@ -73,11 +72,6 @@ var _ = Describe("Test application config initialization", func() {
7372
Expect(loadedConfig).To(Equal(expectedConfig))
7473
})
7574

76-
It("fails when reliable acks are set", func() {
77-
_, err := loadTestApplicationConfig(TestReliableAckConfig)
78-
Expect(err).Should(MatchError("reliable acks not support yet. Unset `reliable_ack` and `reliable_ack_workers` in the config file"))
79-
})
80-
8175
It("returns an error if config is not appropriate", func() {
8276
_, err := loadTestApplicationConfig(BadTopicConfig)
8377
Expect(err).To(MatchError("invalid character '}' looking for beginning of object key string"))

config/config_test.go

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,12 @@ var _ = Describe("Test full application config", func() {
2727
BeforeEach(func() {
2828
log, _ = logrus.NoOpLogger()
2929
config = &Config{
30-
Host: "127.0.0.1",
31-
Port: 443,
32-
StatusPort: 8080,
33-
Namespace: "tesla_telemetry",
34-
TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"},
35-
RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30},
36-
ReliableAck: true,
37-
ReliableAckWorkers: 15,
30+
Host: "127.0.0.1",
31+
Port: 443,
32+
StatusPort: 8080,
33+
Namespace: "tesla_telemetry",
34+
TLS: &TLS{CAFile: "tesla.ca", ServerCert: "your_own_cert.crt", ServerKey: "your_own_key.key"},
35+
RateLimit: &RateLimit{Enabled: true, MessageLimit: 1000, MessageInterval: 30},
3836
Kafka: &confluent.ConfigMap{
3937
"bootstrap.servers": "some.broker:9093",
4038
"ssl.ca.location": "kafka.ca",
@@ -44,7 +42,7 @@ var _ = Describe("Test full application config", func() {
4442
Monitoring: &metrics.MonitoringConfig{PrometheusMetricsPort: 9090, ProfilerPort: 4269, ProfilingPath: "/tmp/fleet-telemetry/profile/"},
4543
LogLevel: "info",
4644
JSONLogEnable: true,
47-
Records: map[string][]telemetry.Dispatcher{"FS": {"kafka"}},
45+
Records: map[string][]telemetry.Dispatcher{"V": {"kafka"}},
4846
}
4947
})
5048

@@ -137,7 +135,7 @@ var _ = Describe("Test full application config", func() {
137135

138136
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
139137
Expect(err).NotTo(HaveOccurred())
140-
Expect(producers["FS"]).To(HaveLen(1))
138+
Expect(producers["V"]).To(HaveLen(1))
141139

142140
value, err := config.Kafka.Get("queue.buffering.max.messages", 10)
143141
Expect(err).NotTo(HaveOccurred())
@@ -168,10 +166,29 @@ var _ = Describe("Test full application config", func() {
168166
})
169167
})
170168

169+
Context("configure reliable acks", func() {
170+
171+
DescribeTable("fails",
172+
func(configInput string, errMessage string) {
173+
174+
config, err := loadTestApplicationConfig(configInput)
175+
Expect(err).NotTo(HaveOccurred())
176+
177+
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
178+
Expect(err).To(MatchError(errMessage))
179+
Expect(producers).To(BeNil())
180+
},
181+
Entry("when reliable ack is mapped incorrectly", TestBadReliableAckConfig, "pubsub cannot be configured as reliable ack for record: V. Valid datastores configured [kafka]"),
182+
Entry("when logger is configured as reliable ack", TestLoggerAsReliableAckConfig, "logger cannot be configured as reliable ack for record: V"),
183+
Entry("when reliable ack is configured for unmapped txtype", TestUnusedTxTypeAsReliableAckConfig, "kafka cannot be configured as reliable ack for record: error since no record mapping exists"),
184+
)
185+
186+
})
187+
171188
Context("configure kinesis", func() {
172189
It("returns an error if kinesis isn't included", func() {
173190
log, _ := logrus.NoOpLogger()
174-
config.Records = map[string][]telemetry.Dispatcher{"FS": {"kinesis"}}
191+
config.Records = map[string][]telemetry.Dispatcher{"V": {"kinesis"}}
175192

176193
var err error
177194
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
@@ -218,7 +235,7 @@ var _ = Describe("Test full application config", func() {
218235
var err error
219236
producers, err = pubsubConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
220237
Expect(err).NotTo(HaveOccurred())
221-
Expect(producers["FS"]).NotTo(BeNil())
238+
Expect(producers["V"]).NotTo(BeNil())
222239
})
223240
})
224241

@@ -233,7 +250,7 @@ var _ = Describe("Test full application config", func() {
233250

234251
It("returns an error if zmq isn't included", func() {
235252
log, _ := logrus.NoOpLogger()
236-
config.Records = map[string][]telemetry.Dispatcher{"FS": {"zmq"}}
253+
config.Records = map[string][]telemetry.Dispatcher{"V": {"zmq"}}
237254
var err error
238255
producers, err = config.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
239256
Expect(err).To(MatchError("Expected ZMQ to be configured"))
@@ -249,7 +266,7 @@ var _ = Describe("Test full application config", func() {
249266
var err error
250267
producers, err = zmqConfig.ConfigureProducers(airbrake.NewAirbrakeHandler(nil), log)
251268
Expect(err).NotTo(HaveOccurred())
252-
Expect(producers["FS"]).NotTo(BeNil())
269+
Expect(producers["V"]).NotTo(BeNil())
253270
})
254271
})
255272

0 commit comments

Comments
 (0)