diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go
index 7f49882bfcd..79d736b9708 100644
--- a/cl/antiquary/antiquary.go
+++ b/cl/antiquary/antiquary.go
@@ -37,6 +37,7 @@ import (
 	"github.com/erigontech/erigon/cl/persistence/blob_storage"
 	state_accessors "github.com/erigontech/erigon/cl/persistence/state"
 	"github.com/erigontech/erigon/cl/phase1/core/state"
+	"github.com/erigontech/erigon/turbo/snapshotsync"
 	"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
 )
 
@@ -50,6 +51,7 @@ type Antiquary struct {
 	downloader  proto_downloader.DownloaderClient
 	logger      log.Logger
 	sn          *freezeblocks.CaplinSnapshots
+	stateSn     *snapshotsync.CaplinStateSnapshots
 	snReader    freezeblocks.BeaconSnapshotReader
 	snBuildSema *semaphore.Weighted // semaphore for building only one type (blocks, caplin, v3) at a time
 	ctx         context.Context
@@ -65,7 +67,7 @@ type Antiquary struct {
 	balances32  []byte
 }
 
-func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, genesisState *state.CachingBeaconState, validatorsTable *state_accessors.StaticValidatorTable, cfg *clparams.BeaconChainConfig, dirs datadir.Dirs, downloader proto_downloader.DownloaderClient, mainDB kv.RwDB, sn *freezeblocks.CaplinSnapshots, reader freezeblocks.BeaconSnapshotReader, logger log.Logger, states, blocks, blobs, snapgen bool, snBuildSema *semaphore.Weighted) *Antiquary {
+func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, genesisState *state.CachingBeaconState, validatorsTable *state_accessors.StaticValidatorTable, cfg *clparams.BeaconChainConfig, dirs datadir.Dirs, downloader proto_downloader.DownloaderClient, mainDB kv.RwDB, stateSn *snapshotsync.CaplinStateSnapshots, sn *freezeblocks.CaplinSnapshots, reader freezeblocks.BeaconSnapshotReader, logger log.Logger, states, blocks, blobs, snapgen bool, snBuildSema *semaphore.Weighted) *Antiquary {
 	backfilled := &atomic.Bool{}
 	blobBackfilled := &atomic.Bool{}
 	backfilled.Store(false)
@@ -89,6 +91,7 @@ func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, gen
 		blocks:   blocks,
 		blobs:    blobs,
 		snapgen:  snapgen,
+		stateSn:  stateSn,
 	}
 }
 
diff --git a/cl/antiquary/state_antiquary.go b/cl/antiquary/state_antiquary.go
index efb5238954b..4f78c215ef6 100644
--- a/cl/antiquary/state_antiquary.go
+++ b/cl/antiquary/state_antiquary.go
@@ -27,7 +27,9 @@ import (
 	"github.com/erigontech/erigon-lib/common"
 	libcommon "github.com/erigontech/erigon-lib/common"
+	"github.com/erigontech/erigon-lib/downloader/snaptype"
 	"github.com/erigontech/erigon-lib/etl"
+	proto_downloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto"
 	"github.com/erigontech/erigon-lib/kv"
 	"github.com/erigontech/erigon-lib/log/v3"
 	"github.com/erigontech/erigon/cl/clparams"
@@ -42,6 +44,7 @@ import (
 	"github.com/erigontech/erigon/cl/phase1/core/state/raw"
 	"github.com/erigontech/erigon/cl/transition"
 	"github.com/erigontech/erigon/cl/transition/impl/eth2"
+	"github.com/erigontech/erigon/turbo/snapshotsync"
 )
 
 // pool for buffers
@@ -111,6 +114,9 @@ func (s *Antiquary) readHistoricalProcessingProgress(ctx context.Context) (progr
 	if err != nil {
 		return
 	}
+	if s.stateSn != nil {
+		progress = max(progress, s.stateSn.BlocksAvailable())
+	}
 
 	finalized, err = beacon_indicies.ReadHighestFinalized(tx)
 	if err != nil {
@@ -119,8 +125,68 @@ func (s *Antiquary) readHistoricalProcessingProgress(ctx context.Context) (progr
 	return
 }
 
+func FillStaticValidatorsTableIfNeeded(ctx context.Context, logger log.Logger, stateSn *snapshotsync.CaplinStateSnapshots, validatorsTable *state_accessors.StaticValidatorTable) (bool, error) {
+	if stateSn == nil || validatorsTable.Slot() != 0 {
+		return false, nil
+	}
+	if err := stateSn.OpenFolder(); err != nil {
+		return false, err
+	}
+	blocksAvailable := stateSn.BlocksAvailable()
+	stateSnRoTx := stateSn.View()
+	defer stateSnRoTx.Close()
+
+	start := time.Now()
+	for slot := uint64(0); slot <= stateSn.BlocksAvailable(); slot++ {
+		seg, ok := stateSnRoTx.VisibleSegment(slot, kv.StateEvents)
+		if !ok {
+			return false, fmt.Errorf("segment not found for slot %d", slot)
+		}
+		buf, err := seg.Get(slot)
+		if err != nil {
+			return false, err
+		}
+		if len(buf) == 0 {
+			continue
+		}
+		event := state_accessors.NewStateEventsFromBytes(buf)
+		state_accessors.ReplayEvents(
+			func(validatorIndex uint64, validator solid.Validator) error {
+				return validatorsTable.AddValidator(validator, validatorIndex, slot)
+			},
+			func(validatorIndex uint64, exitEpoch uint64) error {
+				return validatorsTable.AddExitEpoch(validatorIndex, slot, exitEpoch)
+			},
+			func(validatorIndex uint64, withdrawableEpoch uint64) error {
+				return validatorsTable.AddWithdrawableEpoch(validatorIndex, slot, withdrawableEpoch)
+			},
+			func(validatorIndex uint64, withdrawalCredentials libcommon.Hash) error {
+				return validatorsTable.AddWithdrawalCredentials(validatorIndex, slot, withdrawalCredentials)
+			},
+			func(validatorIndex uint64, activationEpoch uint64) error {
+				return validatorsTable.AddActivationEpoch(validatorIndex, slot, activationEpoch)
+			},
+			func(validatorIndex uint64, activationEligibilityEpoch uint64) error {
+				return validatorsTable.AddActivationEligibility(validatorIndex, slot, activationEligibilityEpoch)
+			},
+			func(validatorIndex uint64, slashed bool) error {
+				return validatorsTable.AddSlashed(validatorIndex, slot, slashed)
+			},
+			event,
+		)
+		validatorsTable.SetSlot(slot)
+	}
+	logger.Info("[Antiquary] Filled static validators table", "slots", blocksAvailable, "elapsed", time.Since(start))
+	return true, nil
+}
+
 func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
-	var tx kv.Tx
+
+	// Check if we need to fill the static validators table
+	refilledStaticValidators, err := FillStaticValidatorsTableIfNeeded(ctx, s.logger, s.stateSn, s.validatorsTable)
+	if err != nil {
+		return err
+	}
 
 	tx, err := s.mainDB.BeginRo(ctx)
 	if err != nil {
@@ -131,6 +197,13 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
 	// maps which validators changes
 	var changedValidators sync.Map
 
+	if refilledStaticValidators {
+		s.validatorsTable.ForEach(func(validatorIndex uint64, validator *state_accessors.StaticValidator) bool {
+			changedValidators.Store(validatorIndex, struct{}{})
+			return true
+		})
+	}
+
 	stateAntiquaryCollector := newBeaconStatesCollector(s.cfg, s.dirs.Tmp, s.logger)
 	defer stateAntiquaryCollector.close()
 
@@ -413,6 +486,54 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
 		return err
 	}
 	log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
+	if s.snapgen {
+		if err := s.stateSn.OpenFolder(); err != nil {
+			return err
+		}
+		blocksPerStatefulFile := uint64(snaptype.CaplinMergeLimit * 5)
+		from := s.stateSn.BlocksAvailable() + 1
+		if from+blocksPerStatefulFile+safetyMargin > s.currentState.Slot() {
+			return nil
+		}
+		to := s.currentState.Slot()
+		if to < (safetyMargin + blocksPerStatefulFile) {
+			return nil
+		}
+		to = to - (safetyMargin + blocksPerStatefulFile)
+		if from >= to {
+			return nil
+		}
+		if err := s.stateSn.DumpCaplinState(
+			ctx,
+			s.stateSn.BlocksAvailable()+1,
+			to,
+			blocksPerStatefulFile,
+			s.sn.Salt,
+			s.dirs,
+			1,
+			log.LvlInfo,
+			s.logger,
+		); err != nil {
+			return err
+		}
+		paths := s.stateSn.SegFileNames(from, to)
+		downloadItems := make([]*proto_downloader.AddItem, len(paths))
+		for i, path := range paths {
+			downloadItems[i] = &proto_downloader.AddItem{
+				Path: path,
+			}
+		}
+		if s.downloader != nil {
+			// Notify bittorrent to seed the new snapshots
+			if _, err := s.downloader.Add(s.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
+				s.logger.Warn("[Antiquary] Failed to add items to bittorrent", "err", err)
+			}
+		}
+		if err := s.stateSn.OpenFolder(); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -439,12 +560,15 @@ func (s *Antiquary) initializeStateAntiquaryIfNeeded(ctx context.Context, tx kv.
 	if err != nil {
 		return err
 	}
+	if s.stateSn != nil {
+		targetSlot = max(targetSlot, s.stateSn.BlocksAvailable())
+	}
 	// We want to backoff by some slots until we get a correct state from DB.
 	// we start from 10 * clparams.SlotsPerDump.
 	backoffStrides := uint64(10)
 	backoffStep := backoffStrides
 
-	historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.genesisState)
+	historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.genesisState, s.stateSn)
 
 	for {
 		attempt, err := computeSlotToBeRequested(tx, s.cfg, s.genesisState.Slot(), targetSlot, backoffStep)
@@ -465,6 +589,7 @@ func (s *Antiquary) initializeStateAntiquaryIfNeeded(ctx context.Context, tx kv.
 		if err != nil {
 			return fmt.Errorf("failed to read historical state at slot %d: %w", attempt, err)
 		}
+
 		if s.currentState == nil {
 			log.Warn("historical state not found, backoff more and try again", "slot", attempt)
 			backoffStep += backoffStrides
diff --git a/cl/antiquary/state_antiquary_test.go b/cl/antiquary/state_antiquary_test.go
index 08e37c4fc6c..12f8cf8d792 100644
--- a/cl/antiquary/state_antiquary_test.go
+++ b/cl/antiquary/state_antiquary_test.go
@@ -41,7 +41,7 @@ func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postSt
 	ctx := context.Background()
 	vt := state_accessors.NewStaticValidatorTable()
-	a := NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, reader, log.New(), true, true, true, false, nil)
+	a := NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, nil, reader, log.New(), true, true, true, false, nil)
 	require.NoError(t, a.IncrementBeaconState(ctx, blocks[len(blocks)-1].Block.Slot+33))
 }
 
diff --git a/cl/beacon/handler/attestation_rewards.go b/cl/beacon/handler/attestation_rewards.go
index a315df28bf5..2c175c9f4c1 100644
--- a/cl/beacon/handler/attestation_rewards.go
+++ b/cl/beacon/handler/attestation_rewards.go
@@ -178,13 +178,17 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
 	if lastSlot > stateProgress {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("requested range is not yet processed or the node is not archivial"))
 	}
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
 
-	epochData, err := state_accessors.ReadEpochData(tx, a.beaconChainCfg.RoundSlotToEpoch(lastSlot))
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
+
+	epochData, err := state_accessors.ReadEpochData(stateGetter, a.beaconChainCfg.RoundSlotToEpoch(lastSlot))
 	if err != nil {
 		return nil, err
 	}
-
-	validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, lastSlot)
+	validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, stateGetter, lastSlot)
 	if err != nil {
 		return nil, err
 	}
@@ -192,12 +196,12 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("no validator set found for this epoch"))
 	}
 
-	_, previousIdx, err := a.stateReader.ReadParticipations(tx, lastSlot)
+	_, previousIdx, err := a.stateReader.ReadParticipations(tx, stateGetter, lastSlot)
 	if err != nil {
 		return nil, err
 	}
 
-	_, _, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
+	_, _, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch)
 	if err != nil {
 		return nil, err
 	}
@@ -212,7 +216,7 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
 		return resp.WithFinalized(true).WithOptimistic(a.forkchoiceStore.IsRootOptimistic(root)), nil
 	}
 	inactivityScores := solid.NewUint64ListSSZ(int(a.beaconChainCfg.ValidatorRegistryLimit))
-	if err := a.stateReader.ReconstructUint64ListDump(tx, lastSlot, kv.InactivityScores, validatorSet.Length(), inactivityScores); err != nil {
+	if err := a.stateReader.ReconstructUint64ListDump(stateGetter, lastSlot, kv.InactivityScores, validatorSet.Length(), inactivityScores); err != nil {
 		return nil, err
 	}
 	resp, err := a.computeAttestationsRewardsForAltair(
diff --git a/cl/beacon/handler/committees.go b/cl/beacon/handler/committees.go
index 9f4d25b022a..d417ec1aae4 100644
--- a/cl/beacon/handler/committees.go
+++ b/cl/beacon/handler/committees.go
@@ -121,8 +121,13 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea
 		}
 		return newBeaconResponse(resp).WithFinalized(isFinalized).WithOptimistic(isOptimistic), nil
 	}
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
 	// finality case
-	activeIdxs, err := state_accessors.ReadActiveIndicies(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
+	activeIdxs, err := state_accessors.ReadActiveIndicies(
+		stateGetter,
+		epoch*a.beaconChainCfg.SlotsPerEpoch)
 	if err != nil {
 		return nil, err
 	}
@@ -136,7 +141,7 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea
 	}
 	mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % a.beaconChainCfg.EpochsPerHistoricalVector
-	mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
+	mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
 	if err != nil {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao mix: %v", err))
 	}
diff --git a/cl/beacon/handler/duties_attester.go b/cl/beacon/handler/duties_attester.go
index 5085975a128..4bf953e921f 100644
--- a/cl/beacon/handler/duties_attester.go
+++ b/cl/beacon/handler/duties_attester.go
@@ -150,8 +150,15 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
 	if (epoch)*a.beaconChainCfg.SlotsPerEpoch >= stageStateProgress {
 		return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Errorf("epoch %d is too far in the future", epoch))
 	}
+
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
 	// finality case
-	activeIdxs, err := state_accessors.ReadActiveIndicies(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
+	activeIdxs, err := state_accessors.ReadActiveIndicies(
+		stateGetter,
+		epoch*a.beaconChainCfg.SlotsPerEpoch)
 	if err != nil {
 		return nil, err
 	}
@@ -165,7 +172,7 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
 	}
 	mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % a.beaconChainCfg.EpochsPerHistoricalVector
-	mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
+	mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
 	if err != nil {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao mix: %v", err))
 	}
diff --git a/cl/beacon/handler/duties_sync.go b/cl/beacon/handler/duties_sync.go
index 024fd6d45e5..bc4e7cc082a 100644
--- a/cl/beacon/handler/duties_sync.go
+++ b/cl/beacon/handler/duties_sync.go
@@ -81,9 +81,13 @@ func (a *ApiHandler) getSyncDuties(w http.ResponseWriter, r *http.Request) (*bea
 	if !ok {
 		_, syncCommittee, ok = a.forkchoiceStore.GetSyncCommittees(period - 1)
 	}
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
 	// Read them from the archive node if we do not have them in the fast-access storage
 	if !ok {
-		syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(tx, a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
+		syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(
+			state_accessors.GetValFnTxAndSnapshot(tx, snRoTx),
+			a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
 		if syncCommittee == nil {
 			log.Warn("could not find sync committee for epoch", "epoch", epoch, "period", period)
 			return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not find sync committee for epoch %d", epoch))
diff --git a/cl/beacon/handler/handler.go b/cl/beacon/handler/handler.go
index 5d74b12226a..76ce92e4b43 100644
--- a/cl/beacon/handler/handler.go
+++ b/cl/beacon/handler/handler.go
@@ -49,6 +49,7 @@ import (
 	"github.com/erigontech/erigon/cl/validator/committee_subscription"
 	"github.com/erigontech/erigon/cl/validator/sync_contribution_pool"
 	"github.com/erigontech/erigon/cl/validator/validator_params"
+	"github.com/erigontech/erigon/turbo/snapshotsync"
 	"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
 )
 
@@ -64,18 +65,19 @@ type ApiHandler struct {
 	o   sync.Once
 	mux *chi.Mux
 
-	blockReader     freezeblocks.BeaconSnapshotReader
-	indiciesDB      kv.RwDB
-	netConfig       *clparams.NetworkConfig
-	ethClock        eth_clock.EthereumClock
-	beaconChainCfg  *clparams.BeaconChainConfig
-	forkchoiceStore forkchoice.ForkChoiceStorage
-	operationsPool  pool.OperationsPool
-	syncedData      synced_data.SyncedData
-	stateReader     *historical_states_reader.HistoricalStatesReader
-	sentinel        sentinel.SentinelClient
-	blobStoage      blob_storage.BlobStorage
-	caplinSnapshots *freezeblocks.CaplinSnapshots
+	blockReader          freezeblocks.BeaconSnapshotReader
+	indiciesDB           kv.RwDB
+	netConfig            *clparams.NetworkConfig
+	ethClock             eth_clock.EthereumClock
+	beaconChainCfg       *clparams.BeaconChainConfig
+	forkchoiceStore      forkchoice.ForkChoiceStorage
+	operationsPool       pool.OperationsPool
+	syncedData           synced_data.SyncedData
+	stateReader          *historical_states_reader.HistoricalStatesReader
+	sentinel             sentinel.SentinelClient
+	blobStoage           blob_storage.BlobStorage
+	caplinSnapshots      *freezeblocks.CaplinSnapshots
+	caplinStateSnapshots *snapshotsync.CaplinStateSnapshots
 
 	version string // Node's version
 
@@ -143,6 +145,7 @@ func NewApiHandler(
 	proposerSlashingService services.ProposerSlashingService,
 	builderClient builder.BuilderClient,
 	validatorMonitor monitor.ValidatorMonitor,
+	caplinStateSnapshots *snapshotsync.CaplinStateSnapshots,
 	enableMemoizedHeadState bool,
 ) *ApiHandler {
 	blobBundles, err := lru.New[common.Bytes48, BlobBundle]("blobs", maxBlobBundleCacheSize)
@@ -150,18 +153,19 @@ func NewApiHandler(
 		panic(err)
 	}
 	return &ApiHandler{
-		logger:          logger,
-		validatorParams: validatorParams,
-		o:               sync.Once{},
-		netConfig:       netConfig,
-		ethClock:        ethClock,
-		beaconChainCfg:  beaconChainConfig,
-		indiciesDB:      indiciesDB,
-		forkchoiceStore: forkchoiceStore,
-		operationsPool:  operationsPool,
-		blockReader:     rcsn,
-		syncedData:      syncedData,
-		stateReader:     stateReader,
+		logger:               logger,
+		validatorParams:      validatorParams,
+		o:                    sync.Once{},
+		netConfig:            netConfig,
+		ethClock:             ethClock,
+		beaconChainCfg:       beaconChainConfig,
+		indiciesDB:           indiciesDB,
+		forkchoiceStore:      forkchoiceStore,
+		operationsPool:       operationsPool,
+		blockReader:          rcsn,
+		syncedData:           syncedData,
+		stateReader:          stateReader,
+		caplinStateSnapshots: caplinStateSnapshots,
 		randaoMixesPool: sync.Pool{New: func() interface{} {
 			return solid.NewHashVector(int(beaconChainConfig.EpochsPerHistoricalVector))
 		}},
diff --git a/cl/beacon/handler/lighthouse.go b/cl/beacon/handler/lighthouse.go
index 612cb31e480..f2e978f5e56 100644
--- a/cl/beacon/handler/lighthouse.go
+++ b/cl/beacon/handler/lighthouse.go
@@ -76,6 +76,10 @@ func (a *ApiHandler) GetLighthouseValidatorInclusionGlobal(w http.ResponseWriter
 	}
 	defer tx.Rollback()
 
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
+
 	slot := epoch * a.beaconChainCfg.SlotsPerEpoch
 	if slot >= a.forkchoiceStore.LowestAvailableSlot() {
 		// Take data from forkchoice
@@ -120,29 +124,30 @@ func (a *ApiHandler) GetLighthouseValidatorInclusionGlobal(w http.ResponseWriter
 	}
 
 	// read the epoch datas first
-	epochData, err := state_accessors.ReadEpochData(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
+	epochData, err := state_accessors.ReadEpochData(stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch)
 	if err != nil {
 		return nil, err
 	}
 	if epochData == nil {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("epoch data not found for current epoch"))
 	}
-	prevEpochData, err := state_accessors.ReadEpochData(tx, prevEpoch*a.beaconChainCfg.SlotsPerEpoch)
+	prevEpochData, err := state_accessors.ReadEpochData(stateGetter, prevEpoch*a.beaconChainCfg.SlotsPerEpoch)
 	if err != nil {
 		return nil, err
 	}
 	if prevEpochData == nil {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("epoch data not found for previous epoch"))
 	}
+
 	// read the validator set
-	validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, slot)
+	validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, stateGetter, slot)
 	if err != nil {
 		return nil, err
 	}
 	if validatorSet == nil {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("validator set not found for current epoch"))
 	}
-	currentEpochParticipation, previousEpochParticipation, err := a.stateReader.ReadParticipations(tx, slot+(a.beaconChainCfg.SlotsPerEpoch-1))
+	currentEpochParticipation, previousEpochParticipation, err := a.stateReader.ReadParticipations(tx, stateGetter, slot+(a.beaconChainCfg.SlotsPerEpoch-1))
 	if err != nil {
 		return nil, err
 	}
@@ -277,15 +282,18 @@ func (a *ApiHandler) GetLighthouseValidatorInclusion(w http.ResponseWriter, r *h
 		return newBeaconResponse(a.computeLighthouseValidatorInclusion(int(validatorIndex), prevEpoch, epoch, activeBalance, prevActiveBalance, validatorSet, currentEpochParticipation, previousEpochParticipation)), nil
 	}
 
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
 	// read the epoch datas first
-	epochData, err := state_accessors.ReadEpochData(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
+	epochData, err := state_accessors.ReadEpochData(stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch)
 	if err != nil {
 		return nil, err
 	}
 	if epochData == nil {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("epoch data not found for current epoch"))
 	}
-	prevEpochData, err := state_accessors.ReadEpochData(tx, prevEpoch*a.beaconChainCfg.SlotsPerEpoch)
+	prevEpochData, err := state_accessors.ReadEpochData(stateGetter, prevEpoch*a.beaconChainCfg.SlotsPerEpoch)
 	if err != nil {
 		return nil, err
 	}
@@ -293,14 +301,14 @@ func (a *ApiHandler) GetLighthouseValidatorInclusion(w http.ResponseWriter, r *h
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("epoch data not found for previous epoch"))
 	}
 	// read the validator set
-	validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, slot)
+	validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, stateGetter, slot)
 	if err != nil {
 		return nil, err
 	}
 	if validatorSet == nil {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("validator set not found for current epoch"))
 	}
-	currentEpochParticipation, previousEpochParticipation, err := a.stateReader.ReadParticipations(tx, slot+(a.beaconChainCfg.SlotsPerEpoch-1))
+	currentEpochParticipation, previousEpochParticipation, err := a.stateReader.ReadParticipations(tx, stateGetter, slot+(a.beaconChainCfg.SlotsPerEpoch-1))
 	if err != nil {
 		return nil, err
 	}
diff --git a/cl/beacon/handler/liveness.go b/cl/beacon/handler/liveness.go
index ccce105a571..f81d2e74621 100644
--- a/cl/beacon/handler/liveness.go
+++ b/cl/beacon/handler/liveness.go
@@ -28,6 +28,7 @@ import (
 	"github.com/erigontech/erigon/cl/beacon/beaconhttp"
 	"github.com/erigontech/erigon/cl/cltypes"
 	"github.com/erigontech/erigon/cl/cltypes/solid"
+	state_accessors "github.com/erigontech/erigon/cl/persistence/state"
 )
 
 type live struct {
@@ -138,11 +139,15 @@ func (a *ApiHandler) obtainCurrentEpochParticipationFromEpoch(tx kv.Tx, epoch ui
 	if epoch > 0 {
 		prevEpoch--
 	}
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
 
 	currParticipation, ok1 := a.forkchoiceStore.Participation(epoch)
 	prevParticipation, ok2 := a.forkchoiceStore.Participation(prevEpoch)
 	if !ok1 || !ok2 {
-		return a.stateReader.ReadParticipations(tx, blockSlot)
+		return a.stateReader.ReadParticipations(tx, stateGetter, blockSlot)
 	}
 	return currParticipation, prevParticipation, nil
diff --git a/cl/beacon/handler/rewards.go b/cl/beacon/handler/rewards.go
index bec4923de39..6a302207020 100644
--- a/cl/beacon/handler/rewards.go
+++ b/cl/beacon/handler/rewards.go
@@ -81,7 +81,11 @@ func (a *ApiHandler) GetEthV1BeaconRewardsBlocks(w http.ResponseWriter, r *http.
 			Total:             blkRewards.Attestations + blkRewards.ProposerSlashings + blkRewards.AttesterSlashings + blkRewards.SyncAggregate,
 		}).WithFinalized(isFinalized).WithOptimistic(isOptimistic), nil
 	}
-	slotData, err := state_accessors.ReadSlotData(tx, slot)
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
+	slotData, err := state_accessors.ReadSlotData(stateGetter, slot)
 	if err != nil {
 		return nil, err
 	}
@@ -165,11 +169,15 @@ func (a *ApiHandler) PostEthV1BeaconRewardsSyncCommittees(w http.ResponseWriter,
 		syncCommittee      *solid.SyncCommittee
 		totalActiveBalance uint64
 	)
+
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+	getter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
 	if slot < a.forkchoiceStore.LowestAvailableSlot() {
 		if !isCanonical {
 			return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("non-canonical finalized block not found"))
 		}
-		epochData, err := state_accessors.ReadEpochData(tx, a.beaconChainCfg.RoundSlotToEpoch(blk.Block.Slot))
+		epochData, err := state_accessors.ReadEpochData(getter, a.beaconChainCfg.RoundSlotToEpoch(blk.Block.Slot))
 		if err != nil {
 			return nil, err
 		}
@@ -177,7 +185,7 @@ func (a *ApiHandler) PostEthV1BeaconRewardsSyncCommittees(w http.ResponseWriter,
 			return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("could not read historical sync committee rewards, node may not be archive or it still processing historical states"))
 		}
 		totalActiveBalance = epochData.TotalActiveBalance
-		syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(tx, a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(blk.Block.Slot))
+		syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(getter, a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(blk.Block.Slot))
 		if err != nil {
 			return nil, err
 		}
diff --git a/cl/beacon/handler/states.go b/cl/beacon/handler/states.go
index 81e31957125..e781433c74a 100644
--- a/cl/beacon/handler/states.go
+++ b/cl/beacon/handler/states.go
@@ -259,8 +259,13 @@ func (a *ApiHandler) getFinalityCheckpoints(w http.ResponseWriter, r *http.Reque
 	if err != nil {
 		return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err)
 	}
+
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
 	if !ok {
-		currentJustifiedCheckpoint, previousJustifiedCheckpoint, finalizedCheckpoint, ok, err = state_accessors.ReadCheckpoints(tx, a.beaconChainCfg.RoundSlotToEpoch(*slot))
+		currentJustifiedCheckpoint, previousJustifiedCheckpoint, finalizedCheckpoint, ok, err = state_accessors.ReadCheckpoints(stateGetter, a.beaconChainCfg.RoundSlotToEpoch(*slot))
 		if err != nil {
 			return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err)
 		}
@@ -314,16 +319,21 @@ func (a *ApiHandler) getSyncCommittees(w http.ResponseWriter, r *http.Request) (
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read block slot: %x", blockRoot))
 	}
 
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
+
 	// Code here
 	currentSyncCommittee, nextSyncCommittee, ok := a.forkchoiceStore.GetSyncCommittees(a.beaconChainCfg.SyncCommitteePeriod(*slot))
 	if !ok {
 		syncCommitteeSlot := a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(*slot)
 		// Check the main database if it cannot be found in the forkchoice store
-		currentSyncCommittee, err = state_accessors.ReadCurrentSyncCommittee(tx, syncCommitteeSlot)
+		currentSyncCommittee, err = state_accessors.ReadCurrentSyncCommittee(stateGetter, syncCommitteeSlot)
 		if err != nil {
 			return nil, err
 		}
-		nextSyncCommittee, err = state_accessors.ReadNextSyncCommittee(tx, syncCommitteeSlot)
+		nextSyncCommittee, err = state_accessors.ReadNextSyncCommittee(stateGetter, syncCommitteeSlot)
 		if err != nil {
 			return nil, err
 		}
@@ -438,7 +448,11 @@ func (a *ApiHandler) getRandao(w http.ResponseWriter, r *http.Request) (*beaconh
 	if canonicalRoot != blockRoot {
 		return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao: %x", blockRoot))
 	}
-	mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, slot, epoch%a.beaconChainCfg.EpochsPerHistoricalVector)
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
+	mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, slot, epoch%a.beaconChainCfg.EpochsPerHistoricalVector)
 	if err != nil {
 		return nil, err
 	}
diff --git a/cl/beacon/handler/states_test.go b/cl/beacon/handler/states_test.go
index c822ed6774f..48035a6e599 100644
--- a/cl/beacon/handler/states_test.go
+++ b/cl/beacon/handler/states_test.go
@@ -18,6 +18,7 @@ package handler
 
 import (
 	"encoding/json"
+	"fmt"
 	"io"
 	"net/http"
 	"net/http/httptest"
@@ -213,7 +214,32 @@ func TestGetStateFullHistorical(t *testing.T) {
 	require.NoError(t, err)
 	other := state.New(&clparams.MainnetBeaconConfig)
 	require.NoError(t, other.DecodeSSZ(out, int(clparams.Phase0Version)))
-
+	for i := 0; i < other.ValidatorLength(); i++ {
+		if other.ValidatorSet().Get(i).PublicKey() != postState.ValidatorSet().Get(i).PublicKey() {
+			fmt.Println("difference in validator", i, other.ValidatorSet().Get(i).PublicKey(), postState.ValidatorSet().Get(i).PublicKey())
+		}
+		if other.ValidatorSet().Get(i).WithdrawalCredentials() != postState.ValidatorSet().Get(i).WithdrawalCredentials() {
+			fmt.Println("difference in withdrawal", i, other.ValidatorSet().Get(i).WithdrawalCredentials(), postState.ValidatorSet().Get(i).WithdrawalCredentials())
+		}
+		if other.ValidatorSet().Get(i).EffectiveBalance() != postState.ValidatorSet().Get(i).EffectiveBalance() {
+			fmt.Println("difference in effective", i, other.ValidatorSet().Get(i).EffectiveBalance(), postState.ValidatorSet().Get(i).EffectiveBalance())
+		}
+		if other.ValidatorSet().Get(i).Slashed() != postState.ValidatorSet().Get(i).Slashed() {
+			fmt.Println("difference in slashed", i, other.ValidatorSet().Get(i).Slashed(), postState.ValidatorSet().Get(i).Slashed())
+		}
+		if other.ValidatorSet().Get(i).ActivationEligibilityEpoch() != postState.ValidatorSet().Get(i).ActivationEligibilityEpoch() {
+			fmt.Println("difference in activation eligibility", i, other.ValidatorSet().Get(i).ActivationEligibilityEpoch(), postState.ValidatorSet().Get(i).ActivationEligibilityEpoch())
+		}
+		if other.ValidatorSet().Get(i).ActivationEpoch() != postState.ValidatorSet().Get(i).ActivationEpoch() {
+			fmt.Println("difference in activation", i, other.ValidatorSet().Get(i).ActivationEpoch(), postState.ValidatorSet().Get(i).ActivationEpoch())
+		}
+		if other.ValidatorSet().Get(i).ExitEpoch() != postState.ValidatorSet().Get(i).ExitEpoch() {
+			fmt.Println("difference in exit", i, other.ValidatorSet().Get(i).ExitEpoch(), postState.ValidatorSet().Get(i).ExitEpoch())
+		}
+		if other.ValidatorSet().Get(i).WithdrawableEpoch() != postState.ValidatorSet().Get(i).WithdrawableEpoch() {
+			fmt.Println("difference in withdrawable", i, other.ValidatorSet().Get(i).WithdrawableEpoch(), postState.ValidatorSet().Get(i).WithdrawableEpoch())
+		}
+	}
 	otherRoot, err := other.HashSSZ()
 	require.NoError(t, err)
 	require.Equal(t, postRoot, otherRoot)
diff --git a/cl/beacon/handler/utils_test.go b/cl/beacon/handler/utils_test.go
index 13d263570b1..526fa44a3e9 100644
--- a/cl/beacon/handler/utils_test.go
+++ b/cl/beacon/handler/utils_test.go
@@ -78,10 +78,10 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
 	ctx := context.Background()
 
 	vt := state_accessors.NewStaticValidatorTable()
-	a := antiquary.NewAntiquary(ctx, nil, preState, vt, &bcfg, datadir.New("/tmp"), nil, db, nil, reader, logger, true, true, false, false, nil)
+	a := antiquary.NewAntiquary(ctx, nil, preState, vt, &bcfg, datadir.New("/tmp"), nil, db, nil, nil, reader, logger, true, true, false, false, nil)
 	require.NoError(t, a.IncrementBeaconState(ctx, blocks[len(blocks)-1].Block.Slot+33))
 	// historical states reader below
-	statesReader := historical_states_reader.NewHistoricalStatesReader(&bcfg, reader, vt, preState)
+	statesReader := historical_states_reader.NewHistoricalStatesReader(&bcfg, reader, vt, preState, nil)
 	opPool = pool.NewOperationsPool(&bcfg)
 	fcu.Pool = opPool
@@ -176,6 +176,7 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
 		proposerSlashingService,
 		nil,
 		mockValidatorMonitor,
+		nil,
 		false,
 	) // TODO: add tests
 	h.Init()
diff --git a/cl/beacon/handler/validator_test.go b/cl/beacon/handler/validator_test.go
index 53b904ead38..181cc9b9482 100644
--- a/cl/beacon/handler/validator_test.go
+++ b/cl/beacon/handler/validator_test.go
@@ -75,6 +75,7 @@ func (t *validatorTestSuite) SetupTest() {
 		nil,
 		nil,
 		nil,
+		nil,
 		false,
 	)
 	t.gomockCtrl = gomockCtrl
diff --git a/cl/beacon/handler/validators.go b/cl/beacon/handler/validators.go
index 4cdcc6e6bc0..64a0cd7ed35 100644
--- a/cl/beacon/handler/validators.go
+++ b/cl/beacon/handler/validators.go
@@ -339,8 +339,13 @@ func (a *ApiHandler) writeValidatorsResponse(
 	}
 	stateEpoch := *slot / a.beaconChainCfg.SlotsPerEpoch
 
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	getter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
+
 	if *slot < a.forkchoiceStore.LowestAvailableSlot() {
-		validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, *slot)
+		validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, getter, *slot)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
@@ -348,7 +353,7 @@ func (a *ApiHandler) writeValidatorsResponse(
 			http.Error(w, fmt.Errorf("state not found for slot %v", *slot).Error(), http.StatusNotFound)
 			return
 		}
-		balances, err := a.stateReader.ReadValidatorsBalances(tx, *slot)
+		balances, err := a.stateReader.ReadValidatorsBalances(tx, getter, *slot)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
@@ -455,6 +460,11 @@ func (a *ApiHandler) GetEthV1BeaconStatesValidator(w http.ResponseWriter, r *htt
 		return nil, err
 	}
 
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	getter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
+
 	if blockId.Head() { // Lets see if we point to head, if yes then we need to look at the head state we always keep.
 		s, cn := a.syncedData.HeadState()
 		defer cn()
@@ -477,14 +487,14 @@ func (a *ApiHandler) GetEthV1BeaconStatesValidator(w http.ResponseWriter, r *htt
 	stateEpoch := *slot / a.beaconChainCfg.SlotsPerEpoch
 
 	if *slot < a.forkchoiceStore.LowestAvailableSlot() {
-		validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, *slot)
+		validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, getter, *slot)
 		if err != nil {
 			return nil, err
 		}
 		if validatorSet == nil {
 			return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("validators not found"))
 		}
-		balances, err := a.stateReader.ReadValidatorsBalances(tx, *slot)
+		balances, err := a.stateReader.ReadValidatorsBalances(tx, getter, *slot)
 		if err != nil {
 			return nil, err
 		}
@@ -598,8 +608,13 @@ func (a *ApiHandler) getValidatorBalances(ctx context.Context, w http.ResponseWr
 		return
 	}
 
+	snRoTx := a.caplinStateSnapshots.View()
+	defer snRoTx.Close()
+
+	getter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
+
 	if *slot < a.forkchoiceStore.LowestAvailableSlot() {
-		balances, err := a.stateReader.ReadValidatorsBalances(tx, *slot)
+		balances, err := a.stateReader.ReadValidatorsBalances(tx, getter, *slot)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusInternalServerError)
 			return
diff --git a/cl/persistence/state/historical_states_reader/attesting_indicies.go b/cl/persistence/state/historical_states_reader/attesting_indicies.go
index f0ef90b286c..9da5eb8be79 100644
--- a/cl/persistence/state/historical_states_reader/attesting_indicies.go
+++ b/cl/persistence/state/historical_states_reader/attesting_indicies.go
@@ -19,13 +19,11 @@ package historical_states_reader
 import (
 	"errors"
 	"fmt"
-	"time"
 
 	libcommon "github.com/erigontech/erigon-lib/common"
 	"github.com/erigontech/erigon-lib/kv"
 	"github.com/erigontech/erigon/cl/clparams"
 	"github.com/erigontech/erigon/cl/cltypes/solid"
-	"github.com/erigontech/erigon/cl/monitor/shuffling_metrics"
 	"github.com/erigontech/erigon/cl/persistence/base_encoding"
 	state_accessors "github.com/erigontech/erigon/cl/persistence/state"
 	"github.com/erigontech/erigon/cl/phase1/core/state/shuffling"
@@ -104,19 +102,9 @@ func (r *HistoricalStatesReader) ComputeCommittee(mix libcommon.Hash, indicies [
 	start := (lenIndicies * index) / count
 	end := (lenIndicies * (index + 1)) / count
 	var shuffledIndicies []uint64
-	epoch := slot / cfg.SlotsPerEpoch
-	/*
-		mixPosition := (epoch + cfg.EpochsPerHistoricalVector - cfg.MinSeedLookahead - 1) % cfg.EpochsPerHistoricalVector
-	*/
-	if shuffledIndicesInterface, ok := r.shuffledSetsCache.Get(epoch); ok {
-		shuffledIndicies = shuffledIndicesInterface
-	} else {
-		shuffledIndicies = make([]uint64, lenIndicies)
-		start := time.Now()
-		shuffledIndicies = shuffling.ComputeShuffledIndicies(cfg, mix, shuffledIndicies, indicies, slot)
-		shuffling_metrics.ObserveComputeShuffledIndiciesTime(start)
-		r.shuffledSetsCache.Add(epoch, shuffledIndicies)
-	}
+
+	shuffledIndicies = make([]uint64, lenIndicies)
+	shuffledIndicies = shuffling.ComputeShuffledIndicies(cfg, mix, shuffledIndicies, indicies, slot)
+
 	return shuffledIndicies[start:end], nil
 }
 
@@ -132,7 +120,7 @@ func committeeCount(cfg *clparams.BeaconChainConfig, epoch uint64, idxs []uint64
 	return committeCount
 }
 
-func (r *HistoricalStatesReader) readHistoricalBlockRoot(tx kv.Tx, slot, index uint64) (libcommon.Hash, error) {
+func (r *HistoricalStatesReader) readHistoricalBlockRoot(kvGetter state_accessors.GetValFn, slot, index uint64) (libcommon.Hash, error) {
 	slotSubIndex := slot % r.cfg.SlotsPerHistoricalRoot
 	needFromGenesis := true
@@ -152,7 +140,7 @@ func (r *HistoricalStatesReader) readHistoricalBlockRoot(tx kv.Tx, slot, index u
 	if needFromGenesis {
 		return r.genesisState.GetBlockRootAtSlot(slot)
 	}
-	br, err := tx.GetOne(kv.BlockRoot, base_encoding.Encode64ToBytes4(slotLookup))
+	br, err := kvGetter(kv.BlockRoot, base_encoding.Encode64ToBytes4(slotLookup))
 	if err != nil {
 		return libcommon.Hash{}, err
 	}
@@ -163,8 +151,9 @@ func (r *HistoricalStatesReader) readHistoricalBlockRoot(tx kv.Tx, slot, index u
 
 }
 
-func (r *HistoricalStatesReader) getAttestationParticipationFlagIndicies(tx kv.Tx, version clparams.StateVersion, stateSlot uint64, data solid.AttestationData, inclusionDelay uint64, skipAssert bool) ([]uint8, error) {
-	currentCheckpoint, previousCheckpoint, _, ok, err := state_accessors.ReadCheckpoints(tx, r.cfg.RoundSlotToEpoch(stateSlot))
+func (r *HistoricalStatesReader) getAttestationParticipationFlagIndicies(tx kv.Tx, getter state_accessors.GetValFn, version clparams.StateVersion, stateSlot uint64, data solid.AttestationData, inclusionDelay uint64, skipAssert bool) ([]uint8, error) {
+
+	currentCheckpoint, previousCheckpoint, _, ok, err := state_accessors.ReadCheckpoints(getter, r.cfg.RoundSlotToEpoch(stateSlot))
 	if err != nil {
 		return nil, err
 	}
@@ -186,13 +175,13 @@ func (r *HistoricalStatesReader) getAttestationParticipationFlagIndicies(tx kv.T
 		return nil, errors.New("GetAttestationParticipationFlagIndicies: source does not match.")
 	}
 	i := (data.Target.Epoch * r.cfg.SlotsPerEpoch) % r.cfg.SlotsPerHistoricalRoot
-	targetRoot, err := r.readHistoricalBlockRoot(tx, stateSlot, i)
+	targetRoot, err := r.readHistoricalBlockRoot(getter, stateSlot, i)
 	if err != nil {
 		return nil, err
 	}
 
 	i = data.Slot % r.cfg.SlotsPerHistoricalRoot
-	headRoot, err := r.readHistoricalBlockRoot(tx, stateSlot, i)
+	headRoot, err := r.readHistoricalBlockRoot(getter, stateSlot, i)
 	if err != nil {
 		return nil, err
 	}
diff --git a/cl/persistence/state/historical_states_reader/historical_states_reader.go b/cl/persistence/state/historical_states_reader/historical_states_reader.go
index d1cd90c670c..6bc33e3b005 100644
--- a/cl/persistence/state/historical_states_reader/historical_states_reader.go
+++ b/cl/persistence/state/historical_states_reader/historical_states_reader.go
@@ -33,7 +33,7 @@ import (
 	"github.com/erigontech/erigon/cl/persistence/base_encoding"
 	state_accessors "github.com/erigontech/erigon/cl/persistence/state"
 	"github.com/erigontech/erigon/cl/phase1/core/state"
-	"github.com/erigontech/erigon/cl/phase1/core/state/lru"
+	"github.com/erigontech/erigon/turbo/snapshotsync"
 	"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
 	"github.com/klauspost/compress/zstd"
 )
@@ -46,39 +46,43 @@ type HistoricalStatesReader struct {
 	cfg            *clparams.BeaconChainConfig
 	validatorTable *state_accessors.StaticValidatorTable // We can save 80% of the I/O by caching the validator table
 	blockReader    freezeblocks.BeaconSnapshotReader
+	stateSn        *snapshotsync.CaplinStateSnapshots
 	genesisState   *state.CachingBeaconState
-
-	// cache for shuffled sets
-	shuffledSetsCache *lru.Cache[uint64, []uint64]
 }
 
 func NewHistoricalStatesReader(
 	cfg *clparams.BeaconChainConfig,
 	blockReader freezeblocks.BeaconSnapshotReader,
 	validatorTable *state_accessors.StaticValidatorTable,
-	genesisState *state.CachingBeaconState) *HistoricalStatesReader {
-
-	cache, err := lru.New[uint64, []uint64]("shuffledSetsCache_reader", 125)
-	if err != nil {
-		panic(err)
-	}
+	genesisState *state.CachingBeaconState, stateSn *snapshotsync.CaplinStateSnapshots) *HistoricalStatesReader {
 
 	return &HistoricalStatesReader{
-		cfg:               cfg,
-		blockReader:       blockReader,
-		genesisState:      genesisState,
-		validatorTable:    validatorTable,
-		shuffledSetsCache: cache,
+		cfg:            cfg,
+		blockReader:    blockReader,
+		genesisState:   genesisState,
+		validatorTable: validatorTable,
+		stateSn:        stateSn,
 	}
 }
 
 func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.Tx, slot uint64) (*state.CachingBeaconState, error) {
+	snapshotView := r.stateSn.View()
+	defer snapshotView.Close()
+
+	kvGetter := state_accessors.GetValFnTxAndSnapshot(tx, snapshotView)
+
 	ret := state.New(r.cfg)
 	latestProcessedState, err := state_accessors.GetStateProcessingProgress(tx)
 	if err != nil {
 		return nil, err
 	}
+	var blocksAvailableInSnapshots uint64
+	if r.stateSn != nil {
+		blocksAvailableInSnapshots = r.stateSn.BlocksAvailable()
+	}
+	latestProcessedState = max(latestProcessedState, blocksAvailableInSnapshots)
+
 	// If this happens, we need to update our static tables
 	if slot > latestProcessedState || slot > r.validatorTable.Slot() {
 		log.Warn("slot is ahead of the latest processed state", "slot", slot, "latestProcessedState", latestProcessedState, "validatorTableSlot", r.validatorTable.Slot())
@@ -100,7 +104,7 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 	blockHeader := block.SignedBeaconBlockHeader().Header
 	blockHeader.Root = common.Hash{}
 	// Read the epoch and per-slot data.
-	slotData, err := state_accessors.ReadSlotData(tx, slot)
+	slotData, err := state_accessors.ReadSlotData(kvGetter, slot)
 	if err != nil {
 		return nil, err
 	}
@@ -110,7 +114,7 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 	}
 
 	roundedSlot := r.cfg.RoundSlotToEpoch(slot)
-	epochData, err := state_accessors.ReadEpochData(tx, roundedSlot)
+	epochData, err := state_accessors.ReadEpochData(kvGetter, roundedSlot)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read epoch data: %w", err)
 	}
@@ -129,12 +133,12 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 	stateRoots, blockRoots := solid.NewHashVector(int(r.cfg.SlotsPerHistoricalRoot)), solid.NewHashVector(int(r.cfg.SlotsPerHistoricalRoot))
 	ret.SetLatestBlockHeader(blockHeader)
 
-	if err := r.readHistoryHashVector(tx, r.genesisState.BlockRoots(), slot, r.cfg.SlotsPerHistoricalRoot, kv.BlockRoot, blockRoots); err != nil {
+	if err := r.readHistoryHashVector(tx, kvGetter, r.genesisState.BlockRoots(), slot, r.cfg.SlotsPerHistoricalRoot, kv.BlockRoot, blockRoots); err != nil {
 		return nil, fmt.Errorf("failed to read block roots: %w", err)
 	}
 	ret.SetBlockRoots(blockRoots)
 
-	if err := r.readHistoryHashVector(tx, r.genesisState.StateRoots(), slot, r.cfg.SlotsPerHistoricalRoot, kv.StateRoot, stateRoots); err != nil {
+	if err := r.readHistoryHashVector(tx, kvGetter, r.genesisState.StateRoots(), slot, r.cfg.SlotsPerHistoricalRoot, kv.StateRoot, stateRoots); err != nil {
 		return nil, fmt.Errorf("failed to read state roots: %w", err)
 	}
 	ret.SetStateRoots(stateRoots)
@@ -150,14 +154,14 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 	// Eth1
 	eth1DataVotes := solid.NewStaticListSSZ[*cltypes.Eth1Data](int(r.cfg.Eth1DataVotesLength()), 72)
-	if err := r.readEth1DataVotes(tx, slotData.Eth1DataLength, slot, eth1DataVotes); err != nil {
+	if err := r.readEth1DataVotes(kvGetter, slotData.Eth1DataLength, slot, eth1DataVotes); err != nil {
 		return nil, fmt.Errorf("failed to read eth1 data votes: %w", err)
 	}
 	ret.SetEth1DataVotes(eth1DataVotes)
 	ret.SetEth1Data(slotData.Eth1Data)
 	ret.SetEth1DepositIndex(slotData.Eth1DepositIndex)
 	// Registry (Validators + Balances)
-	balancesBytes, err := r.reconstructBalances(tx, slotData.ValidatorLength, slot, kv.ValidatorBalance, kv.BalancesDump)
+	balancesBytes, err := r.reconstructBalances(tx, kvGetter, slotData.ValidatorLength, slot, kv.ValidatorBalance, kv.BalancesDump)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read validator balances: %w", err)
 	}
@@ -168,27 +172,27 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 
 	ret.SetBalances(balances)
 
-	validatorSet, err := r.ReadValidatorsForHistoricalState(tx, slot)
+	validatorSet, err := r.ReadValidatorsForHistoricalState(tx, kvGetter, slot)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read validators: %w", err)
 	}
 	ret.SetValidators(validatorSet)
 	// Randomness
 	randaoMixes := solid.NewHashVector(int(r.cfg.EpochsPerHistoricalVector))
-	if err := r.readRandaoMixes(tx, slot, randaoMixes); err != nil {
+	if err := r.readRandaoMixes(tx, kvGetter, slot, randaoMixes); err != nil {
 		return nil, fmt.Errorf("failed to read randao mixes: %w", err)
 	}
 	ret.SetRandaoMixes(randaoMixes)
 	slashingsVector := solid.NewUint64VectorSSZ(int(r.cfg.EpochsPerSlashingsVector))
 	// Slashings
-	err = r.ReconstructUint64ListDump(tx, slot, kv.ValidatorSlashings, int(r.cfg.EpochsPerSlashingsVector), slashingsVector)
+	err = r.ReconstructUint64ListDump(kvGetter, slot, kv.ValidatorSlashings, int(r.cfg.EpochsPerSlashingsVector), slashingsVector)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read slashings: %w", err)
 	}
 	ret.SetSlashings(slashingsVector)
 
 	// Finality
-	currentCheckpoint, previousCheckpoint, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(tx, roundedSlot)
+	currentCheckpoint, previousCheckpoint, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(kvGetter, roundedSlot)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read checkpoints: %w", err)
 	}
@@ -211,7 +215,7 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 		ret.SetCurrentEpochAttestations(currentAtts)
 		ret.SetPreviousEpochAttestations(previousAtts)
 	} else {
-		currentIdxs, previousIdxs, err := r.ReadParticipations(tx, slot)
+		currentIdxs, previousIdxs, err := r.ReadParticipations(tx, kvGetter, slot)
 		if err != nil {
 			return nil, fmt.Errorf("failed to read participations: %w", err)
 		}
@@ -224,7 +228,7 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 	}
 	inactivityScores := solid.NewUint64ListSSZ(int(r.cfg.ValidatorRegistryLimit))
 	// Inactivity
-	err = r.ReconstructUint64ListDump(tx, slot, kv.InactivityScores, int(slotData.ValidatorLength), inactivityScores)
+	err = r.ReconstructUint64ListDump(kvGetter, slot, kv.InactivityScores, int(slotData.ValidatorLength), inactivityScores)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read inactivity scores: %w", err)
 	}
@@ -232,7 +236,7 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 	ret.SetInactivityScoresRaw(inactivityScores)
 	// Sync
 	syncCommitteeSlot := r.cfg.RoundSlotToSyncCommitteePeriod(slot)
-	currentSyncCommittee, err := state_accessors.ReadCurrentSyncCommittee(tx, syncCommitteeSlot)
+	currentSyncCommittee, err := state_accessors.ReadCurrentSyncCommittee(kvGetter, syncCommitteeSlot)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read current sync committee: %w", err)
 	}
@@ -240,7 +244,7 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 		currentSyncCommittee = r.genesisState.CurrentSyncCommittee()
 	}
 
-	nextSyncCommittee, err := state_accessors.ReadNextSyncCommittee(tx, syncCommitteeSlot)
+	nextSyncCommittee, err := state_accessors.ReadNextSyncCommittee(kvGetter, syncCommitteeSlot)
 	if err != nil {
 		return nil, fmt.Errorf("failed to read next sync committee: %w", err)
 	}
@@ -277,30 +281,36 @@ func (r *HistoricalStatesReader) ReadHistoricalState(ctx context.Context, tx kv.
 	return ret, nil
 }
 
-func (r *HistoricalStatesReader) readHistoryHashVector(tx kv.Tx, genesisVector solid.HashVectorSSZ, slot, size uint64, table string, out solid.HashVectorSSZ) (err error) {
+func (r *HistoricalStatesReader) readHistoryHashVector(tx kv.Tx, kvGetter state_accessors.GetValFn, genesisVector solid.HashVectorSSZ, slot, size uint64, table string, out solid.HashVectorSSZ) (err error) {
 	var needFromGenesis, inserted uint64
 	if size > slot || slot-size <= r.genesisState.Slot() {
 		needFromGenesis = size - (slot - r.genesisState.Slot())
 	}
 
 	needFromDB := size - needFromGenesis
-	cursor, err := tx.Cursor(table)
+	highestAvailableSlot, err := r.highestSlotInSnapshotsAndDB(tx, table)
 	if err != nil {
 		return err
 	}
-	defer cursor.Close()
+
 	var currKeySlot uint64
-	for k, v, err := cursor.Seek(base_encoding.Encode64ToBytes4(slot - needFromDB)); err == nil && k != nil; k, v, err = cursor.Next() {
+	for i := slot - needFromDB; i <= highestAvailableSlot; i++ {
+		key := base_encoding.Encode64ToBytes4(i)
+		v, err := kvGetter(table, key)
+		if err != nil {
+			return err
+		}
 		if len(v) != 32 {
-			return fmt.Errorf("invalid key %x", k)
+			return fmt.Errorf("invalid key %x", key)
 		}
-		currKeySlot = base_encoding.Decode64FromBytes4(k)
+		currKeySlot = i
 		out.Set(int(currKeySlot%size), common.BytesToHash(v))
 		inserted++
 		if inserted == needFromDB {
 			break
 		}
 	}
+
 	for i := 0; i < int(needFromGenesis); i++ {
 		currKeySlot++
 		out.Set(int(currKeySlot%size), genesisVector.Get(int(currKeySlot%size)))
@@ -308,18 +318,8 @@ func (r *HistoricalStatesReader) readHistoryHashVector(tx kv.Tx, genesisVector s
 	return nil
 }
 
-func (r *HistoricalStatesReader) readEth1DataVotes(tx kv.Tx, eth1DataVotesLength, slot uint64, out *solid.ListSSZ[*cltypes.Eth1Data]) error {
+func (r *HistoricalStatesReader) readEth1DataVotes(kvGetter state_accessors.GetValFn, eth1DataVotesLength, slot uint64, out *solid.ListSSZ[*cltypes.Eth1Data]) error {
 	initialSlot := r.cfg.RoundSlotToVotePeriod(slot)
-	initialKey := base_encoding.Encode64ToBytes4(initialSlot)
-	cursor, err := tx.Cursor(kv.Eth1DataVotes)
-	if err != nil {
-		return err
-	}
-	defer cursor.Close()
-	k, v, err := cursor.Seek(initialKey)
-	if err != nil {
-		return err
-	}
 	if initialSlot <= r.genesisState.Slot() {
 		// We need to prepend the genesis votes
 		for i := 0; i < r.genesisState.Eth1DataVotes().Len(); i++ {
@@ -329,24 +329,53 @@ func (r *HistoricalStatesReader) readEth1DataVotes(tx kv.Tx, eth1DataVotesLength
 
 	endSlot := r.cfg.RoundSlotToVotePeriod(slot + r.cfg.SlotsPerEpoch*r.cfg.EpochsPerEth1VotingPeriod)
 
-	for k != nil && base_encoding.Decode64FromBytes4(k) < endSlot {
+	for i := initialSlot; i < endSlot; i++ {
 		if out.Len() >= int(eth1DataVotesLength) {
 			break
 		}
+		key := base_encoding.Encode64ToBytes4(i)
+		v, err := kvGetter(kv.Eth1DataVotes, key)
+		if err != nil {
+			return err
+		}
+		if len(v) == 0 {
+			continue
+		}
 		eth1Data := &cltypes.Eth1Data{}
 		if err := eth1Data.DecodeSSZ(v, 0); err != nil {
 			return err
 		}
 		out.Append(eth1Data)
-		k, v, err = cursor.Next()
-		if err != nil {
-			return err
-		}
 	}
+
 	return nil
 }
 
-func (r *HistoricalStatesReader) readRandaoMixes(tx kv.Tx, slot uint64, out solid.HashVectorSSZ) error {
+func (r *HistoricalStatesReader) highestSlotInSnapshotsAndDB(tx kv.Tx, tbl string) (uint64, error) {
+	cursor, err := tx.Cursor(tbl)
+	if err != nil {
+		return 0, err
+	}
+	defer cursor.Close()
+	k, _, err := cursor.Last()
+	if err != nil {
+		return 0, err
+	}
+	if k == nil {
+		if r.stateSn != nil {
+			return r.stateSn.BlocksAvailable(), nil
+		}
+		return 0, nil
+	}
+	availableInDB := base_encoding.Decode64FromBytes4(k)
+	var availableInSnapshots uint64
+	if r.stateSn != nil {
+		availableInSnapshots = r.stateSn.BlocksAvailable()
+	}
+	return max(availableInDB, availableInSnapshots), nil
+}
+
+func (r *HistoricalStatesReader) readRandaoMixes(tx kv.Tx, kvGetter state_accessors.GetValFn, slot uint64, out solid.HashVectorSSZ) error {
 	size := r.cfg.EpochsPerHistoricalVector
 	genesisVector := r.genesisState.RandaoMixes()
 	var needFromGenesis, inserted uint64
@@ -358,17 +387,26 @@ func (r *HistoricalStatesReader) readRandaoMixes(tx kv.Tx, slot uint64, out soli
 	}
 
 	needFromDB := size - needFromGenesis
-	cursor, err := tx.Cursor(kv.RandaoMixes)
+
+	highestAvailableSlot, err := r.highestSlotInSnapshotsAndDB(tx, kv.RandaoMixes)
 	if err != nil {
 		return err
 	}
-	defer cursor.Close()
 	var currKeyEpoch uint64
-	for k, v, err := cursor.Seek(base_encoding.Encode64ToBytes4(roundedSlot - (needFromDB)*r.cfg.SlotsPerEpoch)); err == nil && k != nil; k, v, err = cursor.Next() {
+
+	for i := roundedSlot - (needFromDB)*r.cfg.SlotsPerEpoch; i <= highestAvailableSlot; i++ {
+		key := base_encoding.Encode64ToBytes4(i)
+		v, err := kvGetter(kv.RandaoMixes, key)
+		if err != nil {
+			return err
+		}
+		if len(v) == 0 {
+			continue
+		}
 		if len(v) != 32 {
-			return fmt.Errorf("invalid key %x", k)
+			return fmt.Errorf("invalid key %x", key)
 		}
-		currKeyEpoch = base_encoding.Decode64FromBytes4(k) / r.cfg.SlotsPerEpoch
+		currKeyEpoch = i / r.cfg.SlotsPerEpoch
 		out.Set(int(currKeyEpoch%size), common.BytesToHash(v))
 		inserted++
 		if inserted == needFromDB {
@@ -379,8 +417,9 @@ func (r *HistoricalStatesReader) readRandaoMixes(tx kv.Tx, slot uint64, out soli
 		currKeyEpoch++
 		out.Set(int(currKeyEpoch%size), genesisVector.Get(int(currKeyEpoch%size)))
 	}
+
 	// Now we need to read the intra epoch randao mix.
-	intraRandaoMix, err := tx.GetOne(kv.IntraRandaoMixes, base_encoding.Encode64ToBytes4(slot))
+	intraRandaoMix, err := kvGetter(kv.IntraRandaoMixes, base_encoding.Encode64ToBytes4(slot))
 	if err != nil {
 		return err
 	}
@@ -391,7 +430,7 @@ func (r *HistoricalStatesReader) readRandaoMixes(tx kv.Tx, slot uint64, out soli
 	return nil
 }
 
-func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, validatorSetLength, slot uint64, diffBucket string, dumpBucket string) ([]byte, error) {
+func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, kvGetter state_accessors.GetValFn, validatorSetLength, slot uint64, diffBucket string, dumpBucket string) ([]byte, error) {
 	// Read the file
 	remainder := slot % clparams.SlotsPerDump
 	freshDumpSlot := slot - remainder
@@ -403,13 +442,13 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, validator
 		return nil, err
 	}
 	forward := remainder <= midpoint || currentStageProgress <= freshDumpSlot+clparams.SlotsPerDump
 	if forward {
-		compressed, err = tx.GetOne(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot))
+		compressed, err = kvGetter(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot))
 		if err != nil {
 			return nil, err
 		}
 	} else {
-		compressed, err = tx.GetOne(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot+clparams.SlotsPerDump))
+		compressed, err = kvGetter(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot+clparams.SlotsPerDump))
 		if err != nil {
 			return nil, err
 		}
@@ -438,43 +478,44 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, validator
 		return nil, err
 	}
 
-	diffCursor, err := tx.Cursor(diffBucket)
+	highestSlotAvailable, err := r.highestSlotInSnapshotsAndDB(tx, diffBucket)
 	if err != nil {
 		return nil, err
 	}
-	defer diffCursor.Close()
 
 	if forward {
-		for k, v, err := diffCursor.Seek(base_encoding.Encode64ToBytes4(freshDumpSlot)); err == nil && k != nil && base_encoding.Decode64FromBytes4(k) <= slot; k, v, err = diffCursor.Next() {
+		for currSlot := freshDumpSlot; currSlot <= slot && currSlot <= highestSlotAvailable; currSlot++ {
+			key := base_encoding.Encode64ToBytes4(currSlot)
+			v, err := kvGetter(diffBucket, key)
 			if err != nil {
 				return nil, err
 			}
-			if len(k) != 4 {
-				return nil, fmt.Errorf("invalid key %x", k)
+			if len(v) == 0 {
+				continue
+			}
+			if len(key) != 4 {
+				return nil, fmt.Errorf("invalid key %x", key)
 			}
-			currSlot := base_encoding.Decode64FromBytes4(k)
 			if currSlot == freshDumpSlot {
 				continue
 			}
-			if currSlot > slot {
-				return nil, fmt.Errorf("diff not found for slot %d", slot)
-			}
 			currentList, err = base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, v, false)
 			if err != nil {
 				return nil, err
 			}
 		}
 	} else {
-		for k, v, err := diffCursor.Seek(base_encoding.Encode64ToBytes4(freshDumpSlot + clparams.SlotsPerDump)); err == nil && k != nil && base_encoding.Decode64FromBytes4(k) > slot; k, v, err = diffCursor.Prev() {
+		for currSlot := freshDumpSlot + clparams.SlotsPerDump; currSlot > slot && currSlot > r.genesisState.Slot(); currSlot-- {
+			key := base_encoding.Encode64ToBytes4(currSlot)
+			v, err := kvGetter(diffBucket, key)
 			if err != nil {
 				return nil, err
 			}
-			if len(k) != 4 {
-				return nil, fmt.Errorf("invalid key %x", k)
-			}
-			currSlot := base_encoding.Decode64FromBytes4(k)
-			if currSlot <= slot || currSlot > freshDumpSlot+clparams.SlotsPerDump {
+			if len(v) == 0 {
 				continue
 			}
+			if len(key) != 4 {
+				return nil, fmt.Errorf("invalid key %x", key)
+			}
 			currentList, err = base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, v, true)
 			if err != nil {
 				return nil, err
@@ -485,7 +526,7 @@ func (r *HistoricalStatesReader) reconstructDiffedUint64List(tx kv.Tx, validator
 	return currentList, err
 }
 
-func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, validatorSetLength, slot uint64, diffBucket, dumpBucket string) ([]byte, error) {
+func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, kvGetter state_accessors.GetValFn, validatorSetLength, slot uint64, diffBucket, dumpBucket string) ([]byte, error) {
 	remainder := slot % clparams.SlotsPerDump
 	freshDumpSlot := slot - remainder
 
@@ -501,12 +542,12 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, validatorSetLengt
 	midpoint := uint64(clparams.SlotsPerDump / 2)
 	forward := remainder <= midpoint || currentStageProgress <= freshDumpSlot+clparams.SlotsPerDump
 	if forward {
-		compressed, err = tx.GetOne(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot))
+		compressed, err = kvGetter(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot))
 		if err != nil {
 			return nil, err
 		}
 	} else {
-		compressed, err = tx.GetOne(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot+clparams.SlotsPerDump))
+		compressed, err = kvGetter(dumpBucket, base_encoding.Encode64ToBytes4(freshDumpSlot+clparams.SlotsPerDump))
 		if err != nil {
 			return nil, err
 		}
@@ -535,7 +576,7 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, validatorSetLengt
 		if i == freshDumpSlot {
 			continue
 		}
-		diff, err := tx.GetOne(diffBucket, base_encoding.Encode64ToBytes4(i))
+		diff, err := kvGetter(diffBucket, base_encoding.Encode64ToBytes4(i))
 		if err != nil {
 			return nil, err
 		}
@@ -549,7 +590,7 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, validatorSetLengt
 	} else {
 		for i := freshDumpSlot + clparams.SlotsPerDump; i > roundedSlot; i -= r.cfg.SlotsPerEpoch {
-			diff, err := tx.GetOne(diffBucket, base_encoding.Encode64ToBytes4(i))
+			diff, err := kvGetter(diffBucket, base_encoding.Encode64ToBytes4(i))
 			if err != nil {
 				return nil, err
 			}
@@ -563,17 +604,12 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, validatorSetLengt
 		}
 	}
 
-	diffCursor, err := tx.Cursor(diffBucket)
-	if err != nil {
-		return nil, err
-	}
-	defer diffCursor.Close()
 	if slot%r.cfg.SlotsPerEpoch == 0 {
 		currentList = currentList[:validatorSetLength*8]
 		return currentList, nil
 	}
 
-	slotDiff, err := tx.GetOne(diffBucket, base_encoding.Encode64ToBytes4(slot))
+	slotDiff, err := kvGetter(diffBucket, base_encoding.Encode64ToBytes4(slot))
 	if err != nil {
 		return nil, err
 	}
@@ -585,27 +621,24 @@ func (r *HistoricalStatesReader) reconstructBalances(tx kv.Tx, validatorSetLengt
 	return base_encoding.ApplyCompressedSerializedUint64ListDiff(currentList, currentList, slotDiff, false)
 }
 
-func (r *HistoricalStatesReader) ReconstructUint64ListDump(tx kv.Tx, slot uint64, bkt string, size int, out solid.Uint64ListSSZ) error {
-	diffCursor, err := tx.Cursor(bkt)
-	if err != nil {
-		return err
-	}
-	defer diffCursor.Close()
-
-	k, v, err := diffCursor.Seek(base_encoding.Encode64ToBytes4(slot))
-	if err != nil {
-		return err
-	}
-	if k == nil {
-		return fmt.Errorf("diff not found for slot %d", slot)
-	}
-	keySlot := base_encoding.Decode64FromBytes4(k)
-	if keySlot > slot {
-		_, v, err = diffCursor.Prev()
+func (r *HistoricalStatesReader) ReconstructUint64ListDump(kvGetter state_accessors.GetValFn, slot uint64, bkt string, size int, out solid.Uint64ListSSZ) error {
+	var (
+		v   []byte
+		err error
+	)
+	// Try seeking <= to slot
+
for i := slot; i >= r.genesisState.Slot(); i-- { + key := base_encoding.Encode64ToBytes4(i) + v, err = kvGetter(bkt, key) if err != nil { return err } + if len(v) == 0 { + continue + } + break } + var b bytes.Buffer if _, err := b.Write(v); err != nil { return err @@ -625,9 +658,9 @@ func (r *HistoricalStatesReader) ReconstructUint64ListDump(tx kv.Tx, slot uint64 return out.DecodeSSZ(currentList, 0) } -func (r *HistoricalStatesReader) ReadValidatorsForHistoricalState(tx kv.Tx, slot uint64) (*solid.ValidatorSet, error) { +func (r *HistoricalStatesReader) ReadValidatorsForHistoricalState(tx kv.Tx, kvGetter state_accessors.GetValFn, slot uint64) (*solid.ValidatorSet, error) { // Read the minimal beacon state which have the small fields. - sd, err := state_accessors.ReadSlotData(tx, slot) + sd, err := state_accessors.ReadSlotData(kvGetter, slot) if err != nil { return nil, err } @@ -648,7 +681,7 @@ func (r *HistoricalStatesReader) ReadValidatorsForHistoricalState(tx kv.Tx, slot }) // Read the balances - bytesEffectiveBalances, err := r.reconstructDiffedUint64List(tx, validatorSetLength, slot, kv.ValidatorEffectiveBalance, kv.EffectiveBalancesDump) + bytesEffectiveBalances, err := r.reconstructDiffedUint64List(tx, kvGetter, validatorSetLength, slot, kv.ValidatorEffectiveBalance, kv.EffectiveBalancesDump) if err != nil { return nil, err } @@ -711,12 +744,12 @@ func (r *HistoricalStatesReader) readPendingEpochs(tx kv.Tx, slot uint64) (*soli } // readParticipations shuffles active indicies and returns the participation flags for the given epoch. -func (r *HistoricalStatesReader) ReadParticipations(tx kv.Tx, slot uint64) (*solid.ParticipationBitList, *solid.ParticipationBitList, error) { +func (r *HistoricalStatesReader) ReadParticipations(tx kv.Tx, kvGetter state_accessors.GetValFn, slot uint64) (*solid.ParticipationBitList, *solid.ParticipationBitList, error) { var beginSlot uint64 epoch, prevEpoch := r.computeRelevantEpochs(slot) beginSlot = prevEpoch * r.cfg.SlotsPerEpoch - currentActiveIndicies, err := state_accessors.ReadActiveIndicies(tx, epoch*r.cfg.SlotsPerEpoch) + currentActiveIndicies, err := state_accessors.ReadActiveIndicies(kvGetter, epoch*r.cfg.SlotsPerEpoch) if err != nil { return nil, nil, err } @@ -724,14 +757,14 @@ func (r *HistoricalStatesReader) ReadParticipations(tx kv.Tx, slot uint64) (*sol if epoch == 0 { previousActiveIndicies = currentActiveIndicies } else { - previousActiveIndicies, err = state_accessors.ReadActiveIndicies(tx, (epoch-1)*r.cfg.SlotsPerEpoch) + previousActiveIndicies, err = state_accessors.ReadActiveIndicies(kvGetter, (epoch-1)*r.cfg.SlotsPerEpoch) if err != nil { return nil, nil, err } } // Read the minimal beacon state which have the small fields. 
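A note on the descending scan in `ReconstructUint64ListDump` above, which emulates a reverse cursor seek ("greatest key at or below slot") with point reads: with a uint64 loop counter, `i--` wraps around at zero rather than going negative, so a condition of the form `i >= lowest` can never terminate the loop when `lowest` is 0, which is the genesis slot on networks such as mainnet. The loop is saved in practice by a dump existing at slot 0, but a wrap-safe variant costs nothing. A self-contained sketch, with a plain map standing in for the getter:

```go
// Sketch: find the value with the greatest slot <= target without relying on
// the loop condition to stop at zero (which uint64 wrap-around would defeat).
package main

import "fmt"

// findAtOrBelow scans from target down to lowest (inclusive); the bound check
// runs after the read, so lowest itself is still consulted. Assumes
// target >= lowest.
func findAtOrBelow(store map[uint64][]byte, target, lowest uint64) ([]byte, uint64, bool) {
	for i := target; ; i-- {
		if v, ok := store[i]; ok && len(v) > 0 {
			return v, i, true
		}
		if i == lowest {
			return nil, 0, false
		}
	}
}

func main() {
	store := map[uint64][]byte{0: {0x01}, 2048: {0x02}} // dumps at slots 0 and 2048
	if v, at, ok := findAtOrBelow(store, 3000, 0); ok {
		fmt.Printf("nearest dump at slot %d: %x\n", at, v)
	}
}
```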
- sd, err := state_accessors.ReadSlotData(tx, slot) + sd, err := state_accessors.ReadSlotData(kvGetter, slot) if err != nil { return nil, nil, err } @@ -746,10 +779,7 @@ func (r *HistoricalStatesReader) ReadParticipations(tx kv.Tx, slot uint64) (*sol if err != nil { return nil, nil, err } - // trigger the cache for shuffled sets in parallel - if err := r.tryCachingEpochsInParallell(tx, [][]uint64{currentActiveIndicies, previousActiveIndicies}, []uint64{epoch, prevEpoch}); err != nil { - return nil, nil, err - } + // Read the previous idxs for i := beginSlot; i <= slot; i++ { // Read the block @@ -784,7 +814,7 @@ func (r *HistoricalStatesReader) ReadParticipations(tx kv.Tx, slot uint64) (*sol attestationEpoch := data.Slot / r.cfg.SlotsPerEpoch mixPosition := (attestationEpoch + r.cfg.EpochsPerHistoricalVector - r.cfg.MinSeedLookahead - 1) % r.cfg.EpochsPerHistoricalVector - mix, err := r.ReadRandaoMixBySlotAndIndex(tx, data.Slot, mixPosition) + mix, err := r.ReadRandaoMixBySlotAndIndex(tx, kvGetter, data.Slot, mixPosition) if err != nil { return false } @@ -795,7 +825,7 @@ func (r *HistoricalStatesReader) ReadParticipations(tx kv.Tx, slot uint64) (*sol return false } var participationFlagsIndicies []uint8 - participationFlagsIndicies, err = r.getAttestationParticipationFlagIndicies(tx, block.Version(), i, *data, i-data.Slot, true) + participationFlagsIndicies, err = r.getAttestationParticipationFlagIndicies(tx, kvGetter, block.Version(), i, *data, i-data.Slot, true) if err != nil { return false } @@ -836,12 +866,12 @@ func (r *HistoricalStatesReader) computeRelevantEpochs(slot uint64) (uint64, uin return epoch, epoch - 1 } -func (r *HistoricalStatesReader) tryCachingEpochsInParallell(tx kv.Tx, activeIdxs [][]uint64, epochs []uint64) error { +func (r *HistoricalStatesReader) tryCachingEpochsInParallell(tx kv.Tx, kvGetter state_accessors.GetValFn, activeIdxs [][]uint64, epochs []uint64) error { var wg sync.WaitGroup wg.Add(len(epochs)) for i, epoch := range epochs { mixPosition := (epoch + r.cfg.EpochsPerHistoricalVector - r.cfg.MinSeedLookahead - 1) % r.cfg.EpochsPerHistoricalVector - mix, err := r.ReadRandaoMixBySlotAndIndex(tx, epochs[0]*r.cfg.SlotsPerEpoch, mixPosition) + mix, err := r.ReadRandaoMixBySlotAndIndex(tx, kvGetter, epochs[0]*r.cfg.SlotsPerEpoch, mixPosition) if err != nil { return err } @@ -856,8 +886,8 @@ func (r *HistoricalStatesReader) tryCachingEpochsInParallell(tx kv.Tx, activeIdx return nil } -func (r *HistoricalStatesReader) ReadValidatorsBalances(tx kv.Tx, slot uint64) (solid.Uint64ListSSZ, error) { - sd, err := state_accessors.ReadSlotData(tx, slot) +func (r *HistoricalStatesReader) ReadValidatorsBalances(tx kv.Tx, kvGetter state_accessors.GetValFn, slot uint64) (solid.Uint64ListSSZ, error) { + sd, err := state_accessors.ReadSlotData(kvGetter, slot) if err != nil { return nil, err } @@ -866,7 +896,7 @@ func (r *HistoricalStatesReader) ReadValidatorsBalances(tx kv.Tx, slot uint64) ( return nil, nil } - balances, err := r.reconstructBalances(tx, sd.ValidatorLength, slot, kv.ValidatorBalance, kv.BalancesDump) + balances, err := r.reconstructBalances(tx, kvGetter, sd.ValidatorLength, slot, kv.ValidatorBalance, kv.BalancesDump) if err != nil { return nil, err } @@ -875,11 +905,11 @@ func (r *HistoricalStatesReader) ReadValidatorsBalances(tx kv.Tx, slot uint64) ( return balancesList, balancesList.DecodeSSZ(balances, 0) } -func (r *HistoricalStatesReader) ReadRandaoMixBySlotAndIndex(tx kv.Tx, slot, index uint64) (common.Hash, error) { +func (r *HistoricalStatesReader) 
ReadRandaoMixBySlotAndIndex(tx kv.Tx, kvGetter state_accessors.GetValFn, slot, index uint64) (common.Hash, error) { epoch := slot / r.cfg.SlotsPerEpoch epochSubIndex := epoch % r.cfg.EpochsPerHistoricalVector if index == epochSubIndex { - intraRandaoMix, err := tx.GetOne(kv.IntraRandaoMixes, base_encoding.Encode64ToBytes4(slot)) + intraRandaoMix, err := kvGetter(kv.IntraRandaoMixes, base_encoding.Encode64ToBytes4(slot)) if err != nil { return common.Hash{}, err } @@ -908,7 +938,7 @@ func (r *HistoricalStatesReader) ReadRandaoMixBySlotAndIndex(tx kv.Tx, slot, ind if needFromGenesis { return r.genesisState.GetRandaoMixes(epoch), nil } - mixBytes, err := tx.GetOne(kv.RandaoMixes, base_encoding.Encode64ToBytes4(epochLookup*r.cfg.SlotsPerEpoch)) + mixBytes, err := kvGetter(kv.RandaoMixes, base_encoding.Encode64ToBytes4(epochLookup*r.cfg.SlotsPerEpoch)) if err != nil { return common.Hash{}, err } diff --git a/cl/persistence/state/historical_states_reader/historical_states_reader_test.go b/cl/persistence/state/historical_states_reader/historical_states_reader_test.go index 38151494504..290239143bd 100644 --- a/cl/persistence/state/historical_states_reader/historical_states_reader_test.go +++ b/cl/persistence/state/historical_states_reader/historical_states_reader_test.go @@ -41,7 +41,7 @@ func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postSt ctx := context.Background() vt := state_accessors.NewStaticValidatorTable() - a := antiquary.NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, reader, log.New(), true, true, true, false, nil) + a := antiquary.NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, nil, reader, log.New(), true, true, true, false, nil) require.NoError(t, a.IncrementBeaconState(ctx, blocks[len(blocks)-1].Block.Slot+33)) // Now lets test it against the reader tx, err := db.BeginRw(ctx) @@ -50,7 +50,7 @@ func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postSt vt = state_accessors.NewStaticValidatorTable() require.NoError(t, state_accessors.ReadValidatorsTable(tx, vt)) - hr := historical_states_reader.NewHistoricalStatesReader(&clparams.MainnetBeaconConfig, reader, vt, preState) + hr := historical_states_reader.NewHistoricalStatesReader(&clparams.MainnetBeaconConfig, reader, vt, preState, nil) s, err := hr.ReadHistoricalState(ctx, tx, blocks[len(blocks)-1].Block.Slot) require.NoError(t, err) diff --git a/cl/persistence/state/state_accessors.go b/cl/persistence/state/state_accessors.go index 6e9fdf7b6e6..05353687069 100644 --- a/cl/persistence/state/state_accessors.go +++ b/cl/persistence/state/state_accessors.go @@ -18,16 +18,33 @@ package state_accessors import ( "bytes" + "encoding/binary" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon/cl/cltypes" "github.com/erigontech/erigon/cl/cltypes/solid" "github.com/erigontech/erigon/cl/persistence/base_encoding" "github.com/erigontech/erigon/cl/phase1/core/state" + "github.com/erigontech/erigon/turbo/snapshotsync" libcommon "github.com/erigontech/erigon-lib/common" ) +type GetValFn func(table string, key []byte) ([]byte, error) + +func GetValFnTxAndSnapshot(tx kv.Tx, snapshotRoTx *snapshotsync.CaplinStateView) GetValFn { + return func(table string, key []byte) ([]byte, error) { + if snapshotRoTx != nil { + slot := uint64(binary.BigEndian.Uint32(key)) + segment, ok := snapshotRoTx.VisibleSegment(slot, table) + if ok { + return segment.Get(slot) + } + } + return 
tx.GetOne(table, key) + } +} + // InitializeValidatorTable initializes the validator table in the database. func InitializeStaticTables(tx kv.RwTx, state *state.CachingBeaconState) error { var err error @@ -164,9 +181,9 @@ func SetStateProcessingProgress(tx kv.RwTx, progress uint64) error { return tx.Put(kv.StatesProcessingProgress, kv.StatesProcessingKey, base_encoding.Encode64ToBytes4(progress)) } -func ReadSlotData(tx kv.Tx, slot uint64) (*SlotData, error) { +func ReadSlotData(getFn GetValFn, slot uint64) (*SlotData, error) { sd := &SlotData{} - v, err := tx.GetOne(kv.SlotData, base_encoding.Encode64ToBytes4(slot)) + v, err := getFn(kv.SlotData, base_encoding.Encode64ToBytes4(slot)) if err != nil { return nil, err } @@ -178,9 +195,9 @@ func ReadSlotData(tx kv.Tx, slot uint64) (*SlotData, error) { return sd, sd.ReadFrom(buf) } -func ReadEpochData(tx kv.Tx, slot uint64) (*EpochData, error) { +func ReadEpochData(getFn GetValFn, slot uint64) (*EpochData, error) { ed := &EpochData{} - v, err := tx.GetOne(kv.EpochData, base_encoding.Encode64ToBytes4(slot)) + v, err := getFn(kv.EpochData, base_encoding.Encode64ToBytes4(slot)) if err != nil { return nil, err } @@ -193,10 +210,10 @@ func ReadEpochData(tx kv.Tx, slot uint64) (*EpochData, error) { } // ReadCheckpoints reads the checkpoints from the database, Current, Previous and Finalized -func ReadCheckpoints(tx kv.Tx, slot uint64) (current solid.Checkpoint, previous solid.Checkpoint, finalized solid.Checkpoint, ok bool, err error) { +func ReadCheckpoints(getFn GetValFn, slot uint64) (current solid.Checkpoint, previous solid.Checkpoint, finalized solid.Checkpoint, ok bool, err error) { ed := &EpochData{} var v []byte - v, err = tx.GetOne(kv.EpochData, base_encoding.Encode64ToBytes4(slot)) + v, err = getFn(kv.EpochData, base_encoding.Encode64ToBytes4(slot)) if err != nil { return } @@ -212,8 +229,8 @@ func ReadCheckpoints(tx kv.Tx, slot uint64) (current solid.Checkpoint, previous } // ReadCheckpoints reads the checkpoints from the database, Current, Previous and Finalized -func ReadNextSyncCommittee(tx kv.Tx, slot uint64) (committee *solid.SyncCommittee, err error) { - v, err := tx.GetOne(kv.NextSyncCommittee, base_encoding.Encode64ToBytes4(slot)) +func ReadNextSyncCommittee(getFn GetValFn, slot uint64) (committee *solid.SyncCommittee, err error) { + v, err := getFn(kv.NextSyncCommittee, base_encoding.Encode64ToBytes4(slot)) if err != nil { return nil, err } @@ -226,8 +243,8 @@ func ReadNextSyncCommittee(tx kv.Tx, slot uint64) (committee *solid.SyncCommitte } // ReadCheckpoints reads the checkpoints from the database, Current, Previous and Finalized -func ReadCurrentSyncCommittee(tx kv.Tx, slot uint64) (committee *solid.SyncCommittee, err error) { - v, err := tx.GetOne(kv.CurrentSyncCommittee, base_encoding.Encode64ToBytes4(slot)) +func ReadCurrentSyncCommittee(getFn GetValFn, slot uint64) (committee *solid.SyncCommittee, err error) { + v, err := getFn(kv.CurrentSyncCommittee, base_encoding.Encode64ToBytes4(slot)) if err != nil { return nil, err } @@ -301,9 +318,9 @@ func ReadValidatorsTable(tx kv.Tx, out *StaticValidatorTable) error { return err } -func ReadActiveIndicies(tx kv.Tx, slot uint64) ([]uint64, error) { +func ReadActiveIndicies(getFn GetValFn, slot uint64) ([]uint64, error) { key := base_encoding.Encode64ToBytes4(slot) - v, err := tx.GetOne(kv.ActiveValidatorIndicies, key) + v, err := getFn(kv.ActiveValidatorIndicies, key) if err != nil { return nil, err } diff --git a/cl/persistence/state/validator_events.go 
b/cl/persistence/state/validator_events.go index bc5066f5f5e..ec469a68baa 100644 --- a/cl/persistence/state/validator_events.go +++ b/cl/persistence/state/validator_events.go @@ -50,6 +50,10 @@ func NewStateEvents() *StateEvents { return &StateEvents{} } +func NewStateEventsFromBytes(buf []byte) *StateEvents { + return &StateEvents{buf: libcommon.Copy(buf)} +} + func (se *StateEvents) AddValidator(validatorIndex uint64, validator solid.Validator) { se.mu.Lock() defer se.mu.Unlock() diff --git a/cl/sentinel/sentinel_requests_test.go b/cl/sentinel/sentinel_requests_test.go index cfd0fa65cd4..3fe6d03050d 100644 --- a/cl/sentinel/sentinel_requests_test.go +++ b/cl/sentinel/sentinel_requests_test.go @@ -55,7 +55,7 @@ func loadChain(t *testing.T) (db kv.RwDB, blocks []*cltypes.SignedBeaconBlock, f ctx := context.Background() vt := state_accessors.NewStaticValidatorTable() - a := antiquary.NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, reader, log.New(), true, true, false, false, nil) + a := antiquary.NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, nil, reader, log.New(), true, true, false, false, nil) require.NoError(t, a.IncrementBeaconState(ctx, blocks[len(blocks)-1].Block.Slot+33)) return } diff --git a/cmd/capcli/cli.go b/cmd/capcli/cli.go index 830226c658c..ff20788cf11 100644 --- a/cmd/capcli/cli.go +++ b/cmd/capcli/cli.go @@ -27,6 +27,7 @@ import ( "net/http" "net/url" "os" + "runtime" "strconv" "strings" "time" @@ -63,6 +64,7 @@ import ( "github.com/erigontech/erigon/eth/ethconfig" "github.com/erigontech/erigon/eth/ethconfig/estimate" "github.com/erigontech/erigon/turbo/debug" + "github.com/erigontech/erigon/turbo/snapshotsync" "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks" ) @@ -80,6 +82,7 @@ var CLI struct { CheckBlobsSnapshots CheckBlobsSnapshots `cmd:"" help:"check blobs snapshots"` CheckBlobsSnapshotsCount CheckBlobsSnapshotsCount `cmd:"" help:"check blobs snapshots count"` DumpBlobsSnapshotsToStore DumpBlobsSnapshotsToStore `cmd:"" help:"dump blobs snapshots to store"` + DumpStateSnapshots DumpStateSnapshots `cmd:"" help:"dump state snapshots"` } type chainCfg struct { @@ -178,7 +181,7 @@ func (c *Chain) Run(ctx *Context) error { } downloader := network.NewBackwardBeaconDownloader(ctx, beacon, nil, nil, db) - cfg := stages.StageHistoryReconstruction(downloader, antiquary.NewAntiquary(ctx, nil, nil, nil, nil, dirs, nil, nil, nil, nil, nil, false, false, false, false, nil), csn, db, nil, beaconConfig, true, false, true, bRoot, bs.Slot(), "/tmp", 300*time.Millisecond, nil, nil, blobStorage, log.Root()) + cfg := stages.StageHistoryReconstruction(downloader, antiquary.NewAntiquary(ctx, nil, nil, nil, nil, dirs, nil, nil, nil, nil, nil, nil, false, false, false, false, nil), csn, db, nil, beaconConfig, true, false, true, bRoot, bs.Slot(), "/tmp", 300*time.Millisecond, nil, nil, blobStorage, log.Root()) return stages.SpawnStageHistoryDownload(cfg, ctx, log.Root()) } @@ -534,6 +537,7 @@ func (c *LoopSnapshots) Run(ctx *Context) error { type RetrieveHistoricalState struct { chainCfg outputFolder + withPPROF CompareFile string `help:"compare file" default:""` CompareSlot uint64 `help:"compare slot" default:"0"` Out string `help:"output file" default:""` @@ -579,7 +583,17 @@ func (r *RetrieveHistoricalState) Run(ctx *Context) error { return err } - hr := historical_states_reader.NewHistoricalStatesReader(beaconConfig, snr, vt, gSpot) + snTypes := 
snapshotsync.MakeCaplinStateSnapshotsTypes(db)
+	stateSn := snapshotsync.NewCaplinStateSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, snTypes, log.Root())
+	if err := stateSn.OpenFolder(); err != nil {
+		return err
+	}
+	if _, err := antiquary.FillStaticValidatorsTableIfNeeded(ctx, log.Root(), stateSn, vt); err != nil {
+		return err
+	}
+	r.withPPROF.withProfile()
+	hr := historical_states_reader.NewHistoricalStatesReader(beaconConfig, snr, vt, gSpot, stateSn)
 	start := time.Now()
 	haveState, err := hr.ReadHistoricalState(ctx, tx, r.CompareSlot)
 	if err != nil {
@@ -635,11 +649,11 @@
 		return err
 	}
 	if hRoot != wRoot {
-		// for i := 0; i < haveState.PreviousEpochParticipation().Length(); i++ {
-		// 	if haveState.PreviousEpochParticipation().Get(i) != wantState.PreviousEpochParticipation().Get(i) {
-		// 		log.Info("Participation mismatch", "index", i, "have", haveState.PreviousEpochParticipation().Get(i), "want", wantState.PreviousEpochParticipation().Get(i))
-		// 	}
-		// }
+		for i := 0; i < haveState.BlockRoots().Length(); i++ {
+			if haveState.BlockRoots().Get(i) != wantState.BlockRoots().Get(i) {
+				log.Info("block roots mismatch", "index", i, "have", haveState.BlockRoots().Get(i), "want", wantState.BlockRoots().Get(i))
+			}
+		}
 		return fmt.Errorf("state mismatch: got %s, want %s", libcommon.Hash(hRoot), libcommon.Hash(wRoot))
 	}
 	return nil
@@ -1171,3 +1185,53 @@
 	return nil
 }
+
+type DumpStateSnapshots struct {
+	chainCfg
+	outputFolder
+	To       uint64 `name:"to" help:"slot to dump"`
+	StepSize uint64 `name:"step-size" help:"step size" default:"10000"`
+}
+
+func (c *DumpStateSnapshots) Run(ctx *Context) error {
+	_, beaconConfig, _, err := clparams.GetConfigsByNetworkName(c.Chain)
+	if err != nil {
+		return err
+	}
+	log.Root().SetHandler(log.LvlFilterHandler(log.LvlDebug, log.StderrHandler))
+	log.Info("Started chain download", "chain", c.Chain)
+
+	dirs := datadir.New(c.Datadir)
+	log.Root().SetHandler(log.LvlFilterHandler(log.LvlInfo, log.StderrHandler))
+
+	db, _, err := caplin1.OpenCaplinDatabase(ctx, beaconConfig, nil, dirs.CaplinIndexing, dirs.CaplinBlobs, nil, false, 0)
+	if err != nil {
+		return err
+	}
+	var to uint64
+	if err := db.View(ctx, func(tx kv.Tx) (err error) {
+		if c.To == 0 {
+			to, err = state_accessors.GetStateProcessingProgress(tx)
+			return
+		}
+		to = c.To
+		return
+	}); err != nil {
+		return err
+	}
+
+	salt, err := snaptype.GetIndexSalt(dirs.Snap)
+	if err != nil {
+		return err
+	}
+	snTypes := snapshotsync.MakeCaplinStateSnapshotsTypes(db)
+	stateSn := snapshotsync.NewCaplinStateSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, snTypes, log.Root())
+	if err := stateSn.OpenFolder(); err != nil {
+		return err
+	}
+
+	if err := stateSn.DumpCaplinState(ctx, stateSn.BlocksAvailable(), to, c.StepSize, salt, dirs, runtime.NumCPU(), log.LvlInfo, log.Root()); err != nil {
+		return err
+	}
+	return stateSn.OpenFolder()
+}
diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go
index 0d059a76f4b..31afd768acf 100644
--- a/cmd/caplin/caplin1/run.go
+++ b/cmd/caplin/caplin1/run.go
@@ -51,6 +51,7 @@ import (
 	"github.com/erigontech/erigon/cl/validator/validator_params"
 	"github.com/erigontech/erigon/eth/ethconfig"
"github.com/erigontech/erigon/params" + "github.com/erigontech/erigon/turbo/snapshotsync" "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks" "github.com/spf13/afero" @@ -380,8 +381,11 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi return err } } - - antiq := antiquary.NewAntiquary(ctx, blobStorage, genesisState, vTables, beaconConfig, dirs, snDownloader, indexDB, csn, rcsn, logger, states, backfilling, blobBackfilling, config.SnapshotGenerationEnabled, snBuildSema) + stateSnapshots := snapshotsync.NewCaplinStateSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, snapshotsync.MakeCaplinStateSnapshotsTypes(indexDB), logger) + if err := stateSnapshots.OpenFolder(); err != nil { + return err + } + antiq := antiquary.NewAntiquary(ctx, blobStorage, genesisState, vTables, beaconConfig, dirs, snDownloader, indexDB, stateSnapshots, csn, rcsn, logger, states, backfilling, blobBackfilling, config.SnapshotGenerationEnabled, snBuildSema) // Create the antiquary go func() { if err := antiq.Loop(); err != nil { @@ -393,7 +397,7 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi return err } - statesReader := historical_states_reader.NewHistoricalStatesReader(beaconConfig, rcsn, vTables, genesisState) + statesReader := historical_states_reader.NewHistoricalStatesReader(beaconConfig, rcsn, vTables, genesisState, stateSnapshots) validatorParameters := validator_params.NewValidatorParams() if config.BeaconAPIRouter.Active { apiHandler := handler.NewApiHandler( @@ -428,6 +432,7 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi proposerSlashingService, option.builderClient, validatorMonitor, + stateSnapshots, true, ) go beacon.ListenAndServe(&beacon.LayeredBeaconHandler{ diff --git a/erigon-lib/common/datadir/dirs.go b/erigon-lib/common/datadir/dirs.go index 266625088f2..ca6b975552c 100644 --- a/erigon-lib/common/datadir/dirs.go +++ b/erigon-lib/common/datadir/dirs.go @@ -42,6 +42,7 @@ type Dirs struct { SnapHistory string SnapDomain string SnapAccessors string + SnapCaplin string Downloader string TxPool string Nodes string @@ -72,6 +73,7 @@ func New(datadir string) Dirs { SnapHistory: filepath.Join(datadir, "snapshots", "history"), SnapDomain: filepath.Join(datadir, "snapshots", "domain"), SnapAccessors: filepath.Join(datadir, "snapshots", "accessor"), + SnapCaplin: filepath.Join(datadir, "snapshots", "caplin"), Downloader: filepath.Join(datadir, "downloader"), TxPool: filepath.Join(datadir, "txpool"), Nodes: filepath.Join(datadir, "nodes"), @@ -82,7 +84,7 @@ func New(datadir string) Dirs { } dir.MustExist(dirs.Chaindata, dirs.Tmp, - dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors, + dirs.SnapIdx, dirs.SnapHistory, dirs.SnapDomain, dirs.SnapAccessors, dirs.SnapCaplin, dirs.Downloader, dirs.TxPool, dirs.Nodes, dirs.CaplinBlobs, dirs.CaplinIndexing, dirs.CaplinLatest, dirs.CaplinGenesis) return dirs } diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 6fb5eb1cbfc..e089097d0a5 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -2645,14 +2645,21 @@ func SeedableFiles(dirs datadir.Dirs, chainName string, all bool) ([]string, err if err != nil { return nil, err } - var l4 []string + var l4, l5 []string if all { l4, err = seedableStateFilesBySubDir(dirs.Snap, "accessor", all) if err != nil { return nil, err } } - files = append(append(append(append(files, l1...), l2...), l3...), l4...) 
+ // check if dirs.SnapCaplin exists + if _, err := os.Stat(dirs.SnapCaplin); !os.IsNotExist(err) { + l5, err = seedableSegmentFiles(dirs.SnapCaplin, chainName, all) + if err != nil { + return nil, err + } + } + files = append(append(append(append(append(files, l1...), l2...), l3...), l4...), l5...) return files, nil } diff --git a/erigon-lib/downloader/snaptype/files.go b/erigon-lib/downloader/snaptype/files.go index 26cebf23e70..4ac92207dc2 100644 --- a/erigon-lib/downloader/snaptype/files.go +++ b/erigon-lib/downloader/snaptype/files.go @@ -152,6 +152,8 @@ func parseFileName(dir, fileName string) (res FileInfo, ok bool) { return } res.To = to * 1_000 + res.TypeString = parts[3] + res.Type, ok = ParseFileType(parts[3]) if !ok { return res, ok @@ -243,6 +245,7 @@ type FileInfo struct { From, To uint64 name, Path, Ext string Type Type + TypeString string // This is for giulio's generic snapshots } func (f FileInfo) TorrentFileExists() (bool, error) { return dir.FileExist(f.Path + ".torrent") } diff --git a/erigon-lib/downloader/snaptype/type.go b/erigon-lib/downloader/snaptype/type.go index 70dda5e6a99..a5a3244dd93 100644 --- a/erigon-lib/downloader/snaptype/type.go +++ b/erigon-lib/downloader/snaptype/type.go @@ -484,6 +484,67 @@ func BuildIndex(ctx context.Context, info FileInfo, cfg recsplit.RecSplitArgs, l } } +func BuildIndexWithSnapName(ctx context.Context, info FileInfo, cfg recsplit.RecSplitArgs, lvl log.Lvl, p *background.Progress, walker func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error, logger log.Logger) (err error) { + defer func() { + if rec := recover(); rec != nil { + err = fmt.Errorf("index panic: at=%s, %v, %s", info.Name(), rec, dbg.Stack()) + } + }() + + d, err := seg.NewDecompressor(info.Path) + if err != nil { + return fmt.Errorf("can't open %s for indexing: %w", info.Name(), err) + } + defer d.Close() + + if p != nil { + fname := info.Name() + p.Name.Store(&fname) + p.Total.Store(uint64(d.Count())) + } + cfg.KeyCount = d.Count() + cfg.IndexFile = filepath.Join(info.Dir(), strings.ReplaceAll(info.name, ".seg", ".idx")) + rs, err := recsplit.NewRecSplit(cfg, logger) + if err != nil { + return err + } + rs.LogLvl(lvl) + + defer d.EnableReadAhead().DisableReadAhead() + + for { + g := d.MakeGetter() + var i, offset, nextPos uint64 + word := make([]byte, 0, 4096) + + for g.HasNext() { + word, nextPos = g.Next(word[:0]) + if err := walker(rs, i, offset, word); err != nil { + return err + } + i++ + offset = nextPos + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + } + + if err = rs.Build(ctx); err != nil { + if errors.Is(err, recsplit.ErrCollision) { + logger.Info("Building recsplit. Collision happened. It's ok. 
Restarting with another salt...", "err", err)
+				rs.ResetNextSalt()
+				continue
+			}
+			return err
+		}
+
+		return nil
+	}
+}
+
 func ExtractRange(ctx context.Context, f FileInfo, extractor RangeExtractor, indexBuilder IndexBuilder, firstKey FirstKeyGetter, chainDB kv.RoDB, chainConfig *chain.Config, tmpDir string, workers int, lvl log.Lvl, logger log.Logger) (uint64, error) {
 	var lastKeyValue uint64
diff --git a/erigon-lib/downloader/util.go b/erigon-lib/downloader/util.go
index 5dc141ddbca..3887820a23a 100644
--- a/erigon-lib/downloader/util.go
+++ b/erigon-lib/downloader/util.go
@@ -24,6 +24,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"path"
 	"path/filepath"
 	"runtime"
 	"strings"
@@ -79,8 +80,14 @@ func seedableSegmentFiles(dir string, chainName string, skipSeedableCheck bool)
 	res := make([]string, 0, len(files))
 	for _, fPath := range files {
 		_, name := filepath.Split(fPath)
+		// Caplin state snapshots don't follow the enum-based type naming, so
+		// anything under the caplin dir is seeded as-is.
+		if strings.Contains(dir, "caplin") {
+			res = append(res, path.Join("caplin", name))
+			continue
+		}
 		if !skipSeedableCheck && !snaptype.IsCorrectFileName(name) {
 			continue
 		}
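For orientation before the new file below: caplin state segments reuse the block-snapshot file-name layout with the table name substituted for the `beaconblocks` token, which is why the seeding hunk above routes them by directory instead of parsing the type. A sketch of that derivation; the exact `v1-from-to-name.seg` layout, with from/to in thousands of slots, is an assumption read off `parseFileName` earlier in this patch:

```go
// Sketch: derive a per-table state segment name from the block-segment
// naming pattern, mirroring the strings.ReplaceAll trick in dumpCaplinState.
package main

import (
	"fmt"
	"strings"
)

// blockSegName approximates the "v1-from-to-beaconblocks.seg" layout, with
// the range expressed in thousands of slots.
func blockSegName(fromSlot, toSlot uint64) string {
	return fmt.Sprintf("v1-%06d-%06d-beaconblocks.seg", fromSlot/1_000, toSlot/1_000)
}

// stateSegName swaps the type token for the (lower-cased) table name.
func stateSegName(fromSlot, toSlot uint64, table string) string {
	return strings.ReplaceAll(blockSegName(fromSlot, toSlot), "beaconblocks", strings.ToLower(table))
}

func main() {
	fmt.Println(stateSegName(0, 500_000, "StateEvents")) // v1-000000-000500-stateevents.seg
}
```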
diff --git a/turbo/snapshotsync/caplin_state_snapshots.go b/turbo/snapshotsync/caplin_state_snapshots.go
new file mode 100644
index 00000000000..f5c75568dcc
--- /dev/null
+++ b/turbo/snapshotsync/caplin_state_snapshots.go
@@ -0,0 +1,712 @@
+// Copyright 2024 The Erigon Authors
+// This file is part of Erigon.
+//
+// Erigon is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// Erigon is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with Erigon. If not, see <http://www.gnu.org/licenses/>.
+
+package snapshotsync
+
+import (
+	"context"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"math"
+	"os"
+	"path/filepath"
+	"runtime/debug"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/tidwall/btree"
+
+	"github.com/erigontech/erigon-lib/log/v3"
+	"github.com/erigontech/erigon-lib/recsplit"
+
+	"github.com/erigontech/erigon-lib/chain/snapcfg"
+	libcommon "github.com/erigontech/erigon-lib/common"
+	"github.com/erigontech/erigon-lib/common/background"
+	"github.com/erigontech/erigon-lib/common/datadir"
+	"github.com/erigontech/erigon-lib/downloader/snaptype"
+	"github.com/erigontech/erigon-lib/kv"
+	"github.com/erigontech/erigon-lib/seg"
+
+	"github.com/erigontech/erigon/cl/clparams"
+	"github.com/erigontech/erigon/cl/persistence/base_encoding"
+	"github.com/erigontech/erigon/eth/ethconfig"
+)
+
+func BeaconSimpleIdx(ctx context.Context, sn snaptype.FileInfo, salt uint32, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) {
+	num := make([]byte, binary.MaxVarintLen64)
+	cfg := recsplit.RecSplitArgs{
+		Enums:      true,
+		BucketSize: 2000,
+		LeafSize:   8,
+		TmpDir:     tmpDir,
+		Salt:       &salt,
+		BaseDataID: sn.From,
+	}
+	if err := snaptype.BuildIndex(ctx, sn, cfg, log.LvlDebug, p, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error {
+		if i%20_000 == 0 {
+			logger.Log(lvl, "Generating idx for "+sn.Type.Name(), "progress", i)
+		}
+		p.Processed.Add(1)
+		n := binary.PutUvarint(num, i)
+		if err := idx.AddKey(num[:n], offset); err != nil {
+			return err
+		}
+		return nil
+	}, logger); err != nil {
+		return fmt.Errorf("idx: %w", err)
+	}
+
+	return nil
+}
+
+func getKvGetterForStateTable(db kv.RoDB, tableName string) KeyValueGetter {
+	return func(numId uint64) ([]byte, []byte, error) {
+		var key, value []byte
+		var err error
+		if err := db.View(context.TODO(), func(tx kv.Tx) error {
+			key = base_encoding.Encode64ToBytes4(numId)
+			value, err = tx.GetOne(tableName, base_encoding.Encode64ToBytes4(numId))
+			value = libcommon.Copy(value)
+			return err
+		}); err != nil {
+			return nil, nil, err
+		}
+		return key, value, nil
+	}
+}
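`getKvGetterForStateTable` above is the only coupling between the dump pipeline and the DB schema: each table contributes a `KeyValueGetter` keyed by slot. A self-contained sketch of how such a registry is meant to be consumed; the in-memory maps stand in for `kv.RoDB` tables and the names are illustrative:

```go
// Sketch: a per-table getter registry, consumed one slot at a time.
package main

import "fmt"

type keyValueGetter func(numID uint64) (key []byte, value []byte, err error)

// getterForTable closes over one table; missing slots yield a nil value,
// which the dumper writes out as an empty word.
func getterForTable(tbl map[uint64][]byte) keyValueGetter {
	return func(numID uint64) ([]byte, []byte, error) {
		key := []byte{byte(numID >> 24), byte(numID >> 16), byte(numID >> 8), byte(numID)}
		return key, tbl[numID], nil
	}
}

func main() {
	getters := map[string]keyValueGetter{
		"BlockRoot": getterForTable(map[uint64][]byte{0: {0xde, 0xad}}),
		"StateRoot": getterForTable(map[uint64][]byte{0: {0xbe, 0xef}}),
	}
	for name, get := range getters {
		_, v, _ := get(0)
		fmt.Printf("%s[0] = %x\n", name, v)
	}
}
```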
+
+func MakeCaplinStateSnapshotsTypes(db kv.RoDB) SnapshotTypes {
+	return SnapshotTypes{
+		KeyValueGetters: map[string]KeyValueGetter{
+			kv.ValidatorEffectiveBalance: getKvGetterForStateTable(db, kv.ValidatorEffectiveBalance),
+			kv.ValidatorSlashings:        getKvGetterForStateTable(db, kv.ValidatorSlashings),
+			kv.ValidatorBalance:          getKvGetterForStateTable(db, kv.ValidatorBalance),
+			kv.StateEvents:               getKvGetterForStateTable(db, kv.StateEvents),
+			kv.ActiveValidatorIndicies:   getKvGetterForStateTable(db, kv.ActiveValidatorIndicies),
+			kv.StateRoot:                 getKvGetterForStateTable(db, kv.StateRoot),
+			kv.BlockRoot:                 getKvGetterForStateTable(db, kv.BlockRoot),
+			kv.SlotData:                  getKvGetterForStateTable(db, kv.SlotData),
+			kv.EpochData:                 getKvGetterForStateTable(db, kv.EpochData),
+			kv.InactivityScores:          getKvGetterForStateTable(db, kv.InactivityScores),
+			kv.NextSyncCommittee:         getKvGetterForStateTable(db, kv.NextSyncCommittee),
+			kv.CurrentSyncCommittee:      getKvGetterForStateTable(db, kv.CurrentSyncCommittee),
+			kv.Eth1DataVotes:             getKvGetterForStateTable(db, kv.Eth1DataVotes),
+			kv.IntraRandaoMixes:          getKvGetterForStateTable(db, kv.IntraRandaoMixes),
+			kv.RandaoMixes:               getKvGetterForStateTable(db, kv.RandaoMixes),
+			kv.Proposers:                 getKvGetterForStateTable(db, kv.Proposers),
+			kv.BalancesDump:              getKvGetterForStateTable(db, kv.BalancesDump),
+			kv.EffectiveBalancesDump:     getKvGetterForStateTable(db, kv.EffectiveBalancesDump),
+		},
+		Compression: map[string]bool{},
+	}
+}
+
+// Each table gets its own set of .seg files holding one value per slot;
+// the slot -> segment offset mapping comes from the accompanying .idx file.
+
+type CaplinStateSnapshots struct {
+	indicesReady  atomic.Bool
+	segmentsReady atomic.Bool
+
+	Salt uint32
+
+	dirtySegmentsLock   sync.RWMutex
+	visibleSegmentsLock sync.RWMutex
+
+	dirtyLock sync.RWMutex                            // guards `dirty` field
+	dirty     map[string]*btree.BTreeG[*DirtySegment] // ordered map `table name` -> DirtySegments
+
+	visibleLock sync.RWMutex               // guards `visible` field
+	visible     map[string]VisibleSegments // ordered map `table name` -> VisibleSegments
+
+	snapshotTypes SnapshotTypes
+
+	dir         string
+	tmpdir      string
+	segmentsMax atomic.Uint64 // all types of .seg files are available - up to this number
+	idxMax      atomic.Uint64 // all types of .idx files are available - up to this number
+	cfg         ethconfig.BlocksFreezing
+	logger      log.Logger
+	// allows for pruning segments - this is the min available segment
+	segmentsMin atomic.Uint64
+	// chain cfg
+	beaconCfg *clparams.BeaconChainConfig
+}
+
+type KeyValueGetter func(numId uint64) ([]byte, []byte, error)
+
+type SnapshotTypes struct {
+	KeyValueGetters map[string]KeyValueGetter
+	Compression     map[string]bool
+}
+
+// NewCaplinStateSnapshots - opens all snapshots. To simplify everything:
+//   - snapshots are opened only on app start and are immutable afterwards
+//   - all snapshots of a given block range must exist to make that range available
+//   - gaps are not allowed
+//   - segments have [from:to) semantics
+func NewCaplinStateSnapshots(cfg ethconfig.BlocksFreezing, beaconCfg *clparams.BeaconChainConfig, dirs datadir.Dirs, snapshotTypes SnapshotTypes, logger log.Logger) *CaplinStateSnapshots {
+	dirty := make(map[string]*btree.BTreeG[*DirtySegment])
+	for k := range snapshotTypes.KeyValueGetters {
+		dirty[k] = btree.NewBTreeGOptions[*DirtySegment](DirtySegmentLess, btree.Options{Degree: 128, NoLocks: false})
+	}
+	visible := make(map[string]VisibleSegments)
+	for k := range snapshotTypes.KeyValueGetters {
+		visible[k] = make(VisibleSegments, 0)
+	}
+	c := &CaplinStateSnapshots{snapshotTypes: snapshotTypes, dir: dirs.SnapCaplin, tmpdir: dirs.Tmp, cfg: cfg, visible: visible, dirty: dirty, logger: logger, beaconCfg: beaconCfg}
+	c.recalcVisibleFiles()
+	return c
+}
+
+func (s *CaplinStateSnapshots) IndicesMax() uint64  { return s.idxMax.Load() }
+func (s *CaplinStateSnapshots) SegmentsMax() uint64 { return s.segmentsMax.Load() }
+
+func (s *CaplinStateSnapshots) LogStat(str string) {
+	s.logger.Info(fmt.Sprintf("[snapshots:%s] Stat", str),
+		"blocks", libcommon.PrettyCounter(s.SegmentsMax()+1), "indices", libcommon.PrettyCounter(s.IndicesMax()+1))
+}
+
+func (s *CaplinStateSnapshots) LS() {
+	if s == nil {
+		return
+	}
+	view := s.View()
+	defer view.Close()
+
+	for _, roTx := range view.roTxs {
+		if roTx != nil {
+			for _, seg := range roTx.Segments {
+				s.logger.Info("[agg] ", "f",
seg.src.filePath, "words", seg.src.Decompressor.Count()) + } + } + } +} + +func (s *CaplinStateSnapshots) SegFileNames(from, to uint64) []string { + view := s.View() + defer view.Close() + + var res []string + + for _, roTx := range view.roTxs { + if roTx == nil { + continue + } + for _, seg := range roTx.Segments { + if seg.from >= to || seg.to <= from { + continue + } + res = append(res, seg.src.filePath) + } + + } + return res +} + +func (s *CaplinStateSnapshots) BlocksAvailable() uint64 { + return min(s.segmentsMax.Load(), s.idxMax.Load()) +} + +func (s *CaplinStateSnapshots) Close() { + if s == nil { + return + } + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() + + s.closeWhatNotInList(nil) +} + +func (s *CaplinStateSnapshots) openSegIfNeed(sn *DirtySegment, filepath string) error { + if sn.Decompressor != nil { + return nil + } + var err error + sn.Decompressor, err = seg.NewDecompressor(filepath) + if err != nil { + return fmt.Errorf("%w, fileName: %s", err, filepath) + } + return nil +} + +// OpenList stops on optimistic=false, continue opening files on optimistic=true +func (s *CaplinStateSnapshots) OpenList(fileNames []string, optimistic bool) error { + defer s.recalcVisibleFiles() + + s.dirtySegmentsLock.Lock() + defer s.dirtySegmentsLock.Unlock() + + s.closeWhatNotInList(fileNames) + var segmentsMax uint64 + var segmentsMaxSet bool +Loop: + for _, fName := range fileNames { + f, _, _ := snaptype.ParseFileName(s.dir, fName) + + var processed bool = true + var exists bool + var sn *DirtySegment + + dirtySegments, ok := s.dirty[f.TypeString] + if !ok { + continue + } + filePath := filepath.Join(s.dir, fName) + dirtySegments.Walk(func(segments []*DirtySegment) bool { + for _, sn2 := range segments { + if sn2.Decompressor == nil { // it's ok if some segment was not able to open + continue + } + if filePath == sn2.filePath { + sn = sn2 + exists = true + break + } + } + return true + }) + if !exists { + sn = &DirtySegment{ + // segType: f.Type, Unsupported + version: f.Version, + Range: Range{f.From, f.To}, + frozen: snapcfg.IsFrozen(s.cfg.ChainName, f), + filePath: filePath, + } + } + if err := s.openSegIfNeed(sn, filePath); err != nil { + if errors.Is(err, os.ErrNotExist) { + if optimistic { + continue Loop + } else { + break Loop + } + } + if optimistic { + s.logger.Warn("[snapshots] open segment", "err", err) + continue Loop + } else { + return err + } + } + + if !exists { + // it's possible to iterate over .seg file even if you don't have index + // then make segment available even if index open may fail + dirtySegments.Set(sn) + } + if err := openIdxForCaplinStateIfNeeded(sn, filePath, optimistic); err != nil { + return err + } + // Only bob sidecars count for progression + if processed { + if f.To > 0 { + segmentsMax = f.To - 1 + } else { + segmentsMax = 0 + } + segmentsMaxSet = true + } + } + + if segmentsMaxSet { + s.segmentsMax.Store(segmentsMax) + } + s.segmentsReady.Store(true) + return nil +} + +func openIdxForCaplinStateIfNeeded(s *DirtySegment, filePath string, optimistic bool) error { + if s.Decompressor == nil { + return nil + } + err := openIdxIfNeedForCaplinState(s, filePath) + if err != nil { + if !errors.Is(err, os.ErrNotExist) { + if optimistic { + log.Warn("[snapshots] open index", "err", err) + } else { + return err + } + } + } + + return nil +} + +func openIdxIfNeedForCaplinState(s *DirtySegment, filePath string) (err error) { + s.indexes = make([]*recsplit.Index, 1) + if s.indexes[0] != nil { + return nil + } + + filePath = 
strings.ReplaceAll(filePath, ".seg", ".idx") + index, err := recsplit.OpenIndex(filePath) + if err != nil { + return fmt.Errorf("%w, fileName: %s", err, filePath) + } + + s.indexes[0] = index + + return nil +} + +func isIndexed(s *DirtySegment) bool { + if s.Decompressor == nil { + return false + } + + for _, idx := range s.indexes { + if idx == nil { + return false + } + } + return true +} + +func (s *CaplinStateSnapshots) recalcVisibleFiles() { + defer func() { + s.idxMax.Store(s.idxAvailability()) + s.indicesReady.Store(true) + }() + + s.visibleLock.Lock() + defer s.visibleLock.Unlock() + + getNewVisibleSegments := func(dirtySegments *btree.BTreeG[*DirtySegment]) []*VisibleSegment { + newVisibleSegments := make([]*VisibleSegment, 0, dirtySegments.Len()) + dirtySegments.Walk(func(segments []*DirtySegment) bool { + for _, sn := range segments { + if sn.canDelete.Load() { + continue + } + if !isIndexed(sn) { + continue + } + for len(newVisibleSegments) > 0 && newVisibleSegments[len(newVisibleSegments)-1].src.isSubSetOf(sn) { + newVisibleSegments[len(newVisibleSegments)-1].src = nil + newVisibleSegments = newVisibleSegments[:len(newVisibleSegments)-1] + } + newVisibleSegments = append(newVisibleSegments, &VisibleSegment{ + Range: sn.Range, + segType: sn.segType, + src: sn, + }) + } + return true + }) + return newVisibleSegments + } + + for k := range s.visible { + s.visible[k] = getNewVisibleSegments(s.dirty[k]) + } +} + +func (s *CaplinStateSnapshots) idxAvailability() uint64 { + s.visibleLock.RLock() + defer s.visibleLock.RUnlock() + + min := uint64(math.MaxUint64) + for _, segs := range s.visible { + if len(segs) == 0 { + return 0 + } + if segs[len(segs)-1].to < min { + min = segs[len(segs)-1].to + } + } + if min == math.MaxUint64 { + return 0 + } + return min +} + +func listAllSegFilesInDir(dir string) []string { + files, err := os.ReadDir(dir) + if err != nil { + panic(err) + } + list := make([]string, 0, len(files)) + for _, f := range files { + if f.IsDir() { + continue + } + // check if it's a .seg file + if filepath.Ext(f.Name()) != ".seg" { + continue + } + list = append(list, f.Name()) + } + return list +} + +func (s *CaplinStateSnapshots) OpenFolder() error { + return s.OpenList(listAllSegFilesInDir(s.dir), false) +} + +func (s *CaplinStateSnapshots) closeWhatNotInList(l []string) { + protectFiles := make(map[string]struct{}, len(l)) + for _, fName := range l { + protectFiles[fName] = struct{}{} + } + + for _, dirtySegments := range s.dirty { + toClose := make([]*DirtySegment, 0) + dirtySegments.Walk(func(segments []*DirtySegment) bool { + for _, sn := range segments { + if sn.Decompressor == nil { + continue + } + _, name := filepath.Split(sn.FilePath()) + if _, ok := protectFiles[name]; ok { + continue + } + toClose = append(toClose, sn) + } + return true + }) + for _, sn := range toClose { + sn.close() + dirtySegments.Delete(sn) + } + } +} + +type CaplinStateView struct { + s *CaplinStateSnapshots + roTxs map[string]*RoTx + closed bool +} + +func (s *CaplinStateSnapshots) View() *CaplinStateView { + if s == nil { + return nil + } + s.visibleSegmentsLock.RLock() + defer s.visibleSegmentsLock.RUnlock() + + v := &CaplinStateView{s: s, roTxs: make(map[string]*RoTx)} + // BeginRo increments refcount - which is contended + s.dirtySegmentsLock.RLock() + defer s.dirtySegmentsLock.RUnlock() + + for k, segments := range s.visible { + v.roTxs[k] = segments.BeginRo() + } + return v +} + +func (v *CaplinStateView) Close() { + if v == nil { + return + } + if v.closed { + return + } + for 
_, segments := range v.roTxs {
+		segments.Close()
+	}
+	v.s = nil
+	v.closed = true
+}
+
+func (v *CaplinStateView) VisibleSegments(tbl string) []*VisibleSegment {
+	if v.s == nil || v.s.visible[tbl] == nil {
+		return nil
+	}
+	return v.s.visible[tbl]
+}
+
+func (v *CaplinStateView) VisibleSegment(slot uint64, tbl string) (*VisibleSegment, bool) {
+	for _, seg := range v.VisibleSegments(tbl) {
+		if !(slot >= seg.from && slot < seg.to) {
+			continue
+		}
+		return seg, true
+	}
+	return nil, false
+}
+
+func dumpCaplinState(ctx context.Context, snapName string, kvGetter KeyValueGetter, fromSlot uint64, toSlot, blocksPerFile uint64, salt uint32, dirs datadir.Dirs, workers int, lvl log.Lvl, logger log.Logger, compress bool) error {
+	tmpDir, snapDir := dirs.Tmp, dirs.SnapCaplin
+
+	// Reuse the block-snapshot file name layout, substituting the table name
+	// for the "beaconblocks" token.
+	segName := snaptype.BeaconBlocks.FileName(0, fromSlot, toSlot)
+	segName = strings.ReplaceAll(segName, "beaconblocks", snapName)
+	f, _, _ := snaptype.ParseFileName(snapDir, segName)
+
+	compressCfg := seg.DefaultCfg
+	compressCfg.Workers = workers
+	sn, err := seg.NewCompressor(ctx, "Snapshots "+snapName, f.Path, tmpDir, compressCfg, lvl, logger)
+	if err != nil {
+		return err
+	}
+	defer sn.Close()
+
+	// Generate the .seg file: one word per slot, read through the table's getter.
+	for i := fromSlot; i < toSlot; i++ {
+		_, dump, err := kvGetter(i)
+		if err != nil {
+			return err
+		}
+		if i%20_000 == 0 {
+			logger.Log(lvl, "Dumping "+snapName, "progress", i)
+		}
+		if compress {
+			if err := sn.AddWord(dump); err != nil {
+				return err
+			}
+		} else {
+			if err := sn.AddUncompressedWord(dump); err != nil {
+				return err
+			}
+		}
+	}
+	if sn.Count() != int(blocksPerFile) {
+		return fmt.Errorf("expected %d blocks, got %d", blocksPerFile, sn.Count())
+	}
+	if err := sn.Compress(); err != nil {
+		return err
+	}
+	// Generate .idx file, which is the slot => offset mapping.
+	p := &background.Progress{}
+
+	// Ugly hack to wait for fsync
+	time.Sleep(15 * time.Second)
+
+	return simpleIdx(ctx, f, salt, tmpDir, p, lvl, logger)
+}
+
+func simpleIdx(ctx context.Context, sn snaptype.FileInfo, salt uint32, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) {
+	num := make([]byte, binary.MaxVarintLen64)
+	cfg := recsplit.RecSplitArgs{
+		Enums:      true,
+		BucketSize: 2000,
+		LeafSize:   8,
+		TmpDir:     tmpDir,
+		Salt:       &salt,
+		BaseDataID: sn.From,
+	}
+	if err := snaptype.BuildIndexWithSnapName(ctx, sn, cfg, log.LvlDebug, p, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error {
+		if i%20_000 == 0 {
+			logger.Log(lvl, "Generating idx for "+sn.Name(), "progress", i)
+		}
+		p.Processed.Add(1)
+		n := binary.PutUvarint(num, i)
+		if err := idx.AddKey(num[:n], offset); err != nil {
+			return err
+		}
+		return nil
+	}, logger); err != nil {
+		return fmt.Errorf("idx: %w", err)
+	}
+
+	return nil
+}
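The `DumpCaplinState` entry point that follows rounds both bounds down to multiples of `blocksPerFile` and emits only full windows, so a partially filled tail range is deferred until enough slots accumulate. A tiny illustration of that windowing, with made-up numbers:

```go
// Sketch: compute which full [from, to) windows a dump run would emit.
package main

import "fmt"

func plannedWindows(fromSlot, toSlot, blocksPerFile uint64) [][2]uint64 {
	fromSlot = (fromSlot / blocksPerFile) * blocksPerFile
	toSlot = (toSlot / blocksPerFile) * blocksPerFile
	var out [][2]uint64
	for i := fromSlot; i+blocksPerFile <= toSlot; i += blocksPerFile {
		out = append(out, [2]uint64{i, i + blocksPerFile})
	}
	return out
}

func main() {
	// 25_000 slots of progress with 10_000-slot files -> two full windows;
	// the trailing 5_000 slots wait for the next run.
	fmt.Println(plannedWindows(0, 25_000, 10_000)) // [[0 10000] [10000 20000]]
}
```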
+
+func (s *CaplinStateSnapshots) DumpCaplinState(ctx context.Context, fromSlot, toSlot, blocksPerFile uint64, salt uint32, dirs datadir.Dirs, workers int, lvl log.Lvl, logger log.Logger) error {
+	// Align both bounds down to multiples of blocksPerFile; only full windows
+	// are dumped.
+	fromSlot = (fromSlot / blocksPerFile) * blocksPerFile
+	toSlot = (toSlot / blocksPerFile) * blocksPerFile
+	for snapName, kvGetter := range s.snapshotTypes.KeyValueGetters {
+		for i := fromSlot; i < toSlot; i += blocksPerFile {
+			if toSlot-i < blocksPerFile {
+				break
+			}
+			to := i + blocksPerFile
+			logger.Log(lvl, "Dumping "+snapName, "from", i, "to", to)
+			if err := dumpCaplinState(ctx, snapName, kvGetter, i, to, blocksPerFile, salt, dirs, workers, lvl, logger, s.snapshotTypes.Compression[snapName]); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+func (s *CaplinStateSnapshots) BuildMissingIndices(ctx context.Context, logger log.Logger) error {
+	if s == nil {
+		return nil
+	}
+	// if !s.segmentsReady.Load() {
+	// 	return fmt.Errorf("not all snapshot segments are available")
+	// }
+
+	// wait for Downloader service to download all expected snapshots
+	segments, _, err := SegmentsCaplin(s.dir, 0)
+	if err != nil {
+		return err
+	}
+	noneDone := true
+	for index := range segments {
+		segment := segments[index]
+		// The same slot=>offset mapping is used for both beacon blocks and blob sidecars.
+		if segment.Type.Enum() != snaptype.CaplinEnums.BeaconBlocks && segment.Type.Enum() != snaptype.CaplinEnums.BlobSidecars {
+			continue
+		}
+		if segment.Type.HasIndexFiles(segment, logger) {
+			continue
+		}
+		p := &background.Progress{}
+		noneDone = false
+		if err := BeaconSimpleIdx(ctx, segment, s.Salt, s.tmpdir, p, log.LvlDebug, logger); err != nil {
+			return err
+		}
+	}
+	if noneDone {
+		return nil
+	}
+
+	return s.OpenFolder()
+}
+
+func (s *CaplinStateSnapshots) Get(tbl string, slot uint64) ([]byte, error) {
+	defer func() {
+		if rec := recover(); rec != nil {
+			panic(fmt.Sprintf("Get(%s, %d), %s, %s\n", tbl, slot, rec, debug.Stack()))
+		}
+	}()
+
+	view := s.View()
+	defer view.Close()
+
+	seg, ok := view.VisibleSegment(slot, tbl)
+	if !ok {
+		return nil, nil
+	}
+
+	return seg.Get(slot)
+}
diff --git a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go
index 5145e335a24..2f37fb583bd 100644
--- a/turbo/snapshotsync/freezeblocks/caplin_snapshots.go
+++ b/turbo/snapshotsync/freezeblocks/caplin_snapshots.go
@@ -19,7 +19,6 @@ package freezeblocks
 import (
 	"bytes"
 	"context"
-	"encoding/binary"
 	"errors"
 	"fmt"
 	"math"
@@ -41,7 +40,6 @@ import (
 	"github.com/erigontech/erigon-lib/kv"
 	"github.com/erigontech/erigon-lib/kv/dbutils"
 	"github.com/erigontech/erigon-lib/log/v3"
-	"github.com/erigontech/erigon-lib/recsplit"
 	"github.com/erigontech/erigon-lib/seg"

 	"github.com/erigontech/erigon/cl/clparams"
@@ -55,33 +53,6 @@
 var sidecarSSZSize = (&cltypes.BlobSidecar{}).EncodingSizeSSZ()

-func BeaconSimpleIdx(ctx context.Context, sn snaptype.FileInfo, salt uint32, tmpDir string, p *background.Progress, lvl log.Lvl, logger log.Logger) (err error) {
-	num := make([]byte, binary.MaxVarintLen64)
-	cfg := recsplit.RecSplitArgs{
-		Enums:      true,
-		BucketSize: 2000,
-		LeafSize:   8,
-		TmpDir:     tmpDir,
-		Salt:       &salt,
-		BaseDataID: sn.From,
-	}
-	if err := snaptype.BuildIndex(ctx, sn, cfg, log.LvlDebug, p, func(idx *recsplit.RecSplit, i, offset uint64, word []byte) error {
-		if i%20_000 == 0 {
-			logger.Log(lvl, "Generating idx for "+sn.Type.Name(), "progress", i)
-		}
-		p.Processed.Add(1)
-		n := binary.PutUvarint(num, i)
-		if err := idx.AddKey(num[:n], offset); err != nil {
-			return err
-		}
-		return nil
-	}, logger); err != nil {
-		return fmt.Errorf("idx: %w", err)
-	}
-
-	return nil
-}
-
 // value: chunked(ssz(SignedBeaconBlocks))
 // slot -> beacon_slot_segment_offset
@@ -515,7 +486,7 @@
 	// Ugly hack to wait for fsync
 	time.Sleep(15 * time.Second)

-	return BeaconSimpleIdx(ctx, f, salt, tmpDir, p, lvl, logger)
+	return
snapshotsync.BeaconSimpleIdx(ctx, f, salt, tmpDir, p, lvl, logger) } func DumpBlobSidecarsRange(ctx context.Context, db kv.RoDB, storage blob_storage.BlobStorage, fromSlot uint64, toSlot uint64, salt uint32, dirs datadir.Dirs, workers int, blobCountFn BlobCountBySlotFn, lvl log.Lvl, logger log.Logger) error { @@ -601,7 +572,7 @@ func DumpBlobSidecarsRange(ctx context.Context, db kv.RoDB, storage blob_storage // Generate .idx file, which is the slot => offset mapping. p := &background.Progress{} - return BeaconSimpleIdx(ctx, f, salt, tmpDir, p, lvl, logger) + return snapshotsync.BeaconSimpleIdx(ctx, f, salt, tmpDir, p, lvl, logger) } func DumpBeaconBlocks(ctx context.Context, db kv.RoDB, fromSlot, toSlot uint64, salt uint32, dirs datadir.Dirs, workers int, lvl log.Lvl, logger log.Logger) error { @@ -665,7 +636,7 @@ func (s *CaplinSnapshots) BuildMissingIndices(ctx context.Context, logger log.Lo } p := &background.Progress{} noneDone = false - if err := BeaconSimpleIdx(ctx, segment, s.Salt, s.tmpdir, p, log.LvlDebug, logger); err != nil { + if err := snapshotsync.BeaconSimpleIdx(ctx, segment, s.Salt, s.tmpdir, p, log.LvlDebug, logger); err != nil { return err } } diff --git a/turbo/snapshotsync/snapshots.go b/turbo/snapshotsync/snapshots.go index da86afbc070..cc55fb8346a 100644 --- a/turbo/snapshotsync/snapshots.go +++ b/turbo/snapshotsync/snapshots.go @@ -249,6 +249,9 @@ type DirtySegment struct { refcount atomic.Int32 canDelete atomic.Bool + + // only caplin state + filePath string } func NewDirtySegment(segType snaptype.Type, version snaptype.Version, from uint64, to uint64, frozen bool) *DirtySegment { @@ -274,6 +277,28 @@ func (s *VisibleSegment) IsIndexed() bool { return s.src.IsIndexed() } +func (v *VisibleSegment) Get(globalId uint64) ([]byte, error) { + idxSlot := v.src.Index() + + if idxSlot == nil { + return nil, nil + } + blockOffset := idxSlot.OrdinalLookup(globalId - idxSlot.BaseDataID()) + + gg := v.src.MakeGetter() + gg.Reset(blockOffset) + if !gg.HasNext() { + return nil, nil + } + var buf []byte + buf, _ = gg.Next(buf) + if len(buf) == 0 { + return nil, nil + } + + return buf, nil +} + func DirtySegmentLess(i, j *DirtySegment) bool { if i.from != j.from { return i.from < j.from