Skip to content

Commit

Permalink
Some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Nov 12, 2024
1 parent 4c857a8 commit a9eb3b0
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 30 deletions.
5 changes: 3 additions & 2 deletions pkg/models/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type Message struct {
Data []byte
HTTPHeaders map[string]string

// CollectorTstamp is the timestamp created by the Snowplow collector, extracted from the `collector_tstamp` atomic field. Used to measure E2E latency
CollectorTstamp time.Time

// TimeCreated is when the message was created originally
TimeCreated time.Time

Expand All @@ -37,8 +40,6 @@ type Message struct {
// Time the request was done, to measure request latency for debugging purposes - we manually track this timestamp unlike other metrics, to get as accurate as possible a picture of just the request latency.
TimeRequestFinished time.Time

CollectorTstamp time.Time

// AckFunc must be called on a successful message emission to ensure
// any cleanup process for the source is actioned
AckFunc func()
Expand Down
13 changes: 10 additions & 3 deletions pkg/models/observer_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func TestObserverBuffer(t *testing.T) {
{
Data: []byte("Baz"),
PartitionKey: "partition1",
CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
Expand All @@ -39,6 +40,7 @@ func TestObserverBuffer(t *testing.T) {
{
Data: []byte("Bar"),
PartitionKey: "partition2",
CollectorTstamp: timeNow.Add(time.Duration(-80) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Expand All @@ -50,6 +52,7 @@ func TestObserverBuffer(t *testing.T) {
{
Data: []byte("Foo"),
PartitionKey: "partition3",
CollectorTstamp: timeNow.Add(time.Duration(-40) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
Expand Down Expand Up @@ -119,7 +122,11 @@ func TestObserverBuffer(t *testing.T) {
assert.Equal(time.Duration(8)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency)

assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000,SumTransformLatency:720000,SumProcLatency:2520000,SumMsgLatency:18000000,MinReqLatency:60000,MaxReqLatency:480000,SumReqLatency:1320000", b.String())
assert.Equal(time.Duration(80)*time.Minute, b.MaxE2ELatency)
assert.Equal(time.Duration(40)*time.Minute, b.MinE2ELatency)
assert.Equal(time.Duration(60)*time.Minute, b.GetAvgE2ELatency())

assert.Equal("TargetResults:2,MsgFiltered:1,MsgSent:4,MsgFailed:2,OversizedTargetResults:2,OversizedMsgSent:4,OversizedMsgFailed:2,InvalidTargetResults:2,InvalidMsgSent:4,InvalidMsgFailed:2,MaxProcLatency:600000,MaxMsgLatency:4200000,MaxFilterLatency:600000,MaxTransformLatency:180000,SumTransformLatency:720000,SumProcLatency:2520000,SumMsgLatency:18000000,MinReqLatency:60000,MaxReqLatency:480000,SumReqLatency:1320000,MinE2ELatency:2400000,MaxE2ELatency:4800000,SumE2ELatency:21600000", b.String())
}

// TestObserverBuffer_Basic is a basic version of the above test, stripping away all but one event
Expand Down Expand Up @@ -191,7 +198,7 @@ func TestObserverBuffer_Basic(t *testing.T) {
assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency)

assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String())
assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000,MinE2ELatency:0,MaxE2ELatency:0,SumE2ELatency:0", b.String())
}

// TestObserverBuffer_BasicNoTransform is a basic version of the above test, stripping away all but one event.
Expand Down Expand Up @@ -260,5 +267,5 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) {
assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency)

assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String())
assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000,MinE2ELatency:0,MaxE2ELatency:0,SumE2ELatency:0", b.String())
}
45 changes: 20 additions & 25 deletions pkg/models/target_write_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,10 @@ func TestNewTargetWriteResult_EmptyWithoutTime(t *testing.T) {
assert.Equal(time.Duration(0), r.MaxTransformLatency)
assert.Equal(time.Duration(0), r.MinTransformLatency)
assert.Equal(time.Duration(0), r.AvgTransformLatency)
}

// TestNewTargetWriteResult_EmptyWithTime tests that an empty targetWriteResult with no a provided timestamp will report 0s across the board
func TestNewTargetWriteResult_EmptyWithTime(t *testing.T) {
assert := assert.New(t)

r := NewTargetWriteResult(nil, nil, nil, nil)
assert.NotNil(r)

assert.Equal(int64(0), r.SentCount)
assert.Equal(int64(0), r.FailedCount)
assert.Equal(int64(0), r.Total())

assert.Equal(time.Duration(0), r.MaxProcLatency)
assert.Equal(time.Duration(0), r.MinProcLatency)
assert.Equal(time.Duration(0), r.AvgProcLatency)

assert.Equal(time.Duration(0), r.MaxMsgLatency)
assert.Equal(time.Duration(0), r.MinMsgLatency)
assert.Equal(time.Duration(0), r.AvgMsgLatency)

assert.Equal(time.Duration(0), r.MaxTransformLatency)
assert.Equal(time.Duration(0), r.MinTransformLatency)
assert.Equal(time.Duration(0), r.AvgTransformLatency)
assert.Equal(time.Duration(0), r.MaxE2ELatency)
assert.Equal(time.Duration(0), r.MinE2ELatency)
assert.Equal(time.Duration(0), r.AvgE2ELatency)
}

// TestNewTargetWriteResult_WithMessages tests that reporting of statistics is as it should be when we have all data
Expand All @@ -76,6 +56,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Baz"),
PartitionKey: "partition1",
CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
Expand All @@ -84,6 +65,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Bar"),
PartitionKey: "partition2",
CollectorTstamp: timeNow.Add(time.Duration(-80) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Expand All @@ -94,6 +76,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Foo"),
PartitionKey: "partition3",
CollectorTstamp: timeNow.Add(time.Duration(-40) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
Expand All @@ -116,11 +99,15 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
assert.Equal(time.Duration(3)*time.Minute, r.MaxTransformLatency)
assert.Equal(time.Duration(1)*time.Minute, r.MinTransformLatency)
assert.Equal(time.Duration(2)*time.Minute, r.AvgTransformLatency)
assert.Equal(time.Duration(80)*time.Minute, r.MaxE2ELatency)
assert.Equal(time.Duration(40)*time.Minute, r.MinE2ELatency)
assert.Equal(time.Duration(60)*time.Minute, r.AvgE2ELatency)

sent1 := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-2) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute),
Expand All @@ -131,6 +118,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Bar"),
PartitionKey: "partition2",
CollectorTstamp: timeNow.Add(time.Duration(-120) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Expand All @@ -139,6 +127,7 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Foo"),
PartitionKey: "partition3",
CollectorTstamp: timeNow.Add(time.Duration(-30) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-15) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute),
Expand Down Expand Up @@ -172,10 +161,13 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
assert.Equal(time.Duration(8)*time.Minute, r3.MaxTransformLatency)
assert.Equal(time.Duration(1)*time.Minute, r3.MinTransformLatency)
assert.Equal(time.Duration(3)*time.Minute, r3.AvgTransformLatency)
assert.Equal(time.Duration(120)*time.Minute, r3.MaxE2ELatency)
assert.Equal(time.Duration(30)*time.Minute, r3.MinE2ELatency)
assert.Equal(time.Duration(65)*time.Minute, r3.AvgE2ELatency)
}

// TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed
func TestNewTargetWriteResult_NoTransformation(t *testing.T) {
// TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed and no collector timestamp
func TestNewTargetWriteResult_NoTransformation_NoE2E(t *testing.T) {
assert := assert.New(t)

timeNow := time.Now().UTC()
Expand Down Expand Up @@ -221,4 +213,7 @@ func TestNewTargetWriteResult_NoTransformation(t *testing.T) {
assert.Equal(time.Duration(0), r.MaxTransformLatency)
assert.Equal(time.Duration(0), r.MinTransformLatency)
assert.Equal(time.Duration(0), r.AvgTransformLatency)
assert.Equal(time.Duration(0), r.MaxE2ELatency)
assert.Equal(time.Duration(0), r.MinE2ELatency)
assert.Equal(time.Duration(0), r.AvgE2ELatency)
}

0 comments on commit a9eb3b0

Please sign in to comment.