Skip to content

Commit

Permalink
Use NewMem and DisableWAL
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonip committed Jul 24, 2023
1 parent 9dc4f71 commit e2df969
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 11 deletions.
14 changes: 8 additions & 6 deletions aggregators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,10 @@ func New(opts ...Option) (*Aggregator, error) {
},
},
}
if cfg.InMemoryFS {
pebbleOpts.FS = vfs.NewStrictMem()
if cfg.InMemory {
pebbleOpts.FS = vfs.NewMem()
pebbleOpts.DisableWAL = true
cfg.WriteOpts = pebble.NoSync
}
pb, err := pebble.Open(cfg.DataDir, pebbleOpts)
if err != nil {
Expand Down Expand Up @@ -329,7 +331,7 @@ func (a *Aggregator) Stop(ctx context.Context) error {
if a.db != nil {
a.cfg.Logger.Info("running final aggregation")
if a.batch != nil {
if err := a.batch.Commit(pebble.Sync); err != nil {
if err := a.batch.Commit(a.cfg.WriteOpts); err != nil {
span.RecordError(err)
return fmt.Errorf("failed to commit batch: %w", err)
}
Expand Down Expand Up @@ -416,7 +418,7 @@ func (a *Aggregator) aggregate(

bytesIn := cm.SizeVT()
if a.batch.Len() >= dbCommitThresholdBytes {
if err := a.batch.Commit(pebble.Sync); err != nil {
if err := a.batch.Commit(a.cfg.WriteOpts); err != nil {
return bytesIn, fmt.Errorf("failed to commit pebble batch: %w", err)
}
if err := a.batch.Close(); err != nil {
Expand All @@ -438,7 +440,7 @@ func (a *Aggregator) commitAndHarvest(

var errs []error
if batch != nil {
if err := batch.Commit(pebble.Sync); err != nil {
if err := batch.Commit(a.cfg.WriteOpts); err != nil {
span.RecordError(err)
errs = append(errs, fmt.Errorf("failed to commit batch before harvest: %w", err))
}
Expand Down Expand Up @@ -572,7 +574,7 @@ func (a *Aggregator) harvestForInterval(
a.metrics.ProcessingDelay.Record(ctx, processingDelay, attrSet)
a.metrics.EventsProcessed.Add(ctx, harvestStats.eventsTotal, attrSet)
}
err := a.db.DeleteRange(lb, ub, pebble.Sync)
err := a.db.DeleteRange(lb, ub, a.cfg.WriteOpts)
if len(errs) > 0 {
err = errors.Join(err, fmt.Errorf(
"failed to process %d out of %d metrics:\n%w",
Expand Down
2 changes: 1 addition & 1 deletion aggregators/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ func newTestAggregator(tb testing.TB) *Aggregator {

func flushTestAggregator(tb testing.TB, agg *Aggregator) {
if agg.batch != nil {
if err := agg.batch.Commit(pebble.Sync); err != nil {
if err := agg.batch.Commit(agg.cfg.WriteOpts); err != nil {
tb.Fatal(err)
}
if err := agg.batch.Close(); err != nil {
Expand Down
15 changes: 11 additions & 4 deletions aggregators/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sort"
"time"

"github.com/cockroachdb/pebble"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -43,7 +44,8 @@ type Config struct {
AggregationIntervals []time.Duration
HarvestDelay time.Duration
CombinedMetricsIDToKVs func([16]byte) []attribute.KeyValue
InMemoryFS bool
InMemory bool
WriteOpts *pebble.WriteOptions

Meter metric.Meter
Tracer trace.Tracer
Expand Down Expand Up @@ -168,10 +170,10 @@ func WithLogger(logger *zap.Logger) Option {
}
}

// WithInMemoryFS defines whether aggregator uses in-memory file system.
func WithInMemoryFS(enabled bool) Option {
// WithInMemory defines whether aggregator uses in-memory file system.
func WithInMemory(enabled bool) Option {
return func(c Config) Config {
c.InMemoryFS = enabled
c.InMemory = enabled
return c
}
}
Expand All @@ -186,6 +188,7 @@ func defaultCfg() Config {
Tracer: otel.Tracer(instrumentationName),
CombinedMetricsIDToKVs: func(_ [16]byte) []attribute.KeyValue { return nil },
Logger: zap.Must(zap.NewDevelopment()),
WriteOpts: pebble.Sync,
}
}

Expand Down Expand Up @@ -225,6 +228,10 @@ func validateCfg(cfg Config) error {
if highest > 18*time.Hour {
return errors.New("aggregation interval greater than 18 hours is not supported")
}

if cfg.WriteOpts != pebble.Sync && cfg.WriteOpts != pebble.NoSync {
return errors.New("write opts should be pebble.Sync or pebble.NoSync")
}
return nil
}

Expand Down

0 comments on commit e2df969

Please sign in to comment.