Skip to content

Commit

Permalink
Merge branch 'main' into mpetrun5/retry-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrun5 committed Jul 19, 2024
2 parents 1150292 + a966428 commit 21c9f23
Show file tree
Hide file tree
Showing 24 changed files with 299 additions and 135 deletions.
111 changes: 66 additions & 45 deletions chains/btc/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
var (
signingTimeout = 30 * time.Minute

INPUT_SIZE = 180
OUTPUT_SIZE = 34
INPUT_SIZE uint64 = 180
OUTPUT_SIZE uint64 = 34
FEE_ROUNDING_FACTOR uint64 = 5
)

type MempoolAPI interface {
Expand Down Expand Up @@ -88,8 +89,8 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error {
e.exitLock.RLock()
defer e.exitLock.RUnlock()

sessionID := proposals[0].MessageID
props, err := e.proposalsForExecution(proposals)
messageID := proposals[0].MessageID
props, err := e.proposalsForExecution(proposals, messageID)
if err != nil {
return err
}
Expand All @@ -113,27 +114,29 @@ func (e *Executor) Execute(proposals []*proposal.Proposal) error {
return fmt.Errorf("no resource for ID %s", hex.EncodeToString(resourceID[:]))
}

sessionID := fmt.Sprintf("%s-%s", sessionID, hex.EncodeToString(resourceID[:]))
return e.executeResourceProps(props, resource, sessionID)
return e.executeResourceProps(props, resource, messageID)
})
}
return p.Wait()
}

func (e *Executor) executeResourceProps(props []*BtcTransferProposal, resource config.Resource, sessionID string) error {
log.Info().Str("SessionID", sessionID).Msgf("Executing proposals for resource %s", hex.EncodeToString(resource.ResourceID[:]))
func (e *Executor) executeResourceProps(props []*BtcTransferProposal, resource config.Resource, messageID string) error {
log.Info().Str("messageID", messageID).Msgf("Executing proposals %+v for resource %s", props, hex.EncodeToString(resource.ResourceID[:]))

tx, utxos, err := e.rawTx(props, resource)
if err != nil {
return err
}

sigChn := make(chan interface{})
sigChn := make(chan interface{}, len(tx.TxIn))
p := pool.New().WithErrors()
executionContext, cancelExecution := context.WithCancel(context.Background())
watchContext, cancelWatch := context.WithCancel(context.Background())
sessionID := fmt.Sprintf("%s-%s", messageID, hex.EncodeToString(resource.ResourceID[:]))
defer cancelWatch()
p.Go(func() error { return e.watchExecution(watchContext, cancelExecution, tx, props, sigChn, sessionID) })
p.Go(func() error {
return e.watchExecution(watchContext, cancelExecution, tx, props, sigChn, sessionID, messageID)
})
prevOuts := make(map[wire.OutPoint]*wire.TxOut)
for _, utxo := range utxos {
txOut := wire.NewTxOut(int64(utxo.Value), resource.Script)
Expand All @@ -146,33 +149,49 @@ func (e *Executor) executeResourceProps(props []*BtcTransferProposal, resource c
prevOutputFetcher := txscript.NewMultiPrevOutFetcher(prevOuts)
sigHashes := txscript.NewTxSigHashes(tx, prevOutputFetcher)

var buf buffer.Buffer
_ = tx.Serialize(&buf)
bytes := buf.Bytes()
log.Info().Str("messageID", messageID).Msgf("Assembled raw unsigned transaction %s", hex.EncodeToString(bytes))

// we need to sign each input individually
tssProcesses := make([]tss.TssProcess, len(tx.TxIn))
for i := range tx.TxIn {
sessionID := fmt.Sprintf("%s-%d", sessionID, i)
txHash, err := txscript.CalcTaprootSignatureHash(sigHashes, txscript.SigHashDefault, tx, i, prevOutputFetcher)
if err != nil {
return err
}
p.Go(func() error {
msg := new(big.Int)
msg.SetBytes(txHash[:])
signing, err := signing.NewSigning(
i,
msg,
resource.Tweak,
fmt.Sprintf("%s-%d", sessionID, i),
e.host,
e.comm,
e.fetcher)
if err != nil {
return err
}
return e.coordinator.Execute(executionContext, signing, sigChn)
})
msg := new(big.Int)
msg.SetBytes(txHash[:])
signing, err := signing.NewSigning(
i,
msg,
resource.Tweak,
messageID,
sessionID,
e.host,
e.comm,
e.fetcher)
if err != nil {
return err
}
tssProcesses[i] = signing
}
p.Go(func() error {
return e.coordinator.Execute(executionContext, tssProcesses, sigChn)
})
return p.Wait()
}

func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, tx *wire.MsgTx, proposals []*BtcTransferProposal, sigChn chan interface{}, sessionID string) error {
func (e *Executor) watchExecution(
ctx context.Context,
cancelExecution context.CancelFunc,
tx *wire.MsgTx,
proposals []*BtcTransferProposal,
sigChn chan interface{},
sessionID string,
messageID string) error {
timeout := time.NewTicker(signingTimeout)
defer timeout.Stop()
defer cancelExecution()
Expand All @@ -192,15 +211,16 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}
cancelExecution()

hash, err := e.sendTx(tx, signatures)
hash, err := e.sendTx(tx, signatures, messageID)
if err != nil {
_ = e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
e.storeProposalsStatus(proposals, store.FailedProp)
return err
}

e.storeProposalsStatus(proposals, store.ExecutedProp)
log.Info().Str("SessionID", sessionID).Msgf("Sent proposals execution with hash: %s", hash)
log.Info().Str("messageID", messageID).Msgf("Sent proposals execution with hash: %s", hash)
return nil
}
case <-timeout.C:
{
Expand All @@ -220,7 +240,7 @@ func (e *Executor) rawTx(proposals []*BtcTransferProposal, resource config.Resou
if err != nil {
return nil, nil, err
}
feeEstimate, err := e.fee(int64(len(proposals)), int64(len(proposals)))
feeEstimate, err := e.fee(uint64(len(proposals)), uint64(len(proposals)))
if err != nil {
return nil, nil, err
}
Expand All @@ -231,26 +251,26 @@ func (e *Executor) rawTx(proposals []*BtcTransferProposal, resource config.Resou
if inputAmount < outputAmount {
return nil, nil, fmt.Errorf("utxo input amount %d less than output amount %d", inputAmount, outputAmount)
}
fee, err := e.fee(int64(len(utxos)), int64(len(proposals))+1)
fee, err := e.fee(uint64(len(utxos)), uint64(len(proposals))+1)
if err != nil {
return nil, nil, err
}

returnAmount := int64(inputAmount) - fee - outputAmount
returnAmount := inputAmount - fee - outputAmount
if returnAmount > 0 {
// return extra funds
returnScript, err := txscript.PayToAddrScript(resource.Address)
if err != nil {
return nil, nil, err
}
txOut := wire.NewTxOut(returnAmount, returnScript)
txOut := wire.NewTxOut(int64(returnAmount), returnScript)
tx.AddTxOut(txOut)
}
return tx, utxos, err
}

func (e *Executor) outputs(tx *wire.MsgTx, proposals []*BtcTransferProposal) (int64, error) {
outputAmount := int64(0)
func (e *Executor) outputs(tx *wire.MsgTx, proposals []*BtcTransferProposal) (uint64, error) {
outputAmount := uint64(0)
for _, prop := range proposals {
addr, err := btcutil.DecodeAddress(prop.Data.Recipient, &e.chainCfg)
if err != nil {
Expand All @@ -260,16 +280,16 @@ func (e *Executor) outputs(tx *wire.MsgTx, proposals []*BtcTransferProposal) (in
if err != nil {
return 0, err
}
txOut := wire.NewTxOut(prop.Data.Amount, destinationAddrByte)
txOut := wire.NewTxOut(int64(prop.Data.Amount), destinationAddrByte)
tx.AddTxOut(txOut)
outputAmount += prop.Data.Amount
}
return outputAmount, nil
}

func (e *Executor) inputs(tx *wire.MsgTx, address btcutil.Address, outputAmount int64) (int64, []mempool.Utxo, error) {
func (e *Executor) inputs(tx *wire.MsgTx, address btcutil.Address, outputAmount uint64) (uint64, []mempool.Utxo, error) {
usedUtxos := make([]mempool.Utxo, 0)
inputAmount := int64(0)
inputAmount := uint64(0)
utxos, err := e.mempool.Utxos(address.String())
if err != nil {
return 0, nil, err
Expand All @@ -284,23 +304,24 @@ func (e *Executor) inputs(tx *wire.MsgTx, address btcutil.Address, outputAmount
tx.AddTxIn(txIn)

usedUtxos = append(usedUtxos, utxo)
inputAmount += int64(utxo.Value)
inputAmount += uint64(utxo.Value)
if inputAmount > outputAmount {
break
}
}
return inputAmount, usedUtxos, nil
}

func (e *Executor) fee(numOfInputs, numOfOutputs int64) (int64, error) {
func (e *Executor) fee(numOfInputs, numOfOutputs uint64) (uint64, error) {
recommendedFee, err := e.mempool.RecommendedFee()
if err != nil {
return 0, err
}
return (numOfInputs*int64(INPUT_SIZE) + numOfOutputs*int64(OUTPUT_SIZE)) * recommendedFee.EconomyFee, nil

return (numOfInputs*INPUT_SIZE + numOfOutputs*OUTPUT_SIZE) * ((recommendedFee.EconomyFee/FEE_ROUNDING_FACTOR)*FEE_ROUNDING_FACTOR + FEE_ROUNDING_FACTOR), nil
}

func (e *Executor) sendTx(tx *wire.MsgTx, signatures []taproot.Signature) (*chainhash.Hash, error) {
func (e *Executor) sendTx(tx *wire.MsgTx, signatures []taproot.Signature, messageID string) (*chainhash.Hash, error) {
for i, sig := range signatures {
tx.TxIn[i].Witness = wire.TxWitness{sig}
}
Expand All @@ -311,7 +332,7 @@ func (e *Executor) sendTx(tx *wire.MsgTx, signatures []taproot.Signature) (*chai
return nil, err
}
bytes := buf.Bytes()
log.Debug().Msgf("Assembled raw transaction %s", hex.EncodeToString(bytes))
log.Debug().Str("messageID", messageID).Msgf("Assembled raw transaction %s", hex.EncodeToString(bytes))
return e.conn.SendRawTransaction(tx, true)
}

Expand All @@ -325,7 +346,7 @@ func (e *Executor) signaturesFilled(signatures []taproot.Signature) bool {
return true
}

func (e *Executor) proposalsForExecution(proposals []*proposal.Proposal) ([]*BtcTransferProposal, error) {
func (e *Executor) proposalsForExecution(proposals []*proposal.Proposal, messageID string) ([]*BtcTransferProposal, error) {
e.propMutex.Lock()
props := make([]*BtcTransferProposal, 0)
for _, prop := range proposals {
Expand All @@ -335,7 +356,7 @@ func (e *Executor) proposalsForExecution(proposals []*proposal.Proposal) ([]*Btc
}

if executed {
log.Info().Msgf("Proposal %s already executed", fmt.Sprintf("%d-%d-%d", prop.Source, prop.Destination, prop.Data.(BtcTransferProposalData).DepositNonce))
log.Warn().Str("messageID", messageID).Msgf("Proposal %s already executed", fmt.Sprintf("%d-%d-%d", prop.Source, prop.Destination, prop.Data.(BtcTransferProposalData).DepositNonce))
continue
}

Expand Down
4 changes: 2 additions & 2 deletions chains/btc/executor/message-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

type BtcTransferProposalData struct {
Amount int64
Amount uint64
Recipient string
DepositNonce uint64
ResourceId [32]byte
Expand Down Expand Up @@ -69,7 +69,7 @@ func ERC20MessageHandler(msg *transfer.TransferMessage) (*proposal.Proposal, err
bigAmount.Div(bigAmount, divisor)

return proposal.NewProposal(msg.Source, msg.Destination, BtcTransferProposalData{
Amount: bigAmount.Int64(),
Amount: bigAmount.Uint64(),
Recipient: string(recipient),
DepositNonce: msg.Data.DepositNonce,
ResourceId: msg.Data.ResourceId,
Expand Down
3 changes: 2 additions & 1 deletion chains/btc/listener/deposit-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ func NewBtcDepositHandler() *BtcDepositHandler {
return &BtcDepositHandler{}
}

func (e *BtcDepositHandler) HandleDeposit(sourceID uint8,
func (e *BtcDepositHandler) HandleDeposit(
sourceID uint8,
depositNonce uint64,
resourceID [32]byte,
amount *big.Int,
Expand Down
16 changes: 10 additions & 6 deletions chains/btc/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ type Utxo struct {
}

type Fee struct {
FastestFee int64
HalfHourFee int64
MinimumFee int64
EconomyFee int64
HourFee int64
FastestFee uint64
HalfHourFee uint64
MinimumFee uint64
EconomyFee uint64
HourFee uint64
}

type MempoolAPI struct {
Expand Down Expand Up @@ -77,7 +77,11 @@ func (a *MempoolAPI) Utxos(address string) ([]Utxo, error) {
return nil, err
}
sort.Slice(utxos, func(i int, j int) bool {
return utxos[i].Status.BlockTime < utxos[j].Status.BlockTime
if utxos[i].Status.BlockTime == utxos[j].Status.BlockTime {
return utxos[i].TxID < utxos[j].TxID
} else {
return utxos[i].Status.BlockTime < utxos[j].Status.BlockTime
}
})

return utxos, nil
Expand Down
11 changes: 11 additions & 0 deletions chains/btc/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ func (s *MempoolTestSuite) Test_Utxo_SuccessfulFetch() {
BlockTime: 1715083122,
},
},
{
TxID: "28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe2",
Vout: 0,
Value: 11197,
Status: mempool.Status{
Confirmed: true,
BlockHeight: 2812826,
BlockHash: "000000000000001a01d4058773384f2c23aed5a7e5ede252f99e290fa58324a3",
BlockTime: 1715083122,
},
},
{
TxID: "28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe9",
Vout: 0,
Expand Down
1 change: 1 addition & 0 deletions chains/btc/mempool/test-data/successful-utxo.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[
{"txid":"28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe9","vout":0,"status":{"confirmed":true,"block_height":2812827,"block_hash":"000000000000001a01d4058773384f2c23aed5a7e5ede252f99e290fa58324a5","block_time":1715083123},"value":11198},
{"txid":"28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe2","vout":0,"status":{"confirmed":true,"block_height":2812826,"block_hash":"000000000000001a01d4058773384f2c23aed5a7e5ede252f99e290fa58324a3","block_time":1715083122},"value":11197},
{"txid":"28154e2008912d27978225c096c22ffe2ea65e1d55bf440ee41c21f9489c7fe1","vout":0,"status":{"confirmed":true,"block_height":2812826,"block_hash":"000000000000001a01d4058773384f2c23aed5a7e5ede252f99e290fa58324a3","block_time":1715083122},"value":11197}
]
2 changes: 1 addition & 1 deletion chains/evm/calls/contracts/bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (c *BridgeContract) IsProposalExecuted(p *transfer.TransferProposal) (bool,
Str("depositNonce", strconv.FormatUint(p.Data.DepositNonce, 10)).
Str("resourceID", hexutil.Encode(p.Data.ResourceId[:])).
Msg("Getting is proposal executed")
res, err := c.CallContract("isProposalExecuted", p.Source, big.NewInt(int64(p.Data.DepositNonce)))
res, err := c.CallContract("isProposalExecuted", p.Source, new(big.Int).SetUint64(p.Data.DepositNonce))
if err != nil {
return false, err
}
Expand Down
Loading

0 comments on commit 21c9f23

Please sign in to comment.