Skip to content

Commit

Permalink
feat!: traces gossiped votes in the InfluxDB (#1204)
Browse files Browse the repository at this point in the history
Closes #1200
  • Loading branch information
staheri14 authored Feb 2, 2024
1 parent de08d3a commit 40ee792
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 15 deletions.
47 changes: 33 additions & 14 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,12 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
case *VoteMessage:
cs := conR.conS
cs.mtx.RLock()
height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size()
height, round, valSize, lastCommitSize := cs.Height, cs.Round,
cs.Validators.Size(), cs.LastCommit.Size()
cs.mtx.RUnlock()

schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.TransferTypeDownload)

ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
Expand Down Expand Up @@ -763,7 +767,7 @@ OUTER_LOOP:
// Special catchup logic.
// If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && rs.Height == prs.Height+1 {
if ps.PickSendVote(rs.LastCommit) {
if conR.pickSendVoteAndTrace(rs.LastCommit, rs, ps) {
logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP
}
Expand All @@ -776,8 +780,11 @@ OUTER_LOOP:
// Load the block commit for prs.Height,
// which contains precommit signatures for prs.Height.
if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil {
if ps.PickSendVote(commit) {
vote := ps.PickSendVote(commit)
if vote != nil {
logger.Debug("Picked Catchup commit to send", "height", prs.Height)
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
ps.peer.ID(), schema.TransferTypeUpload)
continue OUTER_LOOP
}
}
Expand All @@ -799,6 +806,18 @@ OUTER_LOOP:
}
}

// pickSendVoteAndTrace picks a vote to send and traces it.
// It returns true if a vote is sent.
// Note that it is a wrapper around PickSendVote with the addition of tracing the vote.
func (conR *Reactor) pickSendVoteAndTrace(votes types.VoteSetReader, rs *cstypes.RoundState, ps *PeerState) bool {
vote := ps.PickSendVote(votes)
if vote != nil { // if a vote is sent, trace it
schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote,
ps.peer.ID(), schema.TransferTypeUpload)
return true
}
return false
}
func (conR *Reactor) gossipVotesForHeight(
logger log.Logger,
rs *cstypes.RoundState,
Expand All @@ -808,15 +827,15 @@ func (conR *Reactor) gossipVotesForHeight(

// If there are lastCommits to send...
if prs.Step == cstypes.RoundStepNewHeight {
if ps.PickSendVote(rs.LastCommit) {
if conR.pickSendVoteAndTrace(rs.LastCommit, rs, ps) {
logger.Debug("Picked rs.LastCommit to send")
return true
}
}
// If there are POL prevotes to send...
if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
if conR.pickSendVoteAndTrace(polPrevotes, rs, ps) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
Expand All @@ -825,29 +844,29 @@ func (conR *Reactor) gossipVotesForHeight(
}
// If there are prevotes to send...
if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
if conR.pickSendVoteAndTrace(rs.Votes.Prevotes(prs.Round), rs, ps) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are precommits to send...
if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
if conR.pickSendVoteAndTrace(rs.Votes.Precommits(prs.Round), rs, ps) {
logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are prevotes to send...Needed because of validBlock mechanism
if prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
if conR.pickSendVoteAndTrace(rs.Votes.Prevotes(prs.Round), rs, ps) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
if conR.pickSendVoteAndTrace(polPrevotes, rs, ps) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
Expand Down Expand Up @@ -1163,8 +1182,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in
}

// PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
// Returns the vote if vote was sent. Otherwise, returns nil.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) *types.Vote {
if vote, ok := ps.PickVoteToSend(votes); ok {
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
if p2p.SendEnvelopeShim(ps.peer, p2p.Envelope{ //nolint: staticcheck
Expand All @@ -1174,11 +1193,11 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
},
}, ps.logger) {
ps.SetHasVote(vote)
return true
return vote
}
return false
return nil
}
return false
return nil
}

// PickVoteToSend picks a vote to send to the peer.
Expand Down
52 changes: 52 additions & 0 deletions pkg/trace/schema/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func ConsensusTables() []string {
RoundStateTable,
BlockPartsTable,
BlockTable,
VoteTable,
}
}

Expand Down Expand Up @@ -128,3 +129,54 @@ func WriteBlock(client *trace.Client, block *types.Block, size int) {
LastCommitRoundFieldKey: block.LastCommit.Round,
})
}

// Schema constants for the consensus votes tracing database.
const (
// VoteTable is the name of the table that stores the consensus
// voting traces. Follows this schema:
//
// | time | height | round | vote_type | vote_height | vote_round
// | vote_block_id| vote_unix_millisecond_timestamp
// | vote_validator_address | vote_validator_index | peer
// | transfer_type |
VoteTable = "consensus_vote"

VoteTypeFieldKey = "vote_type"
VoteHeightFieldKey = "vote_height"
VoteRoundFieldKey = "vote_round"
VoteBlockIDFieldKey = "vote_block_id"
VoteTimestampFieldKey = "vote_unix_millisecond_timestamp"
ValidatorAddressFieldKey = "vote_validator_address"
ValidatorIndexFieldKey = "vote_validator_index"
)

// WriteVote writes a tracing point for a vote using the predetermined
// schema for consensus vote tracing.
// This is used to create a table in the following
// schema:
//
// | time | height | round | vote_type | vote_height | vote_round
// | vote_block_id| vote_unix_millisecond_timestamp
// | vote_validator_address | vote_validator_index | peer
// | transfer_type |
func WriteVote(client *trace.Client,
height int64, // height of the current peer when it received/sent the vote
round int32, // round of the current peer when it received/sent the vote
vote *types.Vote, // vote received by the current peer
peer p2p.ID, // the peer from which it received the vote or the peer to which it sent the vote
transferType string, // download (received) or upload(sent)
) {
client.WritePoint(VoteTable, map[string]interface{}{
HeightFieldKey: height,
RoundFieldKey: round,
VoteTypeFieldKey: vote.Type.String(),
VoteHeightFieldKey: vote.Height,
VoteRoundFieldKey: vote.Round,
VoteBlockIDFieldKey: vote.BlockID.Hash.String(),
VoteTimestampFieldKey: vote.Timestamp.UnixMilli(),
ValidatorAddressFieldKey: vote.ValidatorAddress.String(),
ValidatorIndexFieldKey: vote.ValidatorIndex,
PeerFieldKey: peer,
TransferTypeFieldKey: transferType,
})
}
3 changes: 2 additions & 1 deletion pkg/trace/schema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
// value.
PeerFieldKey = "peer"

// TransferTypeFieldKey is the tracing field key for the class of a tx.
// TransferTypeFieldKey is the tracing field key for the class of a tx
// and votes.
TransferTypeFieldKey = "transfer_type"

// TransferTypeDownload is a tracing field value for receiving some
Expand Down

0 comments on commit 40ee792

Please sign in to comment.