Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto
}
}

count, err := systemStore.CountLedgersInBucket(ctx, l.Bucket)
if err != nil {
return fmt.Errorf("counting ledgers in bucket: %w", err)
}

ret = d.ledgerStoreFactory.Create(b, *l)
ret.SetAloneInBucket(count == 1)

return nil
})
Expand All @@ -89,12 +95,22 @@ func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgersto

func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {
// todo: keep the ledger in cache somewhere to avoid read the ledger at each request, maybe in the factory
ret, err := d.systemStoreFactory.Create(d.db).GetLedger(ctx, name)
// NOTE: the aloneInBucket flag is now shared per bucket via the Factory,
// so all stores in the same bucket see updates immediately.
systemStore := d.systemStoreFactory.Create(d.db)

ret, err := systemStore.GetLedger(ctx, name)
if err != nil {
return nil, nil, err
}

count, err := systemStore.CountLedgersInBucket(ctx, ret.Bucket)
if err != nil {
return nil, nil, fmt.Errorf("counting ledgers in bucket: %w", err)
}

store := d.ledgerStoreFactory.Create(d.bucketFactory.Create(ret.Bucket), *ret)
store.SetAloneInBucket(count == 1)

return store, ret, err
}
Expand Down
15 changes: 15 additions & 0 deletions internal/storage/driver/system_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 21 additions & 3 deletions internal/storage/ledger/factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package ledger

import (
"sync"
"sync/atomic"

ledger "github.com/formancehq/ledger/internal"
"github.com/formancehq/ledger/internal/storage/bucket"
"github.com/uptrace/bun"
Expand All @@ -13,15 +16,30 @@ type Factory interface {
type DefaultFactory struct {
db *bun.DB
options []Option

mu sync.Mutex
bucketFlags map[string]*atomic.Bool
}

func NewFactory(db *bun.DB, options ...Option) *DefaultFactory {
return &DefaultFactory{
db: db,
options: options,
db: db,
options: options,
bucketFlags: make(map[string]*atomic.Bool),
}
}

func (d *DefaultFactory) Create(b bucket.Bucket, l ledger.Ledger) *Store {
return New(d.db, b, l, d.options...)
d.mu.Lock()
flag, ok := d.bucketFlags[l.Bucket]
if !ok {
flag = &atomic.Bool{}
d.bucketFlags[l.Bucket] = flag
}
d.mu.Unlock()

store := New(d.db, b, l, d.options...)
store.aloneInBucket = flag

return store
}
40 changes: 22 additions & 18 deletions internal/storage/ledger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"sync/atomic"

"github.com/formancehq/go-libs/v2/bun/bunpaginate"
"github.com/formancehq/go-libs/v2/migrations"
Expand All @@ -27,6 +28,12 @@ type Store struct {
bucket bucket.Bucket
ledger ledger.Ledger

// aloneInBucket is a shared optimization hint (per bucket) indicating whether
// this ledger is the only one in its bucket. The pointer is shared across all
// stores in the same bucket via the Factory, so updating it from any store
// (e.g. when a new ledger is created) immediately affects all stores.
aloneInBucket *atomic.Bool

tracer trace.Tracer
meter metric.Meter
checkBucketSchemaHistogram metric.Int64Histogram
Expand Down Expand Up @@ -189,26 +196,23 @@ func (store *Store) LockLedger(ctx context.Context) (*Store, bun.IDB, func() err
}

// newScopedSelect creates a new select query scoped to the current ledger.
// notes(gfyrag): The "WHERE ledger = 'XXX'" condition can cause degraded postgres plan.
// To avoid that, we use a WHERE OR to separate the two cases:
// 1. Check if the ledger is the only one in the bucket
// 2. Otherwise, filter by ledger name
// When the ledger is alone in its bucket, we skip the WHERE clause to avoid
// a degraded seq scan plan (selectivity ~100%). Otherwise, we filter by ledger
// name to use the composite index (ledger, id) efficiently.
//
// This relies on aloneInBucket being up to date (shared across the bucket).
func (store *Store) newScopedSelect() *bun.SelectQuery {
q := store.db.NewSelect()
checkLedgerAlone := store.db.NewSelect().
TableExpr("_system.ledgers").
ColumnExpr("count = 1").
Join("JOIN (?) AS counters ON _system.ledgers.bucket = counters.bucket",
store.db.NewSelect().
TableExpr("_system.ledgers").
ColumnExpr("bucket").
ColumnExpr("COUNT(*) AS count").
Group("bucket"),
).
Where("_system.ledgers.name = ?", store.ledger.Name)

return q.
Where("((?) or ledger = ?)", checkLedgerAlone, store.ledger.Name)
if store.aloneInBucket == nil || !store.aloneInBucket.Load() {
q = q.Where("ledger = ?", store.ledger.Name)
}
return q
}

func (store *Store) SetAloneInBucket(alone bool) {
if store.aloneInBucket != nil {
store.aloneInBucket.Store(alone)
}
}

func New(db bun.IDB, bucket bucket.Bucket, l ledger.Ledger, opts ...Option) *Store {
Expand Down
12 changes: 12 additions & 0 deletions internal/storage/system/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Store interface {
ListLedgers(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error)
GetLedger(ctx context.Context, name string) (*ledger.Ledger, error)
GetDistinctBuckets(ctx context.Context) ([]string, error)
CountLedgersInBucket(ctx context.Context, bucket string) (int, error)

Migrate(ctx context.Context, options ...migrations.Option) error
GetMigrator(options ...migrations.Option) *migrations.Migrator
Expand Down Expand Up @@ -125,6 +126,17 @@ func (d *DefaultStore) GetLedger(ctx context.Context, name string) (*ledger.Ledg
return ret, nil
}

func (d *DefaultStore) CountLedgersInBucket(ctx context.Context, bucket string) (int, error) {
count, err := d.db.NewSelect().
TableExpr("_system.ledgers").
Where("bucket = ?", bucket).
Count(ctx)
if err != nil {
return 0, postgres.ResolveError(err)
}
return count, nil
}

func (d *DefaultStore) Migrate(ctx context.Context, options ...migrations.Option) error {
_, err := tracing.Trace(ctx, d.tracer, "MigrateSystemStore", func(ctx context.Context) (any, error) {
return nil, d.GetMigrator(options...).Up(ctx)
Expand Down
Loading