Report aggregation overflows as OTel metrics #86

Merged 2 commits on Aug 14, 2023
16 changes: 16 additions & 0 deletions README.md
@@ -73,6 +73,17 @@ The delay in processing a batch based on the youngest APM event received in the
- [`aggregation_interval`](#aggregation_interval)
- [`outcome`](#outcome)

#### `metrics.overflowed.count`

- Type: `Int64Counter`

Estimated number of metric aggregation keys that resulted in an overflow, per interval and aggregation type.

##### Dimensions

- [`aggregation_interval`](#aggregation_interval)

Contributor: I think this should also mention the combined_metrics_id dimension.

Member Author: Oops, indeed - will fix, thanks.

- [`aggregation_type`](#aggregation_type)
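
For illustration only (not part of this change): a minimal sketch of how a counter with this name and these dimensions is created and incremented through the OTel metric API, condensed from `aggregators/internal/telemetry/metrics.go` and `aggregators/aggregator.go` in this PR. The `meter`, `estimate`, and attribute values are placeholders.

```go
package example

import (
	"context"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

// recordOverflow reports an estimated overflow for a single aggregation
// interval and aggregation type; zero estimates are skipped, mirroring the
// behaviour in this PR.
func recordOverflow(ctx context.Context, meter metric.Meter, estimate uint64, ivl, aggType string) error {
	counter, err := meter.Int64Counter(
		"metrics.overflowed.count",
		metric.WithDescription("Estimated number of metric aggregation keys that resulted in an overflow"),
	)
	if err != nil {
		return err
	}
	if estimate == 0 {
		return nil
	}
	counter.Add(ctx, int64(estimate), metric.WithAttributes(
		attribute.String("aggregation_interval", ivl), // e.g. "1m"
		attribute.String("aggregation_type", aggType), // e.g. "service"
	))
	return nil
}
```

In this PR the estimates come from HyperLogLog sketches harvested per combined metrics ID, so the real counter also carries the attributes derived from the combined metrics ID (the combined_metrics_id dimension mentioned in the review comment above).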

#### `pebble.flushes`

- Type: `Int64ObservableCounter`
@@ -172,6 +183,11 @@ option is not supplied then this dimension is omitted.
Holds the value of the aggregation interval for which the combined metrics is produced.
For example: `1m`, `10m`, etc.

#### `aggregation_type`

Holds the aggregation type for which an overflow occurred.
For example: `service`, `transaction`, `service_transaction`, `service_destination`.
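
For illustration only (not part of this change): a hypothetical helper showing which overflow estimate feeds each `aggregation_type` value, mirroring `harvestForInterval` and `processHarvest` in this PR. The `overflowEstimates` struct and `byAggregationType` function are illustrative, not real APIs.

```go
package example

// overflowEstimates holds per-harvest overflow estimates; in the PR these
// live on the unexported harvestStats type in aggregators/aggregator.go.
type overflowEstimates struct {
	services            uint64 // from OverflowServicesEstimator
	transactions        uint64 // from per-service OverflowTransactionsEstimator
	serviceTransactions uint64 // from per-service OverflowServiceTransactionsEstimator
	spans               uint64 // from per-service OverflowSpansEstimator
}

// byAggregationType keys the estimates by the aggregation_type dimension
// values documented above.
func byAggregationType(e overflowEstimates) map[string]uint64 {
	return map[string]uint64{
		"service":             e.services,
		"transaction":         e.transactions,
		"service_transaction": e.serviceTransactions,
		"service_destination": e.spans,
	}
}
```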

#### `outcome`

##### `success`
69 changes: 52 additions & 17 deletions aggregators/aggregator.go
@@ -27,6 +27,7 @@ import (
const (
dbCommitThresholdBytes = 10 * 1024 * 1024 // commit every 10MB
aggregationIvlKey = "aggregation_interval"
aggregationTypeKey = "aggregation_type"
)

var (
@@ -496,13 +497,27 @@ func (a *Aggregator) harvestForInterval(
}
cmCount++

attrSetOpt := metric.WithAttributeSet(
attribute.NewSet(append(
a.cfg.CombinedMetricsIDToKVs(cmk.ID),
ivlAttr,
telemetry.WithSuccess(),
)...),
commonAttrsOpt := metric.WithAttributes(
append(a.cfg.CombinedMetricsIDToKVs(cmk.ID), ivlAttr)...,
)
outcomeAttrOpt := metric.WithAttributes(telemetry.WithSuccess())

// Report the estimated number of overflowed metrics per aggregation interval.
// It is not meaningful to aggregate these across intervals or aggregators,
// as the overflowed aggregation keys may be overlapping sets.
recordMetricsOverflow := func(n uint64, aggregationType string) {
if n == 0 {
return
}
a.metrics.MetricsOverflowed.Add(ctx, int64(n), commonAttrsOpt, metric.WithAttributes(
attribute.String(aggregationTypeKey, aggregationType),
))
}
recordMetricsOverflow(harvestStats.servicesOverflowed, "service")
recordMetricsOverflow(harvestStats.transactionsOverflowed, "transaction")
recordMetricsOverflow(harvestStats.serviceTransactionsOverflowed, "service_transaction")
recordMetricsOverflow(harvestStats.spansOverflowed, "service_destination")

// processingDelay is normalized by subtracting aggregation interval and
// harvest delay, both of which are expected delays. Normalization helps
// us to use the lower (higher resolution) range of the histogram for the
@@ -518,11 +533,11 @@
// Negative values are possible at edges due to delays in running the
// harvest loop or time sync issues between agents and server.
queuedDelay := time.Since(harvestStats.youngestEventTimestamp).Seconds()
a.metrics.MinQueuedDelay.Record(ctx, queuedDelay, attrSetOpt)
a.metrics.ProcessingLatency.Record(ctx, processingDelay, attrSetOpt)
a.metrics.MinQueuedDelay.Record(ctx, queuedDelay, commonAttrsOpt, outcomeAttrOpt)
a.metrics.ProcessingLatency.Record(ctx, processingDelay, commonAttrsOpt, outcomeAttrOpt)
// Events harvested have been successfully processed, publish these
// as success. Update the map to keep track of events failed.
a.metrics.EventsProcessed.Add(ctx, harvestStats.eventsTotal, attrSetOpt)
a.metrics.EventsProcessed.Add(ctx, harvestStats.eventsTotal, commonAttrsOpt, outcomeAttrOpt)
cachedEventsStats[cmk.ID] -= harvestStats.eventsTotal
}
err := a.db.DeleteRange(lb, ub, a.writeOptions)
@@ -563,6 +578,11 @@ func (a *Aggregator) harvestForInterval(
type harvestStats struct {
eventsTotal float64
youngestEventTimestamp time.Time

servicesOverflowed uint64
transactionsOverflowed uint64
serviceTransactionsOverflowed uint64
spansOverflowed uint64
}

func (a *Aggregator) processHarvest(
@@ -571,20 +591,35 @@
cmb []byte,
aggIvl time.Duration,
) (harvestStats, error) {
var hs harvestStats
cm := aggregationpb.CombinedMetricsFromVTPool()
defer cm.ReturnToVTPool()
if err := cm.UnmarshalVT(cmb); err != nil {
return hs, fmt.Errorf("failed to unmarshal metrics: %w", err)
return harvestStats{}, fmt.Errorf("failed to unmarshal metrics: %w", err)
}

// Processor can mutate the CombinedMetrics, so we cannot rely on the
// CombinedMetrics after Processor is called.
eventsTotal := cm.EventsTotal
youngestEventTS := modelpb.ToTime(cm.YoungestEventTimestamp)
// CombinedMetrics after Processor is called. Take a snapshot of the
// fields we record if processing succeeds.
hs := harvestStats{
eventsTotal: cm.EventsTotal,
youngestEventTimestamp: modelpb.ToTime(cm.YoungestEventTimestamp),
servicesOverflowed: hllSketchEstimate(cm.OverflowServicesEstimator),
}
addOverflow := func(o *aggregationpb.Overflow) {
if o == nil {
return
}
hs.transactionsOverflowed += hllSketchEstimate(o.OverflowTransactionsEstimator)
hs.serviceTransactionsOverflowed += hllSketchEstimate(o.OverflowServiceTransactionsEstimator)
hs.spansOverflowed += hllSketchEstimate(o.OverflowSpansEstimator)
}
addOverflow(cm.OverflowServices)
for _, ksm := range cm.ServiceMetrics {
addOverflow(ksm.GetMetrics().GetOverflowGroups())
}

if err := a.cfg.Processor(ctx, cmk, cm, aggIvl); err != nil {
return hs, fmt.Errorf("failed to process combined metrics ID %s: %w", cmk.ID, err)
return harvestStats{}, fmt.Errorf("failed to process combined metrics ID %s: %w", cmk.ID, err)
}
hs.eventsTotal = eventsTotal
hs.youngestEventTimestamp = youngestEventTS
return hs, nil
}
126 changes: 123 additions & 3 deletions aggregators/aggregator_test.go
@@ -28,6 +28,8 @@ import (
apmmodel "go.elastic.co/apm/v2/model"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"golang.org/x/sync/errgroup"
@@ -991,6 +993,124 @@ func TestAggregateAndHarvest(t *testing.T) {
))
}

func TestHarvestOverflowCount(t *testing.T) {
ivls := []time.Duration{time.Minute}
reader := metric.NewManualReader()
meter := metric.NewMeterProvider(metric.WithReader(reader)).Meter("test")

limits := Limits{
MaxSpanGroups: 4,
MaxSpanGroupsPerService: 4,
MaxTransactionGroups: 3,
MaxTransactionGroupsPerService: 3,
MaxServiceTransactionGroups: 2,
MaxServiceTransactionGroupsPerService: 2,
MaxServices: 1,
}
agg := newTestAggregator(t,
WithLimits(limits),
WithAggregationIntervals(ivls),
WithMeter(meter),
WithCombinedMetricsIDToKVs(func(id [16]byte) []attribute.KeyValue {
return []attribute.KeyValue{attribute.String("id_key", "id_value")}
}),
)

var batch modelpb.Batch
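// Build a batch that exceeds every configured limit by one key, so that each
// aggregation type (service, transaction, service_transaction and
// service_destination) overflows and is reported at harvest time.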
for i := 0; i < limits.MaxServices+1; i++ {
serviceName := fmt.Sprintf("service_name_%d", i)
for i := 0; i < limits.MaxTransactionGroups+1; i++ {
transactionName := fmt.Sprintf("transaction_name_%d", i)
transactionType := fmt.Sprintf(
"transaction_type_%d", i%(limits.MaxServiceTransactionGroups+1),
)
batch = append(batch, &modelpb.APMEvent{
Service: &modelpb.Service{Name: serviceName},
Transaction: &modelpb.Transaction{
Name: transactionName,
Type: transactionType,
RepresentativeCount: 1,
},
})
}
for i := 0; i < limits.MaxSpanGroups+1; i++ {
serviceTargetName := fmt.Sprintf("service_target_name_%d", i)
batch = append(batch, &modelpb.APMEvent{
Service: &modelpb.Service{
Name: serviceName,
Target: &modelpb.ServiceTarget{
Name: serviceTargetName,
Type: "service_target_type",
},
},
Span: &modelpb.Span{
Name: "span_name",
Type: "span_type",
RepresentativeCount: 1,
},
})
}
}
cmID := EncodeToCombinedMetricsKeyID(t, "cm_id")
require.NoError(t, agg.AggregateBatch(context.Background(), cmID, &batch))

// Force harvest.
require.NoError(t, agg.Close(context.Background()))

var resourceMetrics metricdata.ResourceMetrics
require.NoError(t, reader.Collect(context.Background(), &resourceMetrics))
require.Len(t, resourceMetrics.ScopeMetrics, 1)
scopeMetrics := resourceMetrics.ScopeMetrics[0]

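// Two services each create one more key than the per-service limit, and the
// global limits equal the per-service limits, so the expected overflow
// estimate is limit+2 for transactions, service transactions and spans, and
// 1 for services (MaxServices is 1).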
expected := metricdata.Sum[int64]{
IsMonotonic: true,
Temporality: metricdata.CumulativeTemporality,
DataPoints: []metricdata.DataPoint[int64]{{
Attributes: attribute.NewSet(
attribute.String(aggregationIvlKey, "1m"),
attribute.String(aggregationTypeKey, "service"),
attribute.String("id_key", "id_value"),
),
Value: 1,
}, {
Attributes: attribute.NewSet(
attribute.String(aggregationIvlKey, "1m"),
attribute.String(aggregationTypeKey, "service_destination"),
attribute.String("id_key", "id_value"),
),
Value: int64(limits.MaxSpanGroups) + 2,
}, {
Attributes: attribute.NewSet(
attribute.String(aggregationIvlKey, "1m"),
attribute.String(aggregationTypeKey, "service_transaction"),
attribute.String("id_key", "id_value"),
),
Value: int64(limits.MaxServiceTransactionGroups) + 2,
}, {
Attributes: attribute.NewSet(
attribute.String(aggregationIvlKey, "1m"),
attribute.String(aggregationTypeKey, "transaction"),
attribute.String("id_key", "id_value"),
),
Value: int64(limits.MaxTransactionGroups) + 2,
}},
}

var found bool
for _, metric := range scopeMetrics.Metrics {
if metric.Name != "metrics.overflowed.count" {
continue
}
metricdatatest.AssertAggregationsEqual(
t, expected, metric.Data,
metricdatatest.IgnoreTimestamp(),
)
found = true
break
}
assert.True(t, found)
}

func TestRunStopOrchestration(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -1161,8 +1281,8 @@ func BenchmarkAggregateBatchParallel(b *testing.B) {
})
}

func newTestAggregator(tb testing.TB) *Aggregator {
agg, err := New(
func newTestAggregator(tb testing.TB, opts ...Option) *Aggregator {
agg, err := New(append([]Option{
WithDataDir(tb.TempDir()),
WithLimits(Limits{
MaxSpanGroups: 1000,
@@ -1176,7 +1296,7 @@ func newTestAggregator(tb testing.TB) *Aggregator {
WithProcessor(noOpProcessor()),
WithAggregationIntervals([]time.Duration{time.Second, time.Minute, time.Hour}),
WithLogger(zap.NewNop()),
)
}, opts...)...)
if err != nil {
tb.Fatal(err)
}
9 changes: 9 additions & 0 deletions aggregators/codec.go
@@ -455,6 +455,15 @@ func hllBytes(estimator *hyperloglog.Sketch) []byte {
return b
}

// hllSketchEstimate returns hllSketch(estimator).Estimate() if estimator is
// non-empty, and zero if estimator is empty.
func hllSketchEstimate(estimator []byte) uint64 {
if sketch := hllSketch(estimator); sketch != nil {
return sketch.Estimate()
}
return 0
}

func hllSketch(estimator []byte) *hyperloglog.Sketch {
if len(estimator) == 0 {
return nil
11 changes: 11 additions & 0 deletions aggregators/internal/telemetry/metrics.go
@@ -32,6 +32,7 @@ type Metrics struct {
BytesProcessed metric.Int64Counter
MinQueuedDelay metric.Float64Histogram
ProcessingLatency metric.Float64Histogram
MetricsOverflowed metric.Int64Counter

// Asynchronous metrics used to get pebble metrics and
// record measurements. These are kept unexported as they are
@@ -99,6 +100,16 @@ func NewMetrics(provider pebbleProvider, opts ...Option) (*Metrics, error) {
if err != nil {
return nil, fmt.Errorf("failed to create metric for queued delay: %w", err)
}
i.MetricsOverflowed, err = meter.Int64Counter(
"metrics.overflowed.count",
metric.WithDescription(
"Estimated number of metric aggregation keys that resulted in an overflow, per interval and aggregation type",
),
metric.WithUnit(countUnit),
)
if err != nil {
return nil, fmt.Errorf("failed to create metric for metrics overflowed: %w", err)
}

// Pebble metrics
i.pebbleFlushes, err = meter.Int64ObservableCounter(