Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport CS lock removal #137

Merged
merged 4 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[consensus]` Make Vote messages only take one peerstate mutex
([\#3156](https://github.com/cometbft/cometbft/issues/3156))
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- `[consensus]` The consensus reactor no longer takes the consensus lock when handling received packets.
Instead, consensus now updates the reactor's view of the round state after every relevant change,
through the existing synchronous event bus subscription.
([\#3211](https://github.com/cometbft/cometbft/pull/3211))
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ It also includes a few other bug fixes and performance improvements.
- `[indexer]` Fixed ineffective select break statements; they now
point to their enclosing for loop label to exit
([\#3544](https://github.com/cometbft/cometbft/issues/3544))
- [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer set mutex call, not 3 (#3156)

## v0.38.10

Expand Down
75 changes: 48 additions & 27 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ type Reactor struct {
mtx cmtsync.RWMutex
waitSync bool
eventBus *types.EventBus
rs *cstypes.RoundState

rsMtx cmtsync.RWMutex
rs *cstypes.RoundState
initialHeight int64 // under rsMtx

Metrics *Metrics
}
Expand All @@ -57,10 +60,11 @@ type ReactorOption func(*Reactor)
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
conR := &Reactor{
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
Metrics: NopMetrics(),
conS: consensusState,
waitSync: waitSync,
rs: consensusState.GetRoundState(),
initialHeight: consensusState.state.InitialHeight,
Metrics: NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

Expand Down Expand Up @@ -260,9 +264,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
conR.conS.mtx.RLock()
initialHeight := conR.conS.state.InitialHeight
conR.conS.mtx.RUnlock()
conR.rsMtx.RLock()
initialHeight := conR.initialHeight
conR.rsMtx.RUnlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
Expand All @@ -274,10 +278,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
cs := conR.conS
cs.mtx.RLock()
height, votes := cs.Height, cs.Votes
cs.mtx.RUnlock()
conR.rsMtx.RLock()
height, votes := conR.rs.Height, conR.rs.Votes
conR.rsMtx.RUnlock()
if height != msg.Height {
return
}
Expand Down Expand Up @@ -342,12 +345,11 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
switch msg := msg.(type) {
case *VoteMessage:
cs := conR.conS
cs.mtx.RLock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()
ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)

conR.rsMtx.RLock()
height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size()
conR.rsMtx.RUnlock()
ps.SetHasVoteFromPeer(msg.Vote, height, valSize, lastCommitSize)

cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}

Expand All @@ -363,10 +365,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
cs := conR.conS
cs.mtx.RLock()
height, votes := cs.Height, cs.Votes
cs.mtx.RUnlock()
conR.rsMtx.RLock()
height, votes := conR.rs.Height, conR.rs.Votes
conR.rsMtx.RUnlock()

if height == msg.Height {
var ourVotes *bits.BitArray
Expand Down Expand Up @@ -415,6 +416,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
func(data cmtevents.EventData) {
conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
}
Expand All @@ -429,8 +431,9 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
func(data cmtevents.EventData) {
conR.broadcastHasVoteMessage(data.(*types.Vote))
conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
conR.Logger.Error("Error adding listener for events (Vote)", "err", err)
}
}

Expand Down Expand Up @@ -533,15 +536,23 @@ func (conR *Reactor) updateRoundStateRoutine() {
return
}
rs := conR.conS.GetRoundState()
conR.mtx.Lock()
conR.rsMtx.Lock()
conR.rs = rs
conR.mtx.Unlock()
conR.rsMtx.Unlock()
}
}

// updateRoundStateNoCsLock refreshes the reactor's cached round state (conR.rs)
// and conR.initialHeight; both fields are written while holding rsMtx.
//
// The values are read from the consensus state through conS.getRoundState()
// (the variant documented as not thread-safe) and conS.state.InitialHeight,
// i.e. without this function acquiring conS.mtx itself. It is invoked from the
// EventNewRoundStep and EventVote listeners.
// NOTE(review): presumably those listeners fire synchronously while the
// consensus state lock is already held by the emitter — confirm before
// calling this from any other context.
func (conR *Reactor) updateRoundStateNoCsLock() {
rs := conR.conS.getRoundState()
conR.rsMtx.Lock()
conR.rs = rs
conR.initialHeight = conR.conS.state.InitialHeight
conR.rsMtx.Unlock()
}

func (conR *Reactor) getRoundState() *cstypes.RoundState {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
conR.rsMtx.RLock()
defer conR.rsMtx.RUnlock()
return conR.rs
}

Expand Down Expand Up @@ -1354,6 +1365,16 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}

// SetHasVoteFromPeer sets the given vote as known by the peer.
//
// Before recording the vote it calls ensureVoteBitArrays for csHeight with
// valSize and for csHeight-1 with lastCommitSize. All three steps run under a
// single acquisition of ps.mtx (held until return), rather than locking once
// per step.
func (ps *PeerState) SetHasVoteFromPeer(vote *types.Vote, csHeight int64, valSize, lastCommitSize int) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

ps.ensureVoteBitArrays(csHeight, valSize)
ps.ensureVoteBitArrays(csHeight-1, lastCommitSize)
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}

func (ps *PeerState) setHasVote(height int64, round int32, voteType cmtproto.SignedMsgType, index int32) {
ps.logger.Debug("setHasVote",
"peerH/R",
Expand Down
10 changes: 9 additions & 1 deletion consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,18 @@ func (cs *State) GetLastHeight() int64 {
}

// GetRoundState returns a shallow copy of the internal consensus state.
// This function is thread-safe.
func (cs *State) GetRoundState() *cstypes.RoundState {
cs.mtx.RLock()
rs := cs.RoundState // copy
rs := cs.getRoundState()
cs.mtx.RUnlock()
return rs
}

// getRoundState hands back a pointer to a shallow copy of the internal
// consensus round state.
// It takes no lock and is therefore not thread-safe; use GetRoundState for the
// thread-safe version.
func (cs *State) getRoundState() *cstypes.RoundState {
	snapshot := cs.RoundState // value copy of the struct
	return &snapshot
}

Expand Down
2 changes: 1 addition & 1 deletion libs/os/os.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Kill() error {
}

func Exit(s string) {
fmt.Printf(s + "\n")
fmt.Println(s)
os.Exit(1)
}

Expand Down
14 changes: 6 additions & 8 deletions types/validator_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ func TestProposerSelection3(t *testing.T) {
got := vset.GetProposer().Address
expected := proposerOrder[j%4].Address
if !bytes.Equal(got, expected) {
t.Fatalf(fmt.Sprintf("vset.Proposer (%X) does not match expected proposer (%X) for (%d, %d)", got, expected, i, j))
t.Fatalf("vset.Proposer (%X) does not match expected proposer (%X) for (%d, %d)", got, expected, i, j)
}

// serialize, deserialize, check proposer
Expand All @@ -342,13 +342,11 @@ func TestProposerSelection3(t *testing.T) {
if i != 0 {
if !bytes.Equal(got, computed.Address) {
t.Fatalf(
fmt.Sprintf(
"vset.Proposer (%X) does not match computed proposer (%X) for (%d, %d)",
got,
computed.Address,
i,
j,
),
"vset.Proposer (%X) does not match computed proposer (%X) for (%d, %d)",
got,
computed.Address,
i,
j,
)
}
}
Expand Down
Loading