From 601c002960e5b502a90784ec62511a0dc997fc57 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Wed, 10 Jul 2024 16:51:42 +0400
Subject: [PATCH 1/4] perf(consensus): Undo revert #3211. (Remove reactor
Consensus RLocks) (backport #3341) (#3490)
Pretty simple bug fix for the e2e failure on #3211. There was a race
condition at initialization for the initial height, because we didn't
initialize it early enough.
The error in the E2E logs was:
```
validator03 | E[2024-06-21|21:13:20.744] Stopping peer for error module=p2p peer="Peer{MConn{10.186.73.2:34810} 4fe295e4cfad69f1247ad85975c6fd87757195db in}" err="invalid field LastCommitRound can only be negative for initial height 0"
validator03 | I[2024-06-21|21:13:20.744] service stop module=p2p peer=4fe295e4cfad69f1247ad85975c6fd87757195db@10.186.73.2:34810 msg="Stopping Peer service" impl="Peer{MConn{10.186.73.2:34810} 4fe295e4cfad69f1247ad85975c6fd87757195db in}"
```
hinting at the initial height not being set properly.
---
- [ ] Tests written/updated
- [ ] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [ ] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec
This is an automatic backport of pull request #3341 done by
[Mergify](https://mergify.com).
Co-authored-by: Dev Ojha
---
...ake-cs-reactor-no-longer-takes-cs-locks.md | 4 ++
consensus/reactor.go | 57 +++++++++++--------
consensus/state.go | 10 +++-
3 files changed, 47 insertions(+), 24 deletions(-)
create mode 100644 .changelog/v0.38.8/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md
diff --git a/.changelog/v0.38.8/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md b/.changelog/v0.38.8/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md
new file mode 100644
index 0000000000..72fae64598
--- /dev/null
+++ b/.changelog/v0.38.8/improvements/3211-make-cs-reactor-no-longer-takes-cs-locks.md
@@ -0,0 +1,4 @@
+- `[consensus]` Make the consensus reactor no longer take the consensus lock when handling received packets.
+Consensus will now update the reactor's view after every relevant change through the existing
+synchronous event bus subscription.
+ ([\#3211](https://github.com/cometbft/cometbft/pull/3211))
diff --git a/consensus/reactor.go b/consensus/reactor.go
index 65fb6c8a9a..9e86e70d2e 100644
--- a/consensus/reactor.go
+++ b/consensus/reactor.go
@@ -46,7 +46,10 @@ type Reactor struct {
mtx cmtsync.RWMutex
waitSync bool
eventBus *types.EventBus
- rs *cstypes.RoundState
+
+ rsMtx cmtsync.RWMutex
+ rs *cstypes.RoundState
+ initialHeight int64 // under rsMtx
Metrics *Metrics
}
@@ -57,10 +60,11 @@ type ReactorOption func(*Reactor)
// consensusState.
func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption) *Reactor {
conR := &Reactor{
- conS: consensusState,
- waitSync: waitSync,
- rs: consensusState.GetRoundState(),
- Metrics: NopMetrics(),
+ conS: consensusState,
+ waitSync: waitSync,
+ rs: consensusState.GetRoundState(),
+ initialHeight: consensusState.state.InitialHeight,
+ Metrics: NopMetrics(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
@@ -260,9 +264,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case StateChannel:
switch msg := msg.(type) {
case *NewRoundStepMessage:
- conR.conS.mtx.RLock()
- initialHeight := conR.conS.state.InitialHeight
- conR.conS.mtx.RUnlock()
+ conR.rsMtx.RLock()
+ initialHeight := conR.initialHeight
+ conR.rsMtx.RUnlock()
if err = msg.ValidateHeight(initialHeight); err != nil {
conR.Logger.Error("Peer sent us invalid msg", "peer", e.Src, "msg", msg, "err", err)
conR.Switch.StopPeerForError(e.Src, err)
@@ -274,10 +278,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
case *HasVoteMessage:
ps.ApplyHasVoteMessage(msg)
case *VoteSetMaj23Message:
- cs := conR.conS
- cs.mtx.RLock()
- height, votes := cs.Height, cs.Votes
- cs.mtx.RUnlock()
+ conR.rsMtx.RLock()
+ height, votes := conR.rs.Height, conR.rs.Votes
+ conR.rsMtx.RUnlock()
if height != msg.Height {
return
}
@@ -342,12 +345,11 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
switch msg := msg.(type) {
case *VoteMessage:
cs := conR.conS
- cs.mtx.RLock()
- height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
- cs.mtx.RUnlock()
- ps.EnsureVoteBitArrays(height, valSize)
- ps.EnsureVoteBitArrays(height-1, lastCommitSize)
- ps.SetHasVote(msg.Vote)
+
+ conR.rsMtx.RLock()
+ height, valSize, lastCommitSize := conR.rs.Height, conR.rs.Validators.Size(), conR.rs.LastCommit.Size()
+ conR.rsMtx.RUnlock()
+ ps.SetHasVoteFromPeer(msg.Vote, height, valSize, lastCommitSize)
cs.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
@@ -363,10 +365,9 @@ func (conR *Reactor) Receive(e p2p.Envelope) {
}
switch msg := msg.(type) {
case *VoteSetBitsMessage:
- cs := conR.conS
- cs.mtx.RLock()
- height, votes := cs.Height, cs.Votes
- cs.mtx.RUnlock()
+ conR.rsMtx.RLock()
+ height, votes := conR.rs.Height, conR.rs.Votes
+ conR.rsMtx.RUnlock()
if height == msg.Height {
var ourVotes *bits.BitArray
@@ -415,6 +416,7 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
func(data cmtevents.EventData) {
conR.broadcastNewRoundStepMessage(data.(*cstypes.RoundState))
+ conR.updateRoundStateNoCsLock()
}); err != nil {
conR.Logger.Error("Error adding listener for events", "err", err)
}
@@ -429,8 +431,9 @@ func (conR *Reactor) subscribeToBroadcastEvents() {
if err := conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
func(data cmtevents.EventData) {
conR.broadcastHasVoteMessage(data.(*types.Vote))
+ conR.updateRoundStateNoCsLock()
}); err != nil {
- conR.Logger.Error("Error adding listener for events", "err", err)
+ conR.Logger.Error("Error adding listener for events (Vote)", "err", err)
}
}
@@ -539,6 +542,14 @@ func (conR *Reactor) updateRoundStateRoutine() {
}
}
+func (conR *Reactor) updateRoundStateNoCsLock() {
+ rs := conR.conS.getRoundState()
+ conR.rsMtx.Lock()
+ conR.rs = rs
+ conR.initialHeight = conR.conS.state.InitialHeight
+ conR.rsMtx.Unlock()
+}
+
func (conR *Reactor) getRoundState() *cstypes.RoundState {
conR.mtx.RLock()
defer conR.mtx.RUnlock()
diff --git a/consensus/state.go b/consensus/state.go
index 96ec4f8446..91bf8bf7a8 100644
--- a/consensus/state.go
+++ b/consensus/state.go
@@ -256,10 +256,18 @@ func (cs *State) GetLastHeight() int64 {
}
// GetRoundState returns a shallow copy of the internal consensus state.
+// This function is thread-safe.
func (cs *State) GetRoundState() *cstypes.RoundState {
cs.mtx.RLock()
- rs := cs.RoundState // copy
+ rs := cs.getRoundState()
cs.mtx.RUnlock()
+ return rs
+}
+
+// getRoundState returns a shallow copy of the internal consensus state.
+// This function is not thread-safe. Use GetRoundState for the thread-safe version.
+func (cs *State) getRoundState() *cstypes.RoundState {
+ rs := cs.RoundState // copy
return &rs
}
From 195d6fb4d545016280624361549acadddfdb24b6 Mon Sep 17 00:00:00 2001
From: Dev Ojha
Date: Mon, 19 Aug 2024 15:42:10 -0400
Subject: [PATCH 2/4] BP 3156
---
.../3156-make-addvote-only-take-one-ps-mtx.md | 2 ++
CHANGELOG.md | 1 +
consensus/reactor.go | 10 ++++++++++
3 files changed, 13 insertions(+)
create mode 100644 .changelog/unreleased/improvements/3156-make-addvote-only-take-one-ps-mtx.md
diff --git a/.changelog/unreleased/improvements/3156-make-addvote-only-take-one-ps-mtx.md b/.changelog/unreleased/improvements/3156-make-addvote-only-take-one-ps-mtx.md
new file mode 100644
index 0000000000..215295c2cf
--- /dev/null
+++ b/.changelog/unreleased/improvements/3156-make-addvote-only-take-one-ps-mtx.md
@@ -0,0 +1,2 @@
+- `[consensus]` Make Vote messages only take one peerstate mutex
+ ([\#3156](https://github.com/cometbft/cometbft/issues/3156))
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5f6a8644ab..3e648c749d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@ It also includes a few other bug fixes and performance improvements.
- `[indexer]` Fixed ineffective select break statements; they now
point to their enclosing for loop label to exit
([\#3544](https://github.com/cometbft/cometbft/issues/3544))
+- [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer state mutex call, not 3 (#3156)
## v0.38.10
diff --git a/consensus/reactor.go b/consensus/reactor.go
index 9e86e70d2e..8bb7de92b1 100644
--- a/consensus/reactor.go
+++ b/consensus/reactor.go
@@ -1365,6 +1365,16 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
}
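+// SetHasVoteFromPeer ensures the peer's vote bit arrays for csHeight and csHeight-1 exist, then sets the given vote as known by the peer, all under a single lock.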
+func (ps *PeerState) SetHasVoteFromPeer(vote *types.Vote, csHeight int64, valSize, lastCommitSize int) {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+
+ ps.ensureVoteBitArrays(csHeight, valSize)
+ ps.ensureVoteBitArrays(csHeight-1, lastCommitSize)
+ ps.setHasVote(vote.Height, vote.Round, vote.Type, vote.ValidatorIndex)
+}
+
func (ps *PeerState) setHasVote(height int64, round int32, voteType cmtproto.SignedMsgType, index int32) {
ps.logger.Debug("setHasVote",
"peerH/R",
From 5bf7ef69d52d8dfe27288f61a3c5f53c3bdabe58 Mon Sep 17 00:00:00 2001
From: Dev Ojha
Date: Mon, 19 Aug 2024 15:52:41 -0400
Subject: [PATCH 3/4] use rsMtx
---
consensus/reactor.go | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/consensus/reactor.go b/consensus/reactor.go
index 8bb7de92b1..53b2d2e78d 100644
--- a/consensus/reactor.go
+++ b/consensus/reactor.go
@@ -536,9 +536,9 @@ func (conR *Reactor) updateRoundStateRoutine() {
return
}
rs := conR.conS.GetRoundState()
- conR.mtx.Lock()
+ conR.rsMtx.Lock()
conR.rs = rs
- conR.mtx.Unlock()
+ conR.rsMtx.Unlock()
}
}
@@ -551,8 +551,8 @@ func (conR *Reactor) updateRoundStateNoCsLock() {
}
func (conR *Reactor) getRoundState() *cstypes.RoundState {
- conR.mtx.RLock()
- defer conR.mtx.RUnlock()
+ conR.rsMtx.RLock()
+ defer conR.rsMtx.RUnlock()
return conR.rs
}
From 9355856e6dd47ebe25db59f4ce20f2a4c1506651 Mon Sep 17 00:00:00 2001
From: Dev Ojha
Date: Mon, 19 Aug 2024 15:56:47 -0400
Subject: [PATCH 4/4] Fix lint
---
libs/os/os.go | 2 +-
types/validator_set_test.go | 14 ++++++--------
2 files changed, 7 insertions(+), 9 deletions(-)
diff --git a/libs/os/os.go b/libs/os/os.go
index 334eaf4c89..cfab38f48c 100644
--- a/libs/os/os.go
+++ b/libs/os/os.go
@@ -41,7 +41,7 @@ func Kill() error {
}
func Exit(s string) {
- fmt.Printf(s + "\n")
+ fmt.Println(s)
os.Exit(1)
}
diff --git a/types/validator_set_test.go b/types/validator_set_test.go
index 3070a43a9b..e795702105 100644
--- a/types/validator_set_test.go
+++ b/types/validator_set_test.go
@@ -331,7 +331,7 @@ func TestProposerSelection3(t *testing.T) {
got := vset.GetProposer().Address
expected := proposerOrder[j%4].Address
if !bytes.Equal(got, expected) {
- t.Fatalf(fmt.Sprintf("vset.Proposer (%X) does not match expected proposer (%X) for (%d, %d)", got, expected, i, j))
+ t.Fatalf("vset.Proposer (%X) does not match expected proposer (%X) for (%d, %d)", got, expected, i, j)
}
// serialize, deserialize, check proposer
@@ -342,13 +342,11 @@ func TestProposerSelection3(t *testing.T) {
if i != 0 {
if !bytes.Equal(got, computed.Address) {
t.Fatalf(
- fmt.Sprintf(
- "vset.Proposer (%X) does not match computed proposer (%X) for (%d, %d)",
- got,
- computed.Address,
- i,
- j,
- ),
+ "vset.Proposer (%X) does not match computed proposer (%X) for (%d, %d)",
+ got,
+ computed.Address,
+ i,
+ j,
)
}
}