Skip to content

Commit

Permalink
implement app hash error channel
Browse files Browse the repository at this point in the history
  • Loading branch information
czarcas7ic committed Jan 3, 2024
1 parent c0d08de commit 7c374f1
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 21 deletions.
31 changes: 16 additions & 15 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func NewReactor(state sm.State, blockExec *sm.BlockExecutor, store *store.BlockS

requestsCh := make(chan BlockRequest, maxTotalRequesters)

const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
appHashErrorsCh := make(chan p2p.AppHashError, 5)
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
appHashErrorsCh := make(chan p2p.AppHashError) // create an unbuffered channel to stream appHash errors

startHeight := store.Height() + 1
if startHeight == 1 {
Expand Down Expand Up @@ -364,19 +364,20 @@ FOR_LOOP:
err = bcR.blockExec.ValidateBlock(state, first)
}

if strings.Contains(err.Error(), "wrong Block.Header.AppHash") {
bcR.appHashErrorsCh <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height),
}
} else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") {
bcR.appHashErrorsCh <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height - 1),
if err != nil {
// If this is an appHash or lastResultsHash error, also pass to the appHashError channel.
if strings.Contains(err.Error(), "wrong Block.Header.AppHash") {
bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height),
}
} else if strings.Contains(err.Error(), "wrong Block.Header.LastResultsHash") {
bcR.BaseReactor.AppHashErrorChanBR <- p2p.AppHashError{
Err: err,
Height: uint64(first.Height - 1),
}
}
}

if err != nil {
bcR.Logger.Error("Error in validation", "err", err)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
Expand Down Expand Up @@ -432,6 +433,6 @@ func (bcR *Reactor) BroadcastStatusRequest() {
})
}

func (r *Reactor) AppHashErrorsCh() <-chan p2p.AppHashError {
func (r *Reactor) AppHashErrorsCh() chan p2p.AppHashError {

Check warning on line 436 in blocksync/reactor.go

View workflow job for this annotation

GitHub Actions / golangci-lint

receiver-naming: receiver name r should be consistent with previous receiver name bcR for Reactor (revive)
return r.appHashErrorsCh
}
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,4 +605,4 @@ func (br *ByzantineReactor) ReceiveEnvelope(e p2p.Envelope) {

func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

func (br *ByzantineReactor) AppHashErrorsCh() <-chan p2p.AppHashError { return nil }
func (br *ByzantineReactor) AppHashErrorsCh() chan p2p.AppHashError { return nil }
2 changes: 1 addition & 1 deletion mempool/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,6 @@ func (m *TxsMessage) String() string {
return fmt.Sprintf("[TxsMessage %v]", m.Txs)
}

func (memR *Reactor) AppHashErrorsCh() <-chan p2p.AppHashError {
func (memR *Reactor) AppHashErrorsCh() chan p2p.AppHashError {
return memR.appHashErrorsCh
}
5 changes: 5 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,11 @@ func (n *Node) ConsensusReactor() *cs.Reactor {
return n.consensusReactor
}

// BCReactor returns the Node's BlockchainReactor.
func (n *Node) BCReactor() p2p.Reactor {
return n.bcReactor
}

// MempoolReactor returns the Node's mempool reactor.
func (n *Node) MempoolReactor() p2p.Reactor {
return n.mempoolReactor
Expand Down
12 changes: 8 additions & 4 deletions p2p/base_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,24 @@ type Reactor interface {
// peer on any of the channels registered by the reactor.
ReceiveEnvelope(Envelope)

AppHashErrorsCh() <-chan AppHashError
// AppHashErrorsCh is used to stream hash errors to the sdk, which is then used
// to provide further debugging information in logs to the user.
AppHashErrorsCh() chan AppHashError
}

//--------------------------------------

type BaseReactor struct {
service.BaseService // Provides Start, Stop, .Quit
Switch *Switch
AppHashErrorChanBR chan AppHashError
}

func NewBaseReactor(name string, impl Reactor) *BaseReactor {
return &BaseReactor{
BaseService: *service.NewBaseService(nil, name, impl),
Switch: nil,
BaseService: *service.NewBaseService(nil, name, impl),
Switch: nil,
AppHashErrorChanBR: impl.AppHashErrorsCh(),
}
}

Expand All @@ -79,4 +83,4 @@ func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (*BaseReactor) ReceiveEnvelope(e Envelope) {}
func (*BaseReactor) InitPeer(peer Peer) Peer { return peer }
func (*BaseReactor) AppHashErrorsCh() <-chan AppHashError { return nil }
func (*BaseReactor) AppHashErrorsCh() chan AppHashError { return nil }

0 comments on commit 7c374f1

Please sign in to comment.