Skip to content

Commit

Permalink
Merge branch 'main' into mumbai_pipeline_testnet
Browse files Browse the repository at this point in the history
  • Loading branch information
eedygreen authored Oct 4, 2023
2 parents 8cc5f1e + 67a5ae7 commit da9326e
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 345 deletions.
4 changes: 2 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,12 @@ func Run() error {
tssListener := events.NewListener(client)
eventHandlers := make([]coreListener.EventHandler, 0)
l := log.With().Str("chain", fmt.Sprintf("%v", config.GeneralChainConfig.Name)).Uint8("domainID", *config.GeneralChainConfig.Id)
eventHandlers = append(eventHandlers, listener.NewDepositEventHandler(l, depositListener, depositHandler, bridgeAddress, *config.GeneralChainConfig.Id))
eventHandlers = append(eventHandlers, coreListener.NewDepositEventHandler(depositListener, depositHandler, bridgeAddress, *config.GeneralChainConfig.Id))
eventHandlers = append(eventHandlers, listener.NewKeygenEventHandler(l, tssListener, coordinator, host, communication, keyshareStore, bridgeAddress, networkTopology.Threshold))
eventHandlers = append(eventHandlers, listener.NewRefreshEventHandler(l, topologyProvider, topologyStore, tssListener, coordinator, host, communication, connectionGate, keyshareStore, bridgeAddress))
eventHandlers = append(eventHandlers, listener.NewRetryEventHandler(l, tssListener, depositHandler, bridgeAddress, *config.GeneralChainConfig.Id, config.BlockConfirmations))
evmListener := coreListener.NewEVMListener(client, eventHandlers, blockstore, sygmaMetrics, *config.GeneralChainConfig.Id, config.BlockRetryInterval, config.BlockConfirmations, config.BlockInterval)
executor := executor.NewExecutor(host, communication, coordinator, mh, bridgeContract, keyshareStore, exitLock)
executor := executor.NewExecutor(host, communication, coordinator, mh, bridgeContract, keyshareStore, exitLock, config.GasLimit.Uint64())

chain := evm.NewEVMChain(
client, evmListener, executor, blockstore, *config.GeneralChainConfig.Id, config.StartBlock,
Expand Down
2 changes: 1 addition & 1 deletion chains/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type RawEVMConfig struct {
MaxGasPrice int64 `mapstructure:"maxGasPrice" default:"500000000000"`
GasMultiplier float64 `mapstructure:"gasMultiplier" default:"1"`
GasIncreasePercentage int64 `mapstructure:"gasIncreasePercentage" default:"15"`
GasLimit int64 `mapstructure:"gasLimit" default:"2000000"`
GasLimit int64 `mapstructure:"gasLimit" default:"15000000"`
StartBlock int64 `mapstructure:"startBlock"`
BlockConfirmations int64 `mapstructure:"blockConfirmations" default:"10"`
BlockInterval int64 `mapstructure:"blockInterval" default:"5"`
Expand Down
2 changes: 1 addition & 1 deletion chains/evm/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *NewEVMConfigTestSuite) Test_ValidConfig() {
Id: id,
},
Bridge: "bridgeAddress",
GasLimit: big.NewInt(2000000),
GasLimit: big.NewInt(15000000),
MaxGasPrice: big.NewInt(500000000000),
GasMultiplier: big.NewFloat(1),
GasIncreasePercentage: big.NewInt(15),
Expand Down
192 changes: 116 additions & 76 deletions chains/evm/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import (

const TRANSFER_GAS_COST = 200000

type Batch struct {
proposals []*chains.Proposal
gasLimit uint64
}

var (
executionCheckPeriod = time.Minute
signingTimeout = 30 * time.Minute
Expand All @@ -44,13 +49,14 @@ type BridgeContract interface {
}

type Executor struct {
coordinator *tss.Coordinator
host host.Host
comm comm.Communication
fetcher signing.SaveDataFetcher
bridge BridgeContract
mh MessageHandler
exitLock *sync.RWMutex
coordinator *tss.Coordinator
host host.Host
comm comm.Communication
fetcher signing.SaveDataFetcher
bridge BridgeContract
mh MessageHandler
exitLock *sync.RWMutex
transactionMaxGas uint64
}

func NewExecutor(
Expand All @@ -61,15 +67,17 @@ func NewExecutor(
bridgeContract BridgeContract,
fetcher signing.SaveDataFetcher,
exitLock *sync.RWMutex,
transactionMaxGas uint64,
) *Executor {
return &Executor{
host: host,
comm: comm,
coordinator: coordinator,
mh: mh,
bridge: bridgeContract,
fetcher: fetcher,
exitLock: exitLock,
host: host,
comm: comm,
coordinator: coordinator,
mh: mh,
bridge: bridgeContract,
fetcher: fetcher,
exitLock: exitLock,
transactionMaxGas: transactionMaxGas,
}
}

Expand All @@ -78,63 +86,57 @@ func (e *Executor) Execute(msgs []*message.Message) error {
e.exitLock.RLock()
defer e.exitLock.RUnlock()

proposals := make([]*chains.Proposal, 0)
for _, m := range msgs {
prop, err := e.mh.HandleMessage(m)
if err != nil {
return err
}
evmProposal := chains.NewProposal(prop.Source, prop.Destination, prop.DepositNonce, prop.ResourceId, prop.Data, prop.Metadata)
isExecuted, err := e.bridge.IsProposalExecuted(evmProposal)
if err != nil {
return err
}
if isExecuted {
log.Info().Msgf("Prop %p already executed", prop)
continue
}

proposals = append(proposals, evmProposal)
}
if len(proposals) == 0 {
return nil
}

propHash, err := e.bridge.ProposalsHash(proposals)
batches, err := e.proposalBatches(msgs)
if err != nil {
return err
}

sessionID := e.sessionID(propHash)
msg := big.NewInt(0)
msg.SetBytes(propHash)
signing, err := signing.NewSigning(
msg,
e.sessionID(propHash),
e.host,
e.comm,
e.fetcher)
if err != nil {
return err
}

sigChn := make(chan interface{})
executionContext, cancelExecution := context.WithCancel(context.Background())
watchContext, cancelWatch := context.WithCancel(context.Background())
pool := pool.New().WithErrors()
pool.Go(func() error {
err := e.coordinator.Execute(executionContext, signing, sigChn)
if err != nil {
cancelWatch()
p := pool.New().WithErrors()
for _, batch := range batches {
if len(batch.proposals) == 0 {
continue
}

return err
})
pool.Go(func() error { return e.watchExecution(watchContext, cancelExecution, proposals, sigChn, sessionID) })
return pool.Wait()
b := batch
p.Go(func() error {
propHash, err := e.bridge.ProposalsHash(b.proposals)
if err != nil {
return err
}

sessionID := e.sessionID(propHash)
msg := big.NewInt(0)
msg.SetBytes(propHash)
signing, err := signing.NewSigning(
msg,
e.sessionID(propHash),
e.host,
e.comm,
e.fetcher)
if err != nil {
return err
}

sigChn := make(chan interface{})
executionContext, cancelExecution := context.WithCancel(context.Background())
watchContext, cancelWatch := context.WithCancel(context.Background())
ep := pool.New().WithErrors()
ep.Go(func() error {
err := e.coordinator.Execute(executionContext, signing, sigChn)
if err != nil {
cancelWatch()
}

return err
})
ep.Go(func() error { return e.watchExecution(watchContext, cancelExecution, b, sigChn, sessionID) })
return ep.Wait()
})
}
return p.Wait()
}

func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, proposals []*chains.Proposal, sigChn chan interface{}, sessionID string) error {
func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, batch *Batch, sigChn chan interface{}, sessionID string) error {
ticker := time.NewTicker(executionCheckPeriod)
timeout := time.NewTicker(signingTimeout)
defer ticker.Stop()
Expand All @@ -151,7 +153,7 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}

signatureData := sigResult.(*common.SignatureData)
hash, err := e.executeProposal(proposals, signatureData)
hash, err := e.executeBatch(batch, signatureData)
if err != nil {
_ = e.comm.Broadcast(e.host.Peerstore().Peers(), []byte{}, comm.TssFailMsg, sessionID)
return err
Expand All @@ -161,7 +163,7 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}
case <-ticker.C:
{
if !e.areProposalsExecuted(proposals, sessionID) {
if !e.areProposalsExecuted(batch.proposals, sessionID) {
continue
}

Expand All @@ -180,23 +182,61 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}
}

func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *common.SignatureData) (*ethCommon.Hash, error) {
func (e *Executor) proposalBatches(msgs []*message.Message) ([]*Batch, error) {
batches := make([]*Batch, 1)
currentBatch := &Batch{
proposals: make([]*chains.Proposal, 0),
gasLimit: 0,
}
batches[0] = currentBatch

for _, m := range msgs {
prop, err := e.mh.HandleMessage(m)
if err != nil {
return nil, err
}

evmProposal := chains.NewProposal(prop.Source, prop.Destination, prop.DepositNonce, prop.ResourceId, prop.Data, prop.Metadata)
isExecuted, err := e.bridge.IsProposalExecuted(evmProposal)
if err != nil {
return nil, err
}
if isExecuted {
log.Info().Msgf("Proposal %p already executed", prop)
continue
}

var propGasLimit uint64
l, ok := evmProposal.Metadata.Data["gasLimit"]
if ok {
propGasLimit = l.(uint64)
} else {
propGasLimit = uint64(TRANSFER_GAS_COST)
}
currentBatch.gasLimit += propGasLimit
if currentBatch.gasLimit >= e.transactionMaxGas {
currentBatch = &Batch{
proposals: make([]*chains.Proposal, 0),
gasLimit: 0,
}
batches = append(batches, currentBatch)
}

currentBatch.proposals = append(currentBatch.proposals, evmProposal)
}

return batches, nil
}

func (e *Executor) executeBatch(batch *Batch, signatureData *common.SignatureData) (*ethCommon.Hash, error) {
sig := []byte{}
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.R, 32)...)
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.S, 32)...)
sig = append(sig[:], signatureData.SignatureRecovery...)
sig[len(sig)-1] += 27 // Transform V from 0/1 to 27/28

var gasLimit uint64
l, ok := proposals[0].Metadata.Data["gasLimit"]
if ok {
gasLimit = l.(uint64)
} else {
gasLimit = uint64(TRANSFER_GAS_COST * len(proposals))
}

hash, err := e.bridge.ExecuteProposals(proposals, sig, transactor.TransactOptions{
GasLimit: gasLimit,
hash, err := e.bridge.ExecuteProposals(batch.proposals, sig, transactor.TransactOptions{
GasLimit: batch.gasLimit,
})
if err != nil {
return nil, err
Expand Down
72 changes: 0 additions & 72 deletions chains/evm/listener/event-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,78 +36,6 @@ type EventListener interface {
FetchDepositEvent(event hubEvents.RetryEvent, bridgeAddress common.Address, blockConfirmations *big.Int) ([]events.Deposit, error)
}

type DepositEventHandler struct {
log zerolog.Logger
eventListener listener.EventListener
depositHandler listener.DepositHandler

bridgeAddress common.Address
domainID uint8
}

func NewDepositEventHandler(
logC zerolog.Context,
eventListener listener.EventListener,
depositHandler listener.DepositHandler,
bridgeAddress common.Address,
domainID uint8,
) *DepositEventHandler {
return &DepositEventHandler{
log: logC.Logger(),
eventListener: eventListener,
depositHandler: depositHandler,
bridgeAddress: bridgeAddress,
domainID: domainID,
}
}

func (eh *DepositEventHandler) HandleEvent(
startBlock *big.Int,
endBlock *big.Int,
msgChan chan []*message.Message,
) error {
deposits, err := eh.eventListener.FetchDeposits(context.Background(), eh.bridgeAddress, startBlock, endBlock)
if err != nil {
return fmt.Errorf("unable to fetch deposit events because of: %+v", err)
}

domainDeposits := make(map[uint8][]*message.Message)
for _, d := range deposits {
func(d *events.Deposit) {
defer func() {
if r := recover(); r != nil {
eh.log.Error().Err(err).Msgf("panic occured while handling deposit %+v", d)
}
}()

m, err := eh.depositHandler.HandleDeposit(
eh.domainID, d.DestinationDomainID, d.DepositNonce, d.ResourceID, d.Data, d.HandlerResponse,
)
if err != nil {
eh.log.Error().Err(err).Str("start block", startBlock.String()).Str(
"end block", endBlock.String(),
).Uint8("domainID", eh.domainID).Msgf("%v", err)
return
}

eh.log.Info().Msgf("Resolved deposit message %+v in block range: %s-%s", m, startBlock.String(), endBlock.String())

if m.Type == PermissionlessGenericTransfer {
msgChan <- []*message.Message{m}
return
}

domainDeposits[m.Destination] = append(domainDeposits[m.Destination], m)
}(d)
}

for _, deposits := range domainDeposits {
msgChan <- deposits
}

return nil
}

type RetryEventHandler struct {
log zerolog.Logger
eventListener EventListener
Expand Down
Loading

0 comments on commit da9326e

Please sign in to comment.