Skip to content

Commit

Permalink
Use existing timestamp for request finish in measuring latency metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 4, 2024
1 parent 1e270ce commit b998b05
Show file tree
Hide file tree
Showing 11 changed files with 105 additions and 93 deletions.
24 changes: 13 additions & 11 deletions pkg/models/observer_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestObserverBuffer(t *testing.T) {
},
}

r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)
r := NewTargetWriteResult(sent, failed, nil, nil)

b.AppendWrite(r)
b.AppendWrite(r)
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestObserverBuffer_Basic(t *testing.T) {
},
}

r := NewTargetWriteResultWithTime(sent, nil, nil, nil, timeNow)
r := NewTargetWriteResult(sent, nil, nil, nil)

b.AppendWrite(r)
b.AppendWrite(nil)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestObserverBuffer_Basic(t *testing.T) {
assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String())
}

// TestObserverBuffer_Basic is a basic version of the above test, stripping away all but one event.
// TestObserverBuffer_BasicNoTransform is a basic version of the above test, stripping away all but one event.
// It exists purely to simplify reasoning through bugs.
func TestObserverBuffer_BasicNoTransform(t *testing.T) {
assert := assert.New(t)
Expand All @@ -206,14 +206,16 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) {

sent := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestStarted: timeNow.Add(time.Duration(-1) * time.Minute),
TimeRequestFinished: timeNow,
},
}

r := NewTargetWriteResultWithTime(sent, nil, nil, nil, timeNow)
r := NewTargetWriteResult(sent, nil, nil, nil)

b.AppendWrite(r)
b.AppendWrite(nil)
Expand Down Expand Up @@ -255,8 +257,8 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) {
assert.Equal(time.Duration(0), b.MinFilterLatency)
assert.Equal(time.Duration(0), b.GetAvgFilterLatency())

assert.Equal(time.Duration(0)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(0)*time.Minute, b.MinRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency)

assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:0,MaxReqLatency:0,SumReqLatency:0", b.String())
assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String())
}
17 changes: 6 additions & 11 deletions pkg/models/target_write_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ type TargetWriteResult struct {
// and need to be specially handled.
Invalid []*Message

// Delta between TimePulled and TimeOfWrite tells us how well the
// Delta between TimePulled and TimeRequestfinished tells us how well the
// application is at processing data internally
MaxProcLatency time.Duration
MinProcLatency time.Duration
AvgProcLatency time.Duration

// Delta between TimeCreated and TimeOfWrite tells us how far behind
// Delta between TimeCreated and TimeRequestfinished tells us how far behind
// the application is on the stream it is consuming from
MaxMsgLatency time.Duration
MinMsgLatency time.Duration
Expand All @@ -63,15 +63,10 @@ type TargetWriteResult struct {
AvgRequestLatency time.Duration
}

// NewTargetWriteResult uses the current time as the WriteTime and then calls NewTargetWriteResultWithTime
func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message) *TargetWriteResult {
return NewTargetWriteResultWithTime(sent, failed, oversized, invalid, time.Now().UTC())
}

// NewTargetWriteResultWithTime builds a result structure to return from a target write
// NewTargetWriteResult builds a result structure to return from a target write
// attempt which contains the sent and failed message counts as well as several
// derived latency measures.
func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message, timeOfWrite time.Time) *TargetWriteResult {
func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message) *TargetWriteResult {
r := TargetWriteResult{
SentCount: int64(len(sent)),
FailedCount: int64(len(failed)),
Expand All @@ -91,7 +86,7 @@ func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized
var sumRequestLatency time.Duration

for _, msg := range processed {
procLatency := timeOfWrite.Sub(msg.TimePulled)
procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled)
if r.MaxProcLatency < procLatency {
r.MaxProcLatency = procLatency
}
Expand All @@ -100,7 +95,7 @@ func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized
}
sumProcLatency += procLatency

messageLatency := timeOfWrite.Sub(msg.TimeCreated)
messageLatency := msg.TimeRequestFinished.Sub(msg.TimeCreated)
if r.MaxMsgLatency < messageLatency {
r.MaxMsgLatency = messageLatency
}
Expand Down
101 changes: 55 additions & 46 deletions pkg/models/target_write_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestNewTargetWriteResult_EmptyWithoutTime(t *testing.T) {
func TestNewTargetWriteResult_EmptyWithTime(t *testing.T) {
assert := assert.New(t)

r := NewTargetWriteResultWithTime(nil, nil, nil, nil, time.Now().UTC())
r := NewTargetWriteResult(nil, nil, nil, nil)
assert.NotNil(r)

assert.Equal(int64(0), r.SentCount)
Expand Down Expand Up @@ -74,31 +74,34 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {

sent := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
TimeRequestFinished: timeNow,
},
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestFinished: timeNow,
},
}
failed := []*Message{
{
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
TimeRequestFinished: timeNow,
},
}

r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)
r := NewTargetWriteResult(sent, failed, nil, nil)
assert.NotNil(r)

assert.Equal(int64(2), r.SentCount)
Expand All @@ -116,31 +119,34 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {

sent1 := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-2) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-2) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute),
TimeRequestFinished: timeNow,
},
}
failed1 := []*Message{
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestFinished: timeNow,
},
{
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-15) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute),
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-15) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute),
TimeRequestFinished: timeNow,
},
}

r1 := NewTargetWriteResultWithTime(sent1, failed1, nil, nil, timeNow)
r1 := NewTargetWriteResult(sent1, failed1, nil, nil)
assert.NotNil(r)

// Append a result
Expand Down Expand Up @@ -176,28 +182,31 @@ func TestNewTargetWriteResult_NoTransformation(t *testing.T) {

sent := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestFinished: timeNow,
},
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeRequestFinished: timeNow,
},
}
failed := []*Message{
{
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeRequestFinished: timeNow,
},
}

r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)
r := NewTargetWriteResult(sent, failed, nil, nil)
assert.NotNil(r)

assert.Equal(int64(2), r.SentCount)
Expand Down
29 changes: 16 additions & 13 deletions pkg/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,30 @@ func TestObserverTargetWrite(t *testing.T) {
timeNow := time.Now().UTC()
sent := []*models.Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestFinished: timeNow,
},
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeRequestFinished: timeNow,
},
}
failed := []*models.Message{
{
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeRequestFinished: timeNow,
},
}
r := models.NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)
r := models.NewTargetWriteResult(sent, failed, nil, nil)
for i := 0; i < 5; i++ {
observer.TargetWrite(r)
observer.TargetWriteOversized(r)
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ func (eht *EventHubTarget) process(messages []*models.Message) (*models.TargetWr
defer cancel()

batchIterator := eventhub.NewEventBatchIterator(ehBatch...)
requestStarted := time.Now()
requestStarted := time.Now().UTC()
err := eht.client.SendBatch(ctx, batchIterator, eventhub.BatchWithMaxSizeInBytes(eht.batchByteLimit))
requestFinished := time.Now()
requestFinished := time.Now().UTC()

// Not clean but for a quick test release it's fine
for _, msg := range messages {
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu
if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set
request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword)
}
requestStarted := time.Now()
requestStarted := time.Now().UTC()
resp, err := ht.client.Do(request) // Make request
requestFinished := time.Now()
requestFinished := time.Now().UTC()

msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,13 @@ func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteRes
}
} else if kt.syncProducer != nil {
for _, msg := range safeMessages {
requestStarted := time.Now()
requestStarted := time.Now().UTC()
_, _, err := kt.syncProducer.SendMessage(&sarama.ProducerMessage{
Topic: kt.topicName,
Key: sarama.StringEncoder(msg.PartitionKey),
Value: sarama.ByteEncoder(msg.Data),
})
requestFinished := time.Now()
requestFinished := time.Now().UTC()

msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit
}
}

requestStarted := time.Now()
requestStarted := time.Now().UTC()
res, err := kt.client.PutRecords(&kinesis.PutRecordsInput{
Records: entries,
StreamName: aws.String(kt.streamName),
})
requestFinished := time.Now()
requestFinished := time.Now().UTC()

// Assign timings
// These will only get recorded in metrics once the messages are successful
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func (ps *PubSubTarget) Write(messages []*models.Message) (*models.TargetWriteRe
pubSubMsg := &pubsub.Message{
Data: msg.Data,
}
requestStarted := time.Now()
requestStarted := time.Now().UTC()
r := ps.topic.Publish(ctx, pubSubMsg)
requestFinished := time.Now()
requestFinished := time.Now().UTC()

msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ func (st *SQSTarget) process(messages []*models.Message) (*models.TargetWriteRes
lookup[msgID] = msg
}

requestStarted := time.Now()
requestStarted := time.Now().UTC()
res, err := st.client.SendMessageBatch(&sqs.SendMessageBatchInput{
Entries: entries,
QueueUrl: aws.String(st.queueURL),
})
requestFinished := time.Now()
requestFinished := time.Now().UTC()

for _, msg := range messages {
msg.TimeRequestStarted = requestStarted
Expand Down
Loading

0 comments on commit b998b05

Please sign in to comment.