From 87fcd13bf2c0ef73c48640c0fbe770a4f6e81844 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Tue, 22 Oct 2024 07:27:47 +0200 Subject: [PATCH] chore: clean compat layer from v2.1 --- go.mod | 2 +- internal/README.md | 2 +- internal/ledger.go | 2 +- internal/storage/bucket/default_bucket.go | 10 +- .../notes.yaml | 1 + .../26-accounts-recreate-unique-index/up.sql | 9 + .../migrations/27-clean-database/notes.yaml | 1 + .../migrations/27-clean-database/up.sql | 124 ++++ internal/storage/driver/adapters.go | 10 +- internal/storage/driver/driver.go | 4 - internal/storage/ledger/adapters.go | 51 ++ internal/storage/ledger/balances.go | 67 +- internal/storage/ledger/balances_test.go | 77 +- internal/storage/ledger/legacy/accounts.go | 198 ----- .../storage/ledger/legacy/accounts_test.go | 338 --------- internal/storage/ledger/legacy/adapters.go | 128 ---- internal/storage/ledger/legacy/balances.go | 146 ---- .../storage/ledger/legacy/balances_test.go | 175 ----- internal/storage/ledger/legacy/debug.go | 42 -- internal/storage/ledger/legacy/errors.go | 30 - internal/storage/ledger/legacy/logs.go | 50 -- internal/storage/ledger/legacy/logs_test.go | 62 -- internal/storage/ledger/legacy/main_test.go | 79 -- internal/storage/ledger/legacy/queries.go | 159 ----- internal/storage/ledger/legacy/store.go | 43 -- .../storage/ledger/legacy/transactions.go | 206 ------ .../ledger/legacy/transactions_test.go | 281 -------- internal/storage/ledger/legacy/utils.go | 185 ----- internal/storage/ledger/legacy/volumes.go | 188 ----- .../storage/ledger/legacy/volumes_test.go | 675 ------------------ .../ledger/resource_aggregated_balances.go | 2 +- internal/storage/system/migrations.go | 12 + 32 files changed, 250 insertions(+), 3109 deletions(-) create mode 100644 internal/storage/bucket/migrations/26-accounts-recreate-unique-index/notes.yaml create mode 100644 internal/storage/bucket/migrations/26-accounts-recreate-unique-index/up.sql create mode 100644 internal/storage/bucket/migrations/27-clean-database/notes.yaml create mode 100644 internal/storage/bucket/migrations/27-clean-database/up.sql create mode 100644 internal/storage/ledger/adapters.go delete mode 100644 internal/storage/ledger/legacy/accounts.go delete mode 100644 internal/storage/ledger/legacy/accounts_test.go delete mode 100644 internal/storage/ledger/legacy/adapters.go delete mode 100644 internal/storage/ledger/legacy/balances.go delete mode 100644 internal/storage/ledger/legacy/balances_test.go delete mode 100644 internal/storage/ledger/legacy/debug.go delete mode 100644 internal/storage/ledger/legacy/errors.go delete mode 100644 internal/storage/ledger/legacy/logs.go delete mode 100644 internal/storage/ledger/legacy/logs_test.go delete mode 100644 internal/storage/ledger/legacy/main_test.go delete mode 100644 internal/storage/ledger/legacy/queries.go delete mode 100644 internal/storage/ledger/legacy/store.go delete mode 100644 internal/storage/ledger/legacy/transactions.go delete mode 100644 internal/storage/ledger/legacy/transactions_test.go delete mode 100644 internal/storage/ledger/legacy/utils.go delete mode 100644 internal/storage/ledger/legacy/volumes.go delete mode 100644 internal/storage/ledger/legacy/volumes_test.go diff --git a/go.mod b/go.mod index fece61599..cc9c8f740 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,6 @@ require ( github.com/onsi/gomega v1.35.1 github.com/ory/dockertest/v3 v3.11.0 github.com/pborman/uuid v1.2.1 - github.com/pkg/errors v0.9.1 github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 @@ -55,6 +54,7 @@ require gopkg.in/yaml.v3 v3.0.1 // indirect require ( github.com/hashicorp/go-hclog v1.6.3 // indirect github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c // indirect + github.com/pkg/errors v0.9.1 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect ) diff --git a/internal/README.md b/internal/README.md index 42282d7d3..616e988bf 100644 --- a/internal/README.md +++ b/internal/README.md @@ -245,7 +245,7 @@ type BalancesByAssetsByAccounts map[string]BalancesByAssets ```go type Configuration struct { Bucket string `json:"bucket" bun:"bucket,type:varchar(255)"` - Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb"` + Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb,nullzero"` Features features.FeatureSet `json:"features" bun:"features,type:jsonb"` } ``` diff --git a/internal/ledger.go b/internal/ledger.go index f50708fd2..1fcb0b7e2 100644 --- a/internal/ledger.go +++ b/internal/ledger.go @@ -84,7 +84,7 @@ var ( type Configuration struct { Bucket string `json:"bucket" bun:"bucket,type:varchar(255)"` - Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb"` + Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb,nullzero"` Features features.FeatureSet `json:"features" bun:"features,type:jsonb"` } diff --git a/internal/storage/bucket/default_bucket.go b/internal/storage/bucket/default_bucket.go index 13e3ca8af..2267a2ca8 100644 --- a/internal/storage/bucket/default_bucket.go +++ b/internal/storage/bucket/default_bucket.go @@ -15,11 +15,11 @@ import ( ) // stateless version (+1 regarding directory name, as migrations start from 1 in the lib) -const MinimalSchemaVersion = 12 +const MinimalSchemaVersion = 27 type DefaultBucket struct { - name string - db *bun.DB + name string + db *bun.DB tracer trace.Tracer } @@ -81,8 +81,8 @@ func (b *DefaultBucket) AddLedger(ctx context.Context, l ledger.Ledger) error { func NewDefault(db *bun.DB, tracer trace.Tracer, name string) *DefaultBucket { return &DefaultBucket{ - db: db, - name: name, + db: db, + name: name, tracer: tracer, } } diff --git a/internal/storage/bucket/migrations/26-accounts-recreate-unique-index/notes.yaml b/internal/storage/bucket/migrations/26-accounts-recreate-unique-index/notes.yaml new file mode 100644 index 000000000..4b7c24021 --- /dev/null +++ b/internal/storage/bucket/migrations/26-accounts-recreate-unique-index/notes.yaml @@ -0,0 +1 @@ +name: Recreate accounts unique index diff --git a/internal/storage/bucket/migrations/26-accounts-recreate-unique-index/up.sql b/internal/storage/bucket/migrations/26-accounts-recreate-unique-index/up.sql new file mode 100644 index 000000000..3b8ad12bd --- /dev/null +++ b/internal/storage/bucket/migrations/26-accounts-recreate-unique-index/up.sql @@ -0,0 +1,9 @@ +-- There is already a covering index on accounts table (including seq column). +-- As we will remove the seq column in next migration, we have to create a new index without it (PG will remove it automatically in background). +-- Also, we create the index concurrently to avoid locking the table. +-- And, as there is already an index on this table, the index creation should not fail. +-- +-- We create this index in a dedicated as, as the doc mentions it (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT) +-- multi statements queries are automatically wrapped inside transaction block, and it's forbidden +-- to create index concurrently inside a transaction block. +create unique index concurrently accounts_ledger2 on "{{.Schema}}".accounts (ledger, address) \ No newline at end of file diff --git a/internal/storage/bucket/migrations/27-clean-database/notes.yaml b/internal/storage/bucket/migrations/27-clean-database/notes.yaml new file mode 100644 index 000000000..759af9421 --- /dev/null +++ b/internal/storage/bucket/migrations/27-clean-database/notes.yaml @@ -0,0 +1 @@ +name: Clean not used columns in database diff --git a/internal/storage/bucket/migrations/27-clean-database/up.sql b/internal/storage/bucket/migrations/27-clean-database/up.sql new file mode 100644 index 000000000..d120fb02d --- /dev/null +++ b/internal/storage/bucket/migrations/27-clean-database/up.sql @@ -0,0 +1,124 @@ +set search_path = '{{.Schema}}'; + +-- Clean all useless function/aggregates/indexes inherited from stateful version. +drop aggregate aggregate_objects(jsonb); +drop aggregate first(anyelement); + +drop function array_distinct(anyarray); +drop function insert_posting(_transaction_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb); +drop function upsert_account(_ledger character varying, _address character varying, _metadata jsonb, _date timestamp without time zone, _first_usage timestamp without time zone); +drop function get_latest_move_for_account_and_asset(_ledger character varying, _account_address character varying, _asset character varying, _before timestamp without time zone); +drop function update_transaction_metadata(_ledger character varying, _id numeric, _metadata jsonb, _date timestamp without time zone); +drop function delete_account_metadata(_ledger character varying, _address character varying, _key character varying, _date timestamp without time zone); +drop function delete_transaction_metadata(_ledger character varying, _id numeric, _key character varying, _date timestamp without time zone); +drop function balance_from_volumes(v volumes); +drop function get_all_account_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); +drop function first_agg(anyelement, anyelement); +drop function volumes_to_jsonb(v volumes_with_asset); +drop function get_account_aggregated_effective_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); +drop function handle_log(); +drop function get_account_aggregated_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone); +drop function get_aggregated_volumes_for_transaction(_ledger character varying, tx numeric); +drop function insert_move(_transactions_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, _account_address character varying, _asset character varying, _amount numeric, _is_source boolean, _account_exists boolean); +drop function get_all_assets(_ledger character varying); +drop function insert_transaction(_ledger character varying, data jsonb, _date timestamp without time zone, _account_metadata jsonb); +drop function get_all_account_effective_volumes(_ledger character varying, _account character varying, _before timestamp without time zone); +drop function get_account_balance(_ledger character varying, _account character varying, _asset character varying, _before timestamp without time zone); +drop function get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric); +drop function aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] ); +drop function get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone); +drop function revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone); + +drop index transactions_sources_arrays; +drop index transactions_destinations_arrays; +drop index transactions_sources; +drop index transactions_destinations; + +-- We will remove some triggers writing these columns (set_compat_xxx) later in this file. +-- When these triggers will be removed, there is a little moment where the columns will not be filled and constraints +-- still checked by the database. +-- So, we drop the not null constraint before removing the triggers. +-- Once the triggers removed, we will be able to drop the columns. +alter table moves +alter column transactions_seq drop not null, +alter column accounts_seq drop not null, +alter column accounts_address_array drop not null; + +alter table transactions_metadata +alter column transactions_seq drop not null; + +alter table accounts_metadata +alter column accounts_seq drop not null; + +-- Now, the columns are nullable, we can drop the trigger +drop trigger set_compat_on_move on moves; +drop trigger set_compat_on_accounts_metadata on accounts_metadata; +drop trigger set_compat_on_transactions_metadata on transactions_metadata; +drop function set_compat_on_move(); +drop function set_compat_on_accounts_metadata(); +drop function set_compat_on_transactions_metadata(); + +-- Finally remove the columns +alter table moves +drop column transactions_seq, +drop column accounts_seq, +drop column accounts_address_array; + +alter table transactions_metadata +drop column transactions_seq; + +alter table accounts_metadata +drop column accounts_seq; + +alter table transactions +drop column seq; + +alter table accounts +drop column seq; + +-- rename index create in previous migration, as the drop of the column seq of accounts table has automatically dropped the index accounts_ledger +alter index accounts_ledger2 +rename to accounts_ledger; + +create or replace function set_log_hash() + returns trigger + security definer + language plpgsql +as +$$ +declare + previousHash bytea; + marshalledAsJSON varchar; +begin + select hash into previousHash + from logs + where ledger = new.ledger + order by id desc + limit 1; + + -- select only fields participating in the hash on the backend and format json representation the same way + select '{' || + '"type":"' || new.type || '",' || + '"data":' || encode(new.memento, 'escape') || ',' || + '"date":"' || (to_json(new.date::timestamp)#>>'{}') || 'Z",' || + '"idempotencyKey":"' || coalesce(new.idempotency_key, '') || '",' || + '"id":0,' || + '"hash":null' || + '}' into marshalledAsJSON; + + new.hash = ( + select public.digest( + case + when previousHash is null + then marshalledAsJSON::bytea + else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || convert_to(marshalledAsJSON, 'LATIN1')::bytea + end || E'\n', 'sha256'::text + ) + ); + + return new; +end; +$$ set search_path from current; + +alter table logs +drop column seq; \ No newline at end of file diff --git a/internal/storage/driver/adapters.go b/internal/storage/driver/adapters.go index 304d9b7f6..de9ef7341 100644 --- a/internal/storage/driver/adapters.go +++ b/internal/storage/driver/adapters.go @@ -2,8 +2,7 @@ package driver import ( "context" - "fmt" - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger/legacy" + ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" ledger "github.com/formancehq/ledger/internal" ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" @@ -20,12 +19,7 @@ func (d *DefaultStorageDriverAdapter) OpenLedger(ctx context.Context, name strin return nil, nil, err } - isUpToDate, err := store.GetBucket().IsUpToDate(ctx) - if err != nil { - return nil, nil, fmt.Errorf("checking if bucket is up to date: %w", err) - } - - return ledgerstore.NewDefaultStoreAdapter(isUpToDate, store), l, nil + return ledgerstore.NewDefaultStoreAdapter(store), l, nil } func (d *DefaultStorageDriverAdapter) CreateLedger(ctx context.Context, l *ledger.Ledger) error { diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 497bb406e..ab45d4dac 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -39,10 +39,6 @@ type Driver struct { func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) { - if l.Metadata == nil { - l.Metadata = metadata.Metadata{} - } - b := d.bucketFactory.Create(l.Bucket) isInitialized, err := b.IsInitialized(ctx) if err != nil { diff --git a/internal/storage/ledger/adapters.go b/internal/storage/ledger/adapters.go new file mode 100644 index 000000000..17505d9a4 --- /dev/null +++ b/internal/storage/ledger/adapters.go @@ -0,0 +1,51 @@ +package ledger + +import ( + "context" + "database/sql" + ledger "github.com/formancehq/ledger/internal" + ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" +) + +type TX struct { + *Store +} + +type DefaultStoreAdapter struct { + *Store +} + +func (d *DefaultStoreAdapter) IsUpToDate(ctx context.Context) (bool, error) { + return d.HasMinimalVersion(ctx) +} + +func (d *DefaultStoreAdapter) BeginTX(ctx context.Context, opts *sql.TxOptions) (ledgercontroller.Store, error) { + store, err := d.Store.BeginTX(ctx, opts) + if err != nil { + return nil, err + } + + return &DefaultStoreAdapter{ + Store: store, + }, nil +} + +func (d *DefaultStoreAdapter) Commit() error { + return d.Store.Commit() +} + +func (d *DefaultStoreAdapter) Rollback() error { + return d.Store.Rollback() +} + +func (d *DefaultStoreAdapter) AggregatedBalances() ledgercontroller.Resource[ledger.AggregatedVolumes, ledgercontroller.GetAggregatedVolumesOptions] { + return d.AggregatedVolumes() +} + +func NewDefaultStoreAdapter(store *Store) *DefaultStoreAdapter { + return &DefaultStoreAdapter{ + Store: store, + } +} + +var _ ledgercontroller.Store = (*DefaultStoreAdapter)(nil) diff --git a/internal/storage/ledger/balances.go b/internal/storage/ledger/balances.go index d702f7135..6af3e6929 100644 --- a/internal/storage/ledger/balances.go +++ b/internal/storage/ledger/balances.go @@ -49,62 +49,25 @@ func (store *Store) GetBalances(ctx context.Context, query ledgercontroller.Bala } } - // Try to insert volumes using last move (to keep compat with previous version) or 0 values. - // This way, if the account has a 0 balance at this point, it will be locked as any other accounts. - // If the complete sql transaction fails, the account volumes will not be inserted. - selectMoves := store.db.NewSelect(). - ModelTableExpr(store.GetPrefixedRelationName("moves")). - DistinctOn("accounts_address, asset"). - Column("accounts_address", "asset"). - ColumnExpr("first_value(post_commit_volumes) over (partition by accounts_address, asset order by seq desc) as post_commit_volumes"). - ColumnExpr("first_value(ledger) over (partition by accounts_address, asset order by seq desc) as ledger"). - Where("("+strings.Join(conditions, ") OR (")+")", args...) - - zeroValuesAndMoves := store.db.NewSelect(). - TableExpr("(?) data", selectMoves). - Column("ledger", "accounts_address", "asset"). - ColumnExpr("(post_commit_volumes).inputs as input"). - ColumnExpr("(post_commit_volumes).outputs as output"). - UnionAll( - store.db.NewSelect(). - TableExpr( - "(?) data", - store.db.NewSelect().NewValues(&accountsVolumes), - ). - Column("*"), - ) - - zeroValueOrMoves := store.db.NewSelect(). - TableExpr("(?) data", zeroValuesAndMoves). - Column("ledger", "accounts_address", "asset", "input", "output"). - DistinctOn("ledger, accounts_address, asset") - - insertDefaultValue := store.db.NewInsert(). - TableExpr(store.GetPrefixedRelationName("accounts_volumes")). - TableExpr("(" + zeroValueOrMoves.String() + ") data"). - On("conflict (ledger, accounts_address, asset) do nothing"). - Returning("ledger, accounts_address, asset, input, output") - - selectExistingValues := store.db.NewSelect(). + err := store.db.NewSelect(). + With( + "ins", + // Try to insert volumes with 0 values. + // This way, if the account has a 0 balance at this point, it will be locked as any other accounts. + // It the complete sql transaction fail, the account volumes will not be inserted. + store.db.NewInsert(). + Model(&accountsVolumes). + ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). + On("conflict do nothing"), + ). + Model(&accountsVolumes). ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). - Column("ledger", "accounts_address", "asset", "input", "output"). + Column("accounts_address", "asset", "input", "output"). Where("("+strings.Join(conditions, ") OR (")+")", args...). For("update"). // notes(gfyrag): Keep order, it ensures consistent locking order and limit deadlocks - Order("accounts_address", "asset") - - finalQuery := store.db.NewSelect(). - With("inserted", insertDefaultValue). - With("existing", selectExistingValues). - ModelTableExpr( - "(?) accounts_volumes", - store.db.NewSelect(). - ModelTableExpr("inserted"). - UnionAll(store.db.NewSelect().ModelTableExpr("existing")), - ). - Model(&accountsVolumes) - - err := finalQuery.Scan(ctx) + Order("accounts_address", "asset"). + Scan(ctx) if err != nil { return nil, postgres.ResolveError(err) } diff --git a/internal/storage/ledger/balances_test.go b/internal/storage/ledger/balances_test.go index 3713294e9..8f299d5a3 100644 --- a/internal/storage/ledger/balances_test.go +++ b/internal/storage/ledger/balances_test.go @@ -4,7 +4,6 @@ package ledger_test import ( "database/sql" - "github.com/formancehq/go-libs/v2/bun/bunpaginate" "math/big" "testing" @@ -44,6 +43,32 @@ func TestBalancesGet(t *testing.T) { }) require.NoError(t, err) + t.Run("get balances of not existing account should create an empty row", func(t *testing.T) { + t.Parallel() + + balances, err := store.GetBalances(ctx, ledgercontroller.BalanceQuery{ + "orders:1234": []string{"USD"}, + }) + require.NoError(t, err) + require.Len(t, balances, 1) + require.NotNil(t, balances["orders:1234"]) + require.Len(t, balances["orders:1234"], 1) + require.Equal(t, big.NewInt(0), balances["orders:1234"]["USD"]) + + volumes := make([]*ledger.AccountsVolumes, 0) + + err = store.GetDB().NewSelect(). + Model(&volumes). + ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). + Where("accounts_address = ?", "orders:1234"). + Scan(ctx) + require.NoError(t, err) + require.Len(t, volumes, 1) + require.Equal(t, "USD", volumes[0].Asset) + require.Equal(t, big.NewInt(0), volumes[0].Input) + require.Equal(t, big.NewInt(0), volumes[0].Output) + }) + t.Run("check concurrent access on same balance", func(t *testing.T) { t.Parallel() @@ -130,56 +155,6 @@ func TestBalancesGet(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, count) }) - - t.Run("with balance from move", func(t *testing.T) { - t.Parallel() - - tx := ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "bank", "USD", big.NewInt(100)), - ) - err := store.InsertTransaction(ctx, &tx) - require.NoError(t, err) - - bankAccount := ledger.Account{ - Address: "bank", - FirstUsage: tx.InsertedAt, - InsertionDate: tx.InsertedAt, - UpdatedAt: tx.InsertedAt, - } - err = store.UpsertAccounts(ctx, &bankAccount) - require.NoError(t, err) - - err = store.InsertMoves(ctx, &ledger.Move{ - TransactionID: tx.ID, - IsSource: false, - Account: "bank", - Amount: (*bunpaginate.BigInt)(big.NewInt(100)), - Asset: "USD", - InsertionDate: tx.InsertedAt, - EffectiveDate: tx.InsertedAt, - PostCommitVolumes: pointer.For(ledger.NewVolumesInt64(100, 0)), - }) - require.NoError(t, err) - - balances, err := store.GetBalances(ctx, ledgercontroller.BalanceQuery{ - "bank": {"USD"}, - }) - require.NoError(t, err) - - require.NotNil(t, balances["bank"]) - RequireEqual(t, big.NewInt(100), balances["bank"]["USD"]) - - // Check a new line has been inserted into accounts_volumes table - volumes := &ledger.AccountsVolumes{} - err = store.GetDB().NewSelect(). - ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")). - Where("accounts_address = ? and ledger = ?", "bank", store.GetLedger().Name). - Scan(ctx, volumes) - require.NoError(t, err) - - RequireEqual(t, big.NewInt(100), volumes.Input) - RequireEqual(t, big.NewInt(0), volumes.Output) - }) } func TestBalancesAggregates(t *testing.T) { diff --git a/internal/storage/ledger/legacy/accounts.go b/internal/storage/ledger/legacy/accounts.go deleted file mode 100644 index ccadb0c77..000000000 --- a/internal/storage/ledger/legacy/accounts.go +++ /dev/null @@ -1,198 +0,0 @@ -package legacy - -import ( - "context" - "errors" - "fmt" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - "regexp" - - "github.com/formancehq/go-libs/v2/bun/bunpaginate" - - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/uptrace/bun" -) - -func (store *Store) buildAccountQuery(q PITFilterWithVolumes, query *bun.SelectQuery) *bun.SelectQuery { - - query = query. - Column("accounts.address", "accounts.first_usage"). - Where("accounts.ledger = ?", store.name). - Apply(filterPIT(q.PIT, "first_usage")). - Order("accounts.address"). - ModelTableExpr(store.GetPrefixedRelationName("accounts")) - - if q.PIT != nil && !q.PIT.IsZero() { - query = query. - Column("accounts.address"). - ColumnExpr(`coalesce(accounts_metadata.metadata, '{}'::jsonb) as metadata`). - Join(` - left join lateral ( - select metadata, accounts_seq - from `+store.GetPrefixedRelationName("accounts_metadata")+` - where accounts_metadata.accounts_seq = accounts.seq and accounts_metadata.date < ? - order by revision desc - limit 1 - ) accounts_metadata on true - `, q.PIT) - } else { - query = query.ColumnExpr("accounts.metadata") - } - - if q.ExpandVolumes { - query = query. - ColumnExpr("volumes.*"). - Join(`join `+store.GetPrefixedRelationName("get_account_aggregated_volumes")+`(?, accounts.address, ?) volumes on true`, store.name, q.PIT) - } - - if q.ExpandEffectiveVolumes { - query = query. - ColumnExpr("effective_volumes.*"). - Join(`join `+store.GetPrefixedRelationName("get_account_aggregated_effective_volumes")+`(?, accounts.address, ?) effective_volumes on true`, store.name, q.PIT) - } - - return query -} - -func (store *Store) accountQueryContext(qb query.Builder, q ListAccountsQuery) (string, []any, error) { - metadataRegex := regexp.MustCompile(`metadata\[(.+)]`) - balanceRegex := regexp.MustCompile(`balance\[(.*)]`) - - return qb.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { - convertOperatorToSQL := func() string { - switch operator { - case "$match": - return "=" - case "$lt": - return "<" - case "$gt": - return ">" - case "$lte": - return "<=" - case "$gte": - return ">=" - } - panic("unreachable") - } - switch { - case key == "address": - if operator != "$match" { - return "", nil, errors.New("'address' column can only be used with $match") - } - switch address := value.(type) { - case string: - return filterAccountAddress(address, "accounts.address"), nil, nil - default: - return "", nil, newErrInvalidQuery("unexpected type %T for column 'address'", address) - } - case metadataRegex.Match([]byte(key)): - if operator != "$match" { - return "", nil, newErrInvalidQuery("'account' column can only be used with $match") - } - match := metadataRegex.FindAllStringSubmatch(key, 3) - - key := "metadata" - if q.Options.Options.PIT != nil && !q.Options.Options.PIT.IsZero() { - key = "accounts_metadata.metadata" - } - - return key + " @> ?", []any{map[string]any{ - match[0][1]: value, - }}, nil - case balanceRegex.Match([]byte(key)): - match := balanceRegex.FindAllStringSubmatch(key, 2) - - return fmt.Sprintf(`( - select `+store.GetPrefixedRelationName("balance_from_volumes")+`(post_commit_volumes) - from `+store.GetPrefixedRelationName("moves")+` - where asset = ? and accounts_address = accounts.address and ledger = ? - order by seq desc - limit 1 - ) %s ?`, convertOperatorToSQL()), []any{match[0][1], store.name, value}, nil - case key == "balance": - return fmt.Sprintf(`( - select `+store.GetPrefixedRelationName("balance_from_volumes")+`(post_commit_volumes) - from `+store.GetPrefixedRelationName("moves")+` - where accounts_address = accounts.address and ledger = ? - order by seq desc - limit 1 - ) %s ?`, convertOperatorToSQL()), []any{store.name, value}, nil - - case key == "metadata": - if operator != "$exists" { - return "", nil, newErrInvalidQuery("'metadata' key filter can only be used with $exists") - } - if q.Options.Options.PIT != nil && !q.Options.Options.PIT.IsZero() { - key = "accounts_metadata.metadata" - } - - return fmt.Sprintf("%s -> ? IS NOT NULL", key), []any{value}, nil - default: - return "", nil, newErrInvalidQuery("unknown key '%s' when building query", key) - } - })) -} - -func (store *Store) buildAccountListQuery(selectQuery *bun.SelectQuery, q ListAccountsQuery, where string, args []any) *bun.SelectQuery { - selectQuery = store.buildAccountQuery(q.Options.Options, selectQuery) - - if where != "" { - return selectQuery.Where(where, args...) - } - - return selectQuery -} - -func (store *Store) GetAccountsWithVolumes(ctx context.Context, q ListAccountsQuery) (*bunpaginate.Cursor[ledger.Account], error) { - var ( - where string - args []any - err error - ) - if q.Options.QueryBuilder != nil { - where, args, err = store.accountQueryContext(q.Options.QueryBuilder, q) - if err != nil { - return nil, err - } - } - - return paginateWithOffset[ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes], ledger.Account](store, ctx, - (*bunpaginate.OffsetPaginatedQuery[ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes]])(&q), - func(query *bun.SelectQuery) *bun.SelectQuery { - return store.buildAccountListQuery(query, q, where, args) - }, - ) -} - -func (store *Store) GetAccountWithVolumes(ctx context.Context, q GetAccountQuery) (*ledger.Account, error) { - account, err := fetch[*ledger.Account](store, true, ctx, func(query *bun.SelectQuery) *bun.SelectQuery { - query = store.buildAccountQuery(q.PITFilterWithVolumes, query). - Where("accounts.address = ?", q.Addr). - Limit(1) - - return query - }) - if err != nil { - return nil, err - } - return account, nil -} - -func (store *Store) CountAccounts(ctx context.Context, q ListAccountsQuery) (int, error) { - var ( - where string - args []any - err error - ) - if q.Options.QueryBuilder != nil { - where, args, err = store.accountQueryContext(q.Options.QueryBuilder, q) - if err != nil { - return 0, err - } - } - - return count[ledger.Account](store, true, ctx, func(query *bun.SelectQuery) *bun.SelectQuery { - return store.buildAccountListQuery(query, q, where, args) - }) -} diff --git a/internal/storage/ledger/legacy/accounts_test.go b/internal/storage/ledger/legacy/accounts_test.go deleted file mode 100644 index 62201eaa8..000000000 --- a/internal/storage/ledger/legacy/accounts_test.go +++ /dev/null @@ -1,338 +0,0 @@ -//go:build it - -package legacy_test - -import ( - "github.com/formancehq/go-libs/v2/pointer" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - "github.com/formancehq/ledger/internal/storage/ledger/legacy" - "math/big" - "testing" - - "github.com/formancehq/go-libs/v2/time" - - "github.com/formancehq/go-libs/v2/logging" - - "github.com/formancehq/go-libs/v2/metadata" - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger/legacy" - "github.com/stretchr/testify/require" -) - -func TestGetAccounts(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - now := time.Now() - ctx := logging.TestingContext() - - err := store.newStore.CommitTransaction(ctx, pointer.For(ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(100))). - WithTimestamp(now). - WithInsertedAt(now))) - require.NoError(t, err) - - require.NoError(t, store.newStore.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{ - "account:1": { - "category": "1", - }, - "account:2": { - "category": "2", - }, - "account:3": { - "category": "3", - }, - "orders:1": { - "foo": "bar", - }, - "orders:2": { - "foo": "bar", - }, - })) - - err = store.newStore.CommitTransaction(ctx, pointer.For(ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(100))). - WithTimestamp(now.Add(4*time.Minute)). - WithInsertedAt(now.Add(100*time.Millisecond)))) - require.NoError(t, err) - - err = store.newStore.CommitTransaction(ctx, pointer.For(ledger.NewTransaction(). - WithPostings(ledger.NewPosting("account:1", "bank", "USD", big.NewInt(50))). - WithTimestamp(now.Add(3*time.Minute)). - WithInsertedAt(now.Add(200*time.Millisecond)))) - require.NoError(t, err) - - err = store.newStore.CommitTransaction(ctx, pointer.For(ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(0))). - WithTimestamp(now.Add(-time.Minute)). - WithInsertedAt(now.Add(200*time.Millisecond)))) - require.NoError(t, err) - - t.Run("list all", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}))) - require.NoError(t, err) - require.Len(t, accounts.Data, 7) - }) - - t.Run("list using metadata", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Match("metadata[category]", "1")), - )) - require.NoError(t, err) - require.Len(t, accounts.Data, 1) - }) - - t.Run("list before date", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{ - PITFilter: ledgerstore.PITFilter{ - PIT: &now, - }, - }))) - require.NoError(t, err) - require.Len(t, accounts.Data, 2) - }) - - t.Run("list with volumes", func(t *testing.T) { - t.Parallel() - - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{ - ExpandVolumes: true, - }).WithQueryBuilder(query.Match("address", "account:1")))) - require.NoError(t, err) - require.Len(t, accounts.Data, 1) - require.Equal(t, ledger.VolumesByAssets{ - "USD": ledger.NewVolumesInt64(200, 50), - }, accounts.Data[0].Volumes) - }) - - t.Run("list with volumes using PIT", func(t *testing.T) { - t.Parallel() - - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{ - PITFilter: ledgerstore.PITFilter{ - PIT: &now, - }, - ExpandVolumes: true, - }).WithQueryBuilder(query.Match("address", "account:1")))) - require.NoError(t, err) - require.Len(t, accounts.Data, 1) - require.Equal(t, ledger.VolumesByAssets{ - "USD": ledger.NewVolumesInt64(100, 0), - }, accounts.Data[0].Volumes) - }) - - t.Run("list with effective volumes", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{ - ExpandEffectiveVolumes: true, - }).WithQueryBuilder(query.Match("address", "account:1")))) - require.NoError(t, err) - require.Len(t, accounts.Data, 1) - require.Equal(t, ledger.VolumesByAssets{ - "USD": ledger.NewVolumesInt64(200, 50), - }, accounts.Data[0].EffectiveVolumes) - }) - - t.Run("list with effective volumes using PIT", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{ - PITFilter: ledgerstore.PITFilter{ - PIT: &now, - }, - ExpandEffectiveVolumes: true, - }).WithQueryBuilder(query.Match("address", "account:1")))) - require.NoError(t, err) - require.Len(t, accounts.Data, 1) - require.Equal(t, ledger.VolumesByAssets{ - "USD": ledger.NewVolumesInt64(100, 0), - }, accounts.Data[0].EffectiveVolumes) - }) - - t.Run("list using filter on address", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Match("address", "account:")), - )) - require.NoError(t, err) - require.Len(t, accounts.Data, 3) - }) - t.Run("list using filter on multiple address", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder( - query.Or( - query.Match("address", "account:1"), - query.Match("address", "orders:"), - ), - ), - )) - require.NoError(t, err) - require.Len(t, accounts.Data, 3) - }) - t.Run("list using filter on balances", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Lt("balance[USD]", 0)), - )) - require.NoError(t, err) - require.Len(t, accounts.Data, 1) // world - - accounts, err = store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Gt("balance[USD]", 0)), - )) - require.NoError(t, err) - require.Len(t, accounts.Data, 2) - require.Equal(t, "account:1", accounts.Data[0].Address) - require.Equal(t, "bank", accounts.Data[1].Address) - }) - - t.Run("list using filter on exists metadata", func(t *testing.T) { - t.Parallel() - accounts, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Exists("metadata", "foo")), - )) - require.NoError(t, err) - require.Len(t, accounts.Data, 2) - - accounts, err = store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Exists("metadata", "category")), - )) - require.NoError(t, err) - require.Len(t, accounts.Data, 3) - }) - - t.Run("list using filter invalid field", func(t *testing.T) { - t.Parallel() - _, err := store.GetAccountsWithVolumes(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Lt("invalid", 0)), - )) - require.Error(t, err) - require.True(t, legacy.IsErrInvalidQuery(err)) - }) -} - -func TestGetAccount(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - now := time.Now() - ctx := logging.TestingContext() - - err := store.newStore.CommitTransaction(ctx, pointer.For(ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "multi", "USD/2", big.NewInt(100)), - ).WithTimestamp(now))) - require.NoError(t, err) - - require.NoError(t, store.newStore.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{ - "multi": { - "category": "gold", - }, - })) - - err = store.newStore.CommitTransaction(ctx, pointer.For(ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "multi", "USD/2", big.NewInt(0)), - ).WithTimestamp(now.Add(-time.Minute)))) - require.NoError(t, err) - - t.Run("find account", func(t *testing.T) { - t.Parallel() - account, err := store.GetAccountWithVolumes(ctx, ledgerstore.NewGetAccountQuery("multi")) - require.NoError(t, err) - require.Equal(t, ledger.Account{ - Address: "multi", - Metadata: metadata.Metadata{ - "category": "gold", - }, - FirstUsage: now.Add(-time.Minute), - }, *account) - - account, err = store.GetAccountWithVolumes(ctx, ledgerstore.NewGetAccountQuery("world")) - require.NoError(t, err) - require.Equal(t, ledger.Account{ - Address: "world", - Metadata: metadata.Metadata{}, - FirstUsage: now.Add(-time.Minute), - }, *account) - }) - - t.Run("find account in past", func(t *testing.T) { - t.Parallel() - account, err := store.GetAccountWithVolumes(ctx, ledgerstore.NewGetAccountQuery("multi").WithPIT(now.Add(-30*time.Second))) - require.NoError(t, err) - require.Equal(t, ledger.Account{ - Address: "multi", - Metadata: metadata.Metadata{}, - FirstUsage: now.Add(-time.Minute), - }, *account) - }) - - t.Run("find account with volumes", func(t *testing.T) { - t.Parallel() - account, err := store.GetAccountWithVolumes(ctx, ledgerstore.NewGetAccountQuery("multi"). - WithExpandVolumes()) - require.NoError(t, err) - require.Equal(t, ledger.Account{ - Address: "multi", - Metadata: metadata.Metadata{ - "category": "gold", - }, - FirstUsage: now.Add(-time.Minute), - Volumes: ledger.VolumesByAssets{ - "USD/2": ledger.NewVolumesInt64(100, 0), - }, - }, *account) - }) - - t.Run("find account with effective volumes", func(t *testing.T) { - t.Parallel() - account, err := store.GetAccountWithVolumes(ctx, ledgerstore.NewGetAccountQuery("multi"). - WithExpandEffectiveVolumes()) - require.NoError(t, err) - require.Equal(t, ledger.Account{ - Address: "multi", - Metadata: metadata.Metadata{ - "category": "gold", - }, - FirstUsage: now.Add(-time.Minute), - - EffectiveVolumes: ledger.VolumesByAssets{ - "USD/2": ledger.NewVolumesInt64(100, 0), - }, - }, *account) - }) - - t.Run("find account using pit", func(t *testing.T) { - t.Parallel() - account, err := store.GetAccountWithVolumes(ctx, ledgerstore.NewGetAccountQuery("multi").WithPIT(now)) - require.NoError(t, err) - require.Equal(t, ledger.Account{ - Address: "multi", - Metadata: metadata.Metadata{}, - FirstUsage: now.Add(-time.Minute), - }, *account) - }) - - t.Run("not existent account", func(t *testing.T) { - t.Parallel() - _, err := store.GetAccountWithVolumes(ctx, ledgerstore.NewGetAccountQuery("account_not_existing")) - require.Error(t, err) - }) - -} - -func TestCountAccounts(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - ctx := logging.TestingContext() - - err := store.newStore.CommitTransaction(ctx, pointer.For(ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", "central_bank", "USD/2", big.NewInt(100)), - ))) - require.NoError(t, err) - - countAccounts, err := store.CountAccounts(ctx, ledgerstore.NewListAccountsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}))) - require.NoError(t, err) - require.EqualValues(t, 2, countAccounts) // world + central_bank -} diff --git a/internal/storage/ledger/legacy/adapters.go b/internal/storage/ledger/legacy/adapters.go deleted file mode 100644 index 3ea005891..000000000 --- a/internal/storage/ledger/legacy/adapters.go +++ /dev/null @@ -1,128 +0,0 @@ -package legacy - -import ( - "context" - "database/sql" - "github.com/formancehq/go-libs/v2/metadata" - "github.com/formancehq/go-libs/v2/migrations" - "github.com/formancehq/go-libs/v2/time" - ledger "github.com/formancehq/ledger/internal" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" - "github.com/uptrace/bun" -) - -type DefaultStoreAdapter struct { - newStore *ledgerstore.Store - legacyStore *Store - isFullUpToDate bool -} - -// todo; handle compat with v1 -func (d *DefaultStoreAdapter) Accounts() ledgercontroller.PaginatedResource[ledger.Account, any, ledgercontroller.OffsetPaginatedQuery[any]] { - return d.newStore.Accounts() -} - -func (d *DefaultStoreAdapter) Logs() ledgercontroller.PaginatedResource[ledger.Log, any, ledgercontroller.ColumnPaginatedQuery[any]] { - return d.newStore.Logs() -} - -func (d *DefaultStoreAdapter) Transactions() ledgercontroller.PaginatedResource[ledger.Transaction, any, ledgercontroller.ColumnPaginatedQuery[any]] { - return d.newStore.Transactions() -} - -func (d *DefaultStoreAdapter) AggregatedBalances() ledgercontroller.Resource[ledger.AggregatedVolumes, ledgercontroller.GetAggregatedVolumesOptions] { - return d.newStore.AggregatedVolumes() -} - -func (d *DefaultStoreAdapter) Volumes() ledgercontroller.PaginatedResource[ledger.VolumesWithBalanceByAssetByAccount, ledgercontroller.GetVolumesOptions, ledgercontroller.OffsetPaginatedQuery[ledgercontroller.GetVolumesOptions]] { - return d.newStore.Volumes() -} - -func (d *DefaultStoreAdapter) GetDB() bun.IDB { - return d.newStore.GetDB() -} - -func (d *DefaultStoreAdapter) GetBalances(ctx context.Context, query ledgercontroller.BalanceQuery) (ledgercontroller.Balances, error) { - return d.newStore.GetBalances(ctx, query) -} - -func (d *DefaultStoreAdapter) CommitTransaction(ctx context.Context, transaction *ledger.Transaction) error { - return d.newStore.CommitTransaction(ctx, transaction) -} - -func (d *DefaultStoreAdapter) RevertTransaction(ctx context.Context, id int, at time.Time) (*ledger.Transaction, bool, error) { - return d.newStore.RevertTransaction(ctx, id, at) -} - -func (d *DefaultStoreAdapter) UpdateTransactionMetadata(ctx context.Context, transactionID int, m metadata.Metadata) (*ledger.Transaction, bool, error) { - return d.newStore.UpdateTransactionMetadata(ctx, transactionID, m) -} - -func (d *DefaultStoreAdapter) DeleteTransactionMetadata(ctx context.Context, transactionID int, key string) (*ledger.Transaction, bool, error) { - return d.newStore.DeleteTransactionMetadata(ctx, transactionID, key) -} - -func (d *DefaultStoreAdapter) UpdateAccountsMetadata(ctx context.Context, m map[string]metadata.Metadata) error { - return d.newStore.UpdateAccountsMetadata(ctx, m) -} - -func (d *DefaultStoreAdapter) UpsertAccounts(ctx context.Context, accounts ...*ledger.Account) error { - return d.newStore.UpsertAccounts(ctx, accounts...) -} - -func (d *DefaultStoreAdapter) DeleteAccountMetadata(ctx context.Context, address, key string) error { - return d.newStore.DeleteAccountMetadata(ctx, address, key) -} - -func (d *DefaultStoreAdapter) InsertLog(ctx context.Context, log *ledger.Log) error { - return d.newStore.InsertLog(ctx, log) -} - -func (d *DefaultStoreAdapter) LockLedger(ctx context.Context) error { - return d.newStore.LockLedger(ctx) -} - -func (d *DefaultStoreAdapter) ReadLogWithIdempotencyKey(ctx context.Context, ik string) (*ledger.Log, error) { - return d.newStore.ReadLogWithIdempotencyKey(ctx, ik) -} - -func (d *DefaultStoreAdapter) IsUpToDate(ctx context.Context) (bool, error) { - return d.newStore.HasMinimalVersion(ctx) -} - -func (d *DefaultStoreAdapter) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error) { - return d.newStore.GetMigrationsInfo(ctx) -} - -func (d *DefaultStoreAdapter) BeginTX(ctx context.Context, opts *sql.TxOptions) (ledgercontroller.Store, error) { - store, err := d.newStore.BeginTX(ctx, opts) - if err != nil { - return nil, err - } - - legacyStore := d.legacyStore.WithDB(store.GetDB()) - - return &DefaultStoreAdapter{ - newStore: store, - legacyStore: legacyStore, - }, nil -} - -func (d *DefaultStoreAdapter) Commit() error { - return d.newStore.Commit() -} - -func (d *DefaultStoreAdapter) Rollback() error { - return d.newStore.Rollback() -} - -func NewDefaultStoreAdapter(isFullUpToDate bool, store *ledgerstore.Store) *DefaultStoreAdapter { - return &DefaultStoreAdapter{ - isFullUpToDate: isFullUpToDate, - newStore: store, - legacyStore: New(store.GetDB(), store.GetLedger().Bucket, store.GetLedger().Name), - } -} - -var _ ledgercontroller.Store = (*DefaultStoreAdapter)(nil) diff --git a/internal/storage/ledger/legacy/balances.go b/internal/storage/ledger/legacy/balances.go deleted file mode 100644 index 7a3b02cbd..000000000 --- a/internal/storage/ledger/legacy/balances.go +++ /dev/null @@ -1,146 +0,0 @@ -package legacy - -import ( - "context" - "errors" - "fmt" - "github.com/formancehq/go-libs/v2/platform/postgres" - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/uptrace/bun" -) - -func (store *Store) GetAggregatedBalances(ctx context.Context, q GetAggregatedBalanceQuery) (ledger.BalancesByAssets, error) { - - var ( - needMetadata bool - subQuery string - args []any - err error - ) - if q.QueryBuilder != nil { - subQuery, args, err = q.QueryBuilder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { - switch { - case key == "address": - if operator != "$match" { - return "", nil, newErrInvalidQuery("'address' column can only be used with $match") - } - - switch address := value.(type) { - case string: - return filterAccountAddress(address, "accounts_address"), nil, nil - default: - return "", nil, newErrInvalidQuery("unexpected type %T for column 'address'", address) - } - case metadataRegex.Match([]byte(key)): - if operator != "$match" { - return "", nil, newErrInvalidQuery("'metadata' column can only be used with $match") - } - match := metadataRegex.FindAllStringSubmatch(key, 3) - needMetadata = true - key := "accounts.metadata" - if q.PIT != nil { - key = "am.metadata" - } - - return key + " @> ?", []any{map[string]any{ - match[0][1]: value, - }}, nil - - case key == "metadata": - if operator != "$exists" { - return "", nil, newErrInvalidQuery("'metadata' key filter can only be used with $exists") - } - needMetadata = true - key := "accounts.metadata" - if q.PIT != nil && !q.PIT.IsZero() { - key = "am.metadata" - } - - return fmt.Sprintf("%s -> ? IS NOT NULL", key), []any{value}, nil - default: - return "", nil, newErrInvalidQuery("unknown key '%s' when building query", key) - } - })) - if err != nil { - return nil, err - } - } - - type Temp struct { - Aggregated ledger.VolumesByAssets `bun:"aggregated,type:jsonb"` - } - ret, err := fetch[*Temp](store, false, ctx, - func(selectQuery *bun.SelectQuery) *bun.SelectQuery { - pitColumn := "effective_date" - if q.UseInsertionDate { - pitColumn = "insertion_date" - } - moves := store.db. - NewSelect(). - ModelTableExpr(store.GetPrefixedRelationName("moves")). - DistinctOn("moves.accounts_address, moves.asset"). - Where("moves.ledger = ?", store.name). - Apply(filterPIT(q.PIT, pitColumn)) - - if q.UseInsertionDate { - moves = moves. - ColumnExpr("accounts_address"). - ColumnExpr("asset"). - ColumnExpr("first_value(moves.post_commit_volumes) over (partition by moves.accounts_address, moves.asset order by seq desc) as post_commit_volumes") - } else { - moves = moves. - ColumnExpr("accounts_address"). - ColumnExpr("asset"). - ColumnExpr("first_value(moves.post_commit_effective_volumes) over (partition by moves.accounts_address, moves.asset order by effective_date desc, seq desc) as post_commit_effective_volumes") - } - - if needMetadata { - if q.PIT != nil { - moves = moves.Join(`join lateral ( - select metadata - from `+store.GetPrefixedRelationName("accounts_metadata")+` am - where am.accounts_seq = moves.accounts_seq and (? is null or date <= ?) - order by revision desc - limit 1 - ) am on true`, q.PIT, q.PIT) - } else { - moves = moves.Join(`join lateral ( - select metadata - from ` + store.GetPrefixedRelationName("accounts") + ` a - where a.seq = moves.accounts_seq - ) accounts on true`) - } - } - if subQuery != "" { - moves = moves.Where(subQuery, args...) - } - - volumesColumn := "post_commit_effective_volumes" - if q.UseInsertionDate { - volumesColumn = "post_commit_volumes" - } - - finalQuery := selectQuery. - With("moves", moves). - With( - "data", - selectQuery.NewSelect(). - TableExpr("moves"). - ColumnExpr(fmt.Sprintf(store.GetPrefixedRelationName("volumes_to_jsonb")+`((moves.asset, (sum((moves.%s).inputs), sum((moves.%s).outputs))::%s)) as aggregated`, volumesColumn, volumesColumn, store.GetPrefixedRelationName("volumes"))). - Group("moves.asset"), - ). - TableExpr("data"). - ColumnExpr("aggregate_objects(data.aggregated) as aggregated") - - return finalQuery - }) - if err != nil && !errors.Is(err, postgres.ErrNotFound) { - return nil, err - } - if errors.Is(err, postgres.ErrNotFound) { - return ledger.BalancesByAssets{}, nil - } - - return ret.Aggregated.Balances(), nil -} diff --git a/internal/storage/ledger/legacy/balances_test.go b/internal/storage/ledger/legacy/balances_test.go deleted file mode 100644 index e2adaac9d..000000000 --- a/internal/storage/ledger/legacy/balances_test.go +++ /dev/null @@ -1,175 +0,0 @@ -//go:build it - -package legacy_test - -import ( - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger/legacy" - "github.com/google/go-cmp/cmp" - "math/big" - "testing" - - "github.com/formancehq/go-libs/v2/time" - - "github.com/formancehq/go-libs/v2/logging" - "github.com/formancehq/go-libs/v2/pointer" - - "github.com/formancehq/go-libs/v2/metadata" - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/stretchr/testify/require" -) - -func TestGetBalancesAggregated(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - now := time.Now() - ctx := logging.TestingContext() - - bigInt, _ := big.NewInt(0).SetString("1000", 10) - smallInt := big.NewInt(100) - - tx1 := ledger.NewTransaction(). - WithPostings( - ledger.NewPosting("world", "users:1", "USD", bigInt), - ledger.NewPosting("world", "users:2", "USD", smallInt), - ). - WithTimestamp(now). - WithInsertedAt(now) - err := store.newStore.CommitTransaction(ctx, &tx1) - require.NoError(t, err) - - tx2 := ledger.NewTransaction(). - WithPostings( - ledger.NewPosting("world", "users:1", "USD", bigInt), - ledger.NewPosting("world", "users:2", "USD", smallInt), - ledger.NewPosting("world", "xxx", "EUR", smallInt), - ). - WithTimestamp(now.Add(-time.Minute)). - WithInsertedAt(now.Add(time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx2) - require.NoError(t, err) - - require.NoError(t, store.newStore.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{ - "users:1": { - "category": "premium", - }, - "users:2": { - "category": "premium", - }, - })) - - require.NoError(t, store.newStore.DeleteAccountMetadata(ctx, "users:2", "category")) - - require.NoError(t, store.newStore.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{ - "users:1": { - "category": "premium", - }, - "users:2": { - "category": "2", - }, - "world": { - "world": "bar", - }, - })) - - t.Run("aggregate on all", func(t *testing.T) { - t.Parallel() - cursor, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.PITFilter{}, nil, false)) - require.NoError(t, err) - RequireEqual(t, ledger.BalancesByAssets{ - "USD": big.NewInt(0), - "EUR": big.NewInt(0), - }, cursor) - }) - t.Run("filter on address", func(t *testing.T) { - t.Parallel() - ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.PITFilter{}, - query.Match("address", "users:"), false)) - require.NoError(t, err) - require.Equal(t, ledger.BalancesByAssets{ - "USD": big.NewInt(0).Add( - big.NewInt(0).Mul(bigInt, big.NewInt(2)), - big.NewInt(0).Mul(smallInt, big.NewInt(2)), - ), - }, ret) - }) - t.Run("using pit on effective date", func(t *testing.T) { - t.Parallel() - ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.PITFilter{ - PIT: pointer.For(now.Add(-time.Second)), - }, query.Match("address", "users:"), false)) - require.NoError(t, err) - require.Equal(t, ledger.BalancesByAssets{ - "USD": big.NewInt(0).Add( - bigInt, - smallInt, - ), - }, ret) - }) - t.Run("using pit on insertion date", func(t *testing.T) { - t.Parallel() - ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.PITFilter{ - PIT: pointer.For(now), - }, query.Match("address", "users:"), true)) - require.NoError(t, err) - require.Equal(t, ledger.BalancesByAssets{ - "USD": big.NewInt(0).Add( - bigInt, - smallInt, - ), - }, ret) - }) - t.Run("using a metadata and pit", func(t *testing.T) { - t.Parallel() - - ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.PITFilter{ - PIT: pointer.For(now.Add(time.Minute)), - }, query.Match("metadata[category]", "premium"), false)) - require.NoError(t, err) - require.Equal(t, ledger.BalancesByAssets{ - "USD": big.NewInt(0).Add( - big.NewInt(0).Mul(bigInt, big.NewInt(2)), - big.NewInt(0), - ), - }, ret) - }) - t.Run("using a metadata without pit", func(t *testing.T) { - t.Parallel() - ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.PITFilter{}, - query.Match("metadata[category]", "premium"), false)) - require.NoError(t, err) - require.Equal(t, ledger.BalancesByAssets{ - "USD": big.NewInt(0).Mul(bigInt, big.NewInt(2)), - }, ret) - }) - t.Run("when no matching", func(t *testing.T) { - t.Parallel() - ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.PITFilter{}, - query.Match("metadata[category]", "guest"), false)) - require.NoError(t, err) - require.Equal(t, ledger.BalancesByAssets{}, ret) - }) - - t.Run("using a filter exist on metadata", func(t *testing.T) { - t.Parallel() - ret, err := store.GetAggregatedBalances(ctx, ledgerstore.NewGetAggregatedBalancesQuery(ledgerstore.PITFilter{}, query.Exists("metadata", "category"), false)) - require.NoError(t, err) - require.Equal(t, ledger.BalancesByAssets{ - "USD": big.NewInt(0).Add( - big.NewInt(0).Mul(bigInt, big.NewInt(2)), - big.NewInt(0).Mul(smallInt, big.NewInt(2)), - ), - }, ret) - }) -} - -func RequireEqual(t *testing.T, expected, actual any) { - t.Helper() - if diff := cmp.Diff(expected, actual, cmp.Comparer(bigIntComparer)); diff != "" { - require.Failf(t, "Content not matching", diff) - } -} - -func bigIntComparer(v1 *big.Int, v2 *big.Int) bool { - return v1.String() == v2.String() -} diff --git a/internal/storage/ledger/legacy/debug.go b/internal/storage/ledger/legacy/debug.go deleted file mode 100644 index 64141226f..000000000 --- a/internal/storage/ledger/legacy/debug.go +++ /dev/null @@ -1,42 +0,0 @@ -package legacy - -import ( - "context" - "database/sql" - "fmt" - "github.com/shomali11/xsql" - "github.com/uptrace/bun" -) - -//nolint:unused -func (s *Store) DumpTables(ctx context.Context, tables ...string) { - for _, table := range tables { - s.DumpQuery( - ctx, - s.db.NewSelect(). - ModelTableExpr(s.GetPrefixedRelationName(table)), - ) - } -} - -//nolint:unused -func (s *Store) DumpQuery(ctx context.Context, query *bun.SelectQuery) { - fmt.Println(query) - rows, err := query.Rows(ctx) - if err != nil { - panic(err) - } - s.DumpRows(rows) -} - -//nolint:unused -func (s *Store) DumpRows(rows *sql.Rows) { - data, err := xsql.Pretty(rows) - if err != nil { - panic(err) - } - fmt.Println(data) - if err := rows.Close(); err != nil { - panic(err) - } -} diff --git a/internal/storage/ledger/legacy/errors.go b/internal/storage/ledger/legacy/errors.go deleted file mode 100644 index 41754951e..000000000 --- a/internal/storage/ledger/legacy/errors.go +++ /dev/null @@ -1,30 +0,0 @@ -package legacy - -import ( - "fmt" - - "github.com/pkg/errors" -) - -type errInvalidQuery struct { - msg string -} - -func (e *errInvalidQuery) Error() string { - return e.msg -} - -func (e *errInvalidQuery) Is(err error) bool { - _, ok := err.(*errInvalidQuery) - return ok -} - -func newErrInvalidQuery(msg string, args ...any) *errInvalidQuery { - return &errInvalidQuery{ - msg: fmt.Sprintf(msg, args...), - } -} - -func IsErrInvalidQuery(err error) bool { - return errors.Is(err, &errInvalidQuery{}) -} diff --git a/internal/storage/ledger/legacy/logs.go b/internal/storage/ledger/legacy/logs.go deleted file mode 100644 index a022aa789..000000000 --- a/internal/storage/ledger/legacy/logs.go +++ /dev/null @@ -1,50 +0,0 @@ -package legacy - -import ( - "context" - "fmt" - "github.com/formancehq/go-libs/v2/bun/bunpaginate" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" - - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/uptrace/bun" -) - -func (store *Store) logsQueryBuilder(q ledgercontroller.PaginatedQueryOptions[any]) func(*bun.SelectQuery) *bun.SelectQuery { - return func(selectQuery *bun.SelectQuery) *bun.SelectQuery { - - selectQuery = selectQuery.Where("ledger = ?", store.name).ModelTableExpr(store.GetPrefixedRelationName("logs")) - if q.QueryBuilder != nil { - subQuery, args, err := q.QueryBuilder.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { - switch { - case key == "date": - return fmt.Sprintf("%s %s ?", key, query.DefaultComparisonOperatorsMapping[operator]), []any{value}, nil - default: - return "", nil, fmt.Errorf("unknown key '%s' when building query", key) - } - })) - if err != nil { - panic(err) - } - selectQuery = selectQuery.Where(subQuery, args...) - } - - return selectQuery - } -} - -func (store *Store) GetLogs(ctx context.Context, q GetLogsQuery) (*bunpaginate.Cursor[ledger.Log], error) { - logs, err := paginateWithColumn[ledgercontroller.PaginatedQueryOptions[any], ledgerstore.Log](store, ctx, - (*bunpaginate.ColumnPaginatedQuery[ledgercontroller.PaginatedQueryOptions[any]])(&q), - store.logsQueryBuilder(q.Options), - ) - if err != nil { - return nil, err - } - - return bunpaginate.MapCursor(logs, func(from ledgerstore.Log) ledger.Log { - return from.ToCore() - }), nil -} diff --git a/internal/storage/ledger/legacy/logs_test.go b/internal/storage/ledger/legacy/logs_test.go deleted file mode 100644 index 6e6e298f9..000000000 --- a/internal/storage/ledger/legacy/logs_test.go +++ /dev/null @@ -1,62 +0,0 @@ -//go:build it - -package legacy_test - -import ( - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger/legacy" - "testing" - - "github.com/formancehq/go-libs/v2/bun/bunpaginate" - "github.com/formancehq/go-libs/v2/time" - - "github.com/formancehq/go-libs/v2/logging" - - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/stretchr/testify/require" -) - -func TestLogsList(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - now := time.Now() - ctx := logging.TestingContext() - - for i := 1; i <= 3; i++ { - newLog := ledger.NewLog(ledger.CreatedTransaction{ - Transaction: ledger.NewTransaction(), - AccountMetadata: ledger.AccountMetadata{}, - }) - newLog.Date = now.Add(-time.Duration(i) * time.Hour) - - err := store.newStore.InsertLog(ctx, &newLog) - require.NoError(t, err) - } - - cursor, err := store.GetLogs(ctx, ledgerstore.NewListLogsQuery(ledgercontroller.NewPaginatedQueryOptions[any](nil))) - require.NoError(t, err) - require.Equal(t, bunpaginate.QueryDefaultPageSize, cursor.PageSize) - - require.Equal(t, 3, len(cursor.Data)) - require.EqualValues(t, 3, cursor.Data[0].ID) - - cursor, err = store.GetLogs(ctx, ledgerstore.NewListLogsQuery(ledgercontroller.NewPaginatedQueryOptions[any](nil).WithPageSize(1))) - require.NoError(t, err) - // Should get only the first log. - require.Equal(t, 1, cursor.PageSize) - require.EqualValues(t, 3, cursor.Data[0].ID) - - cursor, err = store.GetLogs(ctx, ledgerstore.NewListLogsQuery(ledgercontroller.NewPaginatedQueryOptions[any](nil). - WithQueryBuilder(query.And( - query.Gte("date", now.Add(-2*time.Hour)), - query.Lt("date", now.Add(-time.Hour)), - )). - WithPageSize(10), - )) - require.NoError(t, err) - require.Equal(t, 10, cursor.PageSize) - // Should get only the second log, as StartTime is inclusive and EndTime exclusive. - require.Len(t, cursor.Data, 1) - require.EqualValues(t, 2, cursor.Data[0].ID) -} diff --git a/internal/storage/ledger/legacy/main_test.go b/internal/storage/ledger/legacy/main_test.go deleted file mode 100644 index 392ea3750..000000000 --- a/internal/storage/ledger/legacy/main_test.go +++ /dev/null @@ -1,79 +0,0 @@ -//go:build it - -package legacy_test - -import ( - "github.com/formancehq/go-libs/v2/bun/bundebug" - "github.com/formancehq/go-libs/v2/testing/docker" - "github.com/formancehq/go-libs/v2/testing/utils" - "github.com/formancehq/ledger/internal/storage/bucket" - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger" - "github.com/formancehq/ledger/internal/storage/ledger/legacy" - systemstore "github.com/formancehq/ledger/internal/storage/system" - "go.opentelemetry.io/otel/trace/noop" - "os" - "testing" - - "github.com/formancehq/go-libs/v2/bun/bunconnect" - - "github.com/uptrace/bun" - - "github.com/formancehq/go-libs/v2/logging" - "github.com/formancehq/go-libs/v2/testing/platform/pgtesting" - ledger "github.com/formancehq/ledger/internal" - "github.com/google/uuid" - "github.com/stretchr/testify/require" -) - -var ( - srv *pgtesting.PostgresServer -) - -func TestMain(m *testing.M) { - utils.WithTestMain(func(t *utils.TestingTForMain) int { - srv = pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing())) - - return m.Run() - }) -} - -type T interface { - require.TestingT - Helper() - Cleanup(func()) -} - -type testStore struct { - *legacy.Store - newStore *ledgerstore.Store -} - -func newLedgerStore(t T) *testStore { - t.Helper() - - ledgerName := uuid.NewString()[:8] - ctx := logging.TestingContext() - - pgDatabase := srv.NewDatabase(t) - - hooks := make([]bun.QueryHook, 0) - if os.Getenv("DEBUG") == "true" { - hooks = append(hooks, bundebug.NewQueryHook()) - } - - db, err := bunconnect.OpenSQLDB(ctx, pgDatabase.ConnectionOptions(), hooks...) - require.NoError(t, err) - - require.NoError(t, systemstore.Migrate(ctx, db)) - - l := ledger.MustNewWithDefault(ledgerName) - - b := bucket.NewDefault(db, noop.Tracer{}, ledger.DefaultBucket) - require.NoError(t, b.Migrate(ctx)) - require.NoError(t, b.AddLedger(ctx, l)) - - return &testStore{ - Store: legacy.New(db, ledger.DefaultBucket, l.Name), - newStore: ledgerstore.New(db, b, l), - } -} diff --git a/internal/storage/ledger/legacy/queries.go b/internal/storage/ledger/legacy/queries.go deleted file mode 100644 index ade76c7aa..000000000 --- a/internal/storage/ledger/legacy/queries.go +++ /dev/null @@ -1,159 +0,0 @@ -package legacy - -import ( - "github.com/formancehq/go-libs/v2/bun/bunpaginate" - "github.com/formancehq/go-libs/v2/pointer" - "github.com/formancehq/go-libs/v2/query" - "github.com/formancehq/go-libs/v2/time" - "github.com/formancehq/ledger/internal/controller/ledger" -) - -type PITFilter struct { - PIT *time.Time `json:"pit"` - OOT *time.Time `json:"oot"` -} - -type PITFilterWithVolumes struct { - PITFilter - ExpandVolumes bool `json:"volumes"` - ExpandEffectiveVolumes bool `json:"effectiveVolumes"` -} - -type FiltersForVolumes struct { - PITFilter - UseInsertionDate bool - GroupLvl int -} - -func NewGetVolumesWithBalancesQuery(opts ledger.PaginatedQueryOptions[FiltersForVolumes]) GetVolumesWithBalancesQuery { - return GetVolumesWithBalancesQuery{ - PageSize: opts.PageSize, - Order: bunpaginate.OrderAsc, - Options: opts, - } -} - -type ListTransactionsQuery bunpaginate.ColumnPaginatedQuery[ledger.PaginatedQueryOptions[PITFilterWithVolumes]] - -func (q ListTransactionsQuery) WithColumn(column string) ListTransactionsQuery { - ret := pointer.For((bunpaginate.ColumnPaginatedQuery[ledger.PaginatedQueryOptions[PITFilterWithVolumes]])(q)) - ret = ret.WithColumn(column) - - return ListTransactionsQuery(*ret) -} - -func NewListTransactionsQuery(options ledger.PaginatedQueryOptions[PITFilterWithVolumes]) ListTransactionsQuery { - return ListTransactionsQuery{ - PageSize: options.PageSize, - Column: "id", - Order: bunpaginate.OrderDesc, - Options: options, - } -} - -type GetTransactionQuery struct { - PITFilterWithVolumes - ID int -} - -func (q GetTransactionQuery) WithExpandVolumes() GetTransactionQuery { - q.ExpandVolumes = true - - return q -} - -func (q GetTransactionQuery) WithExpandEffectiveVolumes() GetTransactionQuery { - q.ExpandEffectiveVolumes = true - - return q -} - -func NewGetTransactionQuery(id int) GetTransactionQuery { - return GetTransactionQuery{ - PITFilterWithVolumes: PITFilterWithVolumes{}, - ID: id, - } -} - -type ListAccountsQuery bunpaginate.OffsetPaginatedQuery[ledger.PaginatedQueryOptions[PITFilterWithVolumes]] - -func (q ListAccountsQuery) WithExpandVolumes() ListAccountsQuery { - q.Options.Options.ExpandVolumes = true - - return q -} - -func (q ListAccountsQuery) WithExpandEffectiveVolumes() ListAccountsQuery { - q.Options.Options.ExpandEffectiveVolumes = true - - return q -} - -func NewListAccountsQuery(opts ledger.PaginatedQueryOptions[PITFilterWithVolumes]) ListAccountsQuery { - return ListAccountsQuery{ - PageSize: opts.PageSize, - Order: bunpaginate.OrderAsc, - Options: opts, - } -} - -type GetAccountQuery struct { - PITFilterWithVolumes - Addr string -} - -func (q GetAccountQuery) WithPIT(pit time.Time) GetAccountQuery { - q.PIT = &pit - - return q -} - -func (q GetAccountQuery) WithExpandVolumes() GetAccountQuery { - q.ExpandVolumes = true - - return q -} - -func (q GetAccountQuery) WithExpandEffectiveVolumes() GetAccountQuery { - q.ExpandEffectiveVolumes = true - - return q -} - -func NewGetAccountQuery(addr string) GetAccountQuery { - return GetAccountQuery{ - Addr: addr, - } -} - -type GetAggregatedBalanceQuery struct { - PITFilter - QueryBuilder query.Builder - UseInsertionDate bool -} - -func NewGetAggregatedBalancesQuery(filter PITFilter, qb query.Builder, useInsertionDate bool) GetAggregatedBalanceQuery { - return GetAggregatedBalanceQuery{ - PITFilter: filter, - QueryBuilder: qb, - UseInsertionDate: useInsertionDate, - } -} - -type GetVolumesWithBalancesQuery bunpaginate.OffsetPaginatedQuery[ledger.PaginatedQueryOptions[FiltersForVolumes]] - -type GetLogsQuery bunpaginate.ColumnPaginatedQuery[ledger.PaginatedQueryOptions[any]] - -func (q GetLogsQuery) WithOrder(order bunpaginate.Order) GetLogsQuery { - q.Order = order - return q -} - -func NewListLogsQuery(options ledger.PaginatedQueryOptions[any]) GetLogsQuery { - return GetLogsQuery{ - PageSize: options.PageSize, - Column: "id", - Order: bunpaginate.OrderDesc, - Options: options, - } -} diff --git a/internal/storage/ledger/legacy/store.go b/internal/storage/ledger/legacy/store.go deleted file mode 100644 index d3ae4cb62..000000000 --- a/internal/storage/ledger/legacy/store.go +++ /dev/null @@ -1,43 +0,0 @@ -package legacy - -import ( - "fmt" - _ "github.com/jackc/pgx/v5/stdlib" - "github.com/uptrace/bun" -) - -type Store struct { - db bun.IDB - - bucket string - name string -} - -func (store *Store) GetPrefixedRelationName(v string) string { - return fmt.Sprintf(`"%s".%s`, store.bucket, v) -} - -func (store *Store) Name() string { - return store.name -} - -func (store *Store) GetDB() bun.IDB { - return store.db -} - -func (store Store) WithDB(db bun.IDB) *Store { - store.db = db - return &store -} - -func New( - db bun.IDB, - bucket string, - name string, -) *Store { - return &Store{ - db: db, - bucket: bucket, - name: name, - } -} diff --git a/internal/storage/ledger/legacy/transactions.go b/internal/storage/ledger/legacy/transactions.go deleted file mode 100644 index b9bd8399a..000000000 --- a/internal/storage/ledger/legacy/transactions.go +++ /dev/null @@ -1,206 +0,0 @@ -package legacy - -import ( - "context" - "errors" - "fmt" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - "regexp" - - "github.com/formancehq/go-libs/v2/time" - - "github.com/formancehq/go-libs/v2/bun/bunpaginate" - - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/uptrace/bun" -) - -var ( - metadataRegex = regexp.MustCompile(`metadata\[(.+)]`) -) - -func (store *Store) buildTransactionQuery(p PITFilterWithVolumes, query *bun.SelectQuery) *bun.SelectQuery { - - selectMetadata := query.NewSelect(). - ModelTableExpr(store.GetPrefixedRelationName("transactions_metadata")). - Where("transactions.seq = transactions_metadata.transactions_seq"). - Order("revision desc"). - Limit(1) - - if p.PIT != nil && !p.PIT.IsZero() { - selectMetadata = selectMetadata.Where("date <= ?", p.PIT) - } - - query = query. - ModelTableExpr(store.GetPrefixedRelationName("transactions")). - Where("transactions.ledger = ?", store.name) - - if p.PIT != nil && !p.PIT.IsZero() { - query = query. - Where("timestamp <= ?", p.PIT). - Column("id", "inserted_at", "timestamp", "postings"). - Column("transactions_metadata.metadata"). - Join(fmt.Sprintf(`left join lateral (%s) as transactions_metadata on true`, selectMetadata.String())). - ColumnExpr(fmt.Sprintf("case when reverted_at is not null and reverted_at > '%s' then null else reverted_at end", p.PIT.Format(time.DateFormat))) - } else { - query = query.Column( - "transactions.metadata", - "transactions.id", - "transactions.inserted_at", - "transactions.timestamp", - "transactions.postings", - "transactions.reverted_at", - "transactions.reference", - ) - } - - if p.ExpandEffectiveVolumes { - query = query.ColumnExpr(store.GetPrefixedRelationName("get_aggregated_effective_volumes_for_transaction")+"(?, transactions.seq) as post_commit_effective_volumes", store.name) - } - if p.ExpandVolumes { - query = query.ColumnExpr(store.GetPrefixedRelationName("get_aggregated_volumes_for_transaction")+"(?, transactions.seq) as post_commit_volumes", store.name) - } - return query -} - -func (store *Store) transactionQueryContext(qb query.Builder, q ListTransactionsQuery) (string, []any, error) { - - return qb.Build(query.ContextFn(func(key, operator string, value any) (string, []any, error) { - switch { - case key == "reference" || key == "timestamp": - return fmt.Sprintf("%s %s ?", key, query.DefaultComparisonOperatorsMapping[operator]), []any{value}, nil - case key == "reverted": - if operator != "$match" { - return "", nil, newErrInvalidQuery("'reverted' column can only be used with $match") - } - switch value := value.(type) { - case bool: - ret := "reverted_at is" - if value { - ret += " not" - } - return ret + " null", nil, nil - default: - return "", nil, newErrInvalidQuery("'reverted' can only be used with bool value") - } - case key == "account": - if operator != "$match" { - return "", nil, newErrInvalidQuery("'account' column can only be used with $match") - } - switch address := value.(type) { - case string: - return filterAccountAddressOnTransactions(address, true, true), nil, nil - default: - return "", nil, newErrInvalidQuery("unexpected type %T for column 'account'", address) - } - case key == "source": - if operator != "$match" { - return "", nil, errors.New("'source' column can only be used with $match") - } - switch address := value.(type) { - case string: - return filterAccountAddressOnTransactions(address, true, false), nil, nil - default: - return "", nil, newErrInvalidQuery("unexpected type %T for column 'source'", address) - } - case key == "destination": - if operator != "$match" { - return "", nil, errors.New("'destination' column can only be used with $match") - } - switch address := value.(type) { - case string: - return filterAccountAddressOnTransactions(address, false, true), nil, nil - default: - return "", nil, newErrInvalidQuery("unexpected type %T for column 'destination'", address) - } - case metadataRegex.Match([]byte(key)): - if operator != "$match" { - return "", nil, newErrInvalidQuery("'account' column can only be used with $match") - } - match := metadataRegex.FindAllStringSubmatch(key, 3) - - key := "metadata" - if q.Options.Options.PIT != nil && !q.Options.Options.PIT.IsZero() { - key = "transactions_metadata.metadata" - } - - return key + " @> ?", []any{map[string]any{ - match[0][1]: value, - }}, nil - - case key == "metadata": - if operator != "$exists" { - return "", nil, newErrInvalidQuery("'metadata' key filter can only be used with $exists") - } - if q.Options.Options.PIT != nil && !q.Options.Options.PIT.IsZero() { - key = "transactions_metadata.metadata" - } - - return fmt.Sprintf("%s -> ? IS NOT NULL", key), []any{value}, nil - default: - return "", nil, newErrInvalidQuery("unknown key '%s' when building query", key) - } - })) -} - -func (store *Store) buildTransactionListQuery(selectQuery *bun.SelectQuery, q ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes], where string, args []any) *bun.SelectQuery { - - selectQuery = store.buildTransactionQuery(q.Options, selectQuery) - if where != "" { - return selectQuery.Where(where, args...) - } - - return selectQuery -} - -func (store *Store) GetTransactions(ctx context.Context, q ListTransactionsQuery) (*bunpaginate.Cursor[ledger.Transaction], error) { - - var ( - where string - args []any - err error - ) - if q.Options.QueryBuilder != nil { - where, args, err = store.transactionQueryContext(q.Options.QueryBuilder, q) - if err != nil { - return nil, err - } - } - - return paginateWithColumn[ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes], ledger.Transaction](store, ctx, - (*bunpaginate.ColumnPaginatedQuery[ledgercontroller.PaginatedQueryOptions[PITFilterWithVolumes]])(&q), - func(query *bun.SelectQuery) *bun.SelectQuery { - return store.buildTransactionListQuery(query, q.Options, where, args) - }, - ) -} - -func (store *Store) CountTransactions(ctx context.Context, q ListTransactionsQuery) (int, error) { - - var ( - where string - args []any - err error - ) - - if q.Options.QueryBuilder != nil { - where, args, err = store.transactionQueryContext(q.Options.QueryBuilder, q) - if err != nil { - return 0, err - } - } - - return count[ledger.Transaction](store, true, ctx, func(query *bun.SelectQuery) *bun.SelectQuery { - return store.buildTransactionListQuery(query, q.Options, where, args) - }) -} - -func (store *Store) GetTransactionWithVolumes(ctx context.Context, filter GetTransactionQuery) (*ledger.Transaction, error) { - return fetch[*ledger.Transaction](store, true, ctx, - func(query *bun.SelectQuery) *bun.SelectQuery { - return store.buildTransactionQuery(filter.PITFilterWithVolumes, query). - Where("transactions.id = ?", filter.ID). - Limit(1) - }) -} diff --git a/internal/storage/ledger/legacy/transactions_test.go b/internal/storage/ledger/legacy/transactions_test.go deleted file mode 100644 index 23b778f5a..000000000 --- a/internal/storage/ledger/legacy/transactions_test.go +++ /dev/null @@ -1,281 +0,0 @@ -//go:build it - -package legacy_test - -import ( - "context" - "fmt" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger/legacy" - "math/big" - "testing" - - "github.com/formancehq/go-libs/v2/time" - - "github.com/pkg/errors" - - "github.com/formancehq/go-libs/v2/logging" - "github.com/formancehq/go-libs/v2/pointer" - - "github.com/formancehq/go-libs/v2/metadata" - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/stretchr/testify/require" -) - -func TestGetTransactionWithVolumes(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - now := time.Now() - ctx := logging.TestingContext() - - tx1 := ledger.NewTransaction(). - WithPostings( - ledger.NewPosting("world", "central_bank", "USD", big.NewInt(100)), - ). - WithReference("tx1"). - WithTimestamp(now.Add(-3 * time.Hour)) - err := store.newStore.CommitTransaction(ctx, &tx1) - require.NoError(t, err) - - tx2 := ledger.NewTransaction(). - WithPostings( - ledger.NewPosting("world", "central_bank", "USD", big.NewInt(100)), - ). - WithReference("tx2"). - WithTimestamp(now.Add(-2 * time.Hour)) - err = store.newStore.CommitTransaction(ctx, &tx2) - require.NoError(t, err) - - tx, err := store.GetTransactionWithVolumes(ctx, ledgerstore.NewGetTransactionQuery(tx1.ID). - WithExpandVolumes(). - WithExpandEffectiveVolumes()) - require.NoError(t, err) - require.Equal(t, tx1.Postings, tx.Postings) - require.Equal(t, tx1.Reference, tx.Reference) - require.Equal(t, tx1.Timestamp, tx.Timestamp) - RequireEqual(t, ledger.PostCommitVolumes{ - "world": { - "USD": { - Input: big.NewInt(0), - Output: big.NewInt(100), - }, - }, - "central_bank": { - "USD": { - Input: big.NewInt(100), - Output: big.NewInt(0), - }, - }, - }, tx.PostCommitVolumes) - - tx, err = store.GetTransactionWithVolumes(ctx, ledgerstore.NewGetTransactionQuery(tx2.ID). - WithExpandVolumes(). - WithExpandEffectiveVolumes()) - require.NoError(t, err) - require.Equal(t, tx2.Postings, tx.Postings) - require.Equal(t, tx2.Reference, tx.Reference) - require.Equal(t, tx2.Timestamp, tx.Timestamp) - RequireEqual(t, ledger.PostCommitVolumes{ - "world": { - "USD": { - Input: big.NewInt(0), - Output: big.NewInt(200), - }, - }, - "central_bank": { - "USD": { - Input: big.NewInt(200), - Output: big.NewInt(0), - }, - }, - }, tx.PostCommitVolumes) -} - -func TestCountTransactions(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - ctx := logging.TestingContext() - - for i := 0; i < 3; i++ { - tx := ledger.NewTransaction().WithPostings( - ledger.NewPosting("world", fmt.Sprintf("account%d", i), "USD", big.NewInt(100)), - ) - err := store.newStore.CommitTransaction(ctx, &tx) - require.NoError(t, err) - } - - count, err := store.CountTransactions(context.Background(), ledgerstore.NewListTransactionsQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}))) - require.NoError(t, err, "counting transactions should not fail") - require.Equal(t, 3, count, "count should be equal") -} - -func TestGetTransactions(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - now := time.Now() - ctx := logging.TestingContext() - - tx1 := ledger.NewTransaction(). - WithPostings( - ledger.NewPosting("world", "alice", "USD", big.NewInt(100)), - ). - WithMetadata(metadata.Metadata{"category": "1"}). - WithTimestamp(now.Add(-3 * time.Hour)) - err := store.newStore.CommitTransaction(ctx, &tx1) - require.NoError(t, err) - - tx2 := ledger.NewTransaction(). - WithPostings( - ledger.NewPosting("world", "bob", "USD", big.NewInt(100)), - ). - WithMetadata(metadata.Metadata{"category": "2"}). - WithTimestamp(now.Add(-2 * time.Hour)) - err = store.newStore.CommitTransaction(ctx, &tx2) - require.NoError(t, err) - - tx3BeforeRevert := ledger.NewTransaction(). - WithPostings( - ledger.NewPosting("world", "users:marley", "USD", big.NewInt(100)), - ). - WithMetadata(metadata.Metadata{"category": "3"}). - WithTimestamp(now.Add(-time.Hour)) - err = store.newStore.CommitTransaction(ctx, &tx3BeforeRevert) - require.NoError(t, err) - - _, hasBeenReverted, err := store.newStore.RevertTransaction(ctx, tx3BeforeRevert.ID, time.Time{}) - require.NoError(t, err) - require.True(t, hasBeenReverted) - - tx4 := tx3BeforeRevert.Reverse().WithTimestamp(now) - err = store.newStore.CommitTransaction(ctx, &tx4) - require.NoError(t, err) - - _, _, err = store.newStore.UpdateTransactionMetadata(ctx, tx3BeforeRevert.ID, metadata.Metadata{ - "additional_metadata": "true", - }) - require.NoError(t, err) - - // refresh tx3 - // we can't take the result of the call on RevertTransaction nor UpdateTransactionMetadata as the result does not contains pc(e)v - tx3 := func() ledger.Transaction { - tx3, err := store.Store.GetTransactionWithVolumes(ctx, ledgerstore.NewGetTransactionQuery(tx3BeforeRevert.ID). - WithExpandVolumes(). - WithExpandEffectiveVolumes()) - require.NoError(t, err) - return *tx3 - }() - - tx5 := ledger.NewTransaction(). - WithPostings( - ledger.NewPosting("users:marley", "sellers:amazon", "USD", big.NewInt(100)), - ). - WithTimestamp(now) - err = store.newStore.CommitTransaction(ctx, &tx5) - require.NoError(t, err) - - type testCase struct { - name string - query ledgercontroller.PaginatedQueryOptions[ledgerstore.PITFilterWithVolumes] - expected []ledger.Transaction - expectError error - } - testCases := []testCase{ - { - name: "nominal", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}), - expected: []ledger.Transaction{tx5, tx4, tx3, tx2, tx1}, - }, - { - name: "address filter", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Match("account", "bob")), - expected: []ledger.Transaction{tx2}, - }, - { - name: "address filter using segments matching two addresses by individual segments", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Match("account", "users:amazon")), - expected: []ledger.Transaction{}, - }, - { - name: "address filter using segment", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Match("account", "users:")), - expected: []ledger.Transaction{tx5, tx4, tx3}, - }, - { - name: "filter using metadata", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Match("metadata[category]", "2")), - expected: []ledger.Transaction{tx2}, - }, - { - name: "using point in time", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{ - PITFilter: ledgerstore.PITFilter{ - PIT: pointer.For(now.Add(-time.Hour)), - }, - }), - expected: []ledger.Transaction{tx3BeforeRevert, tx2, tx1}, - }, - { - name: "reverted transactions", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Match("reverted", true)), - expected: []ledger.Transaction{tx3}, - }, - { - name: "filter using exists metadata", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Exists("metadata", "category")), - expected: []ledger.Transaction{tx3, tx2, tx1}, - }, - { - name: "filter using metadata and pit", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{ - PITFilter: ledgerstore.PITFilter{ - PIT: pointer.For(tx3.Timestamp), - }, - }). - WithQueryBuilder(query.Match("metadata[category]", "2")), - expected: []ledger.Transaction{tx2}, - }, - { - name: "filter using not exists metadata", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Not(query.Exists("metadata", "category"))), - expected: []ledger.Transaction{tx5, tx4}, - }, - { - name: "filter using timestamp", - query: ledgercontroller.NewPaginatedQueryOptions(ledgerstore.PITFilterWithVolumes{}). - WithQueryBuilder(query.Match("timestamp", tx5.Timestamp.Format(time.RFC3339Nano))), - expected: []ledger.Transaction{tx5, tx4}, - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - tc.query.Options.ExpandVolumes = true - tc.query.Options.ExpandEffectiveVolumes = true - - cursor, err := store.GetTransactions(ctx, ledgerstore.NewListTransactionsQuery(tc.query)) - if tc.expectError != nil { - require.True(t, errors.Is(err, tc.expectError)) - } else { - require.NoError(t, err) - require.Len(t, cursor.Data, len(tc.expected)) - RequireEqual(t, tc.expected, cursor.Data) - - count, err := store.CountTransactions(ctx, ledgerstore.NewListTransactionsQuery(tc.query)) - require.NoError(t, err) - - require.EqualValues(t, len(tc.expected), count) - } - }) - } -} diff --git a/internal/storage/ledger/legacy/utils.go b/internal/storage/ledger/legacy/utils.go deleted file mode 100644 index 97c6b51c6..000000000 --- a/internal/storage/ledger/legacy/utils.go +++ /dev/null @@ -1,185 +0,0 @@ -package legacy - -import ( - "context" - "encoding/json" - "fmt" - "github.com/formancehq/go-libs/v2/platform/postgres" - "reflect" - "strings" - - "github.com/formancehq/go-libs/v2/time" - - "github.com/formancehq/go-libs/v2/bun/bunpaginate" - - "github.com/uptrace/bun" -) - -func fetch[T any](s *Store, addModel bool, ctx context.Context, builders ...func(query *bun.SelectQuery) *bun.SelectQuery) (T, error) { - - var ret T - ret = reflect.New(reflect.TypeOf(ret).Elem()).Interface().(T) - - query := s.db.NewSelect() - - if addModel { - query = query.Model(ret) - } - - for _, builder := range builders { - query = query.Apply(builder) - } - - if err := query.Scan(ctx, ret); err != nil { - return ret, postgres.ResolveError(err) - } - - return ret, nil -} - -func paginateWithOffset[FILTERS any, RETURN any](s *Store, ctx context.Context, - q *bunpaginate.OffsetPaginatedQuery[FILTERS], builders ...func(query *bun.SelectQuery) *bun.SelectQuery) (*bunpaginate.Cursor[RETURN], error) { - - query := s.db.NewSelect() - for _, builder := range builders { - query = query.Apply(builder) - } - return bunpaginate.UsingOffset[FILTERS, RETURN](ctx, query, *q) -} - -func paginateWithOffsetWithoutModel[FILTERS any, RETURN any](s *Store, ctx context.Context, - q *bunpaginate.OffsetPaginatedQuery[FILTERS], builders ...func(query *bun.SelectQuery) *bun.SelectQuery) (*bunpaginate.Cursor[RETURN], error) { - - query := s.db.NewSelect() - for _, builder := range builders { - query = query.Apply(builder) - } - - return bunpaginate.UsingOffset[FILTERS, RETURN](ctx, query, *q) -} - -func paginateWithColumn[FILTERS any, RETURN any](s *Store, ctx context.Context, q *bunpaginate.ColumnPaginatedQuery[FILTERS], builders ...func(query *bun.SelectQuery) *bun.SelectQuery) (*bunpaginate.Cursor[RETURN], error) { - query := s.db.NewSelect() - for _, builder := range builders { - query = query.Apply(builder) - } - - ret, err := bunpaginate.UsingColumn[FILTERS, RETURN](ctx, query, *q) - if err != nil { - return nil, postgres.ResolveError(err) - } - - return ret, nil -} - -func count[T any](s *Store, addModel bool, ctx context.Context, builders ...func(query *bun.SelectQuery) *bun.SelectQuery) (int, error) { - query := s.db.NewSelect() - if addModel { - query = query.Model((*T)(nil)) - } - for _, builder := range builders { - query = query.Apply(builder) - } - return s.db.NewSelect(). - TableExpr("(" + query.String() + ") data"). - Count(ctx) -} - -func filterAccountAddress(address, key string) string { - parts := make([]string, 0) - src := strings.Split(address, ":") - - needSegmentCheck := false - for _, segment := range src { - needSegmentCheck = segment == "" - if needSegmentCheck { - break - } - } - - if needSegmentCheck { - parts = append(parts, fmt.Sprintf("jsonb_array_length(%s_array) = %d", key, len(src))) - - for i, segment := range src { - if len(segment) == 0 { - continue - } - parts = append(parts, fmt.Sprintf("%s_array @@ ('$[%d] == \"%s\"')::jsonpath", key, i, segment)) - } - } else { - parts = append(parts, fmt.Sprintf("%s = '%s'", key, address)) - } - - return strings.Join(parts, " and ") -} - -func filterAccountAddressOnTransactions(address string, source, destination bool) string { - src := strings.Split(address, ":") - - needSegmentCheck := false - for _, segment := range src { - needSegmentCheck = segment == "" - if needSegmentCheck { - break - } - } - - if needSegmentCheck { - m := map[string]any{ - fmt.Sprint(len(src)): nil, - } - parts := make([]string, 0) - - for i, segment := range src { - if len(segment) == 0 { - continue - } - m[fmt.Sprint(i)] = segment - } - - data, err := json.Marshal([]any{m}) - if err != nil { - panic(err) - } - - if source { - parts = append(parts, fmt.Sprintf("sources_arrays @> '%s'", string(data))) - } - if destination { - parts = append(parts, fmt.Sprintf("destinations_arrays @> '%s'", string(data))) - } - return strings.Join(parts, " or ") - } else { - data, err := json.Marshal([]string{address}) - if err != nil { - panic(err) - } - - parts := make([]string, 0) - if source { - parts = append(parts, fmt.Sprintf("sources @> '%s'", string(data))) - } - if destination { - parts = append(parts, fmt.Sprintf("destinations @> '%s'", string(data))) - } - return strings.Join(parts, " or ") - } -} - -func filterPIT(pit *time.Time, column string) func(query *bun.SelectQuery) *bun.SelectQuery { - return func(query *bun.SelectQuery) *bun.SelectQuery { - if pit == nil || pit.IsZero() { - return query - } - return query.Where(fmt.Sprintf("%s <= ?", column), pit) - } -} - -func filterOOT(oot *time.Time, column string) func(query *bun.SelectQuery) *bun.SelectQuery { - return func(query *bun.SelectQuery) *bun.SelectQuery { - if oot == nil || oot.IsZero() { - return query - } - return query.Where(fmt.Sprintf("%s >= ?", column), oot) - } -} diff --git a/internal/storage/ledger/legacy/volumes.go b/internal/storage/ledger/legacy/volumes.go deleted file mode 100644 index c427227c6..000000000 --- a/internal/storage/ledger/legacy/volumes.go +++ /dev/null @@ -1,188 +0,0 @@ -package legacy - -import ( - "context" - "fmt" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - "regexp" - - "github.com/formancehq/go-libs/v2/bun/bunpaginate" - lquery "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/uptrace/bun" -) - -func (store *Store) volumesQueryContext(q GetVolumesWithBalancesQuery) (string, []any, bool, error) { - - metadataRegex := regexp.MustCompile(`metadata\[(.+)]`) - balanceRegex := regexp.MustCompile(`balance\[(.*)]`) - var ( - subQuery string - args []any - err error - ) - - var useMetadata = false - - if q.Options.QueryBuilder != nil { - subQuery, args, err = q.Options.QueryBuilder.Build(lquery.ContextFn(func(key, operator string, value any) (string, []any, error) { - - convertOperatorToSQL := func() string { - switch operator { - case "$match": - return "=" - case "$lt": - return "<" - case "$gt": - return ">" - case "$lte": - return "<=" - case "$gte": - return ">=" - } - panic("unreachable") - } - - switch { - case key == "account" || key == "address": - if operator != "$match" { - return "", nil, newErrInvalidQuery("'%s' column can only be used with $match", key) - } - - switch address := value.(type) { - case string: - return filterAccountAddress(address, "accounts_address"), nil, nil - default: - return "", nil, newErrInvalidQuery("unexpected type %T for column 'address'", address) - } - case metadataRegex.Match([]byte(key)): - if operator != "$match" { - return "", nil, newErrInvalidQuery("'metadata' column can only be used with $match") - } - useMetadata = true - match := metadataRegex.FindAllStringSubmatch(key, 3) - key := "metadata" - - return key + " @> ?", []any{map[string]any{ - match[0][1]: value, - }}, nil - case key == "metadata": - if operator != "$exists" { - return "", nil, newErrInvalidQuery("'metadata' key filter can only be used with $exists") - } - useMetadata = true - key := "metadata" - - return fmt.Sprintf("%s -> ? IS NOT NULL", key), []any{value}, nil - case balanceRegex.Match([]byte(key)): - match := balanceRegex.FindAllStringSubmatch(key, 2) - return fmt.Sprintf(`balance %s ? and asset = ?`, convertOperatorToSQL()), []any{value, match[0][1]}, nil - default: - return "", nil, newErrInvalidQuery("unknown key '%s' when building query", key) - } - })) - if err != nil { - return "", nil, false, err - } - } - - return subQuery, args, useMetadata, nil - -} - -func (store *Store) buildVolumesWithBalancesQuery(query *bun.SelectQuery, q GetVolumesWithBalancesQuery, where string, args []any, useMetadata bool) *bun.SelectQuery { - - filtersForVolumes := q.Options.Options - dateFilterColumn := "effective_date" - - if filtersForVolumes.UseInsertionDate { - dateFilterColumn = "insertion_date" - } - - selectAccounts := store.GetDB().NewSelect(). - Column("accounts_address_array"). - Column("accounts_address"). - Column("accounts_seq"). - Column("asset"). - Column("ledger"). - ColumnExpr("sum(case when not is_source then amount else 0 end) as input"). - ColumnExpr("sum(case when is_source then amount else 0 end) as output"). - ColumnExpr("sum(case when not is_source then amount else -amount end) as balance"). - ModelTableExpr(store.GetPrefixedRelationName("moves")). - Group("ledger", "accounts_seq", "accounts_address", "accounts_address_array", "asset"). - Apply(filterPIT(filtersForVolumes.PIT, dateFilterColumn)). - Apply(filterOOT(filtersForVolumes.OOT, dateFilterColumn)) - - query = query. - TableExpr("(?) accountsWithVolumes", selectAccounts). - Column( - "accounts_address", - "accounts_address_array", - "accounts_seq", - "ledger", - "asset", - "input", - "output", - "balance", - ) - - if useMetadata { - query = query. - ColumnExpr("accounts_metadata.metadata as metadata"). - Join(`join lateral ( - select metadata - from ` + store.GetPrefixedRelationName("accounts") + ` a - where a.seq = accountsWithVolumes.accounts_seq - ) accounts_metadata on true`, - ) - } - - query = query. - Where("ledger = ?", store.name) - - globalQuery := query.NewSelect() - globalQuery = globalQuery. - With("query", query). - ModelTableExpr("query") - - if where != "" { - globalQuery.Where(where, args...) - } - - if filtersForVolumes.GroupLvl > 0 { - globalQuery = globalQuery. - ColumnExpr(fmt.Sprintf(`(array_to_string((string_to_array(accounts_address, ':'))[1:LEAST(array_length(string_to_array(accounts_address, ':'),1),%d)],':')) as account`, filtersForVolumes.GroupLvl)). - ColumnExpr("asset"). - ColumnExpr("sum(input) as input"). - ColumnExpr("sum(output) as output"). - ColumnExpr("sum(balance) as balance"). - GroupExpr("account, asset") - } else { - globalQuery = globalQuery.ColumnExpr("accounts_address as account, asset, input, output, balance") - } - globalQuery = globalQuery.Order("account", "asset") - - return globalQuery -} - -func (store *Store) GetVolumesWithBalances(ctx context.Context, q GetVolumesWithBalancesQuery) (*bunpaginate.Cursor[ledger.VolumesWithBalanceByAssetByAccount], error) { - var ( - where string - args []any - err error - useMetadata bool - ) - if q.Options.QueryBuilder != nil { - where, args, useMetadata, err = store.volumesQueryContext(q) - if err != nil { - return nil, err - } - } - - return paginateWithOffsetWithoutModel[ledgercontroller.PaginatedQueryOptions[FiltersForVolumes], ledger.VolumesWithBalanceByAssetByAccount]( - store, ctx, (*bunpaginate.OffsetPaginatedQuery[ledgercontroller.PaginatedQueryOptions[FiltersForVolumes]])(&q), - func(query *bun.SelectQuery) *bun.SelectQuery { - return store.buildVolumesWithBalancesQuery(query, q, where, args, useMetadata) - }, - ) -} diff --git a/internal/storage/ledger/legacy/volumes_test.go b/internal/storage/ledger/legacy/volumes_test.go deleted file mode 100644 index 4f5553167..000000000 --- a/internal/storage/ledger/legacy/volumes_test.go +++ /dev/null @@ -1,675 +0,0 @@ -//go:build it - -package legacy_test - -import ( - "github.com/formancehq/go-libs/v2/pointer" - ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger" - ledgerstore "github.com/formancehq/ledger/internal/storage/ledger/legacy" - "math/big" - "testing" - - "github.com/formancehq/go-libs/v2/time" - - "github.com/formancehq/go-libs/v2/logging" - - "github.com/formancehq/go-libs/v2/metadata" - "github.com/formancehq/go-libs/v2/query" - ledger "github.com/formancehq/ledger/internal" - "github.com/stretchr/testify/require" -) - -func TestVolumesList(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - now := time.Now() - ctx := logging.TestingContext() - - previousPIT := now.Add(-2 * time.Minute) - futurPIT := now.Add(2 * time.Minute) - - previousOOT := now.Add(-2 * time.Minute) - futurOOT := now.Add(2 * time.Minute) - - require.NoError(t, store.newStore.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{ - "account:1": { - "category": "1", - }, - "account:2": { - "category": "2", - }, - "world": { - "foo": "bar", - }, - })) - - tx1 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(100))). - WithTimestamp(now.Add(-4 * time.Minute)). - WithInsertedAt(now.Add(4 * time.Minute)) - err := store.newStore.CommitTransaction(ctx, &tx1) - require.NoError(t, err) - - tx2 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(100))). - WithTimestamp(now.Add(-3 * time.Minute)). - WithInsertedAt(now.Add(3 * time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx2) - require.NoError(t, err) - - tx3 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("account:1", "bank", "USD", big.NewInt(50))). - WithTimestamp(now.Add(-2 * time.Minute)). - WithInsertedAt(now.Add(2 * time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx3) - require.NoError(t, err) - - tx4 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1", "USD", big.NewInt(0))). - WithTimestamp(now.Add(-time.Minute)). - WithInsertedAt(now.Add(time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx4) - require.NoError(t, err) - - tx5 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:2", "USD", big.NewInt(50))). - WithTimestamp(now). - WithInsertedAt(now) - err = store.newStore.CommitTransaction(ctx, &tx5) - require.NoError(t, err) - - tx6 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:2", "USD", big.NewInt(50))). - WithTimestamp(now.Add(1 * time.Minute)). - WithInsertedAt(now.Add(-time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx6) - require.NoError(t, err) - - tx7 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("account:2", "bank", "USD", big.NewInt(50))). - WithTimestamp(now.Add(2 * time.Minute)). - WithInsertedAt(now.Add(-2 * time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx7) - require.NoError(t, err) - - tx8 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:2", "USD", big.NewInt(25))). - WithTimestamp(now.Add(3 * time.Minute)). - WithInsertedAt(now.Add(-3 * time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx8) - require.NoError(t, err) - - t.Run("Get all volumes with balance for insertion date", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.FiltersForVolumes{UseInsertionDate: true}))) - require.NoError(t, err) - - require.Len(t, volumes.Data, 4) - }) - - t.Run("Get all volumes with balance for effective date", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions(ledgerstore.FiltersForVolumes{UseInsertionDate: false}))) - require.NoError(t, err) - - require.Len(t, volumes.Data, 4) - }) - - t.Run("Get all volumes with balance for insertion date with previous pit", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &previousPIT, OOT: nil}, - UseInsertionDate: true, - }))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 3) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:2", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(25), - Output: big.NewInt(50), - Balance: big.NewInt(-25), - }, - }, volumes.Data[0]) - }) - - t.Run("Get all volumes with balance for insertion date with futur pit", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &futurPIT, OOT: nil}, - UseInsertionDate: true, - }))) - require.NoError(t, err) - - require.Len(t, volumes.Data, 4) - }) - - t.Run("Get all volumes with balance for insertion date with previous oot", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: nil, OOT: &previousOOT}, - UseInsertionDate: true, - }))) - require.NoError(t, err) - - require.Len(t, volumes.Data, 4) - }) - - t.Run("Get all volumes with balance for insertion date with future oot", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: nil, OOT: &futurOOT}, - UseInsertionDate: true, - }))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 3) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:1", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(200), - Output: big.NewInt(50), - Balance: big.NewInt(150), - }, - }, volumes.Data[0]) - }) - - t.Run("Get all volumes with balance for effective date with previous pit", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &previousPIT, OOT: nil}, - UseInsertionDate: false, - }))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 3) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:1", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(200), - Output: big.NewInt(50), - Balance: big.NewInt(150), - }, - }, volumes.Data[0]) - }) - - t.Run("Get all volumes with balance for effective date with futur pit", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &futurPIT, OOT: nil}, - UseInsertionDate: false, - }))) - require.NoError(t, err) - - require.Len(t, volumes.Data, 4) - }) - - t.Run("Get all volumes with balance for effective date with previous oot", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: nil, OOT: &previousOOT}, - UseInsertionDate: false, - }))) - require.NoError(t, err) - - require.Len(t, volumes.Data, 4) - }) - - t.Run("Get all volumes with balance for effective date with futur oot", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: nil, OOT: &futurOOT}, - UseInsertionDate: false, - }))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 3) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:2", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(25), - Output: big.NewInt(50), - Balance: big.NewInt(-25), - }, - }, volumes.Data[0]) - }) - - t.Run("Get all volumes with balance for insertion date with future PIT and now OOT", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &futurPIT, OOT: &now}, - UseInsertionDate: true, - }))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 4) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:1", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(0), - Output: big.NewInt(50), - Balance: big.NewInt(-50), - }, - }, volumes.Data[0]) - - }) - - t.Run("Get all volumes with balance for insertion date with previous OOT and now PIT", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &now, OOT: &previousOOT}, - UseInsertionDate: true, - }))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 3) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:2", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(100), - Output: big.NewInt(50), - Balance: big.NewInt(50), - }, - }, volumes.Data[0]) - - }) - - t.Run("Get all volumes with balance for effective date with future PIT and now OOT", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &futurPIT, OOT: &now}, - UseInsertionDate: false, - }))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 3) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:2", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(100), - Output: big.NewInt(50), - Balance: big.NewInt(50), - }, - }, volumes.Data[0]) - }) - - t.Run("Get all volumes with balance for insertion date with previous OOT and now PIT", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery(ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &now, OOT: &previousOOT}, - UseInsertionDate: false, - }))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 4) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:1", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(0), - Output: big.NewInt(50), - Balance: big.NewInt(-50), - }, - }, volumes.Data[0]) - - }) - - t.Run("Get account1 volume and Balance for insertion date with previous OOT and now PIT", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, - ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{PIT: &now, OOT: &previousOOT}, - UseInsertionDate: false, - }).WithQueryBuilder(query.Match("account", "account:1"))), - ) - - require.NoError(t, err) - require.Len(t, volumes.Data, 1) - require.Equal(t, ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:1", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(0), - Output: big.NewInt(50), - Balance: big.NewInt(-50), - }, - }, volumes.Data[0]) - - }) - - t.Run("Using Metadata regex", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, - ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{}).WithQueryBuilder(query.Match("metadata[foo]", "bar"))), - ) - - require.NoError(t, err) - require.Len(t, volumes.Data, 1) - - }) - - t.Run("Using exists metadata filter 1", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, - ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{}).WithQueryBuilder(query.Exists("metadata", "category"))), - ) - - require.NoError(t, err) - require.Len(t, volumes.Data, 2) - }) - - t.Run("Using exists metadata filter 2", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, - ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{}).WithQueryBuilder(query.Exists("metadata", "foo"))), - ) - - require.NoError(t, err) - require.Len(t, volumes.Data, 1) - }) -} - -func TestVolumesAggregate(t *testing.T) { - t.Parallel() - store := newLedgerStore(t) - now := time.Now() - ctx := logging.TestingContext() - - pit := now.Add(2 * time.Minute) - oot := now.Add(-2 * time.Minute) - - tx1 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1:2", "USD", big.NewInt(100))). - WithTimestamp(now.Add(-4 * time.Minute)). - WithInsertedAt(now) - err := store.newStore.CommitTransaction(ctx, &tx1) - require.NoError(t, err) - - tx2 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1:1", "EUR", big.NewInt(100))). - WithTimestamp(now.Add(-3 * time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx2) - require.NoError(t, err) - - tx3 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1:2", "EUR", big.NewInt(50))). - WithTimestamp(now.Add(-2 * time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx3) - require.NoError(t, err) - - tx4 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:1:3", "USD", big.NewInt(0))). - WithTimestamp(now.Add(-time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx4) - require.NoError(t, err) - - tx5 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:2:1", "USD", big.NewInt(50))). - WithTimestamp(now) - err = store.newStore.CommitTransaction(ctx, &tx5) - require.NoError(t, err) - - tx6 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:2:2", "USD", big.NewInt(50))). - WithTimestamp(now.Add(1 * time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx6) - require.NoError(t, err) - - tx7 := ledger.NewTransaction(). - WithPostings(ledger.NewPosting("world", "account:2:3", "EUR", big.NewInt(25))). - WithTimestamp(now.Add(3 * time.Minute)) - err = store.newStore.CommitTransaction(ctx, &tx7) - require.NoError(t, err) - - require.NoError(t, store.newStore.UpdateAccountsMetadata(ctx, map[string]metadata.Metadata{ - "account:1:1": { - "foo": "bar", - }, - })) - - t.Run("Aggregation Volumes with balance for GroupLvl 0", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - UseInsertionDate: true, - GroupLvl: 0, - }).WithQueryBuilder(query.Match("account", "account::")))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 7) - }) - - t.Run("Aggregation Volumes with balance for GroupLvl 1", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - UseInsertionDate: true, - GroupLvl: 1, - }).WithQueryBuilder(query.Match("account", "account::")))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 2) - }) - - t.Run("Aggregation Volumes with balance for GroupLvl 2", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - UseInsertionDate: true, - GroupLvl: 2, - }).WithQueryBuilder(query.Match("account", "account::")))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 4) - }) - - t.Run("Aggregation Volumes with balance for GroupLvl 3", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - UseInsertionDate: true, - GroupLvl: 3, - }).WithQueryBuilder(query.Match("account", "account::")))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 7) - }) - - t.Run("Aggregation Volumes with balance for GroupLvl 1 && PIT && OOT && effectiveDate", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{ - PIT: &pit, - OOT: &oot, - }, - GroupLvl: 1, - }).WithQueryBuilder(query.Match("account", "account::")))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 2) - require.Equal(t, volumes.Data[0], ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account", - Asset: "EUR", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(50), - Output: big.NewInt(0), - Balance: big.NewInt(50), - }, - }) - require.Equal(t, volumes.Data[1], ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(100), - Output: big.NewInt(0), - Balance: big.NewInt(100), - }, - }) - }) - - t.Run("Aggregation Volumes with balance for GroupLvl 1 && PIT && OOT && effectiveDate && Balance Filter 1", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{ - PIT: &pit, - OOT: &oot, - }, - UseInsertionDate: false, - GroupLvl: 1, - }).WithQueryBuilder( - query.And(query.Match("account", "account::"), query.Gte("balance[EUR]", 50))))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 1) - require.Equal(t, volumes.Data[0], ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account", - Asset: "EUR", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(50), - Output: big.NewInt(0), - Balance: big.NewInt(50), - }, - }) - }) - - t.Run("Aggregation Volumes with balance for GroupLvl 1 && Balance Filter 2", func(t *testing.T) { - t.Parallel() - volumes, err := store.GetVolumesWithBalances(ctx, ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - PITFilter: ledgerstore.PITFilter{}, - UseInsertionDate: true, - GroupLvl: 2, - }).WithQueryBuilder( - query.Or( - query.Match("account", "account:1:"), - query.Lte("balance[USD]", 0))))) - - require.NoError(t, err) - require.Len(t, volumes.Data, 3) - require.Equal(t, volumes.Data[0], ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:1", - Asset: "EUR", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(150), - Output: big.NewInt(0), - Balance: big.NewInt(150), - }, - }) - require.Equal(t, volumes.Data[1], ledger.VolumesWithBalanceByAssetByAccount{ - Account: "account:1", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(100), - Output: big.NewInt(0), - Balance: big.NewInt(100), - }, - }) - require.Equal(t, volumes.Data[2], ledger.VolumesWithBalanceByAssetByAccount{ - Account: "world", - Asset: "USD", - VolumesWithBalance: ledger.VolumesWithBalance{ - Input: big.NewInt(0), - Output: big.NewInt(200), - Balance: big.NewInt(-200), - }, - }) - }) - t.Run("filter using account matching, metadata, and group", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, - ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - GroupLvl: 1, - }).WithQueryBuilder(query.And( - query.Match("account", "account::"), - query.Match("metadata[foo]", "bar"), - ))), - ) - - require.NoError(t, err) - require.Len(t, volumes.Data, 1) - }) - - t.Run("filter using account matching, metadata, and group and PIT", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, - ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - GroupLvl: 1, - PITFilter: ledgerstore.PITFilter{ - PIT: pointer.For(now.Add(time.Minute)), - }, - }).WithQueryBuilder(query.And( - query.Match("account", "account::"), - query.Match("metadata[foo]", "bar"), - ))), - ) - - require.NoError(t, err) - require.Len(t, volumes.Data, 1) - }) - - t.Run("filter using metadata matching only", func(t *testing.T) { - t.Parallel() - - volumes, err := store.GetVolumesWithBalances(ctx, - ledgerstore.NewGetVolumesWithBalancesQuery( - ledgercontroller.NewPaginatedQueryOptions( - ledgerstore.FiltersForVolumes{ - GroupLvl: 1, - }).WithQueryBuilder(query.And( - query.Match("metadata[foo]", "bar"), - ))), - ) - - require.NoError(t, err) - require.Len(t, volumes.Data, 1) - }) -} diff --git a/internal/storage/ledger/resource_aggregated_balances.go b/internal/storage/ledger/resource_aggregated_balances.go index b5a32370c..f1cbe204f 100644 --- a/internal/storage/ledger/resource_aggregated_balances.go +++ b/internal/storage/ledger/resource_aggregated_balances.go @@ -166,7 +166,7 @@ func (h aggregatedBalancesResourceRepositoryHandler) project( return store.db.NewSelect(). TableExpr("(?) values", sumVolumesForAsset). - ColumnExpr("aggregate_objects(json_build_object(asset, volumes)::jsonb) as aggregated"), nil + ColumnExpr("public.aggregate_objects(json_build_object(asset, volumes)::jsonb) as aggregated"), nil } var _ repositoryHandler[ledgercontroller.GetAggregatedVolumesOptions] = aggregatedBalancesResourceRepositoryHandler{} diff --git a/internal/storage/system/migrations.go b/internal/storage/system/migrations.go index 0de325056..f096d202c 100644 --- a/internal/storage/system/migrations.go +++ b/internal/storage/system/migrations.go @@ -215,6 +215,18 @@ func GetMigrator(db *bun.DB, options ...migrations.Option) *migrations.Migrator }) }, }, + migrations.Migration{ + Name: "set default metadata on ledgers", + Up: func(ctx context.Context, db bun.IDB) error { + return db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error { + _, err := tx.ExecContext(ctx, ` + alter table _system.ledgers + alter column metadata set default '{}'::jsonb; + `) + return err + }) + }, + }, ) return migrator