diff --git a/x-pack/apm-server/aggregation/servicesummarymetrics/aggregator.go b/x-pack/apm-server/aggregation/servicesummarymetrics/aggregator.go
index dd84ccdad8..19559a6066 100644
--- a/x-pack/apm-server/aggregation/servicesummarymetrics/aggregator.go
+++ b/x-pack/apm-server/aggregation/servicesummarymetrics/aggregator.go
@@ -13,11 +13,9 @@ import (
 
 	"github.com/axiomhq/hyperloglog"
 	"github.com/cespare/xxhash/v2"
-	"github.com/pkg/errors"
-	"google.golang.org/protobuf/types/known/timestamppb"
-
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
+	"github.com/pkg/errors"
 
 	"github.com/elastic/apm-data/model/modelpb"
 	"github.com/elastic/apm-server/internal/logs"
@@ -313,9 +311,9 @@ func makeAggregationKey(event *modelpb.APMEvent, interval time.Duration) aggrega
 			serviceLanguageName: event.GetService().GetLanguage().GetName(),
 		},
 	}
-	if event.Timestamp != nil {
+	if event.Timestamp != 0 {
 		// Group metrics by time interval.
-		key.comparable.timestamp = event.Timestamp.AsTime().Truncate(interval)
+		key.comparable.timestamp = time.Unix(0, int64(event.Timestamp)).Truncate(interval)
 	}
 	key.AggregatedGlobalLabels.Read(event)
 	return key
@@ -336,10 +334,6 @@ func makeOverflowAggregationKey(interval time.Duration) aggregationKey {
 }
 
 func makeMetricset(key aggregationKey, interval string) *modelpb.APMEvent {
-	var t *timestamppb.Timestamp
-	if !key.timestamp.IsZero() {
-		t = timestamppb.New(key.timestamp)
-	}
 	var agent *modelpb.Agent
 	if key.agentName != "" {
 		agent = &modelpb.Agent{Name: key.agentName}
@@ -353,7 +347,7 @@ func makeMetricset(key aggregationKey, interval string) *modelpb.APMEvent {
 	}
 
 	return &modelpb.APMEvent{
-		Timestamp: t,
+		Timestamp: uint64(key.timestamp.UnixNano()),
 		Service: &modelpb.Service{
 			Name: key.serviceName,
 			Environment: key.serviceEnvironment,
diff --git a/x-pack/apm-server/aggregation/servicetxmetrics/aggregator.go b/x-pack/apm-server/aggregation/servicetxmetrics/aggregator.go
index 3036a4d059..b2b1da00d2 100644
--- a/x-pack/apm-server/aggregation/servicetxmetrics/aggregator.go
+++ b/x-pack/apm-server/aggregation/servicetxmetrics/aggregator.go
@@ -14,12 +14,10 @@ import (
 
 	"github.com/axiomhq/hyperloglog"
 	"github.com/cespare/xxhash/v2"
-	"github.com/pkg/errors"
-	"google.golang.org/protobuf/types/known/timestamppb"
-
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
 	"github.com/elastic/go-hdrhistogram"
+	"github.com/pkg/errors"
 
 	"github.com/elastic/apm-data/model/modelpb"
 	"github.com/elastic/apm-server/internal/logs"
@@ -382,9 +380,9 @@ func makeAggregationKey(event *modelpb.APMEvent, interval time.Duration) aggrega
 			transactionType: event.GetTransaction().GetType(),
 		},
 	}
-	if event.Timestamp != nil {
+	if event.Timestamp != 0 {
 		// Group metrics by time interval.
-		key.comparable.timestamp = event.Timestamp.AsTime().Truncate(interval)
+		key.comparable.timestamp = time.Unix(0, int64(event.Timestamp)).Truncate(interval)
 	}
 	key.AggregatedGlobalLabels.Read(event)
 	return key
@@ -446,7 +444,7 @@ func (m *serviceTxMetrics) histogramBuckets() (totalCount uint64, counts []uint6
 func makeServiceTxMetrics(event *modelpb.APMEvent) serviceTxMetrics {
 	transactionCount := event.Transaction.RepresentativeCount
 	metrics := serviceTxMetrics{
-		transactionDuration: float64(event.Event.Duration.AsDuration()),
+		transactionDuration: float64(event.Event.Duration),
 		transactionCount: transactionCount,
 	}
 	switch event.Event.Outcome {
@@ -472,11 +470,6 @@ func makeMetricset(key aggregationKey, metrics serviceTxMetrics, interval string
 		transactionDurationSummary.Sum += v * float64(counts[i])
 	}
 
-	var t *timestamppb.Timestamp
-	if !key.timestamp.IsZero() {
-		t = timestamppb.New(key.timestamp)
-	}
-
 	var event *modelpb.Event
 	if metrics.successCount != 0 || metrics.failureCount != 0 {
 		event = &modelpb.Event{
@@ -500,7 +493,7 @@ func makeMetricset(key aggregationKey, metrics serviceTxMetrics, interval string
 	}
 
 	return &modelpb.APMEvent{
-		Timestamp: t,
+		Timestamp: uint64(key.timestamp.UnixNano()),
 		Service: &modelpb.Service{
 			Name: key.serviceName,
 			Environment: key.serviceEnvironment,
diff --git a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go
index 4041a3bc78..6bb9161015 100644
--- a/x-pack/apm-server/aggregation/spanmetrics/aggregator.go
+++ b/x-pack/apm-server/aggregation/spanmetrics/aggregator.go
@@ -14,10 +14,6 @@ import (
 
 	"github.com/axiomhq/hyperloglog"
 	"github.com/cespare/xxhash/v2"
-	"github.com/pkg/errors"
-	"google.golang.org/protobuf/types/known/durationpb"
-	"google.golang.org/protobuf/types/known/timestamppb"
-
 	"github.com/elastic/apm-data/model/modelpb"
 	"github.com/elastic/apm-server/internal/logs"
 	"github.com/elastic/apm-server/x-pack/apm-server/aggregation/baseaggregator"
@@ -25,6 +21,7 @@ import (
 	"github.com/elastic/apm-server/x-pack/apm-server/aggregation/labels"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
+	"github.com/pkg/errors"
 )
 
 const (
@@ -246,7 +243,7 @@ func (a *Aggregator) processSpan(event *modelpb.APMEvent) {
 	// pre-aggregated spans and excludes time gaps that are counted in the reported
 	// span duration. For non-composite spans we just use the reported span duration.
 	count := 1
-	duration := event.Event.Duration.AsDuration()
+	duration := time.Duration(event.Event.Duration)
 	if event.Span.Composite != nil {
 		count = int(event.Span.Composite.Count)
 		duration = time.Duration(event.Span.Composite.Sum * float64(time.Millisecond))
 	}
@@ -292,7 +289,7 @@ func (a *Aggregator) processDroppedSpanStats(event *modelpb.APMEvent, dss *model
 
 	metrics := spanMetrics{
 		count: float64(dss.Duration.Count) * representativeCount,
-		sum: float64(dss.Duration.Sum.AsDuration()) * representativeCount,
+		sum: float64(dss.Duration.Sum) * representativeCount,
 	}
 	for _, interval := range a.Intervals {
 		key := makeAggregationKey(
@@ -467,9 +464,9 @@ func makeAggregationKey(
 			resource: resource,
 		},
 	}
-	if event.Timestamp != nil {
+	if event.Timestamp != 0 {
 		// Group metrics by time interval.
-		key.comparable.timestamp = event.Timestamp.AsTime().Truncate(interval)
+		key.comparable.timestamp = time.Unix(0, int64(event.Timestamp)).Truncate(interval)
 	}
 	key.AggregatedGlobalLabels.Read(event)
 	return key
@@ -502,10 +499,6 @@ func makeMetricset(key aggregationKey, metrics spanMetrics) *modelpb.APMEvent {
 			Name: key.targetName,
 		}
 	}
-	var t *timestamppb.Timestamp
-	if !key.timestamp.IsZero() {
-		t = timestamppb.New(key.timestamp)
-	}
 
 	var agent *modelpb.Agent
 	if key.agentName != "" {
@@ -520,7 +513,7 @@ func makeMetricset(key aggregationKey, metrics spanMetrics) *modelpb.APMEvent {
 	}
 
 	return &modelpb.APMEvent{
-		Timestamp: t,
+		Timestamp: uint64(key.timestamp.UnixNano()),
 		Agent: agent,
 		Service: &modelpb.Service{
 			Name: key.serviceName,
@@ -540,7 +533,7 @@ func makeMetricset(key aggregationKey, metrics spanMetrics) *modelpb.APMEvent {
 				Resource: key.resource,
 				ResponseTime: &modelpb.AggregatedDuration{
 					Count: uint64(math.Round(metrics.count)),
-					Sum: durationpb.New(time.Duration(math.Round(metrics.sum))),
+					Sum: uint64(math.Round(metrics.sum)),
 				},
 			},
 		},
diff --git a/x-pack/apm-server/aggregation/txmetrics/aggregator.go b/x-pack/apm-server/aggregation/txmetrics/aggregator.go
index f704a11cf0..1db7e1d085 100644
--- a/x-pack/apm-server/aggregation/txmetrics/aggregator.go
+++ b/x-pack/apm-server/aggregation/txmetrics/aggregator.go
@@ -13,12 +13,10 @@ import (
 
 	"github.com/axiomhq/hyperloglog"
 	"github.com/cespare/xxhash/v2"
-	"github.com/pkg/errors"
-	"google.golang.org/protobuf/types/known/timestamppb"
-
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
 	"github.com/elastic/go-hdrhistogram"
+	"github.com/pkg/errors"
 
 	"github.com/elastic/apm-data/model/modelpb"
 	"github.com/elastic/apm-server/internal/logs"
@@ -312,7 +310,7 @@ func (a *Aggregator) AggregateTransaction(event *modelpb.APMEvent) {
 	}
 	for _, interval := range a.Intervals {
 		key := makeTransactionAggregationKey(event, interval)
-		if err := a.updateTransactionMetrics(key, count, event.GetEvent().GetDuration().AsDuration(), interval); err != nil {
+		if err := a.updateTransactionMetrics(key, count, time.Duration(event.GetEvent().GetDuration()), interval); err != nil {
 			a.config.Logger.Errorf("failed to aggregate transaction: %w", err)
 		}
 	}
@@ -500,9 +498,9 @@ func makeTransactionAggregationKey(event *modelpb.APMEvent, interval time.Durati
 			faasVersion: event.GetFaas().GetVersion(),
 		},
 	}
-	if event.Timestamp != nil {
+	if event.Timestamp != 0 {
 		// Group metrics by time interval.
-		key.comparable.timestamp = event.Timestamp.AsTime().Truncate(interval)
+		key.comparable.timestamp = time.Unix(0, int64(event.Timestamp)).Truncate(interval)
 	}
 	if event.Faas != nil {
 		key.comparable.faasColdstart = nullableBoolFromPtr(event.Faas.ColdStart)
@@ -531,11 +529,6 @@ func makeMetricset(key transactionAggregationKey, metrics transactionMetrics, in
 		// Keep both Count and Sum as 0.
 	}
 
-	var t *timestamppb.Timestamp
-	if !key.timestamp.IsZero() {
-		t = timestamppb.New(key.timestamp)
-	}
-
 	transactionDurationSummary := modelpb.SummaryMetric{
 		Count: totalCount,
 	}
@@ -645,7 +638,7 @@ func makeMetricset(key transactionAggregationKey, metrics transactionMetrics, in
 	}
 
 	return &modelpb.APMEvent{
-		Timestamp: t,
+		Timestamp: uint64(key.timestamp.UnixNano()),
 		Agent: agent,
 		Container: container,
 		Kubernetes: kube,