Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netsync: Track peer for requested blocks. #3444

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 184 additions & 71 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,26 @@ type Peer struct {
// longer useful or are otherwise being malicious.
numConsecutiveOrphanHeaders int32

lastAnnouncedBlock *chainhash.Hash
// These fields are used to track the best known block announced by the peer
// which in turn provides a means to discover which blocks are available to
// download from the peer.
//
// announcedOrphanBlock is the hash of the most recently announced block
// that did not connect to any headers known to the local chain at the time
// of the announcement. It is tracked because such announcements are
// typically for newly found blocks whose parent headers will eventually
// become known and therefore have a fairly good chance of becoming the
// block with the most cumulative proof of work that the peer has announced.
//
// bestAnnouncedBlock is the hash of the block with the most cumulative
// proof of work that the peer has announced that is also known to the local
// chain.
//
// bestAnnouncedWork is the cumulative proof of work for the associated best
// announced block hash.
announcedOrphanBlock *chainhash.Hash
bestAnnouncedBlock *chainhash.Hash
bestAnnouncedWork *uint256.Uint256
}

// NewPeer returns a new instance of a peer that wraps the provided underlying
Expand Down Expand Up @@ -319,7 +338,7 @@ type SyncManager struct {
rejectedTxns *apbf.Filter
rejectedMixMsgs *apbf.Filter
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]*Peer
requestedMixMsgs map[chainhash.Hash]struct{}
progressLogger *progresslog.Logger
syncPeer *Peer
Expand Down Expand Up @@ -405,6 +424,16 @@ func (m *SyncManager) maybeUpdateNextNeededBlocks() {
}
}

// isRequestedBlock reports whether a request for the block identified by the
// given hash is currently being tracked by the sync manager, regardless of
// which remote peer it was requested from.
//
// This function is NOT safe for concurrent access. It must be called from the
// event handler goroutine.
func (m *SyncManager) isRequestedBlock(hash *chainhash.Hash) bool {
	_, requested := m.requestedBlocks[*hash]
	return requested
}

// fetchNextBlocks creates and sends a request to the provided peer for the next
// blocks to be downloaded based on the current headers.
func (m *SyncManager) fetchNextBlocks(peer *Peer) {
Expand Down Expand Up @@ -439,12 +468,12 @@ func (m *SyncManager) fetchNextBlocks(peer *Peer) {
// Skip blocks that have already been requested. The needed blocks
// might have been updated above thereby potentially repopulating some
// blocks that are still in flight.
if _, ok := m.requestedBlocks[*hash]; ok {
if m.isRequestedBlock(hash) {
continue
}

iv := wire.NewInvVect(wire.InvTypeBlock, hash)
m.requestedBlocks[*hash] = struct{}{}
m.requestedBlocks[*hash] = peer
peer.requestedBlocks[*hash] = struct{}{}
gdmsg.AddInvVect(iv)
}
Expand Down Expand Up @@ -649,6 +678,21 @@ func (m *SyncManager) handlePeerConnectedMsg(ctx context.Context, peer *Peer) {

m.peers[peer] = struct{}{}

// Request headers starting from the parent of the best known header for the
// local chain immediately when the initial headers sync process is complete
// and the peer is a sync candidate.
//
// This primarily serves two purposes:
//
// 1) It immediately discovers any blocks that are not already known
// 2) It provides accurate discovery of the best known block of the peer
//
// Note that the parent is used because the request would otherwise result
// in an empty response when both the local and remote tips are the same.
if peer.syncCandidate && m.hdrSyncState.headersSynced {
m.fetchNextHeaders(peer)
}

// Start syncing by choosing the best candidate if needed.
if peer.syncCandidate && m.syncPeer == nil {
m.startSync()
Expand Down Expand Up @@ -891,6 +935,55 @@ func (m *SyncManager) maybeUpdateIsCurrent() {
}
}

// maybeUpdateBestAnnouncedBlock potentially updates the block with the most
// cumulative proof of work that the given peer has announced which includes its
// associated hash, cumulative work sum, and height.
//
// The update only takes place when the local chain is able to provide the
// cumulative work for the given block and that work strictly exceeds the work
// of the current best announced block for the peer, or when no best announced
// block has been recorded for the peer yet. The call is a no-op when the
// cumulative work lookup fails.
//
// The provided hash is copied before it is stored so callers may safely pass a
// pointer into a larger structure, such as a slice of header hashes or an
// inventory vector, without the peer keeping that entire structure reachable
// and thereby preventing it from being GCd.
//
// This function is NOT safe for concurrent access. It must be called from the
// event handler goroutine.
func (m *SyncManager) maybeUpdateBestAnnouncedBlock(p *Peer, hash *chainhash.Hash, header *wire.BlockHeader) {
	chain := m.cfg.Chain
	workSum, err := chain.ChainWork(hash)
	if err != nil {
		return
	}

	// Nothing to do when the peer already has a best announced block with at
	// least as much cumulative work as the given block.
	if p.bestAnnouncedWork != nil && !workSum.Gt(p.bestAnnouncedWork) {
		return
	}

	// Update the best block and associated values. Notice a copy of the hash
	// is made here to avoid keeping a reference into memory owned by the
	// caller.
	bestHash := *hash
	p.bestAnnouncedBlock = &bestHash
	p.bestAnnouncedWork = &workSum
	p.UpdateLastBlockHeight(int64(header.Height))
}

// maybeResolveOrphanBlock potentially resolves the most recently announced
// block by the peer that did not connect to any headers known to the local
// chain at the time of the announcement by checking if it is now known and,
// when it is, potentially making it the block with the most cumulative proof of
// work announced by the peer if needed.
//
// The pending orphan announcement is cleared once its header is known to the
// local chain so that subsequent calls do not repeatedly perform the same
// lookups (or repeatedly log the same warning) for an announcement that has
// already been resolved.
//
// This function is NOT safe for concurrent access. It must be called from the
// event handler goroutine.
func (m *SyncManager) maybeResolveOrphanBlock(p *Peer) {
	// Nothing to do if there isn't a pending orphan block announcement that has
	// not yet been resolved or the block still isn't known.
	chain := m.cfg.Chain
	blockHash := p.announcedOrphanBlock
	if blockHash == nil || !chain.HaveHeader(blockHash) {
		return
	}

	// The header is now known, so the announcement is no longer an orphan.
	// The local blockHash pointer keeps the hash available for use below.
	p.announcedOrphanBlock = nil

	// The block has now been resolved, so potentially make it the block with
	// the most cumulative proof of work announced by the peer.
	header, err := chain.HeaderByHash(blockHash)
	if err != nil {
		log.Warnf("Unable to retrieve known good header %s: %v", blockHash, err)
		return
	}
	m.maybeUpdateBestAnnouncedBlock(p, blockHash, &header)
}

// processBlock processes the provided block using the internal chain instance.
//
// When no errors occurred during processing, the first return value indicates
Expand Down Expand Up @@ -1057,30 +1150,6 @@ func (m *SyncManager) handleBlockMsg(bmsg *blockMsg) {
m.cfg.MixPool.ExpireMessagesInBackground(header.Height)
}

// Update the latest block height for the peer to avoid stale heights when
// looking for future potential sync node candidacy.
//
// Also, when the chain is considered current and the block was accepted to
// the main chain, update the heights of other peers whose invs may have
// been ignored when actively syncing while the chain was not yet current or
// lost the lock announcement race.
blockHeight := int64(header.Height)
peer.UpdateLastBlockHeight(blockHeight)
if onMainChain && m.IsCurrent() {
for p := range m.peers {
// The height for the sending peer is already updated.
if p == peer {
continue
}

lastAnnBlock := p.lastAnnouncedBlock
if lastAnnBlock != nil && *lastAnnBlock == *blockHash {
p.UpdateLastBlockHeight(blockHeight)
p.lastAnnouncedBlock = nil
}
}
}

// Request more blocks using the headers when the request queue is getting
// short.
if peer == m.syncPeer && len(peer.requestedBlocks) < minInFlightBlocks {
Expand Down Expand Up @@ -1167,45 +1236,66 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
firstHeaderConnects := chain.HaveHeader(&firstHeader.PrevBlock)
headersSynced := m.hdrSyncState.headersSynced
if !firstHeaderConnects {
// Ignore headers that do not connect to any known headers when the
// initial headers sync is taking place. It is expected that headers
// will be announced that are not yet known.
if !headersSynced {
return
}

// Attempt to detect block announcements which do not connect to any
// known headers and request any headers starting from the best header
// the local chain knows in order to (hopefully) discover the missing
// headers.
// headers unless the initial headers sync process is still in progress.
//
// Meanwhile, also keep track of how many times the peer has
// consecutively sent a headers message that does not connect and
// disconnect it once the max allowed threshold has been reached.
// consecutively sent a headers message that looks like an announcement
// that does not connect and disconnect it once the max allowed
// threshold has been reached.
if numHeaders < maxExpectedHeaderAnnouncementsPerMsg {
peer.numConsecutiveOrphanHeaders++
if peer.numConsecutiveOrphanHeaders >= maxConsecutiveOrphanHeaders {
log.Debugf("Received %d consecutive headers messages that do "+
"not connect from peer %s -- disconnecting",
peer.numConsecutiveOrphanHeaders, peer)
peer.Disconnect()
return
}

log.Debugf("Requesting missing parents for header %s (height %d) "+
"received from peer %s", firstHeaderHash, firstHeader.Height,
peer)
bestHeaderHash, _ := chain.BestHeader()
blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash)
locator := chainBlockLocatorToHashes(blkLocator)
peer.PushGetHeadersMsg(locator, &zeroHash)
if headersSynced {
log.Debugf("Requesting missing parents for header %s (height "+
"%d) received from peer %s", firstHeaderHash,
firstHeader.Height, peer)
bestHeaderHash, _ := chain.BestHeader()
blkLocator := chain.BlockLocatorFromHash(&bestHeaderHash)
locator := chainBlockLocatorToHashes(blkLocator)
peer.PushGetHeadersMsg(locator, &zeroHash)
}

// Track the final announced header as the most recently announced
// block by the peer that does not connect to any headers known to
// the local chain since there is a good chance it will eventually
// become known either from this peer or others.
m.maybeResolveOrphanBlock(peer)
finalHeader := headers[len(headers)-1]
finalHeaderHash := finalHeader.BlockHash()
peer.announcedOrphanBlock = &finalHeaderHash

// Update the latest block height for the peer to avoid stale
// heights when looking for future potential header sync node
// candidacy when the initial headers sync process is still in
// progress.
if !headersSynced {
peer.UpdateLastBlockHeight(int64(finalHeader.Height))
}
return
}

// The initial headers sync process is done and this does not appear to
// be a block announcement, so disconnect the peer.
log.Debugf("Received orphan header from peer %s -- disconnecting", peer)
peer.Disconnect()
// Disconnect the peer when the initial headers sync process is done and
// this does not appear to be a block announcement.
if headersSynced {
log.Debugf("Received orphan header from peer %s -- disconnecting",
peer)
peer.Disconnect()
return
}

// Ignore headers that do not connect to any known headers when the
// initial headers sync is taking place. It is expected that headers
// will be announced that are not yet known.
return
}

Expand Down Expand Up @@ -1273,12 +1363,13 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
// of the provided headers are successfully processed above.
peer.numConsecutiveOrphanHeaders = 0

// Update the last announced block to the final one in the announced headers
// above and update the height for the peer too.
// Potentially resolve a previously unknown announced block and then update
// the block with the most cumulative proof of work the peer has announced
// to the final announced header if needed.
finalHeader := headers[len(headers)-1]
finalReceivedHash := &headerHashes[len(headerHashes)-1]
peer.lastAnnouncedBlock = finalReceivedHash
peer.UpdateLastBlockHeight(int64(finalHeader.Height))
m.maybeResolveOrphanBlock(peer)
m.maybeUpdateBestAnnouncedBlock(peer, finalReceivedHash, finalHeader)

// Update the sync height if the new best known header height exceeds it.
syncHeight := m.SyncHeight()
Expand Down Expand Up @@ -1335,6 +1426,18 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
log.Info("Syncing chain")
m.progressLogger.SetLastLogTime(time.Now())

// Request headers starting from the parent of the best known header
// for the local chain from any sync candidates that have not yet
// had their best known block discovered now that the initial
// headers sync process is complete.
for peer := range m.peers {
m.maybeResolveOrphanBlock(peer)
if !peer.syncCandidate || peer.bestAnnouncedBlock != nil {
continue
}
m.fetchNextHeaders(peer)
}

// Potentially update whether the chain believes it is current now
// that the headers are synced.
chain.MaybeUpdateIsCurrent()
Expand All @@ -1361,14 +1464,19 @@ func (m *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
// Skip the block when it has already been requested or is otherwise
// already known.
hash := &headerHashes[i]
_, isRequestedBlock := m.requestedBlocks[*hash]
if isRequestedBlock || chain.HaveBlock(hash) {
if m.isRequestedBlock(hash) || chain.HaveBlock(hash) {
continue
}

// Stop requesting when the request would exceed the max size of the
// map used to track requests.
if len(m.requestedBlocks)+1 > maxRequestedBlocks {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just highlighting that this is a change in behavior. Previously limitAdd would enforce limits onto m.requestedBlocks and peer.requestedBlocks separately, but with the new code peer.requestedBlocks is not being limited.

Copy link
Member Author

@davecgh davecgh Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed. I probably should've called this out in the description, but I changed it because the two maps really need to stay in sync. Otherwise, peers can be hit with misbehavior if expected entries are not in one map or the other.

It's currently not an issue because it's impossible to hit the limiting cases of limitAdd since there is a single sync peer and there are never more outstanding requests than maxInFlightBlocks = 16.

break
}

m.requestedBlocks[*hash] = peer
peer.requestedBlocks[*hash] = struct{}{}
iv := wire.NewInvVect(wire.InvTypeBlock, hash)
limitAdd(m.requestedBlocks, *hash, maxRequestedBlocks)
limitAdd(peer.requestedBlocks, *hash, maxRequestedBlocks)
gdmsg.AddInvVect(iv)
}
if len(gdmsg.InvList) > 0 {
Expand Down Expand Up @@ -1534,15 +1642,23 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) {
}

if lastBlock != nil {
// Update the last announced block to the final one in the announced
// inventory above (if any). In the case the header for that block is
// already known, use that information to update the height for the peer
// too.
peer.lastAnnouncedBlock = &lastBlock.Hash
if isCurrent {
// Determine if the final announced block is already known to the local
// chain and then either track it as the most recently announced
// block by the peer that does not connect to any headers known to the
// local chain or potentially make it the block with the most cumulative
// proof of work announced by the peer when it is already known.
if !m.cfg.Chain.HaveHeader(&lastBlock.Hash) {
// Notice a copy of the hash is made here to avoid keeping a
// reference into the inventory vector which would prevent it from
// being GCd.
lastBlockHash := lastBlock.Hash
m.maybeResolveOrphanBlock(peer)
peer.announcedOrphanBlock = &lastBlockHash
} else {
header, err := m.cfg.Chain.HeaderByHash(&lastBlock.Hash)
if err == nil {
peer.UpdateLastBlockHeight(int64(header.Height))
m.maybeResolveOrphanBlock(peer)
m.maybeUpdateBestAnnouncedBlock(peer, &lastBlock.Hash, &header)
}
}
}
Expand Down Expand Up @@ -1819,12 +1935,9 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes,
// Add the blocks to the request.
msgResp := wire.NewMsgGetData()
for i := range blocks {
// If we've already requested this block, skip it.
// Skip the block when it has already been requested.
bh := &blocks[i]
_, alreadyReqP := peer.requestedBlocks[*bh]
_, alreadyReqB := m.requestedBlocks[*bh]

if alreadyReqP || alreadyReqB {
if m.isRequestedBlock(bh) {
continue
}

Expand All @@ -1841,7 +1954,7 @@ func (m *SyncManager) requestFromPeer(peer *Peer, blocks, voteHashes,
}

peer.requestedBlocks[*bh] = struct{}{}
m.requestedBlocks[*bh] = struct{}{}
m.requestedBlocks[*bh] = peer
}

addTxsToRequest := func(txs []chainhash.Hash, txType stake.TxType) error {
Expand Down Expand Up @@ -2068,7 +2181,7 @@ func New(config *Config) *SyncManager {
rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate),
rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]*Peer),
requestedMixMsgs: make(map[chainhash.Hash]struct{}),
peers: make(map[*Peer]struct{}),
minKnownWork: minKnownWork,
Expand Down