Skip to content

Commit

Permalink
Generate aggregationpb key hash functions
Browse files Browse the repository at this point in the history
Generate key hashing functions in an internal package,
and remove the Hasher type. This will ensure the hash
functions stay in sync, and will enable us to split up
the aggregators package into subpackages.

Also, remove global labels from ServiceAggregationKey.
  • Loading branch information
axw committed Aug 2, 2023
1 parent df78561 commit 026bec8
Show file tree
Hide file tree
Showing 15 changed files with 485 additions and 480 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ gen-proto: $(PROTOC_GEN_GO) $(PROTOC_GEN_GO_VTPROTO) $(PROTOC)
--go-vtproto_opt=features=marshal+unmarshal+size+pool+clone \
$(PROTOC_VT_STRUCTS) \
$(wildcard proto/*.proto)
go generate ./aggregators/internal/protohash
$(MAKE) fmt
462 changes: 226 additions & 236 deletions aggregationpb/aggregation.pb.go

Large diffs are not rendered by default.

52 changes: 0 additions & 52 deletions aggregationpb/aggregation_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 58 additions & 31 deletions aggregators/combined_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ package aggregators
import (
"time"

"github.com/cespare/xxhash/v2"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-aggregation/aggregationpb"
"github.com/elastic/apm-aggregation/aggregators/internal/hdrhistogram"
"github.com/elastic/apm-aggregation/aggregators/internal/protohash"
"github.com/elastic/apm-aggregation/aggregators/internal/timestamppb"
)

Expand Down Expand Up @@ -192,11 +194,11 @@ func (tsm *TestServiceMetrics) AddServiceInstanceMetricsOverflow(
}
}
// All service instance overflows to global bucket.
hash := Hasher{}.
Chain(tsm.sk.ToProto()).
Chain(sik.ToProto()).
Sum()
insertHash(&tsm.tcm.OverflowServiceInstancesEstimator, hash)
hash := protohash.HashServiceInstanceAggregationKey(
protohash.HashServiceAggregationKey(xxhash.Digest{}, tsm.sk.ToProto()),
sik.ToProto(),
)
insertHash(&tsm.tcm.OverflowServiceInstancesEstimator, hash.Sum64())
// Does not save to a map, children of service instance will automatically
// overflow to the global overflow bucket.
return &TestServiceInstanceMetrics{
Expand Down Expand Up @@ -251,18 +253,20 @@ func (tsim *TestServiceInstanceMetrics) AddTransactionOverflow(
from := aggregationpb.TransactionMetricsFromVTPool()
from.Histogram = histogramToProto(hdr)

hash := Hasher{}.
Chain(tsim.tsm.sk.ToProto()).
Chain(tsim.sik.ToProto()).
Chain(tk.ToProto()).
Sum()
hash := protohash.HashTransactionAggregationKey(
protohash.HashServiceInstanceAggregationKey(
protohash.HashServiceAggregationKey(xxhash.Digest{}, tsim.tsm.sk.ToProto()),
tsim.sik.ToProto(),
),
tk.ToProto(),
)
if tsim.tsm.overflow {
// Global overflow
tsim.tsm.tcm.OverflowServices.OverflowTransaction.Merge(from, hash)
tsim.tsm.tcm.OverflowServices.OverflowTransaction.Merge(from, hash.Sum64())
} else {
// Per service overflow
svc := tsim.tsm.tcm.Services[tsim.tsm.sk]
svc.OverflowGroups.OverflowTransaction.Merge(from, hash)
svc.OverflowGroups.OverflowTransaction.Merge(from, hash.Sum64())
tsim.tsm.tcm.Services[tsim.tsm.sk] = svc
}
return tsim
Expand Down Expand Up @@ -320,18 +324,20 @@ func (tsim *TestServiceInstanceMetrics) AddServiceTransactionOverflow(
from.SuccessCount = float64(cfg.count)
}

hash := Hasher{}.
Chain(tsim.tsm.sk.ToProto()).
Chain(tsim.sik.ToProto()).
Chain(stk.ToProto()).
Sum()
hash := protohash.HashServiceTransactionAggregationKey(
protohash.HashServiceInstanceAggregationKey(
protohash.HashServiceAggregationKey(xxhash.Digest{}, tsim.tsm.sk.ToProto()),
tsim.sik.ToProto(),
),
stk.ToProto(),
)
if tsim.tsm.overflow {
// Global overflow
tsim.tsm.tcm.OverflowServices.OverflowServiceTransaction.Merge(from, hash)
tsim.tsm.tcm.OverflowServices.OverflowServiceTransaction.Merge(from, hash.Sum64())
} else {
// Per service overflow
svc := tsim.tsm.tcm.Services[tsim.tsm.sk]
svc.OverflowGroups.OverflowServiceTransaction.Merge(from, hash)
svc.OverflowGroups.OverflowServiceTransaction.Merge(from, hash.Sum64())
tsim.tsm.tcm.Services[tsim.tsm.sk] = svc
}
return tsim
Expand Down Expand Up @@ -375,18 +381,20 @@ func (tsim *TestServiceInstanceMetrics) AddSpanOverflow(
from.Sum += float64(cfg.duration * time.Duration(cfg.count))
from.Count += float64(cfg.count)

hash := Hasher{}.
Chain(tsim.tsm.sk.ToProto()).
Chain(tsim.sik.ToProto()).
Chain(spk.ToProto()).
Sum()
hash := protohash.HashSpanAggregationKey(
protohash.HashServiceInstanceAggregationKey(
protohash.HashServiceAggregationKey(xxhash.Digest{}, tsim.tsm.sk.ToProto()),
tsim.sik.ToProto(),
),
spk.ToProto(),
)
if tsim.tsm.overflow {
// Global overflow
tsim.tsm.tcm.OverflowServices.OverflowSpan.Merge(from, hash)
tsim.tsm.tcm.OverflowServices.OverflowSpan.Merge(from, hash.Sum64())
} else {
// Per service overflow
svc := tsim.tsm.tcm.Services[tsim.tsm.sk]
svc.OverflowGroups.OverflowSpan.Merge(from, hash)
svc.OverflowGroups.OverflowSpan.Merge(from, hash.Sum64())
tsim.tsm.tcm.Services[tsim.tsm.sk] = svc
}
return tsim
Expand All @@ -404,18 +412,37 @@ func (tsim *TestServiceInstanceMetrics) Get() combinedMetrics {
// are not considered.
// combinedMetricsSliceSorters holds cmp options that sort repeated keyed
// metrics by the xxhash of their keys, so protocmp comparisons are
// order-insensitive. Each comparator hashes both keys from a fresh
// xxhash.Digest and orders by the resulting 64-bit sums via
// xxhashDigestLess. (The diff scrape had left the deleted Hasher{}-based
// comparison lines interleaved with their replacements, producing
// duplicate return statements; only the replacement lines are kept here.)
var combinedMetricsSliceSorters = []cmp.Option{
	protocmp.SortRepeated(func(a, b *aggregationpb.KeyedServiceMetrics) bool {
		return xxhashDigestLess(
			protohash.HashServiceAggregationKey(xxhash.Digest{}, a.Key),
			protohash.HashServiceAggregationKey(xxhash.Digest{}, b.Key),
		)
	}),
	protocmp.SortRepeated(func(a, b *aggregationpb.KeyedServiceInstanceMetrics) bool {
		return xxhashDigestLess(
			protohash.HashServiceInstanceAggregationKey(xxhash.Digest{}, a.Key),
			protohash.HashServiceInstanceAggregationKey(xxhash.Digest{}, b.Key),
		)
	}),
	protocmp.SortRepeated(func(a, b *aggregationpb.KeyedTransactionMetrics) bool {
		return xxhashDigestLess(
			protohash.HashTransactionAggregationKey(xxhash.Digest{}, a.Key),
			protohash.HashTransactionAggregationKey(xxhash.Digest{}, b.Key),
		)
	}),
	protocmp.SortRepeated(func(a, b *aggregationpb.KeyedServiceTransactionMetrics) bool {
		return xxhashDigestLess(
			protohash.HashServiceTransactionAggregationKey(xxhash.Digest{}, a.Key),
			protohash.HashServiceTransactionAggregationKey(xxhash.Digest{}, b.Key),
		)
	}),
	protocmp.SortRepeated(func(a, b *aggregationpb.KeyedSpanMetrics) bool {
		return xxhashDigestLess(
			protohash.HashSpanAggregationKey(xxhash.Digest{}, a.Key),
			protohash.HashSpanAggregationKey(xxhash.Digest{}, b.Key),
		)
	}),
}

// xxhashDigestLess reports whether digest a's 64-bit sum orders
// before digest b's. It is used to impose a deterministic order on
// repeated keyed-metrics slices when comparing combined metrics.
func xxhashDigestLess(a, b xxhash.Digest) bool {
	sumA, sumB := a.Sum64(), b.Sum64()
	return sumA < sumB
}
32 changes: 21 additions & 11 deletions aggregators/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/cespare/xxhash/v2"

"github.com/elastic/apm-aggregation/aggregationpb"
"github.com/elastic/apm-aggregation/aggregators/internal/hdrhistogram"
"github.com/elastic/apm-aggregation/aggregators/internal/protohash"
tspb "github.com/elastic/apm-aggregation/aggregators/internal/timestamppb"
"github.com/elastic/apm-aggregation/aggregators/nullable"
"github.com/elastic/apm-data/model/modelpb"
Expand All @@ -38,9 +41,9 @@ var (
// partitionedMetricsBuilder provides support for building partitioned
// sets of metrics from an event.
type partitionedMetricsBuilder struct {
partitions uint16
hasher Hasher
builders []*eventMetricsBuilder // partitioned metrics
partitions uint16
serviceInstanceHash xxhash.Digest
builders []*eventMetricsBuilder // partitioned metrics

// Event metrics are for exactly one service instance, so we create an
// array of a single element and use that for backing the slice in
Expand Down Expand Up @@ -82,7 +85,10 @@ func getPartitionedMetricsBuilder(
}
p.serviceAggregationKey = serviceAggregationKey
p.serviceInstanceAggregationKey = serviceInstanceAggregationKey
p.hasher = Hasher{}.Chain(&p.serviceAggregationKey).Chain(&p.serviceInstanceAggregationKey)
p.serviceInstanceHash = protohash.HashServiceInstanceAggregationKey(
protohash.HashServiceAggregationKey(xxhash.Digest{}, &p.serviceAggregationKey),
&p.serviceInstanceAggregationKey,
)
p.partitions = partitions
return p
}
Expand Down Expand Up @@ -131,8 +137,9 @@ func (p *partitionedMetricsBuilder) processEvent(e *modelpb.APMEvent) {
func (p *partitionedMetricsBuilder) addTransactionMetrics(e *modelpb.APMEvent, count float64, duration time.Duration) {
var key aggregationpb.TransactionAggregationKey
setTransactionKey(e, &key)
hash := protohash.HashTransactionAggregationKey(p.serviceInstanceHash, &key)

mb := p.get(p.hasher.Chain(&key))
mb := p.get(hash)
mb.transactionAggregationKey = key

hdr := hdrhistogram.New()
Expand All @@ -145,8 +152,9 @@ func (p *partitionedMetricsBuilder) addTransactionMetrics(e *modelpb.APMEvent, c
func (p *partitionedMetricsBuilder) addServiceTransactionMetrics(e *modelpb.APMEvent, count float64, duration time.Duration) {
var key aggregationpb.ServiceTransactionAggregationKey
setServiceTransactionKey(e, &key)
hash := protohash.HashServiceTransactionAggregationKey(p.serviceInstanceHash, &key)

mb := p.get(p.hasher.Chain(&key))
mb := p.get(hash)
mb.serviceTransactionAggregationKey = key

if mb.transactionMetrics.Histogram == nil {
Expand Down Expand Up @@ -174,8 +182,9 @@ func (p *partitionedMetricsBuilder) addServiceTransactionMetrics(e *modelpb.APME
func (p *partitionedMetricsBuilder) addDroppedSpanStatsMetrics(dss *modelpb.DroppedSpanStats, repCount float64) {
var key aggregationpb.SpanAggregationKey
setDroppedSpanStatsKey(dss, &key)
hash := protohash.HashSpanAggregationKey(p.serviceInstanceHash, &key)

mb := p.get(p.hasher.Chain(&key))
mb := p.get(hash)
i := len(mb.keyedSpanMetricsSlice)
if i == len(mb.keyedSpanMetricsArray) {
// No more capacity. The spec says that when 128 dropped span
Expand All @@ -194,8 +203,9 @@ func (p *partitionedMetricsBuilder) addDroppedSpanStatsMetrics(dss *modelpb.Drop
func (p *partitionedMetricsBuilder) addSpanMetrics(e *modelpb.APMEvent, repCount float64) {
var key aggregationpb.SpanAggregationKey
setSpanKey(e, &key)
hash := protohash.HashSpanAggregationKey(p.serviceInstanceHash, &key)

mb := p.get(p.hasher.Chain(&key))
mb := p.get(hash)
i := len(mb.keyedSpanMetricsSlice)
mb.spanAggregationKey[i] = key
setSpanMetrics(e, repCount, &mb.spanMetrics[i])
Expand All @@ -208,11 +218,11 @@ func (p *partitionedMetricsBuilder) addServiceSummaryMetrics() {
// There are no actual metric values, we just want to
// create documents for the dimensions, so we can build a
// list of services.
_ = p.get(p.hasher)
_ = p.get(p.serviceInstanceHash)
}

func (p *partitionedMetricsBuilder) get(h Hasher) *eventMetricsBuilder {
partition := uint16(h.Sum() % uint64(p.partitions))
func (p *partitionedMetricsBuilder) get(h xxhash.Digest) *eventMetricsBuilder {
partition := uint16(h.Sum64() % uint64(p.partitions))
for _, mb := range p.builders {
if mb.partition == partition {
return mb
Expand Down
37 changes: 0 additions & 37 deletions aggregators/hasher.go

This file was deleted.

Loading

0 comments on commit 026bec8

Please sign in to comment.