Skip to content

Commit

Permalink
Add snapshot processing for all astrid modes (#12339)
Browse files Browse the repository at this point in the history
This PR allows stand alone astrid to participate in snapshot production.

It includes the following changes:

- All bor data storage is handled in the bor/heimdall stage
- Stores now process db & snapshot calls
- Snapshot production runs from multiple databases in stand alone mode
- In the legacy stages and  astrid stage mode the chain db is still used

The current state of the types which manage bor persistence are as
follows:

![db-types-Astrid
Types](https://github.com/user-attachments/assets/3bb1e035-b54b-4384-ab52-7f7479b256b1)

These changes have been tested by running amoy from scratch in various
modes:

* no polygon sync
* polygon.sync
* polygon.sync.stage

I have also tested removing chaindata (& heimdall & polygon-bridge) dbs
and restarting as well as removing locally produced snapshot files.

In all cases for my testing amoy startes, syncs and runs at the tip.
  • Loading branch information
mh0lt authored Oct 28, 2024
1 parent 0ff97f9 commit 26c3173
Show file tree
Hide file tree
Showing 123 changed files with 7,158 additions and 4,966 deletions.
2 changes: 1 addition & 1 deletion accounts/abi/unpack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func TestOOMMaliciousInput(t *testing.T) {
}
encb, err := hex.DecodeString(test.enc)
if err != nil {
t.Fatalf("invalid hex: %s" + test.enc)
t.Fatalf("invalid hex: %s", test.enc)
}
_, err = abi.Methods["method"].Outputs.UnpackValues(encb)
if err == nil {
Expand Down
3 changes: 1 addition & 2 deletions cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,7 @@ func (r *RetrieveHistoricalState) Run(ctx *Context) error {
return err
}

var bor *freezeblocks.BorRoSnapshots
blockReader := freezeblocks.NewBlockReader(allSnapshots, bor)
blockReader := freezeblocks.NewBlockReader(allSnapshots, nil, nil, nil)
eth1Getter := getters.NewExecutionSnapshotReader(ctx, blockReader, db)
eth1Getter.SetBeaconChainConfig(beaconConfig)
csn := freezeblocks.NewCaplinSnapshots(ethconfig.BlocksFreezing{}, beaconConfig, dirs, log.Root())
Expand Down
12 changes: 6 additions & 6 deletions cmd/devnet/scenarios/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *suite) runStep(ctx context.Context, scenario *Scenario, step *Step, pre
earlyReturn := prevStepErr != nil || sr.Err == ErrUndefined

// Run after step handlers.
rctx, sr.Err = s.runAfterStepHooks(ctx, step, sr.Status, sr.Err)
rctx, sr.Err = s.runAfterStepHooks(ctx, step, sr.Status, sr.Err) //nolint

// Trigger after scenario on failing or last step to attach possible hook error to step.
if isLast || (sr.Status != Skipped && sr.Status != Undefined && sr.Err != nil) {
Expand Down Expand Up @@ -303,7 +303,7 @@ func (s *suite) runScenario(scenario *Scenario) (sr *ScenarioResult, err error)
if s.testingT != nil {
// Running scenario as a subtest.
s.testingT.Run(scenario.Name, func(t *testing.T) {
ctx, sr.StepResults, err = s.runSteps(ctx, scenario, scenario.Steps)
ctx, sr.StepResults, err = s.runSteps(ctx, scenario, scenario.Steps) //nolint
if s.shouldFail(err) {
t.Error(err)
}
Expand Down Expand Up @@ -346,7 +346,7 @@ func (s *suite) runBeforeStepHooks(ctx context.Context, step *Step, err error) (
}

if hctx != nil {
ctx = hctx
ctx = hctx //nolint
}
}

Expand All @@ -371,7 +371,7 @@ func (s *suite) runAfterStepHooks(ctx context.Context, step *Step, status StepSt
}

if hctx != nil {
ctx = hctx
ctx = hctx //nolint
}
}

Expand All @@ -393,7 +393,7 @@ func (s *suite) runBeforeScenarioHooks(ctx context.Context, scenario *Scenario)
}

if hctx != nil {
ctx = hctx
ctx = hctx //nolint
}
}

Expand Down Expand Up @@ -431,7 +431,7 @@ func (s *suite) runAfterScenarioHooks(ctx context.Context, scenario *Scenario, l
}

if hctx != nil {
ctx = hctx
ctx = hctx //nolint
}
}

Expand Down
130 changes: 109 additions & 21 deletions cmd/devnet/services/polygon/heimdallsim/heimdall_simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@ package heimdallsim

import (
"context"
"encoding/json"
"errors"
"os"
"time"

"github.com/erigontech/erigon-lib/log/v3"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/downloader/snaptype"
"github.com/erigontech/erigon/eth/ethconfig"
"github.com/erigontech/erigon/polygon/bridge"
"github.com/erigontech/erigon/polygon/heimdall"
"github.com/erigontech/erigon/rlp"
"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
)

type HeimdallSimulator struct {
snapshots *freezeblocks.BorRoSnapshots
snapshots *heimdall.RoSnapshots
blockReader *freezeblocks.BlockReader

iterations []uint64 // list of final block numbers for an iteration
Expand All @@ -43,8 +45,99 @@ type HeimdallSimulator struct {

var _ heimdall.HeimdallClient = (*HeimdallSimulator)(nil)

type sprintLengthCalculator struct{}

func (sprintLengthCalculator) CalculateSprintLength(number uint64) uint64 {
return 16
}

type noopBridgeStore struct{}

func (noopBridgeStore) Prepare(ctx context.Context) error {
return nil
}

func (noopBridgeStore) Close() {}

func (noopBridgeStore) LastEventId(ctx context.Context) (uint64, error) {
return 0, errors.New("noop")
}
func (noopBridgeStore) LastEventIdWithinWindow(ctx context.Context, fromID uint64, toTime time.Time) (uint64, error) {
return 0, errors.New("noop")
}
func (noopBridgeStore) LastProcessedEventId(ctx context.Context) (uint64, error) {
return 0, errors.New("noop")
}
func (noopBridgeStore) LastProcessedBlockInfo(ctx context.Context) (bridge.ProcessedBlockInfo, bool, error) {
return bridge.ProcessedBlockInfo{}, false, errors.New("noop")
}
func (noopBridgeStore) LastFrozenEventId() uint64 {
return 0
}
func (noopBridgeStore) LastFrozenEventBlockNum() uint64 {
return 0
}
func (noopBridgeStore) EventTxnToBlockNum(ctx context.Context, borTxHash libcommon.Hash) (uint64, bool, error) {
return 0, false, errors.New("noop")
}
func (noopBridgeStore) Events(ctx context.Context, start, end uint64) ([][]byte, error) {
return nil, errors.New("noop")
}
func (noopBridgeStore) BlockEventIdsRange(ctx context.Context, blockNum uint64) (start uint64, end uint64, err error) {
return 0, 0, errors.New("noop")
}
func (noopBridgeStore) PutEventTxnToBlockNum(ctx context.Context, eventTxnToBlockNum map[libcommon.Hash]uint64) error {
return nil
}
func (noopBridgeStore) PutEvents(ctx context.Context, events []*heimdall.EventRecordWithTime) error {
return nil
}
func (noopBridgeStore) PutBlockNumToEventId(ctx context.Context, blockNumToEventId map[uint64]uint64) error {
return nil
}
func (noopBridgeStore) PutProcessedBlockInfo(ctx context.Context, info bridge.ProcessedBlockInfo) error {
return nil
}
func (noopBridgeStore) Unwind(ctx context.Context, blockNum uint64) error {
return nil
}
func (noopBridgeStore) BorStartEventId(ctx context.Context, hash libcommon.Hash, blockHeight uint64) (uint64, error) {
return 0, errors.New("noop")
}
func (noopBridgeStore) EventsByBlock(ctx context.Context, hash libcommon.Hash, blockNum uint64) ([]rlp.RawValue, error) {
return nil, errors.New("noop")
}
func (noopBridgeStore) EventsByIdFromSnapshot(from uint64, to time.Time, limit int) ([]*heimdall.EventRecordWithTime, bool, error) {
return nil, false, errors.New("noop")
}
func (noopBridgeStore) PruneEvents(ctx context.Context, blocksTo uint64, blocksDeleteLimit int) (deleted int, err error) {
return 0, nil
}

type heimdallStore struct {
spans heimdall.EntityStore[*heimdall.Span]
}

func (heimdallStore) Checkpoints() heimdall.EntityStore[*heimdall.Checkpoint] {
return nil
}
func (heimdallStore) Milestones() heimdall.EntityStore[*heimdall.Milestone] {
return nil
}
func (hs heimdallStore) Spans() heimdall.EntityStore[*heimdall.Span] {
return hs.spans
}
func (heimdallStore) SpanBlockProducerSelections() heimdall.EntityStore[*heimdall.SpanBlockProducerSelection] {
return nil
}
func (heimdallStore) Prepare(ctx context.Context) error {
return nil
}
func (heimdallStore) Close() {
}

func NewHeimdallSimulator(ctx context.Context, snapDir string, logger log.Logger, iterations []uint64) (*HeimdallSimulator, error) {
snapshots := freezeblocks.NewBorRoSnapshots(ethconfig.Defaults.Snapshot, snapDir, 0, logger)
snapshots := heimdall.NewRoSnapshots(ethconfig.Defaults.Snapshot, snapDir, 0, logger)

// index local files
localFiles, err := os.ReadDir(snapDir)
Expand All @@ -55,7 +148,7 @@ func NewHeimdallSimulator(ctx context.Context, snapDir string, logger log.Logger
for _, file := range localFiles {
info, _, _ := snaptype.ParseFileName(snapDir, file.Name())
if info.Ext == ".seg" {
err = info.Type.BuildIndexes(ctx, info, nil, snapDir, nil, log.LvlWarn, logger)
err = info.Type.BuildIndexes(ctx, info, nil, nil, snapDir, nil, log.LvlWarn, logger)
if err != nil {
return nil, err
}
Expand All @@ -67,8 +160,12 @@ func NewHeimdallSimulator(ctx context.Context, snapDir string, logger log.Logger
}

h := HeimdallSimulator{
snapshots: snapshots,
blockReader: freezeblocks.NewBlockReader(nil, snapshots),
snapshots: snapshots,
blockReader: freezeblocks.NewBlockReader(nil, snapshots,
heimdallStore{
spans: heimdall.NewSpanSnapshotStore(heimdall.NoopEntityStore[*heimdall.Span]{Type: heimdall.Spans}, snapshots),
},
bridge.NewSnapshotStore(noopBridgeStore{}, snapshots, sprintLengthCalculator{})),

iterations: iterations,

Expand Down Expand Up @@ -97,25 +194,25 @@ func (h *HeimdallSimulator) Next() {
func (h *HeimdallSimulator) FetchLatestSpan(ctx context.Context) (*heimdall.Span, error) {
latestSpan := uint64(heimdall.SpanIdAt(h.lastAvailableBlockNumber))

span, err := h.getSpan(ctx, latestSpan)
span, _, err := h.getSpan(ctx, latestSpan)
if err != nil {
return nil, err
}

return &span, nil
return span, nil
}

func (h *HeimdallSimulator) FetchSpan(ctx context.Context, spanID uint64) (*heimdall.Span, error) {
if spanID > uint64(heimdall.SpanIdAt(h.lastAvailableBlockNumber)) {
return nil, errors.New("span not found")
}

span, err := h.getSpan(ctx, spanID)
span, _, err := h.getSpan(ctx, spanID)
if err != nil {
return nil, err
}

return &span, err
return span, err
}

func (h *HeimdallSimulator) FetchSpans(ctx context.Context, page uint64, limit uint64) ([]*heimdall.Span, error) {
Expand Down Expand Up @@ -167,15 +264,6 @@ func (h *HeimdallSimulator) FetchMilestoneID(ctx context.Context, milestoneID st
return errors.New("method FetchMilestoneID not implemented")
}

func (h *HeimdallSimulator) getSpan(ctx context.Context, spanId uint64) (heimdall.Span, error) {
span, err := h.blockReader.Span(ctx, nil, spanId)
if span != nil && err == nil {
var s heimdall.Span
if err = json.Unmarshal(span, &s); err != nil {
return heimdall.Span{}, err
}
return s, err
}

return heimdall.Span{}, err
func (h *HeimdallSimulator) getSpan(ctx context.Context, spanId uint64) (*heimdall.Span, bool, error) {
return h.blockReader.Span(ctx, nil, spanId)
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestSimulatorEvents(t *testing.T) {
}

// the number of events included in v1-000000-000500-borevents.seg
eventsCount := 23
eventsCount := 100

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
8 changes: 4 additions & 4 deletions cmd/devnet/tests/bor_devnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func TestStateSync(t *testing.T) {
})
t.Run("DeployChildChainReceiver", func(t *testing.T) {
var err error
ctx, err = contracts_steps.DeployChildChainReceiver(ctx, "child-funder")
ctx, err = contracts_steps.DeployChildChainReceiver(ctx, "child-funder") //nolint
require.Nil(t, err)
})
t.Run("DeployRootChainSender", func(t *testing.T) {
var err error
ctx, err = contracts_steps.DeployRootChainSender(ctx, "root-funder")
ctx, err = contracts_steps.DeployRootChainSender(ctx, "root-funder") //nolint
require.Nil(t, err)
})
t.Run("GenerateSyncEvents", func(t *testing.T) {
Expand Down Expand Up @@ -87,12 +87,12 @@ func TestChildChainExit(t *testing.T) {
})
t.Run("DeployRootChainReceiver", func(t *testing.T) {
var err error
ctx, err = contracts_steps.DeployRootChainReceiver(ctx, "root-funder")
ctx, err = contracts_steps.DeployRootChainReceiver(ctx, "root-funder") //nolint
require.Nil(t, err)
})
t.Run("DeployChildChainSender", func(t *testing.T) {
var err error
ctx, err = contracts_steps.DeployChildChainSender(ctx, "child-funder")
ctx, err = contracts_steps.DeployChildChainSender(ctx, "child-funder") //nolint
require.Nil(t, err)
})
t.Run("ProcessChildTransfers", func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/diag/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {

var cancel context.CancelFunc

ctx.Context, cancel = context.WithCancel(sync.WithLogger(ctx.Context, logger))
ctx.Context, cancel = context.WithCancel(sync.WithLogger(ctx.Context, logger)) //nolint

go handleTerminationSignals(cancel, logger)

Expand Down
4 changes: 2 additions & 2 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
"time"

"github.com/erigontech/erigon-lib/common/dbg"
_ "github.com/erigontech/erigon/core/snaptype" //hack
_ "github.com/erigontech/erigon/polygon/bor/snaptype" //hack
_ "github.com/erigontech/erigon/core/snaptype" //hack
_ "github.com/erigontech/erigon/polygon/heimdall" //hack

"github.com/anacrolix/torrent/metainfo"
"github.com/c2h5oh/datasize"
Expand Down
2 changes: 1 addition & 1 deletion cmd/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func printCurrentBlockNumber(chaindata string) {
}

func blocksIO(db kv.RoDB) (services.FullBlockReader, *blockio.BlockWriter) {
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{}, "", 0, log.New()), nil /* BorSnapshots */)
br := freezeblocks.NewBlockReader(freezeblocks.NewRoSnapshots(ethconfig.BlocksFreezing{}, "", 0, log.New()), nil, nil, nil)
bw := blockio.NewBlockWriter()
return br, bw
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/integration/commands/reset_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/rawdbv3"
"github.com/erigontech/erigon-lib/state"
"github.com/erigontech/erigon/polygon/heimdall"
"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"

"github.com/erigontech/erigon/core/rawdb/rawdbhelpers"
Expand All @@ -54,7 +55,7 @@ var cmdResetState = &cobra.Command{
}
ctx, _ := common.RootContext()
defer db.Close()
sn, borSn, agg, _ := allSnapshots(ctx, db, logger)
sn, borSn, agg, _, _, _ := allSnapshots(ctx, db, logger)
defer sn.Close()
defer borSn.Close()
defer agg.Close()
Expand Down Expand Up @@ -113,7 +114,7 @@ func init() {
rootCmd.AddCommand(cmdClearBadBlocks)
}

func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *freezeblocks.BorRoSnapshots, agg *state.Aggregator) error {
func printStages(tx kv.Tx, snapshots *freezeblocks.RoSnapshots, borSn *heimdall.RoSnapshots, agg *state.Aggregator) error {
var err error
var progress uint64
w := new(tabwriter.Writer)
Expand Down
2 changes: 1 addition & 1 deletion cmd/integration/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func openDB(opts kv2.MdbxOpts, applyMigrations bool, logger log.Logger) (kv.RwDB
}

if opts.GetLabel() == kv.ChainDB {
_, _, agg, _ := allSnapshots(context.Background(), db, logger)
_, _, agg, _, _, _ := allSnapshots(context.Background(), db, logger)
tdb, err := temporal.New(db, agg)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 26c3173

Please sign in to comment.