Skip to content

Commit

Permalink
e2: rpcd to not open file before download complete (#12373)
Browse files Browse the repository at this point in the history
heuristic: download complete if `StageSnapshots > 0`
  • Loading branch information
AskAlexSharov committed Oct 22, 2024
1 parent b74578f commit 687dee8
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 38 deletions.
48 changes: 31 additions & 17 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/temporal"
"github.com/ledgerwatch/log/v3"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
grpcHealth "google.golang.org/grpc/health"
Expand Down Expand Up @@ -385,31 +386,43 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Configure snapshots
allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
// To provide good UX - snapshots can be read immediately after RPCDaemon start, even if Erigon is down
// Erigon stores the list of snapshots in db: this means RPCDaemon can read this list now, and read it again via `remoteKvClient.Snapshots` after the grpc connection is established
allSnapshots.OptimisticReopenWithDB(db)
allBorSnapshots.OptimisticalyReopenWithDB(db)
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")

if agg, err = libstate.NewAggregator(ctx, cfg.Dirs.SnapHistory, cfg.Dirs.Tmp, config3.HistoryV3AggregationStep, db, logger); err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, fmt.Errorf("create aggregator: %w", err)
}
_ = agg.OpenFolder()

db.View(context.Background(), func(tx kv.Tx) error {
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
_, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress
// To provide good UX - snapshots can be read immediately after RPCDaemon start, even if Erigon is down
// Erigon stores the list of snapshots in db: this means RPCDaemon can read this list now, and read it again via `remoteKvClient.Snapshots` after the grpc connection is established
allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(db)
if err != nil {
return nil, nil, nil, nil, nil, nil, nil, ff, nil, err
}
if allSegmentsDownloadComplete {
allSnapshots.OptimisticReopenWithDB(db)
allBorSnapshots.OptimisticalyReopenWithDB(db)
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")

_ = agg.OpenFolder()

db.View(context.Background(), func(tx kv.Tx) error {
agg.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
_, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
return histBlockNumProgress
})
return nil
})
return nil
})
} else {
logger.Debug("[rpc] download of segments not complete yet. please wait StageSnapshots to finish")
}

wg := errgroup.Group{}
wg.SetLimit(1)
onNewSnapshot = func() {
go func() { // don't block events processing by network communication
wg.Go(func() error { // don't block events processing by network communication
reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true))
if err != nil {
logger.Warn("[snapshots] reopen", "err", err)
return
return nil
}
if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil {
logger.Error("[snapshots] reopen", "err", err)
Expand All @@ -435,7 +448,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
return nil
})
}
}()
return nil
})
}
onNewSnapshot()
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
rootCtx, rootCancel := common.RootContext()
cmd.RunE = func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
logger := debug.SetupCobra(cmd, "sentry")
logger := debug.SetupCobra(cmd, "rpcdaemon")
db, backend, txPool, mining, stateCache, blockReader, engine, ff, agg, err := cli.RemoteServices(ctx, cfg, logger, rootCancel)
if err != nil {
if !errors.Is(err, context.Canceled) {
Expand Down
15 changes: 15 additions & 0 deletions core/rawdb/accessors_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package rawdb

import (
"context"
"encoding/json"
"fmt"

"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/polygon/bor/borcfg"

"github.com/ledgerwatch/erigon-lib/chain"
Expand Down Expand Up @@ -81,3 +84,15 @@ func WriteChainConfig(db kv.Putter, hash libcommon.Hash, cfg *chain.Config) erro
func DeleteChainConfig(db kv.Deleter, hash libcommon.Hash) error {
return db.Delete(kv.ConfigTable, hash[:])
}

// AllSegmentsDownloadComplete reports whether the download of snapshot
// segments is considered finished.
//
// It is a heuristic: the download is treated as complete once the progress
// recorded for stages.Snapshots is greater than zero. The error from
// stages.GetStageProgress is returned unchanged alongside the computed flag.
func AllSegmentsDownloadComplete(tx kv.Getter) (allSegmentsDownloadComplete bool, err error) {
	progress, err := stages.GetStageProgress(tx, stages.Snapshots)
	allSegmentsDownloadComplete = progress > 0
	return allSegmentsDownloadComplete, err
}
// AllSegmentsDownloadCompleteFromDB is a convenience wrapper around
// AllSegmentsDownloadComplete for callers that hold a database handle rather
// than a transaction: it runs the check inside db.View using
// context.Background() and returns the flag together with the error reported
// by db.View.
func AllSegmentsDownloadCompleteFromDB(db kv.RoDB) (allSegmentsDownloadComplete bool, err error) {
	viewErr := db.View(context.Background(), func(tx kv.Tx) error {
		done, checkErr := AllSegmentsDownloadComplete(tx)
		allSegmentsDownloadComplete = done
		return checkErr
	})
	return allSegmentsDownloadComplete, viewErr
}
48 changes: 28 additions & 20 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,9 @@ import (
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/common/disk"
"github.com/ledgerwatch/erigon-lib/common/mem"
"github.com/ledgerwatch/erigon-lib/diagnostics"

"github.com/erigontech/mdbx-go/mdbx"
lru "github.com/hashicorp/golang-lru/arc/v2"
"github.com/holiman/uint256"
"github.com/ledgerwatch/erigon-lib/config3"
"github.com/ledgerwatch/erigon-lib/kv/temporal"
"github.com/ledgerwatch/log/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand All @@ -54,6 +47,11 @@ import (
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dir"
"github.com/ledgerwatch/erigon-lib/common/disk"
"github.com/ledgerwatch/erigon-lib/common/mem"
"github.com/ledgerwatch/erigon-lib/config3"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/direct"
"github.com/ledgerwatch/erigon-lib/downloader"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
Expand All @@ -70,6 +68,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
"github.com/ledgerwatch/erigon-lib/kv/kvcfg"
"github.com/ledgerwatch/erigon-lib/kv/remotedbserver"
"github.com/ledgerwatch/erigon-lib/kv/temporal"
libstate "github.com/ledgerwatch/erigon-lib/state"
"github.com/ledgerwatch/erigon-lib/txpool"
"github.com/ledgerwatch/erigon-lib/txpool/txpoolcfg"
Expand Down Expand Up @@ -1373,28 +1372,37 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf
allBorSnapshots = freezeblocks.NewBorRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger)
}

var err error
if snConfig.Snapshot.NoDownloader {
allSnapshots.ReopenFolder()
if isBor {
allBorSnapshots.ReopenFolder()
}
} else {
allSnapshots.OptimisticalyReopenWithDB(db)
if isBor {
allBorSnapshots.OptimisticalyReopenWithDB(db)
}
}
blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots)
blockWriter := blockio.NewBlockWriter(histV3)

agg, err := libstate.NewAggregator(ctx, dirs.SnapHistory, dirs.Tmp, config3.HistoryV3AggregationStep, db, logger)
if err != nil {
return nil, nil, nil, nil, nil, err
}
if err = agg.OpenFolder(); err != nil {

allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(db)
if err != nil {
return nil, nil, nil, nil, nil, err
}
if allSegmentsDownloadComplete {
if snConfig.Snapshot.NoDownloader {
allSnapshots.ReopenFolder()
if isBor {
allBorSnapshots.ReopenFolder()
}
} else {
allSnapshots.OptimisticalyReopenWithDB(db)
if isBor {
allBorSnapshots.OptimisticalyReopenWithDB(db)
}
}
if err = agg.OpenFolder(); err != nil {
return nil, nil, nil, nil, nil, err
}
} else {
logger.Debug("[rpc] download of segments not complete yet. please wait StageSnapshots to finish")
}

return blockReader, blockWriter, allSnapshots, allBorSnapshots, agg, nil
}

Expand Down

0 comments on commit 687dee8

Please sign in to comment.