Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- fastssz version bump (better error messages).
- SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)
- Add Bellatrix tests for light client functions
- Add Discovery Rebooter Feature


### Changed

Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/p2p/discover"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/prysmaticlabs/go-bitfield"
Expand Down Expand Up @@ -236,7 +235,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
bootNode := bootListener.Self()
subnet := uint64(5)

var listeners []*discover.UDPv5
var listeners []*listenerWrapper
var hosts []host.Host
// setup other nodes.
cfg = &Config{
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/connection_gater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestPeer_AtMaxLimit(t *testing.T) {
}()

for i := 0; i < highWatermarkBuffer; i++ {
addPeer(t, s.peers, peers.PeerConnected)
addPeer(t, s.peers, peers.PeerConnected, false)
}

// create alternate host
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
inboundLimit += 1
// Add in up to inbound peer limit.
for i := 0; i < int(inboundLimit); i++ {
addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false)
}
valid = s.InterceptAccept(&maEndpoints{raddr: multiAddress})
if valid {
Expand Down
216 changes: 172 additions & 44 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ import (
"github.com/prysmaticlabs/prysm/v5/time/slots"
)

type ListenerRebooter interface {
Listener
RebootListener() error
}

// Listener defines the discovery V5 network interface that is used
// to communicate with other peers.
type Listener interface {
Expand All @@ -47,6 +52,87 @@ type quicProtocol uint16
// quicProtocol is the "quic" key, which holds the QUIC port of the node.
func (quicProtocol) ENRKey() string { return "quic" }

type listenerWrapper struct {
mu sync.RWMutex
listener *discover.UDPv5
listenerCreator func() (*discover.UDPv5, error)
}

func newListener(listenerCreator func() (*discover.UDPv5, error)) (*listenerWrapper, error) {
rawListener, err := listenerCreator()
if err != nil {
return nil, errors.Wrap(err, "could not create new listener")
}
return &listenerWrapper{
listener: rawListener,
listenerCreator: listenerCreator,
}, nil
}

func (l *listenerWrapper) Self() *enode.Node {
l.mu.RLock()
defer l.mu.RUnlock()
return l.listener.Self()
}

func (l *listenerWrapper) Close() {
l.mu.RLock()
defer l.mu.RUnlock()
l.listener.Close()
}

func (l *listenerWrapper) Lookup(id enode.ID) []*enode.Node {
l.mu.RLock()
defer l.mu.RUnlock()
return l.listener.Lookup(id)
}

func (l *listenerWrapper) Resolve(node *enode.Node) *enode.Node {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not tested.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this particular method isn't used by us anywhere at all which is why it isn't tested. I have to implement these methods as they are needed to satisfy the Listener interface. I can add it in a test, but it will purely cosmetic as we have no need for this method. The same applies for the other cases raised

l.mu.RLock()
defer l.mu.RUnlock()
return l.listener.Resolve(node)
}

func (l *listenerWrapper) RandomNodes() enode.Iterator {
l.mu.RLock()
defer l.mu.RUnlock()
return l.listener.RandomNodes()
}

func (l *listenerWrapper) Ping(node *enode.Node) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not tested.

l.mu.RLock()
defer l.mu.RUnlock()
return l.listener.Ping(node)
}

func (l *listenerWrapper) RequestENR(node *enode.Node) (*enode.Node, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not tested.

l.mu.RLock()
defer l.mu.RUnlock()
return l.listener.RequestENR(node)
}

func (l *listenerWrapper) LocalNode() *enode.LocalNode {
l.mu.RLock()
defer l.mu.RUnlock()
return l.listener.LocalNode()
}

func (l *listenerWrapper) RebootListener() error {
l.mu.Lock()
defer l.mu.Unlock()

// Close current listener
l.listener.Close()

newListener, err := l.listenerCreator()
if err != nil {
return err
}

l.listener = newListener
return nil
}

// RefreshENR uses an epoch to refresh the enr entry for our node
// with the tracked committee ids for the epoch, allowing our node
// to be dynamically discoverable by others given our tracked committee ids.
Expand Down Expand Up @@ -110,55 +196,78 @@ func (s *Service) RefreshENR() {
func (s *Service) listenForNewNodes() {
iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
defer iterator.Close()
connectivityTicker := time.NewTicker(1 * time.Minute)
thresholdCount := 0

for {
// Exit if service's context is canceled.
if s.ctx.Err() != nil {
break
}

if s.isPeerAtLimit(false /* inbound */) {
// Pause the main loop for a period to stop looking
// for new peers.
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}
wantedCount := s.wantedPeerDials()
if wantedCount == 0 {
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}
// Restrict dials if limit is applied.
if flags.MaxDialIsActive() {
wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
}
wantedNodes := enode.ReadNodes(iterator, wantedCount)
wg := new(sync.WaitGroup)
for i := 0; i < len(wantedNodes); i++ {
node := wantedNodes[i]
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
select {
case <-s.ctx.Done():
return
case <-connectivityTicker.C:
// Skip the connectivity check if not enabled.
if !features.Get().EnableDiscoveryReboot {
continue
}

if peerInfo == nil {
if !s.isBelowOutboundPeerThreshold() {
// Reset counter if we are beyond the threshold
thresholdCount = 0
continue
}
thresholdCount++
// Reboot listener if connectivity drops
if thresholdCount > 5 {
log.WithField("outboundConnectionCount", len(s.peers.OutboundConnected())).Warn("Rebooting discovery listener, reached threshold.")
if err := s.dv5Listener.RebootListener(); err != nil {
log.WithError(err).Error("Could not reboot listener")
continue
}
iterator = filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
thresholdCount = 0
}
default:
if s.isPeerAtLimit(false /* inbound */) {
// Pause the main loop for a period to stop looking
// for new peers.
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}
wantedCount := s.wantedPeerDials()
if wantedCount == 0 {
log.Trace("Not looking for peers, at peer limit")
time.Sleep(pollingPeriod)
continue
}
// Restrict dials if limit is applied.
if flags.MaxDialIsActive() {
wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
}
wantedNodes := enode.ReadNodes(iterator, wantedCount)
wg := new(sync.WaitGroup)
for i := 0; i < len(wantedNodes); i++ {
node := wantedNodes[i]
peerInfo, _, err := convertToAddrInfo(node)
if err != nil {
log.WithError(err).Error("Could not convert to peer info")
continue
}

// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
wg.Add(1)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
if peerInfo == nil {
continue
}
wg.Done()
}(peerInfo)

// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
s.Peers().RandomizeBackOff(peerInfo.ID)
wg.Add(1)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
}
wg.Done()
}(peerInfo)
}
wg.Wait()
}
wg.Wait()
}
}

Expand Down Expand Up @@ -299,14 +408,17 @@ func (s *Service) createLocalNode(
func (s *Service) startDiscoveryV5(
addr net.IP,
privKey *ecdsa.PrivateKey,
) (*discover.UDPv5, error) {
listener, err := s.createListener(addr, privKey)
) (*listenerWrapper, error) {
createListener := func() (*discover.UDPv5, error) {
return s.createListener(addr, privKey)
}
wrappedListener, err := newListener(createListener)
if err != nil {
return nil, errors.Wrap(err, "could not create listener")
}
record := listener.Self()
record := wrappedListener.Self()
log.WithField("ENR", record.String()).Info("Started discovery v5")
return listener, nil
return wrappedListener, nil
}

// filterPeer validates each node that we retrieve from our dht. We
Expand Down Expand Up @@ -398,6 +510,22 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
return activePeers >= maxPeers || numOfConns >= maxPeers
}

// isBelowOutboundPeerThreshold checks if the number of outbound peers that
// we are connected to satisfies the minimum expected outbound peer count
// according to our peer limit.
func (s *Service) isBelowOutboundPeerThreshold() bool {
maxPeers := int(s.cfg.MaxPeers)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we cast uint to int.
Maybe it would be safer to do the opposite?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason I did it this way is because some of the other methods that we call all return int instead of uint. We would have to cast those to uint too in this particular case. Since this is a user defined value I think it is fine to expect it to be a manageable value.

inBoundLimit := s.Peers().InboundLimit()
// Impossible Condition
if maxPeers < inBoundLimit {
return false
}
outboundFloor := maxPeers - inBoundLimit
outBoundThreshold := outboundFloor / 2
outBoundCount := len(s.Peers().OutboundConnected())
return outBoundCount < outBoundThreshold
}

func (s *Service) wantedPeerDials() int {
maxPeers := int(s.cfg.MaxPeers)

Expand Down
Loading