Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto

ret = d.ledgerStoreFactory.Create(b, *l)

count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket)
if err != nil {
return fmt.Errorf("counting ledgers in bucket: %w", err)
}
ret.SetAloneInBucket(count == 1)

return nil
})
if err != nil {
Expand All @@ -86,13 +92,24 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto

func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
// todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory
ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name)
// NOTE: if the store is ever cached, the isAloneInBucket flag must be
// refreshed/invalidated when bucket membership changes, otherwise
// cross-ledger reads may occur (missing WHERE ledger = ?).
systemStore := d.systemStoreFactory.Create(d.db)

ret, err := systemStore.GetLedger(ctx, name)
if err != nil {
return nil, nil, err
}

store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)

count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket)
if err != nil {
return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err)
}
store.SetAloneInBucket(count == 1)

return store, ret, err
Comment on lines 93 to 113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Staleness risk: isAloneInBucket is never refreshed for an already-opened store.

The comment on Lines 95-97 correctly warns about cache invalidation, but the risk goes beyond caching. If the store returned by OpenLedger is held open (even transiently) while a concurrent CreateLedger adds a second ledger to the same bucket, the first store will silently skip the WHERE ledger = ? filter, leaking rows from the new ledger.

For the CreateLedger path this is safe (transactional), but OpenLedger is outside any transaction and the flag is set-once. Consider:

  1. Wrapping GetLedger + CountLedgersInBucket in a read-only transaction (or snapshot) so the count is at least consistent with the ledger lookup.
  2. Documenting that callers must not reuse the returned *Store across requests when bucket membership may change, or adding a per-request refresh.

This is flagged for awareness — the window is narrow but the impact (cross-ledger data leakage) is high if it occurs.

🤖 Prompt for AI Agents
In `@internal/storage/driver/driver.go` around lines 93 - 113, OpenLedger
currently calls systemStore.GetLedger and systemStore.CountLedgersInBucket
separately which can race with concurrent CreateLedger and leave an already-open
Store with a stale isAloneInBucket flag; fix by performing the ledger lookup and
bucket count in a single read-only transaction/snapshot (use whatever
transaction/snapshot API your DB/systemStore exposes) so the
CountLedgersInBucket result is consistent with GetLedger, then call
store.SetAloneInBucket based on that transactional count; alternatively, if a
transactional snapshot isn't available, document that callers must not reuse the
returned *ledgerstore.Store across requests and add a per-request refresh path
that re-evaluates CountLedgersInBucket before critical reads (reference:
OpenLedger, systemStore.GetLedger, systemStore.CountLedgersInBucket,
store.SetAloneInBucket).

}

Expand Down
1 change: 1 addition & 0 deletions internal/storage/driver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type SystemStore interface {
//ListLedgers(ctx context.Context, q systemstore.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error)
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
GetDistinctBuckets(ctx context.Context) ([]string, error)
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)

Migrate(ctx context.Context, options ...migrations.Option) error
GetMigrator(options ...migrations.Option) *migrations.Migrator
Expand Down
15 changes: 15 additions & 0 deletions internal/storage/driver/store_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 16 additions & 17 deletions internal/storage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type Store struct {
bucket bucket.Bucket
ledger ledger.Ledger

// isAloneInBucket is a point-in-time optimization hint set when the store
// is created or opened. It indicates whether the ledger is the only one in
// its bucket, allowing the scoped select to skip the WHERE ledger = ? clause.
// Must be refreshed if bucket membership changes. Dangerous if the store is
// cached without proper invalidation.
isAloneInBucket bool

tracer trace.Tracer
meter metric.Meter
checkBucketSchemaHistogram metric.Int64Histogram
Expand Down Expand Up @@ -165,25 +172,17 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err

// newScopedSelect creates a new select query scoped to the current ledger.
// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan.
// To avoid that, we use a WHERE OR to separate the two cases:
// 1. Check if the ledger is the only one in the bucket
// 2. Otherwise, filter by ledger name
// When the ledger is the only one in its bucket, we skip the clause entirely.
func (store *Store) newScopedSelect() *bun.SelectQuery {
q := store.db.NewSelect()
checkLedgerAlone := store.db.NewSelect().
TableExpr("_system.ledgers").
ColumnExpr("count = 1").
Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket",
store.db.NewSelect().
TableExpr("_system.ledgers").
ColumnExpr("bucket").
ColumnExpr("COUNT(*) AS count").
Group("bucket"),
).
Where("_system.ledgers.name = ?", store.ledger.Name)

return q.
Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name)
if !store.isAloneInBucket {
q = q.Where("ledger = ?", store.ledger.Name)
}
return q
}

func (store *Store) SetAloneInBucket(alone bool) {
store.isAloneInBucket = alone
}

func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {
Expand Down
12 changes: 12 additions & 0 deletions internal/storage/system/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Store interface {
Ledgers() common.PaginatedResource[ledger.Ledger, ListLedgersQueryPayload]
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
GetDistinctBuckets(ctx context.Context) ([]string, error)
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)

Migrate(ctx context.Context, options ...migrations.Option) error
GetMigrator(options ...migrations.Option) *migrations.Migrator
Expand Down Expand Up @@ -117,6 +118,17 @@ func (d *DefaultStore) GetLedger(ctx context.Context, name string) (*ledger.Ledg
return ret, nil
}

func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) {
count, err := d.db.NewSelect().
TableExpr("_system.ledgers").
Where("bucket = ?", bucket).
Count(ctx)
if err != nil {
return 0, postgres.ResolveError(err)
}
return count, nil
}

func (d *DefaultStore) Migrate(ctx context.Context, options ...migrations.Option) error {
_, err := tracing.Trace(ctx, d.tracer, "MigrateSystemStore", func(ctx context.Context) (any, error) {
return nil, d.GetMigrator(options...).Up(ctx)
Expand Down
Loading