diff --git a/README.md b/README.md index 9e94e43..7e0d095 100644 --- a/README.md +++ b/README.md @@ -151,9 +151,9 @@ Vehicles must be running firmware version 2023.20.6 or later. Some older model ## Personalized Backends/Dispatchers Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry servers. They can be of any type, from distributed message queues to STDOUT logger. Here is a list of the currently supported [dispatchers](./telemetry/producer.go#L10-L19):: * Kafka (preferred): Configure with the config.json file. See implementation here: [config/config.go](./config/config.go) - * Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla` + * Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity` and \*prefix\*`_alerts`. The default prefix is `tesla` * Kinesis: Configure with standard [AWS env variables and config files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html). The default AWS credentials and config files are: `~/.aws/credentials` and `~/.aws/config`. - * By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_errors`, `tesla_alerts`, etc + * By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_alerts`, etc * Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }` * Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V` * Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS` @@ -179,6 +179,9 @@ On the vehicle, Fleet Telemetry client behave similarly to how the connectivity } ``` +## Tracking incoming signals +If you have metrics enabled, you can use it to track count of incoming signals. This can help you identify approximate billing for your service. There are two ways to track signals. By default, it tracks signals per record_type (\*prefix\*`V`,\*prefix\*`connectivity`, \*prefix\*`alerts`, and \*prefix\*`errors`). If you wish to track signals for a subset of VINs, you can add `vins_signal_tracking_enabled` in the config file which will track metrics for usage from those particular vins as well. + ## Metrics Configure and use Prometheus or a StatsD-interface supporting data store for metrics. The integration test runs Fleet Telemetry with [grafana](https://grafana.com/docs/grafana/latest/datasources/google-cloud-monitoring/), which is compatible with prometheus. It also has an example dashboard which tracks important metrics related to the hosted server. Sample screenshot for the [sample dashboard](./test/integration/grafana/provisioning/dashboards/dashboard.json):- diff --git a/config/config.go b/config/config.go index fe92258..4a3dc11 100644 --- a/config/config.go +++ b/config/config.go @@ -93,6 +93,8 @@ type Config struct { // when vehicle configuration has prefer_typed set to true, enum fields will have a prefix TransmitDecodedRecords bool `json:"transmit_decoded_records,omitempty"` + VinsSignalTrackingEnabled []string `json:"vins_signal_tracking_enabled"` + // MetricCollector collects metrics for the application MetricCollector metrics.MetricCollector @@ -192,6 +194,18 @@ func (c *Config) AirbrakeTLSConfig() (*tls.Config, error) { return tlsConfig, nil } +// VinsToTrack to track incoming signals in promemetheus +func (c *Config) VinsToTrack() map[string]struct{} { + output := make(map[string]struct{}, 0) + if len(c.VinsSignalTrackingEnabled) == 0 { + return output + } + for _, vin := range c.VinsSignalTrackingEnabled { + output[vin] = struct{}{} + } + return output +} + // ExtractServiceTLSConfig return the TLS config needed for stating the mTLS Server func (c *Config) ExtractServiceTLSConfig(logger *logrus.Logger) (*tls.Config, error) { if c.TLS == nil { diff --git a/config/config_initializer.go b/config/config_initializer.go index ea65e23..82ba4d7 100644 --- a/config/config_initializer.go +++ b/config/config_initializer.go @@ -3,6 +3,7 @@ package config import ( "encoding/json" "flag" + "fmt" "log" "os" @@ -14,6 +15,10 @@ import ( "github.com/teslamotors/fleet-telemetry/telemetry" ) +var ( + maxVinsToTrack = 20 +) + // LoadApplicationConfiguration loads the configuration from args and config files func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err error) { @@ -27,7 +32,6 @@ func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err config, err = loadApplicationConfig(configFilePath) if err != nil { - logger.ErrorLog("read_application_configuration_error", err, nil) return nil, nil, err } @@ -55,11 +59,22 @@ func loadApplicationConfig(configFilePath string) (*Config, error) { if err != nil { return nil, err } + + if err := validateConfig(config); err != nil { + return nil, err + } config.MetricCollector = metrics.NewCollector(config.Monitoring, logger) config.AckChan = make(chan *telemetry.Record) return config, err } +func validateConfig(config *Config) error { + if len(config.VinsToTrack()) > maxVinsToTrack { + return fmt.Errorf("set the value of `vins_signal_tracking_enabled` less than %d unique vins", maxVinsToTrack) + } + return nil +} + func loadConfigFlags() string { applicationConfig := "" flag.StringVar(&applicationConfig, "config", "config.json", "application configuration file") diff --git a/config/config_test.go b/config/config_test.go index 67b075a..9688203 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -211,6 +211,31 @@ var _ = Describe("Test full application config", func() { }) }) + Context("VinsToTrack", func() { + + AfterEach(func() { + maxVinsToTrack = 20 + }) + + It("empty vins to track", func() { + config, err := loadTestApplicationConfig(TestSmallConfig) + Expect(err).NotTo(HaveOccurred()) + Expect(config.VinsToTrack()).To(BeEmpty()) + }) + + It("valid vins to track", func() { + config, err := loadTestApplicationConfig(TestVinsToTrackConfig) + Expect(err).NotTo(HaveOccurred()) + Expect(config.VinsToTrack()).To(HaveLen(2)) + }) + + It("returns an error when `vins_signal_tracking_enabled` exceeds limit", func() { + maxVinsToTrack = 2 + _, err := loadTestApplicationConfig(BadVinsConfig) + Expect(err).To(MatchError("set the value of `vins_signal_tracking_enabled` less than 2 unique vins")) + }) + }) + Context("configure pubsub", func() { var ( pubsubConfig *Config diff --git a/config/test_configs_test.go b/config/test_configs_test.go index 7ff0c6d..5753369 100644 --- a/config/test_configs_test.go +++ b/config/test_configs_test.go @@ -62,6 +62,30 @@ const TestSmallConfig = ` } ` +const BadVinsConfig = ` +{ + "host": "127.0.0.1", + "port": 443, + "status_port": 8080, + "namespace": "tesla_telemetry", + "kafka": { + "bootstrap.servers": "some.broker1:9093,some.broker1:9093", + "ssl.ca.location": "kafka.ca", + "ssl.certificate.location": "kafka.crt", + "ssl.key.location": "kafka.key", + "queue.buffering.max.messages": 1000000 + }, + "records": { + "V": ["kafka"] + }, + "vins_signal_tracking_enabled": ["vin1", "vin2", "vin3"], + "tls": { + "ca_file": "tesla.ca", + "server_cert": "your_own_cert.crt", + "server_key": "your_own_key.key" + } +} +` const TestBadReliableAckConfig = ` { "host": "127.0.0.1", @@ -190,6 +214,19 @@ const TestTransmitDecodedRecords = ` } ` +const TestVinsToTrackConfig = ` +{ + "host": "127.0.0.1", + "port": 443, + "status_port": 8080, + "transmit_decoded_records": true, + "records": { + "V": ["logger"] + }, + "vins_signal_tracking_enabled": ["v1", "v2"] +} +` + const TestAirbrakeConfig = ` { "host": "127.0.0.1", diff --git a/server/streaming/socket.go b/server/streaming/socket.go index 9ee5bd9..7614eef 100644 --- a/server/streaming/socket.go +++ b/server/streaming/socket.go @@ -47,6 +47,7 @@ type SocketManager struct { stopChan chan struct{} writeChan chan SocketMessage transmitDecodedRecords bool + vinsSignalTracking map[string]struct{} } // SocketMessage represents incoming socket connection @@ -66,6 +67,8 @@ type Metrics struct { socketErrorCount adapter.Counter recordSizeBytesTotal adapter.Counter recordCount adapter.Counter + signalsCount adapter.Gauge + vinSignalCount adapter.Gauge } var ( @@ -94,6 +97,7 @@ func NewSocketManager(ctx context.Context, requestIdentity *telemetry.RequestIde stopChan: make(chan struct{}), requestIdentity: requestIdentity, transmitDecodedRecords: config.TransmitDecodedRecords, + vinsSignalTracking: config.VinsToTrack(), } } @@ -205,6 +209,7 @@ func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer // client exceeded the rate limit messagesRateLimited++ record, _ := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords) + sm.trackSignalUsage(record) metricsRegistry.rateLimitExceededCount.Inc(map[string]string{"device_id": sm.requestIdentity.DeviceID, "txtype": record.TxType}) if sm.config.RateLimit != nil && sm.config.RateLimit.Enabled { continue @@ -223,6 +228,15 @@ func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer } } +func (sm *SocketManager) trackSignalUsage(record *telemetry.Record) { + metricsRegistry.signalsCount.Add(int64(record.SignalsCount()), map[string]string{"record_type": record.TxType}) + vin := record.Vin + if _, ok := sm.vinsSignalTracking[vin]; !ok { + return + } + metricsRegistry.vinSignalCount.Add(int64(record.SignalsCount()), map[string]string{"vin": vin, "record_type": record.TxType}) +} + // ParseAndProcessRecord reads incoming client message and dispatches to relevant producer func (sm *SocketManager) ParseAndProcessRecord(serializer *telemetry.BinarySerializer, message []byte) { record, err := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords) @@ -391,4 +405,16 @@ func registerMetrics(metricsCollector metrics.MetricCollector) { Labels: []string{"record_type"}, }) + metricsRegistry.signalsCount = metricsCollector.RegisterGauge(adapter.CollectorOptions{ + Name: "signal_count", + Help: "Total number of signals received per record type", + Labels: []string{"record_type"}, + }) + + metricsRegistry.vinSignalCount = metricsCollector.RegisterGauge(adapter.CollectorOptions{ + Name: "vin_signal_count", + Help: "Total number of signals received per vin per record type", + Labels: []string{"record_type", "vin"}, + }) + } diff --git a/telemetry/record.go b/telemetry/record.go index f4bd863..19be8de 100644 --- a/telemetry/record.go +++ b/telemetry/record.go @@ -135,6 +135,18 @@ func (record *Record) ensureEncoded() { } } +// SignalsCount received per record_type from the vehicle +func (record *Record) SignalsCount() int { + switch payload := record.protoMessage.(type) { + case *protos.Payload: + return len(payload.GetData()) + case *protos.VehicleAlerts: + return len(payload.GetAlerts()) + default: + return 0 + } +} + func (record *Record) applyProtoRecordTransforms() error { switch record.TxType { case "alerts": diff --git a/test/integration/config.json b/test/integration/config.json index c45aa35..335b42e 100644 --- a/test/integration/config.json +++ b/test/integration/config.json @@ -37,6 +37,9 @@ "connect_timeout_ms": 30000, "publish_timeout_ms": 1000 }, + "vins_signal_tracking_enabled": [ + "device-1" + ], "monitoring": { "prometheus_metrics_port": 9090, "profiler_port": 4269,