Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Substrate message handling refactor #246

Merged
merged 13 commits into from
Jan 22, 2024
30 changes: 13 additions & 17 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"github.com/ChainSafe/chainbridge-core/logger"
"github.com/ChainSafe/chainbridge-core/lvldb"
"github.com/ChainSafe/chainbridge-core/opentelemetry"
"github.com/ChainSafe/chainbridge-core/relayer"
"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/ChainSafe/chainbridge-core/store"
"github.com/ChainSafe/sygma-relayer/chains/evm"
Expand All @@ -30,6 +29,8 @@
"github.com/ChainSafe/sygma-relayer/chains/evm/listener/depositHandlers"
hubEventHandlers "github.com/ChainSafe/sygma-relayer/chains/evm/listener/eventHandlers"
"github.com/ChainSafe/sygma-relayer/chains/substrate"
coreSubstrate "github.com/sygmaprotocol/sygma-core/chains/substrate"

"github.com/ChainSafe/sygma-relayer/chains/substrate/client"
"github.com/ChainSafe/sygma-relayer/chains/substrate/connection"
substrateExecutor "github.com/ChainSafe/sygma-relayer/chains/substrate/executor"
Expand All @@ -56,6 +57,7 @@
"github.com/rs/zerolog/log"
"github.com/spf13/viper"

"github.com/sygmaprotocol/sygma-core/relayer"
coreMessage "github.com/sygmaprotocol/sygma-core/relayer/message"
)

Expand Down Expand Up @@ -152,7 +154,7 @@

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
chains := []relayer.RelayedChain{}
chains := make(map[uint8]relayer.RelayedChain)
for _, chainConfig := range configuration.ChainConfigs {
switch chainConfig["type"] {
case "evm":
Expand Down Expand Up @@ -214,7 +216,7 @@

chain := coreEvm.NewEVMChain(evmListener, mh, executor, *config.GeneralChainConfig.Id, config.StartBlock)

chains = append(chains, chain)
chains[0] = chain
}
case "substrate":
{
Expand Down Expand Up @@ -244,13 +246,13 @@
eventHandlers = append(eventHandlers, substrate_listener.NewRetryEventHandler(l, conn, depositHandler, *config.GeneralChainConfig.Id))
substrateListener := substrate_listener.NewSubstrateListener(conn, eventHandlers, config)

mh := substrateExecutor.NewSubstrateMessageHandler()
mh.RegisterMessageHandler(message.FungibleTransfer, substrateExecutor.FungibleTransferMessageHandler)
mh := coreMessage.NewMessageHandler()
mh.RegisterMessageHandler(substrateExecutor.FungibleTransfer, &substrateExecutor.SubstrateMessageHandler{})

sExecutor := substrateExecutor.NewExecutor(host, communication, coordinator, mh, bridgePallet, keyshareStore, conn, exitLock)
substrateChain := substrate.NewSubstrateChain(substrateClient, substrateListener, nil, blockstore, config, sExecutor)
sExecutor := substrateExecutor.NewExecutor(host, communication, coordinator, bridgePallet, keyshareStore, conn, exitLock)
substrateChain := coreSubstrate.NewSubstrateChain(substrateListener, mh, sExecutor, *config.GeneralChainConfig.Id, config.StartBlock)

Check failure on line 253 in app/app.go

View workflow job for this annotation

GitHub Actions / linter-check

cannot use substrateListener (variable of type *"github.com/ChainSafe/sygma-relayer/chains/substrate/listener".SubstrateListener) as type "github.com/sygmaprotocol/sygma-core/chains/substrate".EventListener in argument to coreSubstrate.NewSubstrateChain:

Check failure on line 253 in app/app.go

View workflow job for this annotation

GitHub Actions / linter-check

cannot use substrateListener (variable of type *"github.com/ChainSafe/sygma-relayer/chains/substrate/listener".SubstrateListener) as type "github.com/sygmaprotocol/sygma-core/chains/substrate".EventListener in argument to coreSubstrate.NewSubstrateChain:

Check failure on line 253 in app/app.go

View workflow job for this annotation

GitHub Actions / test (1.19.x, ubuntu-latest)

cannot use substrateListener (variable of type *"github.com/ChainSafe/sygma-relayer/chains/substrate/listener".SubstrateListener) as type "github.com/sygmaprotocol/sygma-core/chains/substrate".EventListener in argument to coreSubstrate.NewSubstrateChain:

chains = append(chains, substrateChain)
chains[1] = substrateChain
}
default:
panic(fmt.Errorf("type '%s' not recognized", chainConfig["type"]))
Expand All @@ -259,13 +261,10 @@

go jobs.StartCommunicationHealthCheckJob(host, configuration.RelayerConfig.MpcConfig.CommHealthCheckInterval, sygmaMetrics)

r := relayer.NewRelayer(
chains,
sygmaMetrics,
)
r := relayer.NewRelayer(chains)

errChn := make(chan error)
go r.Start(ctx, errChn)
msgChan := make(chan []*coreMessage.Message)
go r.Start(ctx, msgChan)

sysErr := make(chan os.Signal, 1)
signal.Notify(sysErr,
Expand All @@ -283,9 +282,6 @@
}

select {
case err := <-errChn:
log.Error().Err(err).Msg("failed to listen and serve")
return err
case sig := <-sysErr:
log.Info().Msgf("terminating got ` [%v] signal", sig)
return nil
Expand Down
7 changes: 7 additions & 0 deletions chains/proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ func ProposalsHash(proposals []*TransferProposal, chainID int64, verifContract s
return crypto.Keccak256(rawData), nil
}

type TransferMessage struct {
Source uint8
Destination uint8
Data TransferMessageData
Type coreMessage.MessageType
}

type TransferMessageData struct {
DepositNonce uint64
ResourceId [32]byte
Expand Down
101 changes: 0 additions & 101 deletions chains/substrate/chain.go

This file was deleted.

51 changes: 27 additions & 24 deletions chains/substrate/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/ChainSafe/sygma-relayer/chains/substrate/connection"
"github.com/binance-chain/tss-lib/common"
"github.com/sourcegraph/conc/pool"
"github.com/sygmaprotocol/sygma-core/relayer/message"
"github.com/sygmaprotocol/sygma-core/relayer/proposal"

"github.com/centrifuge/go-substrate-rpc-client/v4/rpc/author"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
Expand All @@ -22,26 +24,24 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"

"github.com/ChainSafe/chainbridge-core/relayer/message"

"github.com/ChainSafe/sygma-relayer/comm"
"github.com/ChainSafe/sygma-relayer/tss"
"github.com/ChainSafe/sygma-relayer/tss/signing"
)

const (
FungibleTransfer message.MessageType = "FungibleTransfer"
)

var (
executionCheckPeriod = time.Minute
signingTimeout = 30 * time.Minute
)

type MessageHandler interface {
HandleMessage(m *message.Message) (*chains.Proposal, error)
}

type BridgePallet interface {
IsProposalExecuted(p *chains.Proposal) (bool, error)
ExecuteProposals(proposals []*chains.Proposal, signature []byte) (types.Hash, *author.ExtrinsicStatusSubscription, error)
ProposalsHash(proposals []*chains.Proposal) ([]byte, error)
IsProposalExecuted(p *chains.TransferProposal) (bool, error)
ExecuteProposals(proposals []*chains.TransferProposal, signature []byte) (types.Hash, *author.ExtrinsicStatusSubscription, error)
ProposalsHash(proposals []*chains.TransferProposal) ([]byte, error)
TrackExtrinsic(extHash types.Hash, sub *author.ExtrinsicStatusSubscription) error
}

Expand All @@ -51,7 +51,6 @@ type Executor struct {
comm comm.Communication
fetcher signing.SaveDataFetcher
bridge BridgePallet
mh MessageHandler
conn *connection.Connection
exitLock *sync.RWMutex
}
Expand All @@ -60,7 +59,6 @@ func NewExecutor(
host host.Host,
comm comm.Communication,
coordinator *tss.Coordinator,
mh MessageHandler,
bridgePallet BridgePallet,
fetcher signing.SaveDataFetcher,
conn *connection.Connection,
Expand All @@ -70,7 +68,6 @@ func NewExecutor(
host: host,
comm: comm,
coordinator: coordinator,
mh: mh,
bridge: bridgePallet,
fetcher: fetcher,
conn: conn,
Expand All @@ -79,18 +76,24 @@ func NewExecutor(
}

// Execute starts a signing process and executes proposals when signature is generated
func (e *Executor) Execute(msgs []*message.Message) error {
func (e *Executor) Execute(proposals []*proposal.Proposal) error {

e.exitLock.RLock()
defer e.exitLock.RUnlock()

proposals := make([]*chains.Proposal, 0)
for _, m := range msgs {
prop, err := e.mh.HandleMessage(m)
if err != nil {
return err
transferProposals := make([]*chains.TransferProposal, 0)

for _, prop := range proposals {

transferProposal := &chains.TransferProposal{
Source: prop.Source,
Destination: prop.Destination,
Data: prop.Data.(chains.TransferProposalData),
Type: prop.Type,
}
transferProposals = append(transferProposals, transferProposal)

isExecuted, err := e.bridge.IsProposalExecuted(prop)
isExecuted, err := e.bridge.IsProposalExecuted(transferProposal)
if err != nil {
return err
}
Expand All @@ -104,7 +107,7 @@ func (e *Executor) Execute(msgs []*message.Message) error {
return nil
}

propHash, err := e.bridge.ProposalsHash(proposals)
propHash, err := e.bridge.ProposalsHash(transferProposals)
if err != nil {
return err
}
Expand Down Expand Up @@ -136,12 +139,12 @@ func (e *Executor) Execute(msgs []*message.Message) error {
return err
})
pool.Go(func() error {
return e.watchExecution(watchContext, cancelExecution, proposals, sigChn, sessionID)
return e.watchExecution(watchContext, cancelExecution, transferProposals, sigChn, sessionID)
})
return pool.Wait()
}

func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, proposals []*chains.Proposal, sigChn chan interface{}, sessionID string) error {
func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.CancelFunc, proposals []*chains.TransferProposal, sigChn chan interface{}, sessionID string) error {
ticker := time.NewTicker(executionCheckPeriod)
timeout := time.NewTicker(signingTimeout)
defer ticker.Stop()
Expand Down Expand Up @@ -187,7 +190,7 @@ func (e *Executor) watchExecution(ctx context.Context, cancelExecution context.C
}
}

func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *common.SignatureData) (types.Hash, *author.ExtrinsicStatusSubscription, error) {
func (e *Executor) executeProposal(proposals []*chains.TransferProposal, signatureData *common.SignatureData) (types.Hash, *author.ExtrinsicStatusSubscription, error) {
sig := []byte{}
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.R, 32)...)
sig = append(sig[:], ethCommon.LeftPadBytes(signatureData.S, 32)...)
Expand All @@ -202,7 +205,7 @@ func (e *Executor) executeProposal(proposals []*chains.Proposal, signatureData *
return hash, sub, err
}

func (e *Executor) areProposalsExecuted(proposals []*chains.Proposal, sessionID string) bool {
func (e *Executor) areProposalsExecuted(proposals []*chains.TransferProposal, sessionID string) bool {
for _, prop := range proposals {
isExecuted, err := e.bridge.IsProposalExecuted(prop)
if err != nil || !isExecuted {
Expand Down
Loading
Loading