diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 1210c1053..92233139c 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -75,6 +75,12 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto ret = d.ledgerStoreFactory.Create(b, *l) + count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket) + if err != nil { + return fmt.Errorf("counting ledgers in bucket: %w", err) + } + ret.SetAloneInBucket(count == 1) + return nil }) if err != nil { @@ -86,13 +92,24 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) { // todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory - ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name) + // NOTE: if the store is ever cached, the isAloneInBucket flag must be + // refreshed/invalidated when bucket membership changes, otherwise + // cross-ledger reads may occur (missing WHERE ledger = ?). + systemStore := d.systemStoreFactory.Create(d.db) + + ret, err := systemStore.GetLedger(ctx, name) if err != nil { return nil, nil, err } store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret) + count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket) + if err != nil { + return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err) + } + store.SetAloneInBucket(count == 1) + return store, ret, err } diff --git a/internal/storage/driver/store.go b/internal/storage/driver/store.go index d7966f37d..9da6c08f4 100644 --- a/internal/storage/driver/store.go +++ b/internal/storage/driver/store.go @@ -16,6 +16,7 @@ type SystemStore interface { //ListLedgers(ctx context.Context, q systemstore.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucket string) (int, error) Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator diff --git a/internal/storage/driver/store_generated_test.go b/internal/storage/driver/store_generated_test.go index 2a76e6fd1..3f9b72e3d 100644 --- a/internal/storage/driver/store_generated_test.go +++ b/internal/storage/driver/store_generated_test.go @@ -41,6 +41,21 @@ func (m *MockSystemStore) EXPECT() *MockSystemStoreMockRecorder { return m.recorder } +// CountLedgersInBucket mocks base method. +func (m *MockSystemStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CountLedgersInBucket", ctx, bucket) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountLedgersInBucket indicates an expected call of CountLedgersInBucket. +func (mr *MockSystemStoreMockRecorder) CountLedgersInBucket(ctx, bucket any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountLedgersInBucket", reflect.TypeOf((*MockSystemStore)(nil).CountLedgersInBucket), ctx, bucket) +} + // CreateLedger mocks base method. func (m *MockSystemStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error { m.ctrl.T.Helper() diff --git a/internal/storage/ledger/store.go b/internal/storage/ledger/store.go index 994be1181..c9b2ff814 100644 --- a/internal/storage/ledger/store.go +++ b/internal/storage/ledger/store.go @@ -24,6 +24,13 @@ type Store struct { bucket bucket.Bucket ledger ledger.Ledger + // isAloneInBucket is a point-in-time optimization hint set when the store + // is created or opened. It indicates whether the ledger is the only one in + // its bucket, allowing the scoped select to skip the WHERE ledger = ? clause. + // Must be refreshed if bucket membership changes. Dangerous if the store is + // cached without proper invalidation. + isAloneInBucket bool + tracer trace.Tracer meter metric.Meter checkBucketSchemaHistogram metric.Int64Histogram @@ -165,25 +172,17 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err // newScopedSelect creates a new select query scoped to the current ledger. // notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan. -// To avoid that, we use a WHERE OR to separate the two cases: -// 1. Check if the ledger is the only one in the bucket -// 2. Otherwise, filter by ledger name +// When the ledger is the only one in its bucket, we skip the clause entirely. func (store *Store) newScopedSelect() *bun.SelectQuery { q := store.db.NewSelect() - checkLedgerAlone := store.db.NewSelect(). - TableExpr("_system.ledgers"). - ColumnExpr("count = 1"). - Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket", - store.db.NewSelect(). - TableExpr("_system.ledgers"). - ColumnExpr("bucket"). - ColumnExpr("COUNT(*) AS count"). - Group("bucket"), - ). - Where("_system.ledgers.name = ?", store.ledger.Name) - - return q. - Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name) + if !store.isAloneInBucket { + q = q.Where("ledger = ?", store.ledger.Name) + } + return q +} + +func (store *Store) SetAloneInBucket(alone bool) { + store.isAloneInBucket = alone } func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store { diff --git a/internal/storage/system/store.go b/internal/storage/system/store.go index bba206dfd..21dfded9e 100644 --- a/internal/storage/system/store.go +++ b/internal/storage/system/store.go @@ -23,6 +23,7 @@ type Store interface { Ledgers() common.PaginatedResource[ledger.Ledger, ListLedgersQueryPayload] GetLedger(ctx context.Context, name string) (*ledger.Ledger, error) GetDistinctBuckets(ctx context.Context) ([]string, error) + CountLedgersInBucket(ctx context.Context, bucket string) (int, error) Migrate(ctx context.Context, options ...migrations.Option) error GetMigrator(options ...migrations.Option) *migrations.Migrator @@ -117,6 +118,17 @@ func (d *DefaultStore) GetLedger(ctx context.Context, name string) (*ledger.Ledg return ret, nil } +func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) { + count, err := d.db.NewSelect(). + TableExpr("_system.ledgers"). + Where("bucket = ?", bucket). + Count(ctx) + if err != nil { + return 0, postgres.ResolveError(err) + } + return count, nil +} + func (d *DefaultStore) Migrate(ctx context.Context, options ...migrations.Option) error { _, err := tracing.Trace(ctx, d.tracer, "MigrateSystemStore", func(ctx context.Context) (any, error) { return nil, d.GetMigrator(options...).Up(ctx)