Skip to content

Commit

Permalink
retry fetching block data up to 3x in case of RPC errors
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 25, 2023
1 parent 29c865e commit 6fa3284
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 35 deletions.
2 changes: 1 addition & 1 deletion indexer/cacheLogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (cache *indexerCache) processFinalizedEpoch(epoch uint64) error {
epochStats, isNewStats := cache.createOrGetEpochStats(epoch, epochDependentRoot)
if isNewStats {
logger.Warnf("missing epoch stats during finalization processing (epoch: %v)", epoch)
client := cache.indexer.getReadyClient(true, nil)
client := cache.indexer.GetReadyClient(true, nil, nil)
if client != nil {
client.ensureEpochStats(epoch, client.lastHeadRoot)
time.Sleep(10 * time.Millisecond)
Expand Down
4 changes: 4 additions & 0 deletions indexer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (client *IndexerClient) GetVersion() string {
return client.versionStr
}

// GetRpcClient returns the rpc.BeaconClient stored in this indexer client's
// rpcClient field, giving callers outside the package direct access to it.
func (client *IndexerClient) GetRpcClient() *rpc.BeaconClient {
return client.rpcClient
}

func (client *IndexerClient) GetLastHead() (int64, []byte) {
client.cacheMutex.RLock()
defer client.cacheMutex.RUnlock()
Expand Down
53 changes: 44 additions & 9 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,50 @@ func (indexer *Indexer) GetClients() []*IndexerClient {
return indexer.indexerClients
}

func (indexer *Indexer) getReadyClient(archive bool, head []byte) *IndexerClient {
// GetReadyClient picks one client at random from the clients that are ready
// to serve the given head.
//
// Candidates come from GetReadyClients(archive, head). If that yields nothing,
// every client that is connected and not synchronizing is considered instead.
// Clients listed in skip (typically clients that already failed for the
// current request) are excluded, unless excluding them would leave no
// candidate at all, in which case the skip list is ignored.
//
// It returns nil when no candidate client exists.
func (indexer *Indexer) GetReadyClient(archive bool, head []byte, skip []*IndexerClient) *IndexerClient {
	allCandidates := indexer.GetReadyClients(archive, head)
	if len(allCandidates) == 0 {
		// Start from a nil slice so append can never write into a backing
		// array owned by someone else.
		allCandidates = nil
		for _, client := range indexer.indexerClients {
			if client.isConnected && !client.isSynchronizing {
				allCandidates = append(allCandidates, client)
			}
		}
	}
	if len(allCandidates) == 0 {
		// rand.Intn panics for n <= 0; report "no client" instead.
		return nil
	}

	// Filter into a fresh slice. The slice returned by GetReadyClients may
	// alias the indexer's own client list, so it must not be modified in place.
	candidates := make([]*IndexerClient, 0, len(allCandidates))
	for _, client := range allCandidates {
		skipped := false
		for _, skipClient := range skip {
			if client == skipClient {
				skipped = true
				break
			}
		}
		if !skipped {
			candidates = append(candidates, client)
		}
	}

	// Every candidate was skipped: retrying a failed client beats having none.
	if len(candidates) == 0 {
		candidates = allCandidates
	}
	return candidates[rand.Intn(len(candidates))]
}

func (indexer *Indexer) GetReadyClients(archive bool, head []byte) []*IndexerClient {
headCandidates := indexer.GetHeadForks()
if len(headCandidates) == 0 {
return indexer.indexerClients[0]
return indexer.indexerClients
}

var headFork *HeadFork
Expand All @@ -82,12 +122,7 @@ func (indexer *Indexer) getReadyClient(archive bool, head []byte) *IndexerClient
if len(clientCandidates) == 0 && archive {
clientCandidates = indexer.getReadyClientCandidates(headFork, false)
}
candidateCount := len(clientCandidates)
if candidateCount == 0 {
return indexer.indexerClients[0]
}
selectedIndex := rand.Intn(candidateCount)
return clientCandidates[selectedIndex]
return clientCandidates
}

func (indexer *Indexer) getReadyClientCandidates(headFork *HeadFork, archive bool) []*IndexerClient {
Expand All @@ -109,7 +144,7 @@ func (indexer *Indexer) getReadyClientCandidates(headFork *HeadFork, archive boo
}

func (indexer *Indexer) GetRpcClient(archive bool, head []byte) *rpc.BeaconClient {
readyClient := indexer.getReadyClient(archive, head)
readyClient := indexer.GetReadyClient(archive, head, nil)
if head != nil {
fmt.Printf("client for head 0x%x: %v\n", head, readyClient.clientName)
}
Expand Down
45 changes: 26 additions & 19 deletions indexer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (sync *synchronizerState) runSync() {
sync.cachedSlot = 0
isComplete := false
retryCount := 0
var skipClients []*IndexerClient = nil
synclogger.Infof("synchronization started. Head epoch: %v", sync.currentEpoch)

for {
Expand All @@ -94,12 +95,13 @@ func (sync *synchronizerState) runSync() {
} else {
synclogger.Infof("synchronizing epoch %v", syncEpoch)
}
done, err := sync.syncEpoch(syncEpoch, lastRetry)
done, usedClient, err := sync.syncEpoch(syncEpoch, lastRetry, skipClients)
if done || lastRetry {
if err != nil {
synclogger.Warnf("synchronization of epoch %v failed: %v - skipping epoch", syncEpoch, err)
}
retryCount = 0
skipClients = nil
finalizedEpoch, _ := sync.indexer.indexerCache.getFinalizedHead()
sync.stateMutex.Lock()
syncEpoch++
Expand All @@ -110,7 +112,12 @@ func (sync *synchronizerState) runSync() {
break
}
} else if err != nil {
synclogger.Warnf("synchronization of epoch %v failed: %v - Retrying in 10 sec...", syncEpoch, err)
log := synclogger
if usedClient != nil {
log = synclogger.WithField("client", usedClient.clientName)
skipClients = append(skipClients, usedClient)
}
log.Warnf("synchronization of epoch %v failed: %v - Retrying in 10 sec...", syncEpoch, err)
retryCount++
time.Sleep(10 * time.Second)
}
Expand Down Expand Up @@ -147,24 +154,24 @@ func (sync *synchronizerState) checkKillChan(timeout time.Duration) bool {
}
}

func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool) (bool, error) {
func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool, skipClients []*IndexerClient) (bool, *IndexerClient, error) {
if db.IsEpochSynchronized(syncEpoch) {
return true, nil
return true, nil, nil
}

client := sync.indexer.getReadyClient(true, nil)
client := sync.indexer.GetReadyClient(true, nil, skipClients)

// load epoch assignments
epochAssignments, err := client.rpcClient.GetEpochAssignments(syncEpoch)
if err != nil || epochAssignments == nil {
return false, fmt.Errorf("error fetching epoch %v duties: %v", syncEpoch, err)
return false, client, fmt.Errorf("error fetching epoch %v duties: %v", syncEpoch, err)
}
if epochAssignments.AttestorAssignments == nil && !lastTry {
return false, fmt.Errorf("error fetching epoch %v duties: attestor assignments empty", syncEpoch)
return false, client, fmt.Errorf("error fetching epoch %v duties: attestor assignments empty", syncEpoch)
}

if sync.checkKillChan(0) {
return false, nil
return false, nil, nil
}

// load headers & blocks from this & next epoch
Expand All @@ -176,17 +183,17 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool) (bool,
}
headerRsp, err := client.rpcClient.GetBlockHeaderBySlot(slot)
if err != nil {
return false, fmt.Errorf("error fetching slot %v header: %v", slot, err)
return false, client, fmt.Errorf("error fetching slot %v header: %v", slot, err)
}
if headerRsp == nil {
continue
}
if sync.checkKillChan(0) {
return false, nil
return false, nil, nil
}
blockRsp, err := client.rpcClient.GetBlockBodyByBlockroot(headerRsp.Data.Root)
if err != nil {
return false, fmt.Errorf("error fetching slot %v block: %v", slot, err)
return false, client, fmt.Errorf("error fetching slot %v block: %v", slot, err)
}
sync.cachedBlocks[slot] = &CacheBlock{
Root: headerRsp.Data.Root,
Expand All @@ -198,7 +205,7 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool) (bool,
sync.cachedSlot = lastSlot

if sync.checkKillChan(0) {
return false, nil
return false, nil, nil
}

// load epoch stats
Expand All @@ -212,10 +219,10 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool) (bool,
epochStats.loadValidatorStats(client, epochAssignments.DependendStateRef)

if epochStats.validatorStats == nil && !lastTry {
return false, fmt.Errorf("error fetching validator stats for epoch %v: %v", syncEpoch, err)
return false, client, fmt.Errorf("error fetching validator stats for epoch %v: %v", syncEpoch, err)
}
if sync.checkKillChan(0) {
return false, nil
return false, nil, nil
}

// process epoch vote aggregations
Expand All @@ -241,24 +248,24 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool) (bool,
// save blocks
tx, err := db.WriterDb.Beginx()
if err != nil {
return false, fmt.Errorf("error starting db transactions: %v", err)
return false, nil, fmt.Errorf("error starting db transactions: %v", err)
}
defer tx.Rollback()

err = persistEpochData(syncEpoch, sync.cachedBlocks, epochStats, epochVotes, tx)
if err != nil {
return false, fmt.Errorf("error persisting epoch data to db: %v", err)
return false, client, fmt.Errorf("error persisting epoch data to db: %v", err)
}

err = db.SetExplorerState("indexer.syncstate", &dbtypes.IndexerSyncState{
Epoch: syncEpoch,
}, tx)
if err != nil {
return false, fmt.Errorf("error while updating sync state: %v", err)
return false, nil, fmt.Errorf("error while updating sync state: %v", err)
}

if err := tx.Commit(); err != nil {
return false, fmt.Errorf("error committing db transaction: %v", err)
return false, nil, fmt.Errorf("error committing db transaction: %v", err)
}

// cleanup cache (remove blocks from this epoch)
Expand All @@ -268,5 +275,5 @@ func (sync *synchronizerState) syncEpoch(syncEpoch uint64, lastTry bool) (bool,
}
}

return true, nil
return true, nil, nil
}
32 changes: 26 additions & 6 deletions services/beaconservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,37 @@ func (bs *BeaconService) GetSlotDetailsBySlot(slot uint64, withBlobs bool) (*rpc
Orphaned: !cachedBlock.IsCanonical(bs.indexer, nil),
}
} else {
header, err := bs.indexer.GetRpcClient(false, nil).GetBlockHeaderBySlot(slot)
if err != nil {
var skipClients []*indexer.IndexerClient = nil

var header *rpctypes.StandardV1BeaconHeaderResponse
var err error
for retry := 0; retry < 3; retry++ {
client := bs.indexer.GetReadyClient(false, nil, skipClients)
header, err = client.GetRpcClient().GetBlockHeaderBySlot(slot)
if header != nil {
break
} else if err != nil {
skipClients = append(skipClients, client)
}
}
if err != nil || header == nil {
return nil, err
}
if header == nil {
return nil, nil

var block *rpctypes.StandardV2BeaconBlockResponse
for retry := 0; retry < 3; retry++ {
client := bs.indexer.GetReadyClient(false, header.Data.Root, skipClients)
block, err = client.GetRpcClient().GetBlockBodyByBlockroot(header.Data.Root)
if block != nil {
break
} else if err != nil {
skipClients = append(skipClients, client)
}
}
block, err := bs.indexer.GetRpcClient(false, header.Data.Root).GetBlockBodyByBlockroot(header.Data.Root)
if err != nil {
if err != nil || block == nil {
return nil, err
}

result = &rpctypes.CombinedBlockResponse{
Root: header.Data.Root,
Header: &header.Data.Header,
Expand Down

0 comments on commit 6fa3284

Please sign in to comment.