From 651193a2834b61be9aeb146a6d6836e5480aec6a Mon Sep 17 00:00:00 2001 From: mpetrun5 Date: Tue, 17 Oct 2023 11:37:42 +0200 Subject: [PATCH] Integrate core changes into substrate listener --- chains/substrate/listener/listener.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/chains/substrate/listener/listener.go b/chains/substrate/listener/listener.go index 7f8ee29c..12b93a0c 100644 --- a/chains/substrate/listener/listener.go +++ b/chains/substrate/listener/listener.go @@ -9,9 +9,10 @@ import ( "time" "github.com/ChainSafe/chainbridge-core/relayer/message" + "github.com/ChainSafe/chainbridge-core/store" + "github.com/ChainSafe/sygma-relayer/chains/substrate" - "github.com/ChainSafe/chainbridge-core/store" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser" "github.com/centrifuge/go-substrate-rpc-client/v4/types" "github.com/rs/zerolog" @@ -21,6 +22,7 @@ import ( type EventHandler interface { HandleEvents(evts []*parser.Event, msgChan chan []*message.Message) error } + type ChainConnection interface { UpdateMetatdata() error GetHeaderLatest() (*types.Header, error) @@ -54,6 +56,7 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big. endBlock := big.NewInt(0) go func() { + loop: for { select { case <-ctx.Done(): @@ -61,13 +64,13 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big. default: hash, err := l.conn.GetFinalizedHead() if err != nil { - l.log.Error().Err(err).Msg("Failed to fetch finalized header") + l.log.Warn().Err(err).Msg("Failed to fetch finalized header") time.Sleep(l.blockRetryInterval) continue } head, err := l.conn.GetBlock(hash) if err != nil { - l.log.Error().Err(err).Msg("Failed to fetch block") + l.log.Warn().Err(err).Msg("Failed to fetch block") time.Sleep(l.blockRetryInterval) continue } @@ -85,7 +88,7 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big. evts, err := l.fetchEvents(startBlock, endBlock) if err != nil { - l.log.Err(err).Msgf("Failed fetching events for block range %s-%s", startBlock, endBlock) + l.log.Warn().Err(err).Msgf("Failed fetching events for block range %s-%s", startBlock, endBlock) time.Sleep(l.blockRetryInterval) continue } @@ -93,11 +96,11 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big. for _, handler := range l.eventHandlers { err := handler.HandleEvents(evts, msgChan) if err != nil { - l.log.Error().Err(err).Msg("Error handling substrate events") - continue + l.log.Warn().Err(err).Msg("Error handling substrate events") + continue loop } } - err = blockstore.StoreBlock(startBlock, domainID) + err = blockstore.StoreBlock(endBlock, domainID) if err != nil { l.log.Error().Str("block", startBlock.String()).Err(err).Msg("Failed to write latest block to blockstore") }