From f710d6e560a0189fcb513119a2d4cc2b8be287d5 Mon Sep 17 00:00:00 2001 From: Bryan White Date: Tue, 6 Jun 2023 09:18:09 +0200 Subject: [PATCH] feat: add `peer connections` subcommand --- app/client/cli/peer/connections.go | 79 +++++++++++ p2p/debug.go | 6 +- p2p/debug/connections.go | 148 +++++++++++++++++++++ shared/messaging/proto/debug_message.proto | 1 + 4 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 app/client/cli/peer/connections.go create mode 100644 p2p/debug/connections.go diff --git a/app/client/cli/peer/connections.go b/app/client/cli/peer/connections.go new file mode 100644 index 000000000..b966207ff --- /dev/null +++ b/app/client/cli/peer/connections.go @@ -0,0 +1,79 @@ +package peer + +import ( + "fmt" + "github.com/pokt-network/pocket/p2p/debug" + "github.com/pokt-network/pocket/shared/messaging" + "github.com/spf13/cobra" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/pokt-network/pocket/app/client/cli/helpers" +) + +var ( + connectionsCmd = &cobra.Command{ + Use: "connections", + Short: "Print open peer connections", + RunE: connectionsRunE, + } +) + +func init() { + PeerCmd.AddCommand(connectionsCmd) +} + +func connectionsRunE(cmd *cobra.Command, _ []string) error { + var routerType debug.RouterType + + bus, err := helpers.GetBusFromCmd(cmd) + if err != nil { + return err + } + + switch { + case stakedFlag: + if unstakedFlag || allFlag { + return ErrRouterType + } + routerType = debug.StakedRouterType + case unstakedFlag: + if stakedFlag || allFlag { + return ErrRouterType + } + routerType = debug.UnstakedRouterType + // even if `allFlag` is false, we still want to print all peers + default: + if stakedFlag || unstakedFlag { + return ErrRouterType + } + routerType = debug.AllRouterTypes + } + + debugMsg := &messaging.DebugMessage{ + Action: messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS, + Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST, + Message: &anypb.Any{ + Value: []byte(routerType), + }, + } + debugMsgAny, err := anypb.New(debugMsg) + if err != nil { + return fmt.Errorf("creating anypb from debug message: %w", err) + } + + if localFlag { + if err := debug.PrintPeerConnections(bus, routerType); err != nil { + return fmt.Errorf("printing peer list: %w", err) + } + return nil + } + + // TECHDEBT(#810, #811): will need to wait for DHT bootstrapping to complete before + // p2p broadcast can be used with to reach unstaked actors. + // CONSIDERATION: add the peer commands to the interactive CLI as the P2P module + // instance could persist between commands. Other interactive CLI commands which + // rely on unstaked actor router broadcast are working as expected. + + // TECHDEBT(#810, #811): use broadcast instead to reach all peers. + return sendToStakedPeers(cmd, debugMsgAny) +} diff --git a/p2p/debug.go b/p2p/debug.go index 7c352eb5a..e4da506a1 100644 --- a/p2p/debug.go +++ b/p2p/debug.go @@ -10,7 +10,8 @@ import ( func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error { switch msg.Action { - case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST: + case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST, + messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS: if !m.cfg.EnablePeerDiscoveryDebugRpc { return typesP2P.ErrPeerDiscoveryDebugRPCDisabled } @@ -23,6 +24,9 @@ func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error { case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST: routerType := debug.RouterType(msg.Message.Value) return debug.PrintPeerList(m.GetBus(), routerType) + case messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS: + routerType := debug.RouterType(msg.Message.Value) + return debug.PrintPeerConnections(m.GetBus(), routerType) default: return fmt.Errorf("unsupported P2P debug message action: %s", msg.Action) } diff --git a/p2p/debug/connections.go b/p2p/debug/connections.go new file mode 100644 index 000000000..a0dde210c --- /dev/null +++ b/p2p/debug/connections.go @@ -0,0 +1,148 @@ +package debug + +import ( + "fmt" + libp2pPeer "github.com/libp2p/go-libp2p/core/peer" + "os" + "strconv" + + libp2pNetwork "github.com/libp2p/go-libp2p/core/network" + "github.com/pokt-network/pocket/p2p/providers/peerstore_provider" + typesP2P "github.com/pokt-network/pocket/p2p/types" + "github.com/pokt-network/pocket/p2p/utils" + "github.com/pokt-network/pocket/shared/modules" +) + +var printConnectionsHeader = []string{"Peer ID", "Multiaddr", "Opened", "Direction", "NumStreams"} + +func PrintPeerConnections(bus modules.Bus, routerType RouterType) error { + var ( + connections []libp2pNetwork.Conn + routerPlurality = "" + ) + + if routerType == AllRouterTypes { + routerPlurality = "s" + } + + connections, err := getFilteredConnections(bus, routerType) + if err != nil { + return fmt.Errorf("getting connecions: %w", err) + } + + if err := LogSelfAddress(bus); err != nil { + return fmt.Errorf("printing self address: %w", err) + } + + // NB: Intentionally printing with `fmt` instead of the logger to match + // `utils.PrintPeerListTable` which does not use the logger due to + // incompatibilities with the tabwriter. + // (This doesn't seem to work as expected; i.e. not printing at all in tilt.) + if _, err := fmt.Fprintf( + os.Stdout, + "%s router peerstore%s:\n", + routerType, + routerPlurality, + ); err != nil { + return fmt.Errorf("printing to stdout: %w", err) + } + + if err := PrintConnectionsTable(connections); err != nil { + return fmt.Errorf("printing peer list: %w", err) + } + return nil +} + +func PrintConnectionsTable(conns []libp2pNetwork.Conn) error { + return utils.PrintTable(printConnectionsHeader, peerConnsRowConsumerFactory(conns)) +} + +func getFilteredConnections( + bus modules.Bus, + routerType RouterType, +) ([]libp2pNetwork.Conn, error) { + var ( + pstore typesP2P.Peerstore + idsToInclude map[libp2pPeer.ID]struct{} + p2pModule = bus.GetP2PModule() + connections = p2pModule.GetConnections() + ) + + // TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider + // is retrievable as a proper submodule. + pstoreProviderModule, err := bus.GetModulesRegistry(). + GetModule(peerstore_provider.PeerstoreProviderSubmoduleName) + if err != nil { + return nil, fmt.Errorf("getting peerstore provider: %w", err) + } + pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider) + if !ok { + return nil, fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule) + } + //-- + + switch routerType { + case AllRouterTypes: + // return early; no need to filter + return connections, nil + case StakedRouterType: + pstore, err = pstoreProvider.GetStakedPeerstoreAtCurrentHeight() + if err != nil { + return nil, fmt.Errorf("getting staked peerstore: %w", err) + } + case UnstakedRouterType: + pstore, err = pstoreProvider.GetUnstakedPeerstore() + if err != nil { + return nil, fmt.Errorf("getting unstaked peerstore: %w", err) + } + } + + idsToInclude, err = getPeerIDs(pstore.GetPeerList()) + if err != nil { + return nil, fmt.Errorf("getting peer IDs: %w", err) + } + + for _, conn := range connections { + if _, ok := idsToInclude[conn.RemotePeer()]; !ok { + // remote peer ID not in `idsToInclude` set; filter connection out + connections = append(connections[:], connections[1:]...) + } + } + return connections, nil +} + +func peerConnsRowConsumerFactory(conns []libp2pNetwork.Conn) utils.RowConsumer { + return func(provideRow utils.RowProvider) error { + for _, conn := range conns { + err := provideRow( + conn.RemotePeer().String(), + conn.RemoteMultiaddr().String(), + conn.Stat().Opened.String(), + conn.Stat().Direction.String(), + strconv.Itoa(conn.Stat().NumStreams), + ) + if err != nil { + return err + } + } + return nil + } +} + +func getPeerIDs(peers []typesP2P.Peer) (ids map[libp2pPeer.ID]struct{}, err error) { + for _, peer := range peers { + addrInfo, err := utils.Libp2pAddrInfoFromPeer(peer) + if err != nil { + return nil, err + } + + // ID already in set; continue + if _, ok := ids[addrInfo.ID]; !ok { + continue + } + + // add ID to set + ids[addrInfo.ID] = struct{}{} + } + return ids, nil +} diff --git a/shared/messaging/proto/debug_message.proto b/shared/messaging/proto/debug_message.proto index 55a87c695..9019ab85e 100644 --- a/shared/messaging/proto/debug_message.proto +++ b/shared/messaging/proto/debug_message.proto @@ -23,6 +23,7 @@ enum DebugMessageAction { DEBUG_PERSISTENCE_CLEAR_STATE = 8; DEBUG_PERSISTENCE_RESET_TO_GENESIS = 9; DEBUG_P2P_PEER_LIST = 10; + DEBUG_P2P_PEER_CONNECTIONS = 11; } message DebugMessage {