Skip to content

Commit 578684e

Browse files
authored
p2p: support EnableGossipService in p2p streams (algorand#6073)
1 parent e697ae8 commit 578684e

File tree

3 files changed

+163
-35
lines changed

3 files changed

+163
-35
lines changed

network/p2p/p2p.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func configureResourceManager(cfg config.Local) (network.ResourceManager, error)
159159
// MakeService creates a P2P service instance
160160
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, bootstrapPeers []*peer.AddrInfo) (*serviceImpl, error) {
161161

162-
sm := makeStreamManager(ctx, log, h, wsStreamHandler)
162+
sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService)
163163
h.Network().Notify(sm)
164164
h.SetStreamHandler(AlgorandWsProtocol, sm.streamHandler)
165165

network/p2p/streams.go

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ import (
3030

3131
// streamManager implements network.Notifiee to create and manage streams for use with non-gossipsub protocols.
3232
type streamManager struct {
33-
ctx context.Context
34-
log logging.Logger
35-
host host.Host
36-
handler StreamHandler
33+
ctx context.Context
34+
log logging.Logger
35+
host host.Host
36+
handler StreamHandler
37+
allowIncomingGossip bool
3738

3839
streams map[peer.ID]network.Stream
3940
streamsLock deadlock.Mutex
@@ -42,18 +43,25 @@ type streamManager struct {
4243
// StreamHandler is called when a new bidirectional stream for a given protocol and peer is opened.
4344
type StreamHandler func(ctx context.Context, pid peer.ID, s network.Stream, incoming bool)
4445

45-
func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler) *streamManager {
46+
func makeStreamManager(ctx context.Context, log logging.Logger, h host.Host, handler StreamHandler, allowIncomingGossip bool) *streamManager {
4647
return &streamManager{
47-
ctx: ctx,
48-
log: log,
49-
host: h,
50-
handler: handler,
51-
streams: make(map[peer.ID]network.Stream),
48+
ctx: ctx,
49+
log: log,
50+
host: h,
51+
handler: handler,
52+
allowIncomingGossip: allowIncomingGossip,
53+
streams: make(map[peer.ID]network.Stream),
5254
}
5355
}
5456

5557
// streamHandler is called by libp2p when a new stream is accepted
5658
func (n *streamManager) streamHandler(stream network.Stream) {
59+
if stream.Conn().Stat().Direction == network.DirInbound && !n.allowIncomingGossip {
60+
n.log.Debugf("rejecting stream from incoming connection from %s", stream.Conn().RemotePeer().String())
61+
stream.Close()
62+
return
63+
}
64+
5765
n.streamsLock.Lock()
5866
defer n.streamsLock.Unlock()
5967

@@ -74,15 +82,7 @@ func (n *streamManager) streamHandler(stream network.Stream) {
7482
}
7583
n.streams[stream.Conn().RemotePeer()] = stream
7684

77-
// streamHandler is supposed to be called for accepted streams, so we expect incoming here
7885
incoming := stream.Conn().Stat().Direction == network.DirInbound
79-
if !incoming {
80-
if stream.Stat().Direction == network.DirUnknown {
81-
n.log.Warnf("Unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
82-
} else {
83-
n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
84-
}
85-
}
8686
n.handler(n.ctx, remotePeer, stream, incoming)
8787
return
8888
}
@@ -92,20 +92,18 @@ func (n *streamManager) streamHandler(stream network.Stream) {
9292
}
9393
// no old stream
9494
n.streams[stream.Conn().RemotePeer()] = stream
95-
// streamHandler is supposed to be called for accepted streams, so we expect incoming here
9695
incoming := stream.Conn().Stat().Direction == network.DirInbound
97-
if !incoming {
98-
if stream.Stat().Direction == network.DirUnknown {
99-
n.log.Warnf("streamHandler: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
100-
} else {
101-
n.log.Warnf("Unexpected outgoing stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
102-
}
103-
}
10496
n.handler(n.ctx, remotePeer, stream, incoming)
10597
}
10698

10799
// Connected is called when a connection is opened
100+
// for both incoming (listener -> addConn) and outgoing (dialer -> addConn) connections.
108101
func (n *streamManager) Connected(net network.Network, conn network.Conn) {
102+
if conn.Stat().Direction == network.DirInbound && !n.allowIncomingGossip {
103+
n.log.Debugf("ignoring incoming connection from %s", conn.RemotePeer().String())
104+
return
105+
}
106+
109107
remotePeer := conn.RemotePeer()
110108
localPeer := n.host.ID()
111109

@@ -138,15 +136,7 @@ func (n *streamManager) Connected(net network.Network, conn network.Conn) {
138136
needUnlock = false
139137
n.streamsLock.Unlock()
140138

141-
// a new stream created above, expected direction is outbound
142139
incoming := stream.Conn().Stat().Direction == network.DirInbound
143-
if incoming {
144-
n.log.Warnf("Unexpected incoming stream in streamHandler for connection %s (%s): %s vs %s stream", stream.Conn().ID(), remotePeer, stream.Conn().Stat().Direction, stream.Stat().Direction.String())
145-
} else {
146-
if stream.Stat().Direction == network.DirUnknown {
147-
n.log.Warnf("Connected: unknown direction for a steam %s to/from %s", stream.ID(), remotePeer)
148-
}
149-
}
150140
n.handler(n.ctx, remotePeer, stream, incoming)
151141
}
152142

network/p2pNetwork_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,3 +1207,141 @@ func TestP2PwsStreamHandlerDedup(t *testing.T) {
12071207
require.False(t, netA.hasPeers())
12081208
require.False(t, netB.hasPeers())
12091209
}
1210+
1211+
// TestP2PEnableGossipService_NodeDisable ensures that a node with EnableGossipService=false
1212+
// still can participate in the network by sending and receiving messages.
1213+
func TestP2PEnableGossipService_NodeDisable(t *testing.T) {
1214+
partitiontest.PartitionTest(t)
1215+
1216+
log := logging.TestingLog(t)
1217+
1218+
// prepare configs
1219+
cfg := config.GetDefaultLocal()
1220+
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
1221+
1222+
relayCfg := cfg
1223+
relayCfg.NetAddress = "127.0.0.1:0"
1224+
1225+
nodeCfg := cfg
1226+
nodeCfg.EnableGossipService = false
1227+
nodeCfg2 := nodeCfg
1228+
nodeCfg2.NetAddress = "127.0.0.1:0"
1229+
1230+
tests := []struct {
1231+
name string
1232+
relayCfg config.Local
1233+
nodeCfg config.Local
1234+
}{
1235+
{"non-listening-node", relayCfg, nodeCfg},
1236+
{"listening-node", relayCfg, nodeCfg2},
1237+
}
1238+
for _, test := range tests {
1239+
t.Run(test.name, func(t *testing.T) {
1240+
relayCfg := test.relayCfg
1241+
netA, err := NewP2PNetwork(log, relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
1242+
require.NoError(t, err)
1243+
netA.Start()
1244+
defer netA.Stop()
1245+
1246+
peerInfoA := netA.service.AddrInfo()
1247+
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
1248+
require.NoError(t, err)
1249+
require.NotZero(t, addrsA[0])
1250+
multiAddrStr := addrsA[0].String()
1251+
phoneBookAddresses := []string{multiAddrStr}
1252+
1253+
// start netB with gossip service disabled
1254+
nodeCfg := test.nodeCfg
1255+
netB, err := NewP2PNetwork(log, nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
1256+
require.NoError(t, err)
1257+
netB.Start()
1258+
defer netB.Stop()
1259+
1260+
require.Eventually(t, func() bool {
1261+
return netA.hasPeers() && netB.hasPeers()
1262+
}, 1*time.Second, 50*time.Millisecond)
1263+
1264+
testTag := protocol.AgreementVoteTag
1265+
1266+
var handlerCountA atomic.Uint32
1267+
passThroughHandlerA := []TaggedMessageHandler{
1268+
{Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
1269+
handlerCountA.Add(1)
1270+
return OutgoingMessage{Action: Broadcast}
1271+
})},
1272+
}
1273+
var handlerCountB atomic.Uint32
1274+
passThroughHandlerB := []TaggedMessageHandler{
1275+
{Tag: testTag, MessageHandler: HandlerFunc(func(msg IncomingMessage) OutgoingMessage {
1276+
handlerCountB.Add(1)
1277+
return OutgoingMessage{Action: Broadcast}
1278+
})},
1279+
}
1280+
netA.RegisterHandlers(passThroughHandlerA)
1281+
netB.RegisterHandlers(passThroughHandlerB)
1282+
1283+
// send messages from both nodes to each other and confirm they are received.
1284+
for i := 0; i < 10; i++ {
1285+
err = netA.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from A %d", i)), false, nil)
1286+
require.NoError(t, err)
1287+
err = netB.Broadcast(context.Background(), testTag, []byte(fmt.Sprintf("hello from B %d", i)), false, nil)
1288+
require.NoError(t, err)
1289+
}
1290+
1291+
require.Eventually(
1292+
t,
1293+
func() bool {
1294+
return handlerCountA.Load() == 10 && handlerCountB.Load() == 10
1295+
},
1296+
2*time.Second,
1297+
50*time.Millisecond,
1298+
)
1299+
})
1300+
}
1301+
}
1302+
1303+
// TestP2PEnableGossipService_BothDisable checks if both relay and node have EnableGossipService=false
1304+
// they do not gossip to each other.
1305+
//
1306+
// Note, this test checks a configuration where node A (relay) does not know about node B,
1307+
// and node B is configured to connect to A, and this scenario rejecting logic is guaranteed to work.
1308+
func TestP2PEnableGossipService_BothDisable(t *testing.T) {
1309+
partitiontest.PartitionTest(t)
1310+
1311+
log := logging.TestingLog(t)
1312+
1313+
// prepare configs
1314+
cfg := config.GetDefaultLocal()
1315+
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses
1316+
cfg.EnableGossipService = false // disable gossip service by default
1317+
1318+
relayCfg := cfg
1319+
relayCfg.NetAddress = "127.0.0.1:0"
1320+
1321+
netA, err := NewP2PNetwork(log.With("net", "netA"), relayCfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
1322+
require.NoError(t, err)
1323+
netA.Start()
1324+
defer netA.Stop()
1325+
1326+
peerInfoA := netA.service.AddrInfo()
1327+
addrsA, err := peer.AddrInfoToP2pAddrs(&peerInfoA)
1328+
require.NoError(t, err)
1329+
require.NotZero(t, addrsA[0])
1330+
multiAddrStr := addrsA[0].String()
1331+
phoneBookAddresses := []string{multiAddrStr}
1332+
1333+
nodeCfg := cfg
1334+
nodeCfg.NetAddress = ""
1335+
1336+
netB, err := NewP2PNetwork(log.With("net", "netB"), nodeCfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
1337+
require.NoError(t, err)
1338+
netB.Start()
1339+
defer netB.Stop()
1340+
1341+
require.Eventually(t, func() bool {
1342+
return len(netA.service.Conns()) > 0 && len(netB.service.Conns()) > 0
1343+
}, 1*time.Second, 50*time.Millisecond)
1344+
1345+
require.False(t, netA.hasPeers())
1346+
require.False(t, netB.hasPeers())
1347+
}

0 commit comments

Comments
 (0)