From 8bf35f057ae2c75a6797a5350af011f156b6e13e Mon Sep 17 00:00:00 2001 From: pk910 Date: Thu, 24 Aug 2023 17:38:40 +0200 Subject: [PATCH] fixed crashes due to bad data from rpc nodes --- indexer/cacheBlock.go | 4 ++++ indexer/cacheLogic.go | 9 +++++++-- indexer/client.go | 13 ++++++++----- indexer/indexer.go | 32 +++++++++++++++++++++++--------- 4 files changed, 42 insertions(+), 16 deletions(-) diff --git a/indexer/cacheBlock.go b/indexer/cacheBlock.go index fc6c180e..e50e75f2 100644 --- a/indexer/cacheBlock.go +++ b/indexer/cacheBlock.go @@ -144,3 +144,7 @@ func (block *CacheBlock) IsCanonical(indexer *Indexer, head []byte) bool { } return indexer.indexerCache.isCanonicalBlock(block.Root, head) } + +func (block *CacheBlock) IsReady() bool { + return block.header != nil && (block.block != nil || block.isInDb) +} diff --git a/indexer/cacheLogic.go b/indexer/cacheLogic.go index f27e57f8..07513b93 100644 --- a/indexer/cacheLogic.go +++ b/indexer/cacheLogic.go @@ -239,6 +239,9 @@ func (cache *indexerCache) processOrphanedBlocks(processedEpoch int64) error { defer tx.Rollback() for _, block := range orphanedBlocks { + if !block.IsReady() { + continue + } dbBlock := buildDbBlock(block, cache.getEpochStats(utils.EpochOfSlot(block.Slot), nil)) dbBlock.Orphaned = true db.InsertBlock(dbBlock, tx) @@ -297,7 +300,7 @@ func (cache *indexerCache) processCachePersistence() error { defer tx.Rollback() for _, block := range pruneBlocks { - if !block.isInDb { + if !block.isInDb && block.IsReady() { orphanedBlock := block.buildOrphanedBlock() err := db.InsertUnfinalizedBlock(&dbtypes.UnfinalizedBlock{ Root: block.Root, @@ -320,7 +323,9 @@ func (cache *indexerCache) processCachePersistence() error { } for _, block := range pruneBlocks { - block.block = nil + if block.isInDb { + block.block = nil + } } return nil diff --git a/indexer/client.go b/indexer/client.go index 855a1524..e8c8ecb7 100644 --- a/indexer/client.go +++ b/indexer/client.go @@ -238,14 +238,14 @@ func (client *IndexerClient) prefillCache(finalizedSlot uint64, latestHeader *rp } client.ensureBlock(currentBlock, &latestHeader.Data.Header) - finalizedCheckpoint := (client.indexerCache.finalizedEpoch + 1) * int64(utils.Config.Chain.Config.SlotsPerEpoch) - if finalizedCheckpoint > int64(finalizedSlot) { - finalizedSlot = uint64(finalizedCheckpoint) - } - // walk backwards and load all blocks until we reach a finalized epoch parentRoot := []byte(currentBlock.header.Message.ParentRoot) for { + finalizedCheckpoint := (client.indexerCache.finalizedEpoch + 1) * int64(utils.Config.Chain.Config.SlotsPerEpoch) + if finalizedCheckpoint > int64(finalizedSlot) { + finalizedSlot = uint64(finalizedCheckpoint) + } + var parentHead *rpctypes.SignedBeaconBlockHeader parentBlock := client.indexerCache.getCachedBlock(parentRoot) if parentBlock != nil { @@ -274,6 +274,7 @@ func (client *IndexerClient) prefillCache(finalizedSlot uint64, latestHeader *rp logger.WithField("client", client.clientName).Debugf("received known block %v:%v [0x%x] warmup", utils.EpochOfSlot(parentSlot), parentSlot, parentRoot) } client.ensureBlock(parentBlock, parentHead) + if parentSlot <= finalizedSlot { logger.WithField("client", client.clientName).Debugf("prefill cache: reached finalized slot %v:%v [0x%x]", utils.EpochOfSlot(parentSlot), parentSlot, parentRoot) break @@ -308,6 +309,7 @@ func (client *IndexerClient) ensureBlock(block *CacheBlock, header *rpctypes.Sig if header == nil { headerRsp, err := client.rpcClient.GetBlockHeaderByBlockroot(block.Root) if err != nil { + logger.WithField("client", client.clientName).Warnf("ensure block %v [0x%x] failed (header): %v", block.Slot, block.Root, err) return err } header = &headerRsp.Data.Header @@ -317,6 +319,7 @@ func (client *IndexerClient) ensureBlock(block *CacheBlock, header *rpctypes.Sig if block.block == nil && !block.isInDb { blockRsp, err := client.rpcClient.GetBlockBodyByBlockroot(block.Root) if err != nil { + logger.WithField("client", client.clientName).Warnf("ensure block %v [0x%x] failed (block): %v", block.Slot, block.Root, err) return err } block.block = &blockRsp.Data diff --git a/indexer/indexer.go b/indexer/indexer.go index 01832729..06bf17be 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -2,6 +2,7 @@ package indexer import ( "bytes" + "fmt" "math/rand" "sort" @@ -108,7 +109,11 @@ func (indexer *Indexer) getReadyClientCandidates(headFork *HeadFork, archive boo } func (indexer *Indexer) GetRpcClient(archive bool, head []byte) *rpc.BeaconClient { - return indexer.getReadyClient(archive, head).rpcClient + readyClient := indexer.getReadyClient(archive, head) + if head != nil { + fmt.Printf("client for head 0x%x: %v\n", head, readyClient.clientName) + } + return readyClient.rpcClient } func (indexer *Indexer) GetFinalizedEpoch() (int64, []byte) { @@ -207,9 +212,11 @@ func (indexer *Indexer) GetCachedBlocks(slot uint64) []*CacheBlock { } indexer.indexerCache.cacheMutex.RLock() defer indexer.indexerCache.cacheMutex.RUnlock() - blocks := indexer.indexerCache.slotMap[slot] - if blocks == nil { - return nil + blocks := make([]*CacheBlock, 0) + for _, block := range indexer.indexerCache.slotMap[slot] { + if block.IsReady() { + blocks = append(blocks, block) + } } return blocks } @@ -217,8 +224,11 @@ func (indexer *Indexer) GetCachedBlocks(slot uint64) []*CacheBlock { func (indexer *Indexer) GetCachedBlock(root []byte) *CacheBlock { indexer.indexerCache.cacheMutex.RLock() defer indexer.indexerCache.cacheMutex.RUnlock() - - return indexer.indexerCache.rootMap[string(root)] + block := indexer.indexerCache.rootMap[string(root)] + if block != nil && !block.IsReady() { + return nil + } + return block } func (indexer *Indexer) GetCachedBlockByStateroot(stateroot []byte) *CacheBlock { @@ -236,7 +246,11 @@ func (indexer *Indexer) GetCachedBlockByStateroot(stateroot []byte) *CacheBlock blocks := indexer.indexerCache.slotMap[slot] for _, block := range blocks { if bytes.Equal(block.header.Message.StateRoot, stateroot) { - return block + if !block.IsReady() { + return nil + } else { + return block + } } } } @@ -258,7 +272,7 @@ func (indexer *Indexer) GetCachedBlocksByProposer(proposer uint64) []*CacheBlock slot := uint64(slotIdx) blocks := indexer.indexerCache.slotMap[slot] for _, block := range blocks { - if uint64(block.header.Message.ProposerIndex) == proposer { + if block.IsReady() && uint64(block.header.Message.ProposerIndex) == proposer { resBlocks = append(resBlocks, block) } } @@ -271,7 +285,7 @@ func (indexer *Indexer) GetCachedBlocksByParentRoot(parentRoot []byte) []*CacheB defer indexer.indexerCache.cacheMutex.RUnlock() resBlocks := make([]*CacheBlock, 0) for _, block := range indexer.indexerCache.rootMap { - if block.header != nil && bytes.Equal(block.header.Message.ParentRoot, parentRoot) { + if block.IsReady() && bytes.Equal(block.header.Message.ParentRoot, parentRoot) { resBlocks = append(resBlocks, block) } }