Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
}
}

count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket)
if err != nil {
return fmt.Errorf("counting ledgers in bucket: %w", err)
}

ret = d.ledgerStoreFactory.Create(b, *l)
ret.SetAloneInBucket(count == 1)

return nil
})
Expand All @@ -86,12 +92,22 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto

func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
// todo: keep the ledger cached somewhere (maybe in the factory) to avoid reading it from the system store on every request
ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name)
// NOTE: the aloneInBucket flag is now shared per bucket via the Factory,
// so all stores in the same bucket see updates immediately.
systemStore := d.systemStoreFactory.Create(d.db)

ret, err := systemStore.GetLedger(ctx, name)
if err != nil {
return nil, nil, err
}

count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket)
if err != nil {
return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err)
}

store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
store.SetAloneInBucket(count == 1)

return store, ret, err
Comment on lines 93 to 112
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

TOCTOU race: OpenLedger can overwrite a concurrent CreateLedger's flag update.

OpenLedger reads the count outside a transaction and then calls SetAloneInBucket(count == 1). If a concurrent CreateLedger commits between the count read and the Store call, the sequence can be:

  1. OpenLedger reads count=1 (2nd ledger not yet committed)
  2. CreateLedger commits → sets shared flag to false
  3. OpenLedger calls SetAloneInBucket(true) → overwrites back to true

All stores in that bucket now incorrectly skip WHERE ledger = ?, potentially returning data from the wrong ledger.

Consider making the flag transition one-directional: SetAloneInBucket should only allow true → false, never false → true, or use CompareAndSwap. The false → true transition should only happen in a controlled path (e.g., ledger deletion, if applicable), not in OpenLedger.

Proposed fix in store.go
 func (store *Store) SetAloneInBucket(alone bool) {
 	if store.aloneInBucket != nil {
-		store.aloneInBucket.Store(alone)
+		if alone {
+			// Only upgrade from false→true is unsafe from OpenLedger;
+			// use CAS to avoid overwriting a concurrent false.
+			store.aloneInBucket.CompareAndSwap(false, true)
+		} else {
+			store.aloneInBucket.Store(false)
+		}
 	}
 }

Actually, even the CAS above doesn't resolve this: CompareAndSwap(false, true) succeeds precisely when the flag is currently false, so it still flips a false written by a concurrent CreateLedger back to true. A safer approach: OpenLedger should never set the flag to true; it should only set it to false when count > 1. Leave the true initialization to CreateLedger (which runs transactionally) or to the factory's lazy init.

Alternative: only propagate false from OpenLedger
 	store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
-	store.SetAloneInBucket(count == 1)
+	if count > 1 {
+		store.SetAloneInBucket(false)
+	}
 
 	return store, ret, err
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
// todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory
ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name)
// NOTE: the aloneInBucket flag is now shared per bucket via the Factory,
// so all stores in the same bucket see updates immediately.
systemStore := d.systemStoreFactory.Create(d.db)
ret, err := systemStore.GetLedger(ctx, name)
if err != nil {
return nil, nil, err
}
count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket)
if err != nil {
return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err)
}
store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
store.SetAloneInBucket(count == 1)
return store, ret, err
// OpenLedger looks up the ledger called name in the system store and returns a
// ledger store bound to that ledger's bucket, together with the ledger itself.
//
// An error from GetLedger is returned unchanged; an error from
// CountLedgersInBucket is wrapped with "counting ledgers in bucket".
//
// The bucket's alone-in-bucket flag is only ever cleared here: when more than
// one ledger is counted in the bucket, SetAloneInBucket(false) is called. This
// function never sets the flag to true.
func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
	// todo: keep the ledger cached somewhere (maybe in the factory) to avoid
	// reading it from the system store on every request
	sys := d.systemStoreFactory.Create(d.db)

	l, err := sys.GetLedger(ctx, name)
	if err != nil {
		return nil, nil, err
	}

	n, err := sys.CountLedgersInBucket(ctx, l.Bucket)
	if err != nil {
		return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err)
	}

	// NOTE: the aloneInBucket flag is shared per bucket via the Factory, so a
	// write through this store is visible to every store of the same bucket.
	store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(l.Bucket), *l)
	if n > 1 {
		store.SetAloneInBucket(false)
	}

	// Both lookups succeeded, so the error result is nil at this point.
	return store, l, nil
}
🤖 Prompt for AI Agents
In `@internal/storage/driver/driver.go` around lines 93 - 112, OpenLedger
currently reads the ledger count then calls store.SetAloneInBucket(count == 1),
which can race with concurrent CreateLedger and mistakenly set the shared alone
flag back to true; change OpenLedger to never set the flag to true — only clear
it when you observe count > 1 (i.e., call SetAloneInBucket(false) when count >
1) or rely on CreateLedger/factory to initialize true; alternatively make
SetAloneInBucket one-directional (only allow true→false, never false→true) so
OpenLedger cannot revert a concurrent false to true; update references in
OpenLedger and ensure CreateLedger remains the only path that can set true.

}
Expand Down
1 change: 1 addition & 0 deletions internal/storage/driver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type SystemStore interface {
//ListLedgers(ctx context.Context, q systemstore.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error)
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
GetDistinctBuckets(ctx context.Context) ([]string, error)
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)

Migrate(ctx context.Context, options ...migrations.Option) error
GetMigrator(options ...migrations.Option) *migrations.Migrator
Expand Down
15 changes: 15 additions & 0 deletions internal/storage/driver/store_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 21 additions & 3 deletions internal/storage/ledger/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package ledger

import (
"sync"
"sync/atomic"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/uptrace/bun"
Expand All @@ -13,15 +16,30 @@ type Factory interface {
// DefaultFactory creates ledger Stores that share one database handle and one
// set of options, and hands every Store of a given bucket the same
// alone-in-bucket flag.
type DefaultFactory struct {
// db is passed to New for every Store created by this factory.
db *bun.DB
// options are forwarded to New for every Store created by this factory.
options []Option

// mu is held by Create while it reads or inserts into bucketFlags.
mu sync.Mutex
// bucketFlags maps a bucket name to the flag shared by all Stores created
// for that bucket; entries are added by Create on first use of a bucket.
bucketFlags map[string]*atomic.Bool
}

// NewFactory returns a DefaultFactory that creates Stores on db using the
// given options.
//
// The per-bucket flag map is allocated here so that Create can insert into it;
// writing to a nil map would panic.
func NewFactory(db *bun.DB, options ...Option) *DefaultFactory {
	return &DefaultFactory{
		db:          db,
		options:     options,
		bucketFlags: make(map[string]*atomic.Bool),
	}
}

// Create builds a Store for ledger l in bucket b and attaches the
// alone-in-bucket flag registered for l.Bucket, so that every Store created
// for the same bucket name points at the same *atomic.Bool.
//
// A flag is registered on the first call for a bucket name; a freshly
// registered flag holds the atomic.Bool zero value (false).
func (d *DefaultFactory) Create(b bucket.Bucket, l ledger.Ledger) *Store {
	// The map is read and possibly written here, so both steps happen under
	// the mutex; the lock is released before the Store is constructed.
	d.mu.Lock()
	flag, ok := d.bucketFlags[l.Bucket]
	if !ok {
		flag = &atomic.Bool{}
		d.bucketFlags[l.Bucket] = flag
	}
	d.mu.Unlock()

	store := New(d.db, b, l, d.options...)
	store.aloneInBucket = flag

	return store
}
41 changes: 23 additions & 18 deletions internal/storage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"database/sql"
"fmt"
"sync/atomic"

"github.com/formancehq/go-libs/v3/bun/bunpaginate"
"github.com/formancehq/go-libs/v3/migrations"
"github.com/formancehq/go-libs/v3/platform/postgres"
Expand All @@ -24,6 +26,12 @@ type Store struct {
bucket bucket.Bucket
ledger ledger.Ledger

// aloneInBucket is a shared optimization hint (per bucket) indicating whether
// this ledger is the only one in its bucket. The pointer is shared across all
// stores in the same bucket via the Factory, so updating it from any store
// (e.g. when a new ledger is created) immediately affects all stores.
aloneInBucket *atomic.Bool

tracer trace.Tracer
meter metric.Meter
checkBucketSchemaHistogram metric.Int64Histogram
Expand Down Expand Up @@ -164,26 +172,23 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err
}

// newScopedSelect creates a new select query scoped to the current ledger.
// When the ledger is alone in its bucket, we skip the WHERE clause to avoid
// a degraded seq scan plan (selectivity ~100%). Otherwise, we filter by ledger
// name to use the composite index (ledger, id) efficiently.
//
// This relies on aloneInBucket being up to date (shared across the bucket).
// A Store without a flag (nil aloneInBucket) is treated as not alone, so the
// ledger filter is always applied in that case.
func (store *Store) newScopedSelect() *bun.SelectQuery {
	q := store.db.NewSelect()
	if store.aloneInBucket == nil || !store.aloneInBucket.Load() {
		q = q.Where("ledger = ?", store.ledger.Name)
	}
	return q
}

// SetAloneInBucket stores alone into the Store's aloneInBucket flag.
// When the Store has no flag attached (nil pointer) the call does nothing.
func (store *Store) SetAloneInBucket(alone bool) {
	flag := store.aloneInBucket
	if flag == nil {
		return
	}
	flag.Store(alone)
}

func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {
Expand Down
12 changes: 12 additions & 0 deletions internal/storage/system/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Store interface {
Ledgers() common.PaginatedResource[ledger.Ledger, ListLedgersQueryPayload]
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
GetDistinctBuckets(ctx context.Context) ([]string, error)
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)

Migrate(ctx context.Context, options ...migrations.Option) error
GetMigrator(options ...migrations.Option) *migrations.Migrator
Expand Down Expand Up @@ -60,6 +61,17 @@ func (d *DefaultStore) GetDistinctBuckets(ctx context.Context) ([]string, error)
return buckets, nil
}

// CountLedgersInBucket returns the number of ledger rows whose bucket column
// equals bucket. On failure it returns 0 and the query error passed through
// postgres.ResolveError.
func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) {
	query := d.db.NewSelect().
		Model(&ledger.Ledger{}).
		Where("bucket = ?", bucket)

	n, err := query.Count(ctx)
	if err != nil {
		return 0, postgres.ResolveError(err)
	}

	return n, nil
}

func (d *DefaultStore) CreateLedger(ctx context.Context, l *ledger.Ledger) error {

if l.Metadata == nil {
Expand Down
Loading