From 4c857a8e2ef8429dd0d45c0d191f171da83f30f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Poniedzia=C5=82ek?= Date: Mon, 11 Nov 2024 17:51:50 +0100 Subject: [PATCH] Add E2E latency metrics --- config/config.go | 10 +++++- pkg/models/message.go | 2 ++ pkg/models/observer_buffer.go | 21 ++++++++++++- pkg/models/target_write_result.go | 26 ++++++++++++++++ pkg/statsreceiver/statsd.go | 18 ++++++++--- pkg/transform/snowplow_collector_tstamp.go | 31 +++++++++++++++++++ .../transformconfig/transform_config.go | 4 +++ 7 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 pkg/transform/snowplow_collector_tstamp.go diff --git a/config/config.go b/config/config.go index d4cd33c3..d1634e16 100644 --- a/config/config.go +++ b/config/config.go @@ -56,6 +56,7 @@ type configurationData struct { DisableTelemetry bool `hcl:"disable_telemetry,optional"` License *licenseConfig `hcl:"license,block"` Retry *retryConfig `hcl:"retry,block"` + Metrics *metricsConfig `hcl:"metrics,block` } // component is a type to abstract over configuration blocks. @@ -100,6 +101,10 @@ type retryConfig struct { Setup *setupRetryConfig `hcl:"setup,block"` } +type metricsConfig struct { + E2ELatencyEnabled bool `hcl:"enable_e2e_latency,optional"` +} + type transientRetryConfig struct { Delay int `hcl:"delay_ms,optional"` MaxAttempts int `hcl:"max_attempts,optional"` @@ -142,6 +147,9 @@ func defaultConfigData() *configurationData { Delay: 20000, }, }, + Metrics: &metricsConfig{ + E2ELatencyEnabled: false, + }, } } @@ -359,7 +367,7 @@ func (c *Config) getStatsReceiver(tags map[string]string) (statsreceiveriface.St switch useReceiver.Name { case "statsd": plug := statsreceiver.AdaptStatsDStatsReceiverFunc( - statsreceiver.NewStatsDReceiverWithTags(tags), + statsreceiver.NewStatsDReceiverWithTags(tags, c.Data.Metrics.E2ELatencyEnabled), ) component, err := c.CreateComponent(plug, decoderOpts) if err != nil { diff --git a/pkg/models/message.go b/pkg/models/message.go index bb33d85d..1c55eec0 100644 --- a/pkg/models/message.go +++ b/pkg/models/message.go @@ -37,6 +37,8 @@ type Message struct { // Time the request was done, to measure request latency for debugging purposes - we manually track this timestamp unlike other metrics, to get as accurate as possible a picture of just the request latency. TimeRequestFinished time.Time + CollectorTstamp time.Time + // AckFunc must be called on a successful message emission to ensure // any cleanup process for the source is actioned AckFunc func() diff --git a/pkg/models/observer_buffer.go b/pkg/models/observer_buffer.go index eaf1d4e5..e9f5fd5d 100644 --- a/pkg/models/observer_buffer.go +++ b/pkg/models/observer_buffer.go @@ -52,6 +52,9 @@ type ObserverBuffer struct { MaxRequestLatency time.Duration MinRequestLatency time.Duration SumRequestLatency time.Duration + MaxE2ELatency time.Duration + MinE2ELatency time.Duration + SumE2ELatency time.Duration } // AppendWrite adds a normal TargetWriteResult onto the buffer and stores the result @@ -128,6 +131,14 @@ func (b *ObserverBuffer) appendWriteResult(res *TargetWriteResult) { b.MinRequestLatency = res.MinRequestLatency } b.SumRequestLatency += res.AvgRequestLatency + + if b.MaxE2ELatency < res.MaxE2ELatency { + b.MaxE2ELatency = res.MaxE2ELatency + } + if b.MinE2ELatency > res.MinE2ELatency || b.MinE2ELatency == time.Duration(0) { + b.MinE2ELatency = res.MinE2ELatency + } + b.SumE2ELatency += res.AvgE2ELatency } // AppendFiltered adds a FilterResult onto the buffer and stores the result @@ -180,9 +191,14 @@ func (b *ObserverBuffer) GetAvgRequestLatency() time.Duration { return common.GetAverageFromDuration(b.SumRequestLatency, b.MsgTotal) } +// GetAvgRequestLatency calculates average request latency +func (b *ObserverBuffer) GetAvgE2ELatency() time.Duration { + return common.GetAverageFromDuration(b.SumE2ELatency, b.MsgTotal) +} + func (b *ObserverBuffer) String() string { return fmt.Sprintf( - "TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d,SumTransformLatency:%d,SumProcLatency:%d,SumMsgLatency:%d,MinReqLatency:%d,MaxReqLatency:%d,SumReqLatency:%d", + "TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d,SumTransformLatency:%d,SumProcLatency:%d,SumMsgLatency:%d,MinReqLatency:%d,MaxReqLatency:%d,SumReqLatency:%d,MinE2ELatency:%d,MaxE2ELatency:%d,SumE2ELatency:%d", b.TargetResults, b.MsgFiltered, b.MsgSent, @@ -203,5 +219,8 @@ func (b *ObserverBuffer) String() string { b.MinRequestLatency.Milliseconds(), b.MaxRequestLatency.Milliseconds(), b.SumRequestLatency.Milliseconds(), + b.MinE2ELatency.Milliseconds(), + b.MaxE2ELatency.Milliseconds(), + b.SumE2ELatency.Milliseconds(), ) } diff --git a/pkg/models/target_write_result.go b/pkg/models/target_write_result.go index bf296291..1f2db699 100644 --- a/pkg/models/target_write_result.go +++ b/pkg/models/target_write_result.go @@ -61,6 +61,10 @@ type TargetWriteResult struct { MaxRequestLatency time.Duration MinRequestLatency time.Duration AvgRequestLatency time.Duration + + MaxE2ELatency time.Duration + MinE2ELatency time.Duration + AvgE2ELatency time.Duration } // NewTargetWriteResult builds a result structure to return from a target write @@ -84,6 +88,7 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa var sumMessageLatency time.Duration var sumTransformLatency time.Duration var sumRequestLatency time.Duration + var sumE2eLatency time.Duration for _, msg := range processed { procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled) @@ -124,6 +129,18 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa r.MinRequestLatency = requestLatency } sumRequestLatency += requestLatency + + var e2eLatency time.Duration + if !msg.CollectorTstamp.IsZero() { + e2eLatency = msg.TimeRequestFinished.Sub(msg.CollectorTstamp) + } + if r.MaxE2ELatency < e2eLatency { + r.MaxE2ELatency = e2eLatency + } + if r.MinE2ELatency > e2eLatency || r.MinE2ELatency == time.Duration(0) { + r.MinE2ELatency = e2eLatency + } + sumE2eLatency += e2eLatency } if processedLen > 0 { @@ -131,6 +148,7 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa r.AvgMsgLatency = common.GetAverageFromDuration(sumMessageLatency, processedLen) r.AvgTransformLatency = common.GetAverageFromDuration(sumTransformLatency, processedLen) r.AvgRequestLatency = common.GetAverageFromDuration(sumRequestLatency, processedLen) + r.AvgE2ELatency = common.GetAverageFromDuration(sumE2eLatency, processedLen) } return &r @@ -186,6 +204,14 @@ func (wr *TargetWriteResult) Append(nwr *TargetWriteResult) *TargetWriteResult { wrC.MinRequestLatency = nwr.MinRequestLatency } wrC.AvgRequestLatency = common.GetAverageFromDuration(wrC.AvgRequestLatency+nwr.AvgRequestLatency, 2) + + if wrC.MaxE2ELatency < nwr.MaxE2ELatency { + wrC.MaxE2ELatency = nwr.MaxE2ELatency + } + if wrC.MinE2ELatency > nwr.MinE2ELatency || wrC.MinE2ELatency == time.Duration(0) { + wrC.MinE2ELatency = nwr.MinE2ELatency + } + wrC.AvgE2ELatency = common.GetAverageFromDuration(wrC.AvgE2ELatency+nwr.AvgE2ELatency, 2) } return &wrC diff --git a/pkg/statsreceiver/statsd.go b/pkg/statsreceiver/statsd.go index a8d2a034..52a9f0ee 100644 --- a/pkg/statsreceiver/statsd.go +++ b/pkg/statsreceiver/statsd.go @@ -31,11 +31,13 @@ type StatsDStatsReceiverConfig struct { // statsDStatsReceiver holds a new client for writing statistics to a StatsD server type statsDStatsReceiver struct { - client *statsd.Client + client *statsd.Client + enableE2ELatency bool } // newStatsDStatsReceiver creates a new client for writing metrics to StatsD -func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsMapClient map[string]string) (*statsDStatsReceiver, error) { +func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsMapClient map[string]string, enableE2ELatency bool) (*statsDStatsReceiver, error) { + tagsMap := map[string]string{} err := json.Unmarshal([]byte(tagsRaw), &tagsMap) if err != nil { @@ -59,19 +61,21 @@ func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsM ) return &statsDStatsReceiver{ - client: client, + client: client, + enableE2ELatency: enableE2ELatency, }, nil } // NewStatsDReceiverWithTags closes over a given tags map and returns a function // that creates a statsDStatsReceiver given a StatsDStatsReceiverConfig. -func NewStatsDReceiverWithTags(tags map[string]string) func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) { +func NewStatsDReceiverWithTags(tags map[string]string, enableE2ELatency bool) func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) { return func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) { return newStatsDStatsReceiver( c.Address, c.Prefix, c.Tags, tags, + enableE2ELatency, ) } } @@ -141,4 +145,10 @@ func (s *statsDStatsReceiver) Send(b *models.ObserverBuffer) { s.client.PrecisionTiming("min_request_latency", b.MinRequestLatency) s.client.PrecisionTiming("max_request_latency", b.MaxRequestLatency) s.client.PrecisionTiming("avg_request_latency", b.GetAvgRequestLatency()) + + if s.enableE2ELatency { + s.client.PrecisionTiming("min_e2e_latency", b.MinE2ELatency) + s.client.PrecisionTiming("max_e2e_latency", b.MaxE2ELatency) + s.client.PrecisionTiming("avg_e2e_latency", b.GetAvgE2ELatency()) + } } diff --git a/pkg/transform/snowplow_collector_tstamp.go b/pkg/transform/snowplow_collector_tstamp.go new file mode 100644 index 00000000..14dec8c3 --- /dev/null +++ b/pkg/transform/snowplow_collector_tstamp.go @@ -0,0 +1,31 @@ +package transform + +import ( + "time" + + log "github.com/sirupsen/logrus" + "github.com/snowplow/snowbridge/pkg/models" +) + +// CollectorTstampTransformation returns a transformation function +func CollectorTstampTransformation() TransformationFunction { + return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + parsedEvent, err := IntermediateAsSpEnrichedParsed(interState, message) + if err != nil { + log.Warnf("Error while extracting 'collector_tstamp'. Could not parse input message as Snowplow event, error: %s", err) + return message, nil, nil, nil + } + + tstamp, err := parsedEvent.GetValue("collector_tstamp") + if err != nil { + log.Warnf("Error while extracting 'collector_tstamp', error: %s", err) + return message, nil, nil, parsedEvent + } + + if collectorTstamp, ok := tstamp.(time.Time); ok { + message.CollectorTstamp = collectorTstamp + } + + return message, nil, nil, parsedEvent + } +} diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/transformconfig/transform_config.go index ae6b76fc..205224ca 100644 --- a/pkg/transform/transformconfig/transform_config.go +++ b/pkg/transform/transformconfig/transform_config.go @@ -66,5 +66,9 @@ func GetTransformations(c *config.Config, supportedTransformations []config.Conf funcs = append(funcs, f) } + if c.Data.Metrics.E2ELatencyEnabled { + funcs = append(funcs, transform.CollectorTstampTransformation()) + } + return transform.NewTransformation(funcs...), nil }