From 194c6398569abe0502f7e4f859e2943e671a6d58 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 5 Nov 2024 16:42:01 +0900 Subject: [PATCH 1/9] mempool: add addUtreexoData addUtreexoData saves the proof and the leaves in memory, allowing us to propagate proofs for transactions as a utreexo csn node. --- mempool/mempool.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/mempool/mempool.go b/mempool/mempool.go index 1ec39bcb..53d4643d 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -586,6 +586,22 @@ func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *btcutil return txD } +// addUtreexoData add the passed leaves to the memory pool and caches the proof to the accumulator. +// It should not be called directly as it doesn't perform any validation. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *TxPool) addUtreexoData(tx *btcutil.Tx, udata *wire.UData) error { + // Ingest the proof. Shouldn't error out with the proof being invalid + // here since we've already verified it above. + err := mp.cfg.VerifyUData(udata, tx.MsgTx().TxIn, true) + if err != nil { + return fmt.Errorf("error while ingesting proof. %v", err) + } + mp.poolLeaves[*tx.Hash()] = udata.LeafDatas + + return nil +} + // checkPoolDoubleSpend checks whether or not the passed transaction is // attempting to spend coins already spent by other transactions in the pool. // If it does, we'll check whether each of those transactions are signaling for From edfd1e18704fc176e217524aebd39a831873463c Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 5 Nov 2024 18:58:14 +0900 Subject: [PATCH 2/9] mempool, netsync: process utreexo data during tx validation Instead of using the UData that's currently part of the MsgTx, we take in utreexo data and use that to verify txs for utreexo nodes. This is done to support MsgUtreexoTx while reusing the same code as we currently do. --- mempool/interface.go | 2 +- mempool/mempool.go | 23 ++++++++++++++++------- netsync/manager.go | 4 +++- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/mempool/interface.go b/mempool/interface.go index 3b97c8aa..e83e9f5e 100644 --- a/mempool/interface.go +++ b/mempool/interface.go @@ -47,7 +47,7 @@ type TxMempool interface { // error is nil, the list will include the passed transaction itself // along with any additional orphan transactions that were added as a // result of the passed one being accepted. - ProcessTransaction(tx *btcutil.Tx, allowOrphan, + ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) // RemoveTransaction removes the passed transaction from the mempool. diff --git a/mempool/mempool.go b/mempool/mempool.go index 53d4643d..910fcf81 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -1019,14 +1019,14 @@ func (mp *TxPool) validateReplacement(tx *btcutil.Tx, // more details. // // This function MUST be called with the mempool lock held (for writes). -func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit, +func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, utreexoData *wire.UData, isNew, rateLimit, rejectDupOrphans bool) ([]*chainhash.Hash, *TxDesc, error) { txHash := tx.Hash() // Check for mempool acceptance. r, err := mp.checkMempoolAcceptance( - tx, nil, isNew, rateLimit, rejectDupOrphans, + tx, utreexoData, isNew, rateLimit, rejectDupOrphans, ) if err != nil { return nil, nil, err @@ -1054,6 +1054,13 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit, // it for the ingestion. mp.removeTransaction(conflict, false) } + if utreexoData != nil { + err = mp.addUtreexoData(tx, utreexoData) + if err != nil { + return nil, nil, err + } + } + txD := mp.addTransaction(r.utxoView, tx, r.bestHeight, int64(r.TxFee)) log.Debugf("Accepted transaction %v (pool size: %v)", txHash, @@ -1073,10 +1080,12 @@ func (mp *TxPool) maybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit, // be added to the orphan pool. // // This function is safe for concurrent access. -func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) { +func (mp *TxPool) MaybeAcceptTransaction(tx *btcutil.Tx, utreexoData *wire.UData, + isNew, rateLimit bool) ([]*chainhash.Hash, *TxDesc, error) { + // Protect concurrent access. mp.mtx.Lock() - hashes, txD, err := mp.maybeAcceptTransaction(tx, isNew, rateLimit, true) + hashes, txD, err := mp.maybeAcceptTransaction(tx, utreexoData, isNew, rateLimit, true) mp.mtx.Unlock() return hashes, txD, err @@ -1119,7 +1128,7 @@ func (mp *TxPool) processOrphans(acceptedTx *btcutil.Tx) []*TxDesc { // Potentially accept an orphan into the tx pool. for _, tx := range orphans { missing, txD, err := mp.maybeAcceptTransaction( - tx, true, true, false) + tx, nil, true, true, false) if err != nil { // The orphan is now invalid, so there // is no way any other orphans which @@ -1194,7 +1203,7 @@ func (mp *TxPool) ProcessOrphans(acceptedTx *btcutil.Tx) []*TxDesc { // the passed one being accepted. // // This function is safe for concurrent access. -func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) { +func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) { log.Tracef("Processing transaction %v", tx.Hash()) // Protect concurrent access. @@ -1202,7 +1211,7 @@ func (mp *TxPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool defer mp.mtx.Unlock() // Potentially accept the transaction to the memory pool. - missingParents, txD, err := mp.maybeAcceptTransaction(tx, true, rateLimit, + missingParents, txD, err := mp.maybeAcceptTransaction(tx, utreexoData, true, rateLimit, true) if err != nil { return nil, err diff --git a/netsync/manager.go b/netsync/manager.go index 486bfe72..2bcadd22 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -1749,8 +1749,10 @@ func (sm *SyncManager) handleBlockchainNotification(notification *blockchain.Not // Reinsert all of the transactions (except the coinbase) into // the transaction pool. + // + // TODO handle txs here for utreexo nodes. for _, tx := range block.Transactions()[1:] { - _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, + _, _, err := sm.txMemPool.MaybeAcceptTransaction(tx, nil, false, false) if err != nil { // Remove the transaction and all transactions From e920ba009e3203d113d7d261c7e82dcd03560ee6 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 5 Nov 2024 19:03:17 +0900 Subject: [PATCH 3/9] electrum, mempool, netsync, main: fixes to account for the new mempool interface --- electrum/server.go | 2 +- mempool/mempool_test.go | 24 ++++++++++++------------ mempool/mocks.go | 2 +- netsync/manager.go | 2 +- rpcserver.go | 2 +- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/electrum/server.go b/electrum/server.go index 69091c42..774beb52 100644 --- a/electrum/server.go +++ b/electrum/server.go @@ -523,7 +523,7 @@ func handleTransactionBroadcast(s *ElectrumServer, cmd *btcjson.Request, conn ne } tx.MsgTx().UData = udata - acceptedTxs, err := s.cfg.Mempool.ProcessTransaction(tx, false, false, 0) + acceptedTxs, err := s.cfg.Mempool.ProcessTransaction(tx, udata, false, false, 0) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 76c6f121..972de30b 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -406,7 +406,7 @@ func (ctx *testContext) addSignedTx(inputs []spendableOutput, ctx.harness.chain.SetMedianTimePast(time.Now()) } else { acceptedTxns, err := ctx.harness.txPool.ProcessTransaction( - tx, true, false, 0, + tx, nil, true, false, 0, ) if err != nil { ctx.t.Fatalf("unable to process transaction: %v", err) @@ -475,7 +475,7 @@ func TestSimpleOrphanChain(t *testing.T) { // Ensure the orphans are accepted (only up to the maximum allowed so // none are evicted). for _, tx := range chainedTxns[1 : maxOrphans+1] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -498,7 +498,7 @@ func TestSimpleOrphanChain(t *testing.T) { // all get accepted. Notice the accept orphans flag is also false here // to ensure it has no bearing on whether or not already existing // orphans in the pool are linked. - acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0], + acceptedTxns, err := harness.txPool.ProcessTransaction(chainedTxns[0], nil, false, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -537,7 +537,7 @@ func TestOrphanReject(t *testing.T) { // Ensure orphans are rejected when the allow orphans flag is not set. for _, tx := range chainedTxns[1:] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, false, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, false, false, 0) if err == nil { t.Fatalf("ProcessTransaction: did not fail on orphan "+ @@ -594,7 +594,7 @@ func TestOrphanEviction(t *testing.T) { // Add enough orphans to exceed the max allowed while ensuring they are // all accepted. This will cause an eviction. for _, tx := range chainedTxns[1:] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -658,7 +658,7 @@ func TestBasicOrphanRemoval(t *testing.T) { // Ensure the orphans are accepted (only up to the maximum allowed so // none are evicted). for _, tx := range chainedTxns[1 : maxOrphans+1] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -733,7 +733,7 @@ func TestOrphanChainRemoval(t *testing.T) { // Ensure the orphans are accepted (only up to the maximum allowed so // none are evicted). for _, tx := range chainedTxns[1 : maxOrphans+1] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -796,7 +796,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) { // Start by adding the orphan transactions from the generated chain // except the final one. for _, tx := range chainedTxns[1:maxOrphans] { - acceptedTxns, err := harness.txPool.ProcessTransaction(tx, true, + acceptedTxns, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid "+ @@ -822,7 +822,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) { if err != nil { t.Fatalf("unable to create signed tx: %v", err) } - acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx, + acceptedTxns, err := harness.txPool.ProcessTransaction(doubleSpendTx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid orphan %v", @@ -841,7 +841,7 @@ func TestMultiInputOrphanDoubleSpend(t *testing.T) { // // This will cause the shared output to become a concrete spend which // will in turn must cause the double spending orphan to be removed. - acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0], + acceptedTxns, err = harness.txPool.ProcessTransaction(chainedTxns[0], nil, false, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept valid tx %v", err) @@ -889,7 +889,7 @@ func TestCheckSpend(t *testing.T) { t.Fatalf("unable to create transaction chain: %v", err) } for _, tx := range chainedTxns { - _, err := harness.txPool.ProcessTransaction(tx, true, + _, err := harness.txPool.ProcessTransaction(tx, nil, true, false, 0) if err != nil { t.Fatalf("ProcessTransaction: failed to accept "+ @@ -1817,7 +1817,7 @@ func TestRBF(t *testing.T) { // it's not a valid one, we should see the error // expected by the test. _, err = ctx.harness.txPool.ProcessTransaction( - replacementTx, false, false, 0, + replacementTx, nil, false, false, 0, ) if testCase.err == "" && err != nil { ctx.t.Fatalf("expected no error when "+ diff --git a/mempool/mocks.go b/mempool/mocks.go index 1a762453..c2707270 100644 --- a/mempool/mocks.go +++ b/mempool/mocks.go @@ -73,7 +73,7 @@ func (m *MockTxMempool) HaveTransaction(hash *chainhash.Hash) bool { // free-standing transactions into the memory pool. It includes functionality // such as rejecting duplicate transactions, ensuring transactions follow all // rules, orphan transaction handling, and insertion into the memory pool. -func (m *MockTxMempool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, +func (m *MockTxMempool) ProcessTransaction(tx *btcutil.Tx, utreexoData *wire.UData, allowOrphan, rateLimit bool, tag Tag) ([]*TxDesc, error) { args := m.Called(tx, allowOrphan, rateLimit, tag) diff --git a/netsync/manager.go b/netsync/manager.go index 2bcadd22..0d96918d 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -620,7 +620,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. - acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx, + acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx, nil, true, true, mempool.Tag(peer.ID())) // Remove transaction from request maps. Either the mempool/chain diff --git a/rpcserver.go b/rpcserver.go index 9a7f7f0b..cffb743c 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -4273,7 +4273,7 @@ func handleSearchRawTransactions(s *rpcServer, cmd interface{}, closeChan <-chan // rpcProcessTx checks that the tx is accepted into the mempool and relays it to peers // and other processes. func (s *rpcServer) rpcProcessTx(tx *btcutil.Tx, allowOrphan, rateLimit bool) error { - acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, allowOrphan, rateLimit, 0) + acceptedTxs, err := s.cfg.TxMemPool.ProcessTransaction(tx, nil, allowOrphan, rateLimit, 0) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, From b3028870ebda23a42d5a2e667a015a889ece443d Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Thu, 7 Nov 2024 15:09:30 +0900 Subject: [PATCH 4/9] peer: add code to handle utreexotx --- peer/peer.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/peer/peer.go b/peer/peer.go index 497e8651..e061c9e1 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -117,6 +117,9 @@ type MessageListeners struct { // OnTx is invoked when a peer receives a tx bitcoin message. OnTx func(p *Peer, msg *wire.MsgTx) + // OnUtreexoTx is invoked when a peer receives a utreexo tx bitcoin message. + OnUtreexoTx func(p *Peer, msg *wire.MsgUtreexoTx) + // OnBlock is invoked when a peer receives a block bitcoin message. OnBlock func(p *Peer, msg *wire.MsgBlock, buf []byte) @@ -1175,6 +1178,7 @@ func (p *Peer) maybeAddDeadline(pendingResponses map[string]time.Time, msgCmd st pendingResponses[wire.CmdBlock] = deadline pendingResponses[wire.CmdMerkleBlock] = deadline pendingResponses[wire.CmdTx] = deadline + pendingResponses[wire.CmdUtreexoTx] = deadline pendingResponses[wire.CmdNotFound] = deadline case wire.CmdGetHeaders: @@ -1234,10 +1238,13 @@ out: fallthrough case wire.CmdTx: fallthrough + case wire.CmdUtreexoTx: + fallthrough case wire.CmdNotFound: delete(pendingResponses, wire.CmdBlock) delete(pendingResponses, wire.CmdMerkleBlock) delete(pendingResponses, wire.CmdTx) + delete(pendingResponses, wire.CmdUtreexoTx) delete(pendingResponses, wire.CmdNotFound) default: @@ -1439,6 +1446,11 @@ out: p.cfg.Listeners.OnTx(p, msg) } + case *wire.MsgUtreexoTx: + if p.cfg.Listeners.OnUtreexoTx != nil { + p.cfg.Listeners.OnUtreexoTx(p, msg) + } + case *wire.MsgBlock: if p.cfg.Listeners.OnBlock != nil { p.cfg.Listeners.OnBlock(p, msg, buf) From 7384937903d55e6bca88a0cd330a007f0972f713 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Thu, 7 Nov 2024 15:20:59 +0900 Subject: [PATCH 5/9] main: send over utreexotxs instead of txs with utreexo encoding We're favoring a new message over a utreexo encoding for propagating txs for utreexo nodes. So when we're pushing out txs for utreexo tx invs, send out utreexotxs. --- server.go | 154 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 86 insertions(+), 68 deletions(-) diff --git a/server.go b/server.go index 610308aa..542791d6 100644 --- a/server.go +++ b/server.go @@ -1562,26 +1562,50 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, packedPositions return err } - // If the requsted encoding is a utreexo encoding, then also grab the - // utreexo proof for the tx. - if encoding&wire.UtreexoEncoding == wire.UtreexoEncoding { - // If utreexo proof index is not present, we can't send the tx - // as we can't grab the proof for the tx. - if s.utreexoProofIndex == nil && s.flatUtreexoProofIndex == nil && cfg.NoUtreexo { - err := fmt.Errorf("UtreexoProofIndex and FlatUtreexoProofIndex is nil. " + - "Cannot fetch utreexo accumulator proofs.") - srvrLog.Debugf(err.Error()) + // If the requsted encoding is not a utreexo encoding, send the tx over and return. + if encoding&wire.UtreexoEncoding != wire.UtreexoEncoding { + // Once we have fetched data wait for any previous operation to finish. + if waitChan != nil { + <-waitChan + } + + sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding) + return nil + } + + // If utreexo proof index is not present, we can't send the tx + // as we can't grab the proof for the tx. + if s.utreexoProofIndex == nil && s.flatUtreexoProofIndex == nil && cfg.NoUtreexo { + err := fmt.Errorf("UtreexoProofIndex and FlatUtreexoProofIndex is nil. " + + "Cannot fetch utreexo accumulator proofs.") + srvrLog.Debugf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} + } + + return err + } + + var utreexoTx *wire.MsgUtreexoTx + // For compact state nodes. + if !cfg.NoUtreexo { + // Fetch the necessary leafdatas to create the utreexo data. + leafDatas, err := s.txMemPool.FetchLeafDatas(tx.Hash()) + if err != nil { + chanLog.Errorf(err.Error()) if doneChan != nil { doneChan <- struct{}{} } - return err } - // For compact state nodes. - if !cfg.NoUtreexo { - // Fetch the necessary leafdatas to create the utreexo data. - leafDatas, err := s.txMemPool.FetchLeafDatas(tx.Hash()) + btcdLog.Debugf("fetched %v for tx %s", leafDatas, tx.Hash()) + + // Packed positions may be nil or of a length 0 if the + // peer already has all the necessary proof hashes cached. + if packedPositions != nil || len(packedPositions) == 0 { + positions := chainhash.PackedHashesToUint64(packedPositions) + ud, err := s.chain.GenerateUDataPartial(leafDatas, positions) if err != nil { chanLog.Errorf(err.Error()) if doneChan != nil { @@ -1590,69 +1614,63 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, packedPositions return err } - btcdLog.Debugf("fetched %v for tx %s", leafDatas, tx.Hash()) - - // Packed positions may be nil or of a length 0 if the - // peer already has all the necessary proof hashes cached. - if packedPositions != nil || len(packedPositions) == 0 { - positions := chainhash.PackedHashesToUint64(packedPositions) - ud, err := s.chain.GenerateUDataPartial(leafDatas, positions) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err - } - - tx.MsgTx().UData = ud + utreexoTx = &wire.MsgUtreexoTx{ + MsgTx: *tx.MsgTx(), + UData: *ud, } } - // For bridge nodes. - if s.utreexoProofIndex != nil { - if packedPositions != nil || len(packedPositions) == 0 { - leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err + } + + // For bridge nodes. + if s.utreexoProofIndex != nil { + if packedPositions != nil || len(packedPositions) == 0 { + leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain) + if err != nil { + chanLog.Errorf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} } + return err + } - positions := chainhash.PackedHashesToUint64(packedPositions) - ud, err := s.utreexoProofIndex.GenerateUDataPartial(leafDatas, positions) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err + positions := chainhash.PackedHashesToUint64(packedPositions) + ud, err := s.utreexoProofIndex.GenerateUDataPartial(leafDatas, positions) + if err != nil { + chanLog.Errorf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} } + return err + } - tx.MsgTx().UData = ud + utreexoTx = &wire.MsgUtreexoTx{ + MsgTx: *tx.MsgTx(), + UData: *ud, } - } else if s.flatUtreexoProofIndex != nil { - if packedPositions != nil || len(packedPositions) == 0 { - leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err + } + } else if s.flatUtreexoProofIndex != nil { + if packedPositions != nil || len(packedPositions) == 0 { + leafDatas, err := blockchain.TxToDelLeaves(tx, s.chain) + if err != nil { + chanLog.Errorf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} } - positions := chainhash.PackedHashesToUint64(packedPositions) - ud, err := s.flatUtreexoProofIndex.GenerateUDataPartial(leafDatas, positions) - if err != nil { - chanLog.Errorf(err.Error()) - if doneChan != nil { - doneChan <- struct{}{} - } - return err + return err + } + positions := chainhash.PackedHashesToUint64(packedPositions) + ud, err := s.flatUtreexoProofIndex.GenerateUDataPartial(leafDatas, positions) + if err != nil { + chanLog.Errorf(err.Error()) + if doneChan != nil { + doneChan <- struct{}{} } + return err + } - tx.MsgTx().UData = ud + utreexoTx = &wire.MsgUtreexoTx{ + MsgTx: *tx.MsgTx(), + UData: *ud, } } } @@ -1662,7 +1680,7 @@ func (s *server) pushTxMsg(sp *serverPeer, hash *chainhash.Hash, packedPositions <-waitChan } - sp.QueueMessageWithEncoding(tx.MsgTx(), doneChan, encoding) + sp.QueueMessageWithEncoding(utreexoTx, doneChan, wire.WitnessEncoding) return nil } From 870f174bfa1b85a238b42468489e0fdcb3f7594f Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Thu, 7 Nov 2024 15:25:33 +0900 Subject: [PATCH 6/9] netsync: add code to handle utreexo txs --- netsync/manager.go | 40 +++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/netsync/manager.go b/netsync/manager.go index 0d96918d..e87b7c24 100644 --- a/netsync/manager.go +++ b/netsync/manager.go @@ -100,6 +100,14 @@ type txMsg struct { reply chan struct{} } +// utreexoTxMsg packages a bitcoin utreexo tx message and the peer it came from together +// so the block handler has access to that information. +type utreexoTxMsg struct { + utreexoTx *btcutil.UtreexoTx + peer *peerpkg.Peer + reply chan struct{} +} + // getSyncPeerMsg is a message type to be sent across the message channel for // retrieving the current sync peer. type getSyncPeerMsg struct { @@ -591,8 +599,7 @@ func (sm *SyncManager) updateSyncPeer(dcSyncPeer bool) { } // handleTxMsg handles transaction messages from all peers. -func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { - peer := tmsg.peer +func (sm *SyncManager) handleTxMsg(tx *btcutil.Tx, peer *peerpkg.Peer, utreexoData *wire.UData) { state, exists := sm.peerStates[peer] if !exists { log.Warnf("Received tx message from unknown peer %s", peer) @@ -607,7 +614,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { // spec to proliferate. While this is not ideal, there is no check here // to disconnect peers for sending unsolicited transactions to provide // interoperability. - txHash := tmsg.tx.Hash() + txHash := tx.Hash() // Ignore transactions that we have already rejected. Do not // send a reject message here because if the transaction was already @@ -620,7 +627,7 @@ func (sm *SyncManager) handleTxMsg(tmsg *txMsg) { // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. - acceptedTxs, err := sm.txMemPool.ProcessTransaction(tmsg.tx, nil, + acceptedTxs, err := sm.txMemPool.ProcessTransaction(tx, utreexoData, true, true, mempool.Tag(peer.ID())) // Remove transaction from request maps. Either the mempool/chain @@ -1380,9 +1387,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { // Add it to the request queue. state.requestQueue = append(state.requestQueue, iv) - // If the inv is for a utreexo tx, then also pop off the utreexo - // proof hash invs and add it to the request queue. - if peer.IsUtreexoEnabled() { + if sm.chain.IsUtreexoViewActive() { switch iv.Type { case wire.InvTypeTx: case wire.InvTypeWitnessTx: @@ -1549,7 +1554,7 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { } // Add in the utreexo flag then add the tx inv. - iv.Type |= wire.InvUtreexoFlag + iv.Type = wire.InvTypeUtreexoTx gdmsg.AddInvVect(iv) numRequested++ @@ -1597,7 +1602,11 @@ out: sm.handleNewPeerMsg(msg.peer) case *txMsg: - sm.handleTxMsg(msg) + sm.handleTxMsg(msg.tx, msg.peer, nil) + msg.reply <- struct{}{} + + case *utreexoTxMsg: + sm.handleTxMsg(&msg.utreexoTx.Tx, msg.peer, &msg.utreexoTx.MsgUtreexoTx().UData) msg.reply <- struct{}{} case *blockMsg: @@ -1791,6 +1800,19 @@ func (sm *SyncManager) QueueTx(tx *btcutil.Tx, peer *peerpkg.Peer, done chan str sm.msgChan <- &txMsg{tx: tx, peer: peer, reply: done} } +// QueueUtreexoTx adds the passed transaction message and peer to the block handling +// queue. Responds to the done channel argument after the utreexo tx message is +// processed. +func (sm *SyncManager) QueueUtreexoTx(tx *btcutil.UtreexoTx, peer *peerpkg.Peer, done chan struct{}) { + // Don't accept more transactions if we're shutting down. + if atomic.LoadInt32(&sm.shutdown) != 0 { + done <- struct{}{} + return + } + + sm.msgChan <- &utreexoTxMsg{utreexoTx: tx, peer: peer, reply: done} +} + // QueueBlock adds the passed block message and peer to the block handling // queue. Responds to the done channel argument after the block message is // processed. From 2e239fb2fc45b898ac63e01a22538616c8a18bd1 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Thu, 7 Nov 2024 15:25:56 +0900 Subject: [PATCH 7/9] main: add OnUtreexoTx We add support for handling utreexotx messages that we receive from peers. --- server.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/server.go b/server.go index 542791d6..95036884 100644 --- a/server.go +++ b/server.go @@ -601,6 +601,33 @@ func (sp *serverPeer) OnTx(_ *peer.Peer, msg *wire.MsgTx) { <-sp.txProcessed } +// OnUtreexoTx is invoked when a peer receives a utreexo tx bitcoin message. +// It blocks until the bitcoin transaction has been fully processed. Unlock the block +// handler this does not serialize all transactions through a single thread +// transactions don't rely on the previous one in a linear fashion like blocks. +func (sp *serverPeer) OnUtreexoTx(_ *peer.Peer, msg *wire.MsgUtreexoTx) { + if cfg.BlocksOnly { + peerLog.Tracef("Ignoring utreexo tx %v from %v - blocksonly enabled", + msg.TxHash(), sp) + return + } + + // Add the transaction to the known inventory for the peer. + // Convert the raw MsgUtreexoTx to a btcutil.UtreexoTx which provides some convenience + // methods and things such as hash caching. + tx := btcutil.NewUtreexoTx(msg) + iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) + sp.AddKnownInventory(iv) + + // Queue the transaction up to be handled by the sync manager and + // intentionally block further receives until the transaction is fully + // processed and known good or bad. This helps prevent a malicious peer + // from queuing up a bunch of bad transactions before disconnecting (or + // being disconnected) and wasting memory. + sp.server.syncManager.QueueUtreexoTx(tx, sp.Peer, sp.txProcessed) + <-sp.txProcessed +} + // OnBlock is invoked when a peer receives a block bitcoin message. It // blocks until the bitcoin block has been fully processed. func (sp *serverPeer) OnBlock(_ *peer.Peer, msg *wire.MsgBlock, buf []byte) { @@ -2408,6 +2435,7 @@ func newPeerConfig(sp *serverPeer) *peer.Config { OnVerAck: sp.OnVerAck, OnMemPool: sp.OnMemPool, OnTx: sp.OnTx, + OnUtreexoTx: sp.OnUtreexoTx, OnBlock: sp.OnBlock, OnInv: sp.OnInv, OnHeaders: sp.OnHeaders, From f1236dfaab103b28ce8239130bea1d5e9d85f48a Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Fri, 8 Nov 2024 18:59:21 +0900 Subject: [PATCH 8/9] wire: don't serialize/deserialize udata for msgtx --- wire/msgtx.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/wire/msgtx.go b/wire/msgtx.go index 8a959a03..d88f7431 100644 --- a/wire/msgtx.go +++ b/wire/msgtx.go @@ -679,14 +679,6 @@ func (msg *MsgTx) BtcDecode(r io.Reader, pver uint32, enc MessageEncoding) error scriptPool.Return(pkScript) } - if enc&UtreexoEncoding == UtreexoEncoding { - msg.UData = new(UData) - err = msg.UData.DeserializeCompact(r) - if err != nil { - return err - } - } - return nil } @@ -786,17 +778,6 @@ func (msg *MsgTx) BtcEncode(w io.Writer, pver uint32, enc MessageEncoding) error return err } - if enc&UtreexoEncoding == UtreexoEncoding { - // AccProof can be nil for transactions that are included in - // a block. - if msg.UData != nil { - err = msg.UData.SerializeCompact(w) - if err != nil { - return err - } - } - } - return nil } From 1eb08e428e37e17b4e63a226e0a4243542b8f5ec Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Mon, 11 Nov 2024 16:01:23 +0900 Subject: [PATCH 9/9] wire: don't serialize udata for txs Since txs don't serialize udata anymore, this code doesn't change anything about the behavior of the code but makes it much more explicit that udata will not be serialized for txs. --- wire/msgblock.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/wire/msgblock.go b/wire/msgblock.go index ad4b306f..812e143a 100644 --- a/wire/msgblock.go +++ b/wire/msgblock.go @@ -234,8 +234,15 @@ func (msg *MsgBlock) BtcEncode(w io.Writer, pver uint32, enc MessageEncoding) er return err } + // Unset UtreexoEncoding for the encoding that we'll pass off to the + // tx.BtcDecode(). This is done as tx.BtcDecode() expects Utreexo + // Proofs to be appended to each tx if UtreexoEncoding bit is turned on. + // However, this only applies to mempool txs and there are no separate + // Utreexo Proofs for individual txs as the MsgBlock contains a proof + // for all the txs. + txEncoding := enc &^ UtreexoEncoding for _, tx := range msg.Transactions { - err = tx.BtcEncode(w, pver, enc) + err = tx.BtcEncode(w, pver, txEncoding) if err != nil { return err }