Skip to content

Commit edda2ee

Browse files
rpcs: simplify API for BlockService to handle multiple HTTP paths (algorand#5718)
Co-authored-by: Pavel Zbitskiy <pavel@algorand.com>
1 parent 04ec5f9 commit edda2ee

14 files changed

+80
-30
lines changed

catchup/fetcher_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,13 @@ func (b *basicRPCNode) RegisterHTTPHandler(path string, handler http.Handler) {
141141
b.rmux.Handle(path, handler)
142142
}
143143

144+
func (b *basicRPCNode) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
145+
if b.rmux == nil {
146+
b.rmux = mux.NewRouter()
147+
}
148+
b.rmux.HandleFunc(path, handler)
149+
}
150+
144151
func (b *basicRPCNode) RegisterHandlers(dispatch []network.TaggedMessageHandler) {
145152
}
146153

catchup/pref_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {
5050
net := &httpTestPeerSource{}
5151
ls := rpcs.MakeBlockService(logging.TestingLog(b), config.GetDefaultLocal(), remote, net, "test genesisID")
5252
nodeA := basicRPCNode{}
53-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
53+
ls.RegisterHandlers(&nodeA)
5454
nodeA.start()
5555
defer nodeA.stop()
5656
rootURL := nodeA.rootURL()

catchup/service_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func TestServiceFetchBlocksSameRange(t *testing.T) {
151151
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
152152

153153
nodeA := basicRPCNode{}
154-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
154+
ls.RegisterHandlers(&nodeA)
155155
nodeA.start()
156156
defer nodeA.stop()
157157
rootURL := nodeA.rootURL()
@@ -223,7 +223,7 @@ func TestSyncRound(t *testing.T) {
223223
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
224224

225225
nodeA := basicRPCNode{}
226-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
226+
ls.RegisterHandlers(&nodeA)
227227
nodeA.start()
228228
defer nodeA.stop()
229229
rootURL := nodeA.rootURL()
@@ -313,7 +313,7 @@ func TestPeriodicSync(t *testing.T) {
313313
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
314314

315315
nodeA := basicRPCNode{}
316-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
316+
ls.RegisterHandlers(&nodeA)
317317
nodeA.start()
318318
defer nodeA.stop()
319319
rootURL := nodeA.rootURL()
@@ -379,7 +379,7 @@ func TestServiceFetchBlocksOneBlock(t *testing.T) {
379379
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
380380

381381
nodeA := basicRPCNode{}
382-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
382+
ls.RegisterHandlers(&nodeA)
383383
nodeA.start()
384384
defer nodeA.stop()
385385
rootURL := nodeA.rootURL()
@@ -443,7 +443,7 @@ func TestAbruptWrites(t *testing.T) {
443443
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
444444

445445
nodeA := basicRPCNode{}
446-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
446+
ls.RegisterHandlers(&nodeA)
447447
nodeA.start()
448448
defer nodeA.stop()
449449
rootURL := nodeA.rootURL()
@@ -501,7 +501,7 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
501501
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
502502

503503
nodeA := basicRPCNode{}
504-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
504+
ls.RegisterHandlers(&nodeA)
505505
nodeA.start()
506506
defer nodeA.stop()
507507
rootURL := nodeA.rootURL()
@@ -555,7 +555,7 @@ func TestServiceFetchBlocksMalformed(t *testing.T) {
555555
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
556556

557557
nodeA := basicRPCNode{}
558-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
558+
ls.RegisterHandlers(&nodeA)
559559
nodeA.start()
560560
defer nodeA.stop()
561561
rootURL := nodeA.rootURL()
@@ -709,7 +709,7 @@ func helperTestOnSwitchToUnSupportedProtocol(
709709
ls := rpcs.MakeBlockService(logging.Base(), config, remote, net, "test genesisID")
710710

711711
nodeA := basicRPCNode{}
712-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
712+
ls.RegisterHandlers(&nodeA)
713713
nodeA.start()
714714
defer nodeA.stop()
715715
rootURL := nodeA.rootURL()
@@ -932,7 +932,7 @@ func TestCatchupUnmatchedCertificate(t *testing.T) {
932932
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
933933

934934
nodeA := basicRPCNode{}
935-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
935+
ls.RegisterHandlers(&nodeA)
936936
nodeA.start()
937937
defer nodeA.stop()
938938
rootURL := nodeA.rootURL()
@@ -1064,7 +1064,7 @@ func TestServiceLedgerUnavailable(t *testing.T) {
10641064
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
10651065

10661066
nodeA := basicRPCNode{}
1067-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
1067+
ls.RegisterHandlers(&nodeA)
10681068
nodeA.start()
10691069
defer nodeA.stop()
10701070
rootURL := nodeA.rootURL()
@@ -1110,7 +1110,7 @@ func TestServiceNoBlockForRound(t *testing.T) {
11101110
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")
11111111

11121112
nodeA := basicRPCNode{}
1113-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
1113+
ls.RegisterHandlers(&nodeA)
11141114
nodeA.start()
11151115
defer nodeA.stop()
11161116
rootURL := nodeA.rootURL()

catchup/universalFetcher_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func TestUGetBlockHTTP(t *testing.T) {
101101
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")
102102

103103
nodeA := basicRPCNode{}
104-
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
104+
ls.RegisterHandlers(&nodeA)
105105
nodeA.start()
106106
defer nodeA.stop()
107107
rootURL := nodeA.rootURL()

components/mocks/mockNetwork.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ func (network *MockNetwork) ClearProcessors() {
103103
func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
104104
}
105105

106+
// RegisterHTTPHandlerFunc - empty implementation
107+
func (network *MockNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
108+
}
109+
106110
// OnNetworkAdvance - empty implementation
107111
func (network *MockNetwork) OnNetworkAdvance() {}
108112

network/gossipNode.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ type GossipNode interface {
5757
Disconnect(badnode DisconnectablePeer)
5858
DisconnectPeers() // only used by testing
5959

60-
// RegisterHTTPHandler path accepts gorilla/mux path annotations
60+
// RegisterHTTPHandler and RegisterHTTPHandlerFunc: path accepts gorilla/mux path annotations
6161
RegisterHTTPHandler(path string, handler http.Handler)
62+
RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request))
6263

6364
// RequestConnectOutgoing asks the system to actually connect to peers.
6465
// `replace` optionally drops existing connections before making new ones.

network/hybridNetwork.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ func (n *HybridP2PNetwork) RegisterHTTPHandler(path string, handler http.Handler
146146
n.wsNetwork.RegisterHTTPHandler(path, handler)
147147
}
148148

149+
// RegisterHTTPHandlerFunc implements GossipNode
150+
func (n *HybridP2PNetwork) RegisterHTTPHandlerFunc(path string, handlerFunc func(http.ResponseWriter, *http.Request)) {
151+
n.p2pNetwork.RegisterHTTPHandlerFunc(path, handlerFunc)
152+
n.wsNetwork.RegisterHTTPHandlerFunc(path, handlerFunc)
153+
}
154+
149155
// RequestConnectOutgoing implements GossipNode
150156
func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {}
151157

network/p2p/http.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ func (s *HTTPServer) RegisterHTTPHandler(path string, handler http.Handler) {
5757
})
5858
}
5959

60+
// RegisterHTTPHandlerFunc registers a http handler with a given path.
61+
func (s *HTTPServer) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
62+
s.p2phttpMux.HandleFunc(path, handler)
63+
s.p2phttpMuxRegistrarOnce.Do(func() {
64+
s.Host.SetHTTPHandlerAtPath(algorandP2pHTTPProtocol, "/", s.p2phttpMux)
65+
})
66+
}
67+
6068
// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
6169
func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
6270
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)

network/p2pNetwork.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,12 @@ func (n *P2PNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
577577
n.httpServer.RegisterHTTPHandler(path, handler)
578578
}
579579

580+
// RegisterHTTPHandlerFunc is like RegisterHTTPHandler but accepts
581+
// a callback handler function instead of a method receiver.
582+
func (n *P2PNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
583+
n.httpServer.RegisterHTTPHandlerFunc(path, handler)
584+
}
585+
580586
// RequestConnectOutgoing asks the system to actually connect to peers.
581587
// `replace` optionally drops existing connections before making new ones.
582588
// `quit` chan allows cancellation.

network/wsNetwork.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,11 @@ func (wn *WebsocketNetwork) RegisterHTTPHandler(path string, handler http.Handle
523523
wn.router.Handle(path, handler)
524524
}
525525

526+
// RegisterHTTPHandlerFunc path accepts gorilla/mux path annotations
527+
func (wn *WebsocketNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
528+
wn.router.HandleFunc(path, handler)
529+
}
530+
526531
// RequestConnectOutgoing tries to actually do the connect to new peers.
527532
// `replace` drop all connections first and find new peers.
528533
func (wn *WebsocketNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {

rpcs/blockService.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const blockServerCatchupRequestBufferSize = 10
5858
const BlockResponseLatestRoundHeader = "X-Latest-Round"
5959

6060
// BlockServiceBlockPath is the path to register BlockService as a handler for when using gorilla/mux
61-
// e.g. .Handle(BlockServiceBlockPath, &ls)
61+
// e.g. .HandleFunc(BlockServiceBlockPath, ls.ServeBlockPath)
6262
const BlockServiceBlockPath = "/v{version:[0-9.]+}/{genesisID}/block/{round:[0-9a-z]+}"
6363

6464
// Constant strings used as keys for topics
@@ -147,11 +147,16 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger LedgerForB
147147
memoryCap: config.BlockServiceMemCap,
148148
}
149149
if service.enableService {
150-
net.RegisterHTTPHandler(BlockServiceBlockPath, service)
150+
service.RegisterHandlers(net)
151151
}
152152
return service
153153
}
154154

155+
// RegisterHandlers registers the request handlers for BlockService's paths with the registrar.
156+
func (bs *BlockService) RegisterHandlers(registrar Registrar) {
157+
registrar.RegisterHTTPHandlerFunc(BlockServiceBlockPath, bs.ServeBlockPath)
158+
}
159+
155160
// Start listening to catchup requests over ws
156161
func (bs *BlockService) Start() {
157162
bs.mu.Lock()
@@ -179,10 +184,10 @@ func (bs *BlockService) Stop() {
179184
bs.closeWaitGroup.Wait()
180185
}
181186

182-
// ServerHTTP returns blocks
187+
// ServeBlockPath returns blocks
183188
// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version}
184189
// Uses gorilla/mux for path argument parsing.
185-
func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) {
190+
func (bs *BlockService) ServeBlockPath(response http.ResponseWriter, request *http.Request) {
186191
pathVars := mux.Vars(request)
187192
versionStr, hasVersionStr := pathVars["version"]
188193
roundStr, hasRoundStr := pathVars["round"]
@@ -260,13 +265,13 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re
260265
if !ok {
261266
response.Header().Set("Retry-After", blockResponseRetryAfter)
262267
response.WriteHeader(http.StatusServiceUnavailable)
263-
bs.log.Debugf("ServeHTTP: returned retry-after: %v", err)
268+
bs.log.Debugf("ServeBlockPath: returned retry-after: %v", err)
264269
}
265270
httpBlockMessagesDroppedCounter.Inc(nil)
266271
return
267272
default:
268273
// unexpected error.
269-
bs.log.Warnf("ServeHTTP : failed to retrieve block %d %v", round, err)
274+
bs.log.Warnf("ServeBlockPath: failed to retrieve block %d %v", round, err)
270275
response.WriteHeader(http.StatusInternalServerError)
271276
return
272277
}

rpcs/blockService_test.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ func TestRedirectFallbackEndpoints(t *testing.T) {
166166
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
167167
bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID")
168168

169-
nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
170-
nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
169+
bs1.RegisterHandlers(nodeA)
170+
bs2.RegisterHandlers(nodeB)
171171

172172
parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
173173
require.NoError(t, err)
@@ -210,7 +210,7 @@ func TestBlockServiceShutdown(t *testing.T) {
210210

211211
nodeA := &basicRPCNode{}
212212

213-
nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
213+
bs1.RegisterHandlers(nodeA)
214214
nodeA.start()
215215
defer nodeA.stop()
216216

@@ -292,9 +292,8 @@ func TestRedirectOnFullCapacity(t *testing.T) {
292292
bs1.memoryCap = 250
293293
bs2.memoryCap = 250
294294

295-
nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
296-
297-
nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
295+
bs1.RegisterHandlers(nodeA)
296+
bs2.RegisterHandlers(nodeB)
298297

299298
parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
300299
require.NoError(t, err)
@@ -371,11 +370,11 @@ forloop:
371370

372371
// First node redirects, does not return retry
373372
require.True(t, strings.Contains(logBuffer1.String(), "redirectRequest: redirected block request to"))
374-
require.False(t, strings.Contains(logBuffer1.String(), "ServeHTTP: returned retry-after: block service memory over capacity"))
373+
require.False(t, strings.Contains(logBuffer1.String(), "ServeBlockPath: returned retry-after: block service memory over capacity"))
375374

376375
// Second node cannot redirect, it returns retry-after when over capacity
377376
require.False(t, strings.Contains(logBuffer2.String(), "redirectRequest: redirected block request to"))
378-
require.True(t, strings.Contains(logBuffer2.String(), "ServeHTTP: returned retry-after: block service memory over capacity"))
377+
require.True(t, strings.Contains(logBuffer2.String(), "ServeBlockPath: returned retry-after: block service memory over capacity"))
379378
}
380379

381380
// TestWsBlockLimiting ensures that limits are applied correctly on the websocket side of the service
@@ -474,8 +473,8 @@ func TestRedirectExceptions(t *testing.T) {
474473
bs1 := MakeBlockService(log1, configInvalidRedirects, ledger1, net1, "{genesisID}")
475474
bs2 := MakeBlockService(log2, configWithRedirectToSelf, ledger2, net2, "{genesisID}")
476475

477-
nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
478-
nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
476+
bs1.RegisterHandlers(nodeA)
477+
bs2.RegisterHandlers(nodeB)
479478

480479
parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
481480
require.NoError(t, err)

rpcs/registrar.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
type Registrar interface {
2727
// RegisterHTTPHandler path accepts gorilla/mux path annotations
2828
RegisterHTTPHandler(path string, handler http.Handler)
29+
// RegisterHTTPHandlerFunc path accepts gorilla/mux path annotations and a HandlerFunc
30+
RegisterHTTPHandlerFunc(path string, handler func(response http.ResponseWriter, request *http.Request))
2931
// RegisterHandlers exposes global websocket handler registration
3032
RegisterHandlers(dispatch []network.TaggedMessageHandler)
3133
}

rpcs/txService_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,13 @@ func (b *basicRPCNode) RegisterHTTPHandler(path string, handler http.Handler) {
8989
b.rmux.Handle(path, handler)
9090
}
9191

92+
func (b *basicRPCNode) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
93+
if b.rmux == nil {
94+
b.rmux = mux.NewRouter()
95+
}
96+
b.rmux.HandleFunc(path, handler)
97+
}
98+
9299
func (b *basicRPCNode) RegisterHandlers(dispatch []network.TaggedMessageHandler) {
93100
}
94101

0 commit comments

Comments
 (0)