Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use existing timestamp for request finish in measuring latency metrics #364

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions pkg/models/observer_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestObserverBuffer(t *testing.T) {
},
}

r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)
r := NewTargetWriteResult(sent, failed, nil, nil)

b.AppendWrite(r)
b.AppendWrite(r)
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestObserverBuffer_Basic(t *testing.T) {
},
}

r := NewTargetWriteResultWithTime(sent, nil, nil, nil, timeNow)
r := NewTargetWriteResult(sent, nil, nil, nil)

b.AppendWrite(r)
b.AppendWrite(nil)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestObserverBuffer_Basic(t *testing.T) {
assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:120000,SumTransformLatency:120000,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String())
}

// TestObserverBuffer_Basic is a basic version of the above test, stripping away all but one event.
// TestObserverBuffer_BasicNoTransform is a basic version of the above test, stripping away all but one event.
// It exists purely to simplify reasoning through bugs.
func TestObserverBuffer_BasicNoTransform(t *testing.T) {
assert := assert.New(t)
Expand All @@ -206,14 +206,16 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) {

sent := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestStarted: timeNow.Add(time.Duration(-1) * time.Minute),
TimeRequestFinished: timeNow,
},
}

r := NewTargetWriteResultWithTime(sent, nil, nil, nil, timeNow)
r := NewTargetWriteResult(sent, nil, nil, nil)

b.AppendWrite(r)
b.AppendWrite(nil)
Expand Down Expand Up @@ -255,8 +257,8 @@ func TestObserverBuffer_BasicNoTransform(t *testing.T) {
assert.Equal(time.Duration(0), b.MinFilterLatency)
assert.Equal(time.Duration(0), b.GetAvgFilterLatency())

assert.Equal(time.Duration(0)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(0)*time.Minute, b.MinRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MaxRequestLatency)
assert.Equal(time.Duration(1)*time.Minute, b.MinRequestLatency)

assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:0,MaxReqLatency:0,SumReqLatency:0", b.String())
assert.Equal("TargetResults:1,MsgFiltered:0,MsgSent:1,MsgFailed:0,OversizedTargetResults:0,OversizedMsgSent:0,OversizedMsgFailed:0,InvalidTargetResults:0,InvalidMsgSent:0,InvalidMsgFailed:0,MaxProcLatency:240000,MaxMsgLatency:3000000,MaxFilterLatency:0,MaxTransformLatency:0,SumTransformLatency:0,SumProcLatency:240000,SumMsgLatency:3000000,MinReqLatency:60000,MaxReqLatency:60000,SumReqLatency:60000", b.String())
}
17 changes: 6 additions & 11 deletions pkg/models/target_write_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ type TargetWriteResult struct {
// and need to be specially handled.
Invalid []*Message

// Delta between TimePulled and TimeOfWrite tells us how well the
// Delta between TimePulled and TimeRequestfinished tells us how well the
// application is at processing data internally
MaxProcLatency time.Duration
MinProcLatency time.Duration
AvgProcLatency time.Duration

// Delta between TimeCreated and TimeOfWrite tells us how far behind
// Delta between TimeCreated and TimeRequestfinished tells us how far behind
// the application is on the stream it is consuming from
MaxMsgLatency time.Duration
MinMsgLatency time.Duration
Expand All @@ -63,15 +63,10 @@ type TargetWriteResult struct {
AvgRequestLatency time.Duration
}

// NewTargetWriteResult uses the current time as the WriteTime and then calls NewTargetWriteResultWithTime
func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message) *TargetWriteResult {
return NewTargetWriteResultWithTime(sent, failed, oversized, invalid, time.Now().UTC())
}

// NewTargetWriteResultWithTime builds a result structure to return from a target write
// NewTargetWriteResult builds a result structure to return from a target write
// attempt which contains the sent and failed message counts as well as several
// derived latency measures.
func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message, timeOfWrite time.Time) *TargetWriteResult {
func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Message, invalid []*Message) *TargetWriteResult {
r := TargetWriteResult{
SentCount: int64(len(sent)),
FailedCount: int64(len(failed)),
Expand All @@ -91,7 +86,7 @@ func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized
var sumRequestLatency time.Duration

for _, msg := range processed {
procLatency := timeOfWrite.Sub(msg.TimePulled)
procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled)
if r.MaxProcLatency < procLatency {
r.MaxProcLatency = procLatency
}
Expand All @@ -100,7 +95,7 @@ func NewTargetWriteResultWithTime(sent []*Message, failed []*Message, oversized
}
sumProcLatency += procLatency

messageLatency := timeOfWrite.Sub(msg.TimeCreated)
messageLatency := msg.TimeRequestFinished.Sub(msg.TimeCreated)
if r.MaxMsgLatency < messageLatency {
r.MaxMsgLatency = messageLatency
}
Expand Down
101 changes: 55 additions & 46 deletions pkg/models/target_write_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestNewTargetWriteResult_EmptyWithoutTime(t *testing.T) {
func TestNewTargetWriteResult_EmptyWithTime(t *testing.T) {
assert := assert.New(t)

r := NewTargetWriteResultWithTime(nil, nil, nil, nil, time.Now().UTC())
r := NewTargetWriteResult(nil, nil, nil, nil)
assert.NotNil(r)

assert.Equal(int64(0), r.SentCount)
Expand Down Expand Up @@ -74,31 +74,34 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {

sent := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-2) * time.Minute),
TimeRequestFinished: timeNow,
},
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestFinished: timeNow,
},
}
failed := []*Message{
{
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-9) * time.Minute),
TimeRequestFinished: timeNow,
},
}

r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)
r := NewTargetWriteResult(sent, failed, nil, nil)
assert.NotNil(r)

assert.Equal(int64(2), r.SentCount)
Expand All @@ -116,31 +119,34 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {

sent1 := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-2) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-2) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute),
TimeRequestFinished: timeNow,
},
}
failed1 := []*Message{
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestFinished: timeNow,
},
{
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-15) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute),
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-15) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute),
TimeRequestFinished: timeNow,
},
}

r1 := NewTargetWriteResultWithTime(sent1, failed1, nil, nil, timeNow)
r1 := NewTargetWriteResult(sent1, failed1, nil, nil)
assert.NotNil(r)

// Append a result
Expand Down Expand Up @@ -176,28 +182,31 @@ func TestNewTargetWriteResult_NoTransformation(t *testing.T) {

sent := []*Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestFinished: timeNow,
},
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeRequestFinished: timeNow,
},
}
failed := []*Message{
{
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeRequestFinished: timeNow,
},
}

r := NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)
r := NewTargetWriteResult(sent, failed, nil, nil)
assert.NotNil(r)

assert.Equal(int64(2), r.SentCount)
Expand Down
29 changes: 16 additions & 13 deletions pkg/observer/observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,30 @@ func TestObserverTargetWrite(t *testing.T) {
timeNow := time.Now().UTC()
sent := []*models.Message{
{
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
Data: []byte("Baz"),
PartitionKey: "partition1",
TimeCreated: timeNow.Add(time.Duration(-50) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-4) * time.Minute),
TimeRequestFinished: timeNow,
},
{
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
Data: []byte("Bar"),
PartitionKey: "partition2",
TimeCreated: timeNow.Add(time.Duration(-70) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeRequestFinished: timeNow,
},
}
failed := []*models.Message{
{
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
Data: []byte("Foo"),
PartitionKey: "partition3",
TimeCreated: timeNow.Add(time.Duration(-30) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-10) * time.Minute),
TimeRequestFinished: timeNow,
},
}
r := models.NewTargetWriteResultWithTime(sent, failed, nil, nil, timeNow)
r := models.NewTargetWriteResult(sent, failed, nil, nil)
for i := 0; i < 5; i++ {
observer.TargetWrite(r)
observer.TargetWriteOversized(r)
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ func (eht *EventHubTarget) process(messages []*models.Message) (*models.TargetWr
defer cancel()

batchIterator := eventhub.NewEventBatchIterator(ehBatch...)
requestStarted := time.Now()
requestStarted := time.Now().UTC()
err := eht.client.SendBatch(ctx, batchIterator, eventhub.BatchWithMaxSizeInBytes(eht.batchByteLimit))
requestFinished := time.Now()
requestFinished := time.Now().UTC()

// Not clean but for a quick test release it's fine
for _, msg := range messages {
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,9 @@ func (ht *HTTPTarget) Write(messages []*models.Message) (*models.TargetWriteResu
if ht.basicAuthUsername != "" && ht.basicAuthPassword != "" { // Add basic auth if set
request.SetBasicAuth(ht.basicAuthUsername, ht.basicAuthPassword)
}
requestStarted := time.Now()
requestStarted := time.Now().UTC()
resp, err := ht.client.Do(request) // Make request
requestFinished := time.Now()
requestFinished := time.Now().UTC()

msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,13 @@ func (kt *KafkaTarget) Write(messages []*models.Message) (*models.TargetWriteRes
}
} else if kt.syncProducer != nil {
for _, msg := range safeMessages {
requestStarted := time.Now()
requestStarted := time.Now().UTC()
_, _, err := kt.syncProducer.SendMessage(&sarama.ProducerMessage{
Topic: kt.topicName,
Key: sarama.StringEncoder(msg.PartitionKey),
Value: sarama.ByteEncoder(msg.Data),
})
requestFinished := time.Now()
requestFinished := time.Now().UTC()

msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ func (kt *KinesisTarget) process(messages []*models.Message) (*models.TargetWrit
}
}

requestStarted := time.Now()
requestStarted := time.Now().UTC()
res, err := kt.client.PutRecords(&kinesis.PutRecordsInput{
Records: entries,
StreamName: aws.String(kt.streamName),
})
requestFinished := time.Now()
requestFinished := time.Now().UTC()

// Assign timings
// These will only get recorded in metrics once the messages are successful
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func (ps *PubSubTarget) Write(messages []*models.Message) (*models.TargetWriteRe
pubSubMsg := &pubsub.Message{
Data: msg.Data,
}
requestStarted := time.Now()
requestStarted := time.Now().UTC()
r := ps.topic.Publish(ctx, pubSubMsg)
requestFinished := time.Now()
requestFinished := time.Now().UTC()

msg.TimeRequestStarted = requestStarted
msg.TimeRequestFinished = requestFinished
Expand Down
4 changes: 2 additions & 2 deletions pkg/target/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,12 @@ func (st *SQSTarget) process(messages []*models.Message) (*models.TargetWriteRes
lookup[msgID] = msg
}

requestStarted := time.Now()
requestStarted := time.Now().UTC()
res, err := st.client.SendMessageBatch(&sqs.SendMessageBatchInput{
Entries: entries,
QueueUrl: aws.String(st.queueURL),
})
requestFinished := time.Now()
requestFinished := time.Now().UTC()

for _, msg := range messages {
msg.TimeRequestStarted = requestStarted
Expand Down
Loading
Loading