Skip to content

Commit 2b6e018

Browse files
authored
node: clear new p2p net handlers on fast catchup (algorand#6127)
1 parent 90353e5 commit 2b6e018

File tree

9 files changed

+65
-25
lines changed

9 files changed

+65
-25
lines changed

data/txHandler.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -257,15 +257,7 @@ func (handler *TxHandler) Start() {
257257

258258
// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
259259
handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{
260-
{
261-
Tag: protocol.TxnTag,
262-
// create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface
263-
MessageHandler: struct {
264-
network.ValidateHandleFunc
265-
}{
266-
network.ValidateHandleFunc(handler.validateIncomingTxMessage),
267-
},
268-
},
260+
{Tag: protocol.TxnTag, MessageHandler: network.ValidateHandleFunc(handler.validateIncomingTxMessage)},
269261
})
270262

271263
handler.backlogWg.Add(2)

network/gossipNode.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ type GossipNode interface {
8686
// Currently used as p2p pubsub topic validators.
8787
RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler)
8888

89-
// ClearProcessors deregisters all the existing message processors.
90-
ClearProcessors()
89+
// ClearValidatorHandlers deregisters all the existing message processors.
90+
ClearValidatorHandlers()
9191

9292
// GetHTTPClient returns a http.Client with a suitable for the network Transport
9393
// that would also limit the number of outgoing connections.

network/hybridNetwork.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,10 @@ func (n *HybridP2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageVal
203203
n.wsNetwork.RegisterValidatorHandlers(dispatch)
204204
}
205205

206-
// ClearProcessors deregisters all the existing message processors.
207-
func (n *HybridP2PNetwork) ClearProcessors() {
208-
n.p2pNetwork.ClearProcessors()
209-
n.wsNetwork.ClearProcessors()
206+
// ClearValidatorHandlers deregisters all the existing message processors.
207+
func (n *HybridP2PNetwork) ClearValidatorHandlers() {
208+
n.p2pNetwork.ClearValidatorHandlers()
209+
n.wsNetwork.ClearValidatorHandlers()
210210
}
211211

212212
// GetHTTPClient returns a http.Client with a suitable for the network Transport

network/p2p/http.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,14 @@ func MakeHTTPServer(streamHost host.Host) *HTTPServer {
4848
p2phttpMux: mux.NewRouter(),
4949
}
5050
// libp2phttp server requires either explicit ListenAddrs or streamHost.Addrs() to be non-empty.
51-
// If streamHost.Addrs() is empty, we will listen on all interfaces
51+
// If streamHost.Addrs() is empty (that happens when NetAddress is set to ":0" and private address filtering is automatically enabled),
52+
// we will listen on localhost to satisfy libp2phttp.Host.Serve() requirements.
53+
// A side effect is it actually starts listening on interfaces listed in ListenAddrs and as go-libp2p v0.33.2
54+
// there is no other way to have libp2phttp server running AND to have streamHost.Addrs() filtered.
5255
if len(streamHost.Addrs()) == 0 {
53-
logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen all interfaces", streamHost.ID())
56+
logging.Base().Debugf("MakeHTTPServer: no addresses for %s, asking to listen localhost interface to satisfy libp2phttp.Host.Serve ", streamHost.ID())
5457
httpServer.ListenAddrs = []multiaddr.Multiaddr{
55-
multiaddr.StringCast("/ip4/0.0.0.0/tcp/0/http"),
58+
multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http"),
5659
}
5760
httpServer.InsecureAllowHTTP = true
5861
}

network/p2pNetwork.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -706,8 +706,8 @@ func (n *P2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidator
706706
n.handler.RegisterValidatorHandlers(dispatch)
707707
}
708708

709-
// ClearProcessors deregisters all the existing message handlers.
710-
func (n *P2PNetwork) ClearProcessors() {
709+
// ClearValidatorHandlers deregisters all the existing message handlers.
710+
func (n *P2PNetwork) ClearValidatorHandlers() {
711711
n.handler.ClearValidatorHandlers([]Tag{})
712712
}
713713

network/p2pNetwork_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -963,7 +963,7 @@ func TestP2PRelay(t *testing.T) {
963963
counter.Store(0)
964964
var loggedMsgs [][]byte
965965
counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs)
966-
netA.ClearProcessors()
966+
netA.ClearValidatorHandlers()
967967
netA.RegisterValidatorHandlers(counterHandler)
968968

969969
for i := 0; i < expectedMsgs/2; i++ {

network/wsNetwork.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -825,8 +825,8 @@ func (wn *WebsocketNetwork) ClearHandlers() {
825825
func (wn *WebsocketNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) {
826826
}
827827

828-
// ClearProcessors deregisters all the existing message handlers.
829-
func (wn *WebsocketNetwork) ClearProcessors() {
828+
// ClearValidatorHandlers deregisters all the existing message handlers.
829+
func (wn *WebsocketNetwork) ClearValidatorHandlers() {
830830
}
831831

832832
func (wn *WebsocketNetwork) setHeaders(header http.Header) {

node/node.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,7 @@ func (node *AlgorandFullNode) Stop() {
452452
}()
453453

454454
node.net.ClearHandlers()
455+
node.net.ClearValidatorHandlers()
455456
if !node.config.DisableNetworking {
456457
node.net.Stop()
457458
}
@@ -1218,6 +1219,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
12181219
node.waitMonitoringRoutines()
12191220
}()
12201221
node.net.ClearHandlers()
1222+
node.net.ClearValidatorHandlers()
12211223
node.stateProofWorker.Stop()
12221224
node.txHandler.Stop()
12231225
node.agreementService.Shutdown()

node/node_test.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -606,12 +606,11 @@ func TestDefaultResourcePaths(t *testing.T) {
606606
log := logging.Base()
607607

608608
n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis)
609+
require.NoError(t, err)
609610

610611
n.Start()
611612
defer n.Stop()
612613

613-
require.NoError(t, err)
614-
615614
// confirm genesis dir exists in the data dir, and that resources exist in the expected locations
616615
require.DirExists(t, filepath.Join(testDirectory, genesis.ID()))
617616

@@ -1073,3 +1072,47 @@ func TestNodeP2PRelays(t *testing.T) {
10731072
return len(nodes[2].net.GetPeers(network.PeersPhonebookRelays)) == 2
10741073
}, 80*time.Second, 1*time.Second)
10751074
}
1075+
1076+
// TestNodeSetCatchpointCatchupMode checks node can handle services restart for fast catchup correctly
1077+
func TestNodeSetCatchpointCatchupMode(t *testing.T) {
1078+
partitiontest.PartitionTest(t)
1079+
1080+
testDirectory := t.TempDir()
1081+
1082+
genesis := bookkeeping.Genesis{
1083+
SchemaID: "gen",
1084+
Proto: protocol.ConsensusCurrentVersion,
1085+
Network: config.Devtestnet,
1086+
FeeSink: sinkAddr.String(),
1087+
RewardsPool: poolAddr.String(),
1088+
}
1089+
log := logging.TestingLog(t)
1090+
cfg := config.GetDefaultLocal()
1091+
1092+
tests := []struct {
1093+
name string
1094+
enableP2P bool
1095+
}{
1096+
{"WS node", false},
1097+
{"P2P node", true},
1098+
}
1099+
1100+
for _, test := range tests {
1101+
t.Run(test.name, func(t *testing.T) {
1102+
cfg.EnableP2P = test.enableP2P
1103+
1104+
n, err := MakeFull(log, testDirectory, cfg, []string{}, genesis)
1105+
require.NoError(t, err)
1106+
err = n.Start()
1107+
require.NoError(t, err)
1108+
defer n.Stop()
1109+
1110+
// "start" catchpoint catchup => close services
1111+
outCh := n.SetCatchpointCatchupMode(true)
1112+
<-outCh
1113+
// "stop" catchpoint catchup => resume services
1114+
outCh = n.SetCatchpointCatchupMode(false)
1115+
<-outCh
1116+
})
1117+
}
1118+
}

0 commit comments

Comments
 (0)