Switch to sync.Map for cached event stats #29

Merged 3 commits on Jul 20, 2023
135 changes: 75 additions & 60 deletions aggregators/aggregator.go
@@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
@@ -53,7 +54,7 @@ type Aggregator struct {
mu sync.Mutex
processingTime time.Time
batch *pebble.Batch
cachedStats map[time.Duration]map[[16]byte]stats
cachedEvents cachedEventsMap

stopping chan struct{}
runStarted atomic.Bool
@@ -62,17 +63,69 @@
metrics *telemetry.Metrics
}

// stats is used to cache request-based stats accepted by the
// pipeline and publish them once a harvest is performed. This
// fixes the time shift introduced by the aggregation interval.
// Mostly useful for metrics which need to be correlated with
// harvest-time metrics.
type stats struct {
eventsTotal float64
// cachedEventsMap holds counts of cached events, keyed by interval and ID.
// Cached events are events that have been processed by Aggregate methods,
// but which haven't yet been harvested. Event counts are fractional because
// an event may be spread over multiple partitions.
//
// Access to the map is protected with a mutex. During harvest, an exclusive
// (write) lock is held. Concurrent aggregations may perform atomic updates
// to the map, and the harvester may assume that the map will not be modified
// while it is reading it.
type cachedEventsMap struct {
// (interval, id) -> count
m sync.Map
countPool sync.Pool
}

func (s *stats) merge(from stats) {
s.eventsTotal += from.eventsTotal
func (m *cachedEventsMap) loadAndDelete(end time.Time) map[time.Duration]map[[16]byte]float64 {
loaded := make(map[time.Duration]map[[16]byte]float64)
m.m.Range(func(k, v any) bool {
key := k.(cachedEventsStatsKey)
if !end.Truncate(key.interval).Equal(end) {
return true
}
intervalMetrics, ok := loaded[key.interval]
if !ok {
intervalMetrics = make(map[[16]byte]float64)
loaded[key.interval] = intervalMetrics
}
vscaled := *v.(*uint64)
value := float64(vscaled) / math.MaxUint16
intervalMetrics[key.id] = value
m.m.Delete(k)
m.countPool.Put(v)
return true
})
return loaded
}

func (m *cachedEventsMap) add(interval time.Duration, id [16]byte, n float64) {
// We use a pool for the value to minimise allocations, as it will
// always escape to the heap through LoadOrStore.
nscaled, ok := m.countPool.Get().(*uint64)
if !ok {
nscaled = new(uint64)
}
// Scale by the maximum number of partitions to get an integer value,
// for simpler atomic operations.
*nscaled = uint64(n * math.MaxUint16)
key := cachedEventsStatsKey{interval: interval, id: id}
old, loaded := m.m.Load(key)
if !loaded {
old, loaded = m.m.LoadOrStore(key, nscaled)
if !loaded {
// Stored a new value.
return
}
}
atomic.AddUint64(old.(*uint64), *nscaled)
m.countPool.Put(nscaled)
}

type cachedEventsStatsKey struct {
interval time.Duration
id [16]byte
}
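
For context — an illustrative sketch, not part of this diff — the new cachedEventsMap round-trips counts as follows, assuming a hypothetical test in the aggregators package (the type is unexported): add stores each count as a math.MaxUint16-scaled integer so concurrent updates can use atomic adds, and loadAndDelete drains the per-ID totals for intervals aligned with the harvest time.

package aggregators

import (
	"testing"
	"time"
)

// TestCachedEventsMapSketch is hypothetical and only illustrates the
// add/loadAndDelete round trip introduced by this change.
func TestCachedEventsMapSketch(t *testing.T) {
	var cached cachedEventsMap
	id := [16]byte{1}

	// Counts for the same (interval, ID) key accumulate atomically.
	cached.add(time.Minute, id, 3) // e.g. events from AggregateBatch
	cached.add(time.Minute, id, 2) // e.g. events from AggregateCombinedMetrics

	// Draining at an interval-aligned time returns the summed count and
	// removes the entry; keys for misaligned intervals are left untouched.
	end := time.Now().Truncate(time.Minute)
	if got := cached.loadAndDelete(end)[time.Minute][id]; got != 5 {
		t.Fatalf("expected 5 cached events, got %f", got)
	}
}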

// New returns a new aggregator instance.
@@ -112,21 +165,12 @@ func New(opts ...Option) (*Aggregator, error) {
db: pb,
cfg: cfg,
processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
cachedStats: newCachedStats(cfg.AggregationIntervals),
stopping: make(chan struct{}),
runStopped: make(chan struct{}),
metrics: metrics,
}, nil
}

func newCachedStats(ivls []time.Duration) map[time.Duration]map[[16]byte]stats {
m := make(map[time.Duration]map[[16]byte]stats, len(ivls))
for _, ivl := range ivls {
m[ivl] = make(map[[16]byte]stats)
}
return m
}

// AggregateBatch aggregates all events in the batch. This function will return
// an error if the aggregator's Run loop has errored or has been explicitly stopped.
// However, it doesn't require the aggregator to be running to perform aggregation.
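
To illustrate that contract — a hypothetical call site, not part of this diff; the constructor options, combined metrics ID, and batch type below are assumptions rather than taken from this file:

// Sketch only: option values, the ID, and the batch type are assumed.
ctx := context.Background()
agg, err := New( /* required aggregator options elided */ )
if err != nil {
	panic(err)
}
var batch modelpb.Batch // events to aggregate (assumed type)
id := [16]byte{1}       // combined metrics ID (assumed value)

// Aggregating before (or without) Run is allowed; per-ID event counts are
// cached via cachedEvents.add and published only when a harvest runs.
if err := agg.AggregateBatch(ctx, id, &batch); err != nil {
	// Non-nil only if the Run loop has errored or has been stopped.
	panic(err)
}

// Run drives periodic harvests, which drain the cached event counts and
// report them alongside harvest-time metrics.
go func() { _ = agg.Run(ctx) }()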
@@ -161,9 +205,7 @@ func (a *Aggregator) AggregateBatch(
}
totalBytesIn += int64(bytesIn)
}
cmStats := a.cachedStats[ivl][id]
cmStats.eventsTotal += float64(len(*b))
a.cachedStats[ivl][id] = cmStats
a.cachedEvents.add(ivl, id, float64(len(*b)))
}

cmIDAttrSet := attribute.NewSet(cmIDAttrs...)
@@ -204,15 +246,7 @@ func (a *Aggregator) AggregateCombinedMetrics(
}

bytesIn, err := a.aggregate(ctx, cmk, cm)

if _, ok := a.cachedStats[cmk.Interval]; !ok {
// Protection for stats collected from a different instance
// of aggregator as aggregators can be chained.
a.cachedStats[cmk.Interval] = make(map[[16]byte]stats)
}
cmStats := a.cachedStats[cmk.Interval][cmk.ID]
cmStats.eventsTotal += cm.EventsTotal
a.cachedStats[cmk.Interval][cmk.ID] = cmStats
a.cachedEvents.add(cmk.Interval, cmk.ID, cm.EventsTotal)

span.SetAttributes(attribute.Int("bytes_ingested", bytesIn))
cmIDAttrSet := attribute.NewSet(cmIDAttrs...)
@@ -236,7 +270,6 @@ func (a *Aggregator) Run(ctx context.Context) error {

to := a.processingTime.Add(a.cfg.AggregationIntervals[0])
timer := time.NewTimer(time.Until(to.Add(a.cfg.HarvestDelay)))
harvestStats := newCachedStats(a.cfg.AggregationIntervals)
defer timer.Stop()
for {
select {
@@ -251,22 +284,10 @@
batch := a.batch
a.batch = nil
a.processingTime = to
for ivl, statsm := range a.cachedStats {
if _, ok := harvestStats[ivl]; !ok {
// Protection for stats collected from a different instance
// of aggregator as aggregators can be chained.
harvestStats[ivl] = make(map[[16]byte]stats)
}
for cmID, cmStats := range statsm {
hstats := harvestStats[ivl][cmID]
hstats.merge(cmStats)
harvestStats[ivl][cmID] = hstats
delete(statsm, cmID)
}
}
cachedEventsStats := a.cachedEvents.loadAndDelete(to)
a.mu.Unlock()

if err := a.commitAndHarvest(ctx, batch, to, harvestStats); err != nil {
if err := a.commitAndHarvest(ctx, batch, to, cachedEventsStats); err != nil {
a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
}
to = to.Add(a.cfg.AggregationIntervals[0])
@@ -322,7 +343,7 @@ func (a *Aggregator) Stop(ctx context.Context) error {
// TODO (lahsivjar): It is possible to harvest the same
// time multiple times, not an issue but can be optimized.
to := a.processingTime.Truncate(ivl).Add(ivl)
if err := a.harvest(ctx, to, a.cachedStats); err != nil {
if err := a.harvest(ctx, to, a.cachedEvents.loadAndDelete(to)); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf(
"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
@@ -405,7 +426,7 @@ func (a *Aggregator) commitAndHarvest(
ctx context.Context,
batch *pebble.Batch,
to time.Time,
harvestStats map[time.Duration]map[[16]byte]stats,
cachedEventsStats map[time.Duration]map[[16]byte]float64,
) error {
ctx, span := a.cfg.Tracer.Start(ctx, "commitAndHarvest")
defer span.End()
@@ -421,7 +442,7 @@
errs = append(errs, fmt.Errorf("failed to close batch before harvest: %w", err))
}
}
if err := a.harvest(ctx, to, harvestStats); err != nil {
if err := a.harvest(ctx, to, cachedEventsStats); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf("failed to harvest aggregated metrics: %w", err))
}
@@ -437,7 +458,7 @@
func (a *Aggregator) harvest(
ctx context.Context,
end time.Time,
harvestStats map[time.Duration]map[[16]byte]stats,
cachedEventsStats map[time.Duration]map[[16]byte]float64,
) error {
snap := a.db.NewSnapshot()
defer snap.Close()
@@ -448,7 +469,7 @@
if end.Truncate(ivl).Equal(end) {
start := end.Add(-ivl)
cmCount, err := a.harvestForInterval(
ctx, snap, start, end, ivl, harvestStats[ivl],
ctx, snap, start, end, ivl, cachedEventsStats[ivl],
)
if err != nil {
errs = append(errs, fmt.Errorf(
@@ -477,7 +498,7 @@ func (a *Aggregator) harvestForInterval(
snap *pebble.Snapshot,
start, end time.Time,
ivl time.Duration,
cmStats map[[16]byte]stats,
cachedEventsStats map[[16]byte]float64,
) (int, error) {
from := CombinedMetricsKey{
Interval: ivl,
@@ -498,15 +519,9 @@
// stopped when the L2 aggregator is waiting for harvest delay leading to
// premature harvest as part of the graceful shutdown process.
ivlAttr := attribute.String(aggregationIvlKey, formatDuration(ivl))
for cmID, stats := range cmStats {
for cmID, eventsTotal := range cachedEventsStats {
attrs := append(a.cfg.CombinedMetricsIDToKVs(cmID), ivlAttr)
a.metrics.EventsTotal.Add(
ctx, stats.eventsTotal,
metric.WithAttributeSet(
attribute.NewSet(attrs...),
),
)
delete(cmStats, cmID)
a.metrics.EventsTotal.Add(ctx, eventsTotal, metric.WithAttributes(attrs...))
}

iter := snap.NewIter(&pebble.IterOptions{