From 4a71bf1e92fbd6e969eab9821aed35b8096b2389 Mon Sep 17 00:00:00 2001 From: sanaz Date: Fri, 25 Aug 2023 10:45:51 -0700 Subject: [PATCH 1/5] traces block part size --- consensus/reactor.go | 4 +++- pkg/trace/schema/consensus.go | 14 ++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index eb662c6af2..6b4fb0f323 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -595,7 +595,9 @@ OUTER_LOOP: Part: *parts, }, }, logger) { - schema.WriteBlockPart(conR.traceClient, rs.Height, rs.Round, peer.ID(), part.Index, schema.TransferTypeUpload) + schema.WriteBlockPart(conR.traceClient, rs.Height, + rs.Round, peer.ID(), part.Index, + schema.TransferTypeUpload, int64(parts.Size())) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } continue OUTER_LOOP diff --git a/pkg/trace/schema/consensus.go b/pkg/trace/schema/consensus.go index 3fea523853..e76424958e 100644 --- a/pkg/trace/schema/consensus.go +++ b/pkg/trace/schema/consensus.go @@ -55,13 +55,17 @@ const ( // BlockPartIndexFieldKey is the name of the field that stores the block // part BlockPartIndexFieldKey = "index" + + // BlockPartSizeFieldKey is the name of the field that stores the size of a block + // part in bytes. The value is an int64. + BlockPartSizeFieldKey = "block_part_size" ) // WriteBlockPart writes a tracing point for a BlockPart using the predetermined // schema for consensus state tracing. This is used to create a table in the // following schema: // -// | time | height | round | index | peer | transfer type | +// | time | height | round | peer | index | transfer type | block_part_size | func WriteBlockPart( client *trace.Client, height int64, @@ -69,6 +73,7 @@ func WriteBlockPart( peer p2p.ID, index uint32, transferType string, + size int64, ) { // this check is redundant to what is checked during WritePoint, although it // is an optimization to avoid allocations from the map of fields. @@ -78,9 +83,10 @@ func WriteBlockPart( client.WritePoint(BlockPartsTable, map[string]interface{}{ HeightFieldKey: height, RoundFieldKey: round, - BlockPartIndexFieldKey: index, PeerFieldKey: peer, + BlockPartIndexFieldKey: index, TransferTypeFieldKey: transferType, + BlockPartSizeFieldKey: size, }) } @@ -88,7 +94,7 @@ const ( // BlockTable is the name of the table that stores metadata about consensus blocks. // following schema: // - // | time | height | timestamp | + // | time | height | unix_millisecond_timestamp | tx_count | square_size | block_size | proposer | last_commit_round | BlockTable = "consensus_block" // UnixMillisecondTimestampFieldKey is the name of the field that stores the timestamp in @@ -101,7 +107,7 @@ const ( // SquareSizeFieldKey is the name of the field that stores the square size // of the block. SquareSize is the number of shares in a single row or - // column of the origianl data square. + // column of the original data square. SquareSizeFieldKey = "square_size" // BlockSizeFieldKey is the name of the field that stores the size of From 334bc016518584afaaa39b81d54b7862e66b5b3d Mon Sep 17 00:00:00 2001 From: sanaz Date: Fri, 25 Aug 2023 11:21:42 -0700 Subject: [PATCH 2/5] traces block part size when receiving block parts --- consensus/reactor.go | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 6b4fb0f323..b4a26e08bc 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -316,7 +316,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { if votes := ourVotes.ToProto(); votes != nil { eMsg.Votes = *votes } - p2p.TrySendEnvelopeShim(e.Src, p2p.Envelope{ //nolint: staticcheck + p2p.TrySendEnvelopeShim(e.Src, p2p.Envelope{ // nolint: staticcheck ChannelID: VoteSetBitsChannel, Message: eMsg, }, conR.Logger) @@ -338,7 +338,15 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) - schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload) + part, err := msg.Part.ToProto() // this conversion is only needed to get the part size + // consistent with how it is measured in the gossipDataRoutine + partSize := -1 + if err != nil { + partSize = part.Size() + } + schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, + e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload, + int64(partSize)) conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()} default: conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) @@ -540,7 +548,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *cmtcons.NewRoundStep) func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) { rs := conR.getRoundState() nrsMsg := makeRoundStepMessage(rs) - p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: StateChannel, Message: nrsMsg, }, conR.Logger) @@ -587,7 +595,7 @@ OUTER_LOOP: panic(err) } logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) - if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: DataChannel, Message: &cmtcons.BlockPart{ Height: rs.Height, // This tells peer that this part applies to us. @@ -644,7 +652,7 @@ OUTER_LOOP: // Proposal: share the proposal metadata with peer. { logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) - if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: DataChannel, Message: &cmtcons.Proposal{Proposal: *rs.Proposal.ToProto()}, }, logger) { @@ -658,7 +666,7 @@ OUTER_LOOP: // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound). if 0 <= rs.Proposal.POLRound { logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) - p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: DataChannel, Message: &cmtcons.ProposalPOL{ Height: rs.Height, @@ -708,7 +716,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt logger.Error("Could not convert part to proto", "index", index, "error", err) return } - if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: DataChannel, Message: &cmtcons.BlockPart{ Height: prs.Height, // Not our height, so it doesn't matter. @@ -877,7 +885,7 @@ OUTER_LOOP: if rs.Height == prs.Height { if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { - p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -897,7 +905,7 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -918,7 +926,7 @@ OUTER_LOOP: if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -941,7 +949,7 @@ OUTER_LOOP: if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() && prs.Height >= conR.conS.blockStore.Base() { if commit := conR.conS.LoadCommit(prs.Height); commit != nil { - p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck + p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -1168,7 +1176,7 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) - if p2p.SendEnvelopeShim(ps.peer, p2p.Envelope{ //nolint: staticcheck + if p2p.SendEnvelopeShim(ps.peer, p2p.Envelope{ // nolint: staticcheck ChannelID: VoteChannel, Message: &cmtcons.Vote{ Vote: vote.ToProto(), From 6dde431606592ed405ca90ad0158f9494c4f844b Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Fri, 25 Aug 2023 12:22:25 -0700 Subject: [PATCH 3/5] removes leading space in nolint directives --- consensus/reactor.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index b4a26e08bc..513e44b89e 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -316,7 +316,7 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { if votes := ourVotes.ToProto(); votes != nil { eMsg.Votes = *votes } - p2p.TrySendEnvelopeShim(e.Src, p2p.Envelope{ // nolint: staticcheck + p2p.TrySendEnvelopeShim(e.Src, p2p.Envelope{ //nolint: staticcheck ChannelID: VoteSetBitsChannel, Message: eMsg, }, conR.Logger) @@ -548,7 +548,7 @@ func makeRoundStepMessage(rs *cstypes.RoundState) (nrsMsg *cmtcons.NewRoundStep) func (conR *Reactor) sendNewRoundStepMessage(peer p2p.Peer) { rs := conR.getRoundState() nrsMsg := makeRoundStepMessage(rs) - p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: nrsMsg, }, conR.Logger) @@ -595,7 +595,7 @@ OUTER_LOOP: panic(err) } logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) - if p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: DataChannel, Message: &cmtcons.BlockPart{ Height: rs.Height, // This tells peer that this part applies to us. @@ -652,7 +652,7 @@ OUTER_LOOP: // Proposal: share the proposal metadata with peer. { logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) - if p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: DataChannel, Message: &cmtcons.Proposal{Proposal: *rs.Proposal.ToProto()}, }, logger) { @@ -666,7 +666,7 @@ OUTER_LOOP: // so we definitely have rs.Votes.Prevotes(rs.Proposal.POLRound). if 0 <= rs.Proposal.POLRound { logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) - p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: DataChannel, Message: &cmtcons.ProposalPOL{ Height: rs.Height, @@ -716,7 +716,7 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt logger.Error("Could not convert part to proto", "index", index, "error", err) return } - if p2p.SendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + if p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: DataChannel, Message: &cmtcons.BlockPart{ Height: prs.Height, // Not our height, so it doesn't matter. @@ -885,7 +885,7 @@ OUTER_LOOP: if rs.Height == prs.Height { if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { - p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -905,7 +905,7 @@ OUTER_LOOP: prs := ps.GetRoundState() if rs.Height == prs.Height { if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { - p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -926,7 +926,7 @@ OUTER_LOOP: if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { - p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -949,7 +949,7 @@ OUTER_LOOP: if prs.CatchupCommitRound != -1 && prs.Height > 0 && prs.Height <= conR.conS.blockStore.Height() && prs.Height >= conR.conS.blockStore.Base() { if commit := conR.conS.LoadCommit(prs.Height); commit != nil { - p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ // nolint: staticcheck + p2p.TrySendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: StateChannel, Message: &cmtcons.VoteSetMaj23{ Height: prs.Height, @@ -1176,7 +1176,7 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) - if p2p.SendEnvelopeShim(ps.peer, p2p.Envelope{ // nolint: staticcheck + if p2p.SendEnvelopeShim(ps.peer, p2p.Envelope{ //nolint: staticcheck ChannelID: VoteChannel, Message: &cmtcons.Vote{ Vote: vote.ToProto(), From f47fdfe4ace81104627cbef9f40f2b6046e53506 Mon Sep 17 00:00:00 2001 From: sanaz Date: Fri, 25 Aug 2023 12:27:43 -0700 Subject: [PATCH 4/5] clarifies the meaning of negative value for block part size --- pkg/trace/schema/consensus.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/trace/schema/consensus.go b/pkg/trace/schema/consensus.go index e76424958e..dd47254c1d 100644 --- a/pkg/trace/schema/consensus.go +++ b/pkg/trace/schema/consensus.go @@ -58,6 +58,7 @@ const ( // BlockPartSizeFieldKey is the name of the field that stores the size of a block // part in bytes. The value is an int64. + // Negative value indicates unknown block part size. BlockPartSizeFieldKey = "block_part_size" ) @@ -66,6 +67,8 @@ const ( // following schema: // // | time | height | round | peer | index | transfer type | block_part_size | +// Negative value for `size` i.e., block_part_size indicates unknown block part +// size. func WriteBlockPart( client *trace.Client, height int64, From a82c5749254845b0e650304af0c59f31c12a4422 Mon Sep 17 00:00:00 2001 From: Sanaz Taheri <35961250+staheri14@users.noreply.github.com> Date: Fri, 25 Aug 2023 13:40:29 -0700 Subject: [PATCH 5/5] moves the comments a few lines up --- consensus/reactor.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 513e44b89e..323fad4086 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -338,8 +338,9 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { case *BlockPartMessage: ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index)) conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1) - part, err := msg.Part.ToProto() // this conversion is only needed to get the part size - // consistent with how it is measured in the gossipDataRoutine + // The .ToProto conversion is only needed to get the part size. + // This is consistent with how it is measured in the gossipDataRoutine. + part, err := msg.Part.ToProto() partSize := -1 if err != nil { partSize = part.Size()