Skip to content

Commit

Permalink
Performance improvements (#24)
Browse files Browse the repository at this point in the history
* Reuse labels backing array

* Remove tracing for AggregateBatch
  • Loading branch information
carsonip authored Jul 13, 2023
1 parent f6655e2 commit 8825af3
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 17 deletions.
15 changes: 0 additions & 15 deletions aggregators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ func (a *Aggregator) AggregateBatch(
b *modelpb.Batch,
) error {
cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(id)
ctx, span := a.cfg.Tracer.Start(ctx, "AggregateBatch", trace.WithAttributes(cmIDAttrs...))
defer span.End()

a.mu.Lock()
defer a.mu.Unlock()
Expand All @@ -158,7 +156,6 @@ func (a *Aggregator) AggregateBatch(
for _, e := range *b {
bytesIn, err := a.aggregateAPMEvent(ctx, cmk, e)
if err != nil {
span.RecordError(err)
errs = append(errs, err)
}
totalBytesIn += int64(bytesIn)
Expand All @@ -168,7 +165,6 @@ func (a *Aggregator) AggregateBatch(
a.cachedStats[ivl][id] = cmStats
}

span.SetAttributes(attribute.Int64("total_bytes_ingested", totalBytesIn))
cmIDAttrSet := attribute.NewSet(cmIDAttrs...)
a.metrics.RequestsTotal.Add(ctx, 1, metric.WithAttributeSet(cmIDAttrSet))
a.metrics.BytesIngested.Add(ctx, totalBytesIn, metric.WithAttributeSet(cmIDAttrSet))
Expand Down Expand Up @@ -354,23 +350,12 @@ func (a *Aggregator) aggregateAPMEvent(
cmk CombinedMetricsKey,
e *modelpb.APMEvent,
) (int, error) {
traceAttrs := append(
a.cfg.CombinedMetricsIDToKVs(cmk.ID),
attribute.String(aggregationIvlKey, formatDuration(cmk.Interval)),
attribute.String("processing_time", cmk.ProcessingTime.String()),
)
ctx, span := a.cfg.Tracer.Start(ctx, "aggregateAPMEvent", trace.WithAttributes(traceAttrs...))
defer span.End()

cm, err := EventToCombinedMetrics(e, cmk.Interval)
if err != nil {
span.RecordError(err)
return 0, fmt.Errorf("failed to convert event to combined metrics: %w", err)
}
bytesIn, err := a.aggregate(ctx, cmk, cm)
span.SetAttributes(attribute.Int("bytes_ingested", bytesIn))
if err != nil {
span.RecordError(err)
return bytesIn, fmt.Errorf("failed to aggregate combined metrics: %w", err)
}
return bytesIn, nil
Expand Down
8 changes: 6 additions & 2 deletions aggregators/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ func (gl *GlobalLabels) ToProto() *aggregationpb.GlobalLabels {

// Keys must be sorted to ensure wire formats are deterministically generated and strings are directly comparable
// i.e. Protobuf formats are equal if and only if the structs are equal
pb.Labels = make([]*aggregationpb.Label, 0, len(gl.Labels))
if len(gl.Labels) > cap(pb.Labels) {
pb.Labels = make([]*aggregationpb.Label, 0, len(gl.Labels))
}
for k, v := range gl.Labels {
l := aggregationpb.LabelFromVTPool()
l.Key = k
Expand All @@ -487,7 +489,9 @@ func (gl *GlobalLabels) ToProto() *aggregationpb.GlobalLabels {
return pb.Labels[i].Key < pb.Labels[j].Key
})

pb.NumericLabels = make([]*aggregationpb.NumericLabel, 0, len(gl.NumericLabels))
if len(gl.NumericLabels) > cap(pb.NumericLabels) {
pb.NumericLabels = make([]*aggregationpb.NumericLabel, 0, len(gl.NumericLabels))
}
for k, v := range gl.NumericLabels {
l := aggregationpb.NumericLabelFromVTPool()
l.Key = k
Expand Down

0 comments on commit 8825af3

Please sign in to comment.