Skip to content

Commit

Permalink
backport CS lock removal (#137)
Browse files Browse the repository at this point in the history
* perf(consensus): Undo revert cometbft#3211. (Remove reactor Consensus RLocks) (backport cometbft#3341) (cometbft#3490)

Pretty simple bug fix for the e2e failure on cometbft#3211. There was a race
condition at iniitialization for initial height, because we didn't
initialize it early on enough.

The error in the E2E logs was:
```
validator03    | E[2024-06-21|21:13:20.744] Stopping peer for error                      module=p2p peer="Peer{MConn{10.186.73.2:34810} 4fe295e4cfad69f1247ad85975c6fd87757195db in}" err="invalid field LastCommitRound can only be negative for initial height 0"
validator03    | I[2024-06-21|21:13:20.744] service stop                                 module=p2p peer=4fe295e4cfad69f1247ad85975c6fd87757195db@10.186.73.2:34810 msg="Stopping Peer service" impl="Peer{MConn{10.186.73.2:34810} 4fe295e4cfad69f1247ad85975c6fd87757195db in}"
```
hinting at initial height not being set rpoperly.

---

- [ ] Tests written/updated
- [ ] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [ ] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec
<hr>This is an automatic backport of pull request cometbft#3341 done by
[Mergify](https://mergify.com).

Co-authored-by: Dev Ojha <ValarDragon@users.noreply.github.com>

* BP 3156

* use rsMtx

* Fix lint

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
ValarDragon and mergify[bot] authored Aug 19, 2024
1 parent d76422d commit 854fd6b
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[consensus]` Make Vote messages only take one peerstate mutex
([\#3156](https://github.com/cometbft/cometbft/issues/3156))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[consensus]` Make the consensus reactor no longer have packets on receive take the consensus lock.
Consensus will now update the reactor's view after every relevant change through the existing
synchronous event bus subscription.
([\#3211](https://github.com/cometbft/cometbft/pull/3211))
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ It also includes a few other bug fixes and performance improvements.
- `[indexer]` Fixed ineffective select break statements; they now
point to their enclosing for loop label to exit
([\#3544](https://github.com/cometbft/cometbft/issues/3544))
- [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer set mutex call, not 3 (#3156)

## v0.38.10

Expand Down
75 changes: 48 additions & 27 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ type Reactor struct {
mtx cmtsync.RWMutex
waitSync bool
eventBus *types.EventBus
rs *cstypes.RoundState

rsMtx cmtsync.RWMutex
rs *cstypes.RoundState
initialHeight int64 // under rsMtx

Metrics *Metrics
}
Expand All @@ -57,10 +60,11 @@ type ReactorOption func(*Reactor)
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
initialHeight: consensusState.state.InitialHeight,
Metrics: NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

Expand Down Expand Up @@ -260,9 +264,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
conR.conS.mtx.RLock()
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.RUnlock()
conR.rsMtx.RLock()
initialHeight := conR.initialHeight
conR.rsMtx.RUnlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
Expand All @@ -274,10 +278,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.RLock()
height, votes := cs.Height, cs.Votes
cs.mtx.RUnlock()
conR.rsMtx.RLock()
height, votes := conR.rs.Height, conR.rs.Votes
conR.rsMtx.RUnlock()
if height != msg.Height {
return
}
Expand Down Expand Up @@ -342,12 +345,11 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
switch msg := msg.(type) {
case *VoteMessage:
cs := conR.conS
cs.mtx.RLock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()
ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)

conR.rsMtx.RLock()
height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size()
conR.rsMtx.RUnlock()
ps.SetHasVoteFromPeer(msg.Vote, height, valSize, lastCommitSize)

cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}

Expand All @@ -363,10 +365,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
cs := conR.conS
cs.mtx.RLock()
height, votes := cs.Height, cs.Votes
cs.mtx.RUnlock()
conR.rsMtx.RLock()
height, votes := conR.rs.Height, conR.rs.Votes
conR.rsMtx.RUnlock()

if height == msg.Height {
var ourVotes *bits.BitArray
Expand Down Expand Up @@ -415,6 +416,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
func(data cmtevents.EventData) {
conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
}
Expand All @@ -429,8 +431,9 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
func(data cmtevents.EventData) {
conR.broadcastHasVoteMessage(data.(*types.Vote))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
conR.Logger.Error("Error adding listener for events (Vote)", "err", err)
}
}

Expand Down Expand Up @@ -533,15 +536,23 @@ func (conR *Reactor) updateRoundStateRoutine() {
return
}
rs := conR.conS.GetRoundState()
conR.mtx.Lock()
conR.rsMtx.Lock()
conR.rs = rs
conR.mtx.Unlock()
conR.rsMtx.Unlock()
}
}

func (conR *Reactor) updateRoundStateNoCsLock() {
rs := conR.conS.getRoundState()
conR.rsMtx.Lock()
conR.rs = rs
conR.initialHeight = conR.conS.state.InitialHeight
conR.rsMtx.Unlock()
}

func (conR *Reactor) getRoundState() *cstypes.RoundState {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
conR.rsMtx.RLock()
defer conR.rsMtx.RUnlock()
return conR.rs
}

Expand Down Expand Up @@ -1354,6 +1365,16 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}

// SetHasVote sets the given vote as known by the peer.
func (ps *PeerState) SetHasVoteFromPeer(vote *types.Vote, csHeight int64, valSize, lastCommitSize int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

ps.ensureVoteBitArrays(csHeight, valSize)
ps.ensureVoteBitArrays(csHeight-1, lastCommitSize)
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}

func (ps *PeerState) setHasVote(height int64, round int32, voteType cmtproto.SignedMsgType, index int32) {
ps.logger.Debug("setHasVote",
"peerH/R",
Expand Down
10 changes: 9 additions & 1 deletion consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,18 @@ func (cs *State) GetLastHeight() int64 {
}

// GetRoundState returns a shallow copy of the internal consensus state.
// This function is thread-safe.
func (cs *State) GetRoundState() *cstypes.RoundState {
cs.mtx.RLock()
rs := cs.RoundState // copy
rs := cs.getRoundState()
cs.mtx.RUnlock()
return rs
}

// getRoundState returns a shallow copy of the internal consensus state.
// This function is not thread-safe. Use GetRoundState for the thread-safe version.
func (cs *State) getRoundState() *cstypes.RoundState {
rs := cs.RoundState // copy
return &rs
}

Expand Down
2 changes: 1 addition & 1 deletion libs/os/os.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Kill() error {
}

func Exit(s string) {
fmt.Printf(s + "\n")
fmt.Println(s)
os.Exit(1)
}

Expand Down
14 changes: 6 additions & 8 deletions types/validator_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func TestProposerSelection3(t *testing.T) {
got := vset.GetProposer().Address
expected := proposerOrder[j%4].Address
if !bytes.Equal(got, expected) {
t.Fatalf(fmt.Sprintf("vset.Proposer (%X) does not match expected proposer (%X) for (%d, %d)", got, expected, i, j))
t.Fatalf("vset.Proposer (%X) does not match expected proposer (%X) for (%d, %d)", got, expected, i, j)
}

// serialize, deserialize, check proposer
Expand All @@ -342,13 +342,11 @@ func TestProposerSelection3(t *testing.T) {
if i != 0 {
if !bytes.Equal(got, computed.Address) {
t.Fatalf(
fmt.Sprintf(
"vset.Proposer (%X) does not match computed proposer (%X) for (%d, %d)",
got,
computed.Address,
i,
j,
),
"vset.Proposer (%X) does not match computed proposer (%X) for (%d, %d)",
got,
computed.Address,
i,
j,
)
}
}
Expand Down

0 comments on commit 854fd6b

Please sign in to comment.