Skip to content

Commit 77ab764

Browse files
committed
track signal usage for received messages
1 parent dce2d86 commit 77ab764

File tree

9 files changed

+139
-3
lines changed

9 files changed

+139
-3
lines changed

README.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,9 @@ Vehicles must be running firmware version 2023.20.6 or later. Some older model
151151
## Personalized Backends/Dispatchers
152152
Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry servers. They can be of any type, from distributed message queues to STDOUT logger. Here is a list of the currently supported [dispatchers](./telemetry/producer.go#L10-L19)::
153153
* Kafka (preferred): Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
154-
* Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla`
154+
* Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity` and \*prefix\*`_alerts`. The default prefix is `tesla`
155155
* Kinesis: Configure with standard [AWS env variables and config files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html). The default AWS credentials and config files are: `~/.aws/credentials` and `~/.aws/config`.
156-
* By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_errors`, `tesla_alerts`, etc
156+
* By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_alerts`, etc
157157
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
158158
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
159159
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
@@ -179,6 +179,9 @@ On the vehicle, Fleet Telemetry client behave similarly to how the connectivity
179179
}
180180
```
181181

182+
## Tracking incoming signals
183+
If you have metrics enabled, you can use it to track count of incoming signals. This can help you identify approximate billing for your service. There are two ways to track signals. By default, it tracks signals per record_type (\*prefix\*`V`,\*prefix\*`connectivity`, \*prefix\*`alerts`, and \*prefix\*`errors`). If you wish to track signals for a subset of VINs, you can add `vins_signal_tracking_enabled` in the config file which will track metrics for usage from those particular vins as well.
184+
182185
## Metrics
183186
Configure and use Prometheus or a StatsD-interface supporting data store for metrics. The integration test runs Fleet Telemetry with [grafana](https://grafana.com/docs/grafana/latest/datasources/google-cloud-monitoring/), which is compatible with prometheus. It also has an example dashboard which tracks important metrics related to the hosted server. Sample screenshot for the [sample dashboard](./test/integration/grafana/provisioning/dashboards/dashboard.json):-
184187

config/config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ type Config struct {
9393
// when vehicle configuration has prefer_typed set to true, enum fields will have a prefix
9494
TransmitDecodedRecords bool `json:"transmit_decoded_records,omitempty"`
9595

96+
VinsSignalTrackingEnabled []string `json:"vins_signal_tracking_enabled"`
97+
9698
// MetricCollector collects metrics for the application
9799
MetricCollector metrics.MetricCollector
98100

@@ -192,6 +194,18 @@ func (c *Config) AirbrakeTLSConfig() (*tls.Config, error) {
192194
return tlsConfig, nil
193195
}
194196

197+
// VinsToTrack to track incoming signals in promemetheus
198+
func (c *Config) VinsToTrack() map[string]struct{} {
199+
output := make(map[string]struct{}, 0)
200+
if len(c.VinsSignalTrackingEnabled) == 0 {
201+
return output
202+
}
203+
for _, vin := range c.VinsSignalTrackingEnabled {
204+
output[vin] = struct{}{}
205+
}
206+
return output
207+
}
208+
195209
// ExtractServiceTLSConfig return the TLS config needed for stating the mTLS Server
196210
func (c *Config) ExtractServiceTLSConfig(logger *logrus.Logger) (*tls.Config, error) {
197211
if c.TLS == nil {

config/config_initializer.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package config
33
import (
44
"encoding/json"
55
"flag"
6+
"fmt"
67
"log"
78
"os"
89

@@ -14,6 +15,10 @@ import (
1415
"github.com/teslamotors/fleet-telemetry/telemetry"
1516
)
1617

18+
var (
19+
maxVinsToTrack = 20
20+
)
21+
1722
// LoadApplicationConfiguration loads the configuration from args and config files
1823
func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err error) {
1924

@@ -27,7 +32,6 @@ func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err
2732

2833
config, err = loadApplicationConfig(configFilePath)
2934
if err != nil {
30-
logger.ErrorLog("read_application_configuration_error", err, nil)
3135
return nil, nil, err
3236
}
3337

@@ -55,11 +59,22 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
5559
if err != nil {
5660
return nil, err
5761
}
62+
63+
if err := validateConfig(config); err != nil {
64+
return nil, err
65+
}
5866
config.MetricCollector = metrics.NewCollector(config.Monitoring, logger)
5967
config.AckChan = make(chan *telemetry.Record)
6068
return config, err
6169
}
6270

71+
func validateConfig(config *Config) error {
72+
if len(config.VinsToTrack()) > maxVinsToTrack {
73+
return fmt.Errorf("set the value of `vins_signal_tracking_enabled` less than %d unique vins", maxVinsToTrack)
74+
}
75+
return nil
76+
}
77+
6378
func loadConfigFlags() string {
6479
applicationConfig := ""
6580
flag.StringVar(&applicationConfig, "config", "config.json", "application configuration file")

config/config_initializer_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
)
1616

1717
var _ = Describe("Test application config initialization", func() {
18+
1819
It("loads the config properly", func() {
1920
expectedConfig := &Config{
2021
Host: "127.0.0.1",

config/config_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,31 @@ var _ = Describe("Test full application config", func() {
211211
})
212212
})
213213

214+
Context("VinsToTrack", func() {
215+
216+
AfterEach(func() {
217+
maxVinsToTrack = 20
218+
})
219+
220+
It("empty vins to track", func() {
221+
config, err := loadTestApplicationConfig(TestSmallConfig)
222+
Expect(err).NotTo(HaveOccurred())
223+
Expect(config.VinsToTrack()).To(BeEmpty())
224+
})
225+
226+
It("empty vins to track", func() {
227+
config, err := loadTestApplicationConfig(TestVinsToTrackConfig)
228+
Expect(err).NotTo(HaveOccurred())
229+
Expect(config.VinsToTrack()).To(HaveLen(2))
230+
})
231+
232+
It("returns an error when `vins_signal_tracking_enabled` exceeds limit", func() {
233+
maxVinsToTrack = 2
234+
_, err := loadTestApplicationConfig(BadVinsConfig)
235+
Expect(err).To(MatchError("set the value of `vins_signal_tracking_enabled` less than 2 unique vins"))
236+
})
237+
})
238+
214239
Context("configure pubsub", func() {
215240
var (
216241
pubsubConfig *Config

config/test_configs_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,30 @@ const TestSmallConfig = `
6262
}
6363
`
6464

65+
const BadVinsConfig = `
66+
{
67+
"host": "127.0.0.1",
68+
"port": 443,
69+
"status_port": 8080,
70+
"namespace": "tesla_telemetry",
71+
"kafka": {
72+
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
73+
"ssl.ca.location": "kafka.ca",
74+
"ssl.certificate.location": "kafka.crt",
75+
"ssl.key.location": "kafka.key",
76+
"queue.buffering.max.messages": 1000000
77+
},
78+
"records": {
79+
"V": ["kafka"]
80+
},
81+
"vins_signal_tracking_enabled": ["vin1", "vin2", "vin3"],
82+
"tls": {
83+
"ca_file": "tesla.ca",
84+
"server_cert": "your_own_cert.crt",
85+
"server_key": "your_own_key.key"
86+
}
87+
}
88+
`
6589
const TestBadReliableAckConfig = `
6690
{
6791
"host": "127.0.0.1",
@@ -190,6 +214,19 @@ const TestTransmitDecodedRecords = `
190214
}
191215
`
192216

217+
const TestVinsToTrackConfig = `
218+
{
219+
"host": "127.0.0.1",
220+
"port": 443,
221+
"status_port": 8080,
222+
"transmit_decoded_records": true,
223+
"records": {
224+
"V": ["logger"]
225+
},
226+
"vins_signal_tracking_enabled": ["v1", "v2"]
227+
}
228+
`
229+
193230
const TestAirbrakeConfig = `
194231
{
195232
"host": "127.0.0.1",

server/streaming/socket.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type SocketManager struct {
4747
stopChan chan struct{}
4848
writeChan chan SocketMessage
4949
transmitDecodedRecords bool
50+
vinsSignalTracking map[string]struct{}
5051
}
5152

5253
// SocketMessage represents incoming socket connection
@@ -66,6 +67,8 @@ type Metrics struct {
6667
socketErrorCount adapter.Counter
6768
recordSizeBytesTotal adapter.Counter
6869
recordCount adapter.Counter
70+
signalsCount adapter.Gauge
71+
vinSignalCount adapter.Gauge
6972
}
7073

7174
var (
@@ -94,6 +97,7 @@ func NewSocketManager(ctx context.Context, requestIdentity *telemetry.RequestIde
9497
stopChan: make(chan struct{}),
9598
requestIdentity: requestIdentity,
9699
transmitDecodedRecords: config.TransmitDecodedRecords,
100+
vinsSignalTracking: config.VinsToTrack(),
97101
}
98102
}
99103

@@ -205,6 +209,7 @@ func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer
205209
// client exceeded the rate limit
206210
messagesRateLimited++
207211
record, _ := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords)
212+
sm.trackSignalUsage(record)
208213
metricsRegistry.rateLimitExceededCount.Inc(map[string]string{"device_id": sm.requestIdentity.DeviceID, "txtype": record.TxType})
209214
if sm.config.RateLimit != nil && sm.config.RateLimit.Enabled {
210215
continue
@@ -223,6 +228,15 @@ func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer
223228
}
224229
}
225230

231+
func (sm *SocketManager) trackSignalUsage(record *telemetry.Record) {
232+
metricsRegistry.signalsCount.Add(int64(record.SignalsCount()), map[string]string{"record_type": record.TxType})
233+
vin := record.Vin
234+
if _, ok := sm.vinsSignalTracking[vin]; !ok {
235+
return
236+
}
237+
metricsRegistry.vinSignalCount.Add(int64(record.SignalsCount()), map[string]string{"vin": vin, "record_type": record.TxType})
238+
}
239+
226240
// ParseAndProcessRecord reads incoming client message and dispatches to relevant producer
227241
func (sm *SocketManager) ParseAndProcessRecord(serializer *telemetry.BinarySerializer, message []byte) {
228242
record, err := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords)
@@ -391,4 +405,16 @@ func registerMetrics(metricsCollector metrics.MetricCollector) {
391405
Labels: []string{"record_type"},
392406
})
393407

408+
metricsRegistry.signalsCount = metricsCollector.RegisterGauge(adapter.CollectorOptions{
409+
Name: "signal_count",
410+
Help: "Total number of signals received per record type",
411+
Labels: []string{"record_type"},
412+
})
413+
414+
metricsRegistry.vinSignalCount = metricsCollector.RegisterGauge(adapter.CollectorOptions{
415+
Name: "vin_signal_count",
416+
Help: "Total number of signals received per vin per record type",
417+
Labels: []string{"record_type", "vin"},
418+
})
419+
394420
}

telemetry/record.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,18 @@ func (record *Record) ensureEncoded() {
135135
}
136136
}
137137

138+
// SignalsCount received per record_type from the vehicle
139+
func (record *Record) SignalsCount() int {
140+
switch payload := record.protoMessage.(type) {
141+
case *protos.Payload:
142+
return len(payload.GetData())
143+
case *protos.VehicleAlerts:
144+
return len(payload.GetAlerts())
145+
default:
146+
return 0
147+
}
148+
}
149+
138150
func (record *Record) applyProtoRecordTransforms() error {
139151
switch record.TxType {
140152
case "alerts":

test/integration/config.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
"connect_timeout_ms": 30000,
3838
"publish_timeout_ms": 1000
3939
},
40+
"vins_signal_tracking_enabled": [
41+
"device-1"
42+
],
4043
"monitoring": {
4144
"prometheus_metrics_port": 9090,
4245
"profiler_port": 4269,

0 commit comments

Comments
 (0)