From 34943118687fcadc8da6b5e4673a9546211533cc Mon Sep 17 00:00:00 2001 From: pk910 Date: Mon, 7 Aug 2023 21:01:22 +0200 Subject: [PATCH] reprocess epoch aggregations if relevant slots are still cached (rel. #2) --- indexer/indexer.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/indexer/indexer.go b/indexer/indexer.go index 5364c620..02902158 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -488,7 +488,13 @@ func (indexer *Indexer) processHeadBlock(slot uint64, header *rpctypes.StandardV } if !reachedEnd { logger.Errorf("Large chain reorg detected, resync needed") - // TODO: Drop all unfinalized & resync + // TODO: Start synchronization + } else { + reorgMinEpoch := utils.EpochOfSlot(uint64(canonicalBlock.Header.Data.Header.Message.Slot)) + if reorgMinEpoch <= indexer.state.lastProcessedEpoch { + logger.Infof("Chain reorg touched processed epochs, reset epoch processing to %v", reorgMinEpoch-1) + indexer.state.lastProcessedEpoch = reorgMinEpoch - 1 + } } } indexer.state.lastHeadBlock = slot @@ -610,8 +616,9 @@ func (indexer *Indexer) loadEpochValidators(epoch uint64, epochStats *EpochStats func (indexer *Indexer) processIndexing() { // process old epochs currentEpoch := utils.EpochOfSlot(indexer.state.lastHeadBlock) - processEpoch := currentEpoch - uint64(indexer.epochProcessingDelay) - if indexer.state.lastProcessedEpoch < processEpoch { + maxProcessEpoch := currentEpoch - uint64(indexer.epochProcessingDelay) + for indexer.state.lastProcessedEpoch < maxProcessEpoch { + processEpoch := indexer.state.lastProcessedEpoch + 1 indexer.processEpoch(processEpoch) indexer.state.lastProcessedEpoch = processEpoch }