Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize proto translation for merger #40

Merged
merged 13 commits into from
Jul 27, 2023
5 changes: 3 additions & 2 deletions aggregators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/apm-aggregation/aggregationpb"
"github.com/elastic/apm-aggregation/aggregators/internal/telemetry"
"github.com/elastic/apm-aggregation/aggregators/internal/timestamppb"
"github.com/elastic/apm-data/model/modelpb"
)

Expand Down Expand Up @@ -608,7 +609,7 @@ func (a *Aggregator) processHarvest(
if err := a.cfg.Processor(ctx, cmk, cm, aggIvl); err != nil {
return hs, fmt.Errorf("failed to process combined metrics ID %s: %w", cmk.ID, err)
}
hs.eventsTotal = cm.eventsTotal
hs.youngestEventTimestamp = cm.youngestEventTimestamp
hs.eventsTotal = cm.EventsTotal
hs.youngestEventTimestamp = timestamppb.PBTimestampToTime(cm.YoungestEventTimestamp)
return hs, nil
}
88 changes: 34 additions & 54 deletions aggregators/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ func TestAggregateBatch(t *testing.T) {
mp := metric.NewMeterProvider(metric.WithReader(gatherer))

cmID := EncodeToCombinedMetricsKeyID(t, "ab01")
txnDuration := 100 * time.Millisecond
eventDuration := 100 * time.Millisecond
dssDuration := 10 * time.Millisecond
uniqueEventCount := 100 // for each of txns and spans
uniqueServices := 10
repCount := 5
Expand All @@ -67,7 +68,7 @@ func TestAggregateBatch(t *testing.T) {
batch = append(batch, &modelpb.APMEvent{
Event: &modelpb.Event{
Outcome: "success",
Duration: durationpb.New(txnDuration),
Duration: durationpb.New(eventDuration),
Received: timestamppb.New(ts),
},
Transaction: &modelpb.Transaction{
Expand All @@ -80,7 +81,7 @@ func TestAggregateBatch(t *testing.T) {
Outcome: "success",
Duration: &modelpb.AggregatedDuration{
Count: 1,
Sum: durationpb.New(10 * time.Millisecond),
Sum: durationpb.New(dssDuration),
},
},
},
Expand All @@ -89,6 +90,7 @@ func TestAggregateBatch(t *testing.T) {
})
batch = append(batch, &modelpb.APMEvent{
Event: &modelpb.Event{
Duration: durationpb.New(eventDuration),
Received: timestamppb.New(ts),
},
Span: &modelpb.Span{
Expand Down Expand Up @@ -146,16 +148,15 @@ func TestAggregateBatch(t *testing.T) {
}
assert.NotNil(t, span)

expectedCombinedMetrics := CombinedMetrics{
Services: make(map[ServiceAggregationKey]ServiceMetrics),
eventsTotal: float64(len(batch)),
youngestEventTimestamp: ts,
}
expectedCombinedMetrics := NewTestCombinedMetrics(
WithEventsTotal(float64(len(batch))),
WithYoungestEventTimestamp(ts),
)
expectedMeasurements := []apmmodel.Metrics{
{
Samples: map[string]apmmodel.Metric{
"aggregator.requests.total": {Value: 1},
"aggregator.bytes.ingested": {Value: 133750},
"aggregator.bytes.ingested": {Value: 138250},
},
Labels: apmmodel.StringMap{
apmmodel.StringMapItem{Key: "id_key", Value: string(cmID[:])},
Expand Down Expand Up @@ -193,48 +194,27 @@ func TestAggregateBatch(t *testing.T) {
SpanName: fmt.Sprintf("bar%d", i%uniqueEventCount),
Resource: "test_dest",
}
if _, ok := expectedCombinedMetrics.Services[svcKey]; !ok {
expectedCombinedMetrics.Services[svcKey] = newServiceMetrics()
}
if _, ok := expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik]; !ok {
expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik] = newServiceInstanceMetrics()
}
var ok bool
var tm TransactionMetrics
if tm, ok = expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].TransactionGroups[txKey]; !ok {
tm = newTransactionMetrics()
}
tm.Histogram.RecordDuration(txnDuration, 1)
expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].TransactionGroups[txKey] = tm
var stm ServiceTransactionMetrics
if stm, ok = expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].ServiceTransactionGroups[stxKey]; !ok {
stm = newServiceTransactionMetrics()
}
stm.Histogram.RecordDuration(txnDuration, 1)
stm.SuccessCount++
expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].ServiceTransactionGroups[stxKey] = stm
sm := expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[spanKey]
sm.Count++
expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[spanKey] = sm

droppedSpanStatsKey := SpanAggregationKey{
dssKey := SpanAggregationKey{
SpanName: "",
Resource: fmt.Sprintf("dropped_dest_resource%d", i%uniqueEventCount),
Outcome: "success",
}
dssm := expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[droppedSpanStatsKey]
dssm.Count++
dssm.Sum += float64(10 * time.Millisecond)
expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[droppedSpanStatsKey] = dssm
expectedCombinedMetrics.
AddServiceMetrics(svcKey).
AddServiceInstanceMetrics(sik).
AddTransaction(txKey, WithTransactionDuration(eventDuration)).
AddServiceTransaction(stxKey, WithTransactionDuration(eventDuration)).
AddSpan(spanKey, WithSpanDuration(eventDuration)).
AddSpan(dssKey, WithSpanDuration(dssDuration))
}
assert.Empty(t, cmp.Diff(
expectedCombinedMetrics, cm,
expectedCombinedMetrics.Get(), cm,
cmpopts.EquateEmpty(),
cmpopts.EquateApprox(0, 0.01),
cmp.Comparer(func(a, b hdrhistogram.HybridCountsRep) bool {
return a.Equal(&b)
}),
cmp.AllowUnexported(CombinedMetrics{}),
protocmp.Transform(),
))
assert.Empty(t, cmp.Diff(
expectedMeasurements,
Expand Down Expand Up @@ -1116,20 +1096,20 @@ func BenchmarkAggregateCombinedMetrics(b *testing.B) {
ProcessingTime: time.Now().Truncate(aggIvl),
ID: EncodeToCombinedMetricsKeyID(b, "ab01"),
}
cm := (*CombinedMetrics)(createTestCombinedMetrics(withEventsTotal(1)).
addServiceTransaction(
time.Now(),
"test-svc",
"",
testServiceTransaction{txnType: "txntype", count: 1},
).
addTransaction(
time.Now(),
"test-svc",
"",
testTransaction{txnName: "txntest", txnType: "txntype", count: 1},
),
).ToProto()
cm := NewTestCombinedMetrics(WithEventsTotal(1)).
AddServiceMetrics(ServiceAggregationKey{
Timestamp: time.Now(),
ServiceName: "test-svc",
}).
AddServiceInstanceMetrics(ServiceInstanceAggregationKey{}).
AddTransaction(TransactionAggregationKey{
TransactionName: "txntest",
TransactionType: "txntype",
}).
AddServiceTransaction(ServiceTransactionAggregationKey{
TransactionType: "txntype",
}).
GetProto()
b.Cleanup(func() { cm.ReturnToVTPool() })
ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(func() { cancel() })
Expand Down
Loading