Skip to content

Commit

Permalink
More tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Nov 12, 2024
1 parent 4c96a40 commit 56e45fa
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 21 deletions.
2 changes: 1 addition & 1 deletion assets/docs/configuration/metrics/e2e-latency-example.hcl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
metrics {
# Optional toggle for E2E latency (difference between Snowplow collector timestamp and target write timestamp)
enable_e2e_latency = true
}
}
34 changes: 23 additions & 11 deletions pkg/models/target_write_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
var sumMessageLatency time.Duration
var sumTransformLatency time.Duration
var sumRequestLatency time.Duration
var sumE2eLatency time.Duration
var sumE2ELatency time.Duration

// Some of processed messages may have NO attached collector tstamp (non-Snowplow data), so we have separate counter.
// It's used to calculate avarage E2E latency
var withCollectorTstampCount int64

for _, msg := range processed {
procLatency := msg.TimeRequestFinished.Sub(msg.TimePulled)
Expand Down Expand Up @@ -131,24 +135,26 @@ func NewTargetWriteResult(sent []*Message, failed []*Message, oversized []*Messa
sumRequestLatency += requestLatency

var e2eLatency time.Duration
// Calculate E2E only when there is collector tstamp present
if !msg.CollectorTstamp.IsZero() {
e2eLatency = msg.TimeRequestFinished.Sub(msg.CollectorTstamp)
sumE2ELatency += e2eLatency
withCollectorTstampCount += 1

Check failure on line 142 in pkg/models/target_write_result.go

View workflow job for this annotation

GitHub Actions / Compile & Test (1.23, ubuntu-latest)

should replace withCollectorTstampCount += 1 with withCollectorTstampCount++
if r.MaxE2ELatency < e2eLatency {
r.MaxE2ELatency = e2eLatency
}
if r.MinE2ELatency > e2eLatency || r.MinE2ELatency == time.Duration(0) {
r.MinE2ELatency = e2eLatency
}
}
if r.MaxE2ELatency < e2eLatency {
r.MaxE2ELatency = e2eLatency
}
if r.MinE2ELatency > e2eLatency || r.MinE2ELatency == time.Duration(0) {
r.MinE2ELatency = e2eLatency
}
sumE2eLatency += e2eLatency
}

if processedLen > 0 {
r.AvgProcLatency = common.GetAverageFromDuration(sumProcLatency, processedLen)
r.AvgMsgLatency = common.GetAverageFromDuration(sumMessageLatency, processedLen)
r.AvgTransformLatency = common.GetAverageFromDuration(sumTransformLatency, processedLen)
r.AvgRequestLatency = common.GetAverageFromDuration(sumRequestLatency, processedLen)
r.AvgE2ELatency = common.GetAverageFromDuration(sumE2eLatency, processedLen)
r.AvgE2ELatency = common.GetAverageFromDuration(sumE2ELatency, withCollectorTstampCount)
}

return &r
Expand Down Expand Up @@ -208,10 +214,16 @@ func (wr *TargetWriteResult) Append(nwr *TargetWriteResult) *TargetWriteResult {
if wrC.MaxE2ELatency < nwr.MaxE2ELatency {
wrC.MaxE2ELatency = nwr.MaxE2ELatency
}
if wrC.MinE2ELatency > nwr.MinE2ELatency || wrC.MinE2ELatency == time.Duration(0) {
if (wrC.MinE2ELatency > nwr.MinE2ELatency || wrC.MinE2ELatency == time.Duration(0)) && nwr.MinE2ELatency != time.Duration(0) {
wrC.MinE2ELatency = nwr.MinE2ELatency
}
wrC.AvgE2ELatency = common.GetAverageFromDuration(wrC.AvgE2ELatency+nwr.AvgE2ELatency, 2)

if wrC.AvgE2ELatency == time.Duration(0) {
wrC.AvgE2ELatency = nwr.AvgE2ELatency
} else if nwr.AvgE2ELatency != time.Duration(0) {
// Only calcuate new average when both wrC and nwr are non-zero
wrC.AvgE2ELatency = common.GetAverageFromDuration(wrC.AvgE2ELatency+nwr.AvgE2ELatency, 2)
}
}

return &wrC
Expand Down
9 changes: 3 additions & 6 deletions pkg/models/target_write_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Baz"),
PartitionKey: "partition1",
CollectorTstamp: timeNow.Add(time.Duration(-60) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-55) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-2) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-1) * time.Minute),
Expand All @@ -118,7 +117,6 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Bar"),
PartitionKey: "partition2",
CollectorTstamp: timeNow.Add(time.Duration(-120) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-75) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-7) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-4) * time.Minute),
Expand All @@ -127,7 +125,6 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
{
Data: []byte("Foo"),
PartitionKey: "partition3",
CollectorTstamp: timeNow.Add(time.Duration(-30) * time.Minute),
TimeCreated: timeNow.Add(time.Duration(-25) * time.Minute),
TimePulled: timeNow.Add(time.Duration(-15) * time.Minute),
TimeTransformed: timeNow.Add(time.Duration(-7) * time.Minute),
Expand Down Expand Up @@ -161,9 +158,9 @@ func TestNewTargetWriteResult_WithMessages(t *testing.T) {
assert.Equal(time.Duration(8)*time.Minute, r3.MaxTransformLatency)
assert.Equal(time.Duration(1)*time.Minute, r3.MinTransformLatency)
assert.Equal(time.Duration(3)*time.Minute, r3.AvgTransformLatency)
assert.Equal(time.Duration(120)*time.Minute, r3.MaxE2ELatency)
assert.Equal(time.Duration(30)*time.Minute, r3.MinE2ELatency)
assert.Equal(time.Duration(65)*time.Minute, r3.AvgE2ELatency)
assert.Equal(time.Duration(80)*time.Minute, r3.MaxE2ELatency)
assert.Equal(time.Duration(40)*time.Minute, r3.MinE2ELatency)
assert.Equal(time.Duration(60)*time.Minute, r3.AvgE2ELatency)
}

// TestNewTargetWriteResult_NoTransformation tests that reporting of statistics is as it should be when we don't have a timeTransformed and no collector timestamp
Expand Down
8 changes: 5 additions & 3 deletions pkg/transform/snowplow_collector_tstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
"github.com/snowplow/snowbridge/pkg/models"
)

// CollectorTstampTransformation returns a transformation function
// CollectorTstampTransformation returns a transformation function attaching collector timestamp to the input message
// This transformation is not like other configurable transformations - it's enabled/disabled based on top-level metric configuration toggle (`metrics.enable_e2e_latency`)
// It doesn't produce invalid data in case of errors - it logs a warning and proceeds with input data as nothing happened.
func CollectorTstampTransformation() TransformationFunction {
return func(message *models.Message, interState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
parsedEvent, err := IntermediateAsSpEnrichedParsed(interState, message)
if err != nil {
log.Warnf("Error while extracting 'collector_tstamp'. Could not parse input message as Snowplow event, error: %s", err)
log.Warnf("Error while extracting 'collector_tstamp': %s", err)
return message, nil, nil, nil
}

tstamp, err := parsedEvent.GetValue("collector_tstamp")
if err != nil {
log.Warnf("Error while extracting 'collector_tstamp', error: %s", err)
log.Warnf("Error while extracting 'collector_tstamp': %s", err)
return message, nil, nil, parsedEvent
}

Expand Down

0 comments on commit 56e45fa

Please sign in to comment.