Skip to content

Commit fbbb32b

Browse files
authored
Merge pull request #240 from kcalvinalvin/2025-01-16-always-download-headers-for-blocks
netsync, main: always download headers for blocks
2 parents 0616196 + dc23fd7 commit fbbb32b

File tree

2 files changed

+127
-70
lines changed

2 files changed

+127
-70
lines changed

netsync/manager.go

Lines changed: 123 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,11 @@ type SyncManager struct {
219219
headersBuildMode bool
220220

221221
// The following fields are used for headers-first mode.
222-
headersFirstMode bool
223-
headerList *list.List
224-
startHeader *list.Element
225-
nextCheckpoint *chaincfg.Checkpoint
226-
utreexoHeaders map[chainhash.Hash]*wire.MsgUtreexoHeader
227-
requestedUtreexoHeaders map[chainhash.Hash]struct{}
222+
headersFirstMode bool
223+
headerList *list.List
224+
startHeader *list.Element
225+
nextCheckpoint *chaincfg.Checkpoint
226+
utreexoHeaders map[chainhash.Hash]*wire.MsgUtreexoHeader
228227

229228
// An optional fee estimator.
230229
feeEstimator *mempool.FeeEstimator
@@ -366,7 +365,6 @@ func (sm *SyncManager) startSync() {
366365
// during headersFirstMode.
367366
if !sm.headersFirstMode {
368367
sm.requestedBlocks = make(map[chainhash.Hash]struct{})
369-
sm.requestedUtreexoHeaders = make(map[chainhash.Hash]struct{})
370368
}
371369

372370
log.Infof("Syncing to block height %d from peer %v",
@@ -396,11 +394,11 @@ func (sm *SyncManager) startSync() {
396394
// should have all the previous headers as well.
397395
_, have := sm.utreexoHeaders[*node.hash]
398396
if !have {
399-
sm.fetchUtreexoHeaders()
397+
sm.fetchUtreexoHeaders(nil)
400398
return
401399
}
402400
}
403-
sm.fetchHeaderBlocks()
401+
sm.fetchHeaderBlocks(nil)
404402
return
405403
}
406404

@@ -414,11 +412,11 @@ func (sm *SyncManager) startSync() {
414412
// should have all the previous headers as well.
415413
_, have := sm.utreexoHeaders[*node.hash]
416414
if !have {
417-
sm.fetchUtreexoHeaders()
415+
sm.fetchUtreexoHeaders(nil)
418416
return
419417
}
420418
}
421-
sm.fetchHeaderBlocks()
419+
sm.fetchHeaderBlocks(nil)
422420
return
423421
}
424422

@@ -693,12 +691,6 @@ func (sm *SyncManager) clearRequestedState(state *peerSyncState) {
693691
for blockHash := range state.requestedBlocks {
694692
delete(sm.requestedBlocks, blockHash)
695693
}
696-
697-
// Also remove requested utreexo headers from the global map so
698-
// that they will be fetched from elsewhere next time we get an inv.
699-
for blockHash := range state.requestedUtreexoHeaders {
700-
delete(sm.requestedUtreexoHeaders, blockHash)
701-
}
702694
}
703695
}
704696

@@ -1030,7 +1022,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
10301022
if bmsg.block.Height() < lastHeight {
10311023
if sm.startHeader != nil &&
10321024
len(state.requestedBlocks) < minInFlightBlocks {
1033-
sm.fetchHeaderBlocks()
1025+
sm.fetchHeaderBlocks(nil)
10341026
}
10351027
return
10361028
}
@@ -1050,7 +1042,7 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
10501042
if !isCheckpointBlock {
10511043
if sm.startHeader != nil &&
10521044
len(state.requestedBlocks) < minInFlightBlocks {
1053-
sm.fetchHeaderBlocks()
1045+
sm.fetchHeaderBlocks(nil)
10541046
}
10551047
return
10561048
}
@@ -1090,16 +1082,23 @@ func (sm *SyncManager) handleBlockMsg(bmsg *blockMsg) {
10901082

10911083
// fetchUtreexoHeaders creates and sends a request to the syncPeer for the next
10921084
// list of utreexo headers to be downloaded based on the current list of headers.
1093-
func (sm *SyncManager) fetchUtreexoHeaders() {
1085+
// Will fetch from the peer if it's not nil. Otherwise it'll default to the syncPeer.
1086+
func (sm *SyncManager) fetchUtreexoHeaders(peer *peerpkg.Peer) {
10941087
// Nothing to do if there is no start header.
10951088
if sm.startHeader == nil {
10961089
log.Warnf("fetchUtreexoHeaders called with no start header")
10971090
return
10981091
}
10991092

1100-
state, exists := sm.peerStates[sm.syncPeer]
1093+
// Default to the syncPeer unless we're given a peer by the caller.
1094+
reqPeer := sm.syncPeer
1095+
if reqPeer == nil {
1096+
reqPeer = peer
1097+
}
1098+
1099+
state, exists := sm.peerStates[reqPeer]
11011100
if !exists {
1102-
log.Warnf("Don't have peer state for sync peer %s", sm.syncPeer.String())
1101+
log.Warnf("Don't have peer state for request peer %s", reqPeer.String())
11031102
return
11041103
}
11051104

@@ -1115,7 +1114,7 @@ func (sm *SyncManager) fetchUtreexoHeaders() {
11151114
if !requested && !have {
11161115
state.requestedUtreexoHeaders[*node.hash] = struct{}{}
11171116
ghmsg := wire.NewMsgGetUtreexoHeader(*node.hash)
1118-
sm.syncPeer.QueueMessage(ghmsg, nil)
1117+
reqPeer.QueueMessage(ghmsg, nil)
11191118
}
11201119

11211120
if len(state.requestedUtreexoHeaders) > minInFlightBlocks {
@@ -1126,13 +1125,20 @@ func (sm *SyncManager) fetchUtreexoHeaders() {
11261125

11271126
// fetchHeaderBlocks creates and sends a request to the syncPeer for the next
11281127
// list of blocks to be downloaded based on the current list of headers.
1129-
func (sm *SyncManager) fetchHeaderBlocks() {
1128+
// Will fetch from the peer if it's not nil. Otherwise it'll default to the syncPeer.
1129+
func (sm *SyncManager) fetchHeaderBlocks(peer *peerpkg.Peer) {
11301130
// Nothing to do if there is no start header.
11311131
if sm.startHeader == nil {
11321132
log.Warnf("fetchHeaderBlocks called with no start header")
11331133
return
11341134
}
11351135

1136+
// Default to the syncPeer unless we're given a peer by the caller.
1137+
reqPeer := sm.syncPeer
1138+
if reqPeer == nil {
1139+
reqPeer = peer
1140+
}
1141+
11361142
// Build up a getdata request for the list of blocks the headers
11371143
// describe. The size hint will be limited to wire.MaxInvPerMsg by
11381144
// the function, so no need to double check it here.
@@ -1153,24 +1159,24 @@ func (sm *SyncManager) fetchHeaderBlocks() {
11531159
"fetch: %v", err)
11541160
}
11551161
if !haveInv {
1156-
syncPeerState := sm.peerStates[sm.syncPeer]
1162+
syncPeerState := sm.peerStates[reqPeer]
11571163
syncPeerState.requestedBlocks[*node.hash] = struct{}{}
11581164

11591165
// If we're fetching from a witness enabled peer
11601166
// post-fork, then ensure that we receive all the
11611167
// witness data in the blocks.
1162-
if sm.syncPeer.IsWitnessEnabled() {
1168+
if reqPeer.IsWitnessEnabled() {
11631169
iv.Type = wire.InvTypeWitnessBlock
11641170

11651171
// If we're syncing from a utreexo enabled peer, also
11661172
// ask for the proofs.
1167-
if sm.syncPeer.IsUtreexoEnabled() {
1173+
if reqPeer.IsUtreexoEnabled() {
11681174
iv.Type = wire.InvTypeWitnessUtreexoBlock
11691175
}
11701176
} else {
11711177
// If we're syncing from a utreexo enabled peer, also
11721178
// ask for the proofs.
1173-
if sm.syncPeer.IsUtreexoEnabled() {
1179+
if reqPeer.IsUtreexoEnabled() {
11741180
iv.Type = wire.InvTypeUtreexoBlock
11751181
}
11761182
}
@@ -1184,7 +1190,7 @@ func (sm *SyncManager) fetchHeaderBlocks() {
11841190
}
11851191
}
11861192
if len(gdmsg.InvList) > 0 {
1187-
sm.syncPeer.QueueMessage(gdmsg, nil)
1193+
reqPeer.QueueMessage(gdmsg, nil)
11881194
}
11891195
}
11901196

@@ -1198,21 +1204,79 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
11981204
return
11991205
}
12001206

1201-
// The remote peer is misbehaving if we didn't request headers.
12021207
msg := hmsg.headers
12031208
numHeaders := len(msg.Headers)
1204-
if !sm.headersFirstMode && !sm.headersBuildMode {
1205-
log.Warnf("Got %d unrequested headers from %s -- "+
1206-
"disconnecting", numHeaders, peer.Addr())
1207-
peer.Disconnect()
1208-
return
1209-
}
12101209

12111210
// Nothing to do for an empty headers message.
12121211
if numHeaders == 0 {
12131212
return
12141213
}
12151214

1215+
utreexoViewActive := sm.chain.IsUtreexoViewActive()
1216+
1217+
// If we're not in headers first, it means that these headers are for new
1218+
// block announcements.
1219+
if !sm.headersFirstMode && !sm.headersBuildMode {
1220+
best := sm.chain.BestSnapshot()
1221+
sm.headerList.Init()
1222+
sm.resetHeaderState(&best.Hash, best.Height)
1223+
1224+
for _, blockHeader := range msg.Headers {
1225+
err := sm.chain.ProcessBlockHeader(blockHeader)
1226+
if err != nil {
1227+
log.Warnf("Received block header from peer %v "+
1228+
"failed header verification -- disconnecting",
1229+
peer.Addr())
1230+
peer.Disconnect()
1231+
return
1232+
}
1233+
1234+
prevNodeEl := sm.headerList.Back()
1235+
if prevNodeEl == nil {
1236+
log.Warnf("Header list does not contain a previous" +
1237+
"element as expected -- disconnecting peer")
1238+
peer.Disconnect()
1239+
return
1240+
}
1241+
1242+
prevNode := prevNodeEl.Value.(*headerNode)
1243+
blockHash := blockHeader.BlockHash()
1244+
node := headerNode{hash: &blockHash}
1245+
if prevNode.hash.IsEqual(&blockHeader.PrevBlock) {
1246+
node.height = prevNode.height + 1
1247+
e := sm.headerList.PushBack(&node)
1248+
if sm.startHeader == nil {
1249+
sm.startHeader = e
1250+
}
1251+
} else {
1252+
log.Warnf("Received block header that does not "+
1253+
"properly connect to the chain from peer %s "+
1254+
"-- disconnecting", peer.Addr())
1255+
peer.Disconnect()
1256+
return
1257+
}
1258+
}
1259+
1260+
// Since the first entry of the list is always the final block
1261+
// that is already in the database and is only used to ensure
1262+
// the next header links properly, it must be removed before
1263+
// fetching the headers or the utreexo headers.
1264+
sm.headerList.Remove(sm.headerList.Front())
1265+
sm.progressLogger.SetLastLogTime(time.Now())
1266+
1267+
if utreexoViewActive {
1268+
log.Infof("Received %v block headers: Fetching utreexo headers",
1269+
sm.headerList.Len())
1270+
sm.fetchUtreexoHeaders(hmsg.peer)
1271+
} else {
1272+
log.Infof("Received %v block headers: Fetching blocks",
1273+
sm.headerList.Len())
1274+
sm.fetchHeaderBlocks(hmsg.peer)
1275+
}
1276+
1277+
return
1278+
}
1279+
12161280
if sm.headersBuildMode {
12171281
var finalHeader *wire.BlockHeader
12181282
var finalHeight int32
@@ -1294,8 +1358,6 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
12941358
return
12951359
}
12961360

1297-
utreexoViewActive := sm.chain.IsUtreexoViewActive()
1298-
12991361
// This means that we've ran out of checkpoints and need to verify the headers that
13001362
// we've received.
13011363
if sm.nextCheckpoint == nil {
@@ -1359,11 +1421,11 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
13591421
if utreexoViewActive {
13601422
log.Infof("Received %v block headers: Fetching utreexo headers",
13611423
sm.headerList.Len())
1362-
sm.fetchUtreexoHeaders()
1424+
sm.fetchUtreexoHeaders(nil)
13631425
} else {
13641426
log.Infof("Received %v block headers: Fetching blocks",
13651427
sm.headerList.Len())
1366-
sm.fetchHeaderBlocks()
1428+
sm.fetchHeaderBlocks(nil)
13671429
}
13681430
}
13691431

@@ -1438,7 +1500,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
14381500
log.Infof("Received %v block headers: Fetching utreexo headers",
14391501
sm.headerList.Len())
14401502
sm.progressLogger.SetLastLogTime(time.Now())
1441-
sm.fetchUtreexoHeaders()
1503+
sm.fetchUtreexoHeaders(nil)
14421504
return
14431505
}
14441506
// Since the first entry of the list is always the final block
@@ -1449,7 +1511,7 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
14491511
log.Infof("Received %v block headers: Fetching blocks",
14501512
sm.headerList.Len())
14511513
sm.progressLogger.SetLastLogTime(time.Now())
1452-
sm.fetchHeaderBlocks()
1514+
sm.fetchHeaderBlocks(nil)
14531515
return
14541516
}
14551517

@@ -1510,20 +1572,20 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) {
15101572
log.Infof("Received utreexo headers to block "+
15111573
"%d/hash %s. Fetching blocks",
15121574
node.height, node.hash)
1513-
sm.fetchHeaderBlocks()
1575+
sm.fetchHeaderBlocks(nil)
15141576
return
15151577
} else if node.height == peer.LastBlock() {
15161578
log.Infof("Received utreexo headers to block "+
15171579
"%d/hash %s. Fetching blocks",
15181580
node.height, node.hash)
1519-
sm.fetchHeaderBlocks()
1581+
sm.fetchHeaderBlocks(nil)
15201582
return
15211583
}
15221584
}
15231585
}
15241586

15251587
if len(peerState.requestedUtreexoHeaders) < minInFlightBlocks {
1526-
sm.fetchUtreexoHeaders()
1588+
sm.fetchUtreexoHeaders(nil)
15271589
}
15281590

15291591
return
@@ -1532,19 +1594,12 @@ func (sm *SyncManager) handleUtreexoHeaderMsg(hmsg *utreexoHeaderMsg) {
15321594
// We're not in headers-first mode. When we receive a utreexo header,
15331595
// immediately ask for the block.
15341596
sm.utreexoHeaders[msg.BlockHash] = hmsg.header
1535-
delete(sm.requestedUtreexoHeaders, msg.BlockHash)
1597+
delete(peerState.requestedUtreexoHeaders, msg.BlockHash)
15361598
log.Debugf("accepted utreexo header for block %v. have %v headers",
15371599
msg.BlockHash, len(sm.utreexoHeaders))
15381600

1539-
if !sm.headersFirstMode {
1540-
sm.requestedBlocks[msg.BlockHash] = struct{}{}
1541-
}
15421601
peerState.requestedBlocks[msg.BlockHash] = struct{}{}
1543-
1544-
gdmsg := wire.NewMsgGetData()
1545-
iv := wire.NewInvVect(wire.InvTypeWitnessUtreexoBlock, &msg.BlockHash)
1546-
gdmsg.AddInvVect(iv)
1547-
peer.QueueMessage(gdmsg, nil)
1602+
sm.fetchHeaderBlocks(peer)
15481603
}
15491604

15501605
// handleNotFoundMsg handles notfound messages from all peers.
@@ -1876,7 +1931,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
18761931
if _, exists := sm.requestedBlocks[iv.Hash]; !exists {
18771932
amUtreexoNode := sm.chain.IsUtreexoViewActive()
18781933
if amUtreexoNode {
1879-
sm.requestedUtreexoHeaders[iv.Hash] = struct{}{}
18801934
ghmsg := wire.NewMsgGetUtreexoHeader(iv.Hash)
18811935
peer.QueueMessage(ghmsg, nil)
18821936
continue
@@ -2362,21 +2416,20 @@ func (sm *SyncManager) Pause() chan<- struct{} {
23622416
// block, tx, and inv updates.
23632417
func New(config *Config) (*SyncManager, error) {
23642418
sm := SyncManager{
2365-
peerNotifier: config.PeerNotifier,
2366-
chain: config.Chain,
2367-
txMemPool: config.TxMemPool,
2368-
chainParams: config.ChainParams,
2369-
rejectedTxns: make(map[chainhash.Hash]struct{}),
2370-
requestedTxns: make(map[chainhash.Hash]struct{}),
2371-
requestedBlocks: make(map[chainhash.Hash]struct{}),
2372-
requestedUtreexoHeaders: make(map[chainhash.Hash]struct{}),
2373-
utreexoHeaders: make(map[chainhash.Hash]*wire.MsgUtreexoHeader),
2374-
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
2375-
progressLogger: newBlockProgressLogger("Processed", log),
2376-
msgChan: make(chan interface{}, config.MaxPeers*3),
2377-
headerList: list.New(),
2378-
quit: make(chan struct{}),
2379-
feeEstimator: config.FeeEstimator,
2419+
peerNotifier: config.PeerNotifier,
2420+
chain: config.Chain,
2421+
txMemPool: config.TxMemPool,
2422+
chainParams: config.ChainParams,
2423+
rejectedTxns: make(map[chainhash.Hash]struct{}),
2424+
requestedTxns: make(map[chainhash.Hash]struct{}),
2425+
requestedBlocks: make(map[chainhash.Hash]struct{}),
2426+
utreexoHeaders: make(map[chainhash.Hash]*wire.MsgUtreexoHeader),
2427+
peerStates: make(map[*peerpkg.Peer]*peerSyncState),
2428+
progressLogger: newBlockProgressLogger("Processed", log),
2429+
msgChan: make(chan interface{}, config.MaxPeers*3),
2430+
headerList: list.New(),
2431+
quit: make(chan struct{}),
2432+
feeEstimator: config.FeeEstimator,
23802433
}
23812434

23822435
best := sm.chain.BestSnapshot()

0 commit comments

Comments
 (0)