From 5b29bdf24807927084b645427124acfc0daad721 Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Fri, 6 Feb 2026 14:02:44 +0100 Subject: [PATCH] fix(storage): share aloneInBucket flag per bucket via atomic.Bool in Factory The previous subquery-based approach in newScopedSelect executed a COUNT(*) on _system.ledgers for every scoped query. Replace it with a cached *atomic.Bool per bucket, shared across all stores through the DefaultFactory. When a new ledger is created in a bucket that already contains another ledger, CreateLedger calls SetAloneInBucket(false), which immediately propagates to every store created by the same Factory for that bucket, so those stores stop skipping the WHERE ledger=? predicate. Also adds CountLedgersInBucket to the system store interface so CreateLedger and OpenLedger can seed the flag correctly. --- internal/storage/driver/driver.go | 18 ++++++++- internal/storage/driver/store.go | 1 + .../storage/driver/store_generated_test.go | 15 +++++++ internal/storage/ledger/factory.go | 24 +++++++++-- internal/storage/ledger/store.go | 40 ++++++++++--------- internal/storage/system/store.go | 12 ++++++ 6 files changed, 88 insertions(+), 22 deletions(-) diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index c8803a273..666ac85d9 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -75,7 +75,13 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto } } + count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket) + if err != nil { + return fmt.Errorf("counting ledgers in bucket: %w", err) + } + ret = d.ledgerStoreFactory.Create(b, *l) + ret.SetAloneInBucket(count == 1) return nil }) @@ -88,12 +94,22 @@ func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) { // todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory - ret, err := 
d.systemStoreFactory.Create(d.db).GetLedger(ctx, name) + // NOTE: the aloneInBucket flag is now shared per bucket via the Factory, + // so all stores in the same bucket see updates immediately. + systemStore := d.systemStoreFactory.Create(d.db) + + ret, err := systemStore.GetLedger(ctx, name) if err != nil { return nil, nil, err } + count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket) + if err != nil { + return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err) + } + store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret) + store.SetAloneInBucket(count == 1) return store, ret, err } diff --git a/internal/storage/driver/store.go b/internal/storage/driver/store.go index 6adfd1d55..a52d594e7 100644 --- a/internal/storage/driver/store.go +++ b/internal/storage/driver/store.go @@ -18,6 +18,7 @@ type SystemStore interface { //ListLedgers(ctx context.Context, q systemstore.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucket string) (int, error) Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator diff --git a/internal/storage/driver/store_generated_test.go b/internal/storage/driver/store_generated_test.go index 2a76e6fd1..3f9b72e3d 100644 --- a/internal/storage/driver/store_generated_test.go +++ b/internal/storage/driver/store_generated_test.go @@ -41,6 +41,21 @@ func (m *MockSystemStore) EXPECT() *MockSystemStoreMockRecorder { return m.recorder } +// CountLedgersInBucket mocks base method. 
+func (m *MockSystemStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountLedgersInBucket", ctx, bucket) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountLedgersInBucket indicates an expected call of CountLedgersInBucket. +func (mr *MockSystemStoreMockRecorder) CountLedgersInBucket(ctx, bucket any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountLedgersInBucket", reflect.TypeOf((*MockSystemStore)(nil).CountLedgersInBucket), ctx, bucket) +} + // CreateLedger mocks base method. func (m *MockSystemStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { m.ctrl.T.Helper() diff --git a/internal/storage/ledger/factory.go b/internal/storage/ledger/factory.go index 806def387..9ae8b01e6 100644 --- a/internal/storage/ledger/factory.go +++ b/internal/storage/ledger/factory.go @@ -1,6 +1,9 @@ package ledger import ( + "sync" + "sync/atomic" + "github.com/uptrace/bun" ledger "github.com/formancehq/ledger/internal" @@ -14,15 +17,30 @@ type Factory interface { type DefaultFactory struct { db *bun.DB options []Option + + mu sync.Mutex + bucketFlags map[string]*atomic.Bool } func NewFactory(db *bun.DB, options ...Option) *DefaultFactory { return &DefaultFactory{ - db: db, - options: options, + db: db, + options: options, + bucketFlags: make(map[string]*atomic.Bool), } } func (d *DefaultFactory) Create(b bucket.Bucket, l ledger.Ledger) *Store { - return New(d.db, b, l, d.options...) + d.mu.Lock() + flag, ok := d.bucketFlags[l.Bucket] + if !ok { + flag = &atomic.Bool{} + d.bucketFlags[l.Bucket] = flag + } + d.mu.Unlock() + + store := New(d.db, b, l, d.options...) 
+ store.aloneInBucket = flag + + return store } diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index faf6b1a68..a6b32e9f2 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "sync/atomic" "github.com/uptrace/bun" "go.opentelemetry.io/otel/metric" @@ -27,6 +28,12 @@ type Store struct { bucket bucket.Bucket ledger ledger.Ledger + // aloneInBucket is a shared optimization hint (per bucket) indicating whether + // this ledger is the only one in its bucket. The pointer is shared across all + // stores in the same bucket via the Factory, so updating it from any store + // (e.g. when a new ledger is created) immediately affects all stores. + aloneInBucket *atomic.Bool + tracer trace.Tracer meter metric.Meter checkBucketSchemaHistogram metric.Int64Histogram @@ -187,26 +194,23 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err } // newScopedSelect creates a new select query scoped to the current ledger. -// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan. -// To avoid that, we use a WHERE OR to separate the two cases: -// 1. Check if the ledger is the only one in the bucket -// 2. Otherwise, filter by ledger name +// When the ledger is alone in its bucket, we skip the WHERE clause to avoid +// a degraded seq scan plan (selectivity ~100%). Otherwise, we filter by ledger +// name to use the composite index (ledger, id) efficiently. +// +// This relies on aloneInBucket being up to date (shared across the bucket). func (store *Store) newScopedSelect() *bun.SelectQuery { q := store.db.NewSelect() - checkLedgerAlone := store.db.NewSelect(). - TableExpr("_system.ledgers"). - ColumnExpr("count = 1"). - Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket", - store.db.NewSelect(). - TableExpr("_system.ledgers"). - ColumnExpr("bucket"). - ColumnExpr("COUNT(*) AS count"). 
- Group("bucket"), - ). - Where("_system.ledgers.name = ?", store.ledger.Name) - - return q. - Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name) + if store.aloneInBucket == nil || !store.aloneInBucket.Load() { + q = q.Where("ledger = ?", store.ledger.Name) + } + return q +} + +func (store *Store) SetAloneInBucket(alone bool) { + if store.aloneInBucket != nil { + store.aloneInBucket.Store(alone) + } } func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store { diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index f24ab545f..b752c7657 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -28,6 +28,7 @@ type Store interface { Ledgers() common.PaginatedResource[ledger.Ledger, ListLedgersQueryPayload] GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucket string) (int, error) DeleteBucket(ctx context.Context, bucket string) error RestoreBucket(ctx context.Context, bucket string) error GetDeletedBucketsOlderThan(ctx context.Context, olderThan time.Time) ([]string, error) @@ -69,6 +70,17 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error) return buckets, nil } +func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) { + count, err := d.db.NewSelect(). + Model(&ledger.Ledger{}). + Where("bucket = ?", bucket). + Count(ctx) + if err != nil { + return 0, postgres.ResolveError(err) + } + return count, nil +} + func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { if l.Metadata == nil {