Skip to content

Commit

Permalink
Add E2E latency metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Nov 11, 2024
1 parent 9302e89 commit 467fa07
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 6 deletions.
10 changes: 9 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type configurationData struct {
DisableTelemetry bool `hcl:"disable_telemetry,optional"`
License *licenseConfig `hcl:"license,block"`
Retry *retryConfig `hcl:"retry,block"`
Metrics *metricsConfig `hcl:"metrics,block`
}

// component is a type to abstract over configuration blocks.
Expand Down Expand Up @@ -100,6 +101,10 @@ type retryConfig struct {
Setup *setupRetryConfig `hcl:"setup,block"`
}

type metricsConfig struct {
E2ELatencyEnabled bool `hcl:"enable_e2e_latency,optional"`
}

type transientRetryConfig struct {
Delay int `hcl:"delay_ms,optional"`
MaxAttempts int `hcl:"max_attempts,optional"`
Expand Down Expand Up @@ -142,6 +147,9 @@ func defaultConfigData() *configurationData {
Delay: 20000,
},
},
Metrics: &metricsConfig{
E2ELatencyEnabled: false,
},
}
}

Expand Down Expand Up @@ -359,7 +367,7 @@ func (c *Config) getStatsReceiver(tags map[string]string) (statsreceiveriface.St
switch useReceiver.Name {
case "statsd":
plug := statsreceiver.AdaptStatsDStatsReceiverFunc(
statsreceiver.NewStatsDReceiverWithTags(tags),
statsreceiver.NewStatsDReceiverWithTags(tags, c.Data.Metrics.E2ELatencyEnabled),
)
component, err := c.CreateComponent(plug, decoderOpts)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/models/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Message struct {
// Time the request was done, to measure request latency for debugging purposes - we manually track this timestamp unlike other metrics, to get as accurate as possible a picture of just the request latency.
TimeRequestFinished time.Time

CollectorTstamp time.Time

// AckFunc must be called on a successful message emission to ensure
// any cleanup process for the source is actioned
AckFunc func()
Expand Down
21 changes: 20 additions & 1 deletion pkg/models/observer_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type ObserverBuffer struct {
MaxRequestLatency time.Duration
MinRequestLatency time.Duration
SumRequestLatency time.Duration
MaxE2ELatency time.Duration
MinE2ELatency time.Duration
SumE2ELatency time.Duration
}

// AppendWrite adds a normal TargetWriteResult onto the buffer and stores the result
Expand Down Expand Up @@ -128,6 +131,14 @@ func (b *ObserverBuffer) appendWriteResult(res *TargetWriteResult) {
b.MinRequestLatency = res.MinRequestLatency
}
b.SumRequestLatency += res.AvgRequestLatency

if b.MaxE2ELatency < res.MaxE2ELatency {
b.MaxE2ELatency = res.MaxE2ELatency
}
if b.MinE2ELatency > res.MinE2ELatency || b.MinE2ELatency == time.Duration(0) {
b.MinE2ELatency = res.MinE2ELatency
}
b.SumE2ELatency += res.AvgE2ELatency
}

// AppendFiltered adds a FilterResult onto the buffer and stores the result
Expand Down Expand Up @@ -180,9 +191,14 @@ func (b *ObserverBuffer) GetAvgRequestLatency() time.Duration {
return common.GetAverageFromDuration(b.SumRequestLatency, b.MsgTotal)
}

// GetAvgRequestLatency calculates average request latency
func (b *ObserverBuffer) GetAvgE2ELatency() time.Duration {
return common.GetAverageFromDuration(b.SumE2ELatency, b.MsgTotal)
}

func (b *ObserverBuffer) String() string {
return fmt.Sprintf(
"TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d,SumTransformLatency:%d,SumProcLatency:%d,SumMsgLatency:%d,MinReqLatency:%d,MaxReqLatency:%d,SumReqLatency:%d",
"TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d,SumTransformLatency:%d,SumProcLatency:%d,SumMsgLatency:%d,MinReqLatency:%d,MaxReqLatency:%d,SumReqLatency:%d,MinE2ELatency:%d,MaxE2ELatency:%d,SumE2ELatency:%d",
b.TargetResults,
b.MsgFiltered,
b.MsgSent,
Expand All @@ -203,5 +219,8 @@ func (b *ObserverBuffer) String() string {
b.MinRequestLatency.Milliseconds(),
b.MaxRequestLatency.Milliseconds(),
b.SumRequestLatency.Milliseconds(),
b.MinE2ELatency.Milliseconds(),
b.MaxE2ELatency.Milliseconds(),
b.SumE2ELatency.Milliseconds(),
)
}
26 changes: 26 additions & 0 deletions pkg/models/target_write_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type TargetWriteResult struct {
MaxRequestLatency time.Duration
MinRequestLatency time.Duration
AvgRequestLatency time.Duration

MaxE2ELatency time.Duration
MinE2ELatency time.Duration
AvgE2ELatency time.Duration
}

// NewTargetWriteResult builds a result structure to return from a target write
Expand All @@ -84,6 +88,7 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
var sumMessageLatency time.Duration
var sumTransformLatency time.Duration
var sumRequestLatency time.Duration
var sumE2eLatency time.Duration

for _, msg := range processed {
procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled)
Expand Down Expand Up @@ -124,13 +129,26 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
r.MinRequestLatency = requestLatency
}
sumRequestLatency += requestLatency

var e2eLatency time.Duration
if !msg.CollectorTstamp.IsZero() {
e2eLatency = msg.TimeRequestFinished.Sub(msg.CollectorTstamp)
}
if r.MaxE2ELatency < e2eLatency {
r.MaxE2ELatency = e2eLatency
}
if r.MinE2ELatency > e2eLatency || r.MinE2ELatency == time.Duration(0) {
r.MinE2ELatency = e2eLatency
}
sumE2eLatency += e2eLatency
}

if processedLen > 0 {
r.AvgProcLatency = common.GetAverageFromDuration(sumProcLatency, processedLen)
r.AvgMsgLatency = common.GetAverageFromDuration(sumMessageLatency, processedLen)
r.AvgTransformLatency = common.GetAverageFromDuration(sumTransformLatency, processedLen)
r.AvgRequestLatency = common.GetAverageFromDuration(sumRequestLatency, processedLen)
r.AvgE2ELatency = common.GetAverageFromDuration(sumE2eLatency, processedLen)
}

return &r
Expand Down Expand Up @@ -186,6 +204,14 @@ func (wr *TargetWriteResult) Append(nwr *TargetWriteResult) *TargetWriteResult {
wrC.MinRequestLatency = nwr.MinRequestLatency
}
wrC.AvgRequestLatency = common.GetAverageFromDuration(wrC.AvgRequestLatency+nwr.AvgRequestLatency, 2)

if wrC.MaxE2ELatency < nwr.MaxE2ELatency {
wrC.MaxE2ELatency = nwr.MaxE2ELatency
}
if wrC.MinE2ELatency > nwr.MinE2ELatency || wrC.MinE2ELatency == time.Duration(0) {
wrC.MinE2ELatency = nwr.MinE2ELatency
}
wrC.AvgE2ELatency = common.GetAverageFromDuration(wrC.AvgE2ELatency+nwr.AvgE2ELatency, 2)
}

return &wrC
Expand Down
24 changes: 20 additions & 4 deletions pkg/statsreceiver/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ type StatsDStatsReceiverConfig struct {

// statsDStatsReceiver holds a new client for writing statistics to a StatsD server
type statsDStatsReceiver struct {
client *statsd.Client
client *statsd.Client
enableE2ELatency bool
}

// newStatsDStatsReceiver creates a new client for writing metrics to StatsD
func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsMapClient map[string]string) (*statsDStatsReceiver, error) {
func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsMapClient map[string]string, enableE2ELatency bool) (*statsDStatsReceiver, error) {

tagsMap := map[string]string{}
err := json.Unmarshal([]byte(tagsRaw), &tagsMap)
if err != nil {
Expand All @@ -59,19 +61,21 @@ func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsM
)

return &statsDStatsReceiver{
client: client,
client: client,
enableE2ELatency: enableE2ELatency,
}, nil
}

// NewStatsDReceiverWithTags closes over a given tags map and returns a function
// that creates a statsDStatsReceiver given a StatsDStatsReceiverConfig.
func NewStatsDReceiverWithTags(tags map[string]string) func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) {
func NewStatsDReceiverWithTags(tags map[string]string, enableE2ELatency bool) func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) {
return func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) {
return newStatsDStatsReceiver(
c.Address,
c.Prefix,
c.Tags,
tags,
enableE2ELatency,
)
}
}
Expand Down Expand Up @@ -141,4 +145,16 @@ func (s *statsDStatsReceiver) Send(b *models.ObserverBuffer) {
s.client.PrecisionTiming("min_request_latency", b.MinRequestLatency)
s.client.PrecisionTiming("max_request_latency", b.MaxRequestLatency)
s.client.PrecisionTiming("avg_request_latency", b.GetAvgRequestLatency())

if b.MinE2ELatency != time.Duration(0) {
s.client.PrecisionTiming("min_e2e_latency", b.MinE2ELatency)
}

if b.MaxE2ELatency != time.Duration(0) {
s.client.PrecisionTiming("max_e2e_latency", b.MaxE2ELatency)
}

if avgE2ELatency := b.GetAvgE2ELatency(); avgE2ELatency != time.Duration(0) {
s.client.PrecisionTiming("avg_e2e_latency", avgE2ELatency)
}
}
31 changes: 31 additions & 0 deletions pkg/transform/snowplow_collector_tstamp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package transform

import (
"time"

log "github.com/sirupsen/logrus"
"github.com/snowplow/snowbridge/pkg/models"
)

// CollectorTstampTransformation returns a transformation function
func CollectorTstampTransformation() TransformationFunction {
return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
parsedEvent, err := IntermediateAsSpEnrichedParsed(interState, message)
if err != nil {
log.Warnf("Error while extracting 'collector_tstamp'. Could not parse input message as Snowplow event, error: %s", err)
return message, nil, nil, nil
}

tstamp, err := parsedEvent.GetValue("collector_tstamp")
if err != nil {
log.Warnf("Error while extracting 'collector_tstamp', error: %s", err)
return message, nil, nil, parsedEvent
}

if collectorTstamp, ok := tstamp.(time.Time); ok {
message.CollectorTstamp = collectorTstamp
}

return message, nil, nil, parsedEvent
}
}
4 changes: 4 additions & 0 deletions pkg/transform/transformconfig/transform_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,9 @@ func GetTransformations(c *config.Config, supportedTransformations []config.Conf
funcs = append(funcs, f)
}

if c.Data.Metrics.E2ELatencyEnabled {
funcs = append(funcs, transform.CollectorTstampTransformation())
}

return transform.NewTransformation(funcs...), nil
}

0 comments on commit 467fa07

Please sign in to comment.