Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shard batches #31

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 64 additions & 33 deletions aggregators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -51,9 +53,9 @@ type Aggregator struct {
db *pebble.DB
cfg Config

mu sync.Mutex
mu sync.RWMutex
processingTime time.Time
batch *pebble.Batch
batches []*lockedBatch
cachedEvents cachedEventsMap

stopping chan struct{}
Expand All @@ -63,6 +65,11 @@ type Aggregator struct {
metrics *telemetry.Metrics
}

// lockedBatch wraps a *pebble.Batch with a mutex so that concurrent
// writers can safely merge into the same underlying batch. The batch
// pointer itself may be swapped (after a commit) while mu is held.
type lockedBatch struct {
mu sync.Mutex
*pebble.Batch
}

// cachedEventsMap holds counts of cached events, keyed by interval and ID.
// Cached events are events that have been processed by Aggregate methods,
// but which haven't yet been harvested. Event counts are fractional because
Expand Down Expand Up @@ -161,8 +168,15 @@ func New(opts ...Option) (*Aggregator, error) {
return nil, fmt.Errorf("failed to create metrics: %w", err)
}

// TODO(axw) make the number of batches configurable?
batches := make([]*lockedBatch, runtime.NumCPU())
for i := range batches {
batches[i] = &lockedBatch{Batch: pb.NewBatch()}
}

return &Aggregator{
db: pb,
batches: batches,
cfg: cfg,
processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
stopping: make(chan struct{}),
Expand All @@ -179,11 +193,6 @@ func (a *Aggregator) AggregateBatch(
id [16]byte,
b *modelpb.Batch,
) error {
cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(id)

a.mu.Lock()
defer a.mu.Unlock()

select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -192,6 +201,10 @@ func (a *Aggregator) AggregateBatch(
default:
}

cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(id)
a.mu.RLock()
defer a.mu.RUnlock()

var errs []error
var totalBytesIn int64
cmk := CombinedMetricsKey{ID: id}
Expand Down Expand Up @@ -234,8 +247,8 @@ func (a *Aggregator) AggregateCombinedMetrics(
ctx, span := a.cfg.Tracer.Start(ctx, "AggregateCombinedMetrics", trace.WithAttributes(traceAttrs...))
defer span.End()

a.mu.Lock()
defer a.mu.Unlock()
a.mu.RLock()
defer a.mu.RUnlock()

select {
case <-ctx.Done():
Expand Down Expand Up @@ -280,14 +293,20 @@ func (a *Aggregator) Run(ctx context.Context) error {
case <-timer.C:
}

var commitBatches []*pebble.Batch
a.mu.Lock()
batch := a.batch
a.batch = nil
for _, batch := range a.batches {
if batch.Batch.Empty() {
continue
}
commitBatches = append(commitBatches, batch.Batch)
batch.Batch = a.db.NewBatch()
}
a.processingTime = to
cachedEventsStats := a.cachedEvents.loadAndDelete(to)
a.mu.Unlock()

if err := a.commitAndHarvest(ctx, batch, to, cachedEventsStats); err != nil {
if err := a.commitAndHarvest(ctx, commitBatches, to, cachedEventsStats); err != nil {
a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
}
to = to.Add(a.cfg.AggregationIntervals[0])
Expand Down Expand Up @@ -323,17 +342,19 @@ func (a *Aggregator) Stop(ctx context.Context) error {

if a.db != nil {
a.cfg.Logger.Info("running final aggregation")
if a.batch != nil {
if err := a.batch.Commit(pebble.Sync); err != nil {

for _, batch := range a.batches {
if err := batch.Commit(pebble.Sync); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to commit batch: %w", err)
}
if err := a.batch.Close(); err != nil {
if err := batch.Close(); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to close batch: %w", err)
}
a.batch = nil
}
a.batches = nil

var errs []error
for _, ivl := range a.cfg.AggregationIntervals {
// At any particular time there will be 1 harvest candidate for
Expand Down Expand Up @@ -391,14 +412,29 @@ func (a *Aggregator) aggregate(
ctx context.Context,
cmk CombinedMetricsKey,
cm *aggregationpb.CombinedMetrics,
) (int, error) {
if a.batch == nil {
// Batch is backed by a sync pool. After each commit we will release the batch
// back to the pool by calling Batch#Close and subsequently acquire a new batch.
a.batch = a.db.NewBatch()
}
) (_ int, resultErr error) {

op := a.batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT())
// We conditionally commit and close the batch if it is large enough after writing.
// We do this after releasing the lock to avoid holding up other writers.
var commitBatch *pebble.Batch
defer func() {
if commitBatch == nil {
return
}
if err := commitBatch.Commit(pebble.Sync); err != nil {
resultErr = errors.Join(resultErr, fmt.Errorf("failed to commit pebble batch: %w", err))
}
if err := commitBatch.Close(); err != nil {
resultErr = errors.Join(resultErr, fmt.Errorf("failed to close pebble batch: %w", err))
}
}()

// Aggregate into a random batch, to minimise lock contention across goroutines.
batch := a.batches[rand.Intn(len(a.batches))]
batch.mu.Lock()
defer batch.mu.Unlock()

op := batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT())
if err := cmk.MarshalBinaryToSizedBuffer(op.Key); err != nil {
return 0, fmt.Errorf("failed to marshal combined metrics key: %w", err)
}
Expand All @@ -410,29 +446,24 @@ func (a *Aggregator) aggregate(
}

bytesIn := cm.SizeVT()
if a.batch.Len() >= dbCommitThresholdBytes {
if err := a.batch.Commit(pebble.Sync); err != nil {
return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err)
}
if err := a.batch.Close(); err != nil {
return bytesIn, fmt.Errorf("failed to close pebble batch: %w", err)
}
a.batch = nil
if batch.Len() >= dbCommitThresholdBytes {
commitBatch = batch.Batch
batch.Batch = a.db.NewBatch()
}
return bytesIn, nil
}

func (a *Aggregator) commitAndHarvest(
ctx context.Context,
batch *pebble.Batch,
batches []*pebble.Batch,
to time.Time,
cachedEventsStats map[time.Duration]map[[16]byte]float64,
) error {
ctx, span := a.cfg.Tracer.Start(ctx, "commitAndHarvest")
defer span.End()

var errs []error
if batch != nil {
for _, batch := range batches {
if err := batch.Commit(pebble.Sync); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf("failed to commit batch before harvest: %w", err))
Expand Down
7 changes: 3 additions & 4 deletions aggregators/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1198,14 +1198,13 @@ func newTestAggregator(tb testing.TB) *Aggregator {
}

func flushTestAggregator(tb testing.TB, agg *Aggregator) {
if agg.batch != nil {
if err := agg.batch.Commit(pebble.Sync); err != nil {
for _, batch := range agg.batches {
if err := batch.Commit(pebble.Sync); err != nil {
tb.Fatal(err)
}
if err := agg.batch.Close(); err != nil {
if err := batch.Close(); err != nil {
tb.Fatal(err)
}
agg.batch = nil
}
if err := agg.db.Close(); err != nil {
tb.Fatal(err)
Expand Down