diff --git a/aggregators/aggregator.go b/aggregators/aggregator.go
index 5c9d13a..3d4d36c 100644
--- a/aggregators/aggregator.go
+++ b/aggregators/aggregator.go
@@ -326,7 +326,7 @@ func (a *Aggregator) aggregateAPMEvent(
 		totalBytesIn += bytesIn
 		return err
 	}
-	err := EventToCombinedMetrics(e, cmk, a.cfg.Partitioner, aggregateFunc)
+	err := EventToCombinedMetrics(e, cmk, a.cfg.Partitions, aggregateFunc)
 	if err != nil {
 		return 0, fmt.Errorf("failed to aggregate combined metrics: %w", err)
 	}
diff --git a/aggregators/config.go b/aggregators/config.go
index 5ee0f6b..8cebf08 100644
--- a/aggregators/config.go
+++ b/aggregators/config.go
@@ -33,18 +33,12 @@ type Processor func(
 	aggregationIvl time.Duration,
 ) error
 
-// Partitioner partitions the aggregation key based on the configured
-// partition logic.
-type Partitioner interface {
-	Partition(Hasher) uint16
-}
-
 // Config contains the required config for running the aggregator.
 type Config struct {
 	DataDir                string
 	Limits                 Limits
 	Processor              Processor
-	Partitioner            Partitioner
+	Partitions             uint16
 	AggregationIntervals   []time.Duration
 	HarvestDelay           time.Duration
 	CombinedMetricsIDToKVs func([16]byte) []attribute.KeyValue
@@ -93,13 +87,15 @@ func WithProcessor(processor Processor) Option {
 	}
 }
 
-// WithPartitioner configures a partitioner for partitioning the combined
-// metrics in pebble. Partition IDs are encoded in a way that all the
-// partitions of a specific combined metric are listed before any other if
-// compared using the bytes comparer.
-func WithPartitioner(partitioner Partitioner) Option {
+// WithPartitions configures the number of partitions for combined metrics
+// written to pebble. Defaults to 1.
+//
+// Partition IDs are encoded in a way that all the partitions of a specific
+// combined metric are listed before any other if compared using the bytes
+// comparer.
+func WithPartitions(n uint16) Option {
 	return func(c Config) Config {
-		c.Partitioner = partitioner
+		c.Partitions = n
 		return c
 	}
 }
@@ -185,7 +181,7 @@ func defaultCfg() Config {
 	return Config{
 		DataDir:              "/tmp",
 		Processor:            stdoutProcessor,
-		Partitioner:          NewHashPartitioner(1),
+		Partitions:           1,
 		AggregationIntervals: []time.Duration{time.Minute},
 		Meter:                otel.Meter(instrumentationName),
 		Tracer:               otel.Tracer(instrumentationName),
@@ -201,8 +197,8 @@ func validateCfg(cfg Config) error {
 	if cfg.Processor == nil {
 		return errors.New("processor is required")
 	}
-	if cfg.Partitioner == nil {
-		return errors.New("partitioner is required")
+	if cfg.Partitions == 0 {
+		return errors.New("partitions must be greater than zero")
 	}
 	if len(cfg.AggregationIntervals) == 0 {
 		return errors.New("at least one aggregation interval is required")
diff --git a/aggregators/converter.go b/aggregators/converter.go
index 884f881..3ccbb3c 100644
--- a/aggregators/converter.go
+++ b/aggregators/converter.go
@@ -38,9 +38,9 @@ var (
 // partitionedMetricsBuilder provides support for building partitioned
 // sets of metrics from an event.
 type partitionedMetricsBuilder struct {
-	partitioner Partitioner
-	hasher      Hasher
-	builders    []*eventMetricsBuilder // partitioned metrics
+	partitions uint16
+	hasher     Hasher
+	builders   []*eventMetricsBuilder // partitioned metrics
 
 	// Event metrics are for exactly one service instance, so we create an
 	// array of a single element and use that for backing the slice in
@@ -66,7 +66,7 @@ type partitionedMetricsBuilder struct {
 func getPartitionedMetricsBuilder(
 	serviceAggregationKey aggregationpb.ServiceAggregationKey,
 	serviceInstanceAggregationKey aggregationpb.ServiceInstanceAggregationKey,
-	partitioner Partitioner,
+	partitions uint16,
 ) *partitionedMetricsBuilder {
 	p, ok := partitionedMetricsBuilderPool.Get().(*partitionedMetricsBuilder)
 	if !ok {
@@ -83,7 +83,7 @@ func getPartitionedMetricsBuilder(
 	p.serviceAggregationKey = serviceAggregationKey
 	p.serviceInstanceAggregationKey = serviceInstanceAggregationKey
 	p.hasher = Hasher{}.Chain(&p.serviceAggregationKey).Chain(&p.serviceInstanceAggregationKey)
-	p.partitioner = partitioner
+	p.partitions = partitions
 	return p
 }
 
@@ -212,7 +212,7 @@ func (p *partitionedMetricsBuilder) addServiceSummaryMetrics() {
 }
 
 func (p *partitionedMetricsBuilder) get(h Hasher) *eventMetricsBuilder {
-	partition := p.partitioner.Partition(h)
+	partition := uint16(h.Sum() % uint64(p.partitions))
 	for _, mb := range p.builders {
 		if mb.partition == partition {
 			return mb
@@ -323,7 +323,7 @@ func (mb *eventMetricsBuilder) release() {
 func EventToCombinedMetrics(
 	e *modelpb.APMEvent,
 	unpartitionedKey CombinedMetricsKey,
-	partitioner Partitioner,
+	partitions uint16,
 	callback func(CombinedMetricsKey, *aggregationpb.CombinedMetrics) error,
 ) error {
 	globalLabels, err := marshalEventGlobalLabels(e)
@@ -342,7 +342,7 @@ func EventToCombinedMetrics(
 			AgentName: e.GetAgent().GetName(),
 		},
 		aggregationpb.ServiceInstanceAggregationKey{GlobalLabelsStr: globalLabels},
-		partitioner,
+		partitions,
 	)
 	defer pmb.release()
 
diff --git a/aggregators/converter_test.go b/aggregators/converter_test.go
index 0e1adc2..20307b5 100644
--- a/aggregators/converter_test.go
+++ b/aggregators/converter_test.go
@@ -38,17 +38,17 @@ func TestEventToCombinedMetrics(t *testing.T) {
 		},
 	}
 	for _, tc := range []struct {
-		name        string
-		input       func() []*modelpb.APMEvent
-		partitioner Partitioner
-		expected    func() []*aggregationpb.CombinedMetrics
+		name       string
+		input      func() []*modelpb.APMEvent
+		partitions uint16
+		expected   func() []*aggregationpb.CombinedMetrics
 	}{
 		{
 			name: "nil-input",
 			input: func() []*modelpb.APMEvent {
 				return nil
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return nil
 			},
@@ -64,7 +64,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				}
 				return []*modelpb.APMEvent{event}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return nil
 			},
@@ -80,7 +80,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				}
 				return []*modelpb.APMEvent{event}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return []*aggregationpb.CombinedMetrics{
 					NewTestCombinedMetrics(
@@ -112,7 +112,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				}
 				return []*modelpb.APMEvent{event}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return nil
 			},
@@ -128,7 +128,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				}
 				return []*modelpb.APMEvent{event}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return nil
 			},
@@ -150,7 +150,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				event.Event.Duration = durationpb.New(time.Nanosecond)
 				return []*modelpb.APMEvent{event}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return []*aggregationpb.CombinedMetrics{
 					NewTestCombinedMetrics(
@@ -185,7 +185,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				event.Event.Duration = durationpb.New(time.Nanosecond)
 				return []*modelpb.APMEvent{event}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return []*aggregationpb.CombinedMetrics{
 					NewTestCombinedMetrics(
@@ -213,7 +213,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				}
 				return []*modelpb.APMEvent{event}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return []*aggregationpb.CombinedMetrics{
 					NewTestCombinedMetrics(
@@ -234,7 +234,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				event.Log = &modelpb.Log{}
 				return []*modelpb.APMEvent{event}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return []*aggregationpb.CombinedMetrics{
 					NewTestCombinedMetrics(
@@ -266,7 +266,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				}
 				return []*modelpb.APMEvent{success, unknown}
 			},
-			partitioner: NewHashPartitioner(1),
+			partitions: 1,
 			expected: func() []*aggregationpb.CombinedMetrics {
 				return []*aggregationpb.CombinedMetrics{
 					NewTestCombinedMetrics(
@@ -318,7 +318,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
 				return nil
 			}
 			for _, e := range tc.input() {
-				err := EventToCombinedMetrics(e, cmk, tc.partitioner, collector)
+				err := EventToCombinedMetrics(e, cmk, tc.partitions, collector)
 				require.NoError(t, err)
 			}
 			assert.Empty(t, cmp.Diff(
@@ -543,13 +543,12 @@ func BenchmarkEventToCombinedMetrics(b *testing.B) {
 		ProcessingTime: time.Now().Truncate(time.Minute),
 		ID:             EncodeToCombinedMetricsKeyID(b, "ab01"),
 	}
-	partitioner := NewHashPartitioner(1)
 	noop := func(_ CombinedMetricsKey, _ *aggregationpb.CombinedMetrics) error {
 		return nil
 	}
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		err := EventToCombinedMetrics(event, cmk, partitioner, noop)
+		err := EventToCombinedMetrics(event, cmk, 1 /*partitions*/, noop)
 		if err != nil {
 			b.Fatal(err)
 		}
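
Note: with the Partitioner interface removed, partition assignment becomes the plain modulo now inlined in partitionedMetricsBuilder.get: uint16(h.Sum() % uint64(p.partitions)). The following is a minimal, self-contained sketch of that assignment for illustration only; partitionForHash is a hypothetical helper and the uint64 argument stands in for the repository's Hasher.Sum(), which is not reproduced here.

package main

import "fmt"

// partitionForHash is a hypothetical stand-in for the logic inlined in
// partitionedMetricsBuilder.get: uint16(h.Sum() % uint64(p.partitions)).
// hashSum plays the role of Hasher.Sum().
func partitionForHash(hashSum uint64, partitions uint16) uint16 {
	// validateCfg rejects zero partitions ("partitions must be greater than
	// zero"), so the modulo below never divides by zero in practice.
	if partitions == 0 {
		panic("partitions must be greater than zero")
	}
	return uint16(hashSum % uint64(partitions))
}

func main() {
	// With 4 partitions, hash sums map onto partitions 0..3.
	for _, sum := range []uint64{0, 1, 5, 42, 1 << 40} {
		fmt.Printf("hash sum %d -> partition %d\n", sum, partitionForHash(sum, 4))
	}
}

A caller that previously configured WithPartitioner(NewHashPartitioner(n)) would now pass WithPartitions(n); assuming the hash is well distributed, the modulo keeps the same even spread of combined metrics across partitions while dropping the interface indirection.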