Skip to content

Commit 7deeaee

Browse files
committed
core: refactor function reorg (ethereum#23761, ethereum#24616, ethereum#24996, ethereum#30600)
1 parent 97879c0 commit 7deeaee

File tree

2 files changed

+177
-88
lines changed

2 files changed

+177
-88
lines changed

core/blockchain.go

Lines changed: 159 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ var (
8484

8585
errInsertionInterrupted = errors.New("insertion is interrupted")
8686
errChainStopped = errors.New("blockchain is stopped")
87+
errInvalidOldChain = errors.New("invalid old chain")
88+
errInvalidNewChain = errors.New("invalid new chain")
8789

8890
CheckpointCh = make(chan int)
8991
)
@@ -1476,7 +1478,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
14761478
if reorg {
14771479
// Reorganise the chain if the parent is not the head block
14781480
if block.ParentHash() != currentBlock.Hash() {
1479-
if err := bc.reorg(currentBlock, block); err != nil {
1481+
if err := bc.reorg(currentBlock.Header(), block.Header()); err != nil {
14801482
return NonStatTy, err
14811483
}
14821484
}
@@ -1491,9 +1493,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
14911493
bc.writeHeadBlock(block, false)
14921494
// prepare set of masternodes for the next epoch
14931495
if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
1494-
err := bc.UpdateM1()
1495-
if err != nil {
1496-
log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNum", block.NumberU64())
1496+
if err := bc.UpdateM1(); err != nil {
1497+
log.Crit("Fail to update masternodes during writeBlockWithState", "number", block.Number, "hash", block.Hash().Hex(), "err", err)
14971498
}
14981499
}
14991500
}
@@ -2275,153 +2276,223 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
22752276
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
22762277
// blocks and inserts them to be part of the new canonical chain and accumulates
22772278
// potential missing transactions and post an event about them.
2278-
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
2279+
func (bc *BlockChain) reorg(oldHead, newHead *types.Header) error {
2280+
log.Warn("Reorg", "OldHash", oldHead.Hash().Hex(), "OldNum", oldHead.Number, "NewHash", newHead.Hash().Hex(), "NewNum", newHead.Number)
2281+
22792282
var (
2280-
newChain types.Blocks
2281-
oldChain types.Blocks
2282-
commonBlock *types.Block
2283-
deletedTxs types.Transactions
2284-
addedTxs types.Transactions
2285-
deletedLogs []*types.Log
2283+
newChain []*types.Header
2284+
oldChain []*types.Header
2285+
commonBlock *types.Header
22862286
)
2287-
log.Warn("Reorg", "oldBlock hash", oldBlock.Hash().Hex(), "number", oldBlock.NumberU64(), "newBlock hash", newBlock.Hash().Hex(), "number", newBlock.NumberU64())
2288-
2289-
// first reduce whoever is higher bound
2290-
if oldBlock.NumberU64() > newBlock.NumberU64() {
2291-
// reduce old chain
2292-
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
2293-
oldChain = append(oldChain, oldBlock)
2294-
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
2295-
if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
2296-
deletedLogs = append(deletedLogs, logs...)
2297-
}
2287+
2288+
// Reduce the longer chain to the same number as the shorter one
2289+
if oldHead.Number.Uint64() > newHead.Number.Uint64() {
2290+
// Old chain is longer, gather all transactions and logs as deleted ones
2291+
for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) {
2292+
oldChain = append(oldChain, oldHead)
22982293
}
22992294
} else {
2300-
// reduce new chain and append new chain blocks for inserting later on
2301-
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
2302-
newChain = append(newChain, newBlock)
2295+
// New chain is longer, stash all blocks away for subsequent insertion
2296+
for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) {
2297+
newChain = append(newChain, newHead)
23032298
}
23042299
}
2305-
if oldBlock == nil {
2306-
return errors.New("invalid old chain")
2300+
if oldHead == nil {
2301+
return errInvalidOldChain
23072302
}
2308-
if newBlock == nil {
2309-
return errors.New("invalid new chain")
2303+
if newHead == nil {
2304+
return errInvalidNewChain
23102305
}
23112306

2307+
// Both sides of the reorg are at the same number, reduce both until the common
2308+
// ancestor is found
23122309
for {
2313-
if oldBlock.Hash() == newBlock.Hash() {
2314-
commonBlock = oldBlock
2310+
// If the common ancestor was found, bail out
2311+
if oldHead.Hash() == newHead.Hash() {
2312+
commonBlock = oldHead
23152313
break
23162314
}
2315+
// Remove an old block as well as stash away a new block
2316+
oldChain = append(oldChain, oldHead)
2317+
newChain = append(newChain, newHead)
23172318

2318-
oldChain = append(oldChain, oldBlock)
2319-
newChain = append(newChain, newBlock)
2320-
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
2321-
if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
2322-
deletedLogs = append(deletedLogs, logs...)
2319+
// Step back with both chains
2320+
oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1)
2321+
if oldHead == nil {
2322+
return errInvalidOldChain
23232323
}
2324-
2325-
oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
2326-
if oldBlock == nil {
2327-
return errors.New("invalid old chain")
2328-
}
2329-
if newBlock == nil {
2330-
return errors.New("invalid new chain")
2324+
newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1)
2325+
if newHead == nil {
2326+
return errInvalidNewChain
23312327
}
23322328
}
23332329

23342330
// Ensure XDPoS engine committed block will be not reverted
23352331
if xdpos, ok := bc.Engine().(*XDPoS.XDPoS); ok {
23362332
latestCommittedBlock := xdpos.EngineV2.GetLatestCommittedBlockInfo()
23372333
if latestCommittedBlock != nil {
2338-
currentBlock := bc.CurrentBlock()
2339-
currentBlock.Number().Cmp(latestCommittedBlock.Number)
2340-
cmp := commonBlock.Number().Cmp(latestCommittedBlock.Number)
2334+
cmp := commonBlock.Number.Cmp(latestCommittedBlock.Number)
23412335
if cmp < 0 {
23422336
for _, oldBlock := range oldChain {
2343-
if oldBlock.Number().Cmp(latestCommittedBlock.Number) == 0 {
2337+
if oldBlock.Number.Cmp(latestCommittedBlock.Number) == 0 {
23442338
if oldBlock.Hash() != latestCommittedBlock.Hash {
2345-
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "committed hash", latestCommittedBlock.Hash)
2339+
log.Error("Impossible reorg, please file an issue", "OldNum", oldBlock.Number, "OldHash", oldBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex())
23462340
} else {
2347-
log.Warn("Stop reorg, blockchain is under forking attack", "old committed num", oldBlock.Number(), "old committed hash", oldBlock.Hash())
2348-
return fmt.Errorf("stop reorg, blockchain is under forking attack. old committed num %d, hash %x", oldBlock.Number(), oldBlock.Hash())
2341+
log.Warn("Stop reorg, blockchain is under forking attack", "OldCommittedNum", oldBlock.Number, "OldCommittedHash", oldBlock.Hash().Hex())
2342+
return fmt.Errorf("stop reorg, blockchain is under forking attack. OldCommitted num %d, hash %s", oldBlock.Number, oldBlock.Hash().Hex())
23492343
}
23502344
}
23512345
}
23522346
} else if cmp == 0 {
23532347
if commonBlock.Hash() != latestCommittedBlock.Hash {
2354-
log.Error("Impossible reorg, please file an issue", "oldnum", commonBlock.Number(), "oldhash", commonBlock.Hash(), "committed hash", latestCommittedBlock.Hash)
2348+
log.Error("Impossible reorg, please file an issue", "OldNum", commonBlock.Number.Uint64(), "OldHash", commonBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex())
23552349
}
23562350
}
23572351
}
23582352
}
23592353

23602354
// Ensure the user sees large reorgs
23612355
if len(oldChain) > 0 && len(newChain) > 0 {
2362-
logFn := log.Warn
2356+
logFn := log.Info
2357+
msg := "Chain reorg detected"
23632358
if len(oldChain) > 63 {
2359+
msg = "Large chain reorg detected"
23642360
logFn = log.Warn
23652361
}
2366-
logFn("Chain split detected", "number", commonBlock.Number(), "hash", commonBlock.Hash(),
2367-
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
2362+
logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash().Hex(),
2363+
"drop", len(oldChain), "dropfrom", oldChain[0].Hash().Hex(), "add", len(newChain), "addfrom", newChain[0].Hash().Hex())
23682364
blockReorgAddMeter.Mark(int64(len(newChain)))
23692365
blockReorgDropMeter.Mark(int64(len(oldChain)))
23702366
blockReorgMeter.Mark(1)
2367+
} else if len(newChain) > 0 {
2368+
// Special case happens in the post merge stage that current head is
2369+
// the ancestor of new head while these two blocks are not consecutive
2370+
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
2371+
blockReorgAddMeter.Mark(int64(len(newChain)))
23712372
} else {
2372-
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
2373+
// len(newChain) == 0 && len(oldChain) > 0
2374+
// rewind the canonical chain to a lower point.
2375+
log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain))
23732376
}
23742377

2375-
// Insert the new chain(except the head block(reverse order)),
2376-
// taking care of the proper incremental order.
2377-
for i := len(newChain) - 1; i >= 0; i-- {
2378-
// insert the block in the canonical way, re-writing history
2379-
bc.writeHeadBlock(newChain[i], true)
2378+
// Acquire the tx-lookup lock before mutation. This step is essential
2379+
// as the txlookups should be changed atomically, and all subsequent
2380+
// reads should be blocked until the mutation is complete.
2381+
// bc.txLookupLock.Lock()
2382+
2383+
// Reorg can be executed, start reducing the chain's old blocks and appending
2384+
// the new blocks
2385+
var (
2386+
deletedTxs []common.Hash
2387+
rebirthTxs []common.Hash
2388+
2389+
deletedLogs []*types.Log
2390+
rebirthLogs []*types.Log
2391+
)
2392+
2393+
// Deleted log emission on the API uses forward order, which is borked, but
2394+
// we'll leave it in for legacy reasons.
2395+
//
2396+
// TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs?
2397+
{
2398+
for i := len(oldChain) - 1; i >= 0; i-- {
2399+
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
2400+
if block == nil {
2401+
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
2402+
}
2403+
if logs := bc.collectLogs(block, true); len(logs) > 0 {
2404+
deletedLogs = append(deletedLogs, logs...)
2405+
}
2406+
if len(deletedLogs) > 512 {
2407+
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2408+
deletedLogs = nil
2409+
}
2410+
// TODO(daniel): remove chainSideFeed, reference PR #30601
2411+
// Also send event for blocks removed from the canon chain.
2412+
// bc.chainSideFeed.Send(ChainSideEvent{Block: block})
2413+
}
2414+
if len(deletedLogs) > 0 {
2415+
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2416+
}
2417+
}
23802418

2381-
// Collect the new added transactions.
2382-
addedTxs = append(addedTxs, newChain[i].Transactions()...)
2419+
// Undo old blocks in reverse order
2420+
for i := 0; i < len(oldChain); i++ {
2421+
// Collect all the deleted transactions
2422+
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
2423+
if block == nil {
2424+
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
2425+
}
2426+
for _, tx := range block.Transactions() {
2427+
deletedTxs = append(deletedTxs, tx.Hash())
2428+
}
2429+
// Collect deleted logs and emit them for new integrations
2430+
// if logs := bc.collectLogs(block, true); len(logs) > 0 {
2431+
// slices.Reverse(logs) // Emit revertals latest first, older then
2432+
// }
2433+
}
23832434

2435+
// Apply new blocks in forward order
2436+
for i := len(newChain) - 1; i >= 0; i-- {
2437+
// Collect all the included transactions
2438+
block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64())
2439+
if block == nil {
2440+
return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics
2441+
}
2442+
for _, tx := range block.Transactions() {
2443+
rebirthTxs = append(rebirthTxs, tx.Hash())
2444+
}
2445+
// Collect inserted logs and emit them
2446+
if logs := bc.collectLogs(block, false); len(logs) > 0 {
2447+
rebirthLogs = append(rebirthLogs, logs...)
2448+
}
2449+
if len(rebirthLogs) > 512 {
2450+
bc.logsFeed.Send(rebirthLogs)
2451+
rebirthLogs = nil
2452+
}
2453+
// Update the head block
2454+
bc.writeHeadBlock(block, true)
23842455
// prepare set of masternodes for the next epoch
2385-
if bc.chainConfig.XDPoS != nil && ((newChain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
2386-
err := bc.UpdateM1()
2387-
if err != nil {
2388-
log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNumber", newChain[i].NumberU64())
2456+
if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
2457+
if err := bc.UpdateM1(); err != nil {
2458+
log.Crit("Fail to update masternodes during reorg", "number", block.Number, "hash", block.Hash().Hex(), "err", err)
23892459
}
23902460
}
23912461
}
2462+
if len(rebirthLogs) > 0 {
2463+
bc.logsFeed.Send(rebirthLogs)
2464+
}
23922465

23932466
// Delete useless indexes right now which includes the non-canonical
23942467
// transaction indexes, canonical chain indexes which above the head.
2395-
indexesBatch := bc.db.NewBatch()
2396-
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
2397-
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
2468+
batch := bc.db.NewBatch()
2469+
for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) {
2470+
rawdb.DeleteTxLookupEntry(batch, tx)
2471+
}
2472+
// Delete all hash markers that are not part of the new canonical chain.
2473+
// Because the reorg function handles new chain head, all hash
2474+
// markers greater than new chain head should be deleted.
2475+
number := commonBlock.Number
2476+
if len(newChain) > 0 {
2477+
number = newChain[0].Number
23982478
}
2399-
// Delete any canonical number assignments above the new head
2400-
number := bc.CurrentBlock().NumberU64()
2401-
for i := number + 1; ; i++ {
2479+
for i := number.Uint64() + 1; ; i++ {
24022480
hash := rawdb.ReadCanonicalHash(bc.db, i)
24032481
if hash == (common.Hash{}) {
24042482
break
24052483
}
2406-
rawdb.DeleteCanonicalHash(indexesBatch, i)
2484+
rawdb.DeleteCanonicalHash(batch, i)
24072485
}
2408-
if err := indexesBatch.Write(); err != nil {
2486+
if err := batch.Write(); err != nil {
24092487
log.Crit("Failed to delete useless indexes", "err", err)
24102488
}
2411-
// If any logs need to be fired, do it now. In theory we could avoid creating
2412-
// this goroutine if there are no events to fire, but realistcally that only
2413-
// ever happens if we're reorging empty blocks, which will only happen on idle
2414-
// networks where performance is not an issue either way.
2415-
if len(deletedLogs) > 0 {
2416-
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2417-
}
2418-
if len(oldChain) > 0 {
2419-
go func() {
2420-
for i := len(oldChain) - 1; i >= 0; i-- {
2421-
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
2422-
}
2423-
}()
2424-
}
2489+
2490+
// Reset the tx lookup cache to clear stale txlookup cache.
2491+
// bc.txLookupCache.Purge()
2492+
2493+
// Release the tx-lookup lock after mutation.
2494+
// bc.txLookupLock.Unlock()
2495+
24252496
return nil
24262497
}
24272498

core/types/transaction.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -678,6 +678,24 @@ func TxDifference(a, b Transactions) (keep Transactions) {
678678
return keep
679679
}
680680

681+
// HashDifference returns a new set of hashes that are present in a but not in b.
682+
func HashDifference(a, b []common.Hash) []common.Hash {
683+
keep := make([]common.Hash, 0, len(a))
684+
685+
remove := make(map[common.Hash]struct{})
686+
for _, hash := range b {
687+
remove[hash] = struct{}{}
688+
}
689+
690+
for _, hash := range a {
691+
if _, ok := remove[hash]; !ok {
692+
keep = append(keep, hash)
693+
}
694+
}
695+
696+
return keep
697+
}
698+
681699
// TxByNonce implements the sort interface to allow sorting a list of transactions
682700
// by their nonces. This is usually only useful for sorting transactions from a
683701
// single account, otherwise a nonce comparison doesn't make much sense.

0 commit comments

Comments
 (0)