Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track signal usage for received messages #339

Merged
merged 1 commit into from
Feb 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@ Vehicles must be running firmware version 2023.20.6 or later. Some older model
## Personalized Backends/Dispatchers
Dispatchers handle vehicle data processing upon its arrival at Fleet Telemetry servers. They can be of any type, from distributed message queues to STDOUT logger. Here is a list of the currently supported [dispatchers](./telemetry/producer.go#L10-L19)::
* Kafka (preferred): Configure with the config.json file. See implementation here: [config/config.go](./config/config.go)
* Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity`, \*prefix\*`_alerts`, and \*prefix\*`_errors`. The default prefix is `tesla`
* Topics will need to be created for \*prefix\*`_V`,\*prefix\*`_connectivity` and \*prefix\*`_alerts`. The default prefix is `tesla`
* Kinesis: Configure with standard [AWS env variables and config files](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html). The default AWS credentials and config files are: `~/.aws/credentials` and `~/.aws/config`.
* By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_errors`, `tesla_alerts`, etc
* By default, stream names will be \*configured namespace\*_\*topic_name\* ex.: `tesla_V`, `tesla_alerts`, etc
* Configure stream names directly by setting the streams config `"kinesis": { "streams": { *topic_name*: stream_name } }`
* Override stream names with env variables: KINESIS_STREAM_\*uppercase topic\* ex.: `KINESIS_STREAM_V`
* Google pubsub: Along with the required pubsub config (See ./test/integration/config.json for example), be sure to set the environment variable `GOOGLE_APPLICATION_CREDENTIALS`
Expand All @@ -179,6 +179,9 @@ On the vehicle, Fleet Telemetry client behave similarly to how the connectivity
}
```

## Tracking incoming signals
If you have metrics enabled, you can use it to track count of incoming signals. This can help you identify approximate billing for your service. There are two ways to track signals. By default, it tracks signals per record_type (\*prefix\*`V` and \*prefix\*`alerts`). If you wish to track signals for a subset of VINs, you can add `vins_signal_tracking_enabled` in the config file which will track metrics for usage from those particular vins as well.

## Metrics
Configure and use Prometheus or a StatsD-interface supporting data store for metrics. The integration test runs Fleet Telemetry with [grafana](https://grafana.com/docs/grafana/latest/datasources/google-cloud-monitoring/), which is compatible with prometheus. It also has an example dashboard which tracks important metrics related to the hosted server. Sample screenshot for the [sample dashboard](./test/integration/grafana/provisioning/dashboards/dashboard.json):-

Expand Down
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type Config struct {
// when vehicle configuration has prefer_typed set to true, enum fields will have a prefix
TransmitDecodedRecords bool `json:"transmit_decoded_records,omitempty"`

VinsSignalTrackingEnabled []string `json:"vins_signal_tracking_enabled"`

// MetricCollector collects metrics for the application
MetricCollector metrics.MetricCollector

Expand Down Expand Up @@ -192,6 +194,18 @@ func (c *Config) AirbrakeTLSConfig() (*tls.Config, error) {
return tlsConfig, nil
}

// VinsToTrack to track incoming signals in promemetheus
func (c *Config) VinsToTrack() map[string]struct{} {
output := make(map[string]struct{}, 0)
if len(c.VinsSignalTrackingEnabled) == 0 {
return output
}
for _, vin := range c.VinsSignalTrackingEnabled {
output[vin] = struct{}{}
}
return output
}

// ExtractServiceTLSConfig return the TLS config needed for stating the mTLS Server
func (c *Config) ExtractServiceTLSConfig(logger *logrus.Logger) (*tls.Config, error) {
if c.TLS == nil {
Expand Down
17 changes: 16 additions & 1 deletion config/config_initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"encoding/json"
"flag"
"fmt"
"log"
"os"

Expand All @@ -14,6 +15,10 @@ import (
"github.com/teslamotors/fleet-telemetry/telemetry"
)

var (
maxVinsToTrack = 20
)

// LoadApplicationConfiguration loads the configuration from args and config files
func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err error) {

Expand All @@ -27,7 +32,6 @@ func LoadApplicationConfiguration() (config *Config, logger *logrus.Logger, err

config, err = loadApplicationConfig(configFilePath)
if err != nil {
logger.ErrorLog("read_application_configuration_error", err, nil)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we log this outside the method so don't need it here

return nil, nil, err
}

Expand Down Expand Up @@ -55,11 +59,22 @@ func loadApplicationConfig(configFilePath string) (*Config, error) {
if err != nil {
return nil, err
}

if err := validateConfig(config); err != nil {
return nil, err
}
config.MetricCollector = metrics.NewCollector(config.Monitoring, logger)
config.AckChan = make(chan *telemetry.Record)
return config, err
}

func validateConfig(config *Config) error {
if len(config.VinsToTrack()) > maxVinsToTrack {
return fmt.Errorf("set the value of `vins_signal_tracking_enabled` less than %d unique vins", maxVinsToTrack)
}
return nil
}

func loadConfigFlags() string {
applicationConfig := ""
flag.StringVar(&applicationConfig, "config", "config.json", "application configuration file")
Expand Down
25 changes: 25 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@ var _ = Describe("Test full application config", func() {
})
})

Context("VinsToTrack", func() {

AfterEach(func() {
maxVinsToTrack = 20
})

It("empty vins to track", func() {
config, err := loadTestApplicationConfig(TestSmallConfig)
Expect(err).NotTo(HaveOccurred())
Expect(config.VinsToTrack()).To(BeEmpty())
})

It("valid vins to track", func() {
config, err := loadTestApplicationConfig(TestVinsToTrackConfig)
Expect(err).NotTo(HaveOccurred())
Expect(config.VinsToTrack()).To(HaveLen(2))
})

It("returns an error when `vins_signal_tracking_enabled` exceeds limit", func() {
maxVinsToTrack = 2
_, err := loadTestApplicationConfig(BadVinsConfig)
Expect(err).To(MatchError("set the value of `vins_signal_tracking_enabled` less than 2 unique vins"))
})
})

Context("configure pubsub", func() {
var (
pubsubConfig *Config
Expand Down
37 changes: 37 additions & 0 deletions config/test_configs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ const TestSmallConfig = `
}
`

const BadVinsConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"namespace": "tesla_telemetry",
"kafka": {
"bootstrap.servers": "some.broker1:9093,some.broker1:9093",
"ssl.ca.location": "kafka.ca",
"ssl.certificate.location": "kafka.crt",
"ssl.key.location": "kafka.key",
"queue.buffering.max.messages": 1000000
},
"records": {
"V": ["kafka"]
},
"vins_signal_tracking_enabled": ["vin1", "vin2", "vin3"],
"tls": {
"ca_file": "tesla.ca",
"server_cert": "your_own_cert.crt",
"server_key": "your_own_key.key"
}
}
`
const TestBadReliableAckConfig = `
{
"host": "127.0.0.1",
Expand Down Expand Up @@ -190,6 +214,19 @@ const TestTransmitDecodedRecords = `
}
`

const TestVinsToTrackConfig = `
{
"host": "127.0.0.1",
"port": 443,
"status_port": 8080,
"transmit_decoded_records": true,
"records": {
"V": ["logger"]
},
"vins_signal_tracking_enabled": ["v1", "v2"]
}
`

const TestAirbrakeConfig = `
{
"host": "127.0.0.1",
Expand Down
26 changes: 26 additions & 0 deletions server/streaming/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type SocketManager struct {
stopChan chan struct{}
writeChan chan SocketMessage
transmitDecodedRecords bool
vinsSignalTracking map[string]struct{}
}

// SocketMessage represents incoming socket connection
Expand All @@ -66,6 +67,8 @@ type Metrics struct {
socketErrorCount adapter.Counter
recordSizeBytesTotal adapter.Counter
recordCount adapter.Counter
signalsCount adapter.Gauge
vinSignalCount adapter.Gauge
}

var (
Expand Down Expand Up @@ -94,6 +97,7 @@ func NewSocketManager(ctx context.Context, requestIdentity *telemetry.RequestIde
stopChan: make(chan struct{}),
requestIdentity: requestIdentity,
transmitDecodedRecords: config.TransmitDecodedRecords,
vinsSignalTracking: config.VinsToTrack(),
}
}

Expand Down Expand Up @@ -205,6 +209,7 @@ func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer
// client exceeded the rate limit
messagesRateLimited++
record, _ := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords)
sm.trackSignalUsage(record)
metricsRegistry.rateLimitExceededCount.Inc(map[string]string{"device_id": sm.requestIdentity.DeviceID, "txtype": record.TxType})
if sm.config.RateLimit != nil && sm.config.RateLimit.Enabled {
continue
Expand All @@ -223,6 +228,15 @@ func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer
}
}

func (sm *SocketManager) trackSignalUsage(record *telemetry.Record) {
metricsRegistry.signalsCount.Add(int64(record.SignalsCount()), map[string]string{"record_type": record.TxType})
vin := record.Vin
if _, ok := sm.vinsSignalTracking[vin]; !ok {
return
}
metricsRegistry.vinSignalCount.Add(int64(record.SignalsCount()), map[string]string{"vin": vin, "record_type": record.TxType})
}

// ParseAndProcessRecord reads incoming client message and dispatches to relevant producer
func (sm *SocketManager) ParseAndProcessRecord(serializer *telemetry.BinarySerializer, message []byte) {
record, err := telemetry.NewRecord(serializer, message, sm.UUID, sm.transmitDecodedRecords)
Expand Down Expand Up @@ -391,4 +405,16 @@ func registerMetrics(metricsCollector metrics.MetricCollector) {
Labels: []string{"record_type"},
})

metricsRegistry.signalsCount = metricsCollector.RegisterGauge(adapter.CollectorOptions{
Name: "signal_count",
Help: "Total number of signals received per record type",
Labels: []string{"record_type"},
})

metricsRegistry.vinSignalCount = metricsCollector.RegisterGauge(adapter.CollectorOptions{
Name: "vin_signal_count",
Help: "Total number of signals received per vin per record type",
Labels: []string{"record_type", "vin"},
})

}
12 changes: 12 additions & 0 deletions telemetry/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ func (record *Record) ensureEncoded() {
}
}

// SignalsCount received per record_type from the vehicle
func (record *Record) SignalsCount() int {
switch payload := record.protoMessage.(type) {
case *protos.Payload:
return len(payload.GetData())
case *protos.VehicleAlerts:
return len(payload.GetAlerts())
default:
return 0
}
}

func (record *Record) applyProtoRecordTransforms() error {
switch record.TxType {
case "alerts":
Expand Down
3 changes: 3 additions & 0 deletions test/integration/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
"connect_timeout_ms": 30000,
"publish_timeout_ms": 1000
},
"vins_signal_tracking_enabled": [
"device-1"
],
"monitoring": {
"prometheus_metrics_port": 9090,
"profiler_port": 4269,
Expand Down