diff --git a/chains/btc/executor/executor.go b/chains/btc/executor/executor.go index bee6fffb..2ee5ae85 100644 --- a/chains/btc/executor/executor.go +++ b/chains/btc/executor/executor.go @@ -31,8 +31,9 @@ import ( var ( signingTimeout = 30 * time.Minute - INPUT_SIZE = 180 - OUTPUT_SIZE = 34 + INPUT_SIZE uint64 = 180 + OUTPUT_SIZE uint64 = 34 + FEE_ROUNDING_FACTOR uint64 = 5 ) type MempoolAPI interface { @@ -88,8 +89,8 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error { e.exitLock.RLock() defer e.exitLock.RUnlock() - sessionID := proposals[0].MessageID - props, err := e.proposalsForExecution(proposals) + messageID := proposals[0].MessageID + props, err := e.proposalsForExecution(proposals, messageID) if err != nil { return err } @@ -113,27 +114,29 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error { return fmt.Errorf("no resource for ID %s", hex.EncodeToString(resourceID[:])) } - sessionID := fmt.Sprintf("%s-%s", sessionID, hex.EncodeToString(resourceID[:])) - return e.executeResourceProps(props, resource, sessionID) + return e.executeResourceProps(props, resource, messageID) }) } return p.Wait() } -func (e *Executor) executeResourceProps(props []*BtcTransferProposal, resource config.Resource, sessionID string) error { - log.Info().Str("SessionID", sessionID).Msgf("Executing proposals for resource %s", hex.EncodeToString(resource.ResourceID[:])) +func (e *Executor) executeResourceProps(props []*BtcTransferProposal, resource config.Resource, messageID string) error { + log.Info().Str("messageID", messageID).Msgf("Executing proposals %+v for resource %s", props, hex.EncodeToString(resource.ResourceID[:])) tx, utxos, err := e.rawTx(props, resource) if err != nil { return err } - sigChn := make(chan interface{}) + sigChn := make(chan interface{}, len(tx.TxIn)) p := pool.New().WithErrors() executionContext, cancelExecution := context.WithCancel(context.Background()) watchContext, cancelWatch := context.WithCancel(context.Background()) + sessionID := fmt.Sprintf("%s-%s", messageID, hex.EncodeToString(resource.ResourceID[:])) defer cancelWatch() - p.Go(func() error { return e.watchExecution(watchContext, cancelExecution, tx, props, sigChn, sessionID) }) + p.Go(func() error { + return e.watchExecution(watchContext, cancelExecution, tx, props, sigChn, sessionID, messageID) + }) prevOuts := make(map[wire.OutPoint]*wire.TxOut) for _, utxo := range utxos { txOut := wire.NewTxOut(int64(utxo.Value), resource.Script) @@ -146,33 +149,49 @@ func (e *Executor) executeResourceProps(props []*BtcTransferProposal, resource c prevOutputFetcher := txscript.NewMultiPrevOutFetcher(prevOuts) sigHashes := txscript.NewTxSigHashes(tx, prevOutputFetcher) + var buf buffer.Buffer + _ = tx.Serialize(&buf) + bytes := buf.Bytes() + log.Info().Str("messageID", messageID).Msgf("Assembled raw unsigned transaction %s", hex.EncodeToString(bytes)) + // we need to sign each input individually + tssProcesses := make([]tss.TssProcess, len(tx.TxIn)) for i := range tx.TxIn { + sessionID := fmt.Sprintf("%s-%d", sessionID, i) txHash, err := txscript.CalcTaprootSignatureHash(sigHashes, txscript.SigHashDefault, tx, i, prevOutputFetcher) if err != nil { return err } - p.Go(func() error { - msg := new(big.Int) - msg.SetBytes(txHash[:]) - signing, err := signing.NewSigning( - i, - msg, - resource.Tweak, - fmt.Sprintf("%s-%d", sessionID, i), - e.host, - e.comm, - e.fetcher) - if err != nil { - return err - } - return e.coordinator.Execute(executionContext, signing, sigChn) - }) + msg := new(big.Int) + 
msg.SetBytes(txHash[:]) + signing, err := signing.NewSigning( + i, + msg, + resource.Tweak, + messageID, + sessionID, + e.host, + e.comm, + e.fetcher) + if err != nil { + return err + } + tssProcesses[i] = signing } + p.Go(func() error { + return e.coordinator.Execute(executionContext, tssProcesses, sigChn) + }) return p.Wait() } -func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, tx *wire.MsgTx, proposals []*BtcTransferProposal, sigChn chan interface{}, sessionID string) error { +func (e *Executor) watchExecution( + ctx context.Context, + cancelExecution context.CancelFunc, + tx *wire.MsgTx, + proposals []*BtcTransferProposal, + sigChn chan interface{}, + sessionID string, + messageID string) error { timeout := time.NewTicker(signingTimeout) defer timeout.Stop() defer cancelExecution() @@ -192,7 +211,7 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C } cancelExecution() - hash, err := e.sendTx(tx, signatures) + hash, err := e.sendTx(tx, signatures, messageID) if err != nil { _ = e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID) e.storeProposalsStatus(proposals, store.FailedProp) @@ -200,7 +219,8 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C } e.storeProposalsStatus(proposals, store.ExecutedProp) - log.Info().Str("SessionID", sessionID).Msgf("Sent proposals execution with hash: %s", hash) + log.Info().Str("messageID", messageID).Msgf("Sent proposals execution with hash: %s", hash) + return nil } case <-timeout.C: { @@ -220,7 +240,7 @@ func (e *Executor) rawTx(proposals []*BtcTransferProposal, resource config.Resou if err != nil { return nil, nil, err } - feeEstimate, err := e.fee(int64(len(proposals)), int64(len(proposals))) + feeEstimate, err := e.fee(uint64(len(proposals)), uint64(len(proposals))) if err != nil { return nil, nil, err } @@ -231,26 +251,26 @@ func (e *Executor) rawTx(proposals []*BtcTransferProposal, resource config.Resou if inputAmount < outputAmount { return nil, nil, fmt.Errorf("utxo input amount %d less than output amount %d", inputAmount, outputAmount) } - fee, err := e.fee(int64(len(utxos)), int64(len(proposals))+1) + fee, err := e.fee(uint64(len(utxos)), uint64(len(proposals))+1) if err != nil { return nil, nil, err } - returnAmount := int64(inputAmount) - fee - outputAmount + returnAmount := inputAmount - fee - outputAmount if returnAmount > 0 { // return extra funds returnScript, err := txscript.PayToAddrScript(resource.Address) if err != nil { return nil, nil, err } - txOut := wire.NewTxOut(returnAmount, returnScript) + txOut := wire.NewTxOut(int64(returnAmount), returnScript) tx.AddTxOut(txOut) } return tx, utxos, err } -func (e *Executor) outputs(tx *wire.MsgTx, proposals []*BtcTransferProposal) (int64, error) { - outputAmount := int64(0) +func (e *Executor) outputs(tx *wire.MsgTx, proposals []*BtcTransferProposal) (uint64, error) { + outputAmount := uint64(0) for _, prop := range proposals { addr, err := btcutil.DecodeAddress(prop.Data.Recipient, &e.chainCfg) if err != nil { @@ -260,16 +280,16 @@ func (e *Executor) outputs(tx *wire.MsgTx, proposals []*BtcTransferProposal) (in if err != nil { return 0, err } - txOut := wire.NewTxOut(prop.Data.Amount, destinationAddrByte) + txOut := wire.NewTxOut(int64(prop.Data.Amount), destinationAddrByte) tx.AddTxOut(txOut) outputAmount += prop.Data.Amount } return outputAmount, nil } -func (e *Executor) inputs(tx *wire.MsgTx, address btcutil.Address, outputAmount int64) (int64, 
[]mempool.Utxo, error) { +func (e *Executor) inputs(tx *wire.MsgTx, address btcutil.Address, outputAmount uint64) (uint64, []mempool.Utxo, error) { usedUtxos := make([]mempool.Utxo, 0) - inputAmount := int64(0) + inputAmount := uint64(0) utxos, err := e.mempool.Utxos(address.String()) if err != nil { return 0, nil, err @@ -284,7 +304,7 @@ func (e *Executor) inputs(tx *wire.MsgTx, address btcutil.Address, outputAmount tx.AddTxIn(txIn) usedUtxos = append(usedUtxos, utxo) - inputAmount += int64(utxo.Value) + inputAmount += uint64(utxo.Value) if inputAmount > outputAmount { break } @@ -292,15 +312,16 @@ func (e *Executor) inputs(tx *wire.MsgTx, address btcutil.Address, outputAmount return inputAmount, usedUtxos, nil } -func (e *Executor) fee(numOfInputs, numOfOutputs int64) (int64, error) { +func (e *Executor) fee(numOfInputs, numOfOutputs uint64) (uint64, error) { recommendedFee, err := e.mempool.RecommendedFee() if err != nil { return 0, err } - return (numOfInputs*int64(INPUT_SIZE) + numOfOutputs*int64(OUTPUT_SIZE)) * recommendedFee.EconomyFee, nil + + return (numOfInputs*INPUT_SIZE + numOfOutputs*OUTPUT_SIZE) * ((recommendedFee.EconomyFee/FEE_ROUNDING_FACTOR)*FEE_ROUNDING_FACTOR + FEE_ROUNDING_FACTOR), nil } -func (e *Executor) sendTx(tx *wire.MsgTx, signatures []taproot.Signature) (*chainhash.Hash, error) { +func (e *Executor) sendTx(tx *wire.MsgTx, signatures []taproot.Signature, messageID string) (*chainhash.Hash, error) { for i, sig := range signatures { tx.TxIn[i].Witness = wire.TxWitness{sig} } @@ -311,7 +332,7 @@ func (e *Executor) sendTx(tx *wire.MsgTx, signatures []taproot.Signature) (*chai return nil, err } bytes := buf.Bytes() - log.Debug().Msgf("Assembled raw transaction %s", hex.EncodeToString(bytes)) + log.Debug().Str("messageID", messageID).Msgf("Assembled raw transaction %s", hex.EncodeToString(bytes)) return e.conn.SendRawTransaction(tx, true) } @@ -325,7 +346,7 @@ func (e *Executor) signaturesFilled(signatures []taproot.Signature) bool { return true } -func (e *Executor) proposalsForExecution(proposals []*proposal.Proposal) ([]*BtcTransferProposal, error) { +func (e *Executor) proposalsForExecution(proposals []*proposal.Proposal, messageID string) ([]*BtcTransferProposal, error) { e.propMutex.Lock() props := make([]*BtcTransferProposal, 0) for _, prop := range proposals { @@ -335,7 +356,7 @@ func (e *Executor) proposalsForExecution(proposals []*proposal.Proposal) ([]*Btc } if executed { - log.Info().Msgf("Proposal %s already executed", fmt.Sprintf("%d-%d-%d", prop.Source, prop.Destination, prop.Data.(BtcTransferProposalData).DepositNonce)) + log.Warn().Str("messageID", messageID).Msgf("Proposal %s already executed", fmt.Sprintf("%d-%d-%d", prop.Source, prop.Destination, prop.Data.(BtcTransferProposalData).DepositNonce)) continue } diff --git a/chains/btc/executor/message-handler.go b/chains/btc/executor/message-handler.go index 8a2dced8..0cfead28 100644 --- a/chains/btc/executor/message-handler.go +++ b/chains/btc/executor/message-handler.go @@ -19,7 +19,7 @@ import ( ) type BtcTransferProposalData struct { - Amount int64 + Amount uint64 Recipient string DepositNonce uint64 ResourceId [32]byte @@ -69,7 +69,7 @@ func ERC20MessageHandler(msg *transfer.TransferMessage) (*proposal.Proposal, err bigAmount.Div(bigAmount, divisor) return proposal.NewProposal(msg.Source, msg.Destination, BtcTransferProposalData{ - Amount: bigAmount.Int64(), + Amount: bigAmount.Uint64(), Recipient: string(recipient), DepositNonce: msg.Data.DepositNonce, ResourceId: msg.Data.ResourceId, diff --git 
a/chains/btc/listener/deposit-handler.go b/chains/btc/listener/deposit-handler.go index 278ceacf..df36e252 100644 --- a/chains/btc/listener/deposit-handler.go +++ b/chains/btc/listener/deposit-handler.go @@ -22,7 +22,8 @@ func NewBtcDepositHandler() *BtcDepositHandler { return &BtcDepositHandler{} } -func (e *BtcDepositHandler) HandleDeposit(sourceID uint8, +func (e *BtcDepositHandler) HandleDeposit( + sourceID uint8, depositNonce uint64, resourceID [32]byte, amount *big.Int, diff --git a/chains/btc/mempool/mempool.go b/chains/btc/mempool/mempool.go index 445b9c75..814bfeb0 100644 --- a/chains/btc/mempool/mempool.go +++ b/chains/btc/mempool/mempool.go @@ -23,11 +23,11 @@ type Utxo struct { } type Fee struct { - FastestFee int64 - HalfHourFee int64 - MinimumFee int64 - EconomyFee int64 - HourFee int64 + FastestFee uint64 + HalfHourFee uint64 + MinimumFee uint64 + EconomyFee uint64 + HourFee uint64 } type MempoolAPI struct { @@ -77,7 +77,11 @@ func (a *MempoolAPI) Utxos(address string) ([]Utxo, error) { return nil, err } sort.Slice(utxos, func(i int, j int) bool { - return utxos[i].Status.BlockTime < utxos[j].Status.BlockTime + if utxos[i].Status.BlockTime == utxos[j].Status.BlockTime { + return utxos[i].TxID < utxos[j].TxID + } else { + return utxos[i].Status.BlockTime < utxos[j].Status.BlockTime + } }) return utxos, nil diff --git a/chains/btc/mempool/mempool_test.go b/chains/btc/mempool/mempool_test.go index cc507edc..20c068b1 100644 --- a/chains/btc/mempool/mempool_test.go +++ b/chains/btc/mempool/mempool_test.go @@ -60,6 +60,17 @@ func (s *MempoolTestSuite) Test_Utxo_SuccessfulFetch() { BlockTime: 1715083122, }, }, + { + TxID: "28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe2", + Vout: 0, + Value: 11197, + Status: mempool.Status{ + Confirmed: true, + BlockHeight: 2812826, + BlockHash: "000000000000001a01d4058773384f2c23aed5a7e5ede252f99e290fa58324a3", + BlockTime: 1715083122, + }, + }, { TxID: "28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe9", Vout: 0, diff --git a/chains/btc/mempool/test-data/successful-utxo.json b/chains/btc/mempool/test-data/successful-utxo.json index 0435b6c9..d47aceb3 100644 --- a/chains/btc/mempool/test-data/successful-utxo.json +++ b/chains/btc/mempool/test-data/successful-utxo.json @@ -1,4 +1,5 @@ [ {"txid":"28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe9","vout":0,"status":{"confirmed":true,"block_height":2812827,"block_hash":"000000000000001a01d4058773384f2c23aed5a7e5ede252f99e290fa58324a5","block_time":1715083123},"value":11198}, + {"txid":"28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe2","vout":0,"status":{"confirmed":true,"block_height":2812826,"block_hash":"000000000000001a01d4058773384f2c23aed5a7e5ede252f99e290fa58324a3","block_time":1715083122},"value":11197}, {"txid":"28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe1","vout":0,"status":{"confirmed":true,"block_height":2812826,"block_hash":"000000000000001a01d4058773384f2c23aed5a7e5ede252f99e290fa58324a3","block_time":1715083122},"value":11197} ] \ No newline at end of file diff --git a/chains/evm/calls/contracts/bridge/bridge.go b/chains/evm/calls/contracts/bridge/bridge.go index bc1f6804..47eab036 100644 --- a/chains/evm/calls/contracts/bridge/bridge.go +++ b/chains/evm/calls/contracts/bridge/bridge.go @@ -106,7 +106,7 @@ func (c *BridgeContract) IsProposalExecuted(p *transfer.TransferProposal) (bool, Str("depositNonce", strconv.FormatUint(p.Data.DepositNonce, 10)). Str("resourceID", hexutil.Encode(p.Data.ResourceId[:])). 
Msg("Getting is proposal executed") - res, err := c.CallContract("isProposalExecuted", p.Source, big.NewInt(int64(p.Data.DepositNonce))) + res, err := c.CallContract("isProposalExecuted", p.Source, new(big.Int).SetUint64(p.Data.DepositNonce)) if err != nil { return false, err } diff --git a/chains/evm/executor/executor.go b/chains/evm/executor/executor.go index 348ada6a..6b283454 100644 --- a/chains/evm/executor/executor.go +++ b/chains/evm/executor/executor.go @@ -87,6 +87,7 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error { if len(batch.proposals) == 0 { continue } + messageID := batch.proposals[0].MessageID b := batch p.Go(func() error { @@ -95,13 +96,14 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error { return err } - sessionID := fmt.Sprintf("%s-%d", batch.proposals[0].MessageID, i) + sessionID := fmt.Sprintf("%s-%d", messageID, i) log.Info().Str("messageID", batch.proposals[0].MessageID).Msgf("Starting session with ID: %s", sessionID) msg := big.NewInt(0) msg.SetBytes(propHash) signing, err := signing.NewSigning( msg, + messageID, sessionID, e.host, e.comm, @@ -115,21 +117,27 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error { watchContext, cancelWatch := context.WithCancel(context.Background()) ep := pool.New().WithErrors() ep.Go(func() error { - err := e.coordinator.Execute(executionContext, signing, sigChn) + err := e.coordinator.Execute(executionContext, []tss.TssProcess{signing}, sigChn) if err != nil { cancelWatch() } return err }) - ep.Go(func() error { return e.watchExecution(watchContext, cancelExecution, b, sigChn, sessionID) }) + ep.Go(func() error { return e.watchExecution(watchContext, cancelExecution, b, sigChn, sessionID, messageID) }) return ep.Wait() }) } return p.Wait() } -func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, batch *Batch, sigChn chan interface{}, sessionID string) error { +func (e *Executor) watchExecution( + ctx context.Context, + cancelExecution context.CancelFunc, + batch *Batch, + sigChn chan interface{}, + sessionID string, + messageID string) error { ticker := time.NewTicker(executionCheckPeriod) timeout := time.NewTicker(signingTimeout) defer ticker.Stop() @@ -152,7 +160,7 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C return err } - log.Info().Str("messageID", sessionID).Msgf("Sent proposals execution with hash: %s", hash) + log.Info().Str("messageID", messageID).Msgf("Sent proposals execution with hash: %s", hash) } case <-ticker.C: { @@ -160,7 +168,7 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C continue } - log.Info().Str("messageID", sessionID).Msgf("Successfully executed proposals") + log.Info().Str("messageID", messageID).Msgf("Successfully executed proposals") return nil } case <-timeout.C: diff --git a/chains/evm/listener/eventHandlers/tss.go b/chains/evm/listener/eventHandlers/tss.go index 6a246ada..7dd12ebe 100644 --- a/chains/evm/listener/eventHandlers/tss.go +++ b/chains/evm/listener/eventHandlers/tss.go @@ -81,7 +81,7 @@ func (eh *KeygenEventHandler) HandleEvents( keygenBlockNumber := big.NewInt(0).SetUint64(keygenEvents[0].BlockNumber) keygen := keygen.NewKeygen(eh.sessionID(keygenBlockNumber), eh.threshold, eh.host, eh.communication, eh.storer) - err = eh.coordinator.Execute(context.Background(), keygen, make(chan interface{}, 1)) + err = eh.coordinator.Execute(context.Background(), []tss.TssProcess{keygen}, make(chan interface{}, 1)) if err != nil { 
log.Err(err).Msgf("Failed executing keygen") } @@ -146,7 +146,7 @@ func (eh *FrostKeygenEventHandler) HandleEvents( keygenBlockNumber := big.NewInt(0).SetUint64(keygenEvents[0].BlockNumber) keygen := frostKeygen.NewKeygen(eh.sessionID(keygenBlockNumber), eh.threshold, eh.host, eh.communication, eh.storer) - err = eh.coordinator.Execute(context.Background(), keygen, make(chan interface{}, 1)) + err = eh.coordinator.Execute(context.Background(), []tss.TssProcess{keygen}, make(chan interface{}, 1)) if err != nil { log.Err(err).Msgf("Failed executing keygen") } @@ -217,15 +217,18 @@ func (eh *RefreshEventHandler) HandleEvents( hash := refreshEvents[len(refreshEvents)-1].Hash if hash == "" { - return fmt.Errorf("hash cannot be empty string") + log.Error().Msgf("Hash cannot be empty string") + return nil } topology, err := eh.topologyProvider.NetworkTopology(hash) if err != nil { - return err + log.Error().Err(err).Msgf("Failed fetching network topology") + return nil } err = eh.topologyStore.StoreTopology(topology) if err != nil { - return err + log.Error().Err(err).Msgf("Failed storing network topology") + return nil } eh.connectionGate.SetTopology(topology) @@ -238,16 +241,18 @@ func (eh *RefreshEventHandler) HandleEvents( resharing := resharing.NewResharing( eh.sessionID(startBlock), topology.Threshold, eh.host, eh.communication, eh.ecdsaStorer, ) - err = eh.coordinator.Execute(context.Background(), resharing, make(chan interface{}, 1)) + err = eh.coordinator.Execute(context.Background(), []tss.TssProcess{resharing}, make(chan interface{}, 1)) if err != nil { log.Err(err).Msgf("Failed executing ecdsa key refresh") + return nil } frostResharing := frostResharing.NewResharing( eh.sessionID(startBlock), topology.Threshold, eh.host, eh.communication, eh.frostStorer, ) - err = eh.coordinator.Execute(context.Background(), frostResharing, make(chan interface{}, 1)) + err = eh.coordinator.Execute(context.Background(), []tss.TssProcess{frostResharing}, make(chan interface{}, 1)) if err != nil { log.Err(err).Msgf("Failed executing frost key refresh") + return nil } return nil } diff --git a/chains/proposal.go b/chains/proposal.go index 6652d7c7..0a8a72eb 100644 --- a/chains/proposal.go +++ b/chains/proposal.go @@ -5,6 +5,7 @@ package chains import ( "fmt" + "math/big" "github.com/ChainSafe/sygma-relayer/relayer/transfer" "github.com/ethereum/go-ethereum/common/hexutil" @@ -17,8 +18,8 @@ func ProposalsHash(proposals []*transfer.TransferProposal, chainID int64, verifC formattedProps := make([]interface{}, len(proposals)) for i, prop := range proposals { formattedProps[i] = map[string]interface{}{ - "originDomainID": math.NewHexOrDecimal256(int64(prop.Source)), - "depositNonce": math.NewHexOrDecimal256(int64(prop.Data.DepositNonce)), + "originDomainID": big.NewInt(int64(prop.Source)), + "depositNonce": new(big.Int).SetUint64(prop.Data.DepositNonce), "resourceID": hexutil.Encode(prop.Data.ResourceId[:]), "data": prop.Data.Data, } diff --git a/chains/proposal_test.go b/chains/proposal_test.go index afa8921c..9b685686 100644 --- a/chains/proposal_test.go +++ b/chains/proposal_test.go @@ -4,6 +4,8 @@ package chains import ( + "testing" + "github.com/ChainSafe/sygma-relayer/relayer/transfer" "github.com/stretchr/testify/suite" ) @@ -15,6 +17,10 @@ type ProposalTestSuite struct { suite.Suite } +func TestRunProposalTestSuite(t *testing.T) { + suite.Run(t, new(ProposalTestSuite)) +} + func (s *ProposalTestSuite) Test_ProposalsHash() { data := []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0, 0, 0, 0, 90, 243, 16, 122, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 0, 1, 1, 0, 212, 53, 147, 199, 21, 253, 211, 28, 97, 20, 26, 189, 4, 169, 159, 214, 130, 44, 133, 88, 133, 76, 205, 227, 154, 86, 132, 231, 165, 109, 162, 125} @@ -22,13 +28,13 @@ Source: 1, Destination: 2, Data: transfer.TransferProposalData{ - DepositNonce: 3, + DepositNonce: 15078986465725403975, ResourceId: [32]byte{3}, Metadata: nil, Data: data, }, }} - correctRes := []byte{253, 216, 81, 25, 46, 239, 181, 138, 51, 225, 165, 111, 156, 95, 27, 239, 160, 87, 89, 84, 50, 22, 97, 185, 132, 200, 201, 210, 204, 99, 94, 131} + correctRes := []byte{0xde, 0x7b, 0x5c, 0x9e, 0x8, 0x7a, 0xb4, 0xf5, 0xfb, 0xe, 0x9f, 0x73, 0xa7, 0xe5, 0xbd, 0xb, 0xdf, 0x9e, 0xeb, 0x4, 0xaa, 0xbb, 0xd0, 0xe8, 0xf8, 0xde, 0x58, 0xa2, 0x4, 0xa3, 0x3e, 0x55} res, err := ProposalsHash(prop, 5, verifyingContract, bridgeVersion) s.Nil(err) diff --git a/chains/substrate/executor/executor.go b/chains/substrate/executor/executor.go index d3f8252f..de22429a 100644 --- a/chains/substrate/executor/executor.go +++ b/chains/substrate/executor/executor.go @@ -105,12 +105,13 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error { return err } - sessionID := transferProposals[0].MessageID + messageID := transferProposals[0].MessageID msg := big.NewInt(0) msg.SetBytes(propHash) signing, err := signing.NewSigning( msg, - sessionID, + messageID, + messageID, e.host, e.comm, e.fetcher) @@ -124,7 +125,7 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error { pool := pool.New().WithErrors() pool.Go(func() error { - err := e.coordinator.Execute(executionContext, signing, sigChn) + err := e.coordinator.Execute(executionContext, []tss.TssProcess{signing}, sigChn) if err != nil { cancelWatch() } @@ -132,7 +133,7 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error { return err }) pool.Go(func() error { - return e.watchExecution(watchContext, cancelExecution, transferProposals, sigChn, sessionID) + return e.watchExecution(watchContext, cancelExecution, transferProposals, sigChn, messageID) }) return pool.Wait() } diff --git a/tss/coordinator.go b/tss/coordinator.go index 49852127..ac0802b1 100644 --- a/tss/coordinator.go +++ b/tss/coordinator.go @@ -69,8 +69,10 @@ func NewCoordinator( } // Execute calculates the process leader and coordinates party readiness and starts the tss processes. -func (c *Coordinator) Execute(ctx context.Context, tssProcess TssProcess, resultChn chan interface{}) error { - sessionID := tssProcess.SessionID() +// An array of processes can be passed if all the processes need to share the same peer subset and +// the result of all of them is needed. Each process should have a unique session ID. 
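Note: the batched Execute below is the heart of this change. A minimal caller sketch under the signatures in this diff; newSigningForInput is a hypothetical factory standing in for signing.NewSigning, and the buffered channel mirrors sigChn := make(chan interface{}, len(tx.TxIn)) in the Bitcoin executor above.

package example

import (
	"context"
	"fmt"

	"github.com/ChainSafe/sygma-relayer/tss"
)

// signAllInputs builds one signing process per transaction input and hands
// the whole batch to a single coordinator.Execute call.
func signAllInputs(
	ctx context.Context,
	c *tss.Coordinator,
	sessionID string,
	numInputs int,
	newSigningForInput func(i int, sessionID string) (tss.TssProcess, error), // hypothetical helper
) (chan interface{}, error) {
	// One buffered slot per input so no signer blocks on result delivery.
	sigChn := make(chan interface{}, numInputs)
	processes := make([]tss.TssProcess, numInputs)
	for i := 0; i < numInputs; i++ {
		// Per the docstring above, every process in the batch gets a
		// unique session ID derived from the shared parent session.
		p, err := newSigningForInput(i, fmt.Sprintf("%s-%d", sessionID, i))
		if err != nil {
			return nil, err
		}
		processes[i] = p
	}
	return sigChn, c.Execute(ctx, processes, sigChn)
}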
+func (c *Coordinator) Execute(ctx context.Context, tssProcesses []TssProcess, resultChn chan interface{}) error { + sessionID := tssProcesses[0].SessionID() value, ok := c.pendingProcesses[sessionID] if ok && value { log.Warn().Str("SessionID", sessionID).Msgf("Process already pending") @@ -89,71 +91,74 @@ func (c *Coordinator) Execute(ctx context.Context, tssProcess TssProcess, result c.processLock.Lock() c.pendingProcesses[sessionID] = false c.processLock.Unlock() - tssProcess.Stop() + for _, process := range tssProcesses { + process.Stop() + } }() coordinatorElector := c.electorFactory.CoordinatorElector(sessionID, elector.Static) - coordinator, _ := coordinatorElector.Coordinator(ctx, tssProcess.ValidCoordinators()) + coordinator, _ := coordinatorElector.Coordinator(ctx, tssProcesses[0].ValidCoordinators()) log.Info().Str("SessionID", sessionID).Msgf("Starting process with coordinator %s", coordinator.Pretty()) p.Go(func(ctx context.Context) error { - err := c.start(ctx, tssProcess, coordinator, resultChn, []peer.ID{}) + err := c.start(ctx, tssProcesses, coordinator, resultChn, []peer.ID{}) if err == nil { cancel() } return err }) p.Go(func(ctx context.Context) error { - return c.watchExecution(ctx, tssProcess, coordinator) + return c.watchExecution(ctx, tssProcesses[0], coordinator) }) err := p.Wait() if err == nil { return nil } - if !tssProcess.Retryable() { + if !tssProcesses[0].Retryable() { return err } - return c.handleError(ctx, err, tssProcess, resultChn) + return c.handleError(ctx, err, tssProcesses, resultChn) } -func (c *Coordinator) handleError(ctx context.Context, err error, tssProcess TssProcess, resultChn chan interface{}) error { +func (c *Coordinator) handleError(ctx context.Context, err error, tssProcesses []TssProcess, resultChn chan interface{}) error { ctx, cancel := context.WithCancel(ctx) defer cancel() rp := pool.New().WithContext(ctx).WithCancelOnError() rp.Go(func(ctx context.Context) error { - return c.watchExecution(ctx, tssProcess, peer.ID("")) + return c.watchExecution(ctx, tssProcesses[0], peer.ID("")) }) + sessionID := tssProcesses[0].SessionID() switch err := err.(type) { case *CoordinatorError: { - log.Err(err).Str("SessionID", tssProcess.SessionID()).Msgf("Tss process failed with error %+v", err) + log.Err(err).Str("SessionID", sessionID).Msgf("Tss process failed with error %+v", err) excludedPeers := []peer.ID{err.Peer} - rp.Go(func(ctx context.Context) error { return c.retry(ctx, tssProcess, resultChn, excludedPeers) }) + rp.Go(func(ctx context.Context) error { return c.retry(ctx, tssProcesses, resultChn, excludedPeers) }) } case *comm.CommunicationError: { - log.Err(err).Str("SessionID", tssProcess.SessionID()).Msgf("Tss process failed with error %+v", err) - rp.Go(func(ctx context.Context) error { return c.retry(ctx, tssProcess, resultChn, []peer.ID{}) }) + log.Err(err).Str("SessionID", sessionID).Msgf("Tss process failed with error %+v", err) + rp.Go(func(ctx context.Context) error { return c.retry(ctx, tssProcesses, resultChn, []peer.ID{}) }) } case *tss.Error: { - log.Err(err).Str("SessionID", tssProcess.SessionID()).Msgf("Tss process failed with error %+v", err) + log.Err(err).Str("SessionID", sessionID).Msgf("Tss process failed with error %+v", err) excludedPeers, err := common.PeersFromParties(err.Culprits()) if err != nil { return err } - rp.Go(func(ctx context.Context) error { return c.retry(ctx, tssProcess, resultChn, excludedPeers) }) + rp.Go(func(ctx context.Context) error { return c.retry(ctx, tssProcesses, resultChn, excludedPeers) 
}) } case *SubsetError: { // wait for start message if existing signing process fails rp.Go(func(ctx context.Context) error { - return c.waitForStart(ctx, tssProcess, resultChn, peer.ID(""), c.TssTimeout) + return c.waitForStart(ctx, tssProcesses, resultChn, peer.ID(""), c.TssTimeout) }) } default: @@ -197,24 +202,24 @@ } // start initiates listeners for coordinator and participants with statically calculated coordinator -func (c *Coordinator) start(ctx context.Context, tssProcess TssProcess, coordinator peer.ID, resultChn chan interface{}, excludedPeers []peer.ID) error { +func (c *Coordinator) start(ctx context.Context, tssProcesses []TssProcess, coordinator peer.ID, resultChn chan interface{}, excludedPeers []peer.ID) error { if coordinator.Pretty() == c.host.ID().Pretty() { - return c.initiate(ctx, tssProcess, resultChn, excludedPeers) + return c.initiate(ctx, tssProcesses, resultChn, excludedPeers) } else { - return c.waitForStart(ctx, tssProcess, resultChn, coordinator, c.CoordinatorTimeout) + return c.waitForStart(ctx, tssProcesses, resultChn, coordinator, c.CoordinatorTimeout) } } // retry initiates full bully process to calculate coordinator and starts a new tss process after // an expected error occurred during regular tss execution -func (c *Coordinator) retry(ctx context.Context, tssProcess TssProcess, resultChn chan interface{}, excludedPeers []peer.ID) error { - coordinatorElector := c.electorFactory.CoordinatorElector(tssProcess.SessionID(), elector.Bully) - coordinator, err := coordinatorElector.Coordinator(ctx, common.ExcludePeers(tssProcess.ValidCoordinators(), excludedPeers)) +func (c *Coordinator) retry(ctx context.Context, tssProcesses []TssProcess, resultChn chan interface{}, excludedPeers []peer.ID) error { + coordinatorElector := c.electorFactory.CoordinatorElector(tssProcesses[0].SessionID(), elector.Bully) + coordinator, err := coordinatorElector.Coordinator(ctx, common.ExcludePeers(tssProcesses[0].ValidCoordinators(), excludedPeers)) if err != nil { return err } - return c.start(ctx, tssProcess, coordinator, resultChn, excludedPeers) + return c.start(ctx, tssProcesses, coordinator, resultChn, excludedPeers) } // broadcastInitiateMsg sends TssInitiateMsg to all peers @@ -228,11 +233,12 @@ func (c *Coordinator) broadcastInitiateMsg(sessionID string) { // initiate sends initiate message to all peers and waits // for ready response. After tss process declares that enough // peers are ready, start message is broadcasted and tss process is started. 
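Note: initiate and waitForStart below now fan the batch out with one goroutine per process, and both loops deliberately rebind the range variable (tssProcess := process) before the p.Go closure. Before Go 1.22, a range variable is a single reused binding, so dropping the copy could let every goroutine run the last process. A self-contained illustration of the pitfall the rebinding avoids:

package main

import (
	"fmt"
	"sync"
)

func main() {
	processes := []string{"signing-0", "signing-1", "signing-2"}
	var wg sync.WaitGroup
	for _, process := range processes {
		process := process // the rebinding used in the coordinator's loops
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Without the rebinding, pre-1.22 Go may print the last
			// name three times instead of each name once.
			fmt.Println(process)
		}()
	}
	wg.Wait()
}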
-func (c *Coordinator) initiate(ctx context.Context, tssProcess TssProcess, resultChn chan interface{}, excludedPeers []peer.ID) error { +func (c *Coordinator) initiate(ctx context.Context, tssProcesses []TssProcess, resultChn chan interface{}, excludedPeers []peer.ID) error { readyChan := make(chan *comm.WrappedMessage) readyPeers := make([]peer.ID, 0) readyPeers = append(readyPeers, c.host.ID()) + tssProcess := tssProcesses[0] subID := c.communication.Subscribe(tssProcess.SessionID(), comm.TssReadyMsg, readyChan) defer c.communication.UnSubscribe(subID) @@ -244,7 +250,7 @@ func (c *Coordinator) initiate(ctx context.Context, tssProcess TssProcess, resul case wMsg := <-readyChan: { log.Debug().Str("SessionID", tssProcess.SessionID()).Msgf("received ready message from %s", wMsg.From) - if !slices.Contains(excludedPeers, wMsg.From) { + if !slices.Contains(excludedPeers, wMsg.From) && !slices.Contains(readyPeers, wMsg.From) { readyPeers = append(readyPeers, wMsg.From) } ready, err := tssProcess.Ready(readyPeers, excludedPeers) @@ -262,7 +268,14 @@ func (c *Coordinator) initiate(ctx context.Context, tssProcess TssProcess, resul } _ = c.communication.Broadcast(c.host.Peerstore().Peers(), startMsgBytes, comm.TssStartMsg, tssProcess.SessionID()) - return tssProcess.Run(ctx, true, resultChn, startParams) + p := pool.New().WithContext(ctx).WithCancelOnError() + for _, process := range tssProcesses { + tssProcess := process + p.Go(func(ctx context.Context) error { + return tssProcess.Run(ctx, true, resultChn, startParams) + }) + } + return p.Wait() } case <-ticker.C: { @@ -280,7 +293,7 @@ func (c *Coordinator) initiate(ctx context.Context, tssProcess TssProcess, resul // when it receives the start message. func (c *Coordinator) waitForStart( ctx context.Context, - tssProcess TssProcess, + tssProcesses []TssProcess, resultChn chan interface{}, coordinator peer.ID, timeout time.Duration, @@ -288,6 +301,7 @@ func (c *Coordinator) waitForStart( msgChan := make(chan *comm.WrappedMessage) startMsgChn := make(chan *comm.WrappedMessage) + tssProcess := tssProcesses[0] initSubID := c.communication.Subscribe(tssProcess.SessionID(), comm.TssInitiateMsg, msgChan) defer c.communication.UnSubscribe(initSubID) startSubID := c.communication.Subscribe(tssProcess.SessionID(), comm.TssStartMsg, startMsgChn) @@ -327,7 +341,14 @@ func (c *Coordinator) waitForStart( return err } - return tssProcess.Run(ctx, false, resultChn, msg.Params) + p := pool.New().WithContext(ctx).WithCancelOnError() + for _, process := range tssProcesses { + tssProcess := process + p.Go(func(ctx context.Context) error { + return tssProcess.Run(ctx, false, resultChn, msg.Params) + }) + } + return p.Wait() } case <-coordinatorTimeoutTicker.C: { diff --git a/tss/ecdsa/keygen/keygen_test.go b/tss/ecdsa/keygen/keygen_test.go index 2ccb6770..2e88e5b5 100644 --- a/tss/ecdsa/keygen/keygen_test.go +++ b/tss/ecdsa/keygen/keygen_test.go @@ -50,7 +50,7 @@ func (s *KeygenTestSuite) Test_ValidKeygenProcess() { s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Times(3) pool := pool.New().WithContext(context.Background()).WithCancelOnError() for i, coordinator := range coordinators { - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], nil) }) + pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, nil) }) } err := pool.Wait() @@ -81,7 +81,7 @@ func (s *KeygenTestSuite) Test_KeygenTimeout() { s.MockECDSAStorer.EXPECT().StoreKeyshare(gomock.Any()).Times(0) pool := 
pool.New().WithContext(context.Background()) for i, coordinator := range coordinators { - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], nil) }) + pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, nil) }) } err := pool.Wait() diff --git a/tss/ecdsa/resharing/resharing.go b/tss/ecdsa/resharing/resharing.go index 3540fa15..f84b7dd2 100644 --- a/tss/ecdsa/resharing/resharing.go +++ b/tss/ecdsa/resharing/resharing.go @@ -136,7 +136,7 @@ func (r *Resharing) Run( // Stop ends all subscriptions created when starting the tss process and unlocks keyshare. func (r *Resharing) Stop() { - log.Info().Str("sessionID", r.SessionID()).Msgf("Stopping tss process.") + r.Log.Info().Msgf("Stopping tss process.") r.Communication.UnSubscribe(r.subscriptionID) r.storer.UnlockKeyshare() r.Cancel() @@ -165,7 +165,7 @@ func (r *Resharing) ValidCoordinators() []peer.ID { // StartParams returns threshold and peer subset from the old key to share with new parties. func (r *Resharing) StartParams(readyPeers []peer.ID) []byte { - oldSubset := common.PeersIntersection(r.key.Peers, r.Host.Peerstore().Peers()) + oldSubset := common.PeersIntersection(r.key.Peers, r.Host.Peerstore().Peers()) startParams := &startParams{ OldThreshold: r.key.Threshold, OldSubset: oldSubset, diff --git a/tss/ecdsa/resharing/resharing_test.go b/tss/ecdsa/resharing/resharing_test.go index 10a14203..737f813f 100644 --- a/tss/ecdsa/resharing/resharing_test.go +++ b/tss/ecdsa/resharing/resharing_test.go @@ -68,7 +68,9 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_OldAndNewSubset() { resultChn := make(chan interface{}) pool := pool.New().WithContext(context.Background()).WithCancelOnError() for i, coordinator := range coordinators { - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], resultChn) }) + pool.Go(func(ctx context.Context) error { + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) + }) } err := pool.Wait() @@ -114,7 +116,7 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_RemovePeer() { pool := pool.New().WithContext(context.Background()).WithCancelOnError() for i, coordinator := range coordinators { pool.Go(func(ctx context.Context) error { - return coordinator.Execute(ctx, processes[i], resultChn) + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) }) } @@ -164,7 +166,7 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Le pool := pool.New().WithContext(context.Background()) for i, coordinator := range coordinators { pool.Go(func(ctx context.Context) error { - return coordinator.Execute(ctx, processes[i], resultChn) + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) }) } err := pool.Wait() @@ -212,7 +214,9 @@ func (s *ResharingTestSuite) Test_InvalidResharingProcess_InvalidOldThreshold_Bi resultChn := make(chan interface{}) pool := pool.New().WithContext(context.Background()) for i, coordinator := range coordinators { - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], resultChn) }) + pool.Go(func(ctx context.Context) error { + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) + }) } err := pool.Wait() diff --git a/tss/ecdsa/signing/signing.go b/tss/ecdsa/signing/signing.go index c4a1c49b..1241d334 100644 --- a/tss/ecdsa/signing/signing.go +++ b/tss/ecdsa/signing/signing.go @@ -44,6 +44,7 @@ type Signing struct { func 
NewSigning( msg *big.Int, + messageID string, sessionID string, host host.Host, comm comm.Communication, @@ -64,7 +65,7 @@ func NewSigning( Communication: comm, Peers: key.Peers, SID: sessionID, - Log: log.With().Str("SessionID", sessionID).Str("Process", "signing").Logger(), + Log: log.With().Str("SessionID", sessionID).Str("messageID", messageID).Str("Process", "signing").Logger(), Cancel: func() {}, }, key: key, @@ -126,7 +127,7 @@ func (s *Signing) Run( p.Go(func(ctx context.Context) error { return s.processEndMessage(ctx, sigChn) }) p.Go(func(ctx context.Context) error { return s.monitorSigning(ctx) }) - s.Log.Info().Msgf("Started signing process") + s.Log.Info().Msgf("Started signing process for message %s", s.msg.Text(16)) tssError := s.Party.Start() if tssError != nil { @@ -138,7 +139,7 @@ func (s *Signing) Run( // Stop ends all subscriptions created when starting the tss process. func (s *Signing) Stop() { - log.Info().Str("sessionID", s.SessionID()).Msgf("Stopping tss process.") + s.Log.Info().Msgf("Stopping tss process.") s.Communication.UnSubscribe(s.subscriptionID) s.Cancel() } diff --git a/tss/ecdsa/signing/signing_test.go b/tss/ecdsa/signing/signing_test.go index 1ebc6e70..68e327ac 100644 --- a/tss/ecdsa/signing/signing_test.go +++ b/tss/ecdsa/signing/signing_test.go @@ -46,7 +46,7 @@ func (s *SigningTestSuite) Test_ValidSigningProcess() { msgBytes := []byte("Message") msg := big.NewInt(0) msg.SetBytes(msgBytes) - signing, err := signing.NewSigning(msg, "signing1", host, &communication, fetcher) + signing, err := signing.NewSigning(msg, "signing1", "signing1", host, &communication, fetcher) if err != nil { panic(err) } @@ -56,14 +56,14 @@ func (s *SigningTestSuite) Test_ValidSigningProcess() { } tsstest.SetupCommunication(communicationMap) - resultChn := make(chan interface{}) + resultChn := make(chan interface{}, 2) ctx, cancel := context.WithCancel(context.Background()) pool := pool.New().WithContext(ctx) for i, coordinator := range coordinators { coordinator := coordinator pool.Go(func(ctx context.Context) error { - return coordinator.Execute(ctx, processes[i], resultChn) + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) }) } @@ -96,7 +96,7 @@ func (s *SigningTestSuite) Test_SigningTimeout() { msgBytes := []byte("Message") msg := big.NewInt(0) msg.SetBytes(msgBytes) - signing, err := signing.NewSigning(msg, "signing2", host, &communication, fetcher) + signing, err := signing.NewSigning(msg, "signing2", "signing2", host, &communication, fetcher) if err != nil { panic(err) } @@ -112,7 +112,9 @@ func (s *SigningTestSuite) Test_SigningTimeout() { pool := pool.New().WithContext(context.Background()) for i, coordinator := range coordinators { coordinator := coordinator - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], resultChn) }) + pool.Go(func(ctx context.Context) error { + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) + }) } err := pool.Wait() @@ -140,8 +142,8 @@ func (s *SigningTestSuite) Test_PendingProcessExists() { s.MockECDSAStorer.EXPECT().UnlockKeyshare().AnyTimes() pool := pool.New().WithContext(context.Background()).WithCancelOnError() for i, coordinator := range coordinators { - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], nil) }) - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], nil) }) + pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, 
[]tss.TssProcess{processes[i]}, nil) }) + pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, nil) }) } err := pool.Wait() diff --git a/tss/frost/keygen/keygen_test.go b/tss/frost/keygen/keygen_test.go index 6d307630..3b80fe44 100644 --- a/tss/frost/keygen/keygen_test.go +++ b/tss/frost/keygen/keygen_test.go @@ -49,7 +49,7 @@ func (s *KeygenTestSuite) Test_ValidKeygenProcess() { pool := pool.New().WithContext(context.Background()).WithCancelOnError() for i, coordinator := range coordinators { - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], nil) }) + pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, nil) }) } err := pool.Wait() diff --git a/tss/frost/resharing/resharing.go b/tss/frost/resharing/resharing.go index f5dd9b3e..2810d422 100644 --- a/tss/frost/resharing/resharing.go +++ b/tss/frost/resharing/resharing.go @@ -57,6 +57,7 @@ func NewResharing( }, } } + key.Key.Threshold = threshold return &Resharing{ BaseFrostTss: common.BaseFrostTss{ @@ -110,7 +111,7 @@ func (r *Resharing) Run( // Stop ends all subscriptions created when starting the tss process and unlocks keyshare. func (r *Resharing) Stop() { - log.Info().Str("sessionID", r.SessionID()).Msgf("Stopping tss process.") + r.Log.Info().Msgf("Stopping tss process.") r.Communication.UnSubscribe(r.subscriptionID) r.storer.UnlockKeyshare() r.Cancel() diff --git a/tss/frost/resharing/resharing_test.go b/tss/frost/resharing/resharing_test.go index 8925c756..177b3896 100644 --- a/tss/frost/resharing/resharing_test.go +++ b/tss/frost/resharing/resharing_test.go @@ -68,7 +68,9 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_OldAndNewSubset() { resultChn := make(chan interface{}) pool := pool.New().WithContext(context.Background()).WithCancelOnError() for i, coordinator := range coordinators { - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], resultChn) }) + pool.Go(func(ctx context.Context) error { + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) + }) } err := pool.Wait() @@ -113,7 +115,9 @@ func (s *ResharingTestSuite) Test_ValidResharingProcess_RemovePeer() { resultChn := make(chan interface{}) pool := pool.New().WithContext(context.Background()).WithCancelOnError() for i, coordinator := range coordinators { - pool.Go(func(ctx context.Context) error { return coordinator.Execute(ctx, processes[i], resultChn) }) + pool.Go(func(ctx context.Context) error { + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) + }) } err := pool.Wait() diff --git a/tss/frost/signing/signing.go b/tss/frost/signing/signing.go index 4d3ee992..c4f07f46 100644 --- a/tss/frost/signing/signing.go +++ b/tss/frost/signing/signing.go @@ -52,6 +52,7 @@ func NewSigning( id int, msg *big.Int, tweak string, + messageID string, sessionID string, host host.Host, comm comm.Communication, @@ -85,7 +86,7 @@ func NewSigning( Communication: comm, Peers: key.Peers, SID: sessionID, - Log: log.With().Str("SessionID", sessionID).Str("Process", "signing").Logger(), + Log: log.With().Str("SessionID", sessionID).Str("messageID", messageID).Str("Process", "signing").Logger(), Cancel: func() {}, Done: make(chan bool), }, @@ -135,13 +136,13 @@ func (s *Signing) Run( p.Go(func(ctx context.Context) error { return s.processEndMessage(ctx) }) p.Go(func(ctx context.Context) error { return s.ProcessOutboundMessages(ctx, outChn, comm.TssKeySignMsg) 
}) - s.Log.Info().Msgf("Started signing process") + s.Log.Info().Msgf("Started signing process for message %s", s.msg.Text(16)) return p.Wait() } // Stop ends all subscriptions created when starting the tss process. func (s *Signing) Stop() { - log.Info().Str("sessionID", s.SessionID()).Msgf("Stopping tss process.") + s.Log.Info().Msgf("Stopping tss process.") s.Communication.UnSubscribe(s.subscriptionID) s.Cancel() } diff --git a/tss/frost/signing/signing_test.go b/tss/frost/signing/signing_test.go index b7feb435..ae8b3e56 100644 --- a/tss/frost/signing/signing_test.go +++ b/tss/frost/signing/signing_test.go @@ -60,7 +60,7 @@ func (s *SigningTestSuite) Test_ValidSigningProcess() { communicationMap[host.ID()] = &communication fetcher := keyshare.NewFrostKeyshareStore(fmt.Sprintf("../../test/keyshares/%d-frost.keyshare", i)) - signing, err := signing.NewSigning(1, msg, tweak, "signing1", host, &communication, fetcher) + signing, err := signing.NewSigning(1, msg, tweak, "signing1", "signing1", host, &communication, fetcher) if err != nil { panic(err) } @@ -70,14 +70,14 @@ func (s *SigningTestSuite) Test_ValidSigningProcess() { } tsstest.SetupCommunication(communicationMap) - resultChn := make(chan interface{}) + resultChn := make(chan interface{}, 2) ctx, cancel := context.WithCancel(context.Background()) pool := pool.New().WithContext(ctx) for i, coordinator := range coordinators { coordinator := coordinator pool.Go(func(ctx context.Context) error { - return coordinator.Execute(ctx, processes[i], resultChn) + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) }) } @@ -92,6 +92,74 @@ func (s *SigningTestSuite) Test_ValidSigningProcess() { s.Nil(err) } +func (s *SigningTestSuite) Test_MultipleProcesses() { + communicationMap := make(map[peer.ID]*tsstest.TestCommunication) + coordinators := []*tss.Coordinator{} + processes := [][]tss.TssProcess{} + + tweak := "c82aa6ae534bb28aaafeb3660c31d6a52e187d8f05d48bb6bdb9b733a9b42212" + tweakBytes, err := hex.DecodeString(tweak) + s.Nil(err) + h := &curve.Secp256k1Scalar{} + err = h.UnmarshalBinary(tweakBytes) + s.Nil(err) + + msgBytes := []byte("Message") + msg := big.NewInt(0) + msg.SetBytes(msgBytes) + for i, host := range s.Hosts { + communication := tsstest.TestCommunication{ + Host: host, + Subscriptions: make(map[comm.SubscriptionID]chan *comm.WrappedMessage), + } + communicationMap[host.ID()] = &communication + fetcher := keyshare.NewFrostKeyshareStore(fmt.Sprintf("../../test/keyshares/%d-frost.keyshare", i)) + + signing1, err := signing.NewSigning(1, msg, tweak, "signing1", "signing1", host, &communication, fetcher) + if err != nil { + panic(err) + } + signing2, err := signing.NewSigning(1, msg, tweak, "signing1", "signing2", host, &communication, fetcher) + if err != nil { + panic(err) + } + signing3, err := signing.NewSigning(1, msg, tweak, "signing1", "signing3", host, &communication, fetcher) + if err != nil { + panic(err) + } + electorFactory := elector.NewCoordinatorElectorFactory(host, s.BullyConfig) + coordinator := tss.NewCoordinator(host, &communication, electorFactory) + coordinators = append(coordinators, coordinator) + processes = append(processes, []tss.TssProcess{signing1, signing2, signing3}) + } + tsstest.SetupCommunication(communicationMap) + + resultChn := make(chan interface{}, 6) + ctx, cancel := context.WithCancel(context.Background()) + pool := pool.New().WithContext(ctx) + for i, coordinator := range coordinators { + coordinator := coordinator + + pool.Go(func(ctx context.Context) error { + return 
coordinator.Execute(ctx, processes[i], resultChn) + }) + } + + results := make([]signing.Signature, 6) + i := 0 + for result := range resultChn { + sig := result.(signing.Signature) + results[i] = sig + i++ + if i == 5 { + break + } + } + err = pool.Wait() + s.NotNil(err) + cancel() +} + func (s *SigningTestSuite) Test_ProcessTimeout() { communicationMap := make(map[peer.ID]*tsstest.TestCommunication) coordinators := []*tss.Coordinator{} @@ -115,7 +183,7 @@ func (s *SigningTestSuite) Test_ProcessTimeout() { communicationMap[host.ID()] = &communication fetcher := keyshare.NewFrostKeyshareStore(fmt.Sprintf("../../test/keyshares/%d-frost.keyshare", i)) - signing, err := signing.NewSigning(1, msg, tweak, "signing1", host, &communication, fetcher) + signing, err := signing.NewSigning(1, msg, tweak, "signing1", "signing1", host, &communication, fetcher) if err != nil { panic(err) } @@ -134,7 +202,7 @@ func (s *SigningTestSuite) Test_ProcessTimeout() { for i, coordinator := range coordinators { coordinator := coordinator pool.Go(func(ctx context.Context) error { - return coordinator.Execute(ctx, processes[i], resultChn) + return coordinator.Execute(ctx, []tss.TssProcess{processes[i]}, resultChn) }) } diff --git a/tss/test/communication.go b/tss/test/communication.go index b01faeca..0f694408 100644 --- a/tss/test/communication.go +++ b/tss/test/communication.go @@ -6,6 +6,7 @@ package tsstest import ( "fmt" "sync" + "time" "github.com/ChainSafe/sygma-relayer/comm" "github.com/libp2p/go-libp2p/core/host" @@ -35,6 +36,8 @@ func (tc *TestCommunication) Broadcast( Payload: msg, From: tc.Host.ID(), } + + time.Sleep(100 * time.Millisecond) for _, peer := range peers { if tc.PeerCommunications[peer.Pretty()] == nil { continue
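A closing note on determinism, which much of the int64-to-uint64 churn above protects: the new fee formula in chains/btc/executor/executor.go pins the mempool quote to the smallest multiple of FEE_ROUNDING_FACTOR strictly above it, so relayers that poll the fee API at slightly different moments still assemble identical transactions to sign. A worked sketch of the arithmetic, using standalone renamed copies of the constants from this diff:

package main

import "fmt"

const (
	inputSize         uint64 = 180 // INPUT_SIZE in the diff
	outputSize        uint64 = 34  // OUTPUT_SIZE in the diff
	feeRoundingFactor uint64 = 5   // FEE_ROUNDING_FACTOR in the diff
)

// fee mirrors Executor.fee: a transaction size estimate multiplied by the
// economy rate rounded up to the next multiple of feeRoundingFactor.
func fee(numInputs, numOutputs, economyFee uint64) uint64 {
	rate := (economyFee/feeRoundingFactor)*feeRoundingFactor + feeRoundingFactor
	return (numInputs*inputSize + numOutputs*outputSize) * rate
}

func main() {
	// Relayers quoting 11 and 14 sat/vB both settle on a rate of 15 and
	// therefore compute the same fee: (2*180 + 3*34) * 15 = 6930.
	fmt.Println(fee(2, 3, 11), fee(2, 3, 14))
}

The same concern motivates the chains/proposal.go change: math.NewHexOrDecimal256(int64(nonce)) wraps negative for nonces above 2^63-1 (the updated test nonce 15078986465725403975 is one such value), while new(big.Int).SetUint64 preserves it. Likewise, the tie-broken UTXO sort in chains/btc/mempool/mempool.go keeps input ordering stable across relayers when block times collide.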