Skip to content

Commit

Permalink
perf: Make every gossip thread use its own randomness instance, reduc…
Browse files Browse the repository at this point in the history
…… (backport #77) (#82)

* perf: Make every gossip thread use its own randomness instance, reducing contention (cometbft#3006)

Closes cometbft#3005

As noted in that issue, we currently incur extra CPU overhead and
mutex contention every time we get a random number. This PR removes this
overhead by giving every performance-sensitive thread its own rand
instance.

In a subsequent PR, we can clean up all the testing usages, and likely
just delete our internal randomness package entirely. I didn't do that
in this PR to keep it straightforward to verify.

---

- [x] Tests written/updated
- [ ] Changelog entry added in `.changelog` (we use
[unclog](https://github.com/informalsystems/unclog) to manage our
changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code
comments
- [ ] Title follows the [Conventional
Commits](https://www.conventionalcommits.org/en/v1.0.0/) spec

(cherry picked from commit f55b9f4)

* Add Changelog

(cherry picked from commit ce04f04)

* Fix changelog further

(cherry picked from commit bd34ce6)

---------

Co-authored-by: Dev Ojha <ValarDragon@users.noreply.github.com>
Co-authored-by: Dev Ojha <dojha@berkeley.edu>
Co-authored-by: PaddyMc <paddymchale@hotmail.com>
  • Loading branch information
4 people committed Aug 19, 2024
1 parent 4e0fc75 commit 7a7fce2
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- [`consensus`] Use an independent rng for gossip threads, reducing mutex contention.
([\#3005](https://github.com/cometbft/cometbft/issues/3005))
4 changes: 0 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
# CHANGELOG

## v0.38.11

*August 12, 2024*

This release fixes a panic in consensus where CometBFT would previously panic
if there's no extension signature in non-nil Precommit EVEN IF vote extensions
themselves are disabled.
Expand Down
47 changes: 26 additions & 21 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package consensus
import (
"errors"
"fmt"
"math/rand"
"reflect"
"sync"
"time"
Expand All @@ -12,6 +13,7 @@ import (
cmtevents "github.com/cometbft/cometbft/libs/events"
cmtjson "github.com/cometbft/cometbft/libs/json"
"github.com/cometbft/cometbft/libs/log"
cmtrand "github.com/cometbft/cometbft/libs/rand"
cmtsync "github.com/cometbft/cometbft/libs/sync"
"github.com/cometbft/cometbft/p2p"
cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus"
Expand Down Expand Up @@ -538,6 +540,7 @@ func (conR *Reactor) getRoundState() *cstypes.RoundState {

func (conR *Reactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
rng := cmtrand.NewStdlibRand()

OUTER_LOOP:
for {
Expand All @@ -550,7 +553,7 @@ OUTER_LOOP:

// Send proposal Block parts?
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartSetHeader) {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(rng); ok {
part := rs.ProposalBlockParts.GetPart(index)
parts, err := part.ToProto()
if err != nil {
Expand Down Expand Up @@ -589,7 +592,7 @@ OUTER_LOOP:
// continue the loop since prs is a copy and not effected by this initialization
continue OUTER_LOOP
}
conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer, rng)
continue OUTER_LOOP
}

Expand Down Expand Up @@ -644,9 +647,9 @@ OUTER_LOOP:
}

func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundState,
prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer,
) {
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
prs *cstypes.PeerRoundState, ps *PeerState, peer p2p.Peer, rng *rand.Rand) {

if index, ok := prs.ProposalBlockParts.Not().PickRandom(rng); ok {
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
Expand Down Expand Up @@ -697,6 +700,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt

func (conR *Reactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
rng := cmtrand.NewStdlibRand()

// Simple hack to throttle logs upon sleep.
sleeping := 0
Expand All @@ -723,15 +727,15 @@ OUTER_LOOP:
// If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height {
heightLogger := logger.With("height", prs.Height)
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
if conR.gossipVotesForHeight(heightLogger, rs, prs, ps, rng) {
continue OUTER_LOOP
}
}

// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ps.PickSendVote(rs.LastCommit) {
if ps.PickSendVote(rng, rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
}
Expand Down Expand Up @@ -762,7 +766,7 @@ OUTER_LOOP:
if ec == nil {
continue
}
if ps.PickSendVote(ec) {
if ps.PickSendVote(rng, ec) {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP
}
Expand All @@ -771,9 +775,9 @@ OUTER_LOOP:
if sleeping == 0 {
// We sent nothing. Sleep...
sleeping = 1
logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
// logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
// "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
// "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
} else if sleeping == 2 {
// Continued sleep...
sleeping = 1
Expand All @@ -789,18 +793,19 @@ func (conR *Reactor) gossipVotesForHeight(
rs *cstypes.RoundState,
prs *cstypes.PeerRoundState,
ps *PeerState,
rng *rand.Rand,
) bool {
// If there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
if ps.PickSendVote(rs.LastCommit) {
if ps.PickSendVote(rng, rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send")
return true
}
}
// If there are POL prevotes to send...
if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
if ps.PickSendVote(rng, polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
Expand All @@ -809,29 +814,29 @@ func (conR *Reactor) gossipVotesForHeight(
}
// If there are prevotes to send...
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are precommits to send...
if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Precommits(prs.Round)) {
logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are prevotes to send...Needed because of validBlock mechanism
if prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
if ps.PickSendVote(rng, rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
if ps.PickSendVote(rng, polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
Expand Down Expand Up @@ -1144,8 +1149,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in

// PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok {
func (ps *PeerState) PickSendVote(rng *rand.Rand, votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(rng, votes); ok {
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if ps.peer.Send(p2p.Envelope{
ChannelID: VoteChannel,
Expand All @@ -1164,7 +1169,7 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
// PickVoteToSend picks a vote to send to the peer.
// Returns true if a vote was picked.
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
func (ps *PeerState) PickVoteToSend(rng *rand.Rand, votes types.VoteSetReader) (vote *types.Vote, ok bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()

Expand All @@ -1184,7 +1189,7 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote
if psVotes == nil {
return nil, false // Not something worth sending
}
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(rng); ok {
return votes.GetByIndex(int32(index)), true
}
return nil, false
Expand Down
8 changes: 4 additions & 4 deletions libs/bits/bit_array.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"encoding/binary"
"fmt"
"math/bits"
"math/rand"
"regexp"
"strings"
"sync"

cmtmath "github.com/cometbft/cometbft/libs/math"
cmtrand "github.com/cometbft/cometbft/libs/rand"
cmtprotobits "github.com/cometbft/cometbft/proto/tendermint/libs/bits"
)

Expand Down Expand Up @@ -261,8 +261,8 @@ func (bA *BitArray) IsFull() bool {

// PickRandom returns a random index for a set bit in the bit array.
// If there is no such value, it returns 0, false.
// It uses the global randomness in `random.go` to get this index.
func (bA *BitArray) PickRandom() (int, bool) {
// It uses the provided randomness to get this index.
func (bA *BitArray) PickRandom(r *rand.Rand) (int, bool) {
if bA == nil {
return 0, false
}
Expand All @@ -273,7 +273,7 @@ func (bA *BitArray) PickRandom() (int, bool) {
bA.mtx.Unlock()
return 0, false
}
index := bA.getNthTrueIndex(cmtrand.Intn(numTrueIndices))
index := bA.getNthTrueIndex(r.Intn(numTrueIndices))
bA.mtx.Unlock()
if index == -1 {
return 0, false
Expand Down
7 changes: 5 additions & 2 deletions libs/bits/bit_array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bytes"
"encoding/json"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -17,6 +19,7 @@ var (
empty64Bits = empty16Bits + empty16Bits + empty16Bits + empty16Bits
full16bits = "xxxxxxxxxxxxxxxx"
full64bits = full16bits + full16bits + full16bits + full16bits
grand = rand.New(rand.NewSource(time.Now().UnixNano()))
)

func randBitArray(bits int) *BitArray {
Expand Down Expand Up @@ -139,7 +142,7 @@ func TestPickRandom(t *testing.T) {
var bitArr *BitArray
err := json.Unmarshal([]byte(tc.bA), &bitArr)
require.NoError(t, err)
_, ok := bitArr.PickRandom()
_, ok := bitArr.PickRandom(grand)
require.Equal(t, tc.ok, ok, "PickRandom got an unexpected result on input %s", tc.bA)
}
}
Expand Down Expand Up @@ -394,6 +397,6 @@ func BenchmarkPickRandomBitArray(b *testing.B) {
require.NoError(b, err)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, _ = bitArr.PickRandom()
_, _ = bitArr.PickRandom(grand)
}
}
20 changes: 18 additions & 2 deletions libs/rand/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,28 @@ func NewRand() *Rand {
return rand
}

func (r *Rand) init() {
// NewStdlibRand returns a new standard-library math/rand generator whose
// source is seeded with a fresh value from newSeed.
//
// The returned generator is not synchronized: it is up to the caller to
// ensure that it is never used from more than one goroutine at a time.
// The failure mode of calling the returned rand in parallel is repeated
// values across threads.
func NewStdlibRand() *mrand.Rand {
// G404: Use of weak random number generator (math/rand instead of crypto/rand)
//nolint:gosec
return mrand.New(mrand.NewSource(newSeed()))
}

// newSeed builds a 64-bit seed from 8 bytes obtained from cRandBytes.
//
// The bytes are folded most-significant first: the accumulator is shifted
// left by one byte *before* the next byte is OR-ed in, so every one of the
// 8 bytes ends up in the result. (OR-ing first and shifting afterwards
// pushes the first byte out of the 64-bit value on the final iteration and
// leaves the low 8 bits permanently zero, discarding a byte of entropy.)
//
// The unsigned accumulator is reinterpreted as int64, so the returned seed
// may be negative.
func newSeed() int64 {
	bz := cRandBytes(8)
	var seed uint64
	for _, b := range bz {
		seed = seed<<8 | uint64(b)
	}
	return int64(seed)
}

// init seeds r by resetting it with a freshly generated value from newSeed.
func (r *Rand) init() {
	seed := newSeed()
	r.reset(seed)
}

func (r *Rand) reset(seed int64) {
Expand Down Expand Up @@ -302,6 +316,8 @@ func (r *Rand) Perm(n int) []int {
// NOTE: This relies on the os's random number generator.
// For real security, we should salt that with some seed.
// See github.com/cometbft/cometbft/crypto for a more secure reader.
// This function is thread safe, see:
// https://stackoverflow.com/questions/75685374/is-golang-crypto-rand-thread-safe
func cRandBytes(numBytes int) []byte {
b := make([]byte, numBytes)
_, err := crand.Read(b)
Expand Down
18 changes: 18 additions & 0 deletions libs/rand/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@ func TestRngConcurrencySafety(_ *testing.T) {
wg.Wait()
}

// TestStdlibRngConcurrencySafety starts 100 goroutines that each create
// their own generator via NewStdlibRand and draw from it twice, pausing for
// Intn(100) milliseconds between the two draws.
//
// The test makes no assertions; it exercises concurrent construction of
// rand instances and concurrent use of independent instances, and passes
// as long as every goroutine finishes.
func TestStdlibRngConcurrencySafety(_ *testing.T) {
	const workers = 100

	var wg sync.WaitGroup
	wg.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer wg.Done()

			rng := NewStdlibRand()
			_ = rng.Uint64()

			// Stagger the second draw so goroutine lifetimes overlap.
			pause := time.Millisecond * time.Duration(Intn(100))
			time.Sleep(pause)

			_ = rng.Perm(3)
		}()
	}
	wg.Wait()
}

func BenchmarkRandBytes10B(b *testing.B) {
benchmarkRandBytes(b, 10)
}
Expand Down

0 comments on commit 7a7fce2

Please sign in to comment.