Skip to content

Commit

Permalink
miner: define logic for BroadcastDelayBlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanBSC committed Aug 1, 2024
1 parent 2fa3a87 commit 44686b1
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
2 changes: 1 addition & 1 deletion miner/malicious_behaviour.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type MBConfig struct {
DoubleSign bool
// Skip block production for in-turn validators at a specified offset
SkipOffsetInturn *uint64 `toml:",omitempty"`
// Delay broadcasting mined blocks by a specified number of blocks
// Delay broadcasting mined blocks by a specified number of blocks, only for in turn validators
BroadcastDelayBlocks uint64
// Mining time (milliseconds) for the last block in every turn
LastBlockMiningTime uint64
Expand Down
71 changes: 69 additions & 2 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ type worker struct {
fullTaskHook func() // Method to call before pushing the full sealing task.
resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
recentMinedBlocks *lru.Cache

// Test purpose
delayedBlocksForBroadcast []*types.Block
delayedMu sync.RWMutex
}

func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(header *types.Header) bool, init bool) *worker {
Expand Down Expand Up @@ -302,6 +306,11 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
go worker.resultLoop()
go worker.taskLoop()

if worker.config.MB.BroadcastDelayBlocks > 0 {
worker.wg.Add(1)
go worker.delayBlocksBroadcastLoop()
}

// Submit first work to initialize pending state.
if init {
worker.startCh <- struct{}{}
Expand Down Expand Up @@ -663,6 +672,7 @@ func (w *worker) resultLoop() {
w.recentMinedBlocks.Add(block.NumberU64(), []common.Hash{block.ParentHash()})
}

inturn := w.inTurn()
// Commit block and state to database.
task.state.SetExpectedStateRoot(block.Root())
start := time.Now()
Expand All @@ -678,7 +688,7 @@ func (w *worker) resultLoop() {
writeBlockTimer.UpdateSince(start)
log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
"elapsed", common.PrettyDuration(time.Since(task.createdAt)))
w.mux.Post(core.NewMinedBlockEvent{Block: block})
w.postBlock(block, inturn)
if p, ok := w.engine.(*parlia.Parlia); ok {
if w.config.MB.DoubleSign {
shadowHeader := block.Header()
Expand All @@ -687,7 +697,7 @@ func (w *worker) resultLoop() {
shadowBlock := types.NewBlockWithHeader(shadowHeader).WithBody(block.Transactions(), block.Uncles()).WithWithdrawals(block.Withdrawals()).WithSidecars(block.Sidecars())
shadowBlock, err := p.AssembleSignature(shadowBlock)
if err == nil {
w.mux.Post(core.NewMinedBlockEvent{Block: shadowBlock})
w.postBlock(shadowBlock, inturn)
sealhash := w.engine.SealHash(shadowBlock.Header())
hash := shadowBlock.Hash()
log.Info("Successfully sealed new block", "number", shadowBlock.Number(), "sealhash", sealhash, "hash", hash,
Expand All @@ -704,6 +714,63 @@ func (w *worker) resultLoop() {
}
}

func (w *worker) postBlock(block *types.Block, inTurn bool) {
if w.config.MB.BroadcastDelayBlocks > 0 && inTurn {
w.delayedMu.Lock()
w.delayedBlocksForBroadcast = append(w.delayedBlocksForBroadcast, block)
w.delayedMu.Unlock()
} else {
w.mux.Post(core.NewMinedBlockEvent{Block: block})
}
}
func (w *worker) delayBlocksBroadcastLoop() {
defer w.wg.Done()

for {
if len(w.delayedBlocksForBroadcast) > 0 {
w.delayedMu.Lock()

currentBlock := w.chain.CurrentBlock()
currentBlockNum := currentBlock.Number.Uint64()

delayTime := (w.config.MB.BroadcastDelayBlocks - 1) * w.chainConfig.Parlia.Period
if p, ok := w.engine.(*parlia.Parlia); ok {
service := p.APIs(w.chain)[0].Service
latestBlockNumber := rpc.LatestBlockNumber
currentTurnLength, err := service.(*parlia.API).GetTurnLength(&latestBlockNumber)
nonInTurnBackoff := w.config.MB.BroadcastDelayBlocks
if err == nil {
if w.config.MB.BroadcastDelayBlocks > uint64(currentTurnLength) {
// suppose extra blocks are generated by in turn validators
nonInTurnBackoff = uint64(currentTurnLength)
}
}
delayTime += nonInTurnBackoff
}

firstBlock := w.delayedBlocksForBroadcast[0]
if uint64(time.Now().Unix()) >= (firstBlock.Time() + delayTime) {
time.Sleep(500 * time.Microsecond)
for _, block := range w.delayedBlocksForBroadcast {
w.mux.Post(core.NewMinedBlockEvent{Block: block})
log.Debug("delayBlocksBroadcastLoop", "number", block.Number(), "hash", block.Hash(),
"time", block.Time(), "now", uint64(time.Now().Unix()), "currentBlockNum", currentBlockNum)
}
w.delayedBlocksForBroadcast = make([]*types.Block, 0)
}

w.delayedMu.Unlock()
}

select {
case <-w.exitCh:
return
default:
time.Sleep(100 * time.Millisecond)
}
}
}

// makeEnv creates a new environment for the sealing block.
func (w *worker) makeEnv(parent *types.Header, header *types.Header, coinbase common.Address,
prevEnv *environment) (*environment, error) {
Expand Down

0 comments on commit 44686b1

Please sign in to comment.