diff --git a/assets/docs/configuration/metrics/e2e-latency-example.hcl b/assets/docs/configuration/metrics/e2e-latency-example.hcl new file mode 100644 index 00000000..8585a38b --- /dev/null +++ b/assets/docs/configuration/metrics/e2e-latency-example.hcl @@ -0,0 +1,4 @@ +metrics { + # Optional toggle for E2E latency (difference between Snowplow collector timestamp and target write timestamp) + enable_e2e_latency = true +} \ No newline at end of file diff --git a/assets/docs/configuration/overview-full-example.hcl b/assets/docs/configuration/overview-full-example.hcl index a2e13703..61f245a7 100644 --- a/assets/docs/configuration/overview-full-example.hcl +++ b/assets/docs/configuration/overview-full-example.hcl @@ -108,6 +108,11 @@ retry { } } +metrics { + # Optional toggle for E2E latency (difference between Snowplow collector timestamp and target write timestamp) + enable_e2e_latency = true +} + license { accept = true } diff --git a/assets/test/transformconfig/TestEnginesAndTransformations/configs/transform-collector-tstamp.hcl b/assets/test/transformconfig/TestEnginesAndTransformations/configs/transform-collector-tstamp.hcl new file mode 100644 index 00000000..42ac15f2 --- /dev/null +++ b/assets/test/transformconfig/TestEnginesAndTransformations/configs/transform-collector-tstamp.hcl @@ -0,0 +1,3 @@ +metrics { + enable_e2e_latency = true +} \ No newline at end of file diff --git a/config/config.go b/config/config.go index d4cd33c3..41dec9ea 100644 --- a/config/config.go +++ b/config/config.go @@ -56,6 +56,7 @@ type configurationData struct { DisableTelemetry bool `hcl:"disable_telemetry,optional"` License *licenseConfig `hcl:"license,block"` Retry *retryConfig `hcl:"retry,block"` + Metrics *metricsConfig `hcl:"metrics,block"` } // component is a type to abstract over configuration blocks. @@ -100,6 +101,10 @@ type retryConfig struct { Setup *setupRetryConfig `hcl:"setup,block"` } +type metricsConfig struct { + E2ELatencyEnabled bool `hcl:"enable_e2e_latency,optional"` +} + type transientRetryConfig struct { Delay int `hcl:"delay_ms,optional"` MaxAttempts int `hcl:"max_attempts,optional"` @@ -142,6 +147,9 @@ func defaultConfigData() *configurationData { Delay: 20000, }, }, + Metrics: &metricsConfig{ + E2ELatencyEnabled: false, + }, } } @@ -359,7 +367,7 @@ func (c *Config) getStatsReceiver(tags map[string]string) (statsreceiveriface.St switch useReceiver.Name { case "statsd": plug := statsreceiver.AdaptStatsDStatsReceiverFunc( - statsreceiver.NewStatsDReceiverWithTags(tags), + statsreceiver.NewStatsDReceiverWithTags(tags, c.Data.Metrics.E2ELatencyEnabled), ) component, err := c.CreateComponent(plug, decoderOpts) if err != nil { diff --git a/docs/configuration_metrics_docs_test.go b/docs/configuration_metrics_docs_test.go new file mode 100644 index 00000000..248d214f --- /dev/null +++ b/docs/configuration_metrics_docs_test.go @@ -0,0 +1,30 @@ +/** + * Copyright (c) 2020-present Snowplow Analytics Ltd. + * All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package docs + +import ( + "path/filepath" + "testing" + + "github.com/snowplow/snowbridge/assets" + "github.com/stretchr/testify/assert" +) + +func TestMetricsConfigDocumentation(t *testing.T) { + assert := assert.New(t) + configPath := filepath.Join(assets.AssetsRootDir, "docs", "configuration", "metrics", "e2e-latency-example.hcl") + c := getConfigFromFilepath(t, configPath) + + metricsConfig := c.Data.Metrics + assert.NotNil(metricsConfig) + assert.Equal(true, metricsConfig.E2ELatencyEnabled) +} diff --git a/pkg/models/message.go b/pkg/models/message.go index bb33d85d..68f82e10 100644 --- a/pkg/models/message.go +++ b/pkg/models/message.go @@ -22,6 +22,9 @@ type Message struct { Data []byte HTTPHeaders map[string]string + // CollectorTstamp is the timestamp created by the Snowplow collector, extracted from the `collector_tstamp` atomic field. Used to measure E2E latency + CollectorTstamp time.Time + // TimeCreated is when the message was created originally TimeCreated time.Time diff --git a/pkg/models/observer_buffer.go b/pkg/models/observer_buffer.go index eaf1d4e5..a5e2d133 100644 --- a/pkg/models/observer_buffer.go +++ b/pkg/models/observer_buffer.go @@ -52,6 +52,9 @@ type ObserverBuffer struct { MaxRequestLatency time.Duration MinRequestLatency time.Duration SumRequestLatency time.Duration + MaxE2ELatency time.Duration + MinE2ELatency time.Duration + SumE2ELatency time.Duration } // AppendWrite adds a normal TargetWriteResult onto the buffer and stores the result @@ -128,6 +131,14 @@ func (b *ObserverBuffer) appendWriteResult(res *TargetWriteResult) { b.MinRequestLatency = res.MinRequestLatency } b.SumRequestLatency += res.AvgRequestLatency + + if b.MaxE2ELatency < res.MaxE2ELatency { + b.MaxE2ELatency = res.MaxE2ELatency + } + if b.MinE2ELatency > res.MinE2ELatency || b.MinE2ELatency == time.Duration(0) { + b.MinE2ELatency = res.MinE2ELatency + } + b.SumE2ELatency += res.AvgE2ELatency } // AppendFiltered adds a FilterResult onto the buffer and stores the result @@ -180,9 +191,14 @@ func (b *ObserverBuffer) GetAvgRequestLatency() time.Duration { return common.GetAverageFromDuration(b.SumRequestLatency, b.MsgTotal) } +// GetAvgE2ELatency calculates average E2E latency +func (b *ObserverBuffer) GetAvgE2ELatency() time.Duration { + return common.GetAverageFromDuration(b.SumE2ELatency, b.MsgTotal) +} + func (b *ObserverBuffer) String() string { return fmt.Sprintf( - "TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d,SumTransformLatency:%d,SumProcLatency:%d,SumMsgLatency:%d,MinReqLatency:%d,MaxReqLatency:%d,SumReqLatency:%d", + "TargetResults:%d,MsgFiltered:%d,MsgSent:%d,MsgFailed:%d,OversizedTargetResults:%d,OversizedMsgSent:%d,OversizedMsgFailed:%d,InvalidTargetResults:%d,InvalidMsgSent:%d,InvalidMsgFailed:%d,MaxProcLatency:%d,MaxMsgLatency:%d,MaxFilterLatency:%d,MaxTransformLatency:%d,SumTransformLatency:%d,SumProcLatency:%d,SumMsgLatency:%d,MinReqLatency:%d,MaxReqLatency:%d,SumReqLatency:%d,MinE2ELatency:%d,MaxE2ELatency:%d,SumE2ELatency:%d", b.TargetResults, b.MsgFiltered, b.MsgSent, @@ -203,5 +219,8 @@ func (b *ObserverBuffer) String() string { b.MinRequestLatency.Milliseconds(), b.MaxRequestLatency.Milliseconds(), b.SumRequestLatency.Milliseconds(), + b.MinE2ELatency.Milliseconds(), + b.MaxE2ELatency.Milliseconds(), + b.SumE2ELatency.Milliseconds(), ) } diff --git a/pkg/models/observer_buffer_test.go b/pkg/models/observer_buffer_test.go index 1fb13253..2d7108ab 100644 --- a/pkg/models/observer_buffer_test.go +++ b/pkg/models/observer_buffer_test.go @@ -30,6 +30,7 @@ func TestObserverBuffer(t *testing.T) { { Data: []byte("Baz"), PartitionKey: "partition1", + CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute), @@ -39,6 +40,7 @@ func TestObserverBuffer(t *testing.T) { { Data: []byte("Bar"), PartitionKey: "partition2", + CollectorTstamp: timeNow.Add(time.Duration(-80) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), @@ -50,6 +52,7 @@ func TestObserverBuffer(t *testing.T) { { Data: []byte("Foo"), PartitionKey: "partition3", + CollectorTstamp: timeNow.Add(time.Duration(-40) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute), @@ -119,7 +122,11 @@ func TestObserverBuffer(t *testing.T) { assert.Equal(time.Duration(8)*time.Minute, b.MaxRequestLatency) assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency) - assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000,SumTransformLatency:720000,SumProcLatency:2520000,SumMsgLatency:18000000,MinReqLatency:60000,MaxReqLatency:480000,SumReqLatency:1320000", b.String()) + assert.Equal(time.Duration(80)*time.Minute, b.MaxE2ELatency) + assert.Equal(time.Duration(40)*time.Minute, b.MinE2ELatency) + assert.Equal(time.Duration(60)*time.Minute, b.GetAvgE2ELatency()) + + assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000,SumTransformLatency:720000,SumProcLatency:2520000,SumMsgLatency:18000000,MinReqLatency:60000,MaxReqLatency:480000,SumReqLatency:1320000,MinE2ELatency:2400000,MaxE2ELatency:4800000,SumE2ELatency:21600000", b.String()) } // TestObserverBuffer_Basic is a basic version of the above test, stripping away all but one event @@ -191,7 +198,7 @@ func TestObserverBuffer_Basic(t *testing.T) { assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency) assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency) - assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String()) + assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000,MinE2ELatency:0,MaxE2ELatency:0,SumE2ELatency:0", b.String()) } // TestObserverBuffer_BasicNoTransform is a basic version of the above test, stripping away all but one event. @@ -260,5 +267,5 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) { assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency) assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency) - assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String()) + assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000,MinE2ELatency:0,MaxE2ELatency:0,SumE2ELatency:0", b.String()) } diff --git a/pkg/models/target_write_result.go b/pkg/models/target_write_result.go index bf296291..26ed30a4 100644 --- a/pkg/models/target_write_result.go +++ b/pkg/models/target_write_result.go @@ -61,6 +61,10 @@ type TargetWriteResult struct { MaxRequestLatency time.Duration MinRequestLatency time.Duration AvgRequestLatency time.Duration + + MaxE2ELatency time.Duration + MinE2ELatency time.Duration + AvgE2ELatency time.Duration } // NewTargetWriteResult builds a result structure to return from a target write @@ -84,6 +88,7 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa var sumMessageLatency time.Duration var sumTransformLatency time.Duration var sumRequestLatency time.Duration + var sumE2ELatency time.Duration for _, msg := range processed { procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled) @@ -124,6 +129,18 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa r.MinRequestLatency = requestLatency } sumRequestLatency += requestLatency + + var e2eLatency time.Duration + if !msg.CollectorTstamp.IsZero() { + e2eLatency = msg.TimeRequestFinished.Sub(msg.CollectorTstamp) + } + if r.MaxE2ELatency < e2eLatency { + r.MaxE2ELatency = e2eLatency + } + if r.MinE2ELatency > e2eLatency || r.MinE2ELatency == time.Duration(0) { + r.MinE2ELatency = e2eLatency + } + sumE2ELatency += e2eLatency } if processedLen > 0 { @@ -131,6 +148,7 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa r.AvgMsgLatency = common.GetAverageFromDuration(sumMessageLatency, processedLen) r.AvgTransformLatency = common.GetAverageFromDuration(sumTransformLatency, processedLen) r.AvgRequestLatency = common.GetAverageFromDuration(sumRequestLatency, processedLen) + r.AvgE2ELatency = common.GetAverageFromDuration(sumE2ELatency, processedLen) } return &r @@ -186,6 +204,14 @@ func (wr *TargetWriteResult) Append(nwr *TargetWriteResult) *TargetWriteResult { wrC.MinRequestLatency = nwr.MinRequestLatency } wrC.AvgRequestLatency = common.GetAverageFromDuration(wrC.AvgRequestLatency+nwr.AvgRequestLatency, 2) + + if wrC.MaxE2ELatency < nwr.MaxE2ELatency { + wrC.MaxE2ELatency = nwr.MaxE2ELatency + } + if wrC.MinE2ELatency > nwr.MinE2ELatency || wrC.MinE2ELatency == time.Duration(0) { + wrC.MinE2ELatency = nwr.MinE2ELatency + } + wrC.AvgE2ELatency = common.GetAverageFromDuration(wrC.AvgE2ELatency+nwr.AvgE2ELatency, 2) } return &wrC diff --git a/pkg/models/target_write_result_test.go b/pkg/models/target_write_result_test.go index 9ab26c19..c012be26 100644 --- a/pkg/models/target_write_result_test.go +++ b/pkg/models/target_write_result_test.go @@ -40,30 +40,10 @@ func TestNewTargetWriteResult_EmptyWithoutTime(t *testing.T) { assert.Equal(time.Duration(0), r.MaxTransformLatency) assert.Equal(time.Duration(0), r.MinTransformLatency) assert.Equal(time.Duration(0), r.AvgTransformLatency) -} - -// TestNewTargetWriteResult_EmptyWithTime tests that an empty targetWriteResult with no a provided timestamp will report 0s across the board -func TestNewTargetWriteResult_EmptyWithTime(t *testing.T) { - assert := assert.New(t) - - r := NewTargetWriteResult(nil, nil, nil, nil) - assert.NotNil(r) - assert.Equal(int64(0), r.SentCount) - assert.Equal(int64(0), r.FailedCount) - assert.Equal(int64(0), r.Total()) - - assert.Equal(time.Duration(0), r.MaxProcLatency) - assert.Equal(time.Duration(0), r.MinProcLatency) - assert.Equal(time.Duration(0), r.AvgProcLatency) - - assert.Equal(time.Duration(0), r.MaxMsgLatency) - assert.Equal(time.Duration(0), r.MinMsgLatency) - assert.Equal(time.Duration(0), r.AvgMsgLatency) - - assert.Equal(time.Duration(0), r.MaxTransformLatency) - assert.Equal(time.Duration(0), r.MinTransformLatency) - assert.Equal(time.Duration(0), r.AvgTransformLatency) + assert.Equal(time.Duration(0), r.MaxE2ELatency) + assert.Equal(time.Duration(0), r.MinE2ELatency) + assert.Equal(time.Duration(0), r.AvgE2ELatency) } // TestNewTargetWriteResult_WithMessages tests that reporting of statistics is as it should be when we have all data @@ -76,6 +56,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Baz"), PartitionKey: "partition1", + CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute), @@ -84,6 +65,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Bar"), PartitionKey: "partition2", + CollectorTstamp: timeNow.Add(time.Duration(-80) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), @@ -94,6 +76,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Foo"), PartitionKey: "partition3", + CollectorTstamp: timeNow.Add(time.Duration(-40) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute), @@ -116,11 +99,15 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { assert.Equal(time.Duration(3)*time.Minute, r.MaxTransformLatency) assert.Equal(time.Duration(1)*time.Minute, r.MinTransformLatency) assert.Equal(time.Duration(2)*time.Minute, r.AvgTransformLatency) + assert.Equal(time.Duration(80)*time.Minute, r.MaxE2ELatency) + assert.Equal(time.Duration(40)*time.Minute, r.MinE2ELatency) + assert.Equal(time.Duration(60)*time.Minute, r.AvgE2ELatency) sent1 := []*Message{ { Data: []byte("Baz"), PartitionKey: "partition1", + CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute), TimePulled: timeNow.Add(time.Duration(-2) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute), @@ -131,6 +118,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Bar"), PartitionKey: "partition2", + CollectorTstamp: timeNow.Add(time.Duration(-120) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute), TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), @@ -139,6 +127,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Foo"), PartitionKey: "partition3", + CollectorTstamp: timeNow.Add(time.Duration(-30) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute), TimePulled: timeNow.Add(time.Duration(-15) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute), @@ -172,10 +161,13 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { assert.Equal(time.Duration(8)*time.Minute, r3.MaxTransformLatency) assert.Equal(time.Duration(1)*time.Minute, r3.MinTransformLatency) assert.Equal(time.Duration(3)*time.Minute, r3.AvgTransformLatency) + assert.Equal(time.Duration(120)*time.Minute, r3.MaxE2ELatency) + assert.Equal(time.Duration(30)*time.Minute, r3.MinE2ELatency) + assert.Equal(time.Duration(65)*time.Minute, r3.AvgE2ELatency) } -// TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed -func TestNewTargetWriteResult_NoTransformation(t *testing.T) { +// TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed and no collector timestamp +func TestNewTargetWriteResult_NoTransformation_NoE2E(t *testing.T) { assert := assert.New(t) timeNow := time.Now().UTC() @@ -221,4 +213,7 @@ func TestNewTargetWriteResult_NoTransformation(t *testing.T) { assert.Equal(time.Duration(0), r.MaxTransformLatency) assert.Equal(time.Duration(0), r.MinTransformLatency) assert.Equal(time.Duration(0), r.AvgTransformLatency) + assert.Equal(time.Duration(0), r.MaxE2ELatency) + assert.Equal(time.Duration(0), r.MinE2ELatency) + assert.Equal(time.Duration(0), r.AvgE2ELatency) } diff --git a/pkg/statsreceiver/statsd.go b/pkg/statsreceiver/statsd.go index a8d2a034..52a9f0ee 100644 --- a/pkg/statsreceiver/statsd.go +++ b/pkg/statsreceiver/statsd.go @@ -31,11 +31,13 @@ type StatsDStatsReceiverConfig struct { // statsDStatsReceiver holds a new client for writing statistics to a StatsD server type statsDStatsReceiver struct { - client *statsd.Client + client *statsd.Client + enableE2ELatency bool } // newStatsDStatsReceiver creates a new client for writing metrics to StatsD -func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsMapClient map[string]string) (*statsDStatsReceiver, error) { +func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsMapClient map[string]string, enableE2ELatency bool) (*statsDStatsReceiver, error) { + tagsMap := map[string]string{} err := json.Unmarshal([]byte(tagsRaw), &tagsMap) if err != nil { @@ -59,19 +61,21 @@ func newStatsDStatsReceiver(address string, prefix string, tagsRaw string, tagsM ) return &statsDStatsReceiver{ - client: client, + client: client, + enableE2ELatency: enableE2ELatency, }, nil } // NewStatsDReceiverWithTags closes over a given tags map and returns a function // that creates a statsDStatsReceiver given a StatsDStatsReceiverConfig. -func NewStatsDReceiverWithTags(tags map[string]string) func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) { +func NewStatsDReceiverWithTags(tags map[string]string, enableE2ELatency bool) func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) { return func(c *StatsDStatsReceiverConfig) (*statsDStatsReceiver, error) { return newStatsDStatsReceiver( c.Address, c.Prefix, c.Tags, tags, + enableE2ELatency, ) } } @@ -141,4 +145,10 @@ func (s *statsDStatsReceiver) Send(b *models.ObserverBuffer) { s.client.PrecisionTiming("min_request_latency", b.MinRequestLatency) s.client.PrecisionTiming("max_request_latency", b.MaxRequestLatency) s.client.PrecisionTiming("avg_request_latency", b.GetAvgRequestLatency()) + + if s.enableE2ELatency { + s.client.PrecisionTiming("min_e2e_latency", b.MinE2ELatency) + s.client.PrecisionTiming("max_e2e_latency", b.MaxE2ELatency) + s.client.PrecisionTiming("avg_e2e_latency", b.GetAvgE2ELatency()) + } } diff --git a/pkg/transform/snowplow_collector_tstamp.go b/pkg/transform/snowplow_collector_tstamp.go new file mode 100644 index 00000000..aa903749 --- /dev/null +++ b/pkg/transform/snowplow_collector_tstamp.go @@ -0,0 +1,33 @@ +package transform + +import ( + "time" + + log "github.com/sirupsen/logrus" + "github.com/snowplow/snowbridge/pkg/models" +) + +// CollectorTstampTransformation returns a transformation function attaching collector timestamp to the input message +// This transformation is not like other configurable transformations - it's enabled/disabled based on top-level metric configuration toggle (`metrics.enable_e2e_latency`) +// It doesn't produce invalid data in case of errors - it logs a warning and proceeds with input data as nothing happened. +func CollectorTstampTransformation() TransformationFunction { + return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) { + parsedEvent, err := IntermediateAsSpEnrichedParsed(interState, message) + if err != nil { + log.Warnf("Error while extracting 'collector_tstamp': %s", err) + return message, nil, nil, nil + } + + tstamp, err := parsedEvent.GetValue("collector_tstamp") + if err != nil { + log.Warnf("Error while extracting 'collector_tstamp': %s", err) + return message, nil, nil, parsedEvent + } + + if collectorTstamp, ok := tstamp.(time.Time); ok { + message.CollectorTstamp = collectorTstamp + } + + return message, nil, nil, parsedEvent + } +} diff --git a/pkg/transform/snowplow_collector_tstamp_test.go b/pkg/transform/snowplow_collector_tstamp_test.go new file mode 100644 index 00000000..cf62879f --- /dev/null +++ b/pkg/transform/snowplow_collector_tstamp_test.go @@ -0,0 +1,44 @@ +package transform + +import ( + "testing" + "time" + + "github.com/snowplow/snowbridge/pkg/models" + "github.com/stretchr/testify/assert" +) + +func TestCollectorTstamp_Snowplow_Data(t *testing.T) { + assert := assert.New(t) + + input := models.Message{ + Data: SnowplowTsv1, + PartitionKey: "some-key", + } + + ts := CollectorTstampTransformation() + + good, filtered, invalid, _ := ts(&input, nil) + + assert.Equal(time.Date(2019, 5, 10, 14, 40, 35, 972000000, time.UTC), good.CollectorTstamp) + assert.Empty(filtered) + assert.Empty(invalid) +} + +func TestCollectorTstamp_Non_Snowplow_Data(t *testing.T) { + assert := assert.New(t) + + input := &models.Message{ + Data: []byte("Some kind of custom non-Snowplow data"), + PartitionKey: "some-key", + } + + ts := CollectorTstampTransformation() + + good, filtered, invalid, _ := ts(input, nil) + + assert.Equal(input, good) + assert.Empty(good.CollectorTstamp) + assert.Empty(filtered) + assert.Empty(invalid) +} diff --git a/pkg/transform/transformconfig/transform_config.go b/pkg/transform/transformconfig/transform_config.go index ae6b76fc..205224ca 100644 --- a/pkg/transform/transformconfig/transform_config.go +++ b/pkg/transform/transformconfig/transform_config.go @@ -66,5 +66,9 @@ func GetTransformations(c *config.Config, supportedTransformations []config.Conf funcs = append(funcs, f) } + if c.Data.Metrics.E2ELatencyEnabled { + funcs = append(funcs, transform.CollectorTstampTransformation()) + } + return transform.NewTransformation(funcs...), nil } diff --git a/pkg/transform/transformconfig/transform_config_test.go b/pkg/transform/transformconfig/transform_config_test.go index 44e86206..1d75563d 100644 --- a/pkg/transform/transformconfig/transform_config_test.go +++ b/pkg/transform/transformconfig/transform_config_test.go @@ -18,6 +18,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/stretchr/testify/assert" @@ -165,6 +166,21 @@ func TestEnginesAndTransformations(t *testing.T) { }}, }, }, + { + Description: `e2e latency metric enabled -> collector tstamp attached`, + File: "transform-collector-tstamp.hcl", + ExpectedMessages: expectedMessages{ + Before: []*models.Message{{ + Data: snowplowTsv1, + PartitionKey: "some-key", + }}, + After: []*models.Message{{ + Data: snowplowTsv1, + PartitionKey: "some-key", + CollectorTstamp: time.Date(2019, 5, 10, 14, 40, 35, 972000000, time.UTC), + }}, + }, + }, } // Absolute paths to scripts @@ -232,6 +248,11 @@ func TestEnginesAndTransformations(t *testing.T) { assert.Equal(resultMessage.GetError(), tt.ExpectedMessages.After[idx].GetError()) } + // check if collector timestamp has been attached + for idx, resultMessage := range result.Result { + assert.Equal(resultMessage.CollectorTstamp, tt.ExpectedMessages.After[idx].CollectorTstamp) + } + // check result for transformed messages in case of filtered results if result.FilteredCount != 0 { assert.NotNil(result.Filtered)