Skip to content

Commit

Permalink
Caplin Optimization: Efficient Processing of Epoch and Reuse of check…
Browse files Browse the repository at this point in the history
…point states (#8020)
  • Loading branch information
Giulio2002 authored Aug 15, 2023
1 parent 1867d93 commit 114dd68
Show file tree
Hide file tree
Showing 16 changed files with 326 additions and 97 deletions.
3 changes: 2 additions & 1 deletion cl/phase1/core/state/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package state
import (
"crypto/sha256"
"encoding/binary"

"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/phase1/core/state/lru"
"github.com/ledgerwatch/erigon/cl/phase1/core/state/raw"
Expand Down Expand Up @@ -30,7 +31,6 @@ type CachingBeaconState struct {
totalActiveBalanceRootCache uint64
proposerIndex *uint64
previousStateRoot common.Hash
// Configs
}

func New(cfg *clparams.BeaconChainConfig) *CachingBeaconState {
Expand Down Expand Up @@ -215,6 +215,7 @@ func (b *CachingBeaconState) initBeaconState() error {

b.ForEachValidator(func(validator solid.Validator, i, total int) bool {
b.publicKeyIndicies[validator.PublicKey()] = uint64(i)

return true
})

Expand Down
41 changes: 29 additions & 12 deletions cl/phase1/forkchoice/fork_graph/fork_graph.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fork_graph

import (
"fmt"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
Expand Down Expand Up @@ -134,7 +136,7 @@ func (f *ForkGraph) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock, full
return nil, InvalidBlock, nil
}

newState, didLongRecconnection, err := f.GetState(block.ParentRoot, false)
newState, didLongRecconnection, err := f.GetState(block.ParentRoot, nil)
if err != nil {
return nil, InvalidBlock, err
}
Expand Down Expand Up @@ -209,7 +211,7 @@ func (f *ForkGraph) getBlock(blockRoot libcommon.Hash) (*cltypes.SignedBeaconBlo
return obj, has
}

func (f *ForkGraph) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*state.CachingBeaconState, bool, error) {
func (f *ForkGraph) GetState(blockRoot libcommon.Hash, recipient *state.CachingBeaconState) (*state.CachingBeaconState, bool, error) {
// collect all blocks beetwen greatest extending node path and block.
blocksInTheWay := []*cltypes.SignedBeaconBlock{}
// Use the parent root as a reverse iterator.
Expand All @@ -235,25 +237,40 @@ func (f *ForkGraph) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*state.
currentIteratorRoot = block.Block.ParentRoot
}

var copyReferencedState *state.CachingBeaconState
copyReferencedState := recipient
didLongRecconnection := currentIteratorRoot == reconnectionRootLong && reconnectionRootLong != reconnectionRootShort
if f.currentStateBlockRoot == blockRoot {
if alwaysCopy {
s, err := f.currentState.Copy()
return s, didLongRecconnection, err
if recipient != nil {
err := f.currentState.CopyInto(recipient)
return recipient, didLongRecconnection, err
}
return f.currentState, didLongRecconnection, nil
}
// Take a copy to the reference state.
if currentIteratorRoot == reconnectionRootLong {
copyReferencedState, err = f.currentReferenceState.Copy()
if err != nil {
return nil, true, err
if copyReferencedState == nil || f.currentReferenceState == nil {
copyReferencedState, err = f.currentReferenceState.Copy()
if err != nil {
return nil, true, err
}
} else {
fmt.Println(copyReferencedState)
err = f.currentReferenceState.CopyInto(copyReferencedState)
if err != nil {
return nil, true, err
}
}
} else {
copyReferencedState, err = f.nextReferenceState.Copy()
if err != nil {
return nil, false, err
if copyReferencedState == nil || f.nextReferenceState == nil {
copyReferencedState, err = f.nextReferenceState.Copy()
if err != nil {
return nil, false, err
}
} else {
err = f.nextReferenceState.CopyInto(copyReferencedState)
if err != nil {
return nil, true, err
}
}
}

Expand Down
9 changes: 9 additions & 0 deletions cl/phase1/forkchoice/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/ledgerwatch/erigon/cl/cltypes/solid"
"github.com/ledgerwatch/erigon/cl/freezer"
"github.com/ledgerwatch/erigon/cl/phase1/core/state"
state2 "github.com/ledgerwatch/erigon/cl/phase1/core/state"
"github.com/ledgerwatch/erigon/cl/phase1/execution_client"
"github.com/ledgerwatch/erigon/cl/phase1/forkchoice/fork_graph"
Expand Down Expand Up @@ -41,6 +42,9 @@ type ForkChoiceStore struct {
engine execution_client.ExecutionEngine
// freezer
recorder freezer.Freezer

// zero alloc checkpoint states production
checkpointDummyState *state.CachingBeaconState
}

type LatestMessage struct {
Expand All @@ -66,6 +70,10 @@ func NewForkChoiceStore(anchorState *state2.CachingBeaconState, engine execution
if err != nil {
return nil, err
}
checkpointDummyState, err := anchorState.Copy()
if err != nil {
return nil, err
}
return &ForkChoiceStore{
highestSeen: anchorState.Slot(),
time: anchorState.GenesisTime() + anchorState.BeaconConfig().SecondsPerSlot*anchorState.Slot(),
Expand All @@ -80,6 +88,7 @@ func NewForkChoiceStore(anchorState *state2.CachingBeaconState, engine execution
eth2Roots: eth2Roots,
engine: engine,
recorder: recorder,
checkpointDummyState: checkpointDummyState,
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion cl/phase1/forkchoice/on_attester_slashing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (f *ForkChoiceStore) OnAttesterSlashing(attesterSlashing *cltypes.AttesterS
return fmt.Errorf("attestation data is not slashable")
}
// Retrieve justified state
s, _, err := f.forkGraph.GetState(f.justifiedCheckpoint.BlockRoot(), false)
s, _, err := f.forkGraph.GetState(f.justifiedCheckpoint.BlockRoot(), nil)
if err != nil {
return err
}
Expand Down
11 changes: 10 additions & 1 deletion cl/phase1/forkchoice/on_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ package forkchoice

import (
"fmt"
"runtime"
"time"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/log/v3"

"github.com/ledgerwatch/erigon/cl/cltypes"
Expand All @@ -14,6 +18,7 @@ import (
func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload, fullValidation bool) error {
f.mu.Lock()
defer f.mu.Unlock()
start := time.Now()
blockRoot, err := block.Block.HashSSZ()
if err != nil {
return err
Expand Down Expand Up @@ -80,7 +85,7 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload,
justificationBits = lastProcessedState.JustificationBits().Copy()
)
// Eagerly compute unrealized justification and finality
if err := statechange.ProcessJustificationBitsAndFinality(lastProcessedState); err != nil {
if err := statechange.ProcessJustificationBitsAndFinality(lastProcessedState, nil); err != nil {
return err
}
f.updateUnrealizedCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy())
Expand All @@ -95,5 +100,9 @@ func (f *ForkChoiceStore) OnBlock(block *cltypes.SignedBeaconBlock, newPayload,
if blockEpoch < currentEpoch {
f.updateCheckpoints(lastProcessedState.CurrentJustifiedCheckpoint().Copy(), lastProcessedState.FinalizedCheckpoint().Copy())
}
var m runtime.MemStats
dbg.ReadMemStats(&m)
log.Debug("OnBlock", "elapsed", time.Since(start), "alloc", libcommon.ByteCount(m.Alloc),
"sys", libcommon.ByteCount(m.Sys))
return nil
}
3 changes: 2 additions & 1 deletion cl/phase1/forkchoice/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package forkchoice

import (
"fmt"

"github.com/ledgerwatch/erigon/cl/transition"

libcommon "github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -73,7 +74,7 @@ func (f *ForkChoiceStore) getCheckpointState(checkpoint solid.Checkpoint) (*chec
return state, nil
}
// If it is not in cache compute it and then put in cache.
baseState, _, err := f.forkGraph.GetState(checkpoint.BlockRoot(), true)
baseState, _, err := f.forkGraph.GetState(checkpoint.BlockRoot(), f.checkpointDummyState)
if err != nil {
return nil, err
}
Expand Down
43 changes: 35 additions & 8 deletions cl/phase1/stages/clstages.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/spf13/afero"
)

const doDownload = false

type Cfg struct {
rpc *rpc.BeaconRpcP2P
genesisCfg *clparams.GenesisConfig
Expand All @@ -37,6 +39,8 @@ type Args struct {

targetEpoch, seenEpoch uint64
targetSlot, seenSlot uint64

downloadedHistory bool
}

func ClStagesCfg(
Expand Down Expand Up @@ -64,29 +68,34 @@ func ClStagesCfg(
type StageName = string

const (
WaitForPeers StageName = "WaitForPeers"
CatchUpEpochs StageName = "CatchUpEpochs"
CatchUpBlocks StageName = "CatchUpBlocks"
ForkChoice StageName = "ForkChoice"
ListenForForks StageName = "ListenForForks"
CleanupAndPruning StageName = "CleanupAndPruning"
SleepForSlot StageName = "SleepForSlot"
WaitForPeers StageName = "WaitForPeers"
CatchUpEpochs StageName = "CatchUpEpochs"
CatchUpBlocks StageName = "CatchUpBlocks"
ForkChoice StageName = "ForkChoice"
ListenForForks StageName = "ListenForForks"
CleanupAndPruning StageName = "CleanupAndPruning"
SleepForSlot StageName = "SleepForSlot"
DownloadHistoricalBlocks StageName = "DownloadHistoricalBlocks"
)

const (
minPeersForDownload = uint64(4)
)

func MetaCatchingUp(args Args) string {
func MetaCatchingUp(args Args) StageName {
if args.peers < minPeersForDownload {
return WaitForPeers
}
if !args.downloadedHistory && doDownload {
return DownloadHistoricalBlocks
}
if args.seenEpoch < args.targetEpoch {
return CatchUpEpochs
}
if args.seenSlot < args.targetSlot {
return CatchUpBlocks
}

return ""
}

Expand Down Expand Up @@ -207,6 +216,24 @@ func ConsensusClStages(ctx context.Context,
return nil
},
},
DownloadHistoricalBlocks: {
Description: "Download historical blocks",
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
if x := MetaCatchingUp(args); x != "" {
return x
}
return CatchUpBlocks
},
ActionFunc: func(ctx context.Context, logger log.Logger, cfg *Cfg, args Args) error {
startingRoot, err := cfg.state.BlockRoot()
if err != nil {
return err
}
startingSlot := cfg.state.LatestBlockHeader().Slot
downloader := network2.NewBackwardBeaconDownloader(ctx, cfg.rpc)
return SpawnStageHistoryDownload(StageHistoryReconstruction(downloader, cfg.dataDirFs, cfg.genesisCfg, cfg.beaconCfg, 500_000, startingRoot, startingSlot, "/tmp", logger), ctx, logger)
},
},
CatchUpEpochs: {
Description: `if we are 1 or more epochs behind, we download in parallel by epoch`,
TransitionFunc: func(cfg *Cfg, args Args, err error) string {
Expand Down
107 changes: 107 additions & 0 deletions cl/phase1/stages/stage_history_download.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package stages

import (
"context"
"fmt"
"time"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon/cl/clpersist"
"github.com/ledgerwatch/erigon/cl/phase1/network"
"github.com/spf13/afero"

"github.com/ledgerwatch/erigon/cl/clparams"
"github.com/ledgerwatch/erigon/cl/cltypes"
"github.com/ledgerwatch/log/v3"
)

type StageHistoryReconstructionCfg struct {
genesisCfg *clparams.GenesisConfig
beaconCfg *clparams.BeaconChainConfig
downloader *network.BackwardBeaconDownloader
startingRoot libcommon.Hash
startingSlot uint64
backFillingAmount uint64
tmpdir string
dataDirFs afero.Fs
logger log.Logger
}

const logIntervalTime = 30 * time.Second

func StageHistoryReconstruction(downloader *network.BackwardBeaconDownloader, dataDirFs afero.Fs, genesisCfg *clparams.GenesisConfig, beaconCfg *clparams.BeaconChainConfig, backFillingAmount uint64, startingRoot libcommon.Hash, startinSlot uint64, tmpdir string, logger log.Logger) StageHistoryReconstructionCfg {
return StageHistoryReconstructionCfg{
genesisCfg: genesisCfg,
beaconCfg: beaconCfg,
downloader: downloader,
startingRoot: startingRoot,
tmpdir: tmpdir,
startingSlot: startinSlot,
logger: logger,
backFillingAmount: backFillingAmount,
dataDirFs: dataDirFs,
}
}

// SpawnStageBeaconsForward spawn the beacon forward stage
func SpawnStageHistoryDownload(cfg StageHistoryReconstructionCfg, ctx context.Context, logger log.Logger) error {

blockRoot := cfg.startingRoot
destinationSlot := uint64(0)
currentSlot := cfg.startingSlot
if currentSlot > cfg.backFillingAmount {
destinationSlot = currentSlot - cfg.backFillingAmount
}

// Start the procedure
logger.Info("Downloading History", "from", currentSlot, "to", destinationSlot)
// Setup slot and block root
cfg.downloader.SetSlotToDownload(currentSlot)
cfg.downloader.SetExpectedRoot(blockRoot)
foundLatestEth1ValidHash := false

fs := afero.NewBasePathFs(cfg.dataDirFs, "caplin/beacon")
// Set up onNewBlock callback
cfg.downloader.SetOnNewBlock(func(blk *cltypes.SignedBeaconBlock) (finished bool, err error) {
slot := blk.Block.Slot
return slot <= destinationSlot && foundLatestEth1ValidHash, clpersist.SaveBlockWithConfig(fs, blk, cfg.beaconCfg)
})
prevProgress := cfg.downloader.Progress()

logInterval := time.NewTicker(logIntervalTime)
finishCh := make(chan struct{})
// Start logging thread
go func() {
for {
select {
case <-logInterval.C:
logArgs := []interface{}{}
currProgress := cfg.downloader.Progress()
speed := float64(prevProgress-currProgress) / float64(logIntervalTime/time.Second)
prevProgress = currProgress
peerCount, err := cfg.downloader.Peers()
if err != nil {
return
}
logArgs = append(logArgs,
"progress", currProgress,
"blk/sec", fmt.Sprintf("%.1f", speed),
"peers", peerCount)
if currentSlot > destinationSlot {
logArgs = append(logArgs, "remaining", currProgress-destinationSlot)
}
logger.Info("Downloading History", logArgs...)
case <-finishCh:
return
case <-ctx.Done():

}
}
}()
for !cfg.downloader.Finished() {
cfg.downloader.RequestMore(ctx)
}
close(finishCh)

return nil
}
Loading

0 comments on commit 114dd68

Please sign in to comment.