Optimize proto translation for merger (#40)
lahsivjar authored Jul 27, 2023 · 1 parent b373390 · commit 4884484
Showing 10 changed files with 1,830 additions and 1,167 deletions.
aggregators/aggregator.go (5 changes: 3 additions & 2 deletions)
@@ -23,6 +23,7 @@ import (
 
     "github.com/elastic/apm-aggregation/aggregationpb"
     "github.com/elastic/apm-aggregation/aggregators/internal/telemetry"
+    "github.com/elastic/apm-aggregation/aggregators/internal/timestamppb"
     "github.com/elastic/apm-data/model/modelpb"
 )

@@ -608,7 +609,7 @@ func (a *Aggregator) processHarvest(
     if err := a.cfg.Processor(ctx, cmk, cm, aggIvl); err != nil {
         return hs, fmt.Errorf("failed to process combined metrics ID %s: %w", cmk.ID, err)
     }
-    hs.eventsTotal = cm.eventsTotal
-    hs.youngestEventTimestamp = cm.youngestEventTimestamp
+    hs.eventsTotal = cm.EventsTotal
+    hs.youngestEventTimestamp = timestamppb.PBTimestampToTime(cm.YoungestEventTimestamp)
     return hs, nil
 }
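The rewritten processHarvest assignments above rely on the newly imported internal helper aggregators/internal/timestamppb, whose source is not part of this excerpt. The sketch below is only an assumption of what a PBTimestampToTime-style conversion could look like if the combined-metrics proto carries event timestamps as uint64 Unix nanoseconds; the names mirror the diff, but the representation is a guess, not the repository's confirmed code.

// Hypothetical sketch of the assumed timestamp helpers; the real package
// may use a different wire representation or extra handling.
package main

import (
    "fmt"
    "time"
)

// TimeToPBTimestamp encodes t as Unix nanoseconds, the assumed
// proto-friendly representation for event timestamps.
func TimeToPBTimestamp(t time.Time) uint64 {
    return uint64(t.UnixNano())
}

// PBTimestampToTime decodes Unix nanoseconds back into a time.Time,
// the direction used when populating hs.youngestEventTimestamp above.
func PBTimestampToTime(ts uint64) time.Time {
    return time.Unix(0, int64(ts))
}

func main() {
    now := time.Now()
    fmt.Println(PBTimestampToTime(TimeToPBTimestamp(now)).Equal(now)) // true
}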
aggregators/aggregator_test.go (88 changes: 34 additions & 54 deletions)
@@ -55,7 +55,8 @@ func TestAggregateBatch(t *testing.T) {
     mp := metric.NewMeterProvider(metric.WithReader(gatherer))
 
     cmID := EncodeToCombinedMetricsKeyID(t, "ab01")
-    txnDuration := 100 * time.Millisecond
+    eventDuration := 100 * time.Millisecond
+    dssDuration := 10 * time.Millisecond
     uniqueEventCount := 100 // for each of txns and spans
     uniqueServices := 10
     repCount := 5
@@ -67,7 +68,7 @@ func TestAggregateBatch(t *testing.T) {
         batch = append(batch, &modelpb.APMEvent{
             Event: &modelpb.Event{
                 Outcome:  "success",
-                Duration: durationpb.New(txnDuration),
+                Duration: durationpb.New(eventDuration),
                 Received: timestamppb.New(ts),
             },
             Transaction: &modelpb.Transaction{
@@ -80,7 +81,7 @@ func TestAggregateBatch(t *testing.T) {
                     Outcome: "success",
                     Duration: &modelpb.AggregatedDuration{
                         Count: 1,
-                        Sum:   durationpb.New(10 * time.Millisecond),
+                        Sum:   durationpb.New(dssDuration),
                     },
                 },
             },
@@ -89,6 +90,7 @@ func TestAggregateBatch(t *testing.T) {
         })
         batch = append(batch, &modelpb.APMEvent{
             Event: &modelpb.Event{
+                Duration: durationpb.New(eventDuration),
                 Received: timestamppb.New(ts),
             },
             Span: &modelpb.Span{
@@ -146,16 +148,15 @@ func TestAggregateBatch(t *testing.T) {
     }
     assert.NotNil(t, span)
 
-    expectedCombinedMetrics := CombinedMetrics{
-        Services:               make(map[ServiceAggregationKey]ServiceMetrics),
-        eventsTotal:            float64(len(batch)),
-        youngestEventTimestamp: ts,
-    }
+    expectedCombinedMetrics := NewTestCombinedMetrics(
+        WithEventsTotal(float64(len(batch))),
+        WithYoungestEventTimestamp(ts),
+    )
     expectedMeasurements := []apmmodel.Metrics{
         {
             Samples: map[string]apmmodel.Metric{
                 "aggregator.requests.total": {Value: 1},
-                "aggregator.bytes.ingested": {Value: 133750},
+                "aggregator.bytes.ingested": {Value: 138250},
             },
             Labels: apmmodel.StringMap{
                 apmmodel.StringMapItem{Key: "id_key", Value: string(cmID[:])},
@@ -193,48 +194,27 @@ func TestAggregateBatch(t *testing.T) {
             SpanName: fmt.Sprintf("bar%d", i%uniqueEventCount),
             Resource: "test_dest",
         }
-        if _, ok := expectedCombinedMetrics.Services[svcKey]; !ok {
-            expectedCombinedMetrics.Services[svcKey] = newServiceMetrics()
-        }
-        if _, ok := expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik]; !ok {
-            expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik] = newServiceInstanceMetrics()
-        }
-        var ok bool
-        var tm TransactionMetrics
-        if tm, ok = expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].TransactionGroups[txKey]; !ok {
-            tm = newTransactionMetrics()
-        }
-        tm.Histogram.RecordDuration(txnDuration, 1)
-        expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].TransactionGroups[txKey] = tm
-        var stm ServiceTransactionMetrics
-        if stm, ok = expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].ServiceTransactionGroups[stxKey]; !ok {
-            stm = newServiceTransactionMetrics()
-        }
-        stm.Histogram.RecordDuration(txnDuration, 1)
-        stm.SuccessCount++
-        expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].ServiceTransactionGroups[stxKey] = stm
-        sm := expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[spanKey]
-        sm.Count++
-        expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[spanKey] = sm
-
-        droppedSpanStatsKey := SpanAggregationKey{
+        dssKey := SpanAggregationKey{
             SpanName: "",
             Resource: fmt.Sprintf("dropped_dest_resource%d", i%uniqueEventCount),
             Outcome:  "success",
         }
-        dssm := expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[droppedSpanStatsKey]
-        dssm.Count++
-        dssm.Sum += float64(10 * time.Millisecond)
-        expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[droppedSpanStatsKey] = dssm
+        expectedCombinedMetrics.
+            AddServiceMetrics(svcKey).
+            AddServiceInstanceMetrics(sik).
+            AddTransaction(txKey, WithTransactionDuration(eventDuration)).
+            AddServiceTransaction(stxKey, WithTransactionDuration(eventDuration)).
+            AddSpan(spanKey, WithSpanDuration(eventDuration)).
+            AddSpan(dssKey, WithSpanDuration(dssDuration))
     }
     assert.Empty(t, cmp.Diff(
-        expectedCombinedMetrics, cm,
+        expectedCombinedMetrics.Get(), cm,
         cmpopts.EquateEmpty(),
         cmpopts.EquateApprox(0, 0.01),
         cmp.Comparer(func(a, b hdrhistogram.HybridCountsRep) bool {
            return a.Equal(&b)
        }),
-        cmp.AllowUnexported(CombinedMetrics{}),
+        protocmp.Transform(),
     ))
     assert.Empty(t, cmp.Diff(
         expectedMeasurements,
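The test hunks above replace hand-rolled nested-map bookkeeping with a fluent test builder (NewTestCombinedMetrics, AddServiceMetrics, AddTransaction, and friends) whose implementation lives in test utilities not shown in this excerpt. The self-contained sketch below only illustrates the general chained-builder pattern with made-up stand-in types and a simple event counter; it is not the repository's actual API or data model.

package main

import "fmt"

// Stand-in key types for illustration only; the real aggregation keys
// carry many more fields.
type serviceKey struct{ Name string }
type txnKey struct{ Name string }

// testCombinedMetrics accumulates expected event counts per service and
// transaction group.
type testCombinedMetrics struct {
    services map[serviceKey]map[txnKey]int
}

// serviceBuilder scopes subsequent Add calls to one service bucket.
type serviceBuilder struct {
    cm  *testCombinedMetrics
    svc serviceKey
}

// NewTestCombinedMetrics starts an empty expectation.
func NewTestCombinedMetrics() *testCombinedMetrics {
    return &testCombinedMetrics{services: map[serviceKey]map[txnKey]int{}}
}

// AddServiceMetrics lazily creates the per-service bucket and returns a
// builder so further calls can be chained.
func (cm *testCombinedMetrics) AddServiceMetrics(k serviceKey) *serviceBuilder {
    if _, ok := cm.services[k]; !ok {
        cm.services[k] = map[txnKey]int{}
    }
    return &serviceBuilder{cm: cm, svc: k}
}

// AddTransaction records one event for the transaction group and keeps
// the chain going.
func (b *serviceBuilder) AddTransaction(k txnKey) *serviceBuilder {
    b.cm.services[b.svc][k]++
    return b
}

// Get returns the accumulated expectation for use in assertions.
func (cm *testCombinedMetrics) Get() map[serviceKey]map[txnKey]int {
    return cm.services
}

func main() {
    expected := NewTestCombinedMetrics()
    expected.
        AddServiceMetrics(serviceKey{Name: "svc"}).
        AddTransaction(txnKey{Name: "tx1"}).
        AddTransaction(txnKey{Name: "tx1"})
    fmt.Println(expected.Get()) // map[{svc}:map[{tx1}:2]]
}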
@@ -1116,20 +1096,20 @@ func BenchmarkAggregateCombinedMetrics(b *testing.B) {
         ProcessingTime: time.Now().Truncate(aggIvl),
         ID:             EncodeToCombinedMetricsKeyID(b, "ab01"),
     }
-    cm := (*CombinedMetrics)(createTestCombinedMetrics(withEventsTotal(1)).
-        addServiceTransaction(
-            time.Now(),
-            "test-svc",
-            "",
-            testServiceTransaction{txnType: "txntype", count: 1},
-        ).
-        addTransaction(
-            time.Now(),
-            "test-svc",
-            "",
-            testTransaction{txnName: "txntest", txnType: "txntype", count: 1},
-        ),
-    ).ToProto()
+    cm := NewTestCombinedMetrics(WithEventsTotal(1)).
+        AddServiceMetrics(ServiceAggregationKey{
+            Timestamp:   time.Now(),
+            ServiceName: "test-svc",
+        }).
+        AddServiceInstanceMetrics(ServiceInstanceAggregationKey{}).
+        AddTransaction(TransactionAggregationKey{
+            TransactionName: "txntest",
+            TransactionType: "txntype",
+        }).
+        AddServiceTransaction(ServiceTransactionAggregationKey{
+            TransactionType: "txntype",
+        }).
+        GetProto()
     b.Cleanup(func() { cm.ReturnToVTPool() })
     ctx, cancel := context.WithCancel(context.Background())
     b.Cleanup(func() { cancel() })
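In the benchmark hunk above, the expectation is converted with GetProto() and handed back through cm.ReturnToVTPool() during cleanup, which suggests pooled, generated protobuf messages (vtprotobuf-style pooling) so repeated runs avoid re-allocating large messages. As a loose, assumption-based illustration of that reuse idea in plain Go, not the generated API itself:

package main

import (
    "fmt"
    "sync"
)

// metricsMsg stands in for a pooled protobuf message; generated code
// would provide its own pool-aware constructors and ReturnToVTPool-style
// methods instead of this hand-written pool.
type metricsMsg struct {
    EventsTotal float64
}

var msgPool = sync.Pool{New: func() any { return new(metricsMsg) }}

// getMsg fetches a reusable message from the pool.
func getMsg() *metricsMsg { return msgPool.Get().(*metricsMsg) }

// putMsg zeroes the message and returns it to the pool so later
// iterations do not allocate a fresh one.
func putMsg(m *metricsMsg) {
    *m = metricsMsg{}
    msgPool.Put(m)
}

func main() {
    m := getMsg()
    m.EventsTotal = 1
    fmt.Println(m.EventsTotal)
    putMsg(m) // analogous to the benchmark's cleanup call
}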
(The remaining 8 changed files were not loaded in this view.)
