Refactor EventToCombinedMetrics to partition
lahsivjar committed Jul 12, 2023
1 parent 490c9a0 commit 200fa90
Showing 4 changed files with 183 additions and 149 deletions.
aggregators/aggregator.go (85 changes: 3 additions & 82 deletions)
@@ -362,14 +362,14 @@ func (a *Aggregator) aggregateAPMEvent(
ctx, span := a.cfg.Tracer.Start(ctx, "aggregateAPMEvent", trace.WithAttributes(traceAttrs...))
defer span.End()

- cm, err := EventToCombinedMetrics(e, cmk.Interval)
+ kvs, err := EventToCombinedMetrics(e, cmk, a.cfg.Partitioner)
if err != nil {
span.RecordError(err)
return 0, fmt.Errorf("failed to convert event to combined metrics: %w", err)
}
var totalBytesIn int
- for _, ckv := range a.partition(cmk, cm) {
- bytesIn, err := a.aggregate(ctx, ckv.Key, ckv.Value)
+ for _, kv := range kvs {
+ bytesIn, err := a.aggregate(ctx, kv.Key, kv.Value)
totalBytesIn += bytesIn
if err != nil {
span.RecordError(err)
@@ -380,85 +380,6 @@ func (a *Aggregator) aggregateAPMEvent(
return totalBytesIn, nil
}

- // partition will partition a combined metrics into smaller partitions based
- // on the partitioning logic. It will ignore overflows as the partition will
- // be executed on a single APMEvent with no possibility of overflows.
- func (a *Aggregator) partition(
- unpartitionedKey CombinedMetricsKey,
- cm CombinedMetrics,
- ) []CombinedKV {
- var ckvs []CombinedKV
- for svcKey, svc := range cm.Services {
- hasher := Hasher{}.Chain(svcKey)
- for svcInsKey, svcIns := range svc.ServiceInstanceGroups {
- svcInsHasher := hasher.Chain(svcInsKey)
- for svcTxnKey, svcTxn := range svcIns.ServiceTransactionGroups {
- ck := unpartitionedKey
- ck.PartitionID = a.cfg.Partitioner.Partition(svcInsHasher.Chain(svcTxnKey).Sum())
- cv := CombinedMetrics{
- Services: map[ServiceAggregationKey]ServiceMetrics{
- svcKey: ServiceMetrics{
- ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{
- svcInsKey: ServiceInstanceMetrics{
- ServiceTransactionGroups: map[ServiceTransactionAggregationKey]ServiceTransactionMetrics{
- svcTxnKey: svcTxn,
- },
- },
- },
- },
- },
- }
- ckvs = append(ckvs, CombinedKV{Key: ck, Value: cv})
- }
- for txnKey, txn := range svcIns.TransactionGroups {
- ck := unpartitionedKey
- ck.PartitionID = a.cfg.Partitioner.Partition(svcInsHasher.Chain(txnKey).Sum())
- cv := CombinedMetrics{
- Services: map[ServiceAggregationKey]ServiceMetrics{
- svcKey: ServiceMetrics{
- ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{
- svcInsKey: ServiceInstanceMetrics{
- TransactionGroups: map[TransactionAggregationKey]TransactionMetrics{
- txnKey: txn,
- },
- },
- },
- },
- },
- }
- ckvs = append(ckvs, CombinedKV{Key: ck, Value: cv})
- }
- for spanKey, span := range svcIns.SpanGroups {
- ck := unpartitionedKey
- ck.PartitionID = a.cfg.Partitioner.Partition(svcInsHasher.Chain(spanKey).Sum())
- cv := CombinedMetrics{
- Services: map[ServiceAggregationKey]ServiceMetrics{
- svcKey: ServiceMetrics{
- ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{
- svcInsKey: ServiceInstanceMetrics{
- SpanGroups: map[SpanAggregationKey]SpanMetrics{
- spanKey: span,
- },
- },
- },
- },
- },
- }
- ckvs = append(ckvs, CombinedKV{Key: ck, Value: cv})
- }
- }
- }
- // Approximate events total by uniformly distributing the events total
- // amongst the partitioned key values.
- weightedEventsTotal := cm.eventsTotal / float64(len(ckvs))
- for i := range ckvs {
- cv := &ckvs[i].Value
- cv.eventsTotal = weightedEventsTotal
- cv.youngestEventTimestamp = cm.youngestEventTimestamp
- }
- return ckvs
- }

// aggregate aggregates combined metrics for a given key and returns
// number of bytes ingested along with the error, if any.
func (a *Aggregator) aggregate(
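The diff above removes Aggregator.partition and moves partitioning into EventToCombinedMetrics, which now returns the partitioned key-value pairs directly. For readers following along, the sketch below shows one plausible reading of the types involved (CombinedKV, Partitioner, and the NewHashPartitioner constructor used in the benchmark that follows); the field sets and integer widths are assumptions, not the repository's actual definitions.

package main

import "fmt"

// Trimmed stand-ins for the real aggregators types; only the fields the
// sketch needs are included.
type CombinedMetricsKey struct {
	PartitionID uint16
}

type CombinedMetrics struct{}

// CombinedKV pairs a partitioned key with the combined metrics routed to
// that partition, matching how this commit's EventToCombinedMetrics is used.
type CombinedKV struct {
	Key   CombinedMetricsKey
	Value CombinedMetrics
}

// Partitioner maps the hash of an aggregation key to a partition ID.
type Partitioner interface {
	Partition(hash uint64) uint16
}

// hashPartitioner is an assumed modulo-based implementation standing behind
// the NewHashPartitioner seen in the tests.
type hashPartitioner struct{ partitions uint64 }

func NewHashPartitioner(n int) Partitioner {
	if n <= 0 {
		panic("number of partitions must be positive")
	}
	return hashPartitioner{partitions: uint64(n)}
}

func (p hashPartitioner) Partition(hash uint64) uint16 {
	return uint16(hash % p.partitions)
}

func main() {
	p := NewHashPartitioner(4)
	// A given hash always maps to the same partition, so repeated events
	// for one aggregation key keep aggregating together.
	fmt.Println(p.Partition(12345), p.Partition(12345)) // 1 1
}

Under this reading, the hasher.Chain(...).Sum() calls in the new converter pick a stable partition per aggregation key, which is why the per-Aggregator partition step above is no longer needed.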
aggregators/aggregator_test.go (26 changes: 14 additions & 12 deletions)
@@ -1117,7 +1117,12 @@ func BenchmarkAggregateCombinedMetrics(b *testing.B) {
b.Cleanup(func() {
agg.Stop(context.Background())
})
- cm, err := EventToCombinedMetrics(
+ cmk := CombinedMetricsKey{
+ Interval: aggIvl,
+ ProcessingTime: time.Now().Truncate(aggIvl),
+ ID: "testid",
+ }
+ kvs, err := EventToCombinedMetrics(
&modelpb.APMEvent{
Processor: modelpb.TransactionProcessor(),
Event: &modelpb.Event{Duration: durationpb.New(time.Millisecond)},
@@ -1126,23 +1131,20 @@ func BenchmarkAggregateCombinedMetrics(b *testing.B) {
RepresentativeCount: 1,
},
},
- aggIvl,
+ cmk, NewHashPartitioner(1),
)
if err != nil {
b.Fatal(err)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
- if err := agg.AggregateCombinedMetrics(
- context.Background(),
- CombinedMetricsKey{
- Interval: aggIvl,
- ProcessingTime: time.Now().Truncate(aggIvl),
- ID: "testid",
- },
- cm,
- ); err != nil {
- b.Fatal(err)
+ for _, kv := range kvs {
+ if err := agg.AggregateCombinedMetrics(
+ context.Background(),
+ kv.Key, kv.Value,
+ ); err != nil {
+ b.Fatal(err)
+ }
}
}
}
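One detail worth noting in the benchmark: NewHashPartitioner(1) presumably maps every hash to the single partition 0, so partitioning adds no fan-out to the measured loop. A hypothetical guard under that assumption (not part of the commit; it relies on the CombinedKV and PartitionID shapes read from the diff):

// assertSinglePartition fails the test or benchmark if the converter
// emitted anything other than partition 0 for the event's key-values.
func assertSinglePartition(tb testing.TB, kvs []CombinedKV) {
	for _, kv := range kvs {
		if kv.Key.PartitionID != 0 {
			tb.Fatalf("unexpected partition ID %d", kv.Key.PartitionID)
		}
	}
}

Calling assertSinglePartition(b, kvs) right after EventToCombinedMetrics would make the single-partition assumption explicit.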
aggregators/converter.go (158 changes: 110 additions & 48 deletions)
@@ -35,86 +35,148 @@ func setMetricCountBasedOnOutcome(stm *ServiceTransactionMetrics, from *modelpb.APMEvent) {
}
}

- // EventToCombinedMetrics converts APMEvent to CombinedMetrics.
+ // EventToCombinedMetrics converts an APMEvent to CombinedMetrics. The
+ // metrics are partitioned into smaller CombinedMetrics based on the
+ // partitioning logic. Overflows are ignored, since partitioning operates
+ // on a single APMEvent with no possibility of overflow.
func EventToCombinedMetrics(
e *modelpb.APMEvent,
- aggInterval time.Duration,
- ) (CombinedMetrics, error) {
+ unpartitionedKey CombinedMetricsKey,
+ partitioner Partitioner,
+ ) ([]CombinedKV, error) {
var (
- cm CombinedMetrics
- sim ServiceInstanceMetrics
+ kvs []CombinedKV
+ gl GlobalLabels
)
+ gl.fromLabelsAndNumericLabels(e.Labels, e.NumericLabels)
+ gls, err := gl.MarshalString()
+ if err != nil {
+ return nil, err
+ }

- cm.eventsTotal = 1 // combined metrics is representing a single APM event
- cm.youngestEventTimestamp = e.GetEvent().GetReceived().AsTime()
+ svcKey := serviceKey(e, unpartitionedKey.Interval)
+ svcInstanceKey := ServiceInstanceAggregationKey{GlobalLabelsStr: gls}
+ hasher := Hasher{}.Chain(svcKey).Chain(svcInstanceKey)

processor := e.GetProcessor()
switch {
case processor.IsTransaction():
repCount := e.GetTransaction().GetRepresentativeCount()
if repCount <= 0 {
- return cm, nil
+ return nil, nil
}
tm := newTransactionMetrics()
- stm := newServiceTransactionMetrics()
tm.Histogram.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount)
- stm.Histogram.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount)
+ txnKey := transactionKey(e)
+ cmk := unpartitionedKey
+ cmk.PartitionID = partitioner.Partition(hasher.Chain(txnKey).Sum())
+ kvs = append(kvs, CombinedKV{
+ Key: cmk,
+ Value: CombinedMetrics{
+ Services: map[ServiceAggregationKey]ServiceMetrics{
+ svcKey: ServiceMetrics{
+ ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{
+ svcInstanceKey: ServiceInstanceMetrics{
+ TransactionGroups: map[TransactionAggregationKey]TransactionMetrics{txnKey: tm},
+ },
+ },
+ },
+ },
+ },
+ })
+
+ stm := newServiceTransactionMetrics()
+ stm.Histogram.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount)
setMetricCountBasedOnOutcome(&stm, e)
- sim.TransactionGroups = map[TransactionAggregationKey]TransactionMetrics{
- transactionKey(e): tm,
- }
- sim.ServiceTransactionGroups = map[ServiceTransactionAggregationKey]ServiceTransactionMetrics{
- serviceTransactionKey(e): stm,
- }
+ svcTxnKey := serviceTransactionKey(e)
+ cmk.PartitionID = partitioner.Partition(hasher.Chain(svcTxnKey).Sum())
+ kvs = append(kvs, CombinedKV{
+ Key: cmk,
+ Value: CombinedMetrics{
+ Services: map[ServiceAggregationKey]ServiceMetrics{
+ svcKey: ServiceMetrics{
+ ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{
+ svcInstanceKey: ServiceInstanceMetrics{
+ ServiceTransactionGroups: map[ServiceTransactionAggregationKey]ServiceTransactionMetrics{svcTxnKey: stm},
+ },
+ },
+ },
+ },
+ },
+ })

// Handle dropped span stats
- dss := e.GetTransaction().GetDroppedSpansStats()
- var spanGroups map[SpanAggregationKey]SpanMetrics
- if len(dss) > 0 {
- spanGroups = make(map[SpanAggregationKey]SpanMetrics, len(dss))
- for _, ds := range dss {
- spanGroups[droppedSpanStatsKey(ds)] = SpanMetrics{
- Count: float64(ds.GetDuration().GetCount()) * repCount,
- Sum: float64(ds.GetDuration().GetSum().AsDuration()) * repCount,
- }
- }
- sim.SpanGroups = spanGroups
+ for _, dss := range e.GetTransaction().GetDroppedSpansStats() {
+ dssKey := droppedSpanStatsKey(dss)
+ cmk.PartitionID = partitioner.Partition(hasher.Chain(dssKey).Sum())
+ kvs = append(kvs, CombinedKV{
+ Key: cmk,
+ Value: CombinedMetrics{
+ Services: map[ServiceAggregationKey]ServiceMetrics{
+ svcKey: ServiceMetrics{
+ ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{
+ svcInstanceKey: ServiceInstanceMetrics{
+ SpanGroups: map[SpanAggregationKey]SpanMetrics{
+ dssKey: SpanMetrics{
+ Count: float64(dss.GetDuration().GetCount()) * repCount,
+ Sum: float64(dss.GetDuration().GetSum().AsDuration()) * repCount,
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ })
}
case processor.IsSpan():
target := e.GetService().GetTarget()
repCount := e.GetSpan().GetRepresentativeCount()
destSvc := e.GetSpan().GetDestinationService().GetResource()
if repCount <= 0 || (target == nil && destSvc == "") {
- return cm, nil
+ return nil, nil
}
- var count uint32
- count = 1
-
+ var count uint32 = 1
duration := e.GetEvent().GetDuration().AsDuration()
- composite := e.GetSpan().GetComposite()
- if composite != nil {
+ if composite := e.GetSpan().GetComposite(); composite != nil {
count = composite.GetCount()
duration = time.Duration(composite.GetSum() * float64(time.Millisecond))
}
- sim.SpanGroups = map[SpanAggregationKey]SpanMetrics{
- spanKey(e): SpanMetrics{
- Count: float64(count) * repCount,
- Sum: float64(duration) * repCount,
- },
- }
+ spanKey := spanKey(e)
+ cmk := unpartitionedKey
+ cmk.PartitionID = partitioner.Partition(hasher.Chain(spanKey).Sum())
+ kvs = append(kvs, CombinedKV{
+ Key: cmk,
+ Value: CombinedMetrics{
+ Services: map[ServiceAggregationKey]ServiceMetrics{
+ svcKey: ServiceMetrics{
+ ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{
+ svcInstanceKey: ServiceInstanceMetrics{
+ SpanGroups: map[SpanAggregationKey]SpanMetrics{
+ spanKey: SpanMetrics{
+ Count: float64(count) * repCount,
+ Sum: float64(duration) * repCount,
+ },
+ },
+ },
+ },
+ },
+ },
+ },
+ })
}

- var gl GlobalLabels
- gl.fromLabelsAndNumericLabels(e.GetLabels(), e.GetNumericLabels())
- gls, err := gl.MarshalString()
- if err != nil {
- return CombinedMetrics{}, err
- }
- sm := newServiceMetrics()
- sm.ServiceInstanceGroups[ServiceInstanceAggregationKey{GlobalLabelsStr: gls}] = sim
- cm.Services = map[ServiceAggregationKey]ServiceMetrics{
- serviceKey(e, aggInterval): sm,
- }
- return cm, nil
+ // Approximate events total by uniformly distributing the events total
+ // amongst the partitioned key values.
+ weightedEventsTotal := 1 / float64(len(kvs))
+ eventTs := e.GetEvent().GetReceived().AsTime()

[check failure reported by GitHub Actions / lint on line 173 in aggregators/converter.go: var eventTs should be eventTS (ST1003)]

+ for i := range kvs {
+ cv := &kvs[i].Value
+ cv.eventsTotal = weightedEventsTotal
+ cv.youngestEventTimestamp = eventTs
+ }
+ return kvs, nil
}

// CombinedMetricsToBatch converts CombinedMetrics to a batch of APMEvents.
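The closing hunk's weighting logic is easiest to see with numbers. A single transaction event fans out to one transaction metric, one service transaction metric, and one span metric per dropped-span-stats entry, so with two such entries the converter returns four CombinedKVs, each carrying an equal share of the one event. A small self-contained illustration (the fan-out count is assumed for the example):

package main

import "fmt"

func main() {
	// Assumed fan-out: transaction metric + service transaction metric +
	// two dropped-span-stats span metrics = four partitioned key-values.
	const kvCount = 4
	weighted := 1 / float64(kvCount) // eventsTotal carried by each value

	// Summing the shares across all partitions recovers exactly one event,
	// so event counts stay correct after the partitions are merged again.
	var total float64
	for i := 0; i < kvCount; i++ {
		total += weighted
	}
	fmt.Println(weighted, total) // 0.25 1
}

The removed Aggregator.partition applied the same division to cm.eventsTotal; the commit simply moves it into the converter, where len(kvs) is already known.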
