Skip to content

Commit

Permalink
Remove Partitioner interface (#62)
Browse files Browse the repository at this point in the history
Configure a number of partitions, and always
just calculate the partition has hash%partitions.
The existing API is more or less required to do
this anyway, since it receives a hash.
  • Loading branch information
axw authored Aug 1, 2023
1 parent b719ec3 commit b464118
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 42 deletions.
2 changes: 1 addition & 1 deletion aggregators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (a *Aggregator) aggregateAPMEvent(
totalBytesIn += bytesIn
return err
}
err := EventToCombinedMetrics(e, cmk, a.cfg.Partitioner, aggregateFunc)
err := EventToCombinedMetrics(e, cmk, a.cfg.Partitions, aggregateFunc)
if err != nil {
return 0, fmt.Errorf("failed to aggregate combined metrics: %w", err)
}
Expand Down
28 changes: 12 additions & 16 deletions aggregators/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,12 @@ type Processor func(
aggregationIvl time.Duration,
) error

// Partitioner partitions the aggregation key based on the configured
// partition logic.
type Partitioner interface {
Partition(Hasher) uint16
}

// Config contains the required config for running the aggregator.
type Config struct {
DataDir string
Limits Limits
Processor Processor
Partitioner Partitioner
Partitions uint16
AggregationIntervals []time.Duration
HarvestDelay time.Duration
CombinedMetricsIDToKVs func([16]byte) []attribute.KeyValue
Expand Down Expand Up @@ -93,13 +87,15 @@ func WithProcessor(processor Processor) Option {
}
}

// WithPartitioner configures a partitioner for partitioning the combined
// metrics in pebble. Partition IDs are encoded in a way that all the
// partitions of a specific combined metric are listed before any other if
// compared using the bytes comparer.
func WithPartitioner(partitioner Partitioner) Option {
// WithPartitions configures the number of partitions for combined metrics
// written to pebble. Defaults to 1.
//
// Partition IDs are encoded in a way that all the partitions of a specific
// combined metric are listed before any other if compared using the bytes
// comparer.
func WithPartitions(n uint16) Option {
return func(c Config) Config {
c.Partitioner = partitioner
c.Partitions = n
return c
}
}
Expand Down Expand Up @@ -185,7 +181,7 @@ func defaultCfg() Config {
return Config{
DataDir: "/tmp",
Processor: stdoutProcessor,
Partitioner: NewHashPartitioner(1),
Partitions: 1,
AggregationIntervals: []time.Duration{time.Minute},
Meter: otel.Meter(instrumentationName),
Tracer: otel.Tracer(instrumentationName),
Expand All @@ -201,8 +197,8 @@ func validateCfg(cfg Config) error {
if cfg.Processor == nil {
return errors.New("processor is required")
}
if cfg.Partitioner == nil {
return errors.New("partitioner is required")
if cfg.Partitions == 0 {
return errors.New("partitions must be greater than zero")
}
if len(cfg.AggregationIntervals) == 0 {
return errors.New("at least one aggregation interval is required")
Expand Down
16 changes: 8 additions & 8 deletions aggregators/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ var (
// partitionedMetricsBuilder provides support for building partitioned
// sets of metrics from an event.
type partitionedMetricsBuilder struct {
partitioner Partitioner
hasher Hasher
builders []*eventMetricsBuilder // partitioned metrics
partitions uint16
hasher Hasher
builders []*eventMetricsBuilder // partitioned metrics

// Event metrics are for exactly one service instance, so we create an
// array of a single element and use that for backing the slice in
Expand All @@ -66,7 +66,7 @@ type partitionedMetricsBuilder struct {
func getPartitionedMetricsBuilder(
serviceAggregationKey aggregationpb.ServiceAggregationKey,
serviceInstanceAggregationKey aggregationpb.ServiceInstanceAggregationKey,
partitioner Partitioner,
partitions uint16,
) *partitionedMetricsBuilder {
p, ok := partitionedMetricsBuilderPool.Get().(*partitionedMetricsBuilder)
if !ok {
Expand All @@ -83,7 +83,7 @@ func getPartitionedMetricsBuilder(
p.serviceAggregationKey = serviceAggregationKey
p.serviceInstanceAggregationKey = serviceInstanceAggregationKey
p.hasher = Hasher{}.Chain(&p.serviceAggregationKey).Chain(&p.serviceInstanceAggregationKey)
p.partitioner = partitioner
p.partitions = partitions
return p
}

Expand Down Expand Up @@ -212,7 +212,7 @@ func (p *partitionedMetricsBuilder) addServiceSummaryMetrics() {
}

func (p *partitionedMetricsBuilder) get(h Hasher) *eventMetricsBuilder {
partition := p.partitioner.Partition(h)
partition := uint16(h.Sum() % uint64(p.partitions))
for _, mb := range p.builders {
if mb.partition == partition {
return mb
Expand Down Expand Up @@ -323,7 +323,7 @@ func (mb *eventMetricsBuilder) release() {
func EventToCombinedMetrics(
e *modelpb.APMEvent,
unpartitionedKey CombinedMetricsKey,
partitioner Partitioner,
partitions uint16,
callback func(CombinedMetricsKey, *aggregationpb.CombinedMetrics) error,
) error {
globalLabels, err := marshalEventGlobalLabels(e)
Expand All @@ -342,7 +342,7 @@ func EventToCombinedMetrics(
AgentName: e.GetAgent().GetName(),
},
aggregationpb.ServiceInstanceAggregationKey{GlobalLabelsStr: globalLabels},
partitioner,
partitions,
)
defer pmb.release()

Expand Down
33 changes: 16 additions & 17 deletions aggregators/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ func TestEventToCombinedMetrics(t *testing.T) {
},
}
for _, tc := range []struct {
name string
input func() []*modelpb.APMEvent
partitioner Partitioner
expected func() []*aggregationpb.CombinedMetrics
name string
input func() []*modelpb.APMEvent
partitions uint16
expected func() []*aggregationpb.CombinedMetrics
}{
{
name: "nil-input",
input: func() []*modelpb.APMEvent {
return nil
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return nil
},
Expand All @@ -64,7 +64,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return nil
},
Expand All @@ -80,7 +80,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return nil
},
Expand All @@ -128,7 +128,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return nil
},
Expand All @@ -150,7 +150,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
event.Event.Duration = durationpb.New(time.Nanosecond)
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
event.Event.Duration = durationpb.New(time.Nanosecond)
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand All @@ -234,7 +234,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
event.Log = &modelpb.Log{}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{success, unknown}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -318,7 +318,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
return nil
}
for _, e := range tc.input() {
err := EventToCombinedMetrics(e, cmk, tc.partitioner, collector)
err := EventToCombinedMetrics(e, cmk, tc.partitions, collector)
require.NoError(t, err)
}
assert.Empty(t, cmp.Diff(
Expand Down Expand Up @@ -543,13 +543,12 @@ func BenchmarkEventToCombinedMetrics(b *testing.B) {
ProcessingTime: time.Now().Truncate(time.Minute),
ID: EncodeToCombinedMetricsKeyID(b, "ab01"),
}
partitioner := NewHashPartitioner(1)
noop := func(_ CombinedMetricsKey, _ *aggregationpb.CombinedMetrics) error {
return nil
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := EventToCombinedMetrics(event, cmk, partitioner, noop)
err := EventToCombinedMetrics(event, cmk, 1 /*partitions*/, noop)
if err != nil {
b.Fatal(err)
}
Expand Down

0 comments on commit b464118

Please sign in to comment.