-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
all: TSS RPC Caller Service #40
Changes from 72 commits
a58d519
9920f48
3f9f9f0
373c71a
2de9898
c0f9d32
a9cf4e3
6fc0dc2
fb807aa
f8c261b
d774f2a
6b04d62
f2fefef
b441a31
02c7de3
f9a5611
32bd6ce
5c7db27
c3e8de9
82b12fb
9a9c9be
0662da9
8f76d61
456b71f
7974aa9
5bdf8f2
3890bfe
e87c6d6
c1d3786
01ed3eb
3ddd2bd
5776cb1
2c6db84
77a6e4b
01109ab
c005a06
f560a05
38ba21b
8ddb961
d32adb1
e0727b5
968a6b7
096c7bd
6cfc3fd
b71e342
ccf27f1
8045d8d
d062a11
1a3c17c
174ed45
5d31b01
a6dc954
43fcf10
b4ab5de
f10d239
db8714d
2c0d060
b82a25a
101bd43
637aeaf
0fb94e3
df7ae05
cf2dc18
16581fd
ab7c0d7
5fe5d0b
78042c9
9118a06
12425f9
6b2a6f7
b4f6052
01ad88d
26e8733
33c2b78
3cdacaa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,11 @@ import ( | |
"github.com/stellar/wallet-backend/internal/signing" | ||
"github.com/stellar/wallet-backend/internal/signing/store" | ||
signingutils "github.com/stellar/wallet-backend/internal/signing/utils" | ||
"github.com/stellar/wallet-backend/internal/tss" | ||
tsschannel "github.com/stellar/wallet-backend/internal/tss/channels" | ||
tssrouter "github.com/stellar/wallet-backend/internal/tss/router" | ||
tssservices "github.com/stellar/wallet-backend/internal/tss/services" | ||
tssstore "github.com/stellar/wallet-backend/internal/tss/store" | ||
) | ||
|
||
// NOTE: perhaps move this to a environment variable. | ||
|
@@ -58,7 +63,10 @@ type Configs struct { | |
HorizonClientURL string | ||
DistributionAccountSignatureClient signing.SignatureClient | ||
ChannelAccountSignatureClient signing.SignatureClient | ||
|
||
// TSS | ||
RPCURL string | ||
RPCCallerServiceChannelBufferSize int | ||
RPCCallerServiceChannelMaxWorkers int | ||
// Error Tracker | ||
AppTracker apptracker.AppTracker | ||
} | ||
|
@@ -74,7 +82,11 @@ type handlerDeps struct { | |
AccountService services.AccountService | ||
AccountSponsorshipService services.AccountSponsorshipService | ||
PaymentService services.PaymentService | ||
AppTracker apptracker.AppTracker | ||
// TSS | ||
RPCCallerServiceChannel tss.Channel | ||
TSSRouter tssrouter.Router | ||
// Error Tracker | ||
AppTracker apptracker.AppTracker | ||
} | ||
|
||
func Serve(cfg Configs) error { | ||
|
@@ -92,6 +104,7 @@ func Serve(cfg Configs) error { | |
}, | ||
OnStopping: func() { | ||
log.Info("Stopping Wallet Backend server") | ||
deps.RPCCallerServiceChannel.Stop() | ||
}, | ||
}) | ||
|
||
|
@@ -155,6 +168,49 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) { | |
} | ||
go ensureChannelAccounts(channelAccountService, int64(cfg.NumberOfChannelAccounts)) | ||
|
||
// TSS | ||
txServiceOpts := tssservices.TransactionServiceOptions{ | ||
DistributionAccountSignatureClient: cfg.DistributionAccountSignatureClient, | ||
ChannelAccountSignatureClient: cfg.ChannelAccountSignatureClient, | ||
HorizonClient: &horizonClient, | ||
BaseFee: int64(cfg.BaseFee), | ||
} | ||
tssTxService, err := tssservices.NewTransactionService(txServiceOpts) | ||
if err != nil { | ||
return handlerDeps{}, fmt.Errorf("instantiating tss transaction service: %w", err) | ||
} | ||
httpClient := http.Client{Timeout: time.Duration(30 * time.Second)} | ||
rpcService, err := services.NewRPCService(cfg.RPCURL, &httpClient) | ||
if err != nil { | ||
return handlerDeps{}, fmt.Errorf("instantiating rpc service: %w", err) | ||
} | ||
|
||
store, err := tssstore.NewStore(dbConnectionPool) | ||
if err != nil { | ||
return handlerDeps{}, fmt.Errorf("instantiating tss store: %w", err) | ||
} | ||
txManager := tssservices.NewTransactionManager(tssservices.TransactionManagerConfigs{ | ||
TxService: tssTxService, | ||
RPCService: rpcService, | ||
Store: store, | ||
}) | ||
|
||
rpcCallerServiceChannel := tsschannel.NewRPCCallerChannel(tsschannel.RPCCallerChannelConfigs{ | ||
TxManager: txManager, | ||
Store: store, | ||
MaxBufferSize: cfg.RPCCallerServiceChannelBufferSize, | ||
MaxWorkers: cfg.RPCCallerServiceChannelMaxWorkers, | ||
}) | ||
|
||
router := tssrouter.NewRouter(tssrouter.RouterConfigs{ | ||
RPCCallerChannel: rpcCallerServiceChannel, | ||
ErrorJitterChannel: nil, | ||
ErrorNonJitterChannel: nil, | ||
WebhookChannel: nil, | ||
}) | ||
|
||
rpcCallerServiceChannel.SetRouter(router) | ||
|
||
return handlerDeps{ | ||
Models: models, | ||
SignatureVerifier: signatureVerifier, | ||
|
@@ -163,6 +219,9 @@ func initHandlerDeps(cfg Configs) (handlerDeps, error) { | |
AccountSponsorshipService: accountSponsorshipService, | ||
PaymentService: paymentService, | ||
AppTracker: cfg.AppTracker, | ||
// TSS | ||
RPCCallerServiceChannel: rpcCallerServiceChannel, | ||
TSSRouter: router, | ||
Comment on lines
+223
to
+224
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which of the two will be the entrypoint when handling HTTP requests? Probably that one can be our only dependency to be injected in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We actually need both, because the channel is being closed (via Stop()) in the Serve function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But the entrypoint will be the router There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm got it, maybe we can create an |
||
}, nil | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package servicesmocks | ||
|
||
import ( | ||
"github.com/stellar/wallet-backend/internal/entities" | ||
"github.com/stellar/wallet-backend/internal/services" | ||
"github.com/stretchr/testify/mock" | ||
) | ||
|
||
type RPCServiceMock struct { | ||
mock.Mock | ||
} | ||
|
||
var _ services.RPCService = (*RPCServiceMock)(nil) | ||
|
||
func (r *RPCServiceMock) SendTransaction(transactionXdr string) (entities.RPCSendTransactionResult, error) { | ||
args := r.Called(transactionXdr) | ||
return args.Get(0).(entities.RPCSendTransactionResult), args.Error(1) | ||
} | ||
|
||
func (r *RPCServiceMock) GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error) { | ||
args := r.Called(transactionHash) | ||
return args.Get(0).(entities.RPCGetTransactionResult), args.Error(1) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
package channels | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/alitto/pond" | ||
|
||
"github.com/stellar/go/support/log" | ||
"github.com/stellar/wallet-backend/internal/tss" | ||
"github.com/stellar/wallet-backend/internal/tss/router" | ||
"github.com/stellar/wallet-backend/internal/tss/services" | ||
"github.com/stellar/wallet-backend/internal/tss/store" | ||
) | ||
|
||
type RPCCallerChannelConfigs struct { | ||
TxManager services.TransactionManager | ||
Router router.Router | ||
Store store.Store | ||
MaxBufferSize int | ||
MaxWorkers int | ||
} | ||
|
||
type rpcCallerPool struct { | ||
Pool *pond.WorkerPool | ||
TxManager services.TransactionManager | ||
Router router.Router | ||
Store store.Store | ||
} | ||
|
||
var RPCCallerChannelName = "RPCCallerChannel" | ||
|
||
var _ tss.Channel = (*rpcCallerPool)(nil) | ||
|
||
func NewRPCCallerChannel(cfg RPCCallerChannelConfigs) *rpcCallerPool { | ||
pool := pond.New(cfg.MaxBufferSize, cfg.MaxWorkers, pond.Strategy(pond.Balanced())) | ||
return &rpcCallerPool{ | ||
Pool: pool, | ||
TxManager: cfg.TxManager, | ||
Store: cfg.Store, | ||
Router: cfg.Router, | ||
} | ||
|
||
} | ||
|
||
func (p *rpcCallerPool) Send(payload tss.Payload) { | ||
p.Pool.Submit(func() { | ||
p.Receive(payload) | ||
}) | ||
} | ||
|
||
func (p *rpcCallerPool) Receive(payload tss.Payload) { | ||
|
||
ctx := context.Background() | ||
// Create a new transaction record in the transactions table. | ||
err := p.Store.UpsertTransaction(ctx, payload.WebhookURL, payload.TransactionHash, payload.TransactionXDR, tss.RPCTXStatus{OtherStatus: tss.NewStatus}) | ||
|
||
if err != nil { | ||
log.Errorf("%s: unable to upsert transaction into transactions table: %e", RPCCallerChannelName, err) | ||
return | ||
} | ||
rpcSendResp, err := p.TxManager.BuildAndSubmitTransaction(ctx, RPCCallerChannelName, payload) | ||
|
||
if err != nil { | ||
log.Errorf("%s: unable to sign and submit transaction: %e", RPCCallerChannelName, err) | ||
return | ||
} | ||
payload.RpcSubmitTxResponse = rpcSendResp | ||
err = p.Router.Route(payload) | ||
if err != nil { | ||
log.Errorf("%s: unable to route payload: %e", RPCCallerChannelName, err) | ||
} | ||
} | ||
|
||
func (p *rpcCallerPool) SetRouter(router router.Router) { | ||
p.Router = router | ||
} | ||
|
||
func (p *rpcCallerPool) Stop() { | ||
p.Pool.StopAndWait() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
package channels | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
|
||
"github.com/stellar/wallet-backend/internal/db" | ||
"github.com/stellar/wallet-backend/internal/db/dbtest" | ||
"github.com/stellar/wallet-backend/internal/entities" | ||
"github.com/stellar/wallet-backend/internal/tss" | ||
"github.com/stellar/wallet-backend/internal/tss/router" | ||
"github.com/stellar/wallet-backend/internal/tss/services" | ||
"github.com/stellar/wallet-backend/internal/tss/store" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestSend(t *testing.T) { | ||
dbt := dbtest.Open(t) | ||
defer dbt.Close() | ||
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) | ||
require.NoError(t, err) | ||
defer dbConnectionPool.Close() | ||
store, _ := store.NewStore(dbConnectionPool) | ||
txManagerMock := services.TransactionManagerMock{} | ||
routerMock := router.MockRouter{} | ||
cfgs := RPCCallerChannelConfigs{ | ||
Store: store, | ||
TxManager: &txManagerMock, | ||
Router: &routerMock, | ||
MaxBufferSize: 10, | ||
MaxWorkers: 10, | ||
} | ||
channel := NewRPCCallerChannel(cfgs) | ||
payload := tss.Payload{} | ||
payload.WebhookURL = "www.stellar.com" | ||
payload.TransactionHash = "hash" | ||
payload.TransactionXDR = "xdr" | ||
|
||
rpcResp := tss.RPCSendTxResponse{ | ||
Status: tss.RPCTXStatus{RPCStatus: entities.TryAgainLaterStatus}, | ||
} | ||
payload.RpcSubmitTxResponse = rpcResp | ||
|
||
txManagerMock. | ||
On("BuildAndSubmitTransaction", context.Background(), RPCCallerChannelName, payload). | ||
Return(rpcResp, nil). | ||
Once() | ||
|
||
routerMock. | ||
On("Route", payload). | ||
Return(nil). | ||
Once() | ||
|
||
channel.Send(payload) | ||
channel.Stop() | ||
|
||
routerMock.AssertCalled(t, "Route", payload) | ||
} | ||
|
||
func TestReceivee(t *testing.T) { | ||
dbt := dbtest.Open(t) | ||
defer dbt.Close() | ||
dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) | ||
require.NoError(t, err) | ||
defer dbConnectionPool.Close() | ||
store, _ := store.NewStore(dbConnectionPool) | ||
txManagerMock := services.TransactionManagerMock{} | ||
routerMock := router.MockRouter{} | ||
cfgs := RPCCallerChannelConfigs{ | ||
Store: store, | ||
TxManager: &txManagerMock, | ||
Router: &routerMock, | ||
MaxBufferSize: 10, | ||
MaxWorkers: 10, | ||
} | ||
channel := NewRPCCallerChannel(cfgs) | ||
payload := tss.Payload{} | ||
payload.WebhookURL = "www.stellar.com" | ||
payload.TransactionHash = "hash" | ||
payload.TransactionXDR = "xdr" | ||
|
||
t.Run("build_and_submit_tx_fail", func(t *testing.T) { | ||
txManagerMock. | ||
On("BuildAndSubmitTransaction", context.Background(), RPCCallerChannelName, payload). | ||
Return(tss.RPCSendTxResponse{}, errors.New("build tx failed")). | ||
Once() | ||
|
||
channel.Receive(payload) | ||
|
||
routerMock.AssertNotCalled(t, "Route", payload) | ||
}) | ||
|
||
t.Run("payload_routed", func(t *testing.T) { | ||
rpcResp := tss.RPCSendTxResponse{ | ||
Status: tss.RPCTXStatus{RPCStatus: entities.ErrorStatus}, | ||
} | ||
payload.RpcSubmitTxResponse = rpcResp | ||
|
||
txManagerMock. | ||
On("BuildAndSubmitTransaction", context.Background(), RPCCallerChannelName, payload). | ||
Return(rpcResp, nil). | ||
Once() | ||
|
||
routerMock. | ||
On("Route", payload). | ||
Return(nil). | ||
Once() | ||
|
||
channel.Receive(payload) | ||
|
||
routerMock.AssertCalled(t, "Route", payload) | ||
}) | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a good idea to check in the
NewRouter
andNewStore
functions that the parameters are passed in correctly and are notnil
. If a parameter is invalid, they can return an error.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did add a nil check for NewStore, but kept NewRouter the way it is right now, for reasons I explained in another comment, I would like the option of some of these channels being nil