remove acceptor queue (part 1) #1334

Open · wants to merge 5 commits into base: master
Changes from 1 commit
197 changes: 34 additions & 163 deletions core/blockchain.go
Collaborator:

Do we still need to store the latest tip with WriteAcceptorTip? I mean stopping the updates to the stored value (not removing it completely).

@ceyonur (Collaborator), Sep 11, 2024:

If I'm not mistaken, acceptorTip can only be <= lastAccepted. So if we decide not to update this, we might end up always reprocessing the state.

I wonder whether, after removing the acceptor queue, we will ever need to reprocess the state. Maybe we can keep the check bc.HasState(current.Root()).

I also don't fully understand how reprocessState works. If acceptorTip <= lastAccepted, then from what I see it would go back from acceptorTip, since we do current = bc.GetBlockByHash(acceptorTip). What happens to the state between acceptorTip and lastAccepted then? I might be completely wrong though.

Collaborator (Author):

It is correct that acceptorTip <= lastAccepted, or that acceptorTip == common.Hash{}.
reprocessState is kind of confusing:

  • If acceptorTip < lastAccepted, execution will start at acceptorTip.
  • Next, we find the block with state present on disk so we can start re-executing (this can be lower than acceptorTip/lastAccepted, since we only persist roots to disk once per commit interval in pruning mode).
  • Once the acceptorTip is reached, from that point on we also process the snapshot along with the state and write the accepted tx indexes.
  • We continue up to lastAccepted.

We can stop updating it once it matches lastAccepted, but I would prefer to leave that to the next PR so we get it right. Let me know if you would prefer to stop updating it in this PR.
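
To make that flow concrete, here is a rough sketch of the steps described above, assuming the surrounding coreth types. reexecuteRange is a hypothetical placeholder for the block re-execution loop; this is not the actual reprocessState implementation.

```go
// Sketch only: reexecuteRange is a made-up helper standing in for the real
// re-execution loop; only the overall shape follows the comment above.
func reprocessSketch(bc *BlockChain, acceptorTip, lastAccepted *types.Block) error {
	// Walk back from acceptorTip until a block whose state is on disk is
	// found. With pruning enabled, roots are only committed once per
	// CommitInterval, so this can be older than acceptorTip.
	start := acceptorTip
	for !bc.HasState(start.Root()) {
		start = bc.GetBlockByHash(start.ParentHash())
	}

	// Re-execute start..acceptorTip to rebuild state only: these blocks were
	// already fully accepted, so their snapshot/index side effects exist.
	if err := reexecuteRange(bc, start, acceptorTip, false /* writeAcceptedSideEffects */); err != nil {
		return err
	}

	// acceptorTip..lastAccepted: additionally flatten the snapshot and write
	// the accepted tx indexes, because the acceptor never processed them.
	return reexecuteRange(bc, acceptorTip, lastAccepted, true /* writeAcceptedSideEffects */)
}
```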

Collaborator:

No, I think it's fine if we remove them altogether in a safer way.

@@ -85,7 +85,6 @@ var (
blockValidationTimer = metrics.NewRegisteredCounter("chain/block/validations/state", nil)
blockWriteTimer = metrics.NewRegisteredCounter("chain/block/writes", nil)

acceptorQueueGauge = metrics.NewRegisteredGauge("chain/acceptor/queue/size", nil)
acceptorWorkTimer = metrics.NewRegisteredCounter("chain/acceptor/work", nil)
acceptorWorkCount = metrics.NewRegisteredCounter("chain/acceptor/work/count", nil)
lastAcceptedBlockBaseFeeGauge = metrics.NewRegisteredGauge("chain/block/fee/basefee", nil)
@@ -175,7 +174,6 @@ type CacheConfig struct {
TriePrefetcherParallelism int // Max concurrent disk reads trie prefetcher should perform at once
CommitInterval uint64 // Commit the trie every [CommitInterval] blocks.
Pruning bool // Whether to disable trie write caching and GC altogether (archive node)
AcceptorQueueLimit int // Blocks to queue before blocking during acceptance
PopulateMissingTries *uint64 // If non-nil, sets the starting height for re-generating historical tries.
PopulateMissingTriesParallelism int // Number of readers to use when trying to populate missing tries.
AllowMissingTries bool // Whether to allow an archive node to run with pruning enabled
@@ -221,7 +219,6 @@ var DefaultCacheConfig = &CacheConfig{
TriePrefetcherParallelism: 16,
Pruning: true,
CommitInterval: 4096,
AcceptorQueueLimit: 64, // Provides 2 minutes of buffer (2s block target) for a commit delay
SnapshotLimit: 256,
AcceptedCacheSize: 32,
StateScheme: rawdb.HashScheme,
@@ -305,25 +302,6 @@ type BlockChain struct {

senderCacher *TxSenderCacher

// [acceptorQueue] is a processing queue for the Acceptor. This is
// different than [chainAcceptedFeed], which is sent an event after an accepted
// block is processed (after each loop of the accepted worker). If there is a
// clean shutdown, all items inserted into the [acceptorQueue] will be processed.
acceptorQueue chan *types.Block

// [acceptorClosingLock], and [acceptorClosed] are used
// to synchronize the closing of the [acceptorQueue] channel.
//
// Because we can't check if a channel is closed without reading from it
// (which we don't want to do as we may remove a processing block), we need
// to use a second variable to ensure we don't close a closed channel.
acceptorClosingLock sync.RWMutex
acceptorClosed bool

// [acceptorWg] is used to wait for the acceptorQueue to clear. This is used
// during shutdown and in tests.
acceptorWg sync.WaitGroup

// [wg] is used to wait for the async blockchain processes to finish on shutdown.
wg sync.WaitGroup

@@ -332,16 +310,6 @@ type BlockChain struct {
// WaitGroups are used to ensure that async processes have finished during shutdown.
quit chan struct{}

// [acceptorTip] is the last block processed by the acceptor. This is
// returned as the LastAcceptedBlock() to ensure clients get only fully
// processed blocks. This may be equal to [lastAccepted].
acceptorTip *types.Block
acceptorTipLock sync.Mutex

// [flattenLock] prevents the [acceptor] from flattening snapshots while
// a block is being verified.
flattenLock sync.Mutex

// [acceptedLogsCache] stores recently accepted logs to improve the performance of eth_getLogs.
acceptedLogsCache FIFOCache[common.Hash, [][]*types.Log]

@@ -395,7 +363,6 @@ func NewBlockChain(
engine: engine,
vmConfig: vmConfig,
senderCacher: NewTxSenderCacher(runtime.NumCPU()),
acceptorQueue: make(chan *types.Block, cacheConfig.AcceptorQueueLimit),
quit: make(chan struct{}),
acceptedLogsCache: NewFIFOCache[common.Hash, [][]*types.Log](cacheConfig.AcceptedCacheSize),
}
@@ -422,13 +389,6 @@ func NewBlockChain(
return nil, err
}

// After loading the last state (and reprocessing if necessary), we are
// guaranteed that [acceptorTip] is equal to [lastAccepted].
//
// It is critical to update this vaue before performing any state repairs so
// that all accepted blocks can be considered.
bc.acceptorTip = bc.lastAccepted

// Make sure the state associated with the block is available
head := bc.CurrentBlock()
if !bc.HasState(head.Root) {
@@ -461,10 +421,6 @@ func NewBlockChain(
latestStateSynced := rawdb.GetLatestSyncPerformed(bc.db)
bc.repairTxIndexTail(latestStateSynced)
}

// Start processing accepted blocks effects in the background
go bc.startAcceptor()

// Start tx indexer if it's enabled.
if bc.cacheConfig.TransactionHistory != 0 {
bc.txIndexer = newTxIndexer(bc.cacheConfig.TransactionHistory, bc)
@@ -513,12 +469,6 @@ func (bc *BlockChain) flattenSnapshot(postAbortWork func() error, hash common.Ha
return err
}

// Ensure we avoid flattening the snapshot while we are processing a block, or
// block execution will fallback to reading from the trie (which is much
// slower).
bc.flattenLock.Lock()
defer bc.flattenLock.Unlock()

// Flatten the entire snap Trie to disk
//
// Note: This resumes snapshot generation.
@@ -562,110 +512,41 @@ func (bc *BlockChain) warmAcceptedCaches() {
log.Info("Warmed accepted caches", "start", startIndex, "end", lastAccepted, "t", time.Since(startTime))
}

// startAcceptor starts processing items on the [acceptorQueue]. If a [nil]
// object is placed on the [acceptorQueue], the [startAcceptor] will exit.
func (bc *BlockChain) startAcceptor() {
log.Info("Starting Acceptor", "queue length", bc.cacheConfig.AcceptorQueueLimit)

for next := range bc.acceptorQueue {
start := time.Now()
acceptorQueueGauge.Dec(1)

if err := bc.flattenSnapshot(func() error {
return bc.stateManager.AcceptTrie(next)
}, next.Hash()); err != nil {
log.Crit("unable to flatten snapshot from acceptor", "blockHash", next.Hash(), "err", err)
}

// Update last processed and transaction lookup index
if err := bc.writeBlockAcceptedIndices(next); err != nil {
log.Crit("failed to write accepted block effects", "err", err)
}

// Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content
bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header())
logs := bc.collectUnflattenedLogs(next, false)
bc.acceptedLogsCache.Put(next.Hash(), logs)

// Update the acceptor tip before sending events to ensure that any client acting based off of
// the events observes the updated acceptorTip on subsequent requests
bc.acceptorTipLock.Lock()
bc.acceptorTip = next
bc.acceptorTipLock.Unlock()

// Update accepted feeds
flattenedLogs := types.FlattenLogs(logs)
bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs})
if len(flattenedLogs) > 0 {
bc.logsAcceptedFeed.Send(flattenedLogs)
}
if len(next.Transactions()) != 0 {
bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()})
}

bc.acceptorWg.Done()
// accept processes a block that has been verified and updates the snapshot
// and indexes.
func (bc *BlockChain) accept(next *types.Block) error {
start := time.Now()

acceptorWorkTimer.Inc(time.Since(start).Milliseconds())
acceptorWorkCount.Inc(1)
// Note: in contrast to most accepted metrics, we increment the accepted log metrics in the acceptor queue because
// the logs are already processed in the acceptor queue.
acceptedLogsCounter.Inc(int64(len(logs)))
if err := bc.flattenSnapshot(func() error {
return bc.stateManager.AcceptTrie(next)
}, next.Hash()); err != nil {
log.Crit("unable to flatten snapshot from acceptor", "blockHash", next.Hash(), "err", err)
}
}

// addAcceptorQueue adds a new *types.Block to the [acceptorQueue]. This will
// block if there are [AcceptorQueueLimit] items in [acceptorQueue].
func (bc *BlockChain) addAcceptorQueue(b *types.Block) {
// We only acquire a read lock here because it is ok to add items to the
// [acceptorQueue] concurrently.
bc.acceptorClosingLock.RLock()
defer bc.acceptorClosingLock.RUnlock()

if bc.acceptorClosed {
return
// Update last processed and transaction lookup index
if err := bc.writeBlockAcceptedIndices(next); err != nil {
log.Crit("failed to write accepted block effects", "err", err)
@ceyonur (Collaborator), Sep 11, 2024:

Should we return errors here instead of just logging them? I think we should still keep the logs.

Collaborator (Author):

It is correct to return the error (I will use fmt.Errorf).
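
As a hedged illustration of that suggestion (not necessarily the final code in this PR), the two log.Crit call sites in accept could become wrapped returns, with the caller deciding how to react; fmt is assumed to be imported.

```go
// Sketch only: error wrapping in place of log.Crit, reusing the calls that
// already appear in the diff above.
func (bc *BlockChain) acceptSketch(next *types.Block) error {
	if err := bc.flattenSnapshot(func() error {
		return bc.stateManager.AcceptTrie(next)
	}, next.Hash()); err != nil {
		return fmt.Errorf("unable to flatten snapshot from acceptor for block %s: %w", next.Hash(), err)
	}
	if err := bc.writeBlockAcceptedIndices(next); err != nil {
		return fmt.Errorf("failed to write accepted block effects for block %s: %w", next.Hash(), err)
	}
	// ... the rest of accept (caches, feeds, metrics) is unchanged ...
	return nil
}
```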

Collaborator:

log.Crit uses os.Exit, so it seems it should be fine, no?

}

acceptorQueueGauge.Inc(1)
bc.acceptorWg.Add(1)
bc.acceptorQueue <- b
}

// DrainAcceptorQueue blocks until all items in [acceptorQueue] have been
// processed.
func (bc *BlockChain) DrainAcceptorQueue() {
bc.acceptorClosingLock.RLock()
defer bc.acceptorClosingLock.RUnlock()
// Ensure [hc.acceptedNumberCache] and [acceptedLogsCache] have latest content
bc.hc.acceptedNumberCache.Put(next.NumberU64(), next.Header())
logs := bc.collectUnflattenedLogs(next, false)
bc.acceptedLogsCache.Put(next.Hash(), logs)

if bc.acceptorClosed {
return
// Update accepted feeds
flattenedLogs := types.FlattenLogs(logs)
bc.chainAcceptedFeed.Send(ChainEvent{Block: next, Hash: next.Hash(), Logs: flattenedLogs})
if len(flattenedLogs) > 0 {
bc.logsAcceptedFeed.Send(flattenedLogs)
}

bc.acceptorWg.Wait()
}

// stopAcceptor sends a signal to the Acceptor to stop processing accepted
// blocks. The Acceptor will exit once all items in [acceptorQueue] have been
// processed.
func (bc *BlockChain) stopAcceptor() {
bc.acceptorClosingLock.Lock()
defer bc.acceptorClosingLock.Unlock()

// If [acceptorClosed] is already false, we should just return here instead
// of attempting to close [acceptorQueue] more than once (will cause
// a panic).
//
// This typically happens when a test calls [stopAcceptor] directly (prior to
// shutdown) and then [stopAcceptor] is called again in shutdown.
if bc.acceptorClosed {
return
if len(next.Transactions()) != 0 {
bc.txAcceptedFeed.Send(NewTxsEvent{next.Transactions()})
}

// Although nothing should be added to [acceptorQueue] after
// [acceptorClosed] is updated, we close the channel so the Acceptor
// goroutine exits.
bc.acceptorWg.Wait()
bc.acceptorClosed = true
close(bc.acceptorQueue)
acceptorWorkTimer.Inc(time.Since(start).Milliseconds())
acceptorWorkCount.Inc(1)
acceptedLogsCounter.Inc(int64(len(logs)))
return nil
}

func (bc *BlockChain) InitializeSnapshots() {
@@ -817,9 +698,6 @@ func (bc *BlockChain) writeHeadBlock(block *types.Block) {

// ValidateCanonicalChain confirms a canonical chain is well-formed.
func (bc *BlockChain) ValidateCanonicalChain() error {
// Ensure all accepted blocks are fully processed
bc.DrainAcceptorQueue()

current := bc.CurrentBlock()
i := 0
log.Info("Beginning to validate canonical chain", "startBlock", current.Number)
@@ -936,11 +814,6 @@ func (bc *BlockChain) stopWithoutSaving() {

log.Info("Closing quit channel")
close(bc.quit)
// Wait for accepted feed to process all remaining items
log.Info("Stopping Acceptor")
start := time.Now()
bc.stopAcceptor()
log.Info("Acceptor queue drained", "t", time.Since(start))

// Stop senderCacher's goroutines
log.Info("Shutting down sender cacher")
@@ -1041,10 +914,10 @@ func (bc *BlockChain) LastConsensusAcceptedBlock() *types.Block {
//
// Note: During initialization, [acceptorTip] is equal to [lastAccepted].
Collaborator:

Should this comment change here?

func (bc *BlockChain) LastAcceptedBlock() *types.Block {
Collaborator:

Can we remove LastConsensusAcceptedBlock?

Collaborator:

Also, I think we don't need to hold a lock everywhere we call this; it seems we can just use the variable directly. We should be careful with that, though.

  • CleanBlockRootsAboveLastAccepted: this function can be removed altogether
  • populateMissingTries
  • warmAcceptedCaches

@darioush (Collaborator, Author), Sep 11, 2024:

CleanBlockRootsAboveLastAccepted is still used in offline pruning (to remove roots of non-accepted blocks), but I inlined the use of the variable in the 3 cases you mentioned.

Collaborator:

CleanBlockRootsAboveLastAccepted is a bit weird, since it refetches the last accepted block every time rather than cleaning the same block root when called, so I feel there can be a race.
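
Purely as an illustration of that concern (the actual fix is in the PR linked below and may differ), one way to avoid the refetch is to read the target once and pass it explicitly; cleanBlockRootsAbove is a made-up helper, not an existing coreth function.

```go
// Hypothetical sketch: the caller reads LastAcceptedBlock exactly once, so the
// cleanup works against one consistent target even if Accept advances
// lastAccepted concurrently. cleanBlockRootsAbove does not exist in coreth.
func cleanAboveLastAcceptedOnce(bc *BlockChain) error {
	target := bc.LastAcceptedBlock() // single read, used for the whole cleanup
	return cleanBlockRootsAbove(bc, target.NumberU64(), target.Root())
}

// cleanBlockRootsAbove would delete block roots above the given height while
// preserving the given root; the body is intentionally omitted here.
func cleanBlockRootsAbove(bc *BlockChain, height uint64, root common.Hash) error {
	return nil
}
```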

Collaborator:
PR here #1339

bc.acceptorTipLock.Lock()
defer bc.acceptorTipLock.Unlock()
bc.chainmu.Lock()
Collaborator:

Can we use an RLock here?

defer bc.chainmu.Unlock()

return bc.acceptorTip
return bc.lastAccepted
}
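
Regarding the RLock question above, a minimal sketch of what that would look like, assuming chainmu is a sync.RWMutex (as the question implies) and that every writer takes the write lock:

```go
// Sketch only: a read lock suffices because the method only reads
// lastAccepted; this assumes chainmu is a sync.RWMutex and that lastAccepted
// is always updated while holding chainmu for writing.
func (bc *BlockChain) LastAcceptedBlock() *types.Block {
	bc.chainmu.RLock()
	defer bc.chainmu.RUnlock()
	return bc.lastAccepted
}
```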

// Accept sets a minimum height at which no reorg can pass. Additionally,
@@ -1077,9 +950,12 @@ func (bc *BlockChain) Accept(block *types.Block) error {
}
}

// Enqueue block in the acceptor
// Update the last accepted block
bc.lastAccepted = block
bc.addAcceptorQueue(block)
if err := bc.accept(block); err != nil {
return err
}

acceptedBlockGasUsedCounter.Inc(int64(block.GasUsed()))
acceptedTxsCounter.Inc(int64(len(block.Transactions())))
if baseFee := block.BaseFee(); baseFee != nil {
@@ -1365,10 +1241,6 @@ func (bc *BlockChain) insertBlock(block *types.Block, writes bool) error {

// Instantiate the statedb to use for processing transactions
//
// NOTE: Flattening a snapshot during block execution requires fetching state
// entries directly from the trie (much slower).
bc.flattenLock.Lock()
defer bc.flattenLock.Unlock()
statedb, err := state.New(parent.Root, bc.stateCache, bc.snaps)
if err != nil {
return err
@@ -2129,7 +2001,6 @@ func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {

// Update all in-memory chain markers
bc.lastAccepted = block
bc.acceptorTip = block
bc.currentBlock.Store(block.Header())
bc.hc.SetCurrentHeader(block.Header())

1 change: 0 additions & 1 deletion core/blockchain_log_test.go
@@ -91,7 +91,6 @@ func TestAcceptedLogsSubscription(t *testing.T) {
err := chain.Accept(block)
require.NoError(err)
}
chain.DrainAcceptorQueue()

logs := <-logsCh
require.Len(logs, 1)
1 change: 0 additions & 1 deletion core/blockchain_repair_test.go
@@ -597,7 +597,6 @@ func testRepairWithScheme(t *testing.T, tt *rewindTest, snapshots bool, scheme s
}
lastAcceptedHash = canonblocks[i].Hash()
}
chain.DrainAcceptorQueue()
}
}
if _, err := chain.InsertChain(canonblocks[tt.commitBlock:]); err != nil {
1 change: 0 additions & 1 deletion core/blockchain_snapshot_test.go
@@ -116,7 +116,6 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
}
basic.lastAcceptedHash = blocks[i].Hash()
}
chain.DrainAcceptorQueue()

diskRoot, blockRoot := chain.snaps.DiskRoot(), blocks[point-1].Root()
if !bytes.Equal(diskRoot.Bytes(), blockRoot.Bytes()) {