Skip to content

Commit

Permalink
polygon/heimdall: Split read functions into reader (#11924)
Browse files Browse the repository at this point in the history
- Create `heimdall.Reader` for future use in `bor_*` API
- Make `AssembleReader` and `NewReader` leaner by not requiring full
`BorConfig`
  • Loading branch information
shohamc1 authored Sep 9, 2024
1 parent f931a43 commit 1158ad1
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 34 deletions.
5 changes: 3 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,9 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}

if config.PolygonSync {
polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, consensusConfig.(*borcfg.BorConfig), heimdallClient)
heimdallService = heimdall.AssembleService(consensusConfig.(*borcfg.BorConfig), config.HeimdallURL, dirs.DataDir, tmpdir, logger)
borConfig := consensusConfig.(*borcfg.BorConfig)
polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, borConfig, heimdallClient)
heimdallService = heimdall.AssembleService(borConfig.CalculateSprintNumber, config.HeimdallURL, dirs.DataDir, tmpdir, logger)

backend.polygonBridge = polygonBridge
}
Expand Down
3 changes: 2 additions & 1 deletion eth/stagedsync/stage_polygon_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func NewPolygonSyncStageCfg(
txActionStream: txActionStream,
}
borConfig := chainConfig.Bor.(*borcfg.BorConfig)
heimdallService := heimdall.NewService(borConfig, heimdallClient, heimdallStore, logger)
heimdallReader := heimdall.NewReader(borConfig.CalculateSprintNumber, heimdallStore, logger)
heimdallService := heimdall.NewService(borConfig.CalculateSprintNumber, heimdallClient, heimdallStore, logger, heimdallReader)
bridgeService := bridge.NewBridge(bridgeStore, logger, borConfig, heimdallClient, nil)
p2pService := p2p.NewService(maxPeers, logger, sentry, statusDataProvider.GetStatusData)
checkpointVerifier := polygonsync.VerifyCheckpointHeaders
Expand Down
57 changes: 57 additions & 0 deletions polygon/heimdall/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package heimdall

import (
"context"

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/polygon/bor/valset"
)

type Reader struct {
logger log.Logger
store ServiceStore
spanBlockProducersTracker *spanBlockProducersTracker
}

// AssembleReader creates and opens the MDBX store. For use cases where the store is only being read from. Must call Close.
func AssembleReader(ctx context.Context, calculateSprintNumber CalculateSprintNumberFunc, dataDir string, tmpDir string, logger log.Logger) (*Reader, error) {
store := NewMdbxServiceStore(logger, dataDir, tmpDir)

err := store.Prepare(ctx)
if err != nil {
return nil, err
}

return NewReader(calculateSprintNumber, store, logger), nil
}

func NewReader(calculateSprintNumber CalculateSprintNumberFunc, store ServiceStore, logger log.Logger) *Reader {
return &Reader{
logger: logger,
store: store,
spanBlockProducersTracker: newSpanBlockProducersTracker(logger, calculateSprintNumber, store.SpanBlockProducerSelections()),
}
}

func (r *Reader) Span(ctx context.Context, id uint64) (*Span, bool, error) {
return r.store.Spans().Entity(ctx, id)
}

func (r *Reader) CheckpointsFromBlock(ctx context.Context, startBlock uint64) (Waypoints, error) {
entities, err := r.store.Checkpoints().RangeFromBlockNum(ctx, startBlock)
return libcommon.SliceMap(entities, castEntityToWaypoint[*Checkpoint]), err
}

func (r *Reader) MilestonesFromBlock(ctx context.Context, startBlock uint64) (Waypoints, error) {
entities, err := r.store.Milestones().RangeFromBlockNum(ctx, startBlock)
return libcommon.SliceMap(entities, castEntityToWaypoint[*Milestone]), err
}

func (r *Reader) Producers(ctx context.Context, blockNum uint64) (*valset.ValidatorSet, error) {
return r.spanBlockProducersTracker.Producers(ctx, blockNum)
}

func (r *Reader) Close() {
r.store.Close()
}
26 changes: 13 additions & 13 deletions polygon/heimdall/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/polygon/bor/valset"
"github.com/erigontech/erigon/polygon/polygoncommon"
)
Expand All @@ -46,23 +45,25 @@ type Service interface {
type service struct {
logger log.Logger
store ServiceStore
reader *Reader
checkpointScraper *scraper[*Checkpoint]
milestoneScraper *scraper[*Milestone]
spanScraper *scraper[*Span]
spanBlockProducersTracker *spanBlockProducersTracker
}

func AssembleService(borConfig *borcfg.BorConfig, heimdallUrl string, dataDir string, tmpDir string, logger log.Logger) Service {
func AssembleService(calculateSprintNumberFn CalculateSprintNumberFunc, heimdallUrl string, dataDir string, tmpDir string, logger log.Logger) Service {
store := NewMdbxServiceStore(logger, dataDir, tmpDir)
client := NewHeimdallClient(heimdallUrl, logger)
return NewService(borConfig, client, store, logger)
reader := NewReader(calculateSprintNumberFn, store, logger)
return NewService(calculateSprintNumberFn, client, store, logger, reader)
}

func NewService(borConfig *borcfg.BorConfig, client HeimdallClient, store ServiceStore, logger log.Logger) Service {
return newService(borConfig, client, store, logger)
func NewService(calculateSprintNumberFn CalculateSprintNumberFunc, client HeimdallClient, store ServiceStore, logger log.Logger, reader *Reader) Service {
return newService(calculateSprintNumberFn, client, store, logger, reader)
}

func newService(borConfig *borcfg.BorConfig, client HeimdallClient, store ServiceStore, logger log.Logger) *service {
func newService(calculateSprintNumberFn CalculateSprintNumberFunc, client HeimdallClient, store ServiceStore, logger log.Logger, reader *Reader) *service {
checkpointFetcher := newCheckpointFetcher(client, logger)
milestoneFetcher := newMilestoneFetcher(client, logger)
spanFetcher := newSpanFetcher(client, logger)
Expand Down Expand Up @@ -101,10 +102,11 @@ func newService(borConfig *borcfg.BorConfig, client HeimdallClient, store Servic
return &service{
logger: logger,
store: store,
reader: reader,
checkpointScraper: checkpointScraper,
milestoneScraper: milestoneScraper,
spanScraper: spanScraper,
spanBlockProducersTracker: newSpanBlockProducersTracker(logger, borConfig, store.SpanBlockProducerSelections()),
spanBlockProducersTracker: newSpanBlockProducersTracker(logger, calculateSprintNumberFn, store.SpanBlockProducerSelections()),
}
}

Expand Down Expand Up @@ -164,7 +166,7 @@ func newSpanFetcher(client HeimdallClient, logger log.Logger) entityFetcher[*Spa
}

func (s *service) Span(ctx context.Context, id uint64) (*Span, bool, error) {
return s.store.Spans().Entity(ctx, id)
return s.reader.Span(ctx, id)
}

func castEntityToWaypoint[TEntity Waypoint](entity TEntity) Waypoint {
Expand Down Expand Up @@ -220,17 +222,15 @@ func (s *service) synchronizeSpans(ctx context.Context) error {
}

func (s *service) CheckpointsFromBlock(ctx context.Context, startBlock uint64) (Waypoints, error) {
entities, err := s.store.Checkpoints().RangeFromBlockNum(ctx, startBlock)
return libcommon.SliceMap(entities, castEntityToWaypoint[*Checkpoint]), err
return s.reader.CheckpointsFromBlock(ctx, startBlock)
}

func (s *service) MilestonesFromBlock(ctx context.Context, startBlock uint64) (Waypoints, error) {
entities, err := s.store.Milestones().RangeFromBlockNum(ctx, startBlock)
return libcommon.SliceMap(entities, castEntityToWaypoint[*Milestone]), err
return s.reader.MilestonesFromBlock(ctx, startBlock)
}

func (s *service) Producers(ctx context.Context, blockNum uint64) (*valset.ValidatorSet, error) {
return s.spanBlockProducersTracker.Producers(ctx, blockNum)
return s.reader.Producers(ctx, blockNum)
}

func (s *service) RegisterMilestoneObserver(callback func(*Milestone), opts ...ObserverOption) polygoncommon.UnregisterFunc {
Expand Down
3 changes: 2 additions & 1 deletion polygon/heimdall/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func (suite *ServiceTestSuite) SetupSuite() {
suite.setupSpans()
suite.setupCheckpoints()
suite.setupMilestones()
suite.service = newService(borConfig, suite.client, store, logger)
reader := NewReader(borConfig.CalculateSprintNumber, store, logger)
suite.service = newService(borConfig.CalculateSprintNumber, suite.client, store, logger, reader)

err := suite.service.store.Prepare(suite.ctx)
require.NoError(suite.T(), err)
Expand Down
35 changes: 18 additions & 17 deletions polygon/heimdall/span_block_producers_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,32 @@ import (
"sync/atomic"

"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/polygon/bor/borcfg"
"github.com/erigontech/erigon/polygon/bor/valset"
)

type CalculateSprintNumberFunc func(uint64) uint64

func newSpanBlockProducersTracker(
logger log.Logger,
borConfig *borcfg.BorConfig,
calculateSprintNumber CalculateSprintNumberFunc,
store EntityStore[*SpanBlockProducerSelection],
) *spanBlockProducersTracker {
return &spanBlockProducersTracker{
logger: logger,
borConfig: borConfig,
store: store,
newSpans: make(chan *Span),
idleSignal: make(chan struct{}),
logger: logger,
calculateSprintNumber: calculateSprintNumber,
store: store,
newSpans: make(chan *Span),
idleSignal: make(chan struct{}),
}
}

type spanBlockProducersTracker struct {
logger log.Logger
borConfig *borcfg.BorConfig
store EntityStore[*SpanBlockProducerSelection]
newSpans chan *Span
queued atomic.Int32
idleSignal chan struct{}
logger log.Logger
calculateSprintNumber CalculateSprintNumberFunc
store EntityStore[*SpanBlockProducerSelection]
newSpans chan *Span
queued atomic.Int32
idleSignal chan struct{}
}

func (t *spanBlockProducersTracker) Run(ctx context.Context) error {
Expand Down Expand Up @@ -141,8 +142,8 @@ func (t *spanBlockProducersTracker) ObserveSpan(ctx context.Context, newSpan *Sp
return err
}

spanStartSprintNum := t.borConfig.CalculateSprintNumber(lastProducerSelection.StartBlock)
spanEndSprintNum := t.borConfig.CalculateSprintNumber(lastProducerSelection.EndBlock)
spanStartSprintNum := t.calculateSprintNumber(lastProducerSelection.StartBlock)
spanEndSprintNum := t.calculateSprintNumber(lastProducerSelection.EndBlock)
increments := int(spanEndSprintNum - spanStartSprintNum)
if increments > 0 {
producers.IncrementProposerPriority(increments)
Expand Down Expand Up @@ -181,8 +182,8 @@ func (t *spanBlockProducersTracker) Producers(ctx context.Context, blockNum uint
return nil, err
}

spanStartSprintNum := t.borConfig.CalculateSprintNumber(producerSelection.StartBlock)
currentSprintNum := t.borConfig.CalculateSprintNumber(blockNum)
spanStartSprintNum := t.calculateSprintNumber(producerSelection.StartBlock)
currentSprintNum := t.calculateSprintNumber(blockNum)
increments := int(currentSprintNum - spanStartSprintNum)
if increments > 0 {
producers.IncrementProposerPriority(increments)
Expand Down

0 comments on commit 1158ad1

Please sign in to comment.