chore: add config to limit peerstore capacity #770

Merged: 6 commits, Sep 27, 2023
Changes from all commits
6 changes: 6 additions & 0 deletions cmd/waku/flags.go
@@ -32,6 +32,12 @@ var (
Destination: &options.MaxPeerConnections,
EnvVars: []string{"WAKUNODE2_MAX_CONNECTIONS"},
})
PeerStoreCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "peer-store-capacity",
Usage: "Maximum stored peers in the peerstore.",
Destination: &options.PeerStoreCapacity,
EnvVars: []string{"WAKUNODE2_PEERSTORE_CAPACITY"},
})
WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "websocket-support",
Aliases: []string{"ws"},
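
Note: the new flag reuses the same urfave/cli + altsrc pattern as the existing max-connections flag, so the limit can be supplied as --peer-store-capacity=300 on the command line, as WAKUNODE2_PEERSTORE_CAPACITY=300 in the environment, or through whatever altsrc config-file source the node is started with (300 is only an illustrative value). Leaving it unset keeps PeerStoreCapacity at the IntFlag zero value, which the peer manager below treats as "derive a default from max-connections".
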
1 change: 1 addition & 0 deletions cmd/waku/main.go
@@ -22,6 +22,7 @@ func main() {
TcpPort,
Address,
MaxPeerConnections,
PeerStoreCapacity,
WebsocketSupport,
WebsocketPort,
WebsocketSecurePort,
1 change: 1 addition & 0 deletions cmd/waku/node.go
@@ -129,6 +129,7 @@ func Execute(options NodeOptions) {
node.WithKeepAlive(options.KeepAlive),
node.WithMaxPeerConnections(options.MaxPeerConnections),
node.WithPrometheusRegisterer(prometheus.DefaultRegisterer),
node.WithPeerStoreCapacity(options.PeerStoreCapacity),
}
if len(options.AdvertiseAddresses) != 0 {
nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
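
Note: applications that embed go-waku rather than run the cmd/waku binary get the same knob through the new functional option. A minimal sketch, assuming the module path github.com/waku-org/go-waku used by this repository (the numbers are arbitrary and Start/Stop are omitted):

    package main

    import (
        "log"

        "github.com/waku-org/go-waku/waku/v2/node"
    )

    func main() {
        // Cap live connections at 50 and the peerstore itself at 250 entries.
        wakuNode, err := node.New(
            node.WithMaxPeerConnections(50),
            node.WithPeerStoreCapacity(250),
        )
        if err != nil {
            log.Fatal(err)
        }
        _ = wakuNode
    }
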
1 change: 1 addition & 0 deletions cmd/waku/options.go
@@ -169,6 +169,7 @@ type NodeOptions struct {
UserAgent string
PProf bool
MaxPeerConnections int
PeerStoreCapacity int

PeerExchange PeerExchangeOptions
Websocket WSOptions
2 changes: 1 addition & 1 deletion waku/v2/node/wakunode2.go
@@ -253,7 +253,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}

//Initialize peer manager.
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, w.log)

w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
if err != nil {
8 changes: 8 additions & 0 deletions waku/v2/node/wakuoptions.go
@@ -88,6 +88,7 @@ type WakuNodeParameters struct {
rendezvousDB *rendezvous.DB

maxPeerConnections int
peerStoreCapacity int

enableDiscV5 bool
udpPort uint
@@ -356,6 +357,13 @@ func WithMaxPeerConnections(maxPeers int) WakuNodeOption {
}
}

func WithPeerStoreCapacity(capacity int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.peerStoreCapacity = capacity
return nil
}
}

// WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
62 changes: 54 additions & 8 deletions waku/v2/peermanager/peer_manager.go
@@ -2,6 +2,7 @@ package peermanager

import (
"context"
"errors"
"sync"
"time"

@@ -23,13 +24,15 @@ import (
"go.uber.org/zap"
)

// NodeTopicDetails stores pubSubTopic related data like topicHandle for the node.
type NodeTopicDetails struct {
topic *pubsub.Topic
}

// PeerManager applies various controls and manage connections towards peers.
type PeerManager struct {
peerConnector *PeerConnectionStrategy
maxPeers int
maxRelayPeers int
logger *zap.Logger
InRelayPeersTarget int
@@ -43,6 +46,7 @@ type PeerManager struct {
}

const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 5

// 80% relay peers 20% service peers
func relayAndServicePeers(maxConnections int) (int, int) {
@@ -61,23 +65,29 @@ func inAndOutRelayPeers(relayPeers int) (int, int) {
}

// NewPeerManager creates a new peerManager instance.
func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager {
func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerManager {

maxRelayPeers, _ := relayAndServicePeers(maxConnections)
inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers)

if maxPeers == 0 || maxConnections > maxPeers {
maxPeers = maxConnsToPeerRatio * maxConnections
}

pm := &PeerManager{
logger: logger.Named("peer-manager"),
maxRelayPeers: maxRelayPeers,
InRelayPeersTarget: inRelayPeersTarget,
OutRelayPeersTarget: outRelayPeersTarget,
serviceSlots: NewServiceSlot(),
subRelayTopics: make(map[string]*NodeTopicDetails),
maxPeers: maxPeers,
}
logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
zap.Int("maxRelayPeers", maxRelayPeers),
zap.Int("outRelayPeersTarget", outRelayPeersTarget),
zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget))
zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget),
zap.Int("maxPeers", maxPeers))

return pm
}
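
Note: taken together with the new maxConnsToPeerRatio constant above, the defaulting rule works out as follows: with max-connections set to 50 and peer-store-capacity left unset (0), the peerstore cap becomes 5 * 50 = 250 stored peers. The same fallback applies when a configured capacity is smaller than max-connections, so the cap can never end up below the connection limit.
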
@@ -138,6 +148,8 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers p
return inPeers, outPeers, nil
}

// getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specifiedPeers.
// If specifiedPeers is empty, it checks within all peers in peerStore.
func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Group peers by their connected direction inbound or outbound.
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers)
@@ -157,6 +169,9 @@ func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer
return
}

// ensureMinRelayConnsPerTopic makes sure there are min of D conns per pubsubTopic.
// If not it will look into peerStore to initiate more connections.
// If peerStore doesn't have enough peers, will wait for discv5 to find more and try in next cycle
func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
pm.topicMutex.RLock()
defer pm.topicMutex.RUnlock()
@@ -184,6 +199,9 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
}
}

// connectToRelayPeers ensures minimum D connections are there for each pubSubTopic.
// If not, initiates connections to additional peers.
// It also checks for incoming relay connections and prunes once they cross inRelayTarget
func (pm *PeerManager) connectToRelayPeers() {
//Check for out peer connections and connect to more peers.
pm.ensureMinRelayConnsPerTopic()
@@ -198,22 +216,36 @@ func (pm *PeerManager) connectToRelayPeers() {
}
}

func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) PeerData {
return PeerData{
// addrInfoToPeerData returns addressinfo for a peer
// If addresses are expired, it removes the peer from host peerStore and returns nil.
func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *PeerData {
addrs := host.Peerstore().Addrs(peerID)
if len(addrs) == 0 {
//Addresses expired, remove peer from peerStore
host.Peerstore().RemovePeer(peerID)
return nil
}
return &PeerData{
Origin: origin,
AddrInfo: peer.AddrInfo{
ID: peerID,
Addrs: host.Peerstore().Addrs(peerID),
Addrs: addrs,
},
}
}

// connectToPeers connects to peers provided in the list if the addresses have not expired.
func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
for _, peerID := range peers {
peerData := addrInfoToPeerData(wps.PeerManager, peerID, pm.host)
pm.peerConnector.PushToChan(peerData)
if peerData == nil {
continue
}
pm.peerConnector.PushToChan(*peerData)
}
}

// getNotConnectedPers returns peers for a pubSubTopic that are not connected.
func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) {
var peerList peer.IDSlice
if pubsubTopic == "" {
@@ -229,10 +261,11 @@ func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeer
return
}

// pruneInRelayConns prune any incoming relay connections crossing derived inrelayPeerTarget
func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {

//Start disconnecting peers, based on what?
//For now, just disconnect most recently connected peers
//For now no preference is used
//TODO: Need to have more intelligent way of doing this, maybe peer scores.
//TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers.
pm.logger.Info("peer connections exceed target relay peers, hence pruning",
@@ -253,6 +286,10 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
// Note that these peers will not be set in service-slots.
// TODO: It maybe good to set in service-slots based on services supported in the ENR
func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
//Doing this check again inside addPeer, in order to avoid additional complexity of rollingBack other changes.
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
return
}
//Check if the peer is already present, if so skip adding
_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
if err == nil {
@@ -296,8 +333,17 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
// addPeer adds peer to only the peerStore.
// It also sets additional metadata such as origin, ENR and supported protocols
func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
return errors.New("peer store capacity reached")
}
pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
if origin == wps.Static {
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
} else {
//Need to re-evaluate the address expiry
// For now expiring them with default addressTTL which is an hour.
pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
}
err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
if err != nil {
pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID))
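
Note: the capacity is enforced in two places: AddDiscoveredPeer silently ignores new discoveries once the peerstore is full, while the lower-level addPeer returns an error. A rough in-package sketch of the second path (the libp2p host h, peer ID pid and address list addrs are assumed to exist):

    // Illustrative only; this would live inside package peermanager since addPeer is unexported.
    pm := NewPeerManager(10, 20, utils.Logger()) // at most 10 connections, 20 stored peers
    pm.SetHost(h)

    // Once h.Peerstore() already holds 20 peers, further adds are rejected.
    if err := pm.addPeer(pid, addrs, wps.Static, nil); err != nil {
        // err: "peer store capacity reached"
    }

Note also the address-TTL change in addPeer: statically configured peers are now stored with PermanentAddrTTL, while all other origins keep the default AddressTTL (about an hour, per the in-code comment).
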
2 changes: 1 addition & 1 deletion waku/v2/peermanager/peer_manager_test.go
@@ -31,7 +31,7 @@ func initTest(t *testing.T) (context.Context, *PeerManager, func()) {
defer h1.Close()

// host 1 is used by peer manager
pm := NewPeerManager(10, utils.Logger())
pm := NewPeerManager(10, 20, utils.Logger())
pm.SetHost(h1)

return ctx, pm, func() {
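
Note: the updated initTest pins the capacity at 20 for the existing tests. A follow-up assertion for the defaulting path could look roughly like this, assuming the test file shares the peermanager package (so the unexported maxPeers field is visible) and that testify's require is already imported:

    func TestDefaultPeerStoreCapacity(t *testing.T) {
        // Passing 0 should fall back to maxConnsToPeerRatio * maxConnections = 5 * 10.
        pm := NewPeerManager(10, 0, utils.Logger())
        require.Equal(t, 50, pm.maxPeers)
    }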