Skip to content

Commit cb2000f

Browse files
authored
fix(balances): fix balances insertion (#1734)
1 parent ab7a709 commit cb2000f

File tree

7 files changed

+167
-44
lines changed

7 files changed

+167
-44
lines changed

components/ledger/Earthfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ sources:
1919
generate:
2020
FROM core+builder-image
2121
RUN apk update && apk add openjdk11
22-
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@latest
22+
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@v0.4.0
2323
COPY (+sources/*) /src
2424
WORKDIR /src/components/ledger
2525
DO --pass-args core+GO_GENERATE

components/operator/Earthfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ generate:
5959

6060
generate-mock:
6161
FROM core+builder-image
62-
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@latest
62+
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@v0.4.0
6363
COPY (+sources/*) /src
6464
WORKDIR /src/components/operator
6565
RUN go generate -run mockgen ./...

components/payments/cmd/api/internal/storage/balances.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func (s *Storage) GetBalanceAtByCurrency(ctx context.Context, accountID models.A
117117
Where("currency = ?", currency).
118118
Where("created_at <= ?", at).
119119
Where("last_updated_at >= ?", at).
120-
Order("last_updated_at DESC").
120+
Order("created_at DESC").
121121
Limit(1).
122122
Scan(ctx)
123123
if err != nil {

components/payments/cmd/connectors/internal/storage/balance_test.go

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
var (
1515
b1T = time.Date(2023, 11, 14, 5, 1, 10, 0, time.UTC)
1616
b2T = time.Date(2023, 11, 14, 5, 1, 20, 0, time.UTC)
17+
b3T = time.Date(2023, 11, 14, 5, 1, 40, 0, time.UTC)
1718
)
1819

1920
func TestBalances(t *testing.T) {
@@ -22,6 +23,7 @@ func TestBalances(t *testing.T) {
2223
testInstallConnectors(t, store)
2324
testCreateAccounts(t, store)
2425
testCreateBalances(t, store)
26+
testUpdateBalances(t, store)
2527
testUninstallConnectors(t, store)
2628
testBalancesDeletedAfterConnectorUninstall(t, store)
2729
}
@@ -60,15 +62,71 @@ func testCreateBalances(t *testing.T, store *storage.Storage) {
6062
err = store.InsertBalances(context.Background(), []*models.Balance{b2}, true)
6163
require.NoError(t, err)
6264

63-
testGetBalance(t, store, acc1ID, []*models.Balance{b2, b1}, nil)
65+
testGetBalance(t, store, acc1ID, []*models.Balance{b2, b1})
66+
}
67+
68+
func testUpdateBalances(t *testing.T, store *storage.Storage) {
69+
b1 := &models.Balance{
70+
AccountID: acc2ID,
71+
Asset: "USD",
72+
Balance: big.NewInt(int64(338737362)),
73+
CreatedAt: time.Date(2024, 10, 8, 22, 28, 18, 893000, time.UTC),
74+
LastUpdatedAt: time.Date(2024, 10, 8, 22, 28, 18, 893000, time.UTC),
75+
}
76+
77+
err := store.InsertBalances(context.Background(), []*models.Balance{b1}, false)
78+
require.NoError(t, err)
79+
80+
testGetBalance(t, store, acc2ID, []*models.Balance{b1})
81+
82+
b2 := &models.Balance{
83+
AccountID: acc2ID,
84+
Asset: "USD",
85+
Balance: big.NewInt(int64(317070162)),
86+
CreatedAt: time.Date(2024, 10, 15, 15, 00, 01, 960000, time.UTC),
87+
LastUpdatedAt: time.Date(2024, 10, 15, 15, 00, 01, 960000, time.UTC),
88+
}
89+
90+
b1.LastUpdatedAt = b2.CreatedAt
91+
err = store.InsertBalances(context.Background(), []*models.Balance{b1, b2}, false)
92+
require.NoError(t, err)
93+
94+
testGetBalance(t, store, acc2ID, []*models.Balance{b2, b1})
95+
96+
b3 := &models.Balance{
97+
AccountID: acc2ID,
98+
Asset: "USD",
99+
Balance: big.NewInt(int64(327762162)),
100+
CreatedAt: time.Date(2024, 10, 16, 19, 36, 29, 850000, time.UTC),
101+
LastUpdatedAt: time.Date(2024, 10, 16, 19, 36, 29, 850000, time.UTC),
102+
}
103+
104+
err = store.InsertBalances(context.Background(), []*models.Balance{b1, b2, b3}, false)
105+
require.NoError(t, err)
106+
107+
b2.LastUpdatedAt = b3.CreatedAt
108+
testGetBalance(t, store, acc2ID, []*models.Balance{b3, b2, b1})
109+
110+
b4 := &models.Balance{
111+
AccountID: acc2ID,
112+
Asset: "USD",
113+
Balance: big.NewInt(int64(327762162)),
114+
CreatedAt: time.Date(2024, 10, 16, 19, 38, 06, 766000, time.UTC),
115+
LastUpdatedAt: time.Date(2024, 10, 16, 19, 38, 06, 766000, time.UTC),
116+
}
117+
118+
err = store.InsertBalances(context.Background(), []*models.Balance{b1, b2, b3, b4}, false)
119+
require.NoError(t, err)
120+
121+
b3.LastUpdatedAt = b3T
122+
testGetBalance(t, store, acc2ID, []*models.Balance{b3, b2, b1})
64123
}
65124

66125
func testGetBalance(
67126
t *testing.T,
68127
store *storage.Storage,
69128
accountID models.AccountID,
70129
expectedBalances []*models.Balance,
71-
expectedError error,
72130
) {
73131
balances, err := store.GetBalancesForAccountID(context.Background(), accountID)
74132
require.NoError(t, err)

components/payments/cmd/connectors/internal/storage/balances.go

Lines changed: 102 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,59 +2,123 @@ package storage
22

33
import (
44
"context"
5+
"database/sql"
6+
"errors"
57

68
"github.com/formancehq/payments/internal/models"
9+
"github.com/formancehq/stack/libs/go-libs/logging"
10+
"github.com/uptrace/bun"
711
)
812

913
func (s *Storage) InsertBalances(ctx context.Context, balances []*models.Balance, checkIfAccountExists bool) error {
1014
if len(balances) == 0 {
1115
return nil
1216
}
1317

14-
query := s.db.NewInsert().
15-
Model((*models.Balance)(nil)).
16-
With("cte1", s.db.NewValues(&balances)).
17-
Column(
18-
"created_at",
19-
"account_id",
20-
"balance",
21-
"currency",
22-
"last_updated_at",
23-
)
24-
if checkIfAccountExists {
25-
query = query.TableExpr(`
26-
(SELECT *
27-
FROM cte1
28-
WHERE EXISTS (SELECT 1 FROM accounts.account WHERE id = cte1.account_id)
29-
AND cte1.balance != COALESCE((SELECT balance FROM accounts.balances WHERE account_id = cte1.account_id AND last_updated_at < cte1.last_updated_at AND currency = cte1.currency ORDER BY last_updated_at DESC LIMIT 1), cte1.balance+1)
30-
) data`)
31-
} else {
32-
query = query.TableExpr(`
33-
(SELECT *
34-
FROM cte1
35-
WHERE cte1.balance != COALESCE((SELECT balance FROM accounts.balances WHERE account_id = cte1.account_id AND last_updated_at < cte1.last_updated_at AND currency = cte1.currency ORDER BY last_updated_at DESC LIMIT 1), cte1.balance+1)
36-
) data`)
18+
tx, err := s.db.BeginTx(ctx, &sql.TxOptions{})
19+
if err != nil {
20+
return err
21+
}
22+
defer func() {
23+
err := tx.Rollback()
24+
if err != nil {
25+
logging.FromContext(ctx).Error("failed to rollback transaction", err)
26+
}
27+
}()
28+
29+
for _, balance := range balances {
30+
if err := s.insertBalances(ctx, tx, balance, checkIfAccountExists); err != nil {
31+
return err
32+
}
3733
}
3834

39-
query = query.On("CONFLICT (account_id, created_at, currency) DO NOTHING")
35+
return e("failed to commit transaction", tx.Commit())
36+
}
4037

41-
_, err := query.Exec(ctx)
38+
func (s *Storage) insertBalances(ctx context.Context, tx bun.Tx, balance *models.Balance, checkIfAccountExists bool) error {
39+
var account models.Account
40+
err := tx.NewSelect().
41+
Model(&account).
42+
Where("id = ?", balance.AccountID).
43+
Scan(ctx)
4244
if err != nil {
43-
return e("failed to create balances", err)
45+
pErr := e("failed to get account", err)
46+
if !errors.Is(pErr, ErrNotFound) {
47+
return pErr
48+
}
49+
50+
// error not found here
51+
if !checkIfAccountExists {
52+
// return an error here to keep the same behavior as before when
53+
// checkIfAccountExists is false
54+
return pErr
55+
} else {
56+
// if checkIfAccountExists is true, we should ignore the balance
57+
// if the account does not exist
58+
return nil
59+
}
4460
}
4561

46-
// Always update the previous row in order to keep the balance history consistent.
47-
_, err = s.db.NewUpdate().
48-
Model((*models.Balance)(nil)).
49-
With("cte1", s.db.NewValues(&balances)).
50-
TableExpr(`
51-
(SELECT (SELECT created_at FROM accounts.balances WHERE last_updated_at < cte1.last_updated_at AND account_id = cte1.account_id AND currency = cte1.currency ORDER BY last_updated_at DESC LIMIT 1), cte1.account_id, cte1.currency, cte1.last_updated_at FROM cte1) data
52-
`).
53-
Set("last_updated_at = data.last_updated_at").
54-
Where("balance.account_id = data.account_id AND balance.currency = data.currency AND balance.created_at = data.created_at").
55-
Exec(ctx)
62+
var lastBalance models.Balance
63+
found := true
64+
err = tx.NewSelect().
65+
Model(&lastBalance).
66+
Where("account_id = ? AND currency = ?", balance.AccountID, balance.Asset).
67+
Order("created_at DESC").
68+
Limit(1).
69+
Scan(ctx)
5670
if err != nil {
57-
return e("failed to update balances", err)
71+
pErr := e("failed to get account", err)
72+
if !errors.Is(pErr, ErrNotFound) {
73+
return pErr
74+
}
75+
found = false
76+
}
77+
78+
if found && lastBalance.CreatedAt.After(balance.CreatedAt) {
79+
// Do not insert balance if the last balance is newer
80+
return nil
81+
}
82+
83+
switch {
84+
case found && lastBalance.Balance.Cmp(balance.Balance) == 0:
85+
// same balance, no need to have a new entry, just update the last one
86+
_, err = tx.NewUpdate().
87+
Model((*models.Balance)(nil)).
88+
Set("last_updated_at = ?", balance.LastUpdatedAt).
89+
Where("account_id = ? AND created_at = ? AND currency = ?", lastBalance.AccountID, lastBalance.CreatedAt, lastBalance.Asset).
90+
Exec(ctx)
91+
if err != nil {
92+
return e("failed to update balance", err)
93+
}
94+
95+
case found && lastBalance.Balance.Cmp(balance.Balance) != 0:
96+
// different balance, insert a new entry
97+
_, err = tx.NewInsert().
98+
Model(balance).
99+
Exec(ctx)
100+
if err != nil {
101+
return e("failed to insert balance", err)
102+
}
103+
104+
// and update last row last updated at to this created at
105+
_, err = tx.NewUpdate().
106+
Model(&lastBalance).
107+
Set("last_updated_at = ?", balance.CreatedAt).
108+
Where("account_id = ? AND created_at = ? AND currency = ?", lastBalance.AccountID, lastBalance.CreatedAt, lastBalance.Asset).
109+
Exec(ctx)
110+
if err != nil {
111+
return e("failed to update balance", err)
112+
}
113+
114+
case !found:
115+
// no balance found, insert a new entry
116+
_, err = tx.NewInsert().
117+
Model(balance).
118+
Exec(ctx)
119+
if err != nil {
120+
return e("failed to insert balance", err)
121+
}
58122
}
59123

60124
return nil
@@ -66,6 +130,7 @@ func (s *Storage) GetBalancesForAccountID(ctx context.Context, accountID models.
66130
err := s.db.NewSelect().
67131
Model(&balances).
68132
Where("account_id = ?", accountID).
133+
Order("created_at DESC").
69134
Scan(ctx)
70135
if err != nil {
71136
return nil, e("failed to get balances", err)

ee/agent/Earthfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ tidy:
9696

9797
generate:
9898
FROM core+builder-image
99-
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@latest
99+
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@v0.4.0
100100
COPY (+sources/*) /src
101101
WORKDIR /src/ee/agent
102102
RUN go generate -run mockgen ./...

ee/orchestration/Earthfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ sources:
2020
generate:
2121
FROM core+builder-image
2222
RUN apk update && apk add openjdk11
23-
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@latest
23+
DO --pass-args core+GO_INSTALL --package=go.uber.org/mock/mockgen@v0.4.0
2424
COPY (+sources/*) /src
2525
WORKDIR /src/ee/orchestration
2626
DO --pass-args core+GO_GENERATE

0 commit comments

Comments
 (0)