Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
}
}

count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket)
if err != nil {
return fmt.Errorf("counting ledgers in bucket: %w", err)
}

ret = d.ledgerStoreFactory.Create(b, *l)
ret.SetAloneInBucket(count == 1)
Comment on lines +78 to +84
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

TOCTOU race: concurrent CreateLedger calls for the same bucket can leave aloneInBucket = true with 2 ledgers.

Under PostgreSQL READ COMMITTED (the default with nil tx options on Line 44), each transaction's COUNT only sees rows committed before that statement began plus its own inserts. When two CreateLedger calls for the same bucket overlap, both can observe count == 1 (each seeing only its own uncommitted row), leading both to call SetAloneInBucket(true). After both commit, the bucket has 2 ledgers but the shared flag is true, causing newScopedSelect to omit the WHERE ledger = ? predicate and potentially leaking cross-ledger data.

This is exactly the class of bug described in the PR's root-cause analysis. The flag self-corrects on the next OpenLedger call, but the window could be significant if stores are cached.

Simplest safe fix: always set aloneInBucket = false inside CreateLedger (since creating a ledger can only increase the count), and rely on OpenLedger to promote the flag to true when appropriate.

Suggested fix
-		count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket)
-		if err != nil {
-			return fmt.Errorf("counting ledgers in bucket: %w", err)
-		}
-
 		ret = d.ledgerStoreFactory.Create(b, *l)
-		ret.SetAloneInBucket(count == 1)
+		// Always conservative: a concurrent create may not yet be visible.
+		// OpenLedger will promote to true when the count is actually 1.
+		ret.SetAloneInBucket(false)

 		return nil

Alternatively, move the count + set after the transaction commits so the read sees all committed rows:

Alternative fix — count after commit
 	if err != nil {
 		return nil, postgres.ResolveError(err)
 	}

+	count, err := d.systemStoreFactory.Create(d.db).CountLedgersInBucket(ctx, l.Bucket)
+	if err != nil {
+		return nil, fmt.Errorf("counting ledgers in bucket: %w", err)
+	}
+	ret.SetAloneInBucket(count == 1)
+
 	return ret, nil
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket)
if err != nil {
return fmt.Errorf("counting ledgers in bucket: %w", err)
}
ret = d.ledgerStoreFactory.Create(b, *l)
ret.SetAloneInBucket(count == 1)
ret = d.ledgerStoreFactory.Create(b, *l)
// Always conservative: a concurrent create may not yet be visible.
// OpenLedger will promote to true when the count is actually 1.
ret.SetAloneInBucket(false)
return nil
🤖 Prompt for AI Agents
In `@internal/storage/driver/driver.go` around lines 78-84, the CreateLedger
path currently uses systemStore.CountLedgersInBucket and may incorrectly set
ret.SetAloneInBucket(count == 1) under READ COMMITTED; change CreateLedger to
always call ret.SetAloneInBucket(false) (false is the conservative value: it
only keeps the `ledger = ?` filter in place, whereas a stale true drops it) and
remove the COUNT-based logic here; rely on OpenLedger to compute and promote
aloneInBucket=true when it can safely observe committed state, or alternatively
move the COUNT call to after the transaction commit if you prefer the other
approach.


return nil
})
Expand All @@ -88,12 +94,22 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto

func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
// todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory
ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name)
// NOTE: the aloneInBucket flag is now shared per bucket via the Factory,
// so all stores in the same bucket see updates immediately.
systemStore := d.systemStoreFactory.Create(d.db)

ret, err := systemStore.GetLedger(ctx, name)
if err != nil {
return nil, nil, err
}

count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket)
if err != nil {
return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err)
}

store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
store.SetAloneInBucket(count == 1)

return store, ret, err
}
Expand Down
1 change: 1 addition & 0 deletions internal/storage/driver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type SystemStore interface {
//ListLedgers(ctx context.Context, q systemstore.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error)
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
GetDistinctBuckets(ctx context.Context) ([]string, error)
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)

Migrate(ctx context.Context, options ...migrations.Option) error
GetMigrator(options ...migrations.Option) *migrations.Migrator
Expand Down
15 changes: 15 additions & 0 deletions internal/storage/driver/store_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 21 additions & 3 deletions internal/storage/ledger/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package ledger

import (
"sync"
"sync/atomic"

"github.com/uptrace/bun"

ledger "github.com/formancehq/ledger/internal"
Expand All @@ -14,15 +17,30 @@ type Factory interface {
// DefaultFactory builds ledger Stores backed by a single bun.DB and keeps one
// shared "alone in bucket" flag per bucket, which it attaches to every Store
// it creates.
type DefaultFactory struct {
// db is the database handle passed to every Store created by this factory.
db *bun.DB
// options are forwarded as-is to New for each created Store.
options []Option

// mu guards bucketFlags; Create takes it around the lookup/insert.
mu sync.Mutex
// bucketFlags maps a bucket name to the *atomic.Bool handed to every Store
// created for that bucket, so all those Stores observe the same value.
bucketFlags map[string]*atomic.Bool
}

func NewFactory(db *bun.DB, options ...Option) *DefaultFactory {
return &DefaultFactory{
db: db,
options: options,
db: db,
options: options,
bucketFlags: make(map[string]*atomic.Bool),
}
}

func (d *DefaultFactory) Create(b bucket.Bucket, l ledger.Ledger) *Store {
return New(d.db, b, l, d.options...)
d.mu.Lock()
flag, ok := d.bucketFlags[l.Bucket]
if !ok {
flag = &atomic.Bool{}
d.bucketFlags[l.Bucket] = flag
}
d.mu.Unlock()

store := New(d.db, b, l, d.options...)
store.aloneInBucket = flag

return store
}
40 changes: 22 additions & 18 deletions internal/storage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"sync/atomic"

"github.com/uptrace/bun"
"go.opentelemetry.io/otel/metric"
Expand All @@ -27,6 +28,12 @@ type Store struct {
bucket bucket.Bucket
ledger ledger.Ledger

// aloneInBucket is a shared optimization hint (per bucket) indicating whether
// this ledger is the only one in its bucket. The pointer is shared across all
// stores in the same bucket via the Factory, so updating it from any store
// (e.g. when a new ledger is created) immediately affects all stores.
aloneInBucket *atomic.Bool

tracer trace.Tracer
meter metric.Meter
checkBucketSchemaHistogram metric.Int64Histogram
Expand Down Expand Up @@ -187,26 +194,23 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err
}

// newScopedSelect creates a new select query scoped to the current ledger.
// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan.
// To avoid that, we use a WHERE OR to separate the two cases:
// 1. Check if the ledger is the only one in the bucket
// 2. Otherwise, filter by ledger name
// When the ledger is alone in its bucket, we skip the WHERE clause to avoid
// a degraded seq scan plan (selectivity ~100%). Otherwise, we filter by ledger
// name to use the composite index (ledger, id) efficiently.
//
// This relies on aloneInBucket being up to date (shared across the bucket).
func (store *Store) newScopedSelect() *bun.SelectQuery {
q := store.db.NewSelect()
checkLedgerAlone := store.db.NewSelect().
TableExpr("_system.ledgers").
ColumnExpr("count = 1").
Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket",
store.db.NewSelect().
TableExpr("_system.ledgers").
ColumnExpr("bucket").
ColumnExpr("COUNT(*) AS count").
Group("bucket"),
).
Where("_system.ledgers.name = ?", store.ledger.Name)

return q.
Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name)
if store.aloneInBucket == nil || !store.aloneInBucket.Load() {
q = q.Where("ledger = ?", store.ledger.Name)
}
return q
}

// SetAloneInBucket records whether this store's ledger is the only one in its
// bucket. The value is written to the store's aloneInBucket flag; when the
// store has no flag attached (nil pointer) the call is a no-op.
func (store *Store) SetAloneInBucket(alone bool) {
	flag := store.aloneInBucket
	if flag == nil {
		// No flag attached to this store: nothing to update.
		return
	}

	flag.Store(alone)
}

func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {
Expand Down
12 changes: 12 additions & 0 deletions internal/storage/system/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Store interface {
Ledgers() common.PaginatedResource[ledger.Ledger, ListLedgersQueryPayload]
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
GetDistinctBuckets(ctx context.Context) ([]string, error)
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)
DeleteBucket(ctx context.Context, bucket string) error
RestoreBucket(ctx context.Context, bucket string) error
GetDeletedBucketsOlderThan(ctx context.Context, olderThan time.Time) ([]string, error)
Expand Down Expand Up @@ -69,6 +70,17 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error)
return buckets, nil
}

// CountLedgersInBucket returns the number of ledger rows whose bucket column
// equals the given bucket name. Database errors are passed through
// postgres.ResolveError before being returned.
func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) {
	query := d.db.NewSelect().
		Model(&ledger.Ledger{}).
		Where("bucket = ?", bucket)

	total, err := query.Count(ctx)
	if err != nil {
		return 0, postgres.ResolveError(err)
	}

	return total, nil
}

func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error {

if l.Metadata == nil {
Expand Down
Loading