Skip to content

Commit

Permalink
stage_bor_heimdall commits partial progress (#12097)
Browse files Browse the repository at this point in the history
after this PR, stage_bor_heimdall doesn't run in 1 transaction, thus can
commit partial progress
  • Loading branch information
stevemilk authored Nov 3, 2024
1 parent 00cf8eb commit e46cc43
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 60 deletions.
105 changes: 52 additions & 53 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,75 +787,74 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log

heimdallClient := engine.(*bor.Bor).HeimdallClient

return db.Update(ctx, func(tx kv.RwTx) error {
if reset {
if err := reset2.ResetBorHeimdall(ctx, tx); err != nil {
return err
}
return nil
}
if unwind > 0 {
sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()

stageState := stage(sync, tx, nil, stages.BorHeimdall)

snapshotsMaxBlock := borSn.BlocksAvailable()
if unwind <= snapshotsMaxBlock {
return fmt.Errorf("cannot unwind past snapshots max block: %d", snapshotsMaxBlock)
}

if unwind > stageState.BlockNumber {
return fmt.Errorf("cannot unwind to a point beyond stage: %d", stageState.BlockNumber)
}

unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, heimdallStore, bridgeStore, nil, nil, nil, nil, nil, false, unwindTypes)
if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil {
return err
}

stageProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall)
if err != nil {
return fmt.Errorf("re-read bor heimdall progress: %w", err)
}

logger.Info("progress", "bor heimdall", stageProgress)
return nil
var tx kv.RwTx
if reset {
if err := reset2.ResetBorHeimdall(ctx, tx, db); err != nil {
return err
}

return nil
}
if unwind > 0 {
sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
blockReader, _ := blocksIO(db, logger)
var (
snapDb kv.RwDB
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot]
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address]
)
if bor, ok := engine.(*bor.Bor); ok {
snapDb = bor.DB
recents = bor.Recents
signatures = bor.Signatures
}
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, nil, nil, recents, signatures, false, unwindTypes)

stageState := stage(sync, tx, nil, stages.BorHeimdall)
if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil {

snapshotsMaxBlock := borSn.BlocksAvailable()
if unwind <= snapshotsMaxBlock {
return fmt.Errorf("cannot unwind past snapshots max block: %d", snapshotsMaxBlock)
}

if unwind > stageState.BlockNumber {
return fmt.Errorf("cannot unwind to a point beyond stage: %d", stageState.BlockNumber)
}

unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false)
cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, heimdallStore, bridgeStore, nil, nil, nil, nil, nil, false, unwindTypes)
if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil {
return err
}

stageProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall)
stageProgress, err := stagedsync.BorHeimdallStageProgress(tx, cfg)
if err != nil {
return fmt.Errorf("re-read bor heimdall progress: %w", err)
}

logger.Info("progress", "bor heimdall", stageProgress)
return nil
})
}

sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
blockReader, _ := blocksIO(db, logger)
var (
snapDb kv.RwDB
recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot]
signatures *lru.ARCCache[libcommon.Hash, libcommon.Address]
)
if bor, ok := engine.(*bor.Bor); ok {
snapDb = bor.DB
recents = bor.Recents
signatures = bor.Signatures
}
cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, nil, nil, recents, signatures, false, unwindTypes)

stageState := stage(sync, tx, nil, stages.BorHeimdall)
if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil {
return err
}

stageProgress, err := stagedsync.BorHeimdallStageProgress(tx, cfg)
if err != nil {
return fmt.Errorf("re-read bor heimdall progress: %w", err)
}

logger.Info("progress", "bor heimdall", stageProgress)
return nil
}

func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error {
Expand Down
19 changes: 17 additions & 2 deletions core/rawdb/rawdbreset/reset_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,16 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.Full

return nil
}
func ResetBorHeimdall(ctx context.Context, tx kv.RwTx) error {
func ResetBorHeimdall(ctx context.Context, tx kv.RwTx, db kv.RwDB) error {
useExternalTx := tx != nil
if !useExternalTx {
var err error
tx, err = db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
if err := tx.ClearBucket(kv.BorEventNums); err != nil {
return err
}
Expand All @@ -109,7 +118,13 @@ func ResetBorHeimdall(ctx context.Context, tx kv.RwTx) error {
if err := tx.ClearBucket(kv.BorSpans); err != nil {
return err
}
return clearStageProgress(tx, stages.BorHeimdall)
if err := clearStageProgress(tx, stages.BorHeimdall); err != nil {
return err
}
if !useExternalTx {
return tx.Commit()
}
return nil
}

func ResetPolygonSync(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.FullBlockReader, bw *blockio.BlockWriter, dirs datadir.Dirs, cc chain.Config, logger log.Logger) error {
Expand Down
12 changes: 8 additions & 4 deletions eth/stagedsync/stage_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64
workersCount = 1
}

prevStageProgress, err := senderStageProgress(txc.Tx, cfg.db)
prevStageProgress, err := stageProgress(txc.Tx, cfg.db, stages.Senders)
if err != nil {
return err
}
Expand Down Expand Up @@ -230,15 +230,15 @@ func unwindExec3(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx contex
return nil
}

func senderStageProgress(tx kv.Tx, db kv.RoDB) (prevStageProgress uint64, err error) {
func stageProgress(tx kv.Tx, db kv.RoDB, stage stages.SyncStage) (prevStageProgress uint64, err error) {
if tx != nil {
prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders)
prevStageProgress, err = stages.GetStageProgress(tx, stage)
if err != nil {
return prevStageProgress, err
}
} else {
if err = db.View(context.Background(), func(tx kv.Tx) error {
prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders)
prevStageProgress, err = stages.GetStageProgress(tx, stage)
if err != nil {
return err
}
Expand All @@ -250,6 +250,10 @@ func senderStageProgress(tx kv.Tx, db kv.RoDB) (prevStageProgress uint64, err er
return prevStageProgress, nil
}

func BorHeimdallStageProgress(tx kv.Tx, cfg BorHeimdallCfg) (prevStageProgress uint64, err error) {
return stageProgress(tx, cfg.db, stages.BorHeimdall)
}

// ================ Erigon3 End ================

func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) (err error) {
Expand Down
2 changes: 1 addition & 1 deletion migrations/clear_bor_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var ClearBorTables = Migration{
return err
}

if err := reset2.ResetBorHeimdall(context.Background(), tx); err != nil {
if err := reset2.ResetBorHeimdall(context.Background(), tx, db); err != nil {
return err
}

Expand Down

0 comments on commit e46cc43

Please sign in to comment.