Skip to content

Commit 568711a

Browse files
feat: add peer connections subcommand
1 parent 380e7d5 commit 568711a

File tree

4 files changed

+233
-1
lines changed

4 files changed

+233
-1
lines changed

app/client/cli/peer/connections.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package peer
2+
3+
import (
4+
"fmt"
5+
"github.com/pokt-network/pocket/p2p/debug"
6+
"github.com/pokt-network/pocket/shared/messaging"
7+
"github.com/spf13/cobra"
8+
"google.golang.org/protobuf/types/known/anypb"
9+
10+
"github.com/pokt-network/pocket/app/client/cli/helpers"
11+
)
12+
13+
var (
14+
connectionsCmd = &cobra.Command{
15+
Use: "connections",
16+
Short: "Print open peer connections",
17+
RunE: connectionsRunE,
18+
}
19+
)
20+
21+
func init() {
22+
PeerCmd.AddCommand(connectionsCmd)
23+
}
24+
25+
func connectionsRunE(cmd *cobra.Command, _ []string) error {
26+
var routerType debug.RouterType
27+
28+
bus, err := helpers.GetBusFromCmd(cmd)
29+
if err != nil {
30+
return err
31+
}
32+
33+
switch {
34+
case stakedFlag:
35+
if unstakedFlag || allFlag {
36+
return ErrRouterType
37+
}
38+
routerType = debug.StakedRouterType
39+
case unstakedFlag:
40+
if stakedFlag || allFlag {
41+
return ErrRouterType
42+
}
43+
routerType = debug.UnstakedRouterType
44+
// even if `allFlag` is false, we still want to print all peers
45+
default:
46+
if stakedFlag || unstakedFlag {
47+
return ErrRouterType
48+
}
49+
routerType = debug.AllRouterTypes
50+
}
51+
52+
debugMsg := &messaging.DebugMessage{
53+
Action: messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS,
54+
Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST,
55+
Message: &anypb.Any{
56+
Value: []byte(routerType),
57+
},
58+
}
59+
debugMsgAny, err := anypb.New(debugMsg)
60+
if err != nil {
61+
return fmt.Errorf("creating anypb from debug message: %w", err)
62+
}
63+
64+
if localFlag {
65+
if err := debug.PrintPeerConnections(bus, routerType); err != nil {
66+
return fmt.Errorf("printing peer list: %w", err)
67+
}
68+
return nil
69+
}
70+
71+
// TECHDEBT(#810, #811): will need to wait for DHT bootstrapping to complete before
72+
// p2p broadcast can be used with to reach unstaked actors.
73+
// CONSIDERATION: add the peer commands to the interactive CLI as the P2P module
74+
// instance could persist between commands. Other interactive CLI commands which
75+
// rely on unstaked actor router broadcast are working as expected.
76+
77+
// TECHDEBT(#810, #811): use broadcast instead to reach all peers.
78+
return sendToStakedPeers(cmd, debugMsgAny)
79+
}

p2p/debug.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ import (
99

1010
func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
1111
switch msg.Action {
12-
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
12+
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST,
13+
messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS:
1314
if !m.cfg.EnablePeerDiscoveryDebugRpc {
1415
return typesP2P.ErrPeerDiscoveryDebugRPCDisabled
1516
}
@@ -22,6 +23,9 @@ func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
2223
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
2324
routerType := debug.RouterType(msg.Message.Value)
2425
return debug.PrintPeerList(m.GetBus(), routerType)
26+
case messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS:
27+
routerType := debug.RouterType(msg.Message.Value)
28+
return debug.PrintPeerConnections(m.GetBus(), routerType)
2529
default:
2630
return fmt.Errorf("unsupported P2P debug message action: %s", msg.Action)
2731
}

p2p/debug/connections.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package debug
2+
3+
import (
4+
"fmt"
5+
libp2pPeer "github.com/libp2p/go-libp2p/core/peer"
6+
"os"
7+
"strconv"
8+
9+
libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
10+
"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
11+
typesP2P "github.com/pokt-network/pocket/p2p/types"
12+
"github.com/pokt-network/pocket/p2p/utils"
13+
"github.com/pokt-network/pocket/shared/modules"
14+
)
15+
16+
var printConnectionsHeader = []string{"Peer ID", "Multiaddr", "Opened", "Direction", "NumStreams"}
17+
18+
func PrintPeerConnections(bus modules.Bus, routerType RouterType) error {
19+
var (
20+
connections []libp2pNetwork.Conn
21+
routerPlurality = ""
22+
)
23+
24+
if routerType == AllRouterTypes {
25+
routerPlurality = "s"
26+
}
27+
28+
connections, err := getFilteredConnections(bus, routerType)
29+
if err != nil {
30+
return fmt.Errorf("getting connecions: %w", err)
31+
}
32+
33+
if err := LogSelfAddress(bus); err != nil {
34+
return fmt.Errorf("printing self address: %w", err)
35+
}
36+
37+
// NB: Intentionally printing with `fmt` instead of the logger to match
38+
// `utils.PrintPeerListTable` which does not use the logger due to
39+
// incompatibilities with the tabwriter.
40+
// (This doesn't seem to work as expected; i.e. not printing at all in tilt.)
41+
if _, err := fmt.Fprintf(
42+
os.Stdout,
43+
"%s router peerstore%s:\n",
44+
routerType,
45+
routerPlurality,
46+
); err != nil {
47+
return fmt.Errorf("printing to stdout: %w", err)
48+
}
49+
50+
if err := PrintConnectionsTable(connections); err != nil {
51+
return fmt.Errorf("printing peer list: %w", err)
52+
}
53+
return nil
54+
}
55+
56+
func PrintConnectionsTable(conns []libp2pNetwork.Conn) error {
57+
return utils.PrintTable(printConnectionsHeader, peerConnsRowConsumerFactory(conns))
58+
}
59+
60+
func getFilteredConnections(
61+
bus modules.Bus,
62+
routerType RouterType,
63+
) ([]libp2pNetwork.Conn, error) {
64+
var (
65+
pstore typesP2P.Peerstore
66+
idsToInclude map[libp2pPeer.ID]struct{}
67+
p2pModule = bus.GetP2PModule()
68+
connections = p2pModule.GetConnections()
69+
)
70+
71+
// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider
72+
// is retrievable as a proper submodule.
73+
pstoreProviderModule, err := bus.GetModulesRegistry().
74+
GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
75+
if err != nil {
76+
return nil, fmt.Errorf("getting peerstore provider: %w", err)
77+
}
78+
pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider)
79+
if !ok {
80+
return nil, fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule)
81+
}
82+
//--
83+
84+
switch routerType {
85+
case AllRouterTypes:
86+
// return early; no need to filter
87+
return connections, nil
88+
case StakedRouterType:
89+
pstore, err = pstoreProvider.GetStakedPeerstoreAtCurrentHeight()
90+
if err != nil {
91+
return nil, fmt.Errorf("getting staked peerstore: %w", err)
92+
}
93+
case UnstakedRouterType:
94+
pstore, err = pstoreProvider.GetUnstakedPeerstore()
95+
if err != nil {
96+
return nil, fmt.Errorf("getting unstaked peerstore: %w", err)
97+
}
98+
}
99+
100+
idsToInclude, err = getPeerIDs(pstore.GetPeerList())
101+
if err != nil {
102+
return nil, fmt.Errorf("getting peer IDs: %w", err)
103+
}
104+
105+
for _, conn := range connections {
106+
if _, ok := idsToInclude[conn.RemotePeer()]; !ok {
107+
// remote peer ID not in `idsToInclude` set; filter connection out
108+
connections = append(connections[:], connections[1:]...)
109+
}
110+
}
111+
return connections, nil
112+
}
113+
114+
func peerConnsRowConsumerFactory(conns []libp2pNetwork.Conn) utils.RowConsumer {
115+
return func(provideRow utils.RowProvider) error {
116+
for _, conn := range conns {
117+
err := provideRow(
118+
conn.RemotePeer().String(),
119+
conn.RemoteMultiaddr().String(),
120+
conn.Stat().Opened.String(),
121+
conn.Stat().Direction.String(),
122+
strconv.Itoa(conn.Stat().NumStreams),
123+
)
124+
if err != nil {
125+
return err
126+
}
127+
}
128+
return nil
129+
}
130+
}
131+
132+
func getPeerIDs(peers []typesP2P.Peer) (ids map[libp2pPeer.ID]struct{}, err error) {
133+
for _, peer := range peers {
134+
addrInfo, err := utils.Libp2pAddrInfoFromPeer(peer)
135+
if err != nil {
136+
return nil, err
137+
}
138+
139+
// ID already in set; continue
140+
if _, ok := ids[addrInfo.ID]; !ok {
141+
continue
142+
}
143+
144+
// add ID to set
145+
ids[addrInfo.ID] = struct{}{}
146+
}
147+
return ids, nil
148+
}

shared/messaging/proto/debug_message.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ enum DebugMessageAction {
2323
DEBUG_PERSISTENCE_CLEAR_STATE = 8;
2424
DEBUG_PERSISTENCE_RESET_TO_GENESIS = 9;
2525
DEBUG_P2P_PEER_LIST = 10;
26+
DEBUG_P2P_PEER_CONNECTIONS = 11;
2627
}
2728

2829
message DebugMessage {

0 commit comments

Comments
 (0)