From 7c374f174d56e7f805dccedf2820f34d5348b7ec Mon Sep 17 00:00:00 2001 From: Adam Tucker Date: Tue, 2 Jan 2024 22:50:11 -0700 Subject: [PATCH] implement app hash error channel --- blocksync/reactor.go | 31 ++++++++++++++++--------------- consensus/byzantine_test.go | 2 +- mempool/v1/reactor.go | 2 +- node/node.go | 5 +++++ p2p/base_reactor.go | 12 ++++++++---- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 004add37e7..bae362b5c9 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -73,9 +73,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS requestsCh := make(chan BlockRequest, maxTotalRequesters) - const capacity = 1000 // must be bigger than peers count - errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock - appHashErrorsCh := make(chan p2p.AppHashError, 5) + const capacity = 1000 // must be bigger than peers count + errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock + appHashErrorsCh := make(chan p2p.AppHashError) // create an unbuffered channel to stream appHash errors startHeight := store.Height() + 1 if startHeight == 1 { @@ -364,19 +364,20 @@ FOR_LOOP: err = bcR.blockExec.ValidateBlock(state, first) } - if strings.Contains(err.Error(), "wrong Block.Header.AppHash") { - bcR.appHashErrorsCh <- p2p.AppHashError{ - Err: err, - Height: uint64(first.Height), - } - } else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") { - bcR.appHashErrorsCh <- p2p.AppHashError{ - Err: err, - Height: uint64(first.Height - 1), + if err != nil { + // If this is an appHash or lastResultsHash error, also pass to the appHashError channel. + if strings.Contains(err.Error(), "wrong Block.Header.AppHash") { + bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{ + Err: err, + Height: uint64(first.Height), + } + } else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") { + bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{ + Err: err, + Height: uint64(first.Height - 1), + } } - } - if err != nil { bcR.Logger.Error("Error in validation", "err", err) peerID := bcR.pool.RedoRequest(first.Height) peer := bcR.Switch.Peers().Get(peerID) @@ -432,6 +433,6 @@ func (bcR *Reactor) BroadcastStatusRequest() { }) } -func (r *Reactor) AppHashErrorsCh() <-chan p2p.AppHashError { +func (r *Reactor) AppHashErrorsCh() chan p2p.AppHashError { return r.appHashErrorsCh } diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 5db3f42d78..cc1fa39dd4 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -605,4 +605,4 @@ func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) { func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer } -func (br *ByzantineReactor) AppHashErrorsCh() <-chan p2p.AppHashError { return nil } +func (br *ByzantineReactor) AppHashErrorsCh() chan p2p.AppHashError { return nil } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index 3a061ab495..3918d1bb79 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -282,6 +282,6 @@ func (m *TxsMessage) String() string { return fmt.Sprintf("[TxsMessage %v]", m.Txs) } -func (memR *Reactor) AppHashErrorsCh() <-chan p2p.AppHashError { +func (memR *Reactor) AppHashErrorsCh() chan p2p.AppHashError { return memR.appHashErrorsCh } diff --git a/node/node.go b/node/node.go index 4e5238d1da..ac3951b980 100644 --- a/node/node.go +++ b/node/node.go @@ -1255,6 +1255,11 @@ func (n *Node) ConsensusReactor() *cs.Reactor { return n.consensusReactor } +// BCReactor returns the Node's BlockchainReactor. +func (n *Node) BCReactor() p2p.Reactor { + return n.bcReactor +} + // MempoolReactor returns the Node's mempool reactor. func (n *Node) MempoolReactor() p2p.Reactor { return n.mempoolReactor diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index 8ce393eebc..d9e2f7b339 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -53,7 +53,9 @@ type Reactor interface { // peer on any of the channels registered by the reactor. ReceiveEnvelope(Envelope) - AppHashErrorsCh() <-chan AppHashError + // AppHashErrorsCh is used to stream hash errors to the sdk, which is then used + // to provide further debugging information in logs to the user. + AppHashErrorsCh() chan AppHashError } //-------------------------------------- @@ -61,12 +63,14 @@ type Reactor interface { type BaseReactor struct { service.BaseService // Provides Start, Stop, .Quit Switch *Switch + AppHashErrorChanBR chan AppHashError } func NewBaseReactor(name string, impl Reactor) *BaseReactor { return &BaseReactor{ - BaseService: *service.NewBaseService(nil, name, impl), - Switch: nil, + BaseService: *service.NewBaseService(nil, name, impl), + Switch: nil, + AppHashErrorChanBR: impl.AppHashErrorsCh(), } } @@ -79,4 +83,4 @@ func (*BaseReactor) AddPeer(peer Peer) {} func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (*BaseReactor) ReceiveEnvelope(e Envelope) {} func (*BaseReactor) InitPeer(peer Peer) Peer { return peer } -func (*BaseReactor) AppHashErrorsCh() <-chan AppHashError { return nil } +func (*BaseReactor) AppHashErrorsCh() chan AppHashError { return nil }