feat: tracks block part size in consensus block parts table #1067

Closed
wants to merge 5 commits into from
15 changes: 13 additions & 2 deletions consensus/reactor.go
@@ -338,7 +338,16 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, int(msg.Part.Index))
conR.Metrics.BlockParts.With("peer_id", string(e.Src.ID())).Add(1)
// The .ToProto conversion is only needed to get the part size.
// This is consistent with how it is measured in the gossipDataRoutine.
part, err := msg.Part.ToProto()
partSize := -1
if err == nil {
partSize = part.Size()
}
Comment on lines +344 to +347
Collaborator:

[not blocking] Does this intentionally avoid checking for an error?

@staheri14 (Contributor, Author) Aug 25, 2023:

Yes, it is intentional, because the earlier conversion (.ToProto()) is merely for measuring the size and tracing it in InfluxDB, which is not a critical part of the consensus operation. Nevertheless, no error is expected here.

Collaborator:

> Yes, it is intentional, because the earlier conversion (.ToProto()) is merely for measuring the size and tracing it in InfluxDB, which is not a critical part of the consensus operation.

Makes sense! Since it is not critical to the consensus operation, this could log the error instead of propagating it, though that feels kinda hacky:

part, err := msg.Part.ToProto()
if err != nil {
    // log the error
} else {
    schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round, e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload, int64(part.Size()))
}

I'm not familiar with this code so defer to what you already have.
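
Filled in, that suggestion might look like the sketch below. The log message is illustrative rather than taken from the PR; conR.Logger is the reactor logger already used elsewhere in this file:

part, err := msg.Part.ToProto()
if err != nil {
    // Tracing is best-effort: log the failure and keep processing
    // the message rather than failing the consensus path.
    conR.Logger.Error("failed to convert block part to proto for tracing", "err", err)
} else {
    schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round,
        e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload,
        int64(part.Size()))
}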

schema.WriteBlockPart(conR.traceClient, msg.Height, msg.Round,
e.Src.ID(), msg.Part.Index, schema.TransferTypeDownload,
int64(partSize))
conR.conS.peerMsgQueue <- msgInfo{msg, e.Src.ID()}
default:
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
@@ -595,7 +604,9 @@ OUTER_LOOP:
Part: *parts,
},
}, logger) {
schema.WriteBlockPart(conR.traceClient, rs.Height,
rs.Round, peer.ID(), part.Index,
schema.TransferTypeUpload, int64(parts.Size()))
Contributor (Author):

To the reviewers: an alternative approach could use a helper function to measure the part size, but the Size() method on the protobuf Part message type already accomplishes the same task.
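
For illustration, such a helper would only wrap the proto conversion and the generated Size() method; the function name here is hypothetical, and the PR calls Size() directly instead:

// blockPartSize returns the serialized size of a block part in bytes,
// or -1 (the same sentinel the reactor uses) if the protobuf
// conversion fails.
func blockPartSize(part *types.Part) int64 {
    pb, err := part.ToProto()
    if err != nil {
        return -1
    }
    return int64(pb.Size())
}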

ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
17 changes: 13 additions & 4 deletions pkg/trace/schema/consensus.go
@@ -55,20 +55,28 @@ const (
// BlockPartIndexFieldKey is the name of the field that stores the block
// part index.
BlockPartIndexFieldKey = "index"

// BlockPartSizeFieldKey is the name of the field that stores the size of a block
// part in bytes. The value is an int64.
// A negative value indicates an unknown block part size.
BlockPartSizeFieldKey = "block_part_size"
)

// WriteBlockPart writes a tracing point for a BlockPart using the predetermined
// schema for consensus state tracing. This is used to create a table in the
// following schema:
//
// | time | height | round | peer | index | transfer type | block_part_size |
//
// A negative value for `size` (i.e., block_part_size) indicates an unknown
// block part size.
func WriteBlockPart(
client *trace.Client,
height int64,
round int32,
peer p2p.ID,
index uint32,
transferType string,
size int64,
) {
// this check is redundant to what is checked during WritePoint, although it
// is an optimization to avoid allocations from the map of fields.
@@ -78,17 +86,18 @@ func WriteBlockPart(
client.WritePoint(BlockPartsTable, map[string]interface{}{
HeightFieldKey: height,
RoundFieldKey: round,
PeerFieldKey: peer,
BlockPartIndexFieldKey: index,
TransferTypeFieldKey: transferType,
BlockPartSizeFieldKey: size,
})
}
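
As a usage sketch, a caller records one row per traced part; the client, peer ID, and values below are placeholders, and a negative size marks an unmeasured part:

// Records a 1024-byte download of part index 3 at height 10, round 0.
schema.WriteBlockPart(traceClient, 10, 0, p2p.ID("..."),
    3, schema.TransferTypeDownload, 1024)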

const (
// BlockTable is the name of the table that stores metadata about consensus
// blocks. It is used to create a table in the following schema:
//
// | time | height | unix_millisecond_timestamp | tx_count | square_size | block_size | proposer | last_commit_round |
BlockTable = "consensus_block"

// UnixMillisecondTimestampFieldKey is the name of the field that stores the timestamp in
@@ -101,7 +110,7 @@

// SquareSizeFieldKey is the name of the field that stores the square size
// of the block. SquareSize is the number of shares in a single row or
// column of the original data square.
SquareSizeFieldKey = "square_size"

// BlockSizeFieldKey is the name of the field that stores the size of