Skip to content

Commit

Permalink
Shard writes across NumCPU batches
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Jul 20, 2023
1 parent a2be42e commit c8ff068
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 37 deletions.
97 changes: 64 additions & 33 deletions aggregators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -51,9 +53,9 @@ type Aggregator struct {
db *pebble.DB
cfg Config

mu sync.Mutex
mu sync.RWMutex
processingTime time.Time
batch *pebble.Batch
batches []*lockedBatch
cachedEvents cachedEventsMap

stopping chan struct{}
Expand All @@ -63,6 +65,11 @@ type Aggregator struct {
metrics *telemetry.Metrics
}

type lockedBatch struct {
mu sync.Mutex
*pebble.Batch
}

// cachedEventsMap holds a counts of cached events, keyed by interval and ID.
// Cached events are events that have been processed by Aggregate methods,
// but which haven't yet been harvested. Event counts are fractional because
Expand Down Expand Up @@ -161,8 +168,15 @@ func New(opts ...Option) (*Aggregator, error) {
return nil, fmt.Errorf("failed to create metrics: %w", err)
}

// TODO(axw) make the number of batches configurable?
batches := make([]*lockedBatch, runtime.NumCPU())
for i := range batches {
batches[i] = &lockedBatch{Batch: pb.NewBatch()}
}

return &Aggregator{
db: pb,
batches: batches,
cfg: cfg,
processingTime: time.Now().Truncate(cfg.AggregationIntervals[0]),
stopping: make(chan struct{}),
Expand All @@ -179,11 +193,6 @@ func (a *Aggregator) AggregateBatch(
id [16]byte,
b *modelpb.Batch,
) error {
cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(id)

a.mu.Lock()
defer a.mu.Unlock()

select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -192,6 +201,10 @@ func (a *Aggregator) AggregateBatch(
default:
}

cmIDAttrs := a.cfg.CombinedMetricsIDToKVs(id)
a.mu.RLock()
defer a.mu.RUnlock()

var errs []error
var totalBytesIn int64
cmk := CombinedMetricsKey{ID: id}
Expand Down Expand Up @@ -234,8 +247,8 @@ func (a *Aggregator) AggregateCombinedMetrics(
ctx, span := a.cfg.Tracer.Start(ctx, "AggregateCombinedMetrics", trace.WithAttributes(traceAttrs...))
defer span.End()

a.mu.Lock()
defer a.mu.Unlock()
a.mu.RLock()
defer a.mu.RUnlock()

select {
case <-ctx.Done():
Expand Down Expand Up @@ -280,14 +293,20 @@ func (a *Aggregator) Run(ctx context.Context) error {
case <-timer.C:
}

var commitBatches []*pebble.Batch
a.mu.Lock()
batch := a.batch
a.batch = nil
for _, batch := range a.batches {
if batch.Batch.Empty() {
continue
}
commitBatches = append(commitBatches, batch.Batch)
batch.Batch = a.db.NewBatch()
}
a.processingTime = to
cachedEventsStats := a.cachedEvents.loadAndDelete(to)
a.mu.Unlock()

if err := a.commitAndHarvest(ctx, batch, to, cachedEventsStats); err != nil {
if err := a.commitAndHarvest(ctx, commitBatches, to, cachedEventsStats); err != nil {
a.cfg.Logger.Warn("failed to commit and harvest metrics", zap.Error(err))
}
to = to.Add(a.cfg.AggregationIntervals[0])
Expand Down Expand Up @@ -323,17 +342,19 @@ func (a *Aggregator) Stop(ctx context.Context) error {

if a.db != nil {
a.cfg.Logger.Info("running final aggregation")
if a.batch != nil {
if err := a.batch.Commit(pebble.Sync); err != nil {

for _, batch := range a.batches {
if err := batch.Commit(pebble.Sync); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to commit batch: %w", err)
}
if err := a.batch.Close(); err != nil {
if err := batch.Close(); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to close batch: %w", err)
}
a.batch = nil
}
a.batches = nil

var errs []error
for _, ivl := range a.cfg.AggregationIntervals {
// At any particular time there will be 1 harvest candidate for
Expand Down Expand Up @@ -391,14 +412,29 @@ func (a *Aggregator) aggregate(
ctx context.Context,
cmk CombinedMetricsKey,
cm *aggregationpb.CombinedMetrics,
) (int, error) {
if a.batch == nil {
// Batch is backed by a sync pool. After each commit we will release the batch
// back to the pool by calling Batch#Close and subsequently acquire a new batch.
a.batch = a.db.NewBatch()
}
) (_ int, resultErr error) {

op := a.batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT())
// We conditionally commit and close the batch if it is large enough after writing.
// We do this after releasing the lock to avoid holding up other writers.
var commitBatch *pebble.Batch
defer func() {
if commitBatch == nil {
return
}
if err := commitBatch.Commit(pebble.Sync); err != nil {
resultErr = errors.Join(resultErr, fmt.Errorf("failed to commit pebble batch: %w", err))
}
if err := commitBatch.Close(); err != nil {
resultErr = errors.Join(resultErr, fmt.Errorf("failed to close pebble batch: %w", err))
}
}()

// Aggregate into a random batch, to minimise lock contention across goroutines.
batch := a.batches[rand.Intn(len(a.batches))]
batch.mu.Lock()
defer batch.mu.Unlock()

op := batch.MergeDeferred(cmk.SizeBinary(), cm.SizeVT())
if err := cmk.MarshalBinaryToSizedBuffer(op.Key); err != nil {
return 0, fmt.Errorf("failed to marshal combined metrics key: %w", err)
}
Expand All @@ -410,29 +446,24 @@ func (a *Aggregator) aggregate(
}

bytesIn := cm.SizeVT()
if a.batch.Len() >= dbCommitThresholdBytes {
if err := a.batch.Commit(pebble.Sync); err != nil {
return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err)
}
if err := a.batch.Close(); err != nil {
return bytesIn, fmt.Errorf("failed to close pebble batch: %w", err)
}
a.batch = nil
if batch.Len() >= dbCommitThresholdBytes {
commitBatch = batch.Batch
batch.Batch = a.db.NewBatch()
}
return bytesIn, nil
}

func (a *Aggregator) commitAndHarvest(
ctx context.Context,
batch *pebble.Batch,
batches []*pebble.Batch,
to time.Time,
cachedEventsStats map[time.Duration]map[[16]byte]float64,
) error {
ctx, span := a.cfg.Tracer.Start(ctx, "commitAndHarvest")
defer span.End()

var errs []error
if batch != nil {
for _, batch := range batches {
if err := batch.Commit(pebble.Sync); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf("failed to commit batch before harvest: %w", err))
Expand Down
7 changes: 3 additions & 4 deletions aggregators/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1198,14 +1198,13 @@ func newTestAggregator(tb testing.TB) *Aggregator {
}

func flushTestAggregator(tb testing.TB, agg *Aggregator) {
if agg.batch != nil {
if err := agg.batch.Commit(pebble.Sync); err != nil {
for _, batch := range agg.batches {
if err := batch.Commit(pebble.Sync); err != nil {
tb.Fatal(err)
}
if err := agg.batch.Close(); err != nil {
if err := batch.Close(); err != nil {
tb.Fatal(err)
}
agg.batch = nil
}
if err := agg.db.Close(); err != nil {
tb.Fatal(err)
Expand Down

0 comments on commit c8ff068

Please sign in to comment.