Skip to content

Commit

Permalink
fix(payments): fix generic connector retries
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas committed Jan 17, 2025
1 parent 0d756f8 commit e395cc0
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func (c *Client) ListAccounts(ctx context.Context, page, pageSize int64) ([]gene
Page(page).
PageSize(pageSize)

accounts, _, err := req.Execute()
accounts, resp, err := req.Execute()
if err != nil {
return nil, err
return nil, wrapError(err, resp)
}

return accounts, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ func (c *Client) GetBalances(ctx context.Context, accountID string) (*genericcli

req := c.apiClient.DefaultApi.GetAccountBalances(ctx, accountID)

balances, _, err := req.Execute()
balances, resp, err := req.Execute()
if err != nil {
return nil, err
return nil, wrapError(err, resp)
}

return balances, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func (c *Client) ListBeneficiaries(ctx context.Context, page, pageSize int64, cr
req = req.CreatedAtFrom(createdAtFrom)
}

beneficiaries, _, err := req.Execute()
beneficiaries, resp, err := req.Execute()
if err != nil {
return nil, err
return nil, wrapError(err, resp)
}

return beneficiaries, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package client

import (
"errors"
"fmt"
"net/http"
)

var (
ErrStatusCodeClientError = errors.New("client error")
ErrStatusCodeServerError = errors.New("server error")
)

func wrapError(err error, resp *http.Response) error {
statusCode := resp.StatusCode

if statusCode >= http.StatusBadRequest && statusCode < http.StatusInternalServerError {
return fmt.Errorf("%w: %w", err, ErrStatusCodeClientError)
}

return fmt.Errorf("%w: %w", err, ErrStatusCodeServerError)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func (c *Client) ListTransactions(ctx context.Context, page, pageSize int64, upd
req = req.UpdatedAtFrom(updatedAtFrom)
}

transactions, _, err := req.Execute()
transactions, resp, err := req.Execute()
if err != nil {
return nil, err
return nil, wrapError(err, resp)
}

return transactions, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package generic
import (
"context"
"encoding/json"
"fmt"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
"github.com/formancehq/payments/cmd/connectors/internal/connectors/generic/client"
Expand Down Expand Up @@ -62,15 +63,18 @@ func taskFetchAccounts(client *client.Client, config *Config) task.Task {
func ingestAccounts(
ctx context.Context,
connectorID models.ConnectorID,
client *client.Client,
c *client.Client,
ingester ingestion.Ingester,
scheduler task.Scheduler,
) error {

balancesTasks := make([]models.TaskDescriptor, 0)
for page := 1; ; page++ {
accounts, err := client.ListAccounts(ctx, int64(page), pageSize)
accounts, err := c.ListAccounts(ctx, int64(page), pageSize)
if err != nil {
if errors.Is(err, client.ErrStatusCodeServerError) {
return fmt.Errorf("%w: %w", task.ErrRetryable, err)
}
return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package generic

import (
"context"
"errors"
"fmt"
"math/big"

Expand All @@ -16,7 +17,7 @@ import (
"go.opentelemetry.io/otel/attribute"
)

func taskFetchBalances(client *client.Client, config *Config, accountID string) task.Task {
func taskFetchBalances(c *client.Client, config *Config, accountID string) task.Task {
return func(
ctx context.Context,
taskID models.TaskID,
Expand All @@ -32,10 +33,15 @@ func taskFetchBalances(client *client.Client, config *Config, accountID string)
)
defer span.End()

balances, err := client.GetBalances(ctx, accountID)
balances, err := c.GetBalances(ctx, accountID)
if err != nil {
// retryable error already handled by the client
otel.RecordError(span, err)

if errors.Is(err, client.ErrStatusCodeServerError) {
return fmt.Errorf("%w: %w", task.ErrRetryable, err)
}

return err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package generic
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/formancehq/payments/cmd/connectors/internal/connectors"
Expand Down Expand Up @@ -55,7 +56,7 @@ func taskFetchBeneficiaries(client *client.Client, config *Config) task.Task {
func ingestBeneficiaries(
ctx context.Context,
connectorID models.ConnectorID,
client *client.Client,
c *client.Client,
ingester ingestion.Ingester,
state fetchBeneficiariesState,
) (fetchBeneficiariesState, error) {
Expand All @@ -64,8 +65,12 @@ func ingestBeneficiaries(
}

for page := 1; ; page++ {
beneficiaries, err := client.ListBeneficiaries(ctx, int64(page), pageSize, state.LastCreatedAt)
beneficiaries, err := c.ListBeneficiaries(ctx, int64(page), pageSize, state.LastCreatedAt)
if err != nil {
if errors.Is(err, client.ErrStatusCodeServerError) {
return fetchBeneficiariesState{}, fmt.Errorf("%w: %w", task.ErrRetryable, err)
}

return fetchBeneficiariesState{}, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func taskFetchTransactions(client *client.Client, config *Config) task.Task {
func ingestTransactions(
ctx context.Context,
connectorID models.ConnectorID,
client *client.Client,
c *client.Client,
ingester ingestion.Ingester,
scheduler task.Scheduler,
state fetchTransactionsState,
Expand All @@ -70,8 +70,12 @@ func ingestTransactions(
}

for page := 1; ; page++ {
transactions, err := client.ListTransactions(ctx, int64(page), pageSize, state.LastUpdatedAt)
transactions, err := c.ListTransactions(ctx, int64(page), pageSize, state.LastUpdatedAt)
if err != nil {
if errors.Is(err, client.ErrStatusCodeServerError) {
return fetchTransactionsState{}, fmt.Errorf("%w: %w", task.ErrRetryable, err)
}

return fetchTransactionsState{}, err
}

Expand Down

0 comments on commit e395cc0

Please sign in to comment.