Skip to content

Commit

Permalink
fixed crashes due to bad data from rpc nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 24, 2023
1 parent 8c83a83 commit 8bf35f0
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 16 deletions.
4 changes: 4 additions & 0 deletions indexer/cacheBlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,7 @@ func (block *CacheBlock) IsCanonical(indexer *Indexer, head []byte) bool {
}
return indexer.indexerCache.isCanonicalBlock(block.Root, head)
}

func (block *CacheBlock) IsReady() bool {
return block.header != nil && (block.block != nil || block.isInDb)
}
9 changes: 7 additions & 2 deletions indexer/cacheLogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ func (cache *indexerCache) processOrphanedBlocks(processedEpoch int64) error {
defer tx.Rollback()

for _, block := range orphanedBlocks {
if !block.IsReady() {
continue
}
dbBlock := buildDbBlock(block, cache.getEpochStats(utils.EpochOfSlot(block.Slot), nil))
dbBlock.Orphaned = true
db.InsertBlock(dbBlock, tx)
Expand Down Expand Up @@ -297,7 +300,7 @@ func (cache *indexerCache) processCachePersistence() error {
defer tx.Rollback()

for _, block := range pruneBlocks {
if !block.isInDb {
if !block.isInDb && block.IsReady() {
orphanedBlock := block.buildOrphanedBlock()
err := db.InsertUnfinalizedBlock(&dbtypes.UnfinalizedBlock{
Root: block.Root,
Expand All @@ -320,7 +323,9 @@ func (cache *indexerCache) processCachePersistence() error {
}

for _, block := range pruneBlocks {
block.block = nil
if block.isInDb {
block.block = nil
}
}

return nil
Expand Down
13 changes: 8 additions & 5 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,14 +238,14 @@ func (client *IndexerClient) prefillCache(finalizedSlot uint64, latestHeader *rp
}
client.ensureBlock(currentBlock, &latestHeader.Data.Header)

finalizedCheckpoint := (client.indexerCache.finalizedEpoch + 1) * int64(utils.Config.Chain.Config.SlotsPerEpoch)
if finalizedCheckpoint > int64(finalizedSlot) {
finalizedSlot = uint64(finalizedCheckpoint)
}

// walk backwards and load all blocks until we reach a finalized epoch
parentRoot := []byte(currentBlock.header.Message.ParentRoot)
for {
finalizedCheckpoint := (client.indexerCache.finalizedEpoch + 1) * int64(utils.Config.Chain.Config.SlotsPerEpoch)
if finalizedCheckpoint > int64(finalizedSlot) {
finalizedSlot = uint64(finalizedCheckpoint)
}

var parentHead *rpctypes.SignedBeaconBlockHeader
parentBlock := client.indexerCache.getCachedBlock(parentRoot)
if parentBlock != nil {
Expand Down Expand Up @@ -274,6 +274,7 @@ func (client *IndexerClient) prefillCache(finalizedSlot uint64, latestHeader *rp
logger.WithField("client", client.clientName).Debugf("received known block %v:%v [0x%x] warmup", utils.EpochOfSlot(parentSlot), parentSlot, parentRoot)
}
client.ensureBlock(parentBlock, parentHead)

if parentSlot <= finalizedSlot {
logger.WithField("client", client.clientName).Debugf("prefill cache: reached finalized slot %v:%v [0x%x]", utils.EpochOfSlot(parentSlot), parentSlot, parentRoot)
break
Expand Down Expand Up @@ -308,6 +309,7 @@ func (client *IndexerClient) ensureBlock(block *CacheBlock, header *rpctypes.Sig
if header == nil {
headerRsp, err := client.rpcClient.GetBlockHeaderByBlockroot(block.Root)
if err != nil {
logger.WithField("client", client.clientName).Warnf("ensure block %v [0x%x] failed (header): %v", block.Slot, block.Root, err)
return err
}
header = &headerRsp.Data.Header
Expand All @@ -317,6 +319,7 @@ func (client *IndexerClient) ensureBlock(block *CacheBlock, header *rpctypes.Sig
if block.block == nil && !block.isInDb {
blockRsp, err := client.rpcClient.GetBlockBodyByBlockroot(block.Root)
if err != nil {
logger.WithField("client", client.clientName).Warnf("ensure block %v [0x%x] failed (block): %v", block.Slot, block.Root, err)
return err
}
block.block = &blockRsp.Data
Expand Down
32 changes: 23 additions & 9 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"bytes"
"fmt"
"math/rand"
"sort"

Expand Down Expand Up @@ -108,7 +109,11 @@ func (indexer *Indexer) getReadyClientCandidates(headFork *HeadFork, archive boo
}

func (indexer *Indexer) GetRpcClient(archive bool, head []byte) *rpc.BeaconClient {
return indexer.getReadyClient(archive, head).rpcClient
readyClient := indexer.getReadyClient(archive, head)
if head != nil {
fmt.Printf("client for head 0x%x: %v\n", head, readyClient.clientName)
}
return readyClient.rpcClient
}

func (indexer *Indexer) GetFinalizedEpoch() (int64, []byte) {
Expand Down Expand Up @@ -207,18 +212,23 @@ func (indexer *Indexer) GetCachedBlocks(slot uint64) []*CacheBlock {
}
indexer.indexerCache.cacheMutex.RLock()
defer indexer.indexerCache.cacheMutex.RUnlock()
blocks := indexer.indexerCache.slotMap[slot]
if blocks == nil {
return nil
blocks := make([]*CacheBlock, 0)
for _, block := range indexer.indexerCache.slotMap[slot] {
if block.IsReady() {
blocks = append(blocks, block)
}
}
return blocks
}

func (indexer *Indexer) GetCachedBlock(root []byte) *CacheBlock {
indexer.indexerCache.cacheMutex.RLock()
defer indexer.indexerCache.cacheMutex.RUnlock()

return indexer.indexerCache.rootMap[string(root)]
block := indexer.indexerCache.rootMap[string(root)]
if block != nil && !block.IsReady() {
return nil
}
return block
}

func (indexer *Indexer) GetCachedBlockByStateroot(stateroot []byte) *CacheBlock {
Expand All @@ -236,7 +246,11 @@ func (indexer *Indexer) GetCachedBlockByStateroot(stateroot []byte) *CacheBlock
blocks := indexer.indexerCache.slotMap[slot]
for _, block := range blocks {
if bytes.Equal(block.header.Message.StateRoot, stateroot) {
return block
if !block.IsReady() {
return nil
} else {
return block
}
}
}
}
Expand All @@ -258,7 +272,7 @@ func (indexer *Indexer) GetCachedBlocksByProposer(proposer uint64) []*CacheBlock
slot := uint64(slotIdx)
blocks := indexer.indexerCache.slotMap[slot]
for _, block := range blocks {
if uint64(block.header.Message.ProposerIndex) == proposer {
if block.IsReady() && uint64(block.header.Message.ProposerIndex) == proposer {
resBlocks = append(resBlocks, block)
}
}
Expand All @@ -271,7 +285,7 @@ func (indexer *Indexer) GetCachedBlocksByParentRoot(parentRoot []byte) []*CacheB
defer indexer.indexerCache.cacheMutex.RUnlock()
resBlocks := make([]*CacheBlock, 0)
for _, block := range indexer.indexerCache.rootMap {
if block.header != nil && bytes.Equal(block.header.Message.ParentRoot, parentRoot) {
if block.IsReady() && bytes.Equal(block.header.Message.ParentRoot, parentRoot) {
resBlocks = append(resBlocks, block)
}
}
Expand Down

0 comments on commit 8bf35f0

Please sign in to comment.