Skip to content

Commit

Permalink
Remove subnet filter from Peer.TrackedSubnets() (#2975)
Browse files (browse the repository at this point in the history)
  • Loading branch information
StephenButtolph authored May 24, 2024
1 parent 54c4b53 commit 15ac8cd
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 210 deletions.
29 changes: 18 additions & 11 deletions network/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/peer"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/constants"
"github.com/ava-labs/avalanchego/utils/set"
)

type metrics struct {
// trackedSubnets does not include the primary network ID
trackedSubnets set.Set[ids.ID]

numTracked prometheus.Gauge
numPeers prometheus.Gauge
numSubnetPeers *prometheus.GaugeVec
Expand All @@ -41,8 +43,13 @@ type metrics struct {
peerConnectedStartTimesSum float64
}

func newMetrics(namespace string, registerer prometheus.Registerer, initialSubnetIDs set.Set[ids.ID]) (*metrics, error) {
func newMetrics(
namespace string,
registerer prometheus.Registerer,
trackedSubnets set.Set[ids.ID],
) (*metrics, error) {
m := &metrics{
trackedSubnets: trackedSubnets,
numPeers: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Name: "peers",
Expand Down Expand Up @@ -169,11 +176,7 @@ func newMetrics(namespace string, registerer prometheus.Registerer, initialSubne
)

// Initialize the numSubnetPeers gauge for every tracked subnet
for subnetID := range initialSubnetIDs {
// no need to track primary network ID
if subnetID == constants.PrimaryNetworkID {
continue
}
for subnetID := range trackedSubnets {
// initialize to 0
subnetIDStr := subnetID.String()
m.numSubnetPeers.WithLabelValues(subnetIDStr).Set(0)
Expand All @@ -189,8 +192,10 @@ func (m *metrics) markConnected(peer peer.Peer) {
m.connected.Inc()

trackedSubnets := peer.TrackedSubnets()
for subnetID := range trackedSubnets {
m.numSubnetPeers.WithLabelValues(subnetID.String()).Inc()
for subnetID := range m.trackedSubnets {
if trackedSubnets.Contains(subnetID) {
m.numSubnetPeers.WithLabelValues(subnetID.String()).Inc()
}
}

m.lock.Lock()
Expand All @@ -206,8 +211,10 @@ func (m *metrics) markDisconnected(peer peer.Peer) {
m.disconnected.Inc()

trackedSubnets := peer.TrackedSubnets()
for subnetID := range trackedSubnets {
m.numSubnetPeers.WithLabelValues(subnetID.String()).Dec()
for subnetID := range m.trackedSubnets {
if trackedSubnets.Contains(subnetID) {
m.numSubnetPeers.WithLabelValues(subnetID.String()).Dec()
}
}

m.lock.Lock()
Expand Down
14 changes: 8 additions & 6 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,12 @@ func (n *network) Connected(nodeID ids.NodeID) {

peerVersion := peer.Version()
n.router.Connected(nodeID, peerVersion, constants.PrimaryNetworkID)
for subnetID := range peer.TrackedSubnets() {
n.router.Connected(nodeID, peerVersion, subnetID)

trackedSubnets := peer.TrackedSubnets()
for subnetID := range n.peerConfig.MySubnets {
if trackedSubnets.Contains(subnetID) {
n.router.Connected(nodeID, peerVersion, subnetID)
}
}
}

Expand Down Expand Up @@ -694,8 +698,7 @@ func (n *network) getPeers(
continue
}

trackedSubnets := peer.TrackedSubnets()
if subnetID != constants.PrimaryNetworkID && !trackedSubnets.Contains(subnetID) {
if trackedSubnets := peer.TrackedSubnets(); !trackedSubnets.Contains(subnetID) {
continue
}

Expand Down Expand Up @@ -731,8 +734,7 @@ func (n *network) samplePeers(
numValidatorsToSample+config.NonValidators+config.Peers,
func(p peer.Peer) bool {
// Only return peers that are tracking [subnetID]
trackedSubnets := p.TrackedSubnets()
if subnetID != constants.PrimaryNetworkID && !trackedSubnets.Contains(subnetID) {
if trackedSubnets := p.TrackedSubnets(); !trackedSubnets.Contains(subnetID) {
return false
}

Expand Down
15 changes: 8 additions & 7 deletions network/peer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ type Config struct {
Network Network
Router router.InboundHandler
VersionCompatibility version.Compatibility
MySubnets set.Set[ids.ID]
Beacons validators.Manager
Validators validators.Manager
NetworkID uint32
PingFrequency time.Duration
PongTimeout time.Duration
MaxClockDifference time.Duration
// MySubnets does not include the primary network ID
MySubnets set.Set[ids.ID]
Beacons validators.Manager
Validators validators.Manager
NetworkID uint32
PingFrequency time.Duration
PongTimeout time.Duration
MaxClockDifference time.Duration

SupportedACPs []uint32
ObjectedACPs []uint32
Expand Down
37 changes: 26 additions & 11 deletions network/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
// maxBloomSaltLen restricts the allowed size of the bloom salt to prevent
// excessively expensive bloom filter contains checks.
maxBloomSaltLen = 32
// maxNumTrackedSubnets limits how many subnets a peer can track to prevent
// excessive memory usage.
maxNumTrackedSubnets = 16

disconnectingLog = "disconnecting from peer"
failedToCreateMessageLog = "failed to create message"
Expand Down Expand Up @@ -139,8 +142,8 @@ type peer struct {
// version is the claimed version the peer is running that we received in
// the Handshake message.
version *version.Application
// trackedSubnets is the subset of subnetIDs the peer sent us in the Handshake
// message that we are also tracking.
// trackedSubnets are the subnetIDs the peer sent us in the Handshake
// message. The primary network ID is always included.
trackedSubnets set.Set[ids.ID]
// options of ACPs provided in the Handshake message.
supportedACPs set.Set[uint32]
Expand Down Expand Up @@ -271,9 +274,8 @@ func (p *peer) Info() Info {
publicIPStr = p.ip.IPPort.String()
}

uptimes := make(map[ids.ID]json.Uint32, p.trackedSubnets.Len())

for subnetID := range p.trackedSubnets {
uptimes := make(map[ids.ID]json.Uint32, p.MySubnets.Len())
for subnetID := range p.MySubnets {
uptime, exist := p.ObservedUptime(subnetID)
if !exist {
continue
Expand Down Expand Up @@ -851,8 +853,12 @@ func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) {
primaryUptime = 0
}

subnetUptimes := make([]*p2p.SubnetUptime, 0, p.trackedSubnets.Len())
for subnetID := range p.trackedSubnets {
subnetUptimes := make([]*p2p.SubnetUptime, 0, p.MySubnets.Len())
for subnetID := range p.MySubnets {
if !p.trackedSubnets.Contains(subnetID) {
continue
}

subnetUptime, err := p.UptimeCalculator.CalculateUptimePercent(p.id, subnetID)
if err != nil {
p.Log.Debug(failedToGetUptimeLog,
Expand Down Expand Up @@ -951,6 +957,18 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) {
}

// handle subnet IDs
if numTrackedSubnets := len(msg.TrackedSubnets); numTrackedSubnets > maxNumTrackedSubnets {
p.Log.Debug(malformedMessageLog,
zap.Stringer("nodeID", p.id),
zap.Stringer("messageOp", message.HandshakeOp),
zap.String("field", "trackedSubnets"),
zap.Int("numTrackedSubnets", numTrackedSubnets),
)
p.StartClose()
return
}

p.trackedSubnets.Add(constants.PrimaryNetworkID)
for _, subnetIDBytes := range msg.TrackedSubnets {
subnetID, err := ids.ToID(subnetIDBytes)
if err != nil {
Expand All @@ -963,10 +981,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) {
p.StartClose()
return
}
// add only if we also track this subnet
if p.MySubnets.Contains(subnetID) {
p.trackedSubnets.Add(subnetID)
}
p.trackedSubnets.Add(subnetID)
}

for _, acp := range msg.SupportedAcps {
Expand Down
Loading

0 comments on commit 15ac8cd

Please sign in to comment.