feat: cleanup database #518

Merged 1 commit on Jan 3, 2025
2 changes: 1 addition & 1 deletion go.mod
@@ -29,7 +29,6 @@ require (
github.com/onsi/gomega v1.35.1
github.com/ory/dockertest/v3 v3.11.0
github.com/pborman/uuid v1.2.1
github.com/pkg/errors v0.9.1
github.com/shomali11/xsql v0.0.0-20190608141458-bf76292144df
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
@@ -55,6 +54,7 @@ require gopkg.in/yaml.v3 v3.0.1 // indirect
require (
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c // indirect
github.com/pkg/errors v0.9.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
)

2 changes: 1 addition & 1 deletion internal/README.md
@@ -245,7 +245,7 @@ type BalancesByAssetsByAccounts map[string]BalancesByAssets
```go
type Configuration struct {
Bucket string `json:"bucket" bun:"bucket,type:varchar(255)"`
Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb"`
Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb,nullzero"`
Features features.FeatureSet `json:"features" bun:"features,type:jsonb"`
}
```
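
A note on the new `nullzero` tag: with bun, it makes a Go zero value (here a nil `metadata.Metadata` map) render as SQL `NULL` instead of being marshalled as-is. A minimal sketch of the behavior, assuming a plain map-backed model (the table name and field types below are illustrative, not the actual ledger schema):

```go
package main

import (
	"fmt"

	"github.com/uptrace/bun"
)

// Illustrative model only. With `nullzero`, inserting a row whose Metadata
// is nil writes SQL NULL to the jsonb column instead of marshalling the nil
// map, so callers no longer need to default Metadata to an empty map first.
type Configuration struct {
	bun.BaseModel `bun:"table:ledgers"` // hypothetical table name for this sketch

	Bucket   string            `bun:"bucket,type:varchar(255)"`
	Metadata map[string]string `bun:"metadata,type:jsonb,nullzero"`
}

func main() {
	cfg := Configuration{Bucket: "default"} // Metadata left nil
	fmt.Println(cfg.Metadata == nil)        // true: stored as NULL, not "{}"
}
```

This is presumably why the explicit `l.Metadata = metadata.Metadata{}` default disappears from `Driver.CreateLedger` further down in this PR.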
2 changes: 1 addition & 1 deletion internal/ledger.go
@@ -84,7 +84,7 @@ var (

type Configuration struct {
Bucket string `json:"bucket" bun:"bucket,type:varchar(255)"`
Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb"`
Metadata metadata.Metadata `json:"metadata" bun:"metadata,type:jsonb,nullzero"`
Features features.FeatureSet `json:"features" bun:"features,type:jsonb"`
}

10 changes: 5 additions & 5 deletions internal/storage/bucket/default_bucket.go
@@ -15,11 +15,11 @@ import (
)

// stateless version (+1 compared to the directory name, as migrations start from 1 in the lib)
const MinimalSchemaVersion = 12
const MinimalSchemaVersion = 27

type DefaultBucket struct {
name string
db *bun.DB
name string
db *bun.DB
tracer trace.Tracer
}

@@ -81,8 +81,8 @@ func (b *DefaultBucket) AddLedger(ctx context.Context, l ledger.Ledger) error {

func NewDefault(db *bun.DB, tracer trace.Tracer, name string) *DefaultBucket {
return &DefaultBucket{
db: db,
name: name,
db: db,
name: name,
tracer: tracer,
}
}
@@ -0,0 +1 @@
name: Recreate accounts unique index
@@ -0,0 +1,9 @@
-- There is already a covering index on the accounts table (including the seq column).
-- As we will remove the seq column in the next migration, we have to create a new index without it
-- (PG will remove the old one automatically in the background).
-- We create the index concurrently to avoid locking the table, and since an equivalent index
-- already exists on this table, the creation should not fail.
--
-- We create this index in a dedicated migration because, as the docs mention
-- (https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-MULTI-STATEMENT),
-- multi-statement queries are automatically wrapped inside a transaction block, and it is
-- forbidden to create an index concurrently inside a transaction block.
create unique index concurrently accounts_ledger2 on "{{.Schema}}".accounts (ledger, address)
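
For context on the comment above, here is a minimal sketch of the constraint, assuming a reachable PostgreSQL instance and the pgx stdlib driver (the DSN and the second index name are illustrative):

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/jackc/pgx/v5/stdlib" // assumed driver choice for this sketch
)

func main() {
	ctx := context.Background()
	// Hypothetical DSN; any reachable PostgreSQL instance works.
	db, err := sql.Open("pgx", "postgres://localhost:5432/ledger?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// OK: the statement is sent on its own, outside any transaction block.
	_, err = db.ExecContext(ctx,
		`create unique index concurrently accounts_ledger2 on accounts (ledger, address)`)
	fmt.Println(err) // nil on success

	// Rejected: PostgreSQL fails with
	// "CREATE INDEX CONCURRENTLY cannot run inside a transaction block".
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		log.Fatal(err)
	}
	_, err = tx.ExecContext(ctx,
		`create unique index concurrently accounts_tmp_idx on accounts (address)`) // hypothetical index
	fmt.Println(err)
	_ = tx.Rollback()
}
```

The same wrapping happens server-side when several statements arrive in one simple-protocol query, which is why this statement gets a migration file of its own.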
@@ -0,0 +1 @@
name: Clean unused columns in database
124 changes: 124 additions & 0 deletions internal/storage/bucket/migrations/27-clean-database/up.sql
@@ -0,0 +1,124 @@
set search_path = '{{.Schema}}';

-- Drop all unused functions/aggregates/indexes inherited from the stateful version.
drop aggregate aggregate_objects(jsonb);
drop aggregate first(anyelement);

drop function array_distinct(anyarray);
drop function insert_posting(_transaction_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, posting jsonb, _account_metadata jsonb);
drop function upsert_account(_ledger character varying, _address character varying, _metadata jsonb, _date timestamp without time zone, _first_usage timestamp without time zone);
drop function get_latest_move_for_account_and_asset(_ledger character varying, _account_address character varying, _asset character varying, _before timestamp without time zone);
drop function update_transaction_metadata(_ledger character varying, _id numeric, _metadata jsonb, _date timestamp without time zone);
drop function delete_account_metadata(_ledger character varying, _address character varying, _key character varying, _date timestamp without time zone);
drop function delete_transaction_metadata(_ledger character varying, _id numeric, _key character varying, _date timestamp without time zone);
drop function balance_from_volumes(v volumes);
drop function get_all_account_volumes(_ledger character varying, _account character varying, _before timestamp without time zone);
drop function first_agg(anyelement, anyelement);
drop function volumes_to_jsonb(v volumes_with_asset);
drop function get_account_aggregated_effective_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone);
drop function handle_log();
drop function get_account_aggregated_volumes(_ledger character varying, _account_address character varying, _before timestamp without time zone);
drop function get_aggregated_volumes_for_transaction(_ledger character varying, tx numeric);
drop function insert_move(_transactions_seq bigint, _ledger character varying, _insertion_date timestamp without time zone, _effective_date timestamp without time zone, _account_address character varying, _asset character varying, _amount numeric, _is_source boolean, _account_exists boolean);
drop function get_all_assets(_ledger character varying);
drop function insert_transaction(_ledger character varying, data jsonb, _date timestamp without time zone, _account_metadata jsonb);
drop function get_all_account_effective_volumes(_ledger character varying, _account character varying, _before timestamp without time zone);
drop function get_account_balance(_ledger character varying, _account character varying, _asset character varying, _before timestamp without time zone);
drop function get_aggregated_effective_volumes_for_transaction(_ledger character varying, tx numeric);
drop function aggregate_ledger_volumes(_ledger character varying, _before timestamp without time zone, _accounts character varying[], _assets character varying[] );
drop function get_transaction(_ledger character varying, _id numeric, _before timestamp without time zone);
drop function revert_transaction(_ledger character varying, _id numeric, _date timestamp without time zone);

drop index transactions_sources_arrays;
drop index transactions_destinations_arrays;
drop index transactions_sources;
drop index transactions_destinations;

-- We will remove the triggers that populate these columns (set_compat_xxx) later in this file.
-- Once a trigger is removed, there is a short window where its columns are no longer filled
-- while their constraints are still checked by the database.
-- So, we drop the not null constraints before removing the triggers.
-- Once the triggers are removed, we will be able to drop the columns.
alter table moves
alter column transactions_seq drop not null,
alter column accounts_seq drop not null,
alter column accounts_address_array drop not null;

alter table transactions_metadata
alter column transactions_seq drop not null;

alter table accounts_metadata
alter column accounts_seq drop not null;

-- Now that the columns are nullable, we can drop the triggers
drop trigger set_compat_on_move on moves;
drop trigger set_compat_on_accounts_metadata on accounts_metadata;
drop trigger set_compat_on_transactions_metadata on transactions_metadata;
drop function set_compat_on_move();
drop function set_compat_on_accounts_metadata();
drop function set_compat_on_transactions_metadata();

-- Finally remove the columns
alter table moves
drop column transactions_seq,
drop column accounts_seq,
drop column accounts_address_array;

alter table transactions_metadata
drop column transactions_seq;

alter table accounts_metadata
drop column accounts_seq;

alter table transactions
drop column seq;

alter table accounts
drop column seq;

-- Rename the index created in the previous migration: dropping the seq column of the accounts table automatically dropped the old accounts_ledger index
alter index accounts_ledger2
rename to accounts_ledger;

create or replace function set_log_hash()
returns trigger
security definer
language plpgsql
as
$$
declare
previousHash bytea;
marshalledAsJSON varchar;
begin
select hash into previousHash
from logs
where ledger = new.ledger
order by id desc
limit 1;

-- Select only the fields participating in the hash on the backend, and format the JSON representation the same way
select '{' ||
'"type":"' || new.type || '",' ||
'"data":' || encode(new.memento, 'escape') || ',' ||
'"date":"' || (to_json(new.date::timestamp)#>>'{}') || 'Z",' ||
'"idempotencyKey":"' || coalesce(new.idempotency_key, '') || '",' ||
'"id":0,' ||
'"hash":null' ||
'}' into marshalledAsJSON;

new.hash = (
select public.digest(
case
when previousHash is null
then marshalledAsJSON::bytea
else '"' || encode(previousHash::bytea, 'base64')::bytea || E'"\n' || convert_to(marshalledAsJSON, 'LATIN1')::bytea
end || E'\n', 'sha256'::text
)
);

return new;
end;
$$ set search_path from current;

alter table logs
drop column seq;
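
The `set_log_hash` trigger above chains each log to its predecessor. A Go sketch of the equivalent chaining, assuming `marshalledAsJSON` is built exactly as the trigger formats it (the sample payloads below are illustrative):

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// chainHash mirrors the trigger: hash the marshalled log, prefixed by the
// base64 of the previous hash (quoted, newline-terminated) when one exists,
// with a trailing newline appended in both cases. The trigger converts the
// JSON to LATIN1 in the chained branch; for ASCII payloads the bytes match.
func chainHash(previousHash []byte, marshalledAsJSON string) []byte {
	payload := []byte(marshalledAsJSON)
	if previousHash != nil {
		prefix := `"` + base64.StdEncoding.EncodeToString(previousHash) + "\"\n"
		payload = append([]byte(prefix), payload...)
	}
	payload = append(payload, '\n')
	sum := sha256.Sum256(payload)
	return sum[:]
}

func main() {
	// Illustrative payloads, formatted like the trigger's marshalledAsJSON.
	h1 := chainHash(nil, `{"type":"NEW_TRANSACTION","data":{},"date":"2025-01-03T00:00:00Z","idempotencyKey":"","id":0,"hash":null}`)
	h2 := chainHash(h1, `{"type":"SET_METADATA","data":{},"date":"2025-01-03T00:00:01Z","idempotencyKey":"","id":0,"hash":null}`)
	fmt.Printf("%x\n%x\n", h1, h2)
}
```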
10 changes: 2 additions & 8 deletions internal/storage/driver/adapters.go
@@ -2,8 +2,7 @@ package driver

import (
"context"
"fmt"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger/legacy"
ledgerstore "github.com/formancehq/ledger/internal/storage/ledger"

ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
@@ -20,12 +19,7 @@ func (d *DefaultStorageDriverAdapter) OpenLedger(ctx context.Context, name string
return nil, nil, err
}

isUpToDate, err := store.GetBucket().IsUpToDate(ctx)
if err != nil {
return nil, nil, fmt.Errorf("checking if bucket is up to date: %w", err)
}

return ledgerstore.NewDefaultStoreAdapter(isUpToDate, store), l, nil
return ledgerstore.NewDefaultStoreAdapter(store), l, nil
}

func (d *DefaultStorageDriverAdapter) CreateLedger(ctx context.Context, l *ledger.Ledger) error {
4 changes: 0 additions & 4 deletions internal/storage/driver/driver.go
@@ -39,10 +39,6 @@ type Driver struct {

func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) {

if l.Metadata == nil {
l.Metadata = metadata.Metadata{}
}

b := d.bucketFactory.Create(l.Bucket)
isInitialized, err := b.IsInitialized(ctx)
if err != nil {
51 changes: 51 additions & 0 deletions internal/storage/ledger/adapters.go
@@ -0,0 +1,51 @@
package ledger

import (
"context"
"database/sql"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
)

type TX struct {
*Store
}

type DefaultStoreAdapter struct {
*Store
}

func (d *DefaultStoreAdapter) IsUpToDate(ctx context.Context) (bool, error) {
return d.HasMinimalVersion(ctx)
}

func (d *DefaultStoreAdapter) BeginTX(ctx context.Context, opts *sql.TxOptions) (ledgercontroller.Store, error) {
store, err := d.Store.BeginTX(ctx, opts)
if err != nil {
return nil, err
}

return &DefaultStoreAdapter{
Store: store,
}, nil
}

func (d *DefaultStoreAdapter) Commit() error {
return d.Store.Commit()
}

func (d *DefaultStoreAdapter) Rollback() error {
return d.Store.Rollback()
}

func (d *DefaultStoreAdapter) AggregatedBalances() ledgercontroller.Resource[ledger.AggregatedVolumes, ledgercontroller.GetAggregatedVolumesOptions] {
return d.AggregatedVolumes()
}

func NewDefaultStoreAdapter(store *Store) *DefaultStoreAdapter {
return &DefaultStoreAdapter{
Store: store,
}
}

var _ ledgercontroller.Store = (*DefaultStoreAdapter)(nil)
67 changes: 15 additions & 52 deletions internal/storage/ledger/balances.go
@@ -49,62 +49,25 @@ func (store *Store) GetBalances(ctx context.Context, query ledgercontroller.Bala
}
}

// Try to insert volumes using last move (to keep compat with previous version) or 0 values.
// This way, if the account has a 0 balance at this point, it will be locked as any other accounts.
// If the complete sql transaction fails, the account volumes will not be inserted.
selectMoves := store.db.NewSelect().
ModelTableExpr(store.GetPrefixedRelationName("moves")).
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset").
ColumnExpr("first_value(post_commit_volumes) over (partition by accounts_address, asset order by seq desc) as post_commit_volumes").
ColumnExpr("first_value(ledger) over (partition by accounts_address, asset order by seq desc) as ledger").
Where("("+strings.Join(conditions, ") OR (")+")", args...)

zeroValuesAndMoves := store.db.NewSelect().
TableExpr("(?) data", selectMoves).
Column("ledger", "accounts_address", "asset").
ColumnExpr("(post_commit_volumes).inputs as input").
ColumnExpr("(post_commit_volumes).outputs as output").
UnionAll(
store.db.NewSelect().
TableExpr(
"(?) data",
store.db.NewSelect().NewValues(&accountsVolumes),
).
Column("*"),
)

zeroValueOrMoves := store.db.NewSelect().
TableExpr("(?) data", zeroValuesAndMoves).
Column("ledger", "accounts_address", "asset", "input", "output").
DistinctOn("ledger, accounts_address, asset")

insertDefaultValue := store.db.NewInsert().
TableExpr(store.GetPrefixedRelationName("accounts_volumes")).
TableExpr("(" + zeroValueOrMoves.String() + ") data").
On("conflict (ledger, accounts_address, asset) do nothing").
Returning("ledger, accounts_address, asset, input, output")

selectExistingValues := store.db.NewSelect().
err := store.db.NewSelect().
With(
"ins",
// Try to insert volumes with 0 values.
// This way, if the account has a 0 balance at this point, it will be locked like any other account.
// If the complete sql transaction fails, the account volumes will not be inserted.
store.db.NewInsert().
Model(&accountsVolumes).
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
On("conflict do nothing"),
).
Model(&accountsVolumes).
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
Column("ledger", "accounts_address", "asset", "input", "output").
Column("accounts_address", "asset", "input", "output").
Where("("+strings.Join(conditions, ") OR (")+")", args...).
For("update").
// notes(gfyrag): Keep the order, it ensures a consistent locking order and limits deadlocks
Order("accounts_address", "asset")

finalQuery := store.db.NewSelect().
With("inserted", insertDefaultValue).
With("existing", selectExistingValues).
ModelTableExpr(
"(?) accounts_volumes",
store.db.NewSelect().
ModelTableExpr("inserted").
UnionAll(store.db.NewSelect().ModelTableExpr("existing")),
).
Model(&accountsVolumes)

err := finalQuery.Scan(ctx)
Order("accounts_address", "asset").
Scan(ctx)
if err != nil {
return nil, postgres.ResolveError(err)
}
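
The rewritten query boils down to a single statement: a data-modifying CTE seeds missing (account, asset) rows with zero volumes, then the outer select locks every matching row in a fixed order. A single-pair sketch of the same shape using plain `database/sql` (the real code builds this with bun and handles many pairs at once):

```go
package ledgerexample

import (
	"context"
	"database/sql"
)

// Sketch only: the real query is generated by bun over the ledger schema.
const lockBalances = `
with ins as (
    -- Seed a missing (account, asset) row with zero volumes so that accounts
    -- with no balance yet are still locked like any other account.
    insert into accounts_volumes (ledger, accounts_address, asset, input, output)
    values ($1, $2, $3, 0, 0)
    on conflict do nothing
)
select accounts_address, asset, input, output
from accounts_volumes
where ledger = $1 and accounts_address = $2 and asset = $3
-- Locking rows in a consistent order across transactions limits deadlocks.
order by accounts_address, asset
for update`

func getBalance(ctx context.Context, tx *sql.Tx, ledger, account, asset string) (input, output int64, err error) {
	rows, err := tx.QueryContext(ctx, lockBalances, ledger, account, asset)
	if err != nil {
		return 0, 0, err
	}
	defer rows.Close()
	// A row inserted by the CTE is not visible to the outer select within the
	// same statement, so a freshly seeded account simply reads as zero here.
	for rows.Next() {
		var addr, ast string
		if err := rows.Scan(&addr, &ast, &input, &output); err != nil {
			return 0, 0, err
		}
	}
	return input, output, rows.Err()
}
```

If the enclosing transaction rolls back, the seeded zero rows vanish with it, which is what the comment about the failed sql transaction refers to.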