diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 1210c1053..5e93c2102 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -73,7 +73,13 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto } } + count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket) + if err != nil { + return fmt.Errorf("counting ledgers in bucket: %w", err) + } + ret = d.ledgerStoreFactory.Create(b, *l) + ret.SetAloneInBucket(count == 1) return nil }) @@ -86,12 +92,22 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) { // todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory - ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name) + // NOTE: the aloneInBucket flag is now shared per bucket via the Factory, + // so all stores in the same bucket see updates immediately. + systemStore := d.systemStoreFactory.Create(d.db) + + ret, err := systemStore.GetLedger(ctx, name) if err != nil { return nil, nil, err } + count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket) + if err != nil { + return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err) + } + store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret) + store.SetAloneInBucket(count == 1) return store, ret, err } diff --git a/internal/storage/driver/store.go b/internal/storage/driver/store.go index d7966f37d..9da6c08f4 100644 --- a/internal/storage/driver/store.go +++ b/internal/storage/driver/store.go @@ -16,6 +16,7 @@ type SystemStore interface { //ListLedgers(ctx context.Context, q systemstore.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucket string) (int, error) Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator diff --git a/internal/storage/driver/store_generated_test.go b/internal/storage/driver/store_generated_test.go index 2a76e6fd1..3f9b72e3d 100644 --- a/internal/storage/driver/store_generated_test.go +++ b/internal/storage/driver/store_generated_test.go @@ -41,6 +41,21 @@ func (m *MockSystemStore) EXPECT() *MockSystemStoreMockRecorder { return m.recorder } +// CountLedgersInBucket mocks base method. +func (m *MockSystemStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountLedgersInBucket", ctx, bucket) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountLedgersInBucket indicates an expected call of CountLedgersInBucket. +func (mr *MockSystemStoreMockRecorder) CountLedgersInBucket(ctx, bucket any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountLedgersInBucket", reflect.TypeOf((*MockSystemStore)(nil).CountLedgersInBucket), ctx, bucket) +} + // CreateLedger mocks base method. func (m *MockSystemStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { m.ctrl.T.Helper() diff --git a/internal/storage/ledger/factory.go b/internal/storage/ledger/factory.go index d96c4e17f..e8dbe5ba2 100644 --- a/internal/storage/ledger/factory.go +++ b/internal/storage/ledger/factory.go @@ -1,6 +1,9 @@ package ledger import ( + "sync" + "sync/atomic" + ledger "github.com/formancehq/ledger/internal" "github.com/formancehq/ledger/internal/storage/bucket" "github.com/uptrace/bun" @@ -13,15 +16,30 @@ type Factory interface { type DefaultFactory struct { db *bun.DB options []Option + + mu sync.Mutex + bucketFlags map[string]*atomic.Bool } func NewFactory(db *bun.DB, options ...Option) *DefaultFactory { return &DefaultFactory{ - db: db, - options: options, + db: db, + options: options, + bucketFlags: make(map[string]*atomic.Bool), } } func (d *DefaultFactory) Create(b bucket.Bucket, l ledger.Ledger) *Store { - return New(d.db, b, l, d.options...) + d.mu.Lock() + flag, ok := d.bucketFlags[l.Bucket] + if !ok { + flag = &atomic.Bool{} + d.bucketFlags[l.Bucket] = flag + } + d.mu.Unlock() + + store := New(d.db, b, l, d.options...) + store.aloneInBucket = flag + + return store } diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index 994be1181..d831213f0 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -4,6 +4,8 @@ import ( "context" "database/sql" "fmt" + "sync/atomic" + "github.com/formancehq/go-libs/v3/bun/bunpaginate" "github.com/formancehq/go-libs/v3/migrations" "github.com/formancehq/go-libs/v3/platform/postgres" @@ -24,6 +26,12 @@ type Store struct { bucket bucket.Bucket ledger ledger.Ledger + // aloneInBucket is a shared optimization hint (per bucket) indicating whether + // this ledger is the only one in its bucket. The pointer is shared across all + // stores in the same bucket via the Factory, so updating it from any store + // (e.g. when a new ledger is created) immediately affects all stores. + aloneInBucket *atomic.Bool + tracer trace.Tracer meter metric.Meter checkBucketSchemaHistogram metric.Int64Histogram @@ -164,26 +172,23 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err } // newScopedSelect creates a new select query scoped to the current ledger. -// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan. -// To avoid that, we use a WHERE OR to separate the two cases: -// 1. Check if the ledger is the only one in the bucket -// 2. Otherwise, filter by ledger name +// When the ledger is alone in its bucket, we skip the WHERE clause to avoid +// a degraded seq scan plan (selectivity ~100%). Otherwise, we filter by ledger +// name to use the composite index (ledger, id) efficiently. +// +// This relies on aloneInBucket being up to date (shared across the bucket). func (store *Store) newScopedSelect() *bun.SelectQuery { q := store.db.NewSelect() - checkLedgerAlone := store.db.NewSelect(). - TableExpr("_system.ledgers"). - ColumnExpr("count = 1"). - Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket", - store.db.NewSelect(). - TableExpr("_system.ledgers"). - ColumnExpr("bucket"). - ColumnExpr("COUNT(*) AS count"). - Group("bucket"), - ). - Where("_system.ledgers.name = ?", store.ledger.Name) - - return q. - Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name) + if store.aloneInBucket == nil || !store.aloneInBucket.Load() { + q = q.Where("ledger = ?", store.ledger.Name) + } + return q +} + +func (store *Store) SetAloneInBucket(alone bool) { + if store.aloneInBucket != nil { + store.aloneInBucket.Store(alone) + } } func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store { diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index bba206dfd..82ae138de 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -23,6 +23,7 @@ type Store interface { Ledgers() common.PaginatedResource[ledger.Ledger, ListLedgersQueryPayload] GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucket string) (int, error) Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator @@ -60,6 +61,17 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error) return buckets, nil } +func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) { + count, err := d.db.NewSelect(). + Model(&ledger.Ledger{}). + Where("bucket = ?", bucket). + Count(ctx) + if err != nil { + return 0, postgres.ResolveError(err) + } + return count, nil +} + func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { if l.Metadata == nil {