From 59ad5d0d67ba9c5983f08e5f448a06f70bacea77 Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Mon, 25 Sep 2023 17:15:46 +0530
Subject: [PATCH 1/5] chore: add config to limit peerstore capacity

---
 cmd/waku/flags.go                        |  6 ++++++
 cmd/waku/main.go                         |  1 +
 cmd/waku/node.go                         |  1 +
 cmd/waku/options.go                      |  1 +
 waku/v2/node/wakunode2.go                |  2 +-
 waku/v2/node/wakuoptions.go              |  8 ++++++++
 waku/v2/peermanager/peer_manager.go      | 19 +++++++++++++++++--
 waku/v2/peermanager/peer_manager_test.go |  2 +-
 8 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go
index 99a4ad744..e60a498d6 100644
--- a/cmd/waku/flags.go
+++ b/cmd/waku/flags.go
@@ -32,6 +32,12 @@ var (
 		Destination: &options.MaxPeerConnections,
 		EnvVars:     []string{"WAKUNODE2_MAX_CONNECTIONS"},
 	})
+	PeerStoreCapacity = altsrc.NewIntFlag(&cli.IntFlag{
+		Name:        "peer-store-capacity",
+		Usage:       "Maximum stored peers in the peerstore.",
+		Destination: &options.PeerStoreCapacity,
+		EnvVars:     []string{"WAKUNODE2_PEERSTORE_CAPACITY"},
+	})
 	WebsocketSupport = altsrc.NewBoolFlag(&cli.BoolFlag{
 		Name:    "websocket-support",
 		Aliases: []string{"ws"},
diff --git a/cmd/waku/main.go b/cmd/waku/main.go
index 9c6097b52..c260fb9bf 100644
--- a/cmd/waku/main.go
+++ b/cmd/waku/main.go
@@ -22,6 +22,7 @@ func main() {
 		TcpPort,
 		Address,
 		MaxPeerConnections,
+		PeerStoreCapacity,
 		WebsocketSupport,
 		WebsocketPort,
 		WebsocketSecurePort,
diff --git a/cmd/waku/node.go b/cmd/waku/node.go
index 99f97e63a..e0d0fbf63 100644
--- a/cmd/waku/node.go
+++ b/cmd/waku/node.go
@@ -129,6 +129,7 @@ func Execute(options NodeOptions) {
 		node.WithKeepAlive(options.KeepAlive),
 		node.WithMaxPeerConnections(options.MaxPeerConnections),
 		node.WithPrometheusRegisterer(prometheus.DefaultRegisterer),
+		node.WithPeerStoreCapacity(options.PeerStoreCapacity),
 	}
 	if len(options.AdvertiseAddresses) != 0 {
 		nodeOpts = append(nodeOpts, node.WithAdvertiseAddresses(options.AdvertiseAddresses...))
diff --git a/cmd/waku/options.go b/cmd/waku/options.go
index f27fd4ab5..e1bd026f3 100644
--- a/cmd/waku/options.go
+++ b/cmd/waku/options.go
@@ -169,6 +169,7 @@ type NodeOptions struct {
 	UserAgent          string
 	PProf              bool
 	MaxPeerConnections int
+	PeerStoreCapacity  int
 
 	PeerExchange PeerExchangeOptions
 	Websocket    WSOptions
diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go
index cd55b2163..1a3b18c21 100644
--- a/waku/v2/node/wakunode2.go
+++ b/waku/v2/node/wakunode2.go
@@ -253,7 +253,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
 	}
 
 	//Initialize peer manager.
-	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.log)
+	w.peermanager = peermanager.NewPeerManager(w.opts.maxPeerConnections, w.opts.peerStoreCapacity, w.log)
 
 	w.peerConnector, err = peermanager.NewPeerConnectionStrategy(w.peermanager, discoveryConnectTimeout, w.log)
 	if err != nil {
diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go
index 559fd858e..2f85d9c10 100644
--- a/waku/v2/node/wakuoptions.go
+++ b/waku/v2/node/wakuoptions.go
@@ -88,6 +88,7 @@ type WakuNodeParameters struct {
 	rendezvousDB *rendezvous.DB
 
 	maxPeerConnections int
+	peerStoreCapacity  int
 
 	enableDiscV5 bool
 	udpPort      uint
@@ -356,6 +357,13 @@ func WithMaxPeerConnections(maxPeers int) WakuNodeOption {
 	}
 }
 
+func WithPeerStoreCapacity(capacity int) WakuNodeOption {
+	return func(params *WakuNodeParameters) error {
+		params.peerStoreCapacity = capacity
+		return nil
+	}
+}
+
 // WithDiscoveryV5 is a WakuOption used to enable DiscV5 peer discovery
 func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) WakuNodeOption {
 	return func(params *WakuNodeParameters) error {
diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go
index 1881bec15..7b7d015d2 100644
--- a/waku/v2/peermanager/peer_manager.go
+++ b/waku/v2/peermanager/peer_manager.go
@@ -2,6 +2,7 @@ package peermanager
 
 import (
 	"context"
+	"errors"
 	"sync"
 	"time"
 
@@ -29,6 +30,7 @@ type NodeTopicDetails struct {
 // PeerManager applies various controls and manages connections towards peers.
 type PeerManager struct {
 	peerConnector       *PeerConnectionStrategy
+	maxPeers            int
 	maxRelayPeers       int
 	logger              *zap.Logger
 	InRelayPeersTarget  int
@@ -61,11 +63,15 @@ func inAndOutRelayPeers(relayPeers int) (int, int) {
 }
 
 // NewPeerManager creates a new peerManager instance.
-func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager {
+func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerManager {
 
 	maxRelayPeers, _ := relayAndServicePeers(maxConnections)
 	inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers)
 
+	if maxPeers == 0 || maxConnections > maxPeers {
+		maxPeers = 5 * maxConnections
+	}
+
 	pm := &PeerManager{
 		logger:              logger.Named("peer-manager"),
 		maxRelayPeers:       maxRelayPeers,
@@ -73,11 +79,13 @@ func NewPeerManager(maxConnections int, logger *zap.Logger) *PeerManager {
 		OutRelayPeersTarget: outRelayPeersTarget,
 		serviceSlots:        NewServiceSlot(),
 		subRelayTopics:      make(map[string]*NodeTopicDetails),
+		maxPeers:            maxPeers,
 	}
 	logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
 		zap.Int("maxRelayPeers", maxRelayPeers),
 		zap.Int("outRelayPeersTarget", outRelayPeersTarget),
-		zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget))
+		zap.Int("inRelayPeersTarget", pm.InRelayPeersTarget),
+		zap.Int("maxPeers", maxPeers))
 
 	return pm
 }
@@ -253,6 +261,10 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
 // Note that these peers will not be set in service-slots.
 // TODO: It may be good to set in service-slots based on services supported in the ENR
 func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
+	//This check is also done inside addPeer, to avoid the additional complexity of rolling back other changes.
+	if pm.maxPeers < pm.host.Peerstore().Peers().Len() {
+		return
+	}
 	//Check if the peer is already present, if so skip adding
 	_, err := pm.host.Peerstore().(wps.WakuPeerstore).Origin(p.AddrInfo.ID)
 	if err == nil || err != peerstore.ErrNotFound {
@@ -296,6 +308,9 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
 // addPeer adds peer to only the peerStore.
 // It also sets additional metadata such as origin, ENR and supported protocols
 func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
+	if pm.maxPeers < pm.host.Peerstore().Peers().Len() {
+		return errors.New("peer store capacity reached")
+	}
 	pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))
 	pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
 	err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go
index cef6598fa..259b92f46 100644
--- a/waku/v2/peermanager/peer_manager_test.go
+++ b/waku/v2/peermanager/peer_manager_test.go
@@ -31,7 +31,7 @@ func initTest(t *testing.T) (context.Context, *PeerManager, func()) {
 	defer h1.Close()
 
 	// host 1 is used by peer manager
-	pm := NewPeerManager(10, utils.Logger())
+	pm := NewPeerManager(10, 20, utils.Logger())
 	pm.SetHost(h1)
 
 	return ctx, pm, func() {

From 43492398c103c18e117c1aa98d021ef51a7dd795 Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Mon, 25 Sep 2023 17:20:12 +0530
Subject: [PATCH 2/5] chore: nit in compare

---
 waku/v2/peermanager/peer_manager.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go
index 7b7d015d2..c903d7272 100644
--- a/waku/v2/peermanager/peer_manager.go
+++ b/waku/v2/peermanager/peer_manager.go
@@ -262,7 +262,7 @@ func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
 // TODO: It may be good to set in service-slots based on services supported in the ENR
 func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
 	//This check is also done inside addPeer, to avoid the additional complexity of rolling back other changes.
-	if pm.maxPeers < pm.host.Peerstore().Peers().Len() {
+	if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
 		return
 	}
 	//Check if the peer is already present, if so skip adding
@@ -308,7 +308,7 @@ func (pm *PeerManager) AddDiscoveredPeer(p PeerData, connectNow bool) {
 // addPeer adds peer to only the peerStore.
 // It also sets additional metadata such as origin, ENR and supported protocols
 func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) error {
-	if pm.maxPeers < pm.host.Peerstore().Peers().Len() {
+	if pm.maxPeers <= pm.host.Peerstore().Peers().Len() {
 		return errors.New("peer store capacity reached")
 	}
 	pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))

From 8c446baf67274b40d0b0a8f6c6ac2bc1d2ddab92 Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Tue, 26 Sep 2023 07:04:50 +0530
Subject: [PATCH 3/5] chore: address review comment

---
 waku/v2/peermanager/peer_manager.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go
index c903d7272..68066ec74 100644
--- a/waku/v2/peermanager/peer_manager.go
+++ b/waku/v2/peermanager/peer_manager.go
@@ -69,7 +69,7 @@ func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerM
 	inRelayPeersTarget, outRelayPeersTarget := inAndOutRelayPeers(maxRelayPeers)
 
 	if maxPeers == 0 || maxConnections > maxPeers {
-		maxPeers = 5 * maxConnections
+		maxPeers = maxConnsToPeerRatio * maxConnections
 	}
 
 	pm := &PeerManager{

From 82c0f4a6f6ce40eff91167539506091bf9e014b1 Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Tue, 26 Sep 2023 09:54:37 +0530
Subject: [PATCH 4/5] fix issue with build

---
 waku/v2/peermanager/peer_manager.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go
index 68066ec74..f5042a638 100644
--- a/waku/v2/peermanager/peer_manager.go
+++ b/waku/v2/peermanager/peer_manager.go
@@ -45,6 +45,7 @@ type PeerManager struct {
 
 const peerConnectivityLoopSecs = 15
 const relayOptimalPeersPerShard = 6
+const maxConnsToPeerRatio = 5
 
 // 80% relay peers 20% service peers
 func relayAndServicePeers(maxConnections int) (int, int) {

From a86466a255a78cdd08e3a072755e8fc4db186eab Mon Sep 17 00:00:00 2001
From: Prem Chaitanya Prathi
Date: Wed, 27 Sep 2023 12:05:01 +0530
Subject: [PATCH 5/5] fix: expire peer addresses discovered dynamically, remove
 them from peerStore after expiry

---
 waku/v2/peermanager/peer_manager.go | 43 ++++++++++++++++++++++++-----
 1 file changed, 36 insertions(+), 7 deletions(-)

diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go
index 6ee0d3c81..042d3de99 100644
--- a/waku/v2/peermanager/peer_manager.go
+++ b/waku/v2/peermanager/peer_manager.go
@@ -24,6 +24,7 @@ import (
 	"go.uber.org/zap"
 )
 
+// NodeTopicDetails stores pubSubTopic related data like topicHandle for the node.
 type NodeTopicDetails struct {
 	topic *pubsub.Topic
 }
@@ -47,7 +48,6 @@ type PeerManager struct {
 const peerConnectivityLoopSecs = 15
 const maxConnsToPeerRatio = 5
-
 // 80% relay peers 20% service peers
 func relayAndServicePeers(maxConnections int) (int, int) {
 	return maxConnections - maxConnections/5, maxConnections / 5
 }
@@ -148,6 +148,8 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers p
 	return inPeers, outPeers, nil
 }
 
+// getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specificPeers.
+// If specificPeers is empty, it checks within all peers in peerStore.
 func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
 	//Group peers by their connected direction inbound or outbound.
 	inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers)
@@ -167,6 +169,9 @@ func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer
 	return
 }
 
+// ensureMinRelayConnsPerTopic makes sure there is a minimum of D connections per pubsubTopic.
+// If not, it will look into the peerStore to initiate more connections.
+// If the peerStore doesn't have enough peers, it will wait for discv5 to find more and try in the next cycle.
 func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
 	pm.topicMutex.RLock()
 	defer pm.topicMutex.RUnlock()
@@ -194,6 +199,9 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
 	}
 }
 
+// connectToRelayPeers ensures a minimum of D connections for each pubSubTopic.
+// If not, it initiates connections to additional peers.
+// It also checks for incoming relay connections and prunes them once they cross the inRelayPeersTarget.
 func (pm *PeerManager) connectToRelayPeers() {
 	//Check for out peer connections and connect to more peers.
 	pm.ensureMinRelayConnsPerTopic()
@@ -208,22 +216,36 @@ func (pm *PeerManager) connectToRelayPeers() {
 	}
 }
 
-func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) PeerData {
-	return PeerData{
+// addrInfoToPeerData returns the address info for a peer.
+// If the addresses have expired, it removes the peer from the host peerStore and returns nil.
+func addrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host) *PeerData {
+	addrs := host.Peerstore().Addrs(peerID)
+	if len(addrs) == 0 {
+		//Addresses expired, remove peer from peerStore
+		host.Peerstore().RemovePeer(peerID)
+		return nil
+	}
+	return &PeerData{
 		Origin: origin,
 		AddrInfo: peer.AddrInfo{
 			ID: peerID,
-			Addrs: host.Peerstore().Addrs(peerID),
+			Addrs: addrs,
 		},
 	}
 }
+
+// connectToPeers connects to peers provided in the list if the addresses have not expired.
 func (pm *PeerManager) connectToPeers(peers peer.IDSlice) {
 	for _, peerID := range peers {
 		peerData := addrInfoToPeerData(wps.PeerManager, peerID, pm.host)
-		pm.peerConnector.PushToChan(peerData)
+		if peerData == nil {
+			continue
+		}
+		pm.peerConnector.PushToChan(*peerData)
 	}
 }
 
+// getNotConnectedPers returns peers for a pubSubTopic that are not connected.
 func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeers peer.IDSlice) {
 	var peerList peer.IDSlice
 	if pubsubTopic == "" {
@@ -239,10 +261,11 @@ func (pm *PeerManager) getNotConnectedPers(pubsubTopic string) (notConnectedPeer
 	return
 }
 
+// pruneInRelayConns prunes any incoming relay connections crossing the derived inRelayPeersTarget.
 func (pm *PeerManager) pruneInRelayConns(inRelayPeers peer.IDSlice) {
 
 	//Start disconnecting peers, based on what?
-	//For now, just disconnect most recently connected peers
+	//For now, no preference is used.
 	//TODO: Need to have a more intelligent way of doing this, maybe peer scores.
 	//TODO: Keep optimalPeersRequired for a pubSubTopic in mind while pruning connections to peers.
 	pm.logger.Info("peer connections exceed target relay peers, hence pruning",
@@ -314,7 +337,13 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig
 		return errors.New("peer store capacity reached")
 	}
 	pm.logger.Info("adding peer to peerstore", logging.HostID("peer", ID))
-	pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
+	if origin == wps.Static {
+		pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.PermanentAddrTTL)
+	} else {
+		//Need to re-evaluate the address expiry.
+		// For now, expiring them with the default AddressTTL, which is an hour.
+		pm.host.Peerstore().AddAddrs(ID, addrs, peerstore.AddressTTL)
+	}
 	err := pm.host.Peerstore().(wps.WakuPeerstore).SetOrigin(ID, origin)
 	if err != nil {
 		pm.logger.Error("could not set origin", zap.Error(err), logging.HostID("peer", ID))
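
Taken together, patches 1-4 expose a single knob: the `--peer-store-capacity` flag (env `WAKUNODE2_PEERSTORE_CAPACITY`) flows through `NodeOptions` into `node.WithPeerStoreCapacity`, and `NewPeerManager` falls back to `maxConnsToPeerRatio * maxConnections` whenever the capacity is zero or below the connection limit. A minimal sketch of how an embedding application might exercise the new option; the import path is assumed from go-waku's module layout, and the values are illustrative, not taken from this series:

```go
package main

import (
	"log"

	"github.com/waku-org/go-waku/waku/v2/node"
)

func main() {
	// Allow up to 50 live connections while remembering up to 300 peers.
	// Passing 0 (or any capacity below the connection limit) would instead
	// trigger the default: maxConnsToPeerRatio * maxConnections = 5 * 50 = 250.
	wakuNode, err := node.New(
		node.WithMaxPeerConnections(50),
		node.WithPeerStoreCapacity(300),
	)
	if err != nil {
		log.Fatal(err)
	}

	_ = wakuNode // start/stop omitted; this only shows the option wiring
}
```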
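Patch 5's expiry logic leans on the libp2p peerstore's TTL semantics: addresses written with a finite TTL drop out of `Addrs()` once the TTL lapses, and `addrInfoToPeerData` treats an empty address list as the cue to call `RemovePeer`. A standalone sketch of that behavior under those assumptions, using an in-memory peerstore and a one-second TTL in place of the one-hour `peerstore.AddressTTL` the patch uses:

```go
package main

import (
	"crypto/rand"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	ps, err := pstoremem.NewPeerstore()
	if err != nil {
		panic(err)
	}
	defer ps.Close()

	// Fabricate a peer ID and address purely for the demo.
	priv, _, _ := crypto.GenerateEd25519Key(rand.Reader)
	id, _ := peer.IDFromPrivateKey(priv)
	addr := ma.StringCast("/ip4/127.0.0.1/tcp/60000")

	// Discovered peers get a finite TTL (the patch uses peerstore.AddressTTL, one hour).
	ps.AddAddrs(id, []ma.Multiaddr{addr}, time.Second)
	fmt.Println("addrs before expiry:", len(ps.Addrs(id))) // 1

	time.Sleep(2 * time.Second)

	// After expiry the address list is empty; this is the condition
	// addrInfoToPeerData checks before removing the peer.
	if len(ps.Addrs(id)) == 0 {
		ps.RemovePeer(id)
		fmt.Println("peer removed after address expiry")
	}

	// Static peers, by contrast, are stored with peerstore.PermanentAddrTTL
	// in addPeer and never fall out of the address book.
	_ = peerstore.PermanentAddrTTL
}
```

Note the asymmetry the patch introduces: only statically configured peers keep permanent addresses; anything discovered at runtime must be re-discovered, or reconnected, within the TTL window or it is dropped from the peerstore.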