Skip to content

Commit

Permalink
updateForkChoice: send state changes to txpool after tx is committed (#…
Browse files Browse the repository at this point in the history
…8035)

[txpool](https://github.com/ledgerwatch/erigon-lib/blob/main/txpool/pool.go)
expects an `OnNewBlock` update only after the DB transaction is
committed.

This fixes, for example, a nonce gap mis-detection in Hive test
"engine-cancun/Blob Transactions On Block 1, Cancun Genesis".
  • Loading branch information
yperbasis authored Aug 17, 2023
1 parent f82bc1d commit 611145c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 25 deletions.
6 changes: 3 additions & 3 deletions turbo/engineapi/engine_helpers/fork_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (fv *ForkValidator) ExtendingForkHeadHash() libcommon.Hash {
return fv.extendingForkHeadHash
}

// NotifyCurrentHeight is to be called at the end of the stage cycle and repressent the last processed block.
// NotifyCurrentHeight is to be called at the end of the stage cycle and represent the last processed block.
func (fv *ForkValidator) NotifyCurrentHeight(currentHeight uint64) {
fv.lock.Lock()
defer fv.lock.Unlock()
Expand Down Expand Up @@ -229,15 +229,15 @@ func (fv *ForkValidator) ValidatePayload(tx kv.Tx, header *types.Header, body *t
}

// Clear wipes out current extending fork data, this method is called after fcu is called,
// because fcu decides what the head is and after the call is done all the non-chosed forks are
// because fcu decides what the head is and after the call is done all the non-chosen forks are
// to be considered obsolete.
func (fv *ForkValidator) clear() {
fv.extendingForkHeadHash = libcommon.Hash{}
fv.extendingForkNumber = 0
fv.memoryDiff = nil
}

// Clear wipes out current extending fork data and notify txpool.
// Clear wipes out current extending fork data.
func (fv *ForkValidator) ClearWithUnwind(tx kv.RwTx, accumulator *shards.Accumulator, c shards.StateChangeConsumer) {
fv.lock.Lock()
defer fv.lock.Unlock()
Expand Down
20 changes: 11 additions & 9 deletions turbo/execution/eth1/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (e *EthereumExecutionModule) UpdateForkChoice(ctx context.Context, req *exe

select {
case <-fcuTimer.C:
e.logger.Debug("treating forkChoiceUpdated as asyncronous as it is taking too long")
e.logger.Debug("treating forkChoiceUpdated as asynchronous as it is taking too long")
return &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
Status: execution.ExecutionStatus_Busy,
Expand Down Expand Up @@ -134,7 +134,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHas
}

if canonicalHash == blockHash {
// if block hash is part of the canononical chain treat it as no-op.
// if block hash is part of the canonical chain treat it as no-op.
writeForkChoiceHashes(tx, blockHash, safeHash, finalizedHash)
valid, err := e.verifyForkchoiceHashes(ctx, tx, blockHash, finalizedHash, safeHash)
if err != nil {
Expand All @@ -155,7 +155,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHas
return
}

// If we dont have it, too bad
// If we don't have it, too bad
if fcuHeader == nil {
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &execution.ForkChoiceReceipt{
LatestValidHash: gointerfaces.ConvertHashToH256(libcommon.Hash{}),
Expand Down Expand Up @@ -278,12 +278,6 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHas
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if e.hook != nil {
if err = e.hook.AfterRun(tx, finishProgressBefore); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
}
// if head hash was set then success otherwise no
headHash := rawdb.ReadHeadBlockHash(tx)
headNumber := rawdb.ReadHeaderNumber(tx, headHash)
Expand Down Expand Up @@ -313,6 +307,14 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, blockHas
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
if e.hook != nil {
if err := e.db.View(ctx, func(tx kv.Tx) error {
return e.hook.AfterRun(tx, finishProgressBefore)
}); err != nil {
sendForkchoiceErrorWithoutWaiting(outcomeCh, err)
return
}
}
if log {
e.logger.Info("head updated", "hash", headHash, "number", *headNumber)
}
Expand Down
23 changes: 10 additions & 13 deletions turbo/stages/stageloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,22 +153,17 @@ func StageLoopIteration(ctx context.Context, db kv.RwDB, tx kv.RwTx, sync *stage
}

// -- send notifications START
if externalTx {
if hook != nil {
if hook != nil {
if externalTx {
if err = hook.AfterRun(tx, finishProgressBefore); err != nil {
return err
}
}
} else {
if err := db.View(ctx, func(tx kv.Tx) error {
if hook != nil {
if err = hook.AfterRun(tx, finishProgressBefore); err != nil {
return err
}
} else {
if err := db.View(ctx, func(tx kv.Tx) error {
return hook.AfterRun(tx, finishProgressBefore)
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}
}
if canRunCycleInOneTransaction && !externalTx && commitTime > 500*time.Millisecond {
Expand Down Expand Up @@ -273,7 +268,9 @@ func (h *Hook) AfterRun(tx kv.Tx, finishProgressBefore uint64) error {
if plainStateVersion, err = rawdb.GetStateVersion(tx); err != nil {
return err
}
notifications.Accumulator.SetStateID(plainStateVersion)
if notifications != nil && notifications.Accumulator != nil {
notifications.Accumulator.SetStateID(plainStateVersion)
}

if headTd != nil && headHeader != nil {
headTd256, overflow := uint256.FromBig(headTd)
Expand Down

0 comments on commit 611145c

Please sign in to comment.