diff --git a/assets/docs/configuration/metrics/e2e-latency-example.hcl b/assets/docs/configuration/metrics/e2e-latency-example.hcl
index 9ddeba12..8585a38b 100644
--- a/assets/docs/configuration/metrics/e2e-latency-example.hcl
+++ b/assets/docs/configuration/metrics/e2e-latency-example.hcl
@@ -1,4 +1,4 @@
 metrics {
   # Optional toggle for E2E latency (difference between Snowplow collector timestamp and target write timestamp)
   enable_e2e_latency = true
-}
+}
\ No newline at end of file
diff --git a/pkg/models/target_write_result.go b/pkg/models/target_write_result.go
index 1f2db699..ab80a62b 100644
--- a/pkg/models/target_write_result.go
+++ b/pkg/models/target_write_result.go
@@ -88,7 +88,11 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
 	var sumMessageLatency time.Duration
 	var sumTransformLatency time.Duration
 	var sumRequestLatency time.Duration
-	var sumE2eLatency time.Duration
+	var sumE2ELatency time.Duration
+
+	// Some of the processed messages may have no attached collector tstamp (non-Snowplow data), so we keep a separate counter.
+	// It's used to calculate the average E2E latency.
+	var withCollectorTstampCount int64
 
 	for _, msg := range processed {
 		procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled)
@@ -131,16 +135,18 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
 		sumRequestLatency += requestLatency
 
 		var e2eLatency time.Duration
+		// Calculate E2E latency only when a collector tstamp is present
 		if !msg.CollectorTstamp.IsZero() {
 			e2eLatency = msg.TimeRequestFinished.Sub(msg.CollectorTstamp)
+			sumE2ELatency += e2eLatency
+			withCollectorTstampCount += 1
+			if r.MaxE2ELatency < e2eLatency {
+				r.MaxE2ELatency = e2eLatency
+			}
+			if r.MinE2ELatency > e2eLatency || r.MinE2ELatency == time.Duration(0) {
+				r.MinE2ELatency = e2eLatency
+			}
 		}
-		if r.MaxE2ELatency < e2eLatency {
-			r.MaxE2ELatency = e2eLatency
-		}
-		if r.MinE2ELatency > e2eLatency || r.MinE2ELatency == time.Duration(0) {
-			r.MinE2ELatency = e2eLatency
-		}
-		sumE2eLatency += e2eLatency
 	}
 
 	if processedLen > 0 {
@@ -148,7 +154,7 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
 		r.AvgMsgLatency = common.GetAverageFromDuration(sumMessageLatency, processedLen)
 		r.AvgTransformLatency = common.GetAverageFromDuration(sumTransformLatency, processedLen)
 		r.AvgRequestLatency = common.GetAverageFromDuration(sumRequestLatency, processedLen)
-		r.AvgE2ELatency = common.GetAverageFromDuration(sumE2eLatency, processedLen)
+		r.AvgE2ELatency = common.GetAverageFromDuration(sumE2ELatency, withCollectorTstampCount)
 	}
 
 	return &r
@@ -208,10 +214,16 @@ func (wr *TargetWriteResult) Append(nwr *TargetWriteResult) *TargetWriteResult {
 		if wrC.MaxE2ELatency < nwr.MaxE2ELatency {
 			wrC.MaxE2ELatency = nwr.MaxE2ELatency
 		}
-		if wrC.MinE2ELatency > nwr.MinE2ELatency || wrC.MinE2ELatency == time.Duration(0) {
+		if (wrC.MinE2ELatency > nwr.MinE2ELatency || wrC.MinE2ELatency == time.Duration(0)) && nwr.MinE2ELatency != time.Duration(0) {
 			wrC.MinE2ELatency = nwr.MinE2ELatency
 		}
-		wrC.AvgE2ELatency = common.GetAverageFromDuration(wrC.AvgE2ELatency+nwr.AvgE2ELatency, 2)
+
+		if wrC.AvgE2ELatency == time.Duration(0) {
+			wrC.AvgE2ELatency = nwr.AvgE2ELatency
+		} else if nwr.AvgE2ELatency != time.Duration(0) {
+			// Only calculate a new average when both wrC and nwr are non-zero
+			wrC.AvgE2ELatency = common.GetAverageFromDuration(wrC.AvgE2ELatency+nwr.AvgE2ELatency, 2)
+		}
 	}
 
 	return &wrC
diff --git a/pkg/models/target_write_result_test.go b/pkg/models/target_write_result_test.go
index c012be26..79002aa1 100644
--- a/pkg/models/target_write_result_test.go
+++ b/pkg/models/target_write_result_test.go
@@ -107,7 +107,6 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
 		{
 			Data:            []byte("Baz"),
 			PartitionKey:    "partition1",
-			CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute),
 			TimeCreated:     timeNow.Add(time.Duration(-55) * time.Minute),
 			TimePulled:      timeNow.Add(time.Duration(-2) * time.Minute),
 			TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute),
@@ -118,7 +117,6 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
 		{
 			Data:            []byte("Bar"),
 			PartitionKey:    "partition2",
-			CollectorTstamp: timeNow.Add(time.Duration(-120) * time.Minute),
 			TimeCreated:     timeNow.Add(time.Duration(-75) * time.Minute),
 			TimePulled:      timeNow.Add(time.Duration(-7) * time.Minute),
 			TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
@@ -127,7+125,6 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
 		{
 			Data:            []byte("Foo"),
 			PartitionKey:    "partition3",
-			CollectorTstamp: timeNow.Add(time.Duration(-30) * time.Minute),
 			TimeCreated:     timeNow.Add(time.Duration(-25) * time.Minute),
 			TimePulled:      timeNow.Add(time.Duration(-15) * time.Minute),
 			TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute),
@@ -161,9 +158,9 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
 	assert.Equal(time.Duration(8)*time.Minute, r3.MaxTransformLatency)
 	assert.Equal(time.Duration(1)*time.Minute, r3.MinTransformLatency)
 	assert.Equal(time.Duration(3)*time.Minute, r3.AvgTransformLatency)
-	assert.Equal(time.Duration(120)*time.Minute, r3.MaxE2ELatency)
-	assert.Equal(time.Duration(30)*time.Minute, r3.MinE2ELatency)
-	assert.Equal(time.Duration(65)*time.Minute, r3.AvgE2ELatency)
+	assert.Equal(time.Duration(80)*time.Minute, r3.MaxE2ELatency)
+	assert.Equal(time.Duration(40)*time.Minute, r3.MinE2ELatency)
+	assert.Equal(time.Duration(60)*time.Minute, r3.AvgE2ELatency)
 }
 
 // TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed and no collector timestamp
diff --git a/pkg/transform/snowplow_collector_tstamp.go b/pkg/transform/snowplow_collector_tstamp.go
index 14dec8c3..aa903749 100644
--- a/pkg/transform/snowplow_collector_tstamp.go
+++ b/pkg/transform/snowplow_collector_tstamp.go
@@ -7,18 +7,20 @@ import (
 	"github.com/snowplow/snowbridge/pkg/models"
 )
 
-// CollectorTstampTransformation returns a transformation function
+// CollectorTstampTransformation returns a transformation function attaching the collector timestamp to the input message.
+// Unlike other configurable transformations, it's enabled/disabled by the top-level metrics configuration toggle (`metrics.enable_e2e_latency`).
+// It doesn't produce invalid data in case of errors - it logs a warning and proceeds with the input data as if nothing happened.
 func CollectorTstampTransformation() TransformationFunction {
 	return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
 		parsedEvent, err := IntermediateAsSpEnrichedParsed(interState, message)
 		if err != nil {
-			log.Warnf("Error while extracting 'collector_tstamp'. Could not parse input message as Snowplow event, error: %s", err)
+			log.Warnf("Error while extracting 'collector_tstamp': %s", err)
 			return message, nil, nil, nil
 		}
 
 		tstamp, err := parsedEvent.GetValue("collector_tstamp")
 		if err != nil {
-			log.Warnf("Error while extracting 'collector_tstamp', error: %s", err)
+			log.Warnf("Error while extracting 'collector_tstamp': %s", err)
 			return message, nil, nil, parsedEvent
 		}
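The reasoning behind the guarded averaging in `Append`: combining a batch average with a zero value from a batch that contained no Snowplow events would otherwise dilute the overall E2E average. Below is a minimal, self-contained sketch (not part of the patch) of that behaviour; `averageFromDuration` is a hypothetical stand-in for `common.GetAverageFromDuration`, assumed to divide a summed duration by a count.

```go
package main

import (
	"fmt"
	"time"
)

// averageFromDuration is a hypothetical stand-in for common.GetAverageFromDuration,
// assumed to divide a summed duration by a count.
func averageFromDuration(sum time.Duration, count int64) time.Duration {
	if count == 0 {
		return time.Duration(0)
	}
	return time.Duration(int64(sum) / count)
}

// combineAverages mirrors the guard added in Append: a zero average (a batch with
// no collector tstamps) never dilutes a non-zero one.
func combineAverages(a, b time.Duration) time.Duration {
	if a == time.Duration(0) {
		return b
	}
	if b == time.Duration(0) {
		return a
	}
	return averageFromDuration(a+b, 2)
}

func main() {
	snowplowBatch := 60 * time.Minute    // average E2E latency of a batch of Snowplow events
	nonSnowplowBatch := time.Duration(0) // batch of non-Snowplow data, no E2E latency at all

	fmt.Println(averageFromDuration(snowplowBatch+nonSnowplowBatch, 2)) // unguarded: 30m0s
	fmt.Println(combineAverages(snowplowBatch, nonSnowplowBatch))       // guarded: 1h0m0s
}
```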