Skip to content

Commit

Permalink
Fix uint64 fields
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Aug 16, 2023
1 parent 2e44190 commit d284a32
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 48 deletions.
14 changes: 4 additions & 10 deletions x-pack/apm-server/aggregation/servicesummarymetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import (

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/pkg/errors"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/logs"
Expand Down Expand Up @@ -313,9 +311,9 @@ func makeAggregationKey(event *modelpb.APMEvent, interval time.Duration) aggrega
serviceLanguageName: event.GetService().GetLanguage().GetName(),
},
}
if event.Timestamp != nil {
if event.Timestamp != 0 {
// Group metrics by time interval.
key.comparable.timestamp = event.Timestamp.AsTime().Truncate(interval)
key.comparable.timestamp = time.Unix(0, int64(event.Timestamp)).Truncate(interval)
}
key.AggregatedGlobalLabels.Read(event)
return key
Expand All @@ -336,10 +334,6 @@ func makeOverflowAggregationKey(interval time.Duration) aggregationKey {
}

func makeMetricset(key aggregationKey, interval string) *modelpb.APMEvent {
var t *timestamppb.Timestamp
if !key.timestamp.IsZero() {
t = timestamppb.New(key.timestamp)
}
var agent *modelpb.Agent
if key.agentName != "" {
agent = &modelpb.Agent{Name: key.agentName}
Expand All @@ -353,7 +347,7 @@ func makeMetricset(key aggregationKey, interval string) *modelpb.APMEvent {
}

return &modelpb.APMEvent{
Timestamp: t,
Timestamp: uint64(key.timestamp.UnixNano()),
Service: &modelpb.Service{
Name: key.serviceName,
Environment: key.serviceEnvironment,
Expand Down
17 changes: 5 additions & 12 deletions x-pack/apm-server/aggregation/servicetxmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ import (

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-hdrhistogram"
"github.com/pkg/errors"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/logs"
Expand Down Expand Up @@ -382,9 +380,9 @@ func makeAggregationKey(event *modelpb.APMEvent, interval time.Duration) aggrega
transactionType: event.GetTransaction().GetType(),
},
}
if event.Timestamp != nil {
if event.Timestamp != 0 {
// Group metrics by time interval.
key.comparable.timestamp = event.Timestamp.AsTime().Truncate(interval)
key.comparable.timestamp = time.Unix(0, int64(event.Timestamp)).Truncate(interval)
}
key.AggregatedGlobalLabels.Read(event)
return key
Expand Down Expand Up @@ -446,7 +444,7 @@ func (m *serviceTxMetrics) histogramBuckets() (totalCount uint64, counts []uint6
func makeServiceTxMetrics(event *modelpb.APMEvent) serviceTxMetrics {
transactionCount := event.Transaction.RepresentativeCount
metrics := serviceTxMetrics{
transactionDuration: float64(event.Event.Duration.AsDuration()),
transactionDuration: float64(event.Event.Duration),
transactionCount: transactionCount,
}
switch event.Event.Outcome {
Expand All @@ -472,11 +470,6 @@ func makeMetricset(key aggregationKey, metrics serviceTxMetrics, interval string
transactionDurationSummary.Sum += v * float64(counts[i])
}

var t *timestamppb.Timestamp
if !key.timestamp.IsZero() {
t = timestamppb.New(key.timestamp)
}

var event *modelpb.Event
if metrics.successCount != 0 || metrics.failureCount != 0 {
event = &modelpb.Event{
Expand All @@ -500,7 +493,7 @@ func makeMetricset(key aggregationKey, metrics serviceTxMetrics, interval string
}

return &modelpb.APMEvent{
Timestamp: t,
Timestamp: uint64(key.timestamp.UnixNano()),
Service: &modelpb.Service{
Name: key.serviceName,
Environment: key.serviceEnvironment,
Expand Down
21 changes: 7 additions & 14 deletions x-pack/apm-server/aggregation/spanmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,14 @@ import (

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/baseaggregator"
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/interval"
"github.com/elastic/apm-server/x-pack/apm-server/aggregation/labels"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -246,7 +243,7 @@ func (a *Aggregator) processSpan(event *modelpb.APMEvent) {
// pre-aggregated spans and excludes time gaps that are counted in the reported
// span duration. For non-composite spans we just use the reported span duration.
count := 1
duration := event.Event.Duration.AsDuration()
duration := time.Duration(event.Event.Duration)
if event.Span.Composite != nil {
count = int(event.Span.Composite.Count)
duration = time.Duration(event.Span.Composite.Sum * float64(time.Millisecond))
Expand Down Expand Up @@ -292,7 +289,7 @@ func (a *Aggregator) processDroppedSpanStats(event *modelpb.APMEvent, dss *model

metrics := spanMetrics{
count: float64(dss.Duration.Count) * representativeCount,
sum: float64(dss.Duration.Sum.AsDuration()) * representativeCount,
sum: float64(dss.Duration.Sum) * representativeCount,
}
for _, interval := range a.Intervals {
key := makeAggregationKey(
Expand Down Expand Up @@ -467,9 +464,9 @@ func makeAggregationKey(
resource: resource,
},
}
if event.Timestamp != nil {
if event.Timestamp != 0 {
// Group metrics by time interval.
key.comparable.timestamp = event.Timestamp.AsTime().Truncate(interval)
key.comparable.timestamp = time.Unix(0, int64(event.Timestamp)).Truncate(interval)
}
key.AggregatedGlobalLabels.Read(event)
return key
Expand Down Expand Up @@ -502,10 +499,6 @@ func makeMetricset(key aggregationKey, metrics spanMetrics) *modelpb.APMEvent {
Name: key.targetName,
}
}
var t *timestamppb.Timestamp
if !key.timestamp.IsZero() {
t = timestamppb.New(key.timestamp)
}

var agent *modelpb.Agent
if key.agentName != "" {
Expand All @@ -520,7 +513,7 @@ func makeMetricset(key aggregationKey, metrics spanMetrics) *modelpb.APMEvent {
}

return &modelpb.APMEvent{
Timestamp: t,
Timestamp: uint64(key.timestamp.UnixNano()),
Agent: agent,
Service: &modelpb.Service{
Name: key.serviceName,
Expand All @@ -540,7 +533,7 @@ func makeMetricset(key aggregationKey, metrics spanMetrics) *modelpb.APMEvent {
Resource: key.resource,
ResponseTime: &modelpb.AggregatedDuration{
Count: uint64(math.Round(metrics.count)),
Sum: durationpb.New(time.Duration(math.Round(metrics.sum))),
Sum: uint64(math.Round(metrics.sum)),
},
},
},
Expand Down
17 changes: 5 additions & 12 deletions x-pack/apm-server/aggregation/txmetrics/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ import (

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/go-hdrhistogram"
"github.com/pkg/errors"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/logs"
Expand Down Expand Up @@ -312,7 +310,7 @@ func (a *Aggregator) AggregateTransaction(event *modelpb.APMEvent) {
}
for _, interval := range a.Intervals {
key := makeTransactionAggregationKey(event, interval)
if err := a.updateTransactionMetrics(key, count, event.GetEvent().GetDuration().AsDuration(), interval); err != nil {
if err := a.updateTransactionMetrics(key, count, time.Duration(event.GetEvent().GetDuration()), interval); err != nil {
a.config.Logger.Errorf("failed to aggregate transaction: %w", err)
}
}
Expand Down Expand Up @@ -500,9 +498,9 @@ func makeTransactionAggregationKey(event *modelpb.APMEvent, interval time.Durati
faasVersion: event.GetFaas().GetVersion(),
},
}
if event.Timestamp != nil {
if event.Timestamp != 0 {
// Group metrics by time interval.
key.comparable.timestamp = event.Timestamp.AsTime().Truncate(interval)
key.comparable.timestamp = time.Unix(0, int64(event.Timestamp)).Truncate(interval)
}
if event.Faas != nil {
key.comparable.faasColdstart = nullableBoolFromPtr(event.Faas.ColdStart)
Expand Down Expand Up @@ -531,11 +529,6 @@ func makeMetricset(key transactionAggregationKey, metrics transactionMetrics, in
// Keep both Count and Sum as 0.
}

var t *timestamppb.Timestamp
if !key.timestamp.IsZero() {
t = timestamppb.New(key.timestamp)
}

transactionDurationSummary := modelpb.SummaryMetric{
Count: totalCount,
}
Expand Down Expand Up @@ -645,7 +638,7 @@ func makeMetricset(key transactionAggregationKey, metrics transactionMetrics, in
}

return &modelpb.APMEvent{
Timestamp: t,
Timestamp: uint64(key.timestamp.UnixNano()),
Agent: agent,
Container: container,
Kubernetes: kube,
Expand Down

0 comments on commit d284a32

Please sign in to comment.