Switch to sync.Map for cached event stats (#29)
* Switch to sync.Map for cached event stats

* Fix releasing back to pool
axw authored Jul 20, 2023
1 parent b34534e commit a2be42e
Showing 1 changed file with 75 additions and 60 deletions.
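For orientation before the diff: the change replaces the mutex-guarded cachedStats map[time.Duration]map[[16]byte]stats with a sync.Map whose values are pooled *uint64 counters updated via atomic.AddUint64; the second commit-message bullet ("Fix releasing back to pool") presumably refers to returning a counter to the pool only when it was not stored into the map. A minimal, standalone sketch of that pattern (not the committed code; counterMap, add, and drain are illustrative names, and plain string keys stand in for the commit's cachedEventsStatsKey):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counterMap is a toy version of the pattern: values are *uint64 so that
// concurrent writers can update them with atomic.AddUint64, and a sync.Pool
// recycles counters that lose the LoadOrStore race.
type counterMap struct {
	m    sync.Map  // key -> *uint64
	pool sync.Pool // recycled *uint64 values
}

func (c *counterMap) add(key string, n uint64) {
	v, ok := c.pool.Get().(*uint64)
	if !ok {
		v = new(uint64) // pool was empty (Get returned nil)
	}
	*v = n
	old, loaded := c.m.Load(key)
	if !loaded {
		old, loaded = c.m.LoadOrStore(key, v)
		if !loaded {
			return // v is now owned by the map; do not put it back in the pool
		}
	}
	// Another goroutine stored a counter first: add to theirs, recycle ours.
	atomic.AddUint64(old.(*uint64), *v)
	c.pool.Put(v)
}

// drain assumes no concurrent add calls, mirroring the commit's guarantee
// that an exclusive lock is held during harvest.
func (c *counterMap) drain() map[string]uint64 {
	out := make(map[string]uint64)
	c.m.Range(func(k, v any) bool {
		out[k.(string)] = atomic.LoadUint64(v.(*uint64))
		c.m.Delete(k)
		c.pool.Put(v)
		return true
	})
	return out
}

func main() {
	var c counterMap
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				c.add("events", 1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(c.drain()["events"]) // 8000
}
```

Pooling the *uint64 values matters because LoadOrStore takes an interface value, so each counter escapes to the heap; recycling them keeps the per-aggregation allocation cost down, as the in-diff comment notes.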
135 changes: 75 additions & 60 deletions aggregators/aggregator.go
@@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
@@ -53,7 +54,7 @@ type Aggregator struct {
mu sync.Mutex
processingTime time.Time
batch *pebble.Batch
cachedStats map[time.Duration]map[[16]byte]stats
cachedEvents cachedEventsMap

stopping chan struct{}
runStarted atomic.Bool
@@ -62,17 +63,69 @@
metrics *telemetry.Metrics
}

// stats is used to cache request based stats accepted by the
// pipeline and publish them once harvest is performed. This
// fixes the time shift introduced by the aggregation interval.
// Mostly useful for metrics that need to be correlated with
// harvest time metrics.
type stats struct {
eventsTotal float64
// cachedEventsMap holds counts of cached events, keyed by interval and ID.
// Cached events are events that have been processed by Aggregate methods,
// but which haven't yet been harvested. Event counts are fractional because
// an event may be spread over multiple partitions.
//
// Access to the map is protected with a mutex. During harvest, an exclusive
// (write) lock is held. Concurrent aggregations may perform atomic updates
// to the map, and the harvester may assume that the map will not be modified
// while it is reading it.
type cachedEventsMap struct {
// (interval, id) -> count
m sync.Map
countPool sync.Pool
}

func (s *stats) merge(from stats) {
s.eventsTotal += from.eventsTotal
func (m *cachedEventsMap) loadAndDelete(end time.Time) map[time.Duration]map[[16]byte]float64 {
loaded := make(map[time.Duration]map[[16]byte]float64)
m.m.Range(func(k, v any) bool {
key := k.(cachedEventsStatsKey)
if !end.Truncate(key.interval).Equal(end) {
return true
}
intervalMetrics, ok := loaded[key.interval]
if !ok {
intervalMetrics = make(map[[16]byte]float64)
loaded[key.interval] = intervalMetrics
}
vscaled := *v.(*uint64)
// Divide after converting to float64 so fractional counts survive descaling.
value := float64(vscaled) / math.MaxUint16
intervalMetrics[key.id] = value
m.m.Delete(k)
m.countPool.Put(v)
return true
})
return loaded
}

func (m *cachedEventsMap) add(interval time.Duration, id [16]byte, n float64) {
// We use a pool for the value to minimise allocations, as it will
// always escape to the heap through LoadOrStore.
nscaled, ok := m.countPool.Get().(*uint64)
if !ok {
nscaled = new(uint64)
}
// Scale by the maximum number of partitions to get an integer value,
// for simpler atomic operations.
*nscaled = uint64(n * math.MaxUint16)
key := cachedEventsStatsKey{interval: interval, id: id}
old, loaded := m.m.Load(key)
if !loaded {
old, loaded = m.m.LoadOrStore(key, nscaled)
if !loaded {
// Stored a new value.
return
}
}
atomic.AddUint64(old.(*uint64), *nscaled)
m.countPool.Put(nscaled)
}

type cachedEventsStatsKey struct {
interval time.Duration
id [16]byte
}

// New returns a new aggregator instance.
@@ -112,21 +165,12 @@ func New(opts ...Option) (*Aggregator, error) {
db: pb,
cfg: cfg,
processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
cachedStats: newCachedStats(cfg.AggregationIntervals),
stopping: make(chan struct{}),
runStopped: make(chan struct{}),
metrics: metrics,
}, nil
}

func newCachedStats(ivls []time.Duration) map[time.Duration]map[[16]byte]stats {
m := make(map[time.Duration]map[[16]byte]stats, len(ivls))
for _, ivl := range ivls {
m[ivl] = make(map[[16]byte]stats)
}
return m
}

// AggregateBatch aggregates all events in the batch. This function will return
// an error if the aggregator's Run loop has errored or has been explicitly stopped.
// However, it doesn't require the aggregator to be running to perform aggregation.
@@ -161,9 +205,7 @@ func (a *Aggregator) AggregateBatch(
}
totalBytesIn += int64(bytesIn)
}
cmStats := a.cachedStats[ivl][id]
cmStats.eventsTotal += float64(len(*b))
a.cachedStats[ivl][id] = cmStats
a.cachedEvents.add(ivl, id, float64(len(*b)))
}

cmIDAttrSet := attribute.NewSet(cmIDAttrs...)
@@ -204,15 +246,7 @@ func (a *Aggregator) AggregateCombinedMetrics(
}

bytesIn, err := a.aggregate(ctx, cmk, cm)

if _, ok := a.cachedStats[cmk.Interval]; !ok {
// Protection for stats collected from a different instance
// of aggregator as aggregators can be chained.
a.cachedStats[cmk.Interval] = make(map[[16]byte]stats)
}
cmStats := a.cachedStats[cmk.Interval][cmk.ID]
cmStats.eventsTotal += cm.EventsTotal
a.cachedStats[cmk.Interval][cmk.ID] = cmStats
a.cachedEvents.add(cmk.Interval, cmk.ID, cm.EventsTotal)

span.SetAttributes(attribute.Int("bytes_ingested", bytesIn))
cmIDAttrSet := attribute.NewSet(cmIDAttrs...)
@@ -236,7 +270,6 @@ func (a *Aggregator) Run(ctx context.Context) error {

to := a.processingTime.Add(a.cfg.AggregationIntervals[0])
timer := time.NewTimer(time.Until(to.Add(a.cfg.HarvestDelay)))
harvestStats := newCachedStats(a.cfg.AggregationIntervals)
defer timer.Stop()
for {
select {
@@ -251,22 +284,10 @@
batch := a.batch
a.batch = nil
a.processingTime = to
for ivl, statsm := range a.cachedStats {
if _, ok := harvestStats[ivl]; !ok {
// Protection for stats collected from a different instance
// of aggregator as aggregators can be chained.
harvestStats[ivl] = make(map[[16]byte]stats)
}
for cmID, cmStats := range statsm {
hstats := harvestStats[ivl][cmID]
hstats.merge(cmStats)
harvestStats[ivl][cmID] = hstats
delete(statsm, cmID)
}
}
cachedEventsStats := a.cachedEvents.loadAndDelete(to)
a.mu.Unlock()

if err := a.commitAndHarvest(ctx, batch, to, harvestStats); err != nil {
if err := a.commitAndHarvest(ctx, batch, to, cachedEventsStats); err != nil {
a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
}
to = to.Add(a.cfg.AggregationIntervals[0])
@@ -322,7 +343,7 @@ func (a *Aggregator) Stop(ctx context.Context) error {
// TODO (lahsivjar): It is possible to harvest the same
// time multiple times, not an issue but can be optimized.
to := a.processingTime.Truncate(ivl).Add(ivl)
if err := a.harvest(ctx, to, a.cachedStats); err != nil {
if err := a.harvest(ctx, to, a.cachedEvents.loadAndDelete(to)); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf(
"failed to harvest metrics for interval %s: %w", formatDuration(ivl), err),
Expand Down Expand Up @@ -405,7 +426,7 @@ func (a *Aggregator) commitAndHarvest(
ctx context.Context,
batch *pebble.Batch,
to time.Time,
harvestStats map[time.Duration]map[[16]byte]stats,
cachedEventsStats map[time.Duration]map[[16]byte]float64,
) error {
ctx, span := a.cfg.Tracer.Start(ctx, "commitAndHarvest")
defer span.End()
@@ -421,7 +442,7 @@
errs = append(errs, fmt.Errorf("failed to close batch before harvest: %w", err))
}
}
if err := a.harvest(ctx, to, harvestStats); err != nil {
if err := a.harvest(ctx, to, cachedEventsStats); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf("failed to harvest aggregated metrics: %w", err))
}
@@ -437,7 +458,7 @@
func (a *Aggregator) harvest(
ctx context.Context,
end time.Time,
harvestStats map[time.Duration]map[[16]byte]stats,
cachedEventsStats map[time.Duration]map[[16]byte]float64,
) error {
snap := a.db.NewSnapshot()
defer snap.Close()
Expand All @@ -448,7 +469,7 @@ func (a *Aggregator) harvest(
if end.Truncate(ivl).Equal(end) {
start := end.Add(-ivl)
cmCount, err := a.harvestForInterval(
ctx, snap, start, end, ivl, harvestStats[ivl],
ctx, snap, start, end, ivl, cachedEventsStats[ivl],
)
if err != nil {
errs = append(errs, fmt.Errorf(
@@ -477,7 +498,7 @@ func (a *Aggregator) harvestForInterval(
snap *pebble.Snapshot,
start, end time.Time,
ivl time.Duration,
cmStats map[[16]byte]stats,
cachedEventsStats map[[16]byte]float64,
) (int, error) {
from := CombinedMetricsKey{
Interval: ivl,
@@ -498,15 +519,9 @@
// stopped when the L2 aggregator is waiting for harvest delay leading to
// premature harvest as part of the graceful shutdown process.
ivlAttr := attribute.String(aggregationIvlKey, formatDuration(ivl))
for cmID, stats := range cmStats {
for cmID, eventsTotal := range cachedEventsStats {
attrs := append(a.cfg.CombinedMetricsIDToKVs(cmID), ivlAttr)
a.metrics.EventsTotal.Add(
ctx, stats.eventsTotal,
metric.WithAttributeSet(
attribute.NewSet(attrs...),
),
)
delete(cmStats, cmID)
a.metrics.EventsTotal.Add(ctx, eventsTotal, metric.WithAttributes(attrs...))
}

iter := snap.NewIter(&pebble.IterOptions{

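A closing note on the fixed-point encoding used in the diff: event counts are fractional because an event may be spread over multiple partitions, but atomic adds need integers, so the commit stores counts scaled by math.MaxUint16. A rough round-trip sketch of that encoding (scale and descale are illustrative helpers, not functions from the commit):

```go
package main

import (
	"fmt"
	"math"
)

// scale converts a fractional event count into the fixed-point uint64
// representation that can be accumulated with atomic adds.
func scale(n float64) uint64 { return uint64(n * math.MaxUint16) }

// descale converts an accumulated fixed-point total back to float64.
// Dividing after the float conversion keeps the fractional part.
func descale(v uint64) float64 { return float64(v) / math.MaxUint16 }

func main() {
	// An event split evenly across four partitions contributes 0.25 each time.
	var total uint64
	for i := 0; i < 4; i++ {
		total += scale(0.25)
	}
	fmt.Println(descale(total)) // ≈ 0.99995, just shy of 1 due to fixed-point rounding
}
```

Descaling with float division rather than integer division keeps the fractional remainder, which would otherwise be truncated away at each harvest.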