From 4b64bad09790a9e4442d01fe6c98c9931b143268 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 20 Jul 2023 17:59:25 +0800 Subject: [PATCH] Use proto models directly for AggregateBatch (#28) --- Makefile | 2 +- aggregationpb/aggregation_vtproto.pb.go | 465 ++++++++++++++++++++++++ aggregationpb/labels_vtproto.pb.go | 79 ++++ aggregators/aggregator.go | 32 +- aggregators/aggregator_test.go | 41 +-- aggregators/codec.go | 48 ++- aggregators/converter.go | 418 +++++++++++++-------- aggregators/converter_test.go | 240 +++++++++--- aggregators/hasher.go | 118 +++++- aggregators/hasher_test.go | 10 +- aggregators/merger_test.go | 4 + aggregators/models.go | 18 - go.mod | 2 +- go.sum | 4 +- 14 files changed, 1191 insertions(+), 290 deletions(-) diff --git a/Makefile b/Makefile index a2cb1de..e6b7bce 100644 --- a/Makefile +++ b/Makefile @@ -44,7 +44,7 @@ gen-proto: $(PROTOC_GEN_GO) $(PROTOC_GEN_GO_VTPROTO) $(PROTOC) $(eval PROTOC_VT_STRUCTS := $(shell for s in $(STRUCTS); do echo --go-vtproto_opt=pool=./aggregationpb.$$s ;done)) $(PROTOC) -I . 
--go_out=$(PROTOC_OUT) --plugin protoc-gen-go="$(PROTOC_GEN_GO)" \ --go-vtproto_out=$(PROTOC_OUT) --plugin protoc-gen-go-vtproto="$(PROTOC_GEN_GO_VTPROTO)" \ - --go-vtproto_opt=features=marshal+unmarshal+size+pool \ + --go-vtproto_opt=features=marshal+unmarshal+size+pool+clone \ $(PROTOC_VT_STRUCTS) \ $(wildcard proto/*.proto) $(MAKE) fmt diff --git a/aggregationpb/aggregation_vtproto.pb.go b/aggregationpb/aggregation_vtproto.pb.go index 59cde22..c93f8dc 100644 --- a/aggregationpb/aggregation_vtproto.pb.go +++ b/aggregationpb/aggregation_vtproto.pb.go @@ -16,6 +16,7 @@ import ( bits "math/bits" sync "sync" + proto "google.golang.org/protobuf/proto" protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) @@ -26,6 +27,470 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +func (m *CombinedMetrics) CloneVT() *CombinedMetrics { + if m == nil { + return (*CombinedMetrics)(nil) + } + r := &CombinedMetrics{ + OverflowServices: m.OverflowServices.CloneVT(), + EventsTotal: m.EventsTotal, + YoungestEventTimestamp: m.YoungestEventTimestamp, + } + if rhs := m.ServiceMetrics; rhs != nil { + tmpContainer := make([]*KeyedServiceMetrics, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.ServiceMetrics = tmpContainer + } + if rhs := m.OverflowServiceInstancesEstimator; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.OverflowServiceInstancesEstimator = tmpBytes + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *CombinedMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *KeyedServiceMetrics) CloneVT() *KeyedServiceMetrics { + if m == nil { + return (*KeyedServiceMetrics)(nil) + } + r := &KeyedServiceMetrics{ + Key: m.Key.CloneVT(), + Metrics: m.Metrics.CloneVT(), + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + 
copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *KeyedServiceMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *ServiceAggregationKey) CloneVT() *ServiceAggregationKey { + if m == nil { + return (*ServiceAggregationKey)(nil) + } + r := &ServiceAggregationKey{ + Timestamp: m.Timestamp, + ServiceName: m.ServiceName, + ServiceEnvironment: m.ServiceEnvironment, + ServiceLanguageName: m.ServiceLanguageName, + AgentName: m.AgentName, + } + if rhs := m.GlobalLabelsStr; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.GlobalLabelsStr = tmpBytes + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *ServiceAggregationKey) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *ServiceMetrics) CloneVT() *ServiceMetrics { + if m == nil { + return (*ServiceMetrics)(nil) + } + r := &ServiceMetrics{ + OverflowGroups: m.OverflowGroups.CloneVT(), + } + if rhs := m.ServiceInstanceMetrics; rhs != nil { + tmpContainer := make([]*KeyedServiceInstanceMetrics, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.ServiceInstanceMetrics = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *ServiceMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *ServiceInstanceAggregationKey) CloneVT() *ServiceInstanceAggregationKey { + if m == nil { + return (*ServiceInstanceAggregationKey)(nil) + } + r := &ServiceInstanceAggregationKey{} + if rhs := m.GlobalLabelsStr; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.GlobalLabelsStr = tmpBytes + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m 
*ServiceInstanceAggregationKey) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *ServiceInstanceMetrics) CloneVT() *ServiceInstanceMetrics { + if m == nil { + return (*ServiceInstanceMetrics)(nil) + } + r := &ServiceInstanceMetrics{} + if rhs := m.TransactionMetrics; rhs != nil { + tmpContainer := make([]*KeyedTransactionMetrics, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.TransactionMetrics = tmpContainer + } + if rhs := m.ServiceTransactionMetrics; rhs != nil { + tmpContainer := make([]*KeyedServiceTransactionMetrics, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.ServiceTransactionMetrics = tmpContainer + } + if rhs := m.SpanMetrics; rhs != nil { + tmpContainer := make([]*KeyedSpanMetrics, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.SpanMetrics = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *ServiceInstanceMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *KeyedServiceInstanceMetrics) CloneVT() *KeyedServiceInstanceMetrics { + if m == nil { + return (*KeyedServiceInstanceMetrics)(nil) + } + r := &KeyedServiceInstanceMetrics{ + Key: m.Key.CloneVT(), + Metrics: m.Metrics.CloneVT(), + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *KeyedServiceInstanceMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *KeyedTransactionMetrics) CloneVT() *KeyedTransactionMetrics { + if m == nil { + return (*KeyedTransactionMetrics)(nil) + } + r := &KeyedTransactionMetrics{ + Key: m.Key.CloneVT(), + Metrics: m.Metrics.CloneVT(), + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func 
(m *KeyedTransactionMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *TransactionAggregationKey) CloneVT() *TransactionAggregationKey { + if m == nil { + return (*TransactionAggregationKey)(nil) + } + r := &TransactionAggregationKey{ + TraceRoot: m.TraceRoot, + ContainerId: m.ContainerId, + KubernetesPodName: m.KubernetesPodName, + ServiceVersion: m.ServiceVersion, + ServiceNodeName: m.ServiceNodeName, + ServiceRuntimeName: m.ServiceRuntimeName, + ServiceRuntimeVersion: m.ServiceRuntimeVersion, + ServiceLanguageVersion: m.ServiceLanguageVersion, + HostHostname: m.HostHostname, + HostName: m.HostName, + HostOsPlatform: m.HostOsPlatform, + EventOutcome: m.EventOutcome, + TransactionName: m.TransactionName, + TransactionType: m.TransactionType, + TransactionResult: m.TransactionResult, + FaasColdstart: m.FaasColdstart, + FaasId: m.FaasId, + FaasName: m.FaasName, + FaasVersion: m.FaasVersion, + FaasTriggerType: m.FaasTriggerType, + CloudProvider: m.CloudProvider, + CloudRegion: m.CloudRegion, + CloudAvailabilityZone: m.CloudAvailabilityZone, + CloudServiceName: m.CloudServiceName, + CloudAccountId: m.CloudAccountId, + CloudAccountName: m.CloudAccountName, + CloudMachineType: m.CloudMachineType, + CloudProjectId: m.CloudProjectId, + CloudProjectName: m.CloudProjectName, + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *TransactionAggregationKey) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *TransactionMetrics) CloneVT() *TransactionMetrics { + if m == nil { + return (*TransactionMetrics)(nil) + } + r := &TransactionMetrics{ + Histogram: m.Histogram.CloneVT(), + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *TransactionMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m 
*KeyedServiceTransactionMetrics) CloneVT() *KeyedServiceTransactionMetrics { + if m == nil { + return (*KeyedServiceTransactionMetrics)(nil) + } + r := &KeyedServiceTransactionMetrics{ + Key: m.Key.CloneVT(), + Metrics: m.Metrics.CloneVT(), + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *KeyedServiceTransactionMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *ServiceTransactionAggregationKey) CloneVT() *ServiceTransactionAggregationKey { + if m == nil { + return (*ServiceTransactionAggregationKey)(nil) + } + r := &ServiceTransactionAggregationKey{ + TransactionType: m.TransactionType, + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *ServiceTransactionAggregationKey) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *ServiceTransactionMetrics) CloneVT() *ServiceTransactionMetrics { + if m == nil { + return (*ServiceTransactionMetrics)(nil) + } + r := &ServiceTransactionMetrics{ + Histogram: m.Histogram.CloneVT(), + FailureCount: m.FailureCount, + SuccessCount: m.SuccessCount, + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *ServiceTransactionMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *KeyedSpanMetrics) CloneVT() *KeyedSpanMetrics { + if m == nil { + return (*KeyedSpanMetrics)(nil) + } + r := &KeyedSpanMetrics{ + Key: m.Key.CloneVT(), + Metrics: m.Metrics.CloneVT(), + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *KeyedSpanMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *SpanAggregationKey) CloneVT() 
*SpanAggregationKey { + if m == nil { + return (*SpanAggregationKey)(nil) + } + r := &SpanAggregationKey{ + SpanName: m.SpanName, + Outcome: m.Outcome, + TargetType: m.TargetType, + TargetName: m.TargetName, + Resource: m.Resource, + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *SpanAggregationKey) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *SpanMetrics) CloneVT() *SpanMetrics { + if m == nil { + return (*SpanMetrics)(nil) + } + r := &SpanMetrics{ + Count: m.Count, + Sum: m.Sum, + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *SpanMetrics) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *CountValue) CloneVT() *CountValue { + if m == nil { + return (*CountValue)(nil) + } + r := &CountValue{ + Count: m.Count, + Value: m.Value, + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *CountValue) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *HDRHistogram) CloneVT() *HDRHistogram { + if m == nil { + return (*HDRHistogram)(nil) + } + r := &HDRHistogram{ + LowestTrackableValue: m.LowestTrackableValue, + HighestTrackableValue: m.HighestTrackableValue, + SignificantFigures: m.SignificantFigures, + } + if rhs := m.Counts; rhs != nil { + tmpContainer := make([]int64, len(rhs)) + copy(tmpContainer, rhs) + r.Counts = tmpContainer + } + if rhs := m.Buckets; rhs != nil { + tmpContainer := make([]int32, len(rhs)) + copy(tmpContainer, rhs) + r.Buckets = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *HDRHistogram) CloneMessageVT() proto.Message { + return 
m.CloneVT() +} + +func (m *Overflow) CloneVT() *Overflow { + if m == nil { + return (*Overflow)(nil) + } + r := &Overflow{ + OverflowTransactions: m.OverflowTransactions.CloneVT(), + OverflowServiceTransactions: m.OverflowServiceTransactions.CloneVT(), + OverflowSpans: m.OverflowSpans.CloneVT(), + } + if rhs := m.OverflowTransactionsEstimator; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.OverflowTransactionsEstimator = tmpBytes + } + if rhs := m.OverflowServiceTransactionsEstimator; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.OverflowServiceTransactionsEstimator = tmpBytes + } + if rhs := m.OverflowSpansEstimator; rhs != nil { + tmpBytes := make([]byte, len(rhs)) + copy(tmpBytes, rhs) + r.OverflowSpansEstimator = tmpBytes + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *Overflow) CloneMessageVT() proto.Message { + return m.CloneVT() +} + func (m *CombinedMetrics) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil diff --git a/aggregationpb/labels_vtproto.pb.go b/aggregationpb/labels_vtproto.pb.go index adad2f0..fb64476 100644 --- a/aggregationpb/labels_vtproto.pb.go +++ b/aggregationpb/labels_vtproto.pb.go @@ -15,6 +15,7 @@ import ( math "math" sync "sync" + proto "google.golang.org/protobuf/proto" protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) @@ -25,6 +26,84 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +func (m *GlobalLabels) CloneVT() *GlobalLabels { + if m == nil { + return (*GlobalLabels)(nil) + } + r := &GlobalLabels{} + if rhs := m.Labels; rhs != nil { + tmpContainer := make([]*Label, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() + } + r.Labels = tmpContainer + } + if rhs := m.NumericLabels; rhs != nil { + tmpContainer := make([]*NumericLabel, len(rhs)) + for k, v := range rhs { + tmpContainer[k] = v.CloneVT() 
+ } + r.NumericLabels = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *GlobalLabels) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *Label) CloneVT() *Label { + if m == nil { + return (*Label)(nil) + } + r := &Label{ + Key: m.Key, + Value: m.Value, + } + if rhs := m.Values; rhs != nil { + tmpContainer := make([]string, len(rhs)) + copy(tmpContainer, rhs) + r.Values = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *Label) CloneMessageVT() proto.Message { + return m.CloneVT() +} + +func (m *NumericLabel) CloneVT() *NumericLabel { + if m == nil { + return (*NumericLabel)(nil) + } + r := &NumericLabel{ + Key: m.Key, + Value: m.Value, + } + if rhs := m.Values; rhs != nil { + tmpContainer := make([]float64, len(rhs)) + copy(tmpContainer, rhs) + r.Values = tmpContainer + } + if len(m.unknownFields) > 0 { + r.unknownFields = make([]byte, len(m.unknownFields)) + copy(r.unknownFields, m.unknownFields) + } + return r +} + +func (m *NumericLabel) CloneMessageVT() proto.Message { + return m.CloneVT() +} + func (m *GlobalLabels) MarshalVT() (dAtA []byte, err error) { if m == nil { return nil, nil diff --git a/aggregators/aggregator.go b/aggregators/aggregator.go index 788c8ed..35ae979 100644 --- a/aggregators/aggregator.go +++ b/aggregators/aggregator.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "github.com/elastic/apm-aggregation/aggregationpb" "github.com/elastic/apm-aggregation/aggregators/internal/telemetry" "github.com/elastic/apm-data/model/modelpb" ) @@ -182,7 +183,7 @@ func (a *Aggregator) AggregateBatch( func (a *Aggregator) AggregateCombinedMetrics( ctx context.Context, cmk CombinedMetricsKey, - cm CombinedMetrics, + cm *aggregationpb.CombinedMetrics, ) error { 
cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(cmk.ID) traceAttrs := append(append([]attribute.KeyValue{}, cmIDAttrs...), @@ -210,7 +211,7 @@ func (a *Aggregator) AggregateCombinedMetrics( a.cachedStats[cmk.Interval] = make(map[[16]byte]stats) } cmStats := a.cachedStats[cmk.Interval][cmk.ID] - cmStats.eventsTotal += cm.eventsTotal + cmStats.eventsTotal += cm.EventsTotal a.cachedStats[cmk.Interval][cmk.ID] = cmStats span.SetAttributes(attribute.Int("bytes_ingested", bytesIn)) @@ -350,17 +351,15 @@ func (a *Aggregator) aggregateAPMEvent( cmk CombinedMetricsKey, e *modelpb.APMEvent, ) (int, error) { - kvs, err := EventToCombinedMetrics(e, cmk, a.cfg.Partitioner) - if err != nil { - return 0, fmt.Errorf("failed to convert event to combined metrics: %w", err) - } var totalBytesIn int - for cmk, cm := range kvs { - bytesIn, err := a.aggregate(ctx, cmk, *cm) + aggregateFunc := func(k CombinedMetricsKey, m *aggregationpb.CombinedMetrics) error { + bytesIn, err := a.aggregate(ctx, k, m) totalBytesIn += bytesIn - if err != nil { - return totalBytesIn, fmt.Errorf("failed to aggregate combined metrics: %w", err) - } + return err + } + err := EventToCombinedMetrics(e, cmk, a.cfg.Partitioner, aggregateFunc) + if err != nil { + return 0, fmt.Errorf("failed to aggregate combined metrics: %w", err) } return totalBytesIn, nil } @@ -370,29 +369,26 @@ func (a *Aggregator) aggregateAPMEvent( func (a *Aggregator) aggregate( ctx context.Context, cmk CombinedMetricsKey, - cm CombinedMetrics, + cm *aggregationpb.CombinedMetrics, ) (int, error) { - cmproto := cm.ToProto() - defer cmproto.ReturnToVTPool() - if a.batch == nil { // Batch is backed by a sync pool. After each commit we will release the batch // back to the pool by calling Batch#Close and subsequently acquire a new batch. 
a.batch = a.db.NewBatch() } - op := a.batch.MergeDeferred(cmk.SizeBinary(), cmproto.SizeVT()) + op := a.batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT()) if err := cmk.MarshalBinaryToSizedBuffer(op.Key); err != nil { return 0, fmt.Errorf("failed to marshal combined metrics key: %w", err) } - if _, err := cmproto.MarshalToSizedBufferVT(op.Value); err != nil { + if _, err := cm.MarshalToSizedBufferVT(op.Value); err != nil { return 0, fmt.Errorf("failed to marshal combined metrics: %w", err) } if err := op.Finish(); err != nil { return 0, fmt.Errorf("failed to finalize merge operation: %w", err) } - bytesIn := cmproto.SizeVT() + bytesIn := cm.SizeVT() if a.batch.Len() >= dbCommitThresholdBytes { if err := a.batch.Commit(pebble.Sync); err != nil { return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err) diff --git a/aggregators/aggregator_test.go b/aggregators/aggregator_test.go index 1d9a0cd..6f83957 100644 --- a/aggregators/aggregator_test.go +++ b/aggregators/aggregator_test.go @@ -155,7 +155,7 @@ func TestAggregateBatch(t *testing.T) { { Samples: map[string]apmmodel.Metric{ "aggregator.requests.total": {Value: 1}, - "aggregator.bytes.ingested": {Value: 149750}, + "aggregator.bytes.ingested": {Value: 133750}, }, Labels: apmmodel.StringMap{ apmmodel.StringMapItem{Key: "id_key", Value: string(cmID[:])}, @@ -788,7 +788,7 @@ func TestHarvest(t *testing.T) { expectedMeasurements = append(expectedMeasurements, apmmodel.Metrics{ Samples: map[string]apmmodel.Metric{ "aggregator.requests.total": {Value: 1}, - "aggregator.bytes.ingested": {Value: 318}, + "aggregator.bytes.ingested": {Value: 270}, }, Labels: apmmodel.StringMap{ apmmodel.StringMapItem{Key: "id_key", Value: string(cmID[:])}, @@ -1080,7 +1080,6 @@ func TestRunStopOrchestration(t *testing.T) { } func BenchmarkAggregateCombinedMetrics(b *testing.B) { - b.ReportAllocs() gatherer, err := apmotel.NewGatherer() if err != nil { b.Fatal(err) @@ -1117,28 +1116,28 @@ func BenchmarkAggregateCombinedMetrics(b 
*testing.B) { ProcessingTime: time.Now().Truncate(aggIvl), ID: EncodeToCombinedMetricsKeyID(b, "ab01"), } - kvs, err := EventToCombinedMetrics( - &modelpb.APMEvent{ - Event: &modelpb.Event{Duration: durationpb.New(time.Millisecond)}, - Transaction: &modelpb.Transaction{ - Name: "T-1000", - Type: "type", - RepresentativeCount: 1, - }, - }, - cmk, NewHashPartitioner(1), - ) - if err != nil { - b.Fatal(err) - } + cm := (*CombinedMetrics)(createTestCombinedMetrics(withEventsTotal(1)). + addServiceTransaction( + time.Now(), + "test-svc", + "", + testServiceTransaction{txnType: "txntype", count: 1}, + ). + addTransaction( + time.Now(), + "test-svc", + "", + testTransaction{txnName: "txntest", txnType: "txntype", count: 1}, + ), + ).ToProto() + b.Cleanup(func() { cm.ReturnToVTPool() }) ctx, cancel := context.WithCancel(context.Background()) b.Cleanup(func() { cancel() }) + b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - for cmk, cm := range kvs { - if err := agg.AggregateCombinedMetrics(ctx, cmk, *cm); err != nil { - b.Fatal(err) - } + if err := agg.AggregateCombinedMetrics(ctx, cmk, cm); err != nil { + b.Fatal(err) } } } diff --git a/aggregators/codec.go b/aggregators/codec.go index 8e7e144..f8746a4 100644 --- a/aggregators/codec.go +++ b/aggregators/codec.go @@ -90,8 +90,10 @@ func (m *CombinedMetrics) ToProto() *aggregationpb.CombinedMetrics { ksm.Metrics = m.ToProto() pb.ServiceMetrics = append(pb.ServiceMetrics, ksm) } - pb.OverflowServices = m.OverflowServices.ToProto() - pb.OverflowServiceInstancesEstimator = hllBytes(m.OverflowServiceInstancesEstimator) + if m.OverflowServiceInstancesEstimator != nil { + pb.OverflowServices = m.OverflowServices.ToProto() + pb.OverflowServiceInstancesEstimator = hllBytes(m.OverflowServiceInstancesEstimator) + } pb.EventsTotal = m.eventsTotal pb.YoungestEventTimestamp = timestamppb.TimeToPBTimestamp(m.youngestEventTimestamp) return pb @@ -109,8 +111,8 @@ func (m *CombinedMetrics) FromProto(pb 
*aggregationpb.CombinedMetrics) { } if pb.OverflowServices != nil { m.OverflowServices.FromProto(pb.OverflowServices) + m.OverflowServiceInstancesEstimator = hllSketch(pb.OverflowServiceInstancesEstimator) } - m.OverflowServiceInstancesEstimator = hllSketch(pb.OverflowServiceInstancesEstimator) m.eventsTotal = pb.EventsTotal m.youngestEventTimestamp = timestamppb.PBTimestampToTime(pb.YoungestEventTimestamp) } @@ -179,7 +181,9 @@ func (m *ServiceMetrics) FromProto(pb *aggregationpb.ServiceMetrics) { v.FromProto(ksim.Metrics) m.ServiceInstanceGroups[k] = v } - m.OverflowGroups.FromProto(pb.OverflowGroups) + if pb.OverflowGroups != nil { + m.OverflowGroups.FromProto(pb.OverflowGroups) + } } // ToProto converts ServiceInstanceAggregationKey to its protobuf representation. @@ -467,23 +471,35 @@ func (m *SpanMetrics) FromProto(pb *aggregationpb.SpanMetrics) { // ToProto converts Overflow to its protobuf representation. func (o *Overflow) ToProto() *aggregationpb.Overflow { pb := aggregationpb.OverflowFromVTPool() - pb.OverflowTransactions = o.OverflowTransaction.Metrics.ToProto() - pb.OverflowServiceTransactions = o.OverflowServiceTransaction.Metrics.ToProto() - pb.OverflowSpans = o.OverflowSpan.Metrics.ToProto() - pb.OverflowTransactionsEstimator = hllBytes(o.OverflowTransaction.Estimator) - pb.OverflowServiceTransactionsEstimator = hllBytes(o.OverflowServiceTransaction.Estimator) - pb.OverflowSpansEstimator = hllBytes(o.OverflowSpan.Estimator) + if !o.OverflowTransaction.Empty() { + pb.OverflowTransactions = o.OverflowTransaction.Metrics.ToProto() + pb.OverflowTransactionsEstimator = hllBytes(o.OverflowTransaction.Estimator) + } + if !o.OverflowServiceTransaction.Empty() { + pb.OverflowServiceTransactions = o.OverflowServiceTransaction.Metrics.ToProto() + pb.OverflowServiceTransactionsEstimator = hllBytes(o.OverflowServiceTransaction.Estimator) + } + if !o.OverflowSpan.Empty() { + pb.OverflowSpans = o.OverflowSpan.Metrics.ToProto() + pb.OverflowSpansEstimator = 
hllBytes(o.OverflowSpan.Estimator) + } return pb } // FromProto converts protobuf representation to Overflow. func (o *Overflow) FromProto(pb *aggregationpb.Overflow) { - o.OverflowTransaction.Metrics.FromProto(pb.OverflowTransactions) - o.OverflowServiceTransaction.Metrics.FromProto(pb.OverflowServiceTransactions) - o.OverflowSpan.Metrics.FromProto(pb.OverflowSpans) - o.OverflowTransaction.Estimator = hllSketch(pb.OverflowTransactionsEstimator) - o.OverflowServiceTransaction.Estimator = hllSketch(pb.OverflowServiceTransactionsEstimator) - o.OverflowSpan.Estimator = hllSketch(pb.OverflowSpansEstimator) + if pb.OverflowTransactions != nil { + o.OverflowTransaction.Metrics.FromProto(pb.OverflowTransactions) + o.OverflowTransaction.Estimator = hllSketch(pb.OverflowTransactionsEstimator) + } + if pb.OverflowServiceTransactions != nil { + o.OverflowServiceTransaction.Metrics.FromProto(pb.OverflowServiceTransactions) + o.OverflowServiceTransaction.Estimator = hllSketch(pb.OverflowServiceTransactionsEstimator) + } + if pb.OverflowSpans != nil { + o.OverflowSpan.Metrics.FromProto(pb.OverflowSpans) + o.OverflowSpan.Estimator = hllSketch(pb.OverflowSpansEstimator) + } } // ToProto converts GlobalLabels to its protobuf representation. 
diff --git a/aggregators/converter.go b/aggregators/converter.go index bed2fa1..223d23c 100644 --- a/aggregators/converter.go +++ b/aggregators/converter.go @@ -5,14 +5,19 @@ package aggregators import ( + "errors" "fmt" "math" "time" "github.com/axiomhq/hyperloglog" + "golang.org/x/exp/slices" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" + "github.com/elastic/apm-aggregation/aggregationpb" + "github.com/elastic/apm-aggregation/aggregators/internal/hdrhistogram" + tspb "github.com/elastic/apm-aggregation/aggregators/internal/timestamppb" "github.com/elastic/apm-data/model/modelpb" ) @@ -25,114 +30,123 @@ const ( overflowBucketName = "_other" ) -func setMetricCountBasedOnOutcome(stm *ServiceTransactionMetrics, from *modelpb.APMEvent) { - txn := from.GetTransaction() - switch from.GetEvent().GetOutcome() { - case "failure": - stm.FailureCount = txn.GetRepresentativeCount() - case "success": - stm.SuccessCount = txn.GetRepresentativeCount() - } -} - -// EventToCombinedMetrics converts APMEvent to one or more CombinedMetrics. -// If an event results in multiple metrics, they may be spread across different partitions. +// EventToCombinedMetrics converts APMEvent to one or more CombinedMetrics and +// calls the provided callback for each pair of CombinedMetricsKey and +// CombinedMetrics. The callback MUST NOT hold the reference of the passed +// CombinedMetrics. If required, the callback can call CloneVT to clone the +// CombinedMetrics. If an event results in multiple metrics, they may be spread +// across different partitions. // -// EventToCombinedMetrics will never produce overflow metrics, as it applies to a single APMEvent. +// EventToCombinedMetrics will never produce overflow metrics, as it applies to a +// single APMEvent. 
func EventToCombinedMetrics( e *modelpb.APMEvent, unpartitionedKey CombinedMetricsKey, partitioner Partitioner, -) (map[CombinedMetricsKey]*CombinedMetrics, error) { - var gl GlobalLabels - gl.fromLabelsAndNumericLabels(e.Labels, e.NumericLabels) - gls, err := gl.MarshalString() + callback func(CombinedMetricsKey, *aggregationpb.CombinedMetrics) error, +) error { + svcKey := serviceKey(e, unpartitionedKey.Interval) + svcInstanceKey, err := serviceInstanceKey(e) if err != nil { - return nil, err + return err } + hasher := Hasher{}. + Chain(serviceKeyHasher(svcKey)). + Chain(serviceInstanceKeyHasher(svcInstanceKey)) - kvs := make(map[CombinedMetricsKey]*CombinedMetrics) - svcKey := serviceKey(e, unpartitionedKey.Interval) - svcInstanceKey := ServiceInstanceAggregationKey{GlobalLabelsStr: gls} - hasher := Hasher{}.Chain(svcKey).Chain(svcInstanceKey) - - setCombinedMetrics := func(k CombinedMetricsKey, sim ServiceInstanceMetrics) { - cm, ok := kvs[k] + // m collects service instance metrics for each partition + m := make(map[uint16]*aggregationpb.ServiceInstanceMetrics) + addToM := func(partitionID uint16, from *aggregationpb.ServiceInstanceMetrics) { + to, ok := m[partitionID] if !ok { - cm = &CombinedMetrics{ - Services: map[ServiceAggregationKey]ServiceMetrics{ - svcKey: ServiceMetrics{ - ServiceInstanceGroups: map[ServiceInstanceAggregationKey]ServiceInstanceMetrics{}, - }, - }, - } - kvs[k] = cm - } - svcInstanceM := cm.Services[svcKey].ServiceInstanceGroups[svcInstanceKey] - for k, v := range sim.TransactionGroups { - if svcInstanceM.TransactionGroups == nil { - svcInstanceM.TransactionGroups = make(map[TransactionAggregationKey]TransactionMetrics) - } - svcInstanceM.TransactionGroups[k] = v - } - for k, v := range sim.ServiceTransactionGroups { - if svcInstanceM.ServiceTransactionGroups == nil { - svcInstanceM.ServiceTransactionGroups = make(map[ServiceTransactionAggregationKey]ServiceTransactionMetrics) - } - svcInstanceM.ServiceTransactionGroups[k] = v - } - 
for k, v := range sim.SpanGroups { - if svcInstanceM.SpanGroups == nil { - svcInstanceM.SpanGroups = make(map[SpanAggregationKey]SpanMetrics) - } - svcInstanceM.SpanGroups[k] = v + m[partitionID] = from + return } - cm.Services[svcKey].ServiceInstanceGroups[svcInstanceKey] = svcInstanceM + to.ServiceTransactionMetrics = mergeSlices[aggregationpb.KeyedServiceTransactionMetrics]( + to.ServiceTransactionMetrics, from.ServiceTransactionMetrics, + ) + from.ServiceTransactionMetrics = nil + to.TransactionMetrics = mergeSlices[aggregationpb.KeyedTransactionMetrics]( + to.TransactionMetrics, from.TransactionMetrics, + ) + from.TransactionMetrics = nil + to.SpanMetrics = mergeSlices[aggregationpb.KeyedSpanMetrics]( + to.SpanMetrics, from.SpanMetrics, + ) + from.SpanMetrics = nil + from.ReturnToVTPool() } switch e.Type() { case modelpb.TransactionEventType: repCount := e.GetTransaction().GetRepresentativeCount() if repCount <= 0 { - return nil, nil + return nil } - tm := newTransactionMetrics() - tm.Histogram.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount) + + hdr := hdrhistogram.New() + hdr.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount) + + // Transaction metrics + tm := aggregationpb.TransactionMetricsFromVTPool() + tm.Histogram = HistogramToProto(hdr) + txnKey := transactionKey(e) - cmk := unpartitionedKey - cmk.PartitionID = partitioner.Partition(hasher.Chain(txnKey).Sum()) - setCombinedMetrics(cmk, ServiceInstanceMetrics{ - TransactionGroups: map[TransactionAggregationKey]TransactionMetrics{txnKey: tm}, - }) - - stm := newServiceTransactionMetrics() - stm.Histogram.RecordDuration(e.GetEvent().GetDuration().AsDuration(), repCount) - setMetricCountBasedOnOutcome(&stm, e) + ktm := aggregationpb.KeyedTransactionMetricsFromVTPool() + ktm.Key, ktm.Metrics = txnKey, tm + + svcInstanceMetrics := aggregationpb.ServiceInstanceMetricsFromVTPool() + svcInstanceMetrics.TransactionMetrics = append( + svcInstanceMetrics.TransactionMetrics, ktm, + ) 
+ partitionID := partitioner.Partition( + hasher.Chain(transactionKeyHasher(txnKey)).Sum(), + ) + addToM(partitionID, svcInstanceMetrics) + + // Service Transaction metrics + stm := aggregationpb.ServiceTransactionMetricsFromVTPool() + stm.Histogram = HistogramToProto(hdr) // use same histogram + setMetricCountBasedOnOutcome(stm, e) + svcTxnKey := serviceTransactionKey(e) - cmk.PartitionID = partitioner.Partition(hasher.Chain(svcTxnKey).Sum()) - setCombinedMetrics(cmk, ServiceInstanceMetrics{ - ServiceTransactionGroups: map[ServiceTransactionAggregationKey]ServiceTransactionMetrics{svcTxnKey: stm}, - }) + kstm := aggregationpb.KeyedServiceTransactionMetricsFromVTPool() + kstm.Key, kstm.Metrics = svcTxnKey, stm - // Handle dropped span stats + svcInstanceMetrics = aggregationpb.ServiceInstanceMetricsFromVTPool() + svcInstanceMetrics.ServiceTransactionMetrics = append( + svcInstanceMetrics.ServiceTransactionMetrics, kstm, + ) + partitionID = partitioner.Partition( + hasher.Chain(serviceTransactionKeyHasher(svcTxnKey)).Sum(), + ) + addToM(partitionID, svcInstanceMetrics) + + // Dropped span stats for _, dss := range e.GetTransaction().GetDroppedSpansStats() { + spm := aggregationpb.SpanMetricsFromVTPool() + spm.Count = float64(dss.GetDuration().GetCount()) * repCount + spm.Sum = float64(dss.GetDuration().GetSum().AsDuration()) * repCount + dssKey := droppedSpanStatsKey(dss) - cmk.PartitionID = partitioner.Partition(hasher.Chain(dssKey).Sum()) - setCombinedMetrics(cmk, ServiceInstanceMetrics{ - SpanGroups: map[SpanAggregationKey]SpanMetrics{ - dssKey: SpanMetrics{ - Count: float64(dss.GetDuration().GetCount()) * repCount, - Sum: float64(dss.GetDuration().GetSum().AsDuration()) * repCount, - }, - }, - }) + kspm := aggregationpb.KeyedSpanMetricsFromVTPool() + kspm.Key, kspm.Metrics = dssKey, spm + + svcInstanceMetrics = aggregationpb.ServiceInstanceMetricsFromVTPool() + svcInstanceMetrics.SpanMetrics = append( + svcInstanceMetrics.SpanMetrics, kspm, + ) + partitionID = 
partitioner.Partition( + hasher.Chain(spanKeyHasher(dssKey)).Sum(), + ) + addToM(partitionID, svcInstanceMetrics) } case modelpb.SpanEventType: target := e.GetService().GetTarget() repCount := e.GetSpan().GetRepresentativeCount() destSvc := e.GetSpan().GetDestinationService().GetResource() if repCount <= 0 || (target == nil && destSvc == "") { - return nil, nil + return nil } var count uint32 = 1 @@ -141,28 +155,59 @@ func EventToCombinedMetrics( count = composite.GetCount() duration = time.Duration(composite.GetSum() * float64(time.Millisecond)) } + + spm := aggregationpb.SpanMetricsFromVTPool() + spm.Count = float64(count) * repCount + spm.Sum = float64(duration) * repCount + spanKey := spanKey(e) - cmk := unpartitionedKey - cmk.PartitionID = partitioner.Partition(hasher.Chain(spanKey).Sum()) - setCombinedMetrics(cmk, ServiceInstanceMetrics{ - SpanGroups: map[SpanAggregationKey]SpanMetrics{ - spanKey: SpanMetrics{ - Count: float64(count) * repCount, - Sum: float64(duration) * repCount, - }, - }, - }) + kspm := aggregationpb.KeyedSpanMetricsFromVTPool() + kspm.Key, kspm.Metrics = spanKey, spm + + svcInstanceMetrics := aggregationpb.ServiceInstanceMetricsFromVTPool() + svcInstanceMetrics.SpanMetrics = append( + svcInstanceMetrics.SpanMetrics, kspm, + ) + partitionID := partitioner.Partition( + hasher.Chain(spanKeyHasher(spanKey)).Sum(), + ) + addToM(partitionID, svcInstanceMetrics) } // Approximate events total by uniformly distributing the events total // amongst the partitioned key values. 
- weightedEventsTotal := 1 / float64(len(kvs)) - eventTS := e.GetEvent().GetReceived().AsTime() - for _, cm := range kvs { - cm.eventsTotal = weightedEventsTotal - cm.youngestEventTimestamp = eventTS + weightedEventsTotal := 1 / float64(len(m)) + eventTS := tspb.TimeToPBTimestamp(e.GetEvent().GetReceived().AsTime()) + + ksim := aggregationpb.KeyedServiceInstanceMetricsFromVTPool() + ksim.Key = svcInstanceKey + + ksm := aggregationpb.KeyedServiceMetricsFromVTPool() + ksm.Key, ksm.Metrics = svcKey, aggregationpb.ServiceMetricsFromVTPool() + ksm.Metrics.ServiceInstanceMetrics = append(ksm.Metrics.ServiceInstanceMetrics, ksim) + + cm := aggregationpb.CombinedMetricsFromVTPool() + defer cm.ReturnToVTPool() + cm.ServiceMetrics = append(cm.ServiceMetrics, ksm) + + var errs []error + for partitionID, svcInstanceMetrics := range m { + key := unpartitionedKey + key.PartitionID = partitionID + + cm.ServiceMetrics[0].Metrics.ServiceInstanceMetrics[0].Metrics = svcInstanceMetrics + cm.EventsTotal = weightedEventsTotal + cm.YoungestEventTimestamp = uint64(eventTS) + if err := callback(key, cm); err != nil { + errs = append(errs, err) + } + cm.ServiceMetrics[0].Metrics.ServiceInstanceMetrics[0].Metrics = nil + svcInstanceMetrics.ReturnToVTPool() + } + if len(errs) > 0 { + return fmt.Errorf("failed while executing callback: %w", errors.Join(errs...)) } - return kvs, nil + return nil } // CombinedMetricsToBatch converts CombinedMetrics to a batch of APMEvents. 
@@ -679,70 +724,107 @@ func overflowSpanMetricsToAPMEvent( baseEvent.Metricset.DocCount = int64(overflowCount) } -func serviceKey(e *modelpb.APMEvent, aggInterval time.Duration) ServiceAggregationKey { - return ServiceAggregationKey{ - Timestamp: e.GetTimestamp().AsTime().Truncate(aggInterval), - ServiceName: e.GetService().GetName(), - ServiceEnvironment: e.GetService().GetEnvironment(), - ServiceLanguageName: e.GetService().GetLanguage().GetName(), - AgentName: e.GetAgent().GetName(), +func serviceKey(e *modelpb.APMEvent, aggInterval time.Duration) *aggregationpb.ServiceAggregationKey { + key := aggregationpb.ServiceAggregationKeyFromVTPool() + key.Timestamp = tspb.TimeToPBTimestamp( + e.GetTimestamp().AsTime().Truncate(aggInterval), + ) + key.ServiceName = e.GetService().GetName() + key.ServiceEnvironment = e.GetService().GetEnvironment() + key.ServiceLanguageName = e.GetService().GetLanguage().GetName() + key.AgentName = e.GetAgent().GetName() + + return key +} + +func serviceInstanceKey(e *modelpb.APMEvent) (*aggregationpb.ServiceInstanceAggregationKey, error) { + var gl GlobalLabels + for k, v := range e.Labels { + if !v.Global { + continue + } + if (&gl).Labels == nil { + (&gl).Labels = make(modelpb.Labels) + } + gl.Labels[k] = v + } + for k, v := range e.NumericLabels { + if !v.Global { + continue + } + if (&gl).NumericLabels == nil { + (&gl).NumericLabels = make(modelpb.NumericLabels) + } + gl.NumericLabels[k] = v } + glb, err := gl.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("failed to parse global labels: %w", err) + } + + key := aggregationpb.ServiceInstanceAggregationKeyFromVTPool() + key.GlobalLabelsStr = glb + + return key, nil } -func transactionKey(e *modelpb.APMEvent) TransactionAggregationKey { +func transactionKey(e *modelpb.APMEvent) *aggregationpb.TransactionAggregationKey { var faasColdstart NullableBool faas := e.GetFaas() if faas != nil { faasColdstart.ParseBoolPtr(faas.ColdStart) } - return TransactionAggregationKey{ - 
TraceRoot: e.GetParentId() == "", - ContainerID: e.GetContainer().GetId(), - KubernetesPodName: e.GetKubernetes().GetPodName(), + key := aggregationpb.TransactionAggregationKeyFromVTPool() + key.TraceRoot = e.GetParentId() == "" - ServiceVersion: e.GetService().GetVersion(), - ServiceNodeName: e.GetService().GetNode().GetName(), + key.ContainerId = e.GetContainer().GetId() + key.KubernetesPodName = e.GetKubernetes().GetPodName() - ServiceRuntimeName: e.GetService().GetRuntime().GetName(), - ServiceRuntimeVersion: e.GetService().GetRuntime().GetVersion(), - ServiceLanguageVersion: e.GetService().GetLanguage().GetVersion(), + key.ServiceVersion = e.GetService().GetVersion() + key.ServiceNodeName = e.GetService().GetNode().GetName() - HostHostname: e.GetHost().GetHostname(), - HostName: e.GetHost().GetName(), - HostOSPlatform: e.GetHost().GetOs().GetPlatform(), + key.ServiceRuntimeName = e.GetService().GetRuntime().GetName() + key.ServiceRuntimeVersion = e.GetService().GetRuntime().GetVersion() + key.ServiceLanguageVersion = e.GetService().GetLanguage().GetVersion() - EventOutcome: e.GetEvent().GetOutcome(), + key.HostHostname = e.GetHost().GetHostname() + key.HostName = e.GetHost().GetName() + key.HostOsPlatform = e.GetHost().GetOs().GetPlatform() - TransactionName: e.GetTransaction().GetName(), - TransactionType: e.GetTransaction().GetType(), - TransactionResult: e.GetTransaction().GetResult(), + key.EventOutcome = e.GetEvent().GetOutcome() - FAASColdstart: faasColdstart, - FAASID: faas.GetId(), - FAASName: faas.GetName(), - FAASVersion: faas.GetVersion(), - FAASTriggerType: faas.GetTriggerType(), + key.TransactionName = e.GetTransaction().GetName() + key.TransactionType = e.GetTransaction().GetType() + key.TransactionResult = e.GetTransaction().GetResult() - CloudProvider: e.GetCloud().GetProvider(), - CloudRegion: e.GetCloud().GetRegion(), - CloudAvailabilityZone: e.GetCloud().GetAvailabilityZone(), - CloudServiceName: e.GetCloud().GetServiceName(), - 
CloudAccountID: e.GetCloud().GetAccountId(), - CloudAccountName: e.GetCloud().GetAccountName(), - CloudMachineType: e.GetCloud().GetMachineType(), - CloudProjectID: e.GetCloud().GetProjectId(), - CloudProjectName: e.GetCloud().GetProjectName(), - } + key.FaasColdstart = uint32(faasColdstart) + key.FaasId = faas.GetId() + key.FaasName = faas.GetName() + key.FaasVersion = faas.GetVersion() + key.FaasTriggerType = faas.GetTriggerType() + + key.CloudProvider = e.GetCloud().GetProvider() + key.CloudRegion = e.GetCloud().GetRegion() + key.CloudAvailabilityZone = e.GetCloud().GetAvailabilityZone() + key.CloudServiceName = e.GetCloud().GetServiceName() + key.CloudAccountId = e.GetCloud().GetAccountId() + key.CloudAccountName = e.GetCloud().GetAccountName() + key.CloudMachineType = e.GetCloud().GetMachineType() + key.CloudProjectId = e.GetCloud().GetProjectId() + key.CloudProjectName = e.GetCloud().GetProjectName() + + return key } -func serviceTransactionKey(e *modelpb.APMEvent) ServiceTransactionAggregationKey { - return ServiceTransactionAggregationKey{ - TransactionType: e.GetTransaction().GetType(), - } +func serviceTransactionKey(e *modelpb.APMEvent) *aggregationpb.ServiceTransactionAggregationKey { + key := aggregationpb.ServiceTransactionAggregationKeyFromVTPool() + key.TransactionType = e.GetTransaction().GetType() + + return key } -func spanKey(e *modelpb.APMEvent) SpanAggregationKey { +func spanKey(e *modelpb.APMEvent) *aggregationpb.SpanAggregationKey { var resource, targetType, targetName string target := e.GetService().GetTarget() if target != nil { @@ -753,28 +835,44 @@ func spanKey(e *modelpb.APMEvent) SpanAggregationKey { if destSvc != nil { resource = destSvc.GetResource() } - return SpanAggregationKey{ - SpanName: e.GetSpan().GetName(), - Outcome: e.GetEvent().GetOutcome(), - TargetType: targetType, - TargetName: targetName, + key := aggregationpb.SpanAggregationKeyFromVTPool() + key.SpanName = e.GetSpan().GetName() + key.Outcome = 
e.GetEvent().GetOutcome() - Resource: resource, - } + key.TargetType = targetType + key.TargetName = targetName + + key.Resource = resource + + return key } -func droppedSpanStatsKey(dss *modelpb.DroppedSpanStats) SpanAggregationKey { - return SpanAggregationKey{ - // Dropped span statistics do not contain span name because it - // would be too expensive to track dropped span stats per span name. - SpanName: "", - Outcome: dss.GetOutcome(), +func droppedSpanStatsKey(dss *modelpb.DroppedSpanStats) *aggregationpb.SpanAggregationKey { + key := aggregationpb.SpanAggregationKeyFromVTPool() + // Dropped span statistics do not contain span name because it + // would be too expensive to track dropped span stats per span name. + key.SpanName = "" + key.Outcome = dss.GetOutcome() + + key.TargetType = dss.GetServiceTargetType() + key.TargetName = dss.GetServiceTargetName() - TargetType: dss.GetServiceTargetType(), - TargetName: dss.GetServiceTargetName(), + key.Resource = dss.GetDestinationServiceResource() + + return key +} - Resource: dss.GetDestinationServiceResource(), +func setMetricCountBasedOnOutcome( + stm *aggregationpb.ServiceTransactionMetrics, + from *modelpb.APMEvent, +) { + txn := from.GetTransaction() + switch from.GetEvent().GetOutcome() { + case "failure": + stm.FailureCount = txn.GetRepresentativeCount() + case "success": + stm.SuccessCount = txn.GetRepresentativeCount() } } @@ -791,3 +889,11 @@ func populateNil[T any](a *T) *T { } return a } + +func mergeSlices[T any](to []*T, from []*T) []*T { + if len(from) == 0 { + return to + } + to = slices.Grow(to, len(from)) + return append(to, from...) 
+} diff --git a/aggregators/converter_test.go b/aggregators/converter_test.go index 7d6bc6f..b5c29f7 100644 --- a/aggregators/converter_test.go +++ b/aggregators/converter_test.go @@ -20,67 +20,209 @@ import ( "github.com/elastic/apm-data/model/modelpb" + "github.com/elastic/apm-aggregation/aggregationpb" "github.com/elastic/apm-aggregation/aggregators/internal/hdrhistogram" ) func TestEventToCombinedMetrics(t *testing.T) { ts := time.Now().UTC() receivedTS := ts.Add(time.Second) - event := &modelpb.APMEvent{ + baseEvent := &modelpb.APMEvent{ Timestamp: timestamppb.New(ts), ParentId: "nonroot", - Service: &modelpb.Service{ - Name: "test", - }, + Service: &modelpb.Service{Name: "test"}, Event: &modelpb.Event{ Duration: durationpb.New(time.Second), Outcome: "success", Received: timestamppb.New(receivedTS), }, - Transaction: &modelpb.Transaction{ - RepresentativeCount: 1, - Name: "testtxn", - Type: "testtyp", - }, - } - cmk := CombinedMetricsKey{ - Interval: time.Minute, - ProcessingTime: time.Now().Truncate(time.Minute), - ID: EncodeToCombinedMetricsKeyID(t, "ab01"), } - kvs, err := EventToCombinedMetrics(event, cmk, NewHashPartitioner(1)) - require.NoError(t, err) - expected := make(map[CombinedMetricsKey]*CombinedMetrics) - expected[cmk] = (*CombinedMetrics)(createTestCombinedMetrics( - withEventsTotal(1), - withYoungestEventTimestamp(receivedTS), - ).addTransaction( - ts.Truncate(time.Minute), - event.Service.Name, - "", - testTransaction{ - txnName: event.Transaction.Name, - txnType: event.Transaction.Type, - eventOutcome: event.Event.Outcome, - count: 1, + for _, tc := range []struct { + name string + input func() *modelpb.APMEvent + partitioner Partitioner + expected func() []*aggregationpb.CombinedMetrics + }{ + { + name: "with-zero-rep-count-txn", + input: func() *modelpb.APMEvent { + event := baseEvent.CloneVT() + event.Transaction = &modelpb.Transaction{ + Name: "testtxn", + Type: "testtyp", + RepresentativeCount: 0, + } + return event + }, + partitioner: 
NewHashPartitioner(1), + expected: func() []*aggregationpb.CombinedMetrics { + return nil + }, + }, + { + name: "with-good-txn", + input: func() *modelpb.APMEvent { + event := baseEvent.CloneVT() + event.Transaction = &modelpb.Transaction{ + Name: "testtxn", + Type: "testtyp", + RepresentativeCount: 1, + } + return event + }, + partitioner: NewHashPartitioner(1), + expected: func() []*aggregationpb.CombinedMetrics { + return []*aggregationpb.CombinedMetrics{ + (*CombinedMetrics)(createTestCombinedMetrics( + withEventsTotal(1), + withYoungestEventTimestamp(receivedTS), + ).addTransaction( + ts.Truncate(time.Minute), "test", "", + testTransaction{ + txnName: "testtxn", + txnType: "testtyp", + eventOutcome: "success", + count: 1, + }, + ).addServiceTransaction( + ts.Truncate(time.Minute), "test", "", + testServiceTransaction{ + txnType: "testtyp", + count: 1, + }, + )).ToProto(), + } + }, + }, + { + name: "with-zero-rep-count-span", + input: func() *modelpb.APMEvent { + event := baseEvent.CloneVT() + event.Span = &modelpb.Span{ + Name: "testspan", + Type: "testtyp", + RepresentativeCount: 0, + } + return event + }, + partitioner: NewHashPartitioner(1), + expected: func() []*aggregationpb.CombinedMetrics { + return nil + }, + }, + { + name: "with-no-exit-span", + input: func() *modelpb.APMEvent { + event := baseEvent.CloneVT() + event.Span = &modelpb.Span{ + Name: "testspan", + Type: "testtyp", + RepresentativeCount: 1, + } + return event + }, + partitioner: NewHashPartitioner(1), + expected: func() []*aggregationpb.CombinedMetrics { + return nil + }, }, - ).addServiceTransaction( - ts.Truncate(time.Minute), - event.Service.Name, - "", - testServiceTransaction{ - txnType: event.Transaction.Type, - count: 1, + { + name: "with-good-span-dest-svc", + input: func() *modelpb.APMEvent { + event := baseEvent.CloneVT() + event.Span = &modelpb.Span{ + Name: "testspan", + Type: "testtyp", + RepresentativeCount: 1, + } + event.Service.Target = &modelpb.ServiceTarget{ + Name: 
"psql", + Type: "db", + } + // Current test structs are hardcoded to use 1ns for spans + event.Event.Duration = durationpb.New(time.Nanosecond) + return event + }, + partitioner: NewHashPartitioner(1), + expected: func() []*aggregationpb.CombinedMetrics { + return []*aggregationpb.CombinedMetrics{ + (*CombinedMetrics)(createTestCombinedMetrics( + withEventsTotal(1), + withYoungestEventTimestamp(receivedTS), + ).addSpan( + ts.Truncate(time.Minute), "test", "", + testSpan{ + spanName: "testspan", + targetName: "psql", + targetType: "db", + outcome: "success", + count: 1, + }, + )).ToProto(), + } + }, + }, + { + name: "with-good-span-svc-target", + input: func() *modelpb.APMEvent { + event := baseEvent.CloneVT() + event.Span = &modelpb.Span{ + Name: "testspan", + Type: "testtyp", + RepresentativeCount: 1, + DestinationService: &modelpb.DestinationService{ + Resource: "db", + }, + } + // Current test structs are hardcoded to use 1ns for spans + event.Event.Duration = durationpb.New(time.Nanosecond) + return event + }, + partitioner: NewHashPartitioner(1), + expected: func() []*aggregationpb.CombinedMetrics { + return []*aggregationpb.CombinedMetrics{ + (*CombinedMetrics)(createTestCombinedMetrics( + withEventsTotal(1), + withYoungestEventTimestamp(receivedTS), + ).addSpan( + ts.Truncate(time.Minute), "test", "", + testSpan{ + spanName: "testspan", + destinationResource: "db", + outcome: "success", + count: 1, + }, + )).ToProto(), + } + }, }, - )) - assert.Empty(t, cmp.Diff( - expected, kvs, - cmpopts.EquateEmpty(), - cmp.Comparer(func(a, b hdrhistogram.HybridCountsRep) bool { - return a.Equal(&b) - }), - cmp.AllowUnexported(CombinedMetrics{}), - )) + } { + t.Run(tc.name, func(t *testing.T) { + cmk := CombinedMetricsKey{ + Interval: time.Minute, + ProcessingTime: time.Now().Truncate(time.Minute), + ID: EncodeToCombinedMetricsKeyID(t, "ab01"), + } + var actual []*aggregationpb.CombinedMetrics + collector := func( + _ CombinedMetricsKey, + m 
*aggregationpb.CombinedMetrics, + ) error { + actual = append(actual, m.CloneVT()) + return nil + } + err := EventToCombinedMetrics(tc.input(), cmk, tc.partitioner, collector) + require.NoError(t, err) + assert.Empty(t, cmp.Diff( + tc.expected(), actual, + cmp.Comparer(func(a, b hdrhistogram.HybridCountsRep) bool { + return a.Equal(&b) + }), + protocmp.Transform(), + protocmp.IgnoreEmptyMessages(), + )) + }) + } } func TestCombinedMetricsToBatch(t *testing.T) { @@ -187,8 +329,6 @@ func TestCombinedMetricsToBatch(t *testing.T) { assert.Empty(t, cmp.Diff( tc.expectedEvents, *b, cmpopts.IgnoreTypes(netip.Addr{}), - cmpopts.EquateEmpty(), - protocmp.Transform(), cmpopts.SortSlices(func(e1, e2 *modelpb.APMEvent) bool { m1Name := e1.GetMetricset().GetName() m2Name := e2.GetMetricset().GetName() @@ -204,6 +344,7 @@ func TestCombinedMetricsToBatch(t *testing.T) { return e1.GetService().GetEnvironment() < e2.GetService().GetEnvironment() }), + protocmp.Transform(), )) }) } @@ -256,9 +397,12 @@ func BenchmarkEventToCombinedMetrics(b *testing.B) { ID: EncodeToCombinedMetricsKeyID(b, "ab01"), } partitioner := NewHashPartitioner(1) + noop := func(_ CombinedMetricsKey, _ *aggregationpb.CombinedMetrics) error { + return nil + } b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := EventToCombinedMetrics(event, cmk, partitioner) + err := EventToCombinedMetrics(event, cmk, partitioner, noop) if err != nil { b.Fatal(err) } diff --git a/aggregators/hasher.go b/aggregators/hasher.go index f726964..f2c6364 100644 --- a/aggregators/hasher.go +++ b/aggregators/hasher.go @@ -4,20 +4,136 @@ package aggregators -import "github.com/cespare/xxhash/v2" +import ( + "encoding/binary" + "github.com/cespare/xxhash/v2" + + "github.com/elastic/apm-aggregation/aggregationpb" +) + +// HashableFunc is a function type that implements Hashable. +type HashableFunc func(xxhash.Digest) xxhash.Digest + +// Hash calls HashableFunc function. 
+func (f HashableFunc) Hash(d xxhash.Digest) xxhash.Digest { + return f(d) +} + +// Hashable represents the hash function interface implemented by aggregation models. type Hashable interface { Hash(xxhash.Digest) xxhash.Digest } +// Hasher contains a safe to copy digest. type Hasher struct { digest xxhash.Digest // xxhash.Digest does not contain pointers and is safe to copy } +// Chain allows chaining hash functions for Hashable interfaces. func (h Hasher) Chain(hashable Hashable) Hasher { return Hasher{digest: hashable.Hash(h.digest)} } +// Sum returns the hash for all the chained interfaces. func (h Hasher) Sum() uint64 { return h.digest.Sum64() } + +func serviceKeyHasher( + k *aggregationpb.ServiceAggregationKey, +) Hashable { + return HashableFunc(func(h xxhash.Digest) xxhash.Digest { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], k.Timestamp) + h.Write(buf[:]) + + h.WriteString(k.ServiceName) + h.WriteString(k.ServiceEnvironment) + h.WriteString(k.ServiceLanguageName) + h.WriteString(k.AgentName) + return h + }) +} + +func serviceInstanceKeyHasher( + k *aggregationpb.ServiceInstanceAggregationKey, +) Hashable { + return HashableFunc(func(h xxhash.Digest) xxhash.Digest { + h.Write(k.GlobalLabelsStr) + return h + }) +} + +func serviceTransactionKeyHasher( + k *aggregationpb.ServiceTransactionAggregationKey, +) Hashable { + return HashableFunc(func(h xxhash.Digest) xxhash.Digest { + h.WriteString(k.TransactionType) + return h + }) +} + +func spanKeyHasher( + k *aggregationpb.SpanAggregationKey, +) Hashable { + return HashableFunc(func(h xxhash.Digest) xxhash.Digest { + h.WriteString(k.SpanName) + h.WriteString(k.Outcome) + + h.WriteString(k.TargetType) + h.WriteString(k.TargetName) + + h.WriteString(k.Resource) + return h + }) +} + +func transactionKeyHasher( + k *aggregationpb.TransactionAggregationKey, +) Hashable { + return HashableFunc(func(h xxhash.Digest) xxhash.Digest { + if k.TraceRoot { + h.WriteString("1") + } + + h.WriteString(k.ContainerId) 
+ h.WriteString(k.KubernetesPodName) + + h.WriteString(k.ServiceVersion) + h.WriteString(k.ServiceNodeName) + + h.WriteString(k.ServiceRuntimeName) + h.WriteString(k.ServiceRuntimeVersion) + h.WriteString(k.ServiceLanguageVersion) + + h.WriteString(k.HostHostname) + h.WriteString(k.HostName) + h.WriteString(k.HostOsPlatform) + + h.WriteString(k.EventOutcome) + + h.WriteString(k.TransactionName) + h.WriteString(k.TransactionType) + h.WriteString(k.TransactionResult) + + if k.FaasColdstart == uint32(True) { + h.WriteString("1") + } + h.WriteString(k.FaasId) + h.WriteString(k.FaasName) + h.WriteString(k.FaasVersion) + h.WriteString(k.FaasTriggerType) + + h.WriteString(k.CloudProvider) + h.WriteString(k.CloudRegion) + h.WriteString(k.CloudAvailabilityZone) + h.WriteString(k.CloudServiceName) + h.WriteString(k.CloudAccountId) + h.WriteString(k.CloudAccountName) + h.WriteString(k.CloudMachineType) + h.WriteString(k.CloudProjectId) + h.WriteString(k.CloudProjectName) + return h + }) +} diff --git a/aggregators/hasher_test.go b/aggregators/hasher_test.go index 05a9536..0a7237a 100644 --- a/aggregators/hasher_test.go +++ b/aggregators/hasher_test.go @@ -12,19 +12,13 @@ import ( "github.com/cespare/xxhash/v2" ) -type testHashable func(xxhash.Digest) xxhash.Digest - -func (f testHashable) Hash(h xxhash.Digest) xxhash.Digest { - return f(h) -} - func TestHasher(t *testing.T) { a := Hasher{} - b := a.Chain(testHashable(func(h xxhash.Digest) xxhash.Digest { + b := a.Chain(HashableFunc(func(h xxhash.Digest) xxhash.Digest { h.WriteString("1") return h })) - c := a.Chain(testHashable(func(h xxhash.Digest) xxhash.Digest { + c := a.Chain(HashableFunc(func(h xxhash.Digest) xxhash.Digest { h.WriteString("1") return h })) diff --git a/aggregators/merger_test.go b/aggregators/merger_test.go index b02ede1..9325993 100644 --- a/aggregators/merger_test.go +++ b/aggregators/merger_test.go @@ -534,6 +534,8 @@ type testSpan struct { spanName string destinationResource string targetName string 
+ targetType string + outcome string count int } @@ -557,7 +559,9 @@ func spanKeyFromTestSpan(span testSpan) SpanAggregationKey { return SpanAggregationKey{ SpanName: span.spanName, TargetName: span.targetName, + TargetType: span.targetType, Resource: span.destinationResource, + Outcome: span.outcome, } } diff --git a/aggregators/models.go b/aggregators/models.go index c4dc962..5ff8ce7 100644 --- a/aggregators/models.go +++ b/aggregators/models.go @@ -447,21 +447,3 @@ type GlobalLabels struct { Labels modelpb.Labels NumericLabels modelpb.NumericLabels } - -func (gl *GlobalLabels) fromLabelsAndNumericLabels(labels modelpb.Labels, numericLabels modelpb.NumericLabels) { - gl.Labels = make(modelpb.Labels) - for k, v := range labels { - if !v.Global { - continue - } - gl.Labels[k] = v - } - - gl.NumericLabels = make(modelpb.NumericLabels) - for k, v := range numericLabels { - if !v.Global { - continue - } - gl.NumericLabels[k] = v - } -} diff --git a/go.mod b/go.mod index f6c2b74..771efc9 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.39.0 go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/zap v1.24.0 + golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 golang.org/x/sync v0.3.0 google.golang.org/protobuf v1.31.0 ) @@ -55,7 +56,6 @@ require ( go.elastic.co/fastjson v1.3.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/exp v0.0.0-20200513190911-00229845015e // indirect golang.org/x/sys v0.8.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect howett.net/plist v1.0.0 // indirect diff --git a/go.sum b/go.sum index 89f62ad..d982de4 100644 --- a/go.sum +++ b/go.sum @@ -431,8 +431,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod 
h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20200513190911-00229845015e h1:rMqLP+9XLy+LdbCXHjJHAmTfXCr93W7oruWA6Hq1Alc= -golang.org/x/exp v0.0.0-20200513190911-00229845015e/go.mod h1:4M0jN8W1tt0AVLNr8HDosyJCDCDuyL9N9+3m7wDWgKw= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 h1:MGwJjxBy0HJshjDNfLsYO8xppfqWlA5ZT9OhtUUhTNw= +golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=