Skip to content

Commit 2c981d5

Browse files
authored
Reboot Discovery Listener (#14487)
* Add Current Changes To Routine * Add In New Test * Add Feature Flag * Add Discovery Rebooter feature * Do Not Export Mutex And Use Zero Value Mutex * Wrap Error For Better Debugging * Fix Function Name and Add Specific Test For it * Manu's Review
1 parent 492c8af commit 2c981d5

File tree

10 files changed

+284
-65
lines changed

10 files changed

+284
-65
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
2424
- fastssz version bump (better error messages).
2525
- SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)
2626
- Add Bellatrix tests for light client functions
27+
- Add Discovery Rebooter Feature
28+
2729

2830
### Changed
2931

beacon-chain/p2p/broadcaster_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"testing"
1010
"time"
1111

12-
"github.com/ethereum/go-ethereum/p2p/discover"
1312
pubsub "github.com/libp2p/go-libp2p-pubsub"
1413
"github.com/libp2p/go-libp2p/core/host"
1514
"github.com/prysmaticlabs/go-bitfield"
@@ -236,7 +235,7 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
236235
bootNode := bootListener.Self()
237236
subnet := uint64(5)
238237

239-
var listeners []*discover.UDPv5
238+
var listeners []*listenerWrapper
240239
var hosts []host.Host
241240
// setup other nodes.
242241
cfg = &Config{

beacon-chain/p2p/connection_gater_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestPeer_AtMaxLimit(t *testing.T) {
5050
}()
5151

5252
for i := 0; i < highWatermarkBuffer; i++ {
53-
addPeer(t, s.peers, peers.PeerConnected)
53+
addPeer(t, s.peers, peers.PeerConnected, false)
5454
}
5555

5656
// create alternate host
@@ -159,7 +159,7 @@ func TestService_RejectInboundPeersBeyondLimit(t *testing.T) {
159159
inboundLimit += 1
160160
// Add in up to inbound peer limit.
161161
for i := 0; i < int(inboundLimit); i++ {
162-
addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED))
162+
addPeer(t, s.peers, peerdata.PeerConnectionState(ethpb.ConnectionState_CONNECTED), false)
163163
}
164164
valid = s.InterceptAccept(&maEndpoints{raddr: multiAddress})
165165
if valid {

beacon-chain/p2p/discovery.go

Lines changed: 172 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ import (
2424
"github.com/prysmaticlabs/prysm/v5/time/slots"
2525
)
2626

27+
type ListenerRebooter interface {
28+
Listener
29+
RebootListener() error
30+
}
31+
2732
// Listener defines the discovery V5 network interface that is used
2833
// to communicate with other peers.
2934
type Listener interface {
@@ -47,6 +52,87 @@ type quicProtocol uint16
4752
// quicProtocol is the "quic" key, which holds the QUIC port of the node.
4853
func (quicProtocol) ENRKey() string { return "quic" }
4954

55+
type listenerWrapper struct {
56+
mu sync.RWMutex
57+
listener *discover.UDPv5
58+
listenerCreator func() (*discover.UDPv5, error)
59+
}
60+
61+
func newListener(listenerCreator func() (*discover.UDPv5, error)) (*listenerWrapper, error) {
62+
rawListener, err := listenerCreator()
63+
if err != nil {
64+
return nil, errors.Wrap(err, "could not create new listener")
65+
}
66+
return &listenerWrapper{
67+
listener: rawListener,
68+
listenerCreator: listenerCreator,
69+
}, nil
70+
}
71+
72+
func (l *listenerWrapper) Self() *enode.Node {
73+
l.mu.RLock()
74+
defer l.mu.RUnlock()
75+
return l.listener.Self()
76+
}
77+
78+
func (l *listenerWrapper) Close() {
79+
l.mu.RLock()
80+
defer l.mu.RUnlock()
81+
l.listener.Close()
82+
}
83+
84+
func (l *listenerWrapper) Lookup(id enode.ID) []*enode.Node {
85+
l.mu.RLock()
86+
defer l.mu.RUnlock()
87+
return l.listener.Lookup(id)
88+
}
89+
90+
func (l *listenerWrapper) Resolve(node *enode.Node) *enode.Node {
91+
l.mu.RLock()
92+
defer l.mu.RUnlock()
93+
return l.listener.Resolve(node)
94+
}
95+
96+
func (l *listenerWrapper) RandomNodes() enode.Iterator {
97+
l.mu.RLock()
98+
defer l.mu.RUnlock()
99+
return l.listener.RandomNodes()
100+
}
101+
102+
func (l *listenerWrapper) Ping(node *enode.Node) error {
103+
l.mu.RLock()
104+
defer l.mu.RUnlock()
105+
return l.listener.Ping(node)
106+
}
107+
108+
func (l *listenerWrapper) RequestENR(node *enode.Node) (*enode.Node, error) {
109+
l.mu.RLock()
110+
defer l.mu.RUnlock()
111+
return l.listener.RequestENR(node)
112+
}
113+
114+
func (l *listenerWrapper) LocalNode() *enode.LocalNode {
115+
l.mu.RLock()
116+
defer l.mu.RUnlock()
117+
return l.listener.LocalNode()
118+
}
119+
120+
func (l *listenerWrapper) RebootListener() error {
121+
l.mu.Lock()
122+
defer l.mu.Unlock()
123+
124+
// Close current listener
125+
l.listener.Close()
126+
127+
newListener, err := l.listenerCreator()
128+
if err != nil {
129+
return err
130+
}
131+
132+
l.listener = newListener
133+
return nil
134+
}
135+
50136
// RefreshENR uses an epoch to refresh the enr entry for our node
51137
// with the tracked committee ids for the epoch, allowing our node
52138
// to be dynamically discoverable by others given our tracked committee ids.
@@ -110,55 +196,78 @@ func (s *Service) RefreshENR() {
110196
func (s *Service) listenForNewNodes() {
111197
iterator := filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
112198
defer iterator.Close()
199+
connectivityTicker := time.NewTicker(1 * time.Minute)
200+
thresholdCount := 0
113201

114202
for {
115-
// Exit if service's context is canceled.
116-
if s.ctx.Err() != nil {
117-
break
118-
}
119-
120-
if s.isPeerAtLimit(false /* inbound */) {
121-
// Pause the main loop for a period to stop looking
122-
// for new peers.
123-
log.Trace("Not looking for peers, at peer limit")
124-
time.Sleep(pollingPeriod)
125-
continue
126-
}
127-
wantedCount := s.wantedPeerDials()
128-
if wantedCount == 0 {
129-
log.Trace("Not looking for peers, at peer limit")
130-
time.Sleep(pollingPeriod)
131-
continue
132-
}
133-
// Restrict dials if limit is applied.
134-
if flags.MaxDialIsActive() {
135-
wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
136-
}
137-
wantedNodes := enode.ReadNodes(iterator, wantedCount)
138-
wg := new(sync.WaitGroup)
139-
for i := 0; i < len(wantedNodes); i++ {
140-
node := wantedNodes[i]
141-
peerInfo, _, err := convertToAddrInfo(node)
142-
if err != nil {
143-
log.WithError(err).Error("Could not convert to peer info")
203+
select {
204+
case <-s.ctx.Done():
205+
return
206+
case <-connectivityTicker.C:
207+
// Skip the connectivity check if not enabled.
208+
if !features.Get().EnableDiscoveryReboot {
144209
continue
145210
}
146-
147-
if peerInfo == nil {
211+
if !s.isBelowOutboundPeerThreshold() {
212+
// Reset counter if we are beyond the threshold
213+
thresholdCount = 0
214+
continue
215+
}
216+
thresholdCount++
217+
// Reboot listener if connectivity drops
218+
if thresholdCount > 5 {
219+
log.WithField("outboundConnectionCount", len(s.peers.OutboundConnected())).Warn("Rebooting discovery listener, reached threshold.")
220+
if err := s.dv5Listener.RebootListener(); err != nil {
221+
log.WithError(err).Error("Could not reboot listener")
222+
continue
223+
}
224+
iterator = filterNodes(s.ctx, s.dv5Listener.RandomNodes(), s.filterPeer)
225+
thresholdCount = 0
226+
}
227+
default:
228+
if s.isPeerAtLimit(false /* inbound */) {
229+
// Pause the main loop for a period to stop looking
230+
// for new peers.
231+
log.Trace("Not looking for peers, at peer limit")
232+
time.Sleep(pollingPeriod)
148233
continue
149234
}
235+
wantedCount := s.wantedPeerDials()
236+
if wantedCount == 0 {
237+
log.Trace("Not looking for peers, at peer limit")
238+
time.Sleep(pollingPeriod)
239+
continue
240+
}
241+
// Restrict dials if limit is applied.
242+
if flags.MaxDialIsActive() {
243+
wantedCount = min(wantedCount, flags.Get().MaxConcurrentDials)
244+
}
245+
wantedNodes := enode.ReadNodes(iterator, wantedCount)
246+
wg := new(sync.WaitGroup)
247+
for i := 0; i < len(wantedNodes); i++ {
248+
node := wantedNodes[i]
249+
peerInfo, _, err := convertToAddrInfo(node)
250+
if err != nil {
251+
log.WithError(err).Error("Could not convert to peer info")
252+
continue
253+
}
150254

151-
// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
152-
s.Peers().RandomizeBackOff(peerInfo.ID)
153-
wg.Add(1)
154-
go func(info *peer.AddrInfo) {
155-
if err := s.connectWithPeer(s.ctx, *info); err != nil {
156-
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
255+
if peerInfo == nil {
256+
continue
157257
}
158-
wg.Done()
159-
}(peerInfo)
258+
259+
// Make sure that peer is not dialed too often, for each connection attempt there's a backoff period.
260+
s.Peers().RandomizeBackOff(peerInfo.ID)
261+
wg.Add(1)
262+
go func(info *peer.AddrInfo) {
263+
if err := s.connectWithPeer(s.ctx, *info); err != nil {
264+
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
265+
}
266+
wg.Done()
267+
}(peerInfo)
268+
}
269+
wg.Wait()
160270
}
161-
wg.Wait()
162271
}
163272
}
164273

@@ -299,14 +408,17 @@ func (s *Service) createLocalNode(
299408
func (s *Service) startDiscoveryV5(
300409
addr net.IP,
301410
privKey *ecdsa.PrivateKey,
302-
) (*discover.UDPv5, error) {
303-
listener, err := s.createListener(addr, privKey)
411+
) (*listenerWrapper, error) {
412+
createListener := func() (*discover.UDPv5, error) {
413+
return s.createListener(addr, privKey)
414+
}
415+
wrappedListener, err := newListener(createListener)
304416
if err != nil {
305417
return nil, errors.Wrap(err, "could not create listener")
306418
}
307-
record := listener.Self()
419+
record := wrappedListener.Self()
308420
log.WithField("ENR", record.String()).Info("Started discovery v5")
309-
return listener, nil
421+
return wrappedListener, nil
310422
}
311423

312424
// filterPeer validates each node that we retrieve from our dht. We
@@ -398,6 +510,22 @@ func (s *Service) isPeerAtLimit(inbound bool) bool {
398510
return activePeers >= maxPeers || numOfConns >= maxPeers
399511
}
400512

513+
// isBelowOutboundPeerThreshold checks if the number of outbound peers that
514+
// we are connected to satisfies the minimum expected outbound peer count
515+
// according to our peer limit.
516+
func (s *Service) isBelowOutboundPeerThreshold() bool {
517+
maxPeers := int(s.cfg.MaxPeers)
518+
inBoundLimit := s.Peers().InboundLimit()
519+
// Impossible Condition
520+
if maxPeers < inBoundLimit {
521+
return false
522+
}
523+
outboundFloor := maxPeers - inBoundLimit
524+
outBoundThreshold := outboundFloor / 2
525+
outBoundCount := len(s.Peers().OutboundConnected())
526+
return outBoundCount < outBoundThreshold
527+
}
528+
401529
func (s *Service) wantedPeerDials() int {
402530
maxPeers := int(s.cfg.MaxPeers)
403531

0 commit comments

Comments
 (0)