diff --git a/pkg/models/observer_buffer_test.go b/pkg/models/observer_buffer_test.go index 48daf7c5..1fb13253 100644 --- a/pkg/models/observer_buffer_test.go +++ b/pkg/models/observer_buffer_test.go @@ -69,7 +69,7 @@ func TestObserverBuffer(t *testing.T) { }, } - r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow) + r := NewTargetWriteResult(sent, failed, nil, nil) b.AppendWrite(r) b.AppendWrite(r) @@ -144,7 +144,7 @@ func TestObserverBuffer_Basic(t *testing.T) { }, } - r := NewTargetWriteResultWithTime(sent, nil, nil, nil, timeNow) + r := NewTargetWriteResult(sent, nil, nil, nil) b.AppendWrite(r) b.AppendWrite(nil) @@ -194,7 +194,7 @@ func TestObserverBuffer_Basic(t *testing.T) { assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String()) } -// TestObserverBuffer_Basic is a basic version of the above test, stripping away all but one event. +// TestObserverBuffer_BasicNoTransform is a basic version of the above test, stripping away all but one event. // It exists purely to simplify reasoning through bugs. func TestObserverBuffer_BasicNoTransform(t *testing.T) { assert := assert.New(t) @@ -206,14 +206,16 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) { sent := []*Message{ { - Data: []byte("Baz"), - PartitionKey: "partition1", - TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), + Data: []byte("Baz"), + PartitionKey: "partition1", + TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), + TimeRequestStarted: timeNow.Add(time.Duration(-1) * time.Minute), + TimeRequestFinished: timeNow, }, } - r := NewTargetWriteResultWithTime(sent, nil, nil, nil, timeNow) + r := NewTargetWriteResult(sent, nil, nil, nil) b.AppendWrite(r) b.AppendWrite(nil) @@ -255,8 +257,8 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) { assert.Equal(time.Duration(0), b.MinFilterLatency) assert.Equal(time.Duration(0), b.GetAvgFilterLatency()) - assert.Equal(time.Duration(0)*time.Minute, b.MaxRequestLatency) - assert.Equal(time.Duration(0)*time.Minute, b.MinRequestLatency) + assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency) + assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency) - assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:0,MaxReqLatency:0,SumReqLatency:0", b.String()) + assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String()) } diff --git a/pkg/models/target_write_result.go b/pkg/models/target_write_result.go index 6e164737..bf296291 100644 --- a/pkg/models/target_write_result.go +++ b/pkg/models/target_write_result.go @@ -39,13 +39,13 @@ type TargetWriteResult struct { // and need to be specially handled. Invalid []*Message - // Delta between TimePulled and TimeOfWrite tells us how well the + // Delta between TimePulled and TimeRequestfinished tells us how well the // application is at processing data internally MaxProcLatency time.Duration MinProcLatency time.Duration AvgProcLatency time.Duration - // Delta between TimeCreated and TimeOfWrite tells us how far behind + // Delta between TimeCreated and TimeRequestfinished tells us how far behind // the application is on the stream it is consuming from MaxMsgLatency time.Duration MinMsgLatency time.Duration @@ -63,15 +63,10 @@ type TargetWriteResult struct { AvgRequestLatency time.Duration } -// NewTargetWriteResult uses the current time as the WriteTime and then calls NewTargetWriteResultWithTime -func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message) *TargetWriteResult { - return NewTargetWriteResultWithTime(sent, failed, oversized, invalid, time.Now().UTC()) -} - -// NewTargetWriteResultWithTime builds a result structure to return from a target write +// NewTargetWriteResult builds a result structure to return from a target write // attempt which contains the sent and failed message counts as well as several // derived latency measures. -func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message, timeOfWrite time.Time) *TargetWriteResult { +func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message) *TargetWriteResult { r := TargetWriteResult{ SentCount: int64(len(sent)), FailedCount: int64(len(failed)), @@ -91,7 +86,7 @@ func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized var sumRequestLatency time.Duration for _, msg := range processed { - procLatency := timeOfWrite.Sub(msg.TimePulled) + procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled) if r.MaxProcLatency < procLatency { r.MaxProcLatency = procLatency } @@ -100,7 +95,7 @@ func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized } sumProcLatency += procLatency - messageLatency := timeOfWrite.Sub(msg.TimeCreated) + messageLatency := msg.TimeRequestFinished.Sub(msg.TimeCreated) if r.MaxMsgLatency < messageLatency { r.MaxMsgLatency = messageLatency } diff --git a/pkg/models/target_write_result_test.go b/pkg/models/target_write_result_test.go index e0ded311..9ab26c19 100644 --- a/pkg/models/target_write_result_test.go +++ b/pkg/models/target_write_result_test.go @@ -46,7 +46,7 @@ func TestNewTargetWriteResult_EmptyWithoutTime(t *testing.T) { func TestNewTargetWriteResult_EmptyWithTime(t *testing.T) { assert := assert.New(t) - r := NewTargetWriteResultWithTime(nil, nil, nil, nil, time.Now().UTC()) + r := NewTargetWriteResult(nil, nil, nil, nil) assert.NotNil(r) assert.Equal(int64(0), r.SentCount) @@ -74,31 +74,34 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { sent := []*Message{ { - Data: []byte("Baz"), - PartitionKey: "partition1", - TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), - TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute), + Data: []byte("Baz"), + PartitionKey: "partition1", + TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute), + TimeRequestFinished: timeNow, }, { - Data: []byte("Bar"), - PartitionKey: "partition2", - TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), - TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), + Data: []byte("Bar"), + PartitionKey: "partition2", + TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), + TimeRequestFinished: timeNow, }, } failed := []*Message{ { - Data: []byte("Foo"), - PartitionKey: "partition3", - TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), - TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute), + Data: []byte("Foo"), + PartitionKey: "partition3", + TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute), + TimeRequestFinished: timeNow, }, } - r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow) + r := NewTargetWriteResult(sent, failed, nil, nil) assert.NotNil(r) assert.Equal(int64(2), r.SentCount) @@ -116,31 +119,34 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) { sent1 := []*Message{ { - Data: []byte("Baz"), - PartitionKey: "partition1", - TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-2) * time.Minute), - TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute), + Data: []byte("Baz"), + PartitionKey: "partition1", + TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-2) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute), + TimeRequestFinished: timeNow, }, } failed1 := []*Message{ { - Data: []byte("Bar"), - PartitionKey: "partition2", - TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), - TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), + Data: []byte("Bar"), + PartitionKey: "partition2", + TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute), + TimeRequestFinished: timeNow, }, { - Data: []byte("Foo"), - PartitionKey: "partition3", - TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-15) * time.Minute), - TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute), + Data: []byte("Foo"), + PartitionKey: "partition3", + TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-15) * time.Minute), + TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute), + TimeRequestFinished: timeNow, }, } - r1 := NewTargetWriteResultWithTime(sent1, failed1, nil, nil, timeNow) + r1 := NewTargetWriteResult(sent1, failed1, nil, nil) assert.NotNil(r) // Append a result @@ -176,28 +182,31 @@ func TestNewTargetWriteResult_NoTransformation(t *testing.T) { sent := []*Message{ { - Data: []byte("Baz"), - PartitionKey: "partition1", - TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), + Data: []byte("Baz"), + PartitionKey: "partition1", + TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), + TimeRequestFinished: timeNow, }, { - Data: []byte("Bar"), - PartitionKey: "partition2", - TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), + Data: []byte("Bar"), + PartitionKey: "partition2", + TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), + TimeRequestFinished: timeNow, }, } failed := []*Message{ { - Data: []byte("Foo"), - PartitionKey: "partition3", - TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), + Data: []byte("Foo"), + PartitionKey: "partition3", + TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), + TimeRequestFinished: timeNow, }, } - r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow) + r := NewTargetWriteResult(sent, failed, nil, nil) assert.NotNil(r) assert.Equal(int64(2), r.SentCount) diff --git a/pkg/observer/observer_test.go b/pkg/observer/observer_test.go index 2c543d3a..ccb705cd 100644 --- a/pkg/observer/observer_test.go +++ b/pkg/observer/observer_test.go @@ -63,27 +63,30 @@ func TestObserverTargetWrite(t *testing.T) { timeNow := time.Now().UTC() sent := []*models.Message{ { - Data: []byte("Baz"), - PartitionKey: "partition1", - TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), + Data: []byte("Baz"), + PartitionKey: "partition1", + TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-4) * time.Minute), + TimeRequestFinished: timeNow, }, { - Data: []byte("Bar"), - PartitionKey: "partition2", - TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), + Data: []byte("Bar"), + PartitionKey: "partition2", + TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-7) * time.Minute), + TimeRequestFinished: timeNow, }, } failed := []*models.Message{ { - Data: []byte("Foo"), - PartitionKey: "partition3", - TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), - TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), + Data: []byte("Foo"), + PartitionKey: "partition3", + TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute), + TimePulled: timeNow.Add(time.Duration(-10) * time.Minute), + TimeRequestFinished: timeNow, }, } - r := models.NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow) + r := models.NewTargetWriteResult(sent, failed, nil, nil) for i := 0; i < 5; i++ { observer.TargetWrite(r) observer.TargetWriteOversized(r) diff --git a/pkg/target/eventhub.go b/pkg/target/eventhub.go index c34ddf7b..2a0c830f 100644 --- a/pkg/target/eventhub.go +++ b/pkg/target/eventhub.go @@ -197,9 +197,9 @@ func (eht *EventHubTarget) process(messages []*models.Message) (*models.TargetWr defer cancel() batchIterator := eventhub.NewEventBatchIterator(ehBatch...) - requestStarted := time.Now() + requestStarted := time.Now().UTC() err := eht.client.SendBatch(ctx, batchIterator, eventhub.BatchWithMaxSizeInBytes(eht.batchByteLimit)) - requestFinished := time.Now() + requestFinished := time.Now().UTC() // Not clean but for a quick test release it's fine for _, msg := range messages { diff --git a/pkg/target/http.go b/pkg/target/http.go index 7738f163..c1f26c9c 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -237,9 +237,9 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword) } - requestStarted := time.Now() + requestStarted := time.Now().UTC() resp, err := ht.client.Do(request) // Make request - requestFinished := time.Now() + requestFinished := time.Now().UTC() msg.TimeRequestStarted = requestStarted msg.TimeRequestFinished = requestFinished diff --git a/pkg/target/kafka.go b/pkg/target/kafka.go index 4e1f60ab..f3c11b5c 100644 --- a/pkg/target/kafka.go +++ b/pkg/target/kafka.go @@ -247,13 +247,13 @@ func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteRes } } else if kt.syncProducer != nil { for _, msg := range safeMessages { - requestStarted := time.Now() + requestStarted := time.Now().UTC() _, _, err := kt.syncProducer.SendMessage(&sarama.ProducerMessage{ Topic: kt.topicName, Key: sarama.StringEncoder(msg.PartitionKey), Value: sarama.ByteEncoder(msg.Data), }) - requestFinished := time.Now() + requestFinished := time.Now().UTC() msg.TimeRequestStarted = requestStarted msg.TimeRequestFinished = requestFinished diff --git a/pkg/target/kinesis.go b/pkg/target/kinesis.go index 77b3bdbf..e3a99dcf 100644 --- a/pkg/target/kinesis.go +++ b/pkg/target/kinesis.go @@ -181,12 +181,12 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit } } - requestStarted := time.Now() + requestStarted := time.Now().UTC() res, err := kt.client.PutRecords(&kinesis.PutRecordsInput{ Records: entries, StreamName: aws.String(kt.streamName), }) - requestFinished := time.Now() + requestFinished := time.Now().UTC() // Assign timings // These will only get recorded in metrics once the messages are successful diff --git a/pkg/target/pubsub.go b/pkg/target/pubsub.go index c86e9da3..9727cc59 100644 --- a/pkg/target/pubsub.go +++ b/pkg/target/pubsub.go @@ -143,9 +143,9 @@ func (ps *PubSubTarget) Write(messages []*models.Message) (*models.TargetWriteRe pubSubMsg := &pubsub.Message{ Data: msg.Data, } - requestStarted := time.Now() + requestStarted := time.Now().UTC() r := ps.topic.Publish(ctx, pubSubMsg) - requestFinished := time.Now() + requestFinished := time.Now().UTC() msg.TimeRequestStarted = requestStarted msg.TimeRequestFinished = requestFinished diff --git a/pkg/target/sqs.go b/pkg/target/sqs.go index 320b2a15..d2dce3c9 100644 --- a/pkg/target/sqs.go +++ b/pkg/target/sqs.go @@ -168,12 +168,12 @@ func (st *SQSTarget) process(messages []*models.Message) (*models.TargetWriteRes lookup[msgID] = msg } - requestStarted := time.Now() + requestStarted := time.Now().UTC() res, err := st.client.SendMessageBatch(&sqs.SendMessageBatchInput{ Entries: entries, QueueUrl: aws.String(st.queueURL), }) - requestFinished := time.Now() + requestFinished := time.Now().UTC() for _, msg := range messages { msg.TimeRequestStarted = requestStarted diff --git a/pkg/target/stdout.go b/pkg/target/stdout.go index b950ce9f..b7db9422 100644 --- a/pkg/target/stdout.go +++ b/pkg/target/stdout.go @@ -14,6 +14,7 @@ package target import ( "errors" "fmt" + "time" log "github.com/sirupsen/logrus" @@ -74,7 +75,9 @@ func (st *StdoutTarget) Write(messages []*models.Message) (*models.TargetWriteRe var sent []*models.Message for _, msg := range safeMessages { + msg.TimeRequestStarted = time.Now().UTC() fmt.Println(msg.String()) + msg.TimeRequestFinished = time.Now().UTC() if msg.AckFunc != nil { msg.AckFunc()