Skip to content

Commit

Permalink
Merge branch 'main' into queue_partitioner
Browse files Browse the repository at this point in the history
  • Loading branch information
lahsivjar committed Jul 12, 2023
2 parents 13434d5 + 1a75213 commit 490c9a0
Show file tree
Hide file tree
Showing 16 changed files with 728 additions and 525 deletions.
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @elastic/apm-server
632 changes: 311 additions & 321 deletions aggregationpb/aggregation.pb.go

Large diffs are not rendered by default.

132 changes: 16 additions & 116 deletions aggregationpb/aggregation_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 64 additions & 18 deletions aggregators/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"testing"
"time"

"go.uber.org/zap"

"github.com/cockroachdb/pebble"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand All @@ -33,6 +35,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-aggregation/aggregators/internal/hdrhistogram"
"github.com/elastic/apm-data/model/modelpb"
)

Expand Down Expand Up @@ -211,7 +214,6 @@ func TestAggregateBatch(t *testing.T) {
stm.Histogram.RecordDuration(txnDuration, 1)
stm.SuccessCount++
expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].ServiceTransactionGroups[stxKey] = stm

sm := expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[spanKey]
sm.Count++
expectedCombinedMetrics.Services[svcKey].ServiceInstanceGroups[sik].SpanGroups[spanKey] = sm
Expand All @@ -230,6 +232,9 @@ func TestAggregateBatch(t *testing.T) {
expectedCombinedMetrics, cm,
cmpopts.EquateEmpty(),
cmpopts.EquateApprox(0, 0.01),
cmp.Comparer(func(a, b hdrhistogram.HybridCountsRep) bool {
return a.Equal(&b)
}),
cmp.AllowUnexported(CombinedMetrics{}),
))
assert.Empty(t, cmp.Diff(
Expand Down Expand Up @@ -788,7 +793,7 @@ func TestHarvest(t *testing.T) {
expectedMeasurements = append(expectedMeasurements, apmmodel.Metrics{
Samples: map[string]apmmodel.Metric{
"aggregator.requests.total": {Value: 1},
"aggregator.bytes.ingested": {Value: 417},
"aggregator.bytes.ingested": {Value: 393},
},
Labels: apmmodel.StringMap{
apmmodel.StringMapItem{Key: "id_key", Value: cmID},
Expand Down Expand Up @@ -1080,6 +1085,7 @@ func TestRunStopOrchestration(t *testing.T) {
}

func BenchmarkAggregateCombinedMetrics(b *testing.B) {
b.ReportAllocs()
gatherer, err := apmotel.NewGatherer()
if err != nil {
b.Fatal(err)
Expand All @@ -1100,6 +1106,7 @@ func BenchmarkAggregateCombinedMetrics(b *testing.B) {
}),
WithProcessor(noOpProcessor()),
WithMeter(mp.Meter("test")),
WithLogger(zap.NewNop()),
)
if err != nil {
b.Fatal(err)
Expand Down Expand Up @@ -1140,9 +1147,39 @@ func BenchmarkAggregateCombinedMetrics(b *testing.B) {
}
}

func BenchmarkAggregateBatch(b *testing.B) {
func BenchmarkAggregateBatchSerial(b *testing.B) {
b.ReportAllocs()
agg := newTestAggregator(b)
batch := newTestBatchForBenchmark()
b.ResetTimer()

for i := 0; i < b.N; i++ {
if err := agg.AggregateBatch(context.Background(), "test", batch); err != nil {
b.Fatal(err)
}
}
flushTestAggregator(b, agg)
}

func BenchmarkAggregateBatchParallel(b *testing.B) {
b.ReportAllocs()
agg := newTestAggregator(b)
batch := newTestBatchForBenchmark()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if err := agg.AggregateBatch(context.Background(), "test", batch); err != nil {
b.Fatal(err)
}
}
})
flushTestAggregator(b, agg)
}

func newTestAggregator(tb testing.TB) *Aggregator {
agg, err := New(
WithDataDir(b.TempDir()),
WithDataDir(tb.TempDir()),
WithLimits(Limits{
MaxSpanGroups: 1000,
MaxSpanGroupsPerService: 100,
Expand All @@ -1154,18 +1191,32 @@ func BenchmarkAggregateBatch(b *testing.B) {
MaxServiceInstanceGroupsPerService: 100,
}),
WithProcessor(noOpProcessor()),
WithAggregationIntervals([]time.Duration{time.Minute}),
WithAggregationIntervals([]time.Duration{time.Second, time.Minute, time.Hour}),
WithLogger(zap.NewNop()),
)
if err != nil {
b.Fatal(err)
tb.Fatal(err)
}
go func() {
agg.Run(context.Background())
}()
b.Cleanup(func() {
agg.Stop(context.Background())
})
batch := &modelpb.Batch{
return agg
}

func flushTestAggregator(tb testing.TB, agg *Aggregator) {
if agg.batch != nil {
if err := agg.batch.Commit(pebble.Sync); err != nil {
tb.Fatal(err)
}
if err := agg.batch.Close(); err != nil {
tb.Fatal(err)
}
agg.batch = nil
}
if err := agg.db.Close(); err != nil {
tb.Fatal(err)
}
}

func newTestBatchForBenchmark() *modelpb.Batch {
return &modelpb.Batch{
&modelpb.APMEvent{
Processor: modelpb.TransactionProcessor(),
Event: &modelpb.Event{Duration: durationpb.New(time.Millisecond)},
Expand All @@ -1175,11 +1226,6 @@ func BenchmarkAggregateBatch(b *testing.B) {
},
},
}
for i := 0; i < b.N; i++ {
if err := agg.AggregateBatch(context.Background(), "test", batch); err != nil {
b.Fatal(err)
}
}
}

func noOpProcessor() Processor {
Expand Down
Loading

0 comments on commit 490c9a0

Please sign in to comment.