Skip to content

Commit

Permalink
simplify ensure peers
Browse files Browse the repository at this point in the history
  • Loading branch information
czarcas7ic committed Jun 25, 2024
1 parent a008f3c commit fa7f420
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 128 deletions.
1 change: 1 addition & 0 deletions p2p/pex/addrbook.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func (a *addrBook) GetAddressRegion(addr *p2p.NetAddress) (string, error) {

ka, exists := a.addrLookup[addr.ID]
if !exists || ka.Region == "" {
// If the region is not set, or the address is not in the lookup table, we will attempt to get it from the IP address
region, err := p2p.GetRegionFromIP(addr.IP.String())
a.curRegionQueryCount++
if err != nil {
Expand Down
223 changes: 95 additions & 128 deletions p2p/pex/pex_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,15 +430,14 @@ func (r *Reactor) ensurePeersRoutine() {

// fire once immediately.
// ensures we dial the seeds right away if the book is empty
swConfig := r.Switch.GetConfig()
r.ensurePeers(swConfig.RegionAware)
r.ensurePeers()

// fire periodically
ticker := time.NewTicker(r.ensurePeersPeriod)
for {
select {
case <-ticker.C:
r.ensurePeers(swConfig.RegionAware)
r.ensurePeers()
case <-r.Quit():
ticker.Stop()
return
Expand All @@ -451,7 +450,7 @@ func (r *Reactor) ensurePeersRoutine() {
// heuristic that we haven't perfected yet, or, perhaps is manually edited by
// the node operator. It should not be used to compute what addresses are
// already connected or not.
func (r *Reactor) ensurePeers(regionAware bool) {
func (r *Reactor) ensurePeers() {
var (
out, in, dial = r.Switch.NumPeers()
numToDial = r.Switch.MaxNumOutboundPeers() - (out + dial)
Expand All @@ -472,158 +471,126 @@ func (r *Reactor) ensurePeers(regionAware bool) {
// not perfect, but somewhat ensures that we prioritize connecting to more-vetted
// NOTE: range here is [10, 90]. Too high ?
newBias := cmtmath.MinInt(out, 8)*10 + 10

toDial := make(map[p2p.ID]*p2p.NetAddress)
maxAttempts := numToDial * 3
swConfig := r.Switch.GetConfig()

if regionAware {
r.book.ResetCurRegionQueryCount()
toDialInRegion := make(map[p2p.ID]*p2p.NetAddress)
toDialOutOfRegion := make(map[p2p.ID]*p2p.NetAddress)
var toDial map[p2p.ID]*p2p.NetAddress
if swConfig.RegionAware {
toDial = r.collectAddressesByRegion(newBias, numToDial, maxAttempts)
} else {
toDial = r.collectAddresses(newBias, numToDial, maxAttempts)
}

r.dialAddresses(toDial, numToDial)

swConfig := r.Switch.GetConfig()
if r.book.NeedMoreAddrs() {
// Check if banned nodes can be reinstated
r.book.ReinstateBadPeers()
}

currentOutboundInOtherRegion := r.Switch.CurrentNumOutboundPeersInOtherRegion
maxOutboundPeersInOtherRegion := swConfig.MaxNumOutboundPeers - int(swConfig.PercentPeersInSameRegion*float64(swConfig.MaxNumOutboundPeers))
if r.book.NeedMoreAddrs() {
// If we still need more addresses, request more from peers or dial seeds.
r.requestMoreAddresses()
}
}

numToDialInOtherRegion := maxOutboundPeersInOtherRegion - currentOutboundInOtherRegion
numToDialInSameRegion := numToDial - numToDialInOtherRegion
func (r *Reactor) collectAddresses(newBias, numToDial, maxAttempts int) map[p2p.ID]*p2p.NetAddress {
toDial := make(map[p2p.ID]*p2p.NetAddress)
reserveSize := cmtmath.MaxInt((numToDial+1)/2, 5)

reserveSizeInRegion := cmtmath.MaxInt((numToDialInSameRegion+1)/2, 5)
reserveSizeOutOfRegion := cmtmath.MaxInt((numToDialInOtherRegion+1)/2, 5)
if numToDialInSameRegion == 0 {
reserveSizeInRegion = 0
for i := 0; i < maxAttempts && len(toDial) < numToDial+reserveSize; i++ {
try := r.book.PickAddress(newBias)
if try == nil || r.Switch.IsDialingOrExistingAddress(try) {
continue
}
if numToDialInOtherRegion == 0 {
reserveSizeOutOfRegion = 0
if _, selected := toDial[try.ID]; selected {
continue
}
toDial[try.ID] = try
}
return toDial
}

// First iteration: Dial peers in the same region
for i := 0; i < maxAttempts && len(toDialInRegion) < numToDialInSameRegion+reserveSizeInRegion; i++ {
try := r.book.PickAddressInRegion(newBias, r.Switch.MyRegion)
if try == nil {
continue
}
if _, selected := toDialInRegion[try.ID]; selected {
continue
}
if r.Switch.IsDialingOrExistingAddress(try) {
continue
}
toDialInRegion[try.ID] = try
}
func (r *Reactor) collectAddressesByRegion(newBias, numToDial, maxAttempts int) map[p2p.ID]*p2p.NetAddress {
toDialInRegion := make(map[p2p.ID]*p2p.NetAddress)
toDialOutOfRegion := make(map[p2p.ID]*p2p.NetAddress)

// Second iteration: Dial peers in other regions
for i := 0; i < maxAttempts && len(toDialOutOfRegion) < numToDialInOtherRegion+reserveSizeOutOfRegion; i++ {
try := r.book.PickAddressOutsideRegion(newBias, r.Switch.MyRegion)
if try == nil {
continue
}
if _, selected := toDialOutOfRegion[try.ID]; selected {
continue
}
if r.Switch.IsDialingOrExistingAddress(try) {
continue
}
toDialOutOfRegion[try.ID] = try
}
swConfig := r.Switch.GetConfig()
r.book.ResetCurRegionQueryCount()

// Dial in-region addresses
successfulDials := 0
for _, addr := range toDialInRegion {
if successfulDials >= numToDialInSameRegion {
break
}
err := r.dialPeer(addr)
if err != nil {
switch err.(type) {
case errMaxAttemptsToDial, errTooEarlyToDial:
r.Logger.Debug(err.Error(), "addr", addr)
default:
r.Logger.Debug(err.Error(), "addr", addr)
}
} else {
successfulDials++
}
}
currentOutboundInOtherRegion := r.Switch.CurrentNumOutboundPeersInOtherRegion
maxOutboundPeersInOtherRegion := swConfig.MaxNumOutboundPeers - int(swConfig.PercentPeersInSameRegion*float64(swConfig.MaxNumOutboundPeers))

// Dial out-of-region addresses
for _, addr := range toDialOutOfRegion {
if successfulDials >= numToDial {
break
}
err := r.dialPeer(addr)
if err != nil {
switch err.(type) {
case errMaxAttemptsToDial, errTooEarlyToDial:
r.Logger.Debug(err.Error(), "addr", addr)
default:
r.Logger.Debug(err.Error(), "addr", addr)
}
} else {
successfulDials++
}
}
} else {
reserveSize := cmtmath.MaxInt((numToDial+1)/2, 5)
numToDialInOtherRegion := maxOutboundPeersInOtherRegion - currentOutboundInOtherRegion
numToDialInSameRegion := numToDial - numToDialInOtherRegion

for i := 0; i < maxAttempts && len(toDial) < numToDial+reserveSize; i++ {
try := r.book.PickAddress(newBias)
if try == nil {
continue
}
if _, selected := toDial[try.ID]; selected {
continue
}
if r.Switch.IsDialingOrExistingAddress(try) {
continue
}
toDial[try.ID] = try
}
reserveSizeInRegion := cmtmath.MaxInt((numToDialInSameRegion+1)/2, 5)
reserveSizeOutOfRegion := cmtmath.MaxInt((numToDialInOtherRegion+1)/2, 5)

// Dial picked addresses
successfulDials := 0
for _, addr := range toDial {
if successfulDials >= numToDial {
break
for i := 0; i < maxAttempts; i++ {
if len(toDialInRegion) < numToDialInSameRegion+reserveSizeInRegion {
try := r.book.PickAddressInRegion(newBias, r.Switch.MyRegion)
if try != nil && !r.Switch.IsDialingOrExistingAddress(try) {
if _, selected := toDialInRegion[try.ID]; !selected {
toDialInRegion[try.ID] = try
}
}
err := r.dialPeer(addr)
if err != nil {
switch err.(type) {
case errMaxAttemptsToDial, errTooEarlyToDial:
r.Logger.Debug(err.Error(), "addr", addr)
default:
r.Logger.Debug(err.Error(), "addr", addr)
}
if len(toDialOutOfRegion) < numToDialInOtherRegion+reserveSizeOutOfRegion {
try := r.book.PickAddressOutsideRegion(newBias, r.Switch.MyRegion)
if try != nil && !r.Switch.IsDialingOrExistingAddress(try) {
if _, selected := toDialOutOfRegion[try.ID]; !selected {
toDialOutOfRegion[try.ID] = try
}
} else {
successfulDials++
}
}
}

if r.book.NeedMoreAddrs() {
// Check if banned nodes can be reinstated
r.book.ReinstateBadPeers()
toDial := make(map[p2p.ID]*p2p.NetAddress)
for k, v := range toDialInRegion {
toDial[k] = v
}
for k, v := range toDialOutOfRegion {
toDial[k] = v
}
return toDial
}

if r.book.NeedMoreAddrs() {
func (r *Reactor) dialAddresses(toDial map[p2p.ID]*p2p.NetAddress, numToDial int) {
successfulDials := 0
for _, addr := range toDial {
if successfulDials >= numToDial {
break
}
err := r.dialPeer(addr)
if err != nil {
switch err.(type) {
case errMaxAttemptsToDial, errTooEarlyToDial:
r.Logger.Debug(err.Error(), "addr", addr)
default:
r.Logger.Debug(err.Error(), "addr", addr)
}
} else {
successfulDials++
}
}
}

func (r *Reactor) requestMoreAddresses() {
peers := r.Switch.Peers().List()
if len(peers) > 0 {
// 1) Pick a random peer and ask for more.
peers := r.Switch.Peers().List()
peersCount := len(peers)
if peersCount > 0 {
peer := peers[cmtrand.Int()%peersCount]
r.Logger.Info("We need more addresses. Sending pexRequest to random peer", "peer", peer)
r.RequestAddrs(peer)
}
peer := peers[cmtrand.Int()%len(peers)]
r.Logger.Info("We need more addresses. Sending pexRequest to random peer", "peer", peer)
r.RequestAddrs(peer)
}

if len(peers) == 0 {
// 2) Dial seeds if we are not dialing anyone.
// This is done in addition to asking a peer for addresses to work-around
// peers not participating in PEX.
if len(toDial) == 0 {
r.Logger.Info("No addresses to dial. Falling back to seeds")
r.dialSeeds()
}
r.Logger.Info("No addresses to dial. Falling back to seeds")
r.dialSeeds()
}
}

Expand Down

0 comments on commit fa7f420

Please sign in to comment.