From 3fcb23fb1856cea6faf5cd0eeabfd7431c597754 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 2 Jan 2025 13:09:14 +0100 Subject: [PATCH] feat: handle missing compatibility layer with v2.1 --- internal/storage/ledger/legacy/adapters.go | 219 ++++++++++++++++++++- 1 file changed, 218 insertions(+), 1 deletion(-) diff --git a/internal/storage/ledger/legacy/adapters.go b/internal/storage/ledger/legacy/adapters.go index 3ea005891..046956565 100644 --- a/internal/storage/ledger/legacy/adapters.go +++ b/internal/storage/ledger/legacy/adapters.go @@ -3,6 +3,7 @@ package legacy import ( "context" "database/sql" + "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/go-libs/v2/metadata" "github.com/formancehq/go-libs/v2/migrations" "github.com/formancehq/go-libs/v2/time" @@ -10,32 +11,248 @@ import ( ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" "github.com/uptrace/bun" + "math/big" + "slices" ) +type accountsPaginatedResourceAdapter struct { + store *Store +} + +func (p accountsPaginatedResourceAdapter) GetOne(ctx context.Context, query ledgercontroller.ResourceQuery[any]) (*ledger.Account, error) { + var address string + _ = query.Builder.Walk(func(_ string, _ string, value any) error { + address = value.(string) + return nil + }) + return p.store.GetAccountWithVolumes(ctx, GetAccountQuery{ + PITFilterWithVolumes: PITFilterWithVolumes{ + PITFilter: PITFilter{ + PIT: query.PIT, + OOT: query.OOT, + }, + ExpandVolumes: slices.Contains(query.Expand, "volumes"), + ExpandEffectiveVolumes: slices.Contains(query.Expand, "effectiveVolumes"), + }, + Addr: address, + }) +} + +func (p accountsPaginatedResourceAdapter) Count(ctx context.Context, query ledgercontroller.ResourceQuery[any]) (int, error) { + return p.store.CountAccounts(ctx, NewListAccountsQuery(ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes]{ + QueryBuilder: query.Builder, + Options: PITFilterWithVolumes{ + PITFilter: PITFilter{ + PIT: query.PIT, + OOT: query.OOT, + }, + ExpandVolumes: slices.Contains(query.Expand, "volumes"), + ExpandEffectiveVolumes: slices.Contains(query.Expand, "effectiveVolumes"), + }, + })) +} + +func (p accountsPaginatedResourceAdapter) Paginate(ctx context.Context, query ledgercontroller.OffsetPaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Account], error) { + return p.store.GetAccountsWithVolumes(ctx, NewListAccountsQuery(ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes]{ + QueryBuilder: query.Options.Builder, + PageSize: query.PageSize, + Options: PITFilterWithVolumes{ + PITFilter: PITFilter{ + PIT: query.Options.PIT, + OOT: query.Options.OOT, + }, + ExpandVolumes: slices.Contains(query.Options.Expand, "volumes"), + ExpandEffectiveVolumes: slices.Contains(query.Options.Expand, "effectiveVolumes"), + }, + })) +} + +var _ ledgercontroller.PaginatedResource[ledger.Account, any, ledgercontroller.OffsetPaginatedQuery[any]] = (*accountsPaginatedResourceAdapter)(nil) + +type logsPaginatedResourceAdapter struct { + store *Store +} + +func (p logsPaginatedResourceAdapter) GetOne(_ context.Context, _ ledgercontroller.ResourceQuery[any]) (*ledger.Log, error) { + panic("never used") +} + +func (p logsPaginatedResourceAdapter) Count(_ context.Context, _ ledgercontroller.ResourceQuery[any]) (int, error) { + panic("never used") +} + +func (p logsPaginatedResourceAdapter) Paginate(ctx context.Context, query ledgercontroller.ColumnPaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Log], error) { + return p.store.GetLogs(ctx, NewListLogsQuery(ledgercontroller.PaginatedQueryOptions[any]{ + QueryBuilder: query.Options.Builder, + PageSize: query.PageSize, + Options: PITFilterWithVolumes{ + PITFilter: PITFilter{ + PIT: query.Options.PIT, + OOT: query.Options.OOT, + }, + }, + })) +} + +var _ ledgercontroller.PaginatedResource[ledger.Log, any, ledgercontroller.ColumnPaginatedQuery[any]] = (*logsPaginatedResourceAdapter)(nil) + +type transactionsPaginatedResourceAdapter struct { + store *Store +} + +func (p transactionsPaginatedResourceAdapter) GetOne(ctx context.Context, query ledgercontroller.ResourceQuery[any]) (*ledger.Transaction, error) { + var id int + _ = query.Builder.Walk(func(_ string, _ string, value any) error { + id = value.(int) + return nil + }) + return p.store.GetTransactionWithVolumes(ctx, GetTransactionQuery{ + PITFilterWithVolumes: PITFilterWithVolumes{ + PITFilter: PITFilter{ + PIT: query.PIT, + OOT: query.OOT, + }, + ExpandVolumes: slices.Contains(query.Expand, "volumes"), + ExpandEffectiveVolumes: slices.Contains(query.Expand, "effectiveVolumes"), + }, + ID: id, + }) +} + +func (p transactionsPaginatedResourceAdapter) Count(ctx context.Context, query ledgercontroller.ResourceQuery[any]) (int, error) { + return p.store.CountTransactions(ctx, NewListTransactionsQuery(ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes]{ + QueryBuilder: query.Builder, + Options: PITFilterWithVolumes{ + PITFilter: PITFilter{ + PIT: query.PIT, + OOT: query.OOT, + }, + ExpandVolumes: slices.Contains(query.Expand, "volumes"), + ExpandEffectiveVolumes: slices.Contains(query.Expand, "effectiveVolumes"), + }, + })) +} + +func (p transactionsPaginatedResourceAdapter) Paginate(ctx context.Context, query ledgercontroller.ColumnPaginatedQuery[any]) (*bunpaginate.Cursor[ledger.Transaction], error) { + return p.store.GetTransactions(ctx, NewListTransactionsQuery(ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes]{ + QueryBuilder: query.Options.Builder, + PageSize: query.PageSize, + Options: PITFilterWithVolumes{ + PITFilter: PITFilter{ + PIT: query.Options.PIT, + OOT: query.Options.OOT, + }, + ExpandVolumes: slices.Contains(query.Options.Expand, "volumes"), + ExpandEffectiveVolumes: slices.Contains(query.Options.Expand, "effectiveVolumes"), + }, + })) +} + +var _ ledgercontroller.PaginatedResource[ledger.Transaction, any, ledgercontroller.ColumnPaginatedQuery[any]] = (*transactionsPaginatedResourceAdapter)(nil) + +type aggregatedBalancesPaginatedResourceAdapter struct { + store *Store +} + +func (p aggregatedBalancesPaginatedResourceAdapter) GetOne(ctx context.Context, query ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions]) (*ledger.AggregatedVolumes, error) { + rawRet, err := p.store.GetAggregatedBalances(ctx, GetAggregatedBalanceQuery{ + PITFilter: PITFilter{ + PIT: query.PIT, + OOT: query.OOT, + }, + QueryBuilder: query.Builder, + UseInsertionDate: query.Opts.UseInsertionDate, + }) + if err != nil { + return nil, err + } + + ret := ledger.AggregatedVolumes{ + Aggregated: ledger.VolumesByAssets{}, + } + for asset, balance := range rawRet { + ret.Aggregated[asset] = ledger.Volumes{ + Input: new(big.Int).Add(new(big.Int), balance), + Output: new(big.Int), + } + } + + return &ret, nil +} + +func (p aggregatedBalancesPaginatedResourceAdapter) Count(_ context.Context, _ ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions]) (int, error) { + panic("never used") +} + +var _ ledgercontroller.Resource[ledger.AggregatedVolumes, ledgercontroller.GetAggregatedVolumesOptions] = (*aggregatedBalancesPaginatedResourceAdapter)(nil) + +type aggregatedVolumesPaginatedResourceAdapter struct { + store *Store +} + +func (p aggregatedVolumesPaginatedResourceAdapter) Paginate(ctx context.Context, query ledgercontroller.OffsetPaginatedQuery[ledgercontroller.GetVolumesOptions]) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) { + return p.store.GetVolumesWithBalances(ctx, NewGetVolumesWithBalancesQuery(ledgercontroller.PaginatedQueryOptions[FiltersForVolumes]{ + QueryBuilder: query.Options.Builder, + PageSize: query.PageSize, + Options: FiltersForVolumes{ + PITFilter: PITFilter{ + PIT: query.Options.PIT, + OOT: query.Options.OOT, + }, + UseInsertionDate: query.Options.Opts.UseInsertionDate, + GroupLvl: query.Options.Opts.GroupLvl, + }, + })) +} + +func (p aggregatedVolumesPaginatedResourceAdapter) GetOne(_ context.Context, _ ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions]) (*ledger.VolumesWithBalanceByAssetByAccount, error) { + panic("never used") +} + +func (p aggregatedVolumesPaginatedResourceAdapter) Count(_ context.Context, _ ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions]) (int, error) { + panic("never used") +} + +var _ ledgercontroller.PaginatedResource[ledger.VolumesWithBalanceByAssetByAccount, ledgercontroller.GetVolumesOptions, ledgercontroller.OffsetPaginatedQuery[ledgercontroller.GetVolumesOptions]] = (*aggregatedVolumesPaginatedResourceAdapter)(nil) + type DefaultStoreAdapter struct { newStore *ledgerstore.Store legacyStore *Store isFullUpToDate bool } -// todo; handle compat with v1 func (d *DefaultStoreAdapter) Accounts() ledgercontroller.PaginatedResource[ledger.Account, any, ledgercontroller.OffsetPaginatedQuery[any]] { + if !d.isFullUpToDate { + return &accountsPaginatedResourceAdapter{store: d.legacyStore} + } return d.newStore.Accounts() } func (d *DefaultStoreAdapter) Logs() ledgercontroller.PaginatedResource[ledger.Log, any, ledgercontroller.ColumnPaginatedQuery[any]] { + if !d.isFullUpToDate { + return &logsPaginatedResourceAdapter{store: d.legacyStore} + } return d.newStore.Logs() } func (d *DefaultStoreAdapter) Transactions() ledgercontroller.PaginatedResource[ledger.Transaction, any, ledgercontroller.ColumnPaginatedQuery[any]] { + if !d.isFullUpToDate { + return &transactionsPaginatedResourceAdapter{store: d.legacyStore} + } return d.newStore.Transactions() } func (d *DefaultStoreAdapter) AggregatedBalances() ledgercontroller.Resource[ledger.AggregatedVolumes, ledgercontroller.GetAggregatedVolumesOptions] { + if !d.isFullUpToDate { + return &aggregatedBalancesPaginatedResourceAdapter{store: d.legacyStore} + } return d.newStore.AggregatedVolumes() } func (d *DefaultStoreAdapter) Volumes() ledgercontroller.PaginatedResource[ledger.VolumesWithBalanceByAssetByAccount, ledgercontroller.GetVolumesOptions, ledgercontroller.OffsetPaginatedQuery[ledgercontroller.GetVolumesOptions]] { + if !d.isFullUpToDate { + return &aggregatedVolumesPaginatedResourceAdapter{store: d.legacyStore} + } return d.newStore.Volumes() }