diff --git a/.changelog/unreleased/improvements/3156-make-addvote-only-take-one-ps-mtx.md b/.changelog/unreleased/improvements/3156-make-addvote-only-take-one-ps-mtx.md new file mode 100644 index 0000000000..215295c2cf --- /dev/null +++ b/.changelog/unreleased/improvements/3156-make-addvote-only-take-one-ps-mtx.md @@ -0,0 +1,2 @@ +- `[consensus]` Make Vote messages only take one peerstate mutex + ([\#3156](https://github.com/cometbft/cometbft/issues/3156)) diff --git a/.changelog/v0.38.8/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md b/.changelog/v0.38.8/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md new file mode 100644 index 0000000000..72fae64598 --- /dev/null +++ b/.changelog/v0.38.8/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md @@ -0,0 +1,4 @@ +- `[consensus]` Make the consensus reactor no longer have packets on receive take the consensus lock. +Consensus will now update the reactor's view after every relevant change through the existing +synchronous event bus subscription. + ([\#3211](https://github.com/cometbft/cometbft/pull/3211)) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f6a8644ab..3e648c749d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ It also includes a few other bug fixes and performance improvements. 
- `[indexer]` Fixed ineffective select break statements; they now point to their enclosing for loop label to exit ([\#3544](https://github.com/cometbft/cometbft/issues/3544)) +- [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer set mutex call, not 3 (#3156) ## v0.38.10 diff --git a/consensus/reactor.go b/consensus/reactor.go index 65fb6c8a9a..53b2d2e78d 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -46,7 +46,10 @@ type Reactor struct { mtx cmtsync.RWMutex waitSync bool eventBus *types.EventBus - rs *cstypes.RoundState + + rsMtx cmtsync.RWMutex + rs *cstypes.RoundState + initialHeight int64 // under rsMtx Metrics *Metrics } @@ -57,10 +60,11 @@ type ReactorOption func(*Reactor) // consensusState. func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor { conR := &Reactor{ - conS: consensusState, - waitSync: waitSync, - rs: consensusState.GetRoundState(), - Metrics: NopMetrics(), + conS: consensusState, + waitSync: waitSync, + rs: consensusState.GetRoundState(), + initialHeight: consensusState.state.InitialHeight, + Metrics: NopMetrics(), } conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR) @@ -260,9 +264,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { case StateChannel: switch msg := msg.(type) { case *NewRoundStepMessage: - conR.conS.mtx.RLock() - initialHeight := conR.conS.state.InitialHeight - conR.conS.mtx.RUnlock() + conR.rsMtx.RLock() + initialHeight := conR.initialHeight + conR.rsMtx.RUnlock() if err = msg.ValidateHeight(initialHeight); err != nil { conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err) conR.Switch.StopPeerForError(e.Src, err) @@ -274,10 +278,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { case *HasVoteMessage: ps.ApplyHasVoteMessage(msg) case *VoteSetMaj23Message: - cs := conR.conS - cs.mtx.RLock() - height, votes := cs.Height, cs.Votes - cs.mtx.RUnlock() + 
conR.rsMtx.RLock() + height, votes := conR.rs.Height, conR.rs.Votes + conR.rsMtx.RUnlock() if height != msg.Height { return } @@ -342,12 +345,11 @@ func (conR *Reactor) Receive(e p2p.Envelope) { switch msg := msg.(type) { case *VoteMessage: cs := conR.conS - cs.mtx.RLock() - height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() - cs.mtx.RUnlock() - ps.EnsureVoteBitArrays(height, valSize) - ps.EnsureVoteBitArrays(height-1, lastCommitSize) - ps.SetHasVote(msg.Vote) + + conR.rsMtx.RLock() + height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size() + conR.rsMtx.RUnlock() + ps.SetHasVoteFromPeer(msg.Vote, height, valSize, lastCommitSize) cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()} @@ -363,10 +365,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) { } switch msg := msg.(type) { case *VoteSetBitsMessage: - cs := conR.conS - cs.mtx.RLock() - height, votes := cs.Height, cs.Votes - cs.mtx.RUnlock() + conR.rsMtx.RLock() + height, votes := conR.rs.Height, conR.rs.Votes + conR.rsMtx.RUnlock() if height == msg.Height { var ourVotes *bits.BitArray @@ -415,6 +416,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() { if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep, func(data cmtevents.EventData) { conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState)) + conR.updateRoundStateNoCsLock() }); err != nil { conR.Logger.Error("Error adding listener for events", "err", err) } @@ -429,8 +431,9 @@ func (conR *Reactor) subscribeToBroadcastEvents() { if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote, func(data cmtevents.EventData) { conR.broadcastHasVoteMessage(data.(*types.Vote)) + conR.updateRoundStateNoCsLock() }); err != nil { - conR.Logger.Error("Error adding listener for events", "err", err) + conR.Logger.Error("Error adding listener for events (Vote)", "err", err) } } @@ -533,15 +536,23 @@ func (conR *Reactor) updateRoundStateRoutine() { 
return } rs := conR.conS.GetRoundState() - conR.mtx.Lock() + conR.rsMtx.Lock() conR.rs = rs - conR.mtx.Unlock() + conR.rsMtx.Unlock() } } +func (conR *Reactor) updateRoundStateNoCsLock() { + rs := conR.conS.getRoundState() + conR.rsMtx.Lock() + conR.rs = rs + conR.initialHeight = conR.conS.state.InitialHeight + conR.rsMtx.Unlock() +} + func (conR *Reactor) getRoundState() *cstypes.RoundState { - conR.mtx.RLock() - defer conR.mtx.RUnlock() + conR.rsMtx.RLock() + defer conR.rsMtx.RUnlock() return conR.rs } @@ -1354,6 +1365,16 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) { ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex) } +// SetHasVoteFromPeer ensures the vote bit arrays for csHeight and csHeight-1, then sets the given vote as known by the peer, all under a single ps.mtx lock. +func (ps *PeerState) SetHasVoteFromPeer(vote *types.Vote, csHeight int64, valSize, lastCommitSize int) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + + ps.ensureVoteBitArrays(csHeight, valSize) + ps.ensureVoteBitArrays(csHeight-1, lastCommitSize) + ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex) +} + func (ps *PeerState) setHasVote(height int64, round int32, voteType cmtproto.SignedMsgType, index int32) { ps.logger.Debug("setHasVote", "peerH/R", diff --git a/consensus/state.go b/consensus/state.go index 96ec4f8446..91bf8bf7a8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -256,10 +256,18 @@ func (cs *State) GetLastHeight() int64 { } // GetRoundState returns a shallow copy of the internal consensus state. +// This function is thread-safe. func (cs *State) GetRoundState() *cstypes.RoundState { cs.mtx.RLock() - rs := cs.RoundState // copy + rs := cs.getRoundState() cs.mtx.RUnlock() + return rs +} + +// getRoundState returns a shallow copy of the internal consensus state. +// This function is not thread-safe. Use GetRoundState for the thread-safe version. 
+func (cs *State) getRoundState() *cstypes.RoundState { + rs := cs.RoundState // copy return &rs } diff --git a/libs/os/os.go b/libs/os/os.go index 334eaf4c89..cfab38f48c 100644 --- a/libs/os/os.go +++ b/libs/os/os.go @@ -41,7 +41,7 @@ func Kill() error { } func Exit(s string) { - fmt.Printf(s + "\n") + fmt.Println(s) os.Exit(1) } diff --git a/types/validator_set_test.go b/types/validator_set_test.go index 3070a43a9b..e795702105 100644 --- a/types/validator_set_test.go +++ b/types/validator_set_test.go @@ -331,7 +331,7 @@ func TestProposerSelection3(t *testing.T) { got := vset.GetProposer().Address expected := proposerOrder[j%4].Address if !bytes.Equal(got, expected) { - t.Fatalf(fmt.Sprintf("vset.Proposer (%X) does not match expected proposer (%X) for (%d, %d)", got, expected, i, j)) + t.Fatalf("vset.Proposer (%X) does not match expected proposer (%X) for (%d, %d)", got, expected, i, j) } // serialize, deserialize, check proposer @@ -342,13 +342,11 @@ func TestProposerSelection3(t *testing.T) { if i != 0 { if !bytes.Equal(got, computed.Address) { t.Fatalf( - fmt.Sprintf( - "vset.Proposer (%X) does not match computed proposer (%X) for (%d, %d)", - got, - computed.Address, - i, - j, - ), + "vset.Proposer (%X) does not match computed proposer (%X) for (%d, %d)", + got, + computed.Address, + i, + j, ) } }