Skip to content

Commit

Permalink
Use vector in message sender (#2988)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored May 3, 2024
1 parent 7467a40 commit bc4d747
Showing 1 changed file with 52 additions and 43 deletions.
95 changes: 52 additions & 43 deletions snow/networking/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
)

var _ common.Sender = (*sender)(nil)
const opLabel = "op"

var (
_ common.Sender = (*sender)(nil)

opLabels = []string{opLabel}
)

// sender is a wrapper around an ExternalSender.
// Messages to this node are put directly into [router] rather than
Expand All @@ -37,11 +43,11 @@ type sender struct {
router router.Router
timeouts timeout.Manager

// Request message type --> Counts how many of that request
// have failed because the node was benched
failedDueToBench map[message.Op]prometheus.Counter
engineType p2p.EngineType
subnet subnets.Subnet
// Counts how many requests have failed because the node was benched
failedDueToBench *prometheus.CounterVec // op

engineType p2p.EngineType
subnet subnets.Subnet
}

func New(
Expand All @@ -54,40 +60,33 @@ func New(
subnet subnets.Subnet,
) (common.Sender, error) {
s := &sender{
ctx: ctx,
msgCreator: msgCreator,
sender: externalSender,
router: router,
timeouts: timeouts,
failedDueToBench: make(map[message.Op]prometheus.Counter, len(message.ConsensusRequestOps)),
engineType: engineType,
subnet: subnet,
}

for _, op := range message.ConsensusRequestOps {
counter := prometheus.NewCounter(
ctx: ctx,
msgCreator: msgCreator,
sender: externalSender,
router: router,
timeouts: timeouts,
failedDueToBench: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: fmt.Sprintf("%s_failed_benched", op),
Help: fmt.Sprintf("# of times a %s request was not sent because the node was benched", op),
Namespace: "",
Name: "failed_benched",
Help: "requests dropped because a node was benched",
},
)

switch engineType {
case p2p.EngineType_ENGINE_TYPE_SNOWMAN:
if err := ctx.Registerer.Register(counter); err != nil {
return nil, fmt.Errorf("couldn't register metric for %s: %w", op, err)
}
case p2p.EngineType_ENGINE_TYPE_AVALANCHE:
if err := ctx.AvalancheRegisterer.Register(counter); err != nil {
return nil, fmt.Errorf("couldn't register metric for %s: %w", op, err)
}
default:
return nil, fmt.Errorf("unknown engine type %s", engineType)
}

s.failedDueToBench[op] = counter
}
return s, nil
opLabels,
),
engineType: engineType,
subnet: subnet,
}

var reg prometheus.Registerer
switch engineType {
case p2p.EngineType_ENGINE_TYPE_SNOWMAN:
reg = ctx.Registerer
case p2p.EngineType_ENGINE_TYPE_AVALANCHE:
reg = ctx.AvalancheRegisterer
default:
return nil, fmt.Errorf("unknown engine type %s", engineType)
}
return s, reg.Register(s.failedDueToBench)
}

func (s *sender) SendGetStateSummaryFrontier(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32) {
Expand Down Expand Up @@ -675,7 +674,9 @@ func (s *sender) SendGetAncestors(ctx context.Context, nodeID ids.NodeID, reques
// [nodeID] may be benched. That is, they've been unresponsive so we don't
// even bother sending requests to them. We just have them immediately fail.
if s.timeouts.IsBenched(nodeID, s.ctx.ChainID) {
s.failedDueToBench[message.GetAncestorsOp].Inc() // update metric
s.failedDueToBench.With(prometheus.Labels{
opLabel: message.GetAncestorsOp.String(),
}).Inc()
s.timeouts.RegisterRequestToUnreachableValidator()
go s.router.HandleInbound(ctx, inMsg)
return
Expand Down Expand Up @@ -794,7 +795,9 @@ func (s *sender) SendGet(ctx context.Context, nodeID ids.NodeID, requestID uint3
// [nodeID] may be benched. That is, they've been unresponsive so we don't
// even bother sending requests to them. We just have them immediately fail.
if s.timeouts.IsBenched(nodeID, s.ctx.ChainID) {
s.failedDueToBench[message.GetOp].Inc() // update metric
s.failedDueToBench.With(prometheus.Labels{
opLabel: message.GetOp.String(),
}).Inc()
s.timeouts.RegisterRequestToUnreachableValidator()
go s.router.HandleInbound(ctx, inMsg)
return
Expand Down Expand Up @@ -948,7 +951,9 @@ func (s *sender) SendPushQuery(
// immediately fail.
for nodeID := range nodeIDs {
if s.timeouts.IsBenched(nodeID, s.ctx.ChainID) {
s.failedDueToBench[message.PushQueryOp].Inc() // update metric
s.failedDueToBench.With(prometheus.Labels{
opLabel: message.PushQueryOp.String(),
}).Inc()
nodeIDs.Remove(nodeID)
s.timeouts.RegisterRequestToUnreachableValidator()

Expand Down Expand Up @@ -1084,7 +1089,9 @@ func (s *sender) SendPullQuery(
// have them immediately fail.
for nodeID := range nodeIDs {
if s.timeouts.IsBenched(nodeID, s.ctx.ChainID) {
s.failedDueToBench[message.PullQueryOp].Inc() // update metric
s.failedDueToBench.With(prometheus.Labels{
opLabel: message.PullQueryOp.String(),
}).Inc()
nodeIDs.Remove(nodeID)
s.timeouts.RegisterRequestToUnreachableValidator()
// Immediately register a failure. Do so asynchronously to avoid
Expand Down Expand Up @@ -1331,7 +1338,9 @@ func (s *sender) SendAppRequest(ctx context.Context, nodeIDs set.Set[ids.NodeID]
// have them immediately fail.
for nodeID := range nodeIDs {
if s.timeouts.IsBenched(nodeID, s.ctx.ChainID) {
s.failedDueToBench[message.AppRequestOp].Inc() // update metric
s.failedDueToBench.With(prometheus.Labels{
opLabel: message.AppRequestOp.String(),
}).Inc()
nodeIDs.Remove(nodeID)
s.timeouts.RegisterRequestToUnreachableValidator()

Expand Down

0 comments on commit bc4d747

Please sign in to comment.