Skip to content

Commit

Permalink
Store timestamp as uint64 in protobuf (#15)
Browse files Browse the repository at this point in the history
Store timestamp as uint64 in protobuf so no pointer is needed for timestamp field
  • Loading branch information
carsonip authored Jul 11, 2023
1 parent 0addc06 commit 1a75213
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 448 deletions.
632 changes: 311 additions & 321 deletions aggregationpb/aggregation.pb.go

Large diffs are not rendered by default.

132 changes: 16 additions & 116 deletions aggregationpb/aggregation_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion aggregators/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ func TestHarvest(t *testing.T) {
expectedMeasurements = append(expectedMeasurements, apmmodel.Metrics{
Samples: map[string]apmmodel.Metric{
"aggregator.requests.total": {Value: 1},
"aggregator.bytes.ingested": {Value: 273},
"aggregator.bytes.ingested": {Value: 261},
},
Labels: apmmodel.StringMap{
apmmodel.StringMapItem{Key: "id_key", Value: cmID},
Expand Down
11 changes: 5 additions & 6 deletions aggregators/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ import (
"sort"
"time"

"google.golang.org/protobuf/types/known/timestamppb"

"github.com/axiomhq/hyperloglog"

"github.com/elastic/apm-aggregation/aggregationpb"
"github.com/elastic/apm-aggregation/aggregators/internal/hdrhistogram"
"github.com/elastic/apm-aggregation/aggregators/internal/timestamppb"
"github.com/elastic/apm-data/model/modelpb"
)

Expand Down Expand Up @@ -74,7 +73,7 @@ func (m *CombinedMetrics) ToProto() *aggregationpb.CombinedMetrics {
pb.OverflowServices = m.OverflowServices.ToProto()
pb.OverflowServiceInstancesEstimator = hllBytes(m.OverflowServiceInstancesEstimator)
pb.EventsTotal = m.eventsTotal
pb.YoungestEventTimestamp = timestamppb.New(m.youngestEventTimestamp)
pb.YoungestEventTimestamp = timestamppb.TimeToPBTimestamp(m.youngestEventTimestamp)
return pb
}

Expand All @@ -93,7 +92,7 @@ func (m *CombinedMetrics) FromProto(pb *aggregationpb.CombinedMetrics) {
}
m.OverflowServiceInstancesEstimator = hllSketch(pb.OverflowServiceInstancesEstimator)
m.eventsTotal = pb.EventsTotal
m.youngestEventTimestamp = pb.GetYoungestEventTimestamp().AsTime()
m.youngestEventTimestamp = timestamppb.PBTimestampToTime(pb.YoungestEventTimestamp)
}

// MarshalBinary marshals CombinedMetrics to binary using protobuf.
Expand All @@ -117,7 +116,7 @@ func (m *CombinedMetrics) UnmarshalBinary(data []byte) error {
// ToProto converts ServiceAggregationKey to its protobuf representation.
func (k *ServiceAggregationKey) ToProto() *aggregationpb.ServiceAggregationKey {
pb := aggregationpb.ServiceAggregationKeyFromVTPool()
pb.Timestamp = timestamppb.New(k.Timestamp)
pb.Timestamp = timestamppb.TimeToPBTimestamp(k.Timestamp)
pb.ServiceName = k.ServiceName
pb.ServiceEnvironment = k.ServiceEnvironment
pb.ServiceLanguageName = k.ServiceLanguageName
Expand All @@ -127,7 +126,7 @@ func (k *ServiceAggregationKey) ToProto() *aggregationpb.ServiceAggregationKey {

// FromProto converts protobuf representation to ServiceAggregationKey.
func (k *ServiceAggregationKey) FromProto(pb *aggregationpb.ServiceAggregationKey) {
k.Timestamp = pb.Timestamp.AsTime()
k.Timestamp = timestamppb.PBTimestampToTime(pb.Timestamp)
k.ServiceName = pb.ServiceName
k.ServiceEnvironment = pb.ServiceEnvironment
k.ServiceLanguageName = pb.ServiceLanguageName
Expand Down
19 changes: 19 additions & 0 deletions aggregators/internal/timestamppb/timestamppb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

// Package timestamppb provides helper functions to encode and decode time.Time
// for protobuf.
package timestamppb

import "time"

// TimeToPBTimestamp encodes a time.Time to Unix epoch nanos in uint64 for protobuf.
func TimeToPBTimestamp(t time.Time) uint64 {
return uint64(t.UnixNano())
}

// PBTimestampToTime decodes a uint64 of Unix epoch nanos to a time.Time for protobuf.
func PBTimestampToTime(timestamp uint64) time.Time {
return time.Unix(0, int64(timestamp)).UTC()
}
17 changes: 17 additions & 0 deletions aggregators/internal/timestamppb/timestamppb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package timestamppb

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTimeToPBTimestamp(t *testing.T) {
tt := time.Date(2000, 1, 2, 3, 4, 5, 6, time.UTC)
assert.Equal(t, tt, PBTimestampToTime(TimeToPBTimestamp(tt)))
}
6 changes: 2 additions & 4 deletions proto/aggregation.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ package elastic.apm;
option go_package = "./aggregationpb";
option optimize_for = SPEED;

import "google/protobuf/timestamp.proto";

message CombinedMetrics {
repeated KeyedServiceMetrics service_metrics = 1;
Overflow overflow_services = 2;
bytes overflow_service_instances_estimator = 3;
int64 events_total = 4;
google.protobuf.Timestamp youngest_event_timestamp = 5;
uint64 youngest_event_timestamp = 5;
}

message KeyedServiceMetrics {
Expand All @@ -21,7 +19,7 @@ message KeyedServiceMetrics {
}

message ServiceAggregationKey {
google.protobuf.Timestamp timestamp = 1;
uint64 timestamp = 1;
string service_name = 2;
string service_environment = 3;
string service_language_name = 4;
Expand Down

0 comments on commit 1a75213

Please sign in to comment.