Skip to content

Commit

Permalink
Integrate core changes into substrate listener
Browse files Browse the repository at this point in the history
  • Loading branch information
mpetrun5 committed Oct 17, 2023
1 parent 24f34d7 commit 651193a
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions chains/substrate/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (
"time"

"github.com/ChainSafe/chainbridge-core/relayer/message"
"github.com/ChainSafe/chainbridge-core/store"

"github.com/ChainSafe/sygma-relayer/chains/substrate"

"github.com/ChainSafe/chainbridge-core/store"
"github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser"
"github.com/centrifuge/go-substrate-rpc-client/v4/types"
"github.com/rs/zerolog"
Expand All @@ -21,6 +22,7 @@ import (
type EventHandler interface {
HandleEvents(evts []*parser.Event, msgChan chan []*message.Message) error
}

type ChainConnection interface {
UpdateMetatdata() error
GetHeaderLatest() (*types.Header, error)
Expand Down Expand Up @@ -54,20 +56,21 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.
endBlock := big.NewInt(0)

go func() {
loop:
for {
select {
case <-ctx.Done():
return
default:
hash, err := l.conn.GetFinalizedHead()
if err != nil {
l.log.Error().Err(err).Msg("Failed to fetch finalized header")
l.log.Warn().Err(err).Msg("Failed to fetch finalized header")
time.Sleep(l.blockRetryInterval)
continue
}
head, err := l.conn.GetBlock(hash)
if err != nil {
l.log.Error().Err(err).Msg("Failed to fetch block")
l.log.Warn().Err(err).Msg("Failed to fetch block")
time.Sleep(l.blockRetryInterval)
continue
}
Expand All @@ -85,19 +88,19 @@ func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.

evts, err := l.fetchEvents(startBlock, endBlock)
if err != nil {
l.log.Err(err).Msgf("Failed fetching events for block range %s-%s", startBlock, endBlock)
l.log.Warn().Err(err).Msgf("Failed fetching events for block range %s-%s", startBlock, endBlock)
time.Sleep(l.blockRetryInterval)
continue
}

for _, handler := range l.eventHandlers {
err := handler.HandleEvents(evts, msgChan)
if err != nil {
l.log.Error().Err(err).Msg("Error handling substrate events")
continue
l.log.Warn().Err(err).Msg("Error handling substrate events")
continue loop
}
}
err = blockstore.StoreBlock(startBlock, domainID)
err = blockstore.StoreBlock(endBlock, domainID)
if err != nil {
l.log.Error().Str("block", startBlock.String()).Err(err).Msg("Failed to write latest block to blockstore")
}
Expand Down

0 comments on commit 651193a

Please sign in to comment.