diff --git a/.changelog/v0.38.8/improvements/3006-use-thread-independent-randomness-in-gossip.md b/.changelog/v0.38.8/improvements/3006-use-thread-independent-randomness-in-gossip.md new file mode 100644 index 0000000000..290da0b1c8 --- /dev/null +++ b/.changelog/v0.38.8/improvements/3006-use-thread-independent-randomness-in-gossip.md @@ -0,0 +1,2 @@ +- [`consensus`] Use an independent rng for gossip threads, reducing mutex contention. + ([\#3005](https://github.com/cometbft/cometbft/issues/3005) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b8fd3ed48..5f6a8644ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,5 @@ # CHANGELOG -## v0.38.11 - -*August 12, 2024* - This release fixes a panic in consensus where CometBFT would previously panic if there's no extension signature in non-nil Precommit EVEN IF vote extensions themselves are disabled. diff --git a/consensus/reactor.go b/consensus/reactor.go index ee87b7ba63..3304159556 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -3,6 +3,7 @@ package consensus import ( "errors" "fmt" + "math/rand" "reflect" "sync" "time" @@ -12,6 +13,7 @@ import ( cmtevents "github.com/cometbft/cometbft/libs/events" cmtjson "github.com/cometbft/cometbft/libs/json" "github.com/cometbft/cometbft/libs/log" + cmtrand "github.com/cometbft/cometbft/libs/rand" cmtsync "github.com/cometbft/cometbft/libs/sync" "github.com/cometbft/cometbft/p2p" cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus" @@ -538,6 +540,7 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState { func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + rng := cmtrand.NewStdlibRand() OUTER_LOOP: for { @@ -550,7 +553,7 @@ OUTER_LOOP: // Send proposal Block parts? if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) { - if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { + if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(rng); ok { part := rs.ProposalBlockParts.GetPart(index) parts, err := part.ToProto() if err != nil { @@ -589,7 +592,7 @@ OUTER_LOOP: // continue the loop since prs is a copy and not effected by this initialization continue OUTER_LOOP } - conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer) + conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer, rng) continue OUTER_LOOP } @@ -644,9 +647,9 @@ OUTER_LOOP: } func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState, - prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer, -) { - if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { + prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer, rng *rand.Rand) { + + if index, ok := prs.ProposalBlockParts.Not().PickRandom(rng); ok { // Ensure that the peer's PartSetHeader is correct blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) if blockMeta == nil { @@ -697,6 +700,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) + rng := cmtrand.NewStdlibRand() // Simple hack to throttle logs upon sleep. sleeping := 0 @@ -723,7 +727,7 @@ OUTER_LOOP: // If height matches, then send LastCommit, Prevotes, Precommits. if rs.Height == prs.Height { heightLogger := logger.With("height", prs.Height) - if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) { + if conR.gossipVotesForHeight(heightLogger, rs, prs, ps, rng) { continue OUTER_LOOP } } @@ -731,7 +735,7 @@ OUTER_LOOP: // Special catchup logic. // If peer is lagging by height 1, send LastCommit. if prs.Height != 0 && rs.Height == prs.Height+1 { - if ps.PickSendVote(rs.LastCommit) { + if ps.PickSendVote(rng, rs.LastCommit) { logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) continue OUTER_LOOP } @@ -762,7 +766,7 @@ OUTER_LOOP: if ec == nil { continue } - if ps.PickSendVote(ec) { + if ps.PickSendVote(rng, ec) { logger.Debug("Picked Catchup commit to send", "height", prs.Height) continue OUTER_LOOP } @@ -771,9 +775,9 @@ OUTER_LOOP: if sleeping == 0 { // We sent nothing. Sleep... sleeping = 1 - logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height, - "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, - "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits) + // logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height, + // "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, + // "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits) } else if sleeping == 2 { // Continued sleep... sleeping = 1 @@ -789,10 +793,11 @@ func (conR *Reactor) gossipVotesForHeight( rs *cstypes.RoundState, prs *cstypes.PeerRoundState, ps *PeerState, + rng *rand.Rand, ) bool { // If there are lastCommits to send... if prs.Step == cstypes.RoundStepNewHeight { - if ps.PickSendVote(rs.LastCommit) { + if ps.PickSendVote(rng, rs.LastCommit) { logger.Debug("Picked rs.LastCommit to send") return true } @@ -800,7 +805,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are POL prevotes to send... if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { + if ps.PickSendVote(rng, polPrevotes) { logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) return true @@ -809,21 +814,21 @@ func (conR *Reactor) gossipVotesForHeight( } // If there are prevotes to send... if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) { logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) return true } } // If there are precommits to send... if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { + if ps.PickSendVote(rng, rs.Votes.Precommits(prs.Round)) { logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round) return true } } // If there are prevotes to send...Needed because of validBlock mechanism if prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) { logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) return true } @@ -831,7 +836,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are POLPrevotes to send... if prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { + if ps.PickSendVote(rng, polPrevotes) { logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) return true @@ -1144,8 +1149,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in // PickSendVote picks a vote and sends it to the peer. // Returns true if vote was sent. -func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { - if vote, ok := ps.PickVoteToSend(votes); ok { +func (ps *PeerState) PickSendVote(rng *rand.Rand, votes types.VoteSetReader) bool { + if vote, ok := ps.PickVoteToSend(rng, votes); ok { ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) if ps.peer.Send(p2p.Envelope{ ChannelID: VoteChannel, @@ -1164,7 +1169,7 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { // PickVoteToSend picks a vote to send to the peer. // Returns true if a vote was picked. // NOTE: `votes` must be the correct Size() for the Height(). -func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) { +func (ps *PeerState) PickVoteToSend(rng *rand.Rand, votes types.VoteSetReader) (vote *types.Vote, ok bool) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1184,7 +1189,7 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote if psVotes == nil { return nil, false // Not something worth sending } - if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok { + if index, ok := votes.BitArray().Sub(psVotes).PickRandom(rng); ok { return votes.GetByIndex(int32(index)), true } return nil, false diff --git a/libs/bits/bit_array.go b/libs/bits/bit_array.go index f9744f9c7b..e186b05b8d 100644 --- a/libs/bits/bit_array.go +++ b/libs/bits/bit_array.go @@ -4,12 +4,12 @@ import ( "encoding/binary" "fmt" "math/bits" + "math/rand" "regexp" "strings" "sync" cmtmath "github.com/cometbft/cometbft/libs/math" - cmtrand "github.com/cometbft/cometbft/libs/rand" cmtprotobits "github.com/cometbft/cometbft/proto/tendermint/libs/bits" ) @@ -261,8 +261,8 @@ func (bA *BitArray) IsFull() bool { // PickRandom returns a random index for a set bit in the bit array. // If there is no such value, it returns 0, false. -// It uses the global randomness in `random.go` to get this index. -func (bA *BitArray) PickRandom() (int, bool) { +// It uses the provided randomness to get this index. +func (bA *BitArray) PickRandom(r *rand.Rand) (int, bool) { if bA == nil { return 0, false } @@ -273,7 +273,7 @@ func (bA *BitArray) PickRandom() (int, bool) { bA.mtx.Unlock() return 0, false } - index := bA.getNthTrueIndex(cmtrand.Intn(numTrueIndices)) + index := bA.getNthTrueIndex(r.Intn(numTrueIndices)) bA.mtx.Unlock() if index == -1 { return 0, false diff --git a/libs/bits/bit_array_test.go b/libs/bits/bit_array_test.go index 0f7351f346..dc36f4f8c8 100644 --- a/libs/bits/bit_array_test.go +++ b/libs/bits/bit_array_test.go @@ -4,7 +4,9 @@ import ( "bytes" "encoding/json" "fmt" + "math/rand" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -17,6 +19,7 @@ var ( empty64Bits = empty16Bits + empty16Bits + empty16Bits + empty16Bits full16bits = "xxxxxxxxxxxxxxxx" full64bits = full16bits + full16bits + full16bits + full16bits + grand = rand.New(rand.NewSource(time.Now().UnixNano())) ) func randBitArray(bits int) *BitArray { @@ -139,7 +142,7 @@ func TestPickRandom(t *testing.T) { var bitArr *BitArray err := json.Unmarshal([]byte(tc.bA), &bitArr) require.NoError(t, err) - _, ok := bitArr.PickRandom() + _, ok := bitArr.PickRandom(grand) require.Equal(t, tc.ok, ok, "PickRandom got an unexpected result on input %s", tc.bA) } } @@ -394,6 +397,6 @@ func BenchmarkPickRandomBitArray(b *testing.B) { require.NoError(b, err) b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = bitArr.PickRandom() + _, _ = bitArr.PickRandom(grand) } } diff --git a/libs/rand/random.go b/libs/rand/random.go index 053e03e15e..f89a983be7 100644 --- a/libs/rand/random.go +++ b/libs/rand/random.go @@ -37,14 +37,28 @@ func NewRand() *Rand { return rand } -func (r *Rand) init() { +// Make a new stdlib rand source. Its up to the caller to ensure +// that the rand source is not called in parallel. +// The failure mode of calling the returned rand multiple times in parallel is +// repeated values across threads. +func NewStdlibRand() *mrand.Rand { + // G404: Use of weak random number generator (math/rand instead of crypto/rand) + //nolint:gosec + return mrand.New(mrand.NewSource(newSeed())) +} + +func newSeed() int64 { bz := cRandBytes(8) var seed uint64 for i := 0; i < 8; i++ { seed |= uint64(bz[i]) seed <<= 8 } - r.reset(int64(seed)) + return int64(seed) +} + +func (r *Rand) init() { + r.reset(newSeed()) } func (r *Rand) reset(seed int64) { @@ -302,6 +316,8 @@ func (r *Rand) Perm(n int) []int { // NOTE: This relies on the os's random number generator. // For real security, we should salt that with some seed. // See github.com/cometbft/cometbft/crypto for a more secure reader. +// This function is thread safe, see: +// https://stackoverflow.com/questions/75685374/is-golang-crypto-rand-thread-safe func cRandBytes(numBytes int) []byte { b := make([]byte, numBytes) _, err := crand.Read(b) diff --git a/libs/rand/random_test.go b/libs/rand/random_test.go index ec4aa32718..de6405bb8c 100644 --- a/libs/rand/random_test.go +++ b/libs/rand/random_test.go @@ -83,6 +83,24 @@ func TestRngConcurrencySafety(_ *testing.T) { wg.Wait() } +// Makes a new stdlib random instance 100 times concurrently. +// Ensures that it is concurrent safe to create rand instances, and call independent rand +// sources in parallel. +func TestStdlibRngConcurrencySafety(_ *testing.T) { + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + r := NewStdlibRand() + _ = r.Uint64() + <-time.After(time.Millisecond * time.Duration(Intn(100))) + _ = r.Perm(3) + }() + } + wg.Wait() +} + func BenchmarkRandBytes10B(b *testing.B) { benchmarkRandBytes(b, 10) }