Skip to content

Commit

Permalink
track signal usage for received messages
Browse files Browse the repository at this point in the history
  • Loading branch information
agbpatro committed Jan 31, 2025
1 parent dce2d86 commit 22308b4
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 3 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ Vehicles must be running firmware version 2023.20.6 or later. Some older model
## Personalized Backends/Dispatchers
Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry servers. They can be of any type, from distributed message queues to STDOUT logger. Here is a list of the currently supported [dispatchers](./telemetry/producer.go#L10-L19)::
* Kafka (preferred): Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla`
* Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity` and \*prefix\*`_alerts`. The default prefix is `tesla`
* Kinesis: Configure with standard [AWS env variables and config files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html). The default AWS credentials and config files are: `~/.aws/credentials` and `~/.aws/config`.
* By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_errors`, `tesla_alerts`, etc
* By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_alerts`, etc
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
Expand All @@ -179,6 +179,9 @@ On the vehicle, Fleet Telemetry client behave similarly to how the connectivity
}
```

## Tracking incoming signals
If you have metrics enabled, you can use it to track count of incoming signals. This can help you identify approximate billing for your service. There are two ways to track signals. By default, it tracks signals per record_type (\*prefix\*`V`,\*prefix\*`connectivity`, \*prefix\*`alerts`, and \*prefix\*`errors`). If you wish to track signals for a subset of VINs, you can add `vins_signal_tracking_enabled` in the config file which will track metrics for usage from those particular vins as well.

## Metrics
Configure and use Prometheus or a StatsD-interface supporting data store for metrics. The integration test runs Fleet Telemetry with [grafana](https://grafana.com/docs/grafana/latest/datasources/google-cloud-monitoring/), which is compatible with prometheus. It also has an example dashboard which tracks important metrics related to the hosted server. Sample screenshot for the [sample dashboard](./test/integration/grafana/provisioning/dashboards/dashboard.json):-

Expand Down
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Config struct {
// when vehicle configuration has prefer_typed set to true, enum fields will have a prefix
TransmitDecodedRecords bool `json:"transmit_decoded_records,omitempty"`

VinsSignalTrackingEnabled []string `json:"vins_signal_tracking_enabled"`

// MetricCollector collects metrics for the application
MetricCollector metrics.MetricCollector

Expand Down Expand Up @@ -192,6 +194,18 @@ func (c *Config) AirbrakeTLSConfig() (*tls.Config, error) {
return tlsConfig, nil
}

// VinsToTrack to track incoming signals in promemetheus
func (c *Config) VinsToTrack() map[string]struct{} {
output := make(map[string]struct{}, 0)
if len(c.VinsSignalTrackingEnabled) == 0 {
return output
}
for _, vin := range c.VinsSignalTrackingEnabled {
output[vin] = struct{}{}
}
return output
}

// ExtractServiceTLSConfig return the TLS config needed for stating the mTLS Server
func (c *Config) ExtractServiceTLSConfig(logger *logrus.Logger) (*tls.Config, error) {
if c.TLS == nil {
Expand Down
17 changes: 16 additions & 1 deletion config/config_initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"encoding/json"
"flag"
"fmt"
"log"
"os"

Expand All @@ -14,6 +15,10 @@ import (
"github.com/teslamotors/fleet-telemetry/telemetry"
)

var (
maxVinsToTrack = 20
)

// LoadApplicationConfiguration loads the configuration from args and config files
func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err error) {

Expand All @@ -27,7 +32,6 @@ func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err

config, err = loadApplicationConfig(configFilePath)
if err != nil {
logger.ErrorLog("read_application_configuration_error", err, nil)
return nil, nil, err
}

Expand Down Expand Up @@ -55,11 +59,22 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
if err != nil {
return nil, err
}

if err := validateConfig(config); err != nil {
return nil, err
}
config.MetricCollector = metrics.NewCollector(config.Monitoring, logger)
config.AckChan = make(chan *telemetry.Record)
return config, err
}

func validateConfig(config *Config) error {
if len(config.VinsToTrack()) > maxVinsToTrack {
return fmt.Errorf("set the value of `vins_signal_tracking_enabled` less than %d unique vins", maxVinsToTrack)
}
return nil
}

func loadConfigFlags() string {
applicationConfig := ""
flag.StringVar(&applicationConfig, "config", "config.json", "application configuration file")
Expand Down
25 changes: 25 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@ var _ = Describe("Test full application config", func() {
})
})

Context("VinsToTrack", func() {

AfterEach(func() {
maxVinsToTrack = 20
})

It("empty vins to track", func() {
config, err := loadTestApplicationConfig(TestSmallConfig)
Expect(err).NotTo(HaveOccurred())
Expect(config.VinsToTrack()).To(BeEmpty())
})

It("valid vins to track", func() {
config, err := loadTestApplicationConfig(TestVinsToTrackConfig)
Expect(err).NotTo(HaveOccurred())
Expect(config.VinsToTrack()).To(HaveLen(2))
})

It("returns an error when `vins_signal_tracking_enabled` exceeds limit", func() {
maxVinsToTrack = 2
_, err := loadTestApplicationConfig(BadVinsConfig)
Expect(err).To(MatchError("set the value of `vins_signal_tracking_enabled` less than 2 unique vins"))
})
})

Context("configure pubsub", func() {
var (
pubsubConfig *Config
Expand Down
37 changes: 37 additions & 0 deletions config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ const TestSmallConfig = `
}
`

const BadVinsConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"namespace": "tesla_telemetry",
"kafka": {
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
"ssl.certificate.location": "kafka.crt",
"ssl.key.location": "kafka.key",
"queue.buffering.max.messages": 1000000
},
"records": {
"V": ["kafka"]
},
"vins_signal_tracking_enabled": ["vin1", "vin2", "vin3"],
"tls": {
"ca_file": "tesla.ca",
"server_cert": "your_own_cert.crt",
"server_key": "your_own_key.key"
}
}
`
const TestBadReliableAckConfig = `
{
"host": "127.0.0.1",
Expand Down Expand Up @@ -190,6 +214,19 @@ const TestTransmitDecodedRecords = `
}
`

const TestVinsToTrackConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"transmit_decoded_records": true,
"records": {
"V": ["logger"]
},
"vins_signal_tracking_enabled": ["v1", "v2"]
}
`

const TestAirbrakeConfig = `
{
"host": "127.0.0.1",
Expand Down
26 changes: 26 additions & 0 deletions server/streaming/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type SocketManager struct {
stopChan chan struct{}
writeChan chan SocketMessage
transmitDecodedRecords bool
vinsSignalTracking map[string]struct{}
}

// SocketMessage represents incoming socket connection
Expand All @@ -66,6 +67,8 @@ type Metrics struct {
socketErrorCount adapter.Counter
recordSizeBytesTotal adapter.Counter
recordCount adapter.Counter
signalsCount adapter.Gauge
vinSignalCount adapter.Gauge
}

var (
Expand Down Expand Up @@ -94,6 +97,7 @@ func NewSocketManager(ctx context.Context, requestIdentity *telemetry.RequestIde
stopChan: make(chan struct{}),
requestIdentity: requestIdentity,
transmitDecodedRecords: config.TransmitDecodedRecords,
vinsSignalTracking: config.VinsToTrack(),
}
}

Expand Down Expand Up @@ -205,6 +209,7 @@ func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer
// client exceeded the rate limit
messagesRateLimited++
record, _ := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords)
sm.trackSignalUsage(record)
metricsRegistry.rateLimitExceededCount.Inc(map[string]string{"device_id": sm.requestIdentity.DeviceID, "txtype": record.TxType})
if sm.config.RateLimit != nil && sm.config.RateLimit.Enabled {
continue
Expand All @@ -223,6 +228,15 @@ func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer
}
}

func (sm *SocketManager) trackSignalUsage(record *telemetry.Record) {
metricsRegistry.signalsCount.Add(int64(record.SignalsCount()), map[string]string{"record_type": record.TxType})
vin := record.Vin
if _, ok := sm.vinsSignalTracking[vin]; !ok {
return
}
metricsRegistry.vinSignalCount.Add(int64(record.SignalsCount()), map[string]string{"vin": vin, "record_type": record.TxType})
}

// ParseAndProcessRecord reads incoming client message and dispatches to relevant producer
func (sm *SocketManager) ParseAndProcessRecord(serializer *telemetry.BinarySerializer, message []byte) {
record, err := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords)
Expand Down Expand Up @@ -391,4 +405,16 @@ func registerMetrics(metricsCollector metrics.MetricCollector) {
Labels: []string{"record_type"},
})

metricsRegistry.signalsCount = metricsCollector.RegisterGauge(adapter.CollectorOptions{
Name: "signal_count",
Help: "Total number of signals received per record type",
Labels: []string{"record_type"},
})

metricsRegistry.vinSignalCount = metricsCollector.RegisterGauge(adapter.CollectorOptions{
Name: "vin_signal_count",
Help: "Total number of signals received per vin per record type",
Labels: []string{"record_type", "vin"},
})

}
12 changes: 12 additions & 0 deletions telemetry/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ func (record *Record) ensureEncoded() {
}
}

// SignalsCount received per record_type from the vehicle
func (record *Record) SignalsCount() int {
switch payload := record.protoMessage.(type) {
case *protos.Payload:
return len(payload.GetData())
case *protos.VehicleAlerts:
return len(payload.GetAlerts())
default:
return 0
}
}

func (record *Record) applyProtoRecordTransforms() error {
switch record.TxType {
case "alerts":
Expand Down
3 changes: 3 additions & 0 deletions test/integration/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
"connect_timeout_ms": 30000,
"publish_timeout_ms": 1000
},
"vins_signal_tracking_enabled": [
"device-1"
],
"monitoring": {
"prometheus_metrics_port": 9090,
"profiler_port": 4269,
Expand Down

0 comments on commit 22308b4

Please sign in to comment.