diff --git a/pkg/models/message.go b/pkg/models/message.go index 1c55eec0..68f82e10 100644 --- a/pkg/models/message.go +++ b/pkg/models/message.go @@ -22,6 +22,9 @@ type Message struct { Data []byte HTTPHeaders map[string]string + // CollectorTstamp is the timestamp created by the Snowplow collector, extracted from the `collector_tstamp` atomic field. Used to measure E2E latency + CollectorTstamp time.Time + // TimeCreated is when the message was created originally TimeCreated time.Time @@ -37,8 +40,6 @@ type Message struct { // Time the request was done, to measure request latency for debugging purposes - we manually track this timestamp unlike other metrics, to get as accurate as possible a picture of just the request latency. TimeRequestFinished time.Time - CollectorTstamp time.Time - // AckFunc must be called on a successful message emission to ensure // any cleanup process for the source is actioned AckFunc func() diff --git a/pkg/models/observer_buffer_test.go b/pkg/models/observer_buffer_test.go index 1fb13253..2d7108ab 100644 --- a/pkg/models/observer_buffer_test.go +++ b/pkg/models/observer_buffer_test.go @@ -30,6 +30,7 @@ func TestObserverBuffer(t *testing.T) { { Data: []byte("Baz"), PartitionKey: "partition1", + CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute), @@ -39,6 +40,7 @@ func TestObserverBuffer(t *testing.T) { { Data: []byte("Bar"), PartitionKey: "partition2", + CollectorTstamp: timeNow.Add(time.Duration(-80) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), @@ -50,6 +52,7 @@ func TestObserverBuffer(t *testing.T) { { Data: []byte("Foo"), PartitionKey: "partition3", + CollectorTstamp: timeNow.Add(time.Duration(-40) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute), @@ -119,7 +122,11 @@ func TestObserverBuffer(t *testing.T) { assert.Equal(time.Duration(8)*time.Minute, b.MaxRequestLatency) assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency) - assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000,SumTransformLatency:720000,SumProcLatency:2520000,SumMsgLatency:18000000,MinReqLatency:60000,MaxReqLatency:480000,SumReqLatency:1320000", b.String()) + assert.Equal(time.Duration(80)*time.Minute, b.MaxE2ELatency) + assert.Equal(time.Duration(40)*time.Minute, b.MinE2ELatency) + assert.Equal(time.Duration(60)*time.Minute, b.GetAvgE2ELatency()) + + assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000,SumTransformLatency:720000,SumProcLatency:2520000,SumMsgLatency:18000000,MinReqLatency:60000,MaxReqLatency:480000,SumReqLatency:1320000,MinE2ELatency:2400000,MaxE2ELatency:4800000,SumE2ELatency:21600000", b.String()) } // TestObserverBuffer_Basic is a basic version of the above test, stripping away all but one event @@ -191,7 +198,7 @@ func TestObserverBuffer_Basic(t *testing.T) { assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency) assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency) - assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String()) + assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000,MinE2ELatency:0,MaxE2ELatency:0,SumE2ELatency:0", b.String()) } // TestObserverBuffer_BasicNoTransform is a basic version of the above test, stripping away all but one event. @@ -260,5 +267,5 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) { assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency) assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency) - assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String()) + assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000,MinE2ELatency:0,MaxE2ELatency:0,SumE2ELatency:0", b.String()) } diff --git a/pkg/models/target_write_result_test.go b/pkg/models/target_write_result_test.go index 9ab26c19..c012be26 100644 --- a/pkg/models/target_write_result_test.go +++ b/pkg/models/target_write_result_test.go @@ -40,30 +40,10 @@ func TestNewTargetWriteResult_EmptyWithoutTime(t *testing.T) { assert.Equal(time.Duration(0), r.MaxTransformLatency) assert.Equal(time.Duration(0), r.MinTransformLatency) assert.Equal(time.Duration(0), r.AvgTransformLatency) -} - -// TestNewTargetWriteResult_EmptyWithTime tests that an empty targetWriteResult with no a provided timestamp will report 0s across the board -func TestNewTargetWriteResult_EmptyWithTime(t *testing.T) { - assert := assert.New(t) - - r := NewTargetWriteResult(nil, nil, nil, nil) - assert.NotNil(r) - assert.Equal(int64(0), r.SentCount) - assert.Equal(int64(0), r.FailedCount) - assert.Equal(int64(0), r.Total()) - - assert.Equal(time.Duration(0), r.MaxProcLatency) - assert.Equal(time.Duration(0), r.MinProcLatency) - assert.Equal(time.Duration(0), r.AvgProcLatency) - - assert.Equal(time.Duration(0), r.MaxMsgLatency) - assert.Equal(time.Duration(0), r.MinMsgLatency) - assert.Equal(time.Duration(0), r.AvgMsgLatency) - - assert.Equal(time.Duration(0), r.MaxTransformLatency) - assert.Equal(time.Duration(0), r.MinTransformLatency) - assert.Equal(time.Duration(0), r.AvgTransformLatency) + assert.Equal(time.Duration(0), r.MaxE2ELatency) + assert.Equal(time.Duration(0), r.MinE2ELatency) + assert.Equal(time.Duration(0), r.AvgE2ELatency) } // TestNewTargetWriteResult_WithMessages tests that reporting of statistics is as it should be when we have all data @@ -76,6 +56,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Baz"), PartitionKey: "partition1", + CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute), @@ -84,6 +65,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Bar"), PartitionKey: "partition2", + CollectorTstamp: timeNow.Add(time.Duration(-80) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), @@ -94,6 +76,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Foo"), PartitionKey: "partition3", + CollectorTstamp: timeNow.Add(time.Duration(-40) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute), @@ -116,11 +99,15 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { assert.Equal(time.Duration(3)*time.Minute, r.MaxTransformLatency) assert.Equal(time.Duration(1)*time.Minute, r.MinTransformLatency) assert.Equal(time.Duration(2)*time.Minute, r.AvgTransformLatency) + assert.Equal(time.Duration(80)*time.Minute, r.MaxE2ELatency) + assert.Equal(time.Duration(40)*time.Minute, r.MinE2ELatency) + assert.Equal(time.Duration(60)*time.Minute, r.AvgE2ELatency) sent1 := []*Message{ { Data: []byte("Baz"), PartitionKey: "partition1", + CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute), TimePulled: timeNow.Add(time.Duration(-2) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute), @@ -131,6 +118,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Bar"), PartitionKey: "partition2", + CollectorTstamp: timeNow.Add(time.Duration(-120) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute), TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), @@ -139,6 +127,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { { Data: []byte("Foo"), PartitionKey: "partition3", + CollectorTstamp: timeNow.Add(time.Duration(-30) * time.Minute), TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute), TimePulled: timeNow.Add(time.Duration(-15) * time.Minute), TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute), @@ -172,10 +161,13 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { assert.Equal(time.Duration(8)*time.Minute, r3.MaxTransformLatency) assert.Equal(time.Duration(1)*time.Minute, r3.MinTransformLatency) assert.Equal(time.Duration(3)*time.Minute, r3.AvgTransformLatency) + assert.Equal(time.Duration(120)*time.Minute, r3.MaxE2ELatency) + assert.Equal(time.Duration(30)*time.Minute, r3.MinE2ELatency) + assert.Equal(time.Duration(65)*time.Minute, r3.AvgE2ELatency) } -// TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed -func TestNewTargetWriteResult_NoTransformation(t *testing.T) { +// TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed and no collector timestamp +func TestNewTargetWriteResult_NoTransformation_NoE2E(t *testing.T) { assert := assert.New(t) timeNow := time.Now().UTC() @@ -221,4 +213,7 @@ func TestNewTargetWriteResult_NoTransformation(t *testing.T) { assert.Equal(time.Duration(0), r.MaxTransformLatency) assert.Equal(time.Duration(0), r.MinTransformLatency) assert.Equal(time.Duration(0), r.AvgTransformLatency) + assert.Equal(time.Duration(0), r.MaxE2ELatency) + assert.Equal(time.Duration(0), r.MinE2ELatency) + assert.Equal(time.Duration(0), r.AvgE2ELatency) }