From 200fa90c9f8edbaa78be0585e92065245c5de08d Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Wed, 12 Jul 2023 13:12:20 +0800 Subject: [PATCH] Refactor EventToCombinedMetrics to partition --- aggregators/aggregator.go | 85 +----------------- aggregators/aggregator_test.go | 26 +++--- aggregators/converter.go | 158 +++++++++++++++++++++++---------- aggregators/converter_test.go | 63 +++++++++++-- 4 files changed, 183 insertions(+), 149 deletions(-) diff --git a/aggregators/aggregator.go b/aggregators/aggregator.go index 54a9d76..a478b26 100644 --- a/aggregators/aggregator.go +++ b/aggregators/aggregator.go @@ -362,14 +362,14 @@ func (a *Aggregator) aggregateAPMEvent( ctx, span := a.cfg.Tracer.Start(ctx, "aggregateAPMEvent", trace.WithAttributes(traceAttrs...)) defer span.End() - cm, err := EventToCombinedMetrics(e, cmk.Interval) + kvs, err := EventToCombinedMetrics(e, cmk, a.cfg.Partitioner) if err != nil { span.RecordError(err) return 0, fmt.Errorf("failed to convert event to combined metrics: %w", err) } var totalBytesIn int - for _, ckv := range a.partition(cmk, cm) { - bytesIn, err := a.aggregate(ctx, ckv.Key, ckv.Value) + for _, kv := range kvs { + bytesIn, err := a.aggregate(ctx, kv.Key, kv.Value) totalBytesIn += bytesIn if err != nil { span.RecordError(err) @@ -380,85 +380,6 @@ func (a *Aggregator) aggregateAPMEvent( return totalBytesIn, nil } -// partition will partition a combined metrics into smaller partitions based -// on the partitioning logic. It will ignore overflows as the partition will -// be executed on a single APMEvent with no possibility of overflows. -func (a *Aggregator) partition( - unpartitionedKey CombinedMetricsKey, - cm CombinedMetrics, -) []CombinedKV { - var ckvs []CombinedKV - for svcKey, svc := range cm.Services { - hasher := Hasher{}.Chain(svcKey) - for svcInsKey, svcIns := range svc.ServiceInstanceGroups { - svcInsHasher := hasher.Chain(svcInsKey) - for svcTxnKey, svcTxn := range svcIns.ServiceTransactionGroups { - ck := unpartitionedKey - ck.PartitionID = a.cfg.Partitioner.Partition(svcInsHasher.Chain(svcTxnKey).Sum()) - cv := CombinedMetrics{ - Services: map[ServiceAggregationKey]ServiceMetrics{ - svcKey: ServiceMetrics{ - ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{ - svcInsKey: ServiceInstanceMetrics{ - ServiceTransactionGroups: map[ServiceTransactionAggregationKey]ServiceTransactionMetrics{ - svcTxnKey: svcTxn, - }, - }, - }, - }, - }, - } - ckvs = append(ckvs, CombinedKV{Key: ck, Value: cv}) - } - for txnKey, txn := range svcIns.TransactionGroups { - ck := unpartitionedKey - ck.PartitionID = a.cfg.Partitioner.Partition(svcInsHasher.Chain(txnKey).Sum()) - cv := CombinedMetrics{ - Services: map[ServiceAggregationKey]ServiceMetrics{ - svcKey: ServiceMetrics{ - ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{ - svcInsKey: ServiceInstanceMetrics{ - TransactionGroups: map[TransactionAggregationKey]TransactionMetrics{ - txnKey: txn, - }, - }, - }, - }, - }, - } - ckvs = append(ckvs, CombinedKV{Key: ck, Value: cv}) - } - for spanKey, span := range svcIns.SpanGroups { - ck := unpartitionedKey - ck.PartitionID = a.cfg.Partitioner.Partition(svcInsHasher.Chain(spanKey).Sum()) - cv := CombinedMetrics{ - Services: map[ServiceAggregationKey]ServiceMetrics{ - svcKey: ServiceMetrics{ - ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{ - svcInsKey: ServiceInstanceMetrics{ - SpanGroups: map[SpanAggregationKey]SpanMetrics{ - spanKey: span, - }, - }, - }, - }, - }, - } - ckvs = append(ckvs, CombinedKV{Key: ck, Value: cv}) - } - } - } - // Approximate events total by uniformly distributing the events total - // amongst the partitioned key values. - weightedEventsTotal := cm.eventsTotal / float64(len(ckvs)) - for i := range ckvs { - cv := &ckvs[i].Value - cv.eventsTotal = weightedEventsTotal - cv.youngestEventTimestamp = cm.youngestEventTimestamp - } - return ckvs -} - // aggregate aggregates combined metrics for a given key and returns // number of bytes ingested along with the error, if any. func (a *Aggregator) aggregate( diff --git a/aggregators/aggregator_test.go b/aggregators/aggregator_test.go index d350d82..2ed3117 100644 --- a/aggregators/aggregator_test.go +++ b/aggregators/aggregator_test.go @@ -1117,7 +1117,12 @@ func BenchmarkAggregateCombinedMetrics(b *testing.B) { b.Cleanup(func() { agg.Stop(context.Background()) }) - cm, err := EventToCombinedMetrics( + cmk := CombinedMetricsKey{ + Interval: aggIvl, + ProcessingTime: time.Now().Truncate(aggIvl), + ID: "testid", + } + kvs, err := EventToCombinedMetrics( &modelpb.APMEvent{ Processor: modelpb.TransactionProcessor(), Event: &modelpb.Event{Duration: durationpb.New(time.Millisecond)}, @@ -1126,23 +1131,20 @@ func BenchmarkAggregateCombinedMetrics(b *testing.B) { RepresentativeCount: 1, }, }, - aggIvl, + cmk, NewHashPartitioner(1), ) if err != nil { b.Fatal(err) } b.ResetTimer() for i := 0; i < b.N; i++ { - if err := agg.AggregateCombinedMetrics( - context.Background(), - CombinedMetricsKey{ - Interval: aggIvl, - ProcessingTime: time.Now().Truncate(aggIvl), - ID: "testid", - }, - cm, - ); err != nil { - b.Fatal(err) + for _, kv := range kvs { + if err := agg.AggregateCombinedMetrics( + context.Background(), + kv.Key, kv.Value, + ); err != nil { + b.Fatal(err) + } } } } diff --git a/aggregators/converter.go b/aggregators/converter.go index 2561bc2..9a10abd 100644 --- a/aggregators/converter.go +++ b/aggregators/converter.go @@ -35,86 +35,148 @@ func setMetricCountBasedOnOutcome(stm *ServiceTransactionMetrics, from *modelpb. } } -// EventToCombinedMetrics converts APMEvent to CombinedMetrics. +// EventToCombinedMetrics converts APMEvent to CombinedMetrics. Metrics are +// partitioned into smaller partitions based on the partitioning logic. It +// will ignore overflows as the partition will be executed on a single +// APMEvent with no possibility of overflows. func EventToCombinedMetrics( e *modelpb.APMEvent, - aggInterval time.Duration, -) (CombinedMetrics, error) { + unpartitionedKey CombinedMetricsKey, + partitioner Partitioner, +) ([]CombinedKV, error) { var ( - cm CombinedMetrics - sim ServiceInstanceMetrics + kvs []CombinedKV + gl GlobalLabels ) + gl.fromLabelsAndNumericLabels(e.Labels, e.NumericLabels) + gls, err := gl.MarshalString() + if err != nil { + return nil, err + } - cm.eventsTotal = 1 // combined metrics is representing a single APM event - cm.youngestEventTimestamp = e.GetEvent().GetReceived().AsTime() + svcKey := serviceKey(e, unpartitionedKey.Interval) + svcInstanceKey := ServiceInstanceAggregationKey{GlobalLabelsStr: gls} + hasher := Hasher{}.Chain(svcKey).Chain(svcInstanceKey) processor := e.GetProcessor() switch { case processor.IsTransaction(): repCount := e.GetTransaction().GetRepresentativeCount() if repCount <= 0 { - return cm, nil + return nil, nil } tm := newTransactionMetrics() - stm := newServiceTransactionMetrics() tm.Histogram.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount) - stm.Histogram.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount) + txnKey := transactionKey(e) + cmk := unpartitionedKey + cmk.PartitionID = partitioner.Partition(hasher.Chain(txnKey).Sum()) + kvs = append(kvs, CombinedKV{ + Key: cmk, + Value: CombinedMetrics{ + Services: map[ServiceAggregationKey]ServiceMetrics{ + svcKey: ServiceMetrics{ + ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{ + svcInstanceKey: ServiceInstanceMetrics{ + TransactionGroups: map[TransactionAggregationKey]TransactionMetrics{txnKey: tm}, + }, + }, + }, + }, + }, + }) + stm := newServiceTransactionMetrics() + stm.Histogram.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount) setMetricCountBasedOnOutcome(&stm, e) - sim.TransactionGroups = map[TransactionAggregationKey]TransactionMetrics{ - transactionKey(e): tm, - } - sim.ServiceTransactionGroups = map[ServiceTransactionAggregationKey]ServiceTransactionMetrics{ - serviceTransactionKey(e): stm, - } + svcTxnKey := serviceTransactionKey(e) + cmk.PartitionID = partitioner.Partition(hasher.Chain(svcTxnKey).Sum()) + kvs = append(kvs, CombinedKV{ + Key: cmk, + Value: CombinedMetrics{ + Services: map[ServiceAggregationKey]ServiceMetrics{ + svcKey: ServiceMetrics{ + ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{ + svcInstanceKey: ServiceInstanceMetrics{ + ServiceTransactionGroups: map[ServiceTransactionAggregationKey]ServiceTransactionMetrics{svcTxnKey: stm}, + }, + }, + }, + }, + }, + }) + // Handle dropped span stats - dss := e.GetTransaction().GetDroppedSpansStats() - var spanGroups map[SpanAggregationKey]SpanMetrics - if len(dss) > 0 { - spanGroups = make(map[SpanAggregationKey]SpanMetrics, len(dss)) - for _, ds := range dss { - spanGroups[droppedSpanStatsKey(ds)] = SpanMetrics{ - Count: float64(ds.GetDuration().GetCount()) * repCount, - Sum: float64(ds.GetDuration().GetSum().AsDuration()) * repCount, - } - } - sim.SpanGroups = spanGroups + for _, dss := range e.GetTransaction().GetDroppedSpansStats() { + dssKey := droppedSpanStatsKey(dss) + cmk.PartitionID = partitioner.Partition(hasher.Chain(dssKey).Sum()) + kvs = append(kvs, CombinedKV{ + Key: cmk, + Value: CombinedMetrics{ + Services: map[ServiceAggregationKey]ServiceMetrics{ + svcKey: ServiceMetrics{ + ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{ + svcInstanceKey: ServiceInstanceMetrics{ + SpanGroups: map[SpanAggregationKey]SpanMetrics{ + dssKey: SpanMetrics{ + Count: float64(dss.GetDuration().GetCount()) * repCount, + Sum: float64(dss.GetDuration().GetSum().AsDuration()) * repCount, + }, + }, + }, + }, + }, + }, + }, + }) } case processor.IsSpan(): target := e.GetService().GetTarget() repCount := e.GetSpan().GetRepresentativeCount() destSvc := e.GetSpan().GetDestinationService().GetResource() if repCount <= 0 || (target == nil && destSvc == "") { - return cm, nil + return nil, nil } - var count uint32 - count = 1 + + var count uint32 = 1 duration := e.GetEvent().GetDuration().AsDuration() - composite := e.GetSpan().GetComposite() - if composite != nil { + if composite := e.GetSpan().GetComposite(); composite != nil { count = composite.GetCount() duration = time.Duration(composite.GetSum() * float64(time.Millisecond)) } - sim.SpanGroups = map[SpanAggregationKey]SpanMetrics{ - spanKey(e): SpanMetrics{ - Count: float64(count) * repCount, - Sum: float64(duration) * repCount, + spanKey := spanKey(e) + cmk := unpartitionedKey + cmk.PartitionID = partitioner.Partition(hasher.Chain(spanKey).Sum()) + kvs = append(kvs, CombinedKV{ + Key: cmk, + Value: CombinedMetrics{ + Services: map[ServiceAggregationKey]ServiceMetrics{ + svcKey: ServiceMetrics{ + ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{ + svcInstanceKey: ServiceInstanceMetrics{ + SpanGroups: map[SpanAggregationKey]SpanMetrics{ + spanKey: SpanMetrics{ + Count: float64(count) * repCount, + Sum: float64(duration) * repCount, + }, + }, + }, + }, + }, + }, }, - } + }) } - var gl GlobalLabels - gl.fromLabelsAndNumericLabels(e.GetLabels(), e.GetNumericLabels()) - gls, err := gl.MarshalString() - if err != nil { - return CombinedMetrics{}, err - } - sm := newServiceMetrics() - sm.ServiceInstanceGroups[ServiceInstanceAggregationKey{GlobalLabelsStr: gls}] = sim - cm.Services = map[ServiceAggregationKey]ServiceMetrics{ - serviceKey(e, aggInterval): sm, + // Approximate events total by uniformly distributing the events total + // amongst the partitioned key values. + weightedEventsTotal := 1 / float64(len(kvs)) + eventTs := e.GetEvent().GetReceived().AsTime() + for i := range kvs { + cv := &kvs[i].Value + cv.eventsTotal = weightedEventsTotal + cv.youngestEventTimestamp = eventTs } - return cm, nil + return kvs, nil } // CombinedMetricsToBatch converts CombinedMetrics to a batch of APMEvents. diff --git a/aggregators/converter_test.go b/aggregators/converter_test.go index a704225..0f484c6 100644 --- a/aggregators/converter_test.go +++ b/aggregators/converter_test.go @@ -44,15 +44,58 @@ func TestEventToCombinedMetrics(t *testing.T) { Type: "testtyp", }, } - cm, err := EventToCombinedMetrics(event, time.Minute) + cmk := CombinedMetricsKey{ + Interval: time.Minute, + ProcessingTime: time.Now().Truncate(time.Minute), + ID: "test-id", + } + kvs, err := EventToCombinedMetrics(event, cmk, NewHashPartitioner(1)) require.NoError(t, err) - expected := CombinedMetrics( - *createTestCombinedMetrics(withEventsTotal(1), withYoungestEventTimestamp(receivedTS)). - addTransaction(ts.Truncate(time.Minute), event.Service.Name, "", testTransaction{txnName: event.Transaction.Name, txnType: event.Transaction.Type, eventOutcome: event.Event.Outcome, count: 1}). - addServiceTransaction(ts.Truncate(time.Minute), event.Service.Name, "", testServiceTransaction{txnType: event.Transaction.Type, count: 1}), + var expected []CombinedKV + createTestCombinedMetrics( + withEventsTotal(0.5), // one for txn and one for svc txn + withYoungestEventTimestamp(receivedTS), + ) + expected = append(expected, + CombinedKV{ + Key: cmk, + Value: CombinedMetrics( + *createTestCombinedMetrics( + withEventsTotal(0.5), // one for txn and one for svc txn + withYoungestEventTimestamp(receivedTS), + ).addTransaction( + ts.Truncate(time.Minute), + event.Service.Name, + "", + testTransaction{ + txnName: event.Transaction.Name, + txnType: event.Transaction.Type, + eventOutcome: event.Event.Outcome, + count: 1, + }, + ), + ), + }, + CombinedKV{ + Key: cmk, + Value: CombinedMetrics( + *createTestCombinedMetrics( + withEventsTotal(0.5), // one for txn and one for svc txn + withYoungestEventTimestamp(receivedTS), + ).addServiceTransaction( + ts.Truncate(time.Minute), + event.Service.Name, + "", + testServiceTransaction{ + txnType: event.Transaction.Type, + count: 1, + }, + ), + ), + }, ) assert.Empty(t, cmp.Diff( - expected, cm, + expected, kvs, cmpopts.EquateEmpty(), cmp.Comparer(func(a, b hdrhistogram.HybridCountsRep) bool { return a.Equal(&b) @@ -229,9 +272,15 @@ func BenchmarkEventToCombinedMetrics(b *testing.B) { Type: "testtyp", }, } + cmk := CombinedMetricsKey{ + Interval: time.Minute, + ProcessingTime: time.Now().Truncate(time.Minute), + ID: "testid", + } + partitioner := NewHashPartitioner(1) b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := EventToCombinedMetrics(event, time.Minute) + _, err := EventToCombinedMetrics(event, cmk, partitioner) if err != nil { b.Fatal(err) }