Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Partitioner interface #62

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aggregators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (a *Aggregator) aggregateAPMEvent(
totalBytesIn += bytesIn
return err
}
err := EventToCombinedMetrics(e, cmk, a.cfg.Partitioner, aggregateFunc)
err := EventToCombinedMetrics(e, cmk, a.cfg.Partitions, aggregateFunc)
if err != nil {
return 0, fmt.Errorf("failed to aggregate combined metrics: %w", err)
}
Expand Down
28 changes: 12 additions & 16 deletions aggregators/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,12 @@ type Processor func(
aggregationIvl time.Duration,
) error

// Partitioner partitions the aggregation key based on the configured
// partition logic.
type Partitioner interface {
Partition(Hasher) uint16
}

// Config contains the required config for running the aggregator.
type Config struct {
DataDir string
Limits Limits
Processor Processor
Partitioner Partitioner
Partitions uint16
AggregationIntervals []time.Duration
HarvestDelay time.Duration
CombinedMetricsIDToKVs func([16]byte) []attribute.KeyValue
Expand Down Expand Up @@ -93,13 +87,15 @@ func WithProcessor(processor Processor) Option {
}
}

// WithPartitioner configures a partitioner for partitioning the combined
// metrics in pebble. Partition IDs are encoded in a way that all the
// partitions of a specific combined metric are listed before any other if
// compared using the bytes comparer.
func WithPartitioner(partitioner Partitioner) Option {
// WithPartitions configures the number of partitions for combined metrics
// written to pebble. Defaults to 1.
//
// Partition IDs are encoded in a way that all the partitions of a specific
// combined metric are listed before any other if compared using the bytes
// comparer.
func WithPartitions(n uint16) Option {
return func(c Config) Config {
c.Partitioner = partitioner
c.Partitions = n
return c
}
}
Expand Down Expand Up @@ -185,7 +181,7 @@ func defaultCfg() Config {
return Config{
DataDir: "/tmp",
Processor: stdoutProcessor,
Partitioner: NewHashPartitioner(1),
Partitions: 1,
AggregationIntervals: []time.Duration{time.Minute},
Meter: otel.Meter(instrumentationName),
Tracer: otel.Tracer(instrumentationName),
Expand All @@ -201,8 +197,8 @@ func validateCfg(cfg Config) error {
if cfg.Processor == nil {
return errors.New("processor is required")
}
if cfg.Partitioner == nil {
return errors.New("partitioner is required")
if cfg.Partitions == 0 {
return errors.New("partitions must be greater than zero")
}
if len(cfg.AggregationIntervals) == 0 {
return errors.New("at least one aggregation interval is required")
Expand Down
16 changes: 8 additions & 8 deletions aggregators/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ var (
// partitionedMetricsBuilder provides support for building partitioned
// sets of metrics from an event.
type partitionedMetricsBuilder struct {
partitioner Partitioner
hasher Hasher
builders []*eventMetricsBuilder // partitioned metrics
partitions uint16
hasher Hasher
builders []*eventMetricsBuilder // partitioned metrics

// Event metrics are for exactly one service instance, so we create an
// array of a single element and use that for backing the slice in
Expand All @@ -66,7 +66,7 @@ type partitionedMetricsBuilder struct {
func getPartitionedMetricsBuilder(
serviceAggregationKey aggregationpb.ServiceAggregationKey,
serviceInstanceAggregationKey aggregationpb.ServiceInstanceAggregationKey,
partitioner Partitioner,
partitions uint16,
) *partitionedMetricsBuilder {
p, ok := partitionedMetricsBuilderPool.Get().(*partitionedMetricsBuilder)
if !ok {
Expand All @@ -83,7 +83,7 @@ func getPartitionedMetricsBuilder(
p.serviceAggregationKey = serviceAggregationKey
p.serviceInstanceAggregationKey = serviceInstanceAggregationKey
p.hasher = Hasher{}.Chain(&p.serviceAggregationKey).Chain(&p.serviceInstanceAggregationKey)
p.partitioner = partitioner
p.partitions = partitions
return p
}

Expand Down Expand Up @@ -212,7 +212,7 @@ func (p *partitionedMetricsBuilder) addServiceSummaryMetrics() {
}

func (p *partitionedMetricsBuilder) get(h Hasher) *eventMetricsBuilder {
partition := p.partitioner.Partition(h)
partition := uint16(h.Sum() % uint64(p.partitions))
for _, mb := range p.builders {
if mb.partition == partition {
return mb
Expand Down Expand Up @@ -323,7 +323,7 @@ func (mb *eventMetricsBuilder) release() {
func EventToCombinedMetrics(
e *modelpb.APMEvent,
unpartitionedKey CombinedMetricsKey,
partitioner Partitioner,
partitions uint16,
callback func(CombinedMetricsKey, *aggregationpb.CombinedMetrics) error,
) error {
globalLabels, err := marshalEventGlobalLabels(e)
Expand All @@ -342,7 +342,7 @@ func EventToCombinedMetrics(
AgentName: e.GetAgent().GetName(),
},
aggregationpb.ServiceInstanceAggregationKey{GlobalLabelsStr: globalLabels},
partitioner,
partitions,
)
defer pmb.release()

Expand Down
33 changes: 16 additions & 17 deletions aggregators/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ func TestEventToCombinedMetrics(t *testing.T) {
},
}
for _, tc := range []struct {
name string
input func() []*modelpb.APMEvent
partitioner Partitioner
expected func() []*aggregationpb.CombinedMetrics
name string
input func() []*modelpb.APMEvent
partitions uint16
expected func() []*aggregationpb.CombinedMetrics
}{
{
name: "nil-input",
input: func() []*modelpb.APMEvent {
return nil
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return nil
},
Expand All @@ -64,7 +64,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return nil
},
Expand All @@ -80,7 +80,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -112,7 +112,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return nil
},
Expand All @@ -128,7 +128,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return nil
},
Expand All @@ -150,7 +150,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
event.Event.Duration = durationpb.New(time.Nanosecond)
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
event.Event.Duration = durationpb.New(time.Nanosecond)
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand All @@ -234,7 +234,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
event.Log = &modelpb.Log{}
return []*modelpb.APMEvent{event}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
}
return []*modelpb.APMEvent{success, unknown}
},
partitioner: NewHashPartitioner(1),
partitions: 1,
expected: func() []*aggregationpb.CombinedMetrics {
return []*aggregationpb.CombinedMetrics{
NewTestCombinedMetrics(
Expand Down Expand Up @@ -318,7 +318,7 @@ func TestEventToCombinedMetrics(t *testing.T) {
return nil
}
for _, e := range tc.input() {
err := EventToCombinedMetrics(e, cmk, tc.partitioner, collector)
err := EventToCombinedMetrics(e, cmk, tc.partitions, collector)
require.NoError(t, err)
}
assert.Empty(t, cmp.Diff(
Expand Down Expand Up @@ -543,13 +543,12 @@ func BenchmarkEventToCombinedMetrics(b *testing.B) {
ProcessingTime: time.Now().Truncate(time.Minute),
ID: EncodeToCombinedMetricsKeyID(b, "ab01"),
}
partitioner := NewHashPartitioner(1)
noop := func(_ CombinedMetricsKey, _ *aggregationpb.CombinedMetrics) error {
return nil
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := EventToCombinedMetrics(event, cmk, partitioner, noop)
err := EventToCombinedMetrics(event, cmk, 1 /*partitions*/, noop)
if err != nil {
b.Fatal(err)
}
Expand Down